| |
- runtimepy.subprocess.program.PeerProgram(runtimepy.subprocess.interface.RuntimepyPeerInterface, runtimepy.mixins.psutil.PsutilMixin)
-
- SampleProgram
class SampleProgram(runtimepy.subprocess.program.PeerProgram) |
|
SampleProgram(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None
A sample peer program. |
|
- Method resolution order:
- SampleProgram
- runtimepy.subprocess.program.PeerProgram
- runtimepy.subprocess.interface.RuntimepyPeerInterface
- runtimepy.message.interface.JsonMessageInterface
- runtimepy.mixins.async_command.AsyncCommandProcessingMixin
- vcorelib.logging.LoggerMixin
- abc.ABC
- runtimepy.mixins.psutil.PsutilMixin
- builtins.object
Methods defined here:
- async cleanup(self) -> None
- Runs when program 'running' context exits.
- async log_message_sender(self, poll_period_s: float, did_write: asyncio.locks.Event) -> None
- Write to stderr periodically.
- pre_environment_exchange(self) -> None
- Perform early initialization tasks.
Data and other attributes defined here:
- __abstractmethods__ = frozenset()
- __annotations__ = {'stderr_task': _asyncio.Task[None]}
Methods inherited from runtimepy.subprocess.program.PeerProgram:
- async heartbeat_task(self) -> None
- Send a message heartbeat back and forth.
- async io_task(self, buffer: <class 'BinaryIO'>) -> None
- Run this peer program's main loop.
- async main(self, argv: list[str]) -> None
- Program entry.
- async poll_handler(self) -> None
- Handle a 'poll' message.
- streaming_events(self) -> Iterator[NoneType]
- Stream events to the stream output.
- struct_pre_finalize(self) -> None
- Configure struct before finalization.
- write(self, data: bytes, addr: tuple[str, int] = None) -> None
- Write data.
Class methods inherited from runtimepy.subprocess.program.PeerProgram:
- async run(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], argv: list[str]) -> None from abc.ABCMeta
- Run the program.
- run_standard(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> tuple[_asyncio.Task[None], ~T] from abc.ABCMeta
- Run this program using standard input and output.
- running(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], argv: list[str]) -> AsyncIterator[tuple[_asyncio.Task[None], _asyncio.Task[None], ~T]] from abc.ABCMeta
- Provide an interface for managed-context cleanup of the peer process.
- singleton() -> ~T from abc.ABCMeta
- Get a shared single instance of a protocol for this class.
Data and other attributes inherited from runtimepy.subprocess.program.PeerProgram:
- struct_type = <class 'runtimepy.net.arbiter.info.SampleStruct'>
- A sample runtime structure.
Methods inherited from runtimepy.subprocess.interface.RuntimepyPeerInterface:
- __init__(self, name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None
- Initialize this instance.
- async handle_command(self, args: argparse.Namespace, channel: Union[runtimepy.primitives.field.BitField, runtimepy.channel.Channel[runtimepy.primitives.int.Int8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int64Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint64Primitive], runtimepy.channel.Channel[runtimepy.primitives.float.FloatPrimitive], runtimepy.channel.Channel[runtimepy.primitives.float.DoublePrimitive], runtimepy.channel.Channel[runtimepy.primitives.bool.BooleanPrimitive], NoneType]) -> None
- Handle a command.
- handle_log_message(self, message: dict[str, typing.Any]) -> None
- Handle a log message.
- handle_stderr(self, data: bytes) -> None
- Forward stderr.
- async handle_stdout(self, data: bytes) -> None
- Handle messages from stdout.
- poll_metrics(self, time_ns: int = None) -> None
- Poll channels.
- async share_config(self, data: dict[str, typing.Any]) -> None
- Exchange configuration data.
- async share_environment(self) -> None
- Exchange channel environments.
Readonly properties inherited from runtimepy.subprocess.interface.RuntimepyPeerInterface:
- peer_name
- Get the name of the peer's environment.
Data and other attributes inherited from runtimepy.subprocess.interface.RuntimepyPeerInterface:
- poll_period_s = 0.01
Methods inherited from runtimepy.message.interface.JsonMessageInterface:
- basic_handler(self, key: str, handler: Callable[[dict[str, Any], dict[str, Any]], Awaitable[NoneType]] = <function loopback_handler at 0x7f0350b0d620>) -> None
- Register a basic handler.
- async channel_command(self, command: str, environment: str = 'default', addr: tuple[str, int] = None) -> runtimepy.channel.environment.command.result.CommandResult
- Send a channel command to an endpoint.
- async loopback(self, data: dict[str, typing.Any] = None, addr: tuple[str, int] = None, timeout: float = 3) -> bool
- Perform a simple loopback test on this connection.
- async process_json(self, data: dict[str, typing.Any], addr: tuple[str, int] = None) -> bool
- Process a JSON message.
- send_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec], addr: tuple[str, int] = None) -> None
- Send a JSON message.
- send_poll(self, loopback: int = 1) -> None
- Send a poll message with a default loopback of 1, so that this instance
will also be polled.
- stage_remote_log(self, msg: str, *args, level: int = 20) -> None
- Log a message on the remote.
- typed_handler(self, key: str, kind: type[~T], handler: Callable[[dict[str, Any], ~T], Awaitable[NoneType]]) -> None
- Register a typed handler.
- async wait_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec] = None, addr: tuple[str, int] = None, timeout: float = 3) -> dict[str, typing.Any]
- Send a JSON message and wait for a response.
Data descriptors inherited from runtimepy.message.interface.JsonMessageInterface:
- __dict__
- dictionary for instance variables
- __weakref__
- list of weak references to the object
Methods inherited from runtimepy.mixins.async_command.AsyncCommandProcessingMixin:
- async process_command_queue(self) -> None
- Process any outgoing command requests.
Methods inherited from vcorelib.logging.LoggerMixin:
- log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
- A simple wrapper.
Methods inherited from runtimepy.mixins.psutil.PsutilMixin:
- init_psutil(self, env: runtimepy.channel.environment.ChannelEnvironment) -> None
- Initialize psutil-based metrics.
- poll_psutil(self, weight: float) -> None
- Poll psutil-based metrics.
| |