Commit 82d581dc by dingsongjie

优化代码

parent 824f076b
...@@ -48,7 +48,7 @@ namespace Pole.Sagas.Server.Services ...@@ -48,7 +48,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse(); CommonResponse commonResponse = new CommonResponse();
try try
{ {
await sagaStorage.ActivityExecuted(request.ActivityId, request.ResultData.ToByteArray()); await sagaStorage.ActivityExecuted(request.ActivityId);
commonResponse.IsSuccess = true; commonResponse.IsSuccess = true;
} }
catch (Exception ex) catch (Exception ex)
...@@ -146,7 +146,7 @@ namespace Pole.Sagas.Server.Services ...@@ -146,7 +146,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse(); CommonResponse commonResponse = new CommonResponse();
try try
{ {
await sagaStorage.ActivityCompensating(request.ActivityId); await sagaStorage.ActivityCompensating(request.ActivityId, request.CompensateTimes);
commonResponse.IsSuccess = true; commonResponse.IsSuccess = true;
} }
catch (Exception ex) catch (Exception ex)
......
...@@ -67,12 +67,12 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; ...@@ -67,12 +67,12 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
} }
} }
public async Task ActivityExecuted(string activityId, byte[] resultData) public async Task ActivityExecuted(string activityId)
{ {
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{ {
var updateActivitySql = var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status \"ResultData\"=@ResultData WHERE \"Id\" = @Id"; $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new await connection.ExecuteAsync(updateActivitySql, new
{ {
Id = activityId, Id = activityId,
...@@ -141,7 +141,6 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; ...@@ -141,7 +141,6 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
ExecuteTimes = executeTimes ExecuteTimes = executeTimes
}); });
} }
} }
} }
...@@ -191,16 +190,17 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" + ...@@ -191,16 +190,17 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" +
} }
} }
public async Task ActivityCompensating(string activityId) public async Task ActivityCompensating(string activityId, int compensateTimes)
{ {
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString)) using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{ {
var updateActivitySql = var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id"; $"UPDATE {activityTableName} SET \"Status\"=@Status ,\"CompensateTimes\"=@CompensateTimes WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new await connection.ExecuteAsync(updateActivitySql, new
{ {
Id = activityId, Id = activityId,
Status = nameof(ActivityStatus.Compensating) Status = nameof(ActivityStatus.Compensating),
CompensateTimes= compensateTimes,
}); });
} }
} }
......
...@@ -67,8 +67,7 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}( ...@@ -67,8 +67,7 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL, ""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
""ExecuteTimes"" int4 NOT NULL, ""ExecuteTimes"" int4 NOT NULL,
""ParameterData"" bytea NOT NULL, ""ParameterData"" bytea NOT NULL,
""ResultData"" bytea, ""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
""Errors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
""CompensateTimes"" int4 NOT NULL, ""CompensateTimes"" int4 NOT NULL,
""AddTime"" timestamp NOT NULL ""AddTime"" timestamp NOT NULL
) )
......
...@@ -10,10 +10,10 @@ namespace Pole.Sagas.Core.Abstraction ...@@ -10,10 +10,10 @@ namespace Pole.Sagas.Core.Abstraction
Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime,int executeTimes); Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime,int executeTimes);
Task ActivityExecuteAborted(string activityId); Task ActivityExecuteAborted(string activityId);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityExecuted(string activityId,byte[] resultData); Task ActivityExecuted(string activityId);
Task ActivityCompensated(string activityId); Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId); Task ActivityExecuteOvertime(string activityId);
Task ActivityRevoked(string activityId); Task ActivityRevoked(string activityId);
Task ActivityCompensating(string activityId); Task ActivityCompensating(string activityId,int compensateTimes);
} }
} }
...@@ -43,12 +43,11 @@ namespace Pole.Sagas.Core ...@@ -43,12 +43,11 @@ namespace Pole.Sagas.Core
} }
} }
public async Task ActivityExecuted(string activityId, byte[] resultData) public async Task ActivityExecuted(string activityId)
{ {
var result = await sagaClient.ActivityExecutedAsync(new Server.Grpc.ActivityExecutedRequest var result = await sagaClient.ActivityExecutedAsync(new Server.Grpc.ActivityExecutedRequest
{ {
ActivityId = activityId, ActivityId = activityId,
ResultData = Google.Protobuf.ByteString.CopyFrom(resultData),
}); });
if (!result.IsSuccess) if (!result.IsSuccess)
{ {
...@@ -136,7 +135,7 @@ namespace Pole.Sagas.Core ...@@ -136,7 +135,7 @@ namespace Pole.Sagas.Core
} }
} }
public async Task ActivityCompensating(string activityId) public async Task ActivityCompensating(string activityId, int compensateTimes)
{ {
var result = await sagaClient.ActivityCompensatingAsync(new Server.Grpc.ActivityCompensatingRequest var result = await sagaClient.ActivityCompensatingAsync(new Server.Grpc.ActivityCompensatingRequest
{ {
......
...@@ -13,10 +13,10 @@ namespace Pole.Sagas.Core ...@@ -13,10 +13,10 @@ namespace Pole.Sagas.Core
Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime,int executeTimes); Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime,int executeTimes);
Task ActivityExecuteAborted(string activityId); Task ActivityExecuteAborted(string activityId);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityExecuted(string activityId, byte[] resultData); Task ActivityExecuted(string activityId);
Task ActivityCompensated(string activityId); Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId); Task ActivityExecuteOvertime(string activityId);
Task ActivityRevoked(string activityId); Task ActivityRevoked(string activityId);
Task ActivityCompensating(string activityId); Task ActivityCompensating(string activityId, int compensateTimes);
} }
} }
...@@ -120,7 +120,7 @@ namespace Pole.Sagas.Core ...@@ -120,7 +120,7 @@ namespace Pole.Sagas.Core
var activityId = activityWapper.Id; var activityId = activityWapper.Id;
try try
{ {
await eventSender.ActivityCompensating(activityId); await eventSender.ActivityCompensating(activityId, activityWapper.CompensateTimes);
await activityWapper.InvokeCompensate(); await activityWapper.InvokeCompensate();
await eventSender.ActivityCompensated(activityId); await eventSender.ActivityCompensated(activityId);
var compensateActivity = GetNextCompensateActivity(); var compensateActivity = GetNextCompensateActivity();
...@@ -152,7 +152,7 @@ namespace Pole.Sagas.Core ...@@ -152,7 +152,7 @@ namespace Pole.Sagas.Core
await CompensateActivity(result, currentExecuteOrder); await CompensateActivity(result, currentExecuteOrder);
return result; return result;
} }
await eventSender.ActivityExecuted(activityId, Encoding.UTF8.GetBytes(string.Empty)); await eventSender.ActivityExecuted(activityId);
var executeActivity = GetNextExecuteActivity(); var executeActivity = GetNextExecuteActivity();
if (executeActivity == null) if (executeActivity == null)
{ {
......
...@@ -50,7 +50,6 @@ message ActivityCompensateAbortedRequest { ...@@ -50,7 +50,6 @@ message ActivityCompensateAbortedRequest {
} }
message ActivityExecutedRequest { message ActivityExecutedRequest {
string activityId = 1; string activityId = 1;
bytes resultData = 2;
} }
message ActivityCompensatedRequest { message ActivityCompensatedRequest {
string activityId = 1; string activityId = 1;
...@@ -63,6 +62,7 @@ message ActivityRevokedRequest { ...@@ -63,6 +62,7 @@ message ActivityRevokedRequest {
} }
message ActivityCompensatingRequest { message ActivityCompensatingRequest {
string activityId = 1; string activityId = 1;
int32 CompensateTimes = 2;
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment