gwproactor.links#
Internal package used to manage communications state. LinkManager is the interface into this package used by Proactor.
- class gwproactor.links.AckManager(timer_mgr, callback, delay=5.0)#
- Parameters:
timer_mgr (TimerManagerInterface) –
callback (Callable[[AckWaitInfo], None]) –
delay (float | None) –
- _acks: dict[str, dict[str, AckWaitInfo]]#
- _callback: Callable[[AckWaitInfo], None]#
- _default_delay_seconds: float#
- _pop_wait_info(link_name, message_id)#
- Parameters:
link_name (str) –
message_id (str) –
- Return type:
AckWaitInfo | None
- _timeout(link_name, message_id)#
- Parameters:
link_name (str) –
message_id (str) –
- Return type:
None
- _timer_mgr: TimerManagerInterface#
- add_link(link_name)#
- Parameters:
link_name (str) –
- Return type:
None
- cancel_ack_timer(link_name, message_id)#
- Parameters:
link_name (str) –
message_id (str) –
- Return type:
- cancel_ack_timers(link_name)#
- Parameters:
link_name (str) –
- Return type:
list[AckWaitInfo]
- num_acks(link_name)#
- Parameters:
link_name (str) –
- Return type:
int
- start_ack_timer(link_name, message_id, context=None, delay_seconds=None)#
- Parameters:
link_name (str) –
message_id (str) –
context (Any | None) –
delay_seconds (float | None) –
- Return type:
- class gwproactor.links.AckWaitInfo(link_name: str, message_id: str, timer_handle: Any, context: Any = None)#
- Parameters:
link_name (str) –
message_id (str) –
timer_handle (Any) –
context (Any) –
- context: Any = None#
- link_name: str#
- message_id: str#
- timer_handle: Any#
- class gwproactor.links.AsyncioTimerManager#
- _abc_impl = <_abc._abc_data object>#
- cancel_timer(timer_handle)#
Cancel callback associated with _timer_handle_.
Note that callback might still run after this call returns.
- Parameters:
timer_handle (Any) – The value returned by start_timer()
- Return type:
None
- start_timer(delay_seconds, callback)#
Start a timer. Implementation is expected to call _callback_ after approximately _delay_seconds_.
The execution context (e.g. the thread) of the callback must be specified by the implementation.
The callback must have sufficient context available to it to do its work as well as to detect if it is no longer relevant. Note a callback might run after cancellation if the callback was already “in-flight” at time of cancellation and it is up to the callback to tolerate this situation.
- Parameters:
delay_seconds (float) – The approximate delay before the callback is called.
callback (Callable[[], None]) – The function called after delay_seconds.
- Returns:
A timer handle which can be passed to _cancel_timer()_ to cancel the callback.
- Return type:
TimerHandle
- exception gwproactor.links.CommLinkAlreadyExists(name='', current_state=StateName.none, transition=TransitionName.none, *, msg='')#
- Parameters:
name (str) –
current_state (StateName) –
transition (TransitionName) –
msg (str) –
- exception gwproactor.links.CommLinkMissing(name, *, msg='')#
- Parameters:
name (str) –
- exception gwproactor.links.InvalidCommStateInput(name='', current_state=StateName.none, transition=TransitionName.none, *, msg='')#
- Parameters:
name (str) –
current_state (StateName) –
transition (TransitionName) –
msg (str) –
- name: str = ''#
- transition: TransitionName = 'none'#
- class gwproactor.links.LinkManager(publication_name, settings, logger, stats, event_persister, timer_manager, ack_timeout_callback)#
- Parameters:
publication_name (str) –
settings (ProactorSettings) –
logger (ProactorLogger) –
stats (ProactorStats) –
event_persister (PersisterInterface) –
timer_manager (TimerManagerInterface) –
ack_timeout_callback (Callable[[AckWaitInfo], None]) –
- PERSISTER_ENCODING = 'utf-8'#
- _acks: AckManager#
- _event_persister: PersisterInterface#
- _logger: ProactorLogger#
- _message_times: MessageTimes#
- _mqtt_clients: MQTTClients#
- _mqtt_codecs: dict[str, MQTTCodec]#
- _recv_activated(transition)#
- Parameters:
transition (Transition) –
- _reupload_events(event_ids)#
- Parameters:
event_ids (list[str]) –
- Return type:
Ok[bool] | Err[BaseException]
- _settings: ProactorSettings#
- _start_reupload()#
- Return type:
None
- _states: LinkStates#
- _stats: ProactorStats#
- add_mqtt_link(name, mqtt_config, codec=None, upstream=False, primary_peer=False)#
- Parameters:
name (str) –
mqtt_config (MQTTClient) –
codec (MQTTCodec | None) –
upstream (bool) –
primary_peer (bool) –
- decode(link_name, topic, payload)#
- Parameters:
link_name (str) –
topic (str) –
payload (bytes) –
- Return type:
Message[Any]
- decoder(link_name)#
- Parameters:
link_name (str) –
- Return type:
MQTTCodec | None
- disable_mqtt_loggers()#
- enable_mqtt_loggers(logger=None)#
- Parameters:
logger (Logger | LoggerAdapter | None) –
- generate_event(event)#
- Parameters:
event (EventT) –
- Return type:
Ok[bool] | Err[BaseException]
- get_message_times(link_name)#
- Parameters:
link_name (str) –
- Return type:
- link_names()#
- Return type:
list[str]
- log_subscriptions(tag='')#
- mqtt_client_wrapper(client_name)#
- Parameters:
client_name (str) –
- Return type:
- mqtt_clients()#
- Return type:
- num_acks(link_name)#
- Parameters:
link_name (str) –
- Return type:
int
- property num_pending: int#
- property num_reupload_pending: int#
- property num_reuploaded_unacked: int#
- property primary_peer_client: str#
- process_ack(link_name, message_id)#
- Parameters:
link_name (str) –
message_id (str) –
- process_ack_timeout(wait_info)#
- Parameters:
wait_info (AckWaitInfo) –
- Return type:
Ok[LinkManagerTransition] | Err[BaseException]
- process_mqtt_connect_fail(message)#
- Parameters:
message (Message[MQTTConnectFailPayload]) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_connected(message)#
- Parameters:
message (Message[MQTTConnectPayload]) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_disconnected(message)#
- Parameters:
message (Message[MQTTDisconnectPayload]) –
- Return type:
Ok[LinkManagerTransition] | Err[InvalidCommStateInput]
- process_mqtt_message(message)#
- Parameters:
message (Message[MQTTReceiptPayload]) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_suback(message)#
- Parameters:
message (Message[MQTTSubackPayload]) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- publication_name: str#
- publish_message(client, message, qos=0, context=None)#
- Parameters:
message (Message) –
qos (int) –
context (Any) –
- Return type:
MQTTMessageInfo
- publish_upstream(payload, qos=QOS.AtMostOnce, **message_args)#
- Parameters:
qos (QOS) –
message_args (Any) –
- Return type:
MQTTMessageInfo
- reuploading()#
- Return type:
bool
- send_ack(link_name, message)#
- Parameters:
link_name (str) –
message (Message[Any]) –
- Return type:
None
- async send_ping(link_name)#
- Parameters:
link_name (str) –
- start(loop, async_queue)#
- Parameters:
loop (AbstractEventLoop) –
async_queue (Queue) –
- Return type:
None
- start_ping_tasks()#
- Return type:
list[Task]
- stopped(name)#
- Parameters:
name (str) –
- Return type:
bool
- subscribe(client, topic, qos)#
- Parameters:
client (str) –
topic (str) –
qos (int) –
- Return type:
Tuple[int, int | None]
- subscribed(link_name)#
- Parameters:
link_name (str) –
- Return type:
bool
- update_recv_time(link_name)#
- Parameters:
link_name (str) –
- Return type:
None
- property upstream_client: str#
- class gwproactor.links.LinkManagerTransition(link_name: str = '', transition_name: gwproactor.links.link_state.TransitionName = <TransitionName.none: 'none'>, old_state: gwproactor.links.link_state.StateName = <StateName.not_started: 'not_started'>, new_state: gwproactor.links.link_state.StateName = <StateName.not_started: 'not_started'>, canceled_acks: list[gwproactor.links.acks.AckWaitInfo] = <factory>)#
- Parameters:
link_name (str) –
transition_name (TransitionName) –
old_state (StateName) –
new_state (StateName) –
canceled_acks (list[AckWaitInfo]) –
- canceled_acks: list[AckWaitInfo]#
- class gwproactor.links.LinkMessageTimes(last_send: float = <factory>, last_recv: float = <factory>)#
- Parameters:
last_send (float) –
last_recv (float) –
- get_str(link_poll_seconds=60.0, relative=True)#
- Parameters:
link_poll_seconds (float) –
relative (bool) –
- Return type:
str
- last_recv: float#
- last_send: float#
- next_ping_second(link_poll_seconds)#
- Parameters:
link_poll_seconds (float) –
- Return type:
float
- seconds_until_next_ping(link_poll_seconds)#
- Parameters:
link_poll_seconds (float) –
- Return type:
float
- time_to_send_ping(link_poll_seconds)#
- Parameters:
link_poll_seconds (float) –
- Return type:
bool
- class gwproactor.links.LinkState(name, curr_state=StateName.not_started)#
- Parameters:
name (str) –
curr_state (State) –
- _handle(result)#
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- active()#
- active_for_recv()#
- active_for_send()#
- curr_state: State#
- name: str#
- process_ack_timeout()#
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_connect_fail()#
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_connected()#
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_disconnected()#
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_message()#
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_suback(num_pending_subscriptions)#
- Parameters:
num_pending_subscriptions (int) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- start()#
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- stop()#
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- class gwproactor.links.LinkStates(names=None)#
- Parameters:
names (Sequence[str] | None) –
- add(name, state=StateName.not_started)#
- link_names()#
- Return type:
list[str]
- process_ack_timeout(name)#
- Parameters:
name (str) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_connect_fail(message)#
- Parameters:
message (Message[MQTTConnectFailPayload]) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_connected(message)#
- Parameters:
message (Message[MQTTConnectPayload]) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_disconnected(message)#
- Parameters:
message (Message[MQTTDisconnectPayload]) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_message(message)#
- Parameters:
message (Message[MQTTReceiptPayload]) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_suback(name, num_pending_subscriptions)#
- Parameters:
name (str) –
num_pending_subscriptions (int) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- start(name)#
- Parameters:
name (str) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- start_all()#
- Return type:
Ok[bool] | Err[Sequence[BaseException]]
- stop(name)#
- Parameters:
name (str) –
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- stopped(name)#
- Parameters:
name (str) –
- Return type:
bool
- class gwproactor.links.MQTTClientWrapper(name, client_config, receive_queue)#
- Parameters:
name (str) –
client_config (MQTTClient) –
receive_queue (AsyncQueueWriter) –
- _client: Client#
- _client_config: MQTTClient#
- _client_thread()#
- _name: str#
- _pending_subacks: Dict[int, List[str]]#
- _pending_subscriptions: Set[str]#
- _receive_queue: AsyncQueueWriter#
- _stop_requested: bool#
- _subscriptions: Dict[str, int]#
- connected()#
- Return type:
bool
- disable_logger()#
- enable_logger(logger=None)#
- Parameters:
logger (Logger | LoggerAdapter | None) –
- handle_suback(suback)#
- Parameters:
suback (MQTTSubackPayload) –
- Return type:
int
- num_pending_subscriptions()#
- Return type:
int
- num_subscriptions()#
- Return type:
int
- on_connect(_, userdata, flags, rc)#
- on_connect_fail(_, userdata)#
- on_disconnect(_, userdata, rc)#
- on_message(_, userdata, message)#
- on_subscribe(_, userdata, mid, granted_qos)#
- publish(topic, payload, qos)#
- Parameters:
topic (str) –
payload (bytes) –
qos (int) –
- Return type:
MQTTMessageInfo
- start()#
- stop()#
- subscribe(topic, qos)#
- Parameters:
topic (str) –
qos (int) –
- Return type:
Tuple[int, int | None]
- subscribe_all()#
- Return type:
Tuple[int, int | None]
- subscribed()#
- Return type:
bool
- subscription_items()#
- Return type:
list[Tuple[str, int]]
- unsubscribe(topic)#
- Parameters:
topic (str) –
- Return type:
Tuple[int, int | None]
- class gwproactor.links.MQTTClients#
- _send_queue: AsyncQueueWriter#
- add_client(name, client_config, upstream=False, primary_peer=False)#
- Parameters:
name (str) –
client_config (MQTTClient) –
upstream (bool) –
primary_peer (bool) –
- client_wrapper(client)#
- Parameters:
client (str) –
- Return type:
- clients: Dict[str, MQTTClientWrapper]#
- connected(client)#
- Parameters:
client (str) –
- Return type:
bool
- disable_loggers()#
- enable_loggers(logger=None)#
- Parameters:
logger (Logger | LoggerAdapter | None) –
- handle_suback(suback)#
- Parameters:
suback (MQTTSubackPayload) –
- Return type:
int
- num_pending_subscriptions(client)#
- Parameters:
client (str) –
- Return type:
int
- num_subscriptions(client)#
- Parameters:
client (str) –
- Return type:
int
- primary_peer()#
- Return type:
- primary_peer_client: str = ''#
- publish(client, topic, payload, qos)#
- Parameters:
client (str) –
topic (str) –
payload (bytes) –
qos (int) –
- Return type:
MQTTMessageInfo
- start(loop, async_queue)#
- Parameters:
loop (AbstractEventLoop) –
async_queue (Queue) –
- stop()#
- subscribe(client, topic, qos)#
- Parameters:
client (str) –
topic (str) –
qos (int) –
- Return type:
Tuple[int, int | None]
- subscribe_all(client)#
- Parameters:
client (str) –
- Return type:
Tuple[int, int | None]
- subscribed(client)#
- Parameters:
client (str) –
- Return type:
bool
- unsubscribe(client, topic)#
- Parameters:
client (str) –
topic (str) –
- Return type:
Tuple[int, int | None]
- upstream()#
- Return type:
- upstream_client: str = ''#
- class gwproactor.links.MessageTimes#
- _links: dict[str, LinkMessageTimes]#
- add_link(name)#
- Parameters:
name (str) –
- Return type:
None
- get_copy(link_name)#
- Parameters:
link_name (str) –
- Return type:
- link_names()#
- Return type:
list[str]
- update_recv(link_name, now=None)#
- Parameters:
link_name (str) –
now (float | None) –
- Return type:
None
- update_send(link_name, now=None)#
- Parameters:
link_name (str) –
now (float | None) –
- Return type:
None
- class gwproactor.links.QOS(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
- AtLeastOnce = 1#
- AtMostOnce = 0#
- ExactlyOnce = 2#
- class gwproactor.links.Reuploads(event_persister, logger, num_initial_events=5)#
- Parameters:
event_persister (PersisterInterface) –
logger (ProactorLogger) –
num_initial_events (int) –
- NUM_INITIAL_EVENTS: int = 5#
- _event_persister: PersisterInterface#
- _logger: ProactorLogger#
- _num_initial_events: int#
- _reupload_pending: dict[str, None]#
- _reuploaded_unacked: dict[str, None]#
- clear()#
- Return type:
None
- property num_reupload_pending: int#
- property num_reuploaded_unacked: int#
- process_ack_for_reupload(message_id)#
- Parameters:
message_id (str) –
- Return type:
list[str]
- reuploading()#
- Return type:
bool
- start_reupload()#
- Return type:
list[str]
- exception gwproactor.links.RuntimeLinkStateError(name='', current_state=StateName.none, transition=TransitionName.none, *, msg='')#
- Parameters:
name (str) –
current_state (StateName) –
transition (TransitionName) –
msg (str) –
- class gwproactor.links.StateName(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
- active = 'active'#
- awaiting_peer = 'awaiting_peer'#
- awaiting_setup = 'awaiting_setup'#
- awaiting_setup_and_peer = 'awaiting_setup_and_peer'#
- connecting = 'connecting'#
- none = 'none'#
- not_started = 'not_started'#
- stopped = 'stopped'#
- class gwproactor.links.Subscription(Topic, Qos)#
- Parameters:
Topic (str) –
Qos (QOS) –
- Topic: str#
Alias for field number 0
- _asdict()#
Return a new dict which maps field names to their values.
- _field_defaults = {}#
- _fields = ('Topic', 'Qos')#
- classmethod _make(iterable)#
Make a new Subscription object from a sequence or iterable
- _replace(**kwds)#
Return a new Subscription object replacing specified fields with new values
- class gwproactor.links.TimerManagerInterface#
Simple interface to infrastructure which can start timers, run callbacks on timer completion, and cancel timers.
- _abc_impl = <_abc._abc_data object>#
- abstract cancel_timer(timer_handle)#
Cancel callback associated with _timer_handle_.
Note that callback might still run after this call returns.
- Parameters:
timer_handle (Any) – The value returned by start_timer()
- Return type:
None
- abstract start_timer(delay_seconds, callback)#
Start a timer. Implementation is expected to call _callback_ after approximately _delay_seconds_.
The execution context (e.g. the thread) of the callback must be specified by the implementation.
The callback must have sufficient context available to it to do its work as well as to detect if it is no longer relevant. Note a callback might run after cancellation if the callback was already “in-flight” at time of cancellation and it is up to the callback to tolerate this situation.
- Parameters:
delay_seconds (float) – The approximate delay before the callback is called.
callback (Callable[[], None]) – The function called after delay_seconds.
- Returns:
A timer handle which can be passed to _cancel_timer()_ to cancel the callback.
- Return type:
Any
- class gwproactor.links.Transition(link_name: str = '', transition_name: gwproactor.links.link_state.TransitionName = <TransitionName.none: 'none'>, old_state: gwproactor.links.link_state.StateName = <StateName.not_started: 'not_started'>, new_state: gwproactor.links.link_state.StateName = <StateName.not_started: 'not_started'>)#
- Parameters:
link_name (str) –
transition_name (TransitionName) –
old_state (StateName) –
new_state (StateName) –
- activated()#
- active()#
- deactivated()#
- link_name: str = ''#
- recv_activated()#
- Return type:
bool
- recv_deactivated()#
- Return type:
bool
- send_activated()#
- Return type:
bool
- send_deactivated()#
- Return type:
bool
- send_is_active()#
- Return type:
bool
- transition_name: TransitionName = 'none'#
- class gwproactor.links.TransitionName(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)#
- message_from_peer = 'message_from_peer'#
- mqtt_connect_failed = 'mqtt_connect_failed'#
- mqtt_connected = 'mqtt_connected'#
- mqtt_disconnected = 'mqtt_disconnected'#
- mqtt_suback = 'mqtt_suback'#
- none = 'none'#
- response_timeout = 'response_timeout'#
- start_called = 'start_called'#
- stop_called = 'stop_called'#