Skip to content
  • P
    Projects
  • G
    Groups
  • S
    Snippets
  • Help

丁松杰 / Pole

  • This project
    • Loading...
  • Sign in
Go to a project
  • Project
  • Repository
  • Issues 0
  • Merge Requests 0
  • Pipelines
  • Wiki
  • Snippets
  • Members
  • Activity
  • Graph
  • Charts
  • Create a new issue
  • Jobs
  • Commits
  • Issue Boards
  • Files
  • Commits
  • Branches
  • Tags
  • Contributors
  • Graph
  • Compare
  • Charts
Switch branch/tag
  • Pole
  • src
  • Pole.Core
  • EventBus
  • ObserverUnit.cs
Find file
BlameHistoryPermalink
  • dingsongjie's avatar
    精简代码 · 2c98d157
    dingsongjie committed 5 years ago
    2c98d157
ObserverUnit.cs 5.2 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
using Microsoft.Extensions.Logging;
using Orleans;
using Pole.Core.Serialization;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using System.Linq;
using Pole.Core.EventBus.Event;
using Orleans.Concurrency;
using System.Collections.Concurrent;
using System.Linq.Expressions;
using Pole.Core.EventBus.EventHandler;
using Pole.Core.Utils.Abstraction;

namespace Pole.Core.EventBus
{
    /// <summary>
    /// Binds an event handler type to the delegates that dispatch raw event payloads to it.
    /// </summary>
    /// <remarks>
    /// Call <see cref="Observer"/> once to build the delegates; until then
    /// <see cref="GetEventHandler"/> and <see cref="GetBatchEventHandler"/> return <c>null</c>.
    /// </remarks>
    /// <typeparam name="PrimaryKey">Type argument passed through to <see cref="IObserverUnit{PrimaryKey}"/>.</typeparam>
    public class ObserverUnit<PrimaryKey> : IObserverUnit<PrimaryKey>
    {
        readonly IServiceProvider serviceProvider;
        readonly ISerializer serializer;
        readonly IEventTypeFinder typeFinder;
        readonly IClusterClient clusterClient;
        Func<byte[], Task> eventHandler;
        Func<List<byte[]>, Task> batchEventHandler;

        /// <summary>Logger resolved from the service provider in the constructor.</summary>
        protected ILogger Logger { get; private set; }

        /// <summary>The handler type that events are dispatched to.</summary>
        public Type EventHandlerType { get; }

        /// <summary>
        /// Creates a unit for <paramref name="eventHandlerType"/>, resolving its collaborators
        /// (cluster client, serializer, event type finder, logger) from <paramref name="serviceProvider"/>.
        /// </summary>
        /// <param name="serviceProvider">Provider used to resolve the required services.</param>
        /// <param name="eventHandlerType">Handler type that will receive the events.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="serviceProvider"/> or <paramref name="eventHandlerType"/> is <c>null</c>.
        /// </exception>
        public ObserverUnit(IServiceProvider serviceProvider, Type eventHandlerType)
        {
            // Fail fast: a null handler type would otherwise only surface later as a
            // NullReferenceException inside Observer().
            this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
            EventHandlerType = eventHandlerType ?? throw new ArgumentNullException(nameof(eventHandlerType));

            clusterClient = serviceProvider.GetService<IClusterClient>();
            serializer = serviceProvider.GetService<ISerializer>();
            typeFinder = serviceProvider.GetService<IEventTypeFinder>();
            Logger = serviceProvider.GetService<ILogger<ObserverUnit<PrimaryKey>>>();
        }

        /// <summary>
        /// Convenience factory that creates a unit whose handler type is <typeparamref name="Grain"/>.
        /// </summary>
        /// <typeparam name="Grain">Grain class used as the event handler type.</typeparam>
        /// <param name="serviceProvider">Provider used to resolve the required services.</param>
        /// <returns>A new, not yet observed, unit.</returns>
        public static ObserverUnit<PrimaryKey> From<Grain>(IServiceProvider serviceProvider) where Grain : Orleans.Grain
        {
            return new ObserverUnit<PrimaryKey>(serviceProvider, typeof(Grain));
        }

        /// <summary>
        /// Returns the single-event dispatch delegate, or <c>null</c> if <see cref="Observer"/> has not been called.
        /// </summary>
        public Func<byte[], Task> GetEventHandler()
        {
            return eventHandler;
        }

        /// <summary>
        /// Returns the batch dispatch delegate, or <c>null</c> if <see cref="Observer"/> has not been called.
        /// </summary>
        public Func<List<byte[]>, Task> GetBatchEventHandler()
        {
            return batchEventHandler;
        }

        /// <summary>
        /// Builds the single-event and batch dispatch delegates for <see cref="EventHandlerType"/>.
        /// </summary>
        /// <exception cref="NotSupportedException">
        /// <see cref="EventHandlerType"/> is not assignable to <see cref="IPoleEventHandler"/>.
        /// </exception>
        public void Observer()
        {
            if (!typeof(IPoleEventHandler).IsAssignableFrom(EventHandlerType))
            {
                throw new NotSupportedException($"{EventHandlerType.FullName} must inheritance from PoleEventHandler");
            }

            eventHandler = EventHandler;
            batchEventHandler = BatchEventHandler;

            // Local functions

            // Parses one payload and forwards it to the handler addressed by the event's id.
            Task EventHandler(byte[] bytes)
            {
                var (success, transport) = EventBytesTransport.FromBytes(bytes);
                if (!success)
                {
                    LogNotAnEvent();
                    return Task.CompletedTask;
                }

                return GetObserver(EventHandlerType, transport.EventId).Invoke(transport);
            }

            // Parses every payload, drops (and logs) the ones that are not events, and forwards
            // the remainder to a single handler in one call.
            Task BatchEventHandler(List<byte[]> list)
            {
                if (list == null || list.Count == 0)
                {
                    return Task.CompletedTask;
                }

                var transports = list.Select(bytes =>
                {
                    var (success, transport) = EventBytesTransport.FromBytes(bytes);
                    if (!success)
                    {
                        LogNotAnEvent();
                    }
                    return (success, transport);
                }).Where(o => o.success)
                  .Select(o => (o.transport))
                  .ToList();

                // Nothing parsed successfully: there is no event id to address a handler with,
                // so skip the dispatch instead of throwing on an empty sequence.
                if (transports.Count == 0)
                {
                    return Task.CompletedTask;
                }

                // For batch processing the grain id is taken from the first event's id.
                return GetObserver(EventHandlerType, transports[0].EventId).Invoke(transports);
            }
        }

        /// <summary>Logs, at error level, that a received payload could not be parsed as an event.</summary>
        private void LogNotAnEvent()
        {
            // NOTE(review): nameof(...) evaluates to the literal text "EventId", so this message never
            // contains an actual id. The runtime text is intentionally kept unchanged here; the real
            // id is not available when parsing fails.
            if (Logger.IsEnabled(LogLevel.Error))
            {
                Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event");
            }
        }

        // Cache of compiled factories, keyed by handler type, so the reflection/expression
        // compilation cost is paid once per type.
        static readonly ConcurrentDictionary<Type, Func<IClusterClient, string, string, IPoleEventHandler>> _observerGeneratorDict = new ConcurrentDictionary<Type, Func<IClusterClient, string, string, IPoleEventHandler>>();

        /// <summary>
        /// Resolves the handler reference of type <paramref name="observerType"/> for
        /// <paramref name="primaryKey"/> through the cluster client.
        /// </summary>
        /// <param name="observerType">Handler type to resolve.</param>
        /// <param name="primaryKey">String key identifying the handler instance.</param>
        /// <returns>The handler reference returned by the cluster client.</returns>
        private IPoleEventHandler GetObserver(Type observerType, string primaryKey)
        {
            // The lambda uses only its own argument, so it captures nothing and no closure
            // is allocated on cache hits.
            var factory = _observerGeneratorDict.GetOrAdd(observerType, key => BuildObserverFactory(key));
            return factory(clusterClient, primaryKey, null);
        }

        /// <summary>
        /// Compiles <c>(client, primaryKey, grainClassNamePrefix) =&gt;
        /// ClusterClientExtensions.GetGrain&lt;observerType&gt;(client, primaryKey, grainClassNamePrefix)</c>.
        /// </summary>
        /// <param name="observerType">Type used as the generic argument of <c>GetGrain</c>.</param>
        /// <returns>The compiled factory delegate.</returns>
        private static Func<IClusterClient, string, string, IPoleEventHandler> BuildObserverFactory(Type observerType)
        {
            var clientType = typeof(IClusterClient);
            var clientParam = Expression.Parameter(clientType, "client");
            var primaryKeyParam = Expression.Parameter(typeof(string), "primaryKey");
            var grainClassNamePrefixParam = Expression.Parameter(typeof(string), "grainClassNamePrefix");

            var openMethod = typeof(ClusterClientExtensions).GetMethod(
                nameof(ClusterClientExtensions.GetGrain),
                new Type[] { clientType, typeof(string), typeof(string) });
            var call = Expression.Call(
                openMethod.MakeGenericMethod(observerType),
                clientParam,
                primaryKeyParam,
                grainClassNamePrefixParam);

            return Expression
                .Lambda<Func<IClusterClient, string, string, IPoleEventHandler>>(call, clientParam, primaryKeyParam, grainClassNamePrefixParam)
                .Compile();
        }
    }
    /// <summary>
    /// Static helpers around <see cref="IClusterClient"/>.
    /// </summary>
    public static class ClusterClientExtensions
    {
        /// <summary>
        /// Forwards to <c>client.GetGrain&lt;TGrainInterface&gt;(primaryKey, grainClassNamePrefix)</c>.
        /// </summary>
        /// <remarks>
        /// This is a plain static method (no <c>this</c> modifier). It is looked up by name and
        /// parameter types via reflection in <c>ObserverUnit.GetObserver</c>.
        /// </remarks>
        /// <typeparam name="TGrainInterface">Grain interface type keyed by a string.</typeparam>
        /// <param name="client">Cluster client used to resolve the grain.</param>
        /// <param name="primaryKey">String primary key of the grain.</param>
        /// <param name="grainClassNamePrefix">Optional class name prefix passed through to the client.</param>
        /// <returns>The grain reference returned by <paramref name="client"/>.</returns>
        public static TGrainInterface GetGrain<TGrainInterface>(IClusterClient client, string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey
            => client.GetGrain<TGrainInterface>(primaryKey, grainClassNamePrefix);
    }
}