diff --git a/samples/apis/SagasTest.Api/Startup.cs b/samples/apis/SagasTest.Api/Startup.cs index 531b433..7ddcf48 100644 --- a/samples/apis/SagasTest.Api/Startup.cs +++ b/samples/apis/SagasTest.Api/Startup.cs @@ -30,7 +30,7 @@ namespace SagasTest.Api services.AddControllers(); services.AddPole(config => { - config.AddSagas(option=> { + config.AddSagasClient(option=> { option.ServiceName = "SagasTest"; option.SagasServerHost = "http://localhost:80"; }); diff --git a/src/Pole.Core/Processor/IProcessorServer.cs b/src/Pole.Core/Processor/IProcessorServer.cs deleted file mode 100644 index 3ad156f..0000000 --- a/src/Pole.Core/Processor/IProcessorServer.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Core.Processor -{ - public interface IProcessorServer - { - Task Start(CancellationToken stoppingToken); - } -} diff --git a/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs b/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs index 5f7bad7..8905bbe 100644 --- a/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs +++ b/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs @@ -12,7 +12,7 @@ using System.Threading.Tasks; namespace Pole.EventBus.Processor.Server { - public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer + public class BackgroundServiceBasedProcessorServer : IHostedService { private readonly IServiceProvider _serviceProvider; private Task _compositeTask; @@ -21,12 +21,13 @@ namespace Pole.EventBus.Processor.Server { _serviceProvider = serviceProvider; } - public async Task Start(CancellationToken stoppingToken) + + public async Task StartAsync(CancellationToken cancellationToken) { var eventStorageInitializer = _serviceProvider.GetService(); - await eventStorageInitializer.InitializeAsync(stoppingToken); + await eventStorageInitializer.InitializeAsync(cancellationToken); - ProcessingContext processingContext = new ProcessingContext(stoppingToken); + ProcessingContext processingContext = new ProcessingContext(cancellationToken); List loopProcessors = new List(); var innerProcessors = _serviceProvider.GetServices(); var loggerFactory = _serviceProvider.GetService(); @@ -40,9 +41,10 @@ namespace Pole.EventBus.Processor.Server _compositeTask = Task.WhenAll(tasks); await _compositeTask; } - protected override Task ExecuteAsync(CancellationToken stoppingToken) + + public Task StopAsync(CancellationToken cancellationToken) { - return Start(stoppingToken); + return Task.CompletedTask; } } } diff --git a/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs b/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs index 7a5d741..0024236 100644 --- a/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs +++ b/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs @@ -18,7 +18,7 @@ namespace Microsoft.Extensions.DependencyInjection { public static class PoleSagaServiceCollectionExtensions { - public static void AddSagas(this StartupConfig startupOption, Action configAction) + public static void AddSagasClient(this StartupConfig startupOption, Action configAction) { // 让客户端支持 没有TLS 的 grpc call AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); @@ -26,7 +26,7 @@ namespace Microsoft.Extensions.DependencyInjection startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(); - startupOption.Services.AddHostedService(); + startupOption.Services.AddHostedService(); PoleSagasOption sagasOption = null; using (var provider = startupOption.Services.BuildServiceProvider()) { diff --git a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs b/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs similarity index 95% rename from src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs rename to src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs index 625047b..e37abab 100644 --- a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs +++ b/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs @@ -17,14 +17,14 @@ using static Pole.Sagas.Server.Grpc.Saga; namespace Pole.Sagas.Client { - public class NotEndedSagasCompensateRetryBackgroundService : IHostedService + public class SagasCompensateRetryBackgroundService : IHostedService { private readonly PoleSagasOption options; private readonly SagaClient sagaClient; private readonly SagaRestorer sagaRestorer; private readonly IEventSender eventSender; private readonly ILogger logger; - public NotEndedSagasCompensateRetryBackgroundService(IOptions options, SagaClient sagaClient, IServiceProvider serviceProvider, IEventSender eventSender, ILogger logger) + public SagasCompensateRetryBackgroundService(IOptions options, SagaClient sagaClient, IServiceProvider serviceProvider, IEventSender eventSender, ILogger logger) { this.options = options.Value; this.sagaClient = sagaClient; @@ -43,7 +43,7 @@ namespace Pole.Sagas.Client } catch (Exception ex) { - logger.LogError(ex, "Errors in GRPC GetSagas"); + logger.LogError(ex, "Errors in grpc get sagas"); } finally { diff --git a/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs b/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs index 7664971..98bb91d 100644 --- a/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs +++ b/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs @@ -12,7 +12,7 @@ using System.Linq; namespace Pole.Sagas.Server.Processor { - public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer + public class BackgroundServiceBasedProcessorServer : IHostedService { private readonly IServiceProvider _serviceProvider; private Task _compositeTask; @@ -21,12 +21,13 @@ namespace Pole.Sagas.Server.Processor { _serviceProvider = serviceProvider; } - public async Task Start(CancellationToken stoppingToken) + + public async Task StartAsync(CancellationToken cancellationToken) { var eventStorageInitializer = _serviceProvider.GetService(); - await eventStorageInitializer.InitializeAsync(stoppingToken); + await eventStorageInitializer.InitializeAsync(cancellationToken); - ProcessingContext processingContext = new ProcessingContext(stoppingToken); + ProcessingContext processingContext = new ProcessingContext(cancellationToken); List loopProcessors = new List(); var innerProcessors = _serviceProvider.GetServices(); var loggerFactory = _serviceProvider.GetService(); @@ -40,9 +41,10 @@ namespace Pole.Sagas.Server.Processor _compositeTask = Task.WhenAll(tasks); await _compositeTask; } - protected override Task ExecuteAsync(CancellationToken stoppingToken) + + public Task StopAsync(CancellationToken cancellationToken) { - return Start(stoppingToken); + return Task.CompletedTask; } } } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs index 15a4cc9..531faa9 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs @@ -36,17 +36,18 @@ namespace Pole.Sagas.Storage.PostgreSql if (cancellationToken.IsCancellationRequested) return; var sql = CreateDbTablesScript(options.SchemaName); + using (var connection = new NpgsqlConnection(options.ConnectionString)) { await connection.ExecuteAsync(sql); } - logger.LogDebug("Ensuring all create database tables script are applied."); } private string CreateDbTablesScript(string schemaName) { var batchSql = $@" +111 CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}""; CREATE TABLE IF NOT EXISTS {GetSagaTableName()}( @@ -54,9 +55,9 @@ CREATE TABLE IF NOT EXISTS {GetSagaTableName()}( ""ServiceName"" varchar(64) COLLATE ""pg_catalog"".""default"" NOT NULL, ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, ""ExpiresAt"" timestamp, - ""AddTime"" timestamp NOT NULL + ""AddTime"" timestamp NOT NULL, + CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id"") ); -ALTER TABLE {GetSagaTableName()} ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id""); CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, @@ -68,17 +69,14 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( ""ParameterData"" bytea NOT NULL, ""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"", ""CompensateTimes"" int4 NOT NULL, - ""AddTime"" timestamp NOT NULL + ""AddTime"" timestamp NOT NULL, + CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id""), + CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION ); -CREATE INDEX ""Activities_SagaId"" ON {GetActivityTableName()} USING btree ( +CREATE INDEX IF NOT EXISTS ""Activities_SagaId"" ON {GetActivityTableName()} USING btree ( ""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST ); - -ALTER TABLE {GetActivityTableName()} ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id""); - - -ALTER TABLE {GetActivityTableName()} ADD CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION; "; return batchSql; }