runtimepy.net.arbiter.factory.task
index
/home/vkottler/src/vkottler/workspace/runtimepy/runtimepy/net/arbiter/factory/task.py

A module implementing task-factory registration.

 
Classes
       
runtimepy.net.arbiter.base.BaseConnectionArbiter(vcorelib.namespace.mixin.NamespaceMixin, vcorelib.logging.LoggerMixin, runtimepy.tui.mixin.TuiMixin)
TaskConnectionArbiter

 
class TaskConnectionArbiter(runtimepy.net.arbiter.base.BaseConnectionArbiter)
    TaskConnectionArbiter(manager: runtimepy.net.manager.ConnectionManager = None, stop_sig: asyncio.locks.Event = None, namespace: vcorelib.namespace.base.Namespace = None, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = None, app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]] = None, window: Optional[Any] = None) -> None
 
A class for managing task factories.
 
 
Method resolution order:
TaskConnectionArbiter
runtimepy.net.arbiter.base.BaseConnectionArbiter
vcorelib.namespace.mixin.NamespaceMixin
vcorelib.logging.LoggerMixin
runtimepy.tui.mixin.TuiMixin
builtins.object

Methods defined here:
factory_task(self, factory: str, name: str, period_s: float = None, **kwargs) -> bool
Register a periodic task from one of the registered task factories.
register_task_factory(self, factory: runtimepy.net.arbiter.task.TaskFactory[runtimepy.net.arbiter.task.ArbiterTask], *namespaces: str) -> bool
Attempt to register a periodic task factory.

Data and other attributes defined here:
__annotations__ = {}

Methods inherited from runtimepy.net.arbiter.base.BaseConnectionArbiter:
__init__(self, manager: runtimepy.net.manager.ConnectionManager = None, stop_sig: asyncio.locks.Event = None, namespace: vcorelib.namespace.base.Namespace = None, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = None, app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]] = None, window: Optional[Any] = None) -> None
Initialize this connection arbiter.
async app(self, app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None, check_connections: bool = True, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]] = None) -> int
Run the application alongside the connection manager and server tasks.
register_connection(self, connection: Union[runtimepy.net.connection.Connection, Awaitable[runtimepy.net.connection.Connection]], *names: str, delim: str = None) -> bool
Attempt to register a connection object.
run(self, app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None, eloop: asyncio.events.AbstractEventLoop = None, signals: Iterable[int] = None, check_connections: bool = True, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]] = None, enable_uvloop: bool = True) -> int
Run the application until the stop signal is set.

Methods inherited from vcorelib.namespace.mixin.NamespaceMixin:
child_namespace(self, *names: str, namespace: vcorelib.namespace.base.Namespace = None) -> vcorelib.namespace.base.Namespace
Obtain a child namespace.
names_pushed(self, *names: str, namespace: vcorelib.namespace.base.Namespace = None) -> Iterator[NoneType]
Apply some names to this object's namespace as a managed context.
namespace(self, name: str = None, delim: str = None, namespace: vcorelib.namespace.base.Namespace = None) -> str
Get a namespace string for this object.
namespace_search(self, *names: str, pattern: str = '.*', namespace: vcorelib.namespace.base.Namespace = None) -> Iterator[str]
Perform a search on the namespace.
namespace_suggest(self, data: str, delta: bool = True, namespace: vcorelib.namespace.base.Namespace = None) -> Optional[str]
Find the shortest name suggestion.
pop_name(self, name: str = None, namespace: vcorelib.namespace.base.Namespace = None) -> str
Pop the latest name off the stack.
push_name(self, name: str, namespace: vcorelib.namespace.base.Namespace = None) -> None
Push a name onto the stack.

Readonly properties inherited from vcorelib.namespace.mixin.NamespaceMixin:
ns
Get this instance's namespace.

Data descriptors inherited from vcorelib.namespace.mixin.NamespaceMixin:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Methods inherited from vcorelib.logging.LoggerMixin:
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Methods inherited from runtimepy.tui.mixin.TuiMixin:
async handle_char(self, char: int) -> bool
Handle character input.
init(self, window: Optional[Any]) -> bool
Initialize this interface's window.
tui_update(self) -> None
Re-draw the screen.
update_dimensions(self) -> Any
Handle an update to the window's dimensions.

Readonly properties inherited from runtimepy.tui.mixin.TuiMixin:
window
Get the window for this instance.

 
Data
        Factory = runtimepy.net.arbiter.task.TaskFactory[runtimepy.net.arbiter.task.ArbiterTask]