From 592237d8534fcce19ea1e1e4e3b2f87b012b9936 Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Fri, 14 Feb 2020 12:11:38 +0800 Subject: [PATCH] 修复bugs --- src/Pole.Core/EventBus/Bus.cs | 24 ++++++++++-------------- src/Pole.Core/EventBus/IBus.cs | 5 ++++- src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs | 5 +++-- src/Pole.Core/UnitOfWork/IUnitOfWork.cs | 19 +++++++++++++++++++ src/Pole.Core/UnitOfWork/UnitOfWork.cs | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventStorage.PostgreSql/EnlistBusExtensions.cs | 29 ----------------------------- src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs | 10 ++++++++-- 7 files changed, 112 insertions(+), 48 deletions(-) create mode 100644 src/Pole.Core/UnitOfWork/IUnitOfWork.cs create mode 100644 src/Pole.Core/UnitOfWork/UnitOfWork.cs delete mode 100644 src/Pole.EventStorage.PostgreSql/EnlistBusExtensions.cs diff --git a/src/Pole.Core/EventBus/Bus.cs b/src/Pole.Core/EventBus/Bus.cs index 8466a84..fadc789 100644 --- a/src/Pole.Core/EventBus/Bus.cs +++ b/src/Pole.Core/EventBus/Bus.cs @@ -5,6 +5,7 @@ using Pole.Core.EventBus.Transaction; using Pole.Core.Serialization; using Pole.Core.Utils.Abstraction; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Text; using System.Threading; @@ -14,7 +15,6 @@ namespace Pole.Core.EventBus { class Bus : IBus { - private readonly IProducer producer; private readonly IEventTypeFinder eventTypeFinder; private readonly ISerializer serializer; private readonly ISnowflakeIdGenerator snowflakeIdGenerator; @@ -22,11 +22,12 @@ namespace Pole.Core.EventBus public IDbTransactionAdapter Transaction { get; set; } public IServiceProvider ServiceProvider { get; } + public BlockingCollection PrePublishEventBuffer { get; } = + new BlockingCollection(new ConcurrentQueue()); - public Bus(IServiceProvider serviceProvider, IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator, IEventStorage eventStorage) + public Bus(IServiceProvider serviceProvider, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator, IEventStorage eventStorage) { ServiceProvider = serviceProvider; - this.producer = producer; this.eventTypeFinder = eventTypeFinder; this.serializer = serializer; this.snowflakeIdGenerator = snowflakeIdGenerator; @@ -37,10 +38,10 @@ namespace Pole.Core.EventBus var eventType = @event.GetType(); var eventTypeCode = eventTypeFinder.GetCode(eventType); var eventId = snowflakeIdGenerator.NextId(); - var eventContentBytes = serializer.SerializeToUtf8Bytes(@event, eventType); + //var eventContentBytes = serializer.SerializeToUtf8Bytes(@event, eventType); var eventContent = serializer.Serialize(@event, eventType); - var bytesTransport = new EventBytesTransport(eventTypeCode, eventId, eventContentBytes); - var bytes = bytesTransport.GetBytes(); + //var bytesTransport = new EventBytesTransport(eventTypeCode, eventId, eventContentBytes); + //var bytes = bytesTransport.GetBytes(); var eventEntity = new EventEntity { Added = DateTime.UtcNow, @@ -59,15 +60,10 @@ namespace Pole.Core.EventBus { var mediumMessage = await eventStorage.StoreMessage(eventEntity, Transaction.DbTransaction); - if (Transaction.AutoCommit) - { - await Transaction.CommitAsync(); - } } - - await producer.Publish(bytes); - - await eventStorage.ChangePublishStateAsync(eventEntity,EventStatus.Published); + PrePublishEventBuffer.Add(eventEntity); + //await producer.Publish(bytes); + //await eventStorage.ChangePublishStateAsync(eventEntity,EventStatus.Published); return true; } diff --git a/src/Pole.Core/EventBus/IBus.cs b/src/Pole.Core/EventBus/IBus.cs index 2c8cd2b..6906448 100644 --- a/src/Pole.Core/EventBus/IBus.cs +++ b/src/Pole.Core/EventBus/IBus.cs @@ -1,5 +1,7 @@ -using Pole.Core.EventBus.Transaction; +using Pole.Core.EventBus.EventStorage; +using Pole.Core.EventBus.Transaction; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Text; using System.Threading; @@ -11,6 +13,7 @@ namespace Pole.Core.EventBus { IServiceProvider ServiceProvider { get; } IDbTransactionAdapter Transaction { get; set; } + BlockingCollection PrePublishEventBuffer { get; } Task Publish(object @event, CancellationToken cancellationToken = default); } } diff --git a/src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs b/src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs index dd7fe98..86a5ede 100644 --- a/src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs +++ b/src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs @@ -1,4 +1,6 @@ -using System; +using Pole.Core.EventBus.EventStorage; +using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Text; using System.Threading; @@ -10,7 +12,6 @@ namespace Pole.Core.EventBus.Transaction { Task CommitAsync(CancellationToken cancellationToken = default); Task RollbackAsync(CancellationToken cancellationToken = default); - bool AutoCommit { get; set; } object DbTransaction { get; set; } } } diff --git a/src/Pole.Core/UnitOfWork/IUnitOfWork.cs b/src/Pole.Core/UnitOfWork/IUnitOfWork.cs new file mode 100644 index 0000000..24f5b7a --- /dev/null +++ b/src/Pole.Core/UnitOfWork/IUnitOfWork.cs @@ -0,0 +1,19 @@ +using Pole.Core.EventBus; +using Pole.Core.EventBus.Transaction; +using System; +using System.Collections.Generic; +using System.Data; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; + +namespace Pole.Core.UnitOfWork +{ + public interface IUnitOfWork : IDisposable + { + Task CompeleteAsync(CancellationToken cancellationToken = default); + Task Rollback(CancellationToken cancellationToken = default); + IUnitOfWork Enlist(IDbTransaction dbTransaction, IBus bus); + } +} diff --git a/src/Pole.Core/UnitOfWork/UnitOfWork.cs b/src/Pole.Core/UnitOfWork/UnitOfWork.cs new file mode 100644 index 0000000..abd1a6f --- /dev/null +++ b/src/Pole.Core/UnitOfWork/UnitOfWork.cs @@ -0,0 +1,68 @@ +using Pole.Core.EventBus; +using Pole.Core.EventBus.Transaction; +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Pole.Core.Abstraction; +using Pole.Core.Serialization; +using Pole.Core.EventBus.Event; +using Pole.Core.EventBus.EventStorage; + +namespace Pole.Core.UnitOfWork +{ + class UnitOfWork : IUnitOfWork + { + private readonly IProducer producer; + private readonly IEventTypeFinder eventTypeFinder; + private readonly ISerializer serializer; + private readonly IEventStorage eventStorage; + private IBus bus; + public UnitOfWork(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IEventStorage eventStorage) + { + this.producer = producer; + this.eventTypeFinder = eventTypeFinder; + this.serializer = serializer; + this.eventStorage = eventStorage; + } + + public async Task CompeleteAsync(CancellationToken cancellationToken = default) + { + + await bus.Transaction.CommitAsync(); + + var bufferedEvents = bus.PrePublishEventBuffer.ToList(); + bufferedEvents.ForEach(async @event => + { + var eventType = eventTypeFinder.FindType(@event.Name); + var eventContentBytes = serializer.SerializeToUtf8Bytes(@event, eventType); + var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes); + var bytes = bytesTransport.GetBytes(); + await producer.Publish(bytes); + await eventStorage.ChangePublishStateAsync(@event, EventStatus.Published); + }); + } + + public void Dispose() + { + bus = null; + } + + public IUnitOfWork Enlist(IDbTransaction dbTransaction, IBus bus) + { + bus.Transaction = bus.ServiceProvider.GetService(); + bus.Transaction.DbTransaction = dbTransaction; + this.bus = bus; + return this; + } + + public Task Rollback(CancellationToken cancellationToken = default) + { + return bus.Transaction.RollbackAsync(); + } + } +} diff --git a/src/Pole.EventStorage.PostgreSql/EnlistBusExtensions.cs b/src/Pole.EventStorage.PostgreSql/EnlistBusExtensions.cs deleted file mode 100644 index c15c4d4..0000000 --- a/src/Pole.EventStorage.PostgreSql/EnlistBusExtensions.cs +++ /dev/null @@ -1,29 +0,0 @@ -using Pole.Core.EventBus; -using Pole.Core.EventBus.Transaction; -using System; -using System.Collections.Generic; -using System.Data; -using System.Text; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.EntityFrameworkCore.Storage; - -namespace Pole.EventStorage.PostgreSql -{ - public static class EnlistBusExtensions - { - public static IDbTransaction EnlistBus(this IDbTransaction dbTransaction, IBus bus, bool autoCommit = false) - { - bus.Transaction = bus.ServiceProvider.GetService(); - bus.Transaction.DbTransaction = dbTransaction; - bus.Transaction.AutoCommit = autoCommit; - return dbTransaction; - } - public static IDbContextTransaction EnlistBus(this IDbContextTransaction dbContextTransaction, IBus bus, bool autoCommit = false) - { - bus.Transaction = bus.ServiceProvider.GetService(); - bus.Transaction.DbTransaction = dbContextTransaction; - bus.Transaction.AutoCommit = autoCommit; - return dbContextTransaction; - } - } -} diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs index ed114e5..506ad03 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs @@ -1,8 +1,15 @@ using Microsoft.EntityFrameworkCore.Storage; +using Pole.Core.Abstraction; +using Pole.Core.EventBus; +using Pole.Core.EventBus.Event; +using Pole.Core.EventBus.EventStorage; using Pole.Core.EventBus.Transaction; +using Pole.Core.Serialization; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Data; +using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -11,8 +18,7 @@ namespace Pole.EventStorage.PostgreSql { class PostgreSqlDbTransactionAdapter : IDbTransactionAdapter { - public bool AutoCommit { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } - public object DbTransaction { get => throw new NotImplementedException(); set => throw new NotImplementedException(); } + public object DbTransaction { get; set; } public async Task CommitAsync(CancellationToken cancellationToken = default) { -- libgit2 0.25.0