joint sensor增加双臂控制

This commit is contained in:
shenchenyang 2026-04-08 15:50:35 +08:00
parent 6934e6c7a3
commit 96b00bd0a1
5 changed files with 330 additions and 256 deletions

View File

@ -0,0 +1,15 @@
joint_control_node:
ros__parameters:
enable_filter: true
filter_type: ema
ema_alpha: 0.15
kalman_q: 1.0e-5
kalman_r: 1.0e-2
right_joint_states_topic: /right_joint_sensor_states
left_joint_states_topic: /left_joint_sensor_states
right_arm_cmd_topic: /right_forward_position_controller/commands
right_gripper_cmd_topic: /right_gripper_controller/commands
left_arm_cmd_topic: /left_forward_position_controller/commands
left_gripper_cmd_topic: /left_gripper_controller/commands

View File

@ -1,6 +1,36 @@
joint_sensor_node:
ros__parameters:
# CAN and topic settings (dual-arm)
can_interface: can2
right_can_interface: can2
left_can_interface: can3
enable_left_arm: true
right_joint_states_topic: /right_joint_sensor_states
left_joint_states_topic: /left_joint_sensor_states
# Right arm calibration
zero_offsets: [2.7995149378912956, 2.5233983960718795, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
sensor_directions: [-1.0, 1.0, -1.0, -1.0, -1.0, 1.0, 1.0, -1.0]
auto_zero_on_first_sample: true
can_interface: 'can2'
# Left arm calibration
left_zero_offsets: [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
left_sensor_directions: [-1.0, 1.0, -1.0, 1.0, -1.0, 1.0, -1.0, 1.0]
left_auto_zero_on_first_sample: true
# Runtime calibration triggers
calibrate_now: false
left_calibrate_now: false
# Diagnostics and anomaly filtering
enable_stage_diag_log: false
stage_log_min_interval_sec: 0.5
enable_speed_jump_log: false
speed_jump_threshold_rad_s: 3.0
jump_log_min_interval_sec: 0.2
joint_update_epsilon_rad: 1.0e-4
enable_raw_jump_log: false
raw_jump_threshold_rad: 0.2
raw_jump_log_min_interval_sec: 0.2
enable_raw_jump_reject: true
raw_jump_reject_threshold_rad: 0.35

View File

@ -13,12 +13,22 @@ def generate_launch_description():
'config',
'joint_sensor_zero_offsets.yaml',
)
default_control_params_file = os.path.join(
package_share,
'config',
'joint_control_dual_arm.yaml',
)
params_file_arg = DeclareLaunchArgument(
'params_file',
default_value=default_params_file,
description='Path to the YAML parameter file for joint_sensor_can node.',
)
control_params_file_arg = DeclareLaunchArgument(
'control_params_file',
default_value=default_control_params_file,
description='Path to the YAML parameter file for joint_control_node.',
)
joint_sensor_node = Node(
package='openarm_joint_sensor',
@ -33,10 +43,12 @@ def generate_launch_description():
executable='joint_control',
name='joint_control_node',
output='screen',
parameters=[LaunchConfiguration('control_params_file')],
)
delayed_joint_control_node = TimerAction(period=3.0, actions=[joint_control_node])
delayed_joint_control_node = TimerAction(period=1.0, actions=[joint_control_node])
return LaunchDescription([
params_file_arg,
control_params_file_arg,
joint_sensor_node,
delayed_joint_control_node,
])

View File

@ -91,112 +91,92 @@ class PositionFilter:
return float(z)
class ProgressActionClient(Node):
def __init__(self):
super().__init__('progress_action_client')
# Parameters for optional position filtering
self.num_joints = 8
# Parameters for optional position filtering and dual-arm topics
self.declare_parameter('enable_filter', True)
self.declare_parameter('filter_type', 'ema')
self.declare_parameter('ema_alpha', 0.15)
self.declare_parameter('kalman_q', 1e-5)
self.declare_parameter('kalman_r', 1e-2)
self.declare_parameter('enable_jump_log', False)
self.declare_parameter('jump_velocity_threshold_rad_s', 3.0)
self.declare_parameter('jump_log_min_interval_sec', 0.2)
self.declare_parameter('joint_update_epsilon_rad', 1e-4)
self.declare_parameter('right_joint_states_topic', '/right_joint_sensor_states')
self.declare_parameter('left_joint_states_topic', '/left_joint_sensor_states')
self.declare_parameter('right_arm_cmd_topic', '/right_forward_position_controller/commands')
self.declare_parameter('right_gripper_cmd_topic', '/right_gripper_controller/commands')
self.declare_parameter('left_arm_cmd_topic', '/left_forward_position_controller/commands')
self.declare_parameter('left_gripper_cmd_topic', '/left_gripper_controller/commands')
self.enable_filter = self.get_parameter('enable_filter').get_parameter_value().bool_value
self.filter_type = self.get_parameter('filter_type').get_parameter_value().string_value
self.ema_alpha = self.get_parameter('ema_alpha').get_parameter_value().double_value
self.kalman_q = self.get_parameter('kalman_q').get_parameter_value().double_value
self.kalman_r = self.get_parameter('kalman_r').get_parameter_value().double_value
self.enable_jump_log = self.get_parameter('enable_jump_log').get_parameter_value().bool_value
self.jump_velocity_threshold_rad_s = self.get_parameter('jump_velocity_threshold_rad_s').get_parameter_value().double_value
self.jump_log_min_interval_sec = self.get_parameter('jump_log_min_interval_sec').get_parameter_value().double_value
self.joint_update_epsilon_rad = self.get_parameter('joint_update_epsilon_rad').get_parameter_value().double_value
# Create per-joint filters
self.num_joints = 8
self.filters = [PositionFilter(filter_type=self.filter_type,
self.right_joint_states_topic = self.get_parameter('right_joint_states_topic').get_parameter_value().string_value
self.left_joint_states_topic = self.get_parameter('left_joint_states_topic').get_parameter_value().string_value
self.right_arm_cmd_topic = self.get_parameter('right_arm_cmd_topic').get_parameter_value().string_value
self.right_gripper_cmd_topic = self.get_parameter('right_gripper_cmd_topic').get_parameter_value().string_value
self.left_arm_cmd_topic = self.get_parameter('left_arm_cmd_topic').get_parameter_value().string_value
self.left_gripper_cmd_topic = self.get_parameter('left_gripper_cmd_topic').get_parameter_value().string_value
# Right arm runtime states
self.right_filters = [PositionFilter(filter_type=self.filter_type,
alpha=self.ema_alpha,
q=self.kalman_q,
r=self.kalman_r)
for _ in range(self.num_joints)]
self.prev_raw_positions = [None] * self.num_joints
self.last_update_time_sec = [None] * self.num_joints
self.last_jump_log_time_sec = [0.0] * self.num_joints
# Buffer outgoing joint commands and send them at timer rate.
self.command_queue = deque(maxlen=20)
self.right_command_queue = deque(maxlen=20)
self.subscriber_ = self.create_subscription(
# Left arm runtime states
self.left_filters = [PositionFilter(filter_type=self.filter_type,
alpha=self.ema_alpha,
q=self.kalman_q,
r=self.kalman_r)
for _ in range(self.num_joints)]
self.left_command_queue = deque(maxlen=20)
self.right_subscriber_ = self.create_subscription(
JointState,
"/joint_sensor_states",
self.listener_callback,
self.right_joint_states_topic,
self.right_listener_callback,
10
)
self.armPosPublisher_ = self.create_publisher(Float64MultiArray, "/right_forward_position_controller/commands", 10)
self.gipperPosPublisher_ = self.create_publisher(Float64MultiArray, "/right_gripper_controller/commands", 10)
# 3-1.创建动作客户端;
# self._action_client = ActionClient(self, FollowJointTrajectory, '/right_joint_trajectory_controller/follow_joint_trajectory')
# self.right_gripper_action_client = ActionClient(self, FollowJointTrajectory, '/right_joint_trajectory_controller/follow_joint_trajectory')
self.create_timer(0.01, self.timer_callback) # 定时器周期为0.02秒
# 3. 创建订阅者
# 参数1消息类型如String与发布者一致
# 参数2话题名称如"chatter",与发布者一致)
# 参数3回调函数消息到达时触发用于处理消息
# 参数4队列长度10缓存消息的最大数量
self.subscriber_ # 防止未使用变量警告
# 4. 消息处理回调函数(核心)
# 当订阅到消息时,自动调用此函数处理消息
def listener_callback(self, msg):
# self.send_goal([count, count1, 0.0, 0.0, 0.0, 0.0, 0.0])
# self.get_logger().info(f'Received joint states: {list(msg.name)} positions: {list(msg.position)}')
# Apply position filtering if enabled
arm_pos = list(msg.position)
# 基于“每关节速度阈值”检测异常,减少仅靠角度差导致的误报
if self.enable_jump_log and len(arm_pos) >= self.num_joints:
now_sec = self.get_clock().now().nanoseconds * 1e-9
for i in range(self.num_joints):
try:
curr = float(arm_pos[i])
except Exception:
continue
prev = self.prev_raw_positions[i]
if prev is None:
self.prev_raw_positions[i] = curr
self.last_update_time_sec[i] = now_sec
continue
delta = curr - prev
if abs(delta) <= self.joint_update_epsilon_rad:
continue
last_time = self.last_update_time_sec[i]
dt = 0.0 if last_time is None else (now_sec - last_time)
if dt > 1e-6:
speed = abs(delta) / dt
if speed >= self.jump_velocity_threshold_rad_s:
since_last_log = now_sec - self.last_jump_log_time_sec[i]
if since_last_log >= self.jump_log_min_interval_sec:
self.get_logger().warning(
f'Joint{i + 1} jump detected: prev={prev:.6f} curr={curr:.6f} '
f'delta={delta:.6f} rad dt={dt:.4f} s speed={speed:.3f} rad/s'
self.left_subscriber_ = self.create_subscription(
JointState,
self.left_joint_states_topic,
self.left_listener_callback,
10,
)
self.last_jump_log_time_sec[i] = now_sec
self.prev_raw_positions[i] = curr
self.last_update_time_sec[i] = now_sec
self.right_arm_publisher = self.create_publisher(Float64MultiArray, self.right_arm_cmd_topic, 10)
self.right_gripper_publisher = self.create_publisher(Float64MultiArray, self.right_gripper_cmd_topic, 10)
self.left_arm_publisher = self.create_publisher(Float64MultiArray, self.left_arm_cmd_topic, 10)
self.left_gripper_publisher = self.create_publisher(Float64MultiArray, self.left_gripper_cmd_topic, 10)
self.create_timer(0.01, self.timer_callback)
def right_listener_callback(self, msg):
self._process_arm_joint_state(
msg=msg,
filters=self.right_filters,
command_queue=self.right_command_queue,
)
def left_listener_callback(self, msg):
self._process_arm_joint_state(
msg=msg,
filters=self.left_filters,
command_queue=self.left_command_queue,
)
def _process_arm_joint_state(self, msg, filters, command_queue):
arm_pos = list(msg.position)
if self.enable_filter:
if len(arm_pos) >= self.num_joints:
filtered = []
@ -205,7 +185,7 @@ class ProgressActionClient(Node):
v = float(arm_pos[i])
except Exception:
v = None
fv = self.filters[i].update(v)
fv = filters[i].update(v)
# If filter returned None (no prior), fall back to raw
filtered.append(fv if fv is not None else (arm_pos[i] if i < len(arm_pos) else 0.0))
arm_pos_to_send = filtered
@ -226,68 +206,31 @@ class ProgressActionClient(Node):
arm7_pos_to_send = arm_pos_to_send[:7]
gripper_pos_to_send = arm_pos_to_send[7]
self.command_queue.append((arm7_pos_to_send, gripper_pos_to_send))
# self.send_goal(arm_pos_to_send)
command_queue.append((arm7_pos_to_send, gripper_pos_to_send))
def timer_callback(self):
# 定时器回调函数,可以用于周期性任务
if self.command_queue:
arm7_pos_to_send, gripper_pos_to_send = self.command_queue.popleft()
self.send_goal(arm7_pos_to_send, gripper_pos_to_send)
if self.right_command_queue:
arm7_pos_to_send, gripper_pos_to_send = self.right_command_queue.popleft()
self.send_goal('right', arm7_pos_to_send, gripper_pos_to_send)
def send_goal(self, arm7_pos, gripper_pos):
arm_joint = ['openarm_right_joint1', 'openarm_right_joint2', 'openarm_right_joint3', 'openarm_right_joint4', 'openarm_right_joint5', 'openarm_right_joint6', 'openarm_right_joint7']
arm_effort = [10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0]
arm_zero = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
# 3-2.发送请求;
# goal_msg = FollowJointTrajectory.Goal()
# goal_msg.trajectory.joint_names = arm_joint
# goal_msg.trajectory.points.append(JointTrajectoryPoint())
# goal_msg.trajectory.points[0].positions = arm_pos
# goal_msg.trajectory.points[0].effort = arm_effort
# print(goal_msg)
if self.left_command_queue:
arm7_pos_to_send, gripper_pos_to_send = self.left_command_queue.popleft()
self.send_goal('left', arm7_pos_to_send, gripper_pos_to_send)
# self._action_client.wait_for_server()
# self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
# self._send_goal_future.add_done_callback(self.goal_response_callback)
def send_goal(self, arm_name, arm7_pos, gripper_pos):
arm_msg = Float64MultiArray()
arm_msg.data = arm7_pos
self.armPosPublisher_.publish(arm_msg)
gripper_msg = Float64MultiArray()
gripper_msg.data = [map_float_range(gripper_pos, 0.0, 1.18, 0.0, 0.045)]
self.gipperPosPublisher_.publish(gripper_msg)
# self.get_logger().info(
# f'Published arm joints(1-7): {arm7_pos}, gripper joint8: {gripper_pos}'
# )
# self.get_logger().info(f'Published gripper joint8: {gripper_pos}')
if arm_name == 'left':
self.left_arm_publisher.publish(arm_msg)
self.left_gripper_publisher.publish(gripper_msg)
else:
self.right_arm_publisher.publish(arm_msg)
self.right_gripper_publisher.publish(gripper_msg)
# def goal_response_callback(self, future):
# # 3-3.处理目标发送后的反馈;
# goal_handle = future.result()
# print(goal_handle)
# if not goal_handle.accepted:
# self.get_logger().info('请求被拒绝')
# return
# self.get_logger().info('请求被接收,开始执行任务!')
# self._get_result_future = goal_handle.get_result_async()
# self._get_result_future.add_done_callback(self.get_result_callback)
# # 3-5.处理最终响应。
# def get_result_callback(self, future):
# return
# # result = future.result().result
# # self.get_logger().info('最终计算结果sum = %d' % result.sum)
# # 5.释放资源。
# # rclpy.shutdown()
# # 3-4.处理连续反馈;
# def feedback_callback(self, feedback_msg):
# feedback = (int)(feedback_msg.feedback.progress * 100)
# self.get_logger().info('当前进度: %d%%' % feedback)
def main():
rclpy.init()

View File

@ -23,45 +23,27 @@ class JointSensorNode(Node):
def __init__(self):
super().__init__('joint_sensor_node')
self.num_joints = 8
self.declare_parameter('can_interface', 'can2')
self.get_logger().info('正在初始化CAN接口...')
can_interface = self.get_parameter('can_interface').value
self.bus = can.interface.Bus(channel=can_interface, bustype='socketcan')
# ROS2 publisher: 发布 JointState 到话题 'joint_states'
self.publisher_ = self.create_publisher(JointState, 'joint_sensor_states', 10)
self.arm_labels = {'right': 'RIGHT', 'left': 'LEFT'}
self.joint_names = [f'joint{i}' for i in range(1, self.num_joints + 1)]
# 8路传感器 CAN ID -> 8个关节索引
self.can_to_joint_index = {
0x01: 0,
0x02: 1,
0x03: 2,
0x04: 3,
0x05: 4,
0x06: 5,
0x07: 6,
0x08: 7,
}
self.current_positions = [0.0] * self.num_joints
self.current_velocities = [0.0] * self.num_joints
self.has_position = [False] * self.num_joints
self.last_wrapped_positions = [0.0] * self.num_joints
self.has_wrapped_position = [False] * self.num_joints
self.last_joint_update_sec = [None] * self.num_joints
self.last_jump_log_sec = [0.0] * self.num_joints
self.last_stage_log_sec = [0.0] * self.num_joints
self.prev_raw_sensor_angle = [None] * self.num_joints
self.last_raw_jump_log_sec = [0.0] * self.num_joints
self.last_raw_reject_log_sec = [0.0] * self.num_joints
# 传感器原始单位:'centi-rad' 表示原始值/100后得到弧度'centi-deg' 表示原始值/100后得到角度
self.raw_angle_unit = 'centi-rad'
# CAN interface and topic parameters
self.declare_parameter('can_interface', 'can2') # backward-compatible alias for right arm
self.declare_parameter('right_can_interface', 'can2')
self.declare_parameter('left_can_interface', 'can3')
self.declare_parameter('enable_left_arm', True)
self.declare_parameter('right_joint_states_topic', 'right_joint_sensor_states')
self.declare_parameter('left_joint_states_topic', 'left_joint_sensor_states')
# 每个关节零点偏置(弧度),发布角度时会先减去该偏置
# Calibration and direction parameters
self.declare_parameter('zero_offsets', [0.0] * self.num_joints)
# 每个关节转向系数1.0 为正向,-1.0 为反向
self.declare_parameter('sensor_directions', [1.0] * self.num_joints)
self.declare_parameter('left_zero_offsets', [0.0] * self.num_joints)
self.declare_parameter('left_sensor_directions', [1.0] * self.num_joints)
self.declare_parameter('auto_zero_on_first_sample', False)
self.declare_parameter('left_auto_zero_on_first_sample', False)
self.declare_parameter('calibrate_now', False)
self.declare_parameter('left_calibrate_now', False)
# Diagnostics and anomaly filters
self.declare_parameter('enable_speed_jump_log', False)
self.declare_parameter('speed_jump_threshold_rad_s', 3.0)
self.declare_parameter('jump_log_min_interval_sec', 0.2)
@ -73,9 +55,9 @@ class JointSensorNode(Node):
self.declare_parameter('raw_jump_log_min_interval_sec', 0.2)
self.declare_parameter('enable_raw_jump_reject', True)
self.declare_parameter('raw_jump_reject_threshold_rad', 0.35)
self.zero_offsets = self._load_zero_offsets_from_param()
self.sensor_directions = self._load_sensor_directions_from_param()
self.auto_zero_on_first_sample = bool(self.get_parameter('auto_zero_on_first_sample').value)
# Runtime flags
self.enable_left_arm = bool(self.get_parameter('enable_left_arm').value)
self.enable_speed_jump_log = bool(self.get_parameter('enable_speed_jump_log').value)
self.speed_jump_threshold_rad_s = float(self.get_parameter('speed_jump_threshold_rad_s').value)
self.jump_log_min_interval_sec = float(self.get_parameter('jump_log_min_interval_sec').value)
@ -87,16 +69,87 @@ class JointSensorNode(Node):
self.raw_jump_log_min_interval_sec = float(self.get_parameter('raw_jump_log_min_interval_sec').value)
self.enable_raw_jump_reject = bool(self.get_parameter('enable_raw_jump_reject').value)
self.raw_jump_reject_threshold_rad = float(self.get_parameter('raw_jump_reject_threshold_rad').value)
self.auto_zero_done = [False] * self.num_joints
# Sensor unit setting
self.raw_angle_unit = 'centi-rad'
self.get_logger().info('正在初始化CAN接口...')
legacy_can = str(self.get_parameter('can_interface').value)
right_can_interface = str(self.get_parameter('right_can_interface').value or legacy_can)
left_can_interface = str(self.get_parameter('left_can_interface').value)
right_topic = str(self.get_parameter('right_joint_states_topic').value)
left_topic = str(self.get_parameter('left_joint_states_topic').value)
self.joint_names = [f'joint{i}' for i in range(1, self.num_joints + 1)]
self.can_to_joint_index = {
0x01: 0,
0x02: 1,
0x03: 2,
0x04: 3,
0x05: 4,
0x06: 5,
0x07: 6,
0x08: 7,
}
self.arms = {
'right': self._new_arm_state(
zero_offsets=self._load_offsets_param('zero_offsets'),
sensor_directions=self._load_directions_param('sensor_directions'),
auto_zero=bool(self.get_parameter('auto_zero_on_first_sample').value),
publisher=self.create_publisher(JointState, right_topic, 10),
bus=can.interface.Bus(channel=right_can_interface, bustype='socketcan'),
)
}
if self.enable_left_arm:
self.arms['left'] = self._new_arm_state(
zero_offsets=self._load_offsets_param('left_zero_offsets'),
sensor_directions=self._load_directions_param('left_sensor_directions'),
auto_zero=bool(self.get_parameter('left_auto_zero_on_first_sample').value),
publisher=self.create_publisher(JointState, left_topic, 10),
bus=can.interface.Bus(channel=left_can_interface, bustype='socketcan'),
)
self.add_on_set_parameters_callback(self.on_set_parameters)
self.listener = CallbackListener(self.listen_can_callback)
self.notifier = can.Notifier(self.bus, [self.listener])
self.notifiers = []
for arm_name, state in self.arms.items():
listener = CallbackListener(lambda msg, a=arm_name: self.listen_can_callback(msg, a))
notifier = can.Notifier(state['bus'], [listener])
state['listener'] = listener
state['notifier'] = notifier
self.notifiers.append(notifier)
def listen_can_callback(self, msg: can.Message):
def _new_arm_state(self, zero_offsets, sensor_directions, auto_zero, publisher, bus):
return {
'publisher': publisher,
'bus': bus,
'notifier': None,
'listener': None,
'positions': [0.0] * self.num_joints,
'velocities': [0.0] * self.num_joints,
'has_position': [False] * self.num_joints,
'last_wrapped_positions': [0.0] * self.num_joints,
'has_wrapped_position': [False] * self.num_joints,
'last_joint_update_sec': [None] * self.num_joints,
'last_jump_log_sec': [0.0] * self.num_joints,
'last_stage_log_sec': [0.0] * self.num_joints,
'prev_raw_sensor_angle': [None] * self.num_joints,
'last_raw_jump_log_sec': [0.0] * self.num_joints,
'last_raw_reject_log_sec': [0.0] * self.num_joints,
'zero_offsets': zero_offsets,
'sensor_directions': sensor_directions,
'auto_zero_on_first_sample': auto_zero,
'auto_zero_done': [False] * self.num_joints,
}
def listen_can_callback(self, msg: can.Message, arm_name: str):
# 用户自定义的回调函数
try:
# self.get_logger().info(f"Received: id=0x{msg.arbitration_id:X} dlc={msg.dlc} data={msg.data.hex()} ts={msg.timestamp}")
state = self.arms.get(arm_name)
if state is None:
return
joint_index = self.can_to_joint_index.get(msg.arbitration_id)
if joint_index is None:
@ -107,28 +160,30 @@ class JointSensorNode(Node):
raw_sensor_angle = self.extract_raw_sensor_angle_rad(msg.data)
# 原始数据跳变过大时直接丢弃该帧,避免异常值进入后续链路
if self._should_reject_raw_jump(joint_index, raw_sensor_angle, msg_time_sec, msg):
if self._should_reject_raw_jump(state, arm_name, joint_index, raw_sensor_angle, msg_time_sec, msg):
return
self._log_raw_jump(joint_index, raw_sensor_angle, msg_time_sec, msg)
self._log_raw_jump(state, arm_name, joint_index, raw_sensor_angle, msg_time_sec, msg)
# 每条CAN报文默认对应一个关节角和速度
parsed_angle, vel = self.parse_sensor_from_can(msg.data)
direction = self.sensor_directions[joint_index]
direction = state['sensor_directions'][joint_index]
directed_angle = self.wrap_to_pi(parsed_angle * direction)
vel = vel * direction
self.last_wrapped_positions[joint_index] = directed_angle
self.has_wrapped_position[joint_index] = True
state['last_wrapped_positions'][joint_index] = directed_angle
state['has_wrapped_position'][joint_index] = True
if self.auto_zero_on_first_sample and not self.auto_zero_done[joint_index]:
self.zero_offsets[joint_index] = directed_angle
self.auto_zero_done[joint_index] = True
if state['auto_zero_on_first_sample'] and not state['auto_zero_done'][joint_index]:
state['zero_offsets'][joint_index] = directed_angle
state['auto_zero_done'][joint_index] = True
zeroed_angle = self.wrap_to_pi(directed_angle - self.zero_offsets[joint_index])
angle = self.unwrap_joint_angle(joint_index, zeroed_angle)
zeroed_angle = self.wrap_to_pi(directed_angle - state['zero_offsets'][joint_index])
angle = self.unwrap_joint_angle(joint_index, zeroed_angle, state['positions'], state['has_position'])
self._log_stage_diag(
state,
arm_name,
joint_index,
parsed_angle,
directed_angle,
@ -137,41 +192,39 @@ class JointSensorNode(Node):
vel,
msg_time_sec,
)
self._log_speed_jump(joint_index, angle, msg_time_sec)
self._log_speed_jump(state, arm_name, joint_index, angle, msg_time_sec)
self.current_positions[joint_index] = angle
self.current_velocities[joint_index] = vel
state['positions'][joint_index] = angle
state['velocities'][joint_index] = vel
# 发布完整8关节状态位置/速度单位均为弧度)
js = JointState()
js.header.stamp = self.get_clock().now().to_msg()
js.name = self.joint_names
js.position = self.current_positions
js.velocity = self.current_velocities
self.publisher_.publish(js)
# self.get_logger().info(
# f"Published JointState: id=0x{msg.arbitration_id:X} "
# f"joint={self.joint_names[joint_index]} pos={angle:.6f} vel={vel:.6f}"
# )
js.position = state['positions']
js.velocity = state['velocities']
state['publisher'].publish(js)
except Exception as e:
self.get_logger().error(f"Error handling CAN msg: {e}")
def shutdown(self):
self.notifier.stop()
self.bus.shutdown()
for state in self.arms.values():
if state['notifier'] is not None:
state['notifier'].stop()
if state['bus'] is not None:
state['bus'].shutdown()
def _load_zero_offsets_from_param(self):
raw_offsets = self.get_parameter('zero_offsets').value
def _load_offsets_param(self, name):
raw_offsets = self.get_parameter(name).value
if not isinstance(raw_offsets, (list, tuple)) or len(raw_offsets) != self.num_joints:
self.get_logger().warn(f'参数 zero_offsets 长度不是{self.num_joints},已使用默认零偏。')
self.get_logger().warn(f'参数 {name} 长度不是{self.num_joints},已使用默认零偏。')
return [0.0] * self.num_joints
return [float(v) for v in raw_offsets]
def _load_sensor_directions_from_param(self):
raw_dirs = self.get_parameter('sensor_directions').value
def _load_directions_param(self, name):
raw_dirs = self.get_parameter(name).value
if not isinstance(raw_dirs, (list, tuple)) or len(raw_dirs) != self.num_joints:
self.get_logger().warn(f'参数 sensor_directions 长度不是{self.num_joints},已使用默认方向。')
self.get_logger().warn(f'参数 {name} 长度不是{self.num_joints},已使用默认方向。')
return [1.0] * self.num_joints
dirs = [float(v) for v in raw_dirs]
@ -181,41 +234,49 @@ class JointSensorNode(Node):
dirs[i] = 1.0
return dirs
def calibrate_zero_from_current(self):
def calibrate_zero_from_current(self, arm_name):
state = self.arms.get(arm_name)
if state is None:
return
param_name = 'left_zero_offsets' if arm_name == 'left' else 'zero_offsets'
updated = 0
for i in range(self.num_joints):
if self.has_wrapped_position[i]:
self.zero_offsets[i] = self.last_wrapped_positions[i]
self.has_position[i] = False
if state['has_wrapped_position'][i]:
state['zero_offsets'][i] = state['last_wrapped_positions'][i]
state['has_position'][i] = False
updated += 1
if updated > 0:
self.set_parameters([
rclpy.parameter.Parameter(
'zero_offsets',
param_name,
rclpy.Parameter.Type.DOUBLE_ARRAY,
self.zero_offsets,
state['zero_offsets'],
)
])
self.get_logger().info(f'零点校准完成,已更新 {updated}/{self.num_joints} 个关节零偏。')
self.get_logger().info(f'[{self.arm_labels[arm_name]}] 零点校准完成,已更新 {updated}/{self.num_joints} 个关节零偏。')
else:
self.get_logger().warn('零点校准失败:当前还没有可用的关节采样数据。')
def on_set_parameters(self, params):
for param in params:
if param.name == 'zero_offsets':
if param.name in ('zero_offsets', 'left_zero_offsets'):
if not isinstance(param.value, (list, tuple)) or len(param.value) != self.num_joints:
return SetParametersResult(successful=False, reason=f'zero_offsets 必须是长度为{self.num_joints}的数组')
return SetParametersResult(successful=False, reason=f'{param.name} 必须是长度为{self.num_joints}的数组')
if param.name == 'sensor_directions':
if param.name in ('sensor_directions', 'left_sensor_directions'):
if not isinstance(param.value, (list, tuple)) or len(param.value) != self.num_joints:
return SetParametersResult(successful=False, reason=f'sensor_directions 必须是长度为{self.num_joints}的数组')
return SetParametersResult(successful=False, reason=f'{param.name} 必须是长度为{self.num_joints}的数组')
for i, v in enumerate(param.value):
if abs(float(v)) < 1e-9:
return SetParametersResult(successful=False, reason=f'sensor_directions[{i}] 不能为0')
return SetParametersResult(successful=False, reason=f'{param.name}[{i}] 不能为0')
if param.name == 'auto_zero_on_first_sample':
self.auto_zero_on_first_sample = bool(param.value)
self.arms['right']['auto_zero_on_first_sample'] = bool(param.value)
if param.name == 'left_auto_zero_on_first_sample' and 'left' in self.arms:
self.arms['left']['auto_zero_on_first_sample'] = bool(param.value)
if param.name == 'enable_speed_jump_log':
self.enable_speed_jump_log = bool(param.value)
@ -251,21 +312,34 @@ class JointSensorNode(Node):
self.raw_jump_reject_threshold_rad = float(param.value)
if param.name == 'calibrate_now' and bool(param.value):
self.calibrate_zero_from_current()
self.calibrate_zero_from_current('right')
if param.name == 'left_calibrate_now' and bool(param.value):
self.calibrate_zero_from_current('left')
for param in params:
if param.name == 'zero_offsets':
self.zero_offsets = [float(v) for v in param.value]
self.has_position = [False] * self.num_joints
self.arms['right']['zero_offsets'] = [float(v) for v in param.value]
self.arms['right']['has_position'] = [False] * self.num_joints
if param.name == 'sensor_directions':
self.sensor_directions = [float(v) for v in param.value]
self.has_position = [False] * self.num_joints
self.arms['right']['sensor_directions'] = [float(v) for v in param.value]
self.arms['right']['has_position'] = [False] * self.num_joints
if param.name == 'left_zero_offsets' and 'left' in self.arms:
self.arms['left']['zero_offsets'] = [float(v) for v in param.value]
self.arms['left']['has_position'] = [False] * self.num_joints
if param.name == 'left_sensor_directions' and 'left' in self.arms:
self.arms['left']['sensor_directions'] = [float(v) for v in param.value]
self.arms['left']['has_position'] = [False] * self.num_joints
return SetParametersResult(successful=True)
def _log_stage_diag(
self,
state,
arm_name,
joint_index: int,
parsed_angle: float,
directed_angle: float,
@ -277,80 +351,80 @@ class JointSensorNode(Node):
if not self.enable_stage_diag_log:
return
since_last = now_sec - self.last_stage_log_sec[joint_index]
since_last = now_sec - state['last_stage_log_sec'][joint_index]
if since_last < self.stage_log_min_interval_sec:
return
self.last_stage_log_sec[joint_index] = now_sec
state['last_stage_log_sec'][joint_index] = now_sec
self.get_logger().info(
f"Joint{joint_index + 1} stages: parsed={parsed_angle:.6f} dir={directed_angle:.6f} "
f"[{self.arm_labels[arm_name]}] Joint{joint_index + 1} stages: parsed={parsed_angle:.6f} dir={directed_angle:.6f} "
f"zeroed={zeroed_angle:.6f} unwrapped={unwrapped_angle:.6f} vel={vel:.6f}"
)
def _log_speed_jump(self, joint_index: int, curr_unwrapped_angle: float, now_sec: float):
def _log_speed_jump(self, state, arm_name, joint_index: int, curr_unwrapped_angle: float, now_sec: float):
if not self.enable_speed_jump_log:
self.last_joint_update_sec[joint_index] = now_sec
state['last_joint_update_sec'][joint_index] = now_sec
return
prev_time = self.last_joint_update_sec[joint_index]
prev_time = state['last_joint_update_sec'][joint_index]
if prev_time is None:
self.last_joint_update_sec[joint_index] = now_sec
state['last_joint_update_sec'][joint_index] = now_sec
return
dt = now_sec - prev_time
if dt <= 1e-6:
self.last_joint_update_sec[joint_index] = now_sec
state['last_joint_update_sec'][joint_index] = now_sec
return
prev_unwrapped = self.current_positions[joint_index]
prev_unwrapped = state['positions'][joint_index]
delta = curr_unwrapped_angle - prev_unwrapped
if abs(delta) <= self.joint_update_epsilon_rad:
self.last_joint_update_sec[joint_index] = now_sec
state['last_joint_update_sec'][joint_index] = now_sec
return
speed = abs(delta) / dt
if speed >= self.speed_jump_threshold_rad_s:
since_last_log = now_sec - self.last_jump_log_sec[joint_index]
since_last_log = now_sec - state['last_jump_log_sec'][joint_index]
if since_last_log >= self.jump_log_min_interval_sec:
self.get_logger().warning(
f"Joint{joint_index + 1} speed jump: prev={prev_unwrapped:.6f} curr={curr_unwrapped_angle:.6f} "
f"[{self.arm_labels[arm_name]}] Joint{joint_index + 1} speed jump: prev={prev_unwrapped:.6f} curr={curr_unwrapped_angle:.6f} "
f"delta={delta:.6f} rad dt={dt:.4f} s speed={speed:.3f} rad/s"
)
self.last_jump_log_sec[joint_index] = now_sec
state['last_jump_log_sec'][joint_index] = now_sec
self.last_joint_update_sec[joint_index] = now_sec
state['last_joint_update_sec'][joint_index] = now_sec
def _log_raw_jump(self, joint_index: int, raw_sensor_angle: float, now_sec: float, msg: can.Message):
def _log_raw_jump(self, state, arm_name, joint_index: int, raw_sensor_angle: float, now_sec: float, msg: can.Message):
if not self.enable_raw_jump_log or raw_sensor_angle is None:
if raw_sensor_angle is not None:
self.prev_raw_sensor_angle[joint_index] = raw_sensor_angle
state['prev_raw_sensor_angle'][joint_index] = raw_sensor_angle
return
prev = self.prev_raw_sensor_angle[joint_index]
prev = state['prev_raw_sensor_angle'][joint_index]
if prev is None:
self.prev_raw_sensor_angle[joint_index] = raw_sensor_angle
state['prev_raw_sensor_angle'][joint_index] = raw_sensor_angle
return
# 原始角度做最短角距离,避免 0/2pi 边界导致假跳变
delta = self.wrap_to_pi(raw_sensor_angle - prev)
if abs(delta) >= self.raw_jump_threshold_rad:
since_last = now_sec - self.last_raw_jump_log_sec[joint_index]
since_last = now_sec - state['last_raw_jump_log_sec'][joint_index]
if since_last >= self.raw_jump_log_min_interval_sec:
self.get_logger().warning(
f"Joint{joint_index + 1} RAW jump: prev_raw={prev:.6f} curr_raw={raw_sensor_angle:.6f} "
f"[{self.arm_labels[arm_name]}] Joint{joint_index + 1} RAW jump: prev_raw={prev:.6f} curr_raw={raw_sensor_angle:.6f} "
f"delta={delta:.6f} rad can_id=0x{msg.arbitration_id:X} data={msg.data.hex()}"
)
self.last_raw_jump_log_sec[joint_index] = now_sec
state['last_raw_jump_log_sec'][joint_index] = now_sec
self.prev_raw_sensor_angle[joint_index] = raw_sensor_angle
state['prev_raw_sensor_angle'][joint_index] = raw_sensor_angle
def _should_reject_raw_jump(self, joint_index: int, raw_sensor_angle: float, now_sec: float, msg: can.Message) -> bool:
def _should_reject_raw_jump(self, state, arm_name, joint_index: int, raw_sensor_angle: float, now_sec: float, msg: can.Message) -> bool:
if raw_sensor_angle is None:
return False
prev = self.prev_raw_sensor_angle[joint_index]
prev = state['prev_raw_sensor_angle'][joint_index]
if prev is None:
self.prev_raw_sensor_angle[joint_index] = raw_sensor_angle
state['prev_raw_sensor_angle'][joint_index] = raw_sensor_angle
return False
if not self.enable_raw_jump_reject:
@ -360,14 +434,14 @@ class JointSensorNode(Node):
if abs(delta) < self.raw_jump_reject_threshold_rad:
return False
since_last = now_sec - self.last_raw_reject_log_sec[joint_index]
since_last = now_sec - state['last_raw_reject_log_sec'][joint_index]
if since_last >= self.raw_jump_log_min_interval_sec:
self.get_logger().warning(
f"Joint{joint_index + 1} RAW frame dropped: prev_raw={prev:.6f} curr_raw={raw_sensor_angle:.6f} "
f"[{self.arm_labels[arm_name]}] Joint{joint_index + 1} RAW frame dropped: prev_raw={prev:.6f} curr_raw={raw_sensor_angle:.6f} "
f"delta={delta:.6f} rad threshold={self.raw_jump_reject_threshold_rad:.6f} "
f"can_id=0x{msg.arbitration_id:X} data={msg.data.hex()}"
)
self.last_raw_reject_log_sec[joint_index] = now_sec
state['last_raw_reject_log_sec'][joint_index] = now_sec
return True
@ -393,13 +467,13 @@ class JointSensorNode(Node):
# 归一化到 [-pi, pi),解决 0~2pi 到 ±pi 的过零映射问题
return (angle_rad + math.pi) % (2.0 * math.pi) - math.pi
def unwrap_joint_angle(self, joint_index: int, wrapped_angle: float) -> float:
def unwrap_joint_angle(self, joint_index: int, wrapped_angle: float, positions, has_position) -> float:
# 基于上一帧做相位展开,避免在 -pi/+pi 边界发生数值跳变
if not self.has_position[joint_index]:
self.has_position[joint_index] = True
if not has_position[joint_index]:
has_position[joint_index] = True
return wrapped_angle
prev = self.current_positions[joint_index]
prev = positions[joint_index]
prev_wrapped = self.wrap_to_pi(prev)
delta = wrapped_angle - prev_wrapped