runtimepy.task.basic package#

Submodules#

runtimepy.task.basic.manager module#

A module implementing a periodic-task manager.

class runtimepy.task.basic.manager.PeriodicTaskManager[source]#

Bases: Generic[T]

A class for managing periodic tasks as a single group.

register(task: T, period_s: float = None) bool[source]#

Register a periodic task.

running(stop_sig: Event = None) AsyncIterator[None][source]#

Run tasks as an async context.

async start(stop_sig: Event = None) None[source]#

Ensure tasks are started.

async stop() None[source]#

Ensure tasks are stopped.

property tasks: Iterator[T]#

Iterate over tasks.

runtimepy.task.basic.periodic module#

A module implementing a basic periodic task.

class runtimepy.task.basic.periodic.PeriodicTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None)[source]#

Bases: LoggerMixinLevelControl, ChannelEnvironmentMixin, ABC

A class implementing a simple periodic-task interface.

auto_finalize = True#
disable() bool[source]#

Disable this task, return whether or not any action was taken.

abstract async dispatch() bool[source]#

Dispatch an iteration of this task.

env: ChannelEnvironment#
logger: LoggerType#
async run(period_s: float = None, stop_sig: Event = None) None[source]#

Run this task by executing the dispatch method at the specified period until a dispatch iteration fails or the task is otherwise disabled.

set_period(period_s: float = None) bool[source]#

Attempt to set a new period for this task.

async stop() bool[source]#

Wait for this task to stop running (if it is).

async stop_extra() None[source]#

Extra actions to perform when this task is stopping.

async task(period_s: float = None, stop_sig: Event = None) Task[None][source]#

Create an event-loop task for this periodic.

Module contents#

A module implementing a simple periodic-task interface.

class runtimepy.task.basic.PeriodicTask(name: str, average_depth: int = 10, metrics: PeriodicTaskMetrics = None, period_s: float = 1.0, env: ChannelEnvironment = None)[source]#

Bases: LoggerMixinLevelControl, ChannelEnvironmentMixin, ABC

A class implementing a simple periodic-task interface.

auto_finalize = True#
disable() bool[source]#

Disable this task, return whether or not any action was taken.

abstract async dispatch() bool[source]#

Dispatch an iteration of this task.

env: ChannelEnvironment#
logger: LoggerType#
async run(period_s: float = None, stop_sig: Event = None) None[source]#

Run this task by executing the dispatch method at the specified period until a dispatch iteration fails or the task is otherwise disabled.

set_period(period_s: float = None) bool[source]#

Attempt to set a new period for this task.

async stop() bool[source]#

Wait for this task to stop running (if it is).

async stop_extra() None[source]#

Extra actions to perform when this task is stopping.

async task(period_s: float = None, stop_sig: Event = None) Task[None][source]#

Create an event-loop task for this periodic.

class runtimepy.task.basic.PeriodicTaskManager[source]#

Bases: Generic[T]

A class for managing periodic tasks as a single group.

register(task: T, period_s: float = None) bool[source]#

Register a periodic task.

running(stop_sig: Event = None) AsyncIterator[None][source]#

Run tasks as an async context.

async start(stop_sig: Event = None) None[source]#

Ensure tasks are started.

async stop() None[source]#

Ensure tasks are stopped.

property tasks: Iterator[T]#

Iterate over tasks.