Commit b7b7cb3c by dingsongjie

优化

parent ede3fb7e
......@@ -48,7 +48,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityEnded(request.ActivityId, request.ResultData.ToByteArray());
await sagaStorage.ActivityExecuted(request.ActivityId, request.ResultData.ToByteArray());
commonResponse.IsSuccess = true;
}
catch (Exception ex)
......@@ -62,7 +62,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityExecuteAborted(request.ActivityId, request.Errors);
await sagaStorage.ActivityExecuteAborted(request.ActivityId);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
......@@ -76,7 +76,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityExecuteOvertime(request.ActivityId, request.SagaId, request.Errors);
await sagaStorage.ActivityExecuteOvertime(request.ActivityId);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
......@@ -90,7 +90,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityExecuteStarted(request.ActivityId, request.SagaId, request.TimeOutSeconds, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime));
await sagaStorage.ActivityExecuting(request.ActivityId, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime));
commonResponse.IsSuccess = true;
}
catch (Exception ex)
......@@ -156,6 +156,20 @@ namespace Pole.Sagas.Server.Services
}
return commonResponse;
}
public override async Task<CommonResponse> ActivityCompensating(ActivityCompensatingRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityCompensating(request.ActivityId);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
private string CombineError(Exception exception)
{
return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
......
......@@ -67,22 +67,49 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
}
}
public Task ActivityEnded(string activityId, byte[] resultData)
public async Task ActivityExecuted(string activityId, byte[] resultData)
{
throw new NotImplementedException();
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status \"ResultData\"=@ResultData WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Status = nameof(ActivityStatus.Executed)
});
}
}
public Task ActivityExecuteAborted(string activityId, string errors)
public async Task ActivityExecuteAborted(string activityId)
{
throw new NotImplementedException();
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Status = nameof(ActivityStatus.ExecuteAborted)
});
}
}
public Task ActivityExecuteOvertime(string activityId, string sagaId, string errors)
public async Task ActivityExecuteOvertime(string activityId)
{
throw new NotImplementedException();
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Status = nameof(ActivityStatus.ExecutingOvertime)
});
}
}
public Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order, DateTime addTime)
public Task ActivityExecuting(string activityId, string sagaId, byte[] ParameterData, int order, DateTime addTime)
{
throw new NotImplementedException();
}
......@@ -106,5 +133,10 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
{
throw new NotImplementedException();
}
public Task ActivityCompensating(string activityId)
{
throw new NotImplementedException();
}
}
}
......@@ -7,13 +7,13 @@ namespace Pole.Sagas.Core.Abstraction
{
Task SagaStarted(string sagaId, string serviceName, DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuting(string activityId, string sagaId, int timeOutSeconds, byte[] parameterData, int order, DateTime addTime);
Task ActivityExecuting(string activityId, string sagaId, byte[] parameterData, int order, DateTime addTime);
Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType);
Task ActivityExecuteAborted(string activityId, string errors);
Task ActivityExecuteAborted(string activityId);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityExecuted(string activityId,byte[] resultData);
Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId, string sagaId, string errors);
Task ActivityExecuteOvertime(string activityId);
Task ActivityRevoked(string activityId);
Task ActivityCompensating(string activityId);
}
......
......@@ -14,6 +14,6 @@ namespace Pole.Sagas.Core
ExecuteAborted,
Revoked,
CompensateAborted,
Overtime
ExecutingOvertime
}
}
......@@ -56,12 +56,11 @@ namespace Pole.Sagas.Core
}
}
public async Task ActivityExecuteAborted(string activityId, string errors)
public async Task ActivityExecuteAborted(string activityId)
{
var result = await sagaClient.ActivityExecuteAbortedAsync(new Server.Grpc.ActivityExecuteAbortedRequest
{
ActivityId = activityId,
Errors = errors
ActivityId = activityId
});
if (!result.IsSuccess)
{
......@@ -84,7 +83,7 @@ namespace Pole.Sagas.Core
}
}
public async Task ActivityExecuting(string activityId, string sagaId, int timeoutSeconds, byte[] parameterData, int order, DateTime addTime)
public async Task ActivityExecuting(string activityId, string sagaId, byte[] parameterData, int order, DateTime addTime)
{
var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest
{
......@@ -93,7 +92,6 @@ namespace Pole.Sagas.Core
Order = order,
ParameterData = Google.Protobuf.ByteString.CopyFrom(parameterData),
SagaId = sagaId,
TimeOutSeconds = timeoutSeconds
});
if (!result.IsSuccess)
{
......@@ -128,13 +126,11 @@ namespace Pole.Sagas.Core
}
}
public async Task ActivityExecuteOvertime(string activityId, string sagaId, string errors)
public async Task ActivityExecuteOvertime(string activityId)
{
var result = await sagaClient.ActivityExecuteOvertimeAsync(new Server.Grpc.ActivityExecuteOvertimeRequest
{
SagaId = sagaId,
ActivityId = activityId,
Errors = errors
});
if (!result.IsSuccess)
{
......
......@@ -10,13 +10,14 @@ namespace Pole.Sagas.Core
{
Task SagaStarted(string sagaId, string serviceName,DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order,DateTime addTime);
Task ActivityExecuting(string activityId, string sagaId, byte[] ParameterData, int order,DateTime addTime);
Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType);
Task ActivityExecuteAborted(string activityId, string errors);
Task ActivityExecuteAborted(string activityId);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityEnded(string activityId, byte[] resultData);
Task ActivityExecuted(string activityId, byte[] resultData);
Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId, string sagaId, string errors);
Task ActivityExecuteOvertime(string activityId);
Task ActivityRevoked(string activityId);
Task ActivityCompensating(string activityId);
}
}
......@@ -142,7 +142,7 @@ namespace Pole.Sagas.Core
try
{
var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityExecuting(activityId, Id, activityWapper.TimeOutSeconds, bytesContent, activityWapper.Order,DateTime.UtcNow);
await eventSender.ActivityExecuting(activityId, Id, bytesContent, activityWapper.Order,DateTime.UtcNow);
var result = await activityWapper.InvokeExecute();
if (!result.IsSuccess)
{
......@@ -171,7 +171,7 @@ namespace Pole.Sagas.Core
IsSuccess = false,
Errors = errors
};
await eventSender.ActivityExecuteOvertime(activityId, Id, errors);
await eventSender.ActivityExecuteOvertime(activityId);
// 超时的时候 需要首先补偿这个超时的操作
return await CompensateActivity(result,currentExecuteOrder+1);
}
......@@ -183,7 +183,7 @@ namespace Pole.Sagas.Core
IsSuccess = false,
Errors = errors
};
await eventSender.ActivityExecuteAborted(activityId, errors);
await eventSender.ActivityExecuteAborted(activityId);
// 出错的时候 需要首先补偿这个出错的操作
return await CompensateActivity(result, currentExecuteOrder + 1);
}
......
......@@ -8,7 +8,6 @@ namespace Pole.Sagas.Core
{
Started,
Ended,
Error,
Overtime
Error
}
}
......@@ -35,10 +35,9 @@ message SagaEndedRequest {
message ActivityExecutingRequest {
string activityId = 1;
string sagaId = 2;
int32 timeOutSeconds = 3;
bytes parameterData = 4;
int32 order = 5;
string addTime = 6;
bytes parameterData = 3;
int32 order = 4;
string addTime = 5;
}
message ActivityRetriedRequest {
string activityId = 1;
......@@ -52,7 +51,6 @@ message ActivityRetriedRequest {
}
message ActivityExecuteAbortedRequest {
string activityId = 1;
string errors = 2;
}
message ActivityCompensateAbortedRequest {
string activityId = 1;
......@@ -68,8 +66,6 @@ message ActivityCompensatedRequest {
}
message ActivityExecuteOvertimeRequest {
string activityId = 1;
string sagaId = 2;
string errors = 3;
}
message ActivityRevokedRequest {
string activityId = 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment