From 0ad97bd8ee7aa35a2813e55c4362df0edd236f18 Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Thu, 16 Jan 2020 17:59:18 +0800 Subject: [PATCH] 修改 部分bug --- samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs | 21 +++++++++++++++++++++ samples/apis/Backet.Api/Application/IntegrationEvent/ProductAddedIntegrationEvent.cs | 13 +++++++++++++ samples/apis/Backet.Api/Backet.Api.csproj | 17 +++++++++++++++++ samples/apis/Backet.Api/Infrastructure/BacketDbContext.cs | 20 ++++++++++++++++++++ samples/apis/Backet.Api/Properties/launchSettings.json | 18 +----------------- samples/apis/Backet.Api/Startup.cs | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++ samples/apis/Backet.Api/appsettings.json | 29 +++++++++++++++++++++++++++-- samples/apis/Product.Api/Application/DomainEventHandler/AddDefaultProductWhenProductTypeAdded2DomainEventHandler.cs | 14 ++++++++++++-- samples/apis/Product.Api/Application/IntegrationEvent/CallBack/ProductAddedIntegrationEventCallback.cs | 26 ++++++++++++++++++++++++++ samples/apis/Product.Api/Application/Query/Abstraction/IProductQuery.cs | 13 +++++++++++++ samples/apis/Product.Api/Application/Query/EfCoreProductQuery.cs | 25 +++++++++++++++++++++++++ samples/apis/Product.Api/Infrastructure/ProductDbContext.cs | 2 +- samples/apis/Product.Api/Startup.cs | 11 ++++++----- src/Pole.Application/EventBus/EventEntry.cs | 4 +++- src/Pole.Application/EventBus/EventHandler.cs | 14 ++++++++++++++ src/Pole.Application/EventBus/IEventBus.cs | 2 +- src/Pole.Application/EventBus/ReliableEventBus.cs | 4 ++-- src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs | 4 ++-- src/Pole.Application/Pole.Application.csproj | 1 + src/Pole.ReliableMessage/Abstraction/IReliableBus.cs | 9 +++++---- src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs | 2 +- src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs | 22 ++++++++++++++++++---- 22 files changed, 281 insertions(+), 42 deletions(-) create mode 100644 samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs create mode 100644 samples/apis/Backet.Api/Application/IntegrationEvent/ProductAddedIntegrationEvent.cs create mode 100644 samples/apis/Backet.Api/Infrastructure/BacketDbContext.cs create mode 100644 samples/apis/Product.Api/Application/IntegrationEvent/CallBack/ProductAddedIntegrationEventCallback.cs create mode 100644 samples/apis/Product.Api/Application/Query/Abstraction/IProductQuery.cs create mode 100644 samples/apis/Product.Api/Application/Query/EfCoreProductQuery.cs create mode 100644 src/Pole.Application/EventBus/EventHandler.cs diff --git a/samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs b/samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs new file mode 100644 index 0000000..d15a043 --- /dev/null +++ b/samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs @@ -0,0 +1,21 @@ +using Pole.Application.EventBus; +using Pole.ReliableMessage.Abstraction; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Backet.Api.Application.IntegrationEvent.Handler +{ + public class JustTestWhenProductAddedIntegrationEventHandler : IntegrationEventHandler + { + public JustTestWhenProductAddedIntegrationEventHandler(IServiceProvider serviceProvider) : base(serviceProvider) + { + } + + public override Task Handle(IReliableEventHandlerContext context) + { + return Task.FromResult(1); + } + } +} diff --git a/samples/apis/Backet.Api/Application/IntegrationEvent/ProductAddedIntegrationEvent.cs b/samples/apis/Backet.Api/Application/IntegrationEvent/ProductAddedIntegrationEvent.cs new file mode 100644 index 0000000..fd50a07 --- /dev/null +++ b/samples/apis/Backet.Api/Application/IntegrationEvent/ProductAddedIntegrationEvent.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Backet.Api.Application.IntegrationEvent +{ + public class ProductAddedIntegrationEvent + { + public string ProductName { get; set; } + public long Price { get; set; } + } +} diff --git a/samples/apis/Backet.Api/Backet.Api.csproj b/samples/apis/Backet.Api/Backet.Api.csproj index 92605c5..4c80c39 100644 --- a/samples/apis/Backet.Api/Backet.Api.csproj +++ b/samples/apis/Backet.Api/Backet.Api.csproj @@ -4,4 +4,21 @@ netcoreapp3.1 + + + + + + + + + + + + + + + + + diff --git a/samples/apis/Backet.Api/Infrastructure/BacketDbContext.cs b/samples/apis/Backet.Api/Infrastructure/BacketDbContext.cs new file mode 100644 index 0000000..3bafcfb --- /dev/null +++ b/samples/apis/Backet.Api/Infrastructure/BacketDbContext.cs @@ -0,0 +1,20 @@ +using Microsoft.EntityFrameworkCore; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Backet.Api.Infrastructure +{ + public class BacketDbContext : DbContext + { + public BacketDbContext(DbContextOptions options) : base(options) + { + + } + protected override void OnModelCreating(ModelBuilder builder) + { + base.OnModelCreating(builder); + } + } +} diff --git a/samples/apis/Backet.Api/Properties/launchSettings.json b/samples/apis/Backet.Api/Properties/launchSettings.json index 937f6c5..7584100 100644 --- a/samples/apis/Backet.Api/Properties/launchSettings.json +++ b/samples/apis/Backet.Api/Properties/launchSettings.json @@ -1,24 +1,8 @@ { - "iisSettings": { - "windowsAuthentication": false, - "anonymousAuthentication": true, - "iisExpress": { - "applicationUrl": "http://localhost:21703", - "sslPort": 44369 - } - }, "profiles": { - "IIS Express": { - "commandName": "IISExpress", - "launchBrowser": true, - "environmentVariables": { - "ASPNETCORE_ENVIRONMENT": "Development" - } - }, "Backet.Api": { "commandName": "Project", - "launchBrowser": true, - "applicationUrl": "https://localhost:5001;http://localhost:5000", + "applicationUrl": "http://localhost:5001", "environmentVariables": { "ASPNETCORE_ENVIRONMENT": "Development" } diff --git a/samples/apis/Backet.Api/Startup.cs b/samples/apis/Backet.Api/Startup.cs index 7b166b5..7f69c23 100644 --- a/samples/apis/Backet.Api/Startup.cs +++ b/samples/apis/Backet.Api/Startup.cs @@ -2,25 +2,77 @@ using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; +using Backet.Api.Infrastructure; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Http; +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Pole.ReliableMessage.Storage.Mongodb; namespace Backet.Api { public class Startup { + private IConfiguration Configuration { get; } + private IWebHostEnvironment Environment { get; } + public Startup(IConfiguration configuration, IWebHostEnvironment env) + { + Configuration = configuration; + Environment = env; + } // This method gets called by the runtime. Use this method to add services to the container. // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940 public void ConfigureServices(IServiceCollection services) { + services.AddDbContext(options => + options.UseNpgsql(Configuration["postgres:main"])); + + services.AddGrpc(option => + { + if (Environment.IsDevelopment()) + { + option.EnableDetailedErrors = true; + } + }); + + services.AddGrpcValidation(); + services.AddGrpcRequestValidator(this.GetType().Assembly); + + services.AddPole(option => + { + option.AddManageredAssemblies(this.GetType().Assembly); + option.AutoInjectionDependency(); + option.AutoInjectionCommandHandlersAndDomainEventHandlers(); + option.AddPoleEntityFrameworkCoreDomain(); + + option.AddPoleReliableMessage(messageOption => + { + messageOption.AddMasstransitRabbitmq(rabbitoption => + { + rabbitoption.RabbitMqHostAddress = Configuration["RabbitmqConfig:HostAddress"]; + rabbitoption.RabbitMqHostUserName = Configuration["RabbitmqConfig:HostUserName"]; + rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"]; + rabbitoption.QueueNamePrefix = Configuration["ServiceName"]; + }); + messageOption.AddMongodb(mongodbOption => + { + mongodbOption.ServiceCollectionName = Configuration["ServiceName"]; + mongodbOption.Servers = Configuration.GetSection("MongoConfig:Servers").Get(); + }); + messageOption.AddEventAssemblies(typeof(Startup).Assembly) + .AddEventHandlerAssemblies(typeof(Startup).Assembly); + messageOption.NetworkInterfaceGatewayAddress = Configuration["ReliableMessageOption:NetworkInterfaceGatewayAddress"]; + }); + }); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { + app.UsePoleReliableMessage(); if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); diff --git a/samples/apis/Backet.Api/appsettings.json b/samples/apis/Backet.Api/appsettings.json index d9d9a9b..0ccd5c0 100644 --- a/samples/apis/Backet.Api/appsettings.json +++ b/samples/apis/Backet.Api/appsettings.json @@ -3,8 +3,33 @@ "LogLevel": { "Default": "Information", "Microsoft": "Warning", - "Microsoft.Hosting.Lifetime": "Information" + "Microsoft.Hosting.Lifetime": "Information", + "Grpc": "Information" } }, - "AllowedHosts": "*" + "postgres": { + "main": "Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole_samples_Backet;Enlist=True;Timeout=0;Command Timeout=600;Pooling=false;MinPoolSize=20;MaxPoolSize=500;" + }, + "ServiceName": "Backet", + "RabbitmqConfig": { + "HostAddress": "rabbitmq://192.168.0.248/", + "HostUserName": "comteck", + "HostPassword": "comteck3030" + }, + "MongoConfig": { + "Servers": [ + { + "Host": "192.168.0.248", + "Port": "27017" + } + ] + }, + "ReliableMessageOption": { + "NetworkInterfaceGatewayAddress": "192.168.0.1" + }, + "Kestrel": { + "EndpointDefaults": { + "Protocols": "Http2" + } + } } diff --git a/samples/apis/Product.Api/Application/DomainEventHandler/AddDefaultProductWhenProductTypeAdded2DomainEventHandler.cs b/samples/apis/Product.Api/Application/DomainEventHandler/AddDefaultProductWhenProductTypeAdded2DomainEventHandler.cs index d1cb775..592d1d7 100644 --- a/samples/apis/Product.Api/Application/DomainEventHandler/AddDefaultProductWhenProductTypeAdded2DomainEventHandler.cs +++ b/samples/apis/Product.Api/Application/DomainEventHandler/AddDefaultProductWhenProductTypeAdded2DomainEventHandler.cs @@ -1,4 +1,6 @@ -using Pole.Domain; +using Pole.Application.EventBus; +using Pole.Domain; +using Product.Api.Application.IntergrationEvent; using Product.Api.Domain.Event; using Product.Api.Domain.ProductAggregate; using System; @@ -12,15 +14,23 @@ namespace Product.Api.Application.DomainEventHandler public class AddDefaultProductWhenProductTypeAdded2DomainEventHandler : IDomainEventHandler { private readonly IProductRepository _productRepository; - public AddDefaultProductWhenProductTypeAdded2DomainEventHandler(IProductRepository productRepository) + private readonly IEventBus _eventBus; + public AddDefaultProductWhenProductTypeAdded2DomainEventHandler(IProductRepository productRepository, IEventBus eventBus) { _productRepository = productRepository; + _eventBus = eventBus; } public async Task Handle(ProductTypeAddedDomainEvent request, CancellationToken cancellationToken) { Product.Api.Domain.ProductAggregate.Product product = new Product.Api.Domain.ProductAggregate.Product(Guid.NewGuid().ToString("N"), request.ProductTypeName, 100, request.ProductTypeId); _productRepository.Add(product); + ProductAddedIntegrationEvent productAddedIntegrationEvent = new ProductAddedIntegrationEvent() + { + Price = product.Price, + ProductName = product.Name + }; + await _eventBus.Publish(productAddedIntegrationEvent, product.Id); await _productRepository.SaveEntitiesAsync(); } } diff --git a/samples/apis/Product.Api/Application/IntegrationEvent/CallBack/ProductAddedIntegrationEventCallback.cs b/samples/apis/Product.Api/Application/IntegrationEvent/CallBack/ProductAddedIntegrationEventCallback.cs new file mode 100644 index 0000000..edc62b1 --- /dev/null +++ b/samples/apis/Product.Api/Application/IntegrationEvent/CallBack/ProductAddedIntegrationEventCallback.cs @@ -0,0 +1,26 @@ +using Pole.ReliableMessage.Abstraction; +using Product.Api.Application.IntergrationEvent; +using Product.Api.Application.Query.Abstraction; +using Product.Api.Domain.ProductAggregate; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Product.Api.Application.IntegrationEvent.CallBack +{ + public class ProductAddedIntegrationEventCallback : IReliableEventCallback + { + private readonly IProductQuery _productQuery; + public ProductAddedIntegrationEventCallback(IProductQuery productQuery) + { + _productQuery = productQuery; + } + + + public async Task Callback(string callbackParemeter) + { + return await _productQuery.GetById(callbackParemeter) != null; + } + } +} diff --git a/samples/apis/Product.Api/Application/Query/Abstraction/IProductQuery.cs b/samples/apis/Product.Api/Application/Query/Abstraction/IProductQuery.cs new file mode 100644 index 0000000..700bb05 --- /dev/null +++ b/samples/apis/Product.Api/Application/Query/Abstraction/IProductQuery.cs @@ -0,0 +1,13 @@ +using Pole.Core.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Product.Api.Application.Query.Abstraction +{ + public interface IProductQuery: IScopedDenpendency + { + Task GetById(string id); + } +} diff --git a/samples/apis/Product.Api/Application/Query/EfCoreProductQuery.cs b/samples/apis/Product.Api/Application/Query/EfCoreProductQuery.cs new file mode 100644 index 0000000..c33c379 --- /dev/null +++ b/samples/apis/Product.Api/Application/Query/EfCoreProductQuery.cs @@ -0,0 +1,25 @@ +using Microsoft.EntityFrameworkCore; +using Product.Api.Application.Query.Abstraction; +using Product.Api.Infrastructure; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Product.Api.Application.Query +{ + public class EfCoreProductQuery : IProductQuery + { + private readonly ProductDbContext _productDbContext; + + public EfCoreProductQuery(ProductDbContext productDbContext) + { + _productDbContext = productDbContext; + } + + public Task GetById(string id) + { + return _productDbContext.Products.FirstOrDefaultAsync(m => m.Id == id); + } + } +} diff --git a/samples/apis/Product.Api/Infrastructure/ProductDbContext.cs b/samples/apis/Product.Api/Infrastructure/ProductDbContext.cs index 4d98704..0fd7ae9 100644 --- a/samples/apis/Product.Api/Infrastructure/ProductDbContext.cs +++ b/samples/apis/Product.Api/Infrastructure/ProductDbContext.cs @@ -13,7 +13,7 @@ namespace Product.Api.Infrastructure { public DbSet Products { get; set; } public DbSet ProductTypes { get; set; } - public ProductDbContext(DbContextOptions options) : base(options) + public ProductDbContext(DbContextOptions options) : base(options) { } diff --git a/samples/apis/Product.Api/Startup.cs b/samples/apis/Product.Api/Startup.cs index 2171936..a855878 100644 --- a/samples/apis/Product.Api/Startup.cs +++ b/samples/apis/Product.Api/Startup.cs @@ -49,23 +49,23 @@ namespace Product.Api option.AutoInjectionCommandHandlersAndDomainEventHandlers(); option.AddPoleEntityFrameworkCoreDomain(); - option.AddPoleReliableMessage(option => + option.AddPoleReliableMessage(messageOption => { - option.AddMasstransitRabbitmq(rabbitoption => + messageOption.AddMasstransitRabbitmq(rabbitoption => { rabbitoption.RabbitMqHostAddress = Configuration["RabbitmqConfig:HostAddress"]; rabbitoption.RabbitMqHostUserName = Configuration["RabbitmqConfig:HostUserName"]; rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"]; rabbitoption.QueueNamePrefix = Configuration["ServiceName"]; }); - option.AddMongodb(mongodbOption => + messageOption.AddMongodb(mongodbOption => { mongodbOption.ServiceCollectionName = Configuration["ServiceName"]; mongodbOption.Servers = Configuration.GetSection("MongoConfig:Servers").Get(); }); - option.AddEventAssemblies(typeof(Startup).Assembly) + messageOption.AddEventAssemblies(typeof(Startup).Assembly) .AddEventHandlerAssemblies(typeof(Startup).Assembly); - option.NetworkInterfaceGatewayAddress = Configuration["ReliableMessageOption:NetworkInterfaceGatewayAddress"]; + messageOption.NetworkInterfaceGatewayAddress = Configuration["ReliableMessageOption:NetworkInterfaceGatewayAddress"]; }); }); } @@ -73,6 +73,7 @@ namespace Product.Api // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { + app.UsePoleReliableMessage(); if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); diff --git a/src/Pole.Application/EventBus/EventEntry.cs b/src/Pole.Application/EventBus/EventEntry.cs index 677c96e..e80cb65 100644 --- a/src/Pole.Application/EventBus/EventEntry.cs +++ b/src/Pole.Application/EventBus/EventEntry.cs @@ -10,10 +10,12 @@ namespace Pole.Application.EventBus public object CallbackParemeter { get; private set; } public string PrePublishEventId { get; set; } public bool IsPublished { get; set; } - public EventEntry(object @event,object callbackParemeter) + public Type EventType { get;private set; } + public EventEntry(object @event,object callbackParemeter, Type eventType) { Event = @event; CallbackParemeter = callbackParemeter; + EventType = eventType; } } } diff --git a/src/Pole.Application/EventBus/EventHandler.cs b/src/Pole.Application/EventBus/EventHandler.cs new file mode 100644 index 0000000..5531333 --- /dev/null +++ b/src/Pole.Application/EventBus/EventHandler.cs @@ -0,0 +1,14 @@ +using Pole.ReliableMessage.Masstransit; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Application.EventBus +{ + public abstract class IntegrationEventHandler : ReliableEventHandler where TEvent : class + { + public IntegrationEventHandler(IServiceProvider serviceProvider) : base(serviceProvider) + { + } + } +} diff --git a/src/Pole.Application/EventBus/IEventBus.cs b/src/Pole.Application/EventBus/IEventBus.cs index 3510334..0502af8 100644 --- a/src/Pole.Application/EventBus/IEventBus.cs +++ b/src/Pole.Application/EventBus/IEventBus.cs @@ -8,6 +8,6 @@ namespace Pole.Application.EventBus { public interface IEventBus { - Task Publish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default); + Task Publish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) where TReliableEvent : class; } } diff --git a/src/Pole.Application/EventBus/ReliableEventBus.cs b/src/Pole.Application/EventBus/ReliableEventBus.cs index 30d1366..000f7d3 100644 --- a/src/Pole.Application/EventBus/ReliableEventBus.cs +++ b/src/Pole.Application/EventBus/ReliableEventBus.cs @@ -15,9 +15,9 @@ namespace Pole.Application.EventBus _reliableMessageScopedBuffer = reliableMessageScopedBuffer; } - public Task Publish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) + public Task Publish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) where TReliableEvent : class { - _reliableMessageScopedBuffer.Add(new EventEntry(@event, callbackParemeter)); + _reliableMessageScopedBuffer.Add(new EventEntry(@event, callbackParemeter,typeof(TReliableEvent))); return Task.FromResult(1); } } diff --git a/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs b/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs index d6c7362..8743426 100644 --- a/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs +++ b/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs @@ -40,7 +40,7 @@ namespace Pole.Application.EventBus if (events.Count(@event => @event.IsPublished) > 1) { - //这里发布失败 通过预发送 后的重试机制去处理, 因为一旦有一个消息发出去后 无法挽回 + //这里发布失败 通过预发送后的重试机制去处理, 因为一旦有一个消息发出去后 无法挽回 return Task.FromResult(1); } else @@ -63,7 +63,7 @@ namespace Pole.Application.EventBus var events = _reliableMessageScopedBuffer.GetAll(); foreach (var @event in events) { - @event.PrePublishEventId = await _reliableBus.PrePublish(@event.Event, @event.PrePublishEventId, cancellationToken); + @event.PrePublishEventId = await _reliableBus.PrePublish(@event.Event, @event.EventType, @event.PrePublishEventId, cancellationToken); } WorkerStatus = WorkerStatus.PreCommited; } diff --git a/src/Pole.Application/Pole.Application.csproj b/src/Pole.Application/Pole.Application.csproj index 388996d..f529bbe 100644 --- a/src/Pole.Application/Pole.Application.csproj +++ b/src/Pole.Application/Pole.Application.csproj @@ -11,6 +11,7 @@ + diff --git a/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs b/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs index 4fdaeea..bb86e56 100644 --- a/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs +++ b/src/Pole.ReliableMessage/Abstraction/IReliableBus.cs @@ -8,8 +8,9 @@ namespace Pole.ReliableMessage.Abstraction { public interface IReliableBus { - Task PrePublish(TReliableEvent @event,object callbackParemeter, CancellationToken cancellationToken = default); - Task Publish(TReliableEvent @event,string prePublishMessageId, CancellationToken cancellationToken=default); - Task Cancel(string prePublishMessageId, CancellationToken cancellationToken = default); - } + Task PrePublish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) where TReliableEvent : class; + Task PrePublish(object @event, Type eventType, object callbackParemeter, CancellationToken cancellationToken = default); + Task Publish(object @event, string prePublishMessageId, CancellationToken cancellationToken = default); + Task Cancel(string prePublishMessageId, CancellationToken cancellationToken = default); + } } diff --git a/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs b/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs index 6c6b177..a8f9913 100644 --- a/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs +++ b/src/Pole.ReliableMessage/ComteckReliableMessageIApplicationBuilderExtensions.cs @@ -12,7 +12,7 @@ namespace Microsoft.AspNetCore.Builder { public static class ComteckReliableMessageIApplicationBuilderExtensions { - public static IApplicationBuilder UseComteckReliableMessage(this IApplicationBuilder applicationBuilder) + public static IApplicationBuilder UsePoleReliableMessage(this IApplicationBuilder applicationBuilder) { var option = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IOptions)) as IOptions; var messageCallBackRegister = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IMessageCallBackRegister)) as IMessageCallBackRegister; diff --git a/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs index eab5425..751965d 100644 --- a/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs +++ b/src/Pole.ReliableMessage/EventBus/DefaultReliableBus.cs @@ -47,11 +47,24 @@ namespace Pole.Pole.ReliableMessage.EventBus } } - public async Task PrePublish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) + #region PrePublish + public async Task PrePublish(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) where TReliableEvent : class { var messageTypeId = _messageTypeIdGenerator.Generate(typeof(TReliableEvent)); + var content = _jsonConverter.Serialize(@event); + return await PrePublishCore(callbackParemeter, messageTypeId, content); + } + + public async Task PrePublish(object @event, Type eventType, object callbackParemeter, CancellationToken cancellationToken = default) + { + var messageTypeId = _messageTypeIdGenerator.Generate(eventType); + var content = _jsonConverter.Serialize(@event); + return await PrePublishCore(callbackParemeter, messageTypeId, content); + } + private async Task PrePublishCore(object callbackParemeter, string messageTypeId, string content) + { var currentMessageCallbackInfo = _messageCallBackInfoStore.Get(messageTypeId); if (currentMessageCallbackInfo == null) { @@ -64,7 +77,7 @@ namespace Pole.Pole.ReliableMessage.EventBus _logger.LogDebug($"PrePublish message begin ,messageId:{messageId}"); var now = _timeHelper.GetUTCNow(); - var content = _jsonConverter.Serialize(@event); + var callBackParem = _jsonConverter.Serialize(callbackParemeter); Message newMessage = new Message() { @@ -88,9 +101,10 @@ namespace Pole.Pole.ReliableMessage.EventBus _logger.LogError(ex, errorInfo); throw new Exception(errorInfo, ex); } - } + } + #endregion - public async Task Publish(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default) + public async Task Publish(object @event, string prePublishMessageId, CancellationToken cancellationToken = default) { try { -- libgit2 0.25.0