Commit ef9a3f0e by dingsongjie

删掉 之前版本的项目

parent 3ac920dd
Showing with 0 additions and 3227 deletions
......@@ -11,26 +11,12 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Core", "src\Pole.Core\
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Application", "src\Pole.Application\Pole.Application.csproj", "{C7825E5B-4FB0-4498-B8D1-E9EC0BC1AA5C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Domain", "src\Pole.Domain\Pole.Domain.csproj", "{6F6DBA49-4274-4C62-BBE7-91FCC5B77989}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Domain.EntityframeworkCore", "src\Pole.Domain.EntityframeworkCore\Pole.Domain.EntityframeworkCore.csproj", "{1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.Grpc", "src\Pole.Grpc\Pole.Grpc.csproj", "{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{4A0FB696-EC29-4A5F-B40B-A6FC56001ADB}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "apis", "apis", "{475116FC-DEEC-4255-94E4-AE7B8C85038D}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.ReliableMessage", "src\Pole.ReliableMessage\Pole.ReliableMessage.csproj", "{699C75AB-4814-4E16-A3F3-9735C4C609FE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.ReliableMessage.Masstransit", "src\Pole.ReliableMessage.Masstransit\Pole.ReliableMessage.Masstransit.csproj", "{051BECA5-5E65-4FCB-9B7F-C9E64809E218}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.ReliableMessage.Storage.Abstraction", "src\Pole.ReliableMessage.Storage.Abstraction\Pole.ReliableMessage.Storage.Abstraction.csproj", "{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.ReliableMessage.Storage.Mongodb", "src\Pole.ReliableMessage.Storage.Mongodb\Pole.ReliableMessage.Storage.Mongodb.csproj", "{793C73C6-93DE-4A56-B979-137914B247F2}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Pole.ReliableMessage.Storage.EntityframeworkCore", "src\Pole.ReliableMessage.Storage.EntityframeworkCore\Pole.ReliableMessage.Storage.EntityframeworkCore.csproj", "{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "webs", "webs", "{452B9D9E-881E-4E0E-A90B-98F2253F20F1}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Order.Api", "samples\apis\Order.Api\Order.Api.csproj", "{098DF771-6DC6-45D4-ABFA-FF84E8F7750B}"
......@@ -67,38 +53,10 @@ Global
{C7825E5B-4FB0-4498-B8D1-E9EC0BC1AA5C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C7825E5B-4FB0-4498-B8D1-E9EC0BC1AA5C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C7825E5B-4FB0-4498-B8D1-E9EC0BC1AA5C}.Release|Any CPU.Build.0 = Release|Any CPU
{6F6DBA49-4274-4C62-BBE7-91FCC5B77989}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6F6DBA49-4274-4C62-BBE7-91FCC5B77989}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6F6DBA49-4274-4C62-BBE7-91FCC5B77989}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6F6DBA49-4274-4C62-BBE7-91FCC5B77989}.Release|Any CPU.Build.0 = Release|Any CPU
{1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A}.Release|Any CPU.Build.0 = Release|Any CPU
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8}.Release|Any CPU.Build.0 = Release|Any CPU
{699C75AB-4814-4E16-A3F3-9735C4C609FE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{699C75AB-4814-4E16-A3F3-9735C4C609FE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{699C75AB-4814-4E16-A3F3-9735C4C609FE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{699C75AB-4814-4E16-A3F3-9735C4C609FE}.Release|Any CPU.Build.0 = Release|Any CPU
{051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Debug|Any CPU.Build.0 = Debug|Any CPU
{051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Release|Any CPU.ActiveCfg = Release|Any CPU
{051BECA5-5E65-4FCB-9B7F-C9E64809E218}.Release|Any CPU.Build.0 = Release|Any CPU
{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3D92F460-350B-4614-A6F6-C00A2D0FA9E2}.Release|Any CPU.Build.0 = Release|Any CPU
{793C73C6-93DE-4A56-B979-137914B247F2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{793C73C6-93DE-4A56-B979-137914B247F2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{793C73C6-93DE-4A56-B979-137914B247F2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{793C73C6-93DE-4A56-B979-137914B247F2}.Release|Any CPU.Build.0 = Release|Any CPU
{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Debug|Any CPU.Build.0 = Debug|Any CPU
{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Release|Any CPU.ActiveCfg = Release|Any CPU
{805CF4F7-CCDC-4390-A92B-55E0FFA7F659}.Release|Any CPU.Build.0 = Release|Any CPU
{098DF771-6DC6-45D4-ABFA-FF84E8F7750B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{098DF771-6DC6-45D4-ABFA-FF84E8F7750B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{098DF771-6DC6-45D4-ABFA-FF84E8F7750B}.Release|Any CPU.ActiveCfg = Release|Any CPU
......@@ -142,15 +100,8 @@ Global
GlobalSection(NestedProjects) = preSolution
{CA80F6EF-95A0-4BB7-BA8B-02E167E82865} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{C7825E5B-4FB0-4498-B8D1-E9EC0BC1AA5C} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{6F6DBA49-4274-4C62-BBE7-91FCC5B77989} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{1C26BE3A-CBEA-47D1-97A0-6DB4F41DFF5A} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{F40FE25F-6081-4B29-A7BD-CB5C24F6FDA8} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{475116FC-DEEC-4255-94E4-AE7B8C85038D} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB}
{699C75AB-4814-4E16-A3F3-9735C4C609FE} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{051BECA5-5E65-4FCB-9B7F-C9E64809E218} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{3D92F460-350B-4614-A6F6-C00A2D0FA9E2} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{793C73C6-93DE-4A56-B979-137914B247F2} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{805CF4F7-CCDC-4390-A92B-55E0FFA7F659} = {9932C965-8B38-4F70-9E43-86DC56860E2B}
{452B9D9E-881E-4E0E-A90B-98F2253F20F1} = {4A0FB696-EC29-4A5F-B40B-A6FC56001ADB}
{098DF771-6DC6-45D4-ABFA-FF84E8F7750B} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
{125B1E4B-B1C1-4F85-9C6A-38815960E654} = {475116FC-DEEC-4255-94E4-AE7B8C85038D}
......
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Pole.EntityframeworkCore.MediatR;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Domain.EntityframeworkCore
{
public class EFCoreRepository<TEntity> : IRepository<TEntity>
where TEntity : Entity, IAggregateRoot
{
protected readonly DbContext _dbContext;
private readonly IMediator _mediator;
public EFCoreRepository(IServiceProvider serviceProvider)
{
var dbContextOptions = serviceProvider.GetRequiredService<DbContextOptions>();
_dbContext = serviceProvider.GetRequiredService(dbContextOptions.ContextType) as DbContext;
_mediator = serviceProvider.GetRequiredService<IMediator>();
}
public void Add(TEntity entity)
{
_dbContext.Set<TEntity>().Add(entity);
}
public virtual void Delete(TEntity entity)
{
_dbContext.Set<TEntity>().Remove(entity);
}
public virtual Task<TEntity> Get(string id)
{
return _dbContext.Set<TEntity>().FirstOrDefaultAsync(m => m.Id == id);
}
public async Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default)
{
await _mediator.DispatchDomainEventsAsync(_dbContext);
await _dbContext.SaveChangesAsync(cancellationToken);
return true;
}
public void Update(TEntity entity)
{
throw new NotImplementedException();
}
}
}
using MediatR;
using Microsoft.EntityFrameworkCore;
using Pole.Domain;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Pole.EntityframeworkCore.MediatR
{
public static class MediatorExtension
{
public static async Task DispatchDomainEventsAsync(this IMediator mediator, DbContext ctx)
{
var domainEntities = ctx.ChangeTracker
.Entries<Entity>()
.Where(x => x.Entity.DomainEvents != null && x.Entity.DomainEvents.Any());
var domainEvents = domainEntities
.SelectMany(x => x.Entity.DomainEvents)
.ToList();
domainEntities.ToList()
.ForEach(entity => entity.Entity.ClearDomainEvents());
var tasks = domainEvents
.Select(async (domainEvent) =>
{
await mediator.Publish(domainEvent);
});
await Task.WhenAll(tasks);
}
}
}
using MediatR;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Domain.EntityframeworkCore.MediatR
{
public class NoMediator : IMediator
{
public Task Publish(object notification, CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
public Task Publish<TNotification>(TNotification notification, CancellationToken cancellationToken = default) where TNotification : INotification
{
return Task.CompletedTask;
}
public Task<TResponse> Send<TResponse>(IRequest<TResponse> request, CancellationToken cancellationToken = default)
{
return Task.FromResult(default(TResponse));
}
public Task<object> Send(object request, CancellationToken cancellationToken = default)
{
return Task.FromResult(default(object));
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MediatR" Version="8.0.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="3.1.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Application\Pole.Application.csproj" />
<ProjectReference Include="..\Pole.Domain\Pole.Domain.csproj" />
</ItemGroup>
</Project>
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.DependencyInjection;
using Pole.Application;
using Pole.Domain.EntityframeworkCore;
using Pole.Domain.EntityframeworkCore.UnitOfWork;
using Pole.Domain.UnitOfWork;
using System;
using System.Collections.Generic;
using System.Text;
namespace Microsoft.Extensions.DependencyInjection
{
public static class PoleApplicationOptionsExtension
{
public static PoleOptions AddPoleEntityFrameworkCoreDomain(this PoleOptions options)
{
options.Services.AddScoped<IWorker, EntityFrameworkCoreTransactionWorker>();
return options;
}
}
}
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Storage;
using Microsoft.Extensions.DependencyInjection;
using Pole.Domain.UnitOfWork;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Domain.EntityframeworkCore.UnitOfWork
{
public class EntityFrameworkCoreTransactionWorker : IWorker
{
private readonly IDbContextTransaction _dbContextTransaction;
public EntityFrameworkCoreTransactionWorker(DbContextOptions dbContextOptions, IServiceProvider serviceProvider)
{
var dbContext = serviceProvider.GetRequiredService(dbContextOptions.ContextType) as DbContext;
_dbContextTransaction = dbContext.Database.BeginTransaction();
}
public int Order => 100;
public WorkerStatus WorkerStatus { get; set; }
public async Task Commit(CancellationToken cancellationToken = default)
{
await _dbContextTransaction.CommitAsync(cancellationToken);
WorkerStatus = WorkerStatus.Commited;
}
public void Dispose()
{
// 无需手动dispose
//_dbContextTransaction?.Dispose();
}
public Task PreCommit(CancellationToken cancellationToken = default)
{
WorkerStatus = WorkerStatus.PostCommited;
return Task.FromResult(1);
}
public async Task Rollback(CancellationToken cancellationToken = default)
{
await _dbContextTransaction.RollbackAsync();
WorkerStatus = WorkerStatus.Rollbacked;
}
}
}
using MediatR;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Domain
{
public abstract class Entity
{
string _id;
public virtual string Id
{
get
{
return _id;
}
set
{
_id = value;
}
}
public List<IDomainEvent> DomainEvents { get; private set; }
public bool IsTransient()
{
return string.IsNullOrEmpty(this._id);
}
public override bool Equals(object obj)
{
if (obj == null || !(obj is Entity))
return false;
if (Object.ReferenceEquals(this, obj))
return true;
if (this.GetType() != obj.GetType())
return false;
Entity item = (Entity)obj;
if (item.IsTransient() || this.IsTransient())
return false;
else
return item.Id == this.Id;
}
public override int GetHashCode()
{
return this.Id.GetHashCode();
}
public static bool operator ==(Entity left, Entity right)
{
if (Object.Equals(left, null))
return (Object.Equals(right, null)) ? true : false;
else
return left.Equals(right);
}
public static bool operator !=(Entity left, Entity right)
{
return !(left == right);
}
public void AddDomainEvent(IDomainEvent eventItem)
{
DomainEvents = DomainEvents ?? new List<IDomainEvent>();
DomainEvents.Add(eventItem);
}
public void RemoveDomainEvent(IDomainEvent eventItem)
{
if (DomainEvents is null) return;
DomainEvents.Remove(eventItem);
}
public void ClearDomainEvents()
{
DomainEvents?.Clear();
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
namespace Pole.Domain
{
public abstract class Enumeration : IComparable
{
public string Name { get; private set; }
public int Id { get; private set; }
protected Enumeration()
{
}
protected Enumeration(int id, string name)
{
Id = id;
Name = name;
}
public override string ToString()
{
return Name;
}
public static IEnumerable<T> GetAll<T>() where T : Enumeration
{
var fields = typeof(T).GetFields(BindingFlags.Public | BindingFlags.Static | BindingFlags.DeclaredOnly);
return fields.Select(f => f.GetValue(null)).Cast<T>();
}
public override bool Equals(object obj)
{
var otherValue = obj as Enumeration;
if (otherValue == null)
{
return false;
}
var typeMatches = GetType().Equals(obj.GetType());
var valueMatches = Id.Equals(otherValue.Id);
return typeMatches && valueMatches;
}
public override int GetHashCode()
{
return Id.GetHashCode();
}
public static int AbsoluteDifference(Enumeration firstValue, Enumeration secondValue)
{
var absoluteDifference = Math.Abs(firstValue.Id - secondValue.Id);
return absoluteDifference;
}
public static T FromValue<T>(int value) where T : Enumeration
{
var matchingItem = Parse<T, int>(value, "value", item => item.Id == value);
return matchingItem;
}
public static T FromDisplayName<T>(string displayName) where T : Enumeration
{
var matchingItem = Parse<T, string>(displayName, "display name", item => item.Name == displayName);
return matchingItem;
}
private static T Parse<T, K>(K value, string description, Func<T, bool> predicate) where T : Enumeration
{
var matchingItem = GetAll<T>().FirstOrDefault(predicate);
if (matchingItem == null)
{
var message = string.Format("'{0}' is not a valid {1} in {2}", value, description, typeof(T));
throw new InvalidOperationException(message);
}
return matchingItem;
}
public int CompareTo(object other)
{
return Id.CompareTo(((Enumeration)other).Id);
}
}
}
using MediatR;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Domain
{
public interface IAggregateRoot { }
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Domain
{
public interface ISoftDeleteable
{
bool IsDelete { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Pole.Domain
{
public abstract class ValueObject
{
protected static bool EqualOperator(ValueObject left, ValueObject right)
{
if (left is null ^ right is null)
{
return false;
}
return left is null || left.Equals(right);
}
protected static bool NotEqualOperator(ValueObject left, ValueObject right)
{
return !(EqualOperator(left, right));
}
protected abstract IEnumerable<object> GetAtomicValues();
public override bool Equals(object obj)
{
if (obj == null || obj.GetType() != GetType())
{
return false;
}
ValueObject other = (ValueObject)obj;
IEnumerator<object> thisValues = GetAtomicValues().GetEnumerator();
IEnumerator<object> otherValues = other.GetAtomicValues().GetEnumerator();
while (thisValues.MoveNext() && otherValues.MoveNext())
{
if (thisValues.Current is null ^ otherValues.Current is null)
{
return false;
}
if (thisValues.Current != null && !thisValues.Current.Equals(otherValues.Current))
{
return false;
}
}
return !thisValues.MoveNext() && !otherValues.MoveNext();
}
public override int GetHashCode()
{
return GetAtomicValues()
.Select(x => x != null ? x.GetHashCode() : 0)
.Aggregate((x, y) => x ^ y);
}
public ValueObject GetCopy()
{
return this.MemberwiseClone() as ValueObject;
}
}
}
using MediatR;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Domain
{
public interface IDomainEvent : INotification
{
}
}
using MediatR;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Domain
{
public interface IDomainEventHandler<TCommand> : INotificationHandler<TCommand> where TCommand : IDomainEvent
{
}
}
using Pole.Core.DependencyInjection;
using Pole.Domain.UnitOfWork;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Domain
{
public interface IRepository<T> : IRepository where T : IAggregateRoot
{
void Update(T entity);
void Delete(T entity);
void Add(T entity);
Task<T> Get(string id);
Task<bool> SaveEntitiesAsync(CancellationToken cancellationToken = default);
}
public interface IRepository: IScopedDenpendency
{
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MediatR" Version="8.0.0" />
<PackageReference Include="MediatR.Extensions.Microsoft.DependencyInjection" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Core\Pole.Core.csproj" />
</ItemGroup>
</Project>
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Domain.UnitOfWork
{
public class DefaultUnitOfWork : IUnitOfWork
{
private readonly List<IWorker> _workers;
public DefaultUnitOfWork(IServiceProvider serviceProvider)
{
_workers = serviceProvider.GetServices<IWorker>().ToList();
}
public async Task CompeleteAsync(CancellationToken cancellationToken = default)
{
var preCommitTasks = _workers.OrderBy(worker => worker.Order).Select(async worker =>
{
await worker.PreCommit();
});
await Task.WhenAll(preCommitTasks);
try
{
var commitTasks = _workers.OrderBy(worker => worker.Order).Select(async worker =>
{
await worker.Commit();
});
await Task.WhenAll(commitTasks);
}
catch (Exception ex)
{
var rollbackTasks = _workers.OrderBy(worker => worker.Order).Where(worker => worker.WorkerStatus == WorkerStatus.Commited).Select(async worker =>
{
await worker.Rollback();
});
await Task.WhenAll(rollbackTasks);
throw ex;
}
}
public void Dispose()
{
// Workers 都是 scoped 的 每次请求结束后 会自动 dispose 所以这里不需要 调用 Workers 的 dispose
//_workers.OrderBy(worker => worker.Order).ToList().ForEach(m => m.Dispose());
}
public Task Rollback(CancellationToken cancellationToken = default)
{
_workers.OrderBy(worker => worker.Order).ToList().ForEach(m => m.Rollback());
return Task.FromResult(1);
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Domain.UnitOfWork
{
public interface IUnitOfWork : IDisposable
{
Task CompeleteAsync(CancellationToken cancellationToken = default);
Task Rollback(CancellationToken cancellationToken = default);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Domain.UnitOfWork
{
public interface IWorker : IDisposable
{
int Order { get; }
WorkerStatus WorkerStatus { get; }
Task PreCommit(CancellationToken cancellationToken = default);
Task Commit(CancellationToken cancellationToken = default);
Task Rollback(CancellationToken cancellationToken = default);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Domain.UnitOfWork
{
public class UnitOfWorkResult
{
public static UnitOfWorkResult SuccessResult = new UnitOfWorkResult(1, "保存成功");
public static UnitOfWorkResult FaildResult = new UnitOfWorkResult(1, "保存失败");
public UnitOfWorkResult(int status, string message)
{
Status = status;
Message = message;
}
/// <summary>
/// 1 Success 2 Faild ...
/// </summary>
public int Status { get;private set; }
public string Message { get;private set; }
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Domain.UnitOfWork
{
public enum WorkerStatus
{
Init = 0,
PreCommited = 1,
Commited = 2,
PostCommited = 3,
Rollbacked = 4
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Masstransit.Abstraction
{
public interface IReliableEvent
{
}
}
using Pole.ReliableMessage.EventBus;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Masstransit.Abstraction
{
public interface IReliableEventHandlerRegistrarFactory
{
MasstransitEventHandlerRegistrar Create(Type eventHandlerType);
}
}
using Pole.ReliableMessage.Abstraction;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Masstransit
{
public class DefaultReliableEventHandlerContext<TEvent> : IReliableEventHandlerContext<TEvent>
where TEvent : class
{
private readonly ConsumeContext<TEvent> _executeContext;
public DefaultReliableEventHandlerContext(ConsumeContext<TEvent> executeContext)
{
_executeContext = executeContext;
this.Event = executeContext.Message;
}
public TEvent Event { get; private set; }
}
}
using Pole.ReliableMessage.EventBus;
using Pole.ReliableMessage.Masstransit.Abstraction;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Pole.Domain;
namespace Pole.ReliableMessage.Masstransit
{
class DefaultReliableEventHandlerRegistrarFactory : IReliableEventHandlerRegistrarFactory
{
private readonly MasstransitRabbitmqOption _masstransitOptions;
public DefaultReliableEventHandlerRegistrarFactory(IOptions<MasstransitRabbitmqOption> masstransitOptions)
{
_masstransitOptions = masstransitOptions.Value ?? throw new ArgumentNullException(nameof(masstransitOptions));
}
public MasstransitEventHandlerRegistrar Create(Type eventHnadler)
{
if (!eventHnadler.Name.EndsWith(_masstransitOptions.EventHandlerNameSuffix))
{
throw new Exception($"EventHandler Name Must EndWith {_masstransitOptions.EventHandlerNameSuffix}");
}
var reliableEventHandlerParemeterAttribute = eventHnadler.GetCustomAttributes(typeof(ReliableEventHandlerParemeterAttribute), true).FirstOrDefault();
var eventHandlerName = GetQueueName(reliableEventHandlerParemeterAttribute, eventHnadler, _masstransitOptions.QueueNamePrefix, _masstransitOptions.EventHandlerNameSuffix);
var parentEventHandler = eventHnadler.BaseType;
var eventType = parentEventHandler.GetGenericArguments().ToList().FirstOrDefault();
ushort prefetchCount = GetPrefetchCount(eventHnadler, reliableEventHandlerParemeterAttribute);
MasstransitEventHandlerRegistrar eventHandlerRegisterInvoker = new MasstransitEventHandlerRegistrar(eventHandlerName, eventHnadler, eventType, _masstransitOptions.RetryConfigure, prefetchCount);
return eventHandlerRegisterInvoker;
}
private string GetQueueName(object reliableEventHandlerParemeterAttribute, Type eventHnadler, string queueNamePrefix,string eventHandlerNameSuffix)
{
var eventHandlerDefaultName = $"eventHandler-{ eventHnadler.Name.Replace(eventHandlerNameSuffix, "").ToLowerInvariant()}";
var eventHandlerName = string.IsNullOrEmpty(queueNamePrefix) ? eventHandlerDefaultName : $"{queueNamePrefix}-{eventHandlerDefaultName}";
if (reliableEventHandlerParemeterAttribute != null)
{
var reliableEventHandlerParemeterAttributeType = reliableEventHandlerParemeterAttribute.GetType();
var prefetchCountPropertyInfo = reliableEventHandlerParemeterAttributeType.GetProperty(nameof(ReliableEventHandlerParemeterAttribute.QueueHaType));
var queueHaTypeValue = Convert.ToInt32(prefetchCountPropertyInfo.GetValue(reliableEventHandlerParemeterAttribute));
if (queueHaTypeValue != 0)
{
var currentQueueType = Enumeration.FromValue<QueueHaType>(queueHaTypeValue);
eventHandlerName = currentQueueType.GenerateQueueName(eventHandlerName);
}
}
return eventHandlerName;
}
private ushort GetPrefetchCount(Type eventHnadler, object reliableEventHandlerParemeterAttribute)
{
var prefetchCount = _masstransitOptions.PrefetchCount;
if (reliableEventHandlerParemeterAttribute != null)
{
var reliableEventHandlerParemeterAttributeType = reliableEventHandlerParemeterAttribute.GetType();
var prefetchCountPropertyInfo = reliableEventHandlerParemeterAttributeType.GetProperty(nameof(ReliableEventHandlerParemeterAttribute.PrefetchCount));
var prefetchCountValue = Convert.ToUInt16(prefetchCountPropertyInfo.GetValue(reliableEventHandlerParemeterAttribute));
if (prefetchCountValue != 0)
{
prefetchCount = prefetchCountValue;
}
}
return prefetchCount;
}
}
}
using MassTransit;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Masstransit
{
public class MassTransitHostedService : IHostedService
{
readonly IBusControl _bus;
readonly ILogger _logger;
public MassTransitHostedService(IBusControl bus, ILogger<MassTransitHostedService> logger)
{
_bus = bus;
_logger = logger;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _bus.StartAsync();
_logger.LogInformation("MassTransit Bus Start Successful");
}
public async Task StopAsync(CancellationToken cancellationToken)
{
await _bus.StopAsync();
_logger.LogInformation("MassTransit Bus Stop Successful");
}
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Masstransit.Pipe;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Masstransit
{
class MasstransitBasedMessageBus : IMessageBus
{
public MasstransitBasedMessageBus(MassTransit.IBus bus)
{
_bus = bus;
}
private readonly MassTransit.IBus _bus;
public Task Publish(object @event, string reliableMessageId, CancellationToken cancellationToken = default)
{
var pipe = new AddReliableMessageIdPipe(reliableMessageId);
return _bus.Publish(@event, pipe, cancellationToken);
}
}
}
using GreenPipes;
using GreenPipes.Configurators;
using MassTransit;
using MassTransit.ExtensionsDependencyInjectionIntegration;
using MassTransit.RabbitMqTransport;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.EventBus
{
public class MasstransitEventHandlerRegistrar
{
private readonly string _queueName;
private readonly Type _eventHandlerType;
private readonly Type _eventHandlerEventType;
private readonly Action<IRetryConfigurator> _retryConfigure;
public readonly ushort _prefetchCount;
public MasstransitEventHandlerRegistrar(string eventHandlerName, Type eventHandlerType, Type eventHandlerEventType, Action<IRetryConfigurator> retryConfigure, ushort prefetchCount)
{
_queueName = eventHandlerName;
_eventHandlerType = eventHandlerType;
_eventHandlerEventType = eventHandlerEventType;
_retryConfigure = retryConfigure;
_prefetchCount = prefetchCount;
}
public void RegisterEventHandler(IServiceCollectionConfigurator serviceCollectionConfigurator, IServiceCollection services)
{
serviceCollectionConfigurator.AddConsumer(_eventHandlerType);
}
public void RegisterQueue(IServiceCollectionConfigurator serviceCollectionConfigurator, IRabbitMqBusFactoryConfigurator rabbitMqBusFactoryConfigurator, IRabbitMqHost rabbitMqHost, IServiceProvider serviceProvider)
{
//serviceCollectionConfigurator.AddConsumer(_eventHandlerType);
rabbitMqBusFactoryConfigurator.ReceiveEndpoint(_queueName, conf =>
{
//conf.Consumer()
conf.ConfigureConsumer(serviceProvider, _eventHandlerType);
conf.PrefetchCount = _prefetchCount;
conf.UseMessageRetry(_retryConfigure);
});
}
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.EventBus;
using Pole.ReliableMessage.Masstransit.Abstraction;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Masstransit
{
class MasstransitMessageBusConfigurator : IMessageBusConfigurator
{
private readonly IReliableEventHandlerRegistrarFactory _reliableEventHandlerRegistrarFactory;
private readonly MasstransitRabbitmqOption _options;
public MasstransitMessageBusConfigurator(IReliableEventHandlerRegistrarFactory reliableEventHandlerRegistrarFactory, IOptions<MasstransitRabbitmqOption> options)
{
_reliableEventHandlerRegistrarFactory = reliableEventHandlerRegistrarFactory;
_options = options.Value;
}
public async Task Configure(IServiceCollection services,IEnumerable<Type> eventHandlerTypes)
{
await Task.CompletedTask;
var eventHandlerRegistrars = GetEventHandlerRegistrars(eventHandlerTypes).ToList();
services.AddMassTransit(x =>
{
foreach (var eventHandlerRegistrar in eventHandlerRegistrars)
{
eventHandlerRegistrar.RegisterEventHandler(x, services);
}
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(_options.RabbitMqHostAddress), h =>
{
h.Username(_options.RabbitMqHostUserName);
h.Password(_options.RabbitMqHostPassword);
});
foreach (var eventHandlerRegistrar in eventHandlerRegistrars)
{
eventHandlerRegistrar.RegisterQueue(x, cfg, host, provider);
}
}));
});
}
private IEnumerable<MasstransitEventHandlerRegistrar> GetEventHandlerRegistrars(IEnumerable<Type> eventHandlerTypes)
{
foreach (var eventHandler in eventHandlerTypes)
{
var model = _reliableEventHandlerRegistrarFactory.Create(eventHandler);
yield return model;
}
}
}
}
using GreenPipes;
using GreenPipes.Configurators;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Masstransit
{
public class MasstransitRabbitmqOption
{
public string RabbitMqHostAddress { get; set; }
public string RabbitMqHostUserName { get; set; }
public string RabbitMqHostPassword { get; set; }
public string QueueNamePrefix { get; set; } = string.Empty;
public string EventHandlerNameSuffix = "EventHandler";
/// <summary>
/// 2 个并发
/// </summary>
public ushort PrefetchCount { get; set; } = 2;
public Action<IRetryConfigurator> RetryConfigure { get; set; } =
r => r.Intervals(TimeSpan.FromSeconds(0.1)
, TimeSpan.FromSeconds(1)
, TimeSpan.FromSeconds(4)
, TimeSpan.FromSeconds(16)
, TimeSpan.FromSeconds(64)
);
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Masstransit.Pipe;
using Pole.ReliableMessage.Messaging;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.ReliableMessage.Masstransit
{
public abstract class ReliableEventHandler<TEvent> : IReliableEventHandler<TEvent>, IConsumer<TEvent>
where TEvent : class
{
private readonly ILogger<ReliableEventHandler<TEvent>> _logger;
public ReliableEventHandler(IServiceProvider serviceProvider)
{
var loggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();
_logger = loggerFactory.CreateLogger<ReliableEventHandler<TEvent>>();
}
public abstract Task Handle(IReliableEventHandlerContext<TEvent> context);
public async Task Consume(ConsumeContext<TEvent> context)
{
var messageId = GetReliableMessageId(context);
_logger.LogDebug($"Message Begin Handle,messageId:{messageId}");
await Handle(new DefaultReliableEventHandlerContext<TEvent>(context));
_logger.LogDebug($"Message handled successfully ,messageId:{messageId}");
}
private string GetReliableMessageId(ConsumeContext<TEvent> context)
{
return context.Headers.Get(AddReliableMessageIdPipe.RELIABLE_MESSAGE_ID, string.Empty);
}
}
}
using Pole.ReliableMessage.Abstraction;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Masstransit.Messaging
{
class DefaultMessageIdGenerator : IMessageIdGenerator
{
public string Generate()
{
return NewId.Next().ToString("N").ToLowerInvariant();
}
}
}
using GreenPipes;
using MassTransit;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Masstransit.Pipe
{
class AddReliableMessageIdPipe : IPipe<PublishContext>
{
public const string RELIABLE_MESSAGE_ID = "ReliableMessageId";
private readonly string _reliableMessageId;
public AddReliableMessageIdPipe(string reliableMessageId)
{
_reliableMessageId = reliableMessageId;
}
public void Probe(ProbeContext context)
{
}
public async Task Send(PublishContext context)
{
context.Headers.Set(RELIABLE_MESSAGE_ID, _reliableMessageId);
await Task.CompletedTask;
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MassTransit" Version="6.0.1" />
<PackageReference Include="MassTransit.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="MassTransit.RabbitMQ" Version="6.0.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.ReliableMessage\Pole.ReliableMessage.csproj" />
</ItemGroup>
</Project>
using Pole.Domain;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Masstransit
{
public class QueueHaType : Enumeration
{
public static QueueHaType None = new QueueHaType(1, "无",string.Empty);
public static QueueHaType Default = new QueueHaType(2, "默认高可用","Rmd.");
public static QueueHaType Backlog = new QueueHaType(3, "消息可积压","Rmb.");
private readonly string _queuePrefix;
public QueueHaType(int id, string name) : base(id, name)
{
}
public QueueHaType(int id, string name,string prefix) : this(id, name)
{
_queuePrefix = prefix;
}
public string GenerateQueueName(string rawQueueName)
{
return string.Concat(_queuePrefix, rawQueueName);
}
}
public enum QueueHaTypeEnum:int
{
None = 1,
Default = 2,
Backlog = 3
}
}
using GreenPipes.Configurators;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Masstransit
{
[AttributeUsage(AttributeTargets.Class)]
public class ReliableEventHandlerParemeterAttribute : Attribute
{
public ushort PrefetchCount { get; set; }
public QueueHaTypeEnum QueueHaType { get; set; } = QueueHaTypeEnum.Default;
}
}
using Pole.ReliableMessage;
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Masstransit;
using Pole.ReliableMessage.Masstransit.Abstraction;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
using Pole.ReliableMessage.Masstransit.Messaging;
namespace Microsoft.Extensions.DependencyInjection
{
public static class ReliableMessageOptionExtension
{
public static ReliableMessageOption AddMasstransitRabbitmq(this ReliableMessageOption option, Action<MasstransitRabbitmqOption> optionConfig)
{
option.ReliableMessageOptionExtensions.Add(new MasstransitRabbitmqExtension(optionConfig));
return option;
}
}
public class MasstransitRabbitmqExtension : IReliableMessageOptionExtension
{
private readonly Action<MasstransitRabbitmqOption> _masstransitRabbitmqOption;
public MasstransitRabbitmqExtension(Action<MasstransitRabbitmqOption> masstransitRabbitmqOption)
{
_masstransitRabbitmqOption = masstransitRabbitmqOption;
}
public void AddServices(IServiceCollection services)
{
services.Configure(_masstransitRabbitmqOption);
services.AddSingleton<IMessageBus, MasstransitBasedMessageBus>();
services.AddSingleton<IMessageBusConfigurator, MasstransitMessageBusConfigurator>();
services.AddSingleton<IReliableEventHandlerRegistrarFactory, DefaultReliableEventHandlerRegistrarFactory>();
services.AddSingleton<IMessageIdGenerator, DefaultMessageIdGenerator>();
services.AddHostedService<MassTransitHostedService>();
}
}
}
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Storage.Abstraction
{
public interface IMemberShipTableManager
{
Task<bool> IsPendingMessageCheckerServiceInstance(string ipAddress);
Task<bool> UpdateIAmAlive(string ipAddress, DateTime dateTime);
/// <summary>
/// 如果当前 超时时间内 没有可用 实例 返回 空
/// </summary>
/// <param name="iamAliveTimeout"></param>
/// <returns></returns>
Task<string> GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime);
Task<bool> AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime);
}
}
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Storage.Abstraction
{
public interface IMessageStorage
{
/// <summary>
///
/// </summary>
/// <param name="message"></param>
/// <returns></returns>
Task<bool> Add(Message message);
Task<long> Delete(Expression<Func<Message, bool>> filter);
/// <summary>
///
/// </summary>
/// <param name="messageStatus"></param>
/// <param name="endRetryTime"></param>
/// <returns></returns>
Task<List<Message>> GetMany(Expression<Func<Message, bool>> filter, int count);
/// <summary>
///
/// </summary>
/// <param name="messageStatus"></param>
/// <param name="endRetryTime"></param>
/// <returns></returns>
Task<Message> GetOne(Expression<Func<Message, bool>> filter);
/// <summary>
/// 批量更新
/// 更新这几个值 MessageStatusId , RetryTimes LastRetryUTCTime, NextRetryUTCTime
/// </summary>
/// <param name="messages"></param>
/// <returns></returns>
Task<bool> Save(IEnumerable<Message> messages);
/// <summary>
/// 检查 消息的状态,如果不是指定状态则返回true,并且更新状态到指定状态 ,如果已经是指定状态返回false
/// </summary>
/// <param name="id"> </param>
/// <param name="messageStatus"></param>
/// <returns></returns>
Task<bool> CheckAndUpdateStatus(Expression<Func<Message, bool>> filter, MessageStatus messageStatus);
Task<bool> UpdateStatus(Expression<Func<Message, bool>> filter, MessageStatus messageStatus);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Storage.Abstraction
{
public class MemberShipTable
{
public string Id { get;private set; }
public MemberShipTable(string serviceName,string pendingMessageCheckerIp,DateTime iAmAliveUTCTime)
{
ServiceName = serviceName;
PendingMessageCheckerIp = pendingMessageCheckerIp;
IAmAliveUTCTime = iAmAliveUTCTime;
}
public string ServiceName { get;private set; }
public string PendingMessageCheckerIp { get; private set; }
public DateTime IAmAliveUTCTime { get; private set; }
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Storage.Abstraction
{
public class Message : IComparable<Message>
{
/// <summary>
/// 这里id 永远为 string
/// </summary>
public string Id { get; set; }
/// <summary>
/// 消息状态
/// </summary>
public MessageStatus MessageStatus { get; set; }
/// <summary>
/// 消息状态Id
/// </summary>
public int MessageStatusId { get; set; }
/// <summary>
/// 预发送的时间
/// </summary>
public DateTime AddedUTCTime { get; set; }
/// <summary>
/// 用来存放 消息内容 目前没有大小限制 这个需要根据 实际情况 , mongodb 和 rabiitmq 的 综合指标来定 ,开发人员 在使用超大对象时需谨慎
/// </summary>
public string Content { get; set; }
/// <summary>
/// 消息的名称 用来鉴别不同的消息
/// </summary>
public string MessageTypeId { get; set; }
/// <summary>
/// 当前消息 回调者所需参数值
/// </summary>
public string RePushCallBackParameterValue { get; set; }
///// <summary>
///// 最后一次的重试时间
///// </summary>
//public DateTime LastRetryUTCTime { get; set; }
/// <summary>
/// 下一次的重试时间
/// </summary>
public DateTime NextRetryUTCTime { get; set; }
/// <summary>
/// 重试次数
/// </summary>
public int RetryTimes { get; set; } = 0;
public int CompareTo(Message other)
{
return Id.CompareTo(other.Id);
}
}
public class MessageIEqualityComparer : IEqualityComparer<Message>
{
public static MessageIEqualityComparer Default = new MessageIEqualityComparer();
public bool Equals(Message x, Message y)
{
return x.CompareTo(y) == 0;
}
public int GetHashCode(Message obj)
{
return obj.Id.GetHashCode();
}
}
}
using Pole.Domain;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Storage.Abstraction
{
public class MessageStatus : Enumeration
{
public static MessageStatus Pending = new MessageStatus(3,"待发送");
public static MessageStatus Pushed = new MessageStatus(6,"已发送");
public static MessageStatus Canced = new MessageStatus(9,"已取消");
//public static MessageStatus Handed = new MessageStatus(12, "已处理");
public MessageStatus(int id,string name ):base(id,name)
{
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.Domain\Pole.Domain.csproj" />
</ItemGroup>
</Project>
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.ReliableMessage.Storage.Abstraction\Pole.ReliableMessage.Storage.Abstraction.csproj" />
</ItemGroup>
</Project>
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Storage.Mongodb
{
public sealed class MongoHost
{
/// <summary>
/// 主机或者IP地址
/// </summary>
public string Host { get; set; }
/// <summary>
/// 端口号
/// </summary>
public int Port { get; set; }
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.ReliableMessage.Storage.Mongodb
{
class MongodbMemberShipTableManager : IMemberShipTableManager
{
private readonly MongoClient _mongoClient;
private readonly MongodbOption _mongodbOption;
private readonly ILogger _logger;
public MongodbMemberShipTableManager(IConfiguration configuration, MongoClient mongoClient, IOptions<MongodbOption> mongodbOption, ILogger<MongodbMemberShipTableManager> logger)
{
_mongoClient = mongoClient;
_mongodbOption = mongodbOption.Value;
_logger = logger;
}
private IMongoDatabase GetActiveMessageDatabase(string activeMessageDatabase)
{
return _mongoClient.GetDatabase(activeMessageDatabase);
}
private IMongoCollection<MemberShipTable> GetCollection()
{
var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName);
var messageCollectionName = _mongodbOption.MembershipCollectionName;
var collection = database.GetCollection<MemberShipTable>(messageCollectionName);
return collection;
}
public async Task<bool> AddCheckerServiceInstanceAndDeleteOthers(string ipAddress, DateTime aliveUTCTime)
{
var collection = GetCollection();
var deleteResult = await collection.DeleteManyAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName);
MemberShipTable memberShipTable = new MemberShipTable(_mongodbOption.ServiceCollectionName, ipAddress, aliveUTCTime);
await collection.InsertOneAsync(memberShipTable);
return true;
}
public async Task<string> GetPendingMessageCheckerServiceInstanceIp(DateTime iamAliveEndTime)
{
var collection = GetCollection();
var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.IAmAliveUTCTime >= iamAliveEndTime)).ToList();
if (instances.Count > 1)
{
_logger.LogInformation($"Current time have {instances.Count} PendingMessageChecker in {_mongodbOption.ServiceCollectionName} service , I will delete the extra instances");
var currentInstance = instances.FirstOrDefault();
var extraInstances = instances.Remove(currentInstance);
instances.ForEach(async n =>
{
await collection.DeleteOneAsync(m => m.Id == n.Id);
});
_logger.LogInformation($"Extra PendingMessageChecker instances in {_mongodbOption.ServiceCollectionName} service deleted successfully");
return currentInstance.PendingMessageCheckerIp;
}
else if (instances.Count == 1)
{
return instances.FirstOrDefault().PendingMessageCheckerIp;
}
else
{
return null;
}
}
public async Task<bool> IsPendingMessageCheckerServiceInstance(string ipAddress)
{
var collection = GetCollection();
var instances = (await collection.FindAsync(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp== ipAddress)).FirstOrDefault();
if (instances != null)
{
return true;
}
return false;
}
public async Task<bool> UpdateIAmAlive(string ipAddress,DateTime dateTime)
{
var collection = GetCollection();
var filter = Builders<MemberShipTable>.Filter.Where(m => m.ServiceName == _mongodbOption.ServiceCollectionName && m.PendingMessageCheckerIp == ipAddress);
var update = Builders<MemberShipTable>.Update.Set(m=>m.IAmAliveUTCTime,dateTime);
var result = await collection.UpdateOneAsync(filter, update);
return true;
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Storage.Mongodb
{
public class MongodbOption
{
public string MessageDatabaseName { get; set; } = "ReliableMessage";
public string MembershipCollectionName { get; set; } = "Membership";
public string ServiceCollectionName { get; set; }
public MongoHost[] Servers { get; set; }
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
using Pole.Domain;
namespace Pole.ReliableMessage.Storage.Mongodb
{
class MongodbMessageStorage : IMessageStorage
{
private readonly MongoClient _mongoClient;
private readonly MongodbOption _mongodbOption;
private readonly ILogger<MongodbMessageStorage> _logger;
public MongodbMessageStorage(MongoClient mongoClient, IOptions<MongodbOption> mongodbOption, ILogger<MongodbMessageStorage> logger)
{
_mongoClient = mongoClient;
_mongodbOption = mongodbOption.Value;
_logger = logger;
}
private IMongoDatabase GetActiveMessageDatabase(string messageDatabase)
{
return _mongoClient.GetDatabase(messageDatabase);
}
private IMongoCollection<Message> GetCollection()
{
var database = GetActiveMessageDatabase(_mongodbOption.MessageDatabaseName);
var messageCollectionName = _mongodbOption.ServiceCollectionName;
var collection = database.GetCollection<Message>(messageCollectionName);
return collection;
}
public async Task<bool> Add(Message message)
{
IMongoCollection<Message> collection = GetCollection();
await collection.InsertOneAsync(message);
return true;
}
public async Task<bool> CheckAndUpdateStatus(Expression<Func<Message, bool>> filter, MessageStatus messageStatus)
{
IMongoCollection<Message> collection = GetCollection();
var update = Builders<Message>.Update.Set(m => m.MessageStatusId, messageStatus.Id);
var beforeDoc = await collection.FindOneAndUpdateAsync(filter, update, new FindOneAndUpdateOptions<Message, Message>() { ReturnDocument = ReturnDocument.Before });
if (beforeDoc == null)
{
throw new Exception("IMessageStorage.CheckAndUpdateStatus Error ,Message not found in Storage");
}
if (beforeDoc.MessageStatusId == messageStatus.Id)
{
return false;
}
return true;
}
public async Task<List<Message>> GetMany(Expression<Func<Message, bool>> filter, int count)
{
IMongoCollection<Message> collection = GetCollection();
var list = await collection.Find(filter).Limit(count).ToListAsync();
list.ForEach(m =>
{
m.MessageStatus = Enumeration.FromValue<MessageStatus>(m.MessageStatusId);
});
return list;
}
public async Task<bool> Save(IEnumerable<Message> messages)
{
var count = messages.Count();
_logger.LogDebug($"MongodbMessageStorage Save begin, Messages count: {messages.Count()}");
if (count == 0)
{
_logger.LogDebug($"MongodbMessageStorage Save successfully, saved count: 0");
return true;
}
IMongoCollection<Message> collection = GetCollection();
var models = new List<WriteModel<Message>>();
foreach (var message in messages)
{
FilterDefinition<Message> filter = Builders<Message>.Filter.Where(m => m.Id == message.Id);
UpdateDefinition<Message> update = Builders<Message>.Update
.Set(m => m.MessageStatusId, message.MessageStatus.Id)
.Set(m => m.RetryTimes, message.RetryTimes)
.Set(m => m.NextRetryUTCTime, message.NextRetryUTCTime);
var model = new UpdateOneModel<Message>(filter, update);
models.Add(model);
}
var result = await collection.BulkWriteAsync(models, new BulkWriteOptions { IsOrdered = false });
_logger.LogDebug($"MongodbMessageStorage Save successfully, saved count: {result.ModifiedCount}");
return result.IsAcknowledged;
}
public async Task<bool> UpdateStatus(Expression<Func<Message, bool>> filter, MessageStatus messageStatus)
{
IMongoCollection<Message> collection = GetCollection();
var update = Builders<Message>.Update.Set(m => m.MessageStatusId, messageStatus.Id);
var result = await collection.UpdateOneAsync(filter, update);
return result.IsAcknowledged;
}
public async Task<long> Delete(Expression<Func<Message, bool>> filter)
{
IMongoCollection<Message> collection = GetCollection();
var result = await collection.DeleteManyAsync(filter);
return result.DeletedCount;
}
public Task<Message> GetOne(Expression<Func<Message, bool>> filter)
{
throw new NotImplementedException();
}
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.0" />
<PackageReference Include="MongoDB.Driver" Version="2.10.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.ReliableMessage.Storage.Abstraction\Pole.ReliableMessage.Storage.Abstraction.csproj" />
<ProjectReference Include="..\Pole.ReliableMessage\Pole.ReliableMessage.csproj" />
</ItemGroup>
</Project>
using Pole.ReliableMessage;
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Pole.ReliableMessage.Storage.Mongodb;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using MongoDB.Bson;
using MongoDB.Bson.Serialization;
using MongoDB.Bson.Serialization.IdGenerators;
using MongoDB.Driver;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Microsoft.Extensions.DependencyInjection
{
public static class ReliableMessageOptionExtension
{
public static ReliableMessageOption AddMongodb(this ReliableMessageOption option, Action<MongodbOption> mongodbOptionConfig)
{
option.ReliableMessageOptionExtensions.Add(new MongodbStorageExtension(mongodbOptionConfig));
return option;
}
}
public class MongodbStorageExtension : IReliableMessageOptionExtension
{
private readonly Action<MongodbOption> _mongodbOption;
public MongodbStorageExtension(Action<MongodbOption> masstransitRabbitmqOption)
{
_mongodbOption = masstransitRabbitmqOption;
}
public void AddServices(IServiceCollection services)
{
services.Configure(_mongodbOption);
services.AddSingleton<IMessageStorage, MongodbMessageStorage>();
services.AddSingleton<IMemberShipTableManager, MongodbMemberShipTableManager>();
var mongodbOption = services.BuildServiceProvider().GetRequiredService<IOptions<MongodbOption>>().Value;
var servers = mongodbOption.Servers.Select(x => new MongoServerAddress(x.Host, x.Port)).ToList();
var settings = new MongoClientSettings()
{
Servers = servers
};
var client = new MongoClient(settings);
var database = client.GetDatabase(mongodbOption.MessageDatabaseName);
AddMapper();
InitCollection(mongodbOption, database);
services.AddSingleton(client);
}
private static void InitCollection(MongodbOption mongodbOption, IMongoDatabase database)
{
var collectionNames = database.ListCollectionNames().ToList();
if (!collectionNames.Contains(mongodbOption.ServiceCollectionName))
{
database.CreateCollection(mongodbOption.ServiceCollectionName);
var messageCollection = database.GetCollection<Message>(mongodbOption.ServiceCollectionName);
AddMessageCollectionIndex(messageCollection);
}
if (!collectionNames.Contains(mongodbOption.MembershipCollectionName))
{
database.CreateCollection(mongodbOption.MembershipCollectionName);
var membershipCollection = database.GetCollection<MemberShipTable>(mongodbOption.MembershipCollectionName);
AddMemberShipTableCollectionIndex(membershipCollection);
}
}
private static void AddMessageCollectionIndex(IMongoCollection<Message> collection)
{
List<CreateIndexModel<Message>> createIndexModels = new List<CreateIndexModel<Message>>();
//var nextRetryUTCTimeIndex = Builders<Message>.IndexKeys.Ascending(m => m.NextRetryUTCTime);
//CreateIndexModel<Message> nextRetryUTCTimeIndexModel = new CreateIndexModel<Message>(nextRetryUTCTimeIndex, new CreateIndexOptions() { Background = true });
//createIndexModels.Add(nextRetryUTCTimeIndexModel);
var AddedUTCTimeUTCTimeIndex = Builders<Message>.IndexKeys.Ascending(m => m.AddedUTCTime);
CreateIndexModel<Message> AddedUTCTimeIndexModel = new CreateIndexModel<Message>(AddedUTCTimeUTCTimeIndex, new CreateIndexOptions() { Background = true });
createIndexModels.Add(AddedUTCTimeIndexModel);
//var messageTypeIdIndex = Builders<Message>.IndexKeys.Ascending(m => m.MessageTypeId);
//CreateIndexModel<Message> messageTypeIdIndexModel = new CreateIndexModel<Message>(messageTypeIdIndex, new CreateIndexOptions() { Background = true });
//createIndexModels.Add(messageTypeIdIndexModel);
collection.Indexes.CreateMany(createIndexModels);
}
private static void AddMemberShipTableCollectionIndex(IMongoCollection<MemberShipTable> collection)
{
List<CreateIndexModel<MemberShipTable>> createIndexMembershipModels = new List<CreateIndexModel<MemberShipTable>>();
var serviceNameIndex = Builders<MemberShipTable>.IndexKeys.Ascending(m => m.ServiceName);
CreateIndexModel<MemberShipTable> serviceNameIndexModel = new CreateIndexModel<MemberShipTable>(serviceNameIndex, new CreateIndexOptions() { Background = true, Unique = true });
createIndexMembershipModels.Add(serviceNameIndexModel);
collection.Indexes.CreateMany(createIndexMembershipModels);
}
private static void AddMapper()
{
BsonClassMap.RegisterClassMap<Message>(cm =>
{
cm.AutoMap();
cm.UnmapMember(m => m.MessageStatus);
cm.MapIdField(m => m.Id);
cm.MapMember(m => m.NextRetryUTCTime).SetIsRequired(true);
});
BsonClassMap.RegisterClassMap<MemberShipTable>(cm =>
{
cm.AutoMap();
cm.MapIdField(m => m.Id).SetIdGenerator(StringObjectIdGenerator.Instance);
});
}
}
}
using Microsoft.AspNetCore.Builder;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IApplicationBuilderConfigurator
{
void Config(IApplicationBuilder applicationBuilder);
void Add(Action<IApplicationBuilder> config);
}
}
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IComteckReliableMessageBootstrap
{
Task Initialize(IServiceCollection services, List<Assembly> eventHandlerAssemblies, List<Assembly> eventAssemblies);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IJsonConverter
{
string Serialize(object obj);
T Deserialize<T>(string json);
object Deserialize(string json,Type type);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageBus
{
Task Publish(object @event,string reliableMessageId, CancellationToken cancellationToken = default);
}
}
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageBusConfigurator
{
Task Configure(IServiceCollection services, IEnumerable<Type> eventHandlerTypes);
}
}
using Pole.ReliableMessage.Messaging.CallBack;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageCallBackInfoGenerator
{
MessageCallBackInfo Generate(Type eventHandlerType);
}
}
using Pole.ReliableMessage.Messaging.CallBack;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageCallBackInfoStore
{
Task Add(MessageCallBackInfo messageCallBackInfo);
Task<MessageCallBackInfo> Get(string messageTypeId);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageCallBackRegister
{
Task Register(IEnumerable<Type> eventHandlerTypes);
}
}
using Pole.ReliableMessage.Storage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageCheckRetryer
{
Task Execute(IEnumerable<Message> messages, DateTime dateTime);
}
}
using Pole.ReliableMessage.Messaging;
using Pole.ReliableMessage.Storage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageChecker
{
Task<MessageCheckerResult> GetResult(Message message);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageIdGenerator
{
string Generate();
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IMessageTypeIdGenerator
{
string Generate(Type messageType);
}
}
using Pole.ReliableMessage.Processor;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IProcessor
{
string Name { get; }
Task Process(ProcessingContext context);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IProcessorServer
{
Task Start(CancellationToken stoppingToken);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableBus
{
Task<string> PrePublish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) where TReliableEvent : class;
Task<string> PrePublish(object @event, Type eventType, object callbackParemeter, CancellationToken cancellationToken = default);
Task<bool> Publish(object @event, string prePublishMessageId, CancellationToken cancellationToken = default);
Task<bool> Cancel(string prePublishMessageId, CancellationToken cancellationToken = default);
}
}
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableEventCallBackFinder
{
List<Type> FindAll(IEnumerable<Assembly> assemblies);
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableEventCallback<TReliableEvent,TCallbackParemeter> : IReliableEventCallback
{
Task<bool> Callback(TCallbackParemeter callbackParemeter);
}
public interface IReliableEventCallback
{
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableEventHandler<TEvent> : IReliableEventHandler
where TEvent : class
{
Task Handle(IReliableEventHandlerContext<TEvent> context);
}
public interface IReliableEventHandler
{
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableEventHandlerContext<TEvent> where TEvent : class
{
TEvent Event { get; }
//Task Publish(object @event);
}
}
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IReliableEventHandlerFinder
{
List<Type> FindAll(IEnumerable<Assembly> assemblies);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IRetryTimeDelayCalculator
{
int Get(int retryTimes, int maxPendingMessageRetryDelay);
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface IServiceIPv4AddressProvider
{
string Get();
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Abstraction
{
public interface ITimeHelper
{
DateTime GetUTCNow();
/// <summary>
/// "UTC :{_timeHelper.GetNow().ToString("yyyy-MM-dd HH:mm:ss.fff")}"
/// </summary>
/// <returns></returns>
string GetAppropriateFormatedDateString();
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Processor;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.ReliableMessage
{
public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer
{
private readonly IServiceProvider _serviceProvider;
private Task _compositeTask;
public BackgroundServiceBasedProcessorServer(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public async Task Start(CancellationToken stoppingToken)
{
ProcessingContext processingContext = new ProcessingContext(stoppingToken);
List<LoopProcessor> loopProcessors = new List<LoopProcessor>();
var innerProcessors = _serviceProvider.GetServices<IProcessor>();
var loggerFactory = _serviceProvider.GetService<ILoggerFactory>();
var timeHelper = _serviceProvider.GetService<ITimeHelper>();
foreach (var innerProcessor in innerProcessors)
{
LoopProcessor processor = new LoopProcessor(innerProcessor, loggerFactory, timeHelper);
loopProcessors.Add(processor);
}
var tasks = loopProcessors.Select(p => p.Process(processingContext));
_compositeTask = Task.WhenAll(tasks);
await _compositeTask;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Start(stoppingToken);
}
}
}
using Pole.ReliableMessage;
using Pole.ReliableMessage.Abstraction;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace Microsoft.AspNetCore.Builder
{
public static class ComteckReliableMessageIApplicationBuilderExtensions
{
public static IApplicationBuilder UsePoleReliableMessage(this IApplicationBuilder applicationBuilder)
{
var option = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IOptions<ReliableMessageOption>)) as IOptions<ReliableMessageOption>;
var messageCallBackRegister = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IMessageCallBackRegister)) as IMessageCallBackRegister;
var reliableEventCallBackFinder = applicationBuilder.ApplicationServices.GetRequiredService(typeof(IReliableEventCallBackFinder)) as IReliableEventCallBackFinder;
var eventCallbacks = reliableEventCallBackFinder.FindAll(option.Value.EventCallbackAssemblies);
messageCallBackRegister.Register(eventCallbacks).GetAwaiter().GetResult();
return applicationBuilder;
}
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.AspNetCore.Builder;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage
{
class DefaultApplicationBuilderConfigurator : IApplicationBuilderConfigurator
{
private readonly List<Action<IApplicationBuilder>> _configs = new List<Action<IApplicationBuilder>>();
public void Add(Action<IApplicationBuilder> config)
{
_configs.Add(config);
}
public void Config(IApplicationBuilder applicationBuilder)
{
_configs.ForEach(m => {
m(applicationBuilder);
});
}
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage
{
class DefaultComteckReliableMessageBootstrap : IComteckReliableMessageBootstrap
{
private readonly IReliableEventHandlerFinder _reliableEventHandlerFinder;
private readonly IMessageBusConfigurator _messageBusConfigurator;
private readonly IMessageCallBackRegister _messageCallBackRegister;
private readonly IReliableEventCallBackFinder _reliableEventCallBackFinder;
public DefaultComteckReliableMessageBootstrap(IReliableEventHandlerFinder reliableEventHandlerFinder, IMessageBusConfigurator messageBusConfigurator, IMessageCallBackRegister messageCallBackRegister, IReliableEventCallBackFinder reliableEventCallBackFinder)
{
_reliableEventHandlerFinder = reliableEventHandlerFinder;
_messageBusConfigurator = messageBusConfigurator;
_messageCallBackRegister = messageCallBackRegister;
_reliableEventCallBackFinder = reliableEventCallBackFinder;
}
public async Task Initialize(IServiceCollection services, List<Assembly> eventHandlerAssemblies, List<Assembly> eventAssemblies)
{
var eventHandlers = _reliableEventHandlerFinder.FindAll(eventHandlerAssemblies);
await _messageBusConfigurator.Configure(services, eventHandlers);
var eventCallbacks = _reliableEventCallBackFinder.FindAll(eventAssemblies);
await _messageCallBackRegister.Register(eventCallbacks);
RegisterEventCallbacks(services, eventCallbacks);
}
private void RegisterEventCallbacks(IServiceCollection services, List<Type> eventCallbacks)
{
eventCallbacks.ForEach(m =>
{
services.AddScoped(m);
});
}
}
}
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Storage.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage
{
class DefaultMessageCheckRetryer : IMessageCheckRetryer
{
private readonly ILogger _logger;
private readonly IRetryTimeDelayCalculator _retryTimeDelayCalculator;
private readonly ReliableMessageOption _options;
private readonly IMessageStorage _storage;
private readonly IMessageChecker _messageChecker;
private readonly IMessageBus _messageBus;
private readonly List<Message> _changedMessage = new List<Message>();
public DefaultMessageCheckRetryer(ILogger<DefaultMessageCheckRetryer> logger, IRetryTimeDelayCalculator retryTimeDelayCalculator, IOptions<ReliableMessageOption> options, IMessageStorage storage, IMessageChecker messageChecker, IMessageBus messageBus)
{
_logger = logger;
_retryTimeDelayCalculator = retryTimeDelayCalculator;
_options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected");
_storage = storage;
_messageChecker = messageChecker;
_messageBus = messageBus;
}
public async Task Execute(IEnumerable<Message> messages, DateTime dateTime)
{
try
{
messages.AsParallel().ForAll(async m => await Retry(m, dateTime));
if (_changedMessage.Count != 0)
{
await _storage.Save(_changedMessage);
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"DefaultMessageCheckRetryer.Execute ,Execute with errors");
}
finally
{
if (_changedMessage.Count != 0)
{
_changedMessage.Clear();
}
}
}
private async Task Retry(Message message, DateTime retryTime)
{
try
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} begin Retry");
}
var nextRetryDelay = _retryTimeDelayCalculator.Get(message.RetryTimes, _options.MaxPendingMessageRetryDelay);
message.NextRetryUTCTime = retryTime.AddSeconds(nextRetryDelay);
if (retryTime > message.AddedUTCTime.AddSeconds(_options.PendingMessageTimeOut))
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Canced ,PendingMessageTimeOut:{_options.PendingMessageTimeOut}");
}
message.NextRetryUTCTime = DateTime.MinValue;
message.MessageStatus = MessageStatus.Canced;
_changedMessage.Add(message);
return;
}
message.RetryTimes++;
var messageCheckerResult = await _messageChecker.GetResult(message);
if (messageCheckerResult.IsFinished)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Pushed");
}
message.MessageStatus = MessageStatus.Pushed;
await _messageBus.Publish(messageCheckerResult.Event, message.Id);
}
else
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"DefaultMessageCheckRetryer.Retry ,message:{message.Id} would be Retry next time");
}
}
_changedMessage.Add(message);
}
catch (Exception ex)
{
_logger.LogError(ex, $"DefaultMessageCheckRetryer.Retry ,Message:{message.Id} retry with errors");
}
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage
{
class DefaultRetryTimeDelayCalculator : IRetryTimeDelayCalculator
{
public int Get(int retryTimes, int maxPendingMessageRetryDelay)
{
var retryTimeDelay = (int)Math.Pow(2, retryTimes + 1);
if (retryTimeDelay >= maxPendingMessageRetryDelay)
{
return maxPendingMessageRetryDelay;
}
return retryTimeDelay;
}
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ILogger = Microsoft.Extensions.Logging.ILogger;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.Pole.ReliableMessage.EventBus
{
public class DefaultReliableBus : IReliableBus
{
private readonly IMessageBus _messageBus;
private readonly IMessageStorage _messageStorage;
private readonly IMessageIdGenerator _messageIdGenerator;
private readonly ITimeHelper _timeHelper;
//private readonly IMessageBuffer _messageBuffer;
private readonly ILogger _logger;
private readonly IJsonConverter _jsonConverter;
private readonly IMessageCallBackInfoStore _messageCallBackInfoStore;
private readonly IMessageTypeIdGenerator _messageTypeIdGenerator;
public DefaultReliableBus(IMessageBus messageBus, IMessageStorage messageStorage, IMessageIdGenerator messageIdGenerator, ITimeHelper timeHelper, ILogger<DefaultReliableBus> logger, IJsonConverter jsonConverter, IMessageCallBackInfoStore messageCallBackInfoStore, IMessageTypeIdGenerator messageTypeIdGenerator)
{
_messageBus = messageBus;
_messageStorage = messageStorage;
_messageIdGenerator = messageIdGenerator;
_timeHelper = timeHelper;
_logger = logger;
_jsonConverter = jsonConverter;
_messageCallBackInfoStore = messageCallBackInfoStore;
_messageTypeIdGenerator = messageTypeIdGenerator;
}
public Task<bool> Cancel(string prePublishMessageId, CancellationToken cancellationToken = default)
{
try
{
return _messageStorage.UpdateStatus(m => m.Id == prePublishMessageId, MessageStatus.Canced);
}
catch (Exception ex)
{
var errorInfo = $"Cancel PrePublish errors in defaultReliableBus;{ex.Message}";
_logger.LogError(ex, errorInfo);
throw new Exception(errorInfo, ex);
}
}
#region PrePublish
public async Task<string> PrePublish<TReliableEvent>(TReliableEvent @event, object callbackParemeter, CancellationToken cancellationToken = default) where TReliableEvent : class
{
var messageTypeId = _messageTypeIdGenerator.Generate(typeof(TReliableEvent));
var content = _jsonConverter.Serialize(@event);
return await PrePublishCore(callbackParemeter, messageTypeId, content);
}
public async Task<string> PrePublish(object @event, Type eventType, object callbackParemeter, CancellationToken cancellationToken = default)
{
var messageTypeId = _messageTypeIdGenerator.Generate(eventType);
var content = _jsonConverter.Serialize(@event);
return await PrePublishCore(callbackParemeter, messageTypeId, content);
}
private async Task<string> PrePublishCore(object callbackParemeter, string messageTypeId, string content)
{
var currentMessageCallbackInfo = _messageCallBackInfoStore.Get(messageTypeId);
if (currentMessageCallbackInfo == null)
{
throw new Exception($"Current message type Callback not registered ,messageTypeId:{messageTypeId}");
}
try
{
var messageId = _messageIdGenerator.Generate();
_logger.LogDebug($"PrePublish message begin ,messageId:{messageId}");
var now = _timeHelper.GetUTCNow();
var callBackParem = _jsonConverter.Serialize(callbackParemeter);
Message newMessage = new Message()
{
AddedUTCTime = now,
Content = content,
Id = messageId,
MessageStatusId = MessageStatus.Pending.Id,
MessageTypeId = messageTypeId,
RePushCallBackParameterValue = callBackParem,
NextRetryUTCTime = DateTime.MinValue
};
await _messageStorage.Add(newMessage);
_logger.LogDebug($"PrePublish message successful ,messageId:{messageId}");
return messageId;
}
catch (Exception ex)
{
var errorInfo = $"PrePublish errors in DefaultReliableBus;{ex.Message}";
_logger.LogError(ex, errorInfo);
throw new Exception(errorInfo, ex);
}
}
#endregion
public async Task<bool> Publish(object @event, string prePublishMessageId, CancellationToken cancellationToken = default)
{
try
{
await _messageBus.Publish(@event, prePublishMessageId, cancellationToken);
var messageBufferResult = await _messageStorage.UpdateStatus(m => m.Id == prePublishMessageId && m.MessageStatusId == MessageStatus.Pending.Id, MessageStatus.Pushed);
return messageBufferResult;
}
catch (Exception ex)
{
var errorInfo = $"Publish errors in DefaultReliableBus;{ex.Message}";
_logger.LogError(ex, errorInfo);
throw new Exception(errorInfo, ex);
}
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
namespace Pole.ReliableMessage.EventBus
{
class DefaultReliableEventCallBackFinder : IReliableEventCallBackFinder
{
public List<Type> FindAll(IEnumerable<Assembly> assemblies)
{
var callbackType = typeof(IReliableEventCallback);
var callbackTypes = assemblies.SelectMany(m => m.GetTypes().Where(type => callbackType.IsAssignableFrom(type)));
return callbackTypes.ToList(); ;
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
namespace Pole.ReliableMessage.EventBus
{
public class DefaultReliableEventHandlerFinder : IReliableEventHandlerFinder
{
public List<Type> FindAll(IEnumerable<Assembly> assemblies)
{
var eventHandlerType = typeof(IReliableEventHandler);
var eventHandlerTypes = assemblies.SelectMany(m => m.GetTypes().Where(type => eventHandlerType.IsAssignableFrom(type)));
return eventHandlerTypes.ToList();
}
}
}
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage
{
public interface IReliableMessageOptionExtension
{
void AddServices(IServiceCollection services);
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.EventBus;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Messaging.CallBack
{
class DefaultMessageCallBackInfoGenerator : IMessageCallBackInfoGenerator
{
private readonly IMessageTypeIdGenerator _messageTypeIdGenerator;
public DefaultMessageCallBackInfoGenerator(IMessageTypeIdGenerator messageTypeIdGenerator)
{
_messageTypeIdGenerator = messageTypeIdGenerator;
}
public MessageCallBackInfo Generate(Type eventCallbackType)
{
var @interface = eventCallbackType.GetInterfaces().FirstOrDefault();
Func<object, object, Task<bool>> deleg = MakeCallBackFunc(eventCallbackType, @interface);
var eventType = @interface.GetGenericArguments()[0];
var eventCallbackArguemntType = @interface.GetGenericArguments()[1];
var enentName = _messageTypeIdGenerator.Generate(eventType);
MessageCallBackInfo messageCallBackInfo = new MessageCallBackInfo(enentName, deleg, eventCallbackType, eventCallbackArguemntType, eventType);
return messageCallBackInfo;
}
private static Func<object, object, Task<bool>> MakeCallBackFunc(Type eventType, Type @interface)
{
var callbackParemeterType = @interface.GetGenericArguments()[1];
var argument = Expression.Parameter(typeof(object));
var paremeter = Expression.Parameter(typeof(object));
// var typedParemeter = Expression.Parameter(eventType);
var typedcallbackParemeter = Expression.Convert(argument, callbackParemeterType);
var typedParemeter = Expression.Convert(paremeter, eventType);
var callBackMethod = eventType.GetMethod("Callback");
var call = Expression.Call(typedParemeter, callBackMethod, typedcallbackParemeter);
//var innerParemeter = eventType.GetInterfaces().FirstOrDefault();
var lambda = Expression.Lambda<Func<object, object, Task<bool>>>(call, true, argument, paremeter);
var deleg = lambda.Compile();
return deleg;
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Messaging.CallBack
{
class DefaultMessageCallBackRegister : IMessageCallBackRegister
{
private readonly IMessageCallBackInfoGenerator _messageCallBackInfoGenerator;
private readonly IMessageCallBackInfoStore _messageCallBackInfoStore;
public DefaultMessageCallBackRegister(IMessageCallBackInfoGenerator messageCallBackInfoGenerator, IMessageCallBackInfoStore messageCallBackInfoStore)
{
_messageCallBackInfoGenerator = messageCallBackInfoGenerator;
_messageCallBackInfoStore = messageCallBackInfoStore;
}
public async Task Register(IEnumerable<Type> eventCallbackTypes)
{
foreach(var eventCallbackType in eventCallbackTypes)
{
var messageCallBackInfo = _messageCallBackInfoGenerator.Generate(eventCallbackType);
await _messageCallBackInfoStore.Add(messageCallBackInfo);
}
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Messaging.CallBack
{
public class MessageCallBackInfo
{
private Func<object, object, Task<bool>> _callBack;
public MessageCallBackInfo(string messageTypeId, Func<object, object, Task<bool>> callBack, Type eventCallbackType, Type eventCallbackArguemntType,Type eventType)
{
MessageTypeId = messageTypeId;
_callBack = callBack;
EventCallbackType = eventCallbackType;
EventCallbackArguemntType = eventCallbackArguemntType;
EventType = eventType;
}
public string MessageTypeId { get;private set; }
public Type EventType { get; private set; }
public Type EventCallbackType { get; private set; }
public Type EventCallbackArguemntType { get; private set; }
public Task<bool> Invoke(object parameter, object reliableEvent)
{
return _callBack(parameter, reliableEvent);
}
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Messaging.CallBack
{
public class MessageCallBackInfoInMemoryStore : Dictionary<string, MessageCallBackInfo>, IMessageCallBackInfoStore
{
private readonly ILogger<MessageCallBackInfoInMemoryStore> _logger;
public MessageCallBackInfoInMemoryStore(ILogger<MessageCallBackInfoInMemoryStore> logger)
{
_logger = logger;
}
public async Task Add(MessageCallBackInfo messageCallBackInfo)
{
await Task.CompletedTask;
if (TryGetValue(messageCallBackInfo.MessageTypeId, out MessageCallBackInfo info))
{
throw new Exception($"Add MessageCallBackInfo Faild , MessageCallBackInfo Already Added ,MessageTypeId:{messageCallBackInfo.MessageTypeId}");
}
Add(messageCallBackInfo.MessageTypeId, messageCallBackInfo);
_logger.LogDebug($"Add MessageCallBackInfo Success ,MessageTypeId:{messageCallBackInfo.MessageTypeId}");
}
public async Task<MessageCallBackInfo> Get(string messageTypeId)
{
await Task.CompletedTask;
if (TryGetValue(messageTypeId, out MessageCallBackInfo info))
{
return info;
}
return null;
}
}
}
using Pole.Domain;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Messaging.CallBack
{
public class MessageCallBackResponseStatus : Enumeration
{
public static MessageCallBackResponseStatus Finised = new MessageCallBackResponseStatus(3, "已完成");
public static MessageCallBackResponseStatus Unfinished = new MessageCallBackResponseStatus(6, "未完成");
public static MessageCallBackResponseStatus Unusual = new MessageCallBackResponseStatus(9, "异常");
public MessageCallBackResponseStatus(int id, string name) : base(id, name)
{
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Messaging
{
class DefaultJsonConverter : IJsonConverter
{
public T Deserialize<T>(string json)
{
return System.Text.Json.JsonSerializer.Deserialize<T>(json);
}
public object Deserialize(string json, Type type)
{
return System.Text.Json.JsonSerializer.Deserialize(json, type);
}
public string Serialize(object obj)
{
return System.Text.Json.JsonSerializer.Serialize(obj);
}
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Pole.ReliableMessage.Messaging.CallBack;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.ReliableMessage.Messaging
{
class DefaultMessageChecker : IMessageChecker
{
private readonly IMessageCallBackInfoStore _messageCallBackInfoStore;
private readonly ILogger<DefaultMessageChecker> _logger;
private readonly IJsonConverter _jsonConverter;
private readonly IServiceProvider _serviceProvider;
public DefaultMessageChecker(IMessageCallBackInfoStore messageCallBackInfoStore, ILogger<DefaultMessageChecker> logger, IJsonConverter jsonConverter, IServiceProvider serviceProvider)
{
_messageCallBackInfoStore = messageCallBackInfoStore;
_logger = logger;
_jsonConverter = jsonConverter;
_serviceProvider = serviceProvider;
}
public async Task<MessageCheckerResult> GetResult(Message message)
{
try
{
var callBackInfo = await _messageCallBackInfoStore.Get(message.MessageTypeId);
if (callBackInfo == null)
{
_logger.LogError($"Can't find the callBackInfo, MessageTypeId:{message.MessageTypeId}");
return MessageCheckerResult.NotFinished;
}
using (var scope = _serviceProvider.CreateScope())
{
var callback = scope.ServiceProvider.GetRequiredService(callBackInfo.EventCallbackType);
var argument = _jsonConverter.Deserialize(message.RePushCallBackParameterValue, callBackInfo.EventCallbackArguemntType);
var result = await callBackInfo.Invoke(argument, callback);
if (_logger.IsEnabled(LogLevel.Debug))
{
var messageInfoDetail = _jsonConverter.Serialize(message);
_logger.LogDebug($"DefaultMessageChecker IsFinished Result:{result.ToString()},MessageTypeId:{message.MessageTypeId},MessageDetail:{messageInfoDetail}");
}
if (result)
{
var @event = _jsonConverter.Deserialize(message.Content, callBackInfo.EventType);
return new MessageCheckerResult(true, @event);
}
return MessageCheckerResult.NotFinished;
}
}
catch (Exception ex)
{
var messageInfoDetail = _jsonConverter.Serialize(message);
_logger.LogError(ex, $"DefaultMessageChecker.IsFinished Error, MessageTypeId:{message.MessageTypeId},MessageDetail:{messageInfoDetail}");
return MessageCheckerResult.NotFinished;
}
}
}
}
using Pole.ReliableMessage.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Messaging
{
class DefaultMessageTypeIdGenerator : IMessageTypeIdGenerator
{
public string Generate(Type messageType)
{
return messageType.FullName;
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.ReliableMessage.Messaging
{
public class MessageCheckerResult
{
public static MessageCheckerResult NotFinished = new MessageCheckerResult(false);
public MessageCheckerResult(bool isFinished,object @event):this(isFinished)
{
Event = @event;
}
public MessageCheckerResult(bool isFinished)
{
IsFinished = isFinished;
}
public bool IsFinished { get; set; }
public object Event { get; set; }
}
}
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<Folder Include="Filter\" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Http" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.0" />
<PackageReference Include="System.Text.Json" Version="4.7.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\Pole.ReliableMessage.Storage.Abstraction\Pole.ReliableMessage.Storage.Abstraction.csproj" />
</ItemGroup>
</Project>
using Pole.Pole.ReliableMessage.EventBus;
using Pole.ReliableMessage;
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.EventBus;
using Pole.ReliableMessage.Messaging;
using Pole.ReliableMessage.Messaging.CallBack;
using Pole.ReliableMessage.Processor;
using Pole.ReliableMessage.Utils;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
namespace Microsoft.Extensions.DependencyInjection
{
public static class PoleReliableMessageServiceCollectionExtensions
{
public static IServiceCollection AddPoleReliableMessage(this IServiceCollection services, Action<ReliableMessageOption> optionConfig)
{
ReliableMessageOption reliableMessageOption = new ReliableMessageOption();
optionConfig(reliableMessageOption);
foreach(var extension in reliableMessageOption.ReliableMessageOptionExtensions)
{
extension.AddServices(services);
}
services.Configure(optionConfig);
services.AddSingleton<IJsonConverter, DefaultJsonConverter>();
//services.AddSingleton<IMessageBuffer, DefaultMessageBuffer>();
services.AddSingleton<IMessageCheckRetryer, DefaultMessageCheckRetryer>();
services.AddSingleton<IRetryTimeDelayCalculator, DefaultRetryTimeDelayCalculator>();
services.AddSingleton<ITimeHelper, DefaulTimeHelper>();
services.AddSingleton<IApplicationBuilderConfigurator, DefaultApplicationBuilderConfigurator>();
services.AddSingleton<IComteckReliableMessageBootstrap, DefaultComteckReliableMessageBootstrap>();
services.AddSingleton<IMessageCallBackInfoGenerator, DefaultMessageCallBackInfoGenerator>();
services.AddSingleton<IMessageCallBackInfoStore, MessageCallBackInfoInMemoryStore>();
services.AddSingleton<IMessageCallBackRegister, DefaultMessageCallBackRegister>();
services.AddSingleton<IMessageChecker, DefaultMessageChecker>();
services.AddSingleton<IMessageCallBackInfoGenerator, DefaultMessageCallBackInfoGenerator>();
services.AddSingleton<IReliableBus, DefaultReliableBus>();
services.AddSingleton<IReliableEventCallBackFinder, DefaultReliableEventCallBackFinder>();
services.AddSingleton<IReliableEventHandlerFinder, DefaultReliableEventHandlerFinder>();
services.AddSingleton<IMessageTypeIdGenerator, DefaultMessageTypeIdGenerator>();
services.AddSingleton<IServiceIPv4AddressProvider, DefaultServiceIPv4AddressProvider>();
services.AddHostedService<BackgroundServiceBasedProcessorServer>();
services.AddHttpClient();
services.AddSingleton<IProcessor, MessageCleanProcessor>();
services.AddSingleton<IProcessor, PendingMessageCheckProcessor>();
services.AddSingleton<IProcessor, PendingMessageServiceInstanceCheckProcessor>();
var provider = services.BuildServiceProvider();
IComteckReliableMessageBootstrap applicationBuilderConfigurator = provider.GetService(typeof(IComteckReliableMessageBootstrap)) as IComteckReliableMessageBootstrap;
applicationBuilderConfigurator.Initialize(services, reliableMessageOption.EventHandlerAssemblies, reliableMessageOption.EventCallbackAssemblies);
return services;
}
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace Pole.ReliableMessage.Processor
{
public class LoopProcessor : ProcessorBase
{
private IProcessor _processor;
private readonly ILoggerFactory _loggerFactory;
private readonly ITimeHelper _timeHelper;
public LoopProcessor(IProcessor processor, ILoggerFactory loggerFactory, ITimeHelper timeHelper)
{
_processor = processor;
_loggerFactory = loggerFactory;
_timeHelper = timeHelper;
}
public override string Name => "LoopProcessor";
public override async Task Process(ProcessingContext context)
{
var logger = _loggerFactory.CreateLogger<LoopProcessor>();
while (!context.IsStopping)
{
try
{
logger.LogDebug($"{_timeHelper.GetAppropriateFormatedDateString()}...{ this.ToString() } process start");
await _processor.Process(context);
logger.LogDebug($"{_timeHelper.GetAppropriateFormatedDateString()}...{ this.ToString() } process compelete");
}
catch (Exception ex)
{
logger.LogError(ex, $"{_timeHelper.GetAppropriateFormatedDateString()}...{ this.ToString() } process error");
}
}
}
public override string ToString()
{
var strArray = new string[2];
strArray[0] = Name;
strArray[1] = _processor.Name;
return string.Join("_", strArray);
}
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.ReliableMessage.Processor
{
public class MessageCleanProcessor : ProcessorBase
{
private readonly ReliableMessageOption _options;
private readonly ILogger<MessageCleanProcessor> _logger;
private readonly IMessageStorage _messageStorage;
private readonly IMemberShipTableManager _memberShipTable;
private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider;
public MessageCleanProcessor(IOptions<ReliableMessageOption> options, ILogger<MessageCleanProcessor> logger, IMessageStorage messageStorage, IMemberShipTableManager memberShipTable, IServiceIPv4AddressProvider serviceIPv4AddressProvider)
{
_options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected");
_logger = logger;
_messageStorage = messageStorage;
_memberShipTable = memberShipTable;
_serviceIPv4AddressProvider = serviceIPv4AddressProvider;
}
public override string Name => nameof(PendingMessageCheckProcessor);
public override async Task Process(ProcessingContext context)
{
try
{
var iPStr = _serviceIPv4AddressProvider.Get();
var isPendingChecker = await _memberShipTable.IsPendingMessageCheckerServiceInstance(iPStr);// 这里可以把时间加上去
if (!isPendingChecker)
{
_logger.LogDebug("I an not the pendingChecker ,Ignore clean up messages");
return;
}
_logger.LogInformation($"Begin clean message");
var deletedCount = await _messageStorage.Delete(m => m.MessageStatusId == MessageStatus.Canced.Id || m.MessageStatusId == MessageStatus.Pushed.Id);
_logger.LogInformation($"End clean message ,delete message count : {deletedCount} , successfully");
}
catch (Exception ex)
{
_logger.LogError(ex, $"Clean message error");
}
finally
{
await Task.Delay(_options.MessageCleanInterval * 1000);
}
}
}
}
using Pole.ReliableMessage.Abstraction;
using Pole.ReliableMessage.Messaging;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.NetworkInformation;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.ReliableMessage.Processor
{
class PendingMessageCheckProcessor : ProcessorBase
{
private readonly IMessageStorage _storage;
private readonly ReliableMessageOption _options;
//private readonly IMessageBuffer _messageBuffer;
private readonly ITimeHelper _timeHelper;
private readonly IMemberShipTableManager _memberShipTable;
private readonly ILogger<PendingMessageCheckProcessor> _logger;
private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider;
private readonly IMessageCheckRetryer _messageCheckRetryer;
public PendingMessageCheckProcessor(IMessageStorage storage, IOptions<ReliableMessageOption> options, ITimeHelper timeHelper, IMemberShipTableManager memberShipTable, ILogger<PendingMessageCheckProcessor> logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider, IMessageCheckRetryer messageCheckRetryer)
{
_storage = storage;
_options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected");
//_messageBuffer = messageBuffer;
_timeHelper = timeHelper;
_memberShipTable = memberShipTable;
_logger = logger;
_serviceIPv4AddressProvider = serviceIPv4AddressProvider;
_messageCheckRetryer = messageCheckRetryer;
}
public override string Name => nameof(PendingMessageCheckProcessor);
public override async Task Process(ProcessingContext context)
{
try
{
var iPStr = _serviceIPv4AddressProvider.Get();
var isPendingChecker = await _memberShipTable.IsPendingMessageCheckerServiceInstance(iPStr);// 这里可以把时间加上去
if (!isPendingChecker)
{
_logger.LogDebug("I an not the PendingChecker ,Ignore check");
return;
}
await ProcessInternal();
}
catch (Exception ex)
{
_logger.LogError(ex, $"PendingMessageCheckProcessor Process Error");
}
finally
{
await Task.Delay(_options.PendingMessageRetryInterval * 1000);
}
}
public async Task ProcessInternal()
{
var now = _timeHelper.GetUTCNow();
var pendingMessages = await _storage.GetMany(m => m.MessageStatusId == MessageStatus.Pending.Id && m.NextRetryUTCTime <= now && m.AddedUTCTime <= now.AddSeconds(-1 * _options.PendingMessageFirstProcessingWaitTime) && m.AddedUTCTime >= now.AddSeconds(-1 * _options.PendingMessageCheckingTimeOutSeconds), _options.PendingMessageCheckBatchCount);
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug($"PendingMessageCheckProcessor pendingMessages count:{pendingMessages.Count}");
}
await _messageCheckRetryer.Execute(pendingMessages, now);
// 说明此时 消息数量超过 批量获取数
if (pendingMessages.Count == _options.PendingMessageCheckBatchCount)
{
await ProcessInternal();
}
}
}
}
using Pole.ReliableMessage.Abstraction;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Pole.ReliableMessage.Storage.Abstraction;
namespace Pole.ReliableMessage.Processor
{
class PendingMessageServiceInstanceCheckProcessor : ProcessorBase
{
private readonly ReliableMessageOption _options;
private readonly ITimeHelper _timeHelper;
private readonly IMemberShipTableManager _memberShipTable;
private readonly ILogger<PendingMessageServiceInstanceCheckProcessor> _logger;
private readonly IServiceIPv4AddressProvider _serviceIPv4AddressProvider;
public PendingMessageServiceInstanceCheckProcessor(IOptions<ReliableMessageOption> options, ITimeHelper timeHelper, IMemberShipTableManager memberShipTable, ILogger<PendingMessageServiceInstanceCheckProcessor> logger, IServiceIPv4AddressProvider serviceIPv4AddressProvider)
{
_options = options.Value ?? throw new Exception($"{nameof(ReliableMessageOption)} Must be injected");
_timeHelper = timeHelper;
_memberShipTable = memberShipTable;
_logger = logger;
_serviceIPv4AddressProvider = serviceIPv4AddressProvider;
}
public override string Name => nameof(PendingMessageCheckProcessor);
public override async Task Process(ProcessingContext context)
{
try
{
var now = _timeHelper.GetUTCNow();
var iPStr = _serviceIPv4AddressProvider.Get();
_logger.LogDebug($"Current instance ip is {iPStr}");
var currentCheckIp = await _memberShipTable.GetPendingMessageCheckerServiceInstanceIp(now.AddSeconds(-1 * _options.PendingMessageCheckerInstanceIAnAliveTimeout));
if (string.IsNullOrEmpty(currentCheckIp))
{
var addInstanceResult = await _memberShipTable.AddCheckerServiceInstanceAndDeleteOthers(iPStr, now);
if (addInstanceResult)
{
_logger.LogInformation($"I am the PendingMessageCheck now, ip:{iPStr}");
return;
}
_logger.LogError($"There is no PendingMessageChecker ,I want to be the PendingMessageCheck ,but faild ,memberShipTable.AddCheckerServiceInstance faild , ip:{iPStr}");
return;
}
if (currentCheckIp == iPStr)
{
_logger.LogDebug($"Begin update pendingMessageChecker iAmAliveUTCTime");
await _memberShipTable.UpdateIAmAlive(currentCheckIp, now);
_logger.LogDebug($"Update pendingMessageChecker iAmAliveUTCTime successfully");
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"PendingMessageServiceInstanceCheckProcessor Process Error");
}
finally
{
await Task.Delay(_options.PendingMessageCheckerInstanceIAnAliveTimeUpdateDelay * 1000);
}
}
}
}
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
namespace Pole.ReliableMessage.Processor
{
public class ProcessingContext
{
public ProcessingContext(CancellationToken cancellationToken)
{
CancellationToken = cancellationToken;
}
public CancellationToken CancellationToken { get; }
public bool IsStopping => CancellationToken.IsCancellationRequested;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment