gwproactor#
This package provides infrastructure for running a proactor on top of asyncio, with support for multiple MQTT clients and for sub-objects that run their own threads for synchronous operations.
This package is not GridWorks-aware (except that it links actors with multiple MQTT clients). This separation between communication / action infrastructure and GridWorks semantics is intended to allow the latter to be more focused.
This package is not polished and the separation is up for debate.
Particular questions:
Is the programming model still clean after more concrete actors are implemented and more infrastructure is added?
Does the separation add value or just complicate analysis?
MQTTClients should be made async.
Semantics of building message type namespaces should be spelled out / further worked out.
Test support should be implemented / cleaner.
- class gwproactor.Actor(name, services)#
- Parameters:
name (str) –
services (ServicesInterface) –
- property alias#
- property node#
- class gwproactor.ActorInterface#
Pure interface for a proactor sub-object (an Actor) which can communicate and has a GridWorks ShNode.
- abstract property alias: str#
- classmethod load(name, actor_class_name, services, module_name)#
- Parameters:
name (str) –
actor_class_name (str) –
services (ServicesInterface) –
module_name (str) –
- Return type:
- abstract property node: ShNode#
- class gwproactor.AsyncQueueWriter#
Allow synchronous code to write to an asyncio Queue.
It is assumed the asynchronous reader has access to the asyncio Queue and calls “await get()” on it directly.
- put(item)#
Write to asyncio queue in a threadsafe way.
- Parameters:
item (Any) –
- Return type:
None
- set_async_loop(loop, async_queue)#
- Parameters:
loop (AbstractEventLoop) –
async_queue (Queue) –
- Return type:
None
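The thread-safe write described above can be sketched with the standard library alone. This is a minimal illustration, not the gwproactor implementation: QueueWriterSketch and demo_writer are hypothetical names, and the use of call_soon_threadsafe() is an assumption about how such a writer would typically be built.

```python
import asyncio
import threading
from typing import Any, Optional


class QueueWriterSketch:
    """Hypothetical stand-in for AsyncQueueWriter (not the library's code)."""

    def __init__(self) -> None:
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._async_queue: Optional[asyncio.Queue] = None

    def set_async_loop(
        self, loop: asyncio.AbstractEventLoop, async_queue: asyncio.Queue
    ) -> None:
        self._loop = loop
        self._async_queue = async_queue

    def put(self, item: Any) -> None:
        if self._loop is None or self._async_queue is None:
            raise ValueError("set_async_loop() must be called before put()")
        # asyncio.Queue is not thread-safe, so ask the event loop to run
        # put_nowait() on its own thread.
        self._loop.call_soon_threadsafe(self._async_queue.put_nowait, item)


async def demo_writer() -> list:
    async_queue: asyncio.Queue = asyncio.Queue()
    writer = QueueWriterSketch()
    writer.set_async_loop(asyncio.get_running_loop(), async_queue)

    # A plain thread plays the role of the synchronous producer.
    def produce() -> None:
        for i in range(3):
            writer.put(i)

    producer = threading.Thread(target=produce)
    producer.start()
    # The asynchronous reader awaits get() on the queue directly.
    received = [await async_queue.get() for _ in range(3)]
    producer.join()
    return received
```

Running `asyncio.run(demo_writer())` collects the three items written by the producer thread.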
- class gwproactor.Communicator(name, services)#
A partial implementation of CommunicatorInterface which supplies the trivial implementations
- Parameters:
name (str) –
services (ServicesInterface) –
- property monitored_names: Sequence[MonitoredName]#
- property name: str#
- property services: ServicesInterface#
- class gwproactor.CommunicatorInterface#
Pure interface necessary for interaction between a sub-object and the system services proactor
- abstract property monitored_names: Sequence[MonitoredName]#
- abstract property name: str#
- abstract process_message(message)#
- Parameters:
message (Message) –
- Return type:
Ok[bool] | Err[BaseException]
- abstract property services: ServicesInterface#
- class gwproactor.ExternalWatchdogCommandBuilder#
Create arguments which will be passed to subprocess.run() to pat the external watchdog.
If the returned list is empty, no pat process will be run.
By default an empty list is returned if the environment variable named by service_variable_name() is not set to 1 or true.
- classmethod default_pat_args(pid=None)#
- Parameters:
pid (int | None) –
- Return type:
list[str]
- classmethod pat_args(service_name, args=None, pid=None)#
Return arguments to be passed to subprocess.run() to pat the external watchdog.
- Parameters:
service_name (str) –
args (list[str] | None) –
pid (int | None) –
- Return type:
list[str]
- classmethod running_as_service(service_name)#
- Parameters:
service_name (str) –
- Return type:
bool
- classmethod service_variable_name(service_name)#
- Parameters:
service_name (str) –
- Return type:
str
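The environment-variable gate described for this class might look roughly like the sketch below. Everything specific here is an assumption made for illustration: the variable naming scheme, the case-insensitive comparison, and the default systemd-notify command are not taken from the documentation above, and the functions are free-standing rather than classmethods.

```python
from __future__ import annotations

import os
from typing import Optional


def service_variable_name(service_name: str) -> str:
    # Hypothetical naming scheme; the real scheme is not documented here.
    return f"{service_name.upper()}_RUNNING_AS_SERVICE"


def running_as_service(service_name: str) -> bool:
    value = os.getenv(service_variable_name(service_name), "")
    # Assumption: "1" or "true" (any case) enables watchdog patting.
    return value.strip().lower() in ("1", "true")


def pat_args(
    service_name: str,
    args: Optional[list[str]] = None,
    pid: Optional[int] = None,
) -> list[str]:
    """Return arguments for subprocess.run(), or [] when not run as a service."""
    if not running_as_service(service_name):
        return []
    if args is not None:
        return list(args)
    # Assumed default command: notify systemd's watchdog for the given process.
    pid = os.getpid() if pid is None else pid
    return ["systemd-notify", f"--pid={pid}", "WATCHDOG=1"]
```

A caller would check the returned list and invoke subprocess.run() only when it is non-empty.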
- class gwproactor.MQTTClientWrapper(name, client_config, receive_queue)#
- Parameters:
name (str) –
client_config (MQTTClient) –
receive_queue (AsyncQueueWriter) –
- connected()#
- Return type:
bool
- disable_logger()#
- enable_logger(logger=None)#
- Parameters:
logger (Logger | LoggerAdapter | None) –
- handle_suback(suback)#
- Parameters:
suback (MQTTSubackPayload) –
- Return type:
int
- num_pending_subscriptions()#
- Return type:
int
- num_subscriptions()#
- Return type:
int
- on_connect(_, userdata, flags, rc)#
- on_connect_fail(_, userdata)#
- on_disconnect(_, userdata, rc)#
- on_message(_, userdata, message)#
- on_subscribe(_, userdata, mid, granted_qos)#
- publish(topic, payload, qos)#
- Parameters:
topic (str) –
payload (bytes) –
qos (int) –
- Return type:
MQTTMessageInfo
- start()#
- stop()#
- subscribe(topic, qos)#
- Parameters:
topic (str) –
qos (int) –
- Return type:
Tuple[int, int | None]
- subscribe_all()#
- Return type:
Tuple[int, int | None]
- subscribed()#
- Return type:
bool
- subscription_items()#
- Return type:
list[Tuple[str, int]]
- unsubscribe(topic)#
- Parameters:
topic (str) –
- Return type:
Tuple[int, int | None]
- class gwproactor.MQTTClients#
- add_client(name, client_config, upstream=False, primary_peer=False)#
- Parameters:
name (str) –
client_config (MQTTClient) –
upstream (bool) –
primary_peer (bool) –
- client_wrapper(client)#
- Parameters:
client (str) –
- Return type:
- clients: Dict[str, MQTTClientWrapper]#
- connected(client)#
- Parameters:
client (str) –
- Return type:
bool
- disable_loggers()#
- enable_loggers(logger=None)#
- Parameters:
logger (Logger | LoggerAdapter | None) –
- handle_suback(suback)#
- Parameters:
suback (MQTTSubackPayload) –
- Return type:
int
- num_pending_subscriptions(client)#
- Parameters:
client (str) –
- Return type:
int
- num_subscriptions(client)#
- Parameters:
client (str) –
- Return type:
int
- primary_peer()#
- Return type:
- primary_peer_client: str = ''#
- publish(client, topic, payload, qos)#
- Parameters:
client (str) –
topic (str) –
payload (bytes) –
qos (int) –
- Return type:
MQTTMessageInfo
- start(loop, async_queue)#
- Parameters:
loop (AbstractEventLoop) –
async_queue (Queue) –
- stop()#
- subscribe(client, topic, qos)#
- Parameters:
client (str) –
topic (str) –
qos (int) –
- Return type:
Tuple[int, int | None]
- subscribe_all(client)#
- Parameters:
client (str) –
- Return type:
Tuple[int, int | None]
- subscribed(client)#
- Parameters:
client (str) –
- Return type:
bool
- unsubscribe(client, topic)#
- Parameters:
client (str) –
topic (str) –
- Return type:
Tuple[int, int | None]
- upstream()#
- Return type:
- upstream_client: str = ''#
- class gwproactor.MonitoredName(name: str, timeout_seconds: float)#
- Parameters:
name (str) –
timeout_seconds (float) –
- name: str#
- timeout_seconds: float#
- class gwproactor.Proactor(name, settings, hardware_layout=None)#
- Parameters:
name (str) –
settings (ProactorSettings) –
hardware_layout (HardwareLayout | None) –
- add_communicator(communicator)#
- Parameters:
communicator (CommunicatorInterface) –
- property async_receive_queue: Queue | None#
- property event_loop: AbstractEventLoop | None#
- generate_event(event)#
- Parameters:
event (EventT) –
- Return type:
Ok[bool] | Err[BaseException]
- get_communicator(name)#
- Parameters:
name (str) –
- Return type:
- get_external_watchdog_builder_class()#
- Return type:
- property hardware_layout: HardwareLayout#
- property io_loop_manager: IOLoopInterface#
- async join()#
- property logger: ProactorLogger#
- classmethod make_event_persister(settings)#
- Parameters:
settings (ProactorSettings) –
- Return type:
PersisterInterface
- classmethod make_stats()#
- Return type:
ProactorStats
- property monitored_names: Sequence[MonitoredName]#
- property name: str#
- property primary_peer_client: str#
- async process_message(message)#
- Parameters:
message (Message) –
- async process_messages()#
- property publication_name: str#
- async run_forever()#
- send(message)#
- Parameters:
message (Message) –
- send_threadsafe(message)#
- Parameters:
message (Message) –
- Return type:
None
- property services: ServicesInterface#
- property settings: ProactorSettings#
- start()#
- start_tasks()#
- property stats: ProactorStats#
- stop()#
- property upstream_client: str#
- class gwproactor.ProactorLogger(base, message_summary, lifecycle, comm_event, extra=None)#
- Parameters:
base (str) –
message_summary (str) –
lifecycle (str) –
comm_event (str) –
extra (dict | None) –
- MESSAGE_DELIMITER_WIDTH = 88#
- MESSAGE_ENTRY_DELIMITER = '++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++'#
- MESSAGE_EXIT_DELIMITER = '----------------------------------------------------------------------------------------'#
- comm_event(msg, *args, **kwargs)#
- Parameters:
msg (str) –
- Return type:
None
- property comm_event_enabled: bool#
- comm_event_logger: Logger#
- property general_enabled: bool#
- lifecycle(msg, *args, **kwargs)#
- Parameters:
msg (str) –
- Return type:
None
- property lifecycle_enabled: bool#
- lifecycle_logger: Logger#
- message_enter(msg, *args, **kwargs)#
- Parameters:
msg (str) –
- Return type:
None
- message_exit(msg, *args, **kwargs)#
- Parameters:
msg (str) –
- Return type:
None
- message_summary(direction, actor_alias, topic, payload_object=None, broker_flag=' ', timestamp=None, message_id='')#
- Parameters:
direction (str) –
actor_alias (str) –
topic (str) –
payload_object (Any) –
broker_flag (str) –
timestamp (datetime | None) –
message_id (str) –
- Return type:
None
- property message_summary_enabled: bool#
- message_summary_logger: Logger#
- path(msg, *args, **kwargs)#
- Parameters:
msg (str) –
- Return type:
None
- property path_enabled: bool#
- class gwproactor.ProactorSettings(_env_file='<object object>', _env_file_encoding=None, _env_nested_delimiter=None, _secrets_dir=None, *, paths=None, logging=LoggingSettings(base_log_name='gridworks', base_log_level=30, levels=LoggerLevels(message_summary=30, lifecycle=20, comm_event=20), formatter=FormatterSettings(fmt='%(asctime)s %(message)s', datefmt='', default_msec_format='%s.%03d'), file_handler=RotatingFileHandlerSettings(filename='proactor.log', bytes_per_log_file=2097152, num_log_files=10, level=0)), mqtt_link_poll_seconds=60.0, ack_timeout_seconds=5.0, num_initial_event_reuploads=5)#
- Parameters:
_env_file (str | PathLike | List[str | PathLike] | Tuple[str | PathLike, ...] | None) –
_env_file_encoding (str | None) –
_env_nested_delimiter (str | None) –
_secrets_dir (str | PathLike | None) –
paths (Paths) –
logging (LoggingSettings) –
mqtt_link_poll_seconds (float) –
ack_timeout_seconds (float) –
num_initial_event_reuploads (int) –
- ack_timeout_seconds: float#
- classmethod get_paths(v)#
- Parameters:
v (Paths) –
- Return type:
Paths
- logging: LoggingSettings#
- mqtt_link_poll_seconds: float#
- num_initial_event_reuploads: int#
- paths: Paths#
- classmethod post_root_validator(values)#
Update unset paths of any member MQTTClient’s TLS paths based on ProactorSettings ‘paths’ member.
- Parameters:
values (dict) –
- Return type:
dict
- classmethod update_paths_name(values, name)#
Update paths member with a new ‘name’ attribute, e.g., a name known by a derived class.
This is meant to be called in a ‘pre=True’ root validator of a derived class.
- Parameters:
values (dict) –
name (str) –
- Return type:
dict
- exception gwproactor.Problems(msg='', warnings=None, errors=None, max_problems=10)#
- Parameters:
msg (str) –
warnings (list[BaseException]) –
errors (list[BaseException]) –
max_problems (int | None) –
- MAX_PROBLEMS = 10#
- error_traceback_str()#
- Return type:
str
- errors: list[BaseException]#
- max_problems: int | None = 10#
- problem_event(summary, src='')#
- Parameters:
summary (str) –
src (str) –
- Return type:
ProblemEvent
- warnings: list[BaseException]#
- class gwproactor.QOS(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
- AtLeastOnce = 1#
- AtMostOnce = 0#
- ExactlyOnce = 2#
- class gwproactor.Runnable#
Pure interface to an object which is expected to support starting, stopping and joining.
- abstract async join()#
- Return type:
None
- abstract start()#
- Return type:
None
- abstract stop()#
- Return type:
None
- async stop_and_join()#
- Return type:
None
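A start / stop / join interface of this shape can be sketched as an abstract base class. RunnableSketch and Ticker are hypothetical names, and the body of stop_and_join() (call stop(), then await join()) is an assumption based on the method name, not on the library source.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Optional


class RunnableSketch(ABC):
    """Hypothetical stand-in for the Runnable interface."""

    @abstractmethod
    def start(self) -> None: ...

    @abstractmethod
    def stop(self) -> None: ...

    @abstractmethod
    async def join(self) -> None: ...

    async def stop_and_join(self) -> None:
        # Assumed behavior: request a stop, then wait for completion.
        self.stop()
        await self.join()


class Ticker(RunnableSketch):
    """Toy implementation that counts loop iterations until stopped."""

    def __init__(self) -> None:
        self.ticks = 0
        self._stop_requested = False
        self._task: Optional[asyncio.Task] = None

    def start(self) -> None:
        self._task = asyncio.get_running_loop().create_task(self._run())

    def stop(self) -> None:
        self._stop_requested = True

    async def join(self) -> None:
        if self._task is not None:
            await self._task

    async def _run(self) -> None:
        while not self._stop_requested:
            self.ticks += 1
            await asyncio.sleep(0.01)


async def demo_runnable() -> int:
    ticker = Ticker()
    ticker.start()
    await asyncio.sleep(0.05)
    await ticker.stop_and_join()
    return ticker.ticks
```

Keeping stop() synchronous and join() asynchronous lets a stop be requested from any code path while the wait for completion stays on the event loop.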
- class gwproactor.ServicesInterface#
Interface to system services (the proactor)
- abstract property async_receive_queue: Queue | None#
- abstract property event_loop: AbstractEventLoop | None#
- abstract generate_event(event)#
- Parameters:
event (EventT) –
- Return type:
None
- abstract get_communicator(name)#
- Parameters:
name (str) –
- Return type:
- abstract get_external_watchdog_builder_class()#
- Return type:
- abstract property hardware_layout: HardwareLayout#
- abstract property io_loop_manager: IOLoopInterface#
- abstract property logger: ProactorLogger#
- abstract property publication_name: str#
- abstract send(message)#
- Parameters:
message (Message) –
- Return type:
None
- abstract send_threadsafe(message)#
- Parameters:
message (Message) –
- Return type:
None
- abstract property settings: ProactorSettings#
- abstract property stats: ProactorStats#
- class gwproactor.Subscription(Topic, Qos)#
- Parameters:
Topic (str) –
Qos (QOS) –
- Topic: str#
Alias for field number 0
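The "Alias for field number 0" note suggests Subscription is a named tuple pairing a topic with a QOS value. The sketch below mirrors that shape with hypothetical stand-in types; the plain Enum base and the example topic string are assumptions.

```python
from enum import Enum
from typing import NamedTuple


class QosSketch(Enum):
    """Stand-in for QOS, using the three values listed in this reference."""

    AtMostOnce = 0
    AtLeastOnce = 1
    ExactlyOnce = 2


class SubscriptionSketch(NamedTuple):
    """Stand-in for Subscription: field 0 is Topic, field 1 is Qos."""

    Topic: str
    Qos: QosSketch


# Hypothetical topic, chosen only for illustration.
sub = SubscriptionSketch("gw/demo/status", QosSketch.AtLeastOnce)

# Named tuples support both attribute access and positional unpacking.
topic, qos = sub
```

Here `sub.Topic` and `sub[0]` refer to the same field, which is what the "field number 0" alias describes.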
- class gwproactor.SyncAsyncInteractionThread(channel=None, name=None, iterate_sleep_seconds=None, responsive_sleep_step_seconds=0.1, pat_timeout=20, daemon=True)#
A thread wrapper providing an async-sync communication channel and simple “iterate, sleep, read message” semantics.
- Parameters:
channel (SyncAsyncQueueWriter | None) –
name (str | None) –
iterate_sleep_seconds (float | None) –
responsive_sleep_step_seconds (float) –
pat_timeout (float | None) –
daemon (bool) –
- JOIN_CHECK_THREAD_SECONDS = 1.0#
- PAT_TIMEOUT = 20#
- SLEEP_STEP_SECONDS = 0.1#
- async async_join(timeout=None)#
- Parameters:
timeout (float) –
- Return type:
None
- pat_timeout: float | None#
- pat_watchdog()#
- put_to_sync_queue(message, block=True, timeout=None)#
- Parameters:
message (Any) –
block (bool) –
timeout (float | None) –
- Return type:
None
- request_stop()#
- Return type:
None
- run()#
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- running: bool | None#
- set_async_loop(loop, async_queue)#
- Parameters:
loop (AbstractEventLoop) –
async_queue (Queue) –
- Return type:
None
- set_async_loop_and_start(loop, async_queue)#
- Parameters:
loop (AbstractEventLoop) –
async_queue (Queue) –
- Return type:
None
- time_to_pat()#
- Return type:
bool
- class gwproactor.SyncAsyncQueueWriter(sync_queue=None)#
Provide a full duplex communication “channel” between synchronous and asynchronous code.
It is assumed the asynchronous reader has access to the asyncio Queue and calls “await get()” on it directly.
- Parameters:
sync_queue (Queue | None) –
- get_from_sync_queue(block=True, timeout=None)#
Read from synchronous queue in a threadsafe way.
- Parameters:
block (bool) –
timeout (float | None) –
- Return type:
Any
- put_to_async_queue(item)#
Write to asynchronous queue in a threadsafe way.
- Parameters:
item (Any) –
- put_to_sync_queue(item, block=True, timeout=None)#
Write to synchronous queue in a threadsafe way.
- Parameters:
item (Any) –
block (bool) –
timeout (float | None) –
- set_async_loop(loop, async_queue)#
- Parameters:
loop (AbstractEventLoop) –
async_queue (Queue) –
- Return type:
None
- sync_queue: Queue | None#
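A round trip over such a full duplex channel could look like the following sketch. DuplexChannelSketch, worker, and demo_duplex are hypothetical names; creating a queue.Queue when sync_queue is None is a choice made for this example and may differ from the library's behavior.

```python
import asyncio
import queue
import threading
from typing import Any, Optional


class DuplexChannelSketch:
    """Hypothetical stand-in for SyncAsyncQueueWriter."""

    def __init__(self, sync_queue: Optional[queue.Queue] = None) -> None:
        # Assumption for this sketch: always have a synchronous queue.
        self.sync_queue = sync_queue if sync_queue is not None else queue.Queue()
        self._loop: Optional[asyncio.AbstractEventLoop] = None
        self._async_queue: Optional[asyncio.Queue] = None

    def set_async_loop(
        self, loop: asyncio.AbstractEventLoop, async_queue: asyncio.Queue
    ) -> None:
        self._loop = loop
        self._async_queue = async_queue

    def put_to_sync_queue(self, item: Any, block: bool = True,
                          timeout: Optional[float] = None) -> None:
        # queue.Queue is thread-safe, so async code may call this directly.
        self.sync_queue.put(item, block=block, timeout=timeout)

    def get_from_sync_queue(self, block: bool = True,
                            timeout: Optional[float] = None) -> Any:
        return self.sync_queue.get(block=block, timeout=timeout)

    def put_to_async_queue(self, item: Any) -> None:
        if self._loop is None or self._async_queue is None:
            raise ValueError("set_async_loop() must be called first")
        # Hand the item to the event loop thread.
        self._loop.call_soon_threadsafe(self._async_queue.put_nowait, item)


def worker(channel: DuplexChannelSketch) -> None:
    # Synchronous side: read one request, reply with its double.
    request = channel.get_from_sync_queue(timeout=5)
    channel.put_to_async_queue(request * 2)


async def demo_duplex() -> int:
    async_queue: asyncio.Queue = asyncio.Queue()
    channel = DuplexChannelSketch()
    channel.set_async_loop(asyncio.get_running_loop(), async_queue)
    thread = threading.Thread(target=worker, args=(channel,), daemon=True)
    thread.start()
    channel.put_to_sync_queue(21)  # async -> sync direction
    reply = await asyncio.wait_for(async_queue.get(), timeout=5)  # sync -> async
    thread.join()
    return reply
```

The two directions use different primitives because only queue.Queue is safe to touch from any thread; the asyncio queue is only ever modified on the loop thread.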
- class gwproactor.SyncThreadActor(name, services, sync_thread)#
- Parameters:
name (str) –
services (ServicesInterface) –
sync_thread (SyncAsyncInteractionThread) –
- async join()#
- property monitored_names: Sequence[MonitoredName]#
- process_message(message)#
- Parameters:
message (Message) –
- Return type:
Ok[bool] | Err[BaseException]
- send_driver_message(message)#
- Parameters:
message (Any) –
- Return type:
None
- start()#
- stop()#
- gwproactor.format_exceptions(exceptions)#
- Parameters:
exceptions (list[BaseException]) –
- Return type:
str
- gwproactor.responsive_sleep(obj, seconds, step_duration=0.1, running_field_name='_main_loop_running', running_field=True)#
Sleep in a way that is more responsive to thread termination: sleep in step_duration increments up to the specified seconds, checking obj._main_loop_running after each step. If the designated running_field_name actually indicates that a stop has been requested (e.g. what you would expect from a field named ‘_stop_requested’), set the running_field parameter to False.
- Parameters:
seconds (float) –
step_duration (float) –
running_field_name (str) –
running_field (bool) –
- Return type:
bool
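The stepped sleep described above can be sketched as follows. This is an illustration under assumptions, not the library's code: responsive_sleep_sketch and Worker are hypothetical names, and the meaning of the returned bool (True when the object is still running at exit) is a guess, since the documentation above does not state it.

```python
import time
from typing import Any


def responsive_sleep_sketch(
    obj: Any,
    seconds: float,
    step_duration: float = 0.1,
    running_field_name: str = "_main_loop_running",
    running_field: bool = True,
) -> bool:
    """Sleep up to `seconds`, waking early once `obj` stops running."""

    def still_running() -> bool:
        # The field counts as "running" while it equals running_field, so a
        # field such as _stop_requested is handled with running_field=False.
        return getattr(obj, running_field_name) == running_field

    deadline = time.monotonic() + seconds
    while still_running():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline, and never longer than one step.
        time.sleep(min(step_duration, remaining))
    return still_running()


class Worker:
    """Toy object exposing the default running field."""

    def __init__(self) -> None:
        self._main_loop_running = True
```

A thread's main loop would typically call this between iterations, so that a stop request is noticed within one step_duration rather than after the full sleep.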
- gwproactor.setup_logging(args, settings, errors=None, add_screen_handler=True, root_gets_handlers=True)#
Get python logging config based on parsed command line args, defaults, environment variables and logging config file.
The order of precedence is:
Command line arguments
Environment
Defaults
- Parameters:
args (Namespace) –
settings (ProactorSettings) –
errors (list[BaseException] | None) –
add_screen_handler (bool) –
root_gets_handlers (bool) –
- Return type:
None
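The precedence order listed above (command line, then environment, then defaults) can be illustrated for a single setting. This sketch does not reproduce setup_logging(): the log_level argument, the DEMO_LOG_LEVEL variable, and the helper names are hypothetical, and the default of WARNING (30) is taken from the base_log_level shown in ProactorSettings.

```python
import argparse
import logging
from typing import Mapping


def parse_level(text: str) -> int:
    """Accept a numeric level ("20") or a standard level name ("info")."""
    if text.isdigit():
        return int(text)
    level = getattr(logging, text.upper(), None)
    if not isinstance(level, int):
        raise ValueError(f"Unknown log level: {text!r}")
    return level


def resolve_log_level(
    args: argparse.Namespace,
    env: Mapping[str, str],
    default: int = logging.WARNING,
) -> int:
    # 1. Command line arguments win when present.
    cli_value = getattr(args, "log_level", None)
    if cli_value is not None:
        return parse_level(cli_value)
    # 2. Otherwise fall back to the environment (hypothetical variable name).
    env_value = env.get("DEMO_LOG_LEVEL")
    if env_value is not None:
        return parse_level(env_value)
    # 3. Finally use the default.
    return default
```

Passing the environment as a mapping, instead of reading os.environ inside the function, keeps the precedence logic easy to exercise in isolation.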