@@ -145,25 +145,27 @@ public void Dispose()
145145 /// </summary>
146146 private async Task RunChangeConsumptionLoopAsync ( )
147147 {
148- this . _logger . LogInformation ( "Starting change consumption loop." ) ;
148+ this . _logger . LogDebugWithThreadId ( "Starting change consumption loop." ) ;
149149
150150 try
151151 {
152152 CancellationToken token = this . _cancellationTokenSourceCheckForChanges . Token ;
153153
154154 using ( var connection = new SqlConnection ( this . _connectionString ) )
155155 {
156+ this . _logger . LogDebugWithThreadId ( "BEGIN OpenChangeConsumptionConnection" ) ;
156157 await connection . OpenAsync ( token ) ;
157-
158+ this . _logger . LogDebugWithThreadId ( "END OpenChangeConsumptionConnection" ) ;
158159 // Check for cancellation request only after a cycle of checking and processing of changes completes.
159160 while ( ! token . IsCancellationRequested )
160161 {
162+ this . _logger . LogDebugWithThreadId ( $ "BEGIN CheckingForChanges State={ this . _state } ") ;
161163 if ( this . _state == State . CheckingForChanges )
162164 {
163165 await this . GetTableChangesAsync ( connection , token ) ;
164166 await this . ProcessTableChangesAsync ( connection , token ) ;
165167 }
166-
168+ this . _logger . LogDebugWithThreadId ( "END CheckingForChanges" ) ;
167169 await Task . Delay ( TimeSpan . FromSeconds ( PollingIntervalInSeconds ) , token ) ;
168170 }
169171 }
@@ -195,7 +197,7 @@ private async Task RunChangeConsumptionLoopAsync()
195197 private async Task GetTableChangesAsync ( SqlConnection connection , CancellationToken token )
196198 {
197199 TelemetryInstance . TrackEvent ( TelemetryEventName . GetChangesStart , this . _telemetryProps ) ;
198-
200+ this . _logger . LogDebugWithThreadId ( "BEGIN GetTableChanges" ) ;
199201 try
200202 {
201203 var transactionSw = Stopwatch . StartNew ( ) ;
@@ -208,14 +210,17 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
208210 // Update the version number stored in the global state table if necessary before using it.
209211 using ( SqlCommand updateTablesPreInvocationCommand = this . BuildUpdateTablesPreInvocation ( connection , transaction ) )
210212 {
213+ this . _logger . LogDebugWithThreadId ( $ "BEGIN UpdateTablesPreInvocation Query={ updateTablesPreInvocationCommand . CommandText } ") ;
211214 var commandSw = Stopwatch . StartNew ( ) ;
212215 await updateTablesPreInvocationCommand . ExecuteNonQueryAsync ( token ) ;
213216 setLastSyncVersionDurationMs = commandSw . ElapsedMilliseconds ;
214217 }
218+ this . _logger . LogDebugWithThreadId ( $ "END UpdateTablesPreInvocation Duration={ setLastSyncVersionDurationMs } ms") ;
215219
216220 // Use the version number to query for new changes.
217221 using ( SqlCommand getChangesCommand = this . BuildGetChangesCommand ( connection , transaction ) )
218222 {
223+ this . _logger . LogDebugWithThreadId ( $ "BEGIN GetChanges Query={ getChangesCommand . CommandText } ") ;
219224 var commandSw = Stopwatch . StartNew ( ) ;
220225 var rows = new List < IReadOnlyDictionary < string , string > > ( ) ;
221226
@@ -230,18 +235,19 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
230235 this . _rows = rows ;
231236 getChangesDurationMs = commandSw . ElapsedMilliseconds ;
232237 }
233-
234- this . _logger . LogDebug ( $ "Changed rows count: { this . _rows . Count } .") ;
238+ this . _logger . LogDebugWithThreadId ( $ "END GetChanges Duration={ getChangesDurationMs } ms ChangedRows={ this . _rows . Count } ") ;
235239
236240 // If changes were found, acquire leases on them.
237241 if ( this . _rows . Count > 0 )
238242 {
239243 using ( SqlCommand acquireLeasesCommand = this . BuildAcquireLeasesCommand ( connection , transaction ) )
240244 {
245+ this . _logger . LogDebugWithThreadId ( $ "BEGIN AcquireLeases Query={ acquireLeasesCommand . CommandText } ") ;
241246 var commandSw = Stopwatch . StartNew ( ) ;
242247 await acquireLeasesCommand . ExecuteNonQueryAsync ( token ) ;
243248 acquireLeasesDurationMs = commandSw . ElapsedMilliseconds ;
244249 }
250+ this . _logger . LogDebugWithThreadId ( $ "END AcquireLeases Duration={ acquireLeasesDurationMs } ms") ;
245251 }
246252
247253 transaction . Commit ( ) ;
@@ -282,10 +288,12 @@ private async Task GetTableChangesAsync(SqlConnection connection, CancellationTo
282288 this . _logger . LogError ( $ "Failed to check for changes in table '{ this . _userTable . FullName } ' due to exception: { e . GetType ( ) } . Exception message: { e . Message } ") ;
283289 TelemetryInstance . TrackException ( TelemetryErrorName . GetChanges , e , this . _telemetryProps ) ;
284290 }
291+ this . _logger . LogDebugWithThreadId ( "END GetTableChanges" ) ;
285292 }
286293
287294 private async Task ProcessTableChangesAsync ( SqlConnection connection , CancellationToken token )
288295 {
296+ this . _logger . LogDebugWithThreadId ( "BEGIN ProcessTableChanges" ) ;
289297 if ( this . _rows . Count > 0 )
290298 {
291299 this . _state = State . ProcessingChanges ;
@@ -297,7 +305,7 @@ private async Task ProcessTableChangesAsync(SqlConnection connection, Cancellati
297305 // thing. We could still try to trigger on the correctly processed changes, but that adds additional
298306 // complication because we don't want to release the leases on the incorrectly processed changes.
299307 // For now, just give up I guess?
300- changes = this . GetChanges ( ) ;
308+ changes = this . ProcessChanges ( ) ;
301309 }
302310 catch ( Exception e )
303311 {
@@ -311,18 +319,20 @@ private async Task ProcessTableChangesAsync(SqlConnection connection, Cancellati
311319 var input = new TriggeredFunctionData ( ) { TriggerValue = changes } ;
312320
313321 TelemetryInstance . TrackEvent ( TelemetryEventName . TriggerFunctionStart , this . _telemetryProps ) ;
322+ this . _logger . LogDebugWithThreadId ( "Executing triggered function" ) ;
314323 var stopwatch = Stopwatch . StartNew ( ) ;
315324
316325 FunctionResult result = await this . _executor . TryExecuteAsync ( input , this . _cancellationTokenSourceExecutor . Token ) ;
317-
326+ long durationMs = stopwatch . ElapsedMilliseconds ;
318327 var measures = new Dictionary < TelemetryMeasureName , double >
319328 {
320- [ TelemetryMeasureName . DurationMs ] = stopwatch . ElapsedMilliseconds ,
329+ [ TelemetryMeasureName . DurationMs ] = durationMs ,
321330 [ TelemetryMeasureName . BatchCount ] = this . _rows . Count ,
322331 } ;
323332
324333 if ( result . Succeeded )
325334 {
335+ this . _logger . LogDebugWithThreadId ( $ "Successfully triggered function. Duration={ durationMs } ms") ;
326336 TelemetryInstance . TrackEvent ( TelemetryEventName . TriggerFunctionEnd , this . _telemetryProps , measures ) ;
327337 await this . ReleaseLeasesAsync ( connection , token ) ;
328338 }
@@ -337,6 +347,7 @@ private async Task ProcessTableChangesAsync(SqlConnection connection, Cancellati
337347 }
338348 }
339349 }
350+ this . _logger . LogDebugWithThreadId ( "END ProcessTableChanges" ) ;
340351 }
341352
342353 /// <summary>
@@ -353,11 +364,14 @@ private async void RunLeaseRenewalLoopAsync()
353364
354365 using ( var connection = new SqlConnection ( this . _connectionString ) )
355366 {
367+ this . _logger . LogDebugWithThreadId ( "BEGIN OpenLeaseRenewalLoopConnection" ) ;
356368 await connection . OpenAsync ( token ) ;
357-
369+ this . _logger . LogDebugWithThreadId ( "END OpenLeaseRenewalLoopConnection" ) ;
358370 while ( ! token . IsCancellationRequested )
359371 {
372+ this . _logger . LogDebugWithThreadId ( "BEGIN WaitRowsLock - LeaseRenewal" ) ;
360373 await this . _rowsLock . WaitAsync ( token ) ;
374+ this . _logger . LogDebugWithThreadId ( "END WaitRowsLock - LeaseRenewal" ) ;
361375 await this . RenewLeasesAsync ( connection , token ) ;
362376 await Task . Delay ( TimeSpan . FromSeconds ( LeaseRenewalIntervalInSeconds ) , token ) ;
363377 }
@@ -391,13 +405,16 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
391405 using ( SqlCommand renewLeasesCommand = this . BuildRenewLeasesCommand ( connection ) )
392406 {
393407 TelemetryInstance . TrackEvent ( TelemetryEventName . RenewLeasesStart , this . _telemetryProps ) ;
408+ this . _logger . LogDebugWithThreadId ( $ "BEGIN RenewLeases Query={ renewLeasesCommand . CommandText } ") ;
394409 var stopwatch = Stopwatch . StartNew ( ) ;
395410
396411 await renewLeasesCommand . ExecuteNonQueryAsync ( token ) ;
397412
413+ long durationMs = stopwatch . ElapsedMilliseconds ;
414+ this . _logger . LogDebugWithThreadId ( $ "END RenewLeases Duration={ durationMs } ms") ;
398415 var measures = new Dictionary < TelemetryMeasureName , double >
399416 {
400- [ TelemetryMeasureName . DurationMs ] = stopwatch . ElapsedMilliseconds ,
417+ [ TelemetryMeasureName . DurationMs ] = durationMs ,
401418 } ;
402419
403420 TelemetryInstance . TrackEvent ( TelemetryEventName . RenewLeasesEnd , this . _telemetryProps , measures ) ;
@@ -437,6 +454,7 @@ private async Task RenewLeasesAsync(SqlConnection connection, CancellationToken
437454 }
438455
439456 // Want to always release the lock at the end, even if renewing the leases failed.
457+ this . _logger . LogDebugWithThreadId ( "ReleaseRowLock - RenewLeases" ) ;
440458 this . _rowsLock . Release ( ) ;
441459 }
442460 }
@@ -449,12 +467,15 @@ private async Task ClearRowsAsync(bool acquireLock)
449467 {
450468 if ( acquireLock )
451469 {
470+ this . _logger . LogDebugWithThreadId ( "BEGIN WaitRowsLock - ClearRows" ) ;
452471 await this . _rowsLock . WaitAsync ( ) ;
472+ this . _logger . LogDebugWithThreadId ( "END WaitRowsLock - ClearRows" ) ;
453473 }
454474
455475 this . _leaseRenewalCount = 0 ;
456476 this . _state = State . CheckingForChanges ;
457477 this . _rows = new List < IReadOnlyDictionary < string , string > > ( ) ;
478+ this . _logger . LogDebugWithThreadId ( "ReleaseRowLock - ClearRows" ) ;
458479 this . _rowsLock . Release ( ) ;
459480 }
460481
@@ -465,9 +486,10 @@ private async Task ClearRowsAsync(bool acquireLock)
465486 private async Task ReleaseLeasesAsync ( SqlConnection connection , CancellationToken token )
466487 {
467488 TelemetryInstance . TrackEvent ( TelemetryEventName . ReleaseLeasesStart , this . _telemetryProps ) ;
468-
489+ this . _logger . LogDebugWithThreadId ( "BEGIN WaitRowsLock - ReleaseLeases" ) ;
469490 // Don't want to change the "_rows" while another thread is attempting to renew leases on them.
470491 await this . _rowsLock . WaitAsync ( token ) ;
492+ this . _logger . LogDebugWithThreadId ( "END WaitRowsLock - ReleaseLeases" ) ;
471493 long newLastSyncVersion = this . RecomputeLastSyncVersion ( ) ;
472494
473495 try
@@ -482,18 +504,22 @@ private async Task ReleaseLeasesAsync(SqlConnection connection, CancellationToke
482504 // Release the leases held on "_rows".
483505 using ( SqlCommand releaseLeasesCommand = this . BuildReleaseLeasesCommand ( connection , transaction ) )
484506 {
507+ this . _logger . LogDebugWithThreadId ( $ "BEGIN ReleaseLeases Query={ releaseLeasesCommand . CommandText } ") ;
485508 var commandSw = Stopwatch . StartNew ( ) ;
486509 await releaseLeasesCommand . ExecuteNonQueryAsync ( token ) ;
487510 releaseLeasesDurationMs = commandSw . ElapsedMilliseconds ;
511+ this . _logger . LogDebugWithThreadId ( $ "END ReleaseLeases Duration={ releaseLeasesDurationMs } ms") ;
488512 }
489513
490514 // Update the global state table if we have processed all changes with ChangeVersion <= newLastSyncVersion,
491515 // and clean up the leases table to remove all rows with ChangeVersion <= newLastSyncVersion.
492516 using ( SqlCommand updateTablesPostInvocationCommand = this . BuildUpdateTablesPostInvocation ( connection , transaction , newLastSyncVersion ) )
493517 {
518+ this . _logger . LogDebugWithThreadId ( $ "BEGIN UpdateTablesPostInvocation Query={ updateTablesPostInvocationCommand . CommandText } ") ;
494519 var commandSw = Stopwatch . StartNew ( ) ;
495520 await updateTablesPostInvocationCommand . ExecuteNonQueryAsync ( token ) ;
496521 updateLastSyncVersionDurationMs = commandSw . ElapsedMilliseconds ;
522+ this . _logger . LogDebugWithThreadId ( $ "END UpdateTablesPostInvocation Duration={ updateLastSyncVersionDurationMs } ms") ;
497523 }
498524
499525 transaction . Commit ( ) ;
@@ -550,10 +576,11 @@ private long RecomputeLastSyncVersion()
550576 string changeVersion = row [ "SYS_CHANGE_VERSION" ] ;
551577 changeVersionSet . Add ( long . Parse ( changeVersion , CultureInfo . InvariantCulture ) ) ;
552578 }
553-
554579 // If there are more than one version numbers in the set, return the second highest one. Otherwise, return
555580 // the only version number in the set.
556- return changeVersionSet . ElementAt ( changeVersionSet . Count > 1 ? changeVersionSet . Count - 2 : 0 ) ;
581+ long lastSyncVersion = changeVersionSet . ElementAt ( changeVersionSet . Count > 1 ? changeVersionSet . Count - 2 : 0 ) ;
582+ this . _logger . LogDebugWithThreadId ( $ "RecomputeLastSyncVersion. LastSyncVersion={ lastSyncVersion } ChangeVersionSet={ string . Join ( "," , changeVersionSet ) } ") ;
583+ return lastSyncVersion ;
557584 }
558585
559586 /// <summary>
@@ -562,8 +589,9 @@ private long RecomputeLastSyncVersion()
562589 /// will be populated with only the primary key values of the deleted row.
563590 /// </summary>
564591 /// <returns>The list of changes</returns>
565- private IReadOnlyList < SqlChange < T > > GetChanges ( )
592+ private IReadOnlyList < SqlChange < T > > ProcessChanges ( )
566593 {
594+ this . _logger . LogDebugWithThreadId ( "BEGIN ProcessChanges" ) ;
567595 var changes = new List < SqlChange < T > > ( ) ;
568596 foreach ( IReadOnlyDictionary < string , string > row in this . _rows )
569597 {
@@ -577,7 +605,7 @@ private IReadOnlyList<SqlChange<T>> GetChanges()
577605
578606 changes . Add ( new SqlChange < T > ( operation , JsonConvert . DeserializeObject < T > ( JsonConvert . SerializeObject ( item ) ) ) ) ;
579607 }
580-
608+ this . _logger . LogDebugWithThreadId ( "END ProcessChanges" ) ;
581609 return changes ;
582610 }
583611
@@ -616,7 +644,7 @@ private SqlCommand BuildUpdateTablesPreInvocation(SqlConnection connection, SqlT
616644 SELECT @last_sync_version = LastSyncVersion
617645 FROM { SqlTriggerConstants . GlobalStateTableName }
618646 WHERE UserFunctionID = '{ this . _userFunctionId } ' AND UserTableID = { this . _userTableId } ;
619-
647+
620648 IF @last_sync_version < @min_valid_version
621649 UPDATE { SqlTriggerConstants . GlobalStateTableName }
622650 SET LastSyncVersion = @min_valid_version
0 commit comments