← 返回文章列表
24 min readPublished

边缘端实时追踪实录:OpenCV + YOLO + CSRT 的落地细节

这块代码是我在边缘端做实时拦截时反复迭代出来的,不是一次写完。 最早我只用 OpenCV,响应快但误检多;后来只用 YOLO,识别准但延迟又太高。 最后稳定下来的是混合方案:光流先海选,YOLO 做确认,CSRT 持续追踪,再接预测控制。 这篇我不写伪代码,只把真实函数拆开讲清楚。

查看关联项目:基于 OpenCV + YOLO 的动态检测-追踪-预测算法

1. 双版本部署前提:开发板优先,PC 兜底

我的工程入口不是“先追踪”,而是先把运行环境区分清楚。 `track.py` 会先尝试导入 `hobot_dnn`,能用 BPU 就走 BPU,失败再回退到 `ultralytics`。 这样做不是炫技,而是让开发板和 PC 调试共用一套状态机逻辑。 同一个模块在两端都能跑,调参和故障复现才不会分叉。 这个结构也直接决定了后面 `_run_yolo_inference` 必须做统一适配层。

我在参数层也把关键约束收敛在类初始化里,包括检测阈值、重确认频率、丢失容忍和惯性航行时长。 这些参数不是装饰,它们分别控制误检成本、漂移成本和控制抖动成本。 比如 `RECONFIRM_INTERVAL=10` 表示每 10 帧做一次身份复核,`TRACKING_LOSS_TOLERANCE=10` 对应短时丢帧容忍。 如果这些阈值散落在函数内部,后期定位“为什么误触发/为什么误重置”会非常痛苦。 所以我把“可调参”当成工程一等公民,而不是补丁。

try:
    from hobot_dnn import pyeasy_dnn as dnn
    print("成功导入地平。aarch64 dnn 库。)
    IS_EDGE_PLATFORM = True
except ImportError:
    print("警告: 无法导入 hobot_dnn 库,将使用PC版的YOLO进行模拟。)
    from ultralytics import YOLO
    IS_EDGE_PLATFORM = False

class HybridTracker:
    def __init__(self, model_path='best.pt'):
        self.TARGET_CLASS_ID = 1
        self.CONFIDENCE_THRESHOLD = 0.25
        self.MIN_TARGET_AREA, self.MAX_TARGET_AREA = 500, 25000
        self.TRACKING_LOSS_TOLERANCE = 10
        self.prediction_time_ms = 400
        self.COASTING_DURATION = 0.5
        self.RECONFIRM_INTERVAL = 10

2. 发现阶段:光流海选 + ROI YOLO 确认

在未追踪状态下,我没有直接全图 YOLO,而是先做稠密光流把运动区域筛出来。 真正的 YOLO 推理只在 `roi = frame[y:y+h, x:x+w]` 上跑,这一步显著压低了每帧成本。 这也是我能在边缘端保持实时性的关键动作,不是模型本身有多轻。 如果全图硬跑 YOLO,哪怕检测准确率高,也很容易把控制链路拖慢。 对机器人而言,慢半拍通常比“偶发漏检”更致命。

这里还有一个很实际的点:光流只负责“有运动”,YOLO 才负责“是不是目标”。 这层职责分离让误检和漏检有了可拆解的优化路径。 如果误触发太多,我会先调运动阈值和面积阈值;如果身份确认不稳,再调类别和置信度阈值。 这样排障时不会一上来就在单个大模型里盲目调参。 工程上可控,才谈得上长期维护。

def _detect_new_target(self, frame, frame_gray):
    flow = cv2.calcOpticalFlowFarneback(self.prev_gray, frame_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    magnitude, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
    motion_mask = (magnitude > self.MOTION_DETECTION_THRESHOLD).astype(np.uint8) * 255
    kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5, 5))
    motion_mask = cv2.morphologyEx(motion_mask, cv2.MORPH_CLOSE, kernel)
    contours, _ = cv2.findContours(motion_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours: return False, (0,0), None

    largest_contour = max(contours, key=cv2.contourArea)
    area = cv2.contourArea(largest_contour)
    if self.MIN_TARGET_AREA < area < self.MAX_TARGET_AREA:
        x, y, w, h = cv2.boundingRect(largest_contour)
        roi = frame[y:y+h, x:x+w]
        if roi.size == 0: return False, (0,0), None
        detections = self._run_yolo_inference(roi)
        is_target_confirmed = any(
            d['class_id'] == self.TARGET_CLASS_ID and d['confidence'] > self.CONFIDENCE_THRESHOLD
            for d in detections
        )
        if is_target_confirmed:
            self._initialize_tracker(frame, (x, y, w, h))
            center = (x + w // 2, y + h // 2)
            return True, center, self.bbox
    return False, (0,0), None

3. 追踪阶段:CSRT + 多重失效判断 + 周期复核

进入追踪态后我用的是 CSRT,但我没有把它当成“永远正确”的黑盒。 `_track_target` 里先调用 `tracker.update`,失败时进入 `lost_frames_count` 容忍窗口。 如果连续丢失超过阈值,立刻重置,避免控制端持续吃到过期框。 同时我又加了边界检查和静止检查,专门处理“追踪框粘背景”这个老问题。 这几层判断叠在一起,才构成可上线的追踪逻辑。

仅靠几何检查还不够,我还让系统每隔固定帧数做一次 YOLO 身份复核。 它的作用是防漂移,不是再检测一次图像这么简单。 一旦复核失败,直接视为追踪失效并复位,而不是继续凑合。 这一步在复杂背景里特别关键,能明显减少“看起来在追,实际已经跑偏”的假稳定。 从实战看,宁可早点重置,也不要让错控持续输出。

def _track_target(self, frame):
    success, bbox = self.tracker.update(frame)
    if not success:
        self.lost_frames_count += 1
        if self.lost_frames_count > self.TRACKING_LOSS_TOLERANCE:
            self._reset_tracker()
            return False, (0,0), None
        return False, (0,0), self.bbox

    self.lost_frames_count = 0
    x, y, w, h = map(int, bbox)
    sane = True
    if w <= 0 or h <= 0 or x <= 2 or y <= 2:
        sane = False

    current_center = (x + w // 2, y + h // 2)
    if self.last_bbox_center is not None:
        dist = np.sqrt((current_center[0] - self.last_bbox_center[0])**2 + (current_center[1] - self.last_bbox_center[1])**2)
        if dist < self.STATIC_PIXEL_TOLERANCE: self.static_frames_count += 1
        else: self.static_frames_count = 0
    self.last_bbox_center = current_center
    if self.static_frames_count > self.STATIC_FRAMES_TOLERANCE:
        sane = False

    self.reconfirm_frame_count += 1
    if sane and self.reconfirm_frame_count >= self.RECONFIRM_INTERVAL:
        self.reconfirm_frame_count = 0
        roi = frame[y:y+h, x:x+w]
        detections = self._run_yolo_inference(roi)
        is_still_target = any(d['class_id'] == self.TARGET_CLASS_ID and d['confidence'] > self.CONFIDENCE_THRESHOLD for d in detections)
        if not is_still_target:
            sane = False

4. 控制阶段:预测落点转麦轮控制映射

控制这块我没有做“追当前中心点”的朴素方案,而是先更新像素速度,再算预测点。 预测窗口在代码里是 `prediction_time_ms = 400`,实际就是看 400ms 后目标大概在哪。 然后把预测点喂给 `_calculate_mecanum_control`,输出四个电机的 `[action, speed, time]`。 这个映射里还加了 dead zone,避免目标接近中心时电机来回抖动。 这套做法更像“拦截”而不是“追尾”。

我在速度转换时也做了统一归一化,保证不同偏差下输出都落在可控区间。 具体是把理论最大速度作为分母,将轮速映射到 `-100~100` 百分比。 再把绝对值转成 `int`,并过滤小于 5 的抖动速度。 这样下位机侧收到的数据格式稳定,不会因为小数和噪声导致动作异常。 从控制链路看,稳定格式和稳定算法同样重要。

def _calculate_mecanum_control(self, predicted_center):
    dx = predicted_center[0] - self.frame_center_x
    dy = predicted_center[1] - self.frame_center_y

    if np.sqrt(dx**2 + dy**2) < self.dead_zone_radius:
        return {i: [0, 0, 40] for i in range(4)}

    Kp_translation, Kp_rotation = 0.4, 0.3
    vx, vy, v_rot = dy * Kp_translation, dx * Kp_translation, dx * Kp_rotation
    motor_speeds = [vx - vy - v_rot, vx + vy + v_rot, vx + vy - v_rot, vx - vy + v_rot]
    max_possible_speed = np.sqrt(self.frame_center_x**2 + self.frame_center_y**2) * Kp_translation

    control_map = {}
    for i in range(4):
        speed_percent_float = (motor_speeds[i] / max_possible_speed) * 100
        speed_percent_clipped = np.clip(speed_percent_float, -100, 100)
        action = 0 if speed_percent_clipped >= 0 else 1
        final_speed = int(abs(speed_percent_clipped))
        if final_speed < 5:
            final_speed = 0
        control_map[i] = [int(action), int(final_speed), 40]
    return control_map

5. 主循环三态切换:检测、追踪、惯性航行

真正把模块跑稳的是 `process_frame`,它把内部状态切换收敛成一个外部接口。 追踪成功时进入预测控制,追踪失败时根据条件进入惯性航行或直接停机。 惯性航行不是无限续命,而是 `COASTING_DURATION` 时间窗内复用上一次有效运动指令。 这一步对真实场景很有价值,目标短时遮挡时不至于立刻“刹死”。 同时它又有超时退出,避免系统在错误指令上长期漂移。

这套状态切换和说明文档里写的设计目标是一致的:把链路做成可控退化。 目标暂时丢了,系统先平稳过渡;确认找不回,再安全停止。 相比“检测不到就立刻归零”,这种做法更适合运动机器人。 对外调用者只拿一个返回值 `final_control_map`,不用关心内部细节。 这也是它能被后续系统复用的关键。

def process_frame(self, frame):
    was_tracking_before_this_frame = self.tracking
    if self.prev_gray is None:
        self.prev_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        return {}

    frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    if self.tracking:
        detected, center, _ = self._track_target(frame)
    else:
        detected, center, _ = self._detect_new_target(frame, frame_gray)

    final_control_map = {}
    if detected:
        self.is_coasting = False
        self._update_velocity(center)
        vx_pix, vy_pix = self.pixel_velocity
        pred_time_sec = self.prediction_time_ms / 1000.0
        predicted_center = (int(center[0] + vx_pix * pred_time_sec), int(center[1] + vy_pix * pred_time_sec))
        current_control_map = self._calculate_mecanum_control(predicted_center)
        final_control_map = current_control_map
        if self._is_motion_command(current_control_map):
            self.last_motion_control_map = current_control_map
    else:
        if was_tracking_before_this_frame and not self.is_coasting:
            self.is_coasting = True
            self.coasting_end_time = time.time() + self.COASTING_DURATION
        if self.is_coasting and time.time() < self.coasting_end_time:
            final_control_map = self.last_motion_control_map
        else:
            self.is_coasting = False
            self.last_motion_control_map = {}

    self.prev_gray = frame_gray.copy()
    return final_control_map
返回顶部