增加传感器滤波

This commit is contained in:
shenchenyang 2026-02-25 15:46:24 +08:00
parent 1dde8de63f
commit 26cf62214e

View File

@ -9,12 +9,81 @@ from sensor_msgs.msg import JointState
count = 0
count1 = 0
class PositionFilter:
"""Simple per-joint filter supporting EMA and a basic 1D Kalman.
Usage:
f = PositionFilter(filter_type='ema', alpha=0.2)
out = f.update(measurement)
"""
def __init__(self, filter_type='ema', alpha=0.2, q=1e-5, r=1e-2):
self.filter_type = filter_type
self.alpha = float(alpha)
# Kalman params
self.q = float(q) # process noise
self.r = float(r) # measurement noise
self.x = None # state estimate
self.p = 1.0 # estimate covariance
def reset(self):
self.x = None
self.p = 1.0
def update(self, z):
# z: measurement (float)
if z is None:
return self.x
if self.filter_type == 'ema':
if self.x is None:
self.x = float(z)
else:
self.x = self.alpha * float(z) + (1.0 - self.alpha) * self.x
return self.x
if self.filter_type == 'kalman':
# Predict
if self.x is None:
self.x = float(z)
self.p = 1.0
return self.x
self.p = self.p + self.q
# Update
k = self.p / (self.p + self.r)
self.x = self.x + k * (float(z) - self.x)
self.p = (1 - k) * self.p
return self.x
# default: passthrough
return float(z)
class ProgressActionClient(Node):
def __init__(self):
super().__init__('progress_action_client')
# Parameters for optional position filtering
self.declare_parameter('enable_filter', True)
self.declare_parameter('filter_type', 'ema')
self.declare_parameter('ema_alpha', 0.2)
self.declare_parameter('kalman_q', 1e-5)
self.declare_parameter('kalman_r', 1e-2)
self.enable_filter = self.get_parameter('enable_filter').get_parameter_value().bool_value
self.filter_type = self.get_parameter('filter_type').get_parameter_value().string_value
self.ema_alpha = self.get_parameter('ema_alpha').get_parameter_value().double_value
self.kalman_q = self.get_parameter('kalman_q').get_parameter_value().double_value
self.kalman_r = self.get_parameter('kalman_r').get_parameter_value().double_value
# Create per-joint filters (7 joints expected)
self.num_joints = 7
self.filters = [PositionFilter(filter_type=self.filter_type,
alpha=self.ema_alpha,
q=self.kalman_q,
r=self.kalman_r)
for _ in range(self.num_joints)]
self.subscriber_ = self.create_subscription(
JointState,
"/joint_sensor_states",
@ -38,7 +107,28 @@ class ProgressActionClient(Node):
# self.send_goal([count, count1, 0.0, 0.0, 0.0, 0.0, 0.0])
self.get_logger().info(f'Received joint states: {list(msg.name)} positions: {list(msg.position)}')
self.send_goal(msg.position)
# Apply position filtering if enabled
arm_pos = list(msg.position)
if self.enable_filter:
if len(arm_pos) >= self.num_joints:
filtered = []
for i in range(self.num_joints):
try:
v = float(arm_pos[i])
except Exception:
v = None
fv = self.filters[i].update(v)
# If filter returned None (no prior), fall back to raw
filtered.append(fv if fv is not None else (arm_pos[i] if i < len(arm_pos) else 0.0))
arm_pos_to_send = filtered
else:
# unexpected length: skip filtering and log
self.get_logger().warning(f'Expected {self.num_joints} positions, got {len(arm_pos)}. Sending raw positions.')
arm_pos_to_send = arm_pos
else:
arm_pos_to_send = arm_pos
self.send_goal(arm_pos_to_send)
def send_goal(self, arm_pos):
arm_joint = ['openarm_right_joint1', 'openarm_right_joint2', 'openarm_right_joint3', 'openarm_right_joint4', 'openarm_right_joint5', 'openarm_right_joint6', 'openarm_right_joint7']