runtimepy.net
index
/home/vkottler/src/vkottler/workspace/runtimepy/runtimepy/net/__init__.py

A module aggregating commonly used networking interfaces.

 
Package Contents
       
apps (package)
arbiter (package)
backoff
connection
factories (package)
http (package)
manager
mixin
server (package)
stream (package)
tcp (package)
udp (package)
util
websocket (package)

 
Classes
       
abc.ABC(builtins.object)
runtimepy.net.connection.Connection(runtimepy.mixins.logging.LoggerMixinLevelControl, runtimepy.mixins.environment.ChannelEnvironmentMixin, abc.ABC)
runtimepy.net.connection.EchoConnection
runtimepy.net.connection.NullConnection
builtins.object
runtimepy.net.backoff.ExponentialBackoff
builtins.tuple(builtins.object)
runtimepy.net.util.IPv4Host
runtimepy.net.util.IPv6Host
runtimepy.mixins.environment.ChannelEnvironmentMixin(builtins.object)
runtimepy.net.connection.Connection(runtimepy.mixins.logging.LoggerMixinLevelControl, runtimepy.mixins.environment.ChannelEnvironmentMixin, abc.ABC)
runtimepy.net.connection.EchoConnection
runtimepy.net.connection.NullConnection
runtimepy.mixins.logging.LoggerMixinLevelControl(vcorelib.logging.LoggerMixin)
runtimepy.net.connection.Connection(runtimepy.mixins.logging.LoggerMixinLevelControl, runtimepy.mixins.environment.ChannelEnvironmentMixin, abc.ABC)
runtimepy.net.connection.EchoConnection
runtimepy.net.connection.NullConnection
vcorelib.logging.LoggerMixin(builtins.object)
runtimepy.net.manager.ConnectionManager

 
class Connection(runtimepy.mixins.logging.LoggerMixinLevelControl, runtimepy.mixins.environment.ChannelEnvironmentMixin, abc.ABC)
    Connection(logger: Union[logging.Logger, logging.LoggerAdapter[Any]], env: runtimepy.channel.environment.ChannelEnvironment = None, add_metrics: bool = True) -> None
 
A connection interface.
 
 
Method resolution order:
Connection
runtimepy.mixins.logging.LoggerMixinLevelControl
vcorelib.logging.LoggerMixin
runtimepy.mixins.environment.ChannelEnvironmentMixin
abc.ABC
builtins.object

Methods defined here:
__init__(self, logger: Union[logging.Logger, logging.LoggerAdapter[Any]], env: runtimepy.channel.environment.ChannelEnvironment = None, add_metrics: bool = True) -> None
Initialize this connection.
async async_init(self) -> bool
A runtime initialization routine (executes during 'process').
async close(self) -> None
Close this connection.
disable(self, reason: str) -> None
Disable this connection.
disable_extra(self) -> None
Additional tasks to perform when disabling.
async disable_in(self, time: float) -> None
A method for disabling a connection after some delay.
init(self) -> None
Initialize this instance.
log_metrics(self, label: str = 'conn') -> None
Log connection metrics.
async process(self, stop_sig: asyncio.locks.Event = None, disable_time: float = None, backoff: runtimepy.net.backoff.ExponentialBackoff = None) -> None
Process tasks for this connection while the connection is active.
async process_binary(self, data: bytes) -> bool
Process a binary frame.
async process_text(self, data: str) -> bool
Process a text frame.
async restart(self) -> bool
Reset necessary underlying state for this connection to 'process'
again.
send_binary(self, data: Union[bytes, bytearray, memoryview]) -> None
Enqueue a binary message to send.
send_text(self, data: str) -> None
Enqueue a text message to send.

Readonly properties defined here:
auto_restart
Determine if this connection should be automatically restarted.
disabled
Determine if this connection is disabled.

Data and other attributes defined here:
__abstractmethods__ = frozenset()
__annotations__ = {'byte_order': <enum 'ByteOrder'>}
byte_order = <ByteOrder.NETWORK: 4>
connected = True
default_auto_restart = False
uses_binary_tx_queue = True
uses_text_tx_queue = True

Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
Add a commandable log-level channel to the environment.

Methods inherited from vcorelib.logging.LoggerMixin:
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Data descriptors inherited from vcorelib.logging.LoggerMixin:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
Register individual channel metrics.
register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
Register connection metrics.
register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
Register periodic task metrics.

 
class ConnectionManager(vcorelib.logging.LoggerMixin)
    ConnectionManager() -> 'None'
 
A class for managing connection processing at runtime.
 
 
Method resolution order:
ConnectionManager
vcorelib.logging.LoggerMixin
builtins.object

Methods defined here:
__init__(self) -> 'None'
Initialize this connection manager.
by_type(self, kind: 'type[T]') -> '_Iterator[T]'
Iterate over connections of a specific type.
async manage(self, stop_sig: '_asyncio.Event') -> 'None'
Handle incoming connections until the stop signal is set.
poll_metrics(self, time_ns: 'int' = None) -> 'None'
Poll connection metrics.
reset_metrics(self) -> 'None'
Reset connection metrics.

Readonly properties defined here:
num_connections
Return the number of managed connections.

Data and other attributes defined here:
__annotations__ = {}

Methods inherited from vcorelib.logging.LoggerMixin:
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Data descriptors inherited from vcorelib.logging.LoggerMixin:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

 
class EchoConnection(Connection)
    EchoConnection(logger: Union[logging.Logger, logging.LoggerAdapter[Any]], env: runtimepy.channel.environment.ChannelEnvironment = None, add_metrics: bool = True) -> None
 
A connection that just echoes what it was sent.
 
 
Method resolution order:
EchoConnection
Connection
runtimepy.mixins.logging.LoggerMixinLevelControl
vcorelib.logging.LoggerMixin
runtimepy.mixins.environment.ChannelEnvironmentMixin
abc.ABC
builtins.object

Methods defined here:
async process_binary(self, data: bytes) -> bool
Process a binary frame.
async process_text(self, data: str) -> bool
Process a text frame.

Data and other attributes defined here:
__abstractmethods__ = frozenset()
__annotations__ = {}

Methods inherited from Connection:
__init__(self, logger: Union[logging.Logger, logging.LoggerAdapter[Any]], env: runtimepy.channel.environment.ChannelEnvironment = None, add_metrics: bool = True) -> None
Initialize this connection.
async async_init(self) -> bool
A runtime initialization routine (executes during 'process').
async close(self) -> None
Close this connection.
disable(self, reason: str) -> None
Disable this connection.
disable_extra(self) -> None
Additional tasks to perform when disabling.
async disable_in(self, time: float) -> None
A method for disabling a connection after some delay.
init(self) -> None
Initialize this instance.
log_metrics(self, label: str = 'conn') -> None
Log connection metrics.
async process(self, stop_sig: asyncio.locks.Event = None, disable_time: float = None, backoff: runtimepy.net.backoff.ExponentialBackoff = None) -> None
Process tasks for this connection while the connection is active.
async restart(self) -> bool
Reset necessary underlying state for this connection to 'process'
again.
send_binary(self, data: Union[bytes, bytearray, memoryview]) -> None
Enqueue a binary message to send.
send_text(self, data: str) -> None
Enqueue a text message to send.

Readonly properties inherited from Connection:
auto_restart
Determine if this connection should be automatically restarted.
disabled
Determine if this connection is disabled.

Data and other attributes inherited from Connection:
byte_order = <ByteOrder.NETWORK: 4>
connected = True
default_auto_restart = False
uses_binary_tx_queue = True
uses_text_tx_queue = True

Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
Add a commandable log-level channel to the environment.

Methods inherited from vcorelib.logging.LoggerMixin:
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Data descriptors inherited from vcorelib.logging.LoggerMixin:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
Register individual channel metrics.
register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
Register connection metrics.
register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
Register periodic task metrics.

 
class ExponentialBackoff(builtins.object)
    ExponentialBackoff(interval: float = 0.1, max_sleep: float = 10.0, max_tries: int = 20) -> None
 
A class implementing a simple exponential-backoff handler.
 
  Methods defined here:
__init__(self, interval: float = 0.1, max_sleep: float = 10.0, max_tries: int = 20) -> None
Initialize this instance.
reset(self) -> None
Reset this instance's state.
async sleep(self) -> None
Sleep for the correct amount of time.

Readonly properties defined here:
give_up
Determine whether or not to give up based on the number of attempts.

Data descriptors defined here:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Data and other attributes defined here:
__annotations__ = {'attempt': <class 'int'>, 'wait': <class 'float'>}

 
class IPv4Host(builtins.tuple)
    IPv4Host(name: str = '', port: int = 0)
 
See: https://docs.python.org/3/library/socket.html#socket-families.
 
 
Method resolution order:
IPv4Host
builtins.tuple
builtins.object

Methods defined here:
__getnewargs__(self)
Return self as a plain tuple.  Used by copy and pickle.
__repr__(self)
Return a nicely formatted representation string
__str__(self) -> str
Get this host as a string.
_asdict(self)
Return a new dict which maps field names to their values.
_replace(self, /, **kwds)
Return a new IPv4Host object replacing specified fields with new values

Class methods defined here:
_make(iterable) from builtins.type
Make a new IPv4Host object from a sequence or iterable

Static methods defined here:
__new__(_cls, name: str = '', port: int = 0)
Create new instance of IPv4Host(name, port)

Data descriptors defined here:
name
Alias for field number 0
port
Alias for field number 1

Data and other attributes defined here:
__annotations__ = {'name': <class 'str'>, 'port': <class 'int'>}
__match_args__ = ('name', 'port')
__orig_bases__ = (<function NamedTuple>,)
_field_defaults = {'name': '', 'port': 0}
_fields = ('name', 'port')

Methods inherited from builtins.tuple:
__add__(self, value, /)
Return self+value.
__contains__(self, key, /)
Return bool(key in self).
__eq__(self, value, /)
Return self==value.
__ge__(self, value, /)
Return self>=value.
__getattribute__(self, name, /)
Return getattr(self, name).
__getitem__(self, key, /)
Return self[key].
__gt__(self, value, /)
Return self>value.
__hash__(self, /)
Return hash(self).
__iter__(self, /)
Implement iter(self).
__le__(self, value, /)
Return self<=value.
__len__(self, /)
Return len(self).
__lt__(self, value, /)
Return self<value.
__mul__(self, value, /)
Return self*value.
__ne__(self, value, /)
Return self!=value.
__rmul__(self, value, /)
Return value*self.
count(self, value, /)
Return number of occurrences of value.
index(self, value, start=0, stop=9223372036854775807, /)
Return first index of value.
 
Raises ValueError if the value is not present.

Class methods inherited from builtins.tuple:
__class_getitem__(...) from builtins.type
See PEP 585

 
class IPv6Host(builtins.tuple)
    IPv6Host(name: str = '', port: int = 0, flowinfo: int = 0, scope_id: int = 0)
 
See: https://docs.python.org/3/library/socket.html#socket-families.
 
 
Method resolution order:
IPv6Host
builtins.tuple
builtins.object

Methods defined here:
__getnewargs__(self)
Return self as a plain tuple.  Used by copy and pickle.
__repr__(self)
Return a nicely formatted representation string
__str__(self) -> str
Get this host as a string.
_asdict(self)
Return a new dict which maps field names to their values.
_replace(self, /, **kwds)
Return a new IPv6Host object replacing specified fields with new values

Class methods defined here:
_make(iterable) from builtins.type
Make a new IPv6Host object from a sequence or iterable

Static methods defined here:
__new__(_cls, name: str = '', port: int = 0, flowinfo: int = 0, scope_id: int = 0)
Create new instance of IPv6Host(name, port, flowinfo, scope_id)

Data descriptors defined here:
name
Alias for field number 0
port
Alias for field number 1
flowinfo
Alias for field number 2
scope_id
Alias for field number 3

Data and other attributes defined here:
__annotations__ = {'flowinfo': <class 'int'>, 'name': <class 'str'>, 'port': <class 'int'>, 'scope_id': <class 'int'>}
__match_args__ = ('name', 'port', 'flowinfo', 'scope_id')
__orig_bases__ = (<function NamedTuple>,)
_field_defaults = {'flowinfo': 0, 'name': '', 'port': 0, 'scope_id': 0}
_fields = ('name', 'port', 'flowinfo', 'scope_id')

Methods inherited from builtins.tuple:
__add__(self, value, /)
Return self+value.
__contains__(self, key, /)
Return bool(key in self).
__eq__(self, value, /)
Return self==value.
__ge__(self, value, /)
Return self>=value.
__getattribute__(self, name, /)
Return getattr(self, name).
__getitem__(self, key, /)
Return self[key].
__gt__(self, value, /)
Return self>value.
__hash__(self, /)
Return hash(self).
__iter__(self, /)
Implement iter(self).
__le__(self, value, /)
Return self<=value.
__len__(self, /)
Return len(self).
__lt__(self, value, /)
Return self<value.
__mul__(self, value, /)
Return self*value.
__ne__(self, value, /)
Return self!=value.
__rmul__(self, value, /)
Return value*self.
count(self, value, /)
Return number of occurrences of value.
index(self, value, start=0, stop=9223372036854775807, /)
Return first index of value.
 
Raises ValueError if the value is not present.

Class methods inherited from builtins.tuple:
__class_getitem__(...) from builtins.type
See PEP 585

 
class NullConnection(Connection)
    NullConnection(logger: Union[logging.Logger, logging.LoggerAdapter[Any]], env: runtimepy.channel.environment.ChannelEnvironment = None, add_metrics: bool = True) -> None
 
A connection that doesn't do anything with incoming data.
 
 
Method resolution order:
NullConnection
Connection
runtimepy.mixins.logging.LoggerMixinLevelControl
vcorelib.logging.LoggerMixin
runtimepy.mixins.environment.ChannelEnvironmentMixin
abc.ABC
builtins.object

Methods defined here:
async process_binary(self, data: bytes) -> bool
Process a binary frame.
async process_text(self, data: str) -> bool
Process a text frame.

Data and other attributes defined here:
__abstractmethods__ = frozenset()
__annotations__ = {}

Methods inherited from Connection:
__init__(self, logger: Union[logging.Logger, logging.LoggerAdapter[Any]], env: runtimepy.channel.environment.ChannelEnvironment = None, add_metrics: bool = True) -> None
Initialize this connection.
async async_init(self) -> bool
A runtime initialization routine (executes during 'process').
async close(self) -> None
Close this connection.
disable(self, reason: str) -> None
Disable this connection.
disable_extra(self) -> None
Additional tasks to perform when disabling.
async disable_in(self, time: float) -> None
A method for disabling a connection after some delay.
init(self) -> None
Initialize this instance.
log_metrics(self, label: str = 'conn') -> None
Log connection metrics.
async process(self, stop_sig: asyncio.locks.Event = None, disable_time: float = None, backoff: runtimepy.net.backoff.ExponentialBackoff = None) -> None
Process tasks for this connection while the connection is active.
async restart(self) -> bool
Reset necessary underlying state for this connection to 'process'
again.
send_binary(self, data: Union[bytes, bytearray, memoryview]) -> None
Enqueue a binary message to send.
send_text(self, data: str) -> None
Enqueue a text message to send.

Readonly properties inherited from Connection:
auto_restart
Determine if this connection should be automatically restarted.
disabled
Determine if this connection is disabled.

Data and other attributes inherited from Connection:
byte_order = <ByteOrder.NETWORK: 4>
connected = True
default_auto_restart = False
uses_binary_tx_queue = True
uses_text_tx_queue = True

Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
Add a commandable log-level channel to the environment.

Methods inherited from vcorelib.logging.LoggerMixin:
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Data descriptors inherited from vcorelib.logging.LoggerMixin:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
Register individual channel metrics.
register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
Register connection metrics.
register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
Register periodic task metrics.

 
Functions
       
get_free_socket(local: Union[runtimepy.net.util.IPv4Host, runtimepy.net.util.IPv6Host] = None, kind: int = <SocketKind.SOCK_STREAM: 1>, reuse: bool = False) -> _socket.socket
Attempt to get an available socket.
get_free_socket_name(local: Union[runtimepy.net.util.IPv4Host, runtimepy.net.util.IPv6Host] = None, kind: int = <SocketKind.SOCK_STREAM: 1>) -> Union[runtimepy.net.util.IPv4Host, runtimepy.net.util.IPv6Host]
Create a socket to determine an arbitrary port number that's available.
There is an inherent race condition using this strategy.
hostname(ip_address: str) -> str
Attempt to get a string hostname for a string IP address argument that
'gethostbyaddr' accepts. Otherwise return the original string.
normalize_host(*args: Union[str, int, runtimepy.net.util.IPv4Host, runtimepy.net.util.IPv6Host, NoneType]) -> Union[runtimepy.net.util.IPv4Host, runtimepy.net.util.IPv6Host]
Get a host object from caller parameters.
sockname(sock: _socket.socket) -> Union[runtimepy.net.util.IPv4Host, runtimepy.net.util.IPv6Host]
Get address information from a socket.
async try_log_connection_error(future: Awaitable[~T], message: str, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = None) -> Optional[~T]
Attempt to resolve a future but log any connection-related exceptions.

 
Data
        BinaryMessage = typing.Union[bytes, bytearray, memoryview]
IpHost = typing.Union[runtimepy.net.util.IPv4Host, runtimepy.net.util.IPv6Host]
__all__ = ['BinaryMessage', 'Connection', 'EchoConnection', 'NullConnection', 'ConnectionManager', 'IpHost', 'IPv4Host', 'IPv6Host', 'get_free_socket', 'get_free_socket_name', 'hostname', 'normalize_host', 'sockname', 'ExponentialBackoff', 'try_log_connection_error']