From 672f72c7b107c919967506ebd420836a02e59032 Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Sun, 19 Jan 2020 11:39:40 +0800 Subject: [PATCH] 优化 重试机制 --- samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs | 25 +++++++++++++++---------- samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/Backet.cs | 32 ++++++++++++++++++++++++++++---- samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/BacketItem.cs | 6 +++--- samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/IBacketRepository.cs | 12 ++++++++++++ samples/apis/Backet.Api/Infrastructure/Repository/BacketRepository.cs | 26 ++++++++++++++++++++++++++ samples/apis/Backet.Api/Startup.cs | 18 ++++++++++++++++++ samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs | 2 +- samples/apis/Product.Api/Application/DomainEventHandler/AddDefaultProductWhenProductTypeAdded2DomainEventHandler.cs | 6 +++++- samples/apis/Product.Api/Startup.cs | 18 ++++++++++++++++++ samples/intergrationEvents/Product.IntegrationEvents/ProductAddedIntegrationEvent.cs | 2 ++ src/Pole.Domain.EntityframeworkCore/EFCoreRepository.cs | 2 +- src/Pole.Domain/Entity/Entity.cs | 4 ++-- src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs | 2 +- src/Pole.Domain/UnitOfWork/IUnitOfWork.cs | 2 +- src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs | 10 +++++----- 15 files changed, 138 insertions(+), 29 deletions(-) create mode 100644 samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/IBacketRepository.cs create mode 100644 samples/apis/Backet.Api/Infrastructure/Repository/BacketRepository.cs diff --git a/samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs b/samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs index 5cd3d65..2469ef5 100644 --- a/samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs +++ b/samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs @@ -1,4 +1,6 @@ -using Pole.Application.EventBus; +using Backet.Api.Domain.AggregatesModel.BacketAggregate; +using Pole.Application.EventBus; +using Pole.Domain.UnitOfWork; using Pole.ReliableMessage.Abstraction; using Product.IntegrationEvents; using System; @@ -10,21 +12,24 @@ namespace Backet.Api.Application.IntegrationEvent.Handler { public class JustTestWhenProductAddedIntegrationEventHandler : IntegrationEventHandler { - public JustTestWhenProductAddedIntegrationEventHandler(IServiceProvider serviceProvider) : base(serviceProvider) + private readonly IBacketRepository _backetRepository; + private readonly IUnitOfWork _unitOfWork; + public JustTestWhenProductAddedIntegrationEventHandler(IServiceProvider serviceProvider, IBacketRepository backetRepository, IUnitOfWork unitOfWork) : base(serviceProvider) { + _backetRepository = backetRepository; + _unitOfWork = unitOfWork; } - public override Task Handle(IReliableEventHandlerContext context) + public override async Task Handle(IReliableEventHandlerContext context) { - try - { - } - catch(Exception ex) - { + var @event = context.Event; + Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet backet = new Domain.AggregatesModel.BacketAggregate.Backet(@event.BacketId, "1"); + backet.AddBacketItem(@event.ProductId, @event.ProductName, @event.Price); + _backetRepository.Add(backet); + await _backetRepository.SaveEntitiesAsync(); - } - return Task.FromResult(1); + await _unitOfWork.CompeleteAsync(); } } } diff --git a/samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/Backet.cs b/samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/Backet.cs index b716eb7..db0182c 100644 --- a/samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/Backet.cs +++ b/samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/Backet.cs @@ -6,10 +6,34 @@ using System.Threading.Tasks; namespace Backet.Api.Domain.AggregatesModel.BacketAggregate { - public class Backet: Entity,IAggregateRoot + public class Backet : Entity, IAggregateRoot { - public string UserId { get; set; } - public IEnumerable BacketItems { get; private set; } - public long TotalPrice { get; set; } + public Backet(string id,string userId) + { + Id = id; + UserId = userId; + } + public void AddBacketItem(string productId, string productName, long Price) + { + BacketItem backetItem = new BacketItem() + { + Id = Guid.NewGuid().ToString("N"), + Price = Price, + ProductId = productId, + ProductName = productName + }; + BacketItems.Add(backetItem); + SetBacketTotalPrice(); + } + private void SetBacketTotalPrice() + { + foreach (var item in BacketItems) + { + TotalPrice += item.Price; + } + } + public string UserId { get; private set; } + public List BacketItems { get; private set; } = new List(); + public long TotalPrice { get; private set; } } } diff --git a/samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/BacketItem.cs b/samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/BacketItem.cs index f9953f5..bc32338 100644 --- a/samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/BacketItem.cs +++ b/samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/BacketItem.cs @@ -9,8 +9,8 @@ namespace Backet.Api.Domain.AggregatesModel.BacketAggregate public class BacketItem : Entity { public string ProductId { get; set; } - public string ProductName { get; set; } - public long Price { get; set; } - public string BacketId { get; set; } + public string ProductName { get; set; } + public long Price { get; set; } + public string BacketId { get; set; } } } diff --git a/samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/IBacketRepository.cs b/samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/IBacketRepository.cs new file mode 100644 index 0000000..3da10f0 --- /dev/null +++ b/samples/apis/Backet.Api/Domain/AggregatesModel/BacketAggregate/IBacketRepository.cs @@ -0,0 +1,12 @@ +using Pole.Domain; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Backet.Api.Domain.AggregatesModel.BacketAggregate +{ + public interface IBacketRepository : IRepository + { + } +} diff --git a/samples/apis/Backet.Api/Infrastructure/Repository/BacketRepository.cs b/samples/apis/Backet.Api/Infrastructure/Repository/BacketRepository.cs new file mode 100644 index 0000000..1052cbe --- /dev/null +++ b/samples/apis/Backet.Api/Infrastructure/Repository/BacketRepository.cs @@ -0,0 +1,26 @@ +using Backet.Api.Domain.AggregatesModel.BacketAggregate; +using Pole.Domain.EntityframeworkCore; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Backet.Api.Infrastructure.Repository +{ + public class BacketRepository : EFCoreRepository, IBacketRepository + { + public BacketRepository(IServiceProvider serviceProvider) : base(serviceProvider) + { + } + public override async Task Get(string id) + { + var backet = await base.Get(id); + if (backet != null) + { + await _dbContext.Entry(backet).Collection(m => m.BacketItems).LoadAsync(); + } + return backet; + } + } +} diff --git a/samples/apis/Backet.Api/Startup.cs b/samples/apis/Backet.Api/Startup.cs index f08a743..56f2145 100644 --- a/samples/apis/Backet.Api/Startup.cs +++ b/samples/apis/Backet.Api/Startup.cs @@ -1,8 +1,10 @@ using System; using System.Collections.Generic; +using System.Data.SqlClient; using System.Linq; using System.Threading.Tasks; using Backet.Api.Infrastructure; +using GreenPipes; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Http; @@ -10,6 +12,7 @@ using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Npgsql; using Pole.ReliableMessage.Storage.Mongodb; namespace Backet.Api @@ -57,6 +60,21 @@ namespace Backet.Api rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"]; rabbitoption.QueueNamePrefix = Configuration["ServiceName"]; rabbitoption.EventHandlerNameSuffix = "IntegrationEventHandler"; + rabbitoption.RetryConfigure = + r => + { + r.Intervals(TimeSpan.FromSeconds(0.1) + , TimeSpan.FromSeconds(1) + , TimeSpan.FromSeconds(4) + , TimeSpan.FromSeconds(16) + , TimeSpan.FromSeconds(64) + ); + r.Ignore(exception => + { + var sqlException = exception.InnerException as PostgresException; + return sqlException != null && sqlException.SqlState == "23505"; + }); + }; }); messageOption.AddMongodb(mongodbOption => { diff --git a/samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs b/samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs index c2546a7..0c96ab5 100644 --- a/samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs +++ b/samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs @@ -37,7 +37,7 @@ namespace Product.Api.Application.CommandHandler productType.AddDomainEvent(productTypeAddedDomainEvent); var result = await _productTypeRepository.SaveEntitiesAsync(); - await _unitOfWork.Compelete(); + await _unitOfWork.CompeleteAsync(); return CommonCommandResponse.SuccessResponse; } } diff --git a/samples/apis/Product.Api/Application/DomainEventHandler/AddDefaultProductWhenProductTypeAdded2DomainEventHandler.cs b/samples/apis/Product.Api/Application/DomainEventHandler/AddDefaultProductWhenProductTypeAdded2DomainEventHandler.cs index dbd6661..310ca57 100644 --- a/samples/apis/Product.Api/Application/DomainEventHandler/AddDefaultProductWhenProductTypeAdded2DomainEventHandler.cs +++ b/samples/apis/Product.Api/Application/DomainEventHandler/AddDefaultProductWhenProductTypeAdded2DomainEventHandler.cs @@ -25,11 +25,15 @@ namespace Product.Api.Application.DomainEventHandler { Product.Api.Domain.ProductAggregate.Product product = new Product.Api.Domain.ProductAggregate.Product(Guid.NewGuid().ToString("N"), request.ProductTypeName, 100, request.ProductTypeId); _productRepository.Add(product); + var backId = Guid.NewGuid().ToString("N"); ProductAddedIntegrationEvent productAddedIntegrationEvent = new ProductAddedIntegrationEvent() { + BacketId = backId, Price = product.Price, - ProductName = product.Name + ProductName = product.Name, + ProductId = product.Id }; + await _eventBus.Publish(productAddedIntegrationEvent, product.Id); await _productRepository.SaveEntitiesAsync(); } diff --git a/samples/apis/Product.Api/Startup.cs b/samples/apis/Product.Api/Startup.cs index 3a43bff..cd8b8be 100644 --- a/samples/apis/Product.Api/Startup.cs +++ b/samples/apis/Product.Api/Startup.cs @@ -1,7 +1,9 @@ using System; using System.Collections.Generic; +using System.Data.SqlClient; using System.Linq; using System.Threading.Tasks; +using GreenPipes; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Http; @@ -9,6 +11,7 @@ using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Npgsql; using Pole.ReliableMessage.Storage.Mongodb; using Product.Api.Grpc; using Product.Api.Infrastructure; @@ -58,6 +61,21 @@ namespace Product.Api rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"]; rabbitoption.QueueNamePrefix = Configuration["ServiceName"]; rabbitoption.EventHandlerNameSuffix = "IntegrationEventHandler"; + rabbitoption.RetryConfigure = + r => + { + r.Intervals(TimeSpan.FromSeconds(0.1) + , TimeSpan.FromSeconds(1) + , TimeSpan.FromSeconds(4) + , TimeSpan.FromSeconds(16) + , TimeSpan.FromSeconds(64) + ); + r.Ignore(exception => + { + var sqlException = exception.InnerException as PostgresException; + return sqlException != null && sqlException.SqlState == "23505"; + }); + }; }); messageOption.AddMongodb(mongodbOption => { diff --git a/samples/intergrationEvents/Product.IntegrationEvents/ProductAddedIntegrationEvent.cs b/samples/intergrationEvents/Product.IntegrationEvents/ProductAddedIntegrationEvent.cs index c39a231..71c07b6 100644 --- a/samples/intergrationEvents/Product.IntegrationEvents/ProductAddedIntegrationEvent.cs +++ b/samples/intergrationEvents/Product.IntegrationEvents/ProductAddedIntegrationEvent.cs @@ -7,6 +7,8 @@ namespace Product.IntegrationEvents { public class ProductAddedIntegrationEvent { + public string BacketId { get; set; } + public string ProductId { get; set; } public string ProductName { get; set; } public long Price { get; set; } } diff --git a/src/Pole.Domain.EntityframeworkCore/EFCoreRepository.cs b/src/Pole.Domain.EntityframeworkCore/EFCoreRepository.cs index 20773f7..6cb4183 100644 --- a/src/Pole.Domain.EntityframeworkCore/EFCoreRepository.cs +++ b/src/Pole.Domain.EntityframeworkCore/EFCoreRepository.cs @@ -26,7 +26,7 @@ namespace Pole.Domain.EntityframeworkCore _dbContext.Set().Add(entity); } - public void Delete(TEntity entity) + public virtual void Delete(TEntity entity) { _dbContext.Set().Remove(entity); } diff --git a/src/Pole.Domain/Entity/Entity.cs b/src/Pole.Domain/Entity/Entity.cs index 82d67ba..0107d5e 100644 --- a/src/Pole.Domain/Entity/Entity.cs +++ b/src/Pole.Domain/Entity/Entity.cs @@ -14,7 +14,7 @@ namespace Pole.Domain { return _id; } - protected set + set { _id = value; } @@ -22,7 +22,7 @@ namespace Pole.Domain public List DomainEvents { get; private set; } public bool IsTransient() { - return string.IsNullOrEmpty( this._id); + return string.IsNullOrEmpty(this._id); } public override bool Equals(object obj) { diff --git a/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs b/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs index e2bd340..6b3d89f 100644 --- a/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs +++ b/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs @@ -15,7 +15,7 @@ namespace Pole.Domain.UnitOfWork { _workers = serviceProvider.GetServices().ToList(); } - public async Task Compelete(CancellationToken cancellationToken = default) + public async Task CompeleteAsync(CancellationToken cancellationToken = default) { var preCommitTasks = _workers.OrderBy(worker => worker.Order).Select(async worker => { diff --git a/src/Pole.Domain/UnitOfWork/IUnitOfWork.cs b/src/Pole.Domain/UnitOfWork/IUnitOfWork.cs index 3f20e77..9e0fb19 100644 --- a/src/Pole.Domain/UnitOfWork/IUnitOfWork.cs +++ b/src/Pole.Domain/UnitOfWork/IUnitOfWork.cs @@ -8,7 +8,7 @@ namespace Pole.Domain.UnitOfWork { public interface IUnitOfWork : IDisposable { - Task Compelete(CancellationToken cancellationToken = default); + Task CompeleteAsync(CancellationToken cancellationToken = default); Task Rollback(CancellationToken cancellationToken = default); } diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs index 50e533e..739c965 100644 --- a/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs +++ b/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs @@ -22,10 +22,10 @@ namespace Pole.ReliableMessage.Masstransit public Action RetryConfigure { get; set; } = r => r.Intervals(TimeSpan.FromSeconds(0.1) - , TimeSpan.FromSeconds(1) - , TimeSpan.FromSeconds(4) - , TimeSpan.FromSeconds(16) - , TimeSpan.FromSeconds(64) - ); + , TimeSpan.FromSeconds(1) + , TimeSpan.FromSeconds(4) + , TimeSpan.FromSeconds(16) + , TimeSpan.FromSeconds(64) + ); } } -- libgit2 0.25.0