diff --git a/src/Pole.Core/Abstraction/IGrainID.cs b/src/Pole.Core/Abstraction/IGrainID.cs index b823f9a..9b35a78 100644 --- a/src/Pole.Core/Abstraction/IGrainID.cs +++ b/src/Pole.Core/Abstraction/IGrainID.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Abstractions +namespace Pole.Core.Abstractions { public interface IGrainID { diff --git a/src/Pole.Core/Abstraction/IObserverUnit.cs b/src/Pole.Core/Abstraction/IObserverUnit.cs index ccd188e..32b086d 100644 --- a/src/Pole.Core/Abstraction/IObserverUnit.cs +++ b/src/Pole.Core/Abstraction/IObserverUnit.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Threading.Tasks; -namespace Ray.Core.Abstractions +namespace Pole.Core.Abstractions { public interface IObserverUnit : IGrainID { diff --git a/src/Pole.Core/Abstraction/IObserverUnitContainer.cs b/src/Pole.Core/Abstraction/IObserverUnitContainer.cs index 054bb7e..5da6d1e 100644 --- a/src/Pole.Core/Abstraction/IObserverUnitContainer.cs +++ b/src/Pole.Core/Abstraction/IObserverUnitContainer.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Abstractions +namespace Pole.Core.Abstractions { public interface IObserverUnitContainer { diff --git a/src/Pole.Core/Channels/Abstractions/IBaseMpscChannel.cs b/src/Pole.Core/Channels/Abstractions/IBaseMpscChannel.cs index 5b0d1c4..9feffa6 100644 --- a/src/Pole.Core/Channels/Abstractions/IBaseMpscChannel.cs +++ b/src/Pole.Core/Channels/Abstractions/IBaseMpscChannel.cs @@ -1,6 +1,6 @@ using System.Threading.Tasks; -namespace Ray.Core.Channels +namespace Pole.Core.Channels { public interface IBaseMpscChannel { diff --git a/src/Pole.Core/Channels/Abstractions/IMpscChannel.cs b/src/Pole.Core/Channels/Abstractions/IMpscChannel.cs index a0aeb7f..63521fe 100644 --- a/src/Pole.Core/Channels/Abstractions/IMpscChannel.cs +++ b/src/Pole.Core/Channels/Abstractions/IMpscChannel.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Threading.Tasks; -namespace Ray.Core.Channels +namespace Pole.Core.Channels { public interface IMpscChannel : IBaseMpscChannel { diff --git a/src/Pole.Core/Channels/AsyncInputEvent.cs b/src/Pole.Core/Channels/AsyncInputEvent.cs index bf8811b..a1818c3 100644 --- a/src/Pole.Core/Channels/AsyncInputEvent.cs +++ b/src/Pole.Core/Channels/AsyncInputEvent.cs @@ -1,6 +1,6 @@ using System.Threading.Tasks; -namespace Ray.Core.Channels +namespace Pole.Core.Channels { public class AsyncInputEvent { diff --git a/src/Pole.Core/Channels/ChannelOptions.cs b/src/Pole.Core/Channels/ChannelOptions.cs index 9c62f0a..8ee2d13 100644 --- a/src/Pole.Core/Channels/ChannelOptions.cs +++ b/src/Pole.Core/Channels/ChannelOptions.cs @@ -1,4 +1,4 @@ -namespace Ray.Core.Channels +namespace Pole.Core.Channels { public class ChannelOptions { diff --git a/src/Pole.Core/Channels/MpscChannel.cs b/src/Pole.Core/Channels/MpscChannel.cs index fcc8155..77de015 100644 --- a/src/Pole.Core/Channels/MpscChannel.cs +++ b/src/Pole.Core/Channels/MpscChannel.cs @@ -7,7 +7,7 @@ using System.Threading.Tasks.Dataflow; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -namespace Ray.Core.Channels +namespace Pole.Core.Channels { /// /// multi producter single consumer channel diff --git a/src/Pole.Core/Channels/NoBindConsumerException.cs b/src/Pole.Core/Channels/NoBindConsumerException.cs index f643c75..362a4bb 100644 --- a/src/Pole.Core/Channels/NoBindConsumerException.cs +++ b/src/Pole.Core/Channels/NoBindConsumerException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Channels +namespace Pole.Core.Channels { public class NoBindConsumerException : Exception { diff --git a/src/Pole.Core/Channels/RebindConsumerException.cs b/src/Pole.Core/Channels/RebindConsumerException.cs index d6da90d..e7dc8ca 100644 --- a/src/Pole.Core/Channels/RebindConsumerException.cs +++ b/src/Pole.Core/Channels/RebindConsumerException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Channels +namespace Pole.Core.Channels { public class RebindConsumerException : Exception { diff --git a/src/Pole.Core/Consts.cs b/src/Pole.Core/Consts.cs new file mode 100644 index 0000000..cf3dd2a --- /dev/null +++ b/src/Pole.Core/Consts.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Core +{ + public static class Consts + { + public static ValueTask ValueTaskDone = new ValueTask(); + } +} diff --git a/src/Pole.Core/EventBus/Consumer.cs b/src/Pole.Core/EventBus/Consumer.cs index 4d1fa15..15d2857 100644 --- a/src/Pole.Core/EventBus/Consumer.cs +++ b/src/Pole.Core/EventBus/Consumer.cs @@ -3,7 +3,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -namespace Ray.Core.EventBus +namespace Pole.Core.EventBus { public abstract class Consumer : IConsumer { diff --git a/src/Pole.Core/EventBus/Event/EventInfoAttribute.cs b/src/Pole.Core/EventBus/Event/EventInfoAttribute.cs new file mode 100644 index 0000000..a75cdea --- /dev/null +++ b/src/Pole.Core/EventBus/Event/EventInfoAttribute.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.EventBus.Event +{ + [AttributeUsage(AttributeTargets.Class)] + public class EventInfoAttribute: Attribute + { + public string SendBoxName { get; set; } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Client/IRabbitMQClient.cs b/src/Pole.Core/EventBus/Event/IEvent.cs similarity index 50% rename from src/Pole.EventBus.Rabbitmq/Core/Client/IRabbitMQClient.cs rename to src/Pole.Core/EventBus/Event/IEvent.cs index 21958c9..1b80f3d 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/Client/IRabbitMQClient.cs +++ b/src/Pole.Core/EventBus/Event/IEvent.cs @@ -1,7 +1,10 @@ -namespace Ray.EventBus.RabbitMQ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.EventBus.Event { - public interface IRabbitMQClient + public interface IEvent { - ModelWrapper PullModel(); } } diff --git a/src/Pole.Core/EventBus/IConsumer.cs b/src/Pole.Core/EventBus/IConsumer.cs index 7ea0b2a..d46a039 100644 --- a/src/Pole.Core/EventBus/IConsumer.cs +++ b/src/Pole.Core/EventBus/IConsumer.cs @@ -1,7 +1,7 @@ using System.Collections.Generic; using System.Threading.Tasks; -namespace Ray.Core.EventBus +namespace Pole.Core.EventBus { public interface IConsumer { diff --git a/src/Pole.Core/EventBus/IConsumerContainer.cs b/src/Pole.Core/EventBus/IConsumerContainer.cs index fa7d524..954552e 100644 --- a/src/Pole.Core/EventBus/IConsumerContainer.cs +++ b/src/Pole.Core/EventBus/IConsumerContainer.cs @@ -1,6 +1,6 @@ using System.Collections.Generic; -namespace Ray.Core.EventBus +namespace Pole.Core.EventBus { public interface IConsumerContainer { diff --git a/src/Pole.Core/EventBus/IProducer.cs b/src/Pole.Core/EventBus/IProducer.cs index b8e2c15..82e5fee 100644 --- a/src/Pole.Core/EventBus/IProducer.cs +++ b/src/Pole.Core/EventBus/IProducer.cs @@ -1,6 +1,6 @@ using System.Threading.Tasks; -namespace Ray.Core.EventBus +namespace Pole.Core.EventBus { public interface IProducer { diff --git a/src/Pole.Core/EventBus/IProducerContainer.cs b/src/Pole.Core/EventBus/IProducerContainer.cs index 4d25e74..9a79460 100644 --- a/src/Pole.Core/EventBus/IProducerContainer.cs +++ b/src/Pole.Core/EventBus/IProducerContainer.cs @@ -1,7 +1,7 @@ using System; using System.Threading.Tasks; -namespace Ray.Core.EventBus +namespace Pole.Core.EventBus { public interface IProducerContainer { diff --git a/src/Pole.Core/Exceptions/BeginTxTimeoutException.cs b/src/Pole.Core/Exceptions/BeginTxTimeoutException.cs index 9f73444..f0da534 100644 --- a/src/Pole.Core/Exceptions/BeginTxTimeoutException.cs +++ b/src/Pole.Core/Exceptions/BeginTxTimeoutException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class BeginTxTimeoutException : Exception { diff --git a/src/Pole.Core/Exceptions/ChannelUnavailabilityException.cs b/src/Pole.Core/Exceptions/ChannelUnavailabilityException.cs index a166cf3..e271283 100644 --- a/src/Pole.Core/Exceptions/ChannelUnavailabilityException.cs +++ b/src/Pole.Core/Exceptions/ChannelUnavailabilityException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class ChannelUnavailabilityException : Exception { diff --git a/src/Pole.Core/Exceptions/EventBusRepeatBindingProducerException.cs b/src/Pole.Core/Exceptions/EventBusRepeatBindingProducerException.cs index 755fc71..b0fe921 100644 --- a/src/Pole.Core/Exceptions/EventBusRepeatBindingProducerException.cs +++ b/src/Pole.Core/Exceptions/EventBusRepeatBindingProducerException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class EventBusRepeatBindingProducerException : Exception { diff --git a/src/Pole.Core/Exceptions/EventIsClearedException.cs b/src/Pole.Core/Exceptions/EventIsClearedException.cs index 8f63b91..c3137ad 100644 --- a/src/Pole.Core/Exceptions/EventIsClearedException.cs +++ b/src/Pole.Core/Exceptions/EventIsClearedException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class EventIsClearedException : Exception { diff --git a/src/Pole.Core/Exceptions/EventVersionUnorderedException.cs b/src/Pole.Core/Exceptions/EventVersionUnorderedException.cs index 3f3caeb..42262dd 100644 --- a/src/Pole.Core/Exceptions/EventVersionUnorderedException.cs +++ b/src/Pole.Core/Exceptions/EventVersionUnorderedException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class EventVersionUnorderedException : Exception { diff --git a/src/Pole.Core/Exceptions/ObserverNotCompletedException.cs b/src/Pole.Core/Exceptions/ObserverNotCompletedException.cs index 9049f24..e19b69e 100644 --- a/src/Pole.Core/Exceptions/ObserverNotCompletedException.cs +++ b/src/Pole.Core/Exceptions/ObserverNotCompletedException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class ObserverNotCompletedException : Exception { diff --git a/src/Pole.Core/Exceptions/ObserverUnitRepeatedException.cs b/src/Pole.Core/Exceptions/ObserverUnitRepeatedException.cs index 112353f..db89b95 100644 --- a/src/Pole.Core/Exceptions/ObserverUnitRepeatedException.cs +++ b/src/Pole.Core/Exceptions/ObserverUnitRepeatedException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class ObserverUnitRepeatedException : Exception { diff --git a/src/Pole.Core/Exceptions/PrimaryKeyTypeException.cs b/src/Pole.Core/Exceptions/PrimaryKeyTypeException.cs index 6f24284..3d0182c 100644 --- a/src/Pole.Core/Exceptions/PrimaryKeyTypeException.cs +++ b/src/Pole.Core/Exceptions/PrimaryKeyTypeException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class PrimaryKeyTypeException : Exception { diff --git a/src/Pole.Core/Exceptions/RepeatedTxException.cs b/src/Pole.Core/Exceptions/RepeatedTxException.cs index 6f2c215..ac7b83e 100644 --- a/src/Pole.Core/Exceptions/RepeatedTxException.cs +++ b/src/Pole.Core/Exceptions/RepeatedTxException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class RepeatedTxException : Exception { diff --git a/src/Pole.Core/Exceptions/SnapshotNotSupportTxException.cs b/src/Pole.Core/Exceptions/SnapshotNotSupportTxException.cs index 9e8013f..463b3ba 100644 --- a/src/Pole.Core/Exceptions/SnapshotNotSupportTxException.cs +++ b/src/Pole.Core/Exceptions/SnapshotNotSupportTxException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class SnapshotNotSupportTxException : Exception { diff --git a/src/Pole.Core/Exceptions/StateInsecurityException.cs b/src/Pole.Core/Exceptions/StateInsecurityException.cs index d6b305a..5f87ede 100644 --- a/src/Pole.Core/Exceptions/StateInsecurityException.cs +++ b/src/Pole.Core/Exceptions/StateInsecurityException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class StateInsecurityException : Exception { diff --git a/src/Pole.Core/Exceptions/StateIsOverException.cs b/src/Pole.Core/Exceptions/StateIsOverException.cs index 280c9cd..8e3d7e7 100644 --- a/src/Pole.Core/Exceptions/StateIsOverException.cs +++ b/src/Pole.Core/Exceptions/StateIsOverException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class StateIsOverException : Exception { diff --git a/src/Pole.Core/Exceptions/TxCommitException.cs b/src/Pole.Core/Exceptions/TxCommitException.cs index 083191f..a8dba18 100644 --- a/src/Pole.Core/Exceptions/TxCommitException.cs +++ b/src/Pole.Core/Exceptions/TxCommitException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class TxCommitException : Exception { diff --git a/src/Pole.Core/Exceptions/TxIdException.cs b/src/Pole.Core/Exceptions/TxIdException.cs index f77847f..cae8003 100644 --- a/src/Pole.Core/Exceptions/TxIdException.cs +++ b/src/Pole.Core/Exceptions/TxIdException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class TxIdException : Exception { diff --git a/src/Pole.Core/Exceptions/TxSnapshotException.cs b/src/Pole.Core/Exceptions/TxSnapshotException.cs index 3de4698..68c995c 100644 --- a/src/Pole.Core/Exceptions/TxSnapshotException.cs +++ b/src/Pole.Core/Exceptions/TxSnapshotException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class TxSnapshotException : Exception { diff --git a/src/Pole.Core/Exceptions/TypeCodeRepeatedException.cs b/src/Pole.Core/Exceptions/TypeCodeRepeatedException.cs index 2fd84fa..f168c6a 100644 --- a/src/Pole.Core/Exceptions/TypeCodeRepeatedException.cs +++ b/src/Pole.Core/Exceptions/TypeCodeRepeatedException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class TypeCodeRepeatedException : Exception { diff --git a/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs b/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs index 8c67d30..f4e60e4 100644 --- a/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs +++ b/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class UnmatchObserverUnitException : Exception { diff --git a/src/Pole.Core/Exceptions/UnfindEventHandlerException.cs b/src/Pole.Core/Exceptions/UnfindEventHandlerException.cs index 0cc6df5..e2b3f1c 100644 --- a/src/Pole.Core/Exceptions/UnfindEventHandlerException.cs +++ b/src/Pole.Core/Exceptions/UnfindEventHandlerException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class UnfindEventHandlerException : Exception { diff --git a/src/Pole.Core/Exceptions/UnfindObserverUnitException.cs b/src/Pole.Core/Exceptions/UnfindObserverUnitException.cs index ecc8a44..dada47f 100644 --- a/src/Pole.Core/Exceptions/UnfindObserverUnitException.cs +++ b/src/Pole.Core/Exceptions/UnfindObserverUnitException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class UnfindObserverUnitException : Exception { diff --git a/src/Pole.Core/Exceptions/UnfindSnapshotHandlerException.cs b/src/Pole.Core/Exceptions/UnfindSnapshotHandlerException.cs index f732d2f..bdcd289 100644 --- a/src/Pole.Core/Exceptions/UnfindSnapshotHandlerException.cs +++ b/src/Pole.Core/Exceptions/UnfindSnapshotHandlerException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class UnfindSnapshotHandlerException : Exception { diff --git a/src/Pole.Core/Exceptions/UnknowTypeCodeException.cs b/src/Pole.Core/Exceptions/UnknowTypeCodeException.cs index 184e15a..b63e764 100644 --- a/src/Pole.Core/Exceptions/UnknowTypeCodeException.cs +++ b/src/Pole.Core/Exceptions/UnknowTypeCodeException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class UnknowTypeCodeException : Exception { diff --git a/src/Pole.Core/Exceptions/UnopenedTransactionException.cs b/src/Pole.Core/Exceptions/UnopenedTransactionException.cs index 97505ad..c139a0d 100644 --- a/src/Pole.Core/Exceptions/UnopenedTransactionException.cs +++ b/src/Pole.Core/Exceptions/UnopenedTransactionException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.Core.Exceptions +namespace Pole.Core.Exceptions { public class UnopenedTransactionException : Exception { diff --git a/src/Pole.Core/Observer/Abstraction/Attributes/ObserverAttribute.cs b/src/Pole.Core/Observer/Abstraction/Attributes/ObserverAttribute.cs index 038894d..63f9f54 100644 --- a/src/Pole.Core/Observer/Abstraction/Attributes/ObserverAttribute.cs +++ b/src/Pole.Core/Observer/Abstraction/Attributes/ObserverAttribute.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Linq; -namespace Ray.Core.Observer +namespace Pole.Core.Observer { /// /// 标记为观察者 diff --git a/src/Pole.Core/Observer/Abstraction/IObserver.cs b/src/Pole.Core/Observer/Abstraction/IObserver.cs index ed404e1..22bc455 100644 --- a/src/Pole.Core/Observer/Abstraction/IObserver.cs +++ b/src/Pole.Core/Observer/Abstraction/IObserver.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Orleans.Concurrency; -namespace Ray.Core.Observer +namespace Pole.Core.Observer { public interface IObserver : IVersion { diff --git a/src/Pole.Core/Observer/Abstraction/IVersion.cs b/src/Pole.Core/Observer/Abstraction/IVersion.cs index f7f2ace..67f6c26 100644 --- a/src/Pole.Core/Observer/Abstraction/IVersion.cs +++ b/src/Pole.Core/Observer/Abstraction/IVersion.cs @@ -1,6 +1,6 @@ using System.Threading.Tasks; -namespace Ray.Core.Observer +namespace Pole.Core.Observer { public interface IVersion { diff --git a/src/Pole.Core/Services/Abstraction/IHoldLock.cs b/src/Pole.Core/Services/Abstraction/IHoldLock.cs index a7d6ff2..b170201 100644 --- a/src/Pole.Core/Services/Abstraction/IHoldLock.cs +++ b/src/Pole.Core/Services/Abstraction/IHoldLock.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using Orleans; -namespace Ray.Core.Services +namespace Pole.Core.Services { public interface IHoldLock : IGrainWithStringKey { diff --git a/src/Pole.Core/Services/Abstraction/ILocalUID.cs b/src/Pole.Core/Services/Abstraction/ILocalUID.cs index fc35bf2..08191e7 100644 --- a/src/Pole.Core/Services/Abstraction/ILocalUID.cs +++ b/src/Pole.Core/Services/Abstraction/ILocalUID.cs @@ -2,7 +2,7 @@ using Orleans; using Orleans.Concurrency; -namespace Ray.Core.Services +namespace Pole.Core.Services { public interface ILocalUID : IGrainWithStringKey { diff --git a/src/Pole.Core/Services/Abstraction/ILock.cs b/src/Pole.Core/Services/Abstraction/ILock.cs index ca9ae7a..67264b3 100644 --- a/src/Pole.Core/Services/Abstraction/ILock.cs +++ b/src/Pole.Core/Services/Abstraction/ILock.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; using Orleans; -namespace Ray.Core.Services +namespace Pole.Core.Services { public interface ILock : IGrainWithStringKey { diff --git a/src/Pole.Core/Services/Abstraction/IUtcUID.cs b/src/Pole.Core/Services/Abstraction/IUtcUID.cs index 6f15877..7b56d03 100644 --- a/src/Pole.Core/Services/Abstraction/IUtcUID.cs +++ b/src/Pole.Core/Services/Abstraction/IUtcUID.cs @@ -2,7 +2,7 @@ using Orleans.Concurrency; using System.Threading.Tasks; -namespace Ray.Core.Services +namespace Pole.Core.Services { public interface IUtcUID : IGrainWithStringKey { diff --git a/src/Pole.Core/Services/Abstraction/IWeightHoldLock.cs b/src/Pole.Core/Services/Abstraction/IWeightHoldLock.cs index a1bf178..8644a55 100644 --- a/src/Pole.Core/Services/Abstraction/IWeightHoldLock.cs +++ b/src/Pole.Core/Services/Abstraction/IWeightHoldLock.cs @@ -2,7 +2,7 @@ using Orleans; -namespace Ray.Core.Services +namespace Pole.Core.Services { public interface IWeightHoldLock : IGrainWithStringKey { diff --git a/src/Pole.Core/Services/HoldLockGrain.cs b/src/Pole.Core/Services/HoldLockGrain.cs index d6869f6..4cab2d2 100644 --- a/src/Pole.Core/Services/HoldLockGrain.cs +++ b/src/Pole.Core/Services/HoldLockGrain.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Orleans; -namespace Ray.Core.Services +namespace Pole.Core.Services { public class HoldLockGrain : Grain, IHoldLock { diff --git a/src/Pole.Core/Services/LocalUIDGrain.cs b/src/Pole.Core/Services/LocalUIDGrain.cs index 74144e1..9408031 100644 --- a/src/Pole.Core/Services/LocalUIDGrain.cs +++ b/src/Pole.Core/Services/LocalUIDGrain.cs @@ -4,7 +4,7 @@ using System; using System.Threading; using System.Threading.Tasks; -namespace Ray.Core.Services +namespace Pole.Core.Services { [Reentrant] public class LocalUIDGrain : Grain, ILocalUID diff --git a/src/Pole.Core/Services/LockGrain.cs b/src/Pole.Core/Services/LockGrain.cs index 198f0c4..0e9206b 100644 --- a/src/Pole.Core/Services/LockGrain.cs +++ b/src/Pole.Core/Services/LockGrain.cs @@ -3,7 +3,7 @@ using System.Threading.Tasks; using Orleans; using Orleans.Concurrency; -namespace Ray.Core.Services +namespace Pole.Core.Services { [Reentrant] public class LockGrain : Grain, ILock diff --git a/src/Pole.Core/Services/UtcUIDGrain.cs b/src/Pole.Core/Services/UtcUIDGrain.cs index 58d465a..638496c 100644 --- a/src/Pole.Core/Services/UtcUIDGrain.cs +++ b/src/Pole.Core/Services/UtcUIDGrain.cs @@ -4,7 +4,7 @@ using System; using System.Threading; using System.Threading.Tasks; -namespace Ray.Core.Services +namespace Pole.Core.Services { [Reentrant] public class UtcUIDGrain : Grain, IUtcUID diff --git a/src/Pole.Core/Services/WeightHoldLock.cs b/src/Pole.Core/Services/WeightHoldLock.cs index 7c17930..0d68149 100644 --- a/src/Pole.Core/Services/WeightHoldLock.cs +++ b/src/Pole.Core/Services/WeightHoldLock.cs @@ -2,7 +2,7 @@ using System.Threading.Tasks; using Orleans; -namespace Ray.Core.Services +namespace Pole.Core.Services { public class WeightHoldLock : Grain, IWeightHoldLock { diff --git a/src/Pole.Core/Startup.cs b/src/Pole.Core/Startup.cs new file mode 100644 index 0000000..8f98d93 --- /dev/null +++ b/src/Pole.Core/Startup.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Pole.Core +{ + public static class Startup + { + static List tasks = new List(); + public static void Register(Func method, int sortIndex = 0) + { + tasks.Add(new StartupTask(sortIndex, method)); + } + internal static Task StartRay(IServiceProvider serviceProvider) + { + tasks = tasks.OrderBy(func => func.SortIndex).ToList(); + return Task.WhenAll(tasks.Select(value => value.Func(serviceProvider))); + } + private class StartupTask + { + public StartupTask(int sortIndex, Func func) + { + SortIndex = sortIndex; + Func = func; + } + public int SortIndex { get; set; } + public Func Func { get; set; } + } + } +} diff --git a/src/Pole.Core/Utils/AssemblyHelper.cs b/src/Pole.Core/Utils/AssemblyHelper.cs index 75aba1f..8c0c43b 100644 --- a/src/Pole.Core/Utils/AssemblyHelper.cs +++ b/src/Pole.Core/Utils/AssemblyHelper.cs @@ -6,7 +6,7 @@ using System.Linq; using System.Reflection; using System.Runtime.Loader; -namespace Ray.Core.Utils +namespace Pole.Core.Utils { public class AssemblyHelper { diff --git a/src/Pole.Core/Utils/ConsistentHash.cs b/src/Pole.Core/Utils/ConsistentHash.cs index 03c50e6..6955009 100644 --- a/src/Pole.Core/Utils/ConsistentHash.cs +++ b/src/Pole.Core/Utils/ConsistentHash.cs @@ -4,7 +4,7 @@ using System.Linq; using System.Runtime.CompilerServices; using System.Text; -namespace Ray.Core.Utils +namespace Pole.Core.Utils { public class ConsistentHash { @@ -72,7 +72,7 @@ namespace Ray.Core.Utils } //return the index of first item that >= val. //if not exist, return 0; - //ay should be ordered array. + //ay should be ordered arPole. [MethodImpl(MethodImplOptions.AggressiveInlining)] private static int First_ge(int[] ay, int val) { diff --git a/src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs b/src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs index 9645815..fa9ec63 100644 --- a/src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs +++ b/src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs @@ -2,7 +2,7 @@ using System.Reflection; using System.Reflection.Emit; -namespace Ray.Core.Utils.Emit +namespace Pole.Core.Utils.Emit { /// /// 用来生成模式匹配方法调用的方法信息 diff --git a/src/Pole.Core/Utils/MurmurHash2.cs b/src/Pole.Core/Utils/MurmurHash2.cs index 0563565..9cdfd63 100644 --- a/src/Pole.Core/Utils/MurmurHash2.cs +++ b/src/Pole.Core/Utils/MurmurHash2.cs @@ -1,6 +1,6 @@ using System.Runtime.InteropServices; -namespace Ray.Core.Utils +namespace Pole.Core.Utils { public class MurmurHash2 { diff --git a/src/Pole.Core/Utils/PooledMemoryStream.cs b/src/Pole.Core/Utils/PooledMemoryStream.cs index ce31a2a..eaffd6b 100644 --- a/src/Pole.Core/Utils/PooledMemoryStream.cs +++ b/src/Pole.Core/Utils/PooledMemoryStream.cs @@ -2,7 +2,7 @@ using System.IO; using System.Buffers; -namespace Ray.Core.Utils +namespace Pole.Core.Utils { public class PooledMemoryStream : Stream { diff --git a/src/Pole.EventBus.Rabbitmq/Core/Attributes/ProducerAttribute.cs b/src/Pole.EventBus.Rabbitmq/Attributes/ProducerAttribute.cs similarity index 96% rename from src/Pole.EventBus.Rabbitmq/Core/Attributes/ProducerAttribute.cs rename to src/Pole.EventBus.Rabbitmq/Attributes/ProducerAttribute.cs index c7c7056..ea16731 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/Attributes/ProducerAttribute.cs +++ b/src/Pole.EventBus.Rabbitmq/Attributes/ProducerAttribute.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { [AttributeUsage(AttributeTargets.Class, AllowMultiple = false)] public class ProducerAttribute : Attribute diff --git a/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs b/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs index 0de42d8..25b5704 100644 --- a/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs +++ b/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs @@ -2,7 +2,7 @@ using System.Collections.Generic; using System.Threading; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class ConnectionWrapper { @@ -22,7 +22,7 @@ namespace Ray.EventBus.RabbitMQ semaphoreSlim.Wait(); try { - if (models.Count < Options.PoolSizePerConnection) + if (models.Count < Options.MasChannelsPerConnection) { var model = new ModelWrapper(this, connection.CreateModel()); models.Add(model); diff --git a/src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs b/src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs index 21958c9..005c988 100644 --- a/src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs +++ b/src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs @@ -1,4 +1,4 @@ -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public interface IRabbitMQClient { diff --git a/src/Pole.EventBus.Rabbitmq/Client/ModelPooledObjectPolicy.cs b/src/Pole.EventBus.Rabbitmq/Client/ModelPooledObjectPolicy.cs index 22bfde1..247c5a6 100644 --- a/src/Pole.EventBus.Rabbitmq/Client/ModelPooledObjectPolicy.cs +++ b/src/Pole.EventBus.Rabbitmq/Client/ModelPooledObjectPolicy.cs @@ -3,7 +3,7 @@ using RabbitMQ.Client; using System.Collections.Generic; using System.Threading; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class ModelPooledObjectPolicy : IPooledObjectPolicy { diff --git a/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs b/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs index 32a3d1b..3fe924b 100644 --- a/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs +++ b/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs @@ -2,7 +2,7 @@ using RabbitMQ.Client; using System; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class ModelWrapper : IDisposable { diff --git a/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs b/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs index 412fdfc..aed0fce 100644 --- a/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs +++ b/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs @@ -2,7 +2,7 @@ using Microsoft.Extensions.Options; using RabbitMQ.Client; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class RabbitMQClient : IRabbitMQClient { @@ -17,7 +17,7 @@ namespace Ray.EventBus.RabbitMQ UserName = options.UserName, Password = options.Password, VirtualHost = options.VirtualHost, - AutomaticRecoveryEnabled = false + AutomaticRecoveryEnabled = true }; pool = new DefaultObjectPool(new ModelPooledObjectPolicy(connectionFactory, options)); } diff --git a/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs b/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs index 148a280..d4fc021 100644 --- a/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs +++ b/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs @@ -1,4 +1,4 @@ -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { /// /// Consumer配置信息 diff --git a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs index 20c7384..2fb0cd2 100644 --- a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs +++ b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs @@ -1,14 +1,14 @@ using System.Collections.Generic; using RabbitMQ.Client; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class RabbitOptions { public string UserName { get; set; } public string Password { get; set; } public string VirtualHost { get; set; } - public int PoolSizePerConnection { get; set; } = 200; + public int MasChannelsPerConnection { get; set; } = 200; public int MaxConnection { get; set; } = 20; /// /// 消费者批量处理每次处理的最大消息量 @@ -18,6 +18,10 @@ namespace Ray.EventBus.RabbitMQ /// 消费者批量处理每次处理的最大延时 /// public int CunsumerMaxMillisecondsInterval { get; set; } = 1000; + /// + /// exchange 和 queue 名称的前缀 + /// + public string Prefix = "Pole_"; public string[] Hosts { get; set; diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs index 90634c5..5020221 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs @@ -1,14 +1,14 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Orleans; -using Ray.Core.Services; +using Pole.Core.Services; using System; using System.Collections.Concurrent; using System.Linq; using System.Threading; using System.Threading.Tasks; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class ConsumerManager : IHostedService, IDisposable { diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs index 0838505..b5b3699 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs @@ -2,13 +2,13 @@ using Microsoft.Extensions.Logging; using RabbitMQ.Client; using RabbitMQ.Client.Events; -using Ray.Core.Channels; +using Pole.Core.Channels; using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class ConsumerRunner { diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs b/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs index b519362..75efdc4 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs @@ -1,9 +1,9 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; -using Ray.Core.EventBus; +using Pole.Core.EventBus; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class RabbitConsumer : Consumer { diff --git a/src/Pole.EventBus.Rabbitmq/Core/Client/ConnectionWrapper.cs b/src/Pole.EventBus.Rabbitmq/Core/Client/ConnectionWrapper.cs deleted file mode 100644 index 0de42d8..0000000 --- a/src/Pole.EventBus.Rabbitmq/Core/Client/ConnectionWrapper.cs +++ /dev/null @@ -1,43 +0,0 @@ -using RabbitMQ.Client; -using System.Collections.Generic; -using System.Threading; - -namespace Ray.EventBus.RabbitMQ -{ - public class ConnectionWrapper - { - private readonly List models = new List(); - private readonly IConnection connection; - readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); - public ConnectionWrapper( - IConnection connection, - RabbitOptions options) - { - this.connection = connection; - Options = options; - } - public RabbitOptions Options { get; } - public (bool success, ModelWrapper model) Get() - { - semaphoreSlim.Wait(); - try - { - if (models.Count < Options.PoolSizePerConnection) - { - var model = new ModelWrapper(this, connection.CreateModel()); - models.Add(model); - return (true, model); - } - } - finally - { - semaphoreSlim.Release(); - } - return (false, default); - } - public void Return(ModelWrapper model) - { - models.Remove(model); - } - } -} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Client/ModelPooledObjectPolicy.cs b/src/Pole.EventBus.Rabbitmq/Core/Client/ModelPooledObjectPolicy.cs deleted file mode 100644 index 22bfde1..0000000 --- a/src/Pole.EventBus.Rabbitmq/Core/Client/ModelPooledObjectPolicy.cs +++ /dev/null @@ -1,57 +0,0 @@ -using Microsoft.Extensions.ObjectPool; -using RabbitMQ.Client; -using System.Collections.Generic; -using System.Threading; - -namespace Ray.EventBus.RabbitMQ -{ - public class ModelPooledObjectPolicy : IPooledObjectPolicy - { - readonly ConnectionFactory connectionFactory; - readonly List connections = new List(); - readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); - readonly RabbitOptions options; - public ModelPooledObjectPolicy(ConnectionFactory connectionFactory, RabbitOptions options) - { - this.connectionFactory = connectionFactory; - this.options = options; - } - public ModelWrapper Create() - { - foreach (var connection in connections) - { - (bool success, ModelWrapper model) = connection.Get(); - if (success) - return model; - } - semaphoreSlim.Wait(); - try - { - if (connections.Count < options.MaxConnection) - { - var connection = new ConnectionWrapper(connectionFactory.CreateConnection(options.EndPoints), options); - (bool success, ModelWrapper model) = connection.Get(); - connections.Add(connection); - if (success) - return model; - } - throw new System.OverflowException(nameof(connections)); - } - finally - { - semaphoreSlim.Release(); - } - } - - public bool Return(ModelWrapper obj) - { - if (obj.Model.IsOpen) - return true; - else - { - obj.ForceDispose(); - return false; - } - } - } -} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Client/ModelWrapper.cs b/src/Pole.EventBus.Rabbitmq/Core/Client/ModelWrapper.cs deleted file mode 100644 index 32a3d1b..0000000 --- a/src/Pole.EventBus.Rabbitmq/Core/Client/ModelWrapper.cs +++ /dev/null @@ -1,40 +0,0 @@ -using Microsoft.Extensions.ObjectPool; -using RabbitMQ.Client; -using System; - -namespace Ray.EventBus.RabbitMQ -{ - public class ModelWrapper : IDisposable - { - readonly IBasicProperties persistentProperties; - readonly IBasicProperties noPersistentProperties; - public DefaultObjectPool Pool { get; set; } - public ConnectionWrapper Connection { get; set; } - public IModel Model { get; set; } - public ModelWrapper( - ConnectionWrapper connectionWrapper, - IModel model) - { - Connection = connectionWrapper; - Model = model; - persistentProperties = Model.CreateBasicProperties(); - persistentProperties.Persistent = true; - noPersistentProperties = Model.CreateBasicProperties(); - noPersistentProperties.Persistent = false; - } - public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true) - { - Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg); - } - public void Dispose() - { - Pool.Return(this); - } - public void ForceDispose() - { - Model.Close(); - Model.Dispose(); - Connection.Return(this); - } - } -} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Client/RabbitMQClient.cs b/src/Pole.EventBus.Rabbitmq/Core/Client/RabbitMQClient.cs deleted file mode 100644 index 412fdfc..0000000 --- a/src/Pole.EventBus.Rabbitmq/Core/Client/RabbitMQClient.cs +++ /dev/null @@ -1,33 +0,0 @@ -using Microsoft.Extensions.ObjectPool; -using Microsoft.Extensions.Options; -using RabbitMQ.Client; - -namespace Ray.EventBus.RabbitMQ -{ - public class RabbitMQClient : IRabbitMQClient - { - readonly ConnectionFactory connectionFactory; - readonly RabbitOptions options; - readonly DefaultObjectPool pool; - public RabbitMQClient(IOptions config) - { - options = config.Value; - connectionFactory = new ConnectionFactory - { - UserName = options.UserName, - Password = options.Password, - VirtualHost = options.VirtualHost, - AutomaticRecoveryEnabled = false - }; - pool = new DefaultObjectPool(new ModelPooledObjectPolicy(connectionFactory, options)); - } - - public ModelWrapper PullModel() - { - var result = pool.Get(); - if (result.Pool is null) - result.Pool = pool; - return result; - } - } -} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Configuration/ConsumerOptions.cs b/src/Pole.EventBus.Rabbitmq/Core/Configuration/ConsumerOptions.cs deleted file mode 100644 index 148a280..0000000 --- a/src/Pole.EventBus.Rabbitmq/Core/Configuration/ConsumerOptions.cs +++ /dev/null @@ -1,17 +0,0 @@ -namespace Ray.EventBus.RabbitMQ -{ - /// - /// Consumer配置信息 - /// - public class ConsumerOptions - { - /// - /// 是否自动ack - /// - public bool AutoAck { get; set; } - /// - /// 消息处理失败是否重回队列还是不停重发 - /// - public bool Reenqueue { get; set; } - } -} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Configuration/RabbitOptions.cs b/src/Pole.EventBus.Rabbitmq/Core/Configuration/RabbitOptions.cs deleted file mode 100644 index 20c7384..0000000 --- a/src/Pole.EventBus.Rabbitmq/Core/Configuration/RabbitOptions.cs +++ /dev/null @@ -1,38 +0,0 @@ -using System.Collections.Generic; -using RabbitMQ.Client; - -namespace Ray.EventBus.RabbitMQ -{ - public class RabbitOptions - { - public string UserName { get; set; } - public string Password { get; set; } - public string VirtualHost { get; set; } - public int PoolSizePerConnection { get; set; } = 200; - public int MaxConnection { get; set; } = 20; - /// - /// 消费者批量处理每次处理的最大消息量 - /// - public ushort CunsumerMaxBatchSize { get; set; } = 3000; - /// - /// 消费者批量处理每次处理的最大延时 - /// - public int CunsumerMaxMillisecondsInterval { get; set; } = 1000; - public string[] Hosts - { - get; set; - } - public List EndPoints - { - get - { - var list = new List(); - foreach (var host in Hosts) - { - list.Add(AmqpTcpEndpoint.Parse(host)); - } - return list; - } - } - } -} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerManager.cs b/src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerManager.cs deleted file mode 100644 index 90634c5..0000000 --- a/src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerManager.cs +++ /dev/null @@ -1,169 +0,0 @@ -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Orleans; -using Ray.Core.Services; -using System; -using System.Collections.Concurrent; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -namespace Ray.EventBus.RabbitMQ -{ - public class ConsumerManager : IHostedService, IDisposable - { - readonly ILogger logger; - readonly IRabbitMQClient client; - readonly IRabbitEventBusContainer rabbitEventBusContainer; - readonly IServiceProvider provider; - readonly IGrainFactory grainFactory; - const int _HoldTime = 20 * 1000; - const int _MonitTime = 60 * 2 * 1000; - const int _checkTime = 10 * 1000; - - public ConsumerManager( - ILogger logger, - IRabbitMQClient client, - IGrainFactory grainFactory, - IServiceProvider provider, - IRabbitEventBusContainer rabbitEventBusContainer) - { - this.provider = provider; - this.client = client; - this.logger = logger; - this.rabbitEventBusContainer = rabbitEventBusContainer; - this.grainFactory = grainFactory; - } - private readonly ConcurrentDictionary ConsumerRunners = new ConcurrentDictionary(); - private ConcurrentDictionary Runners { get; } = new ConcurrentDictionary(); - private Timer HeathCheckTimer { get; set; } - private Timer DistributedMonitorTime { get; set; } - private Timer DistributedHoldTimer { get; set; } - const int lockHoldingSeconds = 60; - int distributedMonitorTimeLock = 0; - int distributedHoldTimerLock = 0; - int heathCheckTimerLock = 0; - private async Task DistributedStart() - { - try - { - if (Interlocked.CompareExchange(ref distributedMonitorTimeLock, 1, 0) == 0) - { - var consumers = rabbitEventBusContainer.GetConsumers(); - foreach (var consumer in consumers) - { - if (consumer is RabbitConsumer value) - { - for (int i = 0; i < value.QueueList.Count(); i++) - { - var queue = value.QueueList[i]; - var key = queue.ToString(); - if (!Runners.ContainsKey(key)) - { - var weight = 100000 - Runners.Count; - var (isOk, lockId, expectMillisecondDelay) = await grainFactory.GetGrain(key).Lock(weight, lockHoldingSeconds); - if (isOk) - { - if (Runners.TryAdd(key, lockId)) - { - var runner = new ConsumerRunner(client, provider, value, queue); - ConsumerRunners.TryAdd(key, runner); - await runner.Run(); - } - - } - } - } - } - } - Interlocked.Exchange(ref distributedMonitorTimeLock, 0); - if (logger.IsEnabled(LogLevel.Information)) - logger.LogInformation("EventBus Background Service is working."); - } - } - catch (Exception exception) - { - logger.LogError(exception.InnerException ?? exception, nameof(DistributedStart)); - Interlocked.Exchange(ref distributedMonitorTimeLock, 0); - } - } - private async Task DistributedHold() - { - try - { - if (logger.IsEnabled(LogLevel.Information)) - logger.LogInformation("EventBus Background Service is holding."); - if (Interlocked.CompareExchange(ref distributedHoldTimerLock, 1, 0) == 0) - { - foreach (var lockKV in Runners) - { - if (Runners.TryGetValue(lockKV.Key, out var lockId)) - { - var holdResult = await grainFactory.GetGrain(lockKV.Key).Hold(lockId, lockHoldingSeconds); - if (!holdResult) - { - if (ConsumerRunners.TryRemove(lockKV.Key, out var runner)) - { - runner.Close(); - } - Runners.TryRemove(lockKV.Key, out var _); - } - } - } - Interlocked.Exchange(ref distributedHoldTimerLock, 0); - } - } - catch (Exception exception) - { - logger.LogError(exception.InnerException ?? exception, nameof(DistributedHold)); - Interlocked.Exchange(ref distributedHoldTimerLock, 0); - } - } - private async Task HeathCheck() - { - try - { - if (logger.IsEnabled(LogLevel.Information)) - logger.LogInformation("EventBus Background Service is checking."); - if (Interlocked.CompareExchange(ref heathCheckTimerLock, 1, 0) == 0) - { - await Task.WhenAll(ConsumerRunners.Values.Select(runner => runner.HeathCheck())); - Interlocked.Exchange(ref heathCheckTimerLock, 0); - } - } - catch (Exception exception) - { - logger.LogError(exception.InnerException ?? exception, nameof(HeathCheck)); - Interlocked.Exchange(ref heathCheckTimerLock, 0); - } - } - public Task StartAsync(CancellationToken cancellationToken) - { - if (logger.IsEnabled(LogLevel.Information)) - logger.LogInformation("EventBus Background Service is starting."); - DistributedMonitorTime = new Timer(state => DistributedStart().Wait(), null, 1000, _MonitTime); - DistributedHoldTimer = new Timer(state => DistributedHold().Wait(), null, _HoldTime, _HoldTime); - HeathCheckTimer = new Timer(state => { HeathCheck().Wait(); }, null, _checkTime, _checkTime); - return Task.CompletedTask; - } - public Task StopAsync(CancellationToken cancellationToken) - { - if (logger.IsEnabled(LogLevel.Information)) - logger.LogInformation("EventBus Background Service is stopping."); - Dispose(); - return Task.CompletedTask; - } - public void Dispose() - { - if (logger.IsEnabled(LogLevel.Information)) - logger.LogInformation("EventBus Background Service is disposing."); - foreach (var runner in ConsumerRunners.Values) - { - runner.Close(); - } - DistributedMonitorTime?.Dispose(); - DistributedHoldTimer?.Dispose(); - HeathCheckTimer?.Dispose(); - } - } -} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerRunner.cs b/src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerRunner.cs deleted file mode 100644 index 0838505..0000000 --- a/src/Pole.EventBus.Rabbitmq/Core/Consumer/ConsumerRunner.cs +++ /dev/null @@ -1,119 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using RabbitMQ.Client; -using RabbitMQ.Client.Events; -using Ray.Core.Channels; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; - -namespace Ray.EventBus.RabbitMQ -{ - public class ConsumerRunner - { - readonly IMpscChannel mpscChannel; - public ConsumerRunner( - IRabbitMQClient client, - IServiceProvider provider, - RabbitConsumer consumer, - QueueInfo queue) - { - Client = client; - Logger = provider.GetService>(); - mpscChannel = provider.GetService>(); - mpscChannel.BindConsumer(BatchExecuter); - Consumer = consumer; - Queue = queue; - } - public ILogger Logger { get; } - public IRabbitMQClient Client { get; } - public RabbitConsumer Consumer { get; } - public QueueInfo Queue { get; } - public ModelWrapper Model { get; set; } - public EventingBasicConsumer BasicConsumer { get; set; } - public bool IsUnAvailable => !BasicConsumer.IsRunning || Model.Model.IsClosed; - private bool isFirst = true; - public Task Run() - { - Model = Client.PullModel(); - mpscChannel.Config(Model.Connection.Options.CunsumerMaxBatchSize, Model.Connection.Options.CunsumerMaxMillisecondsInterval); - if (isFirst) - { - isFirst = false; - Model.Model.ExchangeDeclare(Consumer.EventBus.Exchange, "direct", true); - Model.Model.QueueDeclare(Queue.Queue, true, false, false, null); - Model.Model.QueueBind(Queue.Queue, Consumer.EventBus.Exchange, Queue.RoutingKey); - } - Model.Model.BasicQos(0, Model.Connection.Options.CunsumerMaxBatchSize, false); - BasicConsumer = new EventingBasicConsumer(Model.Model); - BasicConsumer.Received += async (ch, ea) => await mpscChannel.WriteAsync(ea); - BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, Consumer.Config.AutoAck, BasicConsumer); - return Task.CompletedTask; - } - public Task HeathCheck() - { - if (IsUnAvailable) - { - Close(); - return Run(); - } - else - return Task.CompletedTask; - } - private async Task BatchExecuter(List list) - { - if (list.Count == 1) - { - await Process(list.First()); - } - else - { - try - { - await Consumer.Notice(list.Select(o => o.Body).ToList()); - if (!Consumer.Config.AutoAck) - { - Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true); - } - } - catch (Exception exception) - { - Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}"); - if (Consumer.Config.Reenqueue) - { - await Task.Delay(1000); - foreach (var item in list) - { - Model.Model.BasicReject(item.DeliveryTag, true); - } - } - } - } - } - private async Task Process(BasicDeliverEventArgs ea) - { - try - { - await Consumer.Notice(ea.Body); - if (!Consumer.Config.AutoAck) - { - Model.Model.BasicAck(ea.DeliveryTag, false); - } - } - catch (Exception exception) - { - Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}"); - if (Consumer.Config.Reenqueue) - { - await Task.Delay(1000); - Model.Model.BasicReject(ea.DeliveryTag, true); - } - } - } - public void Close() - { - Model?.Dispose(); - } - } -} diff --git a/src/Pole.EventBus.Rabbitmq/Core/Consumer/RabbitConsumer.cs b/src/Pole.EventBus.Rabbitmq/Core/Consumer/RabbitConsumer.cs deleted file mode 100644 index b519362..0000000 --- a/src/Pole.EventBus.Rabbitmq/Core/Consumer/RabbitConsumer.cs +++ /dev/null @@ -1,19 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; -using Ray.Core.EventBus; - -namespace Ray.EventBus.RabbitMQ -{ - public class RabbitConsumer : Consumer - { - public RabbitConsumer( - List> eventHandlers, - List, Task>> batchEventHandlers) : base(eventHandlers, batchEventHandlers) - { - } - public RabbitEventBus EventBus { get; set; } - public List QueueList { get; set; } - public ConsumerOptions Config { get; set; } - } -} diff --git a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs index b2c794d..2842b21 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs @@ -2,16 +2,16 @@ using Microsoft.Extensions.Logging; using Orleans; using RabbitMQ.Client; -using Ray.Core.Abstractions; -using Ray.Core.EventBus; -using Ray.Core.Exceptions; -using Ray.Core.Utils; +using Pole.Core.Abstractions; +using Pole.Core.EventBus; +using Pole.Core.Exceptions; +using Pole.Core.Utils; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading.Tasks; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class EventBusContainer : IRabbitEventBusContainer, IProducerContainer { diff --git a/src/Pole.EventBus.Rabbitmq/Core/EventBusRepeatException.cs b/src/Pole.EventBus.Rabbitmq/Core/EventBusRepeatException.cs index d706edb..32374ca 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/EventBusRepeatException.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/EventBusRepeatException.cs @@ -1,6 +1,6 @@ using System; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class EventBusRepeatException : Exception { diff --git a/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs index 7c44495..9730109 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs @@ -1,7 +1,7 @@ using System.Threading.Tasks; -using Ray.Core.EventBus; +using Pole.Core.EventBus; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public interface IRabbitEventBusContainer : IConsumerContainer { diff --git a/src/Pole.EventBus.Rabbitmq/Core/QueueInfo.cs b/src/Pole.EventBus.Rabbitmq/Core/QueueInfo.cs index ca9993d..3efebc7 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/QueueInfo.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/QueueInfo.cs @@ -1,4 +1,4 @@ -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class QueueInfo { diff --git a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs index 6d8036f..00b9a0a 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs @@ -1,12 +1,12 @@ -using Ray.Core.Abstractions; -using Ray.Core.Exceptions; -using Ray.Core.Utils; +using Pole.Core.Abstractions; +using Pole.Core.Exceptions; +using Pole.Core.Utils; using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class RabbitEventBus { diff --git a/src/Pole.EventBus.Rabbitmq/Extensions.cs b/src/Pole.EventBus.Rabbitmq/Extensions.cs index 20fd0f9..91bed51 100644 --- a/src/Pole.EventBus.Rabbitmq/Extensions.cs +++ b/src/Pole.EventBus.Rabbitmq/Extensions.cs @@ -1,10 +1,10 @@ using System; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; -using Ray.Core; -using Ray.Core.EventBus; +using Pole.Core; +using Pole.Core.EventBus; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public static class Extensions { diff --git a/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj b/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj index dde5642..5d07954 100644 --- a/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj +++ b/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj @@ -17,4 +17,11 @@ + + + + + + + diff --git a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs index 4a91148..79d27dd 100644 --- a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs +++ b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs @@ -1,8 +1,9 @@ -using Ray.Core; -using Ray.Core.EventBus; +using Pole.Core; +using Pole.Core; +using Pole.Core.EventBus; using System.Threading.Tasks; -namespace Ray.EventBus.RabbitMQ +namespace Pole.EventBus.RabbitMQ { public class RabbitProducer : IProducer {