Commit 18f75144 by 丁松杰

add eventbus

parent b6c9d9df
Showing with 2950 additions and 1 deletions
......@@ -45,6 +45,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "integrationEvents", "integr
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Product.IntegrationEvents", "samples\intergrationEvents\Product.IntegrationEvents\Product.IntegrationEvents.csproj", "{9C0DFC90-1AF9-424A-B5FB-2A7C3611970C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.EventBus.Rabbitmq", "src\Pole.EventBus.Rabbitmq\Pole.EventBus.Rabbitmq.csproj", "{BDF62A19-FFBD-4EE1-A07A-68472E680A95}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
......@@ -111,6 +113,10 @@ Global
{9C0DFC90-1AF9-424A-B5FB-2A7C3611970C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9C0DFC90-1AF9-424A-B5FB-2A7C3611970C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9C0DFC90-1AF9-424A-B5FB-2A7C3611970C}.Release|Any CPU.Build.0 = Release|Any CPU
{BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Debug|Any CPU.Build.0 = Debug|Any CPU
{BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Release|Any CPU.ActiveCfg = Release|Any CPU
{BDF62A19-FFBD-4EE1-A07A-68472E680A95}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -134,6 +140,7 @@ Global
{F65858EC-C34F-4121-BEC5-4E20DEA74A0A} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
{74422E64-29FE-4287-A86E-741D1DFF6698} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB}
{9C0DFC90-1AF9-424A-B5FB-2A7C3611970C} = {74422E64-29FE-4287-A86E-741D1DFF6698}
{BDF62A19-FFBD-4EE1-A07A-68472E680A95} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E}
......
using System;
namespace Ray.Core.Abstractions
{
public interface IGrainID
{
Type GrainType { get; }
}
}
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Ray.Core.Abstractions
{
public interface IObserverUnit<PrimaryKey> : IGrainID
{
/// <summary>
/// 获取所有监听者分组
/// </summary>
/// <returns></returns>
List<string> GetGroups();
Task<long[]> GetAndSaveVersion(PrimaryKey primaryKey, long srcVersion);
/// <summary>
/// 重置Grain
/// </summary>
/// <param name="primaryKey">重置Grain</param>
/// <returns></returns>
Task Reset(PrimaryKey primaryKey);
List<Func<byte[], Task>> GetEventHandlers(string observerGroup);
List<Func<byte[], Task>> GetAllEventHandlers();
List<Func<List<byte[]>, Task>> GetBatchEventHandlers(string observerGroup);
List<Func<List<byte[]>, Task>> GetAllBatchEventHandlers();
}
}
using System;
namespace Ray.Core.Abstractions
{
public interface IObserverUnitContainer
{
IObserverUnit<PrimaryKey> GetUnit<PrimaryKey>(Type grainType);
object GetUnit(Type grainType);
void Register(IGrainID followUnit);
}
}
using System.Threading.Tasks;
namespace Ray.Core.Channels
{
public interface IBaseMpscChannel
{
/// <summary>
/// 是否已经完成
/// </summary>
bool IsComplete { get; }
/// <summary>
/// 是否是子级channel
/// </summary>
bool IsChildren { get; set; }
/// <summary>
/// 把一个mpscchannel关联到另外一个mpscchannel,只要有消息进入,所有关联的channel都会顺序的进行消息检查和处理
/// </summary>
/// <param name="channel"></param>
void JoinConsumerSequence(IBaseMpscChannel channel);
/// <summary>
/// 等待消息写入
/// </summary>
/// <returns></returns>
Task<bool> WaitToReadAsync();
Task ManualConsume();
void Complete();
}
}
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Ray.Core.Channels
{
public interface IMpscChannel<T> : IBaseMpscChannel
{
void BindConsumer(Func<List<T>, Task> consumer);
void BindConsumer(Func<List<T>, Task> consumer, int maxBatchSize, int maxMillisecondsDelay);
void Config(int maxBatchSize, int maxMillisecondsDelay);
ValueTask<bool> WriteAsync(T data);
}
}
using System.Threading.Tasks;
namespace Ray.Core.Channels
{
public class AsyncInputEvent<Input, Output>
{
public AsyncInputEvent(Input data)
{
Value = data;
}
public TaskCompletionSource<Output> TaskSource { get; } = new TaskCompletionSource<Output>();
public Input Value { get; set; }
}
}
namespace Ray.Core.Channels
{
public class ChannelOptions
{
/// <summary>
/// 批量数据处理每次处理的最大数据量
/// </summary>
public int MaxBatchSize { get; set; } = 100000;
/// <summary>
/// 批量数据接收的最大延时
/// </summary>
public int MaxMillisecondsDelay { get; set; } = 1000;
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Ray.Core.Channels
{
/// <summary>
/// multi producter single consumer channel
/// </summary>
/// <typeparam name="T">data type produced by producer</typeparam>
public class MpscChannel<T> : IMpscChannel<T>
{
readonly BufferBlock<T> buffer = new BufferBlock<T>();
private Func<List<T>, Task> consumer;
readonly List<IBaseMpscChannel> consumerSequence = new List<IBaseMpscChannel>();
private Task<bool> waitToReadTask;
readonly ILogger logger;
/// <summary>
/// 是否在自动消费中
/// </summary>
private int _autoConsuming = 0;
public MpscChannel(ILogger<MpscChannel<T>> logger, IOptions<ChannelOptions> options)
{
this.logger = logger;
MaxBatchSize = options.Value.MaxBatchSize;
MaxMillisecondsDelay = options.Value.MaxMillisecondsDelay;
}
/// <summary>
/// 批量数据处理每次处理的最大数据量
/// </summary>
public int MaxBatchSize { get; set; }
/// <summary>
/// 批量数据接收的最大延时
/// </summary>
public int MaxMillisecondsDelay { get; set; }
public bool IsComplete { get; private set; }
public bool IsChildren { get; set; }
public void BindConsumer(Func<List<T>, Task> consumer)
{
if (this.consumer is null)
this.consumer = consumer;
else
throw new RebindConsumerException(GetType().Name);
}
public void BindConsumer(Func<List<T>, Task> consumer, int maxBatchSize, int maxMillisecondsDelay)
{
if (this.consumer is null)
{
this.consumer = consumer;
MaxBatchSize = maxBatchSize;
MaxMillisecondsDelay = maxMillisecondsDelay;
}
else
throw new RebindConsumerException(GetType().Name);
}
public void Config(int maxBatchSize, int maxMillisecondsDelay)
{
MaxBatchSize = maxBatchSize;
MaxMillisecondsDelay = maxMillisecondsDelay;
}
public async ValueTask<bool> WriteAsync(T data)
{
if (consumer is null)
throw new NoBindConsumerException(GetType().Name);
if (!IsChildren && _autoConsuming == 0)
ActiveAutoConsumer();
if (!buffer.Post(data))
return await buffer.SendAsync(data);
return true;
}
private void ActiveAutoConsumer()
{
if (!IsChildren && _autoConsuming == 0)
ThreadPool.QueueUserWorkItem(ActiveConsumer);
async void ActiveConsumer(object state)
{
if (Interlocked.CompareExchange(ref _autoConsuming, 1, 0) == 0)
{
try
{
while (await WaitToReadAsync())
{
try
{
await ManualConsume();
}
catch (Exception ex)
{
logger.LogError(ex, ex.Message);
}
}
}
finally
{
Interlocked.Exchange(ref _autoConsuming, 0);
}
}
}
}
public void JoinConsumerSequence(IBaseMpscChannel channel)
{
if (consumerSequence.IndexOf(channel) == -1)
{
channel.IsChildren = true;
consumerSequence.Add(channel);
}
}
public async Task ManualConsume()
{
if (waitToReadTask.IsCompletedSuccessfully && waitToReadTask.Result)
{
var dataList = new List<T>();
var startTime = DateTimeOffset.UtcNow;
while (buffer.TryReceive(out var value))
{
dataList.Add(value);
if (dataList.Count > MaxBatchSize)
{
break;
}
else if ((DateTimeOffset.UtcNow - startTime).TotalMilliseconds > MaxMillisecondsDelay)
{
break;
}
}
if (dataList.Count > 0)
await consumer(dataList);
}
foreach (var joinConsumer in consumerSequence)
{
await joinConsumer.ManualConsume();
}
}
public async Task<bool> WaitToReadAsync()
{
waitToReadTask = buffer.OutputAvailableAsync();
if (consumerSequence.Count == 0)
{
return await waitToReadTask;
}
else
{
var taskList = consumerSequence.Select(c => c.WaitToReadAsync()).ToList();
taskList.Add(waitToReadTask);
return await await Task.WhenAny(taskList);
}
}
public void Complete()
{
IsComplete = true;
foreach (var joinConsumer in consumerSequence)
{
joinConsumer.Complete();
}
buffer.Complete();
}
}
}
using System;
namespace Ray.Core.Channels
{
public class NoBindConsumerException : Exception
{
public NoBindConsumerException(string message) : base(message)
{
}
}
}
using System;
namespace Ray.Core.Channels
{
public class RebindConsumerException : Exception
{
public RebindConsumerException(string message) : base(message)
{
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Ray.Core.EventBus
{
public abstract class Consumer : IConsumer
{
readonly List<Func<byte[], Task>> eventHandlers;
readonly List<Func<List<byte[]>, Task>> batchEventHandlers;
public Consumer(
List<Func<byte[], Task>> eventHandlers,
List<Func<List<byte[]>, Task>> batchEventHandlers)
{
this.eventHandlers = eventHandlers;
this.batchEventHandlers = batchEventHandlers;
}
public void AddHandler(Func<byte[], Task> func)
{
eventHandlers.Add(func);
}
public Task Notice(byte[] bytes)
{
return Task.WhenAll(eventHandlers.Select(func => func(bytes)));
}
public Task Notice(List<byte[]> list)
{
return Task.WhenAll(batchEventHandlers.Select(func => func(list)));
}
}
}
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Ray.Core.EventBus
{
public interface IConsumer
{
Task Notice(byte[] bytes);
Task Notice(List<byte[]> list);
}
}
using System.Collections.Generic;
namespace Ray.Core.EventBus
{
public interface IConsumerContainer
{
List<IConsumer> GetConsumers();
}
}
using System.Threading.Tasks;
namespace Ray.Core.EventBus
{
public interface IProducer
{
ValueTask Publish(byte[] bytes, string hashKey);
}
}
using System;
using System.Threading.Tasks;
namespace Ray.Core.EventBus
{
public interface IProducerContainer
{
ValueTask<IProducer> GetProducer<T>();
ValueTask<IProducer> GetProducer(Type type);
}
}
using System;
namespace Ray.Core.Exceptions
{
public class BeginTxTimeoutException : Exception
{
public BeginTxTimeoutException(string stateId, long transactionId, Type type) :
base($"Grain type {type.FullName} with grainId {stateId} and transactionId {transactionId}")
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class ChannelUnavailabilityException : Exception
{
public ChannelUnavailabilityException(string id, Type grainType) : base($"Channel unavailability,type {grainType.FullName} with id {id}")
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class EventBusRepeatBindingProducerException : Exception
{
public EventBusRepeatBindingProducerException(string name) : base(name)
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class EventIsClearedException : Exception
{
public EventIsClearedException(string eventType, string eventJsonString, long archiveIndex) : base($"eventType:{eventType},event:{eventJsonString},archive index:{archiveIndex}")
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class EventVersionUnorderedException : Exception
{
public EventVersionUnorderedException(string id, Type type, long eventVersion, long stateVersion) :
base($"Event version and state version do not match of Grain type {type.FullName} and Id {id}.There state version are {stateVersion} and event version are {eventVersion}")
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class ObserverNotCompletedException : Exception
{
public ObserverNotCompletedException(string typeName, string stateId) : base($"{typeName} with id={stateId}")
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class ObserverUnitRepeatedException : Exception
{
public ObserverUnitRepeatedException(string name) : base(name)
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class PrimaryKeyTypeException : Exception
{
public PrimaryKeyTypeException(string name) : base(name)
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class RepeatedTxException : Exception
{
public RepeatedTxException(string stateId, long transactionId, Type type) :
base($"Grain type {type.FullName} with grainId {stateId} and transactionId {transactionId}")
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class SnapshotNotSupportTxException : Exception
{
public SnapshotNotSupportTxException(Type type) : base(type.FullName)
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class StateInsecurityException : Exception
{
public StateInsecurityException(string id, Type grainType, long doingVersion, long stateVersion) :
base($"State insecurity of Grain type {grainType.FullName} and Id {id},Maybe because the previous event failed to execute.There state version are {stateVersion} and doing version are {doingVersion}")
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class StateIsOverException : Exception
{
public StateIsOverException(string id, Type grainType) :
base($"State Is Over of Grain type {grainType.FullName} and Id {id}")
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class TxCommitException : Exception
{
}
}
using System;
namespace Ray.Core.Exceptions
{
public class TxIdException : Exception
{
}
}
using System;
namespace Ray.Core.Exceptions
{
public class TxSnapshotException : Exception
{
public TxSnapshotException(string stateId, long snapshotVersion, long backupSnapshotVersion) :
base($"StateId {stateId} and snapshot version {snapshotVersion} and backup snapshot version {backupSnapshotVersion}")
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class TypeCodeRepeatedException : Exception
{
public TypeCodeRepeatedException(string typeName, string typeFullName) : base($"Type named {typeName} was repeated of {typeFullName}.")
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class UnmatchObserverUnitException : Exception
{
public UnmatchObserverUnitException(string grainName, string unitName) : base($"{unitName} and {grainName} do not match")
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class UnfindEventHandlerException : Exception
{
public UnfindEventHandlerException(Type eventType) : base(eventType.FullName)
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class UnfindObserverUnitException : Exception
{
public UnfindObserverUnitException(string name) : base(name)
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class UnfindSnapshotHandlerException : Exception
{
public UnfindSnapshotHandlerException(Type grainType) : base(grainType.FullName)
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class UnknowTypeCodeException : Exception
{
public UnknowTypeCodeException(string typeName) : base(typeName)
{
}
}
}
using System;
namespace Ray.Core.Exceptions
{
public class UnopenedTransactionException : Exception
{
public UnopenedTransactionException(string id, Type grainType, string methodName) :
base($"Unopened transaction, cannot be invoke {methodName},type {grainType.FullName} with id {id}")
{
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
namespace Ray.Core.Observer
{
/// <summary>
/// 标记为观察者
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public class ObserverAttribute : Attribute
{
/// <summary>
/// 事件监听者标记
/// </summary>
/// <param name="group">监听者分组</param>
/// <param name="name">监听者名称(如果是shadow请设置为null)</param>
/// <param name="observable">被监听的Type</param>
/// <param name="observer">监听者的Type</param>
public ObserverAttribute(string group, string name, Type observable, Type observer = default)
{
Group = group;
Name = name;
Observable = observable;
Observer = observer;
}
/// <summary>
/// 监听者分组
/// </summary>
public string Group { get; set; }
/// <summary>
/// 监听者名称(如果是shadow请设置为null)
/// </summary>
public string Name { get; set; }
/// <summary>
/// 被监听的Type
/// </summary>
public Type Observable { get; set; }
/// <summary>
/// 监听者的Type
/// </summary>
public Type Observer { get; set; }
}
}
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans.Concurrency;
namespace Ray.Core.Observer
{
public interface IObserver : IVersion
{
Task OnNext(Immutable<byte[]> bytes);
Task OnNext(Immutable<List<byte[]>> items);
/// <summary>
/// 重置状态
/// </summary>
/// <returns></returns>
Task Reset();
}
}
using System.Threading.Tasks;
namespace Ray.Core.Observer
{
public interface IVersion
{
Task<long> GetVersion();
Task<long> GetAndSaveVersion(long compareVersion);
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFramework>netstandard2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MediatR.Extensions.Microsoft.DependencyInjection" Version="8.0.0" />
<PackageReference Include="Microsoft.Orleans.Core.Abstractions" Version="3.0.2" />
<PackageReference Include="Microsoft.Orleans.Runtime.Abstractions" Version="3.0.2" />
<PackageReference Include="System.Runtime.Loader" Version="4.3.0" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.11.0" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.3" />
</ItemGroup>
</Project>
using System.Threading.Tasks;
using Orleans;
namespace Ray.Core.Services
{
public interface IHoldLock : IGrainWithStringKey
{
Task<(bool isOk, long lockId)> Lock(int holdingSeconds =30);
Task<bool> Hold(long lockId, int holdingSeconds = 30);
Task Unlock(long lockId);
}
}
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
namespace Ray.Core.Services
{
public interface ILocalUID : IGrainWithStringKey
{
/// <summary>
/// 通过utc时间生成分布式唯一id
/// </summary>
[AlwaysInterleave]
Task<string> NewID();
}
}
using System.Threading.Tasks;
using Orleans;
namespace Ray.Core.Services
{
public interface ILock : IGrainWithStringKey
{
Task<bool> Lock(int millisecondsDelay = 0);
Task Unlock();
}
}
using Orleans;
using Orleans.Concurrency;
using System.Threading.Tasks;
namespace Ray.Core.Services
{
public interface IUtcUID : IGrainWithStringKey
{
/// <summary>
/// 通过utc时间生成分布式唯一id
/// </summary>
[AlwaysInterleave]
Task<string> NewID();
}
}
using System.Threading.Tasks;
using Orleans;
namespace Ray.Core.Services
{
public interface IWeightHoldLock : IGrainWithStringKey
{
Task<(bool isOk, long lockId, int expectMillisecondDelay)> Lock(int weight, int holdingSeconds = 30);
Task<bool> Hold(long lockId, int holdingSeconds = 30);
Task Unlock(long lockId);
}
}
using System;
using System.Threading.Tasks;
using Orleans;
namespace Ray.Core.Services
{
public class HoldLockGrain : Grain, IHoldLock
{
long lockId = 0;
long expireTime = 0;
public Task<(bool isOk, long lockId)> Lock(int holdingSeconds = 30)
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (lockId == 0 || now > expireTime)
{
lockId = now;
expireTime = now + holdingSeconds * 1000;
return Task.FromResult((true, now));
}
else
{
return Task.FromResult((false, (long)0));
}
}
public Task<bool> Hold(long lockId, int holdingSeconds = 30)
{
if (this.lockId == lockId)
{
expireTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + holdingSeconds * 1000;
return Task.FromResult(true);
}
else
{
return Task.FromResult(false);
}
}
public Task Unlock(long lockId)
{
if (this.lockId == lockId)
{
this.lockId = 0;
expireTime = 0;
}
return Task.CompletedTask;
}
}
}
using Orleans;
using Orleans.Concurrency;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Ray.Core.Services
{
[Reentrant]
public class LocalUIDGrain : Grain, ILocalUID
{
int start_id = 1;
string start_string;
long start_long;
const int length = 19;
public LocalUIDGrain()
{
start_string = DateTimeOffset.Now.ToString("yyyyMMddHHmmss");
start_long = long.Parse(start_string); ;
}
public Task<string> NewID()
{
return Task.FromResult(GenerateUtcId());
string GenerateUtcId()
{
var now_string = DateTimeOffset.Now.ToString("yyyyMMddHHmmss");
var now_Long = long.Parse(now_string);
if (now_Long > start_long)
{
Interlocked.Exchange(ref start_string, now_string);
Interlocked.Exchange(ref start_long, now_Long);
Interlocked.Exchange(ref start_id, 0);
}
var builder = new Span<char>(new char[length]);
var newTimes = Interlocked.Increment(ref start_id);
if (newTimes <= 99999)
{
start_string.AsSpan().CopyTo(builder);
var timesString = newTimes.ToString();
for (int i = start_string.Length; i < length - timesString.Length; i++)
{
builder[i] = '0';
}
var span = length - timesString.Length;
for (int i = span; i < length; i++)
{
builder[i] = timesString[i - span];
}
return builder.ToString();
}
else
{
return GenerateUtcId();
}
}
}
}
}
using System.Threading;
using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
namespace Ray.Core.Services
{
[Reentrant]
public class LockGrain : Grain, ILock
{
int locked = 0;
TaskCompletionSource<bool> taskSource;
public async Task<bool> Lock(int millisecondsDelay = 0)
{
if (locked == 0)
{
locked = 1;
return true;
}
else
{
taskSource = new TaskCompletionSource<bool>();
if (millisecondsDelay != 0)
{
using var tc = new CancellationTokenSource(millisecondsDelay);
tc.Token.Register(() =>
{
taskSource.TrySetCanceled();
});
}
return await taskSource.Task;
}
}
public Task Unlock()
{
locked = 0;
taskSource.TrySetResult(true);
return Task.CompletedTask;
}
}
}
using Orleans;
using Orleans.Concurrency;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Ray.Core.Services
{
[Reentrant]
public class UtcUIDGrain : Grain, IUtcUID
{
int start_id = 1;
string start_string;
long start_long;
const int length = 19;
public UtcUIDGrain()
{
start_string = DateTimeOffset.UtcNow.ToString("yyyyMMddHHmmss");
start_long = long.Parse(start_string);
}
public Task<string> NewID()
{
return Task.FromResult(GenerateUtcId());
string GenerateUtcId()
{
var now_string = DateTimeOffset.UtcNow.ToString("yyyyMMddHHmmss");
var now_Long = long.Parse(now_string);
if (now_Long > start_long)
{
Interlocked.Exchange(ref start_string, now_string);
Interlocked.Exchange(ref start_long, now_Long);
Interlocked.Exchange(ref start_id, 0);
}
var builder = new Span<char>(new char[length]);
var newTimes = Interlocked.Increment(ref start_id);
if (newTimes <= 99999)
{
start_string.AsSpan().CopyTo(builder);
var timesString = newTimes.ToString();
for (int i = start_string.Length; i < length - timesString.Length; i++)
{
builder[i] = '0';
}
var span = length - timesString.Length;
for (int i = span; i < length; i++)
{
builder[i] = timesString[i - span];
}
return builder.ToString();
}
else
{
return GenerateUtcId();
}
}
}
}
}
using System;
using System.Threading.Tasks;
using Orleans;
namespace Ray.Core.Services
{
public class WeightHoldLock : Grain, IWeightHoldLock
{
long lockId = 0;
long expireTime = 0;
int currentWeight = 0;
int maxWaitWeight = -1;
public Task<bool> Hold(long lockId, int holdingSeconds = 30)
{
if (this.lockId == lockId && currentWeight >= maxWaitWeight)
{
expireTime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() + holdingSeconds * 1000;
return Task.FromResult(true);
}
else
{
return Task.FromResult(false);
}
}
public Task<(bool isOk, long lockId, int expectMillisecondDelay)> Lock(int weight, int holdingSeconds = 30)
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
if (lockId == 0 || now > expireTime)
{
lockId = now;
currentWeight = weight;
maxWaitWeight = -1;
expireTime = now + holdingSeconds * 1000;
return Task.FromResult((true, now, 0));
}
if (weight >= maxWaitWeight && weight > currentWeight)
{
maxWaitWeight = weight;
return Task.FromResult((false, (long)0, (int)(expireTime - now)));
}
return Task.FromResult((false, (long)0, 0));
}
public Task Unlock(long lockId)
{
if (this.lockId == lockId)
{
this.lockId = 0;
currentWeight = 0;
expireTime = 0;
}
return Task.CompletedTask;
}
}
}
using Microsoft.Extensions.DependencyModel;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Runtime.Loader;
namespace Ray.Core.Utils
{
public class AssemblyHelper
{
public static IList<Assembly> GetAssemblies(ILogger logger = default)
{
var libs = DependencyContext.Default.CompileLibraries.Where(lib => !lib.Serviceable);
return libs.Select(lib =>
{
try
{
return AssemblyLoadContext.Default.LoadFromAssemblyName(new AssemblyName(lib.Name));
}
catch (Exception ex)
{
if (logger != default)
logger.LogWarning(ex, ex.Message);
return default;
}
}).Where(assembly => assembly != default).ToList();
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
namespace Ray.Core.Utils
{
public class ConsistentHash
{
readonly SortedDictionary<int, string> circle = new SortedDictionary<int, string>();
int _replicate = 200; //default _replicate count
int[] ayKeys = null; //cache the ordered keys for better performance
//it's better you override the GetHashCode() of T.
//we will use GetHashCode() to identify different node.
public ConsistentHash(IEnumerable<string> nodes)
{
Init(nodes, _replicate);
}
public ConsistentHash(IEnumerable<string> nodes, int replicate)
{
Init(nodes, replicate);
}
private void Init(IEnumerable<string> nodes, int replicate)
{
_replicate = replicate;
foreach (string node in nodes)
{
Add(node, false);
}
ayKeys = circle.Keys.ToArray();
}
public void Add(string node)
{
Add(node, true);
}
public void Add(string node, bool updateKeyArray)
{
for (int i = 0; i < _replicate; i++)
{
int hash = BetterHash(node + i);
circle[hash] = node;
}
if (updateKeyArray)
{
ayKeys = circle.Keys.ToArray();
}
}
public void Remove(string node)
{
for (int i = 0; i < _replicate; i++)
{
int hash = BetterHash(node + i);
if (!circle.Remove(hash))
{
throw new Exception("can not remove a node that not added");
}
}
ayKeys = circle.Keys.ToArray();
}
public string GetNode(string key)
{
int first = First_ge(ayKeys, BetterHash(key));
return circle[ayKeys[first]];
}
//return the index of first item that >= val.
//if not exist, return 0;
//ay should be ordered array.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int First_ge(int[] ay, int val)
{
int begin = 0;
int end = ay.Length - 1;
if (ay[end] < val || ay[0] > val)
{
return 0;
}
while (end - begin > 1)
{
int mid = (end + begin) / 2;
if (ay[mid] >= val)
{
end = mid;
}
else
{
begin = mid;
}
}
if (ay[begin] > val || ay[end] < val)
{
throw new Exception("should not happen");
}
return end;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static int BetterHash(string key)
{
return (int)MurmurHash2.Hash(Encoding.UTF8.GetBytes(key));
}
}
}
using System;
using System.Reflection;
using System.Reflection.Emit;
namespace Ray.Core.Utils.Emit
{
/// <summary>
/// 用来生成模式匹配方法调用的方法信息
/// </summary>
public class SwitchMethodEmit
{
/// <summary>
/// 方法
/// </summary>
public MethodInfo Mehod { get; set; }
/// <summary>
/// 匹配的类型
/// </summary>
public Type CaseType { get; set; }
/// <summary>
/// 局部变量
/// </summary>
public LocalBuilder DeclareLocal { get; set; }
/// <summary>
/// 方法调用Lable
/// </summary>
public Label Lable { get; set; }
/// <summary>
/// 方法的参数
/// </summary>
public ParameterInfo[] Parameters { get; set; }
/// <summary>
/// 方法在类中的顺序
/// </summary>
public int Index { get; set; }
}
}
using System.Runtime.InteropServices;
namespace Ray.Core.Utils
{
public class MurmurHash2
{
public static uint Hash(byte[] data)
{
return Hash(data, 0xc58f1a7b);
}
const uint m = 0x5bd1e995;
const int r = 24;
[StructLayout(LayoutKind.Explicit)]
struct BytetouintConverter
{
[FieldOffset(0)]
public byte[] Bytes;
[FieldOffset(0)]
public uint[] UInts;
}
public static uint Hash(byte[] data, uint seed)
{
int length = data.Length;
if (length == 0)
return 0;
uint h = seed ^ (uint)length;
int currentIndex = 0;
uint[] hackArray = new BytetouintConverter { Bytes = data }.UInts;
while (length >= 4)
{
uint k = hackArray[currentIndex++];
k *= m;
k ^= k >> r;
k *= m;
h *= m;
h ^= k;
length -= 4;
}
currentIndex *= 4; // fix the length
switch (length)
{
case 3:
h ^= (ushort)(data[currentIndex++] | data[currentIndex++] << 8);
h ^= (uint)data[currentIndex] << 16;
h *= m;
break;
case 2:
h ^= (ushort)(data[currentIndex++] | data[currentIndex] << 8);
h *= m;
break;
case 1:
h ^= data[currentIndex];
h *= m;
break;
default:
break;
}
// Do a few final mixes of the hash to ensure the last few
// bytes are well-incorporated.
h ^= h >> 13;
h *= m;
h ^= h >> 15;
return h;
}
}
}
using System;
using System.IO;
using System.Buffers;
namespace Ray.Core.Utils
{
public class PooledMemoryStream : Stream
{
/// <summary>create writable memory stream with default parameters</summary>
/// <remarks>buffer is allocated from ArrayPool.Shared</remarks>
public PooledMemoryStream()
: this(ArrayPool<byte>.Shared)
{
}
/// <summary>create writable memory stream with specified ArrayPool</summary>
/// <remarks>buffer is allocated from ArrayPool</remarks>
public PooledMemoryStream(ArrayPool<byte> pool)
: this(pool, 4096)
{
}
/// <summary>create writable memory stream with ensuring buffer length</summary>
/// <remarks>buffer is allocated from ArrayPool</remarks>
public PooledMemoryStream(ArrayPool<byte> pool, int capacity)
{
m_Pool = pool;
_currentbuffer = m_Pool.Rent(capacity);
_Length = 0;
_CanWrite = true;
_Position = 0;
}
/// <summary>create readonly MemoryStream without buffer copy</summary>
/// <remarks>data will be read from 'data' parameter</summary>
public PooledMemoryStream(byte[] data)
{
m_Pool = null;
_currentbuffer = data;
_Length = data.Length;
_CanWrite = false;
}
public override bool CanRead
{
get
{
return true;
}
}
public override bool CanSeek => true;
public override bool CanWrite => _CanWrite;
public override long Length => _Length;
public override long Position
{
get => _Position;
set
{
_Position = value;
}
}
public override void Flush()
{
}
public override int Read(byte[] buffer, int offset, int count)
{
int readlen = count > (int)(_Length - _Position) ? (int)(_Length - _Position) : count;
if (readlen > 0)
{
Buffer.BlockCopy(_currentbuffer
, (int)_Position
, buffer, offset
, readlen)
;
_Position += readlen;
return readlen;
}
else
{
return 0;
}
}
public override long Seek(long offset, SeekOrigin origin)
{
long oldValue = _Position;
switch ((int)origin)
{
case (int)SeekOrigin.Begin:
_Position = offset;
break;
case (int)SeekOrigin.End:
_Position = _Length - offset;
break;
case (int)SeekOrigin.Current:
_Position += offset;
break;
default:
throw new InvalidOperationException("unknown SeekOrigin");
}
if (_Position < 0 || _Position > _Length)
{
_Position = oldValue;
throw new IndexOutOfRangeException();
}
return _Position;
}
void ReallocateBuffer(int minimumRequired)
{
var tmp = m_Pool.Rent(minimumRequired);
Buffer.BlockCopy(_currentbuffer, 0, tmp, 0, _currentbuffer.Length);
m_Pool.Return(_currentbuffer);
_currentbuffer = tmp;
}
public override void SetLength(long value)
{
if (!_CanWrite)
{
throw new NotSupportedException("stream is readonly");
}
if (value > int.MaxValue)
{
throw new IndexOutOfRangeException("overflow");
}
if (value < 0)
{
throw new IndexOutOfRangeException("underflow");
}
_Length = value;
if (_currentbuffer.Length < _Length)
{
ReallocateBuffer((int)_Length);
}
}
/// <summary>write data to stream</summary>
/// <remarks>if stream data length is over int.MaxValue, this method throws IndexOutOfRangeException</remarks>
public override void Write(byte[] buffer, int offset, int count)
{
if (!_CanWrite)
{
throw new InvalidOperationException("stream is readonly");
}
long endOffset = _Position + count;
if (endOffset > _currentbuffer.Length)
{
ReallocateBuffer((int)(endOffset) * 2);
}
Buffer.BlockCopy(buffer, offset,
_currentbuffer, (int)_Position, count);
if (endOffset > _Length)
{
_Length = endOffset;
}
_Position = endOffset;
}
protected override void Dispose(bool disposing)
{
base.Dispose(disposing);
if (m_Pool != null && _currentbuffer != null)
{
m_Pool.Return(_currentbuffer);
_currentbuffer = null;
}
}
/// <summary>ensure the buffer size</summary>
/// <remarks>capacity != stream buffer length</remarks>
public void Reserve(int capacity)
{
if (capacity > _currentbuffer.Length)
{
ReallocateBuffer(capacity);
}
}
/// <summary>Create newly allocated buffer and copy the stream data</summary>
public byte[] ToArray()
{
var ret = new byte[_Length];
Buffer.BlockCopy(_currentbuffer, 0, ret, 0, (int)_Length);
return ret;
}
/// <summary>Create ArraySegment for current stream data without allocation buffer</summary>
/// <remarks>After disposing stream, manupilating returned value(read or write) may cause undefined behavior</remarks>
public ArraySegment<byte> ToUnsafeArraySegment()
{
return new ArraySegment<byte>(_currentbuffer, 0, (int)_Length);
}
public ReadOnlyMemory<byte> ToReadOnlyMemory()
{
return new ReadOnlyMemory<byte>(_currentbuffer, 0, (int)_Length);
}
ArrayPool<byte> m_Pool;
byte[] _currentbuffer;
readonly bool _CanWrite;
long _Length;
long _Position;
}
}
\ No newline at end of file
using RabbitMQ.Client;
using System.Collections.Generic;
using System.Threading;
namespace Ray.EventBus.RabbitMQ
{
public class ConnectionWrapper
{
private readonly List<ModelWrapper> models = new List<ModelWrapper>();
private readonly IConnection connection;
readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
public ConnectionWrapper(
IConnection connection,
RabbitOptions options)
{
this.connection = connection;
Options = options;
}
public RabbitOptions Options { get; }
public (bool success, ModelWrapper model) Get()
{
semaphoreSlim.Wait();
try
{
if (models.Count < Options.PoolSizePerConnection)
{
var model = new ModelWrapper(this, connection.CreateModel());
models.Add(model);
return (true, model);
}
}
finally
{
semaphoreSlim.Release();
}
return (false, default);
}
public void Return(ModelWrapper model)
{
models.Remove(model);
}
}
}
namespace Ray.EventBus.RabbitMQ
{
public interface IRabbitMQClient
{
ModelWrapper PullModel();
}
}
using Microsoft.Extensions.ObjectPool;
using RabbitMQ.Client;
using System.Collections.Generic;
using System.Threading;
namespace Ray.EventBus.RabbitMQ
{
public class ModelPooledObjectPolicy : IPooledObjectPolicy<ModelWrapper>
{
readonly ConnectionFactory connectionFactory;
readonly List<ConnectionWrapper> connections = new List<ConnectionWrapper>();
readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
readonly RabbitOptions options;
public ModelPooledObjectPolicy(ConnectionFactory connectionFactory, RabbitOptions options)
{
this.connectionFactory = connectionFactory;
this.options = options;
}
public ModelWrapper Create()
{
foreach (var connection in connections)
{
(bool success, ModelWrapper model) = connection.Get();
if (success)
return model;
}
semaphoreSlim.Wait();
try
{
if (connections.Count < options.MaxConnection)
{
var connection = new ConnectionWrapper(connectionFactory.CreateConnection(options.EndPoints), options);
(bool success, ModelWrapper model) = connection.Get();
connections.Add(connection);
if (success)
return model;
}
throw new System.OverflowException(nameof(connections));
}
finally
{
semaphoreSlim.Release();
}
}
public bool Return(ModelWrapper obj)
{
if (obj.Model.IsOpen)
return true;
else
{
obj.ForceDispose();
return false;
}
}
}
}
using Microsoft.Extensions.ObjectPool;
using RabbitMQ.Client;
using System;
namespace Ray.EventBus.RabbitMQ
{
public class ModelWrapper : IDisposable
{
readonly IBasicProperties persistentProperties;
readonly IBasicProperties noPersistentProperties;
public DefaultObjectPool<ModelWrapper> Pool { get; set; }
public ConnectionWrapper Connection { get; set; }
public IModel Model { get; set; }
public ModelWrapper(
ConnectionWrapper connectionWrapper,
IModel model)
{
Connection = connectionWrapper;
Model = model;
persistentProperties = Model.CreateBasicProperties();
persistentProperties.Persistent = true;
noPersistentProperties = Model.CreateBasicProperties();
noPersistentProperties.Persistent = false;
}
public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true)
{
Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg);
}
public void Dispose()
{
Pool.Return(this);
}
public void ForceDispose()
{
Model.Close();
Model.Dispose();
Connection.Return(this);
}
}
}
using Microsoft.Extensions.ObjectPool;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace Ray.EventBus.RabbitMQ
{
public class RabbitMQClient : IRabbitMQClient
{
readonly ConnectionFactory connectionFactory;
readonly RabbitOptions options;
readonly DefaultObjectPool<ModelWrapper> pool;
public RabbitMQClient(IOptions<RabbitOptions> config)
{
options = config.Value;
connectionFactory = new ConnectionFactory
{
UserName = options.UserName,
Password = options.Password,
VirtualHost = options.VirtualHost,
AutomaticRecoveryEnabled = false
};
pool = new DefaultObjectPool<ModelWrapper>(new ModelPooledObjectPolicy(connectionFactory, options));
}
public ModelWrapper PullModel()
{
var result = pool.Get();
if (result.Pool is null)
result.Pool = pool;
return result;
}
}
}
namespace Ray.EventBus.RabbitMQ
{
/// <summary>
/// Consumer配置信息
/// </summary>
public class ConsumerOptions
{
/// <summary>
/// 是否自动ack
/// </summary>
public bool AutoAck { get; set; }
/// <summary>
/// 消息处理失败是否重回队列还是不停重发
/// </summary>
public bool Reenqueue { get; set; }
}
}
using System.Collections.Generic;
using RabbitMQ.Client;
namespace Ray.EventBus.RabbitMQ
{
public class RabbitOptions
{
public string UserName { get; set; }
public string Password { get; set; }
public string VirtualHost { get; set; }
public int PoolSizePerConnection { get; set; } = 200;
public int MaxConnection { get; set; } = 20;
/// <summary>
/// 消费者批量处理每次处理的最大消息量
/// </summary>
public ushort CunsumerMaxBatchSize { get; set; } = 3000;
/// <summary>
/// 消费者批量处理每次处理的最大延时
/// </summary>
public int CunsumerMaxMillisecondsInterval { get; set; } = 1000;
public string[] Hosts
{
get; set;
}
public List<AmqpTcpEndpoint> EndPoints
{
get
{
var list = new List<AmqpTcpEndpoint>();
foreach (var host in Hosts)
{
list.Add(AmqpTcpEndpoint.Parse(host));
}
return list;
}
}
}
}
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans;
using Ray.Core.Services;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
{
public class ConsumerManager : IHostedService, IDisposable
{
readonly ILogger<ConsumerManager> logger;
readonly IRabbitMQClient client;
readonly IRabbitEventBusContainer rabbitEventBusContainer;
readonly IServiceProvider provider;
readonly IGrainFactory grainFactory;
const int _HoldTime = 20 * 1000;
const int _MonitTime = 60 * 2 * 1000;
const int _checkTime = 10 * 1000;
public ConsumerManager(
ILogger<ConsumerManager> logger,
IRabbitMQClient client,
IGrainFactory grainFactory,
IServiceProvider provider,
IRabbitEventBusContainer rabbitEventBusContainer)
{
this.provider = provider;
this.client = client;
this.logger = logger;
this.rabbitEventBusContainer = rabbitEventBusContainer;
this.grainFactory = grainFactory;
}
private readonly ConcurrentDictionary<string, ConsumerRunner> ConsumerRunners = new ConcurrentDictionary<string, ConsumerRunner>();
private ConcurrentDictionary<string, long> Runners { get; } = new ConcurrentDictionary<string, long>();
private Timer HeathCheckTimer { get; set; }
private Timer DistributedMonitorTime { get; set; }
private Timer DistributedHoldTimer { get; set; }
const int lockHoldingSeconds = 60;
int distributedMonitorTimeLock = 0;
int distributedHoldTimerLock = 0;
int heathCheckTimerLock = 0;
private async Task DistributedStart()
{
try
{
if (Interlocked.CompareExchange(ref distributedMonitorTimeLock, 1, 0) == 0)
{
var consumers = rabbitEventBusContainer.GetConsumers();
foreach (var consumer in consumers)
{
if (consumer is RabbitConsumer value)
{
for (int i = 0; i < value.QueueList.Count(); i++)
{
var queue = value.QueueList[i];
var key = queue.ToString();
if (!Runners.ContainsKey(key))
{
var weight = 100000 - Runners.Count;
var (isOk, lockId, expectMillisecondDelay) = await grainFactory.GetGrain<IWeightHoldLock>(key).Lock(weight, lockHoldingSeconds);
if (isOk)
{
if (Runners.TryAdd(key, lockId))
{
var runner = new ConsumerRunner(client, provider, value, queue);
ConsumerRunners.TryAdd(key, runner);
await runner.Run();
}
}
}
}
}
}
Interlocked.Exchange(ref distributedMonitorTimeLock, 0);
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is working.");
}
}
catch (Exception exception)
{
logger.LogError(exception.InnerException ?? exception, nameof(DistributedStart));
Interlocked.Exchange(ref distributedMonitorTimeLock, 0);
}
}
private async Task DistributedHold()
{
try
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is holding.");
if (Interlocked.CompareExchange(ref distributedHoldTimerLock, 1, 0) == 0)
{
foreach (var lockKV in Runners)
{
if (Runners.TryGetValue(lockKV.Key, out var lockId))
{
var holdResult = await grainFactory.GetGrain<IWeightHoldLock>(lockKV.Key).Hold(lockId, lockHoldingSeconds);
if (!holdResult)
{
if (ConsumerRunners.TryRemove(lockKV.Key, out var runner))
{
runner.Close();
}
Runners.TryRemove(lockKV.Key, out var _);
}
}
}
Interlocked.Exchange(ref distributedHoldTimerLock, 0);
}
}
catch (Exception exception)
{
logger.LogError(exception.InnerException ?? exception, nameof(DistributedHold));
Interlocked.Exchange(ref distributedHoldTimerLock, 0);
}
}
private async Task HeathCheck()
{
try
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is checking.");
if (Interlocked.CompareExchange(ref heathCheckTimerLock, 1, 0) == 0)
{
await Task.WhenAll(ConsumerRunners.Values.Select(runner => runner.HeathCheck()));
Interlocked.Exchange(ref heathCheckTimerLock, 0);
}
}
catch (Exception exception)
{
logger.LogError(exception.InnerException ?? exception, nameof(HeathCheck));
Interlocked.Exchange(ref heathCheckTimerLock, 0);
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is starting.");
DistributedMonitorTime = new Timer(state => DistributedStart().Wait(), null, 1000, _MonitTime);
DistributedHoldTimer = new Timer(state => DistributedHold().Wait(), null, _HoldTime, _HoldTime);
HeathCheckTimer = new Timer(state => { HeathCheck().Wait(); }, null, _checkTime, _checkTime);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is stopping.");
Dispose();
return Task.CompletedTask;
}
public void Dispose()
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is disposing.");
foreach (var runner in ConsumerRunners.Values)
{
runner.Close();
}
DistributedMonitorTime?.Dispose();
DistributedHoldTimer?.Dispose();
HeathCheckTimer?.Dispose();
}
}
}
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Ray.Core.Channels;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
{
public class ConsumerRunner
{
readonly IMpscChannel<BasicDeliverEventArgs> mpscChannel;
public ConsumerRunner(
IRabbitMQClient client,
IServiceProvider provider,
RabbitConsumer consumer,
QueueInfo queue)
{
Client = client;
Logger = provider.GetService<ILogger<ConsumerRunner>>();
mpscChannel = provider.GetService<IMpscChannel<BasicDeliverEventArgs>>();
mpscChannel.BindConsumer(BatchExecuter);
Consumer = consumer;
Queue = queue;
}
public ILogger<ConsumerRunner> Logger { get; }
public IRabbitMQClient Client { get; }
public RabbitConsumer Consumer { get; }
public QueueInfo Queue { get; }
public ModelWrapper Model { get; set; }
public EventingBasicConsumer BasicConsumer { get; set; }
public bool IsUnAvailable => !BasicConsumer.IsRunning || Model.Model.IsClosed;
private bool isFirst = true;
public Task Run()
{
Model = Client.PullModel();
mpscChannel.Config(Model.Connection.Options.CunsumerMaxBatchSize, Model.Connection.Options.CunsumerMaxMillisecondsInterval);
if (isFirst)
{
isFirst = false;
Model.Model.ExchangeDeclare(Consumer.EventBus.Exchange, "direct", true);
Model.Model.QueueDeclare(Queue.Queue, true, false, false, null);
Model.Model.QueueBind(Queue.Queue, Consumer.EventBus.Exchange, Queue.RoutingKey);
}
Model.Model.BasicQos(0, Model.Connection.Options.CunsumerMaxBatchSize, false);
BasicConsumer = new EventingBasicConsumer(Model.Model);
BasicConsumer.Received += async (ch, ea) => await mpscChannel.WriteAsync(ea);
BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, Consumer.Config.AutoAck, BasicConsumer);
return Task.CompletedTask;
}
public Task HeathCheck()
{
if (IsUnAvailable)
{
Close();
return Run();
}
else
return Task.CompletedTask;
}
private async Task BatchExecuter(List<BasicDeliverEventArgs> list)
{
if (list.Count == 1)
{
await Process(list.First());
}
else
{
try
{
await Consumer.Notice(list.Select(o => o.Body).ToList());
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true);
}
}
catch (Exception exception)
{
Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}");
if (Consumer.Config.Reenqueue)
{
await Task.Delay(1000);
foreach (var item in list)
{
Model.Model.BasicReject(item.DeliveryTag, true);
}
}
}
}
}
private async Task Process(BasicDeliverEventArgs ea)
{
try
{
await Consumer.Notice(ea.Body);
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(ea.DeliveryTag, false);
}
}
catch (Exception exception)
{
Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}");
if (Consumer.Config.Reenqueue)
{
await Task.Delay(1000);
Model.Model.BasicReject(ea.DeliveryTag, true);
}
}
}
public void Close()
{
Model?.Dispose();
}
}
}
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Ray.Core.EventBus;
namespace Ray.EventBus.RabbitMQ
{
public class RabbitConsumer : Consumer
{
public RabbitConsumer(
List<Func<byte[], Task>> eventHandlers,
List<Func<List<byte[]>, Task>> batchEventHandlers) : base(eventHandlers, batchEventHandlers)
{
}
public RabbitEventBus EventBus { get; set; }
public List<QueueInfo> QueueList { get; set; }
public ConsumerOptions Config { get; set; }
}
}
using System;
namespace Ray.EventBus.RabbitMQ
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public class ProducerAttribute : Attribute
{
public ProducerAttribute(string exchange = null, string routePrefix = null, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false)
{
Exchange = exchange;
RoutePrefix = routePrefix;
LBCount = lBCount;
AutoAck = autoAck;
Reenqueue = reenqueue;
Persistent = persistent;
}
public string Exchange { get; }
public string RoutePrefix { get; }
public int LBCount { get; }
public bool AutoAck { get; set; }
public bool Reenqueue { get; set; }
public bool Persistent { get; set; }
}
}
using RabbitMQ.Client;
using System.Collections.Generic;
using System.Threading;
namespace Ray.EventBus.RabbitMQ
{
public class ConnectionWrapper
{
private readonly List<ModelWrapper> models = new List<ModelWrapper>();
private readonly IConnection connection;
readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
public ConnectionWrapper(
IConnection connection,
RabbitOptions options)
{
this.connection = connection;
Options = options;
}
public RabbitOptions Options { get; }
public (bool success, ModelWrapper model) Get()
{
semaphoreSlim.Wait();
try
{
if (models.Count < Options.PoolSizePerConnection)
{
var model = new ModelWrapper(this, connection.CreateModel());
models.Add(model);
return (true, model);
}
}
finally
{
semaphoreSlim.Release();
}
return (false, default);
}
public void Return(ModelWrapper model)
{
models.Remove(model);
}
}
}
namespace Ray.EventBus.RabbitMQ
{
public interface IRabbitMQClient
{
ModelWrapper PullModel();
}
}
using Microsoft.Extensions.ObjectPool;
using RabbitMQ.Client;
using System.Collections.Generic;
using System.Threading;
namespace Ray.EventBus.RabbitMQ
{
public class ModelPooledObjectPolicy : IPooledObjectPolicy<ModelWrapper>
{
readonly ConnectionFactory connectionFactory;
readonly List<ConnectionWrapper> connections = new List<ConnectionWrapper>();
readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
readonly RabbitOptions options;
public ModelPooledObjectPolicy(ConnectionFactory connectionFactory, RabbitOptions options)
{
this.connectionFactory = connectionFactory;
this.options = options;
}
public ModelWrapper Create()
{
foreach (var connection in connections)
{
(bool success, ModelWrapper model) = connection.Get();
if (success)
return model;
}
semaphoreSlim.Wait();
try
{
if (connections.Count < options.MaxConnection)
{
var connection = new ConnectionWrapper(connectionFactory.CreateConnection(options.EndPoints), options);
(bool success, ModelWrapper model) = connection.Get();
connections.Add(connection);
if (success)
return model;
}
throw new System.OverflowException(nameof(connections));
}
finally
{
semaphoreSlim.Release();
}
}
public bool Return(ModelWrapper obj)
{
if (obj.Model.IsOpen)
return true;
else
{
obj.ForceDispose();
return false;
}
}
}
}
using Microsoft.Extensions.ObjectPool;
using RabbitMQ.Client;
using System;
namespace Ray.EventBus.RabbitMQ
{
public class ModelWrapper : IDisposable
{
readonly IBasicProperties persistentProperties;
readonly IBasicProperties noPersistentProperties;
public DefaultObjectPool<ModelWrapper> Pool { get; set; }
public ConnectionWrapper Connection { get; set; }
public IModel Model { get; set; }
public ModelWrapper(
ConnectionWrapper connectionWrapper,
IModel model)
{
Connection = connectionWrapper;
Model = model;
persistentProperties = Model.CreateBasicProperties();
persistentProperties.Persistent = true;
noPersistentProperties = Model.CreateBasicProperties();
noPersistentProperties.Persistent = false;
}
public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true)
{
Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg);
}
public void Dispose()
{
Pool.Return(this);
}
public void ForceDispose()
{
Model.Close();
Model.Dispose();
Connection.Return(this);
}
}
}
using Microsoft.Extensions.ObjectPool;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace Ray.EventBus.RabbitMQ
{
public class RabbitMQClient : IRabbitMQClient
{
readonly ConnectionFactory connectionFactory;
readonly RabbitOptions options;
readonly DefaultObjectPool<ModelWrapper> pool;
public RabbitMQClient(IOptions<RabbitOptions> config)
{
options = config.Value;
connectionFactory = new ConnectionFactory
{
UserName = options.UserName,
Password = options.Password,
VirtualHost = options.VirtualHost,
AutomaticRecoveryEnabled = false
};
pool = new DefaultObjectPool<ModelWrapper>(new ModelPooledObjectPolicy(connectionFactory, options));
}
public ModelWrapper PullModel()
{
var result = pool.Get();
if (result.Pool is null)
result.Pool = pool;
return result;
}
}
}
namespace Ray.EventBus.RabbitMQ
{
/// <summary>
/// Consumer配置信息
/// </summary>
public class ConsumerOptions
{
/// <summary>
/// 是否自动ack
/// </summary>
public bool AutoAck { get; set; }
/// <summary>
/// 消息处理失败是否重回队列还是不停重发
/// </summary>
public bool Reenqueue { get; set; }
}
}
using System.Collections.Generic;
using RabbitMQ.Client;
namespace Ray.EventBus.RabbitMQ
{
public class RabbitOptions
{
public string UserName { get; set; }
public string Password { get; set; }
public string VirtualHost { get; set; }
public int PoolSizePerConnection { get; set; } = 200;
public int MaxConnection { get; set; } = 20;
/// <summary>
/// 消费者批量处理每次处理的最大消息量
/// </summary>
public ushort CunsumerMaxBatchSize { get; set; } = 3000;
/// <summary>
/// 消费者批量处理每次处理的最大延时
/// </summary>
public int CunsumerMaxMillisecondsInterval { get; set; } = 1000;
public string[] Hosts
{
get; set;
}
public List<AmqpTcpEndpoint> EndPoints
{
get
{
var list = new List<AmqpTcpEndpoint>();
foreach (var host in Hosts)
{
list.Add(AmqpTcpEndpoint.Parse(host));
}
return list;
}
}
}
}
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans;
using Ray.Core.Services;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
{
public class ConsumerManager : IHostedService, IDisposable
{
readonly ILogger<ConsumerManager> logger;
readonly IRabbitMQClient client;
readonly IRabbitEventBusContainer rabbitEventBusContainer;
readonly IServiceProvider provider;
readonly IGrainFactory grainFactory;
const int _HoldTime = 20 * 1000;
const int _MonitTime = 60 * 2 * 1000;
const int _checkTime = 10 * 1000;
public ConsumerManager(
ILogger<ConsumerManager> logger,
IRabbitMQClient client,
IGrainFactory grainFactory,
IServiceProvider provider,
IRabbitEventBusContainer rabbitEventBusContainer)
{
this.provider = provider;
this.client = client;
this.logger = logger;
this.rabbitEventBusContainer = rabbitEventBusContainer;
this.grainFactory = grainFactory;
}
private readonly ConcurrentDictionary<string, ConsumerRunner> ConsumerRunners = new ConcurrentDictionary<string, ConsumerRunner>();
private ConcurrentDictionary<string, long> Runners { get; } = new ConcurrentDictionary<string, long>();
private Timer HeathCheckTimer { get; set; }
private Timer DistributedMonitorTime { get; set; }
private Timer DistributedHoldTimer { get; set; }
const int lockHoldingSeconds = 60;
int distributedMonitorTimeLock = 0;
int distributedHoldTimerLock = 0;
int heathCheckTimerLock = 0;
private async Task DistributedStart()
{
try
{
if (Interlocked.CompareExchange(ref distributedMonitorTimeLock, 1, 0) == 0)
{
var consumers = rabbitEventBusContainer.GetConsumers();
foreach (var consumer in consumers)
{
if (consumer is RabbitConsumer value)
{
for (int i = 0; i < value.QueueList.Count(); i++)
{
var queue = value.QueueList[i];
var key = queue.ToString();
if (!Runners.ContainsKey(key))
{
var weight = 100000 - Runners.Count;
var (isOk, lockId, expectMillisecondDelay) = await grainFactory.GetGrain<IWeightHoldLock>(key).Lock(weight, lockHoldingSeconds);
if (isOk)
{
if (Runners.TryAdd(key, lockId))
{
var runner = new ConsumerRunner(client, provider, value, queue);
ConsumerRunners.TryAdd(key, runner);
await runner.Run();
}
}
}
}
}
}
Interlocked.Exchange(ref distributedMonitorTimeLock, 0);
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is working.");
}
}
catch (Exception exception)
{
logger.LogError(exception.InnerException ?? exception, nameof(DistributedStart));
Interlocked.Exchange(ref distributedMonitorTimeLock, 0);
}
}
private async Task DistributedHold()
{
try
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is holding.");
if (Interlocked.CompareExchange(ref distributedHoldTimerLock, 1, 0) == 0)
{
foreach (var lockKV in Runners)
{
if (Runners.TryGetValue(lockKV.Key, out var lockId))
{
var holdResult = await grainFactory.GetGrain<IWeightHoldLock>(lockKV.Key).Hold(lockId, lockHoldingSeconds);
if (!holdResult)
{
if (ConsumerRunners.TryRemove(lockKV.Key, out var runner))
{
runner.Close();
}
Runners.TryRemove(lockKV.Key, out var _);
}
}
}
Interlocked.Exchange(ref distributedHoldTimerLock, 0);
}
}
catch (Exception exception)
{
logger.LogError(exception.InnerException ?? exception, nameof(DistributedHold));
Interlocked.Exchange(ref distributedHoldTimerLock, 0);
}
}
private async Task HeathCheck()
{
try
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is checking.");
if (Interlocked.CompareExchange(ref heathCheckTimerLock, 1, 0) == 0)
{
await Task.WhenAll(ConsumerRunners.Values.Select(runner => runner.HeathCheck()));
Interlocked.Exchange(ref heathCheckTimerLock, 0);
}
}
catch (Exception exception)
{
logger.LogError(exception.InnerException ?? exception, nameof(HeathCheck));
Interlocked.Exchange(ref heathCheckTimerLock, 0);
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is starting.");
DistributedMonitorTime = new Timer(state => DistributedStart().Wait(), null, 1000, _MonitTime);
DistributedHoldTimer = new Timer(state => DistributedHold().Wait(), null, _HoldTime, _HoldTime);
HeathCheckTimer = new Timer(state => { HeathCheck().Wait(); }, null, _checkTime, _checkTime);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is stopping.");
Dispose();
return Task.CompletedTask;
}
public void Dispose()
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is disposing.");
foreach (var runner in ConsumerRunners.Values)
{
runner.Close();
}
DistributedMonitorTime?.Dispose();
DistributedHoldTimer?.Dispose();
HeathCheckTimer?.Dispose();
}
}
}
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Ray.Core.Channels;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
{
public class ConsumerRunner
{
readonly IMpscChannel<BasicDeliverEventArgs> mpscChannel;
public ConsumerRunner(
IRabbitMQClient client,
IServiceProvider provider,
RabbitConsumer consumer,
QueueInfo queue)
{
Client = client;
Logger = provider.GetService<ILogger<ConsumerRunner>>();
mpscChannel = provider.GetService<IMpscChannel<BasicDeliverEventArgs>>();
mpscChannel.BindConsumer(BatchExecuter);
Consumer = consumer;
Queue = queue;
}
public ILogger<ConsumerRunner> Logger { get; }
public IRabbitMQClient Client { get; }
public RabbitConsumer Consumer { get; }
public QueueInfo Queue { get; }
public ModelWrapper Model { get; set; }
public EventingBasicConsumer BasicConsumer { get; set; }
public bool IsUnAvailable => !BasicConsumer.IsRunning || Model.Model.IsClosed;
private bool isFirst = true;
public Task Run()
{
Model = Client.PullModel();
mpscChannel.Config(Model.Connection.Options.CunsumerMaxBatchSize, Model.Connection.Options.CunsumerMaxMillisecondsInterval);
if (isFirst)
{
isFirst = false;
Model.Model.ExchangeDeclare(Consumer.EventBus.Exchange, "direct", true);
Model.Model.QueueDeclare(Queue.Queue, true, false, false, null);
Model.Model.QueueBind(Queue.Queue, Consumer.EventBus.Exchange, Queue.RoutingKey);
}
Model.Model.BasicQos(0, Model.Connection.Options.CunsumerMaxBatchSize, false);
BasicConsumer = new EventingBasicConsumer(Model.Model);
BasicConsumer.Received += async (ch, ea) => await mpscChannel.WriteAsync(ea);
BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, Consumer.Config.AutoAck, BasicConsumer);
return Task.CompletedTask;
}
public Task HeathCheck()
{
if (IsUnAvailable)
{
Close();
return Run();
}
else
return Task.CompletedTask;
}
private async Task BatchExecuter(List<BasicDeliverEventArgs> list)
{
if (list.Count == 1)
{
await Process(list.First());
}
else
{
try
{
await Consumer.Notice(list.Select(o => o.Body).ToList());
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true);
}
}
catch (Exception exception)
{
Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}");
if (Consumer.Config.Reenqueue)
{
await Task.Delay(1000);
foreach (var item in list)
{
Model.Model.BasicReject(item.DeliveryTag, true);
}
}
}
}
}
private async Task Process(BasicDeliverEventArgs ea)
{
try
{
await Consumer.Notice(ea.Body);
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(ea.DeliveryTag, false);
}
}
catch (Exception exception)
{
Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}");
if (Consumer.Config.Reenqueue)
{
await Task.Delay(1000);
Model.Model.BasicReject(ea.DeliveryTag, true);
}
}
}
public void Close()
{
Model?.Dispose();
}
}
}
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Ray.Core.EventBus;
namespace Ray.EventBus.RabbitMQ
{
public class RabbitConsumer : Consumer
{
public RabbitConsumer(
List<Func<byte[], Task>> eventHandlers,
List<Func<List<byte[]>, Task>> batchEventHandlers) : base(eventHandlers, batchEventHandlers)
{
}
public RabbitEventBus EventBus { get; set; }
public List<QueueInfo> QueueList { get; set; }
public ConsumerOptions Config { get; set; }
}
}
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Orleans;
using RabbitMQ.Client;
using Ray.Core.Abstractions;
using Ray.Core.EventBus;
using Ray.Core.Exceptions;
using Ray.Core.Utils;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
{
public class EventBusContainer : IRabbitEventBusContainer, IProducerContainer
{
private readonly ConcurrentDictionary<Type, RabbitEventBus> eventBusDictionary = new ConcurrentDictionary<Type, RabbitEventBus>();
private readonly List<RabbitEventBus> eventBusList = new List<RabbitEventBus>();
readonly IRabbitMQClient rabbitMQClient;
readonly IServiceProvider serviceProvider;
private readonly IObserverUnitContainer observerUnitContainer;
public EventBusContainer(
IServiceProvider serviceProvider,
IObserverUnitContainer observerUnitContainer,
IRabbitMQClient rabbitMQClient)
{
this.serviceProvider = serviceProvider;
this.rabbitMQClient = rabbitMQClient;
this.observerUnitContainer = observerUnitContainer;
}
public async Task AutoRegister()
{
var observableList = new List<(Type type, ProducerAttribute config)>();
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<EventBusContainer>>()))
{
foreach (var type in assembly.GetTypes())
{
foreach (var attribute in type.GetCustomAttributes(false))
{
if (attribute is ProducerAttribute config)
{
observableList.Add((type, config));
break;
}
}
}
}
foreach (var (type, config) in observableList)
{
var eventBus = CreateEventBus(string.IsNullOrEmpty(config.Exchange) ? type.Name : config.Exchange, string.IsNullOrEmpty(config.RoutePrefix) ? type.Name : config.RoutePrefix, config.LBCount, config.AutoAck, config.Reenqueue, config.Persistent).BindProducer(type);
if (typeof(IGrainWithIntegerKey).IsAssignableFrom(type))
{
await eventBus.AddGrainConsumer<long>();
}
else if (typeof(IGrainWithStringKey).IsAssignableFrom(type))
{
await eventBus.AddGrainConsumer<string>();
}
else
throw new PrimaryKeyTypeException(type.FullName);
}
}
public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false)
{
return new RabbitEventBus(observerUnitContainer, this, exchange, routePrefix, lBCount, autoAck, reenqueue, persistent);
}
public RabbitEventBus CreateEventBus<MainGrain>(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false)
{
return CreateEventBus(exchange, routePrefix, lBCount, autoAck, reenqueue, persistent).BindProducer<MainGrain>();
}
public Task Work(RabbitEventBus bus)
{
if (eventBusDictionary.TryAdd(bus.ProducerType, bus))
{
eventBusList.Add(bus);
using var channel = rabbitMQClient.PullModel();
channel.Model.ExchangeDeclare(bus.Exchange, "direct", true);
return Task.CompletedTask;
}
else
throw new EventBusRepeatException(bus.ProducerType.FullName);
}
readonly ConcurrentDictionary<Type, IProducer> producerDict = new ConcurrentDictionary<Type, IProducer>();
public ValueTask<IProducer> GetProducer(Type type)
{
if (eventBusDictionary.TryGetValue(type, out var eventBus))
{
return new ValueTask<IProducer>(producerDict.GetOrAdd(type, key =>
{
return new RabbitProducer(rabbitMQClient, eventBus);
}));
}
else
{
throw new NotImplementedException($"{nameof(IProducer)} of {type.FullName}");
}
}
public ValueTask<IProducer> GetProducer<T>()
{
return GetProducer(typeof(T));
}
public List<IConsumer> GetConsumers()
{
var result = new List<IConsumer>();
foreach (var eventBus in eventBusList)
{
result.AddRange(eventBus.Consumers);
}
return result;
}
}
}
using System;
namespace Ray.EventBus.RabbitMQ
{
public class EventBusRepeatException : Exception
{
public EventBusRepeatException(string message) : base(message)
{
}
}
}
using System.Threading.Tasks;
using Ray.Core.EventBus;
namespace Ray.EventBus.RabbitMQ
{
public interface IRabbitEventBusContainer : IConsumerContainer
{
Task AutoRegister();
RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false);
RabbitEventBus CreateEventBus<MainGrain>(string routePrefix, string queue, int lBCount = 1, bool autoAck = false, bool reenqueue = false, bool persistent = false);
Task Work(RabbitEventBus bus);
}
}
namespace Ray.EventBus.RabbitMQ
{
public class QueueInfo
{
public string Queue { get; set; }
public string RoutingKey { get; set; }
public override string ToString()
{
return $"{Queue}_{RoutingKey}";
}
}
}
using Ray.Core.Abstractions;
using Ray.Core.Exceptions;
using Ray.Core.Utils;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
{
public class RabbitEventBus
{
private readonly ConsistentHash _CHash;
readonly IObserverUnitContainer observerUnitContainer;
public RabbitEventBus(
IObserverUnitContainer observerUnitContainer,
IRabbitEventBusContainer eventBusContainer,
string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = false)
{
if (string.IsNullOrEmpty(exchange))
throw new ArgumentNullException(nameof(exchange));
if (string.IsNullOrEmpty(routePrefix))
throw new ArgumentNullException(nameof(routePrefix));
if (lBCount < 1)
throw new ArgumentOutOfRangeException($"{nameof(lBCount)} must be greater than 1");
this.observerUnitContainer = observerUnitContainer;
Container = eventBusContainer;
Exchange = exchange;
RoutePrefix = routePrefix;
LBCount = lBCount;
Persistent = persistent;
ConsumerConfig = new ConsumerOptions
{
AutoAck = autoAck,
Reenqueue = reenqueue,
};
RouteList = new List<string>();
if (LBCount == 1)
{
RouteList.Add(routePrefix);
}
else
{
for (int i = 0; i < LBCount; i++)
{
RouteList.Add($"{routePrefix }_{ i.ToString()}");
}
}
_CHash = new ConsistentHash(RouteList, lBCount * 10);
}
public IRabbitEventBusContainer Container { get; }
public string Exchange { get; }
public string RoutePrefix { get; }
public int LBCount { get; }
public ConsumerOptions ConsumerConfig { get; set; }
public List<string> RouteList { get; }
public Type ProducerType { get; set; }
/// <summary>
/// 消息是否持久化
/// </summary>
public bool Persistent { get; set; }
public List<RabbitConsumer> Consumers { get; set; } = new List<RabbitConsumer>();
public string GetRoute(string key)
{
return LBCount == 1 ? RoutePrefix : _CHash.GetNode(key); ;
}
public RabbitEventBus BindProducer<TGrain>()
{
return BindProducer(typeof(TGrain));
}
public RabbitEventBus BindProducer(Type grainType)
{
if (ProducerType == null)
ProducerType = grainType;
else
throw new EventBusRepeatBindingProducerException(grainType.FullName);
return this;
}
public RabbitEventBus AddGrainConsumer<PrimaryKey>(string observerGroup)
{
var observerUnit = observerUnitContainer.GetUnit<PrimaryKey>(ProducerType);
var consumer = new RabbitConsumer(
observerUnit.GetEventHandlers(observerGroup),
observerUnit.GetBatchEventHandlers(observerGroup))
{
EventBus = this,
QueueList = RouteList.Select(route => new QueueInfo { RoutingKey = route, Queue = $"{route}_{observerGroup}" }).ToList(),
Config = ConsumerConfig
};
Consumers.Add(consumer);
return this;
}
public RabbitEventBus AddConsumer(
Func<byte[], Task> handler,
Func<List<byte[]>, Task> batchHandler,
string observerGroup)
{
var consumer = new RabbitConsumer(
new List<Func<byte[], Task>> { handler },
new List<Func<List<byte[]>, Task>> { batchHandler })
{
EventBus = this,
QueueList = RouteList.Select(route => new QueueInfo { RoutingKey = route, Queue = $"{route}_{observerGroup}" }).ToList(),
Config = ConsumerConfig
};
Consumers.Add(consumer);
return this;
}
public Task Enable()
{
return Container.Work(this);
}
public Task AddGrainConsumer<PrimaryKey>()
{
foreach (var group in observerUnitContainer.GetUnit<PrimaryKey>(ProducerType).GetGroups())
{
AddGrainConsumer<PrimaryKey>(group);
};
return Enable();
}
}
}
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Ray.Core;
using Ray.Core.EventBus;
namespace Ray.EventBus.RabbitMQ
{
public static class Extensions
{
public static void AddRabbitMQ(
this IServiceCollection serviceCollection,
Action<RabbitOptions> rabbitConfigAction,
Func<IRabbitEventBusContainer, Task> eventBusConfig = default)
{
serviceCollection.Configure<RabbitOptions>(config => rabbitConfigAction(config));
serviceCollection.AddSingleton<IRabbitMQClient, RabbitMQClient>();
serviceCollection.AddHostedService<ConsumerManager>();
serviceCollection.AddSingleton<IRabbitEventBusContainer, EventBusContainer>();
serviceCollection.AddSingleton(serviceProvider => serviceProvider.GetService<IRabbitEventBusContainer>() as IProducerContainer);
Startup.Register(async serviceProvider =>
{
var container = serviceProvider.GetService<IRabbitEventBusContainer>();
if (eventBusConfig != default)
await eventBusConfig(container);
else
await container.AutoRegister();
});
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="3.1.1" />
<PackageReference Include="Microsoft.Orleans.Core.Abstractions" Version="3.0.2" />
<PackageReference Include="Microsoft.Orleans.Runtime.Abstractions" Version="3.0.2" />
<PackageReference Include="RabbitMQ.Client" Version="5.1.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
</ItemGroup>
</Project>
using Ray.Core;
using Ray.Core.EventBus;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
{
public class RabbitProducer : IProducer
{
readonly RabbitEventBus publisher;
readonly IRabbitMQClient rabbitMQClient;
public RabbitProducer(
IRabbitMQClient rabbitMQClient,
RabbitEventBus publisher)
{
this.publisher = publisher;
this.rabbitMQClient = rabbitMQClient;
}
public ValueTask Publish(byte[] bytes, string hashKey)
{
using var model = rabbitMQClient.PullModel();
model.Publish(bytes, publisher.Exchange, publisher.GetRoute(hashKey), publisher.Persistent);
return Consts.ValueTaskDone;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment