增加使用 Aruco 识别位置,增加 pymoveit2 控制机械臂

This commit is contained in:
shenchenyang 2026-02-05 19:06:57 +08:00
parent 404ceea8ac
commit bb7115b5a5
3 changed files with 307 additions and 30 deletions

View File

@ -38,7 +38,13 @@ class StereoHandDepthNode(Node):
speckleWindowSize=100,
speckleRange=32
)
self.aruco_dict = cv2.aruco.getPredefinedDictionary(
cv2.aruco.DICT_4X4_50
)
self.aruco_params = cv2.aruco.DetectorParameters()
self.marker_size = 0.058 # 米
# ---------- ROS ----------
self.left_info = None
@ -68,32 +74,32 @@ class StereoHandDepthNode(Node):
10
)
self.mp_hands = mp.solutions.hands.Hands(
static_image_mode=False,
max_num_hands=1,
min_detection_confidence=0.7,
min_tracking_confidence=0.7
)
# self.mp_hands = mp.solutions.hands.Hands(
# static_image_mode=False,
# max_num_hands=1,
# min_detection_confidence=0.4,
# min_tracking_confidence=0.4
# )
self.get_logger().info('StereoHandDepthNode started')
def init_rectify_maps(self, left_info, right_info, image_size):
K1 = np.array(left_info.k).reshape(3, 3)
D1 = np.array(left_info.d)
K2 = np.array(right_info.k).reshape(3, 3)
D2 = np.array(right_info.d)
self.K1 = np.array(left_info.k).reshape(3, 3)
self.D1 = np.array(left_info.d)
self.K2 = np.array(right_info.k).reshape(3, 3)
self.D2 = np.array(right_info.d)
R1 = np.array(left_info.r).reshape(3, 3)
R2 = np.array(right_info.r).reshape(3, 3)
self.R1 = np.array(left_info.r).reshape(3, 3)
self.R2 = np.array(right_info.r).reshape(3, 3)
P1 = np.array(left_info.p).reshape(3, 4)
P2 = np.array(right_info.p).reshape(3, 4)
self.P1 = np.array(left_info.p).reshape(3, 4)
self.P2 = np.array(right_info.p).reshape(3, 4)
map1_l, map2_l = cv2.initUndistortRectifyMap(
K1, D1, R1, P1[:3, :3], image_size, cv2.CV_32FC1
self.K1, self.D1, self.R1, self.P1[:3, :3], image_size, cv2.CV_32FC1
)
map1_r, map2_r = cv2.initUndistortRectifyMap(
K2, D2, R2, P2[:3, :3], image_size, cv2.CV_32FC1
self.K2, self.D2, self.R2, self.P2[:3, :3], image_size, cv2.CV_32FC1
)
return map1_l, map2_l, map1_r, map2_r
@ -127,17 +133,17 @@ class StereoHandDepthNode(Node):
right = frame[:, mid:]
left_rect = cv2.remap(left, self.map1_l, self.map2_l, cv2.INTER_LINEAR)
right_rect = cv2.remap(right, self.map1_r, self.map2_r, cv2.INTER_LINEAR)
results = self.mp_hands.process(left_rect)
# results = self.mp_hands.process(left_rect)
p = PointStamped()
if results.multi_hand_landmarks:
wrist = results.multi_hand_landmarks[0].landmark[0]
# if results.multi_hand_landmarks:
# wrist = results.multi_hand_landmarks[0].landmark[0]
u = int(wrist.x * (w / 2)) # 注意这里是左图宽度
v = int(wrist.y * h)
p.header = msg.header
p.point.x = float(u)
p.point.y = float(v)
p.point.z = 0.0 # 暂时不用
# u = int(wrist.x * (w / 2)) # 注意这里是左图宽度
# v = int(wrist.y * h)
# p.header = msg.header
# p.point.x = float(u)
# p.point.y = float(v)
# p.point.z = 0.0 # 暂时不用
# 转灰度
left_gray = cv2.cvtColor(left_rect, cv2.COLOR_BGR2GRAY)
@ -147,10 +153,47 @@ class StereoHandDepthNode(Node):
# 计算视差
disparity = self.stereo.compute(left_gray, right_gray).astype(np.float32) / 16.0
corners, ids, _ = cv2.aruco.detectMarkers(
left_rect, self.aruco_dict, parameters=self.aruco_params
)
if ids is not None:
cv2.aruco.drawDetectedMarkers(left_gray, corners, ids)
c = corners[0][0]
center = c.mean(axis=0)
u, v = int(center[0]), int(center[1])
d = disparity[v, u]
cv2.circle(left_gray, (u, v), 5, (0, 0, 255), -1)
cv2.putText(
left_gray, f"disp={d:.2f}", (u+5, v-5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1
)
for e in corners:
ret, rvec, tvec = cv2.solvePnP(
objectPoints=np.array([
[-self.marker_size/2, self.marker_size/2, 0],
[ self.marker_size/2, self.marker_size/2, 0],
[ self.marker_size/2, -self.marker_size/2, 0],
[-self.marker_size/2, -self.marker_size/2, 0]
], dtype=np.float32),
imagePoints=e,
cameraMatrix=self.K1,
distCoeffs=self.D1
)
cv2.drawFrameAxes(left_gray, self.K1, self.D1, rvec, tvec, 0.1, 2)
# self.get_logger().info(f"rvec: {rvec}")
# u = int(u * (w / 2)) # 注意这里是左图宽度
# v = int(v * h)
p.header = msg.header
p.point.x = float(u)
p.point.y = float(v)
p.point.z = 0.0 # 暂时不用
cv2.imshow("left", left_gray)
# ----------- 这里先用“假手点”(画面中心)-----------
x = int(p.point.x)
y = int(p.point.y)
cv2.circle(left_gray, (x, y), 5, (255, 0, 0), -1)
# cv2.circle(left_gray, (x, y), 5, (255, 0, 0), -1)
# print("left_gray.shape:", left_gray.shape)
# cv2.imshow('left', left_gray)
# print(f'Wrist pixel: ({x}, {y})')
@ -171,9 +214,10 @@ class StereoHandDepthNode(Node):
msg_out = PointStamped()
msg_out.header.stamp = self.get_clock().now().to_msg()
msg_out.header.frame_id = 'camera_link'
msg_out.point.x = float(X)
msg_out.point.y = float(Y)
msg_out.point.z = float(Z)
msg_out.point.x = float(Z)
msg_out.point.y = float(X)
msg_out.point.z = -float(Y)
self.get_logger().info(f"X:{X:.3f}, Y:{Y:.3f}, Z:{Z:.3f}")
self.pub.publish(msg_out)

View File

@ -0,0 +1,232 @@
#!/usr/bin/env python3
"""
Example of moving to a pose goal.
- ros2 run pymoveit2 ex_pose_goal.py --ros-args -p position:="[0.25, 0.0, 1.0]" -p quat_xyzw:="[0.0, 0.0, 0.0, 1.0]" -p cartesian:=False
- ros2 run pymoveit2 ex_pose_goal.py --ros-args -p position:="[0.25, 0.0, 1.0]" -p quat_xyzw:="[0.0, 0.0, 0.0, 1.0]" -p cartesian:=False -p synchronous:=False -p cancel_after_secs:=1.0
- ros2 run pymoveit2 ex_pose_goal.py --ros-args -p position:="[0.25, 0.0, 1.0]" -p quat_xyzw:="[0.0, 0.0, 0.0, 1.0]" -p cartesian:=False -p synchronous:=False -p cancel_after_secs:=0.0
"""
from threading import Thread
import rclpy
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node
from moveit_msgs.action import MoveGroup
from moveit_msgs.msg import Constraints, PositionConstraint
from geometry_msgs.msg import PointStamped, PoseStamped
from shape_msgs.msg import SolidPrimitive
import numpy as np
import time
import cv2
import tf2_ros
import tf2_geometry_msgs
from pymoveit2 import MoveIt2, MoveIt2State
from pymoveit2.robots import openarm as robot
class HandFollowMoveIt(Node):
    """Follow a detected hand by sending MoveIt 2 pose goals.

    Subscribes to a ``geometry_msgs/PointStamped`` on ``/hand_3d`` (camera
    frame), transforms it into the robot base frame via TF, clamps it to a
    safe workspace, and periodically sends the result to MoveIt 2 as a pose
    goal for the arm.
    """

    def __init__(self):
        super().__init__('hand_follow_moveit')

        # ---- rate limiting / filtering state ----
        self.last_sent = time.time()
        self.send_period = 0.3       # min seconds between goals (~3 Hz)
        self.p_prev = None           # previous filtered hand point (base frame)
        self.plan_period = 1.0       # planning timer period [s]
        self.min_delta = 0.005       # re-plan only if target moved more than this [m]
        self.z_min = 0.15            # lower z bound to avoid hitting the table [m]
        self.z_max = 0.60            # upper z bound [m]
        self.latest_hand = None      # most recent hand point in base frame (np.ndarray)
        self.last_target = None      # last goal actually sent (np.ndarray)
        self.base_frame = 'openarm_body_link0'

        # ---- parameters for target pose and planning behavior ----
        self.declare_parameter("position", [0.5, 0.0, 0.25])
        self.declare_parameter("quat_xyzw", [1.0, 0.0, 0.0, 0.0])
        self.declare_parameter("synchronous", True)
        # If non-positive, don't cancel. Only used if synchronous is False.
        self.declare_parameter("cancel_after_secs", 0.0)
        # Planner ID
        self.declare_parameter("planner_id", "RRTConnectkConfigDefault")
        # Cartesian planning options
        self.declare_parameter("cartesian", True)
        self.declare_parameter("cartesian_max_step", 0.0025)
        self.declare_parameter("cartesian_fraction_threshold", 0.0)
        self.declare_parameter("cartesian_jump_threshold", 0.0)
        self.declare_parameter("cartesian_avoid_collisions", False)

        # Callback group that allows callbacks to execute in parallel
        callback_group = ReentrantCallbackGroup()

        # MoveIt 2 interface for the arm planning group
        self.moveit2 = MoveIt2(
            node=self,
            joint_names=robot.joint_names(),
            base_link_name=robot.base_link_name(),
            end_effector_name=robot.end_effector_name(),
            group_name=robot.MOVE_GROUP_ARM,
            callback_group=callback_group,
        )
        self.moveit2.planner_id = (
            self.get_parameter("planner_id").get_parameter_value().string_value
        )

        # Scale down velocity and acceleration of joints (fraction of maximum)
        self.moveit2.max_velocity = 0.5
        self.moveit2.max_acceleration = 0.5

        # Read parameters into attributes
        self.position = self.get_parameter("position").get_parameter_value().double_array_value
        self.quat_xyzw = self.get_parameter("quat_xyzw").get_parameter_value().double_array_value
        self.synchronous = self.get_parameter("synchronous").get_parameter_value().bool_value
        self.cancel_after_secs = (
            self.get_parameter("cancel_after_secs").get_parameter_value().double_value
        )
        self.cartesian = self.get_parameter("cartesian").get_parameter_value().bool_value
        self.cartesian_max_step = (
            self.get_parameter("cartesian_max_step").get_parameter_value().double_value
        )
        self.cartesian_fraction_threshold = (
            self.get_parameter("cartesian_fraction_threshold")
            .get_parameter_value()
            .double_value
        )
        self.cartesian_jump_threshold = (
            self.get_parameter("cartesian_jump_threshold")
            .get_parameter_value()
            .double_value
        )
        self.cartesian_avoid_collisions = (
            self.get_parameter("cartesian_avoid_collisions")
            .get_parameter_value()
            .bool_value
        )

        # Apply cartesian planning settings
        self.moveit2.cartesian_avoid_collisions = self.cartesian_avoid_collisions
        self.moveit2.cartesian_jump_threshold = self.cartesian_jump_threshold

        # ---- TF ----
        self.tf_buffer = tf2_ros.Buffer()
        self.tf_listener = tf2_ros.TransformListener(self.tf_buffer, self)

        # Move to an initial ready pose, then give the arm time to get there.
        # NOTE(review): this blocks the constructor for 3 s — confirm acceptable.
        self.send_goal([0.0, 0.15, 0.3])
        time.sleep(3)

        # ---- subscriptions / timers ----
        self.sub = self.create_subscription(
            PointStamped,
            '/hand_3d',
            self.hand_cb,
            10
        )
        self.timer = self.create_timer(self.plan_period, self.plan_timer)
        self.get_logger().info('Hand → MoveIt Planning node started')

    # ---------- callbacks ----------
    def hand_cb(self, msg):
        """Cache the latest hand point, transformed into the base frame.

        Drops the sample (with a debug log) if the TF transform is not yet
        available or fails.
        """
        try:
            hand = self.tf_buffer.transform(
                msg,
                self.base_frame,
                timeout=rclpy.duration.Duration(seconds=0.1)
            )
        except Exception as exc:
            # TF may not be available yet; don't crash the callback,
            # but don't swallow the error silently either.
            self.get_logger().debug(f'TF transform to {self.base_frame} failed: {exc}')
            return
        self.get_logger().info(
            f'Hand in {self.base_frame}: '
            f'{hand.point.x:.3f}, {hand.point.y:.3f}, {hand.point.z:.3f}'
        )
        self.latest_hand = np.array([
            hand.point.x,
            hand.point.y,
            hand.point.z
        ])

    def update_hand(self, p):
        """Filter a hand point and send it as a goal if it changed enough.

        p: array-like [x, y, z] in the base frame. The input is copied so
        the caller's array is never mutated.
        """
        # Work on a copy so np.clip below doesn't mutate the caller's array.
        p = np.asarray(p, dtype=float).copy()

        # 1. Clamp to workspace (tune these bounds for your arm)
        p[0] = np.clip(p[0], -0.1, 0.1)
        p[1] = np.clip(p[1], -0.20, 0.20)
        p[2] = np.clip(p[2], 0.0, 0.30)

        # 2. Low-pass filter against the previous point
        if self.p_prev is None:
            self.p_prev = p
            return
        alpha = 0.2
        p = alpha * p + (1 - alpha) * self.p_prev

        # 3. Dead zone: ignore movements smaller than 1.5 cm
        if np.linalg.norm(p - self.p_prev) < 0.015:
            return

        # 4. Rate limit goal sending
        if time.time() - self.last_sent < self.send_period:
            return

        self.send_goal(p)
        self.p_prev = p
        self.last_sent = time.time()

    def send_goal(self, p):
        """Send a pose goal at position ``p`` with the configured orientation."""
        self.get_logger().info(
            f"Moving to {{position: {list(p)}, quat_xyzw: {list(self.quat_xyzw)}}}"
        )
        self.moveit2.move_to_pose(
            position=p,
            quat_xyzw=self.quat_xyzw,
            cartesian=self.cartesian,
            cartesian_max_step=self.cartesian_max_step,
            cartesian_fraction_threshold=self.cartesian_fraction_threshold,
        )

    def plan_timer(self):
        """Periodically map the latest hand point into the workspace and re-plan."""
        if self.latest_hand is None:
            return

        # Map the hand point into the reachable workspace:
        # x is mirrored/offset, y and z are clamped then offset.
        target = self.latest_hand.copy()
        target[0] = 0.6 - np.clip(target[0], 0, 0.8)
        target[1] = np.clip(target[1], -0.2, 0.2) + 0.15
        target[2] = np.clip(target[2], -0.3, 0.3) + 0.3

        # Only re-plan when the target moved more than min_delta
        if self.last_target is not None:
            if np.linalg.norm(target - self.last_target) < self.min_delta:
                return

        self.last_target = target.copy()
        self.send_goal(target)
        self.get_logger().info(
            f'New target: {target[0]:.3f}, {target[1]:.3f}, {target[2]:.3f}'
        )
def main():
    """Entry point: spin the hand-follow node until interrupted, then clean up."""
    rclpy.init()
    node = HandFollowMoveIt()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the normal way to stop this node
    finally:
        # Explicitly destroy the node before shutting down the context;
        # the bare `exit(0)` the old code used is for interactive sessions only.
        node.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()

View File

@ -28,6 +28,7 @@ setup(
'handeye_collect = openarm_moveit_control.handeye_collect:main',
'tf_test = openarm_moveit_control.tf_test:main',
'hand_follow_moveit = openarm_moveit_control.hand_follow_moveit:main',
'follow_pose_goal = openarm_moveit_control.follow_pose_goal:main',
],
},
)