From 2d910fc1089fbafe6b84a868679b5b10c995d48a Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Thu, 12 Mar 2020 17:35:47 +0800 Subject: [PATCH] 优化宿主服务 --- samples/apis/SagasTest.Api/Startup.cs | 2 +- src/Pole.Core/Processor/IProcessorServer.cs | 13 ------------- src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs | 14 ++++++++------ src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs | 109 ------------------------------------------------------------------------------------------------------------- src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs | 4 ++-- src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs | 14 ++++++++------ src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs | 18 ++++++++---------- 8 files changed, 136 insertions(+), 147 deletions(-) delete mode 100644 src/Pole.Core/Processor/IProcessorServer.cs delete mode 100644 src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs create mode 100644 src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs diff --git a/samples/apis/SagasTest.Api/Startup.cs b/samples/apis/SagasTest.Api/Startup.cs index 531b433..7ddcf48 100644 --- a/samples/apis/SagasTest.Api/Startup.cs +++ b/samples/apis/SagasTest.Api/Startup.cs @@ -30,7 +30,7 @@ namespace SagasTest.Api services.AddControllers(); services.AddPole(config => { - config.AddSagas(option=> { + config.AddSagasClient(option=> { option.ServiceName = "SagasTest"; option.SagasServerHost = "http://localhost:80"; }); diff --git a/src/Pole.Core/Processor/IProcessorServer.cs b/src/Pole.Core/Processor/IProcessorServer.cs deleted file mode 100644 index 3ad156f..0000000 --- a/src/Pole.Core/Processor/IProcessorServer.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Core.Processor -{ - public interface IProcessorServer - { - Task Start(CancellationToken stoppingToken); - } -} diff --git a/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs b/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs index 5f7bad7..8905bbe 100644 --- a/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs +++ b/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs @@ -12,7 +12,7 @@ using System.Threading.Tasks; namespace Pole.EventBus.Processor.Server { - public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer + public class BackgroundServiceBasedProcessorServer : IHostedService { private readonly IServiceProvider _serviceProvider; private Task _compositeTask; @@ -21,12 +21,13 @@ namespace Pole.EventBus.Processor.Server { _serviceProvider = serviceProvider; } - public async Task Start(CancellationToken stoppingToken) + + public async Task StartAsync(CancellationToken cancellationToken) { var eventStorageInitializer = _serviceProvider.GetService(); - await eventStorageInitializer.InitializeAsync(stoppingToken); + await eventStorageInitializer.InitializeAsync(cancellationToken); - ProcessingContext processingContext = new ProcessingContext(stoppingToken); + ProcessingContext processingContext = new ProcessingContext(cancellationToken); List loopProcessors = new List(); var innerProcessors = _serviceProvider.GetServices(); var loggerFactory = _serviceProvider.GetService(); @@ -40,9 +41,10 @@ namespace Pole.EventBus.Processor.Server _compositeTask = Task.WhenAll(tasks); await _compositeTask; } - protected override Task ExecuteAsync(CancellationToken stoppingToken) + + public Task StopAsync(CancellationToken cancellationToken) { - return Start(stoppingToken); + return Task.CompletedTask; } } } diff --git a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs deleted file mode 100644 index 625047b..0000000 --- a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs +++ /dev/null @@ -1,109 +0,0 @@ -using Grpc.Core; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Pole.Core.Serialization; -using Pole.Core.Utils.Abstraction; -using Pole.Sagas.Client.Abstraction; -using Pole.Sagas.Core; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using static Pole.Sagas.Server.Grpc.Saga; - -namespace Pole.Sagas.Client -{ - public class NotEndedSagasCompensateRetryBackgroundService : IHostedService - { - private readonly PoleSagasOption options; - private readonly SagaClient sagaClient; - private readonly SagaRestorer sagaRestorer; - private readonly IEventSender eventSender; - private readonly ILogger logger; - public NotEndedSagasCompensateRetryBackgroundService(IOptions options, SagaClient sagaClient, IServiceProvider serviceProvider, IEventSender eventSender, ILogger logger) - { - this.options = options.Value; - this.sagaClient = sagaClient; - sagaRestorer = new SagaRestorer(serviceProvider.GetRequiredService(), serviceProvider, serviceProvider.GetRequiredService(), this.options, serviceProvider.GetRequiredService(), serviceProvider.GetRequiredService()); - this.eventSender = eventSender; - this.logger = logger; - } - - public async Task StartAsync(CancellationToken cancellationToken) - { - while (true) - { - try - { - await GrpcGetSagasCore(cancellationToken); - } - catch (Exception ex) - { - logger.LogError(ex, "Errors in GRPC GetSagas"); - } - finally - { - await Task.Delay(options.GrpcConnectFailRetryIntervalSeconds * 1000); - } - } - } - - private async Task GrpcGetSagasCore(CancellationToken cancellationToken) - { - using (var stream = sagaClient.GetSagas(new Pole.Sagas.Server.Grpc.GetSagasRequest { Limit = options.PreSagasGrpcStreamingResponseLimitCount, ServiceName = options.ServiceName })) - { - while (await stream.ResponseStream.MoveNext(cancellationToken)) - { - if (stream.ResponseStream.Current.IsSuccess) - { - try - { - var sagas = stream.ResponseStream.Current.Sagas.Select(m => - { - var result = new SagaEntity - { - Id = m.Id, - }; - result.ActivityEntities = m.Activities.Select(n => new ActivityEntity - { - CompensateTimes = n.CompensateTimes, - ExecuteTimes = n.ExecuteTimes, - Id = n.Id, - Name = n.Id, - Order = n.Order, - ParameterData = n.ParameterData.ToByteArray(), - SagaId = n.SagaId, - Status = n.Status - }).ToList(); - return result; - }).ToList(); - sagas.ForEach(async sagaEntity => - { - var saga = sagaRestorer.CreateSaga(sagaEntity); - var compensateResult = await saga.CompensateWhenRetry(); - if (compensateResult) - { - var expiresAt = DateTime.UtcNow.AddSeconds(options.CompeletedSagaExpiredAfterSeconds); - await eventSender.SagaEnded(sagaEntity.Id, expiresAt); - } - }); - } - catch (Exception ex) - { - logger.LogError(ex, "Errors in NotEndedSagasCompensateRetryBackgroundService CompensateRetry"); - } - } - } - } - } - - public Task StopAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } - } -} diff --git a/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs b/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs index 7a5d741..0024236 100644 --- a/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs +++ b/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs @@ -18,7 +18,7 @@ namespace Microsoft.Extensions.DependencyInjection { public static class PoleSagaServiceCollectionExtensions { - public static void AddSagas(this StartupConfig startupOption, Action configAction) + public static void AddSagasClient(this StartupConfig startupOption, Action configAction) { // 让客户端支持 没有TLS 的 grpc call AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); @@ -26,7 +26,7 @@ namespace Microsoft.Extensions.DependencyInjection startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(); - startupOption.Services.AddHostedService(); + startupOption.Services.AddHostedService(); PoleSagasOption sagasOption = null; using (var provider = startupOption.Services.BuildServiceProvider()) { diff --git a/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs b/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs new file mode 100644 index 0000000..e37abab --- /dev/null +++ b/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs @@ -0,0 +1,109 @@ +using Grpc.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Pole.Core.Serialization; +using Pole.Core.Utils.Abstraction; +using Pole.Sagas.Client.Abstraction; +using Pole.Sagas.Core; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using static Pole.Sagas.Server.Grpc.Saga; + +namespace Pole.Sagas.Client +{ + public class SagasCompensateRetryBackgroundService : IHostedService + { + private readonly PoleSagasOption options; + private readonly SagaClient sagaClient; + private readonly SagaRestorer sagaRestorer; + private readonly IEventSender eventSender; + private readonly ILogger logger; + public SagasCompensateRetryBackgroundService(IOptions options, SagaClient sagaClient, IServiceProvider serviceProvider, IEventSender eventSender, ILogger logger) + { + this.options = options.Value; + this.sagaClient = sagaClient; + sagaRestorer = new SagaRestorer(serviceProvider.GetRequiredService(), serviceProvider, serviceProvider.GetRequiredService(), this.options, serviceProvider.GetRequiredService(), serviceProvider.GetRequiredService()); + this.eventSender = eventSender; + this.logger = logger; + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + while (true) + { + try + { + await GrpcGetSagasCore(cancellationToken); + } + catch (Exception ex) + { + logger.LogError(ex, "Errors in grpc get sagas"); + } + finally + { + await Task.Delay(options.GrpcConnectFailRetryIntervalSeconds * 1000); + } + } + } + + private async Task GrpcGetSagasCore(CancellationToken cancellationToken) + { + using (var stream = sagaClient.GetSagas(new Pole.Sagas.Server.Grpc.GetSagasRequest { Limit = options.PreSagasGrpcStreamingResponseLimitCount, ServiceName = options.ServiceName })) + { + while (await stream.ResponseStream.MoveNext(cancellationToken)) + { + if (stream.ResponseStream.Current.IsSuccess) + { + try + { + var sagas = stream.ResponseStream.Current.Sagas.Select(m => + { + var result = new SagaEntity + { + Id = m.Id, + }; + result.ActivityEntities = m.Activities.Select(n => new ActivityEntity + { + CompensateTimes = n.CompensateTimes, + ExecuteTimes = n.ExecuteTimes, + Id = n.Id, + Name = n.Id, + Order = n.Order, + ParameterData = n.ParameterData.ToByteArray(), + SagaId = n.SagaId, + Status = n.Status + }).ToList(); + return result; + }).ToList(); + sagas.ForEach(async sagaEntity => + { + var saga = sagaRestorer.CreateSaga(sagaEntity); + var compensateResult = await saga.CompensateWhenRetry(); + if (compensateResult) + { + var expiresAt = DateTime.UtcNow.AddSeconds(options.CompeletedSagaExpiredAfterSeconds); + await eventSender.SagaEnded(sagaEntity.Id, expiresAt); + } + }); + } + catch (Exception ex) + { + logger.LogError(ex, "Errors in NotEndedSagasCompensateRetryBackgroundService CompensateRetry"); + } + } + } + } + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } +} diff --git a/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs b/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs index 7664971..98bb91d 100644 --- a/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs +++ b/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs @@ -12,7 +12,7 @@ using System.Linq; namespace Pole.Sagas.Server.Processor { - public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer + public class BackgroundServiceBasedProcessorServer : IHostedService { private readonly IServiceProvider _serviceProvider; private Task _compositeTask; @@ -21,12 +21,13 @@ namespace Pole.Sagas.Server.Processor { _serviceProvider = serviceProvider; } - public async Task Start(CancellationToken stoppingToken) + + public async Task StartAsync(CancellationToken cancellationToken) { var eventStorageInitializer = _serviceProvider.GetService(); - await eventStorageInitializer.InitializeAsync(stoppingToken); + await eventStorageInitializer.InitializeAsync(cancellationToken); - ProcessingContext processingContext = new ProcessingContext(stoppingToken); + ProcessingContext processingContext = new ProcessingContext(cancellationToken); List loopProcessors = new List(); var innerProcessors = _serviceProvider.GetServices(); var loggerFactory = _serviceProvider.GetService(); @@ -40,9 +41,10 @@ namespace Pole.Sagas.Server.Processor _compositeTask = Task.WhenAll(tasks); await _compositeTask; } - protected override Task ExecuteAsync(CancellationToken stoppingToken) + + public Task StopAsync(CancellationToken cancellationToken) { - return Start(stoppingToken); + return Task.CompletedTask; } } } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs index 15a4cc9..531faa9 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs @@ -36,17 +36,18 @@ namespace Pole.Sagas.Storage.PostgreSql if (cancellationToken.IsCancellationRequested) return; var sql = CreateDbTablesScript(options.SchemaName); + using (var connection = new NpgsqlConnection(options.ConnectionString)) { await connection.ExecuteAsync(sql); } - logger.LogDebug("Ensuring all create database tables script are applied."); } private string CreateDbTablesScript(string schemaName) { var batchSql = $@" +111 CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}""; CREATE TABLE IF NOT EXISTS {GetSagaTableName()}( @@ -54,9 +55,9 @@ CREATE TABLE IF NOT EXISTS {GetSagaTableName()}( ""ServiceName"" varchar(64) COLLATE ""pg_catalog"".""default"" NOT NULL, ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, ""ExpiresAt"" timestamp, - ""AddTime"" timestamp NOT NULL + ""AddTime"" timestamp NOT NULL, + CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id"") ); -ALTER TABLE {GetSagaTableName()} ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id""); CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, @@ -68,17 +69,14 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( ""ParameterData"" bytea NOT NULL, ""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"", ""CompensateTimes"" int4 NOT NULL, - ""AddTime"" timestamp NOT NULL + ""AddTime"" timestamp NOT NULL, + CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id""), + CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION ); -CREATE INDEX ""Activities_SagaId"" ON {GetActivityTableName()} USING btree ( +CREATE INDEX IF NOT EXISTS ""Activities_SagaId"" ON {GetActivityTableName()} USING btree ( ""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST ); - -ALTER TABLE {GetActivityTableName()} ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id""); - - -ALTER TABLE {GetActivityTableName()} ADD CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION; "; return batchSql; } -- libgit2 0.25.0