Commit 021ebde6 by 丁松杰

添加 批量消息处理时的 异常重试 及错误队列

parent 65b7891d
...@@ -8,7 +8,7 @@ namespace Pole.Core.EventBus.EventHandler ...@@ -8,7 +8,7 @@ namespace Pole.Core.EventBus.EventHandler
{ {
public class PoleEventHandler : PoleEventHandlerBase public class PoleEventHandler : PoleEventHandlerBase
{ {
public override Task BatchInvoke(Immutable<List<byte[]>> bytes) public override Task Invoke(Immutable<List<byte[]>> bytes)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }
......
...@@ -10,6 +10,6 @@ namespace Pole.Core.EventBus.EventHandler ...@@ -10,6 +10,6 @@ namespace Pole.Core.EventBus.EventHandler
public abstract class PoleEventHandlerBase : Grain public abstract class PoleEventHandlerBase : Grain
{ {
public abstract Task Invoke(Immutable<byte[]> bytes); public abstract Task Invoke(Immutable<byte[]> bytes);
public abstract Task BatchInvoke(Immutable<List<byte[]>> bytes); public abstract Task Invoke(Immutable<List<byte[]>> bytes);
} }
} }
...@@ -144,7 +144,7 @@ namespace Pole.Core.EventBus ...@@ -144,7 +144,7 @@ namespace Pole.Core.EventBus
return Task.WhenAll(groups.Select(kv => return Task.WhenAll(groups.Select(kv =>
{ {
var items = kv.Select(item => item.bytes).ToList(); var items = kv.Select(item => item.bytes).ToList();
return GetObserver(observerType, kv.Key).BatchInvoke(new Immutable<List<byte[]>>(items)); return GetObserver(observerType, kv.Key).Invoke(new Immutable<List<byte[]>>(items));
})); }));
} }
} }
......
...@@ -16,7 +16,7 @@ namespace Pole.EventBus.RabbitMQ ...@@ -16,7 +16,7 @@ namespace Pole.EventBus.RabbitMQ
/// <summary> /// <summary>
/// 消费者批量处理每次处理的最大消息量 /// 消费者批量处理每次处理的最大消息量
/// </summary> /// </summary>
public ushort CunsumerMaxBatchSize { get; set; } = 3000; public ushort CunsumerMaxBatchSize { get; set; } = 300;
/// <summary> /// <summary>
/// 消费者批量处理每次处理的最大延时 /// 消费者批量处理每次处理的最大延时
/// </summary> /// </summary>
......
...@@ -78,23 +78,22 @@ namespace Pole.EventBus.RabbitMQ ...@@ -78,23 +78,22 @@ namespace Pole.EventBus.RabbitMQ
try try
{ {
await Consumer.Notice(list.Select(o => o.Body).ToList()); await Consumer.Notice(list.Select(o => o.Body).ToList());
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true);
}
} }
catch (Exception exception) catch (Exception exception)
{ {
Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}"); Logger.LogError(exception, $"An error occurred in batch consume {Queue.Queue} queue, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}");
if (Consumer.Config.Reenqueue) if (Consumer.Config.Reenqueue)
{ {
await Task.Delay(1000);
foreach (var item in list) foreach (var item in list)
{ {
Model.Model.BasicReject(item.DeliveryTag, true); await ProcessComsumerErrors(item, exception);
} }
} }
} }
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true);
}
} }
} }
private async Task Process(BasicDeliverEventArgs ea) private async Task Process(BasicDeliverEventArgs ea)
...@@ -105,8 +104,11 @@ namespace Pole.EventBus.RabbitMQ ...@@ -105,8 +104,11 @@ namespace Pole.EventBus.RabbitMQ
} }
catch (Exception exception) catch (Exception exception)
{ {
Logger.LogError(exception, $"An error occurred in {Queue.Queue}, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}"); Logger.LogError(exception, $"An error occurred in consume {Queue.Queue} queue, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}");
await ProcessComsumerErrors(ea, exception); if (Consumer.Config.Reenqueue)
{
await ProcessComsumerErrors(ea, exception);
}
} }
if (!Consumer.Config.AutoAck) if (!Consumer.Config.AutoAck)
{ {
...@@ -116,32 +118,29 @@ namespace Pole.EventBus.RabbitMQ ...@@ -116,32 +118,29 @@ namespace Pole.EventBus.RabbitMQ
private async Task ProcessComsumerErrors(BasicDeliverEventArgs ea, Exception exception) private async Task ProcessComsumerErrors(BasicDeliverEventArgs ea, Exception exception)
{ {
if (Consumer.Config.Reenqueue) if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj))
{ {
if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj)) var retryTimes = Convert.ToInt32(retryTimesObj);
if (retryTimes <= Consumer.Config.MaxReenqueueTimes)
{ {
var retryTimes = Convert.ToInt32(retryTimesObj); retryTimes++;
if (retryTimes <= Consumer.Config.MaxReenqueueTimes) ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes;
ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = serializer.Serialize(exception, typeof(Exception));
await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) =>
{ {
retryTimes++; Model.Model.BasicReject(ea.DeliveryTag, true);
ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes; });
ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = serializer.Serialize(exception, typeof(Exception)); }
await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) => else
{ {
Model.Model.BasicReject(ea.DeliveryTag, true); var errorQueueName = $"{Queue.Queue}{Consumer.Config.ErrorQueueSuffix}";
}); var errorExchangeName = $"{Queue.Queue}{Consumer.Config.ErrorQueueSuffix}";
} Model.Model.ExchangeDeclare(errorExchangeName, "direct", true);
else Model.Model.QueueDeclare(errorQueueName, true, false, false, null);
Model.Model.QueueBind(errorQueueName, errorExchangeName, string.Empty);
if (!Consumer.Config.AutoAck)
{ {
var errorQueueName = $"{Queue.Queue}{Consumer.Config.ErrorQueueSuffix}"; Model.Model.BasicAck(ea.DeliveryTag, false);
var errorExchangeName = $"{Queue.Queue}{Consumer.Config.ErrorQueueSuffix}";
Model.Model.ExchangeDeclare(errorExchangeName, "direct", true);
Model.Model.QueueDeclare(errorQueueName, true, false, false, null);
Model.Model.QueueBind(errorQueueName, errorExchangeName, string.Empty);
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(ea.DeliveryTag, false);
}
} }
} }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment