CDTP Module#

SPDX-FileCopyrightText: 2024 DESY and the Constellation authors SPDX-License-Identifier: EUPL-1.2

Module implementing the Constellation Data Transmission Protocol.

class core.cdtp.DataReceiver(context: Context, logger: ConstellationLogger, receive_bor_cb: Callable[[str, dict[str, Any], dict[str, Any]], None], receive_data_cb: Callable[[str, DataRecord], None], receive_eor_cb: Callable[[str, dict[str, Any], dict[str, Any]], None], data_transmitters: set[str] | None)#

Bases: object

Base class for receiving CDTP messages via ZMQ.

add_sender(service: DiscoveredService) None#

Prepare for pulling data from a discovered sender.

property bytes_received: int#

The number of bytes received so far.

check_exception() None#

Raise any exception encountered in pull thread.

property data_transmitters: set[str] | None#

Access available set of data transmitter’s canonical names.

property eor_timeout: int#

The EOR receiving timeout value (in s).

remove_sender(service: DiscoveredService) None#

Close connection to a sender.

property running: bool#

Whether or not puller thread is running.

start_receiving() None#

Prepare and start a thread pulling data.

stop_receiving() None#

Stop the thread pulling data.

class core.cdtp.DataTransmitter(name: str, socket: Socket, logger: ConstellationLogger, failure_cb: Callable[[str], None])#

Bases: object

Base class for sending CDTP messages via ZMQ.

property bor_timeout: int#

The BOR sending timeout value (in s).

property bytes_transmitted: int#
can_send_record() bool#

Check if a data record can be send immediately

check_exception() None#

Raise any exception encountered in push thread.

property data_timeout: int#

The DATA sending timeout value (in s).

property eor_timeout: int#

The EOR sending timeout value (in s).

new_data_record(tags: dict[str, Any] | None = None) DataRecord#

Return new data record for sending

property payload_threshold: int#

The current threshold (in [KiB]) at which current payloads will be transmitted.

property queue_size: int#

The maximum size of the data record queue.

property records_transmitted: int#
send_bor(user_tags: dict[str, Any], configuration: dict[str, Any], flags: int = 0) None#

Send a beginning-of-run message.

send_data_record(data_record: DataRecord) None#

Queue a data record for sending

send_eor(user_tags: dict[str, Any], run_metadata: dict[str, Any], flags: int = 0) None#

Send an end-of-run message.

property sequence_number: int#
start_sending() None#

Start the data push thread.

property state: TransmitterState#
stop_sending() None#

Stop data push thread.

exception core.cdtp.InvalidCDTPMessageType(msg_type: Type, reason: str)#

Bases: RuntimeError

Exception thrown when an invalid CTDP message type is encountered.

class core.cdtp.PushThread(dtm: DataTransmitter)#

Bases: Thread

Sending thread for CDTP

exception core.cdtp.RecvTimeoutError(what: str, timeout: int)#

Bases: RuntimeError

Exception thrown when a timeout occurs during a receive operation.

class core.cdtp.RunCondition(*values)#

Bases: IntFlag

Defines the condition flags of run data as transmitted via CDTP.

ABORTED = 8#
DEGRADED = 16#
GOOD = 0#
INCOMPLETE = 2#
INTERRUPTED = 4#
TAINTED = 1#
exception core.cdtp.SendTimeoutError(what: str, timeout: int)#

Bases: RuntimeError

Exception thrown when a timeout occurs during a send operation.

class core.cdtp.TransmitterState(*values)#

Bases: IntEnum

Possible states of a transmitter with respect to having received BOR/EOR.

BOR_RECEIVED = 2#
EOR_RECEIVED = 3#
NOT_CONNECTED = 1#
class core.cdtp.TransmitterStateSeq(state: TransmitterState, seq: int, missed: int)#

Bases: object

Class recording state, current sequence number and missed messages of a Transmitter.

missed: int#
seq: int#
state: TransmitterState#