diff --git a/Pole.sln b/Pole.sln
index 8bc9b96..1caff7e 100644
--- a/Pole.sln
+++ b/Pole.sln
@@ -41,6 +41,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Storage.PostgreS
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SagasTest.Api", "samples\apis\SagasTest.Api\SagasTest.Api.csproj", "{6138197E-6202-4E1B-9458-3CBEE60A36F9}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Server", "src\Pole.Sagas.Server\Pole.Sagas.Server.csproj", "{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -103,6 +105,10 @@ Global
{6138197E-6202-4E1B-9458-3CBEE60A36F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6138197E-6202-4E1B-9458-3CBEE60A36F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6138197E-6202-4E1B-9458-3CBEE60A36F9}.Release|Any CPU.Build.0 = Release|Any CPU
+ {34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -124,6 +130,7 @@ Global
{1F06D877-E4EC-4908-9057-38EDCE5E54E6} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{9505BDFC-395B-4257-AEB3-2B44611147A4} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{6138197E-6202-4E1B-9458-3CBEE60A36F9} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
+ {34ECE24E-0D78-4764-BC54-0CEE61BDB96A} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E}
diff --git a/src/Pole.Sagas.Server/Pole.Sagas.Server.csproj b/src/Pole.Sagas.Server/Pole.Sagas.Server.csproj
new file mode 100644
index 0000000..cd806b5
--- /dev/null
+++ b/src/Pole.Sagas.Server/Pole.Sagas.Server.csproj
@@ -0,0 +1,23 @@
+
+
+
+ netcoreapp3.1
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/Pole.Sagas.Server/PoleSagasServerApplicationBuilderExtensions.cs b/src/Pole.Sagas.Server/PoleSagasServerApplicationBuilderExtensions.cs
new file mode 100644
index 0000000..b51db48
--- /dev/null
+++ b/src/Pole.Sagas.Server/PoleSagasServerApplicationBuilderExtensions.cs
@@ -0,0 +1,21 @@
+using Microsoft.AspNetCore.Builder;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.Sagas.Server
+{
+ public static class PoleSagasServerApplicationBuilderExtensions
+ {
+ public static IApplicationBuilder UserPoleSagasServer(IApplicationBuilder builder)
+ {
+ builder.UseRouting();
+
+ builder.UseEndpoints(endpoints =>
+ {
+ endpoints.MapGrpcService().EnableGrpcWeb();
+ });
+
+ }
+ }
+}
diff --git a/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs b/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs
new file mode 100644
index 0000000..0d89091
--- /dev/null
+++ b/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs
@@ -0,0 +1,16 @@
+using Microsoft.Extensions.DependencyInjection;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.Sagas.Server
+{
+ public static class PoleSagasServerServiceCollectionExtensions
+ {
+ public static IServiceCollection AddPoleSagasServer(IServiceCollection services)
+ {
+ services.AddGrpc();
+ return services;
+ }
+ }
+}
diff --git a/src/Pole.Sagas.Server/Protos/saga.proto b/src/Pole.Sagas.Server/Protos/saga.proto
new file mode 100644
index 0000000..9ec43a9
--- /dev/null
+++ b/src/Pole.Sagas.Server/Protos/saga.proto
@@ -0,0 +1,78 @@
+syntax = "proto3";
+
+option csharp_namespace = "Pole.Sagas.Server.Grpc";
+
+package pole.Sagas.Server.Grpc;
+
+service Saga {
+ rpc SagaStarted (SagaStartedRequest) returns (CommonResponse);
+ rpc SagaEnded (SagaEndedRequest) returns (CommonResponse);
+ rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse);
+ rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse);
+ rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse);
+ rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse);
+ rpc ActivityEnded (ActivityEndedRequest) returns (CommonResponse);
+ rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse);
+ rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse);
+ rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse);
+}
+
+message CommonResponse{
+ bool isSuccess = 1;
+ string message = 2;
+ string errors = 3;
+}
+message SagaStartedRequest {
+ string sagaId = 1;
+ string serviceName = 2;
+ string addTime = 3;
+}
+message SagaEndedRequest {
+ string sagaId = 1;
+ string ExpiresAt = 2;
+}
+message ActivityExecuteStartedRequest {
+ string activityId = 1;
+ string sagaId = 2;
+ int32 timeOutSeconds = 3;
+ bytes parameterData = 4;
+ int32 order = 5;
+ string addTime = 6;
+}
+message ActivityRetriedRequest {
+ string activityId = 1;
+ string status = 2;
+ int32 retries = 3;
+ ActivityRetryType activityRetryType = 4;
+ enum ActivityRetryType{
+ Execute = 0;
+ Compensate = 1;
+ }
+}
+message ActivityExecuteAbortedRequest {
+ string activityId = 1;
+ string errors = 2;
+}
+message ActivityCompensateAbortedRequest {
+ string activityId = 1;
+ string sagaId = 2;
+ string errors = 3;
+}
+message ActivityEndedRequest {
+ string activityId = 1;
+ string sagaId = 2;
+ bytes resultData = 3;
+}
+message ActivityCompensatedRequest {
+ string activityId = 1;
+}
+message ActivityExecuteOvertimeRequest {
+ string activityId = 1;
+ string sagaId = 2;
+ string errors = 3;
+}
+message ActivityRevokedRequest {
+ string activityId = 1;
+}
+
+
diff --git a/src/Pole.Sagas.Server/Services/SagaService.cs b/src/Pole.Sagas.Server/Services/SagaService.cs
new file mode 100644
index 0000000..98f795a
--- /dev/null
+++ b/src/Pole.Sagas.Server/Services/SagaService.cs
@@ -0,0 +1,164 @@
+using Grpc.Core;
+using Pole.Sagas.Core;
+using Pole.Sagas.Server.Grpc;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.Sagas.Server.Services
+{
+ public class SagaService : Pole.Sagas.Server.Grpc.Saga.SagaBase
+ {
+ private readonly ISagaStorage sagaStorage;
+ public SagaService(ISagaStorage sagaStorage)
+ {
+ this.sagaStorage = sagaStorage;
+ }
+ public override async Task ActivityCompensateAborted(ActivityCompensateAbortedRequest request, ServerCallContext context)
+ {
+ CommonResponse commonResponse = new CommonResponse();
+ try
+ {
+ await sagaStorage.ActivityCompensateAborted(request.ActivityId, request.SagaId, request.Errors);
+ commonResponse.IsSuccess = true;
+ }
+ catch (Exception ex)
+ {
+ commonResponse.Errors = CombineError(ex);
+ }
+ return commonResponse;
+ }
+ public override async Task ActivityCompensated(ActivityCompensatedRequest request, ServerCallContext context)
+ {
+ CommonResponse commonResponse = new CommonResponse();
+ try
+ {
+ await sagaStorage.ActivityCompensated(request.ActivityId);
+ commonResponse.IsSuccess = true;
+ }
+ catch (Exception ex)
+ {
+ commonResponse.Errors = CombineError(ex);
+ }
+ return commonResponse;
+ }
+ public override async Task ActivityEnded(ActivityEndedRequest request, ServerCallContext context)
+ {
+ CommonResponse commonResponse = new CommonResponse();
+ try
+ {
+ await sagaStorage.ActivityEnded(request.ActivityId, request.ResultData.ToByteArray());
+ commonResponse.IsSuccess = true;
+ }
+ catch (Exception ex)
+ {
+ commonResponse.Errors = CombineError(ex);
+ }
+ return commonResponse;
+ }
+ public override async Task ActivityExecuteAborted(ActivityExecuteAbortedRequest request, ServerCallContext context)
+ {
+ CommonResponse commonResponse = new CommonResponse();
+ try
+ {
+ await sagaStorage.ActivityExecuteAborted(request.ActivityId, request.Errors);
+ commonResponse.IsSuccess = true;
+ }
+ catch (Exception ex)
+ {
+ commonResponse.Errors = CombineError(ex);
+ }
+ return commonResponse;
+ }
+ public override async Task ActivityExecuteOvertime(ActivityExecuteOvertimeRequest request, ServerCallContext context)
+ {
+ CommonResponse commonResponse = new CommonResponse();
+ try
+ {
+ await sagaStorage.ActivityExecuteOvertime(request.ActivityId, request.SagaId, request.Errors);
+ commonResponse.IsSuccess = true;
+ }
+ catch (Exception ex)
+ {
+ commonResponse.Errors = CombineError(ex);
+ }
+ return commonResponse;
+ }
+ public override async Task ActivityExecuteStarted(ActivityExecuteStartedRequest request, ServerCallContext context)
+ {
+ CommonResponse commonResponse = new CommonResponse();
+ try
+ {
+ await sagaStorage.ActivityExecuteStarted(request.ActivityId, request.SagaId, request.TimeOutSeconds, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime));
+ commonResponse.IsSuccess = true;
+ }
+ catch (Exception ex)
+ {
+ commonResponse.Errors = CombineError(ex);
+ }
+ return commonResponse;
+ }
+ public override async Task ActivityRetried(ActivityRetriedRequest request, ServerCallContext context)
+ {
+ CommonResponse commonResponse = new CommonResponse();
+ try
+ {
+ var targetActivityRetryType = request.ActivityRetryType == Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Compensate ? ActivityRetryType.Compensate : ActivityRetryType.Execute;
+ await sagaStorage.ActivityRetried(request.ActivityId, request.Status, request.Retries, targetActivityRetryType);
+ commonResponse.IsSuccess = true;
+ }
+ catch (Exception ex)
+ {
+ commonResponse.Errors = CombineError(ex);
+ }
+ return commonResponse;
+ }
+ public override async Task ActivityRevoked(ActivityRevokedRequest request, ServerCallContext context)
+ {
+ CommonResponse commonResponse = new CommonResponse();
+ try
+ {
+ await sagaStorage.ActivityRevoked(request.ActivityId);
+ commonResponse.IsSuccess = true;
+ }
+ catch (Exception ex)
+ {
+ commonResponse.Errors = CombineError(ex);
+ }
+ return commonResponse;
+ }
+ public override async Task SagaEnded(SagaEndedRequest request, ServerCallContext context)
+ {
+ CommonResponse commonResponse = new CommonResponse();
+ try
+ {
+ await sagaStorage.SagaEnded(request.SagaId,Convert.ToDateTime(request.ExpiresAt));
+ commonResponse.IsSuccess = true;
+ }
+ catch (Exception ex)
+ {
+ commonResponse.Errors = CombineError(ex);
+ }
+ return commonResponse;
+ }
+ public override async Task SagaStarted(SagaStartedRequest request, ServerCallContext context)
+ {
+ CommonResponse commonResponse = new CommonResponse();
+ try
+ {
+ await sagaStorage.SagaStarted(request.SagaId,request.ServiceName,Convert.ToDateTime( request.AddTime));
+ commonResponse.IsSuccess = true;
+ }
+ catch (Exception ex)
+ {
+ commonResponse.Errors = CombineError(ex);
+ }
+ return commonResponse;
+ }
+ private string CombineError(Exception exception)
+ {
+ return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
+ }
+ }
+}
diff --git a/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj b/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj
index 9f5c4f4..ac168da 100644
--- a/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj
+++ b/src/Pole.Sagas.Storage.PostgreSql/Pole.Sagas.Storage.PostgreSql.csproj
@@ -1,7 +1,11 @@
- netstandard2.0
+ netstandard2.1
+
+
+
+
diff --git a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs
index 515318b..07284a8 100644
--- a/src/Pole.Sagas/Core/Abstraction/IEventSender.cs
+++ b/src/Pole.Sagas/Core/Abstraction/IEventSender.cs
@@ -5,13 +5,13 @@ namespace Pole.Sagas.Core.Abstraction
{
public interface IEventSender
{
- Task SagaStarted(string sagaId, string serviceName);
+ Task SagaStarted(string sagaId, string serviceName, DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
- Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, string parameterContent, int order);
- Task ActivityRetried(string activityId, string status, int retries, string resultContent);
+ Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order, DateTime addTime);
+ Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType);
Task ActivityExecuteAborted(string activityId, string errors);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
- Task ActivityEnded(string activityId, string resultContent);
+ Task ActivityEnded(string activityId, byte[] resultData);
Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId, string sagaId, string errors);
Task ActivityRevoked(string activityId);
diff --git a/src/Pole.Sagas/Server/Events/SagaStartedEvent.cs b/src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs
similarity index 77%
rename from src/Pole.Sagas/Server/Events/SagaStartedEvent.cs
rename to src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs
index 2f28f5a..69629b0 100644
--- a/src/Pole.Sagas/Server/Events/SagaStartedEvent.cs
+++ b/src/Pole.Sagas/Core/Abstraction/ISagaInvoker.cs
@@ -2,10 +2,9 @@
using System.Collections.Generic;
using System.Text;
-namespace Pole.Sagas.Server.Events
+namespace Pole.Sagas.Core.Abstraction
{
- class SagaStartedEvent:IEvent
+ public interface ISagaInvoker
{
-
}
}
diff --git a/src/Pole.Sagas/Core/ActivityEntity.cs b/src/Pole.Sagas/Core/ActivityEntity.cs
new file mode 100644
index 0000000..b1e6ab9
--- /dev/null
+++ b/src/Pole.Sagas/Core/ActivityEntity.cs
@@ -0,0 +1,21 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.Sagas.Core
+{
+ public class ActivityEntity
+ {
+ public string Id { get; set; }
+ public string SagaId { get; set; }
+ public int Order { get; set; }
+ public string Status { get; set; }
+ public int TimeOutSeconds { get; set; }
+ public Byte[] ParameterData { get; set; }
+ public Byte[] ResultData { get; set; }
+ public string Errors { get; set; }
+ public int ExecuteRetries { get; set; }
+ public int CompensateRetries { get; set; }
+ public DateTime AddTime { get; set; }
+ }
+}
diff --git a/src/Pole.Sagas/Server/IEvent.cs b/src/Pole.Sagas/Core/ActivityRetryType.cs
similarity index 70%
rename from src/Pole.Sagas/Server/IEvent.cs
rename to src/Pole.Sagas/Core/ActivityRetryType.cs
index 0fc2d52..fe5a014 100644
--- a/src/Pole.Sagas/Server/IEvent.cs
+++ b/src/Pole.Sagas/Core/ActivityRetryType.cs
@@ -2,10 +2,11 @@
using System.Collections.Generic;
using System.Text;
-namespace Pole.Sagas.Server
+namespace Pole.Sagas.Core
{
- public interface IEvent
+ public enum ActivityRetryType
{
-
+ Execute,
+ Compensate
}
}
diff --git a/src/Pole.Sagas/Core/ActivityWapper.cs b/src/Pole.Sagas/Core/ActivityWapper.cs
index 52c4486..c3ecd75 100644
--- a/src/Pole.Sagas/Core/ActivityWapper.cs
+++ b/src/Pole.Sagas/Core/ActivityWapper.cs
@@ -15,7 +15,7 @@ namespace Pole.Sagas.Core
public Type ActivityDataType { get; set; }
public object DataObj { get; set; }
public int Order { get; set; }
- public ActivityStatus ActivityState { get; set; }
+ public ActivityStatus ActivityStatus { get; set; }
public IServiceProvider ServiceProvider { get; set; }
public int TimeOutSeconds { get; set; }
public CancellationTokenSource CancellationTokenSource { get; set; }
diff --git a/src/Pole.Sagas/Core/EventSender.cs b/src/Pole.Sagas/Core/EventSender.cs
index bb400e8..81d8437 100644
--- a/src/Pole.Sagas/Core/EventSender.cs
+++ b/src/Pole.Sagas/Core/EventSender.cs
@@ -18,22 +18,22 @@ namespace Pole.Sagas.Core
return Task.CompletedTask;
}
- public Task ActivityEnded(string activityId, string resultContent)
+ public Task ActivityEnded(string activityId, byte[] resultData)
{
return Task.CompletedTask;
}
- public Task ActivityExecuteAborted(string activityId, string errors)
+ public Task ActivityExecuteAborted(string activityId, string errors)
{
return Task.CompletedTask;
}
- public Task ActivityRetried(string activityId, string status, int retries, string resultContent)
+ public Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType)
{
return Task.CompletedTask;
}
- public Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, string parameterContent, int order)
+ public Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, byte[] ParameterData, int order, DateTime addTime)
{
return Task.CompletedTask;
}
@@ -43,7 +43,7 @@ namespace Pole.Sagas.Core
return Task.CompletedTask;
}
- public Task SagaStarted(string sagaId, string serviceName)
+ public Task SagaStarted(string sagaId, string serviceName, DateTime addTime)
{
return Task.CompletedTask;
}
diff --git a/src/Pole.Sagas/Core/ISagaStorage.cs b/src/Pole.Sagas/Core/ISagaStorage.cs
new file mode 100644
index 0000000..ef85923
--- /dev/null
+++ b/src/Pole.Sagas/Core/ISagaStorage.cs
@@ -0,0 +1,22 @@
+using Pole.Sagas.Core;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace Pole.Sagas.Core
+{
+ public interface ISagaStorage
+ {
+ Task SagaStarted(string sagaId, string serviceName,DateTime addTime);
+ Task SagaEnded(string sagaId, DateTime ExpiresAt);
+ Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order,DateTime addTime);
+ Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType);
+ Task ActivityExecuteAborted(string activityId, string errors);
+ Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
+ Task ActivityEnded(string activityId, byte[] resultData);
+ Task ActivityCompensated(string activityId);
+ Task ActivityExecuteOvertime(string activityId, string sagaId, string errors);
+ Task ActivityRevoked(string activityId);
+ }
+}
diff --git a/src/Pole.Sagas/Core/Saga.cs b/src/Pole.Sagas/Core/Saga.cs
index c0f3ce2..24972c3 100644
--- a/src/Pole.Sagas/Core/Saga.cs
+++ b/src/Pole.Sagas/Core/Saga.cs
@@ -43,7 +43,7 @@ namespace Pole.Sagas.Core
this.activityFinder = activityFinder;
Id = snowflakeIdGenerator.NextId();
}
- internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder,int currentExecuteOrder,int currentCompensateOrder)
+ internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder,int currentExecuteOrder,int currentCompensateOrder, List activities)
{
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.serviceProvider = serviceProvider;
@@ -54,6 +54,7 @@ namespace Pole.Sagas.Core
Id = snowflakeIdGenerator.NextId();
this.currentExecuteOrder = currentExecuteOrder;
this.currentCompensateOrder = currentCompensateOrder;
+ this.activities = activities;
}
public void AddActivity(string activityName, object data, int timeOutSeconds = 2)
@@ -69,7 +70,7 @@ namespace Pole.Sagas.Core
ActivityWapper activityWapper = new ActivityWapper
{
ActivityDataType = dataType,
- ActivityState = ActivityStatus.NotStarted,
+ ActivityStatus = ActivityStatus.NotStarted,
ActivityType = targetActivityType,
DataObj = data,
Order = CurrentMaxOrder,
@@ -81,7 +82,7 @@ namespace Pole.Sagas.Core
public async Task GetResult()
{
- await eventSender.SagaStarted(Id, poleSagasOption.ServiceName);
+ await eventSender.SagaStarted(Id, poleSagasOption.ServiceName,DateTime.UtcNow);
var executeActivity = GetNextExecuteActivity();
if (executeActivity == null)
@@ -139,8 +140,8 @@ namespace Pole.Sagas.Core
activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000);
try
{
- var jsonContent = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType);
- await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOutSeconds, jsonContent, activityWapper.Order);
+ var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType);
+ await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOutSeconds, bytesContent, activityWapper.Order,DateTime.UtcNow);
var result = await activityWapper.InvokeExecute();
if (!result.IsSuccess)
{
@@ -148,7 +149,7 @@ namespace Pole.Sagas.Core
await CompensateActivity(result,currentExecuteOrder);
return result;
}
- await eventSender.ActivityEnded(activityId, string.Empty);
+ await eventSender.ActivityEnded(activityId, Encoding.UTF8.GetBytes(string.Empty));
var executeActivity = GetNextExecuteActivity();
if (executeActivity == null)
{
diff --git a/src/Pole.Sagas/Core/SagaEntity.cs b/src/Pole.Sagas/Core/SagaEntity.cs
new file mode 100644
index 0000000..3b5b9e9
--- /dev/null
+++ b/src/Pole.Sagas/Core/SagaEntity.cs
@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace Pole.Sagas.Core
+{
+ public class SagaEntity
+ {
+ public int Id { get; set; }
+ public string ServiceName { get; set; }
+ public List ActivityEntities { get; set; }
+ public string Status { get; set; }
+ public DateTime? ExpiresAt { get; set; }
+ public DateTime AddTime { get; set; }
+ }
+}
diff --git a/src/Pole.Sagas/Pole.Sagas.csproj b/src/Pole.Sagas/Pole.Sagas.csproj
index 7cfb216..bb5bd42 100644
--- a/src/Pole.Sagas/Pole.Sagas.csproj
+++ b/src/Pole.Sagas/Pole.Sagas.csproj
@@ -5,11 +5,18 @@
-
+
+
+
+ Server
+ Protos
+
+
+
diff --git a/src/Pole.Sagas/Protos/saga.proto b/src/Pole.Sagas/Protos/saga.proto
new file mode 100644
index 0000000..31c7ca4
--- /dev/null
+++ b/src/Pole.Sagas/Protos/saga.proto
@@ -0,0 +1,77 @@
+syntax = "proto3";
+
+option csharp_namespace = "Pole.Sagas.Server.Grpc";
+
+package pole.Sagas.Server.Grpc;
+
+service Saga {
+ rpc SagaStarted (SagaStartedRequest) returns (CommonResponse);
+ rpc SagaEnded (SagaStartedRequest) returns (CommonResponse);
+ rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse);
+ rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse);
+ rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse);
+ rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse);
+ rpc ActivityEnded (ActivityEndedRequest) returns (CommonResponse);
+ rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse);
+ rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse);
+ rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse);
+}
+
+message CommonResponse{
+ bool isSuccess = 1;
+ string message = 2;
+ string errors = 3;
+}
+message SagaStartedRequest {
+ string sagaId = 1;
+ string serviceName = 2;
+}
+message SagaEndedRequest {
+ string sagaId = 1;
+ string ExpiresAt = 2;
+}
+message ActivityExecuteStartedRequest {
+ string activityId = 1;
+ string sagaId = 2;
+ int32 timeOutSeconds = 3;
+ bytes parameterData = 4;
+ int32 order = 5;
+ string addTime = 6;
+}
+message ActivityRetriedRequest {
+ string activityId = 1;
+ string status = 2;
+ int32 retries = 3;
+ ActivityRetryType activityRetryType = 4;
+ enum ActivityRetryType{
+ Execute = 0;
+ Compensate = 1;
+ }
+}
+message ActivityExecuteAbortedRequest {
+ string activityId = 1;
+ string errors = 2;
+}
+message ActivityCompensateAbortedRequest {
+ string activityId = 1;
+ string sagaId = 2;
+ string errors = 3;
+}
+message ActivityEndedRequest {
+ string activityId = 1;
+ string sagaId = 2;
+ bytes resultData = 3;
+}
+message ActivityCompensatedRequest {
+ string activityId = 1;
+}
+message ActivityExecuteOvertimeRequest {
+ string activityId = 1;
+ string sagaId = 2;
+ string errors = 3;
+}
+message ActivityRevokedRequest {
+ string activityId = 1;
+}
+
+