Source code for vcorelib.graph.port
"""
A module for working with graph ports.
"""
from enum import Enum as _Enum
from enum import auto as _auto
# built-in
from itertools import zip_longest as _zip_longest
from typing import Dict as _Dict
from typing import NamedTuple
from typing import Optional as _Optional
from typing import Set as _Set
from typing import TextIO as _TextIO
[docs]
class PortType(_Enum):
"""An enumeration of port types."""
INPUT = _auto()
OUTPUT = _auto()
INOUT = _auto()
def __str__(self) -> str:
"""Get this port type as a string."""
return self.name.lower()
@property
def is_input(self) -> bool:
"""Determine if this port type is capable of input."""
return self is not PortType.OUTPUT
@property
def is_output(self) -> bool:
"""Determine if this port type is capable of output."""
return self is not PortType.INPUT
[docs]
class Port(NamedTuple):
"""A simple implementation for generic, input and output ports."""
label: str
kind: PortType = PortType.INOUT
alias: _Optional[str] = None
allocated: bool = False
[docs]
def allocate(self) -> "Port":
"""Allocate this port."""
assert not self.allocated, f"Port '{self}' is already allocated!"
return Port(
self.label, kind=self.kind, alias=self.alias, allocated=True
)
def __str__(self) -> str:
"""Get this port as a string."""
result = self.label
alloc_str = "x" if self.allocated else " "
result = f"{self.label}[{self.kind}][{alloc_str}]"
if self.alias:
result += f" ({self.alias})"
return result
@property
def is_input(self) -> bool:
"""Determine if this is an input port."""
return self.kind.is_input
@property
def is_output(self) -> bool:
"""Determine if this is an output port."""
return self.kind.is_output
[docs]
class PortManager:
"""An interface for managing input and output ports."""
def __init__(self) -> None:
"""Initialize this port manager."""
self.outputs: _Dict[str, Port] = {}
self.inputs: _Dict[str, Port] = {}
[docs]
def inout_labels(self) -> _Set[str]:
"""Get inout port labels."""
return set(self.inputs.keys()).intersection(self.outputs.keys())
[docs]
def output_labels(self, exclude_inout: bool = True) -> _Set[str]:
"""Get output port labels."""
result = set(self.outputs.keys())
if exclude_inout:
result -= self.inout_labels()
return result
[docs]
def label(self, stream: _TextIO) -> None:
"""Create a record label based on the current port configuration."""
columns = []
# Handle inout ports.
inouts = "|".join(
[f"<{x}> {self.outputs[x]}" for x in sorted(self.inout_labels())]
)
columns.append(inouts if inouts else "no inout ports")
# Put inputs above outputs and arrange in a table.
for in_label, out_label in _zip_longest(
sorted(self.input_labels()), sorted(self.output_labels())
):
in_data = (
f"<{in_label}> {self.inputs[in_label]}" if in_label else ""
)
out_data = (
f"<{out_label}> {self.outputs[out_label]}" if out_label else ""
)
columns.append("{" + in_data + "|" + out_data + "}")
# Write columns to the stream.
stream.write("|".join(columns))
[docs]
def create(self, label: str, **kwargs) -> Port:
"""Add a new port to this port manager."""
port = Port(label, **kwargs)
# Ensure the port's alias doesn't alias an existing input or output
# port.
if port.alias:
assert (
port.alias not in self.inputs
), f"Port alias '{port.alias}' is already an input label!"
assert (
port.alias not in self.outputs
), f"Port alias '{port.alias}' is already an output label!"
if port.is_input:
assert label not in self.inputs, f"Duplicate input port '{port}'!"
self.inputs[label] = port
if port.is_output:
assert (
label not in self.outputs
), f"Duplicate output port '{port}'!"
self.outputs[label] = port
return port
[docs]
def allocate(self, label: str) -> Port:
"""Allocate a labeled port."""
port = None
if label in self.inputs:
port = self.inputs[label].allocate()
self.inputs[label] = port
if label in self.outputs:
# Don't create a new port if it's an inout.
if port is None:
port = self.outputs[label].allocate()
self.outputs[label] = port
assert port is not None, f"Port '{label}' not found!"
return port