ModelWrapper.cs
2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
using Microsoft.Extensions.ObjectPool;
using Pole.Core;
using Pole.Core.Exceptions;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
namespace Pole.EventBus.RabbitMQ
{
public class ModelWrapper : IDisposable
{
readonly IBasicProperties persistentProperties;
readonly IBasicProperties noPersistentProperties;
public DefaultObjectPool<ModelWrapper> Pool { get; set; }
public ConnectionWrapper Connection { get; set; }
public IModel Model { get; set; }
public ModelWrapper(
ConnectionWrapper connectionWrapper,
IModel model)
{
Connection = connectionWrapper;
Model = model;
var consumeRetryTimes = 0;
var consumeRetryTimesStr = consumeRetryTimes.ToString();
persistentProperties = Model.CreateBasicProperties();
persistentProperties.Persistent = true;
persistentProperties.Headers = new Dictionary<string, object>();
persistentProperties.Headers.Add(Consts.ConsumerRetryTimesStr, consumeRetryTimesStr);
noPersistentProperties = Model.CreateBasicProperties();
noPersistentProperties.Persistent = false;
noPersistentProperties.Headers = new Dictionary<string, object>();
noPersistentProperties.Headers.Add(Consts.ConsumerRetryTimesStr, consumeRetryTimesStr);
}
public void Publish(byte[] msg, IDictionary<string, object> headers, string exchange, string routingKey, bool persistent = true)
{
if (persistent)
{
persistentProperties.Headers = headers;
}
else
{
noPersistentProperties.Headers = headers;
}
Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg);
if (!Model.WaitForConfirms(TimeSpan.FromSeconds(Connection.Options.ProducerConfirmWaitTimeoutSeconds), out bool isTimeout))
{
if (isTimeout)
{
throw new ProducerConfirmTimeOutException(Connection.Options.ProducerConfirmWaitTimeoutSeconds);
}
else
{
throw new ProducerReceivedNAckException();
}
}
}
public void WaitForConfirmsOrDie(TimeSpan timeSpan)
{
Model.WaitForConfirmsOrDie(timeSpan);
}
public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true)
{
Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg);
}
public void Dispose()
{
Pool.Return(this);
}
public void ForceDispose()
{
Model.Close();
Model.Dispose();
Connection.Return(this);
}
}
}