IROS 任务代码复盘:人形机器人状态机与赛场执行链路
这篇是我在赛后做的一次完整代码复盘。 我不讲泛化套路,直接对着自己实际跑过的 `main_mission.py` 按执行链路拆开讲。 证书记录的会期是 2025-10-19 到 2025-10-25,地点在杭州;那一周让我更确定一件事:决定成败的不是某个炫技动作,而是状态机把感知、导航、抓取和恢复稳定串起来。 所以正文会重点展开状态切换、命令接入、导航校准、投放定位和摔倒恢复。 下面所有代码都来自我的真实项目文件,不写伪代码。
1. 任务入口先定状态机:把赛程动作拆成可执行阶段
我这版代码最关键的一点,是先把任务拆成状态,再给每个状态写动作。 `WAIT_FOR_COMMAND -> MISSION_2 -> M3_START -> M3_NAV_A ... -> M3_FINISH` 这条主线非常清晰。 这比把动作脚本堆在一起稳得多,因为每一步都有明确的进入和退出条件。 现场调试时,一旦某段出问题,可以直接定位到具体状态,而不是全链路盲查。 在竞赛里,这种“可诊断结构”通常比单次动作炫技更有价值。
从代码组织看,我把任务二和任务三拆开,并预留 `SKIP_MISSION_2` 开关做联调。 这让我可以在不完整跑赛道的情况下快速迭代后半段抓取与投放流程。 这种做法非常实用,特别是在比赛前夕时间紧、场地窗口少的时候。 另外状态常量集中定义,避免了字符串散落造成的跳转错误。 这是典型的工程化状态机写法,而不是一次性比赛脚本。
def run(self):
class State:
WAIT_FOR_COMMAND = "wait_for_command"
MISSION_2 = "mission_2"
M3_START = "m3_start"
M3_NAV_A = "m3_nav_a"
M3_PROCESS_A = "m3_process_a"
M3_PREP_B = "m3_prep_b"
M3_NAV_B = "m3_nav_b"
M3_PROCESS_B = "m3_process_b"
M3_FINISH = "m3_finish"
FINISHED = "finished"
current_state = State.WAIT_FOR_COMMAND
while current_state != State.FINISHED:
if current_state == State.WAIT_FOR_COMMAND:
self.tasks = self._wait_for_vehicle_command()
if SKIP_MISSION_2:
current_state = State.M3_START
else:
current_state = State.MISSION_22. 命令接入:6000 端口接收车型机器人任务
我的命令接入不是硬编码,而是走 `socket`,这样车型机器人和人形机器人之间可以通过协议层解耦。 `_wait_for_vehicle_command` 里监听 `0.0.0.0:6000`,收到数据后按两个字符一组解析。 解析结果会映射成 `(颜色, 平台)` 任务列表,直接喂给后续状态机。 如果超时或解析失败,还有默认后备任务,避免主流程直接崩掉。 这就是竞赛程序里常说的“通信不稳定时仍可继续执行”。
这里我觉得写得比较稳的一点是白名单映射。 `COLOR_MAP` 先做语义归一,再进入任务层。 这样后面抓取和投放逻辑都只处理统一颜色词,不会被上游文本差异污染。 我还支持 `USE_MOCK_COMMANDS`,现场断链时能立刻切到调试模式。 对比赛这种高压场景,这是很有价值的兜底。
def _wait_for_vehicle_command(self):
HOST = '0.0.0.0'
PORT = 6000
COLOR_MAP = {'。: '红色', '。: '绿色', '。: '蓝色'}
tasks = []
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind((HOST, PORT))
s.listen(1)
s.settimeout(60.0)
conn, addr = s.accept()
with conn:
data = conn.recv(1024)
received_string = data.decode('utf-8')
instruction_pairs = [received_string[i:i+2] for i in range(0, len(received_string), 2)]
for pair in instruction_pairs:
color_char, target_char = pair[0], pair[1]
if color_char in COLOR_MAP:
tasks.append((COLOR_MAP[color_char], target_char))
if not tasks:
tasks = [("红色", "A"), ("蓝色", "B")]
return tasks3. 任务二障碍穿越:动作序列和航向校正交替执行
障碍穿越这段不是单纯播一串动作,而是“走一步、校一次航向”的策略。 我在 `mission_2_obstacle_crossing` 里每个关键动作后都接 `correct_heading`。 这等于把陀螺仪反馈持续引入流程,抑制累计偏航。 对于台阶和栏架这种误差放大的场景,这种交替执行非常必要。 否则后续抓取区起点会偏,任务三会被前序误差拖垮。
另外我把栏架位置做成配置项 `HURDLE_POSITION`,支持 `LEFT/CENTER/RIGHT` 三条路线。 这比硬编码路线更适配现场任务卡变化。 同一套程序通过配置就能切不同路径,不需要临场改核心代码。 在比赛里减少“临时改代码”本身就是稳定性收益。 这一点和状态机框架是配套的。
def mission_2_obstacle_crossing(self):
if not self._execute_motion("向前走路1", 3, description="向前走两步,接近台阶", silent_tts=True):
raise Exception("Sequence Failed")
self.correct_heading(with_grab=False)
if not self._execute_motion("climb3", 1, description="上第一级台。", speed="normal", silent_tts=True):
raise Exception("Sequence Failed")
self.correct_heading(with_grab=False)
if not self._execute_motion("down3", 1, description="下第一级台。", speed="normal", silent_tts=True):
raise Exception("Sequence Failed")
self.correct_heading(with_grab=False)
if HURDLE_POSITION == "LEFT":
self._execute_motion("左走一。", 2, description="向左平移,对准左侧栏。", silent_tts=True)
elif HURDLE_POSITION == "RIGHT":
self._execute_motion("右走一。", 2, description="向右平移,对准右侧栏。", silent_tts=True)
else:
self._execute_motion("向前走路1", 1, description="向前走三步,对准中央栏架", speed="normal", silent_tts=True)
self.correct_heading(with_grab=False)
self._execute_motion("cross3", 1, description="执行跨越动作", speed="normal", silent_tts=True)4. 导航对齐:AprilTag 定位 + 航向闭环
我在 `_align_to_nav_tag` 里做的是“先找标签,再一步到位调整”,逻辑很实用。 先用 AprilTag id=0 做导航基准,再按 `x` 偏差做左右平移,按 `z` 偏差做前后步进。 每次位移后再 `correct_heading`,避免连续移动导致航向飘掉。 这套策略的优点是计算量小、动作直接,适合实机现场快速执行。 相比复杂路径规划,这种规则式对齐更稳定、更可控。
转向函数 `turn` 也做了粗调和微调分层。 误差大于阈值时走 `turn around`,误差较小时走 `min左转/min右转` 并循环校正。 这能减少大角度时的回摆和小角度时的过冲。 同时它支持持物和非持物两种动作模板,避免抓取后姿态失控。 对任务三这种“拿着物块还要转身”的场景,这个细节很关键。
def _align_to_nav_tag(self, target_distance_m):
target_pos = None
all_tags = [{"id": tag_id, "size": self.APRILTAG_SIZE_M} for tag_id in [0, 1, 2, 3, 4]]
YanAPI.start_aprilTag_recognition(tags=all_tags)
status = YanAPI.get_aprilTag_recognition_status()
for tag in status['data']['AprilTagStatus']:
if tag['id'] == 0:
target_pos = (tag['position-x'], tag['position-z'])
break
YanAPI.stop_aprilTag_recognition()
pos_x, pos_z = target_pos
if abs(pos_x) > 0.02:
steps = max(1, int(abs(pos_x) / 0.06))
direction = "left" if pos_x < 0 else "right"
self._execute_motion("walk", steps, direction=direction, description="水平调整", silent_tts=True)
self.correct_heading(with_grab=False)
def correct_heading(self, with_grab=False):
return self.turn(self.target_heading, with_grab=with_grab)5. 放置区识别:三角度扫描 + 物理顺序融合
抓取后的投放环节里,我用 `_scan_for_baskets` 做多角度观测,这段非常有代表性。 头部按 `[60, 90, 120]` 三个角度扫描 AprilTag,把相机坐标转换到机体坐标。 然后不是直接相信单次观测,而是做“物理顺序 + 间距约束 + 加权融合”。 这一步能显著缓解视觉噪声,尤其是在比赛光照和遮挡不稳定时。 属于典型的“把工程先验加入感知结果”。
我比较认可我这里的融合思路:预测模型占 60%,实测值占 40%。 先保证整体队形不乱,再吸收实时观测做微调。 这种做法在标签偶发抖动时,比纯观测更稳,比纯理论更灵活。 最后得到的是可直接用于平移步数和前进步数计算的最终地图。 在任务三的放置成功率上,这种处理通常是决定性因素。
def _scan_for_baskets(self):
raw_measurements = {}
scan_angles = [60, 90, 120]
tags_to_detect = [{"id": tag_id, "size": self.APRILTAG_SIZE_M} for tag_id in [1, 2, 3]]
YanAPI.start_aprilTag_recognition(tags=tags_to_detect)
for angle_deg in scan_angles:
self._set_head_angle(angle_deg)
status = YanAPI.get_aprilTag_recognition_status()
for tag in status['data']['AprilTagStatus']:
tag_id = tag['id']
if tag_id not in raw_measurements:
cam_x, cam_z = tag['position-x'], tag['position-z']
head_angle_rad = math.radians(angle_deg - 90)
body_x = cam_x * math.cos(-head_angle_rad) - cam_z * math.sin(-head_angle_rad)
body_z = cam_x * math.sin(-head_angle_rad) + cam_z * math.cos(-head_angle_rad)
raw_measurements[tag_id] = {'x': body_x, 'z': body_z}
prediction_weight = 0.6
observation_weight = 0.4
final_map = {}
for tag_id in raw_measurements:
final_x = predicted_map[tag_id]['x'] * prediction_weight + raw_measurements[tag_id]['x'] * observation_weight
final_z = predicted_map[tag_id]['z'] * prediction_weight + raw_measurements[tag_id]['z'] * observation_weight
final_map[tag_id] = {'x': final_x, 'z': final_z}6. 异常兜底:摔倒检测与状态恢复重试
我这版代码另一个很实用的点,是把摔倒检测做进了主执行路径,而不是赛后补救。 `is_fallen` 用站立基准和实时欧拉角差值做判断,超过阈值就直接触发异常。 状态机里捕获 `FallDownException` 后会等待自动爬起,再重新校准航向并继续执行当前状态。 这让系统具备“被打断后可恢复”的能力,而不是一次失败就全盘结束。 在真实比赛里,这类恢复逻辑往往比单次动作精度更能保分。
此外我还有视觉服务安全关闭和强制刷新函数。 它们通过子进程超时机制,避免 API 卡死把主流程拖挂,这是很工程化的处理。 机器人比赛里最怕“整机卡死”,所以这种隔离式调用非常必要。 从结构看,我的核心思路是:主链路要可持续,外围服务可降级。 这个原则我建议继续保留到后续版本。
def is_fallen(self):
gyro_data = YanAPI.get_sensors_gyro()
current_euler_x = gyro_data['data']['gyro'][0]['euler-x']
current_euler_y = gyro_data['data']['gyro'][0]['euler-y']
delta_x = abs(current_euler_x - self.standing_euler_x)
delta_y = abs(current_euler_y - self.standing_euler_y)
if delta_x > self.FALL_DOWN_THRESHOLD or delta_y > self.FALL_DOWN_THRESHOLD:
return True
return False
while current_state != State.FINISHED:
try:
if current_state == State.WAIT_FOR_COMMAND:
self.tasks = self._wait_for_vehicle_command()
self.color_a, self.platform_a = self.tasks[0]
self.color_b, self.platform_b = self.tasks[1]
except FallDownException as e:
time.sleep(15.0)
self.calibrate_initial_heading()
print(" - 恢复完成,将重试当前状。 {}".format(current_state))