diff --git a/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs b/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs index eb56f7f..e125507 100644 --- a/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs +++ b/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs @@ -8,7 +8,7 @@ namespace Pole.Core.EventBus.EventHandler { public class PoleEventHandler : PoleEventHandlerBase { - public override Task BatchInvoke(Immutable> bytes) + public override Task Invoke(Immutable> bytes) { throw new NotImplementedException(); } diff --git a/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs b/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs index 372957e..68b9bad 100644 --- a/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs +++ b/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs @@ -10,6 +10,6 @@ namespace Pole.Core.EventBus.EventHandler public abstract class PoleEventHandlerBase : Grain { public abstract Task Invoke(Immutable bytes); - public abstract Task BatchInvoke(Immutable> bytes); + public abstract Task Invoke(Immutable> bytes); } } diff --git a/src/Pole.Core/EventBus/ObserverUnit.cs b/src/Pole.Core/EventBus/ObserverUnit.cs index 31b7f64..959ec55 100644 --- a/src/Pole.Core/EventBus/ObserverUnit.cs +++ b/src/Pole.Core/EventBus/ObserverUnit.cs @@ -144,7 +144,7 @@ namespace Pole.Core.EventBus return Task.WhenAll(groups.Select(kv => { var items = kv.Select(item => item.bytes).ToList(); - return GetObserver(observerType, kv.Key).BatchInvoke(new Immutable>(items)); + return GetObserver(observerType, kv.Key).Invoke(new Immutable>(items)); })); } } diff --git a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs index 13b545b..0a679a8 100644 --- a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs +++ b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs @@ -16,7 +16,7 @@ namespace Pole.EventBus.RabbitMQ /// /// 消费者批量处理每次处理的最大消息量 /// - public ushort CunsumerMaxBatchSize { get; set; } = 3000; + public ushort CunsumerMaxBatchSize { get; set; } = 300; /// /// 消费者批量处理每次处理的最大延时 /// diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs index e68844d..a757002 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs @@ -78,23 +78,22 @@ namespace Pole.EventBus.RabbitMQ try { await Consumer.Notice(list.Select(o => o.Body).ToList()); - if (!Consumer.Config.AutoAck) - { - Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true); - } } catch (Exception exception) { - Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}"); + Logger.LogError(exception, $"An error occurred in batch consume {Queue.Queue} queue, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}"); if (Consumer.Config.Reenqueue) { - await Task.Delay(1000); foreach (var item in list) { - Model.Model.BasicReject(item.DeliveryTag, true); + await ProcessComsumerErrors(item, exception); } } } + if (!Consumer.Config.AutoAck) + { + Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true); + } } } private async Task Process(BasicDeliverEventArgs ea) @@ -105,8 +104,11 @@ namespace Pole.EventBus.RabbitMQ } catch (Exception exception) { - Logger.LogError(exception, $"An error occurred in {Queue.Queue}, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}"); - await ProcessComsumerErrors(ea, exception); + Logger.LogError(exception, $"An error occurred in consume {Queue.Queue} queue, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}"); + if (Consumer.Config.Reenqueue) + { + await ProcessComsumerErrors(ea, exception); + } } if (!Consumer.Config.AutoAck) { @@ -116,32 +118,29 @@ namespace Pole.EventBus.RabbitMQ private async Task ProcessComsumerErrors(BasicDeliverEventArgs ea, Exception exception) { - if (Consumer.Config.Reenqueue) + if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj)) { - if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj)) + var retryTimes = Convert.ToInt32(retryTimesObj); + if (retryTimes <= Consumer.Config.MaxReenqueueTimes) { - var retryTimes = Convert.ToInt32(retryTimesObj); - if (retryTimes <= Consumer.Config.MaxReenqueueTimes) + retryTimes++; + ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes; + ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = serializer.Serialize(exception, typeof(Exception)); + await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) => { - retryTimes++; - ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes; - ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = serializer.Serialize(exception, typeof(Exception)); - await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) => - { - Model.Model.BasicReject(ea.DeliveryTag, true); - }); - } - else + Model.Model.BasicReject(ea.DeliveryTag, true); + }); + } + else + { + var errorQueueName = $"{Queue.Queue}{Consumer.Config.ErrorQueueSuffix}"; + var errorExchangeName = $"{Queue.Queue}{Consumer.Config.ErrorQueueSuffix}"; + Model.Model.ExchangeDeclare(errorExchangeName, "direct", true); + Model.Model.QueueDeclare(errorQueueName, true, false, false, null); + Model.Model.QueueBind(errorQueueName, errorExchangeName, string.Empty); + if (!Consumer.Config.AutoAck) { - var errorQueueName = $"{Queue.Queue}{Consumer.Config.ErrorQueueSuffix}"; - var errorExchangeName = $"{Queue.Queue}{Consumer.Config.ErrorQueueSuffix}"; - Model.Model.ExchangeDeclare(errorExchangeName, "direct", true); - Model.Model.QueueDeclare(errorQueueName, true, false, false, null); - Model.Model.QueueBind(errorQueueName, errorExchangeName, string.Empty); - if (!Consumer.Config.AutoAck) - { - Model.Model.BasicAck(ea.DeliveryTag, false); - } + Model.Model.BasicAck(ea.DeliveryTag, false); } } }