From b9422953b29d8ca47cb54455651b4c65218ac353 Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Wed, 26 Feb 2020 15:28:12 +0800 Subject: [PATCH] 性能部分 改进完成 --- .gitignore | 1 + samples/apis/Backet.Api/Backet.Api.csproj | 7 +++++++ samples/apis/Backet.Api/Controllers/BacketController.cs | 43 +++++++++++++++++++++++++++++++++---------- samples/apis/Backet.Api/Program.cs | 12 +++++++++++- samples/apis/Backet.Api/Properties/launchSettings.json | 2 +- samples/apis/Backet.Api/Startup.cs | 2 +- samples/apis/Backet.Api/appsettings.json | 2 +- samples/apis/Backet.Api/log4net.config | 23 +++++++++++++++++++++++ src/Pole.Core/EventBus/EventBuffer.cs | 143 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/EventBus/EventStorage/IEventStorage.cs | 3 ++- src/Pole.Core/EventBus/IEventBuffer.cs | 13 +++++++++++++ src/Pole.Core/EventBus/IProducer.cs | 6 ++++-- src/Pole.Core/EventBus/IProducerContainer.cs | 11 ----------- src/Pole.Core/EventBus/IProducerInfoContainer.cs | 10 ++++++++++ src/Pole.Core/Exceptions/AddEventToEventBufferException.cs | 14 ++++++++++++++ src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs | 1 + src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs | 9 +++------ src/Pole.Core/Processor/PendingMessageRetryProcessor.cs | 25 +++++++++++++++++-------- src/Pole.Core/UnitOfWork/UnitOfWork.cs | 22 +++++++++++----------- src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs | 1 + src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs | 18 ++++-------------- src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs | 2 +- src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs | 17 +++++------------ src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs | 3 ++- src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs | 36 +++++++++++++++++++++++++++--------- src/Pole.EventStorage.PostgreSql/PoleNpgsqlBulkUploader.cs | 76 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs | 32 +++++++++++++++++++++----------- src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs | 1 - test/Pole.Samples.Backet.Api/Benchmarks/GrainWithEntityframeworkCoreAndPgTest.cs | 14 +++++++++++++- test/Pole.Samples.Backet.Api/Pole.Samples.Backet.Api.csproj | 1 + test/Pole.Samples.Backet.Api/Program.cs | 13 +++++++++++-- 31 files changed, 458 insertions(+), 105 deletions(-) create mode 100644 samples/apis/Backet.Api/log4net.config create mode 100644 src/Pole.Core/EventBus/EventBuffer.cs create mode 100644 src/Pole.Core/EventBus/IEventBuffer.cs delete mode 100644 src/Pole.Core/EventBus/IProducerContainer.cs create mode 100644 src/Pole.Core/EventBus/IProducerInfoContainer.cs create mode 100644 src/Pole.Core/Exceptions/AddEventToEventBufferException.cs create mode 100644 src/Pole.EventStorage.PostgreSql/PoleNpgsqlBulkUploader.cs diff --git a/.gitignore b/.gitignore index 64f27d9..67f5dde 100644 --- a/.gitignore +++ b/.gitignore @@ -260,3 +260,4 @@ paket-files/ __pycache__/ *.pyc +/samples/apis/Backet.Api/Logger diff --git a/samples/apis/Backet.Api/Backet.Api.csproj b/samples/apis/Backet.Api/Backet.Api.csproj index 27d5320..211eb50 100644 --- a/samples/apis/Backet.Api/Backet.Api.csproj +++ b/samples/apis/Backet.Api/Backet.Api.csproj @@ -12,11 +12,13 @@ + all runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/Pole.Core/EventBus/EventBuffer.cs b/src/Pole.Core/EventBus/EventBuffer.cs new file mode 100644 index 0000000..5b6b4f7 --- /dev/null +++ b/src/Pole.Core/EventBus/EventBuffer.cs @@ -0,0 +1,143 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Pole.Core.EventBus.Event; +using Pole.Core.EventBus.EventStorage; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; + +namespace Pole.Core.EventBus +{ + class EventBuffer : IEventBuffer + { + readonly BufferBlock buffer = new BufferBlock(); + private int autoConsuming = 0; + private readonly ILogger logger; + /// + /// 批量数据处理每次处理的最大数据量 + /// + private readonly int maxBatchSize = 10000; + /// + /// 批量数据接收的最大延时 + /// + private readonly int maxMillisecondsDelay = 2000; + private readonly IProducerInfoContainer producerContainer; + private readonly IProducer producer; + private readonly IEventStorage eventStorage; + private readonly PoleOptions options; + private Task waitToReadTask; + public EventBuffer(ILogger logger, IProducerInfoContainer producerContainer, IProducer producer, IEventStorage eventStorage, IOptions options) + { + this.logger = logger; + this.producerContainer = producerContainer; + this.producer = producer; + this.eventStorage = eventStorage; + this.options = options.Value; + } + public async Task AddAndRun(EventEntity eventEntity) + { + if (!buffer.Post(eventEntity)) + return await buffer.SendAsync(eventEntity); + if (autoConsuming == 0) + ActiveAutoExecute(); + + return true; + } + private void ActiveAutoExecute() + { + if (autoConsuming == 0) + ThreadPool.QueueUserWorkItem(ActiveConsumer); + async void ActiveConsumer(object state) + { + if (Interlocked.CompareExchange(ref autoConsuming, 1, 0) == 0) + { + try + { + while (await WaitToReadAsync()) + { + try + { + await Execute(); + } + catch (Exception ex) + { + logger.LogError(ex, ex.Message); + } + } + } + finally + { + Interlocked.Exchange(ref autoConsuming, 0); + } + } + } + } + public async Task WaitToReadAsync() + { + waitToReadTask = buffer.OutputAvailableAsync(); + return await waitToReadTask; + + } + public async Task Execute() + { + if (waitToReadTask.IsCompletedSuccessfully && waitToReadTask.Result) + { + var dataList = new List(); + var startTime = DateTimeOffset.UtcNow; + while (buffer.TryReceive(out var value)) + { + dataList.Add(value); + if (dataList.Count > maxBatchSize) + { + break; + } + else if ((DateTimeOffset.UtcNow - startTime).TotalMilliseconds > maxMillisecondsDelay) + { + break; + } + } + if (dataList.Count > 0) + { + await ExecuteCore(dataList); + } + + } + } + private async Task ExecuteCore(List eventEntities) + { + logger.LogError($"Begin ExecuteCore Count:{eventEntities.Count} "); + var events = eventEntities.Select(entity => + { + var eventContentBytes = Encoding.UTF8.GetBytes(entity.Content); + var bytesTransport = new EventBytesTransport(entity.Name, entity.Id, eventContentBytes); + var bytes = bytesTransport.GetBytes(); + var targetName = producerContainer.GetTargetName(entity.Name); + entity.StatusName = nameof(EventStatus.Published); + entity.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); + return (targetName, bytes); + }); + eventEntities.ForEach(entity => + { + entity.StatusName = nameof(EventStatus.Published); + entity.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); + }); + logger.LogError($"Begin BulkPublish {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} "); + await producer.BulkPublish(events); + logger.LogError($"Begin BulkPublish {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} "); + if (eventEntities.Count > 10) + { + await eventStorage.BulkChangePublishStateAsync(eventEntities); + } + else + { + await eventStorage.ChangePublishStateAsync(eventEntities); + } + + logger.LogError($"End ExecuteCore {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} Count:{eventEntities.Count} "); + } + } +} diff --git a/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs b/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs index 8e7f4bc..5f45b0f 100644 --- a/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs +++ b/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs @@ -9,7 +9,8 @@ namespace Pole.Core.EventBus.EventStorage public interface IEventStorage { Task ChangePublishStateAsync(EventEntity message, EventStatus state); - Task BulkChangePublishStateAsync(IEnumerable messages); + Task ChangePublishStateAsync(IEnumerable messages); + Task BulkChangePublishStateAsync(IEnumerable events); Task StoreMessage(EventEntity eventEntity, object dbTransaction = null); diff --git a/src/Pole.Core/EventBus/IEventBuffer.cs b/src/Pole.Core/EventBus/IEventBuffer.cs new file mode 100644 index 0000000..b278016 --- /dev/null +++ b/src/Pole.Core/EventBus/IEventBuffer.cs @@ -0,0 +1,13 @@ +using Pole.Core.EventBus.EventStorage; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Core.EventBus +{ + public interface IEventBuffer + { + Task AddAndRun(EventEntity eventEntity); + } +} diff --git a/src/Pole.Core/EventBus/IProducer.cs b/src/Pole.Core/EventBus/IProducer.cs index dffc879..e215757 100644 --- a/src/Pole.Core/EventBus/IProducer.cs +++ b/src/Pole.Core/EventBus/IProducer.cs @@ -1,9 +1,11 @@ -using System.Threading.Tasks; +using System.Collections.Generic; +using System.Threading.Tasks; namespace Pole.Core.EventBus { public interface IProducer { - ValueTask Publish(byte[] bytes); + ValueTask Publish(string targetName, byte[] bytes); + ValueTask BulkPublish(IEnumerable<(string,byte[])> events); } } diff --git a/src/Pole.Core/EventBus/IProducerContainer.cs b/src/Pole.Core/EventBus/IProducerContainer.cs deleted file mode 100644 index 2006625..0000000 --- a/src/Pole.Core/EventBus/IProducerContainer.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus -{ - public interface IProducerContainer - { - ValueTask GetProducer(); - ValueTask GetProducer(string typeName); - } -} diff --git a/src/Pole.Core/EventBus/IProducerInfoContainer.cs b/src/Pole.Core/EventBus/IProducerInfoContainer.cs new file mode 100644 index 0000000..7838ca3 --- /dev/null +++ b/src/Pole.Core/EventBus/IProducerInfoContainer.cs @@ -0,0 +1,10 @@ +using System; +using System.Threading.Tasks; + +namespace Pole.Core.EventBus +{ + public interface IProducerInfoContainer + { + string GetTargetName(string typeName); + } +} diff --git a/src/Pole.Core/Exceptions/AddEventToEventBufferException.cs b/src/Pole.Core/Exceptions/AddEventToEventBufferException.cs new file mode 100644 index 0000000..bac385d --- /dev/null +++ b/src/Pole.Core/Exceptions/AddEventToEventBufferException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Exceptions +{ + public class AddEventToEventBufferException: Exception + { + public AddEventToEventBufferException() : base("Errors when add event to the event buffer ") + { + + } + } +} diff --git a/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs b/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs index eb184f5..6a538bf 100644 --- a/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs +++ b/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs @@ -24,6 +24,7 @@ namespace Microsoft.Extensions.DependencyInjection services.Configure(option => { }); } services.AddSingleton(); + services.AddSingleton(); services.AddTransient(typeof(IMpscChannel<>), typeof(MpscChannel<>)); services.AddScoped(); services.AddScoped(); diff --git a/src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs b/src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs index 72c25d9..08f19a7 100644 --- a/src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs +++ b/src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs @@ -37,17 +37,14 @@ namespace Pole.Core.Processor { try { - var tables = new[] -{ - initializer.GetTableName(), - }; + var tables = new[] { initializer.GetTableName() }; foreach (var table in tables) { logger.LogDebug($"Collecting expired data from table: {table}"); int deletedCount; - var time = DateTime.Now; + var time = DateTime.UtcNow; do { deletedCount = await eventstorage.DeleteExpiresAsync(table, time, ItemBatch, context.CancellationToken); @@ -59,7 +56,7 @@ namespace Pole.Core.Processor } while (deletedCount != 0); } } - catch(Exception ex) + catch (Exception ex) { logger.LogError(ex, $"{nameof(ExpiredEventsCollectorProcessor)} Process Error"); } diff --git a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs index d148170..54db6d7 100644 --- a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs +++ b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs @@ -16,12 +16,14 @@ namespace Pole.Core.Processor { private readonly IEventStorage eventStorage; private readonly PoleOptions options; - private readonly IProducerContainer producerContainer; + private readonly IProducerInfoContainer producerContainer; private readonly ISerializer serializer; private readonly ILogger logger; private readonly ProducerOptions producerOptions; + private readonly IProducer producer; + private readonly IEventBuffer eventBuffer; public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions options, ILogger logger, - IProducerContainer producerContainer, ISerializer serializer, IOptions producerOptions) + IProducerInfoContainer producerContainer, ISerializer serializer, IOptions producerOptions, IProducer producer, IEventBuffer eventBuffer) { this.eventStorage = eventStorage; this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected"); @@ -29,6 +31,8 @@ namespace Pole.Core.Processor this.producerContainer = producerContainer; this.serializer = serializer; this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected"); + this.producer = producer; + this.eventBuffer = eventBuffer; } public override string Name => nameof(PendingMessageRetryProcessor); @@ -67,15 +71,20 @@ namespace Pole.Core.Processor pendingMessage.ExpiresAt = DateTime.UtcNow; } pendingMessage.Retries++; - var producer = await producerContainer.GetProducer(pendingMessage.Name); - await producer.Publish(bytes); - pendingMessage.StatusName = nameof(EventStatus.Published); - pendingMessage.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); + var targetName = producerContainer.GetTargetName(pendingMessage.Name); + await producer.Publish(targetName, bytes); } if (pendingMessages.Count() > 0) { - await eventStorage.BulkChangePublishStateAsync(pendingMessages); - } + if (pendingMessages.Count() > 10) + { + await eventStorage.BulkChangePublishStateAsync(pendingMessages); + } + else + { + await eventStorage.ChangePublishStateAsync(pendingMessages); + } + } } } } diff --git a/src/Pole.Core/UnitOfWork/UnitOfWork.cs b/src/Pole.Core/UnitOfWork/UnitOfWork.cs index f54101e..b4e7ba1 100644 --- a/src/Pole.Core/UnitOfWork/UnitOfWork.cs +++ b/src/Pole.Core/UnitOfWork/UnitOfWork.cs @@ -13,24 +13,24 @@ using Pole.Core.EventBus.Event; using Pole.Core.EventBus.EventStorage; using Microsoft.Extensions.Options; using Pole.Core.Utils.Abstraction; +using Pole.Core.Exceptions; namespace Pole.Core.UnitOfWork { class UnitOfWork : IUnitOfWork { - private readonly IProducerContainer producerContainer; + private readonly IProducerInfoContainer producerContainer; private readonly IEventTypeFinder eventTypeFinder; private readonly ISerializer serializer; - private readonly IEventStorage eventStorage; - private readonly PoleOptions options; private IBus bus; - public UnitOfWork(IProducerContainer producerContainer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IEventStorage eventStorage, IOptions options) + private IEventBuffer eventBuffer; + public UnitOfWork(IProducerInfoContainer producerContainer, IEventTypeFinder eventTypeFinder, + ISerializer serializer, IEventBuffer eventBuffer) { this.producerContainer = producerContainer; this.eventTypeFinder = eventTypeFinder; this.serializer = serializer; - this.eventStorage = eventStorage; - this.options = options.Value; + this.eventBuffer = eventBuffer; } public async Task CompeleteAsync(CancellationToken cancellationToken = default) @@ -45,12 +45,12 @@ namespace Pole.Core.UnitOfWork var eventContentBytes = Encoding.UTF8.GetBytes(@event.Content); var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes); var bytes = bytesTransport.GetBytes(); - var producer = await producerContainer.GetProducer(@event.Name); - await producer.Publish(bytes); - @event.StatusName = nameof(EventStatus.Published); - @event.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); + var result = await eventBuffer.AddAndRun(@event); + if (!result) + { + throw new AddEventToEventBufferException(); + } }); - await eventStorage.BulkChangePublishStateAsync(bufferedEvents); } public void Dispose() diff --git a/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs b/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs index b1511aa..6bd9049 100644 --- a/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs +++ b/src/Pole.EventBus.Rabbitmq/Client/ConnectionWrapper.cs @@ -26,6 +26,7 @@ namespace Pole.EventBus.RabbitMQ if (channels.Count < Options.MasChannelsPerConnection) { var channel = new ModelWrapper(this, connection.CreateModel()); + channel.Model.ConfirmSelect(); channels.Add(channel); return (true, channel); } diff --git a/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs b/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs index 6539051..ed38ce4 100644 --- a/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs +++ b/src/Pole.EventBus.Rabbitmq/Client/ModelWrapper.cs @@ -41,7 +41,6 @@ namespace Pole.EventBus.RabbitMQ { noPersistentProperties.Headers = headers; } - Model.ConfirmSelect(); Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg); if (!Model.WaitForConfirms(TimeSpan.FromSeconds(Connection.Options.ProducerConfirmWaitTimeoutSeconds), out bool isTimeout)) { @@ -56,22 +55,13 @@ namespace Pole.EventBus.RabbitMQ } } + public void WaitForConfirmsOrDie(TimeSpan timeSpan) + { + Model.WaitForConfirmsOrDie(timeSpan); + } public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true) { - Model.ConfirmSelect(); Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg); - if (!Model.WaitForConfirms(TimeSpan.FromSeconds(Connection.Options.ProducerConfirmWaitTimeoutSeconds), out bool isTimeout)) - { - if (isTimeout) - { - throw new ProducerConfirmTimeOutException(Connection.Options.ProducerConfirmWaitTimeoutSeconds); - } - else - { - throw new ProducerReceivedNAckException(); - } - } - } public void Dispose() { diff --git a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs index 7a6b707..d3736e2 100644 --- a/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs +++ b/src/Pole.EventBus.Rabbitmq/Configuration/RabbitOptions.cs @@ -9,7 +9,7 @@ namespace Pole.EventBus.RabbitMQ public string Password { get; set; } public string VirtualHost { get; set; } = "/"; public int Port { get; set; } = 5672; - public int MasChannelsPerConnection { get; set; } = 200; + public int MasChannelsPerConnection { get; set; } = 512; /// /// 目前为一个连接 当消息数量非常大时,单个TCP连接的运输能力有限,可以修改这个最大连接数提高运输能力 /// diff --git a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs index 63ce567..989d0af 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs @@ -16,7 +16,7 @@ using System.Linq; namespace Pole.EventBus.RabbitMQ { - public class EventBusContainer : IRabbitEventBusContainer, IProducerContainer + public class EventBusContainer : IRabbitEventBusContainer, IProducerInfoContainer { private readonly ConcurrentDictionary eventBusDictionary = new ConcurrentDictionary(); private readonly List eventBusList = new List(); @@ -79,25 +79,17 @@ namespace Pole.EventBus.RabbitMQ readonly ConcurrentDictionary producerDict = new ConcurrentDictionary(); - - public ValueTask GetProducer(string typeName) + public string GetTargetName(string typeName) { if (eventBusDictionary.TryGetValue(typeName, out var eventBus)) { - return new ValueTask(producerDict.GetOrAdd(typeName, key => - { - return new RabbitProducer(rabbitMQClient, eventBus, rabbitOptions); - })); + return $"{rabbitOptions.Prefix}{eventBus.Exchange}"; } else { - throw new NotImplementedException($"{nameof(IProducer)} of {typeName}"); + throw new NotImplementedException($"{nameof(RabbitEventBus)} of {typeName}"); } } - public ValueTask GetProducer() - { - return GetProducer(typeof(T).FullName); - } public List GetConsumers() { var result = new List(); @@ -160,6 +152,7 @@ namespace Pole.EventBus.RabbitMQ } } + #endregion } } diff --git a/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs b/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs index 755fd33..ae40c95 100644 --- a/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs +++ b/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs @@ -22,7 +22,8 @@ namespace Microsoft.Extensions.DependencyInjection startupOption.Services.AddSingleton(); //startupOption.Services.AddHostedService(); startupOption.Services.AddSingleton(); - startupOption.Services.AddSingleton(serviceProvider => serviceProvider.GetService() as IProducerContainer); + startupOption.Services.AddSingleton(); + startupOption.Services.AddSingleton(serviceProvider => serviceProvider.GetService() as IProducerInfoContainer); Startup.Register(async serviceProvider => { var container = serviceProvider.GetService(); diff --git a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs index a3f2c06..368e6fc 100644 --- a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs +++ b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs @@ -1,28 +1,46 @@ -using Pole.Core; +using Microsoft.Extensions.Options; +using Pole.Core; using Pole.Core.EventBus; +using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; namespace Pole.EventBus.RabbitMQ { public class RabbitProducer : IProducer { - readonly RabbitEventBus publisher; readonly IRabbitMQClient rabbitMQClient; readonly RabbitOptions rabbitOptions; public RabbitProducer( IRabbitMQClient rabbitMQClient, - RabbitEventBus publisher, - RabbitOptions rabbitOptions) + IOptions rabbitOptions) { - this.publisher = publisher; this.rabbitMQClient = rabbitMQClient; - this.rabbitOptions = rabbitOptions; + this.rabbitOptions = rabbitOptions.Value; } - public ValueTask Publish(byte[] bytes) + + public ValueTask BulkPublish(IEnumerable<(string, byte[])> events) + { + using (var channel = rabbitMQClient.PullChannel()) + { + events.ToList().ForEach(@event => + { + channel.Publish(@event.Item2, @event.Item1, string.Empty, true); + }); + + channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(rabbitOptions.ProducerConfirmWaitTimeoutSeconds)); + } + return Consts.ValueTaskDone; + } + + public ValueTask Publish(string targetName, byte[] bytes) { - using var channel = rabbitMQClient.PullChannel(); - channel.Publish(bytes, $"{rabbitOptions.Prefix}{publisher.Exchange}", string.Empty, publisher.Persistent); + using (var channel = rabbitMQClient.PullChannel()) + { + channel.Publish(bytes, targetName, string.Empty, true); + channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(rabbitOptions.ProducerConfirmWaitTimeoutSeconds)); + } return Consts.ValueTaskDone; } } diff --git a/src/Pole.EventStorage.PostgreSql/PoleNpgsqlBulkUploader.cs b/src/Pole.EventStorage.PostgreSql/PoleNpgsqlBulkUploader.cs new file mode 100644 index 0000000..5f3c8cf --- /dev/null +++ b/src/Pole.EventStorage.PostgreSql/PoleNpgsqlBulkUploader.cs @@ -0,0 +1,76 @@ +using Npgsql; +using NpgsqlTypes; +using Pole.Core.EventBus.EventStorage; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Dapper; + +namespace Pole.EventStorage.PostgreSql +{ + public class PoleNpgsqlBulkUploader + { + private readonly NpgsqlConnection npgsqlConnection; + private static int tablesCounter = 0; + private static string uniqueTablePrefix = Guid.NewGuid().ToString().Replace("-", "_"); + + public PoleNpgsqlBulkUploader(NpgsqlConnection npgsqlConnection) + { + this.npgsqlConnection = npgsqlConnection; + } + public async Task UpdateAsync(string tableName, IEnumerable eventEntities) + { + await npgsqlConnection.OpenAsync(); + using (var transaction = await npgsqlConnection.BeginTransactionAsync()) + { + var tempTableName = GetUniqueName("_temp_"); + + // 1. Create temp table + var sql = $"CREATE TEMP TABLE {tempTableName} ON COMMIT DROP AS SELECT \"Retries\" , \"ExpiresAt\" , \"StatusName\" , \"Id\" FROM {tableName} LIMIT 0"; + await npgsqlConnection.ExecuteAsync(sql); + + // 2. Import into temp table + using (var importer = npgsqlConnection.BeginBinaryImport($"COPY {tempTableName} (\"Retries\" , \"ExpiresAt\" , \"StatusName\" , \"Id\") FROM STDIN (FORMAT BINARY)")) + { + foreach (var item in eventEntities) + { + importer.StartRow(); + importer.Write(item.Retries); + if (item.ExpiresAt.HasValue) + { + importer.Write(item.ExpiresAt.Value, NpgsqlDbType.Timestamp); + } + else + { + importer.Write(DBNull.Value); + } + + importer.Write(item.StatusName, NpgsqlDbType.Varchar); + importer.Write(item.Id, NpgsqlDbType.Varchar); + } + importer.Complete(); + } + + // 3. Insert into real table from temp one + sql = $"UPDATE {tableName} target SET \"Retries\" = \"source\".\"Retries\" , \"ExpiresAt\" = \"source\".\"ExpiresAt\" , \"StatusName\" = \"source\".\"StatusName\" FROM {tempTableName} as source WHERE \"target\".\"Id\" = \"source\".\"Id\""; + await npgsqlConnection.ExecuteAsync(sql); + // 5. Commit + transaction?.Commit(); + } + } + + /// + /// Get unique object name using user-defined prefix. + /// + /// Prefix. + /// Unique name. + static string GetUniqueName(string prefix) + { + var counter = Interlocked.Increment(ref tablesCounter); + return $"{prefix}_{uniqueTablePrefix}_{counter}"; + } + } +} diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs index 4bd6d32..ae4d464 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs @@ -28,18 +28,28 @@ namespace Pole.EventStorage.PostgreSql tableName = eventStorageInitializer.GetTableName(); } - public async Task BulkChangePublishStateAsync(IEnumerable events) + public async Task ChangePublishStateAsync(IEnumerable events) { var sql = -$"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\" IN (@Ids)"; - using var connection = new NpgsqlConnection(options.ConnectionString); - await connection.ExecuteAsync(sql, events.Select(@event=> new +$"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\" = @Id"; + using (var connection = new NpgsqlConnection(options.ConnectionString)) { - Ids =string.Join(',',events.Select(@event=>@event.Id).ToArray()), - @event.Retries, - @event.ExpiresAt, - @event.StatusName - }).ToList()); + var result = await connection.ExecuteAsync(sql, events.Select(@event => new + { + Id = @event.Id, + @event.Retries, + @event.ExpiresAt, + @event.StatusName + }).ToList()); + } + } + public async Task BulkChangePublishStateAsync(IEnumerable events) + { + using (var connection = new NpgsqlConnection(options.ConnectionString)) + { + var uploader = new PoleNpgsqlBulkUploader(connection); + await uploader.UpdateAsync(tableName, events); + } } public async Task ChangePublishStateAsync(EventEntity message, EventStatus state) @@ -79,8 +89,8 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN result.Add(new EventEntity { Id = reader.GetString(0), - Name=reader.GetString(2), - Content=reader.GetString(3), + Name = reader.GetString(2), + Content = reader.GetString(3), Retries = reader.GetInt32(4), Added = reader.GetDateTime(5) }); diff --git a/src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs b/src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs index 682ddea..73d8d74 100644 --- a/src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs +++ b/src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs @@ -54,7 +54,6 @@ namespace Pole.Orleans.Provider.EntityframeworkCore { TEntity entity = await _options.ReadStateAsync(context, grainReference) .ConfigureAwait(false); - _options.SetEntity(grainState, entity); if (entity != null && _options.CheckForETag) diff --git a/test/Pole.Samples.Backet.Api/Benchmarks/GrainWithEntityframeworkCoreAndPgTest.cs b/test/Pole.Samples.Backet.Api/Benchmarks/GrainWithEntityframeworkCoreAndPgTest.cs index eeeecfd..d57bed9 100644 --- a/test/Pole.Samples.Backet.Api/Benchmarks/GrainWithEntityframeworkCoreAndPgTest.cs +++ b/test/Pole.Samples.Backet.Api/Benchmarks/GrainWithEntityframeworkCoreAndPgTest.cs @@ -2,10 +2,12 @@ using BenchmarkDotNet.Attributes; using Microsoft.EntityFrameworkCore; using Microsoft.Extensions.DependencyInjection; +using Npgsql; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; +using Dapper; namespace Pole.Samples.Backet.Api.Benchmarks { @@ -15,7 +17,7 @@ namespace Pole.Samples.Backet.Api.Benchmarks public GrainWithEntityframeworkCoreAndPgTest() { var services = new ServiceCollection(); - services.AddDbContextPool(options => options.UseNpgsql("Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;Pooling=false;MinPoolSize=20;MaxPoolSize=500;")); + services.AddDbContextPool(options => options.UseNpgsql("Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;MinPoolSize=20;MaxPoolSize=500;")); serviceProvider = services.BuildServiceProvider(); } [Benchmark] @@ -28,5 +30,15 @@ namespace Pole.Samples.Backet.Api.Benchmarks var entity = await context.Backets.Include(box => box.BacketItems).SingleOrDefaultAsync(m => m.Id == "222"); } } + [Benchmark] + public async Task DapperOpenConnection() + { + using (NpgsqlConnection conn = new NpgsqlConnection("Server=192.168.0.251;Port=5432;Username=postgres;Password=comteck2020!@#;Database=smartretail-tenant;Enlist=True;")) + { + await conn.OpenAsync(); + //var teams = await conn.QueryAsync("SELECT * FROM \"public\".\"Backet\" where \"Id\" =@Id", new { Id = newId }); + var teams = await conn.ExecuteAsync("SELECT 1"); + } + } } } diff --git a/test/Pole.Samples.Backet.Api/Pole.Samples.Backet.Api.csproj b/test/Pole.Samples.Backet.Api/Pole.Samples.Backet.Api.csproj index 69bfcfb..2797744 100644 --- a/test/Pole.Samples.Backet.Api/Pole.Samples.Backet.Api.csproj +++ b/test/Pole.Samples.Backet.Api/Pole.Samples.Backet.Api.csproj @@ -14,6 +14,7 @@ + diff --git a/test/Pole.Samples.Backet.Api/Program.cs b/test/Pole.Samples.Backet.Api/Program.cs index 9280618..2adc1a8 100644 --- a/test/Pole.Samples.Backet.Api/Program.cs +++ b/test/Pole.Samples.Backet.Api/Program.cs @@ -1,5 +1,7 @@ using BenchmarkDotNet.Reports; using BenchmarkDotNet.Running; +using Npgsql; +using Pole.Core.EventBus.EventStorage; using Pole.Samples.Backet.Api.Benchmarks; using System; using System.Collections.Generic; @@ -14,8 +16,15 @@ namespace Pole.Samples.Backet.Api { //GrainWithEntityframeworkCoreAndPgTest grainWithEntityframeworkCoreAndPgTest = new GrainWithEntityframeworkCoreAndPgTest(); //await grainWithEntityframeworkCoreAndPgTest.SingleOrDefaultAsync(); - Summary summary = BenchmarkRunner.Run(); - Console.ReadLine(); + //Summary summary = BenchmarkRunner.Run(); + //Console.ReadLine(); + using ( var connection = new NpgsqlConnection("Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;MinPoolSize=20;MaxPoolSize=500;")) + { + var uploader = new Pole.EventStorage.PostgreSql.PoleNpgsqlBulkUploader(connection); + var events = new List(); + events.Add(new EventEntity { Id = "111", Retries = 20, ExpiresAt = DateTime.Now, StatusName = "333" }); + await uploader.UpdateAsync("\"pole\".\"Events\"", events); + } } } } -- libgit2 0.25.0