runtimepy.subprocess
/home/vkottler/src/vkottler/workspace/runtimepy/runtimepy/subprocess/__init__.py

A module implementing a subprocess management interface.

 
Package Contents
interface
peer
program
protocol

 
Functions
async close_protocol(protocol: runtimepy.subprocess.protocol.RuntimepySubprocessProtocol, poll_rate: float = 2.0, stdout: asyncio.queues.Queue[bytes] = None, stderr: asyncio.queues.Queue[bytes] = None, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = <Logger runtimepy.subprocess (WARNING)>) -> None
Shut down a protocol instance.
get_running_loop()
Return the running event loop.  Raise a RuntimeError if there is none.
 
This function is thread-specific.
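A short illustration of the behaviour described above, using only the standard library. The helper names `loop_or_none` and `inside_coroutine` are made up for this example.

```python
import asyncio


def loop_or_none():
    """Return the running loop, or None when called outside of one."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in the current thread.
        return None


async def inside_coroutine() -> bool:
    """A coroutine always executes with a running loop available."""
    return loop_or_none() is not None
```

Called from plain synchronous code, `loop_or_none()` returns None; called via `asyncio.run(inside_coroutine())`, a loop is found.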
async shutdown_protocol(protocol: runtimepy.subprocess.protocol.RuntimepySubprocessProtocol, poll_rate: float = 2.0, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = <Logger runtimepy.subprocess (WARNING)>) -> None
Shut down a subprocess protocol instance.
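The listing gives only the signature, so how the shutdown proceeds internally is not shown here. As a rough sketch of one common pattern (request termination, poll for exit at a fixed interval, force-kill as a last resort), the following uses plain asyncio rather than runtimepy. The names `stop_process`, `poll_period` and `max_polls` are hypothetical, and treating the poll setting as a sleep interval in seconds is an assumption, not something the source states.

```python
import asyncio
import sys


async def stop_process(
    proc: asyncio.subprocess.Process,
    poll_period: float = 0.05,  # assumed: seconds between exit checks
    max_polls: int = 40,
) -> int:
    """Terminate a process, poll for exit, and kill it if it lingers."""
    if proc.returncode is None:
        proc.terminate()

    # Check for exit at a fixed interval instead of blocking indefinitely.
    for _ in range(max_polls):
        if proc.returncode is not None:
            break
        await asyncio.sleep(poll_period)

    # Still running after the polling budget: escalate.
    if proc.returncode is None:
        proc.kill()
    return await proc.wait()


async def demo() -> int:
    """Start a long-running child and stop it early."""
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", "import time; time.sleep(30)"
    )
    return await stop_process(proc)
```

Running `asyncio.run(demo())` should return a non-zero exit status, since the child is stopped before it finishes on its own.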
spawn_exec(*args: str, stdout: asyncio.queues.Queue[bytes] = None, stderr: asyncio.queues.Queue[bytes] = None, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = <Logger runtimepy.subprocess (WARNING)>, **kwargs) -> AsyncIterator[runtimepy.subprocess.protocol.RuntimepySubprocessProtocol]
Create a subprocess.
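The `AsyncIterator` return type together with the `stdout` and `stderr` queue parameters is consistent with an async context manager that yields a protocol and forwards child output into queues, though the listing does not confirm this. The sketch below shows that pattern with standard-library asyncio only. `QueueProtocol` and `spawn` are hypothetical names, not runtimepy's classes or functions.

```python
import asyncio
import sys
from contextlib import asynccontextmanager
from typing import AsyncIterator, Optional


class QueueProtocol(asyncio.SubprocessProtocol):
    """Hypothetical protocol that forwards child output into queues."""

    def __init__(
        self,
        stdout: Optional[asyncio.Queue] = None,
        stderr: Optional[asyncio.Queue] = None,
    ) -> None:
        # File descriptor 1 is the child's stdout, 2 is its stderr.
        self.queues = {1: stdout, 2: stderr}
        self.exited = asyncio.get_running_loop().create_future()

    def pipe_data_received(self, fd: int, data: bytes) -> None:
        queue = self.queues.get(fd)
        if queue is not None:
            queue.put_nowait(data)

    def process_exited(self) -> None:
        if not self.exited.done():
            self.exited.set_result(None)


@asynccontextmanager
async def spawn(
    *args: str,
    stdout: Optional[asyncio.Queue] = None,
    stderr: Optional[asyncio.Queue] = None,
) -> AsyncIterator[QueueProtocol]:
    """Start a program and yield its protocol; clean up on exit."""
    loop = asyncio.get_running_loop()
    transport, protocol = await loop.subprocess_exec(
        lambda: QueueProtocol(stdout, stderr), *args
    )
    try:
        yield protocol
    finally:
        # Closes the pipes and kills the child if it is still running.
        transport.close()


async def demo() -> bytes:
    """Run a child that prints one line and return what it wrote."""
    out: asyncio.Queue = asyncio.Queue()
    async with spawn(
        sys.executable, "-c", "print('hello')", stdout=out
    ) as protocol:
        data = await out.get()
        await protocol.exited
    return data
```

Wrapping the transport in a context manager keeps cleanup in one place: the child is closed whether the body completes normally or raises.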
spawn_shell(cmd: str, stdout: asyncio.queues.Queue[bytes] = None, stderr: asyncio.queues.Queue[bytes] = None, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = <Logger runtimepy.subprocess (WARNING)>, **kwargs) -> AsyncIterator[runtimepy.subprocess.protocol.RuntimepySubprocessProtocol]
Create a shell subprocess.
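The signatures suggest the usual split between the two spawn functions: `spawn_exec` takes the program and its arguments as separate strings (`*args: str`), while `spawn_shell` takes a single command string (`cmd: str`) that a shell interprets. The sketch below shows the shell variant with standard-library asyncio only; `run_shell` is a hypothetical helper, not part of runtimepy.

```python
import asyncio


async def run_shell(cmd: str) -> bytes:
    """Run `cmd` through the system shell and return its stdout."""
    proc = await asyncio.create_subprocess_shell(
        cmd, stdout=asyncio.subprocess.PIPE
    )
    out, _ = await proc.communicate()
    return out


async def demo() -> bytes:
    # The `&&` operator is handled by the shell, not by the programs,
    # which is why this must be a single command string.
    return await run_shell("echo hello && echo world")
```

Because the shell parses the whole string, untrusted input should not be interpolated into `cmd`; the argument-list form avoids that class of problem.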

 
Data
AsyncIterator = typing.AsyncIterator
LOG = <Logger runtimepy.subprocess (WARNING)>
LoggerType = typing.Union[logging.Logger, logging.LoggerAdapter[typing.Any]]
POLL_RATE = 2.0
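The `LoggerType` alias lets the functions above accept either a plain logger or a logger adapter. A small sketch of why that is convenient follows. The alias is re-declared locally (with the adapter left unsubscripted for compatibility with older Python versions), and `ListHandler`, `PrefixAdapter` and `report` are hypothetical names for illustration.

```python
import logging
from typing import Union

# Local stand-in for the alias shown in the listing above.
LoggerType = Union[logging.Logger, logging.LoggerAdapter]


class ListHandler(logging.Handler):
    """Collect formatted messages in a list for inspection."""

    def __init__(self) -> None:
        super().__init__()
        self.messages = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


class PrefixAdapter(logging.LoggerAdapter):
    """Prefix each message with a label taken from `extra`."""

    def process(self, msg, kwargs):
        return f"[{self.extra['label']}] {msg}", kwargs


def report(logger: LoggerType, message: str) -> None:
    """Works the same for a Logger and a LoggerAdapter."""
    logger.warning(message)


handler = ListHandler()
base = logging.getLogger("example.subprocess")
base.addHandler(handler)
base.propagate = False  # keep the example's output out of the root logger

report(base, "plain")
report(PrefixAdapter(base, {"label": "child"}), "wrapped")
```

After this runs, `handler.messages` holds the plain message followed by the adapter-prefixed one.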