跳转至

ThreadPoolExecutor 如何选择最大工作线程数

您可以通过设置 max_workers 参数来配置Python中的 ThreadPoolExecutor 中的线程数。

在本教程中,您将了解如何配置 Python 线程池中的工作线程数。

目录

  • 需要在 ThreadPoolExecutor 中配置工作线程数
  • 如何配置工作线程数
  • 检查缺省工作线程数
  • 配置工作线程数的示例
  • 常见问题
    • 线程池执行器中的默认线程数是多少?
    • 我有多少个 CPU 或 CPU 内核?
    • ThreadPoolExecutor 中的线程数是否与 CPU 或内核数匹配?
    • 我应该使用多少个线程?
    • 线程池执行器中工作线程的最大数量是多少?
  • 附记:
    • concurrent.futures.ThreadPoolExecutor
    • multiprocessing.dummy.Pool

需要在 ThreadPoolExecutor 中配置工作线程数

Python中的ThreadPoolExecutor提供了一个可重用线程池,用于执行临时任务。

您可以通过调用 submit() 函数,并传入您希望在另一个线程上执行的函数的名称。您还可以通过调用map() 函数,并指定要执行的函数的名称以及将应用函数的项的可迭代性。

线程池具有固定数量的工作线程。

根据系统中的资源或要在任务中使用的资源数,将线程池中的工作线程数限制为要完成的异步任务数非常重要。

或者,您可能希望大幅增加工作线程的数量,因为您打算使用的资源容量更大。

如何配置工作线程数

您可以在 ThreadPoolExecutor,方法是将 max_workers构造函数中。

例如:

1
2
3
4
5
# create a thread pool and set the number of worker threads
executor = ThreadPoolExecutor(max_workers=100)
# ...
# shutdown the thread pool
executor.shutdown()

如果您正在使用上下文管理器来创建线程池,以便它自动关闭,那么您可以以相同的方式配置线程数。例如:

1
2
3
# create a thread pool using the context manager and set the number of workers
with ThreadPoolExecutor(50) as executor:
    # ...

它采用正整数,并默认为系统中的 CPU 数加上 4。 工作线程总数 = (系统中的 CPU) + 4

例如,如果您的系统中有 2 个物理 CPU,并且每个 CPU 都有超线程(在现代 CPU 中很常见),那么您将有 2 个物理 CPU 和 4 个逻辑 CPU。Python将看到4个CPU。然后,系统上的默认工作线程数将为 (4 + 4) 或 8。

如果此数字大于 32(例如,16 个物理内核、32 个逻辑内核加 4 个),则默认值会将上限裁剪为 32 个线程。

系统中的线程数通常多于 CPU(物理或逻辑)。

这样做的原因是线程用于 IO 密集型任务,而不是 CPU 密集型任务。这意味着线程用于等待相对较慢的资源响应的任务,例如硬盘驱动器,DVD驱动器,打印机,网络连接等等。我们将在后面的部分中讨论线程的最佳应用。

因此,根据您的特定需求,在您的应用程序中拥有数十个,数百个甚至数千个线程并不罕见。具有多个或几千个线程是不寻常的。如果需要这么多线程,则可能首选替代解决方案,例如 AsyncIO。

现在我们知道了如何配置 工作线程数 ThreadPoolExecutor,让我们看一个工作的例子。

检查缺省工作线程数

让我们检查为系统上的线程池创建了多少个线程。

查看源码 ThreadPoolExecutor,我们可以看到默认选择的工作线程数存储在 _max_workers属性,我们可以在创建线程池后访问和报告该属性。

注意_max_workers是受保护的成员,将来可能会更改。

下面的示例报告系统上线程池中的默认线程数。

1
2
3
4
5
6
7
# SuperFastPython.com
# report the default number of worker threads on your system
from concurrent.futures import ThreadPoolExecutor
# create a thread pool with the default number of worker threads
executor = ThreadPoolExecutor()
# report the number of worker threads chosen by default
print(executor._max_workers)

运行该示例将报告系统上默认使用的工作线程数。

我有四个物理CPU内核,八个逻辑内核,因此默认值为8 + 4或12个线程。

配置工作线程数的示例

我们可以直接指定工作线程的数量,这在大多数应用程序中都是一个好主意。

下面的示例演示如何配置 500 个工作线程。

1
2
3
4
5
6
7
# SuperFastPython.com
# configure and report the default number of worker threads
from concurrent.futures import ThreadPoolExecutor
# create a thread pool with a large number of worker threads
with ThreadPoolExecutor(500) as executor:
    # report the number of worker threads
    print(executor._max_workers)

运行该示例将线程池配置为使用 500 个线程,并确认它将创建 500 个线程。

常见问题

线程池执行器中的默认线程数是多少?

ThreadPoolExecutor 中的默认线程数计算如下: * 工作线程总数 = (系统中的 CPU) + 4

系统中的CPU数量由Python决定,并将考虑超线程。

例如,如果你有两个CPU内核,每个内核都有超线程(这很常见),那么Python将在你的系统中“看到”四个CPU。

我有多少个 CPU 或 CPU 内核?

您可以通过 os 模块中的 cpu_count() 函数。

例如,以下程序将报告系统中对 Python 可见的 CPU 内核数:

1
2
3
# report the number of CPUs in your system visible to Python
import os
print(os.cpu_count())

ThreadPoolExecutor 中的线程数是否与 CPU 或内核数匹配?

中工作线程数 ThreadPoolExecutor 与系统中 CPU 或 CPU 内核的数量无关。

您可以根据需要执行的任务数、可用的本地系统资源量(例如内存)以及要在任务中访问的资源限制(例如,与远程服务器的连接)来配置线程数。

我应该使用多少个线程?

如果您有数百个任务,则可能应将线程数设置为等于任务数。

如果您有数千个任务,则可能应该将线程数限制为数百或1,000。

如果应用程序打算在将来多次执行,则可以测试不同数量的线程并比较总体执行时间,然后选择一些可提供近似最佳性能的线程。您可能希望通过随机睡眠操作模拟这些测试中的任务。

线程池执行器中工作线程的最大数量是多少?

线程池执行器中没有最大工作线程数。

但是,您的系统将具有基于可用主内存 (RAM) 的可创建的线程数的上限。

在超过主内存之前,您将在添加新线程和执行更多任务方面达到收益递减的点。这是因为您的操作系统必须在线程之间切换,这称为上下文切换。如果同时处于活动状态的线程太多,则程序可能会花费更多的时间进行上下文切换,而不是实际执行任务。

对于许多应用程序来说,一个合理的上限是数百个线程到几千个线程。现代系统上的几千多个线程可能会导致过多的上下文切换,具体取决于您的系统和正在执行的任务类型。

附记

多线程方法,可以使用concurrent.futures.ThreadPoolExecutor和multiprocessing.dummy.Pool 这两种,

concurrent.futures.ThreadPoolExecutor

1
2
3
4
5
6
7
8
9
import concurrent
import requests

def request_post(url, data):
    return requests.post(url, data=data)

with concurrent.futures.ThreadPoolExecutor() as executor: # optimally defined number of threads
    res = [executor.submit(request_post, url, data) for data in names]
    concurrent.futures.wait(res)
或者
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import time

def download_file(url):
    html = requests.get(url, stream=True)
    return html.status_code

start = time()

processes = []
with ThreadPoolExecutor(max_workers=10) as executor:
    for url in url_list:
        processes.append(executor.submit(download_file, url))

for task in as_completed(processes):
    print(task.result())

multiprocessing.dummy.Pool

1
2
3
4
5
6
from multiprocessing.dummy import Pool as ThreadPool
import itertools
import requests

with ThreadPool(len(names)) as pool: # creates a Pool of 3 threads 
    res = pool.starmap(requests.post(itertools.repeat(url),names))

pool.starmap - 用于传递(映射)多个参数到一个函数(requests.post),该函数将被一个线程(ThreadPool)列表调用。它将为每个请求返回一个request.Response的列表。

intertools.repeat(url)是需要的,以使第一个参数被重复创建相同数量的线程。

names是request.post的第二个参数,所以不需要明确使用可选参数data就可以工作。它的长度必须与正在创建的线程数量相同。

凡本网注明"来源:XXX "的文/图/视频等稿件,本网转载出于传递更多信息之目的,并不意味着赞同其观点或证实其内容的真实性。如涉及作品内容、版权和其它问题,请与本网联系,我们将在第一时间删除内容!
作者: Jason Brownlee
来源: https://superfastpython.com/threadpoolexecutor-number-of-threads/