Skip to content

Commit fdcf00e

Browse files
committed
[#34] Registry-based routing pool
1 parent 9e8d39c commit fdcf00e

File tree

9 files changed

+253
-202
lines changed

9 files changed

+253
-202
lines changed

lib/nebulex_redis_adapter.ex

Lines changed: 40 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -168,9 +168,9 @@ defmodule NebulexRedisAdapter do
168168
]
169169
]
170170
171-
By default, the adapter uses `NebulexRedisAdapter.ClientCluster.Keyslot` for the
172-
keyslot. Besides, if `:jchash` is defined as dependency, the adapter will use
173-
consistent-hashing automatically. However, you can also provide your own
171+
By default, the adapter uses `NebulexRedisAdapter.ClientCluster.Keyslot` for
172+
the keyslot. Besides, if `:jchash` is defined as dependency, the adapter will
173+
use consistent-hashing automatically. However, you can also provide your own
174174
implementation by implementing the `Nebulex.Adapter.Keyslot` and set it into
175175
the `:keyslot` option. For example:
176176
@@ -194,12 +194,12 @@ defmodule NebulexRedisAdapter do
194194
195195
### Client-side cluster options
196196
197-
In addition to shared options, `:client_side_cluster` mode supports the following
198-
options:
197+
In addition to shared options, `:client_side_cluster` mode supports the
198+
following options:
199199
200200
* `:nodes` - The list of nodes the adapter will setup the cluster with;
201-
a pool of connections is established per node. The `:client_side_cluster` mode
202-
enables resilience to be able to survive in case any node(s) gets
201+
a pool of connections is established per node. The `:client_side_cluster`
202+
mode enables resilience to be able to survive in case any node(s) gets
203203
unreachable. For each element of the list, we set the configuration
204204
for each node, such as `:conn_opts`, `:pool_size`, etc.
205205
@@ -333,13 +333,19 @@ defmodule NebulexRedisAdapter do
333333

334334
@impl true
335335
def init(opts) do
336-
# required cache name
336+
# Required cache name
337337
name = opts[:name] || Keyword.fetch!(opts, :cache)
338338

339-
# adapter mode
339+
# Init stats
340+
stats_counter = Stats.init(opts)
341+
342+
# Adapter mode
340343
mode = Keyword.get(opts, :mode, :standalone)
341344

342-
# pool size
345+
# Local registry
346+
registry = normalize_module_name([name, Registry])
347+
348+
# Resolve the pool size
343349
pool_size =
344350
get_option(
345351
opts,
@@ -349,60 +355,62 @@ defmodule NebulexRedisAdapter do
349355
System.schedulers_online()
350356
)
351357

352-
# init the specs according to the adapter mode
353-
{children, default_keyslot} = do_init(mode, name, pool_size, opts)
354-
355-
# keyslot module for selecting nodes
358+
# Keyslot module for selecting nodes
356359
keyslot =
357-
opts
358-
|> Keyword.get(:keyslot, default_keyslot)
359-
|> assert_behaviour(Nebulex.Adapter.Keyslot, "keyslot")
360+
if keyslot = Keyword.get(opts, :keyslot) do
361+
assert_behaviour(keyslot, Nebulex.Adapter.Keyslot, "keyslot")
362+
end
360363

361-
# cluster nodes
364+
# Cluster nodes
362365
nodes =
363366
for {node_name, node_opts} <- Keyword.get(opts, :nodes, []) do
364-
{node_name, Keyword.get(node_opts, :pool_size, System.schedulers_online())}
367+
{node_name, Keyword.get(node_opts, :pool_size, pool_size)}
365368
end
366369

367-
# init stats
368-
stats_counter = Stats.init(opts)
369-
370+
# Init adapter metadata
370371
adapter_meta = %{
372+
cache_pid: self(),
371373
name: name,
372374
mode: mode,
373375
keyslot: keyslot,
374376
nodes: nodes,
375377
pool_size: pool_size,
376378
stats_counter: stats_counter,
379+
registry: registry,
377380
started_at: DateTime.utc_now(),
378381
default_dt: Keyword.get(opts, :default_data_type, :object),
379382
telemetry: Keyword.fetch!(opts, :telemetry),
380383
telemetry_prefix: Keyword.fetch!(opts, :telemetry_prefix)
381384
}
382385

386+
# Init the connections child spec according to the adapter mode
387+
{conn_child_spec, adapter_meta} = do_init(adapter_meta, opts)
388+
389+
# Build the child spec
383390
child_spec =
384391
Nebulex.Adapters.Supervisor.child_spec(
385392
name: normalize_module_name([name, Supervisor]),
386393
strategy: :one_for_all,
387-
children: [{NebulexRedisAdapter.BootstrapServer, adapter_meta} | children]
394+
children: [
395+
{NebulexRedisAdapter.BootstrapServer, adapter_meta},
396+
{Registry, name: registry, keys: :unique},
397+
conn_child_spec
398+
]
388399
)
389400

390401
{:ok, child_spec, adapter_meta}
391402
end
392403

393-
defp do_init(:standalone, name, pool_size, opts) do
394-
{:ok, children} = Connection.init(name, pool_size, opts)
395-
{children, ClientCluster.Keyslot}
404+
defp do_init(%{mode: :standalone} = adapter_meta, opts) do
405+
Connection.init(adapter_meta, opts)
396406
end
397407

398-
defp do_init(:client_side_cluster, _name, _pool_size, opts) do
399-
{:ok, children} = ClientCluster.init(opts)
400-
{children, ClientCluster.Keyslot}
408+
defp do_init(%{mode: :redis_cluster} = adapter_meta, opts) do
409+
RedisCluster.init(adapter_meta, opts)
401410
end
402411

403-
defp do_init(:redis_cluster, name, pool_size, opts) do
404-
{:ok, children} = RedisCluster.init(name, pool_size, opts)
405-
{children, RedisCluster.Keyslot}
412+
defp do_init(%{mode: :client_side_cluster} = adapter_meta, opts) do
413+
ClientCluster.init(adapter_meta, opts)
406414
end
407415

408416
## Nebulex.Adapter.Entry

lib/nebulex_redis_adapter/client_cluster.ex

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,46 @@ defmodule NebulexRedisAdapter.ClientCluster do
22
# Client-side Cluster
33
@moduledoc false
44

5-
alias NebulexRedisAdapter.ClientCluster.Supervisor, as: ClusterSupervisor
6-
alias NebulexRedisAdapter.{Connection, Pool}
5+
alias NebulexRedisAdapter.ClientCluster.Keyslot, as: ClientClusterKeyslot
6+
alias NebulexRedisAdapter.ClientCluster.Supervisor, as: ClientClusterSupervisor
7+
alias NebulexRedisAdapter.Pool
8+
9+
@typedoc "Proxy type to the adapter meta"
10+
@type adapter_meta :: Nebulex.Adapter.metadata()
711

812
@type hash_slot :: {:"$hash_slot", term}
913
@type node_entry :: {node_name :: atom, pool_size :: pos_integer}
1014
@type nodes_config :: [node_entry]
1115

12-
@compile {:inline, pool_name: 2}
13-
1416
## API
1517

16-
@spec init(Keyword.t()) :: {:ok, [:supervisor.child_spec() | {module(), term()} | module()]}
17-
def init(opts) do
18-
cache = Keyword.fetch!(opts, :cache)
19-
20-
children =
18+
@spec init(adapter_meta, Keyword.t()) :: {Supervisor.child_spec(), adapter_meta}
19+
def init(%{name: name, registry: registry, pool_size: pool_size} = adapter_meta, opts) do
20+
node_connections_specs =
2121
for {node_name, node_opts} <- Keyword.get(opts, :nodes, []) do
22-
arg = {Connection, [name: pool_name(cache, node_name)] ++ node_opts}
23-
Supervisor.child_spec({ClusterSupervisor, arg}, type: :supervisor, id: {cache, node_name})
22+
node_opts =
23+
node_opts
24+
|> Keyword.put(:name, name)
25+
|> Keyword.put(:registry, registry)
26+
|> Keyword.put(:node, node_name)
27+
|> Keyword.put_new(:pool_size, pool_size)
28+
29+
Supervisor.child_spec({ClientClusterSupervisor, node_opts},
30+
type: :supervisor,
31+
id: {name, node_name}
32+
)
2433
end
2534

26-
{:ok, children}
35+
node_connections_supervisor_spec = %{
36+
id: :node_connections_supervisor,
37+
type: :supervisor,
38+
start: {Supervisor, :start_link, [node_connections_specs, [strategy: :one_for_one]]}
39+
}
40+
41+
adapter_meta =
42+
Map.update(adapter_meta, :keyslot, ClientClusterKeyslot, &(&1 || ClientClusterKeyslot))
43+
44+
{node_connections_supervisor_spec, adapter_meta}
2745
end
2846

2947
@spec exec!(
@@ -33,37 +51,31 @@ defmodule NebulexRedisAdapter.ClientCluster do
3351
reducer :: (any, any -> any)
3452
) :: any | no_return
3553
def exec!(
36-
%{name: name, nodes: nodes},
54+
%{name: name, registry: registry, nodes: nodes},
3755
command,
3856
init_acc \\ nil,
3957
reducer \\ fn res, _ -> res end
4058
) do
41-
# TODO: Perhaps this should be performed in parallel
4259
Enum.reduce(nodes, init_acc, fn {node_name, pool_size}, acc ->
43-
name
44-
|> pool_name(node_name)
45-
|> Pool.get_conn(pool_size)
60+
registry
61+
|> Pool.get_conn({name, node_name}, pool_size)
4662
|> Redix.command!(command)
4763
|> reducer.(acc)
4864
end)
4965
end
5066

51-
@spec get_conn(Nebulex.Cache.t(), nodes_config, atom) :: atom
52-
def get_conn(cache, nodes, node_name) do
67+
@spec get_conn(atom, atom, nodes_config, atom) :: pid
68+
def get_conn(registry, name, nodes, node_name) do
5369
pool_size = Keyword.fetch!(nodes, node_name)
5470

55-
cache
56-
|> pool_name(node_name)
57-
|> Pool.get_conn(pool_size)
71+
Pool.get_conn(registry, {name, node_name}, pool_size)
5872
end
5973

60-
@spec get_conn(Nebulex.Cache.t(), nodes_config, term, module) :: atom
61-
def get_conn(cache, nodes, key, module) do
74+
@spec get_conn(atom, atom, nodes_config, term, module) :: pid
75+
def get_conn(registry, name, nodes, key, module) do
6276
{node_name, pool_size} = get_node(module, nodes, key)
6377

64-
cache
65-
|> pool_name(node_name)
66-
|> Pool.get_conn(pool_size)
78+
Pool.get_conn(registry, {name, node_name}, pool_size)
6779
end
6880

6981
@spec group_keys_by_hash_slot(Enum.t(), nodes_config, module) :: map
@@ -79,9 +91,6 @@ defmodule NebulexRedisAdapter.ClientCluster do
7991
end)
8092
end
8193

82-
@spec pool_name(Nebulex.Cache.t(), atom) :: atom
83-
def pool_name(cache, node_name), do: :"#{cache}.#{node_name}"
84-
8594
## Private Functions
8695

8796
defp get_node(module, nodes, key) do

lib/nebulex_redis_adapter/client_cluster/supervisor.ex

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,24 @@ defmodule NebulexRedisAdapter.ClientCluster.Supervisor do
77
## API
88

99
@doc false
10-
def start_link({module, opts}) do
11-
name = Keyword.fetch!(opts, :name)
12-
Supervisor.start_link(__MODULE__, {module, opts}, name: name)
10+
def start_link(opts) do
11+
Supervisor.start_link(__MODULE__, opts)
1312
end
1413

1514
## Supervisor Callbacks
1615

1716
@impl true
18-
def init({module, opts}) do
19-
module
20-
|> Pool.children(opts)
21-
|> Supervisor.init(strategy: :one_for_one)
17+
def init(opts) do
18+
name = Keyword.fetch!(opts, :name)
19+
registry = Keyword.fetch!(opts, :registry)
20+
node = Keyword.fetch!(opts, :node)
21+
pool_size = Keyword.fetch!(opts, :pool_size)
22+
23+
children =
24+
Pool.register_names(registry, {name, node}, pool_size, fn conn_name ->
25+
{NebulexRedisAdapter.Connection, Keyword.put(opts, :name, conn_name)}
26+
end)
27+
28+
Supervisor.init(children, strategy: :one_for_one)
2229
end
2330
end

lib/nebulex_redis_adapter/command.ex

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,32 @@ defmodule NebulexRedisAdapter.Command do
3131

3232
## Private Functions
3333

34-
defp conn(%{mode: :standalone, name: name, pool_size: pool_size}, _key) do
35-
Pool.get_conn(name, pool_size)
34+
defp conn(%{mode: :standalone, name: name, registry: registry, pool_size: pool_size}, _key) do
35+
Pool.get_conn(registry, name, pool_size)
3636
end
3737

38-
defp conn(%{mode: :client_side_cluster, name: name, nodes: nodes}, {:"$hash_slot", node_name}) do
39-
ClientCluster.get_conn(name, nodes, node_name)
38+
defp conn(%{mode: :redis_cluster} = meta, key) do
39+
RedisCluster.get_conn(meta, key)
4040
end
4141

42-
defp conn(%{mode: :client_side_cluster, name: name, nodes: nodes, keyslot: keyslot}, key) do
43-
ClientCluster.get_conn(name, nodes, key, keyslot)
42+
defp conn(
43+
%{mode: :client_side_cluster, name: name, registry: registry, nodes: nodes},
44+
{:"$hash_slot", node_name}
45+
) do
46+
ClientCluster.get_conn(registry, name, nodes, node_name)
4447
end
4548

46-
defp conn(%{mode: :redis_cluster} = meta, key) do
47-
RedisCluster.get_conn(meta, key)
49+
defp conn(
50+
%{
51+
mode: :client_side_cluster,
52+
name: name,
53+
registry: registry,
54+
nodes: nodes,
55+
keyslot: keyslot
56+
},
57+
key
58+
) do
59+
ClientCluster.get_conn(registry, name, nodes, key, keyslot)
4860
end
4961

5062
defp handle_command_response({:ok, response}) do

lib/nebulex_redis_adapter/connection.ex

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,29 @@
11
defmodule NebulexRedisAdapter.Connection do
22
@moduledoc false
33

4+
alias NebulexRedisAdapter.Pool
5+
6+
@typedoc "Proxy type to the adapter meta"
7+
@type adapter_meta :: Nebulex.Adapter.metadata()
8+
49
## API
510

6-
@spec init(atom, pos_integer, Keyword.t()) :: {:ok, [Supervisor.child_spec()]}
7-
def init(name, pool_size, opts) do
8-
children =
9-
for i <- 0..(pool_size - 1) do
10-
child_spec([name: :"#{name}.#{i}"] ++ opts)
11-
end
11+
@spec init(adapter_meta, Keyword.t()) :: {Supervisor.child_spec(), adapter_meta}
12+
def init(%{name: name, registry: registry, pool_size: pool_size} = adapter_meta, opts) do
13+
connections_specs =
14+
Pool.register_names(registry, name, pool_size, fn conn_name ->
15+
opts
16+
|> Keyword.put(:name, conn_name)
17+
|> child_spec()
18+
end)
19+
20+
connections_supervisor_spec = %{
21+
id: :connections_supervisor,
22+
type: :supervisor,
23+
start: {Supervisor, :start_link, [connections_specs, [strategy: :one_for_one]]}
24+
}
1225

13-
{:ok, children}
26+
{connections_supervisor_spec, adapter_meta}
1427
end
1528

1629
@spec child_spec(Keyword.t()) :: Supervisor.child_spec()

lib/nebulex_redis_adapter/pool.ex

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,21 @@ defmodule NebulexRedisAdapter.Pool do
33

44
## API
55

6-
@spec children(module, Keyword.t()) :: [Supervisor.child_spec()]
7-
def children(module, opts) do
8-
name = Keyword.fetch!(opts, :name)
9-
pool_size = opts[:pool_size] || System.schedulers_online()
10-
6+
@spec register_names(atom, term, pos_integer, ({:via, module, term} -> term)) :: [term]
7+
def register_names(registry, key, pool_size, fun) do
118
for index <- 0..(pool_size - 1) do
12-
opts = Keyword.put(opts, :name, :"#{name}.#{index}")
13-
{module, opts}
9+
fun.({:via, Registry, {registry, {key, index}}})
1410
end
1511
end
1612

17-
@spec get_conn(atom, pos_integer) :: atom
18-
def get_conn(name, pool_size) do
19-
# ensure to select the same connection based on the caller PID
20-
:"#{name}.#{:erlang.phash2(self(), pool_size)}"
13+
@spec get_conn(atom, term, pos_integer) :: pid
14+
def get_conn(registry, key, pool_size) do
15+
# Ensure selecting the same connection based on the caller PID
16+
index = :erlang.phash2(self(), pool_size)
17+
18+
registry
19+
|> Registry.lookup({key, index})
20+
|> hd()
21+
|> elem(0)
2122
end
2223
end

0 commit comments

Comments
 (0)