wance_data/src/data_processing/history_data_processing.py

413 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from datetime import datetime
import akshare as ak
import pandas as pd
from xtquant import xtdata
from src.models import wance_data_stock
from src.tortoises_orm_config import init_tortoise
from src.utils.history_data_processing_utils import translation_dict, get_best_match, safe_get_value, on_progress
from src.utils.split_stock_utils import split_stock_code, join_stock_code, percent_to_float
from src.xtdata.service import download_history_data_service, get_full_tick_keys_service, download_history_data2_service
# period: identifies the bar period / concrete data type to request.
period_list = [
    "1d", "1h", "30m", "15m", "5m", "1m",
    "tick",
    "1w", "1mon", "1q", "1hy", "1y",
]
# Column names of the data (assigned to each per-stock DataFrame in processing_data).
# NOTE(review): 'settelmentPrice' is spelled this way on purpose here because it is
# a runtime value — confirm against the data source before "correcting" it.
columns = [
    'open',
    'high',
    'low',
    'close',
    'volume',
    'amount',
    'settelmentPrice',
    'openInterest',
    'preClose',
    'suspendFlag',
]
def processing_data(field_list: list, stock_list: list, period: str, start_time: str, end_time: str, count: int,
                    dividend_type: str, fill_data: bool):
    """
    Read locally cached market data through xtdata and merge it into one DataFrame.

    Every per-stock DataFrame gets its 'time' column converted to a 'YYYYMMDD'
    string index, its columns renamed to the module-level ``columns`` list and a
    'stock_code' column added; the frames are then concatenated and printed.

    :param field_list: fields to request, e.g. []
    :param stock_list: instrument codes, e.g. ["186511.SH","173312.SH","231720.SH","173709.SH","019523.SH"]
    :param period: bar period, e.g. "1d"
    :param start_time: start of the range, e.g. "20240506"
    :param end_time: end of the range, e.g. ""
    :param count: number of rows to request, e.g. -1
    :param dividend_type: dividend adjustment mode, e.g. "none"
    :param fill_data: forwarded unchanged to xtdata.get_local_data, e.g. False
    :return: the combined DataFrame, or None when nothing could be merged or an
             error occurred (errors are printed, not raised)
    """
    try:
        # Fetch the locally stored data.
        result_local = xtdata.get_local_data(field_list=field_list,
                                             stock_list=stock_list,
                                             period=period,
                                             start_time=start_time,
                                             end_time=end_time,
                                             count=count,
                                             dividend_type=dividend_type,
                                             fill_data=fill_data,
                                             data_dir=""
                                             )
        # Collects one normalised DataFrame per stock.
        df_list = []
        for stock_code, df in result_local.items():
            if not isinstance(df, pd.DataFrame):
                print(f"数据格式错误: {stock_code} 不包含 DataFrame")
                continue
            # Convert the millisecond timestamp to a 'YYYYMMDD' string.
            # NOTE(review): unit='ms' yields UTC-based dates; if the source timestamps
            # are meant as local (exchange) time the date may be shifted — confirm.
            df['time'] = pd.to_datetime(df['time'], unit='ms').dt.strftime('%Y%m%d')
            df.set_index('time', inplace=True)
            # Rename the remaining columns; this raises if the column count does not
            # match ``columns`` (e.g. when a non-empty field_list narrows the result).
            df.columns = columns
            # Tag each row with the stock it belongs to.
            df['stock_code'] = stock_code
            df_list.append(df)
        # pd.concat raises ValueError on an empty list, so report that case explicitly.
        if not df_list:
            print("没有可合并的数据")
            return None
        combined_df = pd.concat(df_list)
        print(combined_df)
        print(f"开始获取股票数据{result_local}")
        return combined_df
    except Exception as e:
        print(f"处理数据发生错误: {str(e)}")
        return None
def history_data_processing():
    """
    Print the locally cached daily ('1d') data for every key returned by
    xtdata.get_full_tick for the 'SH' and 'SZ' markets.

    Local data path: D:\\e海方舟-量化交易版\\userdata_mini\\datadir
    Any exception is caught and printed.
    :return: None
    """
    try:
        full_tick = xtdata.get_full_tick(code_list=['SH', 'SZ'])
        stock_codes = list(full_tick.keys())
        # Downloading (xtdata.download_history_data per code, or
        # xtdata.download_history_data2 with the on_progress callback) is disabled
        # here; only data already present locally is read.
        query = dict(
            field_list=[],
            stock_list=stock_codes,
            period='1d',
            start_time='',
            end_time='',
            count=-1,
            dividend_type='none',
            fill_data=True,
        )
        datas = xtdata.get_local_data(**query)
        print(datas, "这里是返回的数据")
    except Exception as exc:
        print(f"处理数据发生错误: {exc}")
async def init_indicator():
    """
    Refresh the indicator fields of every stock record using akshare data.

    Initialises the ORM, loads all WanceDataStock rows whose stock_type contains
    "stock", fetches several akshare tables per stock and writes selected values
    onto the matching database record. A failure for one stock is printed and the
    loop moves on to the next stock.

    :return: None
    """
    await init_tortoise()
    # Load the stock list from the database.
    stock_list = await wance_data_stock.WanceDataStock.filter(stock_type__contains=["stock"]).all()
    # Spot quotes for all A shares, fetched once and reused for every stock.
    stock_zh_a_spot_em_df = ak.stock_zh_a_spot_em()
    for stock in stock_list:
        # Bound before the try block so the error handler can always reference it.
        stock_code = stock.stock_code
        try:
            stock_code_suffix = split_stock_code(stock_code)  # split the code into its parts
            stock_code_xq = join_stock_code(stock_code_suffix)
            stock_code_front = stock_code_suffix[0]
            # Rows of the spot table matching this code ('涨跌幅' = change percentage).
            match = stock_zh_a_spot_em_df.loc[stock_zh_a_spot_em_df['代码'] == stock_code_front, '涨跌幅']
            # Fetch the per-stock tables from akshare.
            # Legulegu - A-share individual indicators: P/E, P/B, dividend yield
            stock_a_indicator_lg_df = ak.stock_a_indicator_lg(symbol=stock_code_front)
            # THS - financial indicators - key indicators (by report period)
            stock_financial_abstract_ths_df = ak.stock_financial_abstract_ths(symbol=stock_code_front,
                                                                              indicator="按报告期")
            # Sina Finance - financial statements - key indicators
            stock_financial_abstract_df = ak.stock_financial_abstract(symbol=stock_code_front)
            # Xueqiu - quote centre - individual stock
            stock_individual_spot_xq_df = ak.stock_individual_spot_xq(symbol=stock_code_xq)
            # Baidu Gushitong - A shares - valuation data
            stock_zh_valuation_baidu_df = ak.stock_zh_valuation_baidu(symbol=stock_code_front, indicator="市现率",
                                                                      period="近一年")
            # Eastmoney - data centre - dividend and allotment details
            stock_fhps_detail_em_df = ak.stock_fhps_detail_em(symbol=stock_code_front)
            # Look up the record that will receive the values.
            existing_record = await wance_data_stock.WanceDataStock.filter(stock_code=stock_code).first()
            if existing_record is None:
                print(f"未找到股票记录: {stock_code}")
                continue
            # Pick the row / column of each table that the fields are read from.
            last_indicator_row = stock_a_indicator_lg_df.iloc[-1]
            last_abstract_row = stock_financial_abstract_ths_df.iloc[0]
            last_financial_row = stock_financial_abstract_df.iloc[:, 2]
            last_spot_xq_row = stock_individual_spot_xq_df.iloc[:, 1]
            last_baidu_df_row = stock_zh_valuation_baidu_df.iloc[-1]
            last_detail_em_row = stock_fhps_detail_em_df.iloc[-1]
            # All position-based reads below use .iloc: integer access through
            # Series[...] is label-based and its positional fallback (e.g. [-1] on a
            # label-indexed row) is deprecated in pandas 2.x.
            # Per-share indicators
            existing_record.financial_dividend = safe_get_value(last_detail_em_row.get('现金分红-现金分红比例'))
            existing_record.financial_ex_gratia = safe_get_value(last_spot_xq_row.iloc[8])
            existing_record.financial_cash_flow = safe_get_value(last_financial_row.iloc[10])
            existing_record.financial_asset_value = safe_get_value(last_financial_row.iloc[9])
            existing_record.financial_reserve_per = safe_get_value(last_abstract_row.get('每股资本公积金'))
            existing_record.financial_undistributed_profit = safe_get_value(last_abstract_row.get('每股未分配利润'))
            # Profitability
            existing_record.profit_asset_value = safe_get_value(last_financial_row.iloc[33])
            existing_record.profit_sale_ratio = safe_get_value(last_financial_row.iloc[43])
            existing_record.profit_gross_rate = safe_get_value(last_financial_row.iloc[44])
            existing_record.profit_business_increase = safe_get_value(last_financial_row.iloc[53])
            existing_record.profit_dividend_rate = safe_get_value(last_spot_xq_row.iloc[26])
            # Growth
            existing_record.growth_Income_rate = percent_to_float(
                safe_get_value(last_abstract_row.get('营业总收入同比增长率', 0.0)))
            existing_record.growth_growth_rate = safe_get_value(last_financial_row.iloc[46])
            existing_record.growth_nonnet_profit = percent_to_float(
                safe_get_value(last_abstract_row.get('扣非净利润同比增长率', 0.0)))
            existing_record.growth_attributable_rate = safe_get_value(last_financial_row.iloc[54])
            # Valuation
            # NOTE(review): several target fields do not obviously correspond to their
            # source columns (e.g. valuation_PB_TTM <- 'ps', valuation_PTS_TTM <-
            # 'dv_ratio'); the mapping is kept exactly as it was — confirm it is intended.
            existing_record.valuation_PEGTTM_ratio = safe_get_value(last_indicator_row.get('pe_ttm'))
            existing_record.valuation_PEG_percentile = safe_get_value(last_indicator_row.get('pe'))
            existing_record.valuation_PB_TTM = safe_get_value(last_indicator_row.get('ps'))
            existing_record.valuation_PB_percentile = safe_get_value(last_indicator_row.get('pb'))
            existing_record.valuation_PTS_TTM = safe_get_value(last_indicator_row.get('dv_ratio'))
            existing_record.valuation_PTS_percentile = safe_get_value(last_indicator_row.get('ps_ttm'))
            baidu_value = last_baidu_df_row.iloc[-1]
            existing_record.valuation_market_TTM = safe_get_value(baidu_value)
            # Reciprocal expressed as a percentage; guarded against division by zero.
            existing_record.valuation_market_percentile = safe_get_value(1 / baidu_value * 100) if \
                baidu_value != 0 else 0
            # Market indicator: first matching spot value, or 0 when the code is absent.
            existing_record.market_indicator = safe_get_value(match.values[0]) if not match.empty else 0
            # Persist the changes.
            await existing_record.save()
            print(f"更新股票指标成功!: {stock_code}")
        except Exception as e:
            print(f"处理股票 {stock_code} 时发生错误: {e}")
            continue  # move on to the next stock
async def init_stock_pool(incremental: bool = False):
    """
    Initialise the stock pool: name, code, listing date, sectors and related fields.

    Phase 1 walks every key returned by xtdata.get_full_tick(['SH', 'SZ']) and
    queues a new WanceDataStock for each instrument whose type includes "stock"
    and whose code is not in the database yet; the queued records are written
    with a single bulk_create. Phase 2 walks xtdata's sector list, maps each
    sector name through translation_dict and appends the translated name to the
    stock_sector list of every stock in that sector.

    @param incremental: when True, codes already in the database are skipped
                        before any xtdata detail lookup is made
    @type incremental: bool
    """
    await init_tortoise()
    # Load the existing codes once; they serve both the incremental fast path and
    # the duplicate check below (instead of one database query per ticker).
    existing_stocks = {stock.stock_code for stock in await wance_data_stock.WanceDataStock.all()}
    # Build the stock pool.
    tick_result = xtdata.get_full_tick(['SH', 'SZ'])
    stocks_to_create = []  # collected here and written with one bulk_create
    end_time = datetime.now().strftime('%Y%m%d')
    for key in tick_result.keys():
        if incremental and key in existing_stocks:
            print(f"股票代码 {key} 已经存在,跳过...\n")
            continue
        detail_result = xtdata.get_instrument_detail(key, False)
        # No detail available for this key: skip it rather than crash on .get().
        if not detail_result:
            print(f"未获取到合约信息,跳过:{key} \n")
            continue
        InstrumentName_result = detail_result.get("InstrumentName")
        start_time = detail_result.get("OpenDate") or detail_result.get("CreateDate")
        time_expire = detail_result.get("ExpireDate")
        type_dict = xtdata.get_instrument_type(key)
        type_list = list(type_dict) if type_dict else []
        # Only instruments whose type list contains "stock" are stored.
        if "stock" not in type_list:
            print(f"跳过非股票类型:{key} 类型:{type_list} \n")
            continue
        if key in existing_stocks:
            print(f"股票代码 {key} 已经存在,跳过...\n")
            continue
        stocks_to_create.append(
            wance_data_stock.WanceDataStock(
                stock_code=key,
                stock_name=InstrumentName_result,
                stock_sector=[],  # starts empty; sectors are appended in phase 2
                stock_type=type_list,
                time_start=start_time,
                time_end=end_time,
                time_expire=time_expire,
                market_sector=key.rsplit('.', 1)[-1]
            )
        )
        print(f"加载成功 股票名称 {InstrumentName_result} 股票代码 {key} 股票类型 {type_list} \n")
    # Create all new records in one batch.
    if stocks_to_create:
        bulk_db_result = await wance_data_stock.WanceDataStock.bulk_create(stocks_to_create)
        print(bulk_db_result, "股票池创建完成 \n")
    else:
        print("没有新的股票需要创建 \n")
    # Fetch the sectors and update each stock's sector list.
    sector_list = xtdata.get_sector_list()
    for sector in sector_list:
        # Fuzzy-match the sector name against the translation table.
        best_match = get_best_match(sector, translation_dict)
        if not best_match:
            print(f"没有找到合适的板块匹配:{sector} \n")
            continue  # no match: skip this sector
        translated_sector = translation_dict[best_match]  # English name for the sector
        # Stocks that belong to this sector.
        sector_stock = xtdata.get_stock_list_in_sector(sector)
        stocks_to_update = await wance_data_stock.WanceDataStock.filter(stock_code__in=sector_stock)
        for stock in stocks_to_update:
            # Treat a missing sector list as empty so membership test and append work.
            sectors = list(stock.stock_sector or [])
            if translated_sector in sectors:
                print(f"{stock.stock_code} 已经包含板块 {translated_sector}, 跳过重复添加 \n")
                continue
            sectors.append(translated_sector)
            stock.stock_sector = sectors
            await stock.save()  # persist the updated sector list
        print(f"更新板块完成 {sector}: {translated_sector} \n")
    print("所有股票已经加载完成 \n")
"""
# 初始化股票池参数 股票名、股票代码、股票上市时间、股票板块、股票最后回测时间、股票退市时间、股票所属市场
@param incremental:是否执行增量下载
@type incremental:bool
@return:
@rtype:
"""
"""
async def init_stock_pool(incremental: bool = False):
# Known problem with this version: the sector names are stored in Chinese, so they cannot be matched when querying.
await init_tortoise()
# 获取所有现有股票代码
existing_stocks = set()
if incremental:
existing_stocks = {stock.stock_code for stock in await wance_data_stock.WanceDataStock.all()}
# 初始化股票池
tick_result = xtdata.get_full_tick(['SH', 'SZ'])
stocks_to_create = [] # 使用一个列表批量创建
for key in tick_result.keys():
# 如果是增量更新,且该股票已经存在,则跳过
if incremental and key in existing_stocks:
continue
detail_result = xtdata.get_instrument_detail(key, False)
InstrumentName_result = detail_result.get("InstrumentName")
start_time = detail_result.get("OpenDate") or detail_result.get("CreateDate")
end_time = datetime.now().strftime('%Y%m%d')
time_expire = detail_result.get("ExporeDate")
type_dict = xtdata.get_instrument_type(key)
type_list = []
if type_dict:
for i in type_dict.keys():
type_list.append(i)
stocks_to_create.append(
wance_data_stock.WanceDataStock(
stock_code=key,
stock_name=InstrumentName_result,
stock_sector=[],
stock_type=type_list,
time_start=start_time,
time_end=end_time,
time_expire=time_expire,
market_sector=key.rsplit('.', 1)[-1]
)
)
print(f"加载成功 股票名称 {InstrumentName_result} 股票代码 {key} 股票类型 {type_list} \n")
# 如果有新的股票,批量创建所有新股票记录
if stocks_to_create:
bulk_db_result = await wance_data_stock.WanceDataStock.bulk_create(stocks_to_create)
print(bulk_db_result, "股票池创建完成 \n")
else:
print("没有新的股票需要创建 \n")
# 获取并更新sector模块
sector_list = xtdata.get_sector_list()
if incremental:
# 获取已经更新过的sector
updated_sectors = set()
# 获取所有已经存在的股票及其相关板块
existing_stock_sectors = await wance_data_stock.WanceDataStock.all().values('stock_code', 'stock_sector')
for stock_info in existing_stock_sectors:
for sector in stock_info['stock_sector']:
updated_sectors.add(sector)
# 过滤出没有更新过的sector
sector_list = [sector for sector in sector_list if sector not in updated_sectors]
for sector in sector_list:
sector_stock = xtdata.get_stock_list_in_sector(sector)
# 获取所有相关股票
stocks_to_update = await wance_data_stock.WanceDataStock.filter(stock_code__in=sector_stock)
# 遍历并更新每个股票的sector避免重复添加
for stock in stocks_to_update:
if sector not in stock.stock_sector:
stock.stock_sector.append(sector)
await stock.save() # 保存更新后的数据
print(f"更新板块完成 {stock.stock_code}: {stock.stock_sector} \n")
print(f"所有股票已经加载完成 \n")
"""
"""
# 每次都会操作数据库
async def init_stock_pool():
await init_tortoise()
# 初始化股票池
tick_result = xtdata.get_full_tick(['SH', 'SZ'])
tick_keys = list(tick_result.keys())
for key in tick_keys:
detail_result = xtdata.get_instrument_detail(key, False)
InstrumentName_result = detail_result.get("InstrumentName")
type_result = xtdata.get_instrument_type(key)
# 创建新的股票记录
await wance_data_stock.WanceDataStock.create(
stock_code=key,
stock_name=InstrumentName_result,
stock_sector="",
stock_type=type_result
)
# 获取并更新sector模块
sector_list = xtdata.get_sector_list()
for sector in sector_list:
sector_stock = xtdata.get_stock_list_in_sector(sector)
for stock in sector_stock:
# 更新已有股票的sector
await wance_data_stock.WanceDataStock.filter(stock_code=stock).update(stock_sector=sector)
"""
if __name__ == '__main__':
    # Disabled example: merge local daily data for two stocks.
    # processing_data(field_list=[],
    #                 stock_list=["000062.SZ","600611.SH"],
    #                 period="1d",
    #                 start_time="20240506",
    #                 end_time="",
    #                 count=-1,
    #                 dividend_type="none",
    #                 fill_data=False,
    #                 )
    # Disabled example: dump the locally cached daily data.
    # history_data_processing()
    # Build the stock pool (full, non-incremental run), then refresh the indicators.
    # Each step runs in its own asyncio.run call, one after the other.
    asyncio.run(init_stock_pool(incremental=False))
    asyncio.run(init_indicator())
    # xtdata.run()