diff --git a/src/Pole.Sagas.Storage.PostgreSql/PoleSagasPostgreSqlExtensions.cs b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasPostgreSqlExtensions.cs new file mode 100644 index 0000000..3c236e9 --- /dev/null +++ b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasPostgreSqlExtensions.cs @@ -0,0 +1,20 @@ +using Microsoft.Extensions.DependencyInjection; +using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Storage.PostgreSql +{ + public static class PoleSagasPostgreSqlExtensions + { + public static IServiceCollection AddPostgreSqlStorage(IServiceCollection services,Action config) + { + services.Configure(config); + services.AddSingleton(); + services.AddSingleton(); + return services; + } + } +} diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs index 5b82ab1..173ee37 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs @@ -1,4 +1,8 @@ -using Pole.Sagas.Core; +using Dapper; +using Microsoft.Extensions.Options; +using Npgsql; +using Pole.Sagas.Core; +using Pole.Sagas.Core.Abstraction; using System; using System.Collections.Generic; using System.Text; @@ -8,9 +12,42 @@ namespace Pole.Sagas.Storage.PostgreSql { public class PostgreSqlSagaStorage : ISagaStorage { - public Task ActivityCompensateAborted(string activityId, string sagaId, string errors) + private readonly string sagaTableName; + private readonly string activityTableName; + private readonly PoleSagasStoragePostgreSqlOption poleSagasStoragePostgreSqlOption; + private readonly ISagaStorageInitializer sagaStorageInitializer; + public PostgreSqlSagaStorage(IOptions poleSagasStoragePostgreSqlOption, ISagaStorageInitializer sagaStorageInitializer) { - throw new NotImplementedException(); + this.poleSagasStoragePostgreSqlOption = poleSagasStoragePostgreSqlOption.Value; + this.sagaStorageInitializer = sagaStorageInitializer; + sagaTableName = sagaStorageInitializer.GetSagaTableName(); + activityTableName = sagaStorageInitializer.GetActivityTableName(); + } + public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors) + { + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + using(var tansaction = await connection.BeginTransactionAsync()) + { + var updateActivitySql = +$"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = activityId, + Errors= errors, + Status= nameof(ActivityStatus.CompensateAborted) + }, tansaction); + + var updateSagaSql = +$"UPDATE {sagaTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateSagaSql, new + { + Id = activityId, + Status = nameof(ActivityStatus.CompensateAborted) + }, tansaction); + } + + } } public Task ActivityCompensated(string activityId)