diff --git a/src/Pole.Sagas.Server/Pole.Sagas.Server.csproj b/src/Pole.Sagas.Server/Pole.Sagas.Server.csproj index cd806b5..84e8f09 100644 --- a/src/Pole.Sagas.Server/Pole.Sagas.Server.csproj +++ b/src/Pole.Sagas.Server/Pole.Sagas.Server.csproj @@ -12,12 +12,4 @@ - - - - - - - - diff --git a/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs b/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs index 0d89091..126f699 100644 --- a/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs +++ b/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs @@ -10,6 +10,7 @@ namespace Pole.Sagas.Server public static IServiceCollection AddPoleSagasServer(IServiceCollection services) { services.AddGrpc(); + return services; } } diff --git a/src/Pole.Sagas.Server/Protos/saga.proto b/src/Pole.Sagas.Server/Protos/saga.proto deleted file mode 100644 index 9ec43a9..0000000 --- a/src/Pole.Sagas.Server/Protos/saga.proto +++ /dev/null @@ -1,78 +0,0 @@ -syntax = "proto3"; - -option csharp_namespace = "Pole.Sagas.Server.Grpc"; - -package pole.Sagas.Server.Grpc; - -service Saga { - rpc SagaStarted (SagaStartedRequest) returns (CommonResponse); - rpc SagaEnded (SagaEndedRequest) returns (CommonResponse); - rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse); - rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse); - rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse); - rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse); - rpc ActivityEnded (ActivityEndedRequest) returns (CommonResponse); - rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse); - rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse); - rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse); -} - -message CommonResponse{ - bool isSuccess = 1; - string message = 2; - string errors = 3; -} -message SagaStartedRequest { - string sagaId = 1; - string serviceName = 2; - string addTime = 3; -} -message SagaEndedRequest { - string sagaId = 1; - string ExpiresAt = 2; -} -message ActivityExecuteStartedRequest { - string activityId = 1; - string sagaId = 2; - int32 timeOutSeconds = 3; - bytes parameterData = 4; - int32 order = 5; - string addTime = 6; -} -message ActivityRetriedRequest { - string activityId = 1; - string status = 2; - int32 retries = 3; - ActivityRetryType activityRetryType = 4; - enum ActivityRetryType{ - Execute = 0; - Compensate = 1; - } -} -message ActivityExecuteAbortedRequest { - string activityId = 1; - string errors = 2; -} -message ActivityCompensateAbortedRequest { - string activityId = 1; - string sagaId = 2; - string errors = 3; -} -message ActivityEndedRequest { - string activityId = 1; - string sagaId = 2; - bytes resultData = 3; -} -message ActivityCompensatedRequest { - string activityId = 1; -} -message ActivityExecuteOvertimeRequest { - string activityId = 1; - string sagaId = 2; - string errors = 3; -} -message ActivityRevokedRequest { - string activityId = 1; -} - - diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs index 07284a8..fcfa5f8 100644 --- a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs +++ b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs @@ -7,11 +7,11 @@ namespace Pole.Sagas.Core.Abstraction { Task SagaStarted(string sagaId, string serviceName, DateTime addTime); Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order, DateTime addTime); + Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] parameterData, int order, DateTime addTime); Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType); Task ActivityExecuteAborted(string activityId, string errors); Task ActivityCompensateAborted(string activityId, string sagaId, string errors); - Task ActivityEnded(string activityId, byte[] resultData); + Task ActivityEnded(string activityId,byte[] resultData); Task ActivityCompensated(string activityId); Task ActivityExecuteOvertime(string activityId, string sagaId, string errors); Task ActivityRevoked(string activityId); diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs index 81d8437..63dd80d 100644 --- a/src/Pole.Sagas/Core/EventSender.cs +++ b/src/Pole.Sagas/Core/EventSender.cs @@ -1,61 +1,157 @@ -using Pole.Sagas.Core.Abstraction; +using Grpc.Net.Client; +using Microsoft.Extensions.Options; +using Pole.Sagas.Core.Abstraction; +using Pole.Sagas.Core.Exceptions; using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; +using static Pole.Sagas.Server.Grpc.Saga; namespace Pole.Sagas.Core { class EventSender : IEventSender { - public Task ActivityCompensateAborted(string activityId, string sagaId, string errors) + private readonly SagaClient sagaClient; + public EventSender(SagaClient sagaClient) { - return Task.CompletedTask; + this.sagaClient = sagaClient; + } + public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors) + { + var result = await sagaClient.ActivityCompensateAbortedAsync(new Server.Grpc.ActivityCompensateAbortedRequest + { + ActivityId = activityId, + Errors = errors, + SagaId = sagaId + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } } - public Task ActivityCompensated(string activityId) + public async Task ActivityCompensated(string activityId) { - return Task.CompletedTask; + var result = await sagaClient.ActivityCompensatedAsync(new Server.Grpc.ActivityCompensatedRequest + { + ActivityId = activityId, + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } } - public Task ActivityEnded(string activityId, byte[] resultData) + public async Task ActivityEnded(string activityId, byte[] resultData) { - return Task.CompletedTask; + var result = await sagaClient.ActivityEndedAsync(new Server.Grpc.ActivityEndedRequest + { + ActivityId = activityId, + ResultData = Google.Protobuf.ByteString.CopyFrom(resultData), + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } } - public Task ActivityExecuteAborted(string activityId, string errors) + public async Task ActivityExecuteAborted(string activityId, string errors) { - return Task.CompletedTask; + var result = await sagaClient.ActivityExecuteAbortedAsync(new Server.Grpc.ActivityExecuteAbortedRequest + { + ActivityId = activityId, + Errors = errors + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } } - public Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType) + public async Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType) { - return Task.CompletedTask; + Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType activityRetryType = retryType == ActivityRetryType.Compensate ? Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Compensate : Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Execute; + + var result = await sagaClient.ActivityRetriedAsync(new Server.Grpc.ActivityRetriedRequest + { + ActivityId = activityId, + ActivityRetryType = activityRetryType + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } } - public Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, byte[] ParameterData, int order, DateTime addTime) + public async Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, byte[] parameterData, int order, DateTime addTime) { - return Task.CompletedTask; + var result = await sagaClient.ActivityExecuteStartedAsync(new Server.Grpc.ActivityExecuteStartedRequest + { + ActivityId = activityId, + AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"), + Order = order, + ParameterData = Google.Protobuf.ByteString.CopyFrom(parameterData), + SagaId = sagaId, + TimeOutSeconds = timeoutSeconds + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } } - public Task SagaEnded(string sagaId, DateTime ExpiresAt) + public async Task SagaEnded(string sagaId, DateTime ExpiresAt) { - return Task.CompletedTask; + var result = await sagaClient.SagaEndedAsync(new Server.Grpc.SagaEndedRequest + { + SagaId = sagaId, + ExpiresAt = ExpiresAt.ToString("yyyy-MM-dd HH:mm:ss.fff"), + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } } - public Task SagaStarted(string sagaId, string serviceName, DateTime addTime) + public async Task SagaStarted(string sagaId, string serviceName, DateTime addTime) { - return Task.CompletedTask; + var result = await sagaClient.SagaStartedAsync(new Server.Grpc.SagaStartedRequest + { + SagaId = sagaId, + ServiceName = serviceName, + AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"), + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } } - public Task ActivityExecuteOvertime(string activityId, string sagaId, string errors) + public async Task ActivityExecuteOvertime(string activityId, string sagaId, string errors) { - return Task.CompletedTask; + var result = await sagaClient.ActivityExecuteOvertimeAsync(new Server.Grpc.ActivityExecuteOvertimeRequest + { + SagaId = sagaId, + ActivityId = activityId, + Errors = errors + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } } - public Task ActivityRevoked(string activityId) + public async Task ActivityRevoked(string activityId) { - throw new NotImplementedException(); + var result = await sagaClient.ActivityRevokedAsync(new Server.Grpc.ActivityRevokedRequest + { + ActivityId = activityId, + }); + if (!result.IsSuccess) + { + throw new SagasServerException(result.Errors); + } } } } diff --git a/src/Pole.Sagas/Core/Exceptions/EventSenderException.cs b/src/Pole.Sagas/Core/Exceptions/EventSenderException.cs new file mode 100644 index 0000000..fd2937f --- /dev/null +++ b/src/Pole.Sagas/Core/Exceptions/EventSenderException.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core.Exceptions +{ + public class SagasServerException : Exception + { + public SagasServerException(string errors) : base(errors) + { + + } + } +} diff --git a/src/Pole.Sagas/Core/PoleSagasOption.cs b/src/Pole.Sagas/Core/PoleSagasOption.cs index 05cf12f..7b81db1 100644 --- a/src/Pole.Sagas/Core/PoleSagasOption.cs +++ b/src/Pole.Sagas/Core/PoleSagasOption.cs @@ -8,5 +8,6 @@ namespace Pole.Sagas.Core { public string ServiceName { get; set; } public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10; + public string SagasServerHost { get; set; } } } diff --git a/src/Pole.Sagas/Pole.Sagas.csproj b/src/Pole.Sagas/Pole.Sagas.csproj index bb5bd42..c118917 100644 --- a/src/Pole.Sagas/Pole.Sagas.csproj +++ b/src/Pole.Sagas/Pole.Sagas.csproj @@ -5,13 +5,20 @@ + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + - + + + Server diff --git a/src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs b/src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs index f543dc4..3c053c8 100644 --- a/src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs +++ b/src/Pole.Sagas/PoleSagaServiceCollectionExtensions.cs @@ -3,22 +3,32 @@ using System.Collections.Generic; using System.Linq; using System.Text; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Options; using Pole.Core; using Pole.Core.Utils; using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using Pole.Sagas.Core.Exceptions; +using static Pole.Sagas.Server.Grpc.Saga; namespace Microsoft.Extensions.DependencyInjection { public static class PoleSagaServiceCollectionExtensions { - public static void AddSagas(this StartupConfig startupOption, Action rabbitConfigAction) + public static void AddSagas(this StartupConfig startupOption, Action configAction) { - startupOption.Services.Configure(rabbitConfigAction); + startupOption.Services.Configure(configAction); startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(); startupOption.Services.AddSingleton(); + using(var provider = startupOption.Services.BuildServiceProvider()) + { + var sagasOption = provider.GetRequiredService>().Value; + startupOption.Services.AddGrpcClient(o => + { + o.Address = new Uri(sagasOption.SagasServerHost); + }); + } var baseActivityType = typeof(IActivity<>); foreach (var assembly in AssemblyHelper.GetAssemblies()) { diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto index 31c7ca4..9f744bf 100644 --- a/src/Pole.Sagas/Protos/saga.proto +++ b/src/Pole.Sagas/Protos/saga.proto @@ -6,7 +6,7 @@ package pole.Sagas.Server.Grpc; service Saga { rpc SagaStarted (SagaStartedRequest) returns (CommonResponse); - rpc SagaEnded (SagaStartedRequest) returns (CommonResponse); + rpc SagaEnded (SagaEndedRequest) returns (CommonResponse); rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse); rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse); rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse); @@ -25,6 +25,7 @@ message CommonResponse{ message SagaStartedRequest { string sagaId = 1; string serviceName = 2; + string addTime = 3; } message SagaEndedRequest { string sagaId = 1; @@ -59,8 +60,7 @@ message ActivityCompensateAbortedRequest { } message ActivityEndedRequest { string activityId = 1; - string sagaId = 2; - bytes resultData = 3; + bytes resultData = 2; } message ActivityCompensatedRequest { string activityId = 1;