77 lines
2.1 KiB
Python
77 lines
2.1 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
|
||
|
|
import argparse
import json
import socket
import sys
from typing import Any, Dict, Optional, Sequence
|
||
|
|
|
||
|
|
|
||
|
|
def _port_number(value: str) -> int:
    """Parse a command-line port value, rejecting numbers outside 0-65535."""
    try:
        port = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid port: {value!r}") from None
    if not 0 <= port <= 65535:
        raise argparse.ArgumentTypeError(f"port must be in 0-65535, got {port}")
    return port


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the receiver's command-line options.

    Args:
        argv: Argument strings to parse. When ``None`` (the default) argparse
            falls back to ``sys.argv[1:]``, so existing ``parse_args()`` calls
            behave as before.

    Returns:
        A namespace with ``port`` (int, default 10001), ``bind_ip`` (str,
        default "0.0.0.0") and ``raw`` (bool, default False).

    Raises:
        SystemExit: Raised by argparse on invalid options, including a port
            outside 0-65535.
    """
    parser = argparse.ArgumentParser(
        description="Receive and print OpenArm UDP JSON joint-state packets."
    )
    # Validate the range here so a bad port is reported as a usage error
    # instead of surfacing later as an exception from socket.bind().
    parser.add_argument(
        "--port", type=_port_number, default=10001, help="Listen UDP port"
    )
    parser.add_argument(
        "--bind-ip",
        default="0.0.0.0",
        help="Bind IP address (default: 0.0.0.0)",
    )
    parser.add_argument(
        "--raw",
        action="store_true",
        help="Print raw JSON text instead of formatted joint table",
    )
    return parser.parse_args(argv)
|
||
|
|
|
||
|
|
|
||
|
|
def print_packet(packet: Dict[str, Any]) -> None:
    """Print a summary line for a decoded packet, then one row per joint.

    The packet is read with these keys, all optional: ``protocol``,
    ``stamp`` (an object with ``sec`` and ``nanosec``) and ``joints`` (a list
    of objects with ``name``, ``position``, ``velocity`` and ``torque``).
    Missing values are shown as ``unknown``, ``?`` or ``None``.

    The data arrives from the network, so unexpected shapes are tolerated
    rather than raised: a non-object packet is reported and skipped, a
    non-object ``stamp`` or non-list ``joints`` is treated as absent, and a
    non-object joint entry is reported on its own row.

    Args:
        packet: The value produced by decoding one JSON datagram.
    """
    if not isinstance(packet, dict):
        print(
            f"unexpected payload type {type(packet).__name__}; "
            "expected a JSON object"
        )
        return

    protocol = packet.get("protocol", "unknown")

    stamp = packet.get("stamp", {})
    if not isinstance(stamp, dict):
        stamp = {}
    sec = stamp.get("sec", "?")
    nanosec = stamp.get("nanosec", "?")
    # The value is printed after a decimal point, so pad it to nine digits;
    # otherwise sec=1, nanosec=5000 would read as "1.5000".
    # NOTE(review): assumes nanosec is the sub-second part of the stamp -
    # confirm against the sender.
    if isinstance(nanosec, int) and not isinstance(nanosec, bool) and nanosec >= 0:
        nanosec = f"{nanosec:09d}"

    joints = packet.get("joints", [])
    if not isinstance(joints, list):
        joints = []

    print(f"protocol={protocol} stamp={sec}.{nanosec} joints={len(joints)}")
    for joint in joints:
        if not isinstance(joint, dict):
            print(f" ? (skipped non-object joint entry: {joint!r})")
            continue
        name = joint.get("name", "?")
        pos = joint.get("position")
        vel = joint.get("velocity")
        tor = joint.get("torque")
        # "!s" converts first, so non-string values (numbers, None) can
        # still be padded with a string format spec.
        print(f" {name!s:28s} pos={pos!s:>12s} vel={vel!s:>12s} tau={tor!s:>12s}")
|
||
|
|
|
||
|
|
|
||
|
|
def main() -> int:
    """Listen for UDP datagrams and print each one until interrupted.

    Each datagram is decoded as UTF-8 (undecodable bytes are replaced) and
    parsed as JSON. Invalid JSON is reported and skipped. Valid packets are
    printed either as the raw text (``--raw``) or via ``print_packet``.

    Returns:
        0 after a Ctrl-C shutdown, 1 when the listen address cannot be bound.
    """
    args = parse_args()

    # Using the socket as a context manager closes it on every exit path,
    # including a failed bind.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        try:
            sock.bind((args.bind_ip, args.port))
        except (OSError, OverflowError) as exc:
            # OverflowError is what bind() raises for a port outside 0-65535.
            print(
                f"Cannot bind {args.bind_ip}:{args.port}: {exc}",
                file=sys.stderr,
            )
            return 1

        print(f"Listening JSON UDP on {args.bind_ip}:{args.port}")

        try:
            while True:
                data, addr = sock.recvfrom(65535)
                text = data.decode("utf-8", errors="replace")
                try:
                    packet = json.loads(text)
                except json.JSONDecodeError as exc:
                    print(f"[{addr[0]}:{addr[1]}] invalid JSON: {exc}")
                    continue

                print(f"\nfrom {addr[0]}:{addr[1]}")
                if args.raw:
                    print(text)
                elif not isinstance(packet, dict):
                    # Valid JSON that is not an object (array, number, ...)
                    # has none of the fields the table printer reads.
                    print(
                        f"unexpected payload type {type(packet).__name__}; "
                        "expected a JSON object"
                    )
                else:
                    try:
                        print_packet(packet)
                    except (AttributeError, TypeError, ValueError) as exc:
                        # One malformed packet from the network must not
                        # stop the receiver.
                        print(f"malformed packet: {exc}")
        except KeyboardInterrupt:
            print("\nStopped.")

    return 0
|
||
|
|
|
||
|
|
|
||
|
|
# Run the receiver only when executed as a script; main()'s return value
# becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|