[淘股吧]
MACD金叉死叉回测程序 - 详细注释版
====================================

【程序功能简介】
这个程序用来检验一个常见的股票交易策略:MACD金叉买入、死叉卖出,到底能不能赚钱?

它会做以下几件事:
1. 读取所有A股的日线数据(CSV格式)
2. 计算每只股票的MACD指标,识别 金叉 和 死叉 信号
3. 统计信号出现后,持有1/2/3/5/10/20天的平均收益率
4. 按行业分组统计,看看哪些行业的MACD信号更有效
5. 将所有结果输出为格式化的Excel文件

【什么是MACD?】
MACD(Moving Average Convergence Divergence)是一种技术分析指标:
- DIF线 = 短期均线(12日) - 长期均线(26日)
- DEA线 = DIF的9日移动平均
- 金叉:DIF从下方穿越DEA,传统上被视为买入信号
- 死叉:DIF从上方穿越DEA,传统上被视为卖出信号

【数据格式要求】
1. 股票数据:CSV文件,文件名为股票代码(如 sh600000.csv),GBK编码
- 第一行是说明文字(会被跳过),第二行是列名
- 必须包含列:股票代码、股票名称、交易日期、开盘价、最高价、最低价、收盘价、前收盘价、成交量
- 必须包含列:新版申万一级行业名称(用于按行业分组)
2. 指数数据:CSV文件(如 sh000300.csv),GBK编码
- 列名:candle_end_time, open, high, low, close, amount, volume, index_code


# ===================== 第一部分:导入依赖库 =====================
# 下面这些是程序需要用到的 工具箱 ,每个库负责不同的功能

import os # 操作系统相关:用于读取文件夹、拼接路径等
import warnings # 警告控制:忽略一些不影响结果的警告信息
import pandas as pd # 数据处理核心库:用于处理表格数据(类似Excel操作)
import numpy as np # 数值计算库:用于高速数学运算
from concurrent.futures import ProcessPoolExecutor, as_completed # 多进程库:让多个CPU核心同时工作,加快处理速度
from decimal import Decimal, ROUND_HALF_UP # 精确小数运算:用于涨跌停价的四舍五入(避免浮点误差)
from numba import jit # 代码加速库:把Python代码编译为机器码,大幅提升计算速度

# 忽略pandas产生的警告信息(不影响结果,只是让输出更干净)
warnings.filterwarnings( ignore )

# 设置pandas显示选项
pd.set_option(‘expand_frame_repr‘, False) # 数据太宽时不换行显示(一行显示完)
pd.set_option(‘display.max_rows‘, 5000) # 最多显示5000行(防止数据被省略)

# ===================== 第二部分:全局配置 =====================
# 下面这些参数控制程序的运行行为,可以根据需要修改

# --- 交易成本 ---
# A股交易成本包括:券商佣金(买卖各收一次)+ 印花税(只在卖出时收)
c_rate = 1.5 / 10000 # 佣金费率:万分之一点五(买卖各一次,所以总共收两次)
t_rate = 1 / 1000 # 印花税率:千分之一(只在卖出时收取)

# --- 持有天数 ---
# 程序会分别统计信号出现后持有这些天数的收益率
# 比如持有1天 = 信号日第二天开盘买入,第三天开盘卖出
day_list = [1, 2, 3, 5, 10, 20]

# --- 测试时间段 ---
# 只分析这个时间段内的信号(格式:年月日,8位数字)
start_time = ‘20070101‘ # 开始日期:2007年1月1日
end_time = ‘20261231‘ # 结束日期:2026年12月31日

# ============== 用户配置(请修改为本地路径) ==============
# 【重要!】运行前必须把下面三个路径改成你自己电脑上的实际路径

# 1. 股票日线数据文件夹路径
# 文件夹里应该包含所有股票的CSV文件,每个CSV一个股票
# 文件名格式如:sh600000.csv(浦发银行)、sz000001.csv(平安银行
file_path = r‘D:\万事不愁\stock-trading-data-pro-2026-03-12/‘

# 2. 沪深300指数数据文件路径
# 用于计算 超额收益 ——即跑赢大盘多少
# 文件名如:sh000300.csv
index_path = r‘C:\Users\abc\ZCodeProject\MACD4\sh000300.csv‘

# 3. 结果输出路径
# 程序运行后,Excel结果文件会保存在这个文件夹里
output_path = r‘C:\Users\abc\ZCodeProject\MACD4/‘

# ===================== 第三部分:数据加载函数 =====================

def rehabilitation(df):

【后复权处理】
这个函数的作用是消除股票 分红、送股 对价格的影响。

举个例子:某股票今天每股100元,明天 每10股送10股 (股价除以2变50元)
如果不做复权处理,程序会误以为股票暴跌了50%,产生错误的MACD信号。

后复权的做法:
- 历史价格保持不变(就是当时真实的交易价格)
- 最新的价格被向上调整(反映所有历史分红送股的累计价值)

这样算出来的MACD才是连续的、有意义的。

# 第一步:计算每天的涨跌幅
# 涨跌幅 = 今天收盘价 / 昨天收盘价 - 1
df[‘涨跌幅‘] = df[‘收盘价‘] / df[‘前收盘价‘] - 1

# 第二步:计算复权因子
# cumprod() 是 累乘 ,把每天的 (1+涨跌幅) 连乘起来
# 这样得到一个从第一天到最后一天的累积涨跌幅序列
df[‘复权因子‘] = (1 + df[‘涨跌幅‘]).cumprod()

# 第三步:计算后复权因子
# 最后一天的复权因子 / 当天的复权因子
# 第一天的因子最大(因为分红送股的累积效果最大),最后一天因子=1
df[‘复权因子_后复权‘] = df[‘复权因子‘].iloc[-1] / df[‘复权因子‘]

# 第四步:用后复权因子调整四个价格
df[‘收盘价_复权‘] = df[‘收盘价‘] * df[‘复权因子_后复权‘]
df[‘开盘价_复权‘] = df[‘开盘价‘] * df[‘复权因子_后复权‘]
df[‘最高价_复权‘] = df[‘最高价‘] * df[‘复权因子_后复权‘]
df[‘最低价_复权‘] = df[‘最低价‘] * df[‘复权因子_后复权‘]

# 第五步:列名替换
# 把原始价格保存为 xxx_原 ,把复权后的价格用标准列名(后面计算要用)
df.rename(columns={
‘开盘价‘: ‘开盘价_原‘, # 原始开盘价 -> 备份
‘最高价‘: ‘最高价_原‘, # 原始最高价 -> 备份
‘最低价‘: ‘最低价_原‘, # 原始最低价 -> 备份
‘收盘价‘: ‘收盘价_原‘, # 原始收盘价 -> 备份
‘开盘价_复权‘: ‘开盘价‘, # 复权开盘价 -> 成为标准列名
‘最高价_复权‘: ‘最高价‘, # 复权最高价 -> 成为标准列名
‘最低价_复权‘: ‘最低价‘, # 复权最低价 -> 成为标准列名
‘收盘价_复权‘: ‘收盘价‘ # 复权收盘价 -> 成为标准列名
}, inplace=True)
return df

def load_file(path, file):

【加载CSV数据文件】
从硬盘读取一个CSV文件,转换为pandas的DataFrame格式(可以理解为程序里的Excel表)

参数:
path: 文件所在的文件夹路径
file: 文件名(如 sh600000.csv)

返回:
df: 加载后的数据表格

path += file # 拼接完整文件路径:文件夹路径 + 文件名

if file in [‘sh000001.csv‘, ‘sh000300.csv‘]:
# ---------- 指数文件的处理方式 ----------
# 指数CSV的列名是英文(open, close等),需要翻译成中文
# 第一行就是数据(没有说明行),编码是GBK(中文Windows常用编码)
df = pd.read_csv(path, encoding=‘gbk‘, parse_dates=[‘candle_end_time‘])
df.rename(
columns={
‘candle_end_time‘: ‘交易日期‘, # 日期列名翻译
‘open‘: ‘开盘价‘, # 开盘价翻译
‘high‘: ‘最高价‘, # 最高价翻译
‘low‘: ‘最低价‘, # 最低价翻译
‘close‘: ‘收盘价‘ # 收盘价翻译
},
inplace=True
)
# 指数文件没有 前收盘价 列,需要手动计算
# 前收盘价 = 上一交易日的收盘价(shift(1)就是把数据往下移一行)
df[‘前收盘价‘] = df[‘收盘价‘].shift()
# 第一天没有前一天,用当天的开盘价填充
df[‘前收盘价‘].fillna(value=df[‘开盘价‘], inplace=True)
else:
# ---------- 股票文件的处理方式 ----------
# 股票CSV第一行是说明文字(skiprows=1跳过),第二行才是列名
df = pd.read_csv(path, encoding=‘gbk‘, parse_dates=[‘交易日期‘], skiprows=1)

return df

def merge_index(df, index_df):

【将股票数据和指数数据合并】
目的:把沪深300指数的每日数据拼到每只股票的数据里,方便后续计算超额收益。

合并方式说明:
- 以指数的交易日为基准(how=‘right‘)
- 如果某天股票停牌了,股票价格为空,但指数那天有数据,用前一天的价格填充
- 如果股票还没上市,那些行会被删掉

参数:
df: 股票数据
index_df: 沪深300指数数据

返回:
合并后的数据

# pd.merge 类似Excel的V LOOK UP,按 交易日期 列匹配两份数据
df = pd.merge(left=df, right=index_df, on=‘交易日期‘, how=‘right‘, sort=True, indicator=True)

# 对股票价格进行补全处理(停牌日没有数据,需要填充)
df[‘收盘价‘] = df[‘收盘价‘].ffill() # 收盘价:用前一天的值填充
df[‘开盘价‘] = df[‘开盘价‘].fillna(df[‘收盘价‘]) # 开盘价:空的话用当天收盘价
df[‘最高价‘] = df[‘最高价‘].fillna(df[‘收盘价‘]) # 最高价:同上
df[‘最低价‘] = df[‘最低价‘].fillna(df[‘收盘价‘]) # 最低价:同上
df[‘前收盘价‘] = df[‘前收盘价‘].fillna(df[‘收盘价‘].shift()) # 前收盘价:用前一天的收盘价

# 对其余所有列也用前一天数据填充(比如行业名称、股票代码等不会变的字段)
df = df.ffill()

# 删除股票上市之前的数据(这些行中股票代码为空)
df = df[df[‘股票代码‘].notnull()]

# 标记每天是否有交易(后面可能用到)
df[‘是否交易‘] = 1
# _merge列是pandas自动添加的,‘right_only‘表示只有指数有数据(股票已退市或还没上市)
df.loc[df[‘_merge‘] == ‘right_only‘, ‘是否交易‘] = 0
del df[‘_merge‘] # 用完了就删掉,保持数据干净

df.reset_index(drop=True, inplace=True) # 重新编排行号(从0开始)
return df

# ===================== 第四部分:技术指标计算 =====================

@jit(nopython=True, cache=True)
def fast_ewma(data, span):

【指数移动平均(EMA)- 加速版】

EMA是什么?
普通的移动平均(MA)是对最近N天的价格取算术平均,每天权重一样。
EMA是 指数 移动平均,越近的价格权重越大,越远的价格权重越小。
这样EMA对最新价格变化更敏感,反应更快。

计算公式:EMA(today) = alpha × price(today) + (1 - alpha) × EMA(yesterday)
其中 alpha = 2 / (span + 1),span就是天数

@jit 装饰器:让这个函数被编译为机器码,速度提升约100倍
普通Python循环很慢,但编译后就很快了。

参数:
data: 价格序列(一维数组)
span: EMA的天数(如12、26、9)

返回:
EMA序列

alpha = 2.0 / (span + 1.0) # 计算平滑系数
result = np.empty_like(data) # 创建一个和输入同样大小的空数组
result[0] = data[0] # 第一天的EMA就等于第一天的价格
for i in range(1, len(data)):
# 核心公式:今天的EMA = alpha×今天价格 + (1-alpha)×昨天的EMA
result = alpha * data[i] + (1 - alpha) * result[i - 1]
return result

def cal_macd(df):

【计算MACD指标 + 涨跌停判断 + 金叉死叉信号】

这个函数做了三件事:
1. 计算每天的涨跌停价,并判断涨跌停状态
2. 计算后复权价格和MACD指标
3. 识别金叉(买入信号)和死叉(卖出信号)

为什么先算涨跌停再算复权?
涨跌停价的计算需要用到 原始价格 (未复权的),
因为交易所的涨跌停规则是基于原始价格的。
算完涨跌停后,再切换到复权价格来计算MACD。

# ============ 第一步:计算涨跌停价 ============
# A股有涨跌停限制,每天最多涨/跌10%(ST股5%,科创板/创业板20%,北交所30%)

# 默认规则:普通股票涨跌幅限制为 ±10%
cond = df[‘股票名称‘].str.contains(‘ST‘) # 找出所有ST股票(名字里含有 ST )
df[‘涨停价‘] = df[‘前收盘价‘] * 1.1 # 涨停价 = 前收盘价 × 1.1
df[‘跌停价‘] = df[‘前收盘价‘] * 0.9 # 跌停价 = 前收盘价 × 0.9
# ST股票涨跌幅限制为 ±5%
df.loc[cond, ‘涨停价‘] = df[‘前收盘价‘] * 1.05 # ST涨停价 = 前收盘价 × 1.05
df.loc[cond, ‘跌停价‘] = df[‘前收盘价‘] * 0.95 # ST跌停价 = 前收盘价 × 0.95

# 2020年8月3日之后,注册制改革导致涨跌停规则变化:
# 科创板(股票代码以sh68开头)和创业板(以sz30开头)涨跌幅限制改为 ±20%
new_rule_kcb = (df[‘交易日期‘] > pd.to_datetime(‘2020-08-03‘)) df[‘股票代码‘].str.contains(‘sh68‘)
new_rule_cyb = (df[‘交易日期‘] > pd.to_datetime(‘2020-08-03‘)) df[‘股票代码‘].str.contains(‘sz30‘)
cond_bj = df[‘股票代码‘].str.contains(‘bj‘) # 北交所股票(以bj开头)

# 科创板 创业板:±20%
df.loc[new_rule_kcb | new_rule_cyb, ‘涨停价‘] = df[‘前收盘价‘] * 1.2
df.loc[new_rule_kcb | new_rule_cyb, ‘跌停价‘] = df[‘前收盘价‘] * 0.8

# 北交所:±30%
df.loc[cond_bj, ‘涨停价‘] = df[‘前收盘价‘] * 1.3
df.loc[cond_bj, ‘跌停价‘] = df[‘前收盘价‘] * 0.7

# 涨跌停价需要四舍五入到2位小数(A股最小价格单位是0.01元)
# 用Decimal做精确运算,避免 1.10 * 100 = 109.999999 这种浮点误差
df[‘涨停价‘] = df[‘涨停价‘].apply(
lambda x: float(Decimal(x * 100).quantize(Decimal(‘1‘), rounding=ROUND_HALF_UP) / 100))
df[‘跌停价‘] = df[‘跌停价‘].apply(
lambda x: float(Decimal(x * 100).quantize(Decimal(‘1‘), rounding=ROUND_HALF_UP) / 100))

# ============ 第二步:判断涨跌停状态 ============
# 一字涨停:全天最低价都 >= 涨停价(说明从开盘到收盘一直封死在涨停板,根本买不到)
df[‘一字涨停‘] = df[‘最低价‘] >= df[‘涨停价‘]
# 一字跌停:全天最高价都 <= 跌停价(说明从开盘到收盘一直封死在跌停板,根本卖不出去)
df[‘一字跌停‘] = df[‘最高价‘] <= df[‘跌停价‘]
# 开盘涨停:开盘价就 >= 涨停价(同样买不到)
df[‘开盘涨停‘] = df[‘开盘价‘] >= df[‘涨停价‘]
# 开盘跌停:开盘价就 <= 跌停价
df[‘开盘跌停‘] = df[‘开盘价‘] <= df[‘跌停价‘]

# ============ 第三步:计算后复权价格 ============
# 调用前面定义的函数,把原始价格转为后复权价格
# 注意:复权后,‘收盘价‘等列名已经是复权后的价格了
df = rehabilitation(df)

# ============ 第四步:计算MACD指标 ============
# 取出收盘价,转为numpy数组(用于加速计算)
close_array = df[‘收盘价‘].values.astype(np.float64)

# 计算12日EMA和26日EMA
ema12 = fast_ewma(close_array, 12) # 短期趋势线
ema26 = fast_ewma(close_array, 26) # 长期趋势线

# DIF = 短期EMA - 长期EMA
# DIF > 0 说明短期趋势强于长期(多头趋势)
# DIF < 0 说明短期趋势弱于长期(空头趋势)
dif = ema12 - ema26

# DEA = DIF的9日EMA(对DIF再做一次平滑)
dea = fast_ewma(dif, 9)

# MACD柱 = 2 × (DIF - DEA)
# 通常以柱状图显示,柱子越高说明多头力量越强
macd = 2 * (dif - dea)

# 把计算结果存回数据表
df[‘EMA12‘] = ema12
df[‘EMA26‘] = ema26
df[‘DIF‘] = dif
df[‘DEA‘] = dea
df[‘MACD‘] = macd

# ============ 第五步:识别金叉和死叉信号 ============
# 金叉(看涨信号):今天DIF > DEA,且昨天DIF <= DEA
# 意味着DIF线刚刚从下方穿越DEA线,短期趋势开始走强
# shift(1)表示取前一天的数据
con1 = (df[‘DIF‘] > df[‘DEA‘]) (df[‘DIF‘].shift(1) <= df[‘DEA‘].shift(1))

# 死叉(看跌信号):今天DIF < DEA,且昨天DIF >= DEA
# 意味着DIF线刚刚从上方穿越DEA线,短期趋势开始走弱
con2 = (df[‘DIF‘] < df[‘DEA‘]) (df[‘DIF‘].shift(1) >= df[‘DEA‘].shift(1))

# 把信号写入数据表
df.loc[con1, ‘signal‘] = 1 # 金叉标记为1
df.loc[con2, ‘signal‘] = 0 # 死叉标记为0
# 没有信号的行,signal为NaN(空值)

return df

# ===================== 第五部分:核心回测逻辑 =====================

def process_file(f, file_path, index_df, day_list, start_time, end_time):

【处理单只股票 - 核心回测函数】

这是整个程序最核心的函数,对一只股票完成全部回测流程:
1. 加载数据
2. 合并指数
3. 计算MACD
4. 计算信号后的收益率
5. 过滤不合格的信号
6. 返回有效信号数据

参数:
f: 文件名
file_path: 数据文件夹路径
index_df: 沪深300指数数据
day_list: 持有天数列表
start_time: 开始日期
end_time: 结束日期

返回:
有效信号的数据(DataFrame),如果没有有效信号则返回None

try:
# 步骤1:加载股票数据
df = load_file(file_path, f)

# 步骤2:检查数据是否完整
# 如果CSV里没有 行业 这一列,说明数据格式不对,跳过
if ‘新版申万一级行业名称‘ not in df.columns:
return None

# 步骤3:与沪深300指数数据合并
df = merge_index(df, index_df)

# 步骤4:剔除上市不足250天的新股
# 新股前250天的价格波动通常不正常(涨跌幅限制不同、炒作严重),
# 且MACD需要足够的历史数据才能准确计算,所以去掉前250天
df[‘交易天数‘] = df.index + 1
df = df[df[‘交易天数‘] > 250]
if df.empty:
return None

# 步骤5:计算MACD指标 + 涨跌停判断 + 信号识别
df = cal_macd(df)

# 步骤6:计算信号出现后的收益率
# 对每个持有天数,计算两条收益数据:
# a) 绝对收益:买入该股票能赚多少
# b) 超额收益:跑赢沪深300多少
#
# 收益计算逻辑(非常重要):
# 假设信号出现在T日
# T+1日开盘时买入(用开盘价),T+N日收盘时卖出(用收盘价)
# 收益率 = (卖出价 / 买入价 - 1) - 交易成本
# 交易成本 = 买入时佣金 + 卖出时佣金+印花税
for day in day_list:
# 绝对收益率(扣除双边交易成本)
# shift(-day) 表示取N天后的数据,shift(-1) 表示取明天的数据
# 收益 = (N日后收盘价 / 明天开盘价 - 1) × (1 - 卖出佣金 - 印花税) × (1 - 买入佣金)
df[f‘{day}日后涨跌幅‘] = (
df[‘收盘价‘].shift(-day) / df[‘开盘价‘].shift(-1) - 1
) * (1 - c_rate - t_rate) * (1 - c_rate)

# 超额收益率 = 个股绝对收益 - 指数收益
# 指数收益用指数的收盘价和开盘价计算(指数列名是close和open)
df[f‘{day}日后_相对指数涨跌幅‘] = (
(df[‘收盘价‘].shift(-day) / df[‘开盘价‘].shift(-1) - 1) * (1 - c_rate - t_rate) * (1 - c_rate)
) - (df[‘close‘].shift(-day) / df[‘open‘].shift(-1) - 1)

# 步骤7:预先计算 第二天 的状态
# 因为我们是在信号日(T日)做决策,实际买入是T+1日
# 所以需要知道T+1日的状态,来决定是否要排除这个信号
# shift(-1) 就是把数据往上移一行,让每行能看到明天的状态
df[‘下日_开盘涨停‘] = df[‘开盘涨停‘].shift(-1) # 明天是否开盘涨停
df[‘下日_是否ST‘] = df[‘股票名称‘].str.contains(‘ST‘).shift(-1) # 明天是否被ST
df[‘下日_是否S‘] = df[‘股票名称‘].str.contains(‘S‘).shift(-1) # 明天名字是否带S
df[‘下日_是否退市‘] = df[‘股票名称‘].str.contains(‘退‘).shift(-1) # 明天是否退市

# 步骤8:逐层过滤,只保留有效的交易信号

# 8a. 时间范围过滤:只保留指定时间段内的数据
df = df[(df[‘交易日期‘] >= pd.to_datetime(start_time)) (df[‘交易日期‘] <= pd.to_datetime(end_time))]

# 8b. 剔除当天零成交(说明停牌了,无法交易)
df = df[df[‘成交量‘] > 0]

# 8c. 剔除下日开盘涨停的情况(买不到)
# fillna(False) 把空值填充为False,避免NaN参与比较时报错
df = df[df[‘下日_开盘涨停‘].fillna(False) == False]

# 8d. 剔除ST股、带S标记的股票、退市股
# 这些股票交易限制多、风险大,不适合用技术指标分析
df = df[df[‘下日_是否S‘].fillna(False) == False]
df = df[df[‘下日_是否ST‘].fillna(False) == False]
df = df[df[‘下日_是否退市‘].fillna(False) == False]

# 8e. 剔除北交所和科创板股票
# 这些板块的涨跌停规则和交易制度不同,混在一起统计没有意义
df = df[~df[‘股票代码‘].str.contains(‘bj‘)] # 排除北交所
df = df[~df[‘股票代码‘].str.contains(‘sh68‘)] # 排除科创板

# 8f. 只保留有信号的行(金叉或死叉)
# 没有信号的日期对我们没有意义
df = df[df[‘signal‘].notna()]

return df

except Exception as e:
# 如果处理过程中出错,打印错误信息并跳过这只股票(不影响其他股票的处理)
print(f Error processing file {f}: {e} )
return None

# ===================== 第六部分:统计分析函数 =====================

def analyze_overall(all_df, day_list):

【总体分析 - 不分行业】

把所有股票的金叉/死叉信号合在一起统计,回答以下问题:
- 金叉出现后,平均能赚多少?
- 金叉出现后,上涨的概率是多少?
- 死叉出现后,平均会亏多少?

参数:
all_df: 所有股票的有效信号数据
day_list: 持有天数列表

返回:
统计结果表格

results = []

# 分别处理两种信号类型
for signal_type, signal_name in [(1, ‘看涨信号(金叉)‘), (0, ‘看跌信号(死叉)‘)]:
# 筛选出该类型的所有信号
signal_df = all_df[all_df[‘signal‘] == signal_type]

if signal_df.empty:
continue

# 对每个持有天数分别统计
for day in day_list:
return_col = f‘{day}日后涨跌幅‘ # 收益率列名
excess_col = f‘{day}日后_相对指数涨跌幅‘ # 超额收益率列名

returns = signal_df[return_col].dropna() # 去掉空值(最后几天没有未来数据)
excess_returns = signal_df[excess_col].dropna()

if returns.empty:
continue

# 把收益分为 盈利 和 亏损 两组
up_returns = returns[returns > 0] # 盈利的交易
down_returns = returns[returns <= 0] # 亏损的交易

if signal_type == 1: # ===== 看涨信号(金叉)的统计 =====
up_prob = len(up_returns) / len(returns) if len(returns) > 0 else 0 # 上涨概率
avg_up_return = up_returns.mean() if len(up_returns) > 0 else 0 # 平均盈利幅度
avg_down_return = down_returns.mean() if len(down_returns) > 0 else 0 # 平均亏损幅度
avg_return = returns.mean() # 每笔平均收益(所有交易一起平均)
avg_excess = excess_returns.mean() if len(excess_returns) > 0 else 0 # 平均超额收益

results.append({
‘行业名称‘: ‘【总体】‘,
‘信号类型‘: signal_name,
‘持有天数‘: day,
‘上涨概率‘: up_prob, # 金叉后涨的概率
‘平均上涨收益‘: avg_up_return, # 涨了的话平均涨多少
‘平均下跌收益‘: avg_down_return, # 跌了的话平均跌多少
‘每笔平均收益‘: avg_return, # 所有交易平均下来赚多少
‘超额收益‘: avg_excess, # 相比沪深300多赚了多少
‘样本数量‘: len(returns) # 一共有多少笔交易
})
else: # ===== 看跌信号(死叉)的统计 =====
down_prob = len(down_returns) / len(returns) if len(returns) > 0 else 0 # 下跌概率
avg_up_return = up_returns.mean() if len(up_returns) > 0 else 0
avg_down_return = down_returns.mean() if len(down_returns) > 0 else 0
avg_return = returns.mean()
avg_excess = excess_returns.mean() if len(excess_returns) > 0 else 0

results.append({
‘行业名称‘: ‘【总体】‘,
‘信号类型‘: signal_name,
‘持有天数‘: day,
‘下跌概率‘: down_prob, # 死叉后跌的概率
‘平均上涨收益‘: avg_up_return,
‘平均下跌收益‘: avg_down_return,
‘每笔平均收益‘: avg_return,
‘超额收益‘: avg_excess,
‘样本数量‘: len(returns)
})

return pd.DataFrame(results) # 把结果列表转为表格

def analyze_by_industry(all_df, day_list):

【按行业分析】

和总体分析逻辑完全一样,只是多了一层:按申万一级行业分组统计。
这样可以看出不同行业的MACD信号效果差异。

比如:银行股的金叉信号可能比科技股的金叉信号更有效。

参数和返回值同 analyze_overall

results = []
industries = all_df[‘新版申万一级行业名称‘].unique() # 获取所有行业名称

for industry in industries:
if pd.isna(industry):
continue # 跳过行业为空的记录

# 筛选出该行业的所有数据
industry_df = all_df[all_df[‘新版申万一级行业名称‘] == industry]

# 以下逻辑和 analyze_overall 完全一样,只是数据范围限定在一个行业内
for signal_type, signal_name in [(1, ‘看涨信号(金叉)‘), (0, ‘看跌信号(死叉)‘)]:
signal_df = industry_df[industry_df[‘signal‘] == signal_type]

if signal_df.empty:
continue

for day in day_list:
return_col = f‘{day}日后涨跌幅‘
excess_col = f‘{day}日后_相对指数涨跌幅‘

returns = signal_df[return_col].dropna()
excess_returns = signal_df[excess_col].dropna()

if returns.empty:
continue

up_returns = returns[returns > 0]
down_returns = returns[returns <= 0]

if signal_type == 1:
up_prob = len(up_returns) / len(returns) if len(returns) > 0 else 0
avg_up_return = up_returns.mean() if len(up_returns) > 0 else 0
avg_down_return = down_returns.mean() if len(down_returns) > 0 else 0
avg_return = returns.mean()
avg_excess = excess_returns.mean() if len(excess_returns) > 0 else 0

results.append({
‘行业名称‘: industry,
‘信号类型‘: signal_name,
‘持有天数‘: day,
‘上涨概率‘: up_prob,
‘平均上涨收益‘: avg_up_return,
‘平均下跌收益‘: avg_down_return,
‘每笔平均收益‘: avg_return,
‘超额收益‘: avg_excess,
‘样本数量‘: len(returns)
})
else:
down_prob = len(down_returns) / len(returns) if len(returns) > 0 else 0
avg_up_return = up_returns.mean() if len(up_returns) > 0 else 0
avg_down_return = down_returns.mean() if len(down_returns) > 0 else 0
avg_return = returns.mean()
avg_excess = excess_returns.mean() if len(excess_returns) > 0 else 0

results.append({
‘行业名称‘: industry,
‘信号类型‘: signal_name,
‘持有天数‘: day,
‘下跌概率‘: down_prob,
‘平均上涨收益‘: avg_up_return,
‘平均下跌收益‘: avg_down_return,
‘每笔平均收益‘: avg_return,
‘超额收益‘: avg_excess,
‘样本数量‘: len(returns)
})

return pd.DataFrame(results)

# ===================== 第七部分:Excel输出 =====================

def format_excel_sheet(ws, df):

【美化Excel工作表】

给Excel表格加上格式:
- 表头:蓝色背景、白色加粗字体、居中
- 数据行:居中对齐、加边框
- 百分比列:自动显示为百分比格式(如 0.05 显示为 5.00%)

参数:
ws: openpyxl的工作表对象
df: 要写入的数据

from openpyxl.styles import Font, Alignment, Border, Side, PatternFill
from openpyxl.utils import get_column_letter

# 定义样式
header_fill = PatternFill(start_color=‘4F81BD‘, end_color=‘4F81BD‘, fill_type=‘solid‘) # 蓝色背景
header_font_white = Font(bold=True, size=11, color=‘FFFFFF‘) # 白色加粗字体
center_alignment = Alignment(horizontal=‘center‘, vertical=‘center‘) # 居中对齐
thin_border = Border( # 细边框
left=Side(style=‘thin‘),
right=Side(style=‘thin‘),
top=Side(style=‘thin‘),
bottom=Side(style=‘thin‘)
)

# 自动调整列宽(根据内容的最大宽度)
for col_idx, col in enumerate(df.columns, 1):
max_length = max(
df[col].astype(str).map(len).max() if len(df) > 0 else 0,
len(str(col))
) + 2
ws.column_dimensions[get_column_letter(col_idx)].width = min(max_length, 50)

# 格式化标题行(第一行)
for cell in ws[1]:
cell.font = header_font_white
cell.fill = header_fill
cell.alignment = center_alignment
cell.border = thin_border

# 格式化数据行
for row in ws.iter_rows(min_row=2, max_row=ws.max_row, max_col=ws.max_column):
for cell in row:
cell.alignment = center_alignment
cell.border = thin_border
# 百分比格式:第4列(上涨/下跌概率)和第5-8列(收益率)显示为百分比
if cell.column in [4, 8]:
if isinstance(cell.value, float):
cell.number_format = ‘0.00%‘ # 两位小数的百分比
elif cell.column in [5, 6, 7]:
if isinstance(cell.value, float):
cell.number_format = ‘0.00%‘

def export_to_excel(df, overall_df, output_path):

【导出结果到Excel】

生成一个Excel文件,包含以下工作表(sheet):
1. 汇总:所有行业的数据,底部追加总体分析结果
2. 各行业:每个行业一个独立的sheet
3. 看涨信号汇总:所有行业的金叉信号统计
4. 看跌信号汇总:所有行业的死叉信号统计
5. 总体分析:不分行业的整体统计

参数:
df: 按行业分析的结果
overall_df: 总体分析的结果
output_path: 输出文件夹路径

from openpyxl import load_workbook
from openpyxl.styles import Font, Alignment, Border, Side, PatternFill

output_file = os.path.join(output_path, ‘基础金叉死叉.xlsx‘) # 输出文件名

# 第一步:创建Excel文件并写入各个sheet
with pd.ExcelWriter(output_file, engine=‘openpyxl‘) as writer:
# Sheet 1: 汇总(所有数据)
summary_df = df.copy()
summary_df.to_excel(writer, sheet_name=‘汇总‘, index=False)
format_excel_sheet(writer.sheets[‘汇总‘], summary_df)

# Sheet 2~N: 每个行业一个sheet
industries = df[‘行业名称‘].unique()
for industry in industries:
if pd.isna(industry):
continue
industry_df = df[df[‘行业名称‘] == industry]
sheet_name = str(industry)[:31] # Excel的sheet名最长31个字符
industry_df.to_excel(writer, sheet_name=sheet_name, index=False)
format_excel_sheet(writer.sheets[sheet_name], industry_df)

# Sheet: 看涨信号汇总
bull_df = df[df[‘信号类型‘] == ‘看涨信号(金叉)‘]
if not bull_df.empty:
bull_df.to_excel(writer, sheet_name=‘看涨信号汇总‘, index=False)
format_excel_sheet(writer.sheets[‘看涨信号汇总‘], bull_df)

# Sheet: 看跌信号汇总
bear_df = df[df[‘信号类型‘] == ‘看跌信号(死叉)‘]
if not bear_df.empty:
bear_df.to_excel(writer, sheet_name=‘看跌信号汇总‘, index=False)
format_excel_sheet(writer.sheets[‘看跌信号汇总‘], bear_df)

# Sheet: 总体分析
overall_df.to_excel(writer, sheet_name=‘总体分析‘, index=False)
format_excel_sheet(writer.sheets[‘总体分析‘], overall_df)

# 第二步:在汇总sheet末尾追加总体分析结果(方便一站式查看)
wb = load_workbook(output_file)
ws = wb[‘汇总‘]

last_row = ws.max_row + 2 # 跳一行空行

# 添加标题
title_font = Font(bold=True, size=14)
ws.cell(row=last_row, column=1, value=‘总体分析结果(不分行业)‘)
ws.cell(row=last_row, column=1).font = title_font
last_row += 2

# 定义样式(和format_excel_sheet中一样)
header_fill = PatternFill(start_color=‘4F81BD‘, end_color=‘4F81BD‘, fill_type=‘solid‘)
header_font_white = Font(bold=True, size=11, color=‘FFFFFF‘)
center_alignment = Alignment(horizontal=‘center‘, vertical=‘center‘)
thin_border = Border(
left=Side(style=‘thin‘), right=Side(style=‘thin‘),
top=Side(style=‘thin‘), bottom=Side(style=‘thin‘)
)

# 写入表头
for col_idx, col_name in enumerate(overall_df.columns, 1):
cell = ws.cell(row=last_row, column=col_idx, value=col_name)
cell.font = header_font_white
cell.fill = header_fill
cell.alignment = center_alignment
cell.border = thin_border
last_row += 1

# 写入数据行
for row_idx, row in overall_df.iterrows():
for col_idx, value in enumerate(row, 1):
cell = ws.cell(row=last_row, column=col_idx, value=value)
cell.alignment = center_alignment
cell.border = thin_border
if col_idx in [4, 8]:
if isinstance(value, float):
cell.number_format = ‘0.00%‘
elif col_idx in [5, 6, 7]:
if isinstance(value, float):
cell.number_format = ‘0.00%‘
last_row += 1

wb.save(output_file)
print(f \n结果已保存到: {output_file} )
return output_file

# ===================== 第八部分:主程序入口 =====================
# 下面这段代码只在直接运行本文件时执行(不会在被其他文件导入时执行)

if __name__ == ‘__main__‘:
# 打印基本信息
print(f 共发现 {len(os.listdir(file_path))} 个文件 )
print(f 测试时间段: {start_time} - {end_time} )
print( = * 50)

# 步骤1:读取沪深300指数数据
index_df = pd.read_csv(index_path, encoding=‘gbk‘, parse_dates=[‘candle_end_time‘])
index_df.rename(columns={‘candle_end_time‘: ‘交易日期‘}, inplace=True)

# 步骤2:获取文件夹下所有CSV文件
# 排除以 backtest 开头的文件(可能是之前运行产生的结果文件)
file_list = [f for f in os.listdir(file_path) if f.endswith(‘.csv‘) and not f.startswith(‘backtest‘)]

# 步骤3:多进程并行处理所有股票
# ProcessPoolExecutor 会启动多个进程,让多个CPU核心同时处理不同的股票文件
# 这样5000多只股票可以同时处理,大幅缩短运行时间
all_dfs = []
with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
# 提交所有任务
futures = {
executor.submit(process_file, f, file_path, index_df, day_list, start_time, end_time): f
for f in file_list
}

# 收集结果
completed = 0
for future in as_completed(futures):
f = futures[future]
try:
df = future.result()
if df is not None:
all_dfs.append(df) # 收集有效结果
completed += 1
# 每处理100个文件打印一次进度
if completed % 100 == 0:
print(f 已处理 {completed}/{len(file_list)} 个文件 )
except Exception as e:
print(f Error processing file {f}: {e} )

print(f \n成功处理 {len(all_dfs)} 个股票文件 )

# 步骤4:检查是否有有效数据
if not all_dfs:
print( 没有有效的股票数据,请检查数据文件和路径配置! )
exit()

# 步骤5:合并所有股票的信号数据为一个大表
all_df = pd.concat(all_dfs, ignore_index=True)
print(f 总共有 {len(all_df)} 条信号记录 )

# 步骤6:统计分析
print( \n正在按行业分析MACD信号... )
results_df = analyze_by_industry(all_df, day_list)

print( 正在分析总体MACD信号... )
overall_df = analyze_overall(all_df, day_list)

# 步骤7:导出结果到Excel
export_to_excel(results_df, overall_df, output_path)

# 步骤8:打印最终统计
print( \n + = * 50)
print( 分析完成! )
print(f 共分析 {len(results_df[‘行业名称‘].unique())} 个行业 )
print(f 看涨信号样本数: {all_df[all_df[‘signal‘] == 1].shape[0]} )
print(f 看跌信号样本数: {all_df[all_df[‘signal‘] == 0].shape[0]} )
[/i]