EventBusContainer.cs
6.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Orleans;
using RabbitMQ.Client;
using Pole.Core.Exceptions;
using Pole.Core.Utils;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using System.Linq;
using Pole.EventBus.Event;
using Pole.Core.Domain;
using Pole.EventBus.EventHandler;
namespace Pole.EventBus.RabbitMQ
{
public class EventBusContainer : IRabbitEventBusContainer, IProducerInfoContainer
{
private readonly ConcurrentDictionary<string, RabbitEventBus> eventBusDictionary = new ConcurrentDictionary<string, RabbitEventBus>();
private readonly List<RabbitEventBus> eventBusList = new List<RabbitEventBus>();
readonly IRabbitMQClient rabbitMQClient;
readonly IServiceProvider serviceProvider;
private readonly IObserverUnitContainer observerUnitContainer;
private readonly RabbitOptions rabbitOptions;
public bool IsAutoRegisterFinished { get; private set; }
public EventBusContainer(
IServiceProvider serviceProvider,
IObserverUnitContainer observerUnitContainer,
IRabbitMQClient rabbitMQClient,
IOptions<RabbitOptions> rabbitOptions)
{
this.serviceProvider = serviceProvider;
this.rabbitMQClient = rabbitMQClient;
this.observerUnitContainer = observerUnitContainer;
this.rabbitOptions = rabbitOptions.Value;
}
public async Task AutoRegister()
{
var eventList = new List<(Type type, EventInfoAttribute config)>();
var evenHandlertList = new List<(Type type, EventInfoAttribute config)>();
AddEventAndEventHandlerInfoList(eventList, evenHandlertList);
foreach (var (type, config) in eventList)
{
var eventName = config.EventName;
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName);
await eventBus.AddGrainConsumer<string>();
}
foreach (var (type, config) in evenHandlertList)
{
var eventName = config.EventName;
if (!eventBusDictionary.TryGetValue(eventName, out RabbitEventBus rabbitEventBus))
{
var eventBus = CreateEventBus(eventName, rabbitOptions.Prefix, 1, false, true, true).BindEvent(type, eventName);
await eventBus.AddGrainConsumer<string>();
}
}
IsAutoRegisterFinished = true;
}
public RabbitEventBus CreateEventBus(string exchange, string routePrefix, int lBCount = 1, bool autoAck = false, bool reenqueue = true, bool persistent = true)
{
return new RabbitEventBus(observerUnitContainer, this, exchange, routePrefix, lBCount, autoAck, reenqueue, persistent);
}
public Task Work(RabbitEventBus bus)
{
if (eventBusDictionary.TryAdd(bus.EventName, bus))
{
eventBusList.Add(bus);
using var channel = rabbitMQClient.PullChannel();
channel.Model.ExchangeDeclare($"{rabbitOptions.Prefix}{bus.Exchange}", "direct", true);
return Task.CompletedTask;
}
else
throw new EventBusRepeatException(bus.Event.FullName);
}
readonly ConcurrentDictionary<string, IProducer> producerDict = new ConcurrentDictionary<string, IProducer>();
public string GetTargetName(string typeName)
{
if (eventBusDictionary.TryGetValue(typeName, out var eventBus))
{
return $"{rabbitOptions.Prefix}{eventBus.Exchange}";
}
else
{
throw new NotImplementedException($"{nameof(RabbitEventBus)} of {typeName}");
}
}
public List<IConsumer> GetConsumers()
{
var result = new List<IConsumer>();
foreach (var eventBus in eventBusList)
{
result.AddRange(eventBus.Consumers);
}
return result;
}
#region helpers
private void AddEventAndEventHandlerInfoList(List<(Type type, EventInfoAttribute config)> eventList, List<(Type type, EventInfoAttribute config)> eventHandlertList)
{
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<EventBusContainer>>()))
{
foreach (var type in assembly.GetTypes().Where(m => typeof(IEvent).IsAssignableFrom(m) && m.IsClass))
{
var attribute = type.GetCustomAttributes(typeof(EventInfoAttribute), false).FirstOrDefault();
if (attribute != null)
{
eventList.Add((type, (EventInfoAttribute)attribute));
}
else
{
eventList.Add((type, new EventInfoAttribute() { EventName = type.FullName }));
}
}
}
foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService<ILogger<EventBusContainer>>()))
{
foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract && !typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m)))
{
var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType);
var basePoleEventHandlerInterface = eventHandlerInterface.GetInterfaces().FirstOrDefault(m => m.IsGenericType);
if (basePoleEventHandlerInterface == null)
{
throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler<TEvent>");
}
var eventType = basePoleEventHandlerInterface.GetGenericArguments().FirstOrDefault();
if (eventType == null)
{
throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler<TEvent>");
}
var attribute = eventType.GetCustomAttributes(typeof(EventInfoAttribute), false).FirstOrDefault();
if (attribute != null)
{
eventHandlertList.Add((eventHandlerInterface, (EventInfoAttribute)attribute));
}
else
{
throw new PoleEventHandlerImplementException("Can not found EventHandlerAttribute in PoleEventHandler");
}
}
}
}
#endregion
}
}