Вопрос о том, как токи системы, использователь фоновых задач с библиотекой BullMQ, требует слаженной организации обработки задач так, чтобы задачи, касающиеся одного и того же параметра (например, `cars 1`), выполнялись последовательно, а задачи для разных параметров могли выполняться параллельно.
BullMQ позволяет настроить обработку задач с использованием очередей и группировок, что поможет достичь желаемого поведения. Для того чтобы задачи для каждого `car` обрабатывались последовательно, можно использовать "конкурентные" задачи по заданному `car ID` с ограничением на количество одновременно работающих процессов. Подход заключается в использовании отдельных очередей для каждой группы задач или же в разделении с использованием специальной метки (группы), чтобы задачи обрабатывались поочередно.
Вот пример кода на TypeScript, иллюстрирующий данное решение:
```typescript
import { Worker, Job, QueueScheduler, Queue } from 'bullmq';
import { setTimeout } from 'node:timers/promises';
const queueName = 'cars';
const queue = new Queue(queueName, {
limiter: {
max: 1, // Максимум 1 задача одновременно
duration: 1000,
},
});
new QueueScheduler(queueName);
// Функция для добавления задач в очередь
async function addJobs() {
await queue.obliterate({ force: true });
await queue.add('car 1', { data: 'job data A' });
await queue.add('car 1', { data: 'job data B' });
await queue.add('car 2', { data: 'job data C' });
await queue.add('car 3', { data: 'job data D' });
}
function createWorker(carId: string) {
const worker = new Worker(queueName, async (job: Job) => {
if (job.name === `car ${carId}`) {
console.log(`< started process for job ${job.id} ${job.data.data} for ${carId}`);
await setTimeout(2000); // Эмулируем длительную обработку
console.log(`> ended process for job ${job.id} ${job.data.data} for ${carId}`);
}
}, {
connection: {
host: 'localhost', // Укажите ваш хост
port: 6379, // Укажите ваш порт
},
// Создаем хэндлеры для максимальной параллельной обработки
// используя job ID или carId как ключ в нужной логике
});
// Все воркеры могут обрабатывать задачи для разных carId
}
// Добавляем задачи в очередь
addJobs().then(() => {
createWorker('1');
createWorker('2');
createWorker('3');
});
```
### Объяснение кода:
1. **Очередь**: Основная очередь `queue` создается с ограничением на один запущенный процесс, чтобы обеспечить последовательную обработку задач.
2. **Добавление задач**: Функция `addJobs` добавляет задачи в очередь, используя `obliterrate` для очистки предыдущих задач (если необходимо).
3. **Создание воркеров**: Функция `createWorker` создает воркеров, которые обрабатывают задачи. Здесь можно кодировать идентификатор `car`, чтобы воркеры обрабатывали их поочередно. Важно, что обработка происходит в одном воркере для одного `car`.
### Замечание:
Этот код создает независимые воркеры для каждого `carId`, однако, BullMQ обеспечивает механизм блокировок, когда несколько задач добавлены в очередь. Это может потребовать дополнительного управления логикой обработки, если вам необходимо более сложное поведение с учетом других условий.