Skip to content
  • P
    Projects
  • G
    Groups
  • S
    Snippets
  • Help

丁松杰 / Pole

  • This project
    • Loading...
  • Sign in
Go to a project
  • Project
  • Repository
  • Issues 0
  • Merge Requests 0
  • Pipelines
  • Wiki
  • Snippets
  • Members
  • Activity
  • Graph
  • Charts
  • Create a new issue
  • Jobs
  • Commits
  • Issue Boards
  • Files
  • Commits
  • Branches
  • Tags
  • Contributors
  • Graph
  • Compare
  • Charts
Switch branch/tag
  • Pole
  • src
  • Pole.Sagas.Storage.PostgreSql
  • PostgreSqlSagaStorageInitializer.cs
Find file
BlameHistoryPermalink
  • dingsongjie's avatar
    重构 eventbus 的 代码结构 · 5511b5b8
    dingsongjie committed 5 years ago
    5511b5b8
PostgreSqlSagaStorageInitializer.cs 3.15 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
using Dapper;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace Pole.Sagas.Storage.PostgreSql
{
    class PostgreSqlSagaStorageInitializer : ISagaStorageInitializer
    {
        private PoleSagasStoragePostgreSqlOption options;
        private readonly ILogger logger;
        public PostgreSqlSagaStorageInitializer(IOptions<PoleSagasStoragePostgreSqlOption> poleSagaServerOption, ILogger<PostgreSqlSagaStorageInitializer> logger)
        {
            this.options = poleSagaServerOption.Value;
            this.logger = logger;
        }
        public string GetActivityTableName()
        {
            return $"\"{options.SchemaName}\".\"{options.ActivityTableName}\"";
        }

        public string GetSagaTableName()
        {
            return $"\"{options.SchemaName}\".\"{options.SagaTableName}\"";
        }

        public async Task InitializeAsync(CancellationToken cancellationToken)
        {
            if (cancellationToken.IsCancellationRequested) return;

            var sql = CreateDbTablesScript(options.SchemaName);
            using (var connection = new NpgsqlConnection(options.ConnectionString))
            {
                await connection.ExecuteAsync(sql);
            }

            logger.LogDebug("Ensuring all create database tables script are applied.");
        }

        private string CreateDbTablesScript(string schemaName)
        {
            var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{options.SchemaName}"";

CREATE TABLE IF NOT EXISTS {GetSagaTableName()}(
  ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
  ""ServiceName"" varchar(64) COLLATE ""pg_catalog"".""default"" NOT NULL,
  ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
  ""ExpiresAt"" timestamp,
  ""AddTime"" timestamp NOT NULL
);
ALTER TABLE ""{GetSagaTableName()}"" ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id"");

CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
  ""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
  ""Name"" varchar(255) COLLATE ""pg_catalog"".""default"" NOT NULL,
  ""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
  ""Order"" int4 NOT NULL,
  ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
  ""OvertimeCompensateTimes"" int4 NOT NULL,
  ""ParameterData"" bytea NOT NULL,
  ""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
  ""CompensateTimes"" int4 NOT NULL,
  ""AddTime"" timestamp NOT NULL
);

CREATE INDEX ""Activities_SagaId"" ON ""{GetActivityTableName()}"" USING btree (
  ""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST
);

ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id"");


ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION;
            ";
            return batchSql;
        }
    }
}