You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Orbits/Code/combined.py

384 lines
14 KiB
Python

import torch
from torch.autograd.functional import jacobian
import itertools
import math
import abc
############### CONSTANTS ###################
############# COMPOSITION FUNCTIONS ###################
def compose(f,g):
"""
this function composes two functions f and g in
the order f(g(*args))
it returns a function.
"""
return lambda *args: f(g(*args))
def recursively_compose_functions(fn,n):
"""
This function takes a function fn and composes it with itself n times.
Returns an array/list that contains each of the n+1 composition levels, ordered
from fn() to fn^n()
"""
#Set base conditions
out_func = None
out_func_list = []
#build the compositions of functions
for f in itertools.repeat(fn, n):
#if first iteration
if out_func == None:
out_func = f
else:
out_func = compose(f,out_func)
#append the ou
out_func_list.append(out_func)
return out_func_list
############### Physical Model ###################
class PhysicalModel():
"""
This is designed as a default, base class for the physical model of satellite interactions.
It captures the constants that characterize interactions, and provides a set of
function to calculate changes to the physical environment.
It has two sets of interfaces, one that handles tensors (denoted by _function():)
and one that handles state objects (denoted by function():)
"""
def __init__(self
,debris_from_collision
,constellations_collision_risk
,debris_decay_rate
,launch_debris
,debris_autocatalysis_rate):
self.debris_from_collision= debris_from_collision
self.constellations_collision_risk = constellations_collision_risk
self.debris_decay_rate = debris_decay_rate
self.launch_debris = launch_debris
self.debris_autocatalysis_rate = debris_autocatalysis_rate
def __str__(self):
return "\n{}\n{}\n{}\n{}\n{}".format(
self.debris_from_collision
,self.constellations_collision_risk
,self.debris_decay_rate
,self.launch_debris
,self.debris_autocatalysis_rate
)
def _survival(self, stocks, debris):
#returns the survival rate (not destruction rate) for the given constellation.
return 1- torch.exp(-self.constellations_collision_risk * stocks - debris.sum())
def survival(self, states):
"""
This is an interface wrapper
"""
return self._survival(states.stocks, states.debris)
def _transition_debris(self, stocks,debris,launches):
"""
This function transitions debris levels based off of a state and launch decision.
"""
new_debris = (1-self.debris_decay_rate + self.debris_autocatalysis_rate) * debris \
+ self.launch_debris * launches.sum() \
+ self.debris_from_collision * (1-self._survival(stocks,debris)) @ stocks
return new_debris
def transition_debris(self, state, estimand_interface):
"""
This is an interface wrapper.
"""
return self._transition_debris(state.stocks,state.debris,estimand_interface.launches)
def _transition_stocks(self, stocks, debris, launches):
"""
This function calculates new stock levels.
"""
new_stock = self._survival(stocks,debris) * stocks + launches
return new_stock
def transition_stocks(self, state, estimand_interface):
"""
This is an interface wrapper
"""
return self._transition_stocks(state.stocks,state.debris,estimand_interface.launches)
def transition(self, state, estimand_interface):
"""
This function takes a state and launch decision, and updates the state according to the physical laws of motion.
It returns a State object.
"""
d = self.transition_debris(state, estimand_interface)
s = self.transition_stocks(state, estimand_interface)
return States(s,d)
def transition_jacobian_wrt_states(self,state,estimand_interface):
"""
This function takes values of the state and estimand, and returns a properly formatted
jacobian of the transition function with respect to the states.
The reason this is done here is because there is some reshaping that must happen, so
it is easier to wrap it here.
"""
jac_debris = jacobian(self._transition_debris, (state.stocks,state.debris,estimand_interface.launches))
jac_stocks = jacobian(self._transition_stocks, (state.stocks,state.debris,estimand_interface.launches))
h1 = torch.cat((jac_stocks[0],jac_stocks[1]),dim=1)
h2 = torch.cat((jac_debris[0],jac_debris[1]),dim=1)
a = torch.cat((h1,h2),dim=0)
return a
def transition_jacobian_wrt_launches(self,state,estimand_interface):
"""
This function takes values of the state and estimand, and returns a properly formatted
jacobian of the transition function with respect to the launch decisions.
The reason this is done here is because there is some reshaping that must happen, so
it is easier to wrap it here.
"""
jac_debris = jacobian(self._transition_debris, (state.stocks,state.debris,estimand_interface.launches))
jac_stocks = jacobian(self._transition_stocks, (state.stocks,state.debris,estimand_interface.launches))
b = torch.cat((jac_stocks[2],jac_debris[2].T),dim=1)
return b
class States():
"""
This is supposed to capture the state variables of the model, to create a common interface
when passing between functions.
"""
def __init__(self, stocks,debris):
self.stocks = stocks
self.debris = debris
def __str__(self):
return "stocks\t{} \ndebris\t {}".format(self.stocks,self.debris)
@property
def values(self):
#return these as a single tensor.
return torch.cat((self.stocks,self.debris), dim=-1)
@property
def number_constellations(self):
return len(self.stocks)
@property
def number_debris_trackers(self):
return len(self.debris)
################ NEURAL NETWORK TOOLS ###################
class EstimandInterface():
"""
This defines a clean interface for working with the estimand (i.e. thing we are trying to estimate).
In general, we are trying to estimate the choice variables and the partial derivatives of the value functions.
This
This class wraps output for the neural network (or other estimand), allowing me to
- easily substitute various types of launch functions by having a common interface
- this eases testing
- check dimensionality etc without dealing with randomness
- again, easing testing
- reason more cleanly about the component pieces
- easing programming
- provide a clean interface to find constellation level launch decisions etc.
It takes inputs of two general categories:
- the choice function results
- the partial derivatives of the value function
"""
def __init__(self, partials, launches, deorbits=None):
self.partials = partials
self.launches = launches
self.deorbits = deorbits
@property
def number_constellations(self):
return len(self.launches)
@property
def number_states(self):
return self.number_constellations+1 #This depends on the debris trackers technically.
def launch_single(self, constellation):
#returns the launch decision for the constellation of interest
filter_tensor = torch.zeros(self.number_constellations)
filter_tensor[constellation] = 1.0
return self.launches @ filter_tensor
def launch_vector(self, constellation):
#returns the launch decision for the constellation of interest as a vector
filter_tensor = torch.zeros(self.number_constellations)
filter_tensor[constellation] = 1.0
return self.launches * filter_tensor
def partial_vector(self, constellation):
#returns the partials of the value function corresponding to the constellation of interest
filter_tensor = torch.zeros(self.number_states)
filter_tensor[constellation] = 1.0
return self.partials @ filter_tensor
def partial_matrix(self, constellation):
#returns the partials of the value function corresponding to
#the constellation of interest as a matrix
filter_tensor = torch.zeros(self.number_states)
filter_tensor[constellation] = 1.0
return self.partials * filter_tensor
def __str__(self):
#just a human readable descriptor
return "Launch Decisions and Partial Derivativs of value function with\n\tlaunches\n\t\t {}\n\tPartials\n\t\t{}".format(self.launches,self.partials)
############## ECONOMIC MODEL ############
#Abstract class describing profit. Each subclass will connect a profit function "style" to a specific instance of parameters.
class EconomicAgent(metaclass=abc.ABCMeta):
@abc.abstractmethod
def period_benefit(self,state,estimand_interface):
pass
@abc.abstractmethod
def _period_benefit(self):
pass
@abc.abstractmethod
def period_benefit_jacobian_wrt_states(self):
pass
@abc.abstractmethod
def _period_benefit_jacobian_wrt_states(self):
pass
@abc.abstractmethod
def period_benefit_jacobian_wrt_launches(self):
pass
@abc.abstractmethod
def _period_benefit_jacobian_wrt_launches(self):
pass
class LinearProfit(EconomicAgent):
"""
The simplest type of profit function available.
"""
def __init__(self, batch_size, constellation_number, discount_factor, benefit_weight, launch_cost, deorbit_cost=0, ):
self.batch_size = batch_size
#track which constellation this is.
self.constellation_number = constellation_number
#get the number of constellations (pull from the benefit weight, in the dimension that counts across constellations)
self.number_of_constellations = benefit_weight.size()[1]
#parameters describing the agent's situation
self.discount_factor = discount_factor
self.benefit_weights = benefit_weight
self.launch_cost = launch_cost
self.deorbit_cost = deorbit_cost
def __str__(self):
return "LinearProfit\n Benefit weights:\t{}\n launch cost:\t{}\n Deorbit cost:\t{}".format(self.benefit_weights, self.launch_cost, self.deorbit_cost)
def period_benefit(self,state,estimand_interface):
return self._period_benefit(state.stocks, state.debris, estimand_interface.choices)
def _period_benefit(self,stocks,debris,launches):
# multiply benefits times stocks
# sum across constellations
# reshape to standard dimensions
# subtract launch costs.
pass
def period_benefit_jacobian_wrt_states(self, states, estimand_interface):
return self._period_benefit_jacobian_wrt_states(states.stocks, states.debris, estimand_interface.launches)
def _period_benefit_jacobian_wrt_states(self, stocks, debris, launches):
jac = jacobian(self._period_benefit, (stocks,debris,launches))
return torch.cat((jac[0], jac[1]))
def period_benefit_jacobian_wrt_launches(self, states, estimand_interface):
return self._period_benefit_jacobian_wrt_launches(states.stocks, states.debris, estimand_interface.launches)
def _period_benefit_jacobian_wrt_launches(self,stocks,debris,launches):
jac = jacobian(self._period_benefit, (stocks,debris,launches))
return jac[2]
#other profit functions to implement
# price competition (substitution)
# military (complementarity)
############### TRANSITION AND OPTIMALITY FUNCTIONS #################
#at some point I should wrap the two functions below into a class that holds various things
class OrbitalModel:
def __init__(self, number_debris_trackers, num_choice_variables):
pass
def single_transition(physical_model, economic_agent, states, estimand_interface):
"""
This function represents the inverted envelope conditions.
It allows us to describe the derivatives of the value function evaluated at time $t+1$ in terms based in time period $t$.
It takes a few derivatives (jacobians), and due to the nature of the return as tuples
they must be concatenated.
It returns the transitioned values
"""
#Transition the partials
#Get the discounted jacobian with respect to states
A = economic_agent.discount_factor * physical_model.transition_jacobian_wrt_states(states, estimand_interface)
f_theta = economic_agent.period_benefit_jacobian_wrt_states(states, estimand_interface)
T = estimand_interface.partial_vector(economic_agent.constellation_number) - f_theta
#need to do testing to nicely handle when A is non-invertible
#using linalg.solve because it has more numerical stability and is faster.
iterated_value_partials = torch.linalg.solve(A,T)
#transition the states
iterated_states = physical_model.transition(states, estimand_interface)
#The goal is to create a set of partials that I can throw into the appropriate
return iterated_value_partials, iterated_states
#Optimality math
def optimality(physical_model, economic_agent, states, estimand_interface):
"""
This takes the given models, states, and the estimand and returns the optimality condition.
"""
fx = economic_agent.period_benefit_jacobian_wrt_launches(states, estimand_interface)
B = physical_model.transition_jacobian_wrt_launches(states, estimand_interface)
iterated_partials, iterated_state = single_transition(physical_model, economic_agent, states, estimand_interface)
return fx + economic_agent.discount_factor * B @ iterated_partials, iterated_partials, iterated_state