Commit eb350c31 by 丁松杰

添加 可靠消息 组件

parent 1f23761b
Showing with 2869 additions and 6 deletions
......@@ -21,7 +21,23 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{4A0F
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "apis", "apis", "{475116FC-DEEC-4255-94E4-AE7B8C85038D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Product.Api", "samples\apis\Product.Api\Product.Api.csproj", "{6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage", "src\Pole.ReliableMessage\Pole.ReliableMessage.csproj", "{699C75AB-4814-4E16-A3F3-9735C4C609FE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Masstransit", "src\Pole.ReliableMessage.Masstransit\Pole.ReliableMessage.Masstransit.csproj", "{051BECA5-5E65-4FCB-9B7F-C9E64809E218}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Dashboard", "src\Pole.ReliableMessage.Dashboard\Pole.ReliableMessage.Dashboard.csproj", "{9EF13D70-C3AF-471A-8AA6-603338B194B7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Storage.Abstraction", "src\Pole.ReliableMessage.Storage.Abstraction\Pole.ReliableMessage.Storage.Abstraction.csproj", "{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Storage.Mongodb", "src\Pole.ReliableMessage.Storage.Mongodb\Pole.ReliableMessage.Storage.Mongodb.csproj", "{793C73C6-93DE-4A56-B979-137914B247F2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Storage.EntityframeworkCore", "src\Pole.ReliableMessage.Storage.EntityframeworkCore\Pole.ReliableMessage.Storage.EntityframeworkCore.csproj", "{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Grpc", "Grpc", "{629045E7-B047-452A-AADA-ACB455B4FAFD}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Rest", "Rest", "{BA122337-74D2-4439-A10E-B20E14881290}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "webs", "webs", "{452B9D9E-881E-4E0E-A90B-98F2253F20F1}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
......@@ -49,10 +65,30 @@ Global
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Release|Any CPU.Build.0 = Release|Any CPU
{6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}.Release|Any CPU.Build.0 = Release|Any CPU
{699C75AB-4814-4E16-A3F3-9735C4C609FE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{699C75AB-4814-4E16-A3F3-9735C4C609FE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{699C75AB-4814-4E16-A3F3-9735C4C609FE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{699C75AB-4814-4E16-A3F3-9735C4C609FE}.Release|Any CPU.Build.0 = Release|Any CPU
{051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Debug|Any CPU.Build.0 = Debug|Any CPU
{051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Release|Any CPU.ActiveCfg = Release|Any CPU
{051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Release|Any CPU.Build.0 = Release|Any CPU
{9EF13D70-C3AF-471A-8AA6-603338B194B7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9EF13D70-C3AF-471A-8AA6-603338B194B7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9EF13D70-C3AF-471A-8AA6-603338B194B7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9EF13D70-C3AF-471A-8AA6-603338B194B7}.Release|Any CPU.Build.0 = Release|Any CPU
{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Release|Any CPU.Build.0 = Release|Any CPU
{793C73C6-93DE-4A56-B979-137914B247F2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{793C73C6-93DE-4A56-B979-137914B247F2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{793C73C6-93DE-4A56-B979-137914B247F2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{793C73C6-93DE-4A56-B979-137914B247F2}.Release|Any CPU.Build.0 = Release|Any CPU
{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Debug|Any CPU.Build.0 = Debug|Any CPU
{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Release|Any CPU.ActiveCfg = Release|Any CPU
{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -64,7 +100,15 @@ Global
{1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{475116FC-DEEC-4255-94E4-AE7B8C85038D} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB}
{6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
{699C75AB-4814-4E16-A3F3-9735C4C609FE} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{051BECA5-5E65-4FCB-9B7F-C9E64809E218} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{9EF13D70-C3AF-471A-8AA6-603338B194B7} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{3D92F460-350B-4614-A6F6-C00A2D0FA9E2} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{793C73C6-93DE-4A56-B979-137914B247F2} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{805CF4F7-CCDC-4390-A92B-55E0FFA7F659} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{629045E7-B047-452A-AADA-ACB455B4FAFD} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
{BA122337-74D2-4439-A10E-B20E14881290} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
{452B9D9E-881E-4E0E-A90B-98F2253F20F1} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E}
......
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Masstransit.Abstraction
{
public interface IReliableEvent
{
}
}
using Pole.ReliableMessage.EventBus;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Masstransit.Abstraction
{
public interface IReliableEventHandlerRegistrarFactory
{
MasstransitEventHandlerRegistrar Create(Type eventHandlerType);
}
}
using Pole.ReliableMessage.Abstraction;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Masstransit
{
public class DefaultReliableEventHandlerContext<TEvent> : IReliableEventHandlerContext<TEvent>
where TEvent : class
{
private readonly ConsumeContext<TEvent> _executeContext;
public DefaultReliableEventHandlerContext(ConsumeContext<TEvent> executeContext)
{
_executeContext = executeContext;
this.Event = executeContext.Message;
}
public TEvent Event { get; private set; }
}
}
using Pole.ReliableMessage.Core;
using Pole.ReliableMessage.EventBus;
using Pole.ReliableMessage.Masstransit.Abstraction;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Pole.ReliableMessage.Masstransit
{
class DefaultReliableEventHandlerRegistrarFactory : IReliableEventHandlerRegistrarFactory
{
private readonly MasstransitRabbitmqOption _masstransitOptions;
public DefaultReliableEventHandlerRegistrarFactory(IOptions<MasstransitRabbitmqOption> masstransitOptions)
{
_masstransitOptions = masstransitOptions.Value ?? throw new ArgumentNullException(nameof(masstransitOptions));
}
public MasstransitEventHandlerRegistrar Create(Type eventHnadler)
{
if (!eventHnadler.Name.EndsWith("EventHandler"))
{
throw new Exception("EventHandler Name Must EndWith EventHandler");
}
var reliableEventHandlerParemeterAttribute = eventHnadler.GetCustomAttributes(typeof(ReliableEventHandlerParemeterAttribute), true).FirstOrDefault();
var eventHandlerName = GetQueueName(reliableEventHandlerParemeterAttribute, eventHnadler, _masstransitOptions.QueueNamePrefix);
var parentEventHandler = eventHnadler.BaseType;
var eventType = parentEventHandler.GetGenericArguments().ToList().FirstOrDefault();
ushort prefetchCount = GetPrefetchCount(eventHnadler, reliableEventHandlerParemeterAttribute);
MasstransitEventHandlerRegistrar eventHandlerRegisterInvoker = new MasstransitEventHandlerRegistrar(eventHandlerName, eventHnadler, eventType, _masstransitOptions.RetryConfigure, prefetchCount);
return eventHandlerRegisterInvoker;
}
private string GetQueueName(object reliableEventHandlerParemeterAttribute, Type eventHnadler, string queueNamePrefix)
{
var eventHandlerDefaultName = $"eventHandler-{ eventHnadler.Name.Replace("EventHandler", "").ToLowerInvariant()}";
var eventHandlerName = string.IsNullOrEmpty(queueNamePrefix) ? eventHandlerDefaultName : $"{queueNamePrefix}-{eventHandlerDefaultName}";
if (reliableEventHandlerParemeterAttribute != null)
{
var reliableEventHandlerParemeterAttributeType = reliableEventHandlerParemeterAttribute.GetType();
var prefetchCountPropertyInfo = reliableEventHandlerParemeterAttributeType.GetProperty(nameof(ReliableEventHandlerParemeterAttribute.QueueHaType));
var queueHaTypeValue = Convert.ToInt32(prefetchCountPropertyInfo.GetValue(reliableEventHandlerParemeterAttribute));
if (queueHaTypeValue != 0)
{
var currentQueueType = Enumeration.FromValue<QueueHaType>(queueHaTypeValue);
eventHandlerName = currentQueueType.GenerateQueueName(eventHandlerName);
}
}
return eventHandlerName;
}
private ushort GetPrefetchCount(Type eventHnadler, object reliableEventHandlerParemeterAttribute)
{
var prefetchCount = _masstransitOptions.PrefetchCount;
if (reliableEventHandlerParemeterAttribute != null)
{
var reliableEventHandlerParemeterAttributeType = reliableEventHandlerParemeterAttribute.GetType();
var prefetchCountPropertyInfo = reliableEventHandlerParemeterAttributeType.GetProperty(nameof(ReliableEventHandlerParemeterAttribute.PrefetchCount));
var prefetchCountValue = Convert.ToUInt16(prefetchCountPropertyInfo.GetValue(reliableEventHandlerParemeterAttribute));
if (prefetchCountValue != 0)
{
prefetchCount = prefetchCountValue;
}
}
return prefetchCount;
}
}
}
using MassTransit;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Masstransit
{
public class MassTransitHostedService : IHostedService
{
readonly IBusControl _bus;
readonly ILogger _logger;
public MassTransitHostedService(IBusControl bus, ILogger<MassTransitHostedService> logger)
{
_bus = bus;
_logger = logger;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _bus.StartAsync();
_logger.LogInformation("MassTransit Bus Start Successful");
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _bus.StopAsync();
_logger.LogInformation("MassTransit Bus Stop Successful");
}
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Masstransit.Pipe;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Masstransit
{
class MasstransitBasedMessageBus : IMessageBus
{
public MasstransitBasedMessageBus(MassTransit.IBus bus)
{
_bus = bus;
}
private readonly MassTransit.IBus _bus;
public Task Publish(object @event,string reliableMessageId, CancellationToken cancellationToken = default(CancellationToken))
{
var pipe = new AddReliableMessageIdPipe(reliableMessageId);
return _bus.Publish(@event, pipe, cancellationToken);
}
}
}
using GreenPipes;
using GreenPipes.Configurators;
using MassTransit;
using MassTransit.ExtensionsDependencyInjectionIntegration;
using MassTransit.RabbitMqTransport;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.EventBus
{
public class MasstransitEventHandlerRegistrar
{
private readonly string _queueName;
private readonly Type _eventHandlerType;
private readonly Type _eventHandlerEventType;
private readonly Action<IRetryConfigurator> _retryConfigure;
public readonly ushort _prefetchCount;
public MasstransitEventHandlerRegistrar(string eventHandlerName, Type eventHandlerType, Type eventHandlerEventType, Action<IRetryConfigurator> retryConfigure, ushort prefetchCount)
{
_queueName = eventHandlerName;
_eventHandlerType = eventHandlerType;
_eventHandlerEventType = eventHandlerEventType;
_retryConfigure = retryConfigure;
_prefetchCount = prefetchCount;
}
public void RegisterEventHandler(IServiceCollectionConfigurator serviceCollectionConfigurator, IServiceCollection services)
{
serviceCollectionConfigurator.AddConsumer(_eventHandlerType);
}
public void RegisterQueue(IServiceCollectionConfigurator serviceCollectionConfigurator, IRabbitMqBusFactoryConfigurator rabbitMqBusFactoryConfigurator, IRabbitMqHost rabbitMqHost, IServiceProvider serviceProvider)
{
//serviceCollectionConfigurator.AddConsumer(_eventHandlerType);
rabbitMqBusFactoryConfigurator.ReceiveEndpoint(_queueName, conf =>
{
//conf.Consumer()
conf.ConfigureConsumer(serviceProvider, _eventHandlerType);
conf.PrefetchCount = _prefetchCount;
conf.UseMessageRetry(_retryConfigure);
});
}
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.EventBus;
using Pole.ReliableMessage.Masstransit.Abstraction;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Masstransit
{
class MasstransitMessageBusConfigurator : IMessageBusConfigurator
{
private readonly IReliableEventHandlerRegistrarFactory _reliableEventHandlerRegistrarFactory;
private readonly MasstransitRabbitmqOption _options;
public MasstransitMessageBusConfigurator(IReliableEventHandlerRegistrarFactory reliableEventHandlerRegistrarFactory, IOptions<MasstransitRabbitmqOption> options)
{
_reliableEventHandlerRegistrarFactory = reliableEventHandlerRegistrarFactory;
_options = options.Value;
}
public async Task Configure(IServiceCollection services,IEnumerable<Type> eventHandlerTypes)
{
await Task.CompletedTask;
var eventHandlerRegistrars = GetEventHandlerRegistrars(eventHandlerTypes).ToList();
services.AddMassTransit(x =>
{
foreach (var eventHandlerRegistrar in eventHandlerRegistrars)
{
eventHandlerRegistrar.RegisterEventHandler(x, services);
}
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(_options.RabbitMqHostAddress), h =>
{
h.Username(_options.RabbitMqHostUserName);
h.Password(_options.RabbitMqHostPassword);
});
foreach (var eventHandlerRegistrar in eventHandlerRegistrars)
{
eventHandlerRegistrar.RegisterQueue(x, cfg, host, provider);
}
}));
});
}
private IEnumerable<MasstransitEventHandlerRegistrar> GetEventHandlerRegistrars(IEnumerable<Type> eventHandlerTypes)
{
foreach (var eventHandler in eventHandlerTypes)
{
var model = _reliableEventHandlerRegistrarFactory.Create(eventHandler);
yield return model;
}
}
}
}
using GreenPipes;
using GreenPipes.Configurators;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Masstransit
{
public class MasstransitRabbitmqOption
{
public string RabbitMqHostAddress { get; set; }
public string RabbitMqHostUserName { get; set; }
public string RabbitMqHostPassword { get; set; }
public string QueueNamePrefix { get; set; } = string.Empty;
/// <summary>
/// 4 个并发
/// </summary>
public ushort PrefetchCount { get; set; } = 4;
public Action<IRetryConfigurator> RetryConfigure { get; set; } =
r => r.Intervals(TimeSpan.FromSeconds(0.1)
, TimeSpan.FromSeconds(1)
, TimeSpan.FromSeconds(4)
, TimeSpan.FromSeconds(16)
, TimeSpan.FromSeconds(64)
);
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Masstransit.Pipe;
using Pole.ReliableMessage.Messaging;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Masstransit
{
public abstract class ReliableEventHandler<TEvent> : IReliableEventHandler<TEvent>,IConsumer<TEvent>
where TEvent : class
{
private readonly IMessageStorage _messageStorage;
private readonly ILogger<ReliableEventHandler<TEvent>> _logger;
private readonly IServiceProvider _serviceProvider;
public ReliableEventHandler(IServiceProvider serviceProvider)
{
_messageStorage = serviceProvider.GetRequiredService(typeof(IMessageStorage)) as IMessageStorage;
var loggerFactory = serviceProvider.GetRequiredService(typeof(ILoggerFactory)) as ILoggerFactory;
_logger = loggerFactory.CreateLogger<ReliableEventHandler<TEvent>>();
_serviceProvider = serviceProvider;
}
public abstract Task Handle(IReliableEventHandlerContext<TEvent> context);
public async Task Consume(ConsumeContext<TEvent> context)
{
var messageId = GetReliableMessageId(context);
if (_logger.IsEnabled(LogLevel.Debug))
{
var jsonConveter = _serviceProvider.GetRequiredService(typeof(IJsonConverter)) as IJsonConverter;
var json = jsonConveter.Serialize(context.Message);
_logger.LogDebug($"Message Begin Handle,messageId:{messageId}, message content :{json}");
}
var retryAttempt = context.GetRetryAttempt();
if (retryAttempt == 0)
{
if (string.IsNullOrEmpty(messageId))
{
_logger.LogWarning($"Message has no ReliableMessageId, ignore");
return;
}
var isHandled = !await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed);
if (isHandled)
{
_logger.LogTrace($"This message has handled begore ReliableMessageId:{messageId}, ignore");
return;
}
}
await Handle(new DefaultReliableEventHandlerContext<TEvent>(context));
_logger.LogDebug($"Message handled successfully ,messageId:{messageId}");
}
private string GetReliableMessageId(ConsumeContext<TEvent> context)
{
return context.Headers.Get(AddReliableMessageIdPipe.RELIABLE_MESSAGE_ID, string.Empty);
}
}
}
using Pole.ReliableMessage.Abstraction;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Masstransit.Messaging
{
class DefaultMessageIdGenerator : IMessageIdGenerator
{
public string Generate()
{
return NewId.Next().ToString("N").ToLowerInvariant();
}
}
}
using GreenPipes;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Masstransit.Pipe
{
class AddReliableMessageIdPipe : IPipe<PublishContext>
{
public const string RELIABLE_MESSAGE_ID = "ReliableMessageId";
private readonly string _reliableMessageId;
public AddReliableMessageIdPipe(string reliableMessageId)
{
_reliableMessageId = reliableMessageId;
}
public void Probe(ProbeContext context)
{
}
public async Task Send(PublishContext context)
{
context.Headers.Set(RELIABLE_MESSAGE_ID, _reliableMessageId);
await Task.CompletedTask;
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MassTransit" Version="6.0.1" />
<PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="MassTransit.RabbitMQ" Version="6.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.ReliableMessage\Pole.ReliableMessage.csproj" />
</ItemGroup>
</Project>
using Pole.ReliableMessage.Core;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Masstransit
{
public class QueueHaType : Enumeration
{
public static QueueHaType None = new QueueHaType(1, "无",string.Empty);
public static QueueHaType Default = new QueueHaType(2, "默认高可用","Rmd.");
public static QueueHaType Backlog = new QueueHaType(3, "消息可积压","Rmb.");
private readonly string _queuePrefix;
public QueueHaType(int id, string name) : base(id, name)
{
}
public QueueHaType(int id, string name,string prefix) : this(id, name)
{
_queuePrefix = prefix;
}
public string GenerateQueueName(string rawQueueName)
{
return string.Concat(_queuePrefix, rawQueueName);
}
}
public enum QueueHaTypeEnum:int
{
None = 1,
Default = 2,
Backlog = 3
}
}
using GreenPipes.Configurators;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Masstransit
{
[AttributeUsage(AttributeTargets.Class)]
public class ReliableEventHandlerParemeterAttribute : Attribute
{
public ushort PrefetchCount { get; set; }
public QueueHaTypeEnum QueueHaType { get; set; } = QueueHaTypeEnum.Default;
}
}
using Pole.ReliableMessage;
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Masstransit;
using Pole.ReliableMessage.Masstransit.Abstraction;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
using Pole.ReliableMessage.Masstransit.Messaging;
namespace Microsoft.Extensions.DependencyInjection
{
public static class ReliableMessageOptionExtension
{
public static ReliableMessageOption AddMasstransitRabbitmq(this ReliableMessageOption option, Action<MasstransitRabbitmqOption> optionConfig)
{
option.ReliableMessageOptionExtensions.Add(new MasstransitRabbitmqExtension(optionConfig));
return option;
}
}
public class MasstransitRabbitmqExtension : IReliableMessageOptionExtension
{
private readonly Action<MasstransitRabbitmqOption> _masstransitRabbitmqOption;
public MasstransitRabbitmqExtension(Action<MasstransitRabbitmqOption> masstransitRabbitmqOption)
{
_masstransitRabbitmqOption = masstransitRabbitmqOption;
}
public void AddServices(IServiceCollection services)
{
services.Configure(_masstransitRabbitmqOption);
services.AddSingleton<IMessageBus, MasstransitBasedMessageBus>();
services.AddSingleton<IMessageBusConfigurator, MasstransitMessageBusConfigurator>();
services.AddSingleton<IReliableEventHandlerRegistrarFactory, DefaultReliableEventHandlerRegistrarFactory>();
services.AddSingleton<IMessageIdGenerator, DefaultMessageIdGenerator>();
services.AddHostedService<MassTransitHostedService>();
}
}
}
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Storage.Abstraction
{
public interface IMemberShipTable
{
Task<bool> IsPendingMessageCheckerServiceInstance(string ipAddress);
Task<bool> UpdateIAmAlive(string ipAddress, DateTime dateTime);
/// <summary>
/// 如果当前 超时时间内 没有可用 实例 返回 空
/// </summary>
/// <param name="iamAliveTimeout"></param>
/// <returns></returns>
Task<string> GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime);
Task<bool> AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime);
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
</Project>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.ReliableMessage.Storage.Abstraction\Pole.ReliableMessage.Storage.Abstraction.csproj" />
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Storage.Mongodb
{
public sealed class MongoHost
{
/// <summary>
/// 主机或者IP地址
/// </summary>
public string Host { get; set; }
/// <summary>
/// 端口号
/// </summary>
public int Port { get; set; }
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.ReliableMessage.Storage.Mongodb
{
class MongodbMemberShipTable : IMemberShipTable
{
private readonly MongoClient _mongoClient;
private readonly MongodbOption _mongodbOption;
private readonly ILogger _logger;
public MongodbMemberShipTable(IConfiguration configuration, MongoClient mongoClient, IOptions<MongodbOption> mongodbOption, ILogger<MongodbMemberShipTable> logger)
{
_mongoClient = mongoClient;
_mongodbOption = mongodbOption.Value;
_logger = logger;
}
private IMongoDatabase GetActiveMessageDatabase(string activeMessageDatabase)
{
return _mongoClient.GetDatabase(activeMessageDatabase);
}
private IMongoCollection<MemberShipTable> GetCollection()
{
var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName);
var messageCollectionName = _mongodbOption.MembershipCollectionName;
var collection = database.GetCollection<MemberShipTable>(messageCollectionName);
return collection;
}
public async Task<bool> AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime)
{
var collection = GetCollection();
var deleteResult = await collection.DeleteManyAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName);
MemberShipTable memberShipTable = new MemberShipTable(_mongodbOption.ServiceCollectionName, ipAddress, aliveUTCTime);
await collection.InsertOneAsync(memberShipTable);
return true;
}
public async Task<string> GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime)
{
var collection = GetCollection();
var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.IAmAliveUTCTime >= iamAliveEndTime)).ToList();
if (instances.Count > 1)
{
_logger.LogInformation($"Current time have {instances.Count} PendingMessageChecker in {_mongodbOption.ServiceCollectionName} service , I will delete the extra instances");
var currentInstance = instances.FirstOrDefault();
var extraInstances = instances.Remove(currentInstance);
instances.ForEach(async n =>
{
await collection.DeleteOneAsync(m => m.Id == n.Id);
});
_logger.LogInformation($"Extra PendingMessageChecker instances in {_mongodbOption.ServiceCollectionName} service deleted successfully");
return currentInstance.PendingMessageCheckerIp;
}
else if (instances.Count == 1)
{
return instances.FirstOrDefault().PendingMessageCheckerIp;
}
else
{
return null;
}
}
public async Task<bool> IsPendingMessageCheckerServiceInstance(string ipAddress)
{
var collection = GetCollection();
var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp== ipAddress)).FirstOrDefault();
if (instances != null)
{
return true;
}
return false;
}
public async Task<bool> UpdateIAmAlive(string ipAddress,DateTime dateTime)
{
var collection = GetCollection();
var filter = Builders<MemberShipTable>.Filter.Where(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp == ipAddress);
var update = Builders<MemberShipTable>.Update.Set(m=>m.IAmAliveUTCTime,dateTime);
var result = await collection.UpdateOneAsync(filter, update);
return true;
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Storage.Mongodb
{
public class MongodbOption
{
public string MessageDatabaseName { get; set; } = "ReliableMessage";
public string MembershipCollectionName { get; set; } = "Membership";
/// <summary>
/// bucket 中最大消息数 一旦达到最大数量 后面的数据将覆盖前面的数据
/// </summary>
public long CollectionMaxMessageCount { get; set; } = 20000000;
/// <summary>
/// 默认最大为10G
/// </summary>
public long CollectionMaxSize { get; set; } = 10*1024*1024*1024L;
public string ServiceCollectionName { get; set; }
public MongoHost[] Servers { get; set; }
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Storage.Mongodb
{
class MongodbMessageStorage : IMessageStorage
{
private readonly MongoClient _mongoClient;
private readonly MongodbOption _mongodbOption;
private readonly ILogger<MongodbMessageStorage> _logger;
public MongodbMessageStorage(MongoClient mongoClient, IOptions<MongodbOption> mongodbOption, ILogger<MongodbMessageStorage> logger)
{
_mongoClient = mongoClient;
_mongodbOption = mongodbOption.Value;
_logger = logger;
}
private IMongoDatabase GetActiveMessageDatabase(string messageDatabase)
{
return _mongoClient.GetDatabase(messageDatabase);
}
private IMongoCollection<Message> GetCollection()
{
var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName);
var messageCollectionName = _mongodbOption.ServiceCollectionName;
var collection = database.GetCollection<Message>(messageCollectionName);
return collection;
}
public async Task<bool> Add(Message message)
{
IMongoCollection<Message> collection = GetCollection();
await collection.InsertOneAsync(message);
return true;
}
public async Task<bool> CheckAndUpdateStatus(Expression<Func<Message, bool>> filter, MessageStatus messageStatus)
{
IMongoCollection<Message> collection = GetCollection();
var update = Builders<Message>.Update.Set(m => m.MessageStatusId, messageStatus.Id);
var beforeDoc = await collection.FindOneAndUpdateAsync(filter, update, new FindOneAndUpdateOptions<Message, Message>() { ReturnDocument = ReturnDocument.Before });
if (beforeDoc.MessageStatusId == messageStatus.Id)
{
return false;
}
return true;
}
public async Task<List<Message>> GetMany(Expression<Func<Message, bool>> filter,int count)
{
IMongoCollection<Message> collection = GetCollection();
var list= await collection.Find(filter).Limit(count).ToListAsync();
list.ForEach(m =>
{
m.MessageStatus = Core.Enumeration.FromValue<MessageStatus>(m.MessageStatusId);
});
return list;
}
public async Task<bool> Save(IEnumerable<Message> messages)
{
var count = messages.Count();
_logger.LogDebug($"MongodbMessageStorage Save begin, Messages count: {messages.Count()}");
if (count == 0)
{
_logger.LogDebug($"MongodbMessageStorage Save successfully, saved count: 0");
return true;
}
IMongoCollection<Message> collection = GetCollection();
var models = new List<WriteModel<Message>>();
foreach (var message in messages)
{
FilterDefinition<Message> filter = Builders<Message>.Filter.Where(m => m.Id == message.Id && m.MessageStatusId != MessageStatus.Handed.Id);
UpdateDefinition<Message> update = Builders<Message>.Update
.Set(m => m.MessageStatusId, message.MessageStatus.Id)
.Set(m => m.RetryTimes, message.RetryTimes)
.Set(m => m.NextRetryUTCTime, message.NextRetryUTCTime);
var model = new UpdateOneModel<Message>(filter, update);
models.Add(model);
}
var result = await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = false });
_logger.LogDebug($"MongodbMessageStorage Save successfully, saved count: {result.ModifiedCount}");
return result.IsAcknowledged;
}
public async Task<bool> UpdateStatus(IEnumerable<Message> messages)
{
var count = messages.Count();
_logger.LogDebug($"MongodbMessageStorage updateStatus begin, Messages count: {messages.Count()}");
if (count == 0)
{
_logger.LogDebug($"MongodbMessageStorage updateStatus successfully, Modified count: 0");
return true;
}
IMongoCollection<Message> collection = GetCollection();
var models = new List<WriteModel<Message>>();
foreach (var message in messages)
{
FilterDefinition<Message> filter = Builders<Message>.Filter.Where(m => m.Id == message.Id&&m.MessageStatusId!=MessageStatus.Handed.Id);
UpdateDefinition<Message> update = Builders<Message>.Update
.Set(m => m.MessageStatusId, message.MessageStatus.Id);
var model = new UpdateOneModel<Message>(filter, update);
models.Add(model);
}
var result = await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = false });
_logger.LogDebug($"MongodbMessageStorage updateStatus successfully, Modified count: {result.ModifiedCount}");
return result.IsAcknowledged;
}
public async Task<bool> UpdateStatus(Expression<Func<Message, bool>> filter, MessageStatus messageStatus)
{
IMongoCollection<Message> collection = GetCollection();
var update = Builders<Message>.Update.Set(m => m.MessageStatusId, messageStatus.Id);
var result = await collection.UpdateOneAsync(filter, update);
return result.IsAcknowledged;
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.0" />
<PackageReference Include="MongoDB.Driver" Version="2.10.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.ReliableMessage.Storage.Abstraction\Pole.ReliableMessage.Storage.Abstraction.csproj" />
<ProjectReference Include="..\Pole.ReliableMessage\Pole.ReliableMessage.csproj" />
</ItemGroup>
</Project>
using Pole.ReliableMessage;
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Pole.ReliableMessage.Storage.Mongodb;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.IdGenerators;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Microsoft.Extensions.DependencyInjection
{
public static class ReliableMessageOptionExtension
{
public static ReliableMessageOption AddMongodb(this ReliableMessageOption option, Action<MongodbOption> mongodbOptionConfig)
{
option.ReliableMessageOptionExtensions.Add(new MongodbStorageExtension(mongodbOptionConfig));
return option;
}
}
public class MongodbStorageExtension : IReliableMessageOptionExtension
{
private readonly Action<MongodbOption> _mongodbOption;
public MongodbStorageExtension(Action<MongodbOption> masstransitRabbitmqOption)
{
_mongodbOption = masstransitRabbitmqOption;
}
public void AddServices(IServiceCollection services)
{
services.Configure(_mongodbOption);
services.AddSingleton<IMessageStorage, MongodbMessageStorage>();
services.AddSingleton<IMemberShipTable, MongodbMemberShipTable>();
var mongodbOption = services.BuildServiceProvider().GetRequiredService<IOptions<MongodbOption>>().Value;
var servers = mongodbOption.Servers.Select(x => new MongoServerAddress(x.Host, x.Port)).ToList();
var settings = new MongoClientSettings()
{
Servers = servers
};
var client = new MongoClient(settings);
var database = client.GetDatabase(mongodbOption.MessageDatabaseName);
AddMapper();
InitCollection(mongodbOption, database);
services.AddSingleton(client);
}
private static void InitCollection(MongodbOption mongodbOption, IMongoDatabase database)
{
var collectionNames = database.ListCollectionNames().ToList();
if (!collectionNames.Contains(mongodbOption.ServiceCollectionName))
{
database.CreateCollection(mongodbOption.ServiceCollectionName, new CreateCollectionOptions
{
Capped = true,
MaxDocuments = mongodbOption.CollectionMaxMessageCount,
MaxSize = mongodbOption.CollectionMaxSize,
});
var messageCollection = database.GetCollection<Message>(mongodbOption.ServiceCollectionName);
AddMessageCollectionIndex(messageCollection);
}
if (!collectionNames.Contains(mongodbOption.MembershipCollectionName))
{
database.CreateCollection(mongodbOption.MembershipCollectionName);
var membershipCollection = database.GetCollection<MemberShipTable>(mongodbOption.MembershipCollectionName);
AddMemberShipTableCollectionIndex(membershipCollection);
}
}
private static void AddMessageCollectionIndex(IMongoCollection<Message> collection)
{
List<CreateIndexModel<Message>> createIndexModels = new List<CreateIndexModel<Message>>();
//var nextRetryUTCTimeIndex = Builders<Message>.IndexKeys.Ascending(m => m.NextRetryUTCTime);
//CreateIndexModel<Message> nextRetryUTCTimeIndexModel = new CreateIndexModel<Message>(nextRetryUTCTimeIndex, new CreateIndexOptions() { Background = true });
//createIndexModels.Add(nextRetryUTCTimeIndexModel);
var AddedUTCTimeUTCTimeIndex = Builders<Message>.IndexKeys.Ascending(m => m.AddedUTCTime);
CreateIndexModel<Message> AddedUTCTimeIndexModel = new CreateIndexModel<Message>(AddedUTCTimeUTCTimeIndex, new CreateIndexOptions() { Background = true });
createIndexModels.Add(AddedUTCTimeIndexModel);
//var messageTypeIdIndex = Builders<Message>.IndexKeys.Ascending(m => m.MessageTypeId);
//CreateIndexModel<Message> messageTypeIdIndexModel = new CreateIndexModel<Message>(messageTypeIdIndex, new CreateIndexOptions() { Background = true });
//createIndexModels.Add(messageTypeIdIndexModel);
collection.Indexes.CreateMany(createIndexModels);
}
private static void AddMemberShipTableCollectionIndex(IMongoCollection<MemberShipTable> collection)
{
List<CreateIndexModel<MemberShipTable>> createIndexMembershipModels = new List<CreateIndexModel<MemberShipTable>>();
var serviceNameIndex = Builders<MemberShipTable>.IndexKeys.Ascending(m => m.ServiceName);
CreateIndexModel<MemberShipTable> serviceNameIndexModel = new CreateIndexModel<MemberShipTable>(serviceNameIndex, new CreateIndexOptions() { Background = true, Unique = true });
createIndexMembershipModels.Add(serviceNameIndexModel);
collection.Indexes.CreateMany(createIndexMembershipModels);
}
private static void AddMapper()
{
BsonClassMap.RegisterClassMap<Message>(cm =>
{
cm.AutoMap();
cm.UnmapMember(m => m.MessageStatus);
cm.MapIdField(m => m.Id);
cm.MapMember(m => m.NextRetryUTCTime).SetIsRequired(true);
});
BsonClassMap.RegisterClassMap<MemberShipTable>(cm =>
{
cm.AutoMap();
cm.MapIdField(m => m.Id).SetIdGenerator(StringObjectIdGenerator.Instance);
});
}
}
}
using Microsoft.AspNetCore.Builder;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IApplicationBuilderConfigurator
{
void Config(IApplicationBuilder applicationBuilder);
void Add(Action<IApplicationBuilder> config);
}
}
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IComteckReliableMessageBootstrap
{
Task Initialize(IServiceCollection services, List<Assembly> eventHandlerAssemblies, List<Assembly> eventAssemblies);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IJsonConverter
{
string Serialize(object obj);
T Deserialize<T>(string json);
object Deserialize(string json,Type type);
}
}
using Pole.ReliableMessage.Messaging;
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageBuffer
{
Task Flush();
Task<bool> Add(Message message);
Task<List<Message>> GetAll(Func<Message,bool> filter);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageBus
{
Task Publish(object @event,string reliableMessageId, CancellationToken cancellationToken = default);
}
}
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageBusConfigurator
{
Task Configure(IServiceCollection services, IEnumerable<Type> eventHandlerTypes);
}
}
using Pole.ReliableMessage.Messaging.CallBack;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageCallBackInfoGenerator
{
MessageCallBackInfo Generate(Type eventHandlerType);
}
}
using Pole.ReliableMessage.Messaging.CallBack;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageCallBackInfoStore
{
Task Add(MessageCallBackInfo messageCallBackInfo);
Task<MessageCallBackInfo> Get(string messageTypeId);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageCallBackRegister
{
Task Register(IEnumerable<Type> eventHandlerTypes);
}
}
using Pole.ReliableMessage.Messaging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageChecker
{
Task<MessageCheckerResult> GetResult(Message message);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageIdGenerator
{
string Generate();
}
}
using Pole.ReliableMessage.Messaging;
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageStorage
{
/// <summary>
///
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
Task<bool> Add(Message message);
///// <summary>
/////
///// </summary>
///// <param name="messageStatus"></param>
///// <param name="endRetryTime"></param>
///// <returns></returns>
//Task<Message> Get(Expression<Func<Message, bool>> filter);
/// <summary>
///
/// </summary>
/// <param name="messageStatus"></param>
/// <param name="endRetryTime"></param>
/// <returns></returns>
Task<List<Message>> GetMany(Expression<Func<Message,bool>> filter, int count);
/// <summary>
/// 批量更新
/// 更新这几个值 MessageStatusId , RetryTimes LastRetryUTCTime, NextRetryUTCTime
/// </summary>
/// <param name="messages"></param>
/// <returns></returns>
Task<bool> Save(IEnumerable<Message> messages);
Task<bool> UpdateStatus(IEnumerable<Message> messages);
///// <summary>
/////
///// </summary>
///// <param name="id">这里id 永远为 string </param>
///// <param name="messageStatus"></param>
///// <returns></returns>
//Task<bool> Update(Expression<Func<Message, bool>> filter, MessageStatus messageStatus);
/// <summary>
/// 检查 消息的状态,如果不是指定状态则返回true,并且更新状态到指定状态 ,如果已经是指定状态返回false
/// </summary>
/// <param name="id"> </param>
/// <param name="messageStatus"></param>
/// <returns></returns>
Task<bool> CheckAndUpdateStatus(Expression<Func<Message,bool>> filter, MessageStatus messageStatus);
Task<bool> UpdateStatus(Expression<Func<Message, bool>> filter, MessageStatus messageStatus);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageTypeIdGenerator
{
string Generate(Type messageType);
}
}
using Pole.ReliableMessage.Processor;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IProcessor
{
string Name { get; }
Task Process(ProcessingContext context);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IProcessorServer
{
Task Start(CancellationToken stoppingToken);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableBus
{
Task<string> PrePublish<TReliableEvent>(TReliableEvent @event,object callbackParemeter, CancellationToken cancellationToken = default);
Task<bool> Publish<TReliableEvent>(TReliableEvent @event,string prePublishMessageId, CancellationToken cancellationToken=default);
Task<bool> Cancel<TReliableEvent>(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default);
}
}
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableEventCallBackFinder
{
List<Type> FindAll(IEnumerable<Assembly> assemblies);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableEventCallback<TReliableEvent,TCallbackParemeter> : IReliableEventCallback
{
Task<bool> Callback(TCallbackParemeter callbackParemeter);
}
public interface IReliableEventCallback
{
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableEventHandler<TEvent> : IReliableEventHandler
where TEvent : class
{
Task Handle(IReliableEventHandlerContext<TEvent> context);
}
public interface IReliableEventHandler
{
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableEventHandlerContext<TEvent> where TEvent : class
{
TEvent Event { get; }
//Task Publish(object @event);
}
}
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableEventHandlerFinder
{
List<Type> FindAll(IEnumerable<Assembly> assemblies);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IRetryTimeDelayCalculator
{
int Get(int retryTimes, int maxPendingMessageRetryDelay);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IServiceIPv4AddressProvider
{
string Get();
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface ITimeHelper
{
DateTime GetUTCNow();
/// <summary>
/// "UTC :{_timeHelper.GetNow().ToString("yyyy-MM-dd HH:mm:ss.fff")}"
/// </summary>
/// <returns></returns>
string GetAppropriateFormatedDateString();
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Processor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.ReliableMessage
{
public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer
{
private readonly IServiceProvider _serviceProvider;
private Task _compositeTask;
public BackgroundServiceBasedProcessorServer(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public async Task Start(CancellationToken stoppingToken)
{
ProcessingContext processingContext = new ProcessingContext(stoppingToken);
List<LoopProcessor> loopProcessors = new List<LoopProcessor>();
var innerProcessors = _serviceProvider.GetServices<IProcessor>();
var loggerFactory = _serviceProvider.GetService<ILoggerFactory>();
var timeHelper = _serviceProvider.GetService<ITimeHelper>();
foreach (var innerProcessor in innerProcessors)
{
LoopProcessor processor = new LoopProcessor(innerProcessor, loggerFactory, timeHelper);
loopProcessors.Add(processor);
}
var tasks = loopProcessors.Select(p => p.Process(processingContext));
_compositeTask = Task.WhenAll(tasks);
await _compositeTask;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Start(stoppingToken);
}
public override void Dispose()
{
// 等待 10秒 待 消息处理完
_compositeTask?.Wait((int)TimeSpan.FromSeconds(10).TotalMilliseconds);
base.Dispose();
}
}
}
using Pole.ReliableMessage;
using Pole.ReliableMessage.Abstraction;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Microsoft.AspNetCore.Builder
{
public static class ComteckReliableMessageIApplicationBuilderExtensions
{
public static IApplicationBuilder UseComteckReliableMessage(this IApplicationBuilder applicationBuilder)
{
var option = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IOptions<ReliableMessageOption>)) as IOptions<ReliableMessageOption>;
var messageCallBackRegister = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IMessageCallBackRegister)) as IMessageCallBackRegister;
var reliableEventCallBackFinder = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IReliableEventCallBackFinder)) as IReliableEventCallBackFinder;
var eventCallbacks = reliableEventCallBackFinder.FindAll(option.Value.EventCallbackAssemblies);
messageCallBackRegister.Register(eventCallbacks).GetAwaiter().GetResult();
return applicationBuilder;
}
}
}
using Pole.Pole.ReliableMessage.EventBus;
using Pole.ReliableMessage;
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.EventBus;
using Pole.ReliableMessage.Messaging;
using Pole.ReliableMessage.Messaging.CallBack;
using Pole.ReliableMessage.Processor;
using Pole.ReliableMessage.Utils;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
namespace Microsoft.Extensions.DependencyInjection
{
public static class ComteckReliableMessageServiceCollectionExtensions
{
public static IServiceCollection AddComteckReliableMessage(this IServiceCollection services, Action<ReliableMessageOption> optionConfig)
{
ReliableMessageOption reliableMessageOption = new ReliableMessageOption();
optionConfig(reliableMessageOption);
foreach(var extension in reliableMessageOption.ReliableMessageOptionExtensions)
{
extension.AddServices(services);
}
services.Configure(optionConfig);
services.AddSingleton<IJsonConverter, DefaultJsonConverter>();
services.AddSingleton<IMessageBuffer, DefaultMessageBuffer>();
services.AddSingleton<IRetryTimeDelayCalculator, DefaultRetryTimeDelayCalculator>();
services.AddSingleton<ITimeHelper, DefaulTimeHelper>();
services.AddSingleton<IApplicationBuilderConfigurator, DefaultApplicationBuilderConfigurator>();
services.AddSingleton<IComteckReliableMessageBootstrap, DefaultComteckReliableMessageBootstrap>();
services.AddSingleton<IMessageCallBackInfoGenerator, DefaultMessageCallBackInfoGenerator>();
services.AddSingleton<IMessageCallBackInfoStore, MessageCallBackInfoInMemoryStore>();
services.AddSingleton<IMessageCallBackRegister, DefaultMessageCallBackRegister>();
services.AddSingleton<IMessageChecker, DefaultMessageChecker>();
services.AddSingleton<IMessageCallBackInfoGenerator, DefaultMessageCallBackInfoGenerator>();
services.AddSingleton<IReliableBus, DefaultReliableBus>();
services.AddSingleton<IReliableEventCallBackFinder, DefaultReliableEventCallBackFinder>();
services.AddSingleton<IReliableEventHandlerFinder, DefaultReliableEventHandlerFinder>();
services.AddSingleton<IMessageTypeIdGenerator, DefaultMessageTypeIdGenerator>();
services.AddSingleton<IServiceIPv4AddressProvider, DefaultServiceIPv4AddressProvider>();
services.AddHostedService<BackgroundServiceBasedProcessorServer>();
services.AddHttpClient();
services.AddSingleton<IProcessor, MessageBufferFlushProcessor>();
services.AddSingleton<IProcessor, PendingMessageCheckProcessor>();
services.AddSingleton<IProcessor, PendingMessageServiceInstanceCheckProcessor>();
var provider = services.BuildServiceProvider();
IComteckReliableMessageBootstrap applicationBuilderConfigurator = provider.GetService(typeof(IComteckReliableMessageBootstrap)) as IComteckReliableMessageBootstrap;
applicationBuilderConfigurator.Initialize(services, reliableMessageOption.EventHandlerAssemblies, reliableMessageOption.EventCallbackAssemblies);
return services;
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
namespace Pole.ReliableMessage.Core
{
public abstract class Enumeration : IComparable
{
public string Name { get; private set; }
public int Id { get; private set; }
protected Enumeration(int id, string name)
{
Id = id;
Name = name;
}
public override string ToString() => Name;
public static IEnumerable<T> GetAll<T>() where T : Enumeration
{
var fields = typeof(T).GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly);
return fields.Select(f => f.GetValue(null)).Cast<T>();
}
public override bool Equals(object obj)
{
if (!(obj is Enumeration otherValue))
return false;
var typeMatches = GetType().Equals(obj.GetType());
var valueMatches = Id.Equals(otherValue.Id);
return typeMatches && valueMatches;
}
public override int GetHashCode() => Id.GetHashCode();
public static int AbsoluteDifference(Enumeration firstValue, Enumeration secondValue)
{
var absoluteDifference = Math.Abs(firstValue.Id - secondValue.Id);
return absoluteDifference;
}
public static T FromValue<T>(int value) where T : Enumeration
{
var matchingItem = Parse<T, int>(value, "value", item => item.Id == value);
return matchingItem;
}
public static T FromDisplayName<T>(string displayName) where T : Enumeration
{
var matchingItem = Parse<T, string>(displayName, "display name", item => item.Name == displayName);
return matchingItem;
}
private static T Parse<T, K>(K value, string description, Func<T, bool> predicate) where T : Enumeration
{
var matchingItem = GetAll<T>().FirstOrDefault(predicate);
if (matchingItem == null)
throw new InvalidOperationException($"'{value}' is not a valid {description} in {typeof(T)}");
return matchingItem;
}
public int CompareTo(object other) => Id.CompareTo(((Enumeration)other).Id);
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.AspNetCore.Builder;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage
{
class DefaultApplicationBuilderConfigurator : IApplicationBuilderConfigurator
{
private readonly List<Action<IApplicationBuilder>> _configs = new List<Action<IApplicationBuilder>>();
public void Add(Action<IApplicationBuilder> config)
{
_configs.Add(config);
}
public void Config(IApplicationBuilder applicationBuilder)
{
_configs.ForEach(m => {
m(applicationBuilder);
});
}
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage
{
class DefaultComteckReliableMessageBootstrap : IComteckReliableMessageBootstrap
{
private readonly IReliableEventHandlerFinder _reliableEventHandlerFinder;
private readonly IMessageBusConfigurator _messageBusConfigurator;
private readonly IMessageCallBackRegister _messageCallBackRegister;
private readonly IReliableEventCallBackFinder _reliableEventCallBackFinder;
public DefaultComteckReliableMessageBootstrap(IReliableEventHandlerFinder reliableEventHandlerFinder, IMessageBusConfigurator messageBusConfigurator, IMessageCallBackRegister messageCallBackRegister, IReliableEventCallBackFinder reliableEventCallBackFinder)
{
_reliableEventHandlerFinder = reliableEventHandlerFinder;
_messageBusConfigurator = messageBusConfigurator;
_messageCallBackRegister = messageCallBackRegister;
_reliableEventCallBackFinder = reliableEventCallBackFinder;
}
public async Task Initialize(IServiceCollection services, List<Assembly> eventHandlerAssemblies, List<Assembly> eventAssemblies)
{
var eventHandlers = _reliableEventHandlerFinder.FindAll(eventHandlerAssemblies);
await _messageBusConfigurator.Configure(services, eventHandlers);
var eventCallbacks = _reliableEventCallBackFinder.FindAll(eventAssemblies);
await _messageCallBackRegister.Register(eventCallbacks);
RegisterEventCallbacks(services, eventCallbacks);
}
private void RegisterEventCallbacks(IServiceCollection services, List<Type> eventCallbacks)
{
eventCallbacks.ForEach(m =>
{
services.AddScoped(m);
});
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage
{
class DefaultRetryTimeDelayCalculator : IRetryTimeDelayCalculator
{
public int Get(int retryTimes, int maxPendingMessageRetryDelay)
{
var retryTimeDelay = (int)Math.Pow(2, retryTimes + 1);
if (retryTimeDelay >= maxPendingMessageRetryDelay)
{
return maxPendingMessageRetryDelay;
}
return retryTimeDelay;
}
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ILogger = Microsoft.Extensions.Logging.ILogger;
namespace Pole.Pole.ReliableMessage.EventBus
{
public class DefaultReliableBus : IReliableBus
{
private readonly IMessageBus _messageBus;
private readonly IMessageStorage _messageStorage;
private readonly IMessageIdGenerator _messageIdGenerator;
private readonly ITimeHelper _timeHelper;
private readonly IMessageBuffer _messageBuffer;
private readonly ILogger _logger;
private readonly IJsonConverter _jsonConverter;
private readonly IMessageCallBackInfoStore _messageCallBackInfoStore;
private readonly IMessageTypeIdGenerator _messageTypeIdGenerator;
public DefaultReliableBus(IMessageBus messageBus, IMessageStorage messageStorage, IMessageIdGenerator messageIdGenerator, ITimeHelper timeHelper, IMessageBuffer messageBuffer, ILogger<DefaultReliableBus> logger, IJsonConverter jsonConverter, IMessageCallBackInfoStore messageCallBackInfoStore, IMessageTypeIdGenerator messageTypeIdGenerator)
{
_messageBus = messageBus;
_messageStorage = messageStorage;
_messageIdGenerator = messageIdGenerator;
_timeHelper = timeHelper;
_messageBuffer = messageBuffer;
_logger = logger;
_jsonConverter = jsonConverter;
_messageCallBackInfoStore = messageCallBackInfoStore;
_messageTypeIdGenerator = messageTypeIdGenerator;
}
public Task<bool> Cancel<TReliableEvent>(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default)
{
try
{
return _messageStorage.UpdateStatus(m => m.Id == prePublishMessageId, MessageStatus.Canced);
}
catch (Exception ex)
{
var errorInfo = $"Cancel PrePublish errors in defaultReliableBus;{ex.Message}";
_logger.LogError(ex, errorInfo);
throw new Exception(errorInfo, ex);
}
}
public async Task<string> PrePublish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default)
{
var messageTypeId = _messageTypeIdGenerator.Generate(typeof(TReliableEvent));
var currentMessageCallbackInfo = _messageCallBackInfoStore.Get(messageTypeId);
if (currentMessageCallbackInfo == null)
{
throw new Exception($"Current message type not registered ,messageTypeId:{messageTypeId}");
}
try
{
var messageId = _messageIdGenerator.Generate();
_logger.LogDebug($"PrePublish message begin ,messageId:{messageId}");
var now = _timeHelper.GetUTCNow();
var content = _jsonConverter.Serialize(@event);
var callBackParem = _jsonConverter.Serialize(callbackParemeter);
Message newMessage = new Message()
{
AddedUTCTime = now,
Content = content,
Id = messageId,
MessageStatusId = MessageStatus.Pending.Id,
MessageTypeId = messageTypeId,
RePushCallBackParameterValue = callBackParem,
NextRetryUTCTime = DateTime.MinValue
};
await _messageStorage.Add(newMessage);
_logger.LogDebug($"PrePublish message successful ,messageId:{messageId}");
return messageId;
}
catch (Exception ex)
{
var errorInfo = $"PrePublish errors in DefaultReliableBus;{ex.Message}";
_logger.LogError(ex, errorInfo);
throw new Exception(errorInfo, ex);
}
}
public async Task<bool> Publish<TReliableEvent>(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default)
{
try
{
await _messageBus.Publish(@event, prePublishMessageId, cancellationToken);
var messageBufferResult = await _messageBuffer.Add(new Message { Id = prePublishMessageId, MessageStatus = MessageStatus.Pushed });
return true;
}
catch (Exception ex)
{
var errorInfo = $"Publish errors in DefaultReliableBus;{ex.Message}";
_logger.LogError(ex, errorInfo);
throw new Exception(errorInfo, ex);
}
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
namespace Pole.ReliableMessage.EventBus
{
class DefaultReliableEventCallBackFinder : IReliableEventCallBackFinder
{
public List<Type> FindAll(IEnumerable<Assembly> assemblies)
{
var eventType = typeof(IReliableEventCallback);
var eventTypes = assemblies.SelectMany(m => m.GetTypes().Where(type => eventType.IsAssignableFrom(type)));
return eventTypes.ToList(); ;
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
namespace Pole.ReliableMessage.EventBus
{
public class DefaultReliableEventHandlerFinder : IReliableEventHandlerFinder
{
public List<Type> FindAll(IEnumerable<Assembly> assemblies)
{
var eventHandlerType = typeof(IReliableEventHandler);
var eventHandlerTypes = assemblies.SelectMany(m => m.GetTypes().Where(type => eventHandlerType.IsAssignableFrom(type)));
return eventHandlerTypes.ToList();
}
}
}
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage
{
public interface IReliableMessageOptionExtension
{
void AddServices(IServiceCollection services);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage
{
public class MemberShipTable
{
public string Id { get;private set; }
public MemberShipTable(string serviceName,string pendingMessageCheckerIp,DateTime iAmAliveUTCTime)
{
ServiceName = serviceName;
PendingMessageCheckerIp = pendingMessageCheckerIp;
IAmAliveUTCTime = iAmAliveUTCTime;
}
public string ServiceName { get;private set; }
public string PendingMessageCheckerIp { get; private set; }
public DateTime IAmAliveUTCTime { get; private set; }
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.EventBus;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Messaging.CallBack
{
class DefaultMessageCallBackInfoGenerator : IMessageCallBackInfoGenerator
{
private readonly IMessageTypeIdGenerator _messageTypeIdGenerator;
public DefaultMessageCallBackInfoGenerator(IMessageTypeIdGenerator messageTypeIdGenerator)
{
_messageTypeIdGenerator = messageTypeIdGenerator;
}
public MessageCallBackInfo Generate(Type eventCallbackType)
{
var @interface = eventCallbackType.GetInterfaces().FirstOrDefault();
Func<object, object, Task<bool>> deleg = MakeCallBackFunc(eventCallbackType, @interface);
var eventType = @interface.GetGenericArguments()[0];
var eventCallbackArguemntType = @interface.GetGenericArguments()[1];
var enentName = _messageTypeIdGenerator.Generate(eventType);
MessageCallBackInfo messageCallBackInfo = new MessageCallBackInfo(enentName, deleg, eventCallbackType, eventCallbackArguemntType, eventType);
return messageCallBackInfo;
}
private static Func<object, object, Task<bool>> MakeCallBackFunc(Type eventType, Type @interface)
{
var callbackParemeterType = @interface.GetGenericArguments()[1];
var argument = Expression.Parameter(typeof(object));
var paremeter = Expression.Parameter(typeof(object));
// var typedParemeter = Expression.Parameter(eventType);
var typedcallbackParemeter = Expression.Convert(argument, callbackParemeterType);
var typedParemeter = Expression.Convert(paremeter, eventType);
var callBackMethod = eventType.GetMethod("Callback");
var call = Expression.Call(typedParemeter, callBackMethod, typedcallbackParemeter);
//var innerParemeter = eventType.GetInterfaces().FirstOrDefault();
var lambda = Expression.Lambda<Func<object, object, Task<bool>>>(call, true, argument, paremeter);
var deleg = lambda.Compile();
return deleg;
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Messaging.CallBack
{
class DefaultMessageCallBackRegister : IMessageCallBackRegister
{
private readonly IMessageCallBackInfoGenerator _messageCallBackInfoGenerator;
private readonly IMessageCallBackInfoStore _messageCallBackInfoStore;
public DefaultMessageCallBackRegister(IMessageCallBackInfoGenerator messageCallBackInfoGenerator, IMessageCallBackInfoStore messageCallBackInfoStore)
{
_messageCallBackInfoGenerator = messageCallBackInfoGenerator;
_messageCallBackInfoStore = messageCallBackInfoStore;
}
public async Task Register(IEnumerable<Type> eventCallbackTypes)
{
foreach(var eventCallbackType in eventCallbackTypes)
{
var messageCallBackInfo = _messageCallBackInfoGenerator.Generate(eventCallbackType);
await _messageCallBackInfoStore.Add(messageCallBackInfo);
}
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Messaging.CallBack
{
public class MessageCallBackInfo
{
private Func<object, object, Task<bool>> _callBack;
public MessageCallBackInfo(string messageTypeId, Func<object, object, Task<bool>> callBack, Type eventCallbackType, Type eventCallbackArguemntType,Type eventType)
{
MessageTypeId = messageTypeId;
_callBack = callBack;
EventCallbackType = eventCallbackType;
EventCallbackArguemntType = eventCallbackArguemntType;
EventType = eventType;
}
public string MessageTypeId { get;private set; }
public Type EventType { get; private set; }
public Type EventCallbackType { get; private set; }
public Type EventCallbackArguemntType { get; private set; }
public Task<bool> Invoke(object parameter, object reliableEvent)
{
return _callBack(parameter, reliableEvent);
}
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Messaging.CallBack
{
public class MessageCallBackInfoInMemoryStore : Dictionary<string, MessageCallBackInfo>, IMessageCallBackInfoStore
{
private readonly ILogger<MessageCallBackInfoInMemoryStore> _logger;
public MessageCallBackInfoInMemoryStore(ILogger<MessageCallBackInfoInMemoryStore> logger)
{
_logger = logger;
}
public async Task Add(MessageCallBackInfo messageCallBackInfo)
{
await Task.CompletedTask;
if (TryGetValue(messageCallBackInfo.MessageTypeId, out MessageCallBackInfo info))
{
throw new Exception($"Add MessageCallBackInfo Faild , MessageCallBackInfo Already Added ,MessageTypeId:{messageCallBackInfo.MessageTypeId}");
}
Add(messageCallBackInfo.MessageTypeId, messageCallBackInfo);
_logger.LogDebug($"Add MessageCallBackInfo Success ,MessageTypeId:{messageCallBackInfo.MessageTypeId}");
}
public async Task<MessageCallBackInfo> Get(string messageTypeId)
{
await Task.CompletedTask;
if (TryGetValue(messageTypeId, out MessageCallBackInfo info))
{
return info;
}
return null;
}
}
}
using Pole.ReliableMessage.Core;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Messaging.CallBack
{
public class MessageCallBackResponseStatus : Enumeration
{
public static MessageCallBackResponseStatus Finised = new MessageCallBackResponseStatus(3, "已完成");
public static MessageCallBackResponseStatus Unfinished = new MessageCallBackResponseStatus(6, "未完成");
public static MessageCallBackResponseStatus Unusual = new MessageCallBackResponseStatus(9, "异常");
public MessageCallBackResponseStatus(int id, string name) : base(id, name)
{
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Messaging
{
class DefaultJsonConverter : IJsonConverter
{
public T Deserialize<T>(string json)
{
return System.Text.Json.JsonSerializer.Deserialize<T>(json);
}
public object Deserialize(string json, Type type)
{
return System.Text.Json.JsonSerializer.Deserialize(json, type);
}
public string Serialize(object obj)
{
return System.Text.Json.JsonSerializer.Serialize(obj);
}
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Messaging
{
class DefaultMessageBuffer : IMessageBuffer
{
private readonly IMessageStorage _storage;
private readonly System.Collections.Concurrent.ConcurrentDictionary<string,Message> Messages = new System.Collections.Concurrent.ConcurrentDictionary<string, Message>();
private readonly ILogger<DefaultMessageBuffer> _logger;
public DefaultMessageBuffer(IMessageStorage storage, ILogger<DefaultMessageBuffer> logger)
{
_storage = storage;
_logger = logger;
}
public async Task Flush()
{
/// 通过 MessageTypeId 是否为空 判断 消息是否为 DefaultReliableBus Publish 完成后的消息状态修改缓冲,
var toUpdateStatusMessageKeyValuePairs = Messages.Where(m => string.IsNullOrEmpty(m.Value.MessageTypeId));
var toUpdateStatusMessages= toUpdateStatusMessageKeyValuePairs.Select(m=>m.Value).ToArray();
var toUpdateStatusMessageIds = toUpdateStatusMessageKeyValuePairs.Select(m => m.Key).ToList();
await _storage.UpdateStatus(toUpdateStatusMessages);
_logger.LogDebug($"DefaultMessageBuffer.Flush update successfully, Message count{toUpdateStatusMessages.Count()}");
toUpdateStatusMessageIds.ForEach(m => {
Messages.TryRemove(m,out Message message);
});
var toSavedMessageKeyValuePairs = Messages.Where(m => !string.IsNullOrEmpty(m.Value.MessageTypeId));
var toSavedMessages = toSavedMessageKeyValuePairs.Select(m => m.Value).ToArray();
var toSavedMessagesIds = toSavedMessageKeyValuePairs.Select(m => m.Key).ToList();
await _storage.Save(toSavedMessages);
_logger.LogDebug($"DefaultMessageBuffer.Flush save successfully, Message count{toSavedMessages.Count()}");
toSavedMessagesIds.ForEach(m => {
Messages.TryRemove(m, out Message message);
});
}
public Task<bool> Add(Message message)
{
Messages.TryAdd(message.Id, message);
return Task.FromResult(true);
}
public async Task<List<Message>> GetAll(Func<Message, bool> filter)
{
await Task.CompletedTask;
return Messages.Values.Where(filter).ToList();
}
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Pole.ReliableMessage.Messaging.CallBack;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Messaging
{
class DefaultMessageChecker : IMessageChecker
{
private readonly IMessageCallBackInfoStore _messageCallBackInfoStore;
private readonly ILogger<DefaultMessageChecker> _logger;
private readonly IJsonConverter _jsonConverter;
private readonly IServiceProvider _serviceProvider;
public DefaultMessageChecker(IMessageCallBackInfoStore messageCallBackInfoStore, ILogger<DefaultMessageChecker> logger, IJsonConverter jsonConverter, IServiceProvider serviceProvider)
{
_messageCallBackInfoStore = messageCallBackInfoStore;
_logger = logger;
_jsonConverter = jsonConverter;
_serviceProvider = serviceProvider;
}
public async Task<MessageCheckerResult> GetResult(Message message)
{
try
{
var callBackInfo = await _messageCallBackInfoStore.Get(message.MessageTypeId);
if (callBackInfo == null)
{
_logger.LogError($"Can't find the callBackInfo, MessageTypeId:{message.MessageTypeId}");
return MessageCheckerResult.NotFinished;
}
using (var scope = _serviceProvider.CreateScope())
{
var callback = scope.ServiceProvider.GetRequiredService(callBackInfo.EventCallbackType);
var argument = _jsonConverter.Deserialize(message.RePushCallBackParameterValue, callBackInfo.EventCallbackArguemntType);
var result = await callBackInfo.Invoke(argument, callback);
if (_logger.IsEnabled(LogLevel.Debug))
{
var messageInfoDetail = _jsonConverter.Serialize(message);
_logger.LogDebug($"DefaultMessageChecker IsFinished Result:{result.ToString()},MessageTypeId:{message.MessageTypeId},MessageDetail:{messageInfoDetail}");
}
if (result)
{
var @event = _jsonConverter.Deserialize(message.Content, callBackInfo.EventType);
return new MessageCheckerResult(true, @event);
}
return MessageCheckerResult.NotFinished;
}
}
catch (Exception ex)
{
var messageInfoDetail = _jsonConverter.Serialize(message);
_logger.LogError(ex, $"DefaultMessageChecker.IsFinished Error, MessageTypeId:{message.MessageTypeId},MessageDetail:{messageInfoDetail}");
return MessageCheckerResult.NotFinished;
}
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Messaging
{
class DefaultMessageTypeIdGenerator : IMessageTypeIdGenerator
{
public string Generate(Type messageType)
{
return messageType.FullName;
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Messaging
{
public class Message : IComparable<Message>
{
/// <summary>
/// 这里id 永远为 string
/// </summary>
public string Id { get; set; }
/// <summary>
/// 消息状态
/// </summary>
public MessageStatus MessageStatus { get; set; }
/// <summary>
/// 消息状态Id
/// </summary>
public int MessageStatusId { get; set; }
/// <summary>
/// 预发送的时间
/// </summary>
public DateTime AddedUTCTime { get; set; }
/// <summary>
/// 用来存放 消息内容 目前没有大小限制 这个需要根据 实际情况 , mongodb 和 rabiitmq 的 综合指标来定 ,开发人员 在使用超大对象时需谨慎
/// </summary>
public string Content { get; set; }
/// <summary>
/// 消息的名称 用来鉴别不同的消息
/// </summary>
public string MessageTypeId { get; set; }
/// <summary>
/// 当前消息 回调者所需参数值
/// </summary>
public string RePushCallBackParameterValue { get; set; }
///// <summary>
///// 最后一次的重试时间
///// </summary>
//public DateTime LastRetryUTCTime { get; set; }
/// <summary>
/// 下一次的重试时间
/// </summary>
public DateTime NextRetryUTCTime { get; set; }
/// <summary>
/// 重试次数
/// </summary>
public int RetryTimes { get; set; } = 0;
public int CompareTo(Message other)
{
return Id.CompareTo(other.Id);
}
}
public class MessageIEqualityComparer : IEqualityComparer<Message>
{
public static MessageIEqualityComparer Default = new MessageIEqualityComparer();
public bool Equals(Message x, Message y)
{
return x.CompareTo(y) == 0;
}
public int GetHashCode(Message obj)
{
return obj.Id.GetHashCode();
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Messaging
{
public class MessageCheckerResult
{
public static MessageCheckerResult NotFinished = new MessageCheckerResult(false);
public MessageCheckerResult(bool isFinished,object @event):this(isFinished)
{
Event = @event;
}
public MessageCheckerResult(bool isFinished)
{
IsFinished = isFinished;
}
public bool IsFinished { get; set; }
public object Event { get; set; }
}
}
using Pole.ReliableMessage.Core;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Messaging
{
public class MessageStatus : Enumeration
{
public static MessageStatus Pending = new MessageStatus(3,"待发送");
public static MessageStatus Pushed = new MessageStatus(6,"已发送");
public static MessageStatus Canced = new MessageStatus(9,"已取消");
public static MessageStatus Handed = new MessageStatus(12, "已处理");
public MessageStatus(int id,string name ):base(id,name)
{
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<Folder Include="Filter\" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.0" />
<PackageReference Include="System.Text.Json" Version="4.7.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.ReliableMessage.Storage.Abstraction\Pole.ReliableMessage.Storage.Abstraction.csproj" />
</ItemGroup>
</Project>
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Processor
{
public class LoopProcessor : ProcessorBase
{
private IProcessor _processor;
private readonly ILoggerFactory _loggerFactory;
private readonly ITimeHelper _timeHelper;
public LoopProcessor(IProcessor processor, ILoggerFactory loggerFactory, ITimeHelper timeHelper)
{
_processor = processor;
_loggerFactory = loggerFactory;
_timeHelper = timeHelper;
}
public override string Name => "LoopProcessor";
public override async Task Process(ProcessingContext context)
{
var logger = _loggerFactory.CreateLogger<LoopProcessor>();
while (!context.IsStopping)
{
try
{
logger.LogDebug($"{_timeHelper.GetAppropriateFormatedDateString()}...{ this.ToString() } process start");
await _processor.Process(context);
logger.LogDebug($"{_timeHelper.GetAppropriateFormatedDateString()}...{ this.ToString() } process compelete");
}
catch (Exception ex)
{
logger.LogError(ex, $"{_timeHelper.GetAppropriateFormatedDateString()}...{ this.ToString() } process error");
}
}
}
public override string ToString()
{
var strArray = new string[2];
strArray[0] = Name;
strArray[1] = _processor.Name;
return string.Join("_", strArray);
}
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Processor
{
public class MessageBufferFlushProcessor : ProcessorBase
{
private readonly IMessageBuffer _messageBuffer;
private readonly ReliableMessageOption _options;
private readonly ILogger<MessageBufferFlushProcessor> _logger;
public MessageBufferFlushProcessor(IMessageBuffer messageBuffer, IOptions<ReliableMessageOption> options, ILogger<MessageBufferFlushProcessor> logger)
{
_messageBuffer = messageBuffer;
_options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected");
_logger = logger;
}
public override string Name => nameof(PendingMessageCheckProcessor);
public override async Task Process(ProcessingContext context)
{
try
{
await _messageBuffer.Flush();
}
catch(Exception ex)
{
_logger.LogError(ex, $"MessageBufferFlushProcessor Process Error");
}
finally
{
await Task.Delay(_options.PushedMessageFlushInterval * 1000);
}
}
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.ReliableMessage.Processor
{
class PendingMessageCheckProcessor : ProcessorBase
{
private readonly IMessageStorage _storage;
private readonly ReliableMessageOption _options;
private readonly IMessageBuffer _messageBuffer;
private readonly ITimeHelper _timeHelper;
private readonly IMessageChecker _messageChecker;
private readonly IRetryTimeDelayCalculator _retryTimeDelayCalculator;
private readonly IMemberShipTable _memberShipTable;
private readonly ILogger<PendingMessageCheckProcessor> _logger;
private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider;
private readonly IMessageBus _messageBus;
public PendingMessageCheckProcessor(IMessageStorage storage, IOptions<ReliableMessageOption> options, IMessageBuffer messageBuffer, ITimeHelper timeHelper, IMessageChecker messageChecker, IRetryTimeDelayCalculator retryTimeDelayCalculator, IMemberShipTable memberShipTable, ILogger<PendingMessageCheckProcessor> logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider, IMessageBus messageBus)
{
_storage = storage;
_options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected");
_messageBuffer = messageBuffer;
_timeHelper = timeHelper;
_messageChecker = messageChecker;
_retryTimeDelayCalculator = retryTimeDelayCalculator;
_memberShipTable = memberShipTable;
_logger = logger;
_serviceIPv4AddressProvider = serviceIPv4AddressProvider;
_messageBus = messageBus;
}
public override string Name => nameof(PendingMessageCheckProcessor);
public override async Task Process(ProcessingContext context)
{
try
{
var iPStr = _serviceIPv4AddressProvider.Get();
var isPendingChecker = await _memberShipTable.IsPendingMessageCheckerServiceInstance(iPStr);// 这里可以把时间加上去
if (!isPendingChecker)
{
_logger.LogDebug("I an not the PendingChecker ,Ignore check");
return;
}
var now = _timeHelper.GetUTCNow();
var pendingMessages = await _storage.GetMany(m => m.MessageStatusId == MessageStatus.Pending.Id &&m.NextRetryUTCTime <= now && m.AddedUTCTime <= now.AddSeconds(-1 * _options.PendingMessageFirstProcessingWaitTime)&&m.AddedUTCTime>= now.AddSeconds(-1 * _options.PendingMessageCheckingTimeOutSeconds),_options.PendingMessageCheckBatchCount);
var cachedPushedMessage = await _messageBuffer.GetAll(m => m.MessageStatus == MessageStatus.Pushed);
var finalToCheckedMessages = pendingMessages.Except(cachedPushedMessage, MessageIEqualityComparer.Default).ToList();
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"PendingMessageCheckProcessor pendingMessages count:{pendingMessages.Count}");
_logger.LogDebug($"PendingMessageCheckProcessor cachedPushedMessage count:{cachedPushedMessage.Count}");
_logger.LogDebug($"PendingMessageCheckProcessor finalToCheckedMessages count:{finalToCheckedMessages.Count}");
}
finalToCheckedMessages.AsParallel().ForAll(async m => await Retry(m, now));
}
catch (Exception ex)
{
_logger.LogError(ex, $"PendingMessageCheckProcessor Process Error");
}
finally
{
await Task.Delay(_options.PendingMessageRetryInterval * 1000);
}
}
private async Task Retry(Message message, DateTime retryTime)
{
try
{
await Task.CompletedTask;
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} begin Retry");
}
var nextRetryDelay = _retryTimeDelayCalculator.Get(message.RetryTimes, _options.MaxPendingMessageRetryDelay);
message.NextRetryUTCTime = retryTime.AddSeconds(nextRetryDelay);
if (retryTime > message.AddedUTCTime.AddSeconds(_options.PendingMessageTimeOut))
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Canced ,PendingMessageTimeOut:{_options.PendingMessageTimeOut}");
}
message.NextRetryUTCTime = DateTime.MinValue;
message.MessageStatus = MessageStatus.Canced;
await _messageBuffer.Add(message);
return;
}
message.RetryTimes++;
var messageCheckerResult = await _messageChecker.GetResult(message);
if (messageCheckerResult.IsFinished)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Pushed");
}
message.MessageStatus = MessageStatus.Pushed;
await _messageBus.Publish(messageCheckerResult.Event, message.Id);
}
else
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Retry next time");
}
}
await _messageBuffer.Add(message);
}
catch (Exception ex)
{
_logger.LogError(ex, $"PendingMessageCheckProcessor Retry ,Message:{message.Id} retry with errors");
}
}
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.ReliableMessage.Processor
{
class PendingMessageServiceInstanceCheckProcessor : ProcessorBase
{
private readonly ReliableMessageOption _options;
private readonly ITimeHelper _timeHelper;
private readonly IMemberShipTable _memberShipTable;
private readonly ILogger<PendingMessageServiceInstanceCheckProcessor> _logger;
private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider;
public PendingMessageServiceInstanceCheckProcessor(IOptions<ReliableMessageOption> options, ITimeHelper timeHelper, IMemberShipTable memberShipTable, ILogger<PendingMessageServiceInstanceCheckProcessor> logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider)
{
_options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected");
_timeHelper = timeHelper;
_memberShipTable = memberShipTable;
_logger = logger;
_serviceIPv4AddressProvider = serviceIPv4AddressProvider;
}
public override string Name => nameof(PendingMessageCheckProcessor);
public override async Task Process(ProcessingContext context)
{
try
{
var now = _timeHelper.GetUTCNow();
var iPStr = _serviceIPv4AddressProvider.Get();
_logger.LogDebug($"Current instance ip is {iPStr}");
var currentCheckIp = await _memberShipTable.GetPendingMessageCheckerServiceInstanceIp(now.AddSeconds(-1 * _options.PendingMessageCheckerInstanceIAnAliveTimeout));
if (string.IsNullOrEmpty(currentCheckIp))
{
var addInstanceResult = await _memberShipTable.AddCheckerServiceInstanceAndDeleteOthers(iPStr, now);
if (addInstanceResult)
{
_logger.LogInformation($"I am the PendingMessageCheck now, ip:{iPStr}");
return;
}
_logger.LogError($"There is no PendingMessageChecker ,I want to be the PendingMessageCheck ,but faild ,memberShipTable.AddCheckerServiceInstance faild , ip:{iPStr}");
return;
}
if (currentCheckIp == iPStr)
{
_logger.LogDebug($"Begin update pendingMessageChecker iAmAliveUTCTime");
await _memberShipTable.UpdateIAmAlive(currentCheckIp, now);
_logger.LogDebug($"Update pendingMessageChecker iAmAliveUTCTime successfully");
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"PendingMessageServiceInstanceCheckProcessor Process Error");
}
finally
{
await Task.Delay(_options.PendingMessageCheckerInstanceIAnAliveTimeUpdateDelay * 1000);
}
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace Pole.ReliableMessage.Processor
{
public class ProcessingContext
{
public ProcessingContext(CancellationToken cancellationToken)
{
CancellationToken = cancellationToken;
}
public CancellationToken CancellationToken { get; }
public bool IsStopping => CancellationToken.IsCancellationRequested;
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Processor
{
public abstract class ProcessorBase : IProcessor
{
public abstract string Name { get; }
public abstract Task Process(ProcessingContext context);
public override string ToString()
{
return Name;
}
}
}
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
namespace Pole.ReliableMessage
{
public class ReliableMessageOption
{
public List<IReliableMessageOptionExtension> ReliableMessageOptionExtensions { get; private set; } = new List<IReliableMessageOptionExtension>();
public List<Assembly> EventCallbackAssemblies { get; private set; }
public List<Assembly> EventHandlerAssemblies { get; private set; }
/// <summary>
/// 待发送消息重试间隔 单位 秒
/// </summary>
public int PendingMessageRetryInterval { get; set; } = 10;
/// <summary>
/// 待发送消息最大重试次数 , 0 为 无上限
/// </summary>
public int PendingMessageRetryTimes { get; set; } = 0;
/// <summary>
/// 预发送消息超时时间 单位 秒
/// </summary>
public int PendingMessageTimeOut { get; set; } = 10*60;
/// <summary>
/// 预发送消息检查时每一次获取的消息数量
/// </summary>
public int PendingMessageCheckBatchCount { get; set; } = 1000;
/// <summary>
/// 预发送消息状态检查最后时间 单位 秒
/// </summary>
public int PendingMessageCheckingTimeOutSeconds { get; set; } = 13*60;
/// <summary>
/// 已发送的消息缓冲区 flush to storage 的时间间隔 单位 秒
/// </summary>
public int PushedMessageFlushInterval { get; set; } = 2;
/// <summary>
/// PendingMessage 第一次处理等待时间 单位 秒
/// </summary>
public int PendingMessageFirstProcessingWaitTime { get; set; } = 2+10;
/// <summary>
/// 每次重试之间最大间隔 单位 秒
/// </summary>
public int MaxPendingMessageRetryDelay { get; set; } = 2 * 60;
/// <summary>
/// PendingMessageCheck 实例更新 存活时间的时间间隔 单位 秒
/// </summary>
public int PendingMessageCheckerInstanceIAnAliveTimeUpdateDelay { get; set; } = 10;
/// <summary>
/// PendingMessageCheck 实例存活超时时间 单位 秒
/// </summary>
public int PendingMessageCheckerInstanceIAnAliveTimeout { get; set; } = 3*10;
/// <summary>
/// 当主机有多个网络时通过指定网关地址找到合适的服务ip地址
/// </summary>
public string NetworkInterfaceGatewayAddress { get; set; } = string.Empty;
public ReliableMessageOption AddEventAssemblies (params Assembly [] assemblies)
{
EventCallbackAssemblies = assemblies.ToList();
return this;
}
public ReliableMessageOption AddEventAssemblies(IEnumerable<Assembly> assemblies)
{
EventCallbackAssemblies = assemblies.ToList();
return this;
}
public ReliableMessageOption AddEventHandlerAssemblies(params Assembly[] assemblies)
{
EventHandlerAssemblies = assemblies.ToList();
return this;
}
public ReliableMessageOption AddEventHandlerAssemblies(IEnumerable<Assembly> assemblies)
{
EventHandlerAssemblies = assemblies.ToList();
return this;
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Utils
{
class DefaulTimeHelper : ITimeHelper
{
public string GetAppropriateFormatedDateString()
{
return DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff");
}
public DateTime GetUTCNow()
{
return DateTime.UtcNow;
}
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.Text;
namespace Pole.ReliableMessage.Utils
{
class DefaultServiceIPv4AddressProvider : IServiceIPv4AddressProvider
{
private readonly ReliableMessageOption _options;
private string _ipAddress;
public DefaultServiceIPv4AddressProvider(IOptions<ReliableMessageOption> options)
{
_options = options.Value;
Init();
}
private void Init()
{
var gatewayAddress = _options.NetworkInterfaceGatewayAddress;
NetworkInterface networkInterface = null;
if (string.IsNullOrEmpty(_options.NetworkInterfaceGatewayAddress))
{
networkInterface = NetworkInterface.GetAllNetworkInterfaces()
.OrderByDescending(c => c.Speed)
.Where(m => m.NetworkInterfaceType != NetworkInterfaceType.Loopback && m.OperationalStatus == OperationalStatus.Up)
.FirstOrDefault();
}
else
{
networkInterface = NetworkInterface.GetAllNetworkInterfaces()
.OrderByDescending(c => c.Speed).Where(m => m.NetworkInterfaceType != NetworkInterfaceType.Loopback && m.OperationalStatus == OperationalStatus.Up).Where(m => m.GetIPProperties().GatewayAddresses.FirstOrDefault(c => c.Address.AddressFamily == AddressFamily.InterNetwork)?.Address.ToString() == gatewayAddress)
.FirstOrDefault();
}
if (networkInterface == null)
{
throw new Exception($"Not found correct NetworkInterface, option.NetworkInterfaceGatewayAddress:{gatewayAddress}");
}
var props = networkInterface.GetIPProperties();
// get first IPV4 address assigned to this interface
var firstIpV4Address = props.UnicastAddresses
.Where(c => c.Address.AddressFamily == AddressFamily.InterNetwork)
.Select(c => c.Address)
.FirstOrDefault();
_ipAddress = firstIpV4Address.ToString();
}
public string Get()
{
return _ipAddress;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment