Commit 5f7a7238 by 丁松杰

优化代码

parent b2ed39db
......@@ -6,7 +6,7 @@ namespace Pole.Core.EventBus
{
public interface IObserverUnit<PrimaryKey> : IGrainID
{
List<Func<byte[], Task>> GetEventHandlers();
List<Func<List<byte[]>, Task>> GetBatchEventHandlers();
Func<byte[], Task> GetEventHandler();
Func<List<byte[]>, Task> GetBatchEventHandler();
}
}
......@@ -22,8 +22,8 @@ namespace Pole.Core.EventBus
readonly ISerializer serializer;
readonly ITypeFinder typeFinder;
readonly IClusterClient clusterClient;
readonly List<Func<byte[], Task>> eventHandlers = new List<Func<byte[], Task>>();
readonly List<Func<List<byte[]>, Task>> batchEventHandlers = new List<Func<List<byte[]>, Task>>();
Func<byte[], Task> eventHandler;
Func<List<byte[]>, Task> batchEventHandler;
protected ILogger Logger { get; private set; }
public Type GrainType { get; }
......@@ -41,21 +41,21 @@ namespace Pole.Core.EventBus
return new ObserverUnit<PrimaryKey>(serviceProvider, typeof(Grain));
}
public List<Func<byte[], Task>> GetEventHandlers()
public Func<byte[], Task> GetEventHandler()
{
return eventHandlers;
return eventHandler;
}
public List<Func<List<byte[]>, Task>> GetBatchEventHandlers()
public Func<List<byte[]>, Task> GetBatchEventHandler()
{
return batchEventHandlers;
return batchEventHandler;
}
public ObserverUnit<PrimaryKey> UnreliableObserver(
Func<IServiceProvider,
FullyEvent<PrimaryKey>, ValueTask> handler)
{
GetEventHandlers().Add(EventHandler);
GetBatchEventHandlers().Add(BatchEventHandler);
eventHandler = EventHandler;
batchEventHandler = BatchEventHandler;
return this;
//内部函数
Task EventHandler(byte[] bytes)
......@@ -111,8 +111,8 @@ namespace Pole.Core.EventBus
{
if (!typeof(PoleEventHandlerBase).IsAssignableFrom(observerType))
throw new NotSupportedException($"{observerType.FullName} must inheritance from PoleEventHandler");
GetEventHandlers().Add(EventHandler);
GetBatchEventHandlers().Add(BatchEventHandler);
eventHandler = EventHandler;
batchEventHandler = BatchEventHandler;
//内部函数
Task EventHandler(byte[] bytes)
{
......
......@@ -9,7 +9,10 @@ namespace Pole.EventBus.RabbitMQ
public string Password { get; set; }
public string VirtualHost { get; set; }
public int MasChannelsPerConnection { get; set; } = 200;
public int MaxConnection { get; set; } = 20;
/// <summary>
/// 目前为一个连接 当消息数量非常大时,单个TCP连接的运输能力有限,可以修改这个最大连接数提高运输能力
/// </summary>
public int MaxConnection { get; set; } = 1;
/// <summary>
/// 消费者批量处理每次处理的最大消息量
/// </summary>
......
......@@ -43,49 +43,21 @@ namespace Pole.EventBus.RabbitMQ
int distributedMonitorTimeLock = 0;
int distributedHoldTimerLock = 0;
int heathCheckTimerLock = 0;
private async Task DistributedStart()
private async Task Start()
{
try
var consumers = rabbitEventBusContainer.GetConsumers();
foreach (var consumer in consumers)
{
if (Interlocked.CompareExchange(ref distributedMonitorTimeLock, 1, 0) == 0)
if (consumer is RabbitConsumer value)
{
var consumers = rabbitEventBusContainer.GetConsumers();
foreach (var consumer in consumers)
{
if (consumer is RabbitConsumer value)
{
for (int i = 0; i < value.QueueList.Count(); i++)
{
var queue = value.QueueList[i];
var key = queue.ToString();
if (!Runners.ContainsKey(key))
{
var weight = 100000 - Runners.Count;
var (isOk, lockId, expectMillisecondDelay) = await grainFactory.GetGrain<IWeightHoldLock>(key).Lock(weight, lockHoldingSeconds);
if (isOk)
{
if (Runners.TryAdd(key, lockId))
{
var runner = new ConsumerRunner(client, provider, value, queue);
ConsumerRunners.TryAdd(key, runner);
await runner.Run();
}
var queue = value.QueueInfo;
var key = queue.Queue;
}
}
}
}
}
Interlocked.Exchange(ref distributedMonitorTimeLock, 0);
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is working.");
var runner = new ConsumerRunner(client, provider, value, queue);
ConsumerRunners.TryAdd(key, runner);
await runner.Run();
}
}
catch (Exception exception)
{
logger.LogError(exception.InnerException ?? exception, nameof(DistributedStart));
Interlocked.Exchange(ref distributedMonitorTimeLock, 0);
}
}
private async Task DistributedHold()
{
......@@ -141,7 +113,7 @@ namespace Pole.EventBus.RabbitMQ
{
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation("EventBus Background Service is starting.");
DistributedMonitorTime = new Timer(state => DistributedStart().Wait(), null, 1000, _MonitTime);
DistributedMonitorTime = new Timer(state => Start().Wait(), null, 1000, _MonitTime);
DistributedHoldTimer = new Timer(state => DistributedHold().Wait(), null, _HoldTime, _HoldTime);
HeathCheckTimer = new Timer(state => { HeathCheck().Wait(); }, null, _checkTime, _checkTime);
return Task.CompletedTask;
......
......@@ -42,8 +42,10 @@ namespace Pole.EventBus.RabbitMQ
{
isFirst = false;
Model.Model.ExchangeDeclare(Consumer.EventBus.Exchange, "direct", true);
Model.Model.ExchangeDeclare(Queue.Queue, "direct", true);
Model.Model.ExchangeBind(Consumer.EventBus.Exchange, Queue.Queue,string.Empty);
Model.Model.QueueDeclare(Queue.Queue, true, false, false, null);
Model.Model.QueueBind(Queue.Queue, Consumer.EventBus.Exchange, Queue.RoutingKey);
Model.Model.QueueBind(Queue.Queue, Queue.Queue, string.Empty);
}
Model.Model.BasicQos(0, Model.Connection.Options.CunsumerMaxBatchSize, false);
BasicConsumer = new EventingBasicConsumer(Model.Model);
......
......@@ -8,12 +8,12 @@ namespace Pole.EventBus.RabbitMQ
public class RabbitConsumer : Consumer
{
public RabbitConsumer(
List<Func<byte[], Task>> eventHandlers,
List<Func<List<byte[]>, Task>> batchEventHandlers) : base(eventHandlers, batchEventHandlers)
Func<byte[], Task> eventHandlers,
Func<List<byte[]>, Task> batchEventHandlers) : base(new List<Func<byte[], Task>> { eventHandlers }, new List<Func<List<byte[]>, Task>> { batchEventHandlers })
{
}
public RabbitEventBus EventBus { get; set; }
public List<QueueInfo> QueueList { get; set; }
public QueueInfo QueueInfo { get; set; }
public ConsumerOptions Config { get; set; }
}
}
......@@ -10,7 +10,6 @@ namespace Pole.EventBus.RabbitMQ
{
public class RabbitEventBus
{
private readonly ConsistentHash _CHash;
readonly IObserverUnitContainer observerUnitContainer;
public RabbitEventBus(
IObserverUnitContainer observerUnitContainer,
......@@ -34,8 +33,6 @@ namespace Pole.EventBus.RabbitMQ
AutoAck = autoAck,
Reenqueue = reenqueue,
};
RouteList = new List<string>() { $"{routePrefix }" };
_CHash = new ConsistentHash(RouteList, lBCount * 10);
}
public IRabbitEventBusContainer Container { get; }
public string Exchange { get; }
......@@ -52,7 +49,7 @@ namespace Pole.EventBus.RabbitMQ
public List<RabbitConsumer> Consumers { get; set; } = new List<RabbitConsumer>();
public string GetRoute(string key)
{
return LBCount == 1 ? RoutePrefix : _CHash.GetNode(key); ;
return RoutePrefix;
}
public RabbitEventBus BindEvent(Type eventType, string eventName)
{
......@@ -66,33 +63,17 @@ namespace Pole.EventBus.RabbitMQ
foreach (var observerUnit in observerUnits)
{
var consumer = new RabbitConsumer(
observerUnit.GetEventHandlers(),
observerUnit.GetBatchEventHandlers())
observerUnit.GetEventHandler(),
observerUnit.GetBatchEventHandler())
{
EventBus = this,
QueueList = RouteList.Select(route => new QueueInfo { RoutingKey = "", Queue = $"{route}_{EventName}" }).ToList(),
QueueInfo = new QueueInfo { RoutingKey = RoutePrefix, Queue = $"{RoutePrefix}_{observerUnit}" },
Config = ConsumerConfig
};
Consumers.Add(consumer);
}
return Enable();
}
public RabbitEventBus AddConsumer(
Func<byte[], Task> handler,
Func<List<byte[]>, Task> batchHandler,
string observerGroup)
{
var consumer = new RabbitConsumer(
new List<Func<byte[], Task>> { handler },
new List<Func<List<byte[]>, Task>> { batchHandler })
{
EventBus = this,
QueueList = RouteList.Select(route => new QueueInfo { RoutingKey = route, Queue = $"{route}_{observerGroup}" }).ToList(),
Config = ConsumerConfig
};
Consumers.Add(consumer);
return this;
}
public Task Enable()
{
return Container.Work(this);
......
......@@ -17,11 +17,4 @@
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Core\Attributes\" />
<Folder Include="Core\Client\" />
<Folder Include="Core\Configuration\" />
<Folder Include="Core\Consumer\" />
</ItemGroup>
</Project>
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment