From 27593c6ca2c2aac4c25f2831986d68511514c75e Mon Sep 17 00:00:00 2001 From: 丁松杰 <377973147@qq.com> Date: Thu, 13 Feb 2020 16:06:37 +0800 Subject: [PATCH] 添加 本地消息表 基本存储逻辑 本地表的初始化逻辑,添加 本地消息表 事务适配器,事务连接器 --- src/Pole.Core/EventBus/Bus.cs | 8 ++++++-- src/Pole.Core/EventBus/EventStorage/EventEntity.cs | 17 +++++++++++++++++ src/Pole.Core/EventBus/EventStorage/EventStatus.cs | 13 +++++++++++++ src/Pole.Core/EventBus/EventStorage/IEventStorage.cs | 20 ++++++++++++++++++++ src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs | 15 +++++++++++++++ src/Pole.Core/EventBus/IBus.cs | 5 ++++- src/Pole.Core/PoleOptions.cs | 12 ++++++++++++ src/Pole.Core/ProducerOptions.cs | 11 +++++++++++ src/Pole.EventStorage.PostgreSql/CapOptionsExtensions.cs | 44 ++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventStorage.PostgreSql/Class1.cs | 8 -------- src/Pole.EventStorage.PostgreSql/EFOptions.cs | 20 ++++++++++++++++++++ src/Pole.EventStorage.PostgreSql/EnlistBusExtensions.cs | 29 +++++++++++++++++++++++++++++ src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj | 1 + src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs | 15 ++++++++++++--- src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/Pole.EventStorage.PostgreSql/PostgreSqlOptions.cs | 11 +++++++++++ 17 files changed, 369 insertions(+), 14 deletions(-) create mode 100644 src/Pole.Core/EventBus/EventStorage/EventEntity.cs create mode 100644 src/Pole.Core/EventBus/EventStorage/EventStatus.cs create mode 100644 src/Pole.Core/EventBus/EventStorage/IEventStorage.cs create mode 100644 src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs create mode 100644 src/Pole.Core/PoleOptions.cs create mode 100644 src/Pole.Core/ProducerOptions.cs create mode 100644 src/Pole.EventStorage.PostgreSql/CapOptionsExtensions.cs delete mode 100644 src/Pole.EventStorage.PostgreSql/Class1.cs create mode 100644 src/Pole.EventStorage.PostgreSql/EFOptions.cs create mode 100644 src/Pole.EventStorage.PostgreSql/EnlistBusExtensions.cs create mode 100644 src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs create mode 100644 src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs create mode 100644 src/Pole.EventStorage.PostgreSql/PostgreSqlOptions.cs diff --git a/src/Pole.Core/EventBus/Bus.cs b/src/Pole.Core/EventBus/Bus.cs index 7586706..f074055 100644 --- a/src/Pole.Core/EventBus/Bus.cs +++ b/src/Pole.Core/EventBus/Bus.cs @@ -17,9 +17,13 @@ namespace Pole.Core.EventBus private readonly IEventTypeFinder eventTypeFinder; private readonly ISerializer serializer; private readonly ISnowflakeIdGenerator snowflakeIdGenerator; - AsyncLocal Transaction { get; } - public Bus(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator) + public IDbTransactionAdapter Transaction { get; set; } + + public IServiceProvider ServiceProvider { get; } + + public Bus(IServiceProvider serviceProvider, IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator) { + ServiceProvider = serviceProvider; this.producer = producer; this.eventTypeFinder = eventTypeFinder; this.serializer = serializer; diff --git a/src/Pole.Core/EventBus/EventStorage/EventEntity.cs b/src/Pole.Core/EventBus/EventStorage/EventEntity.cs new file mode 100644 index 0000000..2535c95 --- /dev/null +++ b/src/Pole.Core/EventBus/EventStorage/EventEntity.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.EventBus.EventStorage +{ + public class EventEntity + { + public string Id { get; set; } + public string Name { get; set; } + public string Content { get; set; } + public DateTime Added { get; set; } + public DateTime ExpiresAt { get; set; } + public int Retries { get; set; } + public string StatusName { get; set; } + } +} diff --git a/src/Pole.Core/EventBus/EventStorage/EventStatus.cs b/src/Pole.Core/EventBus/EventStorage/EventStatus.cs new file mode 100644 index 0000000..9c43086 --- /dev/null +++ b/src/Pole.Core/EventBus/EventStorage/EventStatus.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core.EventBus.EventStorage +{ + public enum EventStatus + { + Failed = -1, + PrePublish = 0, + Published = 1 + } +} diff --git a/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs b/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs new file mode 100644 index 0000000..c282a76 --- /dev/null +++ b/src/Pole.Core/EventBus/EventStorage/IEventStorage.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Core.EventBus.EventStorage +{ + public interface IEventStorage + { + Task ChangePublishStateAsync(EventEntity message, EventStatus state); + + Task StoreMessage(EventEntity eventEntity, object dbTransaction = null); + + Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, + CancellationToken token = default); + + Task> GetPublishedMessagesOfNeedRetry(); + } +} diff --git a/src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs b/src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs new file mode 100644 index 0000000..6a28845 --- /dev/null +++ b/src/Pole.Core/EventBus/EventStorage/IEventStorageInitializer.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.Core.EventBus.EventStorage +{ + public interface IEventStorageInitializer + { + Task InitializeAsync(CancellationToken cancellationToken); + + string GetTableName(); + } +} diff --git a/src/Pole.Core/EventBus/IBus.cs b/src/Pole.Core/EventBus/IBus.cs index de0f234..2c8cd2b 100644 --- a/src/Pole.Core/EventBus/IBus.cs +++ b/src/Pole.Core/EventBus/IBus.cs @@ -1,4 +1,5 @@ -using System; +using Pole.Core.EventBus.Transaction; +using System; using System.Collections.Generic; using System.Text; using System.Threading; @@ -8,6 +9,8 @@ namespace Pole.Core.EventBus { public interface IBus { + IServiceProvider ServiceProvider { get; } + IDbTransactionAdapter Transaction { get; set; } Task Publish(object @event, CancellationToken cancellationToken = default); } } diff --git a/src/Pole.Core/PoleOptions.cs b/src/Pole.Core/PoleOptions.cs new file mode 100644 index 0000000..3059b92 --- /dev/null +++ b/src/Pole.Core/PoleOptions.cs @@ -0,0 +1,12 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core +{ + public class PoleOptions + { + public IServiceCollection Services { get; private set; } + } +} diff --git a/src/Pole.Core/ProducerOptions.cs b/src/Pole.Core/ProducerOptions.cs new file mode 100644 index 0000000..ccf436b --- /dev/null +++ b/src/Pole.Core/ProducerOptions.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.Core +{ + public class ProducerOptions + { + public int FailedRetryCount { get; set; } + } +} diff --git a/src/Pole.EventStorage.PostgreSql/CapOptionsExtensions.cs b/src/Pole.EventStorage.PostgreSql/CapOptionsExtensions.cs new file mode 100644 index 0000000..b23cbd1 --- /dev/null +++ b/src/Pole.EventStorage.PostgreSql/CapOptionsExtensions.cs @@ -0,0 +1,44 @@ +using Microsoft.EntityFrameworkCore; +using Microsoft.Extensions.DependencyInjection; +using Pole.Core; +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.EventStorage.PostgreSql +{ + public static class CapOptionsExtensions + { + public static PoleOptions UseEntityFrameworkEventStorage(this PoleOptions options) + where TContext : DbContext + { + return options.UseEntityFrameworkEventStorage(opt => { }); + } + public static PoleOptions UseEntityFrameworkEventStorage(this PoleOptions options, Action configure) + where TContext : DbContext + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + EFOptions eFOptions = new EFOptions(); + configure(eFOptions); + Action postgreSqlOptionsConfig = postgreSqlOptions => + { + postgreSqlOptions.DbContextType = typeof(TContext); + postgreSqlOptions.Schema = eFOptions.Schema; + using var scope = options.Services.BuildServiceProvider().CreateScope(); + var provider = scope.ServiceProvider; + using var dbContext = (DbContext)provider.GetRequiredService(typeof(TContext)); + postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString; + }; + options.Services.Configure(postgreSqlOptionsConfig); + + return options; + } + public static PoleOptions UsePostgreSqlEventStorage(this PoleOptions options, Action configure) + where TContext : DbContext + { + if (configure == null) throw new ArgumentNullException(nameof(configure)); + options.Services.Configure(configure); + return options; + } + } +} diff --git a/src/Pole.EventStorage.PostgreSql/Class1.cs b/src/Pole.EventStorage.PostgreSql/Class1.cs deleted file mode 100644 index 4822ca7..0000000 --- a/src/Pole.EventStorage.PostgreSql/Class1.cs +++ /dev/null @@ -1,8 +0,0 @@ -using System; - -namespace Pole.EventStorage.PostgreSql -{ - public class Class1 - { - } -} diff --git a/src/Pole.EventStorage.PostgreSql/EFOptions.cs b/src/Pole.EventStorage.PostgreSql/EFOptions.cs new file mode 100644 index 0000000..ad31f17 --- /dev/null +++ b/src/Pole.EventStorage.PostgreSql/EFOptions.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.EventStorage.PostgreSql +{ + public class EFOptions + { + public const string DefaultSchema = "pole"; + public const string DefaultTable = "Events"; + /// + /// Gets or sets the schema to use when creating database objects. + /// Default is . + /// + public string Schema { get; set; } = DefaultSchema; + public string TableName { get; set; } = DefaultTable; + + internal Type DbContextType { get; set; } + } +} diff --git a/src/Pole.EventStorage.PostgreSql/EnlistBusExtensions.cs b/src/Pole.EventStorage.PostgreSql/EnlistBusExtensions.cs new file mode 100644 index 0000000..c15c4d4 --- /dev/null +++ b/src/Pole.EventStorage.PostgreSql/EnlistBusExtensions.cs @@ -0,0 +1,29 @@ +using Pole.Core.EventBus; +using Pole.Core.EventBus.Transaction; +using System; +using System.Collections.Generic; +using System.Data; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.EntityFrameworkCore.Storage; + +namespace Pole.EventStorage.PostgreSql +{ + public static class EnlistBusExtensions + { + public static IDbTransaction EnlistBus(this IDbTransaction dbTransaction, IBus bus, bool autoCommit = false) + { + bus.Transaction = bus.ServiceProvider.GetService(); + bus.Transaction.DbTransaction = dbTransaction; + bus.Transaction.AutoCommit = autoCommit; + return dbTransaction; + } + public static IDbContextTransaction EnlistBus(this IDbContextTransaction dbContextTransaction, IBus bus, bool autoCommit = false) + { + bus.Transaction = bus.ServiceProvider.GetService(); + bus.Transaction.DbTransaction = dbContextTransaction; + bus.Transaction.AutoCommit = autoCommit; + return dbContextTransaction; + } + } +} diff --git a/src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj b/src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj index 96da4d5..567e317 100644 --- a/src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj +++ b/src/Pole.EventStorage.PostgreSql/Pole.EventStorage.PostgreSql.csproj @@ -5,6 +5,7 @@ + diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs index ee443aa..ed114e5 100644 --- a/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlDbTransactionAdapter.cs @@ -29,12 +29,21 @@ namespace Pole.EventStorage.PostgreSql public void Dispose() { - throw new NotImplementedException(); + (DbTransaction as IDbTransaction)?.Dispose(); + DbTransaction = null; } - public Task RollbackAsync(CancellationToken cancellationToken = default) + public async Task RollbackAsync(CancellationToken cancellationToken = default) { - throw new NotImplementedException(); + switch (DbTransaction) + { + case IDbTransaction dbTransaction: + dbTransaction.Rollback(); + break; + case IDbContextTransaction dbContextTransaction: + await dbContextTransaction.RollbackAsync(cancellationToken); + break; + } } } } diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs new file mode 100644 index 0000000..f7fc6fe --- /dev/null +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorage.cs @@ -0,0 +1,96 @@ +using Dapper; +using Microsoft.EntityFrameworkCore.Storage; +using Microsoft.Extensions.Options; +using Npgsql; +using Pole.Core; +using Pole.Core.EventBus.EventStorage; +using System; +using System.Collections.Generic; +using System.Data; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.EventStorage.PostgreSql +{ + class PostgreSqlEventStorage : IEventStorage + { + private readonly string tableName; + private readonly ProducerOptions producerOptions; + private readonly PostgreSqlOptions options; + private readonly IEventStorageInitializer eventStorageInitializer; + public PostgreSqlEventStorage(IOptions postgreSqlOptions, IOptions producerOptions, IEventStorageInitializer eventStorageInitializer) + { + this.producerOptions = producerOptions.Value; + this.options = postgreSqlOptions.Value; + this.eventStorageInitializer = eventStorageInitializer; + tableName = eventStorageInitializer.GetTableName(); + } + public async Task ChangePublishStateAsync(EventEntity message, EventStatus state) + { + var sql = + $"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id"; + using var connection = new NpgsqlConnection(options.ConnectionString); + await connection.ExecuteAsync(sql, new + { + Id = long.Parse(message.Id), + message.Retries, + message.ExpiresAt, + StatusName = state.ToString("G") + }); + } + + public async Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default) + { + using var connection = new NpgsqlConnection(options.ConnectionString); + + return await connection.ExecuteAsync( + $"DELETE FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @batchCount);", + new { timeout, batchCount }); + } + + public async Task> GetPublishedMessagesOfNeedRetry() + { + var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O"); + var sql = + $"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.FailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{EventStatus.Failed}' OR \"StatusName\"='{EventStatus.PrePublish}') LIMIT 200;"; + + var result = new List(); + using var connection = new NpgsqlConnection(options.ConnectionString); + var reader = await connection.ExecuteReaderAsync(sql); + while (reader.Read()) + { + result.Add(new EventEntity + { + Id = reader.GetString(0), + Retries = reader.GetInt32(4), + Added = reader.GetDateTime(5) + }); + } + + return result; + } + + public async Task StoreMessage(EventEntity eventEntity, object dbTransaction = null) + { + var sql = + $"INSERT INTO {tableName} (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" + + $"VALUES(@Id,'1',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);"; + + if (dbTransaction == null) + { + using var connection = new NpgsqlConnection(options.ConnectionString); + return await connection.ExecuteAsync(sql, eventEntity) > 0; + } + else + { + var dbTrans = dbTransaction as IDbTransaction; + if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans) + dbTrans = dbContextTrans.GetDbTransaction(); + + var conn = dbTrans?.Connection; + return await conn.ExecuteAsync(sql, eventEntity, dbTrans) > 0; + } + } + } +} diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs new file mode 100644 index 0000000..9a4fbd4 --- /dev/null +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlEventStorageInitializer.cs @@ -0,0 +1,58 @@ +using Dapper; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Npgsql; +using Pole.Core.EventBus.EventStorage; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace Pole.EventStorage.PostgreSql +{ + class PostgreSqlEventStorageInitializer : IEventStorageInitializer + { + private readonly PostgreSqlOptions options; + private readonly ILogger logger; + public PostgreSqlEventStorageInitializer(IOptions options,Logger logger) + { + this.options = options.Value; + this.logger = logger; + } + public string GetTableName() + { + return $"\"{options.Schema}\".\"{options.TableName}\""; + } + + public async Task InitializeAsync(CancellationToken cancellationToken) + { + if (cancellationToken.IsCancellationRequested) return; + + var sql = CreateDbTablesScript(options.Schema); + using (var connection = new NpgsqlConnection(options.ConnectionString)) + { + await connection.ExecuteAsync(sql); + } + + logger.LogDebug("Ensuring all create database tables script are applied."); + } + protected virtual string CreateDbTablesScript(string schema) + { + var batchSql = $@" +CREATE SCHEMA IF NOT EXISTS ""{schema}""; + +CREATE TABLE IF NOT EXISTS {GetTableName()}( + ""Id"" VARCHAR(20) PRIMARY KEY NOT NULL, + ""Version"" VARCHAR(20) NOT NULL, + ""Name"" VARCHAR(200) NOT NULL, + ""Content"" TEXT NULL, + ""Retries"" INT NOT NULL, + ""Added"" TIMESTAMP NOT NULL, + ""ExpiresAt"" TIMESTAMP NULL, + ""StatusName"" VARCHAR(10) NOT NULL +);"; + return batchSql; + } + } +} diff --git a/src/Pole.EventStorage.PostgreSql/PostgreSqlOptions.cs b/src/Pole.EventStorage.PostgreSql/PostgreSqlOptions.cs new file mode 100644 index 0000000..7ac30db --- /dev/null +++ b/src/Pole.EventStorage.PostgreSql/PostgreSqlOptions.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace Pole.EventStorage.PostgreSql +{ + public class PostgreSqlOptions : EFOptions + { + public string ConnectionString { get; set; } + } +} -- libgit2 0.25.0