Commit d9d03b97 by 丁松杰

event bus 基本代码

parent 18f75144
Showing with 165 additions and 630 deletions
using System;
namespace Ray.Core.Abstractions
namespace Pole.Core.Abstractions
{
public interface IGrainID
{
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Ray.Core.Abstractions
namespace Pole.Core.Abstractions
{
public interface IObserverUnit<PrimaryKey> : IGrainID
{
......
using System;
namespace Ray.Core.Abstractions
namespace Pole.Core.Abstractions
{
public interface IObserverUnitContainer
{
......
using System.Threading.Tasks;
namespace Ray.Core.Channels
namespace Pole.Core.Channels
{
public interface IBaseMpscChannel
{
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Ray.Core.Channels
namespace Pole.Core.Channels
{
public interface IMpscChannel<T> : IBaseMpscChannel
{
......
using System.Threading.Tasks;
namespace Ray.Core.Channels
namespace Pole.Core.Channels
{
public class AsyncInputEvent<Input, Output>
{
......
namespace Ray.Core.Channels
namespace Pole.Core.Channels
{
public class ChannelOptions
{
......
......@@ -7,7 +7,7 @@ using System.Threading.Tasks.Dataflow;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace Ray.Core.Channels
namespace Pole.Core.Channels
{
/// <summary>
/// multi producter single consumer channel
......
using System;
namespace Ray.Core.Channels
namespace Pole.Core.Channels
{
public class NoBindConsumerException : Exception
{
......
using System;
namespace Ray.Core.Channels
namespace Pole.Core.Channels
{
public class RebindConsumerException : Exception
{
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Core
{
public static class Consts
{
public static ValueTask ValueTaskDone = new ValueTask();
}
}
......@@ -3,7 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Ray.Core.EventBus
namespace Pole.Core.EventBus
{
public abstract class Consumer : IConsumer
{
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.Event
{
[AttributeUsage(AttributeTargets.Class)]
public class EventInfoAttribute: Attribute
{
public string SendBoxName { get; set; }
}
}
namespace Ray.EventBus.RabbitMQ
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.Event
{
public interface IRabbitMQClient
public interface IEvent
{
ModelWrapper PullModel();
}
}
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Ray.Core.EventBus
namespace Pole.Core.EventBus
{
public interface IConsumer
{
......
using System.Collections.Generic;
namespace Ray.Core.EventBus
namespace Pole.Core.EventBus
{
public interface IConsumerContainer
{
......
using System.Threading.Tasks;
namespace Ray.Core.EventBus
namespace Pole.Core.EventBus
{
public interface IProducer
{
......
using System;
using System.Threading.Tasks;
namespace Ray.Core.EventBus
namespace Pole.Core.EventBus
{
public interface IProducerContainer
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class BeginTxTimeoutException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class ChannelUnavailabilityException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class EventBusRepeatBindingProducerException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class EventIsClearedException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class EventVersionUnorderedException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class ObserverNotCompletedException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class ObserverUnitRepeatedException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class PrimaryKeyTypeException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class RepeatedTxException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class SnapshotNotSupportTxException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class StateInsecurityException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class StateIsOverException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class TxCommitException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class TxIdException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class TxSnapshotException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class TypeCodeRepeatedException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class UnmatchObserverUnitException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class UnfindEventHandlerException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class UnfindObserverUnitException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class UnfindSnapshotHandlerException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class UnknowTypeCodeException : Exception
{
......
using System;
namespace Ray.Core.Exceptions
namespace Pole.Core.Exceptions
{
public class UnopenedTransactionException : Exception
{
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Linq;
namespace Ray.Core.Observer
namespace Pole.Core.Observer
{
/// <summary>
/// 标记为观察者
......
......@@ -2,7 +2,7 @@
using System.Threading.Tasks;
using Orleans.Concurrency;
namespace Ray.Core.Observer
namespace Pole.Core.Observer
{
public interface IObserver : IVersion
{
......
using System.Threading.Tasks;
namespace Ray.Core.Observer
namespace Pole.Core.Observer
{
public interface IVersion
{
......
using System.Threading.Tasks;
using Orleans;
namespace Ray.Core.Services
namespace Pole.Core.Services
{
public interface IHoldLock : IGrainWithStringKey
{
......
......@@ -2,7 +2,7 @@
using Orleans;
using Orleans.Concurrency;
namespace Ray.Core.Services
namespace Pole.Core.Services
{
public interface ILocalUID : IGrainWithStringKey
{
......
using System.Threading.Tasks;
using Orleans;
namespace Ray.Core.Services
namespace Pole.Core.Services
{
public interface ILock : IGrainWithStringKey
{
......
......@@ -2,7 +2,7 @@
using Orleans.Concurrency;
using System.Threading.Tasks;
namespace Ray.Core.Services
namespace Pole.Core.Services
{
public interface IUtcUID : IGrainWithStringKey
{
......
......@@ -2,7 +2,7 @@
using Orleans;
namespace Ray.Core.Services
namespace Pole.Core.Services
{
public interface IWeightHoldLock : IGrainWithStringKey
{
......
......@@ -2,7 +2,7 @@
using System.Threading.Tasks;
using Orleans;
namespace Ray.Core.Services
namespace Pole.Core.Services
{
public class HoldLockGrain : Grain, IHoldLock
{
......
......@@ -4,7 +4,7 @@ using System;
using System.Threading;
using System.Threading.Tasks;
namespace Ray.Core.Services
namespace Pole.Core.Services
{
[Reentrant]
public class LocalUIDGrain : Grain, ILocalUID
......
......@@ -3,7 +3,7 @@ using System.Threading.Tasks;
using Orleans;
using Orleans.Concurrency;
namespace Ray.Core.Services
namespace Pole.Core.Services
{
[Reentrant]
public class LockGrain : Grain, ILock
......
......@@ -4,7 +4,7 @@ using System;
using System.Threading;
using System.Threading.Tasks;
namespace Ray.Core.Services
namespace Pole.Core.Services
{
[Reentrant]
public class UtcUIDGrain : Grain, IUtcUID
......
......@@ -2,7 +2,7 @@
using System.Threading.Tasks;
using Orleans;
namespace Ray.Core.Services
namespace Pole.Core.Services
{
public class WeightHoldLock : Grain, IWeightHoldLock
{
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Pole.Core
{
public static class Startup
{
static List<StartupTask> tasks = new List<StartupTask>();
public static void Register(Func<IServiceProvider, Task> method, int sortIndex = 0)
{
tasks.Add(new StartupTask(sortIndex, method));
}
internal static Task StartRay(IServiceProvider serviceProvider)
{
tasks = tasks.OrderBy(func => func.SortIndex).ToList();
return Task.WhenAll(tasks.Select(value => value.Func(serviceProvider)));
}
private class StartupTask
{
public StartupTask(int sortIndex, Func<IServiceProvider, Task> func)
{
SortIndex = sortIndex;
Func = func;
}
public int SortIndex { get; set; }
public Func<IServiceProvider, Task> Func { get; set; }
}
}
}
......@@ -6,7 +6,7 @@ using System.Linq;
using System.Reflection;
using System.Runtime.Loader;
namespace Ray.Core.Utils
namespace Pole.Core.Utils
{
public class AssemblyHelper
{
......
......@@ -4,7 +4,7 @@ using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
namespace Ray.Core.Utils
namespace Pole.Core.Utils
{
public class ConsistentHash
{
......@@ -72,7 +72,7 @@ namespace Ray.Core.Utils
}
//return the index of first item that >= val.
//if not exist, return 0;
//ay should be ordered array.
//ay should be ordered arPole.
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static int First_ge(int[] ay, int val)
{
......
......@@ -2,7 +2,7 @@
using System.Reflection;
using System.Reflection.Emit;
namespace Ray.Core.Utils.Emit
namespace Pole.Core.Utils.Emit
{
/// <summary>
/// 用来生成模式匹配方法调用的方法信息
......
using System.Runtime.InteropServices;
namespace Ray.Core.Utils
namespace Pole.Core.Utils
{
public class MurmurHash2
{
......
......@@ -2,7 +2,7 @@
using System.IO;
using System.Buffers;
namespace Ray.Core.Utils
namespace Pole.Core.Utils
{
public class PooledMemoryStream : Stream
{
......
using System;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public class ProducerAttribute : Attribute
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Threading;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class ConnectionWrapper
{
......@@ -22,7 +22,7 @@ namespace Ray.EventBus.RabbitMQ
semaphoreSlim.Wait();
try
{
if (models.Count < Options.PoolSizePerConnection)
if (models.Count < Options.MasChannelsPerConnection)
{
var model = new ModelWrapper(this, connection.CreateModel());
models.Add(model);
......
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public interface IRabbitMQClient
{
......
......@@ -3,7 +3,7 @@ using RabbitMQ.Client;
using System.Collections.Generic;
using System.Threading;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class ModelPooledObjectPolicy : IPooledObjectPolicy<ModelWrapper>
{
......
......@@ -2,7 +2,7 @@
using RabbitMQ.Client;
using System;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class ModelWrapper : IDisposable
{
......
......@@ -2,7 +2,7 @@
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class RabbitMQClient : IRabbitMQClient
{
......@@ -17,7 +17,7 @@ namespace Ray.EventBus.RabbitMQ
UserName = options.UserName,
Password = options.Password,
VirtualHost = options.VirtualHost,
AutomaticRecoveryEnabled = false
AutomaticRecoveryEnabled = true
};
pool = new DefaultObjectPool<ModelWrapper>(new ModelPooledObjectPolicy(connectionFactory, options));
}
......
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
/// <summary>
/// Consumer配置信息
......
using System.Collections.Generic;
using RabbitMQ.Client;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class RabbitOptions
{
public string UserName { get; set; }
public string Password { get; set; }
public string VirtualHost { get; set; }
public int PoolSizePerConnection { get; set; } = 200;
public int MasChannelsPerConnection { get; set; } = 200;
public int MaxConnection { get; set; } = 20;
/// <summary>
/// 消费者批量处理每次处理的最大消息量
......@@ -18,6 +18,10 @@ namespace Ray.EventBus.RabbitMQ
/// 消费者批量处理每次处理的最大延时
/// </summary>
public int CunsumerMaxMillisecondsInterval { get; set; } = 1000;
/// <summary>
/// exchange 和 queue 名称的前缀
/// </summary>
public string Prefix = "Pole_";
public string[] Hosts
{
get; set;
......
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans;
using Ray.Core.Services;
using Pole.Core.Services;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class ConsumerManager : IHostedService, IDisposable
{
......
......@@ -2,13 +2,13 @@
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Ray.Core.Channels;
using Pole.Core.Channels;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class ConsumerRunner
{
......
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Ray.Core.EventBus;
using Pole.Core.EventBus;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class RabbitConsumer : Consumer
{
......
using RabbitMQ.Client;
using System.Collections.Generic;
using System.Threading;
namespace Ray.EventBus.RabbitMQ
{
public class ConnectionWrapper
{
private readonly List<ModelWrapper> models = new List<ModelWrapper>();
private readonly IConnection connection;
readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
public ConnectionWrapper(
IConnection connection,
RabbitOptions options)
{
this.connection = connection;
Options = options;
}
public RabbitOptions Options { get; }
public (bool success, ModelWrapper model) Get()
{
semaphoreSlim.Wait();
try
{
if (models.Count < Options.PoolSizePerConnection)
{
var model = new ModelWrapper(this, connection.CreateModel());
models.Add(model);
return (true, model);
}
}
finally
{
semaphoreSlim.Release();
}
return (false, default);
}
public void Return(ModelWrapper model)
{
models.Remove(model);
}
}
}
using Microsoft.Extensions.ObjectPool;
using RabbitMQ.Client;
using System.Collections.Generic;
using System.Threading;
namespace Ray.EventBus.RabbitMQ
{
public class ModelPooledObjectPolicy : IPooledObjectPolicy<ModelWrapper>
{
readonly ConnectionFactory connectionFactory;
readonly List<ConnectionWrapper> connections = new List<ConnectionWrapper>();
readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
readonly RabbitOptions options;
public ModelPooledObjectPolicy(ConnectionFactory connectionFactory, RabbitOptions options)
{
this.connectionFactory = connectionFactory;
this.options = options;
}
public ModelWrapper Create()
{
foreach (var connection in connections)
{
(bool success, ModelWrapper model) = connection.Get();
if (success)
return model;
}
semaphoreSlim.Wait();
try
{
if (connections.Count < options.MaxConnection)
{
var connection = new ConnectionWrapper(connectionFactory.CreateConnection(options.EndPoints), options);
(bool success, ModelWrapper model) = connection.Get();
connections.Add(connection);
if (success)
return model;
}
throw new System.OverflowException(nameof(connections));
}
finally
{
semaphoreSlim.Release();
}
}
public bool Return(ModelWrapper obj)
{
if (obj.Model.IsOpen)
return true;
else
{
obj.ForceDispose();
return false;
}
}
}
}
using Microsoft.Extensions.ObjectPool;
using RabbitMQ.Client;
using System;
namespace Ray.EventBus.RabbitMQ
{
public class ModelWrapper : IDisposable
{
readonly IBasicProperties persistentProperties;
readonly IBasicProperties noPersistentProperties;
public DefaultObjectPool<ModelWrapper> Pool { get; set; }
public ConnectionWrapper Connection { get; set; }
public IModel Model { get; set; }
public ModelWrapper(
ConnectionWrapper connectionWrapper,
IModel model)
{
Connection = connectionWrapper;
Model = model;
persistentProperties = Model.CreateBasicProperties();
persistentProperties.Persistent = true;
noPersistentProperties = Model.CreateBasicProperties();
noPersistentProperties.Persistent = false;
}
public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true)
{
Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg);
}
public void Dispose()
{
Pool.Return(this);
}
public void ForceDispose()
{
Model.Close();
Model.Dispose();
Connection.Return(this);
}
}
}
using Microsoft.Extensions.ObjectPool;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
namespace Ray.EventBus.RabbitMQ
{
public class RabbitMQClient : IRabbitMQClient
{
readonly ConnectionFactory connectionFactory;
readonly RabbitOptions options;
readonly DefaultObjectPool<ModelWrapper> pool;
public RabbitMQClient(IOptions<RabbitOptions> config)
{
options = config.Value;
connectionFactory = new ConnectionFactory
{
UserName = options.UserName,
Password = options.Password,
VirtualHost = options.VirtualHost,
AutomaticRecoveryEnabled = false
};
pool = new DefaultObjectPool<ModelWrapper>(new ModelPooledObjectPolicy(connectionFactory, options));
}
public ModelWrapper PullModel()
{
var result = pool.Get();
if (result.Pool is null)
result.Pool = pool;
return result;
}
}
}
namespace Ray.EventBus.RabbitMQ
{
/// <summary>
/// Consumer配置信息
/// </summary>
public class ConsumerOptions
{
/// <summary>
/// 是否自动ack
/// </summary>
public bool AutoAck { get; set; }
/// <summary>
/// 消息处理失败是否重回队列还是不停重发
/// </summary>
public bool Reenqueue { get; set; }
}
}
using System.Collections.Generic;
using RabbitMQ.Client;
namespace Ray.EventBus.RabbitMQ
{
public class RabbitOptions
{
public string UserName { get; set; }
public string Password { get; set; }
public string VirtualHost { get; set; }
public int PoolSizePerConnection { get; set; } = 200;
public int MaxConnection { get; set; } = 20;
/// <summary>
/// 消费者批量处理每次处理的最大消息量
/// </summary>
public ushort CunsumerMaxBatchSize { get; set; } = 3000;
/// <summary>
/// 消费者批量处理每次处理的最大延时
/// </summary>
public int CunsumerMaxMillisecondsInterval { get; set; } = 1000;
public string[] Hosts
{
get; set;
}
public List<AmqpTcpEndpoint> EndPoints
{
get
{
var list = new List<AmqpTcpEndpoint>();
foreach (var host in Hosts)
{
list.Add(AmqpTcpEndpoint.Parse(host));
}
return list;
}
}
}
}
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans;
using Ray.Core.Services;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
{
public class ConsumerManager : IHostedService, IDisposable
{
readonly ILogger<ConsumerManager> logger;
readonly IRabbitMQClient client;
readonly IRabbitEventBusContainer rabbitEventBusContainer;
readonly IServiceProvider provider;
readonly IGrainFactory grainFactory;
const int _HoldTime = 20 * 1000;
const int _MonitTime = 60 * 2 * 1000;
const int _checkTime = 10 * 1000;
public ConsumerManager(
ILogger<ConsumerManager> logger,
IRabbitMQClient client,
IGrainFactory grainFactory,
IServiceProvider provider,
IRabbitEventBusContainer rabbitEventBusContainer)
{
this.provider = provider;
this.client = client;
this.logger = logger;
this.rabbitEventBusContainer = rabbitEventBusContainer;
this.grainFactory = grainFactory;
}
private readonly ConcurrentDictionary<string, ConsumerRunner> ConsumerRunners = new ConcurrentDictionary<string, ConsumerRunner>();
private ConcurrentDictionary<string, long> Runners { get; } = new ConcurrentDictionary<string, long>();
private Timer HeathCheckTimer { get; set; }
private Timer DistributedMonitorTime { get; set; }
private Timer DistributedHoldTimer { get; set; }
const int lockHoldingSeconds = 60;
int distributedMonitorTimeLock = 0;
int distributedHoldTimerLock = 0;
int heathCheckTimerLock = 0;
private async Task DistributedStart()
{
try
{
if (Interlocked.CompareExchange(ref distributedMonitorTimeLock, 1, 0) == 0)
{
var consumers = rabbitEventBusContainer.GetConsumers();
foreach (var consumer in consumers)
{
if (consumer is RabbitConsumer value)
{
for (int i = 0; i < value.QueueList.Count(); i++)
{
var queue = value.QueueList[i];
var key = queue.ToString();
if (!Runners.ContainsKey(key))
{
var weight = 100000 - Runners.Count;
var (isOk, lockId, expectMillisecondDelay) = await grainFactory.GetGrain<IWeightHoldLock>(key).Lock(weight, lockHoldingSeconds);
if (isOk)
{
if (Runners.TryAdd(key, lockId))
{
var runner = new ConsumerRunner(client, provider, value, queue);
ConsumerRunners.TryAdd(key, runner);
await runner.Run();
}
}
}
}
}
}
Interlocked.Exchange(ref distributedMonitorTimeLock, 0);
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is working.");
}
}
catch (Exception exception)
{
logger.LogError(exception.InnerException ?? exception, nameof(DistributedStart));
Interlocked.Exchange(ref distributedMonitorTimeLock, 0);
}
}
private async Task DistributedHold()
{
try
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is holding.");
if (Interlocked.CompareExchange(ref distributedHoldTimerLock, 1, 0) == 0)
{
foreach (var lockKV in Runners)
{
if (Runners.TryGetValue(lockKV.Key, out var lockId))
{
var holdResult = await grainFactory.GetGrain<IWeightHoldLock>(lockKV.Key).Hold(lockId, lockHoldingSeconds);
if (!holdResult)
{
if (ConsumerRunners.TryRemove(lockKV.Key, out var runner))
{
runner.Close();
}
Runners.TryRemove(lockKV.Key, out var _);
}
}
}
Interlocked.Exchange(ref distributedHoldTimerLock, 0);
}
}
catch (Exception exception)
{
logger.LogError(exception.InnerException ?? exception, nameof(DistributedHold));
Interlocked.Exchange(ref distributedHoldTimerLock, 0);
}
}
private async Task HeathCheck()
{
try
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is checking.");
if (Interlocked.CompareExchange(ref heathCheckTimerLock, 1, 0) == 0)
{
await Task.WhenAll(ConsumerRunners.Values.Select(runner => runner.HeathCheck()));
Interlocked.Exchange(ref heathCheckTimerLock, 0);
}
}
catch (Exception exception)
{
logger.LogError(exception.InnerException ?? exception, nameof(HeathCheck));
Interlocked.Exchange(ref heathCheckTimerLock, 0);
}
}
public Task StartAsync(CancellationToken cancellationToken)
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is starting.");
DistributedMonitorTime = new Timer(state => DistributedStart().Wait(), null, 1000, _MonitTime);
DistributedHoldTimer = new Timer(state => DistributedHold().Wait(), null, _HoldTime, _HoldTime);
HeathCheckTimer = new Timer(state => { HeathCheck().Wait(); }, null, _checkTime, _checkTime);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is stopping.");
Dispose();
return Task.CompletedTask;
}
public void Dispose()
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is disposing.");
foreach (var runner in ConsumerRunners.Values)
{
runner.Close();
}
DistributedMonitorTime?.Dispose();
DistributedHoldTimer?.Dispose();
HeathCheckTimer?.Dispose();
}
}
}
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Ray.Core.Channels;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
{
public class ConsumerRunner
{
readonly IMpscChannel<BasicDeliverEventArgs> mpscChannel;
public ConsumerRunner(
IRabbitMQClient client,
IServiceProvider provider,
RabbitConsumer consumer,
QueueInfo queue)
{
Client = client;
Logger = provider.GetService<ILogger<ConsumerRunner>>();
mpscChannel = provider.GetService<IMpscChannel<BasicDeliverEventArgs>>();
mpscChannel.BindConsumer(BatchExecuter);
Consumer = consumer;
Queue = queue;
}
public ILogger<ConsumerRunner> Logger { get; }
public IRabbitMQClient Client { get; }
public RabbitConsumer Consumer { get; }
public QueueInfo Queue { get; }
public ModelWrapper Model { get; set; }
public EventingBasicConsumer BasicConsumer { get; set; }
public bool IsUnAvailable => !BasicConsumer.IsRunning || Model.Model.IsClosed;
private bool isFirst = true;
public Task Run()
{
Model = Client.PullModel();
mpscChannel.Config(Model.Connection.Options.CunsumerMaxBatchSize, Model.Connection.Options.CunsumerMaxMillisecondsInterval);
if (isFirst)
{
isFirst = false;
Model.Model.ExchangeDeclare(Consumer.EventBus.Exchange, "direct", true);
Model.Model.QueueDeclare(Queue.Queue, true, false, false, null);
Model.Model.QueueBind(Queue.Queue, Consumer.EventBus.Exchange, Queue.RoutingKey);
}
Model.Model.BasicQos(0, Model.Connection.Options.CunsumerMaxBatchSize, false);
BasicConsumer = new EventingBasicConsumer(Model.Model);
BasicConsumer.Received += async (ch, ea) => await mpscChannel.WriteAsync(ea);
BasicConsumer.ConsumerTag = Model.Model.BasicConsume(Queue.Queue, Consumer.Config.AutoAck, BasicConsumer);
return Task.CompletedTask;
}
public Task HeathCheck()
{
if (IsUnAvailable)
{
Close();
return Run();
}
else
return Task.CompletedTask;
}
private async Task BatchExecuter(List<BasicDeliverEventArgs> list)
{
if (list.Count == 1)
{
await Process(list.First());
}
else
{
try
{
await Consumer.Notice(list.Select(o => o.Body).ToList());
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true);
}
}
catch (Exception exception)
{
Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}");
if (Consumer.Config.Reenqueue)
{
await Task.Delay(1000);
foreach (var item in list)
{
Model.Model.BasicReject(item.DeliveryTag, true);
}
}
}
}
}
private async Task Process(BasicDeliverEventArgs ea)
{
try
{
await Consumer.Notice(ea.Body);
if (!Consumer.Config.AutoAck)
{
Model.Model.BasicAck(ea.DeliveryTag, false);
}
}
catch (Exception exception)
{
Logger.LogError(exception.InnerException ?? exception, $"An error occurred in {Consumer.EventBus.Exchange}-{Queue}");
if (Consumer.Config.Reenqueue)
{
await Task.Delay(1000);
Model.Model.BasicReject(ea.DeliveryTag, true);
}
}
}
public void Close()
{
Model?.Dispose();
}
}
}
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Ray.Core.EventBus;
namespace Ray.EventBus.RabbitMQ
{
public class RabbitConsumer : Consumer
{
public RabbitConsumer(
List<Func<byte[], Task>> eventHandlers,
List<Func<List<byte[]>, Task>> batchEventHandlers) : base(eventHandlers, batchEventHandlers)
{
}
public RabbitEventBus EventBus { get; set; }
public List<QueueInfo> QueueList { get; set; }
public ConsumerOptions Config { get; set; }
}
}
......@@ -2,16 +2,16 @@
using Microsoft.Extensions.Logging;
using Orleans;
using RabbitMQ.Client;
using Ray.Core.Abstractions;
using Ray.Core.EventBus;
using Ray.Core.Exceptions;
using Ray.Core.Utils;
using Pole.Core.Abstractions;
using Pole.Core.EventBus;
using Pole.Core.Exceptions;
using Pole.Core.Utils;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class EventBusContainer : IRabbitEventBusContainer, IProducerContainer
{
......
using System;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class EventBusRepeatException : Exception
{
......
using System.Threading.Tasks;
using Ray.Core.EventBus;
using Pole.Core.EventBus;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public interface IRabbitEventBusContainer : IConsumerContainer
{
......
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class QueueInfo
{
......
using Ray.Core.Abstractions;
using Ray.Core.Exceptions;
using Ray.Core.Utils;
using Pole.Core.Abstractions;
using Pole.Core.Exceptions;
using Pole.Core.Utils;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class RabbitEventBus
{
......
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Ray.Core;
using Ray.Core.EventBus;
using Pole.Core;
using Pole.Core.EventBus;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public static class Extensions
{
......
......@@ -17,4 +17,11 @@
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Core\Attributes\" />
<Folder Include="Core\Client\" />
<Folder Include="Core\Configuration\" />
<Folder Include="Core\Consumer\" />
</ItemGroup>
</Project>
using Ray.Core;
using Ray.Core.EventBus;
using Pole.Core;
using Pole.Core;
using Pole.Core.EventBus;
using System.Threading.Tasks;
namespace Ray.EventBus.RabbitMQ
namespace Pole.EventBus.RabbitMQ
{
public class RabbitProducer : IProducer
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment