Commit cdda9bcf by dingsongjie

fix bug

parent 672f72c7
...@@ -9,7 +9,6 @@ namespace Pole.Application.EventBus ...@@ -9,7 +9,6 @@ namespace Pole.Application.EventBus
public object Event { get;private set; } public object Event { get;private set; }
public object CallbackParemeter { get; private set; } public object CallbackParemeter { get; private set; }
public string PrePublishEventId { get; set; } public string PrePublishEventId { get; set; }
public bool IsPublished { get; set; }
public Type EventType { get;private set; } public Type EventType { get;private set; }
public EventEntry(object @event,object callbackParemeter, Type eventType) public EventEntry(object @event,object callbackParemeter, Type eventType)
{ {
......
using Pole.Domain.UnitOfWork; using Microsoft.Extensions.Logging;
using Pole.Domain.UnitOfWork;
using Pole.ReliableMessage.Abstraction; using Pole.ReliableMessage.Abstraction;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
...@@ -13,11 +14,13 @@ namespace Pole.Application.EventBus ...@@ -13,11 +14,13 @@ namespace Pole.Application.EventBus
{ {
private readonly IReliableMessageScopedBuffer _reliableMessageScopedBuffer; private readonly IReliableMessageScopedBuffer _reliableMessageScopedBuffer;
private readonly IReliableBus _reliableBus; private readonly IReliableBus _reliableBus;
private readonly ILogger<ReliableMessageTransactionWorker> _logger;
public ReliableMessageTransactionWorker(IReliableMessageScopedBuffer reliableMessageScopedBuffer, IReliableBus reliableBus) public ReliableMessageTransactionWorker(IReliableMessageScopedBuffer reliableMessageScopedBuffer, IReliableBus reliableBus, ILogger<ReliableMessageTransactionWorker> logger)
{ {
_reliableMessageScopedBuffer = reliableMessageScopedBuffer; _reliableMessageScopedBuffer = reliableMessageScopedBuffer;
_reliableBus = reliableBus; _reliableBus = reliableBus;
_logger = logger;
} }
public int Order => 200; public int Order => 200;
...@@ -30,25 +33,15 @@ namespace Pole.Application.EventBus ...@@ -30,25 +33,15 @@ namespace Pole.Application.EventBus
try try
{ {
var tasks = events.Select(async @event => var tasks = events.Select(async @event =>
{ {
await _reliableBus.Publish(@event.Event, @event.PrePublishEventId, cancellationToken); await _reliableBus.Publish(@event.Event, @event.PrePublishEventId, cancellationToken);
@event.IsPublished = true; });
});
await Task.WhenAll(tasks); await Task.WhenAll(tasks);
} }
catch (Exception ex) catch (Exception ex)
{ {
_logger.LogError(ex, "ReliableMessageTransactionWorker.Commit error");
if (events.Count(@event => @event.IsPublished) > 1) // 此时 预发送成功 ,数据库事务提交成功 ,发送消息至消息队列失败 ,任然返回成功 ,因为预发送消息 的重试机制会让 消息发送成功
{
//这里发布失败 通过预发送后的重试机制去处理, 因为一旦有一个消息发出去后 无法挽回
return;
}
else
{
// 这里抛出错误 ,统一工作单元拦截后会 回滚整个工作单元
throw ex;
}
} }
WorkerStatus = WorkerStatus.Commited; WorkerStatus = WorkerStatus.Commited;
return; return;
...@@ -75,7 +68,6 @@ namespace Pole.Application.EventBus ...@@ -75,7 +68,6 @@ namespace Pole.Application.EventBus
events.Where(m => !string.IsNullOrEmpty(m.PrePublishEventId)).ToList().ForEach(async @event => events.Where(m => !string.IsNullOrEmpty(m.PrePublishEventId)).ToList().ForEach(async @event =>
{ {
await _reliableBus.Cancel(@event.PrePublishEventId, cancellationToken); await _reliableBus.Cancel(@event.PrePublishEventId, cancellationToken);
@event.IsPublished = true;
}); });
WorkerStatus = WorkerStatus.Rollbacked; WorkerStatus = WorkerStatus.Rollbacked;
return Task.FromResult(1); return Task.FromResult(1);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment