Skip to content
  • P
    Projects
  • G
    Groups
  • S
    Snippets
  • Help

丁松杰 / Pole

  • This project
    • Loading...
  • Sign in
Go to a project
  • Project
  • Repository
  • Issues 0
  • Merge Requests 0
  • Pipelines
  • Wiki
  • Snippets
  • Members
  • Activity
  • Graph
  • Charts
  • Create a new issue
  • Jobs
  • Commits
  • Issue Boards
  • Files
  • Commits
  • Branches
  • Tags
  • Contributors
  • Graph
  • Compare
  • Charts
Switch branch/tag
  • Pole
  • src
  • Pole.Core
  • EventBus
  • ObserverUnitContainer.cs
Find file
BlameHistoryPermalink
  • 丁松杰's avatar
    完成 consumer 消费时 调用 orleans grain , grain 的id 由 eventid 决定,每一个类型的 event 每毫秒支持生成… · 988776d9
    完成 consumer 消费时 调用 orleans grain , grain 的id 由 eventid 决定,每一个类型的 event 每毫秒支持生成 64个 event ,并且在 k8s集群里 全局唯一
    丁松杰 committed 5 years ago
    988776d9
ObserverUnitContainer.cs 2.97 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80
using Microsoft.Extensions.Logging;
using Pole.Core.Observer;
using Pole.Core.Utils;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Pole.Core.EventBus.EventHandler;
using System.Linq;
using Pole.Core.Exceptions;

namespace Pole.Core.EventBus
{
    /// <summary>
    /// Holds the observer units registered for each event name and builds the
    /// initial set by scanning assemblies for types marked with
    /// <see cref="EventHandlerAttribute"/>.
    /// </summary>
    public class ObserverUnitContainer : IObserverUnitContainer
    {
        // Event name -> units registered for that event.
        // NOTE(review): the dictionary is concurrent but the inner lists are plain
        // List<object>; registration appears to happen only during construction,
        // confirm before calling Register concurrently with readers.
        readonly ConcurrentDictionary<string, List<object>> unitDict = new ConcurrentDictionary<string, List<object>>();

        /// <summary>
        /// Discovers every type carrying an <see cref="EventHandlerAttribute"/> in the
        /// assemblies returned by <c>AssemblyHelper.GetAssemblies</c>, creates an
        /// <c>ObserverUnit&lt;string&gt;</c> for each one, calls <c>Observer()</c> on it and
        /// registers it under the attribute's <c>EventName</c>.
        /// </summary>
        /// <param name="serviceProvider">
        /// Provider used to resolve the logger and passed on to every created unit.
        /// </param>
        public ObserverUnitContainer(IServiceProvider serviceProvider)
        {
            var logger = serviceProvider.GetService<ILogger<ObserverUnitContainer>>();

            // Phase 1: collect all handler types first, so nothing is created or
            // registered if the assembly scan itself fails.
            var eventHandlerList = new List<(Type HandlerType, EventHandlerAttribute Attribute)>();
            foreach (var assembly in AssemblyHelper.GetAssemblies(logger))
            {
                foreach (var type in assembly.GetTypes())
                {
                    // Only the first EventHandlerAttribute on a type is considered.
                    var handlerAttribute = type.GetCustomAttributes(false)
                                               .OfType<EventHandlerAttribute>()
                                               .FirstOrDefault();
                    if (handlerAttribute != null)
                    {
                        eventHandlerList.Add((type, handlerAttribute));
                    }
                }
            }

            // Phase 2: create, initialise and register one unit per handler type.
            var unitType = typeof(ObserverUnit<string>);
            foreach (var (handlerType, handlerAttribute) in eventHandlerList)
            {
                var unit = (ObserverUnit<string>)Activator.CreateInstance(unitType, serviceProvider, handlerType);
                unit.Observer();
                Register<string>(handlerAttribute.EventName, unit);
            }
        }

        /// <summary>
        /// Returns the units registered under <paramref name="observerName"/>, typed as
        /// <c>IObserverUnit&lt;PrimaryKey&gt;</c>.
        /// </summary>
        /// <typeparam name="PrimaryKey">Primary key type the caller expects the units to use.</typeparam>
        /// <param name="observerName">Event name the units were registered under.</param>
        /// <returns>A new list containing the registered units, in registration order.</returns>
        /// <exception cref="UnfindObserverUnitException">No unit is registered under the name.</exception>
        /// <exception cref="UnmatchObserverUnitException">
        /// At least one registered unit is not an <c>IObserverUnit&lt;PrimaryKey&gt;</c>.
        /// </exception>
        public List<IObserverUnit<PrimaryKey>> GetUnits<PrimaryKey>(string observerName)
        {
            if (!unitDict.TryGetValue(observerName, out var units))
            {
                throw new UnfindObserverUnitException(observerName);
            }

            // The stored collection is a List<object>, which can never be type-tested
            // as List<IObserverUnit<PrimaryKey>> (generic classes are invariant), so the
            // check has to be made per element and the result copied into a typed list.
            var result = new List<IObserverUnit<PrimaryKey>>(units.Count);
            foreach (var unit in units)
            {
                if (unit is IObserverUnit<PrimaryKey> typedUnit)
                {
                    result.Add(typedUnit);
                }
                else
                {
                    throw new UnmatchObserverUnitException(observerName);
                }
            }
            return result;
        }

        /// <summary>
        /// Returns the untyped list of units registered under <paramref name="observerName"/>.
        /// </summary>
        /// <param name="observerName">Event name the units were registered under.</param>
        /// <returns>The stored list instance (not a copy).</returns>
        /// <exception cref="UnfindObserverUnitException">No unit is registered under the name.</exception>
        public List<object> GetUnits(string observerName)
        {
            if (!unitDict.TryGetValue(observerName, out var units))
            {
                throw new UnfindObserverUnitException(observerName);
            }
            return units;
        }

        /// <summary>
        /// Adds <paramref name="observerUnit"/> to the units registered under
        /// <paramref name="observerName"/>, creating the entry if it does not exist yet.
        /// </summary>
        /// <typeparam name="PrimaryKey">Not used by this implementation.</typeparam>
        /// <param name="observerName">Event name to register the unit under.</param>
        /// <param name="observerUnit">Unit to register.</param>
        /// <exception cref="ObserverUnitRepeatedException">
        /// The entry did not exist at lookup time but could not be added afterwards
        /// (another caller created it in between).
        /// </exception>
        public void Register<PrimaryKey>(string observerName, IGrainID observerUnit)
        {
            if (unitDict.TryGetValue(observerName, out List<object> units))
            {
                units.Add(observerUnit);
                // Must stop here: falling through to TryAdd would fail on the existing
                // key and throw for every second handler of the same event.
                return;
            }

            if (!unitDict.TryAdd(observerName, new List<object> { observerUnit }))
            {
                throw new ObserverUnitRepeatedException(observerUnit.EventHandlerType.FullName);
            }
        }

    }
}