RLinf复现RECAP(一):从轨迹回报到优势标签
一、RLinf使用Sidecar文件连接三个处理阶段
与RECAP数据处理相关的代码主要位于,
examples/recap/ ├── process/ │ ├── compute_returns.py │ ├── compute_advantages.py │ ├── run_compute_returns.sh │ ├── run_compute_advantages.sh │ └── config/ └── value/ ├── train_value.py ├── run_value_sft.sh └── config/libero_sft_value.yaml完整数据流如下,
LeRobot轨迹 -> compute_returns.py -> returns_{tag}.parquet -> Value Model SFT -> Value Model checkpoint -> compute_advantages.py -> advantages_{tag}.parquetRLinf不会把Return和Advantage直接写回原始轨迹文件,而是保存成独立的Sidecar Parquet。例如,
dataset/ ├── data/ │ └── chunk-000/ ├── meta/ │ ├── info.json │ ├── stats.json │ ├── returns_fail300.parquet │ └── advantages_fail300_N10_q30.parquet └── mixture_config.yamlReturn和Advantage依靠episode_index、frame_index字段与原始轨迹对齐。
注意,三个阶段必须使用相同的数据版本。重新过滤轨迹、删除帧或者重新编号后,旧的Sidecar文件通常不能继续使用。
SFT和Rollout数据采用不同的处理方式
数据配置示例如下,
data: train_data_paths: - dataset_path: /path/to/sft_dataset type: sft weight: 1.0 - dataset_path: /path/to/rollout_dataset type: rollout weight: 1.0其中,
- sft一般表示人工示范;
- rollout表示策略自主运行产生的数据。
二者差别如下,
- SFT:
- 所有轨迹默认成功,最终所有Advantage标签强制设为True。
- Rollout:
- 根据is_success区分成功和失败,根据连续Advantage和阈值生成正负标签。
二、compute_returns.py将轨迹结果转换为逐帧回报
Return的计算入口是,
examples/recap/process/compute_returns.py启动命令为,
bash examples/recap/process/run_compute_returns.sh compute_returns核心配置如下,
data: train_data_paths: - dataset_path: /path/to/sft_dataset type: sft - dataset_path: /path/to/rollout_dataset type: rollout gamma: 1.0 failure_reward: -300.0 tag: fail300 num_workers: 128compute_returns_for_episode负责计算单条轨迹
单条轨迹的Reward和Return由下面的函数计算,
def compute_returns_for_episode( episode_length, is_success, gamma, failure_reward, ): rewards = np.full( episode_length, -1.0, dtype=np.float32, ) rewards[-1] = ( 0.0 if is_success else failure_reward ) returns = np.zeros( episode_length, dtype=np.float32, ) returns[-1] = rewards[-1] for t in range( episode_length - 2, -1, -1, ): returns[t] = ( rewards[t] + gamma * returns[t + 1] ) return returns, rewards奖励规则为,
任务仍在执行:Reward = -1 成功终止帧:Reward = 0 失败终止帧:Reward = failure_reward其中,-1表示每执行一步产生一次时间成本;较大的失败惩罚则用于保证失败轨迹的回报明显低于成功轨迹。
Return从轨迹末尾向前累计
当gamma=1时,当前帧Return = 当前帧Reward + 下一帧Return。
假设一条成功轨迹包含三个普通步骤和一个成功终止帧,
帧 Reward Return 当前帧 -1 -3 下一帧 -1 -2 再下一帧 -1 -1 成功终止帧 0 0计算过程为,
成功终止帧:Return = 0 再下一帧:Return = -1 + 0 = -1 下一帧:Return = -1 + -1 = -2 当前帧:Return = -1 + -2 = -3因此,成功轨迹中的Return会逐渐接近0,
-3 → -2 → -1 → 0_process_single_parquet负责处理数据文件
RLinf不会加载图像数据,只读取计算Return所需的元数据列,
_READ_COLUMNS = [ "episode_index", "frame_index", "is_success", "task_index", "task", ]实际读取时,还会检查当前Parquet中是否存在这些字段,
pf = pq.ParquetFile(pq_file) available = set(pf.schema_arrow.names) cols_to_read = [ c for c in _READ_COLUMNS if c in available ] table = pq.read_table( pq_file, columns=cols_to_read, )由于没有读取图像,Return计算主要是Parquet I/O和数组运算,内存占用相对较低。
代码根据episode_index的变化找到轨迹边界,
change_mask = ( np.diff(episode_indices) != 0 ) change_positions = ( np.where(change_mask)[0] + 1 )每条轨迹分别调用,
compute_returns_for_episode(...)处理SFT数据时,代码直接将轨迹设为成功,
if dataset_type == "sft": is_success = True处理Rollout数据时,则读取轨迹最后一帧的is_success,
is_success = bool( is_success_col[ep_end - 1] )多个Parquet文件通过线程并行处理
process_dataset会递归查找数据目录中的所有Parquet,
parquet_files = sorted( str(p) for p in data_dir.rglob("*.parquet") )随后使用线程池并行处理,
with ThreadPoolExecutor( max_workers=effective_workers ) as pool: fut = pool.submit( _process_single_parquet, pq_file, dataset_type, gamma, failure_reward, tasks, )PyArrow文件读取时能够释放GIL,因此这里使用线程也可以获得较好的并行效果。
所有结果最终合并为一个Arrow Table,
combined = pa.concat_tables( result_tables )当配置为,
tag: fail300输出文件为,
meta/returns_fail300.parquet主要字段包括,
episode_index frame_index return reward prompt脚本还会更新,
meta/stats.json meta/info.jsonstats.json中会记录,
{ "return": { "mean": "...", "std": "...", "min": "...", "max": "..." }, "reward": { "mean": "...", "std": "...", "min": "...", "max": "..." } }后面的Value Model和Advantage计算都需要使用这里的Return范围。
三、Value Model将逐帧Return转换为状态价值
价值模型训练入口是,
examples/recap/value/train_value.py运行命令为,
bash examples/recap/value/run_value_sft.sh libero_sft_valuetrain_value.py本身没有实现完整训练循环,而是创建RLinf的SFT Worker和Runner,
actor_group = ( FSDPValueSftWorker .create_group(cfg) .launch( cluster, name=cfg.actor.group_name, placement_strategy=actor_placement, ) ) runner = SFTRunner( cfg=cfg, actor=actor_group, ) runner.init_workers() runner.run()ValueDataset根据帧索引读取Return标签
Value Model的数据集实现位于,
rlinf/data/datasets/recap/value_model.py每次读取样本时,代码先从原始LeRobot数据中获得episode_index、frame_index、图像、任务文本。然后使用episode_index和frame_index从Return Sidecar中读取标签,
raw = float( self._sidecar[ep]["return"][fr] )如果找不到对应Episode,代码会提示Sidecar或Tag不匹配,
if ep not in self._sidecar: raise KeyError( "The sidecar/tag may not match " "the dataset" )最终返回给模型的数据结构为,
result = { "images": images, "prompt": prompt, "target_values": target_value, "actions": None, }Value Model不需要动作标签,因此,
actions = NoneReturnNormalizer将原始Return归一化到-1到0
配置默认开启,
data: normalize_to_minus_one_zero: true对应实现为,
def normalize_value(self, value): denom = ( abs(self.return_min) if self.return_min != 0 else 1.0 ) return value / denom假设全局最小Return为-300(return_max = 0),
原始 Return = -300 归一化 Value = -1.0 原始 Return = -150 归一化 Value = -0.5 原始 Return = 0 归一化 Value = 0如果不同任务的最大轨迹长度差异很大,全局最小Return可能主要由最长任务或失败惩罚决定。这样会把短任务的价值压缩到靠近0的较小区间内,因此多任务训练时需要分别观察各任务的价值分布。
Value Head使用201个离散区间预测价值
默认配置为,
num_bins: 201 v_min: -1.0 v_max: 0.0模型输出201个Logits,
logit_0, logit_1, ..., logit_200每一个Logit对应-1到0之间的一个价值区间。推理时先计算概率,
probs = F.softmax( logits, dim=-1, )再计算期望价值,
values = ( probs * self.value_head.atoms ).sum(dim=-1)可以理解为,
预测价值 = 每个价值区间 × 该区间的预测概率连续标签被分配到相邻的两个区间
训练时,连续目标值通常不会刚好落在某一个离散区间上。代码先计算目标值在离散区间中的位置,
b = ( target_values - self.v_min ) / self.delta_z然后找到左右两个区间,
l = b.floor().long() u = b.ceil().long()监督概率按照距离分配,
target_probs[batch_idx, l] += d_to_u target_probs[batch_idx, u] += d_to_l假设目标值位于两个区间之间,并且更靠近左区间,
左区间监督概率:0.8 右区间监督概率:0.2这种方式比直接选择最近区间更平滑,也保留了连续Return中的相对距离信息。最终损失为,
loss = -( target_probs * F.log_softmax(logits, dim=-1) ).sum(dim=-1)训练日志中除了Loss,还会记录,
cat_acc_best cat_acc_neighbor mae value_spearman其中,value_spearman用于衡量预测价值和真实Return的排序一致性。
四、compute_advantages.py将价值变化转换为正负标签
Advantage的计算入口为,
examples/recap/process/compute_advantages.py运行命令为,
bash examples/recap/process/run_compute_advantages.sh compute_advantages主要配置如下,
advantage: value_checkpoint: /path/to/value_checkpoint batch_size: 1024 flush_interval: 256 num_dataloader_workers_per_gpu: 12 prefetch_factor: 2 discount_next_value: true positive_quantile: 0.3 returns_tag: fail300 tag: fail300_N10_q30 data: advantage_lookahead_step: 10 gamma: 1.0ValueInferenceDataset统一构造模型输入
不同机器人数据集使用的字段名称可能不同。例如LIBERO可能使用:
observation.image observation.wrist_image observation.stateFranka数据可能使用,
observation.images.front_cam observation.images.wrist_cam observation.state.tcp_poseRLinf通过KEY_MAPPINGS将它们转换成Value Model使用的统一格式,
KEY_MAPPINGS = { "libero": { "observation.image": "observation/image", "observation.wrist_image": "observation/wrist_image", "observation.state": "observation/state", "task": "prompt", } }ValueInferenceDataset每次返回,
{ "obs": obs, "global_idx": idx, "episode_index": ep_idx, "frame_index": frame_idx, "true_return": true_return, "reward": reward, }代码分两个阶段完成价值推理和优势计算
第一阶段批量推理所有状态的价值,
batch_results = value_model.infer_batch( obs_list, batch_size=batch_size, )预测结果保存到数组,
v_values[local_idx] = float( result["value"] )第二阶段不再调用模型,而是通过数组下标获取当前状态价值V(o_t)和N步之后的状态价值 V(o_t+N),每一帧只需要执行一次Value Model推理。如果直接针对每一个样本分别推理当前状态和未来状态,大部分中间帧会被重复计算两次。RLinf先统一推理再按下标复用,可以显著减少计算量。
分片推理会额外读取N个未来样本
多GPU模式下,每个进程只负责数据集的一部分,
shard_start, shard_end = ( get_shard_indices( total_samples, rank, world_size, ) )但计算当前分片末尾样本时,仍然需要访问未来N步的Value。因此,代码会将推理范围向后扩展,
extended_end = min( shard_end + action_horizon, len(dataset), )例如当前GPU负责,
样本 0 到样本 999并设置,
action_horizon = 10那么该GPU最多会推理到,
样本 1009多出来的10个样本只用于查询分片末尾位置的未来价值,不会重复写入最终结果。
N步Advantage同时考虑状态变化和动作成本
核心代码为,
reward_sum = normalize( reward_sum_raw ) gamma_k = ( gamma**num_valid if discount_next_value else 1.0 ) advantage = ( reward_sum + gamma_k * v_next - v_curr )直接写成容易理解的形式,
Advantage = 归一化后的未来N步累计奖励 + 折扣后的未来状态价值 - 当前状态价值其中,v_curr为当前状态价值,v_next为N步之后的状态价值,reward_sum_raw为中间N步的原始 Reward总和,num_valid为当前轨迹中实际可用的未来步数
假设,
当前价值v_curr = -0.40 10步后价值v_next = -0.30 10步Reward总和 = -10 全局Return范围 = [-300, 0]归一化后的累计Reward为,
reward_sum = -10 / 300 ≈ -0.033因此,
Advantage = -0.033 + (-0.30) - (-0.40) = 0.067结果为正,说明状态价值的提升超过了中间10步产生的时间成本。如果10步后状态反而变差,
v_curr = -0.30 v_next = -0.35则,
Advantage = -0.033 + (-0.35) - (-0.30) = -0.083结果为负,说明动作既消耗了时间,又没有推动任务进展。
gamma等于1时直接使用Return差值计算累计奖励
当gamma=1.0,代码不需要重新读取并累计每一步Reward。如果N步后的状态仍在当前轨迹中,
reward_sum_raw = ( true_return - next_return )原因是,
当前Return = 未来N步Reward总和 + N步后的Return移项后是,
未来N步Reward总和 = 当前Return - N步后的Return如果N步后已经超出轨迹末尾,
reward_sum_raw = true_return v_next = 0.0此时直接使用当前帧到轨迹结束的完整Return。
当gamma不等于1时,代码才会显式读取Reward序列:
reward_sum_raw = np.sum( gamma_powers[:num_valid] * reward_slice )轨迹末尾不会跨Episode读取未来状态
未来位置通过下面的方式计算,
next_gidx = ( gidx + action_horizon ) is_next_pad = ( next_gidx >= ep_end )实际使用的Reward数量为,
num_valid = min( action_horizon, ep_end - gidx, )连续Advantage通过全局阈值转换为标签
每个样本先得到连续值,
advantage_continuous所有数据集计算完成后,代码会合并 Advantage,
combined_advantages = np.concatenate( all_advantages )默认,
positive_quantile: 0.3表示将Advantage最高的30%作为正样本。因此,实际阈值位于70%分位点:
unified_threshold = np.percentile( combined_advantages, 70, )保存Rollout数据时,
save_df["advantage"] = ( save_df["advantage_continuous"] >= threshold )保存SFT数据时,
if dataset_type == "sft": save_df["advantage"] = True结果通过临时Parquet分块写入
优势计算可能包含数百万帧,如果一直把所有结果保存在内存中,容易产生OOM。
代码根据flush_interval和batch_size,定期将结果写入临时Parquet,
temp_df.to_parquet( temp_file, index=False, )写入后清空内存,
for k in results: results[k] = [] gc.collect()所有分块处理完成后再统一合并,
merged_df = pd.concat( [ pd.read_parquet(f) for f in temp_files ], ignore_index=True, )最终输出文件为,
meta/advantages_fail300_N10_q30.parquet主要字段包括,
episode_index frame_index return value_current value_next reward_sum reward_sum_raw num_valid_rewards advantage_continuous advantage dataset_name代码还会更新mixture_config.yaml,记录全局Return范围、统一阈值、正样本比例配置和数据集信息,主要用于实验记录与结果追踪。下一阶段的CFG训练直接读取advantages_{tag}.parquet中的布尔优势标签。记录信息如下,
global_return_min global_return_max unified_threshold positive_quantile 数据集名称与权重这些信息会在下一阶段的CFG策略训练中继续使用。
