From c0303e023f696defd0d28fa8395d8e0d79633575 Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Fri, 14 Feb 2020 14:04:31 +0800 Subject: [PATCH] 添加 event 发布者 重试机制 --- src/Pole.Core/EventBus/Bus.cs | 7 +------ src/Pole.Core/EventBus/EventStorage/EventStatus.cs | 2 +- src/Pole.Core/EventBus/EventStorage/IEventStorage.cs | 3 ++- src/Pole.Core/PoleOptions.cs | 1 + src/Pole.Core/Processor/IProcessor.cs | 13 +++++++++++++ src/Pole.Core/Processor/LoopProcessor.cs | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Processor/PendingMessageRetryProcessor.cs | 79 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Core/Processor/ProcessingContext.cs | 17 +++++++++++++++++ src/Pole.Core/Processor/ProcessorBase.cs | 19 +++++++++++++++++++ src/Pole.Core/ProducerOptions.cs | 2 +- src/Pole.Core/UnitOfWork/UnitOfWork.cs | 3 ++- src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs | 20 ++++++++++++++++++-- 12 files changed, 202 insertions(+), 12 deletions(-) create mode 100644 src/Pole.Core/Processor/IProcessor.cs create mode 100644 src/Pole.Core/Processor/LoopProcessor.cs create mode 100644 src/Pole.Core/Processor/PendingMessageRetryProcessor.cs create mode 100644 src/Pole.Core/Processor/ProcessingContext.cs create mode 100644 src/Pole.Core/Processor/ProcessorBase.cs diff --git a/src/Pole.Core/EventBus/Bus.cs b/src/Pole.Core/EventBus/Bus.cs index fadc789..ab593e5 100644 --- a/src/Pole.Core/EventBus/Bus.cs +++ b/src/Pole.Core/EventBus/Bus.cs @@ -38,10 +38,7 @@ namespace Pole.Core.EventBus var eventType = @event.GetType(); var eventTypeCode = eventTypeFinder.GetCode(eventType); var eventId = snowflakeIdGenerator.NextId(); - //var eventContentBytes = serializer.SerializeToUtf8Bytes(@event, eventType); var eventContent = serializer.Serialize(@event, eventType); - //var bytesTransport = new EventBytesTransport(eventTypeCode, eventId, eventContentBytes); - //var bytes = bytesTransport.GetBytes(); var eventEntity = new EventEntity { Added = DateTime.UtcNow, @@ -50,7 +47,7 @@ namespace Pole.Core.EventBus Id = eventId, Name = eventTypeCode, Retries = 0, - StatusName = nameof(EventStatus.PrePublish) + StatusName = nameof(EventStatus.Pending) }; if (Transaction?.DbTransaction == null) { @@ -62,8 +59,6 @@ namespace Pole.Core.EventBus } PrePublishEventBuffer.Add(eventEntity); - //await producer.Publish(bytes); - //await eventStorage.ChangePublishStateAsync(eventEntity,EventStatus.Published); return true; } diff --git a/src/Pole.Core/EventBus/EventStorage/EventStatus.cs b/src/Pole.Core/EventBus/EventStorage/EventStatus.cs index 9c43086..4aec229 100644 --- a/src/Pole.Core/EventBus/EventStorage/EventStatus.cs +++ b/src/Pole.Core/EventBus/EventStorage/EventStatus.cs @@ -7,7 +7,7 @@ namespace Pole.Core.EventBus.EventStorage public enum EventStatus { Failed = -1, - PrePublish = 0, + Pending = 0, Published = 1 } } diff --git a/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs b/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs index c282a76..8e7f4bc 100644 --- a/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs +++ b/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs @@ -9,12 +9,13 @@ namespace Pole.Core.EventBus.EventStorage public interface IEventStorage { Task ChangePublishStateAsync(EventEntity message, EventStatus state); + Task BulkChangePublishStateAsync(IEnumerable messages); Task StoreMessage(EventEntity eventEntity, object dbTransaction = null); Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default); - Task> GetPublishedMessagesOfNeedRetry(); + Task> GetMessagesOfNeedRetry(); } } diff --git a/src/Pole.Core/PoleOptions.cs b/src/Pole.Core/PoleOptions.cs index 3059b92..732065a 100644 --- a/src/Pole.Core/PoleOptions.cs +++ b/src/Pole.Core/PoleOptions.cs @@ -7,6 +7,7 @@ namespace Pole.Core { public class PoleOptions { + public int PendingMessageRetryIntervalSeconds { get; set; } = 30; public IServiceCollection Services { get; private set; } } } diff --git a/src/Pole.Core/Processor/IProcessor.cs b/src/Pole.Core/Processor/IProcessor.cs new file mode 100644 index 0000000..e15efd7 --- /dev/null +++ b/src/Pole.Core/Processor/IProcessor.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Core.Processor +{ + public interface IProcessor + { + string Name { get; } + Task Process(ProcessingContext context); + } +} diff --git a/src/Pole.Core/Processor/LoopProcessor.cs b/src/Pole.Core/Processor/LoopProcessor.cs new file mode 100644 index 0000000..e528e4f --- /dev/null +++ b/src/Pole.Core/Processor/LoopProcessor.cs @@ -0,0 +1,48 @@ +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Core.Processor +{ + public class LoopProcessor : ProcessorBase + { + private IProcessor _processor; + private readonly ILoggerFactory _loggerFactory; + + public LoopProcessor(IProcessor processor, ILoggerFactory loggerFactory) + { + _processor = processor; + _loggerFactory = loggerFactory; + } + public override string Name => "LoopProcessor"; + public override async Task Process(ProcessingContext context) + { + var logger = _loggerFactory.CreateLogger(); + + while (!context.IsStopping) + { + try + { + logger.LogDebug($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")}...{ this.ToString() } process start"); + + await _processor.Process(context); + + logger.LogDebug($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")}...{ this.ToString() } process compelete"); + } + catch (Exception ex) + { + logger.LogError(ex, $"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")}...{ this.ToString() } process error"); + } + } + } + public override string ToString() + { + var strArray = new string[2]; + strArray[0] = Name; + strArray[1] = _processor.Name; + return string.Join("_", strArray); + } + } +} diff --git a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs new file mode 100644 index 0000000..48fbc66 --- /dev/null +++ b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs @@ -0,0 +1,79 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Pole.Core.Abstraction; +using Pole.Core.EventBus; +using Pole.Core.EventBus.Event; +using Pole.Core.EventBus.EventStorage; +using Pole.Core.Serialization; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Core.Processor +{ + public class PendingMessageRetryProcessor : ProcessorBase + { + private readonly IEventStorage eventStorage; + private readonly PoleOptions options; + private readonly IProducer producer; + private readonly IEventTypeFinder eventTypeFinder; + private readonly ISerializer serializer; + private readonly ILogger logger; + private readonly ProducerOptions producerOptions; + public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions options, ILogger logger, IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IOptions producerOptions) + { + this.eventStorage = eventStorage; + this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected"); + this.logger = logger; + this.producer = producer; + this.eventTypeFinder = eventTypeFinder; + this.serializer = serializer; + this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected"); + } + public override string Name => nameof(PendingMessageRetryProcessor); + + + public override async Task Process(ProcessingContext context) + { + try + { + await ProcessInternal(); + } + catch (Exception ex) + { + logger.LogError(ex, $"{nameof(PendingMessageRetryProcessor)} Process Error"); + } + finally + { + await Task.Delay(options.PendingMessageRetryIntervalSeconds * 1000); + } + } + public async Task ProcessInternal() + { + var now = DateTime.UtcNow; + var pendingMessages = await eventStorage.GetMessagesOfNeedRetry(); + + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"{nameof(PendingMessageRetryProcessor)} pendingMessages count:{pendingMessages.Count()}"); + } + foreach (var pendingMessage in pendingMessages) + { + var eventType = eventTypeFinder.FindType(pendingMessage.Name); + var eventContentBytes = serializer.SerializeToUtf8Bytes(pendingMessage, eventType); + var bytesTransport = new EventBytesTransport(pendingMessage.Name, pendingMessage.Id, eventContentBytes); + var bytes = bytesTransport.GetBytes(); + if (pendingMessage.Retries > producerOptions.MaxFailedRetryCount) + { + pendingMessage.ExpiresAt = DateTime.UtcNow; + } + pendingMessage.Retries++; + await producer.Publish(bytes); + pendingMessage.StatusName = nameof(EventStatus.Published); + } + await eventStorage.BulkChangePublishStateAsync(pendingMessages); + } + } +} diff --git a/src/Pole.Core/Processor/ProcessingContext.cs b/src/Pole.Core/Processor/ProcessingContext.cs new file mode 100644 index 0000000..a4a55c7 --- /dev/null +++ b/src/Pole.Core/Processor/ProcessingContext.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace Pole.Core.Processor +{ + public class ProcessingContext + { + public ProcessingContext(CancellationToken cancellationToken) + { + CancellationToken = cancellationToken; + } + public CancellationToken CancellationToken { get; } + public bool IsStopping => CancellationToken.IsCancellationRequested; + } +} diff --git a/src/Pole.Core/Processor/ProcessorBase.cs b/src/Pole.Core/Processor/ProcessorBase.cs new file mode 100644 index 0000000..e3395b7 --- /dev/null +++ b/src/Pole.Core/Processor/ProcessorBase.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Core.Processor +{ + public abstract class ProcessorBase : IProcessor + { + public abstract string Name { get; } + + public abstract Task Process(ProcessingContext context); + + public override string ToString() + { + return Name; + } + } +} diff --git a/src/Pole.Core/ProducerOptions.cs b/src/Pole.Core/ProducerOptions.cs index ccf436b..0084585 100644 --- a/src/Pole.Core/ProducerOptions.cs +++ b/src/Pole.Core/ProducerOptions.cs @@ -6,6 +6,6 @@ namespace Pole.Core { public class ProducerOptions { - public int FailedRetryCount { get; set; } + public int MaxFailedRetryCount { get; set; } = 40; } } diff --git a/src/Pole.Core/UnitOfWork/UnitOfWork.cs b/src/Pole.Core/UnitOfWork/UnitOfWork.cs index abd1a6f..5467347 100644 --- a/src/Pole.Core/UnitOfWork/UnitOfWork.cs +++ b/src/Pole.Core/UnitOfWork/UnitOfWork.cs @@ -43,8 +43,9 @@ namespace Pole.Core.UnitOfWork var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes); var bytes = bytesTransport.GetBytes(); await producer.Publish(bytes); - await eventStorage.ChangePublishStateAsync(@event, EventStatus.Published); + @event.StatusName = nameof(EventStatus.Published); }); + await eventStorage.BulkChangePublishStateAsync(bufferedEvents); } public void Dispose() diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs index eeadcab..00222c0 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs @@ -7,6 +7,7 @@ using Pole.Core.EventBus.EventStorage; using System; using System.Collections.Generic; using System.Data; +using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -26,6 +27,21 @@ namespace Pole.EventStorage.PostgreSql this.eventStorageInitializer = eventStorageInitializer; tableName = eventStorageInitializer.GetTableName(); } + + public async Task BulkChangePublishStateAsync(IEnumerable events) + { + var sql = +$"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"= any @Ids"; + using var connection = new NpgsqlConnection(options.ConnectionString); + await connection.ExecuteAsync(sql, events.Select(@event=> new + { + Ids = events.Select(@event=>@event.Id).ToArray(), + @event.Retries, + @event.ExpiresAt, + @event.StatusName + }).ToList()); + } + public async Task ChangePublishStateAsync(EventEntity message, EventStatus state) { var sql = @@ -49,11 +65,11 @@ namespace Pole.EventStorage.PostgreSql new { timeout, batchCount }); } - public async Task> GetPublishedMessagesOfNeedRetry() + public async Task> GetMessagesOfNeedRetry() { var fourMinAgo = DateTime.UtcNow.AddMinutes(-4).ToString("O"); var sql = - $"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.FailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{EventStatus.Failed}' OR \"StatusName\"='{EventStatus.PrePublish}') LIMIT 200;"; + $"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.MaxFailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{EventStatus.Failed}' OR \"StatusName\"='{EventStatus.Pending}') for update skip locked LIMIT 200;"; var result = new List(); using var connection = new NpgsqlConnection(options.ConnectionString); -- libgit2 0.25.0