Как можно объединить результаты, полученные от каждого процесса в Pool?

  • 15 августа 2023 г. 12:17
Ответы на вопрос 5
Проблема в том, что вы используете объект multiprocessing.Manager для совместного использования данных между процессами, но вы не правильно обрабатываете результаты. 

Вместо использования словаря outcomes, вы должны вернуть результаты из функции reader и обрабатывать их в основном процессе. Например, вы можете использовать метод Pool.map вместо Pool.starmap, который автоматически соберет результаты из каждого процесса в итераторе. Затем вы можете пройти по этому итератору и добавить элементы в словарь outcomes.

Вот как может выглядеть измененный код:

from itertools import repeat
from multiprocessing import Pool, Manager

def reader(entry): # other args are constant
   .............................................
   #Some condition:
   prediction = min(distances) + (mlc_data[min(distances)[1]],)
   return entry, prediction

manager = Manager()
outcomes = manager.dict()
with Pool(3) as p:
  results = p.map(reader, input[0])
  for entry, prediction in results:
    outcomes[entry_pairs[' '.join(entry)]] = prediction

Также вам следует обратить внимание на предупреждение DeprecationWarning, которое говорит о том, что использование функций-команд столкнется с проблемами при совместном использовании пула. Вместо этого вы можете использовать стандартный синтаксис управления контекстом (with) для автоматического закрытия и объединения пула.
Используйте outcomes после выполнения Pool, попробуйте так:from itertools import repeat
from multiprocessing import Pool, Manager

def reader(entry, outcomes):
    # ...

    distances = calculate_distances(entry)
    min_distance = min(distances)
    prediction = min_distance + (mlc_data[min_distance[1]],)
    outcomes[entry_pairs[' '.join(entry)]] = prediction

manager = Manager()
outcomes = manager.dict()

with Pool(3) as p:
    # Обрабатываем каждый элемент из input[0] и сохраняем результаты в outcomes:
    p.starmap(reader, zip(input[0], repeat(outcomes)))
    p.close()
    p.join()

print(outcomes)  # должен содержать все элементы
from itertools import repeat
from multiprocessing import Pool, Manager

def reader(entry, outcomes):
    # ...

    distances = calculate_distances(entry)
    min_distance = min(distances)
    prediction = min_distance + (mlc_data[min_distance[1]],)
    outcomes[entry_pairs[' '.join(entry)]] = prediction

manager = Manager()
outcomes = manager.dict()

with Pool(3) as p:
    # Обрабатываем каждый элемент из input[0] и сохраняем результаты в outcomes:
    p.starmap(reader, zip(input[0], repeat(outcomes)))
    p.close()
    p.join()

print(outcomes)  # должен содержать все элементы
Используйте переменную outcomes после завершения работы пула процессов. Вот так:
```python
from itertools import repeat
from multiprocessing import Pool, Manager

def reader(entry, outcomes):
    # ...

prediction = min(distances) + (mlc_data[min(distances)[1]],)
outcomes[entry_pairs[' '.join(entry)]] = prediction

manager = Manager()
outcomes = manager.dict()

with Pool(3) as p:
    # Обрабатываем каждый элемент из input[0] и сохраняем результаты в outcomes:
    p.starmap(reader, zip(input[0], repeat(outcomes)))
    p.close()
    p.join()

print(outcomes)  # должен содержать все элементы
```
Похожие вопросы