Commit d0e3d7c6 by 丁松杰

事件发布流程添加 本地事务表保证

parent 27593c6c
using Pole.Core.Abstraction;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using Pole.Core.EventBus.Transaction;
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
......@@ -17,25 +18,55 @@ namespace Pole.Core.EventBus
private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer;
private readonly ISnowflakeIdGenerator snowflakeIdGenerator;
private readonly IEventStorage eventStorage;
public IDbTransactionAdapter Transaction { get; set; }
public IServiceProvider ServiceProvider { get; }
public Bus(IServiceProvider serviceProvider, IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator)
public Bus(IServiceProvider serviceProvider, IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator, IEventStorage eventStorage)
{
ServiceProvider = serviceProvider;
this.producer = producer;
this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer;
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.eventStorage = eventStorage;
}
public async Task<bool> Publish(object @event, CancellationToken cancellationToken = default)
{
var eventType = @event.GetType();
var eventTypeCode = eventTypeFinder.GetCode(eventType);
var eventId = snowflakeIdGenerator.NextId();
var bytesTransport = new EventBytesTransport(eventTypeCode, eventId, serializer.SerializeToUtf8Bytes(@event, eventType));
var eventContentBytes = serializer.SerializeToUtf8Bytes(@event, eventType);
var eventContent = serializer.Serialize(@event, eventType);
var bytesTransport = new EventBytesTransport(eventTypeCode, eventId, eventContentBytes);
var bytes = bytesTransport.GetBytes();
var eventEntity = new EventEntity
{
Added = DateTime.UtcNow,
Content = eventContent,
ExpiresAt = null,
Id = eventId,
Name = eventTypeCode,
Retries = 0,
StatusName = nameof(EventStatus.PrePublish)
};
if (Transaction?.DbTransaction == null)
{
var mediumMessage = await eventStorage.StoreMessage(eventEntity);
}
else
{
var transaction = (IDbTransactionAdapter)Transaction;
var mediumMessage = eventStorage.StoreMessage(eventEntity, transaction.DbTransaction);
if (transaction.AutoCommit)
{
await transaction.CommitAsync();
}
}
await producer.Publish(bytes);
return true;
}
......
......@@ -10,7 +10,7 @@ namespace Pole.Core.EventBus.EventStorage
public string Name { get; set; }
public string Content { get; set; }
public DateTime Added { get; set; }
public DateTime ExpiresAt { get; set; }
public DateTime? ExpiresAt { get; set; }
public int Retries { get; set; }
public string StatusName { get; set; }
}
......
......@@ -51,7 +51,7 @@ namespace Pole.EventStorage.PostgreSql
public async Task<IEnumerable<EventEntity>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var fourMinAgo = DateTime.UtcNow.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.FailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{EventStatus.Failed}' OR \"StatusName\"='{EventStatus.PrePublish}') LIMIT 200;";
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment