ObserverUnit.cs
5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
using Microsoft.Extensions.Logging;
using Orleans;
using Pole.Core.Serialization;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using System.Linq;
using Pole.Core.EventBus.Event;
using Orleans.Concurrency;
using System.Collections.Concurrent;
using System.Linq.Expressions;
using Pole.Core.EventBus.EventHandler;
using Pole.Core.Utils.Abstraction;
namespace Pole.Core.EventBus
{
public class ObserverUnit<PrimaryKey> : IObserverUnit<PrimaryKey>
{
readonly IServiceProvider serviceProvider;
readonly ISerializer serializer;
readonly IEventTypeFinder typeFinder;
readonly IClusterClient clusterClient;
Func<byte[], Task> eventHandler;
Func<List<byte[]>, Task> batchEventHandler;
protected ILogger Logger { get; private set; }
public Type EventHandlerType { get; }
public ObserverUnit(IServiceProvider serviceProvider, Type eventHandlerType)
{
this.serviceProvider = serviceProvider;
clusterClient = serviceProvider.GetService<IClusterClient>();
serializer = serviceProvider.GetService<ISerializer>();
typeFinder = serviceProvider.GetService<IEventTypeFinder>();
Logger = serviceProvider.GetService<ILogger<ObserverUnit<PrimaryKey>>>();
EventHandlerType = eventHandlerType;
}
public static ObserverUnit<PrimaryKey> From<Grain>(IServiceProvider serviceProvider) where Grain : Orleans.Grain
{
return new ObserverUnit<PrimaryKey>(serviceProvider, typeof(Grain));
}
public Func<byte[], Task> GetEventHandler()
{
return eventHandler;
}
public Func<List<byte[]>, Task> GetBatchEventHandler()
{
return batchEventHandler;
}
public void Observer()
{
if (!typeof(IPoleEventHandler).IsAssignableFrom(EventHandlerType))
throw new NotSupportedException($"{EventHandlerType.FullName} must inheritance from PoleEventHandler");
eventHandler = EventHandler;
batchEventHandler = BatchEventHandler;
//内部函数
Task EventHandler(byte[] bytes)
{
var (success, transport) = EventBytesTransport.FromBytes(bytes);
if (success)
{
return GetObserver(EventHandlerType, transport.EventId).Invoke(transport);
}
else
{
if (Logger.IsEnabled(LogLevel.Error))
Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event");
}
return Task.CompletedTask;
}
Task BatchEventHandler(List<byte[]> list)
{
var transports = list.Select(bytes =>
{
var (success, transport) = EventBytesTransport.FromBytes(bytes);
if (!success)
{
if (Logger.IsEnabled(LogLevel.Error))
Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event");
}
return (success, transport);
}).Where(o => o.success)
.Select(o => (o.transport))
.ToList();
// 批量处理的时候 grain Id 取第一个 event的id
return GetObserver(EventHandlerType, transports.First().EventId).Invoke(transports);
}
}
static readonly ConcurrentDictionary<Type, Func<IClusterClient, string, string, IPoleEventHandler>> _observerGeneratorDict = new ConcurrentDictionary<Type, Func<IClusterClient, string, string, IPoleEventHandler>>();
private IPoleEventHandler GetObserver(Type ObserverType, string primaryKey)
{
var func = _observerGeneratorDict.GetOrAdd(ObserverType, key =>
{
var clientType = typeof(IClusterClient);
var clientParams = Expression.Parameter(clientType, "client");
var primaryKeyParams = Expression.Parameter(typeof(string), "primaryKey");
var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix");
var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) });
var body = Expression.Call(method.MakeGenericMethod(ObserverType), clientParams, primaryKeyParams, grainClassNamePrefixParams);
return Expression.Lambda<Func<IClusterClient, string, string, IPoleEventHandler>>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile();
});
return func(clusterClient, primaryKey, null);
}
}
public static class ClusterClientExtensions
{
public static TGrainInterface GetGrain<TGrainInterface>(IClusterClient client, string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
{
return client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
}
}
}