using Pole.Core.EventBus; using Pole.Core.Exceptions; using Pole.Core.Utils; using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; namespace Pole.EventBus.RabbitMQ { public class RabbitEventBus { readonly IObserverUnitContainer observerUnitContainer; public RabbitEventBus( IObserverUnitContainer observerUnitContainer, IRabbitEventBusContainer eventBusContainer, string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = false) { if (string.IsNullOrEmpty(exchange)) throw new ArgumentNullException(nameof(exchange)); if (string.IsNullOrEmpty(routePrefix)) throw new ArgumentNullException(nameof(routePrefix)); if (lBCount < 1) throw new ArgumentOutOfRangeException($"{nameof(lBCount)} must be greater than 1"); this.observerUnitContainer = observerUnitContainer; Container = eventBusContainer; Exchange = exchange; RoutePrefix = routePrefix; LBCount = lBCount; Persistent = persistent; ConsumerConfig = new ConsumerOptions { AutoAck = autoAck, Reenqueue = reenqueue, ErrorQueueSuffix = "_error", MaxReenqueueTimes = 10 }; } public IRabbitEventBusContainer Container { get; } public string Exchange { get; } public string RoutePrefix { get; } public int LBCount { get; } public ConsumerOptions ConsumerConfig { get; set; } public List RouteList { get; } public Type Event { get; set; } public string EventName { get; set; } /// /// 消息是否持久化 /// public bool Persistent { get; set; } public List Consumers { get; set; } = new List(); public string GetRoute(string key) { return RoutePrefix; } public RabbitEventBus BindEvent(Type eventType, string eventName) { Event = eventType; EventName = eventName; return this; } public Task AddGrainConsumer() { var observerUnits = observerUnitContainer.GetUnits(EventName); foreach (var observerUnit in observerUnits) { var consumer = new RabbitConsumer( observerUnit.GetEventHandler(), observerUnit.GetBatchEventHandler()) { EventBus = this, QueueInfo = new QueueInfo { RoutingKey = RoutePrefix, Queue = $"{RoutePrefix}_{observerUnit}" }, Config = ConsumerConfig }; Consumers.Add(consumer); } return Enable(); } public Task Enable() { return Container.Work(this); } } }