股票系统gh_mirrors/st/stock数据库设计详解:MySQL与Pandas数据处理技巧
【免费下载链接】stock stock,股票系统。使用python进行开发。 项目地址: https://gitcode.***/gh_mirrors/st/stock
引言:股票数据处理的双重挑战
你是否曾面临过股票数据存储效率低下、Pandas数据处理与MySQL交互不畅的问题?本文将深入解析gh_mirrors/st/stock项目的数据库设计理念,展示如何通过MySQL与Pandas的高效协同,构建稳定可靠的股票数据处理系统。读完本文,你将掌握:
- 股票系统核心数据库架构设计与表结构规范
- MySQL与Pandas数据交互的最佳实践
- 高并发股票数据写入的优化技巧
- 数据缓存与性能提升的实现方案
一、数据库架构设计
1.1 整体架构概览
gh_mirrors/st/stock项目采用MySQL作为核心数据存储,结合Pandas进行数据处理与分析,通过分层设计实现数据的高效管理。系统架构如下:
1.2 数据库连接配置
项目通过环境变量实现数据库连接的灵活配置,核心配置如下:
# libs/***mon.py
MYSQL_HOST = os.environ.get('MYSQL_HOST') if (os.environ.get('MYSQL_HOST') != None) else "mysqldb"
MYSQL_USER = os.environ.get('MYSQL_USER') if (os.environ.get('MYSQL_USER') != None) else "root"
MYSQL_PWD = os.environ.get('MYSQL_PWD') if (os.environ.get('MYSQL_PWD') != None) else "mysqldb"
MYSQL_DB = os.environ.get('MYSQL_DB') if (os.environ.get('MYSQL_DB') != None) else "stock_data"
MYSQL_CONN_URL = "mysql+mysqldb://" + MYSQL_USER + ":" + MYSQL_PWD + "@" + MYSQL_HOST + ":3306/" + MYSQL_DB + "?charset=utf8mb4"
这种配置方式的优势在于:
- 开发环境与生产环境无缝切换
- 数据库凭证安全管理
- 多环境部署的灵活性
1.3 核心数据表设计
虽然项目中未直接提供.sql文件,但通过代码分析可推导出核心表结构设计。以下是三个关键表的结构说明:
1.3.1 股票基本信息表(stock_zh_ah_name)
| 字段名 | 类型 | 描述 |
|---|---|---|
| code | VARCHAR(10) | 股票代码,主键 |
| name | VARCHAR(20) | 股票名称 |
| latest_price | DECIMAL(10,2) | 最新价格 |
| quote_change | DECIMAL(5,2) | 涨跌幅(%) |
| ups_downs | DECIMAL(10,2) | 涨跌额 |
| volume | BIGINT | 成交量(手) |
| turnover | DECIMAL(15,2) | 成交额(万元) |
| amplitude | DECIMAL(5,2) | 振幅(%) |
| high | DECIMAL(10,2) | 最高价 |
| low | DECIMAL(10,2) | 最低价 |
| open | DECIMAL(10,2) | 开盘价 |
| closed | DECIMAL(10,2) | 昨收价 |
| quantity_ratio | DECIMAL(5,2) | 量比 |
| turnover_rate | DECIMAL(5,2) | 换手率(%) |
| pe_dynamic | DECIMAL(10,2) | 动态市盈率 |
| pb | DECIMAL(5,2) | 市净率 |
| date | INT | 日期(YYYYMMDD),主键 |
该表通过date和code组合作为主键,确保数据唯一性:
# jobs/18h_daily_job.py
***mon.insert_db(data, "stock_zh_ah_name", True, "`date`,`code`")
1.3.2 龙虎榜个股上榜统计表(stock_sina_lhb_ggtj)
| 字段名 | 类型 | 描述 |
|---|---|---|
| code | VARCHAR(10) | 股票代码,主键 |
| name | VARCHAR(20) | 股票名称 |
| ranking_times | INT | 上榜次数 |
| sum_buy | DECIMAL(15,2) | 买入金额(万元) |
| sum_sell | DECIMAL(15,2) | 卖出金额(万元) |
| ***_amount | DECIMAL(15,2) | 净额(万元) |
| buy_seat | VARCHAR(255) | 买入营业部 |
| sell_seat | VARCHAR(255) | 卖出营业部 |
| date | INT | 日期(YYYYMMDD),主键 |
1.3.3 大宗交易每日统计表(stock_dzjy_mrtj)
| 字段名 | 类型 | 描述 |
|---|---|---|
| code | VARCHAR(10) | 股票代码,主键 |
| name | VARCHAR(20) | 股票名称 |
| quote_change | DECIMAL(5,2) | 涨跌幅(%) |
| close_price | DECIMAL(10,2) | 收盘价 |
| average_price | DECIMAL(10,2) | 成交均价 |
| overflow_rate | DECIMAL(5,4) | 折溢率(%) |
| trade_number | INT | 成交笔数 |
| sum_volume | BIGINT | 成交数量(股) |
| sum_turnover | DECIMAL(15,2) | 成交金额(万元) |
| turnover_market_rate | DECIMAL(7,6) | 成交额/流通市值(%) |
| date | INT | 日期(YYYYMMDD),主键 |
1.4 数据库初始化流程
项目通过basic_job.py实现数据库的自动创建与初始化:
# jobs/basic_job.py
def create_new_database():
with MySQLdb.connect(***mon.MYSQL_HOST, ***mon.MYSQL_USER, ***mon.MYSQL_PWD, "mysql", charset="utf8") as db:
try:
create_sql = " CREATE DATABASE IF NOT EXISTS %s CHARACTER SET utf8 COLLATE utf8_general_ci " % ***mon.MYSQL_DB
print(create_sql)
db.auto***mit(on=True)
db.cursor().execute(create_sql)
except Exception as e:
print("error CREATE DATABASE :", e)
初始化检查流程:
- 尝试连接目标数据库
- 连接失败时自动创建数据库
- 设置字符集为UTF-8确保中文支持
二、MySQL与Pandas数据交互实现
2.1 核心交互模块设计
项目在libs/***mon.py中封装了完整的MySQL与Pandas交互接口,形成数据访问层:
2.2 数据写入实现机制
数据写入采用SQLAlchemy引擎,结合Pandas的to_sql方法实现高效数据插入:
# libs/***mon.py
def insert_other_db(to_db, data, table_name, write_index, primary_keys):
# 定义engine
engine_mysql = engine_to_db(to_db)
# 使用检查检查数据库表是否有主键
insp = inspect(engine_mysql)
col_name_list = data.columns.tolist()
# 如果有索引,把索引增加到varchar上面
if write_index:
# 插入到第一个位置
col_name_list.insert(0, data.index.name)
print(col_name_list)
data.to_sql(name=table_name, con=engine_mysql, schema=to_db, if_exists='append',
dtype={col_name: NVARCHAR(length=255) for col_name in col_name_list}, index=write_index)
# 自动添加主键约束
if insp.get_pk_constraint(table_name)['constrained_columns'] == []:
with engine_mysql.connect() as con:
try:
con.execute('ALTER TABLE `%s` ADD PRIMARY KEY (%s);' % (table_name, primary_keys))
except Exception as e:
print("################## ADD PRIMARY KEY ERROR :", e)
写入流程特点:
- 自动处理DataFrame列类型映射
- 支持索引写入与主键约束自动创建
- 统一设置字符串字段长度避免截断
- 采用append模式支持增量数据更新
2.3 数据写入前处理
在数据写入前,项目对AKShare返回的原始数据进行标准化处理:
# jobs/18h_daily_job.py
data = ak.stock_zh_a_spot_em()
data.columns = ['index', 'code', 'name', 'latest_price', 'quote_change', 'ups_downs', 'volume', 'turnover',
'amplitude', 'high', 'low', 'open', 'closed', 'quantity_ratio', 'turnover_rate', 'pe_dynamic',
'pb']
# 数据过滤
data = data.loc[data["code"].apply(stock_a)].loc[data["name"].apply(stock_a_filter_st)].loc[
data["latest_price"].apply(stock_a_filter_price)]
print(data)
data['date'] = datetime_int # 修改时间成为int类型
# 删除老数据
del_sql = " DELETE FROM `stock_zh_ah_name` where `date` = '%s' " % datetime_int
***mon.insert(del_sql)
data.set_index('code', inplace=True)
data.drop('index', axis=1, inplace=True)
print(data)
# 写入数据库
***mon.insert_db(data, "stock_zh_ah_name", True, "`date`,`code`")
数据处理流程:
- 标准化列名便于后续处理
- 过滤非目标股票数据(仅保留A股)
- 过滤ST股票和无价格数据
- 添加日期标识字段
- 删除指定日期的历史数据避免重复
- 设置股票代码为索引并写入数据库
2.4 数据查询操作实现
查询操作封装了基础的SQL执行与结果返回:
# libs/***mon.py
def select(sql, params=()):
with conn() as db:
print("select sql:" + sql)
try:
db.execute(sql, params)
except Exception as e:
print("error :", e)
result = db.fetchall()
return result
def select_count(sql, params=()):
with conn() as db:
print("select sql:" + sql)
try:
db.execute(sql, params)
except Exception as e:
print("error :", e)
result = db.fetchall()
# 只有一个数组中的第一个数据
if len(result) == 1:
return int(result[0][0])
else:
return 0
三、数据处理优化策略
3.1 数据缓存机制
为提高重复数据访问性能,项目实现了基于文件系统的缓存系统:
# libs/***mon.py
bash_stock_tmp = "/data/cache/hist_data_cache/%s/%s/"
def get_hist_data_cache(code, date_start, date_end):
cache_dir = bash_stock_tmp % (date_end[0:7], date_end)
# 如果没有文件夹创建一个。月文件夹和日文件夹。方便删除。
if not os.path.exists(cache_dir):
os.makedirs(cache_dir)
cache_file = cache_dir + "%s^%s.gzip.pickle" % (date_end, code)
# 如果缓存存在就直接返回缓存数据。压缩方式。
if os.path.isfile(cache_file):
print("######### read from cache #########", cache_file)
return pd.read_pickle(cache_file, ***pression="gzip")
else:
print("######### get data, write cache #########", code, date_start, date_end)
stock = ak.stock_zh_a_hist(symbol= code, start_date=date_start, end_date=date_end, adjust="")
stock.columns = ['date', 'open', 'close', 'high', 'low', 'volume', 'amount', 'amplitude', 'quote_change',
'ups_downs', 'turnover']
if stock is None:
return None
stock = stock.sort_index(0) # 将数据按照日期排序下
print(stock)
stock.to_pickle(cache_file, ***pression="gzip")
return stock
缓存系统特点:
- 按年月层级组织缓存目录
- 使用gzip压缩节省存储空间
- 基于日期和股票代码的缓存键设计
- 自动缓存失效机制(按日期目录)
3.2 数据重跑与去重策略
为确保数据准确性,项目实现了数据重跑机制,通过先删除后插入的方式保证数据一致性:
# jobs/18h_daily_job.py
# 删除老数据
del_sql = " DELETE FROM `stock_zh_ah_name` where `date` = '%s' " % datetime_int
***mon.insert(del_sql)
# 插入新数据
***mon.insert_db(data, "stock_zh_ah_name", True, "`date`,`code`")
这种策略的优势:
- 保证每日数据唯一性
- 支持历史数据重新生成
- 避免重复数据积累
3.3 数据类型处理优化
针对股票数据特点,项目对数值类型进行了特殊处理:
# jobs/18h_daily_job.py
# 数据保留指定小数位数
stock_dzjy_mrtj["average_price"] = stock_dzjy_mrtj["average_price"].round(2)
stock_dzjy_mrtj["overflow_rate"] = stock_dzjy_mrtj["overflow_rate"].round(4)
stock_dzjy_mrtj["turnover_market_rate"] = stock_dzjy_mrtj["turnover_market_rate"].round(6)
数据类型优化:
- 价格类数据保留2位小数
- 比率类数据保留4-6位小数
- 避免浮点精度问题影响分析结果
四、典型应用场景实现
4.1 股票数据每日更新
项目通过定时任务实现股票数据的每日更新,核心逻辑在18h_daily_job.py中实现:
# jobs/18h_daily_job.py
def stat_all(tmp_datetime):
datetime_str = (tmp_datetime).strftime("%Y-%m-%d")
datetime_int = (tmp_datetime).strftime("%Y%m%d")
print("datetime_str:", datetime_str)
print("datetime_int:", datetime_int)
# 股票列表数据采集与处理
try:
data = ak.stock_zh_a_spot_em()
# 数据处理与清洗
# ...
***mon.insert_db(data, "stock_zh_ah_name", True, "`date`,`code`")
except Exception as e:
print("error :", e)
# 龙虎榜数据采集与处理
try:
stock_sina_lhb_ggtj = ak.stock_sina_lhb_ggtj(recent_day="5")
# 数据处理与清洗
# ...
***mon.insert_db(stock_sina_lhb_ggtj, "stock_sina_lhb_ggtj", True, "`date`,`code`")
except Exception as e:
print("error :", e)
# 大宗交易数据采集与处理
try:
stock_dzjy_mrtj = ak.stock_dzjy_mrtj(start_date=datetime_str, end_date=datetime_str)
# 数据处理与清洗
# ...
***mon.insert_db(stock_dzjy_mrtj, "stock_dzjy_mrtj", True, "`date`,`code`")
except Exception as e:
print("error :", e)
每日更新流程:
- 获取指定日期的股票数据
- 多数据源并行处理(股票列表、龙虎榜、大宗交易)
- 数据清洗与标准化
- 数据写入数据库
- 异常捕获与错误处理
4.2 股票数据过滤实现
针对股票数据的特殊性,项目实现了多维度数据过滤:
# jobs/18h_daily_job.py
# 过滤A股股票
def stock_a(code):
if code.startswith('600') or code.startswith('6006') or code.startswith('601') or code.startswith('000') or code.startswith('001') or code.startswith('002'):
return True
else:
return False
# 过滤ST股票
def stock_a_filter_st(name):
if name.find("ST") == -1:
return True
else:
return False
# 过滤无价格数据
def stock_a_filter_price(latest_price):
# float 在 pandas 里面判断空值
if np.isnan(latest_price):
return False
else:
return True
# 应用过滤条件
data = data.loc[data["code"].apply(stock_a)].loc[data["name"].apply(stock_a_filter_st)].loc[
data["latest_price"].apply(stock_a_filter_price)]
过滤链的优势:
- 清晰分离不同过滤规则
- 便于单独测试和调整
- 支持链式组合使用
- 提高代码可读性和维护性
五、性能优化与最佳实践
5.1 数据库性能优化措施
项目采用多种策略优化数据库性能:
-
合理的索引设计
- 所有表均设置主键约束
- 日期+股票代码的复合主键设计
- 自动主键创建确保数据唯一性
-
批量操作优化
- 使用Pandas的
to_sql批量插入 - 避免单条记录循环插入
- 批量删除历史数据
- 使用Pandas的
-
连接池管理
- 使用SQLAlchemy引擎管理连接
- 避免频繁创建和销毁连接
5.2 Pandas数据处理最佳实践
-
数据类型优化
# 明确指定数据类型减少内存占用 data['date'] = data['date'].astype('int32') data['volume'] = data['volume'].astype('int64') -
索引优化
# 设置股票代码为索引加速查询 data.set_index('code', inplace=True) -
链式操作替代中间变量
# 直接链式处理减少内存占用 data = data.loc[data["code"].apply(stock_a)].drop('index', axis=1)
5.3 错误处理与容错机制
项目实现了完善的错误处理机制:
# libs/***mon.py
def run_with_args(run_fun):
try:
# 业务逻辑执行
run_fun(tmp_datetime_show)
except Exception as e:
print("error :", e)
traceback.print_exc()
错误处理策略:
- 异常捕获与堆栈打印
- 错误继续执行机制
- 关键操作日志记录
- 数据处理失败提示
六、总结与展望
gh_mirrors/st/stock项目通过精心设计的数据库架构和高效的数据处理流程,实现了股票市场数据的可靠存储与分析。项目的核心优势在于:
- 模块化设计:数据访问层与业务逻辑分离,便于维护和扩展
- 自动化机制:数据库自动创建、表结构自动生成、主键自动设置
- 性能优化:数据缓存、批量操作、索引优化等多重优化策略
- 健壮性保障:完善的错误处理、数据去重和重跑机制
未来优化方向:
- 实现数据库表结构的版本控制
- 添加数据完整性校验机制
- 引入时序数据库优化历史数据存储
- 实现数据同步的增量更新机制
通过本文介绍的数据库设计与数据处理技巧,你可以构建高效、可靠的股票数据处理系统,为量化分析和投资决策提供坚实的数据基础。
如果你觉得本文对你有帮助,请点赞、收藏并关注项目更新,下期我们将深入探讨股票技术指标的计算与应用实现。
【免费下载链接】stock stock,股票系统。使用python进行开发。 项目地址: https://gitcode.***/gh_mirrors/st/stock