diff --git a/samples/apis/SagasServer/Startup.cs b/samples/apis/SagasServer/Startup.cs index df281de..5381e1c 100644 --- a/samples/apis/SagasServer/Startup.cs +++ b/samples/apis/SagasServer/Startup.cs @@ -41,7 +41,7 @@ namespace SagasServer { app.UseDeveloperExceptionPage(); } - + app.UserPoleSagasServer(); app.Run(async (context) => { await context.Response.WriteAsync("Hello World!"); diff --git a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs index 28bef1a..96b90c3 100644 --- a/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs +++ b/src/Pole.Sagas.Client/NotEndedSagasCompensateRetryBackgroundService.cs @@ -35,8 +35,28 @@ namespace Pole.Sagas.Client public async Task StartAsync(CancellationToken cancellationToken) { + while (true) + { + try + { + await GrpcGetSagasCore(cancellationToken); + } + catch (Exception ex) + { + logger.LogError(ex, "Errors in GRPC"); + } + finally + { + await Task.Delay(options.GrpcConnectFailRetryIntervalSeconds * 1000); + } + } + } + + private async Task GrpcGetSagasCore(CancellationToken cancellationToken) + { using (var stream = sagaClient.GetSagas(new Pole.Sagas.Server.Grpc.GetSagasRequest { Limit = options.PreSagasGrpcStreamingResponseLimitCount, ServiceName = options.ServiceName })) { + while (await stream.ResponseStream.MoveNext(cancellationToken)) { if (stream.ResponseStream.Current.IsSuccess) @@ -73,39 +93,12 @@ namespace Pole.Sagas.Client } }); } - catch(Exception ex) + catch (Exception ex) { logger.LogError(ex, "Errors in NotEndedSagasCompensateRetryBackgroundService CompensateRetry"); } - } } - //await foreach (var getSagasResponse in stream.ResponseStream.ReadAllAsync(cancellationToken)) - //{ - // if (getSagasResponse.IsSuccess) - // { - // var sagas = getSagasResponse.Sagas.Select(m => - // { - // var result = new SagaEntity - // { - // Id = m.Id, - // }; - // result.ActivityEntities = m.Activities.Select(n => new ActivityEntity - // { - // CompensateTimes = n.CompensateTimes, - // ExecuteTimes = n.ExecuteTimes, - // Id = n.Id, - // Name = n.Id, - // Order = n.Order, - // ParameterData = n.ParameterData.ToByteArray(), - // SagaId = n.SagaId, - // Status = n.Status - // }).ToList(); - // return result; - // }); - - // } - //} } } diff --git a/src/Pole.Sagas.Client/PoleSagasOption.cs b/src/Pole.Sagas.Client/PoleSagasOption.cs index 6f3ff31..416ad7d 100644 --- a/src/Pole.Sagas.Client/PoleSagasOption.cs +++ b/src/Pole.Sagas.Client/PoleSagasOption.cs @@ -13,5 +13,6 @@ namespace Pole.Sagas.Client public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10; public int SagasTimeOutSeconds { get; set; } = 60; public string SagasServerHost { get; set; } + public int GrpcConnectFailRetryIntervalSeconds { get; set; } = 10; } } diff --git a/src/Pole.Sagas.Server/PoleSagasServerApplicationBuilderExtensions.cs b/src/Pole.Sagas.Server/PoleSagasServerApplicationBuilderExtensions.cs index b983f0d..422afac 100644 --- a/src/Pole.Sagas.Server/PoleSagasServerApplicationBuilderExtensions.cs +++ b/src/Pole.Sagas.Server/PoleSagasServerApplicationBuilderExtensions.cs @@ -4,11 +4,11 @@ using System; using System.Collections.Generic; using System.Text; -namespace Pole.Sagas.Server +namespace Microsoft.Extensions.Hosting { public static class PoleSagasServerApplicationBuilderExtensions { - public static IApplicationBuilder UserPoleSagasServer(IApplicationBuilder builder) + public static IApplicationBuilder UserPoleSagasServer(this IApplicationBuilder builder) { builder.UseRouting(); diff --git a/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs b/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs index 8a2d321..6d62fe5 100644 --- a/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs +++ b/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs @@ -11,16 +11,18 @@ namespace Microsoft.Extensions.DependencyInjection { public static class PoleSagasServerServiceCollectionExtensions { - public static StartupConfig AddSagasServer(this StartupConfig startupConfig, Action config = null) - { + public static StartupConfig AddSagasServer(this StartupConfig startupConfig, Action config = null) + { Action defaultConfig = option => { }; var finalConfig = config ?? defaultConfig; startupConfig.Services.AddGrpc(); startupConfig.Services.Configure(finalConfig); startupConfig.Services.AddSingleton(); + startupConfig.Services.AddSingleton(); startupConfig.Services.AddSingleton(); startupConfig.Services.AddHostedService(); + return startupConfig; } } diff --git a/src/Pole.Sagas.Server/SagasBuffer.cs b/src/Pole.Sagas.Server/SagasBuffer.cs index 20dc0ba..3995137 100644 --- a/src/Pole.Sagas.Server/SagasBuffer.cs +++ b/src/Pole.Sagas.Server/SagasBuffer.cs @@ -15,7 +15,7 @@ namespace Pole.Sagas.Server private readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); private readonly Dictionary> Sagas = new Dictionary>(); private readonly ILogger logger; - public SagasBuffer(ILogger logger) + public SagasBuffer(ILogger logger) { this.logger = logger; }