DefaultReliableBus.cs
4.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ILogger = Microsoft.Extensions.Logging.ILogger;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.Pole.ReliableMessage.EventBus
{
public class DefaultReliableBus : IReliableBus
{
private readonly IMessageBus _messageBus;
private readonly IMessageStorage _messageStorage;
private readonly IMessageIdGenerator _messageIdGenerator;
private readonly ITimeHelper _timeHelper;
//private readonly IMessageBuffer _messageBuffer;
private readonly ILogger _logger;
private readonly IJsonConverter _jsonConverter;
private readonly IMessageCallBackInfoStore _messageCallBackInfoStore;
private readonly IMessageTypeIdGenerator _messageTypeIdGenerator;
public DefaultReliableBus(IMessageBus messageBus, IMessageStorage messageStorage, IMessageIdGenerator messageIdGenerator, ITimeHelper timeHelper, ILogger<DefaultReliableBus> logger, IJsonConverter jsonConverter, IMessageCallBackInfoStore messageCallBackInfoStore, IMessageTypeIdGenerator messageTypeIdGenerator)
{
_messageBus = messageBus;
_messageStorage = messageStorage;
_messageIdGenerator = messageIdGenerator;
_timeHelper = timeHelper;
_logger = logger;
_jsonConverter = jsonConverter;
_messageCallBackInfoStore = messageCallBackInfoStore;
_messageTypeIdGenerator = messageTypeIdGenerator;
}
public Task<bool> Cancel<TReliableEvent>(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default)
{
try
{
return _messageStorage.UpdateStatus(m => m.Id == prePublishMessageId, MessageStatus.Canced);
}
catch (Exception ex)
{
var errorInfo = $"Cancel PrePublish errors in defaultReliableBus;{ex.Message}";
_logger.LogError(ex, errorInfo);
throw new Exception(errorInfo, ex);
}
}
public async Task<string> PrePublish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default)
{
var messageTypeId = _messageTypeIdGenerator.Generate(typeof(TReliableEvent));
var currentMessageCallbackInfo = _messageCallBackInfoStore.Get(messageTypeId);
if (currentMessageCallbackInfo == null)
{
throw new Exception($"Current message type Callback not registered ,messageTypeId:{messageTypeId}");
}
try
{
var messageId = _messageIdGenerator.Generate();
_logger.LogDebug($"PrePublish message begin ,messageId:{messageId}");
var now = _timeHelper.GetUTCNow();
var content = _jsonConverter.Serialize(@event);
var callBackParem = _jsonConverter.Serialize(callbackParemeter);
Message newMessage = new Message()
{
AddedUTCTime = now,
Content = content,
Id = messageId,
MessageStatusId = MessageStatus.Pending.Id,
MessageTypeId = messageTypeId,
RePushCallBackParameterValue = callBackParem,
NextRetryUTCTime = DateTime.MinValue
};
await _messageStorage.Add(newMessage);
_logger.LogDebug($"PrePublish message successful ,messageId:{messageId}");
return messageId;
}
catch (Exception ex)
{
var errorInfo = $"PrePublish errors in DefaultReliableBus;{ex.Message}";
_logger.LogError(ex, errorInfo);
throw new Exception(errorInfo, ex);
}
}
public async Task<bool> Publish<TReliableEvent>(TReliableEvent @event, string prePublishMessageId, CancellationToken cancellationToken = default)
{
try
{
await _messageBus.Publish(@event, prePublishMessageId, cancellationToken);
var messageBufferResult = await _messageStorage.UpdateStatus(m => m.Id == prePublishMessageId && m.MessageStatusId == MessageStatus.Pending.Id, MessageStatus.Pushed);
return messageBufferResult;
}
catch (Exception ex)
{
var errorInfo = $"Publish errors in DefaultReliableBus;{ex.Message}";
_logger.LogError(ex, errorInfo);
throw new Exception(errorInfo, ex);
}
}
}
}