Commit 27593c6c by 丁松杰

添加 本地消息表 基本存储逻辑 本地表的初始化逻辑,添加 本地消息表 事务适配器,事务连接器

parent ac247fc1
......@@ -17,9 +17,13 @@ namespace Pole.Core.EventBus
private readonly IEventTypeFinder eventTypeFinder;
private readonly ISerializer serializer;
private readonly ISnowflakeIdGenerator snowflakeIdGenerator;
AsyncLocal<IDbTransactionAdapter> Transaction { get; }
public Bus(IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator)
public IDbTransactionAdapter Transaction { get; set; }
public IServiceProvider ServiceProvider { get; }
public Bus(IServiceProvider serviceProvider, IProducer producer, IEventTypeFinder eventTypeFinder, ISerializer serializer, ISnowflakeIdGenerator snowflakeIdGenerator)
{
ServiceProvider = serviceProvider;
this.producer = producer;
this.eventTypeFinder = eventTypeFinder;
this.serializer = serializer;
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.EventStorage
{
public class EventEntity
{
public string Id { get; set; }
public string Name { get; set; }
public string Content { get; set; }
public DateTime Added { get; set; }
public DateTime ExpiresAt { get; set; }
public int Retries { get; set; }
public string StatusName { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.EventBus.EventStorage
{
public enum EventStatus
{
Failed = -1,
PrePublish = 0,
Published = 1
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.EventBus.EventStorage
{
public interface IEventStorage
{
Task ChangePublishStateAsync(EventEntity message, EventStatus state);
Task<bool> StoreMessage(EventEntity eventEntity, object dbTransaction = null);
Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000,
CancellationToken token = default);
Task<IEnumerable<EventEntity>> GetPublishedMessagesOfNeedRetry();
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.EventBus.EventStorage
{
public interface IEventStorageInitializer
{
Task InitializeAsync(CancellationToken cancellationToken);
string GetTableName();
}
}
using System;
using Pole.Core.EventBus.Transaction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
......@@ -8,6 +9,8 @@ namespace Pole.Core.EventBus
{
public interface IBus
{
IServiceProvider ServiceProvider { get; }
IDbTransactionAdapter Transaction { get; set; }
Task<bool> Publish(object @event, CancellationToken cancellationToken = default);
}
}
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core
{
public class PoleOptions
{
public IServiceCollection Services { get; private set; }
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core
{
public class ProducerOptions
{
public int FailedRetryCount { get; set; }
}
}
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Pole.Core;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.EventStorage.PostgreSql
{
public static class CapOptionsExtensions
{
public static PoleOptions UseEntityFrameworkEventStorage<TContext>(this PoleOptions options)
where TContext : DbContext
{
return options.UseEntityFrameworkEventStorage<TContext>(opt => { });
}
public static PoleOptions UseEntityFrameworkEventStorage<TContext>(this PoleOptions options, Action<EFOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
EFOptions eFOptions = new EFOptions();
configure(eFOptions);
Action<PostgreSqlOptions> postgreSqlOptionsConfig = postgreSqlOptions =>
{
postgreSqlOptions.DbContextType = typeof(TContext);
postgreSqlOptions.Schema = eFOptions.Schema;
using var scope = options.Services.BuildServiceProvider().CreateScope();
var provider = scope.ServiceProvider;
using var dbContext = (DbContext)provider.GetRequiredService(typeof(TContext));
postgreSqlOptions.ConnectionString = dbContext.Database.GetDbConnection().ConnectionString;
};
options.Services.Configure(postgreSqlOptionsConfig);
return options;
}
public static PoleOptions UsePostgreSqlEventStorage<TContext>(this PoleOptions options, Action<PostgreSqlOptions> configure)
where TContext : DbContext
{
if (configure == null) throw new ArgumentNullException(nameof(configure));
options.Services.Configure(configure);
return options;
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.EventStorage.PostgreSql
{
public class EFOptions
{
public const string DefaultSchema = "pole";
public const string DefaultTable = "Events";
/// <summary>
/// Gets or sets the schema to use when creating database objects.
/// Default is <see cref="DefaultSchema" />.
/// </summary>
public string Schema { get; set; } = DefaultSchema;
public string TableName { get; set; } = DefaultTable;
internal Type DbContextType { get; set; }
}
}
using Pole.Core.EventBus;
using Pole.Core.EventBus.Transaction;
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.EntityFrameworkCore.Storage;
namespace Pole.EventStorage.PostgreSql
{
public static class EnlistBusExtensions
{
public static IDbTransaction EnlistBus(this IDbTransaction dbTransaction, IBus bus, bool autoCommit = false)
{
bus.Transaction = bus.ServiceProvider.GetService<IDbTransactionAdapter>();
bus.Transaction.DbTransaction = dbTransaction;
bus.Transaction.AutoCommit = autoCommit;
return dbTransaction;
}
public static IDbContextTransaction EnlistBus(this IDbContextTransaction dbContextTransaction, IBus bus, bool autoCommit = false)
{
bus.Transaction = bus.ServiceProvider.GetService<IDbTransactionAdapter>();
bus.Transaction.DbTransaction = dbContextTransaction;
bus.Transaction.AutoCommit = autoCommit;
return dbContextTransaction;
}
}
}
......@@ -5,6 +5,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapper" Version="2.0.30" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="3.1.1.2" />
</ItemGroup>
......
......@@ -29,12 +29,21 @@ namespace Pole.EventStorage.PostgreSql
public void Dispose()
{
throw new NotImplementedException();
(DbTransaction as IDbTransaction)?.Dispose();
DbTransaction = null;
}
public Task RollbackAsync(CancellationToken cancellationToken = default)
public async Task RollbackAsync(CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
switch (DbTransaction)
{
case IDbTransaction dbTransaction:
dbTransaction.Rollback();
break;
case IDbContextTransaction dbContextTransaction:
await dbContextTransaction.RollbackAsync(cancellationToken);
break;
}
}
}
}
using Dapper;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.Options;
using Npgsql;
using Pole.Core;
using Pole.Core.EventBus.EventStorage;
using System;
using System.Collections.Generic;
using System.Data;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.EventStorage.PostgreSql
{
class PostgreSqlEventStorage : IEventStorage
{
private readonly string tableName;
private readonly ProducerOptions producerOptions;
private readonly PostgreSqlOptions options;
private readonly IEventStorageInitializer eventStorageInitializer;
public PostgreSqlEventStorage(IOptions<PostgreSqlOptions> postgreSqlOptions, IOptions<ProducerOptions> producerOptions, IEventStorageInitializer eventStorageInitializer)
{
this.producerOptions = producerOptions.Value;
this.options = postgreSqlOptions.Value;
this.eventStorageInitializer = eventStorageInitializer;
tableName = eventStorageInitializer.GetTableName();
}
public async Task ChangePublishStateAsync(EventEntity message, EventStatus state)
{
var sql =
$"UPDATE {tableName} SET \"Retries\"=@Retries,\"ExpiresAt\"=@ExpiresAt,\"StatusName\"=@StatusName WHERE \"Id\"=@Id";
using var connection = new NpgsqlConnection(options.ConnectionString);
await connection.ExecuteAsync(sql, new
{
Id = long.Parse(message.Id),
message.Retries,
message.ExpiresAt,
StatusName = state.ToString("G")
});
}
public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default)
{
using var connection = new NpgsqlConnection(options.ConnectionString);
return await connection.ExecuteAsync(
$"DELETE FROM {table} WHERE \"ExpiresAt\" < @timeout AND \"Id\" IN (SELECT \"Id\" FROM {table} LIMIT @batchCount);",
new { timeout, batchCount });
}
public async Task<IEnumerable<EventEntity>> GetPublishedMessagesOfNeedRetry()
{
var fourMinAgo = DateTime.Now.AddMinutes(-4).ToString("O");
var sql =
$"SELECT * FROM {tableName} WHERE \"Retries\"<{producerOptions.FailedRetryCount} AND \"Added\"<'{fourMinAgo}' AND (\"StatusName\"='{EventStatus.Failed}' OR \"StatusName\"='{EventStatus.PrePublish}') LIMIT 200;";
var result = new List<EventEntity>();
using var connection = new NpgsqlConnection(options.ConnectionString);
var reader = await connection.ExecuteReaderAsync(sql);
while (reader.Read())
{
result.Add(new EventEntity
{
Id = reader.GetString(0),
Retries = reader.GetInt32(4),
Added = reader.GetDateTime(5)
});
}
return result;
}
public async Task<bool> StoreMessage(EventEntity eventEntity, object dbTransaction = null)
{
var sql =
$"INSERT INTO {tableName} (\"Id\",\"Version\",\"Name\",\"Content\",\"Retries\",\"Added\",\"ExpiresAt\",\"StatusName\")" +
$"VALUES(@Id,'1',@Name,@Content,@Retries,@Added,@ExpiresAt,@StatusName);";
if (dbTransaction == null)
{
using var connection = new NpgsqlConnection(options.ConnectionString);
return await connection.ExecuteAsync(sql, eventEntity) > 0;
}
else
{
var dbTrans = dbTransaction as IDbTransaction;
if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans)
dbTrans = dbContextTrans.GetDbTransaction();
var conn = dbTrans?.Connection;
return await conn.ExecuteAsync(sql, eventEntity, dbTrans) > 0;
}
}
}
}
using Dapper;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Npgsql;
using Pole.Core.EventBus.EventStorage;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.EventStorage.PostgreSql
{
class PostgreSqlEventStorageInitializer : IEventStorageInitializer
{
private readonly PostgreSqlOptions options;
private readonly ILogger logger;
public PostgreSqlEventStorageInitializer(IOptions<PostgreSqlOptions> options,Logger<PostgreSqlEventStorageInitializer> logger)
{
this.options = options.Value;
this.logger = logger;
}
public string GetTableName()
{
return $"\"{options.Schema}\".\"{options.TableName}\"";
}
public async Task InitializeAsync(CancellationToken cancellationToken)
{
if (cancellationToken.IsCancellationRequested) return;
var sql = CreateDbTablesScript(options.Schema);
using (var connection = new NpgsqlConnection(options.ConnectionString))
{
await connection.ExecuteAsync(sql);
}
logger.LogDebug("Ensuring all create database tables script are applied.");
}
protected virtual string CreateDbTablesScript(string schema)
{
var batchSql = $@"
CREATE SCHEMA IF NOT EXISTS ""{schema}"";
CREATE TABLE IF NOT EXISTS {GetTableName()}(
""Id"" VARCHAR(20) PRIMARY KEY NOT NULL,
""Version"" VARCHAR(20) NOT NULL,
""Name"" VARCHAR(200) NOT NULL,
""Content"" TEXT NULL,
""Retries"" INT NOT NULL,
""Added"" TIMESTAMP NOT NULL,
""ExpiresAt"" TIMESTAMP NULL,
""StatusName"" VARCHAR(10) NOT NULL
);";
return batchSql;
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.EventStorage.PostgreSql
{
public class Class1
public class PostgreSqlOptions : EFOptions
{
public string ConnectionString { get; set; }
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment