Commit 4e4262ee by dingsongjie

优化 backgroundserver

parent 20631e55
...@@ -12,7 +12,7 @@ using System.Threading.Tasks; ...@@ -12,7 +12,7 @@ using System.Threading.Tasks;
namespace Pole.EventBus.Processor.Server namespace Pole.EventBus.Processor.Server
{ {
public class BackgroundServiceBasedProcessorServer : IHostedService public class BackgroundServiceBasedProcessorServer : BackgroundService
{ {
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private Task _compositeTask; private Task _compositeTask;
...@@ -22,12 +22,16 @@ namespace Pole.EventBus.Processor.Server ...@@ -22,12 +22,16 @@ namespace Pole.EventBus.Processor.Server
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
} }
public async Task StartAsync(CancellationToken cancellationToken) public override async Task StartAsync(CancellationToken cancellationToken)
{ {
var eventStorageInitializer = _serviceProvider.GetService<IEventStorageInitializer>(); var eventStorageInitializer = _serviceProvider.GetService<IEventStorageInitializer>();
await eventStorageInitializer.InitializeAsync(cancellationToken); await eventStorageInitializer.InitializeAsync(cancellationToken);
await base.StartAsync(cancellationToken);
}
ProcessingContext processingContext = new ProcessingContext(cancellationToken); protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
ProcessingContext processingContext = new ProcessingContext(stoppingToken);
List<LoopProcessor> loopProcessors = new List<LoopProcessor>(); List<LoopProcessor> loopProcessors = new List<LoopProcessor>();
var innerProcessors = _serviceProvider.GetServices<IProcessor>(); var innerProcessors = _serviceProvider.GetServices<IProcessor>();
var loggerFactory = _serviceProvider.GetService<ILoggerFactory>(); var loggerFactory = _serviceProvider.GetService<ILoggerFactory>();
...@@ -41,10 +45,5 @@ namespace Pole.EventBus.Processor.Server ...@@ -41,10 +45,5 @@ namespace Pole.EventBus.Processor.Server
_compositeTask = Task.WhenAll(tasks); _compositeTask = Task.WhenAll(tasks);
await _compositeTask; await _compositeTask;
} }
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
} }
} }
...@@ -71,7 +71,7 @@ namespace Pole.Sagas.Client ...@@ -71,7 +71,7 @@ namespace Pole.Sagas.Client
result.ActivityEntities = m.Activities.Select(n => new ActivityEntity result.ActivityEntities = m.Activities.Select(n => new ActivityEntity
{ {
CompensateTimes = n.CompensateTimes, CompensateTimes = n.CompensateTimes,
ExecuteTimes = n.ExecuteTimes, OvertimeCompensateTimes = n.ExecuteTimes,
Id = n.Id, Id = n.Id,
Name = n.Id, Name = n.Id,
Order = n.Order, Order = n.Order,
......
...@@ -12,7 +12,7 @@ using System.Linq; ...@@ -12,7 +12,7 @@ using System.Linq;
namespace Pole.Sagas.Server.Processor namespace Pole.Sagas.Server.Processor
{ {
public class BackgroundServiceBasedProcessorServer : IHostedService public class BackgroundServiceBasedProcessorServer : BackgroundService
{ {
private readonly IServiceProvider _serviceProvider; private readonly IServiceProvider _serviceProvider;
private Task _compositeTask; private Task _compositeTask;
...@@ -22,12 +22,15 @@ namespace Pole.Sagas.Server.Processor ...@@ -22,12 +22,15 @@ namespace Pole.Sagas.Server.Processor
_serviceProvider = serviceProvider; _serviceProvider = serviceProvider;
} }
public async Task StartAsync(CancellationToken cancellationToken) public override async Task StartAsync(CancellationToken cancellationToken)
{ {
var eventStorageInitializer = _serviceProvider.GetService<ISagaStorageInitializer>(); var eventStorageInitializer = _serviceProvider.GetService<ISagaStorageInitializer>();
await eventStorageInitializer.InitializeAsync(cancellationToken); await eventStorageInitializer.InitializeAsync(cancellationToken);
await base.StartAsync(cancellationToken);
ProcessingContext processingContext = new ProcessingContext(cancellationToken); }
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
ProcessingContext processingContext = new ProcessingContext(stoppingToken);
List<LoopProcessor> loopProcessors = new List<LoopProcessor>(); List<LoopProcessor> loopProcessors = new List<LoopProcessor>();
var innerProcessors = _serviceProvider.GetServices<IProcessor>(); var innerProcessors = _serviceProvider.GetServices<IProcessor>();
var loggerFactory = _serviceProvider.GetService<ILoggerFactory>(); var loggerFactory = _serviceProvider.GetService<ILoggerFactory>();
...@@ -41,10 +44,5 @@ namespace Pole.Sagas.Server.Processor ...@@ -41,10 +44,5 @@ namespace Pole.Sagas.Server.Processor
_compositeTask = Task.WhenAll(tasks); _compositeTask = Task.WhenAll(tasks);
await _compositeTask; await _compositeTask;
} }
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
} }
} }
...@@ -154,7 +154,7 @@ namespace Pole.Sagas.Server.Services ...@@ -154,7 +154,7 @@ namespace Pole.Sagas.Server.Services
result.Activities.Add(m.ActivityEntities.Select(n => new GetSagasResponse.Types.Saga.Types.Activity result.Activities.Add(m.ActivityEntities.Select(n => new GetSagasResponse.Types.Saga.Types.Activity
{ {
CompensateTimes = n.CompensateTimes, CompensateTimes = n.CompensateTimes,
ExecuteTimes = n.ExecuteTimes, ExecuteTimes = n.OvertimeCompensateTimes,
Id = n.Id, Id = n.Id,
Name = n.Id, Name = n.Id,
Order = n.Order, Order = n.Order,
......
...@@ -12,7 +12,7 @@ namespace Pole.Sagas.Storage.PostgreSql ...@@ -12,7 +12,7 @@ namespace Pole.Sagas.Storage.PostgreSql
public int Order { get; set; } public int Order { get; set; }
public string Status { get; set; } public string Status { get; set; }
public byte[] ParameterData { get; set; } public byte[] ParameterData { get; set; }
public int ExecuteTimes { get; set; } public int OvertimeCompensateTimes { get; set; }
public int CompensateTimes { get; set; } public int CompensateTimes { get; set; }
public int Name { get; set; } public int Name { get; set; }
} }
......
...@@ -169,9 +169,9 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" + ...@@ -169,9 +169,9 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" +
{ {
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{ {
var updateActivitySql = var sql =
$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status1 and activities.\"Status\" != @Status2"; $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"OvertimeCompensateTimes\",activities.\"CompensateTimes\",activities.\"Name\" from {activityTableName} as activities inner join(select \"Id\",\"ServiceName\" from {sagaTableName} where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status1 and activities.\"Status\" != @Status2";
var activities = await connection.QueryAsync<ActivityAndSagaEntity>(updateActivitySql, new var activities = await connection.QueryAsync<ActivityAndSagaEntity>(sql, new
{ {
AddTime = dateTime, AddTime = dateTime,
Limit = limit, Limit = limit,
...@@ -197,7 +197,7 @@ $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"I ...@@ -197,7 +197,7 @@ $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"I
ActivityEntity activityEntity = new ActivityEntity ActivityEntity activityEntity = new ActivityEntity
{ {
CompensateTimes = activity.CompensateTimes, CompensateTimes = activity.CompensateTimes,
ExecuteTimes = activity.ExecuteTimes, OvertimeCompensateTimes = activity.OvertimeCompensateTimes,
Id = activity.Id, Id = activity.Id,
Order = activity.Order, Order = activity.Order,
ParameterData = activity.ParameterData, ParameterData = activity.ParameterData,
...@@ -213,17 +213,18 @@ $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"I ...@@ -213,17 +213,18 @@ $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"I
} }
} }
public Task<int> DeleteExpiredData(string tableName, DateTime ExpiredAt, int batchCount) public async Task<int> DeleteExpiredData(string tableName, DateTime ExpiredAt, int batchCount)
{ {
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{ {
var sql = var sql =
$"delete {tableName} WHERE \"ExpiresAt\" < @ExpiredAt AND \"Id\" IN (SELECT \"Id\" FROM {tableName} LIMIT @BatchCount);"; $"DELETE FROM {tableName} WHERE \"ExpiresAt\" < @ExpiredAt AND \"Id\" IN (SELECT \"Id\" FROM {tableName} LIMIT @BatchCount);";
return connection.ExecuteAsync(sql, new var result = await connection.ExecuteAsync(sql, new
{ {
ExpiredAt = ExpiredAt, ExpiredAt = ExpiredAt,
BatchCount = batchCount, BatchCount = batchCount,
}); });
return result;
} }
} }
......
...@@ -47,7 +47,6 @@ namespace Pole.Sagas.Storage.PostgreSql ...@@ -47,7 +47,6 @@ namespace Pole.Sagas.Storage.PostgreSql
private string CreateDbTablesScript(string schemaName) private string CreateDbTablesScript(string schemaName)
{ {
var batchSql = $@" var batchSql = $@"
111
CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}""; CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}"";
CREATE TABLE IF NOT EXISTS {GetSagaTableName()}( CREATE TABLE IF NOT EXISTS {GetSagaTableName()}(
......
...@@ -15,7 +15,7 @@ namespace Pole.Sagas.Core ...@@ -15,7 +15,7 @@ namespace Pole.Sagas.Core
public Byte[] ParameterData { get; set; } public Byte[] ParameterData { get; set; }
public Byte[] ResultData { get; set; } public Byte[] ResultData { get; set; }
public string Errors { get; set; } public string Errors { get; set; }
public int ExecuteTimes { get; set; } public int OvertimeCompensateTimes { get; set; }
public int CompensateTimes { get; set; } public int CompensateTimes { get; set; }
public DateTime AddTime { get; set; } public DateTime AddTime { get; set; }
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment