Commit 9aa11ff0 by dingsongjie

添加 grpc 服务

parent 42cceee2
......@@ -41,6 +41,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Storage.PostgreS
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SagasTest.Api", "samples\apis\SagasTest.Api\SagasTest.Api.csproj", "{6138197E-6202-4E1B-9458-3CBEE60A36F9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Server", "src\Pole.Sagas.Server\Pole.Sagas.Server.csproj", "{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
......@@ -103,6 +105,10 @@ Global
{6138197E-6202-4E1B-9458-3CBEE60A36F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6138197E-6202-4E1B-9458-3CBEE60A36F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6138197E-6202-4E1B-9458-3CBEE60A36F9}.Release|Any CPU.Build.0 = Release|Any CPU
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -124,6 +130,7 @@ Global
{1F06D877-E4EC-4908-9057-38EDCE5E54E6} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{9505BDFC-395B-4257-AEB3-2B44611147A4} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{6138197E-6202-4E1B-9458-3CBEE60A36F9} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E}
......
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Grpc.AspNetCore" Version="2.27.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Sagas\Pole.Sagas.csproj" />
</ItemGroup>
<ItemGroup>
<Protobuf Include="Protos\*" AdditionalImportDirs="Protos" GrpcServices="Server" />
</ItemGroup>
<ItemGroup>
<None Remove="Protos\saga.proto" />
</ItemGroup>
</Project>
using Microsoft.AspNetCore.Builder;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Server
{
public static class PoleSagasServerApplicationBuilderExtensions
{
public static IApplicationBuilder UserPoleSagasServer(IApplicationBuilder builder)
{
builder.UseRouting();
builder.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<BacketService>().EnableGrpcWeb();
});
}
}
}
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Server
{
public static class PoleSagasServerServiceCollectionExtensions
{
public static IServiceCollection AddPoleSagasServer(IServiceCollection services)
{
services.AddGrpc();
return services;
}
}
}
syntax = "proto3";
option csharp_namespace = "Pole.Sagas.Server.Grpc";
package pole.Sagas.Server.Grpc;
service Saga {
rpc SagaStarted (SagaStartedRequest) returns (CommonResponse);
rpc SagaEnded (SagaEndedRequest) returns (CommonResponse);
rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse);
rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse);
rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse);
rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse);
rpc ActivityEnded (ActivityEndedRequest) returns (CommonResponse);
rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse);
rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse);
rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse);
}
message CommonResponse{
bool isSuccess = 1;
string message = 2;
string errors = 3;
}
message SagaStartedRequest {
string sagaId = 1;
string serviceName = 2;
string addTime = 3;
}
message SagaEndedRequest {
string sagaId = 1;
string ExpiresAt = 2;
}
message ActivityExecuteStartedRequest {
string activityId = 1;
string sagaId = 2;
int32 timeOutSeconds = 3;
bytes parameterData = 4;
int32 order = 5;
string addTime = 6;
}
message ActivityRetriedRequest {
string activityId = 1;
string status = 2;
int32 retries = 3;
ActivityRetryType activityRetryType = 4;
enum ActivityRetryType{
Execute = 0;
Compensate = 1;
}
}
message ActivityExecuteAbortedRequest {
string activityId = 1;
string errors = 2;
}
message ActivityCompensateAbortedRequest {
string activityId = 1;
string sagaId = 2;
string errors = 3;
}
message ActivityEndedRequest {
string activityId = 1;
string sagaId = 2;
bytes resultData = 3;
}
message ActivityCompensatedRequest {
string activityId = 1;
}
message ActivityExecuteOvertimeRequest {
string activityId = 1;
string sagaId = 2;
string errors = 3;
}
message ActivityRevokedRequest {
string activityId = 1;
}
using Grpc.Core;
using Pole.Sagas.Core;
using Pole.Sagas.Server.Grpc;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Server.Services
{
public class SagaService : Pole.Sagas.Server.Grpc.Saga.SagaBase
{
private readonly ISagaStorage sagaStorage;
public SagaService(ISagaStorage sagaStorage)
{
this.sagaStorage = sagaStorage;
}
public override async Task<CommonResponse> ActivityCompensateAborted(ActivityCompensateAbortedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityCompensateAborted(request.ActivityId, request.SagaId, request.Errors);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
public override async Task<CommonResponse> ActivityCompensated(ActivityCompensatedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityCompensated(request.ActivityId);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
public override async Task<CommonResponse> ActivityEnded(ActivityEndedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityEnded(request.ActivityId, request.ResultData.ToByteArray());
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
public override async Task<CommonResponse> ActivityExecuteAborted(ActivityExecuteAbortedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityExecuteAborted(request.ActivityId, request.Errors);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
public override async Task<CommonResponse> ActivityExecuteOvertime(ActivityExecuteOvertimeRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityExecuteOvertime(request.ActivityId, request.SagaId, request.Errors);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
public override async Task<CommonResponse> ActivityExecuteStarted(ActivityExecuteStartedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityExecuteStarted(request.ActivityId, request.SagaId, request.TimeOutSeconds, request.ParameterData.ToByteArray(), request.Order, Convert.ToDateTime(request.AddTime));
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
public override async Task<CommonResponse> ActivityRetried(ActivityRetriedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
var targetActivityRetryType = request.ActivityRetryType == Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Compensate ? ActivityRetryType.Compensate : ActivityRetryType.Execute;
await sagaStorage.ActivityRetried(request.ActivityId, request.Status, request.Retries, targetActivityRetryType);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
public override async Task<CommonResponse> ActivityRevoked(ActivityRevokedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.ActivityRevoked(request.ActivityId);
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
public override async Task<CommonResponse> SagaEnded(SagaEndedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.SagaEnded(request.SagaId,Convert.ToDateTime(request.ExpiresAt));
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
public override async Task<CommonResponse> SagaStarted(SagaStartedRequest request, ServerCallContext context)
{
CommonResponse commonResponse = new CommonResponse();
try
{
await sagaStorage.SagaStarted(request.SagaId,request.ServiceName,Convert.ToDateTime( request.AddTime));
commonResponse.IsSuccess = true;
}
catch (Exception ex)
{
commonResponse.Errors = CombineError(ex);
}
return commonResponse;
}
private string CombineError(Exception exception)
{
return exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<TargetFramework>netstandard2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Sagas\Pole.Sagas.csproj" />
</ItemGroup>
</Project>
......@@ -5,13 +5,13 @@ namespace Pole.Sagas.Core.Abstraction
{
public interface IEventSender
{
Task SagaStarted(string sagaId, string serviceName);
Task SagaStarted(string sagaId, string serviceName, DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, string parameterContent, int order);
Task ActivityRetried(string activityId, string status, int retries, string resultContent);
Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order, DateTime addTime);
Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType);
Task ActivityExecuteAborted(string activityId, string errors);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityEnded(string activityId, string resultContent);
Task ActivityEnded(string activityId, byte[] resultData);
Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId, string sagaId, string errors);
Task ActivityRevoked(string activityId);
......
......@@ -2,10 +2,9 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Server.Events
namespace Pole.Sagas.Core.Abstraction
{
class SagaStartedEvent:IEvent
public interface ISagaInvoker
{
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
{
public class ActivityEntity
{
public string Id { get; set; }
public string SagaId { get; set; }
public int Order { get; set; }
public string Status { get; set; }
public int TimeOutSeconds { get; set; }
public Byte[] ParameterData { get; set; }
public Byte[] ResultData { get; set; }
public string Errors { get; set; }
public int ExecuteRetries { get; set; }
public int CompensateRetries { get; set; }
public DateTime AddTime { get; set; }
}
}
......@@ -2,10 +2,11 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Server
namespace Pole.Sagas.Core
{
public interface IEvent
public enum ActivityRetryType
{
Execute,
Compensate
}
}
......@@ -15,7 +15,7 @@ namespace Pole.Sagas.Core
public Type ActivityDataType { get; set; }
public object DataObj { get; set; }
public int Order { get; set; }
public ActivityStatus ActivityState { get; set; }
public ActivityStatus ActivityStatus { get; set; }
public IServiceProvider ServiceProvider { get; set; }
public int TimeOutSeconds { get; set; }
public CancellationTokenSource CancellationTokenSource { get; set; }
......
......@@ -18,22 +18,22 @@ namespace Pole.Sagas.Core
return Task.CompletedTask;
}
public Task ActivityEnded(string activityId, string resultContent)
public Task ActivityEnded(string activityId, byte[] resultData)
{
return Task.CompletedTask;
}
public Task ActivityExecuteAborted(string activityId, string errors)
public Task ActivityExecuteAborted(string activityId, string errors)
{
return Task.CompletedTask;
}
public Task ActivityRetried(string activityId, string status, int retries, string resultContent)
public Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType)
{
return Task.CompletedTask;
}
public Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, string parameterContent, int order)
public Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, byte[] ParameterData, int order, DateTime addTime)
{
return Task.CompletedTask;
}
......@@ -43,7 +43,7 @@ namespace Pole.Sagas.Core
return Task.CompletedTask;
}
public Task SagaStarted(string sagaId, string serviceName)
public Task SagaStarted(string sagaId, string serviceName, DateTime addTime)
{
return Task.CompletedTask;
}
......
using Pole.Sagas.Core;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Core
{
public interface ISagaStorage
{
Task SagaStarted(string sagaId, string serviceName,DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order,DateTime addTime);
Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType);
Task ActivityExecuteAborted(string activityId, string errors);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityEnded(string activityId, byte[] resultData);
Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId, string sagaId, string errors);
Task ActivityRevoked(string activityId);
}
}
......@@ -43,7 +43,7 @@ namespace Pole.Sagas.Core
this.activityFinder = activityFinder;
Id = snowflakeIdGenerator.NextId();
}
internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder,int currentExecuteOrder,int currentCompensateOrder)
internal Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder,int currentExecuteOrder,int currentCompensateOrder, List<ActivityWapper> activities)
{
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.serviceProvider = serviceProvider;
......@@ -54,6 +54,7 @@ namespace Pole.Sagas.Core
Id = snowflakeIdGenerator.NextId();
this.currentExecuteOrder = currentExecuteOrder;
this.currentCompensateOrder = currentCompensateOrder;
this.activities = activities;
}
public void AddActivity(string activityName, object data, int timeOutSeconds = 2)
......@@ -69,7 +70,7 @@ namespace Pole.Sagas.Core
ActivityWapper activityWapper = new ActivityWapper
{
ActivityDataType = dataType,
ActivityState = ActivityStatus.NotStarted,
ActivityStatus = ActivityStatus.NotStarted,
ActivityType = targetActivityType,
DataObj = data,
Order = CurrentMaxOrder,
......@@ -81,7 +82,7 @@ namespace Pole.Sagas.Core
public async Task<SagaResult> GetResult()
{
await eventSender.SagaStarted(Id, poleSagasOption.ServiceName);
await eventSender.SagaStarted(Id, poleSagasOption.ServiceName,DateTime.UtcNow);
var executeActivity = GetNextExecuteActivity();
if (executeActivity == null)
......@@ -139,8 +140,8 @@ namespace Pole.Sagas.Core
activityWapper.CancellationTokenSource = new System.Threading.CancellationTokenSource(2 * 1000);
try
{
var jsonContent = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOutSeconds, jsonContent, activityWapper.Order);
var bytesContent = serializer.SerializeToUtf8Bytes(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOutSeconds, bytesContent, activityWapper.Order,DateTime.UtcNow);
var result = await activityWapper.InvokeExecute();
if (!result.IsSuccess)
{
......@@ -148,7 +149,7 @@ namespace Pole.Sagas.Core
await CompensateActivity(result,currentExecuteOrder);
return result;
}
await eventSender.ActivityEnded(activityId, string.Empty);
await eventSender.ActivityEnded(activityId, Encoding.UTF8.GetBytes(string.Empty));
var executeActivity = GetNextExecuteActivity();
if (executeActivity == null)
{
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
{
public class SagaEntity
{
public int Id { get; set; }
public string ServiceName { get; set; }
public List<ActivityEntity> ActivityEntities { get; set; }
public string Status { get; set; }
public DateTime? ExpiresAt { get; set; }
public DateTime AddTime { get; set; }
}
}
......@@ -5,11 +5,18 @@
</PropertyGroup>
<ItemGroup>
<Folder Include="Server\Storage\" />
<PackageReference Include="Grpc.Net.ClientFactory" Version="2.27.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
</ItemGroup>
<ItemGroup>
<None Update="Protos\saga.proto">
<GrpcServices>Server</GrpcServices>
<AdditionalImportDirs>Protos</AdditionalImportDirs>
</None>
</ItemGroup>
</Project>
syntax = "proto3";
option csharp_namespace = "Pole.Sagas.Server.Grpc";
package pole.Sagas.Server.Grpc;
service Saga {
rpc SagaStarted (SagaStartedRequest) returns (CommonResponse);
rpc SagaEnded (SagaStartedRequest) returns (CommonResponse);
rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse);
rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse);
rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse);
rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse);
rpc ActivityEnded (ActivityEndedRequest) returns (CommonResponse);
rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse);
rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse);
rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse);
}
message CommonResponse{
bool isSuccess = 1;
string message = 2;
string errors = 3;
}
message SagaStartedRequest {
string sagaId = 1;
string serviceName = 2;
}
message SagaEndedRequest {
string sagaId = 1;
string ExpiresAt = 2;
}
message ActivityExecuteStartedRequest {
string activityId = 1;
string sagaId = 2;
int32 timeOutSeconds = 3;
bytes parameterData = 4;
int32 order = 5;
string addTime = 6;
}
message ActivityRetriedRequest {
string activityId = 1;
string status = 2;
int32 retries = 3;
ActivityRetryType activityRetryType = 4;
enum ActivityRetryType{
Execute = 0;
Compensate = 1;
}
}
message ActivityExecuteAbortedRequest {
string activityId = 1;
string errors = 2;
}
message ActivityCompensateAbortedRequest {
string activityId = 1;
string sagaId = 2;
string errors = 3;
}
message ActivityEndedRequest {
string activityId = 1;
string sagaId = 2;
bytes resultData = 3;
}
message ActivityCompensatedRequest {
string activityId = 1;
}
message ActivityExecuteOvertimeRequest {
string activityId = 1;
string sagaId = 2;
string errors = 3;
}
message ActivityRevokedRequest {
string activityId = 1;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment