From 20631e557f785b7c1100f9f43664bc82d6e0cd8e Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Thu, 12 Mar 2020 17:41:16 +0800 Subject: [PATCH] 完善 eventbus 多个实例之间重试时的数据隔离 --- src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs index c8fb158..d1dcb27 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs @@ -83,22 +83,27 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN $"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.MaxFailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND \"StatusName\"='{EventStatus.Pending}' for update skip locked LIMIT 200;"; var result = new List(); - using var connection = new NpgsqlConnection(options.ConnectionString); - var reader = await connection.ExecuteReaderAsync(sql); - while (reader.Read()) + using (var connection = new NpgsqlConnection(options.ConnectionString)) { - result.Add(new EventEntity + using (var transaction = await connection.BeginTransactionAsync()) { - Id = reader.GetString(0), - Name = reader.GetString(2), - Content = reader.GetString(3), - Retries = reader.GetInt32(4), - Added = reader.GetDateTime(5), - StatusName = reader.GetString(7) - }); + var reader = await connection.ExecuteReaderAsync(sql); + while (reader.Read()) + { + result.Add(new EventEntity + { + Id = reader.GetString(0), + Name = reader.GetString(2), + Content = reader.GetString(3), + Retries = reader.GetInt32(4), + Added = reader.GetDateTime(5), + StatusName = reader.GetString(7) + }); + } + await transaction.CommitAsync(); + return result; + } } - - return result; } public async Task StoreMessage(EventEntity eventEntity, object dbTransaction = null) -- libgit2 0.25.0