plus4的klipper版本

This commit is contained in:
whb0514
2024-09-02 13:37:34 +08:00
parent 653d7a8f6e
commit b90736975b
1006 changed files with 1195894 additions and 11114 deletions

View File

@@ -3,13 +3,14 @@
# Copyright (C) 2021,2022 Kevin O'Connor <kevin@koconnor.net>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
import logging, math, threading
from . import bus, motion_report
import logging, math
from . import bus, bulk_sensor
MIN_MSG_TIME = 0.100
TCODE_ERROR = 0xff
TRINAMIC_DRIVERS = ["tmc2130", "tmc2208", "tmc2209", "tmc2240", "tmc2660", "tmc5160"]
TRINAMIC_DRIVERS = ["tmc2130", "tmc2208", "tmc2209", "tmc2240", "tmc2660",
"tmc5160"]
CALIBRATION_BITS = 6 # 64 entries
ANGLE_BITS = 16 # angles range from 0..65535
@@ -84,9 +85,9 @@ class AngleCalibration:
cal2 = calibration[bucket + 1]
adj = (angle & interp_mask) * (cal2 - cal1)
adj = cal1 + ((adj + interp_round) >> interp_bits)
angle_diff = (angle - adj) & 0xffff
angle_diff = (adj - angle) & 0xffff
angle_diff -= (angle_diff & 0x8000) << 1
new_angle = angle - angle_diff
new_angle = angle + angle_diff
if calibration_reversed:
new_angle = -new_angle
samples[i] = (samp_time, new_angle)
@@ -156,8 +157,14 @@ class AngleCalibration:
def do_calibration_moves(self):
move = self.printer.lookup_object('force_move').manual_move
# Start data collection
angle_sensor = self.printer.lookup_object(self.name)
cconn = angle_sensor.start_internal_client()
msgs = []
is_finished = False
def handle_batch(msg):
if is_finished:
return False
msgs.append(msg)
return True
self.printer.lookup_object(self.name).add_client(handle_batch)
# Move stepper several turns (to allow internal sensor calibration)
microsteps, full_steps = self.get_microsteps()
mcu_stepper = self.mcu_stepper
@@ -189,13 +196,12 @@ class AngleCalibration:
move(mcu_stepper, .5*rotation_dist + align_dist, move_speed)
toolhead.wait_moves()
# Finish data collection
cconn.finalize()
msgs = cconn.get_messages()
is_finished = True
# Correlate query responses
cal = {}
step = 0
for msg in msgs:
for query_time, pos in msg['params']['data']:
for query_time, pos in msg['data']:
# Add to step tracking
while step < len(times) and query_time > times[step][1]:
step += 1
@@ -374,9 +380,9 @@ class HelperTLE5012B:
mcu_clock, chip_clock = self._query_clock()
mdiff = mcu_clock - self.last_chip_mcu_clock
chip_mclock = self.last_chip_clock + int(mdiff * self.chip_freq + .5)
cdiff = (chip_mclock - chip_clock) & 0xffff
cdiff = (chip_clock - chip_mclock) & 0xffff
cdiff -= (cdiff & 0x8000) << 1
new_chip_clock = chip_mclock - cdiff
new_chip_clock = chip_mclock + cdiff
self.chip_freq = float(new_chip_clock - self.last_chip_clock) / mdiff
self.last_chip_clock = new_chip_clock
self.last_chip_mcu_clock = mcu_clock
@@ -405,7 +411,11 @@ class HelperTLE5012B:
parser=lambda x: int(x, 0))
self._write_reg(reg, val)
BYTES_PER_SAMPLE = 3
SAMPLES_PER_BLOCK = 16
SAMPLE_PERIOD = 0.000400
BATCH_UPDATES = 0.100
class Angle:
def __init__(self, config):
@@ -416,9 +426,6 @@ class Angle:
# Measurement conversion
self.start_clock = self.time_shift = self.sample_ticks = 0
self.last_sequence = self.last_angle = 0
# Measurement storage (accessed from background thread)
self.lock = threading.Lock()
self.raw_samples = []
# Sensor type
sensors = { "a1333": HelperA1333, "as5047d": HelperAS5047D,
"tle5012b": HelperTLE5012B }
@@ -438,15 +445,15 @@ class Angle:
"query_spi_angle oid=%d clock=0 rest_ticks=0 time_shift=0"
% (oid,), on_restart=True)
mcu.register_config_callback(self._build_config)
mcu.register_response(self._handle_spi_angle_data,
"spi_angle_data", oid)
# API server endpoints
self.api_dump = motion_report.APIDumpHelper(
self.printer, self._api_update, self._api_startstop, 0.100)
self.bulk_queue = bulk_sensor.BulkDataQueue(mcu, "spi_angle_data", oid)
# Process messages in batches
self.batch_bulk = bulk_sensor.BatchBulkHelper(
self.printer, self._process_batch,
self._start_measurements, self._finish_measurements, BATCH_UPDATES)
self.name = config.get_name().split()[1]
wh = self.printer.lookup_object('webhooks')
wh.register_mux_endpoint("angle/dump_angle", "sensor", self.name,
self._handle_dump_angle)
api_resp = {'header': ('time', 'angle')}
self.batch_bulk.add_mux_endpoint("angle/dump_angle",
"sensor", self.name, api_resp)
def _build_config(self):
freq = self.mcu.seconds_to_clock(1.)
while float(TCODE_ERROR << self.time_shift) / freq < 0.002:
@@ -460,12 +467,9 @@ class Angle:
"spi_angle_end oid=%c sequence=%hu", oid=self.oid, cq=cmdqueue)
def get_status(self, eventtime=None):
return {'temperature': self.sensor_helper.last_temperature}
# Measurement collection
def is_measuring(self):
return self.start_clock != 0
def _handle_spi_angle_data(self, params):
with self.lock:
self.raw_samples.append(params)
def add_client(self, client_cb):
self.batch_bulk.add_client(client_cb)
# Measurement decoding
def _extract_samples(self, raw_samples):
# Load variables to optimize inner loop below
sample_ticks = self.sample_ticks
@@ -486,23 +490,23 @@ class Angle:
static_delay = self.sensor_helper.get_static_delay()
# Process every message in raw_samples
count = error_count = 0
samples = [None] * (len(raw_samples) * 16)
samples = [None] * (len(raw_samples) * SAMPLES_PER_BLOCK)
for params in raw_samples:
seq = (last_sequence & ~0xffff) | params['sequence']
if seq < last_sequence:
seq += 0x10000
last_sequence = seq
seq_diff = (params['sequence'] - last_sequence) & 0xffff
last_sequence += seq_diff
samp_count = last_sequence * SAMPLES_PER_BLOCK
msg_mclock = start_clock + samp_count*sample_ticks
d = bytearray(params['data'])
msg_mclock = start_clock + seq*16*sample_ticks
for i in range(len(d) // 3):
tcode = d[i*3]
for i in range(len(d) // BYTES_PER_SAMPLE):
d_ta = d[i*BYTES_PER_SAMPLE:(i+1)*BYTES_PER_SAMPLE]
tcode = d_ta[0]
if tcode == TCODE_ERROR:
error_count += 1
continue
raw_angle = d[i*3 + 1] | (d[i*3 + 2] << 8)
angle_diff = (last_angle - raw_angle) & 0xffff
raw_angle = d_ta[1] | (d_ta[2] << 8)
angle_diff = (raw_angle - last_angle) & 0xffff
angle_diff -= (angle_diff & 0x8000) << 1
last_angle -= angle_diff
last_angle += angle_diff
mclock = msg_mclock + i*sample_ticks
if is_tcode_absolute:
# tcode is tle5012b frame counter
@@ -521,29 +525,14 @@ class Angle:
self.last_angle = last_angle
del samples[count:]
return samples, error_count
# API interface
def _api_update(self, eventtime):
if self.sensor_helper.is_tcode_absolute:
self.sensor_helper.update_clock()
with self.lock:
raw_samples = self.raw_samples
self.raw_samples = []
if not raw_samples:
return {}
samples, error_count = self._extract_samples(raw_samples)
if not samples:
return {}
offset = self.calibration.apply_calibration(samples)
return {'data': samples, 'errors': error_count,
'position_offset': offset}
# Start, stop, and process message batches
def _is_measuring(self):
return self.start_clock != 0
def _start_measurements(self):
if self.is_measuring():
return
logging.info("Starting angle '%s' measurements", self.name)
self.sensor_helper.start()
# Start bulk reading
with self.lock:
self.raw_samples = []
self.bulk_queue.clear_samples()
self.last_sequence = 0
systime = self.printer.get_reactor().monotonic()
print_time = self.mcu.estimated_print_time(systime) + MIN_MSG_TIME
@@ -553,26 +542,23 @@ class Angle:
self.query_spi_angle_cmd.send([self.oid, reqclock, rest_ticks,
self.time_shift], reqclock=reqclock)
def _finish_measurements(self):
if not self.is_measuring():
return
# Halt bulk reading
params = self.query_spi_angle_end_cmd.send([self.oid, 0, 0, 0])
self.start_clock = 0
with self.lock:
self.raw_samples = []
self.bulk_queue.clear_samples()
self.sensor_helper.last_temperature = None
logging.info("Stopped angle '%s' measurements", self.name)
def _api_startstop(self, is_start):
if is_start:
self._start_measurements()
else:
self._finish_measurements()
def _handle_dump_angle(self, web_request):
self.api_dump.add_client(web_request)
hdr = ('time', 'angle')
web_request.send({'header': hdr})
def start_internal_client(self):
return self.api_dump.add_internal_client()
def _process_batch(self, eventtime):
if self.sensor_helper.is_tcode_absolute:
self.sensor_helper.update_clock()
raw_samples = self.bulk_queue.pull_samples()
if not raw_samples:
return {}
samples, error_count = self._extract_samples(raw_samples)
if not samples:
return {}
offset = self.calibration.apply_calibration(samples)
return {'data': samples, 'errors': error_count,
'position_offset': offset}
def load_config_prefix(config):
return Angle(config)