runtimepy.subprocess.program
index
/home/vkottler/src/vkottler/workspace/runtimepy/runtimepy/subprocess/program.py

A module implementing a peer program communication interface.

 
Modules
       
asyncio
logging
os
signal
sys

 
Classes
       
runtimepy.mixins.psutil.PsutilMixin(builtins.object)
PeerProgram(runtimepy.subprocess.interface.RuntimepyPeerInterface, runtimepy.mixins.psutil.PsutilMixin)
runtimepy.subprocess.interface.RuntimepyPeerInterface(runtimepy.message.interface.JsonMessageInterface, runtimepy.mixins.async_command.AsyncCommandProcessingMixin)
PeerProgram(runtimepy.subprocess.interface.RuntimepyPeerInterface, runtimepy.mixins.psutil.PsutilMixin)

 
class PeerProgram(runtimepy.subprocess.interface.RuntimepyPeerInterface, runtimepy.mixins.psutil.PsutilMixin)
    PeerProgram(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None
 
A communication interface for peer programs.
 
 
Method resolution order:
PeerProgram
runtimepy.subprocess.interface.RuntimepyPeerInterface
runtimepy.message.interface.JsonMessageInterface
runtimepy.mixins.async_command.AsyncCommandProcessingMixin
vcorelib.logging.LoggerMixin
abc.ABC
runtimepy.mixins.psutil.PsutilMixin
builtins.object

Methods defined here:
async cleanup(self) -> None
Runs when program 'running' context exits.
async heartbeat_task(self) -> None
Send a message heartbeat back and forth.
async io_task(self, buffer: <class 'BinaryIO'>) -> None
Run this peer program's main loop.
async main(self, argv: list[str]) -> None
Program entry.
async poll_handler(self) -> None
Handle a 'poll' message.
pre_environment_exchange(self) -> None
Perform early initialization tasks.
streaming_events(self) -> Iterator[NoneType]
Stream events to the stream output.
struct_pre_finalize(self) -> None
Configure struct before finalization.
write(self, data: bytes, addr: tuple[str, int] = None) -> None
Write data.

Class methods defined here:
async run(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], argv: list[str]) -> None from abc.ABCMeta
Run the program.
run_standard(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> tuple[_asyncio.Task[None], ~T] from abc.ABCMeta
Run this program using standard input and output.
running(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], argv: list[str]) -> AsyncIterator[tuple[_asyncio.Task[None], _asyncio.Task[None], ~T]] from abc.ABCMeta
Provide an interface for managed-context cleanup of the peer process.
singleton() -> ~T from abc.ABCMeta
Get a shared single instance of a protocol for this class.

Data and other attributes defined here:
__abstractmethods__ = frozenset()
__annotations__ = {'_singleton': typing.Optional[ForwardRef('PeerProgram')], 'got_eof': <class 'asyncio.locks.Event'>, 'json_output': <class 'typing.BinaryIO'>, 'stream_metrics': <class 'runtimepy.metrics.channel.ChannelMetrics'>, 'stream_output': <class 'typing.BinaryIO'>, 'struct_type': typing.Type[runtimepy.net.arbiter.info.RuntimeStruct]}
struct_type = <class 'runtimepy.net.arbiter.info.SampleStruct'>
A sample runtime structure.

Methods inherited from runtimepy.subprocess.interface.RuntimepyPeerInterface:
__init__(self, name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None
Initialize this instance.
async handle_command(self, args: argparse.Namespace, channel: Union[runtimepy.primitives.field.BitField, runtimepy.channel.Channel[runtimepy.primitives.int.Int8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Int64Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint8Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint16Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint32Primitive], runtimepy.channel.Channel[runtimepy.primitives.int.Uint64Primitive], runtimepy.channel.Channel[runtimepy.primitives.float.FloatPrimitive], runtimepy.channel.Channel[runtimepy.primitives.float.DoublePrimitive], runtimepy.channel.Channel[runtimepy.primitives.bool.BooleanPrimitive], NoneType]) -> None
Handle a command.
handle_log_message(self, message: dict[str, typing.Any]) -> None
Handle a log message.
handle_stderr(self, data: bytes) -> None
Forward stderr.
async handle_stdout(self, data: bytes) -> None
Handle messages from stdout.
poll_metrics(self, time_ns: int = None) -> None
Poll channels.
async share_config(self, data: dict[str, typing.Any]) -> None
Exchange configuration data.
async share_environment(self) -> None
Exchange channel environments.

Readonly properties inherited from runtimepy.subprocess.interface.RuntimepyPeerInterface:
peer_name
Get the name of the peer's environment.

Data and other attributes inherited from runtimepy.subprocess.interface.RuntimepyPeerInterface:
poll_period_s = 0.01

Methods inherited from runtimepy.message.interface.JsonMessageInterface:
basic_handler(self, key: str, handler: Callable[[dict[str, Any], dict[str, Any]], Awaitable[NoneType]] = <function loopback_handler at 0x7ffaa0c0a340>) -> None
Register a basic handler.
async channel_command(self, command: str, environment: str = 'default', addr: tuple[str, int] = None) -> runtimepy.channel.environment.command.result.CommandResult
Send a channel command to an endpoint.
async loopback(self, data: dict[str, typing.Any] = None, addr: tuple[str, int] = None, timeout: float = 3) -> bool
Perform a simple loopback test on this connection.
async process_json(self, data: dict[str, typing.Any], addr: tuple[str, int] = None) -> bool
Process a JSON message.
send_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec], addr: tuple[str, int] = None) -> None
Send a JSON message.
send_poll(self, loopback: int = 1) -> None
Send a poll message with a default loopback of 1, so that this instance
will also be polled.
stage_remote_log(self, msg: str, *args, level: int = 20) -> None
Log a message on the remote.
typed_handler(self, key: str, kind: type[~T], handler: Callable[[dict[str, Any], ~T], Awaitable[NoneType]]) -> None
Register a typed handler.
async wait_json(self, data: Union[dict[str, Any], vcorelib.dict.codec.JsonCodec] = None, addr: tuple[str, int] = None, timeout: float = 3) -> dict[str, typing.Any]
Send a JSON message and wait for a response.

Data descriptors inherited from runtimepy.message.interface.JsonMessageInterface:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Methods inherited from runtimepy.mixins.async_command.AsyncCommandProcessingMixin:
async process_command_queue(self) -> None
Process any outgoing command requests.

Methods inherited from vcorelib.logging.LoggerMixin:
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Methods inherited from runtimepy.mixins.psutil.PsutilMixin:
init_psutil(self, env: runtimepy.channel.environment.ChannelEnvironment) -> None
Initialize psutil-based metrics.
poll_psutil(self, weight: float) -> None
Poll psutil-based metrics.

 
Data
        ARBITER = <vcorelib.io.arbiter.DataArbiter object>
AsyncIterator = typing.AsyncIterator
Iterator = typing.Iterator
JsonObject = typing.Dict[str, typing.Union[str, int, float, b...[typing.Union[str, int, float, bool, NoneType]]]]
Optional = typing.Optional
PROGRAM = None
T = ~T
Type = typing.Type
__annotations__ = {'PROGRAM': typing.Optional[ForwardRef('PeerProgram')]}