Coverage for src / klipper_utils / klipper_module.py: 18%

96 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2025-12-11 21:45 +0000

1"""Klipper module with some helper functions.""" 

2 

3import logging 

4from typing import Any 

5 

6from zooc.data.temps import Temps 

7 

8from .klipper_utils import Command, ConfigWrapper, Gcmd, KlipperUtils, KlipperUtilsError 

9 

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)

11 

12 

class KlipperModule:
    """Generic base Klipper module with some helper methods.

    Bundles the printer, ``gcode`` and ``gcode_move`` objects obtained from the
    Klipper config and offers helpers for toolhead moves, heater control and
    G-code command registration.
    """

    def __init__(self, config: ConfigWrapper, cmd_prefix: str) -> None:
        """Initialize the module.

        :param config: Klipper config object.
        :param cmd_prefix: Common prefix used in all registered commands.
        """
        self.name = config.get_name()
        self.printer = config.get_printer()  # klippy/klippy.py
        self.gcode = self.printer.lookup_object('gcode')  # klippy/gcode.py
        self.gcode_move = self.printer.lookup_object('gcode_move')  # klippy/extras/gcode_move.py
        self.cmd_prefix = cmd_prefix

    def get_time(self) -> float:
        """Readable helper to get Klipper time.

        :return: Klipper time in seconds (the reactor's monotonic clock).
        """
        return self.printer.get_reactor().monotonic()  # type: ignore

    @property
    def toolhead(self) -> Any:
        """Get the toolhead.

        :return: Toolhead object.
        :raises KlipperUtilsError: If the printer is not ready.
        """
        # An explicit default is required here: without one Klipper's lookup_object()
        # raises its own error for a missing object and the check below is unreachable.
        toolhead = self.printer.lookup_object("toolhead", None)
        if toolhead is None:
            raise KlipperUtilsError("Printer not ready")
        return toolhead

    def check_homed_xyz(self, gcmd: Gcmd) -> None:
        """Check the required axes are homed. Raise error if not.

        :param gcmd: G-code object.
        :raises gcmd.error: Klipper exception when some axes are not homed.
        """
        homed_axes = self.toolhead.get_kinematics().get_status(self.get_time())['homed_axes']
        if any(axis not in homed_axes for axis in 'xyz'):
            raise gcmd.error("Must home X, Y and Z axes first")

    def move_relative(self, move_by: tuple[float | None, ...], speed: float) -> None:
        """Move by the given relative XYZ-distances.

        :param move_by: Relative XYZ-distances to move by; None (or zero) keeps that axis stationary.
        :param speed: Move with speed mm/s.
        """
        self._move(move_abs=False, move=move_by, speed=speed)

    def move_absolute(self, move_to: tuple[float | None, ...], speed: float) -> None:
        """Move to the given absolute XYZ-position.

        Note that a coordinate of exactly 0 is handled like None by the underlying move
        helper, i.e. that axis stays where it is.

        :param move_to: XYZ-coordinates to move to; None keeps that axis stationary.
        :param speed: Move with speed mm/s.
        """
        self._move(move_abs=True, move=move_to, speed=speed)

    def move_inside(self, move_inside: tuple[tuple[float, float] | None, ...], speed: float) -> None:
        """Move inside the given coordinate ranges. Do not move if already inside.

        An axis that lies outside its range is moved to the nearest range limit.

        :param move_inside: Min/max ranges for XYZ-coordinates to move inside to or None to stay stationary for that axis.
        :param speed: Move with speed mm/s.
        """
        # NOTE(review): ranges are compared against the toolhead position while the resulting
        # target is sent as a G-code coordinate - confirm both share one frame when offsets are active.
        current = self.toolhead.get_position()

        # Start from "stay" for every axis; only axes outside their range receive a target,
        # so axes without a range never show up as explicit move targets.
        targets: list[float | None] = []
        for position, bounds in zip(current, move_inside):
            if not bounds or bounds[0] <= position <= bounds[1]:
                targets.append(None)  # Do not move
            else:
                targets.append(bounds[0] if position < bounds[0] else bounds[1])
        self._move(move_abs=True, move=tuple(targets), speed=speed)

    def get_temperatures(self) -> tuple[Temps, Temps]:
        """Read current and target bed and extruder temperatures.

        :return: Tuple of current and target temperatures.
        """
        heaters = self.printer.lookup_object('heaters')
        now = self.get_time()
        bed_temp_curr, bed_temp_target = heaters.lookup_heater('heater_bed').get_temp(now)
        extruder_temp_curr, extruder_temp_target = heaters.lookup_heater('extruder').get_temp(now)
        current = Temps.klipper(bed_temp_curr, extruder_temp_curr)
        target = Temps.klipper(bed_temp_target, extruder_temp_target)
        return current, target

    def set_temperatures(self, gcmd: Gcmd, temps: Temps) -> None:
        """Set bed and extruder temperatures. Does not wait for those to reach the target.

        Does nothing when neither the bed nor the extruder is enabled in ``temps``.

        :param gcmd: G-code object.
        :param temps: Set temperatures.
        """
        if not temps.bed.enabled and not temps.extruder.enabled:
            return

        gcmd.respond_info(f"Heating: {temps}")
        if temps.bed.enabled:
            self.gcode.run_script_from_command(f'M140 S{temps.bed_temp}')  # Set bed temperature: M140 S<temperature>
        if temps.extruder.enabled:
            self.gcode.run_script_from_command(f'M104 S{temps.extruder_temp}')  # Set extruder temperature: M104 [T<index>] [S<temperature>]

    def wait_temperatures(self, gcmd: Gcmd, temps: Temps, tolerance_min: float, tolerance_max: float) -> None:
        """Wait temperatures to reach given level. Does not set them.

        Does nothing when neither the bed nor the extruder is enabled in ``temps``.

        :param gcmd: G-code object.
        :param temps: Temperatures to wait. Usually the target temperature.
        :param tolerance_min: Allowed negative offset in degrees.
        :param tolerance_max: Allowed positive offset in degrees.
        :raises gcmd.error: If a tolerance is negative.
        """
        if not temps.bed.enabled and not temps.extruder.enabled:
            return

        gcmd.respond_info(f"Waiting: {temps}")
        # Raise explicitly instead of asserting: per the original author's note an assert
        # made this function just return, with the exception being swallowed by Klipper.
        if tolerance_min < 0.0 or tolerance_max < 0.0:
            raise gcmd.error("Tolerance must be positive")

        if temps.bed.enabled:
            script = f'TEMPERATURE_WAIT SENSOR=heater_bed MINIMUM={temps.bed_temp - tolerance_min} MAXIMUM={temps.bed_temp + tolerance_max}'
            logger.debug("%s", script)
            self.gcode.run_script_from_command(script)
        if temps.extruder.enabled:
            script = f'TEMPERATURE_WAIT SENSOR=extruder MINIMUM={temps.extruder_temp - tolerance_min} MAXIMUM={temps.extruder_temp + tolerance_max}'
            logger.debug("%s", script)
            self.gcode.run_script_from_command(script)

    def set_wait_temperatures(self, gcmd: Gcmd, temps: Temps, tolerance_min: float, tolerance_max: float) -> None:
        """Set and wait for temperatures to reach given level.

        :param gcmd: G-code object.
        :param temps: Temperatures to wait. Usually the target temperature.
        :param tolerance_min: Allowed negative offset in degrees.
        :param tolerance_max: Allowed positive offset in degrees.
        """
        self.set_temperatures(gcmd=gcmd, temps=temps)
        self.wait_temperatures(gcmd=gcmd, temps=temps, tolerance_min=tolerance_min, tolerance_max=tolerance_max)

    # noinspection PyUnusedLocal
    # pylint: disable=unused-argument
    def wait(self, gcmd: Gcmd, duration: float) -> None:
        """Wait for given time by dwelling the toolhead.

        :param gcmd: G-code object (unused).
        :param duration: Duration to wait [s].
        """
        self.toolhead.dwell(duration)

    def _register_command(self, function: Command, help_text: str) -> None:
        """Register a G-code command with Klipper.

        The command name will be the cmd_prefix plus the upper-cased function name without '_cmd_'.

        :param function: Command function accepting gcmd and optional parameters.
        :param help_text: Help to be printed with HELP code.
        """
        cmd_name = f"{self.cmd_prefix}{function.__name__.removeprefix('_cmd_').upper()}"
        KlipperUtils.register_command(gcode=self.gcode, function=function, cmd_name=cmd_name, help_text=help_text, fail_on_error=True)

    def _move(self, move_abs: bool, move: tuple[float | None, ...], speed: float) -> None:
        """Move the given axes with a G0 command, leaving the G-code state untouched.

        Only axes with a non-zero, non-None value are put into the command, so all other
        axes stay exactly where they are. Entries after the third (Z) one are ignored.

        :param move_abs: True: Move to an absolute coordinate. False: Move relative distance.
        :param move: Absolute/relative XYZ-coordinates or None/0 to stay stationary.
        :param speed: Move with speed mm/s.
        """
        # Fixed-point formatting keeps exponent notation (e.g. 5e-05) out of the command;
        # its 'e' could otherwise be read as an extruder (E) word by a G-code parser.
        words = [f"{axis}{value:.6f}" for axis, value in zip("XYZ", move) if value]
        if not words:
            return  # avoid moving and logging when no actual movement occurs

        axes = " ".join(words)
        logger.debug("Moving toolhead %s -> %s @ %s mm/s", "abs" if move_abs else "rel", axes, speed)
        self.gcode.run_script_from_command("SAVE_GCODE_STATE NAME=zooc_move_state")
        try:
            # G90 = absolute positioning, G91 = relative positioning
            self.gcode.run_script_from_command("G90" if move_abs else "G91")

            speed_mm_min = speed * 60.0  # Convert to mm/min
            self.gcode.run_script_from_command(f"G0 {axes} F{speed_mm_min}")
        finally:
            # Always restore the caller's positioning mode, even when the move fails
            self.gcode.run_script_from_command("RESTORE_GCODE_STATE NAME=zooc_move_state")

    def _get_curr_offset_positions(self) -> list[float]:
        """Get current offset positions.

        I.e. GET_POSITION's gcode homing. See Klipper gcode_move.py.

        :return: List of offsets ('x', 'y', 'z', 'e').
        """
        status = self.gcode_move.get_status()
        offsets: list[float] = status.get('homing_origin')
        return offsets

    def _get_curr_positions(self) -> list[float]:
        """Get current positions.

        The toolhead is used to get the fundamental position (rather than the
        'position' entry of the gcode_move status).

        :return: List of positions ('x', 'y', 'z', 'e').
        """
        return self.toolhead.get_position()  # type: ignore