From 9aa11ff082b1a52c4936ba344e680a1a9f6398e3 Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Mon, 9 Mar 2020 13:13:08 +0800 Subject: [PATCH] 添加 grpc 服务 --- Pole.sln | 7 +++++++ src/Pole.Sagas.Server/Pole.Sagas.Server.csproj | 23 +++++++++++++++++++++++ src/Pole.Sagas.Server/PoleSagasServerApplicationBuilderExtensions.cs | 21 +++++++++++++++++++++ src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs | 16 ++++++++++++++++ src/Pole.Sagas.Server/Protos/saga.proto | 78 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas.Server/Services/SagaService.cs | 164 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj | 6 +++++- src/Pole.Sagas/Core/Abstraction/IEventSender.cs | 8 ++++---- src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs | 10 ++++++++++ src/Pole.Sagas/Core/ActivityEntity.cs | 21 +++++++++++++++++++++ src/Pole.Sagas/Core/ActivityRetryType.cs | 12 ++++++++++++ src/Pole.Sagas/Core/ActivityWapper.cs | 2 +- src/Pole.Sagas/Core/EventSender.cs | 10 +++++----- src/Pole.Sagas/Core/ISagaStorage.cs | 22 ++++++++++++++++++++++ src/Pole.Sagas/Core/Saga.cs | 13 +++++++------ src/Pole.Sagas/Core/SagaEntity.cs | 16 ++++++++++++++++ src/Pole.Sagas/Pole.Sagas.csproj | 9 ++++++++- src/Pole.Sagas/Protos/saga.proto | 77 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.Sagas/Server/Events/SagaStartedEvent.cs | 11 ----------- src/Pole.Sagas/Server/IEvent.cs | 11 ----------- 20 files changed, 497 insertions(+), 40 deletions(-) create mode 100644 src/Pole.Sagas.Server/Pole.Sagas.Server.csproj create mode 100644 src/Pole.Sagas.Server/PoleSagasServerApplicationBuilderExtensions.cs create mode 100644 src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs create mode 100644 src/Pole.Sagas.Server/Protos/saga.proto create mode 100644 src/Pole.Sagas.Server/Services/SagaService.cs create mode 100644 src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs create mode 100644 src/Pole.Sagas/Core/ActivityEntity.cs create mode 100644 src/Pole.Sagas/Core/ActivityRetryType.cs create mode 100644 src/Pole.Sagas/Core/ISagaStorage.cs create mode 100644 src/Pole.Sagas/Core/SagaEntity.cs create mode 100644 src/Pole.Sagas/Protos/saga.proto delete mode 100644 src/Pole.Sagas/Server/Events/SagaStartedEvent.cs delete mode 100644 src/Pole.Sagas/Server/IEvent.cs diff --git a/Pole.sln b/Pole.sln index 8bc9b96..1caff7e 100644 --- a/Pole.sln +++ b/Pole.sln @@ -41,6 +41,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Storage.PostgreS EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SagasTest.Api", "samples\apis\SagasTest.Api\SagasTest.Api.csproj", "{6138197E-6202-4E1B-9458-3CBEE60A36F9}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Server", "src\Pole.Sagas.Server\Pole.Sagas.Server.csproj", "{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -103,6 +105,10 @@ Global {6138197E-6202-4E1B-9458-3CBEE60A36F9}.Debug|Any CPU.Build.0 = Debug|Any CPU {6138197E-6202-4E1B-9458-3CBEE60A36F9}.Release|Any CPU.ActiveCfg = Release|Any CPU {6138197E-6202-4E1B-9458-3CBEE60A36F9}.Release|Any CPU.Build.0 = Release|Any CPU + {34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -124,6 +130,7 @@ Global {1F06D877-E4EC-4908-9057-38EDCE5E54E6} = {9932C965-8B38-4F70-9E43-86DC56860E2B} {9505BDFC-395B-4257-AEB3-2B44611147A4} = {9932C965-8B38-4F70-9E43-86DC56860E2B} {6138197E-6202-4E1B-9458-3CBEE60A36F9} = {475116FC-DEEC-4255-94E4-AE7B8C85038D} + {34ECE24E-0D78-4764-BC54-0CEE61BDB96A} = {9932C965-8B38-4F70-9E43-86DC56860E2B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E} diff --git a/src/Pole.Sagas.Server/Pole.Sagas.Server.csproj b/src/Pole.Sagas.Server/Pole.Sagas.Server.csproj new file mode 100644 index 0000000..cd806b5 --- /dev/null +++ b/src/Pole.Sagas.Server/Pole.Sagas.Server.csproj @@ -0,0 +1,23 @@ + + + + netcoreapp3.1 + + + + + + + + + + + + + + + + + + + diff --git a/src/Pole.Sagas.Server/PoleSagasServerApplicationBuilderExtensions.cs b/src/Pole.Sagas.Server/PoleSagasServerApplicationBuilderExtensions.cs new file mode 100644 index 0000000..b51db48 --- /dev/null +++ b/src/Pole.Sagas.Server/PoleSagasServerApplicationBuilderExtensions.cs @@ -0,0 +1,21 @@ +using Microsoft.AspNetCore.Builder; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Server +{ + public static class PoleSagasServerApplicationBuilderExtensions + { + public static IApplicationBuilder UserPoleSagasServer(IApplicationBuilder builder) + { + builder.UseRouting(); + + builder.UseEndpoints(endpoints => + { + endpoints.MapGrpcService().EnableGrpcWeb(); + }); + + } + } +} diff --git a/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs b/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs new file mode 100644 index 0000000..0d89091 --- /dev/null +++ b/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs @@ -0,0 +1,16 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Server +{ + public static class PoleSagasServerServiceCollectionExtensions + { + public static IServiceCollection AddPoleSagasServer(IServiceCollection services) + { + services.AddGrpc(); + return services; + } + } +} diff --git a/src/Pole.Sagas.Server/Protos/saga.proto b/src/Pole.Sagas.Server/Protos/saga.proto new file mode 100644 index 0000000..9ec43a9 --- /dev/null +++ b/src/Pole.Sagas.Server/Protos/saga.proto @@ -0,0 +1,78 @@ +syntax = "proto3"; + +option csharp_namespace = "Pole.Sagas.Server.Grpc"; + +package pole.Sagas.Server.Grpc; + +service Saga { + rpc SagaStarted (SagaStartedRequest) returns (CommonResponse); + rpc SagaEnded (SagaEndedRequest) returns (CommonResponse); + rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse); + rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse); + rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse); + rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse); + rpc ActivityEnded (ActivityEndedRequest) returns (CommonResponse); + rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse); + rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse); + rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse); +} + +message CommonResponse{ + bool isSuccess = 1; + string message = 2; + string errors = 3; +} +message SagaStartedRequest { + string sagaId = 1; + string serviceName = 2; + string addTime = 3; +} +message SagaEndedRequest { + string sagaId = 1; + string ExpiresAt = 2; +} +message ActivityExecuteStartedRequest { + string activityId = 1; + string sagaId = 2; + int32 timeOutSeconds = 3; + bytes parameterData = 4; + int32 order = 5; + string addTime = 6; +} +message ActivityRetriedRequest { + string activityId = 1; + string status = 2; + int32 retries = 3; + ActivityRetryType activityRetryType = 4; + enum ActivityRetryType{ + Execute = 0; + Compensate = 1; + } +} +message ActivityExecuteAbortedRequest { + string activityId = 1; + string errors = 2; +} +message ActivityCompensateAbortedRequest { + string activityId = 1; + string sagaId = 2; + string errors = 3; +} +message ActivityEndedRequest { + string activityId = 1; + string sagaId = 2; + bytes resultData = 3; +} +message ActivityCompensatedRequest { + string activityId = 1; +} +message ActivityExecuteOvertimeRequest { + string activityId = 1; + string sagaId = 2; + string errors = 3; +} +message ActivityRevokedRequest { + string activityId = 1; +} + + diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs new file mode 100644 index 0000000..98f795a --- /dev/null +++ b/src/Pole.Sagas.Server/Services/SagaService.cs @@ -0,0 +1,164 @@ +using Grpc.Core; +using Pole.Sagas.Core; +using Pole.Sagas.Server.Grpc; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Sagas.Server.Services +{ + public class SagaService : Pole.Sagas.Server.Grpc.Saga.SagaBase + { + private readonly ISagaStorage sagaStorage; + public SagaService(ISagaStorage sagaStorage) + { + this.sagaStorage = sagaStorage; + } + public override async Task ActivityCompensateAborted(ActivityCompensateAbortedRequest request, ServerCallContext context) + { + CommonResponse commonResponse = new CommonResponse(); + try + { + await sagaStorage.ActivityCompensateAborted(request.ActivityId, request.SagaId, request.Errors); + commonResponse.IsSuccess = true; + } + catch (Exception ex) + { + commonResponse.Errors = CombineError(ex); + } + return commonResponse; + } + public override async Task ActivityCompensated(ActivityCompensatedRequest request, ServerCallContext context) + { + CommonResponse commonResponse = new CommonResponse(); + try + { + await sagaStorage.ActivityCompensated(request.ActivityId); + commonResponse.IsSuccess = true; + } + catch (Exception ex) + { + commonResponse.Errors = CombineError(ex); + } + return commonResponse; + } + public override async Task ActivityEnded(ActivityEndedRequest request, ServerCallContext context) + { + CommonResponse commonResponse = new CommonResponse(); + try + { + await sagaStorage.ActivityEnded(request.ActivityId, request.ResultData.ToByteArray()); + commonResponse.IsSuccess = true; + } + catch (Exception ex) + { + commonResponse.Errors = CombineError(ex); + } + return commonResponse; + } + public override async Task ActivityExecuteAborted(ActivityExecuteAbortedRequest request, ServerCallContext context) + { + CommonResponse commonResponse = new CommonResponse(); + try + { + await sagaStorage.ActivityExecuteAborted(request.ActivityId, request.Errors); + commonResponse.IsSuccess = true; + } + catch (Exception ex) + { + commonResponse.Errors = CombineError(ex); + } + return commonResponse; + } + public override async Task ActivityExecuteOvertime(ActivityExecuteOvertimeRequest request, ServerCallContext context) + { + CommonResponse commonResponse = new CommonResponse(); + try + { + await sagaStorage.ActivityExecuteOvertime(request.ActivityId, request.SagaId, request.Errors); + commonResponse.IsSuccess = true; + } + catch (Exception ex) + { + commonResponse.Errors = CombineError(ex); + } + return commonResponse; + } + public override async Task ActivityExecuteStarted(ActivityExecuteStartedRequest request, ServerCallContext context) + { + CommonResponse commonResponse = new CommonResponse(); + try + { + await sagaStorage.ActivityExecuteStarted(request.ActivityId, request.SagaId, request.TimeOutSeconds, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime)); + commonResponse.IsSuccess = true; + } + catch (Exception ex) + { + commonResponse.Errors = CombineError(ex); + } + return commonResponse; + } + public override async Task ActivityRetried(ActivityRetriedRequest request, ServerCallContext context) + { + CommonResponse commonResponse = new CommonResponse(); + try + { + var targetActivityRetryType = request.ActivityRetryType == Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Compensate ? ActivityRetryType.Compensate : ActivityRetryType.Execute; + await sagaStorage.ActivityRetried(request.ActivityId, request.Status, request.Retries, targetActivityRetryType); + commonResponse.IsSuccess = true; + } + catch (Exception ex) + { + commonResponse.Errors = CombineError(ex); + } + return commonResponse; + } + public override async Task ActivityRevoked(ActivityRevokedRequest request, ServerCallContext context) + { + CommonResponse commonResponse = new CommonResponse(); + try + { + await sagaStorage.ActivityRevoked(request.ActivityId); + commonResponse.IsSuccess = true; + } + catch (Exception ex) + { + commonResponse.Errors = CombineError(ex); + } + return commonResponse; + } + public override async Task SagaEnded(SagaEndedRequest request, ServerCallContext context) + { + CommonResponse commonResponse = new CommonResponse(); + try + { + await sagaStorage.SagaEnded(request.SagaId,Convert.ToDateTime(request.ExpiresAt)); + commonResponse.IsSuccess = true; + } + catch (Exception ex) + { + commonResponse.Errors = CombineError(ex); + } + return commonResponse; + } + public override async Task SagaStarted(SagaStartedRequest request, ServerCallContext context) + { + CommonResponse commonResponse = new CommonResponse(); + try + { + await sagaStorage.SagaStarted(request.SagaId,request.ServiceName,Convert.ToDateTime( request.AddTime)); + commonResponse.IsSuccess = true; + } + catch (Exception ex) + { + commonResponse.Errors = CombineError(ex); + } + return commonResponse; + } + private string CombineError(Exception exception) + { + return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace; + } + } +} diff --git a/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj b/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj index 9f5c4f4..ac168da 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj +++ b/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj @@ -1,7 +1,11 @@ - netstandard2.0 + netstandard2.1 + + + + diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs index 515318b..07284a8 100644 --- a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs +++ b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs @@ -5,13 +5,13 @@ namespace Pole.Sagas.Core.Abstraction { public interface IEventSender { - Task SagaStarted(string sagaId, string serviceName); + Task SagaStarted(string sagaId, string serviceName, DateTime addTime); Task SagaEnded(string sagaId, DateTime ExpiresAt); - Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, string parameterContent, int order); - Task ActivityRetried(string activityId, string status, int retries, string resultContent); + Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order, DateTime addTime); + Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType); Task ActivityExecuteAborted(string activityId, string errors); Task ActivityCompensateAborted(string activityId, string sagaId, string errors); - Task ActivityEnded(string activityId, string resultContent); + Task ActivityEnded(string activityId, byte[] resultData); Task ActivityCompensated(string activityId); Task ActivityExecuteOvertime(string activityId, string sagaId, string errors); Task ActivityRevoked(string activityId); diff --git a/src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs b/src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs new file mode 100644 index 0000000..69629b0 --- /dev/null +++ b/src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs @@ -0,0 +1,10 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core.Abstraction +{ + public interface ISagaInvoker + { + } +} diff --git a/src/Pole.Sagas/Core/ActivityEntity.cs b/src/Pole.Sagas/Core/ActivityEntity.cs new file mode 100644 index 0000000..b1e6ab9 --- /dev/null +++ b/src/Pole.Sagas/Core/ActivityEntity.cs @@ -0,0 +1,21 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core +{ + public class ActivityEntity + { + public string Id { get; set; } + public string SagaId { get; set; } + public int Order { get; set; } + public string Status { get; set; } + public int TimeOutSeconds { get; set; } + public Byte[] ParameterData { get; set; } + public Byte[] ResultData { get; set; } + public string Errors { get; set; } + public int ExecuteRetries { get; set; } + public int CompensateRetries { get; set; } + public DateTime AddTime { get; set; } + } +} diff --git a/src/Pole.Sagas/Core/ActivityRetryType.cs b/src/Pole.Sagas/Core/ActivityRetryType.cs new file mode 100644 index 0000000..fe5a014 --- /dev/null +++ b/src/Pole.Sagas/Core/ActivityRetryType.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core +{ + public enum ActivityRetryType + { + Execute, + Compensate + } +} diff --git a/src/Pole.Sagas/Core/ActivityWapper.cs b/src/Pole.Sagas/Core/ActivityWapper.cs index 52c4486..c3ecd75 100644 --- a/src/Pole.Sagas/Core/ActivityWapper.cs +++ b/src/Pole.Sagas/Core/ActivityWapper.cs @@ -15,7 +15,7 @@ namespace Pole.Sagas.Core public Type ActivityDataType { get; set; } public object DataObj { get; set; } public int Order { get; set; } - public ActivityStatus ActivityState { get; set; } + public ActivityStatus ActivityStatus { get; set; } public IServiceProvider ServiceProvider { get; set; } public int TimeOutSeconds { get; set; } public CancellationTokenSource CancellationTokenSource { get; set; } diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs index bb400e8..81d8437 100644 --- a/src/Pole.Sagas/Core/EventSender.cs +++ b/src/Pole.Sagas/Core/EventSender.cs @@ -18,22 +18,22 @@ namespace Pole.Sagas.Core return Task.CompletedTask; } - public Task ActivityEnded(string activityId, string resultContent) + public Task ActivityEnded(string activityId, byte[] resultData) { return Task.CompletedTask; } - public Task ActivityExecuteAborted(string activityId, string errors) + public Task ActivityExecuteAborted(string activityId, string errors) { return Task.CompletedTask; } - public Task ActivityRetried(string activityId, string status, int retries, string resultContent) + public Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType) { return Task.CompletedTask; } - public Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, string parameterContent, int order) + public Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, byte[] ParameterData, int order, DateTime addTime) { return Task.CompletedTask; } @@ -43,7 +43,7 @@ namespace Pole.Sagas.Core return Task.CompletedTask; } - public Task SagaStarted(string sagaId, string serviceName) + public Task SagaStarted(string sagaId, string serviceName, DateTime addTime) { return Task.CompletedTask; } diff --git a/src/Pole.Sagas/Core/ISagaStorage.cs b/src/Pole.Sagas/Core/ISagaStorage.cs new file mode 100644 index 0000000..ef85923 --- /dev/null +++ b/src/Pole.Sagas/Core/ISagaStorage.cs @@ -0,0 +1,22 @@ +using Pole.Sagas.Core; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.Sagas.Core +{ + public interface ISagaStorage + { + Task SagaStarted(string sagaId, string serviceName,DateTime addTime); + Task SagaEnded(string sagaId, DateTime ExpiresAt); + Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order,DateTime addTime); + Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType); + Task ActivityExecuteAborted(string activityId, string errors); + Task ActivityCompensateAborted(string activityId, string sagaId, string errors); + Task ActivityEnded(string activityId, byte[] resultData); + Task ActivityCompensated(string activityId); + Task ActivityExecuteOvertime(string activityId, string sagaId, string errors); + Task ActivityRevoked(string activityId); + } +} diff --git a/src/Pole.Sagas/Core/Saga.cs b/src/Pole.Sagas/Core/Saga.cs index c0f3ce2..24972c3 100644 --- a/src/Pole.Sagas/Core/Saga.cs +++ b/src/Pole.Sagas/Core/Saga.cs @@ -43,7 +43,7 @@ namespace Pole.Sagas.Core this.activityFinder = activityFinder; Id = snowflakeIdGenerator.NextId(); } - internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder,int currentExecuteOrder,int currentCompensateOrder) + internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder,int currentExecuteOrder,int currentCompensateOrder, List activities) { this.snowflakeIdGenerator = snowflakeIdGenerator; this.serviceProvider = serviceProvider; @@ -54,6 +54,7 @@ namespace Pole.Sagas.Core Id = snowflakeIdGenerator.NextId(); this.currentExecuteOrder = currentExecuteOrder; this.currentCompensateOrder = currentCompensateOrder; + this.activities = activities; } public void AddActivity(string activityName, object data, int timeOutSeconds = 2) @@ -69,7 +70,7 @@ namespace Pole.Sagas.Core ActivityWapper activityWapper = new ActivityWapper { ActivityDataType = dataType, - ActivityState = ActivityStatus.NotStarted, + ActivityStatus = ActivityStatus.NotStarted, ActivityType = targetActivityType, DataObj = data, Order = CurrentMaxOrder, @@ -81,7 +82,7 @@ namespace Pole.Sagas.Core public async Task GetResult() { - await eventSender.SagaStarted(Id, poleSagasOption.ServiceName); + await eventSender.SagaStarted(Id, poleSagasOption.ServiceName,DateTime.UtcNow); var executeActivity = GetNextExecuteActivity(); if (executeActivity == null) @@ -139,8 +140,8 @@ namespace Pole.Sagas.Core activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000); try { - var jsonContent = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType); - await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOutSeconds, jsonContent, activityWapper.Order); + var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType); + await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOutSeconds, bytesContent, activityWapper.Order,DateTime.UtcNow); var result = await activityWapper.InvokeExecute(); if (!result.IsSuccess) { @@ -148,7 +149,7 @@ namespace Pole.Sagas.Core await CompensateActivity(result,currentExecuteOrder); return result; } - await eventSender.ActivityEnded(activityId, string.Empty); + await eventSender.ActivityEnded(activityId, Encoding.UTF8.GetBytes(string.Empty)); var executeActivity = GetNextExecuteActivity(); if (executeActivity == null) { diff --git a/src/Pole.Sagas/Core/SagaEntity.cs b/src/Pole.Sagas/Core/SagaEntity.cs new file mode 100644 index 0000000..3b5b9e9 --- /dev/null +++ b/src/Pole.Sagas/Core/SagaEntity.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Sagas.Core +{ + public class SagaEntity + { + public int Id { get; set; } + public string ServiceName { get; set; } + public List ActivityEntities { get; set; } + public string Status { get; set; } + public DateTime? ExpiresAt { get; set; } + public DateTime AddTime { get; set; } + } +} diff --git a/src/Pole.Sagas/Pole.Sagas.csproj b/src/Pole.Sagas/Pole.Sagas.csproj index 7cfb216..bb5bd42 100644 --- a/src/Pole.Sagas/Pole.Sagas.csproj +++ b/src/Pole.Sagas/Pole.Sagas.csproj @@ -5,11 +5,18 @@ - + + + + Server + Protos + + + diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto new file mode 100644 index 0000000..31c7ca4 --- /dev/null +++ b/src/Pole.Sagas/Protos/saga.proto @@ -0,0 +1,77 @@ +syntax = "proto3"; + +option csharp_namespace = "Pole.Sagas.Server.Grpc"; + +package pole.Sagas.Server.Grpc; + +service Saga { + rpc SagaStarted (SagaStartedRequest) returns (CommonResponse); + rpc SagaEnded (SagaStartedRequest) returns (CommonResponse); + rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse); + rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse); + rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse); + rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse); + rpc ActivityEnded (ActivityEndedRequest) returns (CommonResponse); + rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse); + rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse); + rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse); +} + +message CommonResponse{ + bool isSuccess = 1; + string message = 2; + string errors = 3; +} +message SagaStartedRequest { + string sagaId = 1; + string serviceName = 2; +} +message SagaEndedRequest { + string sagaId = 1; + string ExpiresAt = 2; +} +message ActivityExecuteStartedRequest { + string activityId = 1; + string sagaId = 2; + int32 timeOutSeconds = 3; + bytes parameterData = 4; + int32 order = 5; + string addTime = 6; +} +message ActivityRetriedRequest { + string activityId = 1; + string status = 2; + int32 retries = 3; + ActivityRetryType activityRetryType = 4; + enum ActivityRetryType{ + Execute = 0; + Compensate = 1; + } +} +message ActivityExecuteAbortedRequest { + string activityId = 1; + string errors = 2; +} +message ActivityCompensateAbortedRequest { + string activityId = 1; + string sagaId = 2; + string errors = 3; +} +message ActivityEndedRequest { + string activityId = 1; + string sagaId = 2; + bytes resultData = 3; +} +message ActivityCompensatedRequest { + string activityId = 1; +} +message ActivityExecuteOvertimeRequest { + string activityId = 1; + string sagaId = 2; + string errors = 3; +} +message ActivityRevokedRequest { + string activityId = 1; +} + + diff --git a/src/Pole.Sagas/Server/Events/SagaStartedEvent.cs b/src/Pole.Sagas/Server/Events/SagaStartedEvent.cs deleted file mode 100644 index 2f28f5a..0000000 --- a/src/Pole.Sagas/Server/Events/SagaStartedEvent.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Sagas.Server.Events -{ - class SagaStartedEvent:IEvent - { - - } -} diff --git a/src/Pole.Sagas/Server/IEvent.cs b/src/Pole.Sagas/Server/IEvent.cs deleted file mode 100644 index 0fc2d52..0000000 --- a/src/Pole.Sagas/Server/IEvent.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Sagas.Server -{ - public interface IEvent - { - - } -} -- libgit2 0.25.0