Commit 672f72c7 by dingsongjie

优化 重试机制

parent 70eb14eb
using Pole.Application.EventBus;
using Backet.Api.Domain.AggregatesModel.BacketAggregate;
using Pole.Application.EventBus;
using Pole.Domain.UnitOfWork;
using Pole.ReliableMessage.Abstraction;
using Product.IntegrationEvents;
using System;
......@@ -10,21 +12,24 @@ namespace Backet.Api.Application.IntegrationEvent.Handler
{
public class JustTestWhenProductAddedIntegrationEventHandler : IntegrationEventHandler<ProductAddedIntegrationEvent>
{
public JustTestWhenProductAddedIntegrationEventHandler(IServiceProvider serviceProvider) : base(serviceProvider)
private readonly IBacketRepository _backetRepository;
private readonly IUnitOfWork _unitOfWork;
public JustTestWhenProductAddedIntegrationEventHandler(IServiceProvider serviceProvider, IBacketRepository backetRepository, IUnitOfWork unitOfWork) : base(serviceProvider)
{
_backetRepository = backetRepository;
_unitOfWork = unitOfWork;
}
public override Task Handle(IReliableEventHandlerContext<ProductAddedIntegrationEvent> context)
public override async Task Handle(IReliableEventHandlerContext<ProductAddedIntegrationEvent> context)
{
try
{
}
catch(Exception ex)
{
var @event = context.Event;
Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet backet = new Domain.AggregatesModel.BacketAggregate.Backet(@event.BacketId, "1");
backet.AddBacketItem(@event.ProductId, @event.ProductName, @event.Price);
_backetRepository.Add(backet);
await _backetRepository.SaveEntitiesAsync();
}
return Task.FromResult(1);
await _unitOfWork.CompeleteAsync();
}
}
}
......@@ -6,10 +6,34 @@ using System.Threading.Tasks;
namespace Backet.Api.Domain.AggregatesModel.BacketAggregate
{
public class Backet: Entity,IAggregateRoot
public class Backet : Entity, IAggregateRoot
{
public string UserId { get; set; }
public IEnumerable<BacketItem> BacketItems { get; private set; }
public long TotalPrice { get; set; }
public Backet(string id,string userId)
{
Id = id;
UserId = userId;
}
public void AddBacketItem(string productId, string productName, long Price)
{
BacketItem backetItem = new BacketItem()
{
Id = Guid.NewGuid().ToString("N"),
Price = Price,
ProductId = productId,
ProductName = productName
};
BacketItems.Add(backetItem);
SetBacketTotalPrice();
}
private void SetBacketTotalPrice()
{
foreach (var item in BacketItems)
{
TotalPrice += item.Price;
}
}
public string UserId { get; private set; }
public List<BacketItem> BacketItems { get; private set; } = new List<BacketItem>();
public long TotalPrice { get; private set; }
}
}
......@@ -9,8 +9,8 @@ namespace Backet.Api.Domain.AggregatesModel.BacketAggregate
public class BacketItem : Entity
{
public string ProductId { get; set; }
public string ProductName { get; set; }
public long Price { get; set; }
public string BacketId { get; set; }
public string ProductName { get; set; }
public long Price { get; set; }
public string BacketId { get; set; }
}
}
using Pole.Domain;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Domain.AggregatesModel.BacketAggregate
{
public interface IBacketRepository : IRepository<Backet>
{
}
}
using Backet.Api.Domain.AggregatesModel.BacketAggregate;
using Pole.Domain.EntityframeworkCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Backet.Api.Infrastructure.Repository
{
public class BacketRepository : EFCoreRepository<Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>, IBacketRepository
{
public BacketRepository(IServiceProvider serviceProvider) : base(serviceProvider)
{
}
public override async Task<Domain.AggregatesModel.BacketAggregate.Backet> Get(string id)
{
var backet = await base.Get(id);
if (backet != null)
{
await _dbContext.Entry(backet).Collection(m => m.BacketItems).LoadAsync();
}
return backet;
}
}
}
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;
using Backet.Api.Infrastructure;
using GreenPipes;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
......@@ -10,6 +12,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Npgsql;
using Pole.ReliableMessage.Storage.Mongodb;
namespace Backet.Api
......@@ -57,6 +60,21 @@ namespace Backet.Api
rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"];
rabbitoption.QueueNamePrefix = Configuration["ServiceName"];
rabbitoption.EventHandlerNameSuffix = "IntegrationEventHandler";
rabbitoption.RetryConfigure =
r =>
{
r.Intervals(TimeSpan.FromSeconds(0.1)
, TimeSpan.FromSeconds(1)
, TimeSpan.FromSeconds(4)
, TimeSpan.FromSeconds(16)
, TimeSpan.FromSeconds(64)
);
r.Ignore<DbUpdateException>(exception =>
{
var sqlException = exception.InnerException as PostgresException;
return sqlException != null && sqlException.SqlState == "23505";
});
};
});
messageOption.AddMongodb(mongodbOption =>
{
......
......@@ -37,7 +37,7 @@ namespace Product.Api.Application.CommandHandler
productType.AddDomainEvent(productTypeAddedDomainEvent);
var result = await _productTypeRepository.SaveEntitiesAsync();
await _unitOfWork.Compelete();
await _unitOfWork.CompeleteAsync();
return CommonCommandResponse.SuccessResponse;
}
}
......
......@@ -25,11 +25,15 @@ namespace Product.Api.Application.DomainEventHandler
{
Product.Api.Domain.ProductAggregate.Product product = new Product.Api.Domain.ProductAggregate.Product(Guid.NewGuid().ToString("N"), request.ProductTypeName, 100, request.ProductTypeId);
_productRepository.Add(product);
var backId = Guid.NewGuid().ToString("N");
ProductAddedIntegrationEvent productAddedIntegrationEvent = new ProductAddedIntegrationEvent()
{
BacketId = backId,
Price = product.Price,
ProductName = product.Name
ProductName = product.Name,
ProductId = product.Id
};
await _eventBus.Publish(productAddedIntegrationEvent, product.Id);
await _productRepository.SaveEntitiesAsync();
}
......
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;
using GreenPipes;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
......@@ -9,6 +11,7 @@ using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Npgsql;
using Pole.ReliableMessage.Storage.Mongodb;
using Product.Api.Grpc;
using Product.Api.Infrastructure;
......@@ -58,6 +61,21 @@ namespace Product.Api
rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"];
rabbitoption.QueueNamePrefix = Configuration["ServiceName"];
rabbitoption.EventHandlerNameSuffix = "IntegrationEventHandler";
rabbitoption.RetryConfigure =
r =>
{
r.Intervals(TimeSpan.FromSeconds(0.1)
, TimeSpan.FromSeconds(1)
, TimeSpan.FromSeconds(4)
, TimeSpan.FromSeconds(16)
, TimeSpan.FromSeconds(64)
);
r.Ignore<DbUpdateException>(exception =>
{
var sqlException = exception.InnerException as PostgresException;
return sqlException != null && sqlException.SqlState == "23505";
});
};
});
messageOption.AddMongodb(mongodbOption =>
{
......
......@@ -7,6 +7,8 @@ namespace Product.IntegrationEvents
{
public class ProductAddedIntegrationEvent
{
public string BacketId { get; set; }
public string ProductId { get; set; }
public string ProductName { get; set; }
public long Price { get; set; }
}
......
......@@ -26,7 +26,7 @@ namespace Pole.Domain.EntityframeworkCore
_dbContext.Set<TEntity>().Add(entity);
}
public void Delete(TEntity entity)
public virtual void Delete(TEntity entity)
{
_dbContext.Set<TEntity>().Remove(entity);
}
......
......@@ -14,7 +14,7 @@ namespace Pole.Domain
{
return _id;
}
protected set
set
{
_id = value;
}
......@@ -22,7 +22,7 @@ namespace Pole.Domain
public List<IDomainEvent> DomainEvents { get; private set; }
public bool IsTransient()
{
return string.IsNullOrEmpty( this._id);
return string.IsNullOrEmpty(this._id);
}
public override bool Equals(object obj)
{
......
......@@ -15,7 +15,7 @@ namespace Pole.Domain.UnitOfWork
{
_workers = serviceProvider.GetServices<IWorker>().ToList();
}
public async Task Compelete(CancellationToken cancellationToken = default)
public async Task CompeleteAsync(CancellationToken cancellationToken = default)
{
var preCommitTasks = _workers.OrderBy(worker => worker.Order).Select(async worker =>
{
......
......@@ -8,7 +8,7 @@ namespace Pole.Domain.UnitOfWork
{
public interface IUnitOfWork : IDisposable
{
Task Compelete(CancellationToken cancellationToken = default);
Task CompeleteAsync(CancellationToken cancellationToken = default);
Task Rollback(CancellationToken cancellationToken = default);
}
......
......@@ -22,10 +22,10 @@ namespace Pole.ReliableMessage.Masstransit
public Action<IRetryConfigurator> RetryConfigure { get; set; } =
r => r.Intervals(TimeSpan.FromSeconds(0.1)
, TimeSpan.FromSeconds(1)
, TimeSpan.FromSeconds(4)
, TimeSpan.FromSeconds(16)
, TimeSpan.FromSeconds(64)
);
, TimeSpan.FromSeconds(1)
, TimeSpan.FromSeconds(4)
, TimeSpan.FromSeconds(16)
, TimeSpan.FromSeconds(64)
);
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment