ModelWrapper.cs
2.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
using System;
using System.Collections.Generic;
using Microsoft.Extensions.ObjectPool;
using Pole.Core;
using Pole.Core.Exceptions;
using RabbitMQ.Client;
namespace Pole.EventBus.RabbitMQ
{
/// <summary>
/// Pairs a RabbitMQ <see cref="IModel"/> (channel) with the
/// <see cref="ConnectionWrapper"/> it is associated with and publishes
/// messages through it, waiting for the broker's publisher confirm.
/// </summary>
/// <remarks>
/// <see cref="Dispose"/> returns the wrapper to <see cref="Pool"/> rather than
/// closing the channel; <see cref="ForceDispose"/> closes the channel and hands
/// the wrapper back to <see cref="Connection"/>.
/// </remarks>
public class ModelWrapper : IDisposable
{
    // Pre-built message properties, reused for every publish.
    readonly IBasicProperties persistentProperties;
    readonly IBasicProperties noPersistentProperties;

    /// <summary>Pool this wrapper is returned to by <see cref="Dispose"/>.</summary>
    public DefaultObjectPool<ModelWrapper> Pool { get; set; }

    /// <summary>Connection wrapper supplying publish options and receiving the wrapper on <see cref="ForceDispose"/>.</summary>
    public ConnectionWrapper Connection { get; set; }

    /// <summary>The underlying RabbitMQ channel.</summary>
    public IModel Model { get; set; }

    /// <summary>
    /// Creates the wrapper and prepares the persistent and non-persistent
    /// message properties used by <see cref="Publish"/>.
    /// </summary>
    /// <param name="connectionWrapper">Connection wrapper associated with <paramref name="model"/>.</param>
    /// <param name="model">Channel used to create properties and publish messages.</param>
    /// <exception cref="ArgumentNullException"><paramref name="model"/> is null.</exception>
    public ModelWrapper(
        ConnectionWrapper connectionWrapper,
        IModel model)
    {
        if (model == null)
        {
            throw new ArgumentNullException(nameof(model));
        }

        Connection = connectionWrapper;
        Model = model;
        persistentProperties = CreateProperties(model, true);
        noPersistentProperties = CreateProperties(model, false);
    }

    /// <summary>
    /// Builds message properties with the given persistence flag and a
    /// <c>Consts.ConsumerRetryTimesStr</c> header initialised to "0".
    /// </summary>
    private static IBasicProperties CreateProperties(IModel model, bool persistent)
    {
        const int initialConsumeRetryTimes = 0;

        var properties = model.CreateBasicProperties();
        properties.Persistent = persistent;

        // Freshly created properties may carry no header table at all;
        // create one so adding the header cannot dereference null.
        if (properties.Headers == null)
        {
            properties.Headers = new Dictionary<string, object>();
        }

        properties.Headers.Add(Consts.ConsumerRetryTimesStr, initialConsumeRetryTimes.ToString());
        return properties;
    }

    /// <summary>
    /// Publishes <paramref name="msg"/> and blocks until the broker confirms it
    /// or the configured confirm timeout elapses.
    /// </summary>
    /// <param name="msg">Message body.</param>
    /// <param name="exchange">Target exchange.</param>
    /// <param name="routingKey">Routing key.</param>
    /// <param name="persistent">Selects the persistent (default) or non-persistent properties.</param>
    /// <exception cref="ProducerConfirmTimeOutException">The confirm wait failed and reported a timeout.</exception>
    /// <exception cref="ProducerReceivedNAckException">The confirm wait failed without timing out.</exception>
    public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true)
    {
        // Confirm mode must be enabled on the channel before WaitForConfirms can be used.
        Model.ConfirmSelect();

        var properties = persistent ? persistentProperties : noPersistentProperties;
        Model.BasicPublish(exchange, routingKey, properties, msg);

        var confirmTimeoutSeconds = Connection.Options.ProducerConfirmWaitTimeoutSeconds;
        if (Model.WaitForConfirms(TimeSpan.FromSeconds(confirmTimeoutSeconds), out bool isTimeout))
        {
            return;
        }

        if (isTimeout)
        {
            throw new ProducerConfirmTimeOutException(confirmTimeoutSeconds);
        }

        throw new ProducerReceivedNAckException();
    }

    /// <summary>
    /// Returns this wrapper to <see cref="Pool"/> so it can be reused. When no
    /// pool has been assigned the channel is released via <see cref="ForceDispose"/>.
    /// </summary>
    public void Dispose()
    {
        if (Pool == null)
        {
            // Nothing to return to: release the channel instead of failing
            // with a NullReferenceException.
            ForceDispose();
            return;
        }

        Pool.Return(this);
    }

    /// <summary>
    /// Closes and disposes the channel, then hands this wrapper back to
    /// <see cref="Connection"/>.
    /// </summary>
    public void ForceDispose()
    {
        // Each step runs even if the previous one throws, so a failing Close()
        // cannot skip Dispose() or the return to the connection wrapper.
        try
        {
            Model.Close();
        }
        finally
        {
            try
            {
                Model.Dispose();
            }
            finally
            {
                Connection.Return(this);
            }
        }
    }
}
}