Как создать очередь в Reactor Netty?

Как я могу реализовать операцию блокировки очереди запросов с помощью инструментов Reactor Netty, чтобы обрабатывать не более 3 запросов в секунду и блокировать остальные на 10 секунд? Мое текущее решение с виртуальным окном застревает в цикле, может быть у вас есть другие подходы для решения этой проблемы? Вот мой текущий код, который не работает как ожидалось:

```java
public Mono<Void> handler(@NotNull HttpServerRequest request, HttpServerResponse response, TriFunction<JsonObject, HttpServerRequest, HttpServerResponse, Void> success, @NotNull Consumer<? super Throwable> failure) {
    return Flux.interval(Duration.ofSeconds(1))
            .window(Duration.ofSeconds(1))
            .flatMap(Flux::count)
            .doOnNext(System.out::println)
            .filter(count -> count <= 3)
            .delayElements(Duration.ofSeconds(5))
            .then(sendRequest(request, response, success, failure));
}

private @NotNull Mono<Void> sendRequest(@NotNull HttpServerRequest request, HttpServerResponse response, TriFunction<JsonObject, HttpServerRequest, HttpServerResponse, Void> success, @NotNull Consumer<? super Throwable> failure) {
    return request.receive()
            .aggregate()
            .asString()
            .publishOn(Schedulers.boundedElastic())
            .map(body -> Json.createReader(new StringReader(body)).readObject())
            .doOnSuccess(json -> success.apply(json, request, response))
            .doOnError(failure)
            .then();
}
```
  • 20 января 2024 г. 22:48
Ответы на вопрос 2
В вашем текущем подходе используется `Flux.interval` и `window`, но из вашего описания не совсем понятно, как эти операторы должны работать в концепции управления запросами. Если вы хотите обработать не более трех запросов в секунду, возможно, вам подойдет использование оператора `rateLimiter`. 

Вот пример концептуального подхода, как ограничить количество обрабатываемых запросов с использованием `rateLimiter`, который создается с помощью пакета `reactor-addons`. В этом примере `rateLimiter` настроен на разрешение трех запросов в секунду:

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;

import javax.json.Json;
import javax.json.JsonObject;
import java.io.StringReader;
import java.time.Duration;
import java.util.function.Consumer;

public class RateLimitedHandler {

    private RateLimiter rateLimiter; // Настройте ваш RateLimiter здесь

    public RateLimitedHandler() {
        this.rateLimiter = RateLimiter.of("requestRateLimiter", RateLimiterConfig.custom()
                .limitRefreshPeriod(Duration.ofSeconds(1))
                .limitForPeriod(3)
                .timeoutDuration(Duration.ofSeconds(10))
                .build());
    }

    public Mono<Void> handler(HttpServerRequest request, HttpServerResponse response, TriFunction<JsonObject, HttpServerRequest, HttpServerResponse, Void> success, Consumer<? super Throwable> failure) {
        return Mono.just(request)
            .flatMap(req -> Mono.fromCallable(() -> {
                if (!rateLimiter.acquirePermission()) {
                    throw new RateLimitException("Rate limit exceeded");
                }
                return req;
            }))
            .publishOn(Schedulers.boundedElastic())
            .delaySubscription(Duration.ofSeconds(5)) // Удалите, если не хотите задержку между запросами
            .flatMap(req -> sendRequest(req, response, success, failure))
            .onErrorResume(t -> {
                // Обработайте ошибки здесь (например. RateLimitException или другие)
                return Mono.empty();
            });
    }

    private Mono<Void> sendRequest(HttpServerRequest request, HttpServerResponse response, TriFunction<JsonObject, HttpServerRequest, HttpServerResponse, Void> success, Consumer<? super Throwable> failure) {
        return request.receive()
                .aggregate()
                .asString()
                .publishOn(Schedulers.boundedElastic())
                .map(body -> Json.createReader(new StringReader(body)).readObject())
                .doOnSuccess(json -> success.apply(json, request, response))
                .doOnError(failure)
                .then();
    }

    // Определите свой RateLimitException
    public static class RateLimitException extends RuntimeException {
        public RateLimitException(String message) {
            super(message);
        }
    }

    // Определите ваш интерфейс TriFunction
    @FunctionalInterface
    public interface TriFunction<T, U, V, R> {
        R apply(T t, U u, V v);
    }
}
```

В этом примере `RateLimiter` из `reactor-addons` используется для контроля над количеством запросов, которые будут обработаны. Если лимит превышен, `RateLimiter` может выбросить `RateLimitException`. Обратите внимание, что взаимодействие с `RateLimiter` блокирующее, и поэтому его нужно запускать на отдельном потоке (например, используя `publishOn(Schedulers.boundedElastic())`).

Приведенный пример является лишь иллюстрацией подхода и требует доработки в зависимости от ваших потребностей и конкретных требований к реализации.
Для решения данной задачи в Reactor Netty можно воспользоваться оператором `flatMapSequential` для сохранения порядка выполнения запросов и оператором `window` для разделения потока запросов на временные интервалы. Затем, используя операторы `flatMap` и `take`, можно ограничить количество запросов в каждом временном интервале и отбросить остальные. Для реализации блокировки можно использовать оператор `delayElements`. 

Вот пример решения с использованием Reactor Netty:

В данном примере мы создаем поток с интервалом в одну секунду с помощью `Flux.interval`. Затем мы получаем тело каждого запроса с помощью `request.receive().aggregate().asString()`, преобразуем его в JSON объект и разделяем поток запросов на временные интервалы с помощью оператора `window`. Затем мы используем операторы `flatMap`, `take` и `delayElements` для ограничения количества запросов в каждом интервале и добавления задержки на блокировку.

Советую для этого использовать ТГ: @act_chat_bot
Похожие вопросы