Часто будет требоваться запуск нескольких потоков одновременно, например для обработки списка из нескольких файлов, или запросов на несколько url. Библиотека concurrent.future
предоставляет класс ThreadPoolExecutor
для упрощения обработки параллельных потоков.
Обычно экземпляр ThreadPoolExecutor
создается с контекстным менеджером with, чтобы определять исполняемые блоки и очищать потоки после выполнения.
Основные методы класса submit
и map
.
submit
позволяет создавать поток для одной функции с переданными в нее параметрами. Синтаксис: submit(func: Callable, [*args, **kwargs])
.
Map позволяет запускать несколько потоков для указанного callable объекта, и массива входных параметров для него. Синтаксис: map(func, *iterables, timeout=None, [chunksize])
Рассмотрим на примере:
1from concurrent.futures import ThreadPoolExecutor
2import time
3from threading import current_thread
4
5
6def sum_len(data: list[int]) -> int:
7 return sum(data)
8
9
10def foo(text: str) -> int:
11 time.sleep(len(text))
12 print('[foo] Current thread:', current_thread().name)
13 print('msg:', text)
14 return len(text)
15
16
17fruit_list = ['apple', 'banana', 'pineapple']
18
19with ThreadPoolExecutor(max_workers=3) as pool:
20 print('[main] Current thread:', current_thread().name)
21 results = pool.map(foo, fruit_list)
22 print('results type:', type(results))
23
24 result = pool.submit(sum_len, results)
25 print('result type:', type(result))
26
27 print('Waiting for thread end...')
28
29print('result:', result.result())
30print('Threads finished')
Вывод в консоль:
1[main] Current thread: MainThread
2results type: <class 'generator'>
3result type: <class 'concurrent.futures._base.Future'>
4Waiting for thread end...
5[foo] Current thread: ThreadPoolExecutor-0_0
6msg: apple
7[foo] Current thread: ThreadPoolExecutor-0_1
8msg: banana
9[foo] Current thread: ThreadPoolExecutor-0_2
10msg: pineapple
11result: 20
12Threads finished
Данный код позволяет получать для списка длину строковых значений и затем находить их сумму. Для начала необходимые импорты и определяем две функции: foo
, которая возвращает длину строки и sum_len
, которая возвращает сумму длин элементов списка. Определяем список fruit_list
для тестов.
Затем, с помощью контекстного менеджера with
, содаем экземпляр пула потоков. В качестве параметра указываем максимальное количество воркеров с помощью параметра max_workers
.
С помощью инструкции pool.map
указываем, что нам нужно выполнить функцию foo
для каждого элемента списка fruit_list
в отдельном потоке и записать результат в переменную results
. Обратите внимание на тип переменной results
– это генератор.
Инструкция pool.submit
указывает на то, что мы должны применить к результату выполнения предыдущих потоков функцию sum_len
и записать полученное значение в переменную result
. Типом переменной result
будет concurrent.futures._base.Future
(или как его еще называют “футура”). Футура – это ожидаемый объект. Чтобы получить его вычисленный результат, можно использовать его метод .result()
.