Commit 0ad97bd8 by dingsongjie

修改 部分bug

parent 7b305c9c
using Pole.Application.EventBus;
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Application.IntegrationEvent.Handler
{
public class JustTestWhenProductAddedIntegrationEventHandler : IntegrationEventHandler<ProductAddedIntegrationEvent>
{
public JustTestWhenProductAddedIntegrationEventHandler(IServiceProvider serviceProvider) : base(serviceProvider)
{
}
public override Task Handle(IReliableEventHandlerContext<ProductAddedIntegrationEvent> context)
{
return Task.FromResult(1);
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Application.IntegrationEvent
{
public class ProductAddedIntegrationEvent
{
public string ProductName { get; set; }
public long Price { get; set; }
}
}
......@@ -4,4 +4,21 @@
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="3.1.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Pole.Application\Pole.Application.csproj" />
<ProjectReference Include="..\..\..\src\Pole.Domain.EntityframeworkCore\Pole.Domain.EntityframeworkCore.csproj" />
<ProjectReference Include="..\..\..\src\Pole.Grpc\Pole.Grpc.csproj" />
<ProjectReference Include="..\..\..\src\Pole.ReliableMessage.Masstransit\Pole.ReliableMessage.Masstransit.csproj" />
<ProjectReference Include="..\..\..\src\Pole.ReliableMessage.Storage.Mongodb\Pole.ReliableMessage.Storage.Mongodb.csproj" />
</ItemGroup>
<ItemGroup>
<Folder Include="Domain\AggregatesModel\" />
</ItemGroup>
</Project>
using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Infrastructure
{
public class BacketDbContext : DbContext
{
public BacketDbContext(DbContextOptions options) : base(options)
{
}
protected override void OnModelCreating(ModelBuilder builder)
{
base.OnModelCreating(builder);
}
}
}
{
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:21703",
"sslPort": 44369
}
},
"profiles": {
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"Backet.Api": {
"commandName": "Project",
"launchBrowser": true,
"applicationUrl": "https://localhost:5001;http://localhost:5000",
"applicationUrl": "http://localhost:5001",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
......
......@@ -2,25 +2,77 @@ using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Backet.Api.Infrastructure;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Pole.ReliableMessage.Storage.Mongodb;
namespace Backet.Api
{
public class Startup
{
private IConfiguration Configuration { get; }
private IWebHostEnvironment Environment { get; }
public Startup(IConfiguration configuration, IWebHostEnvironment env)
{
Configuration = configuration;
Environment = env;
}
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<BacketDbContext>(options =>
options.UseNpgsql(Configuration["postgres:main"]));
services.AddGrpc(option =>
{
if (Environment.IsDevelopment())
{
option.EnableDetailedErrors = true;
}
});
services.AddGrpcValidation();
services.AddGrpcRequestValidator(this.GetType().Assembly);
services.AddPole(option =>
{
option.AddManageredAssemblies(this.GetType().Assembly);
option.AutoInjectionDependency();
option.AutoInjectionCommandHandlersAndDomainEventHandlers();
option.AddPoleEntityFrameworkCoreDomain();
option.AddPoleReliableMessage(messageOption =>
{
messageOption.AddMasstransitRabbitmq(rabbitoption =>
{
rabbitoption.RabbitMqHostAddress = Configuration["RabbitmqConfig:HostAddress"];
rabbitoption.RabbitMqHostUserName = Configuration["RabbitmqConfig:HostUserName"];
rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"];
rabbitoption.QueueNamePrefix = Configuration["ServiceName"];
});
messageOption.AddMongodb(mongodbOption =>
{
mongodbOption.ServiceCollectionName = Configuration["ServiceName"];
mongodbOption.Servers = Configuration.GetSection("MongoConfig:Servers").Get<MongoHost[]>();
});
messageOption.AddEventAssemblies(typeof(Startup).Assembly)
.AddEventHandlerAssemblies(typeof(Startup).Assembly);
messageOption.NetworkInterfaceGatewayAddress = Configuration["ReliableMessageOption:NetworkInterfaceGatewayAddress"];
});
});
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UsePoleReliableMessage();
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
......
......@@ -3,8 +3,33 @@
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
"Microsoft.Hosting.Lifetime": "Information",
"Grpc": "Information"
}
},
"AllowedHosts": "*"
"postgres": {
"main": "Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole_samples_Backet;Enlist=True;Timeout=0;Command Timeout=600;Pooling=false;MinPoolSize=20;MaxPoolSize=500;"
},
"ServiceName": "Backet",
"RabbitmqConfig": {
"HostAddress": "rabbitmq://192.168.0.248/",
"HostUserName": "comteck",
"HostPassword": "comteck3030"
},
"MongoConfig": {
"Servers": [
{
"Host": "192.168.0.248",
"Port": "27017"
}
]
},
"ReliableMessageOption": {
"NetworkInterfaceGatewayAddress": "192.168.0.1"
},
"Kestrel": {
"EndpointDefaults": {
"Protocols": "Http2"
}
}
}
using Pole.Domain;
using Pole.Application.EventBus;
using Pole.Domain;
using Product.Api.Application.IntergrationEvent;
using Product.Api.Domain.Event;
using Product.Api.Domain.ProductAggregate;
using System;
......@@ -12,15 +14,23 @@ namespace Product.Api.Application.DomainEventHandler
public class AddDefaultProductWhenProductTypeAdded2DomainEventHandler : IDomainEventHandler<ProductTypeAddedDomainEvent>
{
private readonly IProductRepository _productRepository;
public AddDefaultProductWhenProductTypeAdded2DomainEventHandler(IProductRepository productRepository)
private readonly IEventBus _eventBus;
public AddDefaultProductWhenProductTypeAdded2DomainEventHandler(IProductRepository productRepository, IEventBus eventBus)
{
_productRepository = productRepository;
_eventBus = eventBus;
}
public async Task Handle(ProductTypeAddedDomainEvent request, CancellationToken cancellationToken)
{
Product.Api.Domain.ProductAggregate.Product product = new Product.Api.Domain.ProductAggregate.Product(Guid.NewGuid().ToString("N"), request.ProductTypeName, 100, request.ProductTypeId);
_productRepository.Add(product);
ProductAddedIntegrationEvent productAddedIntegrationEvent = new ProductAddedIntegrationEvent()
{
Price = product.Price,
ProductName = product.Name
};
await _eventBus.Publish(productAddedIntegrationEvent, product.Id);
await _productRepository.SaveEntitiesAsync();
}
}
......
using Pole.ReliableMessage.Abstraction;
using Product.Api.Application.IntergrationEvent;
using Product.Api.Application.Query.Abstraction;
using Product.Api.Domain.ProductAggregate;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Product.Api.Application.IntegrationEvent.CallBack
{
public class ProductAddedIntegrationEventCallback : IReliableEventCallback<ProductAddedIntegrationEvent, string>
{
private readonly IProductQuery _productQuery;
public ProductAddedIntegrationEventCallback(IProductQuery productQuery)
{
_productQuery = productQuery;
}
public async Task<bool> Callback(string callbackParemeter)
{
return await _productQuery.GetById(callbackParemeter) != null;
}
}
}
using Pole.Core.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Product.Api.Application.Query.Abstraction
{
public interface IProductQuery: IScopedDenpendency
{
Task<Product.Api.Domain.ProductAggregate.Product> GetById(string id);
}
}
using Microsoft.EntityFrameworkCore;
using Product.Api.Application.Query.Abstraction;
using Product.Api.Infrastructure;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Product.Api.Application.Query
{
public class EfCoreProductQuery : IProductQuery
{
private readonly ProductDbContext _productDbContext;
public EfCoreProductQuery(ProductDbContext productDbContext)
{
_productDbContext = productDbContext;
}
public Task<Domain.ProductAggregate.Product> GetById(string id)
{
return _productDbContext.Products.FirstOrDefaultAsync(m => m.Id == id);
}
}
}
......@@ -13,7 +13,7 @@ namespace Product.Api.Infrastructure
{
public DbSet<Product.Api.Domain.ProductAggregate.Product> Products { get; set; }
public DbSet<Product.Api.Domain.ProductTypeAggregate.ProductType> ProductTypes { get; set; }
public ProductDbContext(DbContextOptions<ProductDbContext> options) : base(options)
public ProductDbContext(DbContextOptions options) : base(options)
{
}
......
......@@ -49,23 +49,23 @@ namespace Product.Api
option.AutoInjectionCommandHandlersAndDomainEventHandlers();
option.AddPoleEntityFrameworkCoreDomain();
option.AddPoleReliableMessage(option =>
option.AddPoleReliableMessage(messageOption =>
{
option.AddMasstransitRabbitmq(rabbitoption =>
messageOption.AddMasstransitRabbitmq(rabbitoption =>
{
rabbitoption.RabbitMqHostAddress = Configuration["RabbitmqConfig:HostAddress"];
rabbitoption.RabbitMqHostUserName = Configuration["RabbitmqConfig:HostUserName"];
rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"];
rabbitoption.QueueNamePrefix = Configuration["ServiceName"];
});
option.AddMongodb(mongodbOption =>
messageOption.AddMongodb(mongodbOption =>
{
mongodbOption.ServiceCollectionName = Configuration["ServiceName"];
mongodbOption.Servers = Configuration.GetSection("MongoConfig:Servers").Get<MongoHost[]>();
});
option.AddEventAssemblies(typeof(Startup).Assembly)
messageOption.AddEventAssemblies(typeof(Startup).Assembly)
.AddEventHandlerAssemblies(typeof(Startup).Assembly);
option.NetworkInterfaceGatewayAddress = Configuration["ReliableMessageOption:NetworkInterfaceGatewayAddress"];
messageOption.NetworkInterfaceGatewayAddress = Configuration["ReliableMessageOption:NetworkInterfaceGatewayAddress"];
});
});
}
......@@ -73,6 +73,7 @@ namespace Product.Api
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
app.UsePoleReliableMessage();
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
......
......@@ -10,10 +10,12 @@ namespace Pole.Application.EventBus
public object CallbackParemeter { get; private set; }
public string PrePublishEventId { get; set; }
public bool IsPublished { get; set; }
public EventEntry(object @event,object callbackParemeter)
public Type EventType { get;private set; }
public EventEntry(object @event,object callbackParemeter, Type eventType)
{
Event = @event;
CallbackParemeter = callbackParemeter;
EventType = eventType;
}
}
}
using Pole.ReliableMessage.Masstransit;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Application.EventBus
{
public abstract class IntegrationEventHandler<TEvent> : ReliableEventHandler<TEvent> where TEvent : class
{
public IntegrationEventHandler(IServiceProvider serviceProvider) : base(serviceProvider)
{
}
}
}
......@@ -8,6 +8,6 @@ namespace Pole.Application.EventBus
{
public interface IEventBus
{
Task Publish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default);
Task Publish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) where TReliableEvent : class;
}
}
......@@ -15,9 +15,9 @@ namespace Pole.Application.EventBus
_reliableMessageScopedBuffer = reliableMessageScopedBuffer;
}
public Task Publish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default)
public Task Publish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) where TReliableEvent : class
{
_reliableMessageScopedBuffer.Add(new EventEntry(@event, callbackParemeter));
_reliableMessageScopedBuffer.Add(new EventEntry(@event, callbackParemeter,typeof(TReliableEvent)));
return Task.FromResult(1);
}
}
......
......@@ -40,7 +40,7 @@ namespace Pole.Application.EventBus
if (events.Count(@event => @event.IsPublished) > 1)
{
//这里发布失败 通过预发送 后的重试机制去处理, 因为一旦有一个消息发出去后 无法挽回
//这里发布失败 通过预发送后的重试机制去处理, 因为一旦有一个消息发出去后 无法挽回
return Task.FromResult(1);
}
else
......@@ -63,7 +63,7 @@ namespace Pole.Application.EventBus
var events = _reliableMessageScopedBuffer.GetAll();
foreach (var @event in events)
{
@event.PrePublishEventId = await _reliableBus.PrePublish(@event.Event, @event.PrePublishEventId, cancellationToken);
@event.PrePublishEventId = await _reliableBus.PrePublish(@event.Event, @event.EventType, @event.PrePublishEventId, cancellationToken);
}
WorkerStatus = WorkerStatus.PreCommited;
}
......
......@@ -11,6 +11,7 @@
<ItemGroup>
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
<ProjectReference Include="..\Pole.Domain\Pole.Domain.csproj" />
<ProjectReference Include="..\Pole.ReliableMessage.Masstransit\Pole.ReliableMessage.Masstransit.csproj" />
<ProjectReference Include="..\Pole.ReliableMessage\Pole.ReliableMessage.csproj" />
</ItemGroup>
......
......@@ -8,8 +8,9 @@ namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableBus
{
Task<string> PrePublish<TReliableEvent>(TReliableEvent @event,object callbackParemeter, CancellationToken cancellationToken = default);
Task<bool> Publish<TReliableEvent>(TReliableEvent @event,string prePublishMessageId, CancellationToken cancellationToken=default);
Task<bool> Cancel(string prePublishMessageId, CancellationToken cancellationToken = default);
}
Task<string> PrePublish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) where TReliableEvent : class;
Task<string> PrePublish(object @event, Type eventType, object callbackParemeter, CancellationToken cancellationToken = default);
Task<bool> Publish(object @event, string prePublishMessageId, CancellationToken cancellationToken = default);
Task<bool> Cancel(string prePublishMessageId, CancellationToken cancellationToken = default);
}
}
......@@ -12,7 +12,7 @@ namespace Microsoft.AspNetCore.Builder
{
public static class ComteckReliableMessageIApplicationBuilderExtensions
{
public static IApplicationBuilder UseComteckReliableMessage(this IApplicationBuilder applicationBuilder)
public static IApplicationBuilder UsePoleReliableMessage(this IApplicationBuilder applicationBuilder)
{
var option = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IOptions<ReliableMessageOption>)) as IOptions<ReliableMessageOption>;
var messageCallBackRegister = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IMessageCallBackRegister)) as IMessageCallBackRegister;
......
......@@ -47,11 +47,24 @@ namespace Pole.Pole.ReliableMessage.EventBus
}
}
public async Task<string> PrePublish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default)
#region PrePublish
public async Task<string> PrePublish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) where TReliableEvent : class
{
var messageTypeId = _messageTypeIdGenerator.Generate(typeof(TReliableEvent));
var content = _jsonConverter.Serialize(@event);
return await PrePublishCore(callbackParemeter, messageTypeId, content);
}
public async Task<string> PrePublish(object @event, Type eventType, object callbackParemeter, CancellationToken cancellationToken = default)
{
var messageTypeId = _messageTypeIdGenerator.Generate(eventType);
var content = _jsonConverter.Serialize(@event);
return await PrePublishCore(callbackParemeter, messageTypeId, content);
}
private async Task<string> PrePublishCore(object callbackParemeter, string messageTypeId, string content)
{
var currentMessageCallbackInfo = _messageCallBackInfoStore.Get(messageTypeId);
if (currentMessageCallbackInfo == null)
{
......@@ -64,7 +77,7 @@ namespace Pole.Pole.ReliableMessage.EventBus
_logger.LogDebug($"PrePublish message begin ,messageId:{messageId}");
var now = _timeHelper.GetUTCNow();
var content = _jsonConverter.Serialize(@event);
var callBackParem = _jsonConverter.Serialize(callbackParemeter);
Message newMessage = new Message()
{
......@@ -88,9 +101,10 @@ namespace Pole.Pole.ReliableMessage.EventBus
_logger.LogError(ex, errorInfo);
throw new Exception(errorInfo, ex);
}
}
}
#endregion
public async Task<bool> Publish<TReliableEvent>(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default)
public async Task<bool> Publish(object @event, string prePublishMessageId, CancellationToken cancellationToken = default)
{
try
{
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment