From cdda9bcf740c91112804bb323c5b2dadc2662ea9 Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Sun, 19 Jan 2020 15:28:10 +0800 Subject: [PATCH] fix bug --- src/Pole.Application/EventBus/EventEntry.cs | 1 - src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs | 28 ++++++++++------------------ 2 files changed, 10 insertions(+), 19 deletions(-) diff --git a/src/Pole.Application/EventBus/EventEntry.cs b/src/Pole.Application/EventBus/EventEntry.cs index e80cb65..237a607 100644 --- a/src/Pole.Application/EventBus/EventEntry.cs +++ b/src/Pole.Application/EventBus/EventEntry.cs @@ -9,7 +9,6 @@ namespace Pole.Application.EventBus public object Event { get;private set; } public object CallbackParemeter { get; private set; } public string PrePublishEventId { get; set; } - public bool IsPublished { get; set; } public Type EventType { get;private set; } public EventEntry(object @event,object callbackParemeter, Type eventType) { diff --git a/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs b/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs index 049650d..56a465c 100644 --- a/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs +++ b/src/Pole.Application/EventBus/ReliableMessageTransactionWorker.cs @@ -1,4 +1,5 @@ -using Pole.Domain.UnitOfWork; +using Microsoft.Extensions.Logging; +using Pole.Domain.UnitOfWork; using Pole.ReliableMessage.Abstraction; using System; using System.Collections.Generic; @@ -13,11 +14,13 @@ namespace Pole.Application.EventBus { private readonly IReliableMessageScopedBuffer _reliableMessageScopedBuffer; private readonly IReliableBus _reliableBus; + private readonly ILogger _logger; - public ReliableMessageTransactionWorker(IReliableMessageScopedBuffer reliableMessageScopedBuffer, IReliableBus reliableBus) + public ReliableMessageTransactionWorker(IReliableMessageScopedBuffer reliableMessageScopedBuffer, IReliableBus reliableBus, ILogger logger) { _reliableMessageScopedBuffer = reliableMessageScopedBuffer; _reliableBus = reliableBus; + _logger = logger; } public int Order => 200; @@ -30,25 +33,15 @@ namespace Pole.Application.EventBus try { var tasks = events.Select(async @event => - { - await _reliableBus.Publish(@event.Event, @event.PrePublishEventId, cancellationToken); - @event.IsPublished = true; - }); + { + await _reliableBus.Publish(@event.Event, @event.PrePublishEventId, cancellationToken); + }); await Task.WhenAll(tasks); } catch (Exception ex) { - - if (events.Count(@event => @event.IsPublished) > 1) - { - //这里发布失败 通过预发送后的重试机制去处理, 因为一旦有一个消息发出去后 无法挽回 - return; - } - else - { - // 这里抛出错误 ,统一工作单元拦截后会 回滚整个工作单元 - throw ex; - } + _logger.LogError(ex, "ReliableMessageTransactionWorker.Commit error"); + // 此时 预发送成功 ,数据库事务提交成功 ,发送消息至消息队列失败 ,任然返回成功 ,因为预发送消息 的重试机制会让 消息发送成功 } WorkerStatus = WorkerStatus.Commited; return; @@ -75,7 +68,6 @@ namespace Pole.Application.EventBus events.Where(m => !string.IsNullOrEmpty(m.PrePublishEventId)).ToList().ForEach(async @event => { await _reliableBus.Cancel(@event.PrePublishEventId, cancellationToken); - @event.IsPublished = true; }); WorkerStatus = WorkerStatus.Rollbacked; return Task.FromResult(1); -- libgit2 0.25.0