From fa7bc341599184d6e642ae5c4dcc9b6ac3545428 Mon Sep 17 00:00:00 2001 From: XiangJianxiong <1152203250@qq.com> Date: Mon, 14 Oct 2024 14:12:46 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E6=9B=B4=E6=96=B0=E7=BB=84=E5=90=88?= =?UTF-8?q?=E5=88=9B=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- models.py | Bin 40996 -> 43014 bytes src/backtest/bollinger_bands.py | 5 +- src/backtest/dual_moving_average.py | 5 +- src/backtest/macd_strategy.py | 5 +- src/backtest/reverse_dual_ma_strategy.py | 5 +- src/backtest/router.py | 18 +- src/backtest/rsi_strategy.py | 5 +- src/backtest/service.py | 47 ++- src/combination/__init__.py | 0 src/combination/bollinger_bands.py | 273 ++++++++++++++++ src/combination/combination.py | 110 +++++++ .../combination_dual_ma_strategy.py | 215 +++++++++++++ src/combination/combination_until.py | 0 src/combination/dual_moving_average.py | 268 ++++++++++++++++ src/combination/reverse_dual_ma_strategy.py | 261 ++++++++++++++++ src/combination/router.py | 40 +++ src/combination/rsi_strategy.py | 293 ++++++++++++++++++ src/combination/service.py | 124 ++++++++ src/main.py | 8 +- src/models/back_observed_data.py | 2 +- src/models/user_combination_history.py | 13 + src/pydantic/backtest_request.py | 5 +- src/tortoises.py | 3 +- .../until.py => utils/backtest_until.py} | 50 ++- src/xtdata/router.py | 1 + src/xtdata/service.py | 7 +- 26 files changed, 1723 insertions(+), 40 deletions(-) create mode 100644 src/combination/__init__.py create mode 100644 src/combination/bollinger_bands.py create mode 100644 src/combination/combination.py create mode 100644 src/combination/combination_dual_ma_strategy.py create mode 100644 src/combination/combination_until.py create mode 100644 src/combination/dual_moving_average.py create mode 100644 src/combination/reverse_dual_ma_strategy.py create mode 100644 src/combination/router.py create mode 100644 src/combination/rsi_strategy.py create mode 100644 src/combination/service.py create mode 100644 src/models/user_combination_history.py rename src/{backtest/until.py => utils/backtest_until.py} (70%) diff --git a/models.py b/models.py index 4b13f384dd40c6941788c135a4328c8936bf2108..fed55bae3f9e27196e6fe2b63fbc31ba5db4ee65 100644 GIT binary patch delta 370 zcmZ2-fT`^Q(*`pMb{~d(hIEF^$%gLolZE06SgOlH<0oIRk(gYdZZesVPh#>t9%8)lXP(*U_1|GM`c0LxI>*62!dos8%s82q~D8JcE!p~0B8K^LqA&DWAArEMF z36RbQY7(^3o?M`*HF;hHA8QWC&dIfH&3p{$d?5^eirMx48l^2|lMe*TOumrEG1*VX zZSn^VAf06}c|%?voPEKTWpa#;3eY{NKx5MxGJwVxPZlhQ? z&ODQa3L++NkY(Y6m>gD}T6!qBAQdb_CVy}i+Wa8g#RC8`MS1T4 delta 27 jcmZp>z_jE5(*`q%%`wt`c9XN>j!X_ upper_band[column], 0, signal[column]) # 卖出信号 + + # 前向填充信号,持仓不变 + signal = signal.ffill() + + # 将剩余的 NaN 替换为 0 + signal = signal.fillna(0) + + return signal + + +async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, bollingerMA, + std_dev): + await init_tortoise() + + # 要存储的字段列表 + fields_to_store = [ + 'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', + 'price', 'returns', 'data_start_time', 'data_end_time', + 'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', + 'max_drawdown', 'calmar', 'mtd', 'three_month', + 'six_month', 'ytd', 'one_year', 'three_year', + 'five_year', 'ten_year', 'incep', 'daily_sharpe', + 'daily_sortino', 'daily_mean', 'daily_vol', + 'daily_skew', 'daily_kurt', 'best_day', 'worst_day', + 'monthly_sharpe', 'monthly_sortino', 'monthly_mean', + 'monthly_vol', 'monthly_skew', 'monthly_kurt', + 'best_month', 'worst_month', 'yearly_sharpe', + 'yearly_sortino', 'yearly_mean', 'yearly_vol', + 'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year', + 'avg_drawdown', 'avg_drawdown_days', 'avg_up_month', + 'avg_down_month', 'win_year_perc', 'twelve_month_win_perc' + ] + + # 准备要存储的数据 + data_to_store = { + 'stock_code': stock_code, + 'strategy_name': "布林带策略", + 'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign( + time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')), + 'daily_price': convert_pandas_to_json_serializable(result[source_column_name].daily_prices), + 'price': convert_pandas_to_json_serializable(result[source_column_name].prices), + 'returns': convert_pandas_to_json_serializable(result[source_column_name].returns.fillna(0)), + 'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'), + 'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'), + 'backtest_end_time': int(datetime.now().strftime('%Y%m%d')), + 'position': convert_pandas_to_json_serializable(signal), + 'backtest_name': f'{stock_code} 布林带策略 MA{bollingerMA}-{std_dev}倍标准差', + 'indicator_type': 'Bollinger', + 'indicator_information': json.dumps({'bollingerMA': bollingerMA, 'std_dev': std_dev}) + } + + # 使用循环填充其他字段 + for field in fields_to_store[12:]: # 从第10个字段开始 + value = result.stats.loc[field].iloc[0] + data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value + + # 检查是否存在该 backtest_name + existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=data_to_store['backtest_name'] + ).first() + + if existing_record: + # 如果存在,更新记录 + await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + id=existing_record.id + ).update(**data_to_store) + else: + # 如果不存在,创建新的记录 + await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) + + return data_to_store + + +async def run_bollinger_backtest(field_list: list, + stock_list: list, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = 100, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + bollingerMA: int = 50, + std_dev: int = 200): + try: + # 初始化一个列表用于存储每只股票的回测结果字典 + results_list = [] + + # 遍历每只股票的数据(每列代表一个股票的收盘价) + data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, + fill_data, + data_dir) + + for stock_code in stock_list: + + data_column_name = f'close_{stock_code}' + source_column_name = f'{stock_code} 布林带策略' + backtest_name = f'{stock_code} 布林带策略 MA{bollingerMA}-{std_dev}倍标准差' + now_time = int(datetime.now().strftime('%Y%m%d')) + db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=backtest_name) + if db_result: + if db_result[0].backtest_end_time == now_time: + results_list.append({source_column_name: db_result[0]}) + + # elif data_column_name in data.columns: + if data_column_name in data.columns: + stock_data_series = data[[data_column_name]] # 提取该股票的收盘价 DataFrame + stock_data_series.columns = ['close'] # 重命名列为 'close' + + # 创建布林带策略 + strategy, signal = await create_bollinger_bands_strategy(stock_data_series, stock_code, + bollingerMA=bollingerMA, + std_dev=std_dev) + # 创建回测 + backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000) + # 运行回测 + result = bt.run(backtest) + # 存储回测结果 + data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code, + stock_data_series, + bollingerMA, std_dev) + # # 绘制回测结果图表 + # result.plot() + # # 绘制个别股票数据图表 + # plt.figure(figsize=(12, 6)) + # plt.plot(stock_data_series.index, stock_data_series['close'], label='Stock Price') + # plt.title(f'Stock Price for {stock_code}') + # plt.xlabel('Date') + # plt.ylabel('Price') + # plt.legend() + # plt.grid(True) + # plt.show() + # 将结果存储为字典并添加到列表中 + results_list.append({source_column_name: data_to_store}) + + else: + print(f"数据中缺少列: {data_column_name}") + + return results_list # 返回结果列表 + + except Exception as e: + print(f"Error occurred: {e}") + + +async def start_bollinger_combination_service(field_list: list, + stock_list: list, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = -1, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + bollingerMA: int = 50, + std_dev: int = 200): + for stock_code in stock_list: + backtest_name = f'{stock_code} 布林带策略 MA{bollingerMA}-{std_dev}倍标准差' + db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=backtest_name) + now_time = int(datetime.now().strftime('%Y%m%d')) + + if db_result and db_result[0].backtest_end_time == now_time: + return db_result + else: + # 执行回测 + result = await run_bollinger_backtest( + field_list=field_list, + stock_list=stock_list, + period=period, + start_time=start_time, + end_time=end_time, + count=count, + dividend_type=dividend_type, + fill_data=fill_data, + data_dir=data_dir, + bollingerMA=bollingerMA, + std_dev=std_dev, + ) + return result + + +async def init_backtest_db(): + bollinger_list = [{"bollingerMA": 20, "std_dev": 2}, {"bollingerMA": 30, "std_dev": 2}, + {"bollingerMA": 70, "std_dev": 2}, {"bollingerMA": 5, "std_dev": 1}, + {"bollingerMA": 20, "std_dev": 3}, {"bollingerMA": 50, "std_dev": 2.5}] + await init_tortoise() + wance_db = await wance_data_stock.WanceDataStock.all() + bollinger_list_lenght = len(bollinger_list) + + for stock_code in wance_db: + for i in range(bollinger_list_lenght): + bollingerMA = bollinger_list[i]['bollingerMA'] + std_dev = bollinger_list[i]['std_dev'] + source_column_name = f'{stock_code} 布林带策略 MA{bollingerMA}-{std_dev}倍标准差' + result = await run_bollinger_backtest(field_list=['close', 'time'], + stock_list=[stock_code.stock_code], + bollingerMA=bollingerMA, + std_dev=std_dev) + + print(f"回测成功 {source_column_name}") + + +if __name__ == '__main__': + # 测试类的回测 + asyncio.run(run_bollinger_backtest(field_list=['close', 'time'], + stock_list=['601222.SH', '601677.SH'], + bollingerMA=20, + std_dev=2)) + + # # 初始化数据库表 + # asyncio.run(init_backtest_db()) diff --git a/src/combination/combination.py b/src/combination/combination.py new file mode 100644 index 0000000..ca9551c --- /dev/null +++ b/src/combination/combination.py @@ -0,0 +1,110 @@ +import pandas as pd +import bt +import yfinance as yf # 这里使用 yfinance 获取股票数据,实际应用可以替换成其他数据源 + + +def fetch_stock_data(stock_list, start, end): + """ + 获取指定股票的历史数据。 + + 参数: + stock: str, 股票代码。 + start: str, 开始时间。 + end: str, 结束时间。 + + 返回: + pd.DataFrame, 包含股票的收盘价数据。 + """ + data = yf.download(stock_list, start=start, end=end)['Adj Close'] + return data + + +def combine_stock_data(stock_list, start, end): + """ + 合并多只股票的数据。 + + 参数: + stocks: list, 股票代码列表。 + start: str, 开始时间。 + end: str, 结束时间。 + + 返回: + pd.DataFrame, 合并后的股票价格数据。 + """ + combined_data = pd.DataFrame() + + for stock in stock_list: + data = fetch_stock_data(stock, start, end) + combined_data[stock] = data + + return combined_data + + +def create_portfolio_strategy(stock_weights): + """ + 创建基于股票权重的投资组合策略。 + + 参数: + stock_weights: list of dict, 股票与权重的字典列表。例如: + [{'601222.SH': 0.5, '605090.SH': 0.3, '600025.SH': 0.2}] + + 返回: + 策略对象。 + """ + algos = [ + bt.algos.RunDaily(), # 每日运行策略 + bt.algos.SelectAll(), # 选择所有股票 + bt.algos.WeighSpecified(**stock_weights[0]), # 分配指定的权重 + bt.algos.Rebalance() # 进行调仓 + ] + + strategy = bt.Strategy('Portfolio Strategy', algos) + + return strategy + + +def run_portfolio_backtest(stocks_list, start, end, stock_weights): + """ + 运行组合策略的回测。 + + 参数: + stocks: list, 股票代码列表。 + start: str, 开始时间。 + end: str, 结束时间。 + stock_weights: list of dict, 股票与权重的字典列表。例如: + [{'601222.SH': 0.5, '605090.SH': 0.3, '600025.SH': 0.2}] + + 返回: + 回测结果。 + """ + # 获取股票的历史数据 + data = combine_stock_data(stocks_list, start, end) + + # 创建策略 + strategy = create_portfolio_strategy(stock_weights) + + # 创建回测 + backtest = bt.Backtest(strategy, data) + + # 运行回测 + result = bt.run(backtest) + + return result + + +# 示例使用 +if __name__ == '__main__': + # 示例股票名 + stocks = ['601222.SH', '605090.SH', '600025.SH'] + start_date = '2021-01-01' + end_date = '2023-01-01' + + # 示例权重分配 + stock_weights = [{'601222.SH': 0.5, '605090.SH': 0.3, '600025.SH': 0.2}] + + # 运行组合策略回测 + result = run_portfolio_backtest(stocks, start_date, end_date, stock_weights) + + # 打印回测结果 + result.plot() + result.display() diff --git a/src/combination/combination_dual_ma_strategy.py b/src/combination/combination_dual_ma_strategy.py new file mode 100644 index 0000000..1397979 --- /dev/null +++ b/src/combination/combination_dual_ma_strategy.py @@ -0,0 +1,215 @@ +import asyncio +import json +from datetime import datetime + +import bt +import numpy as np +import pandas as pd + +from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable +from src.models import wance_data_storage_backtest, wance_data_stock +from src.tortoises_orm_config import init_tortoise + + +# 双均线策略函数 +async def create_dual_ma_strategy(data, short_window: int = 50, long_window: int = 200): + # 生成权重 + weights = await dual_ma_strategy(data, short_window, long_window) + + # 使用bt框架构建组合策略 + strategy = bt.Strategy('组合双均线策略', + [bt.algos.RunDaily(), + bt.algos.SelectAll(), + bt.algos.WeighTarget(weights), # 根据信号调整权重 + bt.algos.Rebalance()]) + return strategy + + +async def dual_ma_strategy(df, short_window=20, long_window=50): + """ + 基于双均线策略生成买卖信号。 + + 参数: + df: pd.DataFrame, 股票的价格数据,行索引为日期,列为股票代码。 + short_window: int, 短期均线窗口期。 + long_window: int, 长期均线窗口期。 + + 返回: + signal: pd.DataFrame, 每只股票的买卖信号,1 表示买入,0 表示卖出。 + """ + # 计算短期均线和长期均线 + short_ma = df.rolling(window=short_window, min_periods=1).mean() + long_ma = df.rolling(window=long_window, min_periods=1).mean() + + # 生成买入信号 + buy_signal = (short_ma > long_ma).astype(int) + + # 计算权重(例如:均等权重) + weights = buy_signal.div(buy_signal.sum(axis=1), axis=0).fillna(0) + + return weights + + +async def storage_backtest_data(source_column_name, result, signal, stock_data_series, short_window, long_window): + await init_tortoise() + + # 要存储的字段列表 + fields_to_store = [ + 'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', + 'price', 'returns', 'data_start_time', 'data_end_time', + 'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', + 'max_drawdown', 'calmar', 'mtd', 'three_month', + 'six_month', 'ytd', 'one_year', 'three_year', + 'five_year', 'ten_year', 'incep', 'daily_sharpe', + 'daily_sortino', 'daily_mean', 'daily_vol', + 'daily_skew', 'daily_kurt', 'best_day', 'worst_day', + 'monthly_sharpe', 'monthly_sortino', 'monthly_mean', + 'monthly_vol', 'monthly_skew', 'monthly_kurt', + 'best_month', 'worst_month', 'yearly_sharpe', + 'yearly_sortino', 'yearly_mean', 'yearly_vol', + 'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year', + 'avg_drawdown', 'avg_drawdown_days', 'avg_up_month', + 'avg_down_month', 'win_year_perc', 'twelve_month_win_perc' + ] + + # 准备要存储的数据 + data_to_store = { + 'strategy_name': "组合双均线策略", + 'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign( + time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')), + 'daily_price': convert_pandas_to_json_serializable(result['daily_prices']), + 'price': convert_pandas_to_json_serializable(result['prices']), + 'returns': convert_pandas_to_json_serializable(result['returns'].fillna(0)), + 'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'), + 'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'), + 'backtest_end_time': int(datetime.now().strftime('%Y%m%d')), + 'position': convert_pandas_to_json_serializable(signal), + 'backtest_name': f'组合双均线策略 MA{short_window}-{long_window}日', + 'indicator_type': 'SMA', + 'indicator_information': json.dumps({'short_window': short_window, 'long_window': long_window}) + } + + # 使用循环填充其他字段 + for field in fields_to_store[12:]: # 从第10个字段开始 + value = result.stats.loc[field].iloc[0] + data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value + + # 检查是否存在该 backtest_name + existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=data_to_store['backtest_name'] + ).first() + + if existing_record: + # 如果存在,更新记录 + await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + id=existing_record.id + ).update(**data_to_store) + else: + # 如果不存在,创建新的记录 + await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) + + return data_to_store + + +async def run_sma_backtest(field_list: list, + stock_list: list, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = 100, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 50, + long_window: int = 200): + try: + # 初始化一个列表用于存储每只股票的回测结果字典 + results_list = [] + + # 获取股票数据 + data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, + fill_data, data_dir) + + # 创建组合数据 + combined_data = data[stock_list].copy() # 只选择需要的股票列 + + # 创建双均线策略 + strategy, signal = await create_dual_ma_strategy(combined_data, short_window=short_window, + long_window=long_window) + # 创建回测 + backtest = bt.Backtest(strategy=strategy, data=combined_data, initial_capital=100000) + # 运行回测 + result = bt.run(backtest) + + # 存储回测结果 + data_to_store = await storage_backtest_data(result, signal, combined_data, short_window, long_window) + results_list.append(data_to_store) + + return results_list # 返回结果列表 + + except Exception as e: + print(f"Error occurred: {e}") + + +async def start_sma_backtest_service(field_list: list, + stock_list: list, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = -1, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 50, + long_window: int = 200): + # 执行回测 + result = await run_sma_backtest( + field_list=field_list, + stock_list=stock_list, + period=period, + start_time=start_time, + end_time=end_time, + count=count, + dividend_type=dividend_type, + fill_data=fill_data, + data_dir=data_dir, + short_window=short_window, + long_window=long_window, + ) + return result + + +async def init_backtest_db(): + sma_list = [{"short_window": 5, "long_window": 10}, {"short_window": 10, "long_window": 30}, + {"short_window": 30, "long_window": 60}, {"short_window": 30, "long_window": 90}, + {"short_window": 70, "long_window": 140}, {"short_window": 120, "long_window": 250}] + await init_tortoise() + wance_db = await wance_data_stock.WanceDataStock.all() + sma_list_length = len(sma_list) + + for stock_code in wance_db: + for i in range(sma_list_length): + short_window = sma_list[i]['short_window'] + long_window = sma_list[i]['long_window'] + source_column_name = f'{stock_code.stock_code}' + + result = await start_sma_backtest_service( + field_list=[source_column_name], + stock_list=[stock_code.stock_code], + period='1d', + start_time='2022-01-01', + end_time='2022-09-01', + count=-1, + dividend_type='none', + fill_data=True, + data_dir='', + short_window=short_window, + long_window=long_window + ) + print( + f"Finished backtesting for {stock_code.stock_code} with short_window: {short_window}, long_window: {long_window}") + + +# 启动回测 +if __name__ == '__main__': + asyncio.run(init_backtest_db()) diff --git a/src/combination/combination_until.py b/src/combination/combination_until.py new file mode 100644 index 0000000..e69de29 diff --git a/src/combination/dual_moving_average.py b/src/combination/dual_moving_average.py new file mode 100644 index 0000000..f8785ee --- /dev/null +++ b/src/combination/dual_moving_average.py @@ -0,0 +1,268 @@ +import asyncio +import json +from datetime import datetime + +import bt +import numpy as np +import pandas as pd + +from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable +from src.models import wance_data_storage_backtest, wance_data_stock +from src.tortoises_orm_config import init_tortoise + + +# 双均线策略函数 +async def create_dual_ma_strategy(data, stock_code: str, short_window: int = 50, long_window: int = 200): + # 生成双均线策略信号 + signal = await dual_ma_strategy(data, short_window, long_window) + + # 使用bt框架构建策略 + strategy = bt.Strategy(f'{stock_code} 双均线策略', + [bt.algos.RunDaily(), + bt.algos.SelectAll(), # 选择所有股票 + bt.algos.WeighTarget(signal), # 根据信号调整权重 + bt.algos.Rebalance()]) # 调仓 + return strategy, signal + + +async def dual_ma_strategy(df, short_window=20, long_window=50): + """ + 基于双均线策略生成买卖信号。 + + 参数: + df: pd.DataFrame, 股票的价格数据,行索引为日期,列为股票代码。 + short_window: int, 短期均线窗口期。 + long_window: int, 长期均线窗口期。 + + 返回: + signal: pd.DataFrame, 每只股票的买卖信号,1 表示买入,0 表示卖出。 + """ + # 计算短期均线和长期均线 + short_ma = df.rolling(window=short_window, min_periods=1).mean() + long_ma = df.rolling(window=long_window, min_periods=1).mean() + + # 生成买入信号: 当短期均线从下方穿过长期均线 + buy_signal = np.where(short_ma > long_ma, 1, np.nan) + + # 生成卖出信号: 当短期均线从上方穿过长期均线 + sell_signal = np.where(short_ma < long_ma, 0, np.nan) + + # 合并买卖信号 + signal = pd.DataFrame(buy_signal, index=df.index, columns=df.columns) + signal = np.where(short_ma < long_ma, 0, signal) + + # 前向填充信号,持仓不变 + signal = pd.DataFrame(signal, index=df.index, columns=df.columns).ffill() + + # 将剩余的 NaN 替换为 0 + signal = signal.fillna(0) + + return signal + + +async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, short_window, + long_window): + await init_tortoise() + + # 要存储的字段列表 + fields_to_store = [ + 'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', + 'price', 'returns', 'data_start_time', 'data_end_time', + 'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', + 'max_drawdown', 'calmar', 'mtd', 'three_month', + 'six_month', 'ytd', 'one_year', 'three_year', + 'five_year', 'ten_year', 'incep', 'daily_sharpe', + 'daily_sortino', 'daily_mean', 'daily_vol', + 'daily_skew', 'daily_kurt', 'best_day', 'worst_day', + 'monthly_sharpe', 'monthly_sortino', 'monthly_mean', + 'monthly_vol', 'monthly_skew', 'monthly_kurt', + 'best_month', 'worst_month', 'yearly_sharpe', + 'yearly_sortino', 'yearly_mean', 'yearly_vol', + 'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year', + 'avg_drawdown', 'avg_drawdown_days', 'avg_up_month', + 'avg_down_month', 'win_year_perc', 'twelve_month_win_perc' + ] + + # 准备要存储的数据 + data_to_store = { + 'stock_code': stock_code, + 'strategy_name': "双均线策略", + 'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign( + time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')), + 'daily_price': convert_pandas_to_json_serializable(result[source_column_name].daily_prices), + 'price': convert_pandas_to_json_serializable(result[source_column_name].prices), + 'returns': convert_pandas_to_json_serializable(result[source_column_name].returns.fillna(0)), + 'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'), + 'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'), + 'backtest_end_time': int(datetime.now().strftime('%Y%m%d')), + 'position': convert_pandas_to_json_serializable(signal), + 'backtest_name': f'{stock_code} 双均线策略 MA{short_window}-{long_window}日', + 'indicator_type': 'SMA', + 'indicator_information': json.dumps({'short_window': short_window, 'long_window': long_window}) + } + + # 使用循环填充其他字段 + for field in fields_to_store[12:]: # 从第10个字段开始 + value = result.stats.loc[field].iloc[0] + data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value + + # 检查是否存在该 backtest_name + existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=data_to_store['backtest_name'] + ).first() + + if existing_record: + # 如果存在,更新记录 + await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + id=existing_record.id + ).update(**data_to_store) + else: + # 如果不存在,创建新的记录 + await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) + + return data_to_store + + +async def run_sma_backtest(field_list: list, + stock_list: list, + stock_weights: list, + user_id: int = 1, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = 100, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 50, + long_window: int = 200): + try: + # 初始化一个列表用于存储每只股票的回测结果字典 + results_list = [] + + # 遍历每只股票的数据(每列代表一个股票的收盘价) + data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, + fill_data, + data_dir) + + for stock_code in stock_list: + + data_column_name = f'close_{stock_code}' + source_column_name = f'{stock_code} 双均线策略' + backtest_name = f'{stock_code} 双均线策略 MA{short_window}-{long_window}日' + now_data = int(datetime.now().strftime('%Y%m%d')) + db_result_data = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=backtest_name) + + if db_result_data: + if db_result_data[0].backtest_end_time == now_data: + results_list.append({source_column_name: db_result_data[0]}) + + if data_column_name in data.columns: + stock_data_series = data[[data_column_name]] # 提取该股票的收盘价 DataFrame + stock_data_series.columns = ['close'] # 重命名列为 'close' + + # 创建双均线策略 + strategy, signal = await create_dual_ma_strategy(stock_data_series, stock_code, + short_window=short_window, + long_window=long_window) + # 创建回测 + backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000) + # 运行回测 + result = bt.run(backtest) + # 存储回测结果 + data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code, + stock_data_series, + short_window, long_window) + # # 绘制回测结果图表 + # result.plot() + # # 绘制个别股票数据图表 + # plt.figure(figsize=(12, 6)) + # plt.plot(stock_data_series.index, stock_data_series['close'], label='Stock Price') + # plt.title(f'Stock Price for {stock_code}') + # plt.xlabel('Date') + # plt.ylabel('Price') + # plt.legend() + # plt.grid(True) + # plt.show() + # 将结果存储为字典并添加到列表中 + results_list.append({source_column_name: data_to_store}) + + else: + print(f"数据中缺少列: {data_column_name}") + + return results_list # 返回结果列表 + + except Exception as e: + print(f"Error occurred: {e}") + + +async def start_sma_combination_service(field_list: list, + stock_list: list, + stock_weights:list, + user_id:int = 1, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = -1, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 50, + long_window: int = 200): + for stock_code in stock_list: + backtest_name = f'{stock_code} 双均线策略 MA{short_window}-{long_window}日' + db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=backtest_name) + now_time = int(datetime.now().strftime('%Y%m%d')) + + if db_result and db_result[0].backtest_end_time == now_time: + return db_result + else: + # 执行回测 + result = await run_sma_backtest( + field_list=field_list, + stock_list=stock_list, + period=period, + start_time=start_time, + end_time=end_time, + count=count, + dividend_type=dividend_type, + fill_data=fill_data, + data_dir=data_dir, + short_window=short_window, + long_window=long_window, + ) + return result + + +async def init_backtest_db(): + sma_list = [{"short_window": 5, "long_window": 10}, {"short_window": 10, "long_window": 30}, + {"short_window": 30, "long_window": 60}, {"short_window": 30, "long_window": 90}, + {"short_window": 70, "long_window": 140}, {"short_window": 120, "long_window": 250}] + await init_tortoise() + wance_db = await wance_data_stock.WanceDataStock.all() + sma_list_lenght = len(sma_list) + + for stock_code in wance_db: + for i in range(sma_list_lenght): + short_window = sma_list[i]['short_window'] + long_window = sma_list[i]['long_window'] + source_column_name = f'{stock_code} 双均线策略 MA{short_window}-{long_window}日' + result = await run_sma_backtest(field_list=['close', 'time'], + stock_list=[stock_code.stock_code], + short_window=short_window, + long_window=long_window) + + print(f"回测成功 {source_column_name}") + + +if __name__ == '__main__': + # 测试类的回测 + # asyncio.run(run_sma_backtest(field_list=['close', 'time'], + # stock_list=['601222.SH', '601677.SH'], + # short_window=10, + # long_window=30)) + + # 初始化数据库表 + asyncio.run(init_backtest_db()) diff --git a/src/combination/reverse_dual_ma_strategy.py b/src/combination/reverse_dual_ma_strategy.py new file mode 100644 index 0000000..84926d7 --- /dev/null +++ b/src/combination/reverse_dual_ma_strategy.py @@ -0,0 +1,261 @@ +import asyncio +import json +from datetime import datetime + +import bt +import numpy as np +import pandas as pd + +from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable +from src.models import wance_data_storage_backtest, wance_data_stock +from src.tortoises_orm_config import init_tortoise + + +# 反双均线策略函数 +async def create_dual_ma_strategy(data, stock_code: str, short_window: int = 50, long_window: int = 200): + # 生成反双均线策略信号 + signal = await reverse_dual_ma_strategy(data, short_window, long_window) + + # 使用bt框架构建策略 + strategy = bt.Strategy(f'{stock_code} 反双均线策略', + [bt.algos.RunDaily(), + bt.algos.SelectAll(), # 选择所有股票 + bt.algos.WeighTarget(signal), # 根据信号调整权重 + bt.algos.Rebalance()]) # 调仓 + return strategy, signal + + + +# 定义反反双均线策略的函数 +def reverse_dual_ma_strategy(data, short_window=50, long_window=200): + """ + 反反双均线策略,当短期均线跌破长期均线时买入,穿过长期均线时卖出。 + + 参数: + data: pd.DataFrame, 股票的价格数据,行索引为日期,列为股票代码。 + short_window: int, 短期均线的窗口期。 + long_window: int, 长期均线的窗口期。 + + 返回: + signal: pd.DataFrame, 每只股票的买卖信号,1 表示买入,0 表示卖出。 + """ + # 计算短期均线和长期均线 + short_ma = data.rolling(window=short_window).mean() + long_ma = data.rolling(window=long_window).mean() + + # 初始化信号 DataFrame + signal = pd.DataFrame(index=data.index, columns=data.columns) + + # 生成买入信号:短期均线从上往下穿过长期均线 + for column in data.columns: + signal[column] = (short_ma[column] < long_ma[column]).astype(int) # 跌破时买入,信号为1 + signal[column] = (short_ma[column] > long_ma[column]).astype(int) * -1 + signal[column] # 穿过时卖出,信号为0 + + # 前向填充信号,保持持仓不变 + signal = signal.ffill() + + return signal + + +async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, short_window, + long_window): + await init_tortoise() + + # 要存储的字段列表 + fields_to_store = [ + 'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', + 'price', 'returns', 'data_start_time', 'data_end_time', + 'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', + 'max_drawdown', 'calmar', 'mtd', 'three_month', + 'six_month', 'ytd', 'one_year', 'three_year', + 'five_year', 'ten_year', 'incep', 'daily_sharpe', + 'daily_sortino', 'daily_mean', 'daily_vol', + 'daily_skew', 'daily_kurt', 'best_day', 'worst_day', + 'monthly_sharpe', 'monthly_sortino', 'monthly_mean', + 'monthly_vol', 'monthly_skew', 'monthly_kurt', + 'best_month', 'worst_month', 'yearly_sharpe', + 'yearly_sortino', 'yearly_mean', 'yearly_vol', + 'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year', + 'avg_drawdown', 'avg_drawdown_days', 'avg_up_month', + 'avg_down_month', 'win_year_perc', 'twelve_month_win_perc' + ] + + # 准备要存储的数据 + data_to_store = { + 'stock_code': stock_code, + 'strategy_name': "反双均线策略", + 'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign( + time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')), + 'daily_price': convert_pandas_to_json_serializable(result[source_column_name].daily_prices), + 'price': convert_pandas_to_json_serializable(result[source_column_name].prices), + 'returns': convert_pandas_to_json_serializable(result[source_column_name].returns.fillna(0)), + 'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'), + 'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'), + 'backtest_end_time': int(datetime.now().strftime('%Y%m%d')), + 'position': convert_pandas_to_json_serializable(signal), + 'backtest_name': f'{stock_code} 反双均线策略 MA{short_window}-{long_window}日', + 'indicator_type': 'reverse_SMA', + 'indicator_information': json.dumps({'short_window': short_window, 'long_window': long_window}) + } + + # 使用循环填充其他字段 + for field in fields_to_store[12:]: # 从第10个字段开始 + value = result.stats.loc[field].iloc[0] + data_to_store[field] = 0.0 if (isinstance(value, float) and np.isnan(value)) else value + + # 检查是否存在该 backtest_name + existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=data_to_store['backtest_name'] + ).first() + + if existing_record: + # 如果存在,更新记录 + await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + id=existing_record.id + ).update(**data_to_store) + else: + # 如果不存在,创建新的记录 + await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) + + return data_to_store + + +async def run_reverse_reverse_SMA_backtest(field_list: list, + stock_list: list, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = 100, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 50, + long_window: int = 200): + try: + # 初始化一个列表用于存储每只股票的回测结果字典 + results_list = [] + + # 遍历每只股票的数据(每列代表一个股票的收盘价) + data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, + fill_data, + data_dir) + + for stock_code in stock_list: + + data_column_name = f'close_{stock_code}' + source_column_name = f'{stock_code} 反双均线策略' + backtest_name = f'{stock_code} 反双均线策略 MA{short_window}-{long_window}日' + now_data = int(datetime.now().strftime('%Y%m%d')) + db_result_data = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=backtest_name) + + if db_result_data: + if db_result_data[0].backtest_end_time == now_data: + results_list.append({source_column_name: db_result_data[0]}) + + elif data_column_name in data.columns: + stock_data_series = data[[data_column_name]] # 提取该股票的收盘价 DataFrame + stock_data_series.columns = ['close'] # 重命名列为 'close' + + # 创建反双均线策略 + strategy, signal = await create_dual_ma_strategy(stock_data_series, stock_code, + short_window=short_window, + long_window=long_window) + # 创建回测 + backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000) + # 运行回测 + result = bt.run(backtest) + # 存储回测结果 + data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code, + stock_data_series, + short_window, long_window) + # # 绘制回测结果图表 + # result.plot() + # # 绘制个别股票数据图表 + # plt.figure(figsize=(12, 6)) + # plt.plot(stock_data_series.index, stock_data_series['close'], label='Stock Price') + # plt.title(f'Stock Price for {stock_code}') + # plt.xlabel('Date') + # plt.ylabel('Price') + # plt.legend() + # plt.grid(True) + # plt.show() + # 将结果存储为字典并添加到列表中 + results_list.append({source_column_name: data_to_store}) + + else: + print(f"数据中缺少列: {data_column_name}") + + return results_list # 返回结果列表 + + except Exception as e: + print(f"Error occurred: {e}") + + +async def start_reverse_SMA_combination_service(field_list: list, + stock_list: list, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = -1, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 50, + long_window: int = 200): + for stock_code in stock_list: + backtest_name = f'{stock_code} 反双均线策略 MA{short_window}-{long_window}日' + db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=backtest_name) + now_time = int(datetime.now().strftime('%Y%m%d')) + + if db_result and db_result[0].backtest_end_time == now_time: + return db_result + else: + # 执行回测 + result = await run_reverse_reverse_SMA_backtest( + field_list=field_list, + stock_list=stock_list, + period=period, + start_time=start_time, + end_time=end_time, + count=count, + dividend_type=dividend_type, + fill_data=fill_data, + data_dir=data_dir, + short_window=short_window, + long_window=long_window, + ) + return result + + +async def init_backtest_db(): + reverse_SMA_list = [{"short_window": 5, "long_window": 10}, {"short_window": 10, "long_window": 30}, + {"short_window": 30, "long_window": 60}, {"short_window": 30, "long_window": 90}, + {"short_window": 70, "long_window": 140}, {"short_window": 120, "long_window": 250}] + await init_tortoise() + wance_db = await wance_data_stock.WanceDataStock.all() + reverse_SMA_list_lenght = len(reverse_SMA_list) + + for stock_code in wance_db: + for i in range(reverse_SMA_list_lenght): + short_window = reverse_SMA_list[i]['short_window'] + long_window = reverse_SMA_list[i]['long_window'] + source_column_name = f'{stock_code} 反双均线策略 MA{short_window}-{long_window}日' + result = await run_reverse_reverse_SMA_backtest(field_list=['close', 'time'], + stock_list=[stock_code.stock_code], + short_window=short_window, + long_window=long_window) + + print(f"回测成功 {source_column_name}") + + +if __name__ == '__main__': + # 测试类的回测 + # asyncio.run(run_reverse_SMA_backtest(field_list=['close', 'time'], + # stock_list=['601222.SH', '601677.SH'], + # short_window=10, + # long_window=30)) + + # 初始化数据库表 + asyncio.run(init_backtest_db()) diff --git a/src/combination/router.py b/src/combination/router.py new file mode 100644 index 0000000..81c83d9 --- /dev/null +++ b/src/combination/router.py @@ -0,0 +1,40 @@ +import json + +from fastapi import APIRouter + +from src.backtest.service import start_backtest_service, stock_chart_service +from src.combination.service import start_combination_service +from src.pydantic.backtest_request import BackRequest + +router = APIRouter() # 创建一个 FastAPI 路由器实例 + + +@router.get("/start_combination") +async def start_combination(request: BackRequest): + result = await start_combination_service(user_id=request.user_id, + field_list=['close', 'time'], + stock_weights=request.stock_weights, + stock_list=request.stock_list, + period=request.period, + start_time=request.start_time, + end_time=request.end_time, + count=request.count, + dividend_type=request.dividend_type, + fill_data=request.fill_data, + ma_type=request.ma_type, + short_window=request.short_window, + long_window=request.long_window + ) + return result + + +@router.get('/stock_chart') +async def stock_chart(request: BackRequest): + result = await stock_chart_service(stock_code=request.stock_code, + benchmark_code=request.benchmark_code) + return result + + +@router.get('/combination') +async def combination(request: BackRequest): + await combination_service() diff --git a/src/combination/rsi_strategy.py b/src/combination/rsi_strategy.py new file mode 100644 index 0000000..7501286 --- /dev/null +++ b/src/combination/rsi_strategy.py @@ -0,0 +1,293 @@ +import asyncio +import json +from datetime import datetime + +import bt +import numpy as np +import pandas as pd + +from src.utils.backtest_until import get_local_data, convert_pandas_to_json_serializable +from src.models import wance_data_storage_backtest, wance_data_stock +from src.tortoises_orm_config import init_tortoise + + +# RSI策略函数 +async def create_dual_ma_strategy(data, stock_code: str, short_window: int = 50, long_window: int = 200, + overbought: int = 70, oversold: int = 30): + # 生成RSI策略信号 + signal = await rsi_strategy(data, short_window, long_window, overbought, oversold) + + # 使用bt框架构建策略 + strategy = bt.Strategy(f'{stock_code} RSI策略', + [bt.algos.RunDaily(), + bt.algos.SelectAll(), # 选择所有股票 + bt.algos.WeighTarget(signal), # 根据信号调整权重 + bt.algos.Rebalance()]) # 调仓 + return strategy, signal + + +async def rsi_strategy(df, short_window=14, long_window=28, overbought=70, oversold=30): + """ + 基于RSI的策略生成买卖信号。 + + 参数: + df: pd.DataFrame, 股票的价格数据,行索引为日期,列为股票代码。 + short_window: int, 短期RSI的窗口期。 + long_window: int, 长期RSI的窗口期。 + overbought: int, 超买水平。 + oversold: int, 超卖水平。 + + 返回: + signal: pd.DataFrame, 每只股票的买卖信号,1 表示买入,0 表示卖出。 + """ + delta = df.diff().fillna(0) + + gain = (delta.where(delta > 0, 0).rolling(window=short_window).mean()).fillna(0) + loss = (-delta.where(delta < 0, 0).rolling(window=short_window).mean()).fillna(0) + + short_rsi = (100 - (100 / (1 + (gain / loss)))).fillna(0) + + long_gain = (delta.where(delta > 0, 0).rolling(window=long_window).mean()).fillna(0) + long_loss = (-delta.where(delta < 0, 0).rolling(window=long_window).mean()).fillna(0) + + long_rsi = (100 - (100 / (1 + (long_gain / long_loss)))).fillna(0) + + signal = pd.DataFrame(index=df.index, columns=df.columns) + + for column in df.columns: + signal[column] = np.where((short_rsi[column] < 30) & (long_rsi[column] < 30) & (short_rsi[column] != 0) & (long_rsi[column] != 0), 1, 0) + signal[column] = np.where((short_rsi[column] > 70) & (long_rsi[column] > 70) & (short_rsi[column] != 0) & (long_rsi[column] != 0), 0, signal[column]) + + return signal.ffill().fillna(0) + + +async def storage_backtest_data(source_column_name, result, signal, stock_code, stock_data_series, short_window: int, + long_window: int, overbought: int = 70, + oversold: int = 30): + await init_tortoise() + + # 要存储的字段列表 + fields_to_store = [ + 'stock_code', 'strategy_name', 'stock_close_price', 'daily_price', + 'price', 'returns', 'data_start_time', 'data_end_time', + 'backtest_end_time', 'position', 'backtest_name', 'rf', 'total_return', 'cagr', + 'max_drawdown', 'calmar', 'mtd', 'three_month', + 'six_month', 'ytd', 'one_year', 'three_year', + 'five_year', 'ten_year', 'incep', 'daily_sharpe', + 'daily_sortino', 'daily_mean', 'daily_vol', + 'daily_skew', 'daily_kurt', 'best_day', 'worst_day', + 'monthly_sharpe', 'monthly_sortino', 'monthly_mean', + 'monthly_vol', 'monthly_skew', 'monthly_kurt', + 'best_month', 'worst_month', 'yearly_sharpe', + 'yearly_sortino', 'yearly_mean', 'yearly_vol', + 'yearly_skew', 'yearly_kurt', 'best_year', 'worst_year', + 'avg_drawdown', 'avg_drawdown_days', 'avg_up_month', + 'avg_down_month', 'win_year_perc', 'twelve_month_win_perc' + ] + + # 准备要存储的数据 + data_to_store = { + 'stock_code': stock_code, + 'strategy_name': "RSI策略", + 'stock_close_price': json.dumps(stock_data_series.fillna(0).rename_axis('time').reset_index().assign( + time=stock_data_series.index.strftime('%Y%m%d')).set_index('time').to_dict(orient='index')), + 'daily_price': convert_pandas_to_json_serializable(result[source_column_name].daily_prices), + 'price': convert_pandas_to_json_serializable(result[source_column_name].prices), + 'returns': convert_pandas_to_json_serializable(result[source_column_name].returns.fillna(0)), + 'data_start_time': pd.to_datetime(result.stats.loc["start"].iloc[0]).strftime('%Y%m%d'), + 'data_end_time': pd.to_datetime(result.stats.loc["end"].iloc[0]).strftime('%Y%m%d'), + 'backtest_end_time': int(datetime.now().strftime('%Y%m%d')), + 'position': convert_pandas_to_json_serializable(signal), + 'backtest_name': f'{stock_code} RSI策略 RSI{short_window}-RSI{long_window}-overbought{overbought}-oversold{oversold}', + 'indicator_type': 'RSI', + 'indicator_information': json.dumps( + {'short_window': short_window, 'long_window': long_window, 'overbought': overbought, 'oversold': oversold}) + } + + # 使用循环填充其他字段 + for field in fields_to_store[12:]: # 从第12个字段开始 + value = result.stats.loc[field].iloc[0] + + if isinstance(value, float): + if np.isnan(value): + data_to_store[field] = 0.0 # NaN 处理为 0 + elif np.isinf(value): # 判断是否为无穷大或无穷小 + if value > 0: + data_to_store[field] = 99999.9999 # 正无穷处理 + else: + data_to_store[field] = -99999.9999 # 负无穷处理 + else: + data_to_store[field] = value # 正常的浮点值 + else: + data_to_store[field] = value # 非浮点类型保持不变 + + # 检查是否存在该 backtest_name + existing_record = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=data_to_store['backtest_name'] + ).first() + + if existing_record: + # 如果存在,更新记录 + await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + id=existing_record.id + ).update(**data_to_store) + else: + # 如果不存在,创建新的记录 + await wance_data_storage_backtest.WanceDataStorageBacktest.create(**data_to_store) + + return data_to_store + + +async def run_rsi_backtest(field_list: list, + stock_list: list, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = 100, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 50, + long_window: int = 200, + overbought: int = 70, + oversold: int = 30 + ): + try: + # 初始化一个列表用于存储每只股票的回测结果字典 + results_list = [] + + # 遍历每只股票的数据(每列代表一个股票的收盘价) + data = await get_local_data(field_list, stock_list, period, start_time, end_time, count, dividend_type, + fill_data, + data_dir) + + for stock_code in stock_list: + + data_column_name = f'close_{stock_code}' + source_column_name = f'{stock_code} RSI策略' + backtest_name = f'{stock_code} RSI策略 RSI{short_window}-RSI{long_window}' + now_data = int(datetime.now().strftime('%Y%m%d')) + db_result_data = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=backtest_name) + + if db_result_data: + if db_result_data[0].backtest_end_time == now_data: + results_list.append({source_column_name: db_result_data[0]}) + + # elif data_column_name in data.columns: + if data_column_name in data.columns: + stock_data_series = data[[data_column_name]] # 提取该股票的收盘价 DataFrame + stock_data_series.columns = ['close'] # 重命名列为 'close' + + # 创建RSI策略 + strategy, signal = await create_dual_ma_strategy(stock_data_series, stock_code, + short_window=short_window, long_window=long_window) + # 创建回测 + backtest = bt.Backtest(strategy=strategy, data=stock_data_series, initial_capital=100000) + # 运行回测 + result = bt.run(backtest) + # 存储回测结果 + data_to_store = await storage_backtest_data(source_column_name, result, signal, stock_code, + stock_data_series, short_window, long_window, overbought, + oversold) + # # 绘制回测结果图表 + # result.plot() + # # 绘制个别股票数据图表 + # plt.figure(figsize=(12, 6)) + # plt.plot(stock_data_series.index, stock_data_series['close'], label='Stock Price') + # plt.title(f'Stock Price for {stock_code}') + # plt.xlabel('Date') + # plt.ylabel('Price') + # plt.legend() + # plt.grid(True) + # plt.show() + # 将结果存储为字典并添加到列表中 + results_list.append({source_column_name: data_to_store}) + + else: + print(f"数据中缺少列: {data_column_name}") + + return results_list # 返回结果列表 + + except Exception as e: + print(f"Error occurred: {e}") + + +async def start_rsi_combination_service(field_list: list, + stock_list: list, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = -1, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + short_window: int = 50, + long_window: int = 200, + overbought: int = 70, + oversold: int = 30 + ): + for stock_code in stock_list: + backtest_name = f'{stock_code} RSI策略 RSI{short_window}-RSI{long_window}' + db_result = await wance_data_storage_backtest.WanceDataStorageBacktest.filter( + backtest_name=backtest_name) + now_time = int(datetime.now().strftime('%Y%m%d')) + + if db_result and db_result[0].backtest_end_time == now_time: + return db_result + else: + # 执行回测 + result = await run_rsi_backtest( + field_list=field_list, + stock_list=stock_list, + period=period, + start_time=start_time, + end_time=end_time, + count=count, + dividend_type=dividend_type, + fill_data=fill_data, + data_dir=data_dir, + short_window=short_window, + long_window=long_window, + overbought=overbought, + oversold=oversold + ) + return result + + +async def init_backtest_db(): + sma_list = [{"short_window": 3, "long_window": 6}, {"short_window": 6, "long_window": 12}, + {"short_window": 12, "long_window": 24}, {"short_window": 14, "long_window": 18}, + {"short_window": 15, "long_window": 10}] + await init_tortoise() + wance_db = await wance_data_stock.WanceDataStock.all() + sma_list_lenght = len(sma_list) + + for stock_code in wance_db: + for i in range(sma_list_lenght): + short_window = sma_list[i]['short_window'] + long_window = sma_list[i]['long_window'] + source_column_name = f'{stock_code} RSI策略 RSI{short_window}-RSI{long_window}' + result = await start_rsi_backtest_service(field_list=['close', 'time'], + stock_list=[stock_code.stock_code], + short_window=short_window, + long_window=long_window, + overbought=70, + oversold=30) + + print(f"回测成功 {source_column_name}") + + +if __name__ == '__main__': + # 测试类的回测 + asyncio.run(run_rsi_backtest(field_list=['close', 'time'], + stock_list=['601222.SH', '601677.SH'], + count=-1, + short_window=10, + long_window=30, + overbought=70, + oversold=30 + )) + + # # 初始化数据库表 + # asyncio.run(init_backtest_db()) diff --git a/src/combination/service.py b/src/combination/service.py new file mode 100644 index 0000000..af431e3 --- /dev/null +++ b/src/combination/service.py @@ -0,0 +1,124 @@ +import asyncio + +from xtquant import xtdata + +from src.combination.bollinger_bands import start_bollinger_combination_service +from src.combination.dual_moving_average import start_sma_combination_service +from src.combination.reverse_dual_ma_strategy import start_reverse_SMA_combination_service +from src.combination.rsi_strategy import start_rsi_combination_service +from src.pydantic.backtest_request import BackRequest +from src.utils.backtest_until import data_check, data_processing, to_json_serializable + + +async def start_combination_service(field_list: list, + stock_list: list, + stock_weights: list, + period: str = '1d', + start_time: str = '', + end_time: str = '', + count: int = -1, + dividend_type: str = 'none', + fill_data: bool = True, + data_dir: str = '', + ma_type: str = 'SMA', + short_window: int = 50, + long_window: int = 200, + bollingerMA: int = 200, + std_dev: int = 200, + overbought: int = 70, + oversold: int = 30, + signal_window: int = 9, + user_id: int = 1): + # 数据检查 + await data_check(field_list=field_list, + stock_list=stock_list, + period=period, + start_time=start_time, + end_time=end_time, + count=count, + dividend_type=dividend_type, + fill_data=fill_data, + data_dir=data_dir) + + # 策略映射 + strategies = { + 'SMA': start_sma_combination_service, + 'Bollinger': start_bollinger_combination_service, + 'RSI': start_rsi_combination_service, + 'RESMA': start_reverse_SMA_combination_service, + 'MACD': start_rsi_combination_service + } + + # 通用参数 + base_params = { + 'field_list': field_list, + 'stock_list': stock_list, + 'period': period, + 'start_time': start_time, + 'end_time': end_time, + 'count': count, + 'dividend_type': dividend_type, + 'fill_data': fill_data, + 'data_dir': data_dir, + } + + # 特定策略参数 + strategy_params = { + 'SMA': {'short_window': short_window, 'long_window': long_window}, + 'Bollinger': {'bollingerMA': bollingerMA, 'std_dev': std_dev}, + 'RSI': {'short_window': short_window, 'long_window': long_window, 'overbought': overbought, + 'oversold': oversold}, + 'RESMA': {'short_window': short_window, 'long_window': long_window}, + 'MACD': {'short_window': short_window, 'long_window': signal_window} + } + + # 选择策略并执行 + strategy_func = strategies.get(ma_type) + if strategy_func: + result = await strategy_func(**base_params, stock_weights=stock_weights, user_id=user_id, + **strategy_params[ma_type]) + return result + else: + return None + + +async def stock_chart_service(stock_code: str, benchmark_code: str = None): + result_list = [] + + # 获取本地数据并进行处理 + result = xtdata.get_local_data(field_list=[], stock_list=[stock_code], period='1d', + start_time='', end_time='', count=-1, dividend_type='none', fill_data=True, + data_dir='') + df = await data_processing(result) + + # 计算5日、10日、30日均线 + # 计算移动平均并填充 NaN 为 0 + df['MA5'] = df[f'close_{stock_code}'].rolling(window=5).mean().fillna(0) + df['MA10'] = df[f'close_{stock_code}'].rolling(window=10).mean().fillna(0) + df['MA30'] = df[f'close_{stock_code}'].rolling(window=30).mean().fillna(0) + + result_list.append(to_json_serializable(df)) + + if benchmark_code is not None: + # 获取指数数据并进行处理 + benchmark_result = xtdata.get_local_data(field_list=['close', 'time'], stock_list=[benchmark_code], period='1d', + start_time='', end_time='', count=-1, dividend_type='none', + fill_data=True, data_dir='') + benchmark_df = await data_processing(benchmark_result) + + # 计算5日、10日、30日均线 + benchmark_df['MA5'] = benchmark_df[f'close_{benchmark_code}'].rolling(window=5).mean() + benchmark_df['MA10'] = benchmark_df[f'close_{benchmark_code}'].rolling(window=10).mean() + benchmark_df['MA30'] = benchmark_df[f'close_{benchmark_code}'].rolling(window=30).mean() + + result_list.append(to_json_serializable(benchmark_df)) + + return result_list + + +async def combination_service(request: BackRequest): + pass + + +if __name__ == '__main__': + result = asyncio.run(stock_chart_service(stock_code="600051.SH", benchmark_code="000300.SH")) diff --git a/src/main.py b/src/main.py index 9f9cac5..69470e3 100644 --- a/src/main.py +++ b/src/main.py @@ -7,6 +7,7 @@ from src.exceptions import register_exception_handler from src.tortoises import register_tortoise_orm from src.xtdata.router import router as xtdata_router from src.backtest.router import router as backtest_router +from src.combination.router import router as combine_router from xtquant import xtdata from src.settings.config import app_configs, settings @@ -20,8 +21,9 @@ register_tortoise_orm(app) register_exception_handler(app) -app.include_router(xtdata_router, prefix="/getwancedata", tags=["盘口数据"]) -app.include_router(backtest_router, prefix="/backtest", tags=["盘口数据"]) +app.include_router(xtdata_router, prefix="/getwancedata", tags=["数据接口"]) +app.include_router(backtest_router, prefix="/backtest", tags=["回测接口"]) +app.include_router(combine_router, prefix="/combine", tags=["组合接口"]) if settings.ENVIRONMENT.is_deployed: @@ -49,4 +51,4 @@ async def root(): if __name__ == "__main__": - uvicorn.run('src.main:app', host="0.0.0.0", port=8011, reload=True) + uvicorn.run('src.main:app', host="0.0.0.0", port=8012, reload=True) diff --git a/src/models/back_observed_data.py b/src/models/back_observed_data.py index ce6662e..d406fe9 100644 --- a/src/models/back_observed_data.py +++ b/src/models/back_observed_data.py @@ -10,4 +10,4 @@ class BackObservedData(Model): class Meta: - table = with_table_name("back_observed_data") \ No newline at end of file + table = with_table_name("user_combination_history") \ No newline at end of file diff --git a/src/models/user_combination_history.py b/src/models/user_combination_history.py new file mode 100644 index 0000000..6d97144 --- /dev/null +++ b/src/models/user_combination_history.py @@ -0,0 +1,13 @@ +from tortoise import Model, fields +from src.models import with_table_name + + +class UserCombinationHistory(Model): + id = fields.IntField() + last_time = fields.CharField(max_length=10, null=True, description='最后一次使用的时间', ) + stock_code = fields.CharField(max_length=20, null=True, description='股票代码', ) + stock_name = fields.CharField(max_length=20, null=True, description='股票名称', ) + stock_weights = fields.JSONField(null=True, description='权重存储', ) + strategy = fields.JSONField(null=True, description='用户使用的策略参数', ) + strategy_name = fields.CharField(max_length=50, null=True, description='用户使用的策略', ) + user_id = fields.IntField(description='用户的id', ) diff --git a/src/pydantic/backtest_request.py b/src/pydantic/backtest_request.py index 905e025..db20984 100644 --- a/src/pydantic/backtest_request.py +++ b/src/pydantic/backtest_request.py @@ -1,4 +1,4 @@ -from typing import List, Optional +from typing import List, Optional, Dict from pydantic import BaseModel, Field @@ -6,7 +6,9 @@ from pydantic import BaseModel, Field class BackRequest(BaseModel): field_list: List[str] = Field(default_factory=list, description="字段列表,用于指定获取哪些数据字段") stock_list: List[str] = Field(default_factory=list, description="股票列表,用于指定获取哪些股票的数据") + stock_weights: List[Dict[str, float]] = Field(default_factory=list, description="股票与其对应权重的字典列表") stock_code: str = Field(default="000300.SH", description="股票代码,用于指定获取哪些股票的数据") + benchmark_code: str = Field(default="000300.SH", description="股票代码,用于指定获取哪些股票的数据") period: str = Field(default='1d', description="数据周期,如 '1d' 表示日线数据") start_time: Optional[str] = Field(default='', description="开始时间,格式为 'YYYY-MM-DD',默认为空字符串") end_time: Optional[str] = Field(default='', description="结束时间,格式为 'YYYY-MM-DD',默认为空字符串") @@ -26,4 +28,5 @@ class BackRequest(BaseModel): overbought: int = Field(default=70, description="超买区间的RSI阈值,表示价格处于相对高点,可能面临回调") oversold: int = Field(default=30, description="超卖区间的RSI阈值,表示价格处于相对低点,可能面临反弹") signal_window: int = Field(default=9, description="超卖区间的RSI阈值,表示价格处于相对低点,可能面临反弹") + user_id: int = Field(default=1, description="用户id,默认为 1 表示获取当前组合的所属用户") diff --git a/src/tortoises.py b/src/tortoises.py index a9db0fe..3f95d63 100644 --- a/src/tortoises.py +++ b/src/tortoises.py @@ -66,7 +66,8 @@ models = [ "src.models.stock_history", "src.models.stock_data_processing", "src.models.wance_data_stock", - "src.models.wance_data_storage_backtest" + "src.models.wance_data_storage_backtest", + "src.models.user_combination_history" ] diff --git a/src/backtest/until.py b/src/utils/backtest_until.py similarity index 70% rename from src/backtest/until.py rename to src/utils/backtest_until.py index 253919c..264d59b 100644 --- a/src/backtest/until.py +++ b/src/utils/backtest_until.py @@ -33,12 +33,12 @@ async def data_processing(result_local): df.set_index('time', inplace=True) # 将 'close' 列重命名为 'close_股票代码' df.rename(columns={'close': f'close_{stock_code}'}, inplace=True) - # 将 DataFrame 添加到列表中 - df_list.append(df[[f'close_{stock_code}']]) # 只保留 'close_股票代码' 列 + # 将所有列添加到列表中 + df_list.append(df) # 保留所有字段,包括重命名后的 'close_股票代码' else: print(f"数据格式错误: {stock_code} 不包含 DataFrame") - # 使用 pd.concat() 将所有 DataFrame 合并为一个大的 DataFrame + # 使用 pd.concat() 将所有 DataFrame 合并为一个大的 DataFrame,保留所有列 combined_df = pd.concat(df_list, axis=1) # 确保返回的 DataFrame 索引是日期格式 @@ -47,6 +47,7 @@ async def data_processing(result_local): return combined_df + def convert_pandas_to_json_serializable(data: pd.Series) -> str: """ 将 Pandas Series 或 DataFrame 中的 Timestamp 索引转换为字符串,并返回 JSON 可序列化的结果。 @@ -76,6 +77,49 @@ def convert_pandas_to_json_serializable(data: pd.Series) -> str: raise ValueError("输入必须为 Pandas Series 或 DataFrame") +def clean_value(value): + """清理数据值,使其适合 JSON 序列化。""" + if pd.isna(value) or np.isinf(value): + return 0 # 或者可以使用 None 代替 + return value + + +def to_json_serializable(data): + """ + 将 Pandas DataFrame 转换为 JSON 可序列化的格式。 + - 将索引(如果是时间戳)转换为 'YYYYMMDD' 格式。 + - 将数据按时间升序排列。 + - 返回数组形式的结构,内部仍使用字典存放每一行数据。 + """ + result = [] + + if isinstance(data, pd.DataFrame): + # 将索引(时间戳)转换为 YYYYMMDD 格式 + data.index = data.index.strftime('%Y%m%d') + + # 按时间(索引)升序排序 + data = data.sort_index(ascending=True) + + # 遍历 DataFrame 的每一行 + for idx, row in data.iterrows(): + # 创建一个字典来存放当前行的数据 + row_dict = {'date': idx} # 将日期作为键 + # 清理行中的每个值 + for col in row.index: + row_dict[col] = clean_value(row[col]) # 清理每个值 + result.append(row_dict) # 将字典添加到结果数组中 + + return result + + elif isinstance(data, pd.Series): + # 将 Series 转换为数组形式(每一行的数据以字典形式存放) + for idx in data.index: + result.append({str(idx.strftime('%Y%m%d')): clean_value(data[idx])}) + return result + + else: + raise TypeError("输入数据必须是 Pandas 的 DataFrame 或 Series") + async def data_check(field_list: list, stock_list: list, period: str = '1d', diff --git a/src/xtdata/router.py b/src/xtdata/router.py index af619b7..b4da9b7 100644 --- a/src/xtdata/router.py +++ b/src/xtdata/router.py @@ -158,6 +158,7 @@ async def subscribe_whole_quote(request: DataRequest): # 调用服务订阅所有股票的报价 result = await service.subscribe_whole_quote_service(code_list=request.code_list, callback=request.callback) + # return result # 返回响应 return response_list_response(data=result, status_code=200, message="Success") # 返回响应 diff --git a/src/xtdata/service.py b/src/xtdata/service.py index 76708be..e7813b1 100644 --- a/src/xtdata/service.py +++ b/src/xtdata/service.py @@ -1,12 +1,7 @@ -import asyncio -import json -from datetime import datetime, timedelta - import numpy as np # 导入 numpy 库 -from tortoise.expressions import Q from xtquant import xtdata # 导入 xtquant 库的 xtdata 模块 -from src.backtest.until import convert_pandas_to_json_serializable +from src.utils.backtest_until import convert_pandas_to_json_serializable from src.models.wance_data_stock import WanceDataStock from src.pydantic.factor_request import StockQuery from src.utils.history_data_processing_utils import translation_dict