Commit 6e9e80bc by dingsongjie

sagas client grpc 添加重试连接机制

parent ac47a1a5
...@@ -41,7 +41,7 @@ namespace SagasServer ...@@ -41,7 +41,7 @@ namespace SagasServer
{ {
app.UseDeveloperExceptionPage(); app.UseDeveloperExceptionPage();
} }
app.UserPoleSagasServer();
app.Run(async (context) => app.Run(async (context) =>
{ {
await context.Response.WriteAsync("Hello World!"); await context.Response.WriteAsync("Hello World!");
......
...@@ -35,8 +35,28 @@ namespace Pole.Sagas.Client ...@@ -35,8 +35,28 @@ namespace Pole.Sagas.Client
public async Task StartAsync(CancellationToken cancellationToken) public async Task StartAsync(CancellationToken cancellationToken)
{ {
while (true)
{
try
{
await GrpcGetSagasCore(cancellationToken);
}
catch (Exception ex)
{
logger.LogError(ex, "Errors in GRPC");
}
finally
{
await Task.Delay(options.GrpcConnectFailRetryIntervalSeconds * 1000);
}
}
}
private async Task GrpcGetSagasCore(CancellationToken cancellationToken)
{
using (var stream = sagaClient.GetSagas(new Pole.Sagas.Server.Grpc.GetSagasRequest { Limit = options.PreSagasGrpcStreamingResponseLimitCount, ServiceName = options.ServiceName })) using (var stream = sagaClient.GetSagas(new Pole.Sagas.Server.Grpc.GetSagasRequest { Limit = options.PreSagasGrpcStreamingResponseLimitCount, ServiceName = options.ServiceName }))
{ {
while (await stream.ResponseStream.MoveNext(cancellationToken)) while (await stream.ResponseStream.MoveNext(cancellationToken))
{ {
if (stream.ResponseStream.Current.IsSuccess) if (stream.ResponseStream.Current.IsSuccess)
...@@ -73,39 +93,12 @@ namespace Pole.Sagas.Client ...@@ -73,39 +93,12 @@ namespace Pole.Sagas.Client
} }
}); });
} }
catch(Exception ex) catch (Exception ex)
{ {
logger.LogError(ex, "Errors in NotEndedSagasCompensateRetryBackgroundService CompensateRetry"); logger.LogError(ex, "Errors in NotEndedSagasCompensateRetryBackgroundService CompensateRetry");
} }
} }
} }
//await foreach (var getSagasResponse in stream.ResponseStream.ReadAllAsync(cancellationToken))
//{
// if (getSagasResponse.IsSuccess)
// {
// var sagas = getSagasResponse.Sagas.Select(m =>
// {
// var result = new SagaEntity
// {
// Id = m.Id,
// };
// result.ActivityEntities = m.Activities.Select(n => new ActivityEntity
// {
// CompensateTimes = n.CompensateTimes,
// ExecuteTimes = n.ExecuteTimes,
// Id = n.Id,
// Name = n.Id,
// Order = n.Order,
// ParameterData = n.ParameterData.ToByteArray(),
// SagaId = n.SagaId,
// Status = n.Status
// }).ToList();
// return result;
// });
// }
//}
} }
} }
......
...@@ -13,5 +13,6 @@ namespace Pole.Sagas.Client ...@@ -13,5 +13,6 @@ namespace Pole.Sagas.Client
public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10; public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10;
public int SagasTimeOutSeconds { get; set; } = 60; public int SagasTimeOutSeconds { get; set; } = 60;
public string SagasServerHost { get; set; } public string SagasServerHost { get; set; }
public int GrpcConnectFailRetryIntervalSeconds { get; set; } = 10;
} }
} }
...@@ -4,11 +4,11 @@ using System; ...@@ -4,11 +4,11 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
namespace Pole.Sagas.Server namespace Microsoft.Extensions.Hosting
{ {
public static class PoleSagasServerApplicationBuilderExtensions public static class PoleSagasServerApplicationBuilderExtensions
{ {
public static IApplicationBuilder UserPoleSagasServer(IApplicationBuilder builder) public static IApplicationBuilder UserPoleSagasServer(this IApplicationBuilder builder)
{ {
builder.UseRouting(); builder.UseRouting();
......
...@@ -11,16 +11,18 @@ namespace Microsoft.Extensions.DependencyInjection ...@@ -11,16 +11,18 @@ namespace Microsoft.Extensions.DependencyInjection
{ {
public static class PoleSagasServerServiceCollectionExtensions public static class PoleSagasServerServiceCollectionExtensions
{ {
public static StartupConfig AddSagasServer(this StartupConfig startupConfig, Action<PoleSagasServerOption> config = null) public static StartupConfig AddSagasServer(this StartupConfig startupConfig, Action<PoleSagasServerOption> config = null)
{ {
Action<PoleSagasServerOption> defaultConfig = option => { }; Action<PoleSagasServerOption> defaultConfig = option => { };
var finalConfig = config ?? defaultConfig; var finalConfig = config ?? defaultConfig;
startupConfig.Services.AddGrpc(); startupConfig.Services.AddGrpc();
startupConfig.Services.Configure(finalConfig); startupConfig.Services.Configure(finalConfig);
startupConfig.Services.AddSingleton<IProcessor, NotEndedSagasFetchProcessor>(); startupConfig.Services.AddSingleton<IProcessor, NotEndedSagasFetchProcessor>();
startupConfig.Services.AddSingleton<ISagasBuffer, SagasBuffer>();
startupConfig.Services.AddSingleton<IProcessor, ExpiredSagasCollectorProcessor>(); startupConfig.Services.AddSingleton<IProcessor, ExpiredSagasCollectorProcessor>();
startupConfig.Services.AddHostedService<BackgroundServiceBasedProcessorServer>(); startupConfig.Services.AddHostedService<BackgroundServiceBasedProcessorServer>();
return startupConfig; return startupConfig;
} }
} }
......
...@@ -15,7 +15,7 @@ namespace Pole.Sagas.Server ...@@ -15,7 +15,7 @@ namespace Pole.Sagas.Server
private readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1); private readonly SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
private readonly Dictionary<string, List<SagaEntity>> Sagas = new Dictionary<string, List<SagaEntity>>(); private readonly Dictionary<string, List<SagaEntity>> Sagas = new Dictionary<string, List<SagaEntity>>();
private readonly ILogger logger; private readonly ILogger logger;
public SagasBuffer(ILogger logger) public SagasBuffer(ILogger<SagasBuffer> logger)
{ {
this.logger = logger; this.logger = logger;
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment