Skip to content

Commit 7822a8e

Browse files
committed
Merge branch 'Romanx-feature-sticky-connection'
2 parents 4b2cea6 + 968c453 commit 7822a8e

File tree

11 files changed

+411
-86
lines changed

11 files changed

+411
-86
lines changed

src/Benchmarking/project.lock.json

Lines changed: 36 additions & 36 deletions
Large diffs are not rendered by default.

src/CodeGeneration/Nest.Litterateur/Documentation/Files/CSharpDocumentationFile.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
using System.Linq;
44
using System.Text;
55
using Microsoft.CodeAnalysis.CSharp;
6-
using Microsoft.CodeAnalysis.Formatting;
76
using Nest.Litterateur.Documentation.Blocks;
87
using Nest.Litterateur.Walkers;
98

src/CodeGeneration/Nest.Litterateur/Walkers/CodeWithDocumentationWalker.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,6 @@
77
using System.Linq;
88
using System.Text;
99
using System.Text.RegularExpressions;
10-
using Microsoft.CodeAnalysis.Formatting;
11-
#if !DOTNETCORE
12-
using Microsoft.CodeAnalysis.MSBuild;
13-
#endif
14-
using Microsoft.CodeAnalysis.Text;
1510
using Nest.Litterateur.Documentation.Blocks;
1611

1712
namespace Nest.Litterateur.Walkers

src/CodeGeneration/Nest.Litterateur/project.lock.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3089,7 +3089,7 @@
30893089
"Microsoft.CSharp/4.0.1-beta-23409": {
30903090
"type": "package",
30913091
"serviceable": true,
3092-
"sha512": "GGOOJF47WzXZoka0JHCToQxzSguIy1UeXZywUjA1NPqvKAWVwbSbZ2VxyeIL3jyTV1BHEbBX8FPL6vweUON2aw==",
3092+
"sha512": "I1jsSsyK89VfNebrnx2eiBD5YT6zp+DcX2v6AxZ/IosS38QYmA9YKVmssMd5yhRkXwr1f8MfgZTxF1Cli90JEQ==",
30933093
"files": [
30943094
"lib/dotnet/de/Microsoft.CSharp.xml",
30953095
"lib/dotnet/es/Microsoft.CSharp.xml",
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
6+
namespace Elasticsearch.Net
7+
{
8+
public class StickyConnectionPool : IConnectionPool
9+
{
10+
protected IDateTimeProvider DateTimeProvider { get; }
11+
12+
protected List<Node> InternalNodes { get; set; }
13+
14+
public bool UsingSsl { get; }
15+
public bool SniffedOnStartup { get; set; }
16+
17+
public IReadOnlyCollection<Node> Nodes => this.InternalNodes;
18+
19+
public int MaxRetries => this.InternalNodes.Count - 1;
20+
21+
public bool SupportsReseeding => false;
22+
23+
public bool SupportsPinging => true;
24+
25+
public DateTime LastUpdate { get; protected set; }
26+
27+
public StickyConnectionPool(IEnumerable<Uri> uris, IDateTimeProvider dateTimeProvider = null)
28+
: this(uris.Select(uri => new Node(uri)), dateTimeProvider)
29+
{ }
30+
31+
public StickyConnectionPool(IEnumerable<Node> nodes, IDateTimeProvider dateTimeProvider = null)
32+
{
33+
nodes.ThrowIfEmpty(nameof(nodes));
34+
35+
this.DateTimeProvider = dateTimeProvider ?? Elasticsearch.Net.DateTimeProvider.Default;
36+
37+
var nn = nodes.ToList();
38+
var uris = nn.Select(n => n.Uri).ToList();
39+
if (uris.Select(u => u.Scheme).Distinct().Count() > 1)
40+
throw new ArgumentException("Trying to instantiate a connection pool with mixed URI Schemes");
41+
42+
this.UsingSsl = uris.Any(uri => uri.Scheme == "https");
43+
44+
this.InternalNodes = nn
45+
.DistinctBy(n => n.Uri)
46+
.ToList();
47+
48+
this.LastUpdate = this.DateTimeProvider.Now();
49+
}
50+
51+
protected int GlobalCursor = -1;
52+
53+
public IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
54+
{
55+
var now = this.DateTimeProvider.Now();
56+
var nodes = this.InternalNodes.Where(n => n.IsAlive || n.DeadUntil <= now)
57+
.ToList();
58+
var count = nodes.Count;
59+
Node node;
60+
61+
if (count == 0)
62+
{
63+
var globalCursor = Interlocked.Increment(ref GlobalCursor);
64+
//could not find a suitable node retrying on first node off globalCursor
65+
audit?.Invoke(AuditEvent.AllNodesDead, null);
66+
node = this.InternalNodes[globalCursor % this.InternalNodes.Count];
67+
node.IsResurrected = true;
68+
audit?.Invoke(AuditEvent.Resurrection, node);
69+
yield return node;
70+
yield break;
71+
}
72+
73+
// If the cursor is greater than the default then it's been
74+
// set already but we now have a live node so we should reset it
75+
if (GlobalCursor > -1)
76+
{
77+
Interlocked.Exchange(ref GlobalCursor, -1);
78+
}
79+
80+
var localCursor = 0;
81+
82+
for (var attempts = 0; attempts < count; attempts++)
83+
{
84+
node = nodes[localCursor];
85+
localCursor = (localCursor + 1) % count;
86+
//if this node is not alive or no longer dead mark it as resurrected
87+
if (!node.IsAlive)
88+
{
89+
audit?.Invoke(AuditEvent.Resurrection, node);
90+
node.IsResurrected = true;
91+
}
92+
yield return node;
93+
}
94+
}
95+
96+
public void Reseed(IEnumerable<Node> nodes) { }
97+
98+
void IDisposable.Dispose() => this.DisposeManagedResources();
99+
100+
protected virtual void DisposeManagedResources() { }
101+
}
102+
}

src/Elasticsearch.Net/project.lock.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2025,7 +2025,7 @@
20252025
"System.Dynamic.Runtime/4.0.11-beta-23516": {
20262026
"type": "package",
20272027
"serviceable": true,
2028-
"sha512": "ypkxS0e+yUw7F6JEwuB22u0qqruMeZFOmtcImh2efDHpTAuhF2FOqCDJ7f4qLf9yomVvB4kjkZ6xGunbIQryxQ==",
2028+
"sha512": "C2GXB20I5vMcO4wemZr5pEjwwEb6H6zVkwF12JSUhripKBIKgI0YKpfp9glyDSL903cYgIXAztMQDajwCR0PmA==",
20292029
"files": [
20302030
"lib/DNXCore50/System.Dynamic.Runtime.dll",
20312031
"lib/MonoAndroid10/_._",

src/Nest/project.lock.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2037,7 +2037,7 @@
20372037
"runtime.any.System.Linq.Expressions/4.0.11-beta-23516": {
20382038
"type": "package",
20392039
"serviceable": true,
2040-
"sha512": "4sPxQCjllMJ1uZNlwz/EataPyHSH+AqSDlOIPPqcy/88R2B+abfhPPC78rd7gvHp8KmMX4qbJF6lcCeDIQpmVg==",
2040+
"sha512": "P5nzo1Ye0GxB4BYdWian6Y427eTrhn1JS3jLWZq5bMWVn8hS/OIfyylASN0A/qqeLn4rGA0fOzmJSYqFSKvxgQ==",
20412041
"files": [
20422042
"lib/DNXCore50/System.Linq.Expressions.dll",
20432043
"lib/MonoAndroid10/_._",
@@ -2345,7 +2345,7 @@
23452345
"System.Dynamic.Runtime/4.0.11-beta-23516": {
23462346
"type": "package",
23472347
"serviceable": true,
2348-
"sha512": "ypkxS0e+yUw7F6JEwuB22u0qqruMeZFOmtcImh2efDHpTAuhF2FOqCDJ7f4qLf9yomVvB4kjkZ6xGunbIQryxQ==",
2348+
"sha512": "C2GXB20I5vMcO4wemZr5pEjwwEb6H6zVkwF12JSUhripKBIKgI0YKpfp9glyDSL903cYgIXAztMQDajwCR0PmA==",
23492349
"files": [
23502350
"lib/DNXCore50/System.Dynamic.Runtime.dll",
23512351
"lib/MonoAndroid10/_._",
@@ -2599,7 +2599,7 @@
25992599
"System.Linq.Expressions/4.0.11-beta-23516": {
26002600
"type": "package",
26012601
"serviceable": true,
2602-
"sha512": "YEl5oyF5fifLbHHP099cvb/6f2r2h1QVHzoaoINPHOZtpNec+RfqvzETXcYDIdHT7l+bBAYsBuVUkBgfQEoYfQ==",
2602+
"sha512": "FtKytB13HabzrSvrAgBgOOnG2uxJO4s7zvP5Sk0NS3bwbJUyb5AP1p4897UWnLiB6C95jI4nIkZps51sa9In8g==",
26032603
"files": [
26042604
"lib/MonoAndroid10/_._",
26052605
"lib/MonoTouch10/_._",
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
using System;
2+
using System.Diagnostics.CodeAnalysis;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using Elasticsearch.Net;
6+
using FluentAssertions;
7+
using Tests.Framework;
8+
using static Tests.Framework.TimesHelper;
9+
using static Elasticsearch.Net.AuditEvent;
10+
11+
namespace Tests.ClientConcepts.ConnectionPooling.Sticky
12+
{
13+
public class SkipDeadNodes
14+
{
15+
/** Sticky - Skipping Dead Nodes
16+
* When selecting nodes the connection pool will try and skip all the nodes that are marked dead.
17+
*/
18+
19+
protected int NumberOfNodes = 3;
20+
21+
[U]
22+
public void EachViewDoesNotSkip()
23+
{
24+
var seeds = Enumerable.Range(9200, NumberOfNodes).Select(p => new Node(new Uri("http://localhost:" + p))).ToList();
25+
var pool = new StickyConnectionPool(seeds);
26+
for (var i = 0; i < 20; i++)
27+
{
28+
var node = pool.CreateView().First();
29+
node.Uri.Port.Should().Be(9200);
30+
node = pool.CreateView().First();
31+
node.Uri.Port.Should().Be(9200);
32+
node = pool.CreateView().First();
33+
node.Uri.Port.Should().Be(9200);
34+
}
35+
}
36+
37+
[U]
38+
public void EachViewSeesNextButSkipsTheDeadNode()
39+
{
40+
var seeds = Enumerable.Range(9200, NumberOfNodes).Select(p => new Node(new Uri("http://localhost:" + p))).ToList();
41+
seeds.First().MarkDead(DateTime.Now.AddDays(1));
42+
var pool = new StickyConnectionPool(seeds);
43+
for (var i = 0; i < 20; i++)
44+
{
45+
var node = pool.CreateView().First();
46+
node.Uri.Port.Should().Be(9201);
47+
node = pool.CreateView().First();
48+
node.Uri.Port.Should().Be(9201);
49+
}
50+
/** After we marke the first node alive again we expect it to be hit again*/
51+
seeds.First().MarkAlive();
52+
for (var i = 0; i < 20; i++)
53+
{
54+
var node = pool.CreateView().First();
55+
node.Uri.Port.Should().Be(9200);
56+
node = pool.CreateView().First();
57+
node.Uri.Port.Should().Be(9200);
58+
node = pool.CreateView().First();
59+
node.Uri.Port.Should().Be(9200);
60+
}
61+
}
62+
63+
[U]
64+
public void ViewSeesResurrectedNodes()
65+
{
66+
var dateTimeProvider = new TestableDateTimeProvider();
67+
var seeds = Enumerable.Range(9200, NumberOfNodes).Select(p => new Node(new Uri("http://localhost:" + p))).ToList();
68+
seeds.First().MarkDead(dateTimeProvider.Now().AddDays(1));
69+
var pool = new StickyConnectionPool(seeds, dateTimeProvider: dateTimeProvider);
70+
for (var i = 0; i < 20; i++)
71+
{
72+
var node = pool.CreateView().First();
73+
node.Uri.Port.Should().Be(9201);
74+
node = pool.CreateView().First();
75+
node.Uri.Port.Should().Be(9201);
76+
}
77+
/** If we forward our clock 2 days the node that was marked dead until tomorrow (or yesterday!) should be resurrected */
78+
dateTimeProvider.ChangeTime(d => d.AddDays(2));
79+
var n = pool.CreateView().First();
80+
n.Uri.Port.Should().Be(9200);
81+
n = pool.CreateView().First();
82+
n.Uri.Port.Should().Be(9200);
83+
n = pool.CreateView().First();
84+
n.Uri.Port.Should().Be(9200);
85+
n.IsResurrected.Should().BeTrue();
86+
}
87+
88+
[U, SuppressMessage("AsyncUsage", "AsyncFixer001:Unnecessary async/await usage", Justification = "Its a test")]
89+
public async Task FallsOverDeadNodes()
90+
{
91+
/** A cluster with 2 nodes where the second node fails on ping */
92+
var audit = new Auditor(() => Framework.Cluster
93+
.Nodes(4)
94+
.ClientCalls(p => p.Succeeds(Always))
95+
.ClientCalls(p => p.OnPort(9200).FailAlways())
96+
.ClientCalls(p => p.OnPort(9201).FailAlways())
97+
.StickyConnectionPool()
98+
.Settings(p => p.DisablePing())
99+
);
100+
101+
await audit.TraceCalls(
102+
/** The first call goes to 9200 which succeeds */
103+
new ClientCall {
104+
{ BadResponse, 9200},
105+
{ BadResponse, 9201},
106+
{ HealthyResponse, 9202},
107+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
108+
},
109+
/** The 2nd call does a ping on 9201 because its used for the first time.
110+
* It fails so we wrap over to node 9202 */
111+
new ClientCall {
112+
{ HealthyResponse, 9202},
113+
/** Finally we assert that the connectionpool has one node that is marked as dead */
114+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
115+
},
116+
new ClientCall {
117+
{ HealthyResponse, 9202},
118+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(2) }
119+
}
120+
);
121+
}
122+
123+
[U, SuppressMessage("AsyncUsage", "AsyncFixer001:Unnecessary async/await usage", Justification = "Its a test")]
124+
public async Task PicksADifferentNodeEachTimeAnodeIsDown()
125+
{
126+
/** A cluster with 2 nodes where the second node fails on ping */
127+
var audit = new Auditor(() => Framework.Cluster
128+
.Nodes(4)
129+
.ClientCalls(p => p.Fails(Always))
130+
.StickyConnectionPool()
131+
.Settings(p => p.DisablePing())
132+
);
133+
134+
await audit.TraceCalls(
135+
/** All the calls fail */
136+
new ClientCall {
137+
{ BadResponse, 9200},
138+
{ BadResponse, 9201},
139+
{ BadResponse, 9202},
140+
{ BadResponse, 9203},
141+
{ MaxRetriesReached },
142+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
143+
},
144+
/** After all our registered nodes are marked dead we want to sample a single dead node
145+
* each time to quickly see if the cluster is back up. We do not want to retry all 4
146+
* nodes
147+
*/
148+
new ClientCall {
149+
{ AllNodesDead },
150+
{ Resurrection, 9200},
151+
{ BadResponse, 9200},
152+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
153+
},
154+
new ClientCall {
155+
{ AllNodesDead },
156+
{ Resurrection, 9201},
157+
{ BadResponse, 9201},
158+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
159+
},
160+
new ClientCall {
161+
{ AllNodesDead },
162+
{ Resurrection, 9202},
163+
{ BadResponse, 9202},
164+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
165+
},
166+
new ClientCall {
167+
{ AllNodesDead },
168+
{ Resurrection, 9203},
169+
{ BadResponse, 9203},
170+
{ pool => pool.Nodes.Where(n=>!n.IsAlive).Should().HaveCount(4) }
171+
}
172+
);
173+
}
174+
}
175+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Threading;
6+
using Elasticsearch.Net;
7+
using FluentAssertions;
8+
using Tests.Framework;
9+
10+
namespace Tests.ClientConcepts.ConnectionPooling.Sticky
11+
{
12+
public class Sticky
13+
{
14+
/** Sticky
15+
* Each connection pool returns the first `live` node so that it is sticky between requests
16+
*/
17+
18+
protected int NumberOfNodes = 10;
19+
20+
[U]
21+
public void EachViewStartsAtNextPositionAndWrapsOver()
22+
{
23+
var uris = Enumerable.Range(9200, NumberOfNodes).Select(p => new Uri("http://localhost:" + p));
24+
var staticPool = new StickyConnectionPool(uris);
25+
26+
this.AssertCreateView(staticPool);
27+
}
28+
29+
public void AssertCreateView(StickyConnectionPool pool)
30+
{
31+
/**
32+
* Here we have setup a static connection pool seeded with 10 nodes.
33+
* So what order we expect? Imagine the following:
34+
*
35+
* Thread A calls GetNext and gets returned the first live node
36+
* Thread B calls GetNext() and gets returned the same node as it's still the first live.
37+
*/
38+
39+
var startingPositions = Enumerable.Range(0, NumberOfNodes)
40+
.Select(i => pool.CreateView().First())
41+
.Select(n => n.Uri.Port)
42+
.ToList();
43+
44+
var expectedOrder = Enumerable.Repeat(9200, NumberOfNodes);
45+
startingPositions.Should().ContainInOrder(expectedOrder);
46+
}
47+
}
48+
}

0 commit comments

Comments
 (0)