Commit 988776d9 by 丁松杰

完成 consumer 消费时 调用 orleans grain , grain 的id 由 eventid 决定,每一个类型的 event 每毫秒支持生成…

完成 consumer 消费时 调用 orleans grain , grain 的id 由 eventid 决定,每一个类型的 event 每毫秒支持生成 64个 event ,并且在 k8s集群里 全局唯一
parent 021ebde6
......@@ -45,7 +45,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "integrationEvents", "integr
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Product.IntegrationEvents", "samples\intergrationEvents\Product.IntegrationEvents\Product.IntegrationEvents.csproj", "{9C0DFC90-1AF9-424A-B5FB-2A7C3611970C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.EventBus.Rabbitmq", "src\Pole.EventBus.Rabbitmq\Pole.EventBus.Rabbitmq.csproj", "{BDF62A19-FFBD-4EE1-A07A-68472E680A95}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.EventBus.Rabbitmq", "src\Pole.EventBus.Rabbitmq\Pole.EventBus.Rabbitmq.csproj", "{BDF62A19-FFBD-4EE1-A07A-68472E680A95}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Core.Test", "test\Pole.Core.Test\Pole.Core.Test.csproj", "{23EA8735-DB2E-4599-8902-8FCBCBE4799C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
......@@ -117,6 +119,10 @@ Global
{BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Release|Any CPU.Build.0 = Release|Any CPU
{23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -141,6 +147,7 @@ Global
{74422E64-29FE-4287-A86E-741D1DFF6698} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB}
{9C0DFC90-1AF9-424A-B5FB-2A7C3611970C} = {74422E64-29FE-4287-A86E-741D1DFF6698}
{BDF62A19-FFBD-4EE1-A07A-68472E680A95} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{23EA8735-DB2E-4599-8902-8FCBCBE4799C} = {655E719B-4A3E-467C-A541-E0770AB81DE1}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E}
......
......@@ -4,7 +4,7 @@ using System.Text;
namespace Pole.Core.Abstraction
{
public interface ITypeFinder
public interface IEventTypeFinder
{
Type FindType(string code);
string GetCode(Type type);
......
......@@ -10,5 +10,7 @@ namespace Pole.Core
public static ValueTask ValueTaskDone = new ValueTask();
public const string ConsumerRetryTimesStr = "pole-consumer-retry-times";
public const string ConsumerExceptionDetailsStr = "pole-consumer-exception-details";
public const string EventHandlerMethodName = "EventHandle";
public const string BatchEventsHandlerMethodName = "BatchEventsHandler";
}
}
using Pole.Core.Abstraction;
using Pole.Core.EventBus.Event;
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
{
class Bus : IBus
{
private readonly IProducer producer;
private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer;
private readonly ISnowflakeIdGenerator snowflakeIdGenerator;
public Bus(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator)
{
this.producer = producer;
this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer;
this.snowflakeIdGenerator = snowflakeIdGenerator;
}
public async Task<bool> Publish(object @event, CancellationToken cancellationToken = default)
{
var eventType = @event.GetType();
var eventTypeCode = eventTypeFinder.GetCode(eventType);
var eventId = snowflakeIdGenerator.NextId();
var bytesTransport = new EventBytesTransport(eventTypeCode, eventId, serializer.SerializeToUtf8Bytes(@event, eventType));
var bytes = bytesTransport.GetBytes();
await producer.Publish(bytes);
return true;
}
}
}
......@@ -8,25 +8,20 @@ namespace Pole.Core.EventBus.Event
{
public readonly struct EventBytesTransport
{
public EventBytesTransport(string eventCode, object grainId, byte[] baseBytes, byte[] eventBytes)
public EventBytesTransport(string eventCode, string eventId, byte[] eventBytes)
{
EventTypeCode = eventCode;
GrainId = grainId;
BaseBytes = baseBytes;
EventBytes = eventBytes;
EventId = eventId;
}
/// <summary>
/// 事件TypeCode
/// </summary>
public string EventTypeCode { get; }
/// <summary>
/// 事件GrainId
/// 每个类型的Event 全局唯一
/// </summary>
public object GrainId { get; }
public string EventId { get; }
/// <summary>
/// 事件base信息的bytes
/// 事件TypeCode
/// </summary>
public byte[] BaseBytes { get; }
public string EventTypeCode { get; }
/// <summary>
/// 事件本身的bytes
/// </summary>
......@@ -34,92 +29,31 @@ namespace Pole.Core.EventBus.Event
public byte[] GetBytes()
{
var eventTypeBytes = Encoding.UTF8.GetBytes(EventTypeCode);
byte[] actorIdBytes;
var strId = GrainId as string;
actorIdBytes = Encoding.UTF8.GetBytes(strId);
var eventIdBytes = Encoding.UTF8.GetBytes(EventId);
using var ms = new PooledMemoryStream();
ms.WriteByte((byte)TransportType.Event);
ms.Write(BitConverter.GetBytes((ushort)eventTypeBytes.Length));
ms.Write(BitConverter.GetBytes((ushort)actorIdBytes.Length));
ms.Write(BitConverter.GetBytes((ushort)BaseBytes.Length));
ms.Write(BitConverter.GetBytes((ushort)eventIdBytes.Length));
ms.Write(BitConverter.GetBytes(EventBytes.Length));
ms.Write(eventTypeBytes);
ms.Write(actorIdBytes);
ms.Write(BaseBytes);
ms.Write(eventIdBytes);
ms.Write(EventBytes);
return ms.ToArray();
}
public static (bool success, PrimaryKey actorId) GetActorId<PrimaryKey>(byte[] bytes)
{
if (bytes[0] == (byte)TransportType.Event)
{
var bytesSpan = bytes.AsSpan();
var eventTypeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort)));
var actorIdBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort)));
if (typeof(PrimaryKey) == typeof(long))
{
var id = BitConverter.ToInt64(bytesSpan.Slice(3 * sizeof(ushort) + 1 + sizeof(int) + eventTypeLength, actorIdBytesLength));
if (id is PrimaryKey actorId)
return (true, actorId);
}
else
{
var id = Encoding.UTF8.GetString(bytesSpan.Slice(3 * sizeof(ushort) + 1 + sizeof(int) + eventTypeLength, actorIdBytesLength));
if (id is PrimaryKey actorId)
return (true, actorId);
}
}
return (false, default);
}
public static (bool success, EventBytesTransport transport) FromBytesWithNoId(byte[] bytes)
public static (bool success, EventBytesTransport transport) FromBytes(byte[] bytes)
{
if (bytes[0] == (byte)TransportType.Event)
{
var bytesSpan = bytes.AsSpan();
var eventTypeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort)));
var actorIdBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort)));
var baseBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(2 * sizeof(ushort) + 1, sizeof(ushort)));
var eventBytesLength = BitConverter.ToInt32(bytesSpan.Slice(3 * sizeof(ushort) + 1, sizeof(int)));
var skipLength = 3 * sizeof(ushort) + 1 + sizeof(int);
var eventTypeCodeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort)));
var eventIdLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort)));
var eventBytesLength = BitConverter.ToInt32(bytesSpan.Slice(1 + 2 * sizeof(ushort), sizeof(int)));
var skipLength = 2 * sizeof(ushort) + 1 + sizeof(int);
return (true, new EventBytesTransport(
Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeLength)),
null,
bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength, baseBytesLength).ToArray(),
bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength + baseBytesLength, eventBytesLength).ToArray()
));
}
return (false, default);
}
public static (bool success, EventBytesTransport transport) FromBytes<PrimaryKey>(byte[] bytes)
{
if (bytes[0] == (byte)TransportType.Event)
{
var bytesSpan = bytes.AsSpan();
var eventTypeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort)));
var actorIdBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort)));
var baseBytesLength = BitConverter.ToUInt16(bytesSpan.Slice(2 * sizeof(ushort) + 1, sizeof(ushort)));
var eventBytesLength = BitConverter.ToInt32(bytesSpan.Slice(3 * sizeof(ushort) + 1, sizeof(int)));
var skipLength = 3 * sizeof(ushort) + 1 + sizeof(int);
if (typeof(PrimaryKey) == typeof(long))
{
var actorId = BitConverter.ToInt64(bytesSpan.Slice(3 * sizeof(ushort) + 1 + sizeof(int) + eventTypeLength, actorIdBytesLength));
return (true, new EventBytesTransport(
Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeLength)),
actorId,
bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength, baseBytesLength).ToArray(),
bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength + baseBytesLength, eventBytesLength).ToArray()
));
}
else
{
var actorId = Encoding.UTF8.GetString(bytesSpan.Slice(skipLength + eventTypeLength, actorIdBytesLength));
return (true, new EventBytesTransport(
Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeLength)),
actorId,
bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength, baseBytesLength).ToArray(),
bytesSpan.Slice(skipLength + eventTypeLength + actorIdBytesLength + baseBytesLength, eventBytesLength).ToArray()
Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeCodeLength)),
Encoding.UTF8.GetString(bytesSpan.Slice(skipLength + eventTypeCodeLength, eventIdLength)),
bytesSpan.Slice(skipLength + eventTypeCodeLength + eventIdLength, eventBytesLength).ToArray()
));
}
}
return (false, default);
}
......
using Orleans.Concurrency;
using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Pole.Core.Abstraction;
using Pole.Core.EventBus.Event;
using Pole.Core.Serialization;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using System.Reflection.Emit;
using System.Linq.Expressions;
using System.Linq;
using Pole.Core.Exceptions;
namespace Pole.Core.EventBus.EventHandler
{
/// <summary>
///
/// </summary>
public class PoleEventHandler : PoleEventHandlerBase
{
public override Task Invoke(Immutable<List<byte[]>> bytes)
private IEventTypeFinder eventTypeFinder;
private ISerializer serializer;
private ILogger logger;
private Type grainType;
public PoleEventHandler()
{
grainType = GetType();
DependencyInjection();
}
public override async Task OnActivateAsync()
{
await DependencyInjection();
await base.OnActivateAsync();
}
protected virtual Task DependencyInjection()
{
throw new NotImplementedException();
//ConfigOptions = ServiceProvider.GetOptionsByName<CoreOptions>(typeof(MainGrain).FullName);
serializer = ServiceProvider.GetService<ISerializer>();
eventTypeFinder = ServiceProvider.GetService<IEventTypeFinder>();
logger = (ILogger)ServiceProvider.GetService(typeof(ILogger<>).MakeGenericType(grainType));
return Task.CompletedTask;
}
public override Task Invoke(Immutable<byte[]> bytes)
public override Task Invoke(EventBytesTransport transport)
{
throw new NotImplementedException();
var eventType = eventTypeFinder.FindType(transport.EventTypeCode);
var method = typeof(ClusterClientExtensions).GetMethod(Consts.EventHandlerMethodName, new Type[] { eventType });
if (method == null)
{
throw new EventHandlerTargetMethodNotFoundException(Consts.EventHandlerMethodName, eventType.Name);
}
var data = serializer.Deserialize(transport.EventBytes, eventType);
var eventHandlerType = this.GetType();
var eventHandlerObjectParams = Expression.Parameter(typeof(object), "eventHandler");
var eventHandlerParams = Expression.Convert(eventHandlerObjectParams, eventHandlerType);
var eventObjectParams = Expression.Parameter(typeof(object), "event");
var eventParams = Expression.Convert(eventObjectParams, eventType);
var body = Expression.Call(method, eventHandlerParams, eventParams);
var func = Expression.Lambda<Func<object, object, Task>>(body, true, eventHandlerObjectParams, eventObjectParams).Compile();
var result = func(this, data);
logger.LogTrace("Invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(data));
return result;
}
public override Task Invoke(List<EventBytesTransport> transports)
{
if (transports.Count() != 0)
{
var firstTransport = transports.First();
var eventType = eventTypeFinder.FindType(firstTransport.EventTypeCode);
var method = typeof(ClusterClientExtensions).GetMethod(Consts.BatchEventsHandlerMethodName, new Type[] { eventType });
if (method == null)
{
var tasks = transports.Select(transport => Invoke(transport));
return Task.WhenAll(tasks);
}
var datas = transports.Select(transport => serializer.Deserialize(firstTransport.EventBytes, eventType)).ToList();
var eventHandlerType = this.GetType();
var eventHandlerObjectParams = Expression.Parameter(typeof(object), "eventHandler");
var eventHandlerParams = Expression.Convert(eventHandlerObjectParams, eventHandlerType);
var eventObjectParams = Expression.Parameter(typeof(object), "events");
var eventsType = typeof(List<>).MakeGenericType(eventType);
var eventsParams = Expression.Convert(eventObjectParams, eventsType);
var body = Expression.Call(method, eventHandlerParams, eventsParams);
var func = Expression.Lambda<Func<object, object, Task>>(body, true, eventHandlerObjectParams, eventObjectParams).Compile();
var result = func(this, datas);
logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, Consts.EventHandlerMethodName, serializer.Serialize(datas));
return result;
}
else
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"{nameof(EventBytesTransport.FromBytes)} failed");
return Task.CompletedTask;
}
}
}
}
using Orleans;
using Orleans.Concurrency;
using Pole.Core.EventBus.Event;
using System;
using System.Collections.Generic;
using System.Text;
......@@ -9,7 +10,7 @@ namespace Pole.Core.EventBus.EventHandler
{
public abstract class PoleEventHandlerBase : Grain
{
public abstract Task Invoke(Immutable<byte[]> bytes);
public abstract Task Invoke(Immutable<List<byte[]>> bytes);
public abstract Task Invoke(EventBytesTransport transport);
public abstract Task Invoke(List<EventBytesTransport> transports);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
{
public interface IBus
{
Task<bool> Publish(object @event, CancellationToken cancellationToken = default);
}
}
......@@ -4,6 +4,6 @@ namespace Pole.Core.EventBus
{
public interface IGrainID
{
Type GrainType { get; }
Type EventHandlerType { get; }
}
}
......@@ -4,6 +4,6 @@ namespace Pole.Core.EventBus
{
public interface IProducer
{
ValueTask Publish(byte[] bytes, string hashKey);
ValueTask Publish(byte[] bytes);
}
}
......@@ -20,21 +20,21 @@ namespace Pole.Core.EventBus
{
readonly IServiceProvider serviceProvider;
readonly ISerializer serializer;
readonly ITypeFinder typeFinder;
readonly IEventTypeFinder typeFinder;
readonly IClusterClient clusterClient;
Func<byte[], Task> eventHandler;
Func<List<byte[]>, Task> batchEventHandler;
protected ILogger Logger { get; private set; }
public Type GrainType { get; }
public Type EventHandlerType { get; }
public ObserverUnit(IServiceProvider serviceProvider, Type grainType)
public ObserverUnit(IServiceProvider serviceProvider, Type eventHandlerType)
{
this.serviceProvider = serviceProvider;
clusterClient = serviceProvider.GetService<IClusterClient>();
serializer = serviceProvider.GetService<ISerializer>();
typeFinder = serviceProvider.GetService<ITypeFinder>();
typeFinder = serviceProvider.GetService<IEventTypeFinder>();
Logger = serviceProvider.GetService<ILogger<ObserverUnit<PrimaryKey>>>();
GrainType = grainType;
EventHandlerType = eventHandlerType;
}
public static ObserverUnit<PrimaryKey> From<Grain>(IServiceProvider serviceProvider) where Grain : Orleans.Grain
{
......@@ -50,116 +50,58 @@ namespace Pole.Core.EventBus
{
return batchEventHandler;
}
public ObserverUnit<PrimaryKey> UnreliableObserver(
Func<IServiceProvider,
FullyEvent<PrimaryKey>, ValueTask> handler)
{
eventHandler = EventHandler;
batchEventHandler = BatchEventHandler;
return this;
//内部函数
Task EventHandler(byte[] bytes)
{
var (success, transport) = EventBytesTransport.FromBytes<PrimaryKey>(bytes);
if (success)
{
var data = serializer.Deserialize(transport.EventBytes, typeFinder.FindType(transport.EventTypeCode));
if (data is IEvent @event && transport.GrainId is PrimaryKey actorId)
{
var eventBase = EventBase.FromBytes(transport.BaseBytes);
var tellTask = handler(serviceProvider, new FullyEvent<PrimaryKey>
{
StateId = actorId,
Base = eventBase,
Event = @event
});
if (!tellTask.IsCompletedSuccessfully)
return tellTask.AsTask();
}
}
return Task.CompletedTask;
}
Task BatchEventHandler(List<byte[]> list)
{
var groups =
list.Select(b => EventBytesTransport.FromBytes<PrimaryKey>(b))
.Where(o => o.success)
.Select(o => o.transport)
.GroupBy(o => o.GrainId);
return Task.WhenAll(groups.Select(async kv =>
{
foreach (var transport in kv)
{
var data = serializer.Deserialize(transport.EventBytes, typeFinder.FindType(transport.EventTypeCode));
if (data is IEvent @event && transport.GrainId is PrimaryKey actorId)
{
var eventBase = EventBase.FromBytes(transport.BaseBytes);
var tellTask = handler(serviceProvider, new FullyEvent<PrimaryKey>
{
StateId = actorId,
Base = eventBase,
Event = @event
});
if (!tellTask.IsCompletedSuccessfully)
await tellTask;
}
}
}));
}
}
public void Observer(Type observerType)
public void Observer()
{
if (!typeof(PoleEventHandlerBase).IsAssignableFrom(observerType))
throw new NotSupportedException($"{observerType.FullName} must inheritance from PoleEventHandler");
if (!typeof(PoleEventHandlerBase).IsAssignableFrom(EventHandlerType))
throw new NotSupportedException($"{EventHandlerType.FullName} must inheritance from PoleEventHandler");
eventHandler = EventHandler;
batchEventHandler = BatchEventHandler;
//内部函数
Task EventHandler(byte[] bytes)
{
var (success, actorId) = EventBytesTransport.GetActorId<PrimaryKey>(bytes);
var (success, transport) = EventBytesTransport.FromBytes(bytes);
if (success)
{
return GetObserver(observerType, actorId).Invoke(new Immutable<byte[]>(bytes));
return GetObserver(EventHandlerType, transport.EventId).Invoke(transport);
}
else
{
if (Logger.IsEnabled(LogLevel.Error))
Logger.LogError($"{nameof(EventBytesTransport.GetActorId)} failed");
Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event");
}
return Task.CompletedTask;
}
Task BatchEventHandler(List<byte[]> list)
{
var groups = list.Select(bytes =>
var transports = list.Select(bytes =>
{
var (success, GrainId) = EventBytesTransport.GetActorId<PrimaryKey>(bytes);
var (success, transport) = EventBytesTransport.FromBytes(bytes);
if (!success)
{
if (Logger.IsEnabled(LogLevel.Error))
Logger.LogError($"{nameof(EventBytesTransport.GetActorId)} failed");
Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event");
}
return (success, GrainId, bytes);
}).Where(o => o.success).GroupBy(o => o.GrainId);
return Task.WhenAll(groups.Select(kv =>
{
var items = kv.Select(item => item.bytes).ToList();
return GetObserver(observerType, kv.Key).Invoke(new Immutable<List<byte[]>>(items));
}));
return (success, transport);
}).Where(o => o.success)
.Select(o => (o.transport))
.ToList();
// 批量处理的时候 grain Id 取第一个 event的id
return GetObserver(EventHandlerType, transports.First().EventId).Invoke(transports);
}
}
static readonly ConcurrentDictionary<Type, Func<IClusterClient, PrimaryKey, string, PoleEventHandlerBase>> _observerGeneratorDict = new ConcurrentDictionary<Type, Func<IClusterClient, PrimaryKey, string, PoleEventHandlerBase>>();
private PoleEventHandlerBase GetObserver(Type ObserverType, PrimaryKey primaryKey)
static readonly ConcurrentDictionary<Type, Func<IClusterClient, string, string, PoleEventHandlerBase>> _observerGeneratorDict = new ConcurrentDictionary<Type, Func<IClusterClient, string, string, PoleEventHandlerBase>>();
private PoleEventHandlerBase GetObserver(Type ObserverType, string primaryKey)
{
var func = _observerGeneratorDict.GetOrAdd(ObserverType, key =>
{
var clientType = typeof(IClusterClient);
var clientParams = Expression.Parameter(clientType, "client");
var primaryKeyParams = Expression.Parameter(typeof(PrimaryKey), "primaryKey");
var primaryKeyParams = Expression.Parameter(typeof(string), "primaryKey");
var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix");
var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(PrimaryKey), typeof(string) });
var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) });
var body = Expression.Call(method.MakeGenericMethod(ObserverType), clientParams, primaryKeyParams, grainClassNamePrefixParams);
return Expression.Lambda<Func<IClusterClient, PrimaryKey, string, PoleEventHandlerBase>>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile();
return Expression.Lambda<Func<IClusterClient, string, string, PoleEventHandlerBase>>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile();
});
return func(clusterClient, primaryKey, null);
}
......
......@@ -24,9 +24,9 @@ namespace Pole.Core.EventBus
{
foreach (var attribute in type.GetCustomAttributes(false))
{
if (attribute is EventHandlerAttribute observer)
if (attribute is EventHandlerAttribute eventHandlerAttribute)
{
eventHandlerList.Add((type, observer));
eventHandlerList.Add((type, eventHandlerAttribute));
break;
}
}
......@@ -36,7 +36,7 @@ namespace Pole.Core.EventBus
{
var unitType = typeof(ObserverUnit<>).MakeGenericType(new Type[] { typeof(string) });
var unit = (ObserverUnit<string>)Activator.CreateInstance(unitType, serviceProvider, eventHandler.Item1);
unit.Observer();
Register<string>(eventHandler.Item2.EventName, unit);
}
}
......@@ -72,7 +72,7 @@ namespace Pole.Core.EventBus
}
if (!unitDict.TryAdd(observerName, new List<object> { observerUnit }))
{
throw new ObserverUnitRepeatedException(observerUnit.GrainType.FullName);
throw new ObserverUnitRepeatedException(observerUnit.EventHandlerType.FullName);
}
}
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Exceptions
{
public class EventHandlerTargetMethodNotFoundException: Exception
{
public EventHandlerTargetMethodNotFoundException(string methodName,string eventTypeName):base($"EventHandler method:{methodName} not found when eventHandler invoke , eventType:{eventTypeName}")
{
}
}
}
using Microsoft.Extensions.Logging;
using Pole.Core.Abstraction;
using Pole.Core.EventBus.Event;
using Pole.Core.Exceptions;
using Pole.Core.Utils;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Serialization
{
public class EventTypeFinder : IEventTypeFinder
{
private readonly ConcurrentDictionary<string, Type> codeDict = new ConcurrentDictionary<string, Type>();
private readonly ConcurrentDictionary<Type, string> typeDict = new ConcurrentDictionary<Type, string>();
readonly ILogger<EventTypeFinder> logger;
public EventTypeFinder(ILogger<EventTypeFinder> logger)
{
this.logger = logger;
var baseEventType = typeof(IEvent);
foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger))
{
foreach (var type in assembly.GetTypes())
{
if (baseEventType.IsAssignableFrom(type))
{
typeDict.TryAdd(type, type.FullName);
if (!codeDict.TryAdd(type.FullName, type))
{
throw new TypeCodeRepeatedException(type.FullName, type.FullName);
}
}
}
}
}
/// <summary>
/// 通过code获取Type对象
/// </summary>
/// <param name="typeCode"></param>
/// <returns></returns>
public Type FindType(string typeCode)
{
if (codeDict.TryGetValue(typeCode, out Type type))
{
return type;
}
throw new UnknowTypeCodeException(typeCode);
}
/// <summary>
/// 获取Type对象的code字符串
/// </summary>
/// <param name="type"></param>
/// <returns></returns>
public string GetCode(Type type)
{
if (!typeDict.TryGetValue(type, out var value))
return type.FullName;
return value;
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Utils.Abstraction
{
public interface IGeneratorIdSolver
{
int GetGeneratorId();
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Utils.Abstraction
{
public interface ISnowflakeIdGenerator
{
public string NextId();
}
}
using System;
using System.Reflection;
using System.Reflection.Emit;
namespace Pole.Core.Utils.Emit
{
/// <summary>
/// 用来生成模式匹配方法调用的方法信息
/// </summary>
public class SwitchMethodEmit
{
/// <summary>
/// 方法
/// </summary>
public MethodInfo Mehod { get; set; }
/// <summary>
/// 匹配的类型
/// </summary>
public Type CaseType { get; set; }
/// <summary>
/// 局部变量
/// </summary>
public LocalBuilder DeclareLocal { get; set; }
/// <summary>
/// 方法调用Lable
/// </summary>
public Label Lable { get; set; }
/// <summary>
/// 方法的参数
/// </summary>
public ParameterInfo[] Parameters { get; set; }
/// <summary>
/// 方法在类中的顺序
/// </summary>
public int Index { get; set; }
}
}
using Pole.Core.Utils.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.Text;
namespace Pole.Core.Utils
{
public class InstanceIPV4_16IdGeneratorIdSolver : IGeneratorIdSolver
{
private int generatorId;
public InstanceIPV4_16IdGeneratorIdSolver()
{
NetworkInterface networkInterface = NetworkInterface.GetAllNetworkInterfaces()
.OrderByDescending(c => c.Speed)
.Where(m => m.NetworkInterfaceType != NetworkInterfaceType.Loopback && m.OperationalStatus == OperationalStatus.Up)
.FirstOrDefault();
var props = networkInterface.GetIPProperties();
// get first IPV4 address assigned to this interface
var firstIpV4Address = props.UnicastAddresses
.Where(c => c.Address.AddressFamily == AddressFamily.InterNetwork)
.Select(c => c.Address)
.FirstOrDefault();
var bytes = firstIpV4Address.GetAddressBytes();
generatorId = BitConverter.ToInt32(bytes, 2);
}
public int GetGeneratorId()
{
return generatorId;
}
}
}
using Pole.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Utils.Abstraction
{
public class SnowflakeIdGenerator : ISnowflakeIdGenerator
{
private static readonly DateTime Jan1st1970 = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
private int generatorIdBits;
private long twepoch;
private int maxGeneratorId;
private const int SequenceAndGeneratorIdBits = 64 - 1 - 41;
/// <summary>
/// 这里的位数决定 每毫秒能生成的最大个数
/// </summary>
private int sequenceBits;
private int generatorIdShift;
private const int TimestampLeftShift = SequenceAndGeneratorIdBits;
private long sequenceMask;
public long GeneratorId { get; private set; }
// 毫秒内序列(0~4095)
public long Sequence { get; private set; }
// 上次生成ID的时间截
public long LastTimestamp { get; private set; }
/// <summary>
/// 时间戳为41位,可以使用69年,年T = (1L << 41) / (1000L * 60 * 60 * 24 * 365) = 69
/// </summary>
/// <param name="beginTime"></param>
/// <param name="generatorIdBits"></param>
public SnowflakeIdGenerator(DateTime beginTime, int generatorIdBits, long generatorId)
{
twepoch = Convert.ToInt64((beginTime.ToUniversalTime() - Jan1st1970).TotalMilliseconds);
this.generatorIdBits = generatorIdBits;
maxGeneratorId = -1 ^ (-1 << this.generatorIdBits);
sequenceBits = SequenceAndGeneratorIdBits - generatorIdBits;
generatorIdShift = sequenceBits;
sequenceMask = -1L ^ (-1L << sequenceBits);
GeneratorId = generatorId;
Sequence = 0L;
LastTimestamp = -1L;
}
public string NextId()
{
lock (this)
{
long timestamp = GetCurrentTimestamp();
if (timestamp > LastTimestamp) //时间戳改变,毫秒内序列重置
{
Sequence = 0L;
}
else if (timestamp == LastTimestamp) //如果是同一时间生成的,则进行毫秒内序列
{
Sequence = (Sequence + 1) & sequenceMask;
if (Sequence == 0) //毫秒内序列溢出
{
timestamp = GetNextTimestamp(LastTimestamp); //阻塞到下一个毫秒,获得新的时间戳
}
}
else //当前时间小于上一次ID生成的时间戳,证明系统时钟被回拨,此时需要做回拨处理
{
Sequence = (Sequence + 1) & sequenceMask;
if (Sequence > 0)
{
timestamp = LastTimestamp; //停留在最后一次时间戳上,等待系统时间追上后即完全度过了时钟回拨问题。
}
else //毫秒内序列溢出
{
timestamp = LastTimestamp + 1; //直接进位到下一个毫秒
}
}
LastTimestamp = timestamp; //上次生成ID的时间截
//移位并通过或运算拼到一起组成64位的ID
var id = ((timestamp - twepoch) << TimestampLeftShift)
| (GeneratorId << generatorIdShift)
| Sequence;
return id.ToString();
}
}
private static long GetCurrentTimestamp()
{
return (long)(DateTime.UtcNow - Jan1st1970).TotalMilliseconds;
}
private static long GetNextTimestamp(long lastTimestamp)
{
long timestamp = GetCurrentTimestamp();
while (timestamp <= lastTimestamp)
{
timestamp = GetCurrentTimestamp();
}
return timestamp;
}
}
}
......@@ -49,10 +49,7 @@ namespace Pole.EventBus.RabbitMQ
/// </summary>
public bool Persistent { get; set; }
public List<RabbitConsumer> Consumers { get; set; } = new List<RabbitConsumer>();
public string GetRoute(string key)
{
return RoutePrefix;
}
public RabbitEventBus BindEvent(Type eventType, string eventName)
{
Event = eventType;
......
......@@ -15,10 +15,10 @@ namespace Pole.EventBus.RabbitMQ
this.publisher = publisher;
this.rabbitMQClient = rabbitMQClient;
}
public ValueTask Publish(byte[] bytes, string hashKey)
public ValueTask Publish(byte[] bytes)
{
using var channel = rabbitMQClient.PullChannel();
channel.Publish(bytes, publisher.Exchange, publisher.GetRoute(hashKey), publisher.Persistent);
channel.Publish(bytes, publisher.Exchange, string.Empty, publisher.Persistent);
return Consts.ValueTaskDone;
}
}
......
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Pole.Core\Pole.Core.csproj" />
</ItemGroup>
</Project>
using Pole.Core.Utils;
using Pole.Core.Utils.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using Xunit;
namespace Pole.Core.Test.Utils
{
public class InstanceIPV4_16IdGeneratorIdSolverTest
{
[Fact]
public void GetIPv4AddressBytesTest()
{
IGeneratorIdSolver instanceIPV4_16IdGeneratorIdSolver = new InstanceIPV4_16IdGeneratorIdSolver();
}
}
}
using System;
using Xunit;
namespace Pole.Core.Test
{
public class SnowflakeIdGeneratorTest
{
[Fact]
public void MaxYears()
{
var years = -1L ^ (-1L << 6);
Console.WriteLine(years);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment