增加外骨骼角度传感器控制

This commit is contained in:
shenchenyang 2026-02-25 15:12:23 +08:00
parent ee8e0fbd47
commit 1dde8de63f
11 changed files with 362 additions and 0 deletions

View File

@ -0,0 +1,89 @@
import rclpy
from rclpy.node import Node
# from control_msgs.msg import JointTrajectoryControllerState
from control_msgs.action import FollowJointTrajectory
from rclpy.action import ActionClient
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from sensor_msgs.msg import JointState
count = 0
count1 = 0
class ProgressActionClient(Node):
def __init__(self):
super().__init__('progress_action_client')
self.subscriber_ = self.create_subscription(
JointState,
"/joint_sensor_states",
self.listener_callback,
10
)
# 3-1.创建动作客户端;
self._action_client = ActionClient(self, FollowJointTrajectory, '/right_joint_trajectory_controller/follow_joint_trajectory')
# 3. 创建订阅者
# 参数1消息类型如String与发布者一致
# 参数2话题名称如"chatter",与发布者一致)
# 参数3回调函数消息到达时触发用于处理消息
# 参数4队列长度10缓存消息的最大数量
self.subscriber_ # 防止未使用变量警告
# 4. 消息处理回调函数(核心)
# 当订阅到消息时,自动调用此函数处理消息
def listener_callback(self, msg):
# self.send_goal([count, count1, 0.0, 0.0, 0.0, 0.0, 0.0])
self.get_logger().info(f'Received joint states: {list(msg.name)} positions: {list(msg.position)}')
self.send_goal(msg.position)
def send_goal(self, arm_pos):
arm_joint = ['openarm_right_joint1', 'openarm_right_joint2', 'openarm_right_joint3', 'openarm_right_joint4', 'openarm_right_joint5', 'openarm_right_joint6', 'openarm_right_joint7']
arm_effort = [10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0]
arm_zero = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
# 3-2.发送请求;
goal_msg = FollowJointTrajectory.Goal()
goal_msg.trajectory.joint_names = arm_joint
goal_msg.trajectory.points.append(JointTrajectoryPoint())
goal_msg.trajectory.points[0].positions = arm_pos
# goal_msg.trajectory.points[0].effort = arm_effort
# print(goal_msg)
self._action_client.wait_for_server()
self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
# 3-3.处理目标发送后的反馈;
goal_handle = future.result()
print(goal_handle)
if not goal_handle.accepted:
self.get_logger().info('请求被拒绝')
return
self.get_logger().info('请求被接收,开始执行任务!')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
# 3-5.处理最终响应。
def get_result_callback(self, future):
return
# result = future.result().result
# self.get_logger().info('最终计算结果sum = %d' % result.sum)
# 5.释放资源。
# rclpy.shutdown()
# 3-4.处理连续反馈;
def feedback_callback(self, feedback_msg):
feedback = (int)(feedback_msg.feedback.progress * 100)
self.get_logger().info('当前进度: %d%%' % feedback)
def main():
rclpy.init()
node = ProgressActionClient()
rclpy.spin(node)
rclpy.shutdown()

View File

@ -0,0 +1,52 @@
import rclpy
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node
from pymoveit2 import MoveIt2, MoveIt2State
from pymoveit2.robots import openarm as robot
from sensor_msgs.msg import JointState
class MoveIt2ControlNode(Node):
def __init__(self):
super().__init__('moveit2_control_node')
self.get_logger().info('正在初始化MoveIt2控制节点...')
self.subscriber_ = self.create_subscription(
JointState,
"/joint_sensor_states",
self.listener_callback,
10
)
# Create callback group that allows execution of callbacks in parallel without restrictions
callback_group = ReentrantCallbackGroup()
# Create MoveIt 2 interface
self.moveit2 = MoveIt2(
node=self,
joint_names=robot.joint_names(),
base_link_name=robot.base_link_name(),
end_effector_name=robot.end_effector_name(),
group_name=robot.MOVE_GROUP_ARM,
callback_group=callback_group,
)
self.moveit2.planner_id = (
"RRTConnectkConfigDefault"
)
# Scale down velocity and acceleration of joints (percentage of maximum)
self.moveit2.max_velocity = 0.5
self.moveit2.max_acceleration = 0.5
def listener_callback(self, msg):
# self.send_goal([count, count1, 0.0, 0.0, 0.0, 0.0, 0.0])
self.get_logger().info(f'Received joint states: positions: {list(msg.position)}')
self.moveit2.move_to_configuration(msg.position)
# self.get_logger().info("Waiting until execution is complete...")
self.moveit2.wait_until_executed()
def main():
rclpy.init()
node = MoveIt2ControlNode()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,94 @@
import rclpy
from rclpy.node import Node
import can
import math
from sensor_msgs.msg import JointState
class CallbackListener(can.Listener):
def __init__(self, user_callback):
super().__init__()
self.user_callback = user_callback
def on_message_received(self, msg: can.Message):
# 这是在 Notifier 的线程里调用的 -> 回调应该尽量快
try:
self.user_callback(msg)
except Exception as e:
# 捕获回调内异常,避免中断 Notifier 分发
print(f"Callback error: {e}")
class JointSensorNode(Node):
def __init__(self):
super().__init__('joint_sensor_node')
self.get_logger().info('正在初始化CAN接口...')
self.bus = can.interface.Bus(channel='can0', bustype='socketcan')
# ROS2 publisher: 发布 JointState 到话题 'joint_states'
self.publisher_ = self.create_publisher(JointState, 'joint_sensor_states', 10)
# 可配置的 CAN ID -> joint name 列表映射
# 默认示例ID 0x123 包含两个关节角,每个角用 int16 存储(单位 centi-degrees除以100得到度
# 你可以根据真实硬件修改此映射或解析函数
self.can_to_joints = {
0x01: ['joint1'],
}
self.listener = CallbackListener(self.listen_can_callback)
self.notifier = can.Notifier(self.bus, [self.listener])
def listen_can_callback(self, msg: can.Message):
# 用户自定义的回调函数
try:
self.get_logger().debug(f"Received: id=0x{msg.arbitration_id:X} dlc={msg.dlc} data={msg.data.hex()} ts={msg.timestamp}")
# 解析角度数据(默认按每 2 字节 int16单位 centi-degrees
angles = self.parse_angles_from_can(msg.data)
# 根据 can id 找到 joint 名称列表
names = self.can_to_joints.get(msg.arbitration_id, [f'joint_{i}' for i in range(len(angles))])
# 对长度不匹配做适配(截断或填充 0
if len(names) != len(angles):
# 截断或填充名称或角度,保证二者等长
if len(names) > len(angles):
names = names[:len(angles)]
else:
# 增加默认名称
names = names + [f'joint_{i}' for i in range(len(names), len(angles))]
# 发布 JointState位置使用弧度
js = JointState()
js.header.stamp = self.get_clock().now().to_msg()
js.name = names
js.position = angles
self.publisher_.publish(js)
self.get_logger().info(f"Published JointState: ids=0x{msg.arbitration_id:X} names={names} positions={angles}")
except Exception as e:
self.get_logger().error(f"Error handling CAN msg: {e}")
def shutdown(self):
self.notifier.stop()
self.bus.shutdown()
def parse_angles_from_can(self, data: bytes):
angles = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] # 默认两个关节角,单位弧度
if data[0] == 0xaa: # 假设 0xaa 开头表示特殊格式,使用不同解析方式
raw = (data[2] << 8) | data[3]
deg = (raw / 4096.0) * 360.0 # 例如特殊格式直接除以 4096 得到度
rad = math.radians(deg) - 3.14
self.get_logger().info(f"Special format detected: raw={raw} deg={deg} rad={rad}")
angles[0] = rad # 假设特殊格式只包含一个关节角,放在第一个位置
return angles
def main():
rclpy.init()
node = JointSensorNode()
rclpy.spin(node)
node.shutdown()
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>openarm_joint_sensor</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="664376944@qq.com">shen</maintainer>
<license>TODO: License declaration</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/openarm_joint_sensor
[install]
install_scripts=$base/lib/openarm_joint_sensor

View File

@ -0,0 +1,32 @@
from setuptools import find_packages, setup
package_name = 'openarm_joint_sensor'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='shen',
maintainer_email='664376944@qq.com',
description='TODO: Package description',
license='TODO: License declaration',
extras_require={
'test': [
'pytest',
],
},
entry_points={
'console_scripts': [
'joint_sensor_can = openarm_joint_sensor.joint_sensor_can:main',
'joint_control = openarm_joint_sensor.joint_control:main',
'joint_moveit_control = openarm_joint_sensor.joint_moveit_control:main',
],
},
)

View File

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'