Commit 2d910fc1 by dingsongjie

优化宿主服务

parent 19eb4caf
......@@ -30,7 +30,7 @@ namespace SagasTest.Api
services.AddControllers();
services.AddPole(config =>
{
config.AddSagas(option=> {
config.AddSagasClient(option=> {
option.ServiceName = "SagasTest";
option.SagasServerHost = "http://localhost:80";
});
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.Processor
{
public interface IProcessorServer
{
Task Start(CancellationToken stoppingToken);
}
}
......@@ -12,7 +12,7 @@ using System.Threading.Tasks;
namespace Pole.EventBus.Processor.Server
{
public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer
public class BackgroundServiceBasedProcessorServer : IHostedService
{
private readonly IServiceProvider _serviceProvider;
private Task _compositeTask;
......@@ -21,12 +21,13 @@ namespace Pole.EventBus.Processor.Server
{
_serviceProvider = serviceProvider;
}
public async Task Start(CancellationToken stoppingToken)
public async Task StartAsync(CancellationToken cancellationToken)
{
var eventStorageInitializer = _serviceProvider.GetService<IEventStorageInitializer>();
await eventStorageInitializer.InitializeAsync(stoppingToken);
await eventStorageInitializer.InitializeAsync(cancellationToken);
ProcessingContext processingContext = new ProcessingContext(stoppingToken);
ProcessingContext processingContext = new ProcessingContext(cancellationToken);
List<LoopProcessor> loopProcessors = new List<LoopProcessor>();
var innerProcessors = _serviceProvider.GetServices<IProcessor>();
var loggerFactory = _serviceProvider.GetService<ILoggerFactory>();
......@@ -40,9 +41,10 @@ namespace Pole.EventBus.Processor.Server
_compositeTask = Task.WhenAll(tasks);
await _compositeTask;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
public Task StopAsync(CancellationToken cancellationToken)
{
return Start(stoppingToken);
return Task.CompletedTask;
}
}
}
......@@ -18,7 +18,7 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static class PoleSagaServiceCollectionExtensions
{
public static void AddSagas(this StartupConfig startupOption, Action<PoleSagasOption> configAction)
public static void AddSagasClient(this StartupConfig startupOption, Action<PoleSagasOption> configAction)
{
// 让客户端支持 没有TLS 的 grpc call
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
......@@ -26,7 +26,7 @@ namespace Microsoft.Extensions.DependencyInjection
startupOption.Services.AddSingleton<IActivityFinder, ActivityFinder>();
startupOption.Services.AddSingleton<IEventSender, EventSender>();
startupOption.Services.AddSingleton<ISagaFactory, SagaFactory>();
startupOption.Services.AddHostedService<NotEndedSagasCompensateRetryBackgroundService>();
startupOption.Services.AddHostedService<SagasCompensateRetryBackgroundService>();
PoleSagasOption sagasOption = null;
using (var provider = startupOption.Services.BuildServiceProvider())
{
......
......@@ -17,14 +17,14 @@ using static Pole.Sagas.Server.Grpc.Saga;
namespace Pole.Sagas.Client
{
public class NotEndedSagasCompensateRetryBackgroundService : IHostedService
public class SagasCompensateRetryBackgroundService : IHostedService
{
private readonly PoleSagasOption options;
private readonly SagaClient sagaClient;
private readonly SagaRestorer sagaRestorer;
private readonly IEventSender eventSender;
private readonly ILogger logger;
public NotEndedSagasCompensateRetryBackgroundService(IOptions<PoleSagasOption> options, SagaClient sagaClient, IServiceProvider serviceProvider, IEventSender eventSender, ILogger<NotEndedSagasCompensateRetryBackgroundService> logger)
public SagasCompensateRetryBackgroundService(IOptions<PoleSagasOption> options, SagaClient sagaClient, IServiceProvider serviceProvider, IEventSender eventSender, ILogger<SagasCompensateRetryBackgroundService> logger)
{
this.options = options.Value;
this.sagaClient = sagaClient;
......@@ -43,7 +43,7 @@ namespace Pole.Sagas.Client
}
catch (Exception ex)
{
logger.LogError(ex, "Errors in GRPC GetSagas");
logger.LogError(ex, "Errors in grpc get sagas");
}
finally
{
......
......@@ -12,7 +12,7 @@ using System.Linq;
namespace Pole.Sagas.Server.Processor
{
public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer
public class BackgroundServiceBasedProcessorServer : IHostedService
{
private readonly IServiceProvider _serviceProvider;
private Task _compositeTask;
......@@ -21,12 +21,13 @@ namespace Pole.Sagas.Server.Processor
{
_serviceProvider = serviceProvider;
}
public async Task Start(CancellationToken stoppingToken)
public async Task StartAsync(CancellationToken cancellationToken)
{
var eventStorageInitializer = _serviceProvider.GetService<ISagaStorageInitializer>();
await eventStorageInitializer.InitializeAsync(stoppingToken);
await eventStorageInitializer.InitializeAsync(cancellationToken);
ProcessingContext processingContext = new ProcessingContext(stoppingToken);
ProcessingContext processingContext = new ProcessingContext(cancellationToken);
List<LoopProcessor> loopProcessors = new List<LoopProcessor>();
var innerProcessors = _serviceProvider.GetServices<IProcessor>();
var loggerFactory = _serviceProvider.GetService<ILoggerFactory>();
......@@ -40,9 +41,10 @@ namespace Pole.Sagas.Server.Processor
_compositeTask = Task.WhenAll(tasks);
await _compositeTask;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
public Task StopAsync(CancellationToken cancellationToken)
{
return Start(stoppingToken);
return Task.CompletedTask;
}
}
}
......@@ -36,17 +36,18 @@ namespace Pole.Sagas.Storage.PostgreSql
if (cancellationToken.IsCancellationRequested) return;
var sql = CreateDbTablesScript(options.SchemaName);
using (var connection = new NpgsqlConnection(options.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
logger.LogDebug("Ensuring all create database tables script are applied.");
}
private string CreateDbTablesScript(string schemaName)
{
var batchSql = $@"
111
CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}"";
CREATE TABLE IF NOT EXISTS {GetSagaTableName()}(
......@@ -54,9 +55,9 @@ CREATE TABLE IF NOT EXISTS {GetSagaTableName()}(
""ServiceName"" varchar(64) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
""ExpiresAt"" timestamp,
""AddTime"" timestamp NOT NULL
""AddTime"" timestamp NOT NULL,
CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id"")
);
ALTER TABLE {GetSagaTableName()} ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id"");
CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
......@@ -68,17 +69,14 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
""ParameterData"" bytea NOT NULL,
""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
""CompensateTimes"" int4 NOT NULL,
""AddTime"" timestamp NOT NULL
""AddTime"" timestamp NOT NULL,
CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id""),
CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION
);
CREATE INDEX ""Activities_SagaId"" ON {GetActivityTableName()} USING btree (
CREATE INDEX IF NOT EXISTS ""Activities_SagaId"" ON {GetActivityTableName()} USING btree (
""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST
);
ALTER TABLE {GetActivityTableName()} ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id"");
ALTER TABLE {GetActivityTableName()} ADD CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION;
";
return batchSql;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment