diff --git a/samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs b/samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs index 0a76490..38d6c65 100644 --- a/samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs +++ b/samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs @@ -17,30 +17,29 @@ namespace Product.Api.Application.CommandHandler public class AddProductTypeCommandHandler : ICommandHandler, CommonCommandResponse> { private readonly IProductTypeRepository _productTypeRepository; - private readonly IUnitOfWorkManager _unitOfWorkManager; - public AddProductTypeCommandHandler(IProductTypeRepository productTypeRepository, IUnitOfWorkManager unitOfWorkManager) + private readonly IUnitOfWork _unitOfWork; + public AddProductTypeCommandHandler(IProductTypeRepository productTypeRepository, IUnitOfWork unitOfWork) { _productTypeRepository = productTypeRepository; - _unitOfWorkManager = unitOfWorkManager; + _unitOfWork = unitOfWork; } public async Task Handle(Command request, CancellationToken cancellationToken) { var productType = new Domain.ProductTypeAggregate.ProductType(request.Data.Id, request.Data.Name); - - _productTypeRepository.Add(productType); + + _productTypeRepository.Add(productType); ProductTypeAddedDomainEvent productTypeAddedDomainEvent = new ProductTypeAddedDomainEvent { ProductTypeId = productType.Id, ProductTypeName = productType.Name }; - using(var unitOfWork= await _unitOfWorkManager.BeginUnitOfWork()) - { - productType.AddDomainEvent(productTypeAddedDomainEvent); - var result = await _productTypeRepository.SaveEntitiesAsync(); - await unitOfWork.CompeleteAsync(); - return CommonCommandResponse.SuccessResponse; - } + productType.AddDomainEvent(productTypeAddedDomainEvent); + var result = await _productTypeRepository.SaveEntitiesAsync(); + + await _unitOfWork.Compelete(); + return CommonCommandResponse.SuccessResponse; + } } } diff --git a/src/Pole.Application/EventBus/DefaultReliableMessageScopedBuffer.cs b/src/Pole.Application/EventBus/DefaultReliableMessageScopedBuffer.cs new file mode 100644 index 0000000..5e4f1fd --- /dev/null +++ b/src/Pole.Application/EventBus/DefaultReliableMessageScopedBuffer.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Pole.Application.EventBus +{ + public class DefaultReliableMessageScopedBuffer : IReliableMessageScopedBuffer + { + public ConcurrentBag EventEntries = new ConcurrentBag(); + public void Add(EventEntry eventEntry) + { + EventEntries.Add(eventEntry); + } + public IEnumerable GetAll() + { + return EventEntries.AsEnumerable(); + } + } +} diff --git a/src/Pole.Application/EventBus/EventEntry.cs b/src/Pole.Application/EventBus/EventEntry.cs new file mode 100644 index 0000000..677c96e --- /dev/null +++ b/src/Pole.Application/EventBus/EventEntry.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Application.EventBus +{ + public class EventEntry + { + public object Event { get;private set; } + public object CallbackParemeter { get; private set; } + public string PrePublishEventId { get; set; } + public bool IsPublished { get; set; } + public EventEntry(object @event,object callbackParemeter) + { + Event = @event; + CallbackParemeter = callbackParemeter; + } + } +} diff --git a/src/Pole.Domain/UnitOfWork/IUnitOfWorkManager.cs b/src/Pole.Application/EventBus/IEventBus.cs similarity index 72% rename from src/Pole.Domain/UnitOfWork/IUnitOfWorkManager.cs rename to src/Pole.Application/EventBus/IEventBus.cs index ba9afdf..3510334 100644 --- a/src/Pole.Domain/UnitOfWork/IUnitOfWorkManager.cs +++ b/src/Pole.Application/EventBus/IEventBus.cs @@ -4,10 +4,10 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -namespace Pole.Domain.UnitOfWork +namespace Pole.Application.EventBus { - public interface IUnitOfWorkManager + public interface IEventBus { - Task BeginUnitOfWork(); + Task Publish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default); } } diff --git a/src/Pole.Application/EventBus/IReliableMessageScopedBuffer.cs b/src/Pole.Application/EventBus/IReliableMessageScopedBuffer.cs new file mode 100644 index 0000000..9c5675f --- /dev/null +++ b/src/Pole.Application/EventBus/IReliableMessageScopedBuffer.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Application.EventBus +{ + public interface IReliableMessageScopedBuffer + { + void Add(EventEntry eventEntry); + IEnumerable GetAll(); + } +} diff --git a/src/Pole.Application/EventBus/ReliableEventBus.cs b/src/Pole.Application/EventBus/ReliableEventBus.cs new file mode 100644 index 0000000..30d1366 --- /dev/null +++ b/src/Pole.Application/EventBus/ReliableEventBus.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Application.EventBus +{ + public class ReliableEventBus : IEventBus + { + private readonly IReliableMessageScopedBuffer _reliableMessageScopedBuffer; + + public ReliableEventBus(IReliableMessageScopedBuffer reliableMessageScopedBuffer) + { + _reliableMessageScopedBuffer = reliableMessageScopedBuffer; + } + + public Task Publish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) + { + _reliableMessageScopedBuffer.Add(new EventEntry(@event, callbackParemeter)); + return Task.FromResult(1); + } + } +} diff --git a/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs b/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs new file mode 100644 index 0000000..d6c7362 --- /dev/null +++ b/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs @@ -0,0 +1,83 @@ +using Pole.Domain.UnitOfWork; +using Pole.ReliableMessage.Abstraction; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Application.EventBus +{ + public class ReliableMessageTransactionWorker : IWorker + { + private readonly IReliableMessageScopedBuffer _reliableMessageScopedBuffer; + private readonly IReliableBus _reliableBus; + + public ReliableMessageTransactionWorker(IReliableMessageScopedBuffer reliableMessageScopedBuffer, IReliableBus reliableBus) + { + _reliableMessageScopedBuffer = reliableMessageScopedBuffer; + _reliableBus = reliableBus; + } + + public int Order => 200; + + public WorkerStatus WorkerStatus { get; set; } + + public Task Commit(CancellationToken cancellationToken = default) + { + var events = _reliableMessageScopedBuffer.GetAll(); + try + { + events.ToList().ForEach(async @event => + { + await _reliableBus.Publish(@event.Event, @event.PrePublishEventId, cancellationToken); + @event.IsPublished = true; + }); + } + catch (Exception ex) + { + + if (events.Count(@event => @event.IsPublished) > 1) + { + //这里发布失败 通过预发送 后的重试机制去处理, 因为一旦有一个消息发出去后 无法挽回 + return Task.FromResult(1); + } + else + { + // 这里抛出错误 ,统一工作单元拦截后会 回滚整个工作单元 + throw ex; + } + } + WorkerStatus = WorkerStatus.Commited; + return Task.FromResult(1); + } + + public void Dispose() + { + + } + + public async Task PreCommit(CancellationToken cancellationToken = default) + { + var events = _reliableMessageScopedBuffer.GetAll(); + foreach (var @event in events) + { + @event.PrePublishEventId = await _reliableBus.PrePublish(@event.Event, @event.PrePublishEventId, cancellationToken); + } + WorkerStatus = WorkerStatus.PreCommited; + } + + public Task Rollback(CancellationToken cancellationToken = default) + { + var events = _reliableMessageScopedBuffer.GetAll(); + events.Where(m => !string.IsNullOrEmpty(m.PrePublishEventId)).ToList().ForEach(async @event => + { + await _reliableBus.Cancel(@event.PrePublishEventId, cancellationToken); + @event.IsPublished = true; + }); + WorkerStatus = WorkerStatus.Rollbacked; + return Task.FromResult(1); + } + } +} diff --git a/src/Pole.Application/Pole.Application.csproj b/src/Pole.Application/Pole.Application.csproj index 4021574..388996d 100644 --- a/src/Pole.Application/Pole.Application.csproj +++ b/src/Pole.Application/Pole.Application.csproj @@ -10,6 +10,8 @@ + + diff --git a/src/Pole.Application/ServiceCollectionExtensions.cs b/src/Pole.Application/ServiceCollectionExtensions.cs index da6b124..232cb75 100644 --- a/src/Pole.Application/ServiceCollectionExtensions.cs +++ b/src/Pole.Application/ServiceCollectionExtensions.cs @@ -7,6 +7,8 @@ using Pole.Application.Cqrs; using Pole.Application.Cqrs.Internal; using Pole.Application.Command; using Pole.Application; +using Pole.Domain.UnitOfWork; +using Pole.Application.EventBus; namespace Microsoft.Extensions.DependencyInjection { @@ -15,10 +17,14 @@ namespace Microsoft.Extensions.DependencyInjection public static IServiceCollection AddPole(this IServiceCollection services, Action config) { PoleOptions poleOptions = new PoleOptions(services); - config(poleOptions); services.AddScoped(); + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); + + services.AddScoped(); return services; } diff --git a/src/Pole.Domain.EntityframeworkCore/ServiceCollectionExtension.cs b/src/Pole.Domain.EntityframeworkCore/ServiceCollectionExtension.cs index f288914..d55aa5d 100644 --- a/src/Pole.Domain.EntityframeworkCore/ServiceCollectionExtension.cs +++ b/src/Pole.Domain.EntityframeworkCore/ServiceCollectionExtension.cs @@ -14,7 +14,7 @@ namespace Microsoft.Extensions.DependencyInjection { public static PoleOptions AddPoleEntityFrameworkCoreDomain(this PoleOptions options) { - options.Services.AddScoped(); + options.Services.AddScoped(); return options; } } diff --git a/src/Pole.Domain.EntityframeworkCore/UnitOfWork/EntityFrameworkCoreUnitOfWork.cs b/src/Pole.Domain.EntityframeworkCore/UnitOfWork/EntityFrameworkCoreTransactionWorker.cs similarity index 53% rename from src/Pole.Domain.EntityframeworkCore/UnitOfWork/EntityFrameworkCoreUnitOfWork.cs rename to src/Pole.Domain.EntityframeworkCore/UnitOfWork/EntityFrameworkCoreTransactionWorker.cs index 236437e..1adba60 100644 --- a/src/Pole.Domain.EntityframeworkCore/UnitOfWork/EntityFrameworkCoreUnitOfWork.cs +++ b/src/Pole.Domain.EntityframeworkCore/UnitOfWork/EntityFrameworkCoreTransactionWorker.cs @@ -1,4 +1,6 @@ -using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.DependencyInjection; using Pole.Domain.UnitOfWork; using System; using System.Collections.Generic; @@ -8,26 +10,41 @@ using System.Threading.Tasks; namespace Pole.Domain.EntityframeworkCore.UnitOfWork { - public class EntityFrameworkCoreUnitOfWork : IUnitOfWork + public class EntityFrameworkCoreTransactionWorker : IWorker { private readonly IDbContextTransaction _dbContextTransaction; - public EntityFrameworkCoreUnitOfWork(IDbContextTransaction dbContextTransaction) + public EntityFrameworkCoreTransactionWorker(DbContextOptions dbContextOptions, IServiceProvider serviceProvider) { - _dbContextTransaction = dbContextTransaction; + var dbContext = serviceProvider.GetRequiredService(dbContextOptions.ContextType) as DbContext; + _dbContextTransaction = dbContext.Database.BeginTransaction(); } - public Task CompeleteAsync(CancellationToken cancellationToken = default) + + public int Order => 100; + + public WorkerStatus WorkerStatus { get; set; } + + public async Task Commit(CancellationToken cancellationToken = default) { - return _dbContextTransaction.CommitAsync(cancellationToken); + await _dbContextTransaction.CommitAsync(cancellationToken); + WorkerStatus = WorkerStatus.Commited; } public void Dispose() { - _dbContextTransaction?.Dispose(); + // 无需手动dispose + //_dbContextTransaction?.Dispose(); + } + + public Task PreCommit(CancellationToken cancellationToken = default) + { + WorkerStatus = WorkerStatus.PostCommited; + return Task.FromResult(1); } - public Task RollbackAsync(CancellationToken cancellationToken = default) + public async Task Rollback(CancellationToken cancellationToken = default) { - return _dbContextTransaction.RollbackAsync(); + await _dbContextTransaction.RollbackAsync(); + WorkerStatus = WorkerStatus.Rollbacked; } } } diff --git a/src/Pole.Domain.EntityframeworkCore/UnitOfWork/EntityFrameworkCoreUnitOfWorkManager.cs b/src/Pole.Domain.EntityframeworkCore/UnitOfWork/EntityFrameworkCoreUnitOfWorkManager.cs deleted file mode 100644 index 8377706..0000000 --- a/src/Pole.Domain.EntityframeworkCore/UnitOfWork/EntityFrameworkCoreUnitOfWorkManager.cs +++ /dev/null @@ -1,26 +0,0 @@ -using Microsoft.EntityFrameworkCore; -using Microsoft.Extensions.DependencyInjection; -using Pole.Domain.UnitOfWork; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.Domain.EntityframeworkCore.UnitOfWork -{ - public class EntityFrameworkCoreUnitOfWorkManager : IUnitOfWorkManager - { - private readonly DbContext _dbContext; - public EntityFrameworkCoreUnitOfWorkManager(DbContextOptions dbContextOptions, IServiceProvider serviceProvider) - { - _dbContext = serviceProvider.GetRequiredService(dbContextOptions.ContextType) as DbContext; - } - public async Task BeginUnitOfWork() - { - var transaction = await _dbContext.Database.BeginTransactionAsync(); - EntityFrameworkCoreUnitOfWork entityFrameworkCoreUnitOfWork = new EntityFrameworkCoreUnitOfWork(transaction); - - return entityFrameworkCoreUnitOfWork; - } - } -} diff --git a/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs b/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs new file mode 100644 index 0000000..61cfb6d --- /dev/null +++ b/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs @@ -0,0 +1,54 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Domain.UnitOfWork +{ + public class DefaultUnitOfWork : IUnitOfWork + { + private readonly List _workers; + public DefaultUnitOfWork(IServiceProvider serviceProvider) + { + _workers = serviceProvider.GetServices().ToList(); + } + public Task Compelete(CancellationToken cancellationToken = default) + { + _workers.OrderBy(worker => worker.Order).ToList().ForEach(async worker => + { + await worker.PreCommit(); + }); + try + { + _workers.OrderBy(worker => worker.Order).ToList().ForEach(async worker => + { + await worker.Commit(); + }); + } + catch (Exception ex) + { + _workers.OrderBy(worker => worker.Order).Where(worker => worker.WorkerStatus == WorkerStatus.Commited).ToList().ForEach(async worker => + { + await worker.Rollback(); + }); + throw ex; + } + return Task.FromResult(1); + } + + public void Dispose() + { + // Workers 都是 scoped 的 每次请求结束后 会自动 dispose 所以这里不需要 调用 Workers 的 dispose + //_workers.OrderBy(worker => worker.Order).ToList().ForEach(m => m.Dispose()); + } + + public Task Rollback(CancellationToken cancellationToken = default) + { + _workers.OrderBy(worker => worker.Order).ToList().ForEach(m => m.Rollback()); + return Task.FromResult(1); + } + } +} diff --git a/src/Pole.Domain/UnitOfWork/IUnitOfWork.cs b/src/Pole.Domain/UnitOfWork/IUnitOfWork.cs index 273b023..3f20e77 100644 --- a/src/Pole.Domain/UnitOfWork/IUnitOfWork.cs +++ b/src/Pole.Domain/UnitOfWork/IUnitOfWork.cs @@ -8,8 +8,8 @@ namespace Pole.Domain.UnitOfWork { public interface IUnitOfWork : IDisposable { - Task CompeleteAsync(CancellationToken cancellationToken = default); - Task RollbackAsync(CancellationToken cancellationToken = default); + Task Compelete(CancellationToken cancellationToken = default); + Task Rollback(CancellationToken cancellationToken = default); } } diff --git a/src/Pole.Domain/UnitOfWork/IWorker.cs b/src/Pole.Domain/UnitOfWork/IWorker.cs new file mode 100644 index 0000000..8de416f --- /dev/null +++ b/src/Pole.Domain/UnitOfWork/IWorker.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Domain.UnitOfWork +{ + public interface IWorker : IDisposable + { + int Order { get; } + WorkerStatus WorkerStatus { get; } + Task PreCommit(CancellationToken cancellationToken = default); + Task Commit(CancellationToken cancellationToken = default); + Task Rollback(CancellationToken cancellationToken = default); + } +} diff --git a/src/Pole.Domain/UnitOfWork/WorkerStatus.cs b/src/Pole.Domain/UnitOfWork/WorkerStatus.cs new file mode 100644 index 0000000..8c34e69 --- /dev/null +++ b/src/Pole.Domain/UnitOfWork/WorkerStatus.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Domain.UnitOfWork +{ + public enum WorkerStatus + { + Init = 0, + PreCommited = 1, + Commited = 2, + PostCommited = 3, + Rollbacked = 4 + } +} diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs b/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs index 9eb37d9..4fdaeea 100644 --- a/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs +++ b/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs @@ -10,6 +10,6 @@ namespace Pole.ReliableMessage.Abstraction { Task PrePublish(TReliableEvent @event,object callbackParemeter, CancellationToken cancellationToken = default); Task Publish(TReliableEvent @event,string prePublishMessageId, CancellationToken cancellationToken=default); - Task Cancel(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default); + Task Cancel(string prePublishMessageId, CancellationToken cancellationToken = default); } } diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs index 2f4454d..eab5425 100644 --- a/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs +++ b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs @@ -33,7 +33,7 @@ namespace Pole.Pole.ReliableMessage.EventBus _messageTypeIdGenerator = messageTypeIdGenerator; } - public Task Cancel(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default) + public Task Cancel(string prePublishMessageId, CancellationToken cancellationToken = default) { try {