Commit 79b51f68 by dingsongjie

完成 重构后的 bug修改

parent 5511b5b8
......@@ -41,13 +41,14 @@ namespace Backet.Api
}));
services.AddPole(config =>
{
config.AddRabbitMQ(option =>
config.AddEventBus();
config.AddEventBusRabbitMQTransport(option =>
{
option.Hosts = new string[1] { Configuration["RabbitmqConfig:HostAddress"] };
option.Password = Configuration["RabbitmqConfig:HostPassword"];
option.UserName = Configuration["RabbitmqConfig:HostUserName"];
});
config.AddEntityFrameworkEventStorage<BacketDbContext>();
config.AddEventBusEFCoreStorage<BacketDbContext>();
});
services.ConfigureGrainStorageOptions<BacketDbContext, BacketGrain, Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>(
......
......@@ -5,7 +5,7 @@
/// <summary>
/// 批量数据处理每次处理的最大数据量
/// </summary>
public int MaxBatchSize { get; set; } = 100000;
public int MaxBatchSize { get; set; } = 10000;
/// <summary>
/// 批量数据接收的最大延时
/// </summary>
......
......@@ -7,11 +7,6 @@ namespace Pole.Core
{
public class PoleOptions
{
public int PendingMessageRetryIntervalSeconds { get; set; } = 30;
public int ExpiredEventsPreBulkDeleteDelaySeconds { get; set; } = 3;
public int ExpiredEventsCollectIntervalSeconds { get; set; } = 60 * 60;
public int PublishedEventsExpiredAfterSeconds { get; set; } = 60 * 60;
public IServiceCollection Services { get; private set; }
}
}
......@@ -13,14 +13,13 @@ namespace Microsoft.Extensions.DependencyInjection
public static class PoleRabbitmqStartupConfigExtensions
{
private static ConcurrentDictionary<string, ConsumerRunner> ConsumerRunners = new ConcurrentDictionary<string, ConsumerRunner>();
public static void AddRabbitMQ(
public static void AddEventBusRabbitMQTransport(
this StartupConfig startupOption,
Action<RabbitOptions> rabbitConfigAction,
Func<IRabbitEventBusContainer, Task> eventBusConfig = default)
{
startupOption.Services.Configure<RabbitOptions>(config => rabbitConfigAction(config));
startupOption.Services.AddSingleton<IRabbitMQClient, RabbitMQClient>();
//startupOption.Services.AddHostedService<ConsumerManager>();
startupOption.Services.AddSingleton<IRabbitEventBusContainer, EventBusContainer>();
startupOption.Services.AddSingleton<IProducer, RabbitProducer>();
startupOption.Services.AddSingleton(serviceProvider => serviceProvider.GetService<IRabbitEventBusContainer>() as IProducerInfoContainer);
......
......@@ -29,9 +29,9 @@ namespace Pole.EventBus
private readonly IProducerInfoContainer producerContainer;
private readonly IProducer producer;
private readonly IEventStorage eventStorage;
private readonly PoleOptions options;
private readonly PoleEventBusOption options;
private Task<bool> waitToReadTask;
public EventBuffer(ILogger<EventBuffer> logger, IProducerInfoContainer producerContainer, IProducer producer, IEventStorage eventStorage, IOptions<PoleOptions> options)
public EventBuffer(ILogger<EventBuffer> logger, IProducerInfoContainer producerContainer, IProducer producer, IEventStorage eventStorage, IOptions<PoleEventBusOption> options)
{
this.logger = logger;
this.producerContainer = producerContainer;
......
using System;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core
namespace Pole.EventBus
{
public class ProducerOptions
public class PoleEventBusOption
{
public IServiceCollection Service { get; set; }
public int PendingMessageRetryIntervalSeconds { get; set; } = 30;
public int ExpiredEventsPreBulkDeleteDelaySeconds { get; set; } = 3;
public int ExpiredEventsCollectIntervalSeconds { get; set; } = 60 * 60;
public int PublishedEventsExpiredAfterSeconds { get; set; } = 60 * 60;
public int MaxFailedRetryCount { get; set; } = 40;
}
}
using Microsoft.Extensions.DependencyInjection;
using Pole.Core;
using Pole.Core.Processor;
using Pole.EventBus;
using Pole.EventBus.Processor;
using Pole.EventBus.Processor.Server;
using Pole.EventBus.UnitOfWork;
......@@ -8,13 +9,16 @@ using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.EventBus
namespace Microsoft.Extensions.DependencyInjection
{
public static class PoleEventBusStartupConfigExtensions
{
public static void AddEventBus(
this StartupConfig startupOption)
public static StartupConfig AddEventBus(this StartupConfig startupOption, Action<PoleEventBusOption> config = null)
{
Action<PoleEventBusOption> defaultConfig = option => { };
var finalConfig = config ?? defaultConfig;
startupOption.Services.Configure(finalConfig);
startupOption.Services.AddSingleton<IEventBuffer, EventBuffer>();
startupOption.Services.AddScoped<IBus, Bus>();
startupOption.Services.AddSingleton<IObserverUnitContainer, ObserverUnitContainer>();
......@@ -23,11 +27,7 @@ namespace Pole.EventBus
startupOption.Services.AddHostedService<BackgroundServiceBasedProcessorServer>();
startupOption.Services.AddScoped<IUnitOfWork, Pole.EventBus.UnitOfWork.UnitOfWork>();
startupOption.Services.AddSingleton<IEventTypeFinder, EventTypeFinder>();
Startup.Register(async serviceProvider =>
{
});
return startupOption;
}
}
}
......@@ -15,7 +15,7 @@ namespace Pole.EventBus.Processor
private readonly ILogger logger;
private readonly IEventStorageInitializer initializer;
private readonly IEventStorage eventstorage;
private readonly PoleOptions poleOptions;
private readonly PoleEventBusOption poleOptions;
private const int ItemBatch = 1000;
private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5);
......@@ -27,7 +27,7 @@ namespace Pole.EventBus.Processor
ILogger<ExpiredEventsCollectorProcessor> logger,
IEventStorageInitializer initializer,
IEventStorage eventstorage,
IOptions<PoleOptions> poleOptions)
IOptions<PoleEventBusOption> poleOptions)
{
this.logger = logger;
this.initializer = initializer;
......
......@@ -17,22 +17,20 @@ namespace Pole.EventBus.Processor
class PendingMessageRetryProcessor : ProcessorBase
{
private readonly IEventStorage eventStorage;
private readonly PoleOptions options;
private readonly PoleEventBusOption options;
private readonly IProducerInfoContainer producerContainer;
private readonly ISerializer serializer;
private readonly ILogger<PendingMessageRetryProcessor> logger;
private readonly ProducerOptions producerOptions;
private readonly IProducer producer;
private readonly IEventBuffer eventBuffer;
public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions<PoleOptions> options, ILogger<PendingMessageRetryProcessor> logger,
IProducerInfoContainer producerContainer, ISerializer serializer, IOptions<ProducerOptions> producerOptions, IProducer producer, IEventBuffer eventBuffer)
public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions<PoleEventBusOption> options, ILogger<PendingMessageRetryProcessor> logger,
IProducerInfoContainer producerContainer, ISerializer serializer, IProducer producer, IEventBuffer eventBuffer)
{
this.eventStorage = eventStorage;
this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected");
this.options = options.Value ?? throw new Exception($"{nameof(PoleEventBusOption)} Must be injected");
this.logger = logger;
this.producerContainer = producerContainer;
this.serializer = serializer;
this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected");
this.producer = producer;
this.eventBuffer = eventBuffer;
}
......@@ -68,7 +66,7 @@ namespace Pole.EventBus.Processor
var eventContentBytes = Encoding.UTF8.GetBytes(pendingMessage.Content);
var bytesTransport = new EventBytesTransport(pendingMessage.Name, pendingMessage.Id, eventContentBytes);
var bytes = bytesTransport.GetBytes();
if (pendingMessage.Retries > producerOptions.MaxFailedRetryCount)
if (pendingMessage.Retries > options.MaxFailedRetryCount)
{
pendingMessage.StatusName = nameof(EventStatus.Failed);
continue;
......
......@@ -11,12 +11,12 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static class PolePostgreSqlStartupConfigExtensions
{
public static StartupConfig AddEntityFrameworkEventStorage<TContext>(this StartupConfig config)
public static StartupConfig AddEventBusEFCoreStorage<TContext>(this StartupConfig config)
where TContext : DbContext
{
return config.AddEntityFrameworkEventStorage<TContext>(opt => { });
return config.AddEventBusEFCoreStorage<TContext>(opt => { });
}
public static StartupConfig AddEntityFrameworkEventStorage<TContext>(this StartupConfig config, Action<EFOptions> configure)
public static StartupConfig AddEventBusEFCoreStorage<TContext>(this StartupConfig config, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
......
......@@ -3,6 +3,7 @@ using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Options;
using Npgsql;
using Pole.Core;
using Pole.EventBus;
using Pole.EventBus.EventStorage;
using System;
using System.Collections.Generic;
......@@ -17,10 +18,10 @@ namespace Pole.EventStorage.PostgreSql
class PostgreSqlEventStorage : IEventStorage
{
private readonly string tableName;
private readonly ProducerOptions producerOptions;
private readonly PoleEventBusOption producerOptions;
private readonly PostgreSqlOptions options;
private readonly IEventStorageInitializer eventStorageInitializer;
public PostgreSqlEventStorage(IOptions<PostgreSqlOptions> postgreSqlOptions, IOptions<ProducerOptions> producerOptions, IEventStorageInitializer eventStorageInitializer)
public PostgreSqlEventStorage(IOptions<PostgreSqlOptions> postgreSqlOptions, IOptions<PoleEventBusOption> producerOptions, IEventStorageInitializer eventStorageInitializer)
{
this.producerOptions = producerOptions.Value;
this.options = postgreSqlOptions.Value;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment