using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.EventBus;
using Pole.EventBus.Event;
using Pole.EventBus.EventStorage;
using Pole.Core.Serialization;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Pole.Core.Processor;
using Pole.Core;

namespace Pole.EventBus.Processor
{
    /// <summary>
    /// Processor that loads the messages the event storage reports as needing a retry,
    /// re-publishes them through <see cref="IProducer"/>, and writes the resulting
    /// status back to the storage.
    /// </summary>
    class PendingMessageRetryProcessor : ProcessorBase
    {
        /// <summary>
        /// Batches larger than this are persisted with the bulk storage call;
        /// smaller ones use the regular call.
        /// </summary>
        private const int BulkUpdateThreshold = 10;

        private readonly IEventStorage eventStorage;
        private readonly PoleEventBusOption options;
        private readonly IProducerInfoContainer producerContainer;
        private readonly ISerializer serializer;
        private readonly ILogger logger;
        private readonly IProducer producer;
        private readonly IEventBuffer eventBuffer;

        /// <summary>
        /// Creates the processor from its injected collaborators.
        /// </summary>
        /// <param name="eventStorage">Storage queried for pending messages and updated with their new state.</param>
        /// <param name="options">Event bus options; <c>Value</c> must not be null.</param>
        /// <param name="logger">
        /// Logger used for debug and error output.
        /// NOTE(review): the default logging registration exposes ILogger&lt;T&gt; rather than the
        /// non-generic ILogger - confirm how this parameter is resolved.
        /// </param>
        /// <param name="producerContainer">Resolves the publish target name for an event name.</param>
        /// <param name="serializer">Stored but not used by this class.</param>
        /// <param name="producer">Publishes the serialized event bytes.</param>
        /// <param name="eventBuffer">Stored but not used by this class.</param>
        /// <exception cref="Exception">Thrown when no <see cref="PoleEventBusOption"/> value is available.</exception>
        public PendingMessageRetryProcessor(
            IEventStorage eventStorage,
            IOptions<PoleEventBusOption> options,
            ILogger logger,
            IProducerInfoContainer producerContainer,
            ISerializer serializer,
            IProducer producer,
            IEventBuffer eventBuffer)
        {
            this.eventStorage = eventStorage;
            // options?.Value so a missing options object reports the intended message
            // instead of a NullReferenceException.
            this.options = options?.Value ?? throw new Exception($"{nameof(PoleEventBusOption)} Must be injected");
            this.logger = logger;
            this.producerContainer = producerContainer;
            this.serializer = serializer;
            this.producer = producer;
            this.eventBuffer = eventBuffer;
        }

        /// <summary>Name of this processor.</summary>
        public override string Name => nameof(PendingMessageRetryProcessor);

        /// <summary>
        /// Runs one retry pass, logs (and swallows) any error it raises, then waits
        /// <c>PendingMessageRetryIntervalSeconds</c> before returning.
        /// </summary>
        /// <param name="context">Processing context supplied by the caller; not read by this method.</param>
        public override async Task Process(ProcessingContext context)
        {
            try
            {
                await ProcessInternal();
            }
            catch (Exception ex)
            {
                logger.LogError(ex, $"{nameof(PendingMessageRetryProcessor)} Process Error");
            }
            finally
            {
                // TimeSpan avoids the int overflow that "seconds * 1000" hits for large intervals.
                await Task.Delay(TimeSpan.FromSeconds(options.PendingMessageRetryIntervalSeconds));
            }
        }

        /// <summary>
        /// Performs a single retry pass: messages whose retry count exceeds
        /// <c>MaxFailedRetryCount</c> are marked <see cref="EventStatus.Failed"/>; every other
        /// message has its retry count incremented and is published, becoming
        /// <see cref="EventStatus.Published"/> on success. The whole batch is then persisted.
        /// </summary>
        /// <remarks>
        /// A failure while publishing one message is logged and does not stop the remaining
        /// messages; that message keeps its current status with the incremented retry count.
        /// Errors from loading or persisting the batch still propagate to the caller.
        /// </remarks>
        public async Task ProcessInternal()
        {
            // Materialize once: the sequence is counted, iterated and handed back to the
            // storage, and the status changes made below must be the ones that get persisted.
            var pendingMessages = (await eventStorage.GetMessagesOfNeedRetry()).ToList();

            if (logger.IsEnabled(LogLevel.Debug))
            {
                logger.LogDebug("{Processor} pendingMessages count:{Count}", Name, pendingMessages.Count);
            }

            if (pendingMessages.Count == 0)
            {
                return;
            }

            foreach (var pendingMessage in pendingMessages)
            {
                // Check the retry budget first so no payload is built for a message
                // that is about to be marked as failed.
                if (pendingMessage.Retries > options.MaxFailedRetryCount)
                {
                    pendingMessage.StatusName = nameof(EventStatus.Failed);
                    continue;
                }

                pendingMessage.Retries++;

                try
                {
                    var eventContentBytes = Encoding.UTF8.GetBytes(pendingMessage.Content);
                    var bytesTransport = new EventBytesTransport(pendingMessage.Name, pendingMessage.Id, eventContentBytes);
                    var targetName = producerContainer.GetTargetName(pendingMessage.Name);

                    await producer.Publish(targetName, bytesTransport.GetBytes());
                    pendingMessage.StatusName = nameof(EventStatus.Published);
                }
                catch (Exception ex)
                {
                    // Isolate the failure: without this, one bad message would abort the pass
                    // before any state is persisted, so already-published messages would be
                    // sent again and retry counters would never advance.
                    logger.LogError(ex, "{Processor} failed to publish pending message {EventId}", Name, pendingMessage.Id);
                }
            }

            if (pendingMessages.Count > BulkUpdateThreshold)
            {
                await eventStorage.BulkChangePublishStateAsync(pendingMessages);
            }
            else
            {
                await eventStorage.ChangePublishStateAsync(pendingMessages);
            }
        }
    }
}