Commit f75bb945 by dingsongjie

简化 ReliableEventHandler

parent cdda9bcf
......@@ -25,7 +25,7 @@ namespace Product.Api.Application.CommandHandler
}
public async Task<CommonCommandResponse> Handle(Command<AddProductTypeRequest, CommonCommandResponse> request, CancellationToken cancellationToken)
{
var productType = new Domain.ProductTypeAggregate.ProductType(request.Data.Id, request.Data.Name);
var productType = new Domain.ProductTypeAggregate.ProductType(Guid.NewGuid().ToString("N"), request.Data.Name);
_productTypeRepository.Add(productType);
ProductTypeAddedDomainEvent productTypeAddedDomainEvent = new ProductTypeAddedDomainEvent
......
......@@ -16,53 +16,21 @@ namespace Pole.ReliableMessage.Masstransit
public abstract class ReliableEventHandler<TEvent> : IReliableEventHandler<TEvent>, IConsumer<TEvent>
where TEvent : class
{
private const string FIRST_TIME_STORAGE_EXECUTE_ERROR_TAG = "FirstTimeStorageExecuteErrorTag";
private readonly IMessageStorage _messageStorage;
private readonly ILogger<ReliableEventHandler<TEvent>> _logger;
private readonly IServiceProvider _serviceProvider;
private bool FirstTimeStorageExecuteErrorTag = false;
public ReliableEventHandler(IServiceProvider serviceProvider)
{
_messageStorage = serviceProvider.GetRequiredService<IMessageStorage>();
var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();
_logger = loggerFactory.CreateLogger<ReliableEventHandler<TEvent>>();
_serviceProvider = serviceProvider;
}
public abstract Task Handle(IReliableEventHandlerContext<TEvent> context);
public async Task Consume(ConsumeContext<TEvent> context)
{
var messageId = GetReliableMessageId(context);
if (_logger.IsEnabled(LogLevel.Debug))
{
var jsonConveter = _serviceProvider.GetRequiredService(typeof(IJsonConverter)) as IJsonConverter;
var json = jsonConveter.Serialize(context.Message);
_logger.LogDebug($"Message Begin Handle,messageId:{messageId}, message content :{json}");
}
//var retryAttempt = context.GetRetryAttempt();
//if (retryAttempt == 0)
//{
// if (string.IsNullOrEmpty(messageId))
// {
// _logger.LogWarning($"Message has no ReliableMessageId, ignore");
// return;
// }
// var isHandled = !await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed);
// if (isHandled)
// {
// _logger.LogTrace($"This message has handled begore ReliableMessageId:{messageId}, ignore");
// return;
// }
_logger.LogDebug($"Message Begin Handle,messageId:{messageId}");
await Handle(new DefaultReliableEventHandlerContext<TEvent>(context));
//}
//else
//{
// // 确保 Handle 前
// await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed);
// await Handle(new DefaultReliableEventHandlerContext<TEvent>(context));
//}
_logger.LogDebug($"Message handled successfully ,messageId:{messageId}");
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment