Commit b2ed39db by 丁松杰

修改 内部逻辑

parent d9d03b97
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Pole.Core.Abstractions
{
public interface IObserverUnit<PrimaryKey> : IGrainID
{
/// <summary>
/// 获取所有监听者分组
/// </summary>
/// <returns></returns>
List<string> GetGroups();
Task<long[]> GetAndSaveVersion(PrimaryKey primaryKey, long srcVersion);
/// <summary>
/// 重置Grain
/// </summary>
/// <param name="primaryKey">重置Grain</param>
/// <returns></returns>
Task Reset(PrimaryKey primaryKey);
List<Func<byte[], Task>> GetEventHandlers(string observerGroup);
List<Func<byte[], Task>> GetAllEventHandlers();
List<Func<List<byte[]>, Task>> GetBatchEventHandlers(string observerGroup);
List<Func<List<byte[]>, Task>> GetAllBatchEventHandlers();
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Abstraction
{
public interface ITypeFinder
{
Type FindType(string code);
string GetCode(Type type);
}
}
......@@ -5,8 +5,8 @@ using System.Text;
namespace Pole.Core.EventBus.Event
{
[AttributeUsage(AttributeTargets.Class)]
public class EventInfoAttribute: Attribute
public class EventAttribute: Attribute
{
public string SendBoxName { get; set; }
public string EventName { get; set; }
}
}
using Pole.Core.Utils;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.Event
{
public class EventBase
{
public EventBase() { }
public EventBase(long version, long timestamp)
{
Version = version;
Timestamp = timestamp;
}
public long Version { get; set; }
public long Timestamp { get; set; }
public byte[] GetBytes()
{
using var ms = new PooledMemoryStream();
ms.Write(BitConverter.GetBytes(Version));
ms.Write(BitConverter.GetBytes(Timestamp));
return ms.ToArray();
}
public static EventBase FromBytes(byte[] bytes)
{
var bytesSpan = bytes.AsSpan();
return new EventBase(BitConverter.ToInt64(bytesSpan), BitConverter.ToInt64(bytesSpan.Slice(sizeof(long))));
}
}
}
using Pole.Core.Serialization;
using Pole.Core.Utils;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.Event
{
public readonly struct EventBytesTransport
{
public EventBytesTransport(string eventCode, object grainId, byte[] baseBytes, byte[] eventBytes)
{
EventTypeCode = eventCode;
GrainId = grainId;
BaseBytes = baseBytes;
EventBytes = eventBytes;
}
/// <summary>
/// 事件TypeCode
/// </summary>
public string EventTypeCode { get; }
/// <summary>
/// 事件GrainId
/// </summary>
public object GrainId { get; }
/// <summary>
/// 事件base信息的bytes
/// </summary>
public byte[] BaseBytes { get; }
/// <summary>
/// 事件本身的bytes
/// </summary>
public byte[] EventBytes { get; }
public byte[] GetBytes()
{
var eventTypeBytes = Encoding.UTF8.GetBytes(EventTypeCode);
byte[] actorIdBytes;
var strId = GrainId as string;
actorIdBytes = Encoding.UTF8.GetBytes(strId);
using var ms = new PooledMemoryStream();
ms.WriteByte((byte)TransportType.Event);
ms.Write(BitConverter.GetBytes((ushort)eventTypeBytes.Length));
ms.Write(BitConverter.GetBytes((ushort)actorIdBytes.Length));
ms.Write(BitConverter.GetBytes((ushort)BaseBytes.Length));
ms.Write(BitConverter.GetBytes(EventBytes.Length));
ms.Write(eventTypeBytes);
ms.Write(actorIdBytes);
ms.Write(BaseBytes);
ms.Write(EventBytes);
return ms.ToArray();
}
public static (bool success, PrimaryKey actorId) GetActorId<PrimaryKey>(byte[] bytes)
{
if (bytes[0] == (byte)TransportType.Event)
{
var bytesSpan = bytes.AsSpan();
var eventTypeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort)));
var actorIdBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort)));
if (typeof(PrimaryKey) == typeof(long))
{
var id = BitConverter.ToInt64(bytesSpan.Slice(3 * sizeof(ushort) + 1 + sizeof(int) + eventTypeLength, actorIdBytesLength));
if (id is PrimaryKey actorId)
return (true, actorId);
}
else
{
var id = Encoding.UTF8.GetString(bytesSpan.Slice(3 * sizeof(ushort) + 1 + sizeof(int) + eventTypeLength, actorIdBytesLength));
if (id is PrimaryKey actorId)
return (true, actorId);
}
}
return (false, default);
}
public static (bool success, EventBytesTransport transport) FromBytesWithNoId(byte[] bytes)
{
if (bytes[0] == (byte)TransportType.Event)
{
var bytesSpan = bytes.AsSpan();
var eventTypeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort)));
var actorIdBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort)));
var baseBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(2 * sizeof(ushort) + 1, sizeof(ushort)));
var eventBytesLength = BitConverter.ToInt32(bytesSpan.Slice(3 * sizeof(ushort) + 1, sizeof(int)));
var skipLength = 3 * sizeof(ushort) + 1 + sizeof(int);
return (true, new EventBytesTransport(
Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeLength)),
null,
bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength, baseBytesLength).ToArray(),
bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength + baseBytesLength, eventBytesLength).ToArray()
));
}
return (false, default);
}
public static (bool success, EventBytesTransport transport) FromBytes<PrimaryKey>(byte[] bytes)
{
if (bytes[0] == (byte)TransportType.Event)
{
var bytesSpan = bytes.AsSpan();
var eventTypeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort)));
var actorIdBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort)));
var baseBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(2 * sizeof(ushort) + 1, sizeof(ushort)));
var eventBytesLength = BitConverter.ToInt32(bytesSpan.Slice(3 * sizeof(ushort) + 1, sizeof(int)));
var skipLength = 3 * sizeof(ushort) + 1 + sizeof(int);
if (typeof(PrimaryKey) == typeof(long))
{
var actorId = BitConverter.ToInt64(bytesSpan.Slice(3 * sizeof(ushort) + 1 + sizeof(int) + eventTypeLength, actorIdBytesLength));
return (true, new EventBytesTransport(
Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeLength)),
actorId,
bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength, baseBytesLength).ToArray(),
bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength + baseBytesLength, eventBytesLength).ToArray()
));
}
else
{
var actorId = Encoding.UTF8.GetString(bytesSpan.Slice(skipLength + eventTypeLength, actorIdBytesLength));
return (true, new EventBytesTransport(
Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeLength)),
actorId,
bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength, baseBytesLength).ToArray(),
bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength + baseBytesLength, eventBytesLength).ToArray()
));
}
}
return (false, default);
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.Event
{
public class FullyEvent<PrimaryKey>
{
public IEvent Event { get; set; }
public EventBase Base { get; set; }
public PrimaryKey StateId { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.EventHandler
{
[AttributeUsage(AttributeTargets.Class)]
public class EventHandlerAttribute: Attribute
{
public string EventName { get; set; }
public string EventHandlerName { get; set; }
}
}
using Orleans.Concurrency;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Core.EventBus.EventHandler
{
public class PoleEventHandler : PoleEventHandlerBase
{
public override Task BatchInvoke(Immutable<List<byte[]>> bytes)
{
throw new NotImplementedException();
}
public override Task Invoke(Immutable<byte[]> bytes)
{
throw new NotImplementedException();
}
}
}
using Orleans;
using Orleans.Concurrency;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Core.EventBus.EventHandler
{
public abstract class PoleEventHandlerBase : Grain
{
public abstract Task Invoke(Immutable<byte[]> bytes);
public abstract Task BatchInvoke(Immutable<List<byte[]>> bytes);
}
}
using System;
namespace Pole.Core.Abstractions
namespace Pole.Core.EventBus
{
public interface IGrainID
{
......
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
{
public interface IObserverUnit<PrimaryKey> : IGrainID
{
List<Func<byte[], Task>> GetEventHandlers();
List<Func<List<byte[]>, Task>> GetBatchEventHandlers();
}
}
using System;
using System.Collections.Generic;
namespace Pole.Core.Abstractions
namespace Pole.Core.EventBus
{
public interface IObserverUnitContainer
{
IObserverUnit<PrimaryKey> GetUnit<PrimaryKey>(Type grainType);
object GetUnit(Type grainType);
void Register(IGrainID followUnit);
List<IObserverUnit<PrimaryKey>> GetUnits<PrimaryKey>(string observerName);
List<object> GetUnits(string observerName);
void Register<PrimaryKey>(string observerName,IGrainID followUnit);
}
}
using Microsoft.Extensions.Logging;
using Orleans;
using Pole.Core.Serialization;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Pole.Core.Abstraction;
using System.Linq;
using Pole.Core.EventBus.Event;
using Orleans.Concurrency;
using System.Collections.Concurrent;
using System.Linq.Expressions;
using Pole.Core.EventBus.EventHandler;
namespace Pole.Core.EventBus
{
public class ObserverUnit<PrimaryKey> : IObserverUnit<PrimaryKey>
{
readonly IServiceProvider serviceProvider;
readonly ISerializer serializer;
readonly ITypeFinder typeFinder;
readonly IClusterClient clusterClient;
readonly List<Func<byte[], Task>> eventHandlers = new List<Func<byte[], Task>>();
readonly List<Func<List<byte[]>, Task>> batchEventHandlers = new List<Func<List<byte[]>, Task>>();
protected ILogger Logger { get; private set; }
public Type GrainType { get; }
public ObserverUnit(IServiceProvider serviceProvider, Type grainType)
{
this.serviceProvider = serviceProvider;
clusterClient = serviceProvider.GetService<IClusterClient>();
serializer = serviceProvider.GetService<ISerializer>();
typeFinder = serviceProvider.GetService<ITypeFinder>();
Logger = serviceProvider.GetService<ILogger<ObserverUnit<PrimaryKey>>>();
GrainType = grainType;
}
public static ObserverUnit<PrimaryKey> From<Grain>(IServiceProvider serviceProvider) where Grain : Orleans.Grain
{
return new ObserverUnit<PrimaryKey>(serviceProvider, typeof(Grain));
}
public List<Func<byte[], Task>> GetEventHandlers()
{
return eventHandlers;
}
public List<Func<List<byte[]>, Task>> GetBatchEventHandlers()
{
return batchEventHandlers;
}
public ObserverUnit<PrimaryKey> UnreliableObserver(
Func<IServiceProvider,
FullyEvent<PrimaryKey>, ValueTask> handler)
{
GetEventHandlers().Add(EventHandler);
GetBatchEventHandlers().Add(BatchEventHandler);
return this;
//内部函数
Task EventHandler(byte[] bytes)
{
var (success, transport) = EventBytesTransport.FromBytes<PrimaryKey>(bytes);
if (success)
{
var data = serializer.Deserialize(transport.EventBytes, typeFinder.FindType(transport.EventTypeCode));
if (data is IEvent @event && transport.GrainId is PrimaryKey actorId)
{
var eventBase = EventBase.FromBytes(transport.BaseBytes);
var tellTask = handler(serviceProvider, new FullyEvent<PrimaryKey>
{
StateId = actorId,
Base = eventBase,
Event = @event
});
if (!tellTask.IsCompletedSuccessfully)
return tellTask.AsTask();
}
}
return Task.CompletedTask;
}
Task BatchEventHandler(List<byte[]> list)
{
var groups =
list.Select(b => EventBytesTransport.FromBytes<PrimaryKey>(b))
.Where(o => o.success)
.Select(o => o.transport)
.GroupBy(o => o.GrainId);
return Task.WhenAll(groups.Select(async kv =>
{
foreach (var transport in kv)
{
var data = serializer.Deserialize(transport.EventBytes, typeFinder.FindType(transport.EventTypeCode));
if (data is IEvent @event && transport.GrainId is PrimaryKey actorId)
{
var eventBase = EventBase.FromBytes(transport.BaseBytes);
var tellTask = handler(serviceProvider, new FullyEvent<PrimaryKey>
{
StateId = actorId,
Base = eventBase,
Event = @event
});
if (!tellTask.IsCompletedSuccessfully)
await tellTask;
}
}
}));
}
}
public void Observer(Type observerType)
{
if (!typeof(PoleEventHandlerBase).IsAssignableFrom(observerType))
throw new NotSupportedException($"{observerType.FullName} must inheritance from PoleEventHandler");
GetEventHandlers().Add(EventHandler);
GetBatchEventHandlers().Add(BatchEventHandler);
//内部函数
Task EventHandler(byte[] bytes)
{
var (success, actorId) = EventBytesTransport.GetActorId<PrimaryKey>(bytes);
if (success)
{
return GetObserver(observerType, actorId).Invoke(new Immutable<byte[]>(bytes));
}
else
{
if (Logger.IsEnabled(LogLevel.Error))
Logger.LogError($"{nameof(EventBytesTransport.GetActorId)} failed");
}
return Task.CompletedTask;
}
Task BatchEventHandler(List<byte[]> list)
{
var groups = list.Select(bytes =>
{
var (success, GrainId) = EventBytesTransport.GetActorId<PrimaryKey>(bytes);
if (!success)
{
if (Logger.IsEnabled(LogLevel.Error))
Logger.LogError($"{nameof(EventBytesTransport.GetActorId)} failed");
}
return (success, GrainId, bytes);
}).Where(o => o.success).GroupBy(o => o.GrainId);
return Task.WhenAll(groups.Select(kv =>
{
var items = kv.Select(item => item.bytes).ToList();
return GetObserver(observerType, kv.Key).BatchInvoke(new Immutable<List<byte[]>>(items));
}));
}
}
static readonly ConcurrentDictionary<Type, Func<IClusterClient, PrimaryKey, string, PoleEventHandlerBase>> _observerGeneratorDict = new ConcurrentDictionary<Type, Func<IClusterClient, PrimaryKey, string, PoleEventHandlerBase>>();
private PoleEventHandlerBase GetObserver(Type ObserverType, PrimaryKey primaryKey)
{
var func = _observerGeneratorDict.GetOrAdd(ObserverType, key =>
{
var clientType = typeof(IClusterClient);
var clientParams = Expression.Parameter(clientType, "client");
var primaryKeyParams = Expression.Parameter(typeof(PrimaryKey), "primaryKey");
var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix");
var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(PrimaryKey), typeof(string) });
var body = Expression.Call(method.MakeGenericMethod(ObserverType), clientParams, primaryKeyParams, grainClassNamePrefixParams);
return Expression.Lambda<Func<IClusterClient, PrimaryKey, string, PoleEventHandlerBase>>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile();
});
return func(clusterClient, primaryKey, null);
}
}
public static class ClusterClientExtensions
{
public static TGrainInterface GetGrain<TGrainInterface>(IClusterClient client, string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
{
return client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
}
}
}
using Microsoft.Extensions.Logging;
using Pole.Core.Observer;
using Pole.Core.Utils;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Pole.Core.EventBus.EventHandler;
using System.Linq;
using Pole.Core.Exceptions;
namespace Pole.Core.EventBus
{
public class ObserverUnitContainer : IObserverUnitContainer
{
readonly ConcurrentDictionary<string, List<object>> unitDict = new ConcurrentDictionary<string, List<object>>();
public ObserverUnitContainer(IServiceProvider serviceProvider)
{
var eventHandlerList = new List<(Type, EventHandlerAttribute)>();
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<ObserverUnitContainer>>()))
{
foreach (var type in assembly.GetTypes())
{
foreach (var attribute in type.GetCustomAttributes(false))
{
if (attribute is EventHandlerAttribute observer)
{
eventHandlerList.Add((type, observer));
break;
}
}
}
}
foreach (var eventHandler in eventHandlerList)
{
var unitType = typeof(ObserverUnit<>).MakeGenericType(new Type[] { typeof(string) });
var unit = (ObserverUnit<string>)Activator.CreateInstance(unitType, serviceProvider, eventHandler.Item1);
Register<string>(eventHandler.Item2.EventName, unit);
}
}
public List<IObserverUnit<PrimaryKey>> GetUnits<PrimaryKey>(string observerName)
{
if (unitDict.TryGetValue(observerName, out var units))
{
if (units is List<IObserverUnit<PrimaryKey>> result)
{
return result;
}
else
throw new UnmatchObserverUnitException(observerName);
}
else
throw new UnfindObserverUnitException(observerName);
}
public List<object> GetUnits(string observerName)
{
if (unitDict.TryGetValue(observerName, out var unit))
{
return unit;
}
else
throw new UnfindObserverUnitException(observerName);
}
public void Register<PrimaryKey>(string observerName, IGrainID observerUnit)
{
if (unitDict.TryGetValue(observerName, out List<object> units))
{
units.Add(observerUnit);
}
if (!unitDict.TryAdd(observerName, new List<object> { observerUnit }))
{
throw new ObserverUnitRepeatedException(observerUnit.GrainType.FullName);
}
}
}
}
......@@ -4,7 +4,7 @@ namespace Pole.Core.Exceptions
{
public class UnmatchObserverUnitException : Exception
{
public UnmatchObserverUnitException(string grainName, string unitName) : base($"{unitName} and {grainName} do not match")
public UnmatchObserverUnitException(string unitName) : base($"{unitName} do not match")
{
}
}
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Serialization
{
public interface ISerializer
{
object Deserialize(byte[] bytes, Type type);
T Deserialize<T>(byte[] bytes) where T : class, new();
T Deserialize<T>(string json) where T : class, new();
object Deserialize(string json, Type type);
string Serialize<T>(T data) where T : class, new();
string Serialize(object data, Type type);
byte[] SerializeToUtf8Bytes<T>(T data) where T : class, new();
byte[] SerializeToUtf8Bytes(object data, Type type);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Serialization
{
/// <summary>
/// 消息序列化传输的类别
/// </summary>
public enum TransportType : byte
{
/// <summary>
/// 通用序列化消息
/// </summary>
Common = 0,
/// <summary>
/// 事件序列化消息
/// </summary>
Event = 1
}
}
......@@ -2,7 +2,6 @@
using Microsoft.Extensions.Logging;
using Orleans;
using RabbitMQ.Client;
using Pole.Core.Abstractions;
using Pole.Core.EventBus;
using Pole.Core.Exceptions;
using Pole.Core.Utils;
......@@ -10,6 +9,9 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventHandler;
using Microsoft.Extensions.Options;
namespace Pole.EventBus.RabbitMQ
{
......@@ -20,58 +22,45 @@ namespace Pole.EventBus.RabbitMQ
readonly IRabbitMQClient rabbitMQClient;
readonly IServiceProvider serviceProvider;
private readonly IObserverUnitContainer observerUnitContainer;
private readonly RabbitOptions rabbitOptions;
public EventBusContainer(
IServiceProvider serviceProvider,
IObserverUnitContainer observerUnitContainer,
IRabbitMQClient rabbitMQClient)
IRabbitMQClient rabbitMQClient,
IOptions<RabbitOptions> rabbitOptions)
{
this.serviceProvider = serviceProvider;
this.rabbitMQClient = rabbitMQClient;
this.observerUnitContainer = observerUnitContainer;
this.rabbitOptions = rabbitOptions.Value;
}
public async Task AutoRegister()
{
var observableList = new List<(Type type, ProducerAttribute config)>();
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<EventBusContainer>>()))
var eventList = new List<(Type type, EventAttribute config)>();
var evenHandlertList = new List<(Type type, EventHandlerAttribute config)>();
AddEventAndEventHandlerInfoList(eventList, evenHandlertList);
foreach (var (type, config) in eventList)
{
foreach (var type in assembly.GetTypes())
{
foreach (var attribute in type.GetCustomAttributes(false))
{
if (attribute is ProducerAttribute config)
{
observableList.Add((type, config));
break;
}
}
}
var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name : config.EventName;
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 2, true, true, true).BindEvent(type, eventName);
await eventBus.AddGrainConsumer<string>();
}
foreach (var (type, config) in observableList)
foreach (var (type, config) in evenHandlertList)
{
var eventBus = CreateEventBus(string.IsNullOrEmpty(config.Exchange) ? type.Name : config.Exchange, string.IsNullOrEmpty(config.RoutePrefix) ? type.Name : config.RoutePrefix, config.LBCount, config.AutoAck, config.Reenqueue, config.Persistent).BindProducer(type);
if (typeof(IGrainWithIntegerKey).IsAssignableFrom(type))
{
await eventBus.AddGrainConsumer<long>();
}
else if (typeof(IGrainWithStringKey).IsAssignableFrom(type))
{
await eventBus.AddGrainConsumer<string>();
}
else
throw new PrimaryKeyTypeException(type.FullName);
var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name : config.EventName;
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 2, true, true, true).BindEvent(type, eventName);
await eventBus.AddGrainConsumer<string>();
}
}
public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false)
public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = true, bool reenqueue = true, bool persistent = true)
{
return new RabbitEventBus(observerUnitContainer, this, exchange, routePrefix, lBCount, autoAck, reenqueue, persistent);
}
public RabbitEventBus CreateEventBus<MainGrain>(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false)
{
return CreateEventBus(exchange, routePrefix, lBCount, autoAck, reenqueue, persistent).BindProducer<MainGrain>();
}
public Task Work(RabbitEventBus bus)
{
if (eventBusDictionary.TryAdd(bus.ProducerType, bus))
if (eventBusDictionary.TryAdd(bus.Event, bus))
{
eventBusList.Add(bus);
using var channel = rabbitMQClient.PullModel();
......@@ -79,7 +68,7 @@ namespace Pole.EventBus.RabbitMQ
return Task.CompletedTask;
}
else
throw new EventBusRepeatException(bus.ProducerType.FullName);
throw new EventBusRepeatException(bus.Event.FullName);
}
readonly ConcurrentDictionary<Type, IProducer> producerDict = new ConcurrentDictionary<Type, IProducer>();
......@@ -110,5 +99,40 @@ namespace Pole.EventBus.RabbitMQ
}
return result;
}
#region helpers
private void AddEventAndEventHandlerInfoList(List<(Type type, EventAttribute config)> eventList, List<(Type type, EventHandlerAttribute config)> evenHandlertList)
{
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<EventBusContainer>>()))
{
foreach (var type in assembly.GetTypes())
{
foreach (var attribute in type.GetCustomAttributes(false))
{
if (attribute is EventAttribute config)
{
eventList.Add((type, config));
break;
}
}
}
}
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<EventBusContainer>>()))
{
foreach (var type in assembly.GetTypes())
{
foreach (var attribute in type.GetCustomAttributes(false))
{
if (attribute is EventHandlerAttribute config)
{
evenHandlertList.Add((type, config));
break;
}
}
}
}
}
#endregion
}
}
......@@ -7,7 +7,6 @@ namespace Pole.EventBus.RabbitMQ
{
Task AutoRegister();
RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false);
RabbitEventBus CreateEventBus<MainGrain>(string routePrefix, string queue, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false);
Task Work(RabbitEventBus bus);
}
}
using Pole.Core.Abstractions;
using Pole.Core.EventBus;
using Pole.Core.Exceptions;
using Pole.Core.Utils;
using System;
......@@ -34,18 +34,7 @@ namespace Pole.EventBus.RabbitMQ
AutoAck = autoAck,
Reenqueue = reenqueue,
};
RouteList = new List<string>();
if (LBCount == 1)
{
RouteList.Add(routePrefix);
}
else
{
for (int i = 0; i < LBCount; i++)
{
RouteList.Add($"{routePrefix }_{ i.ToString()}");
}
}
RouteList = new List<string>() { $"{routePrefix }" };
_CHash = new ConsistentHash(RouteList, lBCount * 10);
}
public IRabbitEventBusContainer Container { get; }
......@@ -54,7 +43,8 @@ namespace Pole.EventBus.RabbitMQ
public int LBCount { get; }
public ConsumerOptions ConsumerConfig { get; set; }
public List<string> RouteList { get; }
public Type ProducerType { get; set; }
public Type Event { get; set; }
public string EventName { get; set; }
/// <summary>
/// 消息是否持久化
/// </summary>
......@@ -64,31 +54,28 @@ namespace Pole.EventBus.RabbitMQ
{
return LBCount == 1 ? RoutePrefix : _CHash.GetNode(key); ;
}
public RabbitEventBus BindProducer<TGrain>()
{
return BindProducer(typeof(TGrain));
}
public RabbitEventBus BindProducer(Type grainType)
public RabbitEventBus BindEvent(Type eventType, string eventName)
{
if (ProducerType == null)
ProducerType = grainType;
else
throw new EventBusRepeatBindingProducerException(grainType.FullName);
Event = eventType;
EventName = eventName;
return this;
}
public RabbitEventBus AddGrainConsumer<PrimaryKey>(string observerGroup)
public Task AddGrainConsumer<PrimaryKey>()
{
var observerUnit = observerUnitContainer.GetUnit<PrimaryKey>(ProducerType);
var consumer = new RabbitConsumer(
observerUnit.GetEventHandlers(observerGroup),
observerUnit.GetBatchEventHandlers(observerGroup))
var observerUnits = observerUnitContainer.GetUnits<PrimaryKey>(EventName);
foreach (var observerUnit in observerUnits)
{
EventBus = this,
QueueList = RouteList.Select(route => new QueueInfo { RoutingKey = route, Queue = $"{route}_{observerGroup}" }).ToList(),
Config = ConsumerConfig
};
Consumers.Add(consumer);
return this;
var consumer = new RabbitConsumer(
observerUnit.GetEventHandlers(),
observerUnit.GetBatchEventHandlers())
{
EventBus = this,
QueueList = RouteList.Select(route => new QueueInfo { RoutingKey = "", Queue = $"{route}_{EventName}" }).ToList(),
Config = ConsumerConfig
};
Consumers.Add(consumer);
}
return Enable();
}
public RabbitEventBus AddConsumer(
Func<byte[], Task> handler,
......@@ -110,13 +97,5 @@ namespace Pole.EventBus.RabbitMQ
{
return Container.Work(this);
}
public Task AddGrainConsumer<PrimaryKey>()
{
foreach (var group in observerUnitContainer.GetUnit<PrimaryKey>(ProducerType).GetGroups())
{
AddGrainConsumer<PrimaryKey>(group);
};
return Enable();
}
}
}
using Pole.Core;
using Pole.Core;
using Pole.Core.EventBus;
using System.Threading.Tasks;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment