- abc.ABC(builtins.object)
  - RuntimeStruct(runtimepy.struct.RuntimeStructBase, abc.ABC)
    - TrigStruct(RuntimeStruct, runtimepy.mixins.trig.TrigMixin)
      - SampleStruct
- runtimepy.struct.RuntimeStructBase(runtimepy.mixins.logging.LoggerMixinLevelControl, runtimepy.mixins.environment.ChannelEnvironmentMixin)
  - RuntimeStruct(runtimepy.struct.RuntimeStructBase, abc.ABC)
    - TrigStruct(RuntimeStruct, runtimepy.mixins.trig.TrigMixin)
      - SampleStruct
- vcorelib.logging.LoggerMixin(builtins.object)
  - AppInfo
class AppInfo(vcorelib.logging.LoggerMixin)
AppInfo(logger: Union[logging.Logger, logging.LoggerAdapter[Any]], stack: contextlib.AsyncExitStack, connections: MutableMapping[str, runtimepy.net.connection.Connection], conn_manager: runtimepy.net.manager.ConnectionManager, names: vcorelib.namespace.base.Namespace, stop: asyncio.locks.Event, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], tui: runtimepy.tui.mixin.TuiMixin, tasks: dict[str, runtimepy.task.basic.periodic.PeriodicTask], task_manager: runtimepy.task.basic.manager.PeriodicTaskManager[typing.Any], results: list[list[runtimepy.net.arbiter.result.AppResult]], structs: dict[str, runtimepy.struct.RuntimeStructBase], peers: dict[str, '_RuntimepyPeer']) -> None
References provided to network applications.
- Method resolution order:
- AppInfo
- vcorelib.logging.LoggerMixin
- builtins.object
Methods defined here:
- __eq__(self, other)
- Return self==value.
- __init__(self, logger: Union[logging.Logger, logging.LoggerAdapter[Any]], stack: contextlib.AsyncExitStack, connections: MutableMapping[str, runtimepy.net.connection.Connection], conn_manager: runtimepy.net.manager.ConnectionManager, names: vcorelib.namespace.base.Namespace, stop: asyncio.locks.Event, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]], tui: runtimepy.tui.mixin.TuiMixin, tasks: dict[str, runtimepy.task.basic.periodic.PeriodicTask], task_manager: runtimepy.task.basic.manager.PeriodicTaskManager[typing.Any], results: list[list[runtimepy.net.arbiter.result.AppResult]], structs: dict[str, runtimepy.struct.RuntimeStructBase], peers: dict[str, '_RuntimepyPeer']) -> None
- Initialize this object with logging capabilities.
- __repr__(self)
- Return repr(self).
- async all_finalized(self) -> None
- Wait for all tasks and connections to be finalized.
- config_param(self, key: str, default: ~Z, strict: bool = False) -> ~Z
- Attempt to get a configuration parameter.
- exceptions(self) -> Iterator[Exception]
- Iterate over exceptions raised by the application.
- original_config(self) -> dict[str, typing.Any]
- Re-assemble a dictionary closer to the original configuration data
(than the .config attribute).
- result(self, logger: Union[logging.Logger, logging.LoggerAdapter[Any]] = None) -> bool
- Get the overall boolean result for the application.
- search(self, *names: str, pattern: str = '.*', kind: type[~T] = <class 'runtimepy.net.connection.Connection'>) -> Iterator[~T]
- Get all connections that match a naming convention, are of a
specific kind, or both.
- search_tasks(self, kind: type[~V], pattern: str = '.*') -> Iterator[~V]
- Search for tasks by type or pattern.
- single(self, *names: str, pattern: str = '.*', kind: type[~T] = <class 'runtimepy.net.connection.Connection'>) -> ~T
- Search for a single node.
- with_new_logger(self, name: str) -> 'AppInfo'
- Get a copy of this AppInfo instance, but with a new logger.
Readonly properties defined here:
- raised_exception
- Determine if the application raised any exception.
Data and other attributes defined here:
- __annotations__ = {'config': typing.Dict[str, typing.Union[str, int, float, b...[typing.Union[str, int, float, bool, NoneType]]]], 'conn_manager': <class 'runtimepy.net.manager.ConnectionManager'>, 'connections': typing.MutableMapping[str, runtimepy.net.connection.Connection], 'logger': typing.Union[logging.Logger, logging.LoggerAdapter[typing.Any]], 'names': <class 'vcorelib.namespace.base.Namespace'>, 'peers': dict[str, '_RuntimepyPeer'], 'results': list[list[runtimepy.net.arbiter.result.AppResult]], 'stack': <class 'contextlib.AsyncExitStack'>, 'stop': <class 'asyncio.locks.Event'>, 'structs': dict[str, runtimepy.struct.RuntimeStructBase], ...}
- __dataclass_fields__ = {'config': Field(name='config',type=typing.Dict[str, typing...appingproxy({}),kw_only=False,_field_type=_FIELD), 'conn_manager': Field(name='conn_manager',type=<class 'runtimepy...appingproxy({}),kw_only=False,_field_type=_FIELD), 'connections': Field(name='connections',type=typing.MutableMapp...appingproxy({}),kw_only=False,_field_type=_FIELD), 'logger': Field(name='logger',type=typing.Union[logging.Lo...appingproxy({}),kw_only=False,_field_type=_FIELD), 'names': Field(name='names',type=<class 'vcorelib.namespa...appingproxy({}),kw_only=False,_field_type=_FIELD), 'peers': Field(name='peers',type=dict[str, '_RuntimepyPee...appingproxy({}),kw_only=False,_field_type=_FIELD), 'results': Field(name='results',type=list[list[runtimepy.ne...appingproxy({}),kw_only=False,_field_type=_FIELD), 'stack': Field(name='stack',type=<class 'contextlib.Async...appingproxy({}),kw_only=False,_field_type=_FIELD), 'stop': Field(name='stop',type=<class 'asyncio.locks.Eve...appingproxy({}),kw_only=False,_field_type=_FIELD), 'structs': Field(name='structs',type=dict[str, runtimepy.st...appingproxy({}),kw_only=False,_field_type=_FIELD), ...}
- __dataclass_params__ = _DataclassParams(init=True,repr=True,eq=True,ord...rue,kw_only=False,slots=False,weakref_slot=False)
- __hash__ = None
- __match_args__ = ('logger', 'stack', 'connections', 'conn_manager', 'names', 'stop', 'config', 'tui', 'tasks', 'task_manager', 'results', 'structs', 'peers')
Methods inherited from vcorelib.logging.LoggerMixin:
- log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
- A simple wrapper.
Data descriptors inherited from vcorelib.logging.LoggerMixin:
- __dict__
- dictionary for instance variables
- __weakref__
- list of weak references to the object
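The `search` and `single` methods of `AppInfo` filter connections by name pattern and by type. The sketch below illustrates that idea over a plain dictionary. It is not runtimepy's implementation: the `Connection` and `TcpConnection` classes are hypothetical stand-ins, and the matching rules (full-name regular expression match, explicit names acting as a filter, `single` raising `LookupError`) are assumptions made for illustration.

```python
import re
from typing import Iterator, Mapping, TypeVar

T = TypeVar("T")


class Connection:
    """Hypothetical stand-in for runtimepy.net.connection.Connection."""


class TcpConnection(Connection):
    """Hypothetical subclass, used only to show filtering by kind."""


def search(
    connections: Mapping[str, Connection],
    *names: str,
    pattern: str = ".*",
    kind: type[T] = Connection,
) -> Iterator[T]:
    """Yield connections whose name and type both match (assumed rules)."""
    compiled = re.compile(pattern)
    for name, conn in connections.items():
        # Assumption: explicit names, when given, restrict the candidates.
        if names and name not in names:
            continue
        # Assumption: the pattern must match the entire name.
        if compiled.fullmatch(name) and isinstance(conn, kind):
            yield conn


def single(connections, *names, pattern=".*", kind=Connection):
    """Return exactly one match, raising if there are zero or several."""
    matches = list(search(connections, *names, pattern=pattern, kind=kind))
    if len(matches) != 1:
        raise LookupError(f"expected one match, found {len(matches)}")
    return matches[0]


conns = {
    "client.a": TcpConnection(),
    "client.b": Connection(),
    "server": TcpConnection(),
}
tcp_clients = list(search(conns, pattern=r"client\..*", kind=TcpConnection))
```

Here `tcp_clients` holds only the `client.a` connection, because `client.b` fails the type check and `server` fails the pattern.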
class RuntimeStruct(runtimepy.struct.RuntimeStructBase, abc.ABC)
RuntimeStruct(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None
A class implementing a base runtime structure.
- Method resolution order:
- RuntimeStruct
- runtimepy.struct.RuntimeStructBase
- runtimepy.mixins.logging.LoggerMixinLevelControl
- vcorelib.logging.LoggerMixin
- runtimepy.mixins.environment.ChannelEnvironmentMixin
- abc.ABC
- builtins.object
Methods defined here:
- async build(self, app: 'AppInfo') -> None
- Build a struct instance's channel environment.
- init_env(self) -> None
- Initialize this sample environment.
Data and other attributes defined here:
- __abstractmethods__ = frozenset()
- __annotations__ = {'app': 'AppInfo'}
Methods inherited from runtimepy.struct.RuntimeStructBase:
- __init__(self, name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None
- Initialize this instance.
- poll(self) -> None
- A method that other runtime entities can call to perform canonical
updates to this struct's environment.
Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
- setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
- Add a commandable log-level channel to the environment.
Methods inherited from vcorelib.logging.LoggerMixin:
- log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
- A simple wrapper.
Data descriptors inherited from vcorelib.logging.LoggerMixin:
- __dict__
- dictionary for instance variables
- __weakref__
- list of weak references to the object
Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
- register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
- Register individual channel metrics.
- register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
- Register connection metrics.
- register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
- Register periodic task metrics.
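The `RuntimeStruct` listing above separates an asynchronous `build(app)` step from a synchronous `init_env()` hook and a `poll()` update method. A minimal sketch of how a subclass might use that split follows. The base classes here are simplified stand-ins, not the runtimepy classes, and the assumption that `build` stores the application reference before calling `init_env` is inferred only from the `app` annotation. `CounterStruct` is a hypothetical example class.

```python
import asyncio
from abc import ABC
from typing import Any


class RuntimeStructBase:
    """Hypothetical stand-in for runtimepy.struct.RuntimeStructBase."""

    def __init__(self, name: str, config: dict[str, Any]) -> None:
        self.name = name
        self.config = config

    def poll(self) -> None:
        """The default poll does nothing in this sketch."""


class RuntimeStruct(RuntimeStructBase, ABC):
    """Stand-in mirroring the documented build/init_env split."""

    app: Any

    async def build(self, app: Any) -> None:
        # Assumption: build() keeps the application reference, then
        # lets the subclass set up its own state.
        self.app = app
        self.init_env()

    def init_env(self) -> None:
        """Subclasses override this to set up their environment."""


class CounterStruct(RuntimeStruct):
    """Hypothetical struct that counts how often it is polled."""

    def init_env(self) -> None:
        self.iterations = 0

    def poll(self) -> None:
        self.iterations += 1


async def main() -> CounterStruct:
    struct = CounterStruct("counter", {})
    await struct.build(app=None)  # a real application would pass an AppInfo
    for _ in range(3):
        struct.poll()
    return struct


struct = asyncio.run(main())
print(struct.name, struct.iterations)  # prints: counter 3
```

Keeping `init_env` synchronous and separate from `build` means a subclass such as `SampleStruct` only has to override the setup hook and `poll`, which matches the "Methods defined here" lists in this reference.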
class SampleStruct(TrigStruct)
SampleStruct(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None
A sample runtime structure.
- Method resolution order:
- SampleStruct
- TrigStruct
- RuntimeStruct
- runtimepy.struct.RuntimeStructBase
- runtimepy.mixins.logging.LoggerMixinLevelControl
- vcorelib.logging.LoggerMixin
- runtimepy.mixins.environment.ChannelEnvironmentMixin
- abc.ABC
- runtimepy.mixins.trig.TrigMixin
- builtins.object
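The resolution order above follows from Python's C3 linearization of the base classes. The sketch below rebuilds the hierarchy with empty stand-in classes that reuse the documented names and checks the resulting order. One assumption is not stated in this reference: that `LoggerMixinLevelControl` derives directly from `LoggerMixin`.

```python
from abc import ABC


class LoggerMixin:
    """Stand-in for vcorelib.logging.LoggerMixin."""


class LoggerMixinLevelControl(LoggerMixin):
    """Stand-in; direct inheritance from LoggerMixin is an assumption."""


class ChannelEnvironmentMixin:
    """Stand-in for runtimepy.mixins.environment.ChannelEnvironmentMixin."""


class TrigMixin:
    """Stand-in for runtimepy.mixins.trig.TrigMixin."""


class RuntimeStructBase(LoggerMixinLevelControl, ChannelEnvironmentMixin):
    """Stand-in for runtimepy.struct.RuntimeStructBase."""


class RuntimeStruct(RuntimeStructBase, ABC):
    """Stand-in with the documented bases."""


class TrigStruct(RuntimeStruct, TrigMixin):
    """Stand-in with the documented bases."""


class SampleStruct(TrigStruct):
    """Stand-in with the documented base."""


# Names of the classes in method resolution order.
mro = [cls.__name__ for cls in SampleStruct.__mro__]
for name in mro:
    print(name)
```

`TrigMixin` lands after `abc.ABC` because `TrigStruct` lists `RuntimeStruct` first, so the whole `RuntimeStruct` chain is exhausted (up to the shared `object` root) before the mixin is considered.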
Methods defined here:
- init_env(self) -> None
- Initialize this sample environment.
- poll(self) -> None
- A method that other runtime entities can call to perform canonical
updates to this struct's environment.
Data and other attributes defined here:
- __abstractmethods__ = frozenset()
- __annotations__ = {}
Methods inherited from RuntimeStruct:
- async build(self, app: 'AppInfo') -> None
- Build a struct instance's channel environment.
Methods inherited from runtimepy.struct.RuntimeStructBase:
- __init__(self, name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None
- Initialize this instance.
Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
- setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
- Add a commandable log-level channel to the environment.
Methods inherited from vcorelib.logging.LoggerMixin:
- log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
- A simple wrapper.
Data descriptors inherited from vcorelib.logging.LoggerMixin:
- __dict__
- dictionary for instance variables
- __weakref__
- list of weak references to the object
Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
- register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
- Register individual channel metrics.
- register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
- Register connection metrics.
- register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
- Register periodic task metrics.
Methods inherited from runtimepy.mixins.trig.TrigMixin:
- dispatch_trig(self, step: int) -> None
- Dispatch trig channel updates.
class TrigStruct(RuntimeStruct, runtimepy.mixins.trig.TrigMixin)
TrigStruct(name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None
A simple trig struct.
- Method resolution order:
- TrigStruct
- RuntimeStruct
- runtimepy.struct.RuntimeStructBase
- runtimepy.mixins.logging.LoggerMixinLevelControl
- vcorelib.logging.LoggerMixin
- runtimepy.mixins.environment.ChannelEnvironmentMixin
- abc.ABC
- runtimepy.mixins.trig.TrigMixin
- builtins.object
Methods defined here:
- init_env(self) -> None
- Initialize this sample environment.
- poll(self) -> None
- A method that other runtime entities can call to perform canonical
updates to this struct's environment.
Data and other attributes defined here:
- __abstractmethods__ = frozenset()
- __annotations__ = {'iterations': <class 'runtimepy.primitives.int.Uint32Primitive'>}
Methods inherited from RuntimeStruct:
- async build(self, app: 'AppInfo') -> None
- Build a struct instance's channel environment.
Methods inherited from runtimepy.struct.RuntimeStructBase:
- __init__(self, name: str, config: Dict[str, Union[str, int, float, bool, NoneType, Dict[str, Union[str, int, float, bool, NoneType]], List[Union[str, int, float, bool, NoneType]]]]) -> None
- Initialize this instance.
Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
- setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
- Add a commandable log-level channel to the environment.
Methods inherited from vcorelib.logging.LoggerMixin:
- log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
- A simple wrapper.
Data descriptors inherited from vcorelib.logging.LoggerMixin:
- __dict__
- dictionary for instance variables
- __weakref__
- list of weak references to the object
Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
- register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
- Register individual channel metrics.
- register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
- Register connection metrics.
- register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
- Register periodic task metrics.
Methods inherited from runtimepy.mixins.trig.TrigMixin:
- dispatch_trig(self, step: int) -> None
- Dispatch trig channel updates.