Revert "Kirigirisu merge feature added" (#24)

Reverts reazon-research/openarm_ros2#23
This commit is contained in:
Kevin Jiang 2025-07-01 15:10:37 +09:00 committed by GitHub
parent f1420f1572
commit 40a1a5c1ca
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 2 additions and 164 deletions

View File

@ -33,9 +33,4 @@ install(DIRECTORY launch meshes rviz urdf worlds
DESTINATION share/${PROJECT_NAME}
)
install(PROGRAMS
scripts/joint_state_merger.py
DESTINATION lib/${PROJECT_NAME}
)
ament_package()

2
openarm_bimanual_description/launch/display.launch.py Executable file → Normal file
View File

@ -94,7 +94,7 @@ def generate_launch_description():
),
launch.actions.DeclareLaunchArgument(
name="gui",
default_value="true",
default_value="True",
description="Flag to enable joint_state_publisher_gui",
),
launch.actions.DeclareLaunchArgument(

View File

@ -1,70 +0,0 @@
import xacro
from pathlib import Path
from launch import LaunchDescription
import launch_ros
from launch_ros.actions import Node
from launch_ros.substitutions import FindPackageShare
def generate_launch_description():
pkg_share = Path(
launch_ros.substitutions.FindPackageShare(package="openarm_bimanual_description").find(
"openarm_bimanual_description"
)
)
default_model_path = pkg_share / "urdf/openarm_bimanual.urdf.xacro"
default_rviz_config_path = pkg_share / "rviz/robot_description.rviz"
# Parse xacro to get robot_description XML
doc = xacro.process_file(str(default_model_path))
robot_description_xml = doc.toxml()
# Directly read udrf file (no xacro)
# robot_description_xml = Path(default_model_path).read_text()
# Node to publish joint states from /joint_states topic or GUI
joint_state_publisher_node = Node(
package="joint_state_publisher",
executable="joint_state_publisher",
name="joint_state_publisher",
output="screen",
parameters=[{"use_sim_time": False}],
)
# Node to publish robot state TFs based on robot_description and /joint_states
robot_state_publisher_node = Node(
package="robot_state_publisher",
executable="robot_state_publisher",
name="robot_state_publisher",
output="screen",
parameters=[
{"robot_description": robot_description_xml,
"use_sim_time": False}
],
)
# RViz node to visualize the robot model and TF
rviz_node = Node(
package="rviz2",
executable="rviz2",
name="rviz2",
output="screen",
arguments=["-d", str(default_rviz_config_path)],
parameters=[{"use_sim_time": False}],
)
joint_state_merger_node = Node(
package="openarm_bimanual_description",
executable="joint_state_merger.py",
name="joint_state_merger",
output="screen",
parameters=[{"urdf_path": str(default_model_path)}],
)
return LaunchDescription([
#joint_state_publisher_node,
robot_state_publisher_node,
joint_state_merger_node,
rviz_node,
])

View File

@ -1,81 +0,0 @@
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
from urdf_parser_py.urdf import URDF
from pathlib import Path
import xacro
class JointStateMerger(Node):
def __init__(self):
super().__init__('joint_state_merger')
# Get URDF file parameter
urdf_path = self.declare_parameter('urdf_path', '').get_parameter_value().string_value
if not urdf_path or not Path(urdf_path).exists():
self.get_logger().error(f"Invalid or missing URDF path: {urdf_path}")
rclpy.shutdown()
return
# Parse URDF directly from file (not xacro)
# self.robot = URDF.from_xml_file(urdf_path)
# Process xacro and parse URDF
doc = xacro.process_file(urdf_path)
robot_description_xml = doc.toxml()
self.robot = URDF.from_xml_string(robot_description_xml)
# Initialize full joint position dict with zeros
self.joint_positions = {
j.name: 0.0 for j in self.robot.joints if j.type != 'fixed'
}
self.joint_names = list(self.joint_positions.keys())
# Store joint limits (min/max) for normalization
self.joint_limits = {
j.name: (j.limit.lower, j.limit.upper) if j.limit else (-3.14, 3.14)
for j in self.robot.joints if j.type != 'fixed'
}
# Publisher for merged joint states
self.joint_state_pub = self.create_publisher(JointState, '/joint_states', 10)
# Subscription to your partial updates — change topic & msg type as needed
self.partial_update_sub = self.create_subscription(
JointState, '/joint_states_partial', self.partial_update_callback, 10
)
# Optionally subscribe to existing /joint_states to keep current positions up to date
self.existing_joint_states_sub = self.create_subscription(
JointState, '/joint_states', self.existing_joint_states_callback, 10
)
# Timer to publish merged joint states regularly
self.timer = self.create_timer(0.05, self.publish_merged_joint_states)
def partial_update_callback(self, msg: JointState):
for name, pos in zip(msg.name, msg.position):
if name in self.joint_positions:
self.joint_positions[name] = pos
def existing_joint_states_callback(self, msg: JointState):
for name, pos in zip(msg.name, msg.position):
if name in self.joint_positions:
self.joint_positions[name] = pos
def publish_merged_joint_states(self):
msg = JointState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.name = self.joint_names
msg.position = [self.joint_positions[n] for n in self.joint_names]
self.joint_state_pub.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = JointStateMerger()
rclpy.spin(node)
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -17,9 +17,7 @@
#include <string>
#include "rclcpp/rclcpp.hpp"
#include "hardware_interface/resource_manager.hpp"
#include "ros2_control_test_assets/components_urdfs.hpp"
#include "ros2_control_test_assets/descriptions.hpp"
@ -126,9 +124,5 @@ class TestOpenArmHW : public ::testing::Test {
TEST_F(TestOpenArmHW, load_openarm_hardware_7dof) {
auto urdf = ros2_control_test_assets::urdf_head + openarm_hardware_7dof_ +
ros2_control_test_assets::urdf_tail;
ASSERT_NO_THROW({
auto logger = rclcpp::get_logger("test_logger");
auto clock = std::make_shared<rclcpp::Clock>(RCL_ROS_TIME);
hardware_interface::ResourceManager rm(urdf, clock, logger, true, 0);
});
ASSERT_NO_THROW(hardware_interface::ResourceManager rm(urdf));
}