diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs index 75a1a94..f14fb85 100644 --- a/src/Pole.Sagas.Server/Services/SagaService.cs +++ b/src/Pole.Sagas.Server/Services/SagaService.cs @@ -48,7 +48,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.ActivityExecuted(request.ActivityId, request.ResultData.ToByteArray()); + await sagaStorage.ActivityExecuted(request.ActivityId); commonResponse.IsSuccess = true; } catch (Exception ex) @@ -146,7 +146,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.ActivityCompensating(request.ActivityId); + await sagaStorage.ActivityCompensating(request.ActivityId, request.CompensateTimes); commonResponse.IsSuccess = true; } catch (Exception ex) diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs index 90d3fbe..5e83ed7 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs @@ -67,12 +67,12 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; } } - public async Task ActivityExecuted(string activityId, byte[] resultData) + public async Task ActivityExecuted(string activityId) { using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { var updateActivitySql = -$"UPDATE {activityTableName} SET \"Status\"=@Status \"ResultData\"=@ResultData WHERE \"Id\" = @Id"; +$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; await connection.ExecuteAsync(updateActivitySql, new { Id = activityId, @@ -141,7 +141,6 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; ExecuteTimes = executeTimes }); } - } } @@ -191,16 +190,17 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" + } } - public async Task ActivityCompensating(string activityId) + public async Task ActivityCompensating(string activityId, int compensateTimes) { using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { var updateActivitySql = -$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; +$"UPDATE {activityTableName} SET \"Status\"=@Status ,\"CompensateTimes\"=@CompensateTimes WHERE \"Id\" = @Id"; await connection.ExecuteAsync(updateActivitySql, new { Id = activityId, - Status = nameof(ActivityStatus.Compensating) + Status = nameof(ActivityStatus.Compensating), + CompensateTimes= compensateTimes, }); } } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs index c608c0f..7236b40 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs @@ -67,8 +67,7 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, ""ExecuteTimes"" int4 NOT NULL, ""ParameterData"" bytea NOT NULL, - ""ResultData"" bytea, - ""Errors"" varchar(1024) COLLATE ""pg_catalog"".""default"", + ""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"", ""CompensateTimes"" int4 NOT NULL, ""AddTime"" timestamp NOT NULL ) diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs index 4d29c96..4b059fc 100644 --- a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs +++ b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs @@ -10,10 +10,10 @@ namespace Pole.Sagas.Core.Abstraction Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime,int executeTimes); Task ActivityExecuteAborted(string activityId); Task ActivityCompensateAborted(string activityId, string sagaId, string errors); - Task ActivityExecuted(string activityId,byte[] resultData); + Task ActivityExecuted(string activityId); Task ActivityCompensated(string activityId); Task ActivityExecuteOvertime(string activityId); Task ActivityRevoked(string activityId); - Task ActivityCompensating(string activityId); + Task ActivityCompensating(string activityId,int compensateTimes); } } diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs index 4de898a..bdc8a84 100644 --- a/src/Pole.Sagas/Core/EventSender.cs +++ b/src/Pole.Sagas/Core/EventSender.cs @@ -43,12 +43,11 @@ namespace Pole.Sagas.Core } } - public async Task ActivityExecuted(string activityId, byte[] resultData) + public async Task ActivityExecuted(string activityId) { var result = await sagaClient.ActivityExecutedAsync(new Server.Grpc.ActivityExecutedRequest { ActivityId = activityId, - ResultData = Google.Protobuf.ByteString.CopyFrom(resultData), }); if (!result.IsSuccess) { @@ -136,7 +135,7 @@ namespace Pole.Sagas.Core } } - public async Task ActivityCompensating(string activityId) + public async Task ActivityCompensating(string activityId, int compensateTimes) { var result = await sagaClient.ActivityCompensatingAsync(new Server.Grpc.ActivityCompensatingRequest { diff --git a/src/Pole.Sagas/Core/ISagaStorage.cs b/src/Pole.Sagas/Core/ISagaStorage.cs index f2730cb..1c090ca 100644 --- a/src/Pole.Sagas/Core/ISagaStorage.cs +++ b/src/Pole.Sagas/Core/ISagaStorage.cs @@ -13,10 +13,10 @@ namespace Pole.Sagas.Core Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime,int executeTimes); Task ActivityExecuteAborted(string activityId); Task ActivityCompensateAborted(string activityId, string sagaId, string errors); - Task ActivityExecuted(string activityId, byte[] resultData); + Task ActivityExecuted(string activityId); Task ActivityCompensated(string activityId); Task ActivityExecuteOvertime(string activityId); Task ActivityRevoked(string activityId); - Task ActivityCompensating(string activityId); + Task ActivityCompensating(string activityId, int compensateTimes); } } diff --git a/src/Pole.Sagas/Core/Saga.cs b/src/Pole.Sagas/Core/Saga.cs index fe41b55..9815dbf 100644 --- a/src/Pole.Sagas/Core/Saga.cs +++ b/src/Pole.Sagas/Core/Saga.cs @@ -120,7 +120,7 @@ namespace Pole.Sagas.Core var activityId = activityWapper.Id; try { - await eventSender.ActivityCompensating(activityId); + await eventSender.ActivityCompensating(activityId, activityWapper.CompensateTimes); await activityWapper.InvokeCompensate(); await eventSender.ActivityCompensated(activityId); var compensateActivity = GetNextCompensateActivity(); @@ -152,7 +152,7 @@ namespace Pole.Sagas.Core await CompensateActivity(result, currentExecuteOrder); return result; } - await eventSender.ActivityExecuted(activityId, Encoding.UTF8.GetBytes(string.Empty)); + await eventSender.ActivityExecuted(activityId); var executeActivity = GetNextExecuteActivity(); if (executeActivity == null) { diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto index 46350e1..1d9077a 100644 --- a/src/Pole.Sagas/Protos/saga.proto +++ b/src/Pole.Sagas/Protos/saga.proto @@ -50,7 +50,6 @@ message ActivityCompensateAbortedRequest { } message ActivityExecutedRequest { string activityId = 1; - bytes resultData = 2; } message ActivityCompensatedRequest { string activityId = 1; @@ -63,6 +62,7 @@ message ActivityRevokedRequest { } message ActivityCompensatingRequest { string activityId = 1; + int32 CompensateTimes = 2; }