CDTP Module#

SPDX-FileCopyrightText: 2024 DESY and the Constellation authors
SPDX-License-Identifier: EUPL-1.2

Module implementing the Constellation Data Transmission Protocol.

class core.cdtp.DataReceiver(context: Context, logger: ConstellationLogger, receive_bor_cb: Callable[[str, dict[str, Any], dict[str, Any]], None], receive_data_cb: Callable[[str, DataRecord], None], receive_eor_cb: Callable[[str, dict[str, Any], dict[str, Any]], None], data_transmitters: set[str] | None)#

Bases: object

Base class for receiving CDTP messages via ZMQ.

add_sender(service: DiscoveredService) None#
property bytes_received: int#
check_exception() None#
property data_transmitters: set[str] | None#
property eor_timeout: int#

The EOR receiving timeout value (in s).

remove_sender(service: DiscoveredService) None#
property running: bool#
start_receiving() None#
stop_receiving() None#
class core.cdtp.DataTransmitter(name: str, socket: Socket, logger: ConstellationLogger, failure_cb: Callable[[str], None])#

Bases: object

Base class for sending CDTP messages via ZMQ.

property bor_timeout: int#

The BOR sending timeout value (in s).

property bytes_transmitted: int#
can_send_record() bool#

Check if a data record can be sent immediately

check_exception() None#
property data_timeout: int#

The DATA sending timeout value (in s).

property eor_timeout: int#

The EOR sending timeout value (in s).

new_data_record(tags: dict[str, Any] | None = None) DataRecord#

Return a new data record for sending

property payload_threshold: int#
property queue_size: int#
property records_transmitted: int#
send_bor(user_tags: dict[str, Any], configuration: dict[str, Any], flags: int = 0) None#
send_data_record(data_record: DataRecord) None#

Queue a data record for sending

send_eor(user_tags: dict[str, Any], run_metadata: dict[str, Any], flags: int = 0) None#
property sequence_number: int#
start_sending() None#
property state: TransmitterState#
stop_sending() None#
exception core.cdtp.InvalidCDTPMessageType(type: Type, reason: str)#

Bases: RuntimeError

class core.cdtp.PullThread(drc: DataReceiver)#

Bases: Thread

Receiving thread for CDTP

run() None#

Thread method pulling data messages

class core.cdtp.PushThread(dtm: DataTransmitter)#

Bases: Thread

Sending thread for CDTP

run() None#

Thread method pushing data messages

exception core.cdtp.RecvTimeoutError(what: str, timeout: int)#

Bases: RuntimeError

class core.cdtp.RunCondition(*values)#

Bases: IntFlag

Defines the condition flags of run data as transmitted via CDTP.

ABORTED = 8#
DEGRADED = 16#
GOOD = 0#
INCOMPLETE = 2#
INTERRUPTED = 4#
TAINTED = 1#
exception core.cdtp.SendTimeoutError(what: str, timeout: int)#

Bases: RuntimeError

class core.cdtp.TransmitterState(*values)#

Bases: IntEnum

BOR_RECEIVED = 2#
EOR_RECEIVED = 3#
NOT_CONNECTED = 1#
class core.cdtp.TransmitterStateSeq(state: 'TransmitterState', seq: 'int', missed: 'int')#

Bases: object

missed: int#
seq: int#
state: TransmitterState#