Commit 63f4a2bd by dingsongjie

添加 sagas 核心测试项目

parent 2e3b5fe6
Showing with 712 additions and 28 deletions
......@@ -43,6 +43,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas", "src\Pole.Saga
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Storage.PostgreSql", "src\Pole.Sagas.Storage.PostgreSql\Pole.Sagas.Storage.PostgreSql.csproj", "{9505BDFC-395B-4257-AEB3-2B44611147A4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SagasTest.Api", "samples\apis\SagasTest.Api\SagasTest.Api.csproj", "{6138197E-6202-4E1B-9458-3CBEE60A36F9}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
......@@ -105,6 +107,10 @@ Global
{9505BDFC-395B-4257-AEB3-2B44611147A4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9505BDFC-395B-4257-AEB3-2B44611147A4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9505BDFC-395B-4257-AEB3-2B44611147A4}.Release|Any CPU.Build.0 = Release|Any CPU
{6138197E-6202-4E1B-9458-3CBEE60A36F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6138197E-6202-4E1B-9458-3CBEE60A36F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6138197E-6202-4E1B-9458-3CBEE60A36F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6138197E-6202-4E1B-9458-3CBEE60A36F9}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -127,6 +133,7 @@ Global
{FB3D2F52-123A-4606-B682-9159BD7913AE} = {655E719B-4A3E-467C-A541-E0770AB81DE1}
{1F06D877-E4EC-4908-9057-38EDCE5E54E6} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{9505BDFC-395B-4257-AEB3-2B44611147A4} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{6138197E-6202-4E1B-9458-3CBEE60A36F9} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E}
......
......@@ -39,10 +39,10 @@ namespace Backet.Api
webBuilder.UseStartup<Startup>();
webBuilder.UseKestrel(option =>
{
option.ListenAnyIP(81, config =>
{
config.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1;
});
//option.ListenAnyIP(81, config =>
//{
// config.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http1;
//});
option.ListenAnyIP(82, config =>
{
config.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2;
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
namespace SagasTest.Api.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class ValuesController : ControllerBase
{
// GET api/values
[HttpGet]
public ActionResult<IEnumerable<string>> Get()
{
return new string[] { "value1", "value2" };
}
// GET api/values/5
[HttpGet("{id}")]
public ActionResult<string> Get(int id)
{
return "value";
}
// POST api/values
[HttpPost]
public void Post([FromBody] string value)
{
}
// PUT api/values/5
[HttpPut("{id}")]
public void Put(int id, [FromBody] string value)
{
}
// DELETE api/values/5
[HttpDelete("{id}")]
public void Delete(int id)
{
}
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace SagasTest.Api
{
public class Program
{
public static void Main(string[] args)
{
CreateWebHostBuilder(args).Build().Run();
}
public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>();
}
}
{
"$schema": "http://json.schemastore.org/launchsettings.json",
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:63500",
"sslPort": 44362
}
},
"profiles": {
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"launchUrl": "api/values",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"SagasTest.Api": {
"commandName": "Project",
"launchBrowser": true,
"launchUrl": "api/values",
"applicationUrl": "https://localhost:5001;http://localhost:5000",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<AspNetCoreHostingModel>InProcess</AspNetCoreHostingModel>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Microsoft.AspNetCore.Razor.Design" Version="2.2.0" PrivateAssets="All" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Pole.Core\Pole.Core.csproj" />
<ProjectReference Include="..\..\..\src\Pole.Sagas\Pole.Sagas.csproj" />
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
namespace SagasTest.Api
{
public class Startup
{
private IConfiguration Configuration { get; }
private IWebHostEnvironment Environment { get; }
public Startup(IConfiguration configuration, IWebHostEnvironment env)
{
Configuration = configuration;
Environment = env;
}
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddControllers();
services.AddPole(config =>
{
config.AddSagas();
});
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app)
{
if (Environment.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UsePole();
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapDefaultControllerRoute();
});
}
}
}
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"System": "Information",
"Microsoft": "Information"
}
}
}
{
"Logging": {
"LogLevel": {
"Default": "Warning"
}
},
"AllowedHosts": "*"
}
......@@ -68,7 +68,8 @@ namespace Pole.Core.Processor
var bytes = bytesTransport.GetBytes();
if (pendingMessage.Retries > producerOptions.MaxFailedRetryCount)
{
pendingMessage.ExpiresAt = DateTime.UtcNow;
pendingMessage.StatusName = nameof(EventStatus.Failed);
continue;
}
pendingMessage.Retries++;
var targetName = producerContainer.GetTargetName(pendingMessage.Name);
......
......@@ -143,7 +143,7 @@ namespace Pole.EventBus.RabbitMQ
{
retryTimes++;
ea.BasicProperties.Headers[Consts.ConsumerRetryTimesStr] = retryTimes.ToString();
ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = exception.InnerException?.Message + exception.StackTrace ?? exception.Message + exception.StackTrace;
ea.BasicProperties.Headers[Consts.ConsumerExceptionDetailsStr] = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
await Task.Delay((int)Math.Pow(2, retryTimes) * 1000).ContinueWith((task) =>
{
using var channel = Client.PullChannel();
......
......@@ -79,7 +79,7 @@ $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusN
{
var fourMinAgo = DateTime.UtcNow.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.MaxFailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{EventStatus.Failed}' OR \"StatusName\"='{EventStatus.Pending}') for update skip locked LIMIT 200;";
$"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.MaxFailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND \"StatusName\"='{EventStatus.Pending}' for update skip locked LIMIT 200;";
var result = new List<EventEntity>();
using var connection = new NpgsqlConnection(options.ConnectionString);
......
......@@ -3,11 +3,11 @@ using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Core
namespace Pole.Sagas.Core.Abstraction
{
public interface IActivity<TData>
{
Task<ActivityExecuteResult> Execute(TData data);
Task<bool> Compensate(TData data);
Task Compensate(TData data);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core.Abstraction
{
public interface IActivityFinder
{
Type FindType(string name);
string GetName(Type type);
}
}
using System;
using System.Threading.Tasks;
namespace Pole.Sagas.Core.Abstraction
{
public interface IEventSender
{
Task SagaStarted(string sagaId, string serviceName);
Task SagaEnded(string sagaId,DateTime ExpiresAt);
Task ActivityStarted(string activityId, string sagaId, DateTime activityTimeoutTime, string parameterContent);
Task ActivityRetried(string activityId, string status, int retries,string resultContent);
Task ActivityExecuteAborted(string activityId,string resultContent,string errors);
Task ActivityCompensateAborted(string activityId,string sagaId,string errors);
Task ActivityEnded(string activityId, string resultContent);
Task ActivityCompensated(string activityId);
}
}
......@@ -2,10 +2,10 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
namespace Pole.Sagas.Core.Abstraction
{
public interface ISagaFactory
{
TSaga CreateSaga<TSaga>(TimeSpan timeOut) where TSaga : ISaga;
ISaga CreateSaga();
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
{
public class ActivityCompensateResult
{
/// <summary>
/// If not success , this activity will be aborted , and current saga will compensate all previous activities
/// </summary>
public bool IsSuccess { get; set; }
public string Message { get; set; }
}
}
......@@ -7,6 +7,18 @@ namespace Pole.Sagas.Core
public class ActivityExecuteResult
{
public bool IsSuccess { get; set; }
public string Message { get; set; }
public object Result { get; set; }
public string Errors { get; set; }
public static implicit operator SagaResult(ActivityExecuteResult activity)
{
return new SagaResult
{
IsSuccess = activity.IsSuccess,
Result = default(object),
HasException = !string.IsNullOrEmpty(activity.Errors),
ExceptionMessages = activity.Errors
};
}
}
}
using Microsoft.Extensions.Logging;
using Pole.Core.Utils;
using Pole.Sagas.Core.Abstraction;
using Pole.Sagas.Core.Exceptions;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Pole.Sagas.Core
{
class ActivityFinder : IActivityFinder
{
private readonly ConcurrentDictionary<string, Type> nameDict = new ConcurrentDictionary<string, Type>();
private readonly ConcurrentDictionary<Type, string> typeDict = new ConcurrentDictionary<Type, string>();
readonly ILogger<ActivityFinder> logger;
public ActivityFinder(ILogger<ActivityFinder> logger)
{
this.logger = logger;
var baseActivityType = typeof(IActivity<>);
foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger))
{
foreach (var type in assembly.GetTypes().Where(m => m.IsGenericType && m.GetGenericTypeDefinition() == baseActivityType && !m.IsAbstract))
{
if (!type.FullName.EndsWith("Activity"))
{
throw new ActivityNameIrregularException(type);
}
var activityName = type.Name.Substring(0, type.Name.Length - "Activity".Length);
typeDict.TryAdd(type, activityName);
if (!nameDict.TryAdd(activityName, type))
{
throw new ActivityNameRepeatedException(activityName);
}
}
}
}
public Type FindType(string name)
{
if (nameDict.TryGetValue(name, out Type type))
{
return type;
}
throw new ActivityNotFoundByNameException(name);
}
public string GetName(Type type)
{
if (!typeDict.TryGetValue(type, out var value))
throw new ActivityNotFoundByTypeException(type);
return value;
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
{
public enum ActivityStatus
{
NotStarted,
Executed,
Compensated,
ExecuteAborted,
CompensateAborted,
}
}
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Core
{
public class ActivityWapper
{
public string Id { get; set; }
public Type ActivityType { get; set; }
public Type ActivityDataType { get; set; }
public object DataObj { get; set; }
public int Order { get; set; }
public ActivityStatus ActivityState { get; set; }
public IServiceProvider ServiceProvider { get; set; }
public DateTime TimeOut { get; set; }
public Task<ActivityExecuteResult> InvokeExecute()
{
var activityObjParams = Expression.Parameter(typeof(object), "activity");
var activityParams = Expression.Convert(activityObjParams, ActivityType);
var dataObjParams = Expression.Parameter(typeof(object), "data");
var dataParams = Expression.Convert(dataObjParams, ActivityDataType);
var method = ActivityType.GetMethod("Execute", new Type[] { ActivityDataType });
var body = Expression.Call(activityObjParams, method, activityObjParams, dataParams);
var func = Expression.Lambda<Func<object, object, Task<ActivityExecuteResult>>>(body, activityObjParams, dataObjParams).Compile();
using (var scope = ServiceProvider.CreateScope())
{
var activity = scope.ServiceProvider.GetRequiredService(ActivityType);
return func(activity, DataObj);
}
}
public Task InvokeCompensate()
{
var activityObjParams = Expression.Parameter(typeof(object), "activity");
var activityParams = Expression.Convert(activityObjParams, ActivityType);
var dataObjParams = Expression.Parameter(typeof(object), "data");
var dataParams = Expression.Convert(dataObjParams, ActivityDataType);
var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType });
var body = Expression.Call(activityObjParams, method, activityObjParams, dataParams);
var func = Expression.Lambda<Func<object, object, Task>>(body, activityObjParams, dataObjParams).Compile();
using (var scope = ServiceProvider.CreateScope())
{
var activity = scope.ServiceProvider.GetRequiredService(ActivityType);
return func(activity, DataObj);
}
}
}
}
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Core
{
class EventSender : IEventSender
{
public Task ActivityCompensateAborted(string activityId, string sagaId, string errors)
{
return Task.CompletedTask;
}
public Task ActivityCompensated(string activityId)
{
return Task.CompletedTask;
}
public Task ActivityEnded(string activityId, string resultContent)
{
return Task.CompletedTask;
}
public Task ActivityExecuteAborted(string activityId, string resultContent, string errors)
{
return Task.CompletedTask;
}
public Task ActivityRetried(string activityId, string status, int retries, string resultContent)
{
return Task.CompletedTask;
}
public Task ActivityStarted(string activityId, string sagaId, DateTime activityTimeoutTime, string parameterContent)
{
return Task.CompletedTask;
}
public Task SagaEnded(string sagaId, DateTime ExpiresAt)
{
return Task.CompletedTask;
}
public Task SagaStarted(string sagaId, string serviceName)
{
return Task.CompletedTask;
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core.Exceptions
{
public class ActivityNameIrregularException:Exception
{
public ActivityNameIrregularException(Type activityType):base($"Activity : {activityType.FullName} irregular naming")
{
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core.Exceptions
{
public class ActivityNameRepeatedException : Exception
{
public ActivityNameRepeatedException(string name):base($"Activity :{name} already exists")
{
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core.Exceptions
{
public class ActivityNotFoundByNameException:Exception
{
public ActivityNotFoundByNameException(string activityName) : base($"Activity not found by name: {activityName} ")
{
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core.Exceptions
{
public class ActivityNotFoundByTypeException : Exception
{
public ActivityNotFoundByTypeException(Type activityType) : base($"Activity not found by type: {activityType.FullName}")
{
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Core
{
public interface ISaga
{
void AddActivity<TData>(IActivity<TData> activity);
string Id { get; }
void AddActivity<TData>(string activityName, TData data);
Task<SagaResult> GetResult();
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
{
public class PoleSagasOption
{
public string ServiceName { get; set; }
public int CompeletedSagaExpiredAfterSeconds { get; set; } = 60 * 10;
}
}
using System;
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
using Pole.Sagas.Core.Exceptions;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Sagas.Core
{
class Saga : ISaga
public class Saga : ISaga
{
private System.Collections.Concurrent.ConcurrentQueue<>
public void AddActivity<TData>(IActivity<TData> activity)
private List<ActivityWapper> activities = new List<ActivityWapper>();
private IServiceProvider serviceProvider;
private IEventSender eventSender;
private ISnowflakeIdGenerator snowflakeIdGenerator;
private PoleSagasOption poleSagasOption;
private int _currentMaxOrder = 0;
private int _currentExecuteOrder = 0;
private int _currentCompensateOrder = 0;
private ISerializer serializer;
public string Id { get; }
public Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer)
{
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.serviceProvider = serviceProvider;
this.eventSender = eventSender;
this.poleSagasOption = poleSagasOption;
this.serializer = serializer;
Id = snowflakeIdGenerator.NextId();
}
public void AddActivity<TData>(string activityName, TData data)
{
_currentMaxOrder++;
ActivityWapper activityWapper = new ActivityWapper
{
ActivityDataType = typeof(TData),
ActivityState = ActivityStatus.NotStarted,
ActivityType = data.GetType(),
DataObj = data,
Order = _currentMaxOrder,
ServiceProvider = serviceProvider
};
activities.Add(activityWapper);
}
public async Task<SagaResult> GetResult()
{
await eventSender.SagaStarted(Id, poleSagasOption.ServiceName);
var executeActivity = GetNextExecuteActivity();
if (executeActivity == null)
{
var expiresAt = DateTime.UtcNow.AddSeconds(poleSagasOption.CompeletedSagaExpiredAfterSeconds);
await eventSender.SagaEnded(Id, expiresAt);
return SagaResult.SuccessResult;
}
var result = await RecursiveExecuteActivity(executeActivity);
return result;
}
private ActivityWapper GetNextExecuteActivity()
{
if (_currentExecuteOrder == _currentMaxOrder)
{
return null;
}
_currentExecuteOrder++;
return activities[_currentExecuteOrder];
}
private ActivityWapper GetNextCompensateActivity()
{
_currentCompensateOrder--;
if (_currentExecuteOrder == 0)
{
return null;
}
return activities[_currentCompensateOrder];
}
private async Task RecursiveCompensateActivity(ActivityWapper activityWapper)
{
var activityId = activityWapper.Id;
try
{
//var jsonContent = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType);
//await eventSender.ActivityStarted(activityId, Id, activityWapper.TimeOut, jsonContent);
await activityWapper.InvokeCompensate();
await eventSender.ActivityCompensated(activityId);
var compensateActivity = GetNextCompensateActivity();
if (compensateActivity == null)
{
return;
}
await RecursiveCompensateActivity(compensateActivity);
}
catch (Exception exception)
{
await eventSender.ActivityCompensateAborted(activityId, Id, exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace);
}
}
private async Task<ActivityExecuteResult> RecursiveExecuteActivity(ActivityWapper activityWapper)
{
throw new NotImplementedException();
var activityId = snowflakeIdGenerator.NextId();
activityWapper.Id = activityId;
try
{
var jsonContent = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityStarted(activityId, Id, activityWapper.TimeOut, jsonContent);
var result = await activityWapper.InvokeExecute();
if (!result.IsSuccess)
{
await eventSender.ActivityExecuteAborted(activityId, serializer.Serialize(result.Result), string.Empty);
_currentCompensateOrder = _currentExecuteOrder;
var compensateActivity = GetNextCompensateActivity();
await RecursiveCompensateActivity(compensateActivity);
return result;
}
await eventSender.ActivityEnded(activityId, string.Empty);
var executeActivity = GetNextExecuteActivity();
if (executeActivity == null)
{
return result;
}
else
{
return await RecursiveExecuteActivity(executeActivity);
}
}
catch (Exception exception)
{
var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
await eventSender.ActivityExecuteAborted(activityId, string.Empty, errors);
return new ActivityExecuteResult
{
IsSuccess = false,
Errors = errors
};
}
}
}
}
using Pole.Core.Utils.Abstraction;
using Microsoft.Extensions.Options;
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
......@@ -7,18 +10,23 @@ namespace Pole.Sagas.Core
{
class SagaFactory : ISagaFactory
{
private readonly ISnowflakeIdGenerator _snowflakeIdGenerator;
public SagaFactory(ISnowflakeIdGenerator snowflakeIdGenerator)
private readonly ISnowflakeIdGenerator snowflakeIdGenerator;
private readonly IServiceProvider serviceProvider;
private readonly IEventSender eventSender;
private readonly PoleSagasOption poleSagasOption;
private readonly ISerializer serializer;
public SagaFactory(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, IOptions<PoleSagasOption> poleSagasOption, ISerializer serializer)
{
_snowflakeIdGenerator = snowflakeIdGenerator;
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.serviceProvider = serviceProvider;
this.eventSender = eventSender;
this.poleSagasOption = poleSagasOption.Value;
this.serializer = serializer;
}
public TSaga CreateSaga<TSaga>(TimeSpan timeOut) where TSaga : ISaga
public ISaga CreateSaga()
{
var name = typeof(TSaga).FullName;
var SagaFlow = SagasCollection.Get(name);
var newId = _snowflakeIdGenerator.NextId();
throw new NotImplementedException();
return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer);
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
{
public class SagaResult
{
public bool IsSuccess { get; set; }
public bool HasException { get; set; }
public object Result { get; set; }
public string ExceptionMessages { get; set; } = string.Empty;
public static SagaResult SuccessResult = new SagaResult
{
IsSuccess = true,
Result = default
};
}
}
......@@ -5,7 +5,6 @@
</PropertyGroup>
<ItemGroup>
<Folder Include="Client\" />
<Folder Include="Server\Storage\" />
</ItemGroup>
......
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Pole.Core;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
namespace Microsoft.Extensions.DependencyInjection
{
public static class PoleSagaServiceCollectionExtensions
{
public static void AddSagas(this StartupConfig startupOption, Action<PoleSagasOption> rabbitConfigAction)
{
startupOption.Services.Configure(rabbitConfigAction);
startupOption.Services.AddSingleton<IActivityFinder, ActivityFinder>();
startupOption.Services.AddSingleton<IEventSender, EventSender>();
startupOption.Services.AddSingleton<ISagaFactory, SagaFactory>();
}
public static void AddSagas(this StartupConfig startupOption)
{
Action<PoleSagasOption> action = option => { };
startupOption.Services.Configure(action);
startupOption.Services.AddSingleton<IActivityFinder, ActivityFinder>();
startupOption.Services.AddSingleton<IEventSender, EventSender>();
startupOption.Services.AddSingleton<ISagaFactory, SagaFactory>();
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment