Commit 779a2f18 by 丁松杰

优化 可靠消息

parent eb350c31
Showing with 208 additions and 325 deletions
......@@ -27,20 +27,11 @@ namespace Pole.Domain
return Name;
}
public static IEnumerable<T> GetAll<T>() where T : Enumeration, new()
public static IEnumerable<T> GetAll<T>() where T : Enumeration
{
var type = typeof(T);
var fields = type.GetTypeInfo().GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly);
var fields = typeof(T).GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly);
foreach (var info in fields)
{
var instance = new T();
if (info.GetValue(instance) is T locatedValue)
{
yield return locatedValue;
}
}
return fields.Select(f => f.GetValue(null)).Cast<T>();
}
public override bool Equals(object obj)
......@@ -69,19 +60,19 @@ namespace Pole.Domain
return absoluteDifference;
}
public static T FromValue<T>(int value) where T : Enumeration, new()
public static T FromValue<T>(int value) where T : Enumeration
{
var matchingItem = Parse<T, int>(value, "value", item => item.Id == value);
return matchingItem;
}
public static T FromDisplayName<T>(string displayName) where T : Enumeration, new()
public static T FromDisplayName<T>(string displayName) where T : Enumeration
{
var matchingItem = Parse<T, string>(displayName, "display name", item => item.Name == displayName);
return matchingItem;
}
private static T Parse<T, K>(K value, string description, Func<T, bool> predicate) where T : Enumeration, new()
private static T Parse<T, K>(K value, string description, Func<T, bool> predicate) where T : Enumeration
{
var matchingItem = GetAll<T>().FirstOrDefault(predicate);
......
using Pole.ReliableMessage.Core;
using Pole.ReliableMessage.EventBus;
using Pole.ReliableMessage.EventBus;
using Pole.ReliableMessage.Masstransit.Abstraction;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Pole.Domain;
namespace Pole.ReliableMessage.Masstransit
{
......
......@@ -8,11 +8,12 @@ using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.ReliableMessage.Masstransit
{
public abstract class ReliableEventHandler<TEvent> : IReliableEventHandler<TEvent>,IConsumer<TEvent>
public abstract class ReliableEventHandler<TEvent> : IReliableEventHandler<TEvent>, IConsumer<TEvent>
where TEvent : class
{
private readonly IMessageStorage _messageStorage;
......@@ -20,8 +21,8 @@ namespace Pole.ReliableMessage.Masstransit
private readonly IServiceProvider _serviceProvider;
public ReliableEventHandler(IServiceProvider serviceProvider)
{
_messageStorage = serviceProvider.GetRequiredService(typeof(IMessageStorage)) as IMessageStorage;
var loggerFactory = serviceProvider.GetRequiredService(typeof(ILoggerFactory)) as ILoggerFactory;
_messageStorage = serviceProvider.GetRequiredService<IMessageStorage>();
var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();
_logger = loggerFactory.CreateLogger<ReliableEventHandler<TEvent>>();
_serviceProvider = serviceProvider;
}
......
using Pole.ReliableMessage.Core;
using Pole.Domain;
using System;
using System.Collections.Generic;
using System.Text;
......
using Pole.ReliableMessage.Messaging;
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
namespace Pole.ReliableMessage.Storage.Abstraction
{
public interface IMessageStorage
{
......@@ -15,20 +14,17 @@ namespace Pole.ReliableMessage.Abstraction
/// <param name="message"></param>
/// <returns></returns>
Task<bool> Add(Message message);
///// <summary>
/////
///// </summary>
///// <param name="messageStatus"></param>
///// <param name="endRetryTime"></param>
///// <returns></returns>
//Task<Message> Get(Expression<Func<Message, bool>> filter);
Task<long> Delete(Expression<Func<Message, bool>> filter);
/// <summary>
///
/// </summary>
/// <param name="messageStatus"></param>
/// <param name="endRetryTime"></param>
/// <returns></returns>
Task<List<Message>> GetMany(Expression<Func<Message,bool>> filter, int count);
Task<List<Message>> GetMany(Expression<Func<Message, bool>> filter, int count);
/// <summary>
/// 批量更新
/// 更新这几个值 MessageStatusId , RetryTimes LastRetryUTCTime, NextRetryUTCTime
......@@ -37,23 +33,13 @@ namespace Pole.ReliableMessage.Abstraction
/// <returns></returns>
Task<bool> Save(IEnumerable<Message> messages);
Task<bool> UpdateStatus(IEnumerable<Message> messages);
///// <summary>
/////
///// </summary>
///// <param name="id">这里id 永远为 string </param>
///// <param name="messageStatus"></param>
///// <returns></returns>
//Task<bool> Update(Expression<Func<Message, bool>> filter, MessageStatus messageStatus);
/// <summary>
/// 检查 消息的状态,如果不是指定状态则返回true,并且更新状态到指定状态 ,如果已经是指定状态返回false
/// </summary>
/// <param name="id"> </param>
/// <param name="messageStatus"></param>
/// <returns></returns>
Task<bool> CheckAndUpdateStatus(Expression<Func<Message,bool>> filter, MessageStatus messageStatus);
Task<bool> CheckAndUpdateStatus(Expression<Func<Message, bool>> filter, MessageStatus messageStatus);
Task<bool> UpdateStatus(Expression<Func<Message, bool>> filter, MessageStatus messageStatus);
}
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage
namespace Pole.ReliableMessage.Storage.Abstraction
{
public class MemberShipTable
{
......
......@@ -2,7 +2,7 @@ using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Messaging
namespace Pole.ReliableMessage.Storage.Abstraction
{
public class Message : IComparable<Message>
{
......
using Pole.ReliableMessage.Core;
using Pole.Domain;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Messaging
namespace Pole.ReliableMessage.Storage.Abstraction
{
public class MessageStatus : Enumeration
{
......
......@@ -4,4 +4,8 @@
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Domain\Pole.Domain.csproj" />
</ItemGroup>
</Project>
......@@ -8,16 +8,6 @@ namespace Pole.ReliableMessage.Storage.Mongodb
{
public string MessageDatabaseName { get; set; } = "ReliableMessage";
public string MembershipCollectionName { get; set; } = "Membership";
/// <summary>
/// bucket 中最大消息数 一旦达到最大数量 后面的数据将覆盖前面的数据
/// </summary>
public long CollectionMaxMessageCount { get; set; } = 20000000;
/// <summary>
/// 默认最大为10G
/// </summary>
public long CollectionMaxSize { get; set; } = 10*1024*1024*1024L;
public string ServiceCollectionName { get; set; }
public MongoHost[] Servers { get; set; }
}
......
......@@ -10,6 +10,8 @@ using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
using Pole.Domain;
namespace Pole.ReliableMessage.Storage.Mongodb
{
......@@ -56,14 +58,14 @@ namespace Pole.ReliableMessage.Storage.Mongodb
return true;
}
public async Task<List<Message>> GetMany(Expression<Func<Message, bool>> filter,int count)
public async Task<List<Message>> GetMany(Expression<Func<Message, bool>> filter, int count)
{
IMongoCollection<Message> collection = GetCollection();
var list= await collection.Find(filter).Limit(count).ToListAsync();
var list = await collection.Find(filter).Limit(count).ToListAsync();
list.ForEach(m =>
{
m.MessageStatus = Core.Enumeration.FromValue<MessageStatus>(m.MessageStatusId);
m.MessageStatus = Enumeration.FromValue<MessageStatus>(m.MessageStatusId);
});
return list;
}
......@@ -98,43 +100,21 @@ namespace Pole.ReliableMessage.Storage.Mongodb
return result.IsAcknowledged;
}
public async Task<bool> UpdateStatus(IEnumerable<Message> messages)
public async Task<bool> UpdateStatus(Expression<Func<Message, bool>> filter, MessageStatus messageStatus)
{
var count = messages.Count();
_logger.LogDebug($"MongodbMessageStorage updateStatus begin, Messages count: {messages.Count()}");
if (count == 0)
{
_logger.LogDebug($"MongodbMessageStorage updateStatus successfully, Modified count: 0");
return true;
}
IMongoCollection<Message> collection = GetCollection();
var models = new List<WriteModel<Message>>();
foreach (var message in messages)
{
FilterDefinition<Message> filter = Builders<Message>.Filter.Where(m => m.Id == message.Id&&m.MessageStatusId!=MessageStatus.Handed.Id);
UpdateDefinition<Message> update = Builders<Message>.Update
.Set(m => m.MessageStatusId, message.MessageStatus.Id);
var model = new UpdateOneModel<Message>(filter, update);
models.Add(model);
}
var result = await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = false });
_logger.LogDebug($"MongodbMessageStorage updateStatus successfully, Modified count: {result.ModifiedCount}");
var update = Builders<Message>.Update.Set(m => m.MessageStatusId, messageStatus.Id);
var result = await collection.UpdateOneAsync(filter, update);
return result.IsAcknowledged;
}
public async Task<bool> UpdateStatus(Expression<Func<Message, bool>> filter, MessageStatus messageStatus)
public async Task<long> Delete(Expression<Func<Message, bool>> filter)
{
IMongoCollection<Message> collection = GetCollection();
var update = Builders<Message>.Update.Set(m => m.MessageStatusId, messageStatus.Id);
var result = await collection.UpdateOneAsync(filter, update);
return result.IsAcknowledged;
var result = await collection.DeleteManyAsync(filter);
return result.DeletedCount;
}
}
}
......@@ -60,13 +60,7 @@ namespace Microsoft.Extensions.DependencyInjection
if (!collectionNames.Contains(mongodbOption.ServiceCollectionName))
{
database.CreateCollection(mongodbOption.ServiceCollectionName, new CreateCollectionOptions
{
Capped = true,
MaxDocuments = mongodbOption.CollectionMaxMessageCount,
MaxSize = mongodbOption.CollectionMaxSize,
});
database.CreateCollection(mongodbOption.ServiceCollectionName);
var messageCollection = database.GetCollection<Message>(mongodbOption.ServiceCollectionName);
AddMessageCollectionIndex(messageCollection);
}
......@@ -74,7 +68,6 @@ namespace Microsoft.Extensions.DependencyInjection
if (!collectionNames.Contains(mongodbOption.MembershipCollectionName))
{
database.CreateCollection(mongodbOption.MembershipCollectionName);
var membershipCollection = database.GetCollection<MemberShipTable>(mongodbOption.MembershipCollectionName);
AddMemberShipTableCollectionIndex(membershipCollection);
}
......
using Pole.ReliableMessage.Messaging;
using Pole.ReliableMessage.Storage.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageBuffer
public interface IMessageCheckRetryer
{
Task Flush();
Task<bool> Add(Message message);
Task<List<Message>> GetAll(Func<Message,bool> filter);
Task Execute(IEnumerable<Message> messages, DateTime dateTime);
}
}
using Pole.ReliableMessage.Messaging;
using Pole.ReliableMessage.Storage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
......
......@@ -28,7 +28,6 @@ namespace Microsoft.Extensions.DependencyInjection
services.Configure(optionConfig);
services.AddSingleton<IJsonConverter, DefaultJsonConverter>();
services.AddSingleton<IMessageBuffer, DefaultMessageBuffer>();
services.AddSingleton<IRetryTimeDelayCalculator, DefaultRetryTimeDelayCalculator>();
services.AddSingleton<ITimeHelper, DefaulTimeHelper>();
services.AddSingleton<IApplicationBuilderConfigurator, DefaultApplicationBuilderConfigurator>();
......@@ -51,7 +50,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.AddSingleton<IProcessor, MessageBufferFlushProcessor>();
services.AddSingleton<IProcessor, MessageCleanProcessor>();
services.AddSingleton<IProcessor, PendingMessageCheckProcessor>();
services.AddSingleton<IProcessor, PendingMessageServiceInstanceCheckProcessor>();
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
namespace Pole.ReliableMessage.Core
{
public abstract class Enumeration : IComparable
{
public string Name { get; private set; }
public int Id { get; private set; }
protected Enumeration(int id, string name)
{
Id = id;
Name = name;
}
public override string ToString() => Name;
public static IEnumerable<T> GetAll<T>() where T : Enumeration
{
var fields = typeof(T).GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly);
return fields.Select(f => f.GetValue(null)).Cast<T>();
}
public override bool Equals(object obj)
{
if (!(obj is Enumeration otherValue))
return false;
var typeMatches = GetType().Equals(obj.GetType());
var valueMatches = Id.Equals(otherValue.Id);
return typeMatches && valueMatches;
}
public override int GetHashCode() => Id.GetHashCode();
public static int AbsoluteDifference(Enumeration firstValue, Enumeration secondValue)
{
var absoluteDifference = Math.Abs(firstValue.Id - secondValue.Id);
return absoluteDifference;
}
public static T FromValue<T>(int value) where T : Enumeration
{
var matchingItem = Parse<T, int>(value, "value", item => item.Id == value);
return matchingItem;
}
public static T FromDisplayName<T>(string displayName) where T : Enumeration
{
var matchingItem = Parse<T, string>(displayName, "display name", item => item.Name == displayName);
return matchingItem;
}
private static T Parse<T, K>(K value, string description, Func<T, bool> predicate) where T : Enumeration
{
var matchingItem = GetAll<T>().FirstOrDefault(predicate);
if (matchingItem == null)
throw new InvalidOperationException($"'{value}' is not a valid {description} in {typeof(T)}");
return matchingItem;
}
public int CompareTo(object other) => Id.CompareTo(((Enumeration)other).Id);
}
}
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Storage.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage
{
class DefaultMessageCheckRetryer : IMessageCheckRetryer
{
private readonly ILogger _logger;
private readonly IRetryTimeDelayCalculator _retryTimeDelayCalculator;
private readonly ReliableMessageOption _options;
private readonly IMessageStorage _storage;
private readonly IMessageChecker _messageChecker;
private readonly IMessageBus _messageBus;
private readonly List<Message> _changedMessage = new List<Message>();
public DefaultMessageCheckRetryer(ILogger<DefaultMessageCheckRetryer> logger, IRetryTimeDelayCalculator retryTimeDelayCalculator, IOptions<ReliableMessageOption> options, IMessageStorage storage, IMessageChecker messageChecker, IMessageBus messageBus)
{
_logger = logger;
_retryTimeDelayCalculator = retryTimeDelayCalculator;
_options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected");
_storage = storage;
_messageChecker = messageChecker;
_messageBus = messageBus;
}
public async Task Execute(IEnumerable<Message> messages, DateTime dateTime)
{
try
{
messages.AsParallel().ForAll(async m => await Retry(m, dateTime));
if (_changedMessage.Count != 0)
{
await _storage.Save(_changedMessage);
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"DefaultMessageCheckRetryer.Execute ,Execute with errors");
}
finally
{
if (_changedMessage.Count != 0)
{
_changedMessage.Clear();
}
}
}
private async Task Retry(Message message, DateTime retryTime)
{
try
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} begin Retry");
}
var nextRetryDelay = _retryTimeDelayCalculator.Get(message.RetryTimes, _options.MaxPendingMessageRetryDelay);
message.NextRetryUTCTime = retryTime.AddSeconds(nextRetryDelay);
if (retryTime > message.AddedUTCTime.AddSeconds(_options.PendingMessageTimeOut))
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Canced ,PendingMessageTimeOut:{_options.PendingMessageTimeOut}");
}
message.NextRetryUTCTime = DateTime.MinValue;
message.MessageStatus = MessageStatus.Canced;
_changedMessage.Add(message);
return;
}
message.RetryTimes++;
var messageCheckerResult = await _messageChecker.GetResult(message);
if (messageCheckerResult.IsFinished)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Pushed");
}
message.MessageStatus = MessageStatus.Pushed;
await _messageBus.Publish(messageCheckerResult.Event, message.Id);
}
else
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Retry next time");
}
}
_changedMessage.Add(message);
}
catch (Exception ex)
{
_logger.LogError(ex, $"DefaultMessageCheckRetryer.Retry ,Message:{message.Id} retry with errors");
}
}
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
......@@ -7,6 +6,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ILogger = Microsoft.Extensions.Logging.ILogger;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.Pole.ReliableMessage.EventBus
{
......@@ -16,18 +16,17 @@ namespace Pole.Pole.ReliableMessage.EventBus
private readonly IMessageStorage _messageStorage;
private readonly IMessageIdGenerator _messageIdGenerator;
private readonly ITimeHelper _timeHelper;
private readonly IMessageBuffer _messageBuffer;
//private readonly IMessageBuffer _messageBuffer;
private readonly ILogger _logger;
private readonly IJsonConverter _jsonConverter;
private readonly IMessageCallBackInfoStore _messageCallBackInfoStore;
private readonly IMessageTypeIdGenerator _messageTypeIdGenerator;
public DefaultReliableBus(IMessageBus messageBus, IMessageStorage messageStorage, IMessageIdGenerator messageIdGenerator, ITimeHelper timeHelper, IMessageBuffer messageBuffer, ILogger<DefaultReliableBus> logger, IJsonConverter jsonConverter, IMessageCallBackInfoStore messageCallBackInfoStore, IMessageTypeIdGenerator messageTypeIdGenerator)
public DefaultReliableBus(IMessageBus messageBus, IMessageStorage messageStorage, IMessageIdGenerator messageIdGenerator, ITimeHelper timeHelper, ILogger<DefaultReliableBus> logger, IJsonConverter jsonConverter, IMessageCallBackInfoStore messageCallBackInfoStore, IMessageTypeIdGenerator messageTypeIdGenerator)
{
_messageBus = messageBus;
_messageStorage = messageStorage;
_messageIdGenerator = messageIdGenerator;
_timeHelper = timeHelper;
_messageBuffer = messageBuffer;
_logger = logger;
_jsonConverter = jsonConverter;
_messageCallBackInfoStore = messageCallBackInfoStore;
......@@ -97,8 +96,8 @@ namespace Pole.Pole.ReliableMessage.EventBus
{
await _messageBus.Publish(@event, prePublishMessageId, cancellationToken);
var messageBufferResult = await _messageBuffer.Add(new Message { Id = prePublishMessageId, MessageStatus = MessageStatus.Pushed });
return true;
var messageBufferResult = await _messageStorage.UpdateStatus(m => m.Id == prePublishMessageId && m.MessageStatusId == MessageStatus.Pending.Id, MessageStatus.Pushed);
return messageBufferResult;
}
catch (Exception ex)
{
......
using Pole.ReliableMessage.Core;
using Pole.Domain;
using System;
using System.Collections.Generic;
using System.Text;
......
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Messaging
{
class DefaultMessageBuffer : IMessageBuffer
{
private readonly IMessageStorage _storage;
private readonly System.Collections.Concurrent.ConcurrentDictionary<string,Message> Messages = new System.Collections.Concurrent.ConcurrentDictionary<string, Message>();
private readonly ILogger<DefaultMessageBuffer> _logger;
public DefaultMessageBuffer(IMessageStorage storage, ILogger<DefaultMessageBuffer> logger)
{
_storage = storage;
_logger = logger;
}
public async Task Flush()
{
/// 通过 MessageTypeId 是否为空 判断 消息是否为 DefaultReliableBus Publish 完成后的消息状态修改缓冲,
var toUpdateStatusMessageKeyValuePairs = Messages.Where(m => string.IsNullOrEmpty(m.Value.MessageTypeId));
var toUpdateStatusMessages= toUpdateStatusMessageKeyValuePairs.Select(m=>m.Value).ToArray();
var toUpdateStatusMessageIds = toUpdateStatusMessageKeyValuePairs.Select(m => m.Key).ToList();
await _storage.UpdateStatus(toUpdateStatusMessages);
_logger.LogDebug($"DefaultMessageBuffer.Flush update successfully, Message count{toUpdateStatusMessages.Count()}");
toUpdateStatusMessageIds.ForEach(m => {
Messages.TryRemove(m,out Message message);
});
var toSavedMessageKeyValuePairs = Messages.Where(m => !string.IsNullOrEmpty(m.Value.MessageTypeId));
var toSavedMessages = toSavedMessageKeyValuePairs.Select(m => m.Value).ToArray();
var toSavedMessagesIds = toSavedMessageKeyValuePairs.Select(m => m.Key).ToList();
await _storage.Save(toSavedMessages);
_logger.LogDebug($"DefaultMessageBuffer.Flush save successfully, Message count{toSavedMessages.Count()}");
toSavedMessagesIds.ForEach(m => {
Messages.TryRemove(m, out Message message);
});
}
public Task<bool> Add(Message message)
{
Messages.TryAdd(message.Id, message);
return Task.FromResult(true);
}
public async Task<List<Message>> GetAll(Func<Message, bool> filter)
{
await Task.CompletedTask;
return Messages.Values.Where(filter).ToList();
}
}
}
......@@ -7,6 +7,7 @@ using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.ReliableMessage.Messaging
{
......
......@@ -6,35 +6,52 @@ using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.ReliableMessage.Processor
{
public class MessageBufferFlushProcessor : ProcessorBase
public class MessageCleanProcessor : ProcessorBase
{
private readonly IMessageBuffer _messageBuffer;
private readonly ReliableMessageOption _options;
private readonly ILogger<MessageBufferFlushProcessor> _logger;
public MessageBufferFlushProcessor(IMessageBuffer messageBuffer, IOptions<ReliableMessageOption> options, ILogger<MessageBufferFlushProcessor> logger)
private readonly ILogger<MessageCleanProcessor> _logger;
private readonly IMessageStorage _messageStorage;
private readonly IMemberShipTable _memberShipTable;
private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider;
public MessageCleanProcessor(IOptions<ReliableMessageOption> options, ILogger<MessageCleanProcessor> logger, IMessageStorage messageStorage, IMemberShipTable memberShipTable, IServiceIPv4AddressProvider serviceIPv4AddressProvider)
{
_messageBuffer = messageBuffer;
_options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected");
_logger = logger;
_messageStorage = messageStorage;
_memberShipTable = memberShipTable;
_serviceIPv4AddressProvider = serviceIPv4AddressProvider;
}
public override string Name => nameof(PendingMessageCheckProcessor);
public override async Task Process(ProcessingContext context)
{
try
{
await _messageBuffer.Flush();
var iPStr = _serviceIPv4AddressProvider.Get();
var isPendingChecker = await _memberShipTable.IsPendingMessageCheckerServiceInstance(iPStr);// 这里可以把时间加上去
if (!isPendingChecker)
{
_logger.LogDebug("I an not the pendingChecker ,Ignore clean up messages");
return;
}
_logger.LogInformation($"Begin clean message");
var deletedCount = await _messageStorage.Delete(m => m.MessageStatusId == MessageStatus.Canced.Id || m.MessageStatusId == MessageStatus.Handed.Id);
_logger.LogInformation($"End clean message ,delete message count : {deletedCount} , successfully");
}
catch(Exception ex)
catch (Exception ex)
{
_logger.LogError(ex, $"MessageBufferFlushProcessor Process Error");
_logger.LogError(ex, $"Clean message error");
}
finally
{
await Task.Delay(_options.PushedMessageFlushInterval * 1000);
await Task.Delay(_options.MessageCleanInterval * 1000);
}
}
}
......
......@@ -17,26 +17,22 @@ namespace Pole.ReliableMessage.Processor
{
private readonly IMessageStorage _storage;
private readonly ReliableMessageOption _options;
private readonly IMessageBuffer _messageBuffer;
//private readonly IMessageBuffer _messageBuffer;
private readonly ITimeHelper _timeHelper;
private readonly IMessageChecker _messageChecker;
private readonly IRetryTimeDelayCalculator _retryTimeDelayCalculator;
private readonly IMemberShipTable _memberShipTable;
private readonly ILogger<PendingMessageCheckProcessor> _logger;
private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider;
private readonly IMessageBus _messageBus;
public PendingMessageCheckProcessor(IMessageStorage storage, IOptions<ReliableMessageOption> options, IMessageBuffer messageBuffer, ITimeHelper timeHelper, IMessageChecker messageChecker, IRetryTimeDelayCalculator retryTimeDelayCalculator, IMemberShipTable memberShipTable, ILogger<PendingMessageCheckProcessor> logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider, IMessageBus messageBus)
private readonly IMessageCheckRetryer _messageCheckRetryer;
public PendingMessageCheckProcessor(IMessageStorage storage, IOptions<ReliableMessageOption> options, ITimeHelper timeHelper, IMemberShipTable memberShipTable, ILogger<PendingMessageCheckProcessor> logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider, IMessageCheckRetryer messageCheckRetryer)
{
_storage = storage;
_options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected");
_messageBuffer = messageBuffer;
//_messageBuffer = messageBuffer;
_timeHelper = timeHelper;
_messageChecker = messageChecker;
_retryTimeDelayCalculator = retryTimeDelayCalculator;
_memberShipTable = memberShipTable;
_logger = logger;
_serviceIPv4AddressProvider = serviceIPv4AddressProvider;
_messageBus = messageBus;
_messageCheckRetryer = messageCheckRetryer;
}
public override string Name => nameof(PendingMessageCheckProcessor);
......@@ -54,20 +50,7 @@ namespace Pole.ReliableMessage.Processor
return;
}
var now = _timeHelper.GetUTCNow();
var pendingMessages = await _storage.GetMany(m => m.MessageStatusId == MessageStatus.Pending.Id &&m.NextRetryUTCTime <= now && m.AddedUTCTime <= now.AddSeconds(-1 * _options.PendingMessageFirstProcessingWaitTime)&&m.AddedUTCTime>= now.AddSeconds(-1 * _options.PendingMessageCheckingTimeOutSeconds),_options.PendingMessageCheckBatchCount);
var cachedPushedMessage = await _messageBuffer.GetAll(m => m.MessageStatus == MessageStatus.Pushed);
var finalToCheckedMessages = pendingMessages.Except(cachedPushedMessage, MessageIEqualityComparer.Default).ToList();
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"PendingMessageCheckProcessor pendingMessages count:{pendingMessages.Count}");
_logger.LogDebug($"PendingMessageCheckProcessor cachedPushedMessage count:{cachedPushedMessage.Count}");
_logger.LogDebug($"PendingMessageCheckProcessor finalToCheckedMessages count:{finalToCheckedMessages.Count}");
}
finalToCheckedMessages.AsParallel().ForAll(async m => await Retry(m, now));
await ProcessInternal();
}
catch (Exception ex)
{
......@@ -78,56 +61,21 @@ namespace Pole.ReliableMessage.Processor
await Task.Delay(_options.PendingMessageRetryInterval * 1000);
}
}
private async Task Retry(Message message, DateTime retryTime)
public async Task ProcessInternal()
{
try
{
await Task.CompletedTask;
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} begin Retry");
}
var nextRetryDelay = _retryTimeDelayCalculator.Get(message.RetryTimes, _options.MaxPendingMessageRetryDelay);
message.NextRetryUTCTime = retryTime.AddSeconds(nextRetryDelay);
var now = _timeHelper.GetUTCNow();
var pendingMessages = await _storage.GetMany(m => m.MessageStatusId == MessageStatus.Pending.Id && m.NextRetryUTCTime <= now && m.AddedUTCTime <= now.AddSeconds(-1 * _options.PendingMessageFirstProcessingWaitTime) && m.AddedUTCTime >= now.AddSeconds(-1 * _options.PendingMessageCheckingTimeOutSeconds), _options.PendingMessageCheckBatchCount);
if (retryTime > message.AddedUTCTime.AddSeconds(_options.PendingMessageTimeOut))
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Canced ,PendingMessageTimeOut:{_options.PendingMessageTimeOut}");
}
message.NextRetryUTCTime = DateTime.MinValue;
message.MessageStatus = MessageStatus.Canced;
await _messageBuffer.Add(message);
return;
}
message.RetryTimes++;
var messageCheckerResult = await _messageChecker.GetResult(message);
if (messageCheckerResult.IsFinished)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Pushed");
}
message.MessageStatus = MessageStatus.Pushed;
await _messageBus.Publish(messageCheckerResult.Event, message.Id);
}
else
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Retry next time");
}
}
await _messageBuffer.Add(message);
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"PendingMessageCheckProcessor pendingMessages count:{pendingMessages.Count}");
}
catch (Exception ex)
await _messageCheckRetryer.Execute(pendingMessages, now);
// 说明此时 消息数量超过 批量获取数
if (pendingMessages.Count == _options.PendingMessageCheckBatchCount)
{
_logger.LogError(ex, $"PendingMessageCheckProcessor Retry ,Message:{message.Id} retry with errors");
await ProcessInternal();
}
}
}
......
......@@ -25,7 +25,7 @@ namespace Pole.ReliableMessage
/// <summary>
/// 预发送消息超时时间 单位 秒
/// </summary>
public int PendingMessageTimeOut { get; set; } = 10*60;
public int PendingMessageTimeOut { get; set; } = 10 * 60;
/// <summary>
/// 预发送消息检查时每一次获取的消息数量
......@@ -35,7 +35,7 @@ namespace Pole.ReliableMessage
/// <summary>
/// 预发送消息状态检查最后时间 单位 秒
/// </summary>
public int PendingMessageCheckingTimeOutSeconds { get; set; } = 13*60;
public int PendingMessageCheckingTimeOutSeconds { get; set; } = 13 * 60;
/// <summary>
/// 已发送的消息缓冲区 flush to storage 的时间间隔 单位 秒
......@@ -46,7 +46,7 @@ namespace Pole.ReliableMessage
/// <summary>
/// PendingMessage 第一次处理等待时间 单位 秒
/// </summary>
public int PendingMessageFirstProcessingWaitTime { get; set; } = 2+10;
public int PendingMessageFirstProcessingWaitTime { get; set; } = 2 + 10;
/// <summary>
/// 每次重试之间最大间隔 单位 秒
......@@ -61,14 +61,19 @@ namespace Pole.ReliableMessage
/// <summary>
/// PendingMessageCheck 实例存活超时时间 单位 秒
/// </summary>
public int PendingMessageCheckerInstanceIAnAliveTimeout { get; set; } = 3*10;
public int PendingMessageCheckerInstanceIAnAliveTimeout { get; set; } = 3 * 10;
/// <summary>
/// Message 定期清理时间间隔 单位 秒
/// </summary>
public int MessageCleanInterval { get; set; } = 30 * 60;
/// <summary>
/// 当主机有多个网络时通过指定网关地址找到合适的服务ip地址
/// </summary>
public string NetworkInterfaceGatewayAddress { get; set; } = string.Empty;
public ReliableMessageOption AddEventAssemblies (params Assembly [] assemblies)
public ReliableMessageOption AddEventAssemblies(params Assembly[] assemblies)
{
EventCallbackAssemblies = assemblies.ToList();
return this;
......@@ -89,5 +94,4 @@ namespace Pole.ReliableMessage
return this;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment