Commit c0303e02 by 丁松杰

添加 event 发布者 重试机制

parent 592237d8
......@@ -38,10 +38,7 @@ namespace Pole.Core.EventBus
var eventType = @event.GetType();
var eventTypeCode = eventTypeFinder.GetCode(eventType);
var eventId = snowflakeIdGenerator.NextId();
//var eventContentBytes = serializer.SerializeToUtf8Bytes(@event, eventType);
var eventContent = serializer.Serialize(@event, eventType);
//var bytesTransport = new EventBytesTransport(eventTypeCode, eventId, eventContentBytes);
//var bytes = bytesTransport.GetBytes();
var eventEntity = new EventEntity
{
Added = DateTime.UtcNow,
......@@ -50,7 +47,7 @@ namespace Pole.Core.EventBus
Id = eventId,
Name = eventTypeCode,
Retries = 0,
StatusName = nameof(EventStatus.PrePublish)
StatusName = nameof(EventStatus.Pending)
};
if (Transaction?.DbTransaction == null)
{
......@@ -62,8 +59,6 @@ namespace Pole.Core.EventBus
}
PrePublishEventBuffer.Add(eventEntity);
//await producer.Publish(bytes);
//await eventStorage.ChangePublishStateAsync(eventEntity,EventStatus.Published);
return true;
}
......
......@@ -7,7 +7,7 @@ namespace Pole.Core.EventBus.EventStorage
public enum EventStatus
{
Failed = -1,
PrePublish = 0,
Pending = 0,
Published = 1
}
}
......@@ -9,12 +9,13 @@ namespace Pole.Core.EventBus.EventStorage
public interface IEventStorage
{
Task ChangePublishStateAsync(EventEntity message, EventStatus state);
Task BulkChangePublishStateAsync(IEnumerable<EventEntity> messages);
Task<bool> StoreMessage(EventEntity eventEntity, object dbTransaction = null);
Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000,
CancellationToken token = default);
Task<IEnumerable<EventEntity>> GetPublishedMessagesOfNeedRetry();
Task<IEnumerable<EventEntity>> GetMessagesOfNeedRetry();
}
}
......@@ -7,6 +7,7 @@ namespace Pole.Core
{
public class PoleOptions
{
public int PendingMessageRetryIntervalSeconds { get; set; } = 30;
public IServiceCollection Services { get; private set; }
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Core.Processor
{
public interface IProcessor
{
string Name { get; }
Task Process(ProcessingContext context);
}
}
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Core.Processor
{
public class LoopProcessor : ProcessorBase
{
private IProcessor _processor;
private readonly ILoggerFactory _loggerFactory;
public LoopProcessor(IProcessor processor, ILoggerFactory loggerFactory)
{
_processor = processor;
_loggerFactory = loggerFactory;
}
public override string Name => "LoopProcessor";
public override async Task Process(ProcessingContext context)
{
var logger = _loggerFactory.CreateLogger<LoopProcessor>();
while (!context.IsStopping)
{
try
{
logger.LogDebug($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")}...{ this.ToString() } process start");
await _processor.Process(context);
logger.LogDebug($"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")}...{ this.ToString() } process compelete");
}
catch (Exception ex)
{
logger.LogError(ex, $"{DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss.fff")}...{ this.ToString() } process error");
}
}
}
public override string ToString()
{
var strArray = new string[2];
strArray[0] = Name;
strArray[1] = _processor.Name;
return string.Join("_", strArray);
}
}
}
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.Core.Abstraction;
using Pole.Core.EventBus;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using Pole.Core.Serialization;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Core.Processor
{
public class PendingMessageRetryProcessor : ProcessorBase
{
private readonly IEventStorage eventStorage;
private readonly PoleOptions options;
private readonly IProducer producer;
private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer;
private readonly ILogger<PendingMessageRetryProcessor> logger;
private readonly ProducerOptions producerOptions;
public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions<PoleOptions> options, ILogger<PendingMessageRetryProcessor> logger, IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, IOptions<ProducerOptions> producerOptions)
{
this.eventStorage = eventStorage;
this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected");
this.logger = logger;
this.producer = producer;
this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer;
this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected");
}
public override string Name => nameof(PendingMessageRetryProcessor);
public override async Task Process(ProcessingContext context)
{
try
{
await ProcessInternal();
}
catch (Exception ex)
{
logger.LogError(ex, $"{nameof(PendingMessageRetryProcessor)} Process Error");
}
finally
{
await Task.Delay(options.PendingMessageRetryIntervalSeconds * 1000);
}
}
public async Task ProcessInternal()
{
var now = DateTime.UtcNow;
var pendingMessages = await eventStorage.GetMessagesOfNeedRetry();
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"{nameof(PendingMessageRetryProcessor)} pendingMessages count:{pendingMessages.Count()}");
}
foreach (var pendingMessage in pendingMessages)
{
var eventType = eventTypeFinder.FindType(pendingMessage.Name);
var eventContentBytes = serializer.SerializeToUtf8Bytes(pendingMessage, eventType);
var bytesTransport = new EventBytesTransport(pendingMessage.Name, pendingMessage.Id, eventContentBytes);
var bytes = bytesTransport.GetBytes();
if (pendingMessage.Retries > producerOptions.MaxFailedRetryCount)
{
pendingMessage.ExpiresAt = DateTime.UtcNow;
}
pendingMessage.Retries++;
await producer.Publish(bytes);
pendingMessage.StatusName = nameof(EventStatus.Published);
}
await eventStorage.BulkChangePublishStateAsync(pendingMessages);
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace Pole.Core.Processor
{
public class ProcessingContext
{
public ProcessingContext(CancellationToken cancellationToken)
{
CancellationToken = cancellationToken;
}
public CancellationToken CancellationToken { get; }
public bool IsStopping => CancellationToken.IsCancellationRequested;
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Core.Processor
{
public abstract class ProcessorBase : IProcessor
{
public abstract string Name { get; }
public abstract Task Process(ProcessingContext context);
public override string ToString()
{
return Name;
}
}
}
......@@ -6,6 +6,6 @@ namespace Pole.Core
{
public class ProducerOptions
{
public int FailedRetryCount { get; set; }
public int MaxFailedRetryCount { get; set; } = 40;
}
}
......@@ -43,8 +43,9 @@ namespace Pole.Core.UnitOfWork
var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes);
var bytes = bytesTransport.GetBytes();
await producer.Publish(bytes);
await eventStorage.ChangePublishStateAsync(@event, EventStatus.Published);
@event.StatusName = nameof(EventStatus.Published);
});
await eventStorage.BulkChangePublishStateAsync(bufferedEvents);
}
public void Dispose()
......
......@@ -7,6 +7,7 @@ using Pole.Core.EventBus.EventStorage;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
......@@ -26,6 +27,21 @@ namespace Pole.EventStorage.PostgreSql
this.eventStorageInitializer = eventStorageInitializer;
tableName = eventStorageInitializer.GetTableName();
}
public async Task BulkChangePublishStateAsync(IEnumerable<EventEntity> events)
{
var sql =
$"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"= any @Ids";
using var connection = new NpgsqlConnection(options.ConnectionString);
await connection.ExecuteAsync(sql, events.Select(@event=> new
{
Ids = events.Select(@event=>@event.Id).ToArray(),
@event.Retries,
@event.ExpiresAt,
@event.StatusName
}).ToList());
}
public async Task ChangePublishStateAsync(EventEntity message, EventStatus state)
{
var sql =
......@@ -49,11 +65,11 @@ namespace Pole.EventStorage.PostgreSql
new { timeout, batchCount });
}
public async Task<IEnumerable<EventEntity>> GetPublishedMessagesOfNeedRetry()
public async Task<IEnumerable<EventEntity>> GetMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.UtcNow.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.FailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{EventStatus.Failed}' OR \"StatusName\"='{EventStatus.PrePublish}') LIMIT 200;";
$"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.MaxFailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{EventStatus.Failed}' OR \"StatusName\"='{EventStatus.Pending}') for update skip locked LIMIT 200;";
var result = new List<EventEntity>();
using var connection = new NpgsqlConnection(options.ConnectionString);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment