runtimepy.net package

Contents

runtimepy.net package#

Subpackages#

Submodules#

runtimepy.net.backoff module#

A module implementing a simple exponential-backoff interface.

class runtimepy.net.backoff.ExponentialBackoff(interval: float = 0.1, max_sleep: float = 10.0, max_tries: int = 20)[source]#

Bases: object

A class implementing a simple exponential-backoff handler.

attempt: int#
property give_up: bool#

Determine whether or not to give up based on the number of attempts.

reset() None[source]#

Reset this instance’s state.

async sleep() None[source]#

Sleep for the correct amount of time.

wait: float#

runtimepy.net.connection module#

A module implementing a network-connection interface.

class runtimepy.net.connection.Connection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True)[source]#

Bases: LoggerMixinLevelControl, ChannelEnvironmentMixin, ABC

A connection interface.

async async_init() bool[source]#

A runtime initialization routine (executes during ‘process’).

property auto_restart: bool#

Determine if this connection should be automatically restarted.

byte_order: ByteOrder = 4#
async close() None[source]#

Close this connection.

connected = True#
default_auto_restart = False#
disable(reason: str) None[source]#

Disable this connection.

disable_extra() None[source]#

Additional tasks to perform when disabling.

async disable_in(time: float) None[source]#

A method for disabling a connection after some delay.

property disabled: bool#

Determine if this connection is disabled.

init() None[source]#

Initialize this instance.

log_metrics(label: str = 'conn') None[source]#

Log connection metrics.

async process(stop_sig: Event = None, disable_time: float = None, backoff: ExponentialBackoff = None) None[source]#

Process tasks for this connection while the connection is active.

async process_binary(data: bytes) bool[source]#

Process a binary frame.

async process_text(data: str) bool[source]#

Process a text frame.

async restart() bool[source]#

Reset necessary underlying state for this connection to ‘process’ again.

send_binary(data: bytes | bytearray | memoryview) None[source]#

Enqueue a binary message to send.

send_text(data: str) None[source]#

Enqueue a text message to send.

uses_binary_tx_queue = True#
uses_text_tx_queue = True#
class runtimepy.net.connection.EchoConnection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True)[source]#

Bases: Connection

A connection that just echoes what it was sent.

async process_binary(data: bytes) bool[source]#

Process a binary frame.

async process_text(data: str) bool[source]#

Process a text frame.

class runtimepy.net.connection.NullConnection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True)[source]#

Bases: Connection

A connection that doesn’t do anything with incoming data.

async process_binary(data: bytes) bool[source]#

Process a binary frame.

async process_text(data: str) bool[source]#

Process a text frame.

runtimepy.net.manager module#

A module implementing a connection manager.

class runtimepy.net.manager.ConnectionManager[source]#

Bases: LoggerMixin

A class for managing connection processing at runtime.

by_type(kind: type[T]) Iterator[T][source]#

Iterate over connections of a specific type.

async manage(stop_sig: Event) None[source]#

Handle incoming connections until the stop signal is set.

property num_connections: int#

Return the number of managed connections.

poll_metrics(time_ns: int = None) None[source]#

Poll connection metrics.

reset_metrics() None[source]#

Reset connection metrics.

runtimepy.net.mixin module#

Various networking-related class utilities.

class runtimepy.net.mixin.BinaryMessageQueueMixin[source]#

Bases: object

A mixin for adding a ‘queue’ attribute.

class runtimepy.net.mixin.TransportMixin(transport: BaseTransport)[source]#

Bases: object

A class simplifying evaluation of local and remote addresses.

logger_name(prefix: str = '') str[source]#

Get a logger name for this connection.

remote_address: IPv4Host | IPv6Host | None#
set_transport(transport: BaseTransport) None[source]#

Set the transport for this instance.

property socket: socket#

Get this instance’s underlying socket.

runtimepy.net.util module#

A module implementing various networking utilities.

class runtimepy.net.util.IPv4Host(name: str = '', port: int = 0)[source]#

Bases: NamedTuple

See: https://docs.python.org/3/library/socket.html#socket-families.

name: str#

Alias for field number 0

port: int#

Alias for field number 1

class runtimepy.net.util.IPv6Host(name: str = '', port: int = 0, flowinfo: int = 0, scope_id: int = 0)[source]#

Bases: NamedTuple

See: https://docs.python.org/3/library/socket.html#socket-families.

flowinfo: int#

Alias for field number 2

name: str#

Alias for field number 0

port: int#

Alias for field number 1

scope_id: int#

Alias for field number 3

runtimepy.net.util.get_free_socket(local: IPv4Host | IPv6Host = None, kind: int = SocketKind.SOCK_STREAM, reuse: bool = False) socket[source]#

Attempt to get an available socket.

runtimepy.net.util.get_free_socket_name(local: IPv4Host | IPv6Host = None, kind: int = SocketKind.SOCK_STREAM) IPv4Host | IPv6Host[source]#

Create a socket to determine an arbitrary port number that’s available. There is an inherent race condition using this strategy.

runtimepy.net.util.hostname(ip_address: str) str[source]#

Attempt to get a string hostname for a string IP address argument that ‘gethostbyaddr’ accepts. Otherwise return the original string.

runtimepy.net.util.hostname_port(ip_address: str, port: int) str[source]#

Get a hostname string with a port appended.

runtimepy.net.util.normalize_host(*args: str | int | IPv4Host | IPv6Host | None) IPv4Host | IPv6Host[source]#

Get a host object from caller parameters.

runtimepy.net.util.sockname(sock: socket) IPv4Host | IPv6Host[source]#

Get address information from a socket.

async runtimepy.net.util.try_log_connection_error(future: Awaitable[T], message: str, logger: Logger | LoggerAdapter[Any] = None) T | None[source]#

Attempt to resolve a future but log any connection-related exceptions.

Module contents#

A module aggregating commonly used networking interfaces.

class runtimepy.net.Connection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True)[source]#

Bases: LoggerMixinLevelControl, ChannelEnvironmentMixin, ABC

A connection interface.

async async_init() bool[source]#

A runtime initialization routine (executes during ‘process’).

property auto_restart: bool#

Determine if this connection should be automatically restarted.

byte_order: ByteOrder = 4#
async close() None[source]#

Close this connection.

connected = True#
default_auto_restart = False#
disable(reason: str) None[source]#

Disable this connection.

disable_extra() None[source]#

Additional tasks to perform when disabling.

async disable_in(time: float) None[source]#

A method for disabling a connection after some delay.

property disabled: bool#

Determine if this connection is disabled.

env: ChannelEnvironment#
init() None[source]#

Initialize this instance.

log_metrics(label: str = 'conn') None[source]#

Log connection metrics.

logger: LoggerType#
async process(stop_sig: Event = None, disable_time: float = None, backoff: ExponentialBackoff = None) None[source]#

Process tasks for this connection while the connection is active.

async process_binary(data: bytes) bool[source]#

Process a binary frame.

async process_text(data: str) bool[source]#

Process a text frame.

async restart() bool[source]#

Reset necessary underlying state for this connection to ‘process’ again.

send_binary(data: bytes | bytearray | memoryview) None[source]#

Enqueue a binary message to send.

send_text(data: str) None[source]#

Enqueue a text message to send.

tx_binary_hwm: int#
tx_text_hwm: int#
uses_binary_tx_queue = True#
uses_text_tx_queue = True#
class runtimepy.net.ConnectionManager[source]#

Bases: LoggerMixin

A class for managing connection processing at runtime.

by_type(kind: type[T]) Iterator[T][source]#

Iterate over connections of a specific type.

async manage(stop_sig: Event) None[source]#

Handle incoming connections until the stop signal is set.

property num_connections: int#

Return the number of managed connections.

poll_metrics(time_ns: int = None) None[source]#

Poll connection metrics.

reset_metrics() None[source]#

Reset connection metrics.

class runtimepy.net.EchoConnection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True)[source]#

Bases: Connection

A connection that just echoes what it was sent.

async process_binary(data: bytes) bool[source]#

Process a binary frame.

async process_text(data: str) bool[source]#

Process a text frame.

class runtimepy.net.ExponentialBackoff(interval: float = 0.1, max_sleep: float = 10.0, max_tries: int = 20)[source]#

Bases: object

A class implementing a simple exponential-backoff handler.

attempt: int#
property give_up: bool#

Determine whether or not to give up based on the number of attempts.

reset() None[source]#

Reset this instance’s state.

async sleep() None[source]#

Sleep for the correct amount of time.

wait: float#
class runtimepy.net.IPv4Host(name: str = '', port: int = 0)[source]#

Bases: NamedTuple

See: https://docs.python.org/3/library/socket.html#socket-families.

name: str#

Alias for field number 0

port: int#

Alias for field number 1

class runtimepy.net.IPv6Host(name: str = '', port: int = 0, flowinfo: int = 0, scope_id: int = 0)[source]#

Bases: NamedTuple

See: https://docs.python.org/3/library/socket.html#socket-families.

flowinfo: int#

Alias for field number 2

name: str#

Alias for field number 0

port: int#

Alias for field number 1

scope_id: int#

Alias for field number 3

class runtimepy.net.NullConnection(logger: Logger | LoggerAdapter[Any], env: ChannelEnvironment = None, add_metrics: bool = True)[source]#

Bases: Connection

A connection that doesn’t do anything with incoming data.

async process_binary(data: bytes) bool[source]#

Process a binary frame.

async process_text(data: str) bool[source]#

Process a text frame.

runtimepy.net.get_free_socket(local: IPv4Host | IPv6Host = None, kind: int = SocketKind.SOCK_STREAM, reuse: bool = False) socket[source]#

Attempt to get an available socket.

runtimepy.net.get_free_socket_name(local: IPv4Host | IPv6Host = None, kind: int = SocketKind.SOCK_STREAM) IPv4Host | IPv6Host[source]#

Create a socket to determine an arbitrary port number that’s available. There is an inherent race condition using this strategy.

runtimepy.net.hostname(ip_address: str) str[source]#

Attempt to get a string hostname for a string IP address argument that ‘gethostbyaddr’ accepts. Otherwise return the original string.

runtimepy.net.normalize_host(*args: str | int | IPv4Host | IPv6Host | None) IPv4Host | IPv6Host[source]#

Get a host object from caller parameters.

runtimepy.net.sockname(sock: socket) IPv4Host | IPv6Host[source]#

Get address information from a socket.

async runtimepy.net.try_log_connection_error(future: Awaitable[T], message: str, logger: Logger | LoggerAdapter[Any] = None) T | None[source]#

Attempt to resolve a future but log any connection-related exceptions.