Commit ac247fc1 by 丁松杰

添加可靠消息部分代码

parent 988776d9
......@@ -49,6 +49,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.EventBus.Rabbitmq", "s
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Core.Test", "test\Pole.Core.Test\Pole.Core.Test.csproj", "{23EA8735-DB2E-4599-8902-8FCBCBE4799C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.EventStorage.PostgreSql", "src\Pole.EventStorage.PostgreSql\Pole.EventStorage.PostgreSql.csproj", "{548EFDBB-252F-48DD-87F4-58ABFBD4963C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
......@@ -123,6 +125,10 @@ Global
{23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{23EA8735-DB2E-4599-8902-8FCBCBE4799C}.Release|Any CPU.Build.0 = Release|Any CPU
{548EFDBB-252F-48DD-87F4-58ABFBD4963C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{548EFDBB-252F-48DD-87F4-58ABFBD4963C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{548EFDBB-252F-48DD-87F4-58ABFBD4963C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{548EFDBB-252F-48DD-87F4-58ABFBD4963C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -148,6 +154,7 @@ Global
{9C0DFC90-1AF9-424A-B5FB-2A7C3611970C} = {74422E64-29FE-4287-A86E-741D1DFF6698}
{BDF62A19-FFBD-4EE1-A07A-68472E680A95} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{23EA8735-DB2E-4599-8902-8FCBCBE4799C} = {655E719B-4A3E-467C-A541-E0770AB81DE1}
{548EFDBB-252F-48DD-87F4-58ABFBD4963C} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E}
......
using Pole.Core.Abstraction;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.Transaction;
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
using System;
......@@ -16,6 +17,7 @@ namespace Pole.Core.EventBus
private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer;
private readonly ISnowflakeIdGenerator snowflakeIdGenerator;
AsyncLocal<IDbTransactionAdapter> Transaction { get; }
public Bus(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator)
{
this.producer = producer;
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.EventBus.Transaction
{
public interface IDbTransactionAdapter : IDisposable
{
Task CommitAsync(CancellationToken cancellationToken = default);
Task RollbackAsync(CancellationToken cancellationToken = default);
bool AutoCommit { get; set; }
object DbTransaction { get; set; }
}
}
using System;
namespace Pole.EventStorage.PostgreSql
{
public class Class1
{
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="3.1.1.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
</ItemGroup>
</Project>
using Microsoft.EntityFrameworkCore.Storage;
using Pole.Core.EventBus.Transaction;
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.EventStorage.PostgreSql
{
class PostgreSqlDbTransactionAdapter : IDbTransactionAdapter
{
public bool AutoCommit { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public object DbTransaction { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
public async Task CommitAsync(CancellationToken cancellationToken = default)
{
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Commit();
break;
case IDbContextTransaction dbContextTransaction:
await dbContextTransaction.CommitAsync(cancellationToken);
break;
}
}
public void Dispose()
{
throw new NotImplementedException();
}
public Task RollbackAsync(CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment