Commit b9422953 by dingsongjie

性能部分 改进完成

parent 07435322
Showing with 451 additions and 97 deletions
...@@ -260,3 +260,4 @@ paket-files/ ...@@ -260,3 +260,4 @@ paket-files/
__pycache__/ __pycache__/
*.pyc *.pyc
/samples/apis/Backet.Api/Logger
...@@ -12,11 +12,13 @@ ...@@ -12,11 +12,13 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Dapper" Version="2.0.30" />
<PackageReference Include="Grpc.AspNetCore" Version="2.26.0" /> <PackageReference Include="Grpc.AspNetCore" Version="2.26.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="3.1.1"> <PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="3.1.1">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference> </PackageReference>
<PackageReference Include="Microsoft.Extensions.Logging.Log4Net.AspNetCore" Version="3.0.3" />
<PackageReference Include="Microsoft.Orleans.Client" Version="3.1.0" /> <PackageReference Include="Microsoft.Orleans.Client" Version="3.1.0" />
<!--<PackageReference Include="Microsoft.Orleans.CodeGenerator.MSBuild" Version="3.1.0"> <!--<PackageReference Include="Microsoft.Orleans.CodeGenerator.MSBuild" Version="3.1.0">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
...@@ -37,5 +39,10 @@ ...@@ -37,5 +39,10 @@
<ProjectReference Include="..\..\..\src\Pole.EventStorage.PostgreSql\Pole.EventStorage.PostgreSql.csproj" /> <ProjectReference Include="..\..\..\src\Pole.EventStorage.PostgreSql\Pole.EventStorage.PostgreSql.csproj" />
<ProjectReference Include="..\..\..\src\Pole.Orleans.Provider.EntityframeworkCore\Pole.Orleans.Provider.EntityframeworkCore.csproj" /> <ProjectReference Include="..\..\..\src\Pole.Orleans.Provider.EntityframeworkCore\Pole.Orleans.Provider.EntityframeworkCore.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Content Update="log4net.config">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
</ItemGroup>
</Project> </Project>
...@@ -2,14 +2,25 @@ ...@@ -2,14 +2,25 @@
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Linq.Expressions; using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Backet.Api.Domain.Event;
using Backet.Api.EventHandlers.Abstraction; using Backet.Api.EventHandlers.Abstraction;
using Backet.Api.Grains.Abstraction; using Backet.Api.Grains.Abstraction;
using Backet.Api.Infrastructure;
using Dapper;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Npgsql;
using Orleans; using Orleans;
using Pole.Core.EventBus; using Pole.Core.EventBus;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventHandler; using Pole.Core.EventBus.EventHandler;
using Pole.Core.EventBus.EventStorage;
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
namespace Backet.Api.Controllers namespace Backet.Api.Controllers
{ {
...@@ -18,26 +29,38 @@ namespace Backet.Api.Controllers ...@@ -18,26 +29,38 @@ namespace Backet.Api.Controllers
public class BacketController : ControllerBase public class BacketController : ControllerBase
{ {
private readonly IClusterClient clusterClient; private readonly IClusterClient clusterClient;
public BacketController(IClusterClient clusterClient) private readonly ILogger logger;
private readonly IProducerInfoContainer producerContainer;
private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer;
private readonly ISnowflakeIdGenerator snowflakeIdGenerator;
private readonly IEventBuffer eventBuffer;
public BacketController(IClusterClient clusterClient, ILogger<BacketController> logger, IProducerInfoContainer producerContainer,
IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator, IEventBuffer eventBuffer)
{ {
this.clusterClient = clusterClient; this.clusterClient = clusterClient;
this.logger = logger;
this.producerContainer = producerContainer;
this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer;
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.eventBuffer = eventBuffer;
} }
[HttpPost("api/backet/AddBacket")] [HttpPost("api/backet/AddBacket")]
public Task<bool> AddBacket([FromBody]Backet.Api.Grains.Abstraction.BacketDto backet) public Task<bool> AddBacket([FromBody]Backet.Api.Grains.Abstraction.BacketDto backet)
{ {
var newId = Guid.NewGuid().ToString("N").ToLower(); var newId = Guid.NewGuid().ToString("N").ToLower();
backet.Id = newId; backet.Id = newId;
//var entity = await backetDbContext.Backets.AsNoTracking().Include(box => box.BacketItems).SingleOrDefaultAsync(m => m.Id == "222");
////using (NpgsqlConnection conn = new NpgsqlConnection("Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600"))
////{
//// await conn.OpenAsync();
//// var teams = await conn.QueryAsync<Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>("SELECT * FROM \"public\".\"Backet\" where \"Id\" =@Id", new { Id = newId });
//// //var teams = await conn.ExecuteAsync("SELECT 1");
////}
var grain = clusterClient.GetGrain<IBacketGrain>(newId); var grain = clusterClient.GetGrain<IBacketGrain>(newId);
return grain.AddBacket(backet); return grain.AddBacket(backet);
//var clientType = typeof(IClusterClient);
//var clientParams = Expression.Parameter(clientType, "client");
//var primaryKeyParams = Expression.Parameter(typeof(string), "primaryKey");
//var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix");
//var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) });
//var body = Expression.Call(method.MakeGenericMethod(typeof(IToNoticeBacketCreatedEventHandler)), clientParams, primaryKeyParams, grainClassNamePrefixParams);
//var func = Expression.Lambda<Func<IClusterClient, string, string, IPoleEventHandler>>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile();
//var handler = func(clusterClient, newId, null);
//await handler.Invoke(null);
//return true; //return true;
} }
[HttpPost("api/backet/UpdateBacket")] [HttpPost("api/backet/UpdateBacket")]
......
...@@ -9,6 +9,8 @@ using Microsoft.Extensions.Hosting; ...@@ -9,6 +9,8 @@ using Microsoft.Extensions.Hosting;
using Orleans; using Orleans;
using Orleans.Hosting; using Orleans.Hosting;
using Pole.Orleans.Provider.EntityframeworkCore; using Pole.Orleans.Provider.EntityframeworkCore;
using Microsoft.Extensions.Logging;
namespace Backet.Api namespace Backet.Api
{ {
public class Program public class Program
...@@ -28,6 +30,14 @@ namespace Backet.Api ...@@ -28,6 +30,14 @@ namespace Backet.Api
.ConfigureWebHostDefaults(webBuilder => .ConfigureWebHostDefaults(webBuilder =>
{ {
webBuilder.UseStartup<Startup>(); webBuilder.UseStartup<Startup>();
}); })
.ConfigureLogging((hostingContext, logging) =>
{
// The ILoggingBuilder minimum level determines the
// the lowest possible level for logging. The log4net
// level then sets the level that we actually log at.
logging.AddLog4Net();
logging.SetMinimumLevel(LogLevel.Warning);
});
} }
} }
...@@ -4,7 +4,7 @@ ...@@ -4,7 +4,7 @@
"commandName": "Project", "commandName": "Project",
"applicationUrl": "http://localhost:5000", "applicationUrl": "http://localhost:5000",
"environmentVariables": { "environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development" //"ASPNETCORE_ENVIRONMENT": "Development"
} }
} }
} }
......
...@@ -40,7 +40,7 @@ namespace Backet.Api ...@@ -40,7 +40,7 @@ namespace Backet.Api
services.ConfigureGrainStorageOptions<BacketDbContext, BacketGrain, Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>( services.ConfigureGrainStorageOptions<BacketDbContext, BacketGrain, Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>(
options => options =>
{ {
options.UseQuery(context => context.Backets options.UseQuery(context => context.Backets.AsNoTracking()
.Include(box => box.BacketItems)); .Include(box => box.BacketItems));
options.IsRelatedData = true; options.IsRelatedData = true;
}); });
......
...@@ -8,7 +8,7 @@ ...@@ -8,7 +8,7 @@
} }
}, },
"postgres": { "postgres": {
"write": "Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;Pooling=false;MinPoolSize=20;MaxPoolSize=500;" "write": "Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;MinPoolSize=20;MaxPoolSize=500;"
}, },
"ServiceName": "Backet", "ServiceName": "Backet",
"RabbitmqConfig": { "RabbitmqConfig": {
......
<?xml version="1.0" encoding="utf-8" ?>
<log4net>
<appender name="RollingFileAppender" type="log4net.Appender.RollingFileAppender" >
<layout type="log4net.Layout.PatternLayout">
<conversionPattern value="%date [%thread] %-5level %logger - %message%newline" />
</layout>
<param name="File" value="Logger/"/>
<param name="AppendToFile" value="true" />
<!--输出的日志不会覆盖以前的信息-->
<param name="MaxSizeRollBackups" value="100" />
<!--备份文件的个数-->
<param name="MaxFileSize" value="10240" />
<!--当个日志文件的最大大小-->
<param name="StaticLogFileName" value="false" />
<!--是否使用静态文件名-->
<param name="DatePattern" value="yyyyMMdd&quot;.log&quot;" />
<!--日志文件名-->
</appender>
<root>
<level value="Warn"/>
<appender-ref ref="RollingFileAppender" />
</root>
</log4net>
\ No newline at end of file
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Pole.Core.EventBus
{
class EventBuffer : IEventBuffer
{
readonly BufferBlock<EventEntity> buffer = new BufferBlock<EventEntity>();
private int autoConsuming = 0;
private readonly ILogger logger;
/// <summary>
/// 批量数据处理每次处理的最大数据量
/// </summary>
private readonly int maxBatchSize = 10000;
/// <summary>
/// 批量数据接收的最大延时
/// </summary>
private readonly int maxMillisecondsDelay = 2000;
private readonly IProducerInfoContainer producerContainer;
private readonly IProducer producer;
private readonly IEventStorage eventStorage;
private readonly PoleOptions options;
private Task<bool> waitToReadTask;
public EventBuffer(ILogger<EventBuffer> logger, IProducerInfoContainer producerContainer, IProducer producer, IEventStorage eventStorage, IOptions<PoleOptions> options)
{
this.logger = logger;
this.producerContainer = producerContainer;
this.producer = producer;
this.eventStorage = eventStorage;
this.options = options.Value;
}
public async Task<bool> AddAndRun(EventEntity eventEntity)
{
if (!buffer.Post(eventEntity))
return await buffer.SendAsync(eventEntity);
if (autoConsuming == 0)
ActiveAutoExecute();
return true;
}
private void ActiveAutoExecute()
{
if (autoConsuming == 0)
ThreadPool.QueueUserWorkItem(ActiveConsumer);
async void ActiveConsumer(object state)
{
if (Interlocked.CompareExchange(ref autoConsuming, 1, 0) == 0)
{
try
{
while (await WaitToReadAsync())
{
try
{
await Execute();
}
catch (Exception ex)
{
logger.LogError(ex, ex.Message);
}
}
}
finally
{
Interlocked.Exchange(ref autoConsuming, 0);
}
}
}
}
public async Task<bool> WaitToReadAsync()
{
waitToReadTask = buffer.OutputAvailableAsync();
return await waitToReadTask;
}
public async Task Execute()
{
if (waitToReadTask.IsCompletedSuccessfully && waitToReadTask.Result)
{
var dataList = new List<EventEntity>();
var startTime = DateTimeOffset.UtcNow;
while (buffer.TryReceive(out var value))
{
dataList.Add(value);
if (dataList.Count > maxBatchSize)
{
break;
}
else if ((DateTimeOffset.UtcNow - startTime).TotalMilliseconds > maxMillisecondsDelay)
{
break;
}
}
if (dataList.Count > 0)
{
await ExecuteCore(dataList);
}
}
}
private async Task ExecuteCore(List<EventEntity> eventEntities)
{
logger.LogError($"Begin ExecuteCore Count:{eventEntities.Count} ");
var events = eventEntities.Select(entity =>
{
var eventContentBytes = Encoding.UTF8.GetBytes(entity.Content);
var bytesTransport = new EventBytesTransport(entity.Name, entity.Id, eventContentBytes);
var bytes = bytesTransport.GetBytes();
var targetName = producerContainer.GetTargetName(entity.Name);
entity.StatusName = nameof(EventStatus.Published);
entity.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds);
return (targetName, bytes);
});
eventEntities.ForEach(entity =>
{
entity.StatusName = nameof(EventStatus.Published);
entity.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds);
});
logger.LogError($"Begin BulkPublish {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} ");
await producer.BulkPublish(events);
logger.LogError($"Begin BulkPublish {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} ");
if (eventEntities.Count > 10)
{
await eventStorage.BulkChangePublishStateAsync(eventEntities);
}
else
{
await eventStorage.ChangePublishStateAsync(eventEntities);
}
logger.LogError($"End ExecuteCore {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} Count:{eventEntities.Count} ");
}
}
}
...@@ -9,7 +9,8 @@ namespace Pole.Core.EventBus.EventStorage ...@@ -9,7 +9,8 @@ namespace Pole.Core.EventBus.EventStorage
public interface IEventStorage public interface IEventStorage
{ {
Task ChangePublishStateAsync(EventEntity message, EventStatus state); Task ChangePublishStateAsync(EventEntity message, EventStatus state);
Task BulkChangePublishStateAsync(IEnumerable<EventEntity> messages); Task ChangePublishStateAsync(IEnumerable<EventEntity> messages);
Task BulkChangePublishStateAsync(IEnumerable<EventEntity> events);
Task<bool> StoreMessage(EventEntity eventEntity, object dbTransaction = null); Task<bool> StoreMessage(EventEntity eventEntity, object dbTransaction = null);
......
using Pole.Core.EventBus.EventStorage;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
{
public interface IEventBuffer
{
Task<bool> AddAndRun(EventEntity eventEntity);
}
}
using System.Threading.Tasks; using System.Collections.Generic;
using System.Threading.Tasks;
namespace Pole.Core.EventBus namespace Pole.Core.EventBus
{ {
public interface IProducer public interface IProducer
{ {
ValueTask Publish(byte[] bytes); ValueTask Publish(string targetName, byte[] bytes);
ValueTask BulkPublish(IEnumerable<(string,byte[])> events);
} }
} }
...@@ -3,9 +3,8 @@ using System.Threading.Tasks; ...@@ -3,9 +3,8 @@ using System.Threading.Tasks;
namespace Pole.Core.EventBus namespace Pole.Core.EventBus
{ {
public interface IProducerContainer public interface IProducerInfoContainer
{ {
ValueTask<IProducer> GetProducer<T>(); string GetTargetName(string typeName);
ValueTask<IProducer> GetProducer(string typeName);
} }
} }
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Exceptions
{
public class AddEventToEventBufferException: Exception
{
public AddEventToEventBufferException() : base("Errors when add event to the event buffer ")
{
}
}
}
...@@ -24,6 +24,7 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -24,6 +24,7 @@ namespace Microsoft.Extensions.DependencyInjection
services.Configure<PoleOptions>(option => { }); services.Configure<PoleOptions>(option => { });
} }
services.AddSingleton<IEventTypeFinder, EventTypeFinder>(); services.AddSingleton<IEventTypeFinder, EventTypeFinder>();
services.AddSingleton<IEventBuffer, EventBuffer>();
services.AddTransient(typeof(IMpscChannel<>), typeof(MpscChannel<>)); services.AddTransient(typeof(IMpscChannel<>), typeof(MpscChannel<>));
services.AddScoped<IBus, Bus>(); services.AddScoped<IBus, Bus>();
services.AddScoped<IUnitOfWork, Pole.Core.UnitOfWork.UnitOfWork>(); services.AddScoped<IUnitOfWork, Pole.Core.UnitOfWork.UnitOfWork>();
......
...@@ -37,17 +37,14 @@ namespace Pole.Core.Processor ...@@ -37,17 +37,14 @@ namespace Pole.Core.Processor
{ {
try try
{ {
var tables = new[] var tables = new[] { initializer.GetTableName() };
{
initializer.GetTableName(),
};
foreach (var table in tables) foreach (var table in tables)
{ {
logger.LogDebug($"Collecting expired data from table: {table}"); logger.LogDebug($"Collecting expired data from table: {table}");
int deletedCount; int deletedCount;
var time = DateTime.Now; var time = DateTime.UtcNow;
do do
{ {
deletedCount = await eventstorage.DeleteExpiresAsync(table, time, ItemBatch, context.CancellationToken); deletedCount = await eventstorage.DeleteExpiresAsync(table, time, ItemBatch, context.CancellationToken);
...@@ -59,7 +56,7 @@ namespace Pole.Core.Processor ...@@ -59,7 +56,7 @@ namespace Pole.Core.Processor
} while (deletedCount != 0); } while (deletedCount != 0);
} }
} }
catch(Exception ex) catch (Exception ex)
{ {
logger.LogError(ex, $"{nameof(ExpiredEventsCollectorProcessor)} Process Error"); logger.LogError(ex, $"{nameof(ExpiredEventsCollectorProcessor)} Process Error");
} }
......
...@@ -16,12 +16,14 @@ namespace Pole.Core.Processor ...@@ -16,12 +16,14 @@ namespace Pole.Core.Processor
{ {
private readonly IEventStorage eventStorage; private readonly IEventStorage eventStorage;
private readonly PoleOptions options; private readonly PoleOptions options;
private readonly IProducerContainer producerContainer; private readonly IProducerInfoContainer producerContainer;
private readonly ISerializer serializer; private readonly ISerializer serializer;
private readonly ILogger<PendingMessageRetryProcessor> logger; private readonly ILogger<PendingMessageRetryProcessor> logger;
private readonly ProducerOptions producerOptions; private readonly ProducerOptions producerOptions;
private readonly IProducer producer;
private readonly IEventBuffer eventBuffer;
public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions<PoleOptions> options, ILogger<PendingMessageRetryProcessor> logger, public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions<PoleOptions> options, ILogger<PendingMessageRetryProcessor> logger,
IProducerContainer producerContainer, ISerializer serializer, IOptions<ProducerOptions> producerOptions) IProducerInfoContainer producerContainer, ISerializer serializer, IOptions<ProducerOptions> producerOptions, IProducer producer, IEventBuffer eventBuffer)
{ {
this.eventStorage = eventStorage; this.eventStorage = eventStorage;
this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected"); this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected");
...@@ -29,6 +31,8 @@ namespace Pole.Core.Processor ...@@ -29,6 +31,8 @@ namespace Pole.Core.Processor
this.producerContainer = producerContainer; this.producerContainer = producerContainer;
this.serializer = serializer; this.serializer = serializer;
this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected"); this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected");
this.producer = producer;
this.eventBuffer = eventBuffer;
} }
public override string Name => nameof(PendingMessageRetryProcessor); public override string Name => nameof(PendingMessageRetryProcessor);
...@@ -67,15 +71,20 @@ namespace Pole.Core.Processor ...@@ -67,15 +71,20 @@ namespace Pole.Core.Processor
pendingMessage.ExpiresAt = DateTime.UtcNow; pendingMessage.ExpiresAt = DateTime.UtcNow;
} }
pendingMessage.Retries++; pendingMessage.Retries++;
var producer = await producerContainer.GetProducer(pendingMessage.Name); var targetName = producerContainer.GetTargetName(pendingMessage.Name);
await producer.Publish(bytes); await producer.Publish(targetName, bytes);
pendingMessage.StatusName = nameof(EventStatus.Published);
pendingMessage.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds);
} }
if (pendingMessages.Count() > 0) if (pendingMessages.Count() > 0)
{ {
await eventStorage.BulkChangePublishStateAsync(pendingMessages); if (pendingMessages.Count() > 10)
} {
await eventStorage.BulkChangePublishStateAsync(pendingMessages);
}
else
{
await eventStorage.ChangePublishStateAsync(pendingMessages);
}
}
} }
} }
} }
...@@ -13,24 +13,24 @@ using Pole.Core.EventBus.Event; ...@@ -13,24 +13,24 @@ using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage; using Pole.Core.EventBus.EventStorage;
using Microsoft.Extensions.Options; using Microsoft.Extensions.Options;
using Pole.Core.Utils.Abstraction; using Pole.Core.Utils.Abstraction;
using Pole.Core.Exceptions;
namespace Pole.Core.UnitOfWork namespace Pole.Core.UnitOfWork
{ {
class UnitOfWork : IUnitOfWork class UnitOfWork : IUnitOfWork
{ {
private readonly IProducerContainer producerContainer; private readonly IProducerInfoContainer producerContainer;
private readonly IEventTypeFinder eventTypeFinder; private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer; private readonly ISerializer serializer;
private readonly IEventStorage eventStorage;
private readonly PoleOptions options;
private IBus bus; private IBus bus;
public UnitOfWork(IProducerContainer producerContainer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IEventStorage eventStorage, IOptions<PoleOptions> options) private IEventBuffer eventBuffer;
public UnitOfWork(IProducerInfoContainer producerContainer, IEventTypeFinder eventTypeFinder,
ISerializer serializer, IEventBuffer eventBuffer)
{ {
this.producerContainer = producerContainer; this.producerContainer = producerContainer;
this.eventTypeFinder = eventTypeFinder; this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer; this.serializer = serializer;
this.eventStorage = eventStorage; this.eventBuffer = eventBuffer;
this.options = options.Value;
} }
public async Task CompeleteAsync(CancellationToken cancellationToken = default) public async Task CompeleteAsync(CancellationToken cancellationToken = default)
...@@ -45,12 +45,12 @@ namespace Pole.Core.UnitOfWork ...@@ -45,12 +45,12 @@ namespace Pole.Core.UnitOfWork
var eventContentBytes = Encoding.UTF8.GetBytes(@event.Content); var eventContentBytes = Encoding.UTF8.GetBytes(@event.Content);
var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes); var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes);
var bytes = bytesTransport.GetBytes(); var bytes = bytesTransport.GetBytes();
var producer = await producerContainer.GetProducer(@event.Name); var result = await eventBuffer.AddAndRun(@event);
await producer.Publish(bytes); if (!result)
@event.StatusName = nameof(EventStatus.Published); {
@event.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); throw new AddEventToEventBufferException();
}
}); });
await eventStorage.BulkChangePublishStateAsync(bufferedEvents);
} }
public void Dispose() public void Dispose()
......
...@@ -26,6 +26,7 @@ namespace Pole.EventBus.RabbitMQ ...@@ -26,6 +26,7 @@ namespace Pole.EventBus.RabbitMQ
if (channels.Count < Options.MasChannelsPerConnection) if (channels.Count < Options.MasChannelsPerConnection)
{ {
var channel = new ModelWrapper(this, connection.CreateModel()); var channel = new ModelWrapper(this, connection.CreateModel());
channel.Model.ConfirmSelect();
channels.Add(channel); channels.Add(channel);
return (true, channel); return (true, channel);
} }
......
...@@ -41,7 +41,6 @@ namespace Pole.EventBus.RabbitMQ ...@@ -41,7 +41,6 @@ namespace Pole.EventBus.RabbitMQ
{ {
noPersistentProperties.Headers = headers; noPersistentProperties.Headers = headers;
} }
Model.ConfirmSelect();
Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg); Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg);
if (!Model.WaitForConfirms(TimeSpan.FromSeconds(Connection.Options.ProducerConfirmWaitTimeoutSeconds), out bool isTimeout)) if (!Model.WaitForConfirms(TimeSpan.FromSeconds(Connection.Options.ProducerConfirmWaitTimeoutSeconds), out bool isTimeout))
{ {
...@@ -56,22 +55,13 @@ namespace Pole.EventBus.RabbitMQ ...@@ -56,22 +55,13 @@ namespace Pole.EventBus.RabbitMQ
} }
} }
public void WaitForConfirmsOrDie(TimeSpan timeSpan)
{
Model.WaitForConfirmsOrDie(timeSpan);
}
public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true) public void Publish(byte[] msg, string exchange, string routingKey, bool persistent = true)
{ {
Model.ConfirmSelect();
Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg); Model.BasicPublish(exchange, routingKey, persistent ? persistentProperties : noPersistentProperties, msg);
if (!Model.WaitForConfirms(TimeSpan.FromSeconds(Connection.Options.ProducerConfirmWaitTimeoutSeconds), out bool isTimeout))
{
if (isTimeout)
{
throw new ProducerConfirmTimeOutException(Connection.Options.ProducerConfirmWaitTimeoutSeconds);
}
else
{
throw new ProducerReceivedNAckException();
}
}
} }
public void Dispose() public void Dispose()
{ {
......
...@@ -9,7 +9,7 @@ namespace Pole.EventBus.RabbitMQ ...@@ -9,7 +9,7 @@ namespace Pole.EventBus.RabbitMQ
public string Password { get; set; } public string Password { get; set; }
public string VirtualHost { get; set; } = "/"; public string VirtualHost { get; set; } = "/";
public int Port { get; set; } = 5672; public int Port { get; set; } = 5672;
public int MasChannelsPerConnection { get; set; } = 200; public int MasChannelsPerConnection { get; set; } = 512;
/// <summary> /// <summary>
/// 目前为一个连接 当消息数量非常大时,单个TCP连接的运输能力有限,可以修改这个最大连接数提高运输能力 /// 目前为一个连接 当消息数量非常大时,单个TCP连接的运输能力有限,可以修改这个最大连接数提高运输能力
/// </summary> /// </summary>
......
...@@ -16,7 +16,7 @@ using System.Linq; ...@@ -16,7 +16,7 @@ using System.Linq;
namespace Pole.EventBus.RabbitMQ namespace Pole.EventBus.RabbitMQ
{ {
public class EventBusContainer : IRabbitEventBusContainer, IProducerContainer public class EventBusContainer : IRabbitEventBusContainer, IProducerInfoContainer
{ {
private readonly ConcurrentDictionary<string, RabbitEventBus> eventBusDictionary = new ConcurrentDictionary<string, RabbitEventBus>(); private readonly ConcurrentDictionary<string, RabbitEventBus> eventBusDictionary = new ConcurrentDictionary<string, RabbitEventBus>();
private readonly List<RabbitEventBus> eventBusList = new List<RabbitEventBus>(); private readonly List<RabbitEventBus> eventBusList = new List<RabbitEventBus>();
...@@ -79,25 +79,17 @@ namespace Pole.EventBus.RabbitMQ ...@@ -79,25 +79,17 @@ namespace Pole.EventBus.RabbitMQ
readonly ConcurrentDictionary<string, IProducer> producerDict = new ConcurrentDictionary<string, IProducer>(); readonly ConcurrentDictionary<string, IProducer> producerDict = new ConcurrentDictionary<string, IProducer>();
public string GetTargetName(string typeName)
public ValueTask<IProducer> GetProducer(string typeName)
{ {
if (eventBusDictionary.TryGetValue(typeName, out var eventBus)) if (eventBusDictionary.TryGetValue(typeName, out var eventBus))
{ {
return new ValueTask<IProducer>(producerDict.GetOrAdd(typeName, key => return $"{rabbitOptions.Prefix}{eventBus.Exchange}";
{
return new RabbitProducer(rabbitMQClient, eventBus, rabbitOptions);
}));
} }
else else
{ {
throw new NotImplementedException($"{nameof(IProducer)} of {typeName}"); throw new NotImplementedException($"{nameof(RabbitEventBus)} of {typeName}");
} }
} }
public ValueTask<IProducer> GetProducer<T>()
{
return GetProducer(typeof(T).FullName);
}
public List<IConsumer> GetConsumers() public List<IConsumer> GetConsumers()
{ {
var result = new List<IConsumer>(); var result = new List<IConsumer>();
...@@ -160,6 +152,7 @@ namespace Pole.EventBus.RabbitMQ ...@@ -160,6 +152,7 @@ namespace Pole.EventBus.RabbitMQ
} }
} }
#endregion #endregion
} }
} }
...@@ -22,7 +22,8 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -22,7 +22,8 @@ namespace Microsoft.Extensions.DependencyInjection
startupOption.Services.AddSingleton<IRabbitMQClient, RabbitMQClient>(); startupOption.Services.AddSingleton<IRabbitMQClient, RabbitMQClient>();
//startupOption.Services.AddHostedService<ConsumerManager>(); //startupOption.Services.AddHostedService<ConsumerManager>();
startupOption.Services.AddSingleton<IRabbitEventBusContainer, EventBusContainer>(); startupOption.Services.AddSingleton<IRabbitEventBusContainer, EventBusContainer>();
startupOption.Services.AddSingleton(serviceProvider => serviceProvider.GetService<IRabbitEventBusContainer>() as IProducerContainer); startupOption.Services.AddSingleton<IProducer, RabbitProducer>();
startupOption.Services.AddSingleton(serviceProvider => serviceProvider.GetService<IRabbitEventBusContainer>() as IProducerInfoContainer);
Startup.Register(async serviceProvider => Startup.Register(async serviceProvider =>
{ {
var container = serviceProvider.GetService<IRabbitEventBusContainer>(); var container = serviceProvider.GetService<IRabbitEventBusContainer>();
......
using Pole.Core; using Microsoft.Extensions.Options;
using Pole.Core;
using Pole.Core.EventBus; using Pole.Core.EventBus;
using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Pole.EventBus.RabbitMQ namespace Pole.EventBus.RabbitMQ
{ {
public class RabbitProducer : IProducer public class RabbitProducer : IProducer
{ {
readonly RabbitEventBus publisher;
readonly IRabbitMQClient rabbitMQClient; readonly IRabbitMQClient rabbitMQClient;
readonly RabbitOptions rabbitOptions; readonly RabbitOptions rabbitOptions;
public RabbitProducer( public RabbitProducer(
IRabbitMQClient rabbitMQClient, IRabbitMQClient rabbitMQClient,
RabbitEventBus publisher, IOptions<RabbitOptions> rabbitOptions)
RabbitOptions rabbitOptions)
{ {
this.publisher = publisher;
this.rabbitMQClient = rabbitMQClient; this.rabbitMQClient = rabbitMQClient;
this.rabbitOptions = rabbitOptions; this.rabbitOptions = rabbitOptions.Value;
} }
public ValueTask Publish(byte[] bytes)
public ValueTask BulkPublish(IEnumerable<(string, byte[])> events)
{
using (var channel = rabbitMQClient.PullChannel())
{
events.ToList().ForEach(@event =>
{
channel.Publish(@event.Item2, @event.Item1, string.Empty, true);
});
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(rabbitOptions.ProducerConfirmWaitTimeoutSeconds));
}
return Consts.ValueTaskDone;
}
public ValueTask Publish(string targetName, byte[] bytes)
{ {
using var channel = rabbitMQClient.PullChannel(); using (var channel = rabbitMQClient.PullChannel())
channel.Publish(bytes, $"{rabbitOptions.Prefix}{publisher.Exchange}", string.Empty, publisher.Persistent); {
channel.Publish(bytes, targetName, string.Empty, true);
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(rabbitOptions.ProducerConfirmWaitTimeoutSeconds));
}
return Consts.ValueTaskDone; return Consts.ValueTaskDone;
} }
} }
......
using Npgsql;
using NpgsqlTypes;
using Pole.Core.EventBus.EventStorage;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
namespace Pole.EventStorage.PostgreSql
{
public class PoleNpgsqlBulkUploader
{
private readonly NpgsqlConnection npgsqlConnection;
private static int tablesCounter = 0;
private static string uniqueTablePrefix = Guid.NewGuid().ToString().Replace("-", "_");
public PoleNpgsqlBulkUploader(NpgsqlConnection npgsqlConnection)
{
this.npgsqlConnection = npgsqlConnection;
}
public async Task UpdateAsync(string tableName, IEnumerable<EventEntity> eventEntities)
{
await npgsqlConnection.OpenAsync();
using (var transaction = await npgsqlConnection.BeginTransactionAsync())
{
var tempTableName = GetUniqueName("_temp_");
// 1. Create temp table
var sql = $"CREATE TEMP TABLE {tempTableName} ON COMMIT DROP AS SELECT \"Retries\" , \"ExpiresAt\" , \"StatusName\" , \"Id\" FROM {tableName} LIMIT 0";
await npgsqlConnection.ExecuteAsync(sql);
// 2. Import into temp table
using (var importer = npgsqlConnection.BeginBinaryImport($"COPY {tempTableName} (\"Retries\" , \"ExpiresAt\" , \"StatusName\" , \"Id\") FROM STDIN (FORMAT BINARY)"))
{
foreach (var item in eventEntities)
{
importer.StartRow();
importer.Write(item.Retries);
if (item.ExpiresAt.HasValue)
{
importer.Write(item.ExpiresAt.Value, NpgsqlDbType.Timestamp);
}
else
{
importer.Write(DBNull.Value);
}
importer.Write(item.StatusName, NpgsqlDbType.Varchar);
importer.Write(item.Id, NpgsqlDbType.Varchar);
}
importer.Complete();
}
// 3. Insert into real table from temp one
sql = $"UPDATE {tableName} target SET \"Retries\" = \"source\".\"Retries\" , \"ExpiresAt\" = \"source\".\"ExpiresAt\" , \"StatusName\" = \"source\".\"StatusName\" FROM {tempTableName} as source WHERE \"target\".\"Id\" = \"source\".\"Id\"";
await npgsqlConnection.ExecuteAsync(sql);
// 5. Commit
transaction?.Commit();
}
}
/// <summary>
/// Get unique object name using user-defined prefix.
/// </summary>
/// <param name="prefix">Prefix.</param>
/// <returns>Unique name.</returns>
static string GetUniqueName(string prefix)
{
var counter = Interlocked.Increment(ref tablesCounter);
return $"{prefix}_{uniqueTablePrefix}_{counter}";
}
}
}
...@@ -28,18 +28,28 @@ namespace Pole.EventStorage.PostgreSql ...@@ -28,18 +28,28 @@ namespace Pole.EventStorage.PostgreSql
tableName = eventStorageInitializer.GetTableName(); tableName = eventStorageInitializer.GetTableName();
} }
public async Task BulkChangePublishStateAsync(IEnumerable<EventEntity> events) public async Task ChangePublishStateAsync(IEnumerable<EventEntity> events)
{ {
var sql = var sql =
$"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\" IN (@Ids)"; $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\" = @Id";
using var connection = new NpgsqlConnection(options.ConnectionString); using (var connection = new NpgsqlConnection(options.ConnectionString))
await connection.ExecuteAsync(sql, events.Select(@event=> new
{ {
Ids =string.Join(',',events.Select(@event=>@event.Id).ToArray()), var result = await connection.ExecuteAsync(sql, events.Select(@event => new
@event.Retries, {
@event.ExpiresAt, Id = @event.Id,
@event.StatusName @event.Retries,
}).ToList()); @event.ExpiresAt,
@event.StatusName
}).ToList());
}
}
public async Task BulkChangePublishStateAsync(IEnumerable<EventEntity> events)
{
using (var connection = new NpgsqlConnection(options.ConnectionString))
{
var uploader = new PoleNpgsqlBulkUploader(connection);
await uploader.UpdateAsync(tableName, events);
}
} }
public async Task ChangePublishStateAsync(EventEntity message, EventStatus state) public async Task ChangePublishStateAsync(EventEntity message, EventStatus state)
...@@ -79,8 +89,8 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN ...@@ -79,8 +89,8 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN
result.Add(new EventEntity result.Add(new EventEntity
{ {
Id = reader.GetString(0), Id = reader.GetString(0),
Name=reader.GetString(2), Name = reader.GetString(2),
Content=reader.GetString(3), Content = reader.GetString(3),
Retries = reader.GetInt32(4), Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5) Added = reader.GetDateTime(5)
}); });
......
...@@ -54,7 +54,6 @@ namespace Pole.Orleans.Provider.EntityframeworkCore ...@@ -54,7 +54,6 @@ namespace Pole.Orleans.Provider.EntityframeworkCore
{ {
TEntity entity = await _options.ReadStateAsync(context, grainReference) TEntity entity = await _options.ReadStateAsync(context, grainReference)
.ConfigureAwait(false); .ConfigureAwait(false);
_options.SetEntity(grainState, entity); _options.SetEntity(grainState, entity);
if (entity != null && _options.CheckForETag) if (entity != null && _options.CheckForETag)
......
...@@ -2,10 +2,12 @@ ...@@ -2,10 +2,12 @@
using BenchmarkDotNet.Attributes; using BenchmarkDotNet.Attributes;
using Microsoft.EntityFrameworkCore; using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Npgsql;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Dapper;
namespace Pole.Samples.Backet.Api.Benchmarks namespace Pole.Samples.Backet.Api.Benchmarks
{ {
...@@ -15,7 +17,7 @@ namespace Pole.Samples.Backet.Api.Benchmarks ...@@ -15,7 +17,7 @@ namespace Pole.Samples.Backet.Api.Benchmarks
public GrainWithEntityframeworkCoreAndPgTest() public GrainWithEntityframeworkCoreAndPgTest()
{ {
var services = new ServiceCollection(); var services = new ServiceCollection();
services.AddDbContextPool<BacketDbContext>(options => options.UseNpgsql("Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;Pooling=false;MinPoolSize=20;MaxPoolSize=500;")); services.AddDbContextPool<BacketDbContext>(options => options.UseNpgsql("Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;MinPoolSize=20;MaxPoolSize=500;"));
serviceProvider = services.BuildServiceProvider(); serviceProvider = services.BuildServiceProvider();
} }
[Benchmark] [Benchmark]
...@@ -28,5 +30,15 @@ namespace Pole.Samples.Backet.Api.Benchmarks ...@@ -28,5 +30,15 @@ namespace Pole.Samples.Backet.Api.Benchmarks
var entity = await context.Backets.Include(box => box.BacketItems).SingleOrDefaultAsync(m => m.Id == "222"); var entity = await context.Backets.Include(box => box.BacketItems).SingleOrDefaultAsync(m => m.Id == "222");
} }
} }
[Benchmark]
public async Task DapperOpenConnection()
{
using (NpgsqlConnection conn = new NpgsqlConnection("Server=192.168.0.251;Port=5432;Username=postgres;Password=comteck2020!@#;Database=smartretail-tenant;Enlist=True;"))
{
await conn.OpenAsync();
//var teams = await conn.QueryAsync<Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>("SELECT * FROM \"public\".\"Backet\" where \"Id\" =@Id", new { Id = newId });
var teams = await conn.ExecuteAsync("SELECT 1");
}
}
} }
} }
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\samples\apis\Backet.Api\Backet.Api.csproj" /> <ProjectReference Include="..\..\samples\apis\Backet.Api\Backet.Api.csproj" />
<ProjectReference Include="..\..\src\Pole.EventStorage.PostgreSql\Pole.EventStorage.PostgreSql.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>
using BenchmarkDotNet.Reports; using BenchmarkDotNet.Reports;
using BenchmarkDotNet.Running; using BenchmarkDotNet.Running;
using Npgsql;
using Pole.Core.EventBus.EventStorage;
using Pole.Samples.Backet.Api.Benchmarks; using Pole.Samples.Backet.Api.Benchmarks;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
...@@ -14,8 +16,15 @@ namespace Pole.Samples.Backet.Api ...@@ -14,8 +16,15 @@ namespace Pole.Samples.Backet.Api
{ {
//GrainWithEntityframeworkCoreAndPgTest grainWithEntityframeworkCoreAndPgTest = new GrainWithEntityframeworkCoreAndPgTest(); //GrainWithEntityframeworkCoreAndPgTest grainWithEntityframeworkCoreAndPgTest = new GrainWithEntityframeworkCoreAndPgTest();
//await grainWithEntityframeworkCoreAndPgTest.SingleOrDefaultAsync(); //await grainWithEntityframeworkCoreAndPgTest.SingleOrDefaultAsync();
Summary summary = BenchmarkRunner.Run<GrainWithEntityframeworkCoreAndPgTest>(); //Summary summary = BenchmarkRunner.Run<GrainWithEntityframeworkCoreAndPgTest>();
Console.ReadLine(); //Console.ReadLine();
using ( var connection = new NpgsqlConnection("Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;MinPoolSize=20;MaxPoolSize=500;"))
{
var uploader = new Pole.EventStorage.PostgreSql.PoleNpgsqlBulkUploader(connection);
var events = new List<EventEntity>();
events.Add(new EventEntity { Id = "111", Retries = 20, ExpiresAt = DateTime.Now, StatusName = "333" });
await uploader.UpdateAsync("\"pole\".\"Events\"", events);
}
} }
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment