runtimepy.net.server.websocket package

Submodules

runtimepy.net.server.websocket.state module

A module implementing a stateful data interface for per-connection tabs.

class runtimepy.net.server.websocket.state.TabState(shown: bool, shown_ever: bool, tab_logger: ListLogger, points: dict[str, list[tuple[str | int | float | bool, int]]], primitives: dict[str, Int8Primitive | Int16Primitive | Int32Primitive | Int64Primitive | Uint8Primitive | Uint16Primitive | Uint32Primitive | Uint64Primitive | HalfPrimitive | FloatPrimitive | DoublePrimitive | BooleanPrimitive], callbacks: dict[str, int], _loggers: list[Logger])

Bases: object

Stateful information relevant to individual tabs.

add_logger(logger: Logger) -> None

Add a logger.

callbacks: dict[str, int]

clear_loggers() -> None

Clear all logging handlers.

static create() -> TabState

Create a new instance.

frame(time: float) -> dict[str, Any]

Handle a new UI frame.

points: dict[str, list[tuple[str | int | float | bool, int]]]
primitives: dict[str, Int8Primitive | Int16Primitive | Int32Primitive | Int64Primitive | Uint8Primitive | Uint16Primitive | Uint32Primitive | Uint64Primitive | HalfPrimitive | FloatPrimitive | DoublePrimitive | BooleanPrimitive]
shown: bool
shown_ever: bool
tab_logger: ListLogger
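
The shape of TabState (plain fields, a static create() factory, and a frame() method returning a dictionary) can be illustrated with a minimal sketch. Everything below is a hypothetical stand-in and not the runtimepy implementation: the class name TabStateSketch, the record() helper, the default values chosen by create(), the reading of the integer in each point as a timestamp, and the drain-on-frame behavior are all assumptions made for illustration.

```python
from dataclasses import dataclass, field
from typing import Any, Union

# Value types a point may carry, mirroring the documented annotation.
Value = Union[str, int, float, bool]


@dataclass
class TabStateSketch:
    """Hypothetical stand-in mirroring a few of the documented fields."""

    shown: bool
    shown_ever: bool
    points: dict[str, list[tuple[Value, int]]] = field(default_factory=dict)

    @staticmethod
    def create() -> "TabStateSketch":
        """Build an instance with hidden, empty defaults (assumed)."""
        return TabStateSketch(shown=False, shown_ever=False)

    def record(self, name: str, value: Value, timestamp: int) -> None:
        """Hypothetical helper: buffer a (value, integer) pair by name."""
        self.points.setdefault(name, []).append((value, timestamp))

    def frame(self, time: float) -> dict[str, Any]:
        """Build one outgoing message, draining points if the tab is shown."""
        result: dict[str, Any] = {"time": time}
        if self.shown and self.points:
            result["points"] = self.points
            self.points = {}  # start a fresh buffer for the next frame
        return result


state = TabStateSketch.create()
state.record("temperature", 21.5, 1000)
state.shown = True
message = state.frame(0.016)
```

In this sketch a hidden tab keeps buffering points, and the first frame after the tab becomes visible carries everything accumulated so far. Whether the real class behaves this way is not stated in this reference.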

Module contents

A module implementing a simple WebSocket server for the package.

class runtimepy.net.server.websocket.RuntimepyDataWebsocketConnection(protocol: WebSocketClientProtocol | WebSocketServerProtocol)

Bases: WebsocketConnection

A class implementing a WebSocket connection for streaming raw data.

class runtimepy.net.server.websocket.RuntimepyWebsocketConnection(protocol: WebSocketClientProtocol | WebSocketServerProtocol)

Bases: WebsocketJsonMessageConnection

A class implementing a package-specific WebSocket connection.

command: ChannelCommandProcessor

disable_extra() -> None

Additional tasks to perform when disabling.

env: ChannelEnvironment
first_client: bool
first_message: bool
id_responses: dict[int, JsonMessage]
ids_waiting: dict[int, asyncio.Event]

init() -> None

Initialize this instance.

list_handler: ListLogger
logger: LoggerType
outgoing_commands: asyncio.Queue[ChannelCommandParams]
processor: MessageProcessor
remote_environments: dict[str, ChannelEnvironment]
remote_meta: JsonMessage | None
send_interfaces: dict[str, Callable[[dict[str, Any]], None]]

tab_sender(name: str) -> Callable[[dict[str, Any]], None]

Get a tab message-sending interface.

tabs: dict[str, TabState]
tx_binary_hwm: int
tx_text_hwm: int
ui_time: float
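
The signature of tab_sender(), which takes a tab name and returns a callable accepting a dictionary, together with the send_interfaces mapping, suggests a per-tab sender pattern. The following is a minimal sketch of that pattern only. ConnectionSketch, its outbox list (standing in for the actual socket), the caching of senders, and the {"tab": ..., "data": ...} envelope are all assumptions for illustration; none of them is taken from runtimepy.

```python
from typing import Any, Callable

TabMessage = dict[str, Any]
TabSender = Callable[[TabMessage], None]


class ConnectionSketch:
    """Hypothetical stand-in for a connection shared by several tabs."""

    def __init__(self) -> None:
        # Stands in for the WebSocket; sent messages are collected here.
        self.outbox: list[dict[str, Any]] = []
        self.send_interfaces: dict[str, TabSender] = {}

    def tab_sender(self, name: str) -> TabSender:
        """Return a callable that sends messages on behalf of one tab."""
        if name not in self.send_interfaces:

            def sender(data: TabMessage) -> None:
                # Tag the payload with its tab name (assumed envelope).
                self.outbox.append({"tab": name, "data": data})

            self.send_interfaces[name] = sender
        return self.send_interfaces[name]


conn = ConnectionSketch()
send_main = conn.tab_sender("main")
send_main({"status": "ready"})
```

Handing each tab a closure keeps tab code unaware of the connection object: a tab only needs something it can call with a dictionary.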