Python线程池ThreadPoolExecutor实践

2019年5月15日 23:00 by wst

python高级

在使用ThreadPoolExecutor的过程中,一直想探索线程池会使用到多个核心吗?

带着这个疑问,首先做如下实验:

1. 从某个数文件读取数据,处理完后写到另一个文件。

2. 从某一个文件读取数据,处理完后,直接打印输出。

情况1

要考虑的问题:

1. 使用多线程写文件,会不会串行,比如都写I love you,会不会变成I love I you love you?

2. 怎么保证不串行?

3. 这个会使用多核心吗?

测试代码如下:

使用锁保证文件不会写串,结果写入文件则不能使用多核心(为什么?)。

import threading

import jsonlines

from concurrent.futures import ThreadPoolExecutor, as_completed

lock = threading.Lock()

def consume_user(line, writer):

""" 把结果写入文件 """

# 这里放置处理逻辑

time.sleep(10)

line2 = line

if lock.acquire(True):

writer.write(line2)

lock.release()

def write_file(**kwargs):

""" 多线程调用 """

t1 = time.time()

log.info("Start write data to file...")

source = kwargs.get("data_source", None)

print("source:", source)

fn = "files/%s_es.jsonl" % source

with jsonlines.open(fn) as fp:

with jsonlines.open('files/%s_result.jsonl' % source, mode='w') as writer:

with ThreadPoolExecutor(20) as executor:

future_to_read = [executor.submit(consume_user(line, writer)) for line in fp]

for future in as_completed(future_to_read):

print("future_result:", future.result())

log.info("Update data use time:{:.3f}".format(time.time() - t1))

if __name__ == "__main__"

t1 =time.time()

write_file(data_source="source1")

print("Done.{:.3f}".format(time.time()-t1))

情况2

处理完后,直接打印输出,代码如下:

经过观察此时每个核心被使用的量基本一致(为什么?)。

import threading

import jsonlines

from concurrent.futures import ThreadPoolExecutor, as_completed

lock = threading.Lock()

def consume_user(line, writer):

""" 把结果写入文件 """

# 这里放置处理逻辑

time.sleep(10)

line2 = line

print(line2)

def write_file(**kwargs):

""" 多线程调用 """

t1 = time.time()

log.info("Start write data to file...")

source = kwargs.get("data_source", None)

print("source:", source)

fn = "files/%s_es.jsonl" % source

with jsonlines.open(fn) as fp:

with jsonlines.open('files/%s_result.jsonl' % source, mode='w') as writer:

with ThreadPoolExecutor(20) as executor:

future_to_read = [executor.submit(consume_user(line, writer)) for line in fp]

for future in as_completed(future_to_read):

print("future_result:", future.result())

log.info("Update data use time:{:.3f}".format(time.time() - t1))

if __name__ == "__main__"

t1 =time.time()

write_file(data_source="source1")

print("Done.{:.3f}".format(time.time()-t1))

结论

1. 如果为了保证一致性,请使用锁机制;

2. 如果想使用多核心,则把结果打印输出,或存储到MySQL或ES

如果你有不同意见,欢迎在下方留言!!!

还有上面的为什么,谁帮我解解惑。

Comment

×

Name

Email address

Comment

Close

Submit

Not Comment!

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐