From b2ed39dbfc9bb8fd83da4205355f68430b446236 Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Sun, 9 Feb 2020 18:42:06 +0800 Subject: [PATCH] 修改 内部逻辑 --- src/Pole.Core/Abstraction/IGrainID.cs | 9 --------- src/Pole.Core/Abstraction/IObserverUnit.cs | 26 -------------------------- src/Pole.Core/Abstraction/IObserverUnitContainer.cs | 11 ----------- src/Pole.Core/Abstraction/ITypeFinder.cs | 12 ++++++++++++ src/Pole.Core/EventBus/Event/EventAttribute.cs | 12 ++++++++++++ src/Pole.Core/EventBus/Event/EventBase.cs | 31 +++++++++++++++++++++++++++++++ src/Pole.Core/EventBus/Event/EventBytesTransport.cs | 127 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/EventBus/Event/EventInfoAttribute.cs | 12 ------------ src/Pole.Core/EventBus/Event/FullyEvent.cs | 13 +++++++++++++ src/Pole.Core/EventBus/EventHandler/EventHandlerAttribute.cs | 13 +++++++++++++ src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs | 21 +++++++++++++++++++++ src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs | 15 +++++++++++++++ src/Pole.Core/EventBus/IGrainID.cs | 9 +++++++++ src/Pole.Core/EventBus/IObserverUnit.cs | 12 ++++++++++++ src/Pole.Core/EventBus/IObserverUnitContainer.cs | 12 ++++++++++++ src/Pole.Core/EventBus/ObserverUnit.cs | 174 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/EventBus/ObserverUnitContainer.cs | 80 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs | 2 +- src/Pole.Core/Serialization/ISerializer.cs | 18 ++++++++++++++++++ src/Pole.Core/Serialization/TransportType.cs | 21 +++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs | 90 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------------- src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs | 1 - src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs | 63 +++++++++++++++++++++------------------------------------------ src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs | 1 - 24 files changed, 649 insertions(+), 136 deletions(-) delete mode 100644 src/Pole.Core/Abstraction/IGrainID.cs delete mode 100644 src/Pole.Core/Abstraction/IObserverUnit.cs delete mode 100644 src/Pole.Core/Abstraction/IObserverUnitContainer.cs create mode 100644 src/Pole.Core/Abstraction/ITypeFinder.cs create mode 100644 src/Pole.Core/EventBus/Event/EventAttribute.cs create mode 100644 src/Pole.Core/EventBus/Event/EventBase.cs create mode 100644 src/Pole.Core/EventBus/Event/EventBytesTransport.cs delete mode 100644 src/Pole.Core/EventBus/Event/EventInfoAttribute.cs create mode 100644 src/Pole.Core/EventBus/Event/FullyEvent.cs create mode 100644 src/Pole.Core/EventBus/EventHandler/EventHandlerAttribute.cs create mode 100644 src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs create mode 100644 src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs create mode 100644 src/Pole.Core/EventBus/IGrainID.cs create mode 100644 src/Pole.Core/EventBus/IObserverUnit.cs create mode 100644 src/Pole.Core/EventBus/IObserverUnitContainer.cs create mode 100644 src/Pole.Core/EventBus/ObserverUnit.cs create mode 100644 src/Pole.Core/EventBus/ObserverUnitContainer.cs create mode 100644 src/Pole.Core/Serialization/ISerializer.cs create mode 100644 src/Pole.Core/Serialization/TransportType.cs diff --git a/src/Pole.Core/Abstraction/IGrainID.cs b/src/Pole.Core/Abstraction/IGrainID.cs deleted file mode 100644 index 9b35a78..0000000 --- a/src/Pole.Core/Abstraction/IGrainID.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; - -namespace Pole.Core.Abstractions -{ - public interface IGrainID - { - Type GrainType { get; } - } -} diff --git a/src/Pole.Core/Abstraction/IObserverUnit.cs b/src/Pole.Core/Abstraction/IObserverUnit.cs deleted file mode 100644 index 32b086d..0000000 --- a/src/Pole.Core/Abstraction/IObserverUnit.cs +++ /dev/null @@ -1,26 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; - -namespace Pole.Core.Abstractions -{ - public interface IObserverUnit : IGrainID - { - /// - /// 获取所有监听者分组 - /// - /// - List GetGroups(); - Task GetAndSaveVersion(PrimaryKey primaryKey, long srcVersion); - /// - /// 重置Grain - /// - /// 重置Grain - /// - Task Reset(PrimaryKey primaryKey); - List> GetEventHandlers(string observerGroup); - List> GetAllEventHandlers(); - List, Task>> GetBatchEventHandlers(string observerGroup); - List, Task>> GetAllBatchEventHandlers(); - } -} diff --git a/src/Pole.Core/Abstraction/IObserverUnitContainer.cs b/src/Pole.Core/Abstraction/IObserverUnitContainer.cs deleted file mode 100644 index 5da6d1e..0000000 --- a/src/Pole.Core/Abstraction/IObserverUnitContainer.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; - -namespace Pole.Core.Abstractions -{ - public interface IObserverUnitContainer - { - IObserverUnit GetUnit(Type grainType); - object GetUnit(Type grainType); - void Register(IGrainID followUnit); - } -} diff --git a/src/Pole.Core/Abstraction/ITypeFinder.cs b/src/Pole.Core/Abstraction/ITypeFinder.cs new file mode 100644 index 0000000..23aa2ef --- /dev/null +++ b/src/Pole.Core/Abstraction/ITypeFinder.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Abstraction +{ + public interface ITypeFinder + { + Type FindType(string code); + string GetCode(Type type); + } +} diff --git a/src/Pole.Core/EventBus/Event/EventAttribute.cs b/src/Pole.Core/EventBus/Event/EventAttribute.cs new file mode 100644 index 0000000..62e5063 --- /dev/null +++ b/src/Pole.Core/EventBus/Event/EventAttribute.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.EventBus.Event +{ + [AttributeUsage(AttributeTargets.Class)] + public class EventAttribute: Attribute + { + public string EventName { get; set; } + } +} diff --git a/src/Pole.Core/EventBus/Event/EventBase.cs b/src/Pole.Core/EventBus/Event/EventBase.cs new file mode 100644 index 0000000..4f5167f --- /dev/null +++ b/src/Pole.Core/EventBus/Event/EventBase.cs @@ -0,0 +1,31 @@ +using Pole.Core.Utils; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.EventBus.Event +{ + public class EventBase + { + public EventBase() { } + public EventBase(long version, long timestamp) + { + Version = version; + Timestamp = timestamp; + } + public long Version { get; set; } + public long Timestamp { get; set; } + public byte[] GetBytes() + { + using var ms = new PooledMemoryStream(); + ms.Write(BitConverter.GetBytes(Version)); + ms.Write(BitConverter.GetBytes(Timestamp)); + return ms.ToArray(); + } + public static EventBase FromBytes(byte[] bytes) + { + var bytesSpan = bytes.AsSpan(); + return new EventBase(BitConverter.ToInt64(bytesSpan), BitConverter.ToInt64(bytesSpan.Slice(sizeof(long)))); + } + } +} diff --git a/src/Pole.Core/EventBus/Event/EventBytesTransport.cs b/src/Pole.Core/EventBus/Event/EventBytesTransport.cs new file mode 100644 index 0000000..0341fcb --- /dev/null +++ b/src/Pole.Core/EventBus/Event/EventBytesTransport.cs @@ -0,0 +1,127 @@ +using Pole.Core.Serialization; +using Pole.Core.Utils; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.EventBus.Event +{ + public readonly struct EventBytesTransport + { + public EventBytesTransport(string eventCode, object grainId, byte[] baseBytes, byte[] eventBytes) + { + EventTypeCode = eventCode; + GrainId = grainId; + BaseBytes = baseBytes; + EventBytes = eventBytes; + } + /// + /// 事件TypeCode + /// + public string EventTypeCode { get; } + /// + /// 事件GrainId + /// + public object GrainId { get; } + /// + /// 事件base信息的bytes + /// + public byte[] BaseBytes { get; } + /// + /// 事件本身的bytes + /// + public byte[] EventBytes { get; } + public byte[] GetBytes() + { + var eventTypeBytes = Encoding.UTF8.GetBytes(EventTypeCode); + byte[] actorIdBytes; + var strId = GrainId as string; + actorIdBytes = Encoding.UTF8.GetBytes(strId); + using var ms = new PooledMemoryStream(); + ms.WriteByte((byte)TransportType.Event); + ms.Write(BitConverter.GetBytes((ushort)eventTypeBytes.Length)); + ms.Write(BitConverter.GetBytes((ushort)actorIdBytes.Length)); + ms.Write(BitConverter.GetBytes((ushort)BaseBytes.Length)); + ms.Write(BitConverter.GetBytes(EventBytes.Length)); + ms.Write(eventTypeBytes); + ms.Write(actorIdBytes); + ms.Write(BaseBytes); + ms.Write(EventBytes); + return ms.ToArray(); + } + public static (bool success, PrimaryKey actorId) GetActorId(byte[] bytes) + { + if (bytes[0] == (byte)TransportType.Event) + { + var bytesSpan = bytes.AsSpan(); + var eventTypeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort))); + var actorIdBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort))); + if (typeof(PrimaryKey) == typeof(long)) + { + var id = BitConverter.ToInt64(bytesSpan.Slice(3 * sizeof(ushort) + 1 + sizeof(int) + eventTypeLength, actorIdBytesLength)); + if (id is PrimaryKey actorId) + return (true, actorId); + } + else + { + var id = Encoding.UTF8.GetString(bytesSpan.Slice(3 * sizeof(ushort) + 1 + sizeof(int) + eventTypeLength, actorIdBytesLength)); + if (id is PrimaryKey actorId) + return (true, actorId); + } + } + return (false, default); + } + public static (bool success, EventBytesTransport transport) FromBytesWithNoId(byte[] bytes) + { + if (bytes[0] == (byte)TransportType.Event) + { + var bytesSpan = bytes.AsSpan(); + var eventTypeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort))); + var actorIdBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort))); + var baseBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(2 * sizeof(ushort) + 1, sizeof(ushort))); + var eventBytesLength = BitConverter.ToInt32(bytesSpan.Slice(3 * sizeof(ushort) + 1, sizeof(int))); + var skipLength = 3 * sizeof(ushort) + 1 + sizeof(int); + return (true, new EventBytesTransport( + Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeLength)), + null, + bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength, baseBytesLength).ToArray(), + bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength + baseBytesLength, eventBytesLength).ToArray() + )); + } + return (false, default); + } + public static (bool success, EventBytesTransport transport) FromBytes(byte[] bytes) + { + if (bytes[0] == (byte)TransportType.Event) + { + var bytesSpan = bytes.AsSpan(); + var eventTypeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort))); + var actorIdBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort))); + var baseBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(2 * sizeof(ushort) + 1, sizeof(ushort))); + var eventBytesLength = BitConverter.ToInt32(bytesSpan.Slice(3 * sizeof(ushort) + 1, sizeof(int))); + var skipLength = 3 * sizeof(ushort) + 1 + sizeof(int); + if (typeof(PrimaryKey) == typeof(long)) + { + var actorId = BitConverter.ToInt64(bytesSpan.Slice(3 * sizeof(ushort) + 1 + sizeof(int) + eventTypeLength, actorIdBytesLength)); + return (true, new EventBytesTransport( + Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeLength)), + actorId, + bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength, baseBytesLength).ToArray(), + bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength + baseBytesLength, eventBytesLength).ToArray() + )); + } + else + { + var actorId = Encoding.UTF8.GetString(bytesSpan.Slice(skipLength + eventTypeLength, actorIdBytesLength)); + return (true, new EventBytesTransport( + Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeLength)), + actorId, + bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength, baseBytesLength).ToArray(), + bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength + baseBytesLength, eventBytesLength).ToArray() + )); + } + } + return (false, default); + } + } +} diff --git a/src/Pole.Core/EventBus/Event/EventInfoAttribute.cs b/src/Pole.Core/EventBus/Event/EventInfoAttribute.cs deleted file mode 100644 index a75cdea..0000000 --- a/src/Pole.Core/EventBus/Event/EventInfoAttribute.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Core.EventBus.Event -{ - [AttributeUsage(AttributeTargets.Class)] - public class EventInfoAttribute: Attribute - { - public string SendBoxName { get; set; } - } -} diff --git a/src/Pole.Core/EventBus/Event/FullyEvent.cs b/src/Pole.Core/EventBus/Event/FullyEvent.cs new file mode 100644 index 0000000..aa0acef --- /dev/null +++ b/src/Pole.Core/EventBus/Event/FullyEvent.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.EventBus.Event +{ + public class FullyEvent + { + public IEvent Event { get; set; } + public EventBase Base { get; set; } + public PrimaryKey StateId { get; set; } + } +} diff --git a/src/Pole.Core/EventBus/EventHandler/EventHandlerAttribute.cs b/src/Pole.Core/EventBus/EventHandler/EventHandlerAttribute.cs new file mode 100644 index 0000000..0be1e5d --- /dev/null +++ b/src/Pole.Core/EventBus/EventHandler/EventHandlerAttribute.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.EventBus.EventHandler +{ + [AttributeUsage(AttributeTargets.Class)] + public class EventHandlerAttribute: Attribute + { + public string EventName { get; set; } + public string EventHandlerName { get; set; } + } +} diff --git a/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs b/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs new file mode 100644 index 0000000..eb56f7f --- /dev/null +++ b/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs @@ -0,0 +1,21 @@ +using Orleans.Concurrency; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Core.EventBus.EventHandler +{ + public class PoleEventHandler : PoleEventHandlerBase + { + public override Task BatchInvoke(Immutable> bytes) + { + throw new NotImplementedException(); + } + + public override Task Invoke(Immutable bytes) + { + throw new NotImplementedException(); + } + } +} diff --git a/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs b/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs new file mode 100644 index 0000000..372957e --- /dev/null +++ b/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs @@ -0,0 +1,15 @@ +using Orleans; +using Orleans.Concurrency; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Core.EventBus.EventHandler +{ + public abstract class PoleEventHandlerBase : Grain + { + public abstract Task Invoke(Immutable bytes); + public abstract Task BatchInvoke(Immutable> bytes); + } +} diff --git a/src/Pole.Core/EventBus/IGrainID.cs b/src/Pole.Core/EventBus/IGrainID.cs new file mode 100644 index 0000000..6397084 --- /dev/null +++ b/src/Pole.Core/EventBus/IGrainID.cs @@ -0,0 +1,9 @@ +using System; + +namespace Pole.Core.EventBus +{ + public interface IGrainID + { + Type GrainType { get; } + } +} diff --git a/src/Pole.Core/EventBus/IObserverUnit.cs b/src/Pole.Core/EventBus/IObserverUnit.cs new file mode 100644 index 0000000..927c492 --- /dev/null +++ b/src/Pole.Core/EventBus/IObserverUnit.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Pole.Core.EventBus +{ + public interface IObserverUnit : IGrainID + { + List> GetEventHandlers(); + List, Task>> GetBatchEventHandlers(); + } +} diff --git a/src/Pole.Core/EventBus/IObserverUnitContainer.cs b/src/Pole.Core/EventBus/IObserverUnitContainer.cs new file mode 100644 index 0000000..2f0cc91 --- /dev/null +++ b/src/Pole.Core/EventBus/IObserverUnitContainer.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; + +namespace Pole.Core.EventBus +{ + public interface IObserverUnitContainer + { + List> GetUnits(string observerName); + List GetUnits(string observerName); + void Register(string observerName,IGrainID followUnit); + } +} diff --git a/src/Pole.Core/EventBus/ObserverUnit.cs b/src/Pole.Core/EventBus/ObserverUnit.cs new file mode 100644 index 0000000..b5aaef7 --- /dev/null +++ b/src/Pole.Core/EventBus/ObserverUnit.cs @@ -0,0 +1,174 @@ +using Microsoft.Extensions.Logging; +using Orleans; +using Pole.Core.Serialization; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Pole.Core.Abstraction; +using System.Linq; +using Pole.Core.EventBus.Event; +using Orleans.Concurrency; +using System.Collections.Concurrent; +using System.Linq.Expressions; +using Pole.Core.EventBus.EventHandler; + +namespace Pole.Core.EventBus +{ + public class ObserverUnit : IObserverUnit + { + readonly IServiceProvider serviceProvider; + readonly ISerializer serializer; + readonly ITypeFinder typeFinder; + readonly IClusterClient clusterClient; + readonly List> eventHandlers = new List>(); + readonly List, Task>> batchEventHandlers = new List, Task>>(); + protected ILogger Logger { get; private set; } + public Type GrainType { get; } + + public ObserverUnit(IServiceProvider serviceProvider, Type grainType) + { + this.serviceProvider = serviceProvider; + clusterClient = serviceProvider.GetService(); + serializer = serviceProvider.GetService(); + typeFinder = serviceProvider.GetService(); + Logger = serviceProvider.GetService>>(); + GrainType = grainType; + } + public static ObserverUnit From(IServiceProvider serviceProvider) where Grain : Orleans.Grain + { + return new ObserverUnit(serviceProvider, typeof(Grain)); + } + + public List> GetEventHandlers() + { + return eventHandlers; + } + + public List, Task>> GetBatchEventHandlers() + { + return batchEventHandlers; + } + public ObserverUnit UnreliableObserver( + Func, ValueTask> handler) + { + GetEventHandlers().Add(EventHandler); + GetBatchEventHandlers().Add(BatchEventHandler); + return this; + //内部函数 + Task EventHandler(byte[] bytes) + { + var (success, transport) = EventBytesTransport.FromBytes(bytes); + if (success) + { + var data = serializer.Deserialize(transport.EventBytes, typeFinder.FindType(transport.EventTypeCode)); + if (data is IEvent @event && transport.GrainId is PrimaryKey actorId) + { + var eventBase = EventBase.FromBytes(transport.BaseBytes); + var tellTask = handler(serviceProvider, new FullyEvent + { + StateId = actorId, + Base = eventBase, + Event = @event + }); + if (!tellTask.IsCompletedSuccessfully) + return tellTask.AsTask(); + } + } + return Task.CompletedTask; + } + Task BatchEventHandler(List list) + { + var groups = + list.Select(b => EventBytesTransport.FromBytes(b)) + .Where(o => o.success) + .Select(o => o.transport) + .GroupBy(o => o.GrainId); + return Task.WhenAll(groups.Select(async kv => + { + foreach (var transport in kv) + { + var data = serializer.Deserialize(transport.EventBytes, typeFinder.FindType(transport.EventTypeCode)); + if (data is IEvent @event && transport.GrainId is PrimaryKey actorId) + { + var eventBase = EventBase.FromBytes(transport.BaseBytes); + var tellTask = handler(serviceProvider, new FullyEvent + { + StateId = actorId, + Base = eventBase, + Event = @event + }); + if (!tellTask.IsCompletedSuccessfully) + await tellTask; + } + } + })); + } + } + public void Observer(Type observerType) + { + if (!typeof(PoleEventHandlerBase).IsAssignableFrom(observerType)) + throw new NotSupportedException($"{observerType.FullName} must inheritance from PoleEventHandler"); + GetEventHandlers().Add(EventHandler); + GetBatchEventHandlers().Add(BatchEventHandler); + //内部函数 + Task EventHandler(byte[] bytes) + { + var (success, actorId) = EventBytesTransport.GetActorId(bytes); + if (success) + { + return GetObserver(observerType, actorId).Invoke(new Immutable(bytes)); + + } + else + { + if (Logger.IsEnabled(LogLevel.Error)) + Logger.LogError($"{nameof(EventBytesTransport.GetActorId)} failed"); + } + return Task.CompletedTask; + } + Task BatchEventHandler(List list) + { + var groups = list.Select(bytes => + { + var (success, GrainId) = EventBytesTransport.GetActorId(bytes); + if (!success) + { + if (Logger.IsEnabled(LogLevel.Error)) + Logger.LogError($"{nameof(EventBytesTransport.GetActorId)} failed"); + } + return (success, GrainId, bytes); + }).Where(o => o.success).GroupBy(o => o.GrainId); + return Task.WhenAll(groups.Select(kv => + { + var items = kv.Select(item => item.bytes).ToList(); + return GetObserver(observerType, kv.Key).BatchInvoke(new Immutable>(items)); + })); + } + } + static readonly ConcurrentDictionary> _observerGeneratorDict = new ConcurrentDictionary>(); + private PoleEventHandlerBase GetObserver(Type ObserverType, PrimaryKey primaryKey) + { + var func = _observerGeneratorDict.GetOrAdd(ObserverType, key => + { + var clientType = typeof(IClusterClient); + var clientParams = Expression.Parameter(clientType, "client"); + var primaryKeyParams = Expression.Parameter(typeof(PrimaryKey), "primaryKey"); + var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix"); + var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(PrimaryKey), typeof(string) }); + var body = Expression.Call(method.MakeGenericMethod(ObserverType), clientParams, primaryKeyParams, grainClassNamePrefixParams); + return Expression.Lambda>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile(); + }); + return func(clusterClient, primaryKey, null); + } + } + public static class ClusterClientExtensions + { + public static TGrainInterface GetGrain(IClusterClient client, string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey + { + return client.GetGrain(primaryKey, grainClassNamePrefix); + } + } +} diff --git a/src/Pole.Core/EventBus/ObserverUnitContainer.cs b/src/Pole.Core/EventBus/ObserverUnitContainer.cs new file mode 100644 index 0000000..aca9142 --- /dev/null +++ b/src/Pole.Core/EventBus/ObserverUnitContainer.cs @@ -0,0 +1,80 @@ +using Microsoft.Extensions.Logging; +using Pole.Core.Observer; +using Pole.Core.Utils; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using Pole.Core.EventBus.EventHandler; +using System.Linq; +using Pole.Core.Exceptions; + +namespace Pole.Core.EventBus +{ + public class ObserverUnitContainer : IObserverUnitContainer + { + readonly ConcurrentDictionary> unitDict = new ConcurrentDictionary>(); + public ObserverUnitContainer(IServiceProvider serviceProvider) + { + var eventHandlerList = new List<(Type, EventHandlerAttribute)>(); + foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService>())) + { + foreach (var type in assembly.GetTypes()) + { + foreach (var attribute in type.GetCustomAttributes(false)) + { + if (attribute is EventHandlerAttribute observer) + { + eventHandlerList.Add((type, observer)); + break; + } + } + } + } + foreach (var eventHandler in eventHandlerList) + { + var unitType = typeof(ObserverUnit<>).MakeGenericType(new Type[] { typeof(string) }); + var unit = (ObserverUnit)Activator.CreateInstance(unitType, serviceProvider, eventHandler.Item1); + + Register(eventHandler.Item2.EventName, unit); + } + } + public List> GetUnits(string observerName) + { + if (unitDict.TryGetValue(observerName, out var units)) + { + if (units is List> result) + { + return result; + } + else + throw new UnmatchObserverUnitException(observerName); + } + else + throw new UnfindObserverUnitException(observerName); + } + public List GetUnits(string observerName) + { + if (unitDict.TryGetValue(observerName, out var unit)) + { + return unit; + } + else + throw new UnfindObserverUnitException(observerName); + } + + public void Register(string observerName, IGrainID observerUnit) + { + if (unitDict.TryGetValue(observerName, out List units)) + { + units.Add(observerUnit); + } + if (!unitDict.TryAdd(observerName, new List { observerUnit })) + { + throw new ObserverUnitRepeatedException(observerUnit.GrainType.FullName); + } + } + + } +} diff --git a/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs b/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs index f4e60e4..51f2bf1 100644 --- a/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs +++ b/src/Pole.Core/Exceptions/UnMatchObserverUnitException.cs @@ -4,7 +4,7 @@ namespace Pole.Core.Exceptions { public class UnmatchObserverUnitException : Exception { - public UnmatchObserverUnitException(string grainName, string unitName) : base($"{unitName} and {grainName} do not match") + public UnmatchObserverUnitException(string unitName) : base($"{unitName} do not match") { } } diff --git a/src/Pole.Core/Serialization/ISerializer.cs b/src/Pole.Core/Serialization/ISerializer.cs new file mode 100644 index 0000000..8a135ec --- /dev/null +++ b/src/Pole.Core/Serialization/ISerializer.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Serialization +{ + public interface ISerializer + { + object Deserialize(byte[] bytes, Type type); + T Deserialize(byte[] bytes) where T : class, new(); + T Deserialize(string json) where T : class, new(); + object Deserialize(string json, Type type); + string Serialize(T data) where T : class, new(); + string Serialize(object data, Type type); + byte[] SerializeToUtf8Bytes(T data) where T : class, new(); + byte[] SerializeToUtf8Bytes(object data, Type type); + } +} diff --git a/src/Pole.Core/Serialization/TransportType.cs b/src/Pole.Core/Serialization/TransportType.cs new file mode 100644 index 0000000..2969288 --- /dev/null +++ b/src/Pole.Core/Serialization/TransportType.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Serialization +{ + /// + /// 消息序列化传输的类别 + /// + public enum TransportType : byte + { + /// + /// 通用序列化消息 + /// + Common = 0, + /// + /// 事件序列化消息 + /// + Event = 1 + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs index 2842b21..1fd889a 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs @@ -2,7 +2,6 @@ using Microsoft.Extensions.Logging; using Orleans; using RabbitMQ.Client; -using Pole.Core.Abstractions; using Pole.Core.EventBus; using Pole.Core.Exceptions; using Pole.Core.Utils; @@ -10,6 +9,9 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading.Tasks; +using Pole.Core.EventBus.Event; +using Pole.Core.EventBus.EventHandler; +using Microsoft.Extensions.Options; namespace Pole.EventBus.RabbitMQ { @@ -20,58 +22,45 @@ namespace Pole.EventBus.RabbitMQ readonly IRabbitMQClient rabbitMQClient; readonly IServiceProvider serviceProvider; private readonly IObserverUnitContainer observerUnitContainer; + private readonly RabbitOptions rabbitOptions; public EventBusContainer( IServiceProvider serviceProvider, IObserverUnitContainer observerUnitContainer, - IRabbitMQClient rabbitMQClient) + IRabbitMQClient rabbitMQClient, + IOptions rabbitOptions) { this.serviceProvider = serviceProvider; this.rabbitMQClient = rabbitMQClient; this.observerUnitContainer = observerUnitContainer; + this.rabbitOptions = rabbitOptions.Value; } public async Task AutoRegister() { var observableList = new List<(Type type, ProducerAttribute config)>(); - foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService>())) + var eventList = new List<(Type type, EventAttribute config)>(); + var evenHandlertList = new List<(Type type, EventHandlerAttribute config)>(); + AddEventAndEventHandlerInfoList(eventList, evenHandlertList); + foreach (var (type, config) in eventList) { - foreach (var type in assembly.GetTypes()) - { - foreach (var attribute in type.GetCustomAttributes(false)) - { - if (attribute is ProducerAttribute config) - { - observableList.Add((type, config)); - break; - } - } - } + var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name : config.EventName; + var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 2, true, true, true).BindEvent(type, eventName); + await eventBus.AddGrainConsumer(); } - foreach (var (type, config) in observableList) + foreach (var (type, config) in evenHandlertList) { - var eventBus = CreateEventBus(string.IsNullOrEmpty(config.Exchange) ? type.Name : config.Exchange, string.IsNullOrEmpty(config.RoutePrefix) ? type.Name : config.RoutePrefix, config.LBCount, config.AutoAck, config.Reenqueue, config.Persistent).BindProducer(type); - if (typeof(IGrainWithIntegerKey).IsAssignableFrom(type)) - { - await eventBus.AddGrainConsumer(); - } - else if (typeof(IGrainWithStringKey).IsAssignableFrom(type)) - { - await eventBus.AddGrainConsumer(); - } - else - throw new PrimaryKeyTypeException(type.FullName); + var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name : config.EventName; + var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 2, true, true, true).BindEvent(type, eventName); + await eventBus.AddGrainConsumer(); } } - public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false) + + public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = true, bool reenqueue = true, bool persistent = true) { return new RabbitEventBus(observerUnitContainer, this, exchange, routePrefix, lBCount, autoAck, reenqueue, persistent); } - public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false) - { - return CreateEventBus(exchange, routePrefix, lBCount, autoAck, reenqueue, persistent).BindProducer(); - } public Task Work(RabbitEventBus bus) { - if (eventBusDictionary.TryAdd(bus.ProducerType, bus)) + if (eventBusDictionary.TryAdd(bus.Event, bus)) { eventBusList.Add(bus); using var channel = rabbitMQClient.PullModel(); @@ -79,7 +68,7 @@ namespace Pole.EventBus.RabbitMQ return Task.CompletedTask; } else - throw new EventBusRepeatException(bus.ProducerType.FullName); + throw new EventBusRepeatException(bus.Event.FullName); } readonly ConcurrentDictionary producerDict = new ConcurrentDictionary(); @@ -110,5 +99,40 @@ namespace Pole.EventBus.RabbitMQ } return result; } + + + #region helpers + private void AddEventAndEventHandlerInfoList(List<(Type type, EventAttribute config)> eventList, List<(Type type, EventHandlerAttribute config)> evenHandlertList) + { + foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService>())) + { + foreach (var type in assembly.GetTypes()) + { + foreach (var attribute in type.GetCustomAttributes(false)) + { + if (attribute is EventAttribute config) + { + eventList.Add((type, config)); + break; + } + } + } + } + foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService>())) + { + foreach (var type in assembly.GetTypes()) + { + foreach (var attribute in type.GetCustomAttributes(false)) + { + if (attribute is EventHandlerAttribute config) + { + evenHandlertList.Add((type, config)); + break; + } + } + } + } + } + #endregion } } diff --git a/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs index 9730109..d90fce0 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs @@ -7,7 +7,6 @@ namespace Pole.EventBus.RabbitMQ { Task AutoRegister(); RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false); - RabbitEventBus CreateEventBus(string routePrefix, string queue, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false); Task Work(RabbitEventBus bus); } } diff --git a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs index 00b9a0a..35c35a2 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs @@ -1,4 +1,4 @@ -using Pole.Core.Abstractions; +using Pole.Core.EventBus; using Pole.Core.Exceptions; using Pole.Core.Utils; using System; @@ -34,18 +34,7 @@ namespace Pole.EventBus.RabbitMQ AutoAck = autoAck, Reenqueue = reenqueue, }; - RouteList = new List(); - if (LBCount == 1) - { - RouteList.Add(routePrefix); - } - else - { - for (int i = 0; i < LBCount; i++) - { - RouteList.Add($"{routePrefix }_{ i.ToString()}"); - } - } + RouteList = new List() { $"{routePrefix }" }; _CHash = new ConsistentHash(RouteList, lBCount * 10); } public IRabbitEventBusContainer Container { get; } @@ -54,7 +43,8 @@ namespace Pole.EventBus.RabbitMQ public int LBCount { get; } public ConsumerOptions ConsumerConfig { get; set; } public List RouteList { get; } - public Type ProducerType { get; set; } + public Type Event { get; set; } + public string EventName { get; set; } /// /// 消息是否持久化 /// @@ -64,31 +54,28 @@ namespace Pole.EventBus.RabbitMQ { return LBCount == 1 ? RoutePrefix : _CHash.GetNode(key); ; } - public RabbitEventBus BindProducer() - { - return BindProducer(typeof(TGrain)); - } - public RabbitEventBus BindProducer(Type grainType) + public RabbitEventBus BindEvent(Type eventType, string eventName) { - if (ProducerType == null) - ProducerType = grainType; - else - throw new EventBusRepeatBindingProducerException(grainType.FullName); + Event = eventType; + EventName = eventName; return this; } - public RabbitEventBus AddGrainConsumer(string observerGroup) + public Task AddGrainConsumer() { - var observerUnit = observerUnitContainer.GetUnit(ProducerType); - var consumer = new RabbitConsumer( - observerUnit.GetEventHandlers(observerGroup), - observerUnit.GetBatchEventHandlers(observerGroup)) + var observerUnits = observerUnitContainer.GetUnits(EventName); + foreach (var observerUnit in observerUnits) { - EventBus = this, - QueueList = RouteList.Select(route => new QueueInfo { RoutingKey = route, Queue = $"{route}_{observerGroup}" }).ToList(), - Config = ConsumerConfig - }; - Consumers.Add(consumer); - return this; + var consumer = new RabbitConsumer( + observerUnit.GetEventHandlers(), + observerUnit.GetBatchEventHandlers()) + { + EventBus = this, + QueueList = RouteList.Select(route => new QueueInfo { RoutingKey = "", Queue = $"{route}_{EventName}" }).ToList(), + Config = ConsumerConfig + }; + Consumers.Add(consumer); + } + return Enable(); } public RabbitEventBus AddConsumer( Func handler, @@ -110,13 +97,5 @@ namespace Pole.EventBus.RabbitMQ { return Container.Work(this); } - public Task AddGrainConsumer() - { - foreach (var group in observerUnitContainer.GetUnit(ProducerType).GetGroups()) - { - AddGrainConsumer(group); - }; - return Enable(); - } } } diff --git a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs index 79d27dd..25a5f1a 100644 --- a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs +++ b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs @@ -1,5 +1,4 @@ using Pole.Core; -using Pole.Core; using Pole.Core.EventBus; using System.Threading.Tasks; -- libgit2 0.25.0