初始提交

This commit is contained in:
shen 2026-01-26 01:06:54 +08:00
commit f6391b9b63
83 changed files with 2786 additions and 0 deletions

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
.vscode/
build/
install/
log/

BIN
calibrationdata.tar.gz Normal file

Binary file not shown.

View File

@ -0,0 +1,31 @@
// ROS 2 tf2 `view_frames` dump: each edge is parent_frame -> child_frame,
// labelled with the broadcasting authority and transform-buffer statistics.
// Edges reporting "Average rate: 10000.0" with empty buffers are static
// transforms; the others are dynamic joint-state transforms.
digraph G {
"openarm_left_hand" -> "openarm_left_right_finger"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_left_link7" -> "openarm_left_hand"[label=" Broadcaster: default_authority\nAverage rate: 10000.0\nBuffer length: 0.0\nMost recent transform: 0.0\nOldest transform: 0.0\n"];
"openarm_left_hand" -> "openarm_left_left_finger"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_left_link0" -> "openarm_left_link1"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_body_link0" -> "openarm_left_link0"[label=" Broadcaster: default_authority\nAverage rate: 10000.0\nBuffer length: 0.0\nMost recent transform: 0.0\nOldest transform: 0.0\n"];
"openarm_left_link1" -> "openarm_left_link2"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_left_link2" -> "openarm_left_link3"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_left_link3" -> "openarm_left_link4"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_left_link4" -> "openarm_left_link5"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_left_link5" -> "openarm_left_link6"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_left_link6" -> "openarm_left_link7"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_right_hand" -> "openarm_right_right_finger"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_right_link7" -> "openarm_right_hand"[label=" Broadcaster: default_authority\nAverage rate: 10000.0\nBuffer length: 0.0\nMost recent transform: 0.0\nOldest transform: 0.0\n"];
"openarm_right_hand" -> "openarm_right_left_finger"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_right_link0" -> "openarm_right_link1"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_body_link0" -> "openarm_right_link0"[label=" Broadcaster: default_authority\nAverage rate: 10000.0\nBuffer length: 0.0\nMost recent transform: 0.0\nOldest transform: 0.0\n"];
"openarm_right_link1" -> "openarm_right_link2"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_right_link2" -> "openarm_right_link3"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_right_link3" -> "openarm_right_link4"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_right_link4" -> "openarm_right_link5"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_right_link5" -> "openarm_right_link6"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"openarm_right_link6" -> "openarm_right_link7"[label=" Broadcaster: default_authority\nAverage rate: 18.541\nBuffer length: 3.29\nMost recent transform: 1769151143.718129\nOldest transform: 1769151140.428147\n"];
"world" -> "openarm_body_link0"[label=" Broadcaster: default_authority\nAverage rate: 10000.0\nBuffer length: 0.0\nMost recent transform: 0.0\nOldest transform: 0.0\n"];
"openarm_left_hand" -> "openarm_left_hand_tcp"[label=" Broadcaster: default_authority\nAverage rate: 10000.0\nBuffer length: 0.0\nMost recent transform: 0.0\nOldest transform: 0.0\n"];
"openarm_right_hand" -> "openarm_right_hand_tcp"[label=" Broadcaster: default_authority\nAverage rate: 10000.0\nBuffer length: 0.0\nMost recent transform: 0.0\nOldest transform: 0.0\n"];
edge [style=invis];
subgraph cluster_legend { style=bold; color=black; label ="view_frames Result";
"Recorded at time: 1769151143.7895906"[ shape=plaintext ] ;
}->"world";
}

Binary file not shown.

BIN
handeye_data.npz Normal file

Binary file not shown.

View File

@ -0,0 +1,113 @@
import rclpy
from rclpy.node import Node
# from control_msgs.msg import JointTrajectoryControllerState
from control_msgs.action import FollowJointTrajectory
from rclpy.action import ActionClient
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint
from geometry_msgs.msg import Twist
# class ControlPub(Node):
# def __init__(self, node_name):
# super().__init__(node_name)
# self.status_publisher_ = self.create_publisher(
# JointTrajectoryControllerState, '/right_joint_trajectory_controller/controller_state', 100)
# self.timer = self.create_timer(0.02, self.timer_callback)
# def timer_callback(self):
# arm_pos = [0.15, 0.15, 0.15, 0.15, 0.15, 0.15, 0.15]
# arm_zero = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
# msg = JointTrajectoryControllerState()
# msg.reference.positions = arm_zero
# self.get_logger().info(f'发布:{msg}')
# self.status_publisher_.publish(msg)
# def main():
# rclpy.init()
# node = ControlPub('control_pub')
# rclpy.spin(node)
# rclpy.shutdown()
# Accumulated joint commands, fed by /cmd_vel twists.
# NOTE(review): module-level globals; only this node reads/writes them, so
# they could become instance attributes — kept global to preserve the
# module's interface.
count = 0
count1 = 0


class ProgressActionClient(Node):
    """Map /cmd_vel twists onto FollowJointTrajectory goals for the right arm.

    Each incoming Twist accumulates ``linear.x`` into joint 1 and
    ``linear.z`` into joint 2; the accumulated positions are sent as a
    single-point trajectory goal.
    """

    def __init__(self):
        super().__init__('progress_action_client')
        # Each incoming Twist triggers a new goal (queue depth 100).
        self.subscriber_ = self.create_subscription(
            Twist,
            "/cmd_vel",
            self.listener_callback,
            100
        )
        # Action client toward the right arm's trajectory controller.
        self._action_client = ActionClient(
            self, FollowJointTrajectory,
            '/right_joint_trajectory_controller/follow_joint_trajectory')

    def listener_callback(self, msg):
        """Accumulate the twist components and send the resulting goal."""
        global count
        global count1
        count += msg.linear.x
        count1 += msg.linear.z
        self.get_logger().info(f"接收到消息: {msg.linear.x} {msg.linear.z} angle:{count}")
        self.send_goal([count, count1, 0.0, 0.0, 0.0, 0.0, 0.0])

    def send_goal(self, arm_pos, seconds_from_start=1.0):
        """Send a one-point FollowJointTrajectory goal for the 7 right-arm joints.

        :param arm_pos: target positions for joints 1..7 (radians)
        :param seconds_from_start: time the controller gets to reach the point.
            BUG FIX: the original point carried no ``time_from_start``
            (defaulting to 0), which joint_trajectory_controller rejects.
        """
        arm_joint = ['openarm_right_joint1', 'openarm_right_joint2',
                     'openarm_right_joint3', 'openarm_right_joint4',
                     'openarm_right_joint5', 'openarm_right_joint6',
                     'openarm_right_joint7']
        goal_msg = FollowJointTrajectory.Goal()
        goal_msg.trajectory.joint_names = arm_joint
        point = JointTrajectoryPoint()
        point.positions = list(arm_pos)
        point.time_from_start.sec = int(seconds_from_start)
        point.time_from_start.nanosec = int((seconds_from_start % 1.0) * 1e9)
        goal_msg.trajectory.points.append(point)
        # NOTE(review): wait_for_server() blocks inside a subscription
        # callback; acceptable for a demo, but consider waiting once at
        # startup instead.
        self._action_client.wait_for_server()
        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg, feedback_callback=self.feedback_callback)
        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        """Handle acceptance/rejection of the submitted goal."""
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('请求被拒绝')
            return
        self.get_logger().info('请求被接收,开始执行任务!')
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        """Final-result callback; this demo ignores the result."""
        return

    def feedback_callback(self, feedback_msg):
        """Log controller feedback.

        BUG FIX: FollowJointTrajectory feedback has no ``progress`` field
        (that belongs to tutorial actions); reading it raised
        AttributeError.  Log the measured joint positions instead.
        """
        actual = feedback_msg.feedback.actual
        self.get_logger().info(f'actual positions: {list(actual.positions)}')


def main():
    rclpy.init()
    node = ProgressActionClient()
    rclpy.spin(node)
    rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,20 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>demo_openarm_publish</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="shen@todo.todo">shen</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/demo_openarm_publish
[install]
install_scripts=$base/lib/demo_openarm_publish

View File

@ -0,0 +1,26 @@
from setuptools import find_packages, setup

# Ament Python package manifest for the /cmd_vel -> trajectory publisher demo.
package_name = 'demo_openarm_publish'

setup(
name=package_name,
version='0.0.0',
# Ship every package in the source tree except the test tree.
packages=find_packages(exclude=['test']),
data_files=[
# Register with the ament resource index so ROS 2 tooling can find the package.
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
# Install the package manifest into the share directory.
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='shen',
maintainer_email='shen@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
# `ros2 run demo_openarm_publish pub_node` -> demo_openarm_publish.publish:main
'console_scripts': [
'pub_node = demo_openarm_publish.publish:main'
],
},
)

View File

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
    """Run the ament_copyright linter over the package sources."""
    return_code = main(argv=['.', 'test'])
    assert return_code == 0, 'Found errors'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
    """Run the ament_flake8 style checker and fail on any reported issue."""
    return_code, errors = main_with_errors(argv=[])
    assert return_code == 0, \
        'Found %d code style errors / warnings:\n' % len(errors) + \
        '\n'.join(errors)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
    """Run the ament_pep257 docstring linter over the package."""
    return_code = main(argv=['.', 'test'])
    assert return_code == 0, 'Found code style errors / warnings'

View File

@ -0,0 +1,39 @@
import rclpy
from rclpy.node import Node
from control_msgs.msg import JointTrajectoryControllerState
# 2. 定义节点类继承Node
class MySubscriber(Node):
def __init__(self):
# 调用父类构造函数,初始化节点名称(如"py_subscriber_node"
super().__init__("py_subscriber_node")
# 3. 创建订阅者
# 参数1消息类型如String与发布者一致
# 参数2话题名称如"chatter",与发布者一致)
# 参数3回调函数消息到达时触发用于处理消息
# 参数4队列长度10缓存消息的最大数量
self.subscriber_ = self.create_subscription(
JointTrajectoryControllerState,
"/right_joint_trajectory_controller/controller_state",
self.listener_callback,
100
)
self.subscriber_ # 防止未使用变量警告
# 4. 消息处理回调函数(核心)
# 当订阅到消息时,自动调用此函数处理消息
def listener_callback(self, msg):
# 打印接收的消息(可根据需求修改处理逻辑,如存储、解析、控制硬件等)
self.get_logger().info(f"接收到消息: {msg}")
def main():
rclpy.init()
node = MySubscriber()
node.get_logger().info('111111111111')
node.get_logger().warn('222222222222222')
rclpy.spin(node) # 启动事件循环(持续监听话题,等待消息)
node.destroy_node() # 退出时销毁节点
rclpy.shutdown() # 关闭ROS 2运行时

View File

@ -0,0 +1,20 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>demo_openarm_subscriber</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="shen@todo.todo">shen</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/demo_openarm_subscriber
[install]
install_scripts=$base/lib/demo_openarm_subscriber

View File

@ -0,0 +1,26 @@
from setuptools import find_packages, setup

# Ament Python package manifest for the controller-state subscriber demo.
package_name = 'demo_openarm_subscriber'

setup(
name=package_name,
version='0.0.0',
# Ship every package in the source tree except the test tree.
packages=find_packages(exclude=['test']),
data_files=[
# Register with the ament resource index so ROS 2 tooling can find the package.
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
# Install the package manifest into the share directory.
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='shen',
maintainer_email='shen@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
# `ros2 run demo_openarm_subscriber sub_node` -> demo_openarm_subscriber.subscriber:main
'console_scripts': [
'sub_node = demo_openarm_subscriber.subscriber:main'
],
},
)

View File

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
    """Run the ament_copyright linter over the package sources."""
    return_code = main(argv=['.', 'test'])
    assert return_code == 0, 'Found errors'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
    """Run the ament_flake8 style checker and fail on any reported issue."""
    return_code, errors = main_with_errors(argv=[])
    assert return_code == 0, \
        'Found %d code style errors / warnings:\n' % len(errors) + \
        '\n'.join(errors)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
    """Run the ament_pep257 docstring linter over the package."""
    return_code = main(argv=['.', 'test'])
    assert return_code == 0, 'Found code style errors / warnings'

View File

@ -0,0 +1,232 @@
import cv2
from mediapipe.framework.formats import landmark_pb2
import mediapipe as mp
mp_drawing = mp.solutions.drawing_utils
mp_face_mesh = mp.solutions.face_mesh
def _chain_edges(*chains):
    """Return a frozenset of (a, b) edges linking consecutive landmark ids in each chain."""
    edges = []
    for chain in chains:
        edges.extend(zip(chain, chain[1:]))
    return frozenset(edges)


# Lips: outer and inner contours, upper and lower halves.
FACE_CONNECTIONS_LIPS = _chain_edges(
    (61, 146, 91, 181, 84, 17, 314, 405, 321, 375, 291),
    (61, 185, 40, 39, 37, 0, 267, 269, 270, 409, 291),
    (78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308),
    (78, 191, 80, 81, 82, 13, 312, 311, 310, 415, 308),
)

# Face oval: one closed loop around the face silhouette.
FACE_CONNECTIONS_OVAL = _chain_edges(
    (10, 338, 297, 332, 284, 251, 389, 356, 454, 323, 361, 288, 397,
     365, 379, 378, 400, 377, 152, 148, 176, 149, 150, 136, 172, 58,
     132, 93, 234, 127, 162, 21, 54, 103, 67, 109, 10),
)

# Eyes: upper and lower lash lines for each eye.
FACE_CONNECTIONS_EYES = _chain_edges(
    # Left eye.
    (263, 249, 390, 373, 374, 380, 381, 382, 362),
    (263, 466, 388, 387, 386, 385, 384, 398, 362),
    # Right eye.
    (33, 7, 163, 144, 145, 153, 154, 155, 133),
    (33, 246, 161, 160, 159, 158, 157, 173, 133),
)

# Eyebrows: lower and upper lines for each brow.
FACE_CONNECTIONS_EYEBROWS = _chain_edges(
    # Left eyebrow.
    (276, 283, 282, 295, 285),
    (300, 293, 334, 296, 336),
    # Right eyebrow.
    (46, 53, 52, 65, 55),
    (70, 63, 105, 66, 107),
)
# For static images:
IMAGE_FILES = []
drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)
with mp_face_mesh.FaceMesh(
        static_image_mode=True,
        max_num_faces=1,
        min_detection_confidence=0.5) as face_mesh:
    for idx, file in enumerate(IMAGE_FILES):
        image = cv2.imread(file)
        # Convert the BGR image to RGB before processing.
        results = face_mesh.process(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
        # Print and draw face mesh landmarks on the image.
        if not results.multi_face_landmarks:
            continue
        annotated_image = image.copy()
        for face_landmarks in results.multi_face_landmarks:
            print('face_landmarks:', face_landmarks)
            # NOTE(review): mp_face_mesh.FACE_CONNECTIONS was removed in
            # mediapipe >= 0.8.6 (replaced by FACEMESH_TESSELATION /
            # FACEMESH_CONTOURS) — confirm against the installed version.
            mp_drawing.draw_landmarks(
                image=annotated_image,
                landmark_list=face_landmarks,
                connections=mp_face_mesh.FACE_CONNECTIONS,
                landmark_drawing_spec=drawing_spec,
                connection_drawing_spec=drawing_spec)
        cv2.imwrite('/tmp/annotated_image' + str(idx) + '.png', annotated_image)

# For webcam input:
drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)
cap = cv2.VideoCapture(0)
with mp_face_mesh.FaceMesh(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5) as face_mesh:
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            print("Ignoring empty camera frame.")
            # If loading a video, use 'break' instead of 'continue'.
            continue
        # Flip horizontally for a selfie view and convert BGR -> RGB.
        image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
        # Mark the frame read-only so MediaPipe can process it by reference.
        image.flags.writeable = False
        results = face_mesh.process(image)
        # Draw the face mesh annotations on the image.
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        if results.multi_face_landmarks:
            # Collect all 468 face-mesh landmarks of the first detected face.
            face = results.multi_face_landmarks[0]
            subset_points = [face.landmark[i] for i in range(468)]
            landmark_subset = landmark_pb2.NormalizedLandmarkList(
                landmark=subset_points
            )
            print(landmark_subset)
            mp_drawing.draw_landmarks(
                image=image,
                landmark_list=landmark_subset,
                connections=FACE_CONNECTIONS_EYEBROWS,
                landmark_drawing_spec=drawing_spec,
                connection_drawing_spec=drawing_spec)
        cv2.imshow('MediaPipe FaceMesh', image)
        if cv2.waitKey(5) & 0xFF == 27:
            break
cap.release()

# upper mouth
# teste.append(results.multi_face_landmarks[0].landmark[0])
# teste.append(results.multi_face_landmarks[0].landmark[12])
# # teste.append(results.multi_face_landmarks[0].landmark[13])
# # teste.append(results.multi_face_landmarks[0].landmark[14])
# lower mouth
# # teste.append(results.multi_face_landmarks[0].landmark[15])

View File

@ -0,0 +1,117 @@
import rclpy
import cv2
import mediapipe as mp
from mediapipe.framework.formats import landmark_pb2
from rclpy.node import Node
from media_pipe_ros2_msg.msg import MediaPipeHumanFaceMeshList
from mediapipe.python.solutions.pose import PoseLandmark
mp_drawing = mp.solutions.drawing_utils
mp_face_mesh = mp.solutions.face_mesh
# Ordered MediaPipe pose landmark ids, index-aligned with the 33 pose
# keypoints.  NOTE(review): not referenced anywhere in this file — appears
# copied from the pose publisher; confirm before removing.
NAME_POSE = [
(PoseLandmark.NOSE), (PoseLandmark.LEFT_EYE_INNER),
(PoseLandmark.LEFT_EYE), (PoseLandmark.LEFT_EYE_OUTER),
(PoseLandmark.RIGHT_EYE_INNER), ( PoseLandmark.RIGHT_EYE),
(PoseLandmark.RIGHT_EYE_OUTER), ( PoseLandmark.LEFT_EAR),
(PoseLandmark.RIGHT_EAR), ( PoseLandmark.MOUTH_LEFT),
(PoseLandmark.MOUTH_RIGHT), ( PoseLandmark.LEFT_SHOULDER),
(PoseLandmark.RIGHT_SHOULDER), ( PoseLandmark.LEFT_ELBOW),
(PoseLandmark.RIGHT_ELBOW), ( PoseLandmark.LEFT_WRIST),
(PoseLandmark.RIGHT_WRIST), ( PoseLandmark.LEFT_PINKY),
(PoseLandmark.RIGHT_PINKY), ( PoseLandmark.LEFT_INDEX),
(PoseLandmark.RIGHT_INDEX), ( PoseLandmark.LEFT_THUMB),
(PoseLandmark.RIGHT_THUMB), ( PoseLandmark.LEFT_HIP),
(PoseLandmark.RIGHT_HIP), ( PoseLandmark.LEFT_KNEE),
(PoseLandmark.RIGHT_KNEE), ( PoseLandmark.LEFT_ANKLE),
(PoseLandmark.RIGHT_ANKLE), ( PoseLandmark.LEFT_HEEL),
(PoseLandmark.RIGHT_HEEL), ( PoseLandmark.LEFT_FOOT_INDEX),
(PoseLandmark.RIGHT_FOOT_INDEX)
]
# Module-level webcam handle (device 0), shared by the node below.
cap = cv2.VideoCapture(0)
class FaceMeshPublisher(Node):
    """Run MediaPipe FaceMesh on webcam frames and publish the 468 landmarks."""

    def __init__(self):
        super().__init__('mediapipe_face_mesh_publisher')
        self.publisher_ = self.create_publisher(
            MediaPipeHumanFaceMeshList, '/mediapipe/human_face_mesh_list', 10)

    def getimage_callback(self):
        """Blocking capture/publish loop; exits on ESC or when the camera closes."""
        mediapipehumanfacemeshlist = MediaPipeHumanFaceMeshList()
        drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)
        with mp_face_mesh.FaceMesh(
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5) as face_mesh:
            while cap.isOpened():
                success, image = cap.read()
                if not success:
                    # BUG FIX: previously fell through and processed an
                    # invalid frame (crash in cvtColor); skip it instead.
                    print("Sem camera.")
                    continue
                # Mirror for a selfie view and convert BGR -> RGB for MediaPipe.
                image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
                image.flags.writeable = False
                results = face_mesh.process(image)
                image.flags.writeable = True
                # BUG FIX: the frame was converted RGB -> BGR twice, which
                # corrupted the displayed colors; convert exactly once.
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                if results.multi_face_landmarks:
                    # Copy all 468 landmarks of the first face into the message.
                    # NOTE(review): assumes human_face_mesh_list is a fixed-size
                    # 468-element array in the msg definition — confirm.
                    face = results.multi_face_landmarks[0]
                    teste = []
                    for i in range(468):
                        lm = face.landmark[i]
                        teste.append(lm)
                        mediapipehumanfacemeshlist.human_face_mesh_list[i].x = lm.x
                        mediapipehumanfacemeshlist.human_face_mesh_list[i].y = lm.y
                        mediapipehumanfacemeshlist.human_face_mesh_list[i].z = lm.z
                    landmark_subset = landmark_pb2.NormalizedLandmarkList(landmark=teste)
                    # NOTE(review): FACE_CONNECTIONS was removed in
                    # mediapipe >= 0.8.6 — confirm the installed version.
                    mp_drawing.draw_landmarks(
                        image=image,
                        landmark_list=landmark_subset,
                        connections=mp_face_mesh.FACE_CONNECTIONS,
                        landmark_drawing_spec=drawing_spec,
                        connection_drawing_spec=drawing_spec)
                    mediapipehumanfacemeshlist.num_humans = 1
                    self.publisher_.publish(mediapipehumanfacemeshlist)
                else:
                    # No face on screen: publish zeroed landmarks so
                    # subscribers see an explicit reset.
                    for i in range(468):
                        mediapipehumanfacemeshlist.human_face_mesh_list[i].x = 0.0
                        mediapipehumanfacemeshlist.human_face_mesh_list[i].y = 0.0
                        mediapipehumanfacemeshlist.human_face_mesh_list[i].z = 0.0
                    mediapipehumanfacemeshlist.num_humans = 1
                    self.publisher_.publish(mediapipehumanfacemeshlist)
                cv2.imshow('MediaPipe Face Mesh', image)
                if cv2.waitKey(5) & 0xFF == 27:
                    break
def main(args=None):
    """Entry point: create the node and run the blocking capture loop."""
    rclpy.init(args=args)
    node = FaceMeshPublisher()
    node.getimage_callback()  # blocks until ESC or the camera closes
    cap.release()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,131 @@
import rclpy
import cv2
import mediapipe as mp
from rclpy.node import Node
from media_pipe_ros2_msg.msg import HandPoint,MediaPipeHumanHand,MediaPipeHumanHandList
mp_drawing = mp.solutions.drawing_utils
mp_hands = mp.solutions.hands
cap = cv2.VideoCapture(0)
class HandsPublisher(Node):
    """Run MediaPipe Hands on webcam frames and publish both hands' keypoints."""

    def __init__(self):
        super().__init__('mediapipe_publisher')
        self.publisher_ = self.create_publisher(
            MediaPipeHumanHandList, '/mediapipe/human_hand_list', 10)

    def _write_points(self, target, hand_landmarks):
        """Copy the 21 detected landmarks into `target` (a hand keypoint array)."""
        for index_point, point in enumerate(mp_hands.HandLandmark):
            landmark = hand_landmarks.landmark[point]
            target[index_point].name = str(point)
            target[index_point].x = landmark.x
            target[index_point].y = landmark.y
            target[index_point].z = landmark.z

    def _zero_points(self, target):
        """Fill `target` keypoint array with zeros (hand not on screen)."""
        for index_point, point in enumerate(mp_hands.HandLandmark):
            target[index_point].name = str(point)
            target[index_point].x = 0.0
            target[index_point].y = 0.0
            target[index_point].z = 0.0

    def getimage_callback(self):
        """Blocking capture/publish loop; exits on ESC or when the camera closes."""
        mediapipehumanlist = MediaPipeHumanHandList()
        mediapipehuman = MediaPipeHumanHand()
        with mp_hands.Hands(
                static_image_mode=False,
                min_detection_confidence=0.7,
                min_tracking_confidence=0.7,
                max_num_hands=2) as hands:
            while cap.isOpened():
                success, image = cap.read()
                if not success:
                    # BUG FIX: previously fell through and processed an
                    # invalid frame (crash in cvtColor); skip it instead.
                    print("Sem camera.")
                    continue
                # Mirror for a selfie view and convert BGR -> RGB for MediaPipe.
                image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
                image.flags.writeable = False
                results = hands.process(image)
                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                if results.multi_hand_landmarks is not None:
                    # Count hands processed this frame; while still 0 the
                    # opposite hand's slots are zeroed in case only one hand
                    # is visible.
                    hands_on_screen = 0
                    for hand_landmarks, handedness in zip(
                            results.multi_hand_landmarks, results.multi_handedness):
                        label = handedness.classification[0].label
                        if label == "Right":
                            mp_drawing.draw_landmarks(
                                image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
                            self._write_points(
                                mediapipehuman.right_hand_key_points, hand_landmarks)
                            if hands_on_screen == 0:
                                self._zero_points(mediapipehuman.left_hand_key_points)
                            hands_on_screen += 1
                        elif label == "Left":
                            mp_drawing.draw_landmarks(
                                image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
                            self._write_points(
                                mediapipehuman.left_hand_key_points, hand_landmarks)
                            if hands_on_screen == 0:
                                self._zero_points(mediapipehuman.right_hand_key_points)
                            hands_on_screen += 1
                    mediapipehumanlist.human_hand_list.right_hand_key_points = \
                        mediapipehuman.right_hand_key_points
                    mediapipehumanlist.human_hand_list.left_hand_key_points = \
                        mediapipehuman.left_hand_key_points
                    mediapipehumanlist.num_humans = 1
                    self.publisher_.publish(mediapipehumanlist)
                else:
                    # No hands detected: publish zeroed keypoints for both hands.
                    self._zero_points(mediapipehuman.right_hand_key_points)
                    self._zero_points(mediapipehuman.left_hand_key_points)
                    mediapipehumanlist.human_hand_list.right_hand_key_points = \
                        mediapipehuman.right_hand_key_points
                    mediapipehumanlist.human_hand_list.left_hand_key_points = \
                        mediapipehuman.left_hand_key_points
                    mediapipehumanlist.num_humans = 1
                    self.publisher_.publish(mediapipehumanlist)
                cv2.imshow('MediaPipe Hands', image)
                if cv2.waitKey(5) & 0xFF == 27:
                    break
def main(args=None):
    """Entry point: create the node and run the blocking capture loop."""
    rclpy.init(args=args)
    node = HandsPublisher()
    node.getimage_callback()  # blocks until ESC or the camera closes
    cap.release()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,221 @@
import rclpy
import cv2
import mediapipe as mp
from rclpy.node import Node
# BUG FIX: 'HandPoint' was imported twice in the original import list.
from media_pipe_ros2_msg.msg import HandPoint, MediaPipeHumanHand, MediaPipeHumanHolisticList
from mediapipe.python.solutions.pose import PoseLandmark
from mediapipe.framework.formats import landmark_pb2

# MediaPipe solution shortcuts.
mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic
mp_hands = mp.solutions.hands
mp_pose = mp.solutions.pose
mp_face_mesh = mp.solutions.face_mesh

# Default capture device (index 0 = first camera).
cap = cv2.VideoCapture(0)

# All 33 pose landmarks in landmark-id order. Enum declaration order matches
# the MediaPipe landmark indices, so this is equivalent to the hand-written
# NOSE..RIGHT_FOOT_INDEX list it replaces.
NAME_POSE = list(PoseLandmark)
class HolisticPublisher(Node):
    """ROS 2 node that runs MediaPipe FaceMesh, Pose and Hands on webcam frames
    and publishes the combined result on /mediapipe/human_holistic_list."""

    def __init__(self):
        super().__init__('mediapipe_publisher_holistic')
        # Single publisher carrying face-mesh, pose and hand keypoints together.
        self.publisher_ = self.create_publisher(MediaPipeHumanHolisticList, '/mediapipe/human_holistic_list', 10)

    def getimage_callback(self):
        """Blocking capture loop: read frames from the global `cap`, run the three
        MediaPipe solutions, fill the holistic message and publish it.
        Exits when ESC is pressed in the preview window or the camera closes."""
        mediapipehumanholisticlist = MediaPipeHumanHolisticList()
        mediapipehuman = MediaPipeHumanHand()
        points = HandPoint()  # NOTE(review): written once below but never read
        drawing_spec = mp_drawing.DrawingSpec(thickness=1, circle_radius=1)
        with mp_face_mesh.FaceMesh(
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5) as face_mesh, mp_hands.Hands(
                static_image_mode=False,
                min_detection_confidence=0.7,
                min_tracking_confidence=0.7,
                max_num_hands=2) as hands, mp_pose.Pose(
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5) as pose:
            while cap.isOpened():
                success, image = cap.read()
                if not success:
                    # NOTE(review): no `continue` here — after a failed read
                    # `image` is invalid and the next line will raise.
                    print("Sem camera.")
                # Mirror the frame and convert BGR -> RGB for MediaPipe.
                image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
                image.flags.writeable = False  # lets MediaPipe avoid a copy
                results_face_mesh = face_mesh.process(image)
                results_pose = pose.process(image)
                results_hands = hands.process(image)
                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                imageHeight, imageWidth, _ = image.shape
                landmark_temp = []
                # ---- Face mesh ----
                if results_face_mesh.multi_face_landmarks:
                    index_face_mesh = 0
                    # Copy all 468 face-mesh landmarks of the first detected face.
                    while index_face_mesh <= 467:
                        landmark_temp.append(results_face_mesh.multi_face_landmarks[0].landmark[index_face_mesh])
                        mediapipehumanholisticlist.human_face_mesh_list[index_face_mesh].x = results_face_mesh.multi_face_landmarks[0].landmark[index_face_mesh].x
                        mediapipehumanholisticlist.human_face_mesh_list[index_face_mesh].y = results_face_mesh.multi_face_landmarks[0].landmark[index_face_mesh].y
                        mediapipehumanholisticlist.human_face_mesh_list[index_face_mesh].z = results_face_mesh.multi_face_landmarks[0].landmark[index_face_mesh].z
                        index_face_mesh = index_face_mesh +1
                    landmark_subset = landmark_pb2.NormalizedLandmarkList(landmark = landmark_temp)
                    # NOTE(review): FACE_CONNECTIONS was renamed in newer
                    # mediapipe releases (FACEMESH_CONTOURS) — confirm version.
                    mp_drawing.draw_landmarks(
                        image=image,
                        landmark_list=landmark_subset,
                        connections=mp_face_mesh.FACE_CONNECTIONS,
                        landmark_drawing_spec=drawing_spec,
                        connection_drawing_spec=drawing_spec)
                else:
                    # Publishes zeros when no face is on screen.
                    # NOTE(review): this branch zeroes the *pose* list, not the
                    # face-mesh list — looks like a copy/paste slip; verify intent.
                    index_pose = 0
                    for point in mp_pose.PoseLandmark:
                        mediapipehumanholisticlist.human_pose_list[index_pose].name = str(NAME_POSE[index_pose])
                        mediapipehumanholisticlist.human_pose_list[index_pose].x = 0.0
                        mediapipehumanholisticlist.human_pose_list[index_pose].y = 0.0
                        mediapipehumanholisticlist.human_pose_list[index_pose].visibility = 0.0
                        index_pose = index_pose +1
                # ---- Pose ----
                # Draw the pose annotation on the image.
                image.flags.writeable = True
                # NOTE(review): second RGB2BGR on an already-BGR image swaps the
                # channels back — the preview colours end up wrong.
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                mp_drawing.draw_landmarks(
                    image, results_pose.pose_landmarks, mp_pose.POSE_CONNECTIONS)
                if results_pose.pose_landmarks != None:
                    index_pose = 0
                    for pose_landmarks in (results_pose.pose_landmarks.landmark):
                        mediapipehumanholisticlist.human_pose_list[index_pose].name = str(NAME_POSE[index_pose])
                        mediapipehumanholisticlist.human_pose_list[index_pose].x = pose_landmarks.x
                        mediapipehumanholisticlist.human_pose_list[index_pose].y = pose_landmarks.y
                        mediapipehumanholisticlist.human_pose_list[index_pose].visibility = pose_landmarks.visibility
                        index_pose = index_pose +1
                    mediapipehumanholisticlist.num_humans = 1
                    self.publisher_.publish(mediapipehumanholisticlist)
                else:
                    # Publishes zeros when no body is on screen.
                    index_pose = 0
                    for point in mp_pose.PoseLandmark:
                        print(index_pose)  # NOTE(review): leftover debug print
                        mediapipehumanholisticlist.human_pose_list[index_pose].name = str(NAME_POSE[index_pose])
                        mediapipehumanholisticlist.human_pose_list[index_pose].x = 0.0
                        mediapipehumanholisticlist.human_pose_list[index_pose].y = 0.0
                        mediapipehumanholisticlist.human_pose_list[index_pose].visibility = 0.0
                        index_pose = index_pose +1
                # ---- Hands ----
                if results_hands.multi_hand_landmarks != None:
                    hand_number_screen = 0  # how many hands were seen this frame
                    # Iterate over the detected hands (at most 2, set above).
                    for hand_landmarks, handedness in zip(results_hands.multi_hand_landmarks,results_hands.multi_handedness):
                        if handedness.classification[0].label == "Right":
                            mp_drawing.draw_landmarks(
                                image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
                            index_point = 0
                            for point in mp_hands.HandLandmark:
                                normalizedLandmark = hand_landmarks.landmark[point]
                                mediapipehuman.right_hand_key_points[index_point].name = str(point)
                                mediapipehuman.right_hand_key_points[index_point].x = normalizedLandmark.x
                                mediapipehuman.right_hand_key_points[index_point].y = normalizedLandmark.y
                                mediapipehuman.right_hand_key_points[index_point].z = normalizedLandmark.z
                                # First hand seen: zero the opposite hand in case
                                # it is not detected this frame.
                                if hand_number_screen == 0:
                                    mediapipehuman.left_hand_key_points[index_point].name = str(point)
                                    mediapipehuman.left_hand_key_points[index_point].x = 0.0
                                    mediapipehuman.left_hand_key_points[index_point].y = 0.0
                                    mediapipehuman.left_hand_key_points[index_point].z = 0.0
                                index_point = index_point +1
                            hand_number_screen = hand_number_screen +1
                        elif handedness.classification[0].label == "Left":
                            mp_drawing.draw_landmarks(
                                image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
                            index_point = 0
                            for point in mp_hands.HandLandmark:
                                normalizedLandmark = hand_landmarks.landmark[point]
                                points.name = str(point)
                                mediapipehuman.left_hand_key_points[index_point].name = str(point)
                                mediapipehuman.left_hand_key_points[index_point].x = normalizedLandmark.x
                                mediapipehuman.left_hand_key_points[index_point].y = normalizedLandmark.y
                                mediapipehuman.left_hand_key_points[index_point].z = normalizedLandmark.z
                                if hand_number_screen == 0:
                                    mediapipehuman.right_hand_key_points[index_point].name = str(point)
                                    mediapipehuman.right_hand_key_points[index_point].x = 0.0
                                    mediapipehuman.right_hand_key_points[index_point].y = 0.0
                                    mediapipehuman.right_hand_key_points[index_point].z = 0.0
                                index_point = index_point +1
                            hand_number_screen = hand_number_screen +1
                else:
                    # No hands on screen: publish zeroed keypoints for both hands.
                    index_point = 0
                    for point in mp_hands.HandLandmark:
                        mediapipehuman.right_hand_key_points[index_point].name = str(point)
                        mediapipehuman.right_hand_key_points[index_point].x = 0.0
                        mediapipehuman.right_hand_key_points[index_point].y = 0.0
                        mediapipehuman.right_hand_key_points[index_point].z = 0.0
                        mediapipehuman.left_hand_key_points[index_point].name = str(point)
                        mediapipehuman.left_hand_key_points[index_point].x = 0.0
                        mediapipehuman.left_hand_key_points[index_point].y = 0.0
                        mediapipehuman.left_hand_key_points[index_point].z = 0.0
                        index_point = index_point + 1
                mediapipehumanholisticlist.human_hand_list.right_hand_key_points = mediapipehuman.right_hand_key_points
                mediapipehumanholisticlist.human_hand_list.left_hand_key_points = mediapipehuman.left_hand_key_points
                mediapipehumanholisticlist.num_humans = 1
                self.publisher_.publish(mediapipehumanholisticlist)
                cv2.imshow('MediaPipe Hands', image)
                if cv2.waitKey(5) & 0xFF == 27:
                    break
def main(args=None):
    """Entry point: run the holistic detector until ESC, then shut down ROS."""
    rclpy.init(args=args)
    node = HolisticPublisher()
    # The capture loop blocks here until ESC is pressed in the preview window.
    node.getimage_callback()
    cap.release()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,110 @@
import rclpy
import cv2
import mediapipe as mp
from rclpy.node import Node
from media_pipe_ros2_msg.msg import MediaPipeHumanPoseList
from mediapipe.python.solutions.pose import PoseLandmark
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose
# All 33 pose landmarks in landmark-id order. Enum declaration order matches
# the MediaPipe landmark indices (NOSE .. RIGHT_FOOT_INDEX), so iterating the
# enum yields exactly the same sequence the original literal spelled out.
NAME_POSE = list(PoseLandmark)
cap = cv2.VideoCapture(0)
class PosePublisher(Node):
    """ROS 2 node that runs MediaPipe Pose on webcam frames and publishes the
    33 landmarks on /mediapipe/human_pose_list."""

    def __init__(self):
        super().__init__('mediapipe_pose_publisher')
        self.publisher_ = self.create_publisher(MediaPipeHumanPoseList, '/mediapipe/human_pose_list', 10)

    def getimage_callback(self):
        """Blocking capture loop: read frames from the global `cap`, run
        MediaPipe Pose, publish the landmark list (zeros when no body is
        visible) and show a preview. Exits on ESC or when the camera closes."""
        mediapipehumanposelist = MediaPipeHumanPoseList()
        with mp_pose.Pose(
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5) as pose:
            while cap.isOpened():
                success, image = cap.read()
                if not success:
                    print("Sem camera.")
                    continue  # BUG FIX: previously fell through and crashed on an invalid frame
                # Mirror the frame and convert BGR -> RGB for MediaPipe.
                image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
                image.flags.writeable = False  # lets MediaPipe avoid a copy
                results = pose.process(image)
                image.flags.writeable = True
                # BUG FIX: convert back to BGR exactly once — the original
                # converted twice, leaving the preview channels swapped.
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                imageHeight, imageWidth, _ = image.shape
                mp_drawing.draw_landmarks(
                    image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
                if results.pose_landmarks is not None:
                    # Copy every detected landmark into the outgoing message.
                    for index_pose, pose_landmarks in enumerate(results.pose_landmarks.landmark):
                        entry = mediapipehumanposelist.human_pose_list[index_pose]
                        entry.name = str(NAME_POSE[index_pose])
                        entry.x = pose_landmarks.x
                        entry.y = pose_landmarks.y
                        entry.visibility = pose_landmarks.visibility
                else:
                    # No body on screen: publish zeroed landmarks.
                    for index_pose, _point in enumerate(mp_pose.PoseLandmark):
                        entry = mediapipehumanposelist.human_pose_list[index_pose]
                        entry.name = str(NAME_POSE[index_pose])
                        entry.x = 0.0
                        entry.y = 0.0
                        entry.visibility = 0.0
                mediapipehumanposelist.num_humans = 1
                self.publisher_.publish(mediapipehumanposelist)
                cv2.imshow('MediaPipe Pose', image)
                if cv2.waitKey(5) & 0xFF == 27:
                    break
def main(args=None):
    """Entry point: run the pose detector until ESC, then shut down ROS."""
    rclpy.init(args=args)
    node = PosePublisher()
    # The capture loop blocks here until ESC is pressed in the preview window.
    node.getimage_callback()
    cap.release()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,47 @@
import cv2
import mediapipe as mp

mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose
mp_hands = mp.solutions.hands

# For webcam input:
cap = cv2.VideoCapture(0)
with mp_pose.Pose(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5) as pose, mp_hands.Hands(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5) as hands:
    while cap.isOpened():
        success, image = cap.read()
        if not success:
            print("Ignoring empty camera frame.")
            # If loading a video, use 'break' instead of 'continue'.
            continue
        # Flip horizontally for a selfie view and convert BGR -> RGB for MediaPipe.
        image = cv2.cvtColor(cv2.flip(image, 1), cv2.COLOR_BGR2RGB)
        # Mark the image as not writeable to pass it by reference.
        image.flags.writeable = False
        results = pose.process(image)
        results_hand = hands.process(image)
        # Draw all annotations on a single BGR copy.
        image.flags.writeable = True
        # BUG FIX: convert RGB -> BGR exactly once. The original converted
        # twice, so the displayed window had red and blue channels swapped.
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        mp_drawing.draw_landmarks(
            image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)
        if results_hand.multi_hand_landmarks:
            for hand_landmarks in results_hand.multi_hand_landmarks:
                mp_drawing.draw_landmarks(
                    image, hand_landmarks, mp_hands.HAND_CONNECTIONS)
        cv2.imshow('MediaPipe Pose', image)
        if cv2.waitKey(5) & 0xFF == 27:
            break
cap.release()

View File

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>media_pipe_ros2</name>
<version>0.0.0</version>
<description>Package responsible for using the media pipe in ros2 </description>
<maintainer email="dmartinelli1997@gmail.com">Dieisson Martinelli</maintainer>
<license>TODO: License declaration</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/media_pipe_ros2
[install]
install_scripts=$base/lib/media_pipe_ros2

View File

@ -0,0 +1,29 @@
from setuptools import setup

# ROS 2 (ament_python) package containing the MediaPipe detector nodes.
package_name = 'media_pipe_ros2'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        # Ament resource-index marker and the package manifest.
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='Dieisson Martinelli',
    maintainer_email='dmartinelli1997@gmail.com',
    description='Package responsible for using the media pipe in ros2',
    license='TODO: License declaration',
    tests_require=['pytest'],
    # One console script per detector node (ros2 run media_pipe_ros2 <name>).
    entry_points={
        'console_scripts': [
            'hands_detector = media_pipe_ros2.hands_detector:main',
            'pose_detector = media_pipe_ros2.pose_detector:main',
            'face_mesh_detector = media_pipe_ros2.face_mesh_detector:main',
            'holistic_detector = media_pipe_ros2.holistic_detector:main',
        ],
    },
)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
    """Run the ament_copyright linter over the package sources."""
    return_code = main(argv=['.', 'test'])
    assert return_code == 0, 'Found errors'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
    """Run the ament_flake8 style checker and fail with the collected errors."""
    return_code, style_errors = main_with_errors(argv=[])
    report = 'Found %d code style errors / warnings:\n' % len(style_errors) + \
        '\n'.join(style_errors)
    assert return_code == 0, report

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
    """Run the ament_pep257 docstring-style linter over the package sources."""
    return_code = main(argv=['.', 'test'])
    assert return_code == 0, 'Found code style errors / warnings'

View File

@ -0,0 +1,37 @@
cmake_minimum_required(VERSION 3.8)
project(media_pipe_ros2_msg)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/HandPoint.msg"
"msg/MediaPipeHumanHand.msg"
"msg/MediaPipeHumanHandList.msg"
"msg/PosePoint.msg"
"msg/MediaPipeHumanPoseList.msg"
"msg/FaceMeshPoint.msg"
"msg/MediaPipeHumanFaceMeshList.msg"
"msg/MediaPipeHumanHolisticList.msg"
)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# uncomment the line when a copyright and license is not present in all source files
#set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# uncomment the line when this package is not in a git repo
#set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
ament_package()

View File

@ -0,0 +1,3 @@
float64 x
float64 y
float64 z

View File

@ -0,0 +1,4 @@
string name
float64 x
float64 y
float64 z

View File

@ -0,0 +1,2 @@
int32 num_humans
FaceMeshPoint[468] human_face_mesh_list

View File

@ -0,0 +1,2 @@
HandPoint[21] right_hand_key_points
HandPoint[21] left_hand_key_points

View File

@ -0,0 +1,2 @@
int32 num_humans
MediaPipeHumanHand human_hand_list

View File

@ -0,0 +1,4 @@
int32 num_humans
MediaPipeHumanHand human_hand_list
FaceMeshPoint[468] human_face_mesh_list
PosePoint[33] human_pose_list

View File

@ -0,0 +1,2 @@
int32 num_humans
PosePoint[33] human_pose_list

View File

@ -0,0 +1,5 @@
string name
float64 x
float64 y
float64 z
float64 visibility

View File

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>media_pipe_ros2_msg</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="dmartinelli1997@gmail.com">ros2</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -0,0 +1,12 @@
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
    """Launch description that starts the stereo_image_proc processing node."""
    return LaunchDescription([
        Node(
            package='stereo_image_proc',
            executable='stereo_image_proc',
            output='screen',
        ),
    ])

View File

@ -0,0 +1,130 @@
import rclpy
from rclpy.node import Node
import cv2
import numpy as np
from sensor_msgs.msg import Image
from geometry_msgs.msg import PointStamped
from cv_bridge import CvBridge
class StereoHandDepthNode(Node):
    """Computes a depth point from a side-by-side stereo image topic and
    publishes it as a PointStamped on /hand_3d."""

    def __init__(self):
        super().__init__('stereo_hand_depth_node')
        # ---------- Parameters ----------
        # Placeholder intrinsics — replace with calibrated values.
        self.fx = 600.0  # focal length in pixels
        self.fy = 600.0
        self.cx = 320.0  # principal point
        self.cy = 240.0
        self.baseline = 0.06  # stereo baseline in metres — replace with yours
        # ---------- OpenCV ----------
        self.bridge = CvBridge()
        self.stereo = cv2.StereoSGBM_create(
            minDisparity=0,
            numDisparities=128,
            blockSize=7,
            P1=8 * 3 * 7 ** 2,
            P2=32 * 3 * 7 ** 2,
            disp12MaxDiff=1,
            uniquenessRatio=10,
            speckleWindowSize=100,
            speckleRange=32
        )
        # ---------- ROS ----------
        # Single side-by-side frame on /image; split into halves in the callback.
        self.image = self.create_subscription(
            Image,
            '/image',
            self.image_callback,
            10
        )
        self.pub = self.create_publisher(
            PointStamped,
            '/hand_3d',
            10
        )
        self.get_logger().info('StereoHandDepthNode started')

    def image_callback(self, msg: Image):
        """Split the incoming side-by-side frame, compute a disparity map and
        publish the 3-D position of the (currently fixed) target pixel."""
        frame = self.bridge.imgmsg_to_cv2(msg, desired_encoding='bgr8')
        h, w, _ = frame.shape
        mid = w // 2
        # Split into left/right views.
        left = frame[:, :mid]
        right = frame[:, mid:]
        cv2.imshow("left", left)
        cv2.imshow("right", right)
        left_gray = cv2.cvtColor(left, cv2.COLOR_BGR2GRAY)
        right_gray = cv2.cvtColor(right, cv2.COLOR_BGR2GRAY)
        # SGBM returns fixed-point disparities scaled by 16.
        disparity = self.stereo.compute(left_gray, right_gray).astype(np.float32) / 16.0
        # ----------- Placeholder "hand" pixel: image centre -----------
        x = int(self.cx)
        y = int(self.cy)
        # Median over a small ROI is much more stable than a single pixel.
        # BUG FIX: the slice was y-3:y+3 / x-3:x+3 (6x6) — +4 makes it the
        # intended full 7x7 window centred on (x, y).
        roi = disparity[y-3:y+4, x-3:x+4]
        d = np.nanmedian(roi)
        # BUG FIX: nanmedian returns NaN when the whole ROI is invalid; NaN is
        # not caught by `d <= 0` and used to poison the published point.
        if not np.isfinite(d) or d <= 0:
            return
        # Pinhole back-projection: depth then lateral offsets.
        Z = (self.fx * self.baseline) / d
        X = (x - self.cx) * Z / self.fx
        Y = (y - self.cy) * Z / self.fy
        msg_out = PointStamped()
        msg_out.header.stamp = self.get_clock().now().to_msg()
        msg_out.header.frame_id = 'camera_link'
        msg_out.point.x = float(X)
        msg_out.point.y = float(Y)
        msg_out.point.z = float(Z)
        self.pub.publish(msg_out)
        # ----------- Debug visualisation -----------
        disp_vis = cv2.normalize(disparity, None, 0, 255, cv2.NORM_MINMAX)
        disp_vis = disp_vis.astype(np.uint8)
        cv2.circle(disp_vis, (x, y), 5, 255, -1)
        cv2.imshow('disparity', disp_vis)
        cv2.waitKey(10)
def main(args=None):
    """Entry point: spin the stereo depth node until shutdown."""
    rclpy.init(args=args)
    depth_node = StereoHandDepthNode()
    rclpy.spin(depth_node)
    depth_node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,75 @@
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
import numpy as np
class CameraNode(Node):
    """Publishes a side-by-side stereo USB camera: the full frame on /image and
    the left/right halves on /camera/left/image_raw and /camera/right/image_raw."""

    def __init__(self):
        super().__init__('camera_node')
        self.bridge = CvBridge()
        self.publisher_left = self.create_publisher(Image, '/camera/left/image_raw', 10)
        self.publisher_right = self.create_publisher(Image, '/camera/right/image_raw', 10)
        self.publisher_raw = self.create_publisher(Image, '/image', 10)
        self.get_logger().info('正在初始化摄像头...')
        # Try to open the physical camera.
        self.cap = None
        # NOTE(review): the original comment said "try devices 0,1,2" but
        # range(1) only ever tries /dev/video0.
        for i in range(1):
            self.get_logger().info(f'尝试打开摄像头设备 /dev/video{i}...')
            self.cap = cv2.VideoCapture(i)
            if self.cap.isOpened():
                # Configure the capture: 1600x600 side-by-side stereo at 30 FPS.
                self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1600)
                self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 600)
                self.cap.set(cv2.CAP_PROP_FPS, 30)
                # Warm the camera up by discarding a few frames.
                for _ in range(5):
                    self.cap.read()
                # Verify the camera actually delivers frames.
                ret, frame = self.cap.read()
                if ret and frame is not None:
                    self.get_logger().info(f'成功打开物理摄像头 /dev/video{i}')
                    self.get_logger().info(f'图像尺寸: {frame.shape}')
                    break
        # NOTE(review): if the device failed to open, self.cap is an unopened
        # capture — timer_callback then silently publishes nothing.
        self.timer = self.create_timer(0.033, self.timer_callback)  # 30Hz
        self.get_logger().info('摄像头节点已启动')

    def timer_callback(self):
        """Grab one frame and publish the raw frame plus its left/right halves."""
        ret, frame = self.cap.read()
        if ret and frame is not None:
            # frameDist = cv2.rotate(frame, cv2.ROTATE_90_COUNTERCLOCKWISE)
            # cv2.imshow("camera_left", frame[:, :800])
            # cv2.imshow("camera_right", frame[:, 800:])
            cv2.waitKey(10)
            # Split the 1600-wide frame into two 800-wide halves.
            msg_left= self.bridge.cv2_to_imgmsg(frame[:, :800], 'bgr8')
            msg_right= self.bridge.cv2_to_imgmsg(frame[:, 800:], 'bgr8')
            msg_raw= self.bridge.cv2_to_imgmsg(frame, 'bgr8')
            # Stamp all three messages with the current ROS time.
            msg_left.header.stamp = self.get_clock().now().to_msg()
            msg_right.header.stamp = self.get_clock().now().to_msg()
            msg_raw.header.stamp = self.get_clock().now().to_msg()
            msg_left.header.frame_id = 'camera_frame_left'
            msg_right.header.frame_id = 'camera_frame_right'
            msg_raw.header.frame_id = 'camera_frame_raw'
            self.publisher_left.publish(msg_left)
            self.publisher_right.publish(msg_right)
            self.publisher_raw.publish(msg_raw)
def main():
    """Entry point: spin the camera publisher node until shutdown."""
    rclpy.init()
    camera_node = CameraNode()
    rclpy.spin(camera_node)
    camera_node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

View File

@ -0,0 +1,18 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>openarm_camera</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="shen@todo.todo">shen</maintainer>
<license>TODO: License declaration</license>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/openarm_camera
[install]
install_scripts=$base/lib/openarm_camera

View File

@ -0,0 +1,36 @@
from setuptools import find_packages, setup
import os
from glob import glob

# ROS 2 (ament_python) package for the stereo camera publisher nodes.
package_name = 'openarm_camera'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        # Ament resource-index marker and the package manifest.
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        # Launch files, installed to share/<pkg>/launch.
        (os.path.join('share', package_name, 'launch'),
            glob('launch/*.launch.py')),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='shen',
    maintainer_email='shen@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    extras_require={
        'test': [
            'pytest',
        ],
    },
    # One console script per node (ros2 run openarm_camera <name>).
    entry_points={
        'console_scripts': [
            'camera_node = openarm_camera.camera:main',
            'camera_pub_node = openarm_camera.camera_pub:main',
        ],
    },
)

View File

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
    """Run the ament_copyright linter over the package sources."""
    return_code = main(argv=['.', 'test'])
    assert return_code == 0, 'Found errors'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
    """Run the ament_flake8 style checker and fail with the collected errors."""
    return_code, style_errors = main_with_errors(argv=[])
    report = 'Found %d code style errors / warnings:\n' % len(style_errors) + \
        '\n'.join(style_errors)
    assert return_code == 0, report

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
    """Run the ament_pep257 docstring-style linter over the package sources."""
    return_code = main(argv=['.', 'test'])
    assert return_code == 0, 'Found code style errors / warnings'

View File

@ -0,0 +1,228 @@
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
import numpy as np
import time
import cv2
from moveit_msgs.action import MoveGroup
from moveit_msgs.msg import Constraints, PositionConstraint
from geometry_msgs.msg import PoseStamped
from shape_msgs.msg import SolidPrimitive
# Stereo calibration loaded from disk at import time.
# NOTE(review): absolute, machine-specific paths — consider moving these to
# ROS parameters or package data.
data = np.load("/home/shen/openarm_test/src/openarm_moveit_control/openarm_moveit_control/stereo_params_1920x1080.npz")
mtxL = data["mtxL"]    # left camera intrinsic matrix
distL = data["distL"]  # left distortion coefficients
mtxR = data["mtxR"]    # right camera intrinsic matrix
distR = data["distR"]  # right distortion coefficients
R = data["R"]          # inter-camera extrinsic rotation (as used by stereoRectify)
T = data["T"]          # inter-camera extrinsic translation
image_size = tuple(data["image_size"])
# Hand-eye calibration data; loaded here but not referenced in this section —
# presumably used further down. TODO(review): confirm it is actually consumed.
handey_cal = np.load("/home/shen/openarm_test/handeye_data.npz")
# Rectification: R1/R2 rectify each view, P1/P2 are the new projection
# matrices, and Q0 reprojects (u, v, disparity) pixels to 3-D points.
R1, R2, P1, P2, Q0, roi1, roi2 = cv2.stereoRectify(
    mtxL, distL,
    mtxR, distR,
    image_size,
    R, T,
    flags=cv2.CALIB_ZERO_DISPARITY,
    alpha=0
)
# Per-pixel remap tables used to rectify incoming frames.
mapL1, mapL2 = cv2.initUndistortRectifyMap(
    mtxL, distL, R1, P1, image_size, cv2.CV_32FC1
)
mapR1, mapR2 = cv2.initUndistortRectifyMap(
    mtxR, distR, R2, P2, image_size, cv2.CV_32FC1
)
print("Q:", Q0)
class HandFollowMoveIt(Node):
def __init__(self):
    super().__init__('hand_follow_moveit')
    # MoveIt action client used to send Cartesian goals.
    self.client = ActionClient(self, MoveGroup, '/move_action')
    self.last_sent = time.time()
    self.send_period = 0.3  # minimum time between goals (~3 Hz)
    self.p_prev = None      # previous accepted target; None until first update
    # ========= Camera =========
    self.cap = cv2.VideoCapture(0)  # adjust to your camera ID
    self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 2560)
    self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
    self.cap.set(cv2.CAP_PROP_FPS, 30)
    self.cap.set(cv2.CAP_PROP_EXPOSURE, -8)
    # ========= ArUco =========
    self.aruco_dict = cv2.aruco.getPredefinedDictionary(
        cv2.aruco.DICT_4X4_50
    )
    self.aruco_params = cv2.aruco.DetectorParameters()
    # ========= Stereo calibration =========
    # Reprojection Q matrix from module-level stereoRectify.
    self.Q = Q0
    # Periodic processing loop (spin itself loops internally).
    self.create_timer(0.5, self.spin)
# ==============================
# Hand-eye calibration solver
# ==============================
def solve_handeye(self, cam, base):
    """Rigid transform (R, t) mapping camera-frame points onto base-frame points.

    Classic Kabsch/SVD point-set registration: centre both clouds, take the
    SVD of their covariance, and correct an improper (reflection) solution.
    cam, base: (N, 3) arrays of corresponding points. Returns (R, t).
    """
    centroid_cam = cam.mean(axis=0)
    centroid_base = base.mean(axis=0)
    cam_centered = cam - centroid_cam
    base_centered = base - centroid_base
    U, _, Vt = np.linalg.svd(cam_centered.T @ base_centered)
    rotation = Vt.T @ U.T
    # A negative determinant means a reflection; flip the last row and redo.
    if np.linalg.det(rotation) < 0:
        Vt[2] *= -1
        rotation = Vt.T @ U.T
    translation = centroid_base - rotation @ centroid_cam
    return rotation, translation
def split_stereo(self, frame):
    """2560x720 side-by-side frame -> (left, right) halves of 1280x720 each."""
    rows, cols, _channels = frame.shape
    mid = cols // 2
    return frame[:, :mid], frame[:, mid:]
def compute_disparity(self, left, right):
    """Disparity map (float pixels) for a rectified BGR stereo pair via SGBM."""
    grey_left = cv2.cvtColor(left, cv2.COLOR_BGR2GRAY)
    grey_right = cv2.cvtColor(right, cv2.COLOR_BGR2GRAY)
    matcher = cv2.StereoSGBM_create(
        minDisparity=0,
        numDisparities=128,
        blockSize=7
    )
    # SGBM output is fixed-point, scaled by 16; divide for real disparities.
    return matcher.compute(grey_left, grey_right).astype(np.float32) / 16.0
def pixel_to_3d(self, u, v, disparity):
"""
像素 + 视差 3Dcamera_link
"""
if self.Q is not None:
point_4d = np.array([u, v, disparity, 1.0])
X = self.Q @ point_4d
X /= X[3]
return X[:3]
def update_hand(self, p):
""" p: np.array([x,y,z]) in base_link """
# 1⃣ 限幅(按你机械臂改)
p[0] = np.clip(p[0], 0.25, 0.55)
p[1] = np.clip(p[1], -0.30, 0.30)
p[2] = np.clip(p[2], 0.25, 0.30)
# 2⃣ 低通滤波
if self.p_prev is None:
self.p_prev = p
return
alpha = 0.2
p = alpha * p + (1 - alpha) * self.p_prev
# 3⃣ 死区
if np.linalg.norm(p - self.p_prev) < 0.015:
return
# 4⃣ 控制频率
if time.time() - self.last_sent < self.send_period:
return
self.send_goal(p)
self.p_prev = p
self.last_sent = time.time()
def send_goal(self, p):
pose = PoseStamped()
pose.header.frame_id = "openarm_body_link0"
pose.pose.position.x = float(p[0])
pose.pose.position.y = float(p[1])
pose.pose.position.z = float(p[2])
pose.pose.orientation.w = 1.0
goal = MoveGroup.Goal()
req = goal.request
req.group_name = "left_arm"
req.allowed_planning_time = 2.0
req.max_velocity_scaling_factor = 0.2
pc = PositionConstraint()
pc.header = pose.header
pc.link_name = "openarm_left_link7"
box = SolidPrimitive()
box.type = SolidPrimitive.BOX
box.dimensions = [0.02, 0.02, 0.02]
pc.constraint_region.primitives.append(box)
pc.constraint_region.primitive_poses.append(pose.pose)
constraints = Constraints()
constraints.position_constraints.append(pc)
req.goal_constraints.append(constraints)
self.client.wait_for_server()
self.client.send_goal_async(goal)
def get_hand_3d(self):
return None
def spin(self):
# frame_count = 0
while rclpy.ok():
ret, frame = self.cap.read()
if not ret:
continue
left, right = self.split_stereo(frame)
rectL = cv2.remap(left, mapL1, mapL2, cv2.INTER_LINEAR)
rectR = cv2.remap(right, mapR1, mapR2, cv2.INTER_LINEAR)
# frame_count += 1
# if frame_count % 3 == 0:
# small_l = cv2.resize(rectL, None, fx=0.5, fy=0.5)
# small_r = cv2.resize(rectR, None, fx=0.5, fy=0.5)
disp = self.compute_disparity(rectL, rectR)
gray = cv2.cvtColor(rectL, cv2.COLOR_BGR2GRAY)
corners, ids, _ = cv2.aruco.detectMarkers(
gray, self.aruco_dict, parameters=self.aruco_params
)
if ids is not None:
cv2.aruco.drawDetectedMarkers(gray, corners, ids)
c = corners[0][0]
center = c.mean(axis=0)
u, v = int(center[0]), int(center[1])
d = disp[v, u]
cv2.circle(rectL, (u, v), 5, (0, 0, 255), -1)
cv2.putText(
rectL, f"disp={d:.2f}", (u+5, v-5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1
)
self.solve_handeye(handey_cal["cam"], handey_cal["base"])
cv2.imshow("left", rectL)
# self.check()
key = cv2.waitKey(1)
def main():
    """CLI entry point: initialise ROS, run the follower node, shut down."""
    rclpy.init()
    follower = HandFollowMoveIt()
    rclpy.spin(follower)
    rclpy.shutdown()
    # Reference sketch from an earlier draft (never executed):
    #   while True:
    #       p_hand_base = get_hand_3d()
    #       follower.update_hand(p_hand_base)


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,249 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
import numpy as np
import cv2
import tf2_ros
import time
from rclpy.time import Time
"""
==============================
手眼标定数据采集节点完整版本
Eye-to-Hand
==============================
"""
# NOTE(review): hard-coded absolute path — breaks on any other machine;
# consider resolving via the ament index or a ROS parameter.
data = np.load("/home/shen/openarm/openarm_test/src/openarm_moveit_control/openarm_moveit_control/stereo_params_1920x1080.npz")
mtxL = data["mtxL"]    # left camera intrinsics
distL = data["distL"]  # left camera distortion coefficients
mtxR = data["mtxR"]    # right camera intrinsics
distR = data["distR"]  # right camera distortion coefficients
R = data["R"]          # stereo extrinsic rotation (cv2.stereoCalibrate convention)
T = data["T"]          # stereo extrinsic translation
image_size = tuple(data["image_size"])
# Rectification: R1/R2 rectify each camera, P1/P2 are rectified projections,
# Q0 reprojects (u, v, disparity, 1) to 3D.
R1, R2, P1, P2, Q0, roi1, roi2 = cv2.stereoRectify(
    mtxL, distL,
    mtxR, distR,
    image_size,
    R, T,
    flags=cv2.CALIB_ZERO_DISPARITY,
    alpha=0
)
# Remap tables consumed by cv2.remap() on live frames.
mapL1, mapL2 = cv2.initUndistortRectifyMap(
    mtxL, distL, R1, P1, image_size, cv2.CV_32FC1
)
mapR1, mapR2 = cv2.initUndistortRectifyMap(
    mtxR, distR, R2, P2, image_size, cv2.CV_32FC1
)
print("Q:", Q0)
class HandEyeCollector(Node):
    """Collect matched (camera, robot-base) 3D point pairs for eye-to-hand
    calibration: detect an ArUco marker in the rectified left image,
    triangulate it via the disparity map, pair it with the TCP position from
    TF. ENTER saves one sample; ESC saves all samples and stops.
    """

    def __init__(self):
        super().__init__('handeye_collector')
        # ========= TF =========
        self.tf_buffer = tf2_ros.Buffer()
        self.tf_listener = tf2_ros.TransformListener(self.tf_buffer, self)
        # ========= camera =========
        self.cap = cv2.VideoCapture(15)  # <- change to your camera ID
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 2560)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
        self.cap.set(cv2.CAP_PROP_FPS, 30)
        self.cap.set(cv2.CAP_PROP_EXPOSURE, -8)
        # ========= ArUco =========
        self.aruco_dict = cv2.aruco.getPredefinedDictionary(
            cv2.aruco.DICT_4X4_50
        )
        self.aruco_params = cv2.aruco.DetectorParameters()
        # ========= collected samples =========
        self.cam_points = []
        self.base_points = []
        # ========= stereo calibration =========
        self.Q = Q0  # 4x4 reprojection matrix from cv2.stereoRectify above
        # Manual intrinsics (alternative to Q); unused while Q is set.
        self.fx = None
        self.fy = None
        self.cx = None
        self.cy = None
        self.baseline = None  # metres
        self.get_logger().info(
            "\nHand-eye collector ready\n"
            "ENTER: save sample\n"
            "ESC: save & quit\n"
        )
        # Process ONE frame per tick. The original ran a `while rclpy.ok()`
        # loop inside this callback, which blocked the executor — the TF
        # listener's subscriptions never fired, so lookup_transform() could
        # never succeed.
        self.timer = self.create_timer(0.5, self.spin)

    # ===============================
    # helpers
    # ===============================
    def check(self):
        """Debug helper: print the base->TCP transform if available."""
        try:
            tf = self.tf_buffer.lookup_transform(
                'openarm_body_link0',
                'openarm_left_link7',
                Time()
            )
            print("Transform:", tf)
            self.get_logger().info("TF OK")
        except Exception:
            # Was a bare `except:` that also swallowed SystemExit /
            # KeyboardInterrupt.
            self.get_logger().warn("TF not available yet")
            return None

    def get_tcp_position(self):
        """Latest TCP translation in base frame as np.array([x, y, z]),
        or None when the transform is not yet available."""
        try:
            t = self.tf_buffer.lookup_transform(
                'openarm_body_link0',
                'openarm_left_link7',
                Time()
            )
            print("Transform:", t)
            return np.array([
                t.transform.translation.x,
                t.transform.translation.y,
                t.transform.translation.z
            ])
        except Exception as e:
            self.get_logger().warn(f"TF lookup failed: {str(e)}")
            return None

    def split_stereo(self, frame):
        """Split a 2560x720 side-by-side frame into left/right 1280x720 halves."""
        h, w, _ = frame.shape
        left = frame[:, :w//2]
        right = frame[:, w//2:]
        return left, right

    def compute_disparity(self, left, right):
        """SGBM disparity of rectified left/right BGR images (float32 pixels)."""
        gray_l = cv2.cvtColor(left, cv2.COLOR_BGR2GRAY)
        gray_r = cv2.cvtColor(right, cv2.COLOR_BGR2GRAY)
        stereo = cv2.StereoSGBM_create(
            minDisparity=0,
            numDisparities=128,
            blockSize=7
        )
        # SGBM returns fixed-point disparity scaled by 16.
        disp = stereo.compute(gray_l, gray_r).astype(np.float32) / 16.0
        return disp

    def pixel_to_3d(self, u, v, disparity):
        """Pixel + disparity -> 3D point in the rectified left-camera frame.

        Returns None when Q is unset or the disparity is invalid (<= 0).
        """
        if self.Q is None or disparity <= 0:
            return None
        point_4d = np.array([u, v, disparity, 1.0])
        X = self.Q @ point_4d
        X /= X[3]
        return X[:3]

    # ===============================
    # main loop (one frame per timer tick)
    # ===============================
    def spin(self):
        """Timer callback: process one stereo frame and handle keyboard input."""
        ret, frame = self.cap.read()
        if not ret:
            return
        left, right = self.split_stereo(frame)
        rectL = cv2.remap(left, mapL1, mapL2, cv2.INTER_LINEAR)
        rectR = cv2.remap(right, mapR1, mapR2, cv2.INTER_LINEAR)
        # Disparity must be computed at full rectified resolution: the marker
        # pixel (u, v) and Q both refer to full-resolution images. (The
        # original ran SGBM on 0.5x-resized copies and then indexed the
        # half-size disparity map with full-resolution coordinates.)
        disp = self.compute_disparity(rectL, rectR)
        gray = cv2.cvtColor(rectL, cv2.COLOR_BGR2GRAY)
        corners, ids, _ = cv2.aruco.detectMarkers(
            gray, self.aruco_dict, parameters=self.aruco_params
        )
        u = v = d = None
        if ids is not None:
            cv2.aruco.drawDetectedMarkers(gray, corners, ids)
            c = corners[0][0]
            center = c.mean(axis=0)
            u, v = int(center[0]), int(center[1])
            d = disp[v, u]
            cv2.circle(rectL, (u, v), 5, (0, 0, 255), -1)
            cv2.putText(
                rectL, f"disp={d:.2f}", (u+5, v-5),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1
            )
        cv2.imshow("left", rectL)
        key = cv2.waitKey(1)
        if key == 13 and ids is not None:
            # ENTER: save one matched (camera, base) sample
            cam_p = self.pixel_to_3d(u, v, d)
            print("cam_p:", cam_p)
            base_p = self.get_tcp_position()
            if cam_p is None or base_p is None:
                self.get_logger().warn("Invalid sample")
                return
            self.cam_points.append(cam_p)
            self.base_points.append(base_p)
            self.get_logger().info(
                f"Saved #{len(self.cam_points)} "
                f"cam={cam_p.round(3)} "
                f"base={base_p.round(3)}"
            )
        elif key == 27:
            # ESC: persist everything and stop processing frames
            self.timer.cancel()
            self.save_and_exit()

    def save_and_exit(self):
        """Write all collected pairs to handeye_data.npz and release the camera."""
        np.savez(
            "handeye_data.npz",
            cam=np.array(self.cam_points),
            base=np.array(self.base_points)
        )
        self.get_logger().info(
            f"Saved {len(self.cam_points)} samples to handeye_data.npz"
        )
        self.cap.release()
        cv2.destroyAllWindows()
# ===============================
# main
# ===============================
def main():
    """CLI entry point for the hand-eye data collection node."""
    rclpy.init()
    collector = HandEyeCollector()
    rclpy.spin(collector)
    rclpy.shutdown()


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,81 @@
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from moveit_msgs.action import MoveGroup
from moveit_msgs.msg import Constraints, PositionConstraint
from geometry_msgs.msg import PoseStamped
from shape_msgs.msg import SolidPrimitive
class MoveItActionClient(Node):
    """Minimal MoveIt 2 client: submits one position-only pose goal through
    the /move_action MoveGroup action and reports the outcome."""

    def __init__(self):
        super().__init__('moveit_action_client')
        self.client = ActionClient(self, MoveGroup, '/move_action')

    def send_pose_goal(self, pose):
        """Wrap `pose` in a 1 cm box position constraint and send it async."""
        goal = MoveGroup.Goal()
        request = goal.request
        request.group_name = "right_arm"  # ⚠️ change to your planning group
        request.num_planning_attempts = 5
        request.allowed_planning_time = 5.0
        request.max_velocity_scaling_factor = 0.2
        request.max_acceleration_scaling_factor = 0.2
        # Target region: a small box centred on the requested position.
        region = SolidPrimitive()
        region.type = SolidPrimitive.BOX
        region.dimensions = [0.01, 0.01, 0.01]
        position_constraint = PositionConstraint()
        position_constraint.header = pose.header
        position_constraint.link_name = "openarm_right_link7"  # ⚠️ change to your TCP
        position_constraint.constraint_region.primitives.append(region)
        position_constraint.constraint_region.primitive_poses.append(pose.pose)
        position_constraint.weight = 1.0
        goal_constraints = Constraints()
        goal_constraints.position_constraints.append(position_constraint)
        request.goal_constraints.append(goal_constraints)
        self.client.wait_for_server()
        pending = self.client.send_goal_async(
            goal,
            feedback_callback=self.feedback_cb
        )
        pending.add_done_callback(self.goal_response_cb)

    def goal_response_cb(self, future):
        """Handle MoveIt's accept/reject decision for the submitted goal."""
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().error("❌ Goal 被 MoveIt 拒绝")
            return
        self.get_logger().info("✅ Goal 已接受,等待执行结果")
        goal_handle.get_result_async().add_done_callback(self.result_cb)

    def result_cb(self, future):
        """Log the terminal error code, then stop rclpy."""
        result = future.result().result
        self.get_logger().info(f"🎯 执行完成error_code = {result.error_code.val}")
        rclpy.shutdown()

    def feedback_cb(self, feedback):
        """Planning/execution feedback is intentionally ignored."""
        pass
def main():
    """Send one hard-coded pose goal; the result callback shuts rclpy down."""
    rclpy.init()
    client_node = MoveItActionClient()
    target = PoseStamped()
    target.header.frame_id = "openarm_body_link0"
    target.pose.position.x = 0.01
    target.pose.position.y = -0.153
    target.pose.position.z = 0.462
    target.pose.orientation.w = 1.0
    client_node.send_pose_goal(target)
    rclpy.spin(client_node)


if __name__ == "__main__":
    main()

View File

@ -0,0 +1,32 @@
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import TransformStamped
import tf2_ros
from rclpy.time import Time
class TestTF(Node):
    """Periodically checks that the base->left-TCP transform is available."""

    def __init__(self):
        super().__init__("test_tf")
        self.buffer = tf2_ros.Buffer()
        self.listener = tf2_ros.TransformListener(self.buffer, self)
        self.create_timer(0.5, self.check)

    def check(self):
        """Timer callback: look up the transform and report the outcome."""
        try:
            tf = self.buffer.lookup_transform(
                'openarm_body_link0',
                'openarm_left_link7',
                Time()
            )
            print("Transform:", tf)
            self.get_logger().info("TF OK")
        except Exception as e:
            # Was a bare `except:`, which also swallowed SystemExit /
            # KeyboardInterrupt and hid the actual failure reason.
            self.get_logger().warn(f"TF not available yet: {e}")
def main():
    """Entry point used by the `tf_test` console script.

    Note: this module defines no `__main__` guard; it is only launched via
    the setup.py entry point.
    """
    rclpy.init()
    tf_checker = TestTF()
    rclpy.spin(tf_checker)
    rclpy.shutdown()

View File

@ -0,0 +1,22 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>openarm_moveit_control</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="shen@todo.todo">shen</maintainer>
<license>TODO: License declaration</license>
<depend>moveit_ros_planning_interface</depend>
<depend>tf2_ros</depend>
<depend>geometry_msgs</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/openarm_moveit_control
[install]
install_scripts=$base/lib/openarm_moveit_control

View File

@ -0,0 +1,33 @@
from setuptools import find_packages, setup

# ament_python package name; must match the resource file and module folder.
package_name = 'openarm_moveit_control'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    # Register with the ament index and install the package manifest.
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='shen',
    maintainer_email='shen@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    extras_require={
        'test': [
            'pytest',
        ],
    },
    # Executables exposed via `ros2 run openarm_moveit_control <name>`.
    entry_points={
        'console_scripts': [
            'moveit_node = openarm_moveit_control.test_control:main',
            'handeye_collect = openarm_moveit_control.handeye_collect:main',
            'tf_test = openarm_moveit_control.tf_test:main',
            'hand_follow_moveit = openarm_moveit_control.hand_follow_moveit:main',
        ],
    },
)

View File

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
    """Run the ament_copyright linter over this package; fail on violations."""
    rc = main(argv=['.', 'test'])
    assert rc == 0, 'Found errors'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
    """Run the ament_flake8 style checker and report each finding."""
    rc, errors = main_with_errors(argv=[])
    assert rc == 0, \
        'Found %d code style errors / warnings:\n' % len(errors) + \
        '\n'.join(errors)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
    """Run the ament_pep257 docstring-style linter over this package."""
    rc = main(argv=['.', 'test'])
    assert rc == 0, 'Found code style errors / warnings'

View File

@ -0,0 +1,31 @@
cmake_minimum_required(VERSION 3.8)
project(status_interfaces)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
find_package(rosidl_default_generators REQUIRED)
find_package(builtin_interfaces REQUIRED)

# Generate C++/Python interface code for the message definitions below.
rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/SystemStatus.msg"
  DEPENDENCIES builtin_interfaces
)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

ament_package()

View File

@ -0,0 +1,8 @@
builtin_interfaces/Time stamp    # timestamp of the sample
string host_name                 # host (system) name
float32 cpu_percent              # CPU usage, percent
float32 memory_percent           # memory usage, percent
float32 memory_total             # total memory (MiB, as published by sys_status_pub)
float32 memory_available         # available memory (MiB)
float64 net_sent                 # cumulative network bytes sent (MiB)
float64 net_recv                 # cumulative network bytes received (MiB)

View File

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>status_interfaces</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="shen@todo.todo">shen</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<depend>rosidl_default_generators</depend>
<depend>builtin_interfaces</depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>

View File

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>status_publish</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="shen@todo.todo">shen</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<depend>status_interfaces</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,4 @@
[develop]
script_dir=$base/lib/status_publish
[install]
install_scripts=$base/lib/status_publish

View File

@ -0,0 +1,26 @@
from setuptools import find_packages, setup

# ament_python package name; must match the resource file and module folder.
package_name = 'status_publish'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='shen',
    maintainer_email='shen@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    # `tests_require` is deprecated in setuptools; declare the test extra
    # instead, matching the sibling openarm_moveit_control/setup.py.
    extras_require={
        'test': [
            'pytest',
        ],
    },
    entry_points={
        'console_scripts': [
            'status_pub_node = status_publish.sys_status_pub:main'
        ],
    },
)

View File

@ -0,0 +1,37 @@
import rclpy
from rclpy.node import Node
from status_interfaces.msg import SystemStatus # 导入消息接口
import psutil
import platform
class SysStatusPub(Node):
    """Publishes host CPU / memory / network statistics once per second on
    the `sys_status` topic."""

    def __init__(self, node_name):
        super().__init__(node_name)
        self.status_publisher_ = self.create_publisher(
            SystemStatus, 'sys_status', 10)
        self.timer = self.create_timer(1, self.timer_callback)

    def timer_callback(self):
        """Sample psutil counters, fill a SystemStatus message and publish it."""
        cpu_percent = psutil.cpu_percent()
        mem = psutil.virtual_memory()
        net = psutil.net_io_counters()
        status = SystemStatus()
        status.stamp = self.get_clock().now().to_msg()
        status.host_name = platform.node()
        status.cpu_percent = cpu_percent
        status.memory_percent = mem.percent
        # Byte counters are converted to MiB before publishing.
        status.memory_total = mem.total / 1024 / 1024
        status.memory_available = mem.available / 1024 / 1024
        status.net_sent = net.bytes_sent / 1024 / 1024
        status.net_recv = net.bytes_recv / 1024 / 1024
        msg = status
        self.get_logger().info(f'发布:{str(msg)}')
        self.status_publisher_.publish(msg)
def main():
    """Run the system-status publisher node until shutdown."""
    rclpy.init()
    publisher_node = SysStatusPub('sys_status_pub')
    rclpy.spin(publisher_node)
    rclpy.shutdown()

View File

@ -0,0 +1,25 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_copyright.main import main
import pytest
# Remove the `skip` decorator once the source file(s) have a copyright header
@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
@pytest.mark.copyright
@pytest.mark.linter
def test_copyright():
    """Run the ament_copyright linter over this package; fail on violations."""
    rc = main(argv=['.', 'test'])
    assert rc == 0, 'Found errors'

View File

@ -0,0 +1,25 @@
# Copyright 2017 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_flake8.main import main_with_errors
import pytest
@pytest.mark.flake8
@pytest.mark.linter
def test_flake8():
    """Run the ament_flake8 style checker and report each finding."""
    rc, errors = main_with_errors(argv=[])
    assert rc == 0, \
        'Found %d code style errors / warnings:\n' % len(errors) + \
        '\n'.join(errors)

View File

@ -0,0 +1,23 @@
# Copyright 2015 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ament_pep257.main import main
import pytest
@pytest.mark.linter
@pytest.mark.pep257
def test_pep257():
    """Run the ament_pep257 docstring-style linter over this package."""
    rc = main(argv=['.', 'test'])
    assert rc == 0, 'Found code style errors / warnings'