From def023ac33b994fadb20fccadb3424cc92246a99 Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Tue, 7 Jan 2020 14:53:06 +0800 Subject: [PATCH] 优化 可靠消息 --- src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs | 22 ---------------------- src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTableManager.cs | 22 ++++++++++++++++++++++ src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs | 94 ---------------------------------------------------------------------------------------------- src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTableManager.cs | 94 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs | 2 +- src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs | 4 ++-- src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs | 4 ++-- src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs | 4 ++-- 8 files changed, 123 insertions(+), 123 deletions(-) delete mode 100644 src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs create mode 100644 src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTableManager.cs delete mode 100644 src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs create mode 100644 src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTableManager.cs diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs b/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs deleted file mode 100644 index 9f9b1ed..0000000 --- a/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTable.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Net; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.ReliableMessage.Storage.Abstraction -{ - public interface IMemberShipTable - { - Task IsPendingMessageCheckerServiceInstance(string ipAddress); - Task UpdateIAmAlive(string ipAddress, DateTime dateTime); - /// - /// 如果当前 超时时间内 没有可用 实例 返回 空 - /// - /// - /// - Task GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime); - - Task AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime); - } -} diff --git a/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTableManager.cs b/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTableManager.cs new file mode 100644 index 0000000..46ef040 --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Abstraction/IMemberShipTableManager.cs @@ -0,0 +1,22 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.ReliableMessage.Storage.Abstraction +{ + public interface IMemberShipTableManager + { + Task IsPendingMessageCheckerServiceInstance(string ipAddress); + Task UpdateIAmAlive(string ipAddress, DateTime dateTime); + /// + /// 如果当前 超时时间内 没有可用 实例 返回 空 + /// + /// + /// + Task GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime); + + Task AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime); + } +} diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs deleted file mode 100644 index 72f8ee4..0000000 --- a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTable.cs +++ /dev/null @@ -1,94 +0,0 @@ -using Pole.ReliableMessage.Abstraction; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using MongoDB.Driver; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; -using Pole.ReliableMessage.Storage.Abstraction; - -namespace Pole.ReliableMessage.Storage.Mongodb -{ - class MongodbMemberShipTable : IMemberShipTable - { - private readonly MongoClient _mongoClient; - private readonly MongodbOption _mongodbOption; - private readonly ILogger _logger; - public MongodbMemberShipTable(IConfiguration configuration, MongoClient mongoClient, IOptions mongodbOption, ILogger logger) - { - _mongoClient = mongoClient; - _mongodbOption = mongodbOption.Value; - _logger = logger; - } - private IMongoDatabase GetActiveMessageDatabase(string activeMessageDatabase) - { - return _mongoClient.GetDatabase(activeMessageDatabase); - } - private IMongoCollection GetCollection() - { - var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName); - var messageCollectionName = _mongodbOption.MembershipCollectionName; - var collection = database.GetCollection(messageCollectionName); - return collection; - } - public async Task AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime) - { - var collection = GetCollection(); - var deleteResult = await collection.DeleteManyAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName); - MemberShipTable memberShipTable = new MemberShipTable(_mongodbOption.ServiceCollectionName, ipAddress, aliveUTCTime); - await collection.InsertOneAsync(memberShipTable); - return true; - } - - public async Task GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime) - { - var collection = GetCollection(); - - var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.IAmAliveUTCTime >= iamAliveEndTime)).ToList(); - if (instances.Count > 1) - { - _logger.LogInformation($"Current time have {instances.Count} PendingMessageChecker in {_mongodbOption.ServiceCollectionName} service , I will delete the extra instances"); - var currentInstance = instances.FirstOrDefault(); - var extraInstances = instances.Remove(currentInstance); - instances.ForEach(async n => - { - await collection.DeleteOneAsync(m => m.Id == n.Id); - }); - _logger.LogInformation($"Extra PendingMessageChecker instances in {_mongodbOption.ServiceCollectionName} service deleted successfully"); - return currentInstance.PendingMessageCheckerIp; - } - else if (instances.Count == 1) - { - return instances.FirstOrDefault().PendingMessageCheckerIp; - } - else - { - return null; - } - } - - public async Task IsPendingMessageCheckerServiceInstance(string ipAddress) - { - var collection = GetCollection(); - - var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp== ipAddress)).FirstOrDefault(); - if (instances != null) - { - return true; - } - return false; - } - - public async Task UpdateIAmAlive(string ipAddress,DateTime dateTime) - { - var collection = GetCollection(); - var filter = Builders.Filter.Where(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp == ipAddress); - var update = Builders.Update.Set(m=>m.IAmAliveUTCTime,dateTime); - var result = await collection.UpdateOneAsync(filter, update); - return true; - } - } -} diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTableManager.cs b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTableManager.cs new file mode 100644 index 0000000..6350b55 --- /dev/null +++ b/src/Pole.ReliableMessage.Storage.Mongodb/MongodbMemberShipTableManager.cs @@ -0,0 +1,94 @@ +using Pole.ReliableMessage.Abstraction; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MongoDB.Driver; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Pole.ReliableMessage.Storage.Abstraction; + +namespace Pole.ReliableMessage.Storage.Mongodb +{ + class MongodbMemberShipTableManager : IMemberShipTableManager + { + private readonly MongoClient _mongoClient; + private readonly MongodbOption _mongodbOption; + private readonly ILogger _logger; + public MongodbMemberShipTableManager(IConfiguration configuration, MongoClient mongoClient, IOptions mongodbOption, ILogger logger) + { + _mongoClient = mongoClient; + _mongodbOption = mongodbOption.Value; + _logger = logger; + } + private IMongoDatabase GetActiveMessageDatabase(string activeMessageDatabase) + { + return _mongoClient.GetDatabase(activeMessageDatabase); + } + private IMongoCollection GetCollection() + { + var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName); + var messageCollectionName = _mongodbOption.MembershipCollectionName; + var collection = database.GetCollection(messageCollectionName); + return collection; + } + public async Task AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime) + { + var collection = GetCollection(); + var deleteResult = await collection.DeleteManyAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName); + MemberShipTable memberShipTable = new MemberShipTable(_mongodbOption.ServiceCollectionName, ipAddress, aliveUTCTime); + await collection.InsertOneAsync(memberShipTable); + return true; + } + + public async Task GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime) + { + var collection = GetCollection(); + + var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.IAmAliveUTCTime >= iamAliveEndTime)).ToList(); + if (instances.Count > 1) + { + _logger.LogInformation($"Current time have {instances.Count} PendingMessageChecker in {_mongodbOption.ServiceCollectionName} service , I will delete the extra instances"); + var currentInstance = instances.FirstOrDefault(); + var extraInstances = instances.Remove(currentInstance); + instances.ForEach(async n => + { + await collection.DeleteOneAsync(m => m.Id == n.Id); + }); + _logger.LogInformation($"Extra PendingMessageChecker instances in {_mongodbOption.ServiceCollectionName} service deleted successfully"); + return currentInstance.PendingMessageCheckerIp; + } + else if (instances.Count == 1) + { + return instances.FirstOrDefault().PendingMessageCheckerIp; + } + else + { + return null; + } + } + + public async Task IsPendingMessageCheckerServiceInstance(string ipAddress) + { + var collection = GetCollection(); + + var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp== ipAddress)).FirstOrDefault(); + if (instances != null) + { + return true; + } + return false; + } + + public async Task UpdateIAmAlive(string ipAddress,DateTime dateTime) + { + var collection = GetCollection(); + var filter = Builders.Filter.Where(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp == ipAddress); + var update = Builders.Update.Set(m=>m.IAmAliveUTCTime,dateTime); + var result = await collection.UpdateOneAsync(filter, update); + return true; + } + } +} diff --git a/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs b/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs index 13c51a4..e33c8c7 100644 --- a/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs +++ b/src/Pole.ReliableMessage.Storage.Mongodb/ReliableMessageOptionExtension.cs @@ -35,7 +35,7 @@ namespace Microsoft.Extensions.DependencyInjection { services.Configure(_mongodbOption); services.AddSingleton(); - services.AddSingleton(); + services.AddSingleton(); var mongodbOption = services.BuildServiceProvider().GetRequiredService>().Value; diff --git a/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs b/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs index 6396e14..7f176b1 100644 --- a/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs +++ b/src/Pole.ReliableMessage/Processor/MessageCleanProcessor.cs @@ -15,9 +15,9 @@ namespace Pole.ReliableMessage.Processor private readonly ReliableMessageOption _options; private readonly ILogger _logger; private readonly IMessageStorage _messageStorage; - private readonly IMemberShipTable _memberShipTable; + private readonly IMemberShipTableManager _memberShipTable; private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider; - public MessageCleanProcessor(IOptions options, ILogger logger, IMessageStorage messageStorage, IMemberShipTable memberShipTable, IServiceIPv4AddressProvider serviceIPv4AddressProvider) + public MessageCleanProcessor(IOptions options, ILogger logger, IMessageStorage messageStorage, IMemberShipTableManager memberShipTable, IServiceIPv4AddressProvider serviceIPv4AddressProvider) { _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); _logger = logger; diff --git a/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs b/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs index 224c221..c72b3e1 100644 --- a/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs +++ b/src/Pole.ReliableMessage/Processor/PendingMessageCheckProcessor.cs @@ -19,11 +19,11 @@ namespace Pole.ReliableMessage.Processor private readonly ReliableMessageOption _options; //private readonly IMessageBuffer _messageBuffer; private readonly ITimeHelper _timeHelper; - private readonly IMemberShipTable _memberShipTable; + private readonly IMemberShipTableManager _memberShipTable; private readonly ILogger _logger; private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider; private readonly IMessageCheckRetryer _messageCheckRetryer; - public PendingMessageCheckProcessor(IMessageStorage storage, IOptions options, ITimeHelper timeHelper, IMemberShipTable memberShipTable, ILogger logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider, IMessageCheckRetryer messageCheckRetryer) + public PendingMessageCheckProcessor(IMessageStorage storage, IOptions options, ITimeHelper timeHelper, IMemberShipTableManager memberShipTable, ILogger logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider, IMessageCheckRetryer messageCheckRetryer) { _storage = storage; _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); diff --git a/src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs b/src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs index a1d1e6d..07696b9 100644 --- a/src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs +++ b/src/Pole.ReliableMessage/Processor/PendingMessageServiceInstanceCheckProcessor.cs @@ -14,10 +14,10 @@ namespace Pole.ReliableMessage.Processor { private readonly ReliableMessageOption _options; private readonly ITimeHelper _timeHelper; - private readonly IMemberShipTable _memberShipTable; + private readonly IMemberShipTableManager _memberShipTable; private readonly ILogger _logger; private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider; - public PendingMessageServiceInstanceCheckProcessor(IOptions options, ITimeHelper timeHelper, IMemberShipTable memberShipTable, ILogger logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider) + public PendingMessageServiceInstanceCheckProcessor(IOptions options, ITimeHelper timeHelper, IMemberShipTableManager memberShipTable, ILogger logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider) { _options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected"); _timeHelper = timeHelper; -- libgit2 0.25.0