From ede3fb7e653dfb94643093c86e63bac5194ef3f7 Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Tue, 10 Mar 2020 10:15:43 +0800 Subject: [PATCH] 完善接口 --- src/Pole.Sagas.Server/Services/SagaService.cs | 4 ++-- src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs | 44 ++++++++++++++++++++++++++++---------------- src/Pole.Sagas/Core/Abstraction/IEventSender.cs | 5 +++-- src/Pole.Sagas/Core/EventSender.cs | 20 ++++++++++++++++---- src/Pole.Sagas/Core/Saga.cs | 5 +++-- src/Pole.Sagas/Core/SagaStatus.cs | 2 +- src/Pole.Sagas/Protos/saga.proto | 12 ++++++++---- 7 files changed, 61 insertions(+), 31 deletions(-) diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs index 98f795a..00efe20 100644 --- a/src/Pole.Sagas.Server/Services/SagaService.cs +++ b/src/Pole.Sagas.Server/Services/SagaService.cs @@ -43,7 +43,7 @@ namespace Pole.Sagas.Server.Services } return commonResponse; } - public override async Task ActivityEnded(ActivityEndedRequest request, ServerCallContext context) + public override async Task ActivityExecuted(ActivityExecutedRequest request, ServerCallContext context) { CommonResponse commonResponse = new CommonResponse(); try @@ -85,7 +85,7 @@ namespace Pole.Sagas.Server.Services } return commonResponse; } - public override async Task ActivityExecuteStarted(ActivityExecuteStartedRequest request, ServerCallContext context) + public override async Task ActivityExecuting(ActivityExecutingRequest request, ServerCallContext context) { CommonResponse commonResponse = new CommonResponse(); try diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs index 173ee37..b1dcdff 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorage.cs @@ -14,8 +14,8 @@ namespace Pole.Sagas.Storage.PostgreSql { private readonly string sagaTableName; private readonly string activityTableName; - private readonly PoleSagasStoragePostgreSqlOption poleSagasStoragePostgreSqlOption; - private readonly ISagaStorageInitializer sagaStorageInitializer; + private readonly PoleSagasStoragePostgreSqlOption poleSagasStoragePostgreSqlOption; + private readonly ISagaStorageInitializer sagaStorageInitializer; public PostgreSqlSagaStorage(IOptions poleSagasStoragePostgreSqlOption, ISagaStorageInitializer sagaStorageInitializer) { this.poleSagasStoragePostgreSqlOption = poleSagasStoragePostgreSqlOption.Value; @@ -24,35 +24,47 @@ namespace Pole.Sagas.Storage.PostgreSql activityTableName = sagaStorageInitializer.GetActivityTableName(); } public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors) - { + { using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) { - using(var tansaction = await connection.BeginTransactionAsync()) + using (var tansaction = await connection.BeginTransactionAsync()) { var updateActivitySql = $"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id"; - await connection.ExecuteAsync(updateActivitySql, new - { - Id = activityId, - Errors= errors, - Status= nameof(ActivityStatus.CompensateAborted) - }, tansaction); - - var updateSagaSql = -$"UPDATE {sagaTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id"; - await connection.ExecuteAsync(updateSagaSql, new + await connection.ExecuteAsync(updateActivitySql, new { Id = activityId, + Errors = errors, Status = nameof(ActivityStatus.CompensateAborted) }, tansaction); + if (!string.IsNullOrEmpty(sagaId)) + { + var updateSagaSql = +$"UPDATE {sagaTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateSagaSql, new + { + Id = sagaId, + Status = nameof(SagaStatus.Error) + }, tansaction); + } + await tansaction.CommitAsync(); } } } - public Task ActivityCompensated(string activityId) + public async Task ActivityCompensated(string activityId) { - throw new NotImplementedException(); + using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) + { + var updateActivitySql = +$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; + await connection.ExecuteAsync(updateActivitySql, new + { + Id = activityId, + Status = nameof(ActivityStatus.Compensated) + }); + } } public Task ActivityEnded(string activityId, byte[] resultData) diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs index fcfa5f8..3b92f5a 100644 --- a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs +++ b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs @@ -7,13 +7,14 @@ namespace Pole.Sagas.Core.Abstraction { Task SagaStarted(string sagaId, string serviceName, DateTime addTime); Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] parameterData, int order, DateTime addTime); + Task ActivityExecuting(string activityId, string sagaId, int timeOutSeconds, byte[] parameterData, int order, DateTime addTime); Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType); Task ActivityExecuteAborted(string activityId, string errors); Task ActivityCompensateAborted(string activityId, string sagaId, string errors); - Task ActivityEnded(string activityId,byte[] resultData); + Task ActivityExecuted(string activityId,byte[] resultData); Task ActivityCompensated(string activityId); Task ActivityExecuteOvertime(string activityId, string sagaId, string errors); Task ActivityRevoked(string activityId); + Task ActivityCompensating(string activityId); } } diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs index 2fae955..51be7e4 100644 --- a/src/Pole.Sagas/Core/EventSender.cs +++ b/src/Pole.Sagas/Core/EventSender.cs @@ -43,9 +43,9 @@ namespace Pole.Sagas.Core } } - public async Task ActivityEnded(string activityId, byte[] resultData) + public async Task ActivityExecuted(string activityId, byte[] resultData) { - var result = await sagaClient.ActivityEndedAsync(new Server.Grpc.ActivityEndedRequest + var result = await sagaClient.ActivityExecutedAsync(new Server.Grpc.ActivityExecutedRequest { ActivityId = activityId, ResultData = Google.Protobuf.ByteString.CopyFrom(resultData), @@ -84,9 +84,9 @@ namespace Pole.Sagas.Core } } - public async Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, byte[] parameterData, int order, DateTime addTime) + public async Task ActivityExecuting(string activityId, string sagaId, int timeoutSeconds, byte[] parameterData, int order, DateTime addTime) { - var result = await sagaClient.ActivityExecuteStartedAsync(new Server.Grpc.ActivityExecuteStartedRequest + var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest { ActivityId = activityId, AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"), @@ -153,5 +153,17 @@ namespace Pole.Sagas.Core throw new SagasServerException(result.Errors); } } + + public async Task ActivityCompensating(string activityId) + { + var result = await sagaClient.ActivityCompensatingAsync(new Server.Grpc.ActivityCompensatingRequest + { + ActivityId = activityId, + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } + } } } diff --git a/src/Pole.Sagas/Core/Saga.cs b/src/Pole.Sagas/Core/Saga.cs index 24972c3..20e0ec4 100644 --- a/src/Pole.Sagas/Core/Saga.cs +++ b/src/Pole.Sagas/Core/Saga.cs @@ -119,6 +119,7 @@ namespace Pole.Sagas.Core var activityId = activityWapper.Id; try { + await eventSender.ActivityCompensating(activityId); await activityWapper.InvokeCompensate(); await eventSender.ActivityCompensated(activityId); var compensateActivity = GetNextCompensateActivity(); @@ -141,7 +142,7 @@ namespace Pole.Sagas.Core try { var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); - await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOutSeconds, bytesContent, activityWapper.Order,DateTime.UtcNow); + await eventSender.ActivityExecuting(activityId, Id, activityWapper.TimeOutSeconds, bytesContent, activityWapper.Order,DateTime.UtcNow); var result = await activityWapper.InvokeExecute(); if (!result.IsSuccess) { @@ -149,7 +150,7 @@ namespace Pole.Sagas.Core await CompensateActivity(result,currentExecuteOrder); return result; } - await eventSender.ActivityEnded(activityId, Encoding.UTF8.GetBytes(string.Empty)); + await eventSender.ActivityExecuted(activityId, Encoding.UTF8.GetBytes(string.Empty)); var executeActivity = GetNextExecuteActivity(); if (executeActivity == null) { diff --git a/src/Pole.Sagas/Core/SagaStatus.cs b/src/Pole.Sagas/Core/SagaStatus.cs index cf04215..6738cac 100644 --- a/src/Pole.Sagas/Core/SagaStatus.cs +++ b/src/Pole.Sagas/Core/SagaStatus.cs @@ -4,7 +4,7 @@ using System.Text; namespace Pole.Sagas.Core { - enum SagaStatus + public enum SagaStatus { Started, Ended, diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto index 9f744bf..c39e329 100644 --- a/src/Pole.Sagas/Protos/saga.proto +++ b/src/Pole.Sagas/Protos/saga.proto @@ -7,14 +7,15 @@ package pole.Sagas.Server.Grpc; service Saga { rpc SagaStarted (SagaStartedRequest) returns (CommonResponse); rpc SagaEnded (SagaEndedRequest) returns (CommonResponse); - rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse); + rpc ActivityExecuting (ActivityExecutingRequest) returns (CommonResponse); rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse); rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse); rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse); - rpc ActivityEnded (ActivityEndedRequest) returns (CommonResponse); + rpc ActivityExecuted (ActivityExecutedRequest) returns (CommonResponse); rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse); rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse); rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse); + rpc ActivityCompensating (ActivityCompensatingRequest) returns (CommonResponse); } message CommonResponse{ @@ -31,7 +32,7 @@ message SagaEndedRequest { string sagaId = 1; string ExpiresAt = 2; } -message ActivityExecuteStartedRequest { +message ActivityExecutingRequest { string activityId = 1; string sagaId = 2; int32 timeOutSeconds = 3; @@ -58,7 +59,7 @@ message ActivityCompensateAbortedRequest { string sagaId = 2; string errors = 3; } -message ActivityEndedRequest { +message ActivityExecutedRequest { string activityId = 1; bytes resultData = 2; } @@ -73,5 +74,8 @@ message ActivityExecuteOvertimeRequest { message ActivityRevokedRequest { string activityId = 1; } +message ActivityCompensatingRequest { + string activityId = 1; +} -- libgit2 0.25.0