这篇教程Python大批量写入数据(百万级别)的方法写得很实用,希望能帮到您。
背景现有一个百万行数据的csv格式文件,需要在两分钟之内存入数据库。
方案方案一:多线程+协程+异步MySql方案二:多线程+MySql批量插入
代码 1,先通过pandas读取所有csv数据存入列表。 2,设置N个线程,将一百万数据均分为N份,以start,end传递给线程以切片的方法读取区间数据(建议为16个线程) 3,方案二 线程内以 executemany 方法批量插入所有数据。 4,方案一 线程内使用异步事件循环遍历所有数据异步插入。 5,方案一纯属没事找事型。
方案二import threadingimport pandas as pdimport asyncioimport timeimport aiomysqlimport pymysqldata=[]error_data=[]def run(start,end): global data global error_data print("start"+threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) mysdb = getDb("*", *, "*", "*", "*") cursor = mysdb.cursor() sql = """insert into *_*_* values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" cursor.executemany(sql,data[start:end]) mysdb.commit() mysdb.close() print("end" + threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))def csv_file_read_use_pd(csvFile): csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='/t') csv_result = csv_result.fillna(value="None") result = csv_result.values.tolist() return resultclass MyDataBase: def __init__(self,host=None,port=None,username=None,password=None,database=None): self.db = pymysql.connect(host=host,port=port,user=username,password=password,database=database) def close(self): self.db.close()def getDb(host,port,username,password,database): MyDb = MyDataBase(host, port, username, password,database) return MyDb.dbdef main(csvFile): global data #获取全局对象 csv全量数据 #读取所有的数据 将所有数据均分成 thread_lens 份 分发给 thread_lens 个线程去执行 thread_lens=20 csv_result=csv_file_read_use_pd(csvFile) day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) for item in csv_result: item.insert(0,day) data=csv_result thread_exe_count_list=[] #线程需要执行的区间 csv_lens=len(csv_result) avg = csv_lens // thread_lens remainder=csv_lens % thread_lens # 0,27517 27517,55,034 nowIndex=0 for i in range(thread_lens): temp=[nowIndex,nowIndex+avg] nowIndex=nowIndex+avg thread_exe_count_list.append(temp) thread_exe_count_list[-1:][0][1]+=remainder #余数分给最后一个线程 # print(thread_exe_count_list) #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1]) for i in range(thread_lens): sub_thread = threading.Thread(target=run,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],)) sub_thread.start() sub_thread.join() time.sleep(3)if __name__=="__main__": #csv_file_read_use_pd("分公司箱型箱量.csv") main("分公司箱型箱量.csv")
方案一import threadingimport pandas as pdimport asyncioimport timeimport aiomysqldata=[]error_data=[]async def async_basic(loop,start,end): global data global error_data print("start"+threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) conn = await aiomysql.connect( host="*", port=*, user="*", password="*", db="*", loop=loop ) day = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) sql = """insert into **** values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)""" async with conn.cursor() as cursor: for item in data[start:end]: params=[day] params.extend(item) try: x=await cursor.execute(sql,params) if x==0: error_data.append(item) print(threading.current_thread().name+" result "+str(x)) except Exception as e: print(e) error_data.append(item) time.sleep(10) pass await conn.close() #await conn.commit() #关闭连接池 # pool.close() # await pool.wait_closed() print("end" + threading.current_thread().name) print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))def csv_file_read_use_pd(csvFile): csv_result = pd.read_csv(csvFile,encoding="utf-16",sep='/t') csv_result = csv_result.fillna(value="None") result = csv_result.values.tolist() return resultdef th(start,end): loop = asyncio.new_event_loop() loop.run_until_complete(async_basic(loop,start,end))def main(csvFile): global data #获取全局对象 csv全量数据 #读取所有的数据 将所有数据均分成 thread_lens 份 分发给 thread_lens 个线程去执行 thread_lens=20 csv_result=csv_file_read_use_pd(csvFile) data=csv_result thread_exe_count_list=[] #线程需要执行的区间 csv_lens=len(csv_result) avg = csv_lens // thread_lens remainder=csv_lens % thread_lens # 0,27517 27517,55,034 nowIndex=0 for i in range(thread_lens): temp=[nowIndex,nowIndex+avg] nowIndex=nowIndex+avg thread_exe_count_list.append(temp) thread_exe_count_list[-1:][0][1]+=remainder #余数分给最后一个线程 print(thread_exe_count_list) #th(thread_exe_count_list[0][0],thread_exe_count_list[0][1]) for i in range(thread_lens): sub_thread = threading.Thread(target=th,args=(thread_exe_count_list[i][0],thread_exe_count_list[i][1],)) sub_thread.start() time.sleep(3)if __name__=="__main__": #csv_file_read_use_pd("分公司箱型箱量.csv") main("分公司箱型箱量.csv")
总结到此这篇关于Python大批量写入数据的文章就介绍到这了,更多相关Python大批量写入数据内容请搜索wanshiok.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持wanshiok.com! Python使用Matplotlib库创建3D 图形和交互式图形详解 Python爬虫原理与基本请求库urllib详解 |