Так как у процессов нет общей памяти, очень важно организовывать доставку параметров и результатов выполнения разных типов для корректной работы программы.
Для обеспечения этой потребности существуют различные модели обмена данными. Но мы остановимся на одной из них – очереди.
Очередь – это структура данных с типом FIFO (First in, First out), первым пришел, первым ушел. Очереди позволяют обмениваться сообщениями между группами процессов.
Задача “producer-consumer” описывает два процесса: один является поставщиком данных или задач, т.е. является их производителем, а второй получает их и обрабатывает, т.е. потребляет и удаляет их из очереди. Совместно они используют общий буфер для обмена сообщениями – очередь.
Рассмотрим на примере:
1from multiprocessing import Process, get_context
2from multiprocessing.queues import Queue
3import time
4
5
6class Producer(Process):
7 def __init__(self, queue: Queue):
8 super().__init__()
9 self.__queue = queue
10
11 def run(self):
12 for msg in range(5):
13 self.__queue.put(msg)
14 time.sleep(0.5)
15
16
17class Consumer(Process):
18 def __init__(self, queue: Queue, queue_result: Queue):
19 super().__init__()
20 self.__queue = queue
21 self.__queue_result = queue_result
22
23 def run(self):
24 while True:
25 if self.__queue.empty():
26 print('Queue is empty. Exit.')
27 break
28 else:
29 item = self.__queue.get()
30 res = item ** 2
31 print(res)
32 self.__queue_result.put(res)
33 time.sleep(1)
34
35
36if __name__ == '__main__':
37 data = []
38 context = get_context('spawn')
39 queue = Queue(ctx=context)
40 queue_result = Queue(ctx=context)
41
42 producer_process = Producer(queue)
43 consumer_process = Consumer(queue, queue_result)
44 producer_process.start()
45 consumer_process.start()
46 producer_process.join()
47 consumer_process.join()
48
49 while not queue_result.empty():
50 data.append(queue_result.get())
51 print(data)
Сначала импортируем необходимые библиотеки. Затем создаем класс Producer, унаследованный от Process. В методе __init__ сохраняем очередь и переопределяем метод run. По умолчанию этот метод вызывает в дочернем процессе после инициализации. Если он не определен, то вызывается функция, переданная в параметре target. В текущей реализации, мы просто генерируем числа от 0 до 4 и кладем их в очередь queue.
Далее реализован класс Consumer, также от родительноского класса Process. В конструктор класса передаем уже две очереди. Одна, из которой будем вычитывать данные и вторая, в которую будем помещать результат расчетов. В методе run производим возведение в квадрат, перемещение результата в очередь результатов, а также проверка – пуста ли очередь. Если пуста, то завершаем работу консьюмера. Если этого не сделать, то выход из программы не произойдет и она будет работать постоянно.
Затем основная программа. Создаем экземпляры очередей и продьюсера с консьюмером. Стартуем оба процесса с помощью метода start() и указываем, что ждем завершения их работы методом join().
После завершения работы процессов обработки данных, вычитываем из очереди результатов значения queue_result в цикле while и выводим их на экран.
Результат работы программы представлен ниже.
10
21
34
49
516
6Queue is empty. Exit.
7[0, 1, 4, 9, 16]
Также разберем отдельно функцию get_context. Для каждой очереди необходимо указать контекст. Всего доступно три варианта:
Для упрощения работы с несколькими параллельными процессами можно использовать класс Pool из библиотеки multiprocessing. Примеры использования можно найти в нашей статье.