diff --git a/samples/apis/Backet.Api/Startup.cs b/samples/apis/Backet.Api/Startup.cs index 21f0358..2fcf032 100644 --- a/samples/apis/Backet.Api/Startup.cs +++ b/samples/apis/Backet.Api/Startup.cs @@ -41,13 +41,14 @@ namespace Backet.Api })); services.AddPole(config => { - config.AddRabbitMQ(option => + config.AddEventBus(); + config.AddEventBusRabbitMQTransport(option => { option.Hosts = new string[1] { Configuration["RabbitmqConfig:HostAddress"] }; option.Password = Configuration["RabbitmqConfig:HostPassword"]; option.UserName = Configuration["RabbitmqConfig:HostUserName"]; }); - config.AddEntityFrameworkEventStorage(); + config.AddEventBusEFCoreStorage(); }); services.ConfigureGrainStorageOptions( diff --git a/src/Pole.Core/Channels/ChannelOptions.cs b/src/Pole.Core/Channels/ChannelOptions.cs index 8ee2d13..4ad8eff 100644 --- a/src/Pole.Core/Channels/ChannelOptions.cs +++ b/src/Pole.Core/Channels/ChannelOptions.cs @@ -5,7 +5,7 @@ /// /// 批量数据处理每次处理的最大数据量 /// - public int MaxBatchSize { get; set; } = 100000; + public int MaxBatchSize { get; set; } = 10000; /// /// 批量数据接收的最大延时 /// diff --git a/src/Pole.Core/PoleOptions.cs b/src/Pole.Core/PoleOptions.cs index 270468e..3059b92 100644 --- a/src/Pole.Core/PoleOptions.cs +++ b/src/Pole.Core/PoleOptions.cs @@ -7,11 +7,6 @@ namespace Pole.Core { public class PoleOptions { - public int PendingMessageRetryIntervalSeconds { get; set; } = 30; - - public int ExpiredEventsPreBulkDeleteDelaySeconds { get; set; } = 3; - public int ExpiredEventsCollectIntervalSeconds { get; set; } = 60 * 60; - public int PublishedEventsExpiredAfterSeconds { get; set; } = 60 * 60; public IServiceCollection Services { get; private set; } } } diff --git a/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs b/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs index 7ad5bf0..890c2a1 100644 --- a/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs +++ b/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs @@ -13,14 +13,13 @@ namespace Microsoft.Extensions.DependencyInjection public static class PoleRabbitmqStartupConfigExtensions { private static ConcurrentDictionary ConsumerRunners = new ConcurrentDictionary(); - public static void AddRabbitMQ( + public static void AddEventBusRabbitMQTransport( this StartupConfig startupOption, Action rabbitConfigAction, Func eventBusConfig = default) { startupOption.Services.Configure(config => rabbitConfigAction(config)); startupOption.Services.AddSingleton(); - //startupOption.Services.AddHostedService(); startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(serviceProvider => serviceProvider.GetService() as IProducerInfoContainer); diff --git a/src/Pole.EventBus/EventBuffer.cs b/src/Pole.EventBus/EventBuffer.cs index 0c3ae74..f08ded9 100644 --- a/src/Pole.EventBus/EventBuffer.cs +++ b/src/Pole.EventBus/EventBuffer.cs @@ -29,9 +29,9 @@ namespace Pole.EventBus private readonly IProducerInfoContainer producerContainer; private readonly IProducer producer; private readonly IEventStorage eventStorage; - private readonly PoleOptions options; + private readonly PoleEventBusOption options; private Task waitToReadTask; - public EventBuffer(ILogger logger, IProducerInfoContainer producerContainer, IProducer producer, IEventStorage eventStorage, IOptions options) + public EventBuffer(ILogger logger, IProducerInfoContainer producerContainer, IProducer producer, IEventStorage eventStorage, IOptions options) { this.logger = logger; this.producerContainer = producerContainer; diff --git a/src/Pole.Core/ProducerOptions.cs b/src/Pole.EventBus/PoleEventBusOption.cs similarity index 53% rename from src/Pole.Core/ProducerOptions.cs rename to src/Pole.EventBus/PoleEventBusOption.cs index 0084585..e4e4f99 100644 --- a/src/Pole.Core/ProducerOptions.cs +++ b/src/Pole.EventBus/PoleEventBusOption.cs @@ -1,11 +1,17 @@ -using System; +using Microsoft.Extensions.DependencyInjection; +using System; using System.Collections.Generic; using System.Text; -namespace Pole.Core +namespace Pole.EventBus { - public class ProducerOptions + public class PoleEventBusOption { + public IServiceCollection Service { get; set; } + public int PendingMessageRetryIntervalSeconds { get; set; } = 30; + public int ExpiredEventsPreBulkDeleteDelaySeconds { get; set; } = 3; + public int ExpiredEventsCollectIntervalSeconds { get; set; } = 60 * 60; + public int PublishedEventsExpiredAfterSeconds { get; set; } = 60 * 60; public int MaxFailedRetryCount { get; set; } = 40; } } diff --git a/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs b/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs index cec3faa..f51d4bf 100644 --- a/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs +++ b/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.DependencyInjection; using Pole.Core; using Pole.Core.Processor; +using Pole.EventBus; using Pole.EventBus.Processor; using Pole.EventBus.Processor.Server; using Pole.EventBus.UnitOfWork; @@ -8,13 +9,16 @@ using System; using System.Collections.Generic; using System.Text; -namespace Pole.EventBus +namespace Microsoft.Extensions.DependencyInjection { public static class PoleEventBusStartupConfigExtensions { - public static void AddEventBus( - this StartupConfig startupOption) + public static StartupConfig AddEventBus(this StartupConfig startupOption, Action config = null) { + Action defaultConfig = option => { }; + var finalConfig = config ?? defaultConfig; + + startupOption.Services.Configure(finalConfig); startupOption.Services.AddSingleton(); startupOption.Services.AddScoped(); startupOption.Services.AddSingleton(); @@ -23,11 +27,7 @@ namespace Pole.EventBus startupOption.Services.AddHostedService(); startupOption.Services.AddScoped(); startupOption.Services.AddSingleton(); - - Startup.Register(async serviceProvider => - { - - }); + return startupOption; } } } diff --git a/src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs b/src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs index 0b44215..b0cc35e 100644 --- a/src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs +++ b/src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs @@ -15,7 +15,7 @@ namespace Pole.EventBus.Processor private readonly ILogger logger; private readonly IEventStorageInitializer initializer; private readonly IEventStorage eventstorage; - private readonly PoleOptions poleOptions; + private readonly PoleEventBusOption poleOptions; private const int ItemBatch = 1000; private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); @@ -27,7 +27,7 @@ namespace Pole.EventBus.Processor ILogger logger, IEventStorageInitializer initializer, IEventStorage eventstorage, - IOptions poleOptions) + IOptions poleOptions) { this.logger = logger; this.initializer = initializer; diff --git a/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs b/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs index 6b76726..b728966 100644 --- a/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs +++ b/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs @@ -17,22 +17,20 @@ namespace Pole.EventBus.Processor class PendingMessageRetryProcessor : ProcessorBase { private readonly IEventStorage eventStorage; - private readonly PoleOptions options; + private readonly PoleEventBusOption options; private readonly IProducerInfoContainer producerContainer; private readonly ISerializer serializer; private readonly ILogger logger; - private readonly ProducerOptions producerOptions; private readonly IProducer producer; private readonly IEventBuffer eventBuffer; - public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions options, ILogger logger, - IProducerInfoContainer producerContainer, ISerializer serializer, IOptions producerOptions, IProducer producer, IEventBuffer eventBuffer) + public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions options, ILogger logger, + IProducerInfoContainer producerContainer, ISerializer serializer, IProducer producer, IEventBuffer eventBuffer) { this.eventStorage = eventStorage; - this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected"); + this.options = options.Value ?? throw new Exception($"{nameof(PoleEventBusOption)} Must be injected"); this.logger = logger; this.producerContainer = producerContainer; this.serializer = serializer; - this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected"); this.producer = producer; this.eventBuffer = eventBuffer; } @@ -68,7 +66,7 @@ namespace Pole.EventBus.Processor var eventContentBytes = Encoding.UTF8.GetBytes(pendingMessage.Content); var bytesTransport = new EventBytesTransport(pendingMessage.Name, pendingMessage.Id, eventContentBytes); var bytes = bytesTransport.GetBytes(); - if (pendingMessage.Retries > producerOptions.MaxFailedRetryCount) + if (pendingMessage.Retries > options.MaxFailedRetryCount) { pendingMessage.StatusName = nameof(EventStatus.Failed); continue; diff --git a/src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs b/src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs index 6384524..6e5c558 100644 --- a/src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs +++ b/src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs @@ -11,12 +11,12 @@ namespace Microsoft.Extensions.DependencyInjection { public static class PolePostgreSqlStartupConfigExtensions { - public static StartupConfig AddEntityFrameworkEventStorage(this StartupConfig config) + public static StartupConfig AddEventBusEFCoreStorage(this StartupConfig config) where TContext : DbContext { - return config.AddEntityFrameworkEventStorage(opt => { }); + return config.AddEventBusEFCoreStorage(opt => { }); } - public static StartupConfig AddEntityFrameworkEventStorage(this StartupConfig config, Action configure) + public static StartupConfig AddEventBusEFCoreStorage(this StartupConfig config, Action configure) where TContext : DbContext { if (configure == null) throw new ArgumentNullException(nameof(configure)); diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs index 9c1ee85..c8fb158 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs @@ -3,6 +3,7 @@ using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Options; using Npgsql; using Pole.Core; +using Pole.EventBus; using Pole.EventBus.EventStorage; using System; using System.Collections.Generic; @@ -17,10 +18,10 @@ namespace Pole.EventStorage.PostgreSql class PostgreSqlEventStorage : IEventStorage { private readonly string tableName; - private readonly ProducerOptions producerOptions; + private readonly PoleEventBusOption producerOptions; private readonly PostgreSqlOptions options; private readonly IEventStorageInitializer eventStorageInitializer; - public PostgreSqlEventStorage(IOptions postgreSqlOptions, IOptions producerOptions, IEventStorageInitializer eventStorageInitializer) + public PostgreSqlEventStorage(IOptions postgreSqlOptions, IOptions producerOptions, IEventStorageInitializer eventStorageInitializer) { this.producerOptions = producerOptions.Value; this.options = postgreSqlOptions.Value;