RLinf复现RECAP(二):优势标签驱动pi0.5的CFG训练
前言、RECAP使用CFG引导模型生成高优势动作
CFG全称为Classifier-Free Guidance(无分类器引导),最初主要用于扩散生成模型,通过比较条件预测和无条件预测,让生成结果更符合指定条件。例如,在图像生成任务中,模型可以分别计算,
无条件预测:不提供文本描述 条件预测:提供文本描述推理时,模型沿着(条件预测-无条件预测)的方向调整输出,从而加强文本条件的影响。
RECAP将这一思路用于机器人动作生成,但条件不再是物体类别或图像描述,而是动作的优势信息。在RLinf实现中,同一个机器人状态会对应两种语言输入,
普通任务输入: pick up the black bowl 正优势任务输入: pick up the black bowl Advantage: positive前者用于学习整体数据中的动作分布,后者用于学习高优势样本中的动作分布。
训练时,模型需要同时学会这两种预测。高优势样本大部分会追加正优势文本,少量高优势样本会随机去掉优势文本;低优势样本则使用原始任务指令。
推理时,模型分别计算普通动作场和正优势动作场,再根据引导系数进行融合,
最终动作场 = 普通动作场 + 引导系数 ×(正优势动作场 - 普通动作场)当引导系数为0时,只使用普通动作场;当引导系数为1时,结果等于正优势动作场;当引导系数大于1时,会进一步放大正优势条件的影响。
RLinf示例默认使用cfgrl_guidance_scale: 1.0,直接采用正优势条件下的动作预测。
CFG在这里并不是根据Advantage数值给动作损失加权。Advantage的作用是决定训练样本使用哪种语言输入,动作模型仍然使用pi0.5原有的Flow Matching Loss。
一、优势标签在数据加载阶段被转换为文本条件
CFG训练相关代码主要位于,
examples/recap/cfg/ ├── train_cfg.py ├── run_cfg_sft.sh └── config/ └── libero_cfg_openpi.yaml rlinf/ ├── data/datasets/recap/ │ └── cfg_model.py ├── workers/sft/ │ └── fsdp_cfg_worker.py └── models/embodiment/openpi_cfg/ └── openpi_cfg_action_model.py整体数据流为,
LeRobot轨迹 -> advantages_{tag}.parquet -> 按照episode_index和frame_index对齐 -> 构造原始、正优势和负优势三种文本 -> 根据advantage标签选择当前训练输入 -> 使用Flow Matching Loss更新pi0.51、Advantage文件只提供布尔标签
配置中的,
data: advantage_tag: fail300_N10_q30对应读取,
meta/advantages_fail300_N10_q30.parquetFSDPCfgWorker会将Advantage文件转换成一个查询表,
lookup = dict( zip( zip( adv_df["episode_index"].values.astype(int), adv_df["frame_index"].values.astype(int), ), adv_df["advantage"].values.astype(bool), ) )最终形成,
(episode_index, frame_index) → True 或 False读取的是布尔列advantage,不是连续列advantage_continuous。进入策略训练后,模型只知道当前样本属于正优势还是非正优势,并不知道它的连续Advantage到底是0.01、0.1还是0.5。
2、AdvantagePreservingDataset防止标签被数据变换删除
OpenPI的数据处理包含Repack、Normalize和Tokenize等变换。部分变换只保留模型需要的标准字段,因此自定义的advantage字段可能被删除。 RLinf使用AdvantagePreservingDataset包装变换后的数据集,
final_dataset = AdvantagePreservingDataset( base_dataset=base_dataset, transformed_dataset=transformed_dataset, advantages_lookup=advantages_lookup, )每次获取样本时,先读取已经完成OpenPI变换的数据,
sample = self._transformed_dataset[idx]然后重新把Advantage标签放回样本,
sample["advantage"] = self._advantage_by_index[idx]如果Advantage文件与当前数据集不匹配,例如某些帧没有对应标签,代码会直接报错。因此,重新删除轨迹、筛选Episode或修改帧编号后,原来的Advantage文件通常不能直接复用。
3、每条任务指令会生成三种文本
TokenizePromptWithGuidance会为同一个任务构造三份文本。 假设原始任务为,
pick up the black bowl and place it on the plate普通文本为,
pick up the black bowl and place it on the plate正优势文本为,
pick up the black bowl and place it on the plate Advantage: positive负优势文本为,
pick up the black bowl and place it on the plate Advantage: negative对应代码为,
tokens, token_masks = self.tokenizer.tokenize( prompt, state, ) positive_prompt = ( f"{prompt}\nAdvantage: positive" ) negative_prompt = ( f"{prompt}\nAdvantage: negative" ) positive_tokens, positive_masks = ( self.tokenizer.tokenize( positive_prompt, state, ) ) negative_tokens, negative_masks = ( self.tokenizer.tokenize( negative_prompt, state, ) )处理完成后,一个样本同时携带,
tokenized_prompt tokenized_positive_guidance_prompt tokenized_negative_guidance_prompt advantage actions模型在forward中根据Advantage和路由配置,从三种文本中选择一种。
4、多个数据集可以按照样本数量混合
CFG训练可以同时使用SFT和Rollout数据,
data: train_data_paths: - dataset_path: /path/to/sft_dataset type: sft weight: 1.0 - dataset_path: /path/to/rollout_dataset type: rollout weight: 1.0 balance_dataset_weights: true当balance_dataset_weights=true时,实际数据集采样权重按照下面的方式计算,
sampling_weight = ( configured_weight * dataset_length )然后再归一化。假设两个数据集的配置权重都是 1.0,
SFT 数据:10000 帧 Rollout 数据:30000 帧数据集被选择的概率大约为,
SFT:25% Rollout:75%由于每次选中数据集后,又会在其中均匀选择一帧,因此两个数据集中的单帧样本具有接近的被采样概率。 如果关闭balance_dataset_weights,两个数据集会主要按照配置中的weight决定选择概率,小数据集中的单条样本会被更频繁地重复采样。
二、布尔Advantage决定样本语言输入
模型中的路由由下面的函数完成,
compute_cfg_routing_masks( advantage, positive_only_conditional, unconditional_prob, )默认配置为,
actor: model: openpi: positive_only_conditional: true unconditional_prob: 0.1这两个参数共同决定正负样本如何进入条件分支。
1、Unconditional不表示删除任务指令
RLinf中的无条件分支不是完全删除语言,而是只删除Advantage条件,仍然保留原始任务指令。
两种输入分别是,
无条件分支:任务指令 正优势条件分支:任务指令 + Advantage: positive2、positive_only_conditional只允许正样本使用优势条件
当positive_only_conditional: true,路由代码为,
positive_mask = advantage negative_mask = ~positive_mask positive_conditional_mask = ( positive_mask & (random_values > unconditional_prob) ) negative_conditional_mask = ( torch.zeros_like(positive_mask) )RLinf 提供的LIBERO 示例配置使用 unconditional_prob=0.1,路由关系如下,
| Advantage 标签 | 使用正优势文本 | 使用原始文本 |
|---|---|---|
True | 约 90% | 约 10% |
False | 0% | 100% |
最终文本选择代码为,
final_lang_tokens = torch.where( positive_conditional_mask.unsqueeze(-1), positive_guidance_lang_tokens, lang_tokens, )负优势样本使用的是原始任务指令,而不是Advantage: negative。
3、条件丢弃让模型同时学习两个动作分布
正优势样本不能全部使用正优势文本,否则模型只能学到,
任务指令 + Advantage: positive → 动作却无法在相同的数据分布上学到,
任务指令 → 动作因此,RLinf会按照unconditional_prob随机去掉部分正优势条件。例如一个Batch中有100条样本,其中,
正优势样本:40 条 负优势样本:60 条设置,
unconditional_prob = 0.1则期望路由结果大约为,
36条正样本:任务指令 + Advantage: positive 4条正样本:只使用任务指令 60条负样本:只使用任务指令4、关闭positive_only_conditional后会使用负优势文本
当positive_only_conditional: false,正负样本都可以进入对应条件分支,
guidance_mask = ( random_values > unconditional_prob ) positive_conditional_mask = ( positive_mask & guidance_mask ) negative_conditional_mask = ( negative_mask & guidance_mask )此时,
正样本:任务指令 + Advantage: positive 负样本:任务指令 + Advantage: negative 被随机丢弃条件的样本:只使用任务指令对应的文本选择为,
guidance_lang_tokens = torch.where( positive_mask.unsqueeze(-1), positive_guidance_lang_tokens, negative_guidance_lang_tokens, ) final_lang_tokens = torch.where( conditional_mask.unsqueeze(-1), guidance_lang_tokens, lang_tokens, )5、Advantage只作用在语言输入
Advantage只负责选择语言输入,
Advantage=True,可能追加 Advantage: positive Advantage=False,使用原始任务指令文本选择完成后,所有样本仍然使用正常的Flow Matching Loss。
三、策略训练仍然使用pi0.5的Flow Matching Loss
训练入口为,
examples/recap/cfg/train_cfg.py启动命令为,
bash examples/recap/cfg/run_cfg_sft.sh \ libero_cfg_openpitrain_cfg.py创建FSDPCfgWorker,再交给通用的SFTRunner执行训练,
actor_group = ( FSDPCfgWorker .create_group(cfg) .launch( cluster, name=cfg.actor.group_name, placement_strategy=actor_placement, ) ) runner = SFTRunner( cfg=cfg, actor=actor_group, ) runner.init_workers() runner.run()1、CFG模型从pi0.5检查点初始化
主要模型配置为,
actor: model: model_path: /path/to/pi05_base_pytorch model_type: cfg_model add_value_head: false openpi: config_name: pi05_libero train_expert_only: false positive_only_conditional: true unconditional_prob: 0.1 guidance_type: positive cfgrl_guidance_scale: 1.0model_type: cfg_model会创建,
OpenPi0ForCFGActionPrediction模型先读取pi0.5配置,再使用CFG配置覆盖相关参数,
actor_model_config = ( OpenPi0Config( **actor_model_config.__dict__ ) ) for key, val in cfg.openpi.items(): actor_model_config.__dict__[key] = val随后从model_path加载原始pi0.5权重。这一阶段不会增加Value Head,
add_value_head: false价值模型已经在上一步完成了优势标签计算,策略训练只需要动作模型。
2、Flow Matching目标与SFT时一致
给定真实动作actions和随机噪声noise,代码先采样时间t,构造中间状态,
x_t = ( time * noise + (1 - time) * actions )目标动作场为,
u_t = noise - actions模型根据图像、机器人状态、选中的语言条件和x_t,预测动作场。省略图像掩码、语言掩码和缓存等参数后,前向过程可以简化表示为,
v_t = model(images, state, selected_prompt, x_t, time)损失仍然是均方误差,
per_element_loss = F.mse_loss( u_t, v_t, reduction="none", ) flow_loss = per_element_loss.mean()优势条件只改变,
selected_prompt不会改变真实动作、噪声目标或Flow Matching公式。
3、一个Batch只为每条样本执行一条训练分支
训练时,代码不会对同一条样本同时执行条件和无条件两次前向传播。
它先为每条样本确定路由(正优势条件分支 or 原始任务分支),然后将选中的文本组成一个Batch,只执行一次Flow Matching前向,
flow_loss, per_sample_loss = ( self._compute_flow_losses( images=images, state=state, actions=actions, lang_tokens=final_lang_tokens, lang_masks=final_lang_masks, time=time, noise=noise, ) )因此,CFG训练的主要额外开销来自,
- 每条任务需要额外分词正负优势文本;
- Batch中包含不同的文本条件;
- 需要记录不同路由的统计指标。
训练阶段不会像推理阶段一样,在每个去噪步骤同时计算两条动作分支。
4、FSDPCfgWorker负责梯度累积和分布式训练
默认配置为,
actor: micro_batch_size: 32 global_batch_size: 512 optim: lr: 1.0e-5 weight_decay: 1.0e-10 clip_grad: 1.0 lr_scheduler: cosine lr_warmup_steps: 5000 total_training_steps: 30000梯度累积次数为,
global_batch_size ÷ micro_batch_size ÷ GPU 数量例如使用4张GPU,
512 ÷ 32 ÷ 4 = 4每张GPU连续处理4个Micro Batch后,再执行一次优化器更新。代码会检查,
global_batch_size % ( micro_batch_size * world_size ) == 0如果不能整除,会直接报错。
5. 路由比例
模型会分别统计,
conditional_count unconditional_count positive_label_count negative_label_count positive_conditional_count positive_unconditional_count negative_conditional_count negative_unconditional_count再计算,
conditional_ratio unconditional_ratio positive_label_ratio negative_label_ratio positive_effective_conditional_ratio positive_effective_unconditional_ratio同时还会分别记录不同路由下的损失,
conditional_loss unconditional_loss positive_conditional_loss positive_unconditional_loss negative_conditional_loss negative_unconditional_loss在默认配置下,应重点检查,
negative_conditional_ratio它应该接近0,因为负优势样本不会进入条件分支。还应检查,
positive_effective_unconditional_ratio它应当接近配置中的,
unconditional_prob = 0.1四、推理阶段会融合普通动作场和正优势动作场
训练完成后,推理不再需要读取Advantage文件,也不需要运行Value Model。对于每个任务,模型会构造两种输入,
普通分支:任务指令 正优势分支:任务指令 + Advantage: positive随后分别计算两条动作场。
1、两个语言前缀会分别建立缓存
普通分支先编码,
prefix_embs_uncond = self.embed_prefix( images, image_masks, original_prompt, original_prompt_mask, )正优势分支再编码,
prefix_embs_cond = self.embed_prefix( images, image_masks, positive_prompt, positive_prompt_mask, )两条分支都会生成各自的KV Cache,
past_key_values_uncond past_key_values_cond这样后续的多个Flow Matching去噪步骤可以复用图像和语言前缀,不需要每一步都重新编码完整输入。
2、每个去噪步骤都会融合两条动作场
动作采样从随机噪声开始,
x_t = noise时间从1逐步积分到0,
dt = -1.0 / num_steps每一步先计算普通动作场,
v_t_uncond = self.denoise_step( state, prefix_uncond, x_t, time, )再计算正优势动作场,
v_t_cond = self.denoise_step( state, prefix_cond, x_t, time, )最终融合方式为,
最终动作场 = (1 - guidance_scale) × 普通动作场 + guidance_scale × 正优势动作场代码为,
v_t = ( 1 - self.config.cfgrl_guidance_scale ) * v_t_uncond + ( self.config.cfgrl_guidance_scale ) * v_t_cond然后更新当前动作,
x_t = x_t + dt * v_t这个过程会在每一个去噪步骤重复,而不是只在最终动作上融合一次。
3、guidance_scale控制策略偏向正优势分布的程度
当guidance_scale = 0,最终动作场 = 普通动作场,完全不使用正优势条件。
当guidance_scale = 1,最终动作场 = 正优势动作场。
这也是RLinf示例中的默认值,
cfgrl_guidance_scale: 1.0此时虽然代码仍然计算普通分支,但最终数值等于正优势分支。
当0 < guidance_scale < 1,模型在普通动作分布和正优势动作分布之间插值。
当guidance_scale > 1,模型会沿着(正优势动作场 - 普通动作场)的方向继续外推,进一步增强优势条件的影响。
例如,guidance_scale = 1.5,对应,
最终动作场 = -0.5 × 普通动作场 + 1.5 × 正优势动作场不再是简单加权平均,而是在正优势方向上进行外推。
引导过强可能导致,
- 动作幅度增大;
- 机械臂运动过快;
- 夹爪频繁开合;
- 轨迹偏离训练数据分布;
- 长时程任务中的误差不断累积。
实际部署时更适合从1.0开始,再根据成功率、碰撞率和动作平滑性逐步调整。
4、guidance_type决定推理使用哪一种条件
代码支持三种方式,positive、negative、no_guide配置。当guidance_type: positive,表示使用Advantage: positive作为条件分支。如果设置guidance_type: no_guide,模型只运行普通分支v_t = v_t_uncond,此时不进行CFG,也不需要计算第二条去噪分支。如果设置guidance_type: negative,模型会使用Advantage: negative作为条件。但当positive_only_conditional: true,训练过程中并没有使用负优势条件,因此代码禁止在这种情况下进行负向引导。
5、CFG会增加推理计算量
无引导推理时,每个积分步骤只调用一次denoise_step。开启正优势引导后,每一步需要分别计算denoise_step_uncond、denoise_step_cond。
虽然两条分支都使用KV Cache,不需要反复编码完整图像和文本,但动作去噪部分的计算量仍然会明显增加。
