4S体系择时模块代码实现,根据指数估值分位,判定当前整体仓位档位。
4S 体系择时模块:基于指数估值分位的仓位管理
一、实际应用场景描述
在量化投资中,选股(Stock Selection) 和 择时(Market Timing) 是两件事。4S 体系不仅关注"买什么",同样关注"什么时候买、买多少"。
核心问题
即使选到了好股票,如果在市场整体估值过高时满仓入场,依然可能面临大幅回撤。
典型场景
场景 问题
2015 年创业板 PE 突破 100 倍 好公司 + 贵市场 = 依然亏钱
2018 年全市场估值底部 悲观情绪下不敢建仓,错过底部
牛市中段 不知道该加仓还是减仓
震荡市 频繁调整仓位,交易成本吞噬利润
二、引入痛点
痛点 表现
🔴 估值主观判断 "贵不贵"全凭感觉,缺乏量化标准
🔴 仓位拍脑袋 满仓 / 半仓 / 空仓没有系统化依据
🔴 滞后性 看到大跌才减仓,已经来不及
🔴 单一指标不可靠 只看 PE 或只看 PB,容易误判
🟡 不同指数差异大 沪深 300、中证 500、创业板不能用同一把尺
🟡 均值回归周期不确定 估值"均值回归"可能需要 3~5 年
三、核心逻辑讲解
3.1 什么是指数估值分位?
估值分位(Percentile) 回答的问题是:
当前估值在历史所有交易日中,排在什么位置?
估值分位 = 历史上估值低于当前值的天数 / 总天数 × 100%
示例:
当前 PE = 15 → 历史上 20% 的时间比这更低
→ 估值分位 = 20% → 当前处于历史低位 → 可以重仓
当前 PE = 40 → 历史上 85% 的时间比这更低
→ 估值分位 = 85% → 当前处于历史高位 → 应该轻仓
3.2 为什么用分位而不是绝对值?
对比 绝对值(如 PE < 15) 分位值(如 PE 分位 < 30%)
跨指数适用 ❌ 沪深 300 和创业板 PE 中枢完全不同 ✅ 都映射到 0~100%,可直接对比
跨时段适用 ❌ 10 年前 PE 15 和今天 PE 15 含义不同 ✅ 分位自动适应市场结构变化
极端值敏感 ❌ 一个极端值就能拉偏 ✅ 分位天然抗极端值
3.3 4S 择时仓位映射
┌─────────────────────────────────────────────────────┐
│ 4S 择时仓位决策矩阵 │
├─────────────────────────────────────────────────────┤
│ │
│ 估值分位区间 │ 市场状态 │ 建议仓位 │
│ ───────────────────────────────────────────── │
│ [0%, 20%) │ 🟢 极度低估 │ 90%~100% │
│ [20%, 40%) │ 🟢 低估 │ 70%~90% │
│ [40%, 60%) │ 🟡 合理 │ 50%~70% │
│ [60%, 80%) │ 🟠 偏高 │ 30%~50% │
│ [80%, 100%] │ 🔴 高估 │ 0%~30% │
│ │
│ ★ 核心原则:估值越低,仓位越重;估值越高,仓位越轻 │
│ │
└─────────────────────────────────────────────────────┘
3.4 多指数加权
实际使用中,单一指数不够全面。我们采用多指数加权方式:
综合仓位 = w1 × 沪深300分位 + w2 × 中证500分位 + w3 × 创业板指分位
默认权重:
沪深300:40%(大盘蓝筹代表)
中证500:30%(中盘成长代表)
创业板指:30%(小盘成长代表)
3.5 信号平滑与滞后处理
直接用分位切换仓位会导致频繁调仓。引入两个机制:
1. 分位区间缓冲带:在边界 ±5% 范围内保持当前仓位不变
2. 持仓再平衡频率限制:最短 10 个交易日才允许调整一次
四、项目结构
timing_module/
├── README.md
├── requirements.txt
├── config.yaml
├── data/
│ ├── index_pe_history.csv # 指数 PE 历史数据
│ └── index_pb_history.csv # 指数 PB 历史数据
├── src/
│ ├── data_loader.py # 数据加载
│ ├── valuation_calculator.py # ★ 估值分位计算
│ ├── position_mapper.py # ★ 分位 → 仓位映射
│ ├── signal_smoother.py # 信号平滑与频率控制
│ ├── timing_engine.py # ★ 择时主引擎
│ ├── backtester.py # 回测框架
│ └── visualizer.py # 可视化
├── main.py
└── output/
└── timing_signals.csv # 输出的择时信号
五、完整代码
"requirements.txt"
pandas>=1.5
numpy>=1.21
matplotlib>=3.5
seaborn>=0.12
scipy>=1.9
pyyaml>=6.0
"config.yaml"
# 4S 择时模块配置
# 跟踪的指数
indices:
- name: "沪深300"
code: "000300"
weight: 0.4
pe_col: "pe_300"
pb_col: "pb_300"
- name: "中证500"
code: "000905"
weight: 0.3
pe_col: "pe_500"
pb_col: "pb_500"
- name: "创业板指"
code: "399006"
weight: 0.3
pe_col: "pe_cyb"
pb_col: "pb_cyb"
# ★ 仓位映射(分位 → 仓位)
position_mapping:
method: "linear" # linear / step(线性插值 / 阶梯式)
min_position: 0.10 # 最低仓位 10%
max_position: 0.95 # 最高仓位 95%
# 阶梯式映射(step 模式)
step_rules:
- max_percentile: 20
position: 0.90
- max_percentile: 40
position: 0.75
- max_percentile: 60
position: 0.60
- max_percentile: 80
position: 0.35
- max_percentile: 100
position: 0.15
# 信号平滑
smoothing:
enabled: true
buffer_zone: 0.05 # 分位边界 ±5% 缓冲带
min_rebalance_days: 10 # 最短 10 天调仓一次
use_pb_fallback: true # PE 缺失时用 PB 替代
# 回测
backtest:
start_date: "2015-01-01"
end_date: "2024-12-31"
initial_capital: 1000000
output:
path: "output/timing_signals.csv"
"src/data_loader.py"
"""
data_loader.py
指数估值数据加载
"""
import pandas as pd
import numpy as np
from pathlib import Path
def load_index_valuation(filepath: str) -> pd.DataFrame:
"""
加载指数历史估值数据
预期格式:
date,index_code,pe_ttm,pb,close
2015-01-05,000300,13.5,1.85,3523.45
...
"""
df = pd.read_csv(filepath, parse_dates=['date'])
df = df.sort_values(['index_code', 'date']).reset_index(drop=True)
return df
def load_all_indices(directory: str) -> pd.DataFrame:
"""从目录加载所有指数文件并合并"""
import glob
all_files = glob.glob(f"{directory}/*.csv")
dfs = []
for f in all_files:
df = pd.read_csv(f, parse_dates=['date'])
dfs.append(df)
return pd.concat(dfs, ignore_index=True)
def generate_mock_valuation_data(
start: str = "2015-01-01",
end: str = "2024-12-31",
seed: int = 42
) -> pd.DataFrame:
"""
生成模拟指数估值数据
模拟逻辑:
- PE 围绕中枢波动,叠加长期趋势和短期噪声
- 2015 年中高估、2018 年底低估、2021 年初高估
"""
np.random.seed(seed)
dates = pd.date_range(start, end, freq='B')
indices = [
{"code": "000300", "name": "沪深300", "pe_mean": 14.0, "pe_std": 3.0},
{"code": "000905", "name": "中证500", "pe_mean": 22.0, "pe_std": 5.0},
{"code": "399006", "name": "创业板指", "pe_mean": 35.0, "pe_std": 10.0},
]
records = []
for idx_info in indices:
code = idx_info["code"]
pe_mean = idx_info["pe_mean"]
pe_std = idx_info["pe_std"]
n = len(dates)
# 长期趋势:2015 高估 → 2018 低估 → 2021 高估 → 现在中性
t = np.arange(n) / n
trend = 0.3 * np.sin(2 * np.pi * t * 3) + 0.2 * np.sin(2 * np.pi * t * 1.5)
noise = np.random.normal(0, 0.15, n)
pe_ratio = pe_mean * (1 + trend + noise)
pe_ratio = np.clip(pe_ratio, pe_mean * 0.4, pe_mean * 2.5)
pb = pe_ratio / 12.0 * np.random.uniform(0.8, 1.2, n)
close = 3000 * np.cumprod(1 + np.random.normal(0.0003, 0.015, n))
for i, d in enumerate(dates):
records.append({
'date': d,
'index_code': code,
'pe_ttm': round(pe_ratio[i], 2),
'pb': round(pb[i], 2),
'close': round(close[i], 2)
})
return pd.DataFrame(records)
"src/valuation_calculator.py"(★ 核心模块)
"""
valuation_calculator.py
★ 估值分位计算器
核心功能:
1. 计算 PE / PB 的历史分位
2. 支持滚动窗口(如"过去 5 年")
3. 缺失值处理(向前填充 / PB 替代)
4. 多指数聚合
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)
class ValuationCalculator:
"""
★ 指数估值分位计算器
核心方法:
- compute_percentile(pe, history) → 当前 PE 在历史上的分位
- compute_rolling_percentile(data, window) → 滚动分位
- composite_signal(pe_pct, pb_pct) → 综合估值信号
"""
def __init__(
self,
lookback_years: int = 5,
use_pb_fallback: bool = True,
min_history_days: int = 252
):
"""
参数:
lookback_years: 回溯年数(默认 5 年 ≈ 1260 个交易日)
use_pb_fallback: PE 缺失时是否用 PB 替代
min_history_days: 最少需要多少天历史数据才计算分位
"""
self.lookback = lookback_years * 252 # 年 → 交易日
self.use_pb = use_pb_fallback
self.min_days = min_history_days
logger.info(f"估值计算器初始化: 回溯 {lookback_years} 年, "
f"PB 兜底: {'开' if use_pb_fallback else '关'}")
def compute_percentile(
self,
current_value: float,
history: pd.Series
) -> float:
"""
★ 核心方法:计算当前值在历史上的分位
参数:
current_value: 当前 PE 或 PB
history: 历史估值序列
返回:
分位值 (0~100),值越大表示当前越"贵"
"""
if pd.isna(current_value) or current_value <= 0:
return np.nan
history = history.dropna()
if len(history) < self.min_days:
logger.debug(f"历史数据不足: {len(history)} < {self.min_days}")
return np.nan
# 分位 = 历史上低于当前值的天数 / 总天数
pct = (history < current_value).sum() / len(history) * 100
return round(pct, 2)
def compute_rolling_percentile(
self,
valuation_data: pd.DataFrame,
index_code: str,
metric: str = 'pe_ttm'
) -> pd.Series:
"""
计算滚动分位序列
参数:
valuation_data: 估值数据(含 date, index_code, pe_ttm, pb 等)
index_code: 指数代码
metric: 用哪个指标('pe_ttm' 或 'pb')
返回:
Series: index=date, value=分位 (0~100)
"""
mask = valuation_data['index_code'] == index_code
idx_data = valuation_data[mask].sort_values('date').reset_index(drop=True)
if len(idx_data) < self.min_days:
logger.warning(f"指数 {index_code} 数据不足 {self.min_days} 天")
return pd.Series(dtype=float)
percentiles = pd.Series(dtype=float, index=idx_data['date'])
for i in range(len(idx_data)):
current = idx_data.iloc[i]
current_date = current['date']
current_val = current[metric]
# 回溯窗口
window_start = i - self.lookback
if window_start < 0:
window_start = 0
hist = idx_data.iloc[window_start:i + 1][metric].dropna()
pct = self.compute_percentile(current_val, hist)
percentiles[current_date] = pct
return percentiles
def composite_signal(
self,
pe_percentile: float,
pb_percentile: float
) -> Tuple[float, str]:
"""
★ 综合 PE + PB 分位,输出最终估值信号
参数:
pe_percentile: PE 分位 (0~100)
pb_percentile: PB 分位 (0~100)
返回:
(composite_pct, label)
composite_pct: 综合分位 (0~100)
label: 估值标签(极度低估 / 低估 / 合理 / 偏高 / 高估)
"""
# PE 优先,PB 作为补充
if pd.notna(pe_percentile):
primary = pe_percentile
source = "PE"
elif self.use_pb and pd.notna(pb_percentile):
primary = pb_percentile
source = "PB"
else:
return 50.0, "未知(数据不足)"
# 两者都有时,取加权平均(PE 权重更高)
if pd.notna(pe_percentile) and pd.notna(pb_percentile):
composite = pe_percentile * 0.7 + pb_percentile * 0.3
source = "PE+PB"
else:
composite = primary
# 打标签
label = self._classify_valuation(composite)
logger.debug(f"估值信号: PE={pe_percentile}, PB={pb_percentile} "
f"→ 综合={composite:.1f} ({label}, 来源={source})")
return round(composite, 2), label
def _classify_valuation(self, pct: float) -> str:
"""将分位映射为文字标签"""
if pct < 20:
return "极度低估"
elif pct < 40:
return "低估"
elif pct < 60:
return "合理"
elif pct < 80:
return "偏高"
else:
return "高估"
def batch_compute(
self,
valuation_data: pd.DataFrame,
index_configs: List[Dict]
) -> pd.DataFrame:
"""
批量计算多指数的估值分位
返回:
DataFrame:
date | index_code | pe_pct | pb_pct | composite_pct | label
"""
all_results = []
for config in index_configs:
code = config['code']
name = config.get('name', code)
logger.info(f" 计算 {name}({code})估值分位...")
pe_pcts = self.compute_rolling_percentile(
valuation_data, code, 'pe_ttm'
)
pb_pcts = self.compute_rolling_percentile(
valuation_data, code, 'pb'
)
for date in pe_pcts.index:
pe = pe_pcts[date]
pb = pb_pcts[date] if date in pb_pcts.index else np.nan
comp, label = self.composite_signal(pe, pb)
all_results.append({
'date': date,
'index_code': code,
'index_name': name,
'pe_pct': pe,
'pb_pct': pb,
'composite_pct': comp,
'valuation_label': label
})
return pd.DataFrame(all_results)
"src/position_mapper.py"(★ 核心模块)
"""
position_mapper.py
★ 估值分位 → 仓位映射
将 0~100 的分位值映射为建议仓位(0%~100%)
"""
import pandas as pd
import numpy as np
from typing import Dict, List, Optional
import logging
logger = logging.getLogger(__name__)
class PositionMapper:
"""
★ 分位 → 仓位映射器
两种模式:
1. linear(线性插值):分位从 0→100 时,仓位从 max→min 线性变化
2. step(阶梯式):预设分位区间对应固定仓位
"""
def __init__(
self,
method: str = "step",
min_position: float = 0.10,
max_position: float = 0.95,
step_rules: Optional[List[Dict]] = None
):
"""
参数:
method: 'linear' 或 'step'
min_position: 最高估值分位时的最低仓位
max_position: 最低估值分位时的最高仓位
step_rules: 阶梯规则(step 模式),如:
[
{'max_percentile': 20, 'position': 0.90},
{'max_percentile': 40, 'position': 0.75},
...
]
"""
self.method = method
self.min_pos = min_position
self.max_pos = max_position
self.step_rules = step_rules or [
{'max_percentile': 20, 'position': 0.90},
{'max_percentile': 40, 'position': 0.75},
{'max_percentile': 60, 'position': 0.60},
{'max_percentile': 80, 'position': 0.35},
{'max_percentile': 100, 'position': 0.15},
]
# 按 max_percentile 排序
self.step_rules = sorted(self.step_rules, key=lambda x: x['max_percentile'])
logger.info(f"仓位映射器: 模式={method}, 仓位范围 [{min_position*100:.0f}%, {max_position*100:.0f}%]")
if method == 'step':
logger.info(" 阶梯规则:")
for rule in self.step_rules:
logger.info(f" 分位 ≤ {rule['max_percentile']}% → 仓位 {rule['position']*100:.0f}%")
def map_to_position(self, composite_pct: float) -> float:
"""
★ 核心方法:将综合分位映射为建议仓位
参数:
composite_pct: 综合估值分位 (0~100)
返回:
建议仓位 (0.0 ~ 1.0)
"""
if pd.isna(composite_pct):
logger.warning("分位值为空,返回中性仓位 50%")
return 0.50
if self.method == 'linear':
return self._linear_map(composite_pct)
else:
return self._step_map(composite_pct)
def _linear_map(self, pct: float) -> float:
"""线性插值:分位越高 → 仓位越低"""
# 分位 0 → max_pos, 分位 100 → min_pos
pos = self.max_pos - (pct / 100.0) * (self.max_pos - self.min_pos)
return round(max(self.min_pos, min(self.max_pos, pos)), 4)
def _step_map(self, pct: float) -> float:
"""阶梯映射:找到第一个匹配的区间"""
for rule in self.step_rules:
if pct <= rule['max_percentile']:
return rule['position']
# 兜底
return self.min_pos
def map_dataframe(
self,
df: pd.DataFrame,
pct_col: str = 'composite_pct'
) -> pd.DataFrame:
"""
对 DataFrame 批量映射
新增列: 'suggested_position'
"""
df = df.copy()
df['suggested_position'] = df[pct_col].apply(self.map_to_position)
return df
"src/signal_smoother.py"
"""
signal_smoother.py
信号平滑:缓冲带 + 调仓频率控制
"""
import pandas as pd
import numpy as np
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class SignalSmoother:
"""
择时信号平滑器
解决两个问题:
1. 分位在边界附近反复横跳 → 缓冲带
2. 信号变化过于频繁 → 最小调仓间隔
"""
def __init__(
self,
buffer_zone: float = 0.05,
min_rebalance_days: int = 10
):
"""
参数:
buffer_zone: 缓冲带宽度(如 0.05 = ±5% 分位)
min_rebalance_days: 最短调仓间隔(交易日)
"""
self.buffer = buffer_zone
self.min_days = min_rebalance_days
self.last_rebalance_date = None
self.last_position = None
logger.info(f"信号平滑器: 缓冲带±{buffer_zone*100:.0f}%, "
f"最小调仓间隔 {min_rebalance_days} 天")
def smooth(
self,
date: pd.Timestamp,
current_pct: float,
raw_position: float,
index_code: str = ""
) -> Tuple[float, bool, str]:
"""
★ 核心方法:平滑处理
参数:
date: 当前日期
current_pct: 当前估值分位
raw_position: 原始映射仓位
index_code: 指数代码(用于日志)
返回:
(final_position, changed, reason)
"""
if self.last_position is None:
# 首次调用,直接采用
self.last_position = raw_position
self.last_rebalance_date = date
return raw_position, True, "首次建仓"
# === 检查 1:缓冲带 ===
# 分位变化是否在缓冲带内
pct_change = abs(current_pct - getattr(self, '_last_pct', current_pct))
if pct_change < self.buffer * 100:
# 在缓冲带内,保持上次仓位
logger.debug(f"[{date.strftime('%Y-%m-%d')}] {index_code} "
f"分位变化 {pct_change:.1f}% < 缓冲带 {self.buffer*100:.0f}%,保持仓位")
self._last_pct = current_pct
return self.last_position, False, f"缓冲带内(变化{pct_change:.1f}%)"
# === 检查 2:最小调仓间隔 ===
if self.last_rebalance_date is not None:
days_since = (date - self.last_rebalance_date).days
if days_since < self.min_days:
logger.debug(f"[{date.strftime('%Y-%m-%d')}] {index_code} "
f"距上次调仓仅 {days_since} 天 < {self.min_days} 天,保持仓位")
self._last_pct = current_pct
return self.last_position, False, f"调仓间隔不足({days_since}天)"
# === 通过所有检查,执行调仓 ===
old_pos = self.last_position
self.last_position = raw_position
self.last_rebalance_date = date
self._last_pct = current_pct
change_pct = (raw_position - old_pos) * 100
direction = "加仓" if change_pct > 0 else "减仓"
logger.info(f"[{date.strftime('%Y-%m-%d')}] {index_code} "
f"{direction}: {old_pos*100:.0f}% → {raw_position*100:.0f}% "
f"(分位 {current_pct:.1f}%)")
return raw_position, True, f"{direction}(分位变化{pct_change:.1f}%)"
def smooth_dataframe(
self,
df: pd.DataFrame,
pct_col: str = 'composite_pct',
pos_col: str = 'suggested_position'
) -> pd.DataFrame:
"""
对 DataFrame 批量平滑(按日期升序处理)
"""
df = df.sort_values('date').reset_index(drop=True)
# 重置状态
self.last_rebalance_date = None
self.last_position = None
results = []
for _, row in df.iterrows():
date = row['date']
pct = row[pct_col]
raw_pos = row[pos_col]
code = row.get('index_code', '')
final_pos, changed, reason = self.smooth(date, pct, raw_pos, code)
r = row.to_dict()
r['final_position'] = final_pos
r['position_changed'] = changed
r['change_reason'] = reason
results.append(r)
return pd.DataFrame(results)
"src/timing_engine.py"(★ 主引擎)
"""
timing_engine.py
★ 4S 择时主引擎
串联:估值分位计算 → 仓位映射 → 信号平滑 → 输出
"""
import pandas as pd
import numpy as np
from src.valuation_calculator import ValuationCalculator
from src.position_mapper import PositionMapper
from src.signal_smoother import SignalSmoother
from typing import Dict, List, Optional
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger(__name__)
class TimingEngine:
"""
★ 4S 择时引擎
完整流程:
1. 加载指数估值数据
2. 计算各指数 PE / PB 历史分位
3. 加权合成综合分位
4. 映射到建议仓位
5. 信号平滑(缓冲带 + 调仓频率控制)
6. 输出最终择时信号
"""
de
本文代码仅供学习与技术交流,不构成任何投资建议,股市有风险,入市需谨慎!
利用AI解决实际问题,如果你觉得这个工具好用,欢迎关注长安牧笛!
