Source code for vcorelib.math.analysis.weighted

"""
A module for implementing a weighted average.
"""

# internal
from vcorelib.math.analysis.average import MovingSum as _MovingSum
from vcorelib.math.analysis.buffer import DEFAULT_DEPTH as _DEFAULT_DEPTH
from vcorelib.math.analysis.buffer import FloatBuffer as _FloatBuffer


[docs] class WeightedAverage: """A class implementing a weighted average.""" def __init__(self, depth: int = _DEFAULT_DEPTH) -> None: """Initialize this weighted average.""" self.signals = _FloatBuffer(depth=depth) self.weights = _MovingSum(depth=depth) @property def depth(self) -> int: """This average's depth.""" return self.signals.depth
[docs] def reset(self) -> None: """Reset the average.""" self.signals.reset() self.weights.reset()
[docs] def average(self) -> float: """Compute the overall weighted average.""" total: float = 0.0 if self.weights.sum: for signal, weight in zip(self.signals.data, self.weights.data): # If an element has no weight, it shouldn't be considered. if weight > 0.0: total += signal * (weight / self.weights.sum) return total
def __call__(self, value: float, weight: float = 1.0) -> None: """Update tracking, doesn't compute weighted average.""" self.signals(value) self.weights(weight)