- runtimepy.mixins.psutil.PsutilMixin(builtins.object)
 - PeerProgram(runtimepy.subprocess.interface.RuntimepyPeerInterface, runtimepy.mixins.psutil.PsutilMixin)

- runtimepy.subprocess.interface.RuntimepyPeerInterface(runtimepy.message.interface.JsonMessageInterface, runtimepy.mixins.async_command.AsyncCommandProcessingMixin)
 - PeerProgram(runtimepy.subprocess.interface.RuntimepyPeerInterface, runtimepy.mixins.psutil.PsutilMixin)

class PeerProgram(runtimepy.subprocess.interface.RuntimepyPeerInterface, runtimepy.mixins.psutil.PsutilMixin)

PeerProgram(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None 
  
A communication interface for peer programs.

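The `config` annotation in the constructor signature is easier to read when written as nested type aliases. The sketch below is only an illustration: the alias names (`JsonPrimitive`, `JsonValue`, `JsonObject`), the `is_json_object` helper, and the example keys are assumptions made here and are not taken from this reference.

```python
from typing import Dict, List, Union

# Hypothetical alias names; only the shape mirrors the documented annotation.
JsonPrimitive = Union[str, int, float, bool, None]
JsonValue = Union[JsonPrimitive, Dict[str, JsonPrimitive], List[JsonPrimitive]]
JsonObject = Dict[str, JsonValue]

_PRIMITIVES = (str, int, float, bool, type(None))


def is_json_object(data: object) -> bool:
    """Check that data fits the annotation: one level of nesting at most."""
    if not isinstance(data, dict):
        return False
    for key, value in data.items():
        if not isinstance(key, str):
            return False
        if isinstance(value, dict):
            # Nested dictionaries may only hold primitive values.
            if not all(
                isinstance(k, str) and isinstance(v, _PRIMITIVES)
                for k, v in value.items()
            ):
                return False
        elif isinstance(value, list):
            # Lists may only hold primitive values.
            if not all(isinstance(v, _PRIMITIVES) for v in value):
                return False
        elif not isinstance(value, _PRIMITIVES):
            return False
    return True


# Example keys are made up for illustration.
EXAMPLE_CONFIG: JsonObject = {
    "period": 0.01,
    "tags": ["a", "b"],
    "limits": {"max": 10},
}
```

Read this way, the annotation allows primitives, flat dictionaries of primitives, and flat lists of primitives as values, but nothing nested more deeply.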
Method resolution order: 
- PeerProgram
- runtimepy.subprocess.interface.RuntimepyPeerInterface
- runtimepy.message.interface.JsonMessageInterface
- runtimepy.mixins.async_command.AsyncCommandProcessingMixin
- vcorelib.logging.LoggerMixin
- abc.ABC
- runtimepy.mixins.psutil.PsutilMixin
- builtins.object
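The ordering above follows from Python's C3 linearization. A minimal sketch with empty stand-in classes reproduces it. The bases of `RuntimepyPeerInterface` and `PeerProgram` are the documented ones; the bases given to `JsonMessageInterface` and `AsyncCommandProcessingMixin` are assumptions chosen to be consistent with the listed order, and `mro_names` is a helper invented for this example.

```python
import abc


# Empty stand-ins that reuse the documented class names.
class LoggerMixin:
    pass


class PsutilMixin:
    pass


# Assumed bases: this reference does not state them directly.
class JsonMessageInterface(LoggerMixin, abc.ABC):
    pass


# Assumed base: this reference does not state it directly.
class AsyncCommandProcessingMixin(LoggerMixin):
    pass


# Bases as documented in the class tree.
class RuntimepyPeerInterface(JsonMessageInterface, AsyncCommandProcessingMixin):
    pass


# Bases as documented in the class tree.
class PeerProgram(RuntimepyPeerInterface, PsutilMixin):
    pass


def mro_names(cls: type) -> list[str]:
    """Return the class names in method resolution order."""
    return [klass.__name__ for klass in cls.__mro__]
```

Because `PsutilMixin` is the last base of `PeerProgram` and shares no ancestors with the first base other than `object`, it lands after the whole `RuntimepyPeerInterface` chain.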
 
 
 
Methods defined here: 
- async cleanup(self) -> None
 - Runs when program 'running' context exits.
  
- async heartbeat_task(self) -> None
 - Send a message heartbeat back and forth.
  
- async io_task(self, buffer: BinaryIO) -> None
 - Run this peer program's main loop.
  
- async main(self, argv: list[str]) -> None
 - Program entry.
  
- async poll_handler(self) -> None
 - Handle a 'poll' message.
  
- pre_environment_exchange(self) -> None
 - Perform early initialization tasks.
  
- streaming_events(self) -> Iterator[NoneType]
 - Stream events to the stream output.
  
- struct_pre_finalize(self) -> None
 - Configure struct before finalization.
  
- write(self, data: bytes, addr: tuple[str, int] = None) -> None
 - Write data.
  
 
Class methods defined here: 
- async run(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], argv: list[str]) -> None from abc.ABCMeta
 - Run the program.
  
- run_standard(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> tuple[_asyncio.Task[None], ~T] from abc.ABCMeta
 - Run this program using standard input and output.
  
- running(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], argv: list[str]) -> AsyncIterator[tuple[_asyncio.Task[None], _asyncio.Task[None], ~T]] from abc.ABCMeta
 - Provide an interface for managed-context cleanup of the peer process.
  
- singleton() -> ~T from abc.ABCMeta
 - Get a shared single instance of a protocol for this class.
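
The `AsyncIterator` return annotation on `running`, together with the `cleanup` docstring ("Runs when program 'running' context exits"), suggests an async context manager. The sketch below shows that general pattern with a stand-in class. `SketchProgram`, its attributes, and `demo` are hypothetical, the yielded tuple is simplified to one task plus the instance, and nothing here is runtimepy's actual implementation.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class SketchProgram:
    """Hypothetical stand-in; not runtimepy's PeerProgram."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.cleaned_up = False

    async def main(self, argv: list[str]) -> None:
        """Program entry (does nothing useful in this sketch)."""
        await asyncio.sleep(0)

    async def cleanup(self) -> None:
        """Runs when the 'running' context exits."""
        self.cleaned_up = True

    @classmethod
    @asynccontextmanager
    async def running(
        cls, name: str, argv: list[str]
    ) -> AsyncIterator[tuple[asyncio.Task, "SketchProgram"]]:
        """Start main() as a task and guarantee cleanup on exit."""
        inst = cls(name)
        task = asyncio.create_task(inst.main(argv))
        try:
            yield task, inst
        finally:
            # Wait for the program task, then run the cleanup hook.
            await task
            await inst.cleanup()


async def demo() -> tuple[bool, bool]:
    """Enter and leave the managed context once."""
    async with SketchProgram.running("peer", []) as (task, inst):
        assert not inst.cleaned_up
    return task.done(), inst.cleaned_up
```

Running `asyncio.run(demo())` leaves the task finished and the instance cleaned up, which is the guarantee a managed context is meant to give.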
  
 
Data and other attributes defined here: 
- __abstractmethods__ = frozenset()
  
- __annotations__ = {'_singleton': typing.Optional[ForwardRef('PeerProgram')], 'got_eof': <class 'asyncio.locks.Event'>, 'json_output': <class 'typing.BinaryIO'>, 'stream_metrics': <class 'runtimepy.metrics.channel.ChannelMetrics'>, 'stream_output': <class 'typing.BinaryIO'>, 'struct_type': typing.Type[runtimepy.net.arbiter.info.RuntimeStruct]}
  
- struct_type = <class 'runtimepy.net.arbiter.info.SampleStruct'>
 - A sample runtime structure.
  
 
Methods inherited from runtimepy.subprocess.interface.RuntimepyPeerInterface: 
- __init__(self, name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None
 - Initialize this instance.
  
- async handle_command(self, args: argparse.Namespace, channel: Union[runtimepy.primitives.field.BitField, runtimepy.channel.Channel[runtimepy.primitives.int.Int8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int64Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint64Primitive], runtimepy.channel.Channel[runtimepy.primitives.float.FloatPrimitive], runtimepy.channel.Channel[runtimepy.primitives.float.DoublePrimitive], runtimepy.channel.Channel[runtimepy.primitives.bool.BooleanPrimitive], NoneType]) -> None
 - Handle a command.
  
- handle_log_message(self, message: dict[str, typing.Any]) -> None
 - Handle a log message.
  
- handle_stderr(self, data: bytes) -> None
 - Forward stderr.
  
- async handle_stdout(self, data: bytes) -> None
 - Handle messages from stdout.
  
- poll_metrics(self, time_ns: int = None) -> None
 - Poll channels.
  
- async share_config(self, data: dict[str, typing.Any]) -> None
 - Exchange configuration data.
  
- async share_environment(self) -> None
 - Exchange channel environments.
  
 
Readonly properties inherited from runtimepy.subprocess.interface.RuntimepyPeerInterface: 
- peer_name
 - Get the name of the peer's environment.
  
 
Data and other attributes inherited from runtimepy.subprocess.interface.RuntimepyPeerInterface: 
- poll_period_s = 0.01
  
 
Methods inherited from runtimepy.message.interface.JsonMessageInterface: 
- basic_handler(self, key: str, handler: Callable[[dict[str, Any], dict[str, Any]], Awaitable[NoneType]] = loopback_handler) -> None
 - Register a basic handler.
  
- async channel_command(self, command: str, environment: str = 'default', addr: tuple[str, int] = None) -> runtimepy.channel.environment.command.result.CommandResult
 - Send a channel command to an endpoint.
  
- async loopback(self, data: dict[str, typing.Any] = None, addr: tuple[str, int] = None, timeout: float = 3) -> bool
 - Perform a simple loopback test on this connection.
  
- async process_json(self, data: dict[str, typing.Any], addr: tuple[str, int] = None) -> bool
 - Process a JSON message.
  
- send_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec], addr: tuple[str, int] = None) -> None
 - Send a JSON message.
  
- send_poll(self, loopback: int = 1) -> None
 - Send a poll message with a default loopback of 1, so that this instance will also be polled.
  
- stage_remote_log(self, msg: str, *args, level: int = 20) -> None
 - Log a message on the remote.
  
- typed_handler(self, key: str, kind: type[~T], handler: Callable[[dict[str, Any], ~T], Awaitable[NoneType]]) -> None
 - Register a typed handler.
  
- async wait_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec] = None, addr: tuple[str, int] = None, timeout: float = 3) -> dict[str, typing.Any]
 - Send a JSON message and wait for a response.
  
 
Data descriptors inherited from runtimepy.message.interface.JsonMessageInterface: 
- __dict__
 - dictionary for instance variables
  
- __weakref__
 - list of weak references to the object
  
 
Methods inherited from runtimepy.mixins.async_command.AsyncCommandProcessingMixin: 
- async process_command_queue(self) -> None
 - Process any outgoing command requests.
  
 
Methods inherited from vcorelib.logging.LoggerMixin: 
- log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
 - A simple wrapper.
  
 
Methods inherited from runtimepy.mixins.psutil.PsutilMixin: 
- init_psutil(self, env: runtimepy.channel.environment.ChannelEnvironment) -> None
 - Initialize psutil-based metrics.
  
- poll_psutil(self, weight: float) -> None
 - Poll psutil-based metrics.
  