using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Pole.EventBus.Event;
using Pole.Core.Serialization;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using System.Reflection.Emit;
using System.Linq.Expressions;
using System.Linq;
using Pole.Core.Exceptions;
using Orleans;
using Pole.Core.Utils.Abstraction;
namespace Pole.EventBus.EventHandler
{
///
///
///
public abstract class PoleEventHandler:IPoleEventHandler,IPoleEventHandler
{
public abstract Task EventHandle(TEvent @event);
public async Task Invoke(List transports, ISerializer serializer, IEventTypeFinder eventTypeFinder, ILogger logger,Type eventHandlerType)
{
if (transports.Count() != 0)
{
var firstTransport = transports.First();
var eventType = eventTypeFinder.FindType(firstTransport.EventTypeCode);
var eventObjs = transports.Select(transport => serializer.Deserialize(firstTransport.EventBytes, eventType)).Select(@event => (TEvent)@event).ToList();
if (this is IPoleBulkEventsHandler batchHandler)
{
await batchHandler.BulkEventsHandle(eventObjs);
logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", eventHandlerType.FullName, nameof(batchHandler.BulkEventsHandle), serializer.Serialize(eventObjs));
return;
}
else if (this is IPoleEventHandler handler)
{
var handleTasks = eventObjs.Select(m => handler.EventHandle(m));
await Task.WhenAll(handleTasks);
logger.LogTrace("Invoke completed: {0}->{1}->{2}", eventHandlerType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObjs));
return;
}
else
{
throw new EventHandlerImplementedNotRightException(nameof(handler.EventHandle), eventType.Name, this.GetType().FullName);
}
}
else
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"{nameof(EventBytesTransport.FromBytes)} failed");
}
}
}
}