Commit 8fc078fb by 丁松杰

完成 发送者 发送确认 以及部分 消费者 重试及错误队列的功能

parent 5f7a7238
......@@ -68,10 +68,13 @@ namespace Pole.Core.Channels
{
if (consumer is null)
throw new NoBindConsumerException(GetType().Name);
if (!IsChildren && _autoConsuming == 0)
ActiveAutoConsumer();
if (!buffer.Post(data))
return await buffer.SendAsync(data);
if (!IsChildren && _autoConsuming == 0)
ActiveAutoConsumer();
return true;
}
private void ActiveAutoConsumer()
......
......@@ -8,5 +8,8 @@ namespace Pole.Core
public static class Consts
{
public static ValueTask ValueTaskDone = new ValueTask();
public const string ConsumerRetryTimesStr = "pole-consumer-retry-times";
public const string ConsumerExceptionDetailsStr = "pole-consumer-exception-details";
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Exceptions
{
public class ProducerConfirmTimeOutException : Exception
{
public ProducerConfirmTimeOutException(int timeout) : base($"Producer wait to confirm for {timeout} seconds, timeout")
{
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Exceptions
{
public class ProducerReceivedNAckException: Exception
{
public ProducerReceivedNAckException():base("Producer received a NAck, the broker is busy")
{
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Encodings.Web;
using System.Text.Json;
using System.Text.Unicode;
namespace Pole.Core.Serialization
{
public class DefaultJsonSerializer : ISerializer
{
static readonly JsonSerializerOptions options = new JsonSerializerOptions() { Encoder = JavaScriptEncoder.Create(UnicodeRanges.All) };
public T Deserialize<T>(string json) where T : class, new()
{
return JsonSerializer.Deserialize<T>(json);
}
public object Deserialize(byte[] bytes, Type type)
{
return JsonSerializer.Deserialize(bytes, type);
}
public string Serialize<T>(T data) where T : class, new()
{
return JsonSerializer.Serialize(data, options);
}
public string Serialize(object data, Type type)
{
return JsonSerializer.Serialize(data, type, options);
}
public byte[] SerializeToUtf8Bytes<T>(T data) where T : class, new()
{
return JsonSerializer.SerializeToUtf8Bytes(data, data.GetType(), options);
}
public T Deserialize<T>(byte[] bytes) where T : class, new()
{
return JsonSerializer.Deserialize<T>(bytes);
}
public byte[] SerializeToUtf8Bytes(object data, Type type)
{
return JsonSerializer.SerializeToUtf8Bytes(data, type, options);
}
public object Deserialize(string json, Type type)
{
return JsonSerializer.Deserialize(json, type);
}
}
}
......@@ -6,7 +6,7 @@ namespace Pole.EventBus.RabbitMQ
{
public class ConnectionWrapper
{
private readonly List<ModelWrapper> models = new List<ModelWrapper>();
private readonly List<ModelWrapper> channels = new List<ModelWrapper>();
private readonly IConnection connection;
readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
public ConnectionWrapper(
......@@ -22,11 +22,11 @@ namespace Pole.EventBus.RabbitMQ
semaphoreSlim.Wait();
try
{
if (models.Count < Options.MasChannelsPerConnection)
if (channels.Count < Options.MasChannelsPerConnection)
{
var model = new ModelWrapper(this, connection.CreateModel());
models.Add(model);
return (true, model);
var channel = new ModelWrapper(this, connection.CreateModel());
channels.Add(channel);
return (true, channel);
}
}
finally
......@@ -37,7 +37,7 @@ namespace Pole.EventBus.RabbitMQ
}
public void Return(ModelWrapper model)
{
models.Remove(model);
channels.Remove(model);
}
}
}
......@@ -2,6 +2,6 @@
{
public interface IRabbitMQClient
{
ModelWrapper PullModel();
ModelWrapper PullChannel();
}
}
using Microsoft.Extensions.ObjectPool;
using Pole.Core;
using Pole.Core.Exceptions;
using RabbitMQ.Client;
using System;
......@@ -17,14 +19,31 @@ namespace Pole.EventBus.RabbitMQ
{
Connection = connectionWrapper;
Model = model;
var consumeRetryTimes = 0;
var consumeRetryTimesStr = consumeRetryTimes.ToString();
persistentProperties = Model.CreateBasicProperties();
persistentProperties.Persistent = true;
persistentProperties.Headers.Add(Consts.ConsumerRetryTimesStr, consumeRetryTimesStr);
noPersistentProperties = Model.CreateBasicProperties();
noPersistentProperties.Persistent = false;
noPersistentProperties.Headers.Add(Consts.ConsumerRetryTimesStr, consumeRetryTimesStr);
}
public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true)
{
Model.ConfirmSelect();
Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg);
if (!Model.WaitForConfirms(TimeSpan.FromSeconds(Connection.Options.ProducerConfirmWaitTimeoutSeconds), out bool isTimeout))
{
if (isTimeout)
{
throw new ProducerConfirmTimeOutException(Connection.Options.ProducerConfirmWaitTimeoutSeconds);
}
else
{
throw new ProducerReceivedNAckException();
}
}
}
public void Dispose()
{
......
......@@ -22,7 +22,7 @@ namespace Pole.EventBus.RabbitMQ
pool = new DefaultObjectPool<ModelWrapper>(new ModelPooledObjectPolicy(connectionFactory, options));
}
public ModelWrapper PullModel()
public ModelWrapper PullChannel()
{
var result = pool.Get();
if (result.Pool is null)
......
......@@ -10,8 +10,16 @@
/// </summary>
public bool AutoAck { get; set; }
/// <summary>
/// 消息处理失败是否重回队列还是不停重发
/// 消息处理失败是否重回队列
/// </summary>
public bool Reenqueue { get; set; }
/// <summary>
/// 错误队列后缀
/// </summary>
public string ErrorQueueSuffix { get; set; }
/// <summary>
/// 消息处理失败最大重试次数
/// </summary>
public int MaxReenqueueTimes { get; set; }
}
}
......@@ -12,7 +12,7 @@ namespace Pole.EventBus.RabbitMQ
/// <summary>
/// 目前为一个连接 当消息数量非常大时,单个TCP连接的运输能力有限,可以修改这个最大连接数提高运输能力
/// </summary>
public int MaxConnection { get; set; } = 1;
public int MaxConnection { get; set; } = 10;
/// <summary>
/// 消费者批量处理每次处理的最大消息量
/// </summary>
......@@ -22,6 +22,10 @@ namespace Pole.EventBus.RabbitMQ
/// </summary>
public int CunsumerMaxMillisecondsInterval { get; set; } = 1000;
/// <summary>
/// 消费者批量处理每次处理的最大延时
/// </summary>
public int ProducerConfirmWaitTimeoutSeconds { get; set; } = 5;
/// <summary>
/// exchange 和 queue 名称的前缀
/// </summary>
public string Prefix = "Pole_";
......
......@@ -7,12 +7,15 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Pole.Core;
using Pole.Core.Serialization;
namespace Pole.EventBus.RabbitMQ
{
public class ConsumerRunner
{
readonly IMpscChannel<BasicDeliverEventArgs> mpscChannel;
readonly ISerializer serializer;
public ConsumerRunner(
IRabbitMQClient client,
IServiceProvider provider,
......@@ -21,6 +24,7 @@ namespace Pole.EventBus.RabbitMQ
{
Client = client;
Logger = provider.GetService<ILogger<ConsumerRunner>>();
serializer = provider.GetService<ISerializer>();
mpscChannel = provider.GetService<IMpscChannel<BasicDeliverEventArgs>>();
mpscChannel.BindConsumer(BatchExecuter);
Consumer = consumer;
......@@ -36,14 +40,14 @@ namespace Pole.EventBus.RabbitMQ
private bool isFirst = true;
public Task Run()
{
Model = Client.PullModel();
Model = Client.PullChannel();
mpscChannel.Config(Model.Connection.Options.CunsumerMaxBatchSize, Model.Connection.Options.CunsumerMaxMillisecondsInterval);
if (isFirst)
{
isFirst = false;
Model.Model.ExchangeDeclare(Consumer.EventBus.Exchange, "direct", true);
Model.Model.ExchangeDeclare(Queue.Queue, "direct", true);
Model.Model.ExchangeBind(Consumer.EventBus.Exchange, Queue.Queue,string.Empty);
Model.Model.ExchangeBind(Consumer.EventBus.Exchange, Queue.Queue, string.Empty);
Model.Model.QueueDeclare(Queue.Queue, true, false, false, null);
Model.Model.QueueBind(Queue.Queue, Queue.Queue, string.Empty);
}
......@@ -98,21 +102,51 @@ namespace Pole.EventBus.RabbitMQ
try
{
await Consumer.Notice(ea.Body);
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(ea.DeliveryTag, false);
}
}
catch (Exception exception)
{
Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}");
if (Consumer.Config.Reenqueue)
Logger.LogError(exception, $"An error occurred in {Queue.Queue}, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}");
await ProcessComsumerErrors(ea, exception);
}
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(ea.DeliveryTag, false);
}
}
private async Task ProcessComsumerErrors(BasicDeliverEventArgs ea, Exception exception)
{
if (Consumer.Config.Reenqueue)
{
if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj))
{
await Task.Delay(1000);
Model.Model.BasicReject(ea.DeliveryTag, true);
var retryTimes = Convert.ToInt32(retryTimesObj);
if (retryTimes <= Consumer.Config.MaxReenqueueTimes)
{
retryTimes++;
ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes;
ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = serializer.Serialize(exception, typeof(Exception));
await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) =>
{
Model.Model.BasicReject(ea.DeliveryTag, true);
});
}
else
{
var errorQueueName = $"{Queue.Queue}{Consumer.Config.ErrorQueueSuffix}";
var errorExchangeName = $"{Queue.Queue}{Consumer.Config.ErrorQueueSuffix}";
Model.Model.ExchangeDeclare(errorExchangeName, "direct", true);
Model.Model.QueueDeclare(errorQueueName, true, false, false, null);
Model.Model.QueueBind(errorQueueName, errorExchangeName, string.Empty);
}
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(ea.DeliveryTag, false);
}
}
}
}
public void Close()
{
Model?.Dispose();
......
......@@ -42,19 +42,19 @@ namespace Pole.EventBus.RabbitMQ
AddEventAndEventHandlerInfoList(eventList, evenHandlertList);
foreach (var (type, config) in eventList)
{
var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name : config.EventName;
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 2, true, true, true).BindEvent(type, eventName);
var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name.ToLower() : config.EventName;
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName);
await eventBus.AddGrainConsumer<string>();
}
foreach (var (type, config) in evenHandlertList)
{
var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name : config.EventName;
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 2, true, true, true).BindEvent(type, eventName);
var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name.ToLower() : config.EventName;
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName);
await eventBus.AddGrainConsumer<string>();
}
}
public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = true, bool reenqueue = true, bool persistent = true)
public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = true)
{
return new RabbitEventBus(observerUnitContainer, this, exchange, routePrefix, lBCount, autoAck, reenqueue, persistent);
}
......@@ -63,7 +63,7 @@ namespace Pole.EventBus.RabbitMQ
if (eventBusDictionary.TryAdd(bus.Event, bus))
{
eventBusList.Add(bus);
using var channel = rabbitMQClient.PullModel();
using var channel = rabbitMQClient.PullChannel();
channel.Model.ExchangeDeclare(bus.Exchange, "direct", true);
return Task.CompletedTask;
}
......
......@@ -32,6 +32,8 @@ namespace Pole.EventBus.RabbitMQ
{
AutoAck = autoAck,
Reenqueue = reenqueue,
ErrorQueueSuffix = "_error",
MaxReenqueueTimes = 10
};
}
public IRabbitEventBusContainer Container { get; }
......
......@@ -17,8 +17,8 @@ namespace Pole.EventBus.RabbitMQ
}
public ValueTask Publish(byte[] bytes, string hashKey)
{
using var model = rabbitMQClient.PullModel();
model.Publish(bytes, publisher.Exchange, publisher.GetRoute(hashKey), publisher.Persistent);
using var channel = rabbitMQClient.PullChannel();
channel.Publish(bytes, publisher.Exchange, publisher.GetRoute(hashKey), publisher.Persistent);
return Consts.ValueTaskDone;
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment