- Method resolution order:
- RuntimepyServerConnection
- runtimepy.net.tcp.http.HttpConnection
- runtimepy.net.tcp.connection.TcpConnection
- runtimepy.net.connection.Connection
- runtimepy.mixins.logging.LoggerMixinLevelControl
- vcorelib.logging.LoggerMixin
- runtimepy.mixins.environment.ChannelEnvironmentMixin
- abc.ABC
- runtimepy.net.mixin.TransportMixin
- builtins.object
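The ordering above is what Python's C3 linearization produces for a class hierarchy that mixes in logging, environment, and transport behavior. A minimal sketch follows, using hypothetical stand-in classes; the base-class declarations are an assumption chosen to reproduce the same shape, not taken from the runtimepy source.

```python
from abc import ABC


# Hypothetical stand-ins; only the inheritance shape matters here.
class LoggerMixin:
    """Plays the role of a base logging mixin."""


class LoggerMixinLevelControl(LoggerMixin):
    """Plays the role of a logging mixin with level control."""


class ChannelEnvironmentMixin:
    """Plays the role of an environment mixin."""


class TransportMixin:
    """Plays the role of a transport mixin."""


class Connection(LoggerMixinLevelControl, ChannelEnvironmentMixin, ABC):
    """Assumed base declaration for the generic connection."""


class TcpConnection(Connection, TransportMixin):
    """Assumed base declaration for the TCP connection."""


class HttpConnection(TcpConnection):
    """HTTP layered on TCP."""


class ServerConnection(HttpConnection):
    """The most-derived class."""


# __mro__ is the order in which attribute lookups walk the hierarchy.
mro_names = [cls.__name__ for cls in ServerConnection.__mro__]
print(mro_names)
```

Attribute lookups on an instance walk this list from left to right, so a method defined on the most-derived class shadows one with the same name further down.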
Methods defined here:
- add_path(self, path: Union[pathlib.Path, str, NoneType], front: bool = False) -> None
- Add a path.
- async get_handler(self, response: runtimepy.net.http.response.ResponseHeader, request: runtimepy.net.http.header.RequestHeader, request_data: Optional[bytes]) -> Optional[bytes]
- Sample handler.
- init(self) -> None
- Initialize this instance.
- log_paths(self) -> None
- Log search paths.
- try_file(self, path: Tuple[str, Optional[str]], response: runtimepy.net.http.response.ResponseHeader) -> Optional[bytes]
- Try serving this path as a file directly from the file-system.
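The `add_path` and `try_file` signatures suggest an ordered list of search directories that is consulted when a request path may map to a file. The following is a minimal sketch of that idea only; `SearchPaths` and its behavior (skipping `None`, de-duplicating, first match wins) are assumptions for illustration, not the runtimepy implementation.

```python
import tempfile
from pathlib import Path
from typing import List, Optional, Union


class SearchPaths:
    """Hypothetical helper: an ordered list of directories to search."""

    def __init__(self) -> None:
        self.paths: List[Path] = []

    def add_path(
        self, path: Union[Path, str, None], front: bool = False
    ) -> None:
        """Add a directory, optionally giving it the highest priority."""
        if path is None:
            return
        resolved = Path(path).resolve()
        if resolved in self.paths:
            return
        if front:
            self.paths.insert(0, resolved)
        else:
            self.paths.append(resolved)

    def try_file(self, name: str) -> Optional[bytes]:
        """Return the contents of the first matching file, if any."""
        relative = name.lstrip("/")  # keep the join relative to each root
        for root in self.paths:
            candidate = root / relative
            if candidate.is_file():
                return candidate.read_bytes()
        return None


with tempfile.TemporaryDirectory() as low, tempfile.TemporaryDirectory() as high:
    Path(low, "index.html").write_bytes(b"low")
    Path(high, "index.html").write_bytes(b"high")

    search = SearchPaths()
    search.add_path(low)
    search.add_path(high, front=True)  # searched before 'low'

    found = search.try_file("/index.html")
    missing = search.try_file("missing.txt")
```

A real server would also need to reject paths that escape the search roots (for example via `..`); the sketch leaves that out.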
Data and other attributes defined here:
- __abstractmethods__ = frozenset()
- __annotations__ = {'apps': dict[str, typing.Callable[[svgen.element.html.Ht...es]], typing.Awaitable[svgen.element.html.Html]]], 'class_paths': list[typing.Union[pathlib.Path, str, NoneType]], 'default_app': typing.Optional[typing.Callable[[svgen.element.h...es]], typing.Awaitable[svgen.element.html.Html]]], 'favicon_data': <class 'bytes'>, 'json_data': typing.Dict[str, typing.Union[str, int, float, b...[typing.Union[str, int, float, bool, NoneType]]]], 'paths': list[pathlib.Path]}
- apps = {}
- class_paths = [PosixPath('.'), PosixPath('/home/vkottler/src/vkottler/workspace/runtimepy/runtimepy/data')]
- default_app = None
- json_data = {'test': {'a': 1, 'b': 2, 'c': 3}}
Methods inherited from runtimepy.net.tcp.http.HttpConnection:
- async process_binary(self, data: bytes) -> bool
- Process a binary frame.
- async request(self, request: runtimepy.net.http.header.RequestHeader, data: Optional[bytes] = None) -> Tuple[runtimepy.net.http.response.ResponseHeader, Optional[bytes]]
- Make an HTTP request.
- async request_json(self, request: runtimepy.net.http.header.RequestHeader, data: Optional[bytes] = None) -> Any
- Perform a request and convert the response to a data structure by decoding it as JSON.
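The decoding step that `request_json` describes can be sketched on its own. The helper name `decode_json_body`, the UTF-8 assumption, and the choice to return `None` for an empty body are all assumptions for illustration; the sample payload reuses the `json_data` value listed above.

```python
import json
from typing import Any, Optional


def decode_json_body(body: Optional[bytes]) -> Any:
    """Decode an HTTP response body as JSON (hypothetical helper)."""
    if not body:
        # Assumption: a missing or empty body decodes to None.
        return None
    return json.loads(body.decode("utf-8"))


payload = json.dumps({"test": {"a": 1, "b": 2, "c": 3}}).encode("utf-8")
decoded = decode_json_body(payload)
```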
Data and other attributes inherited from runtimepy.net.tcp.http.HttpConnection:
- handlers = {}
- identity = 'runtimepy/4.4.0'
- log_alias = 'HTTP'
- log_prefix = 'http://'
Methods inherited from runtimepy.net.tcp.connection.TcpConnection:
- __init__(self, transport: asyncio.transports.Transport, protocol: runtimepy.net.tcp.protocol.QueueProtocol) -> None
- Initialize this TCP connection.
- async close(self) -> None
- Close this connection.
- async restart(self) -> bool
- Reset necessary underlying state for this connection to 'process' again.
- send_binary(self, data: Union[bytes, bytearray, memoryview]) -> None
- Enqueue a binary message to send.
- send_text(self, data: str) -> None
- Enqueue a text message to send.
Class methods inherited from runtimepy.net.tcp.connection.TcpConnection:
- async app(stop_sig: asyncio.locks.Event, callback: Callable[[~T], NoneType] = None, serving_callback: Callable[[Any], NoneType] = None, manager: runtimepy.net.manager.ConnectionManager = None, **kwargs) -> None from abc.ABCMeta
- Run an application that serves new connections.
- async create_connection(backoff: runtimepy.net.backoff.ExponentialBackoff = None, **kwargs) -> ~T from abc.ABCMeta
- Create a TCP connection.
- create_pair() -> AsyncIterator[tuple[~T, ~T]] from abc.ABCMeta
- Create a connection pair.
- serve(callback: Callable[[~T], NoneType] = None, **kwargs) -> AsyncIterator[Any] from abc.ABCMeta
- Serve incoming connections.
Data and other attributes inherited from runtimepy.net.tcp.connection.TcpConnection:
- uses_binary_tx_queue = False
- uses_text_tx_queue = False
Methods inherited from runtimepy.net.connection.Connection:
- async async_init(self) -> bool
- A runtime initialization routine (executes during 'process').
- disable(self, reason: str) -> None
- Disable this connection.
- disable_extra(self) -> None
- Additional tasks to perform when disabling.
- async disable_in(self, time: float) -> None
- A method for disabling a connection after some delay.
- log_metrics(self, label: str = 'conn') -> None
- Log connection metrics.
- async process(self, stop_sig: asyncio.locks.Event = None, disable_time: float = None, backoff: runtimepy.net.backoff.ExponentialBackoff = None) -> None
- Process tasks for this connection while the connection is active.
- async process_text(self, data: str) -> bool
- Process a text frame.
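The relationship between `process`, its `stop_sig` and `disable_time` parameters, `disable`, and `disable_in` can be illustrated with plain `asyncio`. This is a sketch under assumptions: `SketchConnection` is a hypothetical class, and the real `Connection.process` also services text and binary frames, which is omitted here.

```python
import asyncio
from typing import List, Optional


class SketchConnection:
    """Hypothetical connection that only models enable/disable state."""

    def __init__(self) -> None:
        self._enabled = True
        self._disabled_event = asyncio.Event()
        self.reasons: List[str] = []

    @property
    def disabled(self) -> bool:
        return not self._enabled

    def disable(self, reason: str) -> None:
        """Disable once; later calls are ignored."""
        if self._enabled:
            self._enabled = False
            self.reasons.append(reason)
            self._disabled_event.set()

    async def disable_in(self, time: float) -> None:
        """Disable this connection after a delay (seconds)."""
        await asyncio.sleep(time)
        self.disable("timed disable")

    async def process(
        self,
        stop_sig: Optional[asyncio.Event] = None,
        disable_time: Optional[float] = None,
    ) -> None:
        """Run until disabled or until the stop signal is set."""
        waiters = [asyncio.ensure_future(self._disabled_event.wait())]
        if stop_sig is not None:
            waiters.append(asyncio.ensure_future(stop_sig.wait()))
        timer = None
        if disable_time is not None:
            timer = asyncio.ensure_future(self.disable_in(disable_time))

        await asyncio.wait(waiters, return_when=asyncio.FIRST_COMPLETED)

        # Clean up whichever helpers are still pending.
        for task in waiters + ([timer] if timer is not None else []):
            task.cancel()
        self.disable("process exited")


async def main():
    timed = SketchConnection()
    await timed.process(disable_time=0.01)

    stop = asyncio.Event()
    stop.set()
    stopped = SketchConnection()
    await stopped.process(stop_sig=stop)
    return timed.reasons, stopped.reasons


results = asyncio.run(main())
```

In the first run the timer disables the connection; in the second, the already-set stop signal ends processing and the connection is disabled on the way out.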
Readonly properties inherited from runtimepy.net.connection.Connection:
- auto_restart
- Determine if this connection should be automatically restarted.
- disabled
- Determine if this connection is disabled.
Data and other attributes inherited from runtimepy.net.connection.Connection:
- byte_order = <ByteOrder.NETWORK: 4>
- connected = True
- default_auto_restart = False
Methods inherited from runtimepy.mixins.logging.LoggerMixinLevelControl:
- setup_level_channel(self, env: runtimepy.channel.environment.ChannelEnvironment, name: str = 'log_level', initial: str = 'info', description: str = 'Text-log level filter for this environment.') -> None
- Add a commandable log-level channel to the environment.
Methods inherited from vcorelib.logging.LoggerMixin:
- log_time(self, message: str, *args, level: int = 20, reminder: bool = False, **kwargs) -> Iterator[NoneType]
- A simple wrapper.
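The `Iterator[NoneType]` return type suggests that `log_time` is a generator-based context manager, and the default `level` of 20 equals `logging.INFO`. The following sketch shows one way such a timing wrapper could work; it is an assumption about the general pattern, not the vcorelib implementation, and it omits the `reminder` parameter.

```python
import logging
import time
from contextlib import contextmanager
from typing import Iterator, List


@contextmanager
def log_time(
    logger: logging.Logger, message: str, *args, level: int = logging.INFO
) -> Iterator[None]:
    """Log how long the wrapped block took (hypothetical stand-in)."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        logger.log(level, message + " took %.3fs.", *args, elapsed)


class ListHandler(logging.Handler):
    """Collect formatted messages so the result can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


logger = logging.getLogger("log_time_sketch")
logger.setLevel(logging.INFO)
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

with log_time(logger, "Loading '%s'", "config"):
    time.sleep(0.01)
```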
Data descriptors inherited from vcorelib.logging.LoggerMixin:
- __dict__
- dictionary for instance variables
- __weakref__
- list of weak references to the object
Methods inherited from runtimepy.mixins.environment.ChannelEnvironmentMixin:
- register_channel_metrics(self, name: str, channel: runtimepy.metrics.channel.ChannelMetrics, verb: str) -> None
- Register individual channel metrics.
- register_connection_metrics(self, metrics: runtimepy.metrics.connection.ConnectionMetrics, namespace: str = 'metrics') -> None
- Register connection metrics.
- register_task_metrics(self, metrics: runtimepy.metrics.task.PeriodicTaskMetrics, namespace: str = 'metrics') -> None
- Register periodic task metrics.
Methods inherited from runtimepy.net.mixin.TransportMixin:
- logger_name(self, prefix: 'str' = '') -> 'str'
- Get a logger name for this connection.
- set_transport(self, transport: '_asyncio.BaseTransport') -> 'None'
- Set the transport for this instance.
Readonly properties inherited from runtimepy.net.mixin.TransportMixin:
- socket
- Get this instance's underlying socket.