Как организовать группировку входящих запросов с использованием неблокирующего ожидания?

Учитывая, что у вас возникла необходимость снижения количества мелких запросов на запись в базу данных в пользу более редких, но более объемных операций, как можно реализовать такой подход без создания отдельного API для пакетной вставки объектов? Одна из идей — использовать TaskCompletionSource для отправки объектов в коллекцию, где каждый запрос будет ожидать завершения операции с помощью соответствующего TaskCompletionSource. В таком случае, фоновый шедуллер будет периодически извлекать объекты из коллекции для записи и устанавливать результаты выполнения в соответствующие TaskCompletionSource. Насколько целесообразна такая концепция и насколько она может быть корректной? Существуют ли уже готовые решения для реализации такого подхода?
  • 24 ноября 2024 г. 21:13
Ответы на вопрос 2
Концепция, которую вы описали, имеет свои плюсы и может быть реализована достаточно эффективно. Давайте рассмотрим некоторые аспекты и детали этой идеи.

### Основные идеи реализации

1. **TaskCompletionSource**: Your idea of using `TaskCompletionSource` для ожидания завершения операции является разумным. Каждый запрос помещается в очередь, и соответствующий `TaskCompletionSource` может быть использован для того, чтобы сообщить клиенту об результате выполнения операции.

2. **Фоновый шедулер**: Вы можете создать фоновый шедулер (например, с помощью `Timer` или `Task`), который будет периодически проверять, есть ли объекты для обработки в очереди. Он будет извлекать и записывать их в базу данных.

3. **Группировка запросов**: Вместо того чтобы обрабатывать каждый запрос по отдельности, шедулер может собирать запросы в пакет и отправлять их за один раз в базу данных. Это можно сделать через задержку, чтобы ожидать дополнительные запросы, прежде чем отправить пакет.

4. **Потоко-безопасность**: Обязательно реализуйте доступ к вашей очереди (или коллекции) таким образом, чтобы избежать проблем с конкурентным доступом. Можно использовать `ConcurrentQueue`, которая предоставит необходимую безопасность при работе с потоками.

### Пример реализации

Вот примерный фрагмент кода, который иллюстрирует данную концепцию:

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class RequestBatcher<T>
{
    private readonly ConcurrentQueue<(T request, TaskCompletionSource<bool> tcs)> _queue = new();
    private readonly Timer _timer;
    private readonly TimeSpan _batchInterval = TimeSpan.FromMilliseconds(100);
    private readonly int _maxBatchSize = 100;

    public RequestBatcher()
    {
        _timer = new Timer(ProcessBatch, null, _batchInterval, _batchInterval);
    }
    
    public Task<bool> AddRequest(T request)
    {
        var tcs = new TaskCompletionSource<bool>();
        _queue.Enqueue((request, tcs));
        return tcs.Task;
    }

    private void ProcessBatch(object state)
    {
        var batch = new List<(T request, TaskCompletionSource<bool> tcs)>();
        
        while (_queue.TryDequeue(out var item) && batch.Count < _maxBatchSize)
        {
            batch.Add(item);
        }

        if (batch.Count > 0)
        {
            // Здесь выполняется запись в базу данных
            // db.Insert(batch.Select(b => b.request));

            // Уведомляем все TaskCompletionSource о результате
            foreach (var (request, tcs) in batch)
            {
                tcs.SetResult(true); // Или false, в зависимости от результата операции
            }
        }
    }
}
```

### Готовые решения

Существуют библиотеки и фреймворки, которые могут помочь в реализации подобной логики:

- **Hangfire**: Это библиотека для фоновых задач, которая может обрабатывать групповые операции и планирование.
- **MassTransit или N Service Bus**: Они предоставляют решения для работы с сообщениями и событиями, включая пакетную обработку.

### Заключение

Ваше решение с использованием `TaskCompletionSource` и задания с фоновой обработкой имеет смысл и может быть корректным. В случае, если вы хотите уменьшить нагрузку на БД, группа запросов вместо отдельных операций — хороший подход. Однако убедитесь, что вы тщательно протестировали его на предмет таймингов и обработки ошибок, чтобы избежать потенциальных проблем с потерей данных или зависаниями.
А вам точно это нужно? 
Потому что тут у вас появляется нюанс, который не совсем ложится на схему работы веб-приложения: у ваших отдельных запросов появляется общее состояние. Как минимум, это - накапливаемый пакет запросов, а ещё, наверное, в это состояние входит нечто общее для всех запросов для работы с БД: подключение или, если используется Entity Framework, DbContext. Это общее состояние придется как-то хранить, регулировать доступ к нему (DbContext к примеру, параллельный доступ не поддерживает в принципе), и вовремя это состояние удалять. Если посмотреть на стандартные механизмы ASP.NET Core, то сессия (ISession) для этого, наверное, не подойдет - там можно хранить только сереализуемые в байты пассивные объекты, и насчет регулировки доступа там непросто. Подойдет концентратор (Hub) SignalR, у которого есть сохраняемый между вызовами контекст подкючения - но ради него, скорее всего, потребуется менять способ вызова API из браузера: у него там своя клиентская библиотека.
Ну и, по-любому, как-то надо реализовывать активную часть - которая, собственно, отслеживает пакет изменений и вовремя отправляет его в БД.
Ваша идея
Шедуллер в фоне будет периодически читать коллекцю объектов на запись и устанавливать результат выполнения в соотвтвующий TaskComplitionSource.

мне не совсем нравится. Зачем периодически? Kучше чтобы эта активная часть срабатывала по факту добавления запроса в пакет - например, асинхронно ожидала Task от TaskComplitionSource. который метод добавления запроса завершал бы по факту добавления завершающего пакет запроса. Но и завершение по таймауту тоже предусмотреть надо - по жизни оно всякое бывает: обычно для таких целей используется WaitAny для комбинации основной ожидаемой задачи с Delay по таймауту.
Ну, а ещё требуется, наверное, чтобы для каждого пользователя состояние было свое. В принципе, это делается, но надо делать. Для SignalR для этого можно использовать Hub.Context.Items - это словарь, который может содержать произвольные объекты, и сохраняется на время действия всего подключения.

А ещё у меня, в принципе, есть своя самодельная библиотека, которая решает ту же задачу - сохранение контекста сеанса, в том числе - активного, с выполняющимся кодом. Я описывал ее недавно в статьях (кроме основной статьи есть дополнительная) на Хабре. Можете попробовать её, если переделывать API не хочется: она вполне годится для работы с API на базе MVC API Controller. или Minimal API. В принципе, она заточена немного под другую задачу - получние и возврат дополнительных результатов в фоне, но для вашей задачи она тоже подойдет. Напишу тут сразу технические подробности как использовать: ссылку на активную часть, собирающую и отправляющую пакет, можно хранить в IActiveSession.Properties, точно так же, как если бы вы хранили ее в Hub.Context.Items, а обработчик завершения, который прибирает за собой (в SignalR его место в OnDisconnect) - привязать к IActiveSession.CompletionToken через его метод Register. В общее для всез запросов состояние входит свой контейнер сервисов со своей областью действия в течение всего существования состояния, так что, если для работы с БД требуется Scoped-сервис из DI-контейнера, то его можно получить оттуда (в дополнительной статье написано, как, а также написано, как защититься от нежелательного одновременного доступа к такому сервису).

Только вот библиотека эта, естественно, ни разу не стадартная, use at your own risk. Но если попробуете ее, мне будет интересно, что получилось. В том числе - и обнаруженные ошибки, заодно я и для вас их исправлю :-) .

PS И неплохо было бы IMHO, чтобы дальнейший диалог, если он будет, шел на русском языке, без "батч", "эвейтить", "шедулер" и прочих транслитераций, IMHO лучше уж по-английски писать, если перевод неизвестен. А то я человек старый, мне читать этот пиджин тяжеловато.

PPS А ещё благодарю за идею, о том, в какую сторону мне развивать мою библиотеку.
Похожие вопросы