From 79b51f68d42de04dc35473072a00b85d834725e5 Mon Sep 17 00:00:00 2001 From: dingsongjie <dingsongjie@DESKTOP-0AS088R> Date: Thu, 12 Mar 2020 11:21:57 +0800 Subject: [PATCH] 完成 重构后的 bug修改 --- samples/apis/Backet.Api/Startup.cs | 5 +++-- src/Pole.Core/Channels/ChannelOptions.cs | 2 +- src/Pole.Core/PoleOptions.cs | 5 ----- src/Pole.Core/ProducerOptions.cs | 11 ----------- src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs | 3 +-- src/Pole.EventBus/EventBuffer.cs | 4 ++-- src/Pole.EventBus/PoleEventBusOption.cs | 17 +++++++++++++++++ src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs | 16 ++++++++-------- src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs | 4 ++-- src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs | 12 +++++------- src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs | 6 +++--- src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs | 5 +++-- 12 files changed, 45 insertions(+), 45 deletions(-) delete mode 100644 src/Pole.Core/ProducerOptions.cs create mode 100644 src/Pole.EventBus/PoleEventBusOption.cs diff --git a/samples/apis/Backet.Api/Startup.cs b/samples/apis/Backet.Api/Startup.cs index 21f0358..2fcf032 100644 --- a/samples/apis/Backet.Api/Startup.cs +++ b/samples/apis/Backet.Api/Startup.cs @@ -41,13 +41,14 @@ namespace Backet.Api })); services.AddPole(config => { - config.AddRabbitMQ(option => + config.AddEventBus(); + config.AddEventBusRabbitMQTransport(option => { option.Hosts = new string[1] { Configuration["RabbitmqConfig:HostAddress"] }; option.Password = Configuration["RabbitmqConfig:HostPassword"]; option.UserName = Configuration["RabbitmqConfig:HostUserName"]; }); - config.AddEntityFrameworkEventStorage<BacketDbContext>(); + config.AddEventBusEFCoreStorage<BacketDbContext>(); }); services.ConfigureGrainStorageOptions<BacketDbContext, BacketGrain, Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>( diff --git a/src/Pole.Core/Channels/ChannelOptions.cs b/src/Pole.Core/Channels/ChannelOptions.cs index 8ee2d13..4ad8eff 100644 --- a/src/Pole.Core/Channels/ChannelOptions.cs +++ b/src/Pole.Core/Channels/ChannelOptions.cs @@ -5,7 +5,7 @@ /// <summary> /// 批量数据处理每次处理的最大数据量 /// </summary> - public int MaxBatchSize { get; set; } = 100000; + public int MaxBatchSize { get; set; } = 10000; /// <summary> /// 批量数据接收的最大延时 /// </summary> diff --git a/src/Pole.Core/PoleOptions.cs b/src/Pole.Core/PoleOptions.cs index 270468e..3059b92 100644 --- a/src/Pole.Core/PoleOptions.cs +++ b/src/Pole.Core/PoleOptions.cs @@ -7,11 +7,6 @@ namespace Pole.Core { public class PoleOptions { - public int PendingMessageRetryIntervalSeconds { get; set; } = 30; - - public int ExpiredEventsPreBulkDeleteDelaySeconds { get; set; } = 3; - public int ExpiredEventsCollectIntervalSeconds { get; set; } = 60 * 60; - public int PublishedEventsExpiredAfterSeconds { get; set; } = 60 * 60; public IServiceCollection Services { get; private set; } } } diff --git a/src/Pole.Core/ProducerOptions.cs b/src/Pole.Core/ProducerOptions.cs deleted file mode 100644 index 0084585..0000000 --- a/src/Pole.Core/ProducerOptions.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Core -{ - public class ProducerOptions - { - public int MaxFailedRetryCount { get; set; } = 40; - } -} diff --git a/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs b/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs index 7ad5bf0..890c2a1 100644 --- a/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs +++ b/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs @@ -13,14 +13,13 @@ namespace Microsoft.Extensions.DependencyInjection public static class PoleRabbitmqStartupConfigExtensions { private static ConcurrentDictionary<string, ConsumerRunner> ConsumerRunners = new ConcurrentDictionary<string, ConsumerRunner>(); - public static void AddRabbitMQ( + public static void AddEventBusRabbitMQTransport( this StartupConfig startupOption, Action<RabbitOptions> rabbitConfigAction, Func<IRabbitEventBusContainer, Task> eventBusConfig = default) { startupOption.Services.Configure<RabbitOptions>(config => rabbitConfigAction(config)); startupOption.Services.AddSingleton<IRabbitMQClient, RabbitMQClient>(); - //startupOption.Services.AddHostedService<ConsumerManager>(); startupOption.Services.AddSingleton<IRabbitEventBusContainer, EventBusContainer>(); startupOption.Services.AddSingleton<IProducer, RabbitProducer>(); startupOption.Services.AddSingleton(serviceProvider => serviceProvider.GetService<IRabbitEventBusContainer>() as IProducerInfoContainer); diff --git a/src/Pole.EventBus/EventBuffer.cs b/src/Pole.EventBus/EventBuffer.cs index 0c3ae74..f08ded9 100644 --- a/src/Pole.EventBus/EventBuffer.cs +++ b/src/Pole.EventBus/EventBuffer.cs @@ -29,9 +29,9 @@ namespace Pole.EventBus private readonly IProducerInfoContainer producerContainer; private readonly IProducer producer; private readonly IEventStorage eventStorage; - private readonly PoleOptions options; + private readonly PoleEventBusOption options; private Task<bool> waitToReadTask; - public EventBuffer(ILogger<EventBuffer> logger, IProducerInfoContainer producerContainer, IProducer producer, IEventStorage eventStorage, IOptions<PoleOptions> options) + public EventBuffer(ILogger<EventBuffer> logger, IProducerInfoContainer producerContainer, IProducer producer, IEventStorage eventStorage, IOptions<PoleEventBusOption> options) { this.logger = logger; this.producerContainer = producerContainer; diff --git a/src/Pole.EventBus/PoleEventBusOption.cs b/src/Pole.EventBus/PoleEventBusOption.cs new file mode 100644 index 0000000..e4e4f99 --- /dev/null +++ b/src/Pole.EventBus/PoleEventBusOption.cs @@ -0,0 +1,17 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.EventBus +{ + public class PoleEventBusOption + { + public IServiceCollection Service { get; set; } + public int PendingMessageRetryIntervalSeconds { get; set; } = 30; + public int ExpiredEventsPreBulkDeleteDelaySeconds { get; set; } = 3; + public int ExpiredEventsCollectIntervalSeconds { get; set; } = 60 * 60; + public int PublishedEventsExpiredAfterSeconds { get; set; } = 60 * 60; + public int MaxFailedRetryCount { get; set; } = 40; + } +} diff --git a/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs b/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs index cec3faa..f51d4bf 100644 --- a/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs +++ b/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs @@ -1,6 +1,7 @@ using Microsoft.Extensions.DependencyInjection; using Pole.Core; using Pole.Core.Processor; +using Pole.EventBus; using Pole.EventBus.Processor; using Pole.EventBus.Processor.Server; using Pole.EventBus.UnitOfWork; @@ -8,13 +9,16 @@ using System; using System.Collections.Generic; using System.Text; -namespace Pole.EventBus +namespace Microsoft.Extensions.DependencyInjection { public static class PoleEventBusStartupConfigExtensions { - public static void AddEventBus( - this StartupConfig startupOption) + public static StartupConfig AddEventBus(this StartupConfig startupOption, Action<PoleEventBusOption> config = null) { + Action<PoleEventBusOption> defaultConfig = option => { }; + var finalConfig = config ?? defaultConfig; + + startupOption.Services.Configure(finalConfig); startupOption.Services.AddSingleton<IEventBuffer, EventBuffer>(); startupOption.Services.AddScoped<IBus, Bus>(); startupOption.Services.AddSingleton<IObserverUnitContainer, ObserverUnitContainer>(); @@ -23,11 +27,7 @@ namespace Pole.EventBus startupOption.Services.AddHostedService<BackgroundServiceBasedProcessorServer>(); startupOption.Services.AddScoped<IUnitOfWork, Pole.EventBus.UnitOfWork.UnitOfWork>(); startupOption.Services.AddSingleton<IEventTypeFinder, EventTypeFinder>(); - - Startup.Register(async serviceProvider => - { - - }); + return startupOption; } } } diff --git a/src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs b/src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs index 0b44215..b0cc35e 100644 --- a/src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs +++ b/src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs @@ -15,7 +15,7 @@ namespace Pole.EventBus.Processor private readonly ILogger logger; private readonly IEventStorageInitializer initializer; private readonly IEventStorage eventstorage; - private readonly PoleOptions poleOptions; + private readonly PoleEventBusOption poleOptions; private const int ItemBatch = 1000; private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); @@ -27,7 +27,7 @@ namespace Pole.EventBus.Processor ILogger<ExpiredEventsCollectorProcessor> logger, IEventStorageInitializer initializer, IEventStorage eventstorage, - IOptions<PoleOptions> poleOptions) + IOptions<PoleEventBusOption> poleOptions) { this.logger = logger; this.initializer = initializer; diff --git a/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs b/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs index 6b76726..b728966 100644 --- a/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs +++ b/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs @@ -17,22 +17,20 @@ namespace Pole.EventBus.Processor class PendingMessageRetryProcessor : ProcessorBase { private readonly IEventStorage eventStorage; - private readonly PoleOptions options; + private readonly PoleEventBusOption options; private readonly IProducerInfoContainer producerContainer; private readonly ISerializer serializer; private readonly ILogger<PendingMessageRetryProcessor> logger; - private readonly ProducerOptions producerOptions; private readonly IProducer producer; private readonly IEventBuffer eventBuffer; - public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions<PoleOptions> options, ILogger<PendingMessageRetryProcessor> logger, - IProducerInfoContainer producerContainer, ISerializer serializer, IOptions<ProducerOptions> producerOptions, IProducer producer, IEventBuffer eventBuffer) + public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions<PoleEventBusOption> options, ILogger<PendingMessageRetryProcessor> logger, + IProducerInfoContainer producerContainer, ISerializer serializer, IProducer producer, IEventBuffer eventBuffer) { this.eventStorage = eventStorage; - this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected"); + this.options = options.Value ?? throw new Exception($"{nameof(PoleEventBusOption)} Must be injected"); this.logger = logger; this.producerContainer = producerContainer; this.serializer = serializer; - this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected"); this.producer = producer; this.eventBuffer = eventBuffer; } @@ -68,7 +66,7 @@ namespace Pole.EventBus.Processor var eventContentBytes = Encoding.UTF8.GetBytes(pendingMessage.Content); var bytesTransport = new EventBytesTransport(pendingMessage.Name, pendingMessage.Id, eventContentBytes); var bytes = bytesTransport.GetBytes(); - if (pendingMessage.Retries > producerOptions.MaxFailedRetryCount) + if (pendingMessage.Retries > options.MaxFailedRetryCount) { pendingMessage.StatusName = nameof(EventStatus.Failed); continue; diff --git a/src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs b/src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs index 6384524..6e5c558 100644 --- a/src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs +++ b/src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs @@ -11,12 +11,12 @@ namespace Microsoft.Extensions.DependencyInjection { public static class PolePostgreSqlStartupConfigExtensions { - public static StartupConfig AddEntityFrameworkEventStorage<TContext>(this StartupConfig config) + public static StartupConfig AddEventBusEFCoreStorage<TContext>(this StartupConfig config) where TContext : DbContext { - return config.AddEntityFrameworkEventStorage<TContext>(opt => { }); + return config.AddEventBusEFCoreStorage<TContext>(opt => { }); } - public static StartupConfig AddEntityFrameworkEventStorage<TContext>(this StartupConfig config, Action<EFOptions> configure) + public static StartupConfig AddEventBusEFCoreStorage<TContext>(this StartupConfig config, Action<EFOptions> configure) where TContext : DbContext { if (configure == null) throw new ArgumentNullException(nameof(configure)); diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs index 9c1ee85..c8fb158 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs @@ -3,6 +3,7 @@ using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Options; using Npgsql; using Pole.Core; +using Pole.EventBus; using Pole.EventBus.EventStorage; using System; using System.Collections.Generic; @@ -17,10 +18,10 @@ namespace Pole.EventStorage.PostgreSql class PostgreSqlEventStorage : IEventStorage { private readonly string tableName; - private readonly ProducerOptions producerOptions; + private readonly PoleEventBusOption producerOptions; private readonly PostgreSqlOptions options; private readonly IEventStorageInitializer eventStorageInitializer; - public PostgreSqlEventStorage(IOptions<PostgreSqlOptions> postgreSqlOptions, IOptions<ProducerOptions> producerOptions, IEventStorageInitializer eventStorageInitializer) + public PostgreSqlEventStorage(IOptions<PostgreSqlOptions> postgreSqlOptions, IOptions<PoleEventBusOption> producerOptions, IEventStorageInitializer eventStorageInitializer) { this.producerOptions = producerOptions.Value; this.options = postgreSqlOptions.Value; -- libgit2 0.25.0