using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Pole.Core.Abstraction;
using Pole.Core.EventBus.Event;
using Pole.Core.Serialization;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using System.Reflection.Emit;
using System.Linq.Expressions;
using System.Linq;
using Pole.Core.Exceptions;
namespace Pole.Core.EventBus.EventHandler
{
///
///
///
public class PoleEventHandler : PoleEventHandlerBase
{
private IEventTypeFinder eventTypeFinder;
private ISerializer serializer;
private ILogger logger;
private Type grainType;
public PoleEventHandler()
{
grainType = GetType();
DependencyInjection();
}
public override async Task OnActivateAsync()
{
await DependencyInjection();
await base.OnActivateAsync();
}
protected virtual Task DependencyInjection()
{
//ConfigOptions = ServiceProvider.GetOptionsByName(typeof(MainGrain).FullName);
serializer = ServiceProvider.GetService();
eventTypeFinder = ServiceProvider.GetService();
logger = (ILogger)ServiceProvider.GetService(typeof(ILogger<>).MakeGenericType(grainType));
return Task.CompletedTask;
}
public override Task Invoke(EventBytesTransport transport)
{
var eventType = eventTypeFinder.FindType(transport.EventTypeCode);
var method = typeof(ClusterClientExtensions).GetMethod(Consts.EventHandlerMethodName, new Type[] { eventType });
if (method == null)
{
throw new EventHandlerTargetMethodNotFoundException(Consts.EventHandlerMethodName, eventType.Name);
}
var data = serializer.Deserialize(transport.EventBytes, eventType);
var eventHandlerType = this.GetType();
var eventHandlerObjectParams = Expression.Parameter(typeof(object), "eventHandler");
var eventHandlerParams = Expression.Convert(eventHandlerObjectParams, eventHandlerType);
var eventObjectParams = Expression.Parameter(typeof(object), "event");
var eventParams = Expression.Convert(eventObjectParams, eventType);
var body = Expression.Call(method, eventHandlerParams, eventParams);
var func = Expression.Lambda>(body, true, eventHandlerObjectParams, eventObjectParams).Compile();
var result = func(this, data);
logger.LogTrace("Invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(data));
return result;
}
public override Task Invoke(List transports)
{
if (transports.Count() != 0)
{
var firstTransport = transports.First();
var eventType = eventTypeFinder.FindType(firstTransport.EventTypeCode);
var method = typeof(ClusterClientExtensions).GetMethod(Consts.BatchEventsHandlerMethodName, new Type[] { eventType });
if (method == null)
{
var tasks = transports.Select(transport => Invoke(transport));
return Task.WhenAll(tasks);
}
var datas = transports.Select(transport => serializer.Deserialize(firstTransport.EventBytes, eventType)).ToList();
var eventHandlerType = this.GetType();
var eventHandlerObjectParams = Expression.Parameter(typeof(object), "eventHandler");
var eventHandlerParams = Expression.Convert(eventHandlerObjectParams, eventHandlerType);
var eventObjectParams = Expression.Parameter(typeof(object), "events");
var eventsType = typeof(List<>).MakeGenericType(eventType);
var eventsParams = Expression.Convert(eventObjectParams, eventsType);
var body = Expression.Call(method, eventHandlerParams, eventsParams);
var func = Expression.Lambda>(body, true, eventHandlerObjectParams, eventObjectParams).Compile();
var result = func(this, datas);
logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(datas));
return result;
}
else
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"{nameof(EventBytesTransport.FromBytes)} failed");
return Task.CompletedTask;
}
}
}
}