Commit fe4ff39e by dingsongjie

完善 demo

parent 2f5c7d8c
......@@ -17,30 +17,29 @@ namespace Product.Api.Application.CommandHandler
public class AddProductTypeCommandHandler : ICommandHandler<Command<AddProductTypeRequest, CommonCommandResponse>, CommonCommandResponse>
{
private readonly IProductTypeRepository _productTypeRepository;
private readonly IUnitOfWorkManager _unitOfWorkManager;
public AddProductTypeCommandHandler(IProductTypeRepository productTypeRepository, IUnitOfWorkManager unitOfWorkManager)
private readonly IUnitOfWork _unitOfWork;
public AddProductTypeCommandHandler(IProductTypeRepository productTypeRepository, IUnitOfWork unitOfWork)
{
_productTypeRepository = productTypeRepository;
_unitOfWorkManager = unitOfWorkManager;
_unitOfWork = unitOfWork;
}
public async Task<CommonCommandResponse> Handle(Command<AddProductTypeRequest, CommonCommandResponse> request, CancellationToken cancellationToken)
{
var productType = new Domain.ProductTypeAggregate.ProductType(request.Data.Id, request.Data.Name);
_productTypeRepository.Add(productType);
_productTypeRepository.Add(productType);
ProductTypeAddedDomainEvent productTypeAddedDomainEvent = new ProductTypeAddedDomainEvent
{
ProductTypeId = productType.Id,
ProductTypeName = productType.Name
};
using(var unitOfWork= await _unitOfWorkManager.BeginUnitOfWork())
{
productType.AddDomainEvent(productTypeAddedDomainEvent);
var result = await _productTypeRepository.SaveEntitiesAsync();
await unitOfWork.CompeleteAsync();
return CommonCommandResponse.SuccessResponse;
}
productType.AddDomainEvent(productTypeAddedDomainEvent);
var result = await _productTypeRepository.SaveEntitiesAsync();
await _unitOfWork.Compelete();
return CommonCommandResponse.SuccessResponse;
}
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Pole.Application.EventBus
{
public class DefaultReliableMessageScopedBuffer : IReliableMessageScopedBuffer
{
public ConcurrentBag<EventEntry> EventEntries = new ConcurrentBag<EventEntry>();
public void Add(EventEntry eventEntry)
{
EventEntries.Add(eventEntry);
}
public IEnumerable<EventEntry> GetAll()
{
return EventEntries.AsEnumerable();
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Application.EventBus
{
public class EventEntry
{
public object Event { get;private set; }
public object CallbackParemeter { get; private set; }
public string PrePublishEventId { get; set; }
public bool IsPublished { get; set; }
public EventEntry(object @event,object callbackParemeter)
{
Event = @event;
CallbackParemeter = callbackParemeter;
}
}
}
......@@ -4,10 +4,10 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Domain.UnitOfWork
namespace Pole.Application.EventBus
{
public interface IUnitOfWorkManager
public interface IEventBus
{
Task<IUnitOfWork> BeginUnitOfWork();
Task Publish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Application.EventBus
{
public interface IReliableMessageScopedBuffer
{
void Add(EventEntry eventEntry);
IEnumerable<EventEntry> GetAll();
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Application.EventBus
{
public class ReliableEventBus : IEventBus
{
private readonly IReliableMessageScopedBuffer _reliableMessageScopedBuffer;
public ReliableEventBus(IReliableMessageScopedBuffer reliableMessageScopedBuffer)
{
_reliableMessageScopedBuffer = reliableMessageScopedBuffer;
}
public Task Publish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default)
{
_reliableMessageScopedBuffer.Add(new EventEntry(@event, callbackParemeter));
return Task.FromResult(1);
}
}
}
using Pole.Domain.UnitOfWork;
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Application.EventBus
{
public class ReliableMessageTransactionWorker : IWorker
{
private readonly IReliableMessageScopedBuffer _reliableMessageScopedBuffer;
private readonly IReliableBus _reliableBus;
public ReliableMessageTransactionWorker(IReliableMessageScopedBuffer reliableMessageScopedBuffer, IReliableBus reliableBus)
{
_reliableMessageScopedBuffer = reliableMessageScopedBuffer;
_reliableBus = reliableBus;
}
public int Order => 200;
public WorkerStatus WorkerStatus { get; set; }
public Task Commit(CancellationToken cancellationToken = default)
{
var events = _reliableMessageScopedBuffer.GetAll();
try
{
events.ToList().ForEach(async @event =>
{
await _reliableBus.Publish(@event.Event, @event.PrePublishEventId, cancellationToken);
@event.IsPublished = true;
});
}
catch (Exception ex)
{
if (events.Count(@event => @event.IsPublished) > 1)
{
//这里发布失败 通过预发送 后的重试机制去处理, 因为一旦有一个消息发出去后 无法挽回
return Task.FromResult(1);
}
else
{
// 这里抛出错误 ,统一工作单元拦截后会 回滚整个工作单元
throw ex;
}
}
WorkerStatus = WorkerStatus.Commited;
return Task.FromResult(1);
}
public void Dispose()
{
}
public async Task PreCommit(CancellationToken cancellationToken = default)
{
var events = _reliableMessageScopedBuffer.GetAll();
foreach (var @event in events)
{
@event.PrePublishEventId = await _reliableBus.PrePublish(@event.Event, @event.PrePublishEventId, cancellationToken);
}
WorkerStatus = WorkerStatus.PreCommited;
}
public Task Rollback(CancellationToken cancellationToken = default)
{
var events = _reliableMessageScopedBuffer.GetAll();
events.Where(m => !string.IsNullOrEmpty(m.PrePublishEventId)).ToList().ForEach(async @event =>
{
await _reliableBus.Cancel(@event.PrePublishEventId, cancellationToken);
@event.IsPublished = true;
});
WorkerStatus = WorkerStatus.Rollbacked;
return Task.FromResult(1);
}
}
}
......@@ -10,6 +10,8 @@
<ItemGroup>
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
<ProjectReference Include="..\Pole.Domain\Pole.Domain.csproj" />
<ProjectReference Include="..\Pole.ReliableMessage\Pole.ReliableMessage.csproj" />
</ItemGroup>
</Project>
......@@ -7,6 +7,8 @@ using Pole.Application.Cqrs;
using Pole.Application.Cqrs.Internal;
using Pole.Application.Command;
using Pole.Application;
using Pole.Domain.UnitOfWork;
using Pole.Application.EventBus;
namespace Microsoft.Extensions.DependencyInjection
{
......@@ -15,10 +17,14 @@ namespace Microsoft.Extensions.DependencyInjection
public static IServiceCollection AddPole(this IServiceCollection services, Action<PoleOptions> config)
{
PoleOptions poleOptions = new PoleOptions(services);
config(poleOptions);
services.AddScoped<ICommandBus, DefaultCommandBus>();
services.AddScoped<IUnitOfWork, DefaultUnitOfWork>();
services.AddScoped<IWorker, ReliableMessageTransactionWorker>();
services.AddScoped<IEventBus, ReliableEventBus>();
services.AddScoped<IReliableMessageScopedBuffer, DefaultReliableMessageScopedBuffer>();
return services;
}
......
......@@ -14,7 +14,7 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static PoleOptions AddPoleEntityFrameworkCoreDomain(this PoleOptions options)
{
options.Services.AddScoped<IUnitOfWorkManager, EntityFrameworkCoreUnitOfWorkManager>();
options.Services.AddScoped<IWorker, EntityFrameworkCoreTransactionWorker>();
return options;
}
}
......
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Pole.Domain.UnitOfWork;
using System;
using System.Collections.Generic;
......@@ -8,26 +10,41 @@ using System.Threading.Tasks;
namespace Pole.Domain.EntityframeworkCore.UnitOfWork
{
public class EntityFrameworkCoreUnitOfWork : IUnitOfWork
public class EntityFrameworkCoreTransactionWorker : IWorker
{
private readonly IDbContextTransaction _dbContextTransaction;
public EntityFrameworkCoreUnitOfWork(IDbContextTransaction dbContextTransaction)
public EntityFrameworkCoreTransactionWorker(DbContextOptions dbContextOptions, IServiceProvider serviceProvider)
{
_dbContextTransaction = dbContextTransaction;
var dbContext = serviceProvider.GetRequiredService(dbContextOptions.ContextType) as DbContext;
_dbContextTransaction = dbContext.Database.BeginTransaction();
}
public Task CompeleteAsync(CancellationToken cancellationToken = default)
public int Order => 100;
public WorkerStatus WorkerStatus { get; set; }
public async Task Commit(CancellationToken cancellationToken = default)
{
return _dbContextTransaction.CommitAsync(cancellationToken);
await _dbContextTransaction.CommitAsync(cancellationToken);
WorkerStatus = WorkerStatus.Commited;
}
public void Dispose()
{
_dbContextTransaction?.Dispose();
// 无需手动dispose
//_dbContextTransaction?.Dispose();
}
public Task PreCommit(CancellationToken cancellationToken = default)
{
WorkerStatus = WorkerStatus.PostCommited;
return Task.FromResult(1);
}
public Task RollbackAsync(CancellationToken cancellationToken = default)
public async Task Rollback(CancellationToken cancellationToken = default)
{
return _dbContextTransaction.RollbackAsync();
await _dbContextTransaction.RollbackAsync();
WorkerStatus = WorkerStatus.Rollbacked;
}
}
}
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Pole.Domain.UnitOfWork;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Domain.EntityframeworkCore.UnitOfWork
{
public class EntityFrameworkCoreUnitOfWorkManager : IUnitOfWorkManager
{
private readonly DbContext _dbContext;
public EntityFrameworkCoreUnitOfWorkManager(DbContextOptions dbContextOptions, IServiceProvider serviceProvider)
{
_dbContext = serviceProvider.GetRequiredService(dbContextOptions.ContextType) as DbContext;
}
public async Task<IUnitOfWork> BeginUnitOfWork()
{
var transaction = await _dbContext.Database.BeginTransactionAsync();
EntityFrameworkCoreUnitOfWork entityFrameworkCoreUnitOfWork = new EntityFrameworkCoreUnitOfWork(transaction);
return entityFrameworkCoreUnitOfWork;
}
}
}
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Domain.UnitOfWork
{
public class DefaultUnitOfWork : IUnitOfWork
{
private readonly List<IWorker> _workers;
public DefaultUnitOfWork(IServiceProvider serviceProvider)
{
_workers = serviceProvider.GetServices<IWorker>().ToList();
}
public Task Compelete(CancellationToken cancellationToken = default)
{
_workers.OrderBy(worker => worker.Order).ToList().ForEach(async worker =>
{
await worker.PreCommit();
});
try
{
_workers.OrderBy(worker => worker.Order).ToList().ForEach(async worker =>
{
await worker.Commit();
});
}
catch (Exception ex)
{
_workers.OrderBy(worker => worker.Order).Where(worker => worker.WorkerStatus == WorkerStatus.Commited).ToList().ForEach(async worker =>
{
await worker.Rollback();
});
throw ex;
}
return Task.FromResult(1);
}
public void Dispose()
{
// Workers 都是 scoped 的 每次请求结束后 会自动 dispose 所以这里不需要 调用 Workers 的 dispose
//_workers.OrderBy(worker => worker.Order).ToList().ForEach(m => m.Dispose());
}
public Task Rollback(CancellationToken cancellationToken = default)
{
_workers.OrderBy(worker => worker.Order).ToList().ForEach(m => m.Rollback());
return Task.FromResult(1);
}
}
}
......@@ -8,8 +8,8 @@ namespace Pole.Domain.UnitOfWork
{
public interface IUnitOfWork : IDisposable
{
Task CompeleteAsync(CancellationToken cancellationToken = default);
Task RollbackAsync(CancellationToken cancellationToken = default);
Task Compelete(CancellationToken cancellationToken = default);
Task Rollback(CancellationToken cancellationToken = default);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Domain.UnitOfWork
{
public interface IWorker : IDisposable
{
int Order { get; }
WorkerStatus WorkerStatus { get; }
Task PreCommit(CancellationToken cancellationToken = default);
Task Commit(CancellationToken cancellationToken = default);
Task Rollback(CancellationToken cancellationToken = default);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Domain.UnitOfWork
{
public enum WorkerStatus
{
Init = 0,
PreCommited = 1,
Commited = 2,
PostCommited = 3,
Rollbacked = 4
}
}
......@@ -10,6 +10,6 @@ namespace Pole.ReliableMessage.Abstraction
{
Task<string> PrePublish<TReliableEvent>(TReliableEvent @event,object callbackParemeter, CancellationToken cancellationToken = default);
Task<bool> Publish<TReliableEvent>(TReliableEvent @event,string prePublishMessageId, CancellationToken cancellationToken=default);
Task<bool> Cancel<TReliableEvent>(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default);
Task<bool> Cancel(string prePublishMessageId, CancellationToken cancellationToken = default);
}
}
......@@ -33,7 +33,7 @@ namespace Pole.Pole.ReliableMessage.EventBus
_messageTypeIdGenerator = messageTypeIdGenerator;
}
public Task<bool> Cancel<TReliableEvent>(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default)
public Task<bool> Cancel(string prePublishMessageId, CancellationToken cancellationToken = default)
{
try
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment