using Pole.ReliableMessage.Abstraction; using Pole.ReliableMessage.Masstransit.Pipe; using Pole.ReliableMessage.Messaging; using MassTransit; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; using Pole.ReliableMessage.Storage.Abstraction; namespace Pole.ReliableMessage.Masstransit { public abstract class ReliableEventHandler : IReliableEventHandler, IConsumer where TEvent : class { private const string FIRST_TIME_STORAGE_EXECUTE_ERROR_TAG = "FirstTimeStorageExecuteErrorTag"; private readonly IMessageStorage _messageStorage; private readonly ILogger> _logger; private readonly IServiceProvider _serviceProvider; private bool FirstTimeStorageExecuteErrorTag = false; public ReliableEventHandler(IServiceProvider serviceProvider) { _messageStorage = serviceProvider.GetRequiredService(); var loggerFactory = serviceProvider.GetRequiredService(); _logger = loggerFactory.CreateLogger>(); _serviceProvider = serviceProvider; } public abstract Task Handle(IReliableEventHandlerContext context); public async Task Consume(ConsumeContext context) { var messageId = GetReliableMessageId(context); if (_logger.IsEnabled(LogLevel.Debug)) { var jsonConveter = _serviceProvider.GetRequiredService(typeof(IJsonConverter)) as IJsonConverter; var json = jsonConveter.Serialize(context.Message); _logger.LogDebug($"Message Begin Handle,messageId:{messageId}, message content :{json}"); } //var retryAttempt = context.GetRetryAttempt(); //if (retryAttempt == 0) //{ // if (string.IsNullOrEmpty(messageId)) // { // _logger.LogWarning($"Message has no ReliableMessageId, ignore"); // return; // } // var isHandled = !await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed); // if (isHandled) // { // _logger.LogTrace($"This message has handled begore ReliableMessageId:{messageId}, ignore"); // return; // } await Handle(new DefaultReliableEventHandlerContext(context)); //} //else //{ // // 确保 Handle 前 // await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed); // await Handle(new DefaultReliableEventHandlerContext(context)); //} _logger.LogDebug($"Message handled successfully ,messageId:{messageId}"); } private string GetReliableMessageId(ConsumeContext context) { return context.Headers.Get(AddReliableMessageIdPipe.RELIABLE_MESSAGE_ID, string.Empty); } } }