程序员视角看量化交易:多因子选股模型背后的数据工程与回测系统

量化投资听起来很高大上,但拆开来看,核心就是一套复杂的数据工程系统。从程序员的视角,拆解多因子选股模型背后的数据采集、清洗、因子计算、回测框架和系统架构设计。

量化交易的本质:一个大型数据处理系统

很多人听到"量化交易",第一反应是复杂的数学公式和高频交易。但对于真正做过量化系统的程序员来说,量化交易的本质其实更接近一个大规模数据处理流水线——从原始行情数据的采集,到特征因子的计算,再到策略回测和实盘执行,整个链路的工程复杂度远超大多数人的想象。

如果说传统 Web 开发是"用户请求 → 业务逻辑 → 数据库读写",那么量化交易系统就是"行情数据 → 因子计算 → 策略决策 → 交易执行"。两者的区别在于:Web 系统处理的是确定性的业务规则,而量化系统处理的是充满噪声和不确定性的金融数据。

本文从一个后端程序员的视角,拆解多因子选股模型背后的数据工程与回测系统。不讲复杂的金融理论,而是聚焦在系统架构、数据管道、工程实现这些程序员最熟悉的话题上。

多因子选股模型:从金融学理论到工程问题

什么是多因子模型

多因子模型的核心思想很简单:股票的收益率可以被分解为多个"因子"的线性组合。最经典的是 Fama-French 三因子模型,它认为股票收益由三个因子驱动:

  • 市场因子(Market):整体市场的涨跌
  • 规模因子(Size / SMB):小盘股相对大盘股的超额收益
  • 价值因子(Value / HML):高账面市值比相对低账面市值比的超额收益

后来学术界又扩展出了五因子模型(加入盈利能力和投资风格),以及各种行业因子、动量因子、质量因子等等。

从程序员的角度理解,多因子模型就是一个特征工程问题:你要从海量的金融数据中提取出能够预测股票收益的"特征"(因子),然后用某种方式组合这些特征来做出投资决策。

工程视角下的因子体系

在工程实践中,常见的因子可以分为几大类:

因子类别 典型因子 数据来源 计算复杂度
估值因子 PE、PB、PS 财务报表
动量因子 过去 N 日收益率 行情数据
质量因子 ROE、毛利率 财务报表
成长因子 营收增长率、利润增长率 财务报表
波动因子 历史波动率、Beta 行情数据
技术因子 换手率、量价背离 行情 + 成交
另类因子 舆情情绪、供应链数据 爬虫 + NLP

每一个因子的背后,都是一条数据管道——从数据源获取原始数据,经过清洗、对齐、标准化,最终计算出一个数值。当你有 50 个因子、覆盖 5000 只股票、横跨 10 年历史数据时,这个数据管道的工程复杂度就会急剧上升。

系统架构:量化投资的技术栈全景

在写代码之前,我们先从整体架构的角度来看看一个完整的量化投资系统长什么样。

整体架构图(文字描述)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
┌─────────────────────────────────────────────────────────────────────┐
│                        量化投资系统整体架构                            │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐          │
│  │  数据采集层   │───→│  数据存储层   │───→│  因子计算层   │          │
│  │              │    │              │    │              │          │
│  │ · 行情API    │    │ · 时序数据库  │    │ · 因子表达式  │          │
│  │ · 财报爬虫   │    │   (ClickHouse│    │ · 向量化计算  │          │
│  │ · 另类数据   │    │    /Arctic)  │    │ · 因子库管理  │          │
│  │ · 实时推送   │    │ · 关系数据库  │    │ · IC/IR分析   │          │
│  │              │    │   (PostgreSQL│    │              │          │
│  │  技术栈:     │    │    /MySQL)   │    │  技术栈:     │          │
│  │  Scrapy     │    │ · 对象存储   │    │  Pandas      │          │
│  │  Celery     │    │   (MinIO/S3) │    │  NumPy       │          │
│  │  Kafka      │    │              │    │  Polars      │          │
│  └──────────────┘    └──────────────┘    └──────┬───────┘          │
│                                                  │                  │
│  ┌──────────────┐    ┌──────────────┐    ┌──────▼───────┐          │
│  │  交易执行层   │◀───│  策略决策层   │◀───│  回测引擎层   │          │
│  │              │    │              │    │              │          │
│  │ · 订单管理   │    │ · 组合优化   │    │ · 事件驱动   │          │
│  │ · 风控检查   │    │ · 仓位计算   │    │ · 向量化回测 │          │
│  │ · 券商接口   │    │ · 信号生成   │    │ · 滑点模拟   │          │
│  │ · 成交回报   │    │ · 风控约束   │    │ · 绩效归因   │          │
│  │              │    │              │    │              │          │
│  │  技术栈:     │    │  技术栈:     │    │  技术栈:     │          │
│  │  FIX协议    │    │  cvxpy       │    │  Backtrader  │          │
│  │  WebSocket  │    │  scipy       │    │  Zipline     │          │
│  │  Redis      │    │  规则引擎    │    │  VectorBT    │          │
│  └──────────────┘    └──────────────┘    └──────────────┘          │
│                                                                     │
│  ┌─────────────────────────────────────────────────────┐           │
│  │                    基础设施层                         │           │
│  │  · 调度: Airflow / Prefect                          │           │
│  │  · 监控: Prometheus + Grafana                       │           │
│  │  · 容器: Docker + Kubernetes                        │           │
│  │  · CI/CD: GitHub Actions / GitLab CI               │           │
│  └─────────────────────────────────────────────────────┘           │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

各层职责详解

数据采集层是整个系统的入口。它负责从各种数据源获取原始数据,包括交易所的行情数据(日K、分钟K、Tick 数据)、上市公司的财务报表(资产负债表、利润表、现金流量表)、以及各种另类数据(新闻舆情、分析师报告、供应链数据)。这一层的核心挑战是数据源的多样性和不稳定性——不同数据源的格式、频率、质量参差不齐,需要大量的适配和清洗工作。

数据存储层需要根据数据特点选择合适的存储方案。时序数据(行情)适合用 ClickHouse、Arctic 或 InfluxDB 这类时序数据库;结构化元数据(股票基本信息、行业分类)用 PostgreSQL 就够了;非结构化数据(研报 PDF、新闻文本)则需要对象存储配合搜索引擎。

因子计算层是系统的核心计算引擎。它将原始数据转化为可用于策略决策的因子值。这一层的核心要求是计算效率和可扩展性——当你需要在几分钟内算出 5000 只股票 × 50 个因子 × 250 个交易日的因子矩阵时,Pandas 的逐行循环是远远不够的,必须用向量化计算甚至分布式计算。

回测引擎层负责在历史数据上模拟策略的表现。它需要考虑很多现实因素:交易成本、滑点、涨跌停限制、流动性约束等。好的回测框架能让策略开发者专注于策略逻辑本身,而不用关心底层的执行细节。

策略决策层将回测验证过的策略部署到实盘环境,负责每日的信号生成、组合优化和仓位计算。

交易执行层是系统与外部市场的接口,负责将策略信号转化为实际的订单,并通过券商接口完成交易。

数据工程实战:从原始数据到因子矩阵

接下来我们深入到具体的工程实现。一个多因子选股系统的数据处理流程可以分为四个阶段:数据采集、数据清洗、因子计算、因子评估。

第一阶段:数据采集

数据采集的核心是建立一个统一的、可扩展的数据获取框架。不同的数据源有不同的接口协议(REST API、WebSocket、FTP 文件下载、网页爬虫),但我们需要一个统一的抽象层来屏蔽这些差异。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
"""
数据采集框架:统一的数据源抽象
"""
import pandas as pd
from abc import ABC, abstractmethod
from datetime import datetime, date
from typing import List, Optional
import logging

logger = logging.getLogger(__name__)


class DataSource(ABC):
    """数据源抽象基类"""
    
    @abstractmethod
    def fetch_daily_data(self, symbols: List[str], 
                         start_date: date, end_date: date) -> pd.DataFrame:
        """获取日线行情数据"""
        pass
    
    @abstractmethod
    def fetch_financial_data(self, symbols: List[str], 
                             report_date: date) -> pd.DataFrame:
        """获取财务数据"""
        pass


class TushareDataSource(DataSource):
    """Tushare 数据源实现(国内常用的金融数据接口)"""
    
    def __init__(self, token: str):
        import tushare as ts
        self.pro = ts.pro_api(token)
    
    def fetch_daily_data(self, symbols: List[str],
                         start_date: date, end_date: date) -> pd.DataFrame:
        """获取日线行情"""
        frames = []
        for symbol in symbols:
            try:
                df = self.pro.daily(
                    ts_code=symbol,
                    start_date=start_date.strftime('%Y%m%d'),
                    end_date=end_date.strftime('%Y%m%d')
                )
                frames.append(df)
            except Exception as e:
                logger.error(f"获取 {symbol} 行情失败: {e}")
        
        result = pd.concat(frames, ignore_index=True)
        result['trade_date'] = pd.to_datetime(result['trade_date'])
        return result
    
    def fetch_financial_data(self, symbols: List[str],
                             report_date: date) -> pd.DataFrame:
        """获取财务指标数据"""
        df = self.pro.fina_indicator(
            ts_code=','.join(symbols),
            period=report_date.strftime('%Y%m%d')
        )
        return df


class DataCollector:
    """数据采集调度器"""
    
    def __init__(self, source: DataSource, storage_backend):
        self.source = source
        self.storage = storage_backend
    
    def collect_and_store(self, symbols: List[str],
                          start_date: date, end_date: date):
        """采集并存储数据"""
        # 获取行情数据
        daily_data = self.source.fetch_daily_data(
            symbols, start_date, end_date
        )
        self.storage.save_daily(daily_data)
        logger.info(f"已存储 {len(daily_data)} 条日线数据")
        
        return daily_data

这段代码展示了一个简单的数据源抽象模式。在实际工程中,你还需要考虑:

  • 限流和重试:金融数据 API 通常有严格的调用频率限制
  • 增量更新:每天只拉取新增数据,而不是全量重新下载
  • 数据校验:检查数据完整性(有没有缺失交易日、停牌日是否正确标记)
  • 版本管理:财务数据会有修正,需要记录数据版本

第二阶段:数据清洗

金融数据的脏数据问题比大多数业务系统都要严重。常见的问题包括:

  • 复权问题:股票分红、拆股会导致价格不连续,需要做前复权或后复权
  • 缺失值:停牌股票没有行情数据,新股上市前的数据为空
  • 异常值:极端行情下的乌龙指、数据商的错误数据
  • 幸存者偏差:退市股票在历史数据中缺失,导致回测结果偏乐观
  • 时区对齐:不同市场的数据需要统一到同一个时区
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""
数据清洗流水线
"""
import pandas as pd
import numpy as np
from typing import Dict


class DataCleaner:
    """金融数据清洗器"""
    
    def __init__(self):
        self.cleaning_log = []
    
    def clean_daily_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """日线数据清洗主流程"""
        df = df.copy()
        
        # Step 1: 基础清洗
        df = self._remove_duplicates(df)
        df = self._handle_missing_values(df)
        
        # Step 2: 复权处理(后复权)
        df = self._adjust_price(df)
        
        # Step 3: 异常值检测
        df = self._detect_outliers(df)
        
        # Step 4: 衍生字段计算
        df = self._compute_derived_fields(df)
        
        return df
    
    def _remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """去重:同一股票同一交易日只保留一条"""
        before = len(df)
        df = df.drop_duplicates(subset=['ts_code', 'trade_date'], keep='last')
        self.cleaning_log.append(f"去重: {before} -> {len(df)}")
        return df
    
    def _handle_missing_values(self, df: pd.DataFrame) -> pd.DataFrame:
        """处理缺失值"""
        # 停牌日的行情用前一日数据填充
        df = df.sort_values(['ts_code', 'trade_date'])
        fill_cols = ['open', 'high', 'low', 'close', 'vol']
        df[fill_cols] = df.groupby('ts_code')[fill_cols].ffill()
        return df
    
    def _adjust_price(self, df: pd.DataFrame) -> pd.DataFrame:
        """后复权价格调整"""
        if 'adj_factor' in df.columns:
            price_cols = ['open', 'high', 'low', 'close']
            for col in price_cols:
                df[f'{col}_adj'] = df[col] * df['adj_factor']
        return df
    
    def _detect_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """异常值检测:日收益率超过 ±11% 的标记为异常(A股涨跌停 10%)"""
        df['returns'] = df.groupby('ts_code')['close'].pct_change()
        outlier_mask = df['returns'].abs() > 0.11
        if outlier_mask.any():
            self.cleaning_log.append(
                f"检测到 {outlier_mask.sum()} 条异常收益率数据"
            )
            df.loc[outlier_mask, 'is_outlier'] = True
        return df
    
    def _compute_derived_fields(self, df: pd.DataFrame) -> pd.DataFrame:
        """计算衍生字段"""
        # 日收益率
        df['daily_return'] = df.groupby('ts_code')['close_adj'].pct_change()
        
        # 对数收益率(更适合统计分析)
        df['log_return'] = np.log(
            df['close_adj'] / df['close_adj'].shift(1)
        )
        
        # 日振幅
        df['amplitude'] = (df['high_adj'] - df['low_adj']) / df['close_adj']
        
        return df

第三阶段:因子计算

因子计算是多因子模型的核心环节。我们需要把清洗后的数据转化为标准化的因子值。一个好的因子计算框架需要满足:

  1. 声明式定义:因子的计算逻辑可以用简洁的表达式描述
  2. 向量化计算:利用 NumPy/Pandas 的向量化操作,避免 Python 循环
  3. 跨期计算:支持滚动窗口、指数衰减等时间序列操作
  4. 标准化处理:因子值需要去极值、标准化、中性化
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""
因子计算引擎
"""
import pandas as pd
import numpy as np
from scipy import stats
from typing import Callable, Dict


class FactorEngine:
    """因子计算引擎"""
    
    def __init__(self):
        self.factor_registry: Dict[str, Callable] = {}
    
    def register_factor(self, name: str, func: Callable):
        """注册因子计算函数"""
        self.factor_registry[name] = func
    
    def compute_all_factors(self, daily_data: pd.DataFrame,
                            financial_data: pd.DataFrame) -> pd.DataFrame:
        """计算所有注册的因子"""
        results = {}
        for name, func in self.factor_registry.items():
            try:
                factor_values = func(daily_data, financial_data)
                results[name] = factor_values
            except Exception as e:
                print(f"因子 {name} 计算失败: {e}")
        
        factor_df = pd.DataFrame(results)
        return factor_df


# ========== 具体因子实现 ==========

def momentum_20d(daily: pd.DataFrame, _) -> pd.Series:
    """20日动量因子:过去20个交易日的累计收益率"""
    return daily.groupby('ts_code')['close_adj'].transform(
        lambda x: x.pct_change(20)
    )


def volatility_60d(daily: pd.DataFrame, _) -> pd.Series:
    """60日波动率因子:过去60日收益率的标准差"""
    return daily.groupby('ts_code')['daily_return'].transform(
        lambda x: x.rolling(60).std() * np.sqrt(252)
    )


def turnover_rate_avg(daily: pd.DataFrame, _) -> pd.Series:
    """20日平均换手率"""
    if 'turnover_rate' not in daily.columns:
        return pd.Series(dtype=float)
    return daily.groupby('ts_code')['turnover_rate'].transform(
        lambda x: x.rolling(20).mean()
    )


def pe_ttm(_, financial: pd.DataFrame) -> pd.Series:
    """滚动市盈率(TTM)"""
    if 'pe_ttm' in financial.columns:
        return financial['pe_ttm']
    return pd.Series(dtype=float)


def roe_ttm(_, financial: pd.DataFrame) -> pd.Series:
    """滚动净资产收益率"""
    if 'roe' in financial.columns:
        return financial['roe']
    return pd.Series(dtype=float)


# ========== 因子预处理 ==========

class FactorPreprocessor:
    """因子值预处理(去极值 + 标准化 + 中性化)"""
    
    @staticmethod
    def winsorize(series: pd.Series, n_std: float = 3.0) -> pd.Series:
        """MAD 去极值:将超出 n 倍标准差的值截断"""
        median = series.median()
        mad = (series - median).abs().median()
        upper = median + n_std * 1.4826 * mad
        lower = median - n_std * 1.4826 * mad
        return series.clip(lower, upper)
    
    @staticmethod
    def standardize(series: pd.Series) -> pd.Series:
        """Z-Score 标准化"""
        mean = series.mean()
        std = series.std()
        if std == 0 or pd.isna(std):
            return series * 0
        return (series - mean) / std
    
    @staticmethod
    def neutralize(factor_values: pd.Series,
                   industry_dummies: pd.DataFrame,
                   market_cap: pd.Series) -> pd.Series:
        """行业 + 市值中性化:回归残差作为因子值"""
        X = pd.concat([
            industry_dummies,
            np.log(market_cap)
        ], axis=1)
        X = X.dropna()
        y = factor_values.dropna()
        
        # 取交集
        common_idx = X.index.intersection(y.index)
        X, y = X.loc[common_idx], y.loc[common_idx]
        
        # OLS 回归取残差
        from numpy.linalg import lstsq
        beta, _, _, _ = lstsq(X.values, y.values, rcond=None)
        residuals = y - X.values @ beta
        
        return pd.Series(residuals, index=common_idx)
    
    @classmethod
    def process_factor(cls, factor_values: pd.Series,
                       industry_dummies: pd.DataFrame = None,
                       market_cap: pd.Series = None) -> pd.Series:
        """完整的因子预处理流程"""
        # Step 1: 去极值
        processed = cls.winsorize(factor_values)
        # Step 2: 标准化
        processed = cls.standardize(processed)
        # Step 3: 中性化(可选)
        if industry_dummies is not None and market_cap is not None:
            processed = cls.neutralize(processed, industry_dummies, market_cap)
        return processed

第四阶段:因子评估

算出因子之后,我们需要评估因子的有效性。核心指标是 IC(Information Coefficient)——因子值与未来收益率的截面相关系数。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""
因子评估模块
"""
import pandas as pd
import numpy as np


class FactorEvaluator:
    """因子有效性评估"""
    
    @staticmethod
    def calc_ic(factor_df: pd.DataFrame, 
                forward_returns: pd.Series,
                method: str = 'rank') -> pd.DataFrame:
        """
        计算因子的 IC 序列
        IC = 截面相关系数(某一日所有股票的因子值 vs 未来收益率)
        """
        ic_records = []
        dates = factor_df.index.get_level_values('trade_date').unique()
        
        for dt in dates:
            cross_section = factor_df.xs(dt, level='trade_date')
            fwd_ret = forward_returns.xs(dt, level='trade_date')
            
            common = cross_section.index.intersection(fwd_ret.index)
            if len(common) < 30:  # 样本太少则跳过
                continue
            
            for col in cross_section.columns:
                if method == 'rank':
                    ic = cross_section.loc[common, col].corr(
                        fwd_ret.loc[common], method='spearman'
                    )
                else:
                    ic = cross_section.loc[common, col].corr(
                        fwd_ret.loc[common]
                    )
                ic_records.append({
                    'date': dt, 'factor': col, 'IC': ic
                })
        
        ic_df = pd.DataFrame(ic_records)
        return ic_df
    
    @staticmethod
    def ic_summary(ic_df: pd.DataFrame) -> pd.DataFrame:
        """IC 统计摘要"""
        summary = ic_df.groupby('factor')['IC'].agg([
            ('IC_Mean', 'mean'),
            ('IC_Std', 'std'),
            ('IC_IR', lambda x: x.mean() / x.std() if x.std() > 0 else 0),
            ('IC_Positive_Ratio', lambda x: (x > 0).mean()),
            ('IC_Abs_Mean', lambda x: x.abs().mean()),
        ])
        return summary

一个"好因子"的标准大致是:

  • IC 均值:绝对值 > 0.03
  • IC_IR(IC 的夏普比率):> 0.5
  • IC 正比例:> 55%(做多因子)或 < 45%(做空因子)

回测系统:Backtrader 实战

有了因子之后,下一步就是构建策略并在历史数据上回测。这里我们使用 Backtrader——Python 生态中最成熟的事件驱动回测框架。

为什么选择 Backtrader

目前主流的 Python 回测框架有 Backtrader、Zipline、VectorBT、vnpy 等。它们各有优劣:

框架 类型 优势 劣势
Backtrader 事件驱动 生态成熟、文档丰富、易上手 速度较慢、已停止维护
VectorBT 向量化 速度极快(NumPy 向量化) 学习曲线陡、不适合复杂逻辑
Zipline 事件驱动 Quantopian 出身、设计优雅 已停止维护、数据绑定
vnpy 事件驱动 实盘交易支持好 偏 CTA 策略、多因子支持弱

对于多因子选股这类中低频策略,Backtrader 是一个很好的入门选择——它的事件驱动模型直观易懂,且能很好地与 Pandas 数据集成。

多因子选股策略回测代码

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""
基于 Backtrader 的多因子选股策略回测
策略逻辑:
  - 每月调仓一次
  - 根据综合因子得分选出 Top 20 股票
  - 等权持有
"""
import backtrader as bt
import pandas as pd
import numpy as np
from datetime import datetime


class MultiFactorStrategy(bt.Strategy):
    """多因子选股策略"""
    
    params = (
        ('rebalance_days', 20),      # 调仓周期(交易日)
        ('top_n', 20),               # 持有股票数量
        ('factor_weights', {         # 因子权重
            'momentum_20d': 0.3,
            'volatility_60d': -0.2,  # 低波动得分更高
            'turnover_avg': -0.1,
            'roe_ttm': 0.4,
        }),
    )
    
    def __init__(self):
        self.day_count = 0
        self.stock_names = [d._name for d in self.datas]
        self.order_dict = {}  # 记录挂单
        
        # 为每个数据源创建指标
        self.sma_indicators = {}
        for d in self.datas:
            self.sma_indicators[d._name] = bt.indicators.SMA(
                d.close, period=20
            )
    
    def next(self):
        self.day_count += 1
        
        # 每 N 个交易日调仓一次
        if self.day_count % self.params.rebalance_days != 0:
            return
        
        # 计算每只股票的因子综合得分
        scores = self._calculate_scores()
        
        # 选出 Top N 股票
        selected = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        selected_stocks = [s[0] for s in selected[:self.params.top_n]]
        
        # 执行调仓
        self._rebalance(selected_stocks)
    
    def _calculate_scores(self):
        """计算当日各股票的综合因子得分"""
        scores = {}
        
        for d in self.datas:
            name = d._name
            score = 0
            
            # 动量因子:过去20日收益率
            if len(d) > 20:
                momentum = (d.close[0] - d.close[-20]) / d.close[-20]
                score += momentum * self.params.factor_weights['momentum_20d']
            
            # 波动率因子(用价格标准差近似)
            if len(d) > 60:
                prices = [d.close[-i] for i in range(60)]
                vol = np.std(prices) / np.mean(prices)
                score += vol * self.params.factor_weights['volatility_60d']
            
            scores[name] = score
        
        return scores
    
    def _rebalance(self, selected_stocks):
        """执行调仓:卖出不在列表中的,买入新入选的"""
        # 获取当前持仓
        current_holdings = [
            d._name for d in self.datas 
            if self.getposition(d).size > 0
        ]
        
        # 卖出:持有但不在新名单中的
        for d in self.datas:
            if d._name in current_holdings and d._name not in selected_stocks:
                self.order_target_percent(d, target=0)
        
        # 买入:等权分配资金
        if selected_stocks:
            target_weight = 1.0 / len(selected_stocks)
            for d in self.datas:
                if d._name in selected_stocks:
                    self.order_target_percent(d, target=target_weight)
    
    def notify_order(self, order):
        """订单回报处理"""
        if order.status in [order.Completed]:
            action = '买入' if order.isbuy() else '卖出'
            print(f"{self.datetime.date()} {action} "
                  f"{order.data._name}: "
                  f"价格={order.executed.price:.2f}, "
                  f"数量={order.executed.size:.0f}")


def run_backtest():
    """运行回测主函数"""
    # 创建 Cerebro 引擎
    cerebro = bt.Cerebro()
    
    # 添加策略
    cerebro.addstrategy(MultiFactorStrategy)
    
    # 设置初始资金和手续费
    cerebro.broker.setcash(1000000.0)  # 100 万初始资金
    cerebro.broker.setcommission(commission=0.001)  # 千一手续费
    
    # 加载数据(这里用 Pandas DataFeed)
    # 实际使用时替换为真实的股票数据
    stock_symbols = ['600519', '000858', '601318', '600036', 
                     '000333', '600276', '601166', '000002',
                     '600900', '601888', '002415', '300750',
                     '600809', '000568', '002304', '601398',
                     '600030', '601668', '600887', '000651',
                     '601012', '603259', '002714', '300059']
    
    for symbol in stock_symbols:
        # 实际项目中从数据库或CSV加载
        # df = load_stock_data(symbol, '2020-01-01', '2025-12-31')
        # data = bt.feeds.PandasData(dataname=df, name=symbol)
        # cerebro.adddata(data)
        pass
    
    # 添加分析器
    cerebro.addanalyzer(bt.analyzers.SharpeRatio, _name='sharpe')
    cerebro.addanalyzer(bt.analyzers.DrawDown, _name='drawdown')
    cerebro.addanalyzer(bt.analyzers.Returns, _name='returns')
    cerebro.addanalyzer(bt.analyzers.TradeAnalyzer, _name='trades')
    
    # 运行回测
    print(f"初始资金: {cerebro.broker.getvalue():,.2f}")
    results = cerebro.run()
    print(f"最终资金: {cerebro.broker.getvalue():,.2f}")
    
    # 输出绩效指标
    strat = results[0]
    sharpe = strat.analyzers.sharpe.get_analysis()
    drawdown = strat.analyzers.drawdown.get_analysis()
    
    print(f"\n===== 回测绩效报告 =====")
    print(f"夏普比率: {sharpe.get('sharperatio', 'N/A')}")
    print(f"最大回撤: {drawdown.max.drawdown:.2f}%")
    print(f"最大回撤持续: {drawdown.max.len} 天")
    
    return results


if __name__ == '__main__':
    run_backtest()

向量化回测:更快的替代方案

Backtrader 的事件驱动模式虽然灵活,但速度较慢。对于简单的多因子策略,向量化回测可以把速度提升 100 倍以上:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
"""
向量化回测:用 Pandas 实现快速因子回测
"""
import pandas as pd
import numpy as np


def vectorized_backtest(factor_scores: pd.DataFrame,
                        returns: pd.DataFrame,
                        top_n: int = 20,
                        rebalance_freq: int = 20) -> pd.DataFrame:
    """
    向量化回测引擎
    
    Args:
        factor_scores: 因子得分矩阵 (index=date, columns=stocks)
        returns: 日收益率矩阵 (index=date, columns=stocks)
        top_n: 每期持有股票数
        rebalance_freq: 调仓频率(交易日)
    
    Returns:
        回测结果 DataFrame
    """
    dates = factor_scores.index
    portfolio_returns = []
    holdings_history = []
    
    current_holdings = []
    
    for i, date in enumerate(dates):
        # 调仓日:重新选股
        if i % rebalance_freq == 0:
            scores_today = factor_scores.loc[date].dropna()
            if len(scores_today) >= top_n:
                current_holdings = scores_today.nlargest(top_n).index.tolist()
        
        # 计算当日组合收益(等权)
        if current_holdings:
            daily_ret = returns.loc[date, current_holdings].mean()
        else:
            daily_ret = 0.0
        
        portfolio_returns.append({
            'date': date,
            'portfolio_return': daily_ret,
            'n_holdings': len(current_holdings)
        })
    
    result = pd.DataFrame(portfolio_returns).set_index('date')
    result['cumulative_return'] = (1 + result['portfolio_return']).cumprod()
    
    # 计算绩效指标
    annual_return = result['portfolio_return'].mean() * 252
    annual_vol = result['portfolio_return'].std() * np.sqrt(252)
    sharpe = annual_return / annual_vol if annual_vol > 0 else 0
    
    # 最大回撤
    cummax = result['cumulative_return'].cummax()
    drawdown = (result['cumulative_return'] - cummax) / cummax
    max_drawdown = drawdown.min()
    
    print(f"年化收益率: {annual_return:.2%}")
    print(f"年化波动率: {annual_vol:.2%}")
    print(f"夏普比率: {sharpe:.2f}")
    print(f"最大回撤: {max_drawdown:.2%}")
    
    return result

工程实践中的关键挑战

1. 前视偏差(Look-ahead Bias)

前视偏差是量化回测中最致命的陷阱。它指的是在回测中不小心使用了"未来数据"——比如在 2024 年 3 月做决策时用到了 2024 年 6 月才发布的财务报表数据。

从工程角度防范前视偏差:

  • Point-in-Time 数据库:记录数据的实际发布时间,而非报告期
  • 严格的回测时间轴:策略只能访问当前时间点之前的数据
  • 数据版本化:财务数据的每次修正都要保留历史版本

2. 幸存者偏差(Survivorship Bias)

如果你只用当前还在上市的股票做回测,就会忽略那些已经退市的公司。这会导致回测结果虚高——因为你"恰好"避开了那些表现最差、最终退市的股票。

解决方案是维护一个包含历史全部股票的数据库,包括已退市的。

3. 过拟合(Overfitting)

当你在历史数据上反复调整因子权重和参数,最终找到了一个"完美"的策略——别高兴太早,这大概率是过拟合。你只是在历史数据上找到了噪声的模式,而不是真正的市场规律。

工程上的对策:

  • 样本外测试:数据分为训练集、验证集、测试集
  • Walk-Forward 分析:滚动窗口训练 + 前瞻验证
  • 参数敏感性分析:检查策略在参数微调时是否稳定

4. 交易成本建模

很多回测看起来很美,一旦加入真实的交易成本就亏钱了。需要建模的成本包括:

  • 佣金:国内 A 股约万三(双向)
  • 印花税:卖出千一
  • 滑点:市价单的成交价与下单价的偏差,通常按 1-2 个 tick 计算
  • 冲击成本:大资金买卖对股价的影响

2026 年量化数据工程技术栈趋势

随着技术的发展,量化投资的数据工程也在快速演进:

计算层的变化:

  • Polars 替代 Pandas:Rust 编写的 Polars 在大规模数据处理上比 Pandas 快 10-100 倍,越来越多的量化团队开始迁移
  • GPU 加速:cuDF(RAPIDS)和 JAX 被用于大规模因子计算和机器学习模型训练
  • 流式计算:Flink/Kafka Streams 用于实时因子计算,替代传统的批处理模式

存储层的变化:

  • ClickHouse 成为标配:列式存储引擎非常适合金融时序数据的聚合查询
  • Arctic + MongoDB:专门为金融时序数据设计的存储方案
  • Lakehouse 架构:Iceberg/Delta Lake 让量化团队可以在数据湖上做 ACID 事务

基础设施的变化:

  • Airflow/Prefect 替代 Cron 做任务调度,支持 DAG 依赖和重试
  • MLflow/Weights & Biases 做因子和模型的实验追踪
  • Feature Store(如 Feast)管理因子的版本、血缘和在线服务

AI 的融合:

  • 大模型辅助因子挖掘:用 LLM 从研报和新闻中提取投资逻辑,转化为可计算的因子
  • 强化学习优化组合:用 RL Agent 替代传统的均值-方差优化
  • 图神经网络:建模公司间的供应链关系、持股关系,提取关联因子

写在最后

从程序员的角度看,量化交易系统本质上就是一个高要求的分布式数据处理系统——它对数据质量的要求比电商系统高几个数量级,对延迟的容忍度比 Web 系统低几个数量级,对正确性的要求比推荐系统严格几个数量级(因为错误的代价是真金白银)。

但反过来说,做量化数据工程也是一个非常好的学习机会。它迫使你深入理解:

  • 时序数据的高效存储和查询
  • 大规模向量化计算的性能优化
  • 复杂数据管道的可靠性和可维护性
  • 回测框架的设计模式和抽象能力

这些能力在任何领域的数据密集型系统中都是通用的。即使你最终不做量化交易,在搭建这套系统的过程中学到的数据工程技能,也足以让你在任何一个数据平台的建设中大显身手。

量化交易的门槛不在金融知识,而在工程能力。一个能搭建高可用、低延迟、数据准确的数据管道的工程师,在量化行业里比一个懂 Black-Scholes 模型但写不出好代码的金融博士更有价值。


参考资料

广告
广告位预留中 (728x90)