|
Methods defined here:
- __init__(self) -> None
- Initialize this instance
- basic_handler(self, key: str, handler: Callable[[dict[str, Any], dict[str, Any]], Awaitable[NoneType]] = <function loopback_handler at 0x7f38d4a3f6a0>) -> None
- Register a basic handler.
- async channel_command(self, command: str, environment: str = 'default', addr: tuple[str, int] = None) -> runtimepy.channel.environment.command.result.CommandResult
- Send a channel command to an endpoint.
- handle_log_message(self, message: dict[str, typing.Any]) -> None
- Handle a log message.
- async loopback(self, data: dict[str, typing.Any] = None, addr: tuple[str, int] = None, timeout: float = 3) -> bool
- Perform a simple loopback test on this connection.
- async poll_handler(self) -> None
- Poll this instance.
- async process_json(self, data: dict[str, typing.Any], addr: tuple[str, int] = None) -> bool
- Process a JSON message.
- send_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec], addr: tuple[str, int] = None) -> None
- Send a JSON message.
- send_poll(self, loopback: int = 1) -> None
- Send a poll message with a default loopback of 1, so that this instance
will also be polled.
- stage_remote_log(self, msg: str, *args, level: int = 20) -> None
- Log a message on the remote.
- typed_handler(self, key: str, kind: type[~T], handler: Callable[[dict[str, Any], ~T], Awaitable[NoneType]]) -> None
- Register a typed handler.
- async wait_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec] = None, addr: tuple[str, int] = None, timeout: float = 3) -> dict[str, typing.Any]
- Send a JSON message and wait for a response.
- write(self, data: bytes, addr: tuple[str, int] = None) -> None
- Write data.
Data descriptors defined here:
- __dict__
- dictionary for instance variables
- __weakref__
- list of weak references to the object
Data and other attributes defined here:
- __annotations__ = {'command': <class 'runtimepy.channel.environment.command.processor.ChannelCommandProcessor'>, 'list_handler': <class 'runtimepy.util.ListLogger'>, 'logger': typing.Union[logging.Logger, logging.LoggerAdapter[typing.Any]], 'processor': <class 'runtimepy.message.MessageProcessor'>, 'remote_environments': dict[str, runtimepy.channel.environment.ChannelEnvironment]}
|