Commit 5de72aaf by 丁松杰

完成 清理时间等的配置

parent f78114ce
......@@ -11,6 +11,7 @@ namespace Pole.Core
public int ExpiredEventsPreBulkDeleteDelaySeconds { get; set; } = 3;
public int ExpiredEventsCollectIntervalSeconds { get; set; } = 60 * 60;
public int PublishedEventsExpiredAfterSeconds { get; set; } = 60 * 60;
public IServiceCollection Services { get; private set; }
}
}
......@@ -72,6 +72,7 @@ namespace Pole.Core.Processor
pendingMessage.Retries++;
await producer.Publish(bytes);
pendingMessage.StatusName = nameof(EventStatus.Published);
pendingMessage.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds);
}
await eventStorage.BulkChangePublishStateAsync(pendingMessages);
}
......
......@@ -12,6 +12,7 @@ using Pole.Core.Abstraction;
using Pole.Core.Serialization;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using Microsoft.Extensions.Options;
namespace Pole.Core.UnitOfWork
{
......@@ -21,13 +22,15 @@ namespace Pole.Core.UnitOfWork
private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer;
private readonly IEventStorage eventStorage;
private readonly PoleOptions options;
private IBus bus;
public UnitOfWork(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IEventStorage eventStorage)
public UnitOfWork(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IEventStorage eventStorage, IOptions<PoleOptions> options)
{
this.producer = producer;
this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer;
this.eventStorage = eventStorage;
this.options = options.Value;
}
public async Task CompeleteAsync(CancellationToken cancellationToken = default)
......@@ -43,7 +46,8 @@ namespace Pole.Core.UnitOfWork
var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes);
var bytes = bytesTransport.GetBytes();
await producer.Publish(bytes);
@event.StatusName = nameof(EventStatus.Published);
@event.StatusName = nameof(EventStatus.Published);
@event.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds);
});
await eventStorage.BulkChangePublishStateAsync(bufferedEvents);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment