ThreadPoolExecutor. Синхронизация потоков в python

Часто будет требоваться запуск нескольких потоков одновременно, например для обработки списка из нескольких файлов, или запросов на несколько url. Библиотека concurrent.future предоставляет класс ThreadPoolExecutor для упрощения обработки параллельных потоков.

Класс ThreadPoolExecutor

Часто будет требоваться запуск нескольких потоков одновременно, например для обработки списка из нескольких файлов, или запросов на несколько url. Библиотека concurrent.future предоставляет класс ThreadPoolExecutor для упрощения обработки параллельных потоков.

Обычно экземпляр ThreadPoolExecutor создается с контекстным менеджером with, чтобы определять исполняемые блоки и очищать потоки после выполнения.

Основные методы класса submit и map.

submit позволяет создавать поток для одной функции с переданными в нее параметрами. Синтаксис: submit(func: Callable, [*args, **kwargs]).

Map позволяет запускать несколько потоков для указанного callable объекта, и массива входных параметров для него. Синтаксис: map(func, *iterables, timeout=None, [chunksize])

Рассмотрим на примере:

1from concurrent.futures import ThreadPoolExecutor 2import time 3from threading import current_thread 4 5 6def sum_len(data: list[int]) -> int: 7 return sum(data) 8 9 10def foo(text: str) -> int: 11 time.sleep(len(text)) 12 print('[foo] Current thread:', current_thread().name) 13 print('msg:', text) 14 return len(text) 15 16 17fruit_list = ['apple', 'banana', 'pineapple'] 18 19with ThreadPoolExecutor(max_workers=3) as pool: 20 print('[main] Current thread:', current_thread().name) 21 results = pool.map(foo, fruit_list) 22 print('results type:', type(results)) 23 24 result = pool.submit(sum_len, results) 25 print('result type:', type(result)) 26 27 print('Waiting for thread end...') 28 29print('result:', result.result()) 30print('Threads finished')

Вывод в консоль:

1[main] Current thread: MainThread 2results type: <class 'generator'> 3result type: <class 'concurrent.futures._base.Future'> 4Waiting for thread end... 5[foo] Current thread: ThreadPoolExecutor-0_0 6msg: apple 7[foo] Current thread: ThreadPoolExecutor-0_1 8msg: banana 9[foo] Current thread: ThreadPoolExecutor-0_2 10msg: pineapple 11result: 20 12Threads finished

Данный код позволяет получать для списка длину строковых значений и затем находить их сумму. Для начала необходимые импорты и определяем две функции: foo, которая возвращает длину строки и sum_len, которая возвращает сумму длин элементов списка. Определяем список fruit_list для тестов.

Затем, с помощью контекстного менеджера with, содаем экземпляр пула потоков. В качестве параметра указываем максимальное количество воркеров с помощью параметра max_workers.

С помощью инструкции pool.map указываем, что нам нужно выполнить функцию foo для каждого элемента списка fruit_list в отдельном потоке и записать результат в переменную results. Обратите внимание на тип переменной results – это генератор.

Инструкция pool.submit указывает на то, что мы должны применить к результату выполнения предыдущих потоков функцию sum_len и записать полученное значение в переменную result. Типом переменной result будет concurrent.futures._base.Future (или как его еще называют “футура”). Футура – это ожидаемый объект. Чтобы получить его вычисленный результат, можно использовать его метод .result().

Тэги:
python
Дата публикации:
23.04.2024

avatar
master
Admin

Похожие статьи