gwproactor#

This packages provides infrastructure for running a proactor on top of asyncio with support multiple MQTT clients and and sub-objects which support their own threads for synchronous operations.

This packages is not GridWorks-aware (except that it links actors with multiple mqtt clients). This separation between communication / action infrastructure and GridWorks semantics is intended to allow the latter to be more focussed.

This package is not polished and the separation is up for debate.

Particular questions:

  • Is the programming model still clean after more concrete actors are implemented and more infrastructure are added.

  • Does the separation add value or just complicate analysis.

  • MQTTClients should be made async.

  • Semantics of building message type namespaces should be spelled out / further worked out.

  • Test support should be implemented / cleaner.

class gwproactor.Actor(name, services)#
Parameters:
property alias#
property node#
class gwproactor.ActorInterface#

Pure interface for a proactor sub-object (an Actor) which can communicate and has a GridWorks ShNode.

abstract property alias: str#
classmethod load(name, actor_class_name, services, module_name)#
Parameters:
  • name (str) –

  • actor_class_name (str) –

  • services (ServicesInterface) –

  • module_name (str) –

Return type:

ActorInterface

abstract property node: ShNode#
class gwproactor.AsyncQueueWriter#

Allow synchronous code to write to an asyncio Queue.

It is assumed the asynchronous reader has access to the asyncio Queue “await get()” from directly from it.

put(item)#

Write to asyncio queue in a threadsafe way.

Parameters:

item (Any) –

Return type:

None

set_async_loop(loop, async_queue)#
Parameters:
  • loop (AbstractEventLoop) –

  • async_queue (Queue) –

Return type:

None

class gwproactor.Communicator(name, services)#

A partial implementation of CommunicatorInterface which supplies the trivial implementations

Parameters:
property monitored_names: Sequence[MonitoredName]#
property name: str#
property services: ServicesInterface#
class gwproactor.CommunicatorInterface#

Pure interface necessary for interaction between a sub-object and the system services proactor

abstract property monitored_names: Sequence[MonitoredName]#
abstract property name: str#
abstract process_message(message)#
Parameters:

message (Message) –

Return type:

Ok[bool] | Err[BaseException]

abstract property services: ServicesInterface#
class gwproactor.ExternalWatchdogCommandBuilder#

Create arguments which will be passed to subprocess.run() to pat the external watchdog.

If the returned list is empty, pat process will be run.

By default an empty list is returned if the environment variable named by sevice_variable_name() is not set to 1 or true.

classmethod default_pat_args(pid=None)#
Parameters:

pid (int | None) –

Return type:

list[str]

classmethod pat_args(service_name, args=None, pid=None)#

Return arguments to be passed to subprocess.run() to pat the external watchdog.

Parameters:
  • service_name (str) –

  • args (list[str] | None) –

  • pid (int | None) –

Return type:

list[str]

classmethod running_as_service(service_name)#
Parameters:

service_name (str) –

Return type:

bool

classmethod service_variable_name(service_name)#
Parameters:

service_name (str) –

Return type:

str

class gwproactor.MQTTClientWrapper(name, client_config, receive_queue)#
Parameters:
  • name (str) –

  • client_config (MQTTClient) –

  • receive_queue (AsyncQueueWriter) –

connected()#
Return type:

bool

disable_logger()#
enable_logger(logger=None)#
Parameters:

logger (Logger | LoggerAdapter | None) –

handle_suback(suback)#
Parameters:

suback (MQTTSubackPayload) –

Return type:

int

num_pending_subscriptions()#
Return type:

int

num_subscriptions()#
Return type:

int

on_connect(_, userdata, flags, rc)#
on_connect_fail(_, userdata)#
on_disconnect(_, userdata, rc)#
on_message(_, userdata, message)#
on_subscribe(_, userdata, mid, granted_qos)#
publish(topic, payload, qos)#
Parameters:
  • topic (str) –

  • payload (bytes) –

  • qos (int) –

Return type:

MQTTMessageInfo

start()#
stop()#
subscribe(topic, qos)#
Parameters:
  • topic (str) –

  • qos (int) –

Return type:

Tuple[int, int | None]

subscribe_all()#
Return type:

Tuple[int, int | None]

subscribed()#
Return type:

bool

subscription_items()#
Return type:

list[Tuple[str, int]]

unsubscribe(topic)#
Parameters:

topic (str) –

Return type:

Tuple[int, int | None]

class gwproactor.MQTTClients#
add_client(name, client_config, upstream=False, primary_peer=False)#
Parameters:
  • name (str) –

  • client_config (MQTTClient) –

  • upstream (bool) –

  • primary_peer (bool) –

client_wrapper(client)#
Parameters:

client (str) –

Return type:

MQTTClientWrapper

clients: Dict[str, MQTTClientWrapper]#
connected(client)#
Parameters:

client (str) –

Return type:

bool

disable_loggers()#
enable_loggers(logger=None)#
Parameters:

logger (Logger | LoggerAdapter | None) –

handle_suback(suback)#
Parameters:

suback (MQTTSubackPayload) –

Return type:

int

num_pending_subscriptions(client)#
Parameters:

client (str) –

Return type:

int

num_subscriptions(client)#
Parameters:

client (str) –

Return type:

int

primary_peer()#
Return type:

MQTTClientWrapper

primary_peer_client: str = ''#
publish(client, topic, payload, qos)#
Parameters:
  • client (str) –

  • topic (str) –

  • payload (bytes) –

  • qos (int) –

Return type:

MQTTMessageInfo

start(loop, async_queue)#
Parameters:
  • loop (AbstractEventLoop) –

  • async_queue (Queue) –

stop()#
subscribe(client, topic, qos)#
Parameters:
  • client (str) –

  • topic (str) –

  • qos (int) –

Return type:

Tuple[int, int | None]

subscribe_all(client)#
Parameters:

client (str) –

Return type:

Tuple[int, int | None]

subscribed(client)#
Parameters:

client (str) –

Return type:

bool

unsubscribe(client, topic)#
Parameters:
  • client (str) –

  • topic (str) –

Return type:

Tuple[int, int | None]

upstream()#
Return type:

MQTTClientWrapper

upstream_client: str = ''#
class gwproactor.MonitoredName(name: str, timeout_seconds: float)#
Parameters:
  • name (str) –

  • timeout_seconds (float) –

name: str#
timeout_seconds: float#
class gwproactor.Proactor(name, settings, hardware_layout=None)#
Parameters:
  • name (str) –

  • settings (ProactorSettings) –

  • hardware_layout (HardwareLayout | None) –

add_communicator(communicator)#
Parameters:

communicator (CommunicatorInterface) –

property async_receive_queue: Queue | None#
property event_loop: AbstractEventLoop | None#
generate_event(event)#
Parameters:

event (EventT) –

Return type:

Ok[bool] | Err[BaseException]

get_communicator(name)#
Parameters:

name (str) –

Return type:

CommunicatorInterface

get_external_watchdog_builder_class()#
Return type:

type[ExternalWatchdogCommandBuilder]

property hardware_layout: HardwareLayout#
property io_loop_manager: IOLoopInterface#
async join()#
property logger: ProactorLogger#
classmethod make_event_persister(settings)#
Parameters:

settings (ProactorSettings) –

Return type:

PersisterInterface

classmethod make_stats()#
Return type:

ProactorStats

property monitored_names: Sequence[MonitoredName]#
property name: str#
property primary_peer_client: str#
async process_message(message)#
Parameters:

message (Message) –

async process_messages()#
property publication_name: str#
async run_forever()#
send(message)#
Parameters:

message (Message) –

send_threadsafe(message)#
Parameters:

message (Message) –

Return type:

None

property services: ServicesInterface#
property settings: ProactorSettings#
start()#
start_tasks()#
property stats: ProactorStats#
stop()#
property upstream_client: str#
class gwproactor.ProactorLogger(base, message_summary, lifecycle, comm_event, extra=None)#
Parameters:
  • base (str) –

  • message_summary (str) –

  • lifecycle (str) –

  • comm_event (str) –

  • extra (dict | None) –

MESSAGE_DELIMITER_WIDTH = 88#
MESSAGE_ENTRY_DELIMITER = '++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++'#
MESSAGE_EXIT_DELIMITER = '----------------------------------------------------------------------------------------'#
comm_event(msg, *args, **kwargs)#
Parameters:

msg (str) –

Return type:

None

property comm_event_enabled: bool#
comm_event_logger: Logger#
property general_enabled: bool#
lifecycle(msg, *args, **kwargs)#
Parameters:

msg (str) –

Return type:

None

property lifecycle_enabled: bool#
lifecycle_logger: Logger#
message_enter(msg, *args, **kwargs)#
Parameters:

msg (str) –

Return type:

None

message_exit(msg, *args, **kwargs)#
Parameters:

msg (str) –

Return type:

None

message_summary(direction, actor_alias, topic, payload_object=None, broker_flag=' ', timestamp=None, message_id='')#
Parameters:
  • direction (str) –

  • actor_alias (str) –

  • topic (str) –

  • payload_object (Any) –

  • broker_flag (str) –

  • timestamp (datetime | None) –

  • message_id (str) –

Return type:

None

property message_summary_enabled: bool#
message_summary_logger: Logger#
path(msg, *args, **kwargs)#
Parameters:

msg (str) –

Return type:

None

property path_enabled: bool#
class gwproactor.ProactorSettings(_env_file='<object object>', _env_file_encoding=None, _env_nested_delimiter=None, _secrets_dir=None, *, paths=None, logging=LoggingSettings(base_log_name='gridworks', base_log_level=30, levels=LoggerLevels(message_summary=30, lifecycle=20, comm_event=20), formatter=FormatterSettings(fmt='%(asctime)s %(message)s', datefmt='', default_msec_format='%s.%03d'), file_handler=RotatingFileHandlerSettings(filename='proactor.log', bytes_per_log_file=2097152, num_log_files=10, level=0)), mqtt_link_poll_seconds=60.0, ack_timeout_seconds=5.0, num_initial_event_reuploads=5)#
Parameters:
  • _env_file (str | PathLike | List[str | PathLike] | Tuple[str | PathLike, ...] | None) –

  • _env_file_encoding (str | None) –

  • _env_nested_delimiter (str | None) –

  • _secrets_dir (str | PathLike | None) –

  • paths (Paths) –

  • logging (LoggingSettings) –

  • mqtt_link_poll_seconds (float) –

  • ack_timeout_seconds (float) –

  • num_initial_event_reuploads (int) –

class Config#
env_nested_delimiter = '__'#
env_prefix = 'PROACTOR_'#
ack_timeout_seconds: float#
classmethod get_paths(v)#
Parameters:

v (Paths) –

Return type:

Paths

logging: LoggingSettings#
num_initial_event_reuploads: int#
paths: Paths#
classmethod post_root_validator(values)#

Update unset paths of any member MQTTClient’s TLS paths based on ProactorSettings ‘paths’ member.

Parameters:

values (dict) –

Return type:

dict

classmethod update_paths_name(values, name)#

Update paths member with a new ‘name’ attribute, e.g., a name known by a derived class.

This is meant to be called in a ‘pre=True’ root validator of a derived class.

Parameters:
  • values (dict) –

  • name (str) –

Return type:

dict

exception gwproactor.Problems(msg='', warnings=None, errors=None, max_problems=10)#
Parameters:
  • msg (str) –

  • warnings (list[BaseException]) –

  • errors (list[BaseException]) –

  • max_problems (int | None) –

MAX_PROBLEMS = 10#
add_error(error)#
Parameters:

error (BaseException) –

Return type:

Problems

add_problems(other)#
Parameters:

other (Problems) –

Return type:

Problems

add_warning(warning)#
Parameters:

warning (BaseException) –

Return type:

Problems

error_traceback_str()#
Return type:

str

errors: list[BaseException]#
max_problems: int | None = 10#
problem_event(summary, src='')#
Parameters:
  • summary (str) –

  • src (str) –

Return type:

ProblemEvent

warnings: list[BaseException]#
class gwproactor.QOS(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
AtLeastOnce = 1#
AtMostOnce = 0#
ExactlyOnce = 2#
class gwproactor.Runnable#

Pure interface to an object which is expected to support starting, stopping and joining.

abstract async join()#
Return type:

None

abstract start()#
Return type:

None

abstract stop()#
Return type:

None

async stop_and_join()#
Return type:

None

class gwproactor.ServicesInterface#

Interface to system services (the proactor)

abstract property async_receive_queue: Queue | None#
abstract property event_loop: AbstractEventLoop | None#
abstract generate_event(event)#
Parameters:

event (EventT) –

Return type:

None

abstract get_communicator(name)#
Parameters:

name (str) –

Return type:

CommunicatorInterface

abstract get_external_watchdog_builder_class()#
Return type:

type[ExternalWatchdogCommandBuilder]

abstract property hardware_layout: HardwareLayout#
abstract property io_loop_manager: IOLoopInterface#
abstract property logger: ProactorLogger#
abstract property publication_name: str#
abstract send(message)#
Parameters:

message (Message) –

Return type:

None

abstract send_threadsafe(message)#
Parameters:

message (Message) –

Return type:

None

abstract property settings: ProactorSettings#
abstract property stats: ProactorStats#
class gwproactor.Subscription(Topic, Qos)#
Parameters:
  • Topic (str) –

  • Qos (QOS) –

Qos: QOS#

Alias for field number 1

Topic: str#

Alias for field number 0

class gwproactor.SyncAsyncInteractionThread(channel=None, name=None, iterate_sleep_seconds=None, responsive_sleep_step_seconds=0.1, pat_timeout=20, daemon=True)#

A thread wrapper providing an async-sync communication channel and simple “iterate, sleep, read message” semantics.

Parameters:
  • channel (SyncAsyncQueueWriter | None) –

  • name (str | None) –

  • iterate_sleep_seconds (float | None) –

  • responsive_sleep_step_seconds (float) –

  • pat_timeout (float | None) –

  • daemon (bool) –

JOIN_CHECK_THREAD_SECONDS = 1.0#
PAT_TIMEOUT = 20#
SLEEP_STEP_SECONDS = 0.1#
async async_join(timeout=None)#
Parameters:

timeout (float) –

Return type:

None

pat_timeout: float | None#
pat_watchdog()#
put_to_sync_queue(message, block=True, timeout=None)#
Parameters:
  • message (Any) –

  • block (bool) –

  • timeout (float | None) –

Return type:

None

request_stop()#
Return type:

None

run()#

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

running: bool | None#
set_async_loop(loop, async_queue)#
Parameters:
  • loop (AbstractEventLoop) –

  • async_queue (Queue) –

Return type:

None

set_async_loop_and_start(loop, async_queue)#
Parameters:
  • loop (AbstractEventLoop) –

  • async_queue (Queue) –

Return type:

None

time_to_pat()#
Return type:

bool

class gwproactor.SyncAsyncQueueWriter(sync_queue=None)#

Provide a full duplex communication “channel” between synchronous and asynchronous code.

It is assumed the asynchronous reader has access to the asyncio Queue “await get()” from directly from it.

Parameters:

sync_queue (Queue | None) –

get_from_sync_queue(block=True, timeout=None)#

Read from synchronous queue in a threadsafe way.

Parameters:
  • block (bool) –

  • timeout (float | None) –

Return type:

Any

put_to_async_queue(item)#

Write to asynchronous queue in a threadsafe way.

Parameters:

item (Any) –

put_to_sync_queue(item, block=True, timeout=None)#

Write to synchronous queue in a threadsafe way.

Parameters:
  • item (Any) –

  • block (bool) –

  • timeout (float | None) –

set_async_loop(loop, async_queue)#
Parameters:
  • loop (AbstractEventLoop) –

  • async_queue (Queue) –

Return type:

None

sync_queue: Queue | None#
class gwproactor.SyncThreadActor(name, services, sync_thread)#
Parameters:
async join()#
property monitored_names: Sequence[MonitoredName]#
process_message(message)#
Parameters:

message (Message) –

Return type:

Ok[bool] | Err[BaseException]

send_driver_message(message)#
Parameters:

message (Any) –

Return type:

None

start()#
stop()#
gwproactor.format_exceptions(exceptions)#
Parameters:

exceptions (list[BaseException]) –

Return type:

str

gwproactor.responsive_sleep(obj, seconds, step_duration=0.1, running_field_name='_main_loop_running', running_field=True)#

Sleep in way that is more responsive to thread termination: sleep in step_duration increments up to specificed seconds, at after each step checking obj._main_loop_running. If the designated running_field_name actually indicates that a stop has been requested (e.g. what you would expect from a field named ‘_stop_requested’), set running_field parameter to False.

Parameters:
  • seconds (float) –

  • step_duration (float) –

  • running_field_name (str) –

  • running_field (bool) –

Return type:

bool

gwproactor.setup_logging(args, settings, errors=None, add_screen_handler=True, root_gets_handlers=True)#

Get python logging config based on parsed command line args, defaults, environment variables and logging config file.

The order of precedence is:

  1. Command line arguments

  2. Environment

  3. Defaults

Parameters:
  • args (Namespace) –

  • settings (ProactorSettings) –

  • errors (list[BaseException] | None) –

  • add_screen_handler (bool) –

  • root_gets_handlers (bool) –

Return type:

None