Commit ef47889d by dingsongjie

完成 sagas 基本测试

parent 4e4262ee
Showing with 146 additions and 100 deletions
......@@ -17,7 +17,7 @@
},
"SagasServer": {
"commandName": "Project",
"launchBrowser": true,
"launchBrowser": false,
"applicationUrl": "http://localhost:5000",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
......
......@@ -17,7 +17,7 @@ namespace SagasTest.Api.Activities
{
this.httpClientFactory = httpClientFactory;
}
public Task Compensate(Transaction1Dto data,CancellationToken cancellationToken)
public Task Compensate(Transaction1Dto data)
{
throw new NotImplementedException();
}
......
......@@ -18,7 +18,7 @@ namespace SagasTest.Api.Activities
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction1Dto data, CancellationToken cancellationToken)
public async Task Compensate(Transaction1Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
......
......@@ -17,7 +17,7 @@ namespace SagasTest.Api.Activities
{
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction1Dto data, CancellationToken cancellationToken)
public async Task Compensate(Transaction1Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
......
......@@ -18,7 +18,7 @@ namespace SagasTest.Api.Activities
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction1Dto data, CancellationToken cancellationToken)
public async Task Compensate(Transaction1Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
......
......@@ -17,7 +17,7 @@ namespace SagasTest.Api.Activities
{
this.httpClientFactory = httpClientFactory;
}
public Task Compensate(Transaction2Dto data, CancellationToken cancellationToken)
public Task Compensate(Transaction2Dto data)
{
throw new NotImplementedException();
}
......
......@@ -18,7 +18,7 @@ namespace SagasTest.Api.Activities
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction2Dto data, CancellationToken cancellationToken)
public async Task Compensate(Transaction2Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
......
......@@ -17,7 +17,7 @@ namespace SagasTest.Api.Activities
{
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction2Dto data, CancellationToken cancellationToken)
public async Task Compensate(Transaction2Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
......
......@@ -18,7 +18,7 @@ namespace SagasTest.Api.Activities
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction2Dto data, CancellationToken cancellationToken)
public async Task Compensate(Transaction2Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
......
......@@ -11,7 +11,7 @@ namespace SagasTest.Api.Activities
{
public class Transaction3ExceptionActivity : IActivity<Transaction3Dto>
{
public Task Compensate(Transaction3Dto data, CancellationToken cancellationToken)
public Task Compensate(Transaction3Dto data)
{
Console.WriteLine("Transaction3 Rollback");
return Task.CompletedTask;
......
......@@ -11,7 +11,7 @@ namespace SagasTest.Api.Activities
{
public class Transaction3HasResultActivity : IActivity<Transaction3Dto>
{
public Task Compensate(Transaction3Dto data, CancellationToken cancellationToken)
public Task Compensate(Transaction3Dto data)
{
Console.WriteLine("Transaction3 Rollback");
return Task.CompletedTask;
......
......@@ -11,7 +11,7 @@ namespace SagasTest.Api.Activities
{
public class Transaction3ReturnFalseActivity : IActivity<Transaction3Dto>
{
public Task Compensate(Transaction3Dto data, CancellationToken cancellationToken)
public Task Compensate(Transaction3Dto data)
{
Console.WriteLine("Transaction3 Rollback");
return Task.CompletedTask;
......
......@@ -85,6 +85,7 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN
var result = new List<EventEntity>();
using (var connection = new NpgsqlConnection(options.ConnectionString))
{
await connection.OpenAsync();
using (var transaction = await connection.BeginTransactionAsync())
{
var reader = await connection.ExecuteReaderAsync(sql);
......
......@@ -10,6 +10,6 @@ namespace Pole.Sagas.Client.Abstraction
public interface IActivity<TData>
{
Task<ActivityExecuteResult> Execute(TData data ,CancellationToken cancellationToken);
Task Compensate(TData data, CancellationToken cancellationToken);
Task Compensate(TData data);
}
}
......@@ -7,7 +7,7 @@ namespace Pole.Sagas.Client.Abstraction
{
Task SagaStarted(string sagaId, string serviceName, DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuting(string activityId,string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime);
Task ActivityExecuting(string activityId,string activityName, string sagaId, string parameterData, int order, DateTime addTime);
Task ActivityExecuteAborted(string activityId);
/// <summary>
///
......@@ -19,7 +19,7 @@ namespace Pole.Sagas.Client.Abstraction
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityCompensated(string activityId);
Task ActivityOvertimeCompensated(string activityId,bool compensated);
Task ActivityExecuteOvertime(string activityId,string name,byte [] parameterData,DateTime addTime);
Task ActivityExecuteOvertime(string activityId);
Task ActivityRevoked(string activityId);
}
}
......@@ -47,15 +47,14 @@ namespace Pole.Sagas.Client
var activityParams = Expression.Convert(activityObjParams, ActivityType);
var dataObjParams = Expression.Parameter(typeof(object), "data");
var dataParams = Expression.Convert(dataObjParams, ActivityDataType);
var cancellationTokenParams = Expression.Parameter(typeof(CancellationToken), "ct");
var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType, typeof(CancellationToken) });
var body = Expression.Call(activityParams, method, dataParams, cancellationTokenParams);
var func = Expression.Lambda<Func<object, object, CancellationToken, Task>>(body, activityObjParams, dataObjParams, cancellationTokenParams).Compile();
var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType});
var body = Expression.Call(activityParams, method, dataParams);
var func = Expression.Lambda<Func<object, object, Task>>(body, activityObjParams, dataObjParams).Compile();
using (var scope = ServiceProvider.CreateScope())
{
var activity = scope.ServiceProvider.GetRequiredService(ActivityType);
return func(activity, DataObj, CancellationTokenSource.Token);
return func(activity, DataObj);
}
}
}
......
......@@ -56,7 +56,7 @@ namespace Pole.Sagas.Client
}
}
public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] parameterData, int order, DateTime addTime)
public async Task ActivityExecuting(string activityId, string activityName, string sagaId, string parameterData, int order, DateTime addTime)
{
var result = await sagaClient.ActivityExecutingAsync(new Server.Grpc.ActivityExecutingRequest
{
......@@ -64,7 +64,7 @@ namespace Pole.Sagas.Client
ActivityName = activityName,
AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"),
Order = order,
ParameterData = Google.Protobuf.ByteString.CopyFrom(parameterData),
ParameterData = parameterData,
SagaId = sagaId,
});
if (!result.IsSuccess)
......@@ -100,7 +100,7 @@ namespace Pole.Sagas.Client
}
}
public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime)
public async Task ActivityExecuteOvertime(string activityId)
{
var result = await sagaClient.ActivityExecuteOvertimeAsync(new Server.Grpc.ActivityExecuteOvertimeRequest
{
......
......@@ -9,7 +9,7 @@ namespace Pole.Sagas.Client
public string ServiceName { get; set; }
public int PreSagasGrpcStreamingResponseLimitCount { get; set; } = 20;
public int MaxCompensateTimes { get; set; } = 10;
public int MaxOvertimeCompensateTimes { get; set; } = 10;
public int MaxOvertimeCompensateTimes { get; set; } = 3;
public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10;
public int SagasTimeOutSeconds { get; set; } = 60;
public string SagasServerHost { get; set; }
......
......@@ -25,6 +25,10 @@ namespace Pole.Sagas.Client
{
get { return activities.Count; }
}
private bool IsCompensated
{
get { return this.activities.All(m => m.ActivityStatus == ActivityStatus.Compensated); }
}
/// <summary>
/// 如果 等于 -1 说明已经在执行补偿操作,此时这个值已经没有意义
/// </summary>
......@@ -75,13 +79,13 @@ namespace Pole.Sagas.Client
ActivityStatus = ActivityStatus.NotStarted,
ActivityType = targetActivityType,
DataObj = data,
Order = CurrentMaxOrder,
Order = CurrentMaxOrder + 1,
ServiceProvider = serviceProvider,
TimeOutSeconds = 2,
TimeOutSeconds = timeOutSeconds,
};
activities.Add(activityWapper);
}
internal void AddActivity(string activityName, string activityStatus, object data, int order, int timeOutSeconds = 2)
internal void AddActivity(string id, string activityName, string activityStatus, string data, int order, int compensateTimes,int overtimeCompensateTimes)
{
var targetActivityType = activityFinder.FindType(activityName);
......@@ -91,16 +95,19 @@ namespace Pole.Sagas.Client
throw new ActivityNotFoundWhenCompensateRetryException(activityName);
}
var dataType = activityInterface.GetGenericArguments()[0];
var dataParameter = serializer.Deserialize(data, dataType);
ActivityWapper activityWapper = new ActivityWapper
{
Name = activityName,
ActivityDataType = dataType,
ActivityStatus = (ActivityStatus)Enum.Parse(typeof(ActivityStatus), activityStatus),
ActivityType = targetActivityType,
DataObj = data,
DataObj = dataParameter,
Order = order,
ServiceProvider = serviceProvider,
TimeOutSeconds = 2,
Id = id,
CompensateTimes = compensateTimes,
OvertimeCompensateTimes= overtimeCompensateTimes
};
activities.Add(activityWapper);
}
......@@ -131,11 +138,12 @@ namespace Pole.Sagas.Client
return true;
}
await RecursiveCompensateActivity(compensateActivity);
if (activities.Any(m => m.ActivityStatus != ActivityStatus.Compensated|| m.ActivityStatus != ActivityStatus.CompensateAborted))
// 如果补偿成功 这里返回 true
if (activities.All(m => m.ActivityStatus == ActivityStatus.Compensated))
{
return false;
return true;
}
return true;
return false;
}
private ActivityWapper GetNextExecuteActivity()
......@@ -170,15 +178,24 @@ namespace Pole.Sagas.Client
}
try
{
if (activityWapper.ActivityStatus != ActivityStatus.ExecutingOvertime)
{
activityWapper.ActivityStatus = ActivityStatus.Compensating;
}
await activityWapper.InvokeCompensate();
if (activityWapper.ActivityStatus == ActivityStatus.ExecutingOvertime)
{
// 超时 补偿次数已到
var isCompensated = activityWapper.CompensateTimes == poleSagasOption.MaxOvertimeCompensateTimes;
var isCompensated = activityWapper.OvertimeCompensateTimes >= poleSagasOption.MaxOvertimeCompensateTimes;
if (isCompensated)
{
activityWapper.ActivityStatus = ActivityStatus.Compensated;
}
await eventSender.ActivityOvertimeCompensated(activityId, isCompensated);
}
else
{
activityWapper.ActivityStatus = ActivityStatus.Compensated;
await eventSender.ActivityCompensated(activityId);
}
var compensateActivity = GetNextCompensateActivity();
......@@ -190,8 +207,9 @@ namespace Pole.Sagas.Client
}
catch (Exception exception)
{
activityWapper.ActivityStatus = ActivityStatus.CompensateAborted;
// todo: 超时操作 如果出错可能 减少 补偿次数 这里 先不做处理
if (activityWapper.CompensateTimes == poleSagasOption.MaxCompensateTimes)
if (activityWapper.CompensateTimes >= poleSagasOption.MaxCompensateTimes)
{
// 此时 结束 saga 并且设置状态 为 Error
await eventSender.ActivityCompensateAborted(activityId, Id, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace);
......@@ -206,21 +224,28 @@ namespace Pole.Sagas.Client
{
var activityId = snowflakeIdGenerator.NextId();
activityWapper.Id = activityId;
activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000);
activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(activityWapper.TimeOutSeconds * 1000);
try
{
var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, bytesContent, activityWapper.Order, DateTime.UtcNow);
var content = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType);
activityWapper.ActivityStatus = ActivityStatus.Executing;
await eventSender.ActivityExecuting(activityId, activityWapper.Name, Id, content, activityWapper.Order, DateTime.UtcNow);
var result = await activityWapper.InvokeExecute();
if (!result.IsSuccess)
{
activityWapper.ActivityStatus = ActivityStatus.Revoked;
await eventSender.ActivityRevoked(activityId);
await CompensateActivity(result, currentExecuteOrder);
var expiresAt = DateTime.UtcNow.AddSeconds(poleSagasOption.CompeletedSagaExpiredAfterSeconds);
await eventSender.SagaEnded(Id, expiresAt);
return result;
}
activityWapper.ActivityStatus = ActivityStatus.Executed;
var executeActivity = GetNextExecuteActivity();
if (executeActivity == null)
{
var expiresAt = DateTime.UtcNow.AddSeconds(poleSagasOption.CompeletedSagaExpiredAfterSeconds);
await eventSender.SagaEnded(Id, expiresAt);
return result;
}
else
......@@ -239,7 +264,9 @@ namespace Pole.Sagas.Client
Errors = errors
};
var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityExecuteOvertime(activityId, activityWapper.Name, bytesContent, DateTime.UtcNow);
activityWapper.ActivityStatus = ActivityStatus.ExecutingOvertime;
await eventSender.ActivityExecuteOvertime(activityId);
// 超时的时候 需要首先补偿这个超时的操作
return await CompensateActivity(result, currentExecuteOrder + 1);
}
......@@ -251,9 +278,17 @@ namespace Pole.Sagas.Client
IsSuccess = false,
Errors = errors
};
activityWapper.ActivityStatus = ActivityStatus.ExecuteAborted;
await eventSender.ActivityExecuteAborted(activityId);
// 出错的时候 需要首先补偿这个出错的操作
return await CompensateActivity(result, currentExecuteOrder + 1);
var executeResult = await CompensateActivity(result, currentExecuteOrder + 1);
var expiresAt = DateTime.UtcNow.AddSeconds(poleSagasOption.CompeletedSagaExpiredAfterSeconds);
if (IsCompensated)
{
await eventSender.SagaEnded(Id, expiresAt);
}
return executeResult;
}
}
}
......
......@@ -5,6 +5,7 @@ using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Pole.Sagas.Client
......@@ -29,9 +30,9 @@ namespace Pole.Sagas.Client
internal Saga CreateSaga(SagaEntity sagaEntity)
{
var saga = new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder, sagaEntity.Id);
foreach (var activity in sagaEntity.ActivityEntities)
foreach (var activity in sagaEntity.ActivityEntities.OrderBy(m=>m.Order))
{
saga.AddActivity(activity.Name, activity.Status, activity.ParameterData, activity.Order);
saga.AddActivity(activity.Id,activity.Name, activity.Status, activity.ParameterData, activity.Order, activity.CompensateTimes,activity.OvertimeCompensateTimes);
}
return saga;
}
......
......@@ -17,7 +17,7 @@ using static Pole.Sagas.Server.Grpc.Saga;
namespace Pole.Sagas.Client
{
public class SagasCompensateRetryBackgroundService : IHostedService
public class SagasCompensateRetryBackgroundService : BackgroundService
{
private readonly PoleSagasOption options;
private readonly SagaClient sagaClient;
......@@ -33,25 +33,6 @@ namespace Pole.Sagas.Client
this.logger = logger;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
while (true)
{
try
{
await GrpcGetSagasCore(cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Errors in grpc get sagas");
}
finally
{
await Task.Delay(options.GrpcConnectFailRetryIntervalSeconds * 1000);
}
}
}
private async Task GrpcGetSagasCore(CancellationToken cancellationToken)
{
using (var stream = sagaClient.GetSagas(new Pole.Sagas.Server.Grpc.GetSagasRequest { Limit = options.PreSagasGrpcStreamingResponseLimitCount, ServiceName = options.ServiceName }))
......@@ -73,9 +54,9 @@ namespace Pole.Sagas.Client
CompensateTimes = n.CompensateTimes,
OvertimeCompensateTimes = n.ExecuteTimes,
Id = n.Id,
Name = n.Id,
Name = n.Name,
Order = n.Order,
ParameterData = n.ParameterData.ToByteArray(),
ParameterData = n.ParameterData,
SagaId = n.SagaId,
Status = n.Status
}).ToList();
......@@ -101,9 +82,24 @@ namespace Pole.Sagas.Client
}
}
public Task StopAsync(CancellationToken cancellationToken)
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
return Task.CompletedTask;
while (true)
{
try
{
await GrpcGetSagasCore(stoppingToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Errors in grpc get sagas");
}
finally
{
await Task.Delay(options.GrpcConnectFailRetryIntervalSeconds * 1000);
}
}
}
}
}
......@@ -56,7 +56,7 @@ namespace Pole.Sagas.Server
await semaphoreSlim.WaitAsync();
if (Sagas.TryGetValue(serviceName, out List<SagaEntity> sagaList))
{
var result = sagaList.Take(limit);
var result = sagaList.Take(limit).ToList();
sagaList.RemoveAll(m => result.Select(n => n.Id).Contains(m.Id));
Sagas[serviceName] = sagaList;
return result;
......
......@@ -70,7 +70,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityExecuteOvertime(request.ActivityId, request.Name, request.ParameterData.ToByteArray(), Convert.ToDateTime(request.AddTime));
await sagaStorage.ActivityExecuteOvertime(request.ActivityId);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
......@@ -84,7 +84,7 @@ namespace Pole.Sagas.Server.Services
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime));
await sagaStorage.ActivityExecuting(request.ActivityId, request.ActivityName, request.SagaId, request.ParameterData, request.Order, Convert.ToDateTime(request.AddTime));
commonResponse.IsSuccess = true;
}
catch (Exception ex)
......@@ -139,12 +139,16 @@ namespace Pole.Sagas.Server.Services
{
while (!context.CancellationToken.IsCancellationRequested)
{
await Task.Delay(poleSagasServerOption.GetSagasGrpcStreamingResponseDelaySeconds*1000);
await Task.Delay(poleSagasServerOption.GetSagasGrpcStreamingResponseDelaySeconds * 1000);
GetSagasResponse getSagasResponse = new GetSagasResponse();
try
{
var sagaEntities = await sagasBuffer.GetSagas(request.ServiceName, request.Limit);
if (sagaEntities.Count() == 0)
{
continue;
}
var sagaDtoes = sagaEntities.Select(m =>
{
var result = new GetSagasResponse.Types.Saga
......@@ -156,9 +160,9 @@ namespace Pole.Sagas.Server.Services
CompensateTimes = n.CompensateTimes,
ExecuteTimes = n.OvertimeCompensateTimes,
Id = n.Id,
Name = n.Id,
Name = n.Name,
Order = n.Order,
ParameterData = ByteString.CopyFrom(n.ParameterData),
ParameterData = n.ParameterData,
SagaId = n.SagaId,
Status = n.Status
}));
......
......@@ -6,14 +6,14 @@ namespace Pole.Sagas.Storage.PostgreSql
{
public class ActivityAndSagaEntity
{
public string Id { get; set; }
public string ActivityId { get; set; }
public string SagaId { get; set; }
public string ServiceName { get; set; }
public int Order { get; set; }
public string Status { get; set; }
public byte[] ParameterData { get; set; }
public string ParameterData { get; set; }
public int OvertimeCompensateTimes { get; set; }
public int CompensateTimes { get; set; }
public int Name { get; set; }
public string ActivityName { get; set; }
}
}
......@@ -29,10 +29,11 @@ namespace Pole.Sagas.Storage.PostgreSql
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
await connection.OpenAsync();
using (var tansaction = await connection.BeginTransactionAsync())
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors, \"CompensateTimes\"=\"CompensateTimes\"+1 WHERE \"Id\" = @Id";
$"UPDATE {activityTableName} SET \"Status\"=@Status,\"CompensateErrors\"=@Errors, \"CompensateTimes\"=\"CompensateTimes\"+1 WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
......@@ -42,7 +43,7 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors, \"Compen
if (!string.IsNullOrEmpty(sagaId))
{
var updateSagaSql =
$"UPDATE {sagaTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id";
$"UPDATE {sagaTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateSagaSql, new
{
Id = sagaId,
......@@ -83,7 +84,7 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
}
}
public async Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime)
public async Task ActivityExecuteOvertime(string activityId)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
......@@ -97,19 +98,20 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status WHERE \"Id\" = @Id";
}
}
public async Task ActivityExecuting(string activityId, string activityName, string sagaId, byte[] ParameterData, int order, DateTime addTime)
public async Task ActivityExecuting(string activityId, string activityName, string sagaId, string ParameterData, int order, DateTime addTime)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
string sql =
$"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"ParameterData\",\"OvertimeCompensateTimes\",\"CompensateTimes\",\"AddTime\")" +
$"VALUES(@Id,@Name,@SagaId,@Status,@ParameterData,@OvertimeCompensateTimes,@CompensateTimes,@AddTime);";
$"INSERT INTO {activityTableName} (\"Id\",\"Name\",\"SagaId\",\"Status\",\"Order\",\"ParameterData\",\"OvertimeCompensateTimes\",\"CompensateTimes\",\"AddTime\")" +
$"VALUES(@Id,@Name,@SagaId,@Status,@Order,@ParameterData,@OvertimeCompensateTimes,@CompensateTimes,@AddTime);";
_ = await connection.ExecuteAsync(sql, new
{
Id = activityId,
Name = activityName,
SagaId = sagaId,
Status = nameof(ActivityStatus.Executing),
Order = order,
ExecutingOvertimeRetries = 0,
ParameterData = ParameterData,
OvertimeCompensateTimes = 0,
......@@ -143,7 +145,7 @@ $"UPDATE {sagaTableName} SET \"Status\"=@Status ,\"ExpiresAt\"=@ExpiresAt WHERE
{
Id = sagaId,
ExpiresAt = ExpiresAt,
Status = nameof(ActivityStatus.Revoked)
Status = nameof(SagaStatus.Ended)
});
}
}
......@@ -160,7 +162,7 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" +
Id = sagaId,
AddTime = addTime,
ServiceName = serviceName,
Status = nameof(ActivityStatus.Revoked)
Status = nameof(SagaStatus.Started)
});
}
}
......@@ -170,7 +172,7 @@ $"INSERT INTO {sagaTableName} (\"Id\",\"ServiceName\",\"Status\",\"AddTime\")" +
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var sql =
$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"OvertimeCompensateTimes\",activities.\"CompensateTimes\",activities.\"Name\" from {activityTableName} as activities inner join(select \"Id\",\"ServiceName\" from {sagaTableName} where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status1 and activities.\"Status\" != @Status2";
$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"OvertimeCompensateTimes\",activities.\"CompensateTimes\",activities.\"Name\" as ActivityName from {activityTableName} as activities inner join(select \"Id\",\"ServiceName\" from {sagaTableName} where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status1 and activities.\"Status\" != @Status2";
var activities = await connection.QueryAsync<ActivityAndSagaEntity>(sql, new
{
AddTime = dateTime,
......@@ -198,11 +200,12 @@ $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"I
{
CompensateTimes = activity.CompensateTimes,
OvertimeCompensateTimes = activity.OvertimeCompensateTimes,
Id = activity.Id,
Id = activity.ActivityId,
Order = activity.Order,
ParameterData = activity.ParameterData,
SagaId = activity.SagaId,
Status = activity.Status,
Name= activity.ActivityName
};
sagaEntity.ActivityEntities.Add(activityEntity);
}
......
......@@ -63,9 +63,9 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
""Name"" varchar(255) COLLATE ""pg_catalog"".""default"" NOT NULL,
""SagaId"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Order"" int4 NOT NULL,
""Status"" varchar(10) COLLATE ""pg_catalog"".""default"" NOT NULL,
""Status"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
""OvertimeCompensateTimes"" int4 NOT NULL,
""ParameterData"" bytea NOT NULL,
""ParameterData"" text NOT NULL,
""CompensateErrors"" varchar(1024) COLLATE ""pg_catalog"".""default"",
""CompensateTimes"" int4 NOT NULL,
""AddTime"" timestamp NOT NULL,
......
......@@ -12,11 +12,11 @@ namespace Pole.Sagas.Core.Abstraction
{
Task SagaStarted(string sagaId, string serviceName,DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuting(string activityId, string activityName,string sagaId, byte[] ParameterData, int order,DateTime addTime);
Task ActivityExecuting(string activityId, string activityName,string sagaId, string ParameterData, int order,DateTime addTime);
Task ActivityExecuteAborted(string activityId);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId, string name, byte[] parameterData, DateTime addTime);
Task ActivityExecuteOvertime(string activityId);
Task ActivityRevoked(string activityId);
IAsyncEnumerable<SagasGroupEntity> GetSagas(DateTime dateTime, int limit);
Task<int> DeleteExpiredData(string tableName,DateTime ExpiredAt, int batchCount);
......
......@@ -11,12 +11,8 @@ namespace Pole.Sagas.Core
public string SagaId { get; set; }
public int Order { get; set; }
public string Status { get; set; }
public int TimeOutSeconds { get; set; }
public Byte[] ParameterData { get; set; }
public Byte[] ResultData { get; set; }
public string Errors { get; set; }
public string ParameterData { get; set; }
public int OvertimeCompensateTimes { get; set; }
public int CompensateTimes { get; set; }
public DateTime AddTime { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core.Exceptions
{
public class CompensateActivityException : Exception
{
public CompensateActivityException(string activityName,Exception innerException):base($"activity: {activityName} compensate error", innerException)
{
}
}
}
......@@ -8,7 +8,7 @@ namespace Pole.Sagas.Core
{
public string Id { get; set; }
public string ServiceName { get; set; }
public List<ActivityEntity> ActivityEntities { get; set; }
public List<ActivityEntity> ActivityEntities { get; set; } = new List<ActivityEntity>();
public string Status { get; set; }
public DateTime? ExpiresAt { get; set; }
public DateTime AddTime { get; set; }
......
......@@ -8,6 +8,6 @@ namespace Pole.Sagas.Core
public class SagasGroupEntity
{
public string ServiceName { get; set; }
public List<SagaEntity> SagaEntities { get; set; }
public List<SagaEntity> SagaEntities { get; set; } = new List<SagaEntity>();
}
}
......@@ -34,7 +34,7 @@ message SagaEndedRequest {
message ActivityExecutingRequest {
string activityId = 1;
string sagaId = 2;
bytes parameterData = 3;
string parameterData = 3;
int32 order = 4;
string addTime = 5;
string activityName = 6;
......@@ -55,9 +55,6 @@ message ActivityCompensatedRequest {
}
message ActivityExecuteOvertimeRequest {
string activityId = 1;
string name = 2;
bytes parameterData = 3;
string addTime = 4;
}
message ActivityRevokedRequest {
string activityId = 1;
......@@ -82,7 +79,7 @@ message GetSagasResponse{
string sagaId = 2;
int32 order = 3;
string status = 4;
bytes parameterData = 5;
string parameterData = 5;
int32 executeTimes = 6;
int32 compensateTimes = 7;
string name = 8;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment