diff --git a/Pole.sln b/Pole.sln
index 1caff7e..01ffaa1 100644
--- a/Pole.sln
+++ b/Pole.sln
@@ -41,7 +41,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Storage.PostgreS
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SagasTest.Api", "samples\apis\SagasTest.Api\SagasTest.Api.csproj", "{6138197E-6202-4E1B-9458-3CBEE60A36F9}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Server", "src\Pole.Sagas.Server\Pole.Sagas.Server.csproj", "{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Server", "src\Pole.Sagas.Server\Pole.Sagas.Server.csproj", "{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Client", "src\Pole.Sagas.Client\Pole.Sagas.Client.csproj", "{EED4FEA7-4E20-41B2-9741-55FCA709373E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -109,6 +111,10 @@ Global
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Release|Any CPU.Build.0 = Release|Any CPU
+ {EED4FEA7-4E20-41B2-9741-55FCA709373E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {EED4FEA7-4E20-41B2-9741-55FCA709373E}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {EED4FEA7-4E20-41B2-9741-55FCA709373E}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {EED4FEA7-4E20-41B2-9741-55FCA709373E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -131,6 +137,7 @@ Global
{9505BDFC-395B-4257-AEB3-2B44611147A4} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{6138197E-6202-4E1B-9458-3CBEE60A36F9} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
+ {EED4FEA7-4E20-41B2-9741-55FCA709373E} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E}
diff --git a/samples/apis/SagasTest.Api/SagasTest.Api.csproj b/samples/apis/SagasTest.Api/SagasTest.Api.csproj
index 4c62276..89d53e8 100644
--- a/samples/apis/SagasTest.Api/SagasTest.Api.csproj
+++ b/samples/apis/SagasTest.Api/SagasTest.Api.csproj
@@ -9,7 +9,7 @@
-
+
diff --git a/src/Pole.Sagas.Client/Pole.Sagas.Client.csproj b/src/Pole.Sagas.Client/Pole.Sagas.Client.csproj
new file mode 100644
index 0000000..51732e3
--- /dev/null
+++ b/src/Pole.Sagas.Client/Pole.Sagas.Client.csproj
@@ -0,0 +1,12 @@
+
+
+
+ netstandard2.1
+
+
+
+
+
+
+
+
diff --git a/src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs b/src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs
similarity index 100%
rename from src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs
rename to src/Pole.Sagas.Client/PoleSagaServiceCollectionExtensions.cs
diff --git a/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj b/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj
index ac168da..b52f75a 100644
--- a/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj
+++ b/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj
@@ -5,6 +5,12 @@
+
+
+
+
+
+
diff --git a/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs
new file mode 100644
index 0000000..0ae681e
--- /dev/null
+++ b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.Sagas.Storage.PostgreSql
+{
+ public class PoleSagasStoragePostgreSqlOption
+ {
+ public string SagaTableName { get; set; }
+ public string SchemaName { get; set; }
+ public string ActivityTableName { get; set; }
+ public int SagasRecoveryIntervalSecond { get; set; }
+ public string ConnectionString { get; set; }
+ }
+}
diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs
new file mode 100644
index 0000000..5505281
--- /dev/null
+++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlEventStorageInitializer.cs
@@ -0,0 +1,84 @@
+using Dapper;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Npgsql;
+using Pole.Core.EventBus.EventStorage;
+using Pole.Sagas.Core;
+using Pole.Sagas.Core.Abstraction;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Pole.Sagas.Storage.PostgreSql
+{
+ class PostgreSqlEventStorageInitializer : ISagaStorageInitializer
+ {
+ private PoleSagasStoragePostgreSqlOption options;
+ private readonly ILogger logger;
+ public PostgreSqlEventStorageInitializer(IOptions poleSagaServerOption, ILogger logger)
+ {
+ this.options = poleSagaServerOption.Value;
+ this.logger = logger;
+ }
+ public string GetActivityTableName()
+ {
+ return $"\"{options.SchemaName}\".\"{options.ActivityTableName}\"";
+ }
+
+ public string GetSagaTableName()
+ {
+ return $"\"{options.SchemaName}\".\"{options.SagaTableName}\"";
+ }
+
+ public async Task InitializeAsync(CancellationToken cancellationToken)
+ {
+ if (cancellationToken.IsCancellationRequested) return;
+
+ var sql = CreateDbTablesScript(options.SchemaName);
+ using (var connection = new NpgsqlConnection(options.ConnectionString))
+ {
+ await connection.ExecuteAsync(sql);
+ }
+
+ logger.LogDebug("Ensuring all create database tables script are applied.");
+ }
+
+ private string CreateDbTablesScript(string schemaName)
+ {
+ var batchSql = $@"
+CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}"";
+
+CREATE TABLE IF NOT EXISTS {GetSagaTableName()}(
+ ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
+ ""ServiceName"" varchar(64) COLLATE ""pg_catalog"".""default"" NOT NULL,
+ ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
+ ""ExpiresAt"" timestamp,
+ ""AddTime"" timestamp NOT NULL
+);
+ALTER TABLE ""{options.SchemaName}"" ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id"");
+
+CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
+ ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
+ ""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
+ ""Order"" int4 NOT NULL,
+ ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
+ ""TimeOutSeconds"" int4 NOT NULL,
+ ""ParameterData"" bytea NOT NULL,
+ ""ResultData"" bytea,
+ ""Errors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
+ ""ExecuteRetries"" int4 NOT NULL,
+ ""CompensateRetries"" int4 NOT NULL,
+ ""AddTime"" timestamp NOT NULL
+)
+;
+CREATE INDEX ""Activities_SagaId"" ON ""{GetActivityTableName()}"" USING btree (
+ ""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST
+);
+ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id"");
+ ";
+ return batchSql;
+ }
+ }
+}
diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs
new file mode 100644
index 0000000..5b82ab1
--- /dev/null
+++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs
@@ -0,0 +1,61 @@
+using Pole.Sagas.Core;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.Sagas.Storage.PostgreSql
+{
+ public class PostgreSqlSagaStorage : ISagaStorage
+ {
+ public Task ActivityCompensateAborted(string activityId, string sagaId, string errors)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task ActivityCompensated(string activityId)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task ActivityEnded(string activityId, byte[] resultData)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task ActivityExecuteAborted(string activityId, string errors)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task ActivityExecuteOvertime(string activityId, string sagaId, string errors)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order, DateTime addTime)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task ActivityRevoked(string activityId)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task SagaEnded(string sagaId, DateTime ExpiresAt)
+ {
+ throw new NotImplementedException();
+ }
+
+ public Task SagaStarted(string sagaId, string serviceName, DateTime addTime)
+ {
+ throw new NotImplementedException();
+ }
+ }
+}
diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs b/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs
new file mode 100644
index 0000000..1d47d3a
--- /dev/null
+++ b/src/Pole.Sagas/Core/Abstraction/ISagaStorageInitializer.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Pole.Sagas.Core.Abstraction
+{
+ public interface ISagaStorageInitializer
+ {
+ Task InitializeAsync(CancellationToken cancellationToken);
+ string GetSagaTableName();
+ string GetActivityTableName();
+ }
+}
diff --git a/src/Pole.Sagas/Core/ActivityFinder.cs b/src/Pole.Sagas/Core/ActivityFinder.cs
index 3e17b21..17d4f7a 100644
--- a/src/Pole.Sagas/Core/ActivityFinder.cs
+++ b/src/Pole.Sagas/Core/ActivityFinder.cs
@@ -10,7 +10,7 @@ using System.Text;
namespace Pole.Sagas.Core
{
- class ActivityFinder : IActivityFinder
+ public class ActivityFinder : IActivityFinder
{
private readonly ConcurrentDictionary nameDict = new ConcurrentDictionary();
private readonly ConcurrentDictionary typeDict = new ConcurrentDictionary();
@@ -21,8 +21,8 @@ namespace Pole.Sagas.Core
var baseActivityType = typeof(IActivity<>);
foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger))
{
-
- foreach (var type in assembly.GetTypes().Where(m => m.GetInterfaces().Any(i=>i.IsGenericType&& i.GetGenericTypeDefinition() == baseActivityType)&&m.IsClass&&!m.IsAbstract))
+
+ foreach (var type in assembly.GetTypes().Where(m => m.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == baseActivityType) && m.IsClass && !m.IsAbstract))
{
if (!type.FullName.EndsWith("Activity"))
{
diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs
index 63dd80d..2fae955 100644
--- a/src/Pole.Sagas/Core/EventSender.cs
+++ b/src/Pole.Sagas/Core/EventSender.cs
@@ -10,7 +10,7 @@ using static Pole.Sagas.Server.Grpc.Saga;
namespace Pole.Sagas.Core
{
- class EventSender : IEventSender
+ public class EventSender : IEventSender
{
private readonly SagaClient sagaClient;
public EventSender(SagaClient sagaClient)
diff --git a/src/Pole.Sagas/Core/SagaFactory.cs b/src/Pole.Sagas/Core/SagaFactory.cs
index aa9cb9e..627b00d 100644
--- a/src/Pole.Sagas/Core/SagaFactory.cs
+++ b/src/Pole.Sagas/Core/SagaFactory.cs
@@ -8,11 +8,11 @@ using System.Text;
namespace Pole.Sagas.Core
{
- class SagaFactory : ISagaFactory
+ public class SagaFactory : ISagaFactory
{
private readonly ISnowflakeIdGenerator snowflakeIdGenerator;
private readonly IServiceProvider serviceProvider;
- private readonly IEventSender eventSender;
+ private readonly IEventSender eventSender;
private readonly PoleSagasOption poleSagasOption;
private readonly ISerializer serializer;
private readonly IActivityFinder activityFinder;