From fa6f26106baf705ce4c3fc5361466d47efc9dd2c Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Mon, 9 Mar 2020 14:48:23 +0800 Subject: [PATCH] 添加 sagas数据库初始化功能 --- Pole.sln | 9 ++++++++- samples/apis/SagasTest.Api/SagasTest.Api.csproj | 2 +- src/Pole.Sagas.Client/Pole.Sagas.Client.csproj | 12 ++++++++++++ src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs | 47 +++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj | 6 ++++++ src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs | 15 +++++++++++++++ src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs | 15 +++++++++++++++ src/Pole.Sagas/Core/ActivityFinder.cs | 6 +++--- src/Pole.Sagas/Core/EventSender.cs | 2 +- src/Pole.Sagas/Core/SagaFactory.cs | 4 ++-- src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs | 47 ----------------------------------------------- 13 files changed, 255 insertions(+), 55 deletions(-) create mode 100644 src/Pole.Sagas.Client/Pole.Sagas.Client.csproj create mode 100644 src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs create mode 100644 src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs create mode 100644 src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs create mode 100644 src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs create mode 100644 src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs delete mode 100644 src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs diff --git a/Pole.sln b/Pole.sln index 1caff7e..01ffaa1 100644 --- a/Pole.sln +++ b/Pole.sln @@ -41,7 +41,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Storage.PostgreS EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SagasTest.Api", "samples\apis\SagasTest.Api\SagasTest.Api.csproj", "{6138197E-6202-4E1B-9458-3CBEE60A36F9}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Server", "src\Pole.Sagas.Server\Pole.Sagas.Server.csproj", "{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Server", "src\Pole.Sagas.Server\Pole.Sagas.Server.csproj", "{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Client", "src\Pole.Sagas.Client\Pole.Sagas.Client.csproj", "{EED4FEA7-4E20-41B2-9741-55FCA709373E}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -109,6 +111,10 @@ Global {34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Debug|Any CPU.Build.0 = Debug|Any CPU {34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Release|Any CPU.ActiveCfg = Release|Any CPU {34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Release|Any CPU.Build.0 = Release|Any CPU + {EED4FEA7-4E20-41B2-9741-55FCA709373E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EED4FEA7-4E20-41B2-9741-55FCA709373E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EED4FEA7-4E20-41B2-9741-55FCA709373E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EED4FEA7-4E20-41B2-9741-55FCA709373E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -131,6 +137,7 @@ Global {9505BDFC-395B-4257-AEB3-2B44611147A4} = {9932C965-8B38-4F70-9E43-86DC56860E2B} {6138197E-6202-4E1B-9458-3CBEE60A36F9} = {475116FC-DEEC-4255-94E4-AE7B8C85038D} {34ECE24E-0D78-4764-BC54-0CEE61BDB96A} = {9932C965-8B38-4F70-9E43-86DC56860E2B} + {EED4FEA7-4E20-41B2-9741-55FCA709373E} = {9932C965-8B38-4F70-9E43-86DC56860E2B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E} diff --git a/samples/apis/SagasTest.Api/SagasTest.Api.csproj b/samples/apis/SagasTest.Api/SagasTest.Api.csproj index 4c62276..89d53e8 100644 --- a/samples/apis/SagasTest.Api/SagasTest.Api.csproj +++ b/samples/apis/SagasTest.Api/SagasTest.Api.csproj @@ -9,7 +9,7 @@ - + diff --git a/src/Pole.Sagas.Client/Pole.Sagas.Client.csproj b/src/Pole.Sagas.Client/Pole.Sagas.Client.csproj new file mode 100644 index 0000000..51732e3 --- /dev/null +++ b/src/Pole.Sagas.Client/Pole.Sagas.Client.csproj @@ -0,0 +1,12 @@ + + + + netstandard2.1 + + + + + + + + diff --git a/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs b/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs new file mode 100644 index 0000000..3c053c8 --- /dev/null +++ b/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs @@ -0,0 +1,47 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; +using Pole.Core; +using Pole.Core.Utils; +using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; +using Pole.Sagas.Core.Exceptions; +using static Pole.Sagas.Server.Grpc.Saga; + +namespace Microsoft.Extensions.DependencyInjection +{ + public static class PoleSagaServiceCollectionExtensions + { + public static void AddSagas(this StartupConfig startupOption, Action configAction) + { + startupOption.Services.Configure(configAction); + startupOption.Services.AddSingleton(); + startupOption.Services.AddSingleton(); + startupOption.Services.AddSingleton(); + using(var provider = startupOption.Services.BuildServiceProvider()) + { + var sagasOption = provider.GetRequiredService>().Value; + startupOption.Services.AddGrpcClient(o => + { + o.Address = new Uri(sagasOption.SagasServerHost); + }); + } + var baseActivityType = typeof(IActivity<>); + foreach (var assembly in AssemblyHelper.GetAssemblies()) + { + + foreach (var type in assembly.GetTypes().Where(m => m.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == baseActivityType) && m.IsClass && !m.IsAbstract)) + { + if (!type.FullName.EndsWith("Activity")) + { + throw new ActivityNameIrregularException(type); + } + startupOption.Services.AddScoped(type); + } + } + } + } +} diff --git a/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj b/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj index ac168da..b52f75a 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj +++ b/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj @@ -5,6 +5,12 @@ + + + + + + diff --git a/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs new file mode 100644 index 0000000..0ae681e --- /dev/null +++ b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Storage.PostgreSql +{ + public class PoleSagasStoragePostgreSqlOption + { + public string SagaTableName { get; set; } + public string SchemaName { get; set; } + public string ActivityTableName { get; set; } + public int SagasRecoveryIntervalSecond { get; set; } + public string ConnectionString { get; set; } + } +} diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs new file mode 100644 index 0000000..5505281 --- /dev/null +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs @@ -0,0 +1,84 @@ +using Dapper; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Npgsql; +using Pole.Core.EventBus.EventStorage; +using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Sagas.Storage.PostgreSql +{ + class PostgreSqlEventStorageInitializer : ISagaStorageInitializer + { + private PoleSagasStoragePostgreSqlOption options; + private readonly ILogger logger; + public PostgreSqlEventStorageInitializer(IOptions poleSagaServerOption, ILogger logger) + { + this.options = poleSagaServerOption.Value; + this.logger = logger; + } + public string GetActivityTableName() + { + return $"\"{options.SchemaName}\".\"{options.ActivityTableName}\""; + } + + public string GetSagaTableName() + { + return $"\"{options.SchemaName}\".\"{options.SagaTableName}\""; + } + + public async Task InitializeAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) return; + + var sql = CreateDbTablesScript(options.SchemaName); + using (var connection = new NpgsqlConnection(options.ConnectionString)) + { + await connection.ExecuteAsync(sql); + } + + logger.LogDebug("Ensuring all create database tables script are applied."); + } + + private string CreateDbTablesScript(string schemaName) + { + var batchSql = $@" +CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}""; + +CREATE TABLE IF NOT EXISTS {GetSagaTableName()}( + ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""ServiceName"" varchar(64) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""ExpiresAt"" timestamp, + ""AddTime"" timestamp NOT NULL +); +ALTER TABLE ""{options.SchemaName}"" ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id""); + +CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( + ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""Order"" int4 NOT NULL, + ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, + ""TimeOutSeconds"" int4 NOT NULL, + ""ParameterData"" bytea NOT NULL, + ""ResultData"" bytea, + ""Errors"" varchar(1024) COLLATE ""pg_catalog"".""default"", + ""ExecuteRetries"" int4 NOT NULL, + ""CompensateRetries"" int4 NOT NULL, + ""AddTime"" timestamp NOT NULL +) +; +CREATE INDEX ""Activities_SagaId"" ON ""{GetActivityTableName()}"" USING btree ( + ""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST +); +ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id""); + "; + return batchSql; + } + } +} diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs new file mode 100644 index 0000000..5b82ab1 --- /dev/null +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs @@ -0,0 +1,61 @@ +using Pole.Sagas.Core; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Sagas.Storage.PostgreSql +{ + public class PostgreSqlSagaStorage : ISagaStorage + { + public Task ActivityCompensateAborted(string activityId, string sagaId, string errors) + { + throw new NotImplementedException(); + } + + public Task ActivityCompensated(string activityId) + { + throw new NotImplementedException(); + } + + public Task ActivityEnded(string activityId, byte[] resultData) + { + throw new NotImplementedException(); + } + + public Task ActivityExecuteAborted(string activityId, string errors) + { + throw new NotImplementedException(); + } + + public Task ActivityExecuteOvertime(string activityId, string sagaId, string errors) + { + throw new NotImplementedException(); + } + + public Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order, DateTime addTime) + { + throw new NotImplementedException(); + } + + public Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType) + { + throw new NotImplementedException(); + } + + public Task ActivityRevoked(string activityId) + { + throw new NotImplementedException(); + } + + public Task SagaEnded(string sagaId, DateTime ExpiresAt) + { + throw new NotImplementedException(); + } + + public Task SagaStarted(string sagaId, string serviceName, DateTime addTime) + { + throw new NotImplementedException(); + } + } +} diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs b/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs new file mode 100644 index 0000000..1d47d3a --- /dev/null +++ b/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Sagas.Core.Abstraction +{ + public interface ISagaStorageInitializer + { + Task InitializeAsync(CancellationToken cancellationToken); + string GetSagaTableName(); + string GetActivityTableName(); + } +} diff --git a/src/Pole.Sagas/Core/ActivityFinder.cs b/src/Pole.Sagas/Core/ActivityFinder.cs index 3e17b21..17d4f7a 100644 --- a/src/Pole.Sagas/Core/ActivityFinder.cs +++ b/src/Pole.Sagas/Core/ActivityFinder.cs @@ -10,7 +10,7 @@ using System.Text; namespace Pole.Sagas.Core { - class ActivityFinder : IActivityFinder + public class ActivityFinder : IActivityFinder { private readonly ConcurrentDictionary nameDict = new ConcurrentDictionary(); private readonly ConcurrentDictionary typeDict = new ConcurrentDictionary(); @@ -21,8 +21,8 @@ namespace Pole.Sagas.Core var baseActivityType = typeof(IActivity<>); foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger)) { - - foreach (var type in assembly.GetTypes().Where(m => m.GetInterfaces().Any(i=>i.IsGenericType&& i.GetGenericTypeDefinition() == baseActivityType)&&m.IsClass&&!m.IsAbstract)) + + foreach (var type in assembly.GetTypes().Where(m => m.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == baseActivityType) && m.IsClass && !m.IsAbstract)) { if (!type.FullName.EndsWith("Activity")) { diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs index 63dd80d..2fae955 100644 --- a/src/Pole.Sagas/Core/EventSender.cs +++ b/src/Pole.Sagas/Core/EventSender.cs @@ -10,7 +10,7 @@ using static Pole.Sagas.Server.Grpc.Saga; namespace Pole.Sagas.Core { - class EventSender : IEventSender + public class EventSender : IEventSender { private readonly SagaClient sagaClient; public EventSender(SagaClient sagaClient) diff --git a/src/Pole.Sagas/Core/SagaFactory.cs b/src/Pole.Sagas/Core/SagaFactory.cs index aa9cb9e..627b00d 100644 --- a/src/Pole.Sagas/Core/SagaFactory.cs +++ b/src/Pole.Sagas/Core/SagaFactory.cs @@ -8,11 +8,11 @@ using System.Text; namespace Pole.Sagas.Core { - class SagaFactory : ISagaFactory + public class SagaFactory : ISagaFactory { private readonly ISnowflakeIdGenerator snowflakeIdGenerator; private readonly IServiceProvider serviceProvider; - private readonly IEventSender eventSender; + private readonly IEventSender eventSender; private readonly PoleSagasOption poleSagasOption; private readonly ISerializer serializer; private readonly IActivityFinder activityFinder; diff --git a/src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs b/src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs deleted file mode 100644 index 3c053c8..0000000 --- a/src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs +++ /dev/null @@ -1,47 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Options; -using Pole.Core; -using Pole.Core.Utils; -using Pole.Sagas.Core; -using Pole.Sagas.Core.Abstraction; -using Pole.Sagas.Core.Exceptions; -using static Pole.Sagas.Server.Grpc.Saga; - -namespace Microsoft.Extensions.DependencyInjection -{ - public static class PoleSagaServiceCollectionExtensions - { - public static void AddSagas(this StartupConfig startupOption, Action configAction) - { - startupOption.Services.Configure(configAction); - startupOption.Services.AddSingleton(); - startupOption.Services.AddSingleton(); - startupOption.Services.AddSingleton(); - using(var provider = startupOption.Services.BuildServiceProvider()) - { - var sagasOption = provider.GetRequiredService>().Value; - startupOption.Services.AddGrpcClient(o => - { - o.Address = new Uri(sagasOption.SagasServerHost); - }); - } - var baseActivityType = typeof(IActivity<>); - foreach (var assembly in AssemblyHelper.GetAssemblies()) - { - - foreach (var type in assembly.GetTypes().Where(m => m.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == baseActivityType) && m.IsClass && !m.IsAbstract)) - { - if (!type.FullName.EndsWith("Activity")) - { - throw new ActivityNameIrregularException(type); - } - startupOption.Services.AddScoped(type); - } - } - } - } -} -- libgit2 0.25.0