You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
utils/transferOhterRyData_python.py

481 lines
20 KiB

import pymysql
# Throughout this module, "a" refers to the ry database (source) and "b" to the mojin database (target).
def toTransStockBasis(db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date):
    """Synchronise the ``stock_basis`` table from the ry database into mojin.

    Codes missing from mojin are inserted with all four columns. Codes that
    already exist only get their ``name`` refreshed, and only when the ry
    name differs from the one stored in mojin.

    Args:
        db_conn_a: Connection to the ry (source) database; not used here.
        dbcursor_a: Cursor on the ry database, used for reading.
        db_conn_b: Connection to the mojin (target) database; used to commit.
        dbcursor_b: Cursor on the mojin database, used for reading and writing.
        trade_date: Only shown in the progress messages; ``stock_basis`` is
            not filtered by date.
    """
    print('to transfer stockBasis ', trade_date, ' data.')
    select_sql = 'select code,name,blemind2,blemind3 from stock_basis'
    update_sql = 'update stock_basis set name = %s where code = %s'
    insert_sql = (
        'INSERT INTO stock_basis (code,name,blemind2,blemind3) '
        'VALUES ( %s, %s, %s, %s)'
    )

    # Names currently stored in mojin, keyed by stock code.
    dbcursor_b.execute(select_sql)
    mojin_names = {row[0]: row[1] for row in dbcursor_b.fetchall()}

    dbcursor_a.execute(select_sql)
    changed = False
    for code, name, blemind2, blemind3 in dbcursor_a.fetchall():
        if code not in mojin_names:
            dbcursor_b.execute(insert_sql, (code, name, blemind2, blemind3))
            changed = True
        elif mojin_names[code] != name:
            # Only rewrite the name when it actually changed; updating rows
            # whose name is already identical would be a no-op.
            dbcursor_b.execute(update_sql, (name, code))
            changed = True

    if changed:
        # commit() lives on the connection; DB cursors do not provide it.
        db_conn_b.commit()
    print('successed transfer stockBasis ', trade_date, ' data.')
    print('\r\n')
# Only run this update during the last few days of a financial-report period.
def toTransStockFinancial(db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date):
    """Copy ``stock_financial`` rows for one period from the ry database to mojin.

    Both databases are queried with ``period = trade_date``. A ry row is
    inserted into mojin unless mojin already holds a row with the same
    ``code`` and ``period``.

    Args:
        db_conn_a: Connection to the ry (source) database; not used here.
        dbcursor_a: Cursor on the ry database, used for reading.
        db_conn_b: Connection to the mojin (target) database; used to commit.
        dbcursor_b: Cursor on the mojin database, used for reading and writing.
        trade_date: Value bound to the ``period = %s`` filter.
    """
    print('to transfer stockFinancial ', trade_date, ' data.')
    columns = 'code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc'
    select_sql = f'select {columns} from stock_financial where period = %s'
    insert_sql = (
        f'INSERT INTO stock_financial ({columns}) '
        'VALUES ( %s, %s, %s, %s,%s, %s, %s, %s)'
    )

    # (code, period) pairs that mojin already has for this period.
    dbcursor_b.execute(select_sql, (trade_date,))
    existing = {(row[0], row[1]) for row in dbcursor_b.fetchall()}

    dbcursor_a.execute(select_sql, (trade_date,))
    inserted = False
    for row in dbcursor_a.fetchall():
        if (row[0], row[1]) in existing:
            continue
        dbcursor_b.execute(insert_sql, tuple(row))
        inserted = True

    if inserted:
        # commit() lives on the connection; DB cursors do not provide it.
        db_conn_b.commit()
    print('successed transfer stockFinancial ', trade_date, ' data.')
    print('\r\n')
def toTransStockIndex(db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date):
    """Copy one trading day's ``stock_index`` rows from the ry database to mojin.

    Index codes that already have a mojin row for ``trade_date`` are left
    untouched (no update); every other ry row for that day is inserted as-is.

    Args:
        db_conn_a: Connection to the ry (source) database; not used here.
        dbcursor_a: Cursor on the ry database, used for reading.
        db_conn_b: Connection to the mojin (target) database; used to commit.
        dbcursor_b: Cursor on the mojin database, used for reading and writing.
        trade_date: Value bound to the ``trade_day = %s`` filter.
    """
    print('to transfer stockIndex ', trade_date, ' data.')
    columns = (
        'code', 'name', 'trade_day', 'open', 'close', 'high', 'low',
        'differRange', 'volume', 'amount', 'limitupnum', 'suspendnum',
        'risenum', 'fallnum', 'flatnum', 'componentnum', 'mv', 'pettm',
        'pettm_mid', 'liqmv', 'rcnthigh', 'rcntlow',
    )
    column_list = ','.join(columns)
    # One placeholder per column, so the two can never drift apart.
    placeholders = ', '.join(['%s'] * len(columns))
    select_sql = f'select {column_list} from stock_index where trade_day = %s'
    insert_sql = f'INSERT INTO stock_index ({column_list}) VALUES ({placeholders})'

    # Index codes mojin already has for this trading day.
    dbcursor_b.execute(select_sql, (trade_date,))
    existing_codes = {row[0] for row in dbcursor_b.fetchall()}

    dbcursor_a.execute(select_sql, (trade_date,))
    inserted = False
    for row in dbcursor_a.fetchall():
        if row[0] in existing_codes:
            continue
        dbcursor_b.execute(insert_sql, tuple(row))
        inserted = True

    if inserted:
        # commit() lives on the connection; DB cursors do not provide it.
        db_conn_b.commit()
    print('successed transfer stockIndex ', trade_date, ' data.')
    print('\r\n')
def toTransStocks(db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date):
    """Copy one trading day's ``stocks`` rows from the ry database to mojin.

    Stock codes that already have a mojin row for ``trade_date`` are left
    untouched (no update); every other ry row for that day is inserted as-is.

    Args:
        db_conn_a: Connection to the ry (source) database; not used here.
        dbcursor_a: Cursor on the ry database, used for reading.
        db_conn_b: Connection to the mojin (target) database; used to commit.
        dbcursor_b: Cursor on the mojin database, used for reading and writing.
        trade_date: Value bound to the ``trade_day = %s`` filter.
    """
    print('to transfer stocks ', trade_date, ' data.')
    columns = (
        'code', 'trade_day', 'open', 'close', 'high', 'low', 'islimit',
        'isdrop', 'differrange', 'volumn', 'amount', 'differrange10',
        'differrange20', 'differrange60', 'avg_volume20',
        'freefloat_market_value', 'total_market_value', 'agencies_hold',
    )
    column_list = ','.join(columns)
    # One placeholder per column, so the two can never drift apart.
    placeholders = ', '.join(['%s'] * len(columns))
    select_sql = f'select {column_list} from stocks where trade_day = %s'
    insert_sql = f'INSERT INTO stocks ({column_list}) VALUES ({placeholders})'

    # Stock codes mojin already has for this trading day.
    dbcursor_b.execute(select_sql, (trade_date,))
    existing_codes = {row[0] for row in dbcursor_b.fetchall()}

    dbcursor_a.execute(select_sql, (trade_date,))
    inserted = False
    for row in dbcursor_a.fetchall():
        if row[0] in existing_codes:
            continue
        dbcursor_b.execute(insert_sql, tuple(row))
        inserted = True

    if inserted:
        # commit() lives on the connection; DB cursors do not provide it.
        db_conn_b.commit()
    print('successed transfer stocks ', trade_date, ' data.')
    print('\r\n')
def toTransStocksInTrend(db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date):
    """Copy one trading day's ``stocks_in_trend`` rows from ry to mojin.

    A ry row is inserted only when mojin has no row with the same ``code``
    for ``trade_date``; existing rows are never updated.

    Args:
        db_conn_a: Connection to the ry (source) database; not used here.
        dbcursor_a: Cursor on the ry database, used for reading.
        db_conn_b: Connection to the mojin (target) database; used to commit.
        dbcursor_b: Cursor on the mojin database, used for reading and writing.
        trade_date: Value bound to the ``trade_day = %s`` filter.
    """
    print('to transfer stocksInTrend ', trade_date, ' data.')
    columns = 'code,trade_day,sort,type'
    select_sql = f'select {columns} from stocks_in_trend where trade_day = %s'
    insert_sql = f'INSERT INTO stocks_in_trend ({columns}) VALUES ( %s, %s, %s, %s)'

    # NOTE(review): rows are de-duplicated by code alone, so a code that is
    # present in mojin under one `type` blocks the same code under another
    # type. If (code, type) is the real key, widen this set — TODO confirm.
    dbcursor_b.execute(select_sql, (trade_date,))
    existing_codes = {row[0] for row in dbcursor_b.fetchall()}

    # The date must be bound here too; otherwise the literal "%s" placeholder
    # is sent to the server and the query fails.
    dbcursor_a.execute(select_sql, (trade_date,))
    inserted = False
    for row in dbcursor_a.fetchall():
        if row[0] in existing_codes:
            continue
        dbcursor_b.execute(insert_sql, tuple(row))
        inserted = True

    if inserted:
        # commit() lives on the connection; DB cursors do not provide it.
        db_conn_b.commit()
    print('successed transfer stocksInTrend ', trade_date, ' data.')
    print('\r\n')
def toTransStocksLimit(db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date):
    """Copy one trading day's ``stocks_limit`` rows from ry to mojin.

    A ry row is inserted only when mojin has no row with the same ``code``
    for ``trade_date``; existing rows are never updated.

    Args:
        db_conn_a: Connection to the ry (source) database; not used here.
        dbcursor_a: Cursor on the ry database, used for reading.
        db_conn_b: Connection to the mojin (target) database; used to commit.
        dbcursor_b: Cursor on the mojin database, used for reading and writing.
        trade_date: Value bound to the ``trade_day = %s`` filter.
    """
    print('to transfer stocksLimit ', trade_date, ' data.')
    columns = 'code,trade_day,islimit,isdrop'
    select_sql = f'select {columns} from stocks_limit where trade_day = %s'
    insert_sql = f'INSERT INTO stocks_limit ({columns}) VALUES ( %s, %s, %s, %s)'

    # Stock codes mojin already has for this trading day.
    dbcursor_b.execute(select_sql, (trade_date,))
    existing_codes = {row[0] for row in dbcursor_b.fetchall()}

    # The date must be bound here too; otherwise the literal "%s" placeholder
    # is sent to the server and the query fails.
    dbcursor_a.execute(select_sql, (trade_date,))
    inserted = False
    for code, trade_day, islimit, isdrop in dbcursor_a.fetchall():
        if code in existing_codes:
            continue
        dbcursor_b.execute(insert_sql, (code, trade_day, islimit, isdrop))
        inserted = True

    if inserted:
        # commit() lives on the connection; DB cursors do not provide it.
        db_conn_b.commit()
    print('successed transfer stocksLimit ', trade_date, ' data.')
    print('\r\n')
def toTransStocksLimitUp(db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date):
    """Copy one trading day's ``stocks_limit_up`` rows from ry to mojin.

    A ry row is inserted only when mojin has no row with the same ``code``
    for ``trade_date``; existing rows are never updated.

    Args:
        db_conn_a: Connection to the ry (source) database; not used here.
        dbcursor_a: Cursor on the ry database, used for reading.
        db_conn_b: Connection to the mojin (target) database; used to commit.
        dbcursor_b: Cursor on the mojin database, used for reading and writing.
        trade_date: Value bound to the ``trade_day = %s`` filter.
    """
    print('to transfer stocksLimitUp ', trade_date, ' data.')
    select_sql = 'select code,trade_day from stocks_limit_up where trade_day = %s'
    # Table name must match the one the SELECT reads from.
    insert_sql = 'INSERT INTO stocks_limit_up (code,trade_day) VALUES ( %s, %s)'

    # Stock codes mojin already has for this trading day.
    dbcursor_b.execute(select_sql, (trade_date,))
    existing_codes = {row[0] for row in dbcursor_b.fetchall()}

    # The date must be bound here too; otherwise the literal "%s" placeholder
    # is sent to the server and the query fails.
    dbcursor_a.execute(select_sql, (trade_date,))
    inserted = False
    for code, trade_day in dbcursor_a.fetchall():
        if code in existing_codes:
            continue
        dbcursor_b.execute(insert_sql, (code, trade_day))
        inserted = True

    if inserted:
        # commit() lives on the connection; DB cursors do not provide it.
        db_conn_b.commit()
    print('successed transfer stocksLimitUp ', trade_date, ' data.')
    print('\r\n')
def toTransStocksNewRecord(db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date):
    """Copy one trading day's ``stocks_new_record`` rows from ry to mojin.

    A ry row is inserted only when mojin has no row with the same ``code``
    for ``trade_date``; existing rows are never updated.

    Args:
        db_conn_a: Connection to the ry (source) database; not used here.
        dbcursor_a: Cursor on the ry database, used for reading.
        db_conn_b: Connection to the mojin (target) database; used to commit.
        dbcursor_b: Cursor on the mojin database, used for reading and writing.
        trade_date: Value bound to the ``trade_day = %s`` filter.
    """
    print('to transfer stocksNewRecord ', trade_date, ' data.')
    columns = 'code,trade_day,isHigh,isLow'
    select_sql = f'select {columns} from stocks_new_record where trade_day = %s'
    insert_sql = f'INSERT INTO stocks_new_record ({columns}) VALUES ( %s, %s, %s, %s)'

    # Stock codes mojin already has for this trading day.
    dbcursor_b.execute(select_sql, (trade_date,))
    existing_codes = {row[0] for row in dbcursor_b.fetchall()}

    # The date must be bound here too; otherwise the literal "%s" placeholder
    # is sent to the server and the query fails.
    dbcursor_a.execute(select_sql, (trade_date,))
    inserted = False
    for code, trade_day, is_high, is_low in dbcursor_a.fetchall():
        # The duplicate check must use the code of the row being processed.
        if code in existing_codes:
            continue
        dbcursor_b.execute(insert_sql, (code, trade_day, is_high, is_low))
        inserted = True

    if inserted:
        # commit() lives on the connection; DB cursors do not provide it.
        db_conn_b.commit()
    print('successed transfer stocksNewRecord ', trade_date, ' data.')
    print('\r\n')
def toTransStocksTmp(db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date):
    """Copy one trading day's ``stocks_tmp`` rows from ry to mojin.

    A ry row is inserted only when mojin has no row with the same ``code``
    for ``trade_date``; existing rows are never updated.

    Args:
        db_conn_a: Connection to the ry (source) database; not used here.
        dbcursor_a: Cursor on the ry database, used for reading.
        db_conn_b: Connection to the mojin (target) database; used to commit.
        dbcursor_b: Cursor on the mojin database, used for reading and writing.
        trade_date: Value bound to the ``trade_day = %s`` filter.
    """
    print('to transfer stocksTmp ', trade_date, ' data.')
    columns = 'code,trade_day,differrange3,differrange5,differrange15,differrange30'
    select_sql = f'select {columns} from stocks_tmp where trade_day = %s'
    # Rows read from stocks_tmp go back into stocks_tmp; these columns belong
    # to that table, not to stocks_new_record.
    insert_sql = f'INSERT INTO stocks_tmp ({columns}) VALUES ( %s, %s, %s, %s,%s,%s)'

    # Stock codes mojin already has for this trading day.
    dbcursor_b.execute(select_sql, (trade_date,))
    existing_codes = {row[0] for row in dbcursor_b.fetchall()}

    # The date must be bound here too; otherwise the literal "%s" placeholder
    # is sent to the server and the query fails.
    dbcursor_a.execute(select_sql, (trade_date,))
    inserted = False
    for row in dbcursor_a.fetchall():
        if row[0] in existing_codes:
            continue
        dbcursor_b.execute(insert_sql, tuple(row))
        inserted = True

    if inserted:
        # commit() lives on the connection; DB cursors do not provide it.
        db_conn_b.commit()
    print('successed transfer stocksTmp ', trade_date, ' data.')
    print('\r\n')
def toTransTrends(db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date):
    """Copy one trading day's ``trends`` rows from the ry database to mojin.

    A ry row is inserted only when mojin has no row with the same
    ``blemind2`` value for ``trade_date``; existing rows are never updated.

    Args:
        db_conn_a: Connection to the ry (source) database; not used here.
        dbcursor_a: Cursor on the ry database, used for reading.
        db_conn_b: Connection to the mojin (target) database; used to commit.
        dbcursor_b: Cursor on the mojin database, used for reading and writing.
        trade_date: Value bound to the ``trade_day = %s`` filter.
    """
    print('to transfer Trends ', trade_date, ' data.')
    columns = (
        'trade_day,blemind2,stocks_count,trend_value,'
        'trend_value_change,sort,sort_change,type'
    )
    select_sql = f'select {columns} from trends where trade_day = %s'
    insert_sql = (
        f'INSERT INTO trends ({columns}) '
        'VALUES ( %s, %s, %s, %s,%s, %s, %s, %s)'
    )

    # NOTE(review): rows are de-duplicated by blemind2 alone, so a blemind2
    # present in mojin under one `type` blocks the same blemind2 under
    # another type. If (blemind2, type) is the real key, widen this set —
    # TODO confirm.
    dbcursor_b.execute(select_sql, (trade_date,))
    existing_keys = {row[1] for row in dbcursor_b.fetchall()}

    # The date must be bound here too; otherwise the literal "%s" placeholder
    # is sent to the server and the query fails.
    dbcursor_a.execute(select_sql, (trade_date,))
    inserted = False
    for row in dbcursor_a.fetchall():
        if row[1] in existing_keys:
            continue
        dbcursor_b.execute(insert_sql, tuple(row))
        inserted = True

    if inserted:
        # commit() lives on the connection; DB cursors do not provide it.
        db_conn_b.commit()
    print('successed transfer Trends ', trade_date, ' data.')
    print('\r\n')
def toTransRyOhterData(db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date):
    """Run the per-table transfers from the ry database to mojin, in order.

    Each transfer receives the same five arguments unchanged.

    Args:
        db_conn_a: Connection to the ry (source) database.
        dbcursor_a: Cursor on the ry database.
        db_conn_b: Connection to the mojin (target) database.
        dbcursor_b: Cursor on the mojin database.
        trade_date: Trading day handed to every transfer.
    """
    shared_args = (db_conn_a, dbcursor_a, db_conn_b, dbcursor_b, trade_date)

    toTransStockBasis(*shared_args)
    # toTransStockFinancial is deliberately not invoked from this entry point.
    toTransStockIndex(*shared_args)
    toTransStocks(*shared_args)
    toTransStocksInTrend(*shared_args)
    toTransStocksLimit(*shared_args)
    toTransStocksLimitUp(*shared_args)
    toTransStocksNewRecord(*shared_args)
    toTransStocksTmp(*shared_args)
    toTransTrends(*shared_args)
if __name__ == "__main__":
    # Manual smoke test: connect to both databases and run a single transfer.
    #
    # SECURITY NOTE(review): root credentials for a remote host are
    # hard-coded below. They should be moved to configuration or environment
    # variables and rotated.

    # Connection settings for database A (ry), hosted on Tencent Cloud.
    db_a_config = {
        'host': '124.223.98.178',
        'user': 'root',
        'password': '1qazse42W3',
        'db': 'ry',
        'charset': 'utf8mb4',
    }
    # Connection settings for database B (mojin), hosted on Tencent Cloud.
    db_b_config = {
        'host': '124.223.98.178',
        'user': 'root',
        'password': '1qazse42W3',
        'db': 'mojin',
        'charset': 'utf8mb4',
    }

    # Pre-bind every handle so the cleanup below is safe even when a
    # connection attempt fails before the later names are assigned.
    db_a_conn = db_a_cursor = db_b_conn = db_b_cursor = None
    try:
        # Connect to database A.
        db_a_conn = pymysql.connect(**db_a_config)
        db_a_cursor = db_a_conn.cursor()
        # Connect to database B.
        db_b_conn = pymysql.connect(**db_b_config)
        db_b_cursor = db_b_conn.cursor()
        trade_day = '2023-08-09'
        # Test run of a single transfer.
        toTransStocksInTrend(db_a_conn, db_a_cursor, db_b_conn, db_b_cursor, trade_day)
    except Exception as e:
        print(f"出现错误:{e}")
    finally:
        # Close whatever was actually opened, cursors before their connections.
        for handle in (db_a_cursor, db_a_conn, db_b_cursor, db_b_conn):
            if handle is not None:
                handle.close()