Skip to content

Commit fba1fcc

Browse files
author
Alex McAuliffe
committed
First attempt at a sticky connection pool #1810
1 parent 1e89b91 commit fba1fcc

File tree

1 file changed

+98
-0
lines changed

1 file changed

+98
-0
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
8+
namespace Elasticsearch.Net.ConnectionPool
9+
{
10+
public class StickyConnectionPool : IConnectionPool
11+
{
12+
protected IDateTimeProvider DateTimeProvider { get; }
13+
14+
protected List<Node> InternalNodes { get; set; }
15+
16+
public bool UsingSsl { get; }
17+
public bool SniffedOnStartup { get; set; }
18+
19+
public IReadOnlyCollection<Node> Nodes => this.InternalNodes;
20+
21+
public int MaxRetries => this.InternalNodes.Count - 1;
22+
23+
public bool SupportsReseeding => false;
24+
25+
public bool SupportsPinging => true;
26+
27+
public DateTime LastUpdate { get; protected set; }
28+
29+
public StickyConnectionPool(IEnumerable<Uri> uris, IDateTimeProvider dateTimeProvider = null)
30+
: this(uris.Select(uri => new Node(uri)), dateTimeProvider)
31+
{ }
32+
33+
public StickyConnectionPool(IEnumerable<Node> nodes, IDateTimeProvider dateTimeProvider = null)
34+
{
35+
nodes.ThrowIfEmpty(nameof(nodes));
36+
37+
this.DateTimeProvider = dateTimeProvider ?? Elasticsearch.Net.DateTimeProvider.Default;
38+
39+
var nn = nodes.ToList();
40+
var uris = nn.Select(n => n.Uri).ToList();
41+
if (uris.Select(u => u.Scheme).Distinct().Count() > 1)
42+
throw new ArgumentException("Trying to instantiate a connection pool with mixed URI Schemes");
43+
44+
this.UsingSsl = uris.Any(uri => uri.Scheme == "https");
45+
46+
this.InternalNodes = nn
47+
.DistinctBy(n => n.Uri)
48+
.ToList();
49+
50+
this.LastUpdate = this.DateTimeProvider.Now();
51+
}
52+
53+
protected int GlobalCursor = -1;
54+
55+
public IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
56+
{
57+
var now = this.DateTimeProvider.Now();
58+
var nodes = this.InternalNodes.Where(n => n.IsAlive || n.DeadUntil <= now)
59+
.ToList();
60+
var count = nodes.Count;
61+
Node node;
62+
63+
var globalCursor = Interlocked.Increment(ref GlobalCursor);
64+
65+
if (count == 0)
66+
{
67+
//could not find a suitable node retrying on first node off globalCursor
68+
audit?.Invoke(AuditEvent.AllNodesDead, null);
69+
node = this.InternalNodes[globalCursor % this.InternalNodes.Count];
70+
node.IsResurrected = true;
71+
audit?.Invoke(AuditEvent.Resurrection, node);
72+
yield return node;
73+
yield break;
74+
}
75+
76+
var localCursor = globalCursor % count;
77+
78+
for (var attempts = 0; attempts < count; attempts++)
79+
{
80+
node = nodes[localCursor];
81+
localCursor = (localCursor + 1) % count;
82+
//if this node is not alive or no longer dead mark it as resurrected
83+
if (!node.IsAlive)
84+
{
85+
audit?.Invoke(AuditEvent.Resurrection, node);
86+
node.IsResurrected = true;
87+
}
88+
yield return node;
89+
}
90+
}
91+
92+
public void Reseed(IEnumerable<Node> nodes) { }
93+
94+
void IDisposable.Dispose() => this.DisposeManagedResources();
95+
96+
protected virtual void DisposeManagedResources() { }
97+
}
98+
}

0 commit comments

Comments
 (0)