| |
- runtimepy.net.stream.UdpPrefixedMessageConnection(runtimepy.net.stream.base.PrefixedMessageConnection, runtimepy.net.udp.connection.UdpConnection)
-
- UdpJsonMessageConnection(runtimepy.net.stream.json.JsonMessageConnection, runtimepy.net.stream.UdpPrefixedMessageConnection)
- runtimepy.net.stream.json.JsonMessageConnection(runtimepy.net.stream.string.StringMessageConnection, runtimepy.mixins.async_command.AsyncCommandProcessingMixin, runtimepy.message.interface.JsonMessageInterface)
-
- TcpJsonMessageConnection(runtimepy.net.stream.json.JsonMessageConnection, runtimepy.net.tcp.connection.TcpConnection)
- UdpJsonMessageConnection(runtimepy.net.stream.json.JsonMessageConnection, runtimepy.net.stream.UdpPrefixedMessageConnection)
- WebsocketJsonMessageConnection(runtimepy.net.stream.json.JsonMessageConnection, runtimepy.net.websocket.connection.WebsocketConnection)
- runtimepy.net.tcp.connection.TcpConnection(runtimepy.net.connection.Connection, runtimepy.net.mixin.TransportMixin)
-
- TcpJsonMessageConnection(runtimepy.net.stream.json.JsonMessageConnection, runtimepy.net.tcp.connection.TcpConnection)
- runtimepy.net.websocket.connection.WebsocketConnection(runtimepy.net.connection.Connection)
-
- WebsocketJsonMessageConnection(runtimepy.net.stream.json.JsonMessageConnection, runtimepy.net.websocket.connection.WebsocketConnection)
class TcpJsonMessageConnection(runtimepy.net.stream.json.JsonMessageConnection, runtimepy.net.tcp.connection.TcpConnection) |
|
TcpJsonMessageConnection(transport: asyncio.transports.Transport, protocol: runtimepy.net.tcp.protocol.QueueProtocol) -> None
A TCP connection interface for JSON messaging. |
|
- Method resolution order:
- TcpJsonMessageConnection
- runtimepy.net.stream.json.JsonMessageConnection
- runtimepy.net.stream.string.StringMessageConnection
- runtimepy.net.stream.base.PrefixedMessageConnection
- runtimepy.net.tcp.connection.TcpConnection
- runtimepy.net.connection.Connection
- runtimepy.mixins.logging.LoggerMixinLevelControl
- runtimepy.mixins.async_command.AsyncCommandProcessingMixin
- vcorelib.logging.LoggerMixin
- runtimepy.mixins.environment.ChannelEnvironmentMixin
- abc.ABC
- runtimepy.message.interface.JsonMessageInterface
- runtimepy.net.mixin.TransportMixin
- builtins.object
Data and other attributes defined here:
- __abstractmethods__ = frozenset()
- __annotations__ = {}
Methods inherited from runtimepy.net.stream.json.JsonMessageConnection:
- async async_init(self) -> bool
- A runtime initialization routine (executes during 'process').
- async handle_command(self, args: argparse.Namespace, channel: Union[runtimepy.primitives.field.BitField, runtimepy.channel.Channel[runtimepy.primitives.int.Int8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int64Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint64Primitive], runtimepy.channel.Channel[runtimepy.primitives.float.FloatPrimitive], runtimepy.channel.Channel[runtimepy.primitives.float.DoublePrimitive], runtimepy.channel.Channel[runtimepy.primitives.bool.BooleanPrimitive], NoneType]) -> None
- Handle a remote command asynchronously.
- init(self) -> None
- Initialize this instance.
- async process_message(self, data: str, addr: tuple[str, int] = None) -> bool
- Process a string message.
- write(self, data: bytes, addr: tuple[str, int] = None) -> None
- Write data.
Methods inherited from runtimepy.net.stream.string.StringMessageConnection:
- async process_single(self, stream: <class 'BinaryIO'>, addr: Tuple[str, int] = None) -> bool
- Process a single message.
Methods inherited from runtimepy.net.stream.base.PrefixedMessageConnection:
- async process_binary(self, data: bytes, addr: tuple[str, int] = None) -> bool
- Process an incoming message.
- send_message(self, data: Union[bytes, bytearray, memoryview], addr: tuple[str, int] = None) -> None
- Handle inter-message prefixes for outgoing messages.
- send_message_str(self, data: str, addr: tuple[str, int] = None) -> None
- Convert a message to bytes before sending.
Methods inherited from runtimepy.net.tcp.connection.TcpConnection:
- __init__(self, transport: asyncio.transports.Transport, protocol: runtimepy.net.tcp.protocol.QueueProtocol) -> None
- Initialize this TCP connection.
- async close(self) -> None
- Close this connection.
- async restart(self) -> bool
- Reset necessary underlying state for this connection to 'process'
again.
- send_binary(self, data: Union[bytes, bytearray, memoryview]) -> None
- Enqueue a binary message tos end.
- send_text(self, data: str) -> None
- Enqueue a text message to send.
Class methods inherited from runtimepy.net.tcp.connection.TcpConnection:
- async app(stop_sig: asyncio.locks.Event, callback: Callable[[~T], NoneType] = None, serving_callback: Callable[[Any], NoneType] = None, manager: runtimepy.net.manager.ConnectionManager = None, **kwargs) -> None from abc.ABCMeta
- Run an application that serves new connections.
- async create_connection(backoff: runtimepy.net.backoff.ExponentialBackoff = None, **kwargs) -> ~T from abc.ABCMeta
- Create a TCP connection.
- create_pair() -> AsyncIterator[tuple[~T, ~T]] from abc.ABCMeta
- Create a connection pair.
- serve(callback: Callable[[~T], NoneType] = None, **kwargs) -> AsyncIterator[Any] from abc.ABCMeta
- Serve incoming connections.
Data and other attributes inherited from runtimepy.net.tcp.connection.TcpConnection:
- log_alias = 'TCP'
- log_prefix = ''
- uses_binary_tx_queue = False
- uses_text_tx_queue = False
Methods inherited from runtimepy.net.connection.Connection:
- disable(self, reason: str) -> None
- Disable this connection.
- disable_extra(self) -> None
- Additional tasks to perform when disabling.
- async disable_in(self, time: float) -> None
- A method for disabling a connection after some delay.
- log_metrics(self, label: str = 'conn') -> None
- Log connection metrics.
- async process(self, stop_sig: asyncio.locks.Event = None, disable_time: float = None, backoff: runtimepy.net.backoff.ExponentialBackoff = None) -> None
- Process tasks for this connection while the connection is active.
- async process_text(self, data: str) -> bool
- Process a text frame.
Readonly properties inherited from runtimepy.net.connection.Connection:
- auto_restart
- Determine if this connection should be automatically restarted.
- disabled
- Determine if this connection is disabled.
Data and other attributes inherited from runtimepy.net.connection.Connection:
- byte_order = <ByteOrder.NETWORK: 4>
- connected = True
- default_auto_restart = False
Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
- setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
- Add a commandable log-level channel to the environment.
Methods inherited from runtimepy.mixins.async_command.AsyncCommandProcessingMixin:
- async process_command_queue(self) -> None
- Process any outgoing command requests.
Methods inherited from vcorelib.logging.LoggerMixin:
- log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
- A simple wrapper.
Data descriptors inherited from vcorelib.logging.LoggerMixin:
- __dict__
- dictionary for instance variables
- __weakref__
- list of weak references to the object
Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
- register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
- Register individual channel metrics.
- register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
- Register connection metrics.
- register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
- Register periodic task metrics.
Methods inherited from runtimepy.message.interface.JsonMessageInterface:
- basic_handler(self, key: str, handler: Callable[[dict[str, Any], dict[str, Any]], Awaitable[NoneType]] = <function loopback_handler at 0x7f29563253a0>) -> None
- Register a basic handler.
- async channel_command(self, command: str, environment: str = 'default', addr: tuple[str, int] = None) -> runtimepy.channel.environment.command.result.CommandResult
- Send a channel command to an endpoint.
- handle_log_message(self, message: dict[str, typing.Any]) -> None
- Handle a log message.
- async loopback(self, data: dict[str, typing.Any] = None, addr: tuple[str, int] = None, timeout: float = 3) -> bool
- Perform a simple loopback test on this connection.
- async poll_handler(self) -> None
- Poll this instance.
- async process_json(self, data: dict[str, typing.Any], addr: tuple[str, int] = None) -> bool
- Process a JSON message.
- send_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec], addr: tuple[str, int] = None) -> None
- Send a JSON message.
- send_poll(self, loopback: int = 1) -> None
- Send a poll message with a default loopback of 1, so that this instance
will also be polled.
- stage_remote_log(self, msg: str, *args, level: int = 20) -> None
- Log a message on the remote.
- typed_handler(self, key: str, kind: type[~T], handler: Callable[[dict[str, Any], ~T], Awaitable[NoneType]]) -> None
- Register a typed handler.
- async wait_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec] = None, addr: tuple[str, int] = None, timeout: float = 3) -> dict[str, typing.Any]
- Send a JSON message and wait for a response.
Methods inherited from runtimepy.net.mixin.TransportMixin:
- logger_name(self, prefix: 'str' = '') -> 'str'
- Get a logger name for this connection.
- set_transport(self, transport: '_asyncio.BaseTransport') -> 'None'
- Set the transport for this instance.
Readonly properties inherited from runtimepy.net.mixin.TransportMixin:
- socket
- Get this instance's underlying socket.
|
class UdpJsonMessageConnection(runtimepy.net.stream.json.JsonMessageConnection, runtimepy.net.stream.UdpPrefixedMessageConnection) |
|
UdpJsonMessageConnection(transport: asyncio.transports.DatagramTransport, protocol: runtimepy.net.udp.protocol.UdpQueueProtocol) -> None
A UDP connection interface for JSON messaging. |
|
- Method resolution order:
- UdpJsonMessageConnection
- runtimepy.net.stream.json.JsonMessageConnection
- runtimepy.net.stream.string.StringMessageConnection
- runtimepy.net.stream.UdpPrefixedMessageConnection
- runtimepy.net.stream.base.PrefixedMessageConnection
- runtimepy.net.udp.connection.UdpConnection
- runtimepy.net.connection.Connection
- runtimepy.mixins.logging.LoggerMixinLevelControl
- runtimepy.mixins.async_command.AsyncCommandProcessingMixin
- vcorelib.logging.LoggerMixin
- runtimepy.mixins.environment.ChannelEnvironmentMixin
- abc.ABC
- runtimepy.message.interface.JsonMessageInterface
- runtimepy.net.mixin.TransportMixin
- builtins.object
Data and other attributes defined here:
- __abstractmethods__ = frozenset()
- __annotations__ = {}
Methods inherited from runtimepy.net.stream.json.JsonMessageConnection:
- async async_init(self) -> bool
- A runtime initialization routine (executes during 'process').
- async handle_command(self, args: argparse.Namespace, channel: Union[runtimepy.primitives.field.BitField, runtimepy.channel.Channel[runtimepy.primitives.int.Int8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int64Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint64Primitive], runtimepy.channel.Channel[runtimepy.primitives.float.FloatPrimitive], runtimepy.channel.Channel[runtimepy.primitives.float.DoublePrimitive], runtimepy.channel.Channel[runtimepy.primitives.bool.BooleanPrimitive], NoneType]) -> None
- Handle a remote command asynchronously.
- init(self) -> None
- Initialize this instance.
- async process_message(self, data: str, addr: tuple[str, int] = None) -> bool
- Process a string message.
- write(self, data: bytes, addr: tuple[str, int] = None) -> None
- Write data.
Methods inherited from runtimepy.net.stream.string.StringMessageConnection:
- async process_single(self, stream: <class 'BinaryIO'>, addr: Tuple[str, int] = None) -> bool
- Process a single message.
Methods inherited from runtimepy.net.stream.UdpPrefixedMessageConnection:
- async process_datagram(self, data: bytes, addr: Tuple[str, int]) -> bool
- Process a datagram.
Methods inherited from runtimepy.net.stream.base.PrefixedMessageConnection:
- async process_binary(self, data: bytes, addr: tuple[str, int] = None) -> bool
- Process an incoming message.
- send_message(self, data: Union[bytes, bytearray, memoryview], addr: tuple[str, int] = None) -> None
- Handle inter-message prefixes for outgoing messages.
- send_message_str(self, data: str, addr: tuple[str, int] = None) -> None
- Convert a message to bytes before sending.
Methods inherited from runtimepy.net.udp.connection.UdpConnection:
- __init__(self, transport: asyncio.transports.DatagramTransport, protocol: runtimepy.net.udp.protocol.UdpQueueProtocol) -> None
- Initialize this UDP connection.
- async close(self) -> None
- Close this connection.
- async restart(self) -> bool
- Reset necessary underlying state for this connection to 'process'
again.
- send_binary(self, data: Union[bytes, bytearray, memoryview]) -> None
- Enqueue a binary message tos end.
- send_text(self, data: str) -> None
- Enqueue a text message to send.
- sendto(self, data: bytes, addr: Union[runtimepy.net.util.IPv4Host, runtimepy.net.util.IPv6Host, tuple[str, int]] = None) -> None
- Send to a specific address.
- set_remote_address(self, addr: Union[runtimepy.net.util.IPv4Host, runtimepy.net.util.IPv6Host]) -> None
- Set a new remote address. Note that this doesn't interact with or
attempt to 'connect' the underlying socket. That should be done at
creation time.
Class methods inherited from runtimepy.net.udp.connection.UdpConnection:
- async create_connection(connect: bool = True, **kwargs) -> ~T from abc.ABCMeta
- Create a UDP connection.
- async create_pair() -> tuple[~T, ~T] from abc.ABCMeta
- Create a connection pair.
Data and other attributes inherited from runtimepy.net.udp.connection.UdpConnection:
- uses_binary_tx_queue = False
- uses_text_tx_queue = False
Methods inherited from runtimepy.net.connection.Connection:
- disable(self, reason: str) -> None
- Disable this connection.
- disable_extra(self) -> None
- Additional tasks to perform when disabling.
- async disable_in(self, time: float) -> None
- A method for disabling a connection after some delay.
- log_metrics(self, label: str = 'conn') -> None
- Log connection metrics.
- async process(self, stop_sig: asyncio.locks.Event = None, disable_time: float = None, backoff: runtimepy.net.backoff.ExponentialBackoff = None) -> None
- Process tasks for this connection while the connection is active.
- async process_text(self, data: str) -> bool
- Process a text frame.
Readonly properties inherited from runtimepy.net.connection.Connection:
- auto_restart
- Determine if this connection should be automatically restarted.
- disabled
- Determine if this connection is disabled.
Data and other attributes inherited from runtimepy.net.connection.Connection:
- byte_order = <ByteOrder.NETWORK: 4>
- connected = True
- default_auto_restart = False
Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
- setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
- Add a commandable log-level channel to the environment.
Methods inherited from runtimepy.mixins.async_command.AsyncCommandProcessingMixin:
- async process_command_queue(self) -> None
- Process any outgoing command requests.
Methods inherited from vcorelib.logging.LoggerMixin:
- log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
- A simple wrapper.
Data descriptors inherited from vcorelib.logging.LoggerMixin:
- __dict__
- dictionary for instance variables
- __weakref__
- list of weak references to the object
Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
- register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
- Register individual channel metrics.
- register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
- Register connection metrics.
- register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
- Register periodic task metrics.
Methods inherited from runtimepy.message.interface.JsonMessageInterface:
- basic_handler(self, key: str, handler: Callable[[dict[str, Any], dict[str, Any]], Awaitable[NoneType]] = <function loopback_handler at 0x7f29563253a0>) -> None
- Register a basic handler.
- async channel_command(self, command: str, environment: str = 'default', addr: tuple[str, int] = None) -> runtimepy.channel.environment.command.result.CommandResult
- Send a channel command to an endpoint.
- handle_log_message(self, message: dict[str, typing.Any]) -> None
- Handle a log message.
- async loopback(self, data: dict[str, typing.Any] = None, addr: tuple[str, int] = None, timeout: float = 3) -> bool
- Perform a simple loopback test on this connection.
- async poll_handler(self) -> None
- Poll this instance.
- async process_json(self, data: dict[str, typing.Any], addr: tuple[str, int] = None) -> bool
- Process a JSON message.
- send_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec], addr: tuple[str, int] = None) -> None
- Send a JSON message.
- send_poll(self, loopback: int = 1) -> None
- Send a poll message with a default loopback of 1, so that this instance
will also be polled.
- stage_remote_log(self, msg: str, *args, level: int = 20) -> None
- Log a message on the remote.
- typed_handler(self, key: str, kind: type[~T], handler: Callable[[dict[str, Any], ~T], Awaitable[NoneType]]) -> None
- Register a typed handler.
- async wait_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec] = None, addr: tuple[str, int] = None, timeout: float = 3) -> dict[str, typing.Any]
- Send a JSON message and wait for a response.
Methods inherited from runtimepy.net.mixin.TransportMixin:
- logger_name(self, prefix: 'str' = '') -> 'str'
- Get a logger name for this connection.
- set_transport(self, transport: '_asyncio.BaseTransport') -> 'None'
- Set the transport for this instance.
Readonly properties inherited from runtimepy.net.mixin.TransportMixin:
- socket
- Get this instance's underlying socket.
|
class WebsocketJsonMessageConnection(runtimepy.net.stream.json.JsonMessageConnection, runtimepy.net.websocket.connection.WebsocketConnection) |
|
WebsocketJsonMessageConnection(protocol: '_Union[_WebSocketClientProtocol, _WebSocketServerProtocol]') -> 'None'
A websocket connection interface for JSON messaging. |
|
- Method resolution order:
- WebsocketJsonMessageConnection
- runtimepy.net.stream.json.JsonMessageConnection
- runtimepy.net.stream.string.StringMessageConnection
- runtimepy.net.stream.base.PrefixedMessageConnection
- runtimepy.net.websocket.connection.WebsocketConnection
- runtimepy.net.connection.Connection
- runtimepy.mixins.logging.LoggerMixinLevelControl
- runtimepy.mixins.async_command.AsyncCommandProcessingMixin
- vcorelib.logging.LoggerMixin
- runtimepy.mixins.environment.ChannelEnvironmentMixin
- abc.ABC
- runtimepy.message.interface.JsonMessageInterface
- builtins.object
Data and other attributes defined here:
- __abstractmethods__ = frozenset()
- __annotations__ = {}
Methods inherited from runtimepy.net.stream.json.JsonMessageConnection:
- async async_init(self) -> bool
- A runtime initialization routine (executes during 'process').
- async handle_command(self, args: argparse.Namespace, channel: Union[runtimepy.primitives.field.BitField, runtimepy.channel.Channel[runtimepy.primitives.int.Int8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int64Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint64Primitive], runtimepy.channel.Channel[runtimepy.primitives.float.FloatPrimitive], runtimepy.channel.Channel[runtimepy.primitives.float.DoublePrimitive], runtimepy.channel.Channel[runtimepy.primitives.bool.BooleanPrimitive], NoneType]) -> None
- Handle a remote command asynchronously.
- init(self) -> None
- Initialize this instance.
- async process_message(self, data: str, addr: tuple[str, int] = None) -> bool
- Process a string message.
- write(self, data: bytes, addr: tuple[str, int] = None) -> None
- Write data.
Methods inherited from runtimepy.net.stream.string.StringMessageConnection:
- async process_single(self, stream: <class 'BinaryIO'>, addr: Tuple[str, int] = None) -> bool
- Process a single message.
Methods inherited from runtimepy.net.stream.base.PrefixedMessageConnection:
- async process_binary(self, data: bytes, addr: tuple[str, int] = None) -> bool
- Process an incoming message.
- send_message(self, data: Union[bytes, bytearray, memoryview], addr: tuple[str, int] = None) -> None
- Handle inter-message prefixes for outgoing messages.
- send_message_str(self, data: str, addr: tuple[str, int] = None) -> None
- Convert a message to bytes before sending.
Methods inherited from runtimepy.net.websocket.connection.WebsocketConnection:
- __init__(self, protocol: '_Union[_WebSocketClientProtocol, _WebSocketServerProtocol]') -> 'None'
- Initialize this connection.
- async close(self) -> 'None'
- Close this connection.
Class methods inherited from runtimepy.net.websocket.connection.WebsocketConnection:
- async app(stop_sig: '_asyncio.Event', init: 'ConnectionInit[T]' = None, manager: '_ConnectionManager' = None, serving_callback: '_Callable[[_WebSocketServer], None]' = None, **kwargs) -> 'None' from abc.ABCMeta
- Run a WebSocket-server application.
- client(uri: 'str', **kwargs) -> '_AsyncIterator[T]' from abc.ABCMeta
- A wrapper for connecting a client.
- async create_connection(uri: 'str', **kwargs) -> 'T' from abc.ABCMeta
- Connect a client to an endpoint.
- create_pair() -> '_AsyncIterator[tuple[T, T]]' from abc.ABCMeta
- Obtain a connected pair of WebsocketConnection objects.
- serve(init: 'ConnectionInit[T]' = None, stop_sig: '_asyncio.Event' = None, manager: '_ConnectionManager' = None, **kwargs) -> '_AsyncIterator[_WebSocketServer]' from abc.ABCMeta
- Serve a WebSocket server.
- server_handler(init: 'ConnectionInit[T]' = None, stop_sig: '_asyncio.Event' = None, manager: '_ConnectionManager' = None) -> '_Callable[[_WebSocketServerProtocol], _Awaitable[None]]' from abc.ABCMeta
- A wrapper for passing in a websocket handler and initializing a
connection.
Methods inherited from runtimepy.net.connection.Connection:
- disable(self, reason: str) -> None
- Disable this connection.
- disable_extra(self) -> None
- Additional tasks to perform when disabling.
- async disable_in(self, time: float) -> None
- A method for disabling a connection after some delay.
- log_metrics(self, label: str = 'conn') -> None
- Log connection metrics.
- async process(self, stop_sig: asyncio.locks.Event = None, disable_time: float = None, backoff: runtimepy.net.backoff.ExponentialBackoff = None) -> None
- Process tasks for this connection while the connection is active.
- async process_text(self, data: str) -> bool
- Process a text frame.
- async restart(self) -> bool
- Reset necessary underlying state for this connection to 'process'
again.
- send_binary(self, data: Union[bytes, bytearray, memoryview]) -> None
- Enqueue a binary message tos end.
- send_text(self, data: str) -> None
- Enqueue a text message to send.
Readonly properties inherited from runtimepy.net.connection.Connection:
- auto_restart
- Determine if this connection should be automatically restarted.
- disabled
- Determine if this connection is disabled.
Data and other attributes inherited from runtimepy.net.connection.Connection:
- byte_order = <ByteOrder.NETWORK: 4>
- connected = True
- default_auto_restart = False
- uses_binary_tx_queue = True
- uses_text_tx_queue = True
Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
- setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
- Add a commandable log-level channel to the environment.
Methods inherited from runtimepy.mixins.async_command.AsyncCommandProcessingMixin:
- async process_command_queue(self) -> None
- Process any outgoing command requests.
Methods inherited from vcorelib.logging.LoggerMixin:
- log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
- A simple wrapper.
Data descriptors inherited from vcorelib.logging.LoggerMixin:
- __dict__
- dictionary for instance variables
- __weakref__
- list of weak references to the object
Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
- register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
- Register individual channel metrics.
- register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
- Register connection metrics.
- register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
- Register periodic task metrics.
Methods inherited from runtimepy.message.interface.JsonMessageInterface:
- basic_handler(self, key: str, handler: Callable[[dict[str, Any], dict[str, Any]], Awaitable[NoneType]] = <function loopback_handler at 0x7f29563253a0>) -> None
- Register a basic handler.
- async channel_command(self, command: str, environment: str = 'default', addr: tuple[str, int] = None) -> runtimepy.channel.environment.command.result.CommandResult
- Send a channel command to an endpoint.
- handle_log_message(self, message: dict[str, typing.Any]) -> None
- Handle a log message.
- async loopback(self, data: dict[str, typing.Any] = None, addr: tuple[str, int] = None, timeout: float = 3) -> bool
- Perform a simple loopback test on this connection.
- async poll_handler(self) -> None
- Poll this instance.
- async process_json(self, data: dict[str, typing.Any], addr: tuple[str, int] = None) -> bool
- Process a JSON message.
- send_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec], addr: tuple[str, int] = None) -> None
- Send a JSON message.
- send_poll(self, loopback: int = 1) -> None
- Send a poll message with a default loopback of 1, so that this instance
will also be polled.
- stage_remote_log(self, msg: str, *args, level: int = 20) -> None
- Log a message on the remote.
- typed_handler(self, key: str, kind: type[~T], handler: Callable[[dict[str, Any], ~T], Awaitable[NoneType]]) -> None
- Register a typed handler.
- async wait_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec] = None, addr: tuple[str, int] = None, timeout: float = 3) -> dict[str, typing.Any]
- Send a JSON message and wait for a response.
| |