using Pole.Core.Abstractions;
using Pole.Core.Exceptions;
using Pole.Core.Utils;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Pole.EventBus.RabbitMQ
{
    /// <summary>
    /// Describes a single event bus: the exchange it uses, the routing keys
    /// derived from its route prefix, the producer type bound to it and the
    /// consumers registered on it.
    /// </summary>
    // NOTE(review): the generic type arguments of this file were missing from the
    // reviewed text. List<string>, List<RabbitConsumer> and BindProducer<TGrain>
    // follow directly from how the members are used; the <PrimaryKey> type
    // parameter and the byte[] handler payload are reconstructions - confirm
    // them against IObserverUnitContainer and RabbitConsumer before merging.
    public class RabbitEventBus
    {
        private readonly ConsistentHash _consistentHash;
        private readonly IObserverUnitContainer _observerUnitContainer;

        /// <summary>
        /// Creates the event bus description and pre-computes its route list.
        /// </summary>
        /// <param name="observerUnitContainer">Container queried for observer units by the AddGrainConsumer overloads.</param>
        /// <param name="eventBusContainer">Container that <see cref="Enable"/> hands this bus to.</param>
        /// <param name="exchange">Exchange name; must not be null or empty.</param>
        /// <param name="routePrefix">Prefix of every routing key; must not be null or empty.</param>
        /// <param name="lBCount">
        /// Number of routes to generate; must be at least 1. With 1 the only route is
        /// <paramref name="routePrefix"/> itself, otherwise the routes are
        /// <c>{routePrefix}_0</c> .. <c>{routePrefix}_{lBCount - 1}</c>.
        /// </param>
        /// <param name="autoAck">Stored in <see cref="ConsumerConfig"/> as <c>AutoAck</c>.</param>
        /// <param name="reenqueue">Stored in <see cref="ConsumerConfig"/> as <c>Reenqueue</c>.</param>
        /// <param name="persistent">Initial value of <see cref="Persistent"/>.</param>
        /// <exception cref="ArgumentNullException"><paramref name="exchange"/> or <paramref name="routePrefix"/> is null or empty.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="lBCount"/> is less than 1.</exception>
        public RabbitEventBus(
            IObserverUnitContainer observerUnitContainer,
            IRabbitEventBusContainer eventBusContainer,
            string exchange,
            string routePrefix,
            int lBCount = 1,
            bool autoAck = false,
            bool reenqueue = true,
            bool persistent = false)
        {
            if (string.IsNullOrEmpty(exchange))
            {
                throw new ArgumentNullException(nameof(exchange));
            }

            if (string.IsNullOrEmpty(routePrefix))
            {
                throw new ArgumentNullException(nameof(routePrefix));
            }

            if (lBCount < 1)
            {
                // The single-string constructor treats its argument as the parameter
                // name, so pass name, offending value and message separately.
                throw new ArgumentOutOfRangeException(
                    nameof(lBCount),
                    lBCount,
                    $"{nameof(lBCount)} must be greater than or equal to 1");
            }

            _observerUnitContainer = observerUnitContainer;
            Container = eventBusContainer;
            Exchange = exchange;
            RoutePrefix = routePrefix;
            LBCount = lBCount;
            Persistent = persistent;
            ConsumerConfig = new ConsumerOptions
            {
                AutoAck = autoAck,
                Reenqueue = reenqueue,
            };

            RouteList = new List<string>(lBCount);
            if (lBCount == 1)
            {
                RouteList.Add(routePrefix);
            }
            else
            {
                for (int i = 0; i < lBCount; i++)
                {
                    RouteList.Add($"{routePrefix}_{i}");
                }
            }

            // NOTE(review): the second argument is presumably the number of virtual
            // nodes on the hash ring - confirm against ConsistentHash.
            _consistentHash = new ConsistentHash(RouteList, lBCount * 10);
        }

        /// <summary>Container that <see cref="Enable"/> passes this bus to.</summary>
        public IRabbitEventBusContainer Container { get; }

        /// <summary>Exchange name supplied at construction.</summary>
        public string Exchange { get; }

        /// <summary>Route prefix supplied at construction.</summary>
        public string RoutePrefix { get; }

        /// <summary>Number of routes generated from <see cref="RoutePrefix"/>.</summary>
        public int LBCount { get; }

        /// <summary>Options object assigned as <c>Config</c> to every consumer created by this bus.</summary>
        public ConsumerOptions ConsumerConfig { get; set; }

        /// <summary>Routing keys generated at construction.</summary>
        public List<string> RouteList { get; }

        /// <summary>Producer type bound through BindProducer; null until one is bound or assigned.</summary>
        public Type ProducerType { get; set; }

        /// <summary>
        /// Whether messages are persisted.
        /// </summary>
        public bool Persistent { get; set; }

        /// <summary>Consumers registered through the AddConsumer / AddGrainConsumer methods.</summary>
        public List<RabbitConsumer> Consumers { get; set; } = new List<RabbitConsumer>();

        /// <summary>
        /// Resolves the routing key for <paramref name="key"/>.
        /// </summary>
        /// <param name="key">Key handed to the consistent hash when more than one route exists.</param>
        /// <returns>
        /// <see cref="RoutePrefix"/> when <see cref="LBCount"/> is 1; otherwise the node
        /// returned by the consistent hash for <paramref name="key"/>.
        /// </returns>
        public string GetRoute(string key)
        {
            return LBCount == 1 ? RoutePrefix : _consistentHash.GetNode(key);
        }

        /// <summary>
        /// Binds <typeparamref name="TGrain"/> as the producer type of this bus.
        /// </summary>
        /// <typeparam name="TGrain">Producer type to bind.</typeparam>
        /// <returns>This instance, for chaining.</returns>
        public RabbitEventBus BindProducer<TGrain>()
        {
            return BindProducer(typeof(TGrain));
        }

        /// <summary>
        /// Binds <paramref name="grainType"/> as the producer type of this bus.
        /// </summary>
        /// <param name="grainType">Producer type to bind.</param>
        /// <returns>This instance, for chaining.</returns>
        /// <exception cref="EventBusRepeatBindingProducerException"><see cref="ProducerType"/> is already set.</exception>
        public RabbitEventBus BindProducer(Type grainType)
        {
            if (ProducerType != null)
            {
                throw new EventBusRepeatBindingProducerException(grainType.FullName);
            }

            ProducerType = grainType;
            return this;
        }

        /// <summary>
        /// Registers a consumer built from the handlers that the observer unit of
        /// <see cref="ProducerType"/> exposes for <paramref name="observerGroup"/>.
        /// </summary>
        /// <remarks>
        /// <see cref="ProducerType"/> is passed to the observer unit container as-is;
        /// it is null unless a producer was bound or assigned beforehand.
        /// </remarks>
        /// <typeparam name="PrimaryKey">Type argument forwarded to the observer unit lookup.</typeparam>
        /// <param name="observerGroup">Observer group; also used as the queue-name suffix.</param>
        /// <returns>This instance, for chaining.</returns>
        public RabbitEventBus AddGrainConsumer<PrimaryKey>(string observerGroup)
        {
            var observerUnit = _observerUnitContainer.GetUnit<PrimaryKey>(ProducerType);
            var consumer = new RabbitConsumer(
                observerUnit.GetEventHandlers(observerGroup),
                observerUnit.GetBatchEventHandlers(observerGroup))
            {
                EventBus = this,
                QueueList = BuildQueueList(observerGroup),
                Config = ConsumerConfig
            };
            Consumers.Add(consumer);
            return this;
        }

        /// <summary>
        /// Registers a consumer that uses the supplied handlers.
        /// </summary>
        /// <param name="handler">Handler passed to the consumer as a single-element handler list.</param>
        /// <param name="batchHandler">Batch handler passed to the consumer as a single-element handler list.</param>
        /// <param name="observerGroup">Observer group; used as the queue-name suffix.</param>
        /// <returns>This instance, for chaining.</returns>
        public RabbitEventBus AddConsumer(
            Func<byte[], Task> handler,
            Func<List<byte[]>, Task> batchHandler,
            string observerGroup)
        {
            var consumer = new RabbitConsumer(
                new List<Func<byte[], Task>> { handler },
                new List<Func<List<byte[]>, Task>> { batchHandler })
            {
                EventBus = this,
                QueueList = BuildQueueList(observerGroup),
                Config = ConsumerConfig
            };
            Consumers.Add(consumer);
            return this;
        }

        /// <summary>
        /// Hands this bus to <see cref="Container"/> by calling its <c>Work</c> method.
        /// </summary>
        /// <returns>The task returned by the container.</returns>
        public Task Enable()
        {
            return Container.Work(this);
        }

        /// <summary>
        /// Registers one consumer per group reported by the observer unit of
        /// <see cref="ProducerType"/>, then calls <see cref="Enable"/>.
        /// </summary>
        /// <typeparam name="PrimaryKey">Type argument forwarded to the observer unit lookup.</typeparam>
        /// <returns>The task returned by <see cref="Enable"/>.</returns>
        public Task AddGrainConsumer<PrimaryKey>()
        {
            foreach (var group in _observerUnitContainer.GetUnit<PrimaryKey>(ProducerType).GetGroups())
            {
                AddGrainConsumer<PrimaryKey>(group);
            }

            return Enable();
        }

        // Builds one queue per route, named "{route}_{observerGroup}" and bound with the route as routing key.
        private List<QueueInfo> BuildQueueList(string observerGroup)
        {
            return RouteList
                .Select(route => new QueueInfo
                {
                    RoutingKey = route,
                    Queue = $"{route}_{observerGroup}"
                })
                .ToList();
        }
    }
}