增加夹爪控制

This commit is contained in:
shenchenyang 2026-04-03 14:46:06 +08:00
parent b1e945ed92
commit c22cc75597
3 changed files with 116 additions and 62 deletions

View File

@ -1,6 +1,6 @@
joint_sensor_node:
ros__parameters:
zero_offsets: [2.7995149378912956, 2.5233983960718795, 0.0, 0.0, 0.0, 0.0, 0.0]
sensor_directions: [-1.0, 1.0, -1.0, -1.0, -1.0, 1.0, -1.0]
zero_offsets: [2.7995149378912956, 2.5233983960718795, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
sensor_directions: [-1.0, 1.0, -1.0, -1.0, -1.0, 1.0, 1.0, -1.0]
auto_zero_on_first_sample: true
can_interface: 'can2'

View File

@ -11,6 +11,37 @@ from std_msgs.msg import Float64MultiArray
count = 0
count1 = 0
def map_float_range(value, in_min, in_max, out_min, out_max, clamp=False):
"""Map a float value from one range to another.
Args:
value: input value.
in_min: input range minimum.
in_max: input range maximum.
out_min: output range minimum.
out_max: output range maximum.
clamp: whether to clamp output into [min(out_min, out_max), max(out_min, out_max)].
Returns:
Mapped float in output range.
"""
in_span = float(in_max) - float(in_min)
if abs(in_span) < 1e-12:
raise ValueError('in_max and in_min must not be equal')
out_span = float(out_max) - float(out_min)
mapped = float(out_min) + ((float(value) - float(in_min)) / in_span) * out_span
if clamp:
low = min(float(out_min), float(out_max))
high = max(float(out_min), float(out_max))
mapped = max(low, min(high, mapped))
return mapped
class PositionFilter:
"""Simple per-joint filter supporting EMA and a basic 1D Kalman.
@ -78,8 +109,8 @@ class ProgressActionClient(Node):
self.kalman_q = self.get_parameter('kalman_q').get_parameter_value().double_value
self.kalman_r = self.get_parameter('kalman_r').get_parameter_value().double_value
# Create per-joint filters (7 joints expected)
self.num_joints = 7
# Create per-joint filters
self.num_joints = 8
self.filters = [PositionFilter(filter_type=self.filter_type,
alpha=self.ema_alpha,
q=self.kalman_q,
@ -87,7 +118,7 @@ class ProgressActionClient(Node):
for _ in range(self.num_joints)]
# Buffer outgoing joint commands and send them at timer rate.
self.command_queue = deque(maxlen=100)
self.command_queue = deque(maxlen=50)
self.subscriber_ = self.create_subscription(
JointState,
@ -96,10 +127,13 @@ class ProgressActionClient(Node):
10
)
self.float64MultiArrayPublisher_ = self.create_publisher(Float64MultiArray, "/right_forward_position_controller/commands", 10)
self.armPosPublisher_ = self.create_publisher(Float64MultiArray, "/right_forward_position_controller/commands", 10)
self.gipperPosPublisher_ = self.create_publisher(Float64MultiArray, "/right_gripper_controller/commands", 10)
# 3-1.创建动作客户端;
self._action_client = ActionClient(self, FollowJointTrajectory, '/right_joint_trajectory_controller/follow_joint_trajectory')
self.create_timer(0.01, self.timer_callback) # 定时器周期为0.1秒
# self._action_client = ActionClient(self, FollowJointTrajectory, '/right_joint_trajectory_controller/follow_joint_trajectory')
# self.right_gripper_action_client = ActionClient(self, FollowJointTrajectory, '/right_joint_trajectory_controller/follow_joint_trajectory')
self.create_timer(0.02, self.timer_callback) # 定时器周期为0.02秒
# 3. 创建订阅者
# 参数1消息类型如String与发布者一致
# 参数2话题名称如"chatter",与发布者一致)
@ -135,16 +169,26 @@ class ProgressActionClient(Node):
else:
arm_pos_to_send = arm_pos
self.command_queue.append(arm_pos_to_send)
if len(arm_pos_to_send) < self.num_joints:
self.get_logger().warning(
f'Expected {self.num_joints} positions, got {len(arm_pos_to_send)}. Skip sending split commands.'
)
return
# 前7个关节给机械臂控制器第8个关节给夹爪控制器
arm7_pos_to_send = arm_pos_to_send[:7]
gripper_pos_to_send = arm_pos_to_send[7]
self.command_queue.append((arm7_pos_to_send, gripper_pos_to_send))
# self.send_goal(arm_pos_to_send)
def timer_callback(self):
# 定时器回调函数,可以用于周期性任务
if self.command_queue:
arm_pos_to_send = self.command_queue.popleft()
self.send_goal(arm_pos_to_send)
arm7_pos_to_send, gripper_pos_to_send = self.command_queue.popleft()
self.send_goal(arm7_pos_to_send, gripper_pos_to_send)
def send_goal(self, arm_pos):
def send_goal(self, arm7_pos, gripper_pos):
arm_joint = ['openarm_right_joint1', 'openarm_right_joint2', 'openarm_right_joint3', 'openarm_right_joint4', 'openarm_right_joint5', 'openarm_right_joint6', 'openarm_right_joint7']
arm_effort = [10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0]
arm_zero = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
@ -159,36 +203,44 @@ class ProgressActionClient(Node):
# self._action_client.wait_for_server()
# self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
# self._send_goal_future.add_done_callback(self.goal_response_callback)
msg = Float64MultiArray()
msg.data = arm_pos
self.float64MultiArrayPublisher_.publish(msg)
self.get_logger().info(f'Published joint commands: {arm_pos}')
arm_msg = Float64MultiArray()
arm_msg.data = arm7_pos
self.armPosPublisher_.publish(arm_msg)
def goal_response_callback(self, future):
# 3-3.处理目标发送后的反馈;
goal_handle = future.result()
print(goal_handle)
if not goal_handle.accepted:
self.get_logger().info('请求被拒绝')
return
gripper_msg = Float64MultiArray()
gripper_msg.data = [map_float_range(gripper_pos, 0.0, 1.18, 0.0, 0.045)]
self.gipperPosPublisher_.publish(gripper_msg)
self.get_logger().info('请求被接收,开始执行任务!')
# self.get_logger().info(
# f'Published arm joints(1-7): {arm7_pos}, gripper joint8: {gripper_pos}'
# )
# self.get_logger().info(f'Published gripper joint8: {gripper_pos}')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
# def goal_response_callback(self, future):
# # 3-3.处理目标发送后的反馈;
# goal_handle = future.result()
# print(goal_handle)
# if not goal_handle.accepted:
# self.get_logger().info('请求被拒绝')
# return
# 3-5.处理最终响应。
def get_result_callback(self, future):
return
# result = future.result().result
# self.get_logger().info('最终计算结果sum = %d' % result.sum)
# 5.释放资源。
# rclpy.shutdown()
# self.get_logger().info('请求被接收,开始执行任务!')
# 3-4.处理连续反馈;
def feedback_callback(self, feedback_msg):
feedback = (int)(feedback_msg.feedback.progress * 100)
self.get_logger().info('当前进度: %d%%' % feedback)
# self._get_result_future = goal_handle.get_result_async()
# self._get_result_future.add_done_callback(self.get_result_callback)
# # 3-5.处理最终响应。
# def get_result_callback(self, future):
# return
# # result = future.result().result
# # self.get_logger().info('最终计算结果sum = %d' % result.sum)
# # 5.释放资源。
# # rclpy.shutdown()
# # 3-4.处理连续反馈;
# def feedback_callback(self, feedback_msg):
# feedback = (int)(feedback_msg.feedback.progress * 100)
# self.get_logger().info('当前进度: %d%%' % feedback)
def main():
rclpy.init()

View File

@ -22,6 +22,7 @@ class CallbackListener(can.Listener):
class JointSensorNode(Node):
def __init__(self):
super().__init__('joint_sensor_node')
self.num_joints = 8
self.declare_parameter('can_interface', 'can2')
self.get_logger().info('正在初始化CAN接口...')
can_interface = self.get_parameter('can_interface').value
@ -29,8 +30,8 @@ class JointSensorNode(Node):
# ROS2 publisher: 发布 JointState 到话题 'joint_states'
self.publisher_ = self.create_publisher(JointState, 'joint_sensor_states', 10)
self.joint_names = [f'joint{i}' for i in range(1, 8)]
# 7路传感器 CAN ID -> 7个关节索引
self.joint_names = [f'joint{i}' for i in range(1, self.num_joints + 1)]
# 8路传感器 CAN ID -> 8个关节索引
self.can_to_joint_index = {
0x01: 0,
0x02: 1,
@ -39,25 +40,26 @@ class JointSensorNode(Node):
0x05: 4,
0x06: 5,
0x07: 6,
0x08: 7,
}
self.current_positions = [0.0] * 7
self.current_velocities = [0.0] * 7
self.has_position = [False] * 7
self.last_wrapped_positions = [0.0] * 7
self.has_wrapped_position = [False] * 7
self.current_positions = [0.0] * self.num_joints
self.current_velocities = [0.0] * self.num_joints
self.has_position = [False] * self.num_joints
self.last_wrapped_positions = [0.0] * self.num_joints
self.has_wrapped_position = [False] * self.num_joints
# 传感器原始单位:'centi-rad' 表示原始值/100后得到弧度'centi-deg' 表示原始值/100后得到角度
self.raw_angle_unit = 'centi-rad'
# 每个关节零点偏置(弧度),发布角度时会先减去该偏置
self.declare_parameter('zero_offsets', [0.0] * 7)
self.declare_parameter('zero_offsets', [0.0] * self.num_joints)
# 每个关节转向系数1.0 为正向,-1.0 为反向
self.declare_parameter('sensor_directions', [1.0] * 7)
self.declare_parameter('sensor_directions', [1.0] * self.num_joints)
self.declare_parameter('auto_zero_on_first_sample', False)
self.declare_parameter('calibrate_now', False)
self.zero_offsets = self._load_zero_offsets_from_param()
self.sensor_directions = self._load_sensor_directions_from_param()
self.auto_zero_on_first_sample = bool(self.get_parameter('auto_zero_on_first_sample').value)
self.auto_zero_done = [False] * 7
self.auto_zero_done = [False] * self.num_joints
self.add_on_set_parameters_callback(self.on_set_parameters)
self.listener = CallbackListener(self.listen_can_callback)
@ -91,7 +93,7 @@ class JointSensorNode(Node):
self.current_positions[joint_index] = angle
self.current_velocities[joint_index] = vel
# 发布完整7关节状态(位置/速度单位均为弧度)
# 发布完整8关节状态(位置/速度单位均为弧度)
js = JointState()
js.header.stamp = self.get_clock().now().to_msg()
js.name = self.joint_names
@ -112,16 +114,16 @@ class JointSensorNode(Node):
def _load_zero_offsets_from_param(self):
raw_offsets = self.get_parameter('zero_offsets').value
if not isinstance(raw_offsets, (list, tuple)) or len(raw_offsets) != 7:
self.get_logger().warn('参数 zero_offsets 长度不是7,已使用默认零偏。')
return [0.0] * 7
if not isinstance(raw_offsets, (list, tuple)) or len(raw_offsets) != self.num_joints:
self.get_logger().warn(f'参数 zero_offsets 长度不是{self.num_joints},已使用默认零偏。')
return [0.0] * self.num_joints
return [float(v) for v in raw_offsets]
def _load_sensor_directions_from_param(self):
raw_dirs = self.get_parameter('sensor_directions').value
if not isinstance(raw_dirs, (list, tuple)) or len(raw_dirs) != 7:
self.get_logger().warn('参数 sensor_directions 长度不是7,已使用默认方向。')
return [1.0] * 7
if not isinstance(raw_dirs, (list, tuple)) or len(raw_dirs) != self.num_joints:
self.get_logger().warn(f'参数 sensor_directions 长度不是{self.num_joints},已使用默认方向。')
return [1.0] * self.num_joints
dirs = [float(v) for v in raw_dirs]
for i, v in enumerate(dirs):
@ -132,7 +134,7 @@ class JointSensorNode(Node):
def calibrate_zero_from_current(self):
updated = 0
for i in range(7):
for i in range(self.num_joints):
if self.has_wrapped_position[i]:
self.zero_offsets[i] = self.last_wrapped_positions[i]
self.has_position[i] = False
@ -146,19 +148,19 @@ class JointSensorNode(Node):
self.zero_offsets,
)
])
self.get_logger().info(f'零点校准完成,已更新 {updated}/7 个关节零偏。')
self.get_logger().info(f'零点校准完成,已更新 {updated}/{self.num_joints} 个关节零偏。')
else:
self.get_logger().warn('零点校准失败:当前还没有可用的关节采样数据。')
def on_set_parameters(self, params):
for param in params:
if param.name == 'zero_offsets':
if not isinstance(param.value, (list, tuple)) or len(param.value) != 7:
return SetParametersResult(successful=False, reason='zero_offsets 必须是长度为7的数组')
if not isinstance(param.value, (list, tuple)) or len(param.value) != self.num_joints:
return SetParametersResult(successful=False, reason=f'zero_offsets 必须是长度为{self.num_joints}的数组')
if param.name == 'sensor_directions':
if not isinstance(param.value, (list, tuple)) or len(param.value) != 7:
return SetParametersResult(successful=False, reason='sensor_directions 必须是长度为7的数组')
if not isinstance(param.value, (list, tuple)) or len(param.value) != self.num_joints:
return SetParametersResult(successful=False, reason=f'sensor_directions 必须是长度为{self.num_joints}的数组')
for i, v in enumerate(param.value):
if abs(float(v)) < 1e-9:
return SetParametersResult(successful=False, reason=f'sensor_directions[{i}] 不能为0')
@ -172,11 +174,11 @@ class JointSensorNode(Node):
for param in params:
if param.name == 'zero_offsets':
self.zero_offsets = [float(v) for v in param.value]
self.has_position = [False] * 7
self.has_position = [False] * self.num_joints
if param.name == 'sensor_directions':
self.sensor_directions = [float(v) for v in param.value]
self.has_position = [False] * 7
self.has_position = [False] * self.num_joints
return SetParametersResult(successful=True)