runtimepy.net.arbiter
index
/home/vkottler/src/vkottler/workspace/runtimepy/runtimepy/net/arbiter/__init__.py

A module implementing a connection arbiter interface.

 
Package Contents
       
base
config (package)
factory (package)
housekeeping (package)
imports (package)
info
result
task
tcp (package)
udp
websocket

 
Classes
       
runtimepy.net.arbiter.config.ConfigConnectionArbiter(runtimepy.net.arbiter.imports.ImportConnectionArbiter)
ConnectionArbiter
runtimepy.task.basic.manager.PeriodicTaskManager(typing.Generic)
runtimepy.net.arbiter.task.ArbiterTaskManager
runtimepy.task.basic.periodic.PeriodicTask(runtimepy.mixins.logging.LoggerMixinLevelControl, runtimepy.mixins.environment.ChannelEnvironmentMixin, abc.ABC)
runtimepy.net.arbiter.task.ArbiterTask
typing.Generic(builtins.object)
runtimepy.net.arbiter.task.TaskFactory
vcorelib.logging.LoggerMixin(builtins.object)
runtimepy.net.arbiter.info.AppInfo

 
class AppInfo(vcorelib.logging.LoggerMixin)
    AppInfo(logger: Union[logging.Logger, logging.LoggerAdapter[Any]], stack: contextlib.AsyncExitStack, connections: MutableMapping[str, runtimepy.net.connection.Connection], conn_manager: runtimepy.net.manager.ConnectionManager, names: vcorelib.namespace.base.Namespace, stop: asyncio.locks.Event, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], tui: runtimepy.tui.mixin.TuiMixin, tasks: dict[str, runtimepy.task.basic.periodic.PeriodicTask], task_manager: runtimepy.task.basic.manager.PeriodicTaskManager[typing.Any], results: list[list[runtimepy.net.arbiter.result.AppResult]], structs: dict[str, runtimepy.struct.RuntimeStructBase], peers: dict[str, '_RuntimepyPeer']) -> None
 
References provided to network applications.
 
 
Method resolution order:
AppInfo
vcorelib.logging.LoggerMixin
builtins.object

Methods defined here:
__eq__(self, other)
Return self==value.
__init__(self, logger: Union[logging.Logger, logging.LoggerAdapter[Any]], stack: contextlib.AsyncExitStack, connections: MutableMapping[str, runtimepy.net.connection.Connection], conn_manager: runtimepy.net.manager.ConnectionManager, names: vcorelib.namespace.base.Namespace, stop: asyncio.locks.Event, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], tui: runtimepy.tui.mixin.TuiMixin, tasks: dict[str, runtimepy.task.basic.periodic.PeriodicTask], task_manager: runtimepy.task.basic.manager.PeriodicTaskManager[typing.Any], results: list[list[runtimepy.net.arbiter.result.AppResult]], structs: dict[str, runtimepy.struct.RuntimeStructBase], peers: dict[str, '_RuntimepyPeer']) -> None
Initialize this object with logging capabilities.
__repr__(self)
Return repr(self).
async all_finalized(self) -> None
Wait for all tasks and connections to be finalized.
config_param(self, key: str, default: ~Z, strict: bool = False) -> ~Z
Attempt to get a configuration parameter.
exceptions(self) -> Iterator[Exception]
Iterate over exceptions raised by the application.
original_config(self) -> dict[str, typing.Any]
Re-assemble a dictionary closer to the original configuration data
(than the .config attribute).
result(self, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = None) -> bool
Get the overall boolean result for the application.
search(self, *names: str, pattern: str = '.*', kind: type[~T] = <class 'runtimepy.net.connection.Connection'>) -> Iterator[~T]
Get all connections that match a naming convention, are of a
specific kind, or both.
search_tasks(self, kind: type[~V], pattern: str = '.*') -> Iterator[~V]
Search for tasks by type or pattern.
single(self, *names: str, pattern: str = '.*', kind: type[~T] = <class 'runtimepy.net.connection.Connection'>) -> ~T
Search for a single node.
with_new_logger(self, name: str) -> 'AppInfo'
Get a copy of this AppInfo instance, but with a new logger.

Readonly properties defined here:
raised_exception
Determine if the application raised any exception.

Data and other attributes defined here:
__annotations__ = {'config': typing.Dict[str, typing.Union[str, int, float, b...[typing.Union[str, int, float, bool, NoneType]]]], 'conn_manager': <class 'runtimepy.net.manager.ConnectionManager'>, 'connections': typing.MutableMapping[str, runtimepy.net.connection.Connection], 'logger': typing.Union[logging.Logger, logging.LoggerAdapter[typing.Any]], 'names': <class 'vcorelib.namespace.base.Namespace'>, 'peers': dict[str, '_RuntimepyPeer'], 'results': list[list[runtimepy.net.arbiter.result.AppResult]], 'stack': <class 'contextlib.AsyncExitStack'>, 'stop': <class 'asyncio.locks.Event'>, 'structs': dict[str, runtimepy.struct.RuntimeStructBase], ...}
__dataclass_fields__ = {'config': Field(name='config',type=typing.Dict[str, typing...appingproxy({}),kw_only=False,_field_type=_FIELD), 'conn_manager': Field(name='conn_manager',type=<class 'runtimepy...appingproxy({}),kw_only=False,_field_type=_FIELD), 'connections': Field(name='connections',type=typing.MutableMapp...appingproxy({}),kw_only=False,_field_type=_FIELD), 'logger': Field(name='logger',type=typing.Union[logging.Lo...appingproxy({}),kw_only=False,_field_type=_FIELD), 'names': Field(name='names',type=<class 'vcorelib.namespa...appingproxy({}),kw_only=False,_field_type=_FIELD), 'peers': Field(name='peers',type=dict[str, '_RuntimepyPee...appingproxy({}),kw_only=False,_field_type=_FIELD), 'results': Field(name='results',type=list[list[runtimepy.ne...appingproxy({}),kw_only=False,_field_type=_FIELD), 'stack': Field(name='stack',type=<class 'contextlib.Async...appingproxy({}),kw_only=False,_field_type=_FIELD), 'stop': Field(name='stop',type=<class 'asyncio.locks.Eve...appingproxy({}),kw_only=False,_field_type=_FIELD), 'structs': Field(name='structs',type=dict[str, runtimepy.st...appingproxy({}),kw_only=False,_field_type=_FIELD), ...}
__dataclass_params__ = _DataclassParams(init=True,repr=True,eq=True,ord...rue,kw_only=False,slots=False,weakref_slot=False)
__hash__ = None
__match_args__ = ('logger', 'stack', 'connections', 'conn_manager', 'names', 'stop', 'config', 'tui', 'tasks', 'task_manager', 'results', 'structs', 'peers')

Methods inherited from vcorelib.logging.LoggerMixin:
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Data descriptors inherited from vcorelib.logging.LoggerMixin:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

 
class ArbiterTask(runtimepy.task.basic.periodic.PeriodicTask)
    ArbiterTask(name: 'str', average_depth: 'int' = 10, metrics: 'PeriodicTaskMetrics' = None, period_s: 'float' = 1.0, env: 'ChannelEnvironment' = None) -> 'None'
 
A base class for arbiter periodic tasks.
 
 
Method resolution order:
ArbiterTask
runtimepy.task.basic.periodic.PeriodicTask
runtimepy.mixins.logging.LoggerMixinLevelControl
vcorelib.logging.LoggerMixin
runtimepy.mixins.environment.ChannelEnvironmentMixin
abc.ABC
builtins.object

Methods defined here:
async init(self, app: runtimepy.net.arbiter.info.AppInfo) -> None
Initialize this task with application information.

Data and other attributes defined here:
__abstractmethods__ = frozenset({'dispatch'})
__annotations__ = {'app': <class 'runtimepy.net.arbiter.info.AppInfo'>}
auto_finalize = False

Methods inherited from runtimepy.task.basic.periodic.PeriodicTask:
__init__(self, name: 'str', average_depth: 'int' = 10, metrics: 'PeriodicTaskMetrics' = None, period_s: 'float' = 1.0, env: 'ChannelEnvironment' = None) -> 'None'
Initialize this task.
disable(self) -> 'bool'
Disable this task, return whether or not any action was taken.
async dispatch(self) -> 'bool'
Dispatch an iteration of this task.
async run(self, period_s: 'float' = None, stop_sig: '_asyncio.Event' = None) -> 'None'
Run this task by executing the dispatch method at the specified period
until a dispatch iteration fails or the task is otherwise disabled.
set_period(self, period_s: 'float' = None) -> 'bool'
Attempt to set a new period for this task.
async stop(self) -> 'bool'
Wait for this task to stop running (if it is).
async stop_extra(self) -> 'None'
Extra actions to perform when this task is stopping.
async task(self, period_s: 'float' = None, stop_sig: '_asyncio.Event' = None) -> '_asyncio.Task[None]'
Create an event-loop task for this periodic.

Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
Add a commandable log-level channel to the environment.

Methods inherited from vcorelib.logging.LoggerMixin:
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Data descriptors inherited from vcorelib.logging.LoggerMixin:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
Register individual channel metrics.
register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
Register connection metrics.
register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
Register periodic task metrics.

 
class ArbiterTaskManager(runtimepy.task.basic.manager.PeriodicTaskManager)
    ArbiterTaskManager() -> None
 
A task-manager class for the connection arbiter.
 
 
Method resolution order:
ArbiterTaskManager
runtimepy.task.basic.manager.PeriodicTaskManager
typing.Generic
builtins.object

Data and other attributes defined here:
__orig_bases__ = (runtimepy.task.basic.manager.PeriodicTaskManager[runtimepy.net.arbiter.task.ArbiterTask],)
__parameters__ = ()

Methods inherited from runtimepy.task.basic.manager.PeriodicTaskManager:
__getitem__(self, name: str) -> ~T
Get a task by name.
__init__(self) -> None
Initialize this instance.
register(self, task: ~T, period_s: float = None) -> bool
Register a periodic task.
running(self, stop_sig: asyncio.locks.Event = None) -> AsyncIterator[NoneType]
Run tasks as an async context.
async start(self, stop_sig: asyncio.locks.Event = None) -> None
Ensure tasks are started.
async stop(self) -> None
Ensure tasks are stopped.

Readonly properties inherited from runtimepy.task.basic.manager.PeriodicTaskManager:
tasks
Iterate over tasks.

Data descriptors inherited from runtimepy.task.basic.manager.PeriodicTaskManager:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Class methods inherited from typing.Generic:
__class_getitem__(...) from builtins.type
Parameterizes a generic class.
 
At least, parameterizing a generic class is the *main* thing this
method does. For example, for some generic class `Foo`, this is called
when we do `Foo[int]` - there, with `cls=Foo` and `params=int`.
 
However, note that this method is also called when defining generic
classes in the first place with `class Foo[T]: ...`.
__init_subclass__(...) from builtins.type
Function to initialize subclasses.

 
class ConnectionArbiter(runtimepy.net.arbiter.config.ConfigConnectionArbiter)
    ConnectionArbiter(manager: runtimepy.net.manager.ConnectionManager = None, stop_sig: asyncio.locks.Event = None, namespace: vcorelib.namespace.base.Namespace = None, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = None, app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]] = None, window: Optional[Any] = None) -> None
 
A class implementing a connection manager for a broader application.
 
 
Method resolution order:
ConnectionArbiter
runtimepy.net.arbiter.config.ConfigConnectionArbiter
runtimepy.net.arbiter.imports.ImportConnectionArbiter
runtimepy.net.arbiter.factory.connection.FactoryConnectionArbiter
runtimepy.net.arbiter.factory.task.TaskConnectionArbiter
runtimepy.net.arbiter.base.BaseConnectionArbiter
vcorelib.namespace.mixin.NamespaceMixin
vcorelib.logging.LoggerMixin
runtimepy.tui.mixin.TuiMixin
builtins.object

Data and other attributes defined here:
__annotations__ = {}

Methods inherited from runtimepy.net.arbiter.config.ConfigConnectionArbiter:
async load_configs(self, paths: Iterable[Union[pathlib.Path, str, NoneType]], wait_for_stop: bool = False) -> None
Load a client and server configuration to the arbiter.
async process_config(self, config: runtimepy.net.arbiter.config.codec.ConnectionArbiterConfig, wait_for_stop: bool = False) -> None
Register clients and servers from a configuration object.

Class methods inherited from runtimepy.net.arbiter.config.ConfigConnectionArbiter:
add_search_package(name: str, front: bool = True) -> None from builtins.type
Add a package to the search path.

Data and other attributes inherited from runtimepy.net.arbiter.config.ConfigConnectionArbiter:
search_packages = ['runtimepy']

Methods inherited from runtimepy.net.arbiter.imports.ImportConnectionArbiter:
factory_process(self, factory: str, name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], program: str) -> bool
Register a runtime process.
factory_struct(self, factory: str, name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> bool
Register a runtime structure from factory and name.
register_module_factory(self, module_path: str, *namespaces: str, **kwargs) -> bool
Attempt to register a factory class based on its module path.
register_peer_factory(self, factory: type[runtimepy.subprocess.peer.RuntimepyPeer], *namespaces: str) -> bool
Attempt to register a subprocess peer factory.
register_struct_factory(self, factory: type[runtimepy.net.arbiter.info.RuntimeStruct], *namespaces: str) -> bool
Attempt to register a struct factory.

Methods inherited from runtimepy.net.arbiter.factory.connection.FactoryConnectionArbiter:
async factory_client(self, factory: str, name: str, *args, defer: bool = False, **kwargs) -> bool
Attempt to register a client connection using a registered factory.
async factory_server(self, factory: str, *args, **kwargs) -> bool
Attempt to create a server task using a registered factory.
register_connection_factory(self, factory: runtimepy.net.arbiter.factory.connection.ConnectionFactory, *namespaces: str) -> bool
Attempt to register a connection factory.

Methods inherited from runtimepy.net.arbiter.factory.task.TaskConnectionArbiter:
factory_task(self, factory: str, name: str, period_s: float = None, **kwargs) -> bool
Register a periodic task from one of the registered task factories.
register_task_factory(self, factory: runtimepy.net.arbiter.task.TaskFactory[runtimepy.net.arbiter.task.ArbiterTask], *namespaces: str) -> bool
Attempt to register a periodic task factory.

Methods inherited from runtimepy.net.arbiter.base.BaseConnectionArbiter:
__init__(self, manager: runtimepy.net.manager.ConnectionManager = None, stop_sig: asyncio.locks.Event = None, namespace: vcorelib.namespace.base.Namespace = None, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = None, app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]] = None, window: Optional[Any] = None) -> None
Initialize this connection arbiter.
async app(self, app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None, check_connections: bool = True, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]] = None) -> int
Run the application alongside the connection manager and server tasks.
register_connection(self, connection: Union[runtimepy.net.connection.Connection, Awaitable[runtimepy.net.connection.Connection]], *names: str, delim: str = None) -> bool
Attempt to register a connection object.
run(self, app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None, eloop: asyncio.events.AbstractEventLoop = None, signals: Iterable[int] = None, check_connections: bool = True, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]] = None, enable_uvloop: bool = True) -> int
Run the application until the stop signal is set.

Methods inherited from vcorelib.namespace.mixin.NamespaceMixin:
child_namespace(self, *names: str, namespace: vcorelib.namespace.base.Namespace = None) -> vcorelib.namespace.base.Namespace
Obtain a child namespace.
names_pushed(self, *names: str, namespace: vcorelib.namespace.base.Namespace = None) -> Iterator[NoneType]
Apply some names to this object's namespace as a managed context.
namespace(self, name: str = None, delim: str = None, namespace: vcorelib.namespace.base.Namespace = None) -> str
Get a namespace string for this object.
namespace_search(self, *names: str, pattern: str = '.*', namespace: vcorelib.namespace.base.Namespace = None) -> Iterator[str]
Perform a search on the namespace.
namespace_suggest(self, data: str, delta: bool = True, namespace: vcorelib.namespace.base.Namespace = None) -> Optional[str]
Find the shortest name suggestion.
pop_name(self, name: str = None, namespace: vcorelib.namespace.base.Namespace = None) -> str
Pop the latest name off the stack.
push_name(self, name: str, namespace: vcorelib.namespace.base.Namespace = None) -> None
Push a name onto the stack.

Readonly properties inherited from vcorelib.namespace.mixin.NamespaceMixin:
ns
Get this instance's namespace.

Data descriptors inherited from vcorelib.namespace.mixin.NamespaceMixin:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Methods inherited from vcorelib.logging.LoggerMixin:
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Methods inherited from runtimepy.tui.mixin.TuiMixin:
async handle_char(self, char: int) -> bool
Handle character input.
init(self, window: Optional[Any]) -> bool
Initialize this interface's window.
tui_update(self) -> None
Re-draw the screen.
update_dimensions(self) -> Any
Handle an update to the window's dimensions.

Readonly properties inherited from runtimepy.tui.mixin.TuiMixin:
window
Get the window for this instance.

 
class TaskFactory(typing.Generic)
    A task-factory base class.
 
 
Method resolution order:
TaskFactory
typing.Generic
builtins.object

Data descriptors defined here:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Data and other attributes defined here:
__annotations__ = {'kind': type[~T]}
__orig_bases__ = (typing.Generic[~T],)
__parameters__ = (~T,)

Class methods inherited from typing.Generic:
__class_getitem__(...) from builtins.type
Parameterizes a generic class.
 
At least, parameterizing a generic class is the *main* thing this
method does. For example, for some generic class `Foo`, this is called
when we do `Foo[int]` - there, with `cls=Foo` and `params=int`.
 
However, note that this method is also called when defining generic
classes in the first place with `class Foo[T]: ...`.
__init_subclass__(...) from builtins.type
Function to initialize subclasses.

 
Functions
       
async init_only(app: runtimepy.net.arbiter.info.AppInfo) -> int
A network application that doesn't do anything.

 
Data
        ConnectionMap = typing.MutableMapping[str, runtimepy.net.connection.Connection]
NetworkApplication = typing.Callable[[runtimepy.net.arbiter.info.AppInfo], typing.Awaitable[int]]
__all__ = ['AppInfo', 'ArbiterTask', 'ArbiterTaskManager', 'ConnectionArbiter', 'ConnectionMap', 'NetworkApplication', 'init_only', 'TaskFactory']