CDTP2 module#

SPDX-FileCopyrightText: 2025 DESY and the Constellation authors SPDX-License-Identifier: EUPL-1.2

Provides the message class for CDTP2

class core.message.cdtp2.CDTP2BORMessage(sender: str, user_tags: dict[str, Any], configuration: dict[str, Any])#

Bases: CDTP2Message

Message class for CDTP2 BOR messages

static cast(msg: CDTP2Message) CDTP2BORMessage#
property configuration: dict[str, Any]#
property user_tags: dict[str, Any]#
class core.message.cdtp2.CDTP2EORMessage(sender: str, user_tags: dict[str, Any], run_metadata: dict[str, Any])#

Bases: CDTP2Message

Message class for CDTP2 EOR messages

static cast(msg: CDTP2Message) CDTP2EORMessage#
property run_metadata: dict[str, Any]#
property user_tags: dict[str, Any]#
class core.message.cdtp2.CDTP2Message(sender: str, type: Type)#

Bases: object

Message class for CDTP2

class Type(*values)#

Bases: IntEnum

Enum describing the type of CDTP2 message

BOR = 1#

End-of-run message

DATA = 0#

Begin-of-run message

EOR = 2#
add_data_record(data_record: DataRecord) None#
assemble() MultipartMessage#
clear_data_records() None#
count_payload_bytes() int#
property data_records: list[DataRecord]#
static disassemble(frames: list[bytes]) CDTP2Message#
property sender: str#
property type: Type#
class core.message.cdtp2.DataRecord(sequence_number: int, tags: dict[str, Any] | None = None)#

Bases: object

Data record containing a tags and block with binary data

add_block(data: bytes) None#

Add a block of data to the data record

property blocks: list[bytes]#
count_payload_bytes() int#
pack(stream: BytesIO, packer: Packer) None#
property sequence_number: int#
property tags: dict[str, Any]#
static unpack(array: list[Any]) DataRecord#