Coverage for src / zooc / dsp / filters.py: 93%

261 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 21:45 +0000

1"""Digital filters for single dimensional data processing in time domain.""" 

2from __future__ import annotations 

3 

4import logging 

5import statistics 

6import sys 

7import warnings 

8from abc import ABC, abstractmethod 

9from collections.abc import Sequence 

10from dataclasses import dataclass, field 

11from functools import cached_property 

12from typing import Any, Final, override 

13 

14import numpy as np 

15import numpy.typing as npt 

16import scipy as sp 

17from scipy.signal import medfilt 

18 

19logger = logging.getLogger(__name__) 

20 

21type FloatArray = npt.NDArray[np.float64] 

22"""Type alias for a NumPy float array.""" 

23type FloatAny = npt.NDArray[np.float64] | Sequence[float] | float 

24"""Type alias for float array or single float value. """ 

25type Float = np.float64 

26"""Type alias for a single NumPy float.""" 

27 

28@dataclass(kw_only=True) 

29class Filter(ABC): 

30 """Mathematical filter to output a value based in single dimensional input data. 

31 

32 :param data_values: Input data values. 

33 """ 

34 

35 data_values: FloatArray 

36 

37 def __post_init__(self) -> None: 

38 """Ensure data can be indexed.""" 

39 if self.data_values.size == 0: 

40 self.data_values = np.asarray([float('nan')], dtype=float) 

41 

42 @abstractmethod 

43 def is_valid(self) -> bool: 

44 """Check whether the data is valid for the filter. 

45 

46 :return: True if valid. 

47 """ 

48 return True 

49 

50 @abstractmethod 

51 def get_output(self) -> float: 

52 """Get the output value of the filter. 

53 

54 :return: Filtered output value. 

55 """ 

56 

57 def get_full_range(self) -> float: 

58 """Get the full range of the input data. 

59 

60 :return: Input range. 

61 """ 

62 return float(float(np.max(self.data_values)) - float(np.min(self.data_values))) 

63 

64 @abstractmethod 

65 def describe(self) -> dict[str, object]: 

66 """Describe the filter output. 

67 

68 :return: Dictionary of properties and their values. 

69 """ 

70 return {'valid': self.is_valid(), 

71 'output': self.get_output()} 

72 

73 

74@dataclass(kw_only=True) 

75class FilterMonotonic(Filter, ABC): 

76 """Abstract filter for monotonically increasing or decreasing data with noise tolerance. 

77 

78 :param data_values: Input data values. 

79 :param noise_rel_range_th: Allowed relative noise threshold over the value range when verifying data, e.g. 0.1 -> 10%. 

80 :param noise_abs_range_th: Allowed absolute noise threshold over the value range when verifying data. 

81 """ 

82 

83 noise_rel_range_th: np.float64 | float 

84 noise_abs_range_th: np.float64 | float 

85 

86 def is_monotonic(self) -> bool: 

87 """Is input data monotonic within the noise tolerance. 

88 

89 :return: True if monotonic. 

90 """ 

91 noise = self.get_noise_th() 

92 return self.is_monotonic_inc(noise=noise) or self.is_monotonic_dec(noise=noise) 

93 

94 def is_monotonic_inc(self, noise: float) -> bool: 

95 """Check whether the input data is increasing monotonically within the noise tolerance. 

96 

97 :param noise: Absolute noise tolerance. 

98 :return: True if monotonically rising. 

99 """ 

100 return len(self.data_values) == 0 or bool(np.all(self.data_values[1:] + noise >= self.data_values[:-1], axis=0)) 

101 

102 def is_monotonic_dec(self, noise: float) -> bool: 

103 """Check whether the input data is decreasing monotonically within noise tolerance. 

104 

105 :param noise: Absolute noise tolerance. 

106 :return: True if monotonically decreasing. 

107 """ 

108 return len(self.data_values) == 0 or bool(np.all(self.data_values[1:] - noise <= self.data_values[:-1], axis=0)) 

109 

110 def get_noise_th(self) -> float: 

111 """Get allowed noise threshold, larger of relative and absolute noise thresholds. 

112 

113 :return: Noise threshold as absolute value. 

114 """ 

115 return max(self.noise_abs_range_th, self.get_full_range() * self.noise_rel_range_th) 

116 

117 @override 

118 def is_valid(self) -> bool: 

119 return super().is_valid() and self.is_monotonic() 

120 

121 @override 

122 def describe(self) -> dict[str, object]: 

123 return (super().describe() 

124 | {'monotonic': self.is_monotonic()}) 

125 

126 

127@dataclass(kw_only=True) 

128class FilterMedian(Filter): 

129 """Median output filter for 1D data with min/max input noise tolerance. 

130 

131 :param data_values: Input data values. 

132 :param max_abs_input_range: Allowed absolute input range. 

133 """ 

134 

135 MIN_SAMPLES_STDDEV: Final[int] = 2 # pylint: disable=invalid-name 

136 MIN_SAMPLES: Final[int] = field(init=False, default=3, repr=False) # pylint: disable=invalid-name 

137 window_size: Final[int] = field(init=False, default=3, repr=False) # pylint: disable=invalid-name 

138 

139 max_abs_input_range: float 

140 

141 @cached_property 

142 def filtered_series(self) -> FloatArray: 

143 """Get the median filtered output. 

144 

145 :return: Filtered output. 

146 """ 

147 return medfilt(self.data_values, kernel_size=self.window_size) 

148 

149 @override 

150 def get_output(self) -> float: 

151 return float(self.filtered_series[-1]) 

152 

153 @cached_property 

154 def stddev(self) -> float: 

155 """Get the linear regression standard deviation of the data. 

156 

157 :return: Standard deviation. 

158 """ 

159 if len(self.data_values) < FilterMedian.MIN_SAMPLES_STDDEV: 

160 return float('nan') 

161 return float(statistics.stdev(self.data_values)) 

162 

163 @override 

164 def is_valid(self) -> bool: 

165 return (self.data_values.size >= FilterMedian.MIN_SAMPLES 

166 and self.max_abs_input_range >= self.get_full_range() 

167 and super().is_valid()) 

168 

169 @override 

170 def describe(self) -> dict[str, object]: 

171 return (super().describe() 

172 | {'margin': self.max_abs_input_range - self.get_full_range()} 

173 | {'SD': self.stddev}) 

174 

175 

176@dataclass(kw_only=True) 

177class FilterMath(Filter): 

178 """Abstract filter for forecasting future values. 

179 

180 :param data_t: Time data. 

181 """ 

182 

183 data_t: FloatArray 

184 

185 @abstractmethod 

186 def solve(self, t: FloatArray) -> FloatArray: 

187 """Calculate the value of the function at the given time point. 

188 

189 :param t: Time point [s]. 

190 :return: Value at t. 

191 """ 

192 

193 def solve_float(self, t: FloatAny) -> float: 

194 """Calculate the value of the function at the given time point. 

195 

196 :param t: Time point [s]. 

197 :return: Value at t. 

198 """ 

199 return self.solve(np.array([t])).item() 

200 

201 def solve_t0(self, t: FloatAny) -> FloatArray: 

202 """Calculate the value of the function starting from the first time point. 

203 

204 :param t: Time after the first data point [s]. 

205 :return: Value at t_0 + t. 

206 """ 

207 return self.solve(self.data_t[0] + t) 

208 

209 def forecast(self, t_future: FloatAny) -> FloatArray: 

210 """Calculate the future value of the function starting from the last time point. 

211 

212 :param t_future: Time after the latest data point [s]. 

213 :return: Value at t_future. 

214 """ 

215 return self.solve(t=self.data_t[-1] + t_future) 

216 

217 

218@dataclass(kw_only=True) 

219class FilterMedianLinear(FilterMath, FilterMedian): 

220 """Median output filter for data with time dimension and linear regression. 

221 

222 :param data_values: Input data values. 

223 :param data_t: Time data. 

224 :param max_abs_input_range: Allowed absolute noise threshold over the input range. 

225 """ 

226 

227 @staticmethod 

228 def create(data: dict[float, float], max_abs_input_range: float) -> FilterMedianLinear: 

229 """Create a FilterLinear from dictionary data. 

230 

231 :param data: <time>: <value> dictionary. 

232 :param max_abs_input_range: See FilterMedian. 

233 :return: FilterMedianLinear. 

234 """ 

235 return FilterMedianLinear(data_values=np.array(list(data.values())), 

236 data_t=np.array(list(data.keys())), 

237 max_abs_input_range=max_abs_input_range) 

238 

239 @cached_property 

240 def slope(self) -> float: 

241 """Get the linear regression slope of the data. 

242 

243 :return: Slope [x/y]. 

244 """ 

245 if len(self.data_values) <= 1: 

246 return float('nan') 

247 return float(self._linregress.slope) 

248 

249 @cached_property 

250 def stderr_relative(self) -> float: 

251 """Get the linear regression slope's relative standard error. 

252 

253 Note, not reliable for (nearly) constant (flat) dataset. 

254 

255 :return: Relative standard error of the slope [n]. 

256 """ 

257 abs_slope = abs(self.slope) 

258 

259 if len(self.data_values) <= 1 or abs_slope == 0.0: 

260 return float('nan') 

261 

262 return float(self._linregress.stderr / abs_slope) 

263 

264 @cached_property 

265 def intercept(self) -> float: 

266 """Get the linear regression. y-intercept of the data. 

267 

268 :return: y-intercept at t=0. 

269 """ 

270 if len(self.data_values) <= 1: 

271 return float('nan') 

272 return float(self._linregress.intercept) 

273 

274 @cached_property 

275 def correlation(self) -> float: 

276 """Get the linear regression correlation of the data. 

277 

278 Note, correlation of constant (flat) data is 0. 

279 

280 :return: Correlation as r-value. 

281 """ 

282 if len(self.data_values) <= 1: 282 ↛ 283line 282 didn't jump to line 283 because the condition on line 282 was never true

283 return float('nan') 

284 return float(self._linregress.rvalue) 

285 

286 @override 

287 def describe(self) -> dict[str, object]: 

288 return (super().describe() 

289 | {'slope': self.slope, 

290 'rse': self.stderr_relative}) 

291 

292 @override 

293 def solve(self, t: FloatArray) -> FloatArray: 

294 return np.atleast_1d(self.slope * t + self.intercept) 

295 

296 @cached_property 

297 def _linregress(self) -> Any: 

298 """Linear regression statistics of the data (scipy). 

299 

300 :return: Result as scipy's linregress object. 

301 """ 

302 with warnings.catch_warnings(record=True) as w: 

303 stats = sp.stats.linregress(self.data_t, self.filtered_series) 

304 # log warning data 

305 for warning in w: 

306 level = logging.WARNING 

307 if warning.category is RuntimeWarning or warning.category is UserWarning: 307 ↛ 311line 307 didn't jump to line 311 because the condition on line 307 was always true

308 # RuntimeWarning: invalid value encountered in scalar divide data=... 

309 # UserWarning: kernel_size exceeds volume extent: the volume will be zero-padded 

310 level = logging.DEBUG 

311 logger.log(level, f"{warning.category} {warning.message} data={self.data_values}") 

312 return stats 

313 

314 

315@dataclass(kw_only=True) 

316class FilterStable(FilterMedianLinear): 

317 """Stable filter for 2D data. 

318 

319 Data is determined as stable (valid) when the slope is small and correlation high. 

320 

321 :param data_values: Input data values. 

322 :param max_abs_input_range: Allowed absolute noise threshold over the input range. 

323 :param data_t: Time data. 

324 :param max_slope: Maximum allowed slope in given unit / time [x/y]. 

325 :param max_rse: Maximum relative standard error of the slope when slope is high [n]. 

326 :param max_stddev: Maximum standard deviation when slope is low [y]. 

327 """ 

328 

329 max_slope: float 

330 max_rse: float 

331 max_stddev: float 

332 

333 @override 

334 def is_valid(self) -> bool: 

335 return (abs(self.slope) <= self.max_slope 

336 and (abs(self.stderr_relative) <= self.max_rse or self.stddev <= self.max_stddev) 

337 and super().is_valid()) 

338 

339 

340@dataclass(kw_only=True) 

341class FilterExpDecay(FilterMath): 

342 """Exponentially decaying filter. 

343 

344 :param data_values: Input data values. 

345 :param data_t: Time data. 

346 :param max_linalg: Maximum value for the linear algebra condition number of the covariance matrix. 

347 A typical valid value is in range of 1E5-20E6. 

348 :raises ValueError: When data is not valid for exponential decay filter. 

349 """ 

350 

351 max_linalg: Final[float] = field(default=20E6, repr=False) # pylint: disable=invalid-name 

352 

353 MIN_SAMPLES: Final[int] = field(init=False, default=4, repr=False) # pylint: disable=invalid-name 

354 

355 a: np.float64 = field(init=False, repr=False) 

356 """Amplitude constant.""" 

357 k: np.float64 = field(init=False, repr=False) 

358 """Exponent constant. Absolute value. Negative value is used in :py:meth:`model_func`""" 

359 c: np.float64 = field(init=False, repr=False) 

360 """Constant where the function decays to, i.e. the limit value.""" 

361 parm_cov: FloatArray = field(init=False, repr=False) 

362 """Numpy covariance matrix of the parameters.""" 

363 t_zero: np.float64 = field(init=False, repr=False) 

364 """Zero time point.""" 

365 t_range: np.float64 = field(init=False, repr=False) 

366 """Time range for normalized time.""" 

367 

368 @staticmethod 

369 def model_func(t: FloatArray, a: Float, k: Float, c: Float) -> FloatArray: 

370 """Mathematical function for exponential decay used by this filter. 

371 

372 :param t: Time. 

373 :param a: Amplitude constant. 

374 :param k: Exponent constant. 

375 :param c: Constant where the function decays to, i.e. the limit value. 

376 :return: Value of the function at time t. 

377 """ 

378 return a * np.exp(k * t) + c 

379 

380 @classmethod 

381 def create[T: FilterExpDecay](cls: type[T], data: dict[float, float]) -> T: 

382 """Create a FilterExpDecay from dictionary data. 

383 

384 :param data: <time>: <value> dictionary. 

385 :return: FilterExpDecay. 

386 """ 

387 return cls(data_t=np.array(list(data.keys())), data_values=np.array(list(data.values()))) 

388 

389 def __post_init__(self) -> None: 

390 """Initialize the curve fitting model. 

391 

392 :raises ValueError: When data is not valid for exponential decay filter. 

393 """ 

394 # Numpy does not solve data with large x-values. Normalize time data. 

395 self.t_zero = self.data_t[0] 

396 self.t_range = self.data_t[-1] - self.t_zero 

397 

398 try: 

399 if self.t_range > 0.0 and self.data_values.size >= FilterExpDecay.MIN_SAMPLES and (float(np.max(self.data_values)) - float(np.min(self.data_values))) > 0.0: 

400 # Normalized time data to 0..1 

401 data_t_normalized = (self.data_t - self.t_zero) / self.t_range 

402 

403 # maxfev=10000 

404 (self.a, self.k, self.c), self.parm_cov = sp.optimize.curve_fit(FilterExpDecay.model_func, data_t_normalized, self.data_values, p0=self._p0, bounds=self.bounds) 

405 # check the parameters did not converge at bounds. Avoids false positive 

406 eps = 1e-6 # bounds should have margin 

407 if not (self.bounds[0][0] + eps < self.a < self.bounds[1][0] - eps 

408 and self.bounds[0][1] + eps < self.k < self.bounds[1][1] - eps 

409 and self.bounds[0][2] + eps < self.c < self.bounds[1][2] - eps): 

410 raise ValueError("Parameters converged at bounds") 

411 else: 

412 raise ValueError("Data is not valid for exponential decay filter") 

413 except (RuntimeError, TypeError, ValueError) as _err: 

414 # RuntimeError: When parameters are not solved 

415 # TypeError: When too few data points (2) 

416 # ValueError: When dataset is empty 

417 # logger.exception(_err) 

418 self.parm_cov = np.array([[sys.float_info.max], [sys.float_info.max]]) 

419 self.a = np.float64(0.0) 

420 self.k = np.float64(1.0) 

421 self.c = np.float64(0.0) 

422 

423 @cached_property 

424 def bounds(self) -> tuple[list[float], list[float]]: 

425 """Get bounds for the model. 

426 

427 :return: Tuple of (lower_bounds, upper_bounds) for [a, k, c]. 

428 """ 

429 # Set bound for decay, i.e. k is negative. 

430 return ([-np.inf, -np.inf, -np.inf], 

431 [np.inf, -0.0, np.inf]) 

432 

433 @cached_property 

434 def _p0(self) -> list[float]: 

435 """Get initial guess for the curve fitting parameters. 

436 

437 :return: List of initial guess parameters [a, k, c]. 

438 """ 

439 c0 = self.data_values[-1] 

440 a0 = self.data_values[0] - c0 

441 return [a0, -1.0, c0] 

442 

443 @override 

444 def solve(self, t: FloatArray) -> FloatArray: 

445 t_norm = self._t_normalize(np.float64(t)) 

446 return self.model_func(np.asarray(t_norm), self.a, self.k, self.c) 

447 

448 @override 

449 def get_output(self) -> float: 

450 """Get the final limit value into which the function approaches. 

451 

452 :return: Final value, i.e., :attr:`.c`. 

453 """ 

454 return self.c.item() 

455 

456 def get_time_constant(self) -> float: 

457 """Get the time constant of the exponential decay function. 

458 

459 :return: Time constant [s]. 

460 """ 

461 return float(1.0 / -self.k * self.t_range) 

462 

463 def is_pos_or_neg(self) -> bool: 

464 """Check whether the function is rising or decreasing. 

465 

466 :return: True if rising. 

467 """ 

468 return bool(self.a >= 0.0) 

469 

470 def get_amplitude(self) -> float: 

471 """Get the amplitude constant, i.e., the difference of the first sample and the final value. 

472 

473 :return: Amplitude. 

474 """ 

475 return float(self.a) 

476 

477 def get_value_tc(self, value: float) -> float: 

478 """Get time as a multiple of the time constant for given value. 

479 

480 Example: A value at 63.5% of the final value would result 1.0 (1 time constant). 

481 

482 :param value: Value to calculate the time constant for. 

483 :return: Multiple of the time constant for given value. 

484 """ 

485 return self.get_time_constant() * float(np.log(self.a / value)) 

486 

487 def has_enough_data(self) -> bool: 

488 """Was enough data points provided to solve the exponential decay function. 

489 

490 :return: True is enough data points. 

491 """ 

492 return self.data_values.size >= FilterExpDecay.MIN_SAMPLES 

493 

494 def find_worst_outlier(self) -> tuple[float, float]: 

495 """Get the worst outlier from the data. 

496 

497 :return: Tuple of (time, value) of the worst outlier. 

498 """ 

499 # Calculate residuals 

500 residuals = self.data_values - self.solve(self.data_t) 

501 # Find the index of the largest absolute residual 

502 worst_outlier_idx = np.argmax(np.abs(residuals)) 

503 return self.data_t[worst_outlier_idx].item(), self.data_values[worst_outlier_idx].item() 

504 

505 @override 

506 def is_valid(self, max_linalg: float | None = None) -> bool: 

507 """Validate the data resembles exponentially decaying function. 

508 

509 - At least 4 data points. 

510 - Numpy evaluates the data as valid. 

511 

512 :param max_linalg: Maximum value for the linear algebra condition number of the covariance matrix. 

513 :return: True: model is reliable. 

514 """ 

515 return (self.has_enough_data() 

516 and super().is_valid() 

517 and self._is_exp_valid(max_linalg) 

518 and self.get_time_constant() > 0) 

519 

520 @override 

521 def describe(self) -> dict[str, object]: 

522 return (super().describe() 

523 | { 

524 'tau': self.get_time_constant(), 

525 'lin': float(np.linalg.cond(self.parm_cov)), 

526 # 'cov': f"{self.parm_cov}", 

527 }) 

528 

529 def _t_normalize(self, t: np.float64) -> np.float64: 

530 """Normalize time. 

531 

532 :param t: Time to normalize [s]. 

533 :return: Normalized time. 

534 """ 

535 return (t - self.t_zero) / self.t_range 

536 

537 def _is_exp_valid(self, max_linalg: float | None) -> bool: 

538 """Evaluate how well Numpy solves the exponential decay model. 

539 

540 :param max_linalg: Maximum value for the linear algebra condition number of the covariance matrix. None to use default value 

541 :return: True if the exponential decay model is valid. 

542 """ 

543 # Check for invalid values 

544 if np.isinf(self.parm_cov).any() or np.isnan(self.parm_cov).any(): 544 ↛ 545line 544 didn't jump to line 545 because the condition on line 544 was never true

545 return False 

546 

547 # lin = np.linalg.cond(self.parm_cov) 

548 # A large value is cause for concern. The diagonal elements of the covariance matrix, which is related to uncertainty of the fit, give more information: 

549 # For further checks, evaluate :py:meth:`np.diag(self.parm_cov)` 

550 

551 # Clamp small values to avoid misleadingly high ratios. Ideal data results in range of 1e-21..1e-25 

552 singular_values_clipped = np.maximum(np.linalg.svd(self.parm_cov, compute_uv=False), 1e-20) 

553 lin = singular_values_clipped.max() / singular_values_clipped.min() 

554 

555 if not max_linalg: 555 ↛ 557line 555 didn't jump to line 557 because the condition on line 555 was always true

556 max_linalg = self.max_linalg 

557 if lin > max_linalg: 

558 return False 

559 return bool(self.a != 0.0) 

560 

561 

562class FilterMonotonicImpl(FilterMonotonic): 

563 """Dummy implementation of FilterMonotonic.""" 

564 

565 @override 

566 def get_output(self) -> float: 

567 return 0.0 

568 

569 

570@dataclass(kw_only=True) 

571class FilterExpDecayZ(FilterExpDecay): 

572 """Exponentially decaying filter combined with min/max time constant and monotonic data validation. 

573 

574 :param data_values: See parent class: :py:class:`FilterExpDecay`. 

575 :param data_t: See parent class: :py:class:`FilterExpDecay`. 

576 :param a: See parent class: :py:class:`FilterExpDecay`. 

577 :param k: See parent class: :py:class:`FilterExpDecay`. 

578 :param c: See parent class: :py:class:`FilterExpDecay`. 

579 :param parm_cov: See parent class: :py:class:`FilterExpDecay`. 

580 :param t_zero: See parent class: :py:class:`FilterExpDecay`. 

581 :param t_range: See parent class: :py:class:`FilterExpDecay`. 

582 :param max_linalg: See parent class: :py:class:`FilterExpDecay`. 

583 :param tau_min_th: Minimum time constant [s]. 

584 :param tau_max_th: Maximum time constant [s]. 

585 :param noise_rel_range_th: See :py:class:`FilterMonotonic`. 

586 :param noise_abs_range_th: See :py:class:`FilterMonotonic`. 

587 :raises ValueError: When data is not valid for exponential decay filter. 

588 """ 

589 

590 tau_min_th: float 

591 tau_max_th: float 

592 noise_rel_range_th: float 

593 noise_abs_range_th: float 

594 

595 _filter_monotonic: FilterMonotonic = field(init=False) 

596 """In addition to the exponential decay, ensure that the data is monotonic within the noise tolerance.""" 

597 

598 @override 

599 def __post_init__(self) -> None: 

600 super().__post_init__() 

601 self._filter_monotonic = FilterMonotonicImpl(data_values=self.data_values, noise_rel_range_th=self.noise_rel_range_th, noise_abs_range_th=self.noise_abs_range_th) 

602 

603 @override 

604 @cached_property 

605 def bounds(self) -> tuple[list[float], list[float]]: 

606 # Provide both initial guesses (p0) and bounds based on the Z-offset. 

607 # - Z-offset can vary +-0.5 mm and is positive 

608 # - Time constant is within self.tau_min_th...self.tau_max_th 

609 max_amp = 0.5 # Max allowed amplitude, i.e. thermal expansion [mm] 

610 c0 = self.data_values[-1] 

611 self.t_zero = np.min(self.data_t) 

612 k_min = -1.0 / np.float64(self.tau_min_th / self.t_range) 

613 k_max = -1.0 / np.float64(self.tau_max_th / self.t_range) 

614 return ([-max_amp, k_min, max(0.0, c0 - max_amp)], 

615 [max_amp, k_max, max(0.0, c0 + max_amp)]) 

616 

617 @override 

618 @cached_property 

619 def _p0(self) -> list[float]: 

620 k_mid = np.mean([self.bounds[0][1], self.bounds[1][1]]) 

621 return [0.0, k_mid, self.data_values[-1]] 

622 

623 @override 

624 def is_valid(self, max_linalg: float | None = None) -> bool: 

625 return (self._filter_monotonic.is_valid() 

626 and super().is_valid(max_linalg=max_linalg) 

627 and (self.tau_min_th <= self.get_time_constant() <= self.tau_max_th) 

628 and self.get_output() >= 0.0) 

629 

630 @override 

631 def describe(self) -> dict[str, object]: 

632 return (self._filter_monotonic.describe() # First the abstract filter to override unimplemented keys 

633 | super().describe())