From 5de72aaf40e53692dbcd7a8a5dc7b0a9c00a4e5c Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Fri, 14 Feb 2020 14:23:32 +0800 Subject: [PATCH] 完成 清理时间等的配置 --- src/Pole.Core/PoleOptions.cs | 1 + src/Pole.Core/Processor/PendingMessageRetryProcessor.cs | 1 + src/Pole.Core/UnitOfWork/UnitOfWork.cs | 8 ++++++-- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/Pole.Core/PoleOptions.cs b/src/Pole.Core/PoleOptions.cs index 3a8fbbb..270468e 100644 --- a/src/Pole.Core/PoleOptions.cs +++ b/src/Pole.Core/PoleOptions.cs @@ -11,6 +11,7 @@ namespace Pole.Core public int ExpiredEventsPreBulkDeleteDelaySeconds { get; set; } = 3; public int ExpiredEventsCollectIntervalSeconds { get; set; } = 60 * 60; + public int PublishedEventsExpiredAfterSeconds { get; set; } = 60 * 60; public IServiceCollection Services { get; private set; } } } diff --git a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs index 48fbc66..c2f10a9 100644 --- a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs +++ b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs @@ -72,6 +72,7 @@ namespace Pole.Core.Processor pendingMessage.Retries++; await producer.Publish(bytes); pendingMessage.StatusName = nameof(EventStatus.Published); + pendingMessage.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); } await eventStorage.BulkChangePublishStateAsync(pendingMessages); } diff --git a/src/Pole.Core/UnitOfWork/UnitOfWork.cs b/src/Pole.Core/UnitOfWork/UnitOfWork.cs index 5467347..6fe9604 100644 --- a/src/Pole.Core/UnitOfWork/UnitOfWork.cs +++ b/src/Pole.Core/UnitOfWork/UnitOfWork.cs @@ -12,6 +12,7 @@ using Pole.Core.Abstraction; using Pole.Core.Serialization; using Pole.Core.EventBus.Event; using Pole.Core.EventBus.EventStorage; +using Microsoft.Extensions.Options; namespace Pole.Core.UnitOfWork { @@ -21,13 +22,15 @@ namespace Pole.Core.UnitOfWork private readonly IEventTypeFinder eventTypeFinder; private readonly ISerializer serializer; private readonly IEventStorage eventStorage; + private readonly PoleOptions options; private IBus bus; - public UnitOfWork(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IEventStorage eventStorage) + public UnitOfWork(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IEventStorage eventStorage, IOptions options) { this.producer = producer; this.eventTypeFinder = eventTypeFinder; this.serializer = serializer; this.eventStorage = eventStorage; + this.options = options.Value; } public async Task CompeleteAsync(CancellationToken cancellationToken = default) @@ -43,7 +46,8 @@ namespace Pole.Core.UnitOfWork var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes); var bytes = bytesTransport.GetBytes(); await producer.Publish(bytes); - @event.StatusName = nameof(EventStatus.Published); + @event.StatusName = nameof(EventStatus.Published); + @event.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); }); await eventStorage.BulkChangePublishStateAsync(bufferedEvents); } -- libgit2 0.25.0