#!/usr/bin/env python3 import argparse import json import socket import sys from typing import Any, Dict def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Receive and print OpenArm UDP JSON joint-state packets." ) parser.add_argument("--port", type=int, default=10001, help="Listen UDP port") parser.add_argument( "--bind-ip", default="0.0.0.0", help="Bind IP address (default: 0.0.0.0)", ) parser.add_argument( "--raw", action="store_true", help="Print raw JSON text instead of formatted joint table", ) return parser.parse_args() def print_packet(packet: Dict[str, Any]) -> None: protocol = packet.get("protocol", "unknown") stamp = packet.get("stamp", {}) sec = stamp.get("sec", "?") nanosec = stamp.get("nanosec", "?") if "joints" in packet: joints = packet.get("joints", []) left_arm = [j for j in joints if "_left_" in str(j.get("name", ""))] right_arm = [j for j in joints if "_right_" in str(j.get("name", ""))] other = [j for j in joints if j not in left_arm and j not in right_arm] else: left_arm = packet.get("left_arm", []) right_arm = packet.get("right_arm", []) other = packet.get("other", []) joints = left_arm + right_arm + other print(f"protocol={protocol} stamp={sec}.{nanosec} joints={len(joints)}") for group_name, group_joints in (("left_arm", left_arm), ("right_arm", right_arm), ("other", other)): if not group_joints: continue print(f"{group_name}:") for joint in group_joints: name = joint.get("name", "?") pos = joint.get("position", None) vel = joint.get("velocity", None) tor = joint.get("torque", None) print(f" {name:28s} pos={pos!s:>12s} vel={vel!s:>12s} tau={tor!s:>12s}") def main() -> int: args = parse_args() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.bind((args.bind_ip, args.port)) print(f"Listening JSON UDP on {args.bind_ip}:{args.port}") try: while True: data, addr = sock.recvfrom(65535) text = data.decode("utf-8", errors="replace") try: packet = json.loads(text) except json.JSONDecodeError as exc: print(f"[{addr[0]}:{addr[1]}] invalid JSON: {exc}") continue print(f"\nfrom {addr[0]}:{addr[1]}") if args.raw: print(text) else: print_packet(packet) except KeyboardInterrupt: print("\nStopped.") finally: sock.close() return 0 if __name__ == "__main__": sys.exit(main())