gwproactor.links¶
Internal package used to manage communications state. LinkManager is the interface into this package used by Proactor.
- class gwproactor.links.AckManager(timer_mgr, callback, delay=5.0)¶
- Parameters:
timer_mgr (TimerManagerInterface)
callback (Callable[[AckWaitInfo], None])
delay (float | None)
- _acks: dict[str, dict[str, AckWaitInfo]]¶
- _callback: Callable[[AckWaitInfo], None]¶
- _default_delay_seconds: float¶
- _pop_wait_info(link_name, message_id)¶
- Parameters:
link_name (str)
message_id (str)
- Return type:
AckWaitInfo | None
- _timeout(link_name, message_id)¶
- Parameters:
link_name (str)
message_id (str)
- Return type:
None
- _timer_mgr: TimerManagerInterface¶
- add_link(link_name)¶
- Parameters:
link_name (str)
- Return type:
None
- cancel_ack_timer(link_name, message_id)¶
- Parameters:
link_name (str)
message_id (str)
- Return type:
- cancel_ack_timers(link_name)¶
- Parameters:
link_name (str)
- Return type:
list[AckWaitInfo]
- num_acks(link_name)¶
- Parameters:
link_name (str)
- Return type:
int
- start_ack_timer(link_name, message_id, context=None, delay_seconds=None)¶
- Parameters:
link_name (str)
message_id (str)
context (Any | None)
delay_seconds (float | None)
- Return type:
- class gwproactor.links.AckWaitInfo(link_name: str, message_id: str, timer_handle: Any, context: Any = None)¶
- Parameters:
link_name (str)
message_id (str)
timer_handle (Any)
context (Any)
- context: Any = None¶
- link_name: str¶
- message_id: str¶
- timer_handle: Any¶
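The signatures above suggest that pending acks are stored per link and then per message id (`_acks: dict[str, dict[str, AckWaitInfo]]`). The following is a minimal sketch of that bookkeeping only, not the gwproactor implementation. `WaitInfo` and `AckTable` are hypothetical stand-ins, and timer handling is left out (the handle is always `None` here).

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class WaitInfo:
    """Hypothetical stand-in with the same fields as AckWaitInfo."""

    link_name: str
    message_id: str
    timer_handle: Any
    context: Any = None


class AckTable:
    """Hypothetical table mirroring the nested-dict shape of AckManager._acks."""

    def __init__(self) -> None:
        self._acks: dict[str, dict[str, WaitInfo]] = {}

    def add_link(self, link_name: str) -> None:
        # Create an empty per-link table if the link is not yet known.
        self._acks.setdefault(link_name, {})

    def start(self, link_name: str, message_id: str, context: Any = None) -> WaitInfo:
        # A real manager would start a timer here; this sketch stores None.
        info = WaitInfo(link_name, message_id, timer_handle=None, context=context)
        self._acks.setdefault(link_name, {})[message_id] = info
        return info

    def cancel(self, link_name: str, message_id: str) -> Optional[WaitInfo]:
        # Returns None when the ack is unknown or was already removed.
        return self._acks.get(link_name, {}).pop(message_id, None)

    def cancel_all(self, link_name: str) -> list[WaitInfo]:
        pending = self._acks.get(link_name)
        if pending is None:
            return []
        infos = list(pending.values())
        pending.clear()
        return infos

    def num_acks(self, link_name: str) -> int:
        return len(self._acks.get(link_name, {}))


table = AckTable()
table.add_link("upstream")
table.start("upstream", "m1")
table.start("upstream", "m2", context={"retry": 1})
```

Because removal returns `None` for an unknown id, a timeout callback and an ack arriving at nearly the same time can both try to remove the same entry without raising.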
- class gwproactor.links.AsyncioTimerManager¶
- _abc_impl = <_abc._abc_data object>¶
- cancel_timer(timer_handle)¶
Cancel the callback associated with _timer_handle_.
Note that the callback might still run after this call returns.
- Parameters:
timer_handle (Any) – The value returned by start_timer()
- Return type:
None
- start_timer(delay_seconds, callback)¶
Start a timer. The implementation is expected to call _callback_ after approximately _delay_seconds_.
The execution context (e.g. the thread) of the callback must be specified by the implementation.
The callback must have sufficient context available to it to do its work as well as to detect if it is no longer relevant. Note that a callback might run after cancellation if the callback was already “in-flight” at the time of cancellation, and it is up to the callback to tolerate this situation.
- Parameters:
delay_seconds (float) – The approximate delay before the callback is called.
callback (Callable[[], None]) – The function called after delay_seconds.
- Returns:
A timer handle which can be passed to _cancel_timer()_ to cancel the callback.
- Return type:
TimerHandle
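As an illustration of the contract described above, here is a minimal sketch of a timer manager built on `asyncio`'s `loop.call_later()`. `SketchTimerManager` and the `relevant` flag dictionary are hypothetical names used only for this example, and the sketch should not be read as the actual AsyncioTimerManager implementation. The callback checks its own relevance before acting, which is one way to tolerate a late run after cancellation.

```python
import asyncio
from typing import Any, Callable


class SketchTimerManager:
    """Hypothetical timer manager; callbacks run on the event loop's thread."""

    def start_timer(
        self, delay_seconds: float, callback: Callable[[], None]
    ) -> asyncio.TimerHandle:
        # call_later() schedules the callback and returns a cancellable handle.
        return asyncio.get_running_loop().call_later(delay_seconds, callback)

    def cancel_timer(self, timer_handle: Any) -> None:
        timer_handle.cancel()


async def demo() -> list[str]:
    fired: list[str] = []
    relevant = {"a": True, "b": True}
    timers = SketchTimerManager()

    def make_callback(name: str) -> Callable[[], None]:
        def callback() -> None:
            # Guard: do nothing if this timer is no longer relevant.
            if relevant[name]:
                fired.append(name)

        return callback

    timers.start_timer(0.01, make_callback("a"))
    handle_b = timers.start_timer(0.01, make_callback("b"))
    # Mark "b" irrelevant before cancelling, so a late run would be harmless.
    relevant["b"] = False
    timers.cancel_timer(handle_b)
    await asyncio.sleep(0.1)
    return fired


fired = asyncio.run(demo())
```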
- exception gwproactor.links.CommLinkAlreadyExists(name='', current_state=StateName.none, transition=TransitionName.none, *, msg='')¶
- Parameters:
name (str)
current_state (StateName)
transition (TransitionName)
msg (str)
- exception gwproactor.links.CommLinkMissing(name, *, msg='')¶
- Parameters:
name (str)
- exception gwproactor.links.InvalidCommStateInput(name='', current_state=StateName.none, transition=TransitionName.none, *, msg='')¶
- Parameters:
name (str)
current_state (StateName)
transition (TransitionName)
msg (str)
- name: str = ''¶
- transition: TransitionName = 'none'¶
- class gwproactor.links.LinkManager(publication_name, settings, logger, stats, event_persister, timer_manager, ack_timeout_callback)¶
- Parameters:
publication_name (str)
settings (ProactorSettings)
logger (ProactorLogger)
stats (ProactorStats)
event_persister (PersisterInterface)
timer_manager (TimerManagerInterface)
ack_timeout_callback (Callable[[AckWaitInfo], None])
- PERSISTER_ENCODING = 'utf-8'¶
- _acks: AckManager¶
- _continue_reupload(event_ids)¶
- Parameters:
event_ids (list[str])
- Return type:
None
- _event_persister: PersisterInterface¶
- _logger: ProactorLogger¶
- _message_times: MessageTimes¶
- _mqtt_clients: MQTTClients¶
- _mqtt_codecs: dict[str, MQTTCodec]¶
- _recv_activated(transition)¶
- Parameters:
transition (Transition)
- _reupload_event(event_id)¶
Load the event for event_id from storage, decode it to JSON, and send it.
Return either Ok(True) or Err(Problems(list of decoding errors)).
Send errors are handled either by an exception, which will propagate up, or by ack timeout.
- Return type:
Ok[bool] | Err[Problems]
- _settings: ProactorSettings¶
- _start_reupload()¶
- Return type:
None
- _states: LinkStates¶
- _stats: ProactorStats¶
- add_mqtt_link(name, mqtt_config, codec=None, upstream=False, primary_peer=False)¶
- Parameters:
name (str)
mqtt_config (MQTTClient)
codec (MQTTCodec | None)
upstream (bool)
primary_peer (bool)
- decode(link_name, topic, payload)¶
- Parameters:
link_name (str)
topic (str)
payload (bytes)
- Return type:
Message[Any]
- decoder(link_name)¶
- Parameters:
link_name (str)
- Return type:
MQTTCodec | None
- disable_mqtt_loggers()¶
- enable_mqtt_loggers(logger=None)¶
- Parameters:
logger (Logger | LoggerAdapter | None)
- generate_event(event)¶
- Parameters:
event (EventT)
- Return type:
Ok[bool] | Err[BaseException]
- get_message_times(link_name)¶
- Parameters:
link_name (str)
- Return type:
- get_reuploads_str(verbose=True, num_events=5)¶
- Parameters:
verbose (bool)
num_events (int)
- Return type:
str
- link_names()¶
- Return type:
list[str]
- log_subscriptions(tag='')¶
- mqtt_client_wrapper(client_name)¶
- Parameters:
client_name (str)
- Return type:
- mqtt_clients()¶
- Return type:
- num_acks(link_name)¶
- Parameters:
link_name (str)
- Return type:
int
- property num_pending: int¶
- property num_reupload_pending: int¶
- property num_reuploaded_unacked: int¶
- property primary_peer_client: str¶
- process_ack(link_name, message_id)¶
- Parameters:
link_name (str)
message_id (str)
- process_ack_timeout(wait_info)¶
- Parameters:
wait_info (AckWaitInfo)
- Return type:
Ok[LinkManagerTransition] | Err[BaseException]
- process_mqtt_connect_fail(message)¶
- Parameters:
message (Message[MQTTConnectFailPayload])
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_connected(message)¶
- Parameters:
message (Message[MQTTConnectPayload])
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_disconnected(message)¶
- Parameters:
message (Message[MQTTDisconnectPayload])
- Return type:
Ok[LinkManagerTransition] | Err[InvalidCommStateInput]
- process_mqtt_message(message)¶
- Parameters:
message (Message[MQTTReceiptPayload])
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_suback(message)¶
- Parameters:
message (Message[MQTTSubackPayload])
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- publication_name: str¶
- publish_message(client, message, qos=0, context=None)¶
- Parameters:
message (Message)
qos (int)
context (Any)
- Return type:
MQTTMessageInfo
- publish_upstream(payload, qos=QOS.AtMostOnce, **message_args)¶
- Parameters:
qos (QOS)
message_args (Any)
- Return type:
MQTTMessageInfo
- reuploading()¶
- Return type:
bool
- send_ack(link_name, message)¶
- Parameters:
link_name (str)
message (Message[Any])
- Return type:
None
- async send_ping(link_name)¶
- Parameters:
link_name (str)
- start(loop, async_queue)¶
- Parameters:
loop (AbstractEventLoop)
async_queue (Queue)
- Return type:
None
- start_ping_tasks()¶
- Return type:
list[Task]
- stopped(name)¶
- Parameters:
name (str)
- Return type:
bool
- subscribe(client, topic, qos)¶
- Parameters:
client (str)
topic (str)
qos (int)
- Return type:
Tuple[int, int | None]
- subscribed(link_name)¶
- Parameters:
link_name (str)
- Return type:
bool
- update_recv_time(link_name)¶
- Parameters:
link_name (str)
- Return type:
None
- property upstream_client: str¶
- class gwproactor.links.LinkManagerTransition(link_name: str = '', transition_name: gwproactor.links.link_state.TransitionName = <TransitionName.none: 'none'>, old_state: gwproactor.links.link_state.StateName = <StateName.not_started: 'not_started'>, new_state: gwproactor.links.link_state.StateName = <StateName.not_started: 'not_started'>, canceled_acks: list[gwproactor.links.acks.AckWaitInfo] = <factory>)¶
- Parameters:
link_name (str)
transition_name (TransitionName)
old_state (StateName)
new_state (StateName)
canceled_acks (list[AckWaitInfo])
- canceled_acks: list[AckWaitInfo]¶
- class gwproactor.links.LinkMessageTimes(last_send: float = <factory>, last_recv: float = <factory>)¶
- Parameters:
last_send (float)
last_recv (float)
- get_str(link_poll_seconds=60.0, relative=True)¶
- Parameters:
link_poll_seconds (float)
relative (bool)
- Return type:
str
- last_recv: float¶
- last_send: float¶
- next_ping_second(link_poll_seconds)¶
- Parameters:
link_poll_seconds (float)
- Return type:
float
- seconds_until_next_ping(link_poll_seconds)¶
- Parameters:
link_poll_seconds (float)
- Return type:
float
- time_to_send_ping(link_poll_seconds)¶
- Parameters:
link_poll_seconds (float)
- Return type:
bool
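The three ping helpers above are listed without descriptions. One plausible reading, offered purely as an assumption, is that a ping becomes due one poll interval after the last send. The sketch below illustrates that reading with a hypothetical `PingClock` class. The explicit `now` argument is also an addition for the example, so that the arithmetic can be followed without reading the system clock.

```python
from dataclasses import dataclass


@dataclass
class PingClock:
    """Hypothetical stand-in for LinkMessageTimes; the timing rule is assumed."""

    last_send: float
    last_recv: float

    def next_ping_second(self, link_poll_seconds: float) -> float:
        # Assumption: the next ping is due one poll interval after the last send.
        return self.last_send + link_poll_seconds

    def seconds_until_next_ping(self, link_poll_seconds: float, now: float) -> float:
        return self.next_ping_second(link_poll_seconds) - now

    def time_to_send_ping(self, link_poll_seconds: float, now: float) -> bool:
        return self.seconds_until_next_ping(link_poll_seconds, now) <= 0


clock = PingClock(last_send=100.0, last_recv=90.0)
```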
- class gwproactor.links.LinkState(name, curr_state=StateName.not_started)¶
- Parameters:
name (str)
curr_state (State)
- _handle(result)¶
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- active()¶
- active_for_recv()¶
- active_for_send()¶
- curr_state: State¶
- name: str¶
- process_ack_timeout()¶
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_connect_fail()¶
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_connected()¶
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_disconnected()¶
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_message()¶
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_suback(num_pending_subscriptions)¶
- Parameters:
num_pending_subscriptions (int)
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- start()¶
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- stop()¶
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- class gwproactor.links.LinkStates(names=None)¶
- Parameters:
names (Sequence[str] | None)
- add(name, state=StateName.not_started)¶
- link_names()¶
- Return type:
list[str]
- process_ack_timeout(name)¶
- Parameters:
name (str)
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_connect_fail(message)¶
- Parameters:
message (Message[MQTTConnectFailPayload])
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_connected(message)¶
- Parameters:
message (Message[MQTTConnectPayload])
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_disconnected(message)¶
- Parameters:
message (Message[MQTTDisconnectPayload])
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_message(message)¶
- Parameters:
message (Message[MQTTReceiptPayload])
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- process_mqtt_suback(name, num_pending_subscriptions)¶
- Parameters:
name (str)
num_pending_subscriptions (int)
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- start(name)¶
- Parameters:
name (str)
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- start_all()¶
- Return type:
Ok[bool] | Err[Sequence[BaseException]]
- stop(name)¶
- Parameters:
name (str)
- Return type:
Ok[Transition] | Err[InvalidCommStateInput]
- stopped(name)¶
- Parameters:
name (str)
- Return type:
bool
- class gwproactor.links.MQTTClientWrapper(name, client_config, receive_queue)¶
- Parameters:
name (str)
client_config (MQTTClient)
receive_queue (AsyncQueueWriter)
- _client: Client¶
- _client_config: MQTTClient¶
- _client_thread()¶
- _name: str¶
- _pending_subacks: Dict[int, List[str]]¶
- _pending_subscriptions: Set[str]¶
- _receive_queue: AsyncQueueWriter¶
- _stop_requested: bool¶
- _subscriptions: Dict[str, int]¶
- connected()¶
- Return type:
bool
- disable_logger()¶
- enable_logger(logger=None)¶
- Parameters:
logger (Logger | LoggerAdapter | None)
- handle_suback(suback)¶
- Parameters:
suback (MQTTSubackPayload)
- Return type:
int
- num_pending_subscriptions()¶
- Return type:
int
- num_subscriptions()¶
- Return type:
int
- on_connect(_, userdata, flags, rc)¶
- on_connect_fail(_, userdata)¶
- on_disconnect(_, userdata, rc)¶
- on_message(_, userdata, message)¶
- on_subscribe(_, userdata, mid, granted_qos)¶
- publish(topic, payload, qos)¶
- Parameters:
topic (str)
payload (bytes)
qos (int)
- Return type:
MQTTMessageInfo
- start()¶
- stop()¶
- subscribe(topic, qos)¶
- Parameters:
topic (str)
qos (int)
- Return type:
Tuple[int, int | None]
- subscribe_all()¶
- Return type:
Tuple[int, int | None]
- subscribed()¶
- Return type:
bool
- subscription_items()¶
- Return type:
list[Tuple[str, int]]
- unsubscribe(topic)¶
- Parameters:
topic (str)
- Return type:
Tuple[int, int | None]
- class gwproactor.links.MQTTClients¶
- _send_queue: AsyncQueueWriter¶
- add_client(name, client_config, upstream=False, primary_peer=False)¶
- Parameters:
name (str)
client_config (MQTTClient)
upstream (bool)
primary_peer (bool)
- client_wrapper(client)¶
- Parameters:
client (str)
- Return type:
- clients: Dict[str, MQTTClientWrapper]¶
- connected(client)¶
- Parameters:
client (str)
- Return type:
bool
- disable_loggers()¶
- enable_loggers(logger=None)¶
- Parameters:
logger (Logger | LoggerAdapter | None)
- handle_suback(suback)¶
- Parameters:
suback (MQTTSubackPayload)
- Return type:
int
- num_pending_subscriptions(client)¶
- Parameters:
client (str)
- Return type:
int
- num_subscriptions(client)¶
- Parameters:
client (str)
- Return type:
int
- primary_peer()¶
- Return type:
- primary_peer_client: str = ''¶
- publish(client, topic, payload, qos)¶
- Parameters:
client (str)
topic (str)
payload (bytes)
qos (int)
- Return type:
MQTTMessageInfo
- start(loop, async_queue)¶
- Parameters:
loop (AbstractEventLoop)
async_queue (Queue)
- stop()¶
- subscribe(client, topic, qos)¶
- Parameters:
client (str)
topic (str)
qos (int)
- Return type:
Tuple[int, int | None]
- subscribe_all(client)¶
- Parameters:
client (str)
- Return type:
Tuple[int, int | None]
- subscribed(client)¶
- Parameters:
client (str)
- Return type:
bool
- unsubscribe(client, topic)¶
- Parameters:
client (str)
topic (str)
- Return type:
Tuple[int, int | None]
- upstream()¶
- Return type:
- upstream_client: str = ''¶
- class gwproactor.links.MessageTimes¶
- _links: dict[str, LinkMessageTimes]¶
- add_link(name)¶
- Parameters:
name (str)
- Return type:
None
- get_copy(link_name)¶
- Parameters:
link_name (str)
- Return type:
- link_names()¶
- Return type:
list[str]
- update_recv(link_name, now=None)¶
- Parameters:
link_name (str)
now (float | None)
- Return type:
None
- update_send(link_name, now=None)¶
- Parameters:
link_name (str)
now (float | None)
- Return type:
None
- class gwproactor.links.QOS(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶
- AtLeastOnce = 1¶
- AtMostOnce = 0¶
- ExactlyOnce = 2¶
- class gwproactor.links.Reuploads(logger, num_initial_events=5)¶
Track event uids that are part of a re-upload, both those that have not yet been sent (pending) and those that are “in-flight” (unacked). Upon ack, update records and return the next message to be sent.
- Parameters:
logger (ProactorLogger)
num_initial_events (int)
- NUM_INITIAL_EVENTS: int = 5¶
Default number of events to send when re-upload starts.
- _log_start_reupload(num_pending_events, num_reupload_now)¶
- _logger: ProactorLogger¶
- _num_initial_events: int¶
Number of events to send when re-upload starts.
- _reupload_pending: dict[str, None]¶
‘Ordered set’ of unsent events that are part of this reupload. A dict is used to provide insertion order and fast lookup.
- _reuploaded_unacked: dict[str, None]¶
‘Ordered set’ of sent but as-yet unacked events. A dict is used to provide insertion order and fast lookup.
- clear()¶
- Return type:
None
- clear_unacked_event(ack_id)¶
- Parameters:
ack_id (str)
- Return type:
None
- get_str(verbose=True, num_events=5)¶
- Parameters:
verbose (bool)
num_events (int)
- Return type:
str
- property num_reupload_pending: int¶
- property num_reuploaded_unacked: int¶
- process_ack_for_reupload(ack_id)¶
If ack_id is in our “unacked” store, remove it from the unacked store. If any events remain in our “pending” store, move the first pending event from the pending store to the unacked store and return it for sending next.
- Parameters:
ack_id (str)
- Return type:
list[str]
- reuploading()¶
- Return type:
bool
- start_reupload(pending_events)¶
Track all pending_events for reupload. Of the pending_events, record the first _num_initial_events as “unacked” and the rest as “pending”. Return the “unacked” group so they can be sent.
- Parameters:
pending_events (list[str])
- Return type:
list[str]
- stats: LinkStats | None = None¶
Object in which reupload start and completion are recorded. Set during LinkManager.start(), since the upstream client does not exist during LinkManager construction.
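The windowing behaviour described for start_reupload() and process_ack_for_reupload() can be sketched with two insertion-ordered dicts used as ordered sets. This is a minimal illustration under stated assumptions, not the Reuploads source: `ReuploadTracker` is a hypothetical name, logging and stats are omitted, and the sketch assumes that an ack for an unknown id sends nothing new.

```python
class ReuploadTracker:
    """Hypothetical stand-in for Reuploads, tracking pending and unacked ids."""

    def __init__(self, num_initial_events: int = 5) -> None:
        self._num_initial_events = num_initial_events
        # dicts with None values act as ordered sets with fast lookup.
        self._pending: dict[str, None] = {}
        self._unacked: dict[str, None] = {}

    def start_reupload(self, pending_events: list[str]) -> list[str]:
        # The first N events are sent now; the rest wait for acks.
        first = pending_events[: self._num_initial_events]
        rest = pending_events[self._num_initial_events :]
        self._unacked = dict.fromkeys(first)
        self._pending = dict.fromkeys(rest)
        return list(self._unacked)

    def process_ack_for_reupload(self, ack_id: str) -> list[str]:
        # Assumption: acks for ids we are not tracking do not advance the window.
        if ack_id not in self._unacked:
            return []
        del self._unacked[ack_id]
        if not self._pending:
            return []
        # Move the oldest pending event into the unacked set and send it next.
        next_id = next(iter(self._pending))
        del self._pending[next_id]
        self._unacked[next_id] = None
        return [next_id]

    def reuploading(self) -> bool:
        return bool(self._pending or self._unacked)


tracker = ReuploadTracker(num_initial_events=2)
sent_first = tracker.start_reupload(["e1", "e2", "e3", "e4"])
```

With this scheme the number of in-flight events never exceeds the initial window, since each ack releases at most one further event.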
- exception gwproactor.links.RuntimeLinkStateError(name='', current_state=StateName.none, transition=TransitionName.none, *, msg='')¶
- Parameters:
name (str)
current_state (StateName)
transition (TransitionName)
msg (str)
- class gwproactor.links.StateName(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶
- active = 'active'¶
- awaiting_peer = 'awaiting_peer'¶
- awaiting_setup = 'awaiting_setup'¶
- awaiting_setup_and_peer = 'awaiting_setup_and_peer'¶
- connecting = 'connecting'¶
- none = 'none'¶
- not_started = 'not_started'¶
- stopped = 'stopped'¶
- class gwproactor.links.Subscription(Topic, Qos)¶
- Parameters:
Topic (str)
Qos (QOS)
- Topic: str¶
Alias for field number 0
- _asdict()¶
Return a new dict which maps field names to their values.
- _field_defaults = {}¶
- _fields = ('Topic', 'Qos')¶
- classmethod _make(iterable)¶
Make a new Subscription object from a sequence or iterable
- _replace(**kwds)¶
Return a new Subscription object replacing specified fields with new values
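The members listed for Subscription are the standard named-tuple helpers. The sketch below uses local stand-ins to show how they behave; it does not import gwproactor. The `QOS` values are copied from the QOS entry in this reference, while its `IntEnum` base class and the topic strings are assumptions made for the example.

```python
import enum
from typing import NamedTuple


class QOS(enum.IntEnum):
    """Local stand-in; values match the QOS entry, the base class is assumed."""

    AtMostOnce = 0
    AtLeastOnce = 1
    ExactlyOnce = 2


class Subscription(NamedTuple):
    """Local stand-in with the same field names as gwproactor.links.Subscription."""

    Topic: str
    Qos: QOS


sub = Subscription("example/topic", QOS.AtLeastOnce)
as_dict = sub._asdict()  # field names mapped to values
upgraded = sub._replace(Qos=QOS.ExactlyOnce)  # new object; sub is unchanged
rebuilt = Subscription._make(["example/other", QOS.AtMostOnce])
```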
- class gwproactor.links.TimerManagerInterface¶
Simple interface to infrastructure which can start timers, run callbacks on timer completion, and cancel timers.
- _abc_impl = <_abc._abc_data object>¶
- abstract cancel_timer(timer_handle)¶
Cancel the callback associated with _timer_handle_.
Note that the callback might still run after this call returns.
- Parameters:
timer_handle (Any) – The value returned by start_timer()
- Return type:
None
- abstract start_timer(delay_seconds, callback)¶
Start a timer. The implementation is expected to call _callback_ after approximately _delay_seconds_.
The execution context (e.g. the thread) of the callback must be specified by the implementation.
The callback must have sufficient context available to it to do its work as well as to detect if it is no longer relevant. Note that a callback might run after cancellation if the callback was already “in-flight” at the time of cancellation, and it is up to the callback to tolerate this situation.
- Parameters:
delay_seconds (float) – The approximate delay before the callback is called.
callback (Callable[[], None]) – The function called after delay_seconds.
- Returns:
A timer handle which can be passed to _cancel_timer()_ to cancel the callback.
- Return type:
Any
- class gwproactor.links.Transition(link_name: str = '', transition_name: gwproactor.links.link_state.TransitionName = <TransitionName.none: 'none'>, old_state: gwproactor.links.link_state.StateName = <StateName.not_started: 'not_started'>, new_state: gwproactor.links.link_state.StateName = <StateName.not_started: 'not_started'>)¶
- Parameters:
link_name (str)
transition_name (TransitionName)
old_state (StateName)
new_state (StateName)
- activated()¶
- active()¶
- deactivated()¶
- link_name: str = ''¶
- recv_activated()¶
- Return type:
bool
- recv_deactivated()¶
- Return type:
bool
- send_activated()¶
- Return type:
bool
- send_deactivated()¶
- Return type:
bool
- send_is_active()¶
- Return type:
bool
- transition_name: TransitionName = 'none'¶
- class gwproactor.links.TransitionName(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶
- message_from_peer = 'message_from_peer'¶
- mqtt_connect_failed = 'mqtt_connect_failed'¶
- mqtt_connected = 'mqtt_connected'¶
- mqtt_disconnected = 'mqtt_disconnected'¶
- mqtt_suback = 'mqtt_suback'¶
- none = 'none'¶
- response_timeout = 'response_timeout'¶
- start_called = 'start_called'¶
- stop_called = 'stop_called'¶