增加ik解算

This commit is contained in:
shenchenyang 2026-02-09 18:32:23 +08:00
parent e3b7360b9a
commit ca79489052
2 changed files with 249 additions and 0 deletions

View File

@ -0,0 +1,248 @@
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import PoseStamped
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from moveit_msgs.srv import GetPositionIK
from sensor_msgs.msg import JointState
import numpy as np
from scipy.spatial.transform import Rotation as R
POSITION_DEADZONE = 0.050 # 5 mm
# ---- 配置 ----
LEFT_ARM_JOINTS = [
"openarm_left_joint1",
"openarm_left_joint2",
"openarm_left_joint3",
"openarm_left_joint4",
"openarm_left_joint5",
"openarm_left_joint6",
"openarm_left_joint7",
]
CONTROL_PERIOD = 0.02 # 20ms
MAX_VELOCITY = 0.5
MAX_ACCELERATION = 1.0
POSE_ALPHA = 0.3
# ---- 平滑/限制类 ----
class PoseFilter:
def __init__(self, alpha=0.3):
self.alpha = alpha
self.pos = None
self.ori = None
def slerp_quat(q0, q1, alpha):
"""
q0, q1: numpy array [x, y, z, w]
alpha: float, 0~1
"""
r0 = R.from_quat(q0)
r1 = R.from_quat(q1)
r_slerp = R.slerp(0, 1, [r0, r1])([alpha])[0]
return r_slerp.as_quat()
def filter(self, new_pos, new_ori):
if self.pos is None:
self.pos = np.array([new_pos.x, new_pos.y, new_pos.z])
self.ori = np.array([new_ori.x, new_ori.y, new_ori.z, new_ori.w])
return self.pos, self.ori
self.pos = self.alpha * np.array([new_pos.x, new_pos.y, new_pos.z]) + (1 - self.alpha) * self.pos
self.ori = self.slerp_quat(self.ori, np.array([new_ori.x, new_ori.y, new_ori.z, new_ori.w]), self.alpha)
return self.pos, self.ori
class JointLimiter:
def __init__(self, joint_names, max_velocity=0.5, max_acceleration=1.0, dt=0.02):
self.joint_names = joint_names
self.max_velocity = max_velocity
self.max_acceleration = max_acceleration
self.dt = dt
self.prev_q = np.zeros(len(joint_names))
self.prev_delta = np.zeros(len(joint_names))
def limit(self, q_target):
q_target = np.array(q_target)
delta = q_target - self.prev_q
max_delta = self.max_velocity * self.dt
delta = np.clip(delta, -max_delta, max_delta)
delta_change = delta - self.prev_delta
max_delta_change = self.max_acceleration * self.dt
delta_change = np.clip(delta_change, -max_delta_change, max_delta_change)
delta = self.prev_delta + delta_change
q_next = self.prev_q + delta
self.prev_q = q_next
self.prev_delta = delta
return q_next
class ArucoIKController(Node):
def __init__(self):
super().__init__("aruco_ik_controller")
self.pose_filter = PoseFilter(POSE_ALPHA)
self.joint_limiter = JointLimiter(LEFT_ARM_JOINTS, MAX_VELOCITY, MAX_ACCELERATION, CONTROL_PERIOD)
self.ik_ready = False
self.prev_filtered_pos = None
self.prev_joint_angles = None
# === 参数(根据你的 openarm 改)===
self.arm_group = "left_arm"
self.base_frame = "base_link"
self.ee_link = "openarm_left_link7"
self.joint_names = [
"openarm_left_joint1",
"openarm_left_joint2",
"openarm_left_joint3",
"openarm_left_joint4",
"openarm_left_joint5",
"openarm_left_joint6",
"openarm_left_joint7",
]
self.latest_joint_state = None
self.create_subscription(
JointState,
"/joint_states",
self.joint_state_cb,
10,
)
self.traj_pub = self.create_publisher(
JointTrajectory,
"/left_joint_trajectory_controller/joint_trajectory",
10
)
self.ik_client = self.create_client(
GetPositionIK,
"/compute_ik"
)
self.get_logger().info("Waiting for IK service...")
self.ik_client.wait_for_service()
self.create_subscription(
PoseStamped,
"/hand_3d", # 已经转到 base 的 ArUco
self.on_pose,
10
)
self.timer = self.create_timer(3.0, self.check_ik_service)
def joint_state_cb(self, msg: JointState):
self.latest_joint_state = msg
def build_seed_state(self):
if self.latest_joint_state is None:
return None
seed = []
for j in self.joint_names:
try:
idx = self.latest_joint_state.name.index(j)
seed.append(self.latest_joint_state.position[idx])
except ValueError:
self.get_logger().error(f"Joint {j} not in joint_states")
return None
js = JointState()
js.name = self.joint_names
js.position = seed
return js
def check_ik_service(self):
if self.ik_client.service_is_ready():
self.get_logger().info("IK service ready!")
self.ik_ready = True
self.timer.cancel()
def on_pose(self, pose: PoseStamped):
if not self.ik_ready:
self.get_logger().warn("IK service not ready, skip IK")
return
seed_state = self.build_seed_state()
if seed_state is None:
self.get_logger().warn("No valid seed, skip IK")
return
req = GetPositionIK.Request()
req.ik_request.group_name = self.arm_group
pos, ori = self.pose_filter.filter(pose.pose.position, pose.pose.orientation)
if self.prev_filtered_pos is not None:
if np.linalg.norm(filtered_pose.pose.position - self.prev_filtered_pos) < POSITION_DEADZONE:
return # EE 位置变化太小,跳过
self.prev_filtered_pos = pos
filtered_pose = PoseStamped()
filtered_pose.header = pose.header
filtered_pose.pose.position.x, filtered_pose.pose.position.y, filtered_pose.pose.position.z = pos
# filtered_pose.pose.orientation.x, filtered_pose.pose.orientation.y, filtered_pose.pose.orientation.z, filtered_pose.pose.orientation.w = ori
# smoothed_ori = slerp(last_ori, new_ori, alpha)
target_pos = [
pos.x,
pos.y,
pos.z
]
target_pos[0] = np.clip(target_pos[0], 0, 0.8)
target_pos[1] = np.clip(target_pos[1], -0.2, 0.2) + 0.15
target_pos[2] = np.clip(target_pos[2], -0.3, 0.3) + 0.3
# pose.pose.position.x = 0.1
# pose.pose.position.y = 0.25
# pose.pose.position.z = 0.4
pose.pose.orientation.x = 1.0
pose.pose.orientation.y = 0.0
pose.pose.orientation.z = 0.0
pose.pose.orientation.w = 0.0
req.ik_request.pose_stamped.pose.position.x = target_pos[0]
req.ik_request.pose_stamped.pose.position.y = target_pos[1]
req.ik_request.pose_stamped.pose.position.z = target_pos[2]
# req.ik_request.pose_stamped.pose.position = pose.pose.position
req.ik_request.pose_stamped.pose.orientation = pose.pose.orientation
req.ik_request.ik_link_name = self.ee_link
req.ik_request.timeout.sec = 1
req.ik_request.timeout.nanosec = 0 # 0.2s
# req.ik_request.robot_state.joint_state.name = seed_state.name
# req.ik_request.robot_state.joint_state.position = seed_state.position
req.ik_request.robot_state.joint_state = seed_state
self.get_logger().info(f"Seed joints: {seed_state.position}")
# self.get_logger().info(f"joint names: {seed_state.name}")
self.get_logger().info(f"Target pose: {pose.pose.position}, {pose.pose.orientation}")
future = self.ik_client.call_async(req)
future.add_done_callback(self.on_ik_result)
def on_ik_result(self, future):
res = future.result()
self.get_logger().info(f"IK result code: {res.error_code.val}")
if res.error_code.val != res.error_code.SUCCESS:
self.get_logger().warn("IK failed")
return
jt = JointTrajectory()
jt.joint_names = self.joint_names
point = JointTrajectoryPoint()
point.positions = res.solution.joint_state.position[:len(self.joint_names)]
point.time_from_start.sec = 0
point.time_from_start.nanosec = 300_000_00 # 0.3s
jt.points.append(point)
self.traj_pub.publish(jt)
def main():
rclpy.init()
node = ArucoIKController()
rclpy.spin(node)
if __name__ == "__main__":
main()

View File

@ -29,6 +29,7 @@ setup(
'tf_test = openarm_moveit_control.tf_test:main',
'hand_follow_moveit = openarm_moveit_control.hand_follow_moveit:main',
'follow_pose_goal = openarm_moveit_control.follow_pose_goal:main',
'aruco_ik_controller = openarm_moveit_control.aruco_ik_controller:main',
],
},
)