Coverage for src/zooc/dsp/filters.py: 93% — 261 statements.
coverage.py v7.13.0, created at 2025-12-11 21:45 +0000.
1"""Digital filters for single dimensional data processing in time domain."""
2from __future__ import annotations
4import logging
5import statistics
6import sys
7import warnings
8from abc import ABC, abstractmethod
9from collections.abc import Sequence
10from dataclasses import dataclass, field
11from functools import cached_property
12from typing import Any, Final, override
14import numpy as np
15import numpy.typing as npt
16import scipy as sp
17from scipy.signal import medfilt
# Module-level logger, standard `logging.getLogger(__name__)` pattern.
logger = logging.getLogger(__name__)

# PEP 695 `type` alias statements (Python 3.12+).
type FloatArray = npt.NDArray[np.float64]
"""Type alias for a NumPy float array."""
type FloatAny = npt.NDArray[np.float64] | Sequence[float] | float
"""Type alias for float array or single float value."""
type Float = np.float64
"""Type alias for a single NumPy float."""
28@dataclass(kw_only=True)
29class Filter(ABC):
30 """Mathematical filter to output a value based in single dimensional input data.
32 :param data_values: Input data values.
33 """
35 data_values: FloatArray
37 def __post_init__(self) -> None:
38 """Ensure data can be indexed."""
39 if self.data_values.size == 0:
40 self.data_values = np.asarray([float('nan')], dtype=float)
42 @abstractmethod
43 def is_valid(self) -> bool:
44 """Check whether the data is valid for the filter.
46 :return: True if valid.
47 """
48 return True
50 @abstractmethod
51 def get_output(self) -> float:
52 """Get the output value of the filter.
54 :return: Filtered output value.
55 """
57 def get_full_range(self) -> float:
58 """Get the full range of the input data.
60 :return: Input range.
61 """
62 return float(float(np.max(self.data_values)) - float(np.min(self.data_values)))
64 @abstractmethod
65 def describe(self) -> dict[str, object]:
66 """Describe the filter output.
68 :return: Dictionary of properties and their values.
69 """
70 return {'valid': self.is_valid(),
71 'output': self.get_output()}
@dataclass(kw_only=True)
class FilterMonotonic(Filter, ABC):
    """Abstract filter for monotonically increasing or decreasing data with noise tolerance.

    :param data_values: Input data values.
    :param noise_rel_range_th: Allowed relative noise threshold over the value range when verifying data, e.g. 0.1 -> 10%.
    :param noise_abs_range_th: Allowed absolute noise threshold over the value range when verifying data.
    """

    noise_rel_range_th: np.float64 | float
    noise_abs_range_th: np.float64 | float

    def is_monotonic(self) -> bool:
        """Is input data monotonic within the noise tolerance.

        :return: True if monotonic.
        """
        tolerance = self.get_noise_th()
        if self.is_monotonic_inc(noise=tolerance):
            return True
        return self.is_monotonic_dec(noise=tolerance)

    def is_monotonic_inc(self, noise: float) -> bool:
        """Check whether the input data is increasing monotonically within the noise tolerance.

        :param noise: Absolute noise tolerance.
        :return: True if monotonically rising.
        """
        values = self.data_values
        if len(values) == 0:
            return True
        return bool(np.all(values[1:] + noise >= values[:-1], axis=0))

    def is_monotonic_dec(self, noise: float) -> bool:
        """Check whether the input data is decreasing monotonically within noise tolerance.

        :param noise: Absolute noise tolerance.
        :return: True if monotonically decreasing.
        """
        values = self.data_values
        if len(values) == 0:
            return True
        return bool(np.all(values[1:] - noise <= values[:-1], axis=0))

    def get_noise_th(self) -> float:
        """Get allowed noise threshold, larger of relative and absolute noise thresholds.

        :return: Noise threshold as absolute value.
        """
        relative_th = self.get_full_range() * self.noise_rel_range_th
        return max(self.noise_abs_range_th, relative_th)

    @override
    def is_valid(self) -> bool:
        """Valid when the base checks pass and the data is monotonic."""
        return super().is_valid() and self.is_monotonic()

    @override
    def describe(self) -> dict[str, object]:
        """Describe the filter output, including monotonicity."""
        return {**super().describe(), 'monotonic': self.is_monotonic()}
@dataclass(kw_only=True)
class FilterMedian(Filter):
    """Median output filter for 1D data with min/max input noise tolerance.

    :param data_values: Input data values.
    :param max_abs_input_range: Allowed absolute input range.
    """

    MIN_SAMPLES_STDDEV: Final[int] = 2  # pylint: disable=invalid-name
    MIN_SAMPLES: Final[int] = field(init=False, default=3, repr=False)  # pylint: disable=invalid-name
    window_size: Final[int] = field(init=False, default=3, repr=False)  # pylint: disable=invalid-name

    max_abs_input_range: float

    @cached_property
    def filtered_series(self) -> FloatArray:
        """Median-filtered copy of the input data.

        :return: Filtered output.
        """
        return medfilt(self.data_values, kernel_size=self.window_size)

    @override
    def get_output(self) -> float:
        """Return the most recent median-filtered sample."""
        series = self.filtered_series
        return float(series[-1])

    @cached_property
    def stddev(self) -> float:
        """Standard deviation of the input data.

        :return: Standard deviation, or NaN with fewer than two samples.
        """
        if len(self.data_values) < FilterMedian.MIN_SAMPLES_STDDEV:
            return float('nan')
        return float(statistics.stdev(self.data_values))

    @override
    def is_valid(self) -> bool:
        """Valid when there are enough samples and the input range is within limits."""
        enough_samples = self.data_values.size >= FilterMedian.MIN_SAMPLES
        range_ok = self.max_abs_input_range >= self.get_full_range()
        return enough_samples and range_ok and super().is_valid()

    @override
    def describe(self) -> dict[str, object]:
        """Describe the filter output, range margin and standard deviation."""
        return {**super().describe(),
                'margin': self.max_abs_input_range - self.get_full_range(),
                'SD': self.stddev}
@dataclass(kw_only=True)
class FilterMath(Filter):
    """Abstract filter for forecasting future values.

    :param data_t: Time data.
    """

    data_t: FloatArray

    @abstractmethod
    def solve(self, t: FloatArray) -> FloatArray:
        """Calculate the value of the function at the given time point.

        :param t: Time point [s].
        :return: Value at t.
        """

    def solve_float(self, t: FloatAny) -> float:
        """Evaluate the model at a single time point, returning a plain float.

        :param t: Time point [s].
        :return: Value at t.
        """
        result = self.solve(np.array([t]))
        return result.item()

    def solve_t0(self, t: FloatAny) -> FloatArray:
        """Evaluate the model at an offset from the first time point.

        :param t: Time after the first data point [s].
        :return: Value at t_0 + t.
        """
        t_absolute = self.data_t[0] + t
        return self.solve(t_absolute)

    def forecast(self, t_future: FloatAny) -> FloatArray:
        """Evaluate the model at an offset after the last time point.

        :param t_future: Time after the latest data point [s].
        :return: Value at t_future.
        """
        t_absolute = self.data_t[-1] + t_future
        return self.solve(t=t_absolute)
@dataclass(kw_only=True)
class FilterMedianLinear(FilterMath, FilterMedian):
    """Median output filter for data with time dimension and linear regression.

    :param data_values: Input data values.
    :param data_t: Time data.
    :param max_abs_input_range: Allowed absolute noise threshold over the input range.
    """

    @staticmethod
    def create(data: dict[float, float], max_abs_input_range: float) -> FilterMedianLinear:
        """Create a FilterMedianLinear from dictionary data.

        :param data: <time>: <value> dictionary.
        :param max_abs_input_range: See FilterMedian.
        :return: FilterMedianLinear.
        """
        return FilterMedianLinear(data_values=np.array(list(data.values())),
                                  data_t=np.array(list(data.keys())),
                                  max_abs_input_range=max_abs_input_range)

    @cached_property
    def slope(self) -> float:
        """Get the linear regression slope of the data.

        :return: Slope [x/y]; NaN with fewer than two samples.
        """
        if len(self.data_values) <= 1:
            return float('nan')
        return float(self._linregress.slope)

    @cached_property
    def stderr_relative(self) -> float:
        """Get the linear regression slope's relative standard error.

        Note, not reliable for (nearly) constant (flat) dataset.

        :return: Relative standard error of the slope [n]; NaN when undefined.
        """
        abs_slope = abs(self.slope)

        if len(self.data_values) <= 1 or abs_slope == 0.0:
            return float('nan')

        return float(self._linregress.stderr / abs_slope)

    @cached_property
    def intercept(self) -> float:
        """Get the linear regression y-intercept of the data.

        :return: y-intercept at t=0; NaN with fewer than two samples.
        """
        if len(self.data_values) <= 1:
            return float('nan')
        return float(self._linregress.intercept)

    @cached_property
    def correlation(self) -> float:
        """Get the linear regression correlation of the data.

        Note, correlation of constant (flat) data is 0.

        :return: Correlation as r-value; NaN with fewer than two samples.
        """
        if len(self.data_values) <= 1:
            return float('nan')
        return float(self._linregress.rvalue)

    @override
    def describe(self) -> dict[str, object]:
        """Describe the filter output including slope and relative standard error."""
        return (super().describe()
                | {'slope': self.slope,
                   'rse': self.stderr_relative})

    @override
    def solve(self, t: FloatArray) -> FloatArray:
        """Evaluate the regression line at time t.

        :param t: Time point(s) [s].
        :return: Value(s) at t, always at least one-dimensional.
        """
        return np.atleast_1d(self.slope * t + self.intercept)

    @cached_property
    def _linregress(self) -> Any:
        """Linear regression statistics of the data (scipy).

        :return: Result as scipy's linregress object.
        """
        with warnings.catch_warnings(record=True) as caught:
            stats = sp.stats.linregress(self.data_t, self.filtered_series)
        # Log after leaving the context, so logging cannot append to the warning
        # list while it is being iterated.
        for warning in caught:
            level = logging.WARNING
            if warning.category is RuntimeWarning or warning.category is UserWarning:
                # RuntimeWarning: invalid value encountered in scalar divide data=...
                # UserWarning: kernel_size exceeds volume extent: the volume will be zero-padded
                level = logging.DEBUG
            # Lazy %-style args: the message is only rendered when the level is enabled.
            logger.log(level, "%s %s data=%s", warning.category, warning.message, self.data_values)
        return stats
@dataclass(kw_only=True)
class FilterStable(FilterMedianLinear):
    """Stable filter for 2D data.

    Data is determined as stable (valid) when the slope is small and correlation high.

    :param data_values: Input data values.
    :param max_abs_input_range: Allowed absolute noise threshold over the input range.
    :param data_t: Time data.
    :param max_slope: Maximum allowed slope in given unit / time [x/y].
    :param max_rse: Maximum relative standard error of the slope when slope is high [n].
    :param max_stddev: Maximum standard deviation when slope is low [y].
    """

    max_slope: float
    max_rse: float
    max_stddev: float

    @override
    def is_valid(self) -> bool:
        """Stable when the slope is flat enough and either the slope error or the spread is small."""
        # Negated <= (rather than >) so a NaN slope also counts as unstable.
        if not abs(self.slope) <= self.max_slope:
            return False
        error_ok = abs(self.stderr_relative) <= self.max_rse
        spread_ok = self.stddev <= self.max_stddev
        return (error_ok or spread_ok) and super().is_valid()
340@dataclass(kw_only=True)
341class FilterExpDecay(FilterMath):
342 """Exponentially decaying filter.
344 :param data_values: Input data values.
345 :param data_t: Time data.
346 :param max_linalg: Maximum value for the linear algebra condition number of the covariance matrix.
347 A typical valid value is in range of 1E5-20E6.
348 :raises ValueError: When data is not valid for exponential decay filter.
349 """
351 max_linalg: Final[float] = field(default=20E6, repr=False) # pylint: disable=invalid-name
353 MIN_SAMPLES: Final[int] = field(init=False, default=4, repr=False) # pylint: disable=invalid-name
355 a: np.float64 = field(init=False, repr=False)
356 """Amplitude constant."""
357 k: np.float64 = field(init=False, repr=False)
358 """Exponent constant. Absolute value. Negative value is used in :py:meth:`model_func`"""
359 c: np.float64 = field(init=False, repr=False)
360 """Constant where the function decays to, i.e. the limit value."""
361 parm_cov: FloatArray = field(init=False, repr=False)
362 """Numpy covariance matrix of the parameters."""
363 t_zero: np.float64 = field(init=False, repr=False)
364 """Zero time point."""
365 t_range: np.float64 = field(init=False, repr=False)
366 """Time range for normalized time."""
368 @staticmethod
369 def model_func(t: FloatArray, a: Float, k: Float, c: Float) -> FloatArray:
370 """Mathematical function for exponential decay used by this filter.
372 :param t: Time.
373 :param a: Amplitude constant.
374 :param k: Exponent constant.
375 :param c: Constant where the function decays to, i.e. the limit value.
376 :return: Value of the function at time t.
377 """
378 return a * np.exp(k * t) + c
380 @classmethod
381 def create[T: FilterExpDecay](cls: type[T], data: dict[float, float]) -> T:
382 """Create a FilterExpDecay from dictionary data.
384 :param data: <time>: <value> dictionary.
385 :return: FilterExpDecay.
386 """
387 return cls(data_t=np.array(list(data.keys())), data_values=np.array(list(data.values())))
389 def __post_init__(self) -> None:
390 """Initialize the curve fitting model.
392 :raises ValueError: When data is not valid for exponential decay filter.
393 """
394 # Numpy does not solve data with large x-values. Normalize time data.
395 self.t_zero = self.data_t[0]
396 self.t_range = self.data_t[-1] - self.t_zero
398 try:
399 if self.t_range > 0.0 and self.data_values.size >= FilterExpDecay.MIN_SAMPLES and (float(np.max(self.data_values)) - float(np.min(self.data_values))) > 0.0:
400 # Normalized time data to 0..1
401 data_t_normalized = (self.data_t - self.t_zero) / self.t_range
403 # maxfev=10000
404 (self.a, self.k, self.c), self.parm_cov = sp.optimize.curve_fit(FilterExpDecay.model_func, data_t_normalized, self.data_values, p0=self._p0, bounds=self.bounds)
405 # check the parameters did not converge at bounds. Avoids false positive
406 eps = 1e-6 # bounds should have margin
407 if not (self.bounds[0][0] + eps < self.a < self.bounds[1][0] - eps
408 and self.bounds[0][1] + eps < self.k < self.bounds[1][1] - eps
409 and self.bounds[0][2] + eps < self.c < self.bounds[1][2] - eps):
410 raise ValueError("Parameters converged at bounds")
411 else:
412 raise ValueError("Data is not valid for exponential decay filter")
413 except (RuntimeError, TypeError, ValueError) as _err:
414 # RuntimeError: When parameters are not solved
415 # TypeError: When too few data points (2)
416 # ValueError: When dataset is empty
417 # logger.exception(_err)
418 self.parm_cov = np.array([[sys.float_info.max], [sys.float_info.max]])
419 self.a = np.float64(0.0)
420 self.k = np.float64(1.0)
421 self.c = np.float64(0.0)
423 @cached_property
424 def bounds(self) -> tuple[list[float], list[float]]:
425 """Get bounds for the model.
427 :return: Tuple of (lower_bounds, upper_bounds) for [a, k, c].
428 """
429 # Set bound for decay, i.e. k is negative.
430 return ([-np.inf, -np.inf, -np.inf],
431 [np.inf, -0.0, np.inf])
433 @cached_property
434 def _p0(self) -> list[float]:
435 """Get initial guess for the curve fitting parameters.
437 :return: List of initial guess parameters [a, k, c].
438 """
439 c0 = self.data_values[-1]
440 a0 = self.data_values[0] - c0
441 return [a0, -1.0, c0]
443 @override
444 def solve(self, t: FloatArray) -> FloatArray:
445 t_norm = self._t_normalize(np.float64(t))
446 return self.model_func(np.asarray(t_norm), self.a, self.k, self.c)
448 @override
449 def get_output(self) -> float:
450 """Get the final limit value into which the function approaches.
452 :return: Final value, i.e., :attr:`.c`.
453 """
454 return self.c.item()
456 def get_time_constant(self) -> float:
457 """Get the time constant of the exponential decay function.
459 :return: Time constant [s].
460 """
461 return float(1.0 / -self.k * self.t_range)
463 def is_pos_or_neg(self) -> bool:
464 """Check whether the function is rising or decreasing.
466 :return: True if rising.
467 """
468 return bool(self.a >= 0.0)
470 def get_amplitude(self) -> float:
471 """Get the amplitude constant, i.e., the difference of the first sample and the final value.
473 :return: Amplitude.
474 """
475 return float(self.a)
477 def get_value_tc(self, value: float) -> float:
478 """Get time as a multiple of the time constant for given value.
480 Example: A value at 63.5% of the final value would result 1.0 (1 time constant).
482 :param value: Value to calculate the time constant for.
483 :return: Multiple of the time constant for given value.
484 """
485 return self.get_time_constant() * float(np.log(self.a / value))
487 def has_enough_data(self) -> bool:
488 """Was enough data points provided to solve the exponential decay function.
490 :return: True is enough data points.
491 """
492 return self.data_values.size >= FilterExpDecay.MIN_SAMPLES
494 def find_worst_outlier(self) -> tuple[float, float]:
495 """Get the worst outlier from the data.
497 :return: Tuple of (time, value) of the worst outlier.
498 """
499 # Calculate residuals
500 residuals = self.data_values - self.solve(self.data_t)
501 # Find the index of the largest absolute residual
502 worst_outlier_idx = np.argmax(np.abs(residuals))
503 return self.data_t[worst_outlier_idx].item(), self.data_values[worst_outlier_idx].item()
505 @override
506 def is_valid(self, max_linalg: float | None = None) -> bool:
507 """Validate the data resembles exponentially decaying function.
509 - At least 4 data points.
510 - Numpy evaluates the data as valid.
512 :param max_linalg: Maximum value for the linear algebra condition number of the covariance matrix.
513 :return: True: model is reliable.
514 """
515 return (self.has_enough_data()
516 and super().is_valid()
517 and self._is_exp_valid(max_linalg)
518 and self.get_time_constant() > 0)
520 @override
521 def describe(self) -> dict[str, object]:
522 return (super().describe()
523 | {
524 'tau': self.get_time_constant(),
525 'lin': float(np.linalg.cond(self.parm_cov)),
526 # 'cov': f"{self.parm_cov}",
527 })
529 def _t_normalize(self, t: np.float64) -> np.float64:
530 """Normalize time.
532 :param t: Time to normalize [s].
533 :return: Normalized time.
534 """
535 return (t - self.t_zero) / self.t_range
537 def _is_exp_valid(self, max_linalg: float | None) -> bool:
538 """Evaluate how well Numpy solves the exponential decay model.
540 :param max_linalg: Maximum value for the linear algebra condition number of the covariance matrix. None to use default value
541 :return: True if the exponential decay model is valid.
542 """
543 # Check for invalid values
544 if np.isinf(self.parm_cov).any() or np.isnan(self.parm_cov).any(): 544 ↛ 545line 544 didn't jump to line 545 because the condition on line 544 was never true
545 return False
547 # lin = np.linalg.cond(self.parm_cov)
548 # A large value is cause for concern. The diagonal elements of the covariance matrix, which is related to uncertainty of the fit, give more information:
549 # For further checks, evaluate :py:meth:`np.diag(self.parm_cov)`
551 # Clamp small values to avoid misleadingly high ratios. Ideal data results in range of 1e-21..1e-25
552 singular_values_clipped = np.maximum(np.linalg.svd(self.parm_cov, compute_uv=False), 1e-20)
553 lin = singular_values_clipped.max() / singular_values_clipped.min()
555 if not max_linalg: 555 ↛ 557line 555 didn't jump to line 557 because the condition on line 555 was always true
556 max_linalg = self.max_linalg
557 if lin > max_linalg:
558 return False
559 return bool(self.a != 0.0)
# Not decorated with @dataclass itself: it inherits FilterMonotonic's generated
# kw-only __init__ (data_values, noise_rel_range_th, noise_abs_range_th).
class FilterMonotonicImpl(FilterMonotonic):
    """Dummy implementation of FilterMonotonic.

    Concrete subclass so the monotonicity checks can be instantiated on their
    own; used composed into :py:class:`FilterExpDecayZ`, which only calls its
    is_valid() and describe().
    """

    @override
    def get_output(self) -> float:
        """Return a fixed placeholder value; the output carries no meaning."""
        return 0.0
@dataclass(kw_only=True)
class FilterExpDecayZ(FilterExpDecay):
    """Exponentially decaying filter combined with min/max time constant and monotonic data validation.

    :param data_values: See parent class: :py:class:`FilterExpDecay`.
    :param data_t: See parent class: :py:class:`FilterExpDecay`.
    :param a: See parent class: :py:class:`FilterExpDecay`.
    :param k: See parent class: :py:class:`FilterExpDecay`.
    :param c: See parent class: :py:class:`FilterExpDecay`.
    :param parm_cov: See parent class: :py:class:`FilterExpDecay`.
    :param t_zero: See parent class: :py:class:`FilterExpDecay`.
    :param t_range: See parent class: :py:class:`FilterExpDecay`.
    :param max_linalg: See parent class: :py:class:`FilterExpDecay`.
    :param tau_min_th: Minimum time constant [s].
    :param tau_max_th: Maximum time constant [s].
    :param noise_rel_range_th: See :py:class:`FilterMonotonic`.
    :param noise_abs_range_th: See :py:class:`FilterMonotonic`.
    :raises ValueError: When data is not valid for exponential decay filter.
    """

    tau_min_th: float
    tau_max_th: float
    noise_rel_range_th: float
    noise_abs_range_th: float

    _filter_monotonic: FilterMonotonic = field(init=False)
    """In addition to the exponential decay, ensure that the data is monotonic within the noise tolerance."""

    @override
    def __post_init__(self) -> None:
        """Run the parent curve fit, then build the helper monotonicity filter."""
        super().__post_init__()
        self._filter_monotonic = FilterMonotonicImpl(data_values=self.data_values, noise_rel_range_th=self.noise_rel_range_th, noise_abs_range_th=self.noise_abs_range_th)

    @override
    @cached_property
    def bounds(self) -> tuple[list[float], list[float]]:
        """Fit bounds for [a, k, c] derived from the Z-offset and time-constant limits."""
        # Provide both initial guesses (p0) and bounds based on the Z-offset.
        # - Z-offset can vary +-0.5 mm and is positive
        # - Time constant is within self.tau_min_th...self.tau_max_th
        max_amp = 0.5  # Max allowed amplitude, i.e. thermal expansion [mm]
        c0 = self.data_values[-1]
        # NOTE(review): mutating t_zero inside a cached property is a hidden side
        # effect; the parent __post_init__ already set t_zero = data_t[0]. The two
        # agree only when data_t is sorted ascending — confirm callers guarantee that.
        self.t_zero = np.min(self.data_t)
        # k is fitted against time normalized by t_range, hence the scaling below.
        k_min = -1.0 / np.float64(self.tau_min_th / self.t_range)
        k_max = -1.0 / np.float64(self.tau_max_th / self.t_range)
        return ([-max_amp, k_min, max(0.0, c0 - max_amp)],
                [max_amp, k_max, max(0.0, c0 + max_amp)])

    @override
    @cached_property
    def _p0(self) -> list[float]:
        """Initial guess [a, k, c]: zero amplitude, mid-bound decay rate, last sample as limit."""
        k_mid = np.mean([self.bounds[0][1], self.bounds[1][1]])
        return [0.0, k_mid, self.data_values[-1]]

    @override
    def is_valid(self, max_linalg: float | None = None) -> bool:
        """Valid when the data is monotonic, the parent fit is valid, and the time constant and output are within limits.

        :param max_linalg: See parent class: :py:class:`FilterExpDecay`.
        :return: True if valid.
        """
        return (self._filter_monotonic.is_valid()
                and super().is_valid(max_linalg=max_linalg)
                and (self.tau_min_th <= self.get_time_constant() <= self.tau_max_th)
                and self.get_output() >= 0.0)

    @override
    def describe(self) -> dict[str, object]:
        """Merge descriptions of the monotonicity helper and the exponential decay fit."""
        return (self._filter_monotonic.describe()  # First the abstract filter to override unimplemented keys
                | super().describe())