2026-03-31 05:32:21 +00:00
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
|
|
|
|
|
import argparse
import json
import socket
import sys
from typing import Any, Dict, Optional, Sequence
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _udp_port(text: str) -> int:
    """Convert a command-line string to a UDP port number in the range 0-65535.

    Raises:
        argparse.ArgumentTypeError: If *text* is not an integer or is out of
            range, so argparse reports a usage error instead of the failure
            surfacing later when the socket is bound.
    """
    try:
        port = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {text!r}") from None
    if not 0 <= port <= 65535:
        raise argparse.ArgumentTypeError(f"port must be in 0-65535, got {port}")
    return port


def parse_args(argv: Optional[Sequence[str]] = None) -> argparse.Namespace:
    """Parse the receiver's command-line options.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, which is what the script entry point uses.

    Returns:
        A namespace with ``port`` (int, default 10001), ``bind_ip`` (str,
        default ``"0.0.0.0"``) and ``raw`` (bool, default ``False``).

    Raises:
        SystemExit: Raised by argparse on invalid arguments or ``--help``.
    """
    parser = argparse.ArgumentParser(
        description="Receive and print OpenArm UDP JSON joint-state packets."
    )
    parser.add_argument("--port", type=_udp_port, default=10001, help="Listen UDP port")
    parser.add_argument(
        "--bind-ip",
        default="0.0.0.0",
        help="Bind IP address (default: 0.0.0.0)",
    )
    parser.add_argument(
        "--raw",
        action="store_true",
        help="Print raw JSON text instead of formatted joint table",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _joint_dicts(value: Any) -> list:
    """Return the dict entries of *value* if it is a list, otherwise an empty list."""
    if not isinstance(value, list):
        return []
    return [item for item in value if isinstance(item, dict)]


def print_packet(packet: Dict[str, Any]) -> None:
    """Print a summary line for a decoded packet, then one row per joint.

    Two layouts are accepted: a flat ``"joints"`` list, which is split into
    groups by whether each joint name contains ``"_left_"`` or ``"_right_"``,
    or pre-grouped ``"left_arm"`` / ``"right_arm"`` / ``"other"`` lists.

    The packet comes straight off the network, so malformed pieces are
    tolerated rather than raising: a non-dict packet is reported and skipped,
    a non-dict ``stamp`` is treated as missing, and joint groups that are not
    lists (or entries that are not dicts) are ignored.

    Args:
        packet: The JSON-decoded datagram.
    """
    if not isinstance(packet, dict):
        print(f"unsupported packet type: {type(packet).__name__}")
        return

    protocol = packet.get("protocol", "unknown")
    stamp = packet.get("stamp", {})
    if not isinstance(stamp, dict):
        stamp = {}
    sec = stamp.get("sec", "?")
    nanosec = stamp.get("nanosec", "?")
    # Zero-pad integer nanoseconds to 9 digits so "sec.nanosec" reads as a
    # decimal number (sec=1, nanosec=5 is 1.000000005, not 1.5).
    if isinstance(nanosec, int) and not isinstance(nanosec, bool) and nanosec >= 0:
        nanosec = f"{nanosec:09d}"

    if "joints" in packet:
        joints = _joint_dicts(packet["joints"])
        left_arm, right_arm, other = [], [], []
        # Single pass; a name containing both markers lands in both arm groups.
        for joint in joints:
            joint_name = str(joint.get("name", ""))
            is_left = "_left_" in joint_name
            is_right = "_right_" in joint_name
            if is_left:
                left_arm.append(joint)
            if is_right:
                right_arm.append(joint)
            if not (is_left or is_right):
                other.append(joint)
    else:
        left_arm = _joint_dicts(packet.get("left_arm", []))
        right_arm = _joint_dicts(packet.get("right_arm", []))
        other = _joint_dicts(packet.get("other", []))
        joints = left_arm + right_arm + other

    print(f"protocol={protocol} stamp={sec}.{nanosec} joints={len(joints)}")

    for group_name, group_joints in (("left_arm", left_arm), ("right_arm", right_arm), ("other", other)):
        if not group_joints:
            continue
        print(f"{group_name}:")
        for joint in group_joints:
            # str() keeps the ":28s" format spec from raising on non-string names.
            name = str(joint.get("name", "?"))
            pos = joint.get("position", None)
            vel = joint.get("velocity", None)
            tor = joint.get("torque", None)
            print(f" {name:28s} pos={pos!s:>12s} vel={vel!s:>12s} tau={tor!s:>12s}")
|
2026-03-31 05:32:21 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def main() -> int:
    """Listen for UDP datagrams and print each one until interrupted.

    Binds a UDP socket to the address given on the command line, then loops:
    each datagram is decoded as UTF-8 (undecodable bytes replaced), parsed as
    JSON, and printed either verbatim (``--raw``) or via ``print_packet``.
    Datagrams that are not valid JSON, or whose content cannot be formatted,
    are reported and skipped so one bad sender cannot stop the receiver.

    Returns:
        0 after the loop is stopped with Ctrl-C.
    """
    args = parse_args()

    # The context manager closes the socket on every exit path, including a
    # bind() failure, which a try/finally placed after bind() would miss.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.bind((args.bind_ip, args.port))

        print(f"Listening JSON UDP on {args.bind_ip}:{args.port}")

        try:
            while True:
                data, addr = sock.recvfrom(65535)
                text = data.decode("utf-8", errors="replace")
                try:
                    packet = json.loads(text)
                except json.JSONDecodeError as exc:
                    print(f"[{addr[0]}:{addr[1]}] invalid JSON: {exc}")
                    continue

                print(f"\nfrom {addr[0]}:{addr[1]}")
                if args.raw:
                    print(text)
                    continue

                # Valid JSON is not necessarily an object (could be a list,
                # number, ...); the table formatter only understands objects.
                if not isinstance(packet, dict):
                    print(f"unexpected JSON {type(packet).__name__}; expected an object")
                    continue
                try:
                    print_packet(packet)
                except (AttributeError, TypeError, ValueError) as exc:
                    # Wrongly typed fields inside the object; keep listening.
                    print(f"malformed packet: {exc}")
        except KeyboardInterrupt:
            print("\nStopped.")

    return 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Run the receiver only when executed as a script; the value returned by
# main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|