From 5511b5b81f0cb443111474045e112751ac4f8c07 Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Thu, 12 Mar 2020 10:27:04 +0800 Subject: [PATCH] 重构 eventbus 的 代码结构 --- Pole.sln | 16 +++++++++++++++- samples/apis/Backet.Api/Backet.Api.csproj | 1 + samples/apis/Backet.Api/Controllers/BacketController.cs | 5 +---- samples/apis/Backet.Api/Domain/Events/BacketCreatedEvent.cs | 4 ++-- samples/apis/Backet.Api/EventHandlers/Abstraction/IToNoticeBacketCreatedEventHandler.cs | 2 +- samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs | 2 +- samples/apis/Backet.Api/Grains/AddBacketGrain.cs | 4 ++-- samples/apis/SagasServer/Program.cs | 24 ++++++++++++++++++++++++ samples/apis/SagasServer/Properties/launchSettings.json | 27 +++++++++++++++++++++++++++ samples/apis/SagasServer/SagasServer.csproj | 15 +++++++++++++++ samples/apis/SagasServer/Startup.cs | 36 ++++++++++++++++++++++++++++++++++++ samples/apis/SagasServer/appsettings.Development.json | 9 +++++++++ samples/apis/SagasServer/appsettings.json | 8 ++++++++ samples/apis/SagasTest.Api/Startup.cs | 14 +++++++------- src/Pole.Core/Domain/Entity.cs | 3 +-- src/Pole.Core/Domain/IEvent.cs | 10 ++++++++++ src/Pole.Core/EventBus/Bus.cs | 65 ----------------------------------------------------------------- src/Pole.Core/EventBus/Consumer.cs | 33 --------------------------------- src/Pole.Core/EventBus/Event/EventBytesTransport.cs | 61 ------------------------------------------------------------- src/Pole.Core/EventBus/Event/EventInfoAttribute.cs | 12 ------------ src/Pole.Core/EventBus/Event/IEvent.cs | 10 ---------- src/Pole.Core/EventBus/EventBuffer.cs | 143 ----------------------------------------------------------------------------------------------------------------------------------------------- src/Pole.Core/EventBus/EventHandler/IPoleEventHandler.cs | 23 ----------------------- src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs | 96 ------------------------------------------------------------------------------------------------ src/Pole.Core/EventBus/EventStorage/EventEntity.cs | 17 ----------------- src/Pole.Core/EventBus/EventStorage/EventStatus.cs | 13 ------------- src/Pole.Core/EventBus/EventStorage/IEventStorage.cs | 22 ---------------------- src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs | 14 -------------- src/Pole.Core/EventBus/IBus.cs | 19 ------------------- src/Pole.Core/EventBus/IConsumer.cs | 11 ----------- src/Pole.Core/EventBus/IConsumerContainer.cs | 9 --------- src/Pole.Core/EventBus/IEventBuffer.cs | 13 ------------- src/Pole.Core/EventBus/IGrainID.cs | 9 --------- src/Pole.Core/EventBus/IObserverUnit.cs | 12 ------------ src/Pole.Core/EventBus/IObserverUnitContainer.cs | 12 ------------ src/Pole.Core/EventBus/IProducer.cs | 11 ----------- src/Pole.Core/EventBus/IProducerInfoContainer.cs | 10 ---------- src/Pole.Core/EventBus/ObserverUnit.cs | 116 -------------------------------------------------------------------------------------------------------------------- src/Pole.Core/EventBus/ObserverUnitContainer.cs | 87 --------------------------------------------------------------------------------------- src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs | 17 ----------------- src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs | 13 ------------- src/Pole.Core/Grains/PoleGrain.cs | 1 - src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs | 70 ---------------------------------------------------------------------- src/Pole.Core/Processor/PendingMessageRetryProcessor.cs | 91 ------------------------------------------------------------------------------------------- src/Pole.Core/Processor/Server/BackgroundServiceBasedProcessorServer.cs | 47 ----------------------------------------------- src/Pole.Core/Serialization/EventTypeFinder.cs | 67 ------------------------------------------------------------------- src/Pole.Core/UnitOfWork/IUnitOfWork.cs | 20 -------------------- src/Pole.Core/UnitOfWork/UnitOfWork.cs | 75 --------------------------------------------------------------------------- src/Pole.Core/Utils/Abstraction/IEventTypeFinder.cs | 12 ------------ src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs | 1 - src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs | 6 +++--- src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs | 1 - src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs | 4 +--- src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj | 1 + src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs | 2 +- src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs | 1 - src/Pole.EventBus/Bus.cs | 65 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus/Consumer.cs | 33 +++++++++++++++++++++++++++++++++ src/Pole.EventBus/Event/EventBytesTransport.cs | 61 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus/Event/EventInfoAttribute.cs | 12 ++++++++++++ src/Pole.EventBus/EventBuffer.cs | 144 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus/EventHandler/IPoleEventHandler.cs | 23 +++++++++++++++++++++++ src/Pole.EventBus/EventHandler/PoleEventHandler.cs | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus/EventStorage/EventEntity.cs | 17 +++++++++++++++++ src/Pole.EventBus/EventStorage/EventStatus.cs | 13 +++++++++++++ src/Pole.EventBus/EventStorage/IEventStorage.cs | 22 ++++++++++++++++++++++ src/Pole.EventBus/EventStorage/IEventStorageInitializer.cs | 14 ++++++++++++++ src/Pole.EventBus/EventTypeFinder.cs | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus/IBus.cs | 19 +++++++++++++++++++ src/Pole.EventBus/IConsumer.cs | 11 +++++++++++ src/Pole.EventBus/IConsumerContainer.cs | 9 +++++++++ src/Pole.EventBus/IEventBuffer.cs | 13 +++++++++++++ src/Pole.EventBus/IEventTypeFinder.cs | 12 ++++++++++++ src/Pole.EventBus/IGrainID.cs | 9 +++++++++ src/Pole.EventBus/IObserverUnit.cs | 12 ++++++++++++ src/Pole.EventBus/IObserverUnitContainer.cs | 12 ++++++++++++ src/Pole.EventBus/IProducer.cs | 11 +++++++++++ src/Pole.EventBus/IProducerInfoContainer.cs | 10 ++++++++++ src/Pole.EventBus/ObserverUnit.cs | 116 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus/ObserverUnitContainer.cs | 87 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus/Pole.EventBus.csproj | 11 +++++++++++ src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs | 33 +++++++++++++++++++++++++++++++++ src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs | 72 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs | 93 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventBus/Transaction/IDbTransactionAdapter.cs | 17 +++++++++++++++++ src/Pole.EventBus/UnitOfWork/IUnitOfWork.cs | 19 +++++++++++++++++++ src/Pole.EventBus/UnitOfWork/UnitOfWork.cs | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventStorage.PostgreSql/IUnitOfWorkExtensions.cs | 5 +++-- src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj | 1 + src/Pole.EventStorage.PostgreSql/PoleNpgsqlBulkUploader.cs | 2 +- src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs | 4 ++-- src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs | 5 +---- src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs | 2 +- src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs | 2 +- src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs | 7 +++---- src/Pole.Orleans.Provider.EntityframeworkCore/Pole.Orleans.Provider.EntityframeworkCore.csproj | 1 + src/Pole.Sagas.Server/PoleSagasServerOption.cs | 15 +++++++++++++++ src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs | 10 +++++++--- src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs | 2 -- src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs | 1 - test/Pole.Samples.Backet.Api/Program.cs | 1 - 102 files changed, 1457 insertions(+), 1283 deletions(-) create mode 100644 samples/apis/SagasServer/Program.cs create mode 100644 samples/apis/SagasServer/Properties/launchSettings.json create mode 100644 samples/apis/SagasServer/SagasServer.csproj create mode 100644 samples/apis/SagasServer/Startup.cs create mode 100644 samples/apis/SagasServer/appsettings.Development.json create mode 100644 samples/apis/SagasServer/appsettings.json create mode 100644 src/Pole.Core/Domain/IEvent.cs delete mode 100644 src/Pole.Core/EventBus/Bus.cs delete mode 100644 src/Pole.Core/EventBus/Consumer.cs delete mode 100644 src/Pole.Core/EventBus/Event/EventBytesTransport.cs delete mode 100644 src/Pole.Core/EventBus/Event/EventInfoAttribute.cs delete mode 100644 src/Pole.Core/EventBus/Event/IEvent.cs delete mode 100644 src/Pole.Core/EventBus/EventBuffer.cs delete mode 100644 src/Pole.Core/EventBus/EventHandler/IPoleEventHandler.cs delete mode 100644 src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs delete mode 100644 src/Pole.Core/EventBus/EventStorage/EventEntity.cs delete mode 100644 src/Pole.Core/EventBus/EventStorage/EventStatus.cs delete mode 100644 src/Pole.Core/EventBus/EventStorage/IEventStorage.cs delete mode 100644 src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs delete mode 100644 src/Pole.Core/EventBus/IBus.cs delete mode 100644 src/Pole.Core/EventBus/IConsumer.cs delete mode 100644 src/Pole.Core/EventBus/IConsumerContainer.cs delete mode 100644 src/Pole.Core/EventBus/IEventBuffer.cs delete mode 100644 src/Pole.Core/EventBus/IGrainID.cs delete mode 100644 src/Pole.Core/EventBus/IObserverUnit.cs delete mode 100644 src/Pole.Core/EventBus/IObserverUnitContainer.cs delete mode 100644 src/Pole.Core/EventBus/IProducer.cs delete mode 100644 src/Pole.Core/EventBus/IProducerInfoContainer.cs delete mode 100644 src/Pole.Core/EventBus/ObserverUnit.cs delete mode 100644 src/Pole.Core/EventBus/ObserverUnitContainer.cs delete mode 100644 src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs delete mode 100644 src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs delete mode 100644 src/Pole.Core/Processor/PendingMessageRetryProcessor.cs delete mode 100644 src/Pole.Core/Processor/Server/BackgroundServiceBasedProcessorServer.cs delete mode 100644 src/Pole.Core/Serialization/EventTypeFinder.cs delete mode 100644 src/Pole.Core/UnitOfWork/IUnitOfWork.cs delete mode 100644 src/Pole.Core/UnitOfWork/UnitOfWork.cs delete mode 100644 src/Pole.Core/Utils/Abstraction/IEventTypeFinder.cs create mode 100644 src/Pole.EventBus/Bus.cs create mode 100644 src/Pole.EventBus/Consumer.cs create mode 100644 src/Pole.EventBus/Event/EventBytesTransport.cs create mode 100644 src/Pole.EventBus/Event/EventInfoAttribute.cs create mode 100644 src/Pole.EventBus/EventBuffer.cs create mode 100644 src/Pole.EventBus/EventHandler/IPoleEventHandler.cs create mode 100644 src/Pole.EventBus/EventHandler/PoleEventHandler.cs create mode 100644 src/Pole.EventBus/EventStorage/EventEntity.cs create mode 100644 src/Pole.EventBus/EventStorage/EventStatus.cs create mode 100644 src/Pole.EventBus/EventStorage/IEventStorage.cs create mode 100644 src/Pole.EventBus/EventStorage/IEventStorageInitializer.cs create mode 100644 src/Pole.EventBus/EventTypeFinder.cs create mode 100644 src/Pole.EventBus/IBus.cs create mode 100644 src/Pole.EventBus/IConsumer.cs create mode 100644 src/Pole.EventBus/IConsumerContainer.cs create mode 100644 src/Pole.EventBus/IEventBuffer.cs create mode 100644 src/Pole.EventBus/IEventTypeFinder.cs create mode 100644 src/Pole.EventBus/IGrainID.cs create mode 100644 src/Pole.EventBus/IObserverUnit.cs create mode 100644 src/Pole.EventBus/IObserverUnitContainer.cs create mode 100644 src/Pole.EventBus/IProducer.cs create mode 100644 src/Pole.EventBus/IProducerInfoContainer.cs create mode 100644 src/Pole.EventBus/ObserverUnit.cs create mode 100644 src/Pole.EventBus/ObserverUnitContainer.cs create mode 100644 src/Pole.EventBus/Pole.EventBus.csproj create mode 100644 src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs create mode 100644 src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs create mode 100644 src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs create mode 100644 src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs create mode 100644 src/Pole.EventBus/Transaction/IDbTransactionAdapter.cs create mode 100644 src/Pole.EventBus/UnitOfWork/IUnitOfWork.cs create mode 100644 src/Pole.EventBus/UnitOfWork/UnitOfWork.cs diff --git a/Pole.sln b/Pole.sln index 01ffaa1..74fbcd6 100644 --- a/Pole.sln +++ b/Pole.sln @@ -43,7 +43,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SagasTest.Api", "samples\ap EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Server", "src\Pole.Sagas.Server\Pole.Sagas.Server.csproj", "{34ECE24E-0D78-4764-BC54-0CEE61BDB96A}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.Sagas.Client", "src\Pole.Sagas.Client\Pole.Sagas.Client.csproj", "{EED4FEA7-4E20-41B2-9741-55FCA709373E}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Sagas.Client", "src\Pole.Sagas.Client\Pole.Sagas.Client.csproj", "{EED4FEA7-4E20-41B2-9741-55FCA709373E}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SagasServer", "samples\apis\SagasServer\SagasServer.csproj", "{9717D0D7-9288-4B2F-9830-A06E13B1C8C4}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Pole.EventBus", "src\Pole.EventBus\Pole.EventBus.csproj", "{A92E194A-518F-4A93-B346-0FEA50CCD614}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -115,6 +119,14 @@ Global {EED4FEA7-4E20-41B2-9741-55FCA709373E}.Debug|Any CPU.Build.0 = Debug|Any CPU {EED4FEA7-4E20-41B2-9741-55FCA709373E}.Release|Any CPU.ActiveCfg = Release|Any CPU {EED4FEA7-4E20-41B2-9741-55FCA709373E}.Release|Any CPU.Build.0 = Release|Any CPU + {9717D0D7-9288-4B2F-9830-A06E13B1C8C4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9717D0D7-9288-4B2F-9830-A06E13B1C8C4}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9717D0D7-9288-4B2F-9830-A06E13B1C8C4}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9717D0D7-9288-4B2F-9830-A06E13B1C8C4}.Release|Any CPU.Build.0 = Release|Any CPU + {A92E194A-518F-4A93-B346-0FEA50CCD614}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A92E194A-518F-4A93-B346-0FEA50CCD614}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A92E194A-518F-4A93-B346-0FEA50CCD614}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A92E194A-518F-4A93-B346-0FEA50CCD614}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -138,6 +150,8 @@ Global {6138197E-6202-4E1B-9458-3CBEE60A36F9} = {475116FC-DEEC-4255-94E4-AE7B8C85038D} {34ECE24E-0D78-4764-BC54-0CEE61BDB96A} = {9932C965-8B38-4F70-9E43-86DC56860E2B} {EED4FEA7-4E20-41B2-9741-55FCA709373E} = {9932C965-8B38-4F70-9E43-86DC56860E2B} + {9717D0D7-9288-4B2F-9830-A06E13B1C8C4} = {475116FC-DEEC-4255-94E4-AE7B8C85038D} + {A92E194A-518F-4A93-B346-0FEA50CCD614} = {9932C965-8B38-4F70-9E43-86DC56860E2B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {DB0775A3-F293-4043-ADB7-72BAC081E87E} diff --git a/samples/apis/Backet.Api/Backet.Api.csproj b/samples/apis/Backet.Api/Backet.Api.csproj index 4e5d3f8..4a69537 100644 --- a/samples/apis/Backet.Api/Backet.Api.csproj +++ b/samples/apis/Backet.Api/Backet.Api.csproj @@ -49,6 +49,7 @@ + diff --git a/samples/apis/Backet.Api/Controllers/BacketController.cs b/samples/apis/Backet.Api/Controllers/BacketController.cs index 0bc65ea..2540106 100644 --- a/samples/apis/Backet.Api/Controllers/BacketController.cs +++ b/samples/apis/Backet.Api/Controllers/BacketController.cs @@ -17,13 +17,10 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Npgsql; using Orleans; -using Pole.Core.EventBus; -using Pole.Core.EventBus.Event; -using Pole.Core.EventBus.EventHandler; -using Pole.Core.EventBus.EventStorage; using Pole.Core.Serialization; using Pole.Core.UnitOfWork; using Pole.Core.Utils.Abstraction; +using Pole.EventBus; namespace Backet.Api.Controllers { diff --git a/samples/apis/Backet.Api/Domain/Events/BacketCreatedEvent.cs b/samples/apis/Backet.Api/Domain/Events/BacketCreatedEvent.cs index c667e3b..1e8a517 100644 --- a/samples/apis/Backet.Api/Domain/Events/BacketCreatedEvent.cs +++ b/samples/apis/Backet.Api/Domain/Events/BacketCreatedEvent.cs @@ -1,4 +1,4 @@ -using Pole.Core.EventBus.Event; +using Pole.Core.Domain; using System; using System.Collections.Generic; using System.Linq; @@ -7,7 +7,7 @@ using System.Threading.Tasks; namespace Backet.Api.Domain.Event { - [EventInfo(EventName = "Backet")] + [Pole.EventBus.Event.EventInfo(EventName = "Backet")] public class BacketCreatedEvent : IEvent { public string BacketId { get; set; } diff --git a/samples/apis/Backet.Api/EventHandlers/Abstraction/IToNoticeBacketCreatedEventHandler.cs b/samples/apis/Backet.Api/EventHandlers/Abstraction/IToNoticeBacketCreatedEventHandler.cs index 23f75ec..e06612d 100644 --- a/samples/apis/Backet.Api/EventHandlers/Abstraction/IToNoticeBacketCreatedEventHandler.cs +++ b/samples/apis/Backet.Api/EventHandlers/Abstraction/IToNoticeBacketCreatedEventHandler.cs @@ -1,5 +1,5 @@ using Backet.Api.Domain.Event; -using Pole.Core.EventBus.EventHandler; +using Pole.EventBus.EventHandler; using System; using System.Collections.Generic; using System.Linq; diff --git a/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs b/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs index dbfd1e4..ef4ac63 100644 --- a/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs +++ b/samples/apis/Backet.Api/EventHandlers/ToNoticeBacketCreatedEventHandler.cs @@ -1,6 +1,6 @@ using Backet.Api.Domain.Event; using Backet.Api.EventHandlers.Abstraction; -using Pole.Core.EventBus.EventHandler; +using Pole.EventBus.EventHandler; using System; using System.Collections.Generic; using System.Linq; diff --git a/samples/apis/Backet.Api/Grains/AddBacketGrain.cs b/samples/apis/Backet.Api/Grains/AddBacketGrain.cs index ff82a9f..9f69018 100644 --- a/samples/apis/Backet.Api/Grains/AddBacketGrain.cs +++ b/samples/apis/Backet.Api/Grains/AddBacketGrain.cs @@ -8,8 +8,8 @@ using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Backet.Api.Domain.Event; using Pole.Core.UnitOfWork; -using Pole.Core.EventBus.Transaction; -using Pole.Core.EventBus; +using Pole.EventBus.UnitOfWork; +using Pole.EventBus; namespace Backet.Api.Grains { diff --git a/samples/apis/SagasServer/Program.cs b/samples/apis/SagasServer/Program.cs new file mode 100644 index 0000000..61c1b67 --- /dev/null +++ b/samples/apis/SagasServer/Program.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Logging; + +namespace SagasServer +{ + public class Program + { + public static void Main(string[] args) + { + CreateWebHostBuilder(args).Build().Run(); + } + + public static IWebHostBuilder CreateWebHostBuilder(string[] args) => + WebHost.CreateDefaultBuilder(args) + .UseStartup(); + } +} diff --git a/samples/apis/SagasServer/Properties/launchSettings.json b/samples/apis/SagasServer/Properties/launchSettings.json new file mode 100644 index 0000000..68ff505 --- /dev/null +++ b/samples/apis/SagasServer/Properties/launchSettings.json @@ -0,0 +1,27 @@ +{ + "iisSettings": { + "windowsAuthentication": false, + "anonymousAuthentication": true, + "iisExpress": { + "applicationUrl": "http://localhost:53309", + "sslPort": 44342 + } + }, + "profiles": { + "IIS Express": { + "commandName": "IISExpress", + "launchBrowser": true, + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "SagasServer": { + "commandName": "Project", + "launchBrowser": true, + "applicationUrl": "https://localhost:5001;http://localhost:5000", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} \ No newline at end of file diff --git a/samples/apis/SagasServer/SagasServer.csproj b/samples/apis/SagasServer/SagasServer.csproj new file mode 100644 index 0000000..d0095d1 --- /dev/null +++ b/samples/apis/SagasServer/SagasServer.csproj @@ -0,0 +1,15 @@ + + + + netcoreapp3.1 + InProcess + + + + + + + + + + diff --git a/samples/apis/SagasServer/Startup.cs b/samples/apis/SagasServer/Startup.cs new file mode 100644 index 0000000..f42a2e6 --- /dev/null +++ b/samples/apis/SagasServer/Startup.cs @@ -0,0 +1,36 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +namespace SagasServer +{ + public class Startup + { + // This method gets called by the runtime. Use this method to add services to the container. + // For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940 + public void ConfigureServices(IServiceCollection services) + { + services.AddPoleSagasServer(); + } + + // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. + public void Configure(IApplicationBuilder app, IWebHostEnvironment env) + { + if (env.IsDevelopment()) + { + app.UseDeveloperExceptionPage(); + } + + app.Run(async (context) => + { + await context.Response.WriteAsync("Hello World!"); + }); + } + } +} diff --git a/samples/apis/SagasServer/appsettings.Development.json b/samples/apis/SagasServer/appsettings.Development.json new file mode 100644 index 0000000..e203e94 --- /dev/null +++ b/samples/apis/SagasServer/appsettings.Development.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Debug", + "System": "Information", + "Microsoft": "Information" + } + } +} diff --git a/samples/apis/SagasServer/appsettings.json b/samples/apis/SagasServer/appsettings.json new file mode 100644 index 0000000..def9159 --- /dev/null +++ b/samples/apis/SagasServer/appsettings.json @@ -0,0 +1,8 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Warning" + } + }, + "AllowedHosts": "*" +} diff --git a/samples/apis/SagasTest.Api/Startup.cs b/samples/apis/SagasTest.Api/Startup.cs index c51f30a..088635c 100644 --- a/samples/apis/SagasTest.Api/Startup.cs +++ b/samples/apis/SagasTest.Api/Startup.cs @@ -33,13 +33,13 @@ namespace SagasTest.Api services.AddControllers(); services.AddPole(config => { - config.AddRabbitMQ(option => - { - option.Hosts = new string[1] { Configuration["RabbitmqConfig:HostAddress"] }; - option.Password = Configuration["RabbitmqConfig:HostPassword"]; - option.UserName = Configuration["RabbitmqConfig:HostUserName"]; - }); - config.AddEntityFrameworkEventStorage(); + //config.AddRabbitMQ(option => + //{ + // option.Hosts = new string[1] { Configuration["RabbitmqConfig:HostAddress"] }; + // option.Password = Configuration["RabbitmqConfig:HostPassword"]; + // option.UserName = Configuration["RabbitmqConfig:HostUserName"]; + //}); + //config.AddEntityFrameworkEventStorage(); config.AddSagas(option=> { option.ServiceName = "SagasTest"; }); diff --git a/src/Pole.Core/Domain/Entity.cs b/src/Pole.Core/Domain/Entity.cs index 2a7e1fd..9baed89 100644 --- a/src/Pole.Core/Domain/Entity.cs +++ b/src/Pole.Core/Domain/Entity.cs @@ -1,5 +1,4 @@ -using Pole.Core.EventBus.Event; -using System; +using System; using System.Collections.Generic; using System.Text; diff --git a/src/Pole.Core/Domain/IEvent.cs b/src/Pole.Core/Domain/IEvent.cs new file mode 100644 index 0000000..11a0ae5 --- /dev/null +++ b/src/Pole.Core/Domain/IEvent.cs @@ -0,0 +1,10 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.Domain +{ + public interface IEvent + { + } +} diff --git a/src/Pole.Core/EventBus/Bus.cs b/src/Pole.Core/EventBus/Bus.cs deleted file mode 100644 index 0651532..0000000 --- a/src/Pole.Core/EventBus/Bus.cs +++ /dev/null @@ -1,65 +0,0 @@ - -using Pole.Core.EventBus.Event; -using Pole.Core.EventBus.EventStorage; -using Pole.Core.EventBus.Transaction; -using Pole.Core.Serialization; -using Pole.Core.Utils.Abstraction; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus -{ - class Bus : IBus - { - private readonly IEventTypeFinder eventTypeFinder; - private readonly ISerializer serializer; - private readonly ISnowflakeIdGenerator snowflakeIdGenerator; - private readonly IEventStorage eventStorage; - public IDbTransactionAdapter Transaction { get; set; } - - public IServiceProvider ServiceProvider { get; } - public BlockingCollection PrePublishEventBuffer { get; } = new BlockingCollection(new ConcurrentQueue()); - - public Bus(IServiceProvider serviceProvider, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator, IEventStorage eventStorage) - { - ServiceProvider = serviceProvider; - this.eventTypeFinder = eventTypeFinder; - this.serializer = serializer; - this.snowflakeIdGenerator = snowflakeIdGenerator; - this.eventStorage = eventStorage; - } - public async Task Publish(object @event, CancellationToken cancellationToken = default) - { - var eventType = @event.GetType(); - var eventTypeCode = eventTypeFinder.GetCode(eventType); - var eventId = snowflakeIdGenerator.NextId(); - var eventContent = serializer.Serialize(@event, eventType); - var eventEntity = new EventEntity - { - Added = DateTime.UtcNow, - Content = eventContent, - ExpiresAt = null, - Id = eventId, - Name = eventTypeCode, - Retries = 0, - StatusName = nameof(EventStatus.Pending) - }; - if (Transaction?.DbTransaction == null) - { - await eventStorage.StoreMessage(eventEntity); - } - else - { - await eventStorage.StoreMessage(eventEntity, Transaction.DbTransaction); - - } - PrePublishEventBuffer.Add(eventEntity); - - return true; - } - } -} diff --git a/src/Pole.Core/EventBus/Consumer.cs b/src/Pole.Core/EventBus/Consumer.cs deleted file mode 100644 index 15d2857..0000000 --- a/src/Pole.Core/EventBus/Consumer.cs +++ /dev/null @@ -1,33 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus -{ - public abstract class Consumer : IConsumer - { - readonly List> eventHandlers; - readonly List, Task>> batchEventHandlers; - public Consumer( - List> eventHandlers, - List, Task>> batchEventHandlers) - { - this.eventHandlers = eventHandlers; - this.batchEventHandlers = batchEventHandlers; - } - public void AddHandler(Func func) - { - eventHandlers.Add(func); - } - public Task Notice(byte[] bytes) - { - return Task.WhenAll(eventHandlers.Select(func => func(bytes))); - } - - public Task Notice(List list) - { - return Task.WhenAll(batchEventHandlers.Select(func => func(list))); - } - } -} diff --git a/src/Pole.Core/EventBus/Event/EventBytesTransport.cs b/src/Pole.Core/EventBus/Event/EventBytesTransport.cs deleted file mode 100644 index 48cd12b..0000000 --- a/src/Pole.Core/EventBus/Event/EventBytesTransport.cs +++ /dev/null @@ -1,61 +0,0 @@ -using Pole.Core.Serialization; -using Pole.Core.Utils; -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Core.EventBus.Event -{ - public readonly struct EventBytesTransport - { - public EventBytesTransport(string eventCode, string eventId, byte[] eventBytes) - { - EventTypeCode = eventCode; - EventBytes = eventBytes; - EventId = eventId; - } - /// - /// 每个类型的Event 全局唯一 - /// - public string EventId { get; } - /// - /// 事件TypeCode - /// - public string EventTypeCode { get; } - /// - /// 事件本身的bytes - /// - public byte[] EventBytes { get; } - public byte[] GetBytes() - { - var eventTypeBytes = Encoding.UTF8.GetBytes(EventTypeCode); - var eventIdBytes = Encoding.UTF8.GetBytes(EventId); - using var ms = new PooledMemoryStream(); - ms.WriteByte((byte)TransportType.Event); - ms.Write(BitConverter.GetBytes((ushort)eventTypeBytes.Length)); - ms.Write(BitConverter.GetBytes((ushort)eventIdBytes.Length)); - ms.Write(BitConverter.GetBytes(EventBytes.Length)); - ms.Write(eventTypeBytes); - ms.Write(eventIdBytes); - ms.Write(EventBytes); - return ms.ToArray(); - } - public static (bool success, EventBytesTransport transport) FromBytes(byte[] bytes) - { - if (bytes[0] == (byte)TransportType.Event) - { - var bytesSpan = bytes.AsSpan(); - var eventTypeCodeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort))); - var eventIdLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort))); - var eventBytesLength = BitConverter.ToInt32(bytesSpan.Slice(1 + 2 * sizeof(ushort), sizeof(int))); - var skipLength = 2 * sizeof(ushort) + 1 + sizeof(int); - return (true, new EventBytesTransport( - Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeCodeLength)), - Encoding.UTF8.GetString(bytesSpan.Slice(skipLength + eventTypeCodeLength, eventIdLength)), - bytesSpan.Slice(skipLength + eventTypeCodeLength + eventIdLength, eventBytesLength).ToArray() - )); - } - return (false, default); - } - } -} diff --git a/src/Pole.Core/EventBus/Event/EventInfoAttribute.cs b/src/Pole.Core/EventBus/Event/EventInfoAttribute.cs deleted file mode 100644 index b5e08a3..0000000 --- a/src/Pole.Core/EventBus/Event/EventInfoAttribute.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Core.EventBus.Event -{ - [AttributeUsage(AttributeTargets.Class)] - public class EventInfoAttribute: Attribute - { - public string EventName { get; set; } - } -} diff --git a/src/Pole.Core/EventBus/Event/IEvent.cs b/src/Pole.Core/EventBus/Event/IEvent.cs deleted file mode 100644 index 1b80f3d..0000000 --- a/src/Pole.Core/EventBus/Event/IEvent.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Core.EventBus.Event -{ - public interface IEvent - { - } -} diff --git a/src/Pole.Core/EventBus/EventBuffer.cs b/src/Pole.Core/EventBus/EventBuffer.cs deleted file mode 100644 index 9a2e8d6..0000000 --- a/src/Pole.Core/EventBus/EventBuffer.cs +++ /dev/null @@ -1,143 +0,0 @@ -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Pole.Core.EventBus.Event; -using Pole.Core.EventBus.EventStorage; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; - -namespace Pole.Core.EventBus -{ - class EventBuffer : IEventBuffer - { - readonly BufferBlock buffer = new BufferBlock(); - private int autoConsuming = 0; - private readonly ILogger logger; - /// - /// 批量数据处理每次处理的最大数据量 - /// - private readonly int maxBatchSize = 10000; - /// - /// 批量数据接收的最大延时 - /// - private readonly int maxMillisecondsDelay = 2000; - private readonly IProducerInfoContainer producerContainer; - private readonly IProducer producer; - private readonly IEventStorage eventStorage; - private readonly PoleOptions options; - private Task waitToReadTask; - public EventBuffer(ILogger logger, IProducerInfoContainer producerContainer, IProducer producer, IEventStorage eventStorage, IOptions options) - { - this.logger = logger; - this.producerContainer = producerContainer; - this.producer = producer; - this.eventStorage = eventStorage; - this.options = options.Value; - } - public async Task AddAndRun(EventEntity eventEntity) - { - if (!buffer.Post(eventEntity)) - return await buffer.SendAsync(eventEntity); - if (autoConsuming == 0) - ActiveAutoExecute(); - - return true; - } - private void ActiveAutoExecute() - { - if (autoConsuming == 0) - ThreadPool.QueueUserWorkItem(ActiveConsumer); - async void ActiveConsumer(object state) - { - if (Interlocked.CompareExchange(ref autoConsuming, 1, 0) == 0) - { - try - { - while (await WaitToReadAsync()) - { - try - { - await Execute(); - } - catch (Exception ex) - { - logger.LogError(ex, ex.Message); - } - } - } - finally - { - Interlocked.Exchange(ref autoConsuming, 0); - } - } - } - } - public async Task WaitToReadAsync() - { - waitToReadTask = buffer.OutputAvailableAsync(); - return await waitToReadTask; - - } - public async Task Execute() - { - if (waitToReadTask.IsCompletedSuccessfully && waitToReadTask.Result) - { - var dataList = new List(); - var startTime = DateTimeOffset.UtcNow; - while (buffer.TryReceive(out var value)) - { - dataList.Add(value); - if (dataList.Count > maxBatchSize) - { - break; - } - else if ((DateTimeOffset.UtcNow - startTime).TotalMilliseconds > maxMillisecondsDelay) - { - break; - } - } - if (dataList.Count > 0) - { - await ExecuteCore(dataList); - } - - } - } - private async Task ExecuteCore(List eventEntities) - { - logger.LogTrace($"Begin ExecuteCore Count:{eventEntities.Count} "); - var events = eventEntities.Select(entity => - { - var eventContentBytes = Encoding.UTF8.GetBytes(entity.Content); - var bytesTransport = new EventBytesTransport(entity.Name, entity.Id, eventContentBytes); - var bytes = bytesTransport.GetBytes(); - var targetName = producerContainer.GetTargetName(entity.Name); - entity.StatusName = nameof(EventStatus.Published); - entity.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); - return (targetName, bytes); - }); - eventEntities.ForEach(entity => - { - entity.StatusName = nameof(EventStatus.Published); - entity.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); - }); - logger.LogTrace($"Begin BulkPublish {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} "); - await producer.BulkPublish(events); - logger.LogTrace($"Begin BulkPublish {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} "); - if (eventEntities.Count > 10) - { - await eventStorage.BulkChangePublishStateAsync(eventEntities); - } - else - { - await eventStorage.ChangePublishStateAsync(eventEntities); - } - - logger.LogTrace($"End ExecuteCore {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} Count:{eventEntities.Count} "); - } - } -} diff --git a/src/Pole.Core/EventBus/EventHandler/IPoleEventHandler.cs b/src/Pole.Core/EventBus/EventHandler/IPoleEventHandler.cs deleted file mode 100644 index 38b77f4..0000000 --- a/src/Pole.Core/EventBus/EventHandler/IPoleEventHandler.cs +++ /dev/null @@ -1,23 +0,0 @@ -using Orleans; -using Pole.Core.EventBus.Event; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus.EventHandler -{ - public interface IPoleEventHandler : IPoleEventHandler - { - Task EventHandle(TEvent @event); - } - public interface IPoleBulkEventsHandler : IPoleEventHandler - { - Task BulkEventsHandle(List events); - } - public interface IPoleEventHandler : IGrainWithStringKey - { - public Task Invoke(EventBytesTransport transport); - public Task Invoke(List transports); - } -} diff --git a/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs b/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs deleted file mode 100644 index 73f2f35..0000000 --- a/src/Pole.Core/EventBus/EventHandler/PoleEventHandler.cs +++ /dev/null @@ -1,96 +0,0 @@ -using Microsoft.Extensions.Logging; -using Orleans.Concurrency; -using Pole.Core.EventBus.Event; -using Pole.Core.Serialization; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; -using System.Reflection.Emit; -using System.Linq.Expressions; -using System.Linq; -using Pole.Core.Exceptions; -using Orleans; -using Pole.Core.Utils.Abstraction; - -namespace Pole.Core.EventBus.EventHandler -{ - /// - /// - /// - public abstract class PoleEventHandler : Grain - { - private IEventTypeFinder eventTypeFinder; - private ISerializer serializer; - private ILogger logger; - private Type grainType; - - public PoleEventHandler() - { - grainType = GetType(); - } - public override async Task OnActivateAsync() - { - await base.OnActivateAsync(); - await DependencyInjection(); - } - protected virtual Task DependencyInjection() - { - //ConfigOptions = ServiceProvider.GetOptionsByName(typeof(MainGrain).FullName); - serializer = ServiceProvider.GetService(); - eventTypeFinder = ServiceProvider.GetService(); - logger = (ILogger)ServiceProvider.GetService(typeof(ILogger<>).MakeGenericType(grainType)); - return Task.CompletedTask; - } - - public Task Invoke(EventBytesTransport transport) - { - var eventType = eventTypeFinder.FindType(transport.EventTypeCode); - - var eventObj = serializer.Deserialize(transport.EventBytes, eventType); - if (this is IPoleEventHandler handler) - { - var result = handler.EventHandle((TEvent)eventObj); - logger.LogTrace($"{nameof(PoleEventHandler)} Invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObj)); - return result; - } - else - { - throw new EventHandlerImplementedNotRightException(nameof(handler.EventHandle), eventType.Name, this.GetType().FullName); - } - } - - public async Task Invoke(List transports) - { - if (transports.Count() != 0) - { - var firstTransport = transports.First(); - var eventType = eventTypeFinder.FindType(firstTransport.EventTypeCode); - var eventObjs = transports.Select(transport => serializer.Deserialize(firstTransport.EventBytes, eventType)).Select(@event => (TEvent)@event).ToList(); - if (this is IPoleBulkEventsHandler batchHandler) - { - await batchHandler.BulkEventsHandle(eventObjs); - logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(batchHandler.BulkEventsHandle), serializer.Serialize(eventObjs)); - return; - } - else if (this is IPoleEventHandler handler) - { - var handleTasks = eventObjs.Select(m => handler.EventHandle(m)); - await Task.WhenAll(handleTasks); - logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObjs)); - return; - } - else - { - throw new EventHandlerImplementedNotRightException(nameof(handler.EventHandle), eventType.Name, this.GetType().FullName); - } - } - else - { - if (logger.IsEnabled(LogLevel.Information)) - logger.LogInformation($"{nameof(EventBytesTransport.FromBytes)} failed"); - } - } - } -} diff --git a/src/Pole.Core/EventBus/EventStorage/EventEntity.cs b/src/Pole.Core/EventBus/EventStorage/EventEntity.cs deleted file mode 100644 index 02284ff..0000000 --- a/src/Pole.Core/EventBus/EventStorage/EventEntity.cs +++ /dev/null @@ -1,17 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Core.EventBus.EventStorage -{ - public class EventEntity - { - public string Id { get; set; } - public string Name { get; set; } - public string Content { get; set; } - public DateTime Added { get; set; } - public DateTime? ExpiresAt { get; set; } - public int Retries { get; set; } - public string StatusName { get; set; } - } -} diff --git a/src/Pole.Core/EventBus/EventStorage/EventStatus.cs b/src/Pole.Core/EventBus/EventStorage/EventStatus.cs deleted file mode 100644 index 4aec229..0000000 --- a/src/Pole.Core/EventBus/EventStorage/EventStatus.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Core.EventBus.EventStorage -{ - public enum EventStatus - { - Failed = -1, - Pending = 0, - Published = 1 - } -} diff --git a/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs b/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs deleted file mode 100644 index 5f45b0f..0000000 --- a/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs +++ /dev/null @@ -1,22 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus.EventStorage -{ - public interface IEventStorage - { - Task ChangePublishStateAsync(EventEntity message, EventStatus state); - Task ChangePublishStateAsync(IEnumerable messages); - Task BulkChangePublishStateAsync(IEnumerable events); - - Task StoreMessage(EventEntity eventEntity, object dbTransaction = null); - - Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, - CancellationToken token = default); - - Task> GetMessagesOfNeedRetry(); - } -} diff --git a/src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs b/src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs deleted file mode 100644 index a0d3a4e..0000000 --- a/src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs +++ /dev/null @@ -1,14 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus.EventStorage -{ - public interface IEventStorageInitializer - { - Task InitializeAsync(CancellationToken cancellationToken); - string GetTableName(); - } -} diff --git a/src/Pole.Core/EventBus/IBus.cs b/src/Pole.Core/EventBus/IBus.cs deleted file mode 100644 index 6906448..0000000 --- a/src/Pole.Core/EventBus/IBus.cs +++ /dev/null @@ -1,19 +0,0 @@ -using Pole.Core.EventBus.EventStorage; -using Pole.Core.EventBus.Transaction; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus -{ - public interface IBus - { - IServiceProvider ServiceProvider { get; } - IDbTransactionAdapter Transaction { get; set; } - BlockingCollection PrePublishEventBuffer { get; } - Task Publish(object @event, CancellationToken cancellationToken = default); - } -} diff --git a/src/Pole.Core/EventBus/IConsumer.cs b/src/Pole.Core/EventBus/IConsumer.cs deleted file mode 100644 index d46a039..0000000 --- a/src/Pole.Core/EventBus/IConsumer.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System.Collections.Generic; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus -{ - public interface IConsumer - { - Task Notice(byte[] bytes); - Task Notice(List list); - } -} diff --git a/src/Pole.Core/EventBus/IConsumerContainer.cs b/src/Pole.Core/EventBus/IConsumerContainer.cs deleted file mode 100644 index 954552e..0000000 --- a/src/Pole.Core/EventBus/IConsumerContainer.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System.Collections.Generic; - -namespace Pole.Core.EventBus -{ - public interface IConsumerContainer - { - List GetConsumers(); - } -} diff --git a/src/Pole.Core/EventBus/IEventBuffer.cs b/src/Pole.Core/EventBus/IEventBuffer.cs deleted file mode 100644 index b278016..0000000 --- a/src/Pole.Core/EventBus/IEventBuffer.cs +++ /dev/null @@ -1,13 +0,0 @@ -using Pole.Core.EventBus.EventStorage; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus -{ - public interface IEventBuffer - { - Task AddAndRun(EventEntity eventEntity); - } -} diff --git a/src/Pole.Core/EventBus/IGrainID.cs b/src/Pole.Core/EventBus/IGrainID.cs deleted file mode 100644 index ea6edf7..0000000 --- a/src/Pole.Core/EventBus/IGrainID.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; - -namespace Pole.Core.EventBus -{ - public interface IGrainID - { - Type EventHandlerType { get; } - } -} diff --git a/src/Pole.Core/EventBus/IObserverUnit.cs b/src/Pole.Core/EventBus/IObserverUnit.cs deleted file mode 100644 index d041f3a..0000000 --- a/src/Pole.Core/EventBus/IObserverUnit.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus -{ - public interface IObserverUnit : IGrainID - { - Func GetEventHandler(); - Func, Task> GetBatchEventHandler(); - } -} diff --git a/src/Pole.Core/EventBus/IObserverUnitContainer.cs b/src/Pole.Core/EventBus/IObserverUnitContainer.cs deleted file mode 100644 index 2f0cc91..0000000 --- a/src/Pole.Core/EventBus/IObserverUnitContainer.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; - -namespace Pole.Core.EventBus -{ - public interface IObserverUnitContainer - { - List> GetUnits(string observerName); - List GetUnits(string observerName); - void Register(string observerName,IGrainID followUnit); - } -} diff --git a/src/Pole.Core/EventBus/IProducer.cs b/src/Pole.Core/EventBus/IProducer.cs deleted file mode 100644 index e215757..0000000 --- a/src/Pole.Core/EventBus/IProducer.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System.Collections.Generic; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus -{ - public interface IProducer - { - ValueTask Publish(string targetName, byte[] bytes); - ValueTask BulkPublish(IEnumerable<(string,byte[])> events); - } -} diff --git a/src/Pole.Core/EventBus/IProducerInfoContainer.cs b/src/Pole.Core/EventBus/IProducerInfoContainer.cs deleted file mode 100644 index 7838ca3..0000000 --- a/src/Pole.Core/EventBus/IProducerInfoContainer.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus -{ - public interface IProducerInfoContainer - { - string GetTargetName(string typeName); - } -} diff --git a/src/Pole.Core/EventBus/ObserverUnit.cs b/src/Pole.Core/EventBus/ObserverUnit.cs deleted file mode 100644 index 05d3978..0000000 --- a/src/Pole.Core/EventBus/ObserverUnit.cs +++ /dev/null @@ -1,116 +0,0 @@ -using Microsoft.Extensions.Logging; -using Orleans; -using Pole.Core.Serialization; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; -using System.Linq; -using Pole.Core.EventBus.Event; -using Orleans.Concurrency; -using System.Collections.Concurrent; -using System.Linq.Expressions; -using Pole.Core.EventBus.EventHandler; -using Pole.Core.Utils.Abstraction; - -namespace Pole.Core.EventBus -{ - public class ObserverUnit : IObserverUnit - { - readonly IServiceProvider serviceProvider; - readonly ISerializer serializer; - readonly IEventTypeFinder typeFinder; - readonly IClusterClient clusterClient; - Func eventHandler; - Func, Task> batchEventHandler; - protected ILogger Logger { get; private set; } - public Type EventHandlerType { get; } - - public ObserverUnit(IServiceProvider serviceProvider, Type eventHandlerType) - { - this.serviceProvider = serviceProvider; - clusterClient = serviceProvider.GetService(); - serializer = serviceProvider.GetService(); - typeFinder = serviceProvider.GetService(); - Logger = serviceProvider.GetService>>(); - EventHandlerType = eventHandlerType; - } - public static ObserverUnit From(IServiceProvider serviceProvider) where Grain : Orleans.Grain - { - return new ObserverUnit(serviceProvider, typeof(Grain)); - } - - public Func GetEventHandler() - { - return eventHandler; - } - - public Func, Task> GetBatchEventHandler() - { - return batchEventHandler; - } - - public void Observer() - { - if (!typeof(IPoleEventHandler).IsAssignableFrom(EventHandlerType)) - throw new NotSupportedException($"{EventHandlerType.FullName} must inheritance from PoleEventHandler"); - eventHandler = EventHandler; - batchEventHandler = BatchEventHandler; - //内部函数 - Task EventHandler(byte[] bytes) - { - var (success, transport) = EventBytesTransport.FromBytes(bytes); - if (success) - { - return GetObserver(EventHandlerType, transport.EventId).Invoke(transport); - } - else - { - if (Logger.IsEnabled(LogLevel.Error)) - Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event"); - } - return Task.CompletedTask; - } - Task BatchEventHandler(List list) - { - var transports = list.Select(bytes => - { - var (success, transport) = EventBytesTransport.FromBytes(bytes); - if (!success) - { - if (Logger.IsEnabled(LogLevel.Error)) - Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event"); - } - return (success, transport); - }).Where(o => o.success) - .Select(o => (o.transport)) - .ToList(); - // 批量处理的时候 grain Id 取第一个 event的id - return GetObserver(EventHandlerType, transports.First().EventId).Invoke(transports); - } - } - static readonly ConcurrentDictionary> _observerGeneratorDict = new ConcurrentDictionary>(); - private IPoleEventHandler GetObserver(Type ObserverType, string primaryKey) - { - var func = _observerGeneratorDict.GetOrAdd(ObserverType, key => - { - var clientType = typeof(IClusterClient); - var clientParams = Expression.Parameter(clientType, "client"); - var primaryKeyParams = Expression.Parameter(typeof(string), "primaryKey"); - var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix"); - var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) }); - var body = Expression.Call(method.MakeGenericMethod(ObserverType), clientParams, primaryKeyParams, grainClassNamePrefixParams); - return Expression.Lambda>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile(); - }); - return func(clusterClient, primaryKey, null); - } - } - public static class ClusterClientExtensions - { - public static TGrainInterface GetGrain(IClusterClient client, string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey - { - return client.GetGrain(primaryKey, grainClassNamePrefix); - } - } -} diff --git a/src/Pole.Core/EventBus/ObserverUnitContainer.cs b/src/Pole.Core/EventBus/ObserverUnitContainer.cs deleted file mode 100644 index 2af2997..0000000 --- a/src/Pole.Core/EventBus/ObserverUnitContainer.cs +++ /dev/null @@ -1,87 +0,0 @@ -using Microsoft.Extensions.Logging; -using Pole.Core.Utils; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Text; -using Microsoft.Extensions.DependencyInjection; -using Pole.Core.EventBus.EventHandler; -using System.Linq; -using Pole.Core.Exceptions; -using Pole.Core.EventBus.Event; - -namespace Pole.Core.EventBus -{ - public class ObserverUnitContainer : IObserverUnitContainer - { - readonly ConcurrentDictionary> unitDict = new ConcurrentDictionary>(); - public ObserverUnitContainer(IServiceProvider serviceProvider) - { - var eventHandlerList = new List<(Type, EventInfoAttribute)>(); - foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService>())) - { - foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract && !typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m))) - { - var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType); - var basePoleEventHandlerInterface= eventHandlerInterface.GetInterfaces().FirstOrDefault(m=>m.IsGenericType); - - if (basePoleEventHandlerInterface == null) - { - throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler"); - } - var eventType= basePoleEventHandlerInterface.GetGenericArguments().FirstOrDefault(); - if (eventType == null) - { - throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler"); - } - var attribute = eventType.GetCustomAttributes(typeof(EventInfoAttribute), false).FirstOrDefault(); - - if (attribute != null) - { - eventHandlerList.Add((eventHandlerInterface, (EventInfoAttribute)attribute)); - } - else - { - throw new PoleEventHandlerImplementException("Can not found EventHandlerAttribute in PoleEventHandler"); - } - } - } - foreach (var eventHandler in eventHandlerList) - { - var unitType = typeof(ObserverUnit<>).MakeGenericType(new Type[] { typeof(string) }); - var unit = (ObserverUnit)Activator.CreateInstance(unitType, serviceProvider, eventHandler.Item1); - unit.Observer(); - Register(eventHandler.Item2.EventName, unit); - } - } - public List> GetUnits(string observerName) - { - if (unitDict.TryGetValue(observerName, out var units)) - return units.Select(m => (IObserverUnit)m).ToList(); - else - throw new UnfindObserverUnitException(observerName); - } - public List GetUnits(string observerName) - { - if (unitDict.TryGetValue(observerName, out var unit)) - { - return unit; - } - else - throw new UnfindObserverUnitException(observerName); - } - - public void Register(string observerName, IGrainID observerUnit) - { - if (unitDict.TryGetValue(observerName, out List units)) - { - units.Add(observerUnit); - } - if (!unitDict.TryAdd(observerName, new List { observerUnit })) - { - throw new ObserverUnitRepeatedException(observerUnit.EventHandlerType.FullName); - } - } - - } -} diff --git a/src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs b/src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs deleted file mode 100644 index 86a5ede..0000000 --- a/src/Pole.Core/EventBus/Transaction/IDbTransactionAdapter.cs +++ /dev/null @@ -1,17 +0,0 @@ -using Pole.Core.EventBus.EventStorage; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Core.EventBus.Transaction -{ - public interface IDbTransactionAdapter : IDisposable - { - Task CommitAsync(CancellationToken cancellationToken = default); - Task RollbackAsync(CancellationToken cancellationToken = default); - object DbTransaction { get; set; } - } -} diff --git a/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs b/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs index 299dfdf..49dade1 100644 --- a/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs +++ b/src/Pole.Core/Extensions/PoleServiceCollectionExtensions.cs @@ -1,12 +1,9 @@ using Microsoft.Extensions.DependencyInjection; using Pole.Core; using Pole.Core.Channels; -using Pole.Core.EventBus; using Pole.Core.Processor; -using Pole.Core.Processor.Server; using Pole.Core.Query; using Pole.Core.Serialization; -using Pole.Core.UnitOfWork; using Pole.Core.Utils; using Pole.Core.Utils.Abstraction; using System; @@ -24,14 +21,9 @@ namespace Microsoft.Extensions.DependencyInjection { services.Configure(option => { }); } - services.AddSingleton(); - services.AddSingleton(); services.AddTransient(typeof(IMpscChannel<>), typeof(MpscChannel<>)); - services.AddScoped(); - services.AddScoped(); services.AddSingleton(); services.AddSingleton(); - services.AddSingleton(); services.AddSingleton(); using (var serviceProvider = services.BuildServiceProvider()) { @@ -41,11 +33,6 @@ namespace Microsoft.Extensions.DependencyInjection var queryRegister = serviceProvider.GetService(); queryRegister.Register(services, ServiceLifetime.Scoped); } - - services.AddSingleton(); - services.AddSingleton(); - services.AddHostedService(); - config(startupOption); return services; } diff --git a/src/Pole.Core/Grains/PoleGrain.cs b/src/Pole.Core/Grains/PoleGrain.cs index 1dadcbe..f8a3030 100644 --- a/src/Pole.Core/Grains/PoleGrain.cs +++ b/src/Pole.Core/Grains/PoleGrain.cs @@ -1,6 +1,5 @@ using Orleans; using Pole.Core.Domain; -using Pole.Core.EventBus.Event; using System; using System.Collections.Generic; using System.Text; diff --git a/src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs b/src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs deleted file mode 100644 index 08f19a7..0000000 --- a/src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs +++ /dev/null @@ -1,70 +0,0 @@ -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Pole.Core.EventBus.EventStorage; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.Core.Processor -{ - class ExpiredEventsCollectorProcessor : IProcessor - { - private readonly ILogger logger; - private readonly IEventStorageInitializer initializer; - private readonly IEventStorage eventstorage; - private readonly PoleOptions poleOptions; - - private const int ItemBatch = 1000; - private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); - private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); - - public string Name => nameof(PendingMessageRetryProcessor); - - public ExpiredEventsCollectorProcessor( - ILogger logger, - IEventStorageInitializer initializer, - IEventStorage eventstorage, - IOptions poleOptions) - { - this.logger = logger; - this.initializer = initializer; - this.eventstorage = eventstorage; - this.poleOptions = poleOptions.Value; - } - - public async Task Process(ProcessingContext context) - { - try - { - var tables = new[] { initializer.GetTableName() }; - - foreach (var table in tables) - { - logger.LogDebug($"Collecting expired data from table: {table}"); - - int deletedCount; - var time = DateTime.UtcNow; - do - { - deletedCount = await eventstorage.DeleteExpiresAsync(table, time, ItemBatch, context.CancellationToken); - - if (deletedCount != 0) - { - await Task.Delay(poleOptions.ExpiredEventsPreBulkDeleteDelaySeconds * 1000); - } - } while (deletedCount != 0); - } - } - catch (Exception ex) - { - logger.LogError(ex, $"{nameof(ExpiredEventsCollectorProcessor)} Process Error"); - } - finally - { - - await Task.Delay(poleOptions.ExpiredEventsCollectIntervalSeconds * 1000); - } - } - } -} diff --git a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs b/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs deleted file mode 100644 index 6e928b2..0000000 --- a/src/Pole.Core/Processor/PendingMessageRetryProcessor.cs +++ /dev/null @@ -1,91 +0,0 @@ -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using Pole.Core.EventBus; -using Pole.Core.EventBus.Event; -using Pole.Core.EventBus.EventStorage; -using Pole.Core.Serialization; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading.Tasks; - -namespace Pole.Core.Processor -{ - class PendingMessageRetryProcessor : ProcessorBase - { - private readonly IEventStorage eventStorage; - private readonly PoleOptions options; - private readonly IProducerInfoContainer producerContainer; - private readonly ISerializer serializer; - private readonly ILogger logger; - private readonly ProducerOptions producerOptions; - private readonly IProducer producer; - private readonly IEventBuffer eventBuffer; - public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions options, ILogger logger, - IProducerInfoContainer producerContainer, ISerializer serializer, IOptions producerOptions, IProducer producer, IEventBuffer eventBuffer) - { - this.eventStorage = eventStorage; - this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected"); - this.logger = logger; - this.producerContainer = producerContainer; - this.serializer = serializer; - this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected"); - this.producer = producer; - this.eventBuffer = eventBuffer; - } - public override string Name => nameof(PendingMessageRetryProcessor); - - - public override async Task Process(ProcessingContext context) - { - try - { - await ProcessInternal(); - } - catch (Exception ex) - { - logger.LogError(ex, $"{nameof(PendingMessageRetryProcessor)} Process Error"); - } - finally - { - await Task.Delay(options.PendingMessageRetryIntervalSeconds * 1000); - } - } - public async Task ProcessInternal() - { - var now = DateTime.UtcNow; - var pendingMessages = await eventStorage.GetMessagesOfNeedRetry(); - - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug($"{nameof(PendingMessageRetryProcessor)} pendingMessages count:{pendingMessages.Count()}"); - } - foreach (var pendingMessage in pendingMessages) - { - var eventContentBytes = Encoding.UTF8.GetBytes(pendingMessage.Content); - var bytesTransport = new EventBytesTransport(pendingMessage.Name, pendingMessage.Id, eventContentBytes); - var bytes = bytesTransport.GetBytes(); - if (pendingMessage.Retries > producerOptions.MaxFailedRetryCount) - { - pendingMessage.StatusName = nameof(EventStatus.Failed); - continue; - } - pendingMessage.Retries++; - var targetName = producerContainer.GetTargetName(pendingMessage.Name); - await producer.Publish(targetName, bytes); - } - if (pendingMessages.Count() > 0) - { - if (pendingMessages.Count() > 10) - { - await eventStorage.BulkChangePublishStateAsync(pendingMessages); - } - else - { - await eventStorage.ChangePublishStateAsync(pendingMessages); - } - } - } - } -} diff --git a/src/Pole.Core/Processor/Server/BackgroundServiceBasedProcessorServer.cs b/src/Pole.Core/Processor/Server/BackgroundServiceBasedProcessorServer.cs deleted file mode 100644 index fcba568..0000000 --- a/src/Pole.Core/Processor/Server/BackgroundServiceBasedProcessorServer.cs +++ /dev/null @@ -1,47 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Pole.Core.EventBus.EventStorage; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace Pole.Core.Processor.Server -{ - public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer - { - private readonly IServiceProvider _serviceProvider; - private Task _compositeTask; - - public BackgroundServiceBasedProcessorServer(IServiceProvider serviceProvider) - { - _serviceProvider = serviceProvider; - } - public async Task Start(CancellationToken stoppingToken) - { - var eventStorageInitializer = _serviceProvider.GetService(); - await eventStorageInitializer.InitializeAsync(stoppingToken); - - ProcessingContext processingContext = new ProcessingContext(stoppingToken); - List loopProcessors = new List(); - var innerProcessors = _serviceProvider.GetServices(); - var loggerFactory = _serviceProvider.GetService(); - foreach (var innerProcessor in innerProcessors) - { - LoopProcessor processor = new LoopProcessor(innerProcessor, loggerFactory); - loopProcessors.Add(processor); - } - var tasks = loopProcessors.Select(p => p.Process(processingContext)); - - _compositeTask = Task.WhenAll(tasks); - await _compositeTask; - } - protected override Task ExecuteAsync(CancellationToken stoppingToken) - { - return Start(stoppingToken); - } - } -} diff --git a/src/Pole.Core/Serialization/EventTypeFinder.cs b/src/Pole.Core/Serialization/EventTypeFinder.cs deleted file mode 100644 index 4871cc2..0000000 --- a/src/Pole.Core/Serialization/EventTypeFinder.cs +++ /dev/null @@ -1,67 +0,0 @@ -using Microsoft.Extensions.Logging; -using Pole.Core.EventBus.Event; -using Pole.Core.Exceptions; -using Pole.Core.Utils; -using Pole.Core.Utils.Abstraction; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Text; - -namespace Pole.Core.Serialization -{ - public class EventTypeFinder : IEventTypeFinder - { - private readonly ConcurrentDictionary codeDict = new ConcurrentDictionary(); - private readonly ConcurrentDictionary typeDict = new ConcurrentDictionary(); - readonly ILogger logger; - public EventTypeFinder(ILogger logger) - { - this.logger = logger; - var baseEventType = typeof(IEvent); - foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger)) - { - foreach (var type in assembly.GetTypes().Where(m => baseEventType.IsAssignableFrom(m)&&!m.IsAbstract)) - { - var eventCode = type.FullName; - var eventAttribute = type.GetCustomAttributes(typeof(EventInfoAttribute),false).FirstOrDefault(); - if (eventAttribute is EventInfoAttribute attribute ) - { - eventCode = attribute.EventName; - } - typeDict.TryAdd(type, eventCode); - - if (!codeDict.TryAdd(eventCode, type)) - { - throw new TypeCodeRepeatedException(type.FullName, type.FullName); - } - } - } - } - /// - /// 通过code获取Type对象 - /// - /// - /// - public Type FindType(string typeCode) - { - if (codeDict.TryGetValue(typeCode, out Type type)) - { - return type; - } - throw new UnknowTypeCodeException(typeCode); - } - /// - /// 获取Type对象的code字符串 - /// - /// - /// - public string GetCode(Type type) - { - if (!typeDict.TryGetValue(type, out var value)) - return type.FullName; - return value; - } - } -} diff --git a/src/Pole.Core/UnitOfWork/IUnitOfWork.cs b/src/Pole.Core/UnitOfWork/IUnitOfWork.cs deleted file mode 100644 index 27910ab..0000000 --- a/src/Pole.Core/UnitOfWork/IUnitOfWork.cs +++ /dev/null @@ -1,20 +0,0 @@ -using Pole.Core.EventBus; -using Pole.Core.EventBus.Transaction; -using System; -using System.Collections.Generic; -using System.Data; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; - -namespace Pole.Core.UnitOfWork -{ - public interface IUnitOfWork : IDisposable - { - Task CompeleteAsync(CancellationToken cancellationToken = default); - Task Rollback(CancellationToken cancellationToken = default); - IUnitOfWork Enlist(IDbTransactionAdapter dbTransactionAdapter, IBus bus); - IServiceProvider ServiceProvider { get; } - } -} diff --git a/src/Pole.Core/UnitOfWork/UnitOfWork.cs b/src/Pole.Core/UnitOfWork/UnitOfWork.cs deleted file mode 100644 index 035441a..0000000 --- a/src/Pole.Core/UnitOfWork/UnitOfWork.cs +++ /dev/null @@ -1,75 +0,0 @@ -using Pole.Core.EventBus; -using Pole.Core.EventBus.Transaction; -using System; -using System.Collections.Generic; -using System.Data; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.Extensions.DependencyInjection; -using Pole.Core.Serialization; -using Pole.Core.EventBus.Event; -using Pole.Core.EventBus.EventStorage; -using Microsoft.Extensions.Options; -using Pole.Core.Utils.Abstraction; -using Pole.Core.Exceptions; - -namespace Pole.Core.UnitOfWork -{ - class UnitOfWork : IUnitOfWork - { - private readonly IProducerInfoContainer producerContainer; - private readonly IEventTypeFinder eventTypeFinder; - private readonly ISerializer serializer; - private IBus bus; - private IEventBuffer eventBuffer; - public IServiceProvider ServiceProvider { get; } - public UnitOfWork(IProducerInfoContainer producerContainer, IEventTypeFinder eventTypeFinder, - ISerializer serializer, IEventBuffer eventBuffer, IServiceProvider serviceProvider) - { - this.producerContainer = producerContainer; - this.eventTypeFinder = eventTypeFinder; - this.serializer = serializer; - this.eventBuffer = eventBuffer; - this.ServiceProvider = serviceProvider; - } - - public async Task CompeleteAsync(CancellationToken cancellationToken = default) - { - - await bus.Transaction.CommitAsync(); - - var bufferedEvents = bus.PrePublishEventBuffer.ToList(); - bufferedEvents.ForEach(async @event => - { - var eventType = eventTypeFinder.FindType(@event.Name); - var eventContentBytes = Encoding.UTF8.GetBytes(@event.Content); - var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes); - var bytes = bytesTransport.GetBytes(); - var result = await eventBuffer.AddAndRun(@event); - if (!result) - { - throw new AddEventToEventBufferException(); - } - }); - } - - public void Dispose() - { - bus = null; - } - - public IUnitOfWork Enlist(IDbTransactionAdapter dbTransactionAdapter, IBus bus) - { - bus.Transaction = dbTransactionAdapter; - this.bus = bus; - return this; - } - - public Task Rollback(CancellationToken cancellationToken = default) - { - return bus.Transaction.RollbackAsync(); - } - } -} diff --git a/src/Pole.Core/Utils/Abstraction/IEventTypeFinder.cs b/src/Pole.Core/Utils/Abstraction/IEventTypeFinder.cs deleted file mode 100644 index 9f370fa..0000000 --- a/src/Pole.Core/Utils/Abstraction/IEventTypeFinder.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace Pole.Core.Utils.Abstraction -{ - public interface IEventTypeFinder - { - Type FindType(string code); - string GetCode(Type type); - } -} diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs b/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs index be3e6a4..c0996f5 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/RabbitConsumer.cs @@ -1,7 +1,6 @@ using System; using System.Collections.Generic; using System.Threading.Tasks; -using Pole.Core.EventBus; namespace Pole.EventBus.RabbitMQ { diff --git a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs index 989d0af..d052272 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/EventBusContainer.cs @@ -2,17 +2,17 @@ using Microsoft.Extensions.Logging; using Orleans; using RabbitMQ.Client; -using Pole.Core.EventBus; using Pole.Core.Exceptions; using Pole.Core.Utils; using System; using System.Collections.Concurrent; using System.Collections.Generic; using System.Threading.Tasks; -using Pole.Core.EventBus.Event; -using Pole.Core.EventBus.EventHandler; using Microsoft.Extensions.Options; using System.Linq; +using Pole.EventBus.Event; +using Pole.Core.Domain; +using Pole.EventBus.EventHandler; namespace Pole.EventBus.RabbitMQ { diff --git a/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs b/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs index 879630d..c06fd5b 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/IRabbitEventBusContainer.cs @@ -1,5 +1,4 @@ using System.Threading.Tasks; -using Pole.Core.EventBus; namespace Pole.EventBus.RabbitMQ { diff --git a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs index 0e3a9e8..4f3d3ea 100644 --- a/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs +++ b/src/Pole.EventBus.Rabbitmq/Core/RabbitEventBus.cs @@ -1,6 +1,4 @@ -using Pole.Core.EventBus; -using Pole.Core.EventBus.EventHandler; -using Pole.Core.Exceptions; +using Pole.Core.Exceptions; using Pole.Core.Utils; using System; using System.Collections.Generic; diff --git a/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj b/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj index dde5642..a5f8283 100644 --- a/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj +++ b/src/Pole.EventBus.Rabbitmq/Pole.EventBus.Rabbitmq.csproj @@ -15,6 +15,7 @@ + diff --git a/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs b/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs index ae40c95..7ad5bf0 100644 --- a/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs +++ b/src/Pole.EventBus.Rabbitmq/PoleRabbitmqStartupConfigExtensions.cs @@ -5,7 +5,7 @@ using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Options; using Pole.Core; -using Pole.Core.EventBus; +using Pole.EventBus; using Pole.EventBus.RabbitMQ; namespace Microsoft.Extensions.DependencyInjection diff --git a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs index 368e6fc..14dff8d 100644 --- a/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs +++ b/src/Pole.EventBus.Rabbitmq/Producer/RabbitProducer.cs @@ -1,6 +1,5 @@ using Microsoft.Extensions.Options; using Pole.Core; -using Pole.Core.EventBus; using System; using System.Collections.Generic; using System.Linq; diff --git a/src/Pole.EventBus/Bus.cs b/src/Pole.EventBus/Bus.cs new file mode 100644 index 0000000..866e280 --- /dev/null +++ b/src/Pole.EventBus/Bus.cs @@ -0,0 +1,65 @@ + +using Pole.EventBus.Event; +using Pole.EventBus.EventStorage; +using Pole.EventBus.Transaction; +using Pole.Core.Serialization; +using Pole.Core.Utils.Abstraction; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.EventBus +{ + class Bus : IBus + { + private readonly IEventTypeFinder eventTypeFinder; + private readonly ISerializer serializer; + private readonly ISnowflakeIdGenerator snowflakeIdGenerator; + private readonly IEventStorage eventStorage; + public IDbTransactionAdapter Transaction { get; set; } + + public IServiceProvider ServiceProvider { get; } + public BlockingCollection PrePublishEventBuffer { get; } = new BlockingCollection(new ConcurrentQueue()); + + public Bus(IServiceProvider serviceProvider, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator, IEventStorage eventStorage) + { + ServiceProvider = serviceProvider; + this.eventTypeFinder = eventTypeFinder; + this.serializer = serializer; + this.snowflakeIdGenerator = snowflakeIdGenerator; + this.eventStorage = eventStorage; + } + public async Task Publish(object @event, CancellationToken cancellationToken = default) + { + var eventType = @event.GetType(); + var eventTypeCode = eventTypeFinder.GetCode(eventType); + var eventId = snowflakeIdGenerator.NextId(); + var eventContent = serializer.Serialize(@event, eventType); + var eventEntity = new EventEntity + { + Added = DateTime.UtcNow, + Content = eventContent, + ExpiresAt = null, + Id = eventId, + Name = eventTypeCode, + Retries = 0, + StatusName = nameof(EventStatus.Pending) + }; + if (Transaction?.DbTransaction == null) + { + await eventStorage.StoreMessage(eventEntity); + } + else + { + await eventStorage.StoreMessage(eventEntity, Transaction.DbTransaction); + + } + PrePublishEventBuffer.Add(eventEntity); + + return true; + } + } +} diff --git a/src/Pole.EventBus/Consumer.cs b/src/Pole.EventBus/Consumer.cs new file mode 100644 index 0000000..4698727 --- /dev/null +++ b/src/Pole.EventBus/Consumer.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Pole.EventBus +{ + public abstract class Consumer : IConsumer + { + readonly List> eventHandlers; + readonly List, Task>> batchEventHandlers; + public Consumer( + List> eventHandlers, + List, Task>> batchEventHandlers) + { + this.eventHandlers = eventHandlers; + this.batchEventHandlers = batchEventHandlers; + } + public void AddHandler(Func func) + { + eventHandlers.Add(func); + } + public Task Notice(byte[] bytes) + { + return Task.WhenAll(eventHandlers.Select(func => func(bytes))); + } + + public Task Notice(List list) + { + return Task.WhenAll(batchEventHandlers.Select(func => func(list))); + } + } +} diff --git a/src/Pole.EventBus/Event/EventBytesTransport.cs b/src/Pole.EventBus/Event/EventBytesTransport.cs new file mode 100644 index 0000000..d558138 --- /dev/null +++ b/src/Pole.EventBus/Event/EventBytesTransport.cs @@ -0,0 +1,61 @@ +using Pole.Core.Serialization; +using Pole.Core.Utils; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.EventBus.Event +{ + public readonly struct EventBytesTransport + { + public EventBytesTransport(string eventCode, string eventId, byte[] eventBytes) + { + EventTypeCode = eventCode; + EventBytes = eventBytes; + EventId = eventId; + } + /// + /// 每个类型的Event 全局唯一 + /// + public string EventId { get; } + /// + /// 事件TypeCode + /// + public string EventTypeCode { get; } + /// + /// 事件本身的bytes + /// + public byte[] EventBytes { get; } + public byte[] GetBytes() + { + var eventTypeBytes = Encoding.UTF8.GetBytes(EventTypeCode); + var eventIdBytes = Encoding.UTF8.GetBytes(EventId); + using var ms = new PooledMemoryStream(); + ms.WriteByte((byte)TransportType.Event); + ms.Write(BitConverter.GetBytes((ushort)eventTypeBytes.Length)); + ms.Write(BitConverter.GetBytes((ushort)eventIdBytes.Length)); + ms.Write(BitConverter.GetBytes(EventBytes.Length)); + ms.Write(eventTypeBytes); + ms.Write(eventIdBytes); + ms.Write(EventBytes); + return ms.ToArray(); + } + public static (bool success, EventBytesTransport transport) FromBytes(byte[] bytes) + { + if (bytes[0] == (byte)TransportType.Event) + { + var bytesSpan = bytes.AsSpan(); + var eventTypeCodeLength = BitConverter.ToUInt16(bytesSpan.Slice(1, sizeof(ushort))); + var eventIdLength = BitConverter.ToUInt16(bytesSpan.Slice(1 + sizeof(ushort), sizeof(ushort))); + var eventBytesLength = BitConverter.ToInt32(bytesSpan.Slice(1 + 2 * sizeof(ushort), sizeof(int))); + var skipLength = 2 * sizeof(ushort) + 1 + sizeof(int); + return (true, new EventBytesTransport( + Encoding.UTF8.GetString(bytesSpan.Slice(skipLength, eventTypeCodeLength)), + Encoding.UTF8.GetString(bytesSpan.Slice(skipLength + eventTypeCodeLength, eventIdLength)), + bytesSpan.Slice(skipLength + eventTypeCodeLength + eventIdLength, eventBytesLength).ToArray() + )); + } + return (false, default); + } + } +} diff --git a/src/Pole.EventBus/Event/EventInfoAttribute.cs b/src/Pole.EventBus/Event/EventInfoAttribute.cs new file mode 100644 index 0000000..bd9f276 --- /dev/null +++ b/src/Pole.EventBus/Event/EventInfoAttribute.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.EventBus.Event +{ + [AttributeUsage(AttributeTargets.Class)] + public class EventInfoAttribute: Attribute + { + public string EventName { get; set; } + } +} diff --git a/src/Pole.EventBus/EventBuffer.cs b/src/Pole.EventBus/EventBuffer.cs new file mode 100644 index 0000000..0c3ae74 --- /dev/null +++ b/src/Pole.EventBus/EventBuffer.cs @@ -0,0 +1,144 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Pole.Core; +using Pole.EventBus.Event; +using Pole.EventBus.EventStorage; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; + +namespace Pole.EventBus +{ + class EventBuffer : IEventBuffer + { + readonly BufferBlock buffer = new BufferBlock(); + private int autoConsuming = 0; + private readonly ILogger logger; + /// + /// 批量数据处理每次处理的最大数据量 + /// + private readonly int maxBatchSize = 10000; + /// + /// 批量数据接收的最大延时 + /// + private readonly int maxMillisecondsDelay = 2000; + private readonly IProducerInfoContainer producerContainer; + private readonly IProducer producer; + private readonly IEventStorage eventStorage; + private readonly PoleOptions options; + private Task waitToReadTask; + public EventBuffer(ILogger logger, IProducerInfoContainer producerContainer, IProducer producer, IEventStorage eventStorage, IOptions options) + { + this.logger = logger; + this.producerContainer = producerContainer; + this.producer = producer; + this.eventStorage = eventStorage; + this.options = options.Value; + } + public async Task AddAndRun(EventEntity eventEntity) + { + if (!buffer.Post(eventEntity)) + return await buffer.SendAsync(eventEntity); + if (autoConsuming == 0) + ActiveAutoExecute(); + + return true; + } + private void ActiveAutoExecute() + { + if (autoConsuming == 0) + ThreadPool.QueueUserWorkItem(ActiveConsumer); + async void ActiveConsumer(object state) + { + if (Interlocked.CompareExchange(ref autoConsuming, 1, 0) == 0) + { + try + { + while (await WaitToReadAsync()) + { + try + { + await Execute(); + } + catch (Exception ex) + { + logger.LogError(ex, ex.Message); + } + } + } + finally + { + Interlocked.Exchange(ref autoConsuming, 0); + } + } + } + } + public async Task WaitToReadAsync() + { + waitToReadTask = buffer.OutputAvailableAsync(); + return await waitToReadTask; + + } + public async Task Execute() + { + if (waitToReadTask.IsCompletedSuccessfully && waitToReadTask.Result) + { + var dataList = new List(); + var startTime = DateTimeOffset.UtcNow; + while (buffer.TryReceive(out var value)) + { + dataList.Add(value); + if (dataList.Count > maxBatchSize) + { + break; + } + else if ((DateTimeOffset.UtcNow - startTime).TotalMilliseconds > maxMillisecondsDelay) + { + break; + } + } + if (dataList.Count > 0) + { + await ExecuteCore(dataList); + } + + } + } + private async Task ExecuteCore(List eventEntities) + { + logger.LogTrace($"Begin ExecuteCore Count:{eventEntities.Count} "); + var events = eventEntities.Select(entity => + { + var eventContentBytes = Encoding.UTF8.GetBytes(entity.Content); + var bytesTransport = new EventBytesTransport(entity.Name, entity.Id, eventContentBytes); + var bytes = bytesTransport.GetBytes(); + var targetName = producerContainer.GetTargetName(entity.Name); + entity.StatusName = nameof(EventStatus.Published); + entity.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); + return (targetName, bytes); + }); + eventEntities.ForEach(entity => + { + entity.StatusName = nameof(EventStatus.Published); + entity.ExpiresAt = DateTime.UtcNow.AddSeconds(options.PublishedEventsExpiredAfterSeconds); + }); + logger.LogTrace($"Begin BulkPublish {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} "); + await producer.BulkPublish(events); + logger.LogTrace($"Begin BulkPublish {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} "); + if (eventEntities.Count > 10) + { + await eventStorage.BulkChangePublishStateAsync(eventEntities); + } + else + { + await eventStorage.ChangePublishStateAsync(eventEntities); + } + + logger.LogTrace($"End ExecuteCore {DateTime.Now.ToString("yyyy-MM-dd hh:mm:ss")} Count:{eventEntities.Count} "); + } + } +} diff --git a/src/Pole.EventBus/EventHandler/IPoleEventHandler.cs b/src/Pole.EventBus/EventHandler/IPoleEventHandler.cs new file mode 100644 index 0000000..ca204f8 --- /dev/null +++ b/src/Pole.EventBus/EventHandler/IPoleEventHandler.cs @@ -0,0 +1,23 @@ +using Orleans; +using Pole.EventBus.Event; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.EventBus.EventHandler +{ + public interface IPoleEventHandler : IPoleEventHandler + { + Task EventHandle(TEvent @event); + } + public interface IPoleBulkEventsHandler : IPoleEventHandler + { + Task BulkEventsHandle(List events); + } + public interface IPoleEventHandler : IGrainWithStringKey + { + public Task Invoke(EventBytesTransport transport); + public Task Invoke(List transports); + } +} diff --git a/src/Pole.EventBus/EventHandler/PoleEventHandler.cs b/src/Pole.EventBus/EventHandler/PoleEventHandler.cs new file mode 100644 index 0000000..a2bb825 --- /dev/null +++ b/src/Pole.EventBus/EventHandler/PoleEventHandler.cs @@ -0,0 +1,96 @@ +using Microsoft.Extensions.Logging; +using Orleans.Concurrency; +using Pole.EventBus.Event; +using Pole.Core.Serialization; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using System.Reflection.Emit; +using System.Linq.Expressions; +using System.Linq; +using Pole.Core.Exceptions; +using Orleans; +using Pole.Core.Utils.Abstraction; + +namespace Pole.EventBus.EventHandler +{ + /// + /// + /// + public abstract class PoleEventHandler : Grain + { + private IEventTypeFinder eventTypeFinder; + private ISerializer serializer; + private ILogger logger; + private Type grainType; + + public PoleEventHandler() + { + grainType = GetType(); + } + public override async Task OnActivateAsync() + { + await base.OnActivateAsync(); + await DependencyInjection(); + } + protected virtual Task DependencyInjection() + { + //ConfigOptions = ServiceProvider.GetOptionsByName(typeof(MainGrain).FullName); + serializer = ServiceProvider.GetService(); + eventTypeFinder = ServiceProvider.GetService(); + logger = (ILogger)ServiceProvider.GetService(typeof(ILogger<>).MakeGenericType(grainType)); + return Task.CompletedTask; + } + + public Task Invoke(EventBytesTransport transport) + { + var eventType = eventTypeFinder.FindType(transport.EventTypeCode); + + var eventObj = serializer.Deserialize(transport.EventBytes, eventType); + if (this is IPoleEventHandler handler) + { + var result = handler.EventHandle((TEvent)eventObj); + logger.LogTrace($"{nameof(PoleEventHandler)} Invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObj)); + return result; + } + else + { + throw new EventHandlerImplementedNotRightException(nameof(handler.EventHandle), eventType.Name, this.GetType().FullName); + } + } + + public async Task Invoke(List transports) + { + if (transports.Count() != 0) + { + var firstTransport = transports.First(); + var eventType = eventTypeFinder.FindType(firstTransport.EventTypeCode); + var eventObjs = transports.Select(transport => serializer.Deserialize(firstTransport.EventBytes, eventType)).Select(@event => (TEvent)@event).ToList(); + if (this is IPoleBulkEventsHandler batchHandler) + { + await batchHandler.BulkEventsHandle(eventObjs); + logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(batchHandler.BulkEventsHandle), serializer.Serialize(eventObjs)); + return; + } + else if (this is IPoleEventHandler handler) + { + var handleTasks = eventObjs.Select(m => handler.EventHandle(m)); + await Task.WhenAll(handleTasks); + logger.LogTrace("Batch invoke completed: {0}->{1}->{2}", grainType.FullName, nameof(handler.EventHandle), serializer.Serialize(eventObjs)); + return; + } + else + { + throw new EventHandlerImplementedNotRightException(nameof(handler.EventHandle), eventType.Name, this.GetType().FullName); + } + } + else + { + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation($"{nameof(EventBytesTransport.FromBytes)} failed"); + } + } + } +} diff --git a/src/Pole.EventBus/EventStorage/EventEntity.cs b/src/Pole.EventBus/EventStorage/EventEntity.cs new file mode 100644 index 0000000..684ee9e --- /dev/null +++ b/src/Pole.EventBus/EventStorage/EventEntity.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.EventBus.EventStorage +{ + public class EventEntity + { + public string Id { get; set; } + public string Name { get; set; } + public string Content { get; set; } + public DateTime Added { get; set; } + public DateTime? ExpiresAt { get; set; } + public int Retries { get; set; } + public string StatusName { get; set; } + } +} diff --git a/src/Pole.EventBus/EventStorage/EventStatus.cs b/src/Pole.EventBus/EventStorage/EventStatus.cs new file mode 100644 index 0000000..a081eab --- /dev/null +++ b/src/Pole.EventBus/EventStorage/EventStatus.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.EventBus.EventStorage +{ + public enum EventStatus + { + Failed = -1, + Pending = 0, + Published = 1 + } +} diff --git a/src/Pole.EventBus/EventStorage/IEventStorage.cs b/src/Pole.EventBus/EventStorage/IEventStorage.cs new file mode 100644 index 0000000..2877d83 --- /dev/null +++ b/src/Pole.EventBus/EventStorage/IEventStorage.cs @@ -0,0 +1,22 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.EventBus.EventStorage +{ + public interface IEventStorage + { + Task ChangePublishStateAsync(EventEntity message, EventStatus state); + Task ChangePublishStateAsync(IEnumerable messages); + Task BulkChangePublishStateAsync(IEnumerable events); + + Task StoreMessage(EventEntity eventEntity, object dbTransaction = null); + + Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, + CancellationToken token = default); + + Task> GetMessagesOfNeedRetry(); + } +} diff --git a/src/Pole.EventBus/EventStorage/IEventStorageInitializer.cs b/src/Pole.EventBus/EventStorage/IEventStorageInitializer.cs new file mode 100644 index 0000000..800f811 --- /dev/null +++ b/src/Pole.EventBus/EventStorage/IEventStorageInitializer.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.EventBus.EventStorage +{ + public interface IEventStorageInitializer + { + Task InitializeAsync(CancellationToken cancellationToken); + string GetTableName(); + } +} diff --git a/src/Pole.EventBus/EventTypeFinder.cs b/src/Pole.EventBus/EventTypeFinder.cs new file mode 100644 index 0000000..7561321 --- /dev/null +++ b/src/Pole.EventBus/EventTypeFinder.cs @@ -0,0 +1,68 @@ +using Microsoft.Extensions.Logging; +using Pole.Core.Domain; +using Pole.Core.Exceptions; +using Pole.Core.Utils; +using Pole.Core.Utils.Abstraction; +using Pole.EventBus.Event; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; + +namespace Pole.EventBus +{ + public class EventTypeFinder : IEventTypeFinder + { + private readonly ConcurrentDictionary codeDict = new ConcurrentDictionary(); + private readonly ConcurrentDictionary typeDict = new ConcurrentDictionary(); + readonly ILogger logger; + public EventTypeFinder(ILogger logger) + { + this.logger = logger; + var baseEventType = typeof(IEvent); + foreach (var assembly in AssemblyHelper.GetAssemblies(this.logger)) + { + foreach (var type in assembly.GetTypes().Where(m => baseEventType.IsAssignableFrom(m)&&!m.IsAbstract)) + { + var eventCode = type.FullName; + var eventAttribute = type.GetCustomAttributes(typeof(EventInfoAttribute),false).FirstOrDefault(); + if (eventAttribute is EventInfoAttribute attribute ) + { + eventCode = attribute.EventName; + } + typeDict.TryAdd(type, eventCode); + + if (!codeDict.TryAdd(eventCode, type)) + { + throw new TypeCodeRepeatedException(type.FullName, type.FullName); + } + } + } + } + /// + /// 通过code获取Type对象 + /// + /// + /// + public Type FindType(string typeCode) + { + if (codeDict.TryGetValue(typeCode, out Type type)) + { + return type; + } + throw new UnknowTypeCodeException(typeCode); + } + /// + /// 获取Type对象的code字符串 + /// + /// + /// + public string GetCode(Type type) + { + if (!typeDict.TryGetValue(type, out var value)) + return type.FullName; + return value; + } + } +} diff --git a/src/Pole.EventBus/IBus.cs b/src/Pole.EventBus/IBus.cs new file mode 100644 index 0000000..303154b --- /dev/null +++ b/src/Pole.EventBus/IBus.cs @@ -0,0 +1,19 @@ +using Pole.EventBus.EventStorage; +using Pole.EventBus.Transaction; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.EventBus +{ + public interface IBus + { + IServiceProvider ServiceProvider { get; } + IDbTransactionAdapter Transaction { get; set; } + BlockingCollection PrePublishEventBuffer { get; } + Task Publish(object @event, CancellationToken cancellationToken = default); + } +} diff --git a/src/Pole.EventBus/IConsumer.cs b/src/Pole.EventBus/IConsumer.cs new file mode 100644 index 0000000..f39d389 --- /dev/null +++ b/src/Pole.EventBus/IConsumer.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Pole.EventBus +{ + public interface IConsumer + { + Task Notice(byte[] bytes); + Task Notice(List list); + } +} diff --git a/src/Pole.EventBus/IConsumerContainer.cs b/src/Pole.EventBus/IConsumerContainer.cs new file mode 100644 index 0000000..5b74cf4 --- /dev/null +++ b/src/Pole.EventBus/IConsumerContainer.cs @@ -0,0 +1,9 @@ +using System.Collections.Generic; + +namespace Pole.EventBus +{ + public interface IConsumerContainer + { + List GetConsumers(); + } +} diff --git a/src/Pole.EventBus/IEventBuffer.cs b/src/Pole.EventBus/IEventBuffer.cs new file mode 100644 index 0000000..2edc5db --- /dev/null +++ b/src/Pole.EventBus/IEventBuffer.cs @@ -0,0 +1,13 @@ +using Pole.EventBus.EventStorage; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.EventBus +{ + public interface IEventBuffer + { + Task AddAndRun(EventEntity eventEntity); + } +} diff --git a/src/Pole.EventBus/IEventTypeFinder.cs b/src/Pole.EventBus/IEventTypeFinder.cs new file mode 100644 index 0000000..2b3826f --- /dev/null +++ b/src/Pole.EventBus/IEventTypeFinder.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.EventBus +{ + public interface IEventTypeFinder + { + Type FindType(string code); + string GetCode(Type type); + } +} diff --git a/src/Pole.EventBus/IGrainID.cs b/src/Pole.EventBus/IGrainID.cs new file mode 100644 index 0000000..4b5ca17 --- /dev/null +++ b/src/Pole.EventBus/IGrainID.cs @@ -0,0 +1,9 @@ +using System; + +namespace Pole.EventBus +{ + public interface IGrainID + { + Type EventHandlerType { get; } + } +} diff --git a/src/Pole.EventBus/IObserverUnit.cs b/src/Pole.EventBus/IObserverUnit.cs new file mode 100644 index 0000000..1a3ea57 --- /dev/null +++ b/src/Pole.EventBus/IObserverUnit.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Pole.EventBus +{ + public interface IObserverUnit : IGrainID + { + Func GetEventHandler(); + Func, Task> GetBatchEventHandler(); + } +} diff --git a/src/Pole.EventBus/IObserverUnitContainer.cs b/src/Pole.EventBus/IObserverUnitContainer.cs new file mode 100644 index 0000000..806c311 --- /dev/null +++ b/src/Pole.EventBus/IObserverUnitContainer.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; + +namespace Pole.EventBus +{ + public interface IObserverUnitContainer + { + List> GetUnits(string observerName); + List GetUnits(string observerName); + void Register(string observerName,IGrainID followUnit); + } +} diff --git a/src/Pole.EventBus/IProducer.cs b/src/Pole.EventBus/IProducer.cs new file mode 100644 index 0000000..e778913 --- /dev/null +++ b/src/Pole.EventBus/IProducer.cs @@ -0,0 +1,11 @@ +using System.Collections.Generic; +using System.Threading.Tasks; + +namespace Pole.EventBus +{ + public interface IProducer + { + ValueTask Publish(string targetName, byte[] bytes); + ValueTask BulkPublish(IEnumerable<(string,byte[])> events); + } +} diff --git a/src/Pole.EventBus/IProducerInfoContainer.cs b/src/Pole.EventBus/IProducerInfoContainer.cs new file mode 100644 index 0000000..1b55ad8 --- /dev/null +++ b/src/Pole.EventBus/IProducerInfoContainer.cs @@ -0,0 +1,10 @@ +using System; +using System.Threading.Tasks; + +namespace Pole.EventBus +{ + public interface IProducerInfoContainer + { + string GetTargetName(string typeName); + } +} diff --git a/src/Pole.EventBus/ObserverUnit.cs b/src/Pole.EventBus/ObserverUnit.cs new file mode 100644 index 0000000..19aa0f4 --- /dev/null +++ b/src/Pole.EventBus/ObserverUnit.cs @@ -0,0 +1,116 @@ +using Microsoft.Extensions.Logging; +using Orleans; +using Pole.Core.Serialization; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using System.Linq; +using Pole.EventBus.Event; +using Orleans.Concurrency; +using System.Collections.Concurrent; +using System.Linq.Expressions; +using Pole.EventBus.EventHandler; +using Pole.Core.Utils.Abstraction; + +namespace Pole.EventBus +{ + public class ObserverUnit : IObserverUnit + { + readonly IServiceProvider serviceProvider; + readonly ISerializer serializer; + readonly IEventTypeFinder typeFinder; + readonly IClusterClient clusterClient; + Func eventHandler; + Func, Task> batchEventHandler; + protected ILogger Logger { get; private set; } + public Type EventHandlerType { get; } + + public ObserverUnit(IServiceProvider serviceProvider, Type eventHandlerType) + { + this.serviceProvider = serviceProvider; + clusterClient = serviceProvider.GetService(); + serializer = serviceProvider.GetService(); + typeFinder = serviceProvider.GetService(); + Logger = serviceProvider.GetService>>(); + EventHandlerType = eventHandlerType; + } + public static ObserverUnit From(IServiceProvider serviceProvider) where Grain : Orleans.Grain + { + return new ObserverUnit(serviceProvider, typeof(Grain)); + } + + public Func GetEventHandler() + { + return eventHandler; + } + + public Func, Task> GetBatchEventHandler() + { + return batchEventHandler; + } + + public void Observer() + { + if (!typeof(IPoleEventHandler).IsAssignableFrom(EventHandlerType)) + throw new NotSupportedException($"{EventHandlerType.FullName} must inheritance from PoleEventHandler"); + eventHandler = EventHandler; + batchEventHandler = BatchEventHandler; + //内部函数 + Task EventHandler(byte[] bytes) + { + var (success, transport) = EventBytesTransport.FromBytes(bytes); + if (success) + { + return GetObserver(EventHandlerType, transport.EventId).Invoke(transport); + } + else + { + if (Logger.IsEnabled(LogLevel.Error)) + Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event"); + } + return Task.CompletedTask; + } + Task BatchEventHandler(List list) + { + var transports = list.Select(bytes => + { + var (success, transport) = EventBytesTransport.FromBytes(bytes); + if (!success) + { + if (Logger.IsEnabled(LogLevel.Error)) + Logger.LogError($" EventId:{nameof(EventBytesTransport.EventId)} is not a event"); + } + return (success, transport); + }).Where(o => o.success) + .Select(o => (o.transport)) + .ToList(); + // 批量处理的时候 grain Id 取第一个 event的id + return GetObserver(EventHandlerType, transports.First().EventId).Invoke(transports); + } + } + static readonly ConcurrentDictionary> _observerGeneratorDict = new ConcurrentDictionary>(); + private IPoleEventHandler GetObserver(Type ObserverType, string primaryKey) + { + var func = _observerGeneratorDict.GetOrAdd(ObserverType, key => + { + var clientType = typeof(IClusterClient); + var clientParams = Expression.Parameter(clientType, "client"); + var primaryKeyParams = Expression.Parameter(typeof(string), "primaryKey"); + var grainClassNamePrefixParams = Expression.Parameter(typeof(string), "grainClassNamePrefix"); + var method = typeof(ClusterClientExtensions).GetMethod("GetGrain", new Type[] { clientType, typeof(string), typeof(string) }); + var body = Expression.Call(method.MakeGenericMethod(ObserverType), clientParams, primaryKeyParams, grainClassNamePrefixParams); + return Expression.Lambda>(body, clientParams, primaryKeyParams, grainClassNamePrefixParams).Compile(); + }); + return func(clusterClient, primaryKey, null); + } + } + public static class ClusterClientExtensions + { + public static TGrainInterface GetGrain(IClusterClient client, string primaryKey, string grainClassNamePrefix = null) where TGrainInterface : IGrainWithStringKey + { + return client.GetGrain(primaryKey, grainClassNamePrefix); + } + } +} diff --git a/src/Pole.EventBus/ObserverUnitContainer.cs b/src/Pole.EventBus/ObserverUnitContainer.cs new file mode 100644 index 0000000..023d163 --- /dev/null +++ b/src/Pole.EventBus/ObserverUnitContainer.cs @@ -0,0 +1,87 @@ +using Microsoft.Extensions.Logging; +using Pole.Core.Utils; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using Pole.EventBus.EventHandler; +using System.Linq; +using Pole.Core.Exceptions; +using Pole.EventBus.Event; + +namespace Pole.EventBus +{ + public class ObserverUnitContainer : IObserverUnitContainer + { + readonly ConcurrentDictionary> unitDict = new ConcurrentDictionary>(); + public ObserverUnitContainer(IServiceProvider serviceProvider) + { + var eventHandlerList = new List<(Type, EventInfoAttribute)>(); + foreach (var assembly in AssemblyHelper.GetAssemblies(serviceProvider.GetService>())) + { + foreach (var type in assembly.GetTypes().Where(m => typeof(IPoleEventHandler).IsAssignableFrom(m) && m.IsClass && !m.IsAbstract && !typeof(Orleans.Runtime.GrainReference).IsAssignableFrom(m))) + { + var eventHandlerInterface = type.GetInterfaces().FirstOrDefault(type => typeof(IPoleEventHandler).IsAssignableFrom(type) && !type.IsGenericType); + var basePoleEventHandlerInterface= eventHandlerInterface.GetInterfaces().FirstOrDefault(m=>m.IsGenericType); + + if (basePoleEventHandlerInterface == null) + { + throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler"); + } + var eventType= basePoleEventHandlerInterface.GetGenericArguments().FirstOrDefault(); + if (eventType == null) + { + throw new PoleEventHandlerImplementException("PoleEventHandler interface must Inherited from IPoleEventHandler"); + } + var attribute = eventType.GetCustomAttributes(typeof(EventInfoAttribute), false).FirstOrDefault(); + + if (attribute != null) + { + eventHandlerList.Add((eventHandlerInterface, (EventInfoAttribute)attribute)); + } + else + { + throw new PoleEventHandlerImplementException("Can not found EventHandlerAttribute in PoleEventHandler"); + } + } + } + foreach (var eventHandler in eventHandlerList) + { + var unitType = typeof(ObserverUnit<>).MakeGenericType(new Type[] { typeof(string) }); + var unit = (ObserverUnit)Activator.CreateInstance(unitType, serviceProvider, eventHandler.Item1); + unit.Observer(); + Register(eventHandler.Item2.EventName, unit); + } + } + public List> GetUnits(string observerName) + { + if (unitDict.TryGetValue(observerName, out var units)) + return units.Select(m => (IObserverUnit)m).ToList(); + else + throw new UnfindObserverUnitException(observerName); + } + public List GetUnits(string observerName) + { + if (unitDict.TryGetValue(observerName, out var unit)) + { + return unit; + } + else + throw new UnfindObserverUnitException(observerName); + } + + public void Register(string observerName, IGrainID observerUnit) + { + if (unitDict.TryGetValue(observerName, out List units)) + { + units.Add(observerUnit); + } + if (!unitDict.TryAdd(observerName, new List { observerUnit })) + { + throw new ObserverUnitRepeatedException(observerUnit.EventHandlerType.FullName); + } + } + + } +} diff --git a/src/Pole.EventBus/Pole.EventBus.csproj b/src/Pole.EventBus/Pole.EventBus.csproj new file mode 100644 index 0000000..0a79d01 --- /dev/null +++ b/src/Pole.EventBus/Pole.EventBus.csproj @@ -0,0 +1,11 @@ + + + + netstandard2.1 + + + + + + + diff --git a/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs b/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs new file mode 100644 index 0000000..cec3faa --- /dev/null +++ b/src/Pole.EventBus/PoleEventBusStartupConfigExtensions.cs @@ -0,0 +1,33 @@ +using Microsoft.Extensions.DependencyInjection; +using Pole.Core; +using Pole.Core.Processor; +using Pole.EventBus.Processor; +using Pole.EventBus.Processor.Server; +using Pole.EventBus.UnitOfWork; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.EventBus +{ + public static class PoleEventBusStartupConfigExtensions + { + public static void AddEventBus( + this StartupConfig startupOption) + { + startupOption.Services.AddSingleton(); + startupOption.Services.AddScoped(); + startupOption.Services.AddSingleton(); + startupOption.Services.AddSingleton(); + startupOption.Services.AddSingleton(); + startupOption.Services.AddHostedService(); + startupOption.Services.AddScoped(); + startupOption.Services.AddSingleton(); + + Startup.Register(async serviceProvider => + { + + }); + } + } +} diff --git a/src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs b/src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs new file mode 100644 index 0000000..0b44215 --- /dev/null +++ b/src/Pole.EventBus/Processor/ExpiredEventsCollectorProcessor.cs @@ -0,0 +1,72 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Pole.Core; +using Pole.Core.Processor; +using Pole.EventBus.EventStorage; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace Pole.EventBus.Processor +{ + class ExpiredEventsCollectorProcessor : IProcessor + { + private readonly ILogger logger; + private readonly IEventStorageInitializer initializer; + private readonly IEventStorage eventstorage; + private readonly PoleOptions poleOptions; + + private const int ItemBatch = 1000; + private readonly TimeSpan _waitingInterval = TimeSpan.FromMinutes(5); + private readonly TimeSpan _delay = TimeSpan.FromSeconds(1); + + public string Name => nameof(PendingMessageRetryProcessor); + + public ExpiredEventsCollectorProcessor( + ILogger logger, + IEventStorageInitializer initializer, + IEventStorage eventstorage, + IOptions poleOptions) + { + this.logger = logger; + this.initializer = initializer; + this.eventstorage = eventstorage; + this.poleOptions = poleOptions.Value; + } + + public async Task Process(ProcessingContext context) + { + try + { + var tables = new[] { initializer.GetTableName() }; + + foreach (var table in tables) + { + logger.LogDebug($"Collecting expired data from table: {table}"); + + int deletedCount; + var time = DateTime.UtcNow; + do + { + deletedCount = await eventstorage.DeleteExpiresAsync(table, time, ItemBatch, context.CancellationToken); + + if (deletedCount != 0) + { + await Task.Delay(poleOptions.ExpiredEventsPreBulkDeleteDelaySeconds * 1000); + } + } while (deletedCount != 0); + } + } + catch (Exception ex) + { + logger.LogError(ex, $"{nameof(ExpiredEventsCollectorProcessor)} Process Error"); + } + finally + { + + await Task.Delay(poleOptions.ExpiredEventsCollectIntervalSeconds * 1000); + } + } + } +} diff --git a/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs b/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs new file mode 100644 index 0000000..6b76726 --- /dev/null +++ b/src/Pole.EventBus/Processor/PendingMessageRetryProcessor.cs @@ -0,0 +1,93 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Pole.EventBus; +using Pole.EventBus.Event; +using Pole.EventBus.EventStorage; +using Pole.Core.Serialization; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Pole.Core.Processor; +using Pole.Core; + +namespace Pole.EventBus.Processor +{ + class PendingMessageRetryProcessor : ProcessorBase + { + private readonly IEventStorage eventStorage; + private readonly PoleOptions options; + private readonly IProducerInfoContainer producerContainer; + private readonly ISerializer serializer; + private readonly ILogger logger; + private readonly ProducerOptions producerOptions; + private readonly IProducer producer; + private readonly IEventBuffer eventBuffer; + public PendingMessageRetryProcessor(IEventStorage eventStorage, IOptions options, ILogger logger, + IProducerInfoContainer producerContainer, ISerializer serializer, IOptions producerOptions, IProducer producer, IEventBuffer eventBuffer) + { + this.eventStorage = eventStorage; + this.options = options.Value ?? throw new Exception($"{nameof(PoleOptions)} Must be injected"); + this.logger = logger; + this.producerContainer = producerContainer; + this.serializer = serializer; + this.producerOptions = producerOptions.Value ?? throw new Exception($"{nameof(ProducerOptions)} Must be injected"); + this.producer = producer; + this.eventBuffer = eventBuffer; + } + public override string Name => nameof(PendingMessageRetryProcessor); + + + public override async Task Process(ProcessingContext context) + { + try + { + await ProcessInternal(); + } + catch (Exception ex) + { + logger.LogError(ex, $"{nameof(PendingMessageRetryProcessor)} Process Error"); + } + finally + { + await Task.Delay(options.PendingMessageRetryIntervalSeconds * 1000); + } + } + public async Task ProcessInternal() + { + var now = DateTime.UtcNow; + var pendingMessages = await eventStorage.GetMessagesOfNeedRetry(); + + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"{nameof(PendingMessageRetryProcessor)} pendingMessages count:{pendingMessages.Count()}"); + } + foreach (var pendingMessage in pendingMessages) + { + var eventContentBytes = Encoding.UTF8.GetBytes(pendingMessage.Content); + var bytesTransport = new EventBytesTransport(pendingMessage.Name, pendingMessage.Id, eventContentBytes); + var bytes = bytesTransport.GetBytes(); + if (pendingMessage.Retries > producerOptions.MaxFailedRetryCount) + { + pendingMessage.StatusName = nameof(EventStatus.Failed); + continue; + } + pendingMessage.Retries++; + var targetName = producerContainer.GetTargetName(pendingMessage.Name); + await producer.Publish(targetName, bytes); + } + if (pendingMessages.Count() > 0) + { + if (pendingMessages.Count() > 10) + { + await eventStorage.BulkChangePublishStateAsync(pendingMessages); + } + else + { + await eventStorage.ChangePublishStateAsync(pendingMessages); + } + } + } + } +} diff --git a/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs b/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs new file mode 100644 index 0000000..5f7bad7 --- /dev/null +++ b/src/Pole.EventBus/Processor/Server/BackgroundServiceBasedProcessorServer.cs @@ -0,0 +1,48 @@ +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Pole.Core.Processor; +using Pole.EventBus.EventStorage; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.EventBus.Processor.Server +{ + public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer + { + private readonly IServiceProvider _serviceProvider; + private Task _compositeTask; + + public BackgroundServiceBasedProcessorServer(IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider; + } + public async Task Start(CancellationToken stoppingToken) + { + var eventStorageInitializer = _serviceProvider.GetService(); + await eventStorageInitializer.InitializeAsync(stoppingToken); + + ProcessingContext processingContext = new ProcessingContext(stoppingToken); + List loopProcessors = new List(); + var innerProcessors = _serviceProvider.GetServices(); + var loggerFactory = _serviceProvider.GetService(); + foreach (var innerProcessor in innerProcessors) + { + LoopProcessor processor = new LoopProcessor(innerProcessor, loggerFactory); + loopProcessors.Add(processor); + } + var tasks = loopProcessors.Select(p => p.Process(processingContext)); + + _compositeTask = Task.WhenAll(tasks); + await _compositeTask; + } + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + return Start(stoppingToken); + } + } +} diff --git a/src/Pole.EventBus/Transaction/IDbTransactionAdapter.cs b/src/Pole.EventBus/Transaction/IDbTransactionAdapter.cs new file mode 100644 index 0000000..7a0672b --- /dev/null +++ b/src/Pole.EventBus/Transaction/IDbTransactionAdapter.cs @@ -0,0 +1,17 @@ +using Pole.EventBus.EventStorage; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.EventBus.Transaction +{ + public interface IDbTransactionAdapter : IDisposable + { + Task CommitAsync(CancellationToken cancellationToken = default); + Task RollbackAsync(CancellationToken cancellationToken = default); + object DbTransaction { get; set; } + } +} diff --git a/src/Pole.EventBus/UnitOfWork/IUnitOfWork.cs b/src/Pole.EventBus/UnitOfWork/IUnitOfWork.cs new file mode 100644 index 0000000..82e03d3 --- /dev/null +++ b/src/Pole.EventBus/UnitOfWork/IUnitOfWork.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Pole.EventBus.Transaction; + +namespace Pole.EventBus.UnitOfWork +{ + public interface IUnitOfWork : IDisposable + { + Task CompeleteAsync(CancellationToken cancellationToken = default); + Task Rollback(CancellationToken cancellationToken = default); + IUnitOfWork Enlist(IDbTransactionAdapter dbTransactionAdapter, IBus bus); + IServiceProvider ServiceProvider { get; } + } +} diff --git a/src/Pole.EventBus/UnitOfWork/UnitOfWork.cs b/src/Pole.EventBus/UnitOfWork/UnitOfWork.cs new file mode 100644 index 0000000..14971b9 --- /dev/null +++ b/src/Pole.EventBus/UnitOfWork/UnitOfWork.cs @@ -0,0 +1,73 @@ +using System; +using System.Collections.Generic; +using System.Data; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Pole.Core.Serialization; +using Microsoft.Extensions.Options; +using Pole.Core.Utils.Abstraction; +using Pole.Core.Exceptions; +using Pole.EventBus.Event; +using Pole.EventBus.Transaction; + +namespace Pole.EventBus.UnitOfWork +{ + class UnitOfWork : IUnitOfWork + { + private readonly IProducerInfoContainer producerContainer; + private readonly IEventTypeFinder eventTypeFinder; + private readonly ISerializer serializer; + private IBus bus; + private IEventBuffer eventBuffer; + public IServiceProvider ServiceProvider { get; } + public UnitOfWork(IProducerInfoContainer producerContainer, IEventTypeFinder eventTypeFinder, + ISerializer serializer, IEventBuffer eventBuffer, IServiceProvider serviceProvider) + { + this.producerContainer = producerContainer; + this.eventTypeFinder = eventTypeFinder; + this.serializer = serializer; + this.eventBuffer = eventBuffer; + this.ServiceProvider = serviceProvider; + } + + public async Task CompeleteAsync(CancellationToken cancellationToken = default) + { + + await bus.Transaction.CommitAsync(); + + var bufferedEvents = bus.PrePublishEventBuffer.ToList(); + bufferedEvents.ForEach(async @event => + { + var eventType = eventTypeFinder.FindType(@event.Name); + var eventContentBytes = Encoding.UTF8.GetBytes(@event.Content); + var bytesTransport = new EventBytesTransport(@event.Name, @event.Id, eventContentBytes); + var bytes = bytesTransport.GetBytes(); + var result = await eventBuffer.AddAndRun(@event); + if (!result) + { + throw new AddEventToEventBufferException(); + } + }); + } + + public void Dispose() + { + bus = null; + } + + public IUnitOfWork Enlist(IDbTransactionAdapter dbTransactionAdapter, IBus bus) + { + bus.Transaction = dbTransactionAdapter; + this.bus = bus; + return this; + } + + public Task Rollback(CancellationToken cancellationToken = default) + { + return bus.Transaction.RollbackAsync(); + } + } +} diff --git a/src/Pole.EventStorage.PostgreSql/IUnitOfWorkExtensions.cs b/src/Pole.EventStorage.PostgreSql/IUnitOfWorkExtensions.cs index a7dba92..0d72d77 100644 --- a/src/Pole.EventStorage.PostgreSql/IUnitOfWorkExtensions.cs +++ b/src/Pole.EventStorage.PostgreSql/IUnitOfWorkExtensions.cs @@ -1,8 +1,9 @@ using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.DependencyInjection; -using Pole.Core.EventBus; -using Pole.Core.EventBus.Transaction; using Pole.Core.UnitOfWork; +using Pole.EventBus; +using Pole.EventBus.Transaction; +using Pole.EventBus.UnitOfWork; using System; using System.Collections.Generic; using System.Data; diff --git a/src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj b/src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj index 567e317..5107250 100644 --- a/src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj +++ b/src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj @@ -11,6 +11,7 @@ + diff --git a/src/Pole.EventStorage.PostgreSql/PoleNpgsqlBulkUploader.cs b/src/Pole.EventStorage.PostgreSql/PoleNpgsqlBulkUploader.cs index 5f3c8cf..9fbd875 100644 --- a/src/Pole.EventStorage.PostgreSql/PoleNpgsqlBulkUploader.cs +++ b/src/Pole.EventStorage.PostgreSql/PoleNpgsqlBulkUploader.cs @@ -1,6 +1,5 @@ using Npgsql; using NpgsqlTypes; -using Pole.Core.EventBus.EventStorage; using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -8,6 +7,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; using Dapper; +using Pole.EventBus.EventStorage; namespace Pole.EventStorage.PostgreSql { diff --git a/src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs b/src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs index 0b2e8ba..6384524 100644 --- a/src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs +++ b/src/Pole.EventStorage.PostgreSql/PolePostgreSqlStartupConfigExtensions.cs @@ -1,7 +1,7 @@ using Microsoft.EntityFrameworkCore; using Pole.Core; -using Pole.Core.EventBus.EventStorage; -using Pole.Core.EventBus.Transaction; +using Pole.EventBus.EventStorage; +using Pole.EventBus.Transaction; using Pole.EventStorage.PostgreSql; using System; using System.Collections.Generic; diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs index c0c6c84..7f19b3a 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs @@ -1,9 +1,6 @@ using Microsoft.EntityFrameworkCore.Storage; -using Pole.Core.EventBus; -using Pole.Core.EventBus.Event; -using Pole.Core.EventBus.EventStorage; -using Pole.Core.EventBus.Transaction; using Pole.Core.Serialization; +using Pole.EventBus.Transaction; using System; using System.Collections.Concurrent; using System.Collections.Generic; diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs index 5759e2e..9c1ee85 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs @@ -3,7 +3,7 @@ using Microsoft.EntityFrameworkCore.Storage; using Microsoft.Extensions.Options; using Npgsql; using Pole.Core; -using Pole.Core.EventBus.EventStorage; +using Pole.EventBus.EventStorage; using System; using System.Collections.Generic; using System.Data; diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs index 0d6f797..7f2fa14 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs @@ -2,7 +2,7 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Npgsql; -using Pole.Core.EventBus.EventStorage; +using Pole.EventBus.EventStorage; using System; using System.Collections.Generic; using System.Text; diff --git a/src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs b/src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs index b752649..df5eb82 100644 --- a/src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs +++ b/src/Pole.Orleans.Provider.EntityframeworkCore/GrainStorage.cs @@ -8,10 +8,9 @@ using Orleans; using Orleans.Runtime; using Orleans.Storage; using Pole.Core.Domain; -using Pole.Core.EventBus; -using Pole.Core.EventBus.Event; -using Pole.Core.EventBus.Transaction; -using Pole.Core.UnitOfWork; +using Pole.EventBus; +using Pole.EventBus.Transaction; +using Pole.EventBus.UnitOfWork; using System; using System.Collections.Generic; using System.Linq; diff --git a/src/Pole.Orleans.Provider.EntityframeworkCore/Pole.Orleans.Provider.EntityframeworkCore.csproj b/src/Pole.Orleans.Provider.EntityframeworkCore/Pole.Orleans.Provider.EntityframeworkCore.csproj index 3467f5f..4cf517f 100644 --- a/src/Pole.Orleans.Provider.EntityframeworkCore/Pole.Orleans.Provider.EntityframeworkCore.csproj +++ b/src/Pole.Orleans.Provider.EntityframeworkCore/Pole.Orleans.Provider.EntityframeworkCore.csproj @@ -10,6 +10,7 @@ + diff --git a/src/Pole.Sagas.Server/PoleSagasServerOption.cs b/src/Pole.Sagas.Server/PoleSagasServerOption.cs index cb80b5b..6c1efe6 100644 --- a/src/Pole.Sagas.Server/PoleSagasServerOption.cs +++ b/src/Pole.Sagas.Server/PoleSagasServerOption.cs @@ -6,10 +6,25 @@ namespace Pole.Sagas.Server { public class PoleSagasServerOption { + /// + /// 从数据库获取未结束的 sagas 的 时间间隔 单位秒 + /// public int NotEndedSagasFetchIntervalSeconds { get; set; } = 30; + /// + /// 每个Grpc 获取Sagas 的请求 ,服务端流式返回,每一次返回的间隔时间 单位秒 + /// public int GetSagasGrpcStreamingResponseDelaySeconds { get; set; } = 20; + /// + /// 过期数据 批量删除触发的时间间隔,单位秒 + /// public int ExpiredDataBulkDeleteIntervalSeconds { get; set; } = 10*60; + /// + /// 过期数据 批量是每一次删除的数量 + /// public int ExpiredDataDeleteBatchCount { get; set; } = 1000; + /// + /// 批量删除时 实际过期的数量比预定数量要大时,会分多次删除,此值为其中每次分批删除的时间间隔 + /// public int ExpiredDataPreBulkDeleteDelaySeconds { get; set; } = 3; } } diff --git a/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs b/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs index c31dc47..2bd53a0 100644 --- a/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs +++ b/src/Pole.Sagas.Server/PoleSagasServerServiceCollectionExtensions.cs @@ -1,17 +1,21 @@ using Microsoft.Extensions.DependencyInjection; using Pole.Core.Processor; +using Pole.Sagas.Server; using Pole.Sagas.Server.Processor; using System; using System.Collections.Generic; using System.Text; -namespace Pole.Sagas.Server +namespace Microsoft.Extensions.DependencyInjection { public static class PoleSagasServerServiceCollectionExtensions { - public static IServiceCollection AddPoleSagasServer(IServiceCollection services) - { + public static IServiceCollection AddPoleSagasServer(this IServiceCollection services, Action config = null) + { + Action defaultConfig = option => { }; + var finalConfig = config ?? defaultConfig; services.AddGrpc(); + services.Configure(config); services.AddSingleton(); services.AddSingleton(); diff --git a/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs index eb0aa2a..89f6453 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PoleSagasStoragePostgreSqlOption.cs @@ -9,8 +9,6 @@ namespace Pole.Sagas.Storage.PostgreSql public string SagaTableName { get; set; } = "Sagas"; public string SchemaName { get; set; } = "pole-sagas"; public string ActivityTableName { get; set; } = "Activities"; - public string OvertimeCompensationGuaranteeTableName { get; set; } = "OCG-Activities"; - public int SagasRecoveryIntervalSecond { get; set; } public string ConnectionString { get; set; } } } diff --git a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs index d9e6f55..7ac014f 100644 --- a/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs +++ b/src/Pole.Sagas.Storage.PostgreSql/PostgreSqlSagaStorageInitializer.cs @@ -2,7 +2,6 @@ using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using Npgsql; -using Pole.Core.EventBus.EventStorage; using Pole.Sagas.Core; using Pole.Sagas.Core.Abstraction; using System; diff --git a/test/Pole.Samples.Backet.Api/Program.cs b/test/Pole.Samples.Backet.Api/Program.cs index 24eaa1d..2f727db 100644 --- a/test/Pole.Samples.Backet.Api/Program.cs +++ b/test/Pole.Samples.Backet.Api/Program.cs @@ -1,7 +1,6 @@ using BenchmarkDotNet.Reports; using BenchmarkDotNet.Running; using Npgsql; -using Pole.Core.EventBus.EventStorage; using Pole.Samples.Backet.Api.Benchmarks; using System; using System.Collections.Generic; -- libgit2 0.25.0