Source code for runtimepy.task.basic.manager
"""
A module implementing a periodic-task manager.
"""
# built-in
import asyncio as _asyncio
from contextlib import asynccontextmanager as _asynccontextmanager
from typing import AsyncIterator as _AsyncIterator
from typing import Generic as _Generic
from typing import Iterator as _Iterator
from typing import TypeVar as _TypeVar
# internal
from runtimepy.task.basic.periodic import PeriodicTask as _PeriodicTask
T = _TypeVar("T", bound=_PeriodicTask)
[docs]
class PeriodicTaskManager(_Generic[T]):
"""A class for managing periodic tasks as a single group."""
def __init__(self) -> None:
"""Initialize this instance."""
self._tasks: dict[str, T] = {}
[docs]
def register(self, task: T, period_s: float = None) -> bool:
"""Register a periodic task."""
result = task.name not in self._tasks
if result:
self._tasks[task.name] = task
task.set_period(period_s=period_s)
return result
@property
def tasks(self) -> _Iterator[T]:
"""Iterate over tasks."""
yield from self._tasks.values()
def __getitem__(self, name: str) -> T:
"""Get a task by name."""
return self._tasks[name]
[docs]
async def start(self, stop_sig: _asyncio.Event = None) -> None:
"""Ensure tasks are started."""
await _asyncio.gather(
*(x.task(stop_sig=stop_sig) for x in self._tasks.values())
)
[docs]
async def stop(self) -> None:
"""Ensure tasks are stopped."""
await _asyncio.gather(*(x.stop() for x in self._tasks.values()))
[docs]
@_asynccontextmanager
async def running(
self, stop_sig: _asyncio.Event = None
) -> _AsyncIterator[None]:
"""Run tasks as an async context."""
await self.start(stop_sig=stop_sig)
try:
yield
finally:
await self.stop()