Commit ac47a1a5 by dingsongjie

完善代码

parent 6834cd1a
......@@ -19,6 +19,13 @@ namespace SagasServer
public static IWebHostBuilder CreateWebHostBuilder(string[] args) =>
WebHost.CreateDefaultBuilder(args)
.UseStartup<Startup>();
.UseStartup<Startup>()
.UseKestrel(option =>
{
option.ListenAnyIP(80, config =>
{
config.Protocols = Microsoft.AspNetCore.Server.Kestrel.Core.HttpProtocols.Http2;
});
});
}
}
......@@ -18,7 +18,7 @@
"SagasServer": {
"commandName": "Project",
"launchBrowser": true,
"applicationUrl": "https://localhost:5001;http://localhost:5000",
"applicationUrl": "http://localhost:5000",
"environmentVariables": {
"ASPNETCORE_ENVIRONMENT": "Development"
}
......
......@@ -6,10 +6,8 @@
</PropertyGroup>
<ItemGroup>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Pole.Sagas.Server\Pole.Sagas.Server.csproj" />
<ProjectReference Include="..\..\..\src\Pole.Sagas.Storage.PostgreSql\Pole.Sagas.Storage.PostgreSql.csproj" />
</ItemGroup>
</Project>
......@@ -5,6 +5,7 @@ using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
......@@ -12,11 +13,25 @@ namespace SagasServer
{
public class Startup
{
private IConfiguration Configuration { get; }
private IWebHostEnvironment Environment { get; }
public Startup(IConfiguration configuration, IWebHostEnvironment env)
{
Configuration = configuration;
Environment = env;
}
// This method gets called by the runtime. Use this method to add services to the container.
// For more information on how to configure your application, visit https://go.microsoft.com/fwlink/?LinkID=398940
public void ConfigureServices(IServiceCollection services)
{
services.AddPoleSagasServer();
services.AddPole(config =>
{
config.AddSagasServer();
config.AddSagasServerPGStorage(option =>
{
option.ConnectionString = Configuration["postgres:write"];
});
});
}
// This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
......
......@@ -4,5 +4,8 @@
"Default": "Warning"
}
},
"postgres": {
"write": "Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Sagas;Enlist=True;Timeout=0;Command Timeout=600;MinPoolSize=20;MaxPoolSize=500;"
},
"AllowedHosts": "*"
}
using Pole.Core.Domain;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Domain.AggregatesModel.BacketAggregate
{
public class Backet : Entity, IAggregateRoot
{
public void AddBacketItem(string productId, string productName, long Price)
{
BacketItem backetItem = new BacketItem()
{
Id = Guid.NewGuid().ToString("N"),
Price = Price,
ProductId = productId,
ProductName = productName
};
BacketItems.Add(backetItem);
SetBacketTotalPrice();
}
public void ModifyItemProductId(string productId)
{
BacketItems.ForEach(m => m.ProductId = productId);
}
private void SetBacketTotalPrice()
{
foreach (var item in BacketItems)
{
TotalPrice = BacketItems.Sum(m=>m.Price);
}
}
public string UserId { get; set; }
public List<BacketItem> BacketItems { get; private set; } = new List<BacketItem>();
public long TotalPrice { get; set; }
internal void RemoveFirstItem()
{
var first = BacketItems.FirstOrDefault();
if (first != null)
{
BacketItems.Remove(first);
SetBacketTotalPrice();
}
}
}
}
using Pole.Core.Domain;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Domain.AggregatesModel.BacketAggregate
{
public class BacketItem : Entity
{
public string ProductId { get; set; }
public string ProductName { get; set; }
public long Price { get; set; }
public string BacketId { get; set; }
}
}
using Backet.Api.Domain.AggregatesModel.BacketAggregate;
using Backet.Api.Infrastructure.EntityConfigurations;
using Microsoft.EntityFrameworkCore;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Infrastructure
{
public class BacketDbContext : DbContext
{
public BacketDbContext(DbContextOptions options) : base(options)
{
}
public DbSet<Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet> Backets { get; set; }
public DbSet<BacketItem> BacketItems { get; set; }
protected override void OnModelCreating(ModelBuilder builder)
{
base.OnModelCreating(builder);
builder.ApplyConfiguration(new BacketItemEntityTypeConfiguration());
builder.ApplyConfiguration(new BacketEntityTypeConfiguration());
}
}
}
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Infrastructure.EntityConfigurations
{
public class BacketEntityTypeConfiguration : IEntityTypeConfiguration<Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet>
{
public void Configure(EntityTypeBuilder<Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet> builder)
{
builder.ToTable(nameof(Backet.Api.Domain.AggregatesModel.BacketAggregate.Backet));
builder.Property(m => m.Id).HasMaxLength(32);
builder.Property(m => m.UserId).HasMaxLength(32).IsRequired();
builder.HasMany(m => m.BacketItems).WithOne().IsRequired().OnDelete(DeleteBehavior.Cascade).HasForeignKey("BacketId");
builder.Ignore(m => m.DomainEvents);
builder.Ignore(m => m.IsPersisted);
builder.HasKey(m => m.Id);
builder.HasIndex(m => m.UserId);
}
}
}
using Backet.Api.Domain.AggregatesModel.BacketAggregate;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Backet.Api.Infrastructure.EntityConfigurations
{
public class BacketItemEntityTypeConfiguration : IEntityTypeConfiguration<BacketItem>
{
public void Configure(EntityTypeBuilder<BacketItem> builder)
{
builder.ToTable(nameof(BacketItem));
builder.Property(m => m.Id).HasMaxLength(32);
builder.Property(m => m.ProductId).HasMaxLength(32);
builder.Property(m => m.ProductName).HasMaxLength(256).IsRequired();
builder.Property(m => m.BacketId).HasMaxLength(32).IsRequired();
builder.Ignore(m => m.DomainEvents);
builder.Ignore(m => m.IsPersisted);
builder.HasKey(m => m.Id);
builder.HasIndex(m => m.ProductId);
}
}
}
......@@ -8,8 +8,8 @@
<ItemGroup>
<ProjectReference Include="..\..\..\src\Pole.Core\Pole.Core.csproj" />
<ProjectReference Include="..\..\..\src\Pole.EventBus.Rabbitmq\Pole.EventBus.Rabbitmq.csproj" />
<ProjectReference Include="..\..\..\src\Pole.EventStorage.PostgreSql\Pole.EventStorage.PostgreSql.csproj" />
<ProjectReference Include="..\..\..\src\Pole.Sagas.Client\Pole.Sagas.Client.csproj" />
<ProjectReference Include="..\..\..\src\Pole.Sagas.Storage.PostgreSql\Pole.Sagas.Storage.PostgreSql.csproj" />
</ItemGroup>
</Project>
......@@ -2,12 +2,10 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Backet.Api.Infrastructure;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.HttpsPolicy;
using Microsoft.AspNetCore.Mvc;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
......@@ -29,19 +27,12 @@ namespace SagasTest.Api
// This method gets called by the runtime. Use this method to add services to the container.
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContextPool<BacketDbContext>(options => options.UseNpgsql(Configuration["postgres:write"]));
services.AddControllers();
services.AddPole(config =>
{
//config.AddRabbitMQ(option =>
//{
// option.Hosts = new string[1] { Configuration["RabbitmqConfig:HostAddress"] };
// option.Password = Configuration["RabbitmqConfig:HostPassword"];
// option.UserName = Configuration["RabbitmqConfig:HostUserName"];
//});
//config.AddEntityFrameworkEventStorage<BacketDbContext>();
config.AddSagas(option=> {
option.ServiceName = "SagasTest";
option.SagasServerHost = "http://localhost:80";
});
});
services.AddHttpClient();
......
......@@ -24,7 +24,7 @@ namespace Pole.Sagas.Client
private readonly SagaRestorer sagaRestorer;
private readonly IEventSender eventSender;
private readonly ILogger logger;
public NotEndedSagasCompensateRetryBackgroundService(IOptions<PoleSagasOption> options, SagaClient sagaClient, IServiceProvider serviceProvider, IEventSender eventSender, ILogger logger)
public NotEndedSagasCompensateRetryBackgroundService(IOptions<PoleSagasOption> options, SagaClient sagaClient, IServiceProvider serviceProvider, IEventSender eventSender, ILogger<NotEndedSagasCompensateRetryBackgroundService> logger)
{
this.options = options.Value;
this.sagaClient = sagaClient;
......@@ -111,7 +111,7 @@ namespace Pole.Sagas.Client
public Task StopAsync(CancellationToken cancellationToken)
{
throw new NotImplementedException();
return Task.CompletedTask;
}
}
}
......@@ -20,10 +20,13 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static void AddSagas(this StartupConfig startupOption, Action<PoleSagasOption> configAction)
{
// 让客户端支持 没有TLS 的 grpc call
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
startupOption.Services.Configure(configAction);
startupOption.Services.AddSingleton<IActivityFinder, ActivityFinder>();
startupOption.Services.AddSingleton<IEventSender, EventSender>();
startupOption.Services.AddSingleton<ISagaFactory, SagaFactory>();
startupOption.Services.AddHostedService<NotEndedSagasCompensateRetryBackgroundService>();
PoleSagasOption sagasOption = null;
using (var provider = startupOption.Services.BuildServiceProvider())
{
......
using System;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
......@@ -26,5 +27,6 @@ namespace Pole.Sagas.Server
/// 批量删除时 实际过期的数量比预定数量要大时,会分多次删除,此值为其中每次分批删除的时间间隔
/// </summary>
public int ExpiredDataPreBulkDeleteDelaySeconds { get; set; } = 3;
}
}
using Microsoft.Extensions.DependencyInjection;
using Pole.Core;
using Pole.Core.Processor;
using Pole.Sagas.Server;
using Pole.Sagas.Server.Processor;
......@@ -10,17 +11,17 @@ namespace Microsoft.Extensions.DependencyInjection
{
public static class PoleSagasServerServiceCollectionExtensions
{
public static IServiceCollection AddPoleSagasServer(this IServiceCollection services, Action<PoleSagasServerOption> config = null)
public static StartupConfig AddSagasServer(this StartupConfig startupConfig, Action<PoleSagasServerOption> config = null)
{
Action<PoleSagasServerOption> defaultConfig = option => { };
var finalConfig = config ?? defaultConfig;
services.AddGrpc();
services.Configure(config);
startupConfig.Services.AddGrpc();
startupConfig.Services.Configure(finalConfig);
services.AddSingleton<IProcessor, NotEndedSagasFetchProcessor>();
services.AddSingleton<IProcessor, ExpiredSagasCollectorProcessor>();
services.AddHostedService<BackgroundServiceBasedProcessorServer>();
return services;
startupConfig.Services.AddSingleton<IProcessor, NotEndedSagasFetchProcessor>();
startupConfig.Services.AddSingleton<IProcessor, ExpiredSagasCollectorProcessor>();
startupConfig.Services.AddHostedService<BackgroundServiceBasedProcessorServer>();
return startupConfig;
}
}
}
using Microsoft.Extensions.DependencyInjection;
using Pole.Core;
using Pole.Sagas.Core;
using Pole.Sagas.Core.Abstraction;
using Pole.Sagas.Storage.PostgreSql;
using System;
using System.Collections.Generic;
using System.Text;
namespace Pole.Sagas.Storage.PostgreSql
namespace Microsoft.Extensions.DependencyInjection
{
public static class PoleSagasPostgreSqlExtensions
{
public static IServiceCollection AddPostgreSqlStorage(IServiceCollection services,Action<PoleSagasStoragePostgreSqlOption> config)
public static StartupConfig AddSagasServerPGStorage(this StartupConfig startupConfig, Action<PoleSagasStoragePostgreSqlOption> config)
{
services.Configure(config);
services.AddSingleton<ISagaStorageInitializer, PostgreSqlSagaStorageInitializer>();
services.AddSingleton<ISagaStorage, PostgreSqlSagaStorage>();
return services;
startupConfig.Services.Configure(config);
startupConfig.Services.AddSingleton<ISagaStorageInitializer, PostgreSqlSagaStorageInitializer>();
startupConfig.Services.AddSingleton<ISagaStorage, PostgreSqlSagaStorage>();
return startupConfig;
}
}
}
......@@ -56,7 +56,7 @@ CREATE TABLE IF NOT EXISTS {GetSagaTableName()}(
""ExpiresAt"" timestamp,
""AddTime"" timestamp NOT NULL
);
ALTER TABLE ""{GetSagaTableName()}"" ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id"");
ALTER TABLE {GetSagaTableName()} ADD CONSTRAINT ""Sagas_pkey"" PRIMARY KEY (""Id"");
CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
""Id"" varchar(20) COLLATE ""pg_catalog"".""default"" NOT NULL,
......@@ -71,14 +71,14 @@ CREATE TABLE IF NOT EXISTS {GetActivityTableName()}(
""AddTime"" timestamp NOT NULL
);
CREATE INDEX ""Activities_SagaId"" ON ""{GetActivityTableName()}"" USING btree (
CREATE INDEX ""Activities_SagaId"" ON {GetActivityTableName()} USING btree (
""SagaId"" COLLATE ""pg_catalog"".""default"" ""pg_catalog"".""text_ops"" ASC NULLS LAST
);
ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id"");
ALTER TABLE {GetActivityTableName()} ADD CONSTRAINT ""Activities_pkey"" PRIMARY KEY (""Id"");
ALTER TABLE ""{GetActivityTableName()}"" ADD CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION;
ALTER TABLE {GetActivityTableName()} ADD CONSTRAINT ""Activities_SagaId_fkey"" FOREIGN KEY (""SagaId"") REFERENCES {GetSagaTableName()} (""Id"") ON DELETE CASCADE ON UPDATE NO ACTION;
";
return batchSql;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment