using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Pole.Core; using Pole.Core.EventBus; using Pole.EventBus.RabbitMQ; namespace Microsoft.Extensions.DependencyInjection { public static class PoleRabbitmqStartupConfigExtensions { private static ConcurrentDictionary ConsumerRunners = new ConcurrentDictionary(); public static void AddRabbitMQ( this StartupConfig startupOption, Action rabbitConfigAction, Func eventBusConfig = default) { startupOption.Services.Configure(config => rabbitConfigAction(config)); startupOption.Services.AddSingleton(); //startupOption.Services.AddHostedService(); startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(serviceProvider => serviceProvider.GetService() as IProducerContainer); Startup.Register(async serviceProvider => { var container = serviceProvider.GetService(); var client = serviceProvider.GetService(); var rabbitOptions = serviceProvider.GetService>().Value; if (eventBusConfig != default) await eventBusConfig(container); else await container.AutoRegister(); var consumers = container.GetConsumers(); foreach (var consumer in consumers) { if (consumer is RabbitConsumer value) { var queue = value.QueueInfo; var key = queue.Queue; var runner = new ConsumerRunner(client, serviceProvider, value, queue, rabbitOptions); ConsumerRunners.TryAdd(key, runner); await runner.Run(); } } }); } } }