diff --git a/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs b/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs index da077c3..9274a10 100644 --- a/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs +++ b/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs @@ -7,13 +7,8 @@ using System.Threading.Tasks; namespace Backet.Api.EventHandlers { - public class ToNoticeBacketCreatedEventHandler : PoleEventHandler, IPoleBulkEventsHandler + public class ToNoticeBacketCreatedEventHandler : PoleEventHandler { - public async Task BulkEventsHandle(List @event) - { - await Task.Delay(1500); - } - public override async Task EventHandle(BacketCreatedEvent @event) { await Task.Delay(1200); diff --git a/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs b/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs index 6a8f391..d51c337 100644 --- a/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs +++ b/src/Pole.EventBus.Rabbitmq/Configuration/ConsumerOptions.cs @@ -6,10 +6,6 @@ public class ConsumerOptions { /// - /// 是否自动ack - /// - public bool AutoAck { get; set; } - /// /// 消息处理失败是否重回队列 /// public bool Reenqueue { get; set; } diff --git a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs index d3736e2..d962cdd 100644 --- a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs +++ b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs @@ -29,7 +29,7 @@ namespace Pole.EventBus.RabbitMQ /// /// exchange 和 queue 名称的前缀 /// - public string Prefix = "Pole_"; + public string Prefix = "Pole"; public string[] Hosts { get; set; diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs index d61e95b..f45693a 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs @@ -65,7 +65,7 @@ namespace Pole.EventBus.RabbitMQ { await mpscChannel.WriteAsync(ea); }; - BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, Consumer.Config.AutoAck, BasicConsumer); + BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, false, BasicConsumer); return Task.CompletedTask; } private async Task BatchExecuter(List list) @@ -81,31 +81,27 @@ namespace Pole.EventBus.RabbitMQ { foreach (var item in list) { - await ProcessComsumerErrors(item, exception); + ProcessComsumerErrors(item, exception); } return; } } - if (!Consumer.Config.AutoAck) + + if (errorMessageDeliveryTags.Count == 0) { - if (errorMessageDeliveryTags.Count == 0) - { - Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true); - } - else + Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true); + } + else + { + list.ForEach(m => { - list.ForEach(m => - { - Model.Model.BasicAck(m.DeliveryTag, false); - }); - } + Model.Model.BasicAck(m.DeliveryTag, false); + }); } } - private async Task ProcessComsumerErrors(BasicDeliverEventArgs ea, Exception exception) + private void ProcessComsumerErrors(BasicDeliverEventArgs ea, Exception exception) { - // todo 这里需要添加断路器 防止超量的 Task.Delay - if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj)) { errorMessageDeliveryTags.Add(ea.DeliveryTag); @@ -117,7 +113,8 @@ namespace Pole.EventBus.RabbitMQ retryTimes++; ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes.ToString(); ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; - await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) => + // 默认预取数为 300 所以每个消费者 理论上最多有 300个延时任务 + Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) => { using var channel = Client.PullChannel(); channel.Publish(ea.Body, ea.BasicProperties.Headers, Queue.Queue, string.Empty, true); @@ -135,10 +132,7 @@ namespace Pole.EventBus.RabbitMQ Model.Model.QueueBind(errorQueueName, errorExchangeName, string.Empty); using var channel = Client.PullChannel(); channel.Publish(ea.Body, ea.BasicProperties.Headers, errorExchangeName, string.Empty, true); - if (!Consumer.Config.AutoAck) - { - Model.Model.BasicAck(ea.DeliveryTag, false); - } + Model.Model.BasicAck(ea.DeliveryTag, false); } } } diff --git a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs index 8aec2bb..bbc750e 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs @@ -35,7 +35,7 @@ namespace Pole.EventBus.RabbitMQ this.observerUnitContainer = observerUnitContainer; this.rabbitOptions = rabbitOptions.Value; } - public async Task AutoRegister(IServiceCollection services) + public async Task AutoRegister() { var eventList = new List<(Type type, EventInfoAttribute config)>(); var evenHandlertList = new List<(Type type, EventInfoAttribute config)>(); @@ -43,7 +43,7 @@ namespace Pole.EventBus.RabbitMQ foreach (var (type, config) in eventList) { var eventName = config.EventName; - var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName); + var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, true, true).BindEvent(type, eventName); await eventBus.AddGrainConsumer(); } foreach (var (type, config) in evenHandlertList) @@ -52,24 +52,15 @@ namespace Pole.EventBus.RabbitMQ if (!eventBusDictionary.TryGetValue(eventName, out RabbitEventBus rabbitEventBus)) { - var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName); + var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, true, true).BindEvent(type, eventName); await eventBus.AddGrainConsumer(); } } - RegisterEventHandlers(services, evenHandlertList); } - private void RegisterEventHandlers(IServiceCollection services, List<(Type type, EventInfoAttribute config)> evenHandlertList) + public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool reenqueue = true, bool persistent = true) { - foreach(var eventHandler in evenHandlertList) - { - services.AddScoped(eventHandler.type); - } - } - - public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = true) - { - return new RabbitEventBus(observerUnitContainer, this, exchange, routePrefix, lBCount, autoAck, reenqueue, persistent); + return new RabbitEventBus(observerUnitContainer, this, exchange, routePrefix, lBCount, reenqueue, persistent); } public Task Work(RabbitEventBus bus) { diff --git a/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs index 71acb75..8265bb1 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs @@ -5,8 +5,8 @@ namespace Pole.EventBus.RabbitMQ { public interface IRabbitEventBusContainer : IConsumerContainer { - Task AutoRegister(IServiceCollection service); - RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false); + Task AutoRegister(); + RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool reenqueue = false, bool persistent = false); Task Work(RabbitEventBus bus); } } diff --git a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs index 384553e..608cfd4 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs @@ -13,7 +13,7 @@ namespace Pole.EventBus.RabbitMQ public RabbitEventBus( IObserverUnitContainer observerUnitContainer, IRabbitEventBusContainer eventBusContainer, - string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = false) + string exchange, string routePrefix, int lBCount = 1, bool reenqueue = true, bool persistent = false) { if (string.IsNullOrEmpty(exchange)) throw new ArgumentNullException(nameof(exchange)); @@ -29,7 +29,6 @@ namespace Pole.EventBus.RabbitMQ Persistent = persistent; ConsumerConfig = new ConsumerOptions { - AutoAck = autoAck, Reenqueue = reenqueue, ErrorQueueSuffix = "_error", MaxReenqueueTimes = 10 diff --git a/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs b/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs index b7dbd72..890c2a1 100644 --- a/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs +++ b/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs @@ -31,7 +31,7 @@ namespace Microsoft.Extensions.DependencyInjection if (eventBusConfig != default) await eventBusConfig(container); else - await container.AutoRegister(startupOption.Services); + await container.AutoRegister(); var consumers = container.GetConsumers(); foreach (var consumer in consumers) diff --git a/src/Pole.EventBus/EventHandler/PoleEventHandler.cs b/src/Pole.EventBus/EventHandler/PoleEventHandler.cs index 68de69e..bbd522f 100644 --- a/src/Pole.EventBus/EventHandler/PoleEventHandler.cs +++ b/src/Pole.EventBus/EventHandler/PoleEventHandler.cs @@ -19,11 +19,19 @@ namespace Pole.EventBus.EventHandler /// /// /// - public abstract class PoleEventHandler:IPoleEventHandler,IPoleEventHandler + public abstract class PoleEventHandler : PoleEventHandlerBase, IPoleEventHandler { public abstract Task EventHandle(TEvent @event); - public async Task Invoke(List transports, ISerializer serializer, IEventTypeFinder eventTypeFinder, ILogger logger,Type eventHandlerType) + } + public abstract class PoleBulkEventsHandler : PoleEventHandlerBase, IPoleBulkEventsHandler + { + public abstract Task BulkEventsHandle(List events); + } + public abstract class PoleEventHandlerBase : IPoleEventHandler + { + + public async Task Invoke(List transports, ISerializer serializer, IEventTypeFinder eventTypeFinder, ILogger logger, Type eventHandlerType) { if (transports.Count() != 0) { diff --git a/src/Pole.EventBus/ObserverUnit.cs b/src/Pole.EventBus/ObserverUnit.cs index 4cb8cba..b7b867a 100644 --- a/src/Pole.EventBus/ObserverUnit.cs +++ b/src/Pole.EventBus/ObserverUnit.cs @@ -33,10 +33,6 @@ namespace Pole.EventBus Logger = serviceProvider.GetService>>(); EventHandlerType = eventHandlerType; } - public static ObserverUnit From(IServiceProvider serviceProvider) where Grain : Orleans.Grain - { - return new ObserverUnit(serviceProvider, typeof(Grain)); - } public Func, Task> GetBatchEventHandler() { @@ -89,7 +85,7 @@ namespace Pole.EventBus var loggerParams = Expression.Parameter(typeof(ILogger), "logger"); var eventHandlerTypeParams = Expression.Parameter(typeof(Type), "eventHandlerType"); var method = typeof(IPoleEventHandler).GetMethod("Invoke"); - var body = Expression.Call(eventHandlerParams, method, eventBytesTransportParams, serializerParams, serializerParams, eventTypeFinderParams, loggerParams, eventHandlerTypeParams); + var body = Expression.Call(eventHandlerParams, method, eventBytesTransportParams, serializerParams, eventTypeFinderParams, loggerParams, eventHandlerTypeParams); return Expression.Lambda, ISerializer, IEventTypeFinder, ILogger, Type, Task>>(body, eventHandlerObjParams, eventBytesTransportParams, serializerParams, eventTypeFinderParams, loggerParams, eventHandlerTypeParams).Compile(); }); return func; diff --git a/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs b/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs index f51d4bf..929d57a 100644 --- a/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs +++ b/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs @@ -1,12 +1,15 @@ using Microsoft.Extensions.DependencyInjection; using Pole.Core; using Pole.Core.Processor; +using Pole.Core.Utils; using Pole.EventBus; +using Pole.EventBus.EventHandler; using Pole.EventBus.Processor; using Pole.EventBus.Processor.Server; using Pole.EventBus.UnitOfWork; using System; using System.Collections.Generic; +using System.Linq; using System.Text; namespace Microsoft.Extensions.DependencyInjection @@ -27,7 +30,20 @@ namespace Microsoft.Extensions.DependencyInjection startupOption.Services.AddHostedService(); startupOption.Services.AddScoped(); startupOption.Services.AddSingleton(); + + RegisterEventHandler(startupOption); return startupOption; } + + private static void RegisterEventHandler(StartupConfig startupOption) + { + foreach (var assembly in AssemblyHelper.GetAssemblies()) + { + foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract && !typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m))) + { + startupOption.Services.AddScoped(type); + } + } + } } } diff --git a/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs b/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs index b728966..4ef1eaa 100644 --- a/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs +++ b/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs @@ -55,6 +55,7 @@ namespace Pole.EventBus.Processor public async Task ProcessInternal() { var now = DateTime.UtcNow; + var pendingMessages = await eventStorage.GetMessagesOfNeedRetry(); if (logger.IsEnabled(LogLevel.Debug)) @@ -74,6 +75,7 @@ namespace Pole.EventBus.Processor pendingMessage.Retries++; var targetName = producerContainer.GetTargetName(pendingMessage.Name); await producer.Publish(targetName, bytes); + pendingMessage.StatusName = nameof(EventStatus.Published); } if (pendingMessages.Count() > 0) {