1010using System . Text ;
1111using System . Threading ;
1212using System . Threading . Tasks ;
13+ using static Microsoft . Azure . WebJobs . Extensions . Sql . Telemetry . Telemetry ;
1314using Microsoft . Azure . WebJobs . Logging ;
1415using Microsoft . Data . SqlClient ;
1516using Microsoft . Extensions . Configuration ;
1617using Microsoft . Extensions . Logging ;
1718using MoreLinq ;
1819using Newtonsoft . Json ;
1920using Newtonsoft . Json . Serialization ;
21+ using Microsoft . Azure . WebJobs . Extensions . Sql . Telemetry ;
22+ using System . Diagnostics ;
2023
2124namespace Microsoft . Azure . WebJobs . Extensions . Sql
2225{
@@ -74,6 +77,7 @@ public SqlAsyncCollector(IConfiguration configuration, SqlAttribute attribute, I
7477 this . _configuration = configuration ?? throw new ArgumentNullException ( nameof ( configuration ) ) ;
7578 this . _attribute = attribute ?? throw new ArgumentNullException ( nameof ( attribute ) ) ;
7679 this . _logger = loggerFactory ? . CreateLogger ( LogCategories . Bindings ) ?? throw new ArgumentNullException ( nameof ( loggerFactory ) ) ;
80+ TelemetryInstance . TrackCreate ( CreateType . SqlAsyncCollector ) ;
7781 }
7882
7983 /// <summary>
@@ -89,7 +93,7 @@ public async Task AddAsync(T item, CancellationToken cancellationToken = default
8993 if ( item != null )
9094 {
9195 await this . _rowLock . WaitAsync ( cancellationToken ) ;
92-
96+ TelemetryInstance . TrackEvent ( TelemetryEventName . AddAsync ) ;
9397 try
9498 {
9599 this . _rows . Add ( item ) ;
@@ -116,6 +120,7 @@ public async Task FlushAsync(CancellationToken cancellationToken = default)
116120 {
117121 if ( this . _rows . Count != 0 )
118122 {
123+ TelemetryInstance . TrackEvent ( TelemetryEventName . FlushAsync ) ;
119124 await this . UpsertRowsAsync ( this . _rows , this . _attribute , this . _configuration ) ;
120125 this . _rows . Clear ( ) ;
121126 }
@@ -139,6 +144,7 @@ private async Task UpsertRowsAsync(IEnumerable<T> rows, SqlAttribute attribute,
139144 {
140145 using SqlConnection connection = SqlBindingUtilities . BuildConnection ( attribute . ConnectionStringSetting , configuration ) ;
141146 await connection . OpenAsync ( ) ;
147+ Dictionary < string , string > props = connection . AsConnectionProps ( ) ;
142148
143149 string fullTableName = attribute . CommandText ;
144150
@@ -150,8 +156,8 @@ private async Task UpsertRowsAsync(IEnumerable<T> rows, SqlAttribute attribute,
150156
151157 if ( tableInfo == null )
152158 {
159+ TelemetryInstance . TrackEvent ( TelemetryEventName . TableInfoCacheMiss , props ) ;
153160 tableInfo = await TableInformation . RetrieveTableInformationAsync ( connection , fullTableName , this . _logger ) ;
154-
155161 var policy = new CacheItemPolicy
156162 {
157163 // Re-look up the primary key(s) after 10 minutes (they should not change very often!)
@@ -161,14 +167,22 @@ private async Task UpsertRowsAsync(IEnumerable<T> rows, SqlAttribute attribute,
161167 this . _logger . LogInformation ( $ "DB and Table: { connection . Database } .{ fullTableName } . Primary keys: [{ string . Join ( "," , tableInfo . PrimaryKeys . Select ( pk => pk . Name ) ) } ]. SQL Column and Definitions: [{ string . Join ( "," , tableInfo . ColumnDefinitions ) } ]") ;
162168 cachedTables . Set ( cacheKey , tableInfo , policy ) ;
163169 }
170+ else
171+ {
172+ TelemetryInstance . TrackEvent ( TelemetryEventName . TableInfoCacheHit , props ) ;
173+ }
164174
165175 IEnumerable < string > extraProperties = GetExtraProperties ( tableInfo . Columns ) ;
166176 if ( extraProperties . Any ( ) )
167177 {
168178 string message = $ "The following properties in { typeof ( T ) } do not exist in the table { fullTableName } : { string . Join ( ", " , extraProperties . ToArray ( ) ) } .";
169- throw new InvalidOperationException ( message ) ;
179+ var ex = new InvalidOperationException ( message ) ;
180+ TelemetryInstance . TrackError ( TelemetryErrorName . PropsNotExistOnTable , ex , props ) ;
181+ throw ex ;
170182 }
171183
184+ TelemetryInstance . TrackEvent ( TelemetryEventName . UpsertStart , props ) ;
185+ var transactionSw = Stopwatch . StartNew ( ) ;
172186 int batchSize = 1000 ;
173187 SqlTransaction transaction = connection . BeginTransaction ( ) ;
174188 try
@@ -177,24 +191,35 @@ private async Task UpsertRowsAsync(IEnumerable<T> rows, SqlAttribute attribute,
177191 command . Connection = connection ;
178192 command . Transaction = transaction ;
179193 SqlParameter par = command . Parameters . Add ( RowDataParameter , SqlDbType . NVarChar , - 1 ) ;
180-
194+ int batchCount = 0 ;
195+ var commandSw = Stopwatch . StartNew ( ) ;
181196 foreach ( IEnumerable < T > batch in rows . Batch ( batchSize ) )
182197 {
198+ batchCount ++ ;
183199 GenerateDataQueryForMerge ( tableInfo , batch , out string newDataQuery , out string rowData ) ;
184200 command . CommandText = $ "{ newDataQuery } { tableInfo . Query } ;";
185201 par . Value = rowData ;
186202 await command . ExecuteNonQueryAsync ( ) ;
187203 }
188204 transaction . Commit ( ) ;
205+ var measures = new Dictionary < string , double > ( )
206+ {
207+ { TelemetryMeasureName . BatchCount . ToString ( ) , batchCount } ,
208+ { TelemetryMeasureName . TransactionDurationMs . ToString ( ) , transactionSw . ElapsedMilliseconds } ,
209+ { TelemetryMeasureName . CommandDurationMs . ToString ( ) , commandSw . ElapsedMilliseconds }
210+ } ;
211+ TelemetryInstance . TrackEvent ( TelemetryEventName . UpsertEnd , props , measures ) ;
189212 }
190213 catch ( Exception ex )
191214 {
192215 try
193216 {
217+ TelemetryInstance . TrackError ( TelemetryErrorName . Upsert , ex , props ) ;
194218 transaction . Rollback ( ) ;
195219 }
196220 catch ( Exception ex2 )
197221 {
222+ TelemetryInstance . TrackError ( TelemetryErrorName . UpsertRollback , ex2 , props ) ;
198223 string message2 = $ "Encountered exception during upsert and rollback.";
199224 throw new AggregateException ( message2 , new List < Exception > { ex , ex2 } ) ;
200225 }
@@ -418,10 +443,14 @@ WHEN NOT MATCHED THEN
418443 /// <returns>TableInformation object containing primary keys, column types, etc.</returns>
419444 public static async Task < TableInformation > RetrieveTableInformationAsync ( SqlConnection sqlConnection , string fullName , ILogger logger )
420445 {
446+ Dictionary < string , string > sqlConnProps = sqlConnection . AsConnectionProps ( ) ;
447+ TelemetryInstance . TrackEvent ( TelemetryEventName . GetTableInfoStart , sqlConnProps ) ;
421448 var table = new SqlObject ( fullName ) ;
422449
423450 // Get case sensitivity from database collation (default to false if any exception occurs)
424451 bool caseSensitive = false ;
452+ var tableInfoSw = Stopwatch . StartNew ( ) ;
453+ var caseSensitiveSw = Stopwatch . StartNew ( ) ;
425454 try
426455 {
427456 var cmdCollation = new SqlCommand ( GetDatabaseCollationQuery ( sqlConnection ) , sqlConnection ) ;
@@ -430,16 +459,23 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
430459 {
431460 caseSensitive = GetCaseSensitivityFromCollation ( rdr [ Collation ] . ToString ( ) ) ;
432461 }
462+ caseSensitiveSw . Stop ( ) ;
463+ TelemetryInstance . TrackDuration ( TelemetryEventName . GetCaseSensitivity , caseSensitiveSw . ElapsedMilliseconds , sqlConnProps ) ;
433464 }
434465 catch ( Exception ex )
435466 {
467+ // Since this doesn't rethrow make sure we stop here too (don't use finally because we want the execution time to be the same here and in the
468+ // overall event but we also only want to send the GetCaseSensitivity event if it succeeds)
469+ caseSensitiveSw . Stop ( ) ;
470+ TelemetryInstance . TrackError ( TelemetryErrorName . GetCaseSensitivity , ex , sqlConnProps ) ;
436471 logger . LogWarning ( $ "Encountered exception while retrieving database collation: { ex } . Case insensitive behavior will be used by default.") ;
437472 }
438473
439474 StringComparer comparer = caseSensitive ? StringComparer . Ordinal : StringComparer . OrdinalIgnoreCase ;
440475
441476 // Get all column names and types
442477 var columnDefinitionsFromSQL = new Dictionary < string , string > ( comparer ) ;
478+ var columnDefinitionsSw = Stopwatch . StartNew ( ) ;
443479 try
444480 {
445481 var cmdColDef = new SqlCommand ( GetColumnDefinitionsQuery ( table ) , sqlConnection ) ;
@@ -449,9 +485,12 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
449485 string columnName = caseSensitive ? rdr [ ColumnName ] . ToString ( ) : rdr [ ColumnName ] . ToString ( ) . ToLowerInvariant ( ) ;
450486 columnDefinitionsFromSQL . Add ( columnName , rdr [ ColumnDefinition ] . ToString ( ) ) ;
451487 }
488+ columnDefinitionsSw . Stop ( ) ;
489+ TelemetryInstance . TrackDuration ( TelemetryEventName . GetColumnDefinitions , columnDefinitionsSw . ElapsedMilliseconds , sqlConnProps ) ;
452490 }
453491 catch ( Exception ex )
454492 {
493+ TelemetryInstance . TrackError ( TelemetryErrorName . GetColumnDefinitions , ex , sqlConnProps ) ;
455494 // Throw a custom error so that it's easier to decipher.
456495 string message = $ "Encountered exception while retrieving column names and types for table { table } . Cannot generate upsert command without them.";
457496 throw new InvalidOperationException ( message , ex ) ;
@@ -460,11 +499,14 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
460499 if ( columnDefinitionsFromSQL . Count == 0 )
461500 {
462501 string message = $ "Table { table } does not exist.";
463- throw new InvalidOperationException ( message ) ;
502+ var ex = new InvalidOperationException ( message ) ;
503+ TelemetryInstance . TrackError ( TelemetryErrorName . GetColumnDefinitionsTableDoesNotExist , ex , sqlConnProps ) ;
504+ throw ex ;
464505 }
465506
466507 // Query SQL for table Primary Keys
467508 var primaryKeys = new List < PrimaryKey > ( ) ;
509+ var primaryKeysSw = Stopwatch . StartNew ( ) ;
468510 try
469511 {
470512 var cmd = new SqlCommand ( GetPrimaryKeysQuery ( table ) , sqlConnection ) ;
@@ -474,9 +516,12 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
474516 string columnName = caseSensitive ? rdr [ ColumnName ] . ToString ( ) : rdr [ ColumnName ] . ToString ( ) . ToLowerInvariant ( ) ;
475517 primaryKeys . Add ( new PrimaryKey ( columnName , bool . Parse ( rdr [ IsIdentity ] . ToString ( ) ) ) ) ;
476518 }
519+ primaryKeysSw . Stop ( ) ;
520+ TelemetryInstance . TrackDuration ( TelemetryEventName . GetPrimaryKeys , primaryKeysSw . ElapsedMilliseconds , sqlConnProps ) ;
477521 }
478522 catch ( Exception ex )
479523 {
524+ TelemetryInstance . TrackError ( TelemetryErrorName . GetPrimaryKeys , ex , sqlConnProps ) ;
480525 // Throw a custom error so that it's easier to decipher.
481526 string message = $ "Encountered exception while retrieving primary keys for table { table } . Cannot generate upsert command without them.";
482527 throw new InvalidOperationException ( message , ex ) ;
@@ -485,7 +530,9 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
485530 if ( ! primaryKeys . Any ( ) )
486531 {
487532 string message = $ "Did not retrieve any primary keys for { table } . Cannot generate upsert command without them.";
488- throw new InvalidOperationException ( message ) ;
533+ var ex = new InvalidOperationException ( message ) ;
534+ TelemetryInstance . TrackError ( TelemetryErrorName . NoPrimaryKeys , ex , sqlConnProps ) ;
535+ throw ex ;
489536 }
490537
491538 // Match SQL Primary Key column names to POCO field/property objects. Ensure none are missing.
@@ -500,12 +547,26 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
500547 if ( ! hasIdentityColumnPrimaryKeys && missingPrimaryKeysFromPOCO . Any ( ) )
501548 {
502549 string message = $ "All primary keys for SQL table { table } need to be found in '{ typeof ( T ) } .' Missing primary keys: [{ string . Join ( "," , missingPrimaryKeysFromPOCO ) } ]";
503- throw new InvalidOperationException ( message ) ;
550+ var ex = new InvalidOperationException ( message ) ;
551+ TelemetryInstance . TrackError ( TelemetryErrorName . MissingPrimaryKeys , ex , sqlConnProps ) ;
552+ throw ex ;
504553 }
505554
506555 // If any identity columns aren't included in the object then we have to generate a basic insert since the merge statement expects all primary key
507556 // columns to exist. (the merge statement can handle nullable columns though if those exist)
508- string query = hasIdentityColumnPrimaryKeys && missingPrimaryKeysFromPOCO . Any ( ) ? GetInsertQuery ( table ) : GetMergeQuery ( primaryKeys , table , comparison ) ;
557+ bool usingInsertQuery = hasIdentityColumnPrimaryKeys && missingPrimaryKeysFromPOCO . Any ( ) ;
558+ string query = usingInsertQuery ? GetInsertQuery ( table ) : GetMergeQuery ( primaryKeys , table , comparison ) ;
559+
560+ tableInfoSw . Stop ( ) ;
561+ var durations = new Dictionary < string , double > ( )
562+ {
563+ { TelemetryMeasureName . GetCaseSensitivityDurationMs . ToString ( ) , caseSensitiveSw . ElapsedMilliseconds } ,
564+ { TelemetryMeasureName . GetColumnDefinitionsDurationMs . ToString ( ) , columnDefinitionsSw . ElapsedMilliseconds } ,
565+ { TelemetryMeasureName . GetPrimaryKeysDurationMs . ToString ( ) , primaryKeysSw . ElapsedMilliseconds }
566+ } ;
567+ sqlConnProps . Add ( TelemetryPropertyName . QueryType . ToString ( ) , usingInsertQuery ? "insert" : "merge" ) ;
568+ sqlConnProps . Add ( TelemetryPropertyName . HasIdentityColumn . ToString ( ) , hasIdentityColumnPrimaryKeys . ToString ( ) ) ;
569+ TelemetryInstance . TrackDuration ( TelemetryEventName . GetTableInfoEnd , tableInfoSw . ElapsedMilliseconds , sqlConnProps , durations ) ;
509570 return new TableInformation ( primaryKeyFields , columnDefinitionsFromSQL , comparer , query ) ;
510571 }
511572 }
0 commit comments