Commit 38809d97 by dingsongjie

移除 可靠消息的 幂等性,在微服务中每个api都需要在高并发下幂等,可靠消息中的幂等也需要 使用者自己保证幂等

parent aa873b89
...@@ -16,9 +16,11 @@ namespace Pole.ReliableMessage.Masstransit ...@@ -16,9 +16,11 @@ namespace Pole.ReliableMessage.Masstransit
public abstract class ReliableEventHandler<TEvent> : IReliableEventHandler<TEvent>, IConsumer<TEvent> public abstract class ReliableEventHandler<TEvent> : IReliableEventHandler<TEvent>, IConsumer<TEvent>
where TEvent : class where TEvent : class
{ {
private const string FIRST_TIME_STORAGE_EXECUTE_ERROR_TAG = "FirstTimeStorageExecuteErrorTag";
private readonly IMessageStorage _messageStorage; private readonly IMessageStorage _messageStorage;
private readonly ILogger<ReliableEventHandler<TEvent>> _logger; private readonly ILogger<ReliableEventHandler<TEvent>> _logger;
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private bool FirstTimeStorageExecuteErrorTag = false;
public ReliableEventHandler(IServiceProvider serviceProvider) public ReliableEventHandler(IServiceProvider serviceProvider)
{ {
_messageStorage = serviceProvider.GetRequiredService<IMessageStorage>(); _messageStorage = serviceProvider.GetRequiredService<IMessageStorage>();
...@@ -38,22 +40,29 @@ namespace Pole.ReliableMessage.Masstransit ...@@ -38,22 +40,29 @@ namespace Pole.ReliableMessage.Masstransit
_logger.LogDebug($"Message Begin Handle,messageId:{messageId}, message content :{json}"); _logger.LogDebug($"Message Begin Handle,messageId:{messageId}, message content :{json}");
} }
var retryAttempt = context.GetRetryAttempt(); //var retryAttempt = context.GetRetryAttempt();
if (retryAttempt == 0) //if (retryAttempt == 0)
{ //{
if (string.IsNullOrEmpty(messageId)) // if (string.IsNullOrEmpty(messageId))
{ // {
_logger.LogWarning($"Message has no ReliableMessageId, ignore"); // _logger.LogWarning($"Message has no ReliableMessageId, ignore");
return; // return;
} // }
var isHandled = !await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed); // var isHandled = !await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed);
if (isHandled) // if (isHandled)
{ // {
_logger.LogTrace($"This message has handled begore ReliableMessageId:{messageId}, ignore"); // _logger.LogTrace($"This message has handled begore ReliableMessageId:{messageId}, ignore");
return; // return;
} // }
}
await Handle(new DefaultReliableEventHandlerContext<TEvent>(context)); await Handle(new DefaultReliableEventHandlerContext<TEvent>(context));
//}
//else
//{
// // 确保 Handle 前
// await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed);
// await Handle(new DefaultReliableEventHandlerContext<TEvent>(context));
//}
_logger.LogDebug($"Message handled successfully ,messageId:{messageId}"); _logger.LogDebug($"Message handled successfully ,messageId:{messageId}");
} }
......
...@@ -26,6 +26,14 @@ namespace Pole.ReliableMessage.Storage.Abstraction ...@@ -26,6 +26,14 @@ namespace Pole.ReliableMessage.Storage.Abstraction
Task<List<Message>> GetMany(Expression<Func<Message, bool>> filter, int count); Task<List<Message>> GetMany(Expression<Func<Message, bool>> filter, int count);
/// <summary> /// <summary>
///
/// </summary>
/// <param name="messageStatus"></param>
/// <param name="endRetryTime"></param>
/// <returns></returns>
Task<Message> GetOne(Expression<Func<Message, bool>> filter);
/// <summary>
/// 批量更新 /// 批量更新
/// 更新这几个值 MessageStatusId , RetryTimes LastRetryUTCTime, NextRetryUTCTime /// 更新这几个值 MessageStatusId , RetryTimes LastRetryUTCTime, NextRetryUTCTime
/// </summary> /// </summary>
......
...@@ -10,7 +10,7 @@ namespace Pole.ReliableMessage.Storage.Abstraction ...@@ -10,7 +10,7 @@ namespace Pole.ReliableMessage.Storage.Abstraction
public static MessageStatus Pending = new MessageStatus(3,"待发送"); public static MessageStatus Pending = new MessageStatus(3,"待发送");
public static MessageStatus Pushed = new MessageStatus(6,"已发送"); public static MessageStatus Pushed = new MessageStatus(6,"已发送");
public static MessageStatus Canced = new MessageStatus(9,"已取消"); public static MessageStatus Canced = new MessageStatus(9,"已取消");
public static MessageStatus Handed = new MessageStatus(12, "已处理"); //public static MessageStatus Handed = new MessageStatus(12, "已处理");
public MessageStatus(int id,string name ):base(id,name) public MessageStatus(int id,string name ):base(id,name)
{ {
......
...@@ -51,6 +51,10 @@ namespace Pole.ReliableMessage.Storage.Mongodb ...@@ -51,6 +51,10 @@ namespace Pole.ReliableMessage.Storage.Mongodb
var update = Builders<Message>.Update.Set(m => m.MessageStatusId, messageStatus.Id); var update = Builders<Message>.Update.Set(m => m.MessageStatusId, messageStatus.Id);
var beforeDoc = await collection.FindOneAndUpdateAsync(filter, update, new FindOneAndUpdateOptions<Message, Message>() { ReturnDocument = ReturnDocument.Before }); var beforeDoc = await collection.FindOneAndUpdateAsync(filter, update, new FindOneAndUpdateOptions<Message, Message>() { ReturnDocument = ReturnDocument.Before });
if (beforeDoc == null)
{
throw new Exception("IMessageStorage.CheckAndUpdateStatus Error ,Message not found in Storage");
}
if (beforeDoc.MessageStatusId == messageStatus.Id) if (beforeDoc.MessageStatusId == messageStatus.Id)
{ {
return false; return false;
...@@ -84,7 +88,7 @@ namespace Pole.ReliableMessage.Storage.Mongodb ...@@ -84,7 +88,7 @@ namespace Pole.ReliableMessage.Storage.Mongodb
var models = new List<WriteModel<Message>>(); var models = new List<WriteModel<Message>>();
foreach (var message in messages) foreach (var message in messages)
{ {
FilterDefinition<Message> filter = Builders<Message>.Filter.Where(m => m.Id == message.Id && m.MessageStatusId != MessageStatus.Handed.Id); FilterDefinition<Message> filter = Builders<Message>.Filter.Where(m => m.Id == message.Id);
UpdateDefinition<Message> update = Builders<Message>.Update UpdateDefinition<Message> update = Builders<Message>.Update
.Set(m => m.MessageStatusId, message.MessageStatus.Id) .Set(m => m.MessageStatusId, message.MessageStatus.Id)
.Set(m => m.RetryTimes, message.RetryTimes) .Set(m => m.RetryTimes, message.RetryTimes)
...@@ -116,5 +120,10 @@ namespace Pole.ReliableMessage.Storage.Mongodb ...@@ -116,5 +120,10 @@ namespace Pole.ReliableMessage.Storage.Mongodb
var result = await collection.DeleteManyAsync(filter); var result = await collection.DeleteManyAsync(filter);
return result.DeletedCount; return result.DeletedCount;
} }
public Task<Message> GetOne(Expression<Func<Message, bool>> filter)
{
throw new NotImplementedException();
}
} }
} }
...@@ -41,7 +41,7 @@ namespace Pole.ReliableMessage.Processor ...@@ -41,7 +41,7 @@ namespace Pole.ReliableMessage.Processor
} }
_logger.LogInformation($"Begin clean message"); _logger.LogInformation($"Begin clean message");
var deletedCount = await _messageStorage.Delete(m => m.MessageStatusId == MessageStatus.Canced.Id || m.MessageStatusId == MessageStatus.Handed.Id); var deletedCount = await _messageStorage.Delete(m => m.MessageStatusId == MessageStatus.Canced.Id || m.MessageStatusId == MessageStatus.Pushed.Id);
_logger.LogInformation($"End clean message ,delete message count : {deletedCount} , successfully"); _logger.LogInformation($"End clean message ,delete message count : {deletedCount} , successfully");
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment