Skip to content

Commit e3784cc

Browse files
committed
Improved exception messages to include ping/sniff exceptions more clearly as suppose to inner maxretry exceptions. Added integration tests for the exception (which behave different in practice then theory (unit tests
1 parent 1851b1b commit e3784cc

File tree

15 files changed

+425
-76
lines changed

15 files changed

+425
-76
lines changed

src/Elasticsearch.Net/Connection/RequestState/ITransportRequestState.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ internal interface ITransportRequestState
1515
int? Seed { get; set; }
1616
Uri CurrentNode { get; set; }
1717
List<RequestMetrics> RequestMetrics { get; set; }
18+
List<Exception> SeenExceptions { get; }
1819
Func<IElasticsearchResponse, Stream, object> ResponseCreationOverride { get; set; }
1920
}
2021
}

src/Elasticsearch.Net/Connection/RequestState/TransportRequestState.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public IRequestConfiguration RequestConfiguration
4646
public int Sniffs { get; set; }
4747

4848
public List<Uri> SeenNodes { get; private set; }
49+
public List<Exception> SeenExceptions { get; private set; }
4950
public List<RequestMetrics> RequestMetrics { get; set; }
5051

5152
public Uri CurrentNode
@@ -76,6 +77,7 @@ public TransportRequestState(
7677
{
7778
this.StartedOn = DateTime.UtcNow;
7879
this.SeenNodes = new List<Uri>();
80+
this.SeenExceptions = new List<Exception>();
7981
this.ClientSettings = settings;
8082
this.RequestParameters = requestParameters;
8183
this._traceEnabled = settings.TraceEnabled;
@@ -125,6 +127,12 @@ public Uri CreatePathOnCurrentNode(string path = null)
125127

126128
public void SetResult(ElasticsearchResponse<T> result)
127129
{
130+
if (result == null)
131+
{
132+
if (!_traceEnabled) return;
133+
this._stopwatch.Stop();
134+
return;
135+
}
128136
result.NumberOfRetries = this.Retried;
129137
if (this.ClientSettings.MetricsEnabled)
130138
result.Metrics = new CallMetrics

src/Elasticsearch.Net/Connection/Transport.cs

Lines changed: 109 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,28 @@ private bool Ping(ITransportRequestState requestState)
6868
ConnectTimeout = pingTimeout,
6969
RequestTimeout = pingTimeout
7070
};
71+
try
72+
{
73+
ElasticsearchResponse<Stream> response;
74+
using (var rq = requestState.InitiateRequest(RequestType.Ping))
75+
{
76+
response = this.Connection.HeadSync(requestState.CreatePathOnCurrentNode(""), requestOverrides);
77+
rq.Finish(response.Success, response.HttpStatusCode);
78+
}
79+
if (!response.HttpStatusCode.HasValue || response.HttpStatusCode.Value == -1)
80+
throw new Exception("ping returned no status code");
81+
if (response.Response == null)
82+
return response.Success;
83+
7184

72-
ElasticsearchResponse<Stream> response;
73-
using (var rq = requestState.InitiateRequest(RequestType.Ping))
85+
using (response.Response)
86+
return response.Success;
87+
88+
}
89+
catch (Exception e)
7490
{
75-
response = this.Connection.HeadSync(requestState.CreatePathOnCurrentNode(""), requestOverrides);
76-
rq.Finish(response.Success, response.HttpStatusCode);
91+
throw new PingException(requestState.CurrentNode, e);
7792
}
78-
if (response.Response == null)
79-
return response.Success;
80-
using (response.Response)
81-
return response.Success;
8293
}
8394

8495
private Task<bool> PingAsync(ITransportRequestState requestState)
@@ -95,17 +106,27 @@ private Task<bool> PingAsync(ITransportRequestState requestState)
95106
return this.Connection.Head(requestState.CreatePathOnCurrentNode(""), requestOverrides)
96107
.ContinueWith(t =>
97108
{
109+
if (t.IsFaulted)
110+
{
111+
rq.Finish(false, null);
112+
rq.Dispose();
113+
throw new PingException(requestState.CurrentNode, t.Exception);
114+
}
98115
rq.Finish(t.Result.Success, t.Result.HttpStatusCode);
99116
rq.Dispose();
100117
var response = t.Result;
118+
if (!response.HttpStatusCode.HasValue || response.HttpStatusCode.Value == -1)
119+
throw new PingException(requestState.CurrentNode, t.Exception);
120+
101121
using (response.Response)
102122
return response.Success;
103123
});
104124
}
105125
catch (Exception e)
106126
{
107127
var tcs = new TaskCompletionSource<bool>();
108-
tcs.SetException(e);
128+
var pingException = new PingException(requestState.CurrentNode, e);
129+
tcs.SetException(pingException);
109130
return tcs.Task;
110131
}
111132
}
@@ -121,30 +142,37 @@ private IList<Uri> Sniff(ITransportRequestState ownerState = null)
121142
};
122143

123144
var requestParameters = new RequestParameters { RequestConfiguration = requestOverrides };
124-
125-
var path = "_nodes/_all/clear?timeout=" + pingTimeout;
126-
ElasticsearchResponse<Stream> response;
127-
using (var requestState = new TransportRequestState<Stream>(this.Settings, requestParameters, "GET", path))
145+
try
128146
{
129-
response = this.DoRequest(requestState);
130147

131-
//inform the owing request state of the requests the sniffs did.
132-
if (requestState.RequestMetrics != null && ownerState != null)
148+
var path = "_nodes/_all/clear?timeout=" + pingTimeout;
149+
ElasticsearchResponse<Stream> response;
150+
using (var requestState = new TransportRequestState<Stream>(this.Settings, requestParameters, "GET", path))
133151
{
134-
foreach (var r in requestState.RequestMetrics.Where(p => p.RequestType == RequestType.ElasticsearchCall))
135-
r.RequestType = RequestType.Sniff;
152+
response = this.DoRequest(requestState);
136153

154+
//inform the owing request state of the requests the sniffs did.
155+
if (requestState.RequestMetrics != null && ownerState != null)
156+
{
157+
foreach (var r in requestState.RequestMetrics.Where(p => p.RequestType == RequestType.ElasticsearchCall))
158+
r.RequestType = RequestType.Sniff;
137159

138-
if (ownerState.RequestMetrics == null) ownerState.RequestMetrics = new List<RequestMetrics>();
139-
ownerState.RequestMetrics.AddRange(requestState.RequestMetrics);
140-
}
141-
if (response.Response == null) return null;
142160

143-
using (response.Response)
144-
{
145-
return Sniffer.FromStream(response, response.Response, this.Serializer);
161+
if (ownerState.RequestMetrics == null) ownerState.RequestMetrics = new List<RequestMetrics>();
162+
ownerState.RequestMetrics.AddRange(requestState.RequestMetrics);
163+
}
164+
if (response.Response == null) return null;
165+
166+
using (response.Response)
167+
{
168+
return Sniffer.FromStream(response, response.Response, this.Serializer);
169+
}
146170
}
147171
}
172+
catch (MaxRetryException e)
173+
{
174+
throw new MaxRetryException(new SniffException(e));
175+
}
148176
}
149177

150178
private void SniffClusterState(ITransportRequestState requestState = null)
@@ -322,16 +350,21 @@ private ElasticsearchResponse<T> DoRequest<T>(TransportRequestState<T> requestSt
322350
return typedResponse;
323351
}
324352
}
353+
catch (MaxRetryException)
354+
{
355+
throw;
356+
}
325357
catch (ElasticsearchServerException)
326358
{
327359
throw;
328360
}
329361
catch (Exception e)
330362
{
363+
requestState.SeenExceptions.Add(e);
331364
if (maxRetries == 0 && retried == 0)
332365
throw;
333366
seenError = true;
334-
return RetryRequest<T>(requestState, e);
367+
return RetryRequest<T>(requestState);
335368
}
336369
finally
337370
{
@@ -342,16 +375,15 @@ private ElasticsearchResponse<T> DoRequest<T>(TransportRequestState<T> requestSt
342375
return RetryRequest<T>(requestState);
343376
}
344377

345-
private ElasticsearchResponse<T> RetryRequest<T>(TransportRequestState<T> requestState, Exception e = null)
378+
private ElasticsearchResponse<T> RetryRequest<T>(TransportRequestState<T> requestState)
346379
{
347380
var maxRetries = this.GetMaximumRetries(requestState.RequestConfiguration);
348-
var exceptionMessage = CreateMaxRetryExceptionMessage(requestState, e);
349381

350382
this._connectionPool.MarkDead(requestState.CurrentNode, this.ConfigurationValues.DeadTimeout, this.ConfigurationValues.MaxDeadTimeout);
351383

352384
SniffOnConnectionFailure(requestState);
353385

354-
if (requestState.Retried >= maxRetries) throw new MaxRetryException(exceptionMessage, e);
386+
ThrowMaxRetryExceptionWhenNeeded(requestState, maxRetries);
355387

356388
return this.DoRequest<T>(requestState);
357389
}
@@ -390,16 +422,24 @@ public Task<ElasticsearchResponse<T>> DoRequestAsync<T>(string method, string pa
390422
{
391423
var tcs = new TaskCompletionSource<ElasticsearchResponse<T>>();
392424
if (t.Exception != null)
425+
{
426+
var mr = t.Exception.InnerException as MaxRetryException;
427+
if (mr != null)
428+
throw mr;
429+
393430
tcs.SetException(t.Exception.Flatten());
431+
requestState.SetResult(null);
432+
}
394433
else
395434
{
396435
tcs.SetResult(t.Result);
436+
requestState.SetResult(t.Result);
397437
}
398438

399-
requestState.SetResult(t.Result);
400439

401440
return tcs.Task;
402-
}).Unwrap();
441+
}).Unwrap()
442+
;
403443
}
404444
}
405445

@@ -414,11 +454,12 @@ private Task<ElasticsearchResponse<T>> DoRequestAsync<T>(TransportRequestState<T
414454
.ContinueWith(t =>
415455
{
416456
if (t.IsFaulted)
417-
return this.RetryRequestAsync(requestState, t.Exception);
457+
{
458+
requestState.SeenExceptions.Add(t.Exception.InnerException);
459+
return this.RetryRequestAsync(requestState);
460+
}
418461
if (t.IsCompleted)
419462
{
420-
if (!t.Result)
421-
return this.RetryRequestAsync(requestState, t.Exception);
422463
return this.FinishOrRetryRequestAsync(requestState);
423464
}
424465
return null;
@@ -442,7 +483,8 @@ private Task<ElasticsearchResponse<T>> FinishOrRetryRequestAsync<T>(TransportReq
442483
{
443484
rq.Dispose();
444485
if (maxRetries == 0 && retried == 0) throw t.Exception;
445-
return this.RetryRequestAsync<T>(requestState, t.Exception);
486+
requestState.SeenExceptions.Add(t.Exception);
487+
return this.RetryRequestAsync<T>(requestState);
446488
}
447489

448490
if (t.Result.SuccessOrKnownError
@@ -472,21 +514,32 @@ private Task<ElasticsearchResponse<T>> FinishOrRetryRequestAsync<T>(TransportReq
472514
}).Unwrap();
473515
}
474516

475-
private Task<ElasticsearchResponse<T>> RetryRequestAsync<T>(TransportRequestState<T> requestState, Exception e = null)
517+
private Task<ElasticsearchResponse<T>> RetryRequestAsync<T>(TransportRequestState<T> requestState)
476518
{
477519
var maxRetries = this.GetMaximumRetries(requestState.RequestConfiguration);
478-
var exceptionMessage = CreateMaxRetryExceptionMessage(requestState, e);
479520

480521
this._connectionPool.MarkDead(requestState.CurrentNode, this.ConfigurationValues.DeadTimeout, this.ConfigurationValues.MaxDeadTimeout);
481522

482523
this.SniffOnConnectionFailure(requestState);
483524

484-
if (requestState.Retried >= maxRetries)
485-
throw new MaxRetryException(exceptionMessage, e);
525+
ThrowMaxRetryExceptionWhenNeeded(requestState, maxRetries);
486526

487527
return this.DoRequestAsync<T>(requestState);
488528
}
489529

530+
private static void ThrowMaxRetryExceptionWhenNeeded<T>(TransportRequestState<T> requestState, int maxRetries)
531+
{
532+
if (requestState.Retried < maxRetries) return;
533+
var innerExceptions = requestState.SeenExceptions.Where(e => e != null).ToList();
534+
var innerException = !innerExceptions.HasAny()
535+
? null
536+
: (innerExceptions.Count() == 1)
537+
? innerExceptions.First()
538+
: new AggregateException(requestState.SeenExceptions);
539+
var exceptionMessage = CreateMaxRetryExceptionMessage(requestState, innerException);
540+
throw new MaxRetryException(exceptionMessage, innerException);
541+
}
542+
490543
private static string CreateMaxRetryExceptionMessage<T>(TransportRequestState<T> requestState, Exception e)
491544
{
492545
string innerException = null;
@@ -498,12 +551,12 @@ private static string CreateMaxRetryExceptionMessage<T>(TransportRequestState<T>
498551

499552
aggregate = aggregate.Flatten();
500553
var innerExceptions = aggregate.InnerExceptions
501-
.Select(ae => MaxRetryInnerMessage.F(ae.GetType().Name, ae.Message, ae.StackTrace))
554+
.Select(ae => MaxRetryInnerMessage.F(ae.GetType().Name, ae.Message, "" ?? ae.StackTrace))
502555
.ToList();
503-
innerException = string.Join("\r\n", innerExceptions);
556+
innerException = "\r\n" + string.Join("\r\n", innerExceptions);
504557
}
505558
else
506-
innerException = MaxRetryInnerMessage.F(e.GetType().Name, e.Message, e.StackTrace);
559+
innerException = "\r\n" + MaxRetryInnerMessage.F(e.GetType().Name, e.Message, "" ?? e.StackTrace);
507560
}
508561
var exceptionMessage = MaxRetryExceptionMessage
509562
.F(requestState.Method, requestState.Path, requestState.Retried, innerException);
@@ -576,6 +629,10 @@ private ElasticsearchServerError ThrowOrGetErrorFromStreamResponse<T>(
576629
if (IsValidResponse(requestState, streamResponse))
577630
return null;
578631

632+
if (((streamResponse.HttpStatusCode.HasValue && streamResponse.HttpStatusCode.Value <= 0)
633+
|| !streamResponse.HttpStatusCode.HasValue) && streamResponse.OriginalException != null)
634+
throw streamResponse.OriginalException;
635+
579636
ElasticsearchServerError error = null;
580637

581638
var type = typeof(T);
@@ -598,7 +655,10 @@ private ElasticsearchServerError ThrowOrGetErrorFromStreamResponse<T>(
598655
var e = this.Serializer.Deserialize<OneToOneServerException>(ms);
599656
error = ElasticsearchServerError.Create(e);
600657
}
601-
catch { }
658+
catch (Exception e)
659+
{
660+
var raw = ms.ToArray().Utf8String();
661+
}
602662
ms.Position = 0;
603663
streamResponse.Response = ms;
604664
}
@@ -614,9 +674,12 @@ private ElasticsearchServerError ThrowOrGetErrorFromStreamResponse<T>(
614674

615675
private static bool IsValidResponse(ITransportRequestState requestState, IElasticsearchResponse streamResponse)
616676
{
617-
return (streamResponse.Success || requestState.RequestConfiguration != null) &&
618-
(streamResponse.Success || requestState.RequestConfiguration == null ||
619-
requestState.RequestConfiguration.AllowedStatusCodes.Any(i => i == streamResponse.HttpStatusCode));
677+
return streamResponse.Success ||
678+
(!streamResponse.Success
679+
&& requestState.RequestConfiguration != null
680+
&& requestState.RequestConfiguration.AllowedStatusCodes.HasAny(i => i == streamResponse.HttpStatusCode)
681+
);
682+
620683
}
621684

622685
private bool TypeOfResponseCopiesDirectly<T>()

src/Elasticsearch.Net/ConnectionPool/StaticConnectionPool.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public class StaticConnectionPool : IConnectionPool
1414
protected IDictionary<Uri, EndpointState> UriLookup;
1515
protected IList<Uri> NodeUris;
1616
protected int Current = -1;
17+
private Random _random;
1718

1819
public int MaxRetries { get { return NodeUris.Count - 1; } }
1920

@@ -24,6 +25,7 @@ public StaticConnectionPool(
2425
bool randomizeOnStartup = true,
2526
IDateTimeProvider dateTimeProvider = null)
2627
{
28+
_random = new Random(1337);
2729
_dateTimeProvider = dateTimeProvider ?? new DateTimeProvider();
2830
var rnd = new Random();
2931
uris.ThrowIfEmpty("uris");
@@ -70,7 +72,7 @@ public virtual Uri GetNext(int? initialSeed, out int seed, out bool shouldPingHi
7072
} while (attempts < count);
7173

7274
//could not find a suitable node retrying on node that has been dead longest.
73-
return this.NodeUris[i];
75+
return this.NodeUris[_random.Next(0, count)];
7476
}
7577

7678
public virtual void MarkDead(Uri uri, int? deadTimeout, int? maxDeadTimeout)

0 commit comments

Comments
 (0)