gwproactor

This packages provides infrastructure for running a proactor on top of asyncio with support multiple MQTT clients and and sub-objects which support their own threads for synchronous operations.

This packages is not GridWorks-aware (except that it links actors with multiple mqtt clients). This separation between communication / action infrastructure and GridWorks semantics is intended to allow the latter to be more focussed.

This package is not polished and the separation is up for debate.

Particular questions:

  • Is the programming model still clean after more concrete actors are implemented and more infrastructure are added.

  • Does the separation add value or just complicate analysis.

  • MQTTClients should be made async.

  • Semantics of building message type namespaces should be spelled out / further worked out.

  • Test support should be implemented / cleaner.

class gwproactor.Actor(name, services)
Parameters:
property alias
init()

Called after constructor so derived functions can be used in setup.

Return type:

None

property node
class gwproactor.ActorInterface

Pure interface for a proactor sub-object (an Actor) which can communicate and has a GridWorks ShNode.

abstract property alias: str
abstract init()

Called after constructor so derived functions can be used in setup.

Return type:

None

classmethod load(name, actor_class_name, services, module_name)
Parameters:
Return type:

ActorInterface

abstract property node: ShNode
class gwproactor.AsyncQueueWriter

Allow synchronous code to write to an asyncio Queue.

It is assumed the asynchronous reader has access to the asyncio Queue “await get()” from directly from it.

put(item)

Write to asyncio queue in a threadsafe way.

Parameters:

item (Any)

Return type:

None

set_async_loop(loop, async_queue)
Parameters:
  • loop (AbstractEventLoop)

  • async_queue (Queue)

Return type:

None

class gwproactor.Communicator(name, services)

A partial implementation of CommunicatorInterface which supplies the trivial implementations

Parameters:
property monitored_names: Sequence[MonitoredName]
property name: str
property services: ServicesInterface
class gwproactor.CommunicatorInterface

Pure interface necessary for interaction between a sub-object and the system services proactor

abstract property monitored_names: Sequence[MonitoredName]
abstract property name: str
abstract process_message(message)
Parameters:

message (Message)

Return type:

Ok[bool] | Err[BaseException]

abstract property services: ServicesInterface
class gwproactor.ExternalWatchdogCommandBuilder

Create arguments which will be passed to subprocess.run() to pat the external watchdog.

If the returned list is empty, pat process will be run.

By default an empty list is returned if the environment variable named by sevice_variable_name() is not set to 1 or true.

classmethod default_pat_args(pid=None)
Parameters:

pid (int | None)

Return type:

list[str]

classmethod pat_args(service_name, args=None, pid=None)

Return arguments to be passed to subprocess.run() to pat the external watchdog.

Parameters:
  • service_name (str)

  • args (list[str] | None)

  • pid (int | None)

Return type:

list[str]

classmethod running_as_service(service_name)
Parameters:

service_name (str)

Return type:

bool

classmethod service_variable_name(service_name)
Parameters:

service_name (str)

Return type:

str

class gwproactor.MQTTClientWrapper(name, client_config, receive_queue)
Parameters:
connected()
Return type:

bool

disable_logger()
enable_logger(logger=None)
Parameters:

logger (Logger | LoggerAdapter | None)

handle_suback(suback)
Parameters:

suback (MQTTSubackPayload)

Return type:

int

num_pending_subscriptions()
Return type:

int

num_subscriptions()
Return type:

int

on_connect(_, userdata, flags, rc)
on_connect_fail(_, userdata)
on_disconnect(_, userdata, rc)
on_message(_, userdata, message)
on_subscribe(_, userdata, mid, granted_qos)
publish(topic, payload, qos)
Parameters:
  • topic (str)

  • payload (bytes)

  • qos (int)

Return type:

MQTTMessageInfo

start()
stop()
subscribe(topic, qos)
Parameters:
  • topic (str)

  • qos (int)

Return type:

Tuple[int, int | None]

subscribe_all()
Return type:

Tuple[int, int | None]

subscribed()
Return type:

bool

subscription_items()
Return type:

list[Tuple[str, int]]

unsubscribe(topic)
Parameters:

topic (str)

Return type:

Tuple[int, int | None]

class gwproactor.MQTTClients
add_client(name, client_config, upstream=False, primary_peer=False)
Parameters:
  • name (str)

  • client_config (MQTTClient)

  • upstream (bool)

  • primary_peer (bool)

client_wrapper(client)
Parameters:

client (str)

Return type:

MQTTClientWrapper

clients: Dict[str, MQTTClientWrapper]
connected(client)
Parameters:

client (str)

Return type:

bool

disable_loggers()
enable_loggers(logger=None)
Parameters:

logger (Logger | LoggerAdapter | None)

handle_suback(suback)
Parameters:

suback (MQTTSubackPayload)

Return type:

int

num_pending_subscriptions(client)
Parameters:

client (str)

Return type:

int

num_subscriptions(client)
Parameters:

client (str)

Return type:

int

primary_peer()
Return type:

MQTTClientWrapper

primary_peer_client: str = ''
publish(client, topic, payload, qos)
Parameters:
  • client (str)

  • topic (str)

  • payload (bytes)

  • qos (int)

Return type:

MQTTMessageInfo

start(loop, async_queue)
Parameters:
  • loop (AbstractEventLoop)

  • async_queue (Queue)

stop()
subscribe(client, topic, qos)
Parameters:
  • client (str)

  • topic (str)

  • qos (int)

Return type:

Tuple[int, int | None]

subscribe_all(client)
Parameters:

client (str)

Return type:

Tuple[int, int | None]

subscribed(client)
Parameters:

client (str)

Return type:

bool

unsubscribe(client, topic)
Parameters:
  • client (str)

  • topic (str)

Return type:

Tuple[int, int | None]

upstream()
Return type:

MQTTClientWrapper

upstream_client: str = ''
class gwproactor.MonitoredName(name: str, timeout_seconds: float)
Parameters:
  • name (str)

  • timeout_seconds (float)

name: str
timeout_seconds: float
class gwproactor.Proactor(name, settings, hardware_layout=None)
Parameters:
  • name (str)

  • settings (ProactorSettings)

  • hardware_layout (HardwareLayout | None)

add_communicator(communicator)
Parameters:

communicator (CommunicatorInterface)

add_web_route(server_name, method, path, handler, **kwargs)

Adds configuration for web server route which will be available after start() is called.

May be called even if associated web server is not configured, in which case this route will simply be ignored.

Not thread safe.

Parameters:
  • server_name (str)

  • method (str)

  • path (str)

  • handler (Callable[[Request], Awaitable[StreamResponse]])

  • kwargs (Any)

add_web_server_config(name, host, port, **kwargs)

Adds configuration for web server which will be started when start() is called.

Not thread safe.

Parameters:
  • name (str)

  • host (str)

  • port (int)

  • kwargs (Any)

Return type:

None

property async_receive_queue: Queue | None
property event_loop: AbstractEventLoop | None
generate_event(event)
Parameters:

event (EventT)

Return type:

Ok[bool] | Err[BaseException]

get_communicator(name)
Parameters:

name (str)

Return type:

CommunicatorInterface | None

get_communicator_as_type(name, type_)
Parameters:
  • name (str)

  • type_ (Type[T])

Return type:

T | None

get_external_watchdog_builder_class()
Return type:

type[ExternalWatchdogCommandBuilder]

property hardware_layout: HardwareLayout
property io_loop_manager: IOLoopInterface
async join()
property logger: ProactorLogger
classmethod make_event_persister(settings)
Parameters:

settings (ProactorSettings)

Return type:

PersisterInterface

classmethod make_stats()
Return type:

ProactorStats

property monitored_names: Sequence[MonitoredName]
property name: str
property primary_peer_client: str
async process_message(message)
Parameters:

message (Message)

async process_messages()
property publication_name: str
async run_forever()
send(message)
Parameters:

message (Message)

send_threadsafe(message)
Parameters:

message (Message)

Return type:

None

property services: ServicesInterface
property settings: ProactorSettings
start()
start_tasks()
property stats: ProactorStats
stop()
property upstream_client: str
class gwproactor.ProactorLogger(base, message_summary, lifecycle, comm_event, extra=None)
Parameters:
  • base (str)

  • message_summary (str)

  • lifecycle (str)

  • comm_event (str)

  • extra (dict | None)

MESSAGE_DELIMITER_WIDTH = 88
MESSAGE_ENTRY_DELIMITER = '++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++'
MESSAGE_EXIT_DELIMITER = '----------------------------------------------------------------------------------------'
comm_event(msg, *args, **kwargs)
Parameters:

msg (str)

Return type:

None

property comm_event_enabled: bool
comm_event_logger: Logger
property general_enabled: bool
lifecycle(msg, *args, **kwargs)
Parameters:

msg (str)

Return type:

None

property lifecycle_enabled: bool
lifecycle_logger: Logger
message_enter(msg, *args, **kwargs)
Parameters:

msg (str)

Return type:

None

message_exit(msg, *args, **kwargs)
Parameters:

msg (str)

Return type:

None

message_summary(direction, actor_alias, topic, payload_object=None, broker_flag=' ', timestamp=None, message_id='')
Parameters:
  • direction (str)

  • actor_alias (str)

  • topic (str)

  • payload_object (Any)

  • broker_flag (str)

  • timestamp (datetime | None)

  • message_id (str)

Return type:

None

property message_summary_enabled: bool
message_summary_logger: Logger
path(msg, *args, **kwargs)
Parameters:

msg (str)

Return type:

None

property path_enabled: bool
class gwproactor.ProactorSettings(_env_file='<object object>', _env_file_encoding=None, _env_nested_delimiter=None, _secrets_dir=None, *, paths=None, logging=LoggingSettings(base_log_name='gridworks', base_log_level=30, levels=LoggerLevels(message_summary=30, lifecycle=20, comm_event=20), formatter=FormatterSettings(fmt='%(asctime)s %(message)s', datefmt='', default_msec_format='%s.%03d'), file_handler=RotatingFileHandlerSettings(filename='proactor.log', bytes_per_log_file=2097152, num_log_files=10, level=0)), mqtt_link_poll_seconds=60.0, ack_timeout_seconds=5.0, num_initial_event_reuploads=5)
Parameters:
  • _env_file (str | PathLike | List[str | PathLike] | Tuple[str | PathLike, ...] | None)

  • _env_file_encoding (str | None)

  • _env_nested_delimiter (str | None)

  • _secrets_dir (str | PathLike | None)

  • paths (Paths)

  • logging (LoggingSettings)

  • mqtt_link_poll_seconds (float)

  • ack_timeout_seconds (float)

  • num_initial_event_reuploads (int)

class Config
env_nested_delimiter = '__'
env_prefix = 'PROACTOR_'
ack_timeout_seconds: float
classmethod get_paths(v)
Parameters:

v (Paths)

Return type:

Paths

logging: LoggingSettings
num_initial_event_reuploads: int
paths: Paths
classmethod post_root_validator(values)

Update unset paths of any member MQTTClient’s TLS paths based on ProactorSettings ‘paths’ member.

Parameters:

values (dict)

Return type:

dict

classmethod update_paths_name(values, name)

Update paths member with a new ‘name’ attribute, e.g., a name known by a derived class.

This is meant to be called in a ‘pre=True’ root validator of a derived class.

Parameters:
  • values (dict)

  • name (str)

Return type:

dict

exception gwproactor.Problems(msg='', warnings=None, errors=None, max_problems=10)
Parameters:
  • msg (str)

  • warnings (list[BaseException])

  • errors (list[BaseException])

  • max_problems (int | None)

MAX_PROBLEMS = 10
add_error(error)
Parameters:

error (BaseException)

Return type:

Problems

add_problems(other)
Parameters:

other (Problems)

Return type:

Problems

add_warning(warning)
Parameters:

warning (BaseException)

Return type:

Problems

error_traceback_str()
Return type:

str

errors: list[BaseException]
max_problems: int | None = 10
problem_event(summary, src='')
Parameters:
  • summary (str)

  • src (str)

Return type:

ProblemEvent

warnings: list[BaseException]
class gwproactor.QOS(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)
AtLeastOnce = 1
AtMostOnce = 0
ExactlyOnce = 2
class gwproactor.Runnable

Pure interface to an object which is expected to support starting, stopping and joining.

abstract async join()
Return type:

None

abstract start()
Return type:

None

abstract stop()
Return type:

None

async stop_and_join()
Return type:

None

class gwproactor.ServicesInterface

Interface to system services (the proactor)

abstract add_web_route(server_name, method, path, handler, **kwargs)

Adds configuration for web server route which will be available after start() is called.

May be called even if associated web server is not configured, in which case this route will simply be ignored.

Not thread safe.

Parameters:
  • server_name (str)

  • method (str)

  • path (str)

  • handler (Callable[[Request], Awaitable[StreamResponse]])

  • kwargs (Any)

abstract add_web_server_config(name, host, port, **kwargs)

Adds configuration for web server which will be started when start() is called.

Not thread safe.

Parameters:
  • name (str)

  • host (str)

  • port (int)

  • kwargs (Any)

Return type:

None

abstract property async_receive_queue: Queue | None
abstract property event_loop: AbstractEventLoop | None
abstract generate_event(event)
Parameters:

event (EventT)

Return type:

None

abstract get_communicator(name)
Parameters:

name (str)

Return type:

CommunicatorInterface | None

abstract get_communicator_as_type(name, type_)
Parameters:
  • name (str)

  • type_ (Type[T])

Return type:

T | None

abstract get_external_watchdog_builder_class()
Return type:

type[ExternalWatchdogCommandBuilder]

abstract property hardware_layout: HardwareLayout
abstract property io_loop_manager: IOLoopInterface
abstract property logger: ProactorLogger
abstract property publication_name: str
abstract send(message)
Parameters:

message (Message)

Return type:

None

abstract send_threadsafe(message)
Parameters:

message (Message)

Return type:

None

abstract property settings: ProactorSettings
abstract property stats: ProactorStats
class gwproactor.Subscription(Topic, Qos)
Parameters:
  • Topic (str)

  • Qos (QOS)

Qos: QOS

Alias for field number 1

Topic: str

Alias for field number 0

class gwproactor.SyncAsyncInteractionThread(channel=None, name=None, iterate_sleep_seconds=None, responsive_sleep_step_seconds=0.1, pat_timeout=20, daemon=True)

A thread wrapper providing an async-sync communication channel and simple “iterate, sleep, read message” semantics.

Parameters:
  • channel (SyncAsyncQueueWriter | None)

  • name (str | None)

  • iterate_sleep_seconds (float | None)

  • responsive_sleep_step_seconds (float)

  • pat_timeout (float | None)

  • daemon (bool)

JOIN_CHECK_THREAD_SECONDS = 1.0
PAT_TIMEOUT = 20
SLEEP_STEP_SECONDS = 0.1
async async_join(timeout=None)
Parameters:

timeout (float)

Return type:

None

pat_timeout: float | None
pat_watchdog()
put_to_sync_queue(message, block=True, timeout=None)
Parameters:
  • message (Any)

  • block (bool)

  • timeout (float | None)

Return type:

None

request_stop()
Return type:

None

run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

running: bool | None
set_async_loop(loop, async_queue)
Parameters:
  • loop (AbstractEventLoop)

  • async_queue (Queue)

Return type:

None

set_async_loop_and_start(loop, async_queue)
Parameters:
  • loop (AbstractEventLoop)

  • async_queue (Queue)

Return type:

None

time_to_pat()
Return type:

bool

class gwproactor.SyncAsyncQueueWriter(sync_queue=None)

Provide a full duplex communication “channel” between synchronous and asynchronous code.

It is assumed the asynchronous reader has access to the asyncio Queue “await get()” from directly from it.

Parameters:

sync_queue (Queue | None)

get_from_sync_queue(block=True, timeout=None)

Read from synchronous queue in a threadsafe way.

Parameters:
  • block (bool)

  • timeout (float | None)

Return type:

Any

put_to_async_queue(item)

Write to asynchronous queue in a threadsafe way.

Parameters:

item (Any)

put_to_sync_queue(item, block=True, timeout=None)

Write to synchronous queue in a threadsafe way.

Parameters:
  • item (Any)

  • block (bool)

  • timeout (float | None)

set_async_loop(loop, async_queue)
Parameters:
  • loop (AbstractEventLoop)

  • async_queue (Queue)

Return type:

None

sync_queue: Queue | None
class gwproactor.SyncThreadActor(name, services, sync_thread)
Parameters:
async join()
property monitored_names: Sequence[MonitoredName]
process_message(message)
Parameters:

message (Message)

Return type:

Ok[bool] | Err[BaseException]

send_driver_message(message)
Parameters:

message (Any)

Return type:

None

start()
stop()
gwproactor.format_exceptions(exceptions)
Parameters:

exceptions (list[BaseException])

Return type:

str

gwproactor.responsive_sleep(obj, seconds, step_duration=0.1, running_field_name='_main_loop_running', running_field=True)

Sleep in way that is more responsive to thread termination: sleep in step_duration increments up to specificed seconds, at after each step checking obj._main_loop_running. If the designated running_field_name actually indicates that a stop has been requested (e.g. what you would expect from a field named ‘_stop_requested’), set running_field parameter to False.

Parameters:
  • seconds (float)

  • step_duration (float)

  • running_field_name (str)

  • running_field (bool)

Return type:

bool

gwproactor.setup_logging(args, settings, errors=None, add_screen_handler=True, root_gets_handlers=True)

Get python logging config based on parsed command line args, defaults, environment variables and logging config file.

The order of precedence is:

  1. Command line arguments

  2. Environment

  3. Defaults

Parameters:
  • args (Namespace)

  • settings (ProactorSettings)

  • errors (list[BaseException] | None)

  • add_screen_handler (bool)

  • root_gets_handlers (bool)

Return type:

None