11using System ;
22using System . Collections . Concurrent ;
33using System . Collections . Generic ;
4+ using System . Threading ;
45using System . Threading . Tasks ;
5- using System . Timers ;
66
77namespace Splunk
88{
@@ -18,7 +18,6 @@ public class BatchManager
1818 {
1919 readonly ConcurrentBag < object > events ;
2020 readonly uint batchSizeCount ;
21- readonly uint batchIntervalInMilliseconds ;
2221 readonly Timer timer ;
2322 readonly Action < List < object > > emitAction ;
2423
@@ -31,57 +30,44 @@ public class BatchManager
3130 /// <param name="batchSizeCount">Batch size count.</param>
3231 /// <param name="batchIntervalInMilliseconds">Batch interval in milliseconds.</param>
3332 /// <param name="emitAction">Emit action to be invoked at Emit process.</param>
34- public BatchManager ( uint batchSizeCount , uint batchIntervalInMilliseconds , Action < List < object > > emitAction )
33+ public BatchManager ( uint batchSizeCount , int batchIntervalInMilliseconds , Action < List < object > > emitAction )
3534 {
3635 events = new ConcurrentBag < object > ( ) ;
3736 this . batchSizeCount = batchSizeCount ;
38- this . batchIntervalInMilliseconds = batchIntervalInMilliseconds ;
3937
4038 if ( batchIntervalInMilliseconds > 0 )
41- {
42- timer = new Timer ( batchIntervalInMilliseconds ) ;
43- timer . AutoReset = false ;
44- timer . Enabled = true ;
45- timer . Elapsed += TimerTick ;
46- timer . Start ( ) ;
47- }
39+ timer = new Timer ( EmitTimeChek , null , 0 , batchIntervalInMilliseconds ) ;
4840
4941 this . emitAction = emitAction ;
5042 }
5143
52- void TimerTick ( object sender , ElapsedEventArgs e )
44+ void EmitTimeChek ( object state )
5345 {
54- Task
55- . Factory
56- . StartNew ( ( ) =>
57- {
58- if ( events . Count > 0 )
59- Emit ( ) ;
60- } ) . ContinueWith ( task =>
61- {
62- timer ? . Start ( ) ;
63- } ) ;
46+ if ( events . Count > 0 )
47+ Emit ( ) ;
6448 }
6549
6650 void Emit ( )
6751 {
68- bool continueExtraction = true ;
69- List < object > emitEvents = new List < object > ( ) ;
70- while ( continueExtraction )
71- {
72- if ( events . Count == 0 )
73- continueExtraction = false ;
74- else
52+ Task . Factory . StartNew ( ( ) => {
53+ bool continueExtraction = true ;
54+ List < object > emitEvents = new List < object > ( ) ;
55+ while ( continueExtraction )
7556 {
76- events . TryTake ( out object item ) ;
77- if ( item != null )
78- emitEvents . Add ( item ) ;
79- if ( events . Count == 0 || emitEvents . Count >= batchSizeCount )
57+ if ( events . Count == 0 )
8058 continueExtraction = false ;
59+ else
60+ {
61+ events . TryTake ( out object item ) ;
62+ if ( item != null )
63+ emitEvents . Add ( item ) ;
64+ if ( events . Count == 0 || emitEvents . Count >= batchSizeCount )
65+ continueExtraction = false ;
66+ }
8167 }
82- }
83- if ( emitEvents . Count > 0 )
84- emitAction ? . Invoke ( emitEvents ) ;
68+ if ( emitEvents . Count > 0 )
69+ emitAction ? . Invoke ( emitEvents ) ;
70+ } ) ;
8571 }
8672
8773 /// <summary>
0 commit comments