using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Pole.Core.Processor; using Pole.EventBus.EventStorage; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Pole.EventBus.Processor.Server { public class BackgroundServiceBasedProcessorServer : IHostedService { private readonly IServiceProvider _serviceProvider; private Task _compositeTask; public BackgroundServiceBasedProcessorServer(IServiceProvider serviceProvider) { _serviceProvider = serviceProvider; } public async Task StartAsync(CancellationToken cancellationToken) { var eventStorageInitializer = _serviceProvider.GetService(); await eventStorageInitializer.InitializeAsync(cancellationToken); ProcessingContext processingContext = new ProcessingContext(cancellationToken); List loopProcessors = new List(); var innerProcessors = _serviceProvider.GetServices(); var loggerFactory = _serviceProvider.GetService(); foreach (var innerProcessor in innerProcessors) { LoopProcessor processor = new LoopProcessor(innerProcessor, loggerFactory); loopProcessors.Add(processor); } var tasks = loopProcessors.Select(p => p.Process(processingContext)); _compositeTask = Task.WhenAll(tasks); await _compositeTask; } public Task StopAsync(CancellationToken cancellationToken) { return Task.CompletedTask; } } }