diff --git a/src/Pole.Core/PoleOptions.cs b/src/Pole.Core/PoleOptions.cs index 732065a..3a8fbbb 100644 --- a/src/Pole.Core/PoleOptions.cs +++ b/src/Pole.Core/PoleOptions.cs @@ -8,6 +8,9 @@ namespace Pole.Core public class PoleOptions { public int PendingMessageRetryIntervalSeconds { get; set; } = 30; + + public int ExpiredEventsPreBulkDeleteDelaySeconds { get; set; } = 3; + public int ExpiredEventsCollectIntervalSeconds { get; set; } = 60 * 60; public IServiceCollection Services { get; private set; } } } diff --git a/src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs b/src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs new file mode 100644 index 0000000..72c25d9 --- /dev/null +++ b/src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs @@ -0,0 +1,73 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Pole.Core.EventBus.EventStorage; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Core.Processor +{ + class ExpiredEventsCollectorProcessor : IProcessor + { + private readonly ILogger logger; + private readonly IEventStorageInitializer initializer; + private readonly IEventStorage eventstorage; + private readonly PoleOptions poleOptions; + + private const int ItemBatch = 1000; + private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); + private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); + + public string Name => nameof(PendingMessageRetryProcessor); + + public ExpiredEventsCollectorProcessor( + ILogger logger, + IEventStorageInitializer initializer, + IEventStorage eventstorage, + IOptions poleOptions) + { + this.logger = logger; + this.initializer = initializer; + this.eventstorage = eventstorage; + this.poleOptions = poleOptions.Value; + } + + public async Task Process(ProcessingContext context) + { + try + { + var tables = new[] +{ + initializer.GetTableName(), + }; + + foreach (var table in tables) + { + logger.LogDebug($"Collecting expired data from table: {table}"); + + int deletedCount; + var time = DateTime.Now; + do + { + deletedCount = await eventstorage.DeleteExpiresAsync(table, time, ItemBatch, context.CancellationToken); + + if (deletedCount != 0) + { + await Task.Delay(poleOptions.ExpiredEventsPreBulkDeleteDelaySeconds * 1000); + } + } while (deletedCount != 0); + } + } + catch(Exception ex) + { + logger.LogError(ex, $"{nameof(ExpiredEventsCollectorProcessor)} Process Error"); + } + finally + { + + await Task.Delay(poleOptions.ExpiredEventsCollectIntervalSeconds * 1000); + } + } + } +}