From aa873b89b1608138394fb745d6a58b54ae580532 Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Fri, 17 Jan 2020 11:59:54 +0800 Subject: [PATCH] 基本流程走通 --- samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs | 1 + samples/apis/Backet.Api/Application/IntegrationEvent/ProductAddedIntegrationEvent.cs | 2 +- samples/apis/Backet.Api/Startup.cs | 1 + samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs | 1 - samples/apis/Product.Api/Startup.cs | 1 + src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs | 19 ++++++++++--------- src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs | 18 ++++++++++-------- src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs | 10 +++++----- src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs | 2 +- src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs | 2 ++ 10 files changed, 32 insertions(+), 25 deletions(-) diff --git a/samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs b/samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs index d15a043..22c6055 100644 --- a/samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs +++ b/samples/apis/Backet.Api/Application/IntegrationEvent/Handler/JustTestWhenProductAddedIntegrationEventHandler.cs @@ -1,5 +1,6 @@ using Pole.Application.EventBus; using Pole.ReliableMessage.Abstraction; +using Product.Api.Application.IntergrationEvent; using System; using System.Collections.Generic; using System.Linq; diff --git a/samples/apis/Backet.Api/Application/IntegrationEvent/ProductAddedIntegrationEvent.cs b/samples/apis/Backet.Api/Application/IntegrationEvent/ProductAddedIntegrationEvent.cs index fd50a07..5a9ec41 100644 --- a/samples/apis/Backet.Api/Application/IntegrationEvent/ProductAddedIntegrationEvent.cs +++ b/samples/apis/Backet.Api/Application/IntegrationEvent/ProductAddedIntegrationEvent.cs @@ -3,7 +3,7 @@ using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; -namespace Backet.Api.Application.IntegrationEvent +namespace Product.Api.Application.IntergrationEvent { public class ProductAddedIntegrationEvent { diff --git a/samples/apis/Backet.Api/Startup.cs b/samples/apis/Backet.Api/Startup.cs index 7f69c23..f08a743 100644 --- a/samples/apis/Backet.Api/Startup.cs +++ b/samples/apis/Backet.Api/Startup.cs @@ -56,6 +56,7 @@ namespace Backet.Api rabbitoption.RabbitMqHostUserName = Configuration["RabbitmqConfig:HostUserName"]; rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"]; rabbitoption.QueueNamePrefix = Configuration["ServiceName"]; + rabbitoption.EventHandlerNameSuffix = "IntegrationEventHandler"; }); messageOption.AddMongodb(mongodbOption => { diff --git a/samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs b/samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs index 38d6c65..c2546a7 100644 --- a/samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs +++ b/samples/apis/Product.Api/Application/CommandHandler/AddProductTypeCommandHandler.cs @@ -39,7 +39,6 @@ namespace Product.Api.Application.CommandHandler await _unitOfWork.Compelete(); return CommonCommandResponse.SuccessResponse; - } } } diff --git a/samples/apis/Product.Api/Startup.cs b/samples/apis/Product.Api/Startup.cs index a855878..3a43bff 100644 --- a/samples/apis/Product.Api/Startup.cs +++ b/samples/apis/Product.Api/Startup.cs @@ -57,6 +57,7 @@ namespace Product.Api rabbitoption.RabbitMqHostUserName = Configuration["RabbitmqConfig:HostUserName"]; rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"]; rabbitoption.QueueNamePrefix = Configuration["ServiceName"]; + rabbitoption.EventHandlerNameSuffix = "IntegrationEventHandler"; }); messageOption.AddMongodb(mongodbOption => { diff --git a/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs b/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs index 8743426..049650d 100644 --- a/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs +++ b/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs @@ -24,16 +24,17 @@ namespace Pole.Application.EventBus public WorkerStatus WorkerStatus { get; set; } - public Task Commit(CancellationToken cancellationToken = default) + public async Task Commit(CancellationToken cancellationToken = default) { var events = _reliableMessageScopedBuffer.GetAll(); try { - events.ToList().ForEach(async @event => - { - await _reliableBus.Publish(@event.Event, @event.PrePublishEventId, cancellationToken); - @event.IsPublished = true; - }); + var tasks = events.Select(async @event => + { + await _reliableBus.Publish(@event.Event, @event.PrePublishEventId, cancellationToken); + @event.IsPublished = true; + }); + await Task.WhenAll(tasks); } catch (Exception ex) { @@ -41,7 +42,7 @@ namespace Pole.Application.EventBus if (events.Count(@event => @event.IsPublished) > 1) { //这里发布失败 通过预发送后的重试机制去处理, 因为一旦有一个消息发出去后 无法挽回 - return Task.FromResult(1); + return; } else { @@ -50,7 +51,7 @@ namespace Pole.Application.EventBus } } WorkerStatus = WorkerStatus.Commited; - return Task.FromResult(1); + return; } public void Dispose() @@ -63,7 +64,7 @@ namespace Pole.Application.EventBus var events = _reliableMessageScopedBuffer.GetAll(); foreach (var @event in events) { - @event.PrePublishEventId = await _reliableBus.PrePublish(@event.Event, @event.EventType, @event.PrePublishEventId, cancellationToken); + @event.PrePublishEventId = await _reliableBus.PrePublish(@event.Event, @event.EventType, @event.CallbackParemeter, cancellationToken); } WorkerStatus = WorkerStatus.PreCommited; } diff --git a/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs b/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs index 61cfb6d..e2bd340 100644 --- a/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs +++ b/src/Pole.Domain/UnitOfWork/DefaultUnitOfWork.cs @@ -15,28 +15,30 @@ namespace Pole.Domain.UnitOfWork { _workers = serviceProvider.GetServices().ToList(); } - public Task Compelete(CancellationToken cancellationToken = default) + public async Task Compelete(CancellationToken cancellationToken = default) { - _workers.OrderBy(worker => worker.Order).ToList().ForEach(async worker => + var preCommitTasks = _workers.OrderBy(worker => worker.Order).Select(async worker => { await worker.PreCommit(); }); + await Task.WhenAll(preCommitTasks); try { - _workers.OrderBy(worker => worker.Order).ToList().ForEach(async worker => + var commitTasks = _workers.OrderBy(worker => worker.Order).Select(async worker => { await worker.Commit(); }); + await Task.WhenAll(commitTasks); } catch (Exception ex) { - _workers.OrderBy(worker => worker.Order).Where(worker => worker.WorkerStatus == WorkerStatus.Commited).ToList().ForEach(async worker => - { - await worker.Rollback(); - }); + var rollbackTasks = _workers.OrderBy(worker => worker.Order).Where(worker => worker.WorkerStatus == WorkerStatus.Commited).Select(async worker => + { + await worker.Rollback(); + }); + await Task.WhenAll(rollbackTasks); throw ex; } - return Task.FromResult(1); } public void Dispose() diff --git a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs index 402561c..439e0b1 100644 --- a/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs +++ b/src/Pole.ReliableMessage.Masstransit/DefaultReliableEventHandlerRegistrarFactory.cs @@ -19,13 +19,13 @@ namespace Pole.ReliableMessage.Masstransit public MasstransitEventHandlerRegistrar Create(Type eventHnadler) { - if (!eventHnadler.Name.EndsWith("EventHandler")) + if (!eventHnadler.Name.EndsWith(_masstransitOptions.EventHandlerNameSuffix)) { - throw new Exception("EventHandler Name Must EndWith EventHandler"); + throw new Exception($"EventHandler Name Must EndWith {_masstransitOptions.EventHandlerNameSuffix}"); } var reliableEventHandlerParemeterAttribute = eventHnadler.GetCustomAttributes(typeof(ReliableEventHandlerParemeterAttribute), true).FirstOrDefault(); - var eventHandlerName = GetQueueName(reliableEventHandlerParemeterAttribute, eventHnadler, _masstransitOptions.QueueNamePrefix); + var eventHandlerName = GetQueueName(reliableEventHandlerParemeterAttribute, eventHnadler, _masstransitOptions.QueueNamePrefix, _masstransitOptions.EventHandlerNameSuffix); var parentEventHandler = eventHnadler.BaseType; var eventType = parentEventHandler.GetGenericArguments().ToList().FirstOrDefault(); @@ -36,9 +36,9 @@ namespace Pole.ReliableMessage.Masstransit return eventHandlerRegisterInvoker; } - private string GetQueueName(object reliableEventHandlerParemeterAttribute, Type eventHnadler, string queueNamePrefix) + private string GetQueueName(object reliableEventHandlerParemeterAttribute, Type eventHnadler, string queueNamePrefix,string eventHandlerNameSuffix) { - var eventHandlerDefaultName = $"eventHandler-{ eventHnadler.Name.Replace("EventHandler", "").ToLowerInvariant()}"; + var eventHandlerDefaultName = $"eventHandler-{ eventHnadler.Name.Replace(eventHandlerNameSuffix, "").ToLowerInvariant()}"; var eventHandlerName = string.IsNullOrEmpty(queueNamePrefix) ? eventHandlerDefaultName : $"{queueNamePrefix}-{eventHandlerDefaultName}"; if (reliableEventHandlerParemeterAttribute != null) diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs index 3f9665e..c1a8e12 100644 --- a/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs +++ b/src/Pole.ReliableMessage.Masstransit/MasstransitBasedMessageBus.cs @@ -15,7 +15,7 @@ namespace Pole.ReliableMessage.Masstransit _bus = bus; } private readonly MassTransit.IBus _bus; - public Task Publish(object @event,string reliableMessageId, CancellationToken cancellationToken = default(CancellationToken)) + public Task Publish(object @event, string reliableMessageId, CancellationToken cancellationToken = default) { var pipe = new AddReliableMessageIdPipe(reliableMessageId); return _bus.Publish(@event, pipe, cancellationToken); diff --git a/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs b/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs index 34e3678..50e533e 100644 --- a/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs +++ b/src/Pole.ReliableMessage.Masstransit/MasstransitRabbitmqOption.cs @@ -13,6 +13,8 @@ namespace Pole.ReliableMessage.Masstransit public string RabbitMqHostUserName { get; set; } public string RabbitMqHostPassword { get; set; } public string QueueNamePrefix { get; set; } = string.Empty; + + public string EventHandlerNameSuffix = "EventHandler"; /// /// 2 个并发 /// -- libgit2 0.25.0