Skip to content
  • P
    Projects
  • G
    Groups
  • S
    Snippets
  • Help

丁松杰 / Pole

  • This project
    • Loading...
  • Sign in
Go to a project
  • Project
  • Repository
  • Issues 0
  • Merge Requests 0
  • Pipelines
  • Wiki
  • Snippets
  • Members
  • Activity
  • Graph
  • Charts
  • Create a new issue
  • Jobs
  • Commits
  • Issue Boards
  • Files
  • Commits
  • Branches
  • Tags
  • Contributors
  • Graph
  • Compare
  • Charts
Switch branch/tag
  • Pole
  • src
  • Pole.EventBus.Rabbitmq
  • Consumer
  • ConsumerManager.cs
Find file
BlameHistoryPermalink
  • 丁松杰's avatar
    修复 写入 错误队列后 没有 ack · 65b7891d
    丁松杰 committed 5 years ago
    65b7891d
ConsumerManager.cs 5.71 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans;
using Pole.Core.Services;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Pole.EventBus.RabbitMQ
{
    /// <summary>
    /// Hosted service that keeps one <see cref="ConsumerRunner"/> per RabbitMQ queue and drives
    /// three timers: a monitor timer that registers and starts runners, a hold timer that renews
    /// <see cref="IWeightHoldLock"/> grain locks, and a health-check timer that calls
    /// <c>HeathCheck</c> on every registered runner.
    /// </summary>
    public class ConsumerManager : IHostedService, IDisposable
    {
        readonly ILogger<ConsumerManager> logger;
        readonly IRabbitMQClient client;
        readonly IRabbitEventBusContainer rabbitEventBusContainer;
        readonly IServiceProvider provider;
        readonly IGrainFactory grainFactory;
        // Due time and period (ms) of the lock-hold timer.
        const int _HoldTime = 20 * 1000;
        // Period (ms) of the monitor timer that (re)starts runners.
        const int _MonitTime = 60 * 2 * 1000;
        // Due time and period (ms) of the health-check timer.
        const int _checkTime = 10 * 1000;

        /// <summary>
        /// Stores the collaborators; no work is started until <see cref="StartAsync"/> is called.
        /// </summary>
        /// <param name="logger">Logger used for lifecycle and error messages.</param>
        /// <param name="client">RabbitMQ client handed to every <see cref="ConsumerRunner"/>.</param>
        /// <param name="grainFactory">Factory used to resolve <see cref="IWeightHoldLock"/> grains.</param>
        /// <param name="provider">Service provider handed to every <see cref="ConsumerRunner"/>.</param>
        /// <param name="rabbitEventBusContainer">Source of the consumers to run.</param>
        public ConsumerManager(
            ILogger<ConsumerManager> logger,
            IRabbitMQClient client,
            IGrainFactory grainFactory,
            IServiceProvider provider,
            IRabbitEventBusContainer rabbitEventBusContainer)
        {
            this.provider = provider;
            this.client = client;
            this.logger = logger;
            this.rabbitEventBusContainer = rabbitEventBusContainer;
            this.grainFactory = grainFactory;
        }

        // Active runners keyed by queue name.
        private readonly ConcurrentDictionary<string, ConsumerRunner> ConsumerRunners = new ConcurrentDictionary<string, ConsumerRunner>();
        // Lock ids keyed by the same key as ConsumerRunners.
        // NOTE(review): nothing in this class adds entries here, so DistributedHold currently has nothing to renew.
        private ConcurrentDictionary<string, long> Runners { get; } = new ConcurrentDictionary<string, long>();
        private Timer HeathCheckTimer { get; set; }
        private Timer DistributedMonitorTime { get; set; }
        private Timer DistributedHoldTimer { get; set; }
        // Second argument passed to IWeightHoldLock.Hold on every renewal.
        const int lockHoldingSeconds = 60;
        // 0 = free, 1 = held; re-entrancy guards for the two periodic jobs.
        int distributedHoldTimerLock = 0;
        int heathCheckTimerLock = 0;
        // 0 = live, 1 = Dispose already ran.
        int disposed = 0;

        /// <summary>
        /// Registers and starts a runner for every <see cref="RabbitConsumer"/> that does not have one yet.
        /// Never throws: it is started from a timer callback, so failures are logged instead.
        /// </summary>
        private async Task Start()
        {
            try
            {
                var consumers = rabbitEventBusContainer.GetConsumers();
                foreach (var consumer in consumers)
                {
                    if (consumer is RabbitConsumer value)
                    {
                        await StartRunner(value);
                    }
                }
            }
            catch (Exception exception)
            {
                logger.LogError(exception.InnerException ?? exception, nameof(Start));
            }
        }

        /// <summary>
        /// Creates, registers and runs the runner for one consumer, unless its queue already has one.
        /// </summary>
        /// <param name="consumer">Consumer whose queue should be served.</param>
        private async Task StartRunner(RabbitConsumer consumer)
        {
            var queue = consumer.QueueInfo;
            var key = queue.Queue;

            // This method runs on every monitor tick; a queue that already has a runner must not
            // get a second, untracked one.
            if (ConsumerRunners.ContainsKey(key))
            {
                return;
            }

            var runner = new ConsumerRunner(client, provider, consumer, queue);
            if (!ConsumerRunners.TryAdd(key, runner))
            {
                // Another tick registered a runner in the meantime; the fresh instance is never started.
                return;
            }

            try
            {
                await runner.Run();
            }
            catch (Exception exception)
            {
                logger.LogError(exception.InnerException ?? exception, nameof(Start));
                // Unregister the failed runner so the next monitor tick retries with a new instance.
                if (ConsumerRunners.TryRemove(key, out var failed))
                {
                    CloseRunner(failed, nameof(Start));
                }
            }
        }

        /// <summary>
        /// Closes a runner, logging instead of propagating any exception so callers can keep
        /// cleaning up the remaining runners.
        /// </summary>
        /// <param name="runner">Runner to close.</param>
        /// <param name="operation">Name of the calling operation, used as the log message.</param>
        private void CloseRunner(ConsumerRunner runner, string operation)
        {
            try
            {
                runner.Close();
            }
            catch (Exception exception)
            {
                logger.LogError(exception.InnerException ?? exception, operation);
            }
        }

        /// <summary>
        /// Renews the hold lock for every entry in <c>Runners</c>; when a renewal is refused the
        /// matching runner is closed and both entries are removed. Overlapping invocations are skipped.
        /// </summary>
        private async Task DistributedHold()
        {
            var acquired = false;
            try
            {
                if (logger.IsEnabled(LogLevel.Information))
                    logger.LogInformation("EventBus Background Service is holding.");

                acquired = Interlocked.CompareExchange(ref distributedHoldTimerLock, 1, 0) == 0;
                if (!acquired)
                {
                    return;
                }

                foreach (var lockKV in Runners)
                {
                    // Re-read the entry: it may have been removed since enumeration started.
                    if (Runners.TryGetValue(lockKV.Key, out var lockId))
                    {
                        var holdResult = await grainFactory.GetGrain<IWeightHoldLock>(lockKV.Key).Hold(lockId, lockHoldingSeconds);
                        if (!holdResult)
                        {
                            if (ConsumerRunners.TryRemove(lockKV.Key, out var runner))
                            {
                                runner.Close();
                            }
                            Runners.TryRemove(lockKV.Key, out var _);
                        }
                    }
                }
            }
            catch (Exception exception)
            {
                logger.LogError(exception.InnerException ?? exception, nameof(DistributedHold));
            }
            finally
            {
                // Only the invocation that took the guard may release it.
                if (acquired)
                {
                    Interlocked.Exchange(ref distributedHoldTimerLock, 0);
                }
            }
        }

        /// <summary>
        /// Runs <c>HeathCheck</c> on every registered runner concurrently. Overlapping invocations are skipped.
        /// </summary>
        private async Task HeathCheck()
        {
            var acquired = false;
            try
            {
                if (logger.IsEnabled(LogLevel.Information))
                    logger.LogInformation("EventBus Background Service is checking.");

                acquired = Interlocked.CompareExchange(ref heathCheckTimerLock, 1, 0) == 0;
                if (!acquired)
                {
                    return;
                }

                await Task.WhenAll(ConsumerRunners.Values.Select(runner => runner.HeathCheck()));
            }
            catch (Exception exception)
            {
                logger.LogError(exception.InnerException ?? exception, nameof(HeathCheck));
            }
            finally
            {
                // Only the invocation that took the guard may release it.
                if (acquired)
                {
                    Interlocked.Exchange(ref heathCheckTimerLock, 0);
                }
            }
        }

        /// <summary>
        /// Starts the monitor, hold and health-check timers and returns immediately.
        /// </summary>
        /// <param name="cancellationToken">Not used.</param>
        /// <returns>A completed task.</returns>
        public Task StartAsync(CancellationToken cancellationToken)
        {
            if (logger.IsEnabled(LogLevel.Information))
                logger.LogInformation("EventBus Background Service is starting.");
            // The jobs catch and log their own exceptions, so the callbacks start them without
            // blocking a thread-pool thread on the result.
            DistributedMonitorTime = new Timer(state => { _ = Start(); }, null, 1000, _MonitTime);
            DistributedHoldTimer = new Timer(state => { _ = DistributedHold(); }, null, _HoldTime, _HoldTime);
            HeathCheckTimer = new Timer(state => { _ = HeathCheck(); }, null, _checkTime, _checkTime);
            return Task.CompletedTask;
        }

        /// <summary>
        /// Stops the service by disposing it.
        /// </summary>
        /// <param name="cancellationToken">Not used.</param>
        /// <returns>A completed task.</returns>
        public Task StopAsync(CancellationToken cancellationToken)
        {
            if (logger.IsEnabled(LogLevel.Information))
                logger.LogInformation("EventBus Background Service is stopping.");
            Dispose();
            return Task.CompletedTask;
        }

        /// <summary>
        /// Stops the timers and closes every registered runner. Safe to call more than once;
        /// only the first call does any work.
        /// </summary>
        public void Dispose()
        {
            // StopAsync calls Dispose and the owner of this service may call it again;
            // runners must not be closed twice.
            if (Interlocked.Exchange(ref disposed, 1) != 0)
            {
                return;
            }

            if (logger.IsEnabled(LogLevel.Information))
                logger.LogInformation("EventBus Background Service is disposing.");

            // Stop scheduling new work before tearing the runners down.
            DistributedMonitorTime?.Dispose();
            DistributedHoldTimer?.Dispose();
            HeathCheckTimer?.Dispose();

            foreach (var entry in ConsumerRunners)
            {
                if (ConsumerRunners.TryRemove(entry.Key, out var runner))
                {
                    CloseRunner(runner, nameof(Dispose));
                }
            }
        }
    }
}