Coverage for src/zooc/run/model_z_offset.py: 90%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 21:45 +0000

1"""Z-offset model based on measurements.""" 

2import logging 

3from abc import ABC, abstractmethod 

4from dataclasses import dataclass, field 

5from typing import override 

6 

7import numpy as np 

8 

9from zooc.dsp.filters import FilterExpDecayZ, FilterMath, FilterStable 

10 

11logger = logging.getLogger(__name__) 

12 

13 

@dataclass(kw_only=True)
class ModelZOffset(ABC):
    """Base class for the Z-offset model based on measurements.

    Collects (time, z) samples via :meth:`calc_offset`, delegates the actual
    estimation to the concrete model's :meth:`_calc_offset`, and keeps the
    latest valid filter/result for inspection via :meth:`describe`.
    """

    dict_time_z: dict[float, float] = field(init=False, default_factory=dict)
    """Time and Z-offset data points."""
    t_0: float | None = field(init=False, default=None)
    """1st sample time."""
    filter_model: FilterMath | None = field(init=False, default=None)
    """Filter model based on the latest valid measurement."""
    z_prev: float | None = field(init=False, default=None)
    """Latest valid Z-offset value [mm]."""

    @staticmethod
    def log_data(filter_model: FilterMath | None) -> str:
        """Convert the filter input data to a TOML formatted string.

        Format::

            z_samples=[[x0,y0],
                       [x1,y1],
                       [...]]

        :param filter_model: Filter model, or None when no data is available.
        :return: Input data in TOML format.
        """
        if filter_model:
            # Build the sample list outside the f-string: a backslash inside an
            # f-string expression needs Python >= 3.12 and hurts readability.
            # NOTE(review): the leading spacing inside the per-sample literal may
            # have been collapsed by the report export — verify against VCS.
            samples = ',\n'.join(f' [{t},{z}]'
                                 for t, z in zip(filter_model.data_t,
                                                 filter_model.data_values,
                                                 strict=True))
            return f"z_samples = [\n{samples}]"
        return f"z_samples = [{float('nan')}]"

    def calc_offset(self, sample: tuple[float, float], max_delta: float) -> bool:
        """Calculate the final Z-offset based on the previous and the latest measurements.

        The very first sample defines t=0; every sample time is shifted relative
        to it before being handed to the concrete model.

        :param sample: Z-offset measurement as (time [s], z-offset [mm]).
        :param max_delta: Maximum allowed deviation between successive forecasted values.
        :returns: True if a valid measurement was acquired.
        """
        if self.t_0 is None:
            self.t_0 = sample[0]
        sample = (sample[0] - self.t_0, sample[1])

        return self._calc_offset(sample=sample, max_delta=max_delta) is not None

    @abstractmethod
    def _calc_offset(self, sample: tuple[float, float], max_delta: float) -> float | None:
        """Calculate the final Z-offset based on the previous and the latest measurements.

        :param sample: Z-offset measurement as (time [s], z-offset [mm]); time is relative to t_0.
        :param max_delta: Maximum allowed deviation between successive forecasted values.
        :returns: The calculated Z-offset or None if more measurements are needed.
        """

    def describe(self) -> dict[str, object]:
        """Describe the internal state of the model.

        :return: Dictionary with the model state.
        """
        return {'type': self.__class__.__name__,
                'filter': self.filter_model.describe() if self.filter_model else None}

    @abstractmethod
    def create_filter(self, data: dict[float, float]) -> FilterMath:
        """Create a filter model based on the collected data.

        :param data: <time>: <value> dictionary.
        :return: Filter model fitted to the data. Convergence/validity is
            queried separately via the filter's ``is_valid()``.
        """

82 

@dataclass(kw_only=True)
class ModelZOffsetStable(ModelZOffset):
    """Z-offset model for stable state where the offset is not changing.

    :param t_end: Time to stop the estimation [s].
    :param t_forecast: Time in the future to forecast the Z-offset starting from the last sample [s].
    :param mean_samples: Number of samples used for the measurement.
    :param max_abs_input_range: Allowed absolute noise threshold over the input range.
    """

    t_end: float
    t_forecast: float

    mean_samples: int = field(default=7, repr=False)
    min_samples: int = field(init=False, default=7, repr=False)
    max_abs_input_range: float = field(default=0.1)  # Not used. The max slope is used instead.
    completed: bool = field(init=False, default=False)
    """When no more data is needed."""

    @override
    def _calc_offset(self, sample: tuple[float, float], max_delta: float) -> float | None:
        # Keep a sliding window: recreate the dictionary with only the last
        # mean_samples entries (dicts are insertion-ordered).
        self.dict_time_z[sample[0]] = sample[1]
        if len(self.dict_time_z) > self.mean_samples:
            self.dict_time_z = dict(list(self.dict_time_z.items())[-self.mean_samples:])

        filter_stable = self.create_filter(self.dict_time_z)
        z_final = filter_stable.solve_float(sample[0] + self.t_forecast)
        logger.info(f"Forecast: z={z_final}@{sample[0] + self.t_forecast:.1f}, model={filter_stable.describe()}")
        logger.debug(self.log_data(filter_stable))

        # No more data is needed once the estimation window has elapsed.
        self.completed = sample[0] > self.t_end

        if len(self.dict_time_z) >= self.min_samples and filter_stable.is_valid():
            # Store the valid measurement. Next valid measurements will overwrite this.
            self.filter_model = filter_stable
            self.z_prev = z_final
            return self.z_prev
        return None

    @override
    def create_filter(self, data: dict[float, float]) -> FilterStable:
        # Fixed: build the filter from the `data` argument (the abstract
        # contract) instead of silently reading self.dict_time_z. The only
        # call site in this class passes self.dict_time_z, so behavior there
        # is unchanged.
        return FilterStable(data_t=np.array(list(data.keys())),
                            data_values=np.array(list(data.values())),
                            max_abs_input_range=self.max_abs_input_range,
                            max_slope=0.00004,  # [mm/s], Slope should be flat. 0.00004 -> 0.04µm/s -> 2.4 µm/min
                            max_rse=0.2,  # [%], Slope should have low error...
                            max_stddev=0.0020)  # [mm], ...or data should have low noise.

    @staticmethod
    def _get_closest_neighbors(data: dict[float, float], target_key: float, neighbors: int) -> dict[float, float]:
        """Retrieve the key-value pairs whose keys are closest in value to the target_key.

        :param data: The input data.
        :param target_key: The key to find the closest neighbors for.
        :param neighbors: The number of closest neighbors to retrieve.
        :return: Dictionary containing the `neighbors` key-value pairs from the input `data`.
        """
        # Sort the dictionary items based on the absolute difference between the key and the target
        closest_items = sorted(data.items(), key=lambda item: abs(item[0] - target_key))
        # Take the top N items and convert back to a dictionary (maintaining key order)
        return dict(sorted(closest_items[:neighbors]))

145 

146 

@dataclass(kw_only=True)
class ModelZOffsetExp(ModelZOffset):
    """Modeling the settling of Z-offset due to heat expansion over time.

    The model provides the estimated Z-offset value based on the data collected.

    :param require_success: How many consecutive measurements should yield valid offset.
    :param tau_min_th: Minimum time constant for the exp filter.
        Time constant should be reasonable, at least some seconds [s].
    :param tau_max_th: Maximum time constant for the exp filter.
        Large printers may have time constant of many minutes [s].
    :param noise_rel_range_th: Allowed relative noise range for the monotonicity.
        Allow 2% full range noise (or noise_abs_range_th which ever is larger) [%].
    :param noise_abs_range_th: Allowed absolute noise range for the monotonicity.
    """

    require_success: int = field(default=2, repr=False)  # pylint: disable=invalid-name
    tau_min_th: float = field(default=30.0)
    tau_max_th: float = field(default=600.0)
    noise_rel_range_th: float = field(default=0.02)
    noise_abs_range_th: float = field(default=0.01)  # Settling should be monotonic. Allow some tolerance

    min_samples: int = field(init=False, default=4, repr=False)  # pylint: disable=invalid-name
    # NOTE(review): max_samples is not referenced anywhere in this class;
    # the dataset is never trimmed. Confirm whether trimming was intended.
    max_samples: int = field(init=False, default=6, repr=False)  # pylint: disable=invalid-name

    success: int = field(init=False, default=0)
    """Consecutive, valid measurement counter"""

    @override
    def _calc_offset(self, sample: tuple[float, float], max_delta: float) -> float | None:
        t_sample, z_sample = sample
        # Record the new sample; the whole dataset is always used. Work on a
        # snapshot copy so the stored data stays untouched during processing.
        self.dict_time_z[t_sample] = z_sample
        snapshot = dict(self.dict_time_z)

        model = self.create_filter(snapshot)
        estimate = model.get_output()
        logger.info(f"Forecast: z={estimate}@inf, model={model.describe()}")

        if len(snapshot) < self.min_samples:
            return None  # Not enough samples collected yet — keep measuring.

        # Exponential decay looks almost linear at the beginning: even a valid
        # fit is only trusted once the data spans at least one time constant.
        if model.is_valid() and t_sample < 1.0 * model.get_time_constant():
            return None

        return self._process_candidate(filter_exp=model, z_final=estimate, max_delta=max_delta)

    @override
    def create_filter(self, data: dict[float, float]) -> FilterExpDecayZ:
        """Build an exponential-decay filter from the given <time>: <value> data."""
        times = np.array(list(data.keys()))
        values = np.array(list(data.values()))
        return FilterExpDecayZ(data_t=times,
                               data_values=values,
                               tau_min_th=self.tau_min_th,
                               tau_max_th=self.tau_max_th,
                               noise_rel_range_th=self.noise_rel_range_th,
                               noise_abs_range_th=self.noise_abs_range_th)

    # A stricter check could also require the time constant and decay direction
    # to match the previously obtained filter,
    # e.g.: filter_model.is_pos_or_neg() == self.filter_model.is_pos_or_neg()

    @override
    def describe(self) -> dict[str, object]:
        """Describe the model state, including how many valid results are still required."""
        state = dict(super().describe())
        state['remaining'] = self.require_success - self.success
        return state

    def _process_candidate(self, filter_exp: FilterExpDecayZ, z_final: float, max_delta: float) -> float | None:
        """Compare the candidate against previous consecutive valid results.

        Returns the final value once enough consecutive candidates agree within
        `max_delta`; an invalid filter resets the consecutive-success counter.

        :param filter_exp: Filter model.
        :param z_final: Candidate Z-offset value.
        :param max_delta: Maximum allowed deviation between successful forecasted value.
        :returns: The calculated Z-offset or None if more measurements are needed.
        """
        if not filter_exp.is_valid():
            # Invalid fit: drop all progress and require a fresh run of
            # consecutive valid results.
            self.filter_model = None
            self.z_prev = None
            self.success = 0
            return None

        self.filter_model = filter_exp

        if self.z_prev is None:
            # First valid fit only establishes the baseline — need more data.
            self.z_prev = z_final
            return None

        delta = abs(z_final - self.z_prev)
        self.z_prev = z_final  # Baseline for the next round.

        if delta > max_delta:
            # Unstable: start counting from zero again.
            self.success = 0
            logger.info(f"Z-offset not stable yet: delta={delta:.3f}/{max_delta}. Measure again")
            return None

        self.success += 1
        if self.success >= self.require_success:
            return z_final  # Stable: return the final Z-offset.
        logger.info(f"Z-offset calculated {self.success}/{self.require_success}: Collecting more data")
        return None