From 8fc078fba4468fadb82483c06ddff1d7dafbe7ae Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Mon, 10 Feb 2020 18:46:53 +0800 Subject: [PATCH] 完成 发送者 发送确认 以及部分 消费者 重试及错误队列的功能 --- src/Pole.Core/Channels/MpscChannel.cs | 7 +++++-- src/Pole.Core/Consts.cs | 3 +++ src/Pole.Core/Exceptions/ProducerConfirmTimeOutException.cs | 14 ++++++++++++++ src/Pole.Core/Exceptions/ProducerReceivedNAckException.cs | 14 ++++++++++++++ src/Pole.Core/Serialization/DefaultJsonSerializer.cs | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs | 12 ++++++------ src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs | 2 +- src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs | 19 +++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs | 2 +- src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs | 10 +++++++++- src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs | 6 +++++- src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs | 54 ++++++++++++++++++++++++++++++++++++++++++++---------- src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs | 12 ++++++------ src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs | 2 ++ src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs | 4 ++-- 15 files changed, 181 insertions(+), 30 deletions(-) create mode 100644 src/Pole.Core/Exceptions/ProducerConfirmTimeOutException.cs create mode 100644 src/Pole.Core/Exceptions/ProducerReceivedNAckException.cs create mode 100644 src/Pole.Core/Serialization/DefaultJsonSerializer.cs diff --git a/src/Pole.Core/Channels/MpscChannel.cs b/src/Pole.Core/Channels/MpscChannel.cs index 77de015..0aaa382 100644 --- a/src/Pole.Core/Channels/MpscChannel.cs +++ b/src/Pole.Core/Channels/MpscChannel.cs @@ -68,10 +68,13 @@ namespace Pole.Core.Channels { if (consumer is null) throw new NoBindConsumerException(GetType().Name); - if (!IsChildren && _autoConsuming == 0) - ActiveAutoConsumer(); + if (!buffer.Post(data)) return await buffer.SendAsync(data); + + if (!IsChildren && _autoConsuming == 0) + ActiveAutoConsumer(); + return true; } private void ActiveAutoConsumer() diff --git a/src/Pole.Core/Consts.cs b/src/Pole.Core/Consts.cs index cf3dd2a..373af80 100644 --- a/src/Pole.Core/Consts.cs +++ b/src/Pole.Core/Consts.cs @@ -8,5 +8,8 @@ namespace Pole.Core public static class Consts { public static ValueTask ValueTaskDone = new ValueTask(); + public const string ConsumerRetryTimesStr = "pole-consumer-retry-times"; + public const string ConsumerExceptionDetailsStr = "pole-consumer-exception-details"; } } +} diff --git a/src/Pole.Core/Exceptions/ProducerConfirmTimeOutException.cs b/src/Pole.Core/Exceptions/ProducerConfirmTimeOutException.cs new file mode 100644 index 0000000..884237f --- /dev/null +++ b/src/Pole.Core/Exceptions/ProducerConfirmTimeOutException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Exceptions +{ + public class ProducerConfirmTimeOutException : Exception + { + public ProducerConfirmTimeOutException(int timeout) : base($"Producer wait to confirm for {timeout} seconds, timeout") + { + + } + } +} diff --git a/src/Pole.Core/Exceptions/ProducerReceivedNAckException.cs b/src/Pole.Core/Exceptions/ProducerReceivedNAckException.cs new file mode 100644 index 0000000..83c3d6d --- /dev/null +++ b/src/Pole.Core/Exceptions/ProducerReceivedNAckException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Exceptions +{ + public class ProducerReceivedNAckException: Exception + { + public ProducerReceivedNAckException():base("Producer received a NAck, the broker is busy") + { + + } + } +} diff --git a/src/Pole.Core/Serialization/DefaultJsonSerializer.cs b/src/Pole.Core/Serialization/DefaultJsonSerializer.cs new file mode 100644 index 0000000..c8664fc --- /dev/null +++ b/src/Pole.Core/Serialization/DefaultJsonSerializer.cs @@ -0,0 +1,50 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Text.Encodings.Web; +using System.Text.Json; +using System.Text.Unicode; + +namespace Pole.Core.Serialization +{ + public class DefaultJsonSerializer : ISerializer + { + static readonly JsonSerializerOptions options = new JsonSerializerOptions() { Encoder = JavaScriptEncoder.Create(UnicodeRanges.All) }; + public T Deserialize(string json) where T : class, new() + { + return JsonSerializer.Deserialize(json); + } + + public object Deserialize(byte[] bytes, Type type) + { + return JsonSerializer.Deserialize(bytes, type); + } + public string Serialize(T data) where T : class, new() + { + return JsonSerializer.Serialize(data, options); + } + public string Serialize(object data, Type type) + { + return JsonSerializer.Serialize(data, type, options); + } + public byte[] SerializeToUtf8Bytes(T data) where T : class, new() + { + return JsonSerializer.SerializeToUtf8Bytes(data, data.GetType(), options); + } + + public T Deserialize(byte[] bytes) where T : class, new() + { + return JsonSerializer.Deserialize(bytes); + } + + public byte[] SerializeToUtf8Bytes(object data, Type type) + { + return JsonSerializer.SerializeToUtf8Bytes(data, type, options); + } + + public object Deserialize(string json, Type type) + { + return JsonSerializer.Deserialize(json, type); + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs b/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs index 25b5704..7e3db14 100644 --- a/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs +++ b/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs @@ -6,7 +6,7 @@ namespace Pole.EventBus.RabbitMQ { public class ConnectionWrapper { - private readonly List models = new List(); + private readonly List channels = new List(); private readonly IConnection connection; readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); public ConnectionWrapper( @@ -22,11 +22,11 @@ namespace Pole.EventBus.RabbitMQ semaphoreSlim.Wait(); try { - if (models.Count < Options.MasChannelsPerConnection) + if (channels.Count < Options.MasChannelsPerConnection) { - var model = new ModelWrapper(this, connection.CreateModel()); - models.Add(model); - return (true, model); + var channel = new ModelWrapper(this, connection.CreateModel()); + channels.Add(channel); + return (true, channel); } } finally @@ -37,7 +37,7 @@ namespace Pole.EventBus.RabbitMQ } public void Return(ModelWrapper model) { - models.Remove(model); + channels.Remove(model); } } } diff --git a/src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs b/src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs index 005c988..eeab161 100644 --- a/src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs +++ b/src/Pole.EventBus.Rabbitmq/Client/IRabbitMQClient.cs @@ -2,6 +2,6 @@ { public interface IRabbitMQClient { - ModelWrapper PullModel(); + ModelWrapper PullChannel(); } } diff --git a/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs b/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs index 3fe924b..129a4af 100644 --- a/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs +++ b/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs @@ -1,4 +1,6 @@ using Microsoft.Extensions.ObjectPool; +using Pole.Core; +using Pole.Core.Exceptions; using RabbitMQ.Client; using System; @@ -17,14 +19,31 @@ namespace Pole.EventBus.RabbitMQ { Connection = connectionWrapper; Model = model; + var consumeRetryTimes = 0; + var consumeRetryTimesStr = consumeRetryTimes.ToString(); persistentProperties = Model.CreateBasicProperties(); persistentProperties.Persistent = true; + persistentProperties.Headers.Add(Consts.ConsumerRetryTimesStr, consumeRetryTimesStr); noPersistentProperties = Model.CreateBasicProperties(); noPersistentProperties.Persistent = false; + noPersistentProperties.Headers.Add(Consts.ConsumerRetryTimesStr, consumeRetryTimesStr); } public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true) { + Model.ConfirmSelect(); Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg); + if (!Model.WaitForConfirms(TimeSpan.FromSeconds(Connection.Options.ProducerConfirmWaitTimeoutSeconds), out bool isTimeout)) + { + if (isTimeout) + { + throw new ProducerConfirmTimeOutException(Connection.Options.ProducerConfirmWaitTimeoutSeconds); + } + else + { + throw new ProducerReceivedNAckException(); + } + } + } public void Dispose() { diff --git a/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs b/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs index aed0fce..1b2be55 100644 --- a/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs +++ b/src/Pole.EventBus.Rabbitmq/Client/RabbitMQClient.cs @@ -22,7 +22,7 @@ namespace Pole.EventBus.RabbitMQ pool = new DefaultObjectPool(new ModelPooledObjectPolicy(connectionFactory, options)); } - public ModelWrapper PullModel() + public ModelWrapper PullChannel() { var result = pool.Get(); if (result.Pool is null) diff --git a/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs b/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs index d4fc021..6a8f391 100644 --- a/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs +++ b/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs @@ -10,8 +10,16 @@ /// public bool AutoAck { get; set; } /// - /// 消息处理失败是否重回队列还是不停重发 + /// 消息处理失败是否重回队列 /// public bool Reenqueue { get; set; } + /// + /// 错误队列后缀 + /// + public string ErrorQueueSuffix { get; set; } + /// + /// 消息处理失败最大重试次数 + /// + public int MaxReenqueueTimes { get; set; } } } diff --git a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs index e74b320..13b545b 100644 --- a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs +++ b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs @@ -12,7 +12,7 @@ namespace Pole.EventBus.RabbitMQ /// /// 目前为一个连接 当消息数量非常大时,单个TCP连接的运输能力有限,可以修改这个最大连接数提高运输能力 /// - public int MaxConnection { get; set; } = 1; + public int MaxConnection { get; set; } = 10; /// /// 消费者批量处理每次处理的最大消息量 /// @@ -22,6 +22,10 @@ namespace Pole.EventBus.RabbitMQ /// public int CunsumerMaxMillisecondsInterval { get; set; } = 1000; /// + /// 消费者批量处理每次处理的最大延时 + /// + public int ProducerConfirmWaitTimeoutSeconds { get; set; } = 5; + /// /// exchange 和 queue 名称的前缀 /// public string Prefix = "Pole_"; diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs index 176d98f..0b012e3 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs @@ -7,12 +7,15 @@ using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Pole.Core; +using Pole.Core.Serialization; namespace Pole.EventBus.RabbitMQ { public class ConsumerRunner { readonly IMpscChannel mpscChannel; + readonly ISerializer serializer; public ConsumerRunner( IRabbitMQClient client, IServiceProvider provider, @@ -21,6 +24,7 @@ namespace Pole.EventBus.RabbitMQ { Client = client; Logger = provider.GetService>(); + serializer = provider.GetService(); mpscChannel = provider.GetService>(); mpscChannel.BindConsumer(BatchExecuter); Consumer = consumer; @@ -36,14 +40,14 @@ namespace Pole.EventBus.RabbitMQ private bool isFirst = true; public Task Run() { - Model = Client.PullModel(); + Model = Client.PullChannel(); mpscChannel.Config(Model.Connection.Options.CunsumerMaxBatchSize, Model.Connection.Options.CunsumerMaxMillisecondsInterval); if (isFirst) { isFirst = false; Model.Model.ExchangeDeclare(Consumer.EventBus.Exchange, "direct", true); Model.Model.ExchangeDeclare(Queue.Queue, "direct", true); - Model.Model.ExchangeBind(Consumer.EventBus.Exchange, Queue.Queue,string.Empty); + Model.Model.ExchangeBind(Consumer.EventBus.Exchange, Queue.Queue, string.Empty); Model.Model.QueueDeclare(Queue.Queue, true, false, false, null); Model.Model.QueueBind(Queue.Queue, Queue.Queue, string.Empty); } @@ -98,21 +102,51 @@ namespace Pole.EventBus.RabbitMQ try { await Consumer.Notice(ea.Body); - if (!Consumer.Config.AutoAck) - { - Model.Model.BasicAck(ea.DeliveryTag, false); - } } catch (Exception exception) { - Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}"); - if (Consumer.Config.Reenqueue) + Logger.LogError(exception, $"An error occurred in {Queue.Queue}, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}"); + await ProcessComsumerErrors(ea, exception); + } + if (!Consumer.Config.AutoAck) + { + Model.Model.BasicAck(ea.DeliveryTag, false); + } + } + + private async Task ProcessComsumerErrors(BasicDeliverEventArgs ea, Exception exception) + { + if (Consumer.Config.Reenqueue) + { + if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj)) { - await Task.Delay(1000); - Model.Model.BasicReject(ea.DeliveryTag, true); + var retryTimes = Convert.ToInt32(retryTimesObj); + if (retryTimes <= Consumer.Config.MaxReenqueueTimes) + { + retryTimes++; + ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes; + ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = serializer.Serialize(exception, typeof(Exception)); + await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) => + { + Model.Model.BasicReject(ea.DeliveryTag, true); + }); + } + else + { + var errorQueueName = $"{Queue.Queue}{Consumer.Config.ErrorQueueSuffix}"; + var errorExchangeName = $"{Queue.Queue}{Consumer.Config.ErrorQueueSuffix}"; + Model.Model.ExchangeDeclare(errorExchangeName, "direct", true); + Model.Model.QueueDeclare(errorQueueName, true, false, false, null); + Model.Model.QueueBind(errorQueueName, errorExchangeName, string.Empty); + } + if (!Consumer.Config.AutoAck) + { + Model.Model.BasicAck(ea.DeliveryTag, false); + } } } } + public void Close() { Model?.Dispose(); diff --git a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs index 1fd889a..81bfd3b 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs @@ -42,19 +42,19 @@ namespace Pole.EventBus.RabbitMQ AddEventAndEventHandlerInfoList(eventList, evenHandlertList); foreach (var (type, config) in eventList) { - var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name : config.EventName; - var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 2, true, true, true).BindEvent(type, eventName); + var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name.ToLower() : config.EventName; + var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName); await eventBus.AddGrainConsumer(); } foreach (var (type, config) in evenHandlertList) { - var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name : config.EventName; - var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 2, true, true, true).BindEvent(type, eventName); + var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name.ToLower() : config.EventName; + var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName); await eventBus.AddGrainConsumer(); } } - public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = true, bool reenqueue = true, bool persistent = true) + public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = true) { return new RabbitEventBus(observerUnitContainer, this, exchange, routePrefix, lBCount, autoAck, reenqueue, persistent); } @@ -63,7 +63,7 @@ namespace Pole.EventBus.RabbitMQ if (eventBusDictionary.TryAdd(bus.Event, bus)) { eventBusList.Add(bus); - using var channel = rabbitMQClient.PullModel(); + using var channel = rabbitMQClient.PullChannel(); channel.Model.ExchangeDeclare(bus.Exchange, "direct", true); return Task.CompletedTask; } diff --git a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs index b001624..358cbdc 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs @@ -32,6 +32,8 @@ namespace Pole.EventBus.RabbitMQ { AutoAck = autoAck, Reenqueue = reenqueue, + ErrorQueueSuffix = "_error", + MaxReenqueueTimes = 10 }; } public IRabbitEventBusContainer Container { get; } diff --git a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs index 25a5f1a..5c0b4f5 100644 --- a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs +++ b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs @@ -17,8 +17,8 @@ namespace Pole.EventBus.RabbitMQ } public ValueTask Publish(byte[] bytes, string hashKey) { - using var model = rabbitMQClient.PullModel(); - model.Publish(bytes, publisher.Exchange, publisher.GetRoute(hashKey), publisher.Persistent); + using var channel = rabbitMQClient.PullChannel(); + channel.Publish(bytes, publisher.Exchange, publisher.GetRoute(hashKey), publisher.Persistent); return Consts.ValueTaskDone; } } -- libgit2 0.25.0