Commit 5511b5b8 by dingsongjie

重构 eventbus 的 代码结构

parent dc62bcb8
Showing with 301 additions and 126 deletions
......@@ -43,7 +43,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SagasTest.Api", "samples\ap
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Server", "src\Pole.Sagas.Server\Pole.Sagas.Server.csproj", "{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Client", "src\Pole.Sagas.Client\Pole.Sagas.Client.csproj", "{EED4FEA7-4E20-41B2-9741-55FCA709373E}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Client", "src\Pole.Sagas.Client\Pole.Sagas.Client.csproj", "{EED4FEA7-4E20-41B2-9741-55FCA709373E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SagasServer", "samples\apis\SagasServer\SagasServer.csproj", "{9717D0D7-9288-4B2F-9830-A06E13B1C8C4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.EventBus", "src\Pole.EventBus\Pole.EventBus.csproj", "{A92E194A-518F-4A93-B346-0FEA50CCD614}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
......@@ -115,6 +119,14 @@ Global
{EED4FEA7-4E20-41B2-9741-55FCA709373E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EED4FEA7-4E20-41B2-9741-55FCA709373E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EED4FEA7-4E20-41B2-9741-55FCA709373E}.Release|Any CPU.Build.0 = Release|Any CPU
{9717D0D7-9288-4B2F-9830-A06E13B1C8C4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9717D0D7-9288-4B2F-9830-A06E13B1C8C4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9717D0D7-9288-4B2F-9830-A06E13B1C8C4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9717D0D7-9288-4B2F-9830-A06E13B1C8C4}.Release|Any CPU.Build.0 = Release|Any CPU
{A92E194A-518F-4A93-B346-0FEA50CCD614}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A92E194A-518F-4A93-B346-0FEA50CCD614}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A92E194A-518F-4A93-B346-0FEA50CCD614}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A92E194A-518F-4A93-B346-0FEA50CCD614}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
......@@ -138,6 +150,8 @@ Global
{6138197E-6202-4E1B-9458-3CBEE60A36F9} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
{34ECE24E-0D78-4764-BC54-0CEE61BDB96A} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{EED4FEA7-4E20-41B2-9741-55FCA709373E} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{9717D0D7-9288-4B2F-9830-A06E13B1C8C4} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
{A92E194A-518F-4A93-B346-0FEA50CCD614} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E}
......
......@@ -49,6 +49,7 @@
<ItemGroup>
<ProjectReference Include="..\..\..\src\Pole.Core\Pole.Core.csproj" />
<ProjectReference Include="..\..\..\src\Pole.EventBus.Rabbitmq\Pole.EventBus.Rabbitmq.csproj" />
<ProjectReference Include="..\..\..\src\Pole.EventBus\Pole.EventBus.csproj" />
<ProjectReference Include="..\..\..\src\Pole.EventStorage.PostgreSql\Pole.EventStorage.PostgreSql.csproj" />
<ProjectReference Include="..\..\..\src\Pole.Grpc\Pole.Grpc.csproj" />
<ProjectReference Include="..\..\..\src\Pole.Orleans.Provider.EntityframeworkCore\Pole.Orleans.Provider.EntityframeworkCore.csproj" />
......
......@@ -17,13 +17,10 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Npgsql;
using Orleans;
using Pole.Core.EventBus;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventHandler;
using Pole.Core.EventBus.EventStorage;
using Pole.Core.Serialization;
using Pole.Core.UnitOfWork;
using Pole.Core.Utils.Abstraction;
using Pole.EventBus;
namespace Backet.Api.Controllers
{
......
using Pole.Core.EventBus.Event;
using Pole.Core.Domain;
using System;
using System.Collections.Generic;
using System.Linq;
......@@ -7,7 +7,7 @@ using System.Threading.Tasks;
namespace Backet.Api.Domain.Event
{
[EventInfo(EventName = "Backet")]
[Pole.EventBus.Event.EventInfo(EventName = "Backet")]
public class BacketCreatedEvent : IEvent
{
public string BacketId { get; set; }
......
using Backet.Api.Domain.Event;
using Pole.Core.EventBus.EventHandler;
using Pole.EventBus.EventHandler;
using System;
using System.Collections.Generic;
using System.Linq;
......
using Backet.Api.Domain.Event;
using Backet.Api.EventHandlers.Abstraction;
using Pole.Core.EventBus.EventHandler;
using Pole.EventBus.EventHandler;
using System;
using System.Collections.Generic;
using System.Linq;
......
......@@ -8,8 +8,8 @@ using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Backet.Api.Domain.Event;
using Pole.Core.UnitOfWork;
using Pole.Core.EventBus.Transaction;
using Pole.Core.EventBus;
using Pole.EventBus.UnitOfWork;
using Pole.EventBus;
namespace Backet.Api.Grains
{
......
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
namespace SagasServer
{
public class Program
{
public static void Main(string[] args)
{
CreateWebHostBuilder(args).Build().Run();
}
public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>();
}
}
{
"iisSettings": {
"windowsAuthentication": false,
"anonymousAuthentication": true,
"iisExpress": {
"applicationUrl": "http://localhost:53309",
"sslPort": 44342
}
},
"profiles": {
"IIS Express": {
"commandName": "IISExpress",
"launchBrowser": true,
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
},
"SagasServer": {
"commandName": "Project",
"launchBrowser": true,
"applicationUrl": "https://localhost:5001;http://localhost:5000",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
}
}
}
\ No newline at end of file
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<AspNetCoreHostingModel>InProcess</AspNetCoreHostingModel>
</PropertyGroup>
<ItemGroup>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Pole.Sagas.Server\Pole.Sagas.Server.csproj" />
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace SagasServer
{
public class Startup
{
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
services.AddPoleSagasServer();
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.Run(async (context) =>
{
await context.Response.WriteAsync("Hello World!");
});
}
}
}
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"System": "Information",
"Microsoft": "Information"
}
}
}
{
"Logging": {
"LogLevel": {
"Default": "Warning"
}
},
"AllowedHosts": "*"
}
......@@ -33,13 +33,13 @@ namespace SagasTest.Api
services.AddControllers();
services.AddPole(config =>
{
config.AddRabbitMQ(option =>
{
option.Hosts = new string[1] { Configuration["RabbitmqConfig:HostAddress"] };
option.Password = Configuration["RabbitmqConfig:HostPassword"];
option.UserName = Configuration["RabbitmqConfig:HostUserName"];
});
config.AddEntityFrameworkEventStorage<BacketDbContext>();
//config.AddRabbitMQ(option =>
//{
// option.Hosts = new string[1] { Configuration["RabbitmqConfig:HostAddress"] };
// option.Password = Configuration["RabbitmqConfig:HostPassword"];
// option.UserName = Configuration["RabbitmqConfig:HostUserName"];
//});
//config.AddEntityFrameworkEventStorage<BacketDbContext>();
config.AddSagas(option=> {
option.ServiceName = "SagasTest";
});
......
using Pole.Core.EventBus.Event;
using System;
using System;
using System.Collections.Generic;
using System.Text;
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.Event
namespace Pole.Core.Domain
{
public interface IEvent
{
......
using Microsoft.Extensions.DependencyInjection;
using Pole.Core;
using Pole.Core.Channels;
using Pole.Core.EventBus;
using Pole.Core.Processor;
using Pole.Core.Processor.Server;
using Pole.Core.Query;
using Pole.Core.Serialization;
using Pole.Core.UnitOfWork;
using Pole.Core.Utils;
using Pole.Core.Utils.Abstraction;
using System;
......@@ -24,14 +21,9 @@ namespace Microsoft.Extensions.DependencyInjection
{
services.Configure<PoleOptions>(option => { });
}
services.AddSingleton<IEventTypeFinder, EventTypeFinder>();
services.AddSingleton<IEventBuffer, EventBuffer>();
services.AddTransient(typeof(IMpscChannel<>), typeof(MpscChannel<>));
services.AddScoped<IBus, Bus>();
services.AddScoped<IUnitOfWork, Pole.Core.UnitOfWork.UnitOfWork>();
services.AddSingleton<ISerializer, DefaultJsonSerializer>();
services.AddSingleton<IGeneratorIdSolver, InstanceIPV4_16IdGeneratorIdSolver>();
services.AddSingleton<IObserverUnitContainer, ObserverUnitContainer>();
services.AddSingleton<IQueryRegister, QueryRegister>();
using (var serviceProvider = services.BuildServiceProvider())
{
......@@ -41,11 +33,6 @@ namespace Microsoft.Extensions.DependencyInjection
var queryRegister = serviceProvider.GetService<IQueryRegister>();
queryRegister.Register(services, ServiceLifetime.Scoped);
}
services.AddSingleton<IProcessor, PendingMessageRetryProcessor>();
services.AddSingleton<IProcessor, ExpiredEventsCollectorProcessor>();
services.AddHostedService<BackgroundServiceBasedProcessorServer>();
config(startupOption);
return services;
}
......
using Orleans;
using Pole.Core.Domain;
using Pole.Core.EventBus.Event;
using System;
using System.Collections.Generic;
using System.Text;
......
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Pole.Core.EventBus;
namespace Pole.EventBus.RabbitMQ
{
......
......@@ -2,17 +2,17 @@
using Microsoft.Extensions.Logging;
using Orleans;
using RabbitMQ.Client;
using Pole.Core.EventBus;
using Pole.Core.Exceptions;
using Pole.Core.Utils;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventHandler;
using Microsoft.Extensions.Options;
using System.Linq;
using Pole.EventBus.Event;
using Pole.Core.Domain;
using Pole.EventBus.EventHandler;
namespace Pole.EventBus.RabbitMQ
{
......
using System.Threading.Tasks;
using Pole.Core.EventBus;
namespace Pole.EventBus.RabbitMQ
{
......
using Pole.Core.EventBus;
using Pole.Core.EventBus.EventHandler;
using Pole.Core.Exceptions;
using Pole.Core.Exceptions;
using Pole.Core.Utils;
using System;
using System.Collections.Generic;
......
......@@ -15,6 +15,7 @@
<ItemGroup>
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
<ProjectReference Include="..\Pole.EventBus\Pole.EventBus.csproj" />
</ItemGroup>
</Project>
......@@ -5,7 +5,7 @@ using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using Pole.Core;
using Pole.Core.EventBus;
using Pole.EventBus;
using Pole.EventBus.RabbitMQ;
namespace Microsoft.Extensions.DependencyInjection
......
using Microsoft.Extensions.Options;
using Pole.Core;
using Pole.Core.EventBus;
using System;
using System.Collections.Generic;
using System.Linq;
......

using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using Pole.Core.EventBus.Transaction;
using Pole.EventBus.Event;
using Pole.EventBus.EventStorage;
using Pole.EventBus.Transaction;
using Pole.Core.Serialization;
using Pole.Core.Utils.Abstraction;
using System;
......@@ -11,7 +11,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
class Bus : IBus
{
......
......@@ -3,7 +3,7 @@ using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
public abstract class Consumer : IConsumer
{
......
......@@ -4,7 +4,7 @@ using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.Event
namespace Pole.EventBus.Event
{
public readonly struct EventBytesTransport
{
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.Event
namespace Pole.EventBus.Event
{
[AttributeUsage(AttributeTargets.Class)]
public class EventInfoAttribute: Attribute
......
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using Pole.Core;
using Pole.EventBus.Event;
using Pole.EventBus.EventStorage;
using System;
using System.Collections.Generic;
using System.Linq;
......@@ -10,7 +11,7 @@ using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
class EventBuffer : IEventBuffer
{
......
using Orleans;
using Pole.Core.EventBus.Event;
using Pole.EventBus.Event;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Core.EventBus.EventHandler
namespace Pole.EventBus.EventHandler
{
public interface IPoleEventHandler<TEvent> : IPoleEventHandler
{
......
using Microsoft.Extensions.Logging;
using Orleans.Concurrency;
using Pole.Core.EventBus.Event;
using Pole.EventBus.Event;
using Pole.Core.Serialization;
using System;
using System.Collections.Generic;
......@@ -14,7 +14,7 @@ using Pole.Core.Exceptions;
using Orleans;
using Pole.Core.Utils.Abstraction;
namespace Pole.Core.EventBus.EventHandler
namespace Pole.EventBus.EventHandler
{
/// <summary>
///
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.EventStorage
namespace Pole.EventBus.EventStorage
{
public class EventEntity
{
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.EventStorage
namespace Pole.EventBus.EventStorage
{
public enum EventStatus
{
......
......@@ -4,7 +4,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.EventBus.EventStorage
namespace Pole.EventBus.EventStorage
{
public interface IEventStorage
{
......
......@@ -4,7 +4,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.EventBus.EventStorage
namespace Pole.EventBus.EventStorage
{
public interface IEventStorageInitializer
{
......
using Microsoft.Extensions.Logging;
using Pole.Core.EventBus.Event;
using Pole.Core.Domain;
using Pole.Core.Exceptions;
using Pole.Core.Utils;
using Pole.Core.Utils.Abstraction;
using Pole.EventBus.Event;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Pole.Core.Serialization
namespace Pole.EventBus
{
public class EventTypeFinder : IEventTypeFinder
{
......
using Pole.Core.EventBus.EventStorage;
using Pole.Core.EventBus.Transaction;
using Pole.EventBus.EventStorage;
using Pole.EventBus.Transaction;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
......@@ -7,7 +7,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
public interface IBus
{
......
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
public interface IConsumer
{
......
using System.Collections.Generic;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
public interface IConsumerContainer
{
......
using Pole.Core.EventBus.EventStorage;
using Pole.EventBus.EventStorage;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
public interface IEventBuffer
{
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Utils.Abstraction
namespace Pole.EventBus
{
public interface IEventTypeFinder
{
......
using System;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
public interface IGrainID
{
......
......@@ -2,7 +2,7 @@
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
public interface IObserverUnit<PrimaryKey> : IGrainID
{
......
using System;
using System.Collections.Generic;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
public interface IObserverUnitContainer
{
......
using System.Collections.Generic;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
public interface IProducer
{
......
using System;
using System.Threading.Tasks;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
public interface IProducerInfoContainer
{
......
......@@ -7,14 +7,14 @@ using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using System.Linq;
using Pole.Core.EventBus.Event;
using Pole.EventBus.Event;
using Orleans.Concurrency;
using System.Collections.Concurrent;
using System.Linq.Expressions;
using Pole.Core.EventBus.EventHandler;
using Pole.EventBus.EventHandler;
using Pole.Core.Utils.Abstraction;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
public class ObserverUnit<PrimaryKey> : IObserverUnit<PrimaryKey>
{
......
......@@ -5,12 +5,12 @@ using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Pole.Core.EventBus.EventHandler;
using Pole.EventBus.EventHandler;
using System.Linq;
using Pole.Core.Exceptions;
using Pole.Core.EventBus.Event;
using Pole.EventBus.Event;
namespace Pole.Core.EventBus
namespace Pole.EventBus
{
public class ObserverUnitContainer : IObserverUnitContainer
{
......
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
</ItemGroup>
</Project>
using Microsoft.Extensions.DependencyInjection;
using Pole.Core;
using Pole.Core.Processor;
using Pole.EventBus.Processor;
using Pole.EventBus.Processor.Server;
using Pole.EventBus.UnitOfWork;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.EventBus
{
public static class PoleEventBusStartupConfigExtensions
{
public static void AddEventBus(
this StartupConfig startupOption)
{
startupOption.Services.AddSingleton<IEventBuffer, EventBuffer>();
startupOption.Services.AddScoped<IBus, Bus>();
startupOption.Services.AddSingleton<IObserverUnitContainer, ObserverUnitContainer>();
startupOption.Services.AddSingleton<IProcessor, PendingMessageRetryProcessor>();
startupOption.Services.AddSingleton<IProcessor, ExpiredEventsCollectorProcessor>();
startupOption.Services.AddHostedService<BackgroundServiceBasedProcessorServer>();
startupOption.Services.AddScoped<IUnitOfWork, Pole.EventBus.UnitOfWork.UnitOfWork>();
startupOption.Services.AddSingleton<IEventTypeFinder, EventTypeFinder>();
Startup.Register(async serviceProvider =>
{
});
}
}
}
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.Core.EventBus.EventStorage;
using Pole.Core;
using Pole.Core.Processor;
using Pole.EventBus.EventStorage;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.Core.Processor
namespace Pole.EventBus.Processor
{
class ExpiredEventsCollectorProcessor : IProcessor
{
......
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.Core.EventBus;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using Pole.EventBus;
using Pole.EventBus.Event;
using Pole.EventBus.EventStorage;
using Pole.Core.Serialization;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Pole.Core.Processor;
using Pole.Core;
namespace Pole.Core.Processor
namespace Pole.EventBus.Processor
{
class PendingMessageRetryProcessor : ProcessorBase
{
......
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Pole.Core.EventBus.EventStorage;
using Pole.Core.Processor;
using Pole.EventBus.EventStorage;
using System;
using System.Collections.Generic;
using System.Linq;
......@@ -9,7 +10,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.Processor.Server
namespace Pole.EventBus.Processor.Server
{
public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer
{
......
using Pole.Core.EventBus.EventStorage;
using Pole.EventBus.EventStorage;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
......@@ -6,7 +6,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.EventBus.Transaction
namespace Pole.EventBus.Transaction
{
public interface IDbTransactionAdapter : IDisposable
{
......
using Pole.Core.EventBus;
using Pole.Core.EventBus.Transaction;
using System;
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Pole.EventBus.Transaction;
namespace Pole.Core.UnitOfWork
namespace Pole.EventBus.UnitOfWork
{
public interface IUnitOfWork : IDisposable
{
......
using Pole.Core.EventBus;
using Pole.Core.EventBus.Transaction;
using System;
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
......@@ -9,13 +7,13 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Pole.Core.Serialization;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using Microsoft.Extensions.Options;
using Pole.Core.Utils.Abstraction;
using Pole.Core.Exceptions;
using Pole.EventBus.Event;
using Pole.EventBus.Transaction;
namespace Pole.Core.UnitOfWork
namespace Pole.EventBus.UnitOfWork
{
class UnitOfWork : IUnitOfWork
{
......
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Pole.Core.EventBus;
using Pole.Core.EventBus.Transaction;
using Pole.Core.UnitOfWork;
using Pole.EventBus;
using Pole.EventBus.Transaction;
using Pole.EventBus.UnitOfWork;
using System;
using System.Collections.Generic;
using System.Data;
......
......@@ -11,6 +11,7 @@
<ItemGroup>
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
<ProjectReference Include="..\Pole.EventBus\Pole.EventBus.csproj" />
</ItemGroup>
</Project>
using Npgsql;
using NpgsqlTypes;
using Pole.Core.EventBus.EventStorage;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
......@@ -8,6 +7,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Dapper;
using Pole.EventBus.EventStorage;
namespace Pole.EventStorage.PostgreSql
{
......
using Microsoft.EntityFrameworkCore;
using Pole.Core;
using Pole.Core.EventBus.EventStorage;
using Pole.Core.EventBus.Transaction;
using Pole.EventBus.EventStorage;
using Pole.EventBus.Transaction;
using Pole.EventStorage.PostgreSql;
using System;
using System.Collections.Generic;
......
using Microsoft.EntityFrameworkCore.Storage;
using Pole.Core.EventBus;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.EventStorage;
using Pole.Core.EventBus.Transaction;
using Pole.Core.Serialization;
using Pole.EventBus.Transaction;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
......
......@@ -3,7 +3,7 @@ using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Options;
using Npgsql;
using Pole.Core;
using Pole.Core.EventBus.EventStorage;
using Pole.EventBus.EventStorage;
using System;
using System.Collections.Generic;
using System.Data;
......
......@@ -2,7 +2,7 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Pole.Core.EventBus.EventStorage;
using Pole.EventBus.EventStorage;
using System;
using System.Collections.Generic;
using System.Text;
......
......@@ -8,10 +8,9 @@ using Orleans;
using Orleans.Runtime;
using Orleans.Storage;
using Pole.Core.Domain;
using Pole.Core.EventBus;
using Pole.Core.EventBus.Event;
using Pole.Core.EventBus.Transaction;
using Pole.Core.UnitOfWork;
using Pole.EventBus;
using Pole.EventBus.Transaction;
using Pole.EventBus.UnitOfWork;
using System;
using System.Collections.Generic;
using System.Linq;
......
......@@ -10,6 +10,7 @@
<ItemGroup>
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
<ProjectReference Include="..\Pole.EventBus\Pole.EventBus.csproj" />
</ItemGroup>
</Project>
......@@ -6,10 +6,25 @@ namespace Pole.Sagas.Server
{
public class PoleSagasServerOption
{
/// <summary>
/// 从数据库获取未结束的 sagas 的 时间间隔 单位秒
/// </summary>
public int NotEndedSagasFetchIntervalSeconds { get; set; } = 30;
/// <summary>
/// 每个Grpc 获取Sagas 的请求 ,服务端流式返回,每一次返回的间隔时间 单位秒
/// </summary>
public int GetSagasGrpcStreamingResponseDelaySeconds { get; set; } = 20;
/// <summary>
/// 过期数据 批量删除触发的时间间隔,单位秒
/// </summary>
public int ExpiredDataBulkDeleteIntervalSeconds { get; set; } = 10*60;
/// <summary>
/// 过期数据 批量是每一次删除的数量
/// </summary>
public int ExpiredDataDeleteBatchCount { get; set; } = 1000;
/// <summary>
/// 批量删除时 实际过期的数量比预定数量要大时,会分多次删除,此值为其中每次分批删除的时间间隔
/// </summary>
public int ExpiredDataPreBulkDeleteDelaySeconds { get; set; } = 3;
}
}
using Microsoft.Extensions.DependencyInjection;
using Pole.Core.Processor;
using Pole.Sagas.Server;
using Pole.Sagas.Server.Processor;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Server
namespace Microsoft.Extensions.DependencyInjection
{
public static class PoleSagasServerServiceCollectionExtensions
{
public static IServiceCollection AddPoleSagasServer(IServiceCollection services)
{
public static IServiceCollection AddPoleSagasServer(this IServiceCollection services, Action<PoleSagasServerOption> config = null)
{
Action<PoleSagasServerOption> defaultConfig = option => { };
var finalConfig = config ?? defaultConfig;
services.AddGrpc();
services.Configure(config);
services.AddSingleton<IProcessor, NotEndedSagasFetchProcessor>();
services.AddSingleton<IProcessor, ExpiredSagasCollectorProcessor>();
......
......@@ -9,8 +9,6 @@ namespace Pole.Sagas.Storage.PostgreSql
public string SagaTableName { get; set; } = "Sagas";
public string SchemaName { get; set; } = "pole-sagas";
public string ActivityTableName { get; set; } = "Activities";
public string OvertimeCompensationGuaranteeTableName { get; set; } = "OCG-Activities";
public int SagasRecoveryIntervalSecond { get; set; }
public string ConnectionString { get; set; }
}
}
......@@ -2,7 +2,6 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Pole.Core.EventBus.EventStorage;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using System;
......
using BenchmarkDotNet.Reports;
using BenchmarkDotNet.Running;
using Npgsql;
using Pole.Core.EventBus.EventStorage;
using Pole.Samples.Backet.Api.Benchmarks;
using System;
using System.Collections.Generic;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment