You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
384 lines
14 KiB
Python
384 lines
14 KiB
Python
import torch
|
|
from torch.autograd.functional import jacobian
|
|
import itertools
|
|
import math
|
|
import abc
|
|
|
|
|
|
############### CONSTANTS ###################
|
|
|
|
|
|
############# COMPOSITION FUNCTIONS ###################
|
|
|
|
def compose(f,g):
|
|
"""
|
|
this function composes two functions f and g in
|
|
the order f(g(*args))
|
|
|
|
it returns a function.
|
|
"""
|
|
return lambda *args: f(g(*args))
|
|
|
|
def recursively_compose_functions(fn,n):
|
|
"""
|
|
This function takes a function fn and composes it with itself n times.
|
|
|
|
Returns an array/list that contains each of the n+1 composition levels, ordered
|
|
from fn() to fn^n()
|
|
"""
|
|
#Set base conditions
|
|
out_func = None
|
|
out_func_list = []
|
|
|
|
#build the compositions of functions
|
|
for f in itertools.repeat(fn, n):
|
|
#if first iteration
|
|
if out_func == None:
|
|
out_func = f
|
|
else:
|
|
out_func = compose(f,out_func)
|
|
|
|
#append the ou
|
|
out_func_list.append(out_func)
|
|
|
|
return out_func_list
|
|
|
|
############### Physical Model ###################
|
|
class PhysicalModel():
|
|
"""
|
|
This is designed as a default, base class for the physical model of satellite interactions.
|
|
|
|
It captures the constants that characterize interactions, and provides a set of
|
|
function to calculate changes to the physical environment.
|
|
|
|
It has two sets of interfaces, one that handles tensors (denoted by _function():)
|
|
and one that handles state objects (denoted by function():)
|
|
"""
|
|
def __init__(self
|
|
,debris_from_collision
|
|
,constellations_collision_risk
|
|
,debris_decay_rate
|
|
,launch_debris
|
|
,debris_autocatalysis_rate):
|
|
self.debris_from_collision= debris_from_collision
|
|
self.constellations_collision_risk = constellations_collision_risk
|
|
self.debris_decay_rate = debris_decay_rate
|
|
self.launch_debris = launch_debris
|
|
self.debris_autocatalysis_rate = debris_autocatalysis_rate
|
|
|
|
def __str__(self):
|
|
return "\n{}\n{}\n{}\n{}\n{}".format(
|
|
self.debris_from_collision
|
|
,self.constellations_collision_risk
|
|
,self.debris_decay_rate
|
|
,self.launch_debris
|
|
,self.debris_autocatalysis_rate
|
|
)
|
|
|
|
def _survival(self, stocks, debris):
|
|
#returns the survival rate (not destruction rate) for the given constellation.
|
|
return 1- torch.exp(-self.constellations_collision_risk * stocks - debris.sum())
|
|
|
|
def survival(self, states):
|
|
"""
|
|
This is an interface wrapper
|
|
"""
|
|
return self._survival(states.stocks, states.debris)
|
|
|
|
def _transition_debris(self, stocks,debris,launches):
|
|
"""
|
|
This function transitions debris levels based off of a state and launch decision.
|
|
"""
|
|
|
|
new_debris = (1-self.debris_decay_rate + self.debris_autocatalysis_rate) * debris \
|
|
+ self.launch_debris * launches.sum() \
|
|
+ self.debris_from_collision * (1-self._survival(stocks,debris)) @ stocks
|
|
return new_debris
|
|
|
|
def transition_debris(self, state, estimand_interface):
|
|
"""
|
|
This is an interface wrapper.
|
|
"""
|
|
return self._transition_debris(state.stocks,state.debris,estimand_interface.launches)
|
|
|
|
|
|
def _transition_stocks(self, stocks, debris, launches):
|
|
"""
|
|
This function calculates new stock levels.
|
|
"""
|
|
new_stock = self._survival(stocks,debris) * stocks + launches
|
|
return new_stock
|
|
|
|
|
|
def transition_stocks(self, state, estimand_interface):
|
|
"""
|
|
This is an interface wrapper
|
|
"""
|
|
return self._transition_stocks(state.stocks,state.debris,estimand_interface.launches)
|
|
|
|
def transition(self, state, estimand_interface):
|
|
"""
|
|
This function takes a state and launch decision, and updates the state according to the physical laws of motion.
|
|
|
|
It returns a State object.
|
|
"""
|
|
d = self.transition_debris(state, estimand_interface)
|
|
s = self.transition_stocks(state, estimand_interface)
|
|
|
|
return States(s,d)
|
|
|
|
def transition_jacobian_wrt_states(self,state,estimand_interface):
|
|
"""
|
|
This function takes values of the state and estimand, and returns a properly formatted
|
|
jacobian of the transition function with respect to the states.
|
|
|
|
The reason this is done here is because there is some reshaping that must happen, so
|
|
it is easier to wrap it here.
|
|
"""
|
|
jac_debris = jacobian(self._transition_debris, (state.stocks,state.debris,estimand_interface.launches))
|
|
jac_stocks = jacobian(self._transition_stocks, (state.stocks,state.debris,estimand_interface.launches))
|
|
|
|
h1 = torch.cat((jac_stocks[0],jac_stocks[1]),dim=1)
|
|
h2 = torch.cat((jac_debris[0],jac_debris[1]),dim=1)
|
|
|
|
a = torch.cat((h1,h2),dim=0)
|
|
return a
|
|
|
|
def transition_jacobian_wrt_launches(self,state,estimand_interface):
|
|
"""
|
|
This function takes values of the state and estimand, and returns a properly formatted
|
|
jacobian of the transition function with respect to the launch decisions.
|
|
|
|
The reason this is done here is because there is some reshaping that must happen, so
|
|
it is easier to wrap it here.
|
|
"""
|
|
jac_debris = jacobian(self._transition_debris, (state.stocks,state.debris,estimand_interface.launches))
|
|
jac_stocks = jacobian(self._transition_stocks, (state.stocks,state.debris,estimand_interface.launches))
|
|
|
|
b = torch.cat((jac_stocks[2],jac_debris[2].T),dim=1)
|
|
|
|
return b
|
|
|
|
|
|
class States():
|
|
"""
|
|
This is supposed to capture the state variables of the model, to create a common interface
|
|
when passing between functions.
|
|
"""
|
|
def __init__(self, stocks,debris):
|
|
self.stocks = stocks
|
|
self.debris = debris
|
|
|
|
|
|
def __str__(self):
|
|
return "stocks\t{} \ndebris\t {}".format(self.stocks,self.debris)
|
|
|
|
@property
|
|
def values(self):
|
|
#return these as a single tensor.
|
|
return torch.cat((self.stocks,self.debris), dim=-1)
|
|
|
|
@property
|
|
def number_constellations(self):
|
|
return len(self.stocks)
|
|
|
|
@property
|
|
def number_debris_trackers(self):
|
|
return len(self.debris)
|
|
|
|
################ NEURAL NETWORK TOOLS ###################
|
|
|
|
class EstimandInterface():
|
|
"""
|
|
This defines a clean interface for working with the estimand (i.e. thing we are trying to estimate).
|
|
In general, we are trying to estimate the choice variables and the partial derivatives of the value functions.
|
|
This
|
|
|
|
This class wraps output for the neural network (or other estimand), allowing me to
|
|
- easily substitute various types of launch functions by having a common interface
|
|
- this eases testing
|
|
- check dimensionality etc without dealing with randomness
|
|
- again, easing testing
|
|
- reason more cleanly about the component pieces
|
|
- easing programming
|
|
- provide a clean interface to find constellation level launch decisions etc.
|
|
|
|
It takes inputs of two general categories:
|
|
- the choice function results
|
|
- the partial derivatives of the value function
|
|
"""
|
|
def __init__(self, partials, launches, deorbits=None):
|
|
self.partials = partials
|
|
self.launches = launches
|
|
self.deorbits = deorbits
|
|
@property
|
|
def number_constellations(self):
|
|
return len(self.launches)
|
|
@property
|
|
def number_states(self):
|
|
return self.number_constellations+1 #This depends on the debris trackers technically.
|
|
|
|
def launch_single(self, constellation):
|
|
#returns the launch decision for the constellation of interest
|
|
|
|
filter_tensor = torch.zeros(self.number_constellations)
|
|
filter_tensor[constellation] = 1.0
|
|
|
|
return self.launches @ filter_tensor
|
|
|
|
def launch_vector(self, constellation):
|
|
#returns the launch decision for the constellation of interest as a vector
|
|
|
|
filter_tensor = torch.zeros(self.number_constellations)
|
|
filter_tensor[constellation] = 1.0
|
|
|
|
return self.launches * filter_tensor
|
|
|
|
def partial_vector(self, constellation):
|
|
#returns the partials of the value function corresponding to the constellation of interest
|
|
|
|
filter_tensor = torch.zeros(self.number_states)
|
|
filter_tensor[constellation] = 1.0
|
|
|
|
return self.partials @ filter_tensor
|
|
|
|
def partial_matrix(self, constellation):
|
|
#returns the partials of the value function corresponding to
|
|
#the constellation of interest as a matrix
|
|
|
|
filter_tensor = torch.zeros(self.number_states)
|
|
filter_tensor[constellation] = 1.0
|
|
|
|
return self.partials * filter_tensor
|
|
|
|
def __str__(self):
|
|
#just a human readable descriptor
|
|
return "Launch Decisions and Partial Derivativs of value function with\n\tlaunches\n\t\t {}\n\tPartials\n\t\t{}".format(self.launches,self.partials)
|
|
|
|
|
|
############## ECONOMIC MODEL ############
|
|
|
|
#Abstract class describing profit. Each subclass will connect a profit function "style" to a specific instance of parameters.
|
|
class EconomicAgent(metaclass=abc.ABCMeta):
|
|
@abc.abstractmethod
|
|
def period_benefit(self,state,estimand_interface):
|
|
pass
|
|
@abc.abstractmethod
|
|
def _period_benefit(self):
|
|
pass
|
|
@abc.abstractmethod
|
|
def period_benefit_jacobian_wrt_states(self):
|
|
pass
|
|
@abc.abstractmethod
|
|
def _period_benefit_jacobian_wrt_states(self):
|
|
pass
|
|
@abc.abstractmethod
|
|
def period_benefit_jacobian_wrt_launches(self):
|
|
pass
|
|
@abc.abstractmethod
|
|
def _period_benefit_jacobian_wrt_launches(self):
|
|
pass
|
|
|
|
class LinearProfit(EconomicAgent):
|
|
"""
|
|
The simplest type of profit function available.
|
|
"""
|
|
def __init__(self, batch_size, constellation_number, discount_factor, benefit_weight, launch_cost, deorbit_cost=0, ):
|
|
self.batch_size = batch_size
|
|
|
|
|
|
#track which constellation this is.
|
|
self.constellation_number = constellation_number
|
|
|
|
#get the number of constellations (pull from the benefit weight, in the dimension that counts across constellations)
|
|
self.number_of_constellations = benefit_weight.size()[1]
|
|
|
|
#parameters describing the agent's situation
|
|
self.discount_factor = discount_factor
|
|
self.benefit_weights = benefit_weight
|
|
self.launch_cost = launch_cost
|
|
self.deorbit_cost = deorbit_cost
|
|
|
|
def __str__(self):
|
|
return "LinearProfit\n Benefit weights:\t{}\n launch cost:\t{}\n Deorbit cost:\t{}".format(self.benefit_weights, self.launch_cost, self.deorbit_cost)
|
|
|
|
def period_benefit(self,state,estimand_interface):
|
|
return self._period_benefit(state.stocks, state.debris, estimand_interface.choices)
|
|
|
|
def _period_benefit(self,stocks,debris,launches):
|
|
# multiply benefits times stocks
|
|
# sum across constellations
|
|
# reshape to standard dimensions
|
|
# subtract launch costs.
|
|
pass
|
|
|
|
def period_benefit_jacobian_wrt_states(self, states, estimand_interface):
|
|
return self._period_benefit_jacobian_wrt_states(states.stocks, states.debris, estimand_interface.launches)
|
|
|
|
def _period_benefit_jacobian_wrt_states(self, stocks, debris, launches):
|
|
jac = jacobian(self._period_benefit, (stocks,debris,launches))
|
|
return torch.cat((jac[0], jac[1]))
|
|
|
|
def period_benefit_jacobian_wrt_launches(self, states, estimand_interface):
|
|
return self._period_benefit_jacobian_wrt_launches(states.stocks, states.debris, estimand_interface.launches)
|
|
|
|
def _period_benefit_jacobian_wrt_launches(self,stocks,debris,launches):
|
|
jac = jacobian(self._period_benefit, (stocks,debris,launches))
|
|
return jac[2]
|
|
|
|
|
|
#other profit functions to implement
|
|
# price competition (substitution)
|
|
# military (complementarity)
|
|
|
|
############### TRANSITION AND OPTIMALITY FUNCTIONS #################
|
|
#at some point I should wrap the two functions below into a class that holds various things
|
|
|
|
class OrbitalModel:
|
|
def __init__(self, number_debris_trackers, num_choice_variables):
|
|
pass
|
|
|
|
def single_transition(physical_model, economic_agent, states, estimand_interface):
|
|
"""
|
|
This function represents the inverted envelope conditions.
|
|
It allows us to describe the derivatives of the value function evaluated at time $t+1$ in terms based in time period $t$.
|
|
|
|
It takes a few derivatives (jacobians), and due to the nature of the return as tuples
|
|
they must be concatenated.
|
|
|
|
It returns the transitioned values
|
|
"""
|
|
|
|
#Transition the partials
|
|
#Get the discounted jacobian with respect to states
|
|
A = economic_agent.discount_factor * physical_model.transition_jacobian_wrt_states(states, estimand_interface)
|
|
f_theta = economic_agent.period_benefit_jacobian_wrt_states(states, estimand_interface)
|
|
|
|
T = estimand_interface.partial_vector(economic_agent.constellation_number) - f_theta
|
|
|
|
#need to do testing to nicely handle when A is non-invertible
|
|
|
|
#using linalg.solve because it has more numerical stability and is faster.
|
|
iterated_value_partials = torch.linalg.solve(A,T)
|
|
|
|
#transition the states
|
|
iterated_states = physical_model.transition(states, estimand_interface)
|
|
|
|
#The goal is to create a set of partials that I can throw into the appropriate
|
|
return iterated_value_partials, iterated_states
|
|
|
|
|
|
|
|
#Optimality math
|
|
def optimality(physical_model, economic_agent, states, estimand_interface):
|
|
"""
|
|
This takes the given models, states, and the estimand and returns the optimality condition.
|
|
"""
|
|
fx = economic_agent.period_benefit_jacobian_wrt_launches(states, estimand_interface)
|
|
B = physical_model.transition_jacobian_wrt_launches(states, estimand_interface)
|
|
|
|
iterated_partials, iterated_state = single_transition(physical_model, economic_agent, states, estimand_interface)
|
|
|
|
return fx + economic_agent.discount_factor * B @ iterated_partials, iterated_partials, iterated_state
|
|
|