Skip to content
  • P
    Projects
  • G
    Groups
  • S
    Snippets
  • Help

丁松杰 / Pole

  • This project
    • Loading...
  • Sign in
Go to a project
  • Project
  • Repository
  • Issues 0
  • Merge Requests 0
  • Pipelines
  • Wiki
  • Snippets
  • Members
  • Activity
  • Graph
  • Charts
  • Create a new issue
  • Jobs
  • Commits
  • Issue Boards
  • Files
  • Commits
  • Branches
  • Tags
  • Contributors
  • Graph
  • Compare
  • Charts
Switch branch/tag
  • Pole
  • src
  • Pole.ReliableMessage.Storage.Mongodb
  • MongodbMemberShipTableManager.cs
Find file
BlameHistoryPermalink
  • 丁松杰's avatar
    优化 可靠消息 · def023ac
    丁松杰 committed 5 years ago
    def023ac
MongodbMemberShipTableManager.cs 4.2 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;

namespace Pole.ReliableMessage.Storage.Mongodb
{
    /// <summary>
    /// MongoDB-backed implementation of <see cref="IMemberShipTableManager"/>.
    /// Stores <see cref="MemberShipTable"/> documents in the collection named by
    /// <c>MongodbOption.MembershipCollectionName</c> inside the database named by
    /// <c>MongodbOption.MessageDatabaseName</c>, and scopes every query to the
    /// service name held in <c>MongodbOption.ServiceCollectionName</c>.
    /// </summary>
    class MongodbMemberShipTableManager : IMemberShipTableManager
    {
        private readonly MongoClient _mongoClient;
        private readonly MongodbOption _mongodbOption;
        private readonly ILogger _logger;

        /// <summary>
        /// Creates the manager from the injected MongoDB client, options and logger.
        /// </summary>
        /// <param name="configuration">Not used by this class; kept so the constructor signature stays unchanged.</param>
        /// <param name="mongoClient">Client used to resolve the membership database.</param>
        /// <param name="mongodbOption">Options wrapper whose <c>Value</c> supplies database, collection and service names.</param>
        /// <param name="logger">Logger used for informational messages.</param>
        public MongodbMemberShipTableManager(IConfiguration configuration, MongoClient mongoClient, IOptions<MongodbOption> mongodbOption, ILogger<MongodbMemberShipTableManager> logger)
        {
            _mongoClient = mongoClient;
            _mongodbOption = mongodbOption.Value;
            _logger = logger;
        }

        /// <summary>
        /// Resolves a database handle by name from the injected client.
        /// </summary>
        /// <param name="activeMessageDatabase">Name of the database to open.</param>
        private IMongoDatabase GetActiveMessageDatabase(string activeMessageDatabase)
        {
            return _mongoClient.GetDatabase(activeMessageDatabase);
        }

        /// <summary>
        /// Returns the membership collection configured in the options.
        /// </summary>
        private IMongoCollection<MemberShipTable> GetCollection()
        {
            var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName);
            return database.GetCollection<MemberShipTable>(_mongodbOption.MembershipCollectionName);
        }

        /// <summary>
        /// Deletes every membership document of the configured service and then
        /// inserts a single new document for <paramref name="ipAddress"/>.
        /// </summary>
        /// <param name="ipAddress">Address stored in the new membership document.</param>
        /// <param name="aliveUTCTime">Alive timestamp stored in the new membership document.</param>
        /// <returns>Always <c>true</c>; failures surface as exceptions from the driver.</returns>
        public async Task<bool> AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime)
        {
            var collection = GetCollection();
            var serviceName = _mongodbOption.ServiceCollectionName;

            // NOTE(review): delete and insert are two separate operations, not one atomic step.
            await collection.DeleteManyAsync(m => m.ServiceName == serviceName);
            await collection.InsertOneAsync(new MemberShipTable(serviceName, ipAddress, aliveUTCTime));
            return true;
        }

        /// <summary>
        /// Finds the membership documents of the configured service whose alive
        /// timestamp is at or after <paramref name="iamAliveEndTime"/> and returns
        /// the address of the first one. When more than one document matches, the
        /// remaining ones are deleted before returning.
        /// </summary>
        /// <param name="iamAliveEndTime">Lower bound (inclusive) for the alive timestamp.</param>
        /// <returns>The stored address of the first matching document, or <c>null</c> when none matches.</returns>
        public async Task<string> GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime)
        {
            var collection = GetCollection();
            var serviceName = _mongodbOption.ServiceCollectionName;

            var instances = await collection
                .Find(m => m.ServiceName == serviceName && m.IAmAliveUTCTime >= iamAliveEndTime)
                .ToListAsync();

            if (instances.Count == 0)
            {
                return null;
            }

            // The query has no explicit sort, so "first" is whatever order the server returned.
            var currentInstance = instances[0];
            if (instances.Count > 1)
            {
                _logger.LogInformation($"Current time have {instances.Count} PendingMessageChecker in {_mongodbOption.ServiceCollectionName} service , I will delete  the extra instances");

                // Await every delete: List<T>.ForEach with an async lambda would run them as
                // fire-and-forget async void calls, so the success log below could be written
                // (and the method could return) before the deletes finished or failed.
                foreach (var extraInstance in instances.Skip(1))
                {
                    var extraId = extraInstance.Id;
                    await collection.DeleteOneAsync(m => m.Id == extraId);
                }

                _logger.LogInformation($"Extra PendingMessageChecker instances in {_mongodbOption.ServiceCollectionName} service deleted successfully");
            }

            return currentInstance.PendingMessageCheckerIp;
        }

        /// <summary>
        /// Checks whether a membership document exists for the configured service
        /// with the given address.
        /// </summary>
        /// <param name="ipAddress">Address to look up.</param>
        /// <returns><c>true</c> when a matching document exists; otherwise <c>false</c>.</returns>
        public async Task<bool> IsPendingMessageCheckerServiceInstance(string ipAddress)
        {
            var collection = GetCollection();
            var serviceName = _mongodbOption.ServiceCollectionName;

            var instance = await collection
                .Find(m => m.ServiceName == serviceName && m.PendingMessageCheckerIp == ipAddress)
                .FirstOrDefaultAsync();
            return instance != null;
        }

        /// <summary>
        /// Sets the alive timestamp of the membership document that matches the
        /// configured service and the given address.
        /// </summary>
        /// <param name="ipAddress">Address identifying the document to update.</param>
        /// <param name="dateTime">New alive timestamp.</param>
        /// <returns>
        /// Always <c>true</c>. NOTE(review): the update result is not inspected, so
        /// <c>true</c> is returned even when no document matched — confirm callers rely on this.
        /// </returns>
        public async Task<bool> UpdateIAmAlive(string ipAddress,DateTime dateTime)
        {
            var collection = GetCollection();
            var serviceName = _mongodbOption.ServiceCollectionName;

            var filter = Builders<MemberShipTable>.Filter.Where(m => m.ServiceName == serviceName && m.PendingMessageCheckerIp == ipAddress);
            var update = Builders<MemberShipTable>.Update.Set(m => m.IAmAliveUTCTime, dateTime);
            await collection.UpdateOneAsync(filter, update);
            return true;
        }
    }
}