边缘端实时追踪实录:OpenCV + YOLO + CSRT 的落地细节
这块代码是我在边缘端做实时拦截时反复迭代出来的,不是一次写完。 最早我只用 OpenCV,响应快但误检多;后来只用 YOLO,识别准但延迟又太高。 最后稳定下来的是混合方案:光流先海选,YOLO 做确认,CSRT 持续追踪,再接预测控制。 这篇我不写伪代码,只把真实函数拆开讲清楚。
1. 双版本部署前提:开发板优先,PC 兜底
我的工程入口不是“先追踪”,而是先把运行环境区分清楚。 `track.py` 会先尝试导入 `hobot_dnn`,能用 BPU 就走 BPU,失败再回退到 `ultralytics`。 这样做不是炫技,而是让开发板和 PC 调试共用一套状态机逻辑。 同一个模块在两端都能跑,调参和故障复现才不会分叉。 这个结构也直接决定了后面 `_run_yolo_inference` 必须做统一适配层。
我在参数层也把关键约束收敛在类初始化里,包括检测阈值、重确认频率、丢失容忍和惯性航行时长。 这些参数不是装饰,它们分别控制误检成本、漂移成本和控制抖动成本。 比如 `RECONFIRM_INTERVAL=10` 表示每 10 帧做一次身份复核,`TRACKING_LOSS_TOLERANCE=10` 对应短时丢帧容忍。 如果这些阈值散落在函数内部,后期定位“为什么误触发/为什么误重置”会非常痛苦。 所以我把“可调参”当成工程一等公民,而不是补丁。
try:
from hobot_dnn import pyeasy_dnn as dnn
print("成功导入地平。aarch64 dnn 库。)
IS_EDGE_PLATFORM = True
except ImportError:
print("警告: 无法导入 hobot_dnn 库,将使用PC版的YOLO进行模拟。)
from ultralytics import YOLO
IS_EDGE_PLATFORM = False
class HybridTracker:
def __init__(self, model_path='best.pt'):
self.TARGET_CLASS_ID = 1
self.CONFIDENCE_THRESHOLD = 0.25
self.MIN_TARGET_AREA, self.MAX_TARGET_AREA = 500, 25000
self.TRACKING_LOSS_TOLERANCE = 10
self.prediction_time_ms = 400
self.COASTING_DURATION = 0.5
self.RECONFIRM_INTERVAL = 102. 发现阶段:光流海选 + ROI YOLO 确认
在未追踪状态下,我没有直接全图 YOLO,而是先做稠密光流把运动区域筛出来。 真正的 YOLO 推理只在 `roi = frame[y:y+h, x:x+w]` 上跑,这一步显著压低了每帧成本。 这也是我能在边缘端保持实时性的关键动作,不是模型本身有多轻。 如果全图硬跑 YOLO,哪怕检测准确率高,也很容易把控制链路拖慢。 对机器人而言,慢半拍通常比“偶发漏检”更致命。
这里还有一个很实际的点:光流只负责“有运动”,YOLO 才负责“是不是目标”。 这层职责分离让误检和漏检有了可拆解的优化路径。 如果误触发太多,我会先调运动阈值和面积阈值;如果身份确认不稳,再调类别和置信度阈值。 这样排障时不会一上来就在单个大模型里盲目调参。 工程上可控,才谈得上长期维护。
def _detect_new_target(self, frame, frame_gray):
flow = cv2.calcOpticalFlowFarneback(self.prev_gray, frame_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
motion_mask = (magnitude > self.MOTION_DETECTION_THRESHOLD).astype(np.uint8) * 255
kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
motion_mask = cv2.morphologyEx(motion_mask, cv2.MORPH_CLOSE, kernel)
contours, _ = cv2.findContours(motion_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
if not contours: return False, (0,0), None
largest_contour = max(contours, key=cv2.contourArea)
area = cv2.contourArea(largest_contour)
if self.MIN_TARGET_AREA < area < self.MAX_TARGET_AREA:
x, y, w, h = cv2.boundingRect(largest_contour)
roi = frame[y:y+h, x:x+w]
if roi.size == 0: return False, (0,0), None
detections = self._run_yolo_inference(roi)
is_target_confirmed = any(
d['class_id'] == self.TARGET_CLASS_ID and d['confidence'] > self.CONFIDENCE_THRESHOLD
for d in detections
)
if is_target_confirmed:
self._initialize_tracker(frame, (x, y, w, h))
center = (x + w // 2, y + h // 2)
return True, center, self.bbox
return False, (0,0), None3. 追踪阶段:CSRT + 多重失效判断 + 周期复核
进入追踪态后我用的是 CSRT,但我没有把它当成“永远正确”的黑盒。 `_track_target` 里先调用 `tracker.update`,失败时进入 `lost_frames_count` 容忍窗口。 如果连续丢失超过阈值,立刻重置,避免控制端持续吃到过期框。 同时我又加了边界检查和静止检查,专门处理“追踪框粘背景”这个老问题。 这几层判断叠在一起,才构成可上线的追踪逻辑。
仅靠几何检查还不够,我还让系统每隔固定帧数做一次 YOLO 身份复核。 它的作用是防漂移,不是再检测一次图像这么简单。 一旦复核失败,直接视为追踪失效并复位,而不是继续凑合。 这一步在复杂背景里特别关键,能明显减少“看起来在追,实际已经跑偏”的假稳定。 从实战看,宁可早点重置,也不要让错控持续输出。
def _track_target(self, frame):
success, bbox = self.tracker.update(frame)
if not success:
self.lost_frames_count += 1
if self.lost_frames_count > self.TRACKING_LOSS_TOLERANCE:
self._reset_tracker()
return False, (0,0), None
return False, (0,0), self.bbox
self.lost_frames_count = 0
x, y, w, h = map(int, bbox)
sane = True
if w <= 0 or h <= 0 or x <= 2 or y <= 2:
sane = False
current_center = (x + w // 2, y + h // 2)
if self.last_bbox_center is not None:
dist = np.sqrt((current_center[0] - self.last_bbox_center[0])**2 + (current_center[1] - self.last_bbox_center[1])**2)
if dist < self.STATIC_PIXEL_TOLERANCE: self.static_frames_count += 1
else: self.static_frames_count = 0
self.last_bbox_center = current_center
if self.static_frames_count > self.STATIC_FRAMES_TOLERANCE:
sane = False
self.reconfirm_frame_count += 1
if sane and self.reconfirm_frame_count >= self.RECONFIRM_INTERVAL:
self.reconfirm_frame_count = 0
roi = frame[y:y+h, x:x+w]
detections = self._run_yolo_inference(roi)
is_still_target = any(d['class_id'] == self.TARGET_CLASS_ID and d['confidence'] > self.CONFIDENCE_THRESHOLD for d in detections)
if not is_still_target:
sane = False4. 控制阶段:预测落点转麦轮控制映射
控制这块我没有做“追当前中心点”的朴素方案,而是先更新像素速度,再算预测点。 预测窗口在代码里是 `prediction_time_ms = 400`,实际就是看 400ms 后目标大概在哪。 然后把预测点喂给 `_calculate_mecanum_control`,输出四个电机的 `[action, speed, time]`。 这个映射里还加了 dead zone,避免目标接近中心时电机来回抖动。 这套做法更像“拦截”而不是“追尾”。
我在速度转换时也做了统一归一化,保证不同偏差下输出都落在可控区间。 具体是把理论最大速度作为分母,将轮速映射到 `-100~100` 百分比。 再把绝对值转成 `int`,并过滤小于 5 的抖动速度。 这样下位机侧收到的数据格式稳定,不会因为小数和噪声导致动作异常。 从控制链路看,稳定格式和稳定算法同样重要。
def _calculate_mecanum_control(self, predicted_center):
dx = predicted_center[0] - self.frame_center_x
dy = predicted_center[1] - self.frame_center_y
if np.sqrt(dx**2 + dy**2) < self.dead_zone_radius:
return {i: [0, 0, 40] for i in range(4)}
Kp_translation, Kp_rotation = 0.4, 0.3
vx, vy, v_rot = dy * Kp_translation, dx * Kp_translation, dx * Kp_rotation
motor_speeds = [vx - vy - v_rot, vx + vy + v_rot, vx + vy - v_rot, vx - vy + v_rot]
max_possible_speed = np.sqrt(self.frame_center_x**2 + self.frame_center_y**2) * Kp_translation
control_map = {}
for i in range(4):
speed_percent_float = (motor_speeds[i] / max_possible_speed) * 100
speed_percent_clipped = np.clip(speed_percent_float, -100, 100)
action = 0 if speed_percent_clipped >= 0 else 1
final_speed = int(abs(speed_percent_clipped))
if final_speed < 5:
final_speed = 0
control_map[i] = [int(action), int(final_speed), 40]
return control_map5. 主循环三态切换:检测、追踪、惯性航行
真正把模块跑稳的是 `process_frame`,它把内部状态切换收敛成一个外部接口。 追踪成功时进入预测控制,追踪失败时根据条件进入惯性航行或直接停机。 惯性航行不是无限续命,而是 `COASTING_DURATION` 时间窗内复用上一次有效运动指令。 这一步对真实场景很有价值,目标短时遮挡时不至于立刻“刹死”。 同时它又有超时退出,避免系统在错误指令上长期漂移。
这套状态切换和说明文档里写的设计目标是一致的:把链路做成可控退化。 目标暂时丢了,系统先平稳过渡;确认找不回,再安全停止。 相比“检测不到就立刻归零”,这种做法更适合运动机器人。 对外调用者只拿一个返回值 `final_control_map`,不用关心内部细节。 这也是它能被后续系统复用的关键。
def process_frame(self, frame):
was_tracking_before_this_frame = self.tracking
if self.prev_gray is None:
self.prev_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
return {}
frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
if self.tracking:
detected, center, _ = self._track_target(frame)
else:
detected, center, _ = self._detect_new_target(frame, frame_gray)
final_control_map = {}
if detected:
self.is_coasting = False
self._update_velocity(center)
vx_pix, vy_pix = self.pixel_velocity
pred_time_sec = self.prediction_time_ms / 1000.0
predicted_center = (int(center[0] + vx_pix * pred_time_sec), int(center[1] + vy_pix * pred_time_sec))
current_control_map = self._calculate_mecanum_control(predicted_center)
final_control_map = current_control_map
if self._is_motion_command(current_control_map):
self.last_motion_control_map = current_control_map
else:
if was_tracking_before_this_frame and not self.is_coasting:
self.is_coasting = True
self.coasting_end_time = time.time() + self.COASTING_DURATION
if self.is_coasting and time.time() < self.coasting_end_time:
final_control_map = self.last_motion_control_map
else:
self.is_coasting = False
self.last_motion_control_map = {}
self.prev_gray = frame_gray.copy()
return final_control_map