Commit 170a7cd6 by dingsongjie

添加 启动优化

parent d6a21d40
......@@ -45,6 +45,7 @@ namespace Backet.Api.Controllers
{
var id = "da8a489fa7b4409294ee1b358fbbfba5";
var grain = clusterClient.GetGrain<IBacketGrain>(id);
clusterClient.
return grain.RemoveFirstItem();
}
}
......
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.DependencyInjection
{
public interface ISingleDependency
{
}
}
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.DependencyInjection
{
public interface ITransientDependency
{
}
}
using Microsoft.Extensions.DependencyInjection;
using Pole.Core.Abstraction;
using Pole.Core.Channels;
using Pole.Core.EventBus;
using Pole.Core.Processor;
using Pole.Core.Processor.Server;
using Pole.Core.Serialization;
using Pole.Core.UnitOfWork;
using Pole.Core.Utils;
using Pole.Core.Utils.Abstraction;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Extensions
{
public static class IServiceCollectionExtensions
{
public static IServiceCollection AddPole(this IServiceCollection services,Action<PoleOptions> config)
{
services.AddSingleton<IEventTypeFinder, EventTypeFinder>();
services.AddTransient(typeof(IMpscChannel<>), typeof(MpscChannel<>));
services.AddScoped<IBus, Bus>();
services.AddScoped<IUnitOfWork, Pole.Core.UnitOfWork.UnitOfWork>();
services.AddSingleton<ISerializer, DefaultJsonSerializer>();
services.AddSingleton<IGeneratorIdSolver, InstanceIPV4_16IdGeneratorIdSolver>();
services.AddSingleton<ISnowflakeIdGenerator, SnowflakeIdGenerator>();
services.AddSingleton<IProcessor, PendingMessageRetryProcessor>();
services.AddSingleton<IProcessor, ExpiredEventsCollectorProcessor>();
services.AddHostedService<BackgroundServiceBasedProcessorServer>();
return services;
}
}
}
using Microsoft.AspNetCore.Builder;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Core.Extensions
{
public static class IApplicationBuilderExtensions
{
public static IApplicationBuilder UsePole(this IApplicationBuilder applicationBuilder)
{
Startup.StartRay(applicationBuilder.ApplicationServices);
return applicationBuilder;
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
namespace Pole.Core.Observer
{
/// <summary>
/// 标记为观察者
/// </summary>
[AttributeUsage(AttributeTargets.Class, AllowMultiple = false)]
public class ObserverAttribute : Attribute
{
/// <summary>
/// 事件监听者标记
/// </summary>
/// <param name="group">监听者分组</param>
/// <param name="name">监听者名称(如果是shadow请设置为null)</param>
/// <param name="observable">被监听的Type</param>
/// <param name="observer">监听者的Type</param>
public ObserverAttribute(string group, string name, Type observable, Type observer = default)
{
Group = group;
Name = name;
Observable = observable;
Observer = observer;
}
/// <summary>
/// 监听者分组
/// </summary>
public string Group { get; set; }
/// <summary>
/// 监听者名称(如果是shadow请设置为null)
/// </summary>
public string Name { get; set; }
/// <summary>
/// 被监听的Type
/// </summary>
public Type Observable { get; set; }
/// <summary>
/// 监听者的Type
/// </summary>
public Type Observer { get; set; }
}
}
using System.Collections.Generic;
using System.Threading.Tasks;
using Orleans.Concurrency;
namespace Pole.Core.Observer
{
public interface IObserver : IVersion
{
Task OnNext(Immutable<byte[]> bytes);
Task OnNext(Immutable<List<byte[]>> items);
/// <summary>
/// 重置状态
/// </summary>
/// <returns></returns>
Task Reset();
}
}
using System.Threading.Tasks;
namespace Pole.Core.Observer
{
public interface IVersion
{
Task<long> GetVersion();
Task<long> GetAndSaveVersion(long compareVersion);
}
}
......@@ -6,6 +6,7 @@
<ItemGroup>
<PackageReference Include="MediatR.Extensions.Microsoft.DependencyInjection" Version="8.0.0" />
<PackageReference Include="Microsoft.AspNetCore.Http.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Orleans.Core.Abstractions" Version="3.0.2" />
<PackageReference Include="Microsoft.Orleans.Runtime.Abstractions" Version="3.0.2" />
<PackageReference Include="System.Runtime.Loader" Version="4.3.0" />
......
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.DependencyInjection
namespace Pole.Core.Processor
{
public interface IScopedDenpendency
public interface IProcessorServer
{
Task Start(CancellationToken stoppingToken);
}
}
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Pole.Core.Processor.Server
{
public class BackgroundServiceBasedProcessorServer : BackgroundService, IProcessorServer
{
private readonly IServiceProvider _serviceProvider;
private Task _compositeTask;
public BackgroundServiceBasedProcessorServer(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
}
public async Task Start(CancellationToken stoppingToken)
{
ProcessingContext processingContext = new ProcessingContext(stoppingToken);
List<LoopProcessor> loopProcessors = new List<LoopProcessor>();
var innerProcessors = _serviceProvider.GetServices<IProcessor>();
var loggerFactory = _serviceProvider.GetService<ILoggerFactory>();
foreach (var innerProcessor in innerProcessors)
{
LoopProcessor processor = new LoopProcessor(innerProcessor, loggerFactory);
loopProcessors.Add(processor);
}
var tasks = loopProcessors.Select(p => p.Process(processingContext));
_compositeTask = Task.WhenAll(tasks);
await _compositeTask;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
return Start(stoppingToken);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment