增加双臂ik求解控制

This commit is contained in:
shenchenyang 2026-04-22 13:40:28 +08:00
parent 01f283c7cc
commit 3bab8dbd11
5 changed files with 472 additions and 11 deletions

View File

@ -0,0 +1,24 @@
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
"""
Launch both IK controllers for left and right arms simultaneously.
"""
return LaunchDescription([
# Left arm IK controller
Node(
package='openarm_servo_control',
executable='ik_controller_left_node',
name='aruco_ik_controller_left',
output='screen',
),
# Right arm IK controller
Node(
package='openarm_servo_control',
executable='ik_controller_right_node',
name='aruco_ik_controller_right',
output='screen',
),
])

View File

@ -12,8 +12,9 @@ class DrawCircle(Node):
super().__init__("draw_circle")
# Circle and publish settings
self.declare_parameter("topic", "/hand_3d")
self.declare_parameter("frame_id", "base_link")
self.declare_parameter("topic_l", "/left_hand_3d")
self.declare_parameter("topic_r", "/right_hand_3d")
self.declare_parameter("frame_id", "openarm_body_link0")
self.declare_parameter("center_x", 0.15)
self.declare_parameter("center_y", 0.0)
self.declare_parameter("center_z", 0.16)
@ -21,8 +22,9 @@ class DrawCircle(Node):
self.declare_parameter("publish_rate", 50.0) # Hz
self.declare_parameter("angular_speed", 1.5) # rad/s
topic = self._get_param_str("topic", "/hand_3d")
self.frame_id = self._get_param_str("frame_id", "base_link")
topic_l = self._get_param_str("topic_l", "/left_hand_3d")
topic_r = self._get_param_str("topic_r", "/right_hand_3d")
self.frame_id = self._get_param_str("frame_id", "openarm_body_link0")
self.center_x = self._get_param_float("center_x", 0.15)
self.center_y = self._get_param_float("center_y", 0.2)
self.center_z = self._get_param_float("center_z", 0.16)
@ -34,13 +36,14 @@ class DrawCircle(Node):
self.get_logger().warn("publish_rate must be > 0.0, fallback to 50.0")
self.publish_rate = 50.0
self.publisher_ = self.create_publisher(PoseStamped, topic, 10)
self.publisher_l = self.create_publisher(PoseStamped, topic_l, 10)
self.publisher_r = self.create_publisher(PoseStamped, topic_r, 10)
self.theta = 0.0
self.dt = 1.0 / self.publish_rate
self.timer = self.create_timer(self.dt, self.publish_circle_pose)
self.get_logger().info(
f"Publishing circle pose on {topic} (frame_id={self.frame_id}, "
f"Publishing circle pose on {topic_l} (frame_id={self.frame_id}, "
f"center=({self.center_x:.3f}, {self.center_y:.3f}, {self.center_z:.3f}), "
f"radius={self.radius:.3f}, rate={self.publish_rate:.1f}Hz, "
f"omega={self.angular_speed:.3f}rad/s)"
@ -77,7 +80,8 @@ class DrawCircle(Node):
msg.pose.orientation.z = 0.0
msg.pose.orientation.w = 0.0
self.publisher_.publish(msg)
self.publisher_l.publish(msg)
self.publisher_r.publish(msg)
self.theta += self.angular_speed * self.dt
if self.theta > 2.0 * math.pi:

View File

@ -105,7 +105,7 @@ class ArucoIKController(Node):
self.startCount = 0
# === 参数(根据你的 openarm 改)===
self.arm_group = "left_arm"
self.base_frame = "base_link"
self.base_frame = "openarm_body_link0"
self.ee_link = "openarm_left_link7"
self.joint_names = [
@ -144,7 +144,7 @@ class ArucoIKController(Node):
self.create_subscription(
PoseStamped,
"/hand_3d", # 已经转到 base 的 ArUco
"/left_hand_3d", # 已经转到 base 的 ArUco
self.dataProcess,
10
)
@ -287,7 +287,7 @@ class ArucoIKController(Node):
return
curr_pos = np.array([pose.pose.position.x, pose.pose.position.y, pose.pose.position.z])
curr_quat = np.array([pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z, pose.pose.orientation.w])
self.get_logger().info(f"Received target pose: {curr_pos}, {curr_quat}")
# self.get_logger().info(f"Received target pose: {curr_pos}, {curr_quat}")
req = GetPositionIK.Request()
req.ik_request.group_name = self.arm_group
@ -345,8 +345,22 @@ class ArucoIKController(Node):
jt = JointTrajectory()
jt.joint_names = self.joint_names
# 打印 IK 返回的原始关节顺序与角度,便于核对 name-position 对应关系
ik_names = list(res.solution.joint_state.name)
ik_positions = list(res.solution.joint_state.position)
# raw_pairs = [f"{n}:{p:.3f}" for n, p in zip(ik_names, ik_positions)]
# self.get_logger().info(f"IK raw pairs: {raw_pairs}")
# 按关节名映射到当前控制器需要的关节顺序,避免误用 position[:7]
ik_name_to_pos = {n: p for n, p in zip(ik_names, ik_positions)}
# missing = [j for j in self.joint_names if j not in ik_name_to_pos]
# if missing:
# self.get_logger().error(f"IK result missing joints: {missing}")
# return
# 计算受限后的关节目标
q_sol = np.array(res.solution.joint_state.position[:len(self.joint_names)])
# mapped_pairs = [f"{j}:{ik_name_to_pos[j]:.3f}" for j in self.joint_names]
# self.get_logger().info(f"IK mapped pairs: {mapped_pairs}")
# q_limited = self.joint_limiter.limit(q_sol)

View File

@ -0,0 +1,417 @@
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from moveit_msgs.srv import GetPositionIK
from sensor_msgs.msg import JointState
import numpy as np
import time
from std_msgs.msg import Float64MultiArray
import queue
POSITION_DEADZONE = 0.010 # 5 mm
class JointLimiter:
def __init__(self,
dof,
max_velocity=0.1, # rad/s
max_acceleration=0.1, # rad/s^2
dt=0.05): # 控制周期
self.dof = dof
self.max_velocity = max_velocity
self.max_acceleration = max_acceleration
self.dt = dt
self.q = np.zeros(dof)
self.dq = np.zeros(dof)
self.initialized = False
def reset(self, q_init):
self.q = np.array(q_init, dtype=float)
self.dq = np.zeros(self.dof)
self.initialized = True
def limit(self, q_target):
q_target = np.array(q_target, dtype=float)
if not self.initialized:
self.reset(q_target)
return q_target
# ---------- 位置误差 ----------
error = q_target - self.q
# ---------- 理想速度 ----------
dq_desired = error / self.dt
# ---------- 速度限制 ----------
dq_desired = np.clip(
dq_desired,
-self.max_velocity,
self.max_velocity
)
# ---------- 加速度限制 ----------
ddq = (dq_desired - self.dq) / self.dt
ddq = np.clip(
ddq,
-self.max_acceleration,
self.max_acceleration
)
# 更新速度
self.dq = self.dq + ddq * self.dt
# 再做一次速度保护
self.dq = np.clip(
self.dq,
-self.max_velocity,
self.max_velocity
)
# 更新位置
self.q = self.q + self.dq * self.dt
return self.q.copy()
class ArucoIKController(Node):
def __init__(self):
super().__init__("aruco_ik_controller")
self.control_period = 0.05 # 20 ms
self.joint_limiter = JointLimiter(
dof=7,
max_velocity=0.01, # 建议 0.1~0.3,降低以减小速度
max_acceleration=0.01, # 建议 0.2~0.6,降低以减小加速度
dt=self.control_period
)
self.Kp = 25.0 # 比例增益(需调试)
self.max_vel = 2.5 # rad/s 每关节上限
self.max_acc = 0.1 # rad/s^2 加速度限幅
self.currPose = PoseStamped()
self.pose_queue = queue.Queue(maxsize=100) # 用于存储最近的目标位姿,避免处理过旧的指令
self.ik_ready = False
self.prev_filtered_pos = None
self.prev_filtered_quat = None
self.prev_joint_angles = None
self.count = 0.0
self.countFlag = False
self.startCount = 0
# === 参数(根据你的 openarm 改)===
self.arm_group = "right_arm"
self.base_frame = "openarm_body_link0"
self.ee_link = "openarm_right_link7"
self.joint_names = [
"openarm_right_joint1",
"openarm_right_joint2",
"openarm_right_joint3",
"openarm_right_joint4",
"openarm_right_joint5",
"openarm_right_joint6",
"openarm_right_joint7",
]
self.latest_joint_state = None
self.create_subscription(
JointState,
"/joint_states",
self.joint_state_cb,
10,
)
self.traj_pub = self.create_publisher(
JointTrajectory,
"/right_joint_trajectory_controller/joint_trajectory",
10
)
self.ik_client = self.create_client(
GetPositionIK,
"/compute_ik"
)
# self.float64MultiArrayPublisher_ = self.create_publisher(
# Float64MultiArray,
# "/left_forward_position_controller/commands",
# 10)
self.create_subscription(
PoseStamped,
"/right_hand_3d", # 已经转到 base 的 ArUco
self.dataProcess,
10
)
self.get_logger().info("Waiting for IK service...")
self.ik_client.wait_for_service()
# self.create_subscription(
# PoseStamped,
# "/hand_3d", # 已经转到 base 的 ArUco
# self.on_pose,
# 10
# )
self.timer = self.create_timer(3.0, self.check_ik_service)
time.sleep(3)
self.timer1 = self.create_timer(0.02, self.on_pose)
self.startTimer = self.create_timer(0.05, self.readyArm)
# 用于计算速度/加速度(用于平滑输出)
self.prev_q = None
self.prev_vel = None
# 如果为 True则只发布 velocities用于 velocity 型控制器)。
# 如果为 False则同时发布 positions/vel/acc 给 position/trajectory 控制器。
self.publish_velocity_only = False
def dataProcess(self, pose: PoseStamped):
posebuf = self.currPose
# posebuf.pose.position.x += pose.pose.position.x
# posebuf.pose.position.y += pose.pose.position.y
# posebuf.pose.position.z += pose.pose.position.z
posebuf.pose.position.x = pose.pose.position.x
posebuf.pose.position.y = pose.pose.position.y
posebuf.pose.position.z = pose.pose.position.z
# posebuf.pose.orientation.x = 1.0
# posebuf.pose.orientation.y = 0.0
# posebuf.pose.orientation.z = 0.0
# posebuf.pose.orientation.w = 0.0
posebuf.pose.orientation.x = pose.pose.orientation.x
posebuf.pose.orientation.y = pose.pose.orientation.y
posebuf.pose.orientation.z = pose.pose.orientation.z
posebuf.pose.orientation.w = pose.pose.orientation.w
self.producer(posebuf)
def readyArm(self):
if self.count < 0.2:
self.count += 0.005
self.currPose.pose.position.x = 0.0
self.currPose.pose.position.y = 0.0
self.currPose.pose.position.z = self.count
self.currPose.pose.orientation.x = 1.0
self.currPose.pose.orientation.y = 0.0
self.currPose.pose.orientation.z = 0.0
self.currPose.pose.orientation.w = 0.0
self.producer(self.currPose)
else:
self.startTimer.cancel()
def test_callback(self):
if self.count > 0.2:
self.countFlag = True
if self.count < 0.1:
self.countFlag = False
if self.countFlag:
self.count -= 0.004
else:
self.count += 0.004
pose = PoseStamped()
pose.pose.position.x = 0.0
pose.pose.position.y = 0.0
pose.pose.position.z = self.count
pose.pose.orientation.x = 1.0
pose.pose.orientation.y = 0.0
pose.pose.orientation.z = 0.0
pose.pose.orientation.w = 0.0
self.producer(pose)
def producer(self, pose: PoseStamped):
try:
self.pose_queue.put_nowait(pose)
except queue.Full:
self.get_logger().warn("Pose queue full, dropping pose")
def joint_state_cb(self, msg: JointState):
self.latest_joint_state = msg
def build_seed_state(self):
if self.latest_joint_state is None:
return None
seed = []
for j in self.joint_names:
try:
idx = self.latest_joint_state.name.index(j)
seed.append(self.latest_joint_state.position[idx])
# self.get_logger().info(f"Seed joint {j}: {self.latest_joint_state.position[idx]:.3f}")
except ValueError:
self.get_logger().error(f"Joint {j} not in joint_states")
return None
js = JointState()
js.name = self.joint_names
js.position = seed
return js
def check_ik_service(self):
if self.ik_client.service_is_ready():
self.get_logger().info("IK service ready!")
self.ik_ready = True
self.timer.cancel()
# def on_pose(self, pose: PoseStamped):
def on_pose(self):
if not self.ik_ready and rclpy.ok():
self.get_logger().warn("IK service not ready, waiting...")
# if self.count > 0.2:
# self.countFlag = True
# if self.count < 0.1:
# self.countFlag = False
# if self.countFlag:
# self.count -= 0.004
# else:
# self.count += 0.004
if self.pose_queue.empty():
# self.get_logger().warn("Pose queue empty, waiting for pose...")
return
pose = self.pose_queue.get()
# pose.pose.position.x = 0.1
# pose.pose.position.y = 0.1
# pose.pose.position.z = self.count
# pose.pose.orientation.x = 1.0
# pose.pose.orientation.y = 0.0
# pose.pose.orientation.z = 0.0
# pose.pose.orientation.w = 0.0
seed_state = self.build_seed_state()
if seed_state is None:
self.get_logger().warn("No valid seed, skip IK")
return
curr_pos = np.array([pose.pose.position.x, pose.pose.position.y, pose.pose.position.z])
curr_quat = np.array([pose.pose.orientation.x, pose.pose.orientation.y, pose.pose.orientation.z, pose.pose.orientation.w])
# self.get_logger().info(f"Received target pose: {curr_pos}, {curr_quat}")
req = GetPositionIK.Request()
req.ik_request.group_name = self.arm_group
# if self.prev_filtered_pos is not None:
# if np.linalg.norm(curr_pos - self.prev_filtered_pos) < POSITION_DEADZONE:
# return # EE 位置变化太小,跳过
self.prev_filtered_pos = curr_pos
self.prev_filtered_quat = curr_quat
curr_pos[0] = np.clip(curr_pos[0], -0.8, 0.8)
curr_pos[1] = np.clip(curr_pos[1], -0.5, 0.5) - 0.15
curr_pos[2] = np.clip(curr_pos[2], 0.0, 0.4) + 0.27
# pose.pose.position.x = 0.1
# pose.pose.position.y = 0.25
# pose.pose.position.z = 0.4
# pose.pose.orientation.x = 1.0
# pose.pose.orientation.y = 0.0
# pose.pose.orientation.z = 0.0
# pose.pose.orientation.w = 0.0
# curr_pos = np.array([0.0, 0.0, 0.0]) # 相对于机械臂 base 的位置
# curr_quat = np.array([1.0, 0.0, 0.0, 0.0])
req.ik_request.pose_stamped.pose.position.x = curr_pos[0]
req.ik_request.pose_stamped.pose.position.y = curr_pos[1]
req.ik_request.pose_stamped.pose.position.z = curr_pos[2]
# req.ik_request.pose_stamped.pose.position = pose.pose.position
req.ik_request.pose_stamped.pose.orientation.x = curr_quat[0]
req.ik_request.pose_stamped.pose.orientation.y = curr_quat[1]
req.ik_request.pose_stamped.pose.orientation.z = curr_quat[2]
req.ik_request.pose_stamped.pose.orientation.w = curr_quat[3]
req.ik_request.ik_link_name = self.ee_link
req.ik_request.timeout.sec = 1
req.ik_request.timeout.nanosec = 200_000_000 # 0.2s
# req.ik_request.robot_state.joint_state.name = seed_state.name
# req.ik_request.robot_state.joint_state.position = seed_state.position
req.ik_request.robot_state.joint_state = seed_state
# self.get_logger().info(f"Seed joints: {seed_state.position}")
# self.get_logger().info(f"joint names: {seed_state.name}")
# self.get_logger().info(f"Target pose: {curr_pos}, {curr_quat}")
future = self.ik_client.call_async(req)
future.add_done_callback(self.on_ik_result)
def on_ik_result(self, future):
res = future.result()
# self.get_logger().info(f"IK result code: {res.error_code.val}")
if res.error_code.val != res.error_code.SUCCESS:
self.get_logger().warn("IK failed")
return
jt = JointTrajectory()
jt.joint_names = self.joint_names
# 打印 IK 返回的原始关节顺序与角度,便于核对 name-position 对应关系
ik_names = list(res.solution.joint_state.name)
ik_positions = list(res.solution.joint_state.position)
# raw_pairs = [f"{n}:{p:.3f}" for n, p in zip(ik_names, ik_positions)]
# self.get_logger().info(f"IK raw pairs: {raw_pairs}")
# 按关节名映射到当前控制器需要的关节顺序,避免误用 position[:7]
ik_name_to_pos = {n: p for n, p in zip(ik_names, ik_positions)}
# missing = [j for j in self.joint_names if j not in ik_name_to_pos]
# if missing:
# self.get_logger().error(f"IK result missing joints: {missing}")
# return
q_sol = np.array([ik_name_to_pos[j] for j in self.joint_names])
# mapped_pairs = [f"{j}:{ik_name_to_pos[j]:.3f}" for j in self.joint_names]
# self.get_logger().info(f"IK mapped pairs: {mapped_pairs}")
# q_limited = self.joint_limiter.limit(q_sol)
# 保存为下一次使用
self.prev_q = q_sol.copy()
# 得到当前关节(按 self.joint_names 顺序)
if self.latest_joint_state is None:
return
q_curr = np.array([self.latest_joint_state.position[self.latest_joint_state.name.index(j)]
for j in self.joint_names])
# q_sol 是 IK 求得的目标位置numpy
q_target = q_sol.copy()
# 1) 计算速度(比例律)
v_des = self.Kp * (q_target - q_curr)
# 2) 限幅速度与加速度(基于 self.prev_vel
v_des = np.clip(v_des, -self.max_vel, self.max_vel)
if getattr(self, 'prev_vel', None) is not None:
dv = (v_des - self.prev_vel) / self.control_period
dv = np.clip(dv, -self.max_acc, self.max_acc)
v_des = self.prev_vel + dv * self.control_period
# 3) 积分得到下一个位置目标
q_cmd = q_curr + v_des * self.control_period
# 构造轨迹点:把 time_from_start 设大一些(例如 0.1s)让控制器有更长时间插值,减少瞬时加速度
point = JointTrajectoryPoint()
# msg = Float64MultiArray()
# 在 velocity-only 模式下,不设置 positions只填 velocities取决于控制器实现是否支持
# if not self.publish_velocity_only:
# point.positions = q_limited.tolist()
point.positions = q_cmd.tolist()
# point.velocities = vel.tolist()
# point.accelerations = acc.tolist()
# point.time_from_start.sec = 0
# point.time_from_start.nanosec = int(0.05 * 1e9) # 0.1s
# msg.data = (q_sol).tolist()
# self.get_logger().info(f"msg.data: {msg.data}")
# self.get_logger().info(f"Publishing joint command: vel={np.round(vel,3).tolist()}, acc={np.round(acc,3).tolist()}, pos={np.round(q_limited,3).tolist()}")
# self.float64MultiArrayPublisher_.publish(msg)
jt.points.append(point)
self.traj_pub.publish(jt)
def main():
rclpy.init()
node = ArucoIKController()
rclpy.spin(node)
if __name__ == "__main__":
main()

View File

@ -10,6 +10,7 @@ setup(
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
('share/' + package_name + '/launch', ['launch/ik_controller.launch.py']),
],
install_requires=['setuptools'],
zip_safe=True,
@ -25,7 +26,8 @@ setup(
entry_points={
'console_scripts': [
'servo_control_node = openarm_servo_control.servo_control:main',
'ik_controller_node = openarm_servo_control.ik_controller:main',
'ik_controller_right_node = openarm_servo_control.ik_controller_right:main',
'ik_controller_left_node = openarm_servo_control.ik_controller_left:main',
'keyboard_ctrl_node = openarm_servo_control.keyboard_ctrl:main',
'draw_circle_node = openarm_servo_control.draw_cricle:main',
],