Commit e57c03a2 by dingsongjie

add some sql

parent fa6f2610
using Microsoft.Extensions.DependencyInjection;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Storage.PostgreSql
{
public static class PoleSagasPostgreSqlExtensions
{
public static IServiceCollection AddPostgreSqlStorage(IServiceCollection services,Action<PoleSagasStoragePostgreSqlOption> config)
{
services.Configure(config);
services.AddSingleton<ISagaStorageInitializer, PostgreSqlEventStorageInitializer>();
services.AddSingleton<ISagaStorage, PostgreSqlSagaStorage>();
return services;
}
}
}
using Pole.Sagas.Core; using Dapper;
using Microsoft.Extensions.Options;
using Npgsql;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
...@@ -8,9 +12,42 @@ namespace Pole.Sagas.Storage.PostgreSql ...@@ -8,9 +12,42 @@ namespace Pole.Sagas.Storage.PostgreSql
{ {
public class PostgreSqlSagaStorage : ISagaStorage public class PostgreSqlSagaStorage : ISagaStorage
{ {
public Task ActivityCompensateAborted(string activityId, string sagaId, string errors) private readonly string sagaTableName;
private readonly string activityTableName;
private readonly PoleSagasStoragePostgreSqlOption poleSagasStoragePostgreSqlOption;
private readonly ISagaStorageInitializer sagaStorageInitializer;
public PostgreSqlSagaStorage(IOptions<PoleSagasStoragePostgreSqlOption> poleSagasStoragePostgreSqlOption, ISagaStorageInitializer sagaStorageInitializer)
{ {
throw new NotImplementedException(); this.poleSagasStoragePostgreSqlOption = poleSagasStoragePostgreSqlOption.Value;
this.sagaStorageInitializer = sagaStorageInitializer;
sagaTableName = sagaStorageInitializer.GetSagaTableName();
activityTableName = sagaStorageInitializer.GetActivityTableName();
}
public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
using(var tansaction = await connection.BeginTransactionAsync())
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Errors= errors,
Status= nameof(ActivityStatus.CompensateAborted)
}, tansaction);
var updateSagaSql =
$"UPDATE {sagaTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateSagaSql, new
{
Id = activityId,
Status = nameof(ActivityStatus.CompensateAborted)
}, tansaction);
}
}
} }
public Task ActivityCompensated(string activityId) public Task ActivityCompensated(string activityId)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment