Skip to content

Commit 7357c0d

Browse files
committed
When maxretries is not configured and we are sniffing on connectionfaults take into account the newly returned maxretries from the connection pool before assuming we may shortcircuit to return the result because maxretry is 0
1 parent 6c54d0a commit 7357c0d

File tree

6 files changed

+152
-13
lines changed

6 files changed

+152
-13
lines changed

src/Elasticsearch.Net/Connection/Transport.cs

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,10 @@ private ElasticsearchResponse<T> DoRequest<T>(TransportRequestState<T> requestSt
193193
this.Ping(baseUri);
194194

195195
var streamResponse = _doRequest(requestState.Method, uri, requestState.PostData, requestState.RequestConfiguration);
196-
if (streamResponse != null && ((maxRetries == 0 && retried == 0) || streamResponse.SuccessOrKnownError))
196+
if (streamResponse.SuccessOrKnownError
197+
|| (
198+
maxRetries == 0 && retried == 0 && !SniffOnFaultDiscoveredMoreNodes(requestState, retried, streamResponse))
199+
)
197200
{
198201
var error = ThrowOrGetErrorFromStreamResponse(requestState, streamResponse);
199202

@@ -220,6 +223,14 @@ private ElasticsearchResponse<T> DoRequest<T>(TransportRequestState<T> requestSt
220223
return RetryRequest<T>(requestState, baseUri, retried);
221224
}
222225

226+
private bool SniffOnFaultDiscoveredMoreNodes<T>(TransportRequestState<T> requestState, int retried,
227+
ElasticsearchResponse<Stream> streamResponse)
228+
{
229+
if (retried != 0 || streamResponse.SuccessOrKnownError) return false;
230+
SniffOnConnectionFailure(requestState, retried);
231+
return this.GetMaximumRetries(requestState.RequestConfiguration) > 0;
232+
}
233+
223234
private void SetErrorDiagnosticsAndPatchSuccess<T>(TransportRequestState<T> requestState,
224235
ElasticsearchServerError error, ElasticsearchResponse<T> typedResponse, ElasticsearchResponse<Stream> streamResponse)
225236
{
@@ -256,16 +267,24 @@ private ElasticsearchResponse<T> RetryRequest<T>(TransportRequestState<T> reques
256267

257268
this._connectionPool.MarkDead(baseUri, this.ConfigurationValues.DeadTimeout, this.ConfigurationValues.MaxDeadTimeout);
258269

259-
if (!SniffingDisabled(requestState.RequestConfiguration)
260-
&& this.ConfigurationValues.SniffsOnConnectionFault
261-
&& retried == 0)
262-
this.SniffClusterState();
270+
SniffOnConnectionFailure(requestState, retried);
263271

264272
if (retried >= maxRetries) throw new MaxRetryException(exceptionMessage, e);
265273

266274
return this.DoRequest<T>(requestState, ++retried);
267275
}
268276

277+
private void SniffOnConnectionFailure<T>(TransportRequestState<T> requestState, int retried)
278+
{
279+
if (requestState.SniffedOnConnectionFailure
280+
|| SniffingDisabled(requestState.RequestConfiguration)
281+
|| !this.ConfigurationValues.SniffsOnConnectionFault
282+
|| retried != 0) return;
283+
284+
this.SniffClusterState();
285+
requestState.SniffedOnConnectionFailure = true;
286+
}
287+
269288
private ElasticsearchResponse<Stream> _doRequest(string method, Uri uri, byte[] postData, IRequestConfiguration requestSpecificConfig)
270289
{
271290
switch (method.ToLowerInvariant())
@@ -350,7 +369,11 @@ private Task<ElasticsearchResponse<T>> _doRequestAsyncOrRetry<T>(
350369
throw t.Exception;
351370
return this.RetryRequestAsync<T>(requestState, baseUri, retried, t.Exception);
352371
}
353-
if ((maxRetries == 0 && retried == 0) || t.Result.SuccessOrKnownError)
372+
373+
if (t.Result.SuccessOrKnownError
374+
|| (
375+
maxRetries == 0 && retried == 0 && !SniffOnFaultDiscoveredMoreNodes(requestState, retried, t.Result))
376+
)
354377
{
355378
var error = ThrowOrGetErrorFromStreamResponse(requestState, t.Result);
356379
return this.StreamToTypedResponseAsync<T>(t.Result, requestState.DeserializationState)
@@ -374,8 +397,7 @@ private Task<ElasticsearchResponse<T>> RetryRequestAsync<T>(TransportRequestStat
374397

375398
this._connectionPool.MarkDead(baseUri, this.ConfigurationValues.DeadTimeout, this.ConfigurationValues.MaxDeadTimeout);
376399

377-
if (this.ConfigurationValues.SniffsOnConnectionFault && retried == 0)
378-
this.SniffClusterState();
400+
this.SniffOnConnectionFailure(requestState, retried);
379401

380402
if (retried < maxRetries)
381403
return this.DoRequestAsync<T>(requestState, ++retried);

src/Elasticsearch.Net/Connection/TransportRequestState.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ class TransportRequestState<T>
1212
public IRequestConfiguration RequestConfiguration { get; set; }
1313
public object DeserializationState { get; private set; }
1414

15+
internal bool SniffedOnConnectionFailure { get; set; }
16+
1517
public TransportRequestState(ElasticsearchResponseTracer<T> tracer, string method, string path, byte[] postData = null, IRequestParameters requestParameters = null)
1618
{
1719
this.Method = method;

src/Elasticsearch.Net/ElasticsearchClient.Generated.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23211,7 +23211,7 @@ public Task<ElasticsearchResponse<DynamicDictionary>> InfoAsync(Func<InfoRequest
2321123211
requestParams = requestParameters(new InfoRequestParameters());
2321223212
ToNameValueCollection(requestParams);
2321323213
}
23214-
23214+
2321523215

2321623216

2321723217
return ElasticsearchResponse.WrapAsync(this.DoRequestAsync<Dictionary<string, object>>(

src/Tests/Elasticsearch.Net.Tests.Unit/Connection/SniffingConnectionPoolTests.cs

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.IO;
34
using System.Linq;
45
using System.Text;
56
using System.Threading.Tasks;
@@ -142,6 +143,10 @@ public void SniffOnConnectionFaultCausesSniffOn503()
142143
{
143144
var dateTimeProvider = fake.Resolve<IDateTimeProvider>();
144145
var nowCall = A.CallTo(()=>dateTimeProvider.Now());
146+
nowCall.Invokes(() =>
147+
{
148+
149+
});
145150
nowCall.Returns(DateTime.UtcNow);
146151
var nodes = new[] { new Uri("http://localhost:9200") };
147152
var connectionPool = new SniffingConnectionPool(nodes);
@@ -150,7 +155,13 @@ public void SniffOnConnectionFaultCausesSniffOn503()
150155
fake.Provide<IConnectionConfigurationValues>(config);
151156
var transport = FakeCalls.ProvideDefaultTransport(fake, dateTimeProvider);
152157
var connection = fake.Resolve<IConnection>();
153-
var sniffCall = FakeCalls.Sniff(fake, config, nodes);
158+
159+
var sniffNewNodes = new[]
160+
{
161+
new Uri("http://localhost:9200"),
162+
new Uri("http://localhost:9201")
163+
};
164+
var sniffCall = FakeCalls.Sniff(fake, config, sniffNewNodes);
154165
var getCall = FakeCalls.GetSyncCall(fake);
155166
getCall.ReturnsNextFromSequence(
156167

@@ -169,8 +180,83 @@ public void SniffOnConnectionFaultCausesSniffOn503()
169180
Assert.Throws<MaxRetryException>(()=>client1.Info()); //info call 5
170181

171182
sniffCall.MustHaveHappened(Repeated.Exactly.Once);
172-
nowCall.MustHaveHappened(Repeated.Exactly.Times(7));
183+
nowCall.MustHaveHappened(Repeated.Exactly.Times(8));
184+
185+
}
186+
}
187+
188+
[Test]
189+
public async void HostsReturnedBySniffAreVisited_Async()
190+
{
191+
using (var fake = new AutoFake())
192+
{
193+
var dateTimeProvider = fake.Resolve<IDateTimeProvider>();
194+
var nowCall = A.CallTo(()=>dateTimeProvider.Now());
195+
nowCall.Returns(DateTime.UtcNow);
173196

197+
var connectionPool = new SniffingConnectionPool(new[]
198+
{
199+
new Uri("http://localhost:9200"),
200+
new Uri("http://localhost:9201")
201+
}, randomizeOnStartup: false);
202+
var config = new ConnectionConfiguration(connectionPool)
203+
.SniffOnConnectionFault();
204+
fake.Provide<IConnectionConfigurationValues>(config);
205+
FakeCalls.ProvideDefaultTransport(fake);
206+
FakeCalls.PingAtConnectionLevelAsync(fake)
207+
.ReturnsLazily(()=>
208+
FakeResponse.OkAsync(config)
209+
);
210+
211+
var sniffCall = FakeCalls.Sniff(fake, config, new List<Uri>()
212+
{
213+
new Uri("http://localhost:9204"),
214+
new Uri("http://localhost:9203"),
215+
new Uri("http://localhost:9202"),
216+
new Uri("http://localhost:9201")
217+
});
218+
219+
var connection = fake.Resolve<IConnection>();
220+
var seenNodes = new List<Uri>();
221+
//var getCall = FakeResponse.GetSyncCall(fake);
222+
var getCall = A.CallTo(() => connection.Get(
223+
A<Uri>.That.Not.Matches(u=>u.AbsolutePath.StartsWith("/_nodes")),
224+
A<IRequestConfiguration>._));
225+
getCall.ReturnsNextFromSequence(
226+
FakeResponse.OkAsync(config), //info 1
227+
FakeResponse.BadAsync(config), //info 2
228+
FakeResponse.OkAsync(config), //info 2 retry
229+
FakeResponse.OkAsync(config), //info 3
230+
FakeResponse.OkAsync(config), //info 4
231+
FakeResponse.OkAsync(config), //info 5
232+
FakeResponse.OkAsync(config), //info 6
233+
FakeResponse.OkAsync(config), //info 7
234+
FakeResponse.OkAsync(config), //info 8
235+
FakeResponse.OkAsync(config) //info 9
236+
);
237+
getCall.Invokes((Uri u, IRequestConnectionConfiguration o) => seenNodes.Add(u));
238+
239+
var client1 = fake.Resolve<ElasticsearchClient>();
240+
await client1.InfoAsync(); //info call 1
241+
await client1.InfoAsync(); //info call 2
242+
await client1.InfoAsync(); //info call 3
243+
await client1.InfoAsync(); //info call 4
244+
await client1.InfoAsync(); //info call 5
245+
await client1.InfoAsync(); //info call 6
246+
await client1.InfoAsync(); //info call 7
247+
await client1.InfoAsync(); //info call 8
248+
await client1.InfoAsync(); //info call 9
249+
250+
sniffCall.MustHaveHappened(Repeated.Exactly.Once);
251+
seenNodes.Should().NotBeEmpty().And.HaveCount(10);
252+
seenNodes[0].Port.Should().Be(9200);
253+
seenNodes[1].Port.Should().Be(9201);
254+
//after sniff
255+
seenNodes[2].Port.Should().Be(9202, string.Join(",", seenNodes.Select(n=>n.Port)));
256+
seenNodes[3].Port.Should().Be(9204);
257+
seenNodes[4].Port.Should().Be(9203);
258+
seenNodes[5].Port.Should().Be(9202);
259+
seenNodes[6].Port.Should().Be(9201);
174260
}
175261
}
176262
[Test]

src/Tests/Elasticsearch.Net.Tests.Unit/Stubs/FakeCalls.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ public static IReturnValueConfiguration<ElasticsearchResponse<Stream>> PingAtCon
3434
A<Uri>.That.Matches(IsRoot()), A<RequestConnectionConfiguration>._));
3535
}
3636

37-
3837

3938
public static IReturnValueConfiguration<Task<ElasticsearchResponse<Stream>>> PingAtConnectionLevelAsync(AutoFake fake)
4039
{

src/Tests/Elasticsearch.Net.Tests.Unit/Stubs/FakeResponse.cs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
using System.Collections.Generic;
22
using System.IO;
3+
using System.Text;
4+
using System.Threading.Tasks;
35
using Elasticsearch.Net.Connection;
46

57
namespace Elasticsearch.Net.Tests.Unit.Stubs
@@ -14,14 +16,33 @@ public static ElasticsearchResponse<Stream> Ok(
1416
{
1517
return ElasticsearchResponse<Stream>.Create(config, 200, method, path, null, response);
1618
}
17-
19+
20+
public static Task<ElasticsearchResponse<Stream>> OkAsync(
21+
IConnectionConfigurationValues config,
22+
string method = "GET",
23+
string path = "/",
24+
Stream response = null)
25+
{
26+
response = response ?? new MemoryStream(Encoding.UTF8.GetBytes("{}"));
27+
return Task.FromResult(ElasticsearchResponse<Stream>.Create(config, 200, method, path, null, response));
28+
}
29+
1830
public static ElasticsearchResponse<Stream> Bad(
1931
IConnectionConfigurationValues config,
2032
string method = "GET",
2133
string path = "/")
2234
{
2335
return ElasticsearchResponse<Stream>.Create(config, 503, method, path, null);
2436
}
37+
38+
public static Task<ElasticsearchResponse<Stream>> BadAsync(
39+
IConnectionConfigurationValues config,
40+
string method = "GET",
41+
string path = "/")
42+
{
43+
return Task.FromResult(ElasticsearchResponse<Stream>.Create(config, 503, method, path, null));
44+
}
45+
2546
public static ElasticsearchResponse<Stream> Any(
2647
IConnectionConfigurationValues config,
2748
int statusCode,
@@ -30,5 +51,14 @@ public static ElasticsearchResponse<Stream> Any(
3051
{
3152
return ElasticsearchResponse<Stream>.Create(config, statusCode, method, path, null);
3253
}
54+
55+
public static Task<ElasticsearchResponse<Stream>> AnyAsync(
56+
IConnectionConfigurationValues config,
57+
int statusCode,
58+
string method = "GET",
59+
string path = "/")
60+
{
61+
return Task.FromResult(ElasticsearchResponse<Stream>.Create(config, statusCode, method, path, null));
62+
}
3363
}
3464
}

0 commit comments

Comments
 (0)