← 返回文章详情

main_mission.py

相关文档预览

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import YanAPI # type: ignore
import time
import math
import socket

class FallDownException(Exception):
    """自定义异常,用于表示机器人摔倒事件。"""
    pass

# ------------------- 比赛配置 -------------------
SKIP_MISSION_2 = True # True 表示跳过任务二
HURDLE_POSITION = "CENTER" # 根据任务卡修改,可选值: "LEFT", "CENTER", "RIGHT"
USE_MOCK_COMMANDS = True # True = 立即使用模拟指令;False = 等待网络指令
FACE_NAMES_FOR_MISSION = ["张三", "李四", "王五"]
# ----------------------------------------------

class IrosRoboMission:
    def __init__(self):
        print("--- 正在初始化并连接机器人 ---")
        self.is_connected = False
        
        # --- 航向与容差配置 ---
        self.target_heading = 0.0 # 目标航向,初始为0度
        self.initial_yaw_offset = 0.0 # 初始偏移量
        self.TOLERANCE_DEGREES = 6.0 # <--- 航向容差
        self.FINE_TUNE_THRESHOLD = 30.0 # <--- 粗调与微调的切换阈值
        self.FALL_DOWN_THRESHOLD = 70.0 # 摔倒检测阈值
        self.standing_euler_x = 0.0
        self.standing_euler_y = 0.0
        
        self.BASKET_DATA = {
            "红色": {"description": "红色物块"},
            "绿色": {"description": "绿色物块"},
            "蓝色": {"description": "蓝色物块"}
        }
        self.VALID_BLOCK_COLORS = ["红色", "绿色", "蓝色"] # <-- 【新增】颜色白名单
        self.APRILTAG_SIZE_M = 0.05
        self.dynamic_color_map = {
            "红色": 1,
            "绿色": 2,
            "蓝色": 3
        }

        try:
            YanAPI.yan_api_init("127.0.0.1")
            print("YanAPI 初始化完成!")
            self._safe_clear_vision()
            # self._start_vision_stream_for_referee()
            
            try:
                print("正在开启“摔倒后自动爬起”功能...")
                response = YanAPI.set_robot_fall_management_state(enable=True)
                if response and response['code'] == 0:
                    print("  - 功能开启成功!")
                else:
                    print("  - 警告: 功能开启失败: {}".format(response))
            except Exception as e:
                print("  - 警告: 在执行辅助初始化时发生错误: {}".format(e))

            time.sleep(1)
            self.is_connected = True
            self.calibrate_initial_heading()

        except Exception as e:
            print(" 严重错误: 核心初始化失败: {}".format(e))
            self.is_connected = False

    def _get_raw_yaw(self):
        try:
            gyro_data = YanAPI.get_sensors_gyro()
            if gyro_data and gyro_data['code'] == 0:
                if gyro_data['data'].get('gyro'):
                    return gyro_data['data']['gyro'][0]['euler-z']
        except Exception as e:
            print("读取陀螺仪原始数据失败: {}".format(e))
        return self.initial_yaw_offset + self.target_heading

    def get_current_heading(self):
        raw_yaw = self._get_raw_yaw()
        heading = raw_yaw - self.initial_yaw_offset
        
        while heading > 180:
            heading -= 360
        while heading <= -180:
            heading += 360
        return heading

    def calibrate_initial_heading(self):
        print("\n--- 正在校准初始航向 (设定当前方向为 0 度) ---")
        time.sleep(1) 
        try:
            gyro_data = YanAPI.get_sensors_gyro()
            if gyro_data and gyro_data['code'] == 0 and gyro_data['data'].get('gyro'):
                # 记录Z轴用于航向
                self.initial_yaw_offset = gyro_data['data']['gyro'][0]['euler-z']
                # 【核心修正】记录X和Y轴作为“正常站立”的基准
                self.standing_euler_x = gyro_data['data']['gyro'][0]['euler-x']
                self.standing_euler_y = gyro_data['data']['gyro'][0]['euler-y']
                
                print("  - 初始陀螺仪读数 (X, Y, Z): ({:.2f}, {:.2f}, {:.2f})".format(self.standing_euler_x, self.standing_euler_y, self.initial_yaw_offset))
                print("  - 正常站立姿态基准已记录。")
            else:
                self.initial_yaw_offset = self._get_raw_yaw()
        except Exception as e:
            print("  - 警告: 校准时读取陀螺仪失败: {}".format(e))
            self.initial_yaw_offset = 0.0
        self.target_heading = 0.0
        print("  - 目标航向已重置为: {:.2f} 度".format(self.target_heading))


    def _execute_motion(self, name, repeat=1, direction="", description="", speed="slow", silent_tts=False):
        """
        【带摔倒监控的异步执行版】
        下达动作指令后,进入监控循环,如果摔倒则抛出异常。
        """
        print("\n- [执行动作] {}".format(description))
        
        if not silent_tts:
            YanAPI.sync_do_tts(description)
        
        # 1. 异步下达动作指令
        response = YanAPI.start_play_motion(name=name, repeat=repeat, direction=direction, speed=speed)
        if not (response and response['code'] == 0):
            print("  - [错误!] 动作 '{}' 启动失败!".format(name))
            raise Exception("动作启动失败")

        print("  - 动作 '{}' 已启动,进入监控...".format(name))
        
        # 2. 进入监控循环
        start_time = time.time()
        timeout = 1000 # 动作超时时间

        while time.time() - start_time < timeout:
            # a. 检查是否摔倒
            if self.is_fallen():
                print("\n 检测到摔倒!正在中断当前动作... ")
                YanAPI.stop_play_motion()
                raise FallDownException("机器人摔倒") # 抛出自定义异常

            # b. 检查动作是否正常完成
            state = YanAPI.get_current_motion_play_state()
            if state and state['code'] == 0 and state['data']['status'] == 'idle':
                print("  - [成功] '{}' 已完成。".format(description))
                return True

            time.sleep(0.5)

        # c. 如果超时
        print("  - [错误!] 动作 '{}' 执行超时!".format(name))
        YanAPI.stop_play_motion()
        raise Exception("动作执行超时")

    def turn(self, target_angle, with_grab=False, description=""):
        self.target_heading = target_angle
        while self.target_heading > 180:
            self.target_heading -= 360
        while self.target_heading <= -180:
            self.target_heading += 360

        print("\n--- [开始智能转向], 目标: {:.2f} 度 ---".format(self.target_heading))
        
        max_attempts = 40
        for i in range(max_attempts):
            current_heading = self.get_current_heading()
            error = self.target_heading - current_heading
            
            while error > 180:
                error -= 360
            while error <= -180:
                error += 360

            print("  - 尝试 {}/{}: 目标 {:.2f}, 当前 {:.2f}, 误差 {:.2f}".format(i+1, max_attempts, self.target_heading, current_heading, error))

            # 1. 检查是否已经完成转向
            if abs(error) <= self.TOLERANCE_DEGREES:
                print("  -  成功转向到目标角度!航向在容差范围内。")
                return True
            
            # 2. 【核心逻辑】根据误差大小,判断是粗调还是微调
            is_large_turn = abs(error) > self.FINE_TUNE_THRESHOLD

            # 3. 根据误差方向、是否持物、以及是否粗调,来决定并执行最终的动作
            if error > 0: # 目标在当前航向的逆时针方向,需要左转
                if with_grab:
                    action_name = "左转抓1" if is_large_turn else "min左转_抓"
                    self._execute_motion(name=action_name, repeat=1, description="持物左转", silent_tts=True)
                else: # 不持物
                    if is_large_turn:
                        print("  - [粗调模式] 使用官方API 'turn around' 左转。")
                        self._execute_motion(name="turn around", direction="left", repeat=1, description="API左转", silent_tts=True)
                    else:
                        print("  - [微调模式] 使用自定义 '左转'。")
                        self._execute_motion(name="min左转", repeat=1, description="微调左转", silent_tts=True)

            else: # 误差 <= 0,目标在当前航向的顺时针方向,需要右转
                if with_grab:
                    action_name = "右转抓" if is_large_turn else "min右转_抓"
                    self._execute_motion(name=action_name, repeat=1, description="持物右转", silent_tts=True)
                else: # 不持物
                    if is_large_turn:
                        print("  - [粗调模式] 使用官方API 'turn around' 右转。")
                        self._execute_motion(name="turn around", direction="right", repeat=1, description="API右转", silent_tts=True)
                    else:
                        print("  - [微调模式] 使用自定义 '右转'。")
                        self._execute_motion(name="min右转", repeat=1, description="微调右转", silent_tts=True)
        print("  -  转向失败:超过最大尝试次数 {}。".format(max_attempts))
        return False
        
    def turn_relative(self, relative_angle, with_grab=False, description=""):
        new_target = self.target_heading + relative_angle
        return self.turn(new_target, with_grab, description)

    def correct_heading(self, with_grab=False):
        print("\n--- [开始航向校准], 目标朝向: {:.2f} 度 ---".format(self.target_heading))
        return self.turn(self.target_heading, with_grab=with_grab)
    
    def _set_head_angle(self, angle, runtime=500):
        print("  - 转动头部至角度: {}".format(angle))
        YanAPI.set_servos_angles(angles={"NeckLR": angle}, runtime=runtime)
        time.sleep(runtime / 1000.0 + 0.2)

    def _scan_for_baskets(self):
        """
        【最终物理排序推算版】扫描函数:
        1. 通过转头找到所有标签。
        2. 以最近(Z最小)标签的深度为基准深度。
        3. 以x绝对值最小的标签的水平位置为基准水平,再根据它看到的左右顺序建立模型算出距离。
        4. 结合物理顺序和30cm间距,重建完美地图。
        5. (新) 将理论地图与实际观测值进行加权平均,得到最终结果。
        """
        print("\n--- [开始头部扫描 - 物理排序推算融合模式] ---")
        
        raw_measurements = {}
        scan_angles = [60,90,120]
        
        try:
            tags_to_detect = [{"id": tag_id, "size": self.APRILTAG_SIZE_M} for tag_id in [1, 2, 3]]
            YanAPI.start_aprilTag_recognition(tags=tags_to_detect)
            time.sleep(0.5)

            for angle_deg in scan_angles:
                self._set_head_angle(angle_deg)
                time.sleep(1.0) # 等待视觉稳定
                status = YanAPI.get_aprilTag_recognition_status()
                if status and status['code'] == 0 and status['data'].get('AprilTagStatus'):
                    for tag in status['data']['AprilTagStatus']:
                        tag_id = tag['id']
                        if tag_id not in raw_measurements:
                            cam_x, cam_z = tag['position-x'], tag['position-z']
                            head_angle_rad = math.radians(angle_deg - 90)
                            body_x = cam_x * math.cos(-head_angle_rad) - cam_z * math.sin(-head_angle_rad)
                            body_z = cam_x * math.sin(-head_angle_rad) + cam_z * math.cos(-head_angle_rad)
                            raw_measurements[tag_id] = {'x': body_x, 'z': body_z}
                            print("  - 发现 ID: {}, 初步坐标(X,Z): ({:.3f}, {:.3f})".format(tag_id, body_x, body_z))

            self._set_head_angle(90)
        finally:
            YanAPI.stop_aprilTag_recognition()

        if not raw_measurements:
            print("  - ❌ 扫描失败:未能看到任何置物筐标签。")
            return {}
            
        print("\n  - [初步扫描结果]: 共看到 {} 个标签。".format(len(raw_measurements)))
        
        # --- 阶段二:物理模型重建 ---
        print("\n  - [阶段二: 物理模型重建] ...")
        # (此部分与原版相同:计算理论值)
        base_z_tag_id = min(raw_measurements, key=lambda id: raw_measurements[id]['z'])
        base_z_value = raw_measurements[base_z_tag_id]['z']
        print("  - (深度基准) 选择 ID:{} 的深度作为基准深度: {:.3f}".format(base_z_tag_id, base_z_value))

        sorted_by_x = sorted(raw_measurements.items(), key=lambda item: item[1]['x'])
        sorted_ids = [item[0] for item in sorted_by_x]
        print("  - (物理顺序) 从左到右依次为: {}".format(sorted_ids))

        horizontal_anchor_id, horizontal_anchor_pos = min(raw_measurements.items(), key=lambda item: abs(item[1]['x']))
        print("  - (水平基准) 选择X绝对值最小的标签 ID:{} 作为水平基准,其X坐标为: {:.3f}".format(horizontal_anchor_id, horizontal_anchor_pos['x']))

        basket_spacing = 0.25
        anchor_index_in_sorted_list = sorted_ids.index(horizontal_anchor_id)
        leftmost_x = horizontal_anchor_pos['x'] - anchor_index_in_sorted_list * basket_spacing

        predicted_map = {}
        for i, tag_id in enumerate(sorted_ids):
            predicted_map[tag_id] = {
                'x': leftmost_x + i * basket_spacing,
                'z': base_z_value
            }

        # --- 阶段三:融合观测与推算 (新增逻辑) ---
        print("\n  - [阶段三: 融合观测值与理论值] ...")
        prediction_weight = 0.6  # 理论模型的可信度权重 (可调参数)
        observation_weight = 1.0 - prediction_weight
        print("  - 融合权重: 理论模型={:.0f}%, 实际观测={:.0f}%".format(
            prediction_weight*100,
            observation_weight*100
        ))

        final_map = {}
        for tag_id in sorted_ids:
            observed_pos = raw_measurements[tag_id]
            predicted_pos = predicted_map[tag_id]
            
            # 对X和Z坐标分别进行加权平均
            final_x = (predicted_pos['x'] * prediction_weight) + (observed_pos['x'] * observation_weight)
            final_z = (predicted_pos['z'] * prediction_weight) + (observed_pos['z'] * observation_weight)
            
            final_map[tag_id] = {'x': final_x, 'z': final_z}

        all_ids = {1, 2, 3}
        seen_ids = set(raw_measurements.keys())
        unseen_ids = all_ids - seen_ids
        if unseen_ids:
            print("  - 注意: 发现未被看到的标签: {}".format(unseen_ids))
            
        print("\n--- [扫描结束], 最终生成的融合地图: ---")
        for tag_id, data in sorted(final_map.items()):
            print("    - ID: {}, 最终坐标(X,Z): ({:.3f}, {:.3f})".format(tag_id, data['x'], data['z']))
            
        return final_map

    def recognize_faces(self):
        print("\n--- 开始人脸识别流程 ---")
        recognized_count = 0
        recognized_names = []
        max_attempts = 10
        print("将进行最多 {} 次识别尝试,目标完成3次不同人脸的识别。".format(max_attempts))
        for i in range(max_attempts):
            if recognized_count >= len(FACE_NAMES_FOR_MISSION):
                print("已成功识别所有目标人脸!")
                break
            print("\n第 {}/{} 次识别尝试...".format(i + 1, max_attempts))
            result_name = YanAPI.sync_do_face_recognition_value(type="recognition")
            if result_name and result_name not in recognized_names:
                recognized_count += 1
                recognized_names.append(result_name)
                recognition_msg = "我看到了 {}".format(result_name)
                print("识别成功: " + recognition_msg)
                YanAPI.sync_do_tts(recognition_msg)
                time.sleep(1)
            else:
                print("识别无结果或识别到重复人脸。")
                time.sleep(1)
        print("\n--- 人脸识别流程结束, 共识别出 {} 个不同的人脸: {} ---".format(recognized_count, recognized_names))
        return recognized_count > 0
    
    def is_fallen(self):
        """使用相对于“正常站立姿态”的角度差来判断是否摔倒。"""
        try:
            gyro_data = YanAPI.get_sensors_gyro()
            if gyro_data and gyro_data['code'] == 0 and gyro_data['data'].get('gyro'):
                current_euler_x = gyro_data['data']['gyro'][0]['euler-x']
                current_euler_y = gyro_data['data']['gyro'][0]['euler-y']
                
                # 【核心修正】计算当前角度与站立基准的差值
                delta_x = abs(current_euler_x - self.standing_euler_x)
                delta_y = abs(current_euler_y - self.standing_euler_y)

                if delta_x > self.FALL_DOWN_THRESHOLD or delta_y > self.FALL_DOWN_THRESHOLD:
                    print("  - [摔倒检测] 警告!检测到相对于站立姿态的大角度变化: dX={:.1f}, dY={:.1f}".format(delta_x, delta_y))
                    return True
        except Exception as e:
            print("  - 警告: 检测摔倒状态时出错: {}".format(e))
        return False
    
    def _safe_stop_vision(self):
        """
        一个安全的视觉服务关闭函数,使用多进程来防止主程序被无限期卡死。
        """
        import multiprocessing

        print("  - [视频流] 准备关闭...")
        
        # 创建一个子进程来执行可能会卡住的API调用
        p = multiprocessing.Process(target=YanAPI.do_visions_visible, args=("stop", "color_detect_remote"))
        p.start()

        # 等待子进程最多 3 秒
        p.join(3.0)

        # 如果3秒后子进程还在运行,说明它卡住了
        if p.is_alive():
            print("  - ⚠️ 警告: 关闭视觉服务的指令超时,可能已被卡住。正在强制终止...")
            p.terminate() # 强制杀死子进程
            p.join()
            print("  - 强制终止完成。程序将继续执行。")
        else:
            print("  - 视觉服务已成功关闭。")
    
    def _safe_clear_vision(self):
        """
        【开机自检】安全地尝试关闭可能残留的视频流服务。
        使用子进程执行,确保即使卡住也不会影响主程序启动。
        """
        import multiprocessing
        print("--- [自检] 正在清理残留的视觉服务...")

        # 定义一个专门用来关闭服务的简单函数
        def stop_task():
            try:
                YanAPI.do_visions_visible(operation="stop", task="color_detect_remote")
            except:
                pass

        # 创建并启动子进程
        p = multiprocessing.Process(target=stop_task)
        p.start()
        # 等待最多 3 秒
        p.join(3.0)

        if p.is_alive():
            print("  - [自检] 警告: 残留服务清理超时,将强制跳过。")
            p.terminate() # 杀掉卡住的子进程
            p.join()
        else:
            print("  - [自检] 清理完毕,系统环境干净。")

    def _get_visual_adjustment(self, target_color):
        """
        【V6 - 计算函数】
        此函数仅负责视觉计算,完全不涉及视频流的开启和关闭。
        它私下完成任务,并返回需要进行的调整。

        Args:
            target_color (str): 目标颜色。

        Returns:
            dict: 包含计算结果的字典。
                成功: {'success': True, 'steps': int, 'direction': str}
                失败: {'success': False}
        """
        print("  - [视觉计算] 正在私下定位物块...")
        
        # 快速头部扫描
        SCAN_ANGLES = [ 90 ]
        best_angle = -1

        for angle in SCAN_ANGLES:
            self._set_head_angle(angle, runtime=300)
            time.sleep(0.4)
            color_result = YanAPI.sync_do_color_recognition()
            if color_result and color_result['code'] == 0 and color_result['data'].get('color'):
                detected_colors_raw = [item['name'] for item in color_result['data']['color']]
                valid_detected_colors = [color for color in detected_colors_raw if color in self.VALID_BLOCK_COLORS]
                
                if target_color in valid_detected_colors:
                    best_angle = angle
                    print("    - 在角度 {}° 发现 {}!".format(angle, target_color))
                    break
        
        self._set_head_angle(90) # 头部回正

        if best_angle == -1:
            print("  - ❌ 计算失败: 未找到 {} 物块。".format(target_color))
            return {'success': False}

        # 根据角度误差计算平移量
        angle_error = best_angle - 90
        SIDE_STEP_DISTANCE_M = 0.04
        DISTANCE_TO_BLOCK_M = 0.30
        
        required_translation_m = DISTANCE_TO_BLOCK_M * math.tan(math.radians(angle_error))
        
        steps_to_move = 0
        direction = ""
        if abs(required_translation_m) > 0.02:
            steps_to_move = round(abs(required_translation_m) / SIDE_STEP_DISTANCE_M)
            if steps_to_move > 0:
                direction = "left" if angle_error > 0 else "right"

        print("  - [计算完成] 需向 {} 平移 {} 步。".format(direction if direction else "无", steps_to_move))
        return {'success': True, 'steps': steps_to_move, 'direction': direction}

    def _start_vision_stream_for_referee(self):
        """
        此函数只负责一件事:开启视频流并打印URL。
        它是一个“即发即忘”的操作,不检查返回值,也不关闭。
        """
        print("  - [视频流] 开启给裁判的直播...")
        try:
            YanAPI.do_visions_visible(operation="start", task="color_detect_remote")
            time.sleep(1.0) # 等待服务启动
        except Exception as e:
            print("  - ⚠️ 开启直播时发生错误 (已忽略): {}".format(e))

    def mission_2_obstacle_crossing(self):
        print("\n" + "="*50)
        print("--- 开始执行任务二:障碍穿越 ---")
        YanAPI.sync_do_tts("任务二,障碍穿越")
        
        recognition_success = self.recognize_faces()
        if not recognition_success:
            print("人脸识别环节失败或未完全成功,但将继续执行后续步骤。")
        else:
            print("人脸识别环节完成!")
            
        print("\n--- 步骤 2.2: 开始执行上下台阶序列 ---")
        try:
            if not self._execute_motion("向前走路1", 3, description="向前走两步,接近台阶", silent_tts=True): raise Exception("Sequence Failed")
            self.correct_heading(with_grab=False)
            if not self._execute_motion("climb3", 1, description="上第一级台阶",speed="normal", silent_tts=True): raise Exception("Sequence Failed")
            self.correct_heading(with_grab=False)
            if not self._execute_motion("climb3", 1, description="上第二级台阶",speed="normal", silent_tts=True): raise Exception("Sequence Failed")
            self.correct_heading(with_grab=False)
            if not self._execute_motion("down3", 1, description="下第一级台阶", speed="normal",silent_tts=True): raise Exception("Sequence Failed")
            self.correct_heading(with_grab=False)
            if not self._execute_motion("down3", 1, description="下第二级台阶", speed="normal",silent_tts=True): raise Exception("Sequence Failed")
            self.correct_heading(with_grab=False)

            print("\n--- 根据任务卡配置 [HURDLE_POSITION = {}],开始穿越栏架 ---".format(HURDLE_POSITION))

            if HURDLE_POSITION == "LEFT":
                if not self._execute_motion("左走一步", 2, description="向左平移,对准左侧栏架", silent_tts=True): raise Exception("Sequence Failed")
                self.correct_heading(with_grab=False)
                if not self._execute_motion("向前走路1", 1, description="向前走三步,对准中央栏架", speed="normal", silent_tts=True): raise Exception("Sequence Failed")
                self.correct_heading(with_grab=False)
                if not self._execute_motion("向前一步", 1, description="调整", speed="normal", silent_tts=True): raise Exception("Sequence Failed")
                self.correct_heading(with_grab=False)
                if not self._execute_motion("cross3", 1, description="执行跨越动作", speed="normal",silent_tts=True): raise Exception("Sequence Failed")

            elif HURDLE_POSITION == "RIGHT":
                if not self._execute_motion("右走一步", 2, description="向右平移,对准右侧栏架", silent_tts=True): raise Exception("Sequence Failed")
                self.correct_heading(with_grab=False)
                if not self._execute_motion("向前走路1", 1, description="向前走三步,对准中央栏架", speed="normal", silent_tts=True): raise Exception("Sequence Failed")
                self.correct_heading(with_grab=False)
                if not self._execute_motion("向前一步", 1, description="调整", speed="normal", silent_tts=True): raise Exception("Sequence Failed")
                self.correct_heading(with_grab=False)
                if not self._execute_motion("cross3", 1, description="执行跨越动作", speed="normal",silent_tts=True): raise Exception("Sequence Failed")
                
            else: # "CENTER"
                if not self._execute_motion("向前走路1", 1, description="向前走三步,对准中央栏架", speed="normal", silent_tts=True): raise Exception("Sequence Failed")
                self.correct_heading(with_grab=False)
                if not self._execute_motion("向前一步", 1, description="调整", speed="normal", silent_tts=True): raise Exception("Sequence Failed")
                self.correct_heading(with_grab=False)
                if not self._execute_motion("cross3", 1, description="执行跨越动作", speed="normal", silent_tts=True): raise Exception("Sequence Failed")

            self.correct_heading(with_grab=False)
            print("\n--- 障碍穿越序列成功完成! ---")

        except Exception as e:
            print("\n--- 障碍穿越序列中断!---")
            
        YanAPI.sync_do_tts("任务二已完成")
        print("--- 任务二完成 ---")
        return True

    def _wait_for_vehicle_command(self):
        print("\n--- 正在准备获取车型机器人的指令... ---")
        if USE_MOCK_COMMANDS:
            print("  - [调试模式] 跳过网络等待,立即使用默认指令。")
            tasks = [("绿色", "A"), ("红色", "B")]
            print("  - 使用模拟指令: {}".format(tasks))
            return tasks
        print("  - [真实模式] 正在启动服务器等待指令...")
        HOST = '0.0.0.0'
        PORT = 6000
        COLOR_MAP = { '红': '红色', '绿': '绿色', '蓝': '蓝色' }
        tasks = []
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind((HOST, PORT))
                s.listen(1)
                print("  - 服务器启动,正在监听 {} 端口...".format(PORT))
                s.settimeout(60.0) 
                conn, addr = s.accept()
                with conn:
                    print("  - 已连接客户端:{}".format(addr))
                    data = conn.recv(1024)
                    if data:
                        received_string = data.decode('utf-8')
                        print("  - 收到原始消息: '{}'".format(received_string))
                        if len(received_string) % 2 == 0:
                            instruction_pairs = [received_string[i:i+2] for i in range(0, len(received_string), 2)]
                            for pair in instruction_pairs:
                                color_char, target_char = pair[0], pair[1]
                                if color_char in COLOR_MAP:
                                    full_color = COLOR_MAP[color_char]
                                    tasks.append((full_color, target_char))
                                else:
                                    print("  - 警告: 未知的颜色简称 '{}'".format(color_char))
                            print("  - ✅ 解析成功!生成的任务列表: {}".format(tasks))
                        else:
                            print("  - ❌ 解析失败:消息长度不是偶数!")
        except socket.timeout:
            print("  - ❌ 错误: 等待连接超时!")
        except Exception as e:
            print("  - ❌ 错误: 接收消息时发生异常: {}".format(e))
        if not tasks:
            print("  - ⚠️ 警告: 未能接收到有效指令,将使用默认模拟任务作为后备。")
            tasks = [("红色", "A"), ("蓝色", "B")]
        return tasks

    def _force_refresh_vision_service(self):
        """【硬重启辅助函数】只负责刷新,不负责启动。"""
        try:
            print("    - [硬重启] 正在强制刷新视觉服务...")
            YanAPI.stop_aprilTag_recognition()
            YanAPI.do_visions_visible("start", "face_recognition_remote")
            YanAPI.do_visions_visible("stop", "face_recognition_remote")
            print("    - [硬重启] 视觉服务已刷新。")
        except Exception as e:
            print("    - 警告: 强制刷新视觉服务时发生异常: {}".format(e))

    def _align_to_nav_tag(self, target_distance_m):
        print("\n--- [开始导航 - 一步到位模式], 目标距离: {}m ---".format(target_distance_m))
        original_target_heading = self.target_heading
        
        try:
            target_pos = None
            max_search_attempts = 10
            print("  - [阶段一: 静默搜索] ...")
            all_tags = [{"id": tag_id, "size": self.APRILTAG_SIZE_M} for tag_id in [0, 1, 2, 3, 4]]
            YanAPI.start_aprilTag_recognition(tags=all_tags)
            
            for i in range(max_search_attempts):
                print("  - [搜索中] 第 {}/{} 次检查视野...".format(i + 1, max_search_attempts))
                status = YanAPI.get_aprilTag_recognition_status()
                if status and status['code'] == 0 and status['data'].get('AprilTagStatus'):
                    for tag in status['data']['AprilTagStatus']:
                        if tag['id'] == 0:
                            print("  -  [搜索成功] 已锁定导航码!")
                            target_pos = (tag['position-x'], tag['position-z'])
                            break
                if target_pos: break
            
            YanAPI.stop_aprilTag_recognition()
            if not target_pos:
                print("  -  [搜索失败] 跳过本次导航。")
                return

            pos_x, pos_z = target_pos
            print("  - [阶段二: 一步到位] 基于位置 ({:.3f}, {:.3f}) 进行移动".format(pos_x, pos_z))

            current_heading = self.get_current_heading()
            if abs(current_heading) > self.TOLERANCE_DEGREES:
                print("  - [执行1: 姿态] 校正回0度...")
                self.correct_heading(with_grab=False)
            if abs(pos_x) > 0.02:
                steps = max(1, int(abs(pos_x) / 0.06))
                direction = "left" if pos_x < 0 else "right"
                print("  - [执行2: 水平] 目标在 {} 边,向 {} 平移 {} 步...".format("左" if pos_x < 0 else "右", direction, steps))
                self._execute_motion("walk", steps, direction=direction, description="水平调整", silent_tts=True)
                self.correct_heading(with_grab=False)
            else:
                print("  - [决策] 水平位置已在容差范围内,无需调整。")

            self._execute_motion("reset_抓", 1, description="恢复站姿", silent_tts=True)
            error_z = pos_z - target_distance_m
            if abs(error_z) > 0.07:
                steps = max(1, int(abs(error_z) / 0.15))
                direction = "forward" if error_z > 0 else "backward"
                print("  - [执行3: 距离] 向 {} 移动 {} 步...".format(direction, steps))
                for i in range(steps):
                    print("    - 距离调整第 {}/{} 步".format(i + 1, steps))
                    self._execute_motion("向前走路1", 1, direction=direction, description="距离调整", silent_tts=True)
                    self.correct_heading(with_grab=False)
            else:
                print("  - [决策] 距离已在容差范围内,无需调整。")

            print("  -  [一步到位] 所有导航动作执行完毕。")

        finally:
            print("  - [导航结束] 恢复原始航向...")
            self.target_heading = original_target_heading
            self.correct_heading(with_grab=False)
            YanAPI.stop_aprilTag_recognition()

    def run(self):
        if not self.is_connected:
            print("未连接到机器人,程序退出。")
            return

        class State:
            WAIT_FOR_COMMAND = "wait_for_command"
            MISSION_2 = "mission_2"
            M3_START = "m3_start"
            M3_NAV_A = "m3_nav_a"
            M3_PROCESS_A = "m3_process_a"
            M3_PREP_B = "m3_prep_b"
            M3_NAV_B = "m3_nav_b"
            M3_PROCESS_B = "m3_process_b"
            M3_FINISH = "m3_finish"
            FINISHED = "finished"

        current_state = State.WAIT_FOR_COMMAND
        self.tasks = None
        self.color_a, self.platform_a = None, None
        self.color_b, self.platform_b = None, None
        
        while current_state != State.FINISHED:
            try:
                print("\n" + "="*60)
                print(">>> [状态机] 进入状态: {} <<<".format(current_state))
                print("="*60)

                if current_state == State.WAIT_FOR_COMMAND:
                    self.tasks = self._wait_for_vehicle_command()
                    self.color_a, self.platform_a = self.tasks[0]
                    self.color_b, self.platform_b = self.tasks[1]
                    print("✅ 指令接收成功!任务已加载。")

                    if SKIP_MISSION_2:
                        print("\n" + "="*50 + "\n--- 已根据配置跳过任务二 ---\n" + "="*50)
                        current_state = State.M3_START
                    else:
                        current_state = State.MISSION_2

                elif current_state == State.MISSION_2:
                    self.mission_2_obstacle_crossing()
                    current_state = State.M3_START
                
                elif current_state == State.M3_START:
                    task_announcement_a = "我要去{}号存储区,分拣{}色块".format(self.platform_a, self.color_a)
                    print("播报任务: " + task_announcement_a)
                    YanAPI.sync_do_tts(task_announcement_a)
                    current_state = State.M3_NAV_A

                elif current_state == State.M3_NAV_A:
                    print("\n--- 启动A点导航流程 ---")
                    self._align_to_nav_tag(target_distance_m=0.60)
                    current_state = State.M3_PROCESS_A

                elif current_state == State.M3_PROCESS_A:
                    self.turn_relative(90, with_grab=False, description="左转面向A高台")
                    self.correct_heading(with_grab=False)
                    self._execute_motion("向前走路1", 1, description="前进至A高台前", silent_tts=True)
                    self.correct_heading(with_grab=False)
                    self._execute_motion("向前走路1", 1, description="前进至A高台前", silent_tts=True)
                    self.correct_heading(with_grab=False)
                    self._execute_motion("蹲1", 1, description="执行下蹲动作", silent_tts=True)
                    # adjustment = self._get_visual_adjustment(self.color_a)
                    # if not adjustment['success']:
                    #     print("A平台物块视觉计算失败")
                    # self._start_vision_stream_for_referee()
                    # if adjustment['steps'] > 0:
                    #     self._execute_motion("walk", repeat=adjustment['steps'], direction=adjustment['direction'], description="视觉伺服微调", speed="very slow", silent_tts=True)
                    #     self.correct_heading(with_grab=False)
                    self._execute_motion("向前一步", 1, description="调整", silent_tts=True)
                    self.correct_heading(with_grab=False)
                    self._execute_motion("抓物块3", 1, description="执行抓取动作", silent_tts=True)
                    print("\n--- [状态] 执行A物块的放置流程 ---")
                    self.correct_heading(with_grab=True)
                    self.turn_relative(180, with_grab=True, description="掉头面向置物筐区域")
                    basket_locations = self._scan_for_baskets()
                    target_tag_id_a = self.dynamic_color_map[self.color_a]
                    if target_tag_id_a in basket_locations:
                        target_pos = basket_locations[target_tag_id_a]
                        distance_z, distance_x = target_pos['z'], target_pos['x']
                        print("  - 路径规划: 需前进 {:.2f}m, 平移 {:.2f}m".format(distance_z, distance_x))
                        steps_lr = 0
                        steps_fb = 0
                        HORIZONTAL_TOLERANCE = 0
                        if abs(distance_x) > HORIZONTAL_TOLERANCE:
                            steps_lr = int(abs(distance_x) / 0.031)
                        
                        target_distance = distance_z - 0.15
                        if target_distance > 0:
                            steps_fb = math.ceil(target_distance / 0.20)
                        print("  - [决策] 总计需要平移 {} 步, 前进 {} 步。".format(steps_lr, steps_fb))
                        if steps_lr > 0:
                            print("  - 水平偏差较大,执行宏观平移...")
                            action_lr = "右走抓1" if distance_x > 0 else "左走抓1"
                            for i in range(steps_lr):
                                self._execute_motion(action_lr, 1, description="宏观平移 {}/{}".format(i+1, steps_lr), silent_tts=True)
                                if (i + 1) % 5 == 0 or (i + 1) == steps_lr:
                                    print("    - >> 已执行 {} 步,进行一次航向校准...".format(i + 1))
                                    self.correct_heading(with_grab=True)
                        else:
                            print("  - 水平偏差在容差范围内,无需平移。")
                        self.correct_heading(with_grab=True)
                        if steps_fb > 0:
                            for i in range(steps_fb):
                                self._execute_motion("向前走路抓1", 1, speed="normal", description="持物前进 {}/{}".format(i+1, steps_fb), silent_tts=True)
                                self.correct_heading(with_grab=True)
                            if self.color_a == "蓝色":
                                self._execute_motion("右走抓1", 1, speed="slow", description="任务间特殊微调", silent_tts=True)
                                self.correct_heading(with_grab=True)
                            self._execute_motion("向前一步抓", 1, speed="normal", description="持物前进 {}/{}".format(i+1, steps_fb), silent_tts=True)
                            self.correct_heading(with_grab=True)
                    else:
                        print("  - 严重错误:未能扫描到目标 '{}' 的置物筐!将盲放。".format(self.color_a))
                    
                    self._execute_motion("放0", 1, description="执行放置动作", silent_tts=True)
                    print("--- A-高台 任务放置完成 ---")
                    current_state = State.M3_PREP_B
                
                elif current_state == State.M3_PREP_B:
                    self._force_refresh_vision_service()
                    task_announcement_b = "我要去{}号存储区,分拣{}色块".format(self.platform_b, self.color_b)
                    print("播报任务: " + task_announcement_b)
                    YanAPI.sync_do_tts(task_announcement_b)
                    self.turn_relative(90, with_grab=False, description="左转大致面向导航码")
                    current_state = State.M3_NAV_B

                elif current_state == State.M3_NAV_B:
                    self._align_to_nav_tag(target_distance_m=0.15)
                    current_state = State.M3_PROCESS_B

                elif current_state == State.M3_PROCESS_B:
                    self.turn_relative(90, with_grab=False, description="左转面向B高台")
                    self.correct_heading(with_grab=False)
                    self._execute_motion("向前走路1", 1, description="前进至B高台前", silent_tts=True)
                    self.correct_heading(with_grab=False)
                    self._execute_motion("向前走路1", 1, description="前进至B高台前", silent_tts=True)
                    self.correct_heading(with_grab=False)
                    if self.color_a == "蓝色":
                        print("  - [特殊策略] 检测到第一个任务是蓝色,执行一次向左平移以优化位置...")
                        self._execute_motion("walk", 1, direction="left", speed="slow", description="任务间特殊微调", silent_tts=True)
                        self._execute_motion("reset_抓", 1, description="恢复站姿", silent_tts=True)
                    self._execute_motion("蹲1", 1, description="执行下蹲动作", silent_tts=True)
                    # adjustment = self._get_visual_adjustment(self.color_b)
                    # if not adjustment['success']:
                    #     print("B平台物块视觉计算失败!")
                    # # 3. 【核心修改】第二步:再公开直播 (这会重置/替换掉上一个直播流)
                    # self._start_vision_stream_for_referee()
                    # YanAPI.sync_do_tts("确认成功")

                    # # 4. 执行之前算好的微调
                    # if adjustment['steps'] > 0:
                    #     self._execute_motion("walk", repeat=adjustment['steps'], direction=adjustment['direction'], description="视觉伺服微调", speed="very slow", silent_tts=True)
                    #     self.correct_heading(with_grab=False)

                    self._execute_motion("向前一步", 2, description="调整", silent_tts=True)
                    self.correct_heading(with_grab=False)
                    self._execute_motion("抓物块3", 1, description="执行抓取动作", silent_tts=True)

                    print("\n--- [状态] 执行B物块的放置流程 ---")
                    self.correct_heading(with_grab=True)
                    self.turn_relative(180, with_grab=True, description="掉头面向置物筐区域")
                    basket_locations = self._scan_for_baskets()
                    target_tag_id_b = self.dynamic_color_map[self.color_b]
                    if target_tag_id_b in basket_locations:
                        target_pos = basket_locations[target_tag_id_b]
                        distance_z, distance_x = target_pos['z'], target_pos['x']
                        print("  - 路径规划: 需前进 {:.2f}m, 平移 {:.2f}m".format(distance_z, distance_x))
                        steps_lr = 0
                        steps_fb = 0
                        HORIZONTAL_TOLERANCE = 0
                        if abs(distance_x) > HORIZONTAL_TOLERANCE:
                            steps_lr = int(abs(distance_x) / 0.031)
                        
                        target_distance = distance_z - 0.15
                        if target_distance > 0:
                            steps_fb = math.ceil(target_distance / 0.20)
                        print("  - [决策] 总计需要平移 {} 步, 前进 {} 步。".format(steps_lr, steps_fb))
                        if steps_lr > 0:
                            print("  - 水平偏差较大,执行宏观平移...")
                            action_lr = "右走抓1" if distance_x > 0 else "左走抓1"
                            for i in range(steps_lr):
                                self._execute_motion(action_lr, 1, description="宏观平移 {}/{}".format(i+1, steps_lr), silent_tts=True)
                                if (i + 1) % 5 == 0 or (i + 1) == steps_lr:
                                    print("    - >> 已执行 {} 步,进行一次航向校准...".format(i + 1))
                                    self.correct_heading(with_grab=True)
                        else:
                            print("  - 水平偏差在容差范围内,无需平移。")
                        self.correct_heading(with_grab=True)
                        if steps_fb > 0:
                            for i in range(steps_fb):
                                self._execute_motion("向前走路抓1", 1, speed="normal", description="持物前进 {}/{}".format(i+1, steps_fb), silent_tts=True)
                                self.correct_heading(with_grab=True)
                            if self.color_b == "蓝色":
                                self._execute_motion("右走抓1", 1, speed="slow", description="任务间特殊微调", silent_tts=True)
                            self.correct_heading(with_grab=True)
                            self._execute_motion("向前一步抓", 1, speed="normal", description="持物前进 {}/{}".format(i+1, steps_fb), silent_tts=True)
                            self.correct_heading(with_grab=True)
                    else:
                        print("  - 严重错误:未能扫描到目标 '{}' 的置物筐!将盲放。".format(self.color_b))
                    self._execute_motion("放0", 1, description="执行放置动作", silent_tts=True)
                    print("--- B-高台 任务放置完成 ---")
                    current_state = State.M3_FINISH

                elif current_state == State.M3_FINISH:
                    print("\n" + "="*50)
                    YanAPI.sync_do_tts("任务三已完成")
                    print("--- 任务三全部分拣任务执行完毕 ---")
                    current_state = State.FINISHED

            except FallDownException as e:
                print("\n [状态机] 捕获到摔倒事件: {} ".format(e))
                print("  - 等待15秒让机器人完成自动爬起...")
                time.sleep(15.0)
                print("  - 爬起完成,重新校准航向...")
                self.calibrate_initial_heading()
                print("  - 恢复完成,将重试当前状态: {}".format(current_state))
            
            except Exception as e:
                print("\n [状态机] 捕获到未知严重错误: {} ".format(e))
                print("  - 任务流程中断。")
                current_state = State.FINISHED

        print("\n所有任务已完成或终止。")
        self.shutdown()
    
    def shutdown(self):
        if self.is_connected:
            print("\n" + "="*50)
            print("--- 所有任务完成,正在关闭 ---")
            YanAPI.stop_play_motion()
            print("脚本执行完毕。")

if __name__ == '__main__':
    mission = IrosRoboMission()
    mission.run()
返回顶部