Как наладить параллельную и последовательную обработку задач в соответствии с заданными условиями?

Как организовать систему для обработки фоновых задач таким образом, чтобы в любой момент времени выполнялась только одна задача для каждого конкретного параметра? <br/><br/> У меня есть очередь `cars`, в которую добавлены следующие фоновые задачи: <br/> - cars 1: job data A <br/> - cars 1: job data B <br/> - cars 2: job data C <br/> - cars 3: job data D <br/><br/> При этом я хочу, чтобы задачи для cars 1, такие как 'cars 1: job data A' и 'cars 1: job data B', выполнялись последовательно. Например, задача cars 1: job data X должна начинать выполнение только после завершения предыдущей задачи cars 1: job data W. <br/><br/> Важно, чтобы система могла обрабатывать задачи параллельно для разных параметров, таких как cars 1 и cars 2, но оставаться последовательной для задач, относящихся к одному и тому же параметру. Также следует учитывать, что может быть несколько воркеров, которые не должны знать о наличии друг друга. <br/><br/> Не могли бы вы привести пример кода на TypeScript, использующий bullmq для реализации этой логики? <br/><pre><code class="typescript">import { Worker, Job, Queue } from 'bullmq';
import { setTimeout } from 'node:timers/promises';

const queueName = 'cars';
const queue = new Queue(queueName);

await queue.obliterate({ force: true });
await queue.add('car 1', 'job data A');
await queue.add('car 1', 'job data B');
await queue.add('car 2', 'job data C');
await queue.add('car 3', 'job data D');

createWorker(1);
createWorker(2);

function createWorker(n: number) {
  const worker = new Worker(
    queueName,
    async (job: Job) =&gt; {
      console.log(
        `&lt; started process for job ${job.id} ${job.data} in worker ${n}`,
      );
      await setTimeout(2000); // Emulate long processing
      console.log(
        `&gt; ended process for job ${job.id} ${job.data} in worker ${n}`,
      );
    },
    {
      connection: { url: 're
  • 7 декабря 2024 г. 12:43
Ответы на вопрос 1
Вопрос о том, как токи системы, использователь фоновых задач с библиотекой BullMQ, требует слаженной организации обработки задач так, чтобы задачи, касающиеся одного и того же параметра (например, `cars 1`), выполнялись последовательно, а задачи для разных параметров могли выполняться параллельно.

BullMQ позволяет настроить обработку задач с использованием очередей и группировок, что поможет достичь желаемого поведения. Для того чтобы задачи для каждого `car` обрабатывались последовательно, можно использовать "конкурентные" задачи по заданному `car ID` с ограничением на количество одновременно работающих процессов. Подход заключается в использовании отдельных очередей для каждой группы задач или же в разделении с использованием специальной метки (группы), чтобы задачи обрабатывались поочередно.

Вот пример кода на TypeScript, иллюстрирующий данное решение:

```typescript
import { Worker, Job, QueueScheduler, Queue } from 'bullmq';
import { setTimeout } from 'node:timers/promises';

const queueName = 'cars';
const queue = new Queue(queueName, {
  limiter: {
    max: 1, // Максимум 1 задача одновременно
    duration: 1000,
  },
});

new QueueScheduler(queueName);

// Функция для добавления задач в очередь
async function addJobs() {
  await queue.obliterate({ force: true });
  await queue.add('car 1', { data: 'job data A' });
  await queue.add('car 1', { data: 'job data B' });
  await queue.add('car 2', { data: 'job data C' });
  await queue.add('car 3', { data: 'job data D' });
}

function createWorker(carId: string) {
  const worker = new Worker(queueName, async (job: Job) => {
    if (job.name === `car ${carId}`) {
      console.log(`< started process for job ${job.id} ${job.data.data} for ${carId}`);
      await setTimeout(2000); // Эмулируем длительную обработку
      console.log(`> ended process for job ${job.id} ${job.data.data} for ${carId}`);
    }
  }, {
    connection: {
      host: 'localhost', // Укажите ваш хост
      port: 6379, // Укажите ваш порт
    },
    // Создаем хэндлеры для максимальной параллельной обработки
    // используя job ID или carId как ключ в нужной логике
  });

  // Все воркеры могут обрабатывать задачи для разных carId
}

// Добавляем задачи в очередь
addJobs().then(() => {
  createWorker('1');
  createWorker('2');
  createWorker('3');
});
```

### Объяснение кода:
1. **Очередь**: Основная очередь `queue` создается с ограничением на один запущенный процесс, чтобы обеспечить последовательную обработку задач.
2. **Добавление задач**: Функция `addJobs` добавляет задачи в очередь, используя `obliterrate` для очистки предыдущих задач (если необходимо).
3. **Создание воркеров**: Функция `createWorker` создает воркеров, которые обрабатывают задачи. Здесь можно кодировать идентификатор `car`, чтобы воркеры обрабатывали их поочередно. Важно, что обработка происходит в одном воркере для одного `car`.

### Замечание:
Этот код создает независимые воркеры для каждого `carId`, однако, BullMQ обеспечивает механизм блокировок, когда несколько задач добавлены в очередь. Это может потребовать дополнительного управления логикой обработки, если вам необходимо более сложное поведение с учетом других условий.
Похожие вопросы