runtimepy.message.interface
index
/home/vkottler/src/vkottler/workspace/runtimepy/runtimepy/message/interface.py

A module implementing a JSON messaging interface.

 
Modules
       
asyncio
logging

 
Classes
       
builtins.object
JsonMessageInterface

 
class JsonMessageInterface(builtins.object)
    JsonMessageInterface() -> None
 
A JSON messaging interface class.
 
  Methods defined here:
__init__(self) -> None
Initialize this instance
basic_handler(self, key: str, handler: Callable[[dict[str, Any], dict[str, Any]], Awaitable[NoneType]] = <function loopback_handler at 0x7f38d4a3f6a0>) -> None
Register a basic handler.
async channel_command(self, command: str, environment: str = 'default', addr: tuple[str, int] = None) -> runtimepy.channel.environment.command.result.CommandResult
Send a channel command to an endpoint.
handle_log_message(self, message: dict[str, typing.Any]) -> None
Handle a log message.
async loopback(self, data: dict[str, typing.Any] = None, addr: tuple[str, int] = None, timeout: float = 3) -> bool
Perform a simple loopback test on this connection.
async poll_handler(self) -> None
Poll this instance.
async process_json(self, data: dict[str, typing.Any], addr: tuple[str, int] = None) -> bool
Process a JSON message.
send_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec], addr: tuple[str, int] = None) -> None
Send a JSON message.
send_poll(self, loopback: int = 1) -> None
Send a poll message with a default loopback of 1, so that this instance
will also be polled.
stage_remote_log(self, msg: str, *args, level: int = 20) -> None
Log a message on the remote.
typed_handler(self, key: str, kind: type[~T], handler: Callable[[dict[str, Any], ~T], Awaitable[NoneType]]) -> None
Register a typed handler.
async wait_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec] = None, addr: tuple[str, int] = None, timeout: float = 3) -> dict[str, typing.Any]
Send a JSON message and wait for a response.
write(self, data: bytes, addr: tuple[str, int] = None) -> None
Write data.

Data descriptors defined here:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Data and other attributes defined here:
__annotations__ = {'command': <class 'runtimepy.channel.environment.command.processor.ChannelCommandProcessor'>, 'list_handler': <class 'runtimepy.util.ListLogger'>, 'logger': typing.Union[logging.Logger, logging.LoggerAdapter[typing.Any]], 'processor': <class 'runtimepy.message.MessageProcessor'>, 'remote_environments': dict[str, runtimepy.channel.environment.ChannelEnvironment]}

 
Data
        DEFAULT_LOOPBACK = {'a': 1, 'b': 2, 'c': 3}
DEFAULT_TIMEOUT = 3
ENVIRONMENTS = {}
JsonMessage = dict[str, typing.Any]
LoggerType = typing.Union[logging.Logger, logging.LoggerAdapter[typing.Any]]
MessageHandler = typing.Callable[[dict[str, typing.Any], dict[str, typing.Any]], typing.Awaitable[NoneType]]
Optional = typing.Optional
PKG_NAME = 'runtimepy'
RESERVED_KEYS = {'__id__', '__log_messages__', 'keys_ignored'}
T = ~T
TypedHandler = typing.Callable[[dict[str, typing.Any], ~T], typing.Awaitable[NoneType]]
Union = typing.Union
VERSION = '4.4.0'