From 25ee651226970bd6e82121729cf42e8a528247b5 Mon Sep 17 00:00:00 2001 From: CChen616 <714434105@qq.com> Date: Fri, 23 Aug 2024 10:45:24 +0800 Subject: [PATCH] repair missing update --- klippy/extras/adxl345.py | 841 ++++++++++++--------- klippy/extras/angle.py | 1156 ++++++++++++++--------------- klippy/extras/endstop_phase.py | 462 ++++++------ klippy/extras/extruder_stepper.py | 42 +- klippy/extras/manual_probe.py | 25 +- klippy/extras/resonance_tester.py | 129 ++-- klippy/extras/save_variables.py | 23 + klippy/extras/shaper_calibrate.py | 160 +++- klippy/pins.py | 5 + 9 files changed, 1560 insertions(+), 1283 deletions(-) diff --git a/klippy/extras/adxl345.py b/klippy/extras/adxl345.py index 4ef8df0..7325e30 100644 --- a/klippy/extras/adxl345.py +++ b/klippy/extras/adxl345.py @@ -1,10 +1,10 @@ # Support for reading acceleration data from an adxl345 chip # -# Copyright (C) 2020-2021 Kevin O'Connor +# Copyright (C) 2020 Kevin O'Connor # # This file may be distributed under the terms of the GNU GPLv3 license. -import logging, time, collections, threading, multiprocessing, os -from . import bus, motion_report +import logging, math, time, collections, multiprocessing, os +from . import bus, manual_probe, probe # ADXL345 registers REG_DEVID = 0x00 @@ -14,6 +14,15 @@ REG_DATA_FORMAT = 0x31 REG_FIFO_CTL = 0x38 REG_MOD_READ = 0x80 REG_MOD_MULTI = 0x40 +REG_THRESH_TAP = 0x1D +REG_DUR = 0x21 +REG_INT_MAP = 0x2F +REG_TAP_AXES = 0x2A +REG_OFSX = 0x1E +REG_OFSY = 0x1F +REG_OFSZ = 0x20 +REG_INT_ENABLE = 0x2E +REG_INT_SOURCE = 0x30 QUERY_RATES = { 25: 0x8, 50: 0x9, 100: 0xa, 200: 0xb, 400: 0xc, @@ -21,66 +30,67 @@ QUERY_RATES = { } ADXL345_DEV_ID = 0xe5 -SET_FIFO_CTL = 0x90 FREEFALL_ACCEL = 9.80665 * 1000. SCALE = 0.0039 * FREEFALL_ACCEL # 3.9mg/LSB * Earth gravity in mm/s**2 +DUR_SCALE = 0.000625 # 0.625 msec / LSB +TAP_SCALE = 0.0625 * FREEFALL_ACCEL # 62.5mg/LSB * Earth gravity in mm/s**2 +OFS_SCALE = 0.0156 * FREEFALL_ACCEL # 15.6mg/LSB * Earth gravity in mm/s**2 + +PROBE_CALIBRATION_TIME = 1. +ADXL345_REST_TIME = .01 Accel_Measurement = collections.namedtuple( 'Accel_Measurement', ('time', 'accel_x', 'accel_y', 'accel_z')) -# Helper class to obtain measurements -class AccelQueryHelper: - def __init__(self, printer, cconn): - self.printer = printer - self.cconn = cconn - print_time = printer.lookup_object('toolhead').get_last_move_time() - self.request_start_time = self.request_end_time = print_time - self.samples = self.raw_samples = [] - def finish_measurements(self): - toolhead = self.printer.lookup_object('toolhead') - self.request_end_time = toolhead.get_last_move_time() - toolhead.wait_moves() - self.cconn.finalize() - def _get_raw_samples(self): - raw_samples = self.cconn.get_messages() - if raw_samples: - self.raw_samples = raw_samples - return self.raw_samples - def has_valid_samples(self): - raw_samples = self._get_raw_samples() - for msg in raw_samples: - data = msg['params']['data'] - first_sample_time = data[0][0] - last_sample_time = data[-1][0] - if (first_sample_time > self.request_end_time - or last_sample_time < self.request_start_time): - continue - # The time intervals [first_sample_time, last_sample_time] - # and [request_start_time, request_end_time] have non-zero - # intersection. It is still theoretically possible that none - # of the samples from raw_samples fall into the time interval - # [request_start_time, request_end_time] if it is too narrow - # or on very heavy data losses. In practice, that interval - # is at least 1 second, so this possibility is negligible. - return True - return False - def get_samples(self): - raw_samples = self._get_raw_samples() - if not raw_samples: +# Sample results +class ADXL345Results: + def __init__(self): + self.raw_samples = None + self.samples = [] + self.drops = self.overflows = 0 + self.time_per_sample = self.start_range = self.end_range = 0. + def get_stats(self): + return ("drops=%d,overflows=%d" + ",time_per_sample=%.9f,start_range=%.6f,end_range=%.6f" + % (self.drops, self.overflows, + self.time_per_sample, self.start_range, self.end_range)) + def setup_data(self, axes_map, raw_samples, end_sequence, overflows, + start1_time, start2_time, end1_time, end2_time): + if not raw_samples or not end_sequence: + return + self.axes_map = axes_map + self.raw_samples = raw_samples + self.overflows = overflows + self.start2_time = start2_time + self.start_range = start2_time - start1_time + self.end_range = end2_time - end1_time + self.total_count = (end_sequence - 1) * 8 + len(raw_samples[-1][1]) // 6 + total_time = end2_time - start2_time + self.time_per_sample = time_per_sample = total_time / self.total_count + self.seq_to_time = time_per_sample * 8. + actual_count = sum([len(data)//6 for _, data in raw_samples]) + self.drops = self.total_count - actual_count + def decode_samples(self): + if not self.raw_samples: return self.samples - total = sum([len(m['params']['data']) for m in raw_samples]) - count = 0 - self.samples = samples = [None] * total - for msg in raw_samples: - for samp_time, x, y, z in msg['params']['data']: - if samp_time < self.request_start_time: - continue - if samp_time > self.request_end_time: - break - samples[count] = Accel_Measurement(samp_time, x, y, z) - count += 1 - del samples[count:] + (x_pos, x_scale), (y_pos, y_scale), (z_pos, z_scale) = self.axes_map + actual_count = 0 + self.samples = samples = [None] * self.total_count + for seq, data in self.raw_samples: + d = bytearray(data) + count = len(data) + sdata = [(d[i] | (d[i+1] << 8)) - ((d[i+1] & 0x80) << 9) + for i in range(0, count-1, 2)] + seq_time = self.start2_time + seq * self.seq_to_time + for i in range(count//6): + samp_time = seq_time + i * self.time_per_sample + x = sdata[i*3 + x_pos] * x_scale + y = sdata[i*3 + y_pos] * y_scale + z = sdata[i*3 + z_pos] * z_scale + samples[actual_count] = Accel_Measurement(samp_time, x, y, z) + actual_count += 1 + del samples[actual_count:] return self.samples def write_to_file(self, filename): def write_impl(): @@ -90,8 +100,9 @@ class AccelQueryHelper: except: pass f = open(filename, "w") - f.write("#time,accel_x,accel_y,accel_z\n") - samples = self.samples or self.get_samples() + f.write("##%s\n#time,accel_x,accel_y,accel_z\n" % ( + self.get_stats(),)) + samples = self.samples or self.decode_samples() for t, accel_x, accel_y, accel_z in samples: f.write("%.6f,%.6f,%.6f,%.6f\n" % ( t, accel_x, accel_y, accel_z)) @@ -100,184 +111,383 @@ class AccelQueryHelper: write_proc.daemon = True write_proc.start() -# Helper class for G-Code commands -class AccelCommandHelper: - def __init__(self, config, chip): +class BedOffsetHelper: + def __init__(self, config): self.printer = config.get_printer() - self.chip = chip - self.bg_client = None - name_parts = config.get_name().split() - self.base_name = name_parts[0] - self.name = name_parts[-1] - self.register_commands(self.name) - if len(name_parts) == 1: - if self.name == "adxl345" or not config.has_section("adxl345"): - self.register_commands(None) - def register_commands(self, name): + # Register BED_OFFSET_CALIBRATE command + zconfig = config.getsection('stepper_z') + self.z_position_endstop = zconfig.getfloat('position_endstop', None, + note_valid=False) + if self.z_position_endstop is None: + return + self.bed_probe_point = None + if config.get('bed_probe_point', None) is not None: + try: + self.bed_probe_point = [ + float(coord.strip()) for coord in + config.get('bed_probe_point').split(',', 1)] + except: + raise config.error( + "Unable to parse bed_probe_point '%s'" % ( + config.get('bed_probe_point'))) + self.horizontal_move_z = config.getfloat( + 'horizontal_move_z', 5.) + self.horizontal_move_speed = config.getfloat( + 'horizontal_move_speed', 50., above=0.) + gcode = self.printer.lookup_object('gcode') + gcode.register_command( + 'BED_OFFSET_CALIBRATE', self.cmd_BED_OFFSET_CALIBRATE, + desc=self.cmd_BED_OFFSET_CALIBRATE_help) + def bed_offset_finalize(self, pos, gcmd): + if pos is None: + return + z_pos = self.z_position_endstop - pos[2] + gcmd.respond_info( + "stepper_z: position_endstop: %.3f\n" + "The SAVE_CONFIG command will update the printer config file\n" + "with the above and restart the printer." % (z_pos,)) + configfile = self.printer.lookup_object('configfile') + configfile.set('stepper_z', 'position_endstop', "%.3f" % (z_pos,)) + cmd_BED_OFFSET_CALIBRATE_help = "Calibrate a bed offset using ADXL345 probe" + def cmd_BED_OFFSET_CALIBRATE(self, gcmd): + manual_probe.verify_no_manual_probe(self.printer) + probe = self.printer.lookup_object('probe') + lift_speed = probe.get_lift_speed(gcmd) + toolhead = self.printer.lookup_object('toolhead') + oldpos = toolhead.get_position() + if self.bed_probe_point is not None: + toolhead.manual_move([None, None, self.horizontal_move_z], + lift_speed) + toolhead.manual_move(self.bed_probe_point + [None], + self.horizontal_move_speed) + curpos = probe.run_probe(gcmd) + offset_pos = [0., 0., curpos[2] - probe.get_offsets()[2]] + if self.bed_probe_point is not None: + curpos[2] = self.horizontal_move_z + else: + curpos[2] = oldpos[2] + toolhead.manual_move(curpos, lift_speed) + self.bed_offset_finalize(offset_pos, gcmd) + +# ADXL345 virtual endstop wrapper for probing +class ADXL345EndstopWrapper: + def __init__(self, config, adxl345, axes_map): + self.printer = config.get_printer() + self.printer.register_event_handler("klippy:connect", self.calibrate) + self.calibrated = False + self.adxl345 = adxl345 + self.axes_map = axes_map + self.ofs_regs = (REG_OFSX, REG_OFSY, REG_OFSZ) + int_pin = config.get('int_pin').strip() + self.inverted = False + if int_pin.startswith('!'): + self.inverted = True + int_pin = int_pin[1:].strip() + if int_pin != 'int1' and int_pin != 'int2': + raise config.error('int_pin must specify one of int1 or int2 pins') + self.int_map = 0x40 if int_pin == 'int2' else 0x0 + probe_pin = config.get('probe_pin') + self.position_endstop = config.getfloat('z_offset') + self.tap_thresh = config.getfloat('tap_thresh', 5000, + minval=TAP_SCALE, maxval=100000.) + self.tap_dur = config.getfloat('tap_dur', 0.01, + above=DUR_SCALE, maxval=0.1) + self.next_cmd_time = self.action_end_time = 0. + # Create an "endstop" object to handle the sensor pin + ppins = self.printer.lookup_object('pins') + pin_params = ppins.lookup_pin(probe_pin, can_invert=True, + can_pullup=True) + mcu = pin_params['chip'] + mcu.register_config_callback(self._build_config) + self.mcu_endstop = mcu.setup_pin('endstop', pin_params) + # Wrappers + self.get_mcu = self.mcu_endstop.get_mcu + self.add_stepper = self.mcu_endstop.add_stepper + self.get_steppers = self.mcu_endstop.get_steppers + self.home_start = self.mcu_endstop.home_start + self.home_wait = self.mcu_endstop.home_wait + self.query_endstop = self.mcu_endstop.query_endstop # Register commands gcode = self.printer.lookup_object('gcode') - gcode.register_mux_command("ACCELEROMETER_MEASURE", "CHIP", name, - self.cmd_ACCELEROMETER_MEASURE, - desc=self.cmd_ACCELEROMETER_MEASURE_help) - gcode.register_mux_command("ACCELEROMETER_QUERY", "CHIP", name, - self.cmd_ACCELEROMETER_QUERY, - desc=self.cmd_ACCELEROMETER_QUERY_help) - gcode.register_mux_command("ACCELEROMETER_DEBUG_READ", "CHIP", name, - self.cmd_ACCELEROMETER_DEBUG_READ, - desc=self.cmd_ACCELEROMETER_DEBUG_READ_help) - gcode.register_mux_command("ACCELEROMETER_DEBUG_WRITE", "CHIP", name, - self.cmd_ACCELEROMETER_DEBUG_WRITE, - desc=self.cmd_ACCELEROMETER_DEBUG_WRITE_help) - cmd_ACCELEROMETER_MEASURE_help = "Start/stop accelerometer" - def cmd_ACCELEROMETER_MEASURE(self, gcmd): - if self.bg_client is None: - # Start measurements - self.bg_client = self.chip.start_internal_client() - gcmd.respond_info("accelerometer measurements started") + gcode.register_mux_command( + "ACCEL_PROBE_CALIBRATE", "CHIP", None, + self.cmd_ACCEL_PROBE_CALIBRATE, + desc=self.cmd_ACCEL_PROBE_CALIBRATE_help) + gcode.register_mux_command( + "SET_ACCEL_PROBE", "CHIP", None, self.cmd_SET_ACCEL_PROBE, + desc=self.cmd_SET_ACCEL_PROBE_help) + # Register bed offset calibration helper + BedOffsetHelper(config) + def _build_config(self): + kin = self.printer.lookup_object('toolhead').get_kinematics() + for stepper in kin.get_steppers(): + if stepper.is_active_axis('z'): + self.add_stepper(stepper) + def calibrate(self, gcmd=None, retries=3): + adxl345 = self.adxl345 + if not adxl345.is_initialized(): + # ADXL345 that works as a probe must be initialized from the start + adxl345.initialize() + adxl345.set_reg(REG_POWER_CTL, 0x00) + if self.inverted: + adxl345.set_reg(REG_DATA_FORMAT, 0x2B) + adxl345.set_reg(REG_INT_MAP, self.int_map) + adxl345.set_reg(REG_TAP_AXES, 0x7) + adxl345.set_reg(REG_THRESH_TAP, int(self.tap_thresh / TAP_SCALE)) + adxl345.set_reg(REG_DUR, int(self.tap_dur / DUR_SCALE)) + # Offset freefall accleration on the true Z axis + for reg in self.ofs_regs: + adxl345.set_reg(reg, 0x00) + adxl345.start_measurements() + reactor = self.printer.get_reactor() + reactor.register_callback(lambda ev: self._offset_axes(gcmd, retries), + reactor.monotonic() + PROBE_CALIBRATION_TIME) + def _offset_axes(self, gcmd, retries): + res = self.adxl345.finish_measurements() + msg_func = gcmd.respond_info if gcmd is not None else logging.info + samples = res.decode_samples() + x_ofs = sum([s.accel_x for s in samples]) / len(samples) + y_ofs = sum([s.accel_y for s in samples]) / len(samples) + z_ofs = sum([s.accel_z for s in samples]) / len(samples) + meas_freefall_accel = math.sqrt(x_ofs**2 + y_ofs**2 + z_ofs**2) + if abs(meas_freefall_accel - FREEFALL_ACCEL) > FREEFALL_ACCEL * 0.5: + err_msg = ("Calibration error: ADXL345 incorrectly measures " + "freefall accleration: %.0f (measured) vs %.0f " + "(expected)" % (meas_freefall_accel, FREEFALL_ACCEL)) + if retries > 0: + msg_func(err_msg + ", retrying (%d)" % (retries-1,)) + self.calibrate(gcmd, retries-1) + else: + msg_func(err_msg + ", aborting self-calibration") return - # End measurements - name = gcmd.get("NAME", time.strftime("%Y%m%d_%H%M%S")) - if not name.replace('-', '').replace('_', '').isalnum(): - raise gcmd.error("Invalid NAME parameter") - bg_client = self.bg_client - self.bg_client = None - bg_client.finish_measurements() - # Write data to file - if self.base_name == self.name: - filename = "/tmp/%s-%s.csv" % (self.base_name, name) - else: - filename = "/tmp/%s-%s-%s.csv" % (self.base_name, self.name, name) - bg_client.write_to_file(filename) - gcmd.respond_info("Writing raw accelerometer data to %s file" - % (filename,)) - cmd_ACCELEROMETER_QUERY_help = "Query accelerometer for the current values" - def cmd_ACCELEROMETER_QUERY(self, gcmd): - aclient = self.chip.start_internal_client() - self.printer.lookup_object('toolhead').dwell(1.) - aclient.finish_measurements() - values = aclient.get_samples() - if not values: - raise gcmd.error("No accelerometer measurements found") - _, accel_x, accel_y, accel_z = values[-1] - gcmd.respond_info("accelerometer values (x, y, z): %.6f, %.6f, %.6f" - % (accel_x, accel_y, accel_z)) - cmd_ACCELEROMETER_DEBUG_READ_help = "Query register (for debugging)" - def cmd_ACCELEROMETER_DEBUG_READ(self, gcmd): - reg = gcmd.get("REG", minval=0, maxval=126, parser=lambda x: int(x, 0)) - val = self.chip.read_reg(reg) - gcmd.respond_info("Accelerometer REG[0x%x] = 0x%x" % (reg, val)) - cmd_ACCELEROMETER_DEBUG_WRITE_help = "Set register (for debugging)" - def cmd_ACCELEROMETER_DEBUG_WRITE(self, gcmd): - reg = gcmd.get("REG", minval=0, maxval=126, parser=lambda x: int(x, 0)) - val = gcmd.get("VAL", minval=0, maxval=255, parser=lambda x: int(x, 0)) - self.chip.set_reg(reg, val) - -# Helper class for chip clock synchronization via linear regression -class ClockSyncRegression: - def __init__(self, mcu, chip_clock_smooth, decay = 1. / 20.): - self.mcu = mcu - self.chip_clock_smooth = chip_clock_smooth - self.decay = decay - self.last_chip_clock = self.last_exp_mcu_clock = 0. - self.mcu_clock_avg = self.mcu_clock_variance = 0. - self.chip_clock_avg = self.chip_clock_covariance = 0. - def reset(self, mcu_clock, chip_clock): - self.mcu_clock_avg = self.last_mcu_clock = mcu_clock - self.chip_clock_avg = chip_clock - self.mcu_clock_variance = self.chip_clock_covariance = 0. - self.last_chip_clock = self.last_exp_mcu_clock = 0. - def update(self, mcu_clock, chip_clock): - # Update linear regression - decay = self.decay - diff_mcu_clock = mcu_clock - self.mcu_clock_avg - self.mcu_clock_avg += decay * diff_mcu_clock - self.mcu_clock_variance = (1. - decay) * ( - self.mcu_clock_variance + diff_mcu_clock**2 * decay) - diff_chip_clock = chip_clock - self.chip_clock_avg - self.chip_clock_avg += decay * diff_chip_clock - self.chip_clock_covariance = (1. - decay) * ( - self.chip_clock_covariance + diff_mcu_clock*diff_chip_clock*decay) - def set_last_chip_clock(self, chip_clock): - base_mcu, base_chip, inv_cfreq = self.get_clock_translation() - self.last_chip_clock = chip_clock - self.last_exp_mcu_clock = base_mcu + (chip_clock-base_chip) * inv_cfreq - def get_clock_translation(self): - inv_chip_freq = self.mcu_clock_variance / self.chip_clock_covariance - if not self.last_chip_clock: - return self.mcu_clock_avg, self.chip_clock_avg, inv_chip_freq - # Find mcu clock associated with future chip_clock - s_chip_clock = self.last_chip_clock + self.chip_clock_smooth - scdiff = s_chip_clock - self.chip_clock_avg - s_mcu_clock = self.mcu_clock_avg + scdiff * inv_chip_freq - # Calculate frequency to converge at future point - mdiff = s_mcu_clock - self.last_exp_mcu_clock - s_inv_chip_freq = mdiff / self.chip_clock_smooth - return self.last_exp_mcu_clock, self.last_chip_clock, s_inv_chip_freq - def get_time_translation(self): - base_mcu, base_chip, inv_cfreq = self.get_clock_translation() - clock_to_print_time = self.mcu.clock_to_print_time - base_time = clock_to_print_time(base_mcu) - inv_freq = clock_to_print_time(base_mcu + inv_cfreq) - base_time - return base_time, base_chip, inv_freq - -MIN_MSG_TIME = 0.100 - -BYTES_PER_SAMPLE = 5 -SAMPLES_PER_BLOCK = 10 + x_m = max([abs(s.accel_x - x_ofs) for s in samples]) + y_m = max([abs(s.accel_y - y_ofs) for s in samples]) + z_m = max([abs(s.accel_z - z_ofs) for s in samples]) + accel_noise = max(x_m, y_m, z_m) + if accel_noise > self.tap_thresh: + err_msg = ("Calibration error: ADXL345 noise level too high for " + "the configured tap_thresh: %.0f (tap_thresh) vs " + "%.0f (noise)" % (self.tap_thresh, accel_noise)) + if retries > 0: + msg_func(err_msg + ", retrying (%d)" % (retries-1,)) + self.calibrate(gcmd, retries-1) + else: + msg_func(err_msg + ", aborting self-calibration") + return + for ofs, axis in zip((x_ofs, y_ofs, z_ofs), (0, 1, 2)): + ofs_reg = self.ofs_regs[self.axes_map[axis][0]] + ofs_val = 0xFF & int(round( + -ofs / self.axes_map[axis][1] * (SCALE / OFS_SCALE))) + self.adxl345.set_reg(ofs_reg, ofs_val) + msg_func("Successfully calibrated ADXL345") + self.calibrated = True + def multi_probe_begin(self): + pass + def multi_probe_end(self): + pass + def _try_clear_tap(self): + adxl345 = self.adxl345 + tries = 8 + while tries > 0: + val = adxl345.read_reg(REG_INT_SOURCE) + if not (val & 0x40): + return True + tries -= 1 + return False + def probe_prepare(self, hmove): + if not self.calibrated: + raise self.printer.command_error( + "ADXL345 probe failed calibration, " + "retry with ACCEL_PROBE_CALIBRATE command") + adxl345 = self.adxl345 + toolhead = self.printer.lookup_object('toolhead') + toolhead.flush_step_generation() + print_time = toolhead.get_last_move_time() + clock = self.adxl345.get_mcu().print_time_to_clock(print_time + + ADXL345_REST_TIME) + if not adxl345.is_initialized(): + adxl345.initialize() + adxl345.set_reg(REG_INT_ENABLE, 0x00, minclock=clock) + adxl345.read_reg(REG_INT_SOURCE) + adxl345.set_reg(REG_INT_ENABLE, 0x40, minclock=clock) + if not adxl345.is_measuring(): + adxl345.set_reg(REG_POWER_CTL, 0x08, minclock=clock) + if not self._try_clear_tap(): + raise self.printer.command_error( + "ADXL345 tap triggered before move, too sensitive?") + def probe_finish(self, hmove): + adxl345 = self.adxl345 + toolhead = self.printer.lookup_object('toolhead') + toolhead.dwell(ADXL345_REST_TIME) + print_time = toolhead.get_last_move_time() + clock = adxl345.get_mcu().print_time_to_clock(print_time) + adxl345.set_reg(REG_INT_ENABLE, 0x00, minclock=clock) + if not adxl345.is_measuring(): + adxl345.set_reg(REG_POWER_CTL, 0x00) + if not self._try_clear_tap(): + raise self.printer.command_error( + "ADXL345 tap triggered after move, too sensitive?") + cmd_ACCEL_PROBE_CALIBRATE_help = "Force ADXL345 probe [re-]calibration" + def cmd_ACCEL_PROBE_CALIBRATE(self, gcmd): + self.calibrate(gcmd) + cmd_SET_ACCEL_PROBE_help = "Configure ADXL345 parameters related to probing" + def cmd_SET_ACCEL_PROBE(self, gcmd): + self.tap_thresh = gcmd.get_float('TAP_THRESH', self.tap_thresh, + minval=TAP_SCALE, maxval=100000.) + self.tap_dur = config.getfloat('TAP_DUR', self.tap_dur, + above=DUR_SCALE, maxval=0.1) + adxl345.set_reg(REG_THRESH_TAP, int(self.tap_thresh / TAP_SCALE)) + adxl345.set_reg(REG_DUR, int(self.tap_dur / DUR_SCALE)) # Printer class that controls ADXL345 chip class ADXL345: def __init__(self, config): self.printer = config.get_printer() - AccelCommandHelper(config, self) self.query_rate = 0 + self.last_tx_time = 0. + self.config = config am = {'x': (0, SCALE), 'y': (1, SCALE), 'z': (2, SCALE), '-x': (0, -SCALE), '-y': (1, -SCALE), '-z': (2, -SCALE)} - axes_map = config.getlist('axes_map', ('x','y','z'), count=3) - if any([a not in am for a in axes_map]): + axes_map = config.get('axes_map', 'x,y,z').split(',') + if len(axes_map) != 3 or any([a.strip() not in am for a in axes_map]): raise config.error("Invalid adxl345 axes_map parameter") self.axes_map = [am[a.strip()] for a in axes_map] self.data_rate = config.getint('rate', 3200) if self.data_rate not in QUERY_RATES: raise config.error("Invalid rate parameter: %d" % (self.data_rate,)) # Measurement storage (accessed from background thread) - self.lock = threading.Lock() self.raw_samples = [] + self.last_sequence = 0 + self.samples_start1 = self.samples_start2 = 0. # Setup mcu sensor_adxl345 bulk query code self.spi = bus.MCU_SPI_from_config(config, 3, default_speed=5000000) self.mcu = mcu = self.spi.get_mcu() self.oid = oid = mcu.create_oid() - self.query_adxl345_cmd = self.query_adxl345_end_cmd = None - self.query_adxl345_status_cmd = None + self.query_adxl345_cmd = self.query_adxl345_end_cmd =None mcu.add_config_cmd("config_adxl345 oid=%d spi_oid=%d" % (oid, self.spi.get_oid())) mcu.add_config_cmd("query_adxl345 oid=%d clock=0 rest_ticks=0" % (oid,), on_restart=True) mcu.register_config_callback(self._build_config) + mcu.register_response(self._handle_adxl345_start, "adxl345_start", oid) mcu.register_response(self._handle_adxl345_data, "adxl345_data", oid) - # Clock tracking - self.last_sequence = self.max_query_duration = 0 - self.last_limit_count = self.last_error_count = 0 - self.clock_sync = ClockSyncRegression(self.mcu, 640) - # API server endpoints - self.api_dump = motion_report.APIDumpHelper( - self.printer, self._api_update, self._api_startstop, 0.100) - self.name = config.get_name().split()[-1] - wh = self.printer.lookup_object('webhooks') - wh.register_mux_endpoint("adxl345/dump_adxl345", "sensor", self.name, - self._handle_dump_adxl345) + # Register commands + self.name = "default" + if len(config.get_name().split()) > 1: + self.name = config.get_name().split()[1] + gcode = self.printer.lookup_object('gcode') + gcode.register_mux_command("ACCELEROMETER_MEASURE", "CHIP", self.name, + self.cmd_ACCELEROMETER_MEASURE, + desc=self.cmd_ACCELEROMETER_MEASURE_help) + gcode.register_mux_command("ACCELEROMETER_QUERY", "CHIP", self.name, + self.cmd_ACCELEROMETER_QUERY, + desc=self.cmd_ACCELEROMETER_QUERY_help) + gcode.register_mux_command("ADXL345_DEBUG_READ", "CHIP", self.name, + self.cmd_ADXL345_DEBUG_READ, + desc=self.cmd_ADXL345_DEBUG_READ_help) + gcode.register_mux_command("ADXL345_DEBUG_WRITE", "CHIP", self.name, + self.cmd_ADXL345_DEBUG_WRITE, + desc=self.cmd_ADXL345_DEBUG_WRITE_help) + gcode.register_mux_command("TEST_MYCMD", "CHIP", self.name, + self.cmd_TEST_MYCMD, + desc=self.cmd_TEST_MYCMD_help) + gcode.register_mux_command("MKS_PROBE", "CHIP", self.name, + self.cmd_MKS_PROBE, + desc=self.cmd_MKS_PROBE_help) + if self.name == "default": + gcode.register_mux_command("ACCELEROMETER_MEASURE", "CHIP", None, + self.cmd_ACCELEROMETER_MEASURE) + gcode.register_mux_command("ACCELEROMETER_QUERY", "CHIP", None, + self.cmd_ACCELEROMETER_QUERY) + gcode.register_mux_command("ADXL345_DEBUG_READ", "CHIP", None, + self.cmd_ADXL345_DEBUG_READ, + desc=self.cmd_ADXL345_DEBUG_READ_help) + gcode.register_mux_command("ADXL345_DEBUG_WRITE", "CHIP", None, + self.cmd_ADXL345_DEBUG_WRITE, + desc=self.cmd_ADXL345_DEBUG_WRITE_help) + gcode.register_mux_command("TEST_MYCMD", "CHIP", None, + self.cmd_TEST_MYCMD, + desc=self.cmd_TEST_MYCMD_help) + gcode.register_mux_command("MKS_PROBE", "CHIP", None, + self.cmd_MKS_PROBE, + desc=self.cmd_MKS_PROBE_help) + + self.adxl345_endstop = ADXL345EndstopWrapper(self.config, self, self.axes_map) + cmd_MKS_PROBE_help = "test my cmd" + def cmd_MKS_PROBE(self, gcmd): + gcmd.respond_info("go to mks Probe success") + self.printer.remove_object('probe') + self.printer.lookup_object('gcode').remove_command('PROBE') + self.printer.lookup_object('gcode').remove_command('QUERY_PROBE') + self.printer.lookup_object('gcode').remove_command('PROBE_CALIBRATE') + self.printer.lookup_object('gcode').remove_command('PROBE_ACCURACY') + self.printer.lookup_object('gcode').remove_command('Z_OFFSET_APPLY_PROBE') + self.printer.lookup_object('gcode').remove_command('MKS_SHOW_Z_OFFSET') + self.printer.lookup_object('pins').remove_chip('probe') + self.printer.add_object('probe', probe.load_config(self.probe_config)) + # self.printer.lookup_object('probe').multi_probe_end() + cmd_TEST_MYCMD_help = "test my cmd" + def cmd_TEST_MYCMD(self, gcmd): + gcmd.respond_info("TEST MY CMD success") + # if config.get('probe_pin', None) is not None: + # adxl345_endstop = ADXL345EndstopWrapper(config, self, self.axes_map) + self.probe_config = self.printer.lookup_object('probe').config + # self.printer.add_object('probebak', None) + # self.printer.copy_object('probe', 'probebak') + self.printer.remove_object('probe') + self.printer.lookup_object('gcode').remove_command('PROBE') + self.printer.lookup_object('gcode').remove_command('QUERY_PROBE') + self.printer.lookup_object('gcode').remove_command('PROBE_CALIBRATE') + self.printer.lookup_object('gcode').remove_command('PROBE_ACCURACY') + self.printer.lookup_object('gcode').remove_command('Z_OFFSET_APPLY_PROBE') + self.printer.lookup_object('gcode').remove_command('MKS_SHOW_Z_OFFSET') + self.printer.lookup_object('pins').remove_chip('probe') + self.printer.add_object('probe', probe.PrinterProbe(self.config, self.adxl345_endstop)) + def is_initialized(self): + # In case of miswiring, testing ADXL345 device ID prevents treating + # noise or wrong signal as a correctly initialized device + return (self.read_reg(REG_DEVID) == ADXL345_DEV_ID and + (self.read_reg(REG_DATA_FORMAT) & 0xB) != 0) + def initialize(self): + # Setup ADXL345 parameters and verify chip connectivity + self.set_reg(REG_POWER_CTL, 0x00) + dev_id = self.read_reg(REG_DEVID) + if dev_id != ADXL345_DEV_ID: + raise self.printer.command_error("Invalid adxl345 id (got %x vs %x)" + % (dev_id, ADXL345_DEV_ID)) + self.set_reg(REG_DATA_FORMAT, 0x0B) + def get_mcu(self): + return self.spi.get_mcu() def _build_config(self): - cmdqueue = self.spi.get_command_queue() self.query_adxl345_cmd = self.mcu.lookup_command( - "query_adxl345 oid=%c clock=%u rest_ticks=%u", cq=cmdqueue) + "query_adxl345 oid=%c clock=%u rest_ticks=%u", + cq=self.spi.get_command_queue()) self.query_adxl345_end_cmd = self.mcu.lookup_query_command( "query_adxl345 oid=%c clock=%u rest_ticks=%u", - "adxl345_status oid=%c clock=%u query_ticks=%u next_sequence=%hu" - " buffered=%c fifo=%c limit_count=%hu", oid=self.oid, cq=cmdqueue) - self.query_adxl345_status_cmd = self.mcu.lookup_query_command( - "query_adxl345_status oid=%c", - "adxl345_status oid=%c clock=%u query_ticks=%u next_sequence=%hu" - " buffered=%c fifo=%c limit_count=%hu", oid=self.oid, cq=cmdqueue) + "adxl345_end oid=%c end1_clock=%u end2_clock=%u" + " limit_count=%hu sequence=%hu", + oid=self.oid, cq=self.spi.get_command_queue()) + def _clock_to_print_time(self, clock): + return self.mcu.clock_to_print_time(self.mcu.clock32_to_clock64(clock)) + def _handle_adxl345_start(self, params): + self.samples_start1 = self._clock_to_print_time(params['start1_clock']) + self.samples_start2 = self._clock_to_print_time(params['start2_clock']) + def _handle_adxl345_data(self, params): + last_sequence = self.last_sequence + sequence = (last_sequence & ~0xffff) | params['sequence'] + if sequence < last_sequence: + sequence += 0x10000 + self.last_sequence = sequence + raw_samples = self.raw_samples + if len(raw_samples) >= 300000: + # Avoid filling up memory with too many samples + return + raw_samples.append((sequence, params['data'])) + def _convert_sequence(self, sequence): + sequence = (self.last_sequence & ~0xffff) | sequence + if sequence < self.last_sequence: + sequence += 0x10000 + return sequence def read_reg(self, reg): params = self.spi.spi_transfer([reg | REG_MOD_READ, 0x00]) response = bytearray(params['response']) @@ -291,151 +501,116 @@ class ADXL345: "This is generally indicative of connection problems " "(e.g. faulty wiring) or a faulty adxl345 chip." % ( reg, val, stored_val)) - # Measurement collection def is_measuring(self): return self.query_rate > 0 - def _handle_adxl345_data(self, params): - with self.lock: - self.raw_samples.append(params) - def _extract_samples(self, raw_samples): - # Load variables to optimize inner loop below - (x_pos, x_scale), (y_pos, y_scale), (z_pos, z_scale) = self.axes_map - last_sequence = self.last_sequence - time_base, chip_base, inv_freq = self.clock_sync.get_time_translation() - # Process every message in raw_samples - count = seq = 0 - samples = [None] * (len(raw_samples) * SAMPLES_PER_BLOCK) - for params in raw_samples: - seq_diff = (last_sequence - params['sequence']) & 0xffff - seq_diff -= (seq_diff & 0x8000) << 1 - seq = last_sequence - seq_diff - d = bytearray(params['data']) - msg_cdiff = seq * SAMPLES_PER_BLOCK - chip_base - for i in range(len(d) // BYTES_PER_SAMPLE): - d_xyz = d[i*BYTES_PER_SAMPLE:(i+1)*BYTES_PER_SAMPLE] - xlow, ylow, zlow, xzhigh, yzhigh = d_xyz - if yzhigh & 0x80: - self.last_error_count += 1 - continue - rx = (xlow | ((xzhigh & 0x1f) << 8)) - ((xzhigh & 0x10) << 9) - ry = (ylow | ((yzhigh & 0x1f) << 8)) - ((yzhigh & 0x10) << 9) - rz = ((zlow | ((xzhigh & 0xe0) << 3) | ((yzhigh & 0xe0) << 6)) - - ((yzhigh & 0x40) << 7)) - raw_xyz = (rx, ry, rz) - x = round(raw_xyz[x_pos] * x_scale, 6) - y = round(raw_xyz[y_pos] * y_scale, 6) - z = round(raw_xyz[z_pos] * z_scale, 6) - ptime = round(time_base + (msg_cdiff + i) * inv_freq, 6) - samples[count] = (ptime, x, y, z) - count += 1 - self.clock_sync.set_last_chip_clock(seq * SAMPLES_PER_BLOCK + i) - del samples[count:] - return samples - def _update_clock(self, minclock=0): - # Query current state - for retry in range(5): - params = self.query_adxl345_status_cmd.send([self.oid], - minclock=minclock) - fifo = params['fifo'] & 0x7f - if fifo <= 32: - break - else: - raise self.printer.command_error("Unable to query adxl345 fifo") - mcu_clock = self.mcu.clock32_to_clock64(params['clock']) - sequence = (self.last_sequence & ~0xffff) | params['next_sequence'] - if sequence < self.last_sequence: - sequence += 0x10000 - self.last_sequence = sequence - buffered = params['buffered'] - limit_count = (self.last_limit_count & ~0xffff) | params['limit_count'] - if limit_count < self.last_limit_count: - limit_count += 0x10000 - self.last_limit_count = limit_count - duration = params['query_ticks'] - if duration > self.max_query_duration: - # Skip measurement as a high query time could skew clock tracking - self.max_query_duration = max(2 * self.max_query_duration, - self.mcu.seconds_to_clock(.000005)) - return - self.max_query_duration = 2 * duration - msg_count = (sequence * SAMPLES_PER_BLOCK - + buffered // BYTES_PER_SAMPLE + fifo) - # The "chip clock" is the message counter plus .5 for average - # inaccuracy of query responses and plus .5 for assumed offset - # of adxl345 hw processing time. - chip_clock = msg_count + 1 - self.clock_sync.update(mcu_clock + duration // 2, chip_clock) - def _start_measurements(self): + def start_measurements(self, rate=None): if self.is_measuring(): return - # In case of miswiring, testing ADXL345 device ID prevents treating - # noise or wrong signal as a correctly initialized device - dev_id = self.read_reg(REG_DEVID) - if dev_id != ADXL345_DEV_ID: - raise self.printer.command_error( - "Invalid adxl345 id (got %x vs %x).\n" - "This is generally indicative of connection problems\n" - "(e.g. faulty wiring) or a faulty adxl345 chip." - % (dev_id, ADXL345_DEV_ID)) + rate = rate or self.data_rate + if not self.is_initialized(): + self.initialize() # Setup chip in requested query rate - self.set_reg(REG_POWER_CTL, 0x00) - self.set_reg(REG_DATA_FORMAT, 0x0B) + clock = 0 + if self.last_tx_time: + clock = self.mcu.print_time_to_clock(self.last_tx_time) + self.set_reg(REG_POWER_CTL, 0x00, minclock=clock) self.set_reg(REG_FIFO_CTL, 0x00) - self.set_reg(REG_BW_RATE, QUERY_RATES[self.data_rate]) - self.set_reg(REG_FIFO_CTL, SET_FIFO_CTL) + self.set_reg(REG_BW_RATE, QUERY_RATES[rate]) + self.set_reg(REG_FIFO_CTL, 0x80) # Setup samples - with self.lock: - self.raw_samples = [] + print_time = self.printer.lookup_object('toolhead').get_last_move_time() + self.raw_samples = [] + self.last_sequence = 0 + self.samples_start1 = self.samples_start2 = print_time # Start bulk reading - systime = self.printer.get_reactor().monotonic() - print_time = self.mcu.estimated_print_time(systime) + MIN_MSG_TIME reqclock = self.mcu.print_time_to_clock(print_time) - rest_ticks = self.mcu.seconds_to_clock(4. / self.data_rate) - self.query_rate = self.data_rate + rest_ticks = self.mcu.seconds_to_clock(4. / rate) + self.last_tx_time = print_time + self.query_rate = rate self.query_adxl345_cmd.send([self.oid, reqclock, rest_ticks], reqclock=reqclock) - logging.info("ADXL345 starting '%s' measurements", self.name) - # Initialize clock tracking - self.last_sequence = 0 - self.last_limit_count = self.last_error_count = 0 - self.clock_sync.reset(reqclock, 0) - self.max_query_duration = 1 << 31 - self._update_clock(minclock=reqclock) - self.max_query_duration = 1 << 31 - def _finish_measurements(self): + def finish_measurements(self): + if not self.is_measuring(): + return ADXL345Results() + # Halt bulk reading + print_time = self.printer.lookup_object('toolhead').get_last_move_time() + clock = self.mcu.print_time_to_clock(print_time) + params = self.query_adxl345_end_cmd.send([self.oid, 0, 0], + minclock=clock) + self.last_tx_time = print_time + self.query_rate = 0 + raw_samples = self.raw_samples + self.raw_samples = [] + # Generate results + end1_time = self._clock_to_print_time(params['end1_clock']) + end2_time = self._clock_to_print_time(params['end2_clock']) + end_sequence = self._convert_sequence(params['sequence']) + overflows = params['limit_count'] + res = ADXL345Results() + res.setup_data(self.axes_map, raw_samples, end_sequence, overflows, + self.samples_start1, self.samples_start2, + end1_time, end2_time) + logging.info("ADXL345 finished %d measurements: %s", + res.total_count, res.get_stats()) + return res + def end_query(self, name, gcmd): if not self.is_measuring(): return - # Halt bulk reading - params = self.query_adxl345_end_cmd.send([self.oid, 0, 0]) - self.query_rate = 0 - with self.lock: - self.raw_samples = [] - logging.info("ADXL345 finished '%s' measurements", self.name) - # API interface - def _api_update(self, eventtime): - self._update_clock() - with self.lock: - raw_samples = self.raw_samples - self.raw_samples = [] - if not raw_samples: - return {} - samples = self._extract_samples(raw_samples) - if not samples: - return {} - return {'data': samples, 'errors': self.last_error_count, - 'overflows': self.last_limit_count} - def _api_startstop(self, is_start): - if is_start: - self._start_measurements() + res = self.finish_measurements() + # Write data to file + if self.name == "default": + filename = "/tmp/adxl345-%s.csv" % (name,) else: - self._finish_measurements() - def _handle_dump_adxl345(self, web_request): - self.api_dump.add_client(web_request) - hdr = ('time', 'x_acceleration', 'y_acceleration', 'z_acceleration') - web_request.send({'header': hdr}) - def start_internal_client(self): - cconn = self.api_dump.add_internal_client() - return AccelQueryHelper(self.printer, cconn) + filename = "/tmp/adxl345-%s-%s.csv" % (self.name, name,) + res.write_to_file(filename) + gcmd.respond_info( + "Writing raw accelerometer data to %s file" % (filename,)) + cmd_ACCELEROMETER_MEASURE_help = "Start/stop accelerometer" + def cmd_ACCELEROMETER_MEASURE(self, gcmd): + if self.is_measuring(): + name = gcmd.get("NAME", time.strftime("%Y%m%d_%H%M%S")) + if not name.replace('-', '').replace('_', '').isalnum(): + raise gcmd.error("Invalid adxl345 NAME parameter") + self.end_query(name, gcmd) + gcmd.respond_info("adxl345 measurements stopped") + else: + rate = gcmd.get_int("RATE", self.data_rate) + if rate not in QUERY_RATES: + raise gcmd.error("Not a valid adxl345 query rate: %d" % (rate,)) + self.start_measurements(rate) + gcmd.respond_info("adxl345 measurements started") + cmd_ACCELEROMETER_QUERY_help = "Query accelerometer for the current values" + def cmd_ACCELEROMETER_QUERY(self, gcmd): + if self.is_measuring(): + raise gcmd.error("adxl345 measurements in progress") + self.start_measurements() + reactor = self.printer.get_reactor() + eventtime = starttime = reactor.monotonic() + while not self.raw_samples: + eventtime = reactor.pause(eventtime + .1) + if eventtime > starttime + 3.: + # Try to shutdown the measurements + self.finish_measurements() + raise gcmd.error("Timeout reading adxl345 data") + result = self.finish_measurements() + values = result.decode_samples() + _, accel_x, accel_y, accel_z = values[-1] + gcmd.respond_info("adxl345 values (x, y, z): %.6f, %.6f, %.6f" % ( + accel_x, accel_y, accel_z)) + cmd_ADXL345_DEBUG_READ_help = "Query accelerometer register (for debugging)" + def cmd_ADXL345_DEBUG_READ(self, gcmd): + if self.is_measuring(): + raise gcmd.error("adxl345 measurements in progress") + reg = gcmd.get("REG", minval=29, maxval=57, parser=lambda x: int(x, 0)) + val = self.read_reg(reg) + gcmd.respond_info("ADXL345 REG[0x%x] = 0x%x" % (reg, val)) + cmd_ADXL345_DEBUG_WRITE_help = "Set accelerometer register (for debugging)" + def cmd_ADXL345_DEBUG_WRITE(self, gcmd): + if self.is_measuring(): + raise gcmd.error("adxl345 measurements in progress") + reg = gcmd.get("REG", minval=29, maxval=57, parser=lambda x: int(x, 0)) + val = gcmd.get("VAL", minval=0, maxval=255, parser=lambda x: int(x, 0)) + self.set_reg(reg, val) def load_config(config): return ADXL345(config) diff --git a/klippy/extras/angle.py b/klippy/extras/angle.py index d61a763..1d15457 100644 --- a/klippy/extras/angle.py +++ b/klippy/extras/angle.py @@ -1,578 +1,578 @@ -# Support for reading SPI magnetic angle sensors -# -# Copyright (C) 2021,2022 Kevin O'Connor -# -# This file may be distributed under the terms of the GNU GPLv3 license. -import logging, math, threading -from . import bus, motion_report - -MIN_MSG_TIME = 0.100 -TCODE_ERROR = 0xff - -TRINAMIC_DRIVERS = ["tmc2130", "tmc2208", "tmc2209", "tmc2660", "tmc5160"] - -CALIBRATION_BITS = 6 # 64 entries -ANGLE_BITS = 16 # angles range from 0..65535 - -class AngleCalibration: - def __init__(self, config): - self.printer = config.get_printer() - self.name = config.get_name() - self.stepper_name = config.get('stepper', None) - if self.stepper_name is None: - # No calibration - return - try: - import numpy - except: - raise config.error("Angle calibration requires numpy module") - sconfig = config.getsection(self.stepper_name) - sconfig.getint('microsteps', note_valid=False) - self.tmc_module = self.mcu_stepper = None - # Current calibration data - self.mcu_pos_offset = None - self.angle_phase_offset = 0. - self.calibration_reversed = False - self.calibration = [] - cal = config.get('calibrate', None) - if cal is not None: - data = [d.strip() for d in cal.split(',')] - angles = [float(d) for d in data if d] - self.load_calibration(angles) - # Register commands - self.printer.register_event_handler("stepper:sync_mcu_position", - self.handle_sync_mcu_pos) - self.printer.register_event_handler("klippy:connect", self.connect) - cname = self.name.split()[-1] - gcode = self.printer.lookup_object('gcode') - gcode.register_mux_command("ANGLE_CALIBRATE", "CHIP", - cname, self.cmd_ANGLE_CALIBRATE, - desc=self.cmd_ANGLE_CALIBRATE_help) - def handle_sync_mcu_pos(self, mcu_stepper): - if mcu_stepper.get_name() == self.stepper_name: - self.mcu_pos_offset = None - def calc_mcu_pos_offset(self, sample): - # Lookup phase information - mcu_phase_offset, phases = self.tmc_module.get_phase_offset() - if mcu_phase_offset is None: - return - # Find mcu position at time of sample - angle_time, angle_pos = sample - mcu_pos = self.mcu_stepper.get_past_mcu_position(angle_time) - # Convert angle_pos to mcu_pos units - microsteps, full_steps = self.get_microsteps() - angle_to_mcu_pos = full_steps * microsteps / float(1< phases//2: - phase_diff -= phases - # Store final offset - self.mcu_pos_offset = mcu_pos - (angle_mpos - phase_diff) - def apply_calibration(self, samples): - calibration = self.calibration - if not calibration: - return None - calibration_reversed = self.calibration_reversed - interp_bits = ANGLE_BITS - CALIBRATION_BITS - interp_mask = (1 << interp_bits) - 1 - interp_round = 1 << (interp_bits - 1) - for i, (samp_time, angle) in enumerate(samples): - bucket = (angle & 0xffff) >> interp_bits - cal1 = calibration[bucket] - cal2 = calibration[bucket + 1] - adj = (angle & interp_mask) * (cal2 - cal1) - adj = cal1 + ((adj + interp_round) >> interp_bits) - angle_diff = (angle - adj) & 0xffff - angle_diff -= (angle_diff & 0x8000) << 1 - new_angle = angle - angle_diff - if calibration_reversed: - new_angle = -new_angle - samples[i] = (samp_time, new_angle) - if self.mcu_pos_offset is None: - self.calc_mcu_pos_offset(samples[0]) - if self.mcu_pos_offset is None: - return None - return self.mcu_stepper.mcu_to_commanded_position(self.mcu_pos_offset) - def load_calibration(self, angles): - # Calculate linear intepolation calibration buckets by solving - # linear equations - angle_max = 1 << ANGLE_BITS - calibration_count = 1 << CALIBRATION_BITS - bucket_size = angle_max // calibration_count - full_steps = len(angles) - nominal_step = float(angle_max) / full_steps - self.angle_phase_offset = (angles.index(min(angles)) & 3) * nominal_step - self.calibration_reversed = angles[-2] > angles[-1] - if self.calibration_reversed: - angles = list(reversed(angles)) - first_step = angles.index(min(angles)) - angles = angles[first_step:] + angles[:first_step] - import numpy - eqs = numpy.zeros((full_steps, calibration_count)) - ans = numpy.zeros((full_steps,)) - for step, angle in enumerate(angles): - int_angle = int(angle + .5) % angle_max - bucket = int(int_angle / bucket_size) - bucket_start = bucket * bucket_size - ang_diff = angle - bucket_start - ang_diff_per = ang_diff / bucket_size - eq = eqs[step] - eq[bucket] = 1. - ang_diff_per - eq[(bucket + 1) % calibration_count] = ang_diff_per - ans[step] = float(step * nominal_step) - if bucket + 1 >= calibration_count: - ans[step] -= ang_diff_per * angle_max - sol = numpy.linalg.lstsq(eqs, ans, rcond=None)[0] - isol = [int(s + .5) for s in sol] - self.calibration = isol + [isol[0] + angle_max] - def lookup_tmc(self): - for driver in TRINAMIC_DRIVERS: - driver_name = "%s %s" % (driver, self.stepper_name) - module = self.printer.lookup_object(driver_name, None) - if module is not None: - return module - raise self.printer.command_error("Unable to find TMC driver for %s" - % (self.stepper_name,)) - def connect(self): - self.tmc_module = self.lookup_tmc() - fmove = self.printer.lookup_object('force_move') - self.mcu_stepper = fmove.lookup_stepper(self.stepper_name) - def get_microsteps(self): - configfile = self.printer.lookup_object('configfile') - sconfig = configfile.get_status(None)['settings'] - stconfig = sconfig.get(self.stepper_name, {}) - microsteps = stconfig['microsteps'] - full_steps = stconfig['full_steps_per_rotation'] - return microsteps, full_steps - def get_stepper_phase(self): - mcu_phase_offset, phases = self.tmc_module.get_phase_offset() - if mcu_phase_offset is None: - raise self.printer.command_error("Driver phase not known for %s" - % (self.stepper_name,)) - mcu_pos = self.mcu_stepper.get_mcu_position() - return (mcu_pos + mcu_phase_offset) % phases - def do_calibration_moves(self): - move = self.printer.lookup_object('force_move').manual_move - # Start data collection - angle_sensor = self.printer.lookup_object(self.name) - cconn = angle_sensor.start_internal_client() - # Move stepper several turns (to allow internal sensor calibration) - microsteps, full_steps = self.get_microsteps() - mcu_stepper = self.mcu_stepper - step_dist = mcu_stepper.get_step_dist() - full_step_dist = step_dist * microsteps - rotation_dist = full_steps * full_step_dist - align_dist = step_dist * self.get_stepper_phase() - move_time = 0.010 - move_speed = full_step_dist / move_time - move(mcu_stepper, -(rotation_dist+align_dist), move_speed) - move(mcu_stepper, 2. * rotation_dist, move_speed) - move(mcu_stepper, -2. * rotation_dist, move_speed) - move(mcu_stepper, .5 * rotation_dist - full_step_dist, move_speed) - # Move to each full step position - toolhead = self.printer.lookup_object('toolhead') - times = [] - samp_dist = full_step_dist - for i in range(2 * full_steps): - move(mcu_stepper, samp_dist, move_speed) - start_query_time = toolhead.get_last_move_time() + 0.050 - end_query_time = start_query_time + 0.050 - times.append((start_query_time, end_query_time)) - toolhead.dwell(0.150) - if i == full_steps-1: - # Reverse direction and test each full step again - move(mcu_stepper, .5 * rotation_dist, move_speed) - move(mcu_stepper, -.5 * rotation_dist + samp_dist, move_speed) - samp_dist = -samp_dist - move(mcu_stepper, .5*rotation_dist + align_dist, move_speed) - toolhead.wait_moves() - # Finish data collection - cconn.finalize() - msgs = cconn.get_messages() - # Correlate query responses - cal = {} - step = 0 - for msg in msgs: - for query_time, pos in msg['params']['data']: - # Add to step tracking - while step < len(times) and query_time > times[step][1]: - step += 1 - if step < len(times) and query_time >= times[step][0]: - cal.setdefault(step, []).append(pos) - if len(cal) != len(times): - raise self.printer.command_error( - "Failed calibration - incomplete sensor data") - fcal = { i: cal[i] for i in range(full_steps) } - rcal = { full_steps-i-1: cal[i+full_steps] for i in range(full_steps) } - return fcal, rcal - def calc_angles(self, meas): - total_count = total_variance = 0 - angles = {} - for step, data in meas.items(): - count = len(data) - angle_avg = float(sum(data)) / count - angles[step] = angle_avg - total_count += count - total_variance += sum([(d - angle_avg)**2 for d in data]) - return angles, math.sqrt(total_variance / total_count), total_count - cmd_ANGLE_CALIBRATE_help = "Calibrate angle sensor to stepper motor" - def cmd_ANGLE_CALIBRATE(self, gcmd): - # Perform calibration movement and capture - old_calibration = self.calibration - self.calibration = [] - try: - fcal, rcal = self.do_calibration_moves() - finally: - self.calibration = old_calibration - # Calculate each step position average and variance - microsteps, full_steps = self.get_microsteps() - fangles, fstd, ftotal = self.calc_angles(fcal) - rangles, rstd, rtotal = self.calc_angles(rcal) - if (len({a: i for i, a in fangles.items()}) != len(fangles) - or len({a: i for i, a in rangles.items()}) != len(rangles)): - raise self.printer.command_error( - "Failed calibration - sensor not updating for each step") - merged = { i: fcal[i] + rcal[i] for i in range(full_steps) } - angles, std, total = self.calc_angles(merged) - gcmd.respond_info("angle: stddev=%.3f (%.3f forward / %.3f reverse)" - " in %d queries" % (std, fstd, rstd, total)) - # Order data with lowest/highest magnet position first - anglist = [angles[i] % 0xffff for i in range(full_steps)] - if angles[0] > angles[1]: - first_ang = max(anglist) - else: - first_ang = min(anglist) - first_phase = anglist.index(first_ang) & ~3 - anglist = anglist[first_phase:] + anglist[:first_phase] - # Save results - cal_contents = [] - for i, angle in enumerate(anglist): - if not i % 8: - cal_contents.append('\n') - cal_contents.append("%.1f" % (angle,)) - cal_contents.append(',') - cal_contents.pop() - configfile = self.printer.lookup_object('configfile') - configfile.remove_section(self.name) - configfile.set(self.name, 'calibrate', ''.join(cal_contents)) - -class HelperA1333: - SPI_MODE = 3 - SPI_SPEED = 10000000 - def __init__(self, config, spi, oid): - self.spi = spi - self.is_tcode_absolute = False - self.last_temperature = None - def get_static_delay(self): - return .000001 - def start(self): - # Setup for angle query - self.spi.spi_transfer([0x32, 0x00]) - -class HelperAS5047D: - SPI_MODE = 1 - SPI_SPEED = int(1. / .000000350) - def __init__(self, config, spi, oid): - self.spi = spi - self.is_tcode_absolute = False - self.last_temperature = None - def get_static_delay(self): - return .000100 - def start(self): - # Clear any errors from device - self.spi.spi_transfer([0xff, 0xfc]) # Read DIAAGC - self.spi.spi_transfer([0x40, 0x01]) # Read ERRFL - self.spi.spi_transfer([0xc0, 0x00]) # Read NOP - -class HelperTLE5012B: - SPI_MODE = 1 - SPI_SPEED = 4000000 - def __init__(self, config, spi, oid): - self.printer = config.get_printer() - self.spi = spi - self.oid = oid - self.is_tcode_absolute = True - self.last_temperature = None - self.mcu = spi.get_mcu() - self.mcu.register_config_callback(self._build_config) - self.spi_angle_transfer_cmd = None - self.last_chip_mcu_clock = self.last_chip_clock = 0 - self.chip_freq = 0. - name = config.get_name().split()[-1] - gcode = self.printer.lookup_object("gcode") - gcode.register_mux_command("ANGLE_DEBUG_READ", "CHIP", name, - self.cmd_ANGLE_DEBUG_READ, - desc=self.cmd_ANGLE_DEBUG_READ_help) - gcode.register_mux_command("ANGLE_DEBUG_WRITE", "CHIP", name, - self.cmd_ANGLE_DEBUG_WRITE, - desc=self.cmd_ANGLE_DEBUG_WRITE_help) - def _build_config(self): - cmdqueue = self.spi.get_command_queue() - self.spi_angle_transfer_cmd = self.mcu.lookup_query_command( - "spi_angle_transfer oid=%c data=%*s", - "spi_angle_transfer_response oid=%c clock=%u response=%*s", - oid=self.oid, cq=cmdqueue) - def get_tcode_params(self): - return self.last_chip_mcu_clock, self.last_chip_clock, self.chip_freq - def _calc_crc(self, data): - crc = 0xff - for d in data: - crc ^= d - for i in range(8): - if crc & 0x80: - crc = (crc << 1) ^ 0x1d - else: - crc <<= 1 - return (~crc) & 0xff - def _send_spi(self, msg): - for retry in range(5): - if msg[0] & 0x04: - params = self.spi_angle_transfer_cmd.send([self.oid, msg]) - else: - params = self.spi.spi_transfer(msg) - resp = bytearray(params['response']) - crc = self._calc_crc(bytearray(msg[:2]) + resp[2:-2]) - if crc == resp[-1]: - return params - raise self.printer.command_error("Unable to query tle5012b chip") - def _read_reg(self, reg): - cw = 0x8000 | ((reg & 0x3f) << 4) | 0x01 - if reg >= 0x05 and reg <= 0x11: - cw |= 0x5000 - msg = [cw >> 8, cw & 0xff, 0, 0, 0, 0] - params = self._send_spi(msg) - resp = bytearray(params['response']) - return (resp[2] << 8) | resp[3] - def _write_reg(self, reg, val): - cw = ((reg & 0x3f) << 4) | 0x01 - if reg >= 0x05 and reg <= 0x11: - cw |= 0x5000 - msg = [cw >> 8, cw & 0xff, (val >> 8) & 0xff, val & 0xff, 0, 0] - for retry in range(5): - self._send_spi(msg) - rval = self._read_reg(reg) - if rval == val: - return - raise self.printer.command_error("Unable to write to tle5012b chip") - def _mask_reg(self, reg, off, on): - rval = self._read_reg(reg) - self._write_reg(reg, (rval & ~off) | on) - def _query_clock(self): - # Read frame counter (and normalize to a 16bit counter) - msg = [0x84, 0x42, 0, 0, 0, 0, 0, 0] # Read with latch, AREV and FSYNC - params = self._send_spi(msg) - resp = bytearray(params['response']) - mcu_clock = self.mcu.clock32_to_clock64(params['clock']) - chip_clock = ((resp[2] & 0x7e) << 9) | ((resp[4] & 0x3e) << 4) - # Calculate temperature - temper = resp[5] - ((resp[4] & 0x01) << 8) - self.last_temperature = (temper + 152) / 2.776 - return mcu_clock, chip_clock - def update_clock(self): - mcu_clock, chip_clock = self._query_clock() - mdiff = mcu_clock - self.last_chip_mcu_clock - chip_mclock = self.last_chip_clock + int(mdiff * self.chip_freq + .5) - cdiff = (chip_mclock - chip_clock) & 0xffff - cdiff -= (cdiff & 0x8000) << 1 - new_chip_clock = chip_mclock - cdiff - self.chip_freq = float(new_chip_clock - self.last_chip_clock) / mdiff - self.last_chip_clock = new_chip_clock - self.last_chip_mcu_clock = mcu_clock - def start(self): - # Clear any errors from device - self._read_reg(0x00) # Read STAT - # Initialize chip (so different chip variants work the same way) - self._mask_reg(0x06, 0xc003, 0x4000) # MOD1: 42.7us, IIF disable - self._mask_reg(0x08, 0x0007, 0x0001) # MOD2: Predict off, autocal=1 - self._mask_reg(0x0e, 0x0003, 0x0000) # MOD4: IIF mode - # Setup starting clock values - mcu_clock, chip_clock = self._query_clock() - self.last_chip_clock = chip_clock - self.last_chip_mcu_clock = mcu_clock - self.chip_freq = float(1<<5) / self.mcu.seconds_to_clock(1. / 750000.) - self.update_clock() - cmd_ANGLE_DEBUG_READ_help = "Query low-level angle sensor register" - def cmd_ANGLE_DEBUG_READ(self, gcmd): - reg = gcmd.get("REG", minval=0, maxval=0x30, parser=lambda x: int(x, 0)) - val = self._read_reg(reg) - gcmd.respond_info("ANGLE REG[0x%02x] = 0x%04x" % (reg, val)) - cmd_ANGLE_DEBUG_WRITE_help = "Set low-level angle sensor register" - def cmd_ANGLE_DEBUG_WRITE(self, gcmd): - reg = gcmd.get("REG", minval=0, maxval=0x30, parser=lambda x: int(x, 0)) - val = gcmd.get("VAL", minval=0, maxval=0xffff, - parser=lambda x: int(x, 0)) - self._write_reg(reg, val) - -SAMPLE_PERIOD = 0.000400 - -class Angle: - def __init__(self, config): - self.printer = config.get_printer() - self.sample_period = config.getfloat('sample_period', SAMPLE_PERIOD, - above=0.) - self.calibration = AngleCalibration(config) - # Measurement conversion - self.start_clock = self.time_shift = self.sample_ticks = 0 - self.last_sequence = self.last_angle = 0 - # Measurement storage (accessed from background thread) - self.lock = threading.Lock() - self.raw_samples = [] - # Sensor type - sensors = { "a1333": HelperA1333, "as5047d": HelperAS5047D, - "tle5012b": HelperTLE5012B } - sensor_type = config.getchoice('sensor_type', {s: s for s in sensors}) - sensor_class = sensors[sensor_type] - self.spi = bus.MCU_SPI_from_config(config, sensor_class.SPI_MODE, - default_speed=sensor_class.SPI_SPEED) - self.mcu = mcu = self.spi.get_mcu() - self.oid = oid = mcu.create_oid() - self.sensor_helper = sensor_class(config, self.spi, oid) - # Setup mcu sensor_spi_angle bulk query code - self.query_spi_angle_cmd = self.query_spi_angle_end_cmd = None - mcu.add_config_cmd( - "config_spi_angle oid=%d spi_oid=%d spi_angle_type=%s" - % (oid, self.spi.get_oid(), sensor_type)) - mcu.add_config_cmd( - "query_spi_angle oid=%d clock=0 rest_ticks=0 time_shift=0" - % (oid,), on_restart=True) - mcu.register_config_callback(self._build_config) - mcu.register_response(self._handle_spi_angle_data, - "spi_angle_data", oid) - # API server endpoints - self.api_dump = motion_report.APIDumpHelper( - self.printer, self._api_update, self._api_startstop, 0.100) - self.name = config.get_name().split()[1] - wh = self.printer.lookup_object('webhooks') - wh.register_mux_endpoint("angle/dump_angle", "sensor", self.name, - self._handle_dump_angle) - def _build_config(self): - freq = self.mcu.seconds_to_clock(1.) - while float(TCODE_ERROR << self.time_shift) / freq < 0.002: - self.time_shift += 1 - cmdqueue = self.spi.get_command_queue() - self.query_spi_angle_cmd = self.mcu.lookup_command( - "query_spi_angle oid=%c clock=%u rest_ticks=%u time_shift=%c", - cq=cmdqueue) - self.query_spi_angle_end_cmd = self.mcu.lookup_query_command( - "query_spi_angle oid=%c clock=%u rest_ticks=%u time_shift=%c", - "spi_angle_end oid=%c sequence=%hu", oid=self.oid, cq=cmdqueue) - def get_status(self, eventtime=None): - return {'temperature': self.sensor_helper.last_temperature} - # Measurement collection - def is_measuring(self): - return self.start_clock != 0 - def _handle_spi_angle_data(self, params): - with self.lock: - self.raw_samples.append(params) - def _extract_samples(self, raw_samples): - # Load variables to optimize inner loop below - sample_ticks = self.sample_ticks - start_clock = self.start_clock - clock_to_print_time = self.mcu.clock_to_print_time - last_sequence = self.last_sequence - last_angle = self.last_angle - time_shift = 0 - static_delay = 0. - last_chip_mcu_clock = last_chip_clock = chip_freq = inv_chip_freq = 0. - is_tcode_absolute = self.sensor_helper.is_tcode_absolute - if is_tcode_absolute: - tparams = self.sensor_helper.get_tcode_params() - last_chip_mcu_clock, last_chip_clock, chip_freq = tparams - inv_chip_freq = 1. / chip_freq - else: - time_shift = self.time_shift - static_delay = self.sensor_helper.get_static_delay() - # Process every message in raw_samples - count = error_count = 0 - samples = [None] * (len(raw_samples) * 16) - for params in raw_samples: - seq = (last_sequence & ~0xffff) | params['sequence'] - if seq < last_sequence: - seq += 0x10000 - last_sequence = seq - d = bytearray(params['data']) - msg_mclock = start_clock + seq*16*sample_ticks - for i in range(len(d) // 3): - tcode = d[i*3] - if tcode == TCODE_ERROR: - error_count += 1 - continue - raw_angle = d[i*3 + 1] | (d[i*3 + 2] << 8) - angle_diff = (last_angle - raw_angle) & 0xffff - angle_diff -= (angle_diff & 0x8000) << 1 - last_angle -= angle_diff - mclock = msg_mclock + i*sample_ticks - if is_tcode_absolute: - # tcode is tle5012b frame counter - mdiff = mclock - last_chip_mcu_clock - chip_mclock = last_chip_clock + int(mdiff * chip_freq + .5) - cdiff = ((tcode << 10) - chip_mclock) & 0xffff - cdiff -= (cdiff & 0x8000) << 1 - sclock = mclock + (cdiff - 0x800) * inv_chip_freq - else: - # tcode is mcu clock offset shifted by time_shift - sclock = mclock + (tcode< +# +# This file may be distributed under the terms of the GNU GPLv3 license. +import logging, math, threading +from . import bus, motion_report + +MIN_MSG_TIME = 0.100 +TCODE_ERROR = 0xff + +TRINAMIC_DRIVERS = ["tmc2130", "tmc2208", "tmc2209", "tmc2240", "tmc2660", "tmc5160"] + +CALIBRATION_BITS = 6 # 64 entries +ANGLE_BITS = 16 # angles range from 0..65535 + +class AngleCalibration: + def __init__(self, config): + self.printer = config.get_printer() + self.name = config.get_name() + self.stepper_name = config.get('stepper', None) + if self.stepper_name is None: + # No calibration + return + try: + import numpy + except: + raise config.error("Angle calibration requires numpy module") + sconfig = config.getsection(self.stepper_name) + sconfig.getint('microsteps', note_valid=False) + self.tmc_module = self.mcu_stepper = None + # Current calibration data + self.mcu_pos_offset = None + self.angle_phase_offset = 0. + self.calibration_reversed = False + self.calibration = [] + cal = config.get('calibrate', None) + if cal is not None: + data = [d.strip() for d in cal.split(',')] + angles = [float(d) for d in data if d] + self.load_calibration(angles) + # Register commands + self.printer.register_event_handler("stepper:sync_mcu_position", + self.handle_sync_mcu_pos) + self.printer.register_event_handler("klippy:connect", self.connect) + cname = self.name.split()[-1] + gcode = self.printer.lookup_object('gcode') + gcode.register_mux_command("ANGLE_CALIBRATE", "CHIP", + cname, self.cmd_ANGLE_CALIBRATE, + desc=self.cmd_ANGLE_CALIBRATE_help) + def handle_sync_mcu_pos(self, mcu_stepper): + if mcu_stepper.get_name() == self.stepper_name: + self.mcu_pos_offset = None + def calc_mcu_pos_offset(self, sample): + # Lookup phase information + mcu_phase_offset, phases = self.tmc_module.get_phase_offset() + if mcu_phase_offset is None: + return + # Find mcu position at time of sample + angle_time, angle_pos = sample + mcu_pos = self.mcu_stepper.get_past_mcu_position(angle_time) + # Convert angle_pos to mcu_pos units + microsteps, full_steps = self.get_microsteps() + angle_to_mcu_pos = full_steps * microsteps / float(1< phases//2: + phase_diff -= phases + # Store final offset + self.mcu_pos_offset = mcu_pos - (angle_mpos - phase_diff) + def apply_calibration(self, samples): + calibration = self.calibration + if not calibration: + return None + calibration_reversed = self.calibration_reversed + interp_bits = ANGLE_BITS - CALIBRATION_BITS + interp_mask = (1 << interp_bits) - 1 + interp_round = 1 << (interp_bits - 1) + for i, (samp_time, angle) in enumerate(samples): + bucket = (angle & 0xffff) >> interp_bits + cal1 = calibration[bucket] + cal2 = calibration[bucket + 1] + adj = (angle & interp_mask) * (cal2 - cal1) + adj = cal1 + ((adj + interp_round) >> interp_bits) + angle_diff = (angle - adj) & 0xffff + angle_diff -= (angle_diff & 0x8000) << 1 + new_angle = angle - angle_diff + if calibration_reversed: + new_angle = -new_angle + samples[i] = (samp_time, new_angle) + if self.mcu_pos_offset is None: + self.calc_mcu_pos_offset(samples[0]) + if self.mcu_pos_offset is None: + return None + return self.mcu_stepper.mcu_to_commanded_position(self.mcu_pos_offset) + def load_calibration(self, angles): + # Calculate linear intepolation calibration buckets by solving + # linear equations + angle_max = 1 << ANGLE_BITS + calibration_count = 1 << CALIBRATION_BITS + bucket_size = angle_max // calibration_count + full_steps = len(angles) + nominal_step = float(angle_max) / full_steps + self.angle_phase_offset = (angles.index(min(angles)) & 3) * nominal_step + self.calibration_reversed = angles[-2] > angles[-1] + if self.calibration_reversed: + angles = list(reversed(angles)) + first_step = angles.index(min(angles)) + angles = angles[first_step:] + angles[:first_step] + import numpy + eqs = numpy.zeros((full_steps, calibration_count)) + ans = numpy.zeros((full_steps,)) + for step, angle in enumerate(angles): + int_angle = int(angle + .5) % angle_max + bucket = int(int_angle / bucket_size) + bucket_start = bucket * bucket_size + ang_diff = angle - bucket_start + ang_diff_per = ang_diff / bucket_size + eq = eqs[step] + eq[bucket] = 1. - ang_diff_per + eq[(bucket + 1) % calibration_count] = ang_diff_per + ans[step] = float(step * nominal_step) + if bucket + 1 >= calibration_count: + ans[step] -= ang_diff_per * angle_max + sol = numpy.linalg.lstsq(eqs, ans, rcond=None)[0] + isol = [int(s + .5) for s in sol] + self.calibration = isol + [isol[0] + angle_max] + def lookup_tmc(self): + for driver in TRINAMIC_DRIVERS: + driver_name = "%s %s" % (driver, self.stepper_name) + module = self.printer.lookup_object(driver_name, None) + if module is not None: + return module + raise self.printer.command_error("Unable to find TMC driver for %s" + % (self.stepper_name,)) + def connect(self): + self.tmc_module = self.lookup_tmc() + fmove = self.printer.lookup_object('force_move') + self.mcu_stepper = fmove.lookup_stepper(self.stepper_name) + def get_microsteps(self): + configfile = self.printer.lookup_object('configfile') + sconfig = configfile.get_status(None)['settings'] + stconfig = sconfig.get(self.stepper_name, {}) + microsteps = stconfig['microsteps'] + full_steps = stconfig['full_steps_per_rotation'] + return microsteps, full_steps + def get_stepper_phase(self): + mcu_phase_offset, phases = self.tmc_module.get_phase_offset() + if mcu_phase_offset is None: + raise self.printer.command_error("Driver phase not known for %s" + % (self.stepper_name,)) + mcu_pos = self.mcu_stepper.get_mcu_position() + return (mcu_pos + mcu_phase_offset) % phases + def do_calibration_moves(self): + move = self.printer.lookup_object('force_move').manual_move + # Start data collection + angle_sensor = self.printer.lookup_object(self.name) + cconn = angle_sensor.start_internal_client() + # Move stepper several turns (to allow internal sensor calibration) + microsteps, full_steps = self.get_microsteps() + mcu_stepper = self.mcu_stepper + step_dist = mcu_stepper.get_step_dist() + full_step_dist = step_dist * microsteps + rotation_dist = full_steps * full_step_dist + align_dist = step_dist * self.get_stepper_phase() + move_time = 0.010 + move_speed = full_step_dist / move_time + move(mcu_stepper, -(rotation_dist+align_dist), move_speed) + move(mcu_stepper, 2. * rotation_dist, move_speed) + move(mcu_stepper, -2. * rotation_dist, move_speed) + move(mcu_stepper, .5 * rotation_dist - full_step_dist, move_speed) + # Move to each full step position + toolhead = self.printer.lookup_object('toolhead') + times = [] + samp_dist = full_step_dist + for i in range(2 * full_steps): + move(mcu_stepper, samp_dist, move_speed) + start_query_time = toolhead.get_last_move_time() + 0.050 + end_query_time = start_query_time + 0.050 + times.append((start_query_time, end_query_time)) + toolhead.dwell(0.150) + if i == full_steps-1: + # Reverse direction and test each full step again + move(mcu_stepper, .5 * rotation_dist, move_speed) + move(mcu_stepper, -.5 * rotation_dist + samp_dist, move_speed) + samp_dist = -samp_dist + move(mcu_stepper, .5*rotation_dist + align_dist, move_speed) + toolhead.wait_moves() + # Finish data collection + cconn.finalize() + msgs = cconn.get_messages() + # Correlate query responses + cal = {} + step = 0 + for msg in msgs: + for query_time, pos in msg['params']['data']: + # Add to step tracking + while step < len(times) and query_time > times[step][1]: + step += 1 + if step < len(times) and query_time >= times[step][0]: + cal.setdefault(step, []).append(pos) + if len(cal) != len(times): + raise self.printer.command_error( + "Failed calibration - incomplete sensor data") + fcal = { i: cal[i] for i in range(full_steps) } + rcal = { full_steps-i-1: cal[i+full_steps] for i in range(full_steps) } + return fcal, rcal + def calc_angles(self, meas): + total_count = total_variance = 0 + angles = {} + for step, data in meas.items(): + count = len(data) + angle_avg = float(sum(data)) / count + angles[step] = angle_avg + total_count += count + total_variance += sum([(d - angle_avg)**2 for d in data]) + return angles, math.sqrt(total_variance / total_count), total_count + cmd_ANGLE_CALIBRATE_help = "Calibrate angle sensor to stepper motor" + def cmd_ANGLE_CALIBRATE(self, gcmd): + # Perform calibration movement and capture + old_calibration = self.calibration + self.calibration = [] + try: + fcal, rcal = self.do_calibration_moves() + finally: + self.calibration = old_calibration + # Calculate each step position average and variance + microsteps, full_steps = self.get_microsteps() + fangles, fstd, ftotal = self.calc_angles(fcal) + rangles, rstd, rtotal = self.calc_angles(rcal) + if (len({a: i for i, a in fangles.items()}) != len(fangles) + or len({a: i for i, a in rangles.items()}) != len(rangles)): + raise self.printer.command_error( + "Failed calibration - sensor not updating for each step") + merged = { i: fcal[i] + rcal[i] for i in range(full_steps) } + angles, std, total = self.calc_angles(merged) + gcmd.respond_info("angle: stddev=%.3f (%.3f forward / %.3f reverse)" + " in %d queries" % (std, fstd, rstd, total)) + # Order data with lowest/highest magnet position first + anglist = [angles[i] % 0xffff for i in range(full_steps)] + if angles[0] > angles[1]: + first_ang = max(anglist) + else: + first_ang = min(anglist) + first_phase = anglist.index(first_ang) & ~3 + anglist = anglist[first_phase:] + anglist[:first_phase] + # Save results + cal_contents = [] + for i, angle in enumerate(anglist): + if not i % 8: + cal_contents.append('\n') + cal_contents.append("%.1f" % (angle,)) + cal_contents.append(',') + cal_contents.pop() + configfile = self.printer.lookup_object('configfile') + configfile.remove_section(self.name) + configfile.set(self.name, 'calibrate', ''.join(cal_contents)) + +class HelperA1333: + SPI_MODE = 3 + SPI_SPEED = 10000000 + def __init__(self, config, spi, oid): + self.spi = spi + self.is_tcode_absolute = False + self.last_temperature = None + def get_static_delay(self): + return .000001 + def start(self): + # Setup for angle query + self.spi.spi_transfer([0x32, 0x00]) + +class HelperAS5047D: + SPI_MODE = 1 + SPI_SPEED = int(1. / .000000350) + def __init__(self, config, spi, oid): + self.spi = spi + self.is_tcode_absolute = False + self.last_temperature = None + def get_static_delay(self): + return .000100 + def start(self): + # Clear any errors from device + self.spi.spi_transfer([0xff, 0xfc]) # Read DIAAGC + self.spi.spi_transfer([0x40, 0x01]) # Read ERRFL + self.spi.spi_transfer([0xc0, 0x00]) # Read NOP + +class HelperTLE5012B: + SPI_MODE = 1 + SPI_SPEED = 4000000 + def __init__(self, config, spi, oid): + self.printer = config.get_printer() + self.spi = spi + self.oid = oid + self.is_tcode_absolute = True + self.last_temperature = None + self.mcu = spi.get_mcu() + self.mcu.register_config_callback(self._build_config) + self.spi_angle_transfer_cmd = None + self.last_chip_mcu_clock = self.last_chip_clock = 0 + self.chip_freq = 0. + name = config.get_name().split()[-1] + gcode = self.printer.lookup_object("gcode") + gcode.register_mux_command("ANGLE_DEBUG_READ", "CHIP", name, + self.cmd_ANGLE_DEBUG_READ, + desc=self.cmd_ANGLE_DEBUG_READ_help) + gcode.register_mux_command("ANGLE_DEBUG_WRITE", "CHIP", name, + self.cmd_ANGLE_DEBUG_WRITE, + desc=self.cmd_ANGLE_DEBUG_WRITE_help) + def _build_config(self): + cmdqueue = self.spi.get_command_queue() + self.spi_angle_transfer_cmd = self.mcu.lookup_query_command( + "spi_angle_transfer oid=%c data=%*s", + "spi_angle_transfer_response oid=%c clock=%u response=%*s", + oid=self.oid, cq=cmdqueue) + def get_tcode_params(self): + return self.last_chip_mcu_clock, self.last_chip_clock, self.chip_freq + def _calc_crc(self, data): + crc = 0xff + for d in data: + crc ^= d + for i in range(8): + if crc & 0x80: + crc = (crc << 1) ^ 0x1d + else: + crc <<= 1 + return (~crc) & 0xff + def _send_spi(self, msg): + for retry in range(5): + if msg[0] & 0x04: + params = self.spi_angle_transfer_cmd.send([self.oid, msg]) + else: + params = self.spi.spi_transfer(msg) + resp = bytearray(params['response']) + crc = self._calc_crc(bytearray(msg[:2]) + resp[2:-2]) + if crc == resp[-1]: + return params + raise self.printer.command_error("Unable to query tle5012b chip") + def _read_reg(self, reg): + cw = 0x8000 | ((reg & 0x3f) << 4) | 0x01 + if reg >= 0x05 and reg <= 0x11: + cw |= 0x5000 + msg = [cw >> 8, cw & 0xff, 0, 0, 0, 0] + params = self._send_spi(msg) + resp = bytearray(params['response']) + return (resp[2] << 8) | resp[3] + def _write_reg(self, reg, val): + cw = ((reg & 0x3f) << 4) | 0x01 + if reg >= 0x05 and reg <= 0x11: + cw |= 0x5000 + msg = [cw >> 8, cw & 0xff, (val >> 8) & 0xff, val & 0xff, 0, 0] + for retry in range(5): + self._send_spi(msg) + rval = self._read_reg(reg) + if rval == val: + return + raise self.printer.command_error("Unable to write to tle5012b chip") + def _mask_reg(self, reg, off, on): + rval = self._read_reg(reg) + self._write_reg(reg, (rval & ~off) | on) + def _query_clock(self): + # Read frame counter (and normalize to a 16bit counter) + msg = [0x84, 0x42, 0, 0, 0, 0, 0, 0] # Read with latch, AREV and FSYNC + params = self._send_spi(msg) + resp = bytearray(params['response']) + mcu_clock = self.mcu.clock32_to_clock64(params['clock']) + chip_clock = ((resp[2] & 0x7e) << 9) | ((resp[4] & 0x3e) << 4) + # Calculate temperature + temper = resp[5] - ((resp[4] & 0x01) << 8) + self.last_temperature = (temper + 152) / 2.776 + return mcu_clock, chip_clock + def update_clock(self): + mcu_clock, chip_clock = self._query_clock() + mdiff = mcu_clock - self.last_chip_mcu_clock + chip_mclock = self.last_chip_clock + int(mdiff * self.chip_freq + .5) + cdiff = (chip_mclock - chip_clock) & 0xffff + cdiff -= (cdiff & 0x8000) << 1 + new_chip_clock = chip_mclock - cdiff + self.chip_freq = float(new_chip_clock - self.last_chip_clock) / mdiff + self.last_chip_clock = new_chip_clock + self.last_chip_mcu_clock = mcu_clock + def start(self): + # Clear any errors from device + self._read_reg(0x00) # Read STAT + # Initialize chip (so different chip variants work the same way) + self._mask_reg(0x06, 0xc003, 0x4000) # MOD1: 42.7us, IIF disable + self._mask_reg(0x08, 0x0007, 0x0001) # MOD2: Predict off, autocal=1 + self._mask_reg(0x0e, 0x0003, 0x0000) # MOD4: IIF mode + # Setup starting clock values + mcu_clock, chip_clock = self._query_clock() + self.last_chip_clock = chip_clock + self.last_chip_mcu_clock = mcu_clock + self.chip_freq = float(1<<5) / self.mcu.seconds_to_clock(1. / 750000.) + self.update_clock() + cmd_ANGLE_DEBUG_READ_help = "Query low-level angle sensor register" + def cmd_ANGLE_DEBUG_READ(self, gcmd): + reg = gcmd.get("REG", minval=0, maxval=0x30, parser=lambda x: int(x, 0)) + val = self._read_reg(reg) + gcmd.respond_info("ANGLE REG[0x%02x] = 0x%04x" % (reg, val)) + cmd_ANGLE_DEBUG_WRITE_help = "Set low-level angle sensor register" + def cmd_ANGLE_DEBUG_WRITE(self, gcmd): + reg = gcmd.get("REG", minval=0, maxval=0x30, parser=lambda x: int(x, 0)) + val = gcmd.get("VAL", minval=0, maxval=0xffff, + parser=lambda x: int(x, 0)) + self._write_reg(reg, val) + +SAMPLE_PERIOD = 0.000400 + +class Angle: + def __init__(self, config): + self.printer = config.get_printer() + self.sample_period = config.getfloat('sample_period', SAMPLE_PERIOD, + above=0.) + self.calibration = AngleCalibration(config) + # Measurement conversion + self.start_clock = self.time_shift = self.sample_ticks = 0 + self.last_sequence = self.last_angle = 0 + # Measurement storage (accessed from background thread) + self.lock = threading.Lock() + self.raw_samples = [] + # Sensor type + sensors = { "a1333": HelperA1333, "as5047d": HelperAS5047D, + "tle5012b": HelperTLE5012B } + sensor_type = config.getchoice('sensor_type', {s: s for s in sensors}) + sensor_class = sensors[sensor_type] + self.spi = bus.MCU_SPI_from_config(config, sensor_class.SPI_MODE, + default_speed=sensor_class.SPI_SPEED) + self.mcu = mcu = self.spi.get_mcu() + self.oid = oid = mcu.create_oid() + self.sensor_helper = sensor_class(config, self.spi, oid) + # Setup mcu sensor_spi_angle bulk query code + self.query_spi_angle_cmd = self.query_spi_angle_end_cmd = None + mcu.add_config_cmd( + "config_spi_angle oid=%d spi_oid=%d spi_angle_type=%s" + % (oid, self.spi.get_oid(), sensor_type)) + mcu.add_config_cmd( + "query_spi_angle oid=%d clock=0 rest_ticks=0 time_shift=0" + % (oid,), on_restart=True) + mcu.register_config_callback(self._build_config) + mcu.register_response(self._handle_spi_angle_data, + "spi_angle_data", oid) + # API server endpoints + self.api_dump = motion_report.APIDumpHelper( + self.printer, self._api_update, self._api_startstop, 0.100) + self.name = config.get_name().split()[1] + wh = self.printer.lookup_object('webhooks') + wh.register_mux_endpoint("angle/dump_angle", "sensor", self.name, + self._handle_dump_angle) + def _build_config(self): + freq = self.mcu.seconds_to_clock(1.) + while float(TCODE_ERROR << self.time_shift) / freq < 0.002: + self.time_shift += 1 + cmdqueue = self.spi.get_command_queue() + self.query_spi_angle_cmd = self.mcu.lookup_command( + "query_spi_angle oid=%c clock=%u rest_ticks=%u time_shift=%c", + cq=cmdqueue) + self.query_spi_angle_end_cmd = self.mcu.lookup_query_command( + "query_spi_angle oid=%c clock=%u rest_ticks=%u time_shift=%c", + "spi_angle_end oid=%c sequence=%hu", oid=self.oid, cq=cmdqueue) + def get_status(self, eventtime=None): + return {'temperature': self.sensor_helper.last_temperature} + # Measurement collection + def is_measuring(self): + return self.start_clock != 0 + def _handle_spi_angle_data(self, params): + with self.lock: + self.raw_samples.append(params) + def _extract_samples(self, raw_samples): + # Load variables to optimize inner loop below + sample_ticks = self.sample_ticks + start_clock = self.start_clock + clock_to_print_time = self.mcu.clock_to_print_time + last_sequence = self.last_sequence + last_angle = self.last_angle + time_shift = 0 + static_delay = 0. + last_chip_mcu_clock = last_chip_clock = chip_freq = inv_chip_freq = 0. + is_tcode_absolute = self.sensor_helper.is_tcode_absolute + if is_tcode_absolute: + tparams = self.sensor_helper.get_tcode_params() + last_chip_mcu_clock, last_chip_clock, chip_freq = tparams + inv_chip_freq = 1. / chip_freq + else: + time_shift = self.time_shift + static_delay = self.sensor_helper.get_static_delay() + # Process every message in raw_samples + count = error_count = 0 + samples = [None] * (len(raw_samples) * 16) + for params in raw_samples: + seq = (last_sequence & ~0xffff) | params['sequence'] + if seq < last_sequence: + seq += 0x10000 + last_sequence = seq + d = bytearray(params['data']) + msg_mclock = start_clock + seq*16*sample_ticks + for i in range(len(d) // 3): + tcode = d[i*3] + if tcode == TCODE_ERROR: + error_count += 1 + continue + raw_angle = d[i*3 + 1] | (d[i*3 + 2] << 8) + angle_diff = (last_angle - raw_angle) & 0xffff + angle_diff -= (angle_diff & 0x8000) << 1 + last_angle -= angle_diff + mclock = msg_mclock + i*sample_ticks + if is_tcode_absolute: + # tcode is tle5012b frame counter + mdiff = mclock - last_chip_mcu_clock + chip_mclock = last_chip_clock + int(mdiff * chip_freq + .5) + cdiff = ((tcode << 10) - chip_mclock) & 0xffff + cdiff -= (cdiff & 0x8000) << 1 + sclock = mclock + (cdiff - 0x800) * inv_chip_freq + else: + # tcode is mcu clock offset shifted by time_shift + sclock = mclock + (tcode< -# -# This file may be distributed under the terms of the GNU GPLv3 license. -import math, logging -import stepper - -TRINAMIC_DRIVERS = ["tmc2130", "tmc2208", "tmc2209", "tmc2660", "tmc5160"] - -# Calculate the trigger phase of a stepper motor -class PhaseCalc: - def __init__(self, printer, name, phases=None): - self.printer = printer - self.name = name - self.phases = phases - self.tmc_module = None - # Statistics tracking for ENDSTOP_PHASE_CALIBRATE - self.phase_history = self.last_phase = self.last_mcu_position = None - self.is_primary = self.stats_only = False - def lookup_tmc(self): - for driver in TRINAMIC_DRIVERS: - driver_name = "%s %s" % (driver, self.name) - module = self.printer.lookup_object(driver_name, None) - if module is not None: - self.tmc_module = module - if self.phases is None: - phase_offset, self.phases = module.get_phase_offset() - break - if self.phases is not None: - self.phase_history = [0] * self.phases - def convert_phase(self, driver_phase, driver_phases): - phases = self.phases - return (int(float(driver_phase) / driver_phases * phases + .5) % phases) - def calc_phase(self, stepper, trig_mcu_pos): - mcu_phase_offset = 0 - if self.tmc_module is not None: - mcu_phase_offset, phases = self.tmc_module.get_phase_offset() - if mcu_phase_offset is None: - if self.printer.get_start_args().get('debugoutput') is None: - raise self.printer.command_error("Stepper %s phase unknown" - % (self.name,)) - mcu_phase_offset = 0 - phase = (trig_mcu_pos + mcu_phase_offset) % self.phases - self.phase_history[phase] += 1 - self.last_phase = phase - self.last_mcu_position = trig_mcu_pos - return phase - -# Adjusted endstop trigger positions -class EndstopPhase: - def __init__(self, config): - self.printer = config.get_printer() - self.name = config.get_name().split()[1] - # Obtain step_distance and microsteps from stepper config section - sconfig = config.getsection(self.name) - rotation_dist, steps_per_rotation = stepper.parse_step_distance(sconfig) - self.step_dist = rotation_dist / steps_per_rotation - self.phases = sconfig.getint("microsteps", note_valid=False) * 4 - self.phase_calc = PhaseCalc(self.printer, self.name, self.phases) - # Register event handlers - self.printer.register_event_handler("klippy:connect", - self.phase_calc.lookup_tmc) - self.printer.register_event_handler("homing:home_rails_end", - self.handle_home_rails_end) - self.printer.load_object(config, "endstop_phase") - # Read config - self.endstop_phase = None - trigger_phase = config.get('trigger_phase', None) - if trigger_phase is not None: - p, ps = config.getintlist('trigger_phase', sep='/', count=2) - if p >= ps: - raise config.error("Invalid trigger_phase '%s'" - % (trigger_phase,)) - self.endstop_phase = self.phase_calc.convert_phase(p, ps) - self.endstop_align_zero = config.getboolean('endstop_align_zero', False) - self.endstop_accuracy = config.getfloat('endstop_accuracy', None, - above=0.) - # Determine endstop accuracy - if self.endstop_accuracy is None: - self.endstop_phase_accuracy = self.phases//2 - 1 - elif self.endstop_phase is not None: - self.endstop_phase_accuracy = int( - math.ceil(self.endstop_accuracy * .5 / self.step_dist)) - else: - self.endstop_phase_accuracy = int( - math.ceil(self.endstop_accuracy / self.step_dist)) - if self.endstop_phase_accuracy >= self.phases // 2: - raise config.error("Endstop for %s is not accurate enough for" - " stepper phase adjustment" % (self.name,)) - if self.printer.get_start_args().get('debugoutput') is not None: - self.endstop_phase_accuracy = self.phases - def align_endstop(self, rail): - if not self.endstop_align_zero or self.endstop_phase is None: - return 0. - # Adjust the endstop position so 0.0 is always at a full step - microsteps = self.phases // 4 - half_microsteps = microsteps // 2 - phase_offset = (((self.endstop_phase + half_microsteps) % microsteps) - - half_microsteps) * self.step_dist - full_step = microsteps * self.step_dist - pe = rail.get_homing_info().position_endstop - return int(pe / full_step + .5) * full_step - pe + phase_offset - def get_homed_offset(self, stepper, trig_mcu_pos): - phase = self.phase_calc.calc_phase(stepper, trig_mcu_pos) - if self.endstop_phase is None: - logging.info("Setting %s endstop phase to %d", self.name, phase) - self.endstop_phase = phase - return 0. - delta = (phase - self.endstop_phase) % self.phases - if delta >= self.phases - self.endstop_phase_accuracy: - delta -= self.phases - elif delta > self.endstop_phase_accuracy: - raise self.printer.command_error( - "Endstop %s incorrect phase (got %d vs %d)" % ( - self.name, phase, self.endstop_phase)) - return delta * self.step_dist - def handle_home_rails_end(self, homing_state, rails): - for rail in rails: - stepper = rail.get_steppers()[0] - if stepper.get_name() == self.name: - trig_mcu_pos = homing_state.get_trigger_position(self.name) - align = self.align_endstop(rail) - offset = self.get_homed_offset(stepper, trig_mcu_pos) - homing_state.set_stepper_adjustment(self.name, align + offset) - return - -# Support for ENDSTOP_PHASE_CALIBRATE command -class EndstopPhases: - def __init__(self, config): - self.printer = config.get_printer() - self.tracking = {} - # Register handlers - self.printer.register_event_handler("homing:home_rails_end", - self.handle_home_rails_end) - self.gcode = self.printer.lookup_object('gcode') - self.gcode.register_command("ENDSTOP_PHASE_CALIBRATE", - self.cmd_ENDSTOP_PHASE_CALIBRATE, - desc=self.cmd_ENDSTOP_PHASE_CALIBRATE_help) - def update_stepper(self, stepper, trig_mcu_pos, is_primary): - stepper_name = stepper.get_name() - phase_calc = self.tracking.get(stepper_name) - if phase_calc is None: - # Check if stepper has an endstop_phase config section defined - mod_name = "endstop_phase %s" % (stepper_name,) - m = self.printer.lookup_object(mod_name, None) - if m is not None: - phase_calc = m.phase_calc - else: - # Create new PhaseCalc tracker - phase_calc = PhaseCalc(self.printer, stepper_name) - phase_calc.stats_only = True - phase_calc.lookup_tmc() - self.tracking[stepper_name] = phase_calc - if phase_calc.phase_history is None: - return - if is_primary: - phase_calc.is_primary = True - if phase_calc.stats_only: - phase_calc.calc_phase(stepper, trig_mcu_pos) - def handle_home_rails_end(self, homing_state, rails): - for rail in rails: - is_primary = True - for stepper in rail.get_steppers(): - sname = stepper.get_name() - trig_mcu_pos = homing_state.get_trigger_position(sname) - self.update_stepper(stepper, trig_mcu_pos, is_primary) - is_primary = False - cmd_ENDSTOP_PHASE_CALIBRATE_help = "Calibrate stepper phase" - def cmd_ENDSTOP_PHASE_CALIBRATE(self, gcmd): - stepper_name = gcmd.get('STEPPER', None) - if stepper_name is None: - self.report_stats() - return - phase_calc = self.tracking.get(stepper_name) - if phase_calc is None or phase_calc.phase_history is None: - raise gcmd.error("Stats not available for stepper %s" - % (stepper_name,)) - endstop_phase, phases = self.generate_stats(stepper_name, phase_calc) - if not phase_calc.is_primary: - return - configfile = self.printer.lookup_object('configfile') - section = 'endstop_phase %s' % (stepper_name,) - configfile.remove_section(section) - configfile.set(section, "trigger_phase", - "%s/%s" % (endstop_phase, phases)) - gcmd.respond_info( - "The SAVE_CONFIG command will update the printer config\n" - "file with these parameters and restart the printer.") - def generate_stats(self, stepper_name, phase_calc): - phase_history = phase_calc.phase_history - wph = phase_history + phase_history - count = sum(phase_history) - phases = len(phase_history) - half_phases = phases // 2 - res = [] - for i in range(phases): - phase = i + half_phases - cost = sum([wph[j] * abs(j-phase) for j in range(i, i+phases)]) - res.append((cost, phase)) - res.sort() - best = res[0][1] - found = [j for j in range(best - half_phases, best + half_phases) - if wph[j]] - best_phase = best % phases - lo, hi = found[0] % phases, found[-1] % phases - self.gcode.respond_info("%s: trigger_phase=%d/%d (range %d to %d)" - % (stepper_name, best_phase, phases, lo, hi)) - return best_phase, phases - def report_stats(self): - if not self.tracking: - self.gcode.respond_info( - "No steppers found. (Be sure to home at least once.)") - return - for stepper_name in sorted(self.tracking.keys()): - phase_calc = self.tracking[stepper_name] - if phase_calc is None or not phase_calc.is_primary: - continue - self.generate_stats(stepper_name, phase_calc) - def get_status(self, eventtime): - lh = { name: {'phase': pc.last_phase, 'phases': pc.phases, - 'mcu_position': pc.last_mcu_position} - for name, pc in self.tracking.items() - if pc.phase_history is not None } - return { 'last_home': lh } - -def load_config_prefix(config): - return EndstopPhase(config) - -def load_config(config): - return EndstopPhases(config) +# Endstop accuracy improvement via stepper phase tracking +# +# Copyright (C) 2016-2021 Kevin O'Connor +# +# This file may be distributed under the terms of the GNU GPLv3 license. +import math, logging +import stepper + +TRINAMIC_DRIVERS = ["tmc2130", "tmc2208", "tmc2209", "tmc2240", "tmc2660", "tmc5160"] + +# Calculate the trigger phase of a stepper motor +class PhaseCalc: + def __init__(self, printer, name, phases=None): + self.printer = printer + self.name = name + self.phases = phases + self.tmc_module = None + # Statistics tracking for ENDSTOP_PHASE_CALIBRATE + self.phase_history = self.last_phase = self.last_mcu_position = None + self.is_primary = self.stats_only = False + def lookup_tmc(self): + for driver in TRINAMIC_DRIVERS: + driver_name = "%s %s" % (driver, self.name) + module = self.printer.lookup_object(driver_name, None) + if module is not None: + self.tmc_module = module + if self.phases is None: + phase_offset, self.phases = module.get_phase_offset() + break + if self.phases is not None: + self.phase_history = [0] * self.phases + def convert_phase(self, driver_phase, driver_phases): + phases = self.phases + return (int(float(driver_phase) / driver_phases * phases + .5) % phases) + def calc_phase(self, stepper, trig_mcu_pos): + mcu_phase_offset = 0 + if self.tmc_module is not None: + mcu_phase_offset, phases = self.tmc_module.get_phase_offset() + if mcu_phase_offset is None: + if self.printer.get_start_args().get('debugoutput') is None: + raise self.printer.command_error("Stepper %s phase unknown" + % (self.name,)) + mcu_phase_offset = 0 + phase = (trig_mcu_pos + mcu_phase_offset) % self.phases + self.phase_history[phase] += 1 + self.last_phase = phase + self.last_mcu_position = trig_mcu_pos + return phase + +# Adjusted endstop trigger positions +class EndstopPhase: + def __init__(self, config): + self.printer = config.get_printer() + self.name = config.get_name().split()[1] + # Obtain step_distance and microsteps from stepper config section + sconfig = config.getsection(self.name) + rotation_dist, steps_per_rotation = stepper.parse_step_distance(sconfig) + self.step_dist = rotation_dist / steps_per_rotation + self.phases = sconfig.getint("microsteps", note_valid=False) * 4 + self.phase_calc = PhaseCalc(self.printer, self.name, self.phases) + # Register event handlers + self.printer.register_event_handler("klippy:connect", + self.phase_calc.lookup_tmc) + self.printer.register_event_handler("homing:home_rails_end", + self.handle_home_rails_end) + self.printer.load_object(config, "endstop_phase") + # Read config + self.endstop_phase = None + trigger_phase = config.get('trigger_phase', None) + if trigger_phase is not None: + p, ps = config.getintlist('trigger_phase', sep='/', count=2) + if p >= ps: + raise config.error("Invalid trigger_phase '%s'" + % (trigger_phase,)) + self.endstop_phase = self.phase_calc.convert_phase(p, ps) + self.endstop_align_zero = config.getboolean('endstop_align_zero', False) + self.endstop_accuracy = config.getfloat('endstop_accuracy', None, + above=0.) + # Determine endstop accuracy + if self.endstop_accuracy is None: + self.endstop_phase_accuracy = self.phases//2 - 1 + elif self.endstop_phase is not None: + self.endstop_phase_accuracy = int( + math.ceil(self.endstop_accuracy * .5 / self.step_dist)) + else: + self.endstop_phase_accuracy = int( + math.ceil(self.endstop_accuracy / self.step_dist)) + if self.endstop_phase_accuracy >= self.phases // 2: + raise config.error("Endstop for %s is not accurate enough for" + " stepper phase adjustment" % (self.name,)) + if self.printer.get_start_args().get('debugoutput') is not None: + self.endstop_phase_accuracy = self.phases + def align_endstop(self, rail): + if not self.endstop_align_zero or self.endstop_phase is None: + return 0. + # Adjust the endstop position so 0.0 is always at a full step + microsteps = self.phases // 4 + half_microsteps = microsteps // 2 + phase_offset = (((self.endstop_phase + half_microsteps) % microsteps) + - half_microsteps) * self.step_dist + full_step = microsteps * self.step_dist + pe = rail.get_homing_info().position_endstop + return int(pe / full_step + .5) * full_step - pe + phase_offset + def get_homed_offset(self, stepper, trig_mcu_pos): + phase = self.phase_calc.calc_phase(stepper, trig_mcu_pos) + if self.endstop_phase is None: + logging.info("Setting %s endstop phase to %d", self.name, phase) + self.endstop_phase = phase + return 0. + delta = (phase - self.endstop_phase) % self.phases + if delta >= self.phases - self.endstop_phase_accuracy: + delta -= self.phases + elif delta > self.endstop_phase_accuracy: + raise self.printer.command_error( + "Endstop %s incorrect phase (got %d vs %d)" % ( + self.name, phase, self.endstop_phase)) + return delta * self.step_dist + def handle_home_rails_end(self, homing_state, rails): + for rail in rails: + stepper = rail.get_steppers()[0] + if stepper.get_name() == self.name: + trig_mcu_pos = homing_state.get_trigger_position(self.name) + align = self.align_endstop(rail) + offset = self.get_homed_offset(stepper, trig_mcu_pos) + homing_state.set_stepper_adjustment(self.name, align + offset) + return + +# Support for ENDSTOP_PHASE_CALIBRATE command +class EndstopPhases: + def __init__(self, config): + self.printer = config.get_printer() + self.tracking = {} + # Register handlers + self.printer.register_event_handler("homing:home_rails_end", + self.handle_home_rails_end) + self.gcode = self.printer.lookup_object('gcode') + self.gcode.register_command("ENDSTOP_PHASE_CALIBRATE", + self.cmd_ENDSTOP_PHASE_CALIBRATE, + desc=self.cmd_ENDSTOP_PHASE_CALIBRATE_help) + def update_stepper(self, stepper, trig_mcu_pos, is_primary): + stepper_name = stepper.get_name() + phase_calc = self.tracking.get(stepper_name) + if phase_calc is None: + # Check if stepper has an endstop_phase config section defined + mod_name = "endstop_phase %s" % (stepper_name,) + m = self.printer.lookup_object(mod_name, None) + if m is not None: + phase_calc = m.phase_calc + else: + # Create new PhaseCalc tracker + phase_calc = PhaseCalc(self.printer, stepper_name) + phase_calc.stats_only = True + phase_calc.lookup_tmc() + self.tracking[stepper_name] = phase_calc + if phase_calc.phase_history is None: + return + if is_primary: + phase_calc.is_primary = True + if phase_calc.stats_only: + phase_calc.calc_phase(stepper, trig_mcu_pos) + def handle_home_rails_end(self, homing_state, rails): + for rail in rails: + is_primary = True + for stepper in rail.get_steppers(): + sname = stepper.get_name() + trig_mcu_pos = homing_state.get_trigger_position(sname) + self.update_stepper(stepper, trig_mcu_pos, is_primary) + is_primary = False + cmd_ENDSTOP_PHASE_CALIBRATE_help = "Calibrate stepper phase" + def cmd_ENDSTOP_PHASE_CALIBRATE(self, gcmd): + stepper_name = gcmd.get('STEPPER', None) + if stepper_name is None: + self.report_stats() + return + phase_calc = self.tracking.get(stepper_name) + if phase_calc is None or phase_calc.phase_history is None: + raise gcmd.error("Stats not available for stepper %s" + % (stepper_name,)) + endstop_phase, phases = self.generate_stats(stepper_name, phase_calc) + if not phase_calc.is_primary: + return + configfile = self.printer.lookup_object('configfile') + section = 'endstop_phase %s' % (stepper_name,) + configfile.remove_section(section) + configfile.set(section, "trigger_phase", + "%s/%s" % (endstop_phase, phases)) + gcmd.respond_info( + "The SAVE_CONFIG command will update the printer config\n" + "file with these parameters and restart the printer.") + def generate_stats(self, stepper_name, phase_calc): + phase_history = phase_calc.phase_history + wph = phase_history + phase_history + count = sum(phase_history) + phases = len(phase_history) + half_phases = phases // 2 + res = [] + for i in range(phases): + phase = i + half_phases + cost = sum([wph[j] * abs(j-phase) for j in range(i, i+phases)]) + res.append((cost, phase)) + res.sort() + best = res[0][1] + found = [j for j in range(best - half_phases, best + half_phases) + if wph[j]] + best_phase = best % phases + lo, hi = found[0] % phases, found[-1] % phases + self.gcode.respond_info("%s: trigger_phase=%d/%d (range %d to %d)" + % (stepper_name, best_phase, phases, lo, hi)) + return best_phase, phases + def report_stats(self): + if not self.tracking: + self.gcode.respond_info( + "No steppers found. (Be sure to home at least once.)") + return + for stepper_name in sorted(self.tracking.keys()): + phase_calc = self.tracking[stepper_name] + if phase_calc is None or not phase_calc.is_primary: + continue + self.generate_stats(stepper_name, phase_calc) + def get_status(self, eventtime): + lh = { name: {'phase': pc.last_phase, 'phases': pc.phases, + 'mcu_position': pc.last_mcu_position} + for name, pc in self.tracking.items() + if pc.phase_history is not None } + return { 'last_home': lh } + +def load_config_prefix(config): + return EndstopPhase(config) + +def load_config(config): + return EndstopPhases(config) diff --git a/klippy/extras/extruder_stepper.py b/klippy/extras/extruder_stepper.py index 1664767..6c4460b 100644 --- a/klippy/extras/extruder_stepper.py +++ b/klippy/extras/extruder_stepper.py @@ -1,20 +1,22 @@ -# Code for supporting multiple steppers in single filament extruder. -# -# Copyright (C) 2019 Simo Apell -# -# This file may be distributed under the terms of the GNU GPLv3 license. -import logging -from kinematics import extruder - -class PrinterExtruderStepper: - def __init__(self, config): - self.printer = config.get_printer() - self.extruder_stepper = extruder.ExtruderStepper(config) - self.extruder_name = config.get('extruder') - self.printer.register_event_handler("klippy:connect", - self.handle_connect) - def handle_connect(self): - self.extruder_stepper.sync_to_extruder(self.extruder_name) - -def load_config_prefix(config): - return PrinterExtruderStepper(config) +# Code for supporting multiple steppers in single filament extruder. +# +# Copyright (C) 2019 Simo Apell +# +# This file may be distributed under the terms of the GNU GPLv3 license. +import logging +from kinematics import extruder + +class PrinterExtruderStepper: + def __init__(self, config): + self.printer = config.get_printer() + self.extruder_stepper = extruder.ExtruderStepper(config) + self.extruder_name = config.get('extruder') + self.printer.register_event_handler("klippy:connect", + self.handle_connect) + def handle_connect(self): + self.extruder_stepper.sync_to_extruder(self.extruder_name) + def get_status(self, eventtime): + return self.extruder_stepper.get_status(eventtime) + +def load_config_prefix(config): + return PrinterExtruderStepper(config) diff --git a/klippy/extras/manual_probe.py b/klippy/extras/manual_probe.py index eb74ff2..c6e9dc6 100644 --- a/klippy/extras/manual_probe.py +++ b/klippy/extras/manual_probe.py @@ -24,9 +24,19 @@ class ManualProbe: 'Z_OFFSET_APPLY_ENDSTOP', self.cmd_Z_OFFSET_APPLY_ENDSTOP, desc=self.cmd_Z_OFFSET_APPLY_ENDSTOP_help) + self.reset_status() def manual_probe_finalize(self, kin_pos): if kin_pos is not None: self.gcode.respond_info("Z position is %.3f" % (kin_pos[2],)) + def reset_status(self): + self.status = { + 'is_active': False, + 'z_position': None, + 'z_position_lower': None, + 'z_position_upper': None + } + def get_status(self, eventtime): + return self.status cmd_MANUAL_PROBE_help = "Start manual probe helper script" def cmd_MANUAL_PROBE(self, gcmd): ManualProbeHelper(self.printer, gcmd, self.manual_probe_finalize) @@ -78,6 +88,7 @@ class ManualProbeHelper: self.finalize_callback = finalize_callback self.gcode = self.printer.lookup_object('gcode') self.toolhead = self.printer.lookup_object('toolhead') + self.manual_probe = self.printer.lookup_object('manual_probe') self.speed = gcmd.get_float("SPEED", 5.) self.past_positions = [] self.last_toolhead_pos = self.last_kinematics_pos = None @@ -130,11 +141,20 @@ class ManualProbeHelper: prev_pos = next_pos - 1 if next_pos < len(pp) and pp[next_pos] == z_pos: next_pos += 1 + prev_pos_val = next_pos_val = None prev_str = next_str = "??????" if prev_pos >= 0: - prev_str = "%.3f" % (pp[prev_pos],) + prev_pos_val = pp[prev_pos] + prev_str = "%.3f" % (prev_pos_val,) if next_pos < len(pp): - next_str = "%.3f" % (pp[next_pos],) + next_pos_val = pp[next_pos] + next_str = "%.3f" % (next_pos_val,) + self.manual_probe.status = { + 'is_active': True, + 'z_position': z_pos, + 'z_position_lower': prev_pos_val, + 'z_position_upper': next_pos_val, + } # Find recent positions self.gcode.respond_info("Z position: %s --> %.3f <-- %s" % (prev_str, z_pos, next_str)) @@ -183,6 +203,7 @@ class ManualProbeHelper: self.move_z(next_z_pos) self.report_z_status(next_z_pos != z_pos, z_pos) def finalize(self, success): + self.manual_probe.reset_status() self.gcode.register_command('ACCEPT', None) self.gcode.register_command('NEXT', None) self.gcode.register_command('ABORT', None) diff --git a/klippy/extras/resonance_tester.py b/klippy/extras/resonance_tester.py index db03e32..97802a7 100644 --- a/klippy/extras/resonance_tester.py +++ b/klippy/extras/resonance_tester.py @@ -6,6 +6,15 @@ import logging, math, os, time from . import shaper_calibrate +def _parse_probe_points(config): + points = config.get('probe_points').split('\n') + try: + points = [line.split(',', 2) for line in points if line.strip()] + return [[float(coord.strip()) for coord in p] for p in points] + except: + raise config.error("Unable to parse probe_points in %s" % ( + config.get_name())) + class TestAxis: def __init__(self, axis=None, vib_dir=None): if axis is None: @@ -57,8 +66,7 @@ class VibrationPulseTest: self.hz_per_sec = config.getfloat('hz_per_sec', 1., minval=0.1, maxval=2.) - self.probe_points = config.getlists('probe_points', seps=(',', '\n'), - parser=float, count=3) + self.probe_points = _parse_probe_points(config) def get_start_test_points(self): return self.probe_points def prepare_test(self, gcmd): @@ -147,21 +155,15 @@ class ResonanceTester: (chip_axis, self.printer.lookup_object(chip_name)) for chip_axis, chip_name in self.accel_chip_names] - def _run_test(self, gcmd, axes, helper, raw_name_suffix=None, - accel_chips=None, test_point=None): + def _run_test(self, gcmd, axes, helper, raw_name_suffix=None): toolhead = self.printer.lookup_object('toolhead') calibration_data = {axis: None for axis in axes} self.test.prepare_test(gcmd) - - if test_point is not None: - test_points = [test_point] - else: - test_points = self.test.get_start_test_points() - + test_points = self.test.get_start_test_points() for point in test_points: toolhead.manual_move(point, self.move_speed) - if len(test_points) > 1 or test_point is not None: + if len(test_points) > 1: gcmd.respond_info( "Probing point (%.3f, %.3f, %.3f)" % tuple(point)) for axis in axes: @@ -170,38 +172,34 @@ class ResonanceTester: if len(axes) > 1: gcmd.respond_info("Testing axis %s" % axis.get_name()) - raw_values = [] - if accel_chips is None: - for chip_axis, chip in self.accel_chips: - if axis.matches(chip_axis): - aclient = chip.start_internal_client() - raw_values.append((chip_axis, aclient, chip.name)) - else: - for chip in accel_chips: - aclient = chip.start_internal_client() - raw_values.append((axis, aclient, chip.name)) - + for chip_axis, chip in self.accel_chips: + if axis.matches(chip_axis): + chip.start_measurements() # Generate moves self.test.run_test(axis, gcmd) - for chip_axis, aclient, chip_name in raw_values: - aclient.finish_measurements() - if raw_name_suffix is not None: - raw_name = self.get_filename( - 'raw_data', raw_name_suffix, axis, - point if len(test_points) > 1 else None, - chip_name if accel_chips is not None else None,) - aclient.write_to_file(raw_name) - gcmd.respond_info( - "Writing raw accelerometer data to " - "%s file" % (raw_name,)) + raw_values = [] + for chip_axis, chip in self.accel_chips: + if axis.matches(chip_axis): + results = chip.finish_measurements() + if raw_name_suffix is not None: + raw_name = self.get_filename( + 'raw_data', raw_name_suffix, axis, + point if len(test_points) > 1 else None) + results.write_to_file(raw_name) + gcmd.respond_info( + "Writing raw accelerometer data to " + "%s file" % (raw_name,)) + raw_values.append((chip_axis, results)) + gcmd.respond_info("%s-axis accelerometer stats: %s" % ( + chip_axis, results.get_stats(),)) if helper is None: continue - for chip_axis, aclient, chip_name in raw_values: - if not aclient.has_valid_samples(): + for chip_axis, chip_values in raw_values: + if not chip_values: raise gcmd.error( - "accelerometer '%s' measured no data" % ( - chip_name,)) - new_data = helper.process_accelerometer_data(aclient) + "%s-axis accelerometer measured no data" % ( + chip_axis,)) + new_data = helper.process_accelerometer_data(chip_values) if calibration_data[axis] is None: calibration_data[axis] = new_data else: @@ -211,28 +209,6 @@ class ResonanceTester: def cmd_TEST_RESONANCES(self, gcmd): # Parse parameters axis = _parse_axis(gcmd, gcmd.get("AXIS").lower()) - accel_chips = gcmd.get("CHIPS", None) - test_point = gcmd.get("POINT", None) - - if test_point: - test_coords = test_point.split(',') - if len(test_coords) != 3: - raise gcmd.error("Invalid POINT parameter, must be 'x,y,z'") - try: - test_point = [float(p.strip()) for p in test_coords] - except ValueError: - raise gcmd.error("Invalid POINT parameter, must be 'x,y,z'" - " where x, y and z are valid floating point numbers") - - if accel_chips: - parsed_chips = [] - for chip_name in accel_chips.split(','): - if "adxl345" in chip_name: - chip_lookup_name = chip_name.strip() - else: - chip_lookup_name = "adxl345 " + chip_name.strip(); - chip = self.printer.lookup_object(chip_lookup_name) - parsed_chips.append(chip) outputs = gcmd.get("OUTPUT", "resonances").lower().split(',') for output in outputs: @@ -256,13 +232,10 @@ class ResonanceTester: data = self._run_test( gcmd, [axis], helper, - raw_name_suffix=name_suffix if raw_output else None, - accel_chips=parsed_chips if accel_chips else None, - test_point=test_point)[axis] + raw_name_suffix=name_suffix if raw_output else None)[axis] if csv_output: csv_name = self.save_calibration_data('resonances', name_suffix, - helper, axis, data, - point=test_point) + helper, axis, data) gcmd.respond_info( "Resonances data written to %s file" % (csv_name,)) cmd_SHAPER_CALIBRATE_help = ( @@ -316,18 +289,14 @@ class ResonanceTester: "Measures noise of all enabled accelerometer chips") def cmd_MEASURE_AXES_NOISE(self, gcmd): meas_time = gcmd.get_float("MEAS_TIME", 2.) - raw_values = [(chip_axis, chip.start_internal_client()) - for chip_axis, chip in self.accel_chips] + for _, chip in self.accel_chips: + chip.start_measurements() self.printer.lookup_object('toolhead').dwell(meas_time) - for chip_axis, aclient in raw_values: - aclient.finish_measurements() + raw_values = [(chip_axis, chip.finish_measurements()) + for chip_axis, chip in self.accel_chips] helper = shaper_calibrate.ShaperCalibrate(self.printer) - for chip_axis, aclient in raw_values: - if not aclient.has_valid_samples(): - raise gcmd.error( - "%s-axis accelerometer measured no data" % ( - chip_axis,)) - data = helper.process_accelerometer_data(aclient) + for chip_axis, raw_data in raw_values: + data = helper.process_accelerometer_data(raw_data) vx = data.psd_x.mean() vy = data.psd_y.mean() vz = data.psd_z.mean() @@ -338,22 +307,18 @@ class ResonanceTester: def is_valid_name_suffix(self, name_suffix): return name_suffix.replace('-', '').replace('_', '').isalnum() - def get_filename(self, base, name_suffix, axis=None, - point=None, chip_name=None): + def get_filename(self, base, name_suffix, axis=None, point=None): name = base if axis: name += '_' + axis.get_name() - if chip_name: - name += '_' + chip_name.replace(" ", "_") if point: name += "_%.3f_%.3f_%.3f" % (point[0], point[1], point[2]) name += '_' + name_suffix return os.path.join("/tmp", name + ".csv") def save_calibration_data(self, base_name, name_suffix, shaper_calibrate, - axis, calibration_data, - all_shapers=None, point=None): - output = self.get_filename(base_name, name_suffix, axis, point) + axis, calibration_data, all_shapers=None): + output = self.get_filename(base_name, name_suffix, axis) shaper_calibrate.save_calibration_data(output, calibration_data, all_shapers) return output diff --git a/klippy/extras/save_variables.py b/klippy/extras/save_variables.py index 3765a1a..317833c 100644 --- a/klippy/extras/save_variables.py +++ b/klippy/extras/save_variables.py @@ -18,6 +18,29 @@ class SaveVariables: gcode = self.printer.lookup_object('gcode') gcode.register_command('SAVE_VARIABLE', self.cmd_SAVE_VARIABLE, desc=self.cmd_SAVE_VARIABLE_help) + + def load_variable(self, section, option): + varfile = configparser.ConfigParser() + try: + varfile.read(self.filename) + return varfile.get(section, option) + except: + msg = "Unable to parse existing variable file" + logging.exception(msg) + raise self.printer.command_error(msg) + def save_variable(self, section, option, value): + varfile = configparser.ConfigParser() + try: + varfile.read(self.filename) + if not varfile.has_section(section): + varfile.add_section(section) + varfile.set(section, option, value) + with open(self.filename, 'w') as configfile: + varfile.write(configfile) + except Exception as e: + msg = "Unable to save variable" + logging.exception(msg) + raise self.printer.command_error(msg) def loadVariables(self): allvars = {} varfile = configparser.ConfigParser() diff --git a/klippy/extras/shaper_calibrate.py b/klippy/extras/shaper_calibrate.py index 4ac5897..3c6065d 100644 --- a/klippy/extras/shaper_calibrate.py +++ b/klippy/extras/shaper_calibrate.py @@ -3,17 +3,129 @@ # Copyright (C) 2020 Dmitry Butyugin # # This file may be distributed under the terms of the GNU GPLv3 license. -import collections, importlib, logging, math, multiprocessing, traceback -shaper_defs = importlib.import_module('.shaper_defs', 'extras') +import collections, importlib, logging, math, multiprocessing MIN_FREQ = 5. MAX_FREQ = 200. WINDOW_T_SEC = 0.5 MAX_SHAPER_FREQ = 150. +SHAPER_VIBRATION_REDUCTION=20. TEST_DAMPING_RATIOS=[0.075, 0.1, 0.15] +SHAPER_DAMPING_RATIO = 0.1 -AUTOTUNE_SHAPERS = ['zv', 'mzv', 'ei', '2hump_ei', '3hump_ei'] +###################################################################### +# Input shapers +###################################################################### + +InputShaperCfg = collections.namedtuple( + 'InputShaperCfg', ('name', 'init_func', 'min_freq')) + +def get_zv_shaper(shaper_freq, damping_ratio): + df = math.sqrt(1. - damping_ratio**2) + K = math.exp(-damping_ratio * math.pi / df) + t_d = 1. / (shaper_freq * df) + A = [1., K] + T = [0., .5*t_d] + return (A, T) + +def get_zvd_shaper(shaper_freq, damping_ratio): + df = math.sqrt(1. - damping_ratio**2) + K = math.exp(-damping_ratio * math.pi / df) + t_d = 1. / (shaper_freq * df) + A = [1., 2.*K, K**2] + T = [0., .5*t_d, t_d] + return (A, T) + +def get_mzv_shaper(shaper_freq, damping_ratio): + df = math.sqrt(1. - damping_ratio**2) + K = math.exp(-.75 * damping_ratio * math.pi / df) + t_d = 1. / (shaper_freq * df) + + a1 = 1. - 1. / math.sqrt(2.) + a2 = (math.sqrt(2.) - 1.) * K + a3 = a1 * K * K + + A = [a1, a2, a3] + T = [0., .375*t_d, .75*t_d] + return (A, T) + +def get_ei_shaper(shaper_freq, damping_ratio): + v_tol = 1. / SHAPER_VIBRATION_REDUCTION # vibration tolerance + df = math.sqrt(1. - damping_ratio**2) + K = math.exp(-damping_ratio * math.pi / df) + t_d = 1. / (shaper_freq * df) + + a1 = .25 * (1. + v_tol) + a2 = .5 * (1. - v_tol) * K + a3 = a1 * K * K + + A = [a1, a2, a3] + T = [0., .5*t_d, t_d] + return (A, T) + +def get_2hump_ei_shaper(shaper_freq, damping_ratio): + v_tol = 1. / SHAPER_VIBRATION_REDUCTION # vibration tolerance + df = math.sqrt(1. - damping_ratio**2) + K = math.exp(-damping_ratio * math.pi / df) + t_d = 1. / (shaper_freq * df) + + V2 = v_tol**2 + X = pow(V2 * (math.sqrt(1. - V2) + 1.), 1./3.) + a1 = (3.*X*X + 2.*X + 3.*V2) / (16.*X) + a2 = (.5 - a1) * K + a3 = a2 * K + a4 = a1 * K * K * K + + A = [a1, a2, a3, a4] + T = [0., .5*t_d, t_d, 1.5*t_d] + return (A, T) + +def get_3hump_ei_shaper(shaper_freq, damping_ratio): + v_tol = 1. / SHAPER_VIBRATION_REDUCTION # vibration tolerance + df = math.sqrt(1. - damping_ratio**2) + K = math.exp(-damping_ratio * math.pi / df) + t_d = 1. / (shaper_freq * df) + + K2 = K*K + a1 = 0.0625 * (1. + 3. * v_tol + 2. * math.sqrt(2. * (v_tol + 1.) * v_tol)) + a2 = 0.25 * (1. - v_tol) * K + a3 = (0.5 * (1. + v_tol) - 2. * a1) * K2 + a4 = a2 * K2 + a5 = a1 * K2 * K2 + + A = [a1, a2, a3, a4, a5] + T = [0., .5*t_d, t_d, 1.5*t_d, 2.*t_d] + return (A, T) + +def get_shaper_smoothing(shaper, accel=5000, scv=5.): + half_accel = accel * .5 + + A, T = shaper + inv_D = 1. / sum(A) + n = len(T) + # Calculate input shaper shift + ts = sum([A[i] * T[i] for i in range(n)]) * inv_D + + # Calculate offset for 90 and 180 degrees turn + offset_90 = offset_180 = 0. + for i in range(n): + if T[i] >= ts: + # Calculate offset for one of the axes + offset_90 += A[i] * (scv + half_accel * (T[i]-ts)) * (T[i]-ts) + offset_180 += A[i] * half_accel * (T[i]-ts)**2 + offset_90 *= inv_D * math.sqrt(2.) + offset_180 *= inv_D + return max(offset_90, offset_180) + +# min_freq for each shaper is chosen to have projected max_accel ~= 1500 +INPUT_SHAPERS = [ + InputShaperCfg('zv', get_zv_shaper, min_freq=21.), + InputShaperCfg('mzv', get_mzv_shaper, min_freq=23.), + InputShaperCfg('ei', get_ei_shaper, min_freq=29.), + InputShaperCfg('2hump_ei', get_2hump_ei_shaper, min_freq=39.), + InputShaperCfg('3hump_ei', get_3hump_ei_shaper, min_freq=48.), +] ###################################################################### # Frequency response calculation and shaper auto-tuning @@ -152,10 +264,7 @@ class ShaperCalibrate: if isinstance(raw_values, np.ndarray): data = raw_values else: - samples = raw_values.get_samples() - if not samples: - return None - data = np.array(samples) + data = np.array(raw_values.decode_samples()) N = data.shape[0] T = data[-1,0] - data[0,0] @@ -201,32 +310,12 @@ class ShaperCalibrate: # The input shaper can only reduce the amplitude of vibrations by # SHAPER_VIBRATION_REDUCTION times, so all vibrations below that # threshold can be igonred - vibr_threshold = psd.max() / shaper_defs.SHAPER_VIBRATION_REDUCTION + vibrations_threshold = psd.max() / SHAPER_VIBRATION_REDUCTION remaining_vibrations = self.numpy.maximum( - vals * psd - vibr_threshold, 0).sum() - all_vibrations = self.numpy.maximum(psd - vibr_threshold, 0).sum() + vals * psd - vibrations_threshold, 0).sum() + all_vibrations = self.numpy.maximum(psd - vibrations_threshold, 0).sum() return (remaining_vibrations / all_vibrations, vals) - def _get_shaper_smoothing(self, shaper, accel=5000, scv=5.): - half_accel = accel * .5 - - A, T = shaper - inv_D = 1. / sum(A) - n = len(T) - # Calculate input shaper shift - ts = sum([A[i] * T[i] for i in range(n)]) * inv_D - - # Calculate offset for 90 and 180 degrees turn - offset_90 = offset_180 = 0. - for i in range(n): - if T[i] >= ts: - # Calculate offset for one of the axes - offset_90 += A[i] * (scv + half_accel * (T[i]-ts)) * (T[i]-ts) - offset_180 += A[i] * half_accel * (T[i]-ts)**2 - offset_90 *= inv_D * math.sqrt(2.) - offset_180 *= inv_D - return max(offset_90, offset_180) - def fit_shaper(self, shaper_cfg, calibration_data, max_smoothing): np = self.numpy @@ -241,9 +330,8 @@ class ShaperCalibrate: for test_freq in test_freqs[::-1]: shaper_vibrations = 0. shaper_vals = np.zeros(shape=freq_bins.shape) - shaper = shaper_cfg.init_func( - test_freq, shaper_defs.DEFAULT_DAMPING_RATIO) - shaper_smoothing = self._get_shaper_smoothing(shaper) + shaper = shaper_cfg.init_func(test_freq, SHAPER_DAMPING_RATIO) + shaper_smoothing = get_shaper_smoothing(shaper) if max_smoothing and shaper_smoothing > max_smoothing and best_res: return best_res # Exact damping ratio of the printer is unknown, pessimizing @@ -296,16 +384,14 @@ class ShaperCalibrate: # Just some empirically chosen value which produces good projections # for max_accel without much smoothing TARGET_SMOOTHING = 0.12 - max_accel = self._bisect(lambda test_accel: self._get_shaper_smoothing( + max_accel = self._bisect(lambda test_accel: get_shaper_smoothing( shaper, test_accel) <= TARGET_SMOOTHING) return max_accel def find_best_shaper(self, calibration_data, max_smoothing, logger=None): best_shaper = None all_shapers = [] - for shaper_cfg in shaper_defs.INPUT_SHAPERS: - if shaper_cfg.name not in AUTOTUNE_SHAPERS: - continue + for shaper_cfg in INPUT_SHAPERS: shaper = self.background_process_exec(self.fit_shaper, ( shaper_cfg, calibration_data, max_smoothing)) if logger is not None: diff --git a/klippy/pins.py b/klippy/pins.py index 35fc58a..c8742dd 100644 --- a/klippy/pins.py +++ b/klippy/pins.py @@ -129,6 +129,11 @@ class PrinterPins: raise error("Duplicate chip name '%s'" % (chip_name,)) self.chips[chip_name] = chip self.pin_resolvers[chip_name] = PinResolver() + def remove_chip(self, chip_name): + if chip_name not in self.chips: + raise error("Chip '%s' not found" % (chip_name,)) + del self.chips[chip_name] + del self.pin_resolvers[chip_name] def allow_multi_use_pin(self, pin_desc): pin_params = self.parse_pin(pin_desc) share_name = "%s:%s" % (pin_params['chip_name'], pin_params['pin'])