Commit d424c8c1 by dingsongjie

客户端 发送部分完成

parent 32de2b90
...@@ -12,12 +12,4 @@ ...@@ -12,12 +12,4 @@
<ProjectReference Include="..\Pole.Sagas\Pole.Sagas.csproj" /> <ProjectReference Include="..\Pole.Sagas\Pole.Sagas.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Protobuf Include="Protos\*" AdditionalImportDirs="Protos" GrpcServices="Server" />
</ItemGroup>
<ItemGroup>
<None Remove="Protos\saga.proto" />
</ItemGroup>
</Project> </Project>
...@@ -10,6 +10,7 @@ namespace Pole.Sagas.Server ...@@ -10,6 +10,7 @@ namespace Pole.Sagas.Server
public static IServiceCollection AddPoleSagasServer(IServiceCollection services) public static IServiceCollection AddPoleSagasServer(IServiceCollection services)
{ {
services.AddGrpc(); services.AddGrpc();
return services; return services;
} }
} }
......
syntax = "proto3";
option csharp_namespace = "Pole.Sagas.Server.Grpc";
package pole.Sagas.Server.Grpc;
service Saga {
rpc SagaStarted (SagaStartedRequest) returns (CommonResponse);
rpc SagaEnded (SagaEndedRequest) returns (CommonResponse);
rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse);
rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse);
rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse);
rpc ActivityCompensateAborted (ActivityCompensateAbortedRequest) returns (CommonResponse);
rpc ActivityEnded (ActivityEndedRequest) returns (CommonResponse);
rpc ActivityCompensated (ActivityCompensatedRequest) returns (CommonResponse);
rpc ActivityExecuteOvertime (ActivityExecuteOvertimeRequest) returns (CommonResponse);
rpc ActivityRevoked (ActivityRevokedRequest) returns (CommonResponse);
}
message CommonResponse{
bool isSuccess = 1;
string message = 2;
string errors = 3;
}
message SagaStartedRequest {
string sagaId = 1;
string serviceName = 2;
string addTime = 3;
}
message SagaEndedRequest {
string sagaId = 1;
string ExpiresAt = 2;
}
message ActivityExecuteStartedRequest {
string activityId = 1;
string sagaId = 2;
int32 timeOutSeconds = 3;
bytes parameterData = 4;
int32 order = 5;
string addTime = 6;
}
message ActivityRetriedRequest {
string activityId = 1;
string status = 2;
int32 retries = 3;
ActivityRetryType activityRetryType = 4;
enum ActivityRetryType{
Execute = 0;
Compensate = 1;
}
}
message ActivityExecuteAbortedRequest {
string activityId = 1;
string errors = 2;
}
message ActivityCompensateAbortedRequest {
string activityId = 1;
string sagaId = 2;
string errors = 3;
}
message ActivityEndedRequest {
string activityId = 1;
string sagaId = 2;
bytes resultData = 3;
}
message ActivityCompensatedRequest {
string activityId = 1;
}
message ActivityExecuteOvertimeRequest {
string activityId = 1;
string sagaId = 2;
string errors = 3;
}
message ActivityRevokedRequest {
string activityId = 1;
}
...@@ -7,11 +7,11 @@ namespace Pole.Sagas.Core.Abstraction ...@@ -7,11 +7,11 @@ namespace Pole.Sagas.Core.Abstraction
{ {
Task SagaStarted(string sagaId, string serviceName, DateTime addTime); Task SagaStarted(string sagaId, string serviceName, DateTime addTime);
Task SagaEnded(string sagaId, DateTime ExpiresAt); Task SagaEnded(string sagaId, DateTime ExpiresAt);
Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] ParameterData, int order, DateTime addTime); Task ActivityExecuteStarted(string activityId, string sagaId, int timeOutSeconds, byte[] parameterData, int order, DateTime addTime);
Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType); Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType);
Task ActivityExecuteAborted(string activityId, string errors); Task ActivityExecuteAborted(string activityId, string errors);
Task ActivityCompensateAborted(string activityId, string sagaId, string errors); Task ActivityCompensateAborted(string activityId, string sagaId, string errors);
Task ActivityEnded(string activityId, byte[] resultData); Task ActivityEnded(string activityId,byte[] resultData);
Task ActivityCompensated(string activityId); Task ActivityCompensated(string activityId);
Task ActivityExecuteOvertime(string activityId, string sagaId, string errors); Task ActivityExecuteOvertime(string activityId, string sagaId, string errors);
Task ActivityRevoked(string activityId); Task ActivityRevoked(string activityId);
......
using Pole.Sagas.Core.Abstraction; using Grpc.Net.Client;
using Microsoft.Extensions.Options;
using Pole.Sagas.Core.Abstraction;
using Pole.Sagas.Core.Exceptions;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using static Pole.Sagas.Server.Grpc.Saga;
namespace Pole.Sagas.Core namespace Pole.Sagas.Core
{ {
class EventSender : IEventSender class EventSender : IEventSender
{ {
public Task ActivityCompensateAborted(string activityId, string sagaId, string errors) private readonly SagaClient sagaClient;
public EventSender(SagaClient sagaClient)
{ {
return Task.CompletedTask; this.sagaClient = sagaClient;
}
public async Task ActivityCompensateAborted(string activityId, string sagaId, string errors)
{
var result = await sagaClient.ActivityCompensateAbortedAsync(new Server.Grpc.ActivityCompensateAbortedRequest
{
ActivityId = activityId,
Errors = errors,
SagaId = sagaId
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
} }
public Task ActivityCompensated(string activityId) public async Task ActivityCompensated(string activityId)
{ {
return Task.CompletedTask; var result = await sagaClient.ActivityCompensatedAsync(new Server.Grpc.ActivityCompensatedRequest
{
ActivityId = activityId,
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
} }
public Task ActivityEnded(string activityId, byte[] resultData) public async Task ActivityEnded(string activityId, byte[] resultData)
{ {
return Task.CompletedTask; var result = await sagaClient.ActivityEndedAsync(new Server.Grpc.ActivityEndedRequest
{
ActivityId = activityId,
ResultData = Google.Protobuf.ByteString.CopyFrom(resultData),
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
} }
public Task ActivityExecuteAborted(string activityId, string errors) public async Task ActivityExecuteAborted(string activityId, string errors)
{ {
return Task.CompletedTask; var result = await sagaClient.ActivityExecuteAbortedAsync(new Server.Grpc.ActivityExecuteAbortedRequest
{
ActivityId = activityId,
Errors = errors
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
} }
public Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType) public async Task ActivityRetried(string activityId, string status, int retries, ActivityRetryType retryType)
{ {
return Task.CompletedTask; Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType activityRetryType = retryType == ActivityRetryType.Compensate ? Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Compensate : Pole.Sagas.Server.Grpc.ActivityRetriedRequest.Types.ActivityRetryType.Execute;
var result = await sagaClient.ActivityRetriedAsync(new Server.Grpc.ActivityRetriedRequest
{
ActivityId = activityId,
ActivityRetryType = activityRetryType
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
} }
public Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, byte[] ParameterData, int order, DateTime addTime) public async Task ActivityExecuteStarted(string activityId, string sagaId, int timeoutSeconds, byte[] parameterData, int order, DateTime addTime)
{ {
return Task.CompletedTask; var result = await sagaClient.ActivityExecuteStartedAsync(new Server.Grpc.ActivityExecuteStartedRequest
{
ActivityId = activityId,
AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"),
Order = order,
ParameterData = Google.Protobuf.ByteString.CopyFrom(parameterData),
SagaId = sagaId,
TimeOutSeconds = timeoutSeconds
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
} }
public Task SagaEnded(string sagaId, DateTime ExpiresAt) public async Task SagaEnded(string sagaId, DateTime ExpiresAt)
{ {
return Task.CompletedTask; var result = await sagaClient.SagaEndedAsync(new Server.Grpc.SagaEndedRequest
{
SagaId = sagaId,
ExpiresAt = ExpiresAt.ToString("yyyy-MM-dd HH:mm:ss.fff"),
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
} }
public Task SagaStarted(string sagaId, string serviceName, DateTime addTime) public async Task SagaStarted(string sagaId, string serviceName, DateTime addTime)
{ {
return Task.CompletedTask; var result = await sagaClient.SagaStartedAsync(new Server.Grpc.SagaStartedRequest
{
SagaId = sagaId,
ServiceName = serviceName,
AddTime = addTime.ToString("yyyy-MM-dd HH:mm:ss.fff"),
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
} }
public Task ActivityExecuteOvertime(string activityId, string sagaId, string errors) public async Task ActivityExecuteOvertime(string activityId, string sagaId, string errors)
{ {
return Task.CompletedTask; var result = await sagaClient.ActivityExecuteOvertimeAsync(new Server.Grpc.ActivityExecuteOvertimeRequest
{
SagaId = sagaId,
ActivityId = activityId,
Errors = errors
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
} }
public Task ActivityRevoked(string activityId) public async Task ActivityRevoked(string activityId)
{ {
throw new NotImplementedException(); var result = await sagaClient.ActivityRevokedAsync(new Server.Grpc.ActivityRevokedRequest
{
ActivityId = activityId,
});
if (!result.IsSuccess)
{
throw new SagasServerException(result.Errors);
}
} }
} }
} }
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core.Exceptions
{
public class SagasServerException : Exception
{
public SagasServerException(string errors) : base(errors)
{
}
}
}
...@@ -8,5 +8,6 @@ namespace Pole.Sagas.Core ...@@ -8,5 +8,6 @@ namespace Pole.Sagas.Core
{ {
public string ServiceName { get; set; } public string ServiceName { get; set; }
public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10; public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10;
public string SagasServerHost { get; set; }
} }
} }
...@@ -5,13 +5,20 @@ ...@@ -5,13 +5,20 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.11.4" />
<PackageReference Include="Grpc.Net.ClientFactory" Version="2.27.0" /> <PackageReference Include="Grpc.Net.ClientFactory" Version="2.27.0" />
<PackageReference Include="Grpc.Tools" Version="2.27.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" /> <ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
</ItemGroup> </ItemGroup>
<ItemGroup>
<Protobuf Include="Protos\*" AdditionalImportDirs="Protos" GrpcServices="All" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<None Update="Protos\saga.proto"> <None Update="Protos\saga.proto">
<GrpcServices>Server</GrpcServices> <GrpcServices>Server</GrpcServices>
......
...@@ -3,22 +3,32 @@ using System.Collections.Generic; ...@@ -3,22 +3,32 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Pole.Core; using Pole.Core;
using Pole.Core.Utils; using Pole.Core.Utils;
using Pole.Sagas.Core; using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction; using Pole.Sagas.Core.Abstraction;
using Pole.Sagas.Core.Exceptions; using Pole.Sagas.Core.Exceptions;
using static Pole.Sagas.Server.Grpc.Saga;
namespace Microsoft.Extensions.DependencyInjection namespace Microsoft.Extensions.DependencyInjection
{ {
public static class PoleSagaServiceCollectionExtensions public static class PoleSagaServiceCollectionExtensions
{ {
public static void AddSagas(this StartupConfig startupOption, Action<PoleSagasOption> rabbitConfigAction) public static void AddSagas(this StartupConfig startupOption, Action<PoleSagasOption> configAction)
{ {
startupOption.Services.Configure(rabbitConfigAction); startupOption.Services.Configure(configAction);
startupOption.Services.AddSingleton<IActivityFinder, ActivityFinder>(); startupOption.Services.AddSingleton<IActivityFinder, ActivityFinder>();
startupOption.Services.AddSingleton<IEventSender, EventSender>(); startupOption.Services.AddSingleton<IEventSender, EventSender>();
startupOption.Services.AddSingleton<ISagaFactory, SagaFactory>(); startupOption.Services.AddSingleton<ISagaFactory, SagaFactory>();
using(var provider = startupOption.Services.BuildServiceProvider())
{
var sagasOption = provider.GetRequiredService<IOptions<PoleSagasOption>>().Value;
startupOption.Services.AddGrpcClient<SagaClient>(o =>
{
o.Address = new Uri(sagasOption.SagasServerHost);
});
}
var baseActivityType = typeof(IActivity<>); var baseActivityType = typeof(IActivity<>);
foreach (var assembly in AssemblyHelper.GetAssemblies()) foreach (var assembly in AssemblyHelper.GetAssemblies())
{ {
......
...@@ -6,7 +6,7 @@ package pole.Sagas.Server.Grpc; ...@@ -6,7 +6,7 @@ package pole.Sagas.Server.Grpc;
service Saga { service Saga {
rpc SagaStarted (SagaStartedRequest) returns (CommonResponse); rpc SagaStarted (SagaStartedRequest) returns (CommonResponse);
rpc SagaEnded (SagaStartedRequest) returns (CommonResponse); rpc SagaEnded (SagaEndedRequest) returns (CommonResponse);
rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse); rpc ActivityExecuteStarted (ActivityExecuteStartedRequest) returns (CommonResponse);
rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse); rpc ActivityRetried (ActivityRetriedRequest) returns (CommonResponse);
rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse); rpc ActivityExecuteAborted (ActivityExecuteAbortedRequest) returns (CommonResponse);
...@@ -25,6 +25,7 @@ message CommonResponse{ ...@@ -25,6 +25,7 @@ message CommonResponse{
message SagaStartedRequest { message SagaStartedRequest {
string sagaId = 1; string sagaId = 1;
string serviceName = 2; string serviceName = 2;
string addTime = 3;
} }
message SagaEndedRequest { message SagaEndedRequest {
string sagaId = 1; string sagaId = 1;
...@@ -59,8 +60,7 @@ message ActivityCompensateAbortedRequest { ...@@ -59,8 +60,7 @@ message ActivityCompensateAbortedRequest {
} }
message ActivityEndedRequest { message ActivityEndedRequest {
string activityId = 1; string activityId = 1;
string sagaId = 2; bytes resultData = 2;
bytes resultData = 3;
} }
message ActivityCompensatedRequest { message ActivityCompensatedRequest {
string activityId = 1; string activityId = 1;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment