gwproactor¶
This packages provides infrastructure for running a proactor on top of asyncio with support multiple MQTT clients and and sub-objects which support their own threads for synchronous operations.
This packages is not GridWorks-aware (except that it links actors with multiple mqtt clients). This separation between communication / action infrastructure and GridWorks semantics is intended to allow the latter to be more focussed.
This package is not polished and the separation is up for debate.
Particular questions:
Is the programming model still clean after more concrete actors are implemented and more infrastructure are added.
Does the separation add value or just complicate analysis.
MQTTClients should be made async.
Semantics of building message type namespaces should be spelled out / further worked out.
Test support should be implemented / cleaner.
- class gwproactor.Actor(name, services)¶
- Parameters:
name (str)
services (AppInterface)
- init()¶
Called after constructor so derived functions can be used in setup.
- Return type:
None
- classmethod instantiate(name, services, **constructor_args)¶
- Parameters:
name (str)
services (AppInterface)
constructor_args (Any)
- Return type:
- property name: str¶
- property node: ShNode¶
- class gwproactor.ActorInterface¶
Pure interface for a proactor sub-object (an Actor) which can communicate and has a GridWorks ShNode.
- abstract init()¶
Called after constructor so derived functions can be used in setup.
- Return type:
None
- abstract classmethod instantiate(name, services, **contstructor_kwargs)¶
- Parameters:
name (str)
services (AppInterface)
contstructor_kwargs (Any)
- Return type:
- classmethod load(name, actor_class_name, services, actors_module, **constructor_kwargs)¶
- Parameters:
name (str)
actor_class_name (str)
services (AppInterface)
actors_module (ModuleType)
constructor_kwargs (Any)
- Return type:
- abstract property name: str¶
- abstract property node: ShNode¶
- class gwproactor.App(*, paths_name=None, paths=None, app_settings=None, codec_factory=None, sub_types=None, env_file=None)¶
- Parameters:
paths_name (str | None)
paths (Paths | None)
app_settings (AppSettings | None)
codec_factory (CodecFactory | None)
sub_types (SubTypes)
env_file (str | Path | None)
- classmethod actors_module()¶
- Return type:
ModuleType | None
- add_callbacks(callbacks)¶
- Parameters:
callbacks (ProactorCallbackInterface)
- Return type:
int
- add_communicator(communicator)¶
- Parameters:
communicator (CommunicatorInterface)
- Return type:
None
- add_task(task)¶
- Parameters:
task (Task[Any])
- Return type:
None
- add_web_route(server_name, method, path, handler, **kwargs)¶
Adds configuration for web server route which will be available after start() is called.
May be called even if associated web server is not configured, in which case this route will simply be ignored.
Not thread safe.
- Parameters:
server_name (str)
method (str)
path (str)
handler (Callable[[Request], Awaitable[StreamResponse]])
kwargs (Any)
- Return type:
None
- add_web_server_config(name, host, port, **kwargs)¶
Adds configuration for web server which will be started when start() is called.
Not thread safe.
- Parameters:
name (str)
host (str)
port (int)
kwargs (Any)
- Return type:
None
- classmethod app_settings_type()¶
- Return type:
type[AppSettings]
- property async_receive_queue: Queue[Any] | None¶
- async await_processing(message)¶
- Parameters:
message (Message[Any])
- Return type:
Ok[Any] | Err[BaseException]
- config: ProactorConfig¶
- property downstream_client: str¶
- property event_loop: AbstractEventLoop | None¶
- generate_event(event)¶
- Parameters:
event (EventT)
- Return type:
Ok[bool] | Err[Exception]
- get_communicator(name)¶
- Parameters:
name (str)
- Return type:
CommunicatorInterface | None
- get_communicator_as_type(name, type_)¶
- Parameters:
name (str)
type_ (Type[T])
- Return type:
T | None
- get_communicator_names()¶
- Return type:
set[str]
- get_external_watchdog_builder_class()¶
- Return type:
- classmethod get_settings(paths_name=None, paths=None, settings=None, settings_type=None, env_file=None)¶
- Parameters:
paths_name (str | None)
paths (Paths | None)
settings (AppSettings | None)
settings_type (type[AppSettings] | None)
env_file (str | Path | None)
- Return type:
- get_web_server_configs()¶
- Return type:
dict[str, WebServerGt]
- get_web_server_route_strings()¶
- Return type:
dict[str, list[str]]
- property hardware_layout: HardwareLayout¶
- property has_prime_actor: bool¶
- instantiate()¶
- Return type:
Self
- property io_loop_manager: IOLoopInterface¶
- links: dict[str, LinkSettings]¶
- property logger: ProactorLogger¶
- classmethod make_subtypes()¶
- Return type:
SubTypes
- property name: str¶
- classmethod paths_name()¶
- Return type:
str | None
- property prime_actor: PrimeActor¶
- classmethod prime_actor_type()¶
- Return type:
type[PrimeActor] | None
- classmethod print_settings(*, env_file='.env')¶
- Parameters:
env_file (str | Path)
- Return type:
None
- property publication_name: str¶
- publish_message(link_name, message, qos=0, context=None, *, topic='', use_link_topic=False)¶
- Parameters:
link_name (str)
message (Message[Any])
qos (int)
context (Any)
topic (str)
use_link_topic (bool)
- Return type:
MQTTMessageInfo
- publish_upstream(payload, qos=QOS.AtMostOnce, **message_args)¶
- Parameters:
payload (Any)
qos (QOS)
message_args (Any)
- Return type:
MQTTMessageInfo
- remove_callbacks(callbacks_id)¶
- Parameters:
callbacks_id (int)
- Return type:
None
- run_in_thread(*, daemon=True)¶
- Parameters:
daemon (bool)
- Return type:
Thread
- send(message)¶
- Parameters:
message (Message[Any])
- Return type:
None
- send_threadsafe(message)¶
- Parameters:
message (Message[Any])
- Return type:
None
- property settings: AppSettings¶
- property stats: ProactorStats¶
- sub_types: SubTypes¶
- property subscription_name: str¶
- property upstream_client: str¶
- wait_for_processing_threadsafe(message)¶
- Parameters:
message (Message[Any])
- Return type:
Ok[Any] | Err[BaseException]
- class gwproactor.AppInterface¶
Interface to system services (the proactor)
- abstract add_callbacks(callbacks)¶
- Parameters:
callbacks (ProactorCallbackInterface)
- Return type:
int
- abstract add_communicator(communicator)¶
- Parameters:
communicator (CommunicatorInterface)
- Return type:
None
- abstract add_task(task)¶
- Parameters:
task (Task[Any])
- Return type:
None
- abstract add_web_route(server_name, method, path, handler, **kwargs)¶
Adds configuration for web server route which will be available after start() is called.
May be called even if associated web server is not configured, in which case this route will simply be ignored.
Not thread safe.
- Parameters:
server_name (str)
method (str)
path (str)
handler (Callable[[Request], Awaitable[StreamResponse]])
kwargs (Any)
- Return type:
None
- abstract add_web_server_config(name, host, port, **kwargs)¶
Adds configuration for web server which will be started when start() is called.
Not thread safe.
- Parameters:
name (str)
host (str)
port (int)
kwargs (Any)
- Return type:
None
- abstract property async_receive_queue: Queue[Any] | None¶
- abstract async await_processing(message)¶
- Parameters:
message (Message[Any])
- Return type:
Ok[Any] | Err[BaseException]
- property downstream_client: str¶
- abstract property event_loop: AbstractEventLoop | None¶
- abstract generate_event(event)¶
- Parameters:
event (EventT)
- Return type:
Ok[bool] | Err[Exception]
- abstract get_communicator(name)¶
- Parameters:
name (str)
- Return type:
CommunicatorInterface | None
- abstract get_communicator_as_type(name, type_)¶
- Parameters:
name (str)
type_ (Type[T])
- Return type:
T | None
- abstract get_communicator_names()¶
- Return type:
set[str]
- abstract get_external_watchdog_builder_class()¶
- Return type:
- abstract get_web_server_configs()¶
- Return type:
dict[str, WebServerGt]
- abstract get_web_server_route_strings()¶
- Return type:
dict[str, list[str]]
- abstract property hardware_layout: HardwareLayout¶
- abstract property io_loop_manager: IOLoopInterface¶
- abstract property logger: ProactorLogger¶
- abstract property publication_name: str¶
- abstract publish_message(link_name, message, qos=0, context=None, *, topic='', use_link_topic=False)¶
- Parameters:
link_name (str)
message (Message[Any])
qos (int)
context (Any)
topic (str)
use_link_topic (bool)
- Return type:
MQTTMessageInfo
- publish_upstream(payload, qos=QOS.AtMostOnce, **message_args)¶
- Parameters:
payload (Any)
qos (QOS)
message_args (Any)
- Return type:
MQTTMessageInfo
- abstract remove_callbacks(callbacks_id)¶
- Parameters:
callbacks_id (int)
- Return type:
None
- abstract send(message)¶
- Parameters:
message (Message[Any])
- Return type:
None
- abstract send_threadsafe(message)¶
- Parameters:
message (Message[Any])
- Return type:
None
- abstract property settings: AppSettings¶
- abstract property stats: ProactorStats¶
- abstract property subscription_name: str¶
- property upstream_client: str¶
- abstract wait_for_processing_threadsafe(message)¶
- Parameters:
message (Message[Any])
- Return type:
Ok[Any] | Err[BaseException]
- class gwproactor.AppSettings(_case_sensitive=None, _nested_model_default_partial_update=None, _env_prefix=None, _env_file=PosixPath('.'), _env_file_encoding=None, _env_ignore_empty=None, _env_nested_delimiter=None, _env_nested_max_split=None, _env_parse_none_str=None, _env_parse_enums=None, _cli_prog_name=None, _cli_parse_args=None, _cli_settings_source=None, _cli_parse_none_str=None, _cli_hide_none_type=None, _cli_avoid_json=None, _cli_enforce_required=None, _cli_use_class_docs_for_groups=None, _cli_exit_on_error=None, _cli_prefix=None, _cli_flag_prefix_char=None, _cli_implicit_flags=None, _cli_ignore_unknown_args=None, _cli_kebab_case=None, _secrets_dir=None, *, paths=<factory>, logging=LoggingSettings(base_log_name='gridworks', base_log_level=30, levels=LoggerLevels(message_summary=30, lifecycle=20, comm_event=20, io_loop=20), formatter=FormatterSettings(fmt='%(asctime)s %(message)s', datefmt='', default_msec_format='%s.%03d', use_utc=False), file_handler=RotatingFileHandlerSettings(filename='proactor.log', bytes_per_log_file=2097152, num_log_files=10, level=0), io_loop=IOLoopLoggerSettings(file_handler=RotatingFileHandlerSettings(filename='io_loop.log', bytes_per_log_file=2097152, num_log_files=2, level=0), on_screen=False)), proactor=ProactorSettings(mqtt_link_poll_seconds=60.0, ack_timeout_seconds=5.0, num_initial_event_reuploads=5))¶
- Parameters:
_case_sensitive (bool | None)
_nested_model_default_partial_update (bool | None)
_env_prefix (str | None)
_env_file (DotenvType | None)
_env_file_encoding (str | None)
_env_ignore_empty (bool | None)
_env_nested_delimiter (str | None)
_env_nested_max_split (int | None)
_env_parse_none_str (str | None)
_env_parse_enums (bool | None)
_cli_prog_name (str | None)
_cli_parse_args (bool | list[str] | tuple[str, ...] | None)
_cli_settings_source (CliSettingsSource[Any] | None)
_cli_parse_none_str (str | None)
_cli_hide_none_type (bool | None)
_cli_avoid_json (bool | None)
_cli_enforce_required (bool | None)
_cli_use_class_docs_for_groups (bool | None)
_cli_exit_on_error (bool | None)
_cli_prefix (str | None)
_cli_flag_prefix_char (str | None)
_cli_implicit_flags (bool | None)
_cli_ignore_unknown_args (bool | None)
_cli_kebab_case (bool | None)
_secrets_dir (PathType | None)
paths (Paths)
logging (LoggingSettings)
proactor (ProactorSettings)
- broker(name)¶
- Parameters:
name (str)
- Return type:
MQTTClient
- brokers()¶
- Return type:
dict[str, MQTTClient]
- check_tls_paths_present(*, raise_error=True)¶
- Parameters:
raise_error (bool)
- Return type:
str
- logging: LoggingSettings¶
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': '__', 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'PROACTOR_APP_', 'extra': 'ignore', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': True, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- paths: Paths¶
- post_root_validator()¶
Update unset paths of any member MQTTClient’s TLS paths based on ProactorSettings ‘paths’ member.
- Return type:
Self
- proactor: ProactorSettings¶
- update_paths_name(name)¶
- Parameters:
name (str | Path)
- Return type:
Self
- classmethod update_paths_name_before_validator(values, name)¶
Update paths member with a new ‘name’ attribute, e.g., a name known by a derived class.
This may be called in a mode=”before” root validator of a derived class.
- Parameters:
values (dict[str, Any])
name (str)
- Return type:
dict[str, Any]
- update_tls_paths()¶
- Return type:
Self
- uses_tls()¶
- Return type:
bool
- with_paths(*, paths=None, exclude_unset=True, exclude_defaults=True, **kwargs)¶
- Parameters:
paths (Paths | None)
exclude_unset (bool)
exclude_defaults (bool)
kwargs (Any)
- Return type:
Self
- with_paths_name(name)¶
- Parameters:
name (str)
- Return type:
Self
- class gwproactor.AsyncQueueWriter¶
Allow synchronous code to write to an asyncio Queue.
It is assumed the asynchronous reader has access to the asyncio Queue “await get()” from directly from it.
- put(item)¶
Write to asyncio queue in a threadsafe way.
- Parameters:
item (Any)
- Return type:
None
- set_async_loop(loop, async_queue)¶
- Parameters:
loop (AbstractEventLoop)
async_queue (Queue[Any])
- Return type:
None
- class gwproactor.CodecFactory¶
- get_codec(link_name, link, proactor_name, layout)¶
- Parameters:
link_name (str)
link (LinkSettings)
proactor_name (ProactorName)
layout (HardwareLayout)
- Return type:
MQTTCodec
- class gwproactor.CodecSettings(_case_sensitive=None, _nested_model_default_partial_update=None, _env_prefix=None, _env_file=PosixPath('.'), _env_file_encoding=None, _env_ignore_empty=None, _env_nested_delimiter=None, _env_nested_max_split=None, _env_parse_none_str=None, _env_parse_enums=None, _cli_prog_name=None, _cli_parse_args=None, _cli_settings_source=None, _cli_parse_none_str=None, _cli_hide_none_type=None, _cli_avoid_json=None, _cli_enforce_required=None, _cli_use_class_docs_for_groups=None, _cli_exit_on_error=None, _cli_prefix=None, _cli_flag_prefix_char=None, _cli_implicit_flags=None, _cli_ignore_unknown_args=None, _cli_kebab_case=None, _secrets_dir=None, *, message_model_name='', message_modules=[], use_default_message_modules=True)¶
- Parameters:
_case_sensitive (bool | None)
_nested_model_default_partial_update (bool | None)
_env_prefix (str | None)
_env_file (DotenvType | None)
_env_file_encoding (str | None)
_env_ignore_empty (bool | None)
_env_nested_delimiter (str | None)
_env_nested_max_split (int | None)
_env_parse_none_str (str | None)
_env_parse_enums (bool | None)
_cli_prog_name (str | None)
_cli_parse_args (bool | list[str] | tuple[str, ...] | None)
_cli_settings_source (CliSettingsSource[Any] | None)
_cli_parse_none_str (str | None)
_cli_hide_none_type (bool | None)
_cli_avoid_json (bool | None)
_cli_enforce_required (bool | None)
_cli_use_class_docs_for_groups (bool | None)
_cli_exit_on_error (bool | None)
_cli_prefix (str | None)
_cli_flag_prefix_char (str | None)
_cli_implicit_flags (bool | None)
_cli_ignore_unknown_args (bool | None)
_cli_kebab_case (bool | None)
_secrets_dir (PathType | None)
message_model_name (str)
message_modules (list[str])
use_default_message_modules (bool)
- message_model_name: str¶
- message_modules: list[str]¶
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- use_default_message_modules: bool¶
- class gwproactor.Communicator(name, services)¶
A partial implementation of CommunicatorInterface which supplies the trivial implementations
- Parameters:
name (str)
services (AppInterface)
- property monitored_names: Sequence[MonitoredName]¶
- property name: str¶
- property services: AppInterface¶
- class gwproactor.CommunicatorInterface¶
Pure interface necessary for interaction between a sub-object and the system services proactor
- abstract property monitored_names: Sequence[MonitoredName]¶
- abstract property name: str¶
- abstract process_message(message)¶
- Parameters:
message (Message[Any])
- Return type:
Ok[bool] | Err[Exception]
- class gwproactor.ExternalWatchdogCommandBuilder¶
Create arguments which will be passed to subprocess.run() to pat the external watchdog.
If the returned list is empty, pat process will be run.
By default an empty list is returned if the environment variable named by sevice_variable_name() is not set to 1 or true.
- classmethod default_pat_args(pid=None)¶
- Parameters:
pid (int | None)
- Return type:
list[str]
- classmethod pat_args(service_name, args=None, pid=None)¶
Return arguments to be passed to subprocess.run() to pat the external watchdog.
- Parameters:
service_name (str)
args (list[str] | None)
pid (int | None)
- Return type:
list[str]
- classmethod running_as_service(service_name)¶
- Parameters:
service_name (str)
- Return type:
bool
- classmethod service_variable_name(service_name)¶
- Parameters:
service_name (str)
- Return type:
str
- class gwproactor.LinkSettings(_case_sensitive=None, _nested_model_default_partial_update=None, _env_prefix=None, _env_file=PosixPath('.'), _env_file_encoding=None, _env_ignore_empty=None, _env_nested_delimiter=None, _env_nested_max_split=None, _env_parse_none_str=None, _env_parse_enums=None, _cli_prog_name=None, _cli_parse_args=None, _cli_settings_source=None, _cli_parse_none_str=None, _cli_hide_none_type=None, _cli_avoid_json=None, _cli_enforce_required=None, _cli_use_class_docs_for_groups=None, _cli_exit_on_error=None, _cli_prefix=None, _cli_flag_prefix_char=None, _cli_implicit_flags=None, _cli_ignore_unknown_args=None, _cli_kebab_case=None, _secrets_dir=None, *, broker_name='', enabled=True, peer_long_name='', peer_short_name='', link_subscription_short_name='', codec=CodecSettings(message_model_name='', message_modules=[], use_default_message_modules=True), upstream=False, downstream=False)¶
- Parameters:
_case_sensitive (bool | None)
_nested_model_default_partial_update (bool | None)
_env_prefix (str | None)
_env_file (DotenvType | None)
_env_file_encoding (str | None)
_env_ignore_empty (bool | None)
_env_nested_delimiter (str | None)
_env_nested_max_split (int | None)
_env_parse_none_str (str | None)
_env_parse_enums (bool | None)
_cli_prog_name (str | None)
_cli_parse_args (bool | list[str] | tuple[str, ...] | None)
_cli_settings_source (CliSettingsSource[Any] | None)
_cli_parse_none_str (str | None)
_cli_hide_none_type (bool | None)
_cli_avoid_json (bool | None)
_cli_enforce_required (bool | None)
_cli_use_class_docs_for_groups (bool | None)
_cli_exit_on_error (bool | None)
_cli_prefix (str | None)
_cli_flag_prefix_char (str | None)
_cli_implicit_flags (bool | None)
_cli_ignore_unknown_args (bool | None)
_cli_kebab_case (bool | None)
_secrets_dir (PathType | None)
broker_name (str)
enabled (bool)
peer_long_name (str)
peer_short_name (str)
link_subscription_short_name (str)
codec (CodecSettings)
upstream (bool)
downstream (bool)
- broker_name: str¶
- codec: CodecSettings¶
- downstream: bool¶
- enabled: bool¶
- link_subscription_short_name: str¶
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': None, 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': '', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': False, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- peer_long_name: str¶
- peer_short_name: str¶
- upstream: bool¶
- class gwproactor.MQTTClientWrapper(client_name, topic_dst, client_config, receive_queue)¶
- Parameters:
client_name (str)
topic_dst (str)
client_config (MQTTClient)
receive_queue (AsyncQueueWriter)
- connected()¶
- Return type:
bool
- disable_logger()¶
- Return type:
None
- enable_logger(logger=None)¶
- Parameters:
logger (Logger | LoggerAdapter[Logger] | None)
- Return type:
None
- handle_suback(suback)¶
- Parameters:
suback (MQTTSubackPayload)
- Return type:
int
- property mqtt_client: Client¶
- num_pending_subscriptions()¶
- Return type:
int
- num_subscriptions()¶
- Return type:
int
- on_connect(_, userdata, flags, reason_code, _properties=None)¶
- Parameters:
_ (Any)
userdata (Any)
flags (ConnectFlags)
reason_code (ReasonCode)
_properties (Properties | None)
- Return type:
None
- on_connect_fail(_, userdata)¶
- Parameters:
_ (Any)
userdata (Any)
- Return type:
None
- on_disconnect(_, userdata, _flags, rc, _properties)¶
- Parameters:
_ (Client)
userdata (Any)
_flags (DisconnectFlags)
rc (ReasonCode)
_properties (Properties | None)
- Return type:
None
- on_message(_, userdata, message)¶
- Parameters:
_ (Any)
userdata (Any)
message (MQTTMessage)
- Return type:
None
- on_subscribe(_, userdata, mid, reason_codes, _properties)¶
- Parameters:
_ (Client)
userdata (Any)
mid (int)
reason_codes (Sequence[ReasonCode])
_properties (Properties)
- Return type:
None
- publish(topic, payload, qos)¶
- Parameters:
topic (str)
payload (bytes)
qos (int)
- Return type:
MQTTMessageInfo
- start()¶
- Return type:
None
- stop()¶
- Return type:
None
- subscribe(topic, qos)¶
- Parameters:
topic (str)
qos (int)
- Return type:
tuple[int, int | None]
- subscribe_all()¶
- Return type:
tuple[int, int | None]
- subscribed()¶
- Return type:
bool
- subscription_items()¶
- Return type:
list[Tuple[str, int]]
- topic_dst: str¶
- unsubscribe(topic)¶
- Parameters:
topic (str)
- Return type:
Tuple[int, int | None]
- class gwproactor.MQTTClients¶
- add_client(settings)¶
- Parameters:
settings (LinkConfig)
- Return type:
None
- client_wrapper(client)¶
- Parameters:
client (str)
- Return type:
- clients: Dict[str, MQTTClientWrapper]¶
- connected(client)¶
- Parameters:
client (str)
- Return type:
bool
- disable_loggers()¶
- Return type:
None
- downstream_client: str = ''¶
- enable_loggers(logger=None)¶
- Parameters:
logger (Logger | LoggerAdapter[Logger] | None)
- Return type:
None
- handle_suback(suback)¶
- Parameters:
suback (MQTTSubackPayload)
- Return type:
int
- num_pending_subscriptions(client)¶
- Parameters:
client (str)
- Return type:
int
- num_subscriptions(client)¶
- Parameters:
client (str)
- Return type:
int
- primary_peer()¶
- Return type:
- publish(client, topic, payload, qos)¶
- Parameters:
client (str)
topic (str)
payload (bytes)
qos (int)
- Return type:
MQTTMessageInfo
- start(loop, async_queue)¶
- Parameters:
loop (AbstractEventLoop)
async_queue (Queue[Any])
- Return type:
None
- stop()¶
- Return type:
None
- subscribe(client, topic, qos)¶
- Parameters:
client (str)
topic (str)
qos (int)
- Return type:
Tuple[int, int | None]
- subscribe_all(client)¶
- Parameters:
client (str)
- Return type:
Tuple[int, int | None]
- subscribed(client)¶
- Parameters:
client (str)
- Return type:
bool
- topic_dst(client)¶
- Parameters:
client (str)
- Return type:
str
- unsubscribe(client, topic)¶
- Parameters:
client (str)
topic (str)
- Return type:
Tuple[int, int | None]
- upstream()¶
- Return type:
- upstream_client: str = ''¶
- upstream_topic_dst: str = ''¶
- class gwproactor.MonitoredName(name: str, timeout_seconds: float)¶
- Parameters:
name (str)
timeout_seconds (float)
- name: str¶
- timeout_seconds: float¶
- class gwproactor.PrimeActor(name, services)¶
- Parameters:
name (str)
services (AppInterface)
- classmethod get_codec_factory()¶
- Return type:
- async join()¶
- Return type:
None
- process_message(_)¶
- Parameters:
_ (Message[Any])
- Return type:
Ok[bool] | Err[Exception]
- start()¶
- Return type:
None
- stop()¶
- Return type:
None
- class gwproactor.Proactor(config)¶
- Parameters:
config (ProactorConfig)
- AWAIT_PROCESSING_FUTURE_ATTRIBUTE: str = '_await_processing_future'¶
- add_callbacks(callbacks)¶
- Parameters:
callbacks (ProactorCallbackInterface)
- Return type:
int
- add_communicator(communicator)¶
- Parameters:
communicator (CommunicatorInterface)
- Return type:
None
- add_task(task)¶
- Parameters:
task (Task[Any])
- Return type:
None
- add_web_route(server_name, method, path, handler, **kwargs)¶
Adds configuration for web server route which will be available after start() is called.
May be called even if associated web server is not configured, in which case this route will simply be ignored.
Not thread safe.
- Parameters:
server_name (str)
method (str)
path (str)
handler (Callable[[Request], Awaitable[StreamResponse]])
kwargs (Any)
- Return type:
None
- add_web_server_config(name, host, port, **kwargs)¶
Adds configuration for web server which will be started when start() is called.
Not thread safe.
- Parameters:
name (str)
host (str)
port (int)
kwargs (Any)
- Return type:
None
- async async_process_message(message)¶
- Parameters:
message (Message[Any])
- Return type:
None
- property async_receive_queue: Queue[Any] | None¶
- async await_processing(message)¶
- Parameters:
message (Message[Any])
- Return type:
Ok[Any] | Err[BaseException]
- property callback_functions: ProactorCallbackFunctions¶
- property downstream_client: str¶
- property event_loop: AbstractEventLoop | None¶
- property event_persister: PersisterInterface¶
- generate_event(event)¶
- Parameters:
event (EventT)
- Return type:
Ok[bool] | Err[Exception]
- get_communicator(name)¶
- Parameters:
name (str)
- Return type:
CommunicatorInterface | None
- get_communicator_as_type(name, type_)¶
- Parameters:
name (str)
type_ (Type[T])
- Return type:
T | None
- get_communicator_names()¶
- Return type:
set[str]
- get_external_watchdog_builder_class()¶
- Return type:
- get_web_server_configs()¶
- Return type:
dict[str, WebServerGt]
- get_web_server_route_strings()¶
- Return type:
dict[str, list[str]]
- property hardware_layout: HardwareLayout¶
- property io_loop_manager: IOLoopInterface¶
- async join()¶
- Return type:
None
- property links: LinkManager¶
- property logger: ProactorLogger¶
- property long_name: str¶
- classmethod make_stats()¶
- Return type:
ProactorStats
- property monitored_names: Sequence[MonitoredName]¶
- property name: str¶
- property name_object: ProactorName¶
- property paths_name: str¶
- process_message(message)¶
- Parameters:
message (Message[Any])
- Return type:
Ok[bool] | Err[Exception]
- async process_messages()¶
- Return type:
None
- property publication_name: str¶
- publish_message(link_name, message, qos=0, context=None, *, topic='', use_link_topic=False)¶
- Parameters:
link_name (str)
message (Message[Any])
qos (int)
context (Any)
topic (str)
use_link_topic (bool)
- Return type:
MQTTMessageInfo
- publish_upstream(payload, qos=QOS.AtMostOnce, **message_args)¶
- Parameters:
payload (Any)
qos (QOS)
message_args (Any)
- Return type:
MQTTMessageInfo
- remove_callbacks(callbacks_id)¶
- Parameters:
callbacks_id (int)
- Return type:
None
- async run_forever()¶
- Return type:
None
- run_in_thread(*, daemon=True)¶
- Parameters:
daemon (bool)
- Return type:
Thread
- send(message)¶
- Parameters:
message (Message[Any])
- Return type:
None
- send_threadsafe(message)¶
- Parameters:
message (Message[Any])
- Return type:
None
- property settings: AppSettings¶
- property short_name: str¶
- start()¶
- Return type:
None
- start_tasks()¶
- Return type:
None
- property stats: ProactorStats¶
- stop()¶
- Return type:
None
- property subscription_name: str¶
- property upstream_client: str¶
- wait_for_processing_threadsafe(message)¶
- Parameters:
message (Message[Any])
- Return type:
Ok[Any] | Err[BaseException]
- class gwproactor.ProactorCallbackFunctions(pre_child_start: Callable[[], NoneType] | None = None, start_tasks: Callable[[], Sequence[_asyncio.Task[Any]]] | None = None, start_processing_messages: Callable[[], NoneType] | None = None, process_internal_message: Callable[[gwproto.message.Message[Any]], NoneType] | None = None, process_mqtt_message: Callable[[gwproto.message.Message[MQTTReceiptPayload], gwproto.message.Message[Any]], NoneType] | None = None, recv_activated: Callable[[gwproactor.links.link_state.Transition], NoneType] | None = None, recv_deactivated: Callable[[gwproactor.links.link_state.Transition], NoneType] | None = None)¶
- Parameters:
pre_child_start (Callable[[], None] | None)
start_tasks (Callable[[], Sequence[Task[Any]]] | None)
start_processing_messages (Callable[[], None] | None)
process_internal_message (Callable[[Message[Any]], None] | None)
process_mqtt_message (Callable[[Message[MQTTReceiptPayload], Message[Any]], None] | None)
recv_activated (Callable[[Transition], None] | None)
recv_deactivated (Callable[[Transition], None] | None)
- pre_child_start: Callable[[], None] | None = None¶
- process_internal_message: Callable[[Message[Any]], None] | None = None¶
- process_mqtt_message: Callable[[Message[MQTTReceiptPayload], Message[Any]], None] | None = None¶
- recv_activated: Callable[[Transition], None] | None = None¶
- recv_deactivated: Callable[[Transition], None] | None = None¶
- start_processing_messages: Callable[[], None] | None = None¶
- start_tasks: Callable[[], Sequence[Task[Any]]] | None = None¶
- class gwproactor.ProactorCallbackInterface¶
- pre_child_start()¶
- Return type:
None
- process_internal_message(message)¶
- Parameters:
message (Message[Any])
- Return type:
None
- process_mqtt_message(mqtt_client_message, decoded)¶
- Parameters:
mqtt_client_message (Message[MQTTReceiptPayload])
decoded (Message[Any])
- Return type:
None
- recv_activated(transition)¶
- Parameters:
transition (Transition)
- Return type:
None
- recv_deactivated(transition)¶
- Parameters:
transition (Transition)
- Return type:
None
- start_processing_messages()¶
- Return type:
None
- start_tasks()¶
- Return type:
Sequence[Task[Any]]
- class gwproactor.ProactorCodec(*, src_name='', dst_name='', model_name='', module_names=None, use_default_modules=True)¶
- Parameters:
src_name (str)
dst_name (str)
model_name (str)
module_names (Sequence[str] | None)
use_default_modules (bool)
- DEFAULT_MESSAGE_MODULES: ClassVar[list[str]] = ['gwproto.messages', 'gwproactor.message']¶
- classmethod create_message_model(*, model_name='', module_names=None, use_default_modules=True)¶
- Parameters:
model_name (str)
module_names (Sequence[str] | None)
use_default_modules (bool)
- Return type:
type[Message[Any]]
- dst_name: str¶
- classmethod get_unrecognized_payload_error(e)¶
- Parameters:
e (ValidationError)
- Return type:
ErrorDetails | None
- handle_unrecognized_payload(payload, e, details)¶
- Parameters:
payload (bytes)
e (ValidationError)
details (ErrorDetails)
- Return type:
Message[Any]
- src_name: str¶
- validate_source_and_destination(src, dst)¶
- Parameters:
src (str)
dst (str)
- Return type:
None
- class gwproactor.ProactorConfig(name, *, settings=None, callbacks=None, logger=None, event_persister=None, hardware_layout=None)¶
- Parameters:
name (ProactorName)
settings (AppSettings)
callbacks (ProactorCallbackFunctions | None)
logger (ProactorLogger)
event_persister (PersisterInterface)
hardware_layout (HardwareLayout | None)
- callback_functions: ProactorCallbackFunctions¶
- event_persister: PersisterInterface¶
- layout: HardwareLayout¶
- logger: ProactorLogger¶
- name: ProactorName¶
- settings: AppSettings¶
- class gwproactor.ProactorLogger(base, message_summary, lifecycle, comm_event, io_loop, *, extra=None, category_logger_names=None, **_kwargs)¶
- Parameters:
base (str)
message_summary (str)
lifecycle (str)
comm_event (str)
io_loop (str)
extra (dict[str, Any] | None)
category_logger_names (Sequence[str] | None)
_kwargs (Mapping[str, Any])
- MESSAGE_DELIMITER_WIDTH = 88¶
- MESSAGE_ENTRY_DELIMITER = '++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++'¶
- MESSAGE_EXIT_DELIMITER = '----------------------------------------------------------------------------------------'¶
- add_category_logger(category='', level=20, logger=None)¶
- Parameters:
category (str)
level (int)
logger (Logger | LoggerAdapter[Logger] | None)
- Return type:
Logger | LoggerAdapter[Logger]
- category_logger(category)¶
Get existing category logger
- Parameters:
category (str)
- Return type:
Logger | LoggerAdapter[Logger] | None
- category_logger_name(category)¶
- Parameters:
category (str)
- Return type:
str
- category_loggers: dict[str, CategoryLoggerInfo]¶
- comm_event(msg, *args, **kwargs)¶
- Parameters:
msg (str)
args (Any)
kwargs (Any)
- Return type:
None
- property comm_event_enabled: bool¶
- comm_event_logger: Logger¶
- property general_enabled: bool¶
- io_loop(msg, *args, **kwargs)¶
- Parameters:
msg (str)
args (Any)
kwargs (Any)
- Return type:
None
- property io_loop_enabled: bool¶
- io_loop_logger: Logger¶
- lifecycle(msg, *args, **kwargs)¶
- Parameters:
msg (str)
args (Any)
kwargs (Any)
- Return type:
None
- property lifecycle_enabled: bool¶
- lifecycle_logger: Logger¶
- message_enter(msg, *args, **kwargs)¶
- Parameters:
msg (str)
args (Any)
kwargs (Any)
- Return type:
None
- message_exit(msg, *args, **kwargs)¶
- Parameters:
msg (str)
args (Any)
kwargs (Any)
- Return type:
None
- message_summary(*, direction, src, dst, topic, payload_object=None, broker_flag=' ', timestamp=None, message_id='')¶
- Parameters:
direction (str)
src (str)
dst (str)
topic (str)
payload_object (Any)
broker_flag (str)
timestamp (datetime | None)
message_id (str)
- Return type:
None
- property message_summary_enabled: bool¶
- message_summary_logger: Logger¶
- path(msg, *args, **kwargs)¶
- Parameters:
msg (str)
args (Any)
kwargs (Any)
- Return type:
None
- property path_enabled: bool¶
- reset_default_category_levels()¶
- Return type:
None
- class gwproactor.ProactorName(long_name: str, short_name: str)¶
- Parameters:
long_name (str)
short_name (str)
- long_name: str¶
- property name: str¶
- property publication_name: str¶
- short_name: str¶
- property subscription_name: str¶
- class gwproactor.ProactorSettings(_case_sensitive=None, _nested_model_default_partial_update=None, _env_prefix=None, _env_file=PosixPath('.'), _env_file_encoding=None, _env_ignore_empty=None, _env_nested_delimiter=None, _env_nested_max_split=None, _env_parse_none_str=None, _env_parse_enums=None, _cli_prog_name=None, _cli_parse_args=None, _cli_settings_source=None, _cli_parse_none_str=None, _cli_hide_none_type=None, _cli_avoid_json=None, _cli_enforce_required=None, _cli_use_class_docs_for_groups=None, _cli_exit_on_error=None, _cli_prefix=None, _cli_flag_prefix_char=None, _cli_implicit_flags=None, _cli_ignore_unknown_args=None, _cli_kebab_case=None, _secrets_dir=None, *, mqtt_link_poll_seconds=60.0, ack_timeout_seconds=5.0, num_initial_event_reuploads=5)¶
- Parameters:
_case_sensitive (bool | None)
_nested_model_default_partial_update (bool | None)
_env_prefix (str | None)
_env_file (DotenvType | None)
_env_file_encoding (str | None)
_env_ignore_empty (bool | None)
_env_nested_delimiter (str | None)
_env_nested_max_split (int | None)
_env_parse_none_str (str | None)
_env_parse_enums (bool | None)
_cli_prog_name (str | None)
_cli_parse_args (bool | list[str] | tuple[str, ...] | None)
_cli_settings_source (CliSettingsSource[Any] | None)
_cli_parse_none_str (str | None)
_cli_hide_none_type (bool | None)
_cli_avoid_json (bool | None)
_cli_enforce_required (bool | None)
_cli_use_class_docs_for_groups (bool | None)
_cli_exit_on_error (bool | None)
_cli_prefix (str | None)
_cli_flag_prefix_char (str | None)
_cli_implicit_flags (bool | None)
_cli_ignore_unknown_args (bool | None)
_cli_kebab_case (bool | None)
_secrets_dir (PathType | None)
mqtt_link_poll_seconds (float)
ack_timeout_seconds (float)
num_initial_event_reuploads (int)
- ack_timeout_seconds: float¶
- model_config: ClassVar[SettingsConfigDict] = {'arbitrary_types_allowed': True, 'case_sensitive': False, 'cli_avoid_json': False, 'cli_enforce_required': False, 'cli_exit_on_error': True, 'cli_flag_prefix_char': '-', 'cli_hide_none_type': False, 'cli_ignore_unknown_args': False, 'cli_implicit_flags': False, 'cli_kebab_case': False, 'cli_parse_args': None, 'cli_parse_none_str': None, 'cli_prefix': '', 'cli_prog_name': None, 'cli_use_class_docs_for_groups': False, 'enable_decoding': True, 'env_file': None, 'env_file_encoding': None, 'env_ignore_empty': False, 'env_nested_delimiter': '__', 'env_nested_max_split': None, 'env_parse_enums': None, 'env_parse_none_str': None, 'env_prefix': 'PROACTOR_', 'extra': 'forbid', 'json_file': None, 'json_file_encoding': None, 'nested_model_default_partial_update': True, 'protected_namespaces': ('model_validate', 'model_dump', 'settings_customise_sources'), 'secrets_dir': None, 'toml_file': None, 'validate_default': True, 'yaml_file': None, 'yaml_file_encoding': None}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- mqtt_link_poll_seconds: float¶
- num_initial_event_reuploads: int¶
- exception gwproactor.Problems(msg='', warnings=None, errors=None, max_problems=10)¶
- Parameters:
msg (str)
warnings (list[BaseException])
errors (list[BaseException])
max_problems (int)
- Return type:
None
- MAX_PROBLEMS = 10¶
- error_traceback_str()¶
- Return type:
str
- errors: list[BaseException]¶
- max_problems: int = 10¶
- problem_event(summary, src='')¶
- Parameters:
summary (str)
src (str)
- Return type:
ProblemEvent
- warnings: list[BaseException]¶
- class gwproactor.QOS(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶
- AtLeastOnce = 1¶
- AtMostOnce = 0¶
- ExactlyOnce = 2¶
- class gwproactor.Runnable¶
Pure interface to an object which is expected to support starting, stopping and joining.
- abstract async join()¶
- Return type:
None
- abstract start()¶
- Return type:
None
- abstract stop()¶
- Return type:
None
- async stop_and_join()¶
- Return type:
None
- class gwproactor.Subscription(Topic, Qos)¶
- Parameters:
Topic (str)
Qos (QOS)
- Topic: str¶
Alias for field number 0
- class gwproactor.SyncAsyncInteractionThread(channel=None, name=None, *, iterate_sleep_seconds=None, responsive_sleep_step_seconds=0.1, pat_timeout=20, daemon=True, logger=None)¶
A thread wrapper providing an async-sync communication channel and simple “iterate, sleep, read message” semantics.
- Parameters:
channel (SyncAsyncQueueWriter | None)
name (str | None)
iterate_sleep_seconds (float | None)
responsive_sleep_step_seconds (float)
pat_timeout (float | None)
daemon (bool)
logger (Logger | LoggerAdapter[Logger] | None)
- JOIN_CHECK_THREAD_SECONDS = 1.0¶
- PAT_TIMEOUT = 20¶
- SLEEP_STEP_SECONDS = 0.1¶
- async async_join(timeout=None)¶
- Parameters:
timeout (float | None)
- Return type:
None
- pat_timeout: float | None¶
- pat_watchdog()¶
- Return type:
None
- put_to_sync_queue(message, *, block=True, timeout=None)¶
- Parameters:
message (Any)
block (bool)
timeout (float | None)
- Return type:
None
- request_stop()¶
- Return type:
None
- run()¶
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- Return type:
None
- running: bool | None¶
- set_async_loop(loop, async_queue)¶
- Parameters:
loop (AbstractEventLoop)
async_queue (Queue[Any])
- Return type:
None
- set_async_loop_and_start(loop, async_queue)¶
- Parameters:
loop (AbstractEventLoop)
async_queue (Queue[Any])
- Return type:
None
- time_to_pat()¶
- Return type:
bool
- class gwproactor.SyncAsyncQueueWriter(sync_queue=None)¶
Provide a full duplex communication “channel” between synchronous and asynchronous code.
It is assumed the asynchronous reader has access to the asyncio Queue “await get()” from directly from it.
- Parameters:
sync_queue (Queue[Any] | None)
- get_from_sync_queue(*, block=True, timeout=None)¶
Read from synchronous queue in a threadsafe way.
- Parameters:
block (bool)
timeout (float | None)
- Return type:
Any
- put_to_async_queue(item)¶
Write to asynchronous queue in a threadsafe way.
- Parameters:
item (Any)
- Return type:
None
- put_to_sync_queue(item, *, block=True, timeout=None)¶
Write to synchronous queue in a threadsafe way.
- Parameters:
item (Any)
block (bool)
timeout (float | None)
- Return type:
None
- set_async_loop(loop, async_queue)¶
- Parameters:
loop (AbstractEventLoop)
async_queue (Queue[Any])
- Return type:
None
- sync_queue: Queue[Any] | None¶
- class gwproactor.SyncThreadActor(name, services, sync_thread)¶
- Parameters:
name (str)
services (AppInterface)
sync_thread (SyncAsyncInteractionThread)
- async join()¶
- Return type:
None
- property monitored_names: Sequence[MonitoredName]¶
- process_message(message)¶
- Parameters:
message (Message[Any])
- Return type:
Ok[bool] | Err[Exception]
- send_driver_message(message)¶
- Parameters:
message (Any)
- Return type:
None
- start()¶
- Return type:
None
- stop()¶
- Return type:
None
- class gwproactor.WebEventListener(name, services, settings=None)¶
- Parameters:
name (str)
services (AppInterface)
settings (WebEventListenerSettings | None)
- DEFAULT_NODE_NAME: str = 'web-event'¶
- classmethod default_node(name='web-event', parent_name='s')¶
- Parameters:
name (str)
parent_name (str)
- Return type:
SpaceheatNodeGt
- async join()¶
IOLoop will take care of shutting down the associated task.
- Return type:
None
- process_message(message)¶
- Parameters:
message (Message[Any])
- Return type:
Ok[bool] | Err[Exception]
- start()¶
IOLoop will take care of start.
- Return type:
None
- stop()¶
IOLoop will take care of stop.
- Return type:
None
- gwproactor.format_exceptions(exceptions)¶
- Parameters:
exceptions (list[Exception])
- Return type:
str
- gwproactor.get_app(*, paths_name=None, paths=None, app_settings=None, app_type=<class 'gwproactor.app.App'>, codec_factory=None, sub_types=None, env_file='.env', dry_run=False, verbose=False, message_summary=False, io_loop_verbose=False, io_loop_on_screen=False, aiohttp_logging=False, run_in_thread=False, add_screen_handler=True)¶
- Parameters:
paths_name (str | None)
paths (Paths | None)
app_settings (AppSettings | None)
app_type (type[App])
codec_factory (CodecFactory | None)
sub_types (SubTypes | None)
env_file (str | Path | None)
dry_run (bool)
verbose (bool)
message_summary (bool)
io_loop_verbose (bool)
io_loop_on_screen (bool)
aiohttp_logging (bool)
run_in_thread (bool)
add_screen_handler (bool)
- Return type:
- gwproactor.responsive_sleep(obj, seconds, *, step_duration=0.1, running_field_name='_main_loop_running', running_field=True)¶
Sleep in way that is more responsive to thread termination: sleep in step_duration increments up to specificed seconds, at after each step checking obj._main_loop_running. If the designated running_field_name actually indicates that a stop has been requested (e.g. what you would expect from a field named ‘_stop_requested’), set running_field parameter to False.
- Parameters:
obj (Any)
seconds (float)
step_duration (float)
running_field_name (str)
running_field (bool)
- Return type:
bool
- async gwproactor.run_async_main(*, paths_name=None, paths=None, app_settings=None, app_type=<class 'gwproactor.app.App'>, codec_factory=None, sub_types=None, env_file='.env', dry_run=False, verbose=False, message_summary=False, io_loop_verbose=False, io_loop_on_screen=False, aiohttp_logging=False, add_screen_handler=True)¶
- Parameters:
paths_name (str | None)
paths (Paths | None)
app_settings (AppSettings | None)
app_type (type[App])
codec_factory (CodecFactory | None)
sub_types (SubTypes | None)
env_file (str | Path | None)
dry_run (bool)
verbose (bool)
message_summary (bool)
io_loop_verbose (bool)
io_loop_on_screen (bool)
aiohttp_logging (bool)
add_screen_handler (bool)
- Return type:
None
- gwproactor.setup_logging(args, settings, *, errors=None, add_screen_handler=True, root_gets_handlers=True)¶
Get python logging config based on parsed command line args, defaults, environment variables and logging config file.
The order of precedence is:
Command line arguments
Environment
Defaults
- Parameters:
args (Namespace)
settings (AppSettings)
errors (list[Exception] | None)
add_screen_handler (bool)
root_gets_handlers (bool)
- Return type:
None