Пулы процессов в python. Использование Pool из модуля multiprocessing

Для упрощения работы с несколькими параллельными процессами можно использовать класс Pool из библиотеки multiprocessing.

Для упрощения работы с несколькими параллельными процессами можно использовать класс Pool из библиотеки multiprocessing.

Основными методами экземпляра класса Pool являются:

  • apply – эквивалентен обычному вызову функции. Блокируется до момента получения результата и выполняется только в одном рабочем процессе пула. Синтаксис: apply(func, *args)

  • map – синтаксис: map(func, iter, [chunksize]) разделяет все итеративные данные iter на указанное число фрагментов chunksize, которые поставляются пулу процессов в виде отдельных задач. Блокирует пул до тех пор, пока не получен конечный результат.

  • imap – lazy версия map. Позволяет получать результаты по мере их завершения.

  • apply_async – неблокирующий вызов apply. Синтаксис: apply_async(func, *args, [callback]). Результаты выполнения могут быть переданы в callback функцию для их дальнейшей обработки, либо можно получить результат с помощью метда get().

Рассмотрим несколько примеров.

1import os 2from multiprocessing import Pool, current_process 3import time 4import sys 5 6import logging 7 8logger = logging.getLogger(__name__) 9logger.setLevel(logging.DEBUG) 10handler = logging.StreamHandler(stream=sys.stdout) 11handler.setFormatter(logging.Formatter(fmt='%(asctime)s: %(message)s')) 12logger.addHandler(handler) 13 14 15def calc(value: int) -> int: 16 logger.info(f'calc started in process {current_process().name} with pid {current_process().pid}') 17 time.sleep(1) 18 return value**10 19 20 21if __name__ == '__main__': 22 logger.info(f'main pid: {os.getpid()}') 23 with Pool(processes=2) as pool: 24 result1 = pool.apply(calc, (10,)) 25 logger.info('result1: %s', result1) 26 result2 = pool.apply(calc, (10,)) 27 logger.info('result2: %s', result2) 28 result3 = pool.apply(calc, (10,)) 29 logger.info('result3: %s', result3) 30 result4 = pool.apply(calc, (10,)) 31 logger.info('result4: %s', result4) 32 33 34 print('End program')

Пример вывода:

12023-01-25 18:21:40,027: main pid: 34098 22023-01-25 18:21:40,041: calc started in process ForkPoolWorker-1 with pid 34099 32023-01-25 18:21:41,044: result1: 10000000000 42023-01-25 18:21:41,047: calc started in process ForkPoolWorker-2 with pid 34100 52023-01-25 18:21:42,050: result2: 10000000000 62023-01-25 18:21:42,050: calc started in process ForkPoolWorker-1 with pid 34099 72023-01-25 18:21:43,052: result3: 10000000000 82023-01-25 18:21:43,053: calc started in process ForkPoolWorker-2 with pid 34100 92023-01-25 18:21:44,056: result4: 10000000000 10End program

Рассмотрим пример с apply. В начале скрипта происходит добавление необходимых импортов и базовая настройка логгера, для отображения времени. Определяем базовую функцию calc, которая будет возводить в 10 степень полученное число.

Затем в основном блоке с помощью контекстного менеджера with определяем пул процессов с максимальным числом равным двум. И начинаем вызывать функцию calc в отдельном процессе.

В консоли можно увидеть, что учавствовают два дочерних процесса с pid 34100 и 34099, но при этом параллельно они не работают, задержка между каждым выводом результат составляет одну секунду. Никакого выигрыша в производительность в данном случае нет.

Рассмотрим пример с функцией map:

1import os 2from multiprocessing import Pool, current_process 3import time 4import sys 5 6import logging 7 8logger = logging.getLogger(__name__) 9logger.setLevel(logging.DEBUG) 10handler = logging.StreamHandler(stream=sys.stdout) 11handler.setFormatter(logging.Formatter(fmt='%(asctime)s: %(message)s')) 12logger.addHandler(handler) 13 14 15def calc(value: int) -> int: 16 logger.info(f'calc started in process {current_process().name} with pid {current_process().pid}') 17 time.sleep(1) 18 return value**10 19 20if __name__ == '__main__': 21 logger.info(f'main pid: {os.getpid()}') 22 23 l = [1, 2, 5, 10] 24 with Pool(processes=2) as pool: 25 results = pool.map(calc, l) 26 print(results) 27 28 print('End program')

Вывод в консоль:

12023-01-25 18:28:27,135: main pid: 34311 22023-01-25 18:28:27,148: calc started in process ForkPoolWorker-2 with pid 34313 32023-01-25 18:28:27,148: calc started in process ForkPoolWorker-1 with pid 34312 42023-01-25 18:28:28,150: calc started in process ForkPoolWorker-1 with pid 34312 52023-01-25 18:28:28,150: calc started in process ForkPoolWorker-2 with pid 34313 6[1, 1024, 9765625, 10000000000] 7End program

В основном коде программы заранее готовим список l с числами, для которых будем производить расчет и запускаем вызов функции calc с помощью метода map.

В консоли видим, что два дочерних процесса стартанули одновременно, затем еще два. Результат представлен в виде списка выходных данных функции calc. Также следует обратить внимание на то, что список упорядочен в соответвии со входными данными в списке l.

Попробуем вариант с использованием apply_async:

1import os 2from multiprocessing import Pool, current_process 3import time 4import sys 5 6import logging 7 8logger = logging.getLogger(__name__) 9logger.setLevel(logging.DEBUG) 10handler = logging.StreamHandler(stream=sys.stdout) 11handler.setFormatter(logging.Formatter(fmt='%(asctime)s: %(message)s')) 12logger.addHandler(handler) 13 14 15def calc(value: int) -> int: 16 logger.info(f'calc started in process {current_process().name} with pid {current_process().pid}') 17 time.sleep(1) 18 return value**10 19 20 21def callback(result): 22 logger.info(f'Callback result: {result}, process pid: {current_process().pid}') 23 24 25if __name__ == '__main__': 26 logger.info(f'main pid: {os.getpid()}') 27 28 with Pool(processes=2) as pool: 29 result1 = pool.apply_async(calc, (1,), callback=callback) 30 logger.info('result1: %s', result1) 31 result2 = pool.apply_async(calc, (2,), callback=callback) 32 logger.info('result2: %s', result2) 33 result3 = pool.apply_async(calc, (3,), callback=callback) 34 logger.info('result3: %s', result3) 35 result4 = pool.apply_async(calc, (4,), callback=callback) 36 logger.info('result4: %s', result4) 37 38 pool.close() 39 pool.join() 40 41 print('End program')

По сравнению с предыдущими примерами, добавлена функция callback, которая будет вызываться при завершении процесса. В основном коде происходит последовательная передача чисел 1,2,3,4 в функцию calc, передается также callback для каждого вызова. В конце контекстного менеджера добавлены две инструкции close() и join(). сlose() предотвращает отправку новых задач в пул. Как только все задачи будут выполнены, рабочие процессы завершатся, а join() ожидает завершения рабочих процессов. Без указания этих команд, основная программа просто завершит свою работу, не дождавшись выполнения процессов и вызова callback функции.

Вывод в консоль:

12023-01-25 18:33:03,614: main pid: 34515 22023-01-25 18:33:03,625: result1: <multiprocessing.pool.ApplyResult object at 0x7fa55b754100> 32023-01-25 18:33:03,626: result2: <multiprocessing.pool.ApplyResult object at 0x7fa55b754130> 42023-01-25 18:33:03,626: result3: <multiprocessing.pool.ApplyResult object at 0x7fa55b754340> 52023-01-25 18:33:03,626: result4: <multiprocessing.pool.ApplyResult object at 0x7fa55b754460> 62023-01-25 18:33:03,626: calc started in process ForkPoolWorker-1 with pid 34516 72023-01-25 18:33:03,626: calc started in process ForkPoolWorker-2 with pid 34517 82023-01-25 18:33:04,628: calc started in process ForkPoolWorker-2 with pid 34517 92023-01-25 18:33:04,628: calc started in process ForkPoolWorker-1 with pid 34516 102023-01-25 18:33:04,628: Callback result: 1024, process pid: 34515 112023-01-25 18:33:04,628: Callback result: 1, process pid: 34515 122023-01-25 18:33:05,630: Callback result: 1048576, process pid: 34515 132023-01-25 18:33:05,630: Callback result: 59049, process pid: 34515 14End program

Здесь мы видим, что все задачи для процессов были созданы одновременно. Функция apply_async вернула объект класса multiprocessing.pool.ApplyResult. Затем стартанули первые два процесса с pid 34516 и 34517, а затем вторые два.

После этого происходил вызов callback функции в основном процессе с pid 34515, но результаты перемешались. Сначала вывод для инструкции, которая была второй, затем вывод результата для первой инструкции и т.д. apply_async не гарантирует корректной последовательно получения результата, в отличае от map функции.

Тэги:
python
Дата публикации:
21.09.2023

avatar
master
Admin

Похожие статьи