@@ -147,90 +147,92 @@ public async Task FlushAsync(CancellationToken cancellationToken = default)
147147 /// <param name="configuration"> Used to build up the connection </param>
148148 private async Task UpsertRowsAsync ( IEnumerable < T > rows , SqlAttribute attribute , IConfiguration configuration )
149149 {
150- using SqlConnection connection = SqlBindingUtilities . BuildConnection ( attribute . ConnectionStringSetting , configuration ) ;
151- await connection . OpenAsync ( ) ;
152- Dictionary < string , string > props = connection . AsConnectionProps ( ) ;
150+ using ( SqlConnection connection = SqlBindingUtilities . BuildConnection ( attribute . ConnectionStringSetting , configuration ) )
151+ {
152+ await connection . OpenAsync ( ) ;
153+ Dictionary < string , string > props = connection . AsConnectionProps ( ) ;
153154
154- string fullTableName = attribute . CommandText ;
155+ string fullTableName = attribute . CommandText ;
155156
156- // Include the connection string hash as part of the key in case this customer has the same table in two different Sql Servers
157- string cacheKey = $ "{ connection . ConnectionString . GetHashCode ( ) } -{ fullTableName } ";
157+ // Include the connection string hash as part of the key in case this customer has the same table in two different Sql Servers
158+ string cacheKey = $ "{ connection . ConnectionString . GetHashCode ( ) } -{ fullTableName } ";
158159
159- ObjectCache cachedTables = MemoryCache . Default ;
160- var tableInfo = cachedTables [ cacheKey ] as TableInformation ;
160+ ObjectCache cachedTables = MemoryCache . Default ;
161+ var tableInfo = cachedTables [ cacheKey ] as TableInformation ;
161162
162- if ( tableInfo == null )
163- {
164- TelemetryInstance . TrackEvent ( TelemetryEventName . TableInfoCacheMiss , props ) ;
165- // set the columnNames for supporting T as JObject since it doesn't have columns in the memeber info.
166- tableInfo = await TableInformation . RetrieveTableInformationAsync ( connection , fullTableName , this . _logger , GetColumnNamesFromItem ( rows . First ( ) ) ) ;
167- var policy = new CacheItemPolicy
163+ if ( tableInfo == null )
168164 {
169- // Re-look up the primary key(s) after 10 minutes (they should not change very often!)
170- AbsoluteExpiration = DateTimeOffset . Now . AddMinutes ( 10 )
171- } ;
172-
173- this . _logger . LogInformation ( $ "DB and Table: { connection . Database } .{ fullTableName } . Primary keys: [{ string . Join ( "," , tableInfo . PrimaryKeys . Select ( pk => pk . Name ) ) } ]. SQL Column and Definitions: [{ string . Join ( "," , tableInfo . ColumnDefinitions ) } ]") ;
174- cachedTables . Set ( cacheKey , tableInfo , policy ) ;
175- }
176- else
177- {
178- TelemetryInstance . TrackEvent ( TelemetryEventName . TableInfoCacheHit , props ) ;
179- }
165+ TelemetryInstance . TrackEvent ( TelemetryEventName . TableInfoCacheMiss , props ) ;
166+ // set the columnNames for supporting T as JObject since it doesn't have columns in the memeber info.
167+ tableInfo = await TableInformation . RetrieveTableInformationAsync ( connection , fullTableName , this . _logger , GetColumnNamesFromItem ( rows . First ( ) ) ) ;
168+ var policy = new CacheItemPolicy
169+ {
170+ // Re-look up the primary key(s) after 10 minutes (they should not change very often!)
171+ AbsoluteExpiration = DateTimeOffset . Now . AddMinutes ( 10 )
172+ } ;
180173
181- IEnumerable < string > extraProperties = GetExtraProperties ( tableInfo . Columns , rows . First ( ) ) ;
182- if ( extraProperties . Any ( ) )
183- {
184- string message = $ "The following properties in { typeof ( T ) } do not exist in the table { fullTableName } : { string . Join ( ", " , extraProperties . ToArray ( ) ) } .";
185- var ex = new InvalidOperationException ( message ) ;
186- TelemetryInstance . TrackException ( TelemetryErrorName . PropsNotExistOnTable , ex , props ) ;
187- throw ex ;
188- }
174+ this . _logger . LogInformation ( $ "DB and Table: { connection . Database } .{ fullTableName } . Primary keys: [{ string . Join ( "," , tableInfo . PrimaryKeys . Select ( pk => pk . Name ) ) } ]. SQL Column and Definitions: [{ string . Join ( "," , tableInfo . ColumnDefinitions ) } ]") ;
175+ cachedTables . Set ( cacheKey , tableInfo , policy ) ;
176+ }
177+ else
178+ {
179+ TelemetryInstance . TrackEvent ( TelemetryEventName . TableInfoCacheHit , props ) ;
180+ }
189181
190- TelemetryInstance . TrackEvent ( TelemetryEventName . UpsertStart , props ) ;
191- var transactionSw = Stopwatch . StartNew ( ) ;
192- int batchSize = 1000 ;
193- SqlTransaction transaction = connection . BeginTransaction ( ) ;
194- try
195- {
196- SqlCommand command = connection . CreateCommand ( ) ;
197- command . Connection = connection ;
198- command . Transaction = transaction ;
199- SqlParameter par = command . Parameters . Add ( RowDataParameter , SqlDbType . NVarChar , - 1 ) ;
200- int batchCount = 0 ;
201- var commandSw = Stopwatch . StartNew ( ) ;
202- foreach ( IEnumerable < T > batch in rows . Batch ( batchSize ) )
182+ IEnumerable < string > extraProperties = GetExtraProperties ( tableInfo . Columns , rows . First ( ) ) ;
183+ if ( extraProperties . Any ( ) )
203184 {
204- batchCount ++ ;
205- GenerateDataQueryForMerge ( tableInfo , batch , out string newDataQuery , out string rowData ) ;
206- command . CommandText = $ "{ newDataQuery } { tableInfo . Query } ;";
207- par . Value = rowData ;
208- await command . ExecuteNonQueryAsync ( ) ;
185+ string message = $ "The following properties in { typeof ( T ) } do not exist in the table { fullTableName } : { string . Join ( ", " , extraProperties . ToArray ( ) ) } .";
186+ var ex = new InvalidOperationException ( message ) ;
187+ TelemetryInstance . TrackException ( TelemetryErrorName . PropsNotExistOnTable , ex , props ) ;
188+ throw ex ;
209189 }
210- transaction . Commit ( ) ;
211- var measures = new Dictionary < string , double > ( )
190+
191+ TelemetryInstance . TrackEvent ( TelemetryEventName . UpsertStart , props ) ;
192+ var transactionSw = Stopwatch . StartNew ( ) ;
193+ int batchSize = 1000 ;
194+ SqlTransaction transaction = connection . BeginTransaction ( ) ;
195+ try
196+ {
197+ SqlCommand command = connection . CreateCommand ( ) ;
198+ command . Connection = connection ;
199+ command . Transaction = transaction ;
200+ SqlParameter par = command . Parameters . Add ( RowDataParameter , SqlDbType . NVarChar , - 1 ) ;
201+ int batchCount = 0 ;
202+ var commandSw = Stopwatch . StartNew ( ) ;
203+ foreach ( IEnumerable < T > batch in rows . Batch ( batchSize ) )
204+ {
205+ batchCount ++ ;
206+ GenerateDataQueryForMerge ( tableInfo , batch , out string newDataQuery , out string rowData ) ;
207+ command . CommandText = $ "{ newDataQuery } { tableInfo . Query } ;";
208+ par . Value = rowData ;
209+ await command . ExecuteNonQueryAsync ( ) ;
210+ }
211+ transaction . Commit ( ) ;
212+ var measures = new Dictionary < string , double > ( )
212213 {
213214 { TelemetryMeasureName . BatchCount . ToString ( ) , batchCount } ,
214215 { TelemetryMeasureName . TransactionDurationMs . ToString ( ) , transactionSw . ElapsedMilliseconds } ,
215216 { TelemetryMeasureName . CommandDurationMs . ToString ( ) , commandSw . ElapsedMilliseconds }
216217 } ;
217- TelemetryInstance . TrackEvent ( TelemetryEventName . UpsertEnd , props , measures ) ;
218- this . _logger . LogInformation ( $ "Upserted { rows . Count ( ) } row(s) into database: { connection . Database } and table: { fullTableName } .") ;
219- }
220- catch ( Exception ex )
221- {
222- try
223- {
224- TelemetryInstance . TrackException ( TelemetryErrorName . Upsert , ex , props ) ;
225- transaction . Rollback ( ) ;
218+ TelemetryInstance . TrackEvent ( TelemetryEventName . UpsertEnd , props , measures ) ;
219+ this . _logger . LogInformation ( $ "Upserted { rows . Count ( ) } row(s) into database: { connection . Database } and table: { fullTableName } .") ;
226220 }
227- catch ( Exception ex2 )
221+ catch ( Exception ex )
228222 {
229- TelemetryInstance . TrackException ( TelemetryErrorName . UpsertRollback , ex2 , props ) ;
230- string message2 = $ "Encountered exception during upsert and rollback.";
231- throw new AggregateException ( message2 , new List < Exception > { ex , ex2 } ) ;
223+ try
224+ {
225+ TelemetryInstance . TrackException ( TelemetryErrorName . Upsert , ex , props ) ;
226+ transaction . Rollback ( ) ;
227+ }
228+ catch ( Exception ex2 )
229+ {
230+ TelemetryInstance . TrackException ( TelemetryErrorName . UpsertRollback , ex2 , props ) ;
231+ string message2 = $ "Encountered exception during upsert and rollback.";
232+ throw new AggregateException ( message2 , new List < Exception > { ex , ex2 } ) ;
233+ }
234+ throw ;
232235 }
233- throw ;
234236 }
235237 }
236238
@@ -376,7 +378,7 @@ public static bool GetCaseSensitivityFromCollation(string collation)
376378 public static string GetDatabaseCollationQuery ( SqlConnection sqlConnection )
377379 {
378380 return $@ "
379- SELECT
381+ SELECT
380382 DATABASEPROPERTYEX('{ sqlConnection . Database } ', '{ Collation } ') AS { Collation } ;" ;
381383 }
382384
@@ -454,7 +456,7 @@ public static string GetMergeQuery(IList<PrimaryKey> primaryKeys, SqlObject tabl
454456 }
455457
456458 string columnMatchingQuery = columnMatchingQueryBuilder . ToString ( ) . TrimEnd ( ',' ) ;
457- return @$ "
459+ return $@ "
458460 MERGE INTO { table . BracketQuotedFullName } WITH (HOLDLOCK)
459461 AS ExistingData
460462 USING { CteName }
@@ -490,17 +492,19 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
490492 try
491493 {
492494 var cmdCollation = new SqlCommand ( GetDatabaseCollationQuery ( sqlConnection ) , sqlConnection ) ;
493- using SqlDataReader rdr = await cmdCollation . ExecuteReaderAsync ( ) ;
494- while ( await rdr . ReadAsync ( ) )
495+ using ( SqlDataReader rdr = await cmdCollation . ExecuteReaderAsync ( ) )
495496 {
496- caseSensitive = GetCaseSensitivityFromCollation ( rdr [ Collation ] . ToString ( ) ) ;
497+ while ( await rdr . ReadAsync ( ) )
498+ {
499+ caseSensitive = GetCaseSensitivityFromCollation ( rdr [ Collation ] . ToString ( ) ) ;
500+ }
501+ caseSensitiveSw . Stop ( ) ;
502+ TelemetryInstance . TrackDuration ( TelemetryEventName . GetCaseSensitivity , caseSensitiveSw . ElapsedMilliseconds , sqlConnProps ) ;
497503 }
498- caseSensitiveSw . Stop ( ) ;
499- TelemetryInstance . TrackDuration ( TelemetryEventName . GetCaseSensitivity , caseSensitiveSw . ElapsedMilliseconds , sqlConnProps ) ;
500504 }
501505 catch ( Exception ex )
502506 {
503- // Since this doesn't rethrow make sure we stop here too (don't use finally because we want the execution time to be the same here and in the
507+ // Since this doesn't rethrow make sure we stop here too (don't use finally because we want the execution time to be the same here and in the
504508 // overall event but we also only want to send the GetCaseSensitivity event if it succeeds)
505509 caseSensitiveSw . Stop ( ) ;
506510 TelemetryInstance . TrackException ( TelemetryErrorName . GetCaseSensitivity , ex , sqlConnProps ) ;
@@ -515,14 +519,17 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
515519 try
516520 {
517521 var cmdColDef = new SqlCommand ( GetColumnDefinitionsQuery ( table ) , sqlConnection ) ;
518- using SqlDataReader rdr = await cmdColDef . ExecuteReaderAsync ( ) ;
519- while ( await rdr . ReadAsync ( ) )
522+ using ( SqlDataReader rdr = await cmdColDef . ExecuteReaderAsync ( ) )
520523 {
521- string columnName = caseSensitive ? rdr [ ColumnName ] . ToString ( ) : rdr [ ColumnName ] . ToString ( ) . ToLowerInvariant ( ) ;
522- columnDefinitionsFromSQL . Add ( columnName , rdr [ ColumnDefinition ] . ToString ( ) ) ;
524+ while ( await rdr . ReadAsync ( ) )
525+ {
526+ string columnName = caseSensitive ? rdr [ ColumnName ] . ToString ( ) : rdr [ ColumnName ] . ToString ( ) . ToLowerInvariant ( ) ;
527+ columnDefinitionsFromSQL . Add ( columnName , rdr [ ColumnDefinition ] . ToString ( ) ) ;
528+ }
529+ columnDefinitionsSw . Stop ( ) ;
530+ TelemetryInstance . TrackDuration ( TelemetryEventName . GetColumnDefinitions , columnDefinitionsSw . ElapsedMilliseconds , sqlConnProps ) ;
523531 }
524- columnDefinitionsSw . Stop ( ) ;
525- TelemetryInstance . TrackDuration ( TelemetryEventName . GetColumnDefinitions , columnDefinitionsSw . ElapsedMilliseconds , sqlConnProps ) ;
532+
526533 }
527534 catch ( Exception ex )
528535 {
@@ -546,14 +553,16 @@ public static async Task<TableInformation> RetrieveTableInformationAsync(SqlConn
546553 try
547554 {
548555 var cmd = new SqlCommand ( GetPrimaryKeysQuery ( table ) , sqlConnection ) ;
549- using SqlDataReader rdr = await cmd . ExecuteReaderAsync ( ) ;
550- while ( await rdr . ReadAsync ( ) )
556+ using ( SqlDataReader rdr = await cmd . ExecuteReaderAsync ( ) )
551557 {
552- string columnName = caseSensitive ? rdr [ ColumnName ] . ToString ( ) : rdr [ ColumnName ] . ToString ( ) . ToLowerInvariant ( ) ;
553- primaryKeys . Add ( new PrimaryKey ( columnName , bool . Parse ( rdr [ IsIdentity ] . ToString ( ) ) ) ) ;
558+ while ( await rdr . ReadAsync ( ) )
559+ {
560+ string columnName = caseSensitive ? rdr [ ColumnName ] . ToString ( ) : rdr [ ColumnName ] . ToString ( ) . ToLowerInvariant ( ) ;
561+ primaryKeys . Add ( new PrimaryKey ( columnName , bool . Parse ( rdr [ IsIdentity ] . ToString ( ) ) ) ) ;
562+ }
563+ primaryKeysSw . Stop ( ) ;
564+ TelemetryInstance . TrackDuration ( TelemetryEventName . GetPrimaryKeys , primaryKeysSw . ElapsedMilliseconds , sqlConnProps ) ;
554565 }
555- primaryKeysSw . Stop ( ) ;
556- TelemetryInstance . TrackDuration ( TelemetryEventName . GetPrimaryKeys , primaryKeysSw . ElapsedMilliseconds , sqlConnProps ) ;
557566 }
558567 catch ( Exception ex )
559568 {
0 commit comments