ModelPooledObjectPolicy.cs
1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
using Microsoft.Extensions.ObjectPool;
using RabbitMQ.Client;
using System.Collections.Generic;
using System.Threading;
namespace Pole.EventBus.RabbitMQ
{
public class ModelPooledObjectPolicy : IPooledObjectPolicy<ModelWrapper>
{
readonly ConnectionFactory connectionFactory;
readonly List<ConnectionWrapper> connections = new List<ConnectionWrapper>();
readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
readonly RabbitOptions options;
public ModelPooledObjectPolicy(ConnectionFactory connectionFactory, RabbitOptions options)
{
this.connectionFactory = connectionFactory;
this.options = options;
}
public ModelWrapper Create()
{
foreach (var connection in connections)
{
(bool success, ModelWrapper model) = connection.Get();
if (success)
return model;
}
semaphoreSlim.Wait();
try
{
if (connections.Count < options.MaxConnection)
{
var connection = new ConnectionWrapper(connectionFactory.CreateConnection(options.EndPoints), options);
(bool success, ModelWrapper model) = connection.Get();
connections.Add(connection);
if (success)
return model;
}
throw new System.OverflowException(nameof(connections));
}
finally
{
semaphoreSlim.Release();
}
}
public bool Return(ModelWrapper obj)
{
if (obj.Model.IsOpen)
return true;
else
{
obj.ForceDispose();
return false;
}
}
}
}