runtimepy.net.udp package
Submodules
runtimepy.net.udp.connection module
A module implementing a UDP connection interface.
- class runtimepy.net.udp.connection.EchoUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol)
Bases: UdpConnection, EchoConnection
An echo connection for UDP.
- class runtimepy.net.udp.connection.NullUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol)
Bases: UdpConnection, NullConnection
A null UDP connection.
- class runtimepy.net.udp.connection.UdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol)
Bases: Connection, TransportMixin
A UDP connection interface.
- async classmethod create_connection(connect: bool = True, **kwargs) -> T
Create a UDP connection.
- env: ChannelEnvironment
- latest_rx_address: tuple[str, int] | None
- logger: LoggerType
- abstract async process_datagram(data: bytes, addr: tuple[str, int]) -> bool
Process a datagram.
- remote_address: _Optional[_IpHost]
- async restart() -> bool
Reset necessary underlying state for this connection to ‘process’ again.
- sendto(data: bytes, addr: IPv4Host | IPv6Host | tuple[str, int] = None) -> None
Send to a specific address.
- set_remote_address(addr: IPv4Host | IPv6Host) -> None
Set a new remote address. Note that this doesn’t interact with or attempt to ‘connect’ the underlying socket. That should be done at creation time.
- tx_binary_hwm: int
- tx_text_hwm: int
- uses_binary_tx_queue = False
- uses_text_tx_queue = False
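The abstract `process_datagram` hook and the echo subclass listed above can be pictured with a small standard-library sketch. This is not runtimepy's implementation: the names `DatagramHandler`, `EchoHandler`, and `RecordingTransport` are hypothetical, and treating a `True` return value as "keep processing" is an assumption, since this reference does not state what the boolean means.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any

Address = tuple[str, int]


class RecordingTransport:
    """Hypothetical stand-in for a datagram transport; it only records sends."""

    def __init__(self) -> None:
        self.sent: list[tuple[bytes, Address]] = []

    def sendto(self, data: bytes, addr: Address) -> None:
        self.sent.append((data, addr))


class DatagramHandler(ABC):
    """Hypothetical base class exposing a per-datagram hook."""

    def __init__(self, transport: Any) -> None:
        self.transport = transport

    @abstractmethod
    async def process_datagram(self, data: bytes, addr: Address) -> bool:
        """Handle one datagram; True is assumed to mean 'keep processing'."""


class EchoHandler(DatagramHandler):
    """Send every datagram straight back to the address it came from."""

    async def process_datagram(self, data: bytes, addr: Address) -> bool:
        self.transport.sendto(data, addr)
        return True


# Usage: feed one datagram through the handler and inspect what was sent.
transport = RecordingTransport()
asyncio.run(EchoHandler(transport).process_datagram(b"hi", ("127.0.0.1", 5000)))
print(transport.sent)
```

Swapping `RecordingTransport` for a real `asyncio.DatagramTransport` would leave `EchoHandler` unchanged, because it only relies on `sendto(data, addr)`.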
runtimepy.net.udp.create module
A module for instantiating the underlying networking resources for UdpConnection.
- async runtimepy.net.udp.create.try_udp_transport_protocol(callback: Callable[[tuple[DatagramTransport, UdpQueueProtocol]], None] = None, **kwargs) -> tuple[DatagramTransport, UdpQueueProtocol] | None
Attempt to create a transport and protocol pair.
- async runtimepy.net.udp.create.udp_transport_protocol(**kwargs) -> tuple[DatagramTransport, UdpQueueProtocol]
Create a transport and protocol pair for UdpConnection.
- async runtimepy.net.udp.create.udp_transport_protocol_backoff(backoff: ExponentialBackoff = None, **kwargs) -> tuple[DatagramTransport, UdpQueueProtocol]
Create a transport and protocol pair for UdpConnection, retrying according to the given backoff.
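The general idea of retrying endpoint creation with an exponential backoff can be sketched with the standard library alone. Everything below is illustrative: `create_with_backoff` and `open_udp` are hypothetical names, the retry parameters are assumptions, and the interface of runtimepy's `ExponentialBackoff` is not shown in this reference, so it is not used here.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def create_with_backoff(
    factory: Callable[[], Awaitable[T]],
    attempts: int = 5,
    initial_delay: float = 0.1,
    factor: float = 2.0,
) -> Optional[T]:
    """Await 'factory' until it succeeds, sleeping longer after each failure.

    Returns None when every attempt raised OSError (an assumed policy).
    """
    delay = initial_delay
    for attempt in range(attempts):
        try:
            return await factory()
        except OSError:
            # For example: address already in use, or interface not ready.
            if attempt + 1 < attempts:
                await asyncio.sleep(delay)
                delay *= factor
    return None


async def open_udp(
    local_port: int = 0,
) -> tuple[asyncio.DatagramTransport, asyncio.DatagramProtocol]:
    """Bind a UDP endpoint on loopback; port 0 lets the OS pick a free port."""
    loop = asyncio.get_running_loop()
    return await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, local_addr=("127.0.0.1", local_port)
    )
```

Inside a coroutine, `pair = await create_with_backoff(open_udp)` would yield either a `(transport, protocol)` tuple or `None`, which loosely mirrors the difference between the "try" variant and the plain variants documented above.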
runtimepy.net.udp.protocol module
A module implementing a DatagramProtocol for UdpConnection.
- class runtimepy.net.udp.protocol.UdpQueueProtocol
Bases: DatagramProtocol
A simple UDP protocol that populates a message queue.
- conn: Connection
- logger: Logger | LoggerAdapter[Any]
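A protocol that "populates a message queue" can be illustrated with a minimal `asyncio.DatagramProtocol` subclass. This is a sketch under assumptions, not the library's code: `QueueingDatagramProtocol` and `next_datagram` are hypothetical names, and the unbounded queue is an arbitrary choice.

```python
import asyncio

Datagram = tuple[bytes, tuple[str, int]]


class QueueingDatagramProtocol(asyncio.DatagramProtocol):
    """Hypothetical protocol that hands received datagrams to a queue."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue[Datagram] = asyncio.Queue()

    def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
        # Called by the event loop for each datagram; put_nowait never
        # blocks, and cannot overflow because the queue is unbounded.
        self.queue.put_nowait((data, addr))


async def next_datagram(protocol: QueueingDatagramProtocol) -> Datagram:
    """Wait for the next (payload, sender address) pair."""
    return await protocol.queue.get()
```

Passing `QueueingDatagramProtocol` as the protocol factory to `loop.create_datagram_endpoint` would let a consumer coroutine await `next_datagram` instead of handling callbacks directly.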
runtimepy.net.udp.queue module
A module implementing a simple queue-based UDP interface.
- class runtimepy.net.udp.queue.QueueUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol)
Bases: UdpConnection
A simple queue-based UDP connection.
- datagrams: Queue[tuple[bytes, tuple[str, int]]]
Module contents
A module implementing networking utilities relevant to UDP.
- class runtimepy.net.udp.EchoUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol)
Bases: UdpConnection, EchoConnection
An echo connection for UDP.
- class runtimepy.net.udp.NullUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol)
Bases: UdpConnection, NullConnection
A null UDP connection.
- class runtimepy.net.udp.QueueUdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol)
Bases: UdpConnection
A simple queue-based UDP connection.
- datagrams: Queue[tuple[bytes, tuple[str, int]]]
- env: ChannelEnvironment
- latest_rx_address: _Optional[tuple[str, int]]
- logger: LoggerType
- remote_address: _Optional[_IpHost]
- tx_binary_hwm: int
- tx_text_hwm: int
- class runtimepy.net.udp.UdpConnection(transport: DatagramTransport, protocol: UdpQueueProtocol)
Bases: Connection, TransportMixin
A UDP connection interface.
- async classmethod create_connection(connect: bool = True, **kwargs) -> T
Create a UDP connection.
- env: ChannelEnvironment
- latest_rx_address: tuple[str, int] | None
- logger: LoggerType
- abstract async process_datagram(data: bytes, addr: tuple[str, int]) -> bool
Process a datagram.
- remote_address: _Optional[_IpHost]
- async restart() -> bool
Reset necessary underlying state for this connection to ‘process’ again.
- sendto(data: bytes, addr: IPv4Host | IPv6Host | tuple[str, int] = None) -> None
Send to a specific address.
- set_remote_address(addr: IPv4Host | IPv6Host) -> None
Set a new remote address. Note that this doesn’t interact with or attempt to ‘connect’ the underlying socket. That should be done at creation time.
- tx_binary_hwm: int
- tx_text_hwm: int
- uses_binary_tx_queue = False
- uses_text_tx_queue = False