DefaultMessageCheckRetryer.cs
4.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Storage.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage
{
class DefaultMessageCheckRetryer : IMessageCheckRetryer
{
private readonly ILogger _logger;
private readonly IRetryTimeDelayCalculator _retryTimeDelayCalculator;
private readonly ReliableMessageOption _options;
private readonly IMessageStorage _storage;
private readonly IMessageChecker _messageChecker;
private readonly IMessageBus _messageBus;
private readonly List<Message> _changedMessage = new List<Message>();
public DefaultMessageCheckRetryer(ILogger<DefaultMessageCheckRetryer> logger, IRetryTimeDelayCalculator retryTimeDelayCalculator, IOptions<ReliableMessageOption> options, IMessageStorage storage, IMessageChecker messageChecker, IMessageBus messageBus)
{
_logger = logger;
_retryTimeDelayCalculator = retryTimeDelayCalculator;
_options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected");
_storage = storage;
_messageChecker = messageChecker;
_messageBus = messageBus;
}
public async Task Execute(IEnumerable<Message> messages, DateTime dateTime)
{
try
{
messages.AsParallel().ForAll(async m => await Retry(m, dateTime));
if (_changedMessage.Count != 0)
{
await _storage.Save(_changedMessage);
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"DefaultMessageCheckRetryer.Execute ,Execute with errors");
}
finally
{
if (_changedMessage.Count != 0)
{
_changedMessage.Clear();
}
}
}
private async Task Retry(Message message, DateTime retryTime)
{
try
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} begin Retry");
}
var nextRetryDelay = _retryTimeDelayCalculator.Get(message.RetryTimes, _options.MaxPendingMessageRetryDelay);
message.NextRetryUTCTime = retryTime.AddSeconds(nextRetryDelay);
if (retryTime > message.AddedUTCTime.AddSeconds(_options.PendingMessageTimeOut))
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Canced ,PendingMessageTimeOut:{_options.PendingMessageTimeOut}");
}
message.NextRetryUTCTime = DateTime.MinValue;
message.MessageStatus = MessageStatus.Canced;
_changedMessage.Add(message);
return;
}
message.RetryTimes++;
var messageCheckerResult = await _messageChecker.GetResult(message);
if (messageCheckerResult.IsFinished)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Pushed");
}
message.MessageStatus = MessageStatus.Pushed;
await _messageBus.Publish(messageCheckerResult.Event, message.Id);
}
else
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Retry next time");
}
}
_changedMessage.Add(message);
}
catch (Exception ex)
{
_logger.LogError(ex, $"DefaultMessageCheckRetryer.Retry ,Message:{message.Id} retry with errors");
}
}
}
}