Commit 20631e55 by dingsongjie

完善 eventbus 多个实例之间重试时的数据隔离

parent 2d910fc1
...@@ -83,7 +83,10 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN ...@@ -83,7 +83,10 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN
$"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.MaxFailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND \"StatusName\"='{EventStatus.Pending}' for update skip locked LIMIT 200;"; $"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.MaxFailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND \"StatusName\"='{EventStatus.Pending}' for update skip locked LIMIT 200;";
var result = new List<EventEntity>(); var result = new List<EventEntity>();
using var connection = new NpgsqlConnection(options.ConnectionString); using (var connection = new NpgsqlConnection(options.ConnectionString))
{
using (var transaction = await connection.BeginTransactionAsync())
{
var reader = await connection.ExecuteReaderAsync(sql); var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read()) while (reader.Read())
{ {
...@@ -97,9 +100,11 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN ...@@ -97,9 +100,11 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN
StatusName = reader.GetString(7) StatusName = reader.GetString(7)
}); });
} }
await transaction.CommitAsync();
return result; return result;
} }
}
}
public async Task<bool> StoreMessage(EventEntity eventEntity, object dbTransaction = null) public async Task<bool> StoreMessage(EventEntity eventEntity, object dbTransaction = null)
{ {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment