Source code for slewpy.hub

import simpy
from astropy.time import Time
import astropy.units as u
import logging


class Hub:
    """Central hub for satellite communications.

    On construction the hub registers its listener generators with the
    simulation environment. While the simulation runs it answers pointing
    requests from sensors by delegating to the scheduler, appends newly
    reported targets to the list of known targets, and drains the data
    channel.

    Parameters
    ----------
    env : simpy Environment
        Discrete event environment. ``t0`` (and ``runtime``, when given)
        are also stored on it as ``env.t0`` / ``env.runtime``.
    request_pipe : simpy Pipe
        Channel for sensors to request new targets. Each message is
        unpacked as a 3-tuple ``(name, msg, data)`` where ``name`` is the
        requesting sensor's name.
    data_pipe : simpy Pipe
        Channel to receive observations from sensors. Each message is
        unpacked as a 3-tuple ``(name, msg, data)``.
    sensor_list : list of Sensor objects
        Set of sensors to communicate with. Each sensor must expose a
        ``name`` attribute; the hub sets ``t0`` and ``targets`` on it.
    obs_targets : list of ObsTarget
        List of currently known targets. The list is mutated in place when
        new targets arrive on ``target_pipe``.
    target_pipe : simpy Pipe, optional
        Channel on which new targets are announced as 3-tuples
        ``(name, msg, target)``. When omitted, no listener is started for
        new targets.
    scheduler : Scheduler, optional
        Object that determines how targets are assigned to sensors. It is
        used through ``scheduler.manager.assign_pointings(...)``, so it is
        required in practice as soon as a sensor sends a request.
    t0 : astropy Time
        Time at which the simulation starts. ``env.now`` is interpreted as
        seconds elapsed since ``t0``.
    runtime : optional
        When truthy, stored on the environment as ``env.runtime``.
        NOTE(review): units/meaning are not visible here -- confirm
        against the code that reads ``env.runtime``.

    Attributes
    ----------
    targets : list
        Starts empty; the same list object is shared with every sensor via
        ``sensor.targets``. The original documentation describes it as
        holding the true orbit information for all targets.
    sensor_list : dict
        Maps ``sensor.name`` to the sensor object.
    """

    def __init__(
        self,
        env,
        request_pipe,
        data_pipe,
        sensor_list,
        obs_targets,
        target_pipe=None,
        scheduler=None,
        t0=Time("J2000"),
        runtime=None,
    ):
        self.env = env
        self.request_pipe = request_pipe
        self.data_pipe = data_pipe
        self.target_pipe = target_pipe
        self.targets = []
        self.obs_targets = obs_targets
        self.t0 = t0
        # Expose the start time (and optional runtime) on the shared
        # environment so other simulation components can read them.
        self.env.t0 = t0
        if runtime:
            self.env.runtime = runtime
        self.scheduler = scheduler

        # Register the listener generators with the environment.
        self.env.process(self.listen_request())
        self.env.process(self.recieve_target_info())
        if self.target_pipe:
            self.env.process(self.listen_new_target())

        # Index sensors by name so incoming requests can be routed, and
        # hand each one the shared start time and target list.
        self.sensor_list = {}
        for sensor in sensor_list:
            self.sensor_list[sensor.name] = sensor
            sensor.t0 = t0
            sensor.targets = self.targets

    def send_target(self, sensor, pointings, obs_targets):
        """Send sensor a set of pointings and current set of observed targets.

        Parameters
        ----------
        sensor : Sensor
            Recipient; the message is put on ``sensor.recieve_pipe``.
        pointings
            Pointings assigned to the sensor.
        obs_targets
            Current set of observed targets to pass along.
        """
        # Messages are delivered without any additional delay.
        delay = 0
        sensor.recieve_pipe.put(("Sending target ", pointings, obs_targets), delay)

    def listen_new_target(self):
        """Listen for newly announced targets and add them to ``obs_targets``.

        Generator intended to be run as a simulation process. Each message
        received on ``target_pipe`` is unpacked as ``(name, msg, target)``;
        only ``target`` is used.
        """
        while True:
            _name, _msg, target = yield self.target_pipe.get()
            logging.info("New target added at %s", self.env.now)
            self.obs_targets.append(target)

    def listen_request(self):
        """Listen for sensor to request a new set of pointings.

        Generator intended to be run as a simulation process. For every
        request received on ``request_pipe`` the scheduler's manager is
        asked to assign pointings for the requesting sensor at the current
        simulation time, and the result is sent back to that sensor.
        """
        while True:
            name, msg, _data = yield self.request_pipe.get()
            logging.info(
                "Received request at %s from sensor %s: %s", self.env.now, name, msg
            )
            sensor = self.sensor_list[name]
            # env.now is treated as seconds elapsed since the start time.
            current_time = self.t0 + self.env.now * u.s
            pointings, obs_targets = self.scheduler.manager.assign_pointings(
                self.obs_targets, sensor, current_time
            )
            self.send_target(sensor, pointings, obs_targets)

    def recieve_target_info(self):
        """Listen for sensor to send updated track information.

        Generator intended to be run as a simulation process. Messages on
        ``data_pipe`` are unpacked as ``(name, msg, data)`` and currently
        discarded; this keeps the channel drained.
        """
        while True:
            _name, _msg, _data = yield self.data_pipe.get()