From 18f75144dbe2f386866712c21534c0cc7589478a Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Fri, 7 Feb 2020 11:43:06 +0800 Subject: [PATCH] add eventbus --- Pole.sln | 7 +++++++ src/Pole.Core/Abstraction/IGrainID.cs | 9 +++++++++ src/Pole.Core/Abstraction/IObserverUnit.cs | 26 ++++++++++++++++++++++++++ src/Pole.Core/Abstraction/IObserverUnitContainer.cs | 11 +++++++++++ src/Pole.Core/Channels/Abstractions/IBaseMpscChannel.cs | 28 ++++++++++++++++++++++++++++ src/Pole.Core/Channels/Abstractions/IMpscChannel.cs | 14 ++++++++++++++ src/Pole.Core/Channels/AsyncInputEvent.cs | 14 ++++++++++++++ src/Pole.Core/Channels/ChannelOptions.cs | 14 ++++++++++++++ src/Pole.Core/Channels/MpscChannel.cs | 164 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Channels/NoBindConsumerException.cs | 11 +++++++++++ src/Pole.Core/Channels/RebindConsumerException.cs | 11 +++++++++++ src/Pole.Core/EventBus/Consumer.cs | 33 +++++++++++++++++++++++++++++++++ src/Pole.Core/EventBus/IConsumer.cs | 11 +++++++++++ src/Pole.Core/EventBus/IConsumerContainer.cs | 9 +++++++++ src/Pole.Core/EventBus/IProducer.cs | 9 +++++++++ src/Pole.Core/EventBus/IProducerContainer.cs | 11 +++++++++++ src/Pole.Core/Exceptions/BeginTxTimeoutException.cs | 12 ++++++++++++ src/Pole.Core/Exceptions/ChannelUnavailabilityException.cs | 11 +++++++++++ src/Pole.Core/Exceptions/EventBusRepeatBindingProducerException.cs | 11 +++++++++++ src/Pole.Core/Exceptions/EventIsClearedException.cs | 11 +++++++++++ src/Pole.Core/Exceptions/EventVersionUnorderedException.cs | 12 ++++++++++++ src/Pole.Core/Exceptions/ObserverNotCompletedException.cs | 12 ++++++++++++ src/Pole.Core/Exceptions/ObserverUnitRepeatedException.cs | 11 +++++++++++ src/Pole.Core/Exceptions/PrimaryKeyTypeException.cs | 12 ++++++++++++ src/Pole.Core/Exceptions/RepeatedTxException.cs | 12 ++++++++++++ src/Pole.Core/Exceptions/SnapshotNotSupportTxException.cs | 11 +++++++++++ src/Pole.Core/Exceptions/StateInsecurityException.cs | 12 ++++++++++++ src/Pole.Core/Exceptions/StateIsOverException.cs | 12 ++++++++++++ src/Pole.Core/Exceptions/TxCommitException.cs | 8 ++++++++ src/Pole.Core/Exceptions/TxIdException.cs | 8 ++++++++ src/Pole.Core/Exceptions/TxSnapshotException.cs | 12 ++++++++++++ src/Pole.Core/Exceptions/TypeCodeRepeatedException.cs | 11 +++++++++++ src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs | 11 +++++++++++ src/Pole.Core/Exceptions/UnfindEventHandlerException.cs | 12 ++++++++++++ src/Pole.Core/Exceptions/UnfindObserverUnitException.cs | 11 +++++++++++ src/Pole.Core/Exceptions/UnfindSnapshotHandlerException.cs | 12 ++++++++++++ src/Pole.Core/Exceptions/UnknowTypeCodeException.cs | 11 +++++++++++ src/Pole.Core/Exceptions/UnopenedTransactionException.cs | 12 ++++++++++++ src/Pole.Core/Observer/Abstraction/Attributes/ObserverAttribute.cs | 44 ++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Observer/Abstraction/IObserver.cs | 17 +++++++++++++++++ src/Pole.Core/Observer/Abstraction/IVersion.cs | 10 ++++++++++ src/Pole.Core/Pole.Core.csproj | 7 ++++++- src/Pole.Core/Services/Abstraction/IHoldLock.cs | 12 ++++++++++++ src/Pole.Core/Services/Abstraction/ILocalUID.cs | 15 +++++++++++++++ src/Pole.Core/Services/Abstraction/ILock.cs | 11 +++++++++++ src/Pole.Core/Services/Abstraction/IUtcUID.cs | 15 +++++++++++++++ src/Pole.Core/Services/Abstraction/IWeightHoldLock.cs | 13 +++++++++++++ src/Pole.Core/Services/HoldLockGrain.cs | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Services/LocalUIDGrain.cs | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Services/LockGrain.cs | 42 ++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Services/UtcUIDGrain.cs | 59 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Services/WeightHoldLock.cs | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Utils/AssemblyHelper.cs | 31 +++++++++++++++++++++++++++++++ src/Pole.Core/Utils/ConsistentHash.cs | 114 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs | 37 +++++++++++++++++++++++++++++++++++++ src/Pole.Core/Utils/MurmurHash2.cs | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Utils/PooledMemoryStream.cs | 201 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs | 43 +++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs | 7 +++++++ src/Pole.EventBus.Rabbitmq/Client/ModelPooledObjectPolicy.cs | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs | 40 ++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs | 33 +++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs | 17 +++++++++++++++++ src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs | 38 ++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs | 169 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs | 19 +++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/Attributes/ProducerAttribute.cs | 24 ++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/Client/ConnectionWrapper.cs | 43 +++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/Client/IRabbitMQClient.cs | 7 +++++++ src/Pole.EventBus.Rabbitmq/Core/Client/ModelPooledObjectPolicy.cs | 57 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/Client/ModelWrapper.cs | 40 ++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/Client/RabbitMQClient.cs | 33 +++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/Configuration/ConsumerOptions.cs | 17 +++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/Configuration/RabbitOptions.cs | 38 ++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerManager.cs | 169 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerRunner.cs | 119 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/Consumer/RabbitConsumer.cs | 19 +++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs | 114 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/EventBusRepeatException.cs | 11 +++++++++++ src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs | 13 +++++++++++++ src/Pole.EventBus.Rabbitmq/Core/QueueInfo.cs | 12 ++++++++++++ src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Extensions.cs | 31 +++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj | 20 ++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs | 25 +++++++++++++++++++++++++ 86 files changed, 2949 insertions(+), 1 deletion(-) create mode 100644 src/Pole.Core/Abstraction/IGrainID.cs create mode 100644 src/Pole.Core/Abstraction/IObserverUnit.cs create mode 100644 src/Pole.Core/Abstraction/IObserverUnitContainer.cs create mode 100644 src/Pole.Core/Channels/Abstractions/IBaseMpscChannel.cs create mode 100644 src/Pole.Core/Channels/Abstractions/IMpscChannel.cs create mode 100644 src/Pole.Core/Channels/AsyncInputEvent.cs create mode 100644 src/Pole.Core/Channels/ChannelOptions.cs create mode 100644 src/Pole.Core/Channels/MpscChannel.cs create mode 100644 src/Pole.Core/Channels/NoBindConsumerException.cs create mode 100644 src/Pole.Core/Channels/RebindConsumerException.cs create mode 100644 src/Pole.Core/EventBus/Consumer.cs create mode 100644 src/Pole.Core/EventBus/IConsumer.cs create mode 100644 src/Pole.Core/EventBus/IConsumerContainer.cs create mode 100644 src/Pole.Core/EventBus/IProducer.cs create mode 100644 src/Pole.Core/EventBus/IProducerContainer.cs create mode 100644 src/Pole.Core/Exceptions/BeginTxTimeoutException.cs create mode 100644 src/Pole.Core/Exceptions/ChannelUnavailabilityException.cs create mode 100644 src/Pole.Core/Exceptions/EventBusRepeatBindingProducerException.cs create mode 100644 src/Pole.Core/Exceptions/EventIsClearedException.cs create mode 100644 src/Pole.Core/Exceptions/EventVersionUnorderedException.cs create mode 100644 src/Pole.Core/Exceptions/ObserverNotCompletedException.cs create mode 100644 src/Pole.Core/Exceptions/ObserverUnitRepeatedException.cs create mode 100644 src/Pole.Core/Exceptions/PrimaryKeyTypeException.cs create mode 100644 src/Pole.Core/Exceptions/RepeatedTxException.cs create mode 100644 src/Pole.Core/Exceptions/SnapshotNotSupportTxException.cs create mode 100644 src/Pole.Core/Exceptions/StateInsecurityException.cs create mode 100644 src/Pole.Core/Exceptions/StateIsOverException.cs create mode 100644 src/Pole.Core/Exceptions/TxCommitException.cs create mode 100644 src/Pole.Core/Exceptions/TxIdException.cs create mode 100644 src/Pole.Core/Exceptions/TxSnapshotException.cs create mode 100644 src/Pole.Core/Exceptions/TypeCodeRepeatedException.cs create mode 100644 src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs create mode 100644 src/Pole.Core/Exceptions/UnfindEventHandlerException.cs create mode 100644 src/Pole.Core/Exceptions/UnfindObserverUnitException.cs create mode 100644 src/Pole.Core/Exceptions/UnfindSnapshotHandlerException.cs create mode 100644 src/Pole.Core/Exceptions/UnknowTypeCodeException.cs create mode 100644 src/Pole.Core/Exceptions/UnopenedTransactionException.cs create mode 100644 src/Pole.Core/Observer/Abstraction/Attributes/ObserverAttribute.cs create mode 100644 src/Pole.Core/Observer/Abstraction/IObserver.cs create mode 100644 src/Pole.Core/Observer/Abstraction/IVersion.cs create mode 100644 src/Pole.Core/Services/Abstraction/IHoldLock.cs create mode 100644 src/Pole.Core/Services/Abstraction/ILocalUID.cs create mode 100644 src/Pole.Core/Services/Abstraction/ILock.cs create mode 100644 src/Pole.Core/Services/Abstraction/IUtcUID.cs create mode 100644 src/Pole.Core/Services/Abstraction/IWeightHoldLock.cs create mode 100644 src/Pole.Core/Services/HoldLockGrain.cs create mode 100644 src/Pole.Core/Services/LocalUIDGrain.cs create mode 100644 src/Pole.Core/Services/LockGrain.cs create mode 100644 src/Pole.Core/Services/UtcUIDGrain.cs create mode 100644 src/Pole.Core/Services/WeightHoldLock.cs create mode 100644 src/Pole.Core/Utils/AssemblyHelper.cs create mode 100644 src/Pole.Core/Utils/ConsistentHash.cs create mode 100644 src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs create mode 100644 src/Pole.Core/Utils/MurmurHash2.cs create mode 100644 src/Pole.Core/Utils/PooledMemoryStream.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Client/ModelPooledObjectPolicy.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/Attributes/ProducerAttribute.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/Client/ConnectionWrapper.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/Client/IRabbitMQClient.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/Client/ModelPooledObjectPolicy.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/Client/ModelWrapper.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/Client/RabbitMQClient.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/Configuration/ConsumerOptions.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/Configuration/RabbitOptions.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerManager.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerRunner.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/Consumer/RabbitConsumer.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/EventBusRepeatException.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/QueueInfo.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Extensions.cs create mode 100644 src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj create mode 100644 src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs diff --git a/Pole.sln b/Pole.sln index 9f4740c..8f3d917 100644 --- a/Pole.sln +++ b/Pole.sln @@ -45,6 +45,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "integrationEvents", "integr EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Product.IntegrationEvents", "samples\intergrationEvents\Product.IntegrationEvents\Product.IntegrationEvents.csproj", "{9C0DFC90-1AF9-424A-B5FB-2A7C3611970C}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.EventBus.Rabbitmq", "src\Pole.EventBus.Rabbitmq\Pole.EventBus.Rabbitmq.csproj", "{BDF62A19-FFBD-4EE1-A07A-68472E680A95}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -111,6 +113,10 @@ Global {9C0DFC90-1AF9-424A-B5FB-2A7C3611970C}.Debug|Any CPU.Build.0 = Debug|Any CPU {9C0DFC90-1AF9-424A-B5FB-2A7C3611970C}.Release|Any CPU.ActiveCfg = Release|Any CPU {9C0DFC90-1AF9-424A-B5FB-2A7C3611970C}.Release|Any CPU.Build.0 = Release|Any CPU + {BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -134,6 +140,7 @@ Global {F65858EC-C34F-4121-BEC5-4E20DEA74A0A} = {475116FC-DEEC-4255-94E4-AE7B8C85038D} {74422E64-29FE-4287-A86E-741D1DFF6698} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB} {9C0DFC90-1AF9-424A-B5FB-2A7C3611970C} = {74422E64-29FE-4287-A86E-741D1DFF6698} + {BDF62A19-FFBD-4EE1-A07A-68472E680A95} = {9932C965-8B38-4F70-9E43-86DC56860E2B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E} diff --git a/src/Pole.Core/Abstraction/IGrainID.cs b/src/Pole.Core/Abstraction/IGrainID.cs new file mode 100644 index 0000000..b823f9a --- /dev/null +++ b/src/Pole.Core/Abstraction/IGrainID.cs @@ -0,0 +1,9 @@ +using System; + +namespace Ray.Core.Abstractions +{ + public interface IGrainID + { + Type GrainType { get; } + } +} diff --git a/src/Pole.Core/Abstraction/IObserverUnit.cs b/src/Pole.Core/Abstraction/IObserverUnit.cs new file mode 100644 index 0000000..ccd188e --- /dev/null +++ b/src/Pole.Core/Abstraction/IObserverUnit.cs @@ -0,0 +1,26 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Ray.Core.Abstractions +{ + public interface IObserverUnit : IGrainID + { + /// + /// 获取所有监听者分组 + /// + /// + List GetGroups(); + Task GetAndSaveVersion(PrimaryKey primaryKey, long srcVersion); + /// + /// 重置Grain + /// + /// 重置Grain + /// + Task Reset(PrimaryKey primaryKey); + List> GetEventHandlers(string observerGroup); + List> GetAllEventHandlers(); + List, Task>> GetBatchEventHandlers(string observerGroup); + List, Task>> GetAllBatchEventHandlers(); + } +} diff --git a/src/Pole.Core/Abstraction/IObserverUnitContainer.cs b/src/Pole.Core/Abstraction/IObserverUnitContainer.cs new file mode 100644 index 0000000..054bb7e --- /dev/null +++ b/src/Pole.Core/Abstraction/IObserverUnitContainer.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.Core.Abstractions +{ + public interface IObserverUnitContainer + { + IObserverUnit GetUnit(Type grainType); + object GetUnit(Type grainType); + void Register(IGrainID followUnit); + } +} diff --git a/src/Pole.Core/Channels/Abstractions/IBaseMpscChannel.cs b/src/Pole.Core/Channels/Abstractions/IBaseMpscChannel.cs new file mode 100644 index 0000000..5b0d1c4 --- /dev/null +++ b/src/Pole.Core/Channels/Abstractions/IBaseMpscChannel.cs @@ -0,0 +1,28 @@ +using System.Threading.Tasks; + +namespace Ray.Core.Channels +{ + public interface IBaseMpscChannel + { + /// + /// 是否已经完成 + /// + bool IsComplete { get; } + /// + /// 是否是子级channel + /// + bool IsChildren { get; set; } + /// + /// 把一个mpscchannel关联到另外一个mpscchannel,只要有消息进入,所有关联的channel都会顺序的进行消息检查和处理 + /// + /// + void JoinConsumerSequence(IBaseMpscChannel channel); + /// + /// 等待消息写入 + /// + /// + Task WaitToReadAsync(); + Task ManualConsume(); + void Complete(); + } +} diff --git a/src/Pole.Core/Channels/Abstractions/IMpscChannel.cs b/src/Pole.Core/Channels/Abstractions/IMpscChannel.cs new file mode 100644 index 0000000..a0aeb7f --- /dev/null +++ b/src/Pole.Core/Channels/Abstractions/IMpscChannel.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Ray.Core.Channels +{ + public interface IMpscChannel : IBaseMpscChannel + { + void BindConsumer(Func, Task> consumer); + void BindConsumer(Func, Task> consumer, int maxBatchSize, int maxMillisecondsDelay); + void Config(int maxBatchSize, int maxMillisecondsDelay); + ValueTask WriteAsync(T data); + } +} diff --git a/src/Pole.Core/Channels/AsyncInputEvent.cs b/src/Pole.Core/Channels/AsyncInputEvent.cs new file mode 100644 index 0000000..bf8811b --- /dev/null +++ b/src/Pole.Core/Channels/AsyncInputEvent.cs @@ -0,0 +1,14 @@ +using System.Threading.Tasks; + +namespace Ray.Core.Channels +{ + public class AsyncInputEvent + { + public AsyncInputEvent(Input data) + { + Value = data; + } + public TaskCompletionSource TaskSource { get; } = new TaskCompletionSource(); + public Input Value { get; set; } + } +} diff --git a/src/Pole.Core/Channels/ChannelOptions.cs b/src/Pole.Core/Channels/ChannelOptions.cs new file mode 100644 index 0000000..9c62f0a --- /dev/null +++ b/src/Pole.Core/Channels/ChannelOptions.cs @@ -0,0 +1,14 @@ +namespace Ray.Core.Channels +{ + public class ChannelOptions + { + /// + /// 批量数据处理每次处理的最大数据量 + /// + public int MaxBatchSize { get; set; } = 100000; + /// + /// 批量数据接收的最大延时 + /// + public int MaxMillisecondsDelay { get; set; } = 1000; + } +} diff --git a/src/Pole.Core/Channels/MpscChannel.cs b/src/Pole.Core/Channels/MpscChannel.cs new file mode 100644 index 0000000..fcc8155 --- /dev/null +++ b/src/Pole.Core/Channels/MpscChannel.cs @@ -0,0 +1,164 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace Ray.Core.Channels +{ + /// + /// multi producter single consumer channel + /// + /// data type produced by producer + public class MpscChannel : IMpscChannel + { + readonly BufferBlock buffer = new BufferBlock(); + private Func, Task> consumer; + readonly List consumerSequence = new List(); + private Task waitToReadTask; + readonly ILogger logger; + /// + /// 是否在自动消费中 + /// + private int _autoConsuming = 0; + public MpscChannel(ILogger> logger, IOptions options) + { + this.logger = logger; + MaxBatchSize = options.Value.MaxBatchSize; + MaxMillisecondsDelay = options.Value.MaxMillisecondsDelay; + } + /// + /// 批量数据处理每次处理的最大数据量 + /// + public int MaxBatchSize { get; set; } + /// + /// 批量数据接收的最大延时 + /// + public int MaxMillisecondsDelay { get; set; } + public bool IsComplete { get; private set; } + public bool IsChildren { get; set; } + + public void BindConsumer(Func, Task> consumer) + { + if (this.consumer is null) + this.consumer = consumer; + else + throw new RebindConsumerException(GetType().Name); + } + public void BindConsumer(Func, Task> consumer, int maxBatchSize, int maxMillisecondsDelay) + { + if (this.consumer is null) + { + this.consumer = consumer; + MaxBatchSize = maxBatchSize; + MaxMillisecondsDelay = maxMillisecondsDelay; + } + else + throw new RebindConsumerException(GetType().Name); + } + public void Config(int maxBatchSize, int maxMillisecondsDelay) + { + MaxBatchSize = maxBatchSize; + MaxMillisecondsDelay = maxMillisecondsDelay; + } + public async ValueTask WriteAsync(T data) + { + if (consumer is null) + throw new NoBindConsumerException(GetType().Name); + if (!IsChildren && _autoConsuming == 0) + ActiveAutoConsumer(); + if (!buffer.Post(data)) + return await buffer.SendAsync(data); + return true; + } + private void ActiveAutoConsumer() + { + if (!IsChildren && _autoConsuming == 0) + ThreadPool.QueueUserWorkItem(ActiveConsumer); + async void ActiveConsumer(object state) + { + if (Interlocked.CompareExchange(ref _autoConsuming, 1, 0) == 0) + { + try + { + while (await WaitToReadAsync()) + { + try + { + await ManualConsume(); + } + catch (Exception ex) + { + logger.LogError(ex, ex.Message); + } + } + } + finally + { + Interlocked.Exchange(ref _autoConsuming, 0); + } + } + } + } + public void JoinConsumerSequence(IBaseMpscChannel channel) + { + if (consumerSequence.IndexOf(channel) == -1) + { + channel.IsChildren = true; + consumerSequence.Add(channel); + } + } + public async Task ManualConsume() + { + if (waitToReadTask.IsCompletedSuccessfully && waitToReadTask.Result) + { + var dataList = new List(); + var startTime = DateTimeOffset.UtcNow; + while (buffer.TryReceive(out var value)) + { + dataList.Add(value); + if (dataList.Count > MaxBatchSize) + { + break; + } + else if ((DateTimeOffset.UtcNow - startTime).TotalMilliseconds > MaxMillisecondsDelay) + { + break; + } + } + if (dataList.Count > 0) + await consumer(dataList); + } + foreach (var joinConsumer in consumerSequence) + { + await joinConsumer.ManualConsume(); + } + } + public async Task WaitToReadAsync() + { + waitToReadTask = buffer.OutputAvailableAsync(); + if (consumerSequence.Count == 0) + { + return await waitToReadTask; + } + else + { + var taskList = consumerSequence.Select(c => c.WaitToReadAsync()).ToList(); + taskList.Add(waitToReadTask); + return await await Task.WhenAny(taskList); + } + } + public void Complete() + { + IsComplete = true; + foreach (var joinConsumer in consumerSequence) + { + joinConsumer.Complete(); + } + buffer.Complete(); + } + } +} diff --git a/src/Pole.Core/Channels/NoBindConsumerException.cs b/src/Pole.Core/Channels/NoBindConsumerException.cs new file mode 100644 index 0000000..f643c75 --- /dev/null +++ b/src/Pole.Core/Channels/NoBindConsumerException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.Core.Channels +{ + public class NoBindConsumerException : Exception + { + public NoBindConsumerException(string message) : base(message) + { + } + } +} diff --git a/src/Pole.Core/Channels/RebindConsumerException.cs b/src/Pole.Core/Channels/RebindConsumerException.cs new file mode 100644 index 0000000..d6da90d --- /dev/null +++ b/src/Pole.Core/Channels/RebindConsumerException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.Core.Channels +{ + public class RebindConsumerException : Exception + { + public RebindConsumerException(string message) : base(message) + { + } + } +} diff --git a/src/Pole.Core/EventBus/Consumer.cs b/src/Pole.Core/EventBus/Consumer.cs new file mode 100644 index 0000000..4d1fa15 --- /dev/null +++ b/src/Pole.Core/EventBus/Consumer.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Ray.Core.EventBus +{ + public abstract class Consumer : IConsumer + { + readonly List> eventHandlers; + readonly List, Task>> batchEventHandlers; + public Consumer( + List> eventHandlers, + List, Task>> batchEventHandlers) + { + this.eventHandlers = eventHandlers; + this.batchEventHandlers = batchEventHandlers; + } + public void AddHandler(Func func) + { + eventHandlers.Add(func); + } + public Task Notice(byte[] bytes) + { + return Task.WhenAll(eventHandlers.Select(func => func(bytes))); + } + + public Task Notice(List list) + { + return Task.WhenAll(batchEventHandlers.Select(func => func(list))); + } + } +} diff --git a/src/Pole.Core/EventBus/IConsumer.cs b/src/Pole.Core/EventBus/IConsumer.cs new file mode 100644 index 0000000..7ea0b2a --- /dev/null +++ b/src/Pole.Core/EventBus/IConsumer.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Ray.Core.EventBus +{ + public interface IConsumer + { + Task Notice(byte[] bytes); + Task Notice(List list); + } +} diff --git a/src/Pole.Core/EventBus/IConsumerContainer.cs b/src/Pole.Core/EventBus/IConsumerContainer.cs new file mode 100644 index 0000000..fa7d524 --- /dev/null +++ b/src/Pole.Core/EventBus/IConsumerContainer.cs @@ -0,0 +1,9 @@ +using System.Collections.Generic; + +namespace Ray.Core.EventBus +{ + public interface IConsumerContainer + { + List GetConsumers(); + } +} diff --git a/src/Pole.Core/EventBus/IProducer.cs b/src/Pole.Core/EventBus/IProducer.cs new file mode 100644 index 0000000..b8e2c15 --- /dev/null +++ b/src/Pole.Core/EventBus/IProducer.cs @@ -0,0 +1,9 @@ +using System.Threading.Tasks; + +namespace Ray.Core.EventBus +{ + public interface IProducer + { + ValueTask Publish(byte[] bytes, string hashKey); + } +} diff --git a/src/Pole.Core/EventBus/IProducerContainer.cs b/src/Pole.Core/EventBus/IProducerContainer.cs new file mode 100644 index 0000000..4d25e74 --- /dev/null +++ b/src/Pole.Core/EventBus/IProducerContainer.cs @@ -0,0 +1,11 @@ +using System; +using System.Threading.Tasks; + +namespace Ray.Core.EventBus +{ + public interface IProducerContainer + { + ValueTask GetProducer(); + ValueTask GetProducer(Type type); + } +} diff --git a/src/Pole.Core/Exceptions/BeginTxTimeoutException.cs b/src/Pole.Core/Exceptions/BeginTxTimeoutException.cs new file mode 100644 index 0000000..9f73444 --- /dev/null +++ b/src/Pole.Core/Exceptions/BeginTxTimeoutException.cs @@ -0,0 +1,12 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class BeginTxTimeoutException : Exception + { + public BeginTxTimeoutException(string stateId, long transactionId, Type type) : + base($"Grain type {type.FullName} with grainId {stateId} and transactionId {transactionId}") + { + } + } +} diff --git a/src/Pole.Core/Exceptions/ChannelUnavailabilityException.cs b/src/Pole.Core/Exceptions/ChannelUnavailabilityException.cs new file mode 100644 index 0000000..a166cf3 --- /dev/null +++ b/src/Pole.Core/Exceptions/ChannelUnavailabilityException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class ChannelUnavailabilityException : Exception + { + public ChannelUnavailabilityException(string id, Type grainType) : base($"Channel unavailability,type {grainType.FullName} with id {id}") + { + } + } +} diff --git a/src/Pole.Core/Exceptions/EventBusRepeatBindingProducerException.cs b/src/Pole.Core/Exceptions/EventBusRepeatBindingProducerException.cs new file mode 100644 index 0000000..755fc71 --- /dev/null +++ b/src/Pole.Core/Exceptions/EventBusRepeatBindingProducerException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class EventBusRepeatBindingProducerException : Exception + { + public EventBusRepeatBindingProducerException(string name) : base(name) + { + } + } +} diff --git a/src/Pole.Core/Exceptions/EventIsClearedException.cs b/src/Pole.Core/Exceptions/EventIsClearedException.cs new file mode 100644 index 0000000..8f63b91 --- /dev/null +++ b/src/Pole.Core/Exceptions/EventIsClearedException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class EventIsClearedException : Exception + { + public EventIsClearedException(string eventType, string eventJsonString, long archiveIndex) : base($"eventType:{eventType},event:{eventJsonString},archive index:{archiveIndex}") + { + } + } +} diff --git a/src/Pole.Core/Exceptions/EventVersionUnorderedException.cs b/src/Pole.Core/Exceptions/EventVersionUnorderedException.cs new file mode 100644 index 0000000..3f3caeb --- /dev/null +++ b/src/Pole.Core/Exceptions/EventVersionUnorderedException.cs @@ -0,0 +1,12 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class EventVersionUnorderedException : Exception + { + public EventVersionUnorderedException(string id, Type type, long eventVersion, long stateVersion) : + base($"Event version and state version do not match of Grain type {type.FullName} and Id {id}.There state version are {stateVersion} and event version are {eventVersion}") + { + } + } +} diff --git a/src/Pole.Core/Exceptions/ObserverNotCompletedException.cs b/src/Pole.Core/Exceptions/ObserverNotCompletedException.cs new file mode 100644 index 0000000..9049f24 --- /dev/null +++ b/src/Pole.Core/Exceptions/ObserverNotCompletedException.cs @@ -0,0 +1,12 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class ObserverNotCompletedException : Exception + { + public ObserverNotCompletedException(string typeName, string stateId) : base($"{typeName} with id={stateId}") + { + + } + } +} diff --git a/src/Pole.Core/Exceptions/ObserverUnitRepeatedException.cs b/src/Pole.Core/Exceptions/ObserverUnitRepeatedException.cs new file mode 100644 index 0000000..112353f --- /dev/null +++ b/src/Pole.Core/Exceptions/ObserverUnitRepeatedException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class ObserverUnitRepeatedException : Exception + { + public ObserverUnitRepeatedException(string name) : base(name) + { + } + } +} diff --git a/src/Pole.Core/Exceptions/PrimaryKeyTypeException.cs b/src/Pole.Core/Exceptions/PrimaryKeyTypeException.cs new file mode 100644 index 0000000..6f24284 --- /dev/null +++ b/src/Pole.Core/Exceptions/PrimaryKeyTypeException.cs @@ -0,0 +1,12 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class PrimaryKeyTypeException : Exception + { + public PrimaryKeyTypeException(string name) : base(name) + { + + } + } +} diff --git a/src/Pole.Core/Exceptions/RepeatedTxException.cs b/src/Pole.Core/Exceptions/RepeatedTxException.cs new file mode 100644 index 0000000..6f2c215 --- /dev/null +++ b/src/Pole.Core/Exceptions/RepeatedTxException.cs @@ -0,0 +1,12 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class RepeatedTxException : Exception + { + public RepeatedTxException(string stateId, long transactionId, Type type) : + base($"Grain type {type.FullName} with grainId {stateId} and transactionId {transactionId}") + { + } + } +} diff --git a/src/Pole.Core/Exceptions/SnapshotNotSupportTxException.cs b/src/Pole.Core/Exceptions/SnapshotNotSupportTxException.cs new file mode 100644 index 0000000..9e8013f --- /dev/null +++ b/src/Pole.Core/Exceptions/SnapshotNotSupportTxException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class SnapshotNotSupportTxException : Exception + { + public SnapshotNotSupportTxException(Type type) : base(type.FullName) + { + } + } +} diff --git a/src/Pole.Core/Exceptions/StateInsecurityException.cs b/src/Pole.Core/Exceptions/StateInsecurityException.cs new file mode 100644 index 0000000..d6b305a --- /dev/null +++ b/src/Pole.Core/Exceptions/StateInsecurityException.cs @@ -0,0 +1,12 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class StateInsecurityException : Exception + { + public StateInsecurityException(string id, Type grainType, long doingVersion, long stateVersion) : + base($"State insecurity of Grain type {grainType.FullName} and Id {id},Maybe because the previous event failed to execute.There state version are {stateVersion} and doing version are {doingVersion}") + { + } + } +} diff --git a/src/Pole.Core/Exceptions/StateIsOverException.cs b/src/Pole.Core/Exceptions/StateIsOverException.cs new file mode 100644 index 0000000..280c9cd --- /dev/null +++ b/src/Pole.Core/Exceptions/StateIsOverException.cs @@ -0,0 +1,12 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class StateIsOverException : Exception + { + public StateIsOverException(string id, Type grainType) : + base($"State Is Over of Grain type {grainType.FullName} and Id {id}") + { + } + } +} diff --git a/src/Pole.Core/Exceptions/TxCommitException.cs b/src/Pole.Core/Exceptions/TxCommitException.cs new file mode 100644 index 0000000..083191f --- /dev/null +++ b/src/Pole.Core/Exceptions/TxCommitException.cs @@ -0,0 +1,8 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class TxCommitException : Exception + { + } +} diff --git a/src/Pole.Core/Exceptions/TxIdException.cs b/src/Pole.Core/Exceptions/TxIdException.cs new file mode 100644 index 0000000..f77847f --- /dev/null +++ b/src/Pole.Core/Exceptions/TxIdException.cs @@ -0,0 +1,8 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class TxIdException : Exception + { + } +} diff --git a/src/Pole.Core/Exceptions/TxSnapshotException.cs b/src/Pole.Core/Exceptions/TxSnapshotException.cs new file mode 100644 index 0000000..3de4698 --- /dev/null +++ b/src/Pole.Core/Exceptions/TxSnapshotException.cs @@ -0,0 +1,12 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class TxSnapshotException : Exception + { + public TxSnapshotException(string stateId, long snapshotVersion, long backupSnapshotVersion) : + base($"StateId {stateId} and snapshot version {snapshotVersion} and backup snapshot version {backupSnapshotVersion}") + { + } + } +} diff --git a/src/Pole.Core/Exceptions/TypeCodeRepeatedException.cs b/src/Pole.Core/Exceptions/TypeCodeRepeatedException.cs new file mode 100644 index 0000000..2fd84fa --- /dev/null +++ b/src/Pole.Core/Exceptions/TypeCodeRepeatedException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class TypeCodeRepeatedException : Exception + { + public TypeCodeRepeatedException(string typeName, string typeFullName) : base($"Type named {typeName} was repeated of {typeFullName}.") + { + } + } +} diff --git a/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs b/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs new file mode 100644 index 0000000..8c67d30 --- /dev/null +++ b/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class UnmatchObserverUnitException : Exception + { + public UnmatchObserverUnitException(string grainName, string unitName) : base($"{unitName} and {grainName} do not match") + { + } + } +} diff --git a/src/Pole.Core/Exceptions/UnfindEventHandlerException.cs b/src/Pole.Core/Exceptions/UnfindEventHandlerException.cs new file mode 100644 index 0000000..0cc6df5 --- /dev/null +++ b/src/Pole.Core/Exceptions/UnfindEventHandlerException.cs @@ -0,0 +1,12 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class UnfindEventHandlerException : Exception + { + public UnfindEventHandlerException(Type eventType) : base(eventType.FullName) + { + + } + } +} diff --git a/src/Pole.Core/Exceptions/UnfindObserverUnitException.cs b/src/Pole.Core/Exceptions/UnfindObserverUnitException.cs new file mode 100644 index 0000000..ecc8a44 --- /dev/null +++ b/src/Pole.Core/Exceptions/UnfindObserverUnitException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class UnfindObserverUnitException : Exception + { + public UnfindObserverUnitException(string name) : base(name) + { + } + } +} diff --git a/src/Pole.Core/Exceptions/UnfindSnapshotHandlerException.cs b/src/Pole.Core/Exceptions/UnfindSnapshotHandlerException.cs new file mode 100644 index 0000000..f732d2f --- /dev/null +++ b/src/Pole.Core/Exceptions/UnfindSnapshotHandlerException.cs @@ -0,0 +1,12 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class UnfindSnapshotHandlerException : Exception + { + public UnfindSnapshotHandlerException(Type grainType) : base(grainType.FullName) + { + + } + } +} diff --git a/src/Pole.Core/Exceptions/UnknowTypeCodeException.cs b/src/Pole.Core/Exceptions/UnknowTypeCodeException.cs new file mode 100644 index 0000000..184e15a --- /dev/null +++ b/src/Pole.Core/Exceptions/UnknowTypeCodeException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class UnknowTypeCodeException : Exception + { + public UnknowTypeCodeException(string typeName) : base(typeName) + { + } + } +} diff --git a/src/Pole.Core/Exceptions/UnopenedTransactionException.cs b/src/Pole.Core/Exceptions/UnopenedTransactionException.cs new file mode 100644 index 0000000..97505ad --- /dev/null +++ b/src/Pole.Core/Exceptions/UnopenedTransactionException.cs @@ -0,0 +1,12 @@ +using System; + +namespace Ray.Core.Exceptions +{ + public class UnopenedTransactionException : Exception + { + public UnopenedTransactionException(string id, Type grainType, string methodName) : + base($"Unopened transaction, cannot be invoke {methodName},type {grainType.FullName} with id {id}") + { + } + } +} diff --git a/src/Pole.Core/Observer/Abstraction/Attributes/ObserverAttribute.cs b/src/Pole.Core/Observer/Abstraction/Attributes/ObserverAttribute.cs new file mode 100644 index 0000000..038894d --- /dev/null +++ b/src/Pole.Core/Observer/Abstraction/Attributes/ObserverAttribute.cs @@ -0,0 +1,44 @@ +using System; +using System.Collections.Generic; +using System.Linq; + +namespace Ray.Core.Observer +{ + /// + /// 标记为观察者 + /// + [AttributeUsage(AttributeTargets.Class, AllowMultiple = false)] + public class ObserverAttribute : Attribute + { + /// + /// 事件监听者标记 + /// + /// 监听者分组 + /// 监听者名称(如果是shadow请设置为null) + /// 被监听的Type + /// 监听者的Type + public ObserverAttribute(string group, string name, Type observable, Type observer = default) + { + Group = group; + Name = name; + Observable = observable; + Observer = observer; + } + /// + /// 监听者分组 + /// + public string Group { get; set; } + /// + /// 监听者名称(如果是shadow请设置为null) + /// + public string Name { get; set; } + /// + /// 被监听的Type + /// + public Type Observable { get; set; } + /// + /// 监听者的Type + /// + public Type Observer { get; set; } + } +} diff --git a/src/Pole.Core/Observer/Abstraction/IObserver.cs b/src/Pole.Core/Observer/Abstraction/IObserver.cs new file mode 100644 index 0000000..ed404e1 --- /dev/null +++ b/src/Pole.Core/Observer/Abstraction/IObserver.cs @@ -0,0 +1,17 @@ +using System.Collections.Generic; +using System.Threading.Tasks; +using Orleans.Concurrency; + +namespace Ray.Core.Observer +{ + public interface IObserver : IVersion + { + Task OnNext(Immutable bytes); + Task OnNext(Immutable> items); + /// + /// 重置状态 + /// + /// + Task Reset(); + } +} diff --git a/src/Pole.Core/Observer/Abstraction/IVersion.cs b/src/Pole.Core/Observer/Abstraction/IVersion.cs new file mode 100644 index 0000000..f7f2ace --- /dev/null +++ b/src/Pole.Core/Observer/Abstraction/IVersion.cs @@ -0,0 +1,10 @@ +using System.Threading.Tasks; + +namespace Ray.Core.Observer +{ + public interface IVersion + { + Task GetVersion(); + Task GetAndSaveVersion(long compareVersion); + } +} diff --git a/src/Pole.Core/Pole.Core.csproj b/src/Pole.Core/Pole.Core.csproj index a965846..013f267 100644 --- a/src/Pole.Core/Pole.Core.csproj +++ b/src/Pole.Core/Pole.Core.csproj @@ -1,11 +1,16 @@ - netstandard2.0 + netstandard2.1 + + + + + diff --git a/src/Pole.Core/Services/Abstraction/IHoldLock.cs b/src/Pole.Core/Services/Abstraction/IHoldLock.cs new file mode 100644 index 0000000..a7d6ff2 --- /dev/null +++ b/src/Pole.Core/Services/Abstraction/IHoldLock.cs @@ -0,0 +1,12 @@ +using System.Threading.Tasks; +using Orleans; + +namespace Ray.Core.Services +{ + public interface IHoldLock : IGrainWithStringKey + { + Task<(bool isOk, long lockId)> Lock(int holdingSeconds =30); + Task Hold(long lockId, int holdingSeconds = 30); + Task Unlock(long lockId); + } +} diff --git a/src/Pole.Core/Services/Abstraction/ILocalUID.cs b/src/Pole.Core/Services/Abstraction/ILocalUID.cs new file mode 100644 index 0000000..fc35bf2 --- /dev/null +++ b/src/Pole.Core/Services/Abstraction/ILocalUID.cs @@ -0,0 +1,15 @@ +using System.Threading.Tasks; +using Orleans; +using Orleans.Concurrency; + +namespace Ray.Core.Services +{ + public interface ILocalUID : IGrainWithStringKey + { + /// + /// 通过utc时间生成分布式唯一id + /// + [AlwaysInterleave] + Task NewID(); + } +} diff --git a/src/Pole.Core/Services/Abstraction/ILock.cs b/src/Pole.Core/Services/Abstraction/ILock.cs new file mode 100644 index 0000000..ca9ae7a --- /dev/null +++ b/src/Pole.Core/Services/Abstraction/ILock.cs @@ -0,0 +1,11 @@ +using System.Threading.Tasks; +using Orleans; + +namespace Ray.Core.Services +{ + public interface ILock : IGrainWithStringKey + { + Task Lock(int millisecondsDelay = 0); + Task Unlock(); + } +} diff --git a/src/Pole.Core/Services/Abstraction/IUtcUID.cs b/src/Pole.Core/Services/Abstraction/IUtcUID.cs new file mode 100644 index 0000000..6f15877 --- /dev/null +++ b/src/Pole.Core/Services/Abstraction/IUtcUID.cs @@ -0,0 +1,15 @@ +using Orleans; +using Orleans.Concurrency; +using System.Threading.Tasks; + +namespace Ray.Core.Services +{ + public interface IUtcUID : IGrainWithStringKey + { + /// + /// 通过utc时间生成分布式唯一id + /// + [AlwaysInterleave] + Task NewID(); + } +} diff --git a/src/Pole.Core/Services/Abstraction/IWeightHoldLock.cs b/src/Pole.Core/Services/Abstraction/IWeightHoldLock.cs new file mode 100644 index 0000000..a1bf178 --- /dev/null +++ b/src/Pole.Core/Services/Abstraction/IWeightHoldLock.cs @@ -0,0 +1,13 @@ +using System.Threading.Tasks; +using Orleans; + + +namespace Ray.Core.Services +{ + public interface IWeightHoldLock : IGrainWithStringKey + { + Task<(bool isOk, long lockId, int expectMillisecondDelay)> Lock(int weight, int holdingSeconds = 30); + Task Hold(long lockId, int holdingSeconds = 30); + Task Unlock(long lockId); + } +} diff --git a/src/Pole.Core/Services/HoldLockGrain.cs b/src/Pole.Core/Services/HoldLockGrain.cs new file mode 100644 index 0000000..d6869f6 --- /dev/null +++ b/src/Pole.Core/Services/HoldLockGrain.cs @@ -0,0 +1,48 @@ +using System; +using System.Threading.Tasks; +using Orleans; + +namespace Ray.Core.Services +{ + public class HoldLockGrain : Grain, IHoldLock + { + long lockId = 0; + long expireTime = 0; + public Task<(bool isOk, long lockId)> Lock(int holdingSeconds = 30) + { + var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + if (lockId == 0 || now > expireTime) + { + lockId = now; + expireTime = now + holdingSeconds * 1000; + return Task.FromResult((true, now)); + } + else + { + return Task.FromResult((false, (long)0)); + } + } + public Task Hold(long lockId, int holdingSeconds = 30) + { + if (this.lockId == lockId) + { + expireTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + holdingSeconds * 1000; + return Task.FromResult(true); + } + else + { + return Task.FromResult(false); + } + } + + public Task Unlock(long lockId) + { + if (this.lockId == lockId) + { + this.lockId = 0; + expireTime = 0; + } + return Task.CompletedTask; + } + } +} diff --git a/src/Pole.Core/Services/LocalUIDGrain.cs b/src/Pole.Core/Services/LocalUIDGrain.cs new file mode 100644 index 0000000..74144e1 --- /dev/null +++ b/src/Pole.Core/Services/LocalUIDGrain.cs @@ -0,0 +1,59 @@ +using Orleans; +using Orleans.Concurrency; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Ray.Core.Services +{ + [Reentrant] + public class LocalUIDGrain : Grain, ILocalUID + { + int start_id = 1; + string start_string; + long start_long; + const int length = 19; + public LocalUIDGrain() + { + start_string = DateTimeOffset.Now.ToString("yyyyMMddHHmmss"); + start_long = long.Parse(start_string); ; + } + public Task NewID() + { + return Task.FromResult(GenerateUtcId()); + string GenerateUtcId() + { + var now_string = DateTimeOffset.Now.ToString("yyyyMMddHHmmss"); + var now_Long = long.Parse(now_string); + if (now_Long > start_long) + { + Interlocked.Exchange(ref start_string, now_string); + Interlocked.Exchange(ref start_long, now_Long); + Interlocked.Exchange(ref start_id, 0); + } + var builder = new Span(new char[length]); + var newTimes = Interlocked.Increment(ref start_id); + if (newTimes <= 99999) + { + start_string.AsSpan().CopyTo(builder); + + var timesString = newTimes.ToString(); + for (int i = start_string.Length; i < length - timesString.Length; i++) + { + builder[i] = '0'; + } + var span = length - timesString.Length; + for (int i = span; i < length; i++) + { + builder[i] = timesString[i - span]; + } + return builder.ToString(); + } + else + { + return GenerateUtcId(); + } + } + } + } +} diff --git a/src/Pole.Core/Services/LockGrain.cs b/src/Pole.Core/Services/LockGrain.cs new file mode 100644 index 0000000..198f0c4 --- /dev/null +++ b/src/Pole.Core/Services/LockGrain.cs @@ -0,0 +1,42 @@ +using System.Threading; +using System.Threading.Tasks; +using Orleans; +using Orleans.Concurrency; + +namespace Ray.Core.Services +{ + [Reentrant] + public class LockGrain : Grain, ILock + { + int locked = 0; + TaskCompletionSource taskSource; + public async Task Lock(int millisecondsDelay = 0) + { + if (locked == 0) + { + locked = 1; + return true; + } + else + { + taskSource = new TaskCompletionSource(); + if (millisecondsDelay != 0) + { + using var tc = new CancellationTokenSource(millisecondsDelay); + tc.Token.Register(() => + { + taskSource.TrySetCanceled(); + }); + } + return await taskSource.Task; + } + } + + public Task Unlock() + { + locked = 0; + taskSource.TrySetResult(true); + return Task.CompletedTask; + } + } +} diff --git a/src/Pole.Core/Services/UtcUIDGrain.cs b/src/Pole.Core/Services/UtcUIDGrain.cs new file mode 100644 index 0000000..58d465a --- /dev/null +++ b/src/Pole.Core/Services/UtcUIDGrain.cs @@ -0,0 +1,59 @@ +using Orleans; +using Orleans.Concurrency; +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Ray.Core.Services +{ + [Reentrant] + public class UtcUIDGrain : Grain, IUtcUID + { + int start_id = 1; + string start_string; + long start_long; + const int length = 19; + public UtcUIDGrain() + { + start_string = DateTimeOffset.UtcNow.ToString("yyyyMMddHHmmss"); + start_long = long.Parse(start_string); + } + public Task NewID() + { + return Task.FromResult(GenerateUtcId()); + string GenerateUtcId() + { + var now_string = DateTimeOffset.UtcNow.ToString("yyyyMMddHHmmss"); + var now_Long = long.Parse(now_string); + if (now_Long > start_long) + { + Interlocked.Exchange(ref start_string, now_string); + Interlocked.Exchange(ref start_long, now_Long); + Interlocked.Exchange(ref start_id, 0); + } + var builder = new Span(new char[length]); + var newTimes = Interlocked.Increment(ref start_id); + if (newTimes <= 99999) + { + start_string.AsSpan().CopyTo(builder); + + var timesString = newTimes.ToString(); + for (int i = start_string.Length; i < length - timesString.Length; i++) + { + builder[i] = '0'; + } + var span = length - timesString.Length; + for (int i = span; i < length; i++) + { + builder[i] = timesString[i - span]; + } + return builder.ToString(); + } + else + { + return GenerateUtcId(); + } + } + } + } +} diff --git a/src/Pole.Core/Services/WeightHoldLock.cs b/src/Pole.Core/Services/WeightHoldLock.cs new file mode 100644 index 0000000..7c17930 --- /dev/null +++ b/src/Pole.Core/Services/WeightHoldLock.cs @@ -0,0 +1,57 @@ +using System; +using System.Threading.Tasks; +using Orleans; + +namespace Ray.Core.Services +{ + public class WeightHoldLock : Grain, IWeightHoldLock + { + long lockId = 0; + long expireTime = 0; + int currentWeight = 0; + int maxWaitWeight = -1; + + public Task Hold(long lockId, int holdingSeconds = 30) + { + if (this.lockId == lockId && currentWeight >= maxWaitWeight) + { + expireTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + holdingSeconds * 1000; + return Task.FromResult(true); + } + else + { + return Task.FromResult(false); + } + } + + public Task<(bool isOk, long lockId, int expectMillisecondDelay)> Lock(int weight, int holdingSeconds = 30) + { + var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(); + if (lockId == 0 || now > expireTime) + { + lockId = now; + currentWeight = weight; + maxWaitWeight = -1; + expireTime = now + holdingSeconds * 1000; + return Task.FromResult((true, now, 0)); + } + if (weight >= maxWaitWeight && weight > currentWeight) + { + maxWaitWeight = weight; + return Task.FromResult((false, (long)0, (int)(expireTime - now))); + } + return Task.FromResult((false, (long)0, 0)); + } + + public Task Unlock(long lockId) + { + if (this.lockId == lockId) + { + this.lockId = 0; + currentWeight = 0; + expireTime = 0; + } + return Task.CompletedTask; + } + } +} diff --git a/src/Pole.Core/Utils/AssemblyHelper.cs b/src/Pole.Core/Utils/AssemblyHelper.cs new file mode 100644 index 0000000..75aba1f --- /dev/null +++ b/src/Pole.Core/Utils/AssemblyHelper.cs @@ -0,0 +1,31 @@ +using Microsoft.Extensions.DependencyModel; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Runtime.Loader; + +namespace Ray.Core.Utils +{ + public class AssemblyHelper + { + public static IList GetAssemblies(ILogger logger = default) + { + var libs = DependencyContext.Default.CompileLibraries.Where(lib => !lib.Serviceable); + return libs.Select(lib => + { + try + { + return AssemblyLoadContext.Default.LoadFromAssemblyName(new AssemblyName(lib.Name)); + } + catch (Exception ex) + { + if (logger != default) + logger.LogWarning(ex, ex.Message); + return default; + } + }).Where(assembly => assembly != default).ToList(); + } + } +} diff --git a/src/Pole.Core/Utils/ConsistentHash.cs b/src/Pole.Core/Utils/ConsistentHash.cs new file mode 100644 index 0000000..03c50e6 --- /dev/null +++ b/src/Pole.Core/Utils/ConsistentHash.cs @@ -0,0 +1,114 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text; + +namespace Ray.Core.Utils +{ + public class ConsistentHash + { + readonly SortedDictionary circle = new SortedDictionary(); + int _replicate = 200; //default _replicate count + int[] ayKeys = null; //cache the ordered keys for better performance + + //it's better you override the GetHashCode() of T. + //we will use GetHashCode() to identify different node. + public ConsistentHash(IEnumerable nodes) + { + Init(nodes, _replicate); + } + + public ConsistentHash(IEnumerable nodes, int replicate) + { + Init(nodes, replicate); + } + private void Init(IEnumerable nodes, int replicate) + { + _replicate = replicate; + + foreach (string node in nodes) + { + Add(node, false); + } + ayKeys = circle.Keys.ToArray(); + } + + public void Add(string node) + { + Add(node, true); + } + + public void Add(string node, bool updateKeyArray) + { + for (int i = 0; i < _replicate; i++) + { + int hash = BetterHash(node + i); + circle[hash] = node; + } + + if (updateKeyArray) + { + ayKeys = circle.Keys.ToArray(); + } + } + + public void Remove(string node) + { + for (int i = 0; i < _replicate; i++) + { + int hash = BetterHash(node + i); + if (!circle.Remove(hash)) + { + throw new Exception("can not remove a node that not added"); + } + } + ayKeys = circle.Keys.ToArray(); + } + public string GetNode(string key) + { + int first = First_ge(ayKeys, BetterHash(key)); + return circle[ayKeys[first]]; + } + //return the index of first item that >= val. + //if not exist, return 0; + //ay should be ordered array. + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private static int First_ge(int[] ay, int val) + { + int begin = 0; + int end = ay.Length - 1; + + if (ay[end] < val || ay[0] > val) + { + return 0; + } + + while (end - begin > 1) + { + int mid = (end + begin) / 2; + if (ay[mid] >= val) + { + end = mid; + } + else + { + begin = mid; + } + } + + if (ay[begin] > val || ay[end] < val) + { + throw new Exception("should not happen"); + } + + return end; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + public static int BetterHash(string key) + { + return (int)MurmurHash2.Hash(Encoding.UTF8.GetBytes(key)); + } + } +} diff --git a/src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs b/src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs new file mode 100644 index 0000000..9645815 --- /dev/null +++ b/src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs @@ -0,0 +1,37 @@ +using System; +using System.Reflection; +using System.Reflection.Emit; + +namespace Ray.Core.Utils.Emit +{ + /// + /// 用来生成模式匹配方法调用的方法信息 + /// + public class SwitchMethodEmit + { + /// + /// 方法 + /// + public MethodInfo Mehod { get; set; } + /// + /// 匹配的类型 + /// + public Type CaseType { get; set; } + /// + /// 局部变量 + /// + public LocalBuilder DeclareLocal { get; set; } + /// + /// 方法调用Lable + /// + public Label Lable { get; set; } + /// + /// 方法的参数 + /// + public ParameterInfo[] Parameters { get; set; } + /// + /// 方法在类中的顺序 + /// + public int Index { get; set; } + } +} diff --git a/src/Pole.Core/Utils/MurmurHash2.cs b/src/Pole.Core/Utils/MurmurHash2.cs new file mode 100644 index 0000000..0563565 --- /dev/null +++ b/src/Pole.Core/Utils/MurmurHash2.cs @@ -0,0 +1,73 @@ +using System.Runtime.InteropServices; + +namespace Ray.Core.Utils +{ + public class MurmurHash2 + { + public static uint Hash(byte[] data) + { + return Hash(data, 0xc58f1a7b); + } + const uint m = 0x5bd1e995; + const int r = 24; + + [StructLayout(LayoutKind.Explicit)] + struct BytetouintConverter + { + [FieldOffset(0)] + public byte[] Bytes; + + [FieldOffset(0)] + public uint[] UInts; + } + + public static uint Hash(byte[] data, uint seed) + { + int length = data.Length; + if (length == 0) + return 0; + uint h = seed ^ (uint)length; + int currentIndex = 0; + uint[] hackArray = new BytetouintConverter { Bytes = data }.UInts; + while (length >= 4) + { + uint k = hackArray[currentIndex++]; + k *= m; + k ^= k >> r; + k *= m; + + h *= m; + h ^= k; + length -= 4; + } + currentIndex *= 4; // fix the length + switch (length) + { + case 3: + h ^= (ushort)(data[currentIndex++] | data[currentIndex++] << 8); + h ^= (uint)data[currentIndex] << 16; + h *= m; + break; + case 2: + h ^= (ushort)(data[currentIndex++] | data[currentIndex] << 8); + h *= m; + break; + case 1: + h ^= data[currentIndex]; + h *= m; + break; + default: + break; + } + + // Do a few final mixes of the hash to ensure the last few + // bytes are well-incorporated. + + h ^= h >> 13; + h *= m; + h ^= h >> 15; + return h; + } + } + +} diff --git a/src/Pole.Core/Utils/PooledMemoryStream.cs b/src/Pole.Core/Utils/PooledMemoryStream.cs new file mode 100644 index 0000000..ce31a2a --- /dev/null +++ b/src/Pole.Core/Utils/PooledMemoryStream.cs @@ -0,0 +1,201 @@ +using System; +using System.IO; +using System.Buffers; + +namespace Ray.Core.Utils +{ + public class PooledMemoryStream : Stream + { + /// create writable memory stream with default parameters + /// buffer is allocated from ArrayPool.Shared + public PooledMemoryStream() + : this(ArrayPool.Shared) + { + } + /// create writable memory stream with specified ArrayPool + /// buffer is allocated from ArrayPool + public PooledMemoryStream(ArrayPool pool) + : this(pool, 4096) + { + } + /// create writable memory stream with ensuring buffer length + /// buffer is allocated from ArrayPool + public PooledMemoryStream(ArrayPool pool, int capacity) + { + m_Pool = pool; + _currentbuffer = m_Pool.Rent(capacity); + _Length = 0; + _CanWrite = true; + _Position = 0; + } + /// create readonly MemoryStream without buffer copy + /// data will be read from 'data' parameter + public PooledMemoryStream(byte[] data) + { + m_Pool = null; + _currentbuffer = data; + _Length = data.Length; + _CanWrite = false; + } + public override bool CanRead + { + get + { + return true; + } + } + + public override bool CanSeek => true; + + public override bool CanWrite => _CanWrite; + + public override long Length => _Length; + + public override long Position + { + get => _Position; + set + { + _Position = value; + } + } + + public override void Flush() + { + } + + public override int Read(byte[] buffer, int offset, int count) + { + int readlen = count > (int)(_Length - _Position) ? (int)(_Length - _Position) : count; + if (readlen > 0) + { + Buffer.BlockCopy(_currentbuffer + , (int)_Position + , buffer, offset + , readlen) + ; + _Position += readlen; + return readlen; + } + else + { + return 0; + } + } + + public override long Seek(long offset, SeekOrigin origin) + { + long oldValue = _Position; + switch ((int)origin) + { + case (int)SeekOrigin.Begin: + _Position = offset; + break; + case (int)SeekOrigin.End: + _Position = _Length - offset; + break; + case (int)SeekOrigin.Current: + _Position += offset; + break; + default: + throw new InvalidOperationException("unknown SeekOrigin"); + } + if (_Position < 0 || _Position > _Length) + { + _Position = oldValue; + throw new IndexOutOfRangeException(); + } + return _Position; + } + void ReallocateBuffer(int minimumRequired) + { + var tmp = m_Pool.Rent(minimumRequired); + Buffer.BlockCopy(_currentbuffer, 0, tmp, 0, _currentbuffer.Length); + m_Pool.Return(_currentbuffer); + _currentbuffer = tmp; + } + public override void SetLength(long value) + { + if (!_CanWrite) + { + throw new NotSupportedException("stream is readonly"); + } + if (value > int.MaxValue) + { + throw new IndexOutOfRangeException("overflow"); + } + if (value < 0) + { + throw new IndexOutOfRangeException("underflow"); + } + _Length = value; + if (_currentbuffer.Length < _Length) + { + ReallocateBuffer((int)_Length); + } + } + /// write data to stream + /// if stream data length is over int.MaxValue, this method throws IndexOutOfRangeException + public override void Write(byte[] buffer, int offset, int count) + { + if (!_CanWrite) + { + throw new InvalidOperationException("stream is readonly"); + } + long endOffset = _Position + count; + if (endOffset > _currentbuffer.Length) + { + ReallocateBuffer((int)(endOffset) * 2); + } + Buffer.BlockCopy(buffer, offset, + _currentbuffer, (int)_Position, count); + if (endOffset > _Length) + { + _Length = endOffset; + } + _Position = endOffset; + } + + protected override void Dispose(bool disposing) + { + base.Dispose(disposing); + if (m_Pool != null && _currentbuffer != null) + { + m_Pool.Return(_currentbuffer); + _currentbuffer = null; + } + } + /// ensure the buffer size + /// capacity != stream buffer length + public void Reserve(int capacity) + { + if (capacity > _currentbuffer.Length) + { + ReallocateBuffer(capacity); + } + } + + /// Create newly allocated buffer and copy the stream data + public byte[] ToArray() + { + var ret = new byte[_Length]; + Buffer.BlockCopy(_currentbuffer, 0, ret, 0, (int)_Length); + return ret; + } + /// Create ArraySegment for current stream data without allocation buffer + /// After disposing stream, manupilating returned value(read or write) may cause undefined behavior + public ArraySegment ToUnsafeArraySegment() + { + return new ArraySegment(_currentbuffer, 0, (int)_Length); + } + public ReadOnlyMemory ToReadOnlyMemory() + { + return new ReadOnlyMemory(_currentbuffer, 0, (int)_Length); + } + ArrayPool m_Pool; + byte[] _currentbuffer; + readonly bool _CanWrite; + long _Length; + long _Position; + } +} \ No newline at end of file diff --git a/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs b/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs new file mode 100644 index 0000000..0de42d8 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs @@ -0,0 +1,43 @@ +using RabbitMQ.Client; +using System.Collections.Generic; +using System.Threading; + +namespace Ray.EventBus.RabbitMQ +{ + public class ConnectionWrapper + { + private readonly List models = new List(); + private readonly IConnection connection; + readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); + public ConnectionWrapper( + IConnection connection, + RabbitOptions options) + { + this.connection = connection; + Options = options; + } + public RabbitOptions Options { get; } + public (bool success, ModelWrapper model) Get() + { + semaphoreSlim.Wait(); + try + { + if (models.Count < Options.PoolSizePerConnection) + { + var model = new ModelWrapper(this, connection.CreateModel()); + models.Add(model); + return (true, model); + } + } + finally + { + semaphoreSlim.Release(); + } + return (false, default); + } + public void Return(ModelWrapper model) + { + models.Remove(model); + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs b/src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs new file mode 100644 index 0000000..21958c9 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs @@ -0,0 +1,7 @@ +namespace Ray.EventBus.RabbitMQ +{ + public interface IRabbitMQClient + { + ModelWrapper PullModel(); + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Client/ModelPooledObjectPolicy.cs b/src/Pole.EventBus.Rabbitmq/Client/ModelPooledObjectPolicy.cs new file mode 100644 index 0000000..22bfde1 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Client/ModelPooledObjectPolicy.cs @@ -0,0 +1,57 @@ +using Microsoft.Extensions.ObjectPool; +using RabbitMQ.Client; +using System.Collections.Generic; +using System.Threading; + +namespace Ray.EventBus.RabbitMQ +{ + public class ModelPooledObjectPolicy : IPooledObjectPolicy + { + readonly ConnectionFactory connectionFactory; + readonly List connections = new List(); + readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); + readonly RabbitOptions options; + public ModelPooledObjectPolicy(ConnectionFactory connectionFactory, RabbitOptions options) + { + this.connectionFactory = connectionFactory; + this.options = options; + } + public ModelWrapper Create() + { + foreach (var connection in connections) + { + (bool success, ModelWrapper model) = connection.Get(); + if (success) + return model; + } + semaphoreSlim.Wait(); + try + { + if (connections.Count < options.MaxConnection) + { + var connection = new ConnectionWrapper(connectionFactory.CreateConnection(options.EndPoints), options); + (bool success, ModelWrapper model) = connection.Get(); + connections.Add(connection); + if (success) + return model; + } + throw new System.OverflowException(nameof(connections)); + } + finally + { + semaphoreSlim.Release(); + } + } + + public bool Return(ModelWrapper obj) + { + if (obj.Model.IsOpen) + return true; + else + { + obj.ForceDispose(); + return false; + } + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs b/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs new file mode 100644 index 0000000..32a3d1b --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs @@ -0,0 +1,40 @@ +using Microsoft.Extensions.ObjectPool; +using RabbitMQ.Client; +using System; + +namespace Ray.EventBus.RabbitMQ +{ + public class ModelWrapper : IDisposable + { + readonly IBasicProperties persistentProperties; + readonly IBasicProperties noPersistentProperties; + public DefaultObjectPool Pool { get; set; } + public ConnectionWrapper Connection { get; set; } + public IModel Model { get; set; } + public ModelWrapper( + ConnectionWrapper connectionWrapper, + IModel model) + { + Connection = connectionWrapper; + Model = model; + persistentProperties = Model.CreateBasicProperties(); + persistentProperties.Persistent = true; + noPersistentProperties = Model.CreateBasicProperties(); + noPersistentProperties.Persistent = false; + } + public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true) + { + Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg); + } + public void Dispose() + { + Pool.Return(this); + } + public void ForceDispose() + { + Model.Close(); + Model.Dispose(); + Connection.Return(this); + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs b/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs new file mode 100644 index 0000000..412fdfc --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs @@ -0,0 +1,33 @@ +using Microsoft.Extensions.ObjectPool; +using Microsoft.Extensions.Options; +using RabbitMQ.Client; + +namespace Ray.EventBus.RabbitMQ +{ + public class RabbitMQClient : IRabbitMQClient + { + readonly ConnectionFactory connectionFactory; + readonly RabbitOptions options; + readonly DefaultObjectPool pool; + public RabbitMQClient(IOptions config) + { + options = config.Value; + connectionFactory = new ConnectionFactory + { + UserName = options.UserName, + Password = options.Password, + VirtualHost = options.VirtualHost, + AutomaticRecoveryEnabled = false + }; + pool = new DefaultObjectPool(new ModelPooledObjectPolicy(connectionFactory, options)); + } + + public ModelWrapper PullModel() + { + var result = pool.Get(); + if (result.Pool is null) + result.Pool = pool; + return result; + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs b/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs new file mode 100644 index 0000000..148a280 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs @@ -0,0 +1,17 @@ +namespace Ray.EventBus.RabbitMQ +{ + /// + /// Consumer配置信息 + /// + public class ConsumerOptions + { + /// + /// 是否自动ack + /// + public bool AutoAck { get; set; } + /// + /// 消息处理失败是否重回队列还是不停重发 + /// + public bool Reenqueue { get; set; } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs new file mode 100644 index 0000000..20c7384 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs @@ -0,0 +1,38 @@ +using System.Collections.Generic; +using RabbitMQ.Client; + +namespace Ray.EventBus.RabbitMQ +{ + public class RabbitOptions + { + public string UserName { get; set; } + public string Password { get; set; } + public string VirtualHost { get; set; } + public int PoolSizePerConnection { get; set; } = 200; + public int MaxConnection { get; set; } = 20; + /// + /// 消费者批量处理每次处理的最大消息量 + /// + public ushort CunsumerMaxBatchSize { get; set; } = 3000; + /// + /// 消费者批量处理每次处理的最大延时 + /// + public int CunsumerMaxMillisecondsInterval { get; set; } = 1000; + public string[] Hosts + { + get; set; + } + public List EndPoints + { + get + { + var list = new List(); + foreach (var host in Hosts) + { + list.Add(AmqpTcpEndpoint.Parse(host)); + } + return list; + } + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs new file mode 100644 index 0000000..90634c5 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs @@ -0,0 +1,169 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Orleans; +using Ray.Core.Services; +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Ray.EventBus.RabbitMQ +{ + public class ConsumerManager : IHostedService, IDisposable + { + readonly ILogger logger; + readonly IRabbitMQClient client; + readonly IRabbitEventBusContainer rabbitEventBusContainer; + readonly IServiceProvider provider; + readonly IGrainFactory grainFactory; + const int _HoldTime = 20 * 1000; + const int _MonitTime = 60 * 2 * 1000; + const int _checkTime = 10 * 1000; + + public ConsumerManager( + ILogger logger, + IRabbitMQClient client, + IGrainFactory grainFactory, + IServiceProvider provider, + IRabbitEventBusContainer rabbitEventBusContainer) + { + this.provider = provider; + this.client = client; + this.logger = logger; + this.rabbitEventBusContainer = rabbitEventBusContainer; + this.grainFactory = grainFactory; + } + private readonly ConcurrentDictionary ConsumerRunners = new ConcurrentDictionary(); + private ConcurrentDictionary Runners { get; } = new ConcurrentDictionary(); + private Timer HeathCheckTimer { get; set; } + private Timer DistributedMonitorTime { get; set; } + private Timer DistributedHoldTimer { get; set; } + const int lockHoldingSeconds = 60; + int distributedMonitorTimeLock = 0; + int distributedHoldTimerLock = 0; + int heathCheckTimerLock = 0; + private async Task DistributedStart() + { + try + { + if (Interlocked.CompareExchange(ref distributedMonitorTimeLock, 1, 0) == 0) + { + var consumers = rabbitEventBusContainer.GetConsumers(); + foreach (var consumer in consumers) + { + if (consumer is RabbitConsumer value) + { + for (int i = 0; i < value.QueueList.Count(); i++) + { + var queue = value.QueueList[i]; + var key = queue.ToString(); + if (!Runners.ContainsKey(key)) + { + var weight = 100000 - Runners.Count; + var (isOk, lockId, expectMillisecondDelay) = await grainFactory.GetGrain(key).Lock(weight, lockHoldingSeconds); + if (isOk) + { + if (Runners.TryAdd(key, lockId)) + { + var runner = new ConsumerRunner(client, provider, value, queue); + ConsumerRunners.TryAdd(key, runner); + await runner.Run(); + } + + } + } + } + } + } + Interlocked.Exchange(ref distributedMonitorTimeLock, 0); + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EventBus Background Service is working."); + } + } + catch (Exception exception) + { + logger.LogError(exception.InnerException ?? exception, nameof(DistributedStart)); + Interlocked.Exchange(ref distributedMonitorTimeLock, 0); + } + } + private async Task DistributedHold() + { + try + { + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EventBus Background Service is holding."); + if (Interlocked.CompareExchange(ref distributedHoldTimerLock, 1, 0) == 0) + { + foreach (var lockKV in Runners) + { + if (Runners.TryGetValue(lockKV.Key, out var lockId)) + { + var holdResult = await grainFactory.GetGrain(lockKV.Key).Hold(lockId, lockHoldingSeconds); + if (!holdResult) + { + if (ConsumerRunners.TryRemove(lockKV.Key, out var runner)) + { + runner.Close(); + } + Runners.TryRemove(lockKV.Key, out var _); + } + } + } + Interlocked.Exchange(ref distributedHoldTimerLock, 0); + } + } + catch (Exception exception) + { + logger.LogError(exception.InnerException ?? exception, nameof(DistributedHold)); + Interlocked.Exchange(ref distributedHoldTimerLock, 0); + } + } + private async Task HeathCheck() + { + try + { + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EventBus Background Service is checking."); + if (Interlocked.CompareExchange(ref heathCheckTimerLock, 1, 0) == 0) + { + await Task.WhenAll(ConsumerRunners.Values.Select(runner => runner.HeathCheck())); + Interlocked.Exchange(ref heathCheckTimerLock, 0); + } + } + catch (Exception exception) + { + logger.LogError(exception.InnerException ?? exception, nameof(HeathCheck)); + Interlocked.Exchange(ref heathCheckTimerLock, 0); + } + } + public Task StartAsync(CancellationToken cancellationToken) + { + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EventBus Background Service is starting."); + DistributedMonitorTime = new Timer(state => DistributedStart().Wait(), null, 1000, _MonitTime); + DistributedHoldTimer = new Timer(state => DistributedHold().Wait(), null, _HoldTime, _HoldTime); + HeathCheckTimer = new Timer(state => { HeathCheck().Wait(); }, null, _checkTime, _checkTime); + return Task.CompletedTask; + } + public Task StopAsync(CancellationToken cancellationToken) + { + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EventBus Background Service is stopping."); + Dispose(); + return Task.CompletedTask; + } + public void Dispose() + { + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EventBus Background Service is disposing."); + foreach (var runner in ConsumerRunners.Values) + { + runner.Close(); + } + DistributedMonitorTime?.Dispose(); + DistributedHoldTimer?.Dispose(); + HeathCheckTimer?.Dispose(); + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs new file mode 100644 index 0000000..0838505 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs @@ -0,0 +1,119 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Ray.Core.Channels; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Ray.EventBus.RabbitMQ +{ + public class ConsumerRunner + { + readonly IMpscChannel mpscChannel; + public ConsumerRunner( + IRabbitMQClient client, + IServiceProvider provider, + RabbitConsumer consumer, + QueueInfo queue) + { + Client = client; + Logger = provider.GetService>(); + mpscChannel = provider.GetService>(); + mpscChannel.BindConsumer(BatchExecuter); + Consumer = consumer; + Queue = queue; + } + public ILogger Logger { get; } + public IRabbitMQClient Client { get; } + public RabbitConsumer Consumer { get; } + public QueueInfo Queue { get; } + public ModelWrapper Model { get; set; } + public EventingBasicConsumer BasicConsumer { get; set; } + public bool IsUnAvailable => !BasicConsumer.IsRunning || Model.Model.IsClosed; + private bool isFirst = true; + public Task Run() + { + Model = Client.PullModel(); + mpscChannel.Config(Model.Connection.Options.CunsumerMaxBatchSize, Model.Connection.Options.CunsumerMaxMillisecondsInterval); + if (isFirst) + { + isFirst = false; + Model.Model.ExchangeDeclare(Consumer.EventBus.Exchange, "direct", true); + Model.Model.QueueDeclare(Queue.Queue, true, false, false, null); + Model.Model.QueueBind(Queue.Queue, Consumer.EventBus.Exchange, Queue.RoutingKey); + } + Model.Model.BasicQos(0, Model.Connection.Options.CunsumerMaxBatchSize, false); + BasicConsumer = new EventingBasicConsumer(Model.Model); + BasicConsumer.Received += async (ch, ea) => await mpscChannel.WriteAsync(ea); + BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, Consumer.Config.AutoAck, BasicConsumer); + return Task.CompletedTask; + } + public Task HeathCheck() + { + if (IsUnAvailable) + { + Close(); + return Run(); + } + else + return Task.CompletedTask; + } + private async Task BatchExecuter(List list) + { + if (list.Count == 1) + { + await Process(list.First()); + } + else + { + try + { + await Consumer.Notice(list.Select(o => o.Body).ToList()); + if (!Consumer.Config.AutoAck) + { + Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true); + } + } + catch (Exception exception) + { + Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}"); + if (Consumer.Config.Reenqueue) + { + await Task.Delay(1000); + foreach (var item in list) + { + Model.Model.BasicReject(item.DeliveryTag, true); + } + } + } + } + } + private async Task Process(BasicDeliverEventArgs ea) + { + try + { + await Consumer.Notice(ea.Body); + if (!Consumer.Config.AutoAck) + { + Model.Model.BasicAck(ea.DeliveryTag, false); + } + } + catch (Exception exception) + { + Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}"); + if (Consumer.Config.Reenqueue) + { + await Task.Delay(1000); + Model.Model.BasicReject(ea.DeliveryTag, true); + } + } + } + public void Close() + { + Model?.Dispose(); + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs b/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs new file mode 100644 index 0000000..b519362 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Ray.Core.EventBus; + +namespace Ray.EventBus.RabbitMQ +{ + public class RabbitConsumer : Consumer + { + public RabbitConsumer( + List> eventHandlers, + List, Task>> batchEventHandlers) : base(eventHandlers, batchEventHandlers) + { + } + public RabbitEventBus EventBus { get; set; } + public List QueueList { get; set; } + public ConsumerOptions Config { get; set; } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Attributes/ProducerAttribute.cs b/src/Pole.EventBus.Rabbitmq/Core/Attributes/ProducerAttribute.cs new file mode 100644 index 0000000..c7c7056 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/Attributes/ProducerAttribute.cs @@ -0,0 +1,24 @@ +using System; + +namespace Ray.EventBus.RabbitMQ +{ + [AttributeUsage(AttributeTargets.Class, AllowMultiple = false)] + public class ProducerAttribute : Attribute + { + public ProducerAttribute(string exchange = null, string routePrefix = null, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false) + { + Exchange = exchange; + RoutePrefix = routePrefix; + LBCount = lBCount; + AutoAck = autoAck; + Reenqueue = reenqueue; + Persistent = persistent; + } + public string Exchange { get; } + public string RoutePrefix { get; } + public int LBCount { get; } + public bool AutoAck { get; set; } + public bool Reenqueue { get; set; } + public bool Persistent { get; set; } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Client/ConnectionWrapper.cs b/src/Pole.EventBus.Rabbitmq/Core/Client/ConnectionWrapper.cs new file mode 100644 index 0000000..0de42d8 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/Client/ConnectionWrapper.cs @@ -0,0 +1,43 @@ +using RabbitMQ.Client; +using System.Collections.Generic; +using System.Threading; + +namespace Ray.EventBus.RabbitMQ +{ + public class ConnectionWrapper + { + private readonly List models = new List(); + private readonly IConnection connection; + readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); + public ConnectionWrapper( + IConnection connection, + RabbitOptions options) + { + this.connection = connection; + Options = options; + } + public RabbitOptions Options { get; } + public (bool success, ModelWrapper model) Get() + { + semaphoreSlim.Wait(); + try + { + if (models.Count < Options.PoolSizePerConnection) + { + var model = new ModelWrapper(this, connection.CreateModel()); + models.Add(model); + return (true, model); + } + } + finally + { + semaphoreSlim.Release(); + } + return (false, default); + } + public void Return(ModelWrapper model) + { + models.Remove(model); + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Client/IRabbitMQClient.cs b/src/Pole.EventBus.Rabbitmq/Core/Client/IRabbitMQClient.cs new file mode 100644 index 0000000..21958c9 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/Client/IRabbitMQClient.cs @@ -0,0 +1,7 @@ +namespace Ray.EventBus.RabbitMQ +{ + public interface IRabbitMQClient + { + ModelWrapper PullModel(); + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Client/ModelPooledObjectPolicy.cs b/src/Pole.EventBus.Rabbitmq/Core/Client/ModelPooledObjectPolicy.cs new file mode 100644 index 0000000..22bfde1 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/Client/ModelPooledObjectPolicy.cs @@ -0,0 +1,57 @@ +using Microsoft.Extensions.ObjectPool; +using RabbitMQ.Client; +using System.Collections.Generic; +using System.Threading; + +namespace Ray.EventBus.RabbitMQ +{ + public class ModelPooledObjectPolicy : IPooledObjectPolicy + { + readonly ConnectionFactory connectionFactory; + readonly List connections = new List(); + readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); + readonly RabbitOptions options; + public ModelPooledObjectPolicy(ConnectionFactory connectionFactory, RabbitOptions options) + { + this.connectionFactory = connectionFactory; + this.options = options; + } + public ModelWrapper Create() + { + foreach (var connection in connections) + { + (bool success, ModelWrapper model) = connection.Get(); + if (success) + return model; + } + semaphoreSlim.Wait(); + try + { + if (connections.Count < options.MaxConnection) + { + var connection = new ConnectionWrapper(connectionFactory.CreateConnection(options.EndPoints), options); + (bool success, ModelWrapper model) = connection.Get(); + connections.Add(connection); + if (success) + return model; + } + throw new System.OverflowException(nameof(connections)); + } + finally + { + semaphoreSlim.Release(); + } + } + + public bool Return(ModelWrapper obj) + { + if (obj.Model.IsOpen) + return true; + else + { + obj.ForceDispose(); + return false; + } + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Client/ModelWrapper.cs b/src/Pole.EventBus.Rabbitmq/Core/Client/ModelWrapper.cs new file mode 100644 index 0000000..32a3d1b --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/Client/ModelWrapper.cs @@ -0,0 +1,40 @@ +using Microsoft.Extensions.ObjectPool; +using RabbitMQ.Client; +using System; + +namespace Ray.EventBus.RabbitMQ +{ + public class ModelWrapper : IDisposable + { + readonly IBasicProperties persistentProperties; + readonly IBasicProperties noPersistentProperties; + public DefaultObjectPool Pool { get; set; } + public ConnectionWrapper Connection { get; set; } + public IModel Model { get; set; } + public ModelWrapper( + ConnectionWrapper connectionWrapper, + IModel model) + { + Connection = connectionWrapper; + Model = model; + persistentProperties = Model.CreateBasicProperties(); + persistentProperties.Persistent = true; + noPersistentProperties = Model.CreateBasicProperties(); + noPersistentProperties.Persistent = false; + } + public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true) + { + Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg); + } + public void Dispose() + { + Pool.Return(this); + } + public void ForceDispose() + { + Model.Close(); + Model.Dispose(); + Connection.Return(this); + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Client/RabbitMQClient.cs b/src/Pole.EventBus.Rabbitmq/Core/Client/RabbitMQClient.cs new file mode 100644 index 0000000..412fdfc --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/Client/RabbitMQClient.cs @@ -0,0 +1,33 @@ +using Microsoft.Extensions.ObjectPool; +using Microsoft.Extensions.Options; +using RabbitMQ.Client; + +namespace Ray.EventBus.RabbitMQ +{ + public class RabbitMQClient : IRabbitMQClient + { + readonly ConnectionFactory connectionFactory; + readonly RabbitOptions options; + readonly DefaultObjectPool pool; + public RabbitMQClient(IOptions config) + { + options = config.Value; + connectionFactory = new ConnectionFactory + { + UserName = options.UserName, + Password = options.Password, + VirtualHost = options.VirtualHost, + AutomaticRecoveryEnabled = false + }; + pool = new DefaultObjectPool(new ModelPooledObjectPolicy(connectionFactory, options)); + } + + public ModelWrapper PullModel() + { + var result = pool.Get(); + if (result.Pool is null) + result.Pool = pool; + return result; + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Configuration/ConsumerOptions.cs b/src/Pole.EventBus.Rabbitmq/Core/Configuration/ConsumerOptions.cs new file mode 100644 index 0000000..148a280 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/Configuration/ConsumerOptions.cs @@ -0,0 +1,17 @@ +namespace Ray.EventBus.RabbitMQ +{ + /// + /// Consumer配置信息 + /// + public class ConsumerOptions + { + /// + /// 是否自动ack + /// + public bool AutoAck { get; set; } + /// + /// 消息处理失败是否重回队列还是不停重发 + /// + public bool Reenqueue { get; set; } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Configuration/RabbitOptions.cs b/src/Pole.EventBus.Rabbitmq/Core/Configuration/RabbitOptions.cs new file mode 100644 index 0000000..20c7384 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/Configuration/RabbitOptions.cs @@ -0,0 +1,38 @@ +using System.Collections.Generic; +using RabbitMQ.Client; + +namespace Ray.EventBus.RabbitMQ +{ + public class RabbitOptions + { + public string UserName { get; set; } + public string Password { get; set; } + public string VirtualHost { get; set; } + public int PoolSizePerConnection { get; set; } = 200; + public int MaxConnection { get; set; } = 20; + /// + /// 消费者批量处理每次处理的最大消息量 + /// + public ushort CunsumerMaxBatchSize { get; set; } = 3000; + /// + /// 消费者批量处理每次处理的最大延时 + /// + public int CunsumerMaxMillisecondsInterval { get; set; } = 1000; + public string[] Hosts + { + get; set; + } + public List EndPoints + { + get + { + var list = new List(); + foreach (var host in Hosts) + { + list.Add(AmqpTcpEndpoint.Parse(host)); + } + return list; + } + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerManager.cs b/src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerManager.cs new file mode 100644 index 0000000..90634c5 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerManager.cs @@ -0,0 +1,169 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Orleans; +using Ray.Core.Services; +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Ray.EventBus.RabbitMQ +{ + public class ConsumerManager : IHostedService, IDisposable + { + readonly ILogger logger; + readonly IRabbitMQClient client; + readonly IRabbitEventBusContainer rabbitEventBusContainer; + readonly IServiceProvider provider; + readonly IGrainFactory grainFactory; + const int _HoldTime = 20 * 1000; + const int _MonitTime = 60 * 2 * 1000; + const int _checkTime = 10 * 1000; + + public ConsumerManager( + ILogger logger, + IRabbitMQClient client, + IGrainFactory grainFactory, + IServiceProvider provider, + IRabbitEventBusContainer rabbitEventBusContainer) + { + this.provider = provider; + this.client = client; + this.logger = logger; + this.rabbitEventBusContainer = rabbitEventBusContainer; + this.grainFactory = grainFactory; + } + private readonly ConcurrentDictionary ConsumerRunners = new ConcurrentDictionary(); + private ConcurrentDictionary Runners { get; } = new ConcurrentDictionary(); + private Timer HeathCheckTimer { get; set; } + private Timer DistributedMonitorTime { get; set; } + private Timer DistributedHoldTimer { get; set; } + const int lockHoldingSeconds = 60; + int distributedMonitorTimeLock = 0; + int distributedHoldTimerLock = 0; + int heathCheckTimerLock = 0; + private async Task DistributedStart() + { + try + { + if (Interlocked.CompareExchange(ref distributedMonitorTimeLock, 1, 0) == 0) + { + var consumers = rabbitEventBusContainer.GetConsumers(); + foreach (var consumer in consumers) + { + if (consumer is RabbitConsumer value) + { + for (int i = 0; i < value.QueueList.Count(); i++) + { + var queue = value.QueueList[i]; + var key = queue.ToString(); + if (!Runners.ContainsKey(key)) + { + var weight = 100000 - Runners.Count; + var (isOk, lockId, expectMillisecondDelay) = await grainFactory.GetGrain(key).Lock(weight, lockHoldingSeconds); + if (isOk) + { + if (Runners.TryAdd(key, lockId)) + { + var runner = new ConsumerRunner(client, provider, value, queue); + ConsumerRunners.TryAdd(key, runner); + await runner.Run(); + } + + } + } + } + } + } + Interlocked.Exchange(ref distributedMonitorTimeLock, 0); + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EventBus Background Service is working."); + } + } + catch (Exception exception) + { + logger.LogError(exception.InnerException ?? exception, nameof(DistributedStart)); + Interlocked.Exchange(ref distributedMonitorTimeLock, 0); + } + } + private async Task DistributedHold() + { + try + { + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EventBus Background Service is holding."); + if (Interlocked.CompareExchange(ref distributedHoldTimerLock, 1, 0) == 0) + { + foreach (var lockKV in Runners) + { + if (Runners.TryGetValue(lockKV.Key, out var lockId)) + { + var holdResult = await grainFactory.GetGrain(lockKV.Key).Hold(lockId, lockHoldingSeconds); + if (!holdResult) + { + if (ConsumerRunners.TryRemove(lockKV.Key, out var runner)) + { + runner.Close(); + } + Runners.TryRemove(lockKV.Key, out var _); + } + } + } + Interlocked.Exchange(ref distributedHoldTimerLock, 0); + } + } + catch (Exception exception) + { + logger.LogError(exception.InnerException ?? exception, nameof(DistributedHold)); + Interlocked.Exchange(ref distributedHoldTimerLock, 0); + } + } + private async Task HeathCheck() + { + try + { + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EventBus Background Service is checking."); + if (Interlocked.CompareExchange(ref heathCheckTimerLock, 1, 0) == 0) + { + await Task.WhenAll(ConsumerRunners.Values.Select(runner => runner.HeathCheck())); + Interlocked.Exchange(ref heathCheckTimerLock, 0); + } + } + catch (Exception exception) + { + logger.LogError(exception.InnerException ?? exception, nameof(HeathCheck)); + Interlocked.Exchange(ref heathCheckTimerLock, 0); + } + } + public Task StartAsync(CancellationToken cancellationToken) + { + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EventBus Background Service is starting."); + DistributedMonitorTime = new Timer(state => DistributedStart().Wait(), null, 1000, _MonitTime); + DistributedHoldTimer = new Timer(state => DistributedHold().Wait(), null, _HoldTime, _HoldTime); + HeathCheckTimer = new Timer(state => { HeathCheck().Wait(); }, null, _checkTime, _checkTime); + return Task.CompletedTask; + } + public Task StopAsync(CancellationToken cancellationToken) + { + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EventBus Background Service is stopping."); + Dispose(); + return Task.CompletedTask; + } + public void Dispose() + { + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation("EventBus Background Service is disposing."); + foreach (var runner in ConsumerRunners.Values) + { + runner.Close(); + } + DistributedMonitorTime?.Dispose(); + DistributedHoldTimer?.Dispose(); + HeathCheckTimer?.Dispose(); + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerRunner.cs b/src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerRunner.cs new file mode 100644 index 0000000..0838505 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerRunner.cs @@ -0,0 +1,119 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using Ray.Core.Channels; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Ray.EventBus.RabbitMQ +{ + public class ConsumerRunner + { + readonly IMpscChannel mpscChannel; + public ConsumerRunner( + IRabbitMQClient client, + IServiceProvider provider, + RabbitConsumer consumer, + QueueInfo queue) + { + Client = client; + Logger = provider.GetService>(); + mpscChannel = provider.GetService>(); + mpscChannel.BindConsumer(BatchExecuter); + Consumer = consumer; + Queue = queue; + } + public ILogger Logger { get; } + public IRabbitMQClient Client { get; } + public RabbitConsumer Consumer { get; } + public QueueInfo Queue { get; } + public ModelWrapper Model { get; set; } + public EventingBasicConsumer BasicConsumer { get; set; } + public bool IsUnAvailable => !BasicConsumer.IsRunning || Model.Model.IsClosed; + private bool isFirst = true; + public Task Run() + { + Model = Client.PullModel(); + mpscChannel.Config(Model.Connection.Options.CunsumerMaxBatchSize, Model.Connection.Options.CunsumerMaxMillisecondsInterval); + if (isFirst) + { + isFirst = false; + Model.Model.ExchangeDeclare(Consumer.EventBus.Exchange, "direct", true); + Model.Model.QueueDeclare(Queue.Queue, true, false, false, null); + Model.Model.QueueBind(Queue.Queue, Consumer.EventBus.Exchange, Queue.RoutingKey); + } + Model.Model.BasicQos(0, Model.Connection.Options.CunsumerMaxBatchSize, false); + BasicConsumer = new EventingBasicConsumer(Model.Model); + BasicConsumer.Received += async (ch, ea) => await mpscChannel.WriteAsync(ea); + BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, Consumer.Config.AutoAck, BasicConsumer); + return Task.CompletedTask; + } + public Task HeathCheck() + { + if (IsUnAvailable) + { + Close(); + return Run(); + } + else + return Task.CompletedTask; + } + private async Task BatchExecuter(List list) + { + if (list.Count == 1) + { + await Process(list.First()); + } + else + { + try + { + await Consumer.Notice(list.Select(o => o.Body).ToList()); + if (!Consumer.Config.AutoAck) + { + Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true); + } + } + catch (Exception exception) + { + Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}"); + if (Consumer.Config.Reenqueue) + { + await Task.Delay(1000); + foreach (var item in list) + { + Model.Model.BasicReject(item.DeliveryTag, true); + } + } + } + } + } + private async Task Process(BasicDeliverEventArgs ea) + { + try + { + await Consumer.Notice(ea.Body); + if (!Consumer.Config.AutoAck) + { + Model.Model.BasicAck(ea.DeliveryTag, false); + } + } + catch (Exception exception) + { + Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}"); + if (Consumer.Config.Reenqueue) + { + await Task.Delay(1000); + Model.Model.BasicReject(ea.DeliveryTag, true); + } + } + } + public void Close() + { + Model?.Dispose(); + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Consumer/RabbitConsumer.cs b/src/Pole.EventBus.Rabbitmq/Core/Consumer/RabbitConsumer.cs new file mode 100644 index 0000000..b519362 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/Consumer/RabbitConsumer.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; +using Ray.Core.EventBus; + +namespace Ray.EventBus.RabbitMQ +{ + public class RabbitConsumer : Consumer + { + public RabbitConsumer( + List> eventHandlers, + List, Task>> batchEventHandlers) : base(eventHandlers, batchEventHandlers) + { + } + public RabbitEventBus EventBus { get; set; } + public List QueueList { get; set; } + public ConsumerOptions Config { get; set; } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs new file mode 100644 index 0000000..b2c794d --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs @@ -0,0 +1,114 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Orleans; +using RabbitMQ.Client; +using Ray.Core.Abstractions; +using Ray.Core.EventBus; +using Ray.Core.Exceptions; +using Ray.Core.Utils; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Ray.EventBus.RabbitMQ +{ + public class EventBusContainer : IRabbitEventBusContainer, IProducerContainer + { + private readonly ConcurrentDictionary eventBusDictionary = new ConcurrentDictionary(); + private readonly List eventBusList = new List(); + readonly IRabbitMQClient rabbitMQClient; + readonly IServiceProvider serviceProvider; + private readonly IObserverUnitContainer observerUnitContainer; + public EventBusContainer( + IServiceProvider serviceProvider, + IObserverUnitContainer observerUnitContainer, + IRabbitMQClient rabbitMQClient) + { + this.serviceProvider = serviceProvider; + this.rabbitMQClient = rabbitMQClient; + this.observerUnitContainer = observerUnitContainer; + } + public async Task AutoRegister() + { + var observableList = new List<(Type type, ProducerAttribute config)>(); + foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService>())) + { + foreach (var type in assembly.GetTypes()) + { + foreach (var attribute in type.GetCustomAttributes(false)) + { + if (attribute is ProducerAttribute config) + { + observableList.Add((type, config)); + break; + } + } + } + } + foreach (var (type, config) in observableList) + { + var eventBus = CreateEventBus(string.IsNullOrEmpty(config.Exchange) ? type.Name : config.Exchange, string.IsNullOrEmpty(config.RoutePrefix) ? type.Name : config.RoutePrefix, config.LBCount, config.AutoAck, config.Reenqueue, config.Persistent).BindProducer(type); + if (typeof(IGrainWithIntegerKey).IsAssignableFrom(type)) + { + await eventBus.AddGrainConsumer(); + } + else if (typeof(IGrainWithStringKey).IsAssignableFrom(type)) + { + await eventBus.AddGrainConsumer(); + } + else + throw new PrimaryKeyTypeException(type.FullName); + } + } + public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false) + { + return new RabbitEventBus(observerUnitContainer, this, exchange, routePrefix, lBCount, autoAck, reenqueue, persistent); + } + public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false) + { + return CreateEventBus(exchange, routePrefix, lBCount, autoAck, reenqueue, persistent).BindProducer(); + } + public Task Work(RabbitEventBus bus) + { + if (eventBusDictionary.TryAdd(bus.ProducerType, bus)) + { + eventBusList.Add(bus); + using var channel = rabbitMQClient.PullModel(); + channel.Model.ExchangeDeclare(bus.Exchange, "direct", true); + return Task.CompletedTask; + } + else + throw new EventBusRepeatException(bus.ProducerType.FullName); + } + + readonly ConcurrentDictionary producerDict = new ConcurrentDictionary(); + public ValueTask GetProducer(Type type) + { + if (eventBusDictionary.TryGetValue(type, out var eventBus)) + { + return new ValueTask(producerDict.GetOrAdd(type, key => + { + return new RabbitProducer(rabbitMQClient, eventBus); + })); + } + else + { + throw new NotImplementedException($"{nameof(IProducer)} of {type.FullName}"); + } + } + public ValueTask GetProducer() + { + return GetProducer(typeof(T)); + } + public List GetConsumers() + { + var result = new List(); + foreach (var eventBus in eventBusList) + { + result.AddRange(eventBus.Consumers); + } + return result; + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/EventBusRepeatException.cs b/src/Pole.EventBus.Rabbitmq/Core/EventBusRepeatException.cs new file mode 100644 index 0000000..d706edb --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/EventBusRepeatException.cs @@ -0,0 +1,11 @@ +using System; + +namespace Ray.EventBus.RabbitMQ +{ + public class EventBusRepeatException : Exception + { + public EventBusRepeatException(string message) : base(message) + { + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs new file mode 100644 index 0000000..7c44495 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs @@ -0,0 +1,13 @@ +using System.Threading.Tasks; +using Ray.Core.EventBus; + +namespace Ray.EventBus.RabbitMQ +{ + public interface IRabbitEventBusContainer : IConsumerContainer + { + Task AutoRegister(); + RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false); + RabbitEventBus CreateEventBus(string routePrefix, string queue, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false); + Task Work(RabbitEventBus bus); + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/QueueInfo.cs b/src/Pole.EventBus.Rabbitmq/Core/QueueInfo.cs new file mode 100644 index 0000000..ca9993d --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/QueueInfo.cs @@ -0,0 +1,12 @@ +namespace Ray.EventBus.RabbitMQ +{ + public class QueueInfo + { + public string Queue { get; set; } + public string RoutingKey { get; set; } + public override string ToString() + { + return $"{Queue}_{RoutingKey}"; + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs new file mode 100644 index 0000000..6d8036f --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs @@ -0,0 +1,122 @@ +using Ray.Core.Abstractions; +using Ray.Core.Exceptions; +using Ray.Core.Utils; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Ray.EventBus.RabbitMQ +{ + public class RabbitEventBus + { + private readonly ConsistentHash _CHash; + readonly IObserverUnitContainer observerUnitContainer; + public RabbitEventBus( + IObserverUnitContainer observerUnitContainer, + IRabbitEventBusContainer eventBusContainer, + string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = false) + { + if (string.IsNullOrEmpty(exchange)) + throw new ArgumentNullException(nameof(exchange)); + if (string.IsNullOrEmpty(routePrefix)) + throw new ArgumentNullException(nameof(routePrefix)); + if (lBCount < 1) + throw new ArgumentOutOfRangeException($"{nameof(lBCount)} must be greater than 1"); + this.observerUnitContainer = observerUnitContainer; + Container = eventBusContainer; + Exchange = exchange; + RoutePrefix = routePrefix; + LBCount = lBCount; + Persistent = persistent; + ConsumerConfig = new ConsumerOptions + { + AutoAck = autoAck, + Reenqueue = reenqueue, + }; + RouteList = new List(); + if (LBCount == 1) + { + RouteList.Add(routePrefix); + } + else + { + for (int i = 0; i < LBCount; i++) + { + RouteList.Add($"{routePrefix }_{ i.ToString()}"); + } + } + _CHash = new ConsistentHash(RouteList, lBCount * 10); + } + public IRabbitEventBusContainer Container { get; } + public string Exchange { get; } + public string RoutePrefix { get; } + public int LBCount { get; } + public ConsumerOptions ConsumerConfig { get; set; } + public List RouteList { get; } + public Type ProducerType { get; set; } + /// + /// 消息是否持久化 + /// + public bool Persistent { get; set; } + public List Consumers { get; set; } = new List(); + public string GetRoute(string key) + { + return LBCount == 1 ? RoutePrefix : _CHash.GetNode(key); ; + } + public RabbitEventBus BindProducer() + { + return BindProducer(typeof(TGrain)); + } + public RabbitEventBus BindProducer(Type grainType) + { + if (ProducerType == null) + ProducerType = grainType; + else + throw new EventBusRepeatBindingProducerException(grainType.FullName); + return this; + } + public RabbitEventBus AddGrainConsumer(string observerGroup) + { + var observerUnit = observerUnitContainer.GetUnit(ProducerType); + var consumer = new RabbitConsumer( + observerUnit.GetEventHandlers(observerGroup), + observerUnit.GetBatchEventHandlers(observerGroup)) + { + EventBus = this, + QueueList = RouteList.Select(route => new QueueInfo { RoutingKey = route, Queue = $"{route}_{observerGroup}" }).ToList(), + Config = ConsumerConfig + }; + Consumers.Add(consumer); + return this; + } + public RabbitEventBus AddConsumer( + Func handler, + Func, Task> batchHandler, + string observerGroup) + { + var consumer = new RabbitConsumer( + new List> { handler }, + new List, Task>> { batchHandler }) + { + EventBus = this, + QueueList = RouteList.Select(route => new QueueInfo { RoutingKey = route, Queue = $"{route}_{observerGroup}" }).ToList(), + Config = ConsumerConfig + }; + Consumers.Add(consumer); + return this; + } + public Task Enable() + { + return Container.Work(this); + } + public Task AddGrainConsumer() + { + foreach (var group in observerUnitContainer.GetUnit(ProducerType).GetGroups()) + { + AddGrainConsumer(group); + }; + return Enable(); + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Extensions.cs b/src/Pole.EventBus.Rabbitmq/Extensions.cs new file mode 100644 index 0000000..20fd0f9 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Extensions.cs @@ -0,0 +1,31 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Ray.Core; +using Ray.Core.EventBus; + +namespace Ray.EventBus.RabbitMQ +{ + public static class Extensions + { + public static void AddRabbitMQ( + this IServiceCollection serviceCollection, + Action rabbitConfigAction, + Func eventBusConfig = default) + { + serviceCollection.Configure(config => rabbitConfigAction(config)); + serviceCollection.AddSingleton(); + serviceCollection.AddHostedService(); + serviceCollection.AddSingleton(); + serviceCollection.AddSingleton(serviceProvider => serviceProvider.GetService() as IProducerContainer); + Startup.Register(async serviceProvider => + { + var container = serviceProvider.GetService(); + if (eventBusConfig != default) + await eventBusConfig(container); + else + await container.AutoRegister(); + }); + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj b/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj new file mode 100644 index 0000000..dde5642 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj @@ -0,0 +1,20 @@ + + + + netstandard2.1 + + + + + + + + + + + + + + + + diff --git a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs new file mode 100644 index 0000000..4a91148 --- /dev/null +++ b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs @@ -0,0 +1,25 @@ +using Ray.Core; +using Ray.Core.EventBus; +using System.Threading.Tasks; + +namespace Ray.EventBus.RabbitMQ +{ + public class RabbitProducer : IProducer + { + readonly RabbitEventBus publisher; + readonly IRabbitMQClient rabbitMQClient; + public RabbitProducer( + IRabbitMQClient rabbitMQClient, + RabbitEventBus publisher) + { + this.publisher = publisher; + this.rabbitMQClient = rabbitMQClient; + } + public ValueTask Publish(byte[] bytes, string hashKey) + { + using var model = rabbitMQClient.PullModel(); + model.Publish(bytes, publisher.Exchange, publisher.GetRoute(hashKey), publisher.Persistent); + return Consts.ValueTaskDone; + } + } +} -- libgit2 0.25.0