增加can编码器自动校准零点和零点手动配置,增加多节点关节控制

This commit is contained in:
shenchenyang 2026-03-27 20:06:26 +08:00
parent 8a020b18ac
commit ec52ded9aa
5 changed files with 243 additions and 39 deletions

View File

@ -0,0 +1,6 @@
joint_sensor_node:
ros__parameters:
zero_offsets: [2.7995149378912956, 2.5233983960718795, 0.0, 0.0, 0.0, 0.0, 0.0]
sensor_directions: [-1.0, 1.0, -1.0, 1.0, 1.0, 1.0, 1.0]
auto_zero_on_first_sample: true
can_interface: 'can2'

View File

@ -0,0 +1,34 @@
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
package_share = get_package_share_directory('openarm_joint_sensor')
default_params_file = os.path.join(
package_share,
'config',
'joint_sensor_zero_offsets.yaml',
)
params_file_arg = DeclareLaunchArgument(
'params_file',
default_value=default_params_file,
description='Path to the YAML parameter file for joint_sensor_can node.',
)
joint_sensor_node = Node(
package='openarm_joint_sensor',
executable='joint_sensor_can',
name='joint_sensor_node',
output='screen',
parameters=[LaunchConfiguration('params_file')],
)
return LaunchDescription([
params_file_arg,
joint_sensor_node,
])

View File

@ -5,6 +5,7 @@ from control_msgs.action import FollowJointTrajectory
from rclpy.action import ActionClient
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
from std_msgs.msg import Float64MultiArray
count = 0
@ -90,6 +91,8 @@ class ProgressActionClient(Node):
self.listener_callback,
10
)
self.float64MultiArrayPublisher_ = self.create_publisher(Float64MultiArray, "/right_forward_position_controller/commands", 10)
# 3-1.创建动作客户端;
self._action_client = ActionClient(self, FollowJointTrajectory, '/right_joint_trajectory_controller/follow_joint_trajectory')
@ -135,16 +138,20 @@ class ProgressActionClient(Node):
arm_effort = [10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0]
arm_zero = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
# 3-2.发送请求;
goal_msg = FollowJointTrajectory.Goal()
goal_msg.trajectory.joint_names = arm_joint
goal_msg.trajectory.points.append(JointTrajectoryPoint())
goal_msg.trajectory.points[0].positions = arm_pos
# goal_msg = FollowJointTrajectory.Goal()
# goal_msg.trajectory.joint_names = arm_joint
# goal_msg.trajectory.points.append(JointTrajectoryPoint())
# goal_msg.trajectory.points[0].positions = arm_pos
# goal_msg.trajectory.points[0].effort = arm_effort
# print(goal_msg)
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
# self._action_client.wait_for_server()
# self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
# self._send_goal_future.add_done_callback(self.goal_response_callback)
msg = Float64MultiArray()
msg.data = arm_pos
self.float64MultiArrayPublisher_.publish(msg)
self.get_logger().info(f'Published joint commands: {arm_pos}')
def goal_response_callback(self, future):
# 3-3.处理目标发送后的反馈;

View File

@ -3,6 +3,7 @@ from rclpy.node import Node
import can
import math
from sensor_msgs.msg import JointState
from rcl_interfaces.msg import SetParametersResult
class CallbackListener(can.Listener):
def __init__(self, user_callback):
@ -21,17 +22,43 @@ class CallbackListener(can.Listener):
class JointSensorNode(Node):
def __init__(self):
super().__init__('joint_sensor_node')
self.declare_parameter('can_interface', 'can0')
self.get_logger().info('正在初始化CAN接口...')
self.bus = can.interface.Bus(channel='can0', bustype='socketcan')
can_interface = self.get_parameter('can_interface').value
self.bus = can.interface.Bus(channel=can_interface, bustype='socketcan')
# ROS2 publisher: 发布 JointState 到话题 'joint_states'
self.publisher_ = self.create_publisher(JointState, 'joint_sensor_states', 10)
# 可配置的 CAN ID -> joint name 列表映射
# 默认示例ID 0x123 包含两个关节角,每个角用 int16 存储(单位 centi-degrees除以100得到度
# 你可以根据真实硬件修改此映射或解析函数
self.can_to_joints = {
0x01: ['joint1'],
self.joint_names = [f'joint{i}' for i in range(1, 8)]
# 7路传感器 CAN ID -> 7个关节索引
self.can_to_joint_index = {
0x01: 0,
0x02: 1,
0x03: 2,
0x04: 3,
0x05: 4,
0x06: 5,
0x07: 6,
}
self.current_positions = [0.0] * 7
self.current_velocities = [0.0] * 7
self.has_position = [False] * 7
self.last_wrapped_positions = [0.0] * 7
self.has_wrapped_position = [False] * 7
# 传感器原始单位:'centi-rad' 表示原始值/100后得到弧度'centi-deg' 表示原始值/100后得到角度
self.raw_angle_unit = 'centi-rad'
# 每个关节零点偏置(弧度),发布角度时会先减去该偏置
self.declare_parameter('zero_offsets', [0.0] * 7)
# 每个关节转向系数1.0 为正向,-1.0 为反向
self.declare_parameter('sensor_directions', [1.0] * 7)
self.declare_parameter('auto_zero_on_first_sample', False)
self.declare_parameter('calibrate_now', False)
self.zero_offsets = self._load_zero_offsets_from_param()
self.sensor_directions = self._load_sensor_directions_from_param()
self.auto_zero_on_first_sample = bool(self.get_parameter('auto_zero_on_first_sample').value)
self.auto_zero_done = [False] * 7
self.add_on_set_parameters_callback(self.on_set_parameters)
self.listener = CallbackListener(self.listen_can_callback)
self.notifier = can.Notifier(self.bus, [self.listener])
@ -39,31 +66,43 @@ class JointSensorNode(Node):
def listen_can_callback(self, msg: can.Message):
# 用户自定义的回调函数
try:
self.get_logger().debug(f"Received: id=0x{msg.arbitration_id:X} dlc={msg.dlc} data={msg.data.hex()} ts={msg.timestamp}")
# self.get_logger().info(f"Received: id=0x{msg.arbitration_id:X} dlc={msg.dlc} data={msg.data.hex()} ts={msg.timestamp}")
# 解析角度数据(默认按每 2 字节 int16单位 centi-degrees
angles = self.parse_angles_from_can(msg.data)
joint_index = self.can_to_joint_index.get(msg.arbitration_id)
if joint_index is None:
self.get_logger().debug(f"忽略未映射CAN ID: 0x{msg.arbitration_id:X}")
return
# 根据 can id 找到 joint 名称列表
names = self.can_to_joints.get(msg.arbitration_id, [f'joint_{i}' for i in range(len(angles))])
# 每条CAN报文默认对应一个关节角和速度
angle, vel = self.parse_sensor_from_can(msg.data)
direction = self.sensor_directions[joint_index]
angle = self.wrap_to_pi(angle * direction)
vel = vel * direction
# 对长度不匹配做适配(截断或填充 0
if len(names) != len(angles):
# 截断或填充名称或角度,保证二者等长
if len(names) > len(angles):
names = names[:len(angles)]
else:
# 增加默认名称
names = names + [f'joint_{i}' for i in range(len(names), len(angles))]
self.last_wrapped_positions[joint_index] = angle
self.has_wrapped_position[joint_index] = True
# 发布 JointState位置使用弧度
if self.auto_zero_on_first_sample and not self.auto_zero_done[joint_index]:
self.zero_offsets[joint_index] = angle
self.auto_zero_done[joint_index] = True
angle = self.wrap_to_pi(angle - self.zero_offsets[joint_index])
angle = self.unwrap_joint_angle(joint_index, angle)
self.current_positions[joint_index] = angle
self.current_velocities[joint_index] = vel
# 发布完整7关节状态位置/速度单位均为弧度)
js = JointState()
js.header.stamp = self.get_clock().now().to_msg()
js.name = names
js.position = angles
js.name = self.joint_names
js.position = self.current_positions
js.velocity = self.current_velocities
self.publisher_.publish(js)
self.get_logger().debug(f"Published JointState: ids=0x{msg.arbitration_id:X} names={names} positions={angles}")
self.get_logger().info(
f"Published JointState: id=0x{msg.arbitration_id:X} "
f"joint={self.joint_names[joint_index]} pos={angle:.6f} vel={vel:.6f}"
)
except Exception as e:
self.get_logger().error(f"Error handling CAN msg: {e}")
@ -71,15 +110,130 @@ class JointSensorNode(Node):
self.notifier.stop()
self.bus.shutdown()
def parse_angles_from_can(self, data: bytes):
angles = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] # 默认两个关节角,单位弧度
if data[0] == 0xaa: # 假设 0xaa 开头表示特殊格式,使用不同解析方式
raw = (data[2] << 8) | data[3]
deg = (raw / 4096.0) * 360.0 # 例如特殊格式直接除以 4096 得到度
rad = math.radians(deg) - 3.14
self.get_logger().info(f"Special format detected: raw={raw} deg={deg} rad={rad}")
angles[0] = rad # 假设特殊格式只包含一个关节角,放在第一个位置
return angles
def _load_zero_offsets_from_param(self):
raw_offsets = self.get_parameter('zero_offsets').value
if not isinstance(raw_offsets, (list, tuple)) or len(raw_offsets) != 7:
self.get_logger().warn('参数 zero_offsets 长度不是7,已使用默认零偏。')
return [0.0] * 7
return [float(v) for v in raw_offsets]
def _load_sensor_directions_from_param(self):
raw_dirs = self.get_parameter('sensor_directions').value
if not isinstance(raw_dirs, (list, tuple)) or len(raw_dirs) != 7:
self.get_logger().warn('参数 sensor_directions 长度不是7,已使用默认方向。')
return [1.0] * 7
dirs = [float(v) for v in raw_dirs]
for i, v in enumerate(dirs):
if abs(v) < 1e-9:
self.get_logger().warn(f'sensor_directions[{i}] 不能为0已回退为1.0')
dirs[i] = 1.0
return dirs
def calibrate_zero_from_current(self):
updated = 0
for i in range(7):
if self.has_wrapped_position[i]:
self.zero_offsets[i] = self.last_wrapped_positions[i]
self.has_position[i] = False
updated += 1
if updated > 0:
self.set_parameters([
rclpy.parameter.Parameter(
'zero_offsets',
rclpy.Parameter.Type.DOUBLE_ARRAY,
self.zero_offsets,
)
])
self.get_logger().info(f'零点校准完成,已更新 {updated}/7 个关节零偏。')
else:
self.get_logger().warn('零点校准失败:当前还没有可用的关节采样数据。')
def on_set_parameters(self, params):
for param in params:
if param.name == 'zero_offsets':
if not isinstance(param.value, (list, tuple)) or len(param.value) != 7:
return SetParametersResult(successful=False, reason='zero_offsets 必须是长度为7的数组')
if param.name == 'sensor_directions':
if not isinstance(param.value, (list, tuple)) or len(param.value) != 7:
return SetParametersResult(successful=False, reason='sensor_directions 必须是长度为7的数组')
for i, v in enumerate(param.value):
if abs(float(v)) < 1e-9:
return SetParametersResult(successful=False, reason=f'sensor_directions[{i}] 不能为0')
if param.name == 'auto_zero_on_first_sample':
self.auto_zero_on_first_sample = bool(param.value)
if param.name == 'calibrate_now' and bool(param.value):
self.calibrate_zero_from_current()
for param in params:
if param.name == 'zero_offsets':
self.zero_offsets = [float(v) for v in param.value]
self.has_position = [False] * 7
if param.name == 'sensor_directions':
self.sensor_directions = [float(v) for v in param.value]
self.has_position = [False] * 7
return SetParametersResult(successful=True)
def wrap_to_pi(self, angle_rad: float) -> float:
# 归一化到 [-pi, pi),解决 0~2pi 到 ±pi 的过零映射问题
return (angle_rad + math.pi) % (2.0 * math.pi) - math.pi
def unwrap_joint_angle(self, joint_index: int, wrapped_angle: float) -> float:
# 基于上一帧做相位展开,避免在 -pi/+pi 边界发生数值跳变
if not self.has_position[joint_index]:
self.has_position[joint_index] = True
return wrapped_angle
prev = self.current_positions[joint_index]
prev_wrapped = self.wrap_to_pi(prev)
delta = wrapped_angle - prev_wrapped
if delta > math.pi:
delta -= 2.0 * math.pi
elif delta < -math.pi:
delta += 2.0 * math.pi
return prev + delta
def raw_to_rad(self, raw_value: int) -> float:
if self.raw_angle_unit == 'centi-deg':
return math.radians(raw_value / 100.0)
# 默认按 centi-rad 处理,适配你当前 0~6.28 弧度输出
return raw_value / 100.0
def parse_sensor_from_can(self, data: bytes):
if len(data) == 0:
return 0.0, 0.0
# 特殊格式0xAA + 16bit角度 + 16bit速度
if data[0] == 0xAA:
if len(data) < 5:
return 0.0, 0.0
raw_deg = (data[1] << 8) | data[2]
raw_vel = (data[3] << 8) | data[4]
vel_deg = raw_vel / 100.0
deg = (raw_deg / 4096.0) * 360.0
rad = self.wrap_to_pi(math.radians(deg))
vel_rad = math.radians(vel_deg)
return rad, vel_rad
if len(data) < 4:
return 0.0, 0.0
# 通用格式前2字节为角度后2字节为速度默认按 centi-rad 解析
raw_deg = int.from_bytes(data[0:2], byteorder='big', signed=True)
rad = self.wrap_to_pi(self.raw_to_rad(raw_deg))
raw_vel = int.from_bytes(data[2:4], byteorder='big', signed=True)
vel_rad = self.raw_to_rad(raw_vel)
return rad, vel_rad
def main():

View File

@ -1,4 +1,5 @@
from setuptools import find_packages, setup
from glob import glob
package_name = 'openarm_joint_sensor'
@ -10,6 +11,8 @@ setup(
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/config', glob('config/*.yaml')),
('share/' + package_name + '/launch', glob('launch/*.py')),
],
install_requires=['setuptools'],
zip_safe=True,