From 5f7a72385999c41b0d725a31df40cc9119414476 Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Sun, 9 Feb 2020 19:51:47 +0800 Subject: [PATCH] 优化代码 --- src/Pole.Core/EventBus/IObserverUnit.cs | 4 ++-- src/Pole.Core/EventBus/ObserverUnit.cs | 20 ++++++++++---------- src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs | 5 ++++- src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs | 48 ++++++++++-------------------------------------- src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs | 4 +++- src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs | 6 +++--- src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs | 27 ++++----------------------- src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj | 7 ------- 8 files changed, 36 insertions(+), 85 deletions(-) diff --git a/src/Pole.Core/EventBus/IObserverUnit.cs b/src/Pole.Core/EventBus/IObserverUnit.cs index 927c492..d041f3a 100644 --- a/src/Pole.Core/EventBus/IObserverUnit.cs +++ b/src/Pole.Core/EventBus/IObserverUnit.cs @@ -6,7 +6,7 @@ namespace Pole.Core.EventBus { public interface IObserverUnit : IGrainID { - List> GetEventHandlers(); - List, Task>> GetBatchEventHandlers(); + Func GetEventHandler(); + Func, Task> GetBatchEventHandler(); } } diff --git a/src/Pole.Core/EventBus/ObserverUnit.cs b/src/Pole.Core/EventBus/ObserverUnit.cs index b5aaef7..31b7f64 100644 --- a/src/Pole.Core/EventBus/ObserverUnit.cs +++ b/src/Pole.Core/EventBus/ObserverUnit.cs @@ -22,8 +22,8 @@ namespace Pole.Core.EventBus readonly ISerializer serializer; readonly ITypeFinder typeFinder; readonly IClusterClient clusterClient; - readonly List> eventHandlers = new List>(); - readonly List, Task>> batchEventHandlers = new List, Task>>(); + Func eventHandler; + Func, Task> batchEventHandler; protected ILogger Logger { get; private set; } public Type GrainType { get; } @@ -41,21 +41,21 @@ namespace Pole.Core.EventBus return new ObserverUnit(serviceProvider, typeof(Grain)); } - public List> GetEventHandlers() + public Func GetEventHandler() { - return eventHandlers; + return eventHandler; } - public List, Task>> GetBatchEventHandlers() + public Func, Task> GetBatchEventHandler() { - return batchEventHandlers; + return batchEventHandler; } public ObserverUnit UnreliableObserver( Func, ValueTask> handler) { - GetEventHandlers().Add(EventHandler); - GetBatchEventHandlers().Add(BatchEventHandler); + eventHandler = EventHandler; + batchEventHandler = BatchEventHandler; return this; //内部函数 Task EventHandler(byte[] bytes) @@ -111,8 +111,8 @@ namespace Pole.Core.EventBus { if (!typeof(PoleEventHandlerBase).IsAssignableFrom(observerType)) throw new NotSupportedException($"{observerType.FullName} must inheritance from PoleEventHandler"); - GetEventHandlers().Add(EventHandler); - GetBatchEventHandlers().Add(BatchEventHandler); + eventHandler = EventHandler; + batchEventHandler = BatchEventHandler; //内部函数 Task EventHandler(byte[] bytes) { diff --git a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs index 2fb0cd2..e74b320 100644 --- a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs +++ b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs @@ -9,7 +9,10 @@ namespace Pole.EventBus.RabbitMQ public string Password { get; set; } public string VirtualHost { get; set; } public int MasChannelsPerConnection { get; set; } = 200; - public int MaxConnection { get; set; } = 20; + /// + /// 目前为一个连接 当消息数量非常大时,单个TCP连接的运输能力有限,可以修改这个最大连接数提高运输能力 + /// + public int MaxConnection { get; set; } = 1; /// /// 消费者批量处理每次处理的最大消息量 /// diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs index 5020221..94c728c 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerManager.cs @@ -43,49 +43,21 @@ namespace Pole.EventBus.RabbitMQ int distributedMonitorTimeLock = 0; int distributedHoldTimerLock = 0; int heathCheckTimerLock = 0; - private async Task DistributedStart() + private async Task Start() { - try + var consumers = rabbitEventBusContainer.GetConsumers(); + foreach (var consumer in consumers) { - if (Interlocked.CompareExchange(ref distributedMonitorTimeLock, 1, 0) == 0) + if (consumer is RabbitConsumer value) { - var consumers = rabbitEventBusContainer.GetConsumers(); - foreach (var consumer in consumers) - { - if (consumer is RabbitConsumer value) - { - for (int i = 0; i < value.QueueList.Count(); i++) - { - var queue = value.QueueList[i]; - var key = queue.ToString(); - if (!Runners.ContainsKey(key)) - { - var weight = 100000 - Runners.Count; - var (isOk, lockId, expectMillisecondDelay) = await grainFactory.GetGrain(key).Lock(weight, lockHoldingSeconds); - if (isOk) - { - if (Runners.TryAdd(key, lockId)) - { - var runner = new ConsumerRunner(client, provider, value, queue); - ConsumerRunners.TryAdd(key, runner); - await runner.Run(); - } + var queue = value.QueueInfo; + var key = queue.Queue; - } - } - } - } - } - Interlocked.Exchange(ref distributedMonitorTimeLock, 0); - if (logger.IsEnabled(LogLevel.Information)) - logger.LogInformation("EventBus Background Service is working."); + var runner = new ConsumerRunner(client, provider, value, queue); + ConsumerRunners.TryAdd(key, runner); + await runner.Run(); } } - catch (Exception exception) - { - logger.LogError(exception.InnerException ?? exception, nameof(DistributedStart)); - Interlocked.Exchange(ref distributedMonitorTimeLock, 0); - } } private async Task DistributedHold() { @@ -141,7 +113,7 @@ namespace Pole.EventBus.RabbitMQ { if (logger.IsEnabled(LogLevel.Information)) logger.LogInformation("EventBus Background Service is starting."); - DistributedMonitorTime = new Timer(state => DistributedStart().Wait(), null, 1000, _MonitTime); + DistributedMonitorTime = new Timer(state => Start().Wait(), null, 1000, _MonitTime); DistributedHoldTimer = new Timer(state => DistributedHold().Wait(), null, _HoldTime, _HoldTime); HeathCheckTimer = new Timer(state => { HeathCheck().Wait(); }, null, _checkTime, _checkTime); return Task.CompletedTask; diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs index b5b3699..176d98f 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs @@ -42,8 +42,10 @@ namespace Pole.EventBus.RabbitMQ { isFirst = false; Model.Model.ExchangeDeclare(Consumer.EventBus.Exchange, "direct", true); + Model.Model.ExchangeDeclare(Queue.Queue, "direct", true); + Model.Model.ExchangeBind(Consumer.EventBus.Exchange, Queue.Queue,string.Empty); Model.Model.QueueDeclare(Queue.Queue, true, false, false, null); - Model.Model.QueueBind(Queue.Queue, Consumer.EventBus.Exchange, Queue.RoutingKey); + Model.Model.QueueBind(Queue.Queue, Queue.Queue, string.Empty); } Model.Model.BasicQos(0, Model.Connection.Options.CunsumerMaxBatchSize, false); BasicConsumer = new EventingBasicConsumer(Model.Model); diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs b/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs index 75efdc4..be3e6a4 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs @@ -8,12 +8,12 @@ namespace Pole.EventBus.RabbitMQ public class RabbitConsumer : Consumer { public RabbitConsumer( - List> eventHandlers, - List, Task>> batchEventHandlers) : base(eventHandlers, batchEventHandlers) + Func eventHandlers, + Func, Task> batchEventHandlers) : base(new List> { eventHandlers }, new List, Task>> { batchEventHandlers }) { } public RabbitEventBus EventBus { get; set; } - public List QueueList { get; set; } + public QueueInfo QueueInfo { get; set; } public ConsumerOptions Config { get; set; } } } diff --git a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs index 35c35a2..b001624 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs @@ -10,7 +10,6 @@ namespace Pole.EventBus.RabbitMQ { public class RabbitEventBus { - private readonly ConsistentHash _CHash; readonly IObserverUnitContainer observerUnitContainer; public RabbitEventBus( IObserverUnitContainer observerUnitContainer, @@ -34,8 +33,6 @@ namespace Pole.EventBus.RabbitMQ AutoAck = autoAck, Reenqueue = reenqueue, }; - RouteList = new List() { $"{routePrefix }" }; - _CHash = new ConsistentHash(RouteList, lBCount * 10); } public IRabbitEventBusContainer Container { get; } public string Exchange { get; } @@ -52,7 +49,7 @@ namespace Pole.EventBus.RabbitMQ public List Consumers { get; set; } = new List(); public string GetRoute(string key) { - return LBCount == 1 ? RoutePrefix : _CHash.GetNode(key); ; + return RoutePrefix; } public RabbitEventBus BindEvent(Type eventType, string eventName) { @@ -66,33 +63,17 @@ namespace Pole.EventBus.RabbitMQ foreach (var observerUnit in observerUnits) { var consumer = new RabbitConsumer( - observerUnit.GetEventHandlers(), - observerUnit.GetBatchEventHandlers()) + observerUnit.GetEventHandler(), + observerUnit.GetBatchEventHandler()) { EventBus = this, - QueueList = RouteList.Select(route => new QueueInfo { RoutingKey = "", Queue = $"{route}_{EventName}" }).ToList(), + QueueInfo = new QueueInfo { RoutingKey = RoutePrefix, Queue = $"{RoutePrefix}_{observerUnit}" }, Config = ConsumerConfig }; Consumers.Add(consumer); } return Enable(); } - public RabbitEventBus AddConsumer( - Func handler, - Func, Task> batchHandler, - string observerGroup) - { - var consumer = new RabbitConsumer( - new List> { handler }, - new List, Task>> { batchHandler }) - { - EventBus = this, - QueueList = RouteList.Select(route => new QueueInfo { RoutingKey = route, Queue = $"{route}_{observerGroup}" }).ToList(), - Config = ConsumerConfig - }; - Consumers.Add(consumer); - return this; - } public Task Enable() { return Container.Work(this); diff --git a/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj b/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj index 5d07954..dde5642 100644 --- a/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj +++ b/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj @@ -17,11 +17,4 @@ - - - - - - - -- libgit2 0.25.0