CDTP Module#
SPDX-FileCopyrightText: 2024 DESY and the Constellation authors SPDX-License-Identifier: EUPL-1.2
Module implementing the Constellation Data Transmission Protocol.
- class core.cdtp.DataReceiver(context: Context, logger: ConstellationLogger, receive_bor_cb: Callable[[str, dict[str, Any], dict[str, Any]], None], receive_data_cb: Callable[[str, DataRecord], None], receive_eor_cb: Callable[[str, dict[str, Any], dict[str, Any]], None], data_transmitters: set[str] | None)#
Bases:
objectBase class for receiving CDTP messages via ZMQ.
- add_sender(service: DiscoveredService) None#
Prepare for pulling data from a discovered sender.
- property data_transmitters: set[str] | None#
Access available set of data transmitter’s canonical names.
- remove_sender(service: DiscoveredService) None#
Close connection to a sender.
- class core.cdtp.DataTransmitter(name: str, socket: Socket, logger: ConstellationLogger, failure_cb: Callable[[str], None])#
Bases:
objectBase class for sending CDTP messages via ZMQ.
- property payload_threshold: int#
The current threshold (in [KiB]) at which current payloads will be transmitted.
- send_bor(user_tags: dict[str, Any], configuration: dict[str, Any], flags: int = 0) None#
Send a beginning-of-run message.
- send_data_record(data_record: DataRecord) None#
Queue a data record for sending
- send_eor(user_tags: dict[str, Any], run_metadata: dict[str, Any], flags: int = 0) None#
Send an end-of-run message.
- property state: TransmitterState#
- exception core.cdtp.InvalidCDTPMessageType(msg_type: Type, reason: str)#
Bases:
RuntimeErrorException thrown when an invalid CTDP message type is encountered.
- class core.cdtp.PushThread(dtm: DataTransmitter)#
Bases:
ThreadSending thread for CDTP
- exception core.cdtp.RecvTimeoutError(what: str, timeout: int)#
Bases:
RuntimeErrorException thrown when a timeout occurs during a receive operation.
- class core.cdtp.RunCondition(*values)#
Bases:
IntFlagDefines the condition flags of run data as transmitted via CDTP.
- ABORTED = 8#
- DEGRADED = 16#
- GOOD = 0#
- INCOMPLETE = 2#
- INTERRUPTED = 4#
- TAINTED = 1#
- exception core.cdtp.SendTimeoutError(what: str, timeout: int)#
Bases:
RuntimeErrorException thrown when a timeout occurs during a send operation.