增加机械臂动作录制

This commit is contained in:
shenchenyang 2026-03-23 14:46:40 +08:00
parent a7b19d766b
commit 8a020b18ac
15 changed files with 21923 additions and 107 deletions

1
.gitignore vendored
View File

@ -3,3 +3,4 @@ build/
install/ install/
log/ log/
*.png *.png
openarm_joint_record.yaml

21252
openarm_joint_trajectory.yaml Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,20 @@
camera_node:
ros__parameters:
# 输入源: usb 或 rtsp
source_type: "rtsp"
# USB 参数
usb_index: 0
usb_use_mjpg: true
# RTSP 参数source_type=rtsp 时生效)
rtsp_url: "rtsp://192.168.3.130:8554/test"
rtsp_transport: "udp"
# 通用采集参数
camera_width: 2304
camera_height: 1296
target_fps: 30
# 是否将原始图像按左右半幅拆分发布
split_stereo: false

View File

@ -1,17 +1,20 @@
import os import os
from launch import LaunchDescription from launch import LaunchDescription
from launch.actions import ExecuteProcess, TimerAction, IncludeLaunchDescription from launch.actions import TimerAction
from launch.launch_description_sources import PythonLaunchDescriptionSource
from launch.substitutions import PathJoinSubstitution
from launch_ros.actions import Node from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
from ament_index_python.packages import get_package_share_directory from ament_index_python.packages import get_package_share_directory
import xacro
def generate_launch_description(): def generate_launch_description():
params_file = os.path.join(
get_package_share_directory('openarm_camera'),
'config',
'camera_pub.params.yaml'
)
camera_init = Node( camera_init = Node(
package='openarm_camera', package='openarm_camera',
executable='camera_pub_node', executable='camera_pub_node',
parameters=[params_file],
output='screen' output='screen'
) )
camera_cal_init = Node( camera_cal_init = Node(
@ -43,8 +46,8 @@ def generate_launch_description():
# period=4.0, # period=4.0,
# actions=[camera_cal_init] # actions=[camera_cal_init]
# ), # ),
TimerAction( # TimerAction(
period=4.0, # period=4.0,
actions=[stereo_hand_depth] # actions=[stereo_hand_depth]
), # ),
]) ])

View File

@ -3,22 +3,18 @@ from rclpy.node import Node
from sensor_msgs.msg import Image from sensor_msgs.msg import Image
from cv_bridge import CvBridge from cv_bridge import CvBridge
import cv2 import cv2
import numpy as np import os
import time import time
from rcl_interfaces.msg import SetParametersResult
from camera_info_manager import CameraInfoManager from camera_info_manager import CameraInfoManager
from sensor_msgs.msg import CameraInfo from sensor_msgs.msg import CameraInfo
CAMERA_WIDTH = 1280 DEFAULT_CAMERA_WIDTH = 1280
CAMERA_HEIGHT = 480 DEFAULT_CAMERA_HEIGHT = 480
TARGET_FPS = 30 DEFAULT_TARGET_FPS = 30
FRAME_WIDTH = int(CAMERA_WIDTH / 2)
FRAME_HEIGHT = int(CAMERA_HEIGHT / 2)
DIAG_INTERVAL_SEC = 1.0 DIAG_INTERVAL_SEC = 1.0
print(FRAME_WIDTH, FRAME_HEIGHT)
class CameraNode(Node): class CameraNode(Node):
def __init__(self): def __init__(self):
super().__init__('camera_node') super().__init__('camera_node')
@ -52,6 +48,137 @@ class CameraNode(Node):
self.publisher_right = self.create_publisher(Image, '/camera/right/image_raw', 10) self.publisher_right = self.create_publisher(Image, '/camera/right/image_raw', 10)
self.publisher_raw = self.create_publisher(Image, '/image_1', 10) self.publisher_raw = self.create_publisher(Image, '/image_1', 10)
# 输入源参数:支持 usb/rtsp 两种来源,并支持运行时切换。
self.declare_parameter('source_type', 'usb')
self.declare_parameter('usb_index', 0)
self.declare_parameter('rtsp_url', '')
self.declare_parameter('rtsp_transport', 'tcp')
self.declare_parameter('camera_width', DEFAULT_CAMERA_WIDTH)
self.declare_parameter('camera_height', DEFAULT_CAMERA_HEIGHT)
self.declare_parameter('target_fps', DEFAULT_TARGET_FPS)
self.declare_parameter('usb_use_mjpg', True)
self.declare_parameter('split_stereo', False)
self.add_on_set_parameters_callback(self._on_set_parameters)
self.source_type = self.get_parameter('source_type').value
self.usb_index = int(self.get_parameter('usb_index').value)
self.rtsp_url = self.get_parameter('rtsp_url').value
self.rtsp_transport = self.get_parameter('rtsp_transport').value
self.camera_width = int(self.get_parameter('camera_width').value)
self.camera_height = int(self.get_parameter('camera_height').value)
self.target_fps = int(self.get_parameter('target_fps').value)
self.usb_use_mjpg = bool(self.get_parameter('usb_use_mjpg').value)
self.split_stereo = bool(self.get_parameter('split_stereo').value)
self.diag_last_time = time.perf_counter()
self.capture_ok_count = 0
self.publish_count = 0
self.read_cost_sum = 0.0
self.convert_cost_sum = 0.0
self.publish_cost_sum = 0.0
self.total_cost_sum = 0.0
self.failed_read_count = 0
self.reopen_requested = False
self.reopen_reason = ''
self.last_reopen_try = 0.0
self.get_logger().info('正在初始化摄像头...')
self.cap = None
self._open_capture()
timer_period = max(1.0 / max(self.target_fps, 1), 0.001)
self.timer = self.create_timer(timer_period, self.timer_callback)
self.get_logger().info('摄像头节点已启动')
def _on_set_parameters(self, params):
changed = False
for p in params:
if p.name == 'source_type' and p.value not in ('usb', 'rtsp'):
return SetParametersResult(successful=False, reason='source_type 必须是 usb 或 rtsp')
if p.name == 'target_fps' and int(p.value) <= 0:
return SetParametersResult(successful=False, reason='target_fps 必须 > 0')
if p.name == 'camera_width' and int(p.value) <= 0:
return SetParametersResult(successful=False, reason='camera_width 必须 > 0')
if p.name == 'camera_height' and int(p.value) <= 0:
return SetParametersResult(successful=False, reason='camera_height 必须 > 0')
if p.name in {
'source_type', 'usb_index', 'rtsp_url', 'rtsp_transport',
'camera_width', 'camera_height', 'target_fps', 'usb_use_mjpg',
'split_stereo'
}:
changed = True
if changed:
self.reopen_requested = True
self.reopen_reason = '参数更新触发重连'
return SetParametersResult(successful=True)
def _refresh_params(self):
self.source_type = self.get_parameter('source_type').value
self.usb_index = int(self.get_parameter('usb_index').value)
self.rtsp_url = self.get_parameter('rtsp_url').value
self.rtsp_transport = self.get_parameter('rtsp_transport').value
self.camera_width = int(self.get_parameter('camera_width').value)
self.camera_height = int(self.get_parameter('camera_height').value)
self.target_fps = int(self.get_parameter('target_fps').value)
self.usb_use_mjpg = bool(self.get_parameter('usb_use_mjpg').value)
self.split_stereo = bool(self.get_parameter('split_stereo').value)
def _open_capture(self):
self._refresh_params()
if self.cap is not None:
self.cap.release()
self.cap = None
if self.source_type == 'usb':
self.get_logger().info(f'正在打开 USB 摄像头 /dev/video{self.usb_index} ...')
cap = cv2.VideoCapture(self.usb_index, cv2.CAP_V4L2)
if not cap.isOpened():
raise RuntimeError(f'无法打开 USB 摄像头 /dev/video{self.usb_index}')
if self.usb_use_mjpg:
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG'))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.camera_width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.camera_height)
cap.set(cv2.CAP_PROP_FPS, self.target_fps)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.cap = cap
else:
if not self.rtsp_url:
raise RuntimeError('source_type=rtsp 时必须设置 rtsp_url')
transport = str(self.rtsp_transport).lower()
if transport in ('tcp', 'udp'):
os.environ['OPENCV_FFMPEG_CAPTURE_OPTIONS'] = f'rtsp_transport;{transport}'
self.get_logger().info(f'正在打开 RTSP 流: {self.rtsp_url}')
cap = cv2.VideoCapture(self.rtsp_url, cv2.CAP_FFMPEG)
if not cap.isOpened():
cap = cv2.VideoCapture(self.rtsp_url)
if not cap.isOpened():
raise RuntimeError('无法打开 RTSP 流,请检查 url、网络或认证信息')
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
self.cap = cap
# 预热并读取一帧,确保数据流有效。
for _ in range(3):
self.cap.read()
ret, frame = self.cap.read()
if not ret or frame is None:
raise RuntimeError('视频源已打开但无法读取图像帧')
real_w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
real_h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
real_fps = self.cap.get(cv2.CAP_PROP_FPS)
fourcc = int(self.cap.get(cv2.CAP_PROP_FOURCC))
fourcc_str = ''.join([chr((fourcc >> 8 * j) & 0xFF) for j in range(4)])
self.get_logger().info(
f'视频源打开成功: source={self.source_type}, {real_w}x{real_h}, fps={real_fps:.2f}, fourcc={fourcc_str}'
)
self.failed_read_count = 0
self.diag_last_time = time.perf_counter() self.diag_last_time = time.perf_counter()
self.capture_ok_count = 0 self.capture_ok_count = 0
self.publish_count = 0 self.publish_count = 0
@ -60,72 +187,69 @@ class CameraNode(Node):
self.publish_cost_sum = 0.0 self.publish_cost_sum = 0.0
self.total_cost_sum = 0.0 self.total_cost_sum = 0.0
self.get_logger().info('正在初始化摄像头...') def _try_reopen_if_needed(self):
if not self.reopen_requested:
return
now = time.perf_counter()
if now - self.last_reopen_try < 1.0:
return
# 尝试打开物理摄像头 self.last_reopen_try = now
self.cap = None try:
for i in range(2): # 尝试设备0,1,2 self.get_logger().warn(f'尝试重连视频源: {self.reopen_reason}')
self.get_logger().info(f'尝试打开摄像头设备 /dev/video{i}...') self._open_capture()
self.cap = cv2.VideoCapture(i, cv2.CAP_V4L2) self.reopen_requested = False
if self.cap.isOpened(): self.reopen_reason = ''
# 先申请 MJPG再设置分辨率/FPS该设备在 1280x480 下 YUYV 仅支持 5FPS。 self.get_logger().info('重连视频源成功')
self.cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*'MJPG')) except Exception as e:
# 设置摄像头参数 self.get_logger().error(f'重连失败: {e}')
self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, CAMERA_WIDTH)
self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, CAMERA_HEIGHT)
self.cap.set(cv2.CAP_PROP_FPS, TARGET_FPS)
self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
# 让摄像头预热
for _ in range(5):
self.cap.read()
# 验证摄像头能正常读取
ret, frame = self.cap.read()
if ret and frame is not None:
self.get_logger().info(f'成功打开物理摄像头 /dev/video{i}')
self.get_logger().info(f'图像尺寸: {frame.shape}')
real_w = int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH))
real_h = int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
real_fps = self.cap.get(cv2.CAP_PROP_FPS)
fourcc = int(self.cap.get(cv2.CAP_PROP_FOURCC))
fourcc_str = ''.join([chr((fourcc >> 8 * j) & 0xFF) for j in range(4)])
self.get_logger().info(
f'驱动实际参数: {real_w}x{real_h}, fps={real_fps:.2f}, fourcc={fourcc_str} (target_fps={TARGET_FPS})'
)
if fourcc_str != 'MJPG':
self.get_logger().warn(
'当前不是MJPG模式,1280x480下可能会被限制到低帧率。'
)
break
if self.cap is None or not self.cap.isOpened():
raise RuntimeError('未找到可用摄像头设备,请检查 /dev/video* 和权限')
self.timer = self.create_timer(1.0 / TARGET_FPS, self.timer_callback)
self.get_logger().info('摄像头节点已启动')
def timer_callback(self): def timer_callback(self):
self._try_reopen_if_needed()
if self.cap is None or not self.cap.isOpened():
return
t0 = time.perf_counter() t0 = time.perf_counter()
ret, frame = self.cap.read() ret, frame = self.cap.read()
t1 = time.perf_counter() t1 = time.perf_counter()
if ret and frame is not None: if not ret or frame is None:
self.failed_read_count += 1
if self.failed_read_count >= 30:
self.reopen_requested = True
self.reopen_reason = '连续读取失败'
return
self.failed_read_count = 0
stamp = self.get_clock().now().to_msg() stamp = self.get_clock().now().to_msg()
# msg_left= self.bridge.cv2_to_imgmsg(frame[:, :FRAME_WIDTH], encoding='bgr8') if self.split_stereo:
# msg_right= self.bridge.cv2_to_imgmsg(frame[:, FRAME_WIDTH:], encoding='bgr8') frame_width = frame.shape[1]
half_width = frame_width // 2
msg_left = self.bridge.cv2_to_imgmsg(frame[:, :half_width], encoding='bgr8')
msg_right = self.bridge.cv2_to_imgmsg(frame[:, half_width:], encoding='bgr8')
msg_raw= self.bridge.cv2_to_imgmsg(frame, encoding='bgr8') msg_raw= self.bridge.cv2_to_imgmsg(frame, encoding='bgr8')
else:
msg_left = None
msg_right = None
msg_raw = self.bridge.cv2_to_imgmsg(frame, encoding='bgr8')
t2 = time.perf_counter() t2 = time.perf_counter()
# msg_left.header.stamp = stamp if msg_left is not None and msg_right is not None:
# msg_right.header.stamp = stamp msg_left.header.stamp = stamp
msg_right.header.stamp = stamp
msg_left.header.frame_id = 'camera_frame_left'
msg_right.header.frame_id = 'camera_frame_right'
if msg_raw is not None:
msg_raw.header.stamp = stamp msg_raw.header.stamp = stamp
# msg_left.header.frame_id = 'camera_frame_left'
# msg_right.header.frame_id = 'camera_frame_right'
msg_raw.header.frame_id = 'camera_frame_raw' msg_raw.header.frame_id = 'camera_frame_raw'
# self.publisher_left.publish(msg_left)
# self.publisher_right.publish(msg_right) if msg_left is not None and msg_right is not None:
self.publisher_left.publish(msg_left)
self.publisher_right.publish(msg_right)
if msg_raw is not None:
self.publisher_raw.publish(msg_raw) self.publisher_raw.publish(msg_raw)
t3 = time.perf_counter() t3 = time.perf_counter()
@ -152,7 +276,7 @@ class CameraNode(Node):
avg_total_ms = (self.total_cost_sum / max(self.publish_count, 1)) * 1000.0 avg_total_ms = (self.total_cost_sum / max(self.publish_count, 1)) * 1000.0
self.get_logger().info( self.get_logger().info(
f'[diag] cap_fps={cap_fps:.2f}, pub_fps={pub_fps:.2f}, ' f'[diag] source={self.source_type}, cap_fps={cap_fps:.2f}, pub_fps={pub_fps:.2f}, '
f'read={avg_read_ms:.2f}ms, convert={avg_convert_ms:.2f}ms, ' f'read={avg_read_ms:.2f}ms, convert={avg_convert_ms:.2f}ms, '
f'publish={avg_publish_ms:.2f}ms, total={avg_total_ms:.2f}ms' f'publish={avg_publish_ms:.2f}ms, total={avg_total_ms:.2f}ms'
) )
@ -165,6 +289,12 @@ class CameraNode(Node):
self.publish_cost_sum = 0.0 self.publish_cost_sum = 0.0
self.total_cost_sum = 0.0 self.total_cost_sum = 0.0
def destroy_node(self):
if self.cap is not None:
self.cap.release()
self.cap = None
super().destroy_node()

View File

@ -280,7 +280,7 @@ class StereoHandDepthNode(Node):
# msg_out.pose.position.z = -float(Y) # msg_out.pose.position.z = -float(Y)
# self.get_logger().info(f"X:{X:.3f}, Y:{Y:.3f}, Z:{Z:.3f}") # self.get_logger().info(f"X:{X:.3f}, Y:{Y:.3f}, Z:{Z:.3f}")
# 对位置和姿态做简单平滑(滑动平均) # 对位置和姿态做简单平滑(滑动平均)
alpha = 0.2 # 平滑系数0.0-1.0,越小越平滑 alpha = 0.15 # 平滑系数0.0-1.0,越小越平滑
if not hasattr(self, 'smoothed_pos'): if not hasattr(self, 'smoothed_pos'):
self.smoothed_pos = self.target_pos.copy() self.smoothed_pos = self.target_pos.copy()
self.smoothed_quat = self.target_quat.copy() self.smoothed_quat = self.target_quat.copy()

View File

@ -15,6 +15,9 @@ setup(
# launch 文件 # launch 文件
(os.path.join('share', package_name, 'launch'), (os.path.join('share', package_name, 'launch'),
glob('launch/*.launch.py')), glob('launch/*.launch.py')),
# 参数文件
(os.path.join('share', package_name, 'config'),
glob('config/*.yaml')),
], ],
install_requires=['setuptools'], install_requires=['setuptools'],
zip_safe=True, zip_safe=True,

View File

@ -0,0 +1,48 @@
# openarm_joint_recorder
用于录制关节位置轨迹并按记录时间回放。
## 功能
- 订阅控制器状态话题JointTrajectoryControllerState并录制每个采样点的时间与位置
- 通过 ROS2 service 启停录制与回放
- 轨迹保存到 YAML 文件,支持加载后回放
## 参数
- `input_topic`:录制输入话题,默认 `/left_joint_trajectory_controller/controller_state`
- `output_topic`:回放输出话题,默认 `/left_joint_trajectory_controller/joint_trajectory`
- `play_speed`:回放速度倍率,默认 `1.0`
- `loop_playback`:是否循环回放,默认 `false`
- `storage_path`:保存/加载文件路径,默认 `/tmp/openarm_joint_trajectory.yaml`
## 启动
```bash
ros2 launch openarm_joint_recorder joint_trajectory_recorder.launch.py
```
## 服务接口
- `~/set_recording` (`std_srvs/srv/SetBool`): `true` 开始录制,`false` 停止录制
- `~/play` (`std_srvs/srv/Trigger`): 开始回放
- `~/stop_playback` (`std_srvs/srv/Trigger`): 停止回放
- `~/save` (`std_srvs/srv/Trigger`): 保存轨迹到 `storage_path`
- `~/load` (`std_srvs/srv/Trigger`): 从 `storage_path` 加载轨迹
- `~/clear` (`std_srvs/srv/Trigger`): 清空内存轨迹
## 示例命令
```bash
# 开始录制
ros2 service call /joint_trajectory_recorder/set_recording std_srvs/srv/SetBool "{data: true}"
# 停止录制
ros2 service call /joint_trajectory_recorder/set_recording std_srvs/srv/SetBool "{data: false}"
# 保存
ros2 service call /joint_trajectory_recorder/save std_srvs/srv/Trigger "{}"
# 回放
ros2 service call /joint_trajectory_recorder/play std_srvs/srv/Trigger "{}"
```

View File

@ -0,0 +1,20 @@
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
Node(
package='openarm_joint_recorder',
executable='joint_trajectory_recorder',
name='joint_trajectory_recorder',
output='screen',
parameters=[{
'input_topic': '/left_joint_trajectory_controller/controller_state',
'output_topic': '/left_joint_trajectory_controller/joint_trajectory',
'play_speed': 1.0,
'loop_playback': False,
'storage_path': '/home/shen/openarm_test/openarm_joint_record.yaml',
}],
),
])

View File

@ -0,0 +1,282 @@
import threading
import time
from typing import Dict, List
import rclpy
from rclpy.node import Node
from control_msgs.msg import JointTrajectoryControllerState
from std_srvs.srv import SetBool, Trigger
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
import yaml
class JointTrajectoryRecorder(Node):
def __init__(self):
super().__init__('joint_trajectory_recorder')
self.declare_parameter('input_topic', '/left_joint_trajectory_controller/controller_state')
self.declare_parameter('output_topic', '/left_joint_trajectory_controller/joint_trajectory')
self.declare_parameter('play_speed', 1.0)
self.declare_parameter('loop_playback', False)
self.declare_parameter('storage_path', '/tmp/openarm_joint_record.yaml')
self.input_topic = self.get_parameter('input_topic').value
self.output_topic = self.get_parameter('output_topic').value
self.record_lock = threading.Lock()
self.play_lock = threading.Lock()
self.is_recording = False
self.is_playing = False
self.stop_playback_event = threading.Event()
self._record_start_time = None
self.recorded_points: List[Dict] = []
self.get_logger().info(f"input_topic: {self.input_topic}, output_topic: {self.output_topic}")
self.sub = self.create_subscription(
JointTrajectoryControllerState,
self.input_topic,
self.controller_state_callback,
50,
)
self.pub = self.create_publisher(JointTrajectory, self.output_topic, 50)
self.set_recording_srv = self.create_service(
SetBool,
'~/set_recording',
self.handle_set_recording,
)
self.play_srv = self.create_service(
Trigger,
'~/play',
self.handle_play,
)
self.stop_play_srv = self.create_service(
Trigger,
'~/stop_playback',
self.handle_stop_playback,
)
self.save_srv = self.create_service(
Trigger,
'~/save',
self.handle_save,
)
self.load_srv = self.create_service(
Trigger,
'~/load',
self.handle_load,
)
self.clear_srv = self.create_service(
Trigger,
'~/clear',
self.handle_clear,
)
# self.get_logger().info(
# 'joint_trajectory_recorder started. Use services: '
# '~/set_recording, ~/play, ~/stop_playback, ~/save, ~/load, ~/clear'
# )
def controller_state_callback(self, msg: JointTrajectoryControllerState):
if not self.is_recording:
return
names = list(msg.joint_names)
positions = [float(v) for v in msg.actual.positions]
if len(names) == 0 or len(positions) == 0:
return
now_s = time.time()
with self.record_lock:
if self._record_start_time is None:
self._record_start_time = now_s
rel_time = now_s - self._record_start_time
self.recorded_points.append({
't': rel_time,
'name': names,
'position': positions,
})
def handle_set_recording(self, request, response):
if request.data:
with self.record_lock:
self.recorded_points = []
self._record_start_time = None
self.is_recording = True
response.success = True
response.message = 'Recording started. Existing trajectory cleared.'
self.get_logger().info(response.message)
return response
self.is_recording = False
with self.record_lock:
count = len(self.recorded_points)
response.success = True
response.message = f'Recording stopped. Collected {count} points.'
self.get_logger().info(response.message)
return response
def handle_play(self, _request, response):
with self.play_lock:
if self.is_playing:
response.success = False
response.message = 'Playback is already running.'
return response
with self.record_lock:
if len(self.recorded_points) < 1:
response.success = False
response.message = 'No recorded points. Record or load trajectory first.'
return response
self.stop_playback_event.clear()
self.is_playing = True
thread = threading.Thread(target=self._playback_worker, daemon=True)
thread.start()
response.success = True
response.message = 'Playback started.'
self.get_logger().info(response.message)
return response
def handle_stop_playback(self, _request, response):
self.stop_playback_event.set()
response.success = True
response.message = 'Stop playback signal sent.'
self.get_logger().info(response.message)
return response
def handle_clear(self, _request, response):
self.is_recording = False
self.stop_playback_event.set()
with self.record_lock:
self.recorded_points = []
self._record_start_time = None
response.success = True
response.message = 'Recorded trajectory cleared.'
self.get_logger().info(response.message)
return response
def handle_save(self, _request, response):
storage_path = self.get_parameter('storage_path').value
with self.record_lock:
data = {
'version': 1,
'points': self.recorded_points,
}
try:
with open(storage_path, 'w', encoding='utf-8') as f:
yaml.safe_dump(data, f, sort_keys=False)
except Exception as exc:
response.success = False
response.message = f'Save failed: {exc}'
self.get_logger().error(response.message)
return response
response.success = True
response.message = f'Trajectory saved to {storage_path}'
self.get_logger().info(response.message)
return response
def handle_load(self, _request, response):
storage_path = self.get_parameter('storage_path').value
try:
with open(storage_path, 'r', encoding='utf-8') as f:
data = yaml.safe_load(f)
except Exception as exc:
response.success = False
response.message = f'Load failed: {exc}'
self.get_logger().error(response.message)
return response
points = data.get('points', []) if isinstance(data, dict) else []
if not isinstance(points, list):
response.success = False
response.message = 'Load failed: invalid file format.'
self.get_logger().error(response.message)
return response
normalized: List[Dict] = []
for pt in points:
try:
normalized.append({
't': float(pt['t']),
'name': list(pt['name']),
'position': [float(v) for v in pt['position']],
})
except Exception:
continue
with self.record_lock:
self.recorded_points = normalized
self._record_start_time = None
response.success = True
response.message = f'Loaded {len(normalized)} points from {storage_path}'
self.get_logger().info(response.message)
return response
def _playback_worker(self):
play_speed = float(self.get_parameter('play_speed').value)
loop_playback = bool(self.get_parameter('loop_playback').value)
if play_speed <= 0.0:
self.get_logger().warning('play_speed <= 0.0, force to 1.0')
play_speed = 1.0
try:
while rclpy.ok():
with self.record_lock:
points = list(self.recorded_points)
if not points:
break
prev_t = points[0]['t']
for idx, pt in enumerate(points):
if self.stop_playback_event.is_set() or not rclpy.ok():
return
if idx > 0:
dt = max(0.0, (pt['t'] - prev_t) / play_speed)
if dt > 0.0:
time.sleep(dt)
prev_t = pt['t']
traj = JointTrajectory()
# traj.header.stamp = self.get_clock().now().to_msg()
traj.joint_names = pt['name']
point = JointTrajectoryPoint()
point.positions = pt['position']
# point.time_from_start.sec = 0
# point.time_from_start.nanosec = 100000000
traj.points.append(point)
self.pub.publish(traj)
# time.sleep(0.02)
if not loop_playback:
break
finally:
with self.play_lock:
self.is_playing = False
self.stop_playback_event.clear()
self.get_logger().info('Playback finished.')
def main(args=None):
rclpy.init(args=args)
node = JointTrajectoryRecorder()
try:
rclpy.spin(node)
finally:
node.stop_playback_event.set()
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>openarm_joint_recorder</name>
<version>0.0.0</version>
<description>Record and playback joint positions for OpenArm.</description>
<maintainer email="664376944@qq.com">shen</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<depend>control_msgs</depend>
<depend>trajectory_msgs</depend>
<depend>std_srvs</depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1 @@
openarm_joint_recorder

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/openarm_joint_recorder
[install]
install_scripts=$base/lib/openarm_joint_recorder

View File

@ -0,0 +1,30 @@
from setuptools import find_packages, setup
import os
from glob import glob
package_name = 'openarm_joint_recorder'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages', ['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'), glob('launch/*.launch.py')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='shen',
maintainer_email='664376944@qq.com',
description='Record and playback joint positions for OpenArm.',
license='TODO: License declaration',
extras_require={
'test': ['pytest'],
},
entry_points={
'console_scripts': [
'joint_trajectory_recorder = openarm_joint_recorder.joint_trajectory_recorder:main',
],
},
)