import pymysql #a ry库 b是mojn库 def toTransStockBasis(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): # 连接数据库A ry库 db_a_conn = db_conn_a db_ry_cursor = dbcursor_a # 连接数据库B mojin库 db_b_conn = db_conn_b db_mojin_cursor = dbcursor_b trans_date = trade_date mojin_stock_basic_map = {} print('to transfer stockBasis ',trans_date , ' data.' ) #查询mojn库中的basis数据 sql_qurey = 'select code,name,blemind2,blemind3 from stock_basis' db_mojin_cursor.execute(sql_qurey) basisDatas = db_mojin_cursor.fetchall() for row in basisDatas: code,name,blemind2,blemind3 = row mojin_stock_basic_map[code] = (code,name,blemind2,blemind3) #查询ry库中的基础数据 sql_qurey = 'select code,name,blemind2,blemind3 from stock_basis' db_ry_cursor.execute(sql_qurey) basisDatas = db_ry_cursor.fetchall() hasInsertData = False for row in basisDatas: code,name,blemind2,blemind3 = row if code in mojin_stock_basic_map: if mojin_stock_basic_map[code][1] != name: sql_insert = 'update stock_basis set name = %s where code = %s' db_mojin_cursor.execute(sql_insert, (name,code)) hasInsertData = True else: continue else: sql_insert = f"INSERT INTO stock_basis (code,name,blemind2,blemind3) VALUES ( %s, %s, %s, %s)" db_mojin_cursor.execute(sql_insert, (code,name,blemind2,blemind3)) hasInsertData = True if hasInsertData: db_b_conn.commit() print('successed transfer stockBasis ',trans_date , ' data.' ) print('\r\n') #只在财报最后几天进行更新 def toTransStockFinancial(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): # 连接数据库A ry库 db_a_conn = db_conn_a db_ry_cursor = dbcursor_a # 连接数据库B mojin库 db_b_conn = db_conn_b db_mojin_cursor = dbcursor_b trans_date = trade_date mojin_stock_financial_map = {} print('to transfer stockFinancial ',trans_date , ' data.' ) #查询mojn库中的basis数据 sql_qurey = 'select code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc from stock_financial where period = %s' db_mojin_cursor.execute(sql_qurey,trade_date) basisDatas = db_mojin_cursor.fetchall() for row in basisDatas: code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc = row mojin_stock_financial_map[code] = (code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc) #查询ry库中的基础数据 sql_qurey = 'select code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc from stock_financial where period = %s' db_ry_cursor.execute(sql_qurey,trade_date) basisDatas = db_ry_cursor.fetchall() hasInsertData = False for row in basisDatas: code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc = row if code in mojin_stock_financial_map: if mojin_stock_financial_map[code][1] == period: continue else: sql_insert = f"INSERT INTO stock_financial (code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s)" db_mojin_cursor.execute(sql_insert, (code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc)) hasInsertData = True else: sql_insert = f"INSERT INTO stock_financial (code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s)" db_mojin_cursor.execute(sql_insert, (code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc)) hasInsertData = True if hasInsertData: db_b_conn.commit() print('successed transfer stockFinancial ',trans_date , ' data.' ) print('\r\n') def toTransStockIndex(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): # 连接数据库A ry库 db_a_conn = db_conn_a db_ry_cursor = dbcursor_a # 连接数据库B mojin库 db_b_conn = db_conn_b db_mojin_cursor = dbcursor_b trans_date = trade_date mojin_stock_index_map = {} print('to transfer stockIndex ',trans_date , ' data.' ) #查询mojn库中的basis数据 sql_qurey = 'select code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow from stock_index where trade_day = %s' db_mojin_cursor.execute(sql_qurey,trade_date) basisDatas = db_mojin_cursor.fetchall() for row in basisDatas: code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow = row mojin_stock_index_map[code] = (code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow) #查询ry库中的基础数据 sql_qurey = 'select code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow from stock_index where trade_day = %s' db_ry_cursor.execute(sql_qurey,trade_date) basisDatas = db_ry_cursor.fetchall() hasInsertData = False for row in basisDatas: code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow = row # 数据存在时,不进行更新 if code in mojin_stock_index_map: continue else: sql_insert = f"INSERT INTO stock_index (code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s)" db_mojin_cursor.execute(sql_insert, (code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow)) hasInsertData = True if hasInsertData: db_b_conn.commit() print('successed transfer stockIndex ',trans_date , ' data.' ) print('\r\n') def toTransStocks(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): # 连接数据库A ry库 db_a_conn = db_conn_a db_ry_cursor = dbcursor_a # 连接数据库B mojin库 db_b_conn = db_conn_b db_mojin_cursor = dbcursor_b trans_date = trade_date mojin_stocks_map = {} print('to transfer stocks ',trans_date , ' data.' ) #查询mojn库中的basis数据 sql_qurey = 'select code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold from stocks where trade_day = %s' db_mojin_cursor.execute(sql_qurey,trans_date) basisDatas = db_mojin_cursor.fetchall() for row in basisDatas: code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold = row mojin_stocks_map[code] = (code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold) #查询ry库中的基础数据 sql_qurey = 'select code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold from stocks where trade_day = %s' db_ry_cursor.execute(sql_qurey,trans_date) basisDatas = db_ry_cursor.fetchall() hasInsertData = False for row in basisDatas: code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold = row # 数据存在时,不进行更新 if code in mojin_stocks_map: continue else: sql_insert = f"INSERT INTO stocks (code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s)" db_mojin_cursor.execute(sql_insert, (code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold)) hasInsertData = True if hasInsertData: db_b_conn.commit() print('successed transfer stocks ',trans_date , ' data.' ) print('\r\n') def toTransStocksInTrend(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): # 连接数据库A ry库 db_a_conn = db_conn_a db_ry_cursor = dbcursor_a # 连接数据库B mojin库 db_b_conn = db_conn_b db_mojin_cursor = dbcursor_b trans_date = trade_date mojin_stockintrend_map = {} print('to transfer stocksInTrend ',trans_date , ' data.' ) #查询mojn库中的basis数据 sql_qurey = 'select code,trade_day,sort,type from stocks_in_trend where trade_day = %s' db_mojin_cursor.execute(sql_qurey,trans_date) basisDatas = db_mojin_cursor.fetchall() for row in basisDatas: code,trade_day,sort,type = row mojin_stockintrend_map[code] = {type:{'code':code,'trade_day':trade_day,'sort':sort,'type':type}} #查询ry库中的基础数据 sql_qurey = 'select code,trade_day,sort,type from stocks_in_trend where trade_day = %s' db_ry_cursor.execute(sql_qurey,trans_date) basisDatas = db_ry_cursor.fetchall() hasInsertData = False for row in basisDatas: code,trade_day,sort,type = row # 数据存在时,不进行更新 if code in mojin_stockintrend_map: continue else: sql_insert = f"INSERT INTO stocks_in_trend (code,trade_day,sort,type) VALUES ( %s, %s, %s, %s)" db_mojin_cursor.execute(sql_insert, (code,trade_day,sort,type)) hasInsertData = True if hasInsertData: db_b_conn.commit() print('successed transfer stocksInTrend ',trans_date , ' data.' ) print('\r\n') def toTransStocksLimit(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): # 连接数据库A ry库 db_a_conn = db_conn_a db_ry_cursor = dbcursor_a # 连接数据库B mojin库 db_b_conn = db_conn_b db_mojin_cursor = dbcursor_b trans_date = trade_date mojin_stocklimit_map = {} print('to transfer stocksLimit ',trans_date , ' data.' ) #查询mojn库中的basis数据 sql_qurey = 'select code,trade_day,islimit,isdrop from stocks_limit where trade_day = %s' db_mojin_cursor.execute(sql_qurey,trans_date) basisDatas = db_mojin_cursor.fetchall() for row in basisDatas: code,trade_day,sort,type = row mojin_stocklimit_map[code] = (code,trade_day,sort,type) #查询ry库中的基础数据 sql_qurey = 'select code,trade_day,islimit,isdrop from stocks_limit where trade_day = %s' db_ry_cursor.execute(sql_qurey,trans_date) basisDatas = db_ry_cursor.fetchall() hasInsertData = False for row in basisDatas: code,trade_day,islimit,isdrop = row # 数据存在时,不进行更新 if code in mojin_stocklimit_map: continue else: sql_insert = f"INSERT INTO stocks_limit (code,trade_day,islimit,isdrop) VALUES ( %s, %s, %s, %s)" db_mojin_cursor.execute(sql_insert, (code,trade_day,islimit,isdrop)) hasInsertData = True if hasInsertData: db_b_conn.commit() print('successed transfer stocksLimit ',trans_date , ' data.' ) print('\r\n') def toTransStocksLimitUp(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): # 连接数据库A ry库 db_a_conn = db_conn_a db_ry_cursor = dbcursor_a # 连接数据库B mojin库 db_b_conn = db_conn_b db_mojin_cursor = dbcursor_b trans_date = trade_date mojin_stocklimitup_map = {} print('to transfer stocksLimitUp ',trans_date , ' data.' ) #查询mojn库中的basis数据 sql_qurey = 'select code,trade_day from stocks_limit_up where trade_day = %s' db_mojin_cursor.execute(sql_qurey,trans_date) basisDatas = db_mojin_cursor.fetchall() for row in basisDatas: code,trade_day = row mojin_stocklimitup_map[code] = (code,trade_day) #查询ry库中的基础数据 sql_qurey = 'select code,trade_day from stocks_limit_up where trade_day = %s' db_ry_cursor.execute(sql_qurey,trans_date) basisDatas = db_ry_cursor.fetchall() hasInsertData = False for row in basisDatas: code,trade_day = row # 数据存在时,不进行更新 if code in mojin_stocklimitup_map: continue else: sql_insert = f"INSERT INTO stocks_limit _up(code,trade_day) VALUES ( %s, %s)" db_mojin_cursor.execute(sql_insert, (code,trade_day)) hasInsertData = True if hasInsertData: db_b_conn.commit() print('successed transfer stocksLimitUp ',trans_date , ' data.' ) print('\r\n') def toTransStocksNewRecord(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): # 连接数据库A ry库 db_a_conn = db_conn_a db_ry_cursor = dbcursor_a # 连接数据库B mojin库 db_b_conn = db_conn_b db_mojin_cursor = dbcursor_b trans_date = trade_date mojin_stocksNeRecord_map = {} print('to transfer stocksNewRecord ',trans_date , ' data.' ) #查询mojn库中的basis数据 sql_qurey = 'select code,trade_day,isHigh,isLow from stocks_new_record where trade_day = %s' db_mojin_cursor.execute(sql_qurey,trans_date) basisDatas = db_mojin_cursor.fetchall() for row in basisDatas: code,trade_day,isHigh,isLow = row mojin_stocksNeRecord_map[code] = (code,trade_day,isHigh,isLow) #查询ry库中的基础数据 sql_qurey = 'select code,trade_day,isHigh,isLow from stocks_new_record where trade_day = %s' db_ry_cursor.execute(sql_qurey,trans_date) basisDatas = db_ry_cursor.fetchall() hasInsertData = False for row in basisDatas: code,trade_day,isHigh,isLow = row # 数据存在时,不进行更新 if code in mojin_stocksNeRecord_map: continue else: sql_insert = f"INSERT INTO stocks_new_record (code,trade_day,isHigh,isLow) VALUES ( %s, %s, %s, %s)" db_mojin_cursor.execute(sql_insert, (code,trade_day,isHigh,isLow)) hasInsertData = True if hasInsertData: db_b_conn.commit() print('successed transfer stocksNewRecord ',trans_date , ' data.' ) print('\r\n') def toTransStocksTmp(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): # 连接数据库A ry库 db_a_conn = db_conn_a db_ry_cursor = dbcursor_a # 连接数据库B mojin库 db_b_conn = db_conn_b db_mojin_cursor = dbcursor_b trans_date = trade_date mojin_stocksTmp_map = {} print('to transfer stocksTmp ',trans_date , ' data.' ) #查询mojn库中的basis数据 sql_qurey = 'select code,trade_day,differrange3,differrange5,differrange15,differrange30 from stocks_tmp where trade_day = %s' db_mojin_cursor.execute(sql_qurey,trans_date) basisDatas = db_mojin_cursor.fetchall() for row in basisDatas: code,trade_day,differrange3,differrange5,differrange15,differrange30 = row mojin_stocksTmp_map[code] = (code,trade_day,differrange3,differrange5,differrange15,differrange30) #查询ry库中的基础数据 sql_qurey = 'select code,trade_day,differrange3,differrange5,differrange15,differrange30 from stocks_tmp where trade_day = %s' db_ry_cursor.execute(sql_qurey,trans_date) basisDatas = db_ry_cursor.fetchall() hasInsertData = False for row in basisDatas: code,trade_day,differrange3,differrange5,differrange15,differrange30 = row # 数据存在时,不进行更新 if code in mojin_stocksTmp_map: continue else: sql_insert = f"INSERT INTO stocks_tmp (code,trade_day,differrange3,differrange5,differrange15,differrange30) VALUES ( %s, %s, %s, %s,%s,%s)" db_mojin_cursor.execute(sql_insert, (code,trade_day,differrange3,differrange5,differrange15,differrange30)) hasInsertData = True if hasInsertData: db_b_conn.commit() print('successed transfer stocksTmp ',trans_date , ' data.' ) print('\r\n') def toTransTrends(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): # 连接数据库A ry库 db_a_conn = db_conn_a db_ry_cursor = dbcursor_a # 连接数据库B mojin库 db_b_conn = db_conn_b db_mojin_cursor = dbcursor_b trans_date = trade_date mojin_trends_map = {} print('to transfer Trends ',trans_date , ' data.' ) #查询mojn库中的basis数据 sql_qurey = 'select trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type from trends where trade_day = %s' db_mojin_cursor.execute(sql_qurey,trans_date) basisDatas = db_mojin_cursor.fetchall() for row in basisDatas: trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type = row mojin_trends_map[blemind2] = {type:(trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type)} #查询ry库中的基础数据 sql_qurey = 'select trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type from trends where trade_day = %s' db_ry_cursor.execute(sql_qurey,trans_date) basisDatas = db_ry_cursor.fetchall() hasInsertData = False for row in basisDatas: trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type = row # 数据存在时,不进行更新 if blemind2 in mojin_trends_map: continue else: sql_insert = f"INSERT INTO trends (trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s)" db_mojin_cursor.execute(sql_insert, (trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type)) hasInsertData = True if hasInsertData: db_b_conn.commit() print('successed transfer Trends ',trans_date , ' data.' ) print('\r\n') def toTransRyOhterData(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): toTransStockBasis(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) # toTransStockFinancial(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransStockIndex(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransStocks(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransStocksInTrend(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransStocksLimit(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransStocksLimitUp(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransStocksNewRecord(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransStocksTmp(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransTrends(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) if __name__ == "__main__": # 数据库A的连接信息;腾讯云数据库 db_a_config = { 'host': '124.223.98.178', 'user': 'root', 'password': '1qazse42W3', 'db': 'ry', 'charset': 'utf8mb4', } # 数据库B的连接信息;腾讯云数据库 db_b_config = { 'host': '124.223.98.178', 'user': 'root', 'password': '1qazse42W3', 'db': 'mojin', 'charset': 'utf8mb4', } try: # 连接数据库A db_a_conn = pymysql.connect(**db_a_config) db_a_cursor = db_a_conn.cursor() # 连接数据库B db_b_conn = pymysql.connect(**db_b_config) db_b_cursor = db_b_conn.cursor() trade_day = '2023-08-10' #测试 # toTransStocksInTrend(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day) # toTransStockBasis(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day) # # toTransStockFinancial(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) # toTransStockIndex(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day) # toTransStocks(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day) # toTransStocksInTrend(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day) # toTransStocksLimit(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day) # toTransStocksLimitUp(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day) toTransStocksNewRecord(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day) toTransStocksTmp(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day) toTransTrends(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day) except Exception as e: print(f"出现错误:{e}") finally: # 关闭连接 if db_a_cursor: db_a_cursor.close() if db_a_conn: db_a_conn.close() if db_b_cursor: db_b_cursor.close() if db_b_conn: db_b_conn.close()