# Copyright 2025 Enactic, Inc. # Copyright 2024 Stogl Robotics Consulting UG (haftungsbeschränkt) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import xacro from ament_index_python.packages import get_package_share_directory from launch import LaunchDescription, LaunchContext from launch.actions import DeclareLaunchArgument, RegisterEventHandler, TimerAction, OpaqueFunction from launch.conditions import IfCondition from launch.event_handlers import OnProcessExit from launch.substitutions import ( LaunchConfiguration, PathJoinSubstitution, ) from launch_ros.actions import Node from launch_ros.substitutions import FindPackageShare def namespace_from_context(context, arm_prefix): arm_prefix_str = context.perform_substitution(arm_prefix) if arm_prefix_str: return arm_prefix_str.strip('/') return None def generate_robot_description(context: LaunchContext, description_package, description_file, arm_type, use_fake_hardware, right_can_interface, left_can_interface, gravity_compensation_enabled, gravity_compensation_use_kdl, gravity_torque_mode, gravity_scale, gravity_tau_limit1, gravity_tau_limit2, gravity_tau_limit3, gravity_tau_limit4, gravity_tau_limit5, gravity_tau_limit6, gravity_tau_limit7, gripper_gravity_tau, gripper_gravity_tau_limit, kdl_urdf_path, kdl_base_link, kdl_tip_link): """Generate robot description using xacro processing.""" description_package_str = context.perform_substitution(description_package) description_file_str = context.perform_substitution(description_file) arm_type_str = context.perform_substitution(arm_type) use_fake_hardware_str = context.perform_substitution(use_fake_hardware) right_can_interface_str = context.perform_substitution(right_can_interface) left_can_interface_str = context.perform_substitution(left_can_interface) gravity_compensation_enabled_str = context.perform_substitution(gravity_compensation_enabled) gravity_compensation_use_kdl_str = context.perform_substitution(gravity_compensation_use_kdl) gravity_torque_mode_str = context.perform_substitution(gravity_torque_mode) gravity_scale_str = context.perform_substitution(gravity_scale) gravity_tau_limit1_str = context.perform_substitution(gravity_tau_limit1) gravity_tau_limit2_str = context.perform_substitution(gravity_tau_limit2) gravity_tau_limit3_str = context.perform_substitution(gravity_tau_limit3) gravity_tau_limit4_str = context.perform_substitution(gravity_tau_limit4) gravity_tau_limit5_str = context.perform_substitution(gravity_tau_limit5) gravity_tau_limit6_str = context.perform_substitution(gravity_tau_limit6) gravity_tau_limit7_str = context.perform_substitution(gravity_tau_limit7) gripper_gravity_tau_str = context.perform_substitution(gripper_gravity_tau) gripper_gravity_tau_limit_str = context.perform_substitution(gripper_gravity_tau_limit) kdl_urdf_path_str = context.perform_substitution(kdl_urdf_path) kdl_base_link_str = context.perform_substitution(kdl_base_link) kdl_tip_link_str = context.perform_substitution(kdl_tip_link) xacro_path = os.path.join( get_package_share_directory(description_package_str), "urdf", "robot", description_file_str ) # Process xacro with required arguments robot_description = xacro.process_file( xacro_path, mappings={ "arm_type": arm_type_str, "bimanual": "true", "use_fake_hardware": use_fake_hardware_str, "ros2_control": "true", "right_can_interface": right_can_interface_str, "left_can_interface": left_can_interface_str, "gravity_compensation_enabled": gravity_compensation_enabled_str, "gravity_compensation_use_kdl": gravity_compensation_use_kdl_str, "gravity_torque_mode": gravity_torque_mode_str, "gravity_scale": gravity_scale_str, "gravity_tau_limit1": gravity_tau_limit1_str, "gravity_tau_limit2": gravity_tau_limit2_str, "gravity_tau_limit3": gravity_tau_limit3_str, "gravity_tau_limit4": gravity_tau_limit4_str, "gravity_tau_limit5": gravity_tau_limit5_str, "gravity_tau_limit6": gravity_tau_limit6_str, "gravity_tau_limit7": gravity_tau_limit7_str, "gripper_gravity_tau": gripper_gravity_tau_str, "gripper_gravity_tau_limit": gripper_gravity_tau_limit_str, "kdl_urdf_path": kdl_urdf_path_str, "kdl_base_link": kdl_base_link_str, "kdl_tip_link": kdl_tip_link_str, } ).toprettyxml(indent=" ") with open(kdl_urdf_path_str, "w", encoding="utf-8") as urdf_file: urdf_file.write(robot_description) return robot_description def robot_nodes_spawner(context: LaunchContext, description_package, description_file, arm_type, use_fake_hardware, controllers_file, right_can_interface, left_can_interface, arm_prefix, gravity_compensation_enabled, gravity_compensation_use_kdl, gravity_torque_mode, gravity_scale, gravity_tau_limit1, gravity_tau_limit2, gravity_tau_limit3, gravity_tau_limit4, gravity_tau_limit5, gravity_tau_limit6, gravity_tau_limit7, gripper_gravity_tau, gripper_gravity_tau_limit, kdl_urdf_path, kdl_base_link, kdl_tip_link): """Spawn both robot state publisher and control nodes with shared robot description.""" namespace = namespace_from_context(context, arm_prefix) robot_description = generate_robot_description( context, description_package, description_file, arm_type, use_fake_hardware, right_can_interface, left_can_interface, gravity_compensation_enabled, gravity_compensation_use_kdl, gravity_torque_mode, gravity_scale, gravity_tau_limit1, gravity_tau_limit2, gravity_tau_limit3, gravity_tau_limit4, gravity_tau_limit5, gravity_tau_limit6, gravity_tau_limit7, gripper_gravity_tau, gripper_gravity_tau_limit, kdl_urdf_path, kdl_base_link, kdl_tip_link, ) controllers_file_str = context.perform_substitution(controllers_file) robot_description_param = {"robot_description": robot_description} if namespace: controllers_file_str = controllers_file_str.replace( "openarm_v10_bimanual_controllers.yaml", "openarm_v10_bimanual_controllers_namespaced.yaml" ) robot_state_pub_node = Node( package="robot_state_publisher", executable="robot_state_publisher", name="robot_state_publisher", output="screen", namespace=namespace, parameters=[robot_description_param], ) control_node = Node( package="controller_manager", executable="ros2_control_node", output="both", namespace=namespace, parameters=[robot_description_param, controllers_file_str], ) return [robot_state_pub_node, control_node] def controller_spawner(context: LaunchContext, robot_controller, arm_prefix): """Spawn controller based on robot_controller argument.""" namespace = namespace_from_context(context, arm_prefix) controller_manager_ref = f"/{namespace}/controller_manager" if namespace else "/controller_manager" robot_controller_str = context.perform_substitution(robot_controller) if robot_controller_str == "forward_position_controller": robot_controller_left = "left_forward_position_controller" robot_controller_right = "right_forward_position_controller" elif robot_controller_str == "forward_velocity_controller": robot_controller_left = "left_forward_velocity_controller" robot_controller_right = "right_forward_velocity_controller" elif robot_controller_str == "joint_trajectory_controller": robot_controller_left = "left_joint_trajectory_controller" robot_controller_right = "right_joint_trajectory_controller" else: raise ValueError(f"Unknown robot_controller: {robot_controller_str}") robot_controller_spawner = Node( package="controller_manager", executable="spawner", namespace=namespace, arguments=[robot_controller_left, robot_controller_right, "-c", controller_manager_ref], ) return [robot_controller_spawner] def motor_disable_service_spawner(context: LaunchContext, arm_prefix): """Spawn service node exposing a simple motor-disable Trigger interface.""" namespace = namespace_from_context(context, arm_prefix) controller_manager_ref = f"/{namespace}/controller_manager" if namespace else "/controller_manager" motor_disable_service_node = Node( package="openarm_bringup", executable="motor_disable_service.py", namespace=namespace, output="screen", parameters=[{ "controller_manager_name": controller_manager_ref, "hardware_components": [ "openarm_left_hardware_interface", "openarm_right_hardware_interface", ], "service_name": "disable_motors", }], ) return [motor_disable_service_node] def generate_launch_description(): """Generate launch description for OpenArm bimanual configuration.""" # Declare launch arguments declared_arguments = [ DeclareLaunchArgument( "description_package", default_value="openarm_description", description="Description package with robot URDF/xacro files.", ), DeclareLaunchArgument( "description_file", default_value="v10.urdf.xacro", description="URDF/XACRO description file with the robot.", ), DeclareLaunchArgument( "arm_type", default_value="v10", description="Type of arm (e.g., v10).", ), DeclareLaunchArgument( "use_fake_hardware", default_value="false", description="Use fake hardware instead of real hardware.", ), DeclareLaunchArgument( "robot_controller", default_value="joint_trajectory_controller", choices=["forward_position_controller", "forward_velocity_controller", "joint_trajectory_controller"], description="Robot controller to start.", ), DeclareLaunchArgument( "runtime_config_package", default_value="openarm_bringup", description="Package with the controller's configuration in config folder.", ), DeclareLaunchArgument( "arm_prefix", default_value="", description="Prefix for the arm for topic namespacing.", ), DeclareLaunchArgument( "right_can_interface", default_value="can0", description="CAN interface to use for the right arm.", ), DeclareLaunchArgument( "left_can_interface", default_value="can1", description="CAN interface to use for the left arm.", ), DeclareLaunchArgument( "controllers_file", default_value="openarm_v10_bimanual_controllers.yaml", description="Controllers file(s) to use. Can be a single file or comma-separated list of files.", ), DeclareLaunchArgument( "gravity_compensation_enabled", default_value="true", description="Enable gravity compensation in hardware interface.", ), DeclareLaunchArgument( "gravity_compensation_use_kdl", default_value="true", description="Use KDL dynamics solver for gravity compensation.", ), DeclareLaunchArgument( "gravity_torque_mode", default_value="true", description="If true, disables position stiffness and applies gravity in pure torque mode.", ), DeclareLaunchArgument( "gravity_scale", default_value="3.5", description="Scale multiplier for gravity torque from KDL.", ), DeclareLaunchArgument("gravity_tau_limit1", default_value="10.0"), DeclareLaunchArgument("gravity_tau_limit2", default_value="10.0"), DeclareLaunchArgument("gravity_tau_limit3", default_value="5.0"), DeclareLaunchArgument("gravity_tau_limit4", default_value="5.0"), DeclareLaunchArgument("gravity_tau_limit5", default_value="5.0"), DeclareLaunchArgument("gravity_tau_limit6", default_value="3.0"), DeclareLaunchArgument("gravity_tau_limit7", default_value="10.0"), DeclareLaunchArgument( "gripper_gravity_tau", default_value="-0.1", description="Constant gripper gravity compensation torque (Nm), sign follows motor positive direction.", ), DeclareLaunchArgument( "gripper_gravity_tau_limit", default_value="0.1", description="Absolute clamp limit (Nm) for gripper gravity compensation torque.", ), DeclareLaunchArgument( "kdl_urdf_path", default_value="/tmp/openarm_bimanual_runtime.urdf", description="Runtime URDF file path for KDL parser.", ), DeclareLaunchArgument( "kdl_base_link", default_value="openarm_body_link0", description="Base link used by KDL chain.", ), DeclareLaunchArgument( "kdl_tip_link", default_value="", description="Tip link override for KDL chain. Empty uses arm-specific link7.", ), DeclareLaunchArgument( "enable_joint_state_udp_broadcast", default_value="false", description="Enable UDP broadcast for arm joint position/velocity/torque.", ), DeclareLaunchArgument( "joint_state_udp_broadcast_ip", default_value="255.255.255.255", description="UDP target IP for joint state broadcast (broadcast or unicast).", ), DeclareLaunchArgument( "joint_state_udp_broadcast_port", default_value="10001", description="UDP target port for joint state broadcast.", ), DeclareLaunchArgument( "joint_state_udp_joint_regex", default_value="^openarm_(left|right)_(joint[1-7]|finger_joint1)$", description="Regex to select joint names included in UDP payload.", ), DeclareLaunchArgument( "joint_state_udp_payload_format", default_value="json", choices=["json", "binary"], description="UDP payload format for joint state broadcast.", ), ] # Initialize launch configurations description_package = LaunchConfiguration("description_package") description_file = LaunchConfiguration("description_file") arm_type = LaunchConfiguration("arm_type") use_fake_hardware = LaunchConfiguration("use_fake_hardware") robot_controller = LaunchConfiguration("robot_controller") runtime_config_package = LaunchConfiguration("runtime_config_package") controllers_file = LaunchConfiguration("controllers_file") rightcan_interface = LaunchConfiguration("right_can_interface") left_can_interface = LaunchConfiguration("left_can_interface") arm_prefix = LaunchConfiguration("arm_prefix") gravity_compensation_enabled = LaunchConfiguration("gravity_compensation_enabled") gravity_compensation_use_kdl = LaunchConfiguration("gravity_compensation_use_kdl") gravity_torque_mode = LaunchConfiguration("gravity_torque_mode") gravity_scale = LaunchConfiguration("gravity_scale") gravity_tau_limit1 = LaunchConfiguration("gravity_tau_limit1") gravity_tau_limit2 = LaunchConfiguration("gravity_tau_limit2") gravity_tau_limit3 = LaunchConfiguration("gravity_tau_limit3") gravity_tau_limit4 = LaunchConfiguration("gravity_tau_limit4") gravity_tau_limit5 = LaunchConfiguration("gravity_tau_limit5") gravity_tau_limit6 = LaunchConfiguration("gravity_tau_limit6") gravity_tau_limit7 = LaunchConfiguration("gravity_tau_limit7") gripper_gravity_tau = LaunchConfiguration("gripper_gravity_tau") gripper_gravity_tau_limit = LaunchConfiguration("gripper_gravity_tau_limit") kdl_urdf_path = LaunchConfiguration("kdl_urdf_path") kdl_base_link = LaunchConfiguration("kdl_base_link") kdl_tip_link = LaunchConfiguration("kdl_tip_link") enable_joint_state_udp_broadcast = LaunchConfiguration("enable_joint_state_udp_broadcast") joint_state_udp_broadcast_ip = LaunchConfiguration("joint_state_udp_broadcast_ip") joint_state_udp_broadcast_port = LaunchConfiguration("joint_state_udp_broadcast_port") joint_state_udp_joint_regex = LaunchConfiguration("joint_state_udp_joint_regex") joint_state_udp_payload_format = LaunchConfiguration("joint_state_udp_payload_format") controllers_file = PathJoinSubstitution( [FindPackageShare(runtime_config_package), "config", "v10_controllers", controllers_file] ) robot_nodes_spawner_func = OpaqueFunction( function=robot_nodes_spawner, args=[description_package, description_file, arm_type, use_fake_hardware, controllers_file, rightcan_interface, left_can_interface, arm_prefix, gravity_compensation_enabled, gravity_compensation_use_kdl, gravity_torque_mode, gravity_scale, gravity_tau_limit1, gravity_tau_limit2, gravity_tau_limit3, gravity_tau_limit4, gravity_tau_limit5, gravity_tau_limit6, gravity_tau_limit7, gripper_gravity_tau, gripper_gravity_tau_limit, kdl_urdf_path, kdl_base_link, kdl_tip_link] ) rviz_config_file = PathJoinSubstitution( [FindPackageShare(description_package), "rviz", "bimanual.rviz"] ) rviz_node = Node( package="rviz2", executable="rviz2", name="rviz2", output="log", arguments=["-d", rviz_config_file], ) # Joint state broadcaster spawner joint_state_broadcaster_spawner = OpaqueFunction( function=lambda context: [Node( package="controller_manager", executable="spawner", namespace=namespace_from_context(context, arm_prefix), arguments=["joint_state_broadcaster", "--controller-manager", f"/{namespace_from_context(context, arm_prefix)}/controller_manager" if namespace_from_context(context, arm_prefix) else "/controller_manager"], )] ) # Controller spawners controller_spawner_func = OpaqueFunction( function=controller_spawner, args=[robot_controller, arm_prefix] ) gripper_controller_spawner = OpaqueFunction( function=lambda context: [Node( package="controller_manager", executable="spawner", namespace=namespace_from_context(context, arm_prefix), arguments=["left_gripper_controller", "right_gripper_controller", "-c", f"/{namespace_from_context(context, arm_prefix)}/controller_manager" if namespace_from_context(context, arm_prefix) else "/controller_manager"], )] ) motor_disable_service_spawner_func = OpaqueFunction( function=motor_disable_service_spawner, args=[arm_prefix], ) joint_state_udp_broadcaster_node = OpaqueFunction( function=lambda context: [Node( package="openarm_bringup", executable="joint_state_udp_broadcaster.py", namespace=namespace_from_context(context, arm_prefix), output="screen", condition=IfCondition(enable_joint_state_udp_broadcast), parameters=[{ "input_topic": "joint_states", "broadcast_ip": joint_state_udp_broadcast_ip, "broadcast_port": joint_state_udp_broadcast_port, "joint_name_regex": joint_state_udp_joint_regex, "payload_format": joint_state_udp_payload_format, }], )] ) # Timing and sequencing LAUNCH_DELAY_SECONDS = 1.0 delayed_joint_state_broadcaster = TimerAction( period=LAUNCH_DELAY_SECONDS, actions=[joint_state_broadcaster_spawner], ) delayed_robot_controller = TimerAction( period=LAUNCH_DELAY_SECONDS, actions=[controller_spawner_func], ) delayed_gripper_controller = TimerAction( period=LAUNCH_DELAY_SECONDS, actions=[gripper_controller_spawner], ) return LaunchDescription( declared_arguments + [ robot_nodes_spawner_func, rviz_node, motor_disable_service_spawner_func, joint_state_udp_broadcaster_node, ] + [ delayed_joint_state_broadcaster, delayed_robot_controller, delayed_gripper_controller, ] )