diff --git a/src/Pole.Domain/Entity/Enumeration.cs b/src/Pole.Domain/Entity/Enumeration.cs index f3cfcb6..49f0865 100644 --- a/src/Pole.Domain/Entity/Enumeration.cs +++ b/src/Pole.Domain/Entity/Enumeration.cs @@ -27,20 +27,11 @@ namespace Pole.Domain return Name; } - public static IEnumerable GetAll() where T : Enumeration, new() + public static IEnumerable GetAll() where T : Enumeration { - var type = typeof(T); - var fields = type.GetTypeInfo().GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly); + var fields = typeof(T).GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly); - foreach (var info in fields) - { - var instance = new T(); - - if (info.GetValue(instance) is T locatedValue) - { - yield return locatedValue; - } - } + return fields.Select(f => f.GetValue(null)).Cast(); } public override bool Equals(object obj) @@ -69,19 +60,19 @@ namespace Pole.Domain return absoluteDifference; } - public static T FromValue(int value) where T : Enumeration, new() + public static T FromValue(int value) where T : Enumeration { var matchingItem = Parse(value, "value", item => item.Id == value); return matchingItem; } - public static T FromDisplayName(string displayName) where T : Enumeration, new() + public static T FromDisplayName(string displayName) where T : Enumeration { var matchingItem = Parse(displayName, "display name", item => item.Name == displayName); return matchingItem; } - private static T Parse(K value, string description, Func predicate) where T : Enumeration, new() + private static T Parse(K value, string description, Func predicate) where T : Enumeration { var matchingItem = GetAll().FirstOrDefault(predicate); diff --git a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs index e6877d3..402561c 100644 --- a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs +++ b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs @@ -1,11 +1,11 @@ -using Pole.ReliableMessage.Core; -using Pole.ReliableMessage.EventBus; +using Pole.ReliableMessage.EventBus; using Pole.ReliableMessage.Masstransit.Abstraction; using Microsoft.Extensions.Options; using System; using System.Collections.Generic; using System.Linq; using System.Text; +using Pole.Domain; namespace Pole.ReliableMessage.Masstransit { diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs index c9c47f0..c5e3a02 100644 --- a/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs +++ b/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs @@ -8,11 +8,12 @@ using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; +using Pole.ReliableMessage.Storage.Abstraction; namespace Pole.ReliableMessage.Masstransit { - public abstract class ReliableEventHandler : IReliableEventHandler,IConsumer + public abstract class ReliableEventHandler : IReliableEventHandler, IConsumer where TEvent : class { private readonly IMessageStorage _messageStorage; @@ -20,8 +21,8 @@ namespace Pole.ReliableMessage.Masstransit private readonly IServiceProvider _serviceProvider; public ReliableEventHandler(IServiceProvider serviceProvider) { - _messageStorage = serviceProvider.GetRequiredService(typeof(IMessageStorage)) as IMessageStorage; - var loggerFactory = serviceProvider.GetRequiredService(typeof(ILoggerFactory)) as ILoggerFactory; + _messageStorage = serviceProvider.GetRequiredService(); + var loggerFactory = serviceProvider.GetRequiredService(); _logger = loggerFactory.CreateLogger>(); _serviceProvider = serviceProvider; } diff --git a/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs b/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs index 6914c6c..6fc0f3e 100644 --- a/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs +++ b/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs @@ -1,4 +1,4 @@ -using Pole.ReliableMessage.Core; +using Pole.Domain; using System; using System.Collections.Generic; using System.Text; diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs b/src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs similarity index 75% rename from src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs rename to src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs index 849fcc6..d2a265e 100644 --- a/src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs +++ b/src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs @@ -1,11 +1,10 @@ -using Pole.ReliableMessage.Messaging; using System; using System.Collections.Generic; using System.Linq.Expressions; using System.Text; using System.Threading.Tasks; -namespace Pole.ReliableMessage.Abstraction +namespace Pole.ReliableMessage.Storage.Abstraction { public interface IMessageStorage { @@ -15,20 +14,17 @@ namespace Pole.ReliableMessage.Abstraction /// /// Task Add(Message message); - ///// - ///// - ///// - ///// - ///// - ///// - //Task Get(Expression> filter); + + Task Delete(Expression> filter); + /// /// /// /// /// /// - Task> GetMany(Expression> filter, int count); + Task> GetMany(Expression> filter, int count); + /// /// 批量更新 /// 更新这几个值 MessageStatusId , RetryTimes LastRetryUTCTime, NextRetryUTCTime @@ -37,23 +33,13 @@ namespace Pole.ReliableMessage.Abstraction /// Task Save(IEnumerable messages); - Task UpdateStatus(IEnumerable messages); - - ///// - ///// - ///// - ///// 这里id 永远为 string - ///// - ///// - //Task Update(Expression> filter, MessageStatus messageStatus); - /// /// 检查 消息的状态,如果不是指定状态则返回true,并且更新状态到指定状态 ,如果已经是指定状态返回false /// /// /// /// - Task CheckAndUpdateStatus(Expression> filter, MessageStatus messageStatus); + Task CheckAndUpdateStatus(Expression> filter, MessageStatus messageStatus); Task UpdateStatus(Expression> filter, MessageStatus messageStatus); } diff --git a/src/Pole.ReliableMessage/MemberShipTable.cs b/src/Pole.ReliableMessage.Storage.Abstraction/MemberShipTable.cs similarity index 95% rename from src/Pole.ReliableMessage/MemberShipTable.cs rename to src/Pole.ReliableMessage.Storage.Abstraction/MemberShipTable.cs index 535ec71..ba28c92 100644 --- a/src/Pole.ReliableMessage/MemberShipTable.cs +++ b/src/Pole.ReliableMessage.Storage.Abstraction/MemberShipTable.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Text; -namespace Pole.ReliableMessage +namespace Pole.ReliableMessage.Storage.Abstraction { public class MemberShipTable { diff --git a/src/Pole.ReliableMessage/Messaging/Message.cs b/src/Pole.ReliableMessage.Storage.Abstraction/Message.cs similarity index 98% rename from src/Pole.ReliableMessage/Messaging/Message.cs rename to src/Pole.ReliableMessage.Storage.Abstraction/Message.cs index 048ebd4..da785a8 100644 --- a/src/Pole.ReliableMessage/Messaging/Message.cs +++ b/src/Pole.ReliableMessage.Storage.Abstraction/Message.cs @@ -2,7 +2,7 @@ using System; using System.Collections.Generic; using System.Text; -namespace Pole.ReliableMessage.Messaging +namespace Pole.ReliableMessage.Storage.Abstraction { public class Message : IComparable { diff --git a/src/Pole.ReliableMessage/Messaging/MessageStatus.cs b/src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs similarity index 88% rename from src/Pole.ReliableMessage/Messaging/MessageStatus.cs rename to src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs index 100ca52..1cc5d53 100644 --- a/src/Pole.ReliableMessage/Messaging/MessageStatus.cs +++ b/src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs @@ -1,9 +1,9 @@ -using Pole.ReliableMessage.Core; +using Pole.Domain; using System; using System.Collections.Generic; using System.Text; -namespace Pole.ReliableMessage.Messaging +namespace Pole.ReliableMessage.Storage.Abstraction { public class MessageStatus : Enumeration { diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj b/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj index 9f5c4f4..c48c146 100644 --- a/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj +++ b/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj @@ -4,4 +4,8 @@ netstandard2.0 + + + + diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs index 9f39701..3990f98 100644 --- a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs +++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs @@ -8,16 +8,6 @@ namespace Pole.ReliableMessage.Storage.Mongodb { public string MessageDatabaseName { get; set; } = "ReliableMessage"; public string MembershipCollectionName { get; set; } = "Membership"; - /// - /// bucket 中最大消息数 一旦达到最大数量 后面的数据将覆盖前面的数据 - /// - public long CollectionMaxMessageCount { get; set; } = 20000000; - - /// - /// 默认最大为10G - /// - public long CollectionMaxSize { get; set; } = 10*1024*1024*1024L; - public string ServiceCollectionName { get; set; } public MongoHost[] Servers { get; set; } } diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs index a22e5af..c7193a5 100644 --- a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs +++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs @@ -10,6 +10,8 @@ using System.Linq; using System.Linq.Expressions; using System.Text; using System.Threading.Tasks; +using Pole.ReliableMessage.Storage.Abstraction; +using Pole.Domain; namespace Pole.ReliableMessage.Storage.Mongodb { @@ -56,14 +58,14 @@ namespace Pole.ReliableMessage.Storage.Mongodb return true; } - public async Task> GetMany(Expression> filter,int count) + public async Task> GetMany(Expression> filter, int count) { IMongoCollection collection = GetCollection(); - var list= await collection.Find(filter).Limit(count).ToListAsync(); + var list = await collection.Find(filter).Limit(count).ToListAsync(); list.ForEach(m => { - m.MessageStatus = Core.Enumeration.FromValue(m.MessageStatusId); + m.MessageStatus = Enumeration.FromValue(m.MessageStatusId); }); return list; } @@ -98,43 +100,21 @@ namespace Pole.ReliableMessage.Storage.Mongodb return result.IsAcknowledged; } - public async Task UpdateStatus(IEnumerable messages) + public async Task UpdateStatus(Expression> filter, MessageStatus messageStatus) { - var count = messages.Count(); - _logger.LogDebug($"MongodbMessageStorage updateStatus begin, Messages count: {messages.Count()}"); - if (count == 0) - { - _logger.LogDebug($"MongodbMessageStorage updateStatus successfully, Modified count: 0"); - return true; - } IMongoCollection collection = GetCollection(); - var models = new List>(); - - foreach (var message in messages) - { - FilterDefinition filter = Builders.Filter.Where(m => m.Id == message.Id&&m.MessageStatusId!=MessageStatus.Handed.Id); - UpdateDefinition update = Builders.Update - .Set(m => m.MessageStatusId, message.MessageStatus.Id); - - var model = new UpdateOneModel(filter, update); - models.Add(model); - } - - var result = await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = false }); - - _logger.LogDebug($"MongodbMessageStorage updateStatus successfully, Modified count: {result.ModifiedCount}"); - + var update = Builders.Update.Set(m => m.MessageStatusId, messageStatus.Id); + var result = await collection.UpdateOneAsync(filter, update); return result.IsAcknowledged; } - public async Task UpdateStatus(Expression> filter, MessageStatus messageStatus) + public async Task Delete(Expression> filter) { IMongoCollection collection = GetCollection(); - var update = Builders.Update.Set(m => m.MessageStatusId, messageStatus.Id); - var result = await collection.UpdateOneAsync(filter, update); - return result.IsAcknowledged; + var result = await collection.DeleteManyAsync(filter); + return result.DeletedCount; } } } diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs b/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs index 781bd2e..13c51a4 100644 --- a/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs +++ b/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs @@ -60,13 +60,7 @@ namespace Microsoft.Extensions.DependencyInjection if (!collectionNames.Contains(mongodbOption.ServiceCollectionName)) { - database.CreateCollection(mongodbOption.ServiceCollectionName, new CreateCollectionOptions - { - Capped = true, - MaxDocuments = mongodbOption.CollectionMaxMessageCount, - MaxSize = mongodbOption.CollectionMaxSize, - - }); + database.CreateCollection(mongodbOption.ServiceCollectionName); var messageCollection = database.GetCollection(mongodbOption.ServiceCollectionName); AddMessageCollectionIndex(messageCollection); } @@ -74,7 +68,6 @@ namespace Microsoft.Extensions.DependencyInjection if (!collectionNames.Contains(mongodbOption.MembershipCollectionName)) { database.CreateCollection(mongodbOption.MembershipCollectionName); - var membershipCollection = database.GetCollection(mongodbOption.MembershipCollectionName); AddMemberShipTableCollectionIndex(membershipCollection); } diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs b/src/Pole.ReliableMessage/Abstraction/IMessageCheckRetryer.cs similarity index 66% rename from src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs rename to src/Pole.ReliableMessage/Abstraction/IMessageCheckRetryer.cs index 7055878..a16c5b9 100644 --- a/src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs +++ b/src/Pole.ReliableMessage/Abstraction/IMessageCheckRetryer.cs @@ -1,16 +1,13 @@ -using Pole.ReliableMessage.Messaging; +using Pole.ReliableMessage.Storage.Abstraction; using System; using System.Collections.Generic; -using System.Linq.Expressions; using System.Text; using System.Threading.Tasks; namespace Pole.ReliableMessage.Abstraction { - public interface IMessageBuffer + public interface IMessageCheckRetryer { - Task Flush(); - Task Add(Message message); - Task> GetAll(Func filter); + Task Execute(IEnumerable messages, DateTime dateTime); } } diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs b/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs index 11b2e44..5092971 100644 --- a/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs +++ b/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs @@ -1,4 +1,5 @@ using Pole.ReliableMessage.Messaging; +using Pole.ReliableMessage.Storage.Abstraction; using System; using System.Collections.Generic; using System.Text; diff --git a/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs b/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs index e92f470..fa9bcee 100644 --- a/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs +++ b/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs @@ -28,7 +28,6 @@ namespace Microsoft.Extensions.DependencyInjection services.Configure(optionConfig); services.AddSingleton(); - services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); @@ -51,7 +50,7 @@ namespace Microsoft.Extensions.DependencyInjection - services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); diff --git a/src/Pole.ReliableMessage/Core/Enumeration.cs b/src/Pole.ReliableMessage/Core/Enumeration.cs deleted file mode 100644 index 3c584b6..0000000 --- a/src/Pole.ReliableMessage/Core/Enumeration.cs +++ /dev/null @@ -1,73 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Text; - -namespace Pole.ReliableMessage.Core -{ - public abstract class Enumeration : IComparable - { - public string Name { get; private set; } - - public int Id { get; private set; } - - protected Enumeration(int id, string name) - { - Id = id; - Name = name; - } - - public override string ToString() => Name; - - public static IEnumerable GetAll() where T : Enumeration - { - var fields = typeof(T).GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly); - - return fields.Select(f => f.GetValue(null)).Cast(); - } - - public override bool Equals(object obj) - { - if (!(obj is Enumeration otherValue)) - return false; - - var typeMatches = GetType().Equals(obj.GetType()); - var valueMatches = Id.Equals(otherValue.Id); - - return typeMatches && valueMatches; - } - - public override int GetHashCode() => Id.GetHashCode(); - - public static int AbsoluteDifference(Enumeration firstValue, Enumeration secondValue) - { - var absoluteDifference = Math.Abs(firstValue.Id - secondValue.Id); - return absoluteDifference; - } - - public static T FromValue(int value) where T : Enumeration - { - var matchingItem = Parse(value, "value", item => item.Id == value); - return matchingItem; - } - - public static T FromDisplayName(string displayName) where T : Enumeration - { - var matchingItem = Parse(displayName, "display name", item => item.Name == displayName); - return matchingItem; - } - - private static T Parse(K value, string description, Func predicate) where T : Enumeration - { - var matchingItem = GetAll().FirstOrDefault(predicate); - - if (matchingItem == null) - throw new InvalidOperationException($"'{value}' is not a valid {description} in {typeof(T)}"); - - return matchingItem; - } - - public int CompareTo(object other) => Id.CompareTo(((Enumeration)other).Id); - } -} diff --git a/src/Pole.ReliableMessage/DefaultMessageCheckRetryer.cs b/src/Pole.ReliableMessage/DefaultMessageCheckRetryer.cs new file mode 100644 index 0000000..3c9c692 --- /dev/null +++ b/src/Pole.ReliableMessage/DefaultMessageCheckRetryer.cs @@ -0,0 +1,103 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Storage.Abstraction; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage +{ + class DefaultMessageCheckRetryer : IMessageCheckRetryer + { + private readonly ILogger _logger; + private readonly IRetryTimeDelayCalculator _retryTimeDelayCalculator; + private readonly ReliableMessageOption _options; + private readonly IMessageStorage _storage; + private readonly IMessageChecker _messageChecker; + private readonly IMessageBus _messageBus; + private readonly List _changedMessage = new List(); + public DefaultMessageCheckRetryer(ILogger logger, IRetryTimeDelayCalculator retryTimeDelayCalculator, IOptions options, IMessageStorage storage, IMessageChecker messageChecker, IMessageBus messageBus) + { + _logger = logger; + _retryTimeDelayCalculator = retryTimeDelayCalculator; + _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); + _storage = storage; + _messageChecker = messageChecker; + _messageBus = messageBus; + } + public async Task Execute(IEnumerable messages, DateTime dateTime) + { + try + { + messages.AsParallel().ForAll(async m => await Retry(m, dateTime)); + if (_changedMessage.Count != 0) + { + await _storage.Save(_changedMessage); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"DefaultMessageCheckRetryer.Execute ,Execute with errors"); + } + finally + { + if (_changedMessage.Count != 0) + { + _changedMessage.Clear(); + } + } + } + private async Task Retry(Message message, DateTime retryTime) + { + try + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} begin Retry"); + } + var nextRetryDelay = _retryTimeDelayCalculator.Get(message.RetryTimes, _options.MaxPendingMessageRetryDelay); + message.NextRetryUTCTime = retryTime.AddSeconds(nextRetryDelay); + + if (retryTime > message.AddedUTCTime.AddSeconds(_options.PendingMessageTimeOut)) + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Canced ,PendingMessageTimeOut:{_options.PendingMessageTimeOut}"); + } + + message.NextRetryUTCTime = DateTime.MinValue; + message.MessageStatus = MessageStatus.Canced; + _changedMessage.Add(message); + return; + } + message.RetryTimes++; + + var messageCheckerResult = await _messageChecker.GetResult(message); + if (messageCheckerResult.IsFinished) + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Pushed"); + } + message.MessageStatus = MessageStatus.Pushed; + await _messageBus.Publish(messageCheckerResult.Event, message.Id); + } + else + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Retry next time"); + } + } + _changedMessage.Add(message); + } + catch (Exception ex) + { + _logger.LogError(ex, $"DefaultMessageCheckRetryer.Retry ,Message:{message.Id} retry with errors"); + } + } + } +} diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs index 597bf07..5c6eacc 100644 --- a/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs +++ b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs @@ -1,5 +1,4 @@ using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Messaging; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; @@ -7,6 +6,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; using ILogger = Microsoft.Extensions.Logging.ILogger; +using Pole.ReliableMessage.Storage.Abstraction; namespace Pole.Pole.ReliableMessage.EventBus { @@ -16,18 +16,17 @@ namespace Pole.Pole.ReliableMessage.EventBus private readonly IMessageStorage _messageStorage; private readonly IMessageIdGenerator _messageIdGenerator; private readonly ITimeHelper _timeHelper; - private readonly IMessageBuffer _messageBuffer; + //private readonly IMessageBuffer _messageBuffer; private readonly ILogger _logger; private readonly IJsonConverter _jsonConverter; private readonly IMessageCallBackInfoStore _messageCallBackInfoStore; private readonly IMessageTypeIdGenerator _messageTypeIdGenerator; - public DefaultReliableBus(IMessageBus messageBus, IMessageStorage messageStorage, IMessageIdGenerator messageIdGenerator, ITimeHelper timeHelper, IMessageBuffer messageBuffer, ILogger logger, IJsonConverter jsonConverter, IMessageCallBackInfoStore messageCallBackInfoStore, IMessageTypeIdGenerator messageTypeIdGenerator) + public DefaultReliableBus(IMessageBus messageBus, IMessageStorage messageStorage, IMessageIdGenerator messageIdGenerator, ITimeHelper timeHelper, ILogger logger, IJsonConverter jsonConverter, IMessageCallBackInfoStore messageCallBackInfoStore, IMessageTypeIdGenerator messageTypeIdGenerator) { _messageBus = messageBus; _messageStorage = messageStorage; _messageIdGenerator = messageIdGenerator; _timeHelper = timeHelper; - _messageBuffer = messageBuffer; _logger = logger; _jsonConverter = jsonConverter; _messageCallBackInfoStore = messageCallBackInfoStore; @@ -97,8 +96,8 @@ namespace Pole.Pole.ReliableMessage.EventBus { await _messageBus.Publish(@event, prePublishMessageId, cancellationToken); - var messageBufferResult = await _messageBuffer.Add(new Message { Id = prePublishMessageId, MessageStatus = MessageStatus.Pushed }); - return true; + var messageBufferResult = await _messageStorage.UpdateStatus(m => m.Id == prePublishMessageId && m.MessageStatusId == MessageStatus.Pending.Id, MessageStatus.Pushed); + return messageBufferResult; } catch (Exception ex) { diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs index f4f2875..331ff05 100644 --- a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs +++ b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs @@ -1,4 +1,4 @@ -using Pole.ReliableMessage.Core; +using Pole.Domain; using System; using System.Collections.Generic; using System.Text; diff --git a/src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs b/src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs deleted file mode 100644 index 27a6abf..0000000 --- a/src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs +++ /dev/null @@ -1,58 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Linq.Expressions; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Messaging -{ - class DefaultMessageBuffer : IMessageBuffer - { - private readonly IMessageStorage _storage; - private readonly System.Collections.Concurrent.ConcurrentDictionary Messages = new System.Collections.Concurrent.ConcurrentDictionary(); - private readonly ILogger _logger; - public DefaultMessageBuffer(IMessageStorage storage, ILogger logger) - { - _storage = storage; - _logger = logger; - } - public async Task Flush() - { - /// 通过 MessageTypeId 是否为空 判断 消息是否为 DefaultReliableBus Publish 完成后的消息状态修改缓冲, - var toUpdateStatusMessageKeyValuePairs = Messages.Where(m => string.IsNullOrEmpty(m.Value.MessageTypeId)); - var toUpdateStatusMessages= toUpdateStatusMessageKeyValuePairs.Select(m=>m.Value).ToArray(); - var toUpdateStatusMessageIds = toUpdateStatusMessageKeyValuePairs.Select(m => m.Key).ToList(); - await _storage.UpdateStatus(toUpdateStatusMessages); - - _logger.LogDebug($"DefaultMessageBuffer.Flush update successfully, Message count{toUpdateStatusMessages.Count()}"); - toUpdateStatusMessageIds.ForEach(m => { - Messages.TryRemove(m,out Message message); - }); - - var toSavedMessageKeyValuePairs = Messages.Where(m => !string.IsNullOrEmpty(m.Value.MessageTypeId)); - var toSavedMessages = toSavedMessageKeyValuePairs.Select(m => m.Value).ToArray(); - var toSavedMessagesIds = toSavedMessageKeyValuePairs.Select(m => m.Key).ToList(); - await _storage.Save(toSavedMessages); - - _logger.LogDebug($"DefaultMessageBuffer.Flush save successfully, Message count{toSavedMessages.Count()}"); - toSavedMessagesIds.ForEach(m => { - Messages.TryRemove(m, out Message message); - }); - } - - public Task Add(Message message) - { - Messages.TryAdd(message.Id, message); - return Task.FromResult(true); - } - - public async Task> GetAll(Func filter) - { - await Task.CompletedTask; - return Messages.Values.Where(filter).ToList(); - } - } -} diff --git a/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs b/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs index 731aff0..fc44b21 100644 --- a/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs +++ b/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; +using Pole.ReliableMessage.Storage.Abstraction; namespace Pole.ReliableMessage.Messaging { diff --git a/src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs b/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs similarity index 62% rename from src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs rename to src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs index 8421317..6396e14 100644 --- a/src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs +++ b/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs @@ -6,35 +6,52 @@ using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; +using Pole.ReliableMessage.Storage.Abstraction; namespace Pole.ReliableMessage.Processor { - public class MessageBufferFlushProcessor : ProcessorBase + public class MessageCleanProcessor : ProcessorBase { - private readonly IMessageBuffer _messageBuffer; private readonly ReliableMessageOption _options; - private readonly ILogger _logger; - public MessageBufferFlushProcessor(IMessageBuffer messageBuffer, IOptions options, ILogger logger) + private readonly ILogger _logger; + private readonly IMessageStorage _messageStorage; + private readonly IMemberShipTable _memberShipTable; + private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider; + public MessageCleanProcessor(IOptions options, ILogger logger, IMessageStorage messageStorage, IMemberShipTable memberShipTable, IServiceIPv4AddressProvider serviceIPv4AddressProvider) { - _messageBuffer = messageBuffer; _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); _logger = logger; + _messageStorage = messageStorage; + _memberShipTable = memberShipTable; + _serviceIPv4AddressProvider = serviceIPv4AddressProvider; } public override string Name => nameof(PendingMessageCheckProcessor); + public override async Task Process(ProcessingContext context) { try { - await _messageBuffer.Flush(); + var iPStr = _serviceIPv4AddressProvider.Get(); + var isPendingChecker = await _memberShipTable.IsPendingMessageCheckerServiceInstance(iPStr);// 这里可以把时间加上去 + if (!isPendingChecker) + { + _logger.LogDebug("I an not the pendingChecker ,Ignore clean up messages"); + return; + } + _logger.LogInformation($"Begin clean message"); + + var deletedCount = await _messageStorage.Delete(m => m.MessageStatusId == MessageStatus.Canced.Id || m.MessageStatusId == MessageStatus.Handed.Id); + + _logger.LogInformation($"End clean message ,delete message count : {deletedCount} , successfully"); } - catch(Exception ex) + catch (Exception ex) { - _logger.LogError(ex, $"MessageBufferFlushProcessor Process Error"); + _logger.LogError(ex, $"Clean message error"); } finally { - await Task.Delay(_options.PushedMessageFlushInterval * 1000); + await Task.Delay(_options.MessageCleanInterval * 1000); } } } diff --git a/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs b/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs index 7d25d53..224c221 100644 --- a/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs +++ b/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs @@ -17,26 +17,22 @@ namespace Pole.ReliableMessage.Processor { private readonly IMessageStorage _storage; private readonly ReliableMessageOption _options; - private readonly IMessageBuffer _messageBuffer; + //private readonly IMessageBuffer _messageBuffer; private readonly ITimeHelper _timeHelper; - private readonly IMessageChecker _messageChecker; - private readonly IRetryTimeDelayCalculator _retryTimeDelayCalculator; private readonly IMemberShipTable _memberShipTable; private readonly ILogger _logger; private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider; - private readonly IMessageBus _messageBus; - public PendingMessageCheckProcessor(IMessageStorage storage, IOptions options, IMessageBuffer messageBuffer, ITimeHelper timeHelper, IMessageChecker messageChecker, IRetryTimeDelayCalculator retryTimeDelayCalculator, IMemberShipTable memberShipTable, ILogger logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider, IMessageBus messageBus) + private readonly IMessageCheckRetryer _messageCheckRetryer; + public PendingMessageCheckProcessor(IMessageStorage storage, IOptions options, ITimeHelper timeHelper, IMemberShipTable memberShipTable, ILogger logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider, IMessageCheckRetryer messageCheckRetryer) { _storage = storage; _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); - _messageBuffer = messageBuffer; + //_messageBuffer = messageBuffer; _timeHelper = timeHelper; - _messageChecker = messageChecker; - _retryTimeDelayCalculator = retryTimeDelayCalculator; _memberShipTable = memberShipTable; _logger = logger; _serviceIPv4AddressProvider = serviceIPv4AddressProvider; - _messageBus = messageBus; + _messageCheckRetryer = messageCheckRetryer; } public override string Name => nameof(PendingMessageCheckProcessor); @@ -54,20 +50,7 @@ namespace Pole.ReliableMessage.Processor return; } - var now = _timeHelper.GetUTCNow(); - var pendingMessages = await _storage.GetMany(m => m.MessageStatusId == MessageStatus.Pending.Id &&m.NextRetryUTCTime <= now && m.AddedUTCTime <= now.AddSeconds(-1 * _options.PendingMessageFirstProcessingWaitTime)&&m.AddedUTCTime>= now.AddSeconds(-1 * _options.PendingMessageCheckingTimeOutSeconds),_options.PendingMessageCheckBatchCount); - - var cachedPushedMessage = await _messageBuffer.GetAll(m => m.MessageStatus == MessageStatus.Pushed); - - var finalToCheckedMessages = pendingMessages.Except(cachedPushedMessage, MessageIEqualityComparer.Default).ToList(); - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"PendingMessageCheckProcessor pendingMessages count:{pendingMessages.Count}"); - _logger.LogDebug($"PendingMessageCheckProcessor cachedPushedMessage count:{cachedPushedMessage.Count}"); - _logger.LogDebug($"PendingMessageCheckProcessor finalToCheckedMessages count:{finalToCheckedMessages.Count}"); - } - - finalToCheckedMessages.AsParallel().ForAll(async m => await Retry(m, now)); + await ProcessInternal(); } catch (Exception ex) { @@ -78,56 +61,21 @@ namespace Pole.ReliableMessage.Processor await Task.Delay(_options.PendingMessageRetryInterval * 1000); } } - - private async Task Retry(Message message, DateTime retryTime) + public async Task ProcessInternal() { - try - { - await Task.CompletedTask; - - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} begin Retry"); - } - var nextRetryDelay = _retryTimeDelayCalculator.Get(message.RetryTimes, _options.MaxPendingMessageRetryDelay); - message.NextRetryUTCTime = retryTime.AddSeconds(nextRetryDelay); + var now = _timeHelper.GetUTCNow(); + var pendingMessages = await _storage.GetMany(m => m.MessageStatusId == MessageStatus.Pending.Id && m.NextRetryUTCTime <= now && m.AddedUTCTime <= now.AddSeconds(-1 * _options.PendingMessageFirstProcessingWaitTime) && m.AddedUTCTime >= now.AddSeconds(-1 * _options.PendingMessageCheckingTimeOutSeconds), _options.PendingMessageCheckBatchCount); - if (retryTime > message.AddedUTCTime.AddSeconds(_options.PendingMessageTimeOut)) - { - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Canced ,PendingMessageTimeOut:{_options.PendingMessageTimeOut}"); - } - - message.NextRetryUTCTime = DateTime.MinValue; - message.MessageStatus = MessageStatus.Canced; - await _messageBuffer.Add(message); - return; - } - message.RetryTimes++; - - var messageCheckerResult = await _messageChecker.GetResult(message); - if (messageCheckerResult.IsFinished) - { - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Pushed"); - } - message.MessageStatus = MessageStatus.Pushed; - await _messageBus.Publish(messageCheckerResult.Event, message.Id); - } - else - { - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Retry next time"); - } - } - await _messageBuffer.Add(message); + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"PendingMessageCheckProcessor pendingMessages count:{pendingMessages.Count}"); } - catch (Exception ex) + + await _messageCheckRetryer.Execute(pendingMessages, now); + // 说明此时 消息数量超过 批量获取数 + if (pendingMessages.Count == _options.PendingMessageCheckBatchCount) { - _logger.LogError(ex, $"PendingMessageCheckProcessor Retry ,Message:{message.Id} retry with errors"); + await ProcessInternal(); } } } diff --git a/src/Pole.ReliableMessage/ReliableMessageOption.cs b/src/Pole.ReliableMessage/ReliableMessageOption.cs index 0c3cfb4..fea3d98 100644 --- a/src/Pole.ReliableMessage/ReliableMessageOption.cs +++ b/src/Pole.ReliableMessage/ReliableMessageOption.cs @@ -25,7 +25,7 @@ namespace Pole.ReliableMessage /// /// 预发送消息超时时间 单位 秒 /// - public int PendingMessageTimeOut { get; set; } = 10*60; + public int PendingMessageTimeOut { get; set; } = 10 * 60; /// /// 预发送消息检查时每一次获取的消息数量 @@ -35,7 +35,7 @@ namespace Pole.ReliableMessage /// /// 预发送消息状态检查最后时间 单位 秒 /// - public int PendingMessageCheckingTimeOutSeconds { get; set; } = 13*60; + public int PendingMessageCheckingTimeOutSeconds { get; set; } = 13 * 60; /// /// 已发送的消息缓冲区 flush to storage 的时间间隔 单位 秒 @@ -46,7 +46,7 @@ namespace Pole.ReliableMessage /// /// PendingMessage 第一次处理等待时间 单位 秒 /// - public int PendingMessageFirstProcessingWaitTime { get; set; } = 2+10; + public int PendingMessageFirstProcessingWaitTime { get; set; } = 2 + 10; /// /// 每次重试之间最大间隔 单位 秒 @@ -61,14 +61,19 @@ namespace Pole.ReliableMessage /// /// PendingMessageCheck 实例存活超时时间 单位 秒 /// - public int PendingMessageCheckerInstanceIAnAliveTimeout { get; set; } = 3*10; + public int PendingMessageCheckerInstanceIAnAliveTimeout { get; set; } = 3 * 10; + + /// + /// Message 定期清理时间间隔 单位 秒 + /// + public int MessageCleanInterval { get; set; } = 30 * 60; /// /// 当主机有多个网络时通过指定网关地址找到合适的服务ip地址 /// public string NetworkInterfaceGatewayAddress { get; set; } = string.Empty; - public ReliableMessageOption AddEventAssemblies (params Assembly [] assemblies) + public ReliableMessageOption AddEventAssemblies(params Assembly[] assemblies) { EventCallbackAssemblies = assemblies.ToList(); return this; @@ -89,5 +94,4 @@ namespace Pole.ReliableMessage return this; } } - }