using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Orleans; using RabbitMQ.Client; using Pole.Core.EventBus; using Pole.Core.Exceptions; using Pole.Core.Utils; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading.Tasks; using Pole.Core.EventBus.Event; using Pole.Core.EventBus.EventHandler; using Microsoft.Extensions.Options; namespace Pole.EventBus.RabbitMQ { public class EventBusContainer : IRabbitEventBusContainer, IProducerContainer { private readonly ConcurrentDictionary eventBusDictionary = new ConcurrentDictionary(); private readonly List eventBusList = new List(); readonly IRabbitMQClient rabbitMQClient; readonly IServiceProvider serviceProvider; private readonly IObserverUnitContainer observerUnitContainer; private readonly RabbitOptions rabbitOptions; public EventBusContainer( IServiceProvider serviceProvider, IObserverUnitContainer observerUnitContainer, IRabbitMQClient rabbitMQClient, IOptions rabbitOptions) { this.serviceProvider = serviceProvider; this.rabbitMQClient = rabbitMQClient; this.observerUnitContainer = observerUnitContainer; this.rabbitOptions = rabbitOptions.Value; } public async Task AutoRegister() { var observableList = new List<(Type type, ProducerAttribute config)>(); var eventList = new List<(Type type, EventAttribute config)>(); var evenHandlertList = new List<(Type type, EventHandlerAttribute config)>(); AddEventAndEventHandlerInfoList(eventList, evenHandlertList); foreach (var (type, config) in eventList) { var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name.ToLower() : config.EventName; var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName); await eventBus.AddGrainConsumer(); } foreach (var (type, config) in evenHandlertList) { var eventName = string.IsNullOrEmpty(config.EventName) ? type.Name.ToLower() : config.EventName; var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName); await eventBus.AddGrainConsumer(); } } public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = true) { return new RabbitEventBus(observerUnitContainer, this, exchange, routePrefix, lBCount, autoAck, reenqueue, persistent); } public Task Work(RabbitEventBus bus) { if (eventBusDictionary.TryAdd(bus.Event, bus)) { eventBusList.Add(bus); using var channel = rabbitMQClient.PullChannel(); channel.Model.ExchangeDeclare(bus.Exchange, "direct", true); return Task.CompletedTask; } else throw new EventBusRepeatException(bus.Event.FullName); } readonly ConcurrentDictionary producerDict = new ConcurrentDictionary(); public ValueTask GetProducer(Type type) { if (eventBusDictionary.TryGetValue(type, out var eventBus)) { return new ValueTask(producerDict.GetOrAdd(type, key => { return new RabbitProducer(rabbitMQClient, eventBus); })); } else { throw new NotImplementedException($"{nameof(IProducer)} of {type.FullName}"); } } public ValueTask GetProducer() { return GetProducer(typeof(T)); } public List GetConsumers() { var result = new List(); foreach (var eventBus in eventBusList) { result.AddRange(eventBus.Consumers); } return result; } #region helpers private void AddEventAndEventHandlerInfoList(List<(Type type, EventAttribute config)> eventList, List<(Type type, EventHandlerAttribute config)> evenHandlertList) { foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService>())) { foreach (var type in assembly.GetTypes()) { foreach (var attribute in type.GetCustomAttributes(false)) { if (attribute is EventAttribute config) { eventList.Add((type, config)); break; } } } } foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService>())) { foreach (var type in assembly.GetTypes()) { foreach (var attribute in type.GetCustomAttributes(false)) { if (attribute is EventHandlerAttribute config) { evenHandlertList.Add((type, config)); break; } } } } } #endregion } }