通用趋势策略增加过滤条件,剔除成交额过低流动性不足个股。
聚焦“通用趋势策略 + 成交额流动性过滤”这一件事,适合直接写进课程讲义或技术博客。
通用趋势策略增加流动性过滤:剔除成交额过低个股
一、实际应用场景描述
趋势策略(Trend Following)是量化投资中最常见、最“通用”的策略骨架之一:
均线金叉 → 做多;均线死叉 → 止损/做空。
但在实盘落地时,一个被广泛忽视、却频繁“吃人”的问题就是:
🔴 成交额过低带来的隐性风险
真实场景 问题本质
策略选出一只“技术形态完美”的小盘股 日均成交额只有 300 万
计划买入 50 万,挂单后发现 冲击成本高达 3%~5%
持有期间遇到利空 想卖,但盘口没有承接,连续跌停
回测中“完美止盈” 实盘直接变成“止盈变巨亏”
👉 回测里你是用收盘价“无摩擦”成交的,而现实是:没有流动性,就没有执行力。
二、引入痛点(问题结构化)
我们把问题拆成可被工程化解决的几个层级:
层级 痛点 后果
数据层 只看价格/均线,忽略成交额 选到“纸面趋势”
策略层 没有流动性约束 实盘无法复现回测
风控层 缺乏“最坏情况可退出”的保证 单次黑天鹅致命
系统工程层 参数拍脑袋 不知道 3000 万还是 1 亿更合适
教学层 趋势策略 demo 过度简化 学生以为“均线好用”
本质结论:
没有流动性约束的趋势策略,不是策略,是假设。
三、核心逻辑讲解(“为什么要这么设计”)
3.1 成交额为什么比市值更“真实”?
很多教程喜欢用市值来过滤流动性,但在 A 股:
- 大量小盘股 市值小 ≠ 成交额小
- 也存在 大盘股流动性崩塌 的情况(如停牌复盘、利空连续一字板)
✅ 成交额(Turnover / Amount) 是:
- 实时可观测
- 可直接回测
- 可直接映射到“能否按计划成交”
3.2 流动性过滤的“三层结构”
我们引入一个分层流动性过滤框架:
┌─────────────────────────────────────────────┐
│ 成交额流动性过滤(三层) │
├─────────────────────────────────────────────┤
│ │
│ Layer 1:绝对成交额门槛 │
│ ───────────────────────────────────── │
│ 近 N 日日均成交额 ≥ 阈值(如 2000 万) │
│ 👉 过滤“僵尸股” │
│ │
│ Layer 2:相对成交额稳定性 │
│ ───────────────────────────────────── │
│ 近 N 日成交额标准差 / 均值 ≤ 阈值 │
│ 👉 过滤“偶尔放量、平时没流动性”的个股 │
│ │
│ Layer 3:极端低流动性预警 │
│ ───────────────────────────────────── │
│ 当日成交额 < 阈值 → 强制不交易 │
│ 👉 防止“突然失去流动性”的尾部风险 │
│ │
└─────────────────────────────────────────────┘
3.3 核心参数设计(可解释、可回测)
参数 含义 经验值(A 股)
"min_avg_amount" 日均成交额下限 1000 万 ~ 5000 万
"lookback_days" 计算窗口 20 日
"max_cv" 变异系数上限 ≤ 0.8
"emergency_amount" 当日熔断阈值 300 万
"min_trading_days" 最少交易日 ≥ 15 天
3.4 和趋势策略如何“正确集成”?
✅ 正确顺序(这是关键工程细节):
选股信号生成
↓
趋势策略筛选(均线 / 突破 / 动量)
↓
★ 流动性过滤(先粗筛 → 再精筛)
↓
下单执行
❌ 常见错误:
- 先过滤流动性 → 再算趋势(破坏了策略逻辑)
- 在回测里用 future data(用未来成交额决定过去是否交易)
四、项目结构(工程化)
trend_liquidity_filter/
├── README.md
├── requirements.txt
├── config.yaml
├── data/
│ ├── daily_prices.csv # 日频行情
│ └── daily_amounts.csv # 日频成交额
├── src/
│ ├── data_loader.py # 数据加载
│ ├── trend_signal.py # 通用趋势策略(均线/突破)
│ ├── liquidity_filter.py # ★ 流动性过滤核心模块
│ ├── strategy_engine.py # 策略引擎
│ ├── backtester.py # 回测框架
│ └── visualizer.py # 可视化
├── main.py
└── compare_liquidity.py # 有/无流动性过滤对比
五、完整代码(模块化 + 清晰注释)
"requirements.txt"
pandas>=1.5
numpy>=1.21
matplotlib>=3.5
seaborn>=0.12
pyyaml>=6.0
"config.yaml"
# 趋势策略 + 流动性过滤配置
# 流动性过滤参数
liquidity:
enabled: true
lookback_days: 20
min_avg_amount: 20000000 # 2000 万(单位:元)
max_cv: 0.8 # 变异系数上限
emergency_amount: 3000000 # 单日 < 300 万 → 禁止交易
min_trading_days: 15
# 趋势策略参数
strategy:
fast_ma: 5
slow_ma: 20
max_positions: 5
take_profit_pct: 0.08
stop_loss_pct: -0.05
initial_capital: 1000000
commission_rate: 0.0003
stamp_tax_rate: 0.001
backtest:
start_date: "2022-01-01"
end_date: "2024-12-31"
"src/data_loader.py"
"""
data_loader.py
行情 + 成交额数据加载
"""
import pandas as pd
def load_price_data(path: str) -> pd.DataFrame:
"""
CSV 格式:
date,code,open,high,low,close
"""
df = pd.read_csv(path, parse_dates=['date'])
df['code'] = df['code'].astype(str).str.zfill(6)
return df.set_index(['date', 'code']).sort_index()
def load_amount_data(path: str) -> pd.DataFrame:
"""
CSV 格式:
date,code,amount
"""
df = pd.read_csv(path, parse_dates=['date'])
df['code'] = df['code'].astype(str).str.zfill(6)
return df.set_index(['date', 'code']).sort_index()
def get_close_matrix(price_data: pd.DataFrame) -> pd.DataFrame:
return price_data['close'].unstack()
def get_amount_matrix(amount_data: pd.DataFrame) -> pd.DataFrame:
return amount_data['amount'].unstack()
"src/trend_signal.py"
"""
trend_signal.py
通用趋势策略:双均线 + 突破
"""
import pandas as pd
import numpy as np
def compute_trend_signals(
close: pd.DataFrame,
fast: int = 5,
slow: int = 20
) -> pd.DataFrame:
"""
为每只股票计算趋势信号
返回 DataFrame:
- ma_fast / ma_slow / trend_signal
- trend_signal: 1 = 做多, 0 = 无信号
"""
signals = pd.DataFrame(index=close.index, columns=close.columns)
for code in close.columns:
s = close[code].dropna()
if len(s) < slow:
continue
ma_fast = s.rolling(fast).mean()
ma_slow = s.rolling(slow).mean()
# ★ 核心趋势逻辑
signal = (ma_fast > ma_slow).astype(int)
signals[code] = signal
return signals
def select_trend_candidates(
signals: pd.DataFrame,
date: pd.Timestamp,
n_candidates: int = 20
) -> list:
"""
选择当日处于“上升趋势”的股票
"""
if date not in signals.index:
return []
row = signals.loc[date]
candidates = row[row == 1].index.tolist()
return candidates[:n_candidates]
"src/liquidity_filter.py"(★ 核心模块)
"""
liquidity_filter.py
★ 成交额流动性过滤(三层结构)
"""
import pandas as pd
import numpy as np
from typing import Dict, List
class LiquidityFilter:
"""
成交额流动性过滤器
三层过滤:
1. 日均成交额 ≥ min_avg_amount
2. 成交额变异系数 ≤ max_cv
3. 当日成交额 ≥ emergency_amount
"""
def __init__(
self,
lookback_days: int = 20,
min_avg_amount: float = 20_000_000,
max_cv: float = 0.8,
emergency_amount: float = 3_000_000,
min_trading_days: int = 15
):
self.lookback = lookback_days
self.min_avg = min_avg_amount
self.max_cv = max_cv
self.emergency = emergency_amount
self.min_days = min_trading_days
def filter(
self,
candidates: List[str],
amount_matrix: pd.DataFrame,
date: pd.Timestamp
) -> Dict:
"""
★ 核心方法
返回:
{
'passed': [code, ...],
'rejected': {
'avg_amount': [code, ...],
'cv': [code, ...],
'emergency': [code, ...],
'insufficient_data': [code, ...]
}
}
"""
passed = []
rejected = {
'avg_amount': [],
'cv': [],
'emergency': [],
'insufficient_data': []
}
# 计算窗口
start = date - pd.Timedelta(days=self.lookback * 1.5)
window = amount_matrix.loc[start:date]
for code in candidates:
if code not in amount_matrix.columns:
rejected['insufficient_data'].append(code)
continue
series = window[code].dropna()
# ★ Layer 0:数据充足性检查
if len(series) < self.min_days:
rejected['insufficient_data'].append(code)
continue
# ★ Layer 1:日均成交额
avg = series.mean()
if avg < self.min_avg:
rejected['avg_amount'].append(code)
continue
# ★ Layer 2:成交额稳定性(变异系数)
cv = series.std() / avg if avg > 0 else float('inf')
if cv > self.max_cv:
rejected['cv'].append(code)
continue
# ★ Layer 3:当日成交额熔断
today_amount = series.iloc[-1] if len(series) > 0 else 0
if today_amount < self.emergency:
rejected['emergency'].append(code)
continue
passed.append(code)
return {
'passed': passed,
'rejected': rejected
}
def print_report(self, rejected: Dict, date: pd.Timestamp):
total_rejected = sum(len(v) for v in rejected.values())
print(f" [{date.strftime('%Y-%m-%d')}] 流动性过滤:")
print(f" 候选: {sum(len(v) for v in rejected.values()) + len(self.passed)}")
for reason, codes in rejected.items():
if codes:
print(f" {reason}: {len(codes)} 只")
"src/strategy_engine.py"
"""
strategy_engine.py
趋势策略引擎(集成流动性过滤)
"""
import pandas as pd
import numpy as np
from src.liquidity_filter import LiquidityFilter
class TrendLiquidityStrategy:
"""
★ 趋势策略 + 流动性过滤
"""
def __init__(
self,
liquidity_filter: LiquidityFilter,
fast_ma: int = 5,
slow_ma: int = 20,
max_positions: int = 5,
take_profit_pct: float = 0.08,
stop_loss_pct: float = -0.05,
initial_capital: float = 1_000_000,
commission_rate: float = 0.0003,
stamp_tax_rate: float = 0.001
):
self.lf = liquidity_filter
self.fast = fast_ma
self.slow = slow_ma
self.max_pos = max_positions
self.tp = take_profit_pct
self.sl = stop_loss_pct
self.comm = commission_rate
self.tax = stamp_tax_rate
self.capital = initial_capital
self.positions = {}
self.daily_nav = {}
self.trade_log = []
self.total_rejected = 0
print(f"\n{'='*60}")
print(f" 趋势策略 + 流动性过滤引擎")
print(f" 均线: {fast_ma}/{slow_ma}")
print(f" 流动性门槛: ¥{liquidity_filter.min_avg/1e4:.0f}万/日")
print(f"{'='*60}\n")
def run_daily(
self,
date: pd.Timestamp,
close: pd.Series,
amount_matrix: pd.DataFrame,
trend_signals: pd.DataFrame
):
# 1) 生成趋势候选
candidates = self._get_trend_candidates(trend_signals, date)
# ★ 2) 流动性过滤
result = self.lf.filter(candidates, amount_matrix, date)
passed = result['passed']
self.total_rejected += sum(len(v) for v in result['rejected'].values())
# 3) 买入
if len(self.positions) < self.max_pos:
for code in passed:
if len(self.positions) >= self.max_pos:
break
if code in self.positions:
continue
if code not in close or close[code] <= 0:
continue
self._open(code, date, close[code])
# 4) 止盈止损
self._check_exit(date, close)
# 5) 记录净值
self._record_nav(date, close)
def _get_trend_candidates(self, signals, date) -> list:
if date not in signals.index:
return []
row = signals.loc[date]
return row[row == 1].index.tolist()
def _open(self, code, date, price):
alloc = self.capital / max(1, self.max_pos - len(self.positions))
qty = int(alloc / price / 100) * 100
if qty <= 0:
return
cost = qty * price * (1 + self.comm)
if cost > self.capital:
return
self.capital -= cost
self.positions[code] = {
'open_date': date,
'open_price': price,
'qty': qty
}
def _check_exit(self, date, prices):
to_close = []
for code, pos in self.positions.items():
if code not in prices or prices[code] <= 0:
continue
pnl = (prices[code] - pos['open_price']) / pos['open_price']
if pnl >= self.tp or pnl <= self.sl:
to_close.append(code)
for code in to_close:
px = prices.get(code, self.positions[code]['open_price'])
self._close(code, date, px)
def _close(self, code, date, price):
pos = self.positions.pop(code, None)
if not pos:
return
revenue = pos['qty'] * price * (1 - self.comm - self.tax)
self.capital += revenue
def _record_nav(self, date, prices):
nav = self.capital
for code, pos in self.positions.items():
if code in prices and prices[code] > 0:
nav += pos['qty'] * prices[code]
self.daily_nav[date] = nav
def final_liquidate(self, date, prices):
for code in list(self.positions.keys()):
px = prices.get(code, self.positions[code]['open_price'])
self._close(code, date, px)
def print_stats(self):
print(f"\n{'='*60}")
print(f" 期末资金: ¥{self.capital:,.2f}")
print(f" 累计拦截: {self.total_rejected} 次")
print(f"{'='*60}\n")
"src/backtester.py"
"""
backtester.py
回测引擎(含对比模式)
"""
import pandas as pd
from src.strategy_engine import TrendLiquidityStrategy
def run_backtest(
strategy: TrendLiquidityStrategy,
close: pd.DataFrame,
amount_matrix: pd.DataFrame,
trend_signals: pd.DataFrame,
start: str = None,
end: str = None,
enable_liquidity: bool = True
) -> dict:
dates = close.index
if start:
dates = dates[dates >= pd.Timestamp(start)]
if end:
dates = dates[dates <= pd.Timestamp(end)]
for i, dt in enumerate(dates):
day_close = close.loc[dt] if dt in close.index else pd.Series()
strategy.run_daily(dt, day_close, amount_matrix, trend_signals)
if i % max(1, len(dates)//10) == 0:
nav = strategy.daily_nav.get(dt, 0)
print(f" [{i+1}/{len(dates)}] {dt.strftime('%Y-%m-%d')} | "
f"持仓:{len(strategy.positions)} | 净值:¥{nav:,.0f}")
last = dates[-1]
last_px = close.loc[last] if last in close.index else pd.Series()
strategy.final_liquidate(last, last_px)
return {'nav': pd.Series(strategy.daily_nav), 'strategy': strategy}
def calc_metrics(nav: pd.Series) -> dict:
nav = nav.dropna()
if len(nav) < 2:
return {}
ret = nav.pct_change().dropna()
days = (nav.index[-1] - nav.index[0]).days
yrs = days / 365.25
tot = nav.iloc[-1]/nav.iloc[0] - 1
ann = (1+tot)**(1/yrs) - 1 if yrs > 0 else 0
vol = ret.std() * 252**0.5
sharpe = (ann - 0.025)/vol if vol > 0 else 0
dd = (nav - nav.cummax())/nav.cummax()
return {
'total_ret_pct': round(tot*100, 2),
'ann_ret_pct': round(ann*100, 2),
'vol_pct': round(vol*100, 2),
'sharpe': round(sharpe, 3),
'max_dd_pct': round(dd.min()*100, 2),
'final_equity': round(nav.iloc[-1], 2),
'n_rejected': strategy.total_rejected if 'strategy' in locals() else 0
}
def print_comparison(m1: dict, m2: dict,
l1="含流动性过滤", l2="无流动性过滤"):
print(f"\n{'='*65}")
print(f"{'指标':<22}{l1:<18}{l2:<18}{'差异':<10}")
print(f"{'='*65}")
for lab, key, fmt in [
('累计收益(%)','total_ret_pct','.2f'),
('年化(%)','ann_ret_pct','.2f'),
('波动(%)','vol_pct','.2f'),
('夏普','sharpe','.3f'),
('最大回撤(%)','max_dd_pct','.2f'),
('期末权益','final_equity','.0f')
]:
v1, v2 = m1.get(key,0), m2.get(key,0)
fs = f"{{:<22}}{{:<18{fmt}}}{{:<18{fmt}}}{{:<+15{fmt}}}"
print(fs.format(lab, v1, v2, v1-v2))
print(f"{'='*65}\n")
"main.py"
"""
main.py
趋势策略 + 流动性过滤回测
"""
import yaml
import pandas as pd
import numpy as np
from src.data_loader import load_price_data, load_amount_data, get_close_matrix, get_amount_matrix
from src.trend_signal import compute_trend_signals, select_trend_candidates
from src.liquidity_filter import LiquidityFilter
from src.strategy_engine import TrendLiquidityStrategy
from src.backtester import run_backtest, calc_metrics, print_comparison
def load_config(path='config.yaml'):
with open(path) as f:
return yaml.safe_load(f)
def generate_mock_data(n_stocks=30, seed=42):
np.random.seed(seed)
dates = pd.date_range('2022-01-01', '2024-12-31', freq='B')
codes = [f'{i:06d}' for i in range(n_stocks)]
price_recs = []
amount_recs = []
for code in codes:
drift = np.random.normal(0.0003, 0.015, len(dates))
close = 10 * np.cumprod(1 + drift)
for d, c in zip(dates, close):
price_recs.append({'date': d, 'code': code, 'close': round(c, 2)})
# 成交额:大部分有流动性,部分模拟"僵尸股"
if np.random.random() < 0.2:
amt = np.random.uniform(1e5, 5e5) # 低流动性
else:
amt = np.random.uniform(2e6, 5e7) # 正常
amount_recs.append({'date': d, 'code': code, 'amount': round(amt, 2)})
return (pd.DataFrame(price_recs), pd.DataFrame(amount_recs))
def main():
cfg = load_config()
# 数据
try:
prices = load_price_data('data/daily_prices.csv')
amounts = load_amount_data('data/daily_amounts.csv')
close = get_close_matrix(prices)
amount_mtx = get_amount_matrix(amounts)
except FileNotFoundError:
print("生成模拟数据...")
p, a = generate_mock_data()
close = p.pivot(index='date', columns='code', values='close')
amount_mtx = a.pivot(index='date', columns='code', values='amount')
# 趋势信号
signals = compute_trend_signals(close, cfg['strategy']['fast_ma'], cfg['strategy']['slow_ma'])
# ====== ① 含流动性过滤 ======
print("\n" + "="*60)
print(" ① 趋势策略 + 流动性过滤")
print("="*60)
lf = LiquidityFilter(
lookback_days=cfg['liquidity']['lookback_days'],
min_avg_amount=cfg['liquidity']['min_avg_amount'],
max_cv=cfg['liquidity']['max_cv'],
emergency_amount=cfg['liquidity']['emergency_amount']
)
strat1 = TrendLiquidityStrategy(
liquidity_filter=lf,
fast_ma=cfg['strategy']['fast_ma'],
slow_ma=cfg['strategy']['slow_ma'],
max_positions=cfg['strategy']['max_positions'],
initial_capital=cfg['strategy']['initial_capital']
)
r1 = run_backtest(
strat1, close, amount_mtx, signals,
cfg['backtest']['start_date'], cfg['backtest']['end_date'],
enable_liquidity=True
)
strat1.print_stats()
# ====== ② 无流动性过滤(对比组)======
print("\n" + "="*60)
print(" ② 趋势策略(无流动性过滤)")
print("="*60)
lf_none = LiquidityFilter(min_avg_amount=0, max_cv=999, emergency_amount=0)
strat2 = TrendLiquidityStrategy(
liquidity_filter=lf_none,
fast_ma=cfg['strategy']['fast_ma'],
slow_ma=cfg['strategy']['slow_ma'],
max_positions=cfg['strategy']['max_positions'],
initial_capital=cfg['strategy']['initial_capital']
)
r2 = run_backtest(
strat2, close, amount_mtx, signals,
cfg['backtest']['start_date'], cfg['backtest']['end_date'],
enable_liquidity=False
)
strat2.print_stats()
# 对比
m1 = calc_metrics(r1['nav'])
m2 = calc_metrics(r2['nav'])
print_comparison(m1, m2)
print("✅ 回测完成")
if __name__ == '__main__':
main()
六、README.md 与使用说明
# 通用趋势策略 + 成交额流动性过滤
## 核心功能
在通用趋势策略(双均线)基础上,增加三层成交额流动性过滤:
1. 日均成交额 ≥ N 元
2. 成交额变异系数 ≤ 阈值
3. 单日成交额熔断保护
## 安装
bash
pip install -r requirements.txt
## 快速开始
### 1. 准备数据
**行情数据** `data/daily_prices.csv`:
csv
date,code,open,high,low,close
2022-01-03,000001,12.3,12.5,12.1,12.4
...
**成交额数据** `data/daily_amounts.csv`:
csv
date,code,amount
2022-01-03,000001,152345678
...
### 2. 配置流动性参数
yaml
liquidity:
lookback_days: 20
min_avg_amount: 20000000 # 2000 万/日
max_cv: 0.8
emergency_amount: 3000000 # 300 万熔断
### 3. 运行
bash
python main.py
输出:
- 有/无流动性过滤的策略对比
- 被拦截个股统计
- 核心评价指标差异
## 参数参考
| 参数 | 宽松 | 中性 | 严格 |
|------|------|------|------|
| min_avg_amount | 500 万 | 2000 万 | 5000 万 |
| max_cv | 1.0 | 0.8 | 0.5 |
| emergency_amount | 100 万 | 300 万 | 500 万 |
七、核心知识点卡片
┌──────────────────────────────────────────────────────────────┐
│ 趋势策略 + 流动性过滤 — 核心知识 │
├────────────────┬─────────────────────────────────────────────┤
│ 趋势策略 │ 双均线/突破/动量,最通用的量化骨架 │
│ 成交额过滤 │ 比市值更真实反映"能不能卖出" │
│ 三层结构 │ 日均门槛 → 稳定性 → 熔断保护 │
│ 变异系数 │ CV = std/mean,衡量流动性稳定性 │
│ 回测陷阱 │ 无流动性过滤的回测 ≈ 假设 │
│ 正确集成顺序 │ 趋势信号 → 流动性过滤 → 下单 │
│ 核心原则 │ 不能退出的策略不是策略,是假设 │
└────────────────┴─────────────────────────────────────────────┘
八、免责声明与风险提示
⚠️ 免责声明:本代码仅供学习、研究与量化教学用途,不构成任何投资建议或投资决策依据。模拟数据为随机数生成,不代表任何真实标的历史或未来表现。
⚠️ 风险提示:
- 流动性过滤会减少可选标的,可能降低策略容量
- 成交额数据本身可能被短期操纵(对敲放量)
- 极端行情下流动性可能在持仓期间恶化,静态过滤无法应对
- 回测中信号为简化逻辑,实盘需更完善的趋势判定
- 历史流动性不保证未来流动性
本文代码仅供学习与技术交流,不构成任何投资建议,股市有风险,入市需谨慎!
利用AI解决实际问题,如果你觉得这个工具好用,欢迎关注长安牧笛!
