runtimepy.net.arbiter.base

A module implementing a base connection-arbiter interface.

 
Modules
       
asyncio

 
Classes
       
runtimepy.tui.mixin.TuiMixin(builtins.object)
    BaseConnectionArbiter(vcorelib.namespace.mixin.NamespaceMixin, vcorelib.logging.LoggerMixin, runtimepy.tui.mixin.TuiMixin)
vcorelib.logging.LoggerMixin(builtins.object)
    BaseConnectionArbiter(vcorelib.namespace.mixin.NamespaceMixin, vcorelib.logging.LoggerMixin, runtimepy.tui.mixin.TuiMixin)
vcorelib.namespace.mixin.NamespaceMixin(builtins.object)
    BaseConnectionArbiter(vcorelib.namespace.mixin.NamespaceMixin, vcorelib.logging.LoggerMixin, runtimepy.tui.mixin.TuiMixin)

 
class BaseConnectionArbiter(vcorelib.namespace.mixin.NamespaceMixin, vcorelib.logging.LoggerMixin, runtimepy.tui.mixin.TuiMixin)
    BaseConnectionArbiter(manager: runtimepy.net.manager.ConnectionManager = None, stop_sig: asyncio.locks.Event = None, namespace: vcorelib.namespace.base.Namespace = None, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = None, app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]] = None, window: Optional[Any] = None) -> None
 
A class implementing a base connection-manager for a broader application.
 
 
Method resolution order:
BaseConnectionArbiter
vcorelib.namespace.mixin.NamespaceMixin
vcorelib.logging.LoggerMixin
runtimepy.tui.mixin.TuiMixin
builtins.object

Methods defined here:
__init__(self, manager: runtimepy.net.manager.ConnectionManager = None, stop_sig: asyncio.locks.Event = None, namespace: vcorelib.namespace.base.Namespace = None, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = None, app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]] = None, window: Optional[Any] = None) -> None
Initialize this connection arbiter.
async app(self, app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None, check_connections: bool = True, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]] = None) -> int
Run the application alongside the connection manager and server tasks.
register_connection(self, connection: Union[runtimepy.net.connection.Connection, Awaitable[runtimepy.net.connection.Connection]], *names: str, delim: str = None) -> bool
Attempt to register a connection object.
run(self, app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None, eloop: asyncio.events.AbstractEventLoop = None, signals: Iterable[int] = None, check_connections: bool = True, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]] = None, enable_uvloop: bool = True) -> int
Run the application until the stop signal is set.
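
The run and app methods above take a coroutine function that receives an application-info object and returns an integer result, and run keeps going until the stop signal is set. A minimal sketch of that pattern using only asyncio follows. FakeAppInfo, run_until_stopped and sample_app are hypothetical stand-ins invented for illustration; they are not part of runtimepy, and the real arbiter may order these steps differently.

```python
import asyncio
from typing import Awaitable, Callable


class FakeAppInfo:
    """Hypothetical stand-in for runtimepy.net.arbiter.info.AppInfo."""

    def __init__(self, stop: asyncio.Event) -> None:
        self.stop = stop


# Same shape as the NetworkApplication alias, but over the stand-in type.
App = Callable[[FakeAppInfo], Awaitable[int]]


async def run_until_stopped(app: App, stop: asyncio.Event) -> int:
    """Run 'app', then wait for the stop event before returning its result."""
    info = FakeAppInfo(stop)
    result = await app(info)
    # Assumption: the runner does not return until the stop signal is set.
    await stop.wait()
    return result


async def sample_app(info: FakeAppInfo) -> int:
    """An application that requests shutdown right away and reports success."""
    info.stop.set()
    return 0


def main() -> int:
    """Synchronous entry point, loosely analogous to a 'run' method."""

    async def entry() -> int:
        # Create the event inside the running loop.
        return await run_until_stopped(sample_app, asyncio.Event())

    return asyncio.run(entry())
```

Calling main() runs sample_app to completion and returns its integer result once the stop event has been set.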

Data and other attributes defined here:
__annotations__ = {}

Methods inherited from vcorelib.namespace.mixin.NamespaceMixin:
child_namespace(self, *names: str, namespace: vcorelib.namespace.base.Namespace = None) -> vcorelib.namespace.base.Namespace
Obtain a child namespace.
names_pushed(self, *names: str, namespace: vcorelib.namespace.base.Namespace = None) -> Iterator[NoneType]
Apply some names to this object's namespace as a managed context.
namespace(self, name: str = None, delim: str = None, namespace: vcorelib.namespace.base.Namespace = None) -> str
Get a namespace string for this object.
namespace_search(self, *names: str, pattern: str = '.*', namespace: vcorelib.namespace.base.Namespace = None) -> Iterator[str]
Perform a search on the namespace.
namespace_suggest(self, data: str, delta: bool = True, namespace: vcorelib.namespace.base.Namespace = None) -> Optional[str]
Find the shortest name suggestion.
pop_name(self, name: str = None, namespace: vcorelib.namespace.base.Namespace = None) -> str
Pop the latest name off the stack.
push_name(self, name: str, namespace: vcorelib.namespace.base.Namespace = None) -> None
Push a name onto the stack.
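
The inherited namespace methods describe a stack of names that can be pushed, popped, and joined into a namespace string. The sketch below shows one way such a stack could behave. NameStack is a hypothetical class written for illustration, the "." default delimiter is an assumption, and none of this is vcorelib's implementation.

```python
from contextlib import contextmanager
from typing import Iterator, List, Optional


class NameStack:
    """Hypothetical stand-in for a namespace (not vcorelib's class)."""

    def __init__(self, delim: str = ".") -> None:
        # Assumption: names are joined with a "." delimiter by default.
        self.delim = delim
        self.names: List[str] = []

    def push_name(self, name: str) -> None:
        """Push a name onto the stack."""
        self.names.append(name)

    def pop_name(self) -> str:
        """Pop the latest name off the stack."""
        return self.names.pop()

    def namespace(self, name: Optional[str] = None) -> str:
        """Join the current names, plus an optional trailing name."""
        parts = list(self.names)
        if name is not None:
            parts.append(name)
        return self.delim.join(parts)

    @contextmanager
    def names_pushed(self, *names: str) -> Iterator[None]:
        """Push names for the duration of a 'with' block, then pop them."""
        for name in names:
            self.push_name(name)
        try:
            yield
        finally:
            for _ in names:
                self.pop_name()
```

With this stand-in, pushing "a" and then entering names_pushed("b", "c") makes namespace("d") produce a string built from a, b, c and d in that order, and leaving the block restores the stack to just "a".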

Readonly properties inherited from vcorelib.namespace.mixin.NamespaceMixin:
ns
Get this instance's namespace.

Data descriptors inherited from vcorelib.namespace.mixin.NamespaceMixin:
__dict__
dictionary for instance variables
__weakref__
list of weak references to the object

Methods inherited from vcorelib.logging.LoggerMixin:
log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
A simple wrapper.

Methods inherited from runtimepy.tui.mixin.TuiMixin:
async handle_char(self, char: int) -> bool
Handle character input.
init(self, window: Optional[Any]) -> bool
Initialize this interface's window.
tui_update(self) -> None
Re-draw the screen.
update_dimensions(self) -> Any
Handle an update to the window's dimensions.

Readonly properties inherited from runtimepy.tui.mixin.TuiMixin:
window
Get the window for this instance.

 
Functions
       
async init_only(app: runtimepy.net.arbiter.info.AppInfo) -> int
A network application that doesn't do anything.
normalize_app(app: Union[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]], list[Callable[[runtimepy.net.arbiter.info.AppInfo], Awaitable[int]]]] = None) -> list[list[typing.Callable[[runtimepy.net.arbiter.info.AppInfo], typing.Awaitable[int]]]]
Normalize some application parameter into a list of network applications.
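
The signature of normalize_app shows that it accepts nothing, a single application, or a list of applications, and returns a list of lists of applications. The sketch below illustrates that kind of normalization with stand-in names. normalize_sketch and do_nothing are hypothetical, and the choices to fall back to a do-nothing application and to place a flat list into a single group are assumptions; the actual grouping rules of normalize_app are not stated in this reference.

```python
from typing import Any, Awaitable, Callable, List, Optional, Union

# Same shape as the NetworkApplication alias, with Any in place of AppInfo.
App = Callable[[Any], Awaitable[int]]


async def do_nothing(app: Any) -> int:
    """Stand-in for init_only; returning 0 is an assumption."""
    return 0


def normalize_sketch(
    app: Optional[Union[App, List[App]]] = None,
) -> List[List[App]]:
    """Normalize an application parameter into a list of groups."""
    if app is None:
        # Assumption: fall back to an application that does nothing.
        return [[do_nothing]]
    if not isinstance(app, list):
        # A single callable becomes one group holding one application.
        return [[app]]
    # Assumption: a flat list is kept together as a single group.
    return [list(app)]
```

Whatever the input form, callers can then iterate over groups and over the applications in each group without special cases.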

 
Data
ArbiterApps = list[list[typing.Callable[[runtimepy.net.arbiter.info.AppInfo], typing.Awaitable[int]]]]
ConnectionMap = typing.MutableMapping[str, runtimepy.net.connection.Connection]
NetworkApplication = typing.Callable[[runtimepy.net.arbiter.info.AppInfo], typing.Awaitable[int]]
NetworkApplicationlike = typing.Union[typing.Callable[[runtimepy.net.arbi...t.arbiter.info.AppInfo], typing.Awaitable[int]]]]
Optional = typing.Optional
RuntimeProcessTask = tuple[type[runtimepy.subprocess.peer.RuntimepyPe...g.Union[str, int, float, bool, NoneType]]]], str]
ServerTask = typing.Awaitable[NoneType]
TIMER = <vcorelib.math.time.Timer object>