Pools#

constellation::pools Namespace#

template<typename MESSAGE, chirp::ServiceIdentifier SERVICE, zmq::socket_type SOCKET_TYPE>
class BasePool#
#include <constellation/core/pools/BasePool.hpp>

Abstract Base pool class.

This class registers a CHIRP callback for the services defined via the template parameter, listens to incoming messages and forwards them to a callback registered upon creation of the socket.

Template Parameters:
  • MESSAGE – Constellation Message class to be decoded

  • SERVICE – CHIRP service to connect to

  • SOCKET_TYPE – ZeroMQ socket type of connection

Public Functions

BasePool(std::string_view log_topic, std::function<void(MESSAGE&&)> callback)#

Construct BasePool.

Parameters:
  • log_topic – Logger topic to be used for this component

  • callback – Callback function pointer for received messages

virtual ~BasePool() = default#

Destruct BasePool.

Warning

stop_pool() has to be called before the pool can be safely destructed

void checkPoolException()#

Check if pool thread has thrown an exception.

Throws:

Exception – thrown by pool thread, if any

void startPool()#

Start the pool thread and send the CHIRP requests.

void stopPool()#

Stop the pool thread.

inline std::size_t pollerEvents()#

Return number of events returned by poller_.wait()

inline std::size_t countSockets()#

Return the number of currently connected sockets.

Protected Functions

virtual bool should_connect(const chirp::DiscoveredService &service)#

Method to select which services to connect to. By default this pool connects to all discovered services, derived pools may implement selection criteria.

Parameters:

service – The discovered service

Returns:

Boolean indicating whether a connection should be opened

virtual void host_connected(const chirp::DiscoveredService &service)#

Method for derived classes to act on newly connected hosts.

Parameters:

service – Service of the newly connected host

virtual void host_disconnected(const chirp::DiscoveredService &service)#

Method for derived classes to act on sockets before disconnecting.

Parameters:

service – Service of the disconnected host

virtual void host_disposed(const chirp::DiscoveredService &service)#

Method for derived classes to act on sockets that are removed because their endpoint is dead.

Parameters:

service – Service of the disconnected host

inline std::map<chirp::DiscoveredService, zmq::socket_t> &get_sockets()#

Return all connected sockets.

Warning

Read access to the sockets needs to be protected with sockets_mutex_

Returns:

Maps that maps the discovered service to the corresponding ZeroMQ sockets

Protected Attributes

std::mutex sockets_mutex_#
log::Logger pool_logger_#
template<typename MESSAGE, chirp::ServiceIdentifier SERVICE>
class SubscriberPool : public constellation::pools::BasePool<MESSAGE, SERVICE, zmq::socket_type::sub>#
#include <constellation/core/pools/SubscriberPool.hpp>

Abstract Subscriber pool class

This class registers a CHIRP callback for the services defined via the template parameter, listens to incoming messages and forwards them to a callback registered upon creation of the subscriber socket.

Warning

Duplicate subscriptions also require duplicate unsubscriptions, this class does not contain any logic to to track subscription states.

Public Types

using BasePoolT = BasePool<MESSAGE, SERVICE, zmq::socket_type::sub>#

Public Functions

SubscriberPool(std::string_view log_topic, std::function<void(MESSAGE&&)> callback)#

Construct SubscriberPool.

Parameters:
  • log_topic – Logger topic to be used for this component

  • callback – Callback function pointer for received messages

void subscribe(std::string_view host, const std::string &topic)#

Subscribe to a given topic of a specific host.

Parameters:
  • host – Canonical name of the host to subscribe to

  • topic – Topic to subscribe to

void subscribe(message::MD5Hash host_id, const std::string &topic)#

Subscribe to a given topic of a specific host.

Parameters:
  • host_id – MD5 hash of canonical name of the host to subscribe to

  • topic – Topic to subscribe to

void subscribe(const std::string &topic)#

Subscribe to a given topic for all connected hosts.

Parameters:

topic – Topic to subscribe to

void unsubscribe(std::string_view host, const std::string &topic)#

Unsubscribe from a given topic of a specific host.

Parameters:
  • host – Canonical name of the host to unsubscribe from

  • topic – Topic to unsubscribe

void unsubscribe(message::MD5Hash host_id, const std::string &topic)#

Unsubscribe from a given topic of a specific host.

Parameters:
  • host_id – MD5 hash of canonical name of the host to unsubscribe from

  • topic – Topic to unsubscribe

void unsubscribe(const std::string &topic)#

Unsubscribe from a given topic for all hosts.

Parameters:

topic – Topic to unsubscribe