You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
utils/transferOhterRyData_python.py

491 lines
21 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import pymysql
# In every function below, "a" refers to the ry database and "b" to the mojin database.
def toTransStockBasis(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date):
    """Bring mojin's stock_basis table in line with the ry database.

    Codes that mojin does not have yet are inserted with all four columns;
    codes it already has only get their name refreshed when it differs.

    Args:
        db_conn_a: ry connection (not used by this function).
        dbcursor_a: cursor on the ry database, read only.
        db_conn_b: mojin connection, committed when something was written.
        dbcursor_b: cursor on the mojin database, read and written.
        trade_date: only echoed in the progress messages; the queries are
            not filtered by date.
    """
    source_cursor = dbcursor_a
    target_cursor = dbcursor_b
    print('to transfer stockBasis ',trade_date , ' data.' )

    select_sql = 'select code,name,blemind2,blemind3 from stock_basis'

    # Snapshot of what mojin currently holds: code -> name.
    target_cursor.execute(select_sql)
    known_names = {}
    for code, name, _blemind2, _blemind3 in target_cursor.fetchall():
        known_names[code] = name

    # Walk the ry rows and write whatever is new or renamed.
    source_cursor.execute(select_sql)
    wrote_rows = False
    for code, name, blemind2, blemind3 in source_cursor.fetchall():
        if code not in known_names:
            target_cursor.execute(
                "INSERT INTO stock_basis (code,name,blemind2,blemind3) VALUES ( %s, %s, %s, %s)",
                (code, name, blemind2, blemind3),
            )
            wrote_rows = True
        elif known_names[code] != name:
            target_cursor.execute(
                'update stock_basis set name = %s where code = %s',
                (name, code),
            )
            wrote_rows = True

    if wrote_rows:
        db_conn_b.commit()
    print('successed transfer stockBasis ',trade_date , ' data.' )
    print('\r\n')
# Only run this during the last few days of a financial-report period.
def toTransStockFinancial(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date):
    """Copy stock_financial rows for one period from the ry database to mojin.

    Both databases are queried with ``period = trade_date``. A ry row is
    skipped only when mojin already holds a row for the same code whose
    period equals the ry row's period; every other ry row is inserted.

    Args:
        db_conn_a: ry connection (unused; kept for interface compatibility).
        dbcursor_a: cursor on the ry database, read only.
        db_conn_b: mojin connection, committed if at least one row was inserted.
        dbcursor_b: cursor on the mojin database, read and written.
        trade_date: value bound to the ``period`` filter of both queries.
    """
    select_sql = 'select code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc from stock_financial where period = %s'
    insert_sql = "INSERT INTO stock_financial (code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s)"
    print('to transfer stockFinancial ',trade_date , ' data.' )

    # Periods mojin already holds, by code.
    dbcursor_b.execute(select_sql, trade_date)
    existing_period = {}
    for code, period, jlrtbzzl, jzcsylroe, epsbasic, jlr, jbmgsy, mgjzc in dbcursor_b.fetchall():
        existing_period[code] = period

    dbcursor_a.execute(select_sql, trade_date)
    inserted_any = False
    for code, period, jlrtbzzl, jzcsylroe, epsbasic, jlr, jbmgsy, mgjzc in dbcursor_a.fetchall():
        # Single guard replaces the two identical INSERT branches.
        if code in existing_period and existing_period[code] == period:
            continue
        dbcursor_b.execute(
            insert_sql,
            (code, period, jlrtbzzl, jzcsylroe, epsbasic, jlr, jbmgsy, mgjzc),
        )
        inserted_any = True

    if inserted_any:
        db_conn_b.commit()
    print('successed transfer stockFinancial ',trade_date , ' data.' )
    print('\r\n')
def toTransStockIndex(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date):
    """Copy one trading day's stock_index rows from the ry database to mojin.

    Rows are matched on ``code``: a ry row is inserted only when mojin has no
    stock_index row with that code for ``trade_date``. Existing rows are
    never updated.

    Args:
        db_conn_a: ry connection (unused; kept for interface compatibility).
        dbcursor_a: cursor on the ry database, read only.
        db_conn_b: mojin connection, committed if at least one row was inserted.
        dbcursor_b: cursor on the mojin database, read and written.
        trade_date: value bound to the ``trade_day`` filter of both queries.
    """
    select_sql = 'select code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow from stock_index where trade_day = %s'
    insert_sql = "INSERT INTO stock_index (code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s)"
    print('to transfer stockIndex ',trade_date , ' data.' )

    # Codes mojin already has for this day (first selected column is code).
    dbcursor_b.execute(select_sql, trade_date)
    existing_codes = {row[0] for row in dbcursor_b.fetchall()}

    dbcursor_a.execute(select_sql, trade_date)
    inserted_any = False
    for row in dbcursor_a.fetchall():
        if row[0] in existing_codes:
            continue
        # The row is forwarded as-is: SELECT and INSERT list the same columns
        # in the same order, so no per-column locals (which used to shadow
        # the builtin ``open``) are needed.
        dbcursor_b.execute(insert_sql, tuple(row))
        inserted_any = True

    if inserted_any:
        db_conn_b.commit()
    print('successed transfer stockIndex ',trade_date , ' data.' )
    print('\r\n')
def toTransStocks(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date):
    """Copy one trading day's stocks rows from the ry database to mojin.

    Rows are matched on ``code``: a ry row is inserted only when mojin has no
    stocks row with that code for ``trade_date``. Existing rows are never
    updated.

    Args:
        db_conn_a: ry connection (unused; kept for interface compatibility).
        dbcursor_a: cursor on the ry database, read only.
        db_conn_b: mojin connection, committed if at least one row was inserted.
        dbcursor_b: cursor on the mojin database, read and written.
        trade_date: value bound to the ``trade_day`` filter of both queries.
    """
    select_sql = 'select code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold from stocks where trade_day = %s'
    insert_sql = "INSERT INTO stocks (code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s)"
    print('to transfer stocks ',trade_date , ' data.' )

    # Codes mojin already has for this day (first selected column is code).
    dbcursor_b.execute(select_sql, trade_date)
    existing_codes = {row[0] for row in dbcursor_b.fetchall()}

    dbcursor_a.execute(select_sql, trade_date)
    inserted_any = False
    for row in dbcursor_a.fetchall():
        if row[0] in existing_codes:
            continue
        # SELECT and INSERT share the same column order, so the row is passed
        # through unchanged instead of being unpacked into locals that
        # shadowed the builtin ``open``.
        dbcursor_b.execute(insert_sql, tuple(row))
        inserted_any = True

    if inserted_any:
        db_conn_b.commit()
    print('successed transfer stocks ',trade_date , ' data.' )
    print('\r\n')
def toTransStocksInTrend(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date):
    """Copy one trading day's stocks_in_trend rows from the ry database to mojin.

    Rows are matched on the pair ``(code, type)``: a ry row is inserted only
    when mojin has no row with the same code and type for ``trade_date``.
    Existing rows are never updated.

    Args:
        db_conn_a: ry connection (unused; kept for interface compatibility).
        dbcursor_a: cursor on the ry database, read only.
        db_conn_b: mojin connection, committed if at least one row was inserted.
        dbcursor_b: cursor on the mojin database, read and written.
        trade_date: value bound to the ``trade_day`` filter of both queries.
    """
    select_sql = 'select code,trade_day,sort,type from stocks_in_trend where trade_day = %s'
    insert_sql = "INSERT INTO stocks_in_trend (code,trade_day,sort,type) VALUES ( %s, %s, %s, %s)"
    print('to transfer stocksInTrend ',trade_date , ' data.' )

    # The previous lookup was nested by type but keyed and tested by code
    # alone, so a ry row for an already-known code under a different type was
    # silently dropped. Keying on (code, type) keeps one entry per pair.
    dbcursor_b.execute(select_sql, trade_date)
    existing_keys = set()
    for code, trade_day, sort, trend_type in dbcursor_b.fetchall():
        existing_keys.add((code, trend_type))

    dbcursor_a.execute(select_sql, trade_date)
    inserted_any = False
    for code, trade_day, sort, trend_type in dbcursor_a.fetchall():
        if (code, trend_type) in existing_keys:
            continue
        dbcursor_b.execute(insert_sql, (code, trade_day, sort, trend_type))
        inserted_any = True

    if inserted_any:
        db_conn_b.commit()
    print('successed transfer stocksInTrend ',trade_date , ' data.' )
    print('\r\n')
def toTransStocksLimit(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date):
    """Copy one trading day's stocks_limit rows from the ry database to mojin.

    Rows are matched on ``code``: a ry row is inserted only when mojin has no
    stocks_limit row with that code for ``trade_date``. Existing rows are
    never updated.

    Args:
        db_conn_a: ry connection (unused; kept for interface compatibility).
        dbcursor_a: cursor on the ry database, read only.
        db_conn_b: mojin connection, committed if at least one row was inserted.
        dbcursor_b: cursor on the mojin database, read and written.
        trade_date: value bound to the ``trade_day`` filter of both queries.
    """
    select_sql = 'select code,trade_day,islimit,isdrop from stocks_limit where trade_day = %s'
    insert_sql = "INSERT INTO stocks_limit (code,trade_day,islimit,isdrop) VALUES ( %s, %s, %s, %s)"
    print('to transfer stocksLimit ',trade_date , ' data.' )

    # Codes mojin already has for this day. The unpack names now match the
    # selected columns (they used to be called sort/type, shadowing ``type``).
    dbcursor_b.execute(select_sql, trade_date)
    existing_codes = set()
    for code, trade_day, islimit, isdrop in dbcursor_b.fetchall():
        existing_codes.add(code)

    dbcursor_a.execute(select_sql, trade_date)
    inserted_any = False
    for code, trade_day, islimit, isdrop in dbcursor_a.fetchall():
        if code in existing_codes:
            continue
        dbcursor_b.execute(insert_sql, (code, trade_day, islimit, isdrop))
        inserted_any = True

    if inserted_any:
        db_conn_b.commit()
    print('successed transfer stocksLimit ',trade_date , ' data.' )
    print('\r\n')
def toTransStocksLimitUp(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date):
    """Copy one trading day's stocks_limit_up rows from the ry database to mojin.

    Rows are matched on ``code``: a ry row is inserted only when mojin has no
    stocks_limit_up row with that code for ``trade_date``. Existing rows are
    never updated.

    Args:
        db_conn_a: ry connection (unused; kept for interface compatibility).
        dbcursor_a: cursor on the ry database, read only.
        db_conn_b: mojin connection, committed if at least one row was inserted.
        dbcursor_b: cursor on the mojin database, read and written.
        trade_date: value bound to the ``trade_day`` filter of both queries.
    """
    select_sql = 'select code,trade_day from stocks_limit_up where trade_day = %s'
    # The table name previously contained a stray space ("stocks_limit _up"),
    # so the INSERT did not address the table the SELECTs read from.
    insert_sql = "INSERT INTO stocks_limit_up (code,trade_day) VALUES ( %s, %s)"
    print('to transfer stocksLimitUp ',trade_date , ' data.' )

    # Codes mojin already has for this day.
    dbcursor_b.execute(select_sql, trade_date)
    existing_codes = set()
    for code, trade_day in dbcursor_b.fetchall():
        existing_codes.add(code)

    dbcursor_a.execute(select_sql, trade_date)
    inserted_any = False
    for code, trade_day in dbcursor_a.fetchall():
        if code in existing_codes:
            continue
        dbcursor_b.execute(insert_sql, (code, trade_day))
        inserted_any = True

    if inserted_any:
        db_conn_b.commit()
    print('successed transfer stocksLimitUp ',trade_date , ' data.' )
    print('\r\n')
def toTransStocksNewRecord(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date):
    """Add to mojin the stocks_new_record rows that ry has for one day.

    A ry row is written only if mojin holds no stocks_new_record row with the
    same code for ``trade_date``; rows already present are left alone.

    Args:
        db_conn_a: ry connection (not used by this function).
        dbcursor_a: cursor on the ry database, read only.
        db_conn_b: mojin connection, committed when something was inserted.
        dbcursor_b: cursor on the mojin database, read and written.
        trade_date: value bound to the ``trade_day`` filter of both queries.
    """
    source_cursor, target_cursor = dbcursor_a, dbcursor_b
    print('to transfer stocksNewRecord ',trade_date , ' data.' )

    day_query = 'select code,trade_day,isHigh,isLow from stocks_new_record where trade_day = %s'

    # Codes mojin already holds for the day.
    target_cursor.execute(day_query, trade_date)
    present_codes = set()
    for code, _day, _is_high, _is_low in target_cursor.fetchall():
        present_codes.add(code)

    source_cursor.execute(day_query, trade_date)
    inserted_count = 0
    for code, record_day, is_high, is_low in source_cursor.fetchall():
        if code not in present_codes:
            target_cursor.execute(
                "INSERT INTO stocks_new_record (code,trade_day,isHigh,isLow) VALUES ( %s, %s, %s, %s)",
                (code, record_day, is_high, is_low),
            )
            inserted_count += 1

    if inserted_count:
        db_conn_b.commit()
    print('successed transfer stocksNewRecord ',trade_date , ' data.' )
    print('\r\n')
def toTransStocksTmp(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date):
    """Add to mojin the stocks_tmp rows that ry has for one day.

    A ry row is written only if mojin holds no stocks_tmp row with the same
    code for ``trade_date``; rows already present are left alone.

    Args:
        db_conn_a: ry connection (not used by this function).
        dbcursor_a: cursor on the ry database, read only.
        db_conn_b: mojin connection, committed when something was inserted.
        dbcursor_b: cursor on the mojin database, read and written.
        trade_date: value bound to the ``trade_day`` filter of both queries.
    """
    source_cursor, target_cursor = dbcursor_a, dbcursor_b
    print('to transfer stocksTmp ',trade_date , ' data.' )

    day_query = 'select code,trade_day,differrange3,differrange5,differrange15,differrange30 from stocks_tmp where trade_day = %s'

    # Codes mojin already holds for the day.
    target_cursor.execute(day_query, trade_date)
    present_codes = set()
    for code, _day, _d3, _d5, _d15, _d30 in target_cursor.fetchall():
        present_codes.add(code)

    source_cursor.execute(day_query, trade_date)
    inserted_count = 0
    for code, record_day, d3, d5, d15, d30 in source_cursor.fetchall():
        if code not in present_codes:
            target_cursor.execute(
                "INSERT INTO stocks_tmp (code,trade_day,differrange3,differrange5,differrange15,differrange30) VALUES ( %s, %s, %s, %s,%s,%s)",
                (code, record_day, d3, d5, d15, d30),
            )
            inserted_count += 1

    if inserted_count:
        db_conn_b.commit()
    print('successed transfer stocksTmp ',trade_date , ' data.' )
    print('\r\n')
def toTransTrends(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date):
    """Copy one trading day's trends rows from the ry database to mojin.

    Rows are matched on the pair ``(blemind2, type)``: a ry row is inserted
    only when mojin has no row with the same blemind2 and type for
    ``trade_date``. Existing rows are never updated.

    Args:
        db_conn_a: ry connection (unused; kept for interface compatibility).
        dbcursor_a: cursor on the ry database, read only.
        db_conn_b: mojin connection, committed if at least one row was inserted.
        dbcursor_b: cursor on the mojin database, read and written.
        trade_date: value bound to the ``trade_day`` filter of both queries.
    """
    select_sql = 'select trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type from trends where trade_day = %s'
    insert_sql = "INSERT INTO trends (trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s)"
    print('to transfer Trends ',trade_date , ' data.' )

    # The previous lookup was nested by type but keyed and tested by blemind2
    # alone, so a ry row for an already-known blemind2 under a different type
    # was silently dropped. Keying on (blemind2, type) keeps one entry per pair.
    dbcursor_b.execute(select_sql, trade_date)
    existing_keys = set()
    for row in dbcursor_b.fetchall():
        trade_day, blemind2, stocks_count, trend_value, trend_value_change, sort, sort_change, trend_type = row
        existing_keys.add((blemind2, trend_type))

    dbcursor_a.execute(select_sql, trade_date)
    inserted_any = False
    for row in dbcursor_a.fetchall():
        trade_day, blemind2, stocks_count, trend_value, trend_value_change, sort, sort_change, trend_type = row
        if (blemind2, trend_type) in existing_keys:
            continue
        dbcursor_b.execute(
            insert_sql,
            (trade_day, blemind2, stocks_count, trend_value, trend_value_change, sort, sort_change, trend_type),
        )
        inserted_any = True

    if inserted_any:
        db_conn_b.commit()
    print('successed transfer Trends ',trade_date , ' data.' )
    print('\r\n')
def toTransRyOhterData(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date):
    """Run every enabled per-table transfer, in a fixed order, for one date.

    Each step receives exactly the arguments passed to this function.
    toTransStockFinancial is currently disabled in this sequence.

    Args:
        db_conn_a: ry connection.
        dbcursor_a: cursor on the ry database.
        db_conn_b: mojin connection.
        dbcursor_b: cursor on the mojin database.
        trade_date: date handed to every transfer step.
    """
    transfer_steps = (
        toTransStockBasis,
        # toTransStockFinancial,
        toTransStockIndex,
        toTransStocks,
        toTransStocksInTrend,
        toTransStocksLimit,
        toTransStocksLimitUp,
        toTransStocksNewRecord,
        toTransStocksTmp,
        toTransTrends,
    )
    for run_step in transfer_steps:
        run_step(db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date)
if __name__ == "__main__":
    # Manual test entry point: connects to both databases and runs a few
    # transfer steps for one hard-coded trading day.
    #
    # NOTE(review): the host and root credentials below are hard-coded in
    # source control; they should be moved to environment variables or a
    # config file that is not committed.

    # Connection settings for database A (Tencent Cloud database).
    db_a_config = {
        'host': '124.223.98.178',
        'user': 'root',
        'password': '1qazse42W3',
        'db': 'ry',
        'charset': 'utf8mb4',
    }
    # Connection settings for database B (Tencent Cloud database).
    db_b_config = {
        'host': '124.223.98.178',
        'user': 'root',
        'password': '1qazse42W3',
        'db': 'mojin',
        'charset': 'utf8mb4',
    }
    # Pre-bind every handle so the cleanup in ``finally`` cannot raise
    # NameError (and hide the real error) when a connect() call fails.
    db_a_conn = None
    db_a_cursor = None
    db_b_conn = None
    db_b_cursor = None
    try:
        # Connect to database A.
        db_a_conn = pymysql.connect(**db_a_config)
        db_a_cursor = db_a_conn.cursor()
        # Connect to database B.
        db_b_conn = pymysql.connect(**db_b_config)
        db_b_cursor = db_b_conn.cursor()
        trade_day = '2023-08-10'
        # Test calls; enable the steps you want to exercise.
        # toTransStocksInTrend(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day)
        # toTransStockBasis(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day)
        # # toTransStockFinancial(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date)
        # toTransStockIndex(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day)
        # toTransStocks(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day)
        # toTransStocksInTrend(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day)
        # toTransStocksLimit(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day)
        # toTransStocksLimitUp(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day)
        toTransStocksNewRecord(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day)
        toTransStocksTmp(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day)
        toTransTrends(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day)
    except Exception as e:
        # Top-level boundary of the script: report the failure and fall
        # through to cleanup.
        print(f"出现错误:{e}")
    finally:
        # Close whatever was actually opened, cursors before connections.
        if db_a_cursor is not None:
            db_a_cursor.close()
        if db_a_conn is not None:
            db_a_conn.close()
        if db_b_cursor is not None:
            db_b_cursor.close()
        if db_b_conn is not None:
            db_b_conn.close()