diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs index c5e3a02..e819699 100644 --- a/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs +++ b/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs @@ -16,9 +16,11 @@ namespace Pole.ReliableMessage.Masstransit public abstract class ReliableEventHandler : IReliableEventHandler, IConsumer where TEvent : class { + private const string FIRST_TIME_STORAGE_EXECUTE_ERROR_TAG = "FirstTimeStorageExecuteErrorTag"; private readonly IMessageStorage _messageStorage; private readonly ILogger> _logger; private readonly IServiceProvider _serviceProvider; + private bool FirstTimeStorageExecuteErrorTag = false; public ReliableEventHandler(IServiceProvider serviceProvider) { _messageStorage = serviceProvider.GetRequiredService(); @@ -38,22 +40,29 @@ namespace Pole.ReliableMessage.Masstransit _logger.LogDebug($"Message Begin Handle,messageId:{messageId}, message content :{json}"); } - var retryAttempt = context.GetRetryAttempt(); - if (retryAttempt == 0) - { - if (string.IsNullOrEmpty(messageId)) - { - _logger.LogWarning($"Message has no ReliableMessageId, ignore"); - return; - } - var isHandled = !await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed); - if (isHandled) - { - _logger.LogTrace($"This message has handled begore ReliableMessageId:{messageId}, ignore"); - return; - } - } + //var retryAttempt = context.GetRetryAttempt(); + //if (retryAttempt == 0) + //{ + // if (string.IsNullOrEmpty(messageId)) + // { + // _logger.LogWarning($"Message has no ReliableMessageId, ignore"); + // return; + // } + // var isHandled = !await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed); + // if (isHandled) + // { + // _logger.LogTrace($"This message has handled begore ReliableMessageId:{messageId}, ignore"); + // return; + // } + await Handle(new DefaultReliableEventHandlerContext(context)); + //} + //else + //{ + // // 确保 Handle 前 + // await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed); + // await Handle(new DefaultReliableEventHandlerContext(context)); + //} _logger.LogDebug($"Message handled successfully ,messageId:{messageId}"); } diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs b/src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs index d2a265e..10c4e70 100644 --- a/src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs +++ b/src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs @@ -26,6 +26,14 @@ namespace Pole.ReliableMessage.Storage.Abstraction Task> GetMany(Expression> filter, int count); /// + /// + /// + /// + /// + /// + Task GetOne(Expression> filter); + + /// /// 批量更新 /// 更新这几个值 MessageStatusId , RetryTimes LastRetryUTCTime, NextRetryUTCTime /// diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs b/src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs index 1cc5d53..049dd6b 100644 --- a/src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs +++ b/src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs @@ -10,7 +10,7 @@ namespace Pole.ReliableMessage.Storage.Abstraction public static MessageStatus Pending = new MessageStatus(3,"待发送"); public static MessageStatus Pushed = new MessageStatus(6,"已发送"); public static MessageStatus Canced = new MessageStatus(9,"已取消"); - public static MessageStatus Handed = new MessageStatus(12, "已处理"); + //public static MessageStatus Handed = new MessageStatus(12, "已处理"); public MessageStatus(int id,string name ):base(id,name) { diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs index c7193a5..eb40f33 100644 --- a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs +++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs @@ -51,6 +51,10 @@ namespace Pole.ReliableMessage.Storage.Mongodb var update = Builders.Update.Set(m => m.MessageStatusId, messageStatus.Id); var beforeDoc = await collection.FindOneAndUpdateAsync(filter, update, new FindOneAndUpdateOptions() { ReturnDocument = ReturnDocument.Before }); + if (beforeDoc == null) + { + throw new Exception("IMessageStorage.CheckAndUpdateStatus Error ,Message not found in Storage"); + } if (beforeDoc.MessageStatusId == messageStatus.Id) { return false; @@ -84,7 +88,7 @@ namespace Pole.ReliableMessage.Storage.Mongodb var models = new List>(); foreach (var message in messages) { - FilterDefinition filter = Builders.Filter.Where(m => m.Id == message.Id && m.MessageStatusId != MessageStatus.Handed.Id); + FilterDefinition filter = Builders.Filter.Where(m => m.Id == message.Id); UpdateDefinition update = Builders.Update .Set(m => m.MessageStatusId, message.MessageStatus.Id) .Set(m => m.RetryTimes, message.RetryTimes) @@ -116,5 +120,10 @@ namespace Pole.ReliableMessage.Storage.Mongodb var result = await collection.DeleteManyAsync(filter); return result.DeletedCount; } + + public Task GetOne(Expression> filter) + { + throw new NotImplementedException(); + } } } diff --git a/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs b/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs index 7f176b1..fa2cff3 100644 --- a/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs +++ b/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs @@ -41,7 +41,7 @@ namespace Pole.ReliableMessage.Processor } _logger.LogInformation($"Begin clean message"); - var deletedCount = await _messageStorage.Delete(m => m.MessageStatusId == MessageStatus.Canced.Id || m.MessageStatusId == MessageStatus.Handed.Id); + var deletedCount = await _messageStorage.Delete(m => m.MessageStatusId == MessageStatus.Canced.Id || m.MessageStatusId == MessageStatus.Pushed.Id); _logger.LogInformation($"End clean message ,delete message count : {deletedCount} , successfully"); }