Commit e519b36d by dingsongjie

完成 核心部分 除 超时外所有处理

parent a0c8638a
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction1CompensateErrorActivity : IActivity<Transaction1Dto>
{
private readonly IHttpClientFactory httpClientFactory;
public Transaction1CompensateErrorActivity(IHttpClientFactory httpClientFactory)
{
this.httpClientFactory = httpClientFactory;
}
public Task Compensate(Transaction1Dto data)
{
throw new NotImplementedException();
}
public async Task<ActivityExecuteResult> Execute(Transaction1Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1Ok");
return ActivityExecuteResult.Success;
}
}
}
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction1ExceptionActivity : IActivity<Transaction1Dto>
{
private readonly IHttpClientFactory httpClientFactory;
public Transaction1ExceptionActivity(IHttpClientFactory httpClientFactory)
{
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction1Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction1RollBack");
}
public Task<ActivityExecuteResult> Execute(Transaction1Dto data)
{
throw new NotImplementedException();
}
}
}
......@@ -13,8 +13,7 @@ namespace SagasTest.Api.Activities
private readonly IHttpClientFactory httpClientFactory;
public Transaction1OkActivity(IHttpClientFactory httpClientFactory)
{
this.httpClientFactory = httpClientFactory;
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction1Dto data)
{
......
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction2CompensateErrorActivity : IActivity<Transaction2Dto>
{
private readonly IHttpClientFactory httpClientFactory;
public Transaction2CompensateErrorActivity(IHttpClientFactory httpClientFactory)
{
this.httpClientFactory = httpClientFactory;
}
public Task Compensate(Transaction2Dto data)
{
throw new NotImplementedException();
}
public async Task<ActivityExecuteResult> Execute(Transaction2Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction2Ok");
return ActivityExecuteResult.Success;
}
}
}
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction2ExceptionActivity : IActivity<Transaction2Dto>
{
private readonly IHttpClientFactory httpClientFactory;
public Transaction2ExceptionActivity(IHttpClientFactory httpClientFactory)
{
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction2Dto data)
{
var httpclient = httpClientFactory.CreateClient();
httpclient.BaseAddress = new Uri("http://localhost:5000");
var result = await httpclient.GetAsync("api/OutGoingMock/Transaction2RollBack");
}
public Task<ActivityExecuteResult> Execute(Transaction2Dto data)
{
throw new NotImplementedException();
}
}
}
......@@ -14,7 +14,6 @@ namespace SagasTest.Api.Activities
public Transaction2OkActivity(IHttpClientFactory httpClientFactory)
{
this.httpClientFactory = httpClientFactory;
}
public async Task Compensate(Transaction2Dto data)
{
......
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace SagasTest.Api.Activities
{
public class Transaction3ExceptionActivity : IActivity<Transaction3Dto>
{
public Task Compensate(Transaction3Dto data)
{
Console.WriteLine("Transaction3 Rollback");
return Task.CompletedTask;
}
public Task<ActivityExecuteResult> Execute(Transaction3Dto data)
{
throw new NotImplementedException();
}
}
}
......@@ -62,7 +62,61 @@ namespace SagasTest.Api.Controllers
var result = await sagas.GetResult();
}
[HttpGet("Transaction3Exception")]
public async Task Transaction3Exception()
{
var sagas = sagaFactory.CreateSaga();
sagas.AddActivity("Transaction1Ok", new Transaction1Dto { Id = 1, Name = "22" });
sagas.AddActivity("Transaction2Ok", new Transaction2Dto { Price = 1, Message = "我们" });
sagas.AddActivity("Transaction3Exception", new Transaction3Dto { Age = 1, Name = "333" });
var result = await sagas.GetResult();
}
[HttpGet("Transaction2Exception")]
public async Task Transaction2Exception()
{
var sagas = sagaFactory.CreateSaga();
sagas.AddActivity("Transaction1Ok", new Transaction1Dto { Id = 1, Name = "22" });
sagas.AddActivity("Transaction2Exception", new Transaction2Dto { Price = 1, Message = "我们" });
sagas.AddActivity("Transaction3HasResult", new Transaction3Dto { Age = 1, Name = "333" });
var result = await sagas.GetResult();
}
[HttpGet("Transaction1Exception")]
public async Task Transaction1Exception()
{
var sagas = sagaFactory.CreateSaga();
sagas.AddActivity("Transaction1Exception", new Transaction1Dto { Id = 1, Name = "22" });
sagas.AddActivity("Transaction2Ok", new Transaction2Dto { Price = 1, Message = "我们" });
sagas.AddActivity("Transaction3HasResult", new Transaction3Dto { Age = 1, Name = "333" });
var result = await sagas.GetResult();
}
[HttpGet("Transaction2CompensateError")]
public async Task Transaction2CompensateError()
{
var sagas = sagaFactory.CreateSaga();
sagas.AddActivity("Transaction1Ok", new Transaction1Dto { Id = 1, Name = "22" });
sagas.AddActivity("Transaction2CompensateError", new Transaction2Dto { Price = 1, Message = "我们" });
sagas.AddActivity("Transaction3Exception", new Transaction3Dto { Age = 1, Name = "333" });
var result = await sagas.GetResult();
}
[HttpGet("Transaction1CompensateError")]
public async Task Transaction1CompensateError()
{
var sagas = sagaFactory.CreateSaga();
sagas.AddActivity("Transaction1CompensateError", new Transaction1Dto { Id = 1, Name = "22" });
sagas.AddActivity("Transaction2Ok", new Transaction2Dto { Price = 1, Message = "我们" });
sagas.AddActivity("Transaction3Exception", new Transaction3Dto { Age = 1, Name = "333" });
var result = await sagas.GetResult();
}
// GET api/values/5
[HttpGet("{id}")]
public ActionResult<string> Get(int id)
......
......@@ -80,7 +80,7 @@ namespace Pole.Sagas.Core
return null;
}
_currentExecuteOrder++;
return activities[_currentExecuteOrder-1];
return activities[_currentExecuteOrder - 1];
}
private ActivityWapper GetNextCompensateActivity()
{
......@@ -90,7 +90,7 @@ namespace Pole.Sagas.Core
return null;
}
return activities[_currentCompensateOrder-1];
return activities[_currentCompensateOrder - 1];
}
private async Task RecursiveCompensateActivity(ActivityWapper activityWapper)
{
......@@ -123,9 +123,7 @@ namespace Pole.Sagas.Core
if (!result.IsSuccess)
{
await eventSender.ActivityExecuteAborted(activityId, serializer.Serialize(result.Result), string.Empty);
_currentCompensateOrder = _currentExecuteOrder;
var compensateActivity = GetNextCompensateActivity();
await RecursiveCompensateActivity(compensateActivity);
await CompensateActivity(result);
return result;
}
await eventSender.ActivityEnded(activityId, string.Empty);
......@@ -142,13 +140,26 @@ namespace Pole.Sagas.Core
catch (Exception exception)
{
var errors = exception.InnerException != null ? exception.InnerException.Message + exception.StackTrace : exception.Message + exception.StackTrace;
await eventSender.ActivityExecuteAborted(activityId, string.Empty, errors);
return new ActivityExecuteResult
var result = new ActivityExecuteResult
{
IsSuccess = false,
Errors = errors
};
await eventSender.ActivityExecuteAborted(activityId, string.Empty, errors);
return await CompensateActivity(result);
}
}
private async Task<ActivityExecuteResult> CompensateActivity(ActivityExecuteResult result)
{
_currentCompensateOrder = _currentExecuteOrder;
var compensateActivity = GetNextCompensateActivity();
if (compensateActivity == null)
{
return result;
}
await RecursiveCompensateActivity(compensateActivity);
return result;
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Core
{
enum SagaStatus
{
Started,
Ended,
Error
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment