CHP Module#

SPDX-FileCopyrightText: 2024 DESY and the Constellation authors
SPDX-License-Identifier: EUPL-1.2

core.chp.CHPDecodeMessage(msg: list[bytes]) Tuple[str, Timestamp, int, CHPMessageFlags, int, str | None]#

Decode a CHP binary message.

Returns host, timestamp, state, message flags, interval and status (if available).

class core.chp.CHPMessageFlags(*values)#

Bases: IntFlag

Defines the message flags of CHP messages.

DENY_DEPARTURE = 1#
IS_EXTRASYSTOLE = 128#
MARK_DEGRADED = 4#
NONE = 0#
TRIGGER_INTERRUPT = 2#
class core.chp.CHPRole(*values)#

Bases: Enum

Defines the role of the satellite.

DYNAMIC = 3#
ESSENTIAL = 4#
NONE = 1#
TRANSIENT = 2#
flags() CHPMessageFlags#
classmethod from_flags(flags: CHPMessageFlags) CHPRole#
role_requires(flags: CHPMessageFlags) bool#
class core.chp.CHPTransmitter(name: str, socket: Socket)#

Bases: object

Send and receive via the Constellation Heartbeat Protocol (CHP).

close() None#

Close the socket of the transmitter.

parse_subscriptions() int#
recv(flags: int = <Flag.DONTWAIT: 1>) Tuple[str, Timestamp, int, CHPMessageFlags, int, str | None] | Tuple[None, None, None, None, None]#

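Receive a heartbeat via CHP.

Returns the decoded heartbeat fields (host, timestamp, state, message flags, interval and status), or a tuple of None values if no heartbeat was received. With the default flags (DONTWAIT) the call does not block.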

send(state: int, interval: int, msgflags: CHPMessageFlags, status: str | None = None, flags: int = 0) None#

Send state, interval, message flags and an optional status via CHP.