Skip to content
  • P
    Projects
  • G
    Groups
  • S
    Snippets
  • Help

丁松杰 / Pole

  • This project
    • Loading...
  • Sign in
Go to a project
  • Project
  • Repository
  • Issues 0
  • Merge Requests 0
  • Pipelines
  • Wiki
  • Snippets
  • Members
  • Activity
  • Graph
  • Charts
  • Create a new issue
  • Jobs
  • Commits
  • Issue Boards
  • Files
  • Commits
  • Branches
  • Tags
  • Contributors
  • Graph
  • Compare
  • Charts
Switch branch/tag
  • Pole
  • src
  • Pole.ReliableMessage
  • DefaultMessageCheckRetryer.cs
Find file
BlameHistoryPermalink
  • 丁松杰's avatar
    优化 可靠消息 · 779a2f18
    丁松杰 committed 5 years ago
    779a2f18
DefaultMessageCheckRetryer.cs 4.27 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Storage.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Pole.ReliableMessage
{
    class DefaultMessageCheckRetryer : IMessageCheckRetryer
    {
        private readonly ILogger _logger;
        private readonly IRetryTimeDelayCalculator _retryTimeDelayCalculator;
        private readonly ReliableMessageOption _options;
        private readonly IMessageStorage _storage;
        private readonly IMessageChecker _messageChecker;
        private readonly IMessageBus _messageBus;
        private readonly List<Message> _changedMessage = new List<Message>();
        public DefaultMessageCheckRetryer(ILogger<DefaultMessageCheckRetryer> logger, IRetryTimeDelayCalculator retryTimeDelayCalculator, IOptions<ReliableMessageOption> options, IMessageStorage storage, IMessageChecker messageChecker, IMessageBus messageBus)
        {
            _logger = logger;
            _retryTimeDelayCalculator = retryTimeDelayCalculator;
            _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected");
            _storage = storage;
            _messageChecker = messageChecker;
            _messageBus = messageBus;
        }
        public async Task Execute(IEnumerable<Message> messages, DateTime dateTime)
        {
            try
            {
                messages.AsParallel().ForAll(async m => await Retry(m, dateTime));
                if (_changedMessage.Count != 0)
                {
                    await _storage.Save(_changedMessage);
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"DefaultMessageCheckRetryer.Execute ,Execute with errors");
            }
            finally
            {
                if (_changedMessage.Count != 0)
                {
                    _changedMessage.Clear();
                }
            }
        }
        private async Task Retry(Message message, DateTime retryTime)
        {
            try
            {
                if (_logger.IsEnabled(LogLevel.Debug))
                {
                    _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} begin Retry");
                }
                var nextRetryDelay = _retryTimeDelayCalculator.Get(message.RetryTimes, _options.MaxPendingMessageRetryDelay);
                message.NextRetryUTCTime = retryTime.AddSeconds(nextRetryDelay);

                if (retryTime > message.AddedUTCTime.AddSeconds(_options.PendingMessageTimeOut))
                {
                    if (_logger.IsEnabled(LogLevel.Debug))
                    {
                        _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Canced ,PendingMessageTimeOut:{_options.PendingMessageTimeOut}");
                    }

                    message.NextRetryUTCTime = DateTime.MinValue;
                    message.MessageStatus = MessageStatus.Canced;
                    _changedMessage.Add(message);
                    return;
                }
                message.RetryTimes++;

                var messageCheckerResult = await _messageChecker.GetResult(message);
                if (messageCheckerResult.IsFinished)
                {
                    if (_logger.IsEnabled(LogLevel.Debug))
                    {
                        _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Pushed");
                    }
                    message.MessageStatus = MessageStatus.Pushed;
                    await _messageBus.Publish(messageCheckerResult.Event, message.Id);
                }
                else
                {
                    if (_logger.IsEnabled(LogLevel.Debug))
                    {
                        _logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Retry next time");
                    }
                }
                _changedMessage.Add(message);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"DefaultMessageCheckRetryer.Retry ,Message:{message.Id}  retry with errors");
            }
        }
    }
}