Commit ede3fb7e by dingsongjie

完善接口

parent 6c72dd71
......@@ -43,7 +43,7 @@ namespace Pole.Sagas.Server.Services
}
return commonResponse;
}
public override async Task<CommonResponse> ActivityEnded(ActivityEndedRequest request, ServerCallContext context)
public override async Task<CommonResponse> ActivityExecuted(ActivityExecutedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
......@@ -85,7 +85,7 @@ namespace Pole.Sagas.Server.Services
}
return commonResponse;
}
public override async Task<CommonResponse> ActivityExecuteStarted(ActivityExecuteStartedRequest request, ServerCallContext context)
public override async Task<CommonResponse> ActivityExecuting(ActivityExecutingRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
......
......@@ -14,8 +14,8 @@ namespace Pole.Sagas.Storage.PostgreSql
{
private readonly string sagaTableName;
private readonly string activityTableName;
private readonly PoleSagasStoragePostgreSqlOption poleSagasStoragePostgreSqlOption;
private readonly ISagaStorageInitializer sagaStorageInitializer;
private readonly PoleSagasStoragePostgreSqlOption poleSagasStoragePostgreSqlOption;
private readonly ISagaStorageInitializer sagaStorageInitializer;
public PostgreSqlSagaStorage(IOptions<PoleSagasStoragePostgreSqlOption> poleSagasStoragePostgreSqlOption, ISagaStorageInitializer sagaStorageInitializer)
{
this.poleSagasStoragePostgreSqlOption = poleSagasStoragePostgreSqlOption.Value;
......@@ -24,35 +24,47 @@ namespace Pole.Sagas.Storage.PostgreSql
activityTableName = sagaStorageInitializer.GetActivityTableName();
}
public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors)
{
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
using(var tansaction = await connection.BeginTransactionAsync())
using (var tansaction = await connection.BeginTransactionAsync())
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Errors= errors,
Status= nameof(ActivityStatus.CompensateAborted)
}, tansaction);
var updateSagaSql =
$"UPDATE {sagaTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateSagaSql, new
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Errors = errors,
Status = nameof(ActivityStatus.CompensateAborted)
}, tansaction);
if (!string.IsNullOrEmpty(sagaId))
{
var updateSagaSql =
$"UPDATE {sagaTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateSagaSql, new
{
Id = sagaId,
Status = nameof(SagaStatus.Error)
}, tansaction);
}
await tansaction.CommitAsync();
}
}
}
public Task ActivityCompensated(string activityId)
public async Task ActivityCompensated(string activityId)
{
throw new NotImplementedException();
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Status = nameof(ActivityStatus.Compensated)
});
}
}
public Task ActivityEnded(string activityId, byte[] resultData)
......
......@@ -7,13 +7,14 @@ namespace Pole.Sagas.Core.Abstraction
{
Task SagaStarted(string sagaId, string serviceName, DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] parameterData, int order, DateTime addTime);
Task ActivityExecuting(string activityId, string sagaId, int timeOutSeconds, byte[] parameterData, int order, DateTime addTime);
Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType);
Task ActivityExecuteAborted(string activityId, string errors);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityEnded(string activityId,byte[] resultData);
Task ActivityExecuted(string activityId,byte[] resultData);
Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId, string sagaId, string errors);
Task ActivityRevoked(string activityId);
Task ActivityCompensating(string activityId);
}
}
......@@ -43,9 +43,9 @@ namespace Pole.Sagas.Core
}
}
public async Task ActivityEnded(string activityId, byte[] resultData)
public async Task ActivityExecuted(string activityId, byte[] resultData)
{
var result = await sagaClient.ActivityEndedAsync(new Server.Grpc.ActivityEndedRequest
var result = await sagaClient.ActivityExecutedAsync(new Server.Grpc.ActivityExecutedRequest
{
ActivityId = activityId,
ResultData = Google.Protobuf.ByteString.CopyFrom(resultData),
......@@ -84,9 +84,9 @@ namespace Pole.Sagas.Core
}
}
public async Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, byte[] parameterData, int order, DateTime addTime)
public async Task ActivityExecuting(string activityId, string sagaId, int timeoutSeconds, byte[] parameterData, int order, DateTime addTime)
{
var result = await sagaClient.ActivityExecuteStartedAsync(new Server.Grpc.ActivityExecuteStartedRequest
var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest
{
ActivityId = activityId,
AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"),
......@@ -153,5 +153,17 @@ namespace Pole.Sagas.Core
throw new SagasServerException(result.Errors);
}
}
public async Task ActivityCompensating(string activityId)
{
var result = await sagaClient.ActivityCompensatingAsync(new Server.Grpc.ActivityCompensatingRequest
{
ActivityId = activityId,
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
}
}
}
......@@ -119,6 +119,7 @@ namespace Pole.Sagas.Core
var activityId = activityWapper.Id;
try
{
await eventSender.ActivityCompensating(activityId);
await activityWapper.InvokeCompensate();
await eventSender.ActivityCompensated(activityId);
var compensateActivity = GetNextCompensateActivity();
......@@ -141,7 +142,7 @@ namespace Pole.Sagas.Core
try
{
var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOutSeconds, bytesContent, activityWapper.Order,DateTime.UtcNow);
await eventSender.ActivityExecuting(activityId, Id, activityWapper.TimeOutSeconds, bytesContent, activityWapper.Order,DateTime.UtcNow);
var result = await activityWapper.InvokeExecute();
if (!result.IsSuccess)
{
......@@ -149,7 +150,7 @@ namespace Pole.Sagas.Core
await CompensateActivity(result,currentExecuteOrder);
return result;
}
await eventSender.ActivityEnded(activityId, Encoding.UTF8.GetBytes(string.Empty));
await eventSender.ActivityExecuted(activityId, Encoding.UTF8.GetBytes(string.Empty));
var executeActivity = GetNextExecuteActivity();
if (executeActivity == null)
{
......
......@@ -4,7 +4,7 @@ using System.Text;
namespace Pole.Sagas.Core
{
enum SagaStatus
public enum SagaStatus
{
Started,
Ended,
......
......@@ -7,14 +7,15 @@ package pole.Sagas.Server.Grpc;
service Saga {
rpc SagaStarted (SagaStartedRequest) returns (CommonResponse);
rpc SagaEnded (SagaEndedRequest) returns (CommonResponse);
rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse);
rpc ActivityExecuting (ActivityExecutingRequest) returns (CommonResponse);
rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse);
rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse);
rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse);
rpc ActivityEnded (ActivityEndedRequest) returns (CommonResponse);
rpc ActivityExecuted (ActivityExecutedRequest) returns (CommonResponse);
rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse);
rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse);
rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse);
rpc ActivityCompensating (ActivityCompensatingRequest) returns (CommonResponse);
}
message CommonResponse{
......@@ -31,7 +32,7 @@ message SagaEndedRequest {
string sagaId = 1;
string ExpiresAt = 2;
}
message ActivityExecuteStartedRequest {
message ActivityExecutingRequest {
string activityId = 1;
string sagaId = 2;
int32 timeOutSeconds = 3;
......@@ -58,7 +59,7 @@ message ActivityCompensateAbortedRequest {
string sagaId = 2;
string errors = 3;
}
message ActivityEndedRequest {
message ActivityExecutedRequest {
string activityId = 1;
bytes resultData = 2;
}
......@@ -73,5 +74,8 @@ message ActivityExecuteOvertimeRequest {
message ActivityRevokedRequest {
string activityId = 1;
}
message ActivityCompensatingRequest {
string activityId = 1;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment