增加servo控制器

This commit is contained in:
shenchenyang 2026-03-09 15:30:16 +08:00
parent 638ecab791
commit 933d5ec745
10 changed files with 547 additions and 0 deletions

View File

@ -0,0 +1,359 @@
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from moveit_msgs.srv import GetPositionIK
from sensor_msgs.msg import JointState
import numpy as np
import time
from std_msgs.msg import Float64MultiArray
import queue
POSITION_DEADZONE = 0.010 # 5 mm
class JointLimiter:
def __init__(self,
dof,
max_velocity=0.1, # rad/s
max_acceleration=0.1, # rad/s^2
dt=0.05): # 控制周期
self.dof = dof
self.max_velocity = max_velocity
self.max_acceleration = max_acceleration
self.dt = dt
self.q = np.zeros(dof)
self.dq = np.zeros(dof)
self.initialized = False
def reset(self, q_init):
self.q = np.array(q_init, dtype=float)
self.dq = np.zeros(self.dof)
self.initialized = True
def limit(self, q_target):
q_target = np.array(q_target, dtype=float)
if not self.initialized:
self.reset(q_target)
return q_target
# ---------- 位置误差 ----------
error = q_target - self.q
# ---------- 理想速度 ----------
dq_desired = error / self.dt
# ---------- 速度限制 ----------
dq_desired = np.clip(
dq_desired,
-self.max_velocity,
self.max_velocity
)
# ---------- 加速度限制 ----------
ddq = (dq_desired - self.dq) / self.dt
ddq = np.clip(
ddq,
-self.max_acceleration,
self.max_acceleration
)
# 更新速度
self.dq = self.dq + ddq * self.dt
# 再做一次速度保护
self.dq = np.clip(
self.dq,
-self.max_velocity,
self.max_velocity
)
# 更新位置
self.q = self.q + self.dq * self.dt
return self.q.copy()
class ArucoIKController(Node):
def __init__(self):
super().__init__("aruco_ik_controller")
self.control_period = 0.05 # 20 ms
self.joint_limiter = JointLimiter(
dof=7,
max_velocity=0.2, # 建议 0.1~0.3,降低以减小速度
max_acceleration=0.5, # 建议 0.2~0.6,降低以减小加速度
dt=self.control_period
)
self.pose_queue = queue.Queue(maxsize=10) # 用于存储最近的目标位姿,避免处理过旧的指令
self.ik_ready = False
self.prev_filtered_pos = None
self.prev_filtered_quat = None
self.prev_joint_angles = None
self.count = 0.0
self.countFlag = False
self.startCount = 0
# === 参数(根据你的 openarm 改)===
self.arm_group = "left_arm"
self.base_frame = "base_link"
self.ee_link = "openarm_left_link7"
self.joint_names = [
"openarm_left_joint1",
"openarm_left_joint2",
"openarm_left_joint3",
"openarm_left_joint4",
"openarm_left_joint5",
"openarm_left_joint6",
"openarm_left_joint7",
]
self.latest_joint_state = None
self.create_subscription(
JointState,
"/joint_states",
self.joint_state_cb,
10,
)
self.traj_pub = self.create_publisher(
JointTrajectory,
"/left_joint_trajectory_controller/joint_trajectory",
10
)
self.ik_client = self.create_client(
GetPositionIK,
"/compute_ik"
)
self.float64MultiArrayPublisher_ = self.create_publisher(
Float64MultiArray,
"/left_forward_position_controller/commands",
10)
self.create_subscription(
PoseStamped,
"/hand_3d", # 已经转到 base 的 ArUco
self.producer,
10
)
self.get_logger().info("Waiting for IK service...")
self.ik_client.wait_for_service()
# self.create_subscription(
# PoseStamped,
# "/hand_3d", # 已经转到 base 的 ArUco
# self.on_pose,
# 10
# )
self.timer = self.create_timer(3.0, self.check_ik_service)
# time.sleep(3)
self.timer1 = self.create_timer(0.05, self.on_pose)
self.test_timer = self.create_timer(0.05, self.test_callback)
# 用于计算速度/加速度(用于平滑输出)
self.prev_q = None
self.prev_vel = None
# 如果为 True则只发布 velocities用于 velocity 型控制器)。
# 如果为 False则同时发布 positions/vel/acc 给 position/trajectory 控制器。
self.publish_velocity_only = False
def test_callback(self):
if self.count > 0.2:
self.countFlag = True
if self.count < 0.1:
self.countFlag = False
if self.countFlag:
self.count -= 0.004
else:
self.count += 0.004
pose = PoseStamped()
pose.pose.position.x = 0.0
pose.pose.position.y = 0.0
pose.pose.position.z = self.count
pose.pose.orientation.x = 1.0
pose.pose.orientation.y = 0.0
pose.pose.orientation.z = 0.0
pose.pose.orientation.w = 0.0
self.producer(pose)
def producer(self, pose: PoseStamped):
try:
self.pose_queue.put_nowait(pose)
except queue.Full:
self.get_logger().warn("Pose queue full, dropping pose")
def joint_state_cb(self, msg: JointState):
self.latest_joint_state = msg
def build_seed_state(self):
if self.latest_joint_state is None:
return None
seed = []
for j in self.joint_names:
try:
idx = self.latest_joint_state.name.index(j)
seed.append(self.latest_joint_state.position[idx])
except ValueError:
self.get_logger().error(f"Joint {j} not in joint_states")
return None
js = JointState()
js.name = self.joint_names
js.position = seed
return js
def check_ik_service(self):
if self.ik_client.service_is_ready():
self.get_logger().info("IK service ready!")
self.ik_ready = True
self.timer.cancel()
# def on_pose(self, pose: PoseStamped):
def on_pose(self):
if not self.ik_ready and rclpy.ok():
self.get_logger().warn("IK service not ready, waiting...")
# if self.count > 0.2:
# self.countFlag = True
# if self.count < 0.1:
# self.countFlag = False
# if self.countFlag:
# self.count -= 0.004
# else:
# self.count += 0.004
if self.pose_queue.empty():
self.get_logger().warn("Pose queue empty, waiting for pose...")
return
pose = self.pose_queue.get()
# pose.pose.position.x = 0.1
# pose.pose.position.y = 0.1
# pose.pose.position.z = self.count
# pose.pose.orientation.x = 1.0
# pose.pose.orientation.y = 0.0
# pose.pose.orientation.z = 0.0
# pose.pose.orientation.w = 0.0
seed_state = self.build_seed_state()
if seed_state is None:
self.get_logger().warn("No valid seed, skip IK")
return
curr_pos = np.array([pose.pose.position.x, pose.pose.position.y, pose.pose.position.z])
curr_quat = np.array([pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z, pose.pose.orientation.w])
# self.get_logger().info(f"Received target pose: {curr_pos}, {curr_quat}")
req = GetPositionIK.Request()
req.ik_request.group_name = self.arm_group
# if self.prev_filtered_pos is not None:
# if np.linalg.norm(curr_pos - self.prev_filtered_pos) < POSITION_DEADZONE:
# return # EE 位置变化太小,跳过
self.prev_filtered_pos = curr_pos
self.prev_filtered_quat = curr_quat
curr_pos[0] = np.clip(curr_pos[0], 0, 0.8)
curr_pos[1] = np.clip(curr_pos[1], -0.2, 0.2) + 0.15
curr_pos[2] = np.clip(curr_pos[2], 0.0, 0.3) + 0.27
# pose.pose.position.x = 0.1
# pose.pose.position.y = 0.25
# pose.pose.position.z = 0.4
# pose.pose.orientation.x = 1.0
# pose.pose.orientation.y = 0.0
# pose.pose.orientation.z = 0.0
# pose.pose.orientation.w = 0.0
# curr_pos = np.array([0.0, 0.0, 0.0]) # 相对于机械臂 base 的位置
curr_quat = np.array([1.0, 0.0, 0.0, 0.0])
req.ik_request.pose_stamped.pose.position.x = curr_pos[0]
req.ik_request.pose_stamped.pose.position.y = curr_pos[1]
req.ik_request.pose_stamped.pose.position.z = curr_pos[2]
# req.ik_request.pose_stamped.pose.position = pose.pose.position
req.ik_request.pose_stamped.pose.orientation.x = curr_quat[0]
req.ik_request.pose_stamped.pose.orientation.y = curr_quat[1]
req.ik_request.pose_stamped.pose.orientation.z = curr_quat[2]
req.ik_request.pose_stamped.pose.orientation.w = curr_quat[3]
req.ik_request.ik_link_name = self.ee_link
req.ik_request.timeout.sec = 1
req.ik_request.timeout.nanosec = 200_000_000 # 0.2s
# req.ik_request.robot_state.joint_state.name = seed_state.name
# req.ik_request.robot_state.joint_state.position = seed_state.position
req.ik_request.robot_state.joint_state = seed_state
# self.get_logger().info(f"Seed joints: {seed_state.position}")
# self.get_logger().info(f"joint names: {seed_state.name}")
# self.get_logger().info(f"Target pose: {curr_pos}, {curr_quat}")
future = self.ik_client.call_async(req)
future.add_done_callback(self.on_ik_result)
def on_ik_result(self, future):
res = future.result()
# self.get_logger().info(f"IK result code: {res.error_code.val}")
if res.error_code.val != res.error_code.SUCCESS:
self.get_logger().warn("IK failed")
return
jt = JointTrajectory()
jt.joint_names = self.joint_names
# 计算受限后的关节目标
q_sol = np.array(res.solution.joint_state.position[:len(self.joint_names)])
q_limited = self.joint_limiter.limit(q_sol)
# 计算速度(基于上一帧的 q并限幅
if self.prev_q is None:
vel = np.zeros_like(q_limited)
else:
vel = (q_limited - self.prev_q) / self.control_period
vel = np.clip(vel, -self.joint_limiter.max_velocity, self.joint_limiter.max_velocity)
# 计算加速度并限幅
if self.prev_vel is None:
acc = np.zeros_like(vel)
else:
acc = (vel - self.prev_vel) / self.control_period
acc = np.clip(acc, -self.joint_limiter.max_acceleration, self.joint_limiter.max_acceleration)
# 保存为下一次使用
self.prev_q = q_limited.copy()
self.prev_vel = vel.copy()
# 构造轨迹点:把 time_from_start 设大一些(例如 0.1s)让控制器有更长时间插值,减少瞬时加速度
# point = JointTrajectoryPoint()
msg = Float64MultiArray()
# 在 velocity-only 模式下,不设置 positions只填 velocities取决于控制器实现是否支持
# if not self.publish_velocity_only:
# point.positions = q_limited.tolist()
# point.velocities = vel.tolist()
# point.accelerations = acc.tolist()
# point.time_from_start.sec = 0
# point.time_from_start.nanosec = int(0.1 * 1e9) # 0.1s
msg.data = (q_sol).tolist()
# self.get_logger().info(f"msg.data: {msg.data}")
# self.get_logger().info(f"Publishing joint command: vel={np.round(vel,3).tolist()}, acc={np.round(acc,3).tolist()}, pos={np.round(q_limited,3).tolist()}")
self.float64MultiArrayPublisher_.publish(msg)
# jt.points.append(point)
# self.traj_pub.publish(jt)
def main():
rclpy.init()
node = ArucoIKController()
rclpy.spin(node)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,62 @@
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import TwistStamped
from control_msgs.msg import JointJog
from std_msgs.msg import Float64MultiArray
TWIST_TOPIC = "/left_servo_node/delta_twist_cmds"
JOINT_TOPIC = "/left_servo_node/delta_joint_cmds"
EEF_FRAME_ID = "openarm_left_link7"
BASE_FRAME_ID = "openarm_body_link0"
class ServoControlNode(Node):
def __init__(self):
super().__init__('servo_control_node')
self.get_logger().info('正在初始化Servo控制节点...')
# self.jointJogPublisher_ = self.create_publisher(JointJog, JOINT_TOPIC, 10)
# self.twistStampedPublisher_ = self.create_publisher(TwistStamped, TWIST_TOPIC, 10)
self.float64MultiArrayPublisher_ = self.create_publisher(Float64MultiArray, "/left_forward_position_controller/commands", 10)
self.timer = self.create_timer(0.05, self.timer_callback) # 每20ms发布一次
self.count = 0.0
self.countFlag = False
def timer_callback(self):
msg = Float64MultiArray()
if self.count > 1.5:
self.countFlag = True
if self.count < -1.0:
self.countFlag = False
if self.countFlag:
self.count -= 0.05
else:
self.count += 0.05
msg.data = [self.count, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0] # 示例速度
# msg = JointJog()
# msg.header.stamp = self.get_clock().now().to_msg()
# msg.joint_names = ["openarm_left_joint1"]
# msg.velocities = [10.0] # 示例速度
# msg.displacements = [1.0] # 示例位移
# # 设置持续时间为1秒
# try:
# msg.duration.sec = 1
# except Exception:
# # 如果字段不存在或不能直接赋值,忽略以保持兼容性
# pass
# msg.header.frame_id = BASE_FRAME_ID
# self.jointJogPublisher_.publish(msg)
self.float64MultiArrayPublisher_.publish(msg)
self.get_logger().info('Published servo command')
def main():
rclpy.init()
node = ServoControlNode()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>openarm_servo_control</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="664376944@qq.com">shen</maintainer>
<license>TODO: License declaration</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/openarm_servo_control
[install]
install_scripts=$base/lib/openarm_servo_control

View File

@ -0,0 +1,31 @@
from setuptools import find_packages, setup
package_name = 'openarm_servo_control'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='shen',
maintainer_email='664376944@qq.com',
description='TODO: Package description',
license='TODO: License declaration',
extras_require={
'test': [
'pytest',
],
},
entry_points={
'console_scripts': [
'servo_control_node = openarm_servo_control.servo_control:main',
'ik_controller_node = openarm_servo_control.ik_controller:main',
],
},
)

View File

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found errors'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
rc, errors = main_with_errors(argv=[])
assert rc == 0, \
'Found %d code style errors / warnings:\n' % len(errors) + \
'\n'.join(errors)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
rc = main(argv=['.', 'test'])
assert rc == 0, 'Found code style errors / warnings'