Source code for runtimepy.task.asynchronous

"""
A module implementing an asynchronous task interface.
"""

# built-in
import asyncio as _asyncio
from logging import getLogger as _getLogger

# third-party
from vcorelib.logging import LoggerMixin as _LoggerMixin
from vcorelib.math import MovingAverage as _MovingAverage
from vcorelib.math import RateTracker as _RateTracker
from vcorelib.math import rate_str as _rate_str
from vcorelib.math import to_nanos

# internal
from runtimepy import METRICS_NAME
from runtimepy.channel.environment import (
    ChannelEnvironment as _ChannelEnvironment,
)


[docs] class AsyncTask(_LoggerMixin): """A basic implementation of a periodic task.""" def __init__( self, name: str, period_s: float, env: _ChannelEnvironment, average_depth: int = 10, max_iterations: int = 0, ) -> None: """Initialize this asynchronous task.""" super().__init__(logger=_getLogger(name)) self.name = name self.dispatch_rate = _RateTracker(depth=average_depth) self.dispatch_time = _MovingAverage(depth=average_depth) with env.names_pushed(name): # Track whether or not this task is currently enabled. self.enabled = env.bool_channel("enabled", commandable=True)[0] # Keep track of the time between task iterations in seconds. self.period_s = env.float_channel("period_s", commandable=True) self.period_s.raw.value = period_s # Allow commanding a maximum number of iterations. self.max_iterations = env.int_channel( "max_iterations", commandable=True )[0] self.max_iterations.raw.value = max_iterations with env.names_pushed(METRICS_NAME): # Track the number of times this task has been dispatched. self.dispatches = env.int_channel("dispatches", kind="uint8")[ 0 ] # Track metrics for how long this task takes to execute. self.rate_hz = env.float_channel("rate_hz") self.average_s = env.float_channel("average_s") self.max_s = env.float_channel("max_s") self.min_s = env.float_channel("min_s") # Initialize task-specific channels. self.init_channels(env) self.env = env
[docs] def init_channels(self, env: _ChannelEnvironment) -> None: """Initialize task-specific channels."""
[docs] async def init(self, *_, **__) -> bool: """Initialize this task.""" return True
[docs] async def dispatch(self, *_, **__) -> bool: """Dispatch this task.""" return True
[docs] def reset_metrics(self) -> None: """Reset metrics channel values.""" self.dispatch_rate.reset() self.dispatch_time.reset() self.dispatches.raw.value = 0 self.rate_hz.raw.value = 0.0 self.average_s.raw.value = 0.0 self.max_s.raw.value = 0.0 self.min_s.raw.value = 0.0
[docs] def log_metrics(self) -> None: """Log information related to metrics channels.""" self.logger.info( "'%s' dispatches: %d.", self.name, self.dispatches.raw.value ) self.logger.info( "'%s' rate: %0.6f Hz.", self.name, self.rate_hz.raw.value ) self.logger.info( "'%s' average time: %0.6fs.", self.name, self.average_s.raw.value ) self.logger.info( "'%s' max time: %0.6fs.", self.name, self.max_s.raw.value ) self.logger.info( "'%s' min time: %0.6fs.", self.name, self.min_s.raw.value )
@property def rate_str(self) -> str: """Get this periodic's rate as a string.""" return _rate_str(self.period_s.raw.value)
[docs] def enable(self) -> None: """Enable this task.""" self.enabled.raw.value = True
[docs] def disable(self) -> None: """Disable this task.""" self.enabled.raw.value = False
[docs] async def run( self, *args, stop_sig: _asyncio.Event = None, **kwargs ) -> None: """Run this task while it's enabled.""" eloop = _asyncio.get_running_loop() self.logger.info("Starting task '%s' at %s.", self.name, self.rate_str) self.reset_metrics() # Always re-enable the task if this is called. self.enable() # Run this task's initialization. self.enabled.raw.value &= await _asyncio.shield( self.init(*args, **kwargs) ) while self.enabled: # Keep track of the rate that this task is running at. start = eloop.time() self.rate_hz.raw.value = self.dispatch_rate(to_nanos(start)) self.enabled.raw.value &= await _asyncio.shield( self.dispatch(*args, **kwargs) ) exec_time = eloop.time() - start # Update runtime metrics. self.dispatches.raw.value += 1 self.average_s.raw.value = self.dispatch_time(exec_time) self.max_s.raw.value = self.dispatch_time.max self.min_s.raw.value = self.dispatch_time.min # Check if we've performed the maximum specified number of # dispatches. if ( self.max_iterations.raw.value > 0 and self.dispatches.raw.value >= self.max_iterations.raw.value ) or (stop_sig is not None and stop_sig.is_set()): self.disable() if self.enabled: try: await _asyncio.sleep( max(self.period_s.raw.value - exec_time, 0) ) except _asyncio.CancelledError: self.logger.warning("Task '%s' was cancelled!", self.name) self.disable() self.logger.info("Task '%s' stopped.", self.name) self.log_metrics()