Commit aa873b89 by dingsongjie

基本流程走通

parent 0ad97bd8
using Pole.Application.EventBus; using Pole.Application.EventBus;
using Pole.ReliableMessage.Abstraction; using Pole.ReliableMessage.Abstraction;
using Product.Api.Application.IntergrationEvent;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
......
...@@ -3,7 +3,7 @@ using System.Collections.Generic; ...@@ -3,7 +3,7 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Backet.Api.Application.IntegrationEvent namespace Product.Api.Application.IntergrationEvent
{ {
public class ProductAddedIntegrationEvent public class ProductAddedIntegrationEvent
{ {
......
...@@ -56,6 +56,7 @@ namespace Backet.Api ...@@ -56,6 +56,7 @@ namespace Backet.Api
rabbitoption.RabbitMqHostUserName = Configuration["RabbitmqConfig:HostUserName"]; rabbitoption.RabbitMqHostUserName = Configuration["RabbitmqConfig:HostUserName"];
rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"]; rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"];
rabbitoption.QueueNamePrefix = Configuration["ServiceName"]; rabbitoption.QueueNamePrefix = Configuration["ServiceName"];
rabbitoption.EventHandlerNameSuffix = "IntegrationEventHandler";
}); });
messageOption.AddMongodb(mongodbOption => messageOption.AddMongodb(mongodbOption =>
{ {
......
...@@ -39,7 +39,6 @@ namespace Product.Api.Application.CommandHandler ...@@ -39,7 +39,6 @@ namespace Product.Api.Application.CommandHandler
await _unitOfWork.Compelete(); await _unitOfWork.Compelete();
return CommonCommandResponse.SuccessResponse; return CommonCommandResponse.SuccessResponse;
} }
} }
} }
...@@ -57,6 +57,7 @@ namespace Product.Api ...@@ -57,6 +57,7 @@ namespace Product.Api
rabbitoption.RabbitMqHostUserName = Configuration["RabbitmqConfig:HostUserName"]; rabbitoption.RabbitMqHostUserName = Configuration["RabbitmqConfig:HostUserName"];
rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"]; rabbitoption.RabbitMqHostPassword = Configuration["RabbitmqConfig:HostPassword"];
rabbitoption.QueueNamePrefix = Configuration["ServiceName"]; rabbitoption.QueueNamePrefix = Configuration["ServiceName"];
rabbitoption.EventHandlerNameSuffix = "IntegrationEventHandler";
}); });
messageOption.AddMongodb(mongodbOption => messageOption.AddMongodb(mongodbOption =>
{ {
......
...@@ -24,16 +24,17 @@ namespace Pole.Application.EventBus ...@@ -24,16 +24,17 @@ namespace Pole.Application.EventBus
public WorkerStatus WorkerStatus { get; set; } public WorkerStatus WorkerStatus { get; set; }
public Task Commit(CancellationToken cancellationToken = default) public async Task Commit(CancellationToken cancellationToken = default)
{ {
var events = _reliableMessageScopedBuffer.GetAll(); var events = _reliableMessageScopedBuffer.GetAll();
try try
{ {
events.ToList().ForEach(async @event => var tasks = events.Select(async @event =>
{ {
await _reliableBus.Publish(@event.Event, @event.PrePublishEventId, cancellationToken); await _reliableBus.Publish(@event.Event, @event.PrePublishEventId, cancellationToken);
@event.IsPublished = true; @event.IsPublished = true;
}); });
await Task.WhenAll(tasks);
} }
catch (Exception ex) catch (Exception ex)
{ {
...@@ -41,7 +42,7 @@ namespace Pole.Application.EventBus ...@@ -41,7 +42,7 @@ namespace Pole.Application.EventBus
if (events.Count(@event => @event.IsPublished) > 1) if (events.Count(@event => @event.IsPublished) > 1)
{ {
//这里发布失败 通过预发送后的重试机制去处理, 因为一旦有一个消息发出去后 无法挽回 //这里发布失败 通过预发送后的重试机制去处理, 因为一旦有一个消息发出去后 无法挽回
return Task.FromResult(1); return;
} }
else else
{ {
...@@ -50,7 +51,7 @@ namespace Pole.Application.EventBus ...@@ -50,7 +51,7 @@ namespace Pole.Application.EventBus
} }
} }
WorkerStatus = WorkerStatus.Commited; WorkerStatus = WorkerStatus.Commited;
return Task.FromResult(1); return;
} }
public void Dispose() public void Dispose()
...@@ -63,7 +64,7 @@ namespace Pole.Application.EventBus ...@@ -63,7 +64,7 @@ namespace Pole.Application.EventBus
var events = _reliableMessageScopedBuffer.GetAll(); var events = _reliableMessageScopedBuffer.GetAll();
foreach (var @event in events) foreach (var @event in events)
{ {
@event.PrePublishEventId = await _reliableBus.PrePublish(@event.Event, @event.EventType, @event.PrePublishEventId, cancellationToken); @event.PrePublishEventId = await _reliableBus.PrePublish(@event.Event, @event.EventType, @event.CallbackParemeter, cancellationToken);
} }
WorkerStatus = WorkerStatus.PreCommited; WorkerStatus = WorkerStatus.PreCommited;
} }
......
...@@ -15,28 +15,30 @@ namespace Pole.Domain.UnitOfWork ...@@ -15,28 +15,30 @@ namespace Pole.Domain.UnitOfWork
{ {
_workers = serviceProvider.GetServices<IWorker>().ToList(); _workers = serviceProvider.GetServices<IWorker>().ToList();
} }
public Task Compelete(CancellationToken cancellationToken = default) public async Task Compelete(CancellationToken cancellationToken = default)
{ {
_workers.OrderBy(worker => worker.Order).ToList().ForEach(async worker => var preCommitTasks = _workers.OrderBy(worker => worker.Order).Select(async worker =>
{ {
await worker.PreCommit(); await worker.PreCommit();
}); });
await Task.WhenAll(preCommitTasks);
try try
{ {
_workers.OrderBy(worker => worker.Order).ToList().ForEach(async worker => var commitTasks = _workers.OrderBy(worker => worker.Order).Select(async worker =>
{ {
await worker.Commit(); await worker.Commit();
}); });
await Task.WhenAll(commitTasks);
} }
catch (Exception ex) catch (Exception ex)
{ {
_workers.OrderBy(worker => worker.Order).Where(worker => worker.WorkerStatus == WorkerStatus.Commited).ToList().ForEach(async worker => var rollbackTasks = _workers.OrderBy(worker => worker.Order).Where(worker => worker.WorkerStatus == WorkerStatus.Commited).Select(async worker =>
{ {
await worker.Rollback(); await worker.Rollback();
}); });
await Task.WhenAll(rollbackTasks);
throw ex; throw ex;
} }
return Task.FromResult(1);
} }
public void Dispose() public void Dispose()
......
...@@ -19,13 +19,13 @@ namespace Pole.ReliableMessage.Masstransit ...@@ -19,13 +19,13 @@ namespace Pole.ReliableMessage.Masstransit
public MasstransitEventHandlerRegistrar Create(Type eventHnadler) public MasstransitEventHandlerRegistrar Create(Type eventHnadler)
{ {
if (!eventHnadler.Name.EndsWith("EventHandler")) if (!eventHnadler.Name.EndsWith(_masstransitOptions.EventHandlerNameSuffix))
{ {
throw new Exception("EventHandler Name Must EndWith EventHandler"); throw new Exception($"EventHandler Name Must EndWith {_masstransitOptions.EventHandlerNameSuffix}");
} }
var reliableEventHandlerParemeterAttribute = eventHnadler.GetCustomAttributes(typeof(ReliableEventHandlerParemeterAttribute), true).FirstOrDefault(); var reliableEventHandlerParemeterAttribute = eventHnadler.GetCustomAttributes(typeof(ReliableEventHandlerParemeterAttribute), true).FirstOrDefault();
var eventHandlerName = GetQueueName(reliableEventHandlerParemeterAttribute, eventHnadler, _masstransitOptions.QueueNamePrefix); var eventHandlerName = GetQueueName(reliableEventHandlerParemeterAttribute, eventHnadler, _masstransitOptions.QueueNamePrefix, _masstransitOptions.EventHandlerNameSuffix);
var parentEventHandler = eventHnadler.BaseType; var parentEventHandler = eventHnadler.BaseType;
var eventType = parentEventHandler.GetGenericArguments().ToList().FirstOrDefault(); var eventType = parentEventHandler.GetGenericArguments().ToList().FirstOrDefault();
...@@ -36,9 +36,9 @@ namespace Pole.ReliableMessage.Masstransit ...@@ -36,9 +36,9 @@ namespace Pole.ReliableMessage.Masstransit
return eventHandlerRegisterInvoker; return eventHandlerRegisterInvoker;
} }
private string GetQueueName(object reliableEventHandlerParemeterAttribute, Type eventHnadler, string queueNamePrefix) private string GetQueueName(object reliableEventHandlerParemeterAttribute, Type eventHnadler, string queueNamePrefix,string eventHandlerNameSuffix)
{ {
var eventHandlerDefaultName = $"eventHandler-{ eventHnadler.Name.Replace("EventHandler", "").ToLowerInvariant()}"; var eventHandlerDefaultName = $"eventHandler-{ eventHnadler.Name.Replace(eventHandlerNameSuffix, "").ToLowerInvariant()}";
var eventHandlerName = string.IsNullOrEmpty(queueNamePrefix) ? eventHandlerDefaultName : $"{queueNamePrefix}-{eventHandlerDefaultName}"; var eventHandlerName = string.IsNullOrEmpty(queueNamePrefix) ? eventHandlerDefaultName : $"{queueNamePrefix}-{eventHandlerDefaultName}";
if (reliableEventHandlerParemeterAttribute != null) if (reliableEventHandlerParemeterAttribute != null)
......
...@@ -15,7 +15,7 @@ namespace Pole.ReliableMessage.Masstransit ...@@ -15,7 +15,7 @@ namespace Pole.ReliableMessage.Masstransit
_bus = bus; _bus = bus;
} }
private readonly MassTransit.IBus _bus; private readonly MassTransit.IBus _bus;
public Task Publish(object @event,string reliableMessageId, CancellationToken cancellationToken = default(CancellationToken)) public Task Publish(object @event, string reliableMessageId, CancellationToken cancellationToken = default)
{ {
var pipe = new AddReliableMessageIdPipe(reliableMessageId); var pipe = new AddReliableMessageIdPipe(reliableMessageId);
return _bus.Publish(@event, pipe, cancellationToken); return _bus.Publish(@event, pipe, cancellationToken);
......
...@@ -13,6 +13,8 @@ namespace Pole.ReliableMessage.Masstransit ...@@ -13,6 +13,8 @@ namespace Pole.ReliableMessage.Masstransit
public string RabbitMqHostUserName { get; set; } public string RabbitMqHostUserName { get; set; }
public string RabbitMqHostPassword { get; set; } public string RabbitMqHostPassword { get; set; }
public string QueueNamePrefix { get; set; } = string.Empty; public string QueueNamePrefix { get; set; } = string.Empty;
public string EventHandlerNameSuffix = "EventHandler";
/// <summary> /// <summary>
/// 2 个并发 /// 2 个并发
/// </summary> /// </summary>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment