From ef509f3f51075465ddcd453e81b0e0bc9f6701c1 Mon Sep 17 00:00:00 2001 From: laixingyu Date: Mon, 14 Aug 2023 14:48:55 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=AE=8C=E6=88=90=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E4=BC=A0=E8=BE=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- transferOhterRyData_python.py | 418 +++++++++++++++++++++++++++++++++- 1 file changed, 407 insertions(+), 11 deletions(-) diff --git a/transferOhterRyData_python.py b/transferOhterRyData_python.py index 04c2059..ce436fa 100644 --- a/transferOhterRyData_python.py +++ b/transferOhterRyData_python.py @@ -13,6 +13,7 @@ def toTransStockBasis(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): mojin_stock_basic_map = {} + print('to transfer stockBasis ',trans_date , ' data.' ) #查询mojn库中的basis数据 sql_qurey = 'select code,name,blemind2,blemind3 from stock_basis' db_mojin_cursor.execute(sql_qurey) @@ -45,36 +46,383 @@ def toTransStockBasis(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): print('successed transfer stockBasis ',trans_date , ' data.' ) print('\r\n') +#只在财报最后几天进行更新 def toTransStockFinancial(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): - return True + # 连接数据库A ry库 + db_a_conn = db_conn_a + db_ry_cursor = dbcursor_a + # 连接数据库B mojin库 + db_b_conn = db_conn_b + db_mojin_cursor = dbcursor_b + + trans_date = trade_date + + mojin_stock_financial_map = {} + + print('to transfer stockFinancial ',trans_date , ' data.' ) + #查询mojn库中的basis数据 + sql_qurey = 'select code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc from stock_financial where period = %s' + db_mojin_cursor.execute(sql_qurey,trade_date) + basisDatas = db_mojin_cursor.fetchall() + for row in basisDatas: + code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc = row + mojin_stock_financial_map[code] = (code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc) + + #查询ry库中的基础数据 + sql_qurey = 'select code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc from stock_financial where period = %s' + db_ry_cursor.execute(sql_qurey,trade_date) + basisDatas = db_ry_cursor.fetchall() + hasInsertData = False + for row in basisDatas: + code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc = row + if code in mojin_stock_financial_map: + if mojin_stock_financial_map[code][1] == period: + continue + else: + sql_insert = f"INSERT INTO stock_financial (code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s)" + db_mojin_cursor.execute(sql_insert, (code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc)) + hasInsertData = True + else: + sql_insert = f"INSERT INTO stock_financial (code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s)" + db_mojin_cursor.execute(sql_insert, (code,period,jlrtbzzl,jzcsylroe,epsbasic,jlr,jbmgsy,mgjzc)) + hasInsertData = True + if hasInsertData: + db_mojin_cursor.commit() + + print('successed transfer stockFinancial ',trans_date , ' data.' ) + print('\r\n') def toTransStockIndex(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): - return True + # 连接数据库A ry库 + db_a_conn = db_conn_a + db_ry_cursor = dbcursor_a + # 连接数据库B mojin库 + db_b_conn = db_conn_b + db_mojin_cursor = dbcursor_b + + trans_date = trade_date + + mojin_stock_index_map = {} + + print('to transfer stockIndex ',trans_date , ' data.' ) + #查询mojn库中的basis数据 + sql_qurey = 'select code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow from stock_index where trade_day = %s' + db_mojin_cursor.execute(sql_qurey,trade_date) + basisDatas = db_mojin_cursor.fetchall() + for row in basisDatas: + code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow = row + mojin_stock_index_map[code] = (code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow) + + #查询ry库中的基础数据 + sql_qurey = 'select code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow from stock_index where trade_day = %s' + db_ry_cursor.execute(sql_qurey,trade_date) + basisDatas = db_ry_cursor.fetchall() + hasInsertData = False + for row in basisDatas: + code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow = row + # 数据存在时,不进行更新 + if code in mojin_stock_index_map: + continue + else: + sql_insert = f"INSERT INTO stock_index (code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s)" + db_mojin_cursor.execute(sql_insert, (code,name,trade_day,open,close,high,low,differRange,volume,amount,limitupnum,suspendnum,risenum,fallnum,flatnum,componentnum,mv,pettm,pettm_mid,liqmv,rcnthigh,rcntlow)) + hasInsertData = True + if hasInsertData: + db_mojin_cursor.commit() + + print('successed transfer stockIndex ',trans_date , ' data.' ) + print('\r\n') def toTransStocks(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): - return True + # 连接数据库A ry库 + db_a_conn = db_conn_a + db_ry_cursor = dbcursor_a + # 连接数据库B mojin库 + db_b_conn = db_conn_b + db_mojin_cursor = dbcursor_b + + trans_date = trade_date + + mojin_stocks_map = {} + + print('to transfer stocks ',trans_date , ' data.' ) + #查询mojn库中的basis数据 + sql_qurey = 'select code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold from stocks where trade_day = %s' + db_mojin_cursor.execute(sql_qurey,trans_date) + basisDatas = db_mojin_cursor.fetchall() + for row in basisDatas: + code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold = row + mojin_stocks_map[code] = (code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold) + + #查询ry库中的基础数据 + sql_qurey = 'select code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold from stocks where trade_day = %s' + db_ry_cursor.execute(sql_qurey,trans_date) + basisDatas = db_ry_cursor.fetchall() + hasInsertData = False + for row in basisDatas: + code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold = row + # 数据存在时,不进行更新 + if code in mojin_stocks_map: + continue + else: + sql_insert = f"INSERT INTO stocks (code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s, %s, %s,%s, %s)" + db_mojin_cursor.execute(sql_insert, (code,trade_day,open,close,high,low,islimit,isdrop,differrange,volumn,amount,differrange10,differrange20,differrange60,avg_volume20,freefloat_market_value,total_market_value,agencies_hold)) + hasInsertData = True + if hasInsertData: + db_mojin_cursor.commit() + + print('successed transfer stocks ',trans_date , ' data.' ) + print('\r\n') def toTransStocksInTrend(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): - return True + # 连接数据库A ry库 + db_a_conn = db_conn_a + db_ry_cursor = dbcursor_a + # 连接数据库B mojin库 + db_b_conn = db_conn_b + db_mojin_cursor = dbcursor_b + + trans_date = trade_date + + mojin_stockintrend_map = {} + + print('to transfer stocksInTrend ',trans_date , ' data.' ) + #查询mojn库中的basis数据 + sql_qurey = 'select code,trade_day,sort,type from stocks_in_trend where trade_day = %s' + db_mojin_cursor.execute(sql_qurey,trans_date) + basisDatas = db_mojin_cursor.fetchall() + for row in basisDatas: + code,trade_day,sort,type = row + mojin_stockintrend_map[code] = {type:{'code':code,'trade_day':trade_day,'sort':sort,'type':type}} + + #查询ry库中的基础数据 + sql_qurey = 'select code,trade_day,sort,type from stocks_in_trend where trade_day = %s' + db_ry_cursor.execute(sql_qurey) + basisDatas = db_ry_cursor.fetchall() + hasInsertData = False + for row in basisDatas: + code,trade_day,sort,type = row + # 数据存在时,不进行更新 + if code in mojin_stockintrend_map: + continue + else: + sql_insert = f"INSERT INTO stocks_in_trend (code,trade_day,sort,type) VALUES ( %s, %s, %s, %s)" + db_mojin_cursor.execute(sql_insert, (code,trade_day,sort,type)) + hasInsertData = True + if hasInsertData: + db_mojin_cursor.commit() + + print('successed transfer stocksInTrend ',trans_date , ' data.' ) + print('\r\n') def toTransStocksLimit(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): - return True + # 连接数据库A ry库 + db_a_conn = db_conn_a + db_ry_cursor = dbcursor_a + # 连接数据库B mojin库 + db_b_conn = db_conn_b + db_mojin_cursor = dbcursor_b + + trans_date = trade_date + + mojin_stocklimit_map = {} + + print('to transfer stocksLimit ',trans_date , ' data.' ) + #查询mojn库中的basis数据 + sql_qurey = 'select code,trade_day,islimit,isdrop from stocks_limit where trade_day = %s' + db_mojin_cursor.execute(sql_qurey,trans_date) + basisDatas = db_mojin_cursor.fetchall() + for row in basisDatas: + code,trade_day,sort,type = row + mojin_stocklimit_map[code] = (code,trade_day,sort,type) + + #查询ry库中的基础数据 + sql_qurey = 'select code,trade_day,islimit,isdrop from stocks_limit where trade_day = %s' + db_ry_cursor.execute(sql_qurey) + basisDatas = db_ry_cursor.fetchall() + hasInsertData = False + for row in basisDatas: + code,trade_day,islimit,isdrop = row + # 数据存在时,不进行更新 + if code in mojin_stocklimit_map: + continue + else: + sql_insert = f"INSERT INTO stocks_limit (code,trade_day,islimit,isdrop) VALUES ( %s, %s, %s, %s)" + db_mojin_cursor.execute(sql_insert, (code,trade_day,islimit,isdrop)) + hasInsertData = True + if hasInsertData: + db_mojin_cursor.commit() + + print('successed transfer stocksLimit ',trans_date , ' data.' ) + print('\r\n') def toTransStocksLimitUp(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): - return True + # 连接数据库A ry库 + db_a_conn = db_conn_a + db_ry_cursor = dbcursor_a + # 连接数据库B mojin库 + db_b_conn = db_conn_b + db_mojin_cursor = dbcursor_b + + trans_date = trade_date + + mojin_stocklimitup_map = {} + + print('to transfer stocksLimitUp ',trans_date , ' data.' ) + #查询mojn库中的basis数据 + sql_qurey = 'select code,trade_day from stocks_limit_up where trade_day = %s' + db_mojin_cursor.execute(sql_qurey,trans_date) + basisDatas = db_mojin_cursor.fetchall() + for row in basisDatas: + code,trade_day = row + mojin_stocklimitup_map[code] = (code,trade_day) + + #查询ry库中的基础数据 + sql_qurey = 'select code,trade_day from stocks_limit_up where trade_day = %s' + db_ry_cursor.execute(sql_qurey) + basisDatas = db_ry_cursor.fetchall() + hasInsertData = False + for row in basisDatas: + code,trade_day = row + # 数据存在时,不进行更新 + if code in mojin_stocklimitup_map: + continue + else: + sql_insert = f"INSERT INTO stocks_limit _up(code,trade_day) VALUES ( %s, %s)" + db_mojin_cursor.execute(sql_insert, (code,trade_day)) + hasInsertData = True + if hasInsertData: + db_mojin_cursor.commit() + + print('successed transfer stocksLimitUp ',trans_date , ' data.' ) + print('\r\n') def toTransStocksNewRecord(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): - return True + # 连接数据库A ry库 + db_a_conn = db_conn_a + db_ry_cursor = dbcursor_a + # 连接数据库B mojin库 + db_b_conn = db_conn_b + db_mojin_cursor = dbcursor_b + + trans_date = trade_date + + mojin_stocksNeRecord_map = {} + + print('to transfer stocksNewRecord ',trans_date , ' data.' ) + #查询mojn库中的basis数据 + sql_qurey = 'select code,trade_day,isHigh,isLow from stocks_new_record where trade_day = %s' + db_mojin_cursor.execute(sql_qurey,trans_date) + basisDatas = db_mojin_cursor.fetchall() + for row in basisDatas: + code,trade_day,isHigh,isLow = row + mojin_stocksNeRecord_map[code] = (code,trade_day,isHigh,isLow) + + #查询ry库中的基础数据 + sql_qurey = 'select code,trade_day,isHigh,isLow from stocks_new_record where trade_day = %s' + db_ry_cursor.execute(sql_qurey) + basisDatas = db_ry_cursor.fetchall() + hasInsertData = False + for row in basisDatas: + ode,trade_day,isHigh,isLow = row + # 数据存在时,不进行更新 + if code in mojin_stocksNeRecord_map: + continue + else: + sql_insert = f"INSERT INTO stocks_new_record (ode,trade_day,isHigh,isLow) VALUES ( %s, %s, %s, %s)" + db_mojin_cursor.execute(sql_insert, (ode,trade_day,isHigh,isLow)) + hasInsertData = True + if hasInsertData: + db_mojin_cursor.commit() + + print('successed transfer stocksNewRecord ',trans_date , ' data.' ) + print('\r\n') def toTransStocksTmp(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): - return True + # 连接数据库A ry库 + db_a_conn = db_conn_a + db_ry_cursor = dbcursor_a + # 连接数据库B mojin库 + db_b_conn = db_conn_b + db_mojin_cursor = dbcursor_b + + trans_date = trade_date + + mojin_stocksTmp_map = {} + + print('to transfer stocksTmp ',trans_date , ' data.' ) + #查询mojn库中的basis数据 + sql_qurey = 'select code,trade_day,differrange3,differrange5,differrange15,differrange30 from stocks_tmp where trade_day = %s' + db_mojin_cursor.execute(sql_qurey,trans_date) + basisDatas = db_mojin_cursor.fetchall() + for row in basisDatas: + code,trade_day,differrange3,differrange5,differrange15,differrange30 = row + mojin_stocksTmp_map[code] = (code,trade_day,differrange3,differrange5,differrange15,differrange30) + + #查询ry库中的基础数据 + sql_qurey = 'select code,trade_day,differrange3,differrange5,differrange15,differrange30 from stocks_tmp where trade_day = %s' + db_ry_cursor.execute(sql_qurey) + basisDatas = db_ry_cursor.fetchall() + hasInsertData = False + for row in basisDatas: + code,trade_day,differrange3,differrange5,differrange15,differrange30 = row + # 数据存在时,不进行更新 + if code in mojin_stocksTmp_map: + continue + else: + sql_insert = f"INSERT INTO stocks_new_record (code,trade_day,differrange3,differrange5,differrange15,differrange30) VALUES ( %s, %s, %s, %s,%s,%s)" + db_mojin_cursor.execute(sql_insert, (code,trade_day,differrange3,differrange5,differrange15,differrange30)) + hasInsertData = True + if hasInsertData: + db_mojin_cursor.commit() + + print('successed transfer stocksTmp ',trans_date , ' data.' ) + print('\r\n') def toTransTrends(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): - return True + # 连接数据库A ry库 + db_a_conn = db_conn_a + db_ry_cursor = dbcursor_a + # 连接数据库B mojin库 + db_b_conn = db_conn_b + db_mojin_cursor = dbcursor_b + + trans_date = trade_date + + mojin_trends_map = {} + + print('to transfer Trends ',trans_date , ' data.' ) + #查询mojn库中的basis数据 + sql_qurey = 'select trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type from trends where trade_day = %s' + db_mojin_cursor.execute(sql_qurey,trans_date) + basisDatas = db_mojin_cursor.fetchall() + for row in basisDatas: + trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type = row + mojin_trends_map[blemind2] = {type:(trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type)} + + #查询ry库中的基础数据 + sql_qurey = 'select trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type from trends where trade_day = %s' + db_ry_cursor.execute(sql_qurey) + basisDatas = db_ry_cursor.fetchall() + hasInsertData = False + for row in basisDatas: + trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type = row + # 数据存在时,不进行更新 + if blemind2 in mojin_trends_map: + continue + else: + sql_insert = f"INSERT INTO trends (trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type) VALUES ( %s, %s, %s, %s,%s, %s, %s, %s)" + db_mojin_cursor.execute(sql_insert, (trade_day,blemind2,stocks_count,trend_value,trend_value_change,sort,sort_change,type)) + hasInsertData = True + if hasInsertData: + db_mojin_cursor.commit() + + print('successed transfer Trends ',trans_date , ' data.' ) + print('\r\n') def toTransRyOhterData(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): toTransStockBasis(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) - toTransStockFinancial(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) + # toTransStockFinancial(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransStockIndex(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransStocks(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransStocksInTrend(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) @@ -82,4 +430,52 @@ def toTransRyOhterData(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date): toTransStocksLimitUp(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransStocksNewRecord(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) toTransStocksTmp(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) - toTransTrends(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) \ No newline at end of file + toTransTrends(db_conn_a,dbcursor_a,db_conn_b,dbcursor_b,trade_date) + +if __name__ == "__main__": + # 数据库A的连接信息;腾讯云数据库 + db_a_config = { + 'host': '124.223.98.178', + 'user': 'root', + 'password': '1qazse42W3', + 'db': 'ry', + 'charset': 'utf8mb4', + } + + # 数据库B的连接信息;腾讯云数据库 + db_b_config = { + 'host': '124.223.98.178', + 'user': 'root', + 'password': '1qazse42W3', + 'db': 'mojin', + 'charset': 'utf8mb4', + } + + try: + # 连接数据库A + db_a_conn = pymysql.connect(**db_a_config) + db_a_cursor = db_a_conn.cursor() + + # 连接数据库B + db_b_conn = pymysql.connect(**db_b_config) + db_b_cursor = db_b_conn.cursor() + + trade_day = '2023-08-09' + + #测试 + toTransStocksInTrend(db_a_conn,db_a_cursor,db_b_conn,db_b_cursor,trade_day) + + except Exception as e: + print(f"出现错误:{e}") + + finally: + # 关闭连接 + if db_a_cursor: + db_a_cursor.close() + if db_a_conn: + db_a_conn.close() + + if db_b_cursor: + db_b_cursor.close() + if db_b_conn: + db_b_conn.close() \ No newline at end of file