Background
There is a CSV file with one million rows of data that must be loaded into a database within two minutes, which works out to a sustained rate of roughly 8,300 rows per second.
Approaches
Approach 1: multithreading + coroutines + async MySQL (aiomysql)
Approach 2: multithreading + MySQL batch insert (executemany)
Code
1. First read the entire CSV into a list with pandas.
2. Spawn N threads and split the million rows evenly into N ranges; each thread receives a (start, end) pair and reads its portion of the list by slicing (16 threads is a sensible choice; see the sketch after this list).
3. Approach 2: inside each thread, bulk-insert the whole slice with executemany.
4. Approach 1: inside each thread, run an event loop that walks the slice and inserts the rows asynchronously.
5. Approach 1 is, frankly, making work for its own sake.
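The only fiddly part of step 2 is the slice arithmetic: integer division leaves a remainder, and both listings below hand it to the last thread. A standalone sketch of that logic (split_ranges is an illustrative name, not from the original code):

```python
def split_ranges(total, n):
    """Split [0, total) into n contiguous [start, end) slices;
    the division remainder is absorbed by the last slice."""
    avg, remainder = divmod(total, n)
    ranges = [[i * avg, (i + 1) * avg] for i in range(n)]
    ranges[-1][1] += remainder
    return ranges

print(split_ranges(1_000_000, 16))  # [[0, 62500], [62500, 125000], ...]
```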
Approach 2
```python
import threading
import time

import pandas as pd
import pymysql

data = []  # full CSV contents, shared by every thread


class MyDataBase:
    def __init__(self, host=None, port=None, username=None, password=None, database=None):
        self.db = pymysql.connect(host=host, port=port, user=username,
                                  password=password, database=database)

    def close(self):
        self.db.close()


def getDb(host, port, username, password, database):
    return MyDataBase(host, port, username, password, database).db


def csv_file_read_use_pd(csvFile):
    csv_result = pd.read_csv(csvFile, encoding="utf-16", sep="\t")
    csv_result = csv_result.fillna(value="None")
    return csv_result.values.tolist()


def run(start, end):
    """Bulk-insert the rows in data[start:end] over one connection."""
    print("start " + threading.current_thread().name)
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
    mysdb = getDb("*", "*", "*", "*", "*")  # connection details masked in the original
    cursor = mysdb.cursor()
    # 42 columns: the load timestamp plus the CSV fields (table name masked)
    sql = "insert into *_*_* values(" + ",".join(["%s"] * 42) + ")"
    cursor.executemany(sql, data[start:end])
    mysdb.commit()
    mysdb.close()
    print("end " + threading.current_thread().name)
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))


def main(csvFile):
    global data
    # Read everything up front, then split it into thread_lens slices, one per thread.
    thread_lens = 20
    csv_result = csv_file_read_use_pd(csvFile)
    day = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
    for item in csv_result:
        item.insert(0, day)  # prepend the load timestamp to every row
    data = csv_result

    thread_exe_count_list = []  # the [start, end) range for each thread
    csv_lens = len(csv_result)
    avg = csv_lens // thread_lens
    remainder = csv_lens % thread_lens
    nowIndex = 0
    for i in range(thread_lens):  # e.g. [0, 27517], [27517, 55034], ...
        thread_exe_count_list.append([nowIndex, nowIndex + avg])
        nowIndex += avg
    thread_exe_count_list[-1][1] += remainder  # the remainder goes to the last thread

    threads = []
    for start, end in thread_exe_count_list:
        sub_thread = threading.Thread(target=run, args=(start, end))
        sub_thread.start()
        threads.append(sub_thread)
    for sub_thread in threads:  # join only after every thread has started,
        sub_thread.join()       # otherwise the slices would run one after another


if __name__ == "__main__":
    main("分公司箱型箱量.csv")
```
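A note on why this is the winner: pymysql recognizes an INSERT ... VALUES statement passed to executemany and rewrites it into large multi-row inserts, so each thread ships its whole slice in a handful of statements instead of paying one network round-trip per row, while the 20 connections overlap those round-trips across threads.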
Approach 1
```python
import asyncio
import threading
import time

import aiomysql
import pandas as pd

data = []        # full CSV contents, shared by every thread
error_data = []  # rows that failed to insert


async def async_basic(loop, start, end):
    print("start " + threading.current_thread().name)
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))
    conn = await aiomysql.connect(
        host="*",  # connection details masked in the original
        port="*",
        user="*",
        password="*",
        db="*",
        loop=loop,
    )
    day = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
    # 42 columns: the load timestamp plus the CSV fields (table name masked)
    sql = "insert into **** values(" + ",".join(["%s"] * 42) + ")"
    async with conn.cursor() as cursor:
        for item in data[start:end]:
            params = [day]
            params.extend(item)
            try:
                x = await cursor.execute(sql, params)
                if x == 0:
                    error_data.append(item)
                print(threading.current_thread().name + " result " + str(x))
            except Exception as e:
                print(e)
                error_data.append(item)
                await asyncio.sleep(10)  # time.sleep() here would block the event loop
    await conn.commit()  # autocommit is off by default; without this nothing persists
    conn.close()
    print("end " + threading.current_thread().name)
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time())))


def csv_file_read_use_pd(csvFile):
    csv_result = pd.read_csv(csvFile, encoding="utf-16", sep="\t")
    csv_result = csv_result.fillna(value="None")
    return csv_result.values.tolist()


def th(start, end):
    # each thread drives its own private event loop
    loop = asyncio.new_event_loop()
    loop.run_until_complete(async_basic(loop, start, end))
    loop.close()


def main(csvFile):
    global data
    # Read everything up front, then split it into thread_lens slices, one per thread.
    thread_lens = 20
    data = csv_file_read_use_pd(csvFile)

    thread_exe_count_list = []  # the [start, end) range for each thread
    csv_lens = len(data)
    avg = csv_lens // thread_lens
    remainder = csv_lens % thread_lens
    nowIndex = 0
    for i in range(thread_lens):
        thread_exe_count_list.append([nowIndex, nowIndex + avg])
        nowIndex += avg
    thread_exe_count_list[-1][1] += remainder  # the remainder goes to the last thread
    print(thread_exe_count_list)

    threads = []
    for start, end in thread_exe_count_list:
        sub_thread = threading.Thread(target=th, args=(start, end))
        sub_thread.start()
        threads.append(sub_thread)
    for sub_thread in threads:
        sub_thread.join()


if __name__ == "__main__":
    main("分公司箱型箱量.csv")
```
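The verdict in step 5 is fair: each thread awaits its rows one at a time on a single connection, so nothing ever overlaps inside the event loop and the async machinery is pure overhead. For the async route to pay off, the inserts themselves would have to run concurrently. A minimal sketch of what that could look like, using aiomysql's connection pool with asyncio.gather (the pool settings and credentials are placeholders, masked like the listings above):

```python
import asyncio

import aiomysql


async def insert_slice(pool, sql, rows):
    # each task checks out its own connection, so the tasks' round-trips overlap
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.executemany(sql, rows)
        await conn.commit()


async def async_main(data, sql, n_tasks=16):
    # placeholder credentials, masked like the listings above
    pool = await aiomysql.create_pool(host="*", port=3306, user="*",
                                      password="*", db="*", maxsize=n_tasks)
    chunk = (len(data) + n_tasks - 1) // n_tasks
    await asyncio.gather(*(insert_slice(pool, sql, data[i:i + chunk])
                           for i in range(0, len(data), chunk)))
    pool.close()
    await pool.wait_closed()
```

Even then, this only converges on what Approach 2 already achieves with plain threads, which is why step 5 calls Approach 1 a waste of effort.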
Summary
That wraps up this article on bulk-writing large amounts of data with Python. For more on the topic, search IT俱乐部's earlier articles or browse the related articles below, and please keep supporting IT俱乐部!