# wance_data/src/backtest/macd_strategy.py
#
# NOTE: the code-hosting page this file was copied from flagged it as containing
# ambiguous Unicode characters, i.e. characters that may be confused with other
# characters. If that is intentional the warning can be ignored.

import asyncio
import json
from datetime import datetime
import bt
import numpy as np
import pandas as pd
from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable
from src.models import wance_data_storage_backtest, wance_data_stock
from src.tortoises_orm_config import init_tortoise
# Builder for the bt strategy driven by the MACD signal.
async def create_dual_ma_strategy(data, stock_code: str, short_window: int = 50, long_window: int = 200):
    """Build a ``bt.Strategy`` whose target weights follow the MACD signal.

    Args:
        data: Price DataFrame handed to ``macd_strategy`` (rows are dates,
            columns are instruments).
        stock_code: Stock code used as the prefix of the strategy name.
        short_window: Span of the short EMA passed to ``macd_strategy``.
        long_window: Span of the long EMA passed to ``macd_strategy``.

    Returns:
        A ``(strategy, signal)`` tuple: the configured ``bt.Strategy`` and the
        signal DataFrame that was given to ``bt.algos.WeighTarget``.
    """
    # macd_strategy is an ordinary synchronous function that returns a
    # DataFrame; awaiting its result raises TypeError, so call it directly.
    signal = macd_strategy(data, short_window, long_window)

    # Assemble the bt algo stack: run daily, select every column, weight by
    # the signal, then rebalance.
    strategy = bt.Strategy(
        f'{stock_code} MACD策略',
        [
            bt.algos.RunDaily(),
            bt.algos.SelectAll(),
            bt.algos.WeighTarget(signal),
            bt.algos.Rebalance(),
        ],
    )
    return strategy, signal
# MACD crossover signal generator.
def macd_strategy(data, short_window=12, long_window=26, signal_window=9):
    """Compute MACD crossover positions for every column of ``data``.

    A position of 1 is opened when the MACD line crosses above its signal
    line and a position of -1 when it crosses below. Between crossings the
    last position is carried forward; rows before the first crossing are 0.

    Args:
        data: pd.DataFrame of prices, indexed by date, one column per stock.
        short_window: Span of the short-term EMA.
        long_window: Span of the long-term EMA.
        signal_window: Span of the EMA applied to the MACD line.

    Returns:
        pd.DataFrame with the same index and columns as ``data`` holding
        1 (long), -1 (short) or 0 (no position yet) as floats.
    """
    # Short and long exponential moving averages of the prices.
    short_ema = data.ewm(span=short_window, adjust=False).mean()
    long_ema = data.ewm(span=long_window, adjust=False).mean()

    # MACD line and its smoothed signal line.
    macd_line = short_ema - long_ema
    signal_line = macd_line.ewm(span=signal_window, adjust=False).mean()

    prev_macd = macd_line.shift(1)
    prev_signal = signal_line.shift(1)

    # Each comparison is parenthesised on its own: `&` binds more loosely
    # than `*`, `+` and attribute calls, so mixing the integer conversion
    # into the same expression silently changes what gets AND-ed.
    buy_cross = (macd_line > signal_line) & (prev_macd <= prev_signal)
    sell_cross = (macd_line < signal_line) & (prev_macd >= prev_signal)

    # +1 on buy crossings, -1 on sell crossings, 0 elsewhere.
    crossings = buy_cross.astype(float) - sell_cross.astype(float)

    # Non-crossing rows become NaN so the forward fill actually carries the
    # previous position; remaining leading NaNs mean "no position yet".
    signal = crossings.where(crossings != 0).ffill().fillna(0)
    return signal
async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, short_window,
                                long_window):
    """Persist one MACD backtest run, updating the row if it already exists.

    Args:
        source_column_name: Key used to look up the strategy inside ``result``.
        result: Object returned by ``bt.run``; its ``stats`` table and the
            per-strategy ``daily_prices`` / ``prices`` / ``returns`` are read.
        signal: Signal DataFrame stored as the ``position`` field.
        stock_code: Stock code of the instrument that was backtested.
        stock_data_series: Close-price DataFrame; its index is formatted with
            ``strftime('%Y%m%d')``, so it must be datetime-like.
        short_window: Short EMA span, recorded in the name and parameters.
        long_window: Long EMA span, recorded in the name and parameters.

    Returns:
        The dict of field values that was written to the database.
    """
    await init_tortoise()

    # Statistics copied one-to-one from ``result.stats`` into the record.
    # NOTE(review): 'rf' has historically been listed next to these fields
    # but was never copied; confirm whether it should be persisted too.
    stat_fields = (
        'total_return', 'cagr', 'max_drawdown', 'calmar', 'mtd', 'three_month',
        'six_month', 'ytd', 'one_year', 'three_year',
        'five_year', 'ten_year', 'incep', 'daily_sharpe',
        'daily_sortino', 'daily_mean', 'daily_vol',
        'daily_skew', 'daily_kurt', 'best_day', 'worst_day',
        'monthly_sharpe', 'monthly_sortino', 'monthly_mean',
        'monthly_vol', 'monthly_skew', 'monthly_kurt',
        'best_month', 'worst_month', 'yearly_sharpe',
        'yearly_sortino', 'yearly_mean', 'yearly_vol',
        'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year',
        'avg_drawdown', 'avg_drawdown_days', 'avg_up_month',
        'avg_down_month', 'win_year_perc', 'twelve_month_win_perc',
    )

    # Close prices keyed by 'YYYYMMDD' strings, with NaN replaced by 0.
    day_labels = stock_data_series.index.strftime('%Y%m%d')
    close_by_day = (
        stock_data_series.fillna(0)
        .rename_axis('time')
        .reset_index()
        .assign(time=day_labels)
        .set_index('time')
        .to_dict(orient='index')
    )
    close_price_json = json.dumps(close_by_day)

    strategy_result = result[source_column_name]
    stats = result.stats

    data_to_store = {}
    data_to_store['stock_code'] = stock_code
    data_to_store['strategy_name'] = "MACD策略"
    data_to_store['stock_close_price'] = close_price_json
    data_to_store['daily_price'] = convert_pandas_to_json_serializable(strategy_result.daily_prices)
    data_to_store['price'] = convert_pandas_to_json_serializable(strategy_result.prices)
    data_to_store['returns'] = convert_pandas_to_json_serializable(strategy_result.returns.fillna(0))
    data_to_store['data_start_time'] = pd.to_datetime(stats.loc["start"].iloc[0]).strftime('%Y%m%d')
    data_to_store['data_end_time'] = pd.to_datetime(stats.loc["end"].iloc[0]).strftime('%Y%m%d')
    # Stored as an integer YYYYMMDD so it can be compared against "today".
    data_to_store['backtest_end_time'] = int(datetime.now().strftime('%Y%m%d'))
    data_to_store['position'] = convert_pandas_to_json_serializable(signal)
    data_to_store['backtest_name'] = f'{stock_code} MACD策略 MA{short_window}-{long_window}'
    data_to_store['indicator_type'] = 'MACD'
    data_to_store['indicator_information'] = json.dumps(
        {'short_window': short_window, 'long_window': long_window})

    # Copy the statistics, replacing float NaN with 0.0.
    for stat_name in stat_fields:
        raw_value = stats.loc[stat_name].iloc[0]
        is_nan_float = isinstance(raw_value, float) and np.isnan(raw_value)
        data_to_store[stat_name] = 0.0 if is_nan_float else raw_value

    # Upsert keyed on backtest_name: create when absent, otherwise update.
    backtest_model = wance_data_storage_backtest.WanceDataStorageBacktest
    existing_record = await backtest_model.filter(
        backtest_name=data_to_store['backtest_name']
    ).first()
    if not existing_record:
        await backtest_model.create(**data_to_store)
    else:
        await backtest_model.filter(id=existing_record.id).update(**data_to_store)

    return data_to_store
async def run_macd_backtest(field_list: list,
                            stock_list: list,
                            period: str = '1d',
                            start_time: str = '',
                            end_time: str = '',
                            count: int = 100,
                            dividend_type: str = 'none',
                            fill_data: bool = True,
                            data_dir: str = '',
                            short_window: int = 50,
                            long_window: int = 200):
    """Run (or reuse) the MACD backtest for every stock in ``stock_list``.

    For each stock the stored backtest is reused when its
    ``backtest_end_time`` equals today's date; otherwise the backtest is run
    on the local close prices and the result is stored. Stocks whose
    ``close_<code>`` column is missing from the loaded data are reported and
    skipped.

    Args:
        field_list: Fields forwarded to ``get_local_data``.
        stock_list: Stock codes to backtest.
        period, start_time, end_time, count, dividend_type, fill_data,
        data_dir: Forwarded unchanged to ``get_local_data``.
        short_window: Short EMA span.
        long_window: Long EMA span.

    Returns:
        A list with one ``{'<code> MACD策略': payload}`` dict per processed
        stock. The payload is the stored model row for cache hits and the
        dict returned by ``storage_backtest_data`` for fresh runs. Returns
        ``None`` when an exception occurred (the error is printed).
    """
    try:
        results_list = []
        data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type,
                                    fill_data,
                                    data_dir)
        today = int(datetime.now().strftime('%Y%m%d'))

        for stock_code in stock_list:
            data_column_name = f'close_{stock_code}'
            source_column_name = f'{stock_code} MACD策略'
            backtest_name = f'{stock_code} MACD策略 MA{short_window}-{long_window}'

            db_result_data = await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
                backtest_name=backtest_name)

            # Cache hit only when a row exists AND it was produced today.
            # Testing both in one condition lets a stale or missing row fall
            # through to a fresh run instead of being skipped.
            if db_result_data and db_result_data[0].backtest_end_time == today:
                results_list.append({source_column_name: db_result_data[0]})
                continue

            if data_column_name not in data.columns:
                print(f"数据中缺少列: {data_column_name}")
                continue

            # Single-column close-price frame, renamed to 'close' so it
            # matches the column name of the generated signal.
            stock_data_series = data[[data_column_name]].rename(columns={data_column_name: 'close'})

            strategy, signal = await create_dual_ma_strategy(stock_data_series, stock_code,
                                                             short_window=short_window,
                                                             long_window=long_window)
            backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000)
            result = bt.run(backtest)

            data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code,
                                                        stock_data_series,
                                                        short_window, long_window)
            results_list.append({source_column_name: data_to_store})

        return results_list
    except Exception as e:
        # Best-effort service: report the failure and signal it with None.
        print(f"Error occurred: {e}")
        return None
async def start_macd_backtest_service(field_list: list,
                                      stock_list: list,
                                      period: str = '1d',
                                      start_time: str = '',
                                      end_time: str = '',
                                      count: int = -1,
                                      dividend_type: str = 'none',
                                      fill_data: bool = True,
                                      data_dir: str = '',
                                      short_window: int = 50,
                                      long_window: int = 200):
    """Serve MACD backtest results, reusing today's stored rows when possible.

    The stored rows are returned only when every stock in ``stock_list`` has
    a row whose ``backtest_end_time`` equals today's date. If any stock is
    missing or stale, ``run_macd_backtest`` is invoked for the whole list.

    Args:
        field_list: Fields forwarded to ``run_macd_backtest``.
        stock_list: Stock codes to serve.
        period, start_time, end_time, count, dividend_type, fill_data,
        data_dir: Forwarded unchanged to ``run_macd_backtest``.
        short_window: Short EMA span.
        long_window: Long EMA span.

    Returns:
        The list of stored model rows when everything is cached, otherwise
        whatever ``run_macd_backtest`` returns.
    """
    today = int(datetime.now().strftime('%Y%m%d'))

    # Check every requested stock; returning from inside the loop would
    # consult (and return) only the first stock's cache.
    cached_rows = []
    all_fresh = bool(stock_list)
    for stock_code in stock_list:
        backtest_name = f'{stock_code} MACD策略 MA{short_window}-{long_window}'
        db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
            backtest_name=backtest_name)
        if not (db_result and db_result[0].backtest_end_time == today):
            all_fresh = False
            break
        cached_rows.extend(db_result)

    if all_fresh:
        return cached_rows

    # At least one stock needs a fresh backtest.
    return await run_macd_backtest(
        field_list=field_list,
        stock_list=stock_list,
        period=period,
        start_time=start_time,
        end_time=end_time,
        count=count,
        dividend_type=dividend_type,
        fill_data=fill_data,
        data_dir=data_dir,
        short_window=short_window,
        long_window=long_window,
    )
async def init_backtest_db():
    """Run the MACD backtest for every stored stock and window pair.

    Initialises Tortoise, loads all ``WanceDataStock`` rows and calls
    ``run_macd_backtest`` once per stock for each predefined
    short/long window combination, printing the outcome of each run.
    """
    window_configs = [
        {"short_window": 5, "long_window": 10},
        {"short_window": 10, "long_window": 30},
        {"short_window": 30, "long_window": 60},
        {"short_window": 30, "long_window": 90},
        {"short_window": 70, "long_window": 140},
        {"short_window": 120, "long_window": 250},
    ]

    await init_tortoise()
    stocks = await wance_data_stock.WanceDataStock.all()

    for stock in stocks:
        for config in window_configs:
            short_window = config['short_window']
            long_window = config['long_window']
            # Label with the stock's code, not the ORM row object itself.
            label = f'{stock.stock_code} MACD策略 MA{short_window}-{long_window}'
            result = await run_macd_backtest(field_list=['close', 'time'],
                                             stock_list=[stock.stock_code],
                                             short_window=short_window,
                                             long_window=long_window)
            # run_macd_backtest yields None or an empty list when nothing
            # was produced, so only report success for a non-empty result.
            if result:
                print(f"回测成功 {label}")
            else:
                print(f"回测失败 {label}")
if __name__ == "__main__":
    # Example of backtesting a fixed pair of stocks by hand:
    # asyncio.run(run_macd_backtest(field_list=['close', 'time'],
    #                               stock_list=['601222.SH', '601677.SH'],
    #                               short_window=10,
    #                               long_window=30))

    # Default entry point: fill the backtest table for all stored stocks.
    entry_coroutine = init_backtest_db()
    asyncio.run(entry_coroutine)