增加各个关节状态通过udp发布

This commit is contained in:
shen 2026-03-30 15:54:20 +08:00
parent 5a1db3c245
commit 6f163bff00
4 changed files with 232 additions and 0 deletions

View File

@ -43,6 +43,7 @@ install(DIRECTORY launch config
install(PROGRAMS install(PROGRAMS
scripts/motor_disable_service.py scripts/motor_disable_service.py
scripts/joint_state_udp_broadcaster.py
DESTINATION lib/${PROJECT_NAME} DESTINATION lib/${PROJECT_NAME}
) )

View File

@ -20,6 +20,7 @@ from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription, LaunchContext from launch import LaunchDescription, LaunchContext
from launch.actions import DeclareLaunchArgument, RegisterEventHandler, TimerAction, OpaqueFunction from launch.actions import DeclareLaunchArgument, RegisterEventHandler, TimerAction, OpaqueFunction
from launch.conditions import IfCondition
from launch.event_handlers import OnProcessExit from launch.event_handlers import OnProcessExit
from launch.substitutions import ( from launch.substitutions import (
LaunchConfiguration, LaunchConfiguration,
@ -306,6 +307,32 @@ def generate_launch_description():
default_value="", default_value="",
description="Tip link override for KDL chain. Empty uses arm-specific link7.", description="Tip link override for KDL chain. Empty uses arm-specific link7.",
), ),
DeclareLaunchArgument(
"enable_joint_state_udp_broadcast",
default_value="false",
description="Enable UDP broadcast for arm joint position/velocity/torque.",
),
DeclareLaunchArgument(
"joint_state_udp_broadcast_ip",
default_value="255.255.255.255",
description="UDP target IP for joint state broadcast (broadcast or unicast).",
),
DeclareLaunchArgument(
"joint_state_udp_broadcast_port",
default_value="10001",
description="UDP target port for joint state broadcast.",
),
DeclareLaunchArgument(
"joint_state_udp_joint_regex",
default_value="^openarm_(left|right)_(joint[1-7]|finger_joint1)$",
description="Regex to select joint names included in UDP payload.",
),
DeclareLaunchArgument(
"joint_state_udp_payload_format",
default_value="json",
choices=["json", "binary"],
description="UDP payload format for joint state broadcast.",
),
] ]
# Initialize launch configurations # Initialize launch configurations
@ -333,6 +360,11 @@ def generate_launch_description():
kdl_urdf_path = LaunchConfiguration("kdl_urdf_path") kdl_urdf_path = LaunchConfiguration("kdl_urdf_path")
kdl_base_link = LaunchConfiguration("kdl_base_link") kdl_base_link = LaunchConfiguration("kdl_base_link")
kdl_tip_link = LaunchConfiguration("kdl_tip_link") kdl_tip_link = LaunchConfiguration("kdl_tip_link")
enable_joint_state_udp_broadcast = LaunchConfiguration("enable_joint_state_udp_broadcast")
joint_state_udp_broadcast_ip = LaunchConfiguration("joint_state_udp_broadcast_ip")
joint_state_udp_broadcast_port = LaunchConfiguration("joint_state_udp_broadcast_port")
joint_state_udp_joint_regex = LaunchConfiguration("joint_state_udp_joint_regex")
joint_state_udp_payload_format = LaunchConfiguration("joint_state_udp_payload_format")
controllers_file = PathJoinSubstitution( controllers_file = PathJoinSubstitution(
[FindPackageShare(runtime_config_package), "config", [FindPackageShare(runtime_config_package), "config",
@ -397,6 +429,23 @@ def generate_launch_description():
args=[arm_prefix], args=[arm_prefix],
) )
joint_state_udp_broadcaster_node = OpaqueFunction(
function=lambda context: [Node(
package="openarm_bringup",
executable="joint_state_udp_broadcaster.py",
namespace=namespace_from_context(context, arm_prefix),
output="screen",
condition=IfCondition(enable_joint_state_udp_broadcast),
parameters=[{
"input_topic": "joint_states",
"broadcast_ip": joint_state_udp_broadcast_ip,
"broadcast_port": joint_state_udp_broadcast_port,
"joint_name_regex": joint_state_udp_joint_regex,
"payload_format": joint_state_udp_payload_format,
}],
)]
)
# Timing and sequencing # Timing and sequencing
LAUNCH_DELAY_SECONDS = 1.0 LAUNCH_DELAY_SECONDS = 1.0
delayed_joint_state_broadcaster = TimerAction( delayed_joint_state_broadcaster = TimerAction(
@ -418,6 +467,7 @@ def generate_launch_description():
robot_nodes_spawner_func, robot_nodes_spawner_func,
rviz_node, rviz_node,
motor_disable_service_spawner_func, motor_disable_service_spawner_func,
joint_state_udp_broadcaster_node,
] + ] +
[ [
delayed_joint_state_broadcaster, delayed_joint_state_broadcaster,

View File

@ -27,6 +27,7 @@
<exec_depend>ros2_controllers</exec_depend> <exec_depend>ros2_controllers</exec_depend>
<exec_depend>controller_manager</exec_depend> <exec_depend>controller_manager</exec_depend>
<exec_depend>rclpy</exec_depend> <exec_depend>rclpy</exec_depend>
<exec_depend>sensor_msgs</exec_depend>
<exec_depend>std_srvs</exec_depend> <exec_depend>std_srvs</exec_depend>
<exec_depend>controller_manager_msgs</exec_depend> <exec_depend>controller_manager_msgs</exec_depend>
<exec_depend>lifecycle_msgs</exec_depend> <exec_depend>lifecycle_msgs</exec_depend>

View File

@ -0,0 +1,180 @@
#!/usr/bin/env python3
# Copyright 2026 Enactic, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import math
import re
import socket
import struct
from typing import Any, Dict, List, Optional
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
class JointStateUdpBroadcaster(Node):
"""Broadcast selected joint states over UDP as JSON."""
def __init__(self) -> None:
super().__init__("joint_state_udp_broadcaster")
self.declare_parameter("input_topic", "joint_states")
self.declare_parameter(
"joint_name_regex",
r"^openarm_(left|right)_(joint[1-7]|finger_joint1)$",
)
self.declare_parameter("broadcast_ip", "255.255.255.255")
self.declare_parameter("broadcast_port", 10001)
self.declare_parameter("payload_format", "json")
input_topic = self.get_parameter("input_topic").get_parameter_value().string_value
regex_str = self.get_parameter("joint_name_regex").get_parameter_value().string_value
broadcast_ip = self.get_parameter("broadcast_ip").get_parameter_value().string_value
broadcast_port = self.get_parameter("broadcast_port").get_parameter_value().integer_value
payload_format = self.get_parameter("payload_format").get_parameter_value().string_value
self._payload_format = payload_format.lower().strip()
if self._payload_format not in ("json", "binary"):
raise ValueError("Parameter 'payload_format' must be 'json' or 'binary'.")
self._joint_name_pattern = re.compile(regex_str)
self._udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
self._target = (broadcast_ip, int(broadcast_port))
self._subscription = self.create_subscription(
JointState,
input_topic,
self._joint_state_callback,
50,
)
self.get_logger().info(
"UDP joint state broadcast started: "
f"topic='{input_topic}', target={broadcast_ip}:{broadcast_port}, "
f"regex='{regex_str}', format='{self._payload_format}'"
)
def _joint_state_callback(self, msg: JointState) -> None:
selected = self._select_joint_states(msg)
if not selected:
return
if self._payload_format == "binary":
data = self._build_binary_packet(msg, selected)
else:
payload = {
"protocol": "openarm_joint_state_json_v1",
"stamp": {
"sec": int(msg.header.stamp.sec),
"nanosec": int(msg.header.stamp.nanosec),
},
"frame_id": msg.header.frame_id,
"joint_count": len(selected),
"joints": selected,
}
data = json.dumps(payload, separators=(",", ":")).encode("utf-8")
try:
self._udp_socket.sendto(data, self._target)
except OSError as exc:
self.get_logger().error(f"Failed to send UDP packet: {exc}")
def _select_joint_states(self, msg: JointState) -> List[Dict[str, Any]]:
out: List[Dict[str, Any]] = []
for index, name in enumerate(msg.name):
if not self._joint_name_pattern.match(name):
continue
out.append(
{
"name": name,
"position": self._array_value(msg.position, index),
"velocity": self._array_value(msg.velocity, index),
"torque": self._array_value(msg.effort, index),
}
)
return out
@staticmethod
def _array_value(values: List[float], index: int) -> Optional[float]:
if index < len(values):
return float(values[index])
return None
def _build_binary_packet(
self,
msg: JointState,
selected: List[Dict[str, Any]],
) -> bytes:
# Packet layout (little-endian):
# magic(4='OAJS') version(u8=1) joint_count(u16) sec(i64) nanosec(u32)
# repeated joint entries:
# name_len(u8) name(bytes) position(f32) velocity(f32) torque(f32)
packet = bytearray()
packet.extend(b"OAJS")
packet.extend(struct.pack("<B", 1))
packet.extend(struct.pack("<H", len(selected)))
packet.extend(struct.pack("<q", int(msg.header.stamp.sec)))
packet.extend(struct.pack("<I", int(msg.header.stamp.nanosec)))
for joint in selected:
name_bytes = str(joint["name"]).encode("utf-8")
if len(name_bytes) > 255:
name_bytes = name_bytes[:255]
packet.extend(struct.pack("<B", len(name_bytes)))
packet.extend(name_bytes)
packet.extend(struct.pack("<f", self._to_f32(joint["position"])))
packet.extend(struct.pack("<f", self._to_f32(joint["velocity"])))
packet.extend(struct.pack("<f", self._to_f32(joint["torque"])))
return bytes(packet)
@staticmethod
def _to_f32(value: Optional[float]) -> float:
if value is None:
return float("nan")
if isinstance(value, float) and math.isnan(value):
return value
return float(value)
def destroy_node(self) -> bool:
try:
self._udp_socket.close()
except OSError:
pass
return super().destroy_node()
def main(args=None) -> None:
rclpy.init(args=args)
node = JointStateUdpBroadcaster()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()