Coverage for src / klipper_utils / klipper_module.py: 18%
96 statements
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 21:45 +0000
« prev ^ index » next coverage.py v7.13.0, created at 2025-12-11 21:45 +0000
1"""Klipper module with some helper functions."""
3import logging
4from typing import Any
6from zooc.data.temps import Temps
8from .klipper_utils import Command, ConfigWrapper, Gcmd, KlipperUtils, KlipperUtilsError
10logger = logging.getLogger(__name__)
class KlipperModule:
    """Generic base Klipper module with some helper methods.

    Bundles the printer, ``gcode`` and ``gcode_move`` objects looked up from the Klipper config
    and offers helpers for moving the toolhead, handling bed/extruder temperatures and
    registering G-code commands under a common prefix.
    """

    def __init__(self, config: ConfigWrapper, cmd_prefix: str) -> None:
        """Initialize the module.

        :param config: Klipper config object.
        :param cmd_prefix: Common prefix used in all registered commands.
        """
        self.name = config.get_name()
        self.printer = config.get_printer()  # klippy/klippy.py
        self.gcode = self.printer.lookup_object('gcode')  # klippy/gcode.py
        self.gcode_move = self.printer.lookup_object('gcode_move')  # klippy/extras/gcode_move.py
        self.cmd_prefix = cmd_prefix

    def get_time(self) -> float:
        """Readable helper to get Klipper time.

        :return: Klipper time in seconds (the reactor's monotonic clock).
        """
        return self.printer.get_reactor().monotonic()  # type: ignore

    @property
    def toolhead(self) -> Any:
        """Get the toolhead.

        :return: Toolhead object.
        :raises KlipperUtilsError: If the printer is not ready.
        """
        # Pass an explicit default: without one, Klipper's lookup_object() raises its own
        # config error for a missing object, so the None check below could never trigger.
        toolhead = self.printer.lookup_object("toolhead", None)
        if toolhead is None:
            raise KlipperUtilsError("Printer not ready")
        return toolhead

    def check_homed_xyz(self, gcmd: Gcmd) -> None:
        """Check the required axes are homed. Raise error if not.

        :param gcmd: G-code object.
        :raises gcmd.error: Klipper exception when some of the X, Y or Z axes are not homed.
        """
        homed_axes = self.toolhead.get_kinematics().get_status(self.get_time())['homed_axes']
        if any(axis not in homed_axes for axis in 'xyz'):
            raise gcmd.error("Must home X, Y and Z axes first")

    def move_relative(self, move_by: tuple[float | None, ...], speed: float) -> None:
        """Move by the given relative XYZ-distances.

        :param move_by: Relative XYZ-distances to move by; None (or zero) keeps that axis stationary.
        :param speed: Move with speed mm/s.
        """
        self._move(move_abs=False, move=move_by, speed=speed)

    def move_absolute(self, move_to: tuple[float | None, ...], speed: float) -> None:
        """Move to given absolute XYZ-position.

        NOTE: a coordinate of exactly 0 is also treated as "stay stationary" (see ``_move``),
        so an axis cannot be sent to 0 through this method.

        :param move_to: XYZ-coordinates to move to; None keeps that axis stationary.
        :param speed: Move with speed mm/s.
        """
        self._move(move_abs=True, move=move_to, speed=speed)

    def move_inside(self, move_inside: tuple[tuple[float, float] | None, ...], speed: float) -> None:
        """Move inside the given coordinate ranges. Do not move if already inside.

        Each axis that lies outside its range is moved to the nearest range limit.

        :param move_inside: Min/max ranges for XYZ-coordinates to move inside to or None to stay stationary for that axis.
        :param speed: Move with speed mm/s.
        """
        position = self.toolhead.get_position()
        # Every axis starts as "stay"; only axes outside their range receive a target, so
        # nothing at all is sent when the toolhead is already inside all given ranges.
        move_to: list[float | None] = [None] * len(position)
        for index, inside in enumerate(move_inside):
            if not inside:
                continue
            low, high = inside[0], inside[1]
            current = position[index]
            if low <= current <= high:
                continue
            move_to[index] = low if current < low else high
        # A range limit of exactly 0.0 is a real target here, so zero must not mean "stay".
        self._move(move_abs=True, move=tuple(move_to), speed=speed, zero_is_stationary=False)

    def get_temperatures(self) -> tuple[Temps, Temps]:
        """Read current and target bed and extruder temperatures.

        :return: Tuple of current and target temperatures.
        """
        heaters = self.printer.lookup_object('heaters')
        eventtime = self.get_time()
        bed_temp_curr, bed_temp_target = heaters.lookup_heater('heater_bed').get_temp(eventtime)
        extruder_temp_curr, extruder_temp_target = heaters.lookup_heater('extruder').get_temp(eventtime)
        current = Temps.klipper(bed_temp_curr, extruder_temp_curr)
        target = Temps.klipper(bed_temp_target, extruder_temp_target)
        return current, target

    def set_temperatures(self, gcmd: Gcmd, temps: Temps) -> None:
        """Set bed and extruder temperatures. Does not wait for those to reach the target.

        Does nothing (and reports nothing) when neither bed nor extruder is enabled in ``temps``.

        :param gcmd: G-code object.
        :param temps: Set temperatures.
        """
        if not temps.bed.enabled and not temps.extruder.enabled:
            return

        gcmd.respond_info(f"Heating: {temps}")
        if temps.bed.enabled:
            self.gcode.run_script_from_command(f'M140 S{temps.bed_temp}')  # Set bed temperature: M140 S<temperature>
        if temps.extruder.enabled:
            self.gcode.run_script_from_command(f'M104 S{temps.extruder_temp}')  # Set extruder temperature: M104 [T<index>] [S<temperature>]

    def wait_temperatures(self, gcmd: Gcmd, temps: Temps, tolerance_min: float, tolerance_max: float) -> None:
        """Wait temperatures to reach given level. Does not set them.

        Does nothing when neither bed nor extruder is enabled in ``temps``.

        :param gcmd: G-code object.
        :param temps: Temperatures to wait. Usually the target temperature.
        :param tolerance_min: Allowed negative offset in degrees.
        :param tolerance_max: Allowed positive offset in degrees.
        :raises gcmd.error: If a tolerance is negative.
        """
        if not temps.bed.enabled and not temps.extruder.enabled:
            return

        gcmd.respond_info(f"Waiting: {temps}")
        # Raise explicitly instead of asserting: an assert made this function just return,
        # with the exception apparently being swallowed by Klipper.
        if tolerance_min < 0.0 or tolerance_max < 0.0:
            raise gcmd.error("Tolerance must be positive")

        if temps.bed.enabled:
            # Build the command once so the logged text is exactly what gets executed.
            bed_wait = f'TEMPERATURE_WAIT SENSOR=heater_bed MINIMUM={temps.bed_temp - tolerance_min} MAXIMUM={temps.bed_temp + tolerance_max}'
            logger.debug(bed_wait)
            self.gcode.run_script_from_command(bed_wait)
        if temps.extruder.enabled:
            extruder_wait = f'TEMPERATURE_WAIT SENSOR=extruder MINIMUM={temps.extruder_temp - tolerance_min} MAXIMUM={temps.extruder_temp + tolerance_max}'
            logger.debug(extruder_wait)
            self.gcode.run_script_from_command(extruder_wait)

    def set_wait_temperatures(self, gcmd: Gcmd, temps: Temps, tolerance_min: float, tolerance_max: float) -> None:
        """Set and wait for temperatures to reach given level.

        :param gcmd: G-code object.
        :param temps: Temperatures to wait. Usually the target temperature.
        :param tolerance_min: Allowed negative offset in degrees.
        :param tolerance_max: Allowed positive offset in degrees.
        """
        self.set_temperatures(gcmd=gcmd, temps=temps)
        self.wait_temperatures(gcmd=gcmd, temps=temps, tolerance_min=tolerance_min, tolerance_max=tolerance_max)

    # noinspection PyUnusedLocal
    # pylint: disable=unused-argument
    def wait(self, gcmd: Gcmd, duration: float) -> None:
        """Wait for given time by dwelling the toolhead.

        :param gcmd: G-code object (unused).
        :param duration: Duration to wait [s].
        """
        self.toolhead.dwell(duration)

    def _register_command(self, function: Command, help_text: str) -> None:
        """Register a G-code command with Klipper.

        The command name will be the cmd_prefix plus the upper-cased function name without '_cmd_'.

        :param function: Command function accepting gcmd and optional parameters.
        :param help_text: Help to be printed with HELP code.
        """
        cmd_name = f"{self.cmd_prefix}{function.__name__.removeprefix('_cmd_').upper()}"
        KlipperUtils.register_command(gcode=self.gcode, function=function, cmd_name=cmd_name, help_text=help_text, fail_on_error=True)

    def _move(self, move_abs: bool, move: tuple[float | None, ...], speed: float, *, zero_is_stationary: bool = True) -> None:
        """Move to positions in defined axis.

        The move is issued as G-code between SAVE_GCODE_STATE and RESTORE_GCODE_STATE, so the
        G90/G91 mode selected here does not leak out to the caller.

        :param move_abs: True: Move to an absolute coordinate. False: Move relative distance.
        :param move: Absolute/relative XYZ-coordinates or None/0 to stay stationary.
        :param speed: Move with speed mm/s.
        :param zero_is_stationary: True (default): a value of 0 keeps the axis stationary, like None.
            False: only None keeps the axis stationary, so 0 is a real target.
        """
        # positions are formatted in a list: [x pos, y pos, z pos, extruder pos]
        if move_abs:
            move_to_merge = self.toolhead.get_position().copy()
        else:
            move_to_merge = [0.0] * 4

        has_value = False
        for index, value in enumerate(move):
            stay = (not value) if zero_is_stationary else (value is None)
            if stay:
                continue
            move_to_merge[index] = value
            has_value = True

        if not has_value:
            return  # avoid moving and logging when no actual movement occurs

        # Single quotes inside the f-string: reusing the outer quote type needs Python 3.12+.
        logger.debug(f"Moving toolhead {'abs' if move_abs else 'rel'} -> {move_to_merge} @ {speed} mm/s")
        self.gcode.run_script_from_command("SAVE_GCODE_STATE NAME=zooc_move_state")
        try:
            if move_abs:
                self.gcode.run_script_from_command("G90")
            else:
                self.gcode.run_script_from_command("G91")

            speed_mm_min = speed * 60.0  # Convert to mm/min
            self.gcode.run_script_from_command(f"G0 X{move_to_merge[0]} Y{move_to_merge[1]} Z{move_to_merge[2]} F{speed_mm_min}")
        finally:
            # Always restore the saved G-code state, even when the move itself fails.
            self.gcode.run_script_from_command("RESTORE_GCODE_STATE NAME=zooc_move_state")

    def _get_curr_offset_positions(self) -> list[float]:
        """Get current offset positions.

        I.e. GET_POSITION's gcode homing. See Klipper gcode_move.py.

        :return: List of offsets ('x', 'y', 'z', 'e'), read from the 'homing_origin' status entry.
        """
        dict_status = self.gcode_move.get_status()
        list_pos: list[float] = dict_status.get('homing_origin')
        return list_pos

    def _get_curr_positions(self) -> list[float]:
        """Get current positions.

        See Klipper gcode.py.

        :return: List of positions ('x', 'y', 'z', 'e').
        """
        # Use the toolhead to get the fundamental position, instead of the 'position'
        # entry of the gcode_move status.
        return self.toolhead.get_position()  # type: ignore