Для упрощения работы с несколькими параллельными процессами можно использовать класс Pool из библиотеки multiprocessing.
Основными методами экземпляра класса Pool являются:
apply – эквивалентен обычному вызову функции. Блокируется до момента получения результата и выполняется только в одном рабочем процессе пула. Синтаксис: apply(func, *args)
map – синтаксис: map(func, iter, [chunksize]) разделяет все итеративные данные iter на указанное число фрагментов chunksize, которые поставляются пулу процессов в виде отдельных задач. Блокирует пул до тех пор, пока не получен конечный результат.
imap – lazy версия map. Позволяет получать результаты по мере их завершения.
apply_async – неблокирующий вызов apply. Синтаксис: apply_async(func, *args, [callback]). Результаты выполнения могут быть переданы в callback функцию для их дальнейшей обработки, либо можно получить результат с помощью метда get().
Рассмотрим несколько примеров.
1import os
2from multiprocessing import Pool, current_process
3import time
4import sys
5
6import logging
7
8logger = logging.getLogger(__name__)
9logger.setLevel(logging.DEBUG)
10handler = logging.StreamHandler(stream=sys.stdout)
11handler.setFormatter(logging.Formatter(fmt='%(asctime)s: %(message)s'))
12logger.addHandler(handler)
13
14
15def calc(value: int) -> int:
16 logger.info(f'calc started in process {current_process().name} with pid {current_process().pid}')
17 time.sleep(1)
18 return value**10
19
20
21if __name__ == '__main__':
22 logger.info(f'main pid: {os.getpid()}')
23 with Pool(processes=2) as pool:
24 result1 = pool.apply(calc, (10,))
25 logger.info('result1: %s', result1)
26 result2 = pool.apply(calc, (10,))
27 logger.info('result2: %s', result2)
28 result3 = pool.apply(calc, (10,))
29 logger.info('result3: %s', result3)
30 result4 = pool.apply(calc, (10,))
31 logger.info('result4: %s', result4)
32
33
34 print('End program')
Пример вывода:
12023-01-25 18:21:40,027: main pid: 34098
22023-01-25 18:21:40,041: calc started in process ForkPoolWorker-1 with pid 34099
32023-01-25 18:21:41,044: result1: 10000000000
42023-01-25 18:21:41,047: calc started in process ForkPoolWorker-2 with pid 34100
52023-01-25 18:21:42,050: result2: 10000000000
62023-01-25 18:21:42,050: calc started in process ForkPoolWorker-1 with pid 34099
72023-01-25 18:21:43,052: result3: 10000000000
82023-01-25 18:21:43,053: calc started in process ForkPoolWorker-2 with pid 34100
92023-01-25 18:21:44,056: result4: 10000000000
10End program
Рассмотрим пример с apply. В начале скрипта происходит добавление необходимых импортов и базовая настройка логгера, для отображения времени. Определяем базовую функцию calc, которая будет возводить в 10 степень полученное число.
Затем в основном блоке с помощью контекстного менеджера with определяем пул процессов с максимальным числом равным двум. И начинаем вызывать функцию calc в отдельном процессе.
В консоли можно увидеть, что учавствовают два дочерних процесса с pid 34100 и 34099, но при этом параллельно они не работают, задержка между каждым выводом результат составляет одну секунду. Никакого выигрыша в производительность в данном случае нет.
Рассмотрим пример с функцией map:
1import os
2from multiprocessing import Pool, current_process
3import time
4import sys
5
6import logging
7
8logger = logging.getLogger(__name__)
9logger.setLevel(logging.DEBUG)
10handler = logging.StreamHandler(stream=sys.stdout)
11handler.setFormatter(logging.Formatter(fmt='%(asctime)s: %(message)s'))
12logger.addHandler(handler)
13
14
15def calc(value: int) -> int:
16 logger.info(f'calc started in process {current_process().name} with pid {current_process().pid}')
17 time.sleep(1)
18 return value**10
19
20if __name__ == '__main__':
21 logger.info(f'main pid: {os.getpid()}')
22
23 l = [1, 2, 5, 10]
24 with Pool(processes=2) as pool:
25 results = pool.map(calc, l)
26 print(results)
27
28 print('End program')
Вывод в консоль:
12023-01-25 18:28:27,135: main pid: 34311
22023-01-25 18:28:27,148: calc started in process ForkPoolWorker-2 with pid 34313
32023-01-25 18:28:27,148: calc started in process ForkPoolWorker-1 with pid 34312
42023-01-25 18:28:28,150: calc started in process ForkPoolWorker-1 with pid 34312
52023-01-25 18:28:28,150: calc started in process ForkPoolWorker-2 with pid 34313
6[1, 1024, 9765625, 10000000000]
7End program
В основном коде программы заранее готовим список l с числами, для которых будем производить расчет и запускаем вызов функции calc с помощью метода map.
В консоли видим, что два дочерних процесса стартанули одновременно, затем еще два. Результат представлен в виде списка выходных данных функции calc. Также следует обратить внимание на то, что список упорядочен в соответвии со входными данными в списке l.
Попробуем вариант с использованием apply_async:
1import os
2from multiprocessing import Pool, current_process
3import time
4import sys
5
6import logging
7
8logger = logging.getLogger(__name__)
9logger.setLevel(logging.DEBUG)
10handler = logging.StreamHandler(stream=sys.stdout)
11handler.setFormatter(logging.Formatter(fmt='%(asctime)s: %(message)s'))
12logger.addHandler(handler)
13
14
15def calc(value: int) -> int:
16 logger.info(f'calc started in process {current_process().name} with pid {current_process().pid}')
17 time.sleep(1)
18 return value**10
19
20
21def callback(result):
22 logger.info(f'Callback result: {result}, process pid: {current_process().pid}')
23
24
25if __name__ == '__main__':
26 logger.info(f'main pid: {os.getpid()}')
27
28 with Pool(processes=2) as pool:
29 result1 = pool.apply_async(calc, (1,), callback=callback)
30 logger.info('result1: %s', result1)
31 result2 = pool.apply_async(calc, (2,), callback=callback)
32 logger.info('result2: %s', result2)
33 result3 = pool.apply_async(calc, (3,), callback=callback)
34 logger.info('result3: %s', result3)
35 result4 = pool.apply_async(calc, (4,), callback=callback)
36 logger.info('result4: %s', result4)
37
38 pool.close()
39 pool.join()
40
41 print('End program')
По сравнению с предыдущими примерами, добавлена функция callback, которая будет вызываться при завершении процесса. В основном коде происходит последовательная передача чисел 1,2,3,4 в функцию calc, передается также callback для каждого вызова. В конце контекстного менеджера добавлены две инструкции close() и join(). сlose() предотвращает отправку новых задач в пул. Как только все задачи будут выполнены, рабочие процессы завершатся, а join() ожидает завершения рабочих процессов. Без указания этих команд, основная программа просто завершит свою работу, не дождавшись выполнения процессов и вызова callback функции.
Вывод в консоль:
12023-01-25 18:33:03,614: main pid: 34515
22023-01-25 18:33:03,625: result1: <multiprocessing.pool.ApplyResult object at 0x7fa55b754100>
32023-01-25 18:33:03,626: result2: <multiprocessing.pool.ApplyResult object at 0x7fa55b754130>
42023-01-25 18:33:03,626: result3: <multiprocessing.pool.ApplyResult object at 0x7fa55b754340>
52023-01-25 18:33:03,626: result4: <multiprocessing.pool.ApplyResult object at 0x7fa55b754460>
62023-01-25 18:33:03,626: calc started in process ForkPoolWorker-1 with pid 34516
72023-01-25 18:33:03,626: calc started in process ForkPoolWorker-2 with pid 34517
82023-01-25 18:33:04,628: calc started in process ForkPoolWorker-2 with pid 34517
92023-01-25 18:33:04,628: calc started in process ForkPoolWorker-1 with pid 34516
102023-01-25 18:33:04,628: Callback result: 1024, process pid: 34515
112023-01-25 18:33:04,628: Callback result: 1, process pid: 34515
122023-01-25 18:33:05,630: Callback result: 1048576, process pid: 34515
132023-01-25 18:33:05,630: Callback result: 59049, process pid: 34515
14End program
Здесь мы видим, что все задачи для процессов были созданы одновременно. Функция apply_async вернула объект класса multiprocessing.pool.ApplyResult. Затем стартанули первые два процесса с pid 34516 и 34517, а затем вторые два.
После этого происходил вызов callback функции в основном процессе с pid 34515, но результаты перемешались. Сначала вывод для инструкции, которая была второй, затем вывод результата для первой инструкции и т.д. apply_async не гарантирует корректной последовательно получения результата, в отличае от map функции.