runtimepy.net.server.websocket
/home/vkottler/src/vkottler/workspace/runtimepy/runtimepy/net/server/websocket/__init__.py

A module implementing a simple WebSocket server for the package.

 
Package Contents
       
state

 
Classes
       
runtimepy.net.arbiter.tcp.json.WebsocketJsonMessageConnection(runtimepy.net.stream.json.JsonMessageConnection, runtimepy.net.websocket.connection.WebsocketConnection)
    RuntimepyWebsocketConnection
runtimepy.net.websocket.connection.WebsocketConnection(runtimepy.net.connection.Connection)
    RuntimepyDataWebsocketConnection

 
class RuntimepyDataWebsocketConnection(runtimepy.net.websocket.connection.WebsocketConnection)
    RuntimepyDataWebsocketConnection(protocol: '_Union[_WebSocketClientProtocol, _WebSocketServerProtocol]') -> 'None'
 
A class implementing a WebSocket connection for streaming raw data.
 
 
Method resolution order:
RuntimepyDataWebsocketConnection
runtimepy.net.websocket.connection.WebsocketConnection
runtimepy.net.connection.Connection
runtimepy.mixins.logging.LoggerMixinLevelControl
vcorelib.logging.LoggerMixin
runtimepy.mixins.environment.ChannelEnvironmentMixin
abc.ABC
builtins.object

Data and other attributes defined here:
__abstractmethods__ = frozenset()
__annotations__ = {}

Methods inherited from runtimepy.net.websocket.connection.WebsocketConnection:
__init__(self, protocol: '_Union[_WebSocketClientProtocol, _WebSocketServerProtocol]') -> 'None'
Initialize this connection.
async close(self) -> 'None'
Close this connection.

Class methods inherited from runtimepy.net.websocket.connection.WebsocketConnection:
async app(stop_sig: '_asyncio.Event', init: 'ConnectionInit[T]' = None, manager: '_ConnectionManager' = None, serving_callback: '_Callable[[_WebSocketServer], None]' = None, **kwargs) -> 'None' from abc.ABCMeta
Run a WebSocket-server application.
client(uri: 'str', **kwargs) -> '_AsyncIterator[T]' from abc.ABCMeta
A wrapper for connecting a client.
async create_connection(uri: 'str', **kwargs) -> 'T' from abc.ABCMeta
Connect a client to an endpoint.
create_pair() -> '_AsyncIterator[tuple[T, T]]' from abc.ABCMeta
Obtain a connected pair of WebsocketConnection objects.
serve(init: 'ConnectionInit[T]' = None, stop_sig: '_asyncio.Event' = None, manager: '_ConnectionManager' = None, **kwargs) -> '_AsyncIterator[_WebSocketServer]' from abc.ABCMeta
Serve a WebSocket server.
server_handler(init: 'ConnectionInit[T]' = None, stop_sig: '_asyncio.Event' = None, manager: '_ConnectionManager' = None) -> '_Callable[[_WebSocketServerProtocol], _Awaitable[None]]' from abc.ABCMeta
A wrapper for passing in a websocket handler and initializing a
connection.
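
The '_AsyncIterator[...]' return annotations on client, create_pair and serve suggest that they are meant to be used as async context managers, although this page does not say so. The sketch below shows that pattern with standard-library tools only. FakeConnection, fake_create_pair and demo_pair are hypothetical stand-ins and are not part of runtimepy.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Tuple


class FakeConnection:
    """Hypothetical stand-in for a connection object (not runtimepy code)."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        """Mark this stand-in connection as closed."""
        self.closed = True


@asynccontextmanager
async def fake_create_pair() -> AsyncIterator[Tuple[FakeConnection, FakeConnection]]:
    """Yield two stand-in endpoints and close both when the block exits."""
    first, second = FakeConnection(), FakeConnection()
    try:
        yield first, second
    finally:
        await first.close()
        await second.close()


async def demo_pair() -> Tuple[bool, bool]:
    """Report whether the pair was open inside the block and closed after."""
    async with fake_create_pair() as (first, second):
        open_inside = not first.closed and not second.closed
    return open_inside, first.closed and second.closed
```

Calling asyncio.run(demo_pair()) exercises the sketch. Whether the real class methods clean up in the same way is an assumption.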

Methods inherited from runtimepy.net.connection.Connection:
async async_init(self) -> bool
A runtime initialization routine (executes during 'process').
disable(self, reason: str) -> None
Disable this connection.
disable_extra(self) -> None
Additional tasks to perform when disabling.
async disable_in(self, time: float) -> None
A method for disabling a connection after some delay.
init(self) -> None
Initialize this instance.
log_metrics(self, label: str = 'conn') -> None
Log connection metrics.
async process(self, stop_sig: asyncio.locks.Event = None, disable_time: float = None, backoff: runtimepy.net.backoff.ExponentialBackoff = None) -> None
Process tasks for this connection while the connection is active.
async process_binary(self, data: bytes) -> bool
Process a binary frame.
async process_text(self, data: str) -> bool
Process a text frame.
async restart(self) -> bool
Reset necessary underlying state for this connection to 'process'
again.
send_binary(self, data: Union[bytes, bytearray, memoryview]) -> None
Enqueue a binary message to send.
send_text(self, data: str) -> None
Enqueue a text message to send.
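
send_text and send_binary are synchronous and are documented as enqueueing a message, which implies that a separate task transmits it later. The sketch below illustrates that enqueue-then-drain pattern with asyncio.Queue. QueuedSender, its drain method and demo_queue are hypothetical and do not reflect how runtimepy implements its transmit queues.

```python
import asyncio
from typing import List, Tuple


class QueuedSender:
    """Hypothetical stand-in showing a non-blocking enqueue and a later drain."""

    def __init__(self) -> None:
        self._text_queue: "asyncio.Queue[str]" = asyncio.Queue()
        self.transmitted: List[str] = []

    def send_text(self, data: str) -> None:
        """Enqueue a text message without awaiting (mirrors the sync signature)."""
        self._text_queue.put_nowait(data)

    async def drain(self) -> None:
        """Move every queued message to the 'wire' (here, a plain list)."""
        while not self._text_queue.empty():
            self.transmitted.append(await self._text_queue.get())


async def demo_queue() -> Tuple[List[str], List[str]]:
    """Enqueue two messages, then drain them, returning both snapshots."""
    sender = QueuedSender()
    sender.send_text("hello")
    sender.send_text("world")
    before = list(sender.transmitted)  # Nothing has been transmitted yet.
    await sender.drain()
    return before, sender.transmitted
```

The point of the split is that callers never block on the network; only the draining task does.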

Readonly properties inherited from runtimepy.net.connection.Connection:
auto_restart
Determine if this connection should be automatically restarted.
disabled
Determine if this connection is disabled.

Data and other attributes inherited from runtimepy.net.connection.Connection:
byte_order = <ByteOrder.NETWORK: 4>
connected = True
default_auto_restart = False
uses_binary_tx_queue = True
uses_text_tx_queue = True
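
byte_order defaults to ByteOrder.NETWORK. Network byte order is big-endian, so the most significant byte goes first. The sketch below shows what that means for a 16-bit value using the standard struct module. The helper names are hypothetical, and how runtimepy applies byte_order internally is not described on this page.

```python
import struct


def encode_u16_network(value: int) -> bytes:
    """Pack an unsigned 16-bit integer in network (big-endian) byte order."""
    return struct.pack("!H", value)


def decode_u16_network(data: bytes) -> int:
    """Unpack an unsigned 16-bit integer from network byte order."""
    return struct.unpack("!H", data)[0]
```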

Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
Add a commandable log-level channel to the environment.

Methods inherited from vcorelib.logging.LoggerMixin:
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Data descriptors inherited from vcorelib.logging.LoggerMixin:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
Register individual channel metrics.
register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
Register connection metrics.
register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
Register periodic task metrics.

 
class RuntimepyWebsocketConnection(runtimepy.net.arbiter.tcp.json.WebsocketJsonMessageConnection)
    RuntimepyWebsocketConnection(protocol: '_Union[_WebSocketClientProtocol, _WebSocketServerProtocol]') -> 'None'
 
A class implementing a package-specific WebSocket connection.
 
 
Method resolution order:
RuntimepyWebsocketConnection
runtimepy.net.arbiter.tcp.json.WebsocketJsonMessageConnection
runtimepy.net.stream.json.JsonMessageConnection
runtimepy.net.stream.string.StringMessageConnection
runtimepy.net.stream.base.PrefixedMessageConnection
runtimepy.net.websocket.connection.WebsocketConnection
runtimepy.net.connection.Connection
runtimepy.mixins.logging.LoggerMixinLevelControl
runtimepy.mixins.async_command.AsyncCommandProcessingMixin
vcorelib.logging.LoggerMixin
runtimepy.mixins.environment.ChannelEnvironmentMixin
abc.ABC
runtimepy.message.interface.JsonMessageInterface
builtins.object
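
The order above places the stream classes ahead of WebsocketConnection and Connection, which is why process_binary is listed further down as inherited from PrefixedMessageConnection rather than from Connection. The sketch below reproduces that effect with a reduced hierarchy. The classes are empty stand-ins that only borrow the names, and the base lists of the intermediate classes are assumptions.

```python
from typing import Optional


class Connection:
    def process_binary(self) -> str:
        return "Connection"


class WebsocketConnection(Connection):
    pass


class PrefixedMessageConnection(Connection):
    def process_binary(self) -> str:
        return "PrefixedMessageConnection"


class StringMessageConnection(PrefixedMessageConnection):
    pass


class JsonMessageConnection(StringMessageConnection):
    pass


class WebsocketJsonMessageConnection(JsonMessageConnection, WebsocketConnection):
    pass


def provider(cls: type, name: str) -> Optional[type]:
    """Return the first class in the MRO that defines the attribute itself."""
    for candidate in cls.__mro__:
        if name in vars(candidate):
            return candidate
    return None


def mro_names(cls: type) -> list:
    """List the class names in method resolution order."""
    return [candidate.__name__ for candidate in cls.__mro__]
```

On the real classes, the same check can be made with RuntimepyWebsocketConnection.__mro__ or inspect.getmro.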

Methods defined here:
disable_extra(self) -> None
Additional tasks to perform when disabling.
init(self) -> None
Initialize this instance.
tab_sender(self, name: str) -> Callable[[dict[str, Any]], NoneType]
Get a tab message-sending interface.

Data and other attributes defined here:
__abstractmethods__ = frozenset()
__annotations__ = {'first_client': <class 'bool'>, 'first_message': <class 'bool'>, 'send_interfaces': dict[str, typing.Callable[[dict[str, typing.Any]], NoneType]], 'tabs': dict[str, runtimepy.net.server.websocket.state.TabState], 'ui_time': <class 'float'>}

Methods inherited from runtimepy.net.stream.json.JsonMessageConnection:
async async_init(self) -> bool
A runtime initialization routine (executes during 'process').
async handle_command(self, args: argparse.Namespace, channel: Union[runtimepy.primitives.field.BitField, runtimepy.channel.Channel[runtimepy.primitives.int.Int8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int64Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint64Primitive], runtimepy.channel.Channel[runtimepy.primitives.float.FloatPrimitive], runtimepy.channel.Channel[runtimepy.primitives.float.DoublePrimitive], runtimepy.channel.Channel[runtimepy.primitives.bool.BooleanPrimitive], NoneType]) -> None
Handle a remote command asynchronously.
async process_message(self, data: str, addr: tuple[str, int] = None) -> bool
Process a string message.
write(self, data: bytes, addr: tuple[str, int] = None) -> None
Write data.

Methods inherited from runtimepy.net.stream.string.StringMessageConnection:
async process_single(self, stream: BinaryIO, addr: Tuple[str, int] = None) -> bool
Process a single message.

Methods inherited from runtimepy.net.stream.base.PrefixedMessageConnection:
async process_binary(self, data: bytes, addr: tuple[str, int] = None) -> bool
Process an incoming message.
send_message(self, data: Union[bytes, bytearray, memoryview], addr: tuple[str, int] = None) -> None
Handle inter-message prefixes for outgoing messages.
send_message_str(self, data: str, addr: tuple[str, int] = None) -> None
Convert a message to bytes before sending.

Methods inherited from runtimepy.net.websocket.connection.WebsocketConnection:
__init__(self, protocol: '_Union[_WebSocketClientProtocol, _WebSocketServerProtocol]') -> 'None'
Initialize this connection.
async close(self) -> 'None'
Close this connection.

Class methods inherited from runtimepy.net.websocket.connection.WebsocketConnection:
async app(stop_sig: '_asyncio.Event', init: 'ConnectionInit[T]' = None, manager: '_ConnectionManager' = None, serving_callback: '_Callable[[_WebSocketServer], None]' = None, **kwargs) -> 'None' from abc.ABCMeta
Run a WebSocket-server application.
client(uri: 'str', **kwargs) -> '_AsyncIterator[T]' from abc.ABCMeta
A wrapper for connecting a client.
async create_connection(uri: 'str', **kwargs) -> 'T' from abc.ABCMeta
Connect a client to an endpoint.
create_pair() -> '_AsyncIterator[tuple[T, T]]' from abc.ABCMeta
Obtain a connected pair of WebsocketConnection objects.
serve(init: 'ConnectionInit[T]' = None, stop_sig: '_asyncio.Event' = None, manager: '_ConnectionManager' = None, **kwargs) -> '_AsyncIterator[_WebSocketServer]' from abc.ABCMeta
Serve a WebSocket server.
server_handler(init: 'ConnectionInit[T]' = None, stop_sig: '_asyncio.Event' = None, manager: '_ConnectionManager' = None) -> '_Callable[[_WebSocketServerProtocol], _Awaitable[None]]' from abc.ABCMeta
A wrapper for passing in a websocket handler and initializing a
connection.

Methods inherited from runtimepy.net.connection.Connection:
disable(self, reason: str) -> None
Disable this connection.
async disable_in(self, time: float) -> None
A method for disabling a connection after some delay.
log_metrics(self, label: str = 'conn') -> None
Log connection metrics.
async process(self, stop_sig: asyncio.locks.Event = None, disable_time: float = None, backoff: runtimepy.net.backoff.ExponentialBackoff = None) -> None
Process tasks for this connection while the connection is active.
async process_text(self, data: str) -> bool
Process a text frame.
async restart(self) -> bool
Reset necessary underlying state for this connection to 'process'
again.
send_binary(self, data: Union[bytes, bytearray, memoryview]) -> None
Enqueue a binary message to send.
send_text(self, data: str) -> None
Enqueue a text message to send.

Readonly properties inherited from runtimepy.net.connection.Connection:
auto_restart
Determine if this connection should be automatically restarted.
disabled
Determine if this connection is disabled.

Data and other attributes inherited from runtimepy.net.connection.Connection:
byte_order = <ByteOrder.NETWORK: 4>
connected = True
default_auto_restart = False
uses_binary_tx_queue = True
uses_text_tx_queue = True

Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
Add a commandable log-level channel to the environment.

Methods inherited from runtimepy.mixins.async_command.AsyncCommandProcessingMixin:
async process_command_queue(self) -> None
Process any outgoing command requests.

Methods inherited from vcorelib.logging.LoggerMixin:
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Data descriptors inherited from vcorelib.logging.LoggerMixin:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
Register individual channel metrics.
register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
Register connection metrics.
register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
Register periodic task metrics.

Methods inherited from runtimepy.message.interface.JsonMessageInterface:
basic_handler(self, key: str, handler: Callable[[dict[str, Any], dict[str, Any]], Awaitable[NoneType]] = loopback_handler) -> None
Register a basic handler.
async channel_command(self, command: str, environment: str = 'default', addr: tuple[str, int] = None) -> runtimepy.channel.environment.command.result.CommandResult
Send a channel command to an endpoint.
handle_log_message(self, message: dict[str, typing.Any]) -> None
Handle a log message.
async loopback(self, data: dict[str, typing.Any] = None, addr: tuple[str, int] = None, timeout: float = 3) -> bool
Perform a simple loopback test on this connection.
async poll_handler(self) -> None
Poll this instance.
async process_json(self, data: dict[str, typing.Any], addr: tuple[str, int] = None) -> bool
Process a JSON message.
send_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec], addr: tuple[str, int] = None) -> None
Send a JSON message.
send_poll(self, loopback: int = 1) -> None
Send a poll message with a default loopback of 1, so that this instance
will also be polled.
stage_remote_log(self, msg: str, *args, level: int = 20) -> None
Log a message on the remote.
typed_handler(self, key: str, kind: type[~T], handler: Callable[[dict[str, Any], ~T], Awaitable[NoneType]]) -> None
Register a typed handler.
async wait_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec] = None, addr: tuple[str, int] = None, timeout: float = 3) -> dict[str, typing.Any]
Send a JSON message and wait for a response.
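
basic_handler registers an async callable under a string key, and the callable takes two dictionaries. A plausible reading is that incoming JSON is dispatched by top-level key and each handler fills in a response dictionary. The sketch below shows that reading only. KeyDispatcher, register, dispatch and echo_handler are hypothetical names, and the real process_json returns a bool rather than the response.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

JsonMessage = Dict[str, Any]
Handler = Callable[[JsonMessage, JsonMessage], Awaitable[None]]


async def echo_handler(outbox: JsonMessage, inbox: JsonMessage) -> None:
    """Copy the incoming payload into the response (a loopback-style handler)."""
    outbox.update(inbox)


class KeyDispatcher:
    """Hypothetical stand-in for key-based JSON message dispatch."""

    def __init__(self) -> None:
        self.handlers: Dict[str, Handler] = {}

    def register(self, key: str, handler: Handler = echo_handler) -> None:
        """Associate a handler with a top-level message key."""
        self.handlers[key] = handler

    async def dispatch(self, data: JsonMessage) -> JsonMessage:
        """Call the handler for each known key and collect non-empty replies."""
        response: JsonMessage = {}
        for key, payload in data.items():
            handler = self.handlers.get(key)
            if handler is None:
                continue  # Keys without a registered handler are ignored.
            reply: JsonMessage = {}
            await handler(reply, payload)
            if reply:
                response[key] = reply
        return response
```

In this sketch an unregistered key is skipped silently. The real interface may log or reject such keys instead.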

 
Data
JsonMessage = dict[str, typing.Any]
TabMessageSender = typing.Callable[[dict[str, typing.Any]], NoneType]
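
TabMessageSender is a plain callable that accepts one JSON-style dictionary and returns nothing, which is also the return type of tab_sender. The sketch below shows one way such a sender could be built as a closure bound to a tab name. make_tab_sender, the outbox list and the {name: data} envelope are hypothetical; the message format that runtimepy actually uses is not documented on this page.

```python
from typing import Any, Callable, Dict, List

JsonMessage = Dict[str, Any]
TabMessageSender = Callable[[JsonMessage], None]


def make_tab_sender(name: str, outbox: List[JsonMessage]) -> TabMessageSender:
    """Return a sender that tags each message with the given tab name."""

    def sender(data: JsonMessage) -> None:
        # Hypothetical envelope: nest the payload under the tab name.
        outbox.append({name: data})

    return sender
```

A caller would hold on to the returned function and invoke it as send({"x": 1}) without needing to know which tab it targets.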