1、新增组合代码

This commit is contained in:
向键雄-全栈开发工程师 2024-10-16 12:58:23 +08:00
parent fa7bc34159
commit 1d8b78593e
16 changed files with 1236 additions and 1145 deletions

BIN
models.py

Binary file not shown.

@ -30,8 +30,3 @@ async def stock_chart(request: BackRequest):
result = await stock_chart_service(stock_code=request.stock_code, result = await stock_chart_service(stock_code=request.stock_code,
benchmark_code=request.benchmark_code) benchmark_code=request.benchmark_code)
return result return result
@router.get('/combination')
async def combination(request: BackRequest):
await combination_service()

@ -1,30 +1,58 @@
import asyncio import asyncio
import json import json
from datetime import datetime from datetime import datetime
import bt import bt # 引入bt框架用于回测
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable from src.models import wance_data_storage_backtest, wance_data_stock, user_combination_history # 导入数据库模型
from src.models import wance_data_storage_backtest, wance_data_stock from src.tortoises_orm_config import init_tortoise # 初始化数据库
from src.tortoises_orm_config import init_tortoise from src.utils.combination_until import get_local_data # 获取本地数据的工具函数
# 布林带策略函数 # 创建布林带策略函数
async def create_bollinger_bands_strategy(data, stock_code: str, bollingerMA: int = 50, std_dev: int = 200): async def create_bollinger_bands_strategy(data, stock_weights_dict, bollingerMA: int = 50, std_dev: int = 200):
# 生成布林带策略信号 """
signal = await bollinger_bands_strategy(data, bollingerMA, std_dev) 创建组合布林带策略并根据股票权重进行回测
# 使用bt框架构建策略 参数:
strategy = bt.Strategy(f'{stock_code} 布林带策略', data: pd.DataFrame, 股票的价格数据行索引为日期列为股票代码
[bt.algos.RunDaily(), stock_weights_dict: dict, 每只股票的权重字典
bt.algos.SelectAll(), # 选择所有股票 bollingerMA: int, 用于计算布林带的移动平均线周期
bt.algos.WeighTarget(signal), # 根据信号调整权重 std_dev: int, 用于计算布林带上下轨的标准差倍数
bt.algos.Rebalance()]) # 调仓
return strategy, signal 返回:
strategy: bt.Strategy, 构建的策略
combined_signal: pd.DataFrame, 股票的买卖信号
"""
# 创建一个数据框用于存储所有股票的买卖信号
combined_signal = pd.DataFrame(index=data.index)
# 遍历每只股票,根据股票权重生成布林带信号
for stock_code in stock_weights_dict.keys():
if f'{stock_code}' in data.columns:
# 提取每只股票的数据
stock_data_series = data[[f'{stock_code}']]
stock_data_series.columns = [f'{stock_code}']
# 调用布林带策略计算买卖信号
signal = await bollinger_bands_strategy(stock_data_series, bollingerMA, std_dev)
combined_signal[stock_code] = signal
else:
print(f"Warning: Stock code {stock_code} not found in data columns.") # 如果股票不在数据中,发出警告
# 创建权重数据框
weights = pd.DataFrame(index=data.index, data=[stock_weights_dict] * len(data.index))
# 使用bt框架构建布林带策略
strategy = bt.Strategy('组合布林带策略',
[bt.algos.RunDaily(), # 每天运行策略
bt.algos.SelectAll(), # 选择所有可用的股票
bt.algos.WeighSpecified(**stock_weights_dict), # 根据给定权重进行加权
bt.algos.Rebalance()]) # 调整投资组合
return strategy, combined_signal # 返回构建的策略和信号
async def bollinger_bands_strategy(df, window=20, num_std_dev=2): async def bollinger_bands_strategy(df, window=20, num_std_dev=2):
@ -39,173 +67,131 @@ async def bollinger_bands_strategy(df, window=20, num_std_dev=2):
返回: 返回:
signal: pd.DataFrame, 每只股票的买卖信号1 表示买入0 表示卖出 signal: pd.DataFrame, 每只股票的买卖信号1 表示买入0 表示卖出
""" """
# 计算中轨线(移动平均) # 计算布林带的中轨(移动平均)
middle_band = df.rolling(window=window, min_periods=1).mean() middle_band = df.rolling(window=window, min_periods=1).mean()
# 计算移动标准差
# 计算滚动标准差
rolling_std = df.rolling(window=window, min_periods=1).std() rolling_std = df.rolling(window=window, min_periods=1).std()
# 计算布林带的上下轨
# 计算上轨线和下轨线
upper_band = middle_band + (rolling_std * num_std_dev) upper_band = middle_band + (rolling_std * num_std_dev)
lower_band = middle_band - (rolling_std * num_std_dev) lower_band = middle_band - (rolling_std * num_std_dev)
# 初始化信号 DataFrame # 创建一个数据框用于存储每只股票的买卖信号
signal = pd.DataFrame(index=df.index, columns=df.columns) signal = pd.DataFrame(index=df.index, columns=df.columns)
# 生成买入信号:当价格突破下轨时 # 根据布林带计算买卖信号
for column in df.columns: for column in df.columns:
signal[column] = np.where(df[column] < lower_band[column], 1, np.nan) # 买入信号 signal[column] = np.where(df[column] < lower_band[column], 1, np.nan) # 当收盘价低于下轨,生成买入信号
signal[column] = np.where(df[column] > upper_band[column], 0, signal[column]) # 当收盘价高于上轨,生成卖出信号
# 生成卖出信号:当价格突破上轨时 # 前向填充信号并用0填充NaN值
for column in df.columns: signal = signal.ffill().fillna(0)
signal[column] = np.where(df[column] > upper_band[column], 0, signal[column]) # 卖出信号 return signal # 返回信号数据框
# 前向填充信号,持仓不变
signal = signal.ffill()
# 将剩余的 NaN 替换为 0
signal = signal.fillna(0)
return signal
async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, bollingerMA, async def storage_backtest_data(user_id, stock_weights, strategy_parame, strategy_name):
std_dev): """
await init_tortoise() 存储回测结果到数据库
# 要存储的字段列表 参数:
fields_to_store = [ user_id: int, 用户的ID
'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', stock_weights: dict, 股票权重字典
'price', 'returns', 'data_start_time', 'data_end_time', strategy_parame: dict, 策略参数字典
'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', strategy_name: str, 策略名称
'max_drawdown', 'calmar', 'mtd', 'three_month', """
'six_month', 'ytd', 'one_year', 'three_year', await init_tortoise() # 初始化数据库
'five_year', 'ten_year', 'incep', 'daily_sharpe',
'daily_sortino', 'daily_mean', 'daily_vol',
'daily_skew', 'daily_kurt', 'best_day', 'worst_day',
'monthly_sharpe', 'monthly_sortino', 'monthly_mean',
'monthly_vol', 'monthly_skew', 'monthly_kurt',
'best_month', 'worst_month', 'yearly_sharpe',
'yearly_sortino', 'yearly_mean', 'yearly_vol',
'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year',
'avg_drawdown', 'avg_drawdown_days', 'avg_up_month',
'avg_down_month', 'win_year_perc', 'twelve_month_win_perc'
]
# 准备要存储的数据 # 查询用户的已有记录
data_to_store = { existing_record = await user_combination_history.UserCombinationHistory.filter(user_id=user_id).first()
'stock_code': stock_code,
'strategy_name': "布林带策略",
'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign(
time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')),
'daily_price': convert_pandas_to_json_serializable(result[source_column_name].daily_prices),
'price': convert_pandas_to_json_serializable(result[source_column_name].prices),
'returns': convert_pandas_to_json_serializable(result[source_column_name].returns.fillna(0)),
'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'),
'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'),
'backtest_end_time': int(datetime.now().strftime('%Y%m%d')),
'position': convert_pandas_to_json_serializable(signal),
'backtest_name': f'{stock_code} 布林带策略 MA{bollingerMA}-{std_dev}倍标准差',
'indicator_type': 'Bollinger',
'indicator_information': json.dumps({'bollingerMA': bollingerMA, 'std_dev': std_dev})
}
# 使用循环填充其他字段 # 将股票权重和策略参数转换为JSON格式
for field in fields_to_store[12:]: # 从第10个字段开始 stock_weights_json = json.dumps(stock_weights)
value = result.stats.loc[field].iloc[0] strategy_parame_json = json.dumps(strategy_parame)
data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value
# 检查是否存在该 backtest_name
existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
backtest_name=data_to_store['backtest_name']
).first()
# 如果记录存在,更新记录;否则,创建新记录
if existing_record: if existing_record:
# 如果存在,更新记录 await user_combination_history.UserCombinationHistory.filter(
await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
id=existing_record.id id=existing_record.id
).update(**data_to_store) ).update(user_id=user_id,
last_time=int(datetime.now().strftime('%Y%m%d')),
stock_weights=stock_weights_json,
strategy_name=strategy_name,
strategy_parame=strategy_parame_json)
else: else:
# 如果不存在,创建新的记录 await user_combination_history.UserCombinationHistory.create(user_id=user_id,
await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) last_time=int(datetime.now().strftime('%Y%m%d')),
stock_weights=stock_weights_json,
return data_to_store strategy_name=strategy_name,
strategy_parame=strategy_parame_json)
async def run_bollinger_backtest(field_list: list, async def run_combined_bollinger_backtest(field_list: list,
stock_list: list, stock_list: list,
period: str = '1d', stock_weights: list,
start_time: str = '', user_id: int = 1,
end_time: str = '', period: str = '1d',
count: int = 100, start_time: str = '',
dividend_type: str = 'none', end_time: str = '',
fill_data: bool = True, count: int = 100,
data_dir: str = '', dividend_type: str = 'none',
bollingerMA: int = 50, fill_data: bool = True,
std_dev: int = 200): data_dir: str = '',
bollingerMA: int = 50,
std_dev: int = 200):
"""
运行组合布林带策略的回测
参数:
field_list: list, 需要获取的字段列表
stock_list: list, 需要回测的股票代码列表
stock_weights: list, 股票权重列表
user_id: int, 用户ID
period: str, 数据周期
start_time: str, 开始时间
end_time: str, 结束时间
count: int, 数据数量限制
dividend_type: str, 分红类型
fill_data: bool, 是否填充数据
data_dir: str, 数据目录
bollingerMA: int, 布林带移动平均线周期
std_dev: int, 标准差倍数
返回:
result: 回测结果
"""
try: try:
# 初始化一个列表用于存储每只股票的回测结果字典 # 获取本地数据
results_list = []
# 遍历每只股票的数据(每列代表一个股票的收盘价)
data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type,
fill_data, fill_data, data_dir)
data_dir)
for stock_code in stock_list: # 获取股票权重字典
stock_weights_dict = stock_weights[0] if stock_weights else {}
data_column_name = f'close_{stock_code}' # 创建布林带策略
source_column_name = f'{stock_code} 布林带策略' strategy, signal = await create_bollinger_bands_strategy(data, stock_weights_dict, bollingerMA, std_dev)
backtest_name = f'{stock_code} 布林带策略 MA{bollingerMA}-{std_dev}倍标准差'
now_time = int(datetime.now().strftime('%Y%m%d'))
db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
backtest_name=backtest_name)
if db_result:
if db_result[0].backtest_end_time == now_time:
results_list.append({source_column_name: db_result[0]})
# elif data_column_name in data.columns: # 创建回测对象
if data_column_name in data.columns: backtest = bt.Backtest(strategy, data, initial_capital=100000)
stock_data_series = data[[data_column_name]] # 提取该股票的收盘价 DataFrame
stock_data_series.columns = ['close'] # 重命名列为 'close'
# 创建布林带策略 # 策略参数
strategy, signal = await create_bollinger_bands_strategy(stock_data_series, stock_code, strategy_parame = {"bollingerMA": bollingerMA, "std_dev": std_dev}
bollingerMA=bollingerMA, strategy_name = "布林带策略"
std_dev=std_dev)
# 创建回测
backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000)
# 运行回测
result = bt.run(backtest)
# 存储回测结果
data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code,
stock_data_series,
bollingerMA, std_dev)
# # 绘制回测结果图表
# result.plot()
# # 绘制个别股票数据图表
# plt.figure(figsize=(12, 6))
# plt.plot(stock_data_series.index, stock_data_series['close'], label='Stock Price')
# plt.title(f'Stock Price for {stock_code}')
# plt.xlabel('Date')
# plt.ylabel('Price')
# plt.legend()
# plt.grid(True)
# plt.show()
# 将结果存储为字典并添加到列表中
results_list.append({source_column_name: data_to_store})
else: # 运行回测
print(f"数据中缺少列: {data_column_name}") result = bt.run(backtest)
return results_list # 返回结果列表 # 存储回测结果
await storage_backtest_data(user_id, stock_weights_dict, strategy_parame, strategy_name)
return result # 返回回测结果
except Exception as e: except Exception as e:
print(f"Error occurred: {e}") print(f"Error occurred: {e}") # 输出错误信息
async def start_bollinger_combination_service(field_list: list, async def start_bollinger_combination_service(field_list: list,
stock_list: list, stock_list: list,
stock_weights: list,
user_id: int = 1,
period: str = '1d', period: str = '1d',
start_time: str = '', start_time: str = '',
end_time: str = '', end_time: str = '',
@ -215,59 +201,51 @@ async def start_bollinger_combination_service(field_list: list,
data_dir: str = '', data_dir: str = '',
bollingerMA: int = 50, bollingerMA: int = 50,
std_dev: int = 200): std_dev: int = 200):
for stock_code in stock_list: """
backtest_name = f'{stock_code} 布林带策略 MA{bollingerMA}-{std_dev}倍标准差' 启动布林带策略回测服务
db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
backtest_name=backtest_name)
now_time = int(datetime.now().strftime('%Y%m%d'))
if db_result and db_result[0].backtest_end_time == now_time: 参数:
return db_result field_list: list, 需要获取的字段列表
else: stock_list: list, 需要回测的股票代码列表
# 执行回测 stock_weights: list, 股票权重列表
result = await run_bollinger_backtest( user_id: int, 用户ID
field_list=field_list, period: str, 数据周期
stock_list=stock_list, start_time: str, 开始时间
period=period, end_time: str, 结束时间
start_time=start_time, count: int, 数据数量限制
end_time=end_time, dividend_type: str, 分红类型
count=count, fill_data: bool, 是否填充数据
dividend_type=dividend_type, data_dir: str, 数据目录
fill_data=fill_data, bollingerMA: int, 布林带移动平均线周期
data_dir=data_dir, std_dev: int, 标准差倍数
bollingerMA=bollingerMA,
std_dev=std_dev,
)
return result
返回:
async def init_backtest_db(): result: 回测结果
bollinger_list = [{"bollingerMA": 20, "std_dev": 2}, {"bollingerMA": 30, "std_dev": 2}, """
{"bollingerMA": 70, "std_dev": 2}, {"bollingerMA": 5, "std_dev": 1}, # 调用回测函数并返回结果
{"bollingerMA": 20, "std_dev": 3}, {"bollingerMA": 50, "std_dev": 2.5}] result = await run_combined_bollinger_backtest(
await init_tortoise() field_list=field_list,
wance_db = await wance_data_stock.WanceDataStock.all() stock_list=stock_list,
bollinger_list_lenght = len(bollinger_list) stock_weights=stock_weights,
user_id=user_id,
for stock_code in wance_db: period=period,
for i in range(bollinger_list_lenght): start_time=start_time,
bollingerMA = bollinger_list[i]['bollingerMA'] end_time=end_time,
std_dev = bollinger_list[i]['std_dev'] count=count,
source_column_name = f'{stock_code} 布林带策略 MA{bollingerMA}-{std_dev}倍标准差' dividend_type=dividend_type,
result = await run_bollinger_backtest(field_list=['close', 'time'], fill_data=fill_data,
stock_list=[stock_code.stock_code], data_dir=data_dir,
bollingerMA=bollingerMA, bollingerMA=bollingerMA,
std_dev=std_dev) std_dev=std_dev,
)
print(f"回测成功 {source_column_name}") return result # 返回回测结果
if __name__ == '__main__': if __name__ == '__main__':
# 测试类的回测 # 主函数,启动回测服务
asyncio.run(run_bollinger_backtest(field_list=['close', 'time'], asyncio.run(run_combined_bollinger_backtest(field_list=['close', 'time'],
stock_list=['601222.SH', '601677.SH'], stock_list=["688031.SH", "600025.SH", "601222.SH"],
bollingerMA=20, user_id=1,
std_dev=2)) stock_weights=[{"688031.SH": 0.5, "600025.SH": 0.3, "601222.SH": 0.2}],
bollingerMA=50,
# # 初始化数据库表 std_dev=200))
# asyncio.run(init_backtest_db())

@ -1,110 +0,0 @@
import pandas as pd
import bt
import yfinance as yf # 这里使用 yfinance 获取股票数据,实际应用可以替换成其他数据源
def fetch_stock_data(stock_list, start, end):
"""
获取指定股票的历史数据
参数:
stock: str, 股票代码
start: str, 开始时间
end: str, 结束时间
返回:
pd.DataFrame, 包含股票的收盘价数据
"""
data = yf.download(stock_list, start=start, end=end)['Adj Close']
return data
def combine_stock_data(stock_list, start, end):
"""
合并多只股票的数据
参数:
stocks: list, 股票代码列表
start: str, 开始时间
end: str, 结束时间
返回:
pd.DataFrame, 合并后的股票价格数据
"""
combined_data = pd.DataFrame()
for stock in stock_list:
data = fetch_stock_data(stock, start, end)
combined_data[stock] = data
return combined_data
def create_portfolio_strategy(stock_weights):
"""
创建基于股票权重的投资组合策略
参数:
stock_weights: list of dict, 股票与权重的字典列表例如
[{'601222.SH': 0.5, '605090.SH': 0.3, '600025.SH': 0.2}]
返回:
策略对象
"""
algos = [
bt.algos.RunDaily(), # 每日运行策略
bt.algos.SelectAll(), # 选择所有股票
bt.algos.WeighSpecified(**stock_weights[0]), # 分配指定的权重
bt.algos.Rebalance() # 进行调仓
]
strategy = bt.Strategy('Portfolio Strategy', algos)
return strategy
def run_portfolio_backtest(stocks_list, start, end, stock_weights):
"""
运行组合策略的回测
参数:
stocks: list, 股票代码列表
start: str, 开始时间
end: str, 结束时间
stock_weights: list of dict, 股票与权重的字典列表例如
[{'601222.SH': 0.5, '605090.SH': 0.3, '600025.SH': 0.2}]
返回:
回测结果
"""
# 获取股票的历史数据
data = combine_stock_data(stocks_list, start, end)
# 创建策略
strategy = create_portfolio_strategy(stock_weights)
# 创建回测
backtest = bt.Backtest(strategy, data)
# 运行回测
result = bt.run(backtest)
return result
# 示例使用
if __name__ == '__main__':
# 示例股票名
stocks = ['601222.SH', '605090.SH', '600025.SH']
start_date = '2021-01-01'
end_date = '2023-01-01'
# 示例权重分配
stock_weights = [{'601222.SH': 0.5, '605090.SH': 0.3, '600025.SH': 0.2}]
# 运行组合策略回测
result = run_portfolio_backtest(stocks, start_date, end_date, stock_weights)
# 打印回测结果
result.plot()
result.display()

@ -1,215 +0,0 @@
import asyncio
import json
from datetime import datetime
import bt
import numpy as np
import pandas as pd
from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable
from src.models import wance_data_storage_backtest, wance_data_stock
from src.tortoises_orm_config import init_tortoise
# 双均线策略函数
async def create_dual_ma_strategy(data, short_window: int = 50, long_window: int = 200):
# 生成权重
weights = await dual_ma_strategy(data, short_window, long_window)
# 使用bt框架构建组合策略
strategy = bt.Strategy('组合双均线策略',
[bt.algos.RunDaily(),
bt.algos.SelectAll(),
bt.algos.WeighTarget(weights), # 根据信号调整权重
bt.algos.Rebalance()])
return strategy
async def dual_ma_strategy(df, short_window=20, long_window=50):
"""
基于双均线策略生成买卖信号
参数:
df: pd.DataFrame, 股票的价格数据行索引为日期列为股票代码
short_window: int, 短期均线窗口期
long_window: int, 长期均线窗口期
返回:
signal: pd.DataFrame, 每只股票的买卖信号1 表示买入0 表示卖出
"""
# 计算短期均线和长期均线
short_ma = df.rolling(window=short_window, min_periods=1).mean()
long_ma = df.rolling(window=long_window, min_periods=1).mean()
# 生成买入信号
buy_signal = (short_ma > long_ma).astype(int)
# 计算权重(例如:均等权重)
weights = buy_signal.div(buy_signal.sum(axis=1), axis=0).fillna(0)
return weights
async def storage_backtest_data(source_column_name, result, signal, stock_data_series, short_window, long_window):
await init_tortoise()
# 要存储的字段列表
fields_to_store = [
'stock_code', 'strategy_name', 'stock_close_price', 'daily_price',
'price', 'returns', 'data_start_time', 'data_end_time',
'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr',
'max_drawdown', 'calmar', 'mtd', 'three_month',
'six_month', 'ytd', 'one_year', 'three_year',
'five_year', 'ten_year', 'incep', 'daily_sharpe',
'daily_sortino', 'daily_mean', 'daily_vol',
'daily_skew', 'daily_kurt', 'best_day', 'worst_day',
'monthly_sharpe', 'monthly_sortino', 'monthly_mean',
'monthly_vol', 'monthly_skew', 'monthly_kurt',
'best_month', 'worst_month', 'yearly_sharpe',
'yearly_sortino', 'yearly_mean', 'yearly_vol',
'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year',
'avg_drawdown', 'avg_drawdown_days', 'avg_up_month',
'avg_down_month', 'win_year_perc', 'twelve_month_win_perc'
]
# 准备要存储的数据
data_to_store = {
'strategy_name': "组合双均线策略",
'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign(
time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')),
'daily_price': convert_pandas_to_json_serializable(result['daily_prices']),
'price': convert_pandas_to_json_serializable(result['prices']),
'returns': convert_pandas_to_json_serializable(result['returns'].fillna(0)),
'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'),
'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'),
'backtest_end_time': int(datetime.now().strftime('%Y%m%d')),
'position': convert_pandas_to_json_serializable(signal),
'backtest_name': f'组合双均线策略 MA{short_window}-{long_window}',
'indicator_type': 'SMA',
'indicator_information': json.dumps({'short_window': short_window, 'long_window': long_window})
}
# 使用循环填充其他字段
for field in fields_to_store[12:]: # 从第10个字段开始
value = result.stats.loc[field].iloc[0]
data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value
# 检查是否存在该 backtest_name
existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
backtest_name=data_to_store['backtest_name']
).first()
if existing_record:
# 如果存在,更新记录
await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
id=existing_record.id
).update(**data_to_store)
else:
# 如果不存在,创建新的记录
await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store)
return data_to_store
async def run_sma_backtest(field_list: list,
stock_list: list,
period: str = '1d',
start_time: str = '',
end_time: str = '',
count: int = 100,
dividend_type: str = 'none',
fill_data: bool = True,
data_dir: str = '',
short_window: int = 50,
long_window: int = 200):
try:
# 初始化一个列表用于存储每只股票的回测结果字典
results_list = []
# 获取股票数据
data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type,
fill_data, data_dir)
# 创建组合数据
combined_data = data[stock_list].copy() # 只选择需要的股票列
# 创建双均线策略
strategy, signal = await create_dual_ma_strategy(combined_data, short_window=short_window,
long_window=long_window)
# 创建回测
backtest = bt.Backtest(strategy=strategy, data=combined_data, initial_capital=100000)
# 运行回测
result = bt.run(backtest)
# 存储回测结果
data_to_store = await storage_backtest_data(result, signal, combined_data, short_window, long_window)
results_list.append(data_to_store)
return results_list # 返回结果列表
except Exception as e:
print(f"Error occurred: {e}")
async def start_sma_backtest_service(field_list: list,
stock_list: list,
period: str = '1d',
start_time: str = '',
end_time: str = '',
count: int = -1,
dividend_type: str = 'none',
fill_data: bool = True,
data_dir: str = '',
short_window: int = 50,
long_window: int = 200):
# 执行回测
result = await run_sma_backtest(
field_list=field_list,
stock_list=stock_list,
period=period,
start_time=start_time,
end_time=end_time,
count=count,
dividend_type=dividend_type,
fill_data=fill_data,
data_dir=data_dir,
short_window=short_window,
long_window=long_window,
)
return result
async def init_backtest_db():
sma_list = [{"short_window": 5, "long_window": 10}, {"short_window": 10, "long_window": 30},
{"short_window": 30, "long_window": 60}, {"short_window": 30, "long_window": 90},
{"short_window": 70, "long_window": 140}, {"short_window": 120, "long_window": 250}]
await init_tortoise()
wance_db = await wance_data_stock.WanceDataStock.all()
sma_list_length = len(sma_list)
for stock_code in wance_db:
for i in range(sma_list_length):
short_window = sma_list[i]['short_window']
long_window = sma_list[i]['long_window']
source_column_name = f'{stock_code.stock_code}'
result = await start_sma_backtest_service(
field_list=[source_column_name],
stock_list=[stock_code.stock_code],
period='1d',
start_time='2022-01-01',
end_time='2022-09-01',
count=-1,
dividend_type='none',
fill_data=True,
data_dir='',
short_window=short_window,
long_window=long_window
)
print(
f"Finished backtesting for {stock_code.stock_code} with short_window: {short_window}, long_window: {long_window}")
# 启动回测
if __name__ == '__main__':
asyncio.run(init_backtest_db())

@ -6,23 +6,52 @@ import bt
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable from src.models import wance_data_storage_backtest, wance_data_stock, user_combination_history
from src.models import wance_data_storage_backtest, wance_data_stock
from src.tortoises_orm_config import init_tortoise from src.tortoises_orm_config import init_tortoise
from src.utils.backtest_until import convert_pandas_to_json_serializable
from src.utils.combination_until import get_local_data
# 双均线策略函数 # 组合双均线策略函数
async def create_dual_ma_strategy(data, stock_code: str, short_window: int = 50, long_window: int = 200): async def create_combined_sma_strategy(data, stock_weights_dict, short_window: int = 50, long_window: int = 200):
# 生成双均线策略信号 """
signal = await dual_ma_strategy(data, short_window, long_window) 创建组合双均线策略并根据股票权重进行回测
# 使用bt框架构建策略 参数:
strategy = bt.Strategy(f'{stock_code} 双均线策略', data: pd.DataFrame, 股票价格数据行索引为日期列为股票代码
[bt.algos.RunDaily(), stock_weights_dict: dict, 股票权重字典键为股票代码值为权重
short_window: int, 短期均线窗口期默认为50
long_window: int, 长期均线窗口期默认为200
返回:
strategy: bt.Strategy, 创建的双均线策略对象
combined_signal: pd.DataFrame, 每只股票的买卖信号数据框
"""
combined_signal = pd.DataFrame(index=data.index) # 初始化信号数据框
# 遍历每只股票代码,根据股票权重生成信号
for stock_code in stock_weights_dict.keys():
# 确保每个股票代码唯一,不要使用重复代码
if f'{stock_code}' in data.columns:
stock_data_series = data[[f'{stock_code}']] # 提取对应股票的数据
stock_data_series.columns = [f'{stock_code}'] # 重命名列
signal = await dual_ma_strategy(stock_data_series, short_window, long_window) # 生成信号
combined_signal[stock_code] = signal # 存储信号
else:
print(f"Warning: Stock code {stock_code} not found in data columns.") # 提示未找到的股票代码
# 将权重转换为 DataFrame行索引为数据的时间列为股票代码
weights = pd.DataFrame(index=data.index, data=[stock_weights_dict] * len(data.index))
# 使用bt框架构建组合策略
strategy = bt.Strategy('组合双均线策略',
[bt.algos.RunDaily(), # 每天运行
bt.algos.SelectAll(), # 选择所有股票 bt.algos.SelectAll(), # 选择所有股票
bt.algos.WeighTarget(signal), # 根据信号调整权重 bt.algos.WeighSpecified(**stock_weights_dict), # 根据权重调整股票权重
bt.algos.Rebalance()]) # 调仓 bt.algos.Rebalance()]) # 调仓
return strategy, signal
return strategy, combined_signal # 返回策略和信号
async def dual_ma_strategy(df, short_window=20, long_window=50): async def dual_ma_strategy(df, short_window=20, long_window=50):
@ -37,36 +66,39 @@ async def dual_ma_strategy(df, short_window=20, long_window=50):
返回: 返回:
signal: pd.DataFrame, 每只股票的买卖信号1 表示买入0 表示卖出 signal: pd.DataFrame, 每只股票的买卖信号1 表示买入0 表示卖出
""" """
# 计算短期均线和长期均线 # 计算短期和长期均线
short_ma = df.rolling(window=short_window, min_periods=1).mean() short_ma = df.rolling(window=short_window, min_periods=1).mean() # 短期均线
long_ma = df.rolling(window=long_window, min_periods=1).mean() long_ma = df.rolling(window=long_window, min_periods=1).mean() # 长期均线
# 生成买入信号: 当短期均线从下方穿过长期均线 # 生成买入和卖出信号
buy_signal = np.where(short_ma > long_ma, 1, np.nan) buy_signal = np.where(short_ma > long_ma, 1, np.nan) # 买入信号
sell_signal = np.where(short_ma < long_ma, 0, np.nan) # 卖出信号
# 生成卖出信号: 当短期均线从上方穿过长期均线 # 将信号转换为DataFrame并向前填充
sell_signal = np.where(short_ma < long_ma, 0, np.nan)
# 合并买卖信号
signal = pd.DataFrame(buy_signal, index=df.index, columns=df.columns) signal = pd.DataFrame(buy_signal, index=df.index, columns=df.columns)
signal = np.where(short_ma < long_ma, 0, signal) signal = np.where(short_ma < long_ma, 0, signal) # 卖出信号更新
signal = pd.DataFrame(signal, index=df.index, columns=df.columns).ffill() # 前向填充信号
# 前向填充信号,持仓不变 signal = signal.fillna(0) # 将 NaN 替换为 0
signal = pd.DataFrame(signal, index=df.index, columns=df.columns).ffill() return signal # 返回信号
# 将剩余的 NaN 替换为 0
signal = signal.fillna(0)
return signal
async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, short_window, async def storage_backtest_data(user_id, result, stock_data_series, source_column_name, signal, stock_weights,
long_window): strategy_parame, strategy_name,short_window,long_window):
await init_tortoise() """
将回测数据存储到数据库
参数:
user_id: int, 用户ID
stock_weights: dict, 股票权重字典
strategy_parame: dict, 策略参数字典
strategy_name: str, 策略名称
"""
await init_tortoise() # 初始化数据库连接
# 要存储的字段列表 # 要存储的字段列表
fields_to_store = [ fields_to_store = [
'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', 'stock_weights', 'strategy_name', 'stock_close_price', 'daily_price',
'price', 'returns', 'data_start_time', 'data_end_time', 'price', 'returns', 'data_start_time', 'data_end_time',
'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', 'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr',
'max_drawdown', 'calmar', 'mtd', 'three_month', 'max_drawdown', 'calmar', 'mtd', 'three_month',
@ -85,7 +117,7 @@ async def storage_backtest_data(source_column_name, result, signal, stock_code,
# 准备要存储的数据 # 准备要存储的数据
data_to_store = { data_to_store = {
'stock_code': stock_code, 'stock_weights': stock_weights,
'strategy_name': "双均线策略", 'strategy_name': "双均线策略",
'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign( 'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign(
time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')), time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')),
@ -96,7 +128,7 @@ async def storage_backtest_data(source_column_name, result, signal, stock_code,
'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'), 'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'),
'backtest_end_time': int(datetime.now().strftime('%Y%m%d')), 'backtest_end_time': int(datetime.now().strftime('%Y%m%d')),
'position': convert_pandas_to_json_serializable(signal), 'position': convert_pandas_to_json_serializable(signal),
'backtest_name': f'{stock_code} 双均线策略 MA{short_window}-{long_window}', 'backtest_name': f'{stock_weights} 双均线策略 MA{short_window}-{long_window}',
'indicator_type': 'SMA', 'indicator_type': 'SMA',
'indicator_information': json.dumps({'short_window': short_window, 'long_window': long_window}) 'indicator_information': json.dumps({'short_window': short_window, 'long_window': long_window})
} }
@ -107,154 +139,178 @@ async def storage_backtest_data(source_column_name, result, signal, stock_code,
data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value
# 检查是否存在该 backtest_name # 检查是否存在该 backtest_name
existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( existing_record = await user_combination_history.UserCombinationHistory.filter(user_id=user_id).first()
backtest_name=data_to_store['backtest_name']
).first() stock_weights_json = json.dumps(stock_weights) # 将权重字典转换为JSON格式
strategy_parame_json = json.dumps(strategy_parame) # 将策略参数字典转换为JSON格式
if existing_record: if existing_record:
# 如果存在,更新记录 # 如果存在,更新记录
await wance_data_storage_backtest.WanceDataStorageBacktest.filter( await user_combination_history.UserCombinationHistory.filter(
id=existing_record.id id=existing_record.id
).update(**data_to_store) ).update(user_id=user_id,
last_time=int(datetime.now().strftime('%Y%m%d')), # 更新时间
stock_weights=stock_weights_json,
strategy_name=strategy_name,
strategy_parame=strategy_parame_json)
else: else:
# 如果不存在,创建新的记录 # 如果不存在,创建新的记录
await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) await user_combination_history.UserCombinationHistory.create(user_id=user_id,
last_time=int(datetime.now().strftime('%Y%m%d')),
stock_weights=stock_weights_json,
strategy_name=strategy_name,
strategy_parame=strategy_parame_json)
return data_to_store return data_to_store
async def run_sma_backtest(field_list: list, async def run_combined_sma_backtest(field_list: list,
stock_list: list, stock_list: list,
stock_weights: list, stock_weights: list,
user_id: int = 1, user_id: int = 1,
period: str = '1d', period: str = '1d',
start_time: str = '', start_time: str = '',
end_time: str = '', end_time: str = '',
count: int = 100, count: int = 100,
dividend_type: str = 'none', dividend_type: str = 'none',
fill_data: bool = True, fill_data: bool = True,
data_dir: str = '', data_dir: str = '',
short_window: int = 50, short_window: int = 50,
long_window: int = 200): long_window: int = 200):
"""
运行组合双均线策略的回测
参数:
field_list: list, 需要获取的字段列表
stock_list: list, 股票代码列表
stock_weights: list, 股票权重列表
user_id: int, 用户ID默认为1
period: str, 数据时间周期默认为'1d'
start_time: str, 开始时间默认为''空字符串表示不限制
end_time: str, 结束时间默认为''空字符串表示不限制
count: int, 获取的数据数量默认为100
dividend_type: str, 分红类型默认为'none'
fill_data: bool, 是否填充缺失数据默认为True
data_dir: str, 数据目录默认为''空字符串表示当前目录
short_window: int, 短期均线窗口期默认为50
long_window: int, 长期均线窗口期默认为200
返回:
result: bt.BacktestResult, 回测结果
"""
try: try:
# 初始化一个列表用于存储每只股票的回测结果字典 # 获取本地数据
results_list = []
# 遍历每只股票的数据(每列代表一个股票的收盘价)
data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type,
fill_data, fill_data, data_dir)
data_dir)
for stock_code in stock_list: # 将 stock_weights 转换为字典形式,提取第一个字典作为权重
stock_weights_dict = stock_weights[0] if stock_weights else {}
data_column_name = f'close_{stock_code}' # 创建组合双均线策略
source_column_name = f'{stock_code} 双均线策略' strategy, signal = await create_combined_sma_strategy(data, stock_weights_dict, short_window, long_window)
backtest_name = f'{stock_code} 双均线策略 MA{short_window}-{long_window}'
now_data = int(datetime.now().strftime('%Y%m%d'))
db_result_data = await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
backtest_name=backtest_name)
if db_result_data: # 创建回测
if db_result_data[0].backtest_end_time == now_data: backtest = bt.Backtest(strategy, data, initial_capital=100000) # 初始化回测对象
results_list.append({source_column_name: db_result_data[0]})
if data_column_name in data.columns: strategy_parame = {"short_window": short_window, "long_window": long_window} # 策略参数
stock_data_series = data[[data_column_name]] # 提取该股票的收盘价 DataFrame
stock_data_series.columns = ['close'] # 重命名列为 'close'
# 创建双均线策略 strategy_name = "双均线策略" # 策略名称
strategy, signal = await create_dual_ma_strategy(stock_data_series, stock_code,
short_window=short_window,
long_window=long_window)
# 创建回测
backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000)
# 运行回测
result = bt.run(backtest)
# 存储回测结果
data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code,
stock_data_series,
short_window, long_window)
# # 绘制回测结果图表
# result.plot()
# # 绘制个别股票数据图表
# plt.figure(figsize=(12, 6))
# plt.plot(stock_data_series.index, stock_data_series['close'], label='Stock Price')
# plt.title(f'Stock Price for {stock_code}')
# plt.xlabel('Date')
# plt.ylabel('Price')
# plt.legend()
# plt.grid(True)
# plt.show()
# 将结果存储为字典并添加到列表中
results_list.append({source_column_name: data_to_store})
else: # 运行回测
print(f"数据中缺少列: {data_column_name}") result = bt.run(backtest) # 执行回测
return results_list # 返回结果列表 # 存储回测数据
result = await storage_backtest_data(user_id=user_id, result=result, stock_data_series=data,
source_column_name="组合双均线策略",
signal=signal, stock_weights=stock_weights_dict,
strategy_parame=strategy_parame,
strategy_name=strategy_name,short_window=short_window,long_window=long_window)
return result # 返回回测结果
except Exception as e: except Exception as e:
print(f"Error occurred: {e}") print(f"Error occurred: {e}") # 打印错误信息
async def start_sma_combination_service(field_list: list, async def start_sma_combination_service(field_list: list,
stock_list: list, stock_list: list,
stock_weights:list, stock_weights: list,
user_id:int = 1, user_id: int = 1,
period: str = '1d', period: str = '1d',
start_time: str = '', start_time: str = '',
end_time: str = '', end_time: str = '',
count: int = -1, count: int = -1,
dividend_type: str = 'none', dividend_type: str = 'none',
fill_data: bool = True, fill_data: bool = True,
data_dir: str = '', data_dir: str = '',
short_window: int = 50, short_window: int = 50,
long_window: int = 200): long_window: int = 200):
for stock_code in stock_list: """
backtest_name = f'{stock_code} 双均线策略 MA{short_window}-{long_window}' 启动双均线组合策略服务
db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
backtest_name=backtest_name)
now_time = int(datetime.now().strftime('%Y%m%d'))
if db_result and db_result[0].backtest_end_time == now_time: 参数:
return db_result field_list: list, 需要获取的字段列表
else: stock_list: list, 股票代码列表
# 执行回测 stock_weights: list, 股票权重列表
result = await run_sma_backtest( user_id: int, 用户ID默认为1
field_list=field_list, period: str, 数据时间周期默认为'1d'
stock_list=stock_list, start_time: str, 开始时间默认为''空字符串表示不限制
period=period, end_time: str, 结束时间默认为''空字符串表示不限制
start_time=start_time, count: int, 获取的数据数量默认为-1表示不限制
end_time=end_time, dividend_type: str, 分红类型默认为'none'
count=count, fill_data: bool, 是否填充缺失数据默认为True
dividend_type=dividend_type, data_dir: str, 数据目录默认为''空字符串表示当前目录
fill_data=fill_data, short_window: int, 短期均线窗口期默认为50
data_dir=data_dir, long_window: int, 长期均线窗口期默认为200
short_window=short_window,
long_window=long_window, 返回:
) result: bt.BacktestResult, 回测结果
return result """
result = await run_combined_sma_backtest(
field_list=field_list,
stock_list=stock_list,
user_id=user_id,
stock_weights=stock_weights,
period=period,
start_time=start_time,
end_time=end_time,
count=count,
dividend_type=dividend_type,
fill_data=fill_data,
data_dir=data_dir,
short_window=short_window,
long_window=long_window,
)
return result # 返回回测结果
async def init_backtest_db(): async def init_backtest_db():
"""
初始化回测数据库针对每个股票和不同的均线组合运行回测
"""
# 定义短期和长期均线组合
sma_list = [{"short_window": 5, "long_window": 10}, {"short_window": 10, "long_window": 30}, sma_list = [{"short_window": 5, "long_window": 10}, {"short_window": 10, "long_window": 30},
{"short_window": 30, "long_window": 60}, {"short_window": 30, "long_window": 90}, {"short_window": 30, "long_window": 60}, {"short_window": 30, "long_window": 90},
{"short_window": 70, "long_window": 140}, {"short_window": 120, "long_window": 250}] {"short_window": 70, "long_window": 140}, {"short_window": 120, "long_window": 250}]
await init_tortoise()
wance_db = await wance_data_stock.WanceDataStock.all()
sma_list_lenght = len(sma_list)
await init_tortoise() # 初始化数据库连接
wance_db = await wance_data_stock.WanceDataStock.all() # 获取所有股票数据
sma_list_length = len(sma_list) # 获取均线组合的数量
# 遍历每只股票,针对每个均线组合运行回测
for stock_code in wance_db: for stock_code in wance_db:
for i in range(sma_list_lenght): for i in range(sma_list_length):
short_window = sma_list[i]['short_window'] short_window = sma_list[i]['short_window'] # 短期均线
long_window = sma_list[i]['long_window'] long_window = sma_list[i]['long_window'] # 长期均线
source_column_name = f'{stock_code} 双均线策略 MA{short_window}-{long_window}' source_column_name = f'{stock_code} 双均线策略 MA{short_window}-{long_window}' # 结果列名称
result = await run_sma_backtest(field_list=['close', 'time'],
stock_list=[stock_code.stock_code],
short_window=short_window,
long_window=long_window)
print(f"回测成功 {source_column_name}") # 运行回测
result = await run_combined_sma_backtest(field_list=['close', 'time'],
stock_list=[stock_code.stock_code],
short_window=short_window,
long_window=long_window)
print(f"回测成功 {source_column_name}") # 打印回测成功信息
if __name__ == '__main__': if __name__ == '__main__':
@ -265,4 +321,10 @@ if __name__ == '__main__':
# long_window=30)) # long_window=30))
# 初始化数据库表 # 初始化数据库表
asyncio.run(init_backtest_db()) # asyncio.run(init_backtest_db())
asyncio.run(run_combined_sma_backtest(field_list=['close', 'time'],
stock_list=["688031.SH", "600025.SH", "601222.SH"],
user_id=1,
stock_weights=[{"688031.SH": 0.5, "600025.SH": 0.3, "601222.SH": 0.2}],
short_window=10,
long_window=30))

@ -0,0 +1,254 @@
import asyncio
import json
from datetime import datetime
import bt
import numpy as np
import pandas as pd
from src.models import wance_data_storage_backtest, wance_data_stock, user_combination_history
from src.tortoises_orm_config import init_tortoise
from src.utils.combination_until import get_local_data
# 创建组合MACD策略函数
async def create_combined_macd_strategy(data, stock_weights_dict, short_window: int = 12, long_window: int = 26, signal_window: int = 9):
"""
创建组合MACD策略并根据股票权重进行回测
参数:
data: pd.DataFrame, 股票价格数据行索引为日期列为股票代码
stock_weights_dict: dict, 股票权重字典键为股票代码值为相应权重
short_window: int, 短期EMA窗口期默认为12
long_window: int, 长期EMA窗口期默认为26
signal_window: int, 信号线窗口期默认为9
返回:
strategy: bt.Strategy, 创建的组合MACD策略
combined_signal: pd.DataFrame, 各股票的买卖信号
"""
combined_signal = pd.DataFrame(index=data.index) # 创建一个空的DataFrame用于存放买卖信号
# 遍历每个股票代码计算对应的MACD信号
for stock_code in stock_weights_dict.keys():
if stock_code in data.columns:
stock_data_series = data[[stock_code]] # 获取该股票的价格数据
stock_data_series.columns = [stock_code] # 重命名列
signal = await macd_strategy(stock_data_series, short_window, long_window, signal_window) # 计算MACD信号
combined_signal[stock_code] = signal # 将信号存入combined_signal
else:
print(f"Warning: Stock code {stock_code} not found in data columns.") # 如果股票代码不存在,发出警告
# 将权重转换为 DataFrame行索引为数据的时间列为股票代码
weights = pd.DataFrame(index=data.index, data=[stock_weights_dict] * len(data.index))
# 使用bt框架构建组合策略
strategy = bt.Strategy('组合MACD策略',
[bt.algos.RunDaily(), # 每日运行策略
bt.algos.SelectAll(), # 选择所有股票
bt.algos.WeighSpecified(**stock_weights_dict), # 按照指定的权重分配
bt.algos.Rebalance()]) # 调整持仓
return strategy, combined_signal # 返回创建的策略和买卖信号
# MACD策略生成买卖信号
async def macd_strategy(df, short_window=12, long_window=26, signal_window=9):
"""
基于MACD策略生成买卖信号
参数:
df: pd.DataFrame, 股票的价格数据行索引为日期列为股票代码
short_window: int, 短期EMA窗口期默认为12
long_window: int, 长期EMA窗口期默认为26
signal_window: int, 信号线窗口期默认为9
返回:
signal: pd.DataFrame, 每只股票的买卖信号1 表示买入-1 表示卖出
"""
# 计算短期和长期EMA
short_ema = df.ewm(span=short_window, adjust=False).mean() # 短期指数移动平均
long_ema = df.ewm(span=long_window, adjust=False).mean() # 长期指数移动平均
# 计算MACD和信号线
macd = short_ema - long_ema # MACD值
signal_line = macd.ewm(span=signal_window, adjust=False).mean() # 信号线
# 生成买卖信号
signal = np.where(macd > signal_line, 1, 0) # MACD上穿信号线 -> 买入
signal = np.where(macd < signal_line, -1, signal) # MACD下穿信号线 -> 卖出
signal = pd.DataFrame(signal, index=df.index, columns=df.columns).ffill() # 前向填充信号
signal = signal.fillna(0) # 将所有NaN替换为0
return signal # 返回买卖信号
# 保存回测数据
async def storage_backtest_data(user_id, stock_weights, strategy_parame, strategy_name):
"""
将回测结果保存到数据库
参数:
user_id: int, 用户ID
stock_weights: list, 股票权重
strategy_parame: dict, 策略参数
strategy_name: str, 策略名称
"""
await init_tortoise() # 初始化Tortoise ORM
# 检查是否存在该 backtest_name
existing_record = await user_combination_history.UserCombinationHistory.filter(user_id=user_id).first()
# 将股票权重和策略参数转换为JSON格式
stock_weights_json = json.dumps(stock_weights)
strategy_parame_json = json.dumps(strategy_parame)
if existing_record: # 如果存在记录,则更新
await user_combination_history.UserCombinationHistory.filter(
id=existing_record.id
).update(user_id=user_id,
last_time=int(datetime.now().strftime('%Y%m%d')),
stock_weights=stock_weights_json,
strategy_name=strategy_name,
strategy_parame=strategy_parame_json)
else: # 如果不存在记录,则创建新的记录
await user_combination_history.UserCombinationHistory.create(user_id=user_id,
last_time=int(datetime.now().strftime('%Y%m%d')),
stock_weights=stock_weights_json,
strategy_name=strategy_name,
strategy_parame=strategy_parame_json)
# 运行组合MACD策略回测
async def run_combined_macd_backtest(field_list: list,
stock_list: list,
stock_weights: list,
user_id: int = 1,
period: str = '1d',
start_time: str = '',
end_time: str = '',
count: int = 100,
dividend_type: str = 'none',
fill_data: bool = True,
data_dir: str = '',
short_window: int = 12,
long_window: int = 26,
signal_window: int = 9):
"""
运行组合MACD策略的回测
参数:
field_list: list, 需要获取的字段列表
stock_list: list, 股票代码列表
stock_weights: list, 股票权重列表
user_id: int, 用户ID默认为1
period: str, 数据时间周期默认为'1d'
start_time: str, 开始时间默认为''空字符串表示不限制
end_time: str, 结束时间默认为''空字符串表示不限制
count: int, 获取的数据数量默认为100
dividend_type: str, 分红类型默认为'none'
fill_data: bool, 是否填充缺失数据默认为True
data_dir: str, 数据目录默认为''空字符串表示当前目录
short_window: int, 短期EMA窗口期默认为12
long_window: int, 长期EMA窗口期默认为26
signal_window: int, 信号线窗口期默认为9
返回:
result: bt.BacktestResult, 回测结果
"""
try:
# 获取本地数据
data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type,
fill_data, data_dir)
# 将 stock_weights 转换为字典形式,提取第一个字典作为权重
stock_weights_dict = stock_weights[0] if stock_weights else {}
# 创建组合MACD策略
strategy, signal = await create_combined_macd_strategy(data, stock_weights_dict, short_window, long_window, signal_window)
# 创建回测
backtest = bt.Backtest(strategy, data, initial_capital=100000) # 使用初始资本创建回测
# 策略参数和名称
strategy_parame = {"short_window": short_window, "long_window": long_window, "signal_window": signal_window}
strategy_name = "MACD策略"
# 运行回测
result = bt.run(backtest) # 运行回测
# 保存回测数据
await storage_backtest_data(user_id, stock_weights_dict, strategy_parame, strategy_name)
return result # 返回回测结果
except Exception as e:
print(f"Error occurred: {e}") # 捕获并打印异常
# 启动MACD策略服务
async def start_macd_combination_service(field_list: list,
stock_list: list,
stock_weights: list,
user_id: int = 1,
period: str = '1d',
start_time: str = '',
end_time: str = '',
count: int = -1,
dividend_type: str = 'none',
fill_data: bool = True,
data_dir: str = '',
short_window: int = 12,
long_window: int = 26,
signal_window: int = 9):
"""
启动MACD策略组合服务
参数:
field_list: list, 需要获取的字段列表
stock_list: list, 股票代码列表
stock_weights: list, 股票权重列表
user_id: int, 用户ID默认为1
period: str, 数据时间周期默认为'1d'
start_time: str, 开始时间默认为''空字符串表示不限制
end_time: str, 结束时间默认为''空字符串表示不限制
count: int, 获取的数据数量默认为-1表示获取所有可用数据
dividend_type: str, 分红类型默认为'none'
fill_data: bool, 是否填充缺失数据默认为True
data_dir: str, 数据目录默认为''空字符串表示当前目录
short_window: int, 短期EMA窗口期默认为12
long_window: int, 长期EMA窗口期默认为26
signal_window: int, 信号线窗口期默认为9
返回:
result: bt.BacktestResult, 回测结果
"""
result = await run_combined_macd_backtest(
field_list=field_list,
stock_list=stock_list,
user_id=user_id,
stock_weights=stock_weights,
period=period,
start_time=start_time,
end_time=end_time,
count=count,
dividend_type=dividend_type,
fill_data=fill_data,
data_dir=data_dir,
short_window=short_window,
long_window=long_window,
signal_window=signal_window
)
return result # 返回回测结果
if __name__ == '__main__':
# 运行回测
asyncio.run(run_combined_macd_backtest(field_list=['close', 'time'],
stock_list=["688031.SH", "600025.SH", "601222.SH"],
user_id=1,
stock_weights=[{"688031.SH": 0.5, "600025.SH": 0.3, "601222.SH": 0.2}],
short_window=12,
long_window=26,
signal_window=9))

@ -6,256 +6,249 @@ import bt
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable from src.models import wance_data_storage_backtest, wance_data_stock, user_combination_history
from src.models import wance_data_storage_backtest, wance_data_stock
from src.tortoises_orm_config import init_tortoise from src.tortoises_orm_config import init_tortoise
from src.utils.combination_until import get_local_data
# 组合反双均线策略函数
# 反双均线策略函数 async def create_combined_sma_strategy(data, stock_weights_dict, short_window: int = 50, long_window: int = 200):
async def create_dual_ma_strategy(data, stock_code: str, short_window: int = 50, long_window: int = 200):
# 生成反双均线策略信号
signal = await reverse_dual_ma_strategy(data, short_window, long_window)
# 使用bt框架构建策略
strategy = bt.Strategy(f'{stock_code} 反双均线策略',
[bt.algos.RunDaily(),
bt.algos.SelectAll(), # 选择所有股票
bt.algos.WeighTarget(signal), # 根据信号调整权重
bt.algos.Rebalance()]) # 调仓
return strategy, signal
# 定义反反双均线策略的函数
def reverse_dual_ma_strategy(data, short_window=50, long_window=200):
""" """
反反双均线策略当短期均线跌破长期均线时买入穿过长期均线时卖出 创建组合反双均线策略并根据股票权重进行回测
参数: 参数:
data: pd.DataFrame, 股票的价格数据行索引为日期列为股票代码 data: pd.DataFrame, 包含股票的历史价格数据行索引为时间列为股票代码
short_window: int, 短期均线的窗口期 stock_weights_dict: dict, 包含每只股票的权重
long_window: int, 长期均线的窗口期 short_window: int, 短期均线的计算窗口默认为50
long_window: int, 长期均线的计算窗口默认为200
返回:
strategy: bt.Strategy, 创建的组合策略
combined_signal: pd.DataFrame, 每只股票的买卖信号
"""
# 创建一个 DataFrame 用于存储组合信号,索引为数据的日期
combined_signal = pd.DataFrame(index=data.index)
# 遍历每只股票,根据其权重生成信号
for stock_code in stock_weights_dict.keys():
# 确保每个股票代码在数据中唯一
if f'{stock_code}' in data.columns:
# 选择对应股票的数据列
stock_data_series = data[[f'{stock_code}']]
stock_data_series.columns = [f'{stock_code}']
# 调用反双均线策略生成买卖信号
signal = await reverse_dual_ma_strategy(stock_data_series, short_window, long_window)
# 将生成的信号添加到组合信号 DataFrame 中
combined_signal[stock_code] = signal
else:
print(f"Warning: Stock code {stock_code} not found in data columns.")
# 将权重字典转换为 DataFrame行索引为数据的时间列为股票代码
weights = pd.DataFrame(index=data.index, data=[stock_weights_dict] * len(data.index))
# 使用 bt 框架构建组合策略
strategy = bt.Strategy('组合反双均线策略',
[bt.algos.RunDaily(), # 每天运行策略
bt.algos.SelectAll(), # 选择所有股票
bt.algos.WeighSpecified(**stock_weights_dict), # 使用指定的权重
bt.algos.Rebalance()]) # 调仓
return strategy, combined_signal
# 反双均线策略生成买卖信号
async def reverse_dual_ma_strategy(df, short_window=20, long_window=50):
"""
基于反双均线策略生成买卖信号
参数:
df: pd.DataFrame, 股票的价格数据行索引为日期列为股票代码
short_window: int, 短期均线窗口期
long_window: int, 长期均线窗口期
返回: 返回:
signal: pd.DataFrame, 每只股票的买卖信号1 表示买入0 表示卖出 signal: pd.DataFrame, 每只股票的买卖信号1 表示买入0 表示卖出
""" """
# 计算短期均线和长期均线 # 计算短期和长期均线
short_ma = data.rolling(window=short_window).mean() short_ma = df.rolling(window=short_window, min_periods=1).mean()
long_ma = data.rolling(window=long_window).mean() long_ma = df.rolling(window=long_window, min_periods=1).mean()
# 初始化信号 DataFrame # 反双均线策略的买卖信号生成规则
signal = pd.DataFrame(index=data.index, columns=data.columns) buy_signal = np.where(short_ma < long_ma, 1, np.nan) # 短期均线跌破长期均线 -> 买入信号
sell_signal = np.where(short_ma > long_ma, 0, np.nan) # 短期均线上穿长期均线 -> 卖出信号
# 生成买入信号:短期均线从上往下穿过长期均线 # 创建信号 DataFrame
for column in data.columns: signal = pd.DataFrame(buy_signal, index=df.index, columns=df.columns)
signal[column] = (short_ma[column] < long_ma[column]).astype(int) # 跌破时买入信号为1 # 将卖出信号应用到信号 DataFrame 中
signal[column] = (short_ma[column] > long_ma[column]).astype(int) * -1 + signal[column] # 穿过时卖出信号为0 signal = np.where(short_ma > long_ma, 0, signal)
signal = pd.DataFrame(signal, index=df.index, columns=df.columns).ffill() # 向前填充信号
# 前向填充信号,保持持仓不变
signal = signal.ffill()
signal = signal.fillna(0) # 将 NaN 转换为 0
return signal return signal
async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, short_window, # 保存回测数据
long_window): async def storage_backtest_data(user_id, stock_weights, strategy_parame, strategy_name):
await init_tortoise() """
保存回测数据到数据库
# 要存储的字段列表 参数:
fields_to_store = [ user_id: int, 用户的 ID
'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', stock_weights: dict, 股票权重
'price', 'returns', 'data_start_time', 'data_end_time', strategy_parame: dict, 策略参数
'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', strategy_name: str, 策略名称
'max_drawdown', 'calmar', 'mtd', 'three_month', """
'six_month', 'ytd', 'one_year', 'three_year', await init_tortoise() # 初始化 Tortoise ORM
'five_year', 'ten_year', 'incep', 'daily_sharpe',
'daily_sortino', 'daily_mean', 'daily_vol',
'daily_skew', 'daily_kurt', 'best_day', 'worst_day',
'monthly_sharpe', 'monthly_sortino', 'monthly_mean',
'monthly_vol', 'monthly_skew', 'monthly_kurt',
'best_month', 'worst_month', 'yearly_sharpe',
'yearly_sortino', 'yearly_mean', 'yearly_vol',
'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year',
'avg_drawdown', 'avg_drawdown_days', 'avg_up_month',
'avg_down_month', 'win_year_perc', 'twelve_month_win_perc'
]
# 准备要存储的数据
data_to_store = {
'stock_code': stock_code,
'strategy_name': "反双均线策略",
'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign(
time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')),
'daily_price': convert_pandas_to_json_serializable(result[source_column_name].daily_prices),
'price': convert_pandas_to_json_serializable(result[source_column_name].prices),
'returns': convert_pandas_to_json_serializable(result[source_column_name].returns.fillna(0)),
'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'),
'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'),
'backtest_end_time': int(datetime.now().strftime('%Y%m%d')),
'position': convert_pandas_to_json_serializable(signal),
'backtest_name': f'{stock_code} 反双均线策略 MA{short_window}-{long_window}',
'indicator_type': 'reverse_SMA',
'indicator_information': json.dumps({'short_window': short_window, 'long_window': long_window})
}
# 使用循环填充其他字段
for field in fields_to_store[12:]: # 从第10个字段开始
value = result.stats.loc[field].iloc[0]
data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value
# 检查是否存在该 backtest_name # 检查是否存在该 backtest_name
existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( existing_record = await user_combination_history.UserCombinationHistory.filter(user_id=user_id).first()
backtest_name=data_to_store['backtest_name']
).first() # 将权重和策略参数转换为 JSON 格式
stock_weights_json = json.dumps(stock_weights)
strategy_parame_json = json.dumps(strategy_parame)
if existing_record: if existing_record:
# 如果存在,更新记录 # 如果记录已存在,更新记录
await wance_data_storage_backtest.WanceDataStorageBacktest.filter( await user_combination_history.UserCombinationHistory.filter(
id=existing_record.id id=existing_record.id
).update(**data_to_store) ).update(user_id=user_id,
last_time=int(datetime.now().strftime('%Y%m%d')),
stock_weights=stock_weights_json,
strategy_name=strategy_name,
strategy_parame=strategy_parame_json)
else: else:
# 如果不存在,创建新的记录 # 如果记录不存在,创建新的记录
await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) await user_combination_history.UserCombinationHistory.create(user_id=user_id,
last_time=int(datetime.now().strftime('%Y%m%d')),
return data_to_store stock_weights=stock_weights_json,
strategy_name=strategy_name,
strategy_parame=strategy_parame_json)
async def run_reverse_reverse_SMA_backtest(field_list: list, # 运行组合反双均线策略回测
stock_list: list, async def run_combined_sma_backtest(field_list: list,
period: str = '1d', stock_list: list,
start_time: str = '', stock_weights: list,
end_time: str = '', user_id: int = 1,
count: int = 100, period: str = '1d',
dividend_type: str = 'none', start_time: str = '',
fill_data: bool = True, end_time: str = '',
data_dir: str = '', count: int = 100,
short_window: int = 50, dividend_type: str = 'none',
long_window: int = 200): fill_data: bool = True,
data_dir: str = '',
short_window: int = 50,
long_window: int = 200):
"""
运行组合反双均线策略的回测
参数:
field_list: list, 包含需要获取的字段如收盘价时间等
stock_list: list, 需要回测的股票列表
stock_weights: list, 包含每只股票的权重字典
user_id: int, 用户 ID默认为 1
period: str, 数据时间周期默认为 '1d'
start_time: str, 开始时间默认为 ''空字符串表示不限制
end_time: str, 结束时间默认为 ''空字符串表示不限制
count: int, 获取的数据数量默认为 100
dividend_type: str, 分红类型默认为 'none'
fill_data: bool, 是否填充缺失数据默认为 True
data_dir: str, 数据目录默认为 ''空字符串表示当前目录
short_window: int, 短期均线窗口默认为 50
long_window: int, 长期均线窗口默认为 200
返回:
result: bt.BacktestResult, 回测结果
"""
try: try:
# 初始化一个列表用于存储每只股票的回测结果字典 # 获取本地数据
results_list = []
# 遍历每只股票的数据(每列代表一个股票的收盘价)
data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type,
fill_data, fill_data, data_dir)
data_dir)
for stock_code in stock_list: # 将 stock_weights 转换为字典形式,提取第一个字典作为权重
stock_weights_dict = stock_weights[0] if stock_weights else {}
data_column_name = f'close_{stock_code}' # 创建组合反双均线策略
source_column_name = f'{stock_code} 反双均线策略' strategy, signal = await create_combined_sma_strategy(data, stock_weights_dict, short_window, long_window)
backtest_name = f'{stock_code} 反双均线策略 MA{short_window}-{long_window}'
now_data = int(datetime.now().strftime('%Y%m%d'))
db_result_data = await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
backtest_name=backtest_name)
if db_result_data: # 创建回测实例
if db_result_data[0].backtest_end_time == now_data: backtest = bt.Backtest(strategy, data, initial_capital=100000)
results_list.append({source_column_name: db_result_data[0]})
elif data_column_name in data.columns: # 策略参数
stock_data_series = data[[data_column_name]] # 提取该股票的收盘价 DataFrame strategy_parame = {"short_window": short_window, "long_window": long_window}
stock_data_series.columns = ['close'] # 重命名列为 'close'
# 创建反双均线策略 strategy_name = "反双均线策略"
strategy, signal = await create_dual_ma_strategy(stock_data_series, stock_code,
short_window=short_window,
long_window=long_window)
# 创建回测
backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000)
# 运行回测
result = bt.run(backtest)
# 存储回测结果
data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code,
stock_data_series,
short_window, long_window)
# # 绘制回测结果图表
# result.plot()
# # 绘制个别股票数据图表
# plt.figure(figsize=(12, 6))
# plt.plot(stock_data_series.index, stock_data_series['close'], label='Stock Price')
# plt.title(f'Stock Price for {stock_code}')
# plt.xlabel('Date')
# plt.ylabel('Price')
# plt.legend()
# plt.grid(True)
# plt.show()
# 将结果存储为字典并添加到列表中
results_list.append({source_column_name: data_to_store})
else: # 运行回测
print(f"数据中缺少列: {data_column_name}") result = bt.run(backtest)
return results_list # 返回结果列表 # 保存回测数据
await storage_backtest_data(user_id, stock_weights_dict, strategy_parame, strategy_name)
return result
except Exception as e: except Exception as e:
print(f"Error occurred: {e}") print(f"Error occurred: {e}")
# 启动反双均线策略服务
async def start_reverse_SMA_combination_service(field_list: list, async def start_reverse_SMA_combination_service(field_list: list,
stock_list: list, stock_list: list,
period: str = '1d', stock_weights: list,
start_time: str = '', user_id: int = 1,
end_time: str = '', period: str = '1d',
count: int = -1, start_time: str = '',
dividend_type: str = 'none', end_time: str = '',
fill_data: bool = True, count: int = -1,
data_dir: str = '', dividend_type: str = 'none',
short_window: int = 50, fill_data: bool = True,
long_window: int = 200): data_dir: str = '',
for stock_code in stock_list: short_window: int = 50,
backtest_name = f'{stock_code} 反双均线策略 MA{short_window}-{long_window}' long_window: int = 200):
db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( """
backtest_name=backtest_name) 启动反双均线策略服务运行回测并返回结果
now_time = int(datetime.now().strftime('%Y%m%d'))
if db_result and db_result[0].backtest_end_time == now_time: 参数:
return db_result field_list: list, 包含需要获取的字段
else: stock_list: list, 需要回测的股票列表
# 执行回测 stock_weights: list, 股票权重列表
result = await run_reverse_reverse_SMA_backtest( user_id: int, 用户 ID默认为 1
field_list=field_list, period: str, 数据时间周期默认为 '1d'
stock_list=stock_list, start_time: str, 开始时间默认为 ''空字符串表示不限制
period=period, end_time: str, 结束时间默认为 ''空字符串表示不限制
start_time=start_time, count: int, 获取的数据数量默认为 -1表示全部
end_time=end_time, dividend_type: str, 分红类型默认为 'none'
count=count, fill_data: bool, 是否填充缺失数据默认为 True
dividend_type=dividend_type, data_dir: str, 数据目录默认为 ''空字符串表示当前目录
fill_data=fill_data, short_window: int, 短期均线窗口默认为 50
data_dir=data_dir, long_window: int, 长期均线窗口默认为 200
short_window=short_window,
long_window=long_window,
)
return result
返回:
async def init_backtest_db(): result: bt.BacktestResult, 回测结果
reverse_SMA_list = [{"short_window": 5, "long_window": 10}, {"short_window": 10, "long_window": 30}, """
{"short_window": 30, "long_window": 60}, {"short_window": 30, "long_window": 90}, result = await run_combined_sma_backtest(
{"short_window": 70, "long_window": 140}, {"short_window": 120, "long_window": 250}] field_list=field_list,
await init_tortoise() stock_list=stock_list,
wance_db = await wance_data_stock.WanceDataStock.all() user_id=user_id,
reverse_SMA_list_lenght = len(reverse_SMA_list) stock_weights=stock_weights,
period=period,
for stock_code in wance_db: start_time=start_time,
for i in range(reverse_SMA_list_lenght): end_time=end_time,
short_window = reverse_SMA_list[i]['short_window'] count=count,
long_window = reverse_SMA_list[i]['long_window'] dividend_type=dividend_type,
source_column_name = f'{stock_code} 反双均线策略 MA{short_window}-{long_window}' fill_data=fill_data,
result = await run_reverse_reverse_SMA_backtest(field_list=['close', 'time'], data_dir=data_dir,
stock_list=[stock_code.stock_code], short_window=short_window,
short_window=short_window, long_window=long_window,
long_window=long_window) )
return result
print(f"回测成功 {source_column_name}")
if __name__ == '__main__': if __name__ == '__main__':
# 测试类的回测 # 运行回测
# asyncio.run(run_reverse_SMA_backtest(field_list=['close', 'time'], asyncio.run(run_combined_sma_backtest(field_list=['close', 'time'],
# stock_list=['601222.SH', '601677.SH'], stock_list=["688031.SH", "600025.SH", "601222.SH"],
# short_window=10, user_id=1,
# long_window=30)) stock_weights=[{"688031.SH": 0.5, "600025.SH": 0.3, "601222.SH": 0.2}],
short_window=10,
# 初始化数据库表 long_window=30))
asyncio.run(init_backtest_db())

@ -34,7 +34,3 @@ async def stock_chart(request: BackRequest):
benchmark_code=request.benchmark_code) benchmark_code=request.benchmark_code)
return result return result
@router.get('/combination')
async def combination(request: BackRequest):
await combination_service()

@ -2,292 +2,261 @@ import asyncio
import json import json
from datetime import datetime from datetime import datetime
import bt import bt # 引入bt框架以进行回测
import numpy as np import numpy as np
import pandas as pd import pandas as pd
from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable from src.models import wance_data_storage_backtest, wance_data_stock, user_combination_history # 导入数据库模型
from src.models import wance_data_storage_backtest, wance_data_stock from src.tortoises_orm_config import init_tortoise # 初始化数据库
from src.tortoises_orm_config import init_tortoise from src.utils.combination_until import get_local_data # 获取本地数据的工具函数
# RSI策略函数 # 创建RSI策略函数
async def create_dual_ma_strategy(data, stock_code: str, short_window: int = 50, long_window: int = 200, async def create_rsi_strategy(data, stock_weights_dict, rsi_period: int = 14, overbought: float = 70,
overbought: int = 70, oversold: int = 30): oversold: float = 30):
# 生成RSI策略信号
signal = await rsi_strategy(data, short_window, long_window, overbought, oversold)
# 使用bt框架构建策略
strategy = bt.Strategy(f'{stock_code} RSI策略',
[bt.algos.RunDaily(),
bt.algos.SelectAll(), # 选择所有股票
bt.algos.WeighTarget(signal), # 根据信号调整权重
bt.algos.Rebalance()]) # 调仓
return strategy, signal
async def rsi_strategy(df, short_window=14, long_window=28, overbought=70, oversold=30):
""" """
基于RSI的策略生成买卖信号 创建组合RSI策略并根据股票权重进行回测
参数:
data: pd.DataFrame, 股票的价格数据行索引为日期列为股票代码
stock_weights_dict: dict, 每只股票的权重字典
rsi_period: int, 用于计算RSI的周期默认值为14
overbought: float, 超买阈值默认值为70
oversold: float, 超卖阈值默认值为30
返回:
strategy: bt.Strategy, 构建的RSI策略
combined_signal: pd.DataFrame, 股票的买卖信号
"""
# 创建一个数据框用于存储所有股票的买卖信号
combined_signal = pd.DataFrame(index=data.index)
# 遍历每只股票生成RSI信号
for stock_code in stock_weights_dict.keys():
if f'{stock_code}' in data.columns:
# 提取每只股票的数据
stock_data_series = data[[f'{stock_code}']]
stock_data_series.columns = [f'{stock_code}']
# 调用RSI策略计算买卖信号
signal = await rsi_strategy(stock_data_series, rsi_period, overbought, oversold)
combined_signal[stock_code] = signal # 存储信号
else:
print(f"Warning: Stock code {stock_code} not found in data columns.") # 股票未在数据中,发出警告
# 创建权重数据框
weights = pd.DataFrame(index=data.index, data=[stock_weights_dict] * len(data.index))
# 使用bt框架构建RSI策略
strategy = bt.Strategy('组合RSI策略',
[bt.algos.RunDaily(), # 每天运行策略
bt.algos.SelectAll(), # 选择所有可用的股票
bt.algos.WeighSpecified(**stock_weights_dict), # 根据给定权重进行加权
bt.algos.Rebalance()]) # 调整投资组合
return strategy, combined_signal # 返回构建的策略和信号
async def rsi_strategy(df, period=14, overbought=70, oversold=30):
"""
基于RSI策略生成买卖信号
参数: 参数:
df: pd.DataFrame, 股票的价格数据行索引为日期列为股票代码 df: pd.DataFrame, 股票的价格数据行索引为日期列为股票代码
short_window: int, 短期RSI的窗口期 period: int, RSI计算的周期默认值为14
long_window: int, 长期RSI的窗口期 overbought: float, 超买阈值默认值为70
overbought: int, 超买水平 oversold: float, 超卖阈值默认值为30
oversold: int, 超卖水平
返回: 返回:
signal: pd.DataFrame, 每只股票的买卖信号1 表示买入0 表示卖出 signal: pd.DataFrame, 每只股票的买卖信号1 表示买入0 表示卖出
""" """
delta = df.diff().fillna(0) # 计算价格变化量
delta = df.diff()
# 计算上涨和下跌的平均值
gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() # 上涨的平均值
loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() # 下跌的平均值
gain = (delta.where(delta > 0, 0).rolling(window=short_window).mean()).fillna(0) # 计算相对强弱指数RS和RSI
loss = (-delta.where(delta < 0, 0).rolling(window=short_window).mean()).fillna(0) rs = gain / loss
rsi = 100 - (100 / (1 + rs)) # 计算RSI
short_rsi = (100 - (100 / (1 + (gain / loss)))).fillna(0)
long_gain = (delta.where(delta > 0, 0).rolling(window=long_window).mean()).fillna(0)
long_loss = (-delta.where(delta < 0, 0).rolling(window=long_window).mean()).fillna(0)
long_rsi = (100 - (100 / (1 + (long_gain / long_loss)))).fillna(0)
# 创建一个数据框用于存储每只股票的买卖信号
signal = pd.DataFrame(index=df.index, columns=df.columns) signal = pd.DataFrame(index=df.index, columns=df.columns)
# 根据RSI计算买卖信号
for column in df.columns: for column in df.columns:
signal[column] = np.where((short_rsi[column] < 30) & (long_rsi[column] < 30) & (short_rsi[column] != 0) & (long_rsi[column] != 0), 1, 0) signal[column] = np.where(rsi[column] < oversold, 1, np.nan) # 当RSI低于超卖阈值生成买入信号
signal[column] = np.where((short_rsi[column] > 70) & (long_rsi[column] > 70) & (short_rsi[column] != 0) & (long_rsi[column] != 0), 0, signal[column]) signal[column] = np.where(rsi[column] > overbought, 0, signal[column]) # 当RSI高于超买阈值生成卖出信号
return signal.ffill().fillna(0) # 前向填充信号并用0填充NaN值
signal = signal.ffill().fillna(0)
return signal # 返回信号数据框
async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, short_window: int, async def storage_backtest_data(user_id, stock_weights, strategy_parame, strategy_name):
long_window: int, overbought: int = 70, """
oversold: int = 30): 存储回测结果到数据库
await init_tortoise()
# 要存储的字段列表 参数:
fields_to_store = [ user_id: int, 用户的ID
'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', stock_weights: dict, 股票权重字典
'price', 'returns', 'data_start_time', 'data_end_time', strategy_parame: dict, 策略参数字典
'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', strategy_name: str, 策略名称
'max_drawdown', 'calmar', 'mtd', 'three_month', """
'six_month', 'ytd', 'one_year', 'three_year', await init_tortoise() # 初始化数据库
'five_year', 'ten_year', 'incep', 'daily_sharpe',
'daily_sortino', 'daily_mean', 'daily_vol',
'daily_skew', 'daily_kurt', 'best_day', 'worst_day',
'monthly_sharpe', 'monthly_sortino', 'monthly_mean',
'monthly_vol', 'monthly_skew', 'monthly_kurt',
'best_month', 'worst_month', 'yearly_sharpe',
'yearly_sortino', 'yearly_mean', 'yearly_vol',
'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year',
'avg_drawdown', 'avg_drawdown_days', 'avg_up_month',
'avg_down_month', 'win_year_perc', 'twelve_month_win_perc'
]
# 准备要存储的数据 # 查询用户的已有记录
data_to_store = { existing_record = await user_combination_history.UserCombinationHistory.filter(user_id=user_id).first()
'stock_code': stock_code,
'strategy_name': "RSI策略",
'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign(
time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')),
'daily_price': convert_pandas_to_json_serializable(result[source_column_name].daily_prices),
'price': convert_pandas_to_json_serializable(result[source_column_name].prices),
'returns': convert_pandas_to_json_serializable(result[source_column_name].returns.fillna(0)),
'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'),
'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'),
'backtest_end_time': int(datetime.now().strftime('%Y%m%d')),
'position': convert_pandas_to_json_serializable(signal),
'backtest_name': f'{stock_code} RSI策略 RSI{short_window}-RSI{long_window}-overbought{overbought}-oversold{oversold}',
'indicator_type': 'RSI',
'indicator_information': json.dumps(
{'short_window': short_window, 'long_window': long_window, 'overbought': overbought, 'oversold': oversold})
}
# 使用循环填充其他字段 # 将股票权重和策略参数转换为JSON格式
for field in fields_to_store[12:]: # 从第12个字段开始 stock_weights_json = json.dumps(stock_weights)
value = result.stats.loc[field].iloc[0] strategy_parame_json = json.dumps(strategy_parame)
if isinstance(value, float):
if np.isnan(value):
data_to_store[field] = 0.0 # NaN 处理为 0
elif np.isinf(value): # 判断是否为无穷大或无穷小
if value > 0:
data_to_store[field] = 99999.9999 # 正无穷处理
else:
data_to_store[field] = -99999.9999 # 负无穷处理
else:
data_to_store[field] = value # 正常的浮点值
else:
data_to_store[field] = value # 非浮点类型保持不变
# 检查是否存在该 backtest_name
existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
backtest_name=data_to_store['backtest_name']
).first()
# 如果记录存在,更新记录;否则,创建新记录
if existing_record: if existing_record:
# 如果存在,更新记录 await user_combination_history.UserCombinationHistory.filter(
await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
id=existing_record.id id=existing_record.id
).update(**data_to_store) ).update(user_id=user_id,
last_time=int(datetime.now().strftime('%Y%m%d')),
stock_weights=stock_weights_json,
strategy_name=strategy_name,
strategy_parame=strategy_parame_json)
else: else:
# 如果不存在,创建新的记录 await user_combination_history.UserCombinationHistory.create(user_id=user_id,
await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) last_time=int(datetime.now().strftime('%Y%m%d')),
stock_weights=stock_weights_json,
return data_to_store strategy_name=strategy_name,
strategy_parame=strategy_parame_json)
async def run_rsi_backtest(field_list: list, async def run_combined_rsi_backtest(field_list: list,
stock_list: list, stock_list: list,
period: str = '1d', stock_weights: list,
start_time: str = '', user_id: int = 1,
end_time: str = '', period: str = '1d',
count: int = 100, start_time: str = '',
dividend_type: str = 'none', end_time: str = '',
fill_data: bool = True, count: int = 100,
data_dir: str = '', dividend_type: str = 'none',
short_window: int = 50, fill_data: bool = True,
long_window: int = 200, data_dir: str = '',
overbought: int = 70, rsi_period: int = 14,
oversold: int = 30 overbought: float = 70,
): oversold: float = 30):
"""
运行组合RSI策略的回测
参数:
field_list: list, 需要获取的字段列表
stock_list: list, 需要回测的股票代码列表
stock_weights: list, 股票权重列表
user_id: int, 用户ID默认为1
period: str, 数据周期默认为'1d'
start_time: str, 开始时间默认为''
end_time: str, 结束时间默认为''
count: int, 数据数量限制默认为100
dividend_type: str, 分红类型默认为'none'
fill_data: bool, 是否填充数据默认为True
data_dir: str, 数据目录默认为''
rsi_period: int, RSI计算周期默认为14
overbought: float, 超买阈值默认为70
oversold: float, 超卖阈值默认为30
返回:
result: 回测结果
"""
try: try:
# 初始化一个列表用于存储每只股票的回测结果字典 # 获取本地数据
results_list = []
# 遍历每只股票的数据(每列代表一个股票的收盘价)
data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type,
fill_data, fill_data, data_dir)
data_dir)
for stock_code in stock_list: # 获取股票权重字典
stock_weights_dict = stock_weights[0] if stock_weights else {}
data_column_name = f'close_{stock_code}' # 创建RSI策略
source_column_name = f'{stock_code} RSI策略' strategy, signal = await create_rsi_strategy(data, stock_weights_dict, rsi_period, overbought, oversold)
backtest_name = f'{stock_code} RSI策略 RSI{short_window}-RSI{long_window}'
now_data = int(datetime.now().strftime('%Y%m%d'))
db_result_data = await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
backtest_name=backtest_name)
if db_result_data: # 创建回测对象
if db_result_data[0].backtest_end_time == now_data: backtest = bt.Backtest(strategy, data, initial_capital=100000)
results_list.append({source_column_name: db_result_data[0]})
# elif data_column_name in data.columns: # 策略参数
if data_column_name in data.columns: strategy_parame = {"rsi_period": rsi_period, "overbought": overbought, "oversold": oversold}
stock_data_series = data[[data_column_name]] # 提取该股票的收盘价 DataFrame strategy_name = "RSI策略"
stock_data_series.columns = ['close'] # 重命名列为 'close'
# 创建RSI策略 # 运行回测
strategy, signal = await create_dual_ma_strategy(stock_data_series, stock_code, result = bt.run(backtest)
short_window=short_window, long_window=long_window)
# 创建回测
backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000)
# 运行回测
result = bt.run(backtest)
# 存储回测结果
data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code,
stock_data_series, short_window, long_window, overbought,
oversold)
# # 绘制回测结果图表
# result.plot()
# # 绘制个别股票数据图表
# plt.figure(figsize=(12, 6))
# plt.plot(stock_data_series.index, stock_data_series['close'], label='Stock Price')
# plt.title(f'Stock Price for {stock_code}')
# plt.xlabel('Date')
# plt.ylabel('Price')
# plt.legend()
# plt.grid(True)
# plt.show()
# 将结果存储为字典并添加到列表中
results_list.append({source_column_name: data_to_store})
else: # 存储回测结果
print(f"数据中缺少列: {data_column_name}") await storage_backtest_data(user_id, stock_weights_dict, strategy_parame, strategy_name)
return results_list # 返回结果列表 return result # 返回回测结果
except Exception as e: except Exception as e:
print(f"Error occurred: {e}") print(f"Error occurred: {e}") # 输出错误信息
async def start_rsi_combination_service(field_list: list, async def start_rsi_combination_service(field_list: list,
stock_list: list, stock_list: list,
period: str = '1d', stock_weights: list,
start_time: str = '', user_id: int = 1,
end_time: str = '', period: str = '1d',
count: int = -1, start_time: str = '',
dividend_type: str = 'none', end_time: str = '',
fill_data: bool = True, count: int = -1,
data_dir: str = '', dividend_type: str = 'none',
short_window: int = 50, fill_data: bool = True,
long_window: int = 200, data_dir: str = '',
overbought: int = 70, rsi_period: int = 14,
oversold: int = 30 overbought: float = 70,
): oversold: float = 30):
for stock_code in stock_list: """
backtest_name = f'{stock_code} RSI策略 RSI{short_window}-RSI{long_window}' 启动RSI组合策略服务
db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter(
backtest_name=backtest_name)
now_time = int(datetime.now().strftime('%Y%m%d'))
if db_result and db_result[0].backtest_end_time == now_time: 参数:
return db_result field_list: list, 需要获取的字段列表
else: stock_list: list, 需要回测的股票代码列表
# 执行回测 stock_weights: list, 股票权重列表
result = await run_rsi_backtest( user_id: int, 用户ID默认为1
field_list=field_list, period: str, 数据周期默认为'1d'
stock_list=stock_list, start_time: str, 开始时间默认为''
period=period, end_time: str, 结束时间默认为''
start_time=start_time, count: int, 数据数量限制默认为-1
end_time=end_time, dividend_type: str, 分红类型默认为'none'
count=count, fill_data: bool, 是否填充数据默认为True
dividend_type=dividend_type, data_dir: str, 数据目录默认为''
fill_data=fill_data, rsi_period: int, RSI计算周期默认为14
data_dir=data_dir, overbought: float, 超买阈值默认为70
short_window=short_window, oversold: float, 超卖阈值默认为30
long_window=long_window,
overbought=overbought,
oversold=oversold
)
return result
返回:
async def init_backtest_db(): result: 回测结果
sma_list = [{"short_window": 3, "long_window": 6}, {"short_window": 6, "long_window": 12}, """
{"short_window": 12, "long_window": 24}, {"short_window": 14, "long_window": 18}, # 调用回测函数并返回结果
{"short_window": 15, "long_window": 10}] result = await run_combined_rsi_backtest(
await init_tortoise() field_list=field_list,
wance_db = await wance_data_stock.WanceDataStock.all() stock_list=stock_list,
sma_list_lenght = len(sma_list) stock_weights=stock_weights,
user_id=user_id,
for stock_code in wance_db: period=period,
for i in range(sma_list_lenght): start_time=start_time,
short_window = sma_list[i]['short_window'] end_time=end_time,
long_window = sma_list[i]['long_window'] count=count,
source_column_name = f'{stock_code} RSI策略 RSI{short_window}-RSI{long_window}' dividend_type=dividend_type,
result = await start_rsi_backtest_service(field_list=['close', 'time'], fill_data=fill_data,
stock_list=[stock_code.stock_code], data_dir=data_dir,
short_window=short_window, rsi_period=rsi_period,
long_window=long_window, overbought=overbought,
overbought=70, oversold=oversold,
oversold=30) )
return result # 返回回测结果
print(f"回测成功 {source_column_name}")
if __name__ == '__main__': if __name__ == '__main__':
# 测试类的回测 # 主函数,启动回测服务
asyncio.run(run_rsi_backtest(field_list=['close', 'time'], asyncio.run(run_combined_rsi_backtest(field_list=['close', 'time'],
stock_list=['601222.SH', '601677.SH'], stock_list=["688031.SH", "600025.SH", "601222.SH"],
count=-1, user_id=1,
short_window=10, stock_weights=[{"688031.SH": 0.5, "600025.SH": 0.3, "601222.SH": 0.2}],
long_window=30, rsi_period=14,
overbought=70, overbought=70,
oversold=30 oversold=30))
))
# # 初始化数据库表
# asyncio.run(init_backtest_db())

@ -28,7 +28,7 @@ async def start_combination_service(field_list: list,
overbought: int = 70, overbought: int = 70,
oversold: int = 30, oversold: int = 30,
signal_window: int = 9, signal_window: int = 9,
user_id: int = 1): user_id: int = 1) -> object:
# 数据检查 # 数据检查
await data_check(field_list=field_list, await data_check(field_list=field_list,
stock_list=stock_list, stock_list=stock_list,

@ -10,4 +10,4 @@ class BackObservedData(Model):
class Meta: class Meta:
table = with_table_name("user_combination_history") table = with_table_name("back_observed_data")

@ -3,11 +3,12 @@ from src.models import with_table_name
class UserCombinationHistory(Model): class UserCombinationHistory(Model):
id = fields.IntField() id = fields.IntField(pk=True, )
last_time = fields.CharField(max_length=10, null=True, description='最后一次使用的时间', ) last_time = fields.CharField(max_length=10, null=True, description='最后一次使用的时间', )
stock_code = fields.CharField(max_length=20, null=True, description='股票代码', )
stock_name = fields.CharField(max_length=20, null=True, description='股票名称', )
stock_weights = fields.JSONField(null=True, description='权重存储', ) stock_weights = fields.JSONField(null=True, description='权重存储', )
strategy = fields.JSONField(null=True, description='用户使用的策略参数', )
strategy_name = fields.CharField(max_length=50, null=True, description='用户使用的策略', ) strategy_name = fields.CharField(max_length=50, null=True, description='用户使用的策略', )
strategy_parame = fields.JSONField(null=True, description='用户使用的策略参数', )
user_id = fields.IntField(description='用户的id', ) user_id = fields.IntField(description='用户的id', )
class Meta:
table = with_table_name("user_combination_history")

@ -18,7 +18,6 @@ async def get_local_data(field_list: list, stock_list: list, period: str, start_
data_dir=data_dir) data_dir=data_dir)
return await data_processing(result) return await data_processing(result)
async def data_processing(result_local): async def data_processing(result_local):
# 初始化一个空的列表,用于存储每个股票的数据框 # 初始化一个空的列表,用于存储每个股票的数据框
df_list = [] df_list = []
@ -27,9 +26,11 @@ async def data_processing(result_local):
for stock_code, df in result_local.items(): for stock_code, df in result_local.items():
# 确保 df 是一个 DataFrame # 确保 df 是一个 DataFrame
if isinstance(df, pd.DataFrame): if isinstance(df, pd.DataFrame):
# 将时间戳转换为日期时间格式,只保留年-月-日 # 将时间戳转换为 UTC 日期时间格式
df['time'] = pd.to_datetime(df['time'], unit='ms').dt.date df['time'] = pd.to_datetime(df['time'], unit='ms', utc=True)
# 将 'time' 列设置为索引,保留为日期格式 # 转换为指定时区(例如东八区)
df['time'] = df['time'].dt.tz_convert('Asia/Shanghai')
# 将 'time' 列设置为索引
df.set_index('time', inplace=True) df.set_index('time', inplace=True)
# 将 'close' 列重命名为 'close_股票代码' # 将 'close' 列重命名为 'close_股票代码'
df.rename(columns={'close': f'close_{stock_code}'}, inplace=True) df.rename(columns={'close': f'close_{stock_code}'}, inplace=True)
@ -41,13 +42,14 @@ async def data_processing(result_local):
# 使用 pd.concat() 将所有 DataFrame 合并为一个大的 DataFrame保留所有列 # 使用 pd.concat() 将所有 DataFrame 合并为一个大的 DataFrame保留所有列
combined_df = pd.concat(df_list, axis=1) combined_df = pd.concat(df_list, axis=1)
# 确保返回的 DataFrame 索引是日期格式 # 确保索引是日期时间格式
combined_df.index = pd.to_datetime(combined_df.index) combined_df.index = pd.to_datetime(combined_df.index)
return combined_df return combined_df
def convert_pandas_to_json_serializable(data: pd.Series) -> str: def convert_pandas_to_json_serializable(data: pd.Series) -> str:
""" """
Pandas Series DataFrame 中的 Timestamp 索引转换为字符串并返回 JSON 可序列化的结果 Pandas Series DataFrame 中的 Timestamp 索引转换为字符串并返回 JSON 可序列化的结果

@ -0,0 +1,166 @@
import json
from datetime import datetime
import numpy as np
from xtquant import xtdata
import pandas as pd
from src.models import wance_data_stock
# 数据的列名
columns = ['open', 'high', 'low', 'close', 'volume', 'amount', 'settelmentPrice',
'openInterest', 'preClose', 'suspendFlag']
# 获取本地数据并进行处理
async def get_local_data(field_list: list, stock_list: list, period: str, start_time: str, end_time: str,
count: int, dividend_type: str, fill_data: bool, data_dir: str):
list = []
for field in stock_list:
result = await wance_data_stock.WanceDataStock.filter(stock_code=field)
list.append(int(result[0].time_start))
start_time = str(max(list))
result = xtdata.get_local_data(field_list=field_list, stock_list=stock_list, period=period, start_time=start_time,
end_time=end_time, count=count, dividend_type=dividend_type, fill_data=fill_data,
data_dir=data_dir)
return await data_processing(result)
async def data_processing(result_local):
# 初始化一个空的列表,用于存储每个股票的数据框
df_list = []
# 用于存储每个股票的最早时间
start_times = []
# 遍历字典中的 DataFrame
for stock_code, df in result_local.items():
# 确保 df 是一个 DataFrame
if isinstance(df, pd.DataFrame):
# 将时间戳转换为 UTC 日期时间格式
df['time'] = pd.to_datetime(df['time'], unit='ms', utc=True)
# 转换为指定时区(例如东八区)
df['time'] = df['time'].dt.tz_convert('Asia/Shanghai')
# 记录每个股票的最早时间
start_times.append(df['time'].min())
# 将 'time' 列设置为索引
df.set_index('time', inplace=True)
# 将 'close' 列重命名为 'close_股票代码'
df.rename(columns={'close': f'{stock_code}'}, inplace=True)
# 将所有列添加到列表中
df_list.append(df) # 保留所有字段,包括重命名后的 'close_股票代码'
else:
print(f"数据格式错误: {stock_code} 不包含 DataFrame")
# 找到所有股票数据中最晚的开始时间
latest_start_time = max(start_times)
# 使用 pd.concat() 将所有 DataFrame 合并为一个大的 DataFrame保留所有列
combined_df = pd.concat(df_list, axis=1)
# 确保索引是日期时间格式
combined_df.index = pd.to_datetime(combined_df.index)
# 根据最新的开始时间切割数据
combined_df = combined_df[combined_df.index >= latest_start_time]
# 填充 NaN 值为 0
combined_df.fillna(0, inplace=True)
return combined_df
def convert_pandas_to_json_serializable(data: pd.Series) -> str:
"""
Pandas Series DataFrame 中的 Timestamp 索引转换为字符串并返回 JSON 可序列化的结果
参数:
data: pd.Series pd.DataFrame, 带有时间戳索引的 pandas 数据
返回:
JSON 字符串键为日期字符串值为原数据的值
"""
# 判断数据类型
if isinstance(data, (pd.Series, pd.DataFrame)):
# 如果索引是时间戳类型,则转换为 YYYYMMDD 格式
if isinstance(data.index, pd.DatetimeIndex):
data.index = data.index.strftime('%Y%m%d')
# 处理 NaN 和 None 的情况,替换为 0 或其他合适的默认值
data = data.replace([np.nan, None], 0)
# 将索引重置为普通列,然后转换为字典
json_serializable_data = data.rename_axis('date').reset_index().to_dict(orient='records')
# 将字典转换为 JSON 格式字符串
json_string = json.dumps(json_serializable_data)
return json_string
else:
raise ValueError("输入必须为 Pandas Series 或 DataFrame")
def clean_value(value):
"""清理数据值,使其适合 JSON 序列化。"""
if pd.isna(value) or np.isinf(value):
return 0 # 或者可以使用 None 代替
return value
def to_json_serializable(data):
"""
Pandas DataFrame 转换为 JSON 可序列化的格式
- 将索引如果是时间戳转换为 'YYYYMMDD' 格式
- 将数据按时间升序排列
- 返回数组形式的结构内部仍使用字典存放每一行数据
"""
result = []
if isinstance(data, pd.DataFrame):
# 将索引(时间戳)转换为 YYYYMMDD 格式
data.index = data.index.strftime('%Y%m%d')
# 按时间(索引)升序排序
data = data.sort_index(ascending=True)
# 遍历 DataFrame 的每一行
for idx, row in data.iterrows():
# 创建一个字典来存放当前行的数据
row_dict = {'date': idx} # 将日期作为键
# 清理行中的每个值
for col in row.index:
row_dict[col] = clean_value(row[col]) # 清理每个值
result.append(row_dict) # 将字典添加到结果数组中
return result
elif isinstance(data, pd.Series):
# 将 Series 转换为数组形式(每一行的数据以字典形式存放)
for idx in data.index:
result.append({str(idx.strftime('%Y%m%d')): clean_value(data[idx])})
return result
else:
raise TypeError("输入数据必须是 Pandas 的 DataFrame 或 Series")
async def data_check(field_list: list,
stock_list: list,
period: str = '1d',
start_time: str = '',
end_time: str = '',
count: int = -1,
dividend_type: str = 'none',
fill_data: bool = True,
data_dir: str = '', ):
result_data = xtdata.get_local_data(field_list=[], stock_list=stock_list, period=period, start_time=start_time,
end_time=end_time, count=count, dividend_type=dividend_type, fill_data=fill_data,
data_dir=data_dir)
time_now = int(datetime.now().strftime('%Y%m%d'))
for i in stock_list:
close = int(result_data.get(i).index[-1])
if close != 0 and close < time_now:
xtdata.download_history_data(stock_code=i,
period='1d',
start_time='',
end_time='',
incrementally=True)