Monitoring Module#

SPDX-FileCopyrightText: 2024 DESY and the Constellation authors SPDX-License-Identifier: EUPL-1.2

class core.monitoring.FileMonitoringListener(name: str, group: str, interface: list[str], output_path: str)#

Bases: MonitoringListener

metric_callback(metric: Metric) None#

Metric callback.

class core.monitoring.MonitoringListener(*args, **kwargs)#

Bases: StatListener

Simple monitor class to receive logs and metrics from a Constellation.

reentry() None#

Orderly destroy the satellite.

class core.monitoring.MonitoringSender(**kwds: Any)#

Bases: BaseSatelliteFrame

Sender mixin class for Constellation Monitoring Distribution Protocol.

Any method of inheriting classes that has the @schedule_metric decorator, will be regularly polled for new values and a corresponding Metric be sent on the monitoring port.

reset_scheduled_metrics() None#

Reset all previously scheduled metrics.

Will only schedule metrics provided via decorator.

schedule_metric(name: str, unit: str, handling: MetricsType, interval: float, callback: Callable[[...], Any]) None#

Schedule a callback at regular intervals.

The callable needs to return a value [any] and a unit [str] and take no arguments. If you have a callable that requires arguments, consider using functools.partial to fill in the necessary information at scheduling time.

send_metric(metric: Metric) None#

Send a single metric via ZMQ.

class core.monitoring.StatListener(name: str, group: str, interface: list[str], mon_port: int | None = None, **kwds: Any)#

Bases: CHIRPBroadcaster

Simple listener class to receive metrics from a Constellation.

metric_callback(metric: Metric) None#

Metric callback.

reentry() None#

Orderly destroy the satellite.

run_listener() None#
class core.monitoring.ZeroMQSocketLogListener(transmitter: CMDPTransmitter, /, *handlers: Any, **kwargs: Any)#

Bases: QueueListener

This listener receives messages from a CMDPTransmitter.

NOTE that the corresponding socket should only subscribe to LOG messages!

dequeue(block: bool) LogRecord#

Dequeue a record and return it, optionally blocking.

The base implementation uses get. You may want to override this method if you want to use timeouts or work with custom queue implementations.

enqueue_sentinel() None#

This is used to enqueue the sentinel record.

The base implementation uses put_nowait. You may want to override this method if you want to use timeouts or work with custom queue implementations.

stop() None#

Close socket and stop thread.

core.monitoring.get_scheduled_metrics(cls: object) dict[str, dict[str, Any]]#

Loop over all class methods and return those marked as metric.

core.monitoring.main(args: Any = None) None#

Start a simple log listener service.

core.monitoring.schedule_metric(unit: str, handling: MetricsType, interval: float) Callable[[Callable[[P], Any]], Callable[[P], Metric]]#

Schedule a function for callback at interval [s] and send Metric.

The function should take no arguments and return a value [any]