diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs index 00efe20..9a71d2e 100644 --- a/src/Pole.Sagas.Server/Services/SagaService.cs +++ b/src/Pole.Sagas.Server/Services/SagaService.cs @@ -48,7 +48,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.ActivityEnded(request.ActivityId, request.ResultData.ToByteArray()); + await sagaStorage.ActivityExecuted(request.ActivityId, request.ResultData.ToByteArray()); commonResponse.IsSuccess = true; } catch (Exception ex) @@ -62,7 +62,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.ActivityExecuteAborted(request.ActivityId, request.Errors); + await sagaStorage.ActivityExecuteAborted(request.ActivityId); commonResponse.IsSuccess = true; } catch (Exception ex) @@ -76,7 +76,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.ActivityExecuteOvertime(request.ActivityId, request.SagaId, request.Errors); + await sagaStorage.ActivityExecuteOvertime(request.ActivityId); commonResponse.IsSuccess = true; } catch (Exception ex) @@ -90,7 +90,7 @@ namespace Pole.Sagas.Server.Services CommonResponse commonResponse = new CommonResponse(); try { - await sagaStorage.ActivityExecuteStarted(request.ActivityId, request.SagaId, request.TimeOutSeconds, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime)); + await sagaStorage.ActivityExecuting(request.ActivityId, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime)); commonResponse.IsSuccess = true; } catch (Exception ex) @@ -156,6 +156,20 @@ namespace Pole.Sagas.Server.Services } return commonResponse; } + public override async Task ActivityCompensating(ActivityCompensatingRequest request, ServerCallContext context) + { + CommonResponse commonResponse = new CommonResponse(); + try + { + await sagaStorage.ActivityCompensating(request.ActivityId); + commonResponse.IsSuccess = true; + } + catch (Exception ex) + { + commonResponse.Errors = CombineError(ex); + } + return commonResponse; + } private string CombineError(Exception exception) { return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs index b1dcdff..40666f4 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs @@ -67,22 +67,49 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; } } - public Task ActivityEnded(string activityId, byte[] resultData) + public async Task ActivityExecuted(string activityId, byte[] resultData) { - throw new NotImplementedException(); + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var updateActivitySql = +$"UPDATE {activityTableName} SET \"Status\"=@Status \"ResultData\"=@ResultData WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = activityId, + Status = nameof(ActivityStatus.Executed) + }); + } } - public Task ActivityExecuteAborted(string activityId, string errors) + public async Task ActivityExecuteAborted(string activityId) { - throw new NotImplementedException(); + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var updateActivitySql = +$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = activityId, + Status = nameof(ActivityStatus.ExecuteAborted) + }); + } } - public Task ActivityExecuteOvertime(string activityId, string sagaId, string errors) + public async Task ActivityExecuteOvertime(string activityId) { - throw new NotImplementedException(); + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var updateActivitySql = +$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = activityId, + Status = nameof(ActivityStatus.ExecutingOvertime) + }); + } } - public Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order, DateTime addTime) + public Task ActivityExecuting(string activityId, string sagaId, byte[] ParameterData, int order, DateTime addTime) { throw new NotImplementedException(); } @@ -106,5 +133,10 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; { throw new NotImplementedException(); } + + public Task ActivityCompensating(string activityId) + { + throw new NotImplementedException(); + } } } diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs index 3b92f5a..3ac7806 100644 --- a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs +++ b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs @@ -7,13 +7,13 @@ namespace Pole.Sagas.Core.Abstraction { Task SagaStarted(string sagaId, string serviceName, DateTime addTime); Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuting(string activityId, string sagaId, int timeOutSeconds, byte[] parameterData, int order, DateTime addTime); + Task ActivityExecuting(string activityId, string sagaId, byte[] parameterData, int order, DateTime addTime); Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType); - Task ActivityExecuteAborted(string activityId, string errors); + Task ActivityExecuteAborted(string activityId); Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityExecuted(string activityId,byte[] resultData); Task ActivityCompensated(string activityId); - Task ActivityExecuteOvertime(string activityId, string sagaId, string errors); + Task ActivityExecuteOvertime(string activityId); Task ActivityRevoked(string activityId); Task ActivityCompensating(string activityId); } diff --git a/src/Pole.Sagas/Core/ActivityStatus.cs b/src/Pole.Sagas/Core/ActivityStatus.cs index e7ed9fb..a5c8949 100644 --- a/src/Pole.Sagas/Core/ActivityStatus.cs +++ b/src/Pole.Sagas/Core/ActivityStatus.cs @@ -14,6 +14,6 @@ namespace Pole.Sagas.Core ExecuteAborted, Revoked, CompensateAborted, - Overtime + ExecutingOvertime } } diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs index 51be7e4..003c29e 100644 --- a/src/Pole.Sagas/Core/EventSender.cs +++ b/src/Pole.Sagas/Core/EventSender.cs @@ -56,12 +56,11 @@ namespace Pole.Sagas.Core } } - public async Task ActivityExecuteAborted(string activityId, string errors) + public async Task ActivityExecuteAborted(string activityId) { var result = await sagaClient.ActivityExecuteAbortedAsync(new Server.Grpc.ActivityExecuteAbortedRequest { - ActivityId = activityId, - Errors = errors + ActivityId = activityId }); if (!result.IsSuccess) { @@ -84,7 +83,7 @@ namespace Pole.Sagas.Core } } - public async Task ActivityExecuting(string activityId, string sagaId, int timeoutSeconds, byte[] parameterData, int order, DateTime addTime) + public async Task ActivityExecuting(string activityId, string sagaId, byte[] parameterData, int order, DateTime addTime) { var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest { @@ -93,7 +92,6 @@ namespace Pole.Sagas.Core Order = order, ParameterData = Google.Protobuf.ByteString.CopyFrom(parameterData), SagaId = sagaId, - TimeOutSeconds = timeoutSeconds }); if (!result.IsSuccess) { @@ -128,13 +126,11 @@ namespace Pole.Sagas.Core } } - public async Task ActivityExecuteOvertime(string activityId, string sagaId, string errors) + public async Task ActivityExecuteOvertime(string activityId) { var result = await sagaClient.ActivityExecuteOvertimeAsync(new Server.Grpc.ActivityExecuteOvertimeRequest { - SagaId = sagaId, ActivityId = activityId, - Errors = errors }); if (!result.IsSuccess) { diff --git a/src/Pole.Sagas/Core/ISagaStorage.cs b/src/Pole.Sagas/Core/ISagaStorage.cs index ef85923..799a6e1 100644 --- a/src/Pole.Sagas/Core/ISagaStorage.cs +++ b/src/Pole.Sagas/Core/ISagaStorage.cs @@ -10,13 +10,14 @@ namespace Pole.Sagas.Core { Task SagaStarted(string sagaId, string serviceName,DateTime addTime); Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order,DateTime addTime); + Task ActivityExecuting(string activityId, string sagaId, byte[] ParameterData, int order,DateTime addTime); Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType); - Task ActivityExecuteAborted(string activityId, string errors); + Task ActivityExecuteAborted(string activityId); Task ActivityCompensateAborted(string activityId, string sagaId, string errors); - Task ActivityEnded(string activityId, byte[] resultData); + Task ActivityExecuted(string activityId, byte[] resultData); Task ActivityCompensated(string activityId); - Task ActivityExecuteOvertime(string activityId, string sagaId, string errors); + Task ActivityExecuteOvertime(string activityId); Task ActivityRevoked(string activityId); + Task ActivityCompensating(string activityId); } } diff --git a/src/Pole.Sagas/Core/Saga.cs b/src/Pole.Sagas/Core/Saga.cs index 20e0ec4..4af2fa4 100644 --- a/src/Pole.Sagas/Core/Saga.cs +++ b/src/Pole.Sagas/Core/Saga.cs @@ -142,7 +142,7 @@ namespace Pole.Sagas.Core try { var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); - await eventSender.ActivityExecuting(activityId, Id, activityWapper.TimeOutSeconds, bytesContent, activityWapper.Order,DateTime.UtcNow); + await eventSender.ActivityExecuting(activityId, Id, bytesContent, activityWapper.Order,DateTime.UtcNow); var result = await activityWapper.InvokeExecute(); if (!result.IsSuccess) { @@ -171,7 +171,7 @@ namespace Pole.Sagas.Core IsSuccess = false, Errors = errors }; - await eventSender.ActivityExecuteOvertime(activityId, Id, errors); + await eventSender.ActivityExecuteOvertime(activityId); // 超时的时候 需要首先补偿这个超时的操作 return await CompensateActivity(result,currentExecuteOrder+1); } @@ -183,7 +183,7 @@ namespace Pole.Sagas.Core IsSuccess = false, Errors = errors }; - await eventSender.ActivityExecuteAborted(activityId, errors); + await eventSender.ActivityExecuteAborted(activityId); // 出错的时候 需要首先补偿这个出错的操作 return await CompensateActivity(result, currentExecuteOrder + 1); } diff --git a/src/Pole.Sagas/Core/SagaStatus.cs b/src/Pole.Sagas/Core/SagaStatus.cs index 6738cac..10d90ee 100644 --- a/src/Pole.Sagas/Core/SagaStatus.cs +++ b/src/Pole.Sagas/Core/SagaStatus.cs @@ -8,7 +8,6 @@ namespace Pole.Sagas.Core { Started, Ended, - Error, - Overtime + Error } } diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto index c39e329..b4eb80c 100644 --- a/src/Pole.Sagas/Protos/saga.proto +++ b/src/Pole.Sagas/Protos/saga.proto @@ -35,10 +35,9 @@ message SagaEndedRequest { message ActivityExecutingRequest { string activityId = 1; string sagaId = 2; - int32 timeOutSeconds = 3; - bytes parameterData = 4; - int32 order = 5; - string addTime = 6; + bytes parameterData = 3; + int32 order = 4; + string addTime = 5; } message ActivityRetriedRequest { string activityId = 1; @@ -52,7 +51,6 @@ message ActivityRetriedRequest { } message ActivityExecuteAbortedRequest { string activityId = 1; - string errors = 2; } message ActivityCompensateAbortedRequest { string activityId = 1; @@ -68,8 +66,6 @@ message ActivityCompensatedRequest { } message ActivityExecuteOvertimeRequest { string activityId = 1; - string sagaId = 2; - string errors = 3; } message ActivityRevokedRequest { string activityId = 1;