diff --git a/samples/apis/Backet.Api/Controllers/BacketController.cs b/samples/apis/Backet.Api/Controllers/BacketController.cs index 2540106..701c2c7 100644 --- a/samples/apis/Backet.Api/Controllers/BacketController.cs +++ b/samples/apis/Backet.Api/Controllers/BacketController.cs @@ -6,7 +6,6 @@ using System.Runtime; using System.Text; using System.Threading.Tasks; using Backet.Api.Domain.Event; -using Backet.Api.EventHandlers.Abstraction; using Backet.Api.Grains.Abstraction; using Backet.Api.Infrastructure; using Dapper; diff --git a/samples/apis/Backet.Api/EventHandlers/Abstraction/IToNoticeBacketCreatedEventHandler.cs b/samples/apis/Backet.Api/EventHandlers/Abstraction/IToNoticeBacketCreatedEventHandler.cs deleted file mode 100644 index e06612d..0000000 --- a/samples/apis/Backet.Api/EventHandlers/Abstraction/IToNoticeBacketCreatedEventHandler.cs +++ /dev/null @@ -1,13 +0,0 @@ -using Backet.Api.Domain.Event; -using Pole.EventBus.EventHandler; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; - -namespace Backet.Api.EventHandlers.Abstraction -{ - public interface IToNoticeBacketCreatedEventHandler : IPoleBulkEventsHandler, IPoleEventHandler - { - } -} diff --git a/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs b/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs index ef4ac63..da077c3 100644 --- a/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs +++ b/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs @@ -1,5 +1,4 @@ using Backet.Api.Domain.Event; -using Backet.Api.EventHandlers.Abstraction; using Pole.EventBus.EventHandler; using System; using System.Collections.Generic; @@ -8,14 +7,14 @@ using System.Threading.Tasks; namespace Backet.Api.EventHandlers { - public class ToNoticeBacketCreatedEventHandler : PoleEventHandler, IToNoticeBacketCreatedEventHandler + public class ToNoticeBacketCreatedEventHandler : PoleEventHandler, IPoleBulkEventsHandler { public async Task BulkEventsHandle(List @event) - { + { await Task.Delay(1500); } - public async Task EventHandle(BacketCreatedEvent @event) + public override async Task EventHandle(BacketCreatedEvent @event) { await Task.Delay(1200); } diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs index 92d562a..d61e95b 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs @@ -70,62 +70,35 @@ namespace Pole.EventBus.RabbitMQ } private async Task BatchExecuter(List list) { - if (list.Count == 1) - { - await Process(list.First()); - } - else - { - try - { - await Consumer.Notice(list.Select(o => o.Body).ToList()); - } - catch (Exception exception) - { - Logger.LogError(exception, $"An error occurred in batch consume {Queue.Queue} queue, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}"); - if (Consumer.Config.Reenqueue) - { - foreach (var item in list) - { - await ProcessComsumerErrors(item, exception); - } - return; - } - } - if (!Consumer.Config.AutoAck) - { - if (errorMessageDeliveryTags.Count == 0) - { - Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true); - } - else - { - list.ForEach(m => - { - Model.Model.BasicAck(m.DeliveryTag, false); - }); - } - } - } - } - private async Task Process(BasicDeliverEventArgs ea) - { try { - await Consumer.Notice(ea.Body); + await Consumer.Notice(list.Select(o => o.Body).ToList()); } catch (Exception exception) { - Logger.LogError(exception, $"An error occurred in consume {Queue.Queue} queue, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}"); + Logger.LogError(exception, $"An error occurred in batch consume {Queue.Queue} queue, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}"); if (Consumer.Config.Reenqueue) { - await ProcessComsumerErrors(ea, exception); + foreach (var item in list) + { + await ProcessComsumerErrors(item, exception); + } return; } } if (!Consumer.Config.AutoAck) { - Model.Model.BasicAck(ea.DeliveryTag, false); + if (errorMessageDeliveryTags.Count == 0) + { + Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true); + } + else + { + list.ForEach(m => + { + Model.Model.BasicAck(m.DeliveryTag, false); + }); + } } } diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs b/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs index c0996f5..2e8386a 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs @@ -7,8 +7,7 @@ namespace Pole.EventBus.RabbitMQ public class RabbitConsumer : Consumer { public RabbitConsumer( - Func eventHandlers, - Func, Task> batchEventHandlers) : base(new List> { eventHandlers }, new List, Task>> { batchEventHandlers }) + Func, Task> batchEventHandlers) : base( new List, Task>> { batchEventHandlers }) { } public RabbitEventBus EventBus { get; set; } diff --git a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs index d052272..9b388d3 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs @@ -126,23 +126,24 @@ namespace Pole.EventBus.RabbitMQ foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract && !typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m))) { - var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType); - var basePoleEventHandlerInterface = eventHandlerInterface.GetInterfaces().FirstOrDefault(m => m.IsGenericType); + var eventType = type.GetGenericArguments().FirstOrDefault(); + //var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType); + //var basePoleEventHandlerInterface = eventHandlerInterface.GetInterfaces().FirstOrDefault(m => m.IsGenericType); - if (basePoleEventHandlerInterface == null) - { - throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler"); - } - var eventType = basePoleEventHandlerInterface.GetGenericArguments().FirstOrDefault(); - if (eventType == null) - { - throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler"); - } + //if (basePoleEventHandlerInterface == null) + //{ + // throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler"); + //} + //var eventType = basePoleEventHandlerInterface.GetGenericArguments().FirstOrDefault(); + //if (eventType == null) + //{ + // throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler"); + //} var attribute = eventType.GetCustomAttributes(typeof(EventInfoAttribute), false).FirstOrDefault(); if (attribute != null) { - eventHandlertList.Add((eventHandlerInterface, (EventInfoAttribute)attribute)); + eventHandlertList.Add((type, (EventInfoAttribute)attribute)); } else { diff --git a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs index 4f3d3ea..384553e 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs @@ -62,7 +62,6 @@ namespace Pole.EventBus.RabbitMQ { string queueNameSuffix = observerUnit.EventHandlerType.FullName; var consumer = new RabbitConsumer( - observerUnit.GetEventHandler(), observerUnit.GetBatchEventHandler()) { EventBus = this, diff --git a/src/Pole.EventBus/Consumer.cs b/src/Pole.EventBus/Consumer.cs index 4698727..5e95743 100644 --- a/src/Pole.EventBus/Consumer.cs +++ b/src/Pole.EventBus/Consumer.cs @@ -7,24 +7,12 @@ namespace Pole.EventBus { public abstract class Consumer : IConsumer { - readonly List> eventHandlers; readonly List, Task>> batchEventHandlers; public Consumer( - List> eventHandlers, List, Task>> batchEventHandlers) { - this.eventHandlers = eventHandlers; this.batchEventHandlers = batchEventHandlers; } - public void AddHandler(Func func) - { - eventHandlers.Add(func); - } - public Task Notice(byte[] bytes) - { - return Task.WhenAll(eventHandlers.Select(func => func(bytes))); - } - public Task Notice(List list) { return Task.WhenAll(batchEventHandlers.Select(func => func(list))); diff --git a/src/Pole.EventBus/EventHandler/IPoleEventHandler.cs b/src/Pole.EventBus/EventHandler/IPoleEventHandler.cs index ca204f8..b442b50 100644 --- a/src/Pole.EventBus/EventHandler/IPoleEventHandler.cs +++ b/src/Pole.EventBus/EventHandler/IPoleEventHandler.cs @@ -1,4 +1,6 @@ -using Orleans; +using Microsoft.Extensions.Logging; +using Orleans; +using Pole.Core.Serialization; using Pole.EventBus.Event; using System; using System.Collections.Generic; @@ -15,9 +17,8 @@ namespace Pole.EventBus.EventHandler { Task BulkEventsHandle(List events); } - public interface IPoleEventHandler : IGrainWithStringKey + public interface IPoleEventHandler { - public Task Invoke(EventBytesTransport transport); - public Task Invoke(List transports); + public Task Invoke(List transports, ISerializer serializer, IEventTypeFinder eventTypeFinder, ILogger logger, Type eventHandlerType); } } diff --git a/src/Pole.EventBus/EventHandler/PoleEventHandler.cs b/src/Pole.EventBus/EventHandler/PoleEventHandler.cs index a2bb825..68de69e 100644 --- a/src/Pole.EventBus/EventHandler/PoleEventHandler.cs +++ b/src/Pole.EventBus/EventHandler/PoleEventHandler.cs @@ -19,49 +19,11 @@ namespace Pole.EventBus.EventHandler /// /// /// - public abstract class PoleEventHandler : Grain + public abstract class PoleEventHandler:IPoleEventHandler,IPoleEventHandler { - private IEventTypeFinder eventTypeFinder; - private ISerializer serializer; - private ILogger logger; - private Type grainType; + public abstract Task EventHandle(TEvent @event); - public PoleEventHandler() - { - grainType = GetType(); - } - public override async Task OnActivateAsync() - { - await base.OnActivateAsync(); - await DependencyInjection(); - } - protected virtual Task DependencyInjection() - { - //ConfigOptions = ServiceProvider.GetOptionsByName(typeof(MainGrain).FullName); - serializer = ServiceProvider.GetService(); - eventTypeFinder = ServiceProvider.GetService(); - logger = (ILogger)ServiceProvider.GetService(typeof(ILogger<>).MakeGenericType(grainType)); - return Task.CompletedTask; - } - - public Task Invoke(EventBytesTransport transport) - { - var eventType = eventTypeFinder.FindType(transport.EventTypeCode); - - var eventObj = serializer.Deserialize(transport.EventBytes, eventType); - if (this is IPoleEventHandler handler) - { - var result = handler.EventHandle((TEvent)eventObj); - logger.LogTrace($"{nameof(PoleEventHandler)} Invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObj)); - return result; - } - else - { - throw new EventHandlerImplementedNotRightException(nameof(handler.EventHandle), eventType.Name, this.GetType().FullName); - } - } - - public async Task Invoke(List transports) + public async Task Invoke(List transports, ISerializer serializer, IEventTypeFinder eventTypeFinder, ILogger logger,Type eventHandlerType) { if (transports.Count() != 0) { @@ -71,14 +33,14 @@ namespace Pole.EventBus.EventHandler if (this is IPoleBulkEventsHandler batchHandler) { await batchHandler.BulkEventsHandle(eventObjs); - logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(batchHandler.BulkEventsHandle), serializer.Serialize(eventObjs)); + logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", eventHandlerType.FullName, nameof(batchHandler.BulkEventsHandle), serializer.Serialize(eventObjs)); return; } else if (this is IPoleEventHandler handler) { var handleTasks = eventObjs.Select(m => handler.EventHandle(m)); await Task.WhenAll(handleTasks); - logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObjs)); + logger.LogTrace("Invoke completed: {0}->{1}->{2}", eventHandlerType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObjs)); return; } else diff --git a/src/Pole.EventBus/IConsumer.cs b/src/Pole.EventBus/IConsumer.cs index f39d389..a4553f6 100644 --- a/src/Pole.EventBus/IConsumer.cs +++ b/src/Pole.EventBus/IConsumer.cs @@ -5,7 +5,6 @@ namespace Pole.EventBus { public interface IConsumer { - Task Notice(byte[] bytes); Task Notice(List list); } } diff --git a/src/Pole.EventBus/IObserverUnit.cs b/src/Pole.EventBus/IObserverUnit.cs index 1a3ea57..1aee82e 100644 --- a/src/Pole.EventBus/IObserverUnit.cs +++ b/src/Pole.EventBus/IObserverUnit.cs @@ -6,7 +6,6 @@ namespace Pole.EventBus { public interface IObserverUnit : IGrainID { - Func GetEventHandler(); Func, Task> GetBatchEventHandler(); } } diff --git a/src/Pole.EventBus/ObserverUnit.cs b/src/Pole.EventBus/ObserverUnit.cs index 19aa0f4..4cb8cba 100644 --- a/src/Pole.EventBus/ObserverUnit.cs +++ b/src/Pole.EventBus/ObserverUnit.cs @@ -21,8 +21,6 @@ namespace Pole.EventBus readonly IServiceProvider serviceProvider; readonly ISerializer serializer; readonly IEventTypeFinder typeFinder; - readonly IClusterClient clusterClient; - Func eventHandler; Func, Task> batchEventHandler; protected ILogger Logger { get; private set; } public Type EventHandlerType { get; } @@ -30,7 +28,6 @@ namespace Pole.EventBus public ObserverUnit(IServiceProvider serviceProvider, Type eventHandlerType) { this.serviceProvider = serviceProvider; - clusterClient = serviceProvider.GetService(); serializer = serviceProvider.GetService(); typeFinder = serviceProvider.GetService(); Logger = serviceProvider.GetService>>(); @@ -41,11 +38,6 @@ namespace Pole.EventBus return new ObserverUnit(serviceProvider, typeof(Grain)); } - public Func GetEventHandler() - { - return eventHandler; - } - public Func, Task> GetBatchEventHandler() { return batchEventHandler; @@ -55,23 +47,8 @@ namespace Pole.EventBus { if (!typeof(IPoleEventHandler).IsAssignableFrom(EventHandlerType)) throw new NotSupportedException($"{EventHandlerType.FullName} must inheritance from PoleEventHandler"); - eventHandler = EventHandler; batchEventHandler = BatchEventHandler; //内部函数 - Task EventHandler(byte[] bytes) - { - var (success, transport) = EventBytesTransport.FromBytes(bytes); - if (success) - { - return GetObserver(EventHandlerType, transport.EventId).Invoke(transport); - } - else - { - if (Logger.IsEnabled(LogLevel.Error)) - Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event"); - } - return Task.CompletedTask; - } Task BatchEventHandler(List list) { var transports = list.Select(bytes => @@ -87,23 +64,35 @@ namespace Pole.EventBus .Select(o => (o.transport)) .ToList(); // 批量处理的时候 grain Id 取第一个 event的id - return GetObserver(EventHandlerType, transports.First().EventId).Invoke(transports); + using (var scope = serviceProvider.CreateScope()) + { + var eventHandlerInstance = scope.ServiceProvider.GetRequiredService(EventHandlerType); + var serializer = scope.ServiceProvider.GetRequiredService() as ISerializer; + var eventTypeFinder = scope.ServiceProvider.GetRequiredService() as IEventTypeFinder; + var loggerFactory = scope.ServiceProvider.GetRequiredService() as ILoggerFactory; + var logger = loggerFactory.CreateLogger(EventHandlerType); + return GetObserver(EventHandlerType)(eventHandlerInstance, transports, serializer, eventTypeFinder, logger, EventHandlerType); + } } } - static readonly ConcurrentDictionary> _observerGeneratorDict = new ConcurrentDictionary>(); - private IPoleEventHandler GetObserver(Type ObserverType, string primaryKey) + static readonly ConcurrentDictionary, ISerializer, IEventTypeFinder, ILogger, Type, Task>> _observerGeneratorDict = new ConcurrentDictionary, ISerializer, IEventTypeFinder, ILogger, Type, Task>>(); + private Func, ISerializer, IEventTypeFinder, ILogger, Type, Task> GetObserver(Type ObserverType) { var func = _observerGeneratorDict.GetOrAdd(ObserverType, key => { - var clientType = typeof(IClusterClient); - var clientParams = Expression.Parameter(clientType, "client"); - var primaryKeyParams = Expression.Parameter(typeof(string), "primaryKey"); - var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix"); - var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) }); - var body = Expression.Call(method.MakeGenericMethod(ObserverType), clientParams, primaryKeyParams, grainClassNamePrefixParams); - return Expression.Lambda>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile(); + var eventHandlerObjParams = Expression.Parameter(typeof(object), "observerType"); + + var eventHandlerParams = Expression.Convert(eventHandlerObjParams, ObserverType); + var eventBytesTransportParams = Expression.Parameter(typeof(List), "observerType"); + var serializerParams = Expression.Parameter(typeof(ISerializer), "serializer"); + var eventTypeFinderParams = Expression.Parameter(typeof(IEventTypeFinder), "eventTypeFinder"); + var loggerParams = Expression.Parameter(typeof(ILogger), "logger"); + var eventHandlerTypeParams = Expression.Parameter(typeof(Type), "eventHandlerType"); + var method = typeof(IPoleEventHandler).GetMethod("Invoke"); + var body = Expression.Call(eventHandlerParams, method, eventBytesTransportParams, serializerParams, serializerParams, eventTypeFinderParams, loggerParams, eventHandlerTypeParams); + return Expression.Lambda, ISerializer, IEventTypeFinder, ILogger, Type, Task>>(body, eventHandlerObjParams, eventBytesTransportParams, serializerParams, eventTypeFinderParams, loggerParams, eventHandlerTypeParams).Compile(); }); - return func(clusterClient, primaryKey, null); + return func; } } public static class ClusterClientExtensions diff --git a/src/Pole.EventBus/ObserverUnitContainer.cs b/src/Pole.EventBus/ObserverUnitContainer.cs index 023d163..578e3ef 100644 --- a/src/Pole.EventBus/ObserverUnitContainer.cs +++ b/src/Pole.EventBus/ObserverUnitContainer.cs @@ -22,23 +22,24 @@ namespace Pole.EventBus { foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract && !typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m))) { - var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType); - var basePoleEventHandlerInterface= eventHandlerInterface.GetInterfaces().FirstOrDefault(m=>m.IsGenericType); + var eventType = type.GetGenericArguments().FirstOrDefault(); + //var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType); + //var basePoleEventHandlerInterface= eventHandlerInterface.GetInterfaces().FirstOrDefault(m=>m.IsGenericType); - if (basePoleEventHandlerInterface == null) - { - throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler"); - } - var eventType= basePoleEventHandlerInterface.GetGenericArguments().FirstOrDefault(); - if (eventType == null) - { - throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler"); - } + //if (basePoleEventHandlerInterface == null) + //{ + // throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler"); + //} + //var eventType= basePoleEventHandlerInterface.GetGenericArguments().FirstOrDefault(); + //if (eventType == null) + //{ + // throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler"); + //} var attribute = eventType.GetCustomAttributes(typeof(EventInfoAttribute), false).FirstOrDefault(); if (attribute != null) { - eventHandlerList.Add((eventHandlerInterface, (EventInfoAttribute)attribute)); + eventHandlerList.Add((type, (EventInfoAttribute)attribute)); } else {