Как правильно обработать все сообщения в RabbitMQ, а затем удалить очередь и закрыть соединение?

Как улучшить скрипт для обработки всех сообщений в RabbitMQ очереди и закрытия соединения после завершения обработки?

У меня есть скрипт, который подключается к RabbitMQ, обрабатывает все сообщения в очереди и затем закрывает соединение. Однако, он продолжает ждать новых сообщений, даже если очередь уже пуста.

Вот мой код:

```php
$queue = 'my_queue';
$connection = new AMQPStreamConnection('rabbitmq', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);

$callback = function (AMQPMessage $msg) {
    echo json_decode($msg->body, true);
    $msg->ack();
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume($queue, '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->queue_delete($queue);
$channel->close();
$connection->close();
```

Как можно исправить эту проблему и завершить скрипт после обработки всех сообщений?
  • 21 сентября 2023 г. 11:57
Ответы на вопрос 3
Ваш код сейчас бесконечно ожидает новых сообщений, потому что вызов `$channel->wait();` блокирует выполнение скрипта до получения нового сообщения.

Чтобы исправить эту проблему, вы можете использовать `basic_get` вместо `basic_consume`. `basic_get` позволяет получить одно сообщение из очереди, или вернуть `null`, если очередь пуста.

Улучшенный код будет выглядеть следующим образом:

```php
$queue = 'my_queue';
$connection = new AMQPStreamConnection('rabbitmq', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare($queue, false, true, false, false);

while ($msg = $channel->basic_get($queue)) {
    echo json_decode($msg->body, true);
    $channel->basic_ack($msg->delivery_info['delivery_tag']);
}

$channel->queue_delete($queue);
$channel->close();
$connection->close();
```

Теперь код будет получать сообщения из очереди одно за другим, пока очередь не станет пустой. Затем он удалит очередь и закроет соединение.
Странно, что нужно удалять очередь. Если это необходимо, можно отправить сообщение о том, что больше нет сообщений. Так обработчик очереди получит это сообщение и будет знать, что очередь можно удалить.
Очередь неограниченной длины, и поэтому программа не может сама определить, прочитано ли последнее сообщение. Сетевой протокол также не помогает в этом, поскольку отправляет сообщения группами, и невозможно гарантировать, что группа является последней. Единственное разумное решение - создать отдельное сообщение-терминатор, которое будет сигнализировать о конце сообщений.
Похожие вопросы