#!/usr/bin/env python3 # Copyright 2026 Enactic, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from typing import List import rclpy from controller_manager_msgs.srv import SetHardwareComponentState from lifecycle_msgs.msg import State from rclpy.node import Node from std_srvs.srv import Trigger class MotorDisableServiceNode(Node): """Expose a simple Trigger service to disable arm motors via ros2_control.""" def __init__(self) -> None: super().__init__("motor_disable_service") self.declare_parameter("controller_manager_name", "/controller_manager") self.declare_parameter( "hardware_components", ["openarm_left_hardware_interface", "openarm_right_hardware_interface"], ) self.declare_parameter("service_name", "disable_motors") self.declare_parameter("service_timeout_sec", 3.0) controller_manager_name = ( self.get_parameter("controller_manager_name").get_parameter_value().string_value ) service_name = self.get_parameter("service_name").get_parameter_value().string_value self._component_names = self._read_component_names() self._service_timeout_sec = ( self.get_parameter("service_timeout_sec").get_parameter_value().double_value ) if not controller_manager_name.startswith("/"): controller_manager_name = "/" + controller_manager_name controller_manager_name = controller_manager_name.rstrip("/") set_state_service = f"{controller_manager_name}/set_hardware_component_state" self._set_state_client = self.create_client( SetHardwareComponentState, set_state_service, ) self.create_service(Trigger, service_name, self._handle_disable_request) self.get_logger().info( f"Motor disable service ready: service='{service_name}', " f"cm='{controller_manager_name}', components={self._component_names}" ) def _read_component_names(self) -> List[str]: raw_value = ( self.get_parameter("hardware_components") .get_parameter_value() .string_array_value ) component_names = [name.strip() for name in raw_value if name.strip()] return component_names def _handle_disable_request( self, _request: Trigger.Request, response: Trigger.Response ) -> Trigger.Response: if not self._component_names: response.success = False response.message = "No hardware_components configured." return response if not self._set_state_client.wait_for_service(timeout_sec=self._service_timeout_sec): response.success = False response.message = "controller_manager set_hardware_component_state service not available." return response failed_components = [] for component_name in self._component_names: if not self._set_component_inactive(component_name): failed_components.append(component_name) if failed_components: response.success = False response.message = "Failed to disable components: " + ", ".join(failed_components) return response response.success = True response.message = "Motors disabled for: " + ", ".join(self._component_names) return response def _set_component_inactive(self, component_name: str) -> bool: request = SetHardwareComponentState.Request() request.name = component_name request.target_state.id = State.PRIMARY_STATE_INACTIVE request.target_state.label = "inactive" future = self._set_state_client.call_async(request) rclpy.spin_until_future_complete(self, future, timeout_sec=self._service_timeout_sec) if not future.done() or future.result() is None: self.get_logger().error( f"Timeout or empty response when disabling hardware component '{component_name}'." ) return False result = future.result() if not result.ok: self.get_logger().error( f"controller_manager rejected disable request for '{component_name}' " f"(state={result.state.id}:{result.state.label})." ) return False self.get_logger().info( f"Disabled component '{component_name}' -> " f"state={result.state.id}:{result.state.label}" ) return True def main(args=None) -> None: rclpy.init(args=args) node = MotorDisableServiceNode() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() if __name__ == "__main__": main()