diff --git a/src/Pole.Core/EventBus/Bus.cs b/src/Pole.Core/EventBus/Bus.cs index f074055..efc8059 100644 --- a/src/Pole.Core/EventBus/Bus.cs +++ b/src/Pole.Core/EventBus/Bus.cs @@ -1,5 +1,6 @@ using Pole.Core.Abstraction; using Pole.Core.EventBus.Event; +using Pole.Core.EventBus.EventStorage; using Pole.Core.EventBus.Transaction; using Pole.Core.Serialization; using Pole.Core.Utils.Abstraction; @@ -17,25 +18,55 @@ namespace Pole.Core.EventBus private readonly IEventTypeFinder eventTypeFinder; private readonly ISerializer serializer; private readonly ISnowflakeIdGenerator snowflakeIdGenerator; + private readonly IEventStorage eventStorage; public IDbTransactionAdapter Transaction { get; set; } public IServiceProvider ServiceProvider { get; } - public Bus(IServiceProvider serviceProvider, IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator) + public Bus(IServiceProvider serviceProvider, IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator, IEventStorage eventStorage) { ServiceProvider = serviceProvider; this.producer = producer; this.eventTypeFinder = eventTypeFinder; this.serializer = serializer; this.snowflakeIdGenerator = snowflakeIdGenerator; + this.eventStorage = eventStorage; } public async Task Publish(object @event, CancellationToken cancellationToken = default) { var eventType = @event.GetType(); var eventTypeCode = eventTypeFinder.GetCode(eventType); var eventId = snowflakeIdGenerator.NextId(); - var bytesTransport = new EventBytesTransport(eventTypeCode, eventId, serializer.SerializeToUtf8Bytes(@event, eventType)); + var eventContentBytes = serializer.SerializeToUtf8Bytes(@event, eventType); + var eventContent = serializer.Serialize(@event, eventType); + var bytesTransport = new EventBytesTransport(eventTypeCode, eventId, eventContentBytes); var bytes = bytesTransport.GetBytes(); + var eventEntity = new EventEntity + { + Added = DateTime.UtcNow, + Content = eventContent, + ExpiresAt = null, + Id = eventId, + Name = eventTypeCode, + Retries = 0, + StatusName = nameof(EventStatus.PrePublish) + }; + if (Transaction?.DbTransaction == null) + { + var mediumMessage = await eventStorage.StoreMessage(eventEntity); + } + else + { + var transaction = (IDbTransactionAdapter)Transaction; + + var mediumMessage = eventStorage.StoreMessage(eventEntity, transaction.DbTransaction); + + if (transaction.AutoCommit) + { + await transaction.CommitAsync(); + } + } + await producer.Publish(bytes); return true; } diff --git a/src/Pole.Core/EventBus/EventStorage/EventEntity.cs b/src/Pole.Core/EventBus/EventStorage/EventEntity.cs index 2535c95..02284ff 100644 --- a/src/Pole.Core/EventBus/EventStorage/EventEntity.cs +++ b/src/Pole.Core/EventBus/EventStorage/EventEntity.cs @@ -10,7 +10,7 @@ namespace Pole.Core.EventBus.EventStorage public string Name { get; set; } public string Content { get; set; } public DateTime Added { get; set; } - public DateTime ExpiresAt { get; set; } + public DateTime? ExpiresAt { get; set; } public int Retries { get; set; } public string StatusName { get; set; } } diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs index f7fc6fe..eeadcab 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs @@ -51,7 +51,7 @@ namespace Pole.EventStorage.PostgreSql public async Task> GetPublishedMessagesOfNeedRetry() { - var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); + var fourMinAgo = DateTime.UtcNow.AddMinutes(-4).ToString("O"); var sql = $"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.FailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{EventStatus.Failed}' OR \"StatusName\"='{EventStatus.PrePublish}') LIMIT 200;";