@@ -23,8 +23,13 @@ public class RequestPipeline : IRequestPipeline
2323 private const string ExpectedTagLine = "You Know, for Search" ;
2424 private const string NoNodesAttemptedMessage = "No nodes were attempted, this can happen when a node predicate does not match any nodes" ;
2525
26+ public const string ProductCheckTransientErrorWarning =
27+ "The client is unable to verify that the server is Elasticsearch due to an unsuccessful product check call. "
28+ + "Some functionality may not be compatible if the server is running an unsupported product." ;
29+
2630 public const string UndeterminedProductWarning =
27- "The client is unable to verify that the server is Elasticsearch due security privileges on the server side." ;
31+ "The client is unable to verify that the server is Elasticsearch due to security privileges on the server side. "
32+ + "Some functionality may not be compatible if the server is running an unsupported product." ;
2833
2934 private static readonly Version MinVersion = new ( 6 , 0 , 0 ) ;
3035 private static readonly Version Version7 = new ( 7 , 0 , 0 ) ;
@@ -255,7 +260,7 @@ public ElasticsearchClientException CreateClientException<TResponse>(
255260 public void FirstPoolUsage ( SemaphoreSlim semaphore )
256261 {
257262 // If sniffing has completed and the product check has run, we are done!
258- if ( ! FirstPoolUsageNeedsSniffing && _connectionPool . ProductCheckStatus != ProductCheckStatus . NotChecked )
263+ if ( ! FirstPoolUsageNeedsSniffing && ! RequiresProductCheck ( _connectionPool . ProductCheckStatus ) )
259264 return ;
260265
261266 if ( ! semaphore . Wait ( _settings . RequestTimeout . Add ( _settings . RequestTimeout ) ) ) // Double the timeout to allow for product check delays
@@ -270,7 +275,7 @@ public void FirstPoolUsage(SemaphoreSlim semaphore)
270275
271276 try
272277 {
273- if ( _connectionPool . ProductCheckStatus == ProductCheckStatus . NotChecked )
278+ if ( RequiresProductCheck ( _connectionPool . ProductCheckStatus ) )
274279 using ( Audit ( ProductCheckOnStartup ) )
275280 {
276281 var nodes = _connectionPool . Nodes . ToArray ( ) ; // Isolated copy of nodes for the product check
@@ -282,8 +287,9 @@ public void FirstPoolUsage(SemaphoreSlim semaphore)
282287 }
283288 else
284289 // We determine the product from the first node which successfully responds.
290+ // If a node fails, we retry other available nodes until the request timeout is reached.
285291 for ( var i = 0 ;
286- i < nodes . Length && _connectionPool . ProductCheckStatus == ProductCheckStatus . NotChecked && ! IsTakingTooLong ;
292+ i < nodes . Length && RequiresProductCheck ( _connectionPool . ProductCheckStatus ) && ! IsTakingTooLong ;
287293 i ++ )
288294 ProductCheck ( nodes [ i ] ) ;
289295
@@ -311,7 +317,7 @@ public void FirstPoolUsage(SemaphoreSlim semaphore)
311317 public async Task FirstPoolUsageAsync ( SemaphoreSlim semaphore , CancellationToken cancellationToken )
312318 {
313319 // If sniffing has completed and the product check has run, we are done!
314- if ( ! FirstPoolUsageNeedsSniffing && _connectionPool . ProductCheckStatus != ProductCheckStatus . NotChecked )
320+ if ( ! FirstPoolUsageNeedsSniffing && ! RequiresProductCheck ( _connectionPool . ProductCheckStatus ) )
315321 return ;
316322
317323 // TODO cancellationToken could throw here and will bubble out as OperationCancelledException
@@ -330,7 +336,7 @@ public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken
330336
331337 try
332338 {
333- if ( _connectionPool . ProductCheckStatus == ProductCheckStatus . NotChecked )
339+ if ( RequiresProductCheck ( _connectionPool . ProductCheckStatus ) )
334340 using ( Audit ( ProductCheckOnStartup ) )
335341 {
336342 var nodes = _connectionPool . Nodes . ToArray ( ) ; // Isolated copy of nodes for the product check
@@ -342,8 +348,9 @@ public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken
342348 }
343349 else
344350 // We determine the product from the first node which successfully responds.
351+ // If a node fails, we retry other available nodes until the request timeout is reached.
345352 for ( var i = 0 ;
346- i < nodes . Length && _connectionPool . ProductCheckStatus == ProductCheckStatus . NotChecked && ! IsTakingTooLong ;
353+ i < nodes . Length && RequiresProductCheck ( _connectionPool . ProductCheckStatus ) && ! IsTakingTooLong ;
347354 i ++ )
348355 await ProductCheckAsync ( nodes [ i ] , cancellationToken ) . ConfigureAwait ( false ) ;
349356
@@ -595,21 +602,37 @@ public void ThrowNoNodesAttempted(RequestData requestData, List<PipelineExceptio
595602 throw new UnexpectedElasticsearchClientException ( clientException , seenExceptions ) { Request = requestData , AuditTrail = AuditTrail } ;
596603 }
597604
605+ private static bool RequiresProductCheck ( ProductCheckStatus status ) =>
606+ status is ProductCheckStatus . NotChecked or ProductCheckStatus . TransientFailure ;
607+
598608 private TResponse PostCallElasticsearch < TResponse > ( RequestData requestData , TResponse response ,
599609 Diagnostic < RequestData , IApiCallDetails > diagnostic , Auditable audit
600610 )
601611 where TResponse : class , IElasticsearchResponse , new ( )
602612 {
603- // Add additional warning to debug information if the product could not be determined and may not be Elasticsearch
604- if ( _connectionPool . ProductCheckStatus == ProductCheckStatus . UndeterminedProduct && response . ApiCall is ApiCallDetails callDetails )
605- {
606- Debug . WriteLine ( UndeterminedProductWarning ) ;
607- callDetails . BuildDebugInformationPrefix = sb =>
613+ if ( response . ApiCall is ApiCallDetails callDetails )
614+ switch ( _connectionPool . ProductCheckStatus )
608615 {
609- sb . AppendLine ( "# Warnings:" ) ;
610- sb . AppendLine ( $ "- { UndeterminedProductWarning } ") ;
611- } ;
612- }
616+ // Add additional warning to debug information if the product could not be determined and may not be Elasticsearch
617+ case ProductCheckStatus . UndeterminedProduct :
618+ Debug . WriteLine ( UndeterminedProductWarning ) ;
619+ callDetails . BuildDebugInformationPrefix = sb =>
620+ {
621+ sb . AppendLine ( "# Warnings:" ) ;
622+ sb . AppendLine ( $ "- { UndeterminedProductWarning } ") ;
623+ } ;
624+ break ;
625+
626+ // Add additional warning to debug information if the product could not be determined due to a transient error.
627+ case ProductCheckStatus . TransientFailure :
628+ Debug . WriteLine ( ProductCheckTransientErrorWarning ) ;
629+ callDetails . BuildDebugInformationPrefix = sb =>
630+ {
631+ sb . AppendLine ( "# Warnings:" ) ;
632+ sb . AppendLine ( $ "- { ProductCheckTransientErrorWarning } ") ;
633+ } ;
634+ break ;
635+ }
613636
614637 diagnostic . EndState = response . ApiCall ;
615638 response . ApiCall . AuditTrail = AuditTrail ;
@@ -692,10 +715,14 @@ private bool ApplyProductCheckRules(RootResponse response)
692715 return true ;
693716 }
694717
695- // Any response besides a 200, 401 or 403 is considered a failure and the check is considered incomplete remaining in the NotChecked state .
718+ // Any response besides a 200, 401 or 403 is considered a failure and the check is considered incomplete and marked as a likely transient failure .
696719 // This means that the check will run on subsequent requests until we have a valid response to evaluate.
697720 // By returning false, the failure to perform the product check will be included in the audit log.
698- if ( ! response . Success ) return false ;
721+ if ( ! response . Success )
722+ {
723+ _connectionPool . ProductCheckStatus = ProductCheckStatus . TransientFailure ;
724+ return false ;
725+ }
699726
700727 // Start by assuming the product is valid
701728 _connectionPool . ProductCheckStatus = ProductCheckStatus . ValidProduct ;
0 commit comments