Commit 6834cd1a by dingsongjie

修复 bug

parent a7eba34a
......@@ -7,13 +7,8 @@ using System.Threading.Tasks;
namespace Backet.Api.EventHandlers
{
public class ToNoticeBacketCreatedEventHandler : PoleEventHandler<BacketCreatedEvent>, IPoleBulkEventsHandler<BacketCreatedEvent>
public class ToNoticeBacketCreatedEventHandler : PoleEventHandler<BacketCreatedEvent>
{
public async Task BulkEventsHandle(List<BacketCreatedEvent> @event)
{
await Task.Delay(1500);
}
public override async Task EventHandle(BacketCreatedEvent @event)
{
await Task.Delay(1200);
......
......@@ -6,10 +6,6 @@
public class ConsumerOptions
{
/// <summary>
/// 是否自动ack
/// </summary>
public bool AutoAck { get; set; }
/// <summary>
/// 消息处理失败是否重回队列
/// </summary>
public bool Reenqueue { get; set; }
......
......@@ -29,7 +29,7 @@ namespace Pole.EventBus.RabbitMQ
/// <summary>
/// exchange 和 queue 名称的前缀
/// </summary>
public string Prefix = "Pole_";
public string Prefix = "Pole";
public string[] Hosts
{
get; set;
......
......@@ -65,7 +65,7 @@ namespace Pole.EventBus.RabbitMQ
{
await mpscChannel.WriteAsync(ea);
};
BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, Consumer.Config.AutoAck, BasicConsumer);
BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, false, BasicConsumer);
return Task.CompletedTask;
}
private async Task BatchExecuter(List<BasicDeliverEventArgs> list)
......@@ -81,31 +81,27 @@ namespace Pole.EventBus.RabbitMQ
{
foreach (var item in list)
{
await ProcessComsumerErrors(item, exception);
ProcessComsumerErrors(item, exception);
}
return;
}
}
if (!Consumer.Config.AutoAck)
if (errorMessageDeliveryTags.Count == 0)
{
if (errorMessageDeliveryTags.Count == 0)
{
Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true);
}
else
Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true);
}
else
{
list.ForEach(m =>
{
list.ForEach(m =>
{
Model.Model.BasicAck(m.DeliveryTag, false);
});
}
Model.Model.BasicAck(m.DeliveryTag, false);
});
}
}
private async Task ProcessComsumerErrors(BasicDeliverEventArgs ea, Exception exception)
private void ProcessComsumerErrors(BasicDeliverEventArgs ea, Exception exception)
{
// todo 这里需要添加断路器 防止超量的 Task.Delay
if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj))
{
errorMessageDeliveryTags.Add(ea.DeliveryTag);
......@@ -117,7 +113,8 @@ namespace Pole.EventBus.RabbitMQ
retryTimes++;
ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes.ToString();
ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) =>
// 默认预取数为 300 所以每个消费者 理论上最多有 300个延时任务
Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) =>
{
using var channel = Client.PullChannel();
channel.Publish(ea.Body, ea.BasicProperties.Headers, Queue.Queue, string.Empty, true);
......@@ -135,10 +132,7 @@ namespace Pole.EventBus.RabbitMQ
Model.Model.QueueBind(errorQueueName, errorExchangeName, string.Empty);
using var channel = Client.PullChannel();
channel.Publish(ea.Body, ea.BasicProperties.Headers, errorExchangeName, string.Empty, true);
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(ea.DeliveryTag, false);
}
Model.Model.BasicAck(ea.DeliveryTag, false);
}
}
}
......
......@@ -35,7 +35,7 @@ namespace Pole.EventBus.RabbitMQ
this.observerUnitContainer = observerUnitContainer;
this.rabbitOptions = rabbitOptions.Value;
}
public async Task AutoRegister(IServiceCollection services)
public async Task AutoRegister()
{
var eventList = new List<(Type type, EventInfoAttribute config)>();
var evenHandlertList = new List<(Type type, EventInfoAttribute config)>();
......@@ -43,7 +43,7 @@ namespace Pole.EventBus.RabbitMQ
foreach (var (type, config) in eventList)
{
var eventName = config.EventName;
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName);
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, true, true).BindEvent(type, eventName);
await eventBus.AddGrainConsumer<string>();
}
foreach (var (type, config) in evenHandlertList)
......@@ -52,24 +52,15 @@ namespace Pole.EventBus.RabbitMQ
if (!eventBusDictionary.TryGetValue(eventName, out RabbitEventBus rabbitEventBus))
{
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName);
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, true, true).BindEvent(type, eventName);
await eventBus.AddGrainConsumer<string>();
}
}
RegisterEventHandlers(services, evenHandlertList);
}
private void RegisterEventHandlers(IServiceCollection services, List<(Type type, EventInfoAttribute config)> evenHandlertList)
public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool reenqueue = true, bool persistent = true)
{
foreach(var eventHandler in evenHandlertList)
{
services.AddScoped(eventHandler.type);
}
}
public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = true)
{
return new RabbitEventBus(observerUnitContainer, this, exchange, routePrefix, lBCount, autoAck, reenqueue, persistent);
return new RabbitEventBus(observerUnitContainer, this, exchange, routePrefix, lBCount, reenqueue, persistent);
}
public Task Work(RabbitEventBus bus)
{
......
......@@ -5,8 +5,8 @@ namespace Pole.EventBus.RabbitMQ
{
public interface IRabbitEventBusContainer : IConsumerContainer
{
Task AutoRegister(IServiceCollection service);
RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false);
Task AutoRegister();
RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool reenqueue = false, bool persistent = false);
Task Work(RabbitEventBus bus);
}
}
......@@ -13,7 +13,7 @@ namespace Pole.EventBus.RabbitMQ
public RabbitEventBus(
IObserverUnitContainer observerUnitContainer,
IRabbitEventBusContainer eventBusContainer,
string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = false)
string exchange, string routePrefix, int lBCount = 1, bool reenqueue = true, bool persistent = false)
{
if (string.IsNullOrEmpty(exchange))
throw new ArgumentNullException(nameof(exchange));
......@@ -29,7 +29,6 @@ namespace Pole.EventBus.RabbitMQ
Persistent = persistent;
ConsumerConfig = new ConsumerOptions
{
AutoAck = autoAck,
Reenqueue = reenqueue,
ErrorQueueSuffix = "_error",
MaxReenqueueTimes = 10
......
......@@ -31,7 +31,7 @@ namespace Microsoft.Extensions.DependencyInjection
if (eventBusConfig != default)
await eventBusConfig(container);
else
await container.AutoRegister(startupOption.Services);
await container.AutoRegister();
var consumers = container.GetConsumers();
foreach (var consumer in consumers)
......
......@@ -19,11 +19,19 @@ namespace Pole.EventBus.EventHandler
/// <summary>
///
/// </summary>
public abstract class PoleEventHandler<TEvent>:IPoleEventHandler,IPoleEventHandler<TEvent>
public abstract class PoleEventHandler<TEvent> : PoleEventHandlerBase<TEvent>, IPoleEventHandler<TEvent>
{
public abstract Task EventHandle(TEvent @event);
public async Task Invoke(List<EventBytesTransport> transports, ISerializer serializer, IEventTypeFinder eventTypeFinder, ILogger logger,Type eventHandlerType)
}
public abstract class PoleBulkEventsHandler<TEvent> : PoleEventHandlerBase<TEvent>, IPoleBulkEventsHandler<TEvent>
{
public abstract Task BulkEventsHandle(List<TEvent> events);
}
public abstract class PoleEventHandlerBase<TEvent> : IPoleEventHandler
{
public async Task Invoke(List<EventBytesTransport> transports, ISerializer serializer, IEventTypeFinder eventTypeFinder, ILogger logger, Type eventHandlerType)
{
if (transports.Count() != 0)
{
......
......@@ -33,10 +33,6 @@ namespace Pole.EventBus
Logger = serviceProvider.GetService<ILogger<ObserverUnit<PrimaryKey>>>();
EventHandlerType = eventHandlerType;
}
public static ObserverUnit<PrimaryKey> From<Grain>(IServiceProvider serviceProvider) where Grain : Orleans.Grain
{
return new ObserverUnit<PrimaryKey>(serviceProvider, typeof(Grain));
}
public Func<List<byte[]>, Task> GetBatchEventHandler()
{
......@@ -89,7 +85,7 @@ namespace Pole.EventBus
var loggerParams = Expression.Parameter(typeof(ILogger), "logger");
var eventHandlerTypeParams = Expression.Parameter(typeof(Type), "eventHandlerType");
var method = typeof(IPoleEventHandler).GetMethod("Invoke");
var body = Expression.Call(eventHandlerParams, method, eventBytesTransportParams, serializerParams, serializerParams, eventTypeFinderParams, loggerParams, eventHandlerTypeParams);
var body = Expression.Call(eventHandlerParams, method, eventBytesTransportParams, serializerParams, eventTypeFinderParams, loggerParams, eventHandlerTypeParams);
return Expression.Lambda<Func<object, List<EventBytesTransport>, ISerializer, IEventTypeFinder, ILogger, Type, Task>>(body, eventHandlerObjParams, eventBytesTransportParams, serializerParams, eventTypeFinderParams, loggerParams, eventHandlerTypeParams).Compile();
});
return func;
......
using Microsoft.Extensions.DependencyInjection;
using Pole.Core;
using Pole.Core.Processor;
using Pole.Core.Utils;
using Pole.EventBus;
using Pole.EventBus.EventHandler;
using Pole.EventBus.Processor;
using Pole.EventBus.Processor.Server;
using Pole.EventBus.UnitOfWork;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Microsoft.Extensions.DependencyInjection
......@@ -27,7 +30,20 @@ namespace Microsoft.Extensions.DependencyInjection
startupOption.Services.AddHostedService<BackgroundServiceBasedProcessorServer>();
startupOption.Services.AddScoped<IUnitOfWork, Pole.EventBus.UnitOfWork.UnitOfWork>();
startupOption.Services.AddSingleton<IEventTypeFinder, EventTypeFinder>();
RegisterEventHandler(startupOption);
return startupOption;
}
private static void RegisterEventHandler(StartupConfig startupOption)
{
foreach (var assembly in AssemblyHelper.GetAssemblies())
{
foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract && !typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m)))
{
startupOption.Services.AddScoped(type);
}
}
}
}
}
......@@ -55,6 +55,7 @@ namespace Pole.EventBus.Processor
public async Task ProcessInternal()
{
var now = DateTime.UtcNow;
var pendingMessages = await eventStorage.GetMessagesOfNeedRetry();
if (logger.IsEnabled(LogLevel.Debug))
......@@ -74,6 +75,7 @@ namespace Pole.EventBus.Processor
pendingMessage.Retries++;
var targetName = producerContainer.GetTargetName(pendingMessage.Name);
await producer.Publish(targetName, bytes);
pendingMessage.StatusName = nameof(EventStatus.Published);
}
if (pendingMessages.Count() > 0)
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment