Skip to content

Commit dbee0a2

Browse files
committed
Merge branch 'feature-sticky-connection' of https://github.com/Romanx/elasticsearch-net into Romanx-feature-sticky-connection
2 parents 4b2cea6 + 8abf371 commit dbee0a2

File tree

9 files changed

+472
-80
lines changed

9 files changed

+472
-80
lines changed

src/Benchmarking/project.lock.json

Lines changed: 36 additions & 36 deletions
Large diffs are not rendered by default.

src/CodeGeneration/Nest.Litterateur/project.lock.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3089,7 +3089,7 @@
30893089
"Microsoft.CSharp/4.0.1-beta-23409": {
30903090
"type": "package",
30913091
"serviceable": true,
3092-
"sha512": "GGOOJF47WzXZoka0JHCToQxzSguIy1UeXZywUjA1NPqvKAWVwbSbZ2VxyeIL3jyTV1BHEbBX8FPL6vweUON2aw==",
3092+
"sha512": "I1jsSsyK89VfNebrnx2eiBD5YT6zp+DcX2v6AxZ/IosS38QYmA9YKVmssMd5yhRkXwr1f8MfgZTxF1Cli90JEQ==",
30933093
"files": [
30943094
"lib/dotnet/de/Microsoft.CSharp.xml",
30953095
"lib/dotnet/es/Microsoft.CSharp.xml",
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
6+
namespace Elasticsearch.Net
7+
{
8+
public class StickyConnectionPool : IConnectionPool
9+
{
10+
protected IDateTimeProvider DateTimeProvider { get; }
11+
12+
protected List<Node> InternalNodes { get; set; }
13+
14+
public bool UsingSsl { get; }
15+
public bool SniffedOnStartup { get; set; }
16+
17+
public IReadOnlyCollection<Node> Nodes => this.InternalNodes;
18+
19+
public int MaxRetries => this.InternalNodes.Count - 1;
20+
21+
public bool SupportsReseeding => false;
22+
23+
public bool SupportsPinging => true;
24+
25+
public DateTime LastUpdate { get; protected set; }
26+
27+
public StickyConnectionPool(IEnumerable<Uri> uris, IDateTimeProvider dateTimeProvider = null)
28+
: this(uris.Select(uri => new Node(uri)), dateTimeProvider)
29+
{ }
30+
31+
public StickyConnectionPool(IEnumerable<Node> nodes, IDateTimeProvider dateTimeProvider = null)
32+
{
33+
nodes.ThrowIfEmpty(nameof(nodes));
34+
35+
this.DateTimeProvider = dateTimeProvider ?? Elasticsearch.Net.DateTimeProvider.Default;
36+
37+
var nn = nodes.ToList();
38+
var uris = nn.Select(n => n.Uri).ToList();
39+
if (uris.Select(u => u.Scheme).Distinct().Count() > 1)
40+
throw new ArgumentException("Trying to instantiate a connection pool with mixed URI Schemes");
41+
42+
this.UsingSsl = uris.Any(uri => uri.Scheme == "https");
43+
44+
this.InternalNodes = nn
45+
.DistinctBy(n => n.Uri)
46+
.ToList();
47+
48+
this.LastUpdate = this.DateTimeProvider.Now();
49+
}
50+
51+
protected int GlobalCursor = -1;
52+
53+
public IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
54+
{
55+
var now = this.DateTimeProvider.Now();
56+
var nodes = this.InternalNodes.Where(n => n.IsAlive || n.DeadUntil <= now)
57+
.ToList();
58+
var count = nodes.Count;
59+
Node node;
60+
61+
if (count == 0)
62+
{
63+
var globalCursor = Interlocked.Increment(ref GlobalCursor);
64+
//could not find a suitable node retrying on first node off globalCursor
65+
audit?.Invoke(AuditEvent.AllNodesDead, null);
66+
node = this.InternalNodes[globalCursor % this.InternalNodes.Count];
67+
node.IsResurrected = true;
68+
audit?.Invoke(AuditEvent.Resurrection, node);
69+
yield return node;
70+
yield break;
71+
}
72+
73+
// If the cursor is greater than the default then it's been
74+
// set already but we now have a live node so we should reset it
75+
if (GlobalCursor > -1)
76+
{
77+
Interlocked.Exchange(ref GlobalCursor, -1);
78+
}
79+
80+
var localCursor = 0;
81+
82+
for (var attempts = 0; attempts < count; attempts++)
83+
{
84+
node = nodes[localCursor];
85+
localCursor = (localCursor + 1) % count;
86+
//if this node is not alive or no longer dead mark it as resurrected
87+
if (!node.IsAlive)
88+
{
89+
audit?.Invoke(AuditEvent.Resurrection, node);
90+
node.IsResurrected = true;
91+
}
92+
yield return node;
93+
}
94+
}
95+
96+
public void Reseed(IEnumerable<Node> nodes) { }
97+
98+
void IDisposable.Dispose() => this.DisposeManagedResources();
99+
100+
protected virtual void DisposeManagedResources() { }
101+
}
102+
}

src/Elasticsearch.Net/project.lock.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2025,7 +2025,7 @@
20252025
"System.Dynamic.Runtime/4.0.11-beta-23516": {
20262026
"type": "package",
20272027
"serviceable": true,
2028-
"sha512": "ypkxS0e+yUw7F6JEwuB22u0qqruMeZFOmtcImh2efDHpTAuhF2FOqCDJ7f4qLf9yomVvB4kjkZ6xGunbIQryxQ==",
2028+
"sha512": "C2GXB20I5vMcO4wemZr5pEjwwEb6H6zVkwF12JSUhripKBIKgI0YKpfp9glyDSL903cYgIXAztMQDajwCR0PmA==",
20292029
"files": [
20302030
"lib/DNXCore50/System.Dynamic.Runtime.dll",
20312031
"lib/MonoAndroid10/_._",

src/Nest/project.lock.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2037,7 +2037,7 @@
20372037
"runtime.any.System.Linq.Expressions/4.0.11-beta-23516": {
20382038
"type": "package",
20392039
"serviceable": true,
2040-
"sha512": "4sPxQCjllMJ1uZNlwz/EataPyHSH+AqSDlOIPPqcy/88R2B+abfhPPC78rd7gvHp8KmMX4qbJF6lcCeDIQpmVg==",
2040+
"sha512": "P5nzo1Ye0GxB4BYdWian6Y427eTrhn1JS3jLWZq5bMWVn8hS/OIfyylASN0A/qqeLn4rGA0fOzmJSYqFSKvxgQ==",
20412041
"files": [
20422042
"lib/DNXCore50/System.Linq.Expressions.dll",
20432043
"lib/MonoAndroid10/_._",
@@ -2345,7 +2345,7 @@
23452345
"System.Dynamic.Runtime/4.0.11-beta-23516": {
23462346
"type": "package",
23472347
"serviceable": true,
2348-
"sha512": "ypkxS0e+yUw7F6JEwuB22u0qqruMeZFOmtcImh2efDHpTAuhF2FOqCDJ7f4qLf9yomVvB4kjkZ6xGunbIQryxQ==",
2348+
"sha512": "C2GXB20I5vMcO4wemZr5pEjwwEb6H6zVkwF12JSUhripKBIKgI0YKpfp9glyDSL903cYgIXAztMQDajwCR0PmA==",
23492349
"files": [
23502350
"lib/DNXCore50/System.Dynamic.Runtime.dll",
23512351
"lib/MonoAndroid10/_._",
@@ -2599,7 +2599,7 @@
25992599
"System.Linq.Expressions/4.0.11-beta-23516": {
26002600
"type": "package",
26012601
"serviceable": true,
2602-
"sha512": "YEl5oyF5fifLbHHP099cvb/6f2r2h1QVHzoaoINPHOZtpNec+RfqvzETXcYDIdHT7l+bBAYsBuVUkBgfQEoYfQ==",
2602+
"sha512": "FtKytB13HabzrSvrAgBgOOnG2uxJO4s7zvP5Sk0NS3bwbJUyb5AP1p4897UWnLiB6C95jI4nIkZps51sa9In8g==",
26032603
"files": [
26042604
"lib/MonoAndroid10/_._",
26052605
"lib/MonoTouch10/_._",
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
using System;
2+
using System.Diagnostics.CodeAnalysis;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using Elasticsearch.Net;
6+
using FluentAssertions;
7+
using Tests.Framework;
8+
using static Tests.Framework.TimesHelper;
9+
using static Elasticsearch.Net.AuditEvent;
10+
11+
namespace Tests.ClientConcepts.ConnectionPooling.Sticky
12+
{
13+
public class SkipDeadNodes
14+
{
15+
/** Sticky - Skipping Dead Nodes
16+
* When selecting nodes the connection pool will try and skip all the nodes that are marked dead.
17+
*/
18+
19+
protected int NumberOfNodes = 3;
20+
21+
[U] public void EachViewDoesNotSkip()
22+
{
23+
var seeds = Enumerable.Range(9200, NumberOfNodes).Select(p => new Node(new Uri("http://localhost:" + p))).ToList();
24+
var pool = new StickyConnectionPool(seeds);
25+
for (var i = 0; i < 20; i++)
26+
{
27+
var node = pool.CreateView().First();
28+
node.Uri.Port.Should().Be(9200);
29+
node = pool.CreateView().First();
30+
node.Uri.Port.Should().Be(9200);
31+
node = pool.CreateView().First();
32+
node.Uri.Port.Should().Be(9200);
33+
}
34+
}
35+
36+
[U] public void EachViewSeesNextButSkipsTheDeadNode()
37+
{
38+
var seeds = Enumerable.Range(9200, NumberOfNodes).Select(p => new Node(new Uri("http://localhost:" + p))).ToList();
39+
seeds.First().MarkDead(DateTime.Now.AddDays(1));
40+
var pool = new StickyConnectionPool(seeds);
41+
for (var i = 0; i < 20; i++)
42+
{
43+
var node = pool.CreateView().First();
44+
node.Uri.Port.Should().Be(9201);
45+
node = pool.CreateView().First();
46+
node.Uri.Port.Should().Be(9201);
47+
}
48+
/** After we marke the first node alive again we expect it to be hit again*/
49+
seeds.First().MarkAlive();
50+
for (var i = 0; i < 20; i++)
51+
{
52+
var node = pool.CreateView().First();
53+
node.Uri.Port.Should().Be(9200);
54+
node = pool.CreateView().First();
55+
node.Uri.Port.Should().Be(9200);
56+
node = pool.CreateView().First();
57+
node.Uri.Port.Should().Be(9200);
58+
}
59+
}
60+
61+
[U] public void ViewSeesResurrectedNodes()
62+
{
63+
var dateTimeProvider = new TestableDateTimeProvider();
64+
var seeds = Enumerable.Range(9200, NumberOfNodes).Select(p => new Node(new Uri("http://localhost:" + p))).ToList();
65+
seeds.First().MarkDead(dateTimeProvider.Now().AddDays(1));
66+
var pool = new StickyConnectionPool(seeds, dateTimeProvider: dateTimeProvider);
67+
for (var i = 0; i < 20; i++)
68+
{
69+
var node = pool.CreateView().First();
70+
node.Uri.Port.Should().Be(9201);
71+
node = pool.CreateView().First();
72+
node.Uri.Port.Should().Be(9201);
73+
}
74+
/** If we forward our clock 2 days the node that was marked dead until tomorrow (or yesterday!) should be resurrected */
75+
dateTimeProvider.ChangeTime(d => d.AddDays(2));
76+
var n = pool.CreateView().First();
77+
n.Uri.Port.Should().Be(9200);
78+
n = pool.CreateView().First();
79+
n.Uri.Port.Should().Be(9200);
80+
n = pool.CreateView().First();
81+
n.Uri.Port.Should().Be(9200);
82+
n.IsResurrected.Should().BeTrue();
83+
}
84+
85+
[U, SuppressMessage("AsyncUsage", "AsyncFixer001:Unnecessary async/await usage", Justification = "Its a test")]
86+
public async Task FallsOverDeadNodes()
87+
{
88+
/** A cluster with 2 nodes where the second node fails on ping */
89+
var audit = new Auditor(() => Framework.Cluster
90+
.Nodes(4)
91+
.ClientCalls(p => p.Succeeds(Always))
92+
.ClientCalls(p => p.OnPort(9200).FailAlways())
93+
.ClientCalls(p => p.OnPort(9201).FailAlways())
94+
.StickyConnectionPool()
95+
.Settings(p => p.DisablePing())
96+
);
97+
98+
await audit.TraceCalls(
99+
/** The first call goes to 9200 which succeeds */
100+
new ClientCall {
101+
{ BadResponse, 9200},
102+
{ BadResponse, 9201},
103+
{ HealthyResponse, 9202},
104+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
105+
},
106+
/** The 2nd call does a ping on 9201 because its used for the first time.
107+
* It fails so we wrap over to node 9202 */
108+
new ClientCall {
109+
{ HealthyResponse, 9202},
110+
/** Finally we assert that the connectionpool has one node that is marked as dead */
111+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
112+
},
113+
new ClientCall {
114+
{ HealthyResponse, 9202},
115+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
116+
}
117+
);
118+
}
119+
120+
[U(Skip = "Not sure how to trace this chain"), SuppressMessage("AsyncUsage", "AsyncFixer001:Unnecessary async/await usage", Justification = "Its a test")]
121+
public async Task FallsOverDeadNodesWithRecoverResetToPrimary()
122+
{
123+
/** A cluster with 2 nodes where the second node fails on ping */
124+
var audit = new Auditor(() => Framework.Cluster
125+
.Nodes(3)
126+
.ClientCalls(p => p.OnPort(9200).Fails(Twice))
127+
.ClientCalls(p => p.OnPort(9200).Succeeds(Once))
128+
.ClientCalls(p => p.OnPort(9201).Succeeds(Once))
129+
.ClientCalls(p => p.OnPort(9201).Fails(Once))
130+
.ClientCalls(p => p.OnPort(9202).FailAlways())
131+
.StickyConnectionPool()
132+
.Settings(p => p.DisablePing())
133+
);
134+
135+
await audit.TraceCalls(
136+
/** The first call goes to 9200 which fails, so we wrap to 9201 */
137+
new ClientCall {
138+
{ BadResponse, 9200},
139+
{ HealthyResponse, 9201},
140+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(1) }
141+
},
142+
/** The 2nd call does a ping on 9201 which is healthy */
143+
new ClientCall {
144+
{ HealthyResponse, 9201},
145+
/** Finally we assert that the connectionpool has one node that is marked as dead */
146+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(1) }
147+
},
148+
/** The 3rd call does a ping on 9201 which fails, then 9202 and 9203 as all fail */
149+
new ClientCall {
150+
{ BadResponse, 9201},
151+
{ BadResponse, 9202},
152+
{ MaxRetriesReached },
153+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
154+
},
155+
/** Try to resurrect first node 9200, which fails */
156+
new ClientCall {
157+
{ AllNodesDead },
158+
{ Resurrection, 9200},
159+
{ BadResponse, 9200},
160+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
161+
},
162+
/** Try to ressurect second node 9201 which succeeds */
163+
new ClientCall {
164+
{ AllNodesDead },
165+
{ Resurrection, 9201},
166+
{ HealthyResponse, 9201},
167+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(3) }
168+
},
169+
/** The ping on 9201 which returns a bad response leaving all nodes dead */
170+
new ClientCall {
171+
{ BadResponse, 9201},
172+
{ MaxRetriesReached },
173+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
174+
},
175+
/** Try ressurect from 9201 again which succeeded */
176+
new ClientCall {
177+
{ AllNodesDead },
178+
{ Resurrection, 9200},
179+
{ HealthyResponse, 9200},
180+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(3) }
181+
}
182+
);
183+
}
184+
185+
[U, SuppressMessage("AsyncUsage", "AsyncFixer001:Unnecessary async/await usage", Justification = "Its a test")]
186+
public async Task PicksADifferentNodeEachTimeAnodeIsDown()
187+
{
188+
/** A cluster with 2 nodes where the second node fails on ping */
189+
var audit = new Auditor(() => Framework.Cluster
190+
.Nodes(4)
191+
.ClientCalls(p => p.Fails(Always))
192+
.StickyConnectionPool()
193+
.Settings(p => p.DisablePing())
194+
);
195+
196+
await audit.TraceCalls(
197+
/** All the calls fail */
198+
new ClientCall {
199+
{ BadResponse, 9200},
200+
{ BadResponse, 9201},
201+
{ BadResponse, 9202},
202+
{ BadResponse, 9203},
203+
{ MaxRetriesReached },
204+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
205+
},
206+
/** After all our registered nodes are marked dead we want to sample a single dead node
207+
* each time to quickly see if the cluster is back up. We do not want to retry all 4
208+
* nodes
209+
*/
210+
new ClientCall {
211+
{ AllNodesDead },
212+
{ Resurrection, 9200},
213+
{ BadResponse, 9200},
214+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
215+
},
216+
new ClientCall {
217+
{ AllNodesDead },
218+
{ Resurrection, 9201},
219+
{ BadResponse, 9201},
220+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
221+
},
222+
new ClientCall {
223+
{ AllNodesDead },
224+
{ Resurrection, 9202},
225+
{ BadResponse, 9202},
226+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
227+
},
228+
new ClientCall {
229+
{ AllNodesDead },
230+
{ Resurrection, 9203},
231+
{ BadResponse, 9203},
232+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
233+
}
234+
);
235+
}
236+
}
237+
}

0 commit comments

Comments
 (0)