python threadpoolexecutor_Python线程池ThreadPoolExecutor实践
Python线程池ThreadPoolExecutor实践
2019年5月15日 23:00 by wst
python高级
在使用ThreadPoolExecutor的过程中,一直想探索线程池会使用到多个核心吗?
带着这个疑问,首先做如下实验:
1. 从某个数文件读取数据,处理完后写到另一个文件。
2. 从某一个文件读取数据,处理完后,直接打印输出。
情况1
要考虑的问题:
1. 使用多线程写文件,会不会串行,比如都写I love you,会不会变成I love I you love you?
2. 怎么保证不串行?
3. 这个会使用多核心吗?
测试代码如下:
使用锁保证文件不会写串,结果写入文件则不能使用多核心(为什么?)。
import threading
import jsonlines
from concurrent.futures import ThreadPoolExecutor, as_completed
lock = threading.Lock()
def consume_user(line, writer):
""" 把结果写入文件 """
# 这里放置处理逻辑
time.sleep(10)
line2 = line
if lock.acquire(True):
writer.write(line2)
lock.release()
def write_file(**kwargs):
""" 多线程调用 """
t1 = time.time()
log.info("Start write data to file...")
source = kwargs.get("data_source", None)
print("source:", source)
fn = "files/%s_es.jsonl" % source
with jsonlines.open(fn) as fp:
with jsonlines.open('files/%s_result.jsonl' % source, mode='w') as writer:
with ThreadPoolExecutor(20) as executor:
future_to_read = [executor.submit(consume_user(line, writer)) for line in fp]
for future in as_completed(future_to_read):
print("future_result:", future.result())
log.info("Update data use time:{:.3f}".format(time.time() - t1))
if __name__ == "__main__"
t1 =time.time()
write_file(data_source="source1")
print("Done.{:.3f}".format(time.time()-t1))
情况2
处理完后,直接打印输出,代码如下:
经过观察此时每个核心被使用的量基本一致(为什么?)。
import threading
import jsonlines
from concurrent.futures import ThreadPoolExecutor, as_completed
lock = threading.Lock()
def consume_user(line, writer):
""" 把结果写入文件 """
# 这里放置处理逻辑
time.sleep(10)
line2 = line
print(line2)
def write_file(**kwargs):
""" 多线程调用 """
t1 = time.time()
log.info("Start write data to file...")
source = kwargs.get("data_source", None)
print("source:", source)
fn = "files/%s_es.jsonl" % source
with jsonlines.open(fn) as fp:
with jsonlines.open('files/%s_result.jsonl' % source, mode='w') as writer:
with ThreadPoolExecutor(20) as executor:
future_to_read = [executor.submit(consume_user(line, writer)) for line in fp]
for future in as_completed(future_to_read):
print("future_result:", future.result())
log.info("Update data use time:{:.3f}".format(time.time() - t1))
if __name__ == "__main__"
t1 =time.time()
write_file(data_source="source1")
print("Done.{:.3f}".format(time.time()-t1))
结论
1. 如果为了保证一致性,请使用锁机制;
2. 如果想使用多核心,则把结果打印输出,或存储到MySQL或ES
如果你有不同意见,欢迎在下方留言!!!
还有上面的为什么,谁帮我解解惑。
Comment
×
Name
Email address
Comment
Close
Submit
Not Comment!
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐


所有评论(0)