Commit a0c8638a by dingsongjie

添加核心代码 并且 添加 核心代码的测试

parent 63f4a2bd
Showing with 549 additions and 72 deletions
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction1OkActivity : IActivity<Transaction1Dto>
{
private readonly IHttpClientFactory httpClientFactory;
public Transaction1OkActivity(IHttpClientFactory httpClientFactory)
{
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction1Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1RollBack");
}
public async Task<ActivityExecuteResult> Execute(Transaction1Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1Ok");
return ActivityExecuteResult.Success;
}
}
public class Transaction1Dto
{
public string Name { get; set; }
public int Id { get; set; }
}
}
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction1ReturnFalseActivity : IActivity<Transaction1Dto>
{
private readonly IHttpClientFactory httpClientFactory;
public Transaction1ReturnFalseActivity(IHttpClientFactory httpClientFactory)
{
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction1Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1RollBack");
}
public async Task<ActivityExecuteResult> Execute(Transaction1Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1Ok");
return new ActivityExecuteResult
{
IsSuccess = false,
Result = "库存不足"
};
}
}
}
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction2OkActivity : IActivity<Transaction2Dto>
{
private readonly IHttpClientFactory httpClientFactory;
public Transaction2OkActivity(IHttpClientFactory httpClientFactory)
{
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction2Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction2RollBack");
}
public async Task<ActivityExecuteResult> Execute(Transaction2Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction2Ok");
return ActivityExecuteResult.Success;
}
}
public class Transaction2Dto
{
public long Price { get; set; }
public string Message { get; set; }
}
}
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction2ReturnFalseActivity : IActivity<Transaction2Dto>
{
private readonly IHttpClientFactory httpClientFactory;
public Transaction2ReturnFalseActivity(IHttpClientFactory httpClientFactory)
{
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction2Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction2RollBack");
}
public async Task<ActivityExecuteResult> Execute(Transaction2Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction2Ok");
return new ActivityExecuteResult {
IsSuccess=false,
Result="用户余额不足"
};
}
}
}
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction3HasResultActivity : IActivity<Transaction3Dto>
{
public Task Compensate(Transaction3Dto data)
{
Console.WriteLine("Transaction3 Rollback");
return Task.CompletedTask;
}
public Task<ActivityExecuteResult> Execute(Transaction3Dto data)
{
Console.WriteLine("Transaction3 commit");
var result = new ActivityExecuteResult
{
IsSuccess = true,
Result = new Transaction3DtoResult
{
OrderId = 112,
UserName = "ccc"
}
};
return Task.FromResult(result);
}
}
public class Transaction3Dto
{
public string Name { get; set; }
public int Age { get; set; }
}
public class Transaction3DtoResult
{
public int OrderId { get; set; }
public string UserName { get; set; }
}
}
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction3ReturnFalseActivity : IActivity<Transaction3Dto>
{
public Task Compensate(Transaction3Dto data)
{
Console.WriteLine("Transaction3 Rollback");
return Task.CompletedTask;
}
public Task<ActivityExecuteResult> Execute(Transaction3Dto data)
{
Console.WriteLine("Transaction3 commit");
var result = new ActivityExecuteResult
{
IsSuccess = false,
Result = "创建订单失败"
};
return Task.FromResult(result);
}
}
}
using Microsoft.AspNetCore.Mvc;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace SagasTest.Api.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class OutGoingMockController : ControllerBase
{
[HttpGet("Transaction1Ok")]
public Task<string> Transaction1Ok()
{
return Task.FromResult("Transaction1 Ok");
}
[HttpGet("Transaction1RollBack")]
public Task<string> Transaction1RollBack()
{
return Task.FromResult("Transaction1 RollBack");
}
[HttpGet("Transaction2Ok")]
public Task<string> Transaction2Ok()
{
return Task.FromResult("Transaction1 Ok");
}
[HttpGet("Transaction2RollBack")]
public Task<string> Transaction2RollBack()
{
return Task.FromResult("Transaction2 RollBack");
}
[HttpGet("Transaction3Ok")]
public Task<string> Transaction3Ok()
{
return Task.FromResult("Transaction1 Ok");
}
}
}
......@@ -3,18 +3,64 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Pole.Sagas.Core.Abstraction;
using SagasTest.Api.Activities;
namespace SagasTest.Api.Controllers
{
[Route("api/[controller]")]
[ApiController]
public class ValuesController : ControllerBase
public class SagasTestController : ControllerBase
{
private readonly ISagaFactory sagaFactory;
public SagasTestController(ISagaFactory sagaFactory)
{
this.sagaFactory = sagaFactory;
}
// GET api/values
[HttpGet]
public ActionResult<IEnumerable<string>> Get()
[HttpGet("NormalCall")]
public async Task NormalCall()
{
var sagas = sagaFactory.CreateSaga();
sagas.AddActivity("Transaction1Ok", new Transaction1Dto { Id = 1, Name = "22" });
sagas.AddActivity("Transaction2Ok", new Transaction2Dto { Price = 1, Message = "我们" });
sagas.AddActivity("Transaction3HasResult", new Transaction3Dto { Age = 1, Name = "333" });
var result = await sagas.GetResult();
}
[HttpGet("Transaction3ReturnFalse")]
public async Task Transaction3ReturnFalse()
{
var sagas = sagaFactory.CreateSaga();
sagas.AddActivity("Transaction1Ok", new Transaction1Dto { Id = 1, Name = "22" });
sagas.AddActivity("Transaction2Ok", new Transaction2Dto { Price = 1, Message = "我们" });
sagas.AddActivity("Transaction3ReturnFalse", new Transaction3Dto { Age = 1, Name = "333" });
var result = await sagas.GetResult();
}
[HttpGet("Transaction2ReturnFalse")]
public async Task Transaction2ReturnFalse()
{
var sagas = sagaFactory.CreateSaga();
sagas.AddActivity("Transaction1Ok", new Transaction1Dto { Id = 1, Name = "22" });
sagas.AddActivity("Transaction2ReturnFalse", new Transaction2Dto { Price = 1, Message = "我们" });
sagas.AddActivity("Transaction3HasResult", new Transaction3Dto { Age = 1, Name = "333" });
var result = await sagas.GetResult();
}
[HttpGet("Transaction1ReturnFalse")]
public async Task Transaction1ReturnFalse()
{
return new string[] { "value1", "value2" };
var sagas = sagaFactory.CreateSaga();
sagas.AddActivity("Transaction1ReturnFalse", new Transaction1Dto { Id = 1, Name = "22" });
sagas.AddActivity("Transaction2Ok", new Transaction2Dto { Price = 1, Message = "我们" });
sagas.AddActivity("Transaction3HasResult", new Transaction3Dto { Age = 1, Name = "333" });
var result = await sagas.GetResult();
}
// GET api/values/5
......
using Pole.Core.Domain;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Domain.AggregatesModel.BacketAggregate
{
public class Backet : Entity, IAggregateRoot
{
public void AddBacketItem(string productId, string productName, long Price)
{
BacketItem backetItem = new BacketItem()
{
Id = Guid.NewGuid().ToString("N"),
Price = Price,
ProductId = productId,
ProductName = productName
};
BacketItems.Add(backetItem);
SetBacketTotalPrice();
}
public void ModifyItemProductId(string productId)
{
BacketItems.ForEach(m => m.ProductId = productId);
}
private void SetBacketTotalPrice()
{
foreach (var item in BacketItems)
{
TotalPrice = BacketItems.Sum(m=>m.Price);
}
}
public string UserId { get; set; }
public List<BacketItem> BacketItems { get; private set; } = new List<BacketItem>();
public long TotalPrice { get; set; }
internal void RemoveFirstItem()
{
var first = BacketItems.FirstOrDefault();
if (first != null)
{
BacketItems.Remove(first);
SetBacketTotalPrice();
}
}
}
}
using Pole.Core.Domain;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Domain.AggregatesModel.BacketAggregate
{
public class BacketItem : Entity
{
public string ProductId { get; set; }
public string ProductName { get; set; }
public long Price { get; set; }
public string BacketId { get; set; }
}
}
using Backet.Api.Domain.AggregatesModel.BacketAggregate;
using Backet.Api.Infrastructure.EntityConfigurations;
using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Infrastructure
{
public class BacketDbContext : DbContext
{
public BacketDbContext(DbContextOptions options) : base(options)
{
}
public DbSet<Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet> Backets { get; set; }
public DbSet<BacketItem> BacketItems { get; set; }
protected override void OnModelCreating(ModelBuilder builder)
{
base.OnModelCreating(builder);
builder.ApplyConfiguration(new BacketItemEntityTypeConfiguration());
builder.ApplyConfiguration(new BacketEntityTypeConfiguration());
}
}
}
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Infrastructure.EntityConfigurations
{
public class BacketEntityTypeConfiguration : IEntityTypeConfiguration<Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>
{
public void Configure(EntityTypeBuilder<Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet> builder)
{
builder.ToTable(nameof(Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet));
builder.Property(m => m.Id).HasMaxLength(32);
builder.Property(m => m.UserId).HasMaxLength(32).IsRequired();
builder.HasMany(m => m.BacketItems).WithOne().IsRequired().OnDelete(DeleteBehavior.Cascade).HasForeignKey("BacketId");
builder.Ignore(m => m.DomainEvents);
builder.Ignore(m => m.IsPersisted);
builder.HasKey(m => m.Id);
builder.HasIndex(m => m.UserId);
}
}
}
using Backet.Api.Domain.AggregatesModel.BacketAggregate;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Infrastructure.EntityConfigurations
{
public class BacketItemEntityTypeConfiguration : IEntityTypeConfiguration<BacketItem>
{
public void Configure(EntityTypeBuilder<BacketItem> builder)
{
builder.ToTable(nameof(BacketItem));
builder.Property(m => m.Id).HasMaxLength(32);
builder.Property(m => m.ProductId).HasMaxLength(32);
builder.Property(m => m.ProductName).HasMaxLength(256).IsRequired();
builder.Property(m => m.BacketId).HasMaxLength(32).IsRequired();
builder.Ignore(m => m.DomainEvents);
builder.Ignore(m => m.IsPersisted);
builder.HasKey(m => m.Id);
builder.HasIndex(m => m.ProductId);
}
}
}
......@@ -19,9 +19,9 @@
},
"SagasTest.Api": {
"commandName": "Project",
"launchBrowser": true,
"launchBrowser": false,
"launchUrl": "api/values",
"applicationUrl": "https://localhost:5001;http://localhost:5000",
"applicationUrl": "http://localhost:5000",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
......
......@@ -6,12 +6,9 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Microsoft.AspNetCore.Razor.Design" Version="2.2.0" PrivateAssets="All" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Pole.Core\Pole.Core.csproj" />
<ProjectReference Include="..\..\..\src\Pole.EventBus.Rabbitmq\Pole.EventBus.Rabbitmq.csproj" />
<ProjectReference Include="..\..\..\src\Pole.EventStorage.PostgreSql\Pole.EventStorage.PostgreSql.csproj" />
<ProjectReference Include="..\..\..\src\Pole.Sagas\Pole.Sagas.csproj" />
</ItemGroup>
......
......@@ -2,10 +2,12 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Backet.Api.Infrastructure;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
......@@ -27,11 +29,22 @@ namespace SagasTest.Api
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContextPool<BacketDbContext>(options => options.UseNpgsql(Configuration["postgres:write"]));
services.AddControllers();
services.AddPole(config =>
{
config.AddSagas();
config.AddRabbitMQ(option =>
{
option.Hosts = new string[1] { Configuration["RabbitmqConfig:HostAddress"] };
option.Password = Configuration["RabbitmqConfig:HostPassword"];
option.UserName = Configuration["RabbitmqConfig:HostUserName"];
});
config.AddEntityFrameworkEventStorage<BacketDbContext>();
config.AddSagas(option=> {
option.ServiceName = "SagasTest";
});
});
services.AddHttpClient();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
......
......@@ -4,5 +4,14 @@
"Default": "Warning"
}
},
"AllowedHosts": "*"
"AllowedHosts": "*",
"postgres": {
"write": "Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;MinPoolSize=20;MaxPoolSize=500;"
},
"ServiceName": "Backet",
"RabbitmqConfig": {
"HostAddress": "192.168.0.248",
"HostUserName": "comteck",
"HostPassword": "comteck3030"
}
}
......@@ -7,7 +7,7 @@ namespace Pole.Sagas.Core.Abstraction
{
Task SagaStarted(string sagaId, string serviceName);
Task SagaEnded(string sagaId,DateTime ExpiresAt);
Task ActivityStarted(string activityId, string sagaId, DateTime activityTimeoutTime, string parameterContent);
Task ActivityExecuteStarted(string activityId, string sagaId, DateTime activityTimeoutTime, string parameterContent,int order);
Task ActivityRetried(string activityId, string status, int retries,string resultContent);
Task ActivityExecuteAborted(string activityId,string resultContent,string errors);
Task ActivityCompensateAborted(string activityId,string sagaId,string errors);
......
......@@ -9,16 +9,21 @@ namespace Pole.Sagas.Core
public bool IsSuccess { get; set; }
public object Result { get; set; }
public string Errors { get; set; }
public static ActivityExecuteResult Success = new ActivityExecuteResult
{
IsSuccess = true
};
public static implicit operator SagaResult(ActivityExecuteResult activity)
{
return new SagaResult
{
IsSuccess = activity.IsSuccess,
Result = default(object),
Result = activity.Result,
HasException = !string.IsNullOrEmpty(activity.Errors),
ExceptionMessages = activity.Errors
};
}
}
}
......@@ -21,7 +21,8 @@ namespace Pole.Sagas.Core
var baseActivityType = typeof(IActivity<>);
foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger))
{
foreach (var type in assembly.GetTypes().Where(m => m.IsGenericType && m.GetGenericTypeDefinition() == baseActivityType && !m.IsAbstract))
foreach (var type in assembly.GetTypes().Where(m => m.GetInterfaces().Any(i=>i.IsGenericType&& i.GetGenericTypeDefinition() == baseActivityType)&&m.IsClass&&!m.IsAbstract))
{
if (!type.FullName.EndsWith("Activity"))
{
......
......@@ -24,7 +24,7 @@ namespace Pole.Sagas.Core
var dataObjParams = Expression.Parameter(typeof(object), "data");
var dataParams = Expression.Convert(dataObjParams, ActivityDataType);
var method = ActivityType.GetMethod("Execute", new Type[] { ActivityDataType });
var body = Expression.Call(activityObjParams, method, activityObjParams, dataParams);
var body = Expression.Call(activityParams, method, dataParams);
var func = Expression.Lambda<Func<object, object, Task<ActivityExecuteResult>>>(body, activityObjParams, dataObjParams).Compile();
using (var scope = ServiceProvider.CreateScope())
......@@ -40,8 +40,9 @@ namespace Pole.Sagas.Core
var dataObjParams = Expression.Parameter(typeof(object), "data");
var dataParams = Expression.Convert(dataObjParams, ActivityDataType);
var method = ActivityType.GetMethod("Compensate", new Type[] { ActivityDataType });
var body = Expression.Call(activityObjParams, method, activityObjParams, dataParams);
var body = Expression.Call(activityParams, method, dataParams);
var func = Expression.Lambda<Func<object, object, Task>>(body, activityObjParams, dataObjParams).Compile();
using (var scope = ServiceProvider.CreateScope())
{
var activity = scope.ServiceProvider.GetRequiredService(ActivityType);
......
......@@ -33,7 +33,7 @@ namespace Pole.Sagas.Core
return Task.CompletedTask;
}
public Task ActivityStarted(string activityId, string sagaId, DateTime activityTimeoutTime, string parameterContent)
public Task ActivityExecuteStarted(string activityId, string sagaId, DateTime activityTimeoutTime, string parameterContent, int order)
{
return Task.CompletedTask;
}
......
......@@ -2,14 +2,13 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
namespace Pole.Sagas.Core.Exceptions
{
public class ActivityCompensateResult
class ActivityImplementIrregularException: Exception
{
/// <summary>
/// If not success , this activity will be aborted , and current saga will compensate all previous activities
/// </summary>
public bool IsSuccess { get; set; }
public string Message { get; set; }
public ActivityImplementIrregularException(string name) : base($"Activity name :{name }must have and only inherit from IActivity<>")
{
}
}
}
......@@ -8,7 +8,7 @@ namespace Pole.Sagas.Core
public interface ISaga
{
string Id { get; }
void AddActivity<TData>(string activityName, TData data);
void AddActivity(string activityName, object data);
Task<SagaResult> GetResult();
}
}
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
using Pole.Sagas.Core.Abstraction;
using Pole.Sagas.Core.Exceptions;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
......@@ -14,6 +16,7 @@ namespace Pole.Sagas.Core
private IServiceProvider serviceProvider;
private IEventSender eventSender;
private ISnowflakeIdGenerator snowflakeIdGenerator;
private IActivityFinder activityFinder;
private PoleSagasOption poleSagasOption;
private int _currentMaxOrder = 0;
private int _currentExecuteOrder = 0;
......@@ -21,24 +24,33 @@ namespace Pole.Sagas.Core
private ISerializer serializer;
public string Id { get; }
public Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer)
public Saga(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, PoleSagasOption poleSagasOption, ISerializer serializer, IActivityFinder activityFinder)
{
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.serviceProvider = serviceProvider;
this.eventSender = eventSender;
this.poleSagasOption = poleSagasOption;
this.serializer = serializer;
this.activityFinder = activityFinder;
Id = snowflakeIdGenerator.NextId();
}
public void AddActivity<TData>(string activityName, TData data)
public void AddActivity(string activityName, object data)
{
var targetActivityType = activityFinder.FindType(activityName);
var activityInterface = targetActivityType.GetInterfaces().FirstOrDefault();
if (!activityInterface.IsGenericType)
{
throw new ActivityImplementIrregularException(activityName);
}
var dataType = activityInterface.GetGenericArguments()[0];
_currentMaxOrder++;
ActivityWapper activityWapper = new ActivityWapper
{
ActivityDataType = typeof(TData),
ActivityDataType = dataType,
ActivityState = ActivityStatus.NotStarted,
ActivityType = data.GetType(),
ActivityType = targetActivityType,
DataObj = data,
Order = _currentMaxOrder,
ServiceProvider = serviceProvider
......@@ -68,25 +80,23 @@ namespace Pole.Sagas.Core
return null;
}
_currentExecuteOrder++;
return activities[_currentExecuteOrder];
return activities[_currentExecuteOrder-1];
}
private ActivityWapper GetNextCompensateActivity()
{
_currentCompensateOrder--;
if (_currentExecuteOrder == 0)
if (_currentCompensateOrder == 0)
{
return null;
}
return activities[_currentCompensateOrder];
return activities[_currentCompensateOrder-1];
}
private async Task RecursiveCompensateActivity(ActivityWapper activityWapper)
{
var activityId = activityWapper.Id;
try
{
//var jsonContent = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType);
//await eventSender.ActivityStarted(activityId, Id, activityWapper.TimeOut, jsonContent);
await activityWapper.InvokeCompensate();
await eventSender.ActivityCompensated(activityId);
var compensateActivity = GetNextCompensateActivity();
......@@ -108,7 +118,7 @@ namespace Pole.Sagas.Core
try
{
var jsonContent = serializer.Serialize(activityWapper.DataObj, activityWapper.ActivityDataType);
await eventSender.ActivityStarted(activityId, Id, activityWapper.TimeOut, jsonContent);
await eventSender.ActivityExecuteStarted(activityId, Id, activityWapper.TimeOut, jsonContent, activityWapper.Order);
var result = await activityWapper.InvokeExecute();
if (!result.IsSuccess)
{
......
......@@ -15,18 +15,20 @@ namespace Pole.Sagas.Core
private readonly IEventSender eventSender;
private readonly PoleSagasOption poleSagasOption;
private readonly ISerializer serializer;
public SagaFactory(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, IOptions<PoleSagasOption> poleSagasOption, ISerializer serializer)
private readonly IActivityFinder activityFinder;
public SagaFactory(ISnowflakeIdGenerator snowflakeIdGenerator, IServiceProvider serviceProvider, IEventSender eventSender, IOptions<PoleSagasOption> poleSagasOption, ISerializer serializer, IActivityFinder activityFinder)
{
this.snowflakeIdGenerator = snowflakeIdGenerator;
this.serviceProvider = serviceProvider;
this.eventSender = eventSender;
this.poleSagasOption = poleSagasOption.Value;
this.serializer = serializer;
this.activityFinder = activityFinder;
}
public ISaga CreateSaga()
{
return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer);
return new Saga(snowflakeIdGenerator, serviceProvider, eventSender, poleSagasOption, serializer, activityFinder);
}
}
}
using Pole.Sagas.Core.Exceptions;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
{
public class SagasCollection : Dictionary<string, ISaga>
{
private static System.Collections.Concurrent.ConcurrentDictionary<string, ISaga> _sagas = new System.Collections.Concurrent.ConcurrentDictionary<string, ISaga>();
public static ISaga Get(string name)
{
if (!_sagas.TryGetValue(name, out ISaga saga))
{
throw new SagaNotFoundException(name);
}
return saga;
}
public static bool Add(ISaga saga)
{
var name = saga.GetType().FullName;
return _sagas.TryAdd(name, saga);
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Pole.Core;
using Pole.Core.Utils;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using Pole.Sagas.Core.Exceptions;
namespace Microsoft.Extensions.DependencyInjection
{
......@@ -16,14 +19,19 @@ namespace Microsoft.Extensions.DependencyInjection
startupOption.Services.AddSingleton<IActivityFinder, ActivityFinder>();
startupOption.Services.AddSingleton<IEventSender, EventSender>();
startupOption.Services.AddSingleton<ISagaFactory, SagaFactory>();
}
public static void AddSagas(this StartupConfig startupOption)
{
Action<PoleSagasOption> action = option => { };
startupOption.Services.Configure(action);
startupOption.Services.AddSingleton<IActivityFinder, ActivityFinder>();
startupOption.Services.AddSingleton<IEventSender, EventSender>();
startupOption.Services.AddSingleton<ISagaFactory, SagaFactory>();
var baseActivityType = typeof(IActivity<>);
foreach (var assembly in AssemblyHelper.GetAssemblies())
{
foreach (var type in assembly.GetTypes().Where(m => m.GetInterfaces().Any(i => i.IsGenericType && i.GetGenericTypeDefinition() == baseActivityType) && m.IsClass && !m.IsAbstract))
{
if (!type.FullName.EndsWith("Activity"))
{
throw new ActivityNameIrregularException(type);
}
startupOption.Services.AddScoped(type);
}
}
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment