From 51b2b230332e724da8d7739d78e58c384a6b7e13 Mon Sep 17 00:00:00 2001 From: dingsongjie Date: Thu, 27 Feb 2020 18:04:59 +0800 Subject: [PATCH] fix bug --- src/Pole.Core/Utils/ConsistentHash.cs | 114 ------------------------------------------------------------------------------------------------------------------ src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs | 18 +++++++++++++++++- src/Pole.Grpc/Pole.Grpc.csproj | 1 - test/Pole.Samples.Backet.Api/Program.cs | 29 ++++++++++++++++++++++++----- 4 files changed, 41 insertions(+), 121 deletions(-) delete mode 100644 src/Pole.Core/Utils/ConsistentHash.cs diff --git a/src/Pole.Core/Utils/ConsistentHash.cs b/src/Pole.Core/Utils/ConsistentHash.cs deleted file mode 100644 index 6955009..0000000 --- a/src/Pole.Core/Utils/ConsistentHash.cs +++ /dev/null @@ -1,114 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Runtime.CompilerServices; -using System.Text; - -namespace Pole.Core.Utils -{ - public class ConsistentHash - { - readonly SortedDictionary circle = new SortedDictionary(); - int _replicate = 200; //default _replicate count - int[] ayKeys = null; //cache the ordered keys for better performance - - //it's better you override the GetHashCode() of T. - //we will use GetHashCode() to identify different node. - public ConsistentHash(IEnumerable nodes) - { - Init(nodes, _replicate); - } - - public ConsistentHash(IEnumerable nodes, int replicate) - { - Init(nodes, replicate); - } - private void Init(IEnumerable nodes, int replicate) - { - _replicate = replicate; - - foreach (string node in nodes) - { - Add(node, false); - } - ayKeys = circle.Keys.ToArray(); - } - - public void Add(string node) - { - Add(node, true); - } - - public void Add(string node, bool updateKeyArray) - { - for (int i = 0; i < _replicate; i++) - { - int hash = BetterHash(node + i); - circle[hash] = node; - } - - if (updateKeyArray) - { - ayKeys = circle.Keys.ToArray(); - } - } - - public void Remove(string node) - { - for (int i = 0; i < _replicate; i++) - { - int hash = BetterHash(node + i); - if (!circle.Remove(hash)) - { - throw new Exception("can not remove a node that not added"); - } - } - ayKeys = circle.Keys.ToArray(); - } - public string GetNode(string key) - { - int first = First_ge(ayKeys, BetterHash(key)); - return circle[ayKeys[first]]; - } - //return the index of first item that >= val. - //if not exist, return 0; - //ay should be ordered arPole. - [MethodImpl(MethodImplOptions.AggressiveInlining)] - private static int First_ge(int[] ay, int val) - { - int begin = 0; - int end = ay.Length - 1; - - if (ay[end] < val || ay[0] > val) - { - return 0; - } - - while (end - begin > 1) - { - int mid = (end + begin) / 2; - if (ay[mid] >= val) - { - end = mid; - } - else - { - begin = mid; - } - } - - if (ay[begin] > val || ay[end] < val) - { - throw new Exception("should not happen"); - } - - return end; - } - - [MethodImpl(MethodImplOptions.AggressiveInlining)] - public static int BetterHash(string key) - { - return (int)MurmurHash2.Hash(Encoding.UTF8.GetBytes(key)); - } - } -} diff --git a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs index dc55077..ad518b2 100644 --- a/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs +++ b/src/Pole.EventBus.Rabbitmq/Consumer/ConsumerRunner.cs @@ -19,6 +19,7 @@ namespace Pole.EventBus.RabbitMQ readonly IMpscChannel mpscChannel; readonly ISerializer serializer; readonly RabbitOptions rabbitOptions; + List errorMessageDeliveryTags = new List(); public ConsumerRunner( IRabbitMQClient client, IServiceProvider provider, @@ -93,7 +94,17 @@ namespace Pole.EventBus.RabbitMQ } if (!Consumer.Config.AutoAck) { - Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true); + if (errorMessageDeliveryTags.Count == 0) + { + Model.Model.BasicAck(list.Max(o => o.DeliveryTag), true); + } + else + { + list.ForEach(m => + { + Model.Model.BasicAck(m.DeliveryTag, false); + }); + } } } } @@ -121,8 +132,11 @@ namespace Pole.EventBus.RabbitMQ private async Task ProcessComsumerErrors(BasicDeliverEventArgs ea, Exception exception) { // todo 这里需要添加断路器 防止超量的 Task.Delay + if (ea.BasicProperties.Headers.TryGetValue(Consts.ConsumerRetryTimesStr, out object retryTimesObj)) { + errorMessageDeliveryTags.Add(ea.DeliveryTag); + var retryTimesStr = Encoding.UTF8.GetString((byte[])retryTimesObj); var retryTimes = Convert.ToInt32(retryTimesStr); if (retryTimes < Consumer.Config.MaxReenqueueTimes) @@ -135,6 +149,8 @@ namespace Pole.EventBus.RabbitMQ using var channel = Client.PullChannel(); channel.Publish(ea.Body, ea.BasicProperties.Headers, Queue.Queue, string.Empty, true); Model.Model.BasicAck(ea.DeliveryTag, false); + + errorMessageDeliveryTags.Remove(ea.DeliveryTag); }); } else diff --git a/src/Pole.Grpc/Pole.Grpc.csproj b/src/Pole.Grpc/Pole.Grpc.csproj index 8678271..c615e71 100644 --- a/src/Pole.Grpc/Pole.Grpc.csproj +++ b/src/Pole.Grpc/Pole.Grpc.csproj @@ -13,7 +13,6 @@ - diff --git a/test/Pole.Samples.Backet.Api/Program.cs b/test/Pole.Samples.Backet.Api/Program.cs index 2adc1a8..b917538 100644 --- a/test/Pole.Samples.Backet.Api/Program.cs +++ b/test/Pole.Samples.Backet.Api/Program.cs @@ -6,6 +6,7 @@ using Pole.Samples.Backet.Api.Benchmarks; using System; using System.Collections.Generic; using System.Text; +using System.Threading; using System.Threading.Tasks; namespace Pole.Samples.Backet.Api @@ -18,13 +19,31 @@ namespace Pole.Samples.Backet.Api //await grainWithEntityframeworkCoreAndPgTest.SingleOrDefaultAsync(); //Summary summary = BenchmarkRunner.Run(); //Console.ReadLine(); - using ( var connection = new NpgsqlConnection("Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;MinPoolSize=20;MaxPoolSize=500;")) + //using ( var connection = new NpgsqlConnection("Server=192.168.0.248;Port=5432;Username=postgres;Password=comteck2020!@#;Database=Pole-Backet;Enlist=True;Timeout=0;Command Timeout=600;MinPoolSize=20;MaxPoolSize=500;")) + //{ + // var uploader = new Pole.EventStorage.PostgreSql.PoleNpgsqlBulkUploader(connection); + // var events = new List(); + // events.Add(new EventEntity { Id = "111", Retries = 20, ExpiresAt = DateTime.Now, StatusName = "333" }); + // await uploader.UpdateAsync("\"pole\".\"Events\"", events); + //} + // Queue the task. + for (var i = 0; i < 100; i++) { - var uploader = new Pole.EventStorage.PostgreSql.PoleNpgsqlBulkUploader(connection); - var events = new List(); - events.Add(new EventEntity { Id = "111", Retries = 20, ExpiresAt = DateTime.Now, StatusName = "333" }); - await uploader.UpdateAsync("\"pole\".\"Events\"", events); + ThreadPool.QueueUserWorkItem(ThreadProc, i); } + + Console.WriteLine("Main thread does some work, then sleeps."); + Thread.Sleep(1000); + + Console.WriteLine("Main thread exits."); + } + + // This thread procedure performs the task. + static void ThreadProc(Object stateInfo) + { + var i = Convert.ToInt32(stateInfo); + // No state object was passed to QueueUserWorkItem, so stateInfo is null. + Console.WriteLine($"Hello from the thread pool.{i}"); } } } -- libgit2 0.25.0