Skip to content
Toggle navigation
P
Projects
G
Groups
S
Snippets
Help
丁松杰
/
Pole
This project
Loading...
Sign in
Toggle navigation
Go to a project
Project
Repository
Issues
0
Merge Requests
0
Pipelines
Wiki
Snippets
Members
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Commit
f78114ce
authored
Feb 14, 2020
by
丁松杰
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
添加 过期事件清理功能
parent
c0303e02
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
76 additions
and
0 deletions
src/Pole.Core/PoleOptions.cs
src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs
src/Pole.Core/PoleOptions.cs
View file @
f78114ce
...
...
@@ -8,6 +8,9 @@ namespace Pole.Core
public
class
PoleOptions
{
public
int
PendingMessageRetryIntervalSeconds
{
get
;
set
;
}
=
30
;
public
int
ExpiredEventsPreBulkDeleteDelaySeconds
{
get
;
set
;
}
=
3
;
public
int
ExpiredEventsCollectIntervalSeconds
{
get
;
set
;
}
=
60
*
60
;
public
IServiceCollection
Services
{
get
;
private
set
;
}
}
}
src/Pole.Core/Processor/ExpiredEventsCollectorProcessor.cs
0 → 100644
View file @
f78114ce
using
Microsoft.Extensions.Logging
;
using
Microsoft.Extensions.Options
;
using
Pole.Core.EventBus.EventStorage
;
using
System
;
using
System.Collections.Generic
;
using
System.Text
;
using
System.Threading.Tasks
;
namespace
Pole.Core.Processor
{
class
ExpiredEventsCollectorProcessor
:
IProcessor
{
private
readonly
ILogger
logger
;
private
readonly
IEventStorageInitializer
initializer
;
private
readonly
IEventStorage
eventstorage
;
private
readonly
PoleOptions
poleOptions
;
private
const
int
ItemBatch
=
1000
;
private
readonly
TimeSpan
_waitingInterval
=
TimeSpan
.
FromMinutes
(
5
);
private
readonly
TimeSpan
_delay
=
TimeSpan
.
FromSeconds
(
1
);
public
string
Name
=>
nameof
(
PendingMessageRetryProcessor
);
public
ExpiredEventsCollectorProcessor
(
ILogger
<
ExpiredEventsCollectorProcessor
>
logger
,
IEventStorageInitializer
initializer
,
IEventStorage
eventstorage
,
IOptions
<
PoleOptions
>
poleOptions
)
{
this
.
logger
=
logger
;
this
.
initializer
=
initializer
;
this
.
eventstorage
=
eventstorage
;
this
.
poleOptions
=
poleOptions
.
Value
;
}
public
async
Task
Process
(
ProcessingContext
context
)
{
try
{
var
tables
=
new
[]
{
initializer
.
GetTableName
(),
};
foreach
(
var
table
in
tables
)
{
logger
.
LogDebug
(
$"Collecting expired data from table:
{
table
}
"
);
int
deletedCount
;
var
time
=
DateTime
.
Now
;
do
{
deletedCount
=
await
eventstorage
.
DeleteExpiresAsync
(
table
,
time
,
ItemBatch
,
context
.
CancellationToken
);
if
(
deletedCount
!=
0
)
{
await
Task
.
Delay
(
poleOptions
.
ExpiredEventsPreBulkDeleteDelaySeconds
*
1000
);
}
}
while
(
deletedCount
!=
0
);
}
}
catch
(
Exception
ex
)
{
logger
.
LogError
(
ex
,
$"
{
nameof
(
ExpiredEventsCollectorProcessor
)}
Process Error"
);
}
finally
{
await
Task
.
Delay
(
poleOptions
.
ExpiredEventsCollectIntervalSeconds
*
1000
);
}
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment