#!/usr/bin/env python3
"""Receive OpenArm UDP JSON joint-state packets and print them to stdout."""

import argparse
import json
import socket
import sys
from typing import Any, Dict, Optional, Sequence

# Buffer size handed to ``recvfrom``; a datagram longer than this is truncated.
_RECV_BUFFER_SIZE = 65535


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the command-line options of the receiver.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is the behavior of the original
            zero-argument call.

    Returns:
        Namespace with ``port`` (int), ``bind_ip`` (str) and ``raw`` (bool).
    """
    parser = argparse.ArgumentParser(
        description="Receive and print OpenArm UDP JSON joint-state packets."
    )
    parser.add_argument("--port", type=int, default=10001, help="Listen UDP port")
    parser.add_argument(
        "--bind-ip",
        default="0.0.0.0",
        help="Bind IP address (default: 0.0.0.0)",
    )
    parser.add_argument(
        "--raw",
        action="store_true",
        help="Print raw JSON text instead of formatted joint table",
    )
    return parser.parse_args(argv)


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _format_joint(joint: Any) -> str:
    """Render one entry of the ``joints`` array as a single table row."""
    if not isinstance(joint, dict):
        # The sender is untrusted; report the odd entry instead of crashing.
        return f" <malformed joint entry: {joint!r}>"
    name = joint.get("name", "?")
    pos = joint.get("position", None)
    vel = joint.get("velocity", None)
    tor = joint.get("torque", None)
    # ``!s`` on the name keeps the ``28s`` spec valid for non-string values
    # (e.g. JSON null or a number), which would otherwise raise.
    return f" {name!s:28s} pos={pos!s:>12s} vel={vel!s:>12s} tau={tor!s:>12s}"


def print_packet(packet: Any) -> None:
    """Print a decoded joint-state packet as a header line plus one row per joint.

    The header shows the ``protocol`` value, the ``stamp`` as ``sec.nanosec``
    and the number of joints; missing fields are shown as ``unknown`` / ``?``.
    Well-formed packets print exactly what they always did. Unexpected
    shapes are tolerated rather than raised on, because the payload comes
    straight off the network:

    * a payload that is not a JSON object is reported on one line;
    * a ``stamp`` that is not an object is treated as absent;
    * a ``joints`` value that is not an array is treated as empty;
    * a joint entry that is not an object is printed as a malformed row.

    Args:
        packet: The value produced by ``json.loads`` for one datagram.
    """
    if not isinstance(packet, dict):
        print(
            "unexpected payload: expected JSON object, "
            f"got {type(packet).__name__}"
        )
        return

    protocol = packet.get("protocol", "unknown")
    stamp = _as_dict(packet.get("stamp", {}))
    sec = stamp.get("sec", "?")
    nanosec = stamp.get("nanosec", "?")
    joints = packet.get("joints", [])
    if not isinstance(joints, list):
        joints = []

    print(f"protocol={protocol} stamp={sec}.{nanosec} joints={len(joints)}")
    for joint in joints:
        print(_format_joint(joint))


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Bind a UDP socket and print every JSON datagram until interrupted.

    Args:
        argv: Optional argument list forwarded to :func:`parse_args`;
            ``None`` uses the process command line.

    Returns:
        ``0`` after the loop is stopped with Ctrl-C.

    Raises:
        OSError: If the socket cannot be bound to the requested address.
    """
    args = parse_args(argv)
    # The context manager closes the socket on every exit path, including a
    # failing ``bind``.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind((args.bind_ip, args.port))
        print(f"Listening JSON UDP on {args.bind_ip}:{args.port}")
        try:
            while True:
                data, addr = sock.recvfrom(_RECV_BUFFER_SIZE)
                text = data.decode("utf-8", errors="replace")
                try:
                    packet = json.loads(text)
                except (json.JSONDecodeError, RecursionError) as exc:
                    # RecursionError: pathologically deep nesting must not
                    # take the receiver down either.
                    print(f"[{addr[0]}:{addr[1]}] invalid JSON: {exc}")
                    continue
                print(f"\nfrom {addr[0]}:{addr[1]}")
                if args.raw:
                    print(text)
                else:
                    print_packet(packet)
        except KeyboardInterrupt:
            print("\nStopped.")
    return 0


if __name__ == "__main__":
    sys.exit(main())