| |
- async close_protocol(protocol: runtimepy.subprocess.protocol.RuntimepySubprocessProtocol, poll_rate: float = 2.0, stdout: asyncio.queues.Queue[bytes] = None, stderr: asyncio.queues.Queue[bytes] = None, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = <Logger runtimepy.subprocess (WARNING)>) -> None
- Shutdown a protocol instance.
- get_running_loop()
- Return the running event loop. Raise a RuntimeError if there is none.
This function is thread-specific.
- async shutdown_protocol(protocol: runtimepy.subprocess.protocol.RuntimepySubprocessProtocol, poll_rate: float = 2.0, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = <Logger runtimepy.subprocess (WARNING)>) -> None
- Shutdown a subprocess protocol instance.
- spawn_exec(*args: str, stdout: asyncio.queues.Queue[bytes] = None, stderr: asyncio.queues.Queue[bytes] = None, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = <Logger runtimepy.subprocess (WARNING)>, **kwargs) -> AsyncIterator[runtimepy.subprocess.protocol.RuntimepySubprocessProtocol]
- Create a subprocess.
- spawn_shell(cmd: str, stdout: asyncio.queues.Queue[bytes] = None, stderr: asyncio.queues.Queue[bytes] = None, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = <Logger runtimepy.subprocess (WARNING)>, **kwargs) -> AsyncIterator[runtimepy.subprocess.protocol.RuntimepySubprocessProtocol]
- Create a shell subprocess.
|