From 9cf00ada7ed4b600e5a2ace3c0ba5b14fb4ad4e3 Mon Sep 17 00:00:00 2001
From: shen <664376944@qq.com>
Date: Mon, 30 Mar 2026 14:37:31 +0800
Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E7=94=B5=E6=9C=BA=E5=A4=B1?=
=?UTF-8?q?=E8=83=BD?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
openarm_bringup/CMakeLists.txt | 5 +
.../launch/openarm.bimanual.launch.py | 29 ++++
openarm_bringup/package.xml | 4 +
.../scripts/motor_disable_service.py | 146 ++++++++++++++++++
4 files changed, 184 insertions(+)
create mode 100755 openarm_bringup/scripts/motor_disable_service.py
diff --git a/openarm_bringup/CMakeLists.txt b/openarm_bringup/CMakeLists.txt
index 72ac5c0..fc904e9 100644
--- a/openarm_bringup/CMakeLists.txt
+++ b/openarm_bringup/CMakeLists.txt
@@ -41,4 +41,9 @@ install(DIRECTORY launch config
DESTINATION share/${PROJECT_NAME}
)
+install(PROGRAMS
+ scripts/motor_disable_service.py
+ DESTINATION lib/${PROJECT_NAME}
+)
+
ament_package()
diff --git a/openarm_bringup/launch/openarm.bimanual.launch.py b/openarm_bringup/launch/openarm.bimanual.launch.py
index 96ee60e..fa8297e 100644
--- a/openarm_bringup/launch/openarm.bimanual.launch.py
+++ b/openarm_bringup/launch/openarm.bimanual.launch.py
@@ -183,6 +183,29 @@ def controller_spawner(context: LaunchContext, robot_controller, arm_prefix):
return [robot_controller_spawner]
+def motor_disable_service_spawner(context: LaunchContext, arm_prefix):
+ """Spawn service node exposing a simple motor-disable Trigger interface."""
+ namespace = namespace_from_context(context, arm_prefix)
+ controller_manager_ref = f"/{namespace}/controller_manager" if namespace else "/controller_manager"
+
+ motor_disable_service_node = Node(
+ package="openarm_bringup",
+ executable="motor_disable_service.py",
+ namespace=namespace,
+ output="screen",
+ parameters=[{
+ "controller_manager_name": controller_manager_ref,
+ "hardware_components": [
+ "openarm_left_hardware_interface",
+ "openarm_right_hardware_interface",
+ ],
+ "service_name": "disable_motors",
+ }],
+ )
+
+ return [motor_disable_service_node]
+
+
def generate_launch_description():
"""Generate launch description for OpenArm bimanual configuration."""
@@ -369,6 +392,11 @@ def generate_launch_description():
)]
)
+ motor_disable_service_spawner_func = OpaqueFunction(
+ function=motor_disable_service_spawner,
+ args=[arm_prefix],
+ )
+
# Timing and sequencing
LAUNCH_DELAY_SECONDS = 1.0
delayed_joint_state_broadcaster = TimerAction(
@@ -389,6 +417,7 @@ def generate_launch_description():
declared_arguments + [
robot_nodes_spawner_func,
rviz_node,
+ motor_disable_service_spawner_func,
] +
[
delayed_joint_state_broadcaster,
diff --git a/openarm_bringup/package.xml b/openarm_bringup/package.xml
index 773de5a..d344058 100644
--- a/openarm_bringup/package.xml
+++ b/openarm_bringup/package.xml
@@ -26,6 +26,10 @@
ros2_controllers
controller_manager
+ rclpy
+ std_srvs
+ controller_manager_msgs
+ lifecycle_msgs
ament_lint_auto
ament_lint_common
diff --git a/openarm_bringup/scripts/motor_disable_service.py b/openarm_bringup/scripts/motor_disable_service.py
new file mode 100755
index 0000000..959b96b
--- /dev/null
+++ b/openarm_bringup/scripts/motor_disable_service.py
@@ -0,0 +1,146 @@
+#!/usr/bin/env python3
+
+# Copyright 2026 Enactic, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List
+
+import rclpy
+from controller_manager_msgs.srv import SetHardwareComponentState
+from lifecycle_msgs.msg import State
+from rclpy.node import Node
+from std_srvs.srv import Trigger
+
+
+class MotorDisableServiceNode(Node):
+ """Expose a simple Trigger service to disable arm motors via ros2_control."""
+
+ def __init__(self) -> None:
+ super().__init__("motor_disable_service")
+
+ self.declare_parameter("controller_manager_name", "/controller_manager")
+ self.declare_parameter(
+ "hardware_components",
+ ["openarm_left_hardware_interface", "openarm_right_hardware_interface"],
+ )
+ self.declare_parameter("service_name", "disable_motors")
+ self.declare_parameter("service_timeout_sec", 3.0)
+
+ controller_manager_name = (
+ self.get_parameter("controller_manager_name").get_parameter_value().string_value
+ )
+ service_name = self.get_parameter("service_name").get_parameter_value().string_value
+
+ self._component_names = self._read_component_names()
+ self._service_timeout_sec = (
+ self.get_parameter("service_timeout_sec").get_parameter_value().double_value
+ )
+
+ if not controller_manager_name.startswith("/"):
+ controller_manager_name = "/" + controller_manager_name
+ controller_manager_name = controller_manager_name.rstrip("/")
+
+ set_state_service = f"{controller_manager_name}/set_hardware_component_state"
+ self._set_state_client = self.create_client(
+ SetHardwareComponentState,
+ set_state_service,
+ )
+
+ self.create_service(Trigger, service_name, self._handle_disable_request)
+
+ self.get_logger().info(
+ f"Motor disable service ready: service='{service_name}', "
+ f"cm='{controller_manager_name}', components={self._component_names}"
+ )
+
+ def _read_component_names(self) -> List[str]:
+ raw_value = (
+ self.get_parameter("hardware_components")
+ .get_parameter_value()
+ .string_array_value
+ )
+ component_names = [name.strip() for name in raw_value if name.strip()]
+ return component_names
+
+ def _handle_disable_request(
+ self, _request: Trigger.Request, response: Trigger.Response
+ ) -> Trigger.Response:
+ if not self._component_names:
+ response.success = False
+ response.message = "No hardware_components configured."
+ return response
+
+ if not self._set_state_client.wait_for_service(timeout_sec=self._service_timeout_sec):
+ response.success = False
+ response.message = "controller_manager set_hardware_component_state service not available."
+ return response
+
+ failed_components = []
+ for component_name in self._component_names:
+ if not self._set_component_inactive(component_name):
+ failed_components.append(component_name)
+
+ if failed_components:
+ response.success = False
+ response.message = "Failed to disable components: " + ", ".join(failed_components)
+ return response
+
+ response.success = True
+ response.message = "Motors disabled for: " + ", ".join(self._component_names)
+ return response
+
+ def _set_component_inactive(self, component_name: str) -> bool:
+ request = SetHardwareComponentState.Request()
+ request.name = component_name
+ request.target_state.id = State.PRIMARY_STATE_INACTIVE
+ request.target_state.label = "inactive"
+
+ future = self._set_state_client.call_async(request)
+ rclpy.spin_until_future_complete(self, future, timeout_sec=self._service_timeout_sec)
+
+ if not future.done() or future.result() is None:
+ self.get_logger().error(
+ f"Timeout or empty response when disabling hardware component '{component_name}'."
+ )
+ return False
+
+ result = future.result()
+ if not result.ok:
+ self.get_logger().error(
+ f"controller_manager rejected disable request for '{component_name}' "
+ f"(state={result.state.id}:{result.state.label})."
+ )
+ return False
+
+ self.get_logger().info(
+ f"Disabled component '{component_name}' -> "
+ f"state={result.state.id}:{result.state.label}"
+ )
+ return True
+
+
+def main(args=None) -> None:
+ rclpy.init(args=args)
+ node = MotorDisableServiceNode()
+ try:
+ rclpy.spin(node)
+ except KeyboardInterrupt:
+ pass
+ finally:
+ node.destroy_node()
+ rclpy.shutdown()
+
+
+if __name__ == "__main__":
+ main()