Skip to content
  • P
    Projects
  • G
    Groups
  • S
    Snippets
  • Help

丁松杰 / Pole

  • This project
    • Loading...
  • Sign in
Go to a project
  • Project
  • Repository
  • Issues 0
  • Merge Requests 0
  • Pipelines
  • Wiki
  • Snippets
  • Members
  • Activity
  • Graph
  • Charts
  • Create a new issue
  • Jobs
  • Commits
  • Issue Boards
  • Files
  • Commits
  • Branches
  • Tags
  • Contributors
  • Graph
  • Compare
  • Charts
Switch branch/tag
  • Pole
  • src
  • Pole.ReliableMessage.Storage.Mongodb
  • ReliableMessageOptionExtension.cs
Find file
BlameHistoryPermalink
  • 丁松杰's avatar
    优化 可靠消息 · def023ac
    丁松杰 committed 5 years ago
    def023ac
ReliableMessageOptionExtension.cs 5.47 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
using Pole.ReliableMessage;
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Pole.ReliableMessage.Storage.Mongodb;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.IdGenerators;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Pole.ReliableMessage.Storage.Abstraction;

namespace Microsoft.Extensions.DependencyInjection
{
    public static class ReliableMessageOptionExtension
    {
        public static ReliableMessageOption AddMongodb(this ReliableMessageOption option, Action<MongodbOption> mongodbOptionConfig)
        {
            option.ReliableMessageOptionExtensions.Add(new MongodbStorageExtension(mongodbOptionConfig));
            return option;
        }
    }
    public class MongodbStorageExtension : IReliableMessageOptionExtension
    {
        private readonly Action<MongodbOption> _mongodbOption;
        public MongodbStorageExtension(Action<MongodbOption> masstransitRabbitmqOption)
        {
            _mongodbOption = masstransitRabbitmqOption;
        }
        public void AddServices(IServiceCollection services)
        {
            services.Configure(_mongodbOption);
            services.AddSingleton<IMessageStorage, MongodbMessageStorage>();
            services.AddSingleton<IMemberShipTableManager, MongodbMemberShipTableManager>();

            var mongodbOption = services.BuildServiceProvider().GetRequiredService<IOptions<MongodbOption>>().Value;

            var servers = mongodbOption.Servers.Select(x => new MongoServerAddress(x.Host, x.Port)).ToList();
            var settings = new MongoClientSettings()
            {
                Servers = servers
            };
            var client = new MongoClient(settings);
            var database = client.GetDatabase(mongodbOption.MessageDatabaseName);

            AddMapper();

            InitCollection(mongodbOption, database);

            services.AddSingleton(client);
        }

        private static void InitCollection(MongodbOption mongodbOption, IMongoDatabase database)
        {
            var collectionNames = database.ListCollectionNames().ToList();

            if (!collectionNames.Contains(mongodbOption.ServiceCollectionName))
            {
                database.CreateCollection(mongodbOption.ServiceCollectionName);
                var messageCollection = database.GetCollection<Message>(mongodbOption.ServiceCollectionName);
                AddMessageCollectionIndex(messageCollection);
            }

            if (!collectionNames.Contains(mongodbOption.MembershipCollectionName))
            {
                database.CreateCollection(mongodbOption.MembershipCollectionName);
                var membershipCollection = database.GetCollection<MemberShipTable>(mongodbOption.MembershipCollectionName);
                AddMemberShipTableCollectionIndex(membershipCollection);
            }
        }

        private static void AddMessageCollectionIndex(IMongoCollection<Message> collection)
        {
            List<CreateIndexModel<Message>> createIndexModels = new List<CreateIndexModel<Message>>();

            //var nextRetryUTCTimeIndex = Builders<Message>.IndexKeys.Ascending(m => m.NextRetryUTCTime);
            //CreateIndexModel<Message> nextRetryUTCTimeIndexModel = new CreateIndexModel<Message>(nextRetryUTCTimeIndex, new CreateIndexOptions() { Background = true });
            //createIndexModels.Add(nextRetryUTCTimeIndexModel);

            var AddedUTCTimeUTCTimeIndex = Builders<Message>.IndexKeys.Ascending(m => m.AddedUTCTime);
            CreateIndexModel<Message> AddedUTCTimeIndexModel = new CreateIndexModel<Message>(AddedUTCTimeUTCTimeIndex, new CreateIndexOptions() { Background = true });
            createIndexModels.Add(AddedUTCTimeIndexModel);

            //var messageTypeIdIndex = Builders<Message>.IndexKeys.Ascending(m => m.MessageTypeId);
            //CreateIndexModel<Message> messageTypeIdIndexModel = new CreateIndexModel<Message>(messageTypeIdIndex, new CreateIndexOptions() { Background = true });
            //createIndexModels.Add(messageTypeIdIndexModel);

            collection.Indexes.CreateMany(createIndexModels);
        }
        private static void AddMemberShipTableCollectionIndex(IMongoCollection<MemberShipTable> collection)
        {
            List<CreateIndexModel<MemberShipTable>> createIndexMembershipModels = new List<CreateIndexModel<MemberShipTable>>();

            var serviceNameIndex = Builders<MemberShipTable>.IndexKeys.Ascending(m => m.ServiceName);
            CreateIndexModel<MemberShipTable> serviceNameIndexModel = new CreateIndexModel<MemberShipTable>(serviceNameIndex, new CreateIndexOptions() { Background = true, Unique = true });
            createIndexMembershipModels.Add(serviceNameIndexModel);

            collection.Indexes.CreateMany(createIndexMembershipModels);
        }

        private static void AddMapper()
        {
            BsonClassMap.RegisterClassMap<Message>(cm =>
            {
                cm.AutoMap();
                cm.UnmapMember(m => m.MessageStatus);
                cm.MapIdField(m => m.Id);
                cm.MapMember(m => m.NextRetryUTCTime).SetIsRequired(true);
            });
            BsonClassMap.RegisterClassMap<MemberShipTable>(cm =>
            {
                cm.AutoMap();
                cm.MapIdField(m => m.Id).SetIdGenerator(StringObjectIdGenerator.Instance);
            });
        }
    }
}