import torch from torch.autograd.functional import jacobian import itertools import math import abc ############### CONSTANTS ################### ############# COMPOSITION FUNCTIONS ################### def compose(f,g): """ this function composes two functions f and g in the order f(g(*args)) it returns a function. """ return lambda *args: f(g(*args)) def recursively_compose_functions(fn,n): """ This function takes a function fn and composes it with itself n times. Returns an array/list that contains each of the n+1 composition levels, ordered from fn() to fn^n() """ #Set base conditions out_func = None out_func_list = [] #build the compositions of functions for f in itertools.repeat(fn, n): #if first iteration if out_func == None: out_func = f else: out_func = compose(f,out_func) #append the ou out_func_list.append(out_func) return out_func_list ############### Physical Model ################### class PhysicalModel(): """ This is designed as a default, base class for the physical model of satellite interactions. It captures the constants that characterize interactions, and provides a set of function to calculate changes to the physical environment. It has two sets of interfaces, one that handles tensors (denoted by _function():) and one that handles state objects (denoted by function():) """ def __init__(self ,debris_from_collision ,constellations_collision_risk ,debris_decay_rate ,launch_debris ,debris_autocatalysis_rate): self.debris_from_collision= debris_from_collision self.constellations_collision_risk = constellations_collision_risk self.debris_decay_rate = debris_decay_rate self.launch_debris = launch_debris self.debris_autocatalysis_rate = debris_autocatalysis_rate def __str__(self): return "\n{}\n{}\n{}\n{}\n{}".format( self.debris_from_collision ,self.constellations_collision_risk ,self.debris_decay_rate ,self.launch_debris ,self.debris_autocatalysis_rate ) def _survival(self, stocks, debris): #returns the survival rate (not destruction rate) for the given constellation. return 1- torch.exp(-self.constellations_collision_risk * stocks - debris.sum()) def survival(self, states): """ This is an interface wrapper """ return self._survival(states.stocks, states.debris) def _transition_debris(self, stocks,debris,launches): """ This function transitions debris levels based off of a state and launch decision. """ new_debris = (1-self.debris_decay_rate + self.debris_autocatalysis_rate) * debris \ + self.launch_debris * launches.sum() \ + self.debris_from_collision * (1-self._survival(stocks,debris)) @ stocks return new_debris def transition_debris(self, state, estimand_interface): """ This is an interface wrapper. """ return self._transition_debris(state.stocks,state.debris,estimand_interface.launches) def _transition_stocks(self, stocks, debris, launches): """ This function calculates new stock levels. """ new_stock = self._survival(stocks,debris) * stocks + launches return new_stock def transition_stocks(self, state, estimand_interface): """ This is an interface wrapper """ return self._transition_stocks(state.stocks,state.debris,estimand_interface.launches) def transition(self, state, estimand_interface): """ This function takes a state and launch decision, and updates the state according to the physical laws of motion. It returns a State object. """ d = self.transition_debris(state, estimand_interface) s = self.transition_stocks(state, estimand_interface) return States(s,d) def transition_jacobian_wrt_states(self,state,estimand_interface): """ This function takes values of the state and estimand, and returns a properly formatted jacobian of the transition function with respect to the states. The reason this is done here is because there is some reshaping that must happen, so it is easier to wrap it here. """ jac_debris = jacobian(self._transition_debris, (state.stocks,state.debris,estimand_interface.launches)) jac_stocks = jacobian(self._transition_stocks, (state.stocks,state.debris,estimand_interface.launches)) h1 = torch.cat((jac_stocks[0],jac_stocks[1]),dim=1) h2 = torch.cat((jac_debris[0],jac_debris[1]),dim=1) a = torch.cat((h1,h2),dim=0) return a def transition_jacobian_wrt_launches(self,state,estimand_interface): """ This function takes values of the state and estimand, and returns a properly formatted jacobian of the transition function with respect to the launch decisions. The reason this is done here is because there is some reshaping that must happen, so it is easier to wrap it here. """ jac_debris = jacobian(self._transition_debris, (state.stocks,state.debris,estimand_interface.launches)) jac_stocks = jacobian(self._transition_stocks, (state.stocks,state.debris,estimand_interface.launches)) b = torch.cat((jac_stocks[2],jac_debris[2].T),dim=1) return b class States(): """ This is supposed to capture the state variables of the model, to create a common interface when passing between functions. """ def __init__(self, stocks,debris): self.stocks = stocks self.debris = debris def __str__(self): return "stocks\t{} \ndebris\t {}".format(self.stocks,self.debris) @property def number_constellations(self): return len(self.stocks) @property def number_debris_trackers(self): return len(self.debris) ################ NEURAL NETWORK TOOLS ################### class EstimandInterface(): """ This defines a clean interface for working with the estimand (i.e. thing we are trying to estimate). In general, we are trying to estimate the choice variables and the partial derivatives of the value functions. This This class wraps output for the neural network (or other estimand), allowing me to - easily substitute various types of launch functions by having a common interface - this eases testing - check dimensionality etc without dealing with randomness - again, easing testing - reason more cleanly about the component pieces - easing programming - provide a clean interface to find constellation level launch decisions etc. It takes inputs of two general categories: - the choice function results - the partial derivatives of the value function """ def __init__(self, partials, launches, deorbits=None): self.partials = partials self.launches = launches self.deorbits = deorbits @property def number_constellations(self): return len(self.launches) @property def number_states(self): return self.number_constellations+1 #This depends on the debris trackers technically. def launch_single(self, constellation): #returns the launch decision for the constellation of interest filter_tensor = torch.zeros(self.number_constellations) filter_tensor[constellation] = 1.0 return self.launches @ filter_tensor def launch_vector(self, constellation): #returns the launch decision for the constellation of interest as a vector filter_tensor = torch.zeros(self.number_constellations) filter_tensor[constellation] = 1.0 return self.launches * filter_tensor def partial_vector(self, constellation): #returns the partials of the value function corresponding to the constellation of interest filter_tensor = torch.zeros(self.number_states) filter_tensor[constellation] = 1.0 return self.partials @ filter_tensor def partial_matrix(self, constellation): #returns the partials of the value function corresponding to #the constellation of interest as a matrix filter_tensor = torch.zeros(self.number_states) filter_tensor[constellation] = 1.0 return self.partials * filter_tensor def __str__(self): #just a human readable descriptor return "Launch Decisions and Partial Derivativs of value function with\n\tlaunches\n\t\t {}\n\tPartials\n\t\t{}".format(self.launches,self.partials) ############## ECONOMIC MODEL ############ #Abstract class describing profit. Each subclass will connect a profit function "style" to a specific instance of parameters. class EconomicAgent(metaclass=abc.ABCMeta): @abc.abstractmethod def period_benefit(self,state,estimand_interface): pass @abc.abstractmethod def _period_benefit(self): pass @abc.abstractmethod def period_benefit_jacobian_wrt_states(self): pass @abc.abstractmethod def _period_benefit_jacobian_wrt_states(self): pass @abc.abstractmethod def period_benefit_jacobian_wrt_launches(self): pass @abc.abstractmethod def _period_benefit_jacobian_wrt_launches(self): pass class LinearProfit(EconomicAgent): """ The simplest type of profit function available. """ def __init__(self, constellation_number, discount_factor, benefit_weight, launch_cost, deorbit_cost=0): #track which constellation this is. self.constellation_number = constellation_number #parameters describing the agent's situation self.discount_factor = discount_factor self.benefit_weights = benefit_weight self.launch_cost = launch_cost self.deorbit_cost = deorbit_cost def __str__(self): return "LinearProfit\n Benefit weights:\t{}\n launch cost:\t{}\n Deorbit cost:\t{}".format(self.benefit_weights, self.launch_cost, self.deorbit_cost) def period_benefit(self,state,estimand_interface): return self._period_benefit(state.stocks, state.debris, estimand_interface.launches) def _period_benefit(self,stocks,debris,launches): profits = self.benefit_weights @ stocks \ - self.launch_cost * launches[self.constellation_number] #\ #- deorbit_cost @ deorbits[self.constellation_number] return profits def period_benefit_jacobian_wrt_states(self, states, estimand_interface): return self._period_benefit_jacobian_wrt_states(states.stocks, states.debris, estimand_interface.launches) def _period_benefit_jacobian_wrt_states(self, stocks, debris, launches): jac = jacobian(self._period_benefit, (stocks,debris,launches)) return torch.cat((jac[0], jac[1])) def period_benefit_jacobian_wrt_launches(self, states, estimand_interface): return self._period_benefit_jacobian_wrt_launches(states.stocks, states.debris, estimand_interface.launches) def _period_benefit_jacobian_wrt_launches(self,stocks,debris,launches): jac = jacobian(self._period_benefit, (stocks,debris,launches)) return jac[2] #other profit functions to implement # price competition (substitution) # military (complementarity) ############### TRANSITION AND OPTIMALITY FUNCTIONS ################# #at some point I should wrap the two functions below into a class that holds various things class OrbitalModel: def __init__(self, number_debris_trackers, num_choice_variables): pass def single_transition(physical_model, economic_agent, states, estimand_interface): """ This function represents the inverted envelope conditions. It allows us to describe the derivatives of the value function evaluated at time $t+1$ in terms based in time period $t$. It takes a few derivatives (jacobians), and due to the nature of the return as tuples they must be concatenated. It returns the transitioned values """ #Transition the partials #Get the discounted jacobian with respect to states A = economic_agent.discount_factor * physical_model.transition_jacobian_wrt_states(states, estimand_interface) f_theta = economic_agent.period_benefit_jacobian_wrt_states(states, estimand_interface) T = estimand_interface.partial_vector(economic_agent.constellation_number) - f_theta #need to do testing to nicely handle when A is non-invertible #using linalg.solve because it has more numerical stability and is faster. iterated_value_partials = torch.linalg.solve(A,T) #transition the states iterated_states = physical_model.transition(states, estimand_interface) #The goal is to create a set of partials that I can throw into the appropriate return iterated_value_partials, iterated_states #Optimality math def optimality(physical_model, economic_agent, states, estimand_interface): """ This takes the given models, states, and the estimand and returns the optimality condition. """ fx = economic_agent.period_benefit_jacobian_wrt_launches(states, estimand_interface) B = physical_model.transition_jacobian_wrt_launches(states, estimand_interface) iterated_partials, iterated_state = single_transition(physical_model, economic_agent, states, estimand_interface) return fx + economic_agent.discount_factor * B @ iterated_partials, iterated_partials, iterated_state