Commit 888a195d by dingsongjie

eventbus 改造完成

parent 79b51f68
......@@ -6,7 +6,6 @@ using System.Runtime;
using System.Text;
using System.Threading.Tasks;
using Backet.Api.Domain.Event;
using Backet.Api.EventHandlers.Abstraction;
using Backet.Api.Grains.Abstraction;
using Backet.Api.Infrastructure;
using Dapper;
......
using Backet.Api.Domain.Event;
using Pole.EventBus.EventHandler;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.EventHandlers.Abstraction
{
public interface IToNoticeBacketCreatedEventHandler : IPoleBulkEventsHandler<BacketCreatedEvent>, IPoleEventHandler<BacketCreatedEvent>
{
}
}
using Backet.Api.Domain.Event;
using Backet.Api.EventHandlers.Abstraction;
using Pole.EventBus.EventHandler;
using System;
using System.Collections.Generic;
......@@ -8,14 +7,14 @@ using System.Threading.Tasks;
namespace Backet.Api.EventHandlers
{
public class ToNoticeBacketCreatedEventHandler : PoleEventHandler<BacketCreatedEvent>, IToNoticeBacketCreatedEventHandler
public class ToNoticeBacketCreatedEventHandler : PoleEventHandler<BacketCreatedEvent>, IPoleBulkEventsHandler<BacketCreatedEvent>
{
public async Task BulkEventsHandle(List<BacketCreatedEvent> @event)
{
{
await Task.Delay(1500);
}
public async Task EventHandle(BacketCreatedEvent @event)
public override async Task EventHandle(BacketCreatedEvent @event)
{
await Task.Delay(1200);
}
......
......@@ -70,62 +70,35 @@ namespace Pole.EventBus.RabbitMQ
}
private async Task BatchExecuter(List<BasicDeliverEventArgs> list)
{
if (list.Count == 1)
{
await Process(list.First());
}
else
{
try
{
await Consumer.Notice(list.Select(o => o.Body).ToList());
}
catch (Exception exception)
{
Logger.LogError(exception, $"An error occurred in batch consume {Queue.Queue} queue, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}");
if (Consumer.Config.Reenqueue)
{
foreach (var item in list)
{
await ProcessComsumerErrors(item, exception);
}
return;
}
}
if (!Consumer.Config.AutoAck)
{
if (errorMessageDeliveryTags.Count == 0)
{
Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true);
}
else
{
list.ForEach(m =>
{
Model.Model.BasicAck(m.DeliveryTag, false);
});
}
}
}
}
private async Task Process(BasicDeliverEventArgs ea)
{
try
{
await Consumer.Notice(ea.Body);
await Consumer.Notice(list.Select(o => o.Body).ToList());
}
catch (Exception exception)
{
Logger.LogError(exception, $"An error occurred in consume {Queue.Queue} queue, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}");
Logger.LogError(exception, $"An error occurred in batch consume {Queue.Queue} queue, routing path {Consumer.EventBus.Exchange}->{Queue.Queue}->{Queue.Queue}");
if (Consumer.Config.Reenqueue)
{
await ProcessComsumerErrors(ea, exception);
foreach (var item in list)
{
await ProcessComsumerErrors(item, exception);
}
return;
}
}
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(ea.DeliveryTag, false);
if (errorMessageDeliveryTags.Count == 0)
{
Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true);
}
else
{
list.ForEach(m =>
{
Model.Model.BasicAck(m.DeliveryTag, false);
});
}
}
}
......
......@@ -7,8 +7,7 @@ namespace Pole.EventBus.RabbitMQ
public class RabbitConsumer : Consumer
{
public RabbitConsumer(
Func<byte[], Task> eventHandlers,
Func<List<byte[]>, Task> batchEventHandlers) : base(new List<Func<byte[], Task>> { eventHandlers }, new List<Func<List<byte[]>, Task>> { batchEventHandlers })
Func<List<byte[]>, Task> batchEventHandlers) : base( new List<Func<List<byte[]>, Task>> { batchEventHandlers })
{
}
public RabbitEventBus EventBus { get; set; }
......
......@@ -126,23 +126,24 @@ namespace Pole.EventBus.RabbitMQ
foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract && !typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m)))
{
var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType);
var basePoleEventHandlerInterface = eventHandlerInterface.GetInterfaces().FirstOrDefault(m => m.IsGenericType);
var eventType = type.GetGenericArguments().FirstOrDefault();
//var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType);
//var basePoleEventHandlerInterface = eventHandlerInterface.GetInterfaces().FirstOrDefault(m => m.IsGenericType);
if (basePoleEventHandlerInterface == null)
{
throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler<TEvent>");
}
var eventType = basePoleEventHandlerInterface.GetGenericArguments().FirstOrDefault();
if (eventType == null)
{
throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler<TEvent>");
}
//if (basePoleEventHandlerInterface == null)
//{
// throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler<TEvent>");
//}
//var eventType = basePoleEventHandlerInterface.GetGenericArguments().FirstOrDefault();
//if (eventType == null)
//{
// throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler<TEvent>");
//}
var attribute = eventType.GetCustomAttributes(typeof(EventInfoAttribute), false).FirstOrDefault();
if (attribute != null)
{
eventHandlertList.Add((eventHandlerInterface, (EventInfoAttribute)attribute));
eventHandlertList.Add((type, (EventInfoAttribute)attribute));
}
else
{
......
......@@ -62,7 +62,6 @@ namespace Pole.EventBus.RabbitMQ
{
string queueNameSuffix = observerUnit.EventHandlerType.FullName;
var consumer = new RabbitConsumer(
observerUnit.GetEventHandler(),
observerUnit.GetBatchEventHandler())
{
EventBus = this,
......
......@@ -7,24 +7,12 @@ namespace Pole.EventBus
{
public abstract class Consumer : IConsumer
{
readonly List<Func<byte[], Task>> eventHandlers;
readonly List<Func<List<byte[]>, Task>> batchEventHandlers;
public Consumer(
List<Func<byte[], Task>> eventHandlers,
List<Func<List<byte[]>, Task>> batchEventHandlers)
{
this.eventHandlers = eventHandlers;
this.batchEventHandlers = batchEventHandlers;
}
public void AddHandler(Func<byte[], Task> func)
{
eventHandlers.Add(func);
}
public Task Notice(byte[] bytes)
{
return Task.WhenAll(eventHandlers.Select(func => func(bytes)));
}
public Task Notice(List<byte[]> list)
{
return Task.WhenAll(batchEventHandlers.Select(func => func(list)));
......
using Orleans;
using Microsoft.Extensions.Logging;
using Orleans;
using Pole.Core.Serialization;
using Pole.EventBus.Event;
using System;
using System.Collections.Generic;
......@@ -15,9 +17,8 @@ namespace Pole.EventBus.EventHandler
{
Task BulkEventsHandle(List<TEvent> events);
}
public interface IPoleEventHandler : IGrainWithStringKey
public interface IPoleEventHandler
{
public Task Invoke(EventBytesTransport transport);
public Task Invoke(List<EventBytesTransport> transports);
public Task Invoke(List<EventBytesTransport> transports, ISerializer serializer, IEventTypeFinder eventTypeFinder, ILogger logger, Type eventHandlerType);
}
}
......@@ -19,49 +19,11 @@ namespace Pole.EventBus.EventHandler
/// <summary>
///
/// </summary>
public abstract class PoleEventHandler<TEvent> : Grain
public abstract class PoleEventHandler<TEvent>:IPoleEventHandler,IPoleEventHandler<TEvent>
{
private IEventTypeFinder eventTypeFinder;
private ISerializer serializer;
private ILogger logger;
private Type grainType;
public abstract Task EventHandle(TEvent @event);
public PoleEventHandler()
{
grainType = GetType();
}
public override async Task OnActivateAsync()
{
await base.OnActivateAsync();
await DependencyInjection();
}
protected virtual Task DependencyInjection()
{
//ConfigOptions = ServiceProvider.GetOptionsByName<CoreOptions>(typeof(MainGrain).FullName);
serializer = ServiceProvider.GetService<ISerializer>();
eventTypeFinder = ServiceProvider.GetService<IEventTypeFinder>();
logger = (ILogger)ServiceProvider.GetService(typeof(ILogger<>).MakeGenericType(grainType));
return Task.CompletedTask;
}
public Task Invoke(EventBytesTransport transport)
{
var eventType = eventTypeFinder.FindType(transport.EventTypeCode);
var eventObj = serializer.Deserialize(transport.EventBytes, eventType);
if (this is IPoleEventHandler<TEvent> handler)
{
var result = handler.EventHandle((TEvent)eventObj);
logger.LogTrace($"{nameof(PoleEventHandler<TEvent>)} Invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObj));
return result;
}
else
{
throw new EventHandlerImplementedNotRightException(nameof(handler.EventHandle), eventType.Name, this.GetType().FullName);
}
}
public async Task Invoke(List<EventBytesTransport> transports)
public async Task Invoke(List<EventBytesTransport> transports, ISerializer serializer, IEventTypeFinder eventTypeFinder, ILogger logger,Type eventHandlerType)
{
if (transports.Count() != 0)
{
......@@ -71,14 +33,14 @@ namespace Pole.EventBus.EventHandler
if (this is IPoleBulkEventsHandler<TEvent> batchHandler)
{
await batchHandler.BulkEventsHandle(eventObjs);
logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(batchHandler.BulkEventsHandle), serializer.Serialize(eventObjs));
logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", eventHandlerType.FullName, nameof(batchHandler.BulkEventsHandle), serializer.Serialize(eventObjs));
return;
}
else if (this is IPoleEventHandler<TEvent> handler)
{
var handleTasks = eventObjs.Select(m => handler.EventHandle(m));
await Task.WhenAll(handleTasks);
logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObjs));
logger.LogTrace("Invoke completed: {0}->{1}->{2}", eventHandlerType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObjs));
return;
}
else
......
......@@ -5,7 +5,6 @@ namespace Pole.EventBus
{
public interface IConsumer
{
Task Notice(byte[] bytes);
Task Notice(List<byte[]> list);
}
}
......@@ -6,7 +6,6 @@ namespace Pole.EventBus
{
public interface IObserverUnit<PrimaryKey> : IGrainID
{
Func<byte[], Task> GetEventHandler();
Func<List<byte[]>, Task> GetBatchEventHandler();
}
}
......@@ -21,8 +21,6 @@ namespace Pole.EventBus
readonly IServiceProvider serviceProvider;
readonly ISerializer serializer;
readonly IEventTypeFinder typeFinder;
readonly IClusterClient clusterClient;
Func<byte[], Task> eventHandler;
Func<List<byte[]>, Task> batchEventHandler;
protected ILogger Logger { get; private set; }
public Type EventHandlerType { get; }
......@@ -30,7 +28,6 @@ namespace Pole.EventBus
public ObserverUnit(IServiceProvider serviceProvider, Type eventHandlerType)
{
this.serviceProvider = serviceProvider;
clusterClient = serviceProvider.GetService<IClusterClient>();
serializer = serviceProvider.GetService<ISerializer>();
typeFinder = serviceProvider.GetService<IEventTypeFinder>();
Logger = serviceProvider.GetService<ILogger<ObserverUnit<PrimaryKey>>>();
......@@ -41,11 +38,6 @@ namespace Pole.EventBus
return new ObserverUnit<PrimaryKey>(serviceProvider, typeof(Grain));
}
public Func<byte[], Task> GetEventHandler()
{
return eventHandler;
}
public Func<List<byte[]>, Task> GetBatchEventHandler()
{
return batchEventHandler;
......@@ -55,23 +47,8 @@ namespace Pole.EventBus
{
if (!typeof(IPoleEventHandler).IsAssignableFrom(EventHandlerType))
throw new NotSupportedException($"{EventHandlerType.FullName} must inheritance from PoleEventHandler");
eventHandler = EventHandler;
batchEventHandler = BatchEventHandler;
//内部函数
Task EventHandler(byte[] bytes)
{
var (success, transport) = EventBytesTransport.FromBytes(bytes);
if (success)
{
return GetObserver(EventHandlerType, transport.EventId).Invoke(transport);
}
else
{
if (Logger.IsEnabled(LogLevel.Error))
Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event");
}
return Task.CompletedTask;
}
Task BatchEventHandler(List<byte[]> list)
{
var transports = list.Select(bytes =>
......@@ -87,23 +64,35 @@ namespace Pole.EventBus
.Select(o => (o.transport))
.ToList();
// 批量处理的时候 grain Id 取第一个 event的id
return GetObserver(EventHandlerType, transports.First().EventId).Invoke(transports);
using (var scope = serviceProvider.CreateScope())
{
var eventHandlerInstance = scope.ServiceProvider.GetRequiredService(EventHandlerType);
var serializer = scope.ServiceProvider.GetRequiredService<ISerializer>() as ISerializer;
var eventTypeFinder = scope.ServiceProvider.GetRequiredService<IEventTypeFinder>() as IEventTypeFinder;
var loggerFactory = scope.ServiceProvider.GetRequiredService<ILoggerFactory>() as ILoggerFactory;
var logger = loggerFactory.CreateLogger(EventHandlerType);
return GetObserver(EventHandlerType)(eventHandlerInstance, transports, serializer, eventTypeFinder, logger, EventHandlerType);
}
}
}
static readonly ConcurrentDictionary<Type, Func<IClusterClient, string, string, IPoleEventHandler>> _observerGeneratorDict = new ConcurrentDictionary<Type, Func<IClusterClient, string, string, IPoleEventHandler>>();
private IPoleEventHandler GetObserver(Type ObserverType, string primaryKey)
static readonly ConcurrentDictionary<Type, Func<object, List<EventBytesTransport>, ISerializer, IEventTypeFinder, ILogger, Type, Task>> _observerGeneratorDict = new ConcurrentDictionary<Type, Func<object, List<EventBytesTransport>, ISerializer, IEventTypeFinder, ILogger, Type, Task>>();
private Func<object, List<EventBytesTransport>, ISerializer, IEventTypeFinder, ILogger, Type, Task> GetObserver(Type ObserverType)
{
var func = _observerGeneratorDict.GetOrAdd(ObserverType, key =>
{
var clientType = typeof(IClusterClient);
var clientParams = Expression.Parameter(clientType, "client");
var primaryKeyParams = Expression.Parameter(typeof(string), "primaryKey");
var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix");
var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) });
var body = Expression.Call(method.MakeGenericMethod(ObserverType), clientParams, primaryKeyParams, grainClassNamePrefixParams);
return Expression.Lambda<Func<IClusterClient, string, string, IPoleEventHandler>>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile();
var eventHandlerObjParams = Expression.Parameter(typeof(object), "observerType");
var eventHandlerParams = Expression.Convert(eventHandlerObjParams, ObserverType);
var eventBytesTransportParams = Expression.Parameter(typeof(List<EventBytesTransport>), "observerType");
var serializerParams = Expression.Parameter(typeof(ISerializer), "serializer");
var eventTypeFinderParams = Expression.Parameter(typeof(IEventTypeFinder), "eventTypeFinder");
var loggerParams = Expression.Parameter(typeof(ILogger), "logger");
var eventHandlerTypeParams = Expression.Parameter(typeof(Type), "eventHandlerType");
var method = typeof(IPoleEventHandler).GetMethod("Invoke");
var body = Expression.Call(eventHandlerParams, method, eventBytesTransportParams, serializerParams, serializerParams, eventTypeFinderParams, loggerParams, eventHandlerTypeParams);
return Expression.Lambda<Func<object, List<EventBytesTransport>, ISerializer, IEventTypeFinder, ILogger, Type, Task>>(body, eventHandlerObjParams, eventBytesTransportParams, serializerParams, eventTypeFinderParams, loggerParams, eventHandlerTypeParams).Compile();
});
return func(clusterClient, primaryKey, null);
return func;
}
}
public static class ClusterClientExtensions
......
......@@ -22,23 +22,24 @@ namespace Pole.EventBus
{
foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract && !typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m)))
{
var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType);
var basePoleEventHandlerInterface= eventHandlerInterface.GetInterfaces().FirstOrDefault(m=>m.IsGenericType);
var eventType = type.GetGenericArguments().FirstOrDefault();
//var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType);
//var basePoleEventHandlerInterface= eventHandlerInterface.GetInterfaces().FirstOrDefault(m=>m.IsGenericType);
if (basePoleEventHandlerInterface == null)
{
throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler<TEvent>");
}
var eventType= basePoleEventHandlerInterface.GetGenericArguments().FirstOrDefault();
if (eventType == null)
{
throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler<TEvent>");
}
//if (basePoleEventHandlerInterface == null)
//{
// throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler<TEvent>");
//}
//var eventType= basePoleEventHandlerInterface.GetGenericArguments().FirstOrDefault();
//if (eventType == null)
//{
// throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler<TEvent>");
//}
var attribute = eventType.GetCustomAttributes(typeof(EventInfoAttribute), false).FirstOrDefault();
if (attribute != null)
{
eventHandlerList.Add((eventHandlerInterface, (EventInfoAttribute)attribute));
eventHandlerList.Add((type, (EventInfoAttribute)attribute));
}
else
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment