Commit fa6f2610 by dingsongjie

添加 sagas数据库初始化功能

parent d424c8c1
......@@ -41,7 +41,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Storage.PostgreS
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SagasTest.Api", "samples\apis\SagasTest.Api\SagasTest.Api.csproj", "{6138197E-6202-4E1B-9458-3CBEE60A36F9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Server", "src\Pole.Sagas.Server\Pole.Sagas.Server.csproj", "{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Server", "src\Pole.Sagas.Server\Pole.Sagas.Server.csproj", "{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Client", "src\Pole.Sagas.Client\Pole.Sagas.Client.csproj", "{EED4FEA7-4E20-41B2-9741-55FCA709373E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
......@@ -109,6 +111,10 @@ Global
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Release|Any CPU.Build.0 = Release|Any CPU
{EED4FEA7-4E20-41B2-9741-55FCA709373E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EED4FEA7-4E20-41B2-9741-55FCA709373E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EED4FEA7-4E20-41B2-9741-55FCA709373E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EED4FEA7-4E20-41B2-9741-55FCA709373E}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -131,6 +137,7 @@ Global
{9505BDFC-395B-4257-AEB3-2B44611147A4} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{6138197E-6202-4E1B-9458-3CBEE60A36F9} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{EED4FEA7-4E20-41B2-9741-55FCA709373E} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E}
......
......@@ -9,7 +9,7 @@
<ProjectReference Include="..\..\..\src\Pole.Core\Pole.Core.csproj" />
<ProjectReference Include="..\..\..\src\Pole.EventBus.Rabbitmq\Pole.EventBus.Rabbitmq.csproj" />
<ProjectReference Include="..\..\..\src\Pole.EventStorage.PostgreSql\Pole.EventStorage.PostgreSql.csproj" />
<ProjectReference Include="..\..\..\src\Pole.Sagas\Pole.Sagas.csproj" />
<ProjectReference Include="..\..\..\src\Pole.Sagas.Client\Pole.Sagas.Client.csproj" />
</ItemGroup>
</Project>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
<ProjectReference Include="..\Pole.Sagas\Pole.Sagas.csproj" />
</ItemGroup>
</Project>
......@@ -5,6 +5,12 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.30" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.2" />
<PackageReference Include="Npgsql" Version="4.1.3.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Sagas\Pole.Sagas.csproj" />
</ItemGroup>
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Storage.PostgreSql
{
public class PoleSagasStoragePostgreSqlOption
{
public string SagaTableName { get; set; }
public string SchemaName { get; set; }
public string ActivityTableName { get; set; }
public int SagasRecoveryIntervalSecond { get; set; }
public string ConnectionString { get; set; }
}
}
using Dapper;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Pole.Core.EventBus.EventStorage;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Sagas.Storage.PostgreSql
{
class PostgreSqlEventStorageInitializer : ISagaStorageInitializer
{
private PoleSagasStoragePostgreSqlOption options;
private readonly ILogger logger;
public PostgreSqlEventStorageInitializer(IOptions<PoleSagasStoragePostgreSqlOption> poleSagaServerOption, ILogger<PostgreSqlEventStorageInitializer> logger)
{
this.options = poleSagaServerOption.Value;
this.logger = logger;
}
public string GetActivityTableName()
{
return $"\"{options.SchemaName}\".\"{options.ActivityTableName}\"";
}
public string GetSagaTableName()
{
return $"\"{options.SchemaName}\".\"{options.SagaTableName}\"";
}
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
var sql = CreateDbTablesScript(options.SchemaName);
using (var connection = new NpgsqlConnection(options.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
logger.LogDebug("Ensuring all create database tables script are applied.");
}
private string CreateDbTablesScript(string schemaName)
{
var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}"";
CREATE TABLE IF NOT EXISTS {GetSagaTableName()}(
""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""ServiceName"" varchar(64) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
""ExpiresAt"" timestamp,
""AddTime"" timestamp NOT NULL
);
ALTER TABLE ""{options.SchemaName}"" ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id"");
CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Order"" int4 NOT NULL,
""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
""TimeOutSeconds"" int4 NOT NULL,
""ParameterData"" bytea NOT NULL,
""ResultData"" bytea,
""Errors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
""ExecuteRetries"" int4 NOT NULL,
""CompensateRetries"" int4 NOT NULL,
""AddTime"" timestamp NOT NULL
)
;
CREATE INDEX ""Activities_SagaId"" ON ""{GetActivityTableName()}"" USING btree (
""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST
);
ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id"");
";
return batchSql;
}
}
}
using Pole.Sagas.Core;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Storage.PostgreSql
{
public class PostgreSqlSagaStorage : ISagaStorage
{
public Task ActivityCompensateAborted(string activityId, string sagaId, string errors)
{
throw new NotImplementedException();
}
public Task ActivityCompensated(string activityId)
{
throw new NotImplementedException();
}
public Task ActivityEnded(string activityId, byte[] resultData)
{
throw new NotImplementedException();
}
public Task ActivityExecuteAborted(string activityId, string errors)
{
throw new NotImplementedException();
}
public Task ActivityExecuteOvertime(string activityId, string sagaId, string errors)
{
throw new NotImplementedException();
}
public Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order, DateTime addTime)
{
throw new NotImplementedException();
}
public Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType)
{
throw new NotImplementedException();
}
public Task ActivityRevoked(string activityId)
{
throw new NotImplementedException();
}
public Task SagaEnded(string sagaId, DateTime ExpiresAt)
{
throw new NotImplementedException();
}
public Task SagaStarted(string sagaId, string serviceName, DateTime addTime)
{
throw new NotImplementedException();
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Sagas.Core.Abstraction
{
public interface ISagaStorageInitializer
{
Task InitializeAsync(CancellationToken cancellationToken);
string GetSagaTableName();
string GetActivityTableName();
}
}
......@@ -10,7 +10,7 @@ using System.Text;
namespace Pole.Sagas.Core
{
class ActivityFinder : IActivityFinder
public class ActivityFinder : IActivityFinder
{
private readonly ConcurrentDictionary<string, Type> nameDict = new ConcurrentDictionary<string, Type>();
private readonly ConcurrentDictionary<Type, string> typeDict = new ConcurrentDictionary<Type, string>();
......@@ -21,8 +21,8 @@ namespace Pole.Sagas.Core
var baseActivityType = typeof(IActivity<>);
foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger))
{
foreach (var type in assembly.GetTypes().Where(m => m.GetInterfaces().Any(i=>i.IsGenericType&& i.GetGenericTypeDefinition() == baseActivityType)&&m.IsClass&&!m.IsAbstract))
foreach (var type in assembly.GetTypes().Where(m => m.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == baseActivityType) && m.IsClass && !m.IsAbstract))
{
if (!type.FullName.EndsWith("Activity"))
{
......
......@@ -10,7 +10,7 @@ using static Pole.Sagas.Server.Grpc.Saga;
namespace Pole.Sagas.Core
{
class EventSender : IEventSender
public class EventSender : IEventSender
{
private readonly SagaClient sagaClient;
public EventSender(SagaClient sagaClient)
......
......@@ -8,11 +8,11 @@ using System.Text;
namespace Pole.Sagas.Core
{
class SagaFactory : ISagaFactory
public class SagaFactory : ISagaFactory
{
private readonly ISnowflakeIdGenerator snowflakeIdGenerator;
private readonly IServiceProvider serviceProvider;
private readonly IEventSender eventSender;
private readonly IEventSender eventSender;
private readonly PoleSagasOption poleSagasOption;
private readonly ISerializer serializer;
private readonly IActivityFinder activityFinder;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment