Commit 824f076b by dingsongjie

完成部分存储功能

parent b7b7cb3c
......@@ -90,22 +90,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityExecuting(request.ActivityId, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime));
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
public override async Task<CommonResponse> ActivityRetried(ActivityRetriedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
var targetActivityRetryType = request.ActivityRetryType == Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Compensate ? ActivityRetryType.Compensate : ActivityRetryType.Execute;
await sagaStorage.ActivityRetried(request.ActivityId, request.Status, request.Retries, targetActivityRetryType);
await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime), request.ExecuteTimes);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
......
......@@ -12,7 +12,7 @@ namespace Pole.Sagas.Storage.PostgreSql
public static IServiceCollection AddPostgreSqlStorage(IServiceCollection services,Action<PoleSagasStoragePostgreSqlOption> config)
{
services.Configure(config);
services.AddSingleton<ISagaStorageInitializer, PostgreSqlEventStorageInitializer>();
services.AddSingleton<ISagaStorageInitializer, PostgreSqlSagaStorageInitializer>();
services.AddSingleton<ISagaStorage, PostgreSqlSagaStorage>();
return services;
}
......
......@@ -109,34 +109,100 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
}
}
public Task ActivityExecuting(string activityId, string sagaId, byte[] ParameterData, int order, DateTime addTime)
public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] ParameterData, int order, DateTime addTime, int executeTimes)
{
throw new NotImplementedException();
}
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
string sql = string.Empty;
if (executeTimes == 1)
{
sql =
$"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"ParameterData\",\"ExecuteTimes\",\"CompensateTimes\",\"AddTime\")" +
$"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,@ExecuteTimes,@CompensateTimes,@AddTime);";
_ = await connection.ExecuteAsync(sql, new
{
Id = activityId,
Name = activityName,
SagaId = sagaId,
Status = nameof(ActivityStatus.Executing),
ExecutingOvertimeRetries = 0,
ParameterData = ParameterData,
ExecuteTimes = executeTimes,
CompensateTimes = 0,
AddTime = addTime
});
}
else
{
sql = $"UPDATE {activityTableName} SET \"ExecuteTimes\"=@ExecuteTimes WHERE \"Id\" = @Id";
await connection.ExecuteAsync(sql, new
{
Id = activityId,
ExecuteTimes = executeTimes
});
}
public Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType)
{
throw new NotImplementedException();
}
}
public Task ActivityRevoked(string activityId)
public async Task ActivityRevoked(string activityId)
{
throw new NotImplementedException();
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Status = nameof(ActivityStatus.Revoked)
});
}
}
public Task SagaEnded(string sagaId, DateTime ExpiresAt)
public async Task SagaEnded(string sagaId, DateTime ExpiresAt)
{
throw new NotImplementedException();
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
$"UPDATE {sagaTableName} SET \"Status\"=@Status ,\"ExpiresAt\"=@ExpiresAt WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = sagaId,
ExpiresAt= ExpiresAt,
Status = nameof(ActivityStatus.Revoked)
});
}
}
public Task SagaStarted(string sagaId, string serviceName, DateTime addTime)
public async Task SagaStarted(string sagaId, string serviceName, DateTime addTime)
{
throw new NotImplementedException();
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var sql =
$"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" +
$"VALUES(@Id,@ServiceName,@Status,@AddTime);";
await connection.ExecuteAsync(sql, new
{
Id = sagaId,
AddTime = addTime,
ServiceName=serviceName,
Status = nameof(ActivityStatus.Revoked)
});
}
}
public Task ActivityCompensating(string activityId)
public async Task ActivityCompensating(string activityId)
{
throw new NotImplementedException();
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
Status = nameof(ActivityStatus.Compensating)
});
}
}
}
}
......@@ -13,11 +13,11 @@ using System.Threading.Tasks;
namespace Pole.Sagas.Storage.PostgreSql
{
class PostgreSqlEventStorageInitializer : ISagaStorageInitializer
class PostgreSqlSagaStorageInitializer : ISagaStorageInitializer
{
private PoleSagasStoragePostgreSqlOption options;
private readonly ILogger logger;
public PostgreSqlEventStorageInitializer(IOptions<PoleSagasStoragePostgreSqlOption> poleSagaServerOption, ILogger<PostgreSqlEventStorageInitializer> logger)
public PostgreSqlSagaStorageInitializer(IOptions<PoleSagasStoragePostgreSqlOption> poleSagaServerOption, ILogger<PostgreSqlSagaStorageInitializer> logger)
{
this.options = poleSagaServerOption.Value;
this.logger = logger;
......@@ -57,19 +57,19 @@ CREATE TABLE IF NOT EXISTS {GetSagaTableName()}(
""ExpiresAt"" timestamp,
""AddTime"" timestamp NOT NULL
);
ALTER TABLE ""{options.SchemaName}"" ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id"");
ALTER TABLE ""{GetSagaTableName()}"" ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id"");
CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Name"" varchar(255) COLLATE ""pg_catalog"".""default"" NOT NULL,
""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Order"" int4 NOT NULL,
""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
""TimeOutSeconds"" int4 NOT NULL,
""ExecuteTimes"" int4 NOT NULL,
""ParameterData"" bytea NOT NULL,
""ResultData"" bytea,
""Errors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
""ExecuteRetries"" int4 NOT NULL,
""CompensateRetries"" int4 NOT NULL,
""CompensateTimes"" int4 NOT NULL,
""AddTime"" timestamp NOT NULL
)
;
......
......@@ -7,8 +7,7 @@ namespace Pole.Sagas.Core.Abstraction
{
Task SagaStarted(string sagaId, string serviceName, DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuting(string activityId, string sagaId, byte[] parameterData, int order, DateTime addTime);
Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType);
Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime,int executeTimes);
Task ActivityExecuteAborted(string activityId);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityExecuted(string activityId,byte[] resultData);
......
......@@ -11,12 +11,16 @@ namespace Pole.Sagas.Core
public class ActivityWapper
{
public string Id { get; set; }
public string Name { get; set; }
public Type ActivityType { get; set; }
public Type ActivityDataType { get; set; }
public object DataObj { get; set; }
public int Order { get; set; }
public int ExecuteTimes { get; set; }
public int CompensateTimes { get; set; }
public ActivityStatus ActivityStatus { get; set; }
public IServiceProvider ServiceProvider { get; set; }
public int TimeOutSeconds { get; set; }
public CancellationTokenSource CancellationTokenSource { get; set; }
public Task<ActivityExecuteResult> InvokeExecute()
......
......@@ -68,26 +68,12 @@ namespace Pole.Sagas.Core
}
}
public async Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType)
{
Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType activityRetryType = retryType == ActivityRetryType.Compensate ? Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Compensate : Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Execute;
var result = await sagaClient.ActivityRetriedAsync(new Server.Grpc.ActivityRetriedRequest
{
ActivityId = activityId,
ActivityRetryType = activityRetryType
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
}
public async Task ActivityExecuting(string activityId, string sagaId, byte[] parameterData, int order, DateTime addTime)
public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime, int executeTimes)
{
var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest
{
ActivityId = activityId,
ActivityName = activityName,
AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"),
Order = order,
ParameterData = Google.Protobuf.ByteString.CopyFrom(parameterData),
......
......@@ -10,8 +10,7 @@ namespace Pole.Sagas.Core
{
Task SagaStarted(string sagaId, string serviceName,DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuting(string activityId, string sagaId, byte[] ParameterData, int order,DateTime addTime);
Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType);
Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime,int executeTimes);
Task ActivityExecuteAborted(string activityId);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityExecuted(string activityId, byte[] resultData);
......
......@@ -25,7 +25,7 @@ namespace Pole.Sagas.Core
/// <summary>
/// 如果 等于 -1 说明已经在执行补偿操作,此时这个值已经没有意义
/// </summary>
private int currentExecuteOrder = 0;
private int currentExecuteOrder = 0;
/// <summary>
/// 如果 等于 -1 说明已经还未执行补偿操作,此时这个值没有意义
/// </summary>
......@@ -43,7 +43,7 @@ namespace Pole.Sagas.Core
this.activityFinder = activityFinder;
Id = snowflakeIdGenerator.NextId();
}
internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder,int currentExecuteOrder,int currentCompensateOrder, List<ActivityWapper> activities)
internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder, int currentExecuteOrder, int currentCompensateOrder, List<ActivityWapper> activities)
{
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.serviceProvider = serviceProvider;
......@@ -69,6 +69,7 @@ namespace Pole.Sagas.Core
var dataType = activityInterface.GetGenericArguments()[0];
ActivityWapper activityWapper = new ActivityWapper
{
Name = activityName,
ActivityDataType = dataType,
ActivityStatus = ActivityStatus.NotStarted,
ActivityType = targetActivityType,
......@@ -82,7 +83,7 @@ namespace Pole.Sagas.Core
public async Task<SagaResult> GetResult()
{
await eventSender.SagaStarted(Id, poleSagasOption.ServiceName,DateTime.UtcNow);
await eventSender.SagaStarted(Id, poleSagasOption.ServiceName, DateTime.UtcNow);
var executeActivity = GetNextExecuteActivity();
if (executeActivity == null)
......@@ -138,16 +139,17 @@ namespace Pole.Sagas.Core
{
var activityId = snowflakeIdGenerator.NextId();
activityWapper.Id = activityId;
activityWapper.ExecuteTimes++;
activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000);
try
{
var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityExecuting(activityId, Id, bytesContent, activityWapper.Order,DateTime.UtcNow);
await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow, activityWapper.ExecuteTimes);
var result = await activityWapper.InvokeExecute();
if (!result.IsSuccess)
{
await eventSender.ActivityRevoked(activityId);
await CompensateActivity(result,currentExecuteOrder);
await CompensateActivity(result, currentExecuteOrder);
return result;
}
await eventSender.ActivityExecuted(activityId, Encoding.UTF8.GetBytes(string.Empty));
......@@ -173,7 +175,7 @@ namespace Pole.Sagas.Core
};
await eventSender.ActivityExecuteOvertime(activityId);
// 超时的时候 需要首先补偿这个超时的操作
return await CompensateActivity(result,currentExecuteOrder+1);
return await CompensateActivity(result, currentExecuteOrder + 1);
}
else
{
......@@ -190,7 +192,7 @@ namespace Pole.Sagas.Core
}
}
private async Task<ActivityExecuteResult> CompensateActivity(ActivityExecuteResult result,int currentCompensateOrder)
private async Task<ActivityExecuteResult> CompensateActivity(ActivityExecuteResult result, int currentCompensateOrder)
{
this.currentCompensateOrder = currentCompensateOrder;
currentExecuteOrder = -1;
......
......@@ -8,7 +8,6 @@ service Saga {
rpc SagaStarted (SagaStartedRequest) returns (CommonResponse);
rpc SagaEnded (SagaEndedRequest) returns (CommonResponse);
rpc ActivityExecuting (ActivityExecutingRequest) returns (CommonResponse);
rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse);
rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse);
rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse);
rpc ActivityExecuted (ActivityExecutedRequest) returns (CommonResponse);
......@@ -38,16 +37,8 @@ message ActivityExecutingRequest {
bytes parameterData = 3;
int32 order = 4;
string addTime = 5;
}
message ActivityRetriedRequest {
string activityId = 1;
string status = 2;
int32 retries = 3;
ActivityRetryType activityRetryType = 4;
enum ActivityRetryType{
Execute = 0;
Compensate = 1;
}
string activityName = 6;
int32 executeTimes = 7;
}
message ActivityExecuteAbortedRequest {
string activityId = 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment