Gridworks Proactor documentation

gwproactor.links¶

Internal package used to manage communications state. LinkManager is the interface into this package used by Proactor.

class gwproactor.links.AckManager(timer_mgr, callback, delay=5.0)¶
Parameters:
  • timer_mgr (TimerManagerInterface)

  • callback (Callable[[AckWaitInfo], None])

  • delay (float | None)

_acks: dict[str, dict[str, AckWaitInfo]]¶
_callback: Callable[[AckWaitInfo], None]¶
_default_delay_seconds: float¶
_pop_wait_info(link_name, message_id)¶
Parameters:
  • link_name (str)

  • message_id (str)

Return type:

AckWaitInfo | None

_timeout(link_name, message_id)¶
Parameters:
  • link_name (str)

  • message_id (str)

Return type:

None

_timer_mgr: TimerManagerInterface¶
add_link(link_name)¶
Parameters:

link_name (str)

Return type:

None

cancel_ack_timer(link_name, message_id)¶
Parameters:
  • link_name (str)

  • message_id (str)

Return type:

AckWaitInfo

cancel_ack_timers(link_name)¶
Parameters:

link_name (str)

Return type:

list[AckWaitInfo]

property default_delay_seconds: float¶
num_acks(link_name)¶
Parameters:

link_name (str)

Return type:

int

start_ack_timer(link_name, message_id, context=None, delay_seconds=None)¶
Parameters:
  • link_name (str)

  • message_id (str)

  • context (Any | None)

  • delay_seconds (float | None)

Return type:

AckWaitInfo

class gwproactor.links.AckWaitInfo(link_name: str, message_id: str, timer_handle: Any, context: Any = None)¶
Parameters:
  • link_name (str)

  • message_id (str)

  • timer_handle (Any)

  • context (Any)

context: Any = None¶
link_name: str¶
message_id: str¶
timer_handle: Any¶
class gwproactor.links.AsyncioTimerManager¶
_abc_impl = <_abc._abc_data object>¶
cancel_timer(timer_handle)¶

Cancel the callback associated with _timer_handle_.

Note that the callback might still run after this call returns.

Parameters:

timer_handle (Any) – The value returned by start_timer()

Return type:

None

start_timer(delay_seconds, callback)¶

Start a timer. The implementation is expected to call _callback_ after approximately _delay_seconds_.

The execution context (e.g. the thread) of the callback must be specified by the implementation.

The callback must have sufficient context available to do its work and to detect whether it is no longer relevant. Note that a callback might run after cancellation if it was already “in-flight” at the time of cancellation; it is up to the callback to tolerate this situation.

Parameters:
  • delay_seconds (float) – The approximate delay before the callback is called.

  • callback (Callable[[], None]) – The function called after delay_seconds.

Returns:

A timer handle which can be passed to _cancel_timer()_ to cancel the callback.

Return type:

TimerHandle
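As a rough illustration of the start/cancel contract above, the sketch below builds a timer manager on asyncio's call_later(). The class name SketchAsyncioTimers and the demo() helper are hypothetical and are not part of gwproactor; that the documented TimerHandle return type corresponds to asyncio.TimerHandle is also an assumption.

```python
import asyncio
from typing import Callable


class SketchAsyncioTimers:
    """Hypothetical stand-in, not the gwproactor class."""

    def start_timer(
        self, delay_seconds: float, callback: Callable[[], None]
    ) -> asyncio.TimerHandle:
        # call_later() runs the callback on the event loop's own thread.
        return asyncio.get_running_loop().call_later(delay_seconds, callback)

    def cancel_timer(self, timer_handle: asyncio.TimerHandle) -> None:
        # Cancels a callback that has not started yet.
        timer_handle.cancel()


async def demo() -> list[str]:
    fired: list[str] = []
    timers = SketchAsyncioTimers()
    timers.start_timer(0.01, lambda: fired.append("kept"))
    handle = timers.start_timer(0.01, lambda: fired.append("canceled"))
    timers.cancel_timer(handle)
    # Wait long enough for the surviving timer to fire.
    await asyncio.sleep(0.05)
    return fired


result = asyncio.run(demo())
```

In this sketch only the timer that was not canceled records anything. Timer managers that run callbacks on another thread cannot give that guarantee, which is why the contract asks callbacks to tolerate a late call.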

exception gwproactor.links.CommLinkAlreadyExists(name='', current_state=StateName.none, transition=TransitionName.none, *, msg='')¶
Parameters:
  • name (str)

  • current_state (StateName)

  • transition (TransitionName)

  • msg (str)

Return type:

None

exception gwproactor.links.CommLinkMissing(name, *, msg='')¶
Parameters:
  • name (str)

  • msg (str)

Return type:

None

exception gwproactor.links.InvalidCommStateInput(name='', current_state=StateName.none, transition=TransitionName.none, *, msg='')¶
Parameters:
  • name (str)

  • current_state (StateName)

  • transition (TransitionName)

  • msg (str)

Return type:

None

current_state: StateName = 'none'¶
name: str = ''¶
transition: TransitionName = 'none'¶
class gwproactor.links.LinkManager(publication_name, subscription_name, settings, logger, stats, event_persister, timer_manager, ack_timeout_callback)¶
Parameters:
  • publication_name (str)

  • subscription_name (str)

  • settings (ProactorSettings)

  • logger (ProactorLogger)

  • stats (ProactorStats)

  • event_persister (PersisterInterface)

  • timer_manager (TimerManagerInterface)

  • ack_timeout_callback (Callable[[AckWaitInfo], None])

PERSISTER_ENCODING = 'utf-8'¶
_acks: AckManager¶
_continue_reupload(event_ids)¶
Parameters:

event_ids (list[str])

Return type:

None

_event_persister: PersisterInterface¶
_logger: ProactorLogger¶
_message_times: MessageTimes¶
_mqtt_clients: MQTTClients¶
_mqtt_codecs: dict[str, MQTTCodec]¶
_recv_activated(transition)¶
Parameters:

transition (Transition)

Return type:

None

_reupload_event(event_id)¶

Load the event for event_id from storage, decode it to JSON, and send it.

Return either Ok(True) or Err(Problems(list of decoding errors)).

Send errors are handled either by an exception, which will propagate up, or by ack timeout.

Parameters:

event_id (str)

Return type:

Ok[bool] | Err[Problems]
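The Ok/Err return convention described above can be sketched as follows. This is only an illustration under stated assumptions: Ok and Err here are minimal local stand-ins rather than the types gwproactor uses, a plain list stands in for Problems, and reupload_sketch() with its sent list is a hypothetical replacement for loading from the persister and publishing.

```python
import json
from dataclasses import dataclass
from typing import Union


@dataclass
class Ok:
    value: bool


@dataclass
class Err:
    errors: list  # stand-in for Problems


def reupload_sketch(stored: bytes, sent: list) -> Union[Ok, Err]:
    """Decode stored bytes as UTF-8 JSON and 'send' the result."""
    try:
        event = json.loads(stored.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        # Decoding problems are returned, not raised.
        return Err([e])
    # Stand-in for publishing; an exception raised here would propagate.
    sent.append(event)
    return Ok(True)
```

A caller would branch on the returned type, for example logging the collected errors on Err and continuing with the next event id.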

_reuploads: Reuploads¶
_settings: ProactorSettings¶
_start_reupload()¶
Return type:

None

_states: LinkStates¶
_stats: ProactorStats¶
property ack_manager: AckManager¶
add_mqtt_link(settings)¶
Parameters:

settings (LinkSettings)

Return type:

None

decode(link_name, topic, payload)¶
Parameters:
  • link_name (str)

  • topic (str)

  • payload (bytes)

Return type:

Message[Any]

decoder(link_name)¶
Parameters:

link_name (str)

Return type:

MQTTCodec | None

disable_mqtt_loggers()¶
Return type:

None

property downstream_client: str¶
enable_mqtt_loggers(logger=None)¶
Parameters:

logger (Logger | LoggerAdapter | None)

Return type:

None

generate_event(event)¶
Parameters:

event (EventT)

Return type:

Ok[bool] | Err[Exception]

get_message_times(link_name)¶
Parameters:

link_name (str)

Return type:

LinkMessageTimes

get_reuploads_str(verbose=True, num_events=5)¶
Parameters:
  • verbose (bool)

  • num_events (int)

Return type:

str

link(name)¶
Parameters:

name (str)

Return type:

LinkState | None

link_names()¶
Return type:

list[str]

link_state(name)¶
Parameters:

name (str)

Return type:

StateName | None

log_subscriptions(tag='')¶
Parameters:

tag (str)

Return type:

None

mqtt_client_wrapper(client_name)¶
Parameters:

client_name (str)

Return type:

MQTTClientWrapper

mqtt_clients()¶
Return type:

MQTTClients

num_acks(link_name)¶
Parameters:

link_name (str)

Return type:

int

property num_pending: int¶
property num_reupload_pending: int¶
property num_reuploaded_unacked: int¶
process_ack(link_name, message_id)¶
Parameters:
  • link_name (str)

  • message_id (str)

Return type:

None

process_ack_timeout(wait_info)¶
Parameters:

wait_info (AckWaitInfo)

Return type:

Ok[LinkManagerTransition] | Err[Exception]

process_mqtt_connect_fail(message)¶
Parameters:

message (Message[MQTTConnectFailPayload])

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_connected(message)¶
Parameters:

message (Message[MQTTConnectPayload])

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_disconnected(message)¶
Parameters:

message (Message[MQTTDisconnectPayload])

Return type:

Ok[LinkManagerTransition] | Err[InvalidCommStateInput]

process_mqtt_message(message)¶
Parameters:

message (Message[MQTTReceiptPayload])

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_suback(message)¶
Parameters:

message (Message[MQTTSubackPayload])

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

publication_name: str¶
publish_message(link_name, message, qos=0, context=None)¶
Parameters:
  • link_name (str)

  • message (Message)

  • qos (int)

  • context (Any)

Return type:

MQTTMessageInfo

publish_upstream(payload, qos=QOS.AtMostOnce, **message_args)¶
Parameters:
  • payload (Any)

  • qos (QOS)

  • message_args (Any)

Return type:

MQTTMessageInfo

reuploading()¶
Return type:

bool

send_ack(link_name, message)¶
Parameters:
  • link_name (str)

  • message (Message[Any])

Return type:

None

async send_ping(link_name)¶
Parameters:

link_name (str)

Return type:

None

start(loop, async_queue)¶
Parameters:
  • loop (AbstractEventLoop)

  • async_queue (Queue)

Return type:

None

start_ping_tasks()¶
Return type:

list[Task]

stop()¶
Return type:

Ok[bool] | Err[Problems]

stopped(name)¶
Parameters:

name (str)

Return type:

bool

subscribe(client, topic, qos)¶
Parameters:
  • client (str)

  • topic (str)

  • qos (int)

Return type:

Tuple[int, int | None]

subscribed(link_name)¶
Parameters:

link_name (str)

Return type:

bool

subscription_name: str¶
subscription_str(tag='')¶
Parameters:

tag (str)

Return type:

str

topic_dst(client_name)¶
Parameters:

client_name (str)

Return type:

str

update_recv_time(link_name)¶
Parameters:

link_name (str)

Return type:

None

property upstream_client: str¶
class gwproactor.links.LinkManagerTransition(link_name: str = '', transition_name: gwproactor.links.link_state.TransitionName = <TransitionName.none: 'none'>, old_state: gwproactor.links.link_state.StateName = <StateName.not_started: 'not_started'>, new_state: gwproactor.links.link_state.StateName = <StateName.not_started: 'not_started'>, canceled_acks: list[gwproactor.links.acks.AckWaitInfo] = <factory>)¶
Parameters:
  • link_name (str)

  • transition_name (TransitionName)

  • old_state (StateName)

  • new_state (StateName)

  • canceled_acks (list[AckWaitInfo])

canceled_acks: list[AckWaitInfo]¶
class gwproactor.links.LinkMessageTimes(last_send: float = <factory>, last_recv: float = <factory>)¶
Parameters:
  • last_send (float)

  • last_recv (float)

get_str(*, link_poll_seconds=60.0, relative=True)¶
Parameters:
  • link_poll_seconds (float)

  • relative (bool)

Return type:

str

last_recv: float¶
last_send: float¶
next_ping_second(link_poll_seconds)¶
Parameters:

link_poll_seconds (float)

Return type:

float

seconds_until_next_ping(link_poll_seconds)¶
Parameters:

link_poll_seconds (float)

Return type:

float

time_to_send_ping(link_poll_seconds)¶
Parameters:

link_poll_seconds (float)

Return type:

bool

class gwproactor.links.LinkState(name, curr_state=StateName.not_started)¶
Parameters:
  • name (str)

  • curr_state (State)

_handle(result)¶
Parameters:

result (Ok[T] | Err[E])

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

active()¶
Return type:

bool

active_for_recv()¶
Return type:

bool

active_for_send()¶
Return type:

bool

curr_state: State¶
in_state(state)¶
Parameters:

state (StateName)

Return type:

bool

name: str¶
process_ack_timeout()¶
Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_connect_fail()¶
Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_connected()¶
Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_disconnected()¶
Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_message()¶
Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_suback(num_pending_subscriptions)¶
Parameters:

num_pending_subscriptions (int)

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

start()¶
Return type:

Ok[Transition] | Err[InvalidCommStateInput]

property state: StateName¶
states: dict[StateName, State]¶
stop()¶
Return type:

Ok[Transition] | Err[InvalidCommStateInput]

class gwproactor.links.LinkStates(names=None)¶
Parameters:

names (Sequence[str] | None)

_links: dict[str, LinkState]¶
add(name, state=StateName.not_started)¶
Parameters:
  • name (str)

  • state (StateName)

Return type:

LinkState

link(name)¶
Parameters:

name (str)

Return type:

LinkState | None

link_names()¶
Return type:

list[str]

link_state(name)¶
Parameters:

name (str)

Return type:

StateName | None

process_ack_timeout(name)¶
Parameters:

name (str)

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_connect_fail(message)¶
Parameters:

message (Message[MQTTConnectFailPayload])

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_connected(message)¶
Parameters:

message (Message[MQTTConnectPayload])

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_disconnected(message)¶
Parameters:

message (Message[MQTTDisconnectPayload])

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_message(message)¶
Parameters:

message (Message[MQTTReceiptPayload])

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

process_mqtt_suback(name, num_pending_subscriptions)¶
Parameters:
  • name (str)

  • num_pending_subscriptions (int)

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

start(name)¶
Parameters:

name (str)

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

start_all()¶
Return type:

Ok[bool] | Err[Sequence[Exception]]

stop(name)¶
Parameters:

name (str)

Return type:

Ok[Transition] | Err[InvalidCommStateInput]

stopped(name)¶
Parameters:

name (str)

Return type:

bool

class gwproactor.links.MQTTClientWrapper(client_name, topic_dst, client_config, receive_queue)¶
Parameters:
  • client_name (str)

  • topic_dst (str)

  • client_config (MQTTClient)

  • receive_queue (AsyncQueueWriter)

_client: Client¶
_client_config: MQTTClient¶
_client_name: str¶
_client_thread()¶
Return type:

None

_pending_subacks: Dict[int, List[str]]¶
_pending_subscriptions: Set[str]¶
_receive_queue: AsyncQueueWriter¶
_stop_requested: bool¶
_subscriptions: Dict[str, int]¶
connected()¶
Return type:

bool

disable_logger()¶
Return type:

None

enable_logger(logger=None)¶
Parameters:

logger (Logger | LoggerAdapter | None)

Return type:

None

handle_suback(suback)¶
Parameters:

suback (MQTTSubackPayload)

Return type:

int

property mqtt_client: Client¶
num_pending_subscriptions()¶
Return type:

int

num_subscriptions()¶
Return type:

int

on_connect(_, userdata, flags, rc)¶
Parameters:
  • _ (Any)

  • userdata (Any)

  • flags (dict)

  • rc (int)

Return type:

None

on_connect_fail(_, userdata)¶
Parameters:
  • _ (Any)

  • userdata (Any)

Return type:

None

on_disconnect(_, userdata, rc)¶
Parameters:
  • _ (Any)

  • userdata (Any)

  • rc (int)

Return type:

None

on_message(_, userdata, message)¶
Parameters:
  • _ (Any)

  • userdata (Any)

  • message (MQTTMessage)

Return type:

None

on_subscribe(_, userdata, mid, granted_qos)¶
Parameters:
  • _ (Any)

  • userdata (Any)

  • mid (int)

  • granted_qos (list[int])

Return type:

None

publish(topic, payload, qos)¶
Parameters:
  • topic (str)

  • payload (bytes)

  • qos (int)

Return type:

MQTTMessageInfo

start()¶
Return type:

None

stop()¶
Return type:

None

subscribe(topic, qos)¶
Parameters:
  • topic (str)

  • qos (int)

Return type:

Tuple[int, int | None]

subscribe_all()¶
Return type:

Tuple[int, int | None]

subscribed()¶
Return type:

bool

subscription_items()¶
Return type:

list[Tuple[str, int]]

topic_dst: str¶
unsubscribe(topic)¶
Parameters:

topic (str)

Return type:

Tuple[int, int | None]

class gwproactor.links.MQTTClients¶
_send_queue: AsyncQueueWriter¶
add_client(settings)¶
Parameters:

settings (LinkSettings)

Return type:

None

client_wrapper(client)¶
Parameters:

client (str)

Return type:

MQTTClientWrapper

clients: Dict[str, MQTTClientWrapper]¶
connected(client)¶
Parameters:

client (str)

Return type:

bool

disable_loggers()¶
Return type:

None

downstream_client: str = ''¶
enable_loggers(logger=None)¶
Parameters:

logger (Logger | LoggerAdapter | None)

Return type:

None

handle_suback(suback)¶
Parameters:

suback (MQTTSubackPayload)

Return type:

int

num_pending_subscriptions(client)¶
Parameters:

client (str)

Return type:

int

num_subscriptions(client)¶
Parameters:

client (str)

Return type:

int

primary_peer()¶
Return type:

MQTTClientWrapper

publish(client, topic, payload, qos)¶
Parameters:
  • client (str)

  • topic (str)

  • payload (bytes)

  • qos (int)

Return type:

MQTTMessageInfo

start(loop, async_queue)¶
Parameters:
  • loop (AbstractEventLoop)

  • async_queue (Queue)

Return type:

None

stop()¶
Return type:

None

subscribe(client, topic, qos)¶
Parameters:
  • client (str)

  • topic (str)

  • qos (int)

Return type:

Tuple[int, int | None]

subscribe_all(client)¶
Parameters:

client (str)

Return type:

Tuple[int, int | None]

subscribed(client)¶
Parameters:

client (str)

Return type:

bool

topic_dst(client)¶
Parameters:

client (str)

Return type:

str

unsubscribe(client, topic)¶
Parameters:
  • client (str)

  • topic (str)

Return type:

Tuple[int, int | None]

upstream()¶
Return type:

MQTTClientWrapper

upstream_client: str = ''¶
upstream_topic_dst: str = ''¶
class gwproactor.links.MessageTimes¶
_links: dict[str, LinkMessageTimes]¶
add_link(name)¶
Parameters:

name (str)

Return type:

None

get_copy(link_name)¶
Parameters:

link_name (str)

Return type:

LinkMessageTimes

link_names()¶
Return type:

list[str]

update_recv(link_name, now=None)¶
Parameters:
  • link_name (str)

  • now (float | None)

Return type:

None

update_send(link_name, now=None)¶
Parameters:
  • link_name (str)

  • now (float | None)

Return type:

None

class gwproactor.links.QOS(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶
AtLeastOnce = 1¶
AtMostOnce = 0¶
ExactlyOnce = 2¶
class gwproactor.links.Reuploads(logger, num_initial_events=5)¶

Track event uids that are part of a re-upload, both those that have not yet been sent (pending) and those that are “in-flight” (unacked). Upon ack, update records and return the next message to be sent.

Parameters:
  • logger (ProactorLogger)

  • num_initial_events (int)

NUM_INITIAL_EVENTS: int = 5¶

Default number of events to send when re-upload starts.

_log_start_reupload(num_pending_events, num_reupload_now)¶
Parameters:
  • num_pending_events (int)

  • num_reupload_now (int)

Return type:

None

_logger: ProactorLogger¶
_num_initial_events: int¶

Number of events to send when re-upload starts.

_reupload_pending: dict[str, None]¶

‘Ordered set’ of unsent events that are part of this reupload. A dict is used to provide insertion order and fast lookup.

_reuploaded_unacked: dict[str, None]¶

‘Ordered set’ of sent but as-yet unacked events. A dict is used to provide insertion order and fast lookup.
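The ‘ordered set’ idiom mentioned for both attributes relies on Python dicts preserving insertion order (guaranteed since Python 3.7). A small sketch with hypothetical event ids:

```python
# A dict with None values behaves like an insertion-ordered set.
pending: dict[str, None] = {}
for event_id in ["e3", "e1", "e2", "e1"]:
    pending[event_id] = None  # re-adding "e1" keeps its original position

first = next(iter(pending))  # oldest entry
has_e2 = "e2" in pending     # constant-time membership test
pending.pop("e1")            # removal by key
order = list(pending)        # remaining ids, in insertion order
```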

clear()¶
Return type:

None

clear_unacked_event(ack_id)¶
Parameters:

ack_id (str)

Return type:

None

get_str(*, verbose=True, num_events=5)¶
Parameters:
  • verbose (bool)

  • num_events (int)

Return type:

str

property logger: ProactorLogger¶
property num_reupload_pending: int¶
property num_reuploaded_unacked: int¶
process_ack_for_reupload(ack_id)¶

If ack_id is in our “unacked” store, remove it from the unacked store. If any events remain in our “pending” store, move the first pending event from the pending store to the unacked store and return it for sending next.

Parameters:

ack_id (str)

Return type:

list[str]

reuploading()¶
Return type:

bool

start_reupload(pending_events)¶

Track all pending_events for reupload. Of the pending_events, record the first _num_initial_events as “unacked” and the rest as “pending”. Return the “unacked” group so they can be sent.

Parameters:

pending_events (list[str])

Return type:

list[str]
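The bookkeeping described for start_reupload() and process_ack_for_reupload() can be sketched as a sliding window over two ordered sets. ReuploadWindow is a hypothetical stand-in, not the gwproactor class. In particular, this sketch assumes that an ack for an id that is not in the unacked store returns an empty list and leaves the pending store untouched.

```python
class ReuploadWindow:
    """Hypothetical stand-in for the bookkeeping described above."""

    def __init__(self, num_initial_events: int = 5) -> None:
        self.num_initial_events = num_initial_events
        self.pending: dict[str, None] = {}  # not yet sent
        self.unacked: dict[str, None] = {}  # sent, awaiting ack

    def start_reupload(self, pending_events: list[str]) -> list[str]:
        # The first N events are sent now; the rest wait for acks.
        send_now = pending_events[: self.num_initial_events]
        self.unacked = dict.fromkeys(send_now)
        self.pending = dict.fromkeys(pending_events[self.num_initial_events :])
        return send_now

    def process_ack_for_reupload(self, ack_id: str) -> list[str]:
        if ack_id not in self.unacked:
            return []  # assumption: unknown acks do not advance the window
        del self.unacked[ack_id]
        if not self.pending:
            return []
        # Move the oldest pending event into the unacked store.
        next_id = next(iter(self.pending))
        del self.pending[next_id]
        self.unacked[next_id] = None
        return [next_id]

    def reuploading(self) -> bool:
        return bool(self.pending or self.unacked)
```

Each ack frees one slot, so the number of in-flight events never exceeds the initial batch size.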

stats: LinkStats | None = None¶

Object into which we can record reupload start and complete. Set during LinkManager.start() since the upstream client does not exist during LinkManager construction.

exception gwproactor.links.RuntimeLinkStateError(name='', current_state=StateName.none, transition=TransitionName.none, *, msg='')¶
Parameters:
  • name (str)

  • current_state (StateName)

  • transition (TransitionName)

  • msg (str)

Return type:

None

class gwproactor.links.StateName(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶
active = 'active'¶
awaiting_peer = 'awaiting_peer'¶
awaiting_setup = 'awaiting_setup'¶
awaiting_setup_and_peer = 'awaiting_setup_and_peer'¶
connecting = 'connecting'¶
none = 'none'¶
not_started = 'not_started'¶
stopped = 'stopped'¶
class gwproactor.links.Subscription(Topic, Qos)¶
Parameters:
  • Topic (str)

  • Qos (QOS)

Qos: QOS¶

Alias for field number 1

Topic: str¶

Alias for field number 0

_asdict()¶

Return a new dict which maps field names to their values.

_field_defaults = {}¶
_fields = ('Topic', 'Qos')¶
classmethod _make(iterable)¶

Make a new Subscription object from a sequence or iterable

_replace(**kwds)¶

Return a new Subscription object replacing specified fields with new values
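The _asdict(), _make() and _replace() members listed above are the standard named-tuple helpers. The sketch below rebuilds an equivalent type locally to show how they behave. The QOS stand-in copies its values from this page but its base class is an assumption, and the topic strings are hypothetical.

```python
from enum import Enum
from typing import NamedTuple


class QOS(Enum):  # local stand-in; values copied from this page
    AtMostOnce = 0
    AtLeastOnce = 1
    ExactlyOnce = 2


class Subscription(NamedTuple):
    Topic: str
    Qos: QOS


sub = Subscription("gw/example/topic", QOS.AtLeastOnce)
topic, qos = sub                                   # unpacks by field position
as_dict = sub._asdict()                            # field name -> value
upgraded = sub._replace(Qos=QOS.ExactlyOnce)       # new object, sub unchanged
rebuilt = Subscription._make(["gw/other", QOS.AtMostOnce])
```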

class gwproactor.links.TimerManagerInterface¶

Simple interface to infrastructure which can start timers, run callbacks on timer completion, and cancel timers.

_abc_impl = <_abc._abc_data object>¶
abstract cancel_timer(timer_handle)¶

Cancel the callback associated with _timer_handle_.

Note that the callback might still run after this call returns.

Parameters:

timer_handle (Any) – The value returned by start_timer()

Return type:

None

abstract start_timer(delay_seconds, callback)¶

Start a timer. The implementation is expected to call _callback_ after approximately _delay_seconds_.

The execution context (e.g. the thread) of the callback must be specified by the implementation.

The callback must have sufficient context available to do its work and to detect whether it is no longer relevant. Note that a callback might run after cancellation if it was already “in-flight” at the time of cancellation; it is up to the callback to tolerate this situation.

Parameters:
  • delay_seconds (float) – The approximate delay before the callback is called.

  • callback (Callable[[], None]) – The function called after delay_seconds.

Returns:

A timer handle which can be passed to _cancel_timer()_ to cancel the callback.

Return type:

Any
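One way a callback can detect that it is no longer relevant, as the interface requires, is to check a registry of outstanding waits before acting. The sketch below is a minimal illustration. PendingWaits and its methods are hypothetical names, and the late call is simulated by invoking the callback directly rather than through a real timer.

```python
from typing import Callable


class PendingWaits:
    """Hypothetical registry a callback consults to see if it is stale."""

    def __init__(self) -> None:
        self._waiting: set[str] = set()
        self.timed_out: list[str] = []

    def make_callback(self, message_id: str) -> Callable[[], None]:
        self._waiting.add(message_id)

        def on_timeout() -> None:
            # Stale if the wait was already canceled or handled.
            if message_id not in self._waiting:
                return
            self._waiting.discard(message_id)
            self.timed_out.append(message_id)

        return on_timeout

    def cancel(self, message_id: str) -> None:
        self._waiting.discard(message_id)


waits = PendingWaits()
live = waits.make_callback("m1")
stale = waits.make_callback("m2")
waits.cancel("m2")
stale()  # simulates an in-flight callback arriving after cancellation
live()
live()   # a duplicate call is ignored as well
```

If the timer manager runs callbacks on a different thread, the registry would also need a lock; that is omitted here for brevity.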

class gwproactor.links.Transition(link_name: str = '', transition_name: gwproactor.links.link_state.TransitionName = <TransitionName.none: 'none'>, old_state: gwproactor.links.link_state.StateName = <StateName.not_started: 'not_started'>, new_state: gwproactor.links.link_state.StateName = <StateName.not_started: 'not_started'>)¶
Parameters:
  • link_name (str)

  • transition_name (TransitionName)

  • old_state (StateName)

  • new_state (StateName)

activated()¶
Return type:

bool

active()¶
Return type:

bool

deactivated()¶
Return type:

bool

link_name: str = ''¶
new_state: StateName = 'not_started'¶
old_state: StateName = 'not_started'¶
recv_activated()¶
Return type:

bool

recv_deactivated()¶
Return type:

bool

send_activated()¶
Return type:

bool

send_deactivated()¶
Return type:

bool

send_is_active()¶
Return type:

bool

transition_name: TransitionName = 'none'¶
class gwproactor.links.TransitionName(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)¶
message_from_peer = 'message_from_peer'¶
mqtt_connect_failed = 'mqtt_connect_failed'¶
mqtt_connected = 'mqtt_connected'¶
mqtt_disconnected = 'mqtt_disconnected'¶
mqtt_suback = 'mqtt_suback'¶
none = 'none'¶
response_timeout = 'response_timeout'¶
start_called = 'start_called'¶
stop_called = 'stop_called'¶
Copyright © 2023, Jessica Millar