Coverage for src / zooc / run / model_z_offset.py: 90%
112 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 21:45 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 21:45 +0000
1"""Z-offset model based on measurements."""
2import logging
3from abc import ABC, abstractmethod
4from dataclasses import dataclass, field
5from typing import override
7import numpy as np
9from zooc.dsp.filters import FilterExpDecayZ, FilterMath, FilterStable
11logger = logging.getLogger(__name__)
14@dataclass(kw_only=True)
15class ModelZOffset(ABC):
16 """Base class for the Z-offset model based on measurements."""
18 dict_time_z: dict[float, float] = field(init=False, default_factory=dict)
19 """Time and Z-offset data points."""
20 t_0: float | None = field(init=False, default=None)
21 """1st sample time."""
22 filter_model: FilterMath | None = field(init=False, default=None)
23 """Filter model based on the latest valid measurement."""
24 z_prev: float | None = field(init=False, default=None)
25 """Latest valid Z-offset value [mm]."""
27 @staticmethod
28 def log_data(filter_model: FilterMath | None) -> str:
29 """Convert to TOML formatted string.
31 Format::
33 z_samples=[[x0,y0],
34 [x1,y1],
35 [...]]
37 :param filter_model: Filter model.
38 :return: Input data in TOML format.
39 """
40 if filter_model: 40 ↛ 42line 40 didn't jump to line 42 because the condition on line 40 was always true
41 return f"z_samples = [\n{',\n'.join(f' [{t},{z}]' for t, z in zip(filter_model.data_t, filter_model.data_values, strict=True))}]"
42 return f"z_samples = [{float('nan')}]"
44 def calc_offset(self, sample: tuple[float, float], max_delta: float) -> bool:
45 """Calculate the final Z-offset based on the previous and the latest measurements.
47 :param sample: Z-offset measurement.
48 :param max_delta: Maximum allowed deviation between successful forecasted value.
49 :returns: True if valid measurement was acquired.
50 """
51 if self.t_0 is None:
52 self.t_0 = sample[0]
53 sample = (sample[0] - self.t_0, sample[1])
55 return self._calc_offset(sample=sample, max_delta=max_delta) is not None
57 @abstractmethod
58 def _calc_offset(self, sample: tuple[float, float], max_delta: float) -> float | None:
59 """Calculate the final Z-offset based on the previous and the latest measurements.
61 :param sample: Z-offset measurement.
62 :param max_delta: Maximum allowed deviation between successful forecasted value.
63 :returns: The calculated Z-offset or None if more measurements are needed.
64 """
66 def describe(self) -> dict[str, object]:
67 """Describe the internal state of the model.
69 :return: Dictionary with the model state.
70 """
71 return {'type': self.__class__.__name__,
72 'filter': self.filter_model.describe() if self.filter_model else None}
74 @abstractmethod
75 def create_filter(self, data: dict[float, float]) -> FilterMath:
76 """Create a filter model based on the collected data.
78 :param data: <time>: <value> dictionary.
79 :return: Filter model if converged, None if not converged.
80 """
@dataclass(kw_only=True)
class ModelZOffsetStable(ModelZOffset):
    """Z-offset model for stable state where the offset is not changing.

    :param t_end: Time to stop the estimation [s].
    :param t_forecast: Time in the future to forecast the Z-offset starting from the last sample [s].
    :param mean_samples: Number of samples used for the measurement.
    :param max_abs_input_range: Allowed absolute noise threshold over the input range.
    """

    t_end: float
    t_forecast: float

    mean_samples: int = field(default=7, repr=False)
    min_samples: int = field(init=False, default=7, repr=False)
    max_abs_input_range: float = field(default=0.1)  # Not used. The max slope is used instead.
    completed: bool = field(init=False, default=False)
    """When no more data is needed."""

    @override
    def _calc_offset(self, sample: tuple[float, float], max_delta: float) -> float | None:
        """Add the sample, refit the stable filter and forecast the Z-offset.

        ``max_delta`` is part of the base-class signature and is not used by this model.

        :param sample: Z-offset measurement as (time, value).
        :param max_delta: Maximum allowed deviation between successful forecasted value.
        :returns: The forecasted Z-offset, or None if more measurements are needed.
        """
        t_sample = sample[0]
        self.dict_time_z[t_sample] = sample[1]

        # Keep only the most recently inserted mean_samples entries.
        if len(self.dict_time_z) > self.mean_samples:
            self.dict_time_z = dict(list(self.dict_time_z.items())[-self.mean_samples:])

        filter_stable = self.create_filter(self.dict_time_z)
        z_final = filter_stable.solve_float(sample[0] + self.t_forecast)
        logger.info(f"Forecast: z={z_final}@{sample[0] + self.t_forecast:.1f}, model={filter_stable.describe()}")
        logger.debug(self.log_data(filter_stable))

        # The completion flag is updated on every sample, valid or not.
        self.completed = t_sample > self.t_end

        if len(self.dict_time_z) >= self.min_samples and filter_stable.is_valid():
            # Store the valid measurement. Next valid measurements will overwrite this.
            self.filter_model = filter_stable
            self.z_prev = z_final
            return self.z_prev
        return None

    @override
    def create_filter(self, data: dict[float, float]) -> FilterStable:
        """Create a stable-state filter from the given data.

        :param data: <time>: <value> dictionary.
        :return: Filter model built from ``data``.
        """
        # Build the filter from the `data` argument (not from self.dict_time_z) so the
        # method honours its parameter, as ModelZOffsetExp.create_filter does.
        return FilterStable(data_t=np.array(list(data.keys())),
                            data_values=np.array(list(data.values())),
                            max_abs_input_range=self.max_abs_input_range,
                            max_slope=0.00004,  # [mm/s], Slope should be flat. 0.00004 -> 0.04µm/s -> 2.4 µm/min
                            max_rse=0.2,  # [%], Slope should have low error...
                            max_stddev=0.0020)  # [mm], ...or data should have low noise.

    @staticmethod
    def _get_closest_neighbors(data: dict[float, float], target_key: float, neighbors: int) -> dict[float, float]:
        """Retrieve the key-value pairs whose keys are closest in value to the target_key.

        :param data: The input data.
        :param target_key: The key to find the closest neighbors for.
        :param neighbors: The number of closest neighbors to retrieve.
        :return: Dictionary containing the `neighbors` key-value pairs from the input `data`,
            ordered by ascending key.
        """
        # Rank the items by the distance between their key and the target.
        by_distance = sorted(data.items(), key=lambda item: abs(item[0] - target_key))
        # Keep the nearest N items and restore ascending key order.
        return dict(sorted(by_distance[:neighbors]))
@dataclass(kw_only=True)
class ModelZOffsetExp(ModelZOffset):
    """Modeling the settling of Z-offset due to heat expansion over time.

    The model provides the estimated Z-offset value based on the data collected.

    :param require_success: How many consecutive measurements should yield valid offset.
    :param tau_min_th: Minimum time constant for the exp filter.
        Time constant should be reasonable, at least some seconds [s].
    :param tau_max_th: Maximum time constant for the exp filter.
        Large printers may have time constant of many minutes [s].
    :param noise_rel_range_th: Allowed relative noise range for the monotonicity.
        Allow 2% full range noise (or noise_abs_range_th which ever is larger) [%].
    :param noise_abs_range_th: Allowed absolute noise range for the monotonicity.
    """

    require_success: int = field(default=2, repr=False)  # pylint: disable=invalid-name
    tau_min_th: float = field(default=30.0)
    tau_max_th: float = field(default=600.0)
    noise_rel_range_th: float = field(default=0.02)
    noise_abs_range_th: float = field(default=0.01)  # Settling should be monotonic. Allow some tolerance

    min_samples: int = field(init=False, default=4, repr=False)  # pylint: disable=invalid-name
    max_samples: int = field(init=False, default=6, repr=False)  # pylint: disable=invalid-name

    success: int = field(init=False, default=0)
    """Consecutive, valid measurement counter"""

    @override
    def _calc_offset(self, sample: tuple[float, float], max_delta: float) -> float | None:
        """Add the sample, refit the exponential filter and evaluate the candidate Z-offset.

        :param sample: Z-offset measurement as (time, value).
        :param max_delta: Maximum allowed deviation between successful forecasted value.
        :returns: The calculated Z-offset or None if more measurements are needed.
        """
        # Record the new sample, then work on a copy of the whole dataset.
        self.dict_time_z[sample[0]] = sample[1]
        snapshot = dict(self.dict_time_z)

        filter_exp = self.create_filter(snapshot)
        z_final = filter_exp.get_output()
        logger.info(f"Forecast: z={z_final}@inf, model={filter_exp.describe()}")

        # Too few samples so far: keep measuring.
        if len(snapshot) < self.min_samples:
            return None

        # Exponential decay is quite linear at the beginning. A valid fit is only
        # considered once the data reaches at least up to the time constant.
        if filter_exp.is_valid() and sample[0] < 1.0 * filter_exp.get_time_constant():
            return None

        return self._process_candidate(filter_exp=filter_exp, z_final=z_final, max_delta=max_delta)

    @override
    def create_filter(self, data: dict[float, float]) -> FilterExpDecayZ:
        """Create an exponential-decay filter from the given data.

        :param data: <time>: <value> dictionary.
        :return: Filter model built from ``data`` and the thresholds of this model.
        """
        times = np.array(list(data.keys()))
        values = np.array(list(data.values()))
        return FilterExpDecayZ(data_t=times,
                               data_values=values,
                               tau_min_th=self.tau_min_th,
                               tau_max_th=self.tau_max_th,
                               noise_rel_range_th=self.noise_rel_range_th,
                               noise_abs_range_th=self.noise_abs_range_th)

    # For even more thorough check, ensure that the time constant and direction of decay matches that of the previously obtained filter.
    # e.g: filter_model.is_pos_or_neg() == self.filter_model.is_pos_or_neg()

    @override
    def describe(self) -> dict[str, object]:
        """Describe the internal state of the model.

        :return: The base-class description extended with ``'remaining'``, the difference
            between ``require_success`` and the current ``success`` counter.
        """
        return {**super().describe(),
                'remaining': (self.require_success - self.success)}

    def _process_candidate(self, filter_exp: FilterExpDecayZ, z_final: float, max_delta: float) -> float | None:
        """Compare the value to previous consecutive valid results. If stable, return the final value, otherwise continue measuring.

        If the filter is not valid, reset the counter. I.e. require consecutive valid results.

        :param filter_exp: Filter model.
        :param z_final: Candidate Z-offset value.
        :param max_delta: Maximum allowed deviation between successful forecasted value.
        :returns: The calculated Z-offset or None if more measurements are needed.
        """
        # An invalid fit discards everything gathered from earlier valid fits.
        if not filter_exp.is_valid():
            self.filter_model = None
            self.z_prev = None
            self.success = 0
            return None

        self.filter_model = filter_exp

        # First valid result: it only becomes the baseline for the next comparison.
        z_baseline = self.z_prev
        if z_baseline is None:
            self.z_prev = z_final
            return None

        # Compare against the baseline, then move the baseline to the new value.
        delta = abs(z_final - z_baseline)
        self.z_prev = z_final

        if delta <= max_delta:
            self.success += 1
            if self.success >= self.require_success:
                return z_final  # Stable over enough consecutive results
            logger.info(f"Z-offset calculated {self.success}/{self.require_success}: Collecting more data")
            return None

        # Unstable: start counting again.
        self.success = 0
        logger.info(f"Z-offset not stable yet: delta={delta:.3f}/{max_delta}. Measure again")
        return None