diff --git a/models.py b/models.py index fed55ba..5b69d42 100644 Binary files a/models.py and b/models.py differ diff --git a/src/backtest/router.py b/src/backtest/router.py index baa5c72..cc85b60 100644 --- a/src/backtest/router.py +++ b/src/backtest/router.py @@ -30,8 +30,3 @@ async def stock_chart(request: BackRequest): result = await stock_chart_service(stock_code=request.stock_code, benchmark_code=request.benchmark_code) return result - - -@router.get('/combination') -async def combination(request: BackRequest): - await combination_service() diff --git a/src/combination/bollinger_bands.py b/src/combination/bollinger_bands.py index 18050b1..7411912 100644 --- a/src/combination/bollinger_bands.py +++ b/src/combination/bollinger_bands.py @@ -1,30 +1,58 @@ - - import asyncio import json from datetime import datetime -import bt +import bt # 引入bt框架用于回测 import numpy as np import pandas as pd -from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable -from src.models import wance_data_storage_backtest, wance_data_stock -from src.tortoises_orm_config import init_tortoise +from src.models import wance_data_storage_backtest, wance_data_stock, user_combination_history # 导入数据库模型 +from src.tortoises_orm_config import init_tortoise # 初始化数据库 +from src.utils.combination_until import get_local_data # 获取本地数据的工具函数 -# 布林带策略函数 -async def create_bollinger_bands_strategy(data, stock_code: str, bollingerMA: int = 50, std_dev: int = 200): - # 生成布林带策略信号 - signal = await bollinger_bands_strategy(data, bollingerMA, std_dev) +# 创建布林带策略函数 +async def create_bollinger_bands_strategy(data, stock_weights_dict, bollingerMA: int = 50, std_dev: int = 200): + """ + 创建组合布林带策略,并根据股票权重进行回测。 - # 使用bt框架构建策略 - strategy = bt.Strategy(f'{stock_code} 布林带策略', - [bt.algos.RunDaily(), - bt.algos.SelectAll(), # 选择所有股票 - bt.algos.WeighTarget(signal), # 根据信号调整权重 - bt.algos.Rebalance()]) # 调仓 - return strategy, signal + 参数: + data: pd.DataFrame, 股票的价格数据,行索引为日期,列为股票代码。 + stock_weights_dict: dict, 每只股票的权重字典。 + bollingerMA: int, 用于计算布林带的移动平均线周期。 + std_dev: int, 用于计算布林带上下轨的标准差倍数。 + + 返回: + strategy: bt.Strategy, 构建的策略。 + combined_signal: pd.DataFrame, 股票的买卖信号。 + """ + # 创建一个数据框用于存储所有股票的买卖信号 + combined_signal = pd.DataFrame(index=data.index) + + # 遍历每只股票,根据股票权重生成布林带信号 + for stock_code in stock_weights_dict.keys(): + if f'{stock_code}' in data.columns: + # 提取每只股票的数据 + stock_data_series = data[[f'{stock_code}']] + stock_data_series.columns = [f'{stock_code}'] + + # 调用布林带策略计算买卖信号 + signal = await bollinger_bands_strategy(stock_data_series, bollingerMA, std_dev) + combined_signal[stock_code] = signal + else: + print(f"Warning: Stock code {stock_code} not found in data columns.") # 如果股票不在数据中,发出警告 + + # 创建权重数据框 + weights = pd.DataFrame(index=data.index, data=[stock_weights_dict] * len(data.index)) + + # 使用bt框架构建布林带策略 + strategy = bt.Strategy('组合布林带策略', + [bt.algos.RunDaily(), # 每天运行策略 + bt.algos.SelectAll(), # 选择所有可用的股票 + bt.algos.WeighSpecified(**stock_weights_dict), # 根据给定权重进行加权 + bt.algos.Rebalance()]) # 调整投资组合 + + return strategy, combined_signal # 返回构建的策略和信号 async def bollinger_bands_strategy(df, window=20, num_std_dev=2): @@ -39,173 +67,131 @@ async def bollinger_bands_strategy(df, window=20, num_std_dev=2): 返回: signal: pd.DataFrame, 每只股票的买卖信号,1 表示买入,0 表示卖出。 """ - # 计算中轨线(移动平均) + # 计算布林带的中轨(移动平均) middle_band = df.rolling(window=window, min_periods=1).mean() - - # 计算滚动标准差 + # 计算移动标准差 rolling_std = df.rolling(window=window, min_periods=1).std() - - # 计算上轨线和下轨线 + # 计算布林带的上下轨 upper_band = middle_band + (rolling_std * num_std_dev) lower_band = middle_band - (rolling_std * num_std_dev) - # 初始化信号 DataFrame + # 创建一个数据框用于存储每只股票的买卖信号 signal = pd.DataFrame(index=df.index, columns=df.columns) - # 生成买入信号:当价格突破下轨时 + # 根据布林带计算买卖信号 for column in df.columns: - signal[column] = np.where(df[column] < lower_band[column], 1, np.nan) # 买入信号 + signal[column] = np.where(df[column] < lower_band[column], 1, np.nan) # 当收盘价低于下轨,生成买入信号 + signal[column] = np.where(df[column] > upper_band[column], 0, signal[column]) # 当收盘价高于上轨,生成卖出信号 - # 生成卖出信号:当价格突破上轨时 - for column in df.columns: - signal[column] = np.where(df[column] > upper_band[column], 0, signal[column]) # 卖出信号 - - # 前向填充信号,持仓不变 - signal = signal.ffill() - - # 将剩余的 NaN 替换为 0 - signal = signal.fillna(0) - - return signal + # 前向填充信号,并用0填充NaN值 + signal = signal.ffill().fillna(0) + return signal # 返回信号数据框 -async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, bollingerMA, - std_dev): - await init_tortoise() +async def storage_backtest_data(user_id, stock_weights, strategy_parame, strategy_name): + """ + 存储回测结果到数据库。 - # 要存储的字段列表 - fields_to_store = [ - 'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', - 'price', 'returns', 'data_start_time', 'data_end_time', - 'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', - 'max_drawdown', 'calmar', 'mtd', 'three_month', - 'six_month', 'ytd', 'one_year', 'three_year', - 'five_year', 'ten_year', 'incep', 'daily_sharpe', - 'daily_sortino', 'daily_mean', 'daily_vol', - 'daily_skew', 'daily_kurt', 'best_day', 'worst_day', - 'monthly_sharpe', 'monthly_sortino', 'monthly_mean', - 'monthly_vol', 'monthly_skew', 'monthly_kurt', - 'best_month', 'worst_month', 'yearly_sharpe', - 'yearly_sortino', 'yearly_mean', 'yearly_vol', - 'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year', - 'avg_drawdown', 'avg_drawdown_days', 'avg_up_month', - 'avg_down_month', 'win_year_perc', 'twelve_month_win_perc' - ] + 参数: + user_id: int, 用户的ID。 + stock_weights: dict, 股票权重字典。 + strategy_parame: dict, 策略参数字典。 + strategy_name: str, 策略名称。 + """ + await init_tortoise() # 初始化数据库 - # 准备要存储的数据 - data_to_store = { - 'stock_code': stock_code, - 'strategy_name': "布林带策略", - 'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign( - time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')), - 'daily_price': convert_pandas_to_json_serializable(result[source_column_name].daily_prices), - 'price': convert_pandas_to_json_serializable(result[source_column_name].prices), - 'returns': convert_pandas_to_json_serializable(result[source_column_name].returns.fillna(0)), - 'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'), - 'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'), - 'backtest_end_time': int(datetime.now().strftime('%Y%m%d')), - 'position': convert_pandas_to_json_serializable(signal), - 'backtest_name': f'{stock_code} 布林带策略 MA{bollingerMA}-{std_dev}倍标准差', - 'indicator_type': 'Bollinger', - 'indicator_information': json.dumps({'bollingerMA': bollingerMA, 'std_dev': std_dev}) - } + # 查询用户的已有记录 + existing_record = await user_combination_history.UserCombinationHistory.filter(user_id=user_id).first() - # 使用循环填充其他字段 - for field in fields_to_store[12:]: # 从第10个字段开始 - value = result.stats.loc[field].iloc[0] - data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value - - # 检查是否存在该 backtest_name - existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=data_to_store['backtest_name'] - ).first() + # 将股票权重和策略参数转换为JSON格式 + stock_weights_json = json.dumps(stock_weights) + strategy_parame_json = json.dumps(strategy_parame) + # 如果记录存在,更新记录;否则,创建新记录 if existing_record: - # 如果存在,更新记录 - await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + await user_combination_history.UserCombinationHistory.filter( id=existing_record.id - ).update(**data_to_store) + ).update(user_id=user_id, + last_time=int(datetime.now().strftime('%Y%m%d')), + stock_weights=stock_weights_json, + strategy_name=strategy_name, + strategy_parame=strategy_parame_json) else: - # 如果不存在,创建新的记录 - await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) - - return data_to_store + await user_combination_history.UserCombinationHistory.create(user_id=user_id, + last_time=int(datetime.now().strftime('%Y%m%d')), + stock_weights=stock_weights_json, + strategy_name=strategy_name, + strategy_parame=strategy_parame_json) -async def run_bollinger_backtest(field_list: list, - stock_list: list, - period: str = '1d', - start_time: str = '', - end_time: str = '', - count: int = 100, - dividend_type: str = 'none', - fill_data: bool = True, - data_dir: str = '', - bollingerMA: int = 50, - std_dev: int = 200): +async def run_combined_bollinger_backtest(field_list: list, + stock_list: list, + stock_weights: list, + user_id: int = 1, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = 100, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + bollingerMA: int = 50, + std_dev: int = 200): + """ + 运行组合布林带策略的回测。 + + 参数: + field_list: list, 需要获取的字段列表。 + stock_list: list, 需要回测的股票代码列表。 + stock_weights: list, 股票权重列表。 + user_id: int, 用户ID。 + period: str, 数据周期。 + start_time: str, 开始时间。 + end_time: str, 结束时间。 + count: int, 数据数量限制。 + dividend_type: str, 分红类型。 + fill_data: bool, 是否填充数据。 + data_dir: str, 数据目录。 + bollingerMA: int, 布林带移动平均线周期。 + std_dev: int, 标准差倍数。 + + 返回: + result: 回测结果。 + """ try: - # 初始化一个列表用于存储每只股票的回测结果字典 - results_list = [] - - # 遍历每只股票的数据(每列代表一个股票的收盘价) + # 获取本地数据 data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, - fill_data, - data_dir) + fill_data, data_dir) - for stock_code in stock_list: + # 获取股票权重字典 + stock_weights_dict = stock_weights[0] if stock_weights else {} - data_column_name = f'close_{stock_code}' - source_column_name = f'{stock_code} 布林带策略' - backtest_name = f'{stock_code} 布林带策略 MA{bollingerMA}-{std_dev}倍标准差' - now_time = int(datetime.now().strftime('%Y%m%d')) - db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=backtest_name) - if db_result: - if db_result[0].backtest_end_time == now_time: - results_list.append({source_column_name: db_result[0]}) + # 创建布林带策略 + strategy, signal = await create_bollinger_bands_strategy(data, stock_weights_dict, bollingerMA, std_dev) - # elif data_column_name in data.columns: - if data_column_name in data.columns: - stock_data_series = data[[data_column_name]] # 提取该股票的收盘价 DataFrame - stock_data_series.columns = ['close'] # 重命名列为 'close' + # 创建回测对象 + backtest = bt.Backtest(strategy, data, initial_capital=100000) - # 创建布林带策略 - strategy, signal = await create_bollinger_bands_strategy(stock_data_series, stock_code, - bollingerMA=bollingerMA, - std_dev=std_dev) - # 创建回测 - backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000) - # 运行回测 - result = bt.run(backtest) - # 存储回测结果 - data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code, - stock_data_series, - bollingerMA, std_dev) - # # 绘制回测结果图表 - # result.plot() - # # 绘制个别股票数据图表 - # plt.figure(figsize=(12, 6)) - # plt.plot(stock_data_series.index, stock_data_series['close'], label='Stock Price') - # plt.title(f'Stock Price for {stock_code}') - # plt.xlabel('Date') - # plt.ylabel('Price') - # plt.legend() - # plt.grid(True) - # plt.show() - # 将结果存储为字典并添加到列表中 - results_list.append({source_column_name: data_to_store}) + # 策略参数 + strategy_parame = {"bollingerMA": bollingerMA, "std_dev": std_dev} + strategy_name = "布林带策略" - else: - print(f"数据中缺少列: {data_column_name}") + # 运行回测 + result = bt.run(backtest) - return results_list # 返回结果列表 + # 存储回测结果 + await storage_backtest_data(user_id, stock_weights_dict, strategy_parame, strategy_name) + + return result # 返回回测结果 except Exception as e: - print(f"Error occurred: {e}") + print(f"Error occurred: {e}") # 输出错误信息 async def start_bollinger_combination_service(field_list: list, stock_list: list, + stock_weights: list, + user_id: int = 1, period: str = '1d', start_time: str = '', end_time: str = '', @@ -215,59 +201,51 @@ async def start_bollinger_combination_service(field_list: list, data_dir: str = '', bollingerMA: int = 50, std_dev: int = 200): - for stock_code in stock_list: - backtest_name = f'{stock_code} 布林带策略 MA{bollingerMA}-{std_dev}倍标准差' - db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=backtest_name) - now_time = int(datetime.now().strftime('%Y%m%d')) + """ + 启动布林带策略回测服务。 - if db_result and db_result[0].backtest_end_time == now_time: - return db_result - else: - # 执行回测 - result = await run_bollinger_backtest( - field_list=field_list, - stock_list=stock_list, - period=period, - start_time=start_time, - end_time=end_time, - count=count, - dividend_type=dividend_type, - fill_data=fill_data, - data_dir=data_dir, - bollingerMA=bollingerMA, - std_dev=std_dev, - ) - return result + 参数: + field_list: list, 需要获取的字段列表。 + stock_list: list, 需要回测的股票代码列表。 + stock_weights: list, 股票权重列表。 + user_id: int, 用户ID。 + period: str, 数据周期。 + start_time: str, 开始时间。 + end_time: str, 结束时间。 + count: int, 数据数量限制。 + dividend_type: str, 分红类型。 + fill_data: bool, 是否填充数据。 + data_dir: str, 数据目录。 + bollingerMA: int, 布林带移动平均线周期。 + std_dev: int, 标准差倍数。 - -async def init_backtest_db(): - bollinger_list = [{"bollingerMA": 20, "std_dev": 2}, {"bollingerMA": 30, "std_dev": 2}, - {"bollingerMA": 70, "std_dev": 2}, {"bollingerMA": 5, "std_dev": 1}, - {"bollingerMA": 20, "std_dev": 3}, {"bollingerMA": 50, "std_dev": 2.5}] - await init_tortoise() - wance_db = await wance_data_stock.WanceDataStock.all() - bollinger_list_lenght = len(bollinger_list) - - for stock_code in wance_db: - for i in range(bollinger_list_lenght): - bollingerMA = bollinger_list[i]['bollingerMA'] - std_dev = bollinger_list[i]['std_dev'] - source_column_name = f'{stock_code} 布林带策略 MA{bollingerMA}-{std_dev}倍标准差' - result = await run_bollinger_backtest(field_list=['close', 'time'], - stock_list=[stock_code.stock_code], - bollingerMA=bollingerMA, - std_dev=std_dev) - - print(f"回测成功 {source_column_name}") + 返回: + result: 回测结果。 + """ + # 调用回测函数并返回结果 + result = await run_combined_bollinger_backtest( + field_list=field_list, + stock_list=stock_list, + stock_weights=stock_weights, + user_id=user_id, + period=period, + start_time=start_time, + end_time=end_time, + count=count, + dividend_type=dividend_type, + fill_data=fill_data, + data_dir=data_dir, + bollingerMA=bollingerMA, + std_dev=std_dev, + ) + return result # 返回回测结果 if __name__ == '__main__': - # 测试类的回测 - asyncio.run(run_bollinger_backtest(field_list=['close', 'time'], - stock_list=['601222.SH', '601677.SH'], - bollingerMA=20, - std_dev=2)) - - # # 初始化数据库表 - # asyncio.run(init_backtest_db()) + # 主函数,启动回测服务 + asyncio.run(run_combined_bollinger_backtest(field_list=['close', 'time'], + stock_list=["688031.SH", "600025.SH", "601222.SH"], + user_id=1, + stock_weights=[{"688031.SH": 0.5, "600025.SH": 0.3, "601222.SH": 0.2}], + bollingerMA=50, + std_dev=200)) diff --git a/src/combination/combination.py b/src/combination/combination.py deleted file mode 100644 index ca9551c..0000000 --- a/src/combination/combination.py +++ /dev/null @@ -1,110 +0,0 @@ -import pandas as pd -import bt -import yfinance as yf # 这里使用 yfinance 获取股票数据,实际应用可以替换成其他数据源 - - -def fetch_stock_data(stock_list, start, end): - """ - 获取指定股票的历史数据。 - - 参数: - stock: str, 股票代码。 - start: str, 开始时间。 - end: str, 结束时间。 - - 返回: - pd.DataFrame, 包含股票的收盘价数据。 - """ - data = yf.download(stock_list, start=start, end=end)['Adj Close'] - return data - - -def combine_stock_data(stock_list, start, end): - """ - 合并多只股票的数据。 - - 参数: - stocks: list, 股票代码列表。 - start: str, 开始时间。 - end: str, 结束时间。 - - 返回: - pd.DataFrame, 合并后的股票价格数据。 - """ - combined_data = pd.DataFrame() - - for stock in stock_list: - data = fetch_stock_data(stock, start, end) - combined_data[stock] = data - - return combined_data - - -def create_portfolio_strategy(stock_weights): - """ - 创建基于股票权重的投资组合策略。 - - 参数: - stock_weights: list of dict, 股票与权重的字典列表。例如: - [{'601222.SH': 0.5, '605090.SH': 0.3, '600025.SH': 0.2}] - - 返回: - 策略对象。 - """ - algos = [ - bt.algos.RunDaily(), # 每日运行策略 - bt.algos.SelectAll(), # 选择所有股票 - bt.algos.WeighSpecified(**stock_weights[0]), # 分配指定的权重 - bt.algos.Rebalance() # 进行调仓 - ] - - strategy = bt.Strategy('Portfolio Strategy', algos) - - return strategy - - -def run_portfolio_backtest(stocks_list, start, end, stock_weights): - """ - 运行组合策略的回测。 - - 参数: - stocks: list, 股票代码列表。 - start: str, 开始时间。 - end: str, 结束时间。 - stock_weights: list of dict, 股票与权重的字典列表。例如: - [{'601222.SH': 0.5, '605090.SH': 0.3, '600025.SH': 0.2}] - - 返回: - 回测结果。 - """ - # 获取股票的历史数据 - data = combine_stock_data(stocks_list, start, end) - - # 创建策略 - strategy = create_portfolio_strategy(stock_weights) - - # 创建回测 - backtest = bt.Backtest(strategy, data) - - # 运行回测 - result = bt.run(backtest) - - return result - - -# 示例使用 -if __name__ == '__main__': - # 示例股票名 - stocks = ['601222.SH', '605090.SH', '600025.SH'] - start_date = '2021-01-01' - end_date = '2023-01-01' - - # 示例权重分配 - stock_weights = [{'601222.SH': 0.5, '605090.SH': 0.3, '600025.SH': 0.2}] - - # 运行组合策略回测 - result = run_portfolio_backtest(stocks, start_date, end_date, stock_weights) - - # 打印回测结果 - result.plot() - result.display() diff --git a/src/combination/combination_dual_ma_strategy.py b/src/combination/combination_dual_ma_strategy.py deleted file mode 100644 index 1397979..0000000 --- a/src/combination/combination_dual_ma_strategy.py +++ /dev/null @@ -1,215 +0,0 @@ -import asyncio -import json -from datetime import datetime - -import bt -import numpy as np -import pandas as pd - -from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable -from src.models import wance_data_storage_backtest, wance_data_stock -from src.tortoises_orm_config import init_tortoise - - -# 双均线策略函数 -async def create_dual_ma_strategy(data, short_window: int = 50, long_window: int = 200): - # 生成权重 - weights = await dual_ma_strategy(data, short_window, long_window) - - # 使用bt框架构建组合策略 - strategy = bt.Strategy('组合双均线策略', - [bt.algos.RunDaily(), - bt.algos.SelectAll(), - bt.algos.WeighTarget(weights), # 根据信号调整权重 - bt.algos.Rebalance()]) - return strategy - - -async def dual_ma_strategy(df, short_window=20, long_window=50): - """ - 基于双均线策略生成买卖信号。 - - 参数: - df: pd.DataFrame, 股票的价格数据,行索引为日期,列为股票代码。 - short_window: int, 短期均线窗口期。 - long_window: int, 长期均线窗口期。 - - 返回: - signal: pd.DataFrame, 每只股票的买卖信号,1 表示买入,0 表示卖出。 - """ - # 计算短期均线和长期均线 - short_ma = df.rolling(window=short_window, min_periods=1).mean() - long_ma = df.rolling(window=long_window, min_periods=1).mean() - - # 生成买入信号 - buy_signal = (short_ma > long_ma).astype(int) - - # 计算权重(例如:均等权重) - weights = buy_signal.div(buy_signal.sum(axis=1), axis=0).fillna(0) - - return weights - - -async def storage_backtest_data(source_column_name, result, signal, stock_data_series, short_window, long_window): - await init_tortoise() - - # 要存储的字段列表 - fields_to_store = [ - 'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', - 'price', 'returns', 'data_start_time', 'data_end_time', - 'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', - 'max_drawdown', 'calmar', 'mtd', 'three_month', - 'six_month', 'ytd', 'one_year', 'three_year', - 'five_year', 'ten_year', 'incep', 'daily_sharpe', - 'daily_sortino', 'daily_mean', 'daily_vol', - 'daily_skew', 'daily_kurt', 'best_day', 'worst_day', - 'monthly_sharpe', 'monthly_sortino', 'monthly_mean', - 'monthly_vol', 'monthly_skew', 'monthly_kurt', - 'best_month', 'worst_month', 'yearly_sharpe', - 'yearly_sortino', 'yearly_mean', 'yearly_vol', - 'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year', - 'avg_drawdown', 'avg_drawdown_days', 'avg_up_month', - 'avg_down_month', 'win_year_perc', 'twelve_month_win_perc' - ] - - # 准备要存储的数据 - data_to_store = { - 'strategy_name': "组合双均线策略", - 'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign( - time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')), - 'daily_price': convert_pandas_to_json_serializable(result['daily_prices']), - 'price': convert_pandas_to_json_serializable(result['prices']), - 'returns': convert_pandas_to_json_serializable(result['returns'].fillna(0)), - 'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'), - 'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'), - 'backtest_end_time': int(datetime.now().strftime('%Y%m%d')), - 'position': convert_pandas_to_json_serializable(signal), - 'backtest_name': f'组合双均线策略 MA{short_window}-{long_window}日', - 'indicator_type': 'SMA', - 'indicator_information': json.dumps({'short_window': short_window, 'long_window': long_window}) - } - - # 使用循环填充其他字段 - for field in fields_to_store[12:]: # 从第10个字段开始 - value = result.stats.loc[field].iloc[0] - data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value - - # 检查是否存在该 backtest_name - existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=data_to_store['backtest_name'] - ).first() - - if existing_record: - # 如果存在,更新记录 - await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - id=existing_record.id - ).update(**data_to_store) - else: - # 如果不存在,创建新的记录 - await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) - - return data_to_store - - -async def run_sma_backtest(field_list: list, - stock_list: list, - period: str = '1d', - start_time: str = '', - end_time: str = '', - count: int = 100, - dividend_type: str = 'none', - fill_data: bool = True, - data_dir: str = '', - short_window: int = 50, - long_window: int = 200): - try: - # 初始化一个列表用于存储每只股票的回测结果字典 - results_list = [] - - # 获取股票数据 - data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, - fill_data, data_dir) - - # 创建组合数据 - combined_data = data[stock_list].copy() # 只选择需要的股票列 - - # 创建双均线策略 - strategy, signal = await create_dual_ma_strategy(combined_data, short_window=short_window, - long_window=long_window) - # 创建回测 - backtest = bt.Backtest(strategy=strategy, data=combined_data, initial_capital=100000) - # 运行回测 - result = bt.run(backtest) - - # 存储回测结果 - data_to_store = await storage_backtest_data(result, signal, combined_data, short_window, long_window) - results_list.append(data_to_store) - - return results_list # 返回结果列表 - - except Exception as e: - print(f"Error occurred: {e}") - - -async def start_sma_backtest_service(field_list: list, - stock_list: list, - period: str = '1d', - start_time: str = '', - end_time: str = '', - count: int = -1, - dividend_type: str = 'none', - fill_data: bool = True, - data_dir: str = '', - short_window: int = 50, - long_window: int = 200): - # 执行回测 - result = await run_sma_backtest( - field_list=field_list, - stock_list=stock_list, - period=period, - start_time=start_time, - end_time=end_time, - count=count, - dividend_type=dividend_type, - fill_data=fill_data, - data_dir=data_dir, - short_window=short_window, - long_window=long_window, - ) - return result - - -async def init_backtest_db(): - sma_list = [{"short_window": 5, "long_window": 10}, {"short_window": 10, "long_window": 30}, - {"short_window": 30, "long_window": 60}, {"short_window": 30, "long_window": 90}, - {"short_window": 70, "long_window": 140}, {"short_window": 120, "long_window": 250}] - await init_tortoise() - wance_db = await wance_data_stock.WanceDataStock.all() - sma_list_length = len(sma_list) - - for stock_code in wance_db: - for i in range(sma_list_length): - short_window = sma_list[i]['short_window'] - long_window = sma_list[i]['long_window'] - source_column_name = f'{stock_code.stock_code}' - - result = await start_sma_backtest_service( - field_list=[source_column_name], - stock_list=[stock_code.stock_code], - period='1d', - start_time='2022-01-01', - end_time='2022-09-01', - count=-1, - dividend_type='none', - fill_data=True, - data_dir='', - short_window=short_window, - long_window=long_window - ) - print( - f"Finished backtesting for {stock_code.stock_code} with short_window: {short_window}, long_window: {long_window}") - - -# 启动回测 -if __name__ == '__main__': - asyncio.run(init_backtest_db()) diff --git a/src/combination/combination_until.py b/src/combination/combination_until.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/combination/dual_moving_average.py b/src/combination/dual_moving_average.py index f8785ee..266fffd 100644 --- a/src/combination/dual_moving_average.py +++ b/src/combination/dual_moving_average.py @@ -6,23 +6,52 @@ import bt import numpy as np import pandas as pd -from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable -from src.models import wance_data_storage_backtest, wance_data_stock +from src.models import wance_data_storage_backtest, wance_data_stock, user_combination_history from src.tortoises_orm_config import init_tortoise +from src.utils.backtest_until import convert_pandas_to_json_serializable +from src.utils.combination_until import get_local_data -# 双均线策略函数 -async def create_dual_ma_strategy(data, stock_code: str, short_window: int = 50, long_window: int = 200): - # 生成双均线策略信号 - signal = await dual_ma_strategy(data, short_window, long_window) +# 组合双均线策略函数 +async def create_combined_sma_strategy(data, stock_weights_dict, short_window: int = 50, long_window: int = 200): + """ + 创建组合双均线策略,并根据股票权重进行回测。 - # 使用bt框架构建策略 - strategy = bt.Strategy(f'{stock_code} 双均线策略', - [bt.algos.RunDaily(), + 参数: + data: pd.DataFrame, 股票价格数据,行索引为日期,列为股票代码。 + stock_weights_dict: dict, 股票权重字典,键为股票代码,值为权重。 + short_window: int, 短期均线窗口期,默认为50。 + long_window: int, 长期均线窗口期,默认为200。 + + 返回: + strategy: bt.Strategy, 创建的双均线策略对象。 + combined_signal: pd.DataFrame, 每只股票的买卖信号数据框。 + """ + combined_signal = pd.DataFrame(index=data.index) # 初始化信号数据框 + + # 遍历每只股票代码,根据股票权重生成信号 + for stock_code in stock_weights_dict.keys(): + # 确保每个股票代码唯一,不要使用重复代码 + if f'{stock_code}' in data.columns: + stock_data_series = data[[f'{stock_code}']] # 提取对应股票的数据 + stock_data_series.columns = [f'{stock_code}'] # 重命名列 + + signal = await dual_ma_strategy(stock_data_series, short_window, long_window) # 生成信号 + combined_signal[stock_code] = signal # 存储信号 + else: + print(f"Warning: Stock code {stock_code} not found in data columns.") # 提示未找到的股票代码 + + # 将权重转换为 DataFrame,行索引为数据的时间,列为股票代码 + weights = pd.DataFrame(index=data.index, data=[stock_weights_dict] * len(data.index)) + + # 使用bt框架构建组合策略 + strategy = bt.Strategy('组合双均线策略', + [bt.algos.RunDaily(), # 每天运行 bt.algos.SelectAll(), # 选择所有股票 - bt.algos.WeighTarget(signal), # 根据信号调整权重 + bt.algos.WeighSpecified(**stock_weights_dict), # 根据权重调整股票权重 bt.algos.Rebalance()]) # 调仓 - return strategy, signal + + return strategy, combined_signal # 返回策略和信号 async def dual_ma_strategy(df, short_window=20, long_window=50): @@ -37,36 +66,39 @@ async def dual_ma_strategy(df, short_window=20, long_window=50): 返回: signal: pd.DataFrame, 每只股票的买卖信号,1 表示买入,0 表示卖出。 """ - # 计算短期均线和长期均线 - short_ma = df.rolling(window=short_window, min_periods=1).mean() - long_ma = df.rolling(window=long_window, min_periods=1).mean() + # 计算短期和长期均线 + short_ma = df.rolling(window=short_window, min_periods=1).mean() # 短期均线 + long_ma = df.rolling(window=long_window, min_periods=1).mean() # 长期均线 - # 生成买入信号: 当短期均线从下方穿过长期均线 - buy_signal = np.where(short_ma > long_ma, 1, np.nan) + # 生成买入和卖出信号 + buy_signal = np.where(short_ma > long_ma, 1, np.nan) # 买入信号 + sell_signal = np.where(short_ma < long_ma, 0, np.nan) # 卖出信号 - # 生成卖出信号: 当短期均线从上方穿过长期均线 - sell_signal = np.where(short_ma < long_ma, 0, np.nan) - - # 合并买卖信号 + # 将信号转换为DataFrame,并向前填充 signal = pd.DataFrame(buy_signal, index=df.index, columns=df.columns) - signal = np.where(short_ma < long_ma, 0, signal) + signal = np.where(short_ma < long_ma, 0, signal) # 卖出信号更新 + signal = pd.DataFrame(signal, index=df.index, columns=df.columns).ffill() # 前向填充信号 - # 前向填充信号,持仓不变 - signal = pd.DataFrame(signal, index=df.index, columns=df.columns).ffill() - - # 将剩余的 NaN 替换为 0 - signal = signal.fillna(0) - - return signal + signal = signal.fillna(0) # 将 NaN 替换为 0 + return signal # 返回信号 -async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, short_window, - long_window): - await init_tortoise() +async def storage_backtest_data(user_id, result, stock_data_series, source_column_name, signal, stock_weights, + strategy_parame, strategy_name,short_window,long_window): + """ + 将回测数据存储到数据库。 + + 参数: + user_id: int, 用户ID。 + stock_weights: dict, 股票权重字典。 + strategy_parame: dict, 策略参数字典。 + strategy_name: str, 策略名称。 + """ + await init_tortoise() # 初始化数据库连接 # 要存储的字段列表 fields_to_store = [ - 'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', + 'stock_weights', 'strategy_name', 'stock_close_price', 'daily_price', 'price', 'returns', 'data_start_time', 'data_end_time', 'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', 'max_drawdown', 'calmar', 'mtd', 'three_month', @@ -85,7 +117,7 @@ async def storage_backtest_data(source_column_name, result, signal, stock_code, # 准备要存储的数据 data_to_store = { - 'stock_code': stock_code, + 'stock_weights': stock_weights, 'strategy_name': "双均线策略", 'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign( time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')), @@ -96,7 +128,7 @@ async def storage_backtest_data(source_column_name, result, signal, stock_code, 'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'), 'backtest_end_time': int(datetime.now().strftime('%Y%m%d')), 'position': convert_pandas_to_json_serializable(signal), - 'backtest_name': f'{stock_code} 双均线策略 MA{short_window}-{long_window}日', + 'backtest_name': f'{stock_weights} 双均线策略 MA{short_window}-{long_window}日', 'indicator_type': 'SMA', 'indicator_information': json.dumps({'short_window': short_window, 'long_window': long_window}) } @@ -107,154 +139,178 @@ async def storage_backtest_data(source_column_name, result, signal, stock_code, data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value # 检查是否存在该 backtest_name - existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=data_to_store['backtest_name'] - ).first() + existing_record = await user_combination_history.UserCombinationHistory.filter(user_id=user_id).first() + + stock_weights_json = json.dumps(stock_weights) # 将权重字典转换为JSON格式 + strategy_parame_json = json.dumps(strategy_parame) # 将策略参数字典转换为JSON格式 if existing_record: # 如果存在,更新记录 - await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + await user_combination_history.UserCombinationHistory.filter( id=existing_record.id - ).update(**data_to_store) + ).update(user_id=user_id, + last_time=int(datetime.now().strftime('%Y%m%d')), # 更新时间 + stock_weights=stock_weights_json, + strategy_name=strategy_name, + strategy_parame=strategy_parame_json) else: # 如果不存在,创建新的记录 - await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) - + await user_combination_history.UserCombinationHistory.create(user_id=user_id, + last_time=int(datetime.now().strftime('%Y%m%d')), + stock_weights=stock_weights_json, + strategy_name=strategy_name, + strategy_parame=strategy_parame_json) return data_to_store -async def run_sma_backtest(field_list: list, - stock_list: list, - stock_weights: list, - user_id: int = 1, - period: str = '1d', - start_time: str = '', - end_time: str = '', - count: int = 100, - dividend_type: str = 'none', - fill_data: bool = True, - data_dir: str = '', - short_window: int = 50, - long_window: int = 200): +async def run_combined_sma_backtest(field_list: list, + stock_list: list, + stock_weights: list, + user_id: int = 1, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = 100, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 50, + long_window: int = 200): + """ + 运行组合双均线策略的回测。 + + 参数: + field_list: list, 需要获取的字段列表。 + stock_list: list, 股票代码列表。 + stock_weights: list, 股票权重列表。 + user_id: int, 用户ID,默认为1。 + period: str, 数据时间周期,默认为'1d'。 + start_time: str, 开始时间,默认为''(空字符串表示不限制)。 + end_time: str, 结束时间,默认为''(空字符串表示不限制)。 + count: int, 获取的数据数量,默认为100。 + dividend_type: str, 分红类型,默认为'none'。 + fill_data: bool, 是否填充缺失数据,默认为True。 + data_dir: str, 数据目录,默认为''(空字符串表示当前目录)。 + short_window: int, 短期均线窗口期,默认为50。 + long_window: int, 长期均线窗口期,默认为200。 + + 返回: + result: bt.BacktestResult, 回测结果。 + """ try: - # 初始化一个列表用于存储每只股票的回测结果字典 - results_list = [] - - # 遍历每只股票的数据(每列代表一个股票的收盘价) + # 获取本地数据 data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, - fill_data, - data_dir) + fill_data, data_dir) - for stock_code in stock_list: + # 将 stock_weights 转换为字典形式,提取第一个字典作为权重 + stock_weights_dict = stock_weights[0] if stock_weights else {} - data_column_name = f'close_{stock_code}' - source_column_name = f'{stock_code} 双均线策略' - backtest_name = f'{stock_code} 双均线策略 MA{short_window}-{long_window}日' - now_data = int(datetime.now().strftime('%Y%m%d')) - db_result_data = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=backtest_name) + # 创建组合双均线策略 + strategy, signal = await create_combined_sma_strategy(data, stock_weights_dict, short_window, long_window) - if db_result_data: - if db_result_data[0].backtest_end_time == now_data: - results_list.append({source_column_name: db_result_data[0]}) + # 创建回测 + backtest = bt.Backtest(strategy, data, initial_capital=100000) # 初始化回测对象 - if data_column_name in data.columns: - stock_data_series = data[[data_column_name]] # 提取该股票的收盘价 DataFrame - stock_data_series.columns = ['close'] # 重命名列为 'close' + strategy_parame = {"short_window": short_window, "long_window": long_window} # 策略参数 - # 创建双均线策略 - strategy, signal = await create_dual_ma_strategy(stock_data_series, stock_code, - short_window=short_window, - long_window=long_window) - # 创建回测 - backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000) - # 运行回测 - result = bt.run(backtest) - # 存储回测结果 - data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code, - stock_data_series, - short_window, long_window) - # # 绘制回测结果图表 - # result.plot() - # # 绘制个别股票数据图表 - # plt.figure(figsize=(12, 6)) - # plt.plot(stock_data_series.index, stock_data_series['close'], label='Stock Price') - # plt.title(f'Stock Price for {stock_code}') - # plt.xlabel('Date') - # plt.ylabel('Price') - # plt.legend() - # plt.grid(True) - # plt.show() - # 将结果存储为字典并添加到列表中 - results_list.append({source_column_name: data_to_store}) + strategy_name = "双均线策略" # 策略名称 - else: - print(f"数据中缺少列: {data_column_name}") + # 运行回测 + result = bt.run(backtest) # 执行回测 - return results_list # 返回结果列表 + # 存储回测数据 + result = await storage_backtest_data(user_id=user_id, result=result, stock_data_series=data, + source_column_name="组合双均线策略", + signal=signal, stock_weights=stock_weights_dict, + strategy_parame=strategy_parame, + strategy_name=strategy_name,short_window=short_window,long_window=long_window) + + return result # 返回回测结果 except Exception as e: - print(f"Error occurred: {e}") + print(f"Error occurred: {e}") # 打印错误信息 async def start_sma_combination_service(field_list: list, - stock_list: list, - stock_weights:list, - user_id:int = 1, - period: str = '1d', - start_time: str = '', - end_time: str = '', - count: int = -1, - dividend_type: str = 'none', - fill_data: bool = True, - data_dir: str = '', - short_window: int = 50, - long_window: int = 200): - for stock_code in stock_list: - backtest_name = f'{stock_code} 双均线策略 MA{short_window}-{long_window}日' - db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=backtest_name) - now_time = int(datetime.now().strftime('%Y%m%d')) + stock_list: list, + stock_weights: list, + user_id: int = 1, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = -1, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 50, + long_window: int = 200): + """ + 启动双均线组合策略服务。 - if db_result and db_result[0].backtest_end_time == now_time: - return db_result - else: - # 执行回测 - result = await run_sma_backtest( - field_list=field_list, - stock_list=stock_list, - period=period, - start_time=start_time, - end_time=end_time, - count=count, - dividend_type=dividend_type, - fill_data=fill_data, - data_dir=data_dir, - short_window=short_window, - long_window=long_window, - ) - return result + 参数: + field_list: list, 需要获取的字段列表。 + stock_list: list, 股票代码列表。 + stock_weights: list, 股票权重列表。 + user_id: int, 用户ID,默认为1。 + period: str, 数据时间周期,默认为'1d'。 + start_time: str, 开始时间,默认为''(空字符串表示不限制)。 + end_time: str, 结束时间,默认为''(空字符串表示不限制)。 + count: int, 获取的数据数量,默认为-1(表示不限制)。 + dividend_type: str, 分红类型,默认为'none'。 + fill_data: bool, 是否填充缺失数据,默认为True。 + data_dir: str, 数据目录,默认为''(空字符串表示当前目录)。 + short_window: int, 短期均线窗口期,默认为50。 + long_window: int, 长期均线窗口期,默认为200。 + + 返回: + result: bt.BacktestResult, 回测结果。 + """ + result = await run_combined_sma_backtest( + field_list=field_list, + stock_list=stock_list, + user_id=user_id, + stock_weights=stock_weights, + period=period, + start_time=start_time, + end_time=end_time, + count=count, + dividend_type=dividend_type, + fill_data=fill_data, + data_dir=data_dir, + short_window=short_window, + long_window=long_window, + ) + return result # 返回回测结果 async def init_backtest_db(): + """ + 初始化回测数据库,针对每个股票和不同的均线组合运行回测。 + + """ + # 定义短期和长期均线组合 sma_list = [{"short_window": 5, "long_window": 10}, {"short_window": 10, "long_window": 30}, {"short_window": 30, "long_window": 60}, {"short_window": 30, "long_window": 90}, {"short_window": 70, "long_window": 140}, {"short_window": 120, "long_window": 250}] - await init_tortoise() - wance_db = await wance_data_stock.WanceDataStock.all() - sma_list_lenght = len(sma_list) + await init_tortoise() # 初始化数据库连接 + wance_db = await wance_data_stock.WanceDataStock.all() # 获取所有股票数据 + sma_list_length = len(sma_list) # 获取均线组合的数量 + + # 遍历每只股票,针对每个均线组合运行回测 for stock_code in wance_db: - for i in range(sma_list_lenght): - short_window = sma_list[i]['short_window'] - long_window = sma_list[i]['long_window'] - source_column_name = f'{stock_code} 双均线策略 MA{short_window}-{long_window}日' - result = await run_sma_backtest(field_list=['close', 'time'], - stock_list=[stock_code.stock_code], - short_window=short_window, - long_window=long_window) + for i in range(sma_list_length): + short_window = sma_list[i]['short_window'] # 短期均线 + long_window = sma_list[i]['long_window'] # 长期均线 + source_column_name = f'{stock_code} 双均线策略 MA{short_window}-{long_window}日' # 结果列名称 - print(f"回测成功 {source_column_name}") + # 运行回测 + result = await run_combined_sma_backtest(field_list=['close', 'time'], + stock_list=[stock_code.stock_code], + short_window=short_window, + long_window=long_window) + + print(f"回测成功 {source_column_name}") # 打印回测成功信息 if __name__ == '__main__': @@ -265,4 +321,10 @@ if __name__ == '__main__': # long_window=30)) # 初始化数据库表 - asyncio.run(init_backtest_db()) + # asyncio.run(init_backtest_db()) + asyncio.run(run_combined_sma_backtest(field_list=['close', 'time'], + stock_list=["688031.SH", "600025.SH", "601222.SH"], + user_id=1, + stock_weights=[{"688031.SH": 0.5, "600025.SH": 0.3, "601222.SH": 0.2}], + short_window=10, + long_window=30)) diff --git a/src/combination/macd_strategy.py b/src/combination/macd_strategy.py new file mode 100644 index 0000000..3030e29 --- /dev/null +++ b/src/combination/macd_strategy.py @@ -0,0 +1,254 @@ +import asyncio +import json +from datetime import datetime + +import bt +import numpy as np +import pandas as pd + +from src.models import wance_data_storage_backtest, wance_data_stock, user_combination_history +from src.tortoises_orm_config import init_tortoise +from src.utils.combination_until import get_local_data + + +# 创建组合MACD策略函数 +async def create_combined_macd_strategy(data, stock_weights_dict, short_window: int = 12, long_window: int = 26, signal_window: int = 9): + """ + 创建组合MACD策略,并根据股票权重进行回测。 + + 参数: + data: pd.DataFrame, 股票价格数据,行索引为日期,列为股票代码。 + stock_weights_dict: dict, 股票权重字典,键为股票代码,值为相应权重。 + short_window: int, 短期EMA窗口期,默认为12。 + long_window: int, 长期EMA窗口期,默认为26。 + signal_window: int, 信号线窗口期,默认为9。 + + 返回: + strategy: bt.Strategy, 创建的组合MACD策略。 + combined_signal: pd.DataFrame, 各股票的买卖信号。 + """ + combined_signal = pd.DataFrame(index=data.index) # 创建一个空的DataFrame用于存放买卖信号 + + # 遍历每个股票代码,计算对应的MACD信号 + for stock_code in stock_weights_dict.keys(): + if stock_code in data.columns: + stock_data_series = data[[stock_code]] # 获取该股票的价格数据 + stock_data_series.columns = [stock_code] # 重命名列 + + signal = await macd_strategy(stock_data_series, short_window, long_window, signal_window) # 计算MACD信号 + combined_signal[stock_code] = signal # 将信号存入combined_signal + else: + print(f"Warning: Stock code {stock_code} not found in data columns.") # 如果股票代码不存在,发出警告 + + # 将权重转换为 DataFrame,行索引为数据的时间,列为股票代码 + weights = pd.DataFrame(index=data.index, data=[stock_weights_dict] * len(data.index)) + + # 使用bt框架构建组合策略 + strategy = bt.Strategy('组合MACD策略', + [bt.algos.RunDaily(), # 每日运行策略 + bt.algos.SelectAll(), # 选择所有股票 + bt.algos.WeighSpecified(**stock_weights_dict), # 按照指定的权重分配 + bt.algos.Rebalance()]) # 调整持仓 + + return strategy, combined_signal # 返回创建的策略和买卖信号 + + +# MACD策略生成买卖信号 +async def macd_strategy(df, short_window=12, long_window=26, signal_window=9): + """ + 基于MACD策略生成买卖信号。 + + 参数: + df: pd.DataFrame, 股票的价格数据,行索引为日期,列为股票代码。 + short_window: int, 短期EMA窗口期,默认为12。 + long_window: int, 长期EMA窗口期,默认为26。 + signal_window: int, 信号线窗口期,默认为9。 + + 返回: + signal: pd.DataFrame, 每只股票的买卖信号,1 表示买入,-1 表示卖出。 + """ + # 计算短期和长期EMA + short_ema = df.ewm(span=short_window, adjust=False).mean() # 短期指数移动平均 + long_ema = df.ewm(span=long_window, adjust=False).mean() # 长期指数移动平均 + + # 计算MACD和信号线 + macd = short_ema - long_ema # MACD值 + signal_line = macd.ewm(span=signal_window, adjust=False).mean() # 信号线 + + # 生成买卖信号 + signal = np.where(macd > signal_line, 1, 0) # MACD上穿信号线 -> 买入 + signal = np.where(macd < signal_line, -1, signal) # MACD下穿信号线 -> 卖出 + signal = pd.DataFrame(signal, index=df.index, columns=df.columns).ffill() # 前向填充信号 + + signal = signal.fillna(0) # 将所有NaN替换为0 + return signal # 返回买卖信号 + + +# 保存回测数据 +async def storage_backtest_data(user_id, stock_weights, strategy_parame, strategy_name): + """ + 将回测结果保存到数据库。 + + 参数: + user_id: int, 用户ID。 + stock_weights: list, 股票权重。 + strategy_parame: dict, 策略参数。 + strategy_name: str, 策略名称。 + """ + await init_tortoise() # 初始化Tortoise ORM + + # 检查是否存在该 backtest_name + existing_record = await user_combination_history.UserCombinationHistory.filter(user_id=user_id).first() + + # 将股票权重和策略参数转换为JSON格式 + stock_weights_json = json.dumps(stock_weights) + strategy_parame_json = json.dumps(strategy_parame) + + if existing_record: # 如果存在记录,则更新 + await user_combination_history.UserCombinationHistory.filter( + id=existing_record.id + ).update(user_id=user_id, + last_time=int(datetime.now().strftime('%Y%m%d')), + stock_weights=stock_weights_json, + strategy_name=strategy_name, + strategy_parame=strategy_parame_json) + else: # 如果不存在记录,则创建新的记录 + await user_combination_history.UserCombinationHistory.create(user_id=user_id, + last_time=int(datetime.now().strftime('%Y%m%d')), + stock_weights=stock_weights_json, + strategy_name=strategy_name, + strategy_parame=strategy_parame_json) + + +# 运行组合MACD策略回测 +async def run_combined_macd_backtest(field_list: list, + stock_list: list, + stock_weights: list, + user_id: int = 1, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = 100, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 12, + long_window: int = 26, + signal_window: int = 9): + """ + 运行组合MACD策略的回测。 + + 参数: + field_list: list, 需要获取的字段列表。 + stock_list: list, 股票代码列表。 + stock_weights: list, 股票权重列表。 + user_id: int, 用户ID,默认为1。 + period: str, 数据时间周期,默认为'1d'。 + start_time: str, 开始时间,默认为''(空字符串表示不限制)。 + end_time: str, 结束时间,默认为''(空字符串表示不限制)。 + count: int, 获取的数据数量,默认为100。 + dividend_type: str, 分红类型,默认为'none'。 + fill_data: bool, 是否填充缺失数据,默认为True。 + data_dir: str, 数据目录,默认为''(空字符串表示当前目录)。 + short_window: int, 短期EMA窗口期,默认为12。 + long_window: int, 长期EMA窗口期,默认为26。 + signal_window: int, 信号线窗口期,默认为9。 + + 返回: + result: bt.BacktestResult, 回测结果。 + """ + try: + # 获取本地数据 + data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, + fill_data, data_dir) + + # 将 stock_weights 转换为字典形式,提取第一个字典作为权重 + stock_weights_dict = stock_weights[0] if stock_weights else {} + + # 创建组合MACD策略 + strategy, signal = await create_combined_macd_strategy(data, stock_weights_dict, short_window, long_window, signal_window) + + # 创建回测 + backtest = bt.Backtest(strategy, data, initial_capital=100000) # 使用初始资本创建回测 + + # 策略参数和名称 + strategy_parame = {"short_window": short_window, "long_window": long_window, "signal_window": signal_window} + strategy_name = "MACD策略" + + # 运行回测 + result = bt.run(backtest) # 运行回测 + + # 保存回测数据 + await storage_backtest_data(user_id, stock_weights_dict, strategy_parame, strategy_name) + + return result # 返回回测结果 + + except Exception as e: + print(f"Error occurred: {e}") # 捕获并打印异常 + + +# 启动MACD策略服务 +async def start_macd_combination_service(field_list: list, + stock_list: list, + stock_weights: list, + user_id: int = 1, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = -1, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 12, + long_window: int = 26, + signal_window: int = 9): + """ + 启动MACD策略组合服务。 + + 参数: + field_list: list, 需要获取的字段列表。 + stock_list: list, 股票代码列表。 + stock_weights: list, 股票权重列表。 + user_id: int, 用户ID,默认为1。 + period: str, 数据时间周期,默认为'1d'。 + start_time: str, 开始时间,默认为''(空字符串表示不限制)。 + end_time: str, 结束时间,默认为''(空字符串表示不限制)。 + count: int, 获取的数据数量,默认为-1(表示获取所有可用数据)。 + dividend_type: str, 分红类型,默认为'none'。 + fill_data: bool, 是否填充缺失数据,默认为True。 + data_dir: str, 数据目录,默认为''(空字符串表示当前目录)。 + short_window: int, 短期EMA窗口期,默认为12。 + long_window: int, 长期EMA窗口期,默认为26。 + signal_window: int, 信号线窗口期,默认为9。 + + 返回: + result: bt.BacktestResult, 回测结果。 + """ + result = await run_combined_macd_backtest( + field_list=field_list, + stock_list=stock_list, + user_id=user_id, + stock_weights=stock_weights, + period=period, + start_time=start_time, + end_time=end_time, + count=count, + dividend_type=dividend_type, + fill_data=fill_data, + data_dir=data_dir, + short_window=short_window, + long_window=long_window, + signal_window=signal_window + ) + return result # 返回回测结果 + + +if __name__ == '__main__': + # 运行回测 + asyncio.run(run_combined_macd_backtest(field_list=['close', 'time'], + stock_list=["688031.SH", "600025.SH", "601222.SH"], + user_id=1, + stock_weights=[{"688031.SH": 0.5, "600025.SH": 0.3, "601222.SH": 0.2}], + short_window=12, + long_window=26, + signal_window=9)) diff --git a/src/combination/reverse_dual_ma_strategy.py b/src/combination/reverse_dual_ma_strategy.py index 84926d7..4766eaf 100644 --- a/src/combination/reverse_dual_ma_strategy.py +++ b/src/combination/reverse_dual_ma_strategy.py @@ -6,256 +6,249 @@ import bt import numpy as np import pandas as pd -from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable -from src.models import wance_data_storage_backtest, wance_data_stock +from src.models import wance_data_storage_backtest, wance_data_stock, user_combination_history from src.tortoises_orm_config import init_tortoise +from src.utils.combination_until import get_local_data - -# 反双均线策略函数 -async def create_dual_ma_strategy(data, stock_code: str, short_window: int = 50, long_window: int = 200): - # 生成反双均线策略信号 - signal = await reverse_dual_ma_strategy(data, short_window, long_window) - - # 使用bt框架构建策略 - strategy = bt.Strategy(f'{stock_code} 反双均线策略', - [bt.algos.RunDaily(), - bt.algos.SelectAll(), # 选择所有股票 - bt.algos.WeighTarget(signal), # 根据信号调整权重 - bt.algos.Rebalance()]) # 调仓 - return strategy, signal - - - -# 定义反反双均线策略的函数 -def reverse_dual_ma_strategy(data, short_window=50, long_window=200): +# 组合反双均线策略函数 +async def create_combined_sma_strategy(data, stock_weights_dict, short_window: int = 50, long_window: int = 200): """ - 反反双均线策略,当短期均线跌破长期均线时买入,穿过长期均线时卖出。 + 创建组合反双均线策略,并根据股票权重进行回测。 参数: - data: pd.DataFrame, 股票的价格数据,行索引为日期,列为股票代码。 - short_window: int, 短期均线的窗口期。 - long_window: int, 长期均线的窗口期。 + data: pd.DataFrame, 包含股票的历史价格数据,行索引为时间,列为股票代码。 + stock_weights_dict: dict, 包含每只股票的权重。 + short_window: int, 短期均线的计算窗口,默认为50。 + long_window: int, 长期均线的计算窗口,默认为200。 + + 返回: + strategy: bt.Strategy, 创建的组合策略。 + combined_signal: pd.DataFrame, 每只股票的买卖信号。 + """ + # 创建一个 DataFrame 用于存储组合信号,索引为数据的日期 + combined_signal = pd.DataFrame(index=data.index) + + # 遍历每只股票,根据其权重生成信号 + for stock_code in stock_weights_dict.keys(): + # 确保每个股票代码在数据中唯一 + if f'{stock_code}' in data.columns: + # 选择对应股票的数据列 + stock_data_series = data[[f'{stock_code}']] + stock_data_series.columns = [f'{stock_code}'] + + # 调用反双均线策略生成买卖信号 + signal = await reverse_dual_ma_strategy(stock_data_series, short_window, long_window) + # 将生成的信号添加到组合信号 DataFrame 中 + combined_signal[stock_code] = signal + else: + print(f"Warning: Stock code {stock_code} not found in data columns.") + + # 将权重字典转换为 DataFrame,行索引为数据的时间,列为股票代码 + weights = pd.DataFrame(index=data.index, data=[stock_weights_dict] * len(data.index)) + + # 使用 bt 框架构建组合策略 + strategy = bt.Strategy('组合反双均线策略', + [bt.algos.RunDaily(), # 每天运行策略 + bt.algos.SelectAll(), # 选择所有股票 + bt.algos.WeighSpecified(**stock_weights_dict), # 使用指定的权重 + bt.algos.Rebalance()]) # 调仓 + + return strategy, combined_signal + + +# 反双均线策略生成买卖信号 +async def reverse_dual_ma_strategy(df, short_window=20, long_window=50): + """ + 基于反双均线策略生成买卖信号。 + + 参数: + df: pd.DataFrame, 股票的价格数据,行索引为日期,列为股票代码。 + short_window: int, 短期均线窗口期。 + long_window: int, 长期均线窗口期。 返回: signal: pd.DataFrame, 每只股票的买卖信号,1 表示买入,0 表示卖出。 """ - # 计算短期均线和长期均线 - short_ma = data.rolling(window=short_window).mean() - long_ma = data.rolling(window=long_window).mean() + # 计算短期和长期均线 + short_ma = df.rolling(window=short_window, min_periods=1).mean() + long_ma = df.rolling(window=long_window, min_periods=1).mean() - # 初始化信号 DataFrame - signal = pd.DataFrame(index=data.index, columns=data.columns) + # 反双均线策略的买卖信号生成规则 + buy_signal = np.where(short_ma < long_ma, 1, np.nan) # 短期均线跌破长期均线 -> 买入信号 + sell_signal = np.where(short_ma > long_ma, 0, np.nan) # 短期均线上穿长期均线 -> 卖出信号 - # 生成买入信号:短期均线从上往下穿过长期均线 - for column in data.columns: - signal[column] = (short_ma[column] < long_ma[column]).astype(int) # 跌破时买入,信号为1 - signal[column] = (short_ma[column] > long_ma[column]).astype(int) * -1 + signal[column] # 穿过时卖出,信号为0 - - # 前向填充信号,保持持仓不变 - signal = signal.ffill() + # 创建信号 DataFrame + signal = pd.DataFrame(buy_signal, index=df.index, columns=df.columns) + # 将卖出信号应用到信号 DataFrame 中 + signal = np.where(short_ma > long_ma, 0, signal) + signal = pd.DataFrame(signal, index=df.index, columns=df.columns).ffill() # 向前填充信号 + signal = signal.fillna(0) # 将 NaN 转换为 0 return signal -async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, short_window, - long_window): - await init_tortoise() +# 保存回测数据 +async def storage_backtest_data(user_id, stock_weights, strategy_parame, strategy_name): + """ + 保存回测数据到数据库。 - # 要存储的字段列表 - fields_to_store = [ - 'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', - 'price', 'returns', 'data_start_time', 'data_end_time', - 'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', - 'max_drawdown', 'calmar', 'mtd', 'three_month', - 'six_month', 'ytd', 'one_year', 'three_year', - 'five_year', 'ten_year', 'incep', 'daily_sharpe', - 'daily_sortino', 'daily_mean', 'daily_vol', - 'daily_skew', 'daily_kurt', 'best_day', 'worst_day', - 'monthly_sharpe', 'monthly_sortino', 'monthly_mean', - 'monthly_vol', 'monthly_skew', 'monthly_kurt', - 'best_month', 'worst_month', 'yearly_sharpe', - 'yearly_sortino', 'yearly_mean', 'yearly_vol', - 'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year', - 'avg_drawdown', 'avg_drawdown_days', 'avg_up_month', - 'avg_down_month', 'win_year_perc', 'twelve_month_win_perc' - ] - - # 准备要存储的数据 - data_to_store = { - 'stock_code': stock_code, - 'strategy_name': "反双均线策略", - 'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign( - time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')), - 'daily_price': convert_pandas_to_json_serializable(result[source_column_name].daily_prices), - 'price': convert_pandas_to_json_serializable(result[source_column_name].prices), - 'returns': convert_pandas_to_json_serializable(result[source_column_name].returns.fillna(0)), - 'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'), - 'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'), - 'backtest_end_time': int(datetime.now().strftime('%Y%m%d')), - 'position': convert_pandas_to_json_serializable(signal), - 'backtest_name': f'{stock_code} 反双均线策略 MA{short_window}-{long_window}日', - 'indicator_type': 'reverse_SMA', - 'indicator_information': json.dumps({'short_window': short_window, 'long_window': long_window}) - } - - # 使用循环填充其他字段 - for field in fields_to_store[12:]: # 从第10个字段开始 - value = result.stats.loc[field].iloc[0] - data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value + 参数: + user_id: int, 用户的 ID。 + stock_weights: dict, 股票权重。 + strategy_parame: dict, 策略参数。 + strategy_name: str, 策略名称。 + """ + await init_tortoise() # 初始化 Tortoise ORM # 检查是否存在该 backtest_name - existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=data_to_store['backtest_name'] - ).first() + existing_record = await user_combination_history.UserCombinationHistory.filter(user_id=user_id).first() + + # 将权重和策略参数转换为 JSON 格式 + stock_weights_json = json.dumps(stock_weights) + strategy_parame_json = json.dumps(strategy_parame) if existing_record: - # 如果存在,更新记录 - await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + # 如果记录已存在,更新记录 + await user_combination_history.UserCombinationHistory.filter( id=existing_record.id - ).update(**data_to_store) + ).update(user_id=user_id, + last_time=int(datetime.now().strftime('%Y%m%d')), + stock_weights=stock_weights_json, + strategy_name=strategy_name, + strategy_parame=strategy_parame_json) else: - # 如果不存在,创建新的记录 - await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) - - return data_to_store + # 如果记录不存在,创建新的记录 + await user_combination_history.UserCombinationHistory.create(user_id=user_id, + last_time=int(datetime.now().strftime('%Y%m%d')), + stock_weights=stock_weights_json, + strategy_name=strategy_name, + strategy_parame=strategy_parame_json) -async def run_reverse_reverse_SMA_backtest(field_list: list, - stock_list: list, - period: str = '1d', - start_time: str = '', - end_time: str = '', - count: int = 100, - dividend_type: str = 'none', - fill_data: bool = True, - data_dir: str = '', - short_window: int = 50, - long_window: int = 200): +# 运行组合反双均线策略回测 +async def run_combined_sma_backtest(field_list: list, + stock_list: list, + stock_weights: list, + user_id: int = 1, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = 100, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 50, + long_window: int = 200): + """ + 运行组合反双均线策略的回测。 + + 参数: + field_list: list, 包含需要获取的字段,如收盘价、时间等。 + stock_list: list, 需要回测的股票列表。 + stock_weights: list, 包含每只股票的权重字典。 + user_id: int, 用户 ID,默认为 1。 + period: str, 数据时间周期,默认为 '1d'。 + start_time: str, 开始时间,默认为 ''(空字符串表示不限制)。 + end_time: str, 结束时间,默认为 ''(空字符串表示不限制)。 + count: int, 获取的数据数量,默认为 100。 + dividend_type: str, 分红类型,默认为 'none'。 + fill_data: bool, 是否填充缺失数据,默认为 True。 + data_dir: str, 数据目录,默认为 ''(空字符串表示当前目录)。 + short_window: int, 短期均线窗口,默认为 50。 + long_window: int, 长期均线窗口,默认为 200。 + + 返回: + result: bt.BacktestResult, 回测结果。 + """ try: - # 初始化一个列表用于存储每只股票的回测结果字典 - results_list = [] - - # 遍历每只股票的数据(每列代表一个股票的收盘价) + # 获取本地数据 data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, - fill_data, - data_dir) + fill_data, data_dir) - for stock_code in stock_list: + # 将 stock_weights 转换为字典形式,提取第一个字典作为权重 + stock_weights_dict = stock_weights[0] if stock_weights else {} - data_column_name = f'close_{stock_code}' - source_column_name = f'{stock_code} 反双均线策略' - backtest_name = f'{stock_code} 反双均线策略 MA{short_window}-{long_window}日' - now_data = int(datetime.now().strftime('%Y%m%d')) - db_result_data = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=backtest_name) + # 创建组合反双均线策略 + strategy, signal = await create_combined_sma_strategy(data, stock_weights_dict, short_window, long_window) - if db_result_data: - if db_result_data[0].backtest_end_time == now_data: - results_list.append({source_column_name: db_result_data[0]}) + # 创建回测实例 + backtest = bt.Backtest(strategy, data, initial_capital=100000) - elif data_column_name in data.columns: - stock_data_series = data[[data_column_name]] # 提取该股票的收盘价 DataFrame - stock_data_series.columns = ['close'] # 重命名列为 'close' + # 策略参数 + strategy_parame = {"short_window": short_window, "long_window": long_window} - # 创建反双均线策略 - strategy, signal = await create_dual_ma_strategy(stock_data_series, stock_code, - short_window=short_window, - long_window=long_window) - # 创建回测 - backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000) - # 运行回测 - result = bt.run(backtest) - # 存储回测结果 - data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code, - stock_data_series, - short_window, long_window) - # # 绘制回测结果图表 - # result.plot() - # # 绘制个别股票数据图表 - # plt.figure(figsize=(12, 6)) - # plt.plot(stock_data_series.index, stock_data_series['close'], label='Stock Price') - # plt.title(f'Stock Price for {stock_code}') - # plt.xlabel('Date') - # plt.ylabel('Price') - # plt.legend() - # plt.grid(True) - # plt.show() - # 将结果存储为字典并添加到列表中 - results_list.append({source_column_name: data_to_store}) + strategy_name = "反双均线策略" - else: - print(f"数据中缺少列: {data_column_name}") + # 运行回测 + result = bt.run(backtest) - return results_list # 返回结果列表 + # 保存回测数据 + await storage_backtest_data(user_id, stock_weights_dict, strategy_parame, strategy_name) + + return result except Exception as e: print(f"Error occurred: {e}") +# 启动反双均线策略服务 async def start_reverse_SMA_combination_service(field_list: list, - stock_list: list, - period: str = '1d', - start_time: str = '', - end_time: str = '', - count: int = -1, - dividend_type: str = 'none', - fill_data: bool = True, - data_dir: str = '', - short_window: int = 50, - long_window: int = 200): - for stock_code in stock_list: - backtest_name = f'{stock_code} 反双均线策略 MA{short_window}-{long_window}日' - db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=backtest_name) - now_time = int(datetime.now().strftime('%Y%m%d')) + stock_list: list, + stock_weights: list, + user_id: int = 1, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = -1, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 50, + long_window: int = 200): + """ + 启动反双均线策略服务,运行回测并返回结果。 - if db_result and db_result[0].backtest_end_time == now_time: - return db_result - else: - # 执行回测 - result = await run_reverse_reverse_SMA_backtest( - field_list=field_list, - stock_list=stock_list, - period=period, - start_time=start_time, - end_time=end_time, - count=count, - dividend_type=dividend_type, - fill_data=fill_data, - data_dir=data_dir, - short_window=short_window, - long_window=long_window, - ) - return result + 参数: + field_list: list, 包含需要获取的字段。 + stock_list: list, 需要回测的股票列表。 + stock_weights: list, 股票权重列表。 + user_id: int, 用户 ID,默认为 1。 + period: str, 数据时间周期,默认为 '1d'。 + start_time: str, 开始时间,默认为 ''(空字符串表示不限制)。 + end_time: str, 结束时间,默认为 ''(空字符串表示不限制)。 + count: int, 获取的数据数量,默认为 -1(表示全部)。 + dividend_type: str, 分红类型,默认为 'none'。 + fill_data: bool, 是否填充缺失数据,默认为 True。 + data_dir: str, 数据目录,默认为 ''(空字符串表示当前目录)。 + short_window: int, 短期均线窗口,默认为 50。 + long_window: int, 长期均线窗口,默认为 200。 - -async def init_backtest_db(): - reverse_SMA_list = [{"short_window": 5, "long_window": 10}, {"short_window": 10, "long_window": 30}, - {"short_window": 30, "long_window": 60}, {"short_window": 30, "long_window": 90}, - {"short_window": 70, "long_window": 140}, {"short_window": 120, "long_window": 250}] - await init_tortoise() - wance_db = await wance_data_stock.WanceDataStock.all() - reverse_SMA_list_lenght = len(reverse_SMA_list) - - for stock_code in wance_db: - for i in range(reverse_SMA_list_lenght): - short_window = reverse_SMA_list[i]['short_window'] - long_window = reverse_SMA_list[i]['long_window'] - source_column_name = f'{stock_code} 反双均线策略 MA{short_window}-{long_window}日' - result = await run_reverse_reverse_SMA_backtest(field_list=['close', 'time'], - stock_list=[stock_code.stock_code], - short_window=short_window, - long_window=long_window) - - print(f"回测成功 {source_column_name}") + 返回: + result: bt.BacktestResult, 回测结果。 + """ + result = await run_combined_sma_backtest( + field_list=field_list, + stock_list=stock_list, + user_id=user_id, + stock_weights=stock_weights, + period=period, + start_time=start_time, + end_time=end_time, + count=count, + dividend_type=dividend_type, + fill_data=fill_data, + data_dir=data_dir, + short_window=short_window, + long_window=long_window, + ) + return result if __name__ == '__main__': - # 测试类的回测 - # asyncio.run(run_reverse_SMA_backtest(field_list=['close', 'time'], - # stock_list=['601222.SH', '601677.SH'], - # short_window=10, - # long_window=30)) - - # 初始化数据库表 - asyncio.run(init_backtest_db()) + # 运行回测 + asyncio.run(run_combined_sma_backtest(field_list=['close', 'time'], + stock_list=["688031.SH", "600025.SH", "601222.SH"], + user_id=1, + stock_weights=[{"688031.SH": 0.5, "600025.SH": 0.3, "601222.SH": 0.2}], + short_window=10, + long_window=30)) diff --git a/src/combination/router.py b/src/combination/router.py index 81c83d9..fe0dac3 100644 --- a/src/combination/router.py +++ b/src/combination/router.py @@ -34,7 +34,3 @@ async def stock_chart(request: BackRequest): benchmark_code=request.benchmark_code) return result - -@router.get('/combination') -async def combination(request: BackRequest): - await combination_service() diff --git a/src/combination/rsi_strategy.py b/src/combination/rsi_strategy.py index 7501286..248f56e 100644 --- a/src/combination/rsi_strategy.py +++ b/src/combination/rsi_strategy.py @@ -2,292 +2,261 @@ import asyncio import json from datetime import datetime -import bt +import bt # 引入bt框架以进行回测 import numpy as np import pandas as pd -from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable -from src.models import wance_data_storage_backtest, wance_data_stock -from src.tortoises_orm_config import init_tortoise +from src.models import wance_data_storage_backtest, wance_data_stock, user_combination_history # 导入数据库模型 +from src.tortoises_orm_config import init_tortoise # 初始化数据库 +from src.utils.combination_until import get_local_data # 获取本地数据的工具函数 -# RSI策略函数 -async def create_dual_ma_strategy(data, stock_code: str, short_window: int = 50, long_window: int = 200, - overbought: int = 70, oversold: int = 30): - # 生成RSI策略信号 - signal = await rsi_strategy(data, short_window, long_window, overbought, oversold) - - # 使用bt框架构建策略 - strategy = bt.Strategy(f'{stock_code} RSI策略', - [bt.algos.RunDaily(), - bt.algos.SelectAll(), # 选择所有股票 - bt.algos.WeighTarget(signal), # 根据信号调整权重 - bt.algos.Rebalance()]) # 调仓 - return strategy, signal - - -async def rsi_strategy(df, short_window=14, long_window=28, overbought=70, oversold=30): +# 创建RSI策略函数 +async def create_rsi_strategy(data, stock_weights_dict, rsi_period: int = 14, overbought: float = 70, + oversold: float = 30): """ - 基于RSI的策略生成买卖信号。 + 创建组合RSI策略,并根据股票权重进行回测。 + + 参数: + data: pd.DataFrame, 股票的价格数据,行索引为日期,列为股票代码。 + stock_weights_dict: dict, 每只股票的权重字典。 + rsi_period: int, 用于计算RSI的周期,默认值为14。 + overbought: float, 超买阈值,默认值为70。 + oversold: float, 超卖阈值,默认值为30。 + + 返回: + strategy: bt.Strategy, 构建的RSI策略。 + combined_signal: pd.DataFrame, 股票的买卖信号。 + """ + # 创建一个数据框用于存储所有股票的买卖信号 + combined_signal = pd.DataFrame(index=data.index) + + # 遍历每只股票,生成RSI信号 + for stock_code in stock_weights_dict.keys(): + if f'{stock_code}' in data.columns: + # 提取每只股票的数据 + stock_data_series = data[[f'{stock_code}']] + stock_data_series.columns = [f'{stock_code}'] + + # 调用RSI策略计算买卖信号 + signal = await rsi_strategy(stock_data_series, rsi_period, overbought, oversold) + combined_signal[stock_code] = signal # 存储信号 + else: + print(f"Warning: Stock code {stock_code} not found in data columns.") # 股票未在数据中,发出警告 + + # 创建权重数据框 + weights = pd.DataFrame(index=data.index, data=[stock_weights_dict] * len(data.index)) + + # 使用bt框架构建RSI策略 + strategy = bt.Strategy('组合RSI策略', + [bt.algos.RunDaily(), # 每天运行策略 + bt.algos.SelectAll(), # 选择所有可用的股票 + bt.algos.WeighSpecified(**stock_weights_dict), # 根据给定权重进行加权 + bt.algos.Rebalance()]) # 调整投资组合 + + return strategy, combined_signal # 返回构建的策略和信号 + + +async def rsi_strategy(df, period=14, overbought=70, oversold=30): + """ + 基于RSI策略生成买卖信号。 参数: df: pd.DataFrame, 股票的价格数据,行索引为日期,列为股票代码。 - short_window: int, 短期RSI的窗口期。 - long_window: int, 长期RSI的窗口期。 - overbought: int, 超买水平。 - oversold: int, 超卖水平。 + period: int, RSI计算的周期,默认值为14。 + overbought: float, 超买阈值,默认值为70。 + oversold: float, 超卖阈值,默认值为30。 返回: signal: pd.DataFrame, 每只股票的买卖信号,1 表示买入,0 表示卖出。 """ - delta = df.diff().fillna(0) + # 计算价格变化量 + delta = df.diff() + # 计算上涨和下跌的平均值 + gain = (delta.where(delta > 0, 0)).rolling(window=period).mean() # 上涨的平均值 + loss = (-delta.where(delta < 0, 0)).rolling(window=period).mean() # 下跌的平均值 - gain = (delta.where(delta > 0, 0).rolling(window=short_window).mean()).fillna(0) - loss = (-delta.where(delta < 0, 0).rolling(window=short_window).mean()).fillna(0) - - short_rsi = (100 - (100 / (1 + (gain / loss)))).fillna(0) - - long_gain = (delta.where(delta > 0, 0).rolling(window=long_window).mean()).fillna(0) - long_loss = (-delta.where(delta < 0, 0).rolling(window=long_window).mean()).fillna(0) - - long_rsi = (100 - (100 / (1 + (long_gain / long_loss)))).fillna(0) + # 计算相对强弱指数RS和RSI + rs = gain / loss + rsi = 100 - (100 / (1 + rs)) # 计算RSI + # 创建一个数据框用于存储每只股票的买卖信号 signal = pd.DataFrame(index=df.index, columns=df.columns) + # 根据RSI计算买卖信号 for column in df.columns: - signal[column] = np.where((short_rsi[column] < 30) & (long_rsi[column] < 30) & (short_rsi[column] != 0) & (long_rsi[column] != 0), 1, 0) - signal[column] = np.where((short_rsi[column] > 70) & (long_rsi[column] > 70) & (short_rsi[column] != 0) & (long_rsi[column] != 0), 0, signal[column]) + signal[column] = np.where(rsi[column] < oversold, 1, np.nan) # 当RSI低于超卖阈值,生成买入信号 + signal[column] = np.where(rsi[column] > overbought, 0, signal[column]) # 当RSI高于超买阈值,生成卖出信号 - return signal.ffill().fillna(0) + # 前向填充信号,并用0填充NaN值 + signal = signal.ffill().fillna(0) + return signal # 返回信号数据框 -async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, short_window: int, - long_window: int, overbought: int = 70, - oversold: int = 30): - await init_tortoise() +async def storage_backtest_data(user_id, stock_weights, strategy_parame, strategy_name): + """ + 存储回测结果到数据库。 - # 要存储的字段列表 - fields_to_store = [ - 'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', - 'price', 'returns', 'data_start_time', 'data_end_time', - 'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', - 'max_drawdown', 'calmar', 'mtd', 'three_month', - 'six_month', 'ytd', 'one_year', 'three_year', - 'five_year', 'ten_year', 'incep', 'daily_sharpe', - 'daily_sortino', 'daily_mean', 'daily_vol', - 'daily_skew', 'daily_kurt', 'best_day', 'worst_day', - 'monthly_sharpe', 'monthly_sortino', 'monthly_mean', - 'monthly_vol', 'monthly_skew', 'monthly_kurt', - 'best_month', 'worst_month', 'yearly_sharpe', - 'yearly_sortino', 'yearly_mean', 'yearly_vol', - 'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year', - 'avg_drawdown', 'avg_drawdown_days', 'avg_up_month', - 'avg_down_month', 'win_year_perc', 'twelve_month_win_perc' - ] + 参数: + user_id: int, 用户的ID。 + stock_weights: dict, 股票权重字典。 + strategy_parame: dict, 策略参数字典。 + strategy_name: str, 策略名称。 + """ + await init_tortoise() # 初始化数据库 - # 准备要存储的数据 - data_to_store = { - 'stock_code': stock_code, - 'strategy_name': "RSI策略", - 'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign( - time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')), - 'daily_price': convert_pandas_to_json_serializable(result[source_column_name].daily_prices), - 'price': convert_pandas_to_json_serializable(result[source_column_name].prices), - 'returns': convert_pandas_to_json_serializable(result[source_column_name].returns.fillna(0)), - 'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'), - 'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'), - 'backtest_end_time': int(datetime.now().strftime('%Y%m%d')), - 'position': convert_pandas_to_json_serializable(signal), - 'backtest_name': f'{stock_code} RSI策略 RSI{short_window}-RSI{long_window}-overbought{overbought}-oversold{oversold}', - 'indicator_type': 'RSI', - 'indicator_information': json.dumps( - {'short_window': short_window, 'long_window': long_window, 'overbought': overbought, 'oversold': oversold}) - } + # 查询用户的已有记录 + existing_record = await user_combination_history.UserCombinationHistory.filter(user_id=user_id).first() - # 使用循环填充其他字段 - for field in fields_to_store[12:]: # 从第12个字段开始 - value = result.stats.loc[field].iloc[0] - - if isinstance(value, float): - if np.isnan(value): - data_to_store[field] = 0.0 # NaN 处理为 0 - elif np.isinf(value): # 判断是否为无穷大或无穷小 - if value > 0: - data_to_store[field] = 99999.9999 # 正无穷处理 - else: - data_to_store[field] = -99999.9999 # 负无穷处理 - else: - data_to_store[field] = value # 正常的浮点值 - else: - data_to_store[field] = value # 非浮点类型保持不变 - - # 检查是否存在该 backtest_name - existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=data_to_store['backtest_name'] - ).first() + # 将股票权重和策略参数转换为JSON格式 + stock_weights_json = json.dumps(stock_weights) + strategy_parame_json = json.dumps(strategy_parame) + # 如果记录存在,更新记录;否则,创建新记录 if existing_record: - # 如果存在,更新记录 - await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + await user_combination_history.UserCombinationHistory.filter( id=existing_record.id - ).update(**data_to_store) + ).update(user_id=user_id, + last_time=int(datetime.now().strftime('%Y%m%d')), + stock_weights=stock_weights_json, + strategy_name=strategy_name, + strategy_parame=strategy_parame_json) else: - # 如果不存在,创建新的记录 - await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) - - return data_to_store + await user_combination_history.UserCombinationHistory.create(user_id=user_id, + last_time=int(datetime.now().strftime('%Y%m%d')), + stock_weights=stock_weights_json, + strategy_name=strategy_name, + strategy_parame=strategy_parame_json) -async def run_rsi_backtest(field_list: list, - stock_list: list, - period: str = '1d', - start_time: str = '', - end_time: str = '', - count: int = 100, - dividend_type: str = 'none', - fill_data: bool = True, - data_dir: str = '', - short_window: int = 50, - long_window: int = 200, - overbought: int = 70, - oversold: int = 30 - ): +async def run_combined_rsi_backtest(field_list: list, + stock_list: list, + stock_weights: list, + user_id: int = 1, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = 100, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + rsi_period: int = 14, + overbought: float = 70, + oversold: float = 30): + """ + 运行组合RSI策略的回测。 + + 参数: + field_list: list, 需要获取的字段列表。 + stock_list: list, 需要回测的股票代码列表。 + stock_weights: list, 股票权重列表。 + user_id: int, 用户ID,默认为1。 + period: str, 数据周期,默认为'1d'。 + start_time: str, 开始时间,默认为''。 + end_time: str, 结束时间,默认为''。 + count: int, 数据数量限制,默认为100。 + dividend_type: str, 分红类型,默认为'none'。 + fill_data: bool, 是否填充数据,默认为True。 + data_dir: str, 数据目录,默认为''。 + rsi_period: int, RSI计算周期,默认为14。 + overbought: float, 超买阈值,默认为70。 + oversold: float, 超卖阈值,默认为30。 + + 返回: + result: 回测结果。 + """ try: - # 初始化一个列表用于存储每只股票的回测结果字典 - results_list = [] - - # 遍历每只股票的数据(每列代表一个股票的收盘价) + # 获取本地数据 data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, - fill_data, - data_dir) + fill_data, data_dir) - for stock_code in stock_list: + # 获取股票权重字典 + stock_weights_dict = stock_weights[0] if stock_weights else {} - data_column_name = f'close_{stock_code}' - source_column_name = f'{stock_code} RSI策略' - backtest_name = f'{stock_code} RSI策略 RSI{short_window}-RSI{long_window}' - now_data = int(datetime.now().strftime('%Y%m%d')) - db_result_data = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=backtest_name) + # 创建RSI策略 + strategy, signal = await create_rsi_strategy(data, stock_weights_dict, rsi_period, overbought, oversold) - if db_result_data: - if db_result_data[0].backtest_end_time == now_data: - results_list.append({source_column_name: db_result_data[0]}) + # 创建回测对象 + backtest = bt.Backtest(strategy, data, initial_capital=100000) - # elif data_column_name in data.columns: - if data_column_name in data.columns: - stock_data_series = data[[data_column_name]] # 提取该股票的收盘价 DataFrame - stock_data_series.columns = ['close'] # 重命名列为 'close' + # 策略参数 + strategy_parame = {"rsi_period": rsi_period, "overbought": overbought, "oversold": oversold} + strategy_name = "RSI策略" - # 创建RSI策略 - strategy, signal = await create_dual_ma_strategy(stock_data_series, stock_code, - short_window=short_window, long_window=long_window) - # 创建回测 - backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000) - # 运行回测 - result = bt.run(backtest) - # 存储回测结果 - data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code, - stock_data_series, short_window, long_window, overbought, - oversold) - # # 绘制回测结果图表 - # result.plot() - # # 绘制个别股票数据图表 - # plt.figure(figsize=(12, 6)) - # plt.plot(stock_data_series.index, stock_data_series['close'], label='Stock Price') - # plt.title(f'Stock Price for {stock_code}') - # plt.xlabel('Date') - # plt.ylabel('Price') - # plt.legend() - # plt.grid(True) - # plt.show() - # 将结果存储为字典并添加到列表中 - results_list.append({source_column_name: data_to_store}) + # 运行回测 + result = bt.run(backtest) - else: - print(f"数据中缺少列: {data_column_name}") + # 存储回测结果 + await storage_backtest_data(user_id, stock_weights_dict, strategy_parame, strategy_name) - return results_list # 返回结果列表 + return result # 返回回测结果 except Exception as e: - print(f"Error occurred: {e}") + print(f"Error occurred: {e}") # 输出错误信息 async def start_rsi_combination_service(field_list: list, - stock_list: list, - period: str = '1d', - start_time: str = '', - end_time: str = '', - count: int = -1, - dividend_type: str = 'none', - fill_data: bool = True, - data_dir: str = '', - short_window: int = 50, - long_window: int = 200, - overbought: int = 70, - oversold: int = 30 - ): - for stock_code in stock_list: - backtest_name = f'{stock_code} RSI策略 RSI{short_window}-RSI{long_window}' - db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( - backtest_name=backtest_name) - now_time = int(datetime.now().strftime('%Y%m%d')) + stock_list: list, + stock_weights: list, + user_id: int = 1, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = -1, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + rsi_period: int = 14, + overbought: float = 70, + oversold: float = 30): + """ + 启动RSI组合策略服务。 - if db_result and db_result[0].backtest_end_time == now_time: - return db_result - else: - # 执行回测 - result = await run_rsi_backtest( - field_list=field_list, - stock_list=stock_list, - period=period, - start_time=start_time, - end_time=end_time, - count=count, - dividend_type=dividend_type, - fill_data=fill_data, - data_dir=data_dir, - short_window=short_window, - long_window=long_window, - overbought=overbought, - oversold=oversold - ) - return result + 参数: + field_list: list, 需要获取的字段列表。 + stock_list: list, 需要回测的股票代码列表。 + stock_weights: list, 股票权重列表。 + user_id: int, 用户ID,默认为1。 + period: str, 数据周期,默认为'1d'。 + start_time: str, 开始时间,默认为''。 + end_time: str, 结束时间,默认为''。 + count: int, 数据数量限制,默认为-1。 + dividend_type: str, 分红类型,默认为'none'。 + fill_data: bool, 是否填充数据,默认为True。 + data_dir: str, 数据目录,默认为''。 + rsi_period: int, RSI计算周期,默认为14。 + overbought: float, 超买阈值,默认为70。 + oversold: float, 超卖阈值,默认为30。 - -async def init_backtest_db(): - sma_list = [{"short_window": 3, "long_window": 6}, {"short_window": 6, "long_window": 12}, - {"short_window": 12, "long_window": 24}, {"short_window": 14, "long_window": 18}, - {"short_window": 15, "long_window": 10}] - await init_tortoise() - wance_db = await wance_data_stock.WanceDataStock.all() - sma_list_lenght = len(sma_list) - - for stock_code in wance_db: - for i in range(sma_list_lenght): - short_window = sma_list[i]['short_window'] - long_window = sma_list[i]['long_window'] - source_column_name = f'{stock_code} RSI策略 RSI{short_window}-RSI{long_window}' - result = await start_rsi_backtest_service(field_list=['close', 'time'], - stock_list=[stock_code.stock_code], - short_window=short_window, - long_window=long_window, - overbought=70, - oversold=30) - - print(f"回测成功 {source_column_name}") + 返回: + result: 回测结果。 + """ + # 调用回测函数并返回结果 + result = await run_combined_rsi_backtest( + field_list=field_list, + stock_list=stock_list, + stock_weights=stock_weights, + user_id=user_id, + period=period, + start_time=start_time, + end_time=end_time, + count=count, + dividend_type=dividend_type, + fill_data=fill_data, + data_dir=data_dir, + rsi_period=rsi_period, + overbought=overbought, + oversold=oversold, + ) + return result # 返回回测结果 if __name__ == '__main__': - # 测试类的回测 - asyncio.run(run_rsi_backtest(field_list=['close', 'time'], - stock_list=['601222.SH', '601677.SH'], - count=-1, - short_window=10, - long_window=30, - overbought=70, - oversold=30 - )) - - # # 初始化数据库表 - # asyncio.run(init_backtest_db()) + # 主函数,启动回测服务 + asyncio.run(run_combined_rsi_backtest(field_list=['close', 'time'], + stock_list=["688031.SH", "600025.SH", "601222.SH"], + user_id=1, + stock_weights=[{"688031.SH": 0.5, "600025.SH": 0.3, "601222.SH": 0.2}], + rsi_period=14, + overbought=70, + oversold=30)) diff --git a/src/combination/service.py b/src/combination/service.py index af431e3..ad0e364 100644 --- a/src/combination/service.py +++ b/src/combination/service.py @@ -28,7 +28,7 @@ async def start_combination_service(field_list: list, overbought: int = 70, oversold: int = 30, signal_window: int = 9, - user_id: int = 1): + user_id: int = 1) -> object: # 数据检查 await data_check(field_list=field_list, stock_list=stock_list, diff --git a/src/models/back_observed_data.py b/src/models/back_observed_data.py index d406fe9..ce6662e 100644 --- a/src/models/back_observed_data.py +++ b/src/models/back_observed_data.py @@ -10,4 +10,4 @@ class BackObservedData(Model): class Meta: - table = with_table_name("user_combination_history") \ No newline at end of file + table = with_table_name("back_observed_data") \ No newline at end of file diff --git a/src/models/user_combination_history.py b/src/models/user_combination_history.py index 6d97144..c7c30be 100644 --- a/src/models/user_combination_history.py +++ b/src/models/user_combination_history.py @@ -3,11 +3,12 @@ from src.models import with_table_name class UserCombinationHistory(Model): - id = fields.IntField() + id = fields.IntField(pk=True, ) last_time = fields.CharField(max_length=10, null=True, description='最后一次使用的时间', ) - stock_code = fields.CharField(max_length=20, null=True, description='股票代码', ) - stock_name = fields.CharField(max_length=20, null=True, description='股票名称', ) stock_weights = fields.JSONField(null=True, description='权重存储', ) - strategy = fields.JSONField(null=True, description='用户使用的策略参数', ) strategy_name = fields.CharField(max_length=50, null=True, description='用户使用的策略', ) + strategy_parame = fields.JSONField(null=True, description='用户使用的策略参数', ) user_id = fields.IntField(description='用户的id', ) + + class Meta: + table = with_table_name("user_combination_history") \ No newline at end of file diff --git a/src/utils/backtest_until.py b/src/utils/backtest_until.py index 264d59b..718f4fa 100644 --- a/src/utils/backtest_until.py +++ b/src/utils/backtest_until.py @@ -18,7 +18,6 @@ async def get_local_data(field_list: list, stock_list: list, period: str, start_ data_dir=data_dir) return await data_processing(result) - async def data_processing(result_local): # 初始化一个空的列表,用于存储每个股票的数据框 df_list = [] @@ -27,9 +26,11 @@ async def data_processing(result_local): for stock_code, df in result_local.items(): # 确保 df 是一个 DataFrame if isinstance(df, pd.DataFrame): - # 将时间戳转换为日期时间格式,只保留年-月-日 - df['time'] = pd.to_datetime(df['time'], unit='ms').dt.date - # 将 'time' 列设置为索引,保留为日期格式 + # 将时间戳转换为 UTC 日期时间格式 + df['time'] = pd.to_datetime(df['time'], unit='ms', utc=True) + # 转换为指定时区(例如东八区) + df['time'] = df['time'].dt.tz_convert('Asia/Shanghai') + # 将 'time' 列设置为索引 df.set_index('time', inplace=True) # 将 'close' 列重命名为 'close_股票代码' df.rename(columns={'close': f'close_{stock_code}'}, inplace=True) @@ -41,13 +42,14 @@ async def data_processing(result_local): # 使用 pd.concat() 将所有 DataFrame 合并为一个大的 DataFrame,保留所有列 combined_df = pd.concat(df_list, axis=1) - # 确保返回的 DataFrame 索引是日期格式 + # 确保索引是日期时间格式 combined_df.index = pd.to_datetime(combined_df.index) return combined_df + def convert_pandas_to_json_serializable(data: pd.Series) -> str: """ 将 Pandas Series 或 DataFrame 中的 Timestamp 索引转换为字符串,并返回 JSON 可序列化的结果。 diff --git a/src/utils/combination_until.py b/src/utils/combination_until.py new file mode 100644 index 0000000..18fba57 --- /dev/null +++ b/src/utils/combination_until.py @@ -0,0 +1,166 @@ +import json +from datetime import datetime + +import numpy as np +from xtquant import xtdata +import pandas as pd + +from src.models import wance_data_stock + +# 数据的列名 +columns = ['open', 'high', 'low', 'close', 'volume', 'amount', 'settelmentPrice', + 'openInterest', 'preClose', 'suspendFlag'] + + +# 获取本地数据并进行处理 +async def get_local_data(field_list: list, stock_list: list, period: str, start_time: str, end_time: str, + count: int, dividend_type: str, fill_data: bool, data_dir: str): + list = [] + for field in stock_list: + result = await wance_data_stock.WanceDataStock.filter(stock_code=field) + list.append(int(result[0].time_start)) + start_time = str(max(list)) + result = xtdata.get_local_data(field_list=field_list, stock_list=stock_list, period=period, start_time=start_time, + end_time=end_time, count=count, dividend_type=dividend_type, fill_data=fill_data, + data_dir=data_dir) + return await data_processing(result) + +async def data_processing(result_local): + # 初始化一个空的列表,用于存储每个股票的数据框 + df_list = [] + # 用于存储每个股票的最早时间 + start_times = [] + + # 遍历字典中的 DataFrame + for stock_code, df in result_local.items(): + # 确保 df 是一个 DataFrame + if isinstance(df, pd.DataFrame): + # 将时间戳转换为 UTC 日期时间格式 + df['time'] = pd.to_datetime(df['time'], unit='ms', utc=True) + # 转换为指定时区(例如东八区) + df['time'] = df['time'].dt.tz_convert('Asia/Shanghai') + # 记录每个股票的最早时间 + start_times.append(df['time'].min()) + # 将 'time' 列设置为索引 + df.set_index('time', inplace=True) + # 将 'close' 列重命名为 'close_股票代码' + df.rename(columns={'close': f'{stock_code}'}, inplace=True) + # 将所有列添加到列表中 + df_list.append(df) # 保留所有字段,包括重命名后的 'close_股票代码' + else: + print(f"数据格式错误: {stock_code} 不包含 DataFrame") + + # 找到所有股票数据中最晚的开始时间 + latest_start_time = max(start_times) + + # 使用 pd.concat() 将所有 DataFrame 合并为一个大的 DataFrame,保留所有列 + combined_df = pd.concat(df_list, axis=1) + + # 确保索引是日期时间格式 + combined_df.index = pd.to_datetime(combined_df.index) + + # 根据最新的开始时间切割数据 + combined_df = combined_df[combined_df.index >= latest_start_time] + + # 填充 NaN 值为 0 + combined_df.fillna(0, inplace=True) + + return combined_df + + + + + +def convert_pandas_to_json_serializable(data: pd.Series) -> str: + """ + 将 Pandas Series 或 DataFrame 中的 Timestamp 索引转换为字符串,并返回 JSON 可序列化的结果。 + + 参数: + data: pd.Series 或 pd.DataFrame, 带有时间戳索引的 pandas 数据 + + 返回: + JSON 字符串,键为日期字符串,值为原数据的值。 + """ + # 判断数据类型 + if isinstance(data, (pd.Series, pd.DataFrame)): + # 如果索引是时间戳类型,则转换为 YYYYMMDD 格式 + if isinstance(data.index, pd.DatetimeIndex): + data.index = data.index.strftime('%Y%m%d') + + # 处理 NaN 和 None 的情况,替换为 0 或其他合适的默认值 + data = data.replace([np.nan, None], 0) + + # 将索引重置为普通列,然后转换为字典 + json_serializable_data = data.rename_axis('date').reset_index().to_dict(orient='records') + + # 将字典转换为 JSON 格式字符串 + json_string = json.dumps(json_serializable_data) + return json_string + else: + raise ValueError("输入必须为 Pandas Series 或 DataFrame") + + +def clean_value(value): + """清理数据值,使其适合 JSON 序列化。""" + if pd.isna(value) or np.isinf(value): + return 0 # 或者可以使用 None 代替 + return value + + +def to_json_serializable(data): + """ + 将 Pandas DataFrame 转换为 JSON 可序列化的格式。 + - 将索引(如果是时间戳)转换为 'YYYYMMDD' 格式。 + - 将数据按时间升序排列。 + - 返回数组形式的结构,内部仍使用字典存放每一行数据。 + """ + result = [] + + if isinstance(data, pd.DataFrame): + # 将索引(时间戳)转换为 YYYYMMDD 格式 + data.index = data.index.strftime('%Y%m%d') + + # 按时间(索引)升序排序 + data = data.sort_index(ascending=True) + + # 遍历 DataFrame 的每一行 + for idx, row in data.iterrows(): + # 创建一个字典来存放当前行的数据 + row_dict = {'date': idx} # 将日期作为键 + # 清理行中的每个值 + for col in row.index: + row_dict[col] = clean_value(row[col]) # 清理每个值 + result.append(row_dict) # 将字典添加到结果数组中 + + return result + + elif isinstance(data, pd.Series): + # 将 Series 转换为数组形式(每一行的数据以字典形式存放) + for idx in data.index: + result.append({str(idx.strftime('%Y%m%d')): clean_value(data[idx])}) + return result + + else: + raise TypeError("输入数据必须是 Pandas 的 DataFrame 或 Series") + +async def data_check(field_list: list, + stock_list: list, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = -1, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', ): + result_data = xtdata.get_local_data(field_list=[], stock_list=stock_list, period=period, start_time=start_time, + end_time=end_time, count=count, dividend_type=dividend_type, fill_data=fill_data, + data_dir=data_dir) + time_now = int(datetime.now().strftime('%Y%m%d')) + for i in stock_list: + close = int(result_data.get(i).index[-1]) + if close != 0 and close < time_now: + xtdata.download_history_data(stock_code=i, + period='1d', + start_time='', + end_time='', + incrementally=True)