runtimepy.mixins.async_command
/home/vkottler/src/vkottler/workspace/runtimepy/runtimepy/mixins/async_command.py

A module implementing an interface for classes with channel-command processors
that need to handle commands asynchronously.

 
Modules
       
asyncio

 
Classes
       
abc.ABC(builtins.object)
    AsyncCommandProcessingMixin(vcorelib.logging.LoggerMixin, abc.ABC)
vcorelib.logging.LoggerMixin(builtins.object)
    AsyncCommandProcessingMixin(vcorelib.logging.LoggerMixin, abc.ABC)

 
class AsyncCommandProcessingMixin(vcorelib.logging.LoggerMixin, abc.ABC)
    AsyncCommandProcessingMixin(logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = None, logger_name: str = None) -> None
 
A class mixin for handling asynchronous commands.
 
 
Method resolution order:
AsyncCommandProcessingMixin
vcorelib.logging.LoggerMixin
abc.ABC
builtins.object

Methods defined here:
async handle_command(
    self,
    args: argparse.Namespace,
    channel: Union[
        runtimepy.primitives.field.BitField,
        runtimepy.channel.Channel[runtimepy.primitives.int.Int8Primitive],
        runtimepy.channel.Channel[runtimepy.primitives.int.Int16Primitive],
        runtimepy.channel.Channel[runtimepy.primitives.int.Int32Primitive],
        runtimepy.channel.Channel[runtimepy.primitives.int.Int64Primitive],
        runtimepy.channel.Channel[runtimepy.primitives.int.Uint8Primitive],
        runtimepy.channel.Channel[runtimepy.primitives.int.Uint16Primitive],
        runtimepy.channel.Channel[runtimepy.primitives.int.Uint32Primitive],
        runtimepy.channel.Channel[runtimepy.primitives.int.Uint64Primitive],
        runtimepy.channel.Channel[runtimepy.primitives.float.FloatPrimitive],
        runtimepy.channel.Channel[runtimepy.primitives.float.DoublePrimitive],
        runtimepy.channel.Channel[runtimepy.primitives.bool.BooleanPrimitive],
        NoneType
    ]
) -> None
Handle a command.
async process_command_queue(self) -> None
Process any outgoing command requests.
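
A minimal sketch of how these two methods might fit together. It uses a stand-in class rather than runtimepy itself, so it runs without the package installed. The names QueueMixinSketch, Recorder, Params and demo are hypothetical, the channel argument is simplified to an optional string, and the draining loop in process_command_queue is an assumption about the behavior, not the library's implementation.

```python
import argparse
import asyncio
from abc import ABC, abstractmethod
from typing import Optional

# Simplified stand-in for ChannelCommandParams: the real alias pairs a
# Namespace with an optional BitField or Channel, not a string.
Params = tuple[argparse.Namespace, Optional[str]]


class QueueMixinSketch(ABC):
    """Hypothetical stand-in for AsyncCommandProcessingMixin."""

    def __init__(self) -> None:
        # Mirrors the 'outgoing_commands' annotation listed for the class.
        self.outgoing_commands: asyncio.Queue[Params] = asyncio.Queue()

    @abstractmethod
    async def handle_command(
        self, args: argparse.Namespace, channel: Optional[str]
    ) -> None:
        """Handle a command."""

    async def process_command_queue(self) -> None:
        """Drain queued commands in FIFO order (assumed behavior)."""
        while not self.outgoing_commands.empty():
            args, channel = self.outgoing_commands.get_nowait()
            await self.handle_command(args, channel)


class Recorder(QueueMixinSketch):
    """Concrete subclass that records every command it handles."""

    def __init__(self) -> None:
        super().__init__()
        self.seen: list = []

    async def handle_command(
        self, args: argparse.Namespace, channel: Optional[str]
    ) -> None:
        self.seen.append((args.command, channel))


async def demo() -> list:
    rec = Recorder()
    # Enqueue two requests, one with a channel name and one without.
    rec.outgoing_commands.put_nowait(
        (argparse.Namespace(command="set"), "chan_a")
    )
    rec.outgoing_commands.put_nowait(
        (argparse.Namespace(command="toggle"), None)
    )
    await rec.process_command_queue()
    return rec.seen


result = asyncio.run(demo())
```

In this sketch the queue decouples whoever issues a command from the coroutine that services it: synchronous code can enqueue a request, and the awaited handler runs later when the queue is processed.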

Data and other attributes defined here:
__abstractmethods__ = frozenset({'handle_command'})
__annotations__ = {'command': <class 'runtimepy.channel.environment.command.processor.ChannelCommandProcessor'>, 'outgoing_commands': asyncio.queues.Queue[tuple[argparse.Namespace, t...py.primitives.bool.BooleanPrimitive], NoneType]]]}
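
Because __abstractmethods__ contains 'handle_command', a subclass presumably has to override that method before it can be instantiated. The sketch below shows the standard abc behavior with hypothetical stand-in classes (Base, Incomplete, Complete); it does not import runtimepy.

```python
from abc import ABC, abstractmethod


class Base(ABC):
    """Hypothetical stand-in with one abstract coroutine method."""

    @abstractmethod
    async def handle_command(self, args, channel) -> None:
        """Handle a command."""


class Incomplete(Base):
    """Does not override handle_command, so it stays abstract."""


class Complete(Base):
    """Overrides handle_command, so it can be instantiated."""

    async def handle_command(self, args, channel) -> None:
        return None


# Instantiating a class with unimplemented abstract methods raises TypeError.
try:
    Incomplete()
    instantiated = True
except TypeError:
    instantiated = False

complete = Complete()
```

Once every abstract method is overridden, the subclass's own __abstractmethods__ becomes an empty frozenset.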

Methods inherited from vcorelib.logging.LoggerMixin:
__init__(self, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = None, logger_name: str = None) -> None
Initialize this object with logging capabilities.
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Data descriptors inherited from vcorelib.logging.LoggerMixin:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

 
Data
ChannelCommandParams = tuple[argparse.Namespace, typing.Union[runtimepy...epy.primitives.bool.BooleanPrimitive], NoneType]]
FieldOrChannel = typing.Union[runtimepy.primitives.field.BitField...nnel[runtimepy.primitives.bool.BooleanPrimitive]]
Optional = typing.Optional