PostgreSqlSagaStorageInitializer.cs
3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
using Dapper;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Sagas.Storage.PostgreSql
{
/// <summary>
/// Creates the PostgreSQL schema, tables and index used by the saga storage,
/// and exposes the fully qualified, quoted names of those tables.
/// </summary>
class PostgreSqlSagaStorageInitializer : ISagaStorageInitializer
{
    private readonly PoleSagasStoragePostgreSqlOption options;
    private readonly ILogger logger;

    /// <summary>
    /// Captures the storage options and the logger used during initialization.
    /// </summary>
    /// <param name="poleSagaServerOption">Options wrapper; its <c>Value</c> supplies the connection string, schema name and table names.</param>
    /// <param name="logger">Logger that receives the initialization message.</param>
    public PostgreSqlSagaStorageInitializer(IOptions<PoleSagasStoragePostgreSqlOption> poleSagaServerOption, ILogger<PostgreSqlSagaStorageInitializer> logger)
    {
        this.options = poleSagaServerOption.Value;
        this.logger = logger;
    }

    /// <summary>
    /// Returns the activity table name as <c>"schema"."table"</c>, already wrapped in double quotes.
    /// </summary>
    /// <returns>The schema-qualified, quoted activity table name.</returns>
    public string GetActivityTableName()
    {
        return $"\"{options.SchemaName}\".\"{options.ActivityTableName}\"";
    }

    /// <summary>
    /// Returns the saga table name as <c>"schema"."table"</c>, already wrapped in double quotes.
    /// </summary>
    /// <returns>The schema-qualified, quoted saga table name.</returns>
    public string GetSagaTableName()
    {
        return $"\"{options.SchemaName}\".\"{options.SagaTableName}\"";
    }

    /// <summary>
    /// Executes the table creation script against the configured database.
    /// Returns without doing anything when cancellation has already been requested.
    /// </summary>
    /// <param name="cancellationToken">Token checked before starting and passed to the database command.</param>
    /// <returns>A task that completes once the script has been executed.</returns>
    public async Task InitializeAsync(CancellationToken cancellationToken)
    {
        if (cancellationToken.IsCancellationRequested) return;

        var sql = CreateDbTablesScript();
        using (var connection = new NpgsqlConnection(options.ConnectionString))
        {
            // CommandDefinition is how Dapper accepts a cancellation token for ExecuteAsync.
            var command = new CommandDefinition(sql, cancellationToken: cancellationToken);
            await connection.ExecuteAsync(command);
        }

        logger.LogDebug("Ensuring all create database tables script are applied.");
    }

    /// <summary>
    /// Builds the SQL batch that creates the schema, the saga table, the activity table
    /// and the index on the activity table's <c>SagaId</c> column.
    /// </summary>
    /// <returns>The SQL script text.</returns>
    private string CreateDbTablesScript()
    {
        // Both helpers return names that are already double-quoted, so they are
        // interpolated as-is; wrapping them in another pair of quotes produces an
        // invalid identifier.
        var sagaTable = GetSagaTableName();
        var activityTable = GetActivityTableName();

        // Keys are declared inside CREATE TABLE IF NOT EXISTS and the index uses
        // IF NOT EXISTS so that running the script again does not fail on objects
        // that were created by a previous run.
        var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}"";
CREATE TABLE IF NOT EXISTS {sagaTable}(
""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""ServiceName"" varchar(64) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
""ExpiresAt"" timestamp,
""AddTime"" timestamp NOT NULL,
CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id"")
);
CREATE TABLE IF NOT EXISTS {activityTable}(
""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Name"" varchar(255) COLLATE ""pg_catalog"".""default"" NOT NULL,
""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Order"" int4 NOT NULL,
""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
""OvertimeCompensateTimes"" int4 NOT NULL,
""ParameterData"" bytea NOT NULL,
""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
""CompensateTimes"" int4 NOT NULL,
""AddTime"" timestamp NOT NULL,
CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id""),
CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {sagaTable} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION
);
CREATE INDEX IF NOT EXISTS ""Activities_SagaId"" ON {activityTable} USING btree (
""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST
);
";
        return batchSql;
    }
}
}