using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Pole.EventBus.Event;
using Pole.Core.Serialization;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using System.Reflection.Emit;
using System.Linq.Expressions;
using System.Linq;
using Pole.Core.Exceptions;
using Orleans;
using Pole.Core.Utils.Abstraction;

namespace Pole.EventBus.EventHandler
{
    /// <summary>
    /// Base class for handlers of events of type <typeparamref name="TEvent"/>.
    /// <see cref="Invoke"/> deserializes a batch of transports into events and dispatches them
    /// either as one bulk call (when the concrete handler also implements
    /// <c>IPoleBulkEventsHandler&lt;TEvent&gt;</c>) or as one <see cref="EventHandle"/> call per event.
    /// </summary>
    /// <typeparam name="TEvent">Type every deserialized event is cast to before dispatch.</typeparam>
    public abstract class PoleEventHandler<TEvent> : IPoleEventHandler<TEvent>, IPoleEventHandler
    {
        /// <summary>
        /// Handles a single event. Implemented by concrete handlers.
        /// </summary>
        /// <param name="event">The deserialized event.</param>
        public abstract Task EventHandle(TEvent @event);

        /// <summary>
        /// Deserializes <paramref name="transports"/> and dispatches the resulting events to this handler.
        /// </summary>
        /// <param name="transports">Batch of serialized events. An empty batch is logged and ignored.</param>
        /// <param name="serializer">Deserializes the event payloads; also serializes the events for trace logging.</param>
        /// <param name="eventTypeFinder">Resolves the CLR event type from the first transport's event type code.</param>
        /// <param name="logger">Receives trace messages on completion and an information message for an empty batch.</param>
        /// <param name="eventHandlerType">Handler type whose full name is written to the trace log.</param>
        /// <exception cref="ArgumentNullException"><paramref name="transports"/> is null.</exception>
        /// <exception cref="EventHandlerImplementedNotRightException">
        /// This instance implements neither the bulk nor the single-event handler interface.
        /// </exception>
        public async Task Invoke(List<EventBytesTransport> transports, ISerializer serializer, IEventTypeFinder eventTypeFinder, ILogger logger, Type eventHandlerType)
        {
            if (transports is null)
            {
                throw new ArgumentNullException(nameof(transports));
            }

            if (transports.Count == 0)
            {
                if (logger.IsEnabled(LogLevel.Information))
                {
                    logger.LogInformation($"{nameof(EventBytesTransport.FromBytes)} failed");
                }

                return;
            }

            // NOTE(review): the event type is resolved from the first transport only, so the whole
            // batch is presumably expected to share one event type code - confirm against callers.
            var firstTransport = transports[0];
            var eventType = eventTypeFinder.FindType(firstTransport.EventTypeCode);

            // Each transport must be deserialized from its OWN bytes; reading the first transport's
            // bytes for every element would turn the batch into N copies of the first event.
            var eventObjs = transports
                .Select(transport => (TEvent)serializer.Deserialize(transport.EventBytes, eventType))
                .ToList();

            if (this is IPoleBulkEventsHandler<TEvent> batchHandler)
            {
                await batchHandler.BulkEventsHandle(eventObjs);

                // Serializing the batch only for logging is wasted work unless trace logging is on.
                if (logger.IsEnabled(LogLevel.Trace))
                {
                    logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", eventHandlerType.FullName, nameof(batchHandler.BulkEventsHandle), serializer.Serialize(eventObjs));
                }

                return;
            }

            if (this is IPoleEventHandler<TEvent> handler)
            {
                var handleTasks = eventObjs.Select(m => handler.EventHandle(m));
                await Task.WhenAll(handleTasks);

                if (logger.IsEnabled(LogLevel.Trace))
                {
                    logger.LogTrace("Invoke completed: {0}->{1}->{2}", eventHandlerType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObjs));
                }

                return;
            }

            throw new EventHandlerImplementedNotRightException(nameof(EventHandle), eventType.Name, this.GetType().FullName);
        }
    }
}