Commit 92eeb084 by dingsongjie

完善功能

parent 74cb469b
Showing with 293 additions and 80 deletions
using Pole.Sagas.Core;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
......
using Pole.Sagas.Core;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
......
using Pole.Sagas.Core;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
......
using Pole.Sagas.Core;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
......
using Pole.Sagas.Core;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
......
using Pole.Sagas.Core;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
......
using Pole.Sagas.Core;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
......
using Pole.Sagas.Core;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
......
using Pole.Sagas.Core;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
......
using Pole.Sagas.Core;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
......
using Pole.Sagas.Core;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
......
......@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core.Abstraction;
using SagasTest.Api.Activities;
......
using System;
using Pole.Sagas.Core;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Sagas.Core.Abstraction
namespace Pole.Sagas.Client.Abstraction
{
public interface IActivity<TData>
{
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core.Abstraction
namespace Pole.Sagas.Client.Abstraction
{
public interface IActivityFinder
{
......
using System;
using System.Threading.Tasks;
namespace Pole.Sagas.Core.Abstraction
namespace Pole.Sagas.Client.Abstraction
{
public interface IEventSender
{
......
using System;
using Pole.Sagas.Core;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Core
namespace Pole.Sagas.Client.Abstraction
{
public interface ISaga
{
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core.Abstraction
namespace Pole.Sagas.Client.Abstraction
{
public interface ISagaFactory
{
......
using Microsoft.Extensions.Logging;
using Pole.Core.Utils;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core.Abstraction;
using Pole.Sagas.Core.Exceptions;
using System;
......@@ -8,7 +9,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Pole.Sagas.Core
namespace Pole.Sagas.Client
{
public class ActivityFinder : IActivityFinder
{
......
using Microsoft.Extensions.DependencyInjection;
using Pole.Sagas.Core;
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
......@@ -6,7 +7,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Sagas.Core
namespace Pole.Sagas.Client
{
public class ActivityWapper
{
......
using Grpc.Net.Client;
using Microsoft.Extensions.Options;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core.Abstraction;
using Pole.Sagas.Core.Exceptions;
using System;
......@@ -8,7 +9,7 @@ using System.Text;
using System.Threading.Tasks;
using static Pole.Sagas.Server.Grpc.Saga;
namespace Pole.Sagas.Core
namespace Pole.Sagas.Client
{
public class EventSender : IEventSender
{
......
using Grpc.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using static Pole.Sagas.Server.Grpc.Saga;
namespace Pole.Sagas.Client
{
public class NotEndedSagasCompensateRetryBackgroundService : IHostedService
{
private readonly PoleSagasOption options;
private readonly SagaClient sagaClient;
private readonly SagaRestorer sagaRestorer;
public NotEndedSagasCompensateRetryBackgroundService(IOptions<PoleSagasOption> options, SagaClient sagaClient, IServiceProvider serviceProvider)
{
this.options = options.Value;
this.sagaClient = sagaClient;
sagaRestorer = new SagaRestorer(serviceProvider.GetRequiredService<ISnowflakeIdGenerator>(), serviceProvider, serviceProvider.GetRequiredService<IEventSender>(), this.options, serviceProvider.GetRequiredService<ISerializer>(), serviceProvider.GetRequiredService<IActivityFinder>());
}
public async Task StartAsync(CancellationToken cancellationToken)
{
using (var stream = sagaClient.GetSagas(new Pole.Sagas.Server.Grpc.GetSagasRequest { Limit = options.PreSagasGrpcStreamingResponseLimitCount, ServiceName = options.ServiceName }))
{
while (await stream.ResponseStream.MoveNext(cancellationToken))
{
if (stream.ResponseStream.Current.IsSuccess)
{
var sagas = stream.ResponseStream.Current.Sagas.Select(m =>
{
var result = new SagaEntity
{
Id = m.Id,
};
result.ActivityEntities = m.Activities.Select(n => new ActivityEntity
{
CompensateTimes = n.CompensateTimes,
ExecuteTimes = n.ExecuteTimes,
Id = n.Id,
Name = n.Id,
Order = n.Order,
ParameterData = n.ParameterData.ToByteArray(),
SagaId = n.SagaId,
Status = n.Status
}).ToList();
return result;
}).ToList();
sagas.ForEach(async sagaEntity =>
{
var saga = sagaRestorer.CreateSaga(sagaEntity);
await saga.Compensate();
});
}
}
//await foreach (var getSagasResponse in stream.ResponseStream.ReadAllAsync(cancellationToken))
//{
// if (getSagasResponse.IsSuccess)
// {
// var sagas = getSagasResponse.Sagas.Select(m =>
// {
// var result = new SagaEntity
// {
// Id = m.Id,
// };
// result.ActivityEntities = m.Activities.Select(n => new ActivityEntity
// {
// CompensateTimes = n.CompensateTimes,
// ExecuteTimes = n.ExecuteTimes,
// Id = n.Id,
// Name = n.Id,
// Order = n.Order,
// ParameterData = n.ParameterData.ToByteArray(),
// SagaId = n.SagaId,
// Status = n.Status
// }).ToList();
// return result;
// });
// }
//}
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
}
}
......@@ -2,10 +2,13 @@
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Grpc.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Pole.Core;
using Pole.Core.Utils;
using Pole.Sagas.Client;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using Pole.Sagas.Core.Exceptions;
......@@ -21,14 +24,20 @@ namespace Microsoft.Extensions.DependencyInjection
startupOption.Services.AddSingleton<IActivityFinder, ActivityFinder>();
startupOption.Services.AddSingleton<IEventSender, EventSender>();
startupOption.Services.AddSingleton<ISagaFactory, SagaFactory>();
using(var provider = startupOption.Services.BuildServiceProvider())
PoleSagasOption sagasOption = null;
using (var provider = startupOption.Services.BuildServiceProvider())
{
var sagasOption = provider.GetRequiredService<IOptions<PoleSagasOption>>().Value;
sagasOption = provider.GetRequiredService<IOptions<PoleSagasOption>>().Value;
startupOption.Services.AddGrpcClient<SagaClient>(o =>
{
o.Address = new Uri(sagasOption.SagasServerHost);
});
}
RegisterActivities(startupOption);
}
private static void RegisterActivities(StartupConfig startupOption)
{
var baseActivityType = typeof(IActivity<>);
foreach (var assembly in AssemblyHelper.GetAssemblies())
{
......
......@@ -2,11 +2,12 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
namespace Pole.Sagas.Client
{
public class PoleSagasOption
{
public string ServiceName { get; set; }
public int PreSagasGrpcStreamingResponseLimitCount { get; set; } = 20;
public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10;
public int SagasTimeOutSeconds { get; set; } = 60;
public string SagasServerHost { get; set; }
......
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using Pole.Sagas.Core.Exceptions;
using System;
......@@ -9,7 +11,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Sagas.Core
namespace Pole.Sagas.Client
{
public class Saga : ISaga
{
......@@ -44,7 +46,7 @@ namespace Pole.Sagas.Core
this.activityFinder = activityFinder;
Id = snowflakeIdGenerator.NextId();
}
internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder, int currentExecuteOrder, int currentCompensateOrder, List<ActivityWapper> activities)
internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder, string id)
{
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.serviceProvider = serviceProvider;
......@@ -52,10 +54,8 @@ namespace Pole.Sagas.Core
this.poleSagasOption = poleSagasOption;
this.serializer = serializer;
this.activityFinder = activityFinder;
Id = snowflakeIdGenerator.NextId();
this.currentExecuteOrder = currentExecuteOrder;
this.currentCompensateOrder = currentCompensateOrder;
this.activities = activities;
Id = id;
this.currentExecuteOrder = -1;
}
public void AddActivity(string activityName, object data, int timeOutSeconds = 2)
......@@ -81,6 +81,29 @@ namespace Pole.Sagas.Core
};
activities.Add(activityWapper);
}
internal void AddActivity(string activityName, string activityStatus, object data, int order, int timeOutSeconds = 2)
{
var targetActivityType = activityFinder.FindType(activityName);
var activityInterface = targetActivityType.GetInterfaces().FirstOrDefault();
if (!activityInterface.IsGenericType)
{
throw new ActivityNotFoundWhenCompensateRetryException(activityName);
}
var dataType = activityInterface.GetGenericArguments()[0];
ActivityWapper activityWapper = new ActivityWapper
{
Name = activityName,
ActivityDataType = dataType,
ActivityStatus = (ActivityStatus)Enum.Parse(typeof(ActivityStatus), activityStatus),
ActivityType = targetActivityType,
DataObj = data,
Order = order,
ServiceProvider = serviceProvider,
TimeOutSeconds = 2,
};
activities.Add(activityWapper);
}
public async Task<SagaResult> GetResult()
{
......@@ -95,6 +118,16 @@ namespace Pole.Sagas.Core
var result = await RecursiveExecuteActivity(executeActivity);
return result;
}
internal async Task Compensate()
{
this.currentCompensateOrder = CurrentMaxOrder+1;
var compensateActivity = GetNextCompensateActivity();
if (compensateActivity == null)
{
return ;
}
await RecursiveCompensateActivity(compensateActivity);
}
private ActivityWapper GetNextExecuteActivity()
{
......@@ -174,7 +207,7 @@ namespace Pole.Sagas.Core
Errors = errors
};
var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityExecuteOvertime(activityId, activityWapper.Name, bytesContent,DateTime.UtcNow);
await eventSender.ActivityExecuteOvertime(activityId, activityWapper.Name, bytesContent, DateTime.UtcNow);
// 超时的时候 需要首先补偿这个超时的操作
return await CompensateActivity(result, currentExecuteOrder + 1);
}
......
using Microsoft.Extensions.Options;
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
namespace Pole.Sagas.Client
{
public class SagaFactory : ISagaFactory
{
......@@ -30,9 +32,5 @@ namespace Pole.Sagas.Core
{
return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder);
}
internal ISaga CreateSaga(string id)
{
return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder);
}
}
}
using Microsoft.Extensions.Options;
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
using Pole.Sagas.Client.Abstraction;
using Pole.Sagas.Core;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Client
{
class SagaRestorer
{
private readonly ISnowflakeIdGenerator snowflakeIdGenerator;
private readonly IServiceProvider serviceProvider;
private readonly IEventSender eventSender;
private readonly PoleSagasOption poleSagasOption;
private readonly ISerializer serializer;
private readonly IActivityFinder activityFinder;
public SagaRestorer(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder)
{
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.serviceProvider = serviceProvider;
this.eventSender = eventSender;
this.poleSagasOption = poleSagasOption;
this.serializer = serializer;
this.activityFinder = activityFinder;
}
internal Saga CreateSaga(SagaEntity sagaEntity)
{
var saga = new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder, sagaEntity.Id);
foreach (var activity in sagaEntity.ActivityEntities)
{
saga.AddActivity(activity.Name, activity.Status, activity.ParameterData, activity.Order);
}
return saga;
}
}
}
......@@ -9,7 +9,7 @@ namespace Pole.Sagas.Server
{
public interface ISagasBuffer
{
Task<IEnumerable<SagaEntity>> GetSagas(string serviceName, DateTime dateTime, int limit);
Task<IEnumerable<SagaEntity>> GetSagas(string serviceName, int limit);
Task<bool> AddSagas(IAsyncEnumerable<SagasGroupEntity> sagasGroupEntities);
}
}
......@@ -6,7 +6,8 @@ namespace Pole.Sagas.Server
{
public class PoleSagasServerOption
{
public int NotEndedSagasFetchIntervalSeconds { get; set; } = 10;
public int NotEndedSagasFetchIntervalSeconds { get; set; } = 30;
public int GetSagasGrpcStreamingResponseDelaySeconds { get; set; } = 20;
public int ExpiredDataBulkDeleteIntervalSeconds { get; set; } = 10*60;
public int ExpiredDataDeleteBatchCount { get; set; } = 1000;
public int ExpiredDataPreBulkDeleteDelaySeconds { get; set; } = 3;
......
......@@ -2,6 +2,7 @@
using Microsoft.Extensions.Options;
using Pole.Core.Processor;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
......@@ -45,7 +46,7 @@ namespace Pole.Sagas.Server.Processor
private async Task ProcessInternal()
{
var addTimeFilter = DateTime.UtcNow.AddMinutes(-4);
var sagas = sagaStorage.GetSagas(addTimeFilter, 500);
var sagas = sagaStorage.GetSagas(addTimeFilter, 2000);
await sagasBuffer.AddSagas(sagas);
}
}
......
......@@ -49,7 +49,7 @@ namespace Pole.Sagas.Server
}
}
public async Task<IEnumerable<SagaEntity>> GetSagas(string serviceName, DateTime dateTime, int limit)
public async Task<IEnumerable<SagaEntity>> GetSagas(string serviceName, int limit)
{
try
{
......
using Google.Protobuf;
using Grpc.Core;
using Microsoft.Extensions.Options;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using Pole.Sagas.Server.Grpc;
using System;
using System.Collections.Generic;
......@@ -14,10 +16,12 @@ namespace Pole.Sagas.Server.Services
{
private readonly ISagaStorage sagaStorage;
private readonly ISagasBuffer sagasBuffer;
public SagaService(ISagaStorage sagaStorage, ISagasBuffer sagasBuffer)
private readonly PoleSagasServerOption poleSagasServerOption;
public SagaService(ISagaStorage sagaStorage, ISagasBuffer sagasBuffer, IOptions<PoleSagasServerOption> poleSagasServerOption)
{
this.sagaStorage = sagaStorage;
this.sagasBuffer = sagasBuffer;
this.poleSagasServerOption = poleSagasServerOption.Value;
}
public override async Task<CommonResponse> ActivityCompensateAborted(ActivityCompensateAbortedRequest request, ServerCallContext context)
{
......@@ -159,40 +163,46 @@ namespace Pole.Sagas.Server.Services
}
return commonResponse;
}
public override async Task<GetSagasResponse> GetSagas(GetSagasRequest request, ServerCallContext context)
public override async Task GetSagas(GetSagasRequest request, IServerStreamWriter<GetSagasResponse> responseStream, ServerCallContext context)
{
GetSagasResponse getSagasResponse = new GetSagasResponse();
try
while (!context.CancellationToken.IsCancellationRequested)
{
var sagaEntities = await sagasBuffer.GetSagas(request.ServiceName, Convert.ToDateTime(request.AddTime), request.Limit);
var sagaDtoes = sagaEntities.Select(m =>
await Task.Delay(poleSagasServerOption.GetSagasGrpcStreamingResponseDelaySeconds*1000);
GetSagasResponse getSagasResponse = new GetSagasResponse();
try
{
var result = new GetSagasResponse.Types.Saga
var sagaEntities = await sagasBuffer.GetSagas(request.ServiceName, request.Limit);
var sagaDtoes = sagaEntities.Select(m =>
{
Id = m.Id,
};
result.Activities.Add(m.ActivityEntities.Select(n => new GetSagasResponse.Types.Saga.Types.Activity
{
CompensateTimes = n.CompensateTimes,
ExecuteTimes = n.ExecuteTimes,
Id = n.Id,
Name = n.Id,
Order = n.Order,
ParameterData = ByteString.CopyFrom(n.ParameterData),
SagaId = n.SagaId,
Status = n.Status
}));
return result;
});
getSagasResponse.Sagas.Add(sagaDtoes);
getSagasResponse.IsSuccess = true;
}
catch (Exception ex)
{
getSagasResponse.Errors = CombineError(ex);
var result = new GetSagasResponse.Types.Saga
{
Id = m.Id,
};
result.Activities.Add(m.ActivityEntities.Select(n => new GetSagasResponse.Types.Saga.Types.Activity
{
CompensateTimes = n.CompensateTimes,
ExecuteTimes = n.ExecuteTimes,
Id = n.Id,
Name = n.Id,
Order = n.Order,
ParameterData = ByteString.CopyFrom(n.ParameterData),
SagaId = n.SagaId,
Status = n.Status
}));
return result;
});
getSagasResponse.Sagas.Add(sagaDtoes);
getSagasResponse.IsSuccess = true;
}
catch (Exception ex)
{
getSagasResponse.Errors = CombineError(ex);
}
await responseStream.WriteAsync(getSagasResponse);
}
return getSagasResponse;
}
private string CombineError(Exception exception)
{
return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
......
......@@ -34,7 +34,7 @@ namespace Pole.Sagas.Storage.PostgreSql
using (var tansaction = await connection.BeginTransactionAsync())
{
var updateActivitySql =
$"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors WHERE \"Id\" = @Id";
$"UPDATE {activityTableName} SET \"Status\"=@Status,\"Errors\"=@Errors, \"CompensateTimes\"=\"CompensateTimes\"+1 WHERE \"Id\" = @Id";
await connection.ExecuteAsync(updateActivitySql, new
{
Id = activityId,
......@@ -230,11 +230,12 @@ $"UPDATE {activityTableName} SET \"Status\"=@Status ,\"CompensateTimes\"=@Compen
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
var updateActivitySql =
$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\"";
$"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"Id\" as ActivityId,activities.\"Order\",activities.\"Status\",activities.\"ParameterData\",activities.\"ExecuteTimes\",activities.\"CompensateTimes\",activities.\"Name\" from \"Activities\" as activities inner join(select \"Id\",\"ServiceName\" from \"Sagas\" where \"AddTime\" <= @AddTime and \"Status\" = '{nameof(SagaStatus.Started)}' limit @Limit ) as limit_sagas on activities.\"SagaId\" = limit_sagas.\"Id\" and activities.\"Status\" != @Status ";
var activities = await connection.QueryAsync<ActivityAndSagaEntity>(updateActivitySql, new
{
AddTime = dateTime,
Limit = limit,
Status = nameof(ActivityStatus.Compensated)
});
var groupedByServiceNameActivities = activities.GroupBy(m => m.ServiceName);
foreach (var groupedByServiceName in groupedByServiceNameActivities)
......@@ -271,7 +272,7 @@ $"select limit_sagas.\"Id\" as SagaId,limit_sagas.\"ServiceName\",activities.\"I
}
}
public Task<int> DeleteExpiredData(string tableName, DateTime ExpiredAt, int batchCount)
public Task<int> DeleteExpiredData(string tableName, DateTime ExpiredAt, int batchCount)
{
using (var connection = new NpgsqlConnection(poleSagasStoragePostgreSqlOption.ConnectionString))
{
......
......@@ -6,7 +6,7 @@ using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Core
namespace Pole.Sagas.Core.Abstraction
{
public interface ISagaStorage
{
......
......@@ -7,6 +7,7 @@ namespace Pole.Sagas.Core
public class ActivityEntity
{
public string Id { get; set; }
public string Name { get; set; }
public string SagaId { get; set; }
public int Order { get; set; }
public string Status { get; set; }
......
......@@ -4,7 +4,7 @@ using System.Text;
namespace Pole.Sagas.Core.Exceptions
{
class ActivityImplementIrregularException: Exception
public class ActivityImplementIrregularException : Exception
{
public ActivityImplementIrregularException(string name) : base($"Activity name :{name }must have and only inherit from IActivity<>")
{
......
......@@ -2,9 +2,13 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core.Abstraction
namespace Pole.Sagas.Core.Exceptions
{
public interface ISagaInvoker
public class ActivityNotFoundWhenCompensateRetryException : Exception
{
public ActivityNotFoundWhenCompensateRetryException(string activityName):base($"Activity:{activityName} NotFound When Compensate Retry")
{
}
}
}
......@@ -8,7 +8,7 @@ namespace Pole.Sagas.Core
{
public string Id { get; set; }
public string ServiceName { get; set; }
public List<ActivityEntity> ActivityEntities { get; set; }
public List<ActivityEntity> ActivityEntities { get; set; }
public string Status { get; set; }
public DateTime? ExpiresAt { get; set; }
public DateTime AddTime { get; set; }
......
......@@ -15,7 +15,7 @@ service Saga {
rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse);
rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse);
rpc ActivityCompensating (ActivityCompensatingRequest) returns (CommonResponse);
rpc GetSagas (GetSagasRequest) returns (GetSagasResponse);
rpc GetSagas (GetSagasRequest) returns (stream GetSagasResponse);
}
message CommonResponse{
......@@ -70,8 +70,7 @@ message ActivityCompensatingRequest {
}
message GetSagasRequest{
string serviceName = 1;
string addTime = 2;
int32 limit = 3;
int32 limit = 2;
}
message GetSagasResponse{
bool isSuccess = 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment