From ac247fc1b78f7d0b6b6bd3b252ef1fce0ebe9ce1 Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Wed, 12 Feb 2020 18:23:40 +0800 Subject: [PATCH] 添加可靠消息部分代码 --- Pole.sln | 7 +++++++ src/Pole.Core/EventBus/Bus.cs | 2 ++ src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs | 16 ++++++++++++++++ src/Pole.EventStorage.PostgreSql/Class1.cs | 8 ++++++++ src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj | 15 +++++++++++++++ src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs | 40 ++++++++++++++++++++++++++++++++++++++++ 6 files changed, 88 insertions(+) create mode 100644 src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs create mode 100644 src/Pole.EventStorage.PostgreSql/Class1.cs create mode 100644 src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj create mode 100644 src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs diff --git a/Pole.sln b/Pole.sln index 87c7f66..973006e 100644 --- a/Pole.sln +++ b/Pole.sln @@ -49,6 +49,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.EventBus.Rabbitmq", "s EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Core.Test", "test\Pole.Core.Test\Pole.Core.Test.csproj", "{23EA8735-DB2E-4599-8902-8FCBCBE4799C}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.EventStorage.PostgreSql", "src\Pole.EventStorage.PostgreSql\Pole.EventStorage.PostgreSql.csproj", "{548EFDBB-252F-48DD-87F4-58ABFBD4963C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -123,6 +125,10 @@ Global {23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Debug|Any CPU.Build.0 = Debug|Any CPU {23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Release|Any CPU.ActiveCfg = Release|Any CPU {23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Release|Any CPU.Build.0 = Release|Any CPU + {548EFDBB-252F-48DD-87F4-58ABFBD4963C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {548EFDBB-252F-48DD-87F4-58ABFBD4963C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {548EFDBB-252F-48DD-87F4-58ABFBD4963C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {548EFDBB-252F-48DD-87F4-58ABFBD4963C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -148,6 +154,7 @@ Global {9C0DFC90-1AF9-424A-B5FB-2A7C3611970C} = {74422E64-29FE-4287-A86E-741D1DFF6698} {BDF62A19-FFBD-4EE1-A07A-68472E680A95} = {9932C965-8B38-4F70-9E43-86DC56860E2B} {23EA8735-DB2E-4599-8902-8FCBCBE4799C} = {655E719B-4A3E-467C-A541-E0770AB81DE1} + {548EFDBB-252F-48DD-87F4-58ABFBD4963C} = {9932C965-8B38-4F70-9E43-86DC56860E2B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E} diff --git a/src/Pole.Core/EventBus/Bus.cs b/src/Pole.Core/EventBus/Bus.cs index be2f585..7586706 100644 --- a/src/Pole.Core/EventBus/Bus.cs +++ b/src/Pole.Core/EventBus/Bus.cs @@ -1,5 +1,6 @@ using Pole.Core.Abstraction; using Pole.Core.EventBus.Event; +using Pole.Core.EventBus.Transaction; using Pole.Core.Serialization; using Pole.Core.Utils.Abstraction; using System; @@ -16,6 +17,7 @@ namespace Pole.Core.EventBus private readonly IEventTypeFinder eventTypeFinder; private readonly ISerializer serializer; private readonly ISnowflakeIdGenerator snowflakeIdGenerator; + AsyncLocal Transaction { get; } public Bus(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator) { this.producer = producer; diff --git a/src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs b/src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs new file mode 100644 index 0000000..dd7fe98 --- /dev/null +++ b/src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Core.EventBus.Transaction +{ + public interface IDbTransactionAdapter : IDisposable + { + Task CommitAsync(CancellationToken cancellationToken = default); + Task RollbackAsync(CancellationToken cancellationToken = default); + bool AutoCommit { get; set; } + object DbTransaction { get; set; } + } +} diff --git a/src/Pole.EventStorage.PostgreSql/Class1.cs b/src/Pole.EventStorage.PostgreSql/Class1.cs new file mode 100644 index 0000000..4822ca7 --- /dev/null +++ b/src/Pole.EventStorage.PostgreSql/Class1.cs @@ -0,0 +1,8 @@ +using System; + +namespace Pole.EventStorage.PostgreSql +{ + public class Class1 + { + } +} diff --git a/src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj b/src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj new file mode 100644 index 0000000..96da4d5 --- /dev/null +++ b/src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj @@ -0,0 +1,15 @@ + + + + netstandard2.1 + + + + + + + + + + + diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs new file mode 100644 index 0000000..ee443aa --- /dev/null +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs @@ -0,0 +1,40 @@ +using Microsoft.EntityFrameworkCore.Storage; +using Pole.Core.EventBus.Transaction; +using System; +using System.Collections.Generic; +using System.Data; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.EventStorage.PostgreSql +{ + class PostgreSqlDbTransactionAdapter : IDbTransactionAdapter + { + public bool AutoCommit { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + public object DbTransaction { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + + public async Task CommitAsync(CancellationToken cancellationToken = default) + { + switch (DbTransaction) + { + case IDbTransaction dbTransaction: + dbTransaction.Commit(); + break; + case IDbContextTransaction dbContextTransaction: + await dbContextTransaction.CommitAsync(cancellationToken); + break; + } + } + + public void Dispose() + { + throw new NotImplementedException(); + } + + public Task RollbackAsync(CancellationToken cancellationToken = default) + { + throw new NotImplementedException(); + } + } +} -- libgit2 0.25.0