diff --git a/openarm_bringup/scripts/udp_recv_binary.py b/openarm_bringup/scripts/udp_recv_binary.py new file mode 100755 index 0000000..8fcfcd7 --- /dev/null +++ b/openarm_bringup/scripts/udp_recv_binary.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 + +import argparse +import socket +import struct +import sys +from typing import Dict, List, Tuple + + +MAGIC = b"OAJS" +VERSION = 1 + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Receive and decode OpenArm UDP binary joint-state packets (OAJS v1)." + ) + parser.add_argument("--port", type=int, default=10001, help="Listen UDP port") + parser.add_argument( + "--bind-ip", + default="0.0.0.0", + help="Bind IP address (default: 0.0.0.0)", + ) + parser.add_argument( + "--raw-size", + action="store_true", + help="Print packet byte size in output", + ) + return parser.parse_args() + + +def decode_packet(data: bytes) -> Dict[str, object]: + header_size = struct.calcsize("<4sB H q I") + if len(data) < header_size: + raise ValueError("packet too short for header") + + magic, version, joint_count, sec, nanosec = struct.unpack_from("<4sB H q I", data, 0) + + if magic != MAGIC: + raise ValueError(f"bad magic: {magic!r}") + if version != VERSION: + raise ValueError(f"unsupported version: {version}") + + offset = header_size + joints: List[Tuple[str, float, float, float]] = [] + + for _ in range(joint_count): + if offset + 1 > len(data): + raise ValueError("truncated packet at name length") + + name_len = data[offset] + offset += 1 + + if offset + name_len > len(data): + raise ValueError("truncated packet at name bytes") + + name_bytes = data[offset:offset + name_len] + offset += name_len + + if offset + 12 > len(data): + raise ValueError("truncated packet at float payload") + + position, velocity, torque = struct.unpack_from(" None: + prefix = f"v{packet['version']} stamp={packet['sec']}.{packet['nanosec']}" + if show_size: + prefix += f" bytes={size}" + + joints = packet["joints"] + print(f"{prefix} joints={len(joints)}") + for name, position, velocity, torque in joints: + print(f" {name:28s} pos={position:>12.6f} vel={velocity:>12.6f} tau={torque:>12.6f}") + + +def main() -> int: + args = parse_args() + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind((args.bind_ip, args.port)) + + print(f"Listening binary UDP on {args.bind_ip}:{args.port}") + + try: + while True: + data, addr = sock.recvfrom(65535) + print(f"\nfrom {addr[0]}:{addr[1]}") + try: + packet = decode_packet(data) + except ValueError as exc: + print(f"decode error: {exc}") + continue + + print_packet(packet, len(data), args.raw_size) + except KeyboardInterrupt: + print("\nStopped.") + finally: + sock.close() + + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/openarm_bringup/scripts/udp_recv_json.py b/openarm_bringup/scripts/udp_recv_json.py new file mode 100755 index 0000000..0dde5b9 --- /dev/null +++ b/openarm_bringup/scripts/udp_recv_json.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python3 + +import argparse +import json +import socket +import sys +from typing import Any, Dict + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Receive and print OpenArm UDP JSON joint-state packets." + ) + parser.add_argument("--port", type=int, default=10001, help="Listen UDP port") + parser.add_argument( + "--bind-ip", + default="0.0.0.0", + help="Bind IP address (default: 0.0.0.0)", + ) + parser.add_argument( + "--raw", + action="store_true", + help="Print raw JSON text instead of formatted joint table", + ) + return parser.parse_args() + + +def print_packet(packet: Dict[str, Any]) -> None: + protocol = packet.get("protocol", "unknown") + stamp = packet.get("stamp", {}) + sec = stamp.get("sec", "?") + nanosec = stamp.get("nanosec", "?") + joints = packet.get("joints", []) + + print(f"protocol={protocol} stamp={sec}.{nanosec} joints={len(joints)}") + for joint in joints: + name = joint.get("name", "?") + pos = joint.get("position", None) + vel = joint.get("velocity", None) + tor = joint.get("torque", None) + print(f" {name:28s} pos={pos!s:>12s} vel={vel!s:>12s} tau={tor!s:>12s}") + + +def main() -> int: + args = parse_args() + + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.bind((args.bind_ip, args.port)) + + print(f"Listening JSON UDP on {args.bind_ip}:{args.port}") + + try: + while True: + data, addr = sock.recvfrom(65535) + text = data.decode("utf-8", errors="replace") + try: + packet = json.loads(text) + except json.JSONDecodeError as exc: + print(f"[{addr[0]}:{addr[1]}] invalid JSON: {exc}") + continue + + print(f"\nfrom {addr[0]}:{addr[1]}") + if args.raw: + print(text) + else: + print_packet(packet) + except KeyboardInterrupt: + print("\nStopped.") + finally: + sock.close() + + return 0 + + +if __name__ == "__main__": + sys.exit(main())