vcorelib.task.subprocess package#
Submodules#
vcorelib.task.subprocess.run module#
A task definition for wrapping subprocess’s ‘run’ method.
- class vcorelib.task.subprocess.run.SubprocessExec(name: str, *args, execute: Callable[[Dict[str, Dict[str, Any]], Dict[str, Any]], Coroutine[Any, Any, bool]] = None, log: Logger = None, timer: Timer = None, target: Target = None, **kwargs)[source]#
Bases:
SubprocessLogMixin
A task wrapping a subprocess.
- async run(inbox: Dict[str, Dict[str, Any]], outbox: Dict[str, Any], *caller_args, args: str = '--version', program: str = '/home/vkottler/src/vkottler/workspace/vcorelib/venv3.12/bin/python', separator: str = '::', require_success: bool = True, **kwargs) bool [source]#
Create a subprocess, wait for it to exit and add results to the outbox.
- class vcorelib.task.subprocess.run.SubprocessExecStreamed(name: str, *args, execute: Callable[[Dict[str, Dict[str, Any]], Dict[str, Any]], Coroutine[Any, Any, bool]] = None, log: Logger = None, timer: Timer = None, target: Target = None, **kwargs)[source]#
Bases:
SubprocessLogMixin
A task wrapping a subprocess.
- async run(inbox: Dict[str, Dict[str, Any]], outbox: Dict[str, Any], *caller_args, args: str = '--version', program: str = '/home/vkottler/src/vkottler/workspace/vcorelib/venv3.12/bin/python', separator: str = '::', require_success: bool = True, **kwargs) bool [source]#
Create a subprocess, wait for it to exit and add results to the outbox.
- class vcorelib.task.subprocess.run.SubprocessLogMixin(name: str, *args, execute: Callable[[Dict[str, Dict[str, Any]], Dict[str, Any]], Coroutine[Any, Any, bool]] = None, log: Logger = None, timer: Timer = None, target: Target = None, **kwargs)[source]#
Bases:
Task
A class for creating asyncio subprocesses and logging what gets created.
- async exec(program: str, *caller_args: str, args: str = '', separator: str = '::', stdout: int = None, stderr: int = None, **kwargs) bool [source]#
Execute a command and return whether or not it succeeded.
- async shell(cmd: str, *caller_args: str, args: str = '', joiner: str = ' ', separator: str = '::', stdout: int = None, stderr: int = None, **kwargs) bool [source]#
Execute a shell command and return whether or not it succeeded.
- async shell_cmd_in_dir(path: Path, cmd: List[str], joiner: str = ' ', cd: str = 'cd', **kwargs) bool [source]#
Run a shell command in a specific directory.
- class vcorelib.task.subprocess.run.SubprocessShell(name: str, *args, execute: Callable[[Dict[str, Dict[str, Any]], Dict[str, Any]], Coroutine[Any, Any, bool]] = None, log: Logger = None, timer: Timer = None, target: Target = None, **kwargs)[source]#
Bases:
SubprocessLogMixin
A task wrapping a shell command.
- async run(inbox: Dict[str, Dict[str, Any]], outbox: Dict[str, Any], *caller_args, args: str = '', cmd: str = '/home/vkottler/src/vkottler/workspace/vcorelib/venv3.12/bin/python --version', joiner: str = ' ', separator: str = '::', require_success: bool = True, **kwargs) bool [source]#
Run a shell command, wait for it to exit and add results to the outbox.
- class vcorelib.task.subprocess.run.SubprocessShellStreamed(name: str, *args, execute: Callable[[Dict[str, Dict[str, Any]], Dict[str, Any]], Coroutine[Any, Any, bool]] = None, log: Logger = None, timer: Timer = None, target: Target = None, **kwargs)[source]#
Bases:
SubprocessLogMixin
A task wrapping a shell command.
- async run(inbox: Dict[str, Dict[str, Any]], outbox: Dict[str, Any], *caller_args, args: str = '', cmd: str = '/home/vkottler/src/vkottler/workspace/vcorelib/venv3.12/bin/python --version', joiner: str = ' ', separator: str = '::', require_success: bool = True, **kwargs) bool [source]#
Run a shell command, wait for it to exit and add results to the outbox.