加入异常数据抓取和过滤
This commit is contained in:
parent
c22cc75597
commit
6934e6c7a3
@ -102,12 +102,20 @@ class ProgressActionClient(Node):
|
|||||||
self.declare_parameter('ema_alpha', 0.15)
|
self.declare_parameter('ema_alpha', 0.15)
|
||||||
self.declare_parameter('kalman_q', 1e-5)
|
self.declare_parameter('kalman_q', 1e-5)
|
||||||
self.declare_parameter('kalman_r', 1e-2)
|
self.declare_parameter('kalman_r', 1e-2)
|
||||||
|
self.declare_parameter('enable_jump_log', False)
|
||||||
|
self.declare_parameter('jump_velocity_threshold_rad_s', 3.0)
|
||||||
|
self.declare_parameter('jump_log_min_interval_sec', 0.2)
|
||||||
|
self.declare_parameter('joint_update_epsilon_rad', 1e-4)
|
||||||
|
|
||||||
self.enable_filter = self.get_parameter('enable_filter').get_parameter_value().bool_value
|
self.enable_filter = self.get_parameter('enable_filter').get_parameter_value().bool_value
|
||||||
self.filter_type = self.get_parameter('filter_type').get_parameter_value().string_value
|
self.filter_type = self.get_parameter('filter_type').get_parameter_value().string_value
|
||||||
self.ema_alpha = self.get_parameter('ema_alpha').get_parameter_value().double_value
|
self.ema_alpha = self.get_parameter('ema_alpha').get_parameter_value().double_value
|
||||||
self.kalman_q = self.get_parameter('kalman_q').get_parameter_value().double_value
|
self.kalman_q = self.get_parameter('kalman_q').get_parameter_value().double_value
|
||||||
self.kalman_r = self.get_parameter('kalman_r').get_parameter_value().double_value
|
self.kalman_r = self.get_parameter('kalman_r').get_parameter_value().double_value
|
||||||
|
self.enable_jump_log = self.get_parameter('enable_jump_log').get_parameter_value().bool_value
|
||||||
|
self.jump_velocity_threshold_rad_s = self.get_parameter('jump_velocity_threshold_rad_s').get_parameter_value().double_value
|
||||||
|
self.jump_log_min_interval_sec = self.get_parameter('jump_log_min_interval_sec').get_parameter_value().double_value
|
||||||
|
self.joint_update_epsilon_rad = self.get_parameter('joint_update_epsilon_rad').get_parameter_value().double_value
|
||||||
|
|
||||||
# Create per-joint filters
|
# Create per-joint filters
|
||||||
self.num_joints = 8
|
self.num_joints = 8
|
||||||
@ -116,9 +124,12 @@ class ProgressActionClient(Node):
|
|||||||
q=self.kalman_q,
|
q=self.kalman_q,
|
||||||
r=self.kalman_r)
|
r=self.kalman_r)
|
||||||
for _ in range(self.num_joints)]
|
for _ in range(self.num_joints)]
|
||||||
|
self.prev_raw_positions = [None] * self.num_joints
|
||||||
|
self.last_update_time_sec = [None] * self.num_joints
|
||||||
|
self.last_jump_log_time_sec = [0.0] * self.num_joints
|
||||||
|
|
||||||
# Buffer outgoing joint commands and send them at timer rate.
|
# Buffer outgoing joint commands and send them at timer rate.
|
||||||
self.command_queue = deque(maxlen=50)
|
self.command_queue = deque(maxlen=20)
|
||||||
|
|
||||||
self.subscriber_ = self.create_subscription(
|
self.subscriber_ = self.create_subscription(
|
||||||
JointState,
|
JointState,
|
||||||
@ -133,7 +144,7 @@ class ProgressActionClient(Node):
|
|||||||
# self._action_client = ActionClient(self, FollowJointTrajectory, '/right_joint_trajectory_controller/follow_joint_trajectory')
|
# self._action_client = ActionClient(self, FollowJointTrajectory, '/right_joint_trajectory_controller/follow_joint_trajectory')
|
||||||
# self.right_gripper_action_client = ActionClient(self, FollowJointTrajectory, '/right_joint_trajectory_controller/follow_joint_trajectory')
|
# self.right_gripper_action_client = ActionClient(self, FollowJointTrajectory, '/right_joint_trajectory_controller/follow_joint_trajectory')
|
||||||
|
|
||||||
self.create_timer(0.02, self.timer_callback) # 定时器,周期为0.02秒
|
self.create_timer(0.01, self.timer_callback) # 定时器,周期为0.02秒
|
||||||
# 3. 创建订阅者
|
# 3. 创建订阅者
|
||||||
# 参数1:消息类型(如String,与发布者一致)
|
# 参数1:消息类型(如String,与发布者一致)
|
||||||
# 参数2:话题名称(如"chatter",与发布者一致)
|
# 参数2:话题名称(如"chatter",与发布者一致)
|
||||||
@ -150,6 +161,42 @@ class ProgressActionClient(Node):
|
|||||||
# self.get_logger().info(f'Received joint states: {list(msg.name)} positions: {list(msg.position)}')
|
# self.get_logger().info(f'Received joint states: {list(msg.name)} positions: {list(msg.position)}')
|
||||||
# Apply position filtering if enabled
|
# Apply position filtering if enabled
|
||||||
arm_pos = list(msg.position)
|
arm_pos = list(msg.position)
|
||||||
|
|
||||||
|
# 基于“每关节速度阈值”检测异常,减少仅靠角度差导致的误报
|
||||||
|
if self.enable_jump_log and len(arm_pos) >= self.num_joints:
|
||||||
|
now_sec = self.get_clock().now().nanoseconds * 1e-9
|
||||||
|
for i in range(self.num_joints):
|
||||||
|
try:
|
||||||
|
curr = float(arm_pos[i])
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
|
||||||
|
prev = self.prev_raw_positions[i]
|
||||||
|
if prev is None:
|
||||||
|
self.prev_raw_positions[i] = curr
|
||||||
|
self.last_update_time_sec[i] = now_sec
|
||||||
|
continue
|
||||||
|
|
||||||
|
delta = curr - prev
|
||||||
|
if abs(delta) <= self.joint_update_epsilon_rad:
|
||||||
|
continue
|
||||||
|
|
||||||
|
last_time = self.last_update_time_sec[i]
|
||||||
|
dt = 0.0 if last_time is None else (now_sec - last_time)
|
||||||
|
if dt > 1e-6:
|
||||||
|
speed = abs(delta) / dt
|
||||||
|
if speed >= self.jump_velocity_threshold_rad_s:
|
||||||
|
since_last_log = now_sec - self.last_jump_log_time_sec[i]
|
||||||
|
if since_last_log >= self.jump_log_min_interval_sec:
|
||||||
|
self.get_logger().warning(
|
||||||
|
f'Joint{i + 1} jump detected: prev={prev:.6f} curr={curr:.6f} '
|
||||||
|
f'delta={delta:.6f} rad dt={dt:.4f} s speed={speed:.3f} rad/s'
|
||||||
|
)
|
||||||
|
self.last_jump_log_time_sec[i] = now_sec
|
||||||
|
|
||||||
|
self.prev_raw_positions[i] = curr
|
||||||
|
self.last_update_time_sec[i] = now_sec
|
||||||
|
|
||||||
if self.enable_filter:
|
if self.enable_filter:
|
||||||
if len(arm_pos) >= self.num_joints:
|
if len(arm_pos) >= self.num_joints:
|
||||||
filtered = []
|
filtered = []
|
||||||
|
|||||||
@ -47,6 +47,12 @@ class JointSensorNode(Node):
|
|||||||
self.has_position = [False] * self.num_joints
|
self.has_position = [False] * self.num_joints
|
||||||
self.last_wrapped_positions = [0.0] * self.num_joints
|
self.last_wrapped_positions = [0.0] * self.num_joints
|
||||||
self.has_wrapped_position = [False] * self.num_joints
|
self.has_wrapped_position = [False] * self.num_joints
|
||||||
|
self.last_joint_update_sec = [None] * self.num_joints
|
||||||
|
self.last_jump_log_sec = [0.0] * self.num_joints
|
||||||
|
self.last_stage_log_sec = [0.0] * self.num_joints
|
||||||
|
self.prev_raw_sensor_angle = [None] * self.num_joints
|
||||||
|
self.last_raw_jump_log_sec = [0.0] * self.num_joints
|
||||||
|
self.last_raw_reject_log_sec = [0.0] * self.num_joints
|
||||||
# 传感器原始单位:'centi-rad' 表示原始值/100后得到弧度;'centi-deg' 表示原始值/100后得到角度
|
# 传感器原始单位:'centi-rad' 表示原始值/100后得到弧度;'centi-deg' 表示原始值/100后得到角度
|
||||||
self.raw_angle_unit = 'centi-rad'
|
self.raw_angle_unit = 'centi-rad'
|
||||||
|
|
||||||
@ -56,9 +62,31 @@ class JointSensorNode(Node):
|
|||||||
self.declare_parameter('sensor_directions', [1.0] * self.num_joints)
|
self.declare_parameter('sensor_directions', [1.0] * self.num_joints)
|
||||||
self.declare_parameter('auto_zero_on_first_sample', False)
|
self.declare_parameter('auto_zero_on_first_sample', False)
|
||||||
self.declare_parameter('calibrate_now', False)
|
self.declare_parameter('calibrate_now', False)
|
||||||
|
self.declare_parameter('enable_speed_jump_log', False)
|
||||||
|
self.declare_parameter('speed_jump_threshold_rad_s', 3.0)
|
||||||
|
self.declare_parameter('jump_log_min_interval_sec', 0.2)
|
||||||
|
self.declare_parameter('joint_update_epsilon_rad', 1e-4)
|
||||||
|
self.declare_parameter('enable_stage_diag_log', False)
|
||||||
|
self.declare_parameter('stage_log_min_interval_sec', 0.5)
|
||||||
|
self.declare_parameter('enable_raw_jump_log', False)
|
||||||
|
self.declare_parameter('raw_jump_threshold_rad', 0.2)
|
||||||
|
self.declare_parameter('raw_jump_log_min_interval_sec', 0.2)
|
||||||
|
self.declare_parameter('enable_raw_jump_reject', True)
|
||||||
|
self.declare_parameter('raw_jump_reject_threshold_rad', 0.35)
|
||||||
self.zero_offsets = self._load_zero_offsets_from_param()
|
self.zero_offsets = self._load_zero_offsets_from_param()
|
||||||
self.sensor_directions = self._load_sensor_directions_from_param()
|
self.sensor_directions = self._load_sensor_directions_from_param()
|
||||||
self.auto_zero_on_first_sample = bool(self.get_parameter('auto_zero_on_first_sample').value)
|
self.auto_zero_on_first_sample = bool(self.get_parameter('auto_zero_on_first_sample').value)
|
||||||
|
self.enable_speed_jump_log = bool(self.get_parameter('enable_speed_jump_log').value)
|
||||||
|
self.speed_jump_threshold_rad_s = float(self.get_parameter('speed_jump_threshold_rad_s').value)
|
||||||
|
self.jump_log_min_interval_sec = float(self.get_parameter('jump_log_min_interval_sec').value)
|
||||||
|
self.joint_update_epsilon_rad = float(self.get_parameter('joint_update_epsilon_rad').value)
|
||||||
|
self.enable_stage_diag_log = bool(self.get_parameter('enable_stage_diag_log').value)
|
||||||
|
self.stage_log_min_interval_sec = float(self.get_parameter('stage_log_min_interval_sec').value)
|
||||||
|
self.enable_raw_jump_log = bool(self.get_parameter('enable_raw_jump_log').value)
|
||||||
|
self.raw_jump_threshold_rad = float(self.get_parameter('raw_jump_threshold_rad').value)
|
||||||
|
self.raw_jump_log_min_interval_sec = float(self.get_parameter('raw_jump_log_min_interval_sec').value)
|
||||||
|
self.enable_raw_jump_reject = bool(self.get_parameter('enable_raw_jump_reject').value)
|
||||||
|
self.raw_jump_reject_threshold_rad = float(self.get_parameter('raw_jump_reject_threshold_rad').value)
|
||||||
self.auto_zero_done = [False] * self.num_joints
|
self.auto_zero_done = [False] * self.num_joints
|
||||||
self.add_on_set_parameters_callback(self.on_set_parameters)
|
self.add_on_set_parameters_callback(self.on_set_parameters)
|
||||||
|
|
||||||
@ -75,21 +103,42 @@ class JointSensorNode(Node):
|
|||||||
self.get_logger().debug(f"忽略未映射CAN ID: 0x{msg.arbitration_id:X}")
|
self.get_logger().debug(f"忽略未映射CAN ID: 0x{msg.arbitration_id:X}")
|
||||||
return
|
return
|
||||||
|
|
||||||
|
msg_time_sec = float(msg.timestamp) if msg.timestamp is not None else (self.get_clock().now().nanoseconds * 1e-9)
|
||||||
|
raw_sensor_angle = self.extract_raw_sensor_angle_rad(msg.data)
|
||||||
|
|
||||||
|
# 原始数据跳变过大时直接丢弃该帧,避免异常值进入后续链路
|
||||||
|
if self._should_reject_raw_jump(joint_index, raw_sensor_angle, msg_time_sec, msg):
|
||||||
|
return
|
||||||
|
|
||||||
|
self._log_raw_jump(joint_index, raw_sensor_angle, msg_time_sec, msg)
|
||||||
|
|
||||||
# 每条CAN报文默认对应一个关节角和速度
|
# 每条CAN报文默认对应一个关节角和速度
|
||||||
angle, vel = self.parse_sensor_from_can(msg.data)
|
parsed_angle, vel = self.parse_sensor_from_can(msg.data)
|
||||||
direction = self.sensor_directions[joint_index]
|
direction = self.sensor_directions[joint_index]
|
||||||
angle = self.wrap_to_pi(angle * direction)
|
directed_angle = self.wrap_to_pi(parsed_angle * direction)
|
||||||
vel = vel * direction
|
vel = vel * direction
|
||||||
|
|
||||||
self.last_wrapped_positions[joint_index] = angle
|
self.last_wrapped_positions[joint_index] = directed_angle
|
||||||
self.has_wrapped_position[joint_index] = True
|
self.has_wrapped_position[joint_index] = True
|
||||||
|
|
||||||
if self.auto_zero_on_first_sample and not self.auto_zero_done[joint_index]:
|
if self.auto_zero_on_first_sample and not self.auto_zero_done[joint_index]:
|
||||||
self.zero_offsets[joint_index] = angle
|
self.zero_offsets[joint_index] = directed_angle
|
||||||
self.auto_zero_done[joint_index] = True
|
self.auto_zero_done[joint_index] = True
|
||||||
|
|
||||||
angle = self.wrap_to_pi(angle - self.zero_offsets[joint_index])
|
zeroed_angle = self.wrap_to_pi(directed_angle - self.zero_offsets[joint_index])
|
||||||
angle = self.unwrap_joint_angle(joint_index, angle)
|
angle = self.unwrap_joint_angle(joint_index, zeroed_angle)
|
||||||
|
|
||||||
|
self._log_stage_diag(
|
||||||
|
joint_index,
|
||||||
|
parsed_angle,
|
||||||
|
directed_angle,
|
||||||
|
zeroed_angle,
|
||||||
|
angle,
|
||||||
|
vel,
|
||||||
|
msg_time_sec,
|
||||||
|
)
|
||||||
|
self._log_speed_jump(joint_index, angle, msg_time_sec)
|
||||||
|
|
||||||
self.current_positions[joint_index] = angle
|
self.current_positions[joint_index] = angle
|
||||||
self.current_velocities[joint_index] = vel
|
self.current_velocities[joint_index] = vel
|
||||||
|
|
||||||
@ -168,6 +217,39 @@ class JointSensorNode(Node):
|
|||||||
if param.name == 'auto_zero_on_first_sample':
|
if param.name == 'auto_zero_on_first_sample':
|
||||||
self.auto_zero_on_first_sample = bool(param.value)
|
self.auto_zero_on_first_sample = bool(param.value)
|
||||||
|
|
||||||
|
if param.name == 'enable_speed_jump_log':
|
||||||
|
self.enable_speed_jump_log = bool(param.value)
|
||||||
|
|
||||||
|
if param.name == 'speed_jump_threshold_rad_s':
|
||||||
|
self.speed_jump_threshold_rad_s = float(param.value)
|
||||||
|
|
||||||
|
if param.name == 'jump_log_min_interval_sec':
|
||||||
|
self.jump_log_min_interval_sec = float(param.value)
|
||||||
|
|
||||||
|
if param.name == 'joint_update_epsilon_rad':
|
||||||
|
self.joint_update_epsilon_rad = float(param.value)
|
||||||
|
|
||||||
|
if param.name == 'enable_stage_diag_log':
|
||||||
|
self.enable_stage_diag_log = bool(param.value)
|
||||||
|
|
||||||
|
if param.name == 'stage_log_min_interval_sec':
|
||||||
|
self.stage_log_min_interval_sec = float(param.value)
|
||||||
|
|
||||||
|
if param.name == 'enable_raw_jump_log':
|
||||||
|
self.enable_raw_jump_log = bool(param.value)
|
||||||
|
|
||||||
|
if param.name == 'raw_jump_threshold_rad':
|
||||||
|
self.raw_jump_threshold_rad = float(param.value)
|
||||||
|
|
||||||
|
if param.name == 'raw_jump_log_min_interval_sec':
|
||||||
|
self.raw_jump_log_min_interval_sec = float(param.value)
|
||||||
|
|
||||||
|
if param.name == 'enable_raw_jump_reject':
|
||||||
|
self.enable_raw_jump_reject = bool(param.value)
|
||||||
|
|
||||||
|
if param.name == 'raw_jump_reject_threshold_rad':
|
||||||
|
self.raw_jump_reject_threshold_rad = float(param.value)
|
||||||
|
|
||||||
if param.name == 'calibrate_now' and bool(param.value):
|
if param.name == 'calibrate_now' and bool(param.value):
|
||||||
self.calibrate_zero_from_current()
|
self.calibrate_zero_from_current()
|
||||||
|
|
||||||
@ -182,6 +264,131 @@ class JointSensorNode(Node):
|
|||||||
|
|
||||||
return SetParametersResult(successful=True)
|
return SetParametersResult(successful=True)
|
||||||
|
|
||||||
|
def _log_stage_diag(
|
||||||
|
self,
|
||||||
|
joint_index: int,
|
||||||
|
parsed_angle: float,
|
||||||
|
directed_angle: float,
|
||||||
|
zeroed_angle: float,
|
||||||
|
unwrapped_angle: float,
|
||||||
|
vel: float,
|
||||||
|
now_sec: float,
|
||||||
|
):
|
||||||
|
if not self.enable_stage_diag_log:
|
||||||
|
return
|
||||||
|
|
||||||
|
since_last = now_sec - self.last_stage_log_sec[joint_index]
|
||||||
|
if since_last < self.stage_log_min_interval_sec:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.last_stage_log_sec[joint_index] = now_sec
|
||||||
|
self.get_logger().info(
|
||||||
|
f"Joint{joint_index + 1} stages: parsed={parsed_angle:.6f} dir={directed_angle:.6f} "
|
||||||
|
f"zeroed={zeroed_angle:.6f} unwrapped={unwrapped_angle:.6f} vel={vel:.6f}"
|
||||||
|
)
|
||||||
|
|
||||||
|
def _log_speed_jump(self, joint_index: int, curr_unwrapped_angle: float, now_sec: float):
|
||||||
|
if not self.enable_speed_jump_log:
|
||||||
|
self.last_joint_update_sec[joint_index] = now_sec
|
||||||
|
return
|
||||||
|
|
||||||
|
prev_time = self.last_joint_update_sec[joint_index]
|
||||||
|
if prev_time is None:
|
||||||
|
self.last_joint_update_sec[joint_index] = now_sec
|
||||||
|
return
|
||||||
|
|
||||||
|
dt = now_sec - prev_time
|
||||||
|
if dt <= 1e-6:
|
||||||
|
self.last_joint_update_sec[joint_index] = now_sec
|
||||||
|
return
|
||||||
|
|
||||||
|
prev_unwrapped = self.current_positions[joint_index]
|
||||||
|
delta = curr_unwrapped_angle - prev_unwrapped
|
||||||
|
if abs(delta) <= self.joint_update_epsilon_rad:
|
||||||
|
self.last_joint_update_sec[joint_index] = now_sec
|
||||||
|
return
|
||||||
|
|
||||||
|
speed = abs(delta) / dt
|
||||||
|
if speed >= self.speed_jump_threshold_rad_s:
|
||||||
|
since_last_log = now_sec - self.last_jump_log_sec[joint_index]
|
||||||
|
if since_last_log >= self.jump_log_min_interval_sec:
|
||||||
|
self.get_logger().warning(
|
||||||
|
f"Joint{joint_index + 1} speed jump: prev={prev_unwrapped:.6f} curr={curr_unwrapped_angle:.6f} "
|
||||||
|
f"delta={delta:.6f} rad dt={dt:.4f} s speed={speed:.3f} rad/s"
|
||||||
|
)
|
||||||
|
self.last_jump_log_sec[joint_index] = now_sec
|
||||||
|
|
||||||
|
self.last_joint_update_sec[joint_index] = now_sec
|
||||||
|
|
||||||
|
def _log_raw_jump(self, joint_index: int, raw_sensor_angle: float, now_sec: float, msg: can.Message):
|
||||||
|
if not self.enable_raw_jump_log or raw_sensor_angle is None:
|
||||||
|
if raw_sensor_angle is not None:
|
||||||
|
self.prev_raw_sensor_angle[joint_index] = raw_sensor_angle
|
||||||
|
return
|
||||||
|
|
||||||
|
prev = self.prev_raw_sensor_angle[joint_index]
|
||||||
|
if prev is None:
|
||||||
|
self.prev_raw_sensor_angle[joint_index] = raw_sensor_angle
|
||||||
|
return
|
||||||
|
|
||||||
|
# 原始角度做最短角距离,避免 0/2pi 边界导致假跳变
|
||||||
|
delta = self.wrap_to_pi(raw_sensor_angle - prev)
|
||||||
|
if abs(delta) >= self.raw_jump_threshold_rad:
|
||||||
|
since_last = now_sec - self.last_raw_jump_log_sec[joint_index]
|
||||||
|
if since_last >= self.raw_jump_log_min_interval_sec:
|
||||||
|
self.get_logger().warning(
|
||||||
|
f"Joint{joint_index + 1} RAW jump: prev_raw={prev:.6f} curr_raw={raw_sensor_angle:.6f} "
|
||||||
|
f"delta={delta:.6f} rad can_id=0x{msg.arbitration_id:X} data={msg.data.hex()}"
|
||||||
|
)
|
||||||
|
self.last_raw_jump_log_sec[joint_index] = now_sec
|
||||||
|
|
||||||
|
self.prev_raw_sensor_angle[joint_index] = raw_sensor_angle
|
||||||
|
|
||||||
|
def _should_reject_raw_jump(self, joint_index: int, raw_sensor_angle: float, now_sec: float, msg: can.Message) -> bool:
|
||||||
|
if raw_sensor_angle is None:
|
||||||
|
return False
|
||||||
|
|
||||||
|
prev = self.prev_raw_sensor_angle[joint_index]
|
||||||
|
if prev is None:
|
||||||
|
self.prev_raw_sensor_angle[joint_index] = raw_sensor_angle
|
||||||
|
return False
|
||||||
|
|
||||||
|
if not self.enable_raw_jump_reject:
|
||||||
|
return False
|
||||||
|
|
||||||
|
delta = self.wrap_to_pi(raw_sensor_angle - prev)
|
||||||
|
if abs(delta) < self.raw_jump_reject_threshold_rad:
|
||||||
|
return False
|
||||||
|
|
||||||
|
since_last = now_sec - self.last_raw_reject_log_sec[joint_index]
|
||||||
|
if since_last >= self.raw_jump_log_min_interval_sec:
|
||||||
|
self.get_logger().warning(
|
||||||
|
f"Joint{joint_index + 1} RAW frame dropped: prev_raw={prev:.6f} curr_raw={raw_sensor_angle:.6f} "
|
||||||
|
f"delta={delta:.6f} rad threshold={self.raw_jump_reject_threshold_rad:.6f} "
|
||||||
|
f"can_id=0x{msg.arbitration_id:X} data={msg.data.hex()}"
|
||||||
|
)
|
||||||
|
self.last_raw_reject_log_sec[joint_index] = now_sec
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def extract_raw_sensor_angle_rad(self, data: bytes):
|
||||||
|
if len(data) == 0:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 特殊格式:0xAA + 16bit角度,转换为 0~2pi
|
||||||
|
if data[0] == 0xAA:
|
||||||
|
if len(data) < 3:
|
||||||
|
return None
|
||||||
|
raw = (data[1] << 8) | data[2]
|
||||||
|
return (raw / 4096.0) * (2.0 * math.pi)
|
||||||
|
|
||||||
|
if len(data) < 2:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# 通用格式:前2字节角度,默认 centi-rad
|
||||||
|
raw = int.from_bytes(data[0:2], byteorder='big', signed=True)
|
||||||
|
return self.raw_to_rad(raw)
|
||||||
|
|
||||||
def wrap_to_pi(self, angle_rad: float) -> float:
|
def wrap_to_pi(self, angle_rad: float) -> float:
|
||||||
# 归一化到 [-pi, pi),解决 0~2pi 到 ±pi 的过零映射问题
|
# 归一化到 [-pi, pi),解决 0~2pi 到 ±pi 的过零映射问题
|
||||||
return (angle_rad + math.pi) % (2.0 * math.pi) - math.pi
|
return (angle_rad + math.pi) % (2.0 * math.pi) - math.pi
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user