diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs index c8fb158..d1dcb27 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs @@ -83,22 +83,27 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN $"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.MaxFailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND \"StatusName\"='{EventStatus.Pending}' for update skip locked LIMIT 200;"; var result = new List(); - using var connection = new NpgsqlConnection(options.ConnectionString); - var reader = await connection.ExecuteReaderAsync(sql); - while (reader.Read()) + using (var connection = new NpgsqlConnection(options.ConnectionString)) { - result.Add(new EventEntity + using (var transaction = await connection.BeginTransactionAsync()) { - Id = reader.GetString(0), - Name = reader.GetString(2), - Content = reader.GetString(3), - Retries = reader.GetInt32(4), - Added = reader.GetDateTime(5), - StatusName = reader.GetString(7) - }); + var reader = await connection.ExecuteReaderAsync(sql); + while (reader.Read()) + { + result.Add(new EventEntity + { + Id = reader.GetString(0), + Name = reader.GetString(2), + Content = reader.GetString(3), + Retries = reader.GetInt32(4), + Added = reader.GetDateTime(5), + StatusName = reader.GetString(7) + }); + } + await transaction.CommitAsync(); + return result; + } } - - return result; } public async Task StoreMessage(EventEntity eventEntity, object dbTransaction = null)