From eb350c31739c685f7b0b00bb664cbc50b1042f6a Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Tue, 7 Jan 2020 14:21:54 +0800 Subject: [PATCH] 添加 可靠消息 组件 --- Pole.sln | 56 ++++++++++++++++++++++++++++++++++++++++++++++++++------ src/Pole.ReliableMessage.Dashboard/Pole.ReliableMessage.Dashboard.csproj | 7 +++++++ src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEvent.cs | 10 ++++++++++ src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEventHandlerRegistrarFactory.cs | 12 ++++++++++++ src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerContext.cs | 21 +++++++++++++++++++++ src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Masstransit/MassTransitHostedService.cs | 33 +++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs | 24 ++++++++++++++++++++++++ src/Pole.ReliableMessage.Masstransit/MasstransitEventHandlerRegistrar.cs | 46 ++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Masstransit/MasstransitMessageBusConfigurator.cs | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs | 29 +++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Masstransit/Messaging/DefaultMessageIdGenerator.cs | 16 ++++++++++++++++ src/Pole.ReliableMessage.Masstransit/Pipe/AddReliableMessageIdPipe.cs | 29 +++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Masstransit/Pole.ReliableMessage.Masstransit.csproj | 17 +++++++++++++++++ src/Pole.ReliableMessage.Masstransit/QueueHaType.cs | 32 ++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Masstransit/ReliableEventHandlerParemeterAttribute.cs | 15 +++++++++++++++ src/Pole.ReliableMessage.Masstransit/ReliableMessageOptionExtension.cs | 38 ++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs | 22 ++++++++++++++++++++++ src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj | 7 +++++++ src/Pole.ReliableMessage.Storage.EntityframeworkCore/Pole.ReliableMessage.Storage.EntityframeworkCore.csproj | 11 +++++++++++ src/Pole.ReliableMessage.Storage.Mongodb/MongoHost.cs | 19 +++++++++++++++++++ src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs | 25 +++++++++++++++++++++++++ src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs | 140 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Storage.Mongodb/Pole.ReliableMessage.Storage.Mongodb.csproj | 19 +++++++++++++++++++ src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs | 128 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Abstraction/IApplicationBuilderConfigurator.cs | 13 +++++++++++++ src/Pole.ReliableMessage/Abstraction/IComteckReliableMessageBootstrap.cs | 14 ++++++++++++++ src/Pole.ReliableMessage/Abstraction/IJsonConverter.cs | 13 +++++++++++++ src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs | 16 ++++++++++++++++ src/Pole.ReliableMessage/Abstraction/IMessageBus.cs | 13 +++++++++++++ src/Pole.ReliableMessage/Abstraction/IMessageBusConfigurator.cs | 13 +++++++++++++ src/Pole.ReliableMessage/Abstraction/IMessageCallBackGenerator.cs | 12 ++++++++++++ src/Pole.ReliableMessage/Abstraction/IMessageCallBackInfoStore.cs | 14 ++++++++++++++ src/Pole.ReliableMessage/Abstraction/IMessageCallBackRegister.cs | 12 ++++++++++++ src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs | 14 ++++++++++++++ src/Pole.ReliableMessage/Abstraction/IMessageIdGenerator.cs | 11 +++++++++++ src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs | 60 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Abstraction/IMessageTypeIdGenerator.cs | 11 +++++++++++ src/Pole.ReliableMessage/Abstraction/IProcessor.cs | 14 ++++++++++++++ src/Pole.ReliableMessage/Abstraction/IProcessorServer.cs | 13 +++++++++++++ src/Pole.ReliableMessage/Abstraction/IReliableBus.cs | 15 +++++++++++++++ src/Pole.ReliableMessage/Abstraction/IReliableEventCallBackFinder.cs | 12 ++++++++++++ src/Pole.ReliableMessage/Abstraction/IReliableEventCallback.cs | 16 ++++++++++++++++ src/Pole.ReliableMessage/Abstraction/IReliableEventHandler.cs | 18 ++++++++++++++++++ src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerContext.cs | 13 +++++++++++++ src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerFinder.cs | 12 ++++++++++++ src/Pole.ReliableMessage/Abstraction/IRetryTimeCalculator.cs | 11 +++++++++++ src/Pole.ReliableMessage/Abstraction/IServiceIPv4AddressProvider.cs | 11 +++++++++++ src/Pole.ReliableMessage/Abstraction/ITimeHelper.cs | 16 ++++++++++++++++ src/Pole.ReliableMessage/BackgroundServiceBasedProcessorServer.cs | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs | 27 +++++++++++++++++++++++++++ src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs | 66 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Core/Enumeration.cs | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/DefaultApplicationBuilderConfigurator.cs | 24 ++++++++++++++++++++++++ src/Pole.ReliableMessage/DefaultComteckReliableMessageBootstrap.cs | 42 ++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/DefaultRetryTimeCalculator.cs | 20 ++++++++++++++++++++ src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs | 111 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/EventBus/DefaultReliableEventFinder.cs | 20 ++++++++++++++++++++ src/Pole.ReliableMessage/EventBus/DefaultReliableEventHandlerFinder.cs | 20 ++++++++++++++++++++ src/Pole.ReliableMessage/IReliableMessageOptionExtension.cs | 12 ++++++++++++ src/Pole.ReliableMessage/MemberShipTable.cs | 21 +++++++++++++++++++++ src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackInfoGenerator.cs | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackRegister.cs | 27 +++++++++++++++++++++++++++ src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfo.cs | 30 ++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfoInMemoryStore.cs | 38 ++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs | 18 ++++++++++++++++++ src/Pole.ReliableMessage/Messaging/DefaultJsonConverter.cs | 25 +++++++++++++++++++++++++ src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Messaging/DefaultMessageTypeIdGenerator.cs | 15 +++++++++++++++ src/Pole.ReliableMessage/Messaging/Message.cs | 79 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Messaging/MessageCheckerResult.cs | 21 +++++++++++++++++++++ src/Pole.ReliableMessage/Messaging/MessageStatus.cs | 20 ++++++++++++++++++++ src/Pole.ReliableMessage/Pole.ReliableMessage.csproj | 24 ++++++++++++++++++++++++ src/Pole.ReliableMessage/Processor/LoopProcessor.cs | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs | 41 +++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs | 134 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Processor/ProcessingContext.cs | 17 +++++++++++++++++ src/Pole.ReliableMessage/Processor/ProcessorBase.cs | 20 ++++++++++++++++++++ src/Pole.ReliableMessage/ReliableMessageOption.cs | 93 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage/Utils/DefaulTimeHelper.cs | 20 ++++++++++++++++++++ src/Pole.ReliableMessage/Utils/DefaultServiceIPv4AddressProvider.cs | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 85 files changed, 2869 insertions(+), 6 deletions(-) create mode 100644 src/Pole.ReliableMessage.Dashboard/Pole.ReliableMessage.Dashboard.csproj create mode 100644 src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEvent.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEventHandlerRegistrarFactory.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerContext.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/MassTransitHostedService.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/MasstransitEventHandlerRegistrar.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/MasstransitMessageBusConfigurator.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/Messaging/DefaultMessageIdGenerator.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/Pipe/AddReliableMessageIdPipe.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/Pole.ReliableMessage.Masstransit.csproj create mode 100644 src/Pole.ReliableMessage.Masstransit/QueueHaType.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/ReliableEventHandlerParemeterAttribute.cs create mode 100644 src/Pole.ReliableMessage.Masstransit/ReliableMessageOptionExtension.cs create mode 100644 src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs create mode 100644 src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj create mode 100644 src/Pole.ReliableMessage.Storage.EntityframeworkCore/Pole.ReliableMessage.Storage.EntityframeworkCore.csproj create mode 100644 src/Pole.ReliableMessage.Storage.Mongodb/MongoHost.cs create mode 100644 src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs create mode 100644 src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs create mode 100644 src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs create mode 100644 src/Pole.ReliableMessage.Storage.Mongodb/Pole.ReliableMessage.Storage.Mongodb.csproj create mode 100644 src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IApplicationBuilderConfigurator.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IComteckReliableMessageBootstrap.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IJsonConverter.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageBus.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageBusConfigurator.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageCallBackGenerator.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageCallBackInfoStore.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageCallBackRegister.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageIdGenerator.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IMessageTypeIdGenerator.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IProcessor.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IProcessorServer.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IReliableBus.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IReliableEventCallBackFinder.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IReliableEventCallback.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IReliableEventHandler.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerContext.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerFinder.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IRetryTimeCalculator.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/IServiceIPv4AddressProvider.cs create mode 100644 src/Pole.ReliableMessage/Abstraction/ITimeHelper.cs create mode 100644 src/Pole.ReliableMessage/BackgroundServiceBasedProcessorServer.cs create mode 100644 src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs create mode 100644 src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs create mode 100644 src/Pole.ReliableMessage/Core/Enumeration.cs create mode 100644 src/Pole.ReliableMessage/DefaultApplicationBuilderConfigurator.cs create mode 100644 src/Pole.ReliableMessage/DefaultComteckReliableMessageBootstrap.cs create mode 100644 src/Pole.ReliableMessage/DefaultRetryTimeCalculator.cs create mode 100644 src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs create mode 100644 src/Pole.ReliableMessage/EventBus/DefaultReliableEventFinder.cs create mode 100644 src/Pole.ReliableMessage/EventBus/DefaultReliableEventHandlerFinder.cs create mode 100644 src/Pole.ReliableMessage/IReliableMessageOptionExtension.cs create mode 100644 src/Pole.ReliableMessage/MemberShipTable.cs create mode 100644 src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackInfoGenerator.cs create mode 100644 src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackRegister.cs create mode 100644 src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfo.cs create mode 100644 src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfoInMemoryStore.cs create mode 100644 src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs create mode 100644 src/Pole.ReliableMessage/Messaging/DefaultJsonConverter.cs create mode 100644 src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs create mode 100644 src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs create mode 100644 src/Pole.ReliableMessage/Messaging/DefaultMessageTypeIdGenerator.cs create mode 100644 src/Pole.ReliableMessage/Messaging/Message.cs create mode 100644 src/Pole.ReliableMessage/Messaging/MessageCheckerResult.cs create mode 100644 src/Pole.ReliableMessage/Messaging/MessageStatus.cs create mode 100644 src/Pole.ReliableMessage/Pole.ReliableMessage.csproj create mode 100644 src/Pole.ReliableMessage/Processor/LoopProcessor.cs create mode 100644 src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs create mode 100644 src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs create mode 100644 src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs create mode 100644 src/Pole.ReliableMessage/Processor/ProcessingContext.cs create mode 100644 src/Pole.ReliableMessage/Processor/ProcessorBase.cs create mode 100644 src/Pole.ReliableMessage/ReliableMessageOption.cs create mode 100644 src/Pole.ReliableMessage/Utils/DefaulTimeHelper.cs create mode 100644 src/Pole.ReliableMessage/Utils/DefaultServiceIPv4AddressProvider.cs diff --git a/Pole.sln b/Pole.sln index 4adbd7f..1621bad 100644 --- a/Pole.sln +++ b/Pole.sln @@ -21,7 +21,23 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{4A0F EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "apis", "apis", "{475116FC-DEEC-4255-94E4-AE7B8C85038D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Product.Api", "samples\apis\Product.Api\Product.Api.csproj", "{6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage", "src\Pole.ReliableMessage\Pole.ReliableMessage.csproj", "{699C75AB-4814-4E16-A3F3-9735C4C609FE}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Masstransit", "src\Pole.ReliableMessage.Masstransit\Pole.ReliableMessage.Masstransit.csproj", "{051BECA5-5E65-4FCB-9B7F-C9E64809E218}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Dashboard", "src\Pole.ReliableMessage.Dashboard\Pole.ReliableMessage.Dashboard.csproj", "{9EF13D70-C3AF-471A-8AA6-603338B194B7}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Storage.Abstraction", "src\Pole.ReliableMessage.Storage.Abstraction\Pole.ReliableMessage.Storage.Abstraction.csproj", "{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Storage.Mongodb", "src\Pole.ReliableMessage.Storage.Mongodb\Pole.ReliableMessage.Storage.Mongodb.csproj", "{793C73C6-93DE-4A56-B979-137914B247F2}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.ReliableMessage.Storage.EntityframeworkCore", "src\Pole.ReliableMessage.Storage.EntityframeworkCore\Pole.ReliableMessage.Storage.EntityframeworkCore.csproj", "{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Grpc", "Grpc", "{629045E7-B047-452A-AADA-ACB455B4FAFD}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Rest", "Rest", "{BA122337-74D2-4439-A10E-B20E14881290}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "webs", "webs", "{452B9D9E-881E-4E0E-A90B-98F2253F20F1}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -49,10 +65,30 @@ Global {F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Debug|Any CPU.Build.0 = Debug|Any CPU {F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Release|Any CPU.ActiveCfg = Release|Any CPU {F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Release|Any CPU.Build.0 = Release|Any CPU - {6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}.Debug|Any CPU.Build.0 = Debug|Any CPU - {6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}.Release|Any CPU.ActiveCfg = Release|Any CPU - {6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E}.Release|Any CPU.Build.0 = Release|Any CPU + {699C75AB-4814-4E16-A3F3-9735C4C609FE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {699C75AB-4814-4E16-A3F3-9735C4C609FE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {699C75AB-4814-4E16-A3F3-9735C4C609FE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {699C75AB-4814-4E16-A3F3-9735C4C609FE}.Release|Any CPU.Build.0 = Release|Any CPU + {051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Debug|Any CPU.Build.0 = Debug|Any CPU + {051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Release|Any CPU.ActiveCfg = Release|Any CPU + {051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Release|Any CPU.Build.0 = Release|Any CPU + {9EF13D70-C3AF-471A-8AA6-603338B194B7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9EF13D70-C3AF-471A-8AA6-603338B194B7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9EF13D70-C3AF-471A-8AA6-603338B194B7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9EF13D70-C3AF-471A-8AA6-603338B194B7}.Release|Any CPU.Build.0 = Release|Any CPU + {3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Release|Any CPU.Build.0 = Release|Any CPU + {793C73C6-93DE-4A56-B979-137914B247F2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {793C73C6-93DE-4A56-B979-137914B247F2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {793C73C6-93DE-4A56-B979-137914B247F2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {793C73C6-93DE-4A56-B979-137914B247F2}.Release|Any CPU.Build.0 = Release|Any CPU + {805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Debug|Any CPU.Build.0 = Debug|Any CPU + {805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Release|Any CPU.ActiveCfg = Release|Any CPU + {805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -64,7 +100,15 @@ Global {1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A} = {9932C965-8B38-4F70-9E43-86DC56860E2B} {F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8} = {9932C965-8B38-4F70-9E43-86DC56860E2B} {475116FC-DEEC-4255-94E4-AE7B8C85038D} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB} - {6A68E63D-ED4B-4F46-9A2E-AA7FE2B0032E} = {475116FC-DEEC-4255-94E4-AE7B8C85038D} + {699C75AB-4814-4E16-A3F3-9735C4C609FE} = {9932C965-8B38-4F70-9E43-86DC56860E2B} + {051BECA5-5E65-4FCB-9B7F-C9E64809E218} = {9932C965-8B38-4F70-9E43-86DC56860E2B} + {9EF13D70-C3AF-471A-8AA6-603338B194B7} = {9932C965-8B38-4F70-9E43-86DC56860E2B} + {3D92F460-350B-4614-A6F6-C00A2D0FA9E2} = {9932C965-8B38-4F70-9E43-86DC56860E2B} + {793C73C6-93DE-4A56-B979-137914B247F2} = {9932C965-8B38-4F70-9E43-86DC56860E2B} + {805CF4F7-CCDC-4390-A92B-55E0FFA7F659} = {9932C965-8B38-4F70-9E43-86DC56860E2B} + {629045E7-B047-452A-AADA-ACB455B4FAFD} = {475116FC-DEEC-4255-94E4-AE7B8C85038D} + {BA122337-74D2-4439-A10E-B20E14881290} = {475116FC-DEEC-4255-94E4-AE7B8C85038D} + {452B9D9E-881E-4E0E-A90B-98F2253F20F1} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E} diff --git a/src/Pole.ReliableMessage.Dashboard/Pole.ReliableMessage.Dashboard.csproj b/src/Pole.ReliableMessage.Dashboard/Pole.ReliableMessage.Dashboard.csproj new file mode 100644 index 0000000..9f5c4f4 --- /dev/null +++ b/src/Pole.ReliableMessage.Dashboard/Pole.ReliableMessage.Dashboard.csproj @@ -0,0 +1,7 @@ + + + + netstandard2.0 + + + diff --git a/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEvent.cs b/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEvent.cs new file mode 100644 index 0000000..5f80e76 --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEvent.cs @@ -0,0 +1,10 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Masstransit.Abstraction +{ + public interface IReliableEvent + { + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEventHandlerRegistrarFactory.cs b/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEventHandlerRegistrarFactory.cs new file mode 100644 index 0000000..eb09889 --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/Abstraction/IReliableEventHandlerRegistrarFactory.cs @@ -0,0 +1,12 @@ +using Pole.ReliableMessage.EventBus; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Masstransit.Abstraction +{ + public interface IReliableEventHandlerRegistrarFactory + { + MasstransitEventHandlerRegistrar Create(Type eventHandlerType); + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerContext.cs b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerContext.cs new file mode 100644 index 0000000..fbb2ec5 --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerContext.cs @@ -0,0 +1,21 @@ +using Pole.ReliableMessage.Abstraction; +using MassTransit; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Masstransit +{ + public class DefaultReliableEventHandlerContext : IReliableEventHandlerContext + where TEvent : class + { + private readonly ConsumeContext _executeContext; + public DefaultReliableEventHandlerContext(ConsumeContext executeContext) + { + _executeContext = executeContext; + this.Event = executeContext.Message; + } + public TEvent Event { get; private set; } + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs new file mode 100644 index 0000000..e6877d3 --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs @@ -0,0 +1,76 @@ +using Pole.ReliableMessage.Core; +using Pole.ReliableMessage.EventBus; +using Pole.ReliableMessage.Masstransit.Abstraction; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Pole.ReliableMessage.Masstransit +{ + class DefaultReliableEventHandlerRegistrarFactory : IReliableEventHandlerRegistrarFactory + { + private readonly MasstransitRabbitmqOption _masstransitOptions; + public DefaultReliableEventHandlerRegistrarFactory(IOptions masstransitOptions) + { + _masstransitOptions = masstransitOptions.Value ?? throw new ArgumentNullException(nameof(masstransitOptions)); + } + public MasstransitEventHandlerRegistrar Create(Type eventHnadler) + { + + if (!eventHnadler.Name.EndsWith("EventHandler")) + { + throw new Exception("EventHandler Name Must EndWith EventHandler"); + } + var reliableEventHandlerParemeterAttribute = eventHnadler.GetCustomAttributes(typeof(ReliableEventHandlerParemeterAttribute), true).FirstOrDefault(); + + var eventHandlerName = GetQueueName(reliableEventHandlerParemeterAttribute, eventHnadler, _masstransitOptions.QueueNamePrefix); + + var parentEventHandler = eventHnadler.BaseType; + var eventType = parentEventHandler.GetGenericArguments().ToList().FirstOrDefault(); + + ushort prefetchCount = GetPrefetchCount(eventHnadler, reliableEventHandlerParemeterAttribute); + + MasstransitEventHandlerRegistrar eventHandlerRegisterInvoker = new MasstransitEventHandlerRegistrar(eventHandlerName, eventHnadler, eventType, _masstransitOptions.RetryConfigure, prefetchCount); + return eventHandlerRegisterInvoker; + } + + private string GetQueueName(object reliableEventHandlerParemeterAttribute, Type eventHnadler, string queueNamePrefix) + { + var eventHandlerDefaultName = $"eventHandler-{ eventHnadler.Name.Replace("EventHandler", "").ToLowerInvariant()}"; + var eventHandlerName = string.IsNullOrEmpty(queueNamePrefix) ? eventHandlerDefaultName : $"{queueNamePrefix}-{eventHandlerDefaultName}"; + + if (reliableEventHandlerParemeterAttribute != null) + { + var reliableEventHandlerParemeterAttributeType = reliableEventHandlerParemeterAttribute.GetType(); + var prefetchCountPropertyInfo = reliableEventHandlerParemeterAttributeType.GetProperty(nameof(ReliableEventHandlerParemeterAttribute.QueueHaType)); + var queueHaTypeValue = Convert.ToInt32(prefetchCountPropertyInfo.GetValue(reliableEventHandlerParemeterAttribute)); + if (queueHaTypeValue != 0) + { + var currentQueueType = Enumeration.FromValue(queueHaTypeValue); + eventHandlerName = currentQueueType.GenerateQueueName(eventHandlerName); + } + } + + return eventHandlerName; + } + + private ushort GetPrefetchCount(Type eventHnadler, object reliableEventHandlerParemeterAttribute) + { + var prefetchCount = _masstransitOptions.PrefetchCount; + if (reliableEventHandlerParemeterAttribute != null) + { + var reliableEventHandlerParemeterAttributeType = reliableEventHandlerParemeterAttribute.GetType(); + var prefetchCountPropertyInfo = reliableEventHandlerParemeterAttributeType.GetProperty(nameof(ReliableEventHandlerParemeterAttribute.PrefetchCount)); + var prefetchCountValue = Convert.ToUInt16(prefetchCountPropertyInfo.GetValue(reliableEventHandlerParemeterAttribute)); + if (prefetchCountValue != 0) + { + prefetchCount = prefetchCountValue; + } + } + + return prefetchCount; + } + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/MassTransitHostedService.cs b/src/Pole.ReliableMessage.Masstransit/MassTransitHostedService.cs new file mode 100644 index 0000000..d819701 --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/MassTransitHostedService.cs @@ -0,0 +1,33 @@ +using MassTransit; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Masstransit +{ + public class MassTransitHostedService : IHostedService + { + readonly IBusControl _bus; + readonly ILogger _logger; + public MassTransitHostedService(IBusControl bus, ILogger logger) + { + _bus = bus; + _logger = logger; + } + public async Task StartAsync(CancellationToken cancellationToken) + { + await _bus.StartAsync(); + _logger.LogInformation("MassTransit Bus Start Successful"); + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + await _bus.StopAsync(); + _logger.LogInformation("MassTransit Bus Stop Successful"); + } + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs new file mode 100644 index 0000000..3f9665e --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs @@ -0,0 +1,24 @@ +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Masstransit.Pipe; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Masstransit +{ + class MasstransitBasedMessageBus : IMessageBus + { + public MasstransitBasedMessageBus(MassTransit.IBus bus) + { + _bus = bus; + } + private readonly MassTransit.IBus _bus; + public Task Publish(object @event,string reliableMessageId, CancellationToken cancellationToken = default(CancellationToken)) + { + var pipe = new AddReliableMessageIdPipe(reliableMessageId); + return _bus.Publish(@event, pipe, cancellationToken); + } + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitEventHandlerRegistrar.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitEventHandlerRegistrar.cs new file mode 100644 index 0000000..a1ea419 --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/MasstransitEventHandlerRegistrar.cs @@ -0,0 +1,46 @@ +using GreenPipes; +using GreenPipes.Configurators; +using MassTransit; +using MassTransit.ExtensionsDependencyInjectionIntegration; +using MassTransit.RabbitMqTransport; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.EventBus +{ + public class MasstransitEventHandlerRegistrar + { + private readonly string _queueName; + private readonly Type _eventHandlerType; + private readonly Type _eventHandlerEventType; + private readonly Action _retryConfigure; + public readonly ushort _prefetchCount; + public MasstransitEventHandlerRegistrar(string eventHandlerName, Type eventHandlerType, Type eventHandlerEventType, Action retryConfigure, ushort prefetchCount) + { + _queueName = eventHandlerName; + _eventHandlerType = eventHandlerType; + _eventHandlerEventType = eventHandlerEventType; + _retryConfigure = retryConfigure; + _prefetchCount = prefetchCount; + } + public void RegisterEventHandler(IServiceCollectionConfigurator serviceCollectionConfigurator, IServiceCollection services) + { + serviceCollectionConfigurator.AddConsumer(_eventHandlerType); + } + public void RegisterQueue(IServiceCollectionConfigurator serviceCollectionConfigurator, IRabbitMqBusFactoryConfigurator rabbitMqBusFactoryConfigurator, IRabbitMqHost rabbitMqHost, IServiceProvider serviceProvider) + { + + //serviceCollectionConfigurator.AddConsumer(_eventHandlerType); + + rabbitMqBusFactoryConfigurator.ReceiveEndpoint(_queueName, conf => + { + //conf.Consumer() + conf.ConfigureConsumer(serviceProvider, _eventHandlerType); + conf.PrefetchCount = _prefetchCount; + conf.UseMessageRetry(_retryConfigure); + }); + } + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitMessageBusConfigurator.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitMessageBusConfigurator.cs new file mode 100644 index 0000000..b2f1742 --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/MasstransitMessageBusConfigurator.cs @@ -0,0 +1,59 @@ +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.EventBus; +using Pole.ReliableMessage.Masstransit.Abstraction; +using MassTransit; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Masstransit +{ + class MasstransitMessageBusConfigurator : IMessageBusConfigurator + { + private readonly IReliableEventHandlerRegistrarFactory _reliableEventHandlerRegistrarFactory; + private readonly MasstransitRabbitmqOption _options; + public MasstransitMessageBusConfigurator(IReliableEventHandlerRegistrarFactory reliableEventHandlerRegistrarFactory, IOptions options) + { + _reliableEventHandlerRegistrarFactory = reliableEventHandlerRegistrarFactory; + _options = options.Value; + } + public async Task Configure(IServiceCollection services,IEnumerable eventHandlerTypes) + { + await Task.CompletedTask; + var eventHandlerRegistrars = GetEventHandlerRegistrars(eventHandlerTypes).ToList(); + services.AddMassTransit(x => + { + foreach (var eventHandlerRegistrar in eventHandlerRegistrars) + { + eventHandlerRegistrar.RegisterEventHandler(x, services); + } + x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg => + { + var host = cfg.Host(new Uri(_options.RabbitMqHostAddress), h => + { + h.Username(_options.RabbitMqHostUserName); + h.Password(_options.RabbitMqHostPassword); + + }); + foreach (var eventHandlerRegistrar in eventHandlerRegistrars) + { + eventHandlerRegistrar.RegisterQueue(x, cfg, host, provider); + } + })); + }); + } + private IEnumerable GetEventHandlerRegistrars(IEnumerable eventHandlerTypes) + { + foreach (var eventHandler in eventHandlerTypes) + { + var model = _reliableEventHandlerRegistrarFactory.Create(eventHandler); + yield return model; + } + } + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs new file mode 100644 index 0000000..ab955d1 --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs @@ -0,0 +1,29 @@ +using GreenPipes; +using GreenPipes.Configurators; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Masstransit +{ + public class MasstransitRabbitmqOption + { + public string RabbitMqHostAddress { get; set; } + public string RabbitMqHostUserName { get; set; } + public string RabbitMqHostPassword { get; set; } + public string QueueNamePrefix { get; set; } = string.Empty; + /// + /// 4 个并发 + /// + public ushort PrefetchCount { get; set; } = 4; + + public Action RetryConfigure { get; set; } = + r => r.Intervals(TimeSpan.FromSeconds(0.1) + , TimeSpan.FromSeconds(1) + , TimeSpan.FromSeconds(4) + , TimeSpan.FromSeconds(16) + , TimeSpan.FromSeconds(64) + ); + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs new file mode 100644 index 0000000..c9c47f0 --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/MasstransitReliableEventHandler.cs @@ -0,0 +1,65 @@ +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Masstransit.Pipe; +using Pole.ReliableMessage.Messaging; +using MassTransit; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Masstransit +{ + + public abstract class ReliableEventHandler : IReliableEventHandler,IConsumer + where TEvent : class + { + private readonly IMessageStorage _messageStorage; + private readonly ILogger> _logger; + private readonly IServiceProvider _serviceProvider; + public ReliableEventHandler(IServiceProvider serviceProvider) + { + _messageStorage = serviceProvider.GetRequiredService(typeof(IMessageStorage)) as IMessageStorage; + var loggerFactory = serviceProvider.GetRequiredService(typeof(ILoggerFactory)) as ILoggerFactory; + _logger = loggerFactory.CreateLogger>(); + _serviceProvider = serviceProvider; + } + + public abstract Task Handle(IReliableEventHandlerContext context); + public async Task Consume(ConsumeContext context) + { + var messageId = GetReliableMessageId(context); + if (_logger.IsEnabled(LogLevel.Debug)) + { + var jsonConveter = _serviceProvider.GetRequiredService(typeof(IJsonConverter)) as IJsonConverter; + var json = jsonConveter.Serialize(context.Message); + _logger.LogDebug($"Message Begin Handle,messageId:{messageId}, message content :{json}"); + } + + var retryAttempt = context.GetRetryAttempt(); + if (retryAttempt == 0) + { + if (string.IsNullOrEmpty(messageId)) + { + _logger.LogWarning($"Message has no ReliableMessageId, ignore"); + return; + } + var isHandled = !await _messageStorage.CheckAndUpdateStatus(m => m.Id == messageId, MessageStatus.Handed); + if (isHandled) + { + _logger.LogTrace($"This message has handled begore ReliableMessageId:{messageId}, ignore"); + return; + } + } + await Handle(new DefaultReliableEventHandlerContext(context)); + + _logger.LogDebug($"Message handled successfully ,messageId:{messageId}"); + } + + private string GetReliableMessageId(ConsumeContext context) + { + return context.Headers.Get(AddReliableMessageIdPipe.RELIABLE_MESSAGE_ID, string.Empty); + } + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/Messaging/DefaultMessageIdGenerator.cs b/src/Pole.ReliableMessage.Masstransit/Messaging/DefaultMessageIdGenerator.cs new file mode 100644 index 0000000..20b50e1 --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/Messaging/DefaultMessageIdGenerator.cs @@ -0,0 +1,16 @@ +using Pole.ReliableMessage.Abstraction; +using MassTransit; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Masstransit.Messaging +{ + class DefaultMessageIdGenerator : IMessageIdGenerator + { + public string Generate() + { + return NewId.Next().ToString("N").ToLowerInvariant(); + } + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/Pipe/AddReliableMessageIdPipe.cs b/src/Pole.ReliableMessage.Masstransit/Pipe/AddReliableMessageIdPipe.cs new file mode 100644 index 0000000..594bd58 --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/Pipe/AddReliableMessageIdPipe.cs @@ -0,0 +1,29 @@ +using GreenPipes; +using MassTransit; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Masstransit.Pipe +{ + class AddReliableMessageIdPipe : IPipe + { + public const string RELIABLE_MESSAGE_ID = "ReliableMessageId"; + private readonly string _reliableMessageId; + public AddReliableMessageIdPipe(string reliableMessageId) + { + _reliableMessageId = reliableMessageId; + } + public void Probe(ProbeContext context) + { + + } + + public async Task Send(PublishContext context) + { + context.Headers.Set(RELIABLE_MESSAGE_ID, _reliableMessageId); + await Task.CompletedTask; + } + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/Pole.ReliableMessage.Masstransit.csproj b/src/Pole.ReliableMessage.Masstransit/Pole.ReliableMessage.Masstransit.csproj new file mode 100644 index 0000000..e4c922f --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/Pole.ReliableMessage.Masstransit.csproj @@ -0,0 +1,17 @@ + + + + netstandard2.0 + + + + + + + + + + + + + diff --git a/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs b/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs new file mode 100644 index 0000000..6914c6c --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/QueueHaType.cs @@ -0,0 +1,32 @@ +using Pole.ReliableMessage.Core; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Masstransit +{ + public class QueueHaType : Enumeration + { + public static QueueHaType None = new QueueHaType(1, "无",string.Empty); + public static QueueHaType Default = new QueueHaType(2, "默认高可用","Rmd."); + public static QueueHaType Backlog = new QueueHaType(3, "消息可积压","Rmb."); + private readonly string _queuePrefix; + public QueueHaType(int id, string name) : base(id, name) + { + } + public QueueHaType(int id, string name,string prefix) : this(id, name) + { + _queuePrefix = prefix; + } + public string GenerateQueueName(string rawQueueName) + { + return string.Concat(_queuePrefix, rawQueueName); + } + } + public enum QueueHaTypeEnum:int + { + None = 1, + Default = 2, + Backlog = 3 + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/ReliableEventHandlerParemeterAttribute.cs b/src/Pole.ReliableMessage.Masstransit/ReliableEventHandlerParemeterAttribute.cs new file mode 100644 index 0000000..e1e6d8e --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/ReliableEventHandlerParemeterAttribute.cs @@ -0,0 +1,15 @@ +using GreenPipes.Configurators; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Masstransit +{ + [AttributeUsage(AttributeTargets.Class)] + public class ReliableEventHandlerParemeterAttribute : Attribute + { + public ushort PrefetchCount { get; set; } + public QueueHaTypeEnum QueueHaType { get; set; } = QueueHaTypeEnum.Default; + + } +} diff --git a/src/Pole.ReliableMessage.Masstransit/ReliableMessageOptionExtension.cs b/src/Pole.ReliableMessage.Masstransit/ReliableMessageOptionExtension.cs new file mode 100644 index 0000000..5b05c2e --- /dev/null +++ b/src/Pole.ReliableMessage.Masstransit/ReliableMessageOptionExtension.cs @@ -0,0 +1,38 @@ +using Pole.ReliableMessage; +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Masstransit; +using Pole.ReliableMessage.Masstransit.Abstraction; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; +using Pole.ReliableMessage.Masstransit.Messaging; + +namespace Microsoft.Extensions.DependencyInjection +{ + public static class ReliableMessageOptionExtension + { + public static ReliableMessageOption AddMasstransitRabbitmq(this ReliableMessageOption option, Action optionConfig) + { + option.ReliableMessageOptionExtensions.Add(new MasstransitRabbitmqExtension(optionConfig)); + return option; + } + } + public class MasstransitRabbitmqExtension : IReliableMessageOptionExtension + { + private readonly Action _masstransitRabbitmqOption; + public MasstransitRabbitmqExtension(Action masstransitRabbitmqOption) + { + _masstransitRabbitmqOption = masstransitRabbitmqOption; + } + public void AddServices(IServiceCollection services) + { + services.Configure(_masstransitRabbitmqOption); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddHostedService(); + } + } +} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs b/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs new file mode 100644 index 0000000..9f9b1ed --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs @@ -0,0 +1,22 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Storage.Abstraction +{ + public interface IMemberShipTable + { + Task IsPendingMessageCheckerServiceInstance(string ipAddress); + Task UpdateIAmAlive(string ipAddress, DateTime dateTime); + /// + /// 如果当前 超时时间内 没有可用 实例 返回 空 + /// + /// + /// + Task GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime); + + Task AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime); + } +} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj b/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj new file mode 100644 index 0000000..9f5c4f4 --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Abstraction/Pole.ReliableMessage.Storage.Abstraction.csproj @@ -0,0 +1,7 @@ + + + + netstandard2.0 + + + diff --git a/src/Pole.ReliableMessage.Storage.EntityframeworkCore/Pole.ReliableMessage.Storage.EntityframeworkCore.csproj b/src/Pole.ReliableMessage.Storage.EntityframeworkCore/Pole.ReliableMessage.Storage.EntityframeworkCore.csproj new file mode 100644 index 0000000..9ef9d54 --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.EntityframeworkCore/Pole.ReliableMessage.Storage.EntityframeworkCore.csproj @@ -0,0 +1,11 @@ + + + + netstandard2.0 + + + + + + + diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongoHost.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongoHost.cs new file mode 100644 index 0000000..33f4d32 --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongoHost.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Storage.Mongodb +{ + public sealed class MongoHost + { + /// + /// 主机或者IP地址 + /// + public string Host { get; set; } + + /// + /// 端口号 + /// + public int Port { get; set; } + } +} diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs new file mode 100644 index 0000000..72f8ee4 --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs @@ -0,0 +1,94 @@ +using Pole.ReliableMessage.Abstraction; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MongoDB.Driver; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Pole.ReliableMessage.Storage.Abstraction; + +namespace Pole.ReliableMessage.Storage.Mongodb +{ + class MongodbMemberShipTable : IMemberShipTable + { + private readonly MongoClient _mongoClient; + private readonly MongodbOption _mongodbOption; + private readonly ILogger _logger; + public MongodbMemberShipTable(IConfiguration configuration, MongoClient mongoClient, IOptions mongodbOption, ILogger logger) + { + _mongoClient = mongoClient; + _mongodbOption = mongodbOption.Value; + _logger = logger; + } + private IMongoDatabase GetActiveMessageDatabase(string activeMessageDatabase) + { + return _mongoClient.GetDatabase(activeMessageDatabase); + } + private IMongoCollection GetCollection() + { + var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName); + var messageCollectionName = _mongodbOption.MembershipCollectionName; + var collection = database.GetCollection(messageCollectionName); + return collection; + } + public async Task AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime) + { + var collection = GetCollection(); + var deleteResult = await collection.DeleteManyAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName); + MemberShipTable memberShipTable = new MemberShipTable(_mongodbOption.ServiceCollectionName, ipAddress, aliveUTCTime); + await collection.InsertOneAsync(memberShipTable); + return true; + } + + public async Task GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime) + { + var collection = GetCollection(); + + var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.IAmAliveUTCTime >= iamAliveEndTime)).ToList(); + if (instances.Count > 1) + { + _logger.LogInformation($"Current time have {instances.Count} PendingMessageChecker in {_mongodbOption.ServiceCollectionName} service , I will delete the extra instances"); + var currentInstance = instances.FirstOrDefault(); + var extraInstances = instances.Remove(currentInstance); + instances.ForEach(async n => + { + await collection.DeleteOneAsync(m => m.Id == n.Id); + }); + _logger.LogInformation($"Extra PendingMessageChecker instances in {_mongodbOption.ServiceCollectionName} service deleted successfully"); + return currentInstance.PendingMessageCheckerIp; + } + else if (instances.Count == 1) + { + return instances.FirstOrDefault().PendingMessageCheckerIp; + } + else + { + return null; + } + } + + public async Task IsPendingMessageCheckerServiceInstance(string ipAddress) + { + var collection = GetCollection(); + + var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp== ipAddress)).FirstOrDefault(); + if (instances != null) + { + return true; + } + return false; + } + + public async Task UpdateIAmAlive(string ipAddress,DateTime dateTime) + { + var collection = GetCollection(); + var filter = Builders.Filter.Where(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp == ipAddress); + var update = Builders.Update.Set(m=>m.IAmAliveUTCTime,dateTime); + var result = await collection.UpdateOneAsync(filter, update); + return true; + } + } +} diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs new file mode 100644 index 0000000..9f39701 --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbOption.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Storage.Mongodb +{ + public class MongodbOption + { + public string MessageDatabaseName { get; set; } = "ReliableMessage"; + public string MembershipCollectionName { get; set; } = "Membership"; + /// + /// bucket 中最大消息数 一旦达到最大数量 后面的数据将覆盖前面的数据 + /// + public long CollectionMaxMessageCount { get; set; } = 20000000; + + /// + /// 默认最大为10G + /// + public long CollectionMaxSize { get; set; } = 10*1024*1024*1024L; + + public string ServiceCollectionName { get; set; } + public MongoHost[] Servers { get; set; } + } + +} diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs new file mode 100644 index 0000000..a22e5af --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbStorage.cs @@ -0,0 +1,140 @@ +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Messaging; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MongoDB.Driver; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Storage.Mongodb +{ + class MongodbMessageStorage : IMessageStorage + { + private readonly MongoClient _mongoClient; + private readonly MongodbOption _mongodbOption; + private readonly ILogger _logger; + public MongodbMessageStorage(MongoClient mongoClient, IOptions mongodbOption, ILogger logger) + { + _mongoClient = mongoClient; + _mongodbOption = mongodbOption.Value; + _logger = logger; + } + private IMongoDatabase GetActiveMessageDatabase(string messageDatabase) + { + return _mongoClient.GetDatabase(messageDatabase); + } + private IMongoCollection GetCollection() + { + var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName); + var messageCollectionName = _mongodbOption.ServiceCollectionName; + var collection = database.GetCollection(messageCollectionName); + return collection; + } + public async Task Add(Message message) + { + IMongoCollection collection = GetCollection(); + + await collection.InsertOneAsync(message); + return true; + } + + public async Task CheckAndUpdateStatus(Expression> filter, MessageStatus messageStatus) + { + IMongoCollection collection = GetCollection(); + + var update = Builders.Update.Set(m => m.MessageStatusId, messageStatus.Id); + var beforeDoc = await collection.FindOneAndUpdateAsync(filter, update, new FindOneAndUpdateOptions() { ReturnDocument = ReturnDocument.Before }); + if (beforeDoc.MessageStatusId == messageStatus.Id) + { + return false; + } + return true; + } + + public async Task> GetMany(Expression> filter,int count) + { + IMongoCollection collection = GetCollection(); + + var list= await collection.Find(filter).Limit(count).ToListAsync(); + list.ForEach(m => + { + m.MessageStatus = Core.Enumeration.FromValue(m.MessageStatusId); + }); + return list; + } + + public async Task Save(IEnumerable messages) + { + var count = messages.Count(); + _logger.LogDebug($"MongodbMessageStorage Save begin, Messages count: {messages.Count()}"); + if (count == 0) + { + _logger.LogDebug($"MongodbMessageStorage Save successfully, saved count: 0"); + return true; + } + IMongoCollection collection = GetCollection(); + + var models = new List>(); + foreach (var message in messages) + { + FilterDefinition filter = Builders.Filter.Where(m => m.Id == message.Id && m.MessageStatusId != MessageStatus.Handed.Id); + UpdateDefinition update = Builders.Update + .Set(m => m.MessageStatusId, message.MessageStatus.Id) + .Set(m => m.RetryTimes, message.RetryTimes) + .Set(m => m.NextRetryUTCTime, message.NextRetryUTCTime); + + var model = new UpdateOneModel(filter, update); + models.Add(model); + } + var result = await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = false }); + + _logger.LogDebug($"MongodbMessageStorage Save successfully, saved count: {result.ModifiedCount}"); + + return result.IsAcknowledged; + } + + public async Task UpdateStatus(IEnumerable messages) + { + var count = messages.Count(); + _logger.LogDebug($"MongodbMessageStorage updateStatus begin, Messages count: {messages.Count()}"); + if (count == 0) + { + _logger.LogDebug($"MongodbMessageStorage updateStatus successfully, Modified count: 0"); + return true; + } + IMongoCollection collection = GetCollection(); + + var models = new List>(); + + foreach (var message in messages) + { + FilterDefinition filter = Builders.Filter.Where(m => m.Id == message.Id&&m.MessageStatusId!=MessageStatus.Handed.Id); + UpdateDefinition update = Builders.Update + .Set(m => m.MessageStatusId, message.MessageStatus.Id); + + var model = new UpdateOneModel(filter, update); + models.Add(model); + } + + var result = await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = false }); + + _logger.LogDebug($"MongodbMessageStorage updateStatus successfully, Modified count: {result.ModifiedCount}"); + + return result.IsAcknowledged; + } + + public async Task UpdateStatus(Expression> filter, MessageStatus messageStatus) + { + IMongoCollection collection = GetCollection(); + + var update = Builders.Update.Set(m => m.MessageStatusId, messageStatus.Id); + var result = await collection.UpdateOneAsync(filter, update); + return result.IsAcknowledged; + } + } +} diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/Pole.ReliableMessage.Storage.Mongodb.csproj b/src/Pole.ReliableMessage.Storage.Mongodb/Pole.ReliableMessage.Storage.Mongodb.csproj new file mode 100644 index 0000000..8ded00e --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Mongodb/Pole.ReliableMessage.Storage.Mongodb.csproj @@ -0,0 +1,19 @@ + + + + netstandard2.0 + + + + + + + + + + + + + + + diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs b/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs new file mode 100644 index 0000000..781bd2e --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs @@ -0,0 +1,128 @@ +using Pole.ReliableMessage; +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Messaging; +using Pole.ReliableMessage.Storage.Mongodb; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using MongoDB.Bson; +using MongoDB.Bson.Serialization; +using MongoDB.Bson.Serialization.IdGenerators; +using MongoDB.Driver; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Pole.ReliableMessage.Storage.Abstraction; + +namespace Microsoft.Extensions.DependencyInjection +{ + public static class ReliableMessageOptionExtension + { + public static ReliableMessageOption AddMongodb(this ReliableMessageOption option, Action mongodbOptionConfig) + { + option.ReliableMessageOptionExtensions.Add(new MongodbStorageExtension(mongodbOptionConfig)); + return option; + } + } + public class MongodbStorageExtension : IReliableMessageOptionExtension + { + private readonly Action _mongodbOption; + public MongodbStorageExtension(Action masstransitRabbitmqOption) + { + _mongodbOption = masstransitRabbitmqOption; + } + public void AddServices(IServiceCollection services) + { + services.Configure(_mongodbOption); + services.AddSingleton(); + services.AddSingleton(); + + var mongodbOption = services.BuildServiceProvider().GetRequiredService>().Value; + + var servers = mongodbOption.Servers.Select(x => new MongoServerAddress(x.Host, x.Port)).ToList(); + var settings = new MongoClientSettings() + { + Servers = servers + }; + var client = new MongoClient(settings); + var database = client.GetDatabase(mongodbOption.MessageDatabaseName); + + AddMapper(); + + InitCollection(mongodbOption, database); + + services.AddSingleton(client); + } + + private static void InitCollection(MongodbOption mongodbOption, IMongoDatabase database) + { + var collectionNames = database.ListCollectionNames().ToList(); + + if (!collectionNames.Contains(mongodbOption.ServiceCollectionName)) + { + database.CreateCollection(mongodbOption.ServiceCollectionName, new CreateCollectionOptions + { + Capped = true, + MaxDocuments = mongodbOption.CollectionMaxMessageCount, + MaxSize = mongodbOption.CollectionMaxSize, + + }); + var messageCollection = database.GetCollection(mongodbOption.ServiceCollectionName); + AddMessageCollectionIndex(messageCollection); + } + + if (!collectionNames.Contains(mongodbOption.MembershipCollectionName)) + { + database.CreateCollection(mongodbOption.MembershipCollectionName); + + var membershipCollection = database.GetCollection(mongodbOption.MembershipCollectionName); + AddMemberShipTableCollectionIndex(membershipCollection); + } + } + + private static void AddMessageCollectionIndex(IMongoCollection collection) + { + List> createIndexModels = new List>(); + + //var nextRetryUTCTimeIndex = Builders.IndexKeys.Ascending(m => m.NextRetryUTCTime); + //CreateIndexModel nextRetryUTCTimeIndexModel = new CreateIndexModel(nextRetryUTCTimeIndex, new CreateIndexOptions() { Background = true }); + //createIndexModels.Add(nextRetryUTCTimeIndexModel); + + var AddedUTCTimeUTCTimeIndex = Builders.IndexKeys.Ascending(m => m.AddedUTCTime); + CreateIndexModel AddedUTCTimeIndexModel = new CreateIndexModel(AddedUTCTimeUTCTimeIndex, new CreateIndexOptions() { Background = true }); + createIndexModels.Add(AddedUTCTimeIndexModel); + + //var messageTypeIdIndex = Builders.IndexKeys.Ascending(m => m.MessageTypeId); + //CreateIndexModel messageTypeIdIndexModel = new CreateIndexModel(messageTypeIdIndex, new CreateIndexOptions() { Background = true }); + //createIndexModels.Add(messageTypeIdIndexModel); + + collection.Indexes.CreateMany(createIndexModels); + } + private static void AddMemberShipTableCollectionIndex(IMongoCollection collection) + { + List> createIndexMembershipModels = new List>(); + + var serviceNameIndex = Builders.IndexKeys.Ascending(m => m.ServiceName); + CreateIndexModel serviceNameIndexModel = new CreateIndexModel(serviceNameIndex, new CreateIndexOptions() { Background = true, Unique = true }); + createIndexMembershipModels.Add(serviceNameIndexModel); + + collection.Indexes.CreateMany(createIndexMembershipModels); + } + + private static void AddMapper() + { + BsonClassMap.RegisterClassMap(cm => + { + cm.AutoMap(); + cm.UnmapMember(m => m.MessageStatus); + cm.MapIdField(m => m.Id); + cm.MapMember(m => m.NextRetryUTCTime).SetIsRequired(true); + }); + BsonClassMap.RegisterClassMap(cm => + { + cm.AutoMap(); + cm.MapIdField(m => m.Id).SetIdGenerator(StringObjectIdGenerator.Instance); + }); + } + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IApplicationBuilderConfigurator.cs b/src/Pole.ReliableMessage/Abstraction/IApplicationBuilderConfigurator.cs new file mode 100644 index 0000000..5b9c8a8 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IApplicationBuilderConfigurator.cs @@ -0,0 +1,13 @@ +using Microsoft.AspNetCore.Builder; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IApplicationBuilderConfigurator + { + void Config(IApplicationBuilder applicationBuilder); + void Add(Action config); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IComteckReliableMessageBootstrap.cs b/src/Pole.ReliableMessage/Abstraction/IComteckReliableMessageBootstrap.cs new file mode 100644 index 0000000..da17eb8 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IComteckReliableMessageBootstrap.cs @@ -0,0 +1,14 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IComteckReliableMessageBootstrap + { + Task Initialize(IServiceCollection services, List eventHandlerAssemblies, List eventAssemblies); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IJsonConverter.cs b/src/Pole.ReliableMessage/Abstraction/IJsonConverter.cs new file mode 100644 index 0000000..88dd982 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IJsonConverter.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IJsonConverter + { + string Serialize(object obj); + T Deserialize(string json); + object Deserialize(string json,Type type); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs b/src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs new file mode 100644 index 0000000..7055878 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IMessageBuffer.cs @@ -0,0 +1,16 @@ +using Pole.ReliableMessage.Messaging; +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IMessageBuffer + { + Task Flush(); + Task Add(Message message); + Task> GetAll(Func filter); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageBus.cs b/src/Pole.ReliableMessage/Abstraction/IMessageBus.cs new file mode 100644 index 0000000..9f10177 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IMessageBus.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IMessageBus + { + Task Publish(object @event,string reliableMessageId, CancellationToken cancellationToken = default); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageBusConfigurator.cs b/src/Pole.ReliableMessage/Abstraction/IMessageBusConfigurator.cs new file mode 100644 index 0000000..0261d1a --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IMessageBusConfigurator.cs @@ -0,0 +1,13 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IMessageBusConfigurator + { + Task Configure(IServiceCollection services, IEnumerable eventHandlerTypes); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageCallBackGenerator.cs b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackGenerator.cs new file mode 100644 index 0000000..3a7e5c6 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackGenerator.cs @@ -0,0 +1,12 @@ +using Pole.ReliableMessage.Messaging.CallBack; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IMessageCallBackInfoGenerator + { + MessageCallBackInfo Generate(Type eventHandlerType); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageCallBackInfoStore.cs b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackInfoStore.cs new file mode 100644 index 0000000..c82ab66 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackInfoStore.cs @@ -0,0 +1,14 @@ +using Pole.ReliableMessage.Messaging.CallBack; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IMessageCallBackInfoStore + { + Task Add(MessageCallBackInfo messageCallBackInfo); + Task Get(string messageTypeId); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageCallBackRegister.cs b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackRegister.cs new file mode 100644 index 0000000..c39a628 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IMessageCallBackRegister.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IMessageCallBackRegister + { + Task Register(IEnumerable eventHandlerTypes); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs b/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs new file mode 100644 index 0000000..11b2e44 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IMessageChecker.cs @@ -0,0 +1,14 @@ +using Pole.ReliableMessage.Messaging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IMessageChecker + { + Task GetResult(Message message); + + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageIdGenerator.cs b/src/Pole.ReliableMessage/Abstraction/IMessageIdGenerator.cs new file mode 100644 index 0000000..aa888c7 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IMessageIdGenerator.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IMessageIdGenerator + { + string Generate(); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs b/src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs new file mode 100644 index 0000000..849fcc6 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IMessageStorage.cs @@ -0,0 +1,60 @@ +using Pole.ReliableMessage.Messaging; +using System; +using System.Collections.Generic; +using System.Linq.Expressions; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IMessageStorage + { + /// + /// + /// + /// + /// + Task Add(Message message); + ///// + ///// + ///// + ///// + ///// + ///// + //Task Get(Expression> filter); + /// + /// + /// + /// + /// + /// + Task> GetMany(Expression> filter, int count); + /// + /// 批量更新 + /// 更新这几个值 MessageStatusId , RetryTimes LastRetryUTCTime, NextRetryUTCTime + /// + /// + /// + Task Save(IEnumerable messages); + + Task UpdateStatus(IEnumerable messages); + + ///// + ///// + ///// + ///// 这里id 永远为 string + ///// + ///// + //Task Update(Expression> filter, MessageStatus messageStatus); + + /// + /// 检查 消息的状态,如果不是指定状态则返回true,并且更新状态到指定状态 ,如果已经是指定状态返回false + /// + /// + /// + /// + Task CheckAndUpdateStatus(Expression> filter, MessageStatus messageStatus); + + Task UpdateStatus(Expression> filter, MessageStatus messageStatus); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IMessageTypeIdGenerator.cs b/src/Pole.ReliableMessage/Abstraction/IMessageTypeIdGenerator.cs new file mode 100644 index 0000000..58d5206 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IMessageTypeIdGenerator.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IMessageTypeIdGenerator + { + string Generate(Type messageType); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IProcessor.cs b/src/Pole.ReliableMessage/Abstraction/IProcessor.cs new file mode 100644 index 0000000..8037f05 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IProcessor.cs @@ -0,0 +1,14 @@ +using Pole.ReliableMessage.Processor; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IProcessor + { + string Name { get; } + Task Process(ProcessingContext context); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IProcessorServer.cs b/src/Pole.ReliableMessage/Abstraction/IProcessorServer.cs new file mode 100644 index 0000000..61f1988 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IProcessorServer.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IProcessorServer + { + Task Start(CancellationToken stoppingToken); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs b/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs new file mode 100644 index 0000000..9eb37d9 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IReliableBus + { + Task PrePublish(TReliableEvent @event,object callbackParemeter, CancellationToken cancellationToken = default); + Task Publish(TReliableEvent @event,string prePublishMessageId, CancellationToken cancellationToken=default); + Task Cancel(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventCallBackFinder.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventCallBackFinder.cs new file mode 100644 index 0000000..8cfded7 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IReliableEventCallBackFinder.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IReliableEventCallBackFinder + { + List FindAll(IEnumerable assemblies); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventCallback.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventCallback.cs new file mode 100644 index 0000000..22e7b5f --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IReliableEventCallback.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IReliableEventCallback : IReliableEventCallback + { + Task Callback(TCallbackParemeter callbackParemeter); + } + public interface IReliableEventCallback + { + + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventHandler.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandler.cs new file mode 100644 index 0000000..03df883 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandler.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IReliableEventHandler : IReliableEventHandler + where TEvent : class + { + Task Handle(IReliableEventHandlerContext context); + + } + public interface IReliableEventHandler + { + + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerContext.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerContext.cs new file mode 100644 index 0000000..9b36ea8 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerContext.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IReliableEventHandlerContext where TEvent : class + { + TEvent Event { get; } + //Task Publish(object @event); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerFinder.cs b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerFinder.cs new file mode 100644 index 0000000..bd17aa1 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IReliableEventHandlerFinder.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IReliableEventHandlerFinder + { + List FindAll(IEnumerable assemblies); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IRetryTimeCalculator.cs b/src/Pole.ReliableMessage/Abstraction/IRetryTimeCalculator.cs new file mode 100644 index 0000000..5302364 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IRetryTimeCalculator.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IRetryTimeDelayCalculator + { + int Get(int retryTimes, int maxPendingMessageRetryDelay); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IServiceIPv4AddressProvider.cs b/src/Pole.ReliableMessage/Abstraction/IServiceIPv4AddressProvider.cs new file mode 100644 index 0000000..cdc42f7 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/IServiceIPv4AddressProvider.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface IServiceIPv4AddressProvider + { + string Get(); + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/ITimeHelper.cs b/src/Pole.ReliableMessage/Abstraction/ITimeHelper.cs new file mode 100644 index 0000000..a844fc3 --- /dev/null +++ b/src/Pole.ReliableMessage/Abstraction/ITimeHelper.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Abstraction +{ + public interface ITimeHelper + { + DateTime GetUTCNow(); + /// + /// "UTC :{_timeHelper.GetNow().ToString("yyyy-MM-dd HH:mm:ss.fff")}" + /// + /// + string GetAppropriateFormatedDateString(); + } +} diff --git a/src/Pole.ReliableMessage/BackgroundServiceBasedProcessorServer.cs b/src/Pole.ReliableMessage/BackgroundServiceBasedProcessorServer.cs new file mode 100644 index 0000000..e1b3fb8 --- /dev/null +++ b/src/Pole.ReliableMessage/BackgroundServiceBasedProcessorServer.cs @@ -0,0 +1,53 @@ +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Processor; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage +{ + public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer + { + private readonly IServiceProvider _serviceProvider; + private Task _compositeTask; + + public BackgroundServiceBasedProcessorServer(IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider; + } + public async Task Start(CancellationToken stoppingToken) + { + + ProcessingContext processingContext = new ProcessingContext(stoppingToken); + List loopProcessors = new List(); + var innerProcessors = _serviceProvider.GetServices(); + var loggerFactory = _serviceProvider.GetService(); + var timeHelper = _serviceProvider.GetService(); + foreach (var innerProcessor in innerProcessors) + { + LoopProcessor processor = new LoopProcessor(innerProcessor, loggerFactory, timeHelper); + loopProcessors.Add(processor); + } + var tasks = loopProcessors.Select(p => p.Process(processingContext)); + + _compositeTask = Task.WhenAll(tasks); + await _compositeTask; + } + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + return Start(stoppingToken); + } + public override void Dispose() + { + // 等待 10秒 待 消息处理完 + _compositeTask?.Wait((int)TimeSpan.FromSeconds(10).TotalMilliseconds); + base.Dispose(); + } + } +} diff --git a/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs b/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs new file mode 100644 index 0000000..6c6b177 --- /dev/null +++ b/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs @@ -0,0 +1,27 @@ +using Pole.ReliableMessage; +using Pole.ReliableMessage.Abstraction; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Microsoft.AspNetCore.Builder +{ + public static class ComteckReliableMessageIApplicationBuilderExtensions + { + public static IApplicationBuilder UseComteckReliableMessage(this IApplicationBuilder applicationBuilder) + { + var option = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IOptions)) as IOptions; + var messageCallBackRegister = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IMessageCallBackRegister)) as IMessageCallBackRegister; + var reliableEventCallBackFinder = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IReliableEventCallBackFinder)) as IReliableEventCallBackFinder; + + var eventCallbacks = reliableEventCallBackFinder.FindAll(option.Value.EventCallbackAssemblies); + messageCallBackRegister.Register(eventCallbacks).GetAwaiter().GetResult(); + + return applicationBuilder; + } + } +} diff --git a/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs b/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs new file mode 100644 index 0000000..e92f470 --- /dev/null +++ b/src/Pole.ReliableMessage/ComteckReliableMessageServiceCollectionExtensions.cs @@ -0,0 +1,66 @@ +using Pole.Pole.ReliableMessage.EventBus; +using Pole.ReliableMessage; +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.EventBus; +using Pole.ReliableMessage.Messaging; +using Pole.ReliableMessage.Messaging.CallBack; +using Pole.ReliableMessage.Processor; +using Pole.ReliableMessage.Utils; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Microsoft.Extensions.DependencyInjection +{ + public static class ComteckReliableMessageServiceCollectionExtensions + { + public static IServiceCollection AddComteckReliableMessage(this IServiceCollection services, Action optionConfig) + { + ReliableMessageOption reliableMessageOption = new ReliableMessageOption(); + optionConfig(reliableMessageOption); + + foreach(var extension in reliableMessageOption.ReliableMessageOptionExtensions) + { + extension.AddServices(services); + } + services.Configure(optionConfig); + + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + services.AddHostedService(); + + services.AddHttpClient(); + + + + + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + var provider = services.BuildServiceProvider(); + + IComteckReliableMessageBootstrap applicationBuilderConfigurator = provider.GetService(typeof(IComteckReliableMessageBootstrap)) as IComteckReliableMessageBootstrap; + + applicationBuilderConfigurator.Initialize(services, reliableMessageOption.EventHandlerAssemblies, reliableMessageOption.EventCallbackAssemblies); + return services; + } + } +} diff --git a/src/Pole.ReliableMessage/Core/Enumeration.cs b/src/Pole.ReliableMessage/Core/Enumeration.cs new file mode 100644 index 0000000..3c584b6 --- /dev/null +++ b/src/Pole.ReliableMessage/Core/Enumeration.cs @@ -0,0 +1,73 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; + +namespace Pole.ReliableMessage.Core +{ + public abstract class Enumeration : IComparable + { + public string Name { get; private set; } + + public int Id { get; private set; } + + protected Enumeration(int id, string name) + { + Id = id; + Name = name; + } + + public override string ToString() => Name; + + public static IEnumerable GetAll() where T : Enumeration + { + var fields = typeof(T).GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly); + + return fields.Select(f => f.GetValue(null)).Cast(); + } + + public override bool Equals(object obj) + { + if (!(obj is Enumeration otherValue)) + return false; + + var typeMatches = GetType().Equals(obj.GetType()); + var valueMatches = Id.Equals(otherValue.Id); + + return typeMatches && valueMatches; + } + + public override int GetHashCode() => Id.GetHashCode(); + + public static int AbsoluteDifference(Enumeration firstValue, Enumeration secondValue) + { + var absoluteDifference = Math.Abs(firstValue.Id - secondValue.Id); + return absoluteDifference; + } + + public static T FromValue(int value) where T : Enumeration + { + var matchingItem = Parse(value, "value", item => item.Id == value); + return matchingItem; + } + + public static T FromDisplayName(string displayName) where T : Enumeration + { + var matchingItem = Parse(displayName, "display name", item => item.Name == displayName); + return matchingItem; + } + + private static T Parse(K value, string description, Func predicate) where T : Enumeration + { + var matchingItem = GetAll().FirstOrDefault(predicate); + + if (matchingItem == null) + throw new InvalidOperationException($"'{value}' is not a valid {description} in {typeof(T)}"); + + return matchingItem; + } + + public int CompareTo(object other) => Id.CompareTo(((Enumeration)other).Id); + } +} diff --git a/src/Pole.ReliableMessage/DefaultApplicationBuilderConfigurator.cs b/src/Pole.ReliableMessage/DefaultApplicationBuilderConfigurator.cs new file mode 100644 index 0000000..805204d --- /dev/null +++ b/src/Pole.ReliableMessage/DefaultApplicationBuilderConfigurator.cs @@ -0,0 +1,24 @@ +using Pole.ReliableMessage.Abstraction; +using Microsoft.AspNetCore.Builder; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage +{ + class DefaultApplicationBuilderConfigurator : IApplicationBuilderConfigurator + { + private readonly List> _configs = new List>(); + public void Add(Action config) + { + _configs.Add(config); + } + + public void Config(IApplicationBuilder applicationBuilder) + { + _configs.ForEach(m => { + m(applicationBuilder); + }); + } + } +} diff --git a/src/Pole.ReliableMessage/DefaultComteckReliableMessageBootstrap.cs b/src/Pole.ReliableMessage/DefaultComteckReliableMessageBootstrap.cs new file mode 100644 index 0000000..f0987b6 --- /dev/null +++ b/src/Pole.ReliableMessage/DefaultComteckReliableMessageBootstrap.cs @@ -0,0 +1,42 @@ +using Pole.ReliableMessage.Abstraction; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Reflection; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage +{ + class DefaultComteckReliableMessageBootstrap : IComteckReliableMessageBootstrap + { + private readonly IReliableEventHandlerFinder _reliableEventHandlerFinder; + private readonly IMessageBusConfigurator _messageBusConfigurator; + private readonly IMessageCallBackRegister _messageCallBackRegister; + private readonly IReliableEventCallBackFinder _reliableEventCallBackFinder; + public DefaultComteckReliableMessageBootstrap(IReliableEventHandlerFinder reliableEventHandlerFinder, IMessageBusConfigurator messageBusConfigurator, IMessageCallBackRegister messageCallBackRegister, IReliableEventCallBackFinder reliableEventCallBackFinder) + { + _reliableEventHandlerFinder = reliableEventHandlerFinder; + _messageBusConfigurator = messageBusConfigurator; + _messageCallBackRegister = messageCallBackRegister; + _reliableEventCallBackFinder = reliableEventCallBackFinder; + } + public async Task Initialize(IServiceCollection services, List eventHandlerAssemblies, List eventAssemblies) + { + var eventHandlers = _reliableEventHandlerFinder.FindAll(eventHandlerAssemblies); + await _messageBusConfigurator.Configure(services, eventHandlers); + + var eventCallbacks = _reliableEventCallBackFinder.FindAll(eventAssemblies); + await _messageCallBackRegister.Register(eventCallbacks); + RegisterEventCallbacks(services, eventCallbacks); + } + + private void RegisterEventCallbacks(IServiceCollection services, List eventCallbacks) + { + eventCallbacks.ForEach(m => + { + services.AddScoped(m); + }); + } + } +} diff --git a/src/Pole.ReliableMessage/DefaultRetryTimeCalculator.cs b/src/Pole.ReliableMessage/DefaultRetryTimeCalculator.cs new file mode 100644 index 0000000..3d8d327 --- /dev/null +++ b/src/Pole.ReliableMessage/DefaultRetryTimeCalculator.cs @@ -0,0 +1,20 @@ +using Pole.ReliableMessage.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage +{ + class DefaultRetryTimeDelayCalculator : IRetryTimeDelayCalculator + { + public int Get(int retryTimes, int maxPendingMessageRetryDelay) + { + var retryTimeDelay = (int)Math.Pow(2, retryTimes + 1); + if (retryTimeDelay >= maxPendingMessageRetryDelay) + { + return maxPendingMessageRetryDelay; + } + return retryTimeDelay; + } + } +} diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs new file mode 100644 index 0000000..597bf07 --- /dev/null +++ b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs @@ -0,0 +1,111 @@ +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Messaging; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using ILogger = Microsoft.Extensions.Logging.ILogger; + +namespace Pole.Pole.ReliableMessage.EventBus +{ + public class DefaultReliableBus : IReliableBus + { + private readonly IMessageBus _messageBus; + private readonly IMessageStorage _messageStorage; + private readonly IMessageIdGenerator _messageIdGenerator; + private readonly ITimeHelper _timeHelper; + private readonly IMessageBuffer _messageBuffer; + private readonly ILogger _logger; + private readonly IJsonConverter _jsonConverter; + private readonly IMessageCallBackInfoStore _messageCallBackInfoStore; + private readonly IMessageTypeIdGenerator _messageTypeIdGenerator; + public DefaultReliableBus(IMessageBus messageBus, IMessageStorage messageStorage, IMessageIdGenerator messageIdGenerator, ITimeHelper timeHelper, IMessageBuffer messageBuffer, ILogger logger, IJsonConverter jsonConverter, IMessageCallBackInfoStore messageCallBackInfoStore, IMessageTypeIdGenerator messageTypeIdGenerator) + { + _messageBus = messageBus; + _messageStorage = messageStorage; + _messageIdGenerator = messageIdGenerator; + _timeHelper = timeHelper; + _messageBuffer = messageBuffer; + _logger = logger; + _jsonConverter = jsonConverter; + _messageCallBackInfoStore = messageCallBackInfoStore; + _messageTypeIdGenerator = messageTypeIdGenerator; + } + + public Task Cancel(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default) + { + try + { + return _messageStorage.UpdateStatus(m => m.Id == prePublishMessageId, MessageStatus.Canced); + } + catch (Exception ex) + { + var errorInfo = $"Cancel PrePublish errors in defaultReliableBus;{ex.Message}"; + _logger.LogError(ex, errorInfo); + throw new Exception(errorInfo, ex); + } + } + + public async Task PrePublish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) + { + + var messageTypeId = _messageTypeIdGenerator.Generate(typeof(TReliableEvent)); + + var currentMessageCallbackInfo = _messageCallBackInfoStore.Get(messageTypeId); + if (currentMessageCallbackInfo == null) + { + throw new Exception($"Current message type not registered ,messageTypeId:{messageTypeId}"); + } + try + { + var messageId = _messageIdGenerator.Generate(); + + _logger.LogDebug($"PrePublish message begin ,messageId:{messageId}"); + + var now = _timeHelper.GetUTCNow(); + var content = _jsonConverter.Serialize(@event); + var callBackParem = _jsonConverter.Serialize(callbackParemeter); + Message newMessage = new Message() + { + AddedUTCTime = now, + Content = content, + Id = messageId, + MessageStatusId = MessageStatus.Pending.Id, + MessageTypeId = messageTypeId, + RePushCallBackParameterValue = callBackParem, + NextRetryUTCTime = DateTime.MinValue + }; + await _messageStorage.Add(newMessage); + + _logger.LogDebug($"PrePublish message successful ,messageId:{messageId}"); + + return messageId; + } + catch (Exception ex) + { + var errorInfo = $"PrePublish errors in DefaultReliableBus;{ex.Message}"; + _logger.LogError(ex, errorInfo); + throw new Exception(errorInfo, ex); + } + } + + public async Task Publish(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default) + { + try + { + await _messageBus.Publish(@event, prePublishMessageId, cancellationToken); + + var messageBufferResult = await _messageBuffer.Add(new Message { Id = prePublishMessageId, MessageStatus = MessageStatus.Pushed }); + return true; + } + catch (Exception ex) + { + var errorInfo = $"Publish errors in DefaultReliableBus;{ex.Message}"; + _logger.LogError(ex, errorInfo); + throw new Exception(errorInfo, ex); + } + } + } +} diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableEventFinder.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableEventFinder.cs new file mode 100644 index 0000000..9d1f187 --- /dev/null +++ b/src/Pole.ReliableMessage/EventBus/DefaultReliableEventFinder.cs @@ -0,0 +1,20 @@ +using Pole.ReliableMessage.Abstraction; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; + +namespace Pole.ReliableMessage.EventBus +{ + class DefaultReliableEventCallBackFinder : IReliableEventCallBackFinder + { + public List FindAll(IEnumerable assemblies) + { + var eventType = typeof(IReliableEventCallback); + + var eventTypes = assemblies.SelectMany(m => m.GetTypes().Where(type => eventType.IsAssignableFrom(type))); + return eventTypes.ToList(); ; + } + } +} diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableEventHandlerFinder.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableEventHandlerFinder.cs new file mode 100644 index 0000000..ebe3328 --- /dev/null +++ b/src/Pole.ReliableMessage/EventBus/DefaultReliableEventHandlerFinder.cs @@ -0,0 +1,20 @@ +using Pole.ReliableMessage.Abstraction; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; + +namespace Pole.ReliableMessage.EventBus +{ + public class DefaultReliableEventHandlerFinder : IReliableEventHandlerFinder + { + public List FindAll(IEnumerable assemblies) + { + var eventHandlerType = typeof(IReliableEventHandler); + + var eventHandlerTypes = assemblies.SelectMany(m => m.GetTypes().Where(type => eventHandlerType.IsAssignableFrom(type))); + return eventHandlerTypes.ToList(); + } + } +} diff --git a/src/Pole.ReliableMessage/IReliableMessageOptionExtension.cs b/src/Pole.ReliableMessage/IReliableMessageOptionExtension.cs new file mode 100644 index 0000000..5032fa9 --- /dev/null +++ b/src/Pole.ReliableMessage/IReliableMessageOptionExtension.cs @@ -0,0 +1,12 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage +{ + public interface IReliableMessageOptionExtension + { + void AddServices(IServiceCollection services); + } +} diff --git a/src/Pole.ReliableMessage/MemberShipTable.cs b/src/Pole.ReliableMessage/MemberShipTable.cs new file mode 100644 index 0000000..535ec71 --- /dev/null +++ b/src/Pole.ReliableMessage/MemberShipTable.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage +{ + public class MemberShipTable + { + public string Id { get;private set; } + public MemberShipTable(string serviceName,string pendingMessageCheckerIp,DateTime iAmAliveUTCTime) + { + ServiceName = serviceName; + PendingMessageCheckerIp = pendingMessageCheckerIp; + IAmAliveUTCTime = iAmAliveUTCTime; + } + + public string ServiceName { get;private set; } + public string PendingMessageCheckerIp { get; private set; } + public DateTime IAmAliveUTCTime { get; private set; } + } +} diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackInfoGenerator.cs b/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackInfoGenerator.cs new file mode 100644 index 0000000..6e3c93e --- /dev/null +++ b/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackInfoGenerator.cs @@ -0,0 +1,51 @@ +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.EventBus; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Messaging.CallBack +{ + class DefaultMessageCallBackInfoGenerator : IMessageCallBackInfoGenerator + { + private readonly IMessageTypeIdGenerator _messageTypeIdGenerator; + public DefaultMessageCallBackInfoGenerator(IMessageTypeIdGenerator messageTypeIdGenerator) + { + _messageTypeIdGenerator = messageTypeIdGenerator; + } + public MessageCallBackInfo Generate(Type eventCallbackType) + { + var @interface = eventCallbackType.GetInterfaces().FirstOrDefault(); + Func> deleg = MakeCallBackFunc(eventCallbackType, @interface); + + var eventType = @interface.GetGenericArguments()[0]; + var eventCallbackArguemntType = @interface.GetGenericArguments()[1]; + var enentName = _messageTypeIdGenerator.Generate(eventType); + + MessageCallBackInfo messageCallBackInfo = new MessageCallBackInfo(enentName, deleg, eventCallbackType, eventCallbackArguemntType, eventType); + return messageCallBackInfo; + } + + private static Func> MakeCallBackFunc(Type eventType, Type @interface) + { + var callbackParemeterType = @interface.GetGenericArguments()[1]; + var argument = Expression.Parameter(typeof(object)); + var paremeter = Expression.Parameter(typeof(object)); + // var typedParemeter = Expression.Parameter(eventType); + var typedcallbackParemeter = Expression.Convert(argument, callbackParemeterType); + + var typedParemeter = Expression.Convert(paremeter, eventType); + + var callBackMethod = eventType.GetMethod("Callback"); + var call = Expression.Call(typedParemeter, callBackMethod, typedcallbackParemeter); + + //var innerParemeter = eventType.GetInterfaces().FirstOrDefault(); + var lambda = Expression.Lambda>>(call, true, argument, paremeter); + var deleg = lambda.Compile(); + return deleg; + } + } +} diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackRegister.cs b/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackRegister.cs new file mode 100644 index 0000000..fbe8035 --- /dev/null +++ b/src/Pole.ReliableMessage/Messaging/CallBack/DefaultMessageCallBackRegister.cs @@ -0,0 +1,27 @@ +using Pole.ReliableMessage.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Messaging.CallBack +{ + class DefaultMessageCallBackRegister : IMessageCallBackRegister + { + private readonly IMessageCallBackInfoGenerator _messageCallBackInfoGenerator; + private readonly IMessageCallBackInfoStore _messageCallBackInfoStore; + public DefaultMessageCallBackRegister(IMessageCallBackInfoGenerator messageCallBackInfoGenerator, IMessageCallBackInfoStore messageCallBackInfoStore) + { + _messageCallBackInfoGenerator = messageCallBackInfoGenerator; + _messageCallBackInfoStore = messageCallBackInfoStore; + } + public async Task Register(IEnumerable eventCallbackTypes) + { + foreach(var eventCallbackType in eventCallbackTypes) + { + var messageCallBackInfo = _messageCallBackInfoGenerator.Generate(eventCallbackType); + await _messageCallBackInfoStore.Add(messageCallBackInfo); + } + } + } +} diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfo.cs b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfo.cs new file mode 100644 index 0000000..e111c97 --- /dev/null +++ b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfo.cs @@ -0,0 +1,30 @@ +using Pole.ReliableMessage.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Messaging.CallBack +{ + public class MessageCallBackInfo + { + private Func> _callBack; + public MessageCallBackInfo(string messageTypeId, Func> callBack, Type eventCallbackType, Type eventCallbackArguemntType,Type eventType) + { + MessageTypeId = messageTypeId; + _callBack = callBack; + EventCallbackType = eventCallbackType; + EventCallbackArguemntType = eventCallbackArguemntType; + EventType = eventType; + } + public string MessageTypeId { get;private set; } + public Type EventType { get; private set; } + public Type EventCallbackType { get; private set; } + public Type EventCallbackArguemntType { get; private set; } + + public Task Invoke(object parameter, object reliableEvent) + { + return _callBack(parameter, reliableEvent); + } + } +} diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfoInMemoryStore.cs b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfoInMemoryStore.cs new file mode 100644 index 0000000..7cee68e --- /dev/null +++ b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackInfoInMemoryStore.cs @@ -0,0 +1,38 @@ +using Pole.ReliableMessage.Abstraction; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Messaging.CallBack +{ + public class MessageCallBackInfoInMemoryStore : Dictionary, IMessageCallBackInfoStore + { + private readonly ILogger _logger; + public MessageCallBackInfoInMemoryStore(ILogger logger) + { + _logger = logger; + } + public async Task Add(MessageCallBackInfo messageCallBackInfo) + { + await Task.CompletedTask; + if (TryGetValue(messageCallBackInfo.MessageTypeId, out MessageCallBackInfo info)) + { + throw new Exception($"Add MessageCallBackInfo Faild , MessageCallBackInfo Already Added ,MessageTypeId:{messageCallBackInfo.MessageTypeId}"); + } + Add(messageCallBackInfo.MessageTypeId, messageCallBackInfo); + _logger.LogDebug($"Add MessageCallBackInfo Success ,MessageTypeId:{messageCallBackInfo.MessageTypeId}"); + } + + public async Task Get(string messageTypeId) + { + await Task.CompletedTask; + if (TryGetValue(messageTypeId, out MessageCallBackInfo info)) + { + return info; + } + return null; + } + } +} diff --git a/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs new file mode 100644 index 0000000..f4f2875 --- /dev/null +++ b/src/Pole.ReliableMessage/Messaging/CallBack/MessageCallBackResponseStatus.cs @@ -0,0 +1,18 @@ +using Pole.ReliableMessage.Core; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Messaging.CallBack +{ + public class MessageCallBackResponseStatus : Enumeration + { + public static MessageCallBackResponseStatus Finised = new MessageCallBackResponseStatus(3, "已完成"); + public static MessageCallBackResponseStatus Unfinished = new MessageCallBackResponseStatus(6, "未完成"); + public static MessageCallBackResponseStatus Unusual = new MessageCallBackResponseStatus(9, "异常"); + + public MessageCallBackResponseStatus(int id, string name) : base(id, name) + { + } + } +} diff --git a/src/Pole.ReliableMessage/Messaging/DefaultJsonConverter.cs b/src/Pole.ReliableMessage/Messaging/DefaultJsonConverter.cs new file mode 100644 index 0000000..0159576 --- /dev/null +++ b/src/Pole.ReliableMessage/Messaging/DefaultJsonConverter.cs @@ -0,0 +1,25 @@ +using Pole.ReliableMessage.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Messaging +{ + class DefaultJsonConverter : IJsonConverter + { + public T Deserialize(string json) + { + return System.Text.Json.JsonSerializer.Deserialize(json); + } + + public object Deserialize(string json, Type type) + { + return System.Text.Json.JsonSerializer.Deserialize(json, type); + } + + public string Serialize(object obj) + { + return System.Text.Json.JsonSerializer.Serialize(obj); + } + } +} diff --git a/src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs b/src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs new file mode 100644 index 0000000..27a6abf --- /dev/null +++ b/src/Pole.ReliableMessage/Messaging/DefaultMessageBuffer.cs @@ -0,0 +1,58 @@ +using Pole.ReliableMessage.Abstraction; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Messaging +{ + class DefaultMessageBuffer : IMessageBuffer + { + private readonly IMessageStorage _storage; + private readonly System.Collections.Concurrent.ConcurrentDictionary Messages = new System.Collections.Concurrent.ConcurrentDictionary(); + private readonly ILogger _logger; + public DefaultMessageBuffer(IMessageStorage storage, ILogger logger) + { + _storage = storage; + _logger = logger; + } + public async Task Flush() + { + /// 通过 MessageTypeId 是否为空 判断 消息是否为 DefaultReliableBus Publish 完成后的消息状态修改缓冲, + var toUpdateStatusMessageKeyValuePairs = Messages.Where(m => string.IsNullOrEmpty(m.Value.MessageTypeId)); + var toUpdateStatusMessages= toUpdateStatusMessageKeyValuePairs.Select(m=>m.Value).ToArray(); + var toUpdateStatusMessageIds = toUpdateStatusMessageKeyValuePairs.Select(m => m.Key).ToList(); + await _storage.UpdateStatus(toUpdateStatusMessages); + + _logger.LogDebug($"DefaultMessageBuffer.Flush update successfully, Message count{toUpdateStatusMessages.Count()}"); + toUpdateStatusMessageIds.ForEach(m => { + Messages.TryRemove(m,out Message message); + }); + + var toSavedMessageKeyValuePairs = Messages.Where(m => !string.IsNullOrEmpty(m.Value.MessageTypeId)); + var toSavedMessages = toSavedMessageKeyValuePairs.Select(m => m.Value).ToArray(); + var toSavedMessagesIds = toSavedMessageKeyValuePairs.Select(m => m.Key).ToList(); + await _storage.Save(toSavedMessages); + + _logger.LogDebug($"DefaultMessageBuffer.Flush save successfully, Message count{toSavedMessages.Count()}"); + toSavedMessagesIds.ForEach(m => { + Messages.TryRemove(m, out Message message); + }); + } + + public Task Add(Message message) + { + Messages.TryAdd(message.Id, message); + return Task.FromResult(true); + } + + public async Task> GetAll(Func filter) + { + await Task.CompletedTask; + return Messages.Values.Where(filter).ToList(); + } + } +} diff --git a/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs b/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs new file mode 100644 index 0000000..731aff0 --- /dev/null +++ b/src/Pole.ReliableMessage/Messaging/DefaultMessageChecker.cs @@ -0,0 +1,62 @@ +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Messaging; +using Pole.ReliableMessage.Messaging.CallBack; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Messaging +{ + class DefaultMessageChecker : IMessageChecker + { + private readonly IMessageCallBackInfoStore _messageCallBackInfoStore; + private readonly ILogger _logger; + private readonly IJsonConverter _jsonConverter; + private readonly IServiceProvider _serviceProvider; + public DefaultMessageChecker(IMessageCallBackInfoStore messageCallBackInfoStore, ILogger logger, IJsonConverter jsonConverter, IServiceProvider serviceProvider) + { + _messageCallBackInfoStore = messageCallBackInfoStore; + _logger = logger; + _jsonConverter = jsonConverter; + _serviceProvider = serviceProvider; + } + public async Task GetResult(Message message) + { + try + { + var callBackInfo = await _messageCallBackInfoStore.Get(message.MessageTypeId); + if (callBackInfo == null) + { + _logger.LogError($"Can't find the callBackInfo, MessageTypeId:{message.MessageTypeId}"); + return MessageCheckerResult.NotFinished; + } + using (var scope = _serviceProvider.CreateScope()) + { + var callback = scope.ServiceProvider.GetRequiredService(callBackInfo.EventCallbackType); + var argument = _jsonConverter.Deserialize(message.RePushCallBackParameterValue, callBackInfo.EventCallbackArguemntType); + var result = await callBackInfo.Invoke(argument, callback); + if (_logger.IsEnabled(LogLevel.Debug)) + { + var messageInfoDetail = _jsonConverter.Serialize(message); + _logger.LogDebug($"DefaultMessageChecker IsFinished Result:{result.ToString()},MessageTypeId:{message.MessageTypeId},MessageDetail:{messageInfoDetail}"); + } + if (result) + { + var @event = _jsonConverter.Deserialize(message.Content, callBackInfo.EventType); + return new MessageCheckerResult(true, @event); + } + return MessageCheckerResult.NotFinished; + } + } + catch (Exception ex) + { + var messageInfoDetail = _jsonConverter.Serialize(message); + _logger.LogError(ex, $"DefaultMessageChecker.IsFinished Error, MessageTypeId:{message.MessageTypeId},MessageDetail:{messageInfoDetail}"); + return MessageCheckerResult.NotFinished; + } + } + } +} diff --git a/src/Pole.ReliableMessage/Messaging/DefaultMessageTypeIdGenerator.cs b/src/Pole.ReliableMessage/Messaging/DefaultMessageTypeIdGenerator.cs new file mode 100644 index 0000000..893433a --- /dev/null +++ b/src/Pole.ReliableMessage/Messaging/DefaultMessageTypeIdGenerator.cs @@ -0,0 +1,15 @@ +using Pole.ReliableMessage.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Messaging +{ + class DefaultMessageTypeIdGenerator : IMessageTypeIdGenerator + { + public string Generate(Type messageType) + { + return messageType.FullName; + } + } +} diff --git a/src/Pole.ReliableMessage/Messaging/Message.cs b/src/Pole.ReliableMessage/Messaging/Message.cs new file mode 100644 index 0000000..048ebd4 --- /dev/null +++ b/src/Pole.ReliableMessage/Messaging/Message.cs @@ -0,0 +1,79 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Messaging +{ + public class Message : IComparable + { + /// + /// 这里id 永远为 string + /// + public string Id { get; set; } + + /// + /// 消息状态 + /// + public MessageStatus MessageStatus { get; set; } + + /// + /// 消息状态Id + /// + + public int MessageStatusId { get; set; } + + /// + /// 预发送的时间 + /// + public DateTime AddedUTCTime { get; set; } + + /// + /// 用来存放 消息内容 目前没有大小限制 这个需要根据 实际情况 , mongodb 和 rabiitmq 的 综合指标来定 ,开发人员 在使用超大对象时需谨慎 + /// + public string Content { get; set; } + + /// + /// 消息的名称 用来鉴别不同的消息 + /// + public string MessageTypeId { get; set; } + + /// + /// 当前消息 回调者所需参数值 + /// + public string RePushCallBackParameterValue { get; set; } + + ///// + ///// 最后一次的重试时间 + ///// + //public DateTime LastRetryUTCTime { get; set; } + + + /// + /// 下一次的重试时间 + /// + public DateTime NextRetryUTCTime { get; set; } + + /// + /// 重试次数 + /// + public int RetryTimes { get; set; } = 0; + + public int CompareTo(Message other) + { + return Id.CompareTo(other.Id); + } + } + public class MessageIEqualityComparer : IEqualityComparer + { + public static MessageIEqualityComparer Default = new MessageIEqualityComparer(); + public bool Equals(Message x, Message y) + { + return x.CompareTo(y) == 0; + } + + public int GetHashCode(Message obj) + { + return obj.Id.GetHashCode(); + } + } +} diff --git a/src/Pole.ReliableMessage/Messaging/MessageCheckerResult.cs b/src/Pole.ReliableMessage/Messaging/MessageCheckerResult.cs new file mode 100644 index 0000000..a3ab5cb --- /dev/null +++ b/src/Pole.ReliableMessage/Messaging/MessageCheckerResult.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Messaging +{ + public class MessageCheckerResult + { + public static MessageCheckerResult NotFinished = new MessageCheckerResult(false); + public MessageCheckerResult(bool isFinished,object @event):this(isFinished) + { + Event = @event; + } + public MessageCheckerResult(bool isFinished) + { + IsFinished = isFinished; + } + public bool IsFinished { get; set; } + public object Event { get; set; } + } +} diff --git a/src/Pole.ReliableMessage/Messaging/MessageStatus.cs b/src/Pole.ReliableMessage/Messaging/MessageStatus.cs new file mode 100644 index 0000000..100ca52 --- /dev/null +++ b/src/Pole.ReliableMessage/Messaging/MessageStatus.cs @@ -0,0 +1,20 @@ +using Pole.ReliableMessage.Core; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Messaging +{ + public class MessageStatus : Enumeration + { + public static MessageStatus Pending = new MessageStatus(3,"待发送"); + public static MessageStatus Pushed = new MessageStatus(6,"已发送"); + public static MessageStatus Canced = new MessageStatus(9,"已取消"); + public static MessageStatus Handed = new MessageStatus(12, "已处理"); + + public MessageStatus(int id,string name ):base(id,name) + { + + } + } +} diff --git a/src/Pole.ReliableMessage/Pole.ReliableMessage.csproj b/src/Pole.ReliableMessage/Pole.ReliableMessage.csproj new file mode 100644 index 0000000..a71d49c --- /dev/null +++ b/src/Pole.ReliableMessage/Pole.ReliableMessage.csproj @@ -0,0 +1,24 @@ + + + + netstandard2.0 + + + + + + + + + + + + + + + + + + + + diff --git a/src/Pole.ReliableMessage/Processor/LoopProcessor.cs b/src/Pole.ReliableMessage/Processor/LoopProcessor.cs new file mode 100644 index 0000000..e52ffa7 --- /dev/null +++ b/src/Pole.ReliableMessage/Processor/LoopProcessor.cs @@ -0,0 +1,51 @@ +using Pole.ReliableMessage.Abstraction; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Processor +{ + public class LoopProcessor : ProcessorBase + { + private IProcessor _processor; + private readonly ILoggerFactory _loggerFactory; + private readonly ITimeHelper _timeHelper; + + public LoopProcessor(IProcessor processor, ILoggerFactory loggerFactory, ITimeHelper timeHelper) + { + _processor = processor; + _loggerFactory = loggerFactory; + _timeHelper = timeHelper; + } + public override string Name => "LoopProcessor"; + public override async Task Process(ProcessingContext context) + { + var logger = _loggerFactory.CreateLogger(); + + while (!context.IsStopping) + { + try + { + logger.LogDebug($"{_timeHelper.GetAppropriateFormatedDateString()}...{ this.ToString() } process start"); + + await _processor.Process(context); + + logger.LogDebug($"{_timeHelper.GetAppropriateFormatedDateString()}...{ this.ToString() } process compelete"); + } + catch (Exception ex) + { + logger.LogError(ex, $"{_timeHelper.GetAppropriateFormatedDateString()}...{ this.ToString() } process error"); + } + } + } + public override string ToString() + { + var strArray = new string[2]; + strArray[0] = Name; + strArray[1] = _processor.Name; + return string.Join("_", strArray); + } + } +} diff --git a/src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs b/src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs new file mode 100644 index 0000000..8421317 --- /dev/null +++ b/src/Pole.ReliableMessage/Processor/MessageBufferFlushProcessor.cs @@ -0,0 +1,41 @@ +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Messaging; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Processor +{ + public class MessageBufferFlushProcessor : ProcessorBase + { + private readonly IMessageBuffer _messageBuffer; + private readonly ReliableMessageOption _options; + private readonly ILogger _logger; + public MessageBufferFlushProcessor(IMessageBuffer messageBuffer, IOptions options, ILogger logger) + { + _messageBuffer = messageBuffer; + _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); + _logger = logger; + } + public override string Name => nameof(PendingMessageCheckProcessor); + + public override async Task Process(ProcessingContext context) + { + try + { + await _messageBuffer.Flush(); + } + catch(Exception ex) + { + _logger.LogError(ex, $"MessageBufferFlushProcessor Process Error"); + } + finally + { + await Task.Delay(_options.PushedMessageFlushInterval * 1000); + } + } + } +} diff --git a/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs b/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs new file mode 100644 index 0000000..7d25d53 --- /dev/null +++ b/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs @@ -0,0 +1,134 @@ +using Pole.ReliableMessage.Abstraction; +using Pole.ReliableMessage.Messaging; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.NetworkInformation; +using System.Net.Sockets; +using System.Text; +using System.Threading.Tasks; +using Pole.ReliableMessage.Storage.Abstraction; + +namespace Pole.ReliableMessage.Processor +{ + class PendingMessageCheckProcessor : ProcessorBase + { + private readonly IMessageStorage _storage; + private readonly ReliableMessageOption _options; + private readonly IMessageBuffer _messageBuffer; + private readonly ITimeHelper _timeHelper; + private readonly IMessageChecker _messageChecker; + private readonly IRetryTimeDelayCalculator _retryTimeDelayCalculator; + private readonly IMemberShipTable _memberShipTable; + private readonly ILogger _logger; + private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider; + private readonly IMessageBus _messageBus; + public PendingMessageCheckProcessor(IMessageStorage storage, IOptions options, IMessageBuffer messageBuffer, ITimeHelper timeHelper, IMessageChecker messageChecker, IRetryTimeDelayCalculator retryTimeDelayCalculator, IMemberShipTable memberShipTable, ILogger logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider, IMessageBus messageBus) + { + _storage = storage; + _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); + _messageBuffer = messageBuffer; + _timeHelper = timeHelper; + _messageChecker = messageChecker; + _retryTimeDelayCalculator = retryTimeDelayCalculator; + _memberShipTable = memberShipTable; + _logger = logger; + _serviceIPv4AddressProvider = serviceIPv4AddressProvider; + _messageBus = messageBus; + } + public override string Name => nameof(PendingMessageCheckProcessor); + + + public override async Task Process(ProcessingContext context) + { + try + { + var iPStr = _serviceIPv4AddressProvider.Get(); + + var isPendingChecker = await _memberShipTable.IsPendingMessageCheckerServiceInstance(iPStr);// 这里可以把时间加上去 + if (!isPendingChecker) + { + _logger.LogDebug("I an not the PendingChecker ,Ignore check"); + return; + } + + var now = _timeHelper.GetUTCNow(); + var pendingMessages = await _storage.GetMany(m => m.MessageStatusId == MessageStatus.Pending.Id &&m.NextRetryUTCTime <= now && m.AddedUTCTime <= now.AddSeconds(-1 * _options.PendingMessageFirstProcessingWaitTime)&&m.AddedUTCTime>= now.AddSeconds(-1 * _options.PendingMessageCheckingTimeOutSeconds),_options.PendingMessageCheckBatchCount); + + var cachedPushedMessage = await _messageBuffer.GetAll(m => m.MessageStatus == MessageStatus.Pushed); + + var finalToCheckedMessages = pendingMessages.Except(cachedPushedMessage, MessageIEqualityComparer.Default).ToList(); + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"PendingMessageCheckProcessor pendingMessages count:{pendingMessages.Count}"); + _logger.LogDebug($"PendingMessageCheckProcessor cachedPushedMessage count:{cachedPushedMessage.Count}"); + _logger.LogDebug($"PendingMessageCheckProcessor finalToCheckedMessages count:{finalToCheckedMessages.Count}"); + } + + finalToCheckedMessages.AsParallel().ForAll(async m => await Retry(m, now)); + } + catch (Exception ex) + { + _logger.LogError(ex, $"PendingMessageCheckProcessor Process Error"); + } + finally + { + await Task.Delay(_options.PendingMessageRetryInterval * 1000); + } + } + + private async Task Retry(Message message, DateTime retryTime) + { + try + { + await Task.CompletedTask; + + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} begin Retry"); + } + var nextRetryDelay = _retryTimeDelayCalculator.Get(message.RetryTimes, _options.MaxPendingMessageRetryDelay); + message.NextRetryUTCTime = retryTime.AddSeconds(nextRetryDelay); + + if (retryTime > message.AddedUTCTime.AddSeconds(_options.PendingMessageTimeOut)) + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Canced ,PendingMessageTimeOut:{_options.PendingMessageTimeOut}"); + } + + message.NextRetryUTCTime = DateTime.MinValue; + message.MessageStatus = MessageStatus.Canced; + await _messageBuffer.Add(message); + return; + } + message.RetryTimes++; + + var messageCheckerResult = await _messageChecker.GetResult(message); + if (messageCheckerResult.IsFinished) + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Pushed"); + } + message.MessageStatus = MessageStatus.Pushed; + await _messageBus.Publish(messageCheckerResult.Event, message.Id); + } + else + { + if (_logger.IsEnabled(LogLevel.Debug)) + { + _logger.LogDebug($"PendingMessageCheckProcessor.Retry ,message:{message.Id} would be Retry next time"); + } + } + await _messageBuffer.Add(message); + } + catch (Exception ex) + { + _logger.LogError(ex, $"PendingMessageCheckProcessor Retry ,Message:{message.Id} retry with errors"); + } + } + } +} diff --git a/src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs b/src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs new file mode 100644 index 0000000..a1d1e6d --- /dev/null +++ b/src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs @@ -0,0 +1,68 @@ +using Pole.ReliableMessage.Abstraction; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Pole.ReliableMessage.Storage.Abstraction; + +namespace Pole.ReliableMessage.Processor +{ + class PendingMessageServiceInstanceCheckProcessor : ProcessorBase + { + private readonly ReliableMessageOption _options; + private readonly ITimeHelper _timeHelper; + private readonly IMemberShipTable _memberShipTable; + private readonly ILogger _logger; + private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider; + public PendingMessageServiceInstanceCheckProcessor(IOptions options, ITimeHelper timeHelper, IMemberShipTable memberShipTable, ILogger logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider) + { + _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); + _timeHelper = timeHelper; + _memberShipTable = memberShipTable; + _logger = logger; + _serviceIPv4AddressProvider = serviceIPv4AddressProvider; + } + public override string Name => nameof(PendingMessageCheckProcessor); + + + public override async Task Process(ProcessingContext context) + { + try + { + var now = _timeHelper.GetUTCNow(); + var iPStr = _serviceIPv4AddressProvider.Get(); + _logger.LogDebug($"Current instance ip is {iPStr}"); + + var currentCheckIp = await _memberShipTable.GetPendingMessageCheckerServiceInstanceIp(now.AddSeconds(-1 * _options.PendingMessageCheckerInstanceIAnAliveTimeout)); + if (string.IsNullOrEmpty(currentCheckIp)) + { + var addInstanceResult = await _memberShipTable.AddCheckerServiceInstanceAndDeleteOthers(iPStr, now); + if (addInstanceResult) + { + _logger.LogInformation($"I am the PendingMessageCheck now, ip:{iPStr}"); + return; + } + _logger.LogError($"There is no PendingMessageChecker ,I want to be the PendingMessageCheck ,but faild ,memberShipTable.AddCheckerServiceInstance faild , ip:{iPStr}"); + return; + } + if (currentCheckIp == iPStr) + { + _logger.LogDebug($"Begin update pendingMessageChecker iAmAliveUTCTime"); + await _memberShipTable.UpdateIAmAlive(currentCheckIp, now); + _logger.LogDebug($"Update pendingMessageChecker iAmAliveUTCTime successfully"); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"PendingMessageServiceInstanceCheckProcessor Process Error"); + } + finally + { + await Task.Delay(_options.PendingMessageCheckerInstanceIAnAliveTimeUpdateDelay * 1000); + } + } + } +} diff --git a/src/Pole.ReliableMessage/Processor/ProcessingContext.cs b/src/Pole.ReliableMessage/Processor/ProcessingContext.cs new file mode 100644 index 0000000..0d51926 --- /dev/null +++ b/src/Pole.ReliableMessage/Processor/ProcessingContext.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace Pole.ReliableMessage.Processor +{ + public class ProcessingContext + { + public ProcessingContext(CancellationToken cancellationToken) + { + CancellationToken = cancellationToken; + } + public CancellationToken CancellationToken { get; } + public bool IsStopping => CancellationToken.IsCancellationRequested; + } +} diff --git a/src/Pole.ReliableMessage/Processor/ProcessorBase.cs b/src/Pole.ReliableMessage/Processor/ProcessorBase.cs new file mode 100644 index 0000000..e6cd01a --- /dev/null +++ b/src/Pole.ReliableMessage/Processor/ProcessorBase.cs @@ -0,0 +1,20 @@ +using Pole.ReliableMessage.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Processor +{ + public abstract class ProcessorBase : IProcessor + { + public abstract string Name { get; } + + public abstract Task Process(ProcessingContext context); + + public override string ToString() + { + return Name; + } + } +} diff --git a/src/Pole.ReliableMessage/ReliableMessageOption.cs b/src/Pole.ReliableMessage/ReliableMessageOption.cs new file mode 100644 index 0000000..0c3cfb4 --- /dev/null +++ b/src/Pole.ReliableMessage/ReliableMessageOption.cs @@ -0,0 +1,93 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text; + +namespace Pole.ReliableMessage +{ + public class ReliableMessageOption + { + public List ReliableMessageOptionExtensions { get; private set; } = new List(); + public List EventCallbackAssemblies { get; private set; } + public List EventHandlerAssemblies { get; private set; } + /// + /// 待发送消息重试间隔 单位 秒 + /// + public int PendingMessageRetryInterval { get; set; } = 10; + + /// + /// 待发送消息最大重试次数 , 0 为 无上限 + /// + public int PendingMessageRetryTimes { get; set; } = 0; + + /// + /// 预发送消息超时时间 单位 秒 + /// + public int PendingMessageTimeOut { get; set; } = 10*60; + + /// + /// 预发送消息检查时每一次获取的消息数量 + /// + public int PendingMessageCheckBatchCount { get; set; } = 1000; + + /// + /// 预发送消息状态检查最后时间 单位 秒 + /// + public int PendingMessageCheckingTimeOutSeconds { get; set; } = 13*60; + + /// + /// 已发送的消息缓冲区 flush to storage 的时间间隔 单位 秒 + /// + public int PushedMessageFlushInterval { get; set; } = 2; + + + /// + /// PendingMessage 第一次处理等待时间 单位 秒 + /// + public int PendingMessageFirstProcessingWaitTime { get; set; } = 2+10; + + /// + /// 每次重试之间最大间隔 单位 秒 + /// + public int MaxPendingMessageRetryDelay { get; set; } = 2 * 60; + + /// + /// PendingMessageCheck 实例更新 存活时间的时间间隔 单位 秒 + /// + public int PendingMessageCheckerInstanceIAnAliveTimeUpdateDelay { get; set; } = 10; + + /// + /// PendingMessageCheck 实例存活超时时间 单位 秒 + /// + public int PendingMessageCheckerInstanceIAnAliveTimeout { get; set; } = 3*10; + + /// + /// 当主机有多个网络时通过指定网关地址找到合适的服务ip地址 + /// + public string NetworkInterfaceGatewayAddress { get; set; } = string.Empty; + + public ReliableMessageOption AddEventAssemblies (params Assembly [] assemblies) + { + EventCallbackAssemblies = assemblies.ToList(); + return this; + } + public ReliableMessageOption AddEventAssemblies(IEnumerable assemblies) + { + EventCallbackAssemblies = assemblies.ToList(); + return this; + } + public ReliableMessageOption AddEventHandlerAssemblies(params Assembly[] assemblies) + { + EventHandlerAssemblies = assemblies.ToList(); + return this; + } + public ReliableMessageOption AddEventHandlerAssemblies(IEnumerable assemblies) + { + EventHandlerAssemblies = assemblies.ToList(); + return this; + } + } + +} diff --git a/src/Pole.ReliableMessage/Utils/DefaulTimeHelper.cs b/src/Pole.ReliableMessage/Utils/DefaulTimeHelper.cs new file mode 100644 index 0000000..a9945b8 --- /dev/null +++ b/src/Pole.ReliableMessage/Utils/DefaulTimeHelper.cs @@ -0,0 +1,20 @@ +using Pole.ReliableMessage.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.ReliableMessage.Utils +{ + class DefaulTimeHelper : ITimeHelper + { + public string GetAppropriateFormatedDateString() + { + return DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff"); + } + + public DateTime GetUTCNow() + { + return DateTime.UtcNow; + } + } +} diff --git a/src/Pole.ReliableMessage/Utils/DefaultServiceIPv4AddressProvider.cs b/src/Pole.ReliableMessage/Utils/DefaultServiceIPv4AddressProvider.cs new file mode 100644 index 0000000..cd2a250 --- /dev/null +++ b/src/Pole.ReliableMessage/Utils/DefaultServiceIPv4AddressProvider.cs @@ -0,0 +1,57 @@ +using Pole.ReliableMessage.Abstraction; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.NetworkInformation; +using System.Net.Sockets; +using System.Text; + +namespace Pole.ReliableMessage.Utils +{ + class DefaultServiceIPv4AddressProvider : IServiceIPv4AddressProvider + { + private readonly ReliableMessageOption _options; + private string _ipAddress; + public DefaultServiceIPv4AddressProvider(IOptions options) + { + _options = options.Value; + Init(); + } + + private void Init() + { + var gatewayAddress = _options.NetworkInterfaceGatewayAddress; + NetworkInterface networkInterface = null; + if (string.IsNullOrEmpty(_options.NetworkInterfaceGatewayAddress)) + { + networkInterface = NetworkInterface.GetAllNetworkInterfaces() + .OrderByDescending(c => c.Speed) + .Where(m => m.NetworkInterfaceType != NetworkInterfaceType.Loopback && m.OperationalStatus == OperationalStatus.Up) + .FirstOrDefault(); + } + else + { + networkInterface = NetworkInterface.GetAllNetworkInterfaces() +.OrderByDescending(c => c.Speed).Where(m => m.NetworkInterfaceType != NetworkInterfaceType.Loopback && m.OperationalStatus == OperationalStatus.Up).Where(m => m.GetIPProperties().GatewayAddresses.FirstOrDefault(c => c.Address.AddressFamily == AddressFamily.InterNetwork)?.Address.ToString() == gatewayAddress) +.FirstOrDefault(); + } + if (networkInterface == null) + { + throw new Exception($"Not found correct NetworkInterface, option.NetworkInterfaceGatewayAddress:{gatewayAddress}"); + } + var props = networkInterface.GetIPProperties(); + // get first IPV4 address assigned to this interface + var firstIpV4Address = props.UnicastAddresses + .Where(c => c.Address.AddressFamily == AddressFamily.InterNetwork) + .Select(c => c.Address) + .FirstOrDefault(); + _ipAddress = firstIpV4Address.ToString(); + } + + public string Get() + { + return _ipAddress; + } + } +} -- libgit2 0.25.0