diff --git a/Pole.sln b/Pole.sln
index 4adbd7f..1621bad 100644
--- a/Pole.sln
+++ b/Pole.sln
@@ -21,7 +21,23 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{4A0F
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "apis", "apis", "{475116FC-DEEC-4255-94E4-AE7B8C85038D}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Product.Api", "samples\apis\Product.Api\Product.Api.csproj", "{6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}"
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage", "src\Pole.ReliableMessage\Pole.ReliableMessage.csproj", "{699C75AB-4814-4E16-A3F3-9735C4C609FE}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Masstransit", "src\Pole.ReliableMessage.Masstransit\Pole.ReliableMessage.Masstransit.csproj", "{051BECA5-5E65-4FCB-9B7F-C9E64809E218}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Dashboard", "src\Pole.ReliableMessage.Dashboard\Pole.ReliableMessage.Dashboard.csproj", "{9EF13D70-C3AF-471A-8AA6-603338B194B7}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Storage.Abstraction", "src\Pole.ReliableMessage.Storage.Abstraction\Pole.ReliableMessage.Storage.Abstraction.csproj", "{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Storage.Mongodb", "src\Pole.ReliableMessage.Storage.Mongodb\Pole.ReliableMessage.Storage.Mongodb.csproj", "{793C73C6-93DE-4A56-B979-137914B247F2}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Storage.EntityframeworkCore", "src\Pole.ReliableMessage.Storage.EntityframeworkCore\Pole.ReliableMessage.Storage.EntityframeworkCore.csproj", "{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Grpc", "Grpc", "{629045E7-B047-452A-AADA-ACB455B4FAFD}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Rest", "Rest", "{BA122337-74D2-4439-A10E-B20E14881290}"
+EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "webs", "webs", "{452B9D9E-881E-4E0E-A90B-98F2253F20F1}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -49,10 +65,30 @@ Global
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Release|Any CPU.Build.0 = Release|Any CPU
- {6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}.Release|Any CPU.Build.0 = Release|Any CPU
+ {699C75AB-4814-4E16-A3F3-9735C4C609FE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {699C75AB-4814-4E16-A3F3-9735C4C609FE}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {699C75AB-4814-4E16-A3F3-9735C4C609FE}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {699C75AB-4814-4E16-A3F3-9735C4C609FE}.Release|Any CPU.Build.0 = Release|Any CPU
+ {051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9EF13D70-C3AF-471A-8AA6-603338B194B7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9EF13D70-C3AF-471A-8AA6-603338B194B7}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9EF13D70-C3AF-471A-8AA6-603338B194B7}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9EF13D70-C3AF-471A-8AA6-603338B194B7}.Release|Any CPU.Build.0 = Release|Any CPU
+ {3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Release|Any CPU.Build.0 = Release|Any CPU
+ {793C73C6-93DE-4A56-B979-137914B247F2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {793C73C6-93DE-4A56-B979-137914B247F2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {793C73C6-93DE-4A56-B979-137914B247F2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {793C73C6-93DE-4A56-B979-137914B247F2}.Release|Any CPU.Build.0 = Release|Any CPU
+ {805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -64,7 +100,15 @@ Global
{1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{475116FC-DEEC-4255-94E4-AE7B8C85038D} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB}
- {6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
+ {699C75AB-4814-4E16-A3F3-9735C4C609FE} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
+ {051BECA5-5E65-4FCB-9B7F-C9E64809E218} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
+ {9EF13D70-C3AF-471A-8AA6-603338B194B7} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
+ {3D92F460-350B-4614-A6F6-C00A2D0FA9E2} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
+ {793C73C6-93DE-4A56-B979-137914B247F2} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
+ {805CF4F7-CCDC-4390-A92B-55E0FFA7F659} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
+ {629045E7-B047-452A-AADA-ACB455B4FAFD} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
+ {BA122337-74D2-4439-A10E-B20E14881290} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
+ {452B9D9E-881E-4E0E-A90B-98F2253F20F1} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E}
diff --git a/src/Pole.ReliableMessage.Dashboard/Pole.ReliableMessage.Dashboard.csproj b/src/Pole.ReliableMessage.Dashboard/Pole.ReliableMessage.Dashboard.csproj
new file mode 100644
index 0000000..9f5c4f4
--- /dev/null
+++ b/src/Pole.ReliableMessage.Dashboard/Pole.ReliableMessage.Dashboard.csproj
@@ -0,0 +1,7 @@
+
+
+
+ netstandard2.0
+
+
+
diff --git a/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEvent.cs b/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEvent.cs
new file mode 100644
index 0000000..5f80e76
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEvent.cs
@@ -0,0 +1,10 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Masstransit.Abstraction
+{
+ public interface IReliableEvent
+ {
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEventHandlerRegistrarFactory.cs b/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEventHandlerRegistrarFactory.cs
new file mode 100644
index 0000000..eb09889
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEventHandlerRegistrarFactory.cs
@@ -0,0 +1,12 @@
+using Pole.ReliableMessage.EventBus;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Masstransit.Abstraction
+{
+ public interface IReliableEventHandlerRegistrarFactory
+ {
+ MasstransitEventHandlerRegistrar Create(Type eventHandlerType);
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerContext.cs b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerContext.cs
new file mode 100644
index 0000000..fbb2ec5
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerContext.cs
@@ -0,0 +1,21 @@
+using Pole.ReliableMessage.Abstraction;
+using MassTransit;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Masstransit
+{
+ public class DefaultReliableEventHandlerContext : IReliableEventHandlerContext
+ where TEvent : class
+ {
+ private readonly ConsumeContext _executeContext;
+ public DefaultReliableEventHandlerContext(ConsumeContext executeContext)
+ {
+ _executeContext = executeContext;
+ this.Event = executeContext.Message;
+ }
+ public TEvent Event { get; private set; }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs
new file mode 100644
index 0000000..e6877d3
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs
@@ -0,0 +1,76 @@
+using Pole.ReliableMessage.Core;
+using Pole.ReliableMessage.EventBus;
+using Pole.ReliableMessage.Masstransit.Abstraction;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Pole.ReliableMessage.Masstransit
+{
+ class DefaultReliableEventHandlerRegistrarFactory : IReliableEventHandlerRegistrarFactory
+ {
+ private readonly MasstransitRabbitmqOption _masstransitOptions;
+ public DefaultReliableEventHandlerRegistrarFactory(IOptions masstransitOptions)
+ {
+ _masstransitOptions = masstransitOptions.Value ?? throw new ArgumentNullException(nameof(masstransitOptions));
+ }
+ public MasstransitEventHandlerRegistrar Create(Type eventHnadler)
+ {
+
+ if (!eventHnadler.Name.EndsWith("EventHandler"))
+ {
+ throw new Exception("EventHandler Name Must EndWith EventHandler");
+ }
+ var reliableEventHandlerParemeterAttribute = eventHnadler.GetCustomAttributes(typeof(ReliableEventHandlerParemeterAttribute), true).FirstOrDefault();
+
+ var eventHandlerName = GetQueueName(reliableEventHandlerParemeterAttribute, eventHnadler, _masstransitOptions.QueueNamePrefix);
+
+ var parentEventHandler = eventHnadler.BaseType;
+ var eventType = parentEventHandler.GetGenericArguments().ToList().FirstOrDefault();
+
+ ushort prefetchCount = GetPrefetchCount(eventHnadler, reliableEventHandlerParemeterAttribute);
+
+ MasstransitEventHandlerRegistrar eventHandlerRegisterInvoker = new MasstransitEventHandlerRegistrar(eventHandlerName, eventHnadler, eventType, _masstransitOptions.RetryConfigure, prefetchCount);
+ return eventHandlerRegisterInvoker;
+ }
+
+ private string GetQueueName(object reliableEventHandlerParemeterAttribute, Type eventHnadler, string queueNamePrefix)
+ {
+ var eventHandlerDefaultName = $"eventHandler-{ eventHnadler.Name.Replace("EventHandler", "").ToLowerInvariant()}";
+ var eventHandlerName = string.IsNullOrEmpty(queueNamePrefix) ? eventHandlerDefaultName : $"{queueNamePrefix}-{eventHandlerDefaultName}";
+
+ if (reliableEventHandlerParemeterAttribute != null)
+ {
+ var reliableEventHandlerParemeterAttributeType = reliableEventHandlerParemeterAttribute.GetType();
+ var prefetchCountPropertyInfo = reliableEventHandlerParemeterAttributeType.GetProperty(nameof(ReliableEventHandlerParemeterAttribute.QueueHaType));
+ var queueHaTypeValue = Convert.ToInt32(prefetchCountPropertyInfo.GetValue(reliableEventHandlerParemeterAttribute));
+ if (queueHaTypeValue != 0)
+ {
+ var currentQueueType = Enumeration.FromValue(queueHaTypeValue);
+ eventHandlerName = currentQueueType.GenerateQueueName(eventHandlerName);
+ }
+ }
+
+ return eventHandlerName;
+ }
+
+ private ushort GetPrefetchCount(Type eventHnadler, object reliableEventHandlerParemeterAttribute)
+ {
+ var prefetchCount = _masstransitOptions.PrefetchCount;
+ if (reliableEventHandlerParemeterAttribute != null)
+ {
+ var reliableEventHandlerParemeterAttributeType = reliableEventHandlerParemeterAttribute.GetType();
+ var prefetchCountPropertyInfo = reliableEventHandlerParemeterAttributeType.GetProperty(nameof(ReliableEventHandlerParemeterAttribute.PrefetchCount));
+ var prefetchCountValue = Convert.ToUInt16(prefetchCountPropertyInfo.GetValue(reliableEventHandlerParemeterAttribute));
+ if (prefetchCountValue != 0)
+ {
+ prefetchCount = prefetchCountValue;
+ }
+ }
+
+ return prefetchCount;
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/MassTransitHostedService.cs b/src/Pole.ReliableMessage.Masstransit/MassTransitHostedService.cs
new file mode 100644
index 0000000..d819701
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/MassTransitHostedService.cs
@@ -0,0 +1,33 @@
+using MassTransit;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Masstransit
+{
+ public class MassTransitHostedService : IHostedService
+ {
+ readonly IBusControl _bus;
+ readonly ILogger _logger;
+ public MassTransitHostedService(IBusControl bus, ILogger logger)
+ {
+ _bus = bus;
+ _logger = logger;
+ }
+ public async Task StartAsync(CancellationToken cancellationToken)
+ {
+ await _bus.StartAsync();
+ _logger.LogInformation("MassTransit Bus Start Successful");
+ }
+
+ public async Task StopAsync(CancellationToken cancellationToken)
+ {
+ await _bus.StopAsync();
+ _logger.LogInformation("MassTransit Bus Stop Successful");
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs
new file mode 100644
index 0000000..3f9665e
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs
@@ -0,0 +1,24 @@
+using Pole.ReliableMessage.Abstraction;
+using Pole.ReliableMessage.Masstransit.Pipe;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Masstransit
+{
+ class MasstransitBasedMessageBus : IMessageBus
+ {
+ public MasstransitBasedMessageBus(MassTransit.IBus bus)
+ {
+ _bus = bus;
+ }
+ private readonly MassTransit.IBus _bus;
+ public Task Publish(object @event,string reliableMessageId, CancellationToken cancellationToken = default(CancellationToken))
+ {
+ var pipe = new AddReliableMessageIdPipe(reliableMessageId);
+ return _bus.Publish(@event, pipe, cancellationToken);
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitEventHandlerRegistrar.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitEventHandlerRegistrar.cs
new file mode 100644
index 0000000..a1ea419
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/MasstransitEventHandlerRegistrar.cs
@@ -0,0 +1,46 @@
+using GreenPipes;
+using GreenPipes.Configurators;
+using MassTransit;
+using MassTransit.ExtensionsDependencyInjectionIntegration;
+using MassTransit.RabbitMqTransport;
+using Microsoft.Extensions.DependencyInjection;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.EventBus
+{
+ public class MasstransitEventHandlerRegistrar
+ {
+ private readonly string _queueName;
+ private readonly Type _eventHandlerType;
+ private readonly Type _eventHandlerEventType;
+ private readonly Action _retryConfigure;
+ public readonly ushort _prefetchCount;
+ public MasstransitEventHandlerRegistrar(string eventHandlerName, Type eventHandlerType, Type eventHandlerEventType, Action retryConfigure, ushort prefetchCount)
+ {
+ _queueName = eventHandlerName;
+ _eventHandlerType = eventHandlerType;
+ _eventHandlerEventType = eventHandlerEventType;
+ _retryConfigure = retryConfigure;
+ _prefetchCount = prefetchCount;
+ }
+ public void RegisterEventHandler(IServiceCollectionConfigurator serviceCollectionConfigurator, IServiceCollection services)
+ {
+ serviceCollectionConfigurator.AddConsumer(_eventHandlerType);
+ }
+ public void RegisterQueue(IServiceCollectionConfigurator serviceCollectionConfigurator, IRabbitMqBusFactoryConfigurator rabbitMqBusFactoryConfigurator, IRabbitMqHost rabbitMqHost, IServiceProvider serviceProvider)
+ {
+
+ //serviceCollectionConfigurator.AddConsumer(_eventHandlerType);
+
+ rabbitMqBusFactoryConfigurator.ReceiveEndpoint(_queueName, conf =>
+ {
+ //conf.Consumer()
+ conf.ConfigureConsumer(serviceProvider, _eventHandlerType);
+ conf.PrefetchCount = _prefetchCount;
+ conf.UseMessageRetry(_retryConfigure);
+ });
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitMessageBusConfigurator.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitMessageBusConfigurator.cs
new file mode 100644
index 0000000..b2f1742
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/MasstransitMessageBusConfigurator.cs
@@ -0,0 +1,59 @@
+using Pole.ReliableMessage.Abstraction;
+using Pole.ReliableMessage.EventBus;
+using Pole.ReliableMessage.Masstransit.Abstraction;
+using MassTransit;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Masstransit
+{
+ class MasstransitMessageBusConfigurator : IMessageBusConfigurator
+ {
+ private readonly IReliableEventHandlerRegistrarFactory _reliableEventHandlerRegistrarFactory;
+ private readonly MasstransitRabbitmqOption _options;
+ public MasstransitMessageBusConfigurator(IReliableEventHandlerRegistrarFactory reliableEventHandlerRegistrarFactory, IOptions options)
+ {
+ _reliableEventHandlerRegistrarFactory = reliableEventHandlerRegistrarFactory;
+ _options = options.Value;
+ }
+ public async Task Configure(IServiceCollection services,IEnumerable eventHandlerTypes)
+ {
+ await Task.CompletedTask;
+ var eventHandlerRegistrars = GetEventHandlerRegistrars(eventHandlerTypes).ToList();
+ services.AddMassTransit(x =>
+ {
+ foreach (var eventHandlerRegistrar in eventHandlerRegistrars)
+ {
+ eventHandlerRegistrar.RegisterEventHandler(x, services);
+ }
+ x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
+ {
+ var host = cfg.Host(new Uri(_options.RabbitMqHostAddress), h =>
+ {
+ h.Username(_options.RabbitMqHostUserName);
+ h.Password(_options.RabbitMqHostPassword);
+
+ });
+ foreach (var eventHandlerRegistrar in eventHandlerRegistrars)
+ {
+ eventHandlerRegistrar.RegisterQueue(x, cfg, host, provider);
+ }
+ }));
+ });
+ }
+ private IEnumerable GetEventHandlerRegistrars(IEnumerable eventHandlerTypes)
+ {
+ foreach (var eventHandler in eventHandlerTypes)
+ {
+ var model = _reliableEventHandlerRegistrarFactory.Create(eventHandler);
+ yield return model;
+ }
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs
new file mode 100644
index 0000000..ab955d1
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs
@@ -0,0 +1,29 @@
+using GreenPipes;
+using GreenPipes.Configurators;
+using Microsoft.Extensions.DependencyInjection;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Masstransit
+{
+ public class MasstransitRabbitmqOption
+ {
+ public string RabbitMqHostAddress { get; set; }
+ public string RabbitMqHostUserName { get; set; }
+ public string RabbitMqHostPassword { get; set; }
+ public string QueueNamePrefix { get; set; } = string.Empty;
+ ///
+ /// 4 个并发
+ ///
+ public ushort PrefetchCount { get; set; } = 4;
+
+ public Action RetryConfigure { get; set; } =
+ r => r.Intervals(TimeSpan.FromSeconds(0.1)
+ , TimeSpan.FromSeconds(1)
+ , TimeSpan.FromSeconds(4)
+ , TimeSpan.FromSeconds(16)
+ , TimeSpan.FromSeconds(64)
+ );
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs
new file mode 100644
index 0000000..c9c47f0
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs
@@ -0,0 +1,65 @@
+using Pole.ReliableMessage.Abstraction;
+using Pole.ReliableMessage.Masstransit.Pipe;
+using Pole.ReliableMessage.Messaging;
+using MassTransit;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Masstransit
+{
+
+ public abstract class ReliableEventHandler : IReliableEventHandler,IConsumer
+ where TEvent : class
+ {
+ private readonly IMessageStorage _messageStorage;
+ private readonly ILogger> _logger;
+ private readonly IServiceProvider _serviceProvider;
+ public ReliableEventHandler(IServiceProvider serviceProvider)
+ {
+ _messageStorage = serviceProvider.GetRequiredService(typeof(IMessageStorage)) as IMessageStorage;
+ var loggerFactory = serviceProvider.GetRequiredService(typeof(ILoggerFactory)) as ILoggerFactory;
+ _logger = loggerFactory.CreateLogger>();
+ _serviceProvider = serviceProvider;
+ }
+
+ public abstract Task Handle(IReliableEventHandlerContext context);
+ public async Task Consume(ConsumeContext context)
+ {
+ var messageId = GetReliableMessageId(context);
+ if (_logger.IsEnabled(LogLevel.Debug))
+ {
+ var jsonConveter = _serviceProvider.GetRequiredService(typeof(IJsonConverter)) as IJsonConverter;
+ var json = jsonConveter.Serialize(context.Message);
+ _logger.LogDebug($"Message Begin Handle,messageId:{messageId}, message content :{json}");
+ }
+
+ var retryAttempt = context.GetRetryAttempt();
+ if (retryAttempt == 0)
+ {
+ if (string.IsNullOrEmpty(messageId))
+ {
+ _logger.LogWarning($"Message has no ReliableMessageId, ignore");
+ return;
+ }
+ var isHandled = !await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed);
+ if (isHandled)
+ {
+ _logger.LogTrace($"This message has handled begore ReliableMessageId:{messageId}, ignore");
+ return;
+ }
+ }
+ await Handle(new DefaultReliableEventHandlerContext(context));
+
+ _logger.LogDebug($"Message handled successfully ,messageId:{messageId}");
+ }
+
+ private string GetReliableMessageId(ConsumeContext context)
+ {
+ return context.Headers.Get(AddReliableMessageIdPipe.RELIABLE_MESSAGE_ID, string.Empty);
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/Messaging/DefaultMessageIdGenerator.cs b/src/Pole.ReliableMessage.Masstransit/Messaging/DefaultMessageIdGenerator.cs
new file mode 100644
index 0000000..20b50e1
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/Messaging/DefaultMessageIdGenerator.cs
@@ -0,0 +1,16 @@
+using Pole.ReliableMessage.Abstraction;
+using MassTransit;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Masstransit.Messaging
+{
+ class DefaultMessageIdGenerator : IMessageIdGenerator
+ {
+ public string Generate()
+ {
+ return NewId.Next().ToString("N").ToLowerInvariant();
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/Pipe/AddReliableMessageIdPipe.cs b/src/Pole.ReliableMessage.Masstransit/Pipe/AddReliableMessageIdPipe.cs
new file mode 100644
index 0000000..594bd58
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/Pipe/AddReliableMessageIdPipe.cs
@@ -0,0 +1,29 @@
+using GreenPipes;
+using MassTransit;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Masstransit.Pipe
+{
+ class AddReliableMessageIdPipe : IPipe
+ {
+ public const string RELIABLE_MESSAGE_ID = "ReliableMessageId";
+ private readonly string _reliableMessageId;
+ public AddReliableMessageIdPipe(string reliableMessageId)
+ {
+ _reliableMessageId = reliableMessageId;
+ }
+ public void Probe(ProbeContext context)
+ {
+
+ }
+
+ public async Task Send(PublishContext context)
+ {
+ context.Headers.Set(RELIABLE_MESSAGE_ID, _reliableMessageId);
+ await Task.CompletedTask;
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/Pole.ReliableMessage.Masstransit.csproj b/src/Pole.ReliableMessage.Masstransit/Pole.ReliableMessage.Masstransit.csproj
new file mode 100644
index 0000000..e4c922f
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/Pole.ReliableMessage.Masstransit.csproj
@@ -0,0 +1,17 @@
+
+
+
+ netstandard2.0
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs b/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs
new file mode 100644
index 0000000..6914c6c
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs
@@ -0,0 +1,32 @@
+using Pole.ReliableMessage.Core;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Masstransit
+{
+ public class QueueHaType : Enumeration
+ {
+ public static QueueHaType None = new QueueHaType(1, "无",string.Empty);
+ public static QueueHaType Default = new QueueHaType(2, "默认高可用","Rmd.");
+ public static QueueHaType Backlog = new QueueHaType(3, "消息可积压","Rmb.");
+ private readonly string _queuePrefix;
+ public QueueHaType(int id, string name) : base(id, name)
+ {
+ }
+ public QueueHaType(int id, string name,string prefix) : this(id, name)
+ {
+ _queuePrefix = prefix;
+ }
+ public string GenerateQueueName(string rawQueueName)
+ {
+ return string.Concat(_queuePrefix, rawQueueName);
+ }
+ }
+ public enum QueueHaTypeEnum:int
+ {
+ None = 1,
+ Default = 2,
+ Backlog = 3
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/ReliableEventHandlerParemeterAttribute.cs b/src/Pole.ReliableMessage.Masstransit/ReliableEventHandlerParemeterAttribute.cs
new file mode 100644
index 0000000..e1e6d8e
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/ReliableEventHandlerParemeterAttribute.cs
@@ -0,0 +1,15 @@
+using GreenPipes.Configurators;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Masstransit
+{
+ [AttributeUsage(AttributeTargets.Class)]
+ public class ReliableEventHandlerParemeterAttribute : Attribute
+ {
+ public ushort PrefetchCount { get; set; }
+ public QueueHaTypeEnum QueueHaType { get; set; } = QueueHaTypeEnum.Default;
+
+ }
+}
diff --git a/src/Pole.ReliableMessage.Masstransit/ReliableMessageOptionExtension.cs b/src/Pole.ReliableMessage.Masstransit/ReliableMessageOptionExtension.cs
new file mode 100644
index 0000000..5b05c2e
--- /dev/null
+++ b/src/Pole.ReliableMessage.Masstransit/ReliableMessageOptionExtension.cs
@@ -0,0 +1,38 @@
+using Pole.ReliableMessage;
+using Pole.ReliableMessage.Abstraction;
+using Pole.ReliableMessage.Masstransit;
+using Pole.ReliableMessage.Masstransit.Abstraction;
+using Microsoft.Extensions.DependencyInjection;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Pole.ReliableMessage.Masstransit.Messaging;
+
+namespace Microsoft.Extensions.DependencyInjection
+{
+ public static class ReliableMessageOptionExtension
+ {
+ public static ReliableMessageOption AddMasstransitRabbitmq(this ReliableMessageOption option, Action optionConfig)
+ {
+ option.ReliableMessageOptionExtensions.Add(new MasstransitRabbitmqExtension(optionConfig));
+ return option;
+ }
+ }
+ public class MasstransitRabbitmqExtension : IReliableMessageOptionExtension
+ {
+ private readonly Action _masstransitRabbitmqOption;
+ public MasstransitRabbitmqExtension(Action masstransitRabbitmqOption)
+ {
+ _masstransitRabbitmqOption = masstransitRabbitmqOption;
+ }
+ public void AddServices(IServiceCollection services)
+ {
+ services.Configure(_masstransitRabbitmqOption);
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddHostedService();
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs b/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs
new file mode 100644
index 0000000..9f9b1ed
--- /dev/null
+++ b/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs
@@ -0,0 +1,22 @@
+using System;
+using System.Collections.Generic;
+using System.Net;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Storage.Abstraction
+{
+ public interface IMemberShipTable
+ {
+ Task IsPendingMessageCheckerServiceInstance(string ipAddress);
+ Task UpdateIAmAlive(string ipAddress, DateTime dateTime);
+ ///
+ /// 如果当前 超时时间内 没有可用 实例 返回 空
+ ///
+ ///
+ ///
+ Task GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime);
+
+ Task AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime);
+ }
+}
diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj b/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj
new file mode 100644
index 0000000..9f5c4f4
--- /dev/null
+++ b/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj
@@ -0,0 +1,7 @@
+
+
+
+ netstandard2.0
+
+
+
diff --git a/src/Pole.ReliableMessage.Storage.EntityframeworkCore/Pole.ReliableMessage.Storage.EntityframeworkCore.csproj b/src/Pole.ReliableMessage.Storage.EntityframeworkCore/Pole.ReliableMessage.Storage.EntityframeworkCore.csproj
new file mode 100644
index 0000000..9ef9d54
--- /dev/null
+++ b/src/Pole.ReliableMessage.Storage.EntityframeworkCore/Pole.ReliableMessage.Storage.EntityframeworkCore.csproj
@@ -0,0 +1,11 @@
+
+
+
+ netstandard2.0
+
+
+
+
+
+
+
diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongoHost.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongoHost.cs
new file mode 100644
index 0000000..33f4d32
--- /dev/null
+++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongoHost.cs
@@ -0,0 +1,19 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Storage.Mongodb
+{
+ public sealed class MongoHost
+ {
+ ///
+ /// 主机或者IP地址
+ ///
+ public string Host { get; set; }
+
+ ///
+ /// 端口号
+ ///
+ public int Port { get; set; }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs
new file mode 100644
index 0000000..72f8ee4
--- /dev/null
+++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs
@@ -0,0 +1,94 @@
+using Pole.ReliableMessage.Abstraction;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using MongoDB.Driver;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Pole.ReliableMessage.Storage.Abstraction;
+
+namespace Pole.ReliableMessage.Storage.Mongodb
+{
+ class MongodbMemberShipTable : IMemberShipTable
+ {
+ private readonly MongoClient _mongoClient;
+ private readonly MongodbOption _mongodbOption;
+ private readonly ILogger _logger;
+ public MongodbMemberShipTable(IConfiguration configuration, MongoClient mongoClient, IOptions mongodbOption, ILogger logger)
+ {
+ _mongoClient = mongoClient;
+ _mongodbOption = mongodbOption.Value;
+ _logger = logger;
+ }
+ private IMongoDatabase GetActiveMessageDatabase(string activeMessageDatabase)
+ {
+ return _mongoClient.GetDatabase(activeMessageDatabase);
+ }
+ private IMongoCollection GetCollection()
+ {
+ var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName);
+ var messageCollectionName = _mongodbOption.MembershipCollectionName;
+ var collection = database.GetCollection(messageCollectionName);
+ return collection;
+ }
+ public async Task AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime)
+ {
+ var collection = GetCollection();
+ var deleteResult = await collection.DeleteManyAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName);
+ MemberShipTable memberShipTable = new MemberShipTable(_mongodbOption.ServiceCollectionName, ipAddress, aliveUTCTime);
+ await collection.InsertOneAsync(memberShipTable);
+ return true;
+ }
+
+ public async Task GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime)
+ {
+ var collection = GetCollection();
+
+ var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.IAmAliveUTCTime >= iamAliveEndTime)).ToList();
+ if (instances.Count > 1)
+ {
+ _logger.LogInformation($"Current time have {instances.Count} PendingMessageChecker in {_mongodbOption.ServiceCollectionName} service , I will delete the extra instances");
+ var currentInstance = instances.FirstOrDefault();
+ var extraInstances = instances.Remove(currentInstance);
+ instances.ForEach(async n =>
+ {
+ await collection.DeleteOneAsync(m => m.Id == n.Id);
+ });
+ _logger.LogInformation($"Extra PendingMessageChecker instances in {_mongodbOption.ServiceCollectionName} service deleted successfully");
+ return currentInstance.PendingMessageCheckerIp;
+ }
+ else if (instances.Count == 1)
+ {
+ return instances.FirstOrDefault().PendingMessageCheckerIp;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public async Task IsPendingMessageCheckerServiceInstance(string ipAddress)
+ {
+ var collection = GetCollection();
+
+ var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp== ipAddress)).FirstOrDefault();
+ if (instances != null)
+ {
+ return true;
+ }
+ return false;
+ }
+
+ public async Task UpdateIAmAlive(string ipAddress,DateTime dateTime)
+ {
+ var collection = GetCollection();
+ var filter = Builders.Filter.Where(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp == ipAddress);
+ var update = Builders.Update.Set(m=>m.IAmAliveUTCTime,dateTime);
+ var result = await collection.UpdateOneAsync(filter, update);
+ return true;
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs
new file mode 100644
index 0000000..9f39701
--- /dev/null
+++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs
@@ -0,0 +1,25 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Storage.Mongodb
+{
+ public class MongodbOption
+ {
+ public string MessageDatabaseName { get; set; } = "ReliableMessage";
+ public string MembershipCollectionName { get; set; } = "Membership";
+ ///
+ /// bucket 中最大消息数 一旦达到最大数量 后面的数据将覆盖前面的数据
+ ///
+ public long CollectionMaxMessageCount { get; set; } = 20000000;
+
+ ///
+ /// 默认最大为10G
+ ///
+ public long CollectionMaxSize { get; set; } = 10*1024*1024*1024L;
+
+ public string ServiceCollectionName { get; set; }
+ public MongoHost[] Servers { get; set; }
+ }
+
+}
diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs
new file mode 100644
index 0000000..a22e5af
--- /dev/null
+++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs
@@ -0,0 +1,140 @@
+using Pole.ReliableMessage.Abstraction;
+using Pole.ReliableMessage.Messaging;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using MongoDB.Driver;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Linq.Expressions;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Storage.Mongodb
+{
+ class MongodbMessageStorage : IMessageStorage
+ {
+ private readonly MongoClient _mongoClient;
+ private readonly MongodbOption _mongodbOption;
+ private readonly ILogger _logger;
+ public MongodbMessageStorage(MongoClient mongoClient, IOptions mongodbOption, ILogger logger)
+ {
+ _mongoClient = mongoClient;
+ _mongodbOption = mongodbOption.Value;
+ _logger = logger;
+ }
+ private IMongoDatabase GetActiveMessageDatabase(string messageDatabase)
+ {
+ return _mongoClient.GetDatabase(messageDatabase);
+ }
+ private IMongoCollection GetCollection()
+ {
+ var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName);
+ var messageCollectionName = _mongodbOption.ServiceCollectionName;
+ var collection = database.GetCollection(messageCollectionName);
+ return collection;
+ }
+ public async Task Add(Message message)
+ {
+ IMongoCollection collection = GetCollection();
+
+ await collection.InsertOneAsync(message);
+ return true;
+ }
+
+ public async Task CheckAndUpdateStatus(Expression> filter, MessageStatus messageStatus)
+ {
+ IMongoCollection collection = GetCollection();
+
+ var update = Builders.Update.Set(m => m.MessageStatusId, messageStatus.Id);
+ var beforeDoc = await collection.FindOneAndUpdateAsync(filter, update, new FindOneAndUpdateOptions() { ReturnDocument = ReturnDocument.Before });
+ if (beforeDoc.MessageStatusId == messageStatus.Id)
+ {
+ return false;
+ }
+ return true;
+ }
+
+ public async Task> GetMany(Expression> filter,int count)
+ {
+ IMongoCollection collection = GetCollection();
+
+ var list= await collection.Find(filter).Limit(count).ToListAsync();
+ list.ForEach(m =>
+ {
+ m.MessageStatus = Core.Enumeration.FromValue(m.MessageStatusId);
+ });
+ return list;
+ }
+
+ public async Task Save(IEnumerable messages)
+ {
+ var count = messages.Count();
+ _logger.LogDebug($"MongodbMessageStorage Save begin, Messages count: {messages.Count()}");
+ if (count == 0)
+ {
+ _logger.LogDebug($"MongodbMessageStorage Save successfully, saved count: 0");
+ return true;
+ }
+ IMongoCollection collection = GetCollection();
+
+ var models = new List>();
+ foreach (var message in messages)
+ {
+ FilterDefinition filter = Builders.Filter.Where(m => m.Id == message.Id && m.MessageStatusId != MessageStatus.Handed.Id);
+ UpdateDefinition update = Builders.Update
+ .Set(m => m.MessageStatusId, message.MessageStatus.Id)
+ .Set(m => m.RetryTimes, message.RetryTimes)
+ .Set(m => m.NextRetryUTCTime, message.NextRetryUTCTime);
+
+ var model = new UpdateOneModel(filter, update);
+ models.Add(model);
+ }
+ var result = await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = false });
+
+ _logger.LogDebug($"MongodbMessageStorage Save successfully, saved count: {result.ModifiedCount}");
+
+ return result.IsAcknowledged;
+ }
+
+ public async Task UpdateStatus(IEnumerable messages)
+ {
+ var count = messages.Count();
+ _logger.LogDebug($"MongodbMessageStorage updateStatus begin, Messages count: {messages.Count()}");
+ if (count == 0)
+ {
+ _logger.LogDebug($"MongodbMessageStorage updateStatus successfully, Modified count: 0");
+ return true;
+ }
+ IMongoCollection collection = GetCollection();
+
+ var models = new List>();
+
+ foreach (var message in messages)
+ {
+ FilterDefinition filter = Builders.Filter.Where(m => m.Id == message.Id&&m.MessageStatusId!=MessageStatus.Handed.Id);
+ UpdateDefinition update = Builders.Update
+ .Set(m => m.MessageStatusId, message.MessageStatus.Id);
+
+ var model = new UpdateOneModel(filter, update);
+ models.Add(model);
+ }
+
+ var result = await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = false });
+
+ _logger.LogDebug($"MongodbMessageStorage updateStatus successfully, Modified count: {result.ModifiedCount}");
+
+ return result.IsAcknowledged;
+ }
+
+ public async Task UpdateStatus(Expression> filter, MessageStatus messageStatus)
+ {
+ IMongoCollection collection = GetCollection();
+
+ var update = Builders.Update.Set(m => m.MessageStatusId, messageStatus.Id);
+ var result = await collection.UpdateOneAsync(filter, update);
+ return result.IsAcknowledged;
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/Pole.ReliableMessage.Storage.Mongodb.csproj b/src/Pole.ReliableMessage.Storage.Mongodb/Pole.ReliableMessage.Storage.Mongodb.csproj
new file mode 100644
index 0000000..8ded00e
--- /dev/null
+++ b/src/Pole.ReliableMessage.Storage.Mongodb/Pole.ReliableMessage.Storage.Mongodb.csproj
@@ -0,0 +1,19 @@
+
+
+
+ netstandard2.0
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs b/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs
new file mode 100644
index 0000000..781bd2e
--- /dev/null
+++ b/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs
@@ -0,0 +1,128 @@
+using Pole.ReliableMessage;
+using Pole.ReliableMessage.Abstraction;
+using Pole.ReliableMessage.Messaging;
+using Pole.ReliableMessage.Storage.Mongodb;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using MongoDB.Bson;
+using MongoDB.Bson.Serialization;
+using MongoDB.Bson.Serialization.IdGenerators;
+using MongoDB.Driver;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Pole.ReliableMessage.Storage.Abstraction;
+
+namespace Microsoft.Extensions.DependencyInjection
+{
+ public static class ReliableMessageOptionExtension
+ {
+ public static ReliableMessageOption AddMongodb(this ReliableMessageOption option, Action mongodbOptionConfig)
+ {
+ option.ReliableMessageOptionExtensions.Add(new MongodbStorageExtension(mongodbOptionConfig));
+ return option;
+ }
+ }
+ public class MongodbStorageExtension : IReliableMessageOptionExtension
+ {
+ private readonly Action _mongodbOption;
+ public MongodbStorageExtension(Action masstransitRabbitmqOption)
+ {
+ _mongodbOption = masstransitRabbitmqOption;
+ }
+ public void AddServices(IServiceCollection services)
+ {
+ services.Configure(_mongodbOption);
+ services.AddSingleton();
+ services.AddSingleton();
+
+ var mongodbOption = services.BuildServiceProvider().GetRequiredService>().Value;
+
+ var servers = mongodbOption.Servers.Select(x => new MongoServerAddress(x.Host, x.Port)).ToList();
+ var settings = new MongoClientSettings()
+ {
+ Servers = servers
+ };
+ var client = new MongoClient(settings);
+ var database = client.GetDatabase(mongodbOption.MessageDatabaseName);
+
+ AddMapper();
+
+ InitCollection(mongodbOption, database);
+
+ services.AddSingleton(client);
+ }
+
+ private static void InitCollection(MongodbOption mongodbOption, IMongoDatabase database)
+ {
+ var collectionNames = database.ListCollectionNames().ToList();
+
+ if (!collectionNames.Contains(mongodbOption.ServiceCollectionName))
+ {
+ database.CreateCollection(mongodbOption.ServiceCollectionName, new CreateCollectionOptions
+ {
+ Capped = true,
+ MaxDocuments = mongodbOption.CollectionMaxMessageCount,
+ MaxSize = mongodbOption.CollectionMaxSize,
+
+ });
+ var messageCollection = database.GetCollection(mongodbOption.ServiceCollectionName);
+ AddMessageCollectionIndex(messageCollection);
+ }
+
+ if (!collectionNames.Contains(mongodbOption.MembershipCollectionName))
+ {
+ database.CreateCollection(mongodbOption.MembershipCollectionName);
+
+ var membershipCollection = database.GetCollection(mongodbOption.MembershipCollectionName);
+ AddMemberShipTableCollectionIndex(membershipCollection);
+ }
+ }
+
+ private static void AddMessageCollectionIndex(IMongoCollection collection)
+ {
+ List> createIndexModels = new List>();
+
+ //var nextRetryUTCTimeIndex = Builders.IndexKeys.Ascending(m => m.NextRetryUTCTime);
+ //CreateIndexModel nextRetryUTCTimeIndexModel = new CreateIndexModel(nextRetryUTCTimeIndex, new CreateIndexOptions() { Background = true });
+ //createIndexModels.Add(nextRetryUTCTimeIndexModel);
+
+ var AddedUTCTimeUTCTimeIndex = Builders.IndexKeys.Ascending(m => m.AddedUTCTime);
+ CreateIndexModel AddedUTCTimeIndexModel = new CreateIndexModel(AddedUTCTimeUTCTimeIndex, new CreateIndexOptions() { Background = true });
+ createIndexModels.Add(AddedUTCTimeIndexModel);
+
+ //var messageTypeIdIndex = Builders.IndexKeys.Ascending(m => m.MessageTypeId);
+ //CreateIndexModel messageTypeIdIndexModel = new CreateIndexModel(messageTypeIdIndex, new CreateIndexOptions() { Background = true });
+ //createIndexModels.Add(messageTypeIdIndexModel);
+
+ collection.Indexes.CreateMany(createIndexModels);
+ }
+ private static void AddMemberShipTableCollectionIndex(IMongoCollection collection)
+ {
+ List> createIndexMembershipModels = new List>();
+
+ var serviceNameIndex = Builders.IndexKeys.Ascending(m => m.ServiceName);
+ CreateIndexModel serviceNameIndexModel = new CreateIndexModel(serviceNameIndex, new CreateIndexOptions() { Background = true, Unique = true });
+ createIndexMembershipModels.Add(serviceNameIndexModel);
+
+ collection.Indexes.CreateMany(createIndexMembershipModels);
+ }
+
+ private static void AddMapper()
+ {
+ BsonClassMap.RegisterClassMap(cm =>
+ {
+ cm.AutoMap();
+ cm.UnmapMember(m => m.MessageStatus);
+ cm.MapIdField(m => m.Id);
+ cm.MapMember(m => m.NextRetryUTCTime).SetIsRequired(true);
+ });
+ BsonClassMap.RegisterClassMap(cm =>
+ {
+ cm.AutoMap();
+ cm.MapIdField(m => m.Id).SetIdGenerator(StringObjectIdGenerator.Instance);
+ });
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IApplicationBuilderConfigurator.cs b/src/Pole.ReliableMessage/Abstraction/IApplicationBuilderConfigurator.cs
new file mode 100644
index 0000000..5b9c8a8
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IApplicationBuilderConfigurator.cs
@@ -0,0 +1,13 @@
+using Microsoft.AspNetCore.Builder;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IApplicationBuilderConfigurator
+ {
+ void Config(IApplicationBuilder applicationBuilder);
+ void Add(Action config);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IComteckReliableMessageBootstrap.cs b/src/Pole.ReliableMessage/Abstraction/IComteckReliableMessageBootstrap.cs
new file mode 100644
index 0000000..da17eb8
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IComteckReliableMessageBootstrap.cs
@@ -0,0 +1,14 @@
+using Microsoft.Extensions.DependencyInjection;
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IComteckReliableMessageBootstrap
+ {
+ Task Initialize(IServiceCollection services, List eventHandlerAssemblies, List eventAssemblies);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IJsonConverter.cs b/src/Pole.ReliableMessage/Abstraction/IJsonConverter.cs
new file mode 100644
index 0000000..88dd982
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IJsonConverter.cs
@@ -0,0 +1,13 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IJsonConverter
+ {
+ string Serialize(object obj);
+ T Deserialize(string json);
+ object Deserialize(string json,Type type);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs b/src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs
new file mode 100644
index 0000000..7055878
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs
@@ -0,0 +1,16 @@
+using Pole.ReliableMessage.Messaging;
+using System;
+using System.Collections.Generic;
+using System.Linq.Expressions;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IMessageBuffer
+ {
+ Task Flush();
+ Task Add(Message message);
+ Task> GetAll(Func filter);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageBus.cs b/src/Pole.ReliableMessage/Abstraction/IMessageBus.cs
new file mode 100644
index 0000000..9f10177
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IMessageBus.cs
@@ -0,0 +1,13 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IMessageBus
+ {
+ Task Publish(object @event,string reliableMessageId, CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageBusConfigurator.cs b/src/Pole.ReliableMessage/Abstraction/IMessageBusConfigurator.cs
new file mode 100644
index 0000000..0261d1a
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IMessageBusConfigurator.cs
@@ -0,0 +1,13 @@
+using Microsoft.Extensions.DependencyInjection;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IMessageBusConfigurator
+ {
+ Task Configure(IServiceCollection services, IEnumerable eventHandlerTypes);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageCallBackGenerator.cs b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackGenerator.cs
new file mode 100644
index 0000000..3a7e5c6
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackGenerator.cs
@@ -0,0 +1,12 @@
+using Pole.ReliableMessage.Messaging.CallBack;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IMessageCallBackInfoGenerator
+ {
+ MessageCallBackInfo Generate(Type eventHandlerType);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageCallBackInfoStore.cs b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackInfoStore.cs
new file mode 100644
index 0000000..c82ab66
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackInfoStore.cs
@@ -0,0 +1,14 @@
+using Pole.ReliableMessage.Messaging.CallBack;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IMessageCallBackInfoStore
+ {
+ Task Add(MessageCallBackInfo messageCallBackInfo);
+ Task Get(string messageTypeId);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageCallBackRegister.cs b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackRegister.cs
new file mode 100644
index 0000000..c39a628
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackRegister.cs
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IMessageCallBackRegister
+ {
+ Task Register(IEnumerable eventHandlerTypes);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs b/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs
new file mode 100644
index 0000000..11b2e44
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs
@@ -0,0 +1,14 @@
+using Pole.ReliableMessage.Messaging;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IMessageChecker
+ {
+ Task GetResult(Message message);
+
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageIdGenerator.cs b/src/Pole.ReliableMessage/Abstraction/IMessageIdGenerator.cs
new file mode 100644
index 0000000..aa888c7
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IMessageIdGenerator.cs
@@ -0,0 +1,11 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IMessageIdGenerator
+ {
+ string Generate();
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs b/src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs
new file mode 100644
index 0000000..849fcc6
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs
@@ -0,0 +1,60 @@
+using Pole.ReliableMessage.Messaging;
+using System;
+using System.Collections.Generic;
+using System.Linq.Expressions;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IMessageStorage
+ {
+ ///
+ ///
+ ///
+ ///
+ ///
+ Task Add(Message message);
+ /////
+ /////
+ /////
+ /////
+ /////
+ /////
+ //Task Get(Expression> filter);
+ ///
+ ///
+ ///
+ ///
+ ///
+ ///
+ Task> GetMany(Expression> filter, int count);
+ ///
+ /// 批量更新
+ /// 更新这几个值 MessageStatusId , RetryTimes LastRetryUTCTime, NextRetryUTCTime
+ ///
+ ///
+ ///
+ Task Save(IEnumerable messages);
+
+ Task UpdateStatus(IEnumerable messages);
+
+ /////
+ /////
+ /////
+ ///// 这里id 永远为 string
+ /////
+ /////
+ //Task Update(Expression> filter, MessageStatus messageStatus);
+
+ ///
+ /// 检查 消息的状态,如果不是指定状态则返回true,并且更新状态到指定状态 ,如果已经是指定状态返回false
+ ///
+ ///
+ ///
+ ///
+ Task CheckAndUpdateStatus(Expression> filter, MessageStatus messageStatus);
+
+ Task UpdateStatus(Expression> filter, MessageStatus messageStatus);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageTypeIdGenerator.cs b/src/Pole.ReliableMessage/Abstraction/IMessageTypeIdGenerator.cs
new file mode 100644
index 0000000..58d5206
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IMessageTypeIdGenerator.cs
@@ -0,0 +1,11 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IMessageTypeIdGenerator
+ {
+ string Generate(Type messageType);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IProcessor.cs b/src/Pole.ReliableMessage/Abstraction/IProcessor.cs
new file mode 100644
index 0000000..8037f05
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IProcessor.cs
@@ -0,0 +1,14 @@
+using Pole.ReliableMessage.Processor;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IProcessor
+ {
+ string Name { get; }
+ Task Process(ProcessingContext context);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IProcessorServer.cs b/src/Pole.ReliableMessage/Abstraction/IProcessorServer.cs
new file mode 100644
index 0000000..61f1988
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IProcessorServer.cs
@@ -0,0 +1,13 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IProcessorServer
+ {
+ Task Start(CancellationToken stoppingToken);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs b/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs
new file mode 100644
index 0000000..9eb37d9
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IReliableBus
+ {
+ Task PrePublish(TReliableEvent @event,object callbackParemeter, CancellationToken cancellationToken = default);
+ Task Publish(TReliableEvent @event,string prePublishMessageId, CancellationToken cancellationToken=default);
+ Task Cancel(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventCallBackFinder.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventCallBackFinder.cs
new file mode 100644
index 0000000..8cfded7
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IReliableEventCallBackFinder.cs
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Text;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IReliableEventCallBackFinder
+ {
+ List FindAll(IEnumerable assemblies);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventCallback.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventCallback.cs
new file mode 100644
index 0000000..22e7b5f
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IReliableEventCallback.cs
@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IReliableEventCallback : IReliableEventCallback
+ {
+ Task Callback(TCallbackParemeter callbackParemeter);
+ }
+ public interface IReliableEventCallback
+ {
+
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventHandler.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandler.cs
new file mode 100644
index 0000000..03df883
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandler.cs
@@ -0,0 +1,18 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IReliableEventHandler : IReliableEventHandler
+ where TEvent : class
+ {
+ Task Handle(IReliableEventHandlerContext context);
+
+ }
+ public interface IReliableEventHandler
+ {
+
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerContext.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerContext.cs
new file mode 100644
index 0000000..9b36ea8
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerContext.cs
@@ -0,0 +1,13 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IReliableEventHandlerContext where TEvent : class
+ {
+ TEvent Event { get; }
+ //Task Publish(object @event);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerFinder.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerFinder.cs
new file mode 100644
index 0000000..bd17aa1
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerFinder.cs
@@ -0,0 +1,12 @@
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Text;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IReliableEventHandlerFinder
+ {
+ List FindAll(IEnumerable assemblies);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IRetryTimeCalculator.cs b/src/Pole.ReliableMessage/Abstraction/IRetryTimeCalculator.cs
new file mode 100644
index 0000000..5302364
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IRetryTimeCalculator.cs
@@ -0,0 +1,11 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IRetryTimeDelayCalculator
+ {
+ int Get(int retryTimes, int maxPendingMessageRetryDelay);
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/IServiceIPv4AddressProvider.cs b/src/Pole.ReliableMessage/Abstraction/IServiceIPv4AddressProvider.cs
new file mode 100644
index 0000000..cdc42f7
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/IServiceIPv4AddressProvider.cs
@@ -0,0 +1,11 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface IServiceIPv4AddressProvider
+ {
+ string Get();
+ }
+}
diff --git a/src/Pole.ReliableMessage/Abstraction/ITimeHelper.cs b/src/Pole.ReliableMessage/Abstraction/ITimeHelper.cs
new file mode 100644
index 0000000..a844fc3
--- /dev/null
+++ b/src/Pole.ReliableMessage/Abstraction/ITimeHelper.cs
@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage.Abstraction
+{
+ public interface ITimeHelper
+ {
+ DateTime GetUTCNow();
+ ///
+ /// "UTC :{_timeHelper.GetNow().ToString("yyyy-MM-dd HH:mm:ss.fff")}"
+ ///
+ ///
+ string GetAppropriateFormatedDateString();
+ }
+}
diff --git a/src/Pole.ReliableMessage/BackgroundServiceBasedProcessorServer.cs b/src/Pole.ReliableMessage/BackgroundServiceBasedProcessorServer.cs
new file mode 100644
index 0000000..e1b3fb8
--- /dev/null
+++ b/src/Pole.ReliableMessage/BackgroundServiceBasedProcessorServer.cs
@@ -0,0 +1,53 @@
+using Pole.ReliableMessage.Abstraction;
+using Pole.ReliableMessage.Processor;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage
+{
+ public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer
+ {
+ private readonly IServiceProvider _serviceProvider;
+ private Task _compositeTask;
+
+ public BackgroundServiceBasedProcessorServer(IServiceProvider serviceProvider)
+ {
+ _serviceProvider = serviceProvider;
+ }
+ public async Task Start(CancellationToken stoppingToken)
+ {
+
+ ProcessingContext processingContext = new ProcessingContext(stoppingToken);
+ List loopProcessors = new List();
+ var innerProcessors = _serviceProvider.GetServices();
+ var loggerFactory = _serviceProvider.GetService();
+ var timeHelper = _serviceProvider.GetService();
+ foreach (var innerProcessor in innerProcessors)
+ {
+ LoopProcessor processor = new LoopProcessor(innerProcessor, loggerFactory, timeHelper);
+ loopProcessors.Add(processor);
+ }
+ var tasks = loopProcessors.Select(p => p.Process(processingContext));
+
+ _compositeTask = Task.WhenAll(tasks);
+ await _compositeTask;
+ }
+ protected override Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ return Start(stoppingToken);
+ }
+ public override void Dispose()
+ {
+ // 等待 10秒 待 消息处理完
+ _compositeTask?.Wait((int)TimeSpan.FromSeconds(10).TotalMilliseconds);
+ base.Dispose();
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs b/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs
new file mode 100644
index 0000000..6c6b177
--- /dev/null
+++ b/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs
@@ -0,0 +1,27 @@
+using Pole.ReliableMessage;
+using Pole.ReliableMessage.Abstraction;
+using Microsoft.AspNetCore.Builder;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Microsoft.AspNetCore.Builder
+{
+ public static class ComteckReliableMessageIApplicationBuilderExtensions
+ {
+ public static IApplicationBuilder UseComteckReliableMessage(this IApplicationBuilder applicationBuilder)
+ {
+ var option = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IOptions)) as IOptions;
+ var messageCallBackRegister = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IMessageCallBackRegister)) as IMessageCallBackRegister;
+ var reliableEventCallBackFinder = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IReliableEventCallBackFinder)) as IReliableEventCallBackFinder;
+
+ var eventCallbacks = reliableEventCallBackFinder.FindAll(option.Value.EventCallbackAssemblies);
+ messageCallBackRegister.Register(eventCallbacks).GetAwaiter().GetResult();
+
+ return applicationBuilder;
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs b/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs
new file mode 100644
index 0000000..e92f470
--- /dev/null
+++ b/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs
@@ -0,0 +1,66 @@
+using Pole.Pole.ReliableMessage.EventBus;
+using Pole.ReliableMessage;
+using Pole.ReliableMessage.Abstraction;
+using Pole.ReliableMessage.EventBus;
+using Pole.ReliableMessage.Messaging;
+using Pole.ReliableMessage.Messaging.CallBack;
+using Pole.ReliableMessage.Processor;
+using Pole.ReliableMessage.Utils;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Microsoft.Extensions.DependencyInjection
+{
+ public static class ComteckReliableMessageServiceCollectionExtensions
+ {
+ public static IServiceCollection AddComteckReliableMessage(this IServiceCollection services, Action optionConfig)
+ {
+ ReliableMessageOption reliableMessageOption = new ReliableMessageOption();
+ optionConfig(reliableMessageOption);
+
+ foreach(var extension in reliableMessageOption.ReliableMessageOptionExtensions)
+ {
+ extension.AddServices(services);
+ }
+ services.Configure(optionConfig);
+
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+
+ services.AddHostedService();
+
+ services.AddHttpClient();
+
+
+
+
+ services.AddSingleton();
+ services.AddSingleton();
+ services.AddSingleton();
+
+ var provider = services.BuildServiceProvider();
+
+ IComteckReliableMessageBootstrap applicationBuilderConfigurator = provider.GetService(typeof(IComteckReliableMessageBootstrap)) as IComteckReliableMessageBootstrap;
+
+ applicationBuilderConfigurator.Initialize(services, reliableMessageOption.EventHandlerAssemblies, reliableMessageOption.EventCallbackAssemblies);
+ return services;
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage/Core/Enumeration.cs b/src/Pole.ReliableMessage/Core/Enumeration.cs
new file mode 100644
index 0000000..3c584b6
--- /dev/null
+++ b/src/Pole.ReliableMessage/Core/Enumeration.cs
@@ -0,0 +1,73 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+
+namespace Pole.ReliableMessage.Core
+{
+ public abstract class Enumeration : IComparable
+ {
+ public string Name { get; private set; }
+
+ public int Id { get; private set; }
+
+ protected Enumeration(int id, string name)
+ {
+ Id = id;
+ Name = name;
+ }
+
+ public override string ToString() => Name;
+
+ public static IEnumerable GetAll() where T : Enumeration
+ {
+ var fields = typeof(T).GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly);
+
+ return fields.Select(f => f.GetValue(null)).Cast();
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (!(obj is Enumeration otherValue))
+ return false;
+
+ var typeMatches = GetType().Equals(obj.GetType());
+ var valueMatches = Id.Equals(otherValue.Id);
+
+ return typeMatches && valueMatches;
+ }
+
+ public override int GetHashCode() => Id.GetHashCode();
+
+ public static int AbsoluteDifference(Enumeration firstValue, Enumeration secondValue)
+ {
+ var absoluteDifference = Math.Abs(firstValue.Id - secondValue.Id);
+ return absoluteDifference;
+ }
+
+ public static T FromValue(int value) where T : Enumeration
+ {
+ var matchingItem = Parse(value, "value", item => item.Id == value);
+ return matchingItem;
+ }
+
+ public static T FromDisplayName(string displayName) where T : Enumeration
+ {
+ var matchingItem = Parse(displayName, "display name", item => item.Name == displayName);
+ return matchingItem;
+ }
+
+ private static T Parse(K value, string description, Func predicate) where T : Enumeration
+ {
+ var matchingItem = GetAll().FirstOrDefault(predicate);
+
+ if (matchingItem == null)
+ throw new InvalidOperationException($"'{value}' is not a valid {description} in {typeof(T)}");
+
+ return matchingItem;
+ }
+
+ public int CompareTo(object other) => Id.CompareTo(((Enumeration)other).Id);
+ }
+}
diff --git a/src/Pole.ReliableMessage/DefaultApplicationBuilderConfigurator.cs b/src/Pole.ReliableMessage/DefaultApplicationBuilderConfigurator.cs
new file mode 100644
index 0000000..805204d
--- /dev/null
+++ b/src/Pole.ReliableMessage/DefaultApplicationBuilderConfigurator.cs
@@ -0,0 +1,24 @@
+using Pole.ReliableMessage.Abstraction;
+using Microsoft.AspNetCore.Builder;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage
+{
+ class DefaultApplicationBuilderConfigurator : IApplicationBuilderConfigurator
+ {
+ private readonly List> _configs = new List>();
+ public void Add(Action config)
+ {
+ _configs.Add(config);
+ }
+
+ public void Config(IApplicationBuilder applicationBuilder)
+ {
+ _configs.ForEach(m => {
+ m(applicationBuilder);
+ });
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage/DefaultComteckReliableMessageBootstrap.cs b/src/Pole.ReliableMessage/DefaultComteckReliableMessageBootstrap.cs
new file mode 100644
index 0000000..f0987b6
--- /dev/null
+++ b/src/Pole.ReliableMessage/DefaultComteckReliableMessageBootstrap.cs
@@ -0,0 +1,42 @@
+using Pole.ReliableMessage.Abstraction;
+using Microsoft.Extensions.DependencyInjection;
+using System;
+using System.Collections.Generic;
+using System.Reflection;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage
+{
+ class DefaultComteckReliableMessageBootstrap : IComteckReliableMessageBootstrap
+ {
+ private readonly IReliableEventHandlerFinder _reliableEventHandlerFinder;
+ private readonly IMessageBusConfigurator _messageBusConfigurator;
+ private readonly IMessageCallBackRegister _messageCallBackRegister;
+ private readonly IReliableEventCallBackFinder _reliableEventCallBackFinder;
+ public DefaultComteckReliableMessageBootstrap(IReliableEventHandlerFinder reliableEventHandlerFinder, IMessageBusConfigurator messageBusConfigurator, IMessageCallBackRegister messageCallBackRegister, IReliableEventCallBackFinder reliableEventCallBackFinder)
+ {
+ _reliableEventHandlerFinder = reliableEventHandlerFinder;
+ _messageBusConfigurator = messageBusConfigurator;
+ _messageCallBackRegister = messageCallBackRegister;
+ _reliableEventCallBackFinder = reliableEventCallBackFinder;
+ }
+ public async Task Initialize(IServiceCollection services, List eventHandlerAssemblies, List eventAssemblies)
+ {
+ var eventHandlers = _reliableEventHandlerFinder.FindAll(eventHandlerAssemblies);
+ await _messageBusConfigurator.Configure(services, eventHandlers);
+
+ var eventCallbacks = _reliableEventCallBackFinder.FindAll(eventAssemblies);
+ await _messageCallBackRegister.Register(eventCallbacks);
+ RegisterEventCallbacks(services, eventCallbacks);
+ }
+
+ private void RegisterEventCallbacks(IServiceCollection services, List eventCallbacks)
+ {
+ eventCallbacks.ForEach(m =>
+ {
+ services.AddScoped(m);
+ });
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage/DefaultRetryTimeCalculator.cs b/src/Pole.ReliableMessage/DefaultRetryTimeCalculator.cs
new file mode 100644
index 0000000..3d8d327
--- /dev/null
+++ b/src/Pole.ReliableMessage/DefaultRetryTimeCalculator.cs
@@ -0,0 +1,20 @@
+using Pole.ReliableMessage.Abstraction;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage
+{
+ class DefaultRetryTimeDelayCalculator : IRetryTimeDelayCalculator
+ {
+ public int Get(int retryTimes, int maxPendingMessageRetryDelay)
+ {
+ var retryTimeDelay = (int)Math.Pow(2, retryTimes + 1);
+ if (retryTimeDelay >= maxPendingMessageRetryDelay)
+ {
+ return maxPendingMessageRetryDelay;
+ }
+ return retryTimeDelay;
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs
new file mode 100644
index 0000000..597bf07
--- /dev/null
+++ b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs
@@ -0,0 +1,111 @@
+using Pole.ReliableMessage.Abstraction;
+using Pole.ReliableMessage.Messaging;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using ILogger = Microsoft.Extensions.Logging.ILogger;
+
+namespace Pole.Pole.ReliableMessage.EventBus
+{
+ public class DefaultReliableBus : IReliableBus
+ {
+ private readonly IMessageBus _messageBus;
+ private readonly IMessageStorage _messageStorage;
+ private readonly IMessageIdGenerator _messageIdGenerator;
+ private readonly ITimeHelper _timeHelper;
+ private readonly IMessageBuffer _messageBuffer;
+ private readonly ILogger _logger;
+ private readonly IJsonConverter _jsonConverter;
+ private readonly IMessageCallBackInfoStore _messageCallBackInfoStore;
+ private readonly IMessageTypeIdGenerator _messageTypeIdGenerator;
+ public DefaultReliableBus(IMessageBus messageBus, IMessageStorage messageStorage, IMessageIdGenerator messageIdGenerator, ITimeHelper timeHelper, IMessageBuffer messageBuffer, ILogger logger, IJsonConverter jsonConverter, IMessageCallBackInfoStore messageCallBackInfoStore, IMessageTypeIdGenerator messageTypeIdGenerator)
+ {
+ _messageBus = messageBus;
+ _messageStorage = messageStorage;
+ _messageIdGenerator = messageIdGenerator;
+ _timeHelper = timeHelper;
+ _messageBuffer = messageBuffer;
+ _logger = logger;
+ _jsonConverter = jsonConverter;
+ _messageCallBackInfoStore = messageCallBackInfoStore;
+ _messageTypeIdGenerator = messageTypeIdGenerator;
+ }
+
+ public Task Cancel(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default)
+ {
+ try
+ {
+ return _messageStorage.UpdateStatus(m => m.Id == prePublishMessageId, MessageStatus.Canced);
+ }
+ catch (Exception ex)
+ {
+ var errorInfo = $"Cancel PrePublish errors in defaultReliableBus;{ex.Message}";
+ _logger.LogError(ex, errorInfo);
+ throw new Exception(errorInfo, ex);
+ }
+ }
+
+ public async Task PrePublish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default)
+ {
+
+ var messageTypeId = _messageTypeIdGenerator.Generate(typeof(TReliableEvent));
+
+ var currentMessageCallbackInfo = _messageCallBackInfoStore.Get(messageTypeId);
+ if (currentMessageCallbackInfo == null)
+ {
+ throw new Exception($"Current message type not registered ,messageTypeId:{messageTypeId}");
+ }
+ try
+ {
+ var messageId = _messageIdGenerator.Generate();
+
+ _logger.LogDebug($"PrePublish message begin ,messageId:{messageId}");
+
+ var now = _timeHelper.GetUTCNow();
+ var content = _jsonConverter.Serialize(@event);
+ var callBackParem = _jsonConverter.Serialize(callbackParemeter);
+ Message newMessage = new Message()
+ {
+ AddedUTCTime = now,
+ Content = content,
+ Id = messageId,
+ MessageStatusId = MessageStatus.Pending.Id,
+ MessageTypeId = messageTypeId,
+ RePushCallBackParameterValue = callBackParem,
+ NextRetryUTCTime = DateTime.MinValue
+ };
+ await _messageStorage.Add(newMessage);
+
+ _logger.LogDebug($"PrePublish message successful ,messageId:{messageId}");
+
+ return messageId;
+ }
+ catch (Exception ex)
+ {
+ var errorInfo = $"PrePublish errors in DefaultReliableBus;{ex.Message}";
+ _logger.LogError(ex, errorInfo);
+ throw new Exception(errorInfo, ex);
+ }
+ }
+
+ public async Task Publish(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default)
+ {
+ try
+ {
+ await _messageBus.Publish(@event, prePublishMessageId, cancellationToken);
+
+ var messageBufferResult = await _messageBuffer.Add(new Message { Id = prePublishMessageId, MessageStatus = MessageStatus.Pushed });
+ return true;
+ }
+ catch (Exception ex)
+ {
+ var errorInfo = $"Publish errors in DefaultReliableBus;{ex.Message}";
+ _logger.LogError(ex, errorInfo);
+ throw new Exception(errorInfo, ex);
+ }
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableEventFinder.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableEventFinder.cs
new file mode 100644
index 0000000..9d1f187
--- /dev/null
+++ b/src/Pole.ReliableMessage/EventBus/DefaultReliableEventFinder.cs
@@ -0,0 +1,20 @@
+using Pole.ReliableMessage.Abstraction;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+
+namespace Pole.ReliableMessage.EventBus
+{
+ class DefaultReliableEventCallBackFinder : IReliableEventCallBackFinder
+ {
+ public List FindAll(IEnumerable assemblies)
+ {
+ var eventType = typeof(IReliableEventCallback);
+
+ var eventTypes = assemblies.SelectMany(m => m.GetTypes().Where(type => eventType.IsAssignableFrom(type)));
+ return eventTypes.ToList(); ;
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableEventHandlerFinder.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableEventHandlerFinder.cs
new file mode 100644
index 0000000..ebe3328
--- /dev/null
+++ b/src/Pole.ReliableMessage/EventBus/DefaultReliableEventHandlerFinder.cs
@@ -0,0 +1,20 @@
+using Pole.ReliableMessage.Abstraction;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reflection;
+using System.Text;
+
+namespace Pole.ReliableMessage.EventBus
+{
+ public class DefaultReliableEventHandlerFinder : IReliableEventHandlerFinder
+ {
+ public List FindAll(IEnumerable assemblies)
+ {
+ var eventHandlerType = typeof(IReliableEventHandler);
+
+ var eventHandlerTypes = assemblies.SelectMany(m => m.GetTypes().Where(type => eventHandlerType.IsAssignableFrom(type)));
+ return eventHandlerTypes.ToList();
+ }
+ }
+}
diff --git a/src/Pole.ReliableMessage/IReliableMessageOptionExtension.cs b/src/Pole.ReliableMessage/IReliableMessageOptionExtension.cs
new file mode 100644
index 0000000..5032fa9
--- /dev/null
+++ b/src/Pole.ReliableMessage/IReliableMessageOptionExtension.cs
@@ -0,0 +1,12 @@
+using Microsoft.Extensions.DependencyInjection;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage
+{
+ public interface IReliableMessageOptionExtension
+ {
+ void AddServices(IServiceCollection services);
+ }
+}
diff --git a/src/Pole.ReliableMessage/MemberShipTable.cs b/src/Pole.ReliableMessage/MemberShipTable.cs
new file mode 100644
index 0000000..535ec71
--- /dev/null
+++ b/src/Pole.ReliableMessage/MemberShipTable.cs
@@ -0,0 +1,21 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.ReliableMessage
+{
+ public class MemberShipTable
+ {
+ public string Id { get;private set; }
+ public MemberShipTable(string serviceName,string pendingMessageCheckerIp,DateTime iAmAliveUTCTime)
+ {
+ ServiceName = serviceName;
+ PendingMessageCheckerIp = pendingMessageCheckerIp;
+ IAmAliveUTCTime = iAmAliveUTCTime;
+ }
+
+ public string ServiceName { get;private set; }
+ public string PendingMessageCheckerIp { get; private set; }
+ public DateTime IAmAliveUTCTime { get; private set; }
+ }
+}
diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackInfoGenerator.cs b/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackInfoGenerator.cs
new file mode 100644
index 0000000..6e3c93e
--- /dev/null
+++ b/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackInfoGenerator.cs
@@ -0,0 +1,51 @@
+using Pole.ReliableMessage.Abstraction;
+using Pole.ReliableMessage.EventBus;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Linq.Expressions;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.ReliableMessage.Messaging.CallBack
+{
+ class DefaultMessageCallBackInfoGenerator : IMessageCallBackInfoGenerator
+ {
+ private readonly IMessageTypeIdGenerator _messageTypeIdGenerator;
+ public DefaultMessageCallBackInfoGenerator(IMessageTypeIdGenerator messageTypeIdGenerator)
+ {
+ _messageTypeIdGenerator = messageTypeIdGenerator;
+ }
+ public MessageCallBackInfo Generate(Type eventCallbackType)
+ {
+ var @interface = eventCallbackType.GetInterfaces().FirstOrDefault();
+ Func