Implementing a Satellite (Python)
This tutorial will walk through the implementation of a new satellite, written in Python, step by step.
With this basic satellite in place, the code can be extended with the functionality for specific hardware and needs.
It is recommended to have a peek into the overall concept of satellites
in Constellation in order to get an impression of which functionality of the application could fit into which state of the
finite state machine.
There are also multiple ready-made satellites as well as the example Python satellite Mariner available in the repository, which may serve as a source of inspiration.
See also
This how-to describes the procedure of implementing a new satellite for Constellation in Python. For C++ look here and for the microcontroller implementation, please refer to the MicroSat project.
The basic satellite class structure
The soon-to-be satellite will gain all its basic functionality, such as the ability to receive and react to commands, initiate state changes, and send its logging output and monitoring information via the network, by inheriting from the Satellite class:
from constellation.core.satellite import Satellite, SatelliteArgumentParser
from constellation.core.logging import setup_cli_logging


class TutorialMachine(Satellite):
    """An in-progress Satellite implementation."""

    pass  # NOTE placeholder statement until we start the implementation!


def main(args=None):
    """In-progress tutorial Satellite."""
    # Get a dict of the parsed arguments
    parser = SatelliteArgumentParser(description=main.__doc__)
    args = vars(parser.parse_args(args))

    # Set up logging
    setup_cli_logging(args.pop("level"))

    # Start satellite with remaining args
    s = TutorialMachine(**args)
    s.run_satellite()


if __name__ == "__main__":
    main()
Most code so far deals with argument parsing and start-up, while the actual satellite does not implement any extra
functionality. When saving the above code to a file tutorial.py, this satellite can already be started and operated though.
python3 tutorial.py --help
The above command will list all available parameters. To run the satellite as-is, it can be started with:
python3 tutorial.py -g myconstellation
The satellite can now be controlled by running Controller -g myconstellation
from a different terminal window or by starting the graphical MissionControl controller. More details on operating a
Constellation are provided in the Operator’s Guide.
In its current form, the tutorial satellite does not do much yet. The following sections walk through extending its functionality.
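The start-up pattern used in main() above (parse the arguments into a dict, pop the logging level, forward everything else as keyword arguments) can be tried out with the standard library alone. The following is a minimal sketch under stated assumptions: DemoSatellite and build_satellite are hypothetical stand-ins, and the options shown are illustrative rather than the actual set defined by SatelliteArgumentParser.

```python
import argparse


class DemoSatellite:
    """Hypothetical stand-in for a satellite class; it only stores its keyword arguments."""

    def __init__(self, **kwargs):
        self.kwargs = kwargs


def build_satellite(argv):
    # Illustrative options only; the real SatelliteArgumentParser defines its own set
    parser = argparse.ArgumentParser(description="Demo start-up")
    parser.add_argument("-g", "--group", required=True)
    parser.add_argument("--level", default="info")

    # vars() turns the argparse Namespace into a plain dict
    args = vars(parser.parse_args(argv))

    # The logging level is consumed here and must not reach the constructor
    level = args.pop("level")

    # Everything left over is forwarded as keyword arguments
    return level, DemoSatellite(**args)


level, sat = build_satellite(["-g", "myconstellation"])
print(level, sat.kwargs)
```

Because the level is removed from the dict before the class is instantiated, the constructor only ever sees arguments it is expected to understand.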
Implementing the FSM Transitions
Any satellite will start in the NEW state and remain there until it receives a
command to initiate a state change.
In Constellation, all common actions such as device configuration and hardware
initialization are realized through so-called transitional states which are
entered by a command and exited as soon as their action is complete. The actions
attached to these transitional states are implemented by overriding methods
provided by the Satellite base class.
For a new satellite, the following transitional state actions should all be considered for implementation:
- def do_initializing(self, config: Configuration): use the configuration provided by the config argument to gather and validate all parameters of the satellite and (optionally) establish a connection to the hardware using said parameters.
- def do_launching(self): prepare everything for data taking. This includes all one-time actions that are needed before subsequent measurements can take place.
- def do_starting(self, run_identifier: str): start a measurement using the run_identifier as a label.
- def do_stopping(self): stop the on-going measurement but remain prepared for the next.
- def do_landing(self): power down and e.g. disarm the hardware.
The following transitional state actions are optional:
- def do_interrupting(self): this is the transition to the SAFE state and defaults to do_stopping (if necessary because the current state is RUN), followed by do_landing. If desired, this can be overridden with a custom action.
- def do_reconfigure(self, partial_config: Configuration): this transition occurs from ORBIT (i.e. when your satellite is ready to take data) and, if your satellite and hardware support it, allows changing some or all of the configuration parameters.
For the steady state action for the RUN state, see below.
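To see how the transitional actions listed above chain together, the sequence can be modelled as a small lookup table. This is a simplified sketch, not the library's state machine: the command names and the resulting steady states are assumptions made for illustration, and do_interrupting is left out.

```python
# Simplified model of the transitions described above (not the library's code).
# Command names and resulting states are assumptions for illustration.
TRANSITIONS = {
    # (current state, command): (transitional action, resulting state)
    ("NEW", "initialize"): ("do_initializing", "INIT"),
    ("INIT", "initialize"): ("do_initializing", "INIT"),
    ("ERROR", "initialize"): ("do_initializing", "INIT"),
    ("SAFE", "initialize"): ("do_initializing", "INIT"),
    ("INIT", "launch"): ("do_launching", "ORBIT"),
    ("ORBIT", "reconfigure"): ("do_reconfigure", "ORBIT"),
    ("ORBIT", "start"): ("do_starting", "RUN"),
    ("RUN", "stop"): ("do_stopping", "ORBIT"),
    ("ORBIT", "land"): ("do_landing", "INIT"),
}


def apply(state, command):
    """Return (action, new_state), or raise if the command is not allowed."""
    try:
        return TRANSITIONS[(state, command)]
    except KeyError:
        raise ValueError(f"Command {command!r} not allowed in state {state}") from None


# Walk through one full measurement cycle
state = "NEW"
for command in ["initialize", "launch", "start", "stop", "land"]:
    action, state = apply(state, command)
    print(f"{command:>10}: {action} -> {state}")
```

A table like this also makes it easy to see which action has to cope with being entered from several states, which is the case for do_initializing.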
The following code implements some basic transitions into the tutorial class:
import socket

from constellation.core.configuration import Configuration


class TutorialMachine(Satellite):
    """An in-progress Satellite implementation."""

    def do_initializing(self, config: Configuration) -> str:
        """Establish a connection to the hardware via socket."""
        address = config.get("ip_address", "192.168.1.2")
        port = config.get_int("port", 56789)
        # Close any connection left over from an earlier initialization
        if getattr(self, "socket", None):
            self.socket.close()
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect((address, port))
        return f"Established a connection to {address}:{port}"

    def do_launching(self) -> str:
        # Sockets transmit bytes, so commands are sent as byte strings
        self.socket.sendall(b"PREPARE")
        return "Connection fully prepared"

    def do_starting(self, run_identifier: str) -> None:
        self.socket.sendall(f"START_{run_identifier}".encode())

    def do_stopping(self) -> None:
        self.socket.sendall(b"STOP")

    def do_landing(self) -> None:
        self.socket.sendall(b"POWER_DOWN")
Note that some of the above transitions return a string which summarizes the state change. This string will afterwards be a part of the status of the satellite.
This particular satellite sets up a network socket to send commands via a TCP/IP network connection to a device. In the actual satellite implementation, this might be a USB connection, or maybe the hardware has its own Python library that communicates with it.
Important
The do_initializing routine can be called more than once, as this transition is allowed from the NEW, INIT, ERROR and SAFE states. It should therefore be carefully ensured that e.g. any already open connections are closed before establishing new ones, or that the class keeps track of any steps that only need to be performed once (e.g. loading an FPGA bit stream).
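The advice above can be illustrated without any hardware. In this minimal sketch, DeviceLink and ReinitDemo are hypothetical stand-ins (neither is part of Constellation), and the firmware counter merely represents a one-time step such as loading a bit stream.

```python
class DeviceLink:
    """Hypothetical stand-in for a hardware connection."""

    def __init__(self, address):
        self.address = address
        self.is_open = True

    def close(self):
        self.is_open = False


class ReinitDemo:
    """Mimics only the re-initialization logic, not a real satellite."""

    def __init__(self):
        self.link = None
        self.firmware_loaded = False
        self.firmware_loads = 0

    def do_initializing(self, address):
        # Close a connection left over from an earlier initialization
        if self.link is not None:
            self.link.close()
        self.link = DeviceLink(address)

        # One-time step: skipped when initializing again
        if not self.firmware_loaded:
            self.firmware_loads += 1
            self.firmware_loaded = True
        return f"Connected to {address}"


demo = ReinitDemo()
demo.do_initializing("192.168.1.2")
first_link = demo.link
demo.do_initializing("192.168.1.3")
print(first_link.is_open, demo.link.is_open, demo.firmware_loads)
```

After the second call, the first connection has been closed, a fresh one is open, and the one-time step has run only once.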
Reading Configuration Parameters
Important
Reading information from the satellite configuration is only possible in the do_initializing function.
All parameters the satellite requires should be read and validated in this function; the do_launching function should only be used to apply this configuration to the hardware.
Configuration parameters can be read using the get() method on the configuration object:
address = config.get("ip_address")
A default value can be provided as the second argument:
address = config.get("ip_address", "192.168.1.2")
An explicit return type can be specified with the return_type argument:
address = config.get("ip_address", return_type=str)
Hint
It is also possible to set a method as return type that takes Any and returns the desired type.
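As an illustration of the hint above, a converter for IP addresses might look like the following sketch. The helper to_ip_address is hypothetical, and the way it is handed to get() in the comment assumes that the converter is simply called with the raw configuration value.

```python
import ipaddress
from typing import Any


def to_ip_address(value: Any) -> ipaddress.IPv4Address:
    """Convert a raw configuration value to an IPv4 address.

    Raises ValueError if the value is not a valid address.
    """
    return ipaddress.IPv4Address(str(value))


# Inside do_initializing, the converter could then be passed along the lines of
#   address = config.get("ip_address", return_type=to_ip_address)
# (assumption: the converter is called with the raw value)
print(to_ip_address("192.168.1.2"))
```

Validating in the converter means a malformed address is rejected while the configuration is read, before any connection attempt is made.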
Several other get methods exist for convenience:
- get_num, get_int, get_float: get an int or float with optional limit checks
- get_array: get a list with homogeneous elements
- get_set: get a set with homogeneous elements
- get_path: get a pathlib.Path
To access nested dictionaries, get_section can be used:
# Get configuration section for channels
channels_section = config.get_section("channels")
# Read channel 0 & 1 as nested sections
for n in range(2):
    # Get section for channel n, nested inside the channels section
    channel_section = channels_section.get_section(f"channel_{n}")
    # Note that channel_section has the same methods available as config
    # Get voltage for channel
    voltage = channel_section.get_num("voltage")
See also
More details about configuration sections and their intricacies can be found in Configuration Sections.
It is possible to iterate over sections using get_keys:
# Get section for channels
channels_section = config.get_section("channels")
# Iterate over keys in dictionary
for key in channels_section.get_keys():
    # Keys defined as `chN`
    if not key.startswith("ch"):
        raise InvalidKeyError(channels_section, key, "keys have to start with `ch`")
    channel = int(key[2:])
    # Get section for channel
    channel_section = channels_section.get_section(key)
    enabled = channel_section.get("enabled", False, return_type=bool)
Besides the get methods, the following methods might be helpful when reading the configuration:
- has: checks if a key is present in the configuration
- count: counts how many of the given keys are present in the configuration, useful to check for invalid key combinations
- set_alias: can be used to set an alias for a renamed key
Running and the Stop Event
In the satellite’s RUN state, all actions are performed by the do_run method, which – just as
the transitional state actions above – must be overridden from the Satellite base
class. do_run will be called upon entering the RUN state (and after the
do_starting action has completed). It can run as long as it needs to but is expected to finish as quickly as
possible as soon as the stop_running Event is set. An example run loop is shown
below:
from typing import Any


def do_run(self, payload: Any) -> str:
    # the stop_running Event will be set from outside the thread when it is
    # time to close down.
    while not self.stop_requested():
        # Do work
        ...
    return "Finished acquisition."
Any finalization of the measurement run should be performed in the do_stopping action rather than at the end of the do_run function, if possible.
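The interplay between a run loop and a stop event can be tried out in isolation with the standard library. This is a minimal sketch under stated assumptions: RunLoopDemo is a hypothetical stand-in that owns its own threading.Event, whereas a real satellite gets this machinery from the Satellite base class.

```python
import threading
import time


class RunLoopDemo:
    """Hypothetical stand-in showing a run loop that reacts to a stop event."""

    def __init__(self):
        self.stop_running = threading.Event()
        self.samples = 0

    def do_run(self):
        while not self.stop_running.is_set():
            # Do work (here: just count iterations)
            self.samples += 1
            # wait() doubles as a sleep that ends early once the event is set
            self.stop_running.wait(timeout=0.01)
        return "Finished acquisition."


demo = RunLoopDemo()
result = []
worker = threading.Thread(target=lambda: result.append(demo.do_run()))
worker.start()

time.sleep(0.05)          # let the loop run for a moment
demo.stop_running.set()   # request the stop from outside the thread
worker.join(timeout=1.0)
print(result, worker.is_alive())
```

Waiting on the event instead of calling time.sleep() inside the loop keeps the reaction time short: the loop wakes up as soon as the stop is requested rather than after the full sleep interval.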
Error handling and shutdown
All transition methods are wrapped in a try:/except: clause, which prevents
the satellite from crashing should any unexpected error arise. Instead,
the transition (or run) will be aborted, and the satellite will enter the
ERROR state. In this state, the satellite’s status or its logs are a useful resource to find out the reason for the error.
To allow the satellite to fail “gracefully”, that is, in a controlled fashion, the class can provide a method fail_gracefully that handles e.g. the closure of any resources:
def fail_gracefully(self) -> str:
    """Method called when reaching 'ERROR' state."""
    # Close the connection if one was ever opened
    if getattr(self, "socket", None):
        self.socket.close()
    return "Failed gracefully."
Be aware that the ERROR state could potentially be reached from any other state. The routine should therefore do the best it can to assist a later recovery through initialization.
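The mechanism described in this section can be pictured with a small stand-alone model. It is only a sketch of the idea: ErrorDemo and run_transition are hypothetical names, and the way the real base class wraps transitions and invokes fail_gracefully may differ in detail.

```python
class ErrorDemo:
    """Hypothetical model of a transition wrapper with graceful failure."""

    def __init__(self):
        self.state = "INIT"
        self.status = ""
        self.resource_open = True

    def do_launching(self):
        # Simulate hardware that does not answer
        raise RuntimeError("device did not respond")

    def fail_gracefully(self):
        # Release whatever might still be open
        self.resource_open = False
        return "Failed gracefully."

    def run_transition(self, action, target_state):
        try:
            self.status = action() or ""
            self.state = target_state
        except Exception as exc:
            # Abort the transition and go to ERROR instead of crashing
            self.state = "ERROR"
            self.status = f"{type(exc).__name__}: {exc}"
            self.fail_gracefully()


demo = ErrorDemo()
demo.run_transition(demo.do_launching, "ORBIT")
print(demo.state, demo.status, demo.resource_open)
```

The exception never leaves run_transition; it ends up in the status text, which is why the status and the logs are the first place to look after a failure.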
If a satellite is shut down, for example by receiving a termination signal from
the OS or if you press Ctrl-C in the terminal window where the satellite
runs, the method reentry will be called. In most circumstances, there is no need
to implement anything satellite-specific for this method. If the satellite does require it, however, make sure
that super().reentry() is called as the last step of that routine, so that the
reentry methods of the Satellite base classes are executed as well.
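The call order recommended here can be sketched with a stand-in base class. BaseDemo and MySatelliteDemo are hypothetical and only record which step ran when; they do not reproduce what the real reentry method does.

```python
class BaseDemo:
    """Hypothetical stand-in for the Satellite base class."""

    def __init__(self):
        self.calls = []

    def reentry(self):
        self.calls.append("base shutdown")


class MySatelliteDemo(BaseDemo):
    def reentry(self):
        # Satellite-specific cleanup comes first ...
        self.calls.append("device cleanup")
        # ... and the parent implementation runs as the last step
        super().reentry()


sat = MySatelliteDemo()
sat.reentry()
print(sat.calls)
```

Running the device-specific cleanup first means it still happens while the base class functionality is fully available.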
Troubleshooting
Satellite base functionality not working as expected
If a satellite class has been implemented but does not behave as expected, this could be due to one or more of the following:
- Methods other than the ones described above have been implemented without calling the parent class’ method, e.g. the __init__ method. In those cases, be sure to add a super() call, e.g. super().__init__(*args, **kwargs), and adjust the signature of the implementation to support the same arguments as the parent class’ method.
- The satellite accidentally implemented a method, an attribute or a property that already exists in the parent class, thus shadowing members of the Satellite class and its base classes. Currently, there is no protection against this, and satellite implementations need to be written carefully to avoid this trap.
The following code can be run in a Python console (REPL) to see all members of a Satellite and compare that with the attributes and methods the satellite class uses:
from constellation.core.satellite import Satellite
s = Satellite()
dir(s)
More information about a specific method can be gathered by either looking at the source code or, after having run the above
commands, calling help(s.method_of_interest), where method_of_interest stands for the method in question.
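The manual comparison described above can also be automated. The sketch below uses stand-in classes so that it runs without Constellation installed; shadowed_members is a hypothetical helper, and with the real package the Satellite class would take the place of BaseDemo.

```python
class BaseDemo:
    """Stand-in for the Satellite base class."""

    name = "base"

    def run_satellite(self): ...

    def do_run(self): ...


class MyDemo(BaseDemo):
    name = "oops"  # shadows a base attribute by accident

    def do_run(self): ...  # intended override

    def read_sensor(self): ...  # genuinely new member


def shadowed_members(cls, base, intended=()):
    """Return names defined directly on cls that already exist on base."""
    own = {n for n in vars(cls) if not (n.startswith("__") and n.endswith("__"))}
    return sorted((own & set(dir(base))) - set(intended))


print(shadowed_members(MyDemo, BaseDemo, intended={"do_run"}))
```

Note that this only inspects members defined on the class itself. Attributes assigned on the instance at runtime (for example inside a transition method) are not visible to it and still need to be checked by hand.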