增加末端执行器角度规划

This commit is contained in:
shenchenyang 2026-02-06 17:48:58 +08:00
parent a2fc8c1faf
commit e3b7360b9a
2 changed files with 121 additions and 91 deletions

View File

@ -6,6 +6,7 @@ import numpy as np
from sensor_msgs.msg import Image
from geometry_msgs.msg import PointStamped
from geometry_msgs.msg import PoseStamped
from cv_bridge import CvBridge
from sensor_msgs.msg import CameraInfo
import mediapipe as mp
@ -71,7 +72,7 @@ class StereoHandDepthNode(Node):
)
self.pub = self.create_publisher(
PointStamped,
PoseStamped,
'/hand_3d',
10
)
@ -134,9 +135,9 @@ class StereoHandDepthNode(Node):
left = frame[:, :mid]
right = frame[:, mid:]
left_rect = cv2.remap(left, self.map1_l, self.map2_l, cv2.INTER_LINEAR)
right_rect = cv2.remap(right, self.map1_r, self.map2_r, cv2.INTER_LINEAR)
# right_rect = cv2.remap(right, self.map1_r, self.map2_r, cv2.INTER_LINEAR)
# results = self.mp_hands.process(left_rect)
p = PointStamped()
# p = PointStamped()
# if results.multi_hand_landmarks:
# wrist = results.multi_hand_landmarks[0].landmark[0]
@ -148,29 +149,29 @@ class StereoHandDepthNode(Node):
# p.point.z = 0.0 # 暂时不用
# 转灰度
left_gray = cv2.cvtColor(left_rect, cv2.COLOR_BGR2GRAY)
right_gray = cv2.cvtColor(right_rect, cv2.COLOR_BGR2GRAY)
# left_gray = cv2.cvtColor(left_rect, cv2.COLOR_BGR2GRAY)
# right_gray = cv2.cvtColor(right_rect, cv2.COLOR_BGR2GRAY)
# 计算视差
disparity = self.stereo.compute(left_gray, right_gray).astype(np.float32) / 16.0
# disparity = self.stereo.compute(left_gray, right_gray).astype(np.float32) / 16.0
corners, ids, _ = cv2.aruco.detectMarkers(
left_rect, self.aruco_dict, parameters=self.aruco_params
)
msg_out = PoseStamped()
if ids is not None:
cv2.aruco.drawDetectedMarkers(left_gray, corners, ids)
c = corners[0][0]
center = c.mean(axis=0)
u, v = int(center[0]), int(center[1])
d = disparity[v, u]
cv2.aruco.drawDetectedMarkers(left_rect, corners, ids)
# c = corners[0][0]
# center = c.mean(axis=0)
# u, v = int(center[0]), int(center[1])
# d = disparity[v, u]
cv2.circle(left_gray, (u, v), 5, (0, 0, 255), -1)
cv2.putText(
left_gray, f"disp={d:.2f}", (u+5, v-5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1
)
# cv2.circle(left_gray, (u, v), 5, (0, 0, 255), -1)
# cv2.putText(
# left_gray, f"disp={d:.2f}", (u+5, v-5),
# cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1
# )
for e in corners:
ret, rvec, tvec = cv2.solvePnP(
objectPoints=np.array([
@ -184,57 +185,73 @@ class StereoHandDepthNode(Node):
distCoeffs=self.D1
)
if(ret == True):
cv2.drawFrameAxes(left_gray, self.K1, self.D1, rvec, tvec, 0.1, 2)
R_ca, _ = cv2.Rodrigues(rvec)
rot = R.from_matrix(R_ca)
cv2.drawFrameAxes(left_rect, self.K1, self.D1, rvec, tvec, 0.1, 2)
R_cv, _ = cv2.Rodrigues(rvec)
R_cv_to_ros = np.array([
[ 0, 0, -1],
[ 1, 0, 0],
[ 0, -1, 0]
])
R_ros = R_cv_to_ros @ R_cv
t_ros = R_cv_to_ros @ tvec.reshape(3)
# t_ros = tvec.reshape(3)
rot = R.from_matrix(R_ros)
qx, qy, qz, qw = rot.as_quat()
roll, pitch, yaw = rot.as_euler('xyz')
self.get_logger().info(f"quat: ({qx}, {qy}, {qz}, {qw})")
self.get_logger().info(f"euler: ({roll*180/3.14:.1f}, {pitch*180/3.14:.1f}, {yaw*180/3.14:.1f})")
msg_out.pose.orientation.x = float(qx)
msg_out.pose.orientation.y = float(qy)
msg_out.pose.orientation.z = float(qz)
msg_out.pose.orientation.w = float(qw)
msg_out.pose.position.x = float(t_ros[0]) + 0.7
msg_out.pose.position.y = float(t_ros[1])
msg_out.pose.position.z = float(t_ros[2])
# u = int(u * (w / 2)) # 注意这里是左图宽度
# v = int(v * h)
p.header = msg.header
p.point.x = float(u)
p.point.y = float(v)
p.point.z = 0.0 # 暂时不用
cv2.imshow("left", left_gray)
# p.header = msg.header
# p.point.x = float(u)
# p.point.y = float(v)
# p.point.z = 0.0 # 暂时不用
cv2.imshow("left", left_rect)
# ----------- 这里先用“假手点”(画面中心)-----------
x = int(p.point.x)
y = int(p.point.y)
# x = int(p.point.x)
# y = int(p.point.y)
# cv2.circle(left_gray, (x, y), 5, (255, 0, 0), -1)
# print("left_gray.shape:", left_gray.shape)
# cv2.imshow('left', left_gray)
# print(f'Wrist pixel: ({x}, {y})')
# ROI 取中值,稳很多
roi = disparity[y-3:y+3, x-3:x+3]
d = np.nanmedian(roi)
# roi = disparity[y-3:y+3, x-3:x+3]
# d = np.nanmedian(roi)
# print(f'Disparity at wrist: {roi}')
if d <= 0:
return
# if d <= 0:
# return
# 深度
Z = (self.fx * self.baseline) / d
X = (x - self.cx) * Z / self.fx
Y = (y - self.cy) * Z / self.fy
# Z = (self.fx * self.baseline) / d
# X = (x - self.cx) * Z / self.fx
# Y = (y - self.cy) * Z / self.fy
# 发布
msg_out = PointStamped()
msg_out.header.stamp = self.get_clock().now().to_msg()
msg_out.header.frame_id = 'camera_link'
msg_out.point.x = float(Z)
msg_out.point.y = float(X)
msg_out.point.z = -float(Y)
self.get_logger().info(f"X:{X:.3f}, Y:{Y:.3f}, Z:{Z:.3f}")
# msg_out.pose.position.x = float(Z)
# msg_out.pose.position.y = float(X)
# msg_out.pose.position.z = -float(Y)
# self.get_logger().info(f"X:{X:.3f}, Y:{Y:.3f}, Z:{Z:.3f}")
self.pub.publish(msg_out)
# ----------- 可视化(调试用)-----------
disp_vis = cv2.normalize(disparity, None, 0, 255, cv2.NORM_MINMAX)
disp_vis = disp_vis.astype(np.uint8)
cv2.circle(disp_vis, (x, y), 5, 255, -1)
disp_vis = disp_vis[:, self.numDisparities:]
cv2.imshow('disparity', disp_vis)
# disp_vis = cv2.normalize(disparity, None, 0, 255, cv2.NORM_MINMAX)
# disp_vis = disp_vis.astype(np.uint8)
# cv2.circle(disp_vis, (x, y), 5, 255, -1)
# disp_vis = disp_vis[:, self.numDisparities:]
# cv2.imshow('disparity', disp_vis)
cv2.waitKey(1)

View File

@ -39,11 +39,11 @@ class HandFollowMoveIt(Node):
self.p_prev = None
self.plan_period = 1.0 # 秒(规划频率)
self.plan_period = 0.1 # 秒(规划频率)
self.min_delta = 0.005 # m手移动超过才重新规划
self.z_min = 0.15 # 防止撞桌
self.z_max = 0.60
self.latest_hand = None
self.latest_hand_pos = None
self.last_target = None
self.base_frame = 'openarm_body_link0'
# Create node for this example
@ -51,7 +51,7 @@ class HandFollowMoveIt(Node):
# Declare parameters for position and orientation
self.declare_parameter("position", [0.5, 0.0, 0.25])
self.declare_parameter("quat_xyzw", [1.0, 0.0, 0.0, 0.0])
self.declare_parameter("quat_xyzw", [0.723, 0.0, 0.69, 0.0])
self.declare_parameter("synchronous", True)
# If non-positive, don't cancel. Only used if synchronous is False
self.declare_parameter("cancel_after_secs", 0.0)
@ -83,7 +83,7 @@ class HandFollowMoveIt(Node):
# Spin the node in background thread(s) and wait a bit for initialization
# self.executor = rclpy.executors.MultiThreadedExecutor(2)
# self.executor.add_node(self)
# self.executor_thread = Thread(target=self.executor.spin, daemon=True, args=())
# self.executor_thread = Thread(target_pos=self.executor.spin, daemon=True, args=())
# self.executor_thread.start()
# self.create_rate(1.0).sleep()
@ -125,12 +125,12 @@ class HandFollowMoveIt(Node):
# ====== TF ======
self.tf_buffer = tf2_ros.Buffer()
self.tf_listener = tf2_ros.TransformListener(self.tf_buffer, self)
self.send_goal([0.0, 0.15, 0.3])
self.zero_hand()
time.sleep(3)
# ====== 订阅 ======
self.sub = self.create_subscription(
PointStamped,
PoseStamped,
'/hand_3d',
self.hand_cb,
10
@ -148,76 +148,89 @@ class HandFollowMoveIt(Node):
)
except Exception:
return
print(f"手坐标({self.base_frame}):{hand.point.x:.3f}, {hand.point.y:.3f}, {hand.point.z:.3f}")
self.latest_hand = np.array([
hand.point.x,
hand.point.y,
hand.point.z
self.get_logger().info(f"手坐标({self.base_frame}):{hand.pose.position.x:.3f}, {hand.pose.position.y:.3f}, {hand.pose.position.z:.3f}")
self.get_logger().info(f"手四元数({self.base_frame}):{hand.pose.orientation.x:.3f}, {hand.pose.orientation.y:.3f}, {hand.pose.orientation.z:.3f}, {hand.pose.orientation.w:.3f}")
self.latest_hand_pos = np.array([
hand.pose.position.x,
hand.pose.position.y,
hand.pose.position.z
])
self.quat_xyzw = np.array([
hand.pose.orientation.x,
hand.pose.orientation.y,
hand.pose.orientation.z,
hand.pose.orientation.w
])
def zero_hand(self):
self.send_goal([0.0, 0.15, 0.3], [1.0, 0.0, 0.0, 0.0])
# def update_hand(self, p):
# """ p: np.array([x,y,z]) in base_link """
def update_hand(self, p):
""" p: np.array([x,y,z]) in base_link """
# # 1⃣ 限幅(按你机械臂改)
# p[0] = np.clip(p[0], -0.1, 0.1)
# p[1] = np.clip(p[1], -0.20, 0.20)
# p[2] = np.clip(p[2], 0.0, 0.30)
# 1⃣ 限幅(按你机械臂改)
p[0] = np.clip(p[0], -0.1, 0.1)
p[1] = np.clip(p[1], -0.20, 0.20)
p[2] = np.clip(p[2], 0.0, 0.30)
# # 2⃣ 低通滤波
# if self.p_prev is None:
# self.p_prev = p
# return
# 2⃣ 低通滤波
if self.p_prev is None:
self.p_prev = p
return
# alpha = 0.2
# p = alpha * p + (1 - alpha) * self.p_prev
alpha = 0.2
p = alpha * p + (1 - alpha) * self.p_prev
# # 3⃣ 死区
# if np.linalg.norm(p - self.p_prev) < 0.015:
# return
# 3⃣ 死区
if np.linalg.norm(p - self.p_prev) < 0.015:
return
# # 4⃣ 控制频率
# if time.time() - self.last_sent < self.send_period:
# return
# 4⃣ 控制频率
if time.time() - self.last_sent < self.send_period:
return
# self.send_goal(p)
# self.p_prev = p
# self.last_sent = time.time()
self.send_goal(p)
self.p_prev = p
self.last_sent = time.time()
def send_goal(self, p):
def send_goal(self, p, quat):
# Move to pose
self.get_logger().info(
f"Moving to {{position: {list(p)}, quat_xyzw: {list(self.quat_xyzw)}}}"
)
self.moveit2.move_to_pose(
position=p,
quat_xyzw=self.quat_xyzw,
quat_xyzw=quat,
cartesian=self.cartesian,
cartesian_max_step=self.cartesian_max_step,
cartesian_fraction_threshold=self.cartesian_fraction_threshold,
)
def plan_timer(self):
if self.latest_hand is None:
if self.latest_hand_pos is None:
self.zero_hand()
return
# 限制 workspace
target = self.latest_hand.copy()
target_pos = self.latest_hand_pos.copy()
if np.linalg.norm(target_pos) == 0:
self.zero_hand()
return
target[0] = 0.6 - np.clip(target[0], 0, 0.8)
target[1] = np.clip(target[1], -0.2, 0.2) + 0.15
target[2] = np.clip(target[2], -0.3, 0.3) + 0.3
target_pos[0] = np.clip(target_pos[0], 0, 0.8)
target_pos[1] = np.clip(target_pos[1], -0.2, 0.2) + 0.15
target_pos[2] = np.clip(target_pos[2], -0.3, 0.3) + 0.3
# target[0] = 0.12
# target[2] = 0.32
# target_pos[0] = 0.12
# target_pos[2] = 0.32
# 判断是否值得重新规划
if self.last_target is not None:
if np.linalg.norm(target - self.last_target) < self.min_delta:
if np.linalg.norm(target_pos - self.last_target) < self.min_delta:
return
self.last_target = target.copy()
self.send_goal(target)
print(f"发送新目标:{target[0]:.3f}, {target[1]:.3f}, {target[2]:.3f}")
self.last_target = target_pos.copy()
self.send_goal(target_pos, self.quat_xyzw)
self.get_logger().info(f"发送新目标:{target_pos[0]:.3f}, {target_pos[1]:.3f}, {target_pos[2]:.3f}")
def main():
rclpy.init()