From 988776d9e36c1ebd13e10dbb1ab82e50a4fdfde8 Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Wed, 12 Feb 2020 16:15:07 +0800 Subject: [PATCH] 完成 consumer 消费时 调用 orleans grain , grain 的id 由 eventid 决定,每一个类型的 event 每毫秒支持生成 64个 event ,并且在 k8s集群里 全局唯一 --- Pole.sln | 9 ++++++++- src/Pole.Core/Abstraction/IEventTypeFinder.cs | 12 ++++++++++++ src/Pole.Core/Abstraction/ITypeFinder.cs | 12 ------------ src/Pole.Core/Consts.cs | 2 ++ src/Pole.Core/EventBus/Bus.cs | 37 +++++++++++++++++++++++++++++++++++++ src/Pole.Core/EventBus/Event/EventBytesTransport.cs | 100 +++++++++++++++++----------------------------------------------------------------------------------- src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----- src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs | 5 +++-- src/Pole.Core/EventBus/IBus.cs | 13 +++++++++++++ src/Pole.Core/EventBus/IGrainID.cs | 2 +- src/Pole.Core/EventBus/IProducer.cs | 2 +- src/Pole.Core/EventBus/ObserverUnit.cs | 110 ++++++++++++++++++++++++++------------------------------------------------------------------------------------ src/Pole.Core/EventBus/ObserverUnitContainer.cs | 8 ++++---- src/Pole.Core/Exceptions/EventHandlerTargetMethodNotFoundException.cs | 14 ++++++++++++++ src/Pole.Core/Serialization/EventTypeFinder.cs | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Utils/Abstraction/IInstanceIPV4_16IdGeneratorIdSolver.cs | 11 +++++++++++ src/Pole.Core/Utils/Abstraction/ISnowflakeIdGenerator.cs | 11 +++++++++++ src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs | 37 ------------------------------------- src/Pole.Core/Utils/InstanceIPV4_16IdGeneratorIdSolver.cs | 35 +++++++++++++++++++++++++++++++++++ src/Pole.Core/Utils/SnowflakeIdGenerator.cs | 101 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs | 5 +---- src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs | 4 ++-- test/Pole.Core.Test/Pole.Core.Test.csproj | 20 ++++++++++++++++++++ test/Pole.Core.Test/Utils/InstanceIPV4_16IdGeneratorIdSolverTest.cs | 18 ++++++++++++++++++ test/Pole.Core.Test/Utils/SnowflakeIdGeneratorTest.cs | 15 +++++++++++++++ 25 files changed, 502 insertions(+), 236 deletions(-) create mode 100644 src/Pole.Core/Abstraction/IEventTypeFinder.cs delete mode 100644 src/Pole.Core/Abstraction/ITypeFinder.cs create mode 100644 src/Pole.Core/EventBus/Bus.cs create mode 100644 src/Pole.Core/EventBus/IBus.cs create mode 100644 src/Pole.Core/Exceptions/EventHandlerTargetMethodNotFoundException.cs create mode 100644 src/Pole.Core/Serialization/EventTypeFinder.cs create mode 100644 src/Pole.Core/Utils/Abstraction/IInstanceIPV4_16IdGeneratorIdSolver.cs create mode 100644 src/Pole.Core/Utils/Abstraction/ISnowflakeIdGenerator.cs delete mode 100644 src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs create mode 100644 src/Pole.Core/Utils/InstanceIPV4_16IdGeneratorIdSolver.cs create mode 100644 src/Pole.Core/Utils/SnowflakeIdGenerator.cs create mode 100644 test/Pole.Core.Test/Pole.Core.Test.csproj create mode 100644 test/Pole.Core.Test/Utils/InstanceIPV4_16IdGeneratorIdSolverTest.cs create mode 100644 test/Pole.Core.Test/Utils/SnowflakeIdGeneratorTest.cs diff --git a/Pole.sln b/Pole.sln index 8f3d917..87c7f66 100644 --- a/Pole.sln +++ b/Pole.sln @@ -45,7 +45,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "integrationEvents", "integr EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Product.IntegrationEvents", "samples\intergrationEvents\Product.IntegrationEvents\Product.IntegrationEvents.csproj", "{9C0DFC90-1AF9-424A-B5FB-2A7C3611970C}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.EventBus.Rabbitmq", "src\Pole.EventBus.Rabbitmq\Pole.EventBus.Rabbitmq.csproj", "{BDF62A19-FFBD-4EE1-A07A-68472E680A95}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.EventBus.Rabbitmq", "src\Pole.EventBus.Rabbitmq\Pole.EventBus.Rabbitmq.csproj", "{BDF62A19-FFBD-4EE1-A07A-68472E680A95}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Core.Test", "test\Pole.Core.Test\Pole.Core.Test.csproj", "{23EA8735-DB2E-4599-8902-8FCBCBE4799C}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -117,6 +119,10 @@ Global {BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Debug|Any CPU.Build.0 = Debug|Any CPU {BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Release|Any CPU.ActiveCfg = Release|Any CPU {BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Release|Any CPU.Build.0 = Release|Any CPU + {23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -141,6 +147,7 @@ Global {74422E64-29FE-4287-A86E-741D1DFF6698} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB} {9C0DFC90-1AF9-424A-B5FB-2A7C3611970C} = {74422E64-29FE-4287-A86E-741D1DFF6698} {BDF62A19-FFBD-4EE1-A07A-68472E680A95} = {9932C965-8B38-4F70-9E43-86DC56860E2B} + {23EA8735-DB2E-4599-8902-8FCBCBE4799C} = {655E719B-4A3E-467C-A541-E0770AB81DE1} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E} diff --git a/src/Pole.Core/Abstraction/IEventTypeFinder.cs b/src/Pole.Core/Abstraction/IEventTypeFinder.cs new file mode 100644 index 0000000..7c8084c --- /dev/null +++ b/src/Pole.Core/Abstraction/IEventTypeFinder.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Abstraction +{ + public interface IEventTypeFinder + { + Type FindType(string code); + string GetCode(Type type); + } +} diff --git a/src/Pole.Core/Abstraction/ITypeFinder.cs b/src/Pole.Core/Abstraction/ITypeFinder.cs deleted file mode 100644 index 23aa2ef..0000000 --- a/src/Pole.Core/Abstraction/ITypeFinder.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Core.Abstraction -{ - public interface ITypeFinder - { - Type FindType(string code); - string GetCode(Type type); - } -} diff --git a/src/Pole.Core/Consts.cs b/src/Pole.Core/Consts.cs index 03081e2..94d54df 100644 --- a/src/Pole.Core/Consts.cs +++ b/src/Pole.Core/Consts.cs @@ -10,5 +10,7 @@ namespace Pole.Core public static ValueTask ValueTaskDone = new ValueTask(); public const string ConsumerRetryTimesStr = "pole-consumer-retry-times"; public const string ConsumerExceptionDetailsStr = "pole-consumer-exception-details"; + public const string EventHandlerMethodName = "EventHandle"; + public const string BatchEventsHandlerMethodName = "BatchEventsHandler"; } } diff --git a/src/Pole.Core/EventBus/Bus.cs b/src/Pole.Core/EventBus/Bus.cs new file mode 100644 index 0000000..be2f585 --- /dev/null +++ b/src/Pole.Core/EventBus/Bus.cs @@ -0,0 +1,37 @@ +using Pole.Core.Abstraction; +using Pole.Core.EventBus.Event; +using Pole.Core.Serialization; +using Pole.Core.Utils.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Core.EventBus +{ + class Bus : IBus + { + private readonly IProducer producer; + private readonly IEventTypeFinder eventTypeFinder; + private readonly ISerializer serializer; + private readonly ISnowflakeIdGenerator snowflakeIdGenerator; + public Bus(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator) + { + this.producer = producer; + this.eventTypeFinder = eventTypeFinder; + this.serializer = serializer; + this.snowflakeIdGenerator = snowflakeIdGenerator; + } + public async Task Publish(object @event, CancellationToken cancellationToken = default) + { + var eventType = @event.GetType(); + var eventTypeCode = eventTypeFinder.GetCode(eventType); + var eventId = snowflakeIdGenerator.NextId(); + var bytesTransport = new EventBytesTransport(eventTypeCode, eventId, serializer.SerializeToUtf8Bytes(@event, eventType)); + var bytes = bytesTransport.GetBytes(); + await producer.Publish(bytes); + return true; + } + } +} diff --git a/src/Pole.Core/EventBus/Event/EventBytesTransport.cs b/src/Pole.Core/EventBus/Event/EventBytesTransport.cs index 0341fcb..48cd12b 100644 --- a/src/Pole.Core/EventBus/Event/EventBytesTransport.cs +++ b/src/Pole.Core/EventBus/Event/EventBytesTransport.cs @@ -8,25 +8,20 @@ namespace Pole.Core.EventBus.Event { public readonly struct EventBytesTransport { - public EventBytesTransport(string eventCode, object grainId, byte[] baseBytes, byte[] eventBytes) + public EventBytesTransport(string eventCode, string eventId, byte[] eventBytes) { EventTypeCode = eventCode; - GrainId = grainId; - BaseBytes = baseBytes; EventBytes = eventBytes; + EventId = eventId; } /// - /// 事件TypeCode - /// - public string EventTypeCode { get; } - /// - /// 事件GrainId + /// 每个类型的Event 全局唯一 /// - public object GrainId { get; } + public string EventId { get; } /// - /// 事件base信息的bytes + /// 事件TypeCode /// - public byte[] BaseBytes { get; } + public string EventTypeCode { get; } /// /// 事件本身的bytes /// @@ -34,92 +29,31 @@ namespace Pole.Core.EventBus.Event public byte[] GetBytes() { var eventTypeBytes = Encoding.UTF8.GetBytes(EventTypeCode); - byte[] actorIdBytes; - var strId = GrainId as string; - actorIdBytes = Encoding.UTF8.GetBytes(strId); + var eventIdBytes = Encoding.UTF8.GetBytes(EventId); using var ms = new PooledMemoryStream(); ms.WriteByte((byte)TransportType.Event); ms.Write(BitConverter.GetBytes((ushort)eventTypeBytes.Length)); - ms.Write(BitConverter.GetBytes((ushort)actorIdBytes.Length)); - ms.Write(BitConverter.GetBytes((ushort)BaseBytes.Length)); + ms.Write(BitConverter.GetBytes((ushort)eventIdBytes.Length)); ms.Write(BitConverter.GetBytes(EventBytes.Length)); ms.Write(eventTypeBytes); - ms.Write(actorIdBytes); - ms.Write(BaseBytes); + ms.Write(eventIdBytes); ms.Write(EventBytes); return ms.ToArray(); } - public static (bool success, PrimaryKey actorId) GetActorId(byte[] bytes) - { - if (bytes[0] == (byte)TransportType.Event) - { - var bytesSpan = bytes.AsSpan(); - var eventTypeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort))); - var actorIdBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort))); - if (typeof(PrimaryKey) == typeof(long)) - { - var id = BitConverter.ToInt64(bytesSpan.Slice(3 * sizeof(ushort) + 1 + sizeof(int) + eventTypeLength, actorIdBytesLength)); - if (id is PrimaryKey actorId) - return (true, actorId); - } - else - { - var id = Encoding.UTF8.GetString(bytesSpan.Slice(3 * sizeof(ushort) + 1 + sizeof(int) + eventTypeLength, actorIdBytesLength)); - if (id is PrimaryKey actorId) - return (true, actorId); - } - } - return (false, default); - } - public static (bool success, EventBytesTransport transport) FromBytesWithNoId(byte[] bytes) + public static (bool success, EventBytesTransport transport) FromBytes(byte[] bytes) { if (bytes[0] == (byte)TransportType.Event) { var bytesSpan = bytes.AsSpan(); - var eventTypeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort))); - var actorIdBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort))); - var baseBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(2 * sizeof(ushort) + 1, sizeof(ushort))); - var eventBytesLength = BitConverter.ToInt32(bytesSpan.Slice(3 * sizeof(ushort) + 1, sizeof(int))); - var skipLength = 3 * sizeof(ushort) + 1 + sizeof(int); + var eventTypeCodeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort))); + var eventIdLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort))); + var eventBytesLength = BitConverter.ToInt32(bytesSpan.Slice(1 + 2 * sizeof(ushort), sizeof(int))); + var skipLength = 2 * sizeof(ushort) + 1 + sizeof(int); return (true, new EventBytesTransport( - Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeLength)), - null, - bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength, baseBytesLength).ToArray(), - bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength + baseBytesLength, eventBytesLength).ToArray() - )); - } - return (false, default); - } - public static (bool success, EventBytesTransport transport) FromBytes(byte[] bytes) - { - if (bytes[0] == (byte)TransportType.Event) - { - var bytesSpan = bytes.AsSpan(); - var eventTypeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort))); - var actorIdBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort))); - var baseBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(2 * sizeof(ushort) + 1, sizeof(ushort))); - var eventBytesLength = BitConverter.ToInt32(bytesSpan.Slice(3 * sizeof(ushort) + 1, sizeof(int))); - var skipLength = 3 * sizeof(ushort) + 1 + sizeof(int); - if (typeof(PrimaryKey) == typeof(long)) - { - var actorId = BitConverter.ToInt64(bytesSpan.Slice(3 * sizeof(ushort) + 1 + sizeof(int) + eventTypeLength, actorIdBytesLength)); - return (true, new EventBytesTransport( - Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeLength)), - actorId, - bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength, baseBytesLength).ToArray(), - bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength + baseBytesLength, eventBytesLength).ToArray() - )); - } - else - { - var actorId = Encoding.UTF8.GetString(bytesSpan.Slice(skipLength + eventTypeLength, actorIdBytesLength)); - return (true, new EventBytesTransport( - Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeLength)), - actorId, - bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength, baseBytesLength).ToArray(), - bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength + baseBytesLength, eventBytesLength).ToArray() + Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeCodeLength)), + Encoding.UTF8.GetString(bytesSpan.Slice(skipLength + eventTypeCodeLength, eventIdLength)), + bytesSpan.Slice(skipLength + eventTypeCodeLength + eventIdLength, eventBytesLength).ToArray() )); - } } return (false, default); } diff --git a/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs b/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs index e125507..bb39137 100644 --- a/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs +++ b/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs @@ -1,21 +1,103 @@ -using Orleans.Concurrency; +using Microsoft.Extensions.Logging; +using Orleans.Concurrency; +using Pole.Core.Abstraction; +using Pole.Core.EventBus.Event; +using Pole.Core.Serialization; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using System.Reflection.Emit; +using System.Linq.Expressions; +using System.Linq; +using Pole.Core.Exceptions; namespace Pole.Core.EventBus.EventHandler { + /// + /// + /// public class PoleEventHandler : PoleEventHandlerBase { - public override Task Invoke(Immutable> bytes) + private IEventTypeFinder eventTypeFinder; + private ISerializer serializer; + private ILogger logger; + private Type grainType; + + public PoleEventHandler() + { + grainType = GetType(); + DependencyInjection(); + } + public override async Task OnActivateAsync() + { + await DependencyInjection(); + await base.OnActivateAsync(); + } + protected virtual Task DependencyInjection() { - throw new NotImplementedException(); + //ConfigOptions = ServiceProvider.GetOptionsByName(typeof(MainGrain).FullName); + serializer = ServiceProvider.GetService(); + eventTypeFinder = ServiceProvider.GetService(); + logger = (ILogger)ServiceProvider.GetService(typeof(ILogger<>).MakeGenericType(grainType)); + return Task.CompletedTask; } - public override Task Invoke(Immutable bytes) + public override Task Invoke(EventBytesTransport transport) { - throw new NotImplementedException(); + var eventType = eventTypeFinder.FindType(transport.EventTypeCode); + var method = typeof(ClusterClientExtensions).GetMethod(Consts.EventHandlerMethodName, new Type[] { eventType }); + if (method == null) + { + throw new EventHandlerTargetMethodNotFoundException(Consts.EventHandlerMethodName, eventType.Name); + } + var data = serializer.Deserialize(transport.EventBytes, eventType); + var eventHandlerType = this.GetType(); + var eventHandlerObjectParams = Expression.Parameter(typeof(object), "eventHandler"); + var eventHandlerParams = Expression.Convert(eventHandlerObjectParams, eventHandlerType); + var eventObjectParams = Expression.Parameter(typeof(object), "event"); + var eventParams = Expression.Convert(eventObjectParams, eventType); + + var body = Expression.Call(method, eventHandlerParams, eventParams); + var func = Expression.Lambda>(body, true, eventHandlerObjectParams, eventObjectParams).Compile(); + var result = func(this, data); + logger.LogTrace("Invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(data)); + return result; + } + + public override Task Invoke(List transports) + { + if (transports.Count() != 0) + { + var firstTransport = transports.First(); + var eventType = eventTypeFinder.FindType(firstTransport.EventTypeCode); + var method = typeof(ClusterClientExtensions).GetMethod(Consts.BatchEventsHandlerMethodName, new Type[] { eventType }); + if (method == null) + { + var tasks = transports.Select(transport => Invoke(transport)); + return Task.WhenAll(tasks); + } + var datas = transports.Select(transport => serializer.Deserialize(firstTransport.EventBytes, eventType)).ToList(); + var eventHandlerType = this.GetType(); + var eventHandlerObjectParams = Expression.Parameter(typeof(object), "eventHandler"); + var eventHandlerParams = Expression.Convert(eventHandlerObjectParams, eventHandlerType); + var eventObjectParams = Expression.Parameter(typeof(object), "events"); + var eventsType = typeof(List<>).MakeGenericType(eventType); + var eventsParams = Expression.Convert(eventObjectParams, eventsType); + + var body = Expression.Call(method, eventHandlerParams, eventsParams); + var func = Expression.Lambda>(body, true, eventHandlerObjectParams, eventObjectParams).Compile(); + var result = func(this, datas); + logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(datas)); + return result; + } + else + { + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation($"{nameof(EventBytesTransport.FromBytes)} failed"); + return Task.CompletedTask; + } } } } diff --git a/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs b/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs index 68b9bad..a45f448 100644 --- a/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs +++ b/src/Pole.Core/EventBus/EventHandler/PoleEventHandlerBase.cs @@ -1,5 +1,6 @@ using Orleans; using Orleans.Concurrency; +using Pole.Core.EventBus.Event; using System; using System.Collections.Generic; using System.Text; @@ -9,7 +10,7 @@ namespace Pole.Core.EventBus.EventHandler { public abstract class PoleEventHandlerBase : Grain { - public abstract Task Invoke(Immutable bytes); - public abstract Task Invoke(Immutable> bytes); + public abstract Task Invoke(EventBytesTransport transport); + public abstract Task Invoke(List transports); } } diff --git a/src/Pole.Core/EventBus/IBus.cs b/src/Pole.Core/EventBus/IBus.cs new file mode 100644 index 0000000..de0f234 --- /dev/null +++ b/src/Pole.Core/EventBus/IBus.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Core.EventBus +{ + public interface IBus + { + Task Publish(object @event, CancellationToken cancellationToken = default); + } +} diff --git a/src/Pole.Core/EventBus/IGrainID.cs b/src/Pole.Core/EventBus/IGrainID.cs index 6397084..ea6edf7 100644 --- a/src/Pole.Core/EventBus/IGrainID.cs +++ b/src/Pole.Core/EventBus/IGrainID.cs @@ -4,6 +4,6 @@ namespace Pole.Core.EventBus { public interface IGrainID { - Type GrainType { get; } + Type EventHandlerType { get; } } } diff --git a/src/Pole.Core/EventBus/IProducer.cs b/src/Pole.Core/EventBus/IProducer.cs index 82e5fee..dffc879 100644 --- a/src/Pole.Core/EventBus/IProducer.cs +++ b/src/Pole.Core/EventBus/IProducer.cs @@ -4,6 +4,6 @@ namespace Pole.Core.EventBus { public interface IProducer { - ValueTask Publish(byte[] bytes, string hashKey); + ValueTask Publish(byte[] bytes); } } diff --git a/src/Pole.Core/EventBus/ObserverUnit.cs b/src/Pole.Core/EventBus/ObserverUnit.cs index 959ec55..d89eecf 100644 --- a/src/Pole.Core/EventBus/ObserverUnit.cs +++ b/src/Pole.Core/EventBus/ObserverUnit.cs @@ -20,21 +20,21 @@ namespace Pole.Core.EventBus { readonly IServiceProvider serviceProvider; readonly ISerializer serializer; - readonly ITypeFinder typeFinder; + readonly IEventTypeFinder typeFinder; readonly IClusterClient clusterClient; Func eventHandler; Func, Task> batchEventHandler; protected ILogger Logger { get; private set; } - public Type GrainType { get; } + public Type EventHandlerType { get; } - public ObserverUnit(IServiceProvider serviceProvider, Type grainType) + public ObserverUnit(IServiceProvider serviceProvider, Type eventHandlerType) { this.serviceProvider = serviceProvider; clusterClient = serviceProvider.GetService(); serializer = serviceProvider.GetService(); - typeFinder = serviceProvider.GetService(); + typeFinder = serviceProvider.GetService(); Logger = serviceProvider.GetService>>(); - GrainType = grainType; + EventHandlerType = eventHandlerType; } public static ObserverUnit From(IServiceProvider serviceProvider) where Grain : Orleans.Grain { @@ -50,116 +50,58 @@ namespace Pole.Core.EventBus { return batchEventHandler; } - public ObserverUnit UnreliableObserver( - Func, ValueTask> handler) - { - eventHandler = EventHandler; - batchEventHandler = BatchEventHandler; - return this; - //内部函数 - Task EventHandler(byte[] bytes) - { - var (success, transport) = EventBytesTransport.FromBytes(bytes); - if (success) - { - var data = serializer.Deserialize(transport.EventBytes, typeFinder.FindType(transport.EventTypeCode)); - if (data is IEvent @event && transport.GrainId is PrimaryKey actorId) - { - var eventBase = EventBase.FromBytes(transport.BaseBytes); - var tellTask = handler(serviceProvider, new FullyEvent - { - StateId = actorId, - Base = eventBase, - Event = @event - }); - if (!tellTask.IsCompletedSuccessfully) - return tellTask.AsTask(); - } - } - return Task.CompletedTask; - } - Task BatchEventHandler(List list) - { - var groups = - list.Select(b => EventBytesTransport.FromBytes(b)) - .Where(o => o.success) - .Select(o => o.transport) - .GroupBy(o => o.GrainId); - return Task.WhenAll(groups.Select(async kv => - { - foreach (var transport in kv) - { - var data = serializer.Deserialize(transport.EventBytes, typeFinder.FindType(transport.EventTypeCode)); - if (data is IEvent @event && transport.GrainId is PrimaryKey actorId) - { - var eventBase = EventBase.FromBytes(transport.BaseBytes); - var tellTask = handler(serviceProvider, new FullyEvent - { - StateId = actorId, - Base = eventBase, - Event = @event - }); - if (!tellTask.IsCompletedSuccessfully) - await tellTask; - } - } - })); - } - } - public void Observer(Type observerType) + + public void Observer() { - if (!typeof(PoleEventHandlerBase).IsAssignableFrom(observerType)) - throw new NotSupportedException($"{observerType.FullName} must inheritance from PoleEventHandler"); + if (!typeof(PoleEventHandlerBase).IsAssignableFrom(EventHandlerType)) + throw new NotSupportedException($"{EventHandlerType.FullName} must inheritance from PoleEventHandler"); eventHandler = EventHandler; batchEventHandler = BatchEventHandler; //内部函数 Task EventHandler(byte[] bytes) { - var (success, actorId) = EventBytesTransport.GetActorId(bytes); + var (success, transport) = EventBytesTransport.FromBytes(bytes); if (success) { - return GetObserver(observerType, actorId).Invoke(new Immutable(bytes)); - + return GetObserver(EventHandlerType, transport.EventId).Invoke(transport); } else { if (Logger.IsEnabled(LogLevel.Error)) - Logger.LogError($"{nameof(EventBytesTransport.GetActorId)} failed"); + Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event"); } return Task.CompletedTask; } Task BatchEventHandler(List list) { - var groups = list.Select(bytes => + var transports = list.Select(bytes => { - var (success, GrainId) = EventBytesTransport.GetActorId(bytes); + var (success, transport) = EventBytesTransport.FromBytes(bytes); if (!success) { if (Logger.IsEnabled(LogLevel.Error)) - Logger.LogError($"{nameof(EventBytesTransport.GetActorId)} failed"); + Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event"); } - return (success, GrainId, bytes); - }).Where(o => o.success).GroupBy(o => o.GrainId); - return Task.WhenAll(groups.Select(kv => - { - var items = kv.Select(item => item.bytes).ToList(); - return GetObserver(observerType, kv.Key).Invoke(new Immutable>(items)); - })); + return (success, transport); + }).Where(o => o.success) + .Select(o => (o.transport)) + .ToList(); + // 批量处理的时候 grain Id 取第一个 event的id + return GetObserver(EventHandlerType, transports.First().EventId).Invoke(transports); } } - static readonly ConcurrentDictionary> _observerGeneratorDict = new ConcurrentDictionary>(); - private PoleEventHandlerBase GetObserver(Type ObserverType, PrimaryKey primaryKey) + static readonly ConcurrentDictionary> _observerGeneratorDict = new ConcurrentDictionary>(); + private PoleEventHandlerBase GetObserver(Type ObserverType, string primaryKey) { var func = _observerGeneratorDict.GetOrAdd(ObserverType, key => { var clientType = typeof(IClusterClient); var clientParams = Expression.Parameter(clientType, "client"); - var primaryKeyParams = Expression.Parameter(typeof(PrimaryKey), "primaryKey"); + var primaryKeyParams = Expression.Parameter(typeof(string), "primaryKey"); var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix"); - var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(PrimaryKey), typeof(string) }); + var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) }); var body = Expression.Call(method.MakeGenericMethod(ObserverType), clientParams, primaryKeyParams, grainClassNamePrefixParams); - return Expression.Lambda>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile(); + return Expression.Lambda>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile(); }); return func(clusterClient, primaryKey, null); } diff --git a/src/Pole.Core/EventBus/ObserverUnitContainer.cs b/src/Pole.Core/EventBus/ObserverUnitContainer.cs index aca9142..3b5c0f7 100644 --- a/src/Pole.Core/EventBus/ObserverUnitContainer.cs +++ b/src/Pole.Core/EventBus/ObserverUnitContainer.cs @@ -24,9 +24,9 @@ namespace Pole.Core.EventBus { foreach (var attribute in type.GetCustomAttributes(false)) { - if (attribute is EventHandlerAttribute observer) + if (attribute is EventHandlerAttribute eventHandlerAttribute) { - eventHandlerList.Add((type, observer)); + eventHandlerList.Add((type, eventHandlerAttribute)); break; } } @@ -36,7 +36,7 @@ namespace Pole.Core.EventBus { var unitType = typeof(ObserverUnit<>).MakeGenericType(new Type[] { typeof(string) }); var unit = (ObserverUnit)Activator.CreateInstance(unitType, serviceProvider, eventHandler.Item1); - + unit.Observer(); Register(eventHandler.Item2.EventName, unit); } } @@ -72,7 +72,7 @@ namespace Pole.Core.EventBus } if (!unitDict.TryAdd(observerName, new List { observerUnit })) { - throw new ObserverUnitRepeatedException(observerUnit.GrainType.FullName); + throw new ObserverUnitRepeatedException(observerUnit.EventHandlerType.FullName); } } diff --git a/src/Pole.Core/Exceptions/EventHandlerTargetMethodNotFoundException.cs b/src/Pole.Core/Exceptions/EventHandlerTargetMethodNotFoundException.cs new file mode 100644 index 0000000..2cc81fe --- /dev/null +++ b/src/Pole.Core/Exceptions/EventHandlerTargetMethodNotFoundException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Exceptions +{ + public class EventHandlerTargetMethodNotFoundException: Exception + { + public EventHandlerTargetMethodNotFoundException(string methodName,string eventTypeName):base($"EventHandler method:{methodName} not found when eventHandler invoke , eventType:{eventTypeName}") + { + + } + } +} diff --git a/src/Pole.Core/Serialization/EventTypeFinder.cs b/src/Pole.Core/Serialization/EventTypeFinder.cs new file mode 100644 index 0000000..30abaea --- /dev/null +++ b/src/Pole.Core/Serialization/EventTypeFinder.cs @@ -0,0 +1,63 @@ +using Microsoft.Extensions.Logging; +using Pole.Core.Abstraction; +using Pole.Core.EventBus.Event; +using Pole.Core.Exceptions; +using Pole.Core.Utils; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Serialization +{ + public class EventTypeFinder : IEventTypeFinder + { + private readonly ConcurrentDictionary codeDict = new ConcurrentDictionary(); + private readonly ConcurrentDictionary typeDict = new ConcurrentDictionary(); + readonly ILogger logger; + public EventTypeFinder(ILogger logger) + { + this.logger = logger; + var baseEventType = typeof(IEvent); + foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger)) + { + foreach (var type in assembly.GetTypes()) + { + if (baseEventType.IsAssignableFrom(type)) + { + typeDict.TryAdd(type, type.FullName); + + if (!codeDict.TryAdd(type.FullName, type)) + { + throw new TypeCodeRepeatedException(type.FullName, type.FullName); + } + } + } + } + } + /// + /// 通过code获取Type对象 + /// + /// + /// + public Type FindType(string typeCode) + { + if (codeDict.TryGetValue(typeCode, out Type type)) + { + return type; + } + throw new UnknowTypeCodeException(typeCode); + } + /// + /// 获取Type对象的code字符串 + /// + /// + /// + public string GetCode(Type type) + { + if (!typeDict.TryGetValue(type, out var value)) + return type.FullName; + return value; + } + } +} diff --git a/src/Pole.Core/Utils/Abstraction/IInstanceIPV4_16IdGeneratorIdSolver.cs b/src/Pole.Core/Utils/Abstraction/IInstanceIPV4_16IdGeneratorIdSolver.cs new file mode 100644 index 0000000..73f9df7 --- /dev/null +++ b/src/Pole.Core/Utils/Abstraction/IInstanceIPV4_16IdGeneratorIdSolver.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Utils.Abstraction +{ + public interface IGeneratorIdSolver + { + int GetGeneratorId(); + } +} diff --git a/src/Pole.Core/Utils/Abstraction/ISnowflakeIdGenerator.cs b/src/Pole.Core/Utils/Abstraction/ISnowflakeIdGenerator.cs new file mode 100644 index 0000000..b39bfc1 --- /dev/null +++ b/src/Pole.Core/Utils/Abstraction/ISnowflakeIdGenerator.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Utils.Abstraction +{ + public interface ISnowflakeIdGenerator + { + public string NextId(); + } +} diff --git a/src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs b/src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs deleted file mode 100644 index fa9ec63..0000000 --- a/src/Pole.Core/Utils/Emit/SwitchMethodEmit.cs +++ /dev/null @@ -1,37 +0,0 @@ -using System; -using System.Reflection; -using System.Reflection.Emit; - -namespace Pole.Core.Utils.Emit -{ - /// - /// 用来生成模式匹配方法调用的方法信息 - /// - public class SwitchMethodEmit - { - /// - /// 方法 - /// - public MethodInfo Mehod { get; set; } - /// - /// 匹配的类型 - /// - public Type CaseType { get; set; } - /// - /// 局部变量 - /// - public LocalBuilder DeclareLocal { get; set; } - /// - /// 方法调用Lable - /// - public Label Lable { get; set; } - /// - /// 方法的参数 - /// - public ParameterInfo[] Parameters { get; set; } - /// - /// 方法在类中的顺序 - /// - public int Index { get; set; } - } -} diff --git a/src/Pole.Core/Utils/InstanceIPV4_16IdGeneratorIdSolver.cs b/src/Pole.Core/Utils/InstanceIPV4_16IdGeneratorIdSolver.cs new file mode 100644 index 0000000..abadccf --- /dev/null +++ b/src/Pole.Core/Utils/InstanceIPV4_16IdGeneratorIdSolver.cs @@ -0,0 +1,35 @@ +using Pole.Core.Utils.Abstraction; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net.NetworkInformation; +using System.Net.Sockets; +using System.Text; + +namespace Pole.Core.Utils +{ + public class InstanceIPV4_16IdGeneratorIdSolver : IGeneratorIdSolver + { + private int generatorId; + public InstanceIPV4_16IdGeneratorIdSolver() + { + NetworkInterface networkInterface = NetworkInterface.GetAllNetworkInterfaces() + .OrderByDescending(c => c.Speed) + .Where(m => m.NetworkInterfaceType != NetworkInterfaceType.Loopback && m.OperationalStatus == OperationalStatus.Up) + .FirstOrDefault(); + var props = networkInterface.GetIPProperties(); + // get first IPV4 address assigned to this interface + var firstIpV4Address = props.UnicastAddresses + .Where(c => c.Address.AddressFamily == AddressFamily.InterNetwork) + .Select(c => c.Address) + .FirstOrDefault(); + + var bytes = firstIpV4Address.GetAddressBytes(); + generatorId = BitConverter.ToInt32(bytes, 2); + } + public int GetGeneratorId() + { + return generatorId; + } + } +} diff --git a/src/Pole.Core/Utils/SnowflakeIdGenerator.cs b/src/Pole.Core/Utils/SnowflakeIdGenerator.cs new file mode 100644 index 0000000..ea6bfa8 --- /dev/null +++ b/src/Pole.Core/Utils/SnowflakeIdGenerator.cs @@ -0,0 +1,101 @@ +using Pole.Core.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Utils.Abstraction +{ + public class SnowflakeIdGenerator : ISnowflakeIdGenerator + { + + private static readonly DateTime Jan1st1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc); + private int generatorIdBits; + private long twepoch; + private int maxGeneratorId; + private const int SequenceAndGeneratorIdBits = 64 - 1 - 41; + /// + /// 这里的位数决定 每毫秒能生成的最大个数 + /// + private int sequenceBits; + private int generatorIdShift; + private const int TimestampLeftShift = SequenceAndGeneratorIdBits; + private long sequenceMask; + public long GeneratorId { get; private set; } + + // 毫秒内序列(0~4095) + public long Sequence { get; private set; } + + // 上次生成ID的时间截 + public long LastTimestamp { get; private set; } + + /// + /// 时间戳为41位,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69 + /// + /// + /// + public SnowflakeIdGenerator(DateTime beginTime, int generatorIdBits, long generatorId) + { + twepoch = Convert.ToInt64((beginTime.ToUniversalTime() - Jan1st1970).TotalMilliseconds); + this.generatorIdBits = generatorIdBits; + maxGeneratorId = -1 ^ (-1 << this.generatorIdBits); + sequenceBits = SequenceAndGeneratorIdBits - generatorIdBits; + generatorIdShift = sequenceBits; + sequenceMask = -1L ^ (-1L << sequenceBits); + GeneratorId = generatorId; + Sequence = 0L; + LastTimestamp = -1L; + } + public string NextId() + { + lock (this) + { + long timestamp = GetCurrentTimestamp(); + if (timestamp > LastTimestamp) //时间戳改变,毫秒内序列重置 + { + Sequence = 0L; + } + else if (timestamp == LastTimestamp) //如果是同一时间生成的,则进行毫秒内序列 + { + Sequence = (Sequence + 1) & sequenceMask; + if (Sequence == 0) //毫秒内序列溢出 + { + timestamp = GetNextTimestamp(LastTimestamp); //阻塞到下一个毫秒,获得新的时间戳 + } + } + else //当前时间小于上一次ID生成的时间戳,证明系统时钟被回拨,此时需要做回拨处理 + { + Sequence = (Sequence + 1) & sequenceMask; + if (Sequence > 0) + { + timestamp = LastTimestamp; //停留在最后一次时间戳上,等待系统时间追上后即完全度过了时钟回拨问题。 + } + else //毫秒内序列溢出 + { + timestamp = LastTimestamp + 1; //直接进位到下一个毫秒 + } + } + + LastTimestamp = timestamp; //上次生成ID的时间截 + + //移位并通过或运算拼到一起组成64位的ID + var id = ((timestamp - twepoch) << TimestampLeftShift) + | (GeneratorId << generatorIdShift) + | Sequence; + return id.ToString(); + } + } + private static long GetCurrentTimestamp() + { + return (long)(DateTime.UtcNow - Jan1st1970).TotalMilliseconds; + } + private static long GetNextTimestamp(long lastTimestamp) + { + long timestamp = GetCurrentTimestamp(); + while (timestamp <= lastTimestamp) + { + timestamp = GetCurrentTimestamp(); + } + return timestamp; + } + } +} diff --git a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs index 358cbdc..5735443 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs @@ -49,10 +49,7 @@ namespace Pole.EventBus.RabbitMQ /// public bool Persistent { get; set; } public List Consumers { get; set; } = new List(); - public string GetRoute(string key) - { - return RoutePrefix; - } + public RabbitEventBus BindEvent(Type eventType, string eventName) { Event = eventType; diff --git a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs index 5c0b4f5..54cbb57 100644 --- a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs +++ b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs @@ -15,10 +15,10 @@ namespace Pole.EventBus.RabbitMQ this.publisher = publisher; this.rabbitMQClient = rabbitMQClient; } - public ValueTask Publish(byte[] bytes, string hashKey) + public ValueTask Publish(byte[] bytes) { using var channel = rabbitMQClient.PullChannel(); - channel.Publish(bytes, publisher.Exchange, publisher.GetRoute(hashKey), publisher.Persistent); + channel.Publish(bytes, publisher.Exchange, string.Empty, publisher.Persistent); return Consts.ValueTaskDone; } } diff --git a/test/Pole.Core.Test/Pole.Core.Test.csproj b/test/Pole.Core.Test/Pole.Core.Test.csproj new file mode 100644 index 0000000..77049be --- /dev/null +++ b/test/Pole.Core.Test/Pole.Core.Test.csproj @@ -0,0 +1,20 @@ + + + + netcoreapp3.1 + false + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + diff --git a/test/Pole.Core.Test/Utils/InstanceIPV4_16IdGeneratorIdSolverTest.cs b/test/Pole.Core.Test/Utils/InstanceIPV4_16IdGeneratorIdSolverTest.cs new file mode 100644 index 0000000..bc09806 --- /dev/null +++ b/test/Pole.Core.Test/Utils/InstanceIPV4_16IdGeneratorIdSolverTest.cs @@ -0,0 +1,18 @@ +using Pole.Core.Utils; +using Pole.Core.Utils.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; +using Xunit; + +namespace Pole.Core.Test.Utils +{ + public class InstanceIPV4_16IdGeneratorIdSolverTest + { + [Fact] + public void GetIPv4AddressBytesTest() + { + IGeneratorIdSolver instanceIPV4_16IdGeneratorIdSolver = new InstanceIPV4_16IdGeneratorIdSolver(); + } + } +} diff --git a/test/Pole.Core.Test/Utils/SnowflakeIdGeneratorTest.cs b/test/Pole.Core.Test/Utils/SnowflakeIdGeneratorTest.cs new file mode 100644 index 0000000..01cc5a5 --- /dev/null +++ b/test/Pole.Core.Test/Utils/SnowflakeIdGeneratorTest.cs @@ -0,0 +1,15 @@ +using System; +using Xunit; + +namespace Pole.Core.Test +{ + public class SnowflakeIdGeneratorTest + { + [Fact] + public void MaxYears() + { + var years = -1L ^ (-1L << 6); + Console.WriteLine(years); + } + } +} -- libgit2 0.25.0