From 4e4262eed50f867236b1ba3a7b33f3021f69191b Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Fri, 13 Mar 2020 09:29:04 +0800 Subject: [PATCH] 优化 backgroundserver --- src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs | 15 +++++++-------- src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs | 2 +- src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs | 16 +++++++--------- src/Pole.Sagas.Server/Services/SagaService.cs | 2 +- src/Pole.Sagas.Storage.PostgreSql/ActivityAndSagaEntity.cs | 2 +- src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs | 15 ++++++++------- src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs | 1 - src/Pole.Sagas/Core/ActivityEntity.cs | 2 +- 8 files changed, 26 insertions(+), 29 deletions(-) diff --git a/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs b/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs index 8905bbe..74ce579 100644 --- a/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs +++ b/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs @@ -12,7 +12,7 @@ using System.Threading.Tasks; namespace Pole.EventBus.Processor.Server { - public class BackgroundServiceBasedProcessorServer : IHostedService + public class BackgroundServiceBasedProcessorServer : BackgroundService { private readonly IServiceProvider _serviceProvider; private Task _compositeTask; @@ -22,12 +22,16 @@ namespace Pole.EventBus.Processor.Server _serviceProvider = serviceProvider; } - public async Task StartAsync(CancellationToken cancellationToken) + public override async Task StartAsync(CancellationToken cancellationToken) { var eventStorageInitializer = _serviceProvider.GetService(); await eventStorageInitializer.InitializeAsync(cancellationToken); + await base.StartAsync(cancellationToken); + } - ProcessingContext processingContext = new ProcessingContext(cancellationToken); + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + ProcessingContext processingContext = new ProcessingContext(stoppingToken); List loopProcessors = new List(); var innerProcessors = _serviceProvider.GetServices(); var loggerFactory = _serviceProvider.GetService(); @@ -41,10 +45,5 @@ namespace Pole.EventBus.Processor.Server _compositeTask = Task.WhenAll(tasks); await _compositeTask; } - - public Task StopAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } } } diff --git a/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs b/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs index e37abab..d5e521f 100644 --- a/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs +++ b/src/Pole.Sagas.Client/SagasCompensateRetryBackgroundService.cs @@ -71,7 +71,7 @@ namespace Pole.Sagas.Client result.ActivityEntities = m.Activities.Select(n => new ActivityEntity { CompensateTimes = n.CompensateTimes, - ExecuteTimes = n.ExecuteTimes, + OvertimeCompensateTimes = n.ExecuteTimes, Id = n.Id, Name = n.Id, Order = n.Order, diff --git a/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs b/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs index 98bb91d..5880526 100644 --- a/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs +++ b/src/Pole.Sagas.Server/Processor/BackgroundServiceBasedProcessorServer.cs @@ -12,7 +12,7 @@ using System.Linq; namespace Pole.Sagas.Server.Processor { - public class BackgroundServiceBasedProcessorServer : IHostedService + public class BackgroundServiceBasedProcessorServer : BackgroundService { private readonly IServiceProvider _serviceProvider; private Task _compositeTask; @@ -22,12 +22,15 @@ namespace Pole.Sagas.Server.Processor _serviceProvider = serviceProvider; } - public async Task StartAsync(CancellationToken cancellationToken) + public override async Task StartAsync(CancellationToken cancellationToken) { var eventStorageInitializer = _serviceProvider.GetService(); await eventStorageInitializer.InitializeAsync(cancellationToken); - - ProcessingContext processingContext = new ProcessingContext(cancellationToken); + await base.StartAsync(cancellationToken); + } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + ProcessingContext processingContext = new ProcessingContext(stoppingToken); List loopProcessors = new List(); var innerProcessors = _serviceProvider.GetServices(); var loggerFactory = _serviceProvider.GetService(); @@ -41,10 +44,5 @@ namespace Pole.Sagas.Server.Processor _compositeTask = Task.WhenAll(tasks); await _compositeTask; } - - public Task StopAsync(CancellationToken cancellationToken) - { - return Task.CompletedTask; - } } } diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs index 9979d3f..a00b4a3 100644 --- a/src/Pole.Sagas.Server/Services/SagaService.cs +++ b/src/Pole.Sagas.Server/Services/SagaService.cs @@ -154,7 +154,7 @@ namespace Pole.Sagas.Server.Services result.Activities.Add(m.ActivityEntities.Select(n => new GetSagasResponse.Types.Saga.Types.Activity { CompensateTimes = n.CompensateTimes, - ExecuteTimes = n.ExecuteTimes, + ExecuteTimes = n.OvertimeCompensateTimes, Id = n.Id, Name = n.Id, Order = n.Order, diff --git a/src/Pole.Sagas.Storage.PostgreSql/ActivityAndSagaEntity.cs b/src/Pole.Sagas.Storage.PostgreSql/ActivityAndSagaEntity.cs index a96eeea..e1e04db 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/ActivityAndSagaEntity.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/ActivityAndSagaEntity.cs @@ -12,7 +12,7 @@ namespace Pole.Sagas.Storage.PostgreSql public int Order { get; set; } public string Status { get; set; } public byte[] ParameterData { get; set; } - public int ExecuteTimes { get; set; } + public int OvertimeCompensateTimes { get; set; } public int CompensateTimes { get; set; } public int Name { get; set; } } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs index 39afdb5..cec72c9 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs @@ -169,9 +169,9 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" + { using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { - var updateActivitySql = -$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status1 and activities.\"Status\" != @Status2"; - var activities = await connection.QueryAsync(updateActivitySql, new + var sql = +$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"OvertimeCompensateTimes\",activities.\"CompensateTimes\",activities.\"Name\" from {activityTableName} as activities inner join(select \"Id\",\"ServiceName\" from {sagaTableName} where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status1 and activities.\"Status\" != @Status2"; + var activities = await connection.QueryAsync(sql, new { AddTime = dateTime, Limit = limit, @@ -197,7 +197,7 @@ $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"I ActivityEntity activityEntity = new ActivityEntity { CompensateTimes = activity.CompensateTimes, - ExecuteTimes = activity.ExecuteTimes, + OvertimeCompensateTimes = activity.OvertimeCompensateTimes, Id = activity.Id, Order = activity.Order, ParameterData = activity.ParameterData, @@ -213,17 +213,18 @@ $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"I } } - public Task DeleteExpiredData(string tableName, DateTime ExpiredAt, int batchCount) + public async Task DeleteExpiredData(string tableName, DateTime ExpiredAt, int batchCount) { using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { var sql = -$"delete {tableName} WHERE \"ExpiresAt\" < @ExpiredAt AND \"Id\" IN (SELECT \"Id\" FROM {tableName} LIMIT @BatchCount);"; - return connection.ExecuteAsync(sql, new +$"DELETE FROM {tableName} WHERE \"ExpiresAt\" < @ExpiredAt AND \"Id\" IN (SELECT \"Id\" FROM {tableName} LIMIT @BatchCount);"; + var result = await connection.ExecuteAsync(sql, new { ExpiredAt = ExpiredAt, BatchCount = batchCount, }); + return result; } } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs index 531faa9..1241899 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs @@ -47,7 +47,6 @@ namespace Pole.Sagas.Storage.PostgreSql private string CreateDbTablesScript(string schemaName) { var batchSql = $@" -111 CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}""; CREATE TABLE IF NOT EXISTS {GetSagaTableName()}( diff --git a/src/Pole.Sagas/Core/ActivityEntity.cs b/src/Pole.Sagas/Core/ActivityEntity.cs index fea4a46..9121c6a 100644 --- a/src/Pole.Sagas/Core/ActivityEntity.cs +++ b/src/Pole.Sagas/Core/ActivityEntity.cs @@ -15,7 +15,7 @@ namespace Pole.Sagas.Core public Byte[] ParameterData { get; set; } public Byte[] ResultData { get; set; } public string Errors { get; set; } - public int ExecuteTimes { get; set; } + public int OvertimeCompensateTimes { get; set; } public int CompensateTimes { get; set; } public DateTime AddTime { get; set; } } -- libgit2 0.25.0