// ConsumerManager.cs
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Orleans;
using Pole.Core.Services;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.EventBus.RabbitMQ
{
/// <summary>
/// Hosted service that creates one <see cref="ConsumerRunner"/> per RabbitMQ consumer
/// registered in the <see cref="IRabbitEventBusContainer"/>, and drives three periodic
/// jobs on timers: starting runners, renewing distributed hold locks, and health checks.
/// </summary>
public class ConsumerManager : IHostedService, IDisposable
{
    readonly ILogger<ConsumerManager> logger;
    readonly IRabbitMQClient client;
    readonly IRabbitEventBusContainer rabbitEventBusContainer;
    readonly IServiceProvider provider;
    readonly IGrainFactory grainFactory;

    // Timer periods, in milliseconds.
    const int _HoldTime = 20 * 1000;       // period (and initial delay) of DistributedHold
    const int _MonitTime = 60 * 2 * 1000;  // period of Start (first run is after 1 second)
    const int _checkTime = 10 * 1000;      // period (and initial delay) of HeathCheck

    /// <summary>
    /// Creates the manager. No runner is created and no timer is started until
    /// <see cref="StartAsync"/> is called.
    /// </summary>
    /// <param name="logger">Logger used for lifecycle and error messages.</param>
    /// <param name="client">RabbitMQ client handed to every <see cref="ConsumerRunner"/>.</param>
    /// <param name="grainFactory">Orleans grain factory used to reach <see cref="IWeightHoldLock"/> grains.</param>
    /// <param name="provider">Service provider handed to every <see cref="ConsumerRunner"/>.</param>
    /// <param name="rabbitEventBusContainer">Source of the consumers to run.</param>
    public ConsumerManager(
        ILogger<ConsumerManager> logger,
        IRabbitMQClient client,
        IGrainFactory grainFactory,
        IServiceProvider provider,
        IRabbitEventBusContainer rabbitEventBusContainer)
    {
        this.provider = provider;
        this.client = client;
        this.logger = logger;
        this.rabbitEventBusContainer = rabbitEventBusContainer;
        this.grainFactory = grainFactory;
    }

    // Active runners keyed by queue name.
    private readonly ConcurrentDictionary<string, ConsumerRunner> ConsumerRunners = new ConcurrentDictionary<string, ConsumerRunner>();

    // Lock ids keyed by queue name, renewed by DistributedHold.
    // NOTE(review): nothing in this class ever adds an entry here, so DistributedHold
    // currently has nothing to renew.
    private ConcurrentDictionary<string, long> Runners { get; } = new ConcurrentDictionary<string, long>();

    private Timer HeathCheckTimer { get; set; }
    private Timer DistributedMonitorTime { get; set; }
    private Timer DistributedHoldTimer { get; set; }

    // Value passed as the second argument of IWeightHoldLock.Hold.
    const int lockHoldingSeconds = 60;

    // Re-entrancy flags (0 = free, 1 = busy): a timer tick that finds its flag busy is skipped.
    int distributedHoldTimerLock = 0;
    int heathCheckTimerLock = 0;

    // Set to 1 by the first Dispose call; makes Dispose idempotent and stops Start from adding runners.
    int disposed = 0;

    /// <summary>
    /// Ensures a running <see cref="ConsumerRunner"/> exists for every <see cref="RabbitConsumer"/>
    /// returned by the container. This is invoked repeatedly by a timer, so queues that already
    /// have a registered runner are skipped instead of being started a second time.
    /// Failures are logged and never propagate to the timer callback.
    /// </summary>
    private async Task Start()
    {
        try
        {
            var consumers = rabbitEventBusContainer.GetConsumers();
            foreach (var consumer in consumers)
            {
                if (Volatile.Read(ref disposed) != 0)
                {
                    return;
                }
                if (!(consumer is RabbitConsumer value))
                {
                    continue;
                }

                var queue = value.QueueInfo;
                var key = queue.Queue;

                // A previous tick already started a runner for this queue.
                if (ConsumerRunners.ContainsKey(key))
                {
                    continue;
                }

                var runner = new ConsumerRunner(client, provider, value, queue);
                // Only run a runner that is tracked, otherwise it could never be closed.
                if (!ConsumerRunners.TryAdd(key, runner))
                {
                    continue;
                }

                try
                {
                    await runner.Run();
                }
                catch (Exception exception)
                {
                    logger.LogError(exception.InnerException ?? exception, nameof(Start));
                    // Unregister the failed runner so the next tick retries this queue,
                    // and release whatever it managed to open.
                    if (ConsumerRunners.TryRemove(key, out var failed))
                    {
                        CloseRunner(key, failed);
                    }
                }
            }
        }
        catch (Exception exception)
        {
            logger.LogError(exception.InnerException ?? exception, nameof(Start));
        }
    }

    /// <summary>
    /// Renews the hold lock of every entry in <see cref="Runners"/> through its
    /// <see cref="IWeightHoldLock"/> grain. When a renewal is refused, the matching runner is
    /// closed and both registrations are removed. Overlapping ticks are skipped.
    /// </summary>
    private async Task DistributedHold()
    {
        try
        {
            if (logger.IsEnabled(LogLevel.Information))
                logger.LogInformation("EventBus Background Service is holding.");

            if (Interlocked.CompareExchange(ref distributedHoldTimerLock, 1, 0) != 0)
            {
                return;
            }

            try
            {
                foreach (var lockKV in Runners)
                {
                    // Re-read the entry: enumeration may observe keys removed in the meantime.
                    if (!Runners.TryGetValue(lockKV.Key, out var lockId))
                    {
                        continue;
                    }

                    var holdResult = await grainFactory.GetGrain<IWeightHoldLock>(lockKV.Key).Hold(lockId, lockHoldingSeconds);
                    if (holdResult)
                    {
                        continue;
                    }

                    if (ConsumerRunners.TryRemove(lockKV.Key, out var runner))
                    {
                        CloseRunner(lockKV.Key, runner);
                    }
                    Runners.TryRemove(lockKV.Key, out var _);
                }
            }
            finally
            {
                // Released only by the invocation that acquired the flag.
                Interlocked.Exchange(ref distributedHoldTimerLock, 0);
            }
        }
        catch (Exception exception)
        {
            logger.LogError(exception.InnerException ?? exception, nameof(DistributedHold));
        }
    }

    /// <summary>
    /// Runs <c>HeathCheck</c> on every registered runner concurrently and waits for all of them.
    /// Overlapping ticks are skipped; failures are logged.
    /// </summary>
    private async Task HeathCheck()
    {
        try
        {
            if (logger.IsEnabled(LogLevel.Information))
                logger.LogInformation("EventBus Background Service is checking.");

            if (Interlocked.CompareExchange(ref heathCheckTimerLock, 1, 0) != 0)
            {
                return;
            }

            try
            {
                await Task.WhenAll(ConsumerRunners.Values.Select(runner => runner.HeathCheck()));
            }
            finally
            {
                // Released only by the invocation that acquired the flag.
                Interlocked.Exchange(ref heathCheckTimerLock, 0);
            }
        }
        catch (Exception exception)
        {
            logger.LogError(exception.InnerException ?? exception, nameof(HeathCheck));
        }
    }

    /// <summary>
    /// Starts the three timers and returns immediately; runners are created by the first
    /// tick of the monitor timer, one second later.
    /// </summary>
    /// <param name="cancellationToken">Not used.</param>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        if (logger.IsEnabled(LogLevel.Information))
            logger.LogInformation("EventBus Background Service is starting.");

        // The tick methods catch and log their own exceptions, so the returned tasks are
        // deliberately not awaited; this avoids blocking a thread-pool thread per tick.
        DistributedMonitorTime = new Timer(state => { _ = Start(); }, null, 1000, _MonitTime);
        DistributedHoldTimer = new Timer(state => { _ = DistributedHold(); }, null, _HoldTime, _HoldTime);
        HeathCheckTimer = new Timer(state => { _ = HeathCheck(); }, null, _checkTime, _checkTime);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Stops the service by disposing it.
    /// </summary>
    /// <param name="cancellationToken">Not used.</param>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        if (logger.IsEnabled(LogLevel.Information))
            logger.LogInformation("EventBus Background Service is stopping.");
        Dispose();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Stops the timers and closes every registered runner. Safe to call more than once:
    /// only the first call does any work, so runners are closed exactly once even though
    /// both <see cref="StopAsync"/> and the host call this method.
    /// </summary>
    public void Dispose()
    {
        if (Interlocked.Exchange(ref disposed, 1) != 0)
        {
            return;
        }

        if (logger.IsEnabled(LogLevel.Information))
            logger.LogInformation("EventBus Background Service is disposing.");

        // Stop scheduling new ticks before tearing the runners down.
        DistributedMonitorTime?.Dispose();
        DistributedHoldTimer?.Dispose();
        HeathCheckTimer?.Dispose();

        foreach (var key in ConsumerRunners.Keys)
        {
            if (ConsumerRunners.TryRemove(key, out var runner))
            {
                CloseRunner(key, runner);
            }
        }
    }

    /// <summary>
    /// Closes a runner, logging instead of propagating any failure so that one faulty
    /// runner cannot prevent the remaining cleanup.
    /// </summary>
    private void CloseRunner(string key, ConsumerRunner runner)
    {
        try
        {
            runner.Close();
        }
        catch (Exception exception)
        {
            logger.LogError(exception.InnerException ?? exception, "Failed to close consumer runner for queue {Queue}", key);
        }
    }
}
}