|  |  | 
runtimepy.subprocess.program.PeerProgram(runtimepy.subprocess.interface.RuntimepyPeerInterface, runtimepy.mixins.psutil.PsutilMixin)
SampleProgram
 
 
| class SampleProgram(runtimepy.subprocess.program.PeerProgram)
 |  |  | SampleProgram(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None 
 A sample peer program.
 
 |  |  | Method resolution order: SampleProgram, runtimepy.subprocess.program.PeerProgram, runtimepy.subprocess.interface.RuntimepyPeerInterface, runtimepy.message.interface.JsonMessageInterface, runtimepy.mixins.async_command.AsyncCommandProcessingMixin, vcorelib.logging.LoggerMixin, abc.ABC, runtimepy.mixins.psutil.PsutilMixin, builtins.object
 Methods defined here:
 
 async cleanup(self) -> None — Runs when program 'running' context exits.
 async log_message_sender(self, poll_period_s: float, did_write: asyncio.locks.Event) -> None — Write to stderr periodically.
 pre_environment_exchange(self) -> None — Perform early initialization tasks.
 Data and other attributes defined here:
 
 __abstractmethods__ = frozenset()
 __annotations__ = {'stderr_task': _asyncio.Task[None]}
 Methods inherited from runtimepy.subprocess.program.PeerProgram:
 
 async heartbeat_task(self) -> None — Send a message heartbeat back and forth.
 async io_task(self, buffer: <class 'BinaryIO'>) -> None — Run this peer program's main loop.
 async main(self, argv: list[str]) -> None — Program entry.
 async poll_handler(self) -> None — Handle a 'poll' message.
 streaming_events(self) -> Iterator[NoneType] — Stream events to the stream output.
 struct_pre_finalize(self) -> None — Configure struct before finalization.
 write(self, data: bytes, addr: tuple[str, int] = None) -> None — Write data.
 Class methods inherited from runtimepy.subprocess.program.PeerProgram:
 
 async run(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], argv: list[str]) -> None from abc.ABCMeta — Run the program.
 run_standard(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> tuple[_asyncio.Task[None], ~T] from abc.ABCMeta — Run this program using standard input and output.
 running(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], argv: list[str]) -> AsyncIterator[tuple[_asyncio.Task[None], _asyncio.Task[None], ~T]] from abc.ABCMeta — Provide an interface for managed-context cleanup of the peer process.
 singleton() -> ~T from abc.ABCMeta — Get a shared single instance of a protocol for this class.
 Data and other attributes inherited from runtimepy.subprocess.program.PeerProgram:
 
 struct_type = <class 'runtimepy.net.arbiter.info.SampleStruct'> — A sample runtime structure.
 Methods inherited from runtimepy.subprocess.interface.RuntimepyPeerInterface:
 
 __init__(self, name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None — Initialize this instance.
 async handle_command(self, args: argparse.Namespace, channel: Union[runtimepy.primitives.field.BitField, runtimepy.channel.Channel[runtimepy.primitives.int.Int8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int64Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint64Primitive], runtimepy.channel.Channel[runtimepy.primitives.float.FloatPrimitive], runtimepy.channel.Channel[runtimepy.primitives.float.DoublePrimitive], runtimepy.channel.Channel[runtimepy.primitives.bool.BooleanPrimitive], NoneType]) -> None — Handle a command.
 handle_log_message(self, message: dict[str, typing.Any]) -> None — Handle a log message.
 handle_stderr(self, data: bytes) -> None — Forward stderr.
 async handle_stdout(self, data: bytes) -> None — Handle messages from stdout.
 poll_metrics(self, time_ns: int = None) -> None — Poll channels.
 async share_config(self, data: dict[str, typing.Any]) -> None — Exchange configuration data.
 async share_environment(self) -> None — Exchange channel environments.
 Readonly properties inherited from runtimepy.subprocess.interface.RuntimepyPeerInterface:
 
 peer_name — Get the name of the peer's environment.
 Data and other attributes inherited from runtimepy.subprocess.interface.RuntimepyPeerInterface:
 
 poll_period_s = 0.01
 Methods inherited from runtimepy.message.interface.JsonMessageInterface:
 
 basic_handler(self, key: str, handler: Callable[[dict[str, Any], dict[str, Any]], Awaitable[NoneType]] = <function loopback_handler at 0x7f0350b0d620>) -> None — Register a basic handler.
 async channel_command(self, command: str, environment: str = 'default', addr: tuple[str, int] = None) -> runtimepy.channel.environment.command.result.CommandResult — Send a channel command to an endpoint.
 async loopback(self, data: dict[str, typing.Any] = None, addr: tuple[str, int] = None, timeout: float = 3) -> bool — Perform a simple loopback test on this connection.
 async process_json(self, data: dict[str, typing.Any], addr: tuple[str, int] = None) -> bool — Process a JSON message.
 send_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec], addr: tuple[str, int] = None) -> None — Send a JSON message.
 send_poll(self, loopback: int = 1) -> None — Send a poll message with a default loopback of 1, so that this instance will also be polled.
 stage_remote_log(self, msg: str, *args, level: int = 20) -> None — Log a message on the remote.
 typed_handler(self, key: str, kind: type[~T], handler: Callable[[dict[str, Any], ~T], Awaitable[NoneType]]) -> None — Register a typed handler.
 async wait_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec] = None, addr: tuple[str, int] = None, timeout: float = 3) -> dict[str, typing.Any] — Send a JSON message and wait for a response.
 Data descriptors inherited from runtimepy.message.interface.JsonMessageInterface:
 
 __dict__ — dictionary for instance variables
 __weakref__ — list of weak references to the object
 Methods inherited from runtimepy.mixins.async_command.AsyncCommandProcessingMixin:
 
 async process_command_queue(self) -> None — Process any outgoing command requests.
 Methods inherited from vcorelib.logging.LoggerMixin:
 
 log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType] — A simple wrapper.
 Methods inherited from runtimepy.mixins.psutil.PsutilMixin:
 
 init_psutil(self, env: runtimepy.channel.environment.ChannelEnvironment) -> None — Initialize psutil-based metrics.
 poll_psutil(self, weight: float) -> None — Poll psutil-based metrics.
 |  |