From 779a2f18a343716e33ce1fb1dac3215c330c632c Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Tue, 7 Jan 2020 14:50:53 +0800 Subject: [PATCH] 优化 可靠消息 --- src/Pole.Domain/Entity/Enumeration.cs | 21 ++++++--------------- src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs | 4 ++-- src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs | 7 ++++--- src/Pole.ReliableMessage.Masstransit/QueueHaType.cs | 2 +- src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs | 46 ++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Storage.Abstraction/MemberShipTable.cs | 21 +++++++++++++++++++++ src/Pole.ReliableMessage.Storage.Abstraction/Message.cs | 79 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs | 20 ++++++++++++++++++++ src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj | 4 ++++ src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs | 10 ---------- src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs | 42 +++++++++++------------------------------- src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs | 9 +-------- src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs | 16 ---------------- src/Pole.ReliableMessage/Abstraction/IMessageCheckRetryer.cs | 13 +++++++++++++ src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs | 1 + src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs | 60 ------------------------------------------------------------ src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs | 3 +-- src/Pole.ReliableMessage/Core/Enumeration.cs | 73 ------------------------------------------------------------------------- src/Pole.ReliableMessage/DefaultMessageCheckRetryer.cs | 103 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs | 11 +++++------ src/Pole.ReliableMessage/MemberShipTable.cs | 21 --------------------- src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs | 2 +- src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs | 58 ---------------------------------------------------------- src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs | 1 + src/Pole.ReliableMessage/Messaging/Message.cs | 79 ------------------------------------------------------------------------------- src/Pole.ReliableMessage/Messaging/MessageStatus.cs | 20 -------------------- src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs | 41 ----------------------------------------- src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs | 86 +++++++++++++++++--------------------------------------------------------------------- src/Pole.ReliableMessage/ReliableMessageOption.cs | 16 ++++++++++------ 30 files changed, 405 insertions(+), 522 deletions(-) create mode 100644 src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs create mode 100644 src/Pole.ReliableMessage.Storage.Abstraction/MemberShipTable.cs create mode 100644 src/Pole.ReliableMessage.Storage.Abstraction/Message.cs create mode 100644 src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs delete mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageCheckRetryer.cs delete mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs delete mode 100644 src/Pole.ReliableMessage/Core/Enumeration.cs create mode 100644 src/Pole.ReliableMessage/DefaultMessageCheckRetryer.cs delete mode 100644 src/Pole.ReliableMessage/MemberShipTable.cs delete mode 100644 src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs delete mode 100644 src/Pole.ReliableMessage/Messaging/Message.cs delete mode 100644 src/Pole.ReliableMessage/Messaging/MessageStatus.cs delete mode 100644 src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs create mode 100644 src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs diff --git a/src/Pole.Domain/Entity/Enumeration.cs b/src/Pole.Domain/Entity/Enumeration.cs index f3cfcb6..49f0865 100644 --- a/src/Pole.Domain/Entity/Enumeration.cs +++ b/src/Pole.Domain/Entity/Enumeration.cs @@ -27,20 +27,11 @@ namespace Pole.Domain return Name; } - public static IEnumerable GetAll() where T : Enumeration, new() + public static IEnumerable GetAll() where T : Enumeration { - var type = typeof(T); - var fields = type.GetTypeInfo().GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly); + var fields = typeof(T).GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly); - foreach (var info in fields) - { - var instance = new T(); - - if (info.GetValue(instance) is T locatedValue) - { - yield return locatedValue; - } - } + return fields.Select(f => f.GetValue(null)).Cast(); } public override bool Equals(object obj) @@ -69,19 +60,19 @@ namespace Pole.Domain return absoluteDifference; } - public static T FromValue(int value) where T : Enumeration, new() + public static T FromValue(int value) where T : Enumeration { var matchingItem = Parse(value, "value", item => item.Id == value); return matchingItem; } - public static T FromDisplayName(string displayName) where T : Enumeration, new() + public static T FromDisplayName(string displayName) where T : Enumeration { var matchingItem = Parse(displayName, "display name", item => item.Name == displayName); return matchingItem; } - private static T Parse(K value, string description, Func predicate) where T : Enumeration, new() + private static T Parse(K value, string description, Func predicate) where T : Enumeration { var matchingItem = GetAll().FirstOrDefault(predicate); diff --git a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs index e6877d3..402561c 100644 --- a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs +++ b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs @@ -1,11 +1,11 @@ -using Pole.ReliableMessage.Core; -using Pole.ReliableMessage.EventBus; +using Pole.ReliableMessage.EventBus; using Pole.ReliableMessage.Masstransit.Abstraction; using Microsoft.Extensions.Options; using System; using System.Collections.Generic; using System.Linq; using System.Text; +using Pole.Domain; namespace Pole.ReliableMessage.Masstransit { diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs index c9c47f0..c5e3a02 100644 --- a/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs +++ b/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs @@ -8,11 +8,12 @@ using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; +using Pole.ReliableMessage.Storage.Abstraction; namespace Pole.ReliableMessage.Masstransit { - public abstract class ReliableEventHandler : IReliableEventHandler,IConsumer + public abstract class ReliableEventHandler : IReliableEventHandler, IConsumer where TEvent : class { private readonly IMessageStorage _messageStorage; @@ -20,8 +21,8 @@ namespace Pole.ReliableMessage.Masstransit private readonly IServiceProvider _serviceProvider; public ReliableEventHandler(IServiceProvider serviceProvider) { - _messageStorage = serviceProvider.GetRequiredService(typeof(IMessageStorage)) as IMessageStorage; - var loggerFactory = serviceProvider.GetRequiredService(typeof(ILoggerFactory)) as ILoggerFactory; + _messageStorage = serviceProvider.GetRequiredService(); + var loggerFactory = serviceProvider.GetRequiredService(); _logger = loggerFactory.CreateLogger>(); _serviceProvider = serviceProvider; } diff --git a/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs b/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs index 6914c6c..6fc0f3e 100644 --- a/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs +++ b/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs @@ -1,4 +1,4 @@ -using Pole.ReliableMessage.Core; +using Pole.Domain; using System; using System.Collections.Generic; using System.Text; diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs b/src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs new file mode 100644 index 0000000..d2a265e --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Abstraction/IMessageStorage.cs @@ -0,0 +1,46 @@ +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Storage.Abstraction +{ + public interface IMessageStorage + { + /// + /// + /// + /// + /// + Task Add(Message message); + + Task Delete(Expression> filter); + + /// + /// + /// + /// + /// + /// + Task> GetMany(Expression> filter, int count); + + /// + /// 批量更新 + /// 更新这几个值 MessageStatusId , RetryTimes LastRetryUTCTime, NextRetryUTCTime + /// + /// + /// + Task Save(IEnumerable messages); + + /// + /// 检查 消息的状态,如果不是指定状态则返回true,并且更新状态到指定状态 ,如果已经是指定状态返回false + /// + /// + /// + /// + Task CheckAndUpdateStatus(Expression> filter, MessageStatus messageStatus); + + Task UpdateStatus(Expression> filter, MessageStatus messageStatus); + } +} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/MemberShipTable.cs b/src/Pole.ReliableMessage.Storage.Abstraction/MemberShipTable.cs new file mode 100644 index 0000000..ba28c92 --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Abstraction/MemberShipTable.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Storage.Abstraction +{ + public class MemberShipTable + { + public string Id { get;private set; } + public MemberShipTable(string serviceName,string pendingMessageCheckerIp,DateTime iAmAliveUTCTime) + { + ServiceName = serviceName; + PendingMessageCheckerIp = pendingMessageCheckerIp; + IAmAliveUTCTime = iAmAliveUTCTime; + } + + public string ServiceName { get;private set; } + public string PendingMessageCheckerIp { get; private set; } + public DateTime IAmAliveUTCTime { get; private set; } + } +} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/Message.cs b/src/Pole.ReliableMessage.Storage.Abstraction/Message.cs new file mode 100644 index 0000000..da785a8 --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Abstraction/Message.cs @@ -0,0 +1,79 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Storage.Abstraction +{ + public class Message : IComparable + { + /// + /// 这里id 永远为 string + /// + public string Id { get; set; } + + /// + /// 消息状态 + /// + public MessageStatus MessageStatus { get; set; } + + /// + /// 消息状态Id + /// + + public int MessageStatusId { get; set; } + + /// + /// 预发送的时间 + /// + public DateTime AddedUTCTime { get; set; } + + /// + /// 用来存放 消息内容 目前没有大小限制 这个需要根据 实际情况 , mongodb 和 rabiitmq 的 综合指标来定 ,开发人员 在使用超大对象时需谨慎 + /// + public string Content { get; set; } + + /// + /// 消息的名称 用来鉴别不同的消息 + /// + public string MessageTypeId { get; set; } + + /// + /// 当前消息 回调者所需参数值 + /// + public string RePushCallBackParameterValue { get; set; } + + ///// + ///// 最后一次的重试时间 + ///// + //public DateTime LastRetryUTCTime { get; set; } + + + /// + /// 下一次的重试时间 + /// + public DateTime NextRetryUTCTime { get; set; } + + /// + /// 重试次数 + /// + public int RetryTimes { get; set; } = 0; + + public int CompareTo(Message other) + { + return Id.CompareTo(other.Id); + } + } + public class MessageIEqualityComparer : IEqualityComparer + { + public static MessageIEqualityComparer Default = new MessageIEqualityComparer(); + public bool Equals(Message x, Message y) + { + return x.CompareTo(y) == 0; + } + + public int GetHashCode(Message obj) + { + return obj.Id.GetHashCode(); + } + } +} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs b/src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs new file mode 100644 index 0000000..1cc5d53 --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Abstraction/MessageStatus.cs @@ -0,0 +1,20 @@ +using Pole.Domain; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Storage.Abstraction +{ + public class MessageStatus : Enumeration + { + public static MessageStatus Pending = new MessageStatus(3,"待发送"); + public static MessageStatus Pushed = new MessageStatus(6,"已发送"); + public static MessageStatus Canced = new MessageStatus(9,"已取消"); + public static MessageStatus Handed = new MessageStatus(12, "已处理"); + + public MessageStatus(int id,string name ):base(id,name) + { + + } + } +} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj b/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj index 9f5c4f4..c48c146 100644 --- a/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj +++ b/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj @@ -4,4 +4,8 @@ netstandard2.0 + + + + diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs index 9f39701..3990f98 100644 --- a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs +++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs @@ -8,16 +8,6 @@ namespace Pole.ReliableMessage.Storage.Mongodb { public string MessageDatabaseName { get; set; } = "ReliableMessage"; public string MembershipCollectionName { get; set; } = "Membership"; - /// - /// bucket 中最大消息数 一旦达到最大数量 后面的数据将覆盖前面的数据 - /// - public long CollectionMaxMessageCount { get; set; } = 20000000; - - /// - /// 默认最大为10G - /// - public long CollectionMaxSize { get; set; } = 10*1024*1024*1024L; - public string ServiceCollectionName { get; set; } public MongoHost[] Servers { get; set; } } diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs index a22e5af..c7193a5 100644 --- a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs +++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs @@ -10,6 +10,8 @@ using System.Linq; using System.Linq.Expressions; using System.Text; using System.Threading.Tasks; +using Pole.ReliableMessage.Storage.Abstraction; +using Pole.Domain; namespace Pole.ReliableMessage.Storage.Mongodb { @@ -56,14 +58,14 @@ namespace Pole.ReliableMessage.Storage.Mongodb return true; } - public async Task> GetMany(Expression> filter,int count) + public async Task> GetMany(Expression> filter, int count) { IMongoCollection collection = GetCollection(); - var list= await collection.Find(filter).Limit(count).ToListAsync(); + var list = await collection.Find(filter).Limit(count).ToListAsync(); list.ForEach(m => { - m.MessageStatus = Core.Enumeration.FromValue(m.MessageStatusId); + m.MessageStatus = Enumeration.FromValue(m.MessageStatusId); }); return list; } @@ -98,43 +100,21 @@ namespace Pole.ReliableMessage.Storage.Mongodb return result.IsAcknowledged; } - public async Task UpdateStatus(IEnumerable messages) + public async Task UpdateStatus(Expression> filter, MessageStatus messageStatus) { - var count = messages.Count(); - _logger.LogDebug($"MongodbMessageStorage updateStatus begin, Messages count: {messages.Count()}"); - if (count == 0) - { - _logger.LogDebug($"MongodbMessageStorage updateStatus successfully, Modified count: 0"); - return true; - } IMongoCollection collection = GetCollection(); - var models = new List>(); - - foreach (var message in messages) - { - FilterDefinition filter = Builders.Filter.Where(m => m.Id == message.Id&&m.MessageStatusId!=MessageStatus.Handed.Id); - UpdateDefinition update = Builders.Update - .Set(m => m.MessageStatusId, message.MessageStatus.Id); - - var model = new UpdateOneModel(filter, update); - models.Add(model); - } - - var result = await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = false }); - - _logger.LogDebug($"MongodbMessageStorage updateStatus successfully, Modified count: {result.ModifiedCount}"); - + var update = Builders.Update.Set(m => m.MessageStatusId, messageStatus.Id); + var result = await collection.UpdateOneAsync(filter, update); return result.IsAcknowledged; } - public async Task UpdateStatus(Expression> filter, MessageStatus messageStatus) + public async Task Delete(Expression> filter) { IMongoCollection collection = GetCollection(); - var update = Builders.Update.Set(m => m.MessageStatusId, messageStatus.Id); - var result = await collection.UpdateOneAsync(filter, update); - return result.IsAcknowledged; + var result = await collection.DeleteManyAsync(filter); + return result.DeletedCount; } } } diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs b/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs index 781bd2e..13c51a4 100644 --- a/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs +++ b/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs @@ -60,13 +60,7 @@ namespace Microsoft.Extensions.DependencyInjection if (!collectionNames.Contains(mongodbOption.ServiceCollectionName)) { - database.CreateCollection(mongodbOption.ServiceCollectionName, new CreateCollectionOptions - { - Capped = true, - MaxDocuments = mongodbOption.CollectionMaxMessageCount, - MaxSize = mongodbOption.CollectionMaxSize, - - }); + database.CreateCollection(mongodbOption.ServiceCollectionName); var messageCollection = database.GetCollection(mongodbOption.ServiceCollectionName); AddMessageCollectionIndex(messageCollection); } @@ -74,7 +68,6 @@ namespace Microsoft.Extensions.DependencyInjection if (!collectionNames.Contains(mongodbOption.MembershipCollectionName)) { database.CreateCollection(mongodbOption.MembershipCollectionName); - var membershipCollection = database.GetCollection(mongodbOption.MembershipCollectionName); AddMemberShipTableCollectionIndex(membershipCollection); } diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs b/src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs deleted file mode 100644 index 7055878..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs +++ /dev/null @@ -1,16 +0,0 @@ -using Pole.ReliableMessage.Messaging; -using System; -using System.Collections.Generic; -using System.Linq.Expressions; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IMessageBuffer - { - Task Flush(); - Task Add(Message message); - Task> GetAll(Func filter); - } -} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageCheckRetryer.cs b/src/Pole.ReliableMessage/Abstraction/IMessageCheckRetryer.cs new file mode 100644 index 0000000..a16c5b9 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IMessageCheckRetryer.cs @@ -0,0 +1,13 @@ +using Pole.ReliableMessage.Storage.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IMessageCheckRetryer + { + Task Execute(IEnumerable messages, DateTime dateTime); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs b/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs index 11b2e44..5092971 100644 --- a/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs +++ b/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs @@ -1,4 +1,5 @@ using Pole.ReliableMessage.Messaging; +using Pole.ReliableMessage.Storage.Abstraction; using System; using System.Collections.Generic; using System.Text; diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs b/src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs deleted file mode 100644 index 849fcc6..0000000 --- a/src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs +++ /dev/null @@ -1,60 +0,0 @@ -using Pole.ReliableMessage.Messaging; -using System; -using System.Collections.Generic; -using System.Linq.Expressions; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Abstraction -{ - public interface IMessageStorage - { - /// - /// - /// - /// - /// - Task Add(Message message); - ///// - ///// - ///// - ///// - ///// - ///// - //Task Get(Expression> filter); - /// - /// - /// - /// - /// - /// - Task> GetMany(Expression> filter, int count); - /// - /// 批量更新 - /// 更新这几个值 MessageStatusId , RetryTimes LastRetryUTCTime, NextRetryUTCTime - /// - /// - /// - Task Save(IEnumerable messages); - - Task UpdateStatus(IEnumerable messages); - - ///// - ///// - ///// - ///// 这里id 永远为 string - ///// - ///// - //Task Update(Expression> filter, MessageStatus messageStatus); - - /// - /// 检查 消息的状态,如果不是指定状态则返回true,并且更新状态到指定状态 ,如果已经是指定状态返回false - /// - /// - /// - /// - Task CheckAndUpdateStatus(Expression> filter, MessageStatus messageStatus); - - Task UpdateStatus(Expression> filter, MessageStatus messageStatus); - } -} diff --git a/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs b/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs index e92f470..fa9bcee 100644 --- a/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs +++ b/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs @@ -28,7 +28,6 @@ namespace Microsoft.Extensions.DependencyInjection services.Configure(optionConfig); services.AddSingleton(); - services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); @@ -51,7 +50,7 @@ namespace Microsoft.Extensions.DependencyInjection - services.AddSingleton(); + services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); diff --git a/src/Pole.ReliableMessage/Core/Enumeration.cs b/src/Pole.ReliableMessage/Core/Enumeration.cs deleted file mode 100644 index 3c584b6..0000000 --- a/src/Pole.ReliableMessage/Core/Enumeration.cs +++ /dev/null @@ -1,73 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Reflection; -using System.Text; - -namespace Pole.ReliableMessage.Core -{ - public abstract class Enumeration : IComparable - { - public string Name { get; private set; } - - public int Id { get; private set; } - - protected Enumeration(int id, string name) - { - Id = id; - Name = name; - } - - public override string ToString() => Name; - - public static IEnumerable GetAll() where T : Enumeration - { - var fields = typeof(T).GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly); - - return fields.Select(f => f.GetValue(null)).Cast(); - } - - public override bool Equals(object obj) - { - if (!(obj is Enumeration otherValue)) - return false; - - var typeMatches = GetType().Equals(obj.GetType()); - var valueMatches = Id.Equals(otherValue.Id); - - return typeMatches && valueMatches; - } - - public override int GetHashCode() => Id.GetHashCode(); - - public static int AbsoluteDifference(Enumeration firstValue, Enumeration secondValue) - { - var absoluteDifference = Math.Abs(firstValue.Id - secondValue.Id); - return absoluteDifference; - } - - public static T FromValue(int value) where T : Enumeration - { - var matchingItem = Parse(value, "value", item => item.Id == value); - return matchingItem; - } - - public static T FromDisplayName(string displayName) where T : Enumeration - { - var matchingItem = Parse(displayName, "display name", item => item.Name == displayName); - return matchingItem; - } - - private static T Parse(K value, string description, Func predicate) where T : Enumeration - { - var matchingItem = GetAll().FirstOrDefault(predicate); - - if (matchingItem == null) - throw new InvalidOperationException($"'{value}' is not a valid {description} in {typeof(T)}"); - - return matchingItem; - } - - public int CompareTo(object other) => Id.CompareTo(((Enumeration)other).Id); - } -} diff --git a/src/Pole.ReliableMessage/DefaultMessageCheckRetryer.cs b/src/Pole.ReliableMessage/DefaultMessageCheckRetryer.cs new file mode 100644 index 0000000..3c9c692 --- /dev/null +++ b/src/Pole.ReliableMessage/DefaultMessageCheckRetryer.cs @@ -0,0 +1,103 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Storage.Abstraction; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage +{ + class DefaultMessageCheckRetryer : IMessageCheckRetryer + { + private readonly ILogger _logger; + private readonly IRetryTimeDelayCalculator _retryTimeDelayCalculator; + private readonly ReliableMessageOption _options; + private readonly IMessageStorage _storage; + private readonly IMessageChecker _messageChecker; + private readonly IMessageBus _messageBus; + private readonly List _changedMessage = new List(); + public DefaultMessageCheckRetryer(ILogger logger, IRetryTimeDelayCalculator retryTimeDelayCalculator, IOptions options, IMessageStorage storage, IMessageChecker messageChecker, IMessageBus messageBus) + { + _logger = logger; + _retryTimeDelayCalculator = retryTimeDelayCalculator; + _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); + _storage = storage; + _messageChecker = messageChecker; + _messageBus = messageBus; + } + public async Task Execute(IEnumerable messages, DateTime dateTime) + { + try + { + messages.AsParallel().ForAll(async m => await Retry(m, dateTime)); + if (_changedMessage.Count != 0) + { + await _storage.Save(_changedMessage); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"DefaultMessageCheckRetryer.Execute ,Execute with errors"); + } + finally + { + if (_changedMessage.Count != 0) + { + _changedMessage.Clear(); + } + } + } + private async Task Retry(Message message, DateTime retryTime) + { + try + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} begin Retry"); + } + var nextRetryDelay = _retryTimeDelayCalculator.Get(message.RetryTimes, _options.MaxPendingMessageRetryDelay); + message.NextRetryUTCTime = retryTime.AddSeconds(nextRetryDelay); + + if (retryTime > message.AddedUTCTime.AddSeconds(_options.PendingMessageTimeOut)) + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Canced ,PendingMessageTimeOut:{_options.PendingMessageTimeOut}"); + } + + message.NextRetryUTCTime = DateTime.MinValue; + message.MessageStatus = MessageStatus.Canced; + _changedMessage.Add(message); + return; + } + message.RetryTimes++; + + var messageCheckerResult = await _messageChecker.GetResult(message); + if (messageCheckerResult.IsFinished) + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Pushed"); + } + message.MessageStatus = MessageStatus.Pushed; + await _messageBus.Publish(messageCheckerResult.Event, message.Id); + } + else + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Retry next time"); + } + } + _changedMessage.Add(message); + } + catch (Exception ex) + { + _logger.LogError(ex, $"DefaultMessageCheckRetryer.Retry ,Message:{message.Id} retry with errors"); + } + } + } +} diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs index 597bf07..5c6eacc 100644 --- a/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs +++ b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs @@ -1,5 +1,4 @@ using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Messaging; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; @@ -7,6 +6,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; using ILogger = Microsoft.Extensions.Logging.ILogger; +using Pole.ReliableMessage.Storage.Abstraction; namespace Pole.Pole.ReliableMessage.EventBus { @@ -16,18 +16,17 @@ namespace Pole.Pole.ReliableMessage.EventBus private readonly IMessageStorage _messageStorage; private readonly IMessageIdGenerator _messageIdGenerator; private readonly ITimeHelper _timeHelper; - private readonly IMessageBuffer _messageBuffer; + //private readonly IMessageBuffer _messageBuffer; private readonly ILogger _logger; private readonly IJsonConverter _jsonConverter; private readonly IMessageCallBackInfoStore _messageCallBackInfoStore; private readonly IMessageTypeIdGenerator _messageTypeIdGenerator; - public DefaultReliableBus(IMessageBus messageBus, IMessageStorage messageStorage, IMessageIdGenerator messageIdGenerator, ITimeHelper timeHelper, IMessageBuffer messageBuffer, ILogger logger, IJsonConverter jsonConverter, IMessageCallBackInfoStore messageCallBackInfoStore, IMessageTypeIdGenerator messageTypeIdGenerator) + public DefaultReliableBus(IMessageBus messageBus, IMessageStorage messageStorage, IMessageIdGenerator messageIdGenerator, ITimeHelper timeHelper, ILogger logger, IJsonConverter jsonConverter, IMessageCallBackInfoStore messageCallBackInfoStore, IMessageTypeIdGenerator messageTypeIdGenerator) { _messageBus = messageBus; _messageStorage = messageStorage; _messageIdGenerator = messageIdGenerator; _timeHelper = timeHelper; - _messageBuffer = messageBuffer; _logger = logger; _jsonConverter = jsonConverter; _messageCallBackInfoStore = messageCallBackInfoStore; @@ -97,8 +96,8 @@ namespace Pole.Pole.ReliableMessage.EventBus { await _messageBus.Publish(@event, prePublishMessageId, cancellationToken); - var messageBufferResult = await _messageBuffer.Add(new Message { Id = prePublishMessageId, MessageStatus = MessageStatus.Pushed }); - return true; + var messageBufferResult = await _messageStorage.UpdateStatus(m => m.Id == prePublishMessageId && m.MessageStatusId == MessageStatus.Pending.Id, MessageStatus.Pushed); + return messageBufferResult; } catch (Exception ex) { diff --git a/src/Pole.ReliableMessage/MemberShipTable.cs b/src/Pole.ReliableMessage/MemberShipTable.cs deleted file mode 100644 index 535ec71..0000000 --- a/src/Pole.ReliableMessage/MemberShipTable.cs +++ /dev/null @@ -1,21 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage -{ - public class MemberShipTable - { - public string Id { get;private set; } - public MemberShipTable(string serviceName,string pendingMessageCheckerIp,DateTime iAmAliveUTCTime) - { - ServiceName = serviceName; - PendingMessageCheckerIp = pendingMessageCheckerIp; - IAmAliveUTCTime = iAmAliveUTCTime; - } - - public string ServiceName { get;private set; } - public string PendingMessageCheckerIp { get; private set; } - public DateTime IAmAliveUTCTime { get; private set; } - } -} diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs index f4f2875..331ff05 100644 --- a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs +++ b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs @@ -1,4 +1,4 @@ -using Pole.ReliableMessage.Core; +using Pole.Domain; using System; using System.Collections.Generic; using System.Text; diff --git a/src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs b/src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs deleted file mode 100644 index 27a6abf..0000000 --- a/src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs +++ /dev/null @@ -1,58 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Linq.Expressions; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Messaging -{ - class DefaultMessageBuffer : IMessageBuffer - { - private readonly IMessageStorage _storage; - private readonly System.Collections.Concurrent.ConcurrentDictionary Messages = new System.Collections.Concurrent.ConcurrentDictionary(); - private readonly ILogger _logger; - public DefaultMessageBuffer(IMessageStorage storage, ILogger logger) - { - _storage = storage; - _logger = logger; - } - public async Task Flush() - { - /// 通过 MessageTypeId 是否为空 判断 消息是否为 DefaultReliableBus Publish 完成后的消息状态修改缓冲, - var toUpdateStatusMessageKeyValuePairs = Messages.Where(m => string.IsNullOrEmpty(m.Value.MessageTypeId)); - var toUpdateStatusMessages= toUpdateStatusMessageKeyValuePairs.Select(m=>m.Value).ToArray(); - var toUpdateStatusMessageIds = toUpdateStatusMessageKeyValuePairs.Select(m => m.Key).ToList(); - await _storage.UpdateStatus(toUpdateStatusMessages); - - _logger.LogDebug($"DefaultMessageBuffer.Flush update successfully, Message count{toUpdateStatusMessages.Count()}"); - toUpdateStatusMessageIds.ForEach(m => { - Messages.TryRemove(m,out Message message); - }); - - var toSavedMessageKeyValuePairs = Messages.Where(m => !string.IsNullOrEmpty(m.Value.MessageTypeId)); - var toSavedMessages = toSavedMessageKeyValuePairs.Select(m => m.Value).ToArray(); - var toSavedMessagesIds = toSavedMessageKeyValuePairs.Select(m => m.Key).ToList(); - await _storage.Save(toSavedMessages); - - _logger.LogDebug($"DefaultMessageBuffer.Flush save successfully, Message count{toSavedMessages.Count()}"); - toSavedMessagesIds.ForEach(m => { - Messages.TryRemove(m, out Message message); - }); - } - - public Task Add(Message message) - { - Messages.TryAdd(message.Id, message); - return Task.FromResult(true); - } - - public async Task> GetAll(Func filter) - { - await Task.CompletedTask; - return Messages.Values.Where(filter).ToList(); - } - } -} diff --git a/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs b/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs index 731aff0..fc44b21 100644 --- a/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs +++ b/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs @@ -7,6 +7,7 @@ using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; +using Pole.ReliableMessage.Storage.Abstraction; namespace Pole.ReliableMessage.Messaging { diff --git a/src/Pole.ReliableMessage/Messaging/Message.cs b/src/Pole.ReliableMessage/Messaging/Message.cs deleted file mode 100644 index 048ebd4..0000000 --- a/src/Pole.ReliableMessage/Messaging/Message.cs +++ /dev/null @@ -1,79 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Messaging -{ - public class Message : IComparable - { - /// - /// 这里id 永远为 string - /// - public string Id { get; set; } - - /// - /// 消息状态 - /// - public MessageStatus MessageStatus { get; set; } - - /// - /// 消息状态Id - /// - - public int MessageStatusId { get; set; } - - /// - /// 预发送的时间 - /// - public DateTime AddedUTCTime { get; set; } - - /// - /// 用来存放 消息内容 目前没有大小限制 这个需要根据 实际情况 , mongodb 和 rabiitmq 的 综合指标来定 ,开发人员 在使用超大对象时需谨慎 - /// - public string Content { get; set; } - - /// - /// 消息的名称 用来鉴别不同的消息 - /// - public string MessageTypeId { get; set; } - - /// - /// 当前消息 回调者所需参数值 - /// - public string RePushCallBackParameterValue { get; set; } - - ///// - ///// 最后一次的重试时间 - ///// - //public DateTime LastRetryUTCTime { get; set; } - - - /// - /// 下一次的重试时间 - /// - public DateTime NextRetryUTCTime { get; set; } - - /// - /// 重试次数 - /// - public int RetryTimes { get; set; } = 0; - - public int CompareTo(Message other) - { - return Id.CompareTo(other.Id); - } - } - public class MessageIEqualityComparer : IEqualityComparer - { - public static MessageIEqualityComparer Default = new MessageIEqualityComparer(); - public bool Equals(Message x, Message y) - { - return x.CompareTo(y) == 0; - } - - public int GetHashCode(Message obj) - { - return obj.Id.GetHashCode(); - } - } -} diff --git a/src/Pole.ReliableMessage/Messaging/MessageStatus.cs b/src/Pole.ReliableMessage/Messaging/MessageStatus.cs deleted file mode 100644 index 100ca52..0000000 --- a/src/Pole.ReliableMessage/Messaging/MessageStatus.cs +++ /dev/null @@ -1,20 +0,0 @@ -using Pole.ReliableMessage.Core; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.ReliableMessage.Messaging -{ - public class MessageStatus : Enumeration - { - public static MessageStatus Pending = new MessageStatus(3,"待发送"); - public static MessageStatus Pushed = new MessageStatus(6,"已发送"); - public static MessageStatus Canced = new MessageStatus(9,"已取消"); - public static MessageStatus Handed = new MessageStatus(12, "已处理"); - - public MessageStatus(int id,string name ):base(id,name) - { - - } - } -} diff --git a/src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs b/src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs deleted file mode 100644 index 8421317..0000000 --- a/src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs +++ /dev/null @@ -1,41 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Pole.ReliableMessage.Messaging; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Processor -{ - public class MessageBufferFlushProcessor : ProcessorBase - { - private readonly IMessageBuffer _messageBuffer; - private readonly ReliableMessageOption _options; - private readonly ILogger _logger; - public MessageBufferFlushProcessor(IMessageBuffer messageBuffer, IOptions options, ILogger logger) - { - _messageBuffer = messageBuffer; - _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); - _logger = logger; - } - public override string Name => nameof(PendingMessageCheckProcessor); - - public override async Task Process(ProcessingContext context) - { - try - { - await _messageBuffer.Flush(); - } - catch(Exception ex) - { - _logger.LogError(ex, $"MessageBufferFlushProcessor Process Error"); - } - finally - { - await Task.Delay(_options.PushedMessageFlushInterval * 1000); - } - } - } -} diff --git a/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs b/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs new file mode 100644 index 0000000..6396e14 --- /dev/null +++ b/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs @@ -0,0 +1,58 @@ +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Messaging; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Pole.ReliableMessage.Storage.Abstraction; + +namespace Pole.ReliableMessage.Processor +{ + public class MessageCleanProcessor : ProcessorBase + { + private readonly ReliableMessageOption _options; + private readonly ILogger _logger; + private readonly IMessageStorage _messageStorage; + private readonly IMemberShipTable _memberShipTable; + private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider; + public MessageCleanProcessor(IOptions options, ILogger logger, IMessageStorage messageStorage, IMemberShipTable memberShipTable, IServiceIPv4AddressProvider serviceIPv4AddressProvider) + { + _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); + _logger = logger; + _messageStorage = messageStorage; + _memberShipTable = memberShipTable; + _serviceIPv4AddressProvider = serviceIPv4AddressProvider; + } + public override string Name => nameof(PendingMessageCheckProcessor); + + + public override async Task Process(ProcessingContext context) + { + try + { + var iPStr = _serviceIPv4AddressProvider.Get(); + var isPendingChecker = await _memberShipTable.IsPendingMessageCheckerServiceInstance(iPStr);// 这里可以把时间加上去 + if (!isPendingChecker) + { + _logger.LogDebug("I an not the pendingChecker ,Ignore clean up messages"); + return; + } + _logger.LogInformation($"Begin clean message"); + + var deletedCount = await _messageStorage.Delete(m => m.MessageStatusId == MessageStatus.Canced.Id || m.MessageStatusId == MessageStatus.Handed.Id); + + _logger.LogInformation($"End clean message ,delete message count : {deletedCount} , successfully"); + } + catch (Exception ex) + { + _logger.LogError(ex, $"Clean message error"); + } + finally + { + await Task.Delay(_options.MessageCleanInterval * 1000); + } + } + } +} diff --git a/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs b/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs index 7d25d53..224c221 100644 --- a/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs +++ b/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs @@ -17,26 +17,22 @@ namespace Pole.ReliableMessage.Processor { private readonly IMessageStorage _storage; private readonly ReliableMessageOption _options; - private readonly IMessageBuffer _messageBuffer; + //private readonly IMessageBuffer _messageBuffer; private readonly ITimeHelper _timeHelper; - private readonly IMessageChecker _messageChecker; - private readonly IRetryTimeDelayCalculator _retryTimeDelayCalculator; private readonly IMemberShipTable _memberShipTable; private readonly ILogger _logger; private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider; - private readonly IMessageBus _messageBus; - public PendingMessageCheckProcessor(IMessageStorage storage, IOptions options, IMessageBuffer messageBuffer, ITimeHelper timeHelper, IMessageChecker messageChecker, IRetryTimeDelayCalculator retryTimeDelayCalculator, IMemberShipTable memberShipTable, ILogger logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider, IMessageBus messageBus) + private readonly IMessageCheckRetryer _messageCheckRetryer; + public PendingMessageCheckProcessor(IMessageStorage storage, IOptions options, ITimeHelper timeHelper, IMemberShipTable memberShipTable, ILogger logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider, IMessageCheckRetryer messageCheckRetryer) { _storage = storage; _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); - _messageBuffer = messageBuffer; + //_messageBuffer = messageBuffer; _timeHelper = timeHelper; - _messageChecker = messageChecker; - _retryTimeDelayCalculator = retryTimeDelayCalculator; _memberShipTable = memberShipTable; _logger = logger; _serviceIPv4AddressProvider = serviceIPv4AddressProvider; - _messageBus = messageBus; + _messageCheckRetryer = messageCheckRetryer; } public override string Name => nameof(PendingMessageCheckProcessor); @@ -54,20 +50,7 @@ namespace Pole.ReliableMessage.Processor return; } - var now = _timeHelper.GetUTCNow(); - var pendingMessages = await _storage.GetMany(m => m.MessageStatusId == MessageStatus.Pending.Id &&m.NextRetryUTCTime <= now && m.AddedUTCTime <= now.AddSeconds(-1 * _options.PendingMessageFirstProcessingWaitTime)&&m.AddedUTCTime>= now.AddSeconds(-1 * _options.PendingMessageCheckingTimeOutSeconds),_options.PendingMessageCheckBatchCount); - - var cachedPushedMessage = await _messageBuffer.GetAll(m => m.MessageStatus == MessageStatus.Pushed); - - var finalToCheckedMessages = pendingMessages.Except(cachedPushedMessage, MessageIEqualityComparer.Default).ToList(); - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"PendingMessageCheckProcessor pendingMessages count:{pendingMessages.Count}"); - _logger.LogDebug($"PendingMessageCheckProcessor cachedPushedMessage count:{cachedPushedMessage.Count}"); - _logger.LogDebug($"PendingMessageCheckProcessor finalToCheckedMessages count:{finalToCheckedMessages.Count}"); - } - - finalToCheckedMessages.AsParallel().ForAll(async m => await Retry(m, now)); + await ProcessInternal(); } catch (Exception ex) { @@ -78,56 +61,21 @@ namespace Pole.ReliableMessage.Processor await Task.Delay(_options.PendingMessageRetryInterval * 1000); } } - - private async Task Retry(Message message, DateTime retryTime) + public async Task ProcessInternal() { - try - { - await Task.CompletedTask; - - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} begin Retry"); - } - var nextRetryDelay = _retryTimeDelayCalculator.Get(message.RetryTimes, _options.MaxPendingMessageRetryDelay); - message.NextRetryUTCTime = retryTime.AddSeconds(nextRetryDelay); + var now = _timeHelper.GetUTCNow(); + var pendingMessages = await _storage.GetMany(m => m.MessageStatusId == MessageStatus.Pending.Id && m.NextRetryUTCTime <= now && m.AddedUTCTime <= now.AddSeconds(-1 * _options.PendingMessageFirstProcessingWaitTime) && m.AddedUTCTime >= now.AddSeconds(-1 * _options.PendingMessageCheckingTimeOutSeconds), _options.PendingMessageCheckBatchCount); - if (retryTime > message.AddedUTCTime.AddSeconds(_options.PendingMessageTimeOut)) - { - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Canced ,PendingMessageTimeOut:{_options.PendingMessageTimeOut}"); - } - - message.NextRetryUTCTime = DateTime.MinValue; - message.MessageStatus = MessageStatus.Canced; - await _messageBuffer.Add(message); - return; - } - message.RetryTimes++; - - var messageCheckerResult = await _messageChecker.GetResult(message); - if (messageCheckerResult.IsFinished) - { - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Pushed"); - } - message.MessageStatus = MessageStatus.Pushed; - await _messageBus.Publish(messageCheckerResult.Event, message.Id); - } - else - { - if (_logger.IsEnabled(LogLevel.Debug)) - { - _logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Retry next time"); - } - } - await _messageBuffer.Add(message); + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"PendingMessageCheckProcessor pendingMessages count:{pendingMessages.Count}"); } - catch (Exception ex) + + await _messageCheckRetryer.Execute(pendingMessages, now); + // 说明此时 消息数量超过 批量获取数 + if (pendingMessages.Count == _options.PendingMessageCheckBatchCount) { - _logger.LogError(ex, $"PendingMessageCheckProcessor Retry ,Message:{message.Id} retry with errors"); + await ProcessInternal(); } } } diff --git a/src/Pole.ReliableMessage/ReliableMessageOption.cs b/src/Pole.ReliableMessage/ReliableMessageOption.cs index 0c3cfb4..fea3d98 100644 --- a/src/Pole.ReliableMessage/ReliableMessageOption.cs +++ b/src/Pole.ReliableMessage/ReliableMessageOption.cs @@ -25,7 +25,7 @@ namespace Pole.ReliableMessage /// /// 预发送消息超时时间 单位 秒 /// - public int PendingMessageTimeOut { get; set; } = 10*60; + public int PendingMessageTimeOut { get; set; } = 10 * 60; /// /// 预发送消息检查时每一次获取的消息数量 @@ -35,7 +35,7 @@ namespace Pole.ReliableMessage /// /// 预发送消息状态检查最后时间 单位 秒 /// - public int PendingMessageCheckingTimeOutSeconds { get; set; } = 13*60; + public int PendingMessageCheckingTimeOutSeconds { get; set; } = 13 * 60; /// /// 已发送的消息缓冲区 flush to storage 的时间间隔 单位 秒 @@ -46,7 +46,7 @@ namespace Pole.ReliableMessage /// /// PendingMessage 第一次处理等待时间 单位 秒 /// - public int PendingMessageFirstProcessingWaitTime { get; set; } = 2+10; + public int PendingMessageFirstProcessingWaitTime { get; set; } = 2 + 10; /// /// 每次重试之间最大间隔 单位 秒 @@ -61,14 +61,19 @@ namespace Pole.ReliableMessage /// /// PendingMessageCheck 实例存活超时时间 单位 秒 /// - public int PendingMessageCheckerInstanceIAnAliveTimeout { get; set; } = 3*10; + public int PendingMessageCheckerInstanceIAnAliveTimeout { get; set; } = 3 * 10; + + /// + /// Message 定期清理时间间隔 单位 秒 + /// + public int MessageCleanInterval { get; set; } = 30 * 60; /// /// 当主机有多个网络时通过指定网关地址找到合适的服务ip地址 /// public string NetworkInterfaceGatewayAddress { get; set; } = string.Empty; - public ReliableMessageOption AddEventAssemblies (params Assembly [] assemblies) + public ReliableMessageOption AddEventAssemblies(params Assembly[] assemblies) { EventCallbackAssemblies = assemblies.ToList(); return this; @@ -89,5 +94,4 @@ namespace Pole.ReliableMessage return this; } } - } -- libgit2 0.25.0