Commit c6504035 by dingsongjie

完成全部 依赖注入 与启动

parent 170a7cd6
...@@ -44,8 +44,7 @@ namespace Backet.Api.Controllers ...@@ -44,8 +44,7 @@ namespace Backet.Api.Controllers
public Task<bool> RemoveFirstItem() public Task<bool> RemoveFirstItem()
{ {
var id = "da8a489fa7b4409294ee1b358fbbfba5"; var id = "da8a489fa7b4409294ee1b358fbbfba5";
var grain = clusterClient.GetGrain<IBacketGrain>(id); var grain = clusterClient.GetGrain<IBacketGrain>(id);
clusterClient.
return grain.RemoveFirstItem(); return grain.RemoveFirstItem();
} }
} }
......
...@@ -27,6 +27,15 @@ namespace Backet.Api ...@@ -27,6 +27,15 @@ namespace Backet.Api
services.AddDbContextPool<BacketDbContext>(options => options.UseNpgsql(Configuration["postgres:write"])); services.AddDbContextPool<BacketDbContext>(options => options.UseNpgsql(Configuration["postgres:write"]));
services.AddControllers(); services.AddControllers();
services.AddPole(config => {
config.AddRabbitMQ(option =>
{
option.Hosts = new string[1] { Configuration["RabbitmqConfig:HostAddress"] };
option.Password = Configuration["RabbitmqConfig:HostPassword"];
option.UserName = Configuration["RabbitmqConfig:HostUserName"];
});
});
services.ConfigureGrainStorageOptions<BacketDbContext, BacketGrain, Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>( services.ConfigureGrainStorageOptions<BacketDbContext, BacketGrain, Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>(
options => options =>
{ {
...@@ -44,6 +53,7 @@ namespace Backet.Api ...@@ -44,6 +53,7 @@ namespace Backet.Api
app.UseDeveloperExceptionPage(); app.UseDeveloperExceptionPage();
} }
app.UsePole();
app.UseRouting(); app.UseRouting();
app.UseEndpoints(endpoints => app.UseEndpoints(endpoints =>
......
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Pole.Core.Observer;
using Pole.Core.Utils; using Pole.Core.Utils;
using System; using System;
using System.Collections.Concurrent; using System.Collections.Concurrent;
......
using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Builder;
using Pole.Core;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
namespace Pole.Core.Extensions namespace Microsoft.AspNetCore.Builder
{ {
public static class IApplicationBuilderExtensions public static class PoleApplicationBuilderExtensions
{ {
public static IApplicationBuilder UsePole(this IApplicationBuilder applicationBuilder) public static IApplicationBuilder UsePole(this IApplicationBuilder applicationBuilder)
{ {
......
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Pole.Core;
using Pole.Core.Abstraction; using Pole.Core.Abstraction;
using Pole.Core.Channels; using Pole.Core.Channels;
using Pole.Core.EventBus; using Pole.Core.EventBus;
...@@ -12,12 +13,17 @@ using System; ...@@ -12,12 +13,17 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
namespace Pole.Core.Extensions namespace Microsoft.Extensions.DependencyInjection
{ {
public static class IServiceCollectionExtensions public static class PoleServiceCollectionExtensions
{ {
public static IServiceCollection AddPole(this IServiceCollection services,Action<PoleOptions> config) public static IServiceCollection AddPole(this IServiceCollection services,Action<StartupConfig> config)
{ {
StartupConfig startupOption = new StartupConfig(services);
if (startupOption.PoleOptionsConfig == null)
{
services.Configure<PoleOptions>(option => { });
}
services.AddSingleton<IEventTypeFinder, EventTypeFinder>(); services.AddSingleton<IEventTypeFinder, EventTypeFinder>();
services.AddTransient(typeof(IMpscChannel<>), typeof(MpscChannel<>)); services.AddTransient(typeof(IMpscChannel<>), typeof(MpscChannel<>));
services.AddScoped<IBus, Bus>(); services.AddScoped<IBus, Bus>();
...@@ -26,10 +32,11 @@ namespace Pole.Core.Extensions ...@@ -26,10 +32,11 @@ namespace Pole.Core.Extensions
services.AddSingleton<IGeneratorIdSolver, InstanceIPV4_16IdGeneratorIdSolver>(); services.AddSingleton<IGeneratorIdSolver, InstanceIPV4_16IdGeneratorIdSolver>();
services.AddSingleton<ISnowflakeIdGenerator, SnowflakeIdGenerator>(); services.AddSingleton<ISnowflakeIdGenerator, SnowflakeIdGenerator>();
services.AddSingleton<IProcessor, PendingMessageRetryProcessor>(); services.AddSingleton<IProcessor, PendingMessageRetryProcessor>();
services.AddSingleton<IProcessor, ExpiredEventsCollectorProcessor>(); services.AddSingleton<IProcessor, ExpiredEventsCollectorProcessor>();
services.AddHostedService<BackgroundServiceBasedProcessorServer>(); services.AddHostedService<BackgroundServiceBasedProcessorServer>();
config(startupOption);
return services; return services;
} }
} }
......
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using Pole.Core.EventBus.EventStorage;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
...@@ -21,6 +22,8 @@ namespace Pole.Core.Processor.Server ...@@ -21,6 +22,8 @@ namespace Pole.Core.Processor.Server
} }
public async Task Start(CancellationToken stoppingToken) public async Task Start(CancellationToken stoppingToken)
{ {
var eventStorageInitializer = _serviceProvider.GetService<IEventStorageInitializer>();
await eventStorageInitializer.InitializeAsync(stoppingToken);
ProcessingContext processingContext = new ProcessingContext(stoppingToken); ProcessingContext processingContext = new ProcessingContext(stoppingToken);
List<LoopProcessor> loopProcessors = new List<LoopProcessor>(); List<LoopProcessor> loopProcessors = new List<LoopProcessor>();
......
using System; using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
...@@ -28,4 +29,13 @@ namespace Pole.Core ...@@ -28,4 +29,13 @@ namespace Pole.Core
public Func<IServiceProvider, Task> Func { get; set; } public Func<IServiceProvider, Task> Func { get; set; }
} }
} }
public class StartupConfig
{
public StartupConfig(IServiceCollection services)
{
Services = services;
}
public IServiceCollection Services { get; }
public Action<PoleOptions> PoleOptionsConfig { get; set; }
}
} }
...@@ -3,21 +3,22 @@ using System.Threading.Tasks; ...@@ -3,21 +3,22 @@ using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Pole.Core; using Pole.Core;
using Pole.Core.EventBus; using Pole.Core.EventBus;
using Pole.EventBus.RabbitMQ;
namespace Pole.EventBus.RabbitMQ namespace Microsoft.Extensions.DependencyInjection
{ {
public static class Extensions public static class PoleRabbitmqStartupConfigExtensions
{ {
public static void AddRabbitMQ( public static void AddRabbitMQ(
this IServiceCollection serviceCollection, this StartupConfig startupOption,
Action<RabbitOptions> rabbitConfigAction, Action<RabbitOptions> rabbitConfigAction,
Func<IRabbitEventBusContainer, Task> eventBusConfig = default) Func<IRabbitEventBusContainer, Task> eventBusConfig = default)
{ {
serviceCollection.Configure<RabbitOptions>(config => rabbitConfigAction(config)); startupOption.Services.Configure<RabbitOptions>(config => rabbitConfigAction(config));
serviceCollection.AddSingleton<IRabbitMQClient, RabbitMQClient>(); startupOption.Services.AddSingleton<IRabbitMQClient, RabbitMQClient>();
serviceCollection.AddHostedService<ConsumerManager>(); startupOption.Services.AddHostedService<ConsumerManager>();
serviceCollection.AddSingleton<IRabbitEventBusContainer, EventBusContainer>(); startupOption.Services.AddSingleton<IRabbitEventBusContainer, EventBusContainer>();
serviceCollection.AddSingleton(serviceProvider => serviceProvider.GetService<IRabbitEventBusContainer>() as IProducerContainer); startupOption.Services.AddSingleton(serviceProvider => serviceProvider.GetService<IRabbitEventBusContainer>() as IProducerContainer);
Startup.Register(async serviceProvider => Startup.Register(async serviceProvider =>
{ {
var container = serviceProvider.GetService<IRabbitEventBusContainer>(); var container = serviceProvider.GetService<IRabbitEventBusContainer>();
......
...@@ -9,36 +9,6 @@ namespace Pole.EventStorage.PostgreSql ...@@ -9,36 +9,6 @@ namespace Pole.EventStorage.PostgreSql
{ {
public static class CapOptionsExtensions public static class CapOptionsExtensions
{ {
public static PoleOptions UseEntityFrameworkEventStorage<TContext>(this PoleOptions options)
where TContext : DbContext
{
return options.UseEntityFrameworkEventStorage<TContext>(opt => { });
}
public static PoleOptions UseEntityFrameworkEventStorage<TContext>(this PoleOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
EFOptions eFOptions = new EFOptions();
configure(eFOptions);
Action<PostgreSqlOptions> postgreSqlOptionsConfig = postgreSqlOptions =>
{
postgreSqlOptions.DbContextType = typeof(TContext);
postgreSqlOptions.Schema = eFOptions.Schema;
using var scope = options.Services.BuildServiceProvider().CreateScope();
var provider = scope.ServiceProvider;
using var dbContext = (DbContext)provider.GetRequiredService(typeof(TContext));
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
};
options.Services.Configure(postgreSqlOptionsConfig);
return options;
}
public static PoleOptions UsePostgreSqlEventStorage<TContext>(this PoleOptions options, Action<PostgreSqlOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
options.Services.Configure(configure);
return options;
}
} }
} }
using Microsoft.EntityFrameworkCore;
using Pole.Core;
using Pole.Core.EventBus.EventStorage;
using Pole.Core.EventBus.Transaction;
using Pole.EventStorage.PostgreSql;
using System;
using System.Collections.Generic;
using System.Text;
namespace Microsoft.Extensions.DependencyInjection
{
public static class PolePostgreSqlStartupConfigExtensions
{
public static StartupConfig AddEntityFrameworkEventStorage<TContext>(this StartupConfig config)
where TContext : DbContext
{
return config.AddEntityFrameworkEventStorage<TContext>(opt => { });
}
public static StartupConfig AddEntityFrameworkEventStorage<TContext>(this StartupConfig config, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
EFOptions eFOptions = new EFOptions();
configure(eFOptions);
Action<PostgreSqlOptions> postgreSqlOptionsConfig = postgreSqlOptions =>
{
postgreSqlOptions.DbContextType = typeof(TContext);
postgreSqlOptions.Schema = eFOptions.Schema;
using var scope = config.Services.BuildServiceProvider().CreateScope();
var provider = scope.ServiceProvider;
using var dbContext = (DbContext)provider.GetRequiredService(typeof(TContext));
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
};
config.Services.Configure(postgreSqlOptionsConfig);
config.Services.AddScoped<IDbTransactionAdapter, PostgreSqlDbTransactionAdapter>();
config.Services.AddSingleton<IEventStorage, PostgreSqlEventStorage>();
config.Services.AddSingleton<IEventStorageInitializer, PostgreSqlEventStorageInitializer>();
return config;
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment