Skip to content

Commit 01526df

Browse files
committed
ExHashRing for distributing the keys across the cluster members
1 parent 63f4d0c commit 01526df

File tree

19 files changed

+337
-349
lines changed

19 files changed

+337
-349
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ jobs:
3333
otp: 25.x
3434
os: 'ubuntu-latest'
3535
- elixir: 1.14.x
36-
otp: 24.x
36+
otp: 25.x
3737
os: 'ubuntu-latest'
3838

3939
env:

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,22 @@
1212
- [Nebulex.Adapters.Partitioned] The adapter implements the new Nebulex v3 API.
1313
- [Nebulex.Adapters.Partitioned] The adapter supports the `:timeout` option from
1414
`Nebulex.Cache`.
15+
- [Nebulex.Adapters.Partitioned] The adapter uses `ExHashRing` for distributing
16+
the keys across the cluster members.
17+
- [Nebulex.Adapters.Partitioned] he adapter supports the `:hash_ring` option to
18+
configute `ExHashRing`.
1519

1620
### Backwards incompatible changes
1721

1822
- [Nebulex.Adapters.Multilevel] Option `:model` is no longer supported. Please
1923
use the option `:inclusion_policy` instead.
2024
- [Nebulex.Adapters.Multilevel] The previous extended function `model/0,1` has
2125
been removed. Please use `inclusion_policy/0,1` instead.
26+
- [Nebulex.Adapters.Partitioned] The option `:keyslot` is no longer supported,
27+
so the `Nebulex.Adapter.Keyslot` behaviour has been removed. The partitioned
28+
adapter uses `ExHashRing` for key distribution under-the-hood.
29+
- [Nebulex.Adapters.Partitioned] The option `:join_timeout` is no longer
30+
supported.
2231
- [Nebulex.Distributed.RPC] The usage of async tasks for the RPC calls has been
2332
removed (for OTP < 23). The new `Nebulex.Distributed.RPC` module uses `:erpc`.
2433

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ the primary cache storage with the option `:primary_storage_adapter`.
2525

2626
## Installation
2727

28-
Add `:nebulex_distributed` to your list of dependencies in `mix.exs`:
28+
`:nebulex_distributed` requires Erlang/OTP 25 or later. Then add
29+
`:nebulex_distributed` to your list of dependencies in `mix.exs`:
2930

3031
```elixir
3132
def deps do

coveralls.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
"coverage_options": {
33
"minimum_coverage": 100
44
},
5-
65
"skip_files": [
6+
"lib/nebulex/distributed.ex",
77
"test/*"
88
]
9-
}
9+
}

lib/nebulex/adapter/keyslot.ex

Lines changed: 0 additions & 51 deletions
This file was deleted.

lib/nebulex/adapters/multilevel.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ defmodule Nebulex.Adapters.Multilevel do
561561
|> levels(ml_opts)
562562
|> Enum.reduce([node()], fn %{name: name, cache: cache}, acc ->
563563
if cache.__adapter__() in [Nebulex.Adapters.Partitioned, Nebulex.Adapters.Replicated] do
564-
Cluster.get_nodes(name || cache) ++ acc
564+
Cluster.pg_nodes(name || cache) ++ acc
565565
else
566566
acc
567567
end

lib/nebulex/adapters/partitioned.ex

Lines changed: 68 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ defmodule Nebulex.Adapters.Partitioned do
66
77
* Partitioned cache topology (Sharding Distribution Model).
88
* Configurable primary storage adapter.
9-
* Configurable Keyslot to distributed the keys across the cluster members.
9+
* `ExHashRing` for distributing the keys across the cluster members.
1010
* Support for transactions via Erlang global name registration facility.
1111
1212
## Partitioned Cache Topology
@@ -102,29 +102,10 @@ defmodule Nebulex.Adapters.Partitioned do
102102
primary_storage_adapter: Nebulex.Adapters.Local
103103
end
104104
105-
Also, you can provide a custom keyslot function:
106-
107-
defmodule MyApp.PartitionedCache do
108-
use Nebulex.Cache,
109-
otp_app: :my_app,
110-
adapter: Nebulex.Adapters.Partitioned,
111-
primary_storage_adapter: Nebulex.Adapters.Local
112-
113-
@behaviour Nebulex.Adapter.Keyslot
114-
115-
@impl true
116-
def hash_slot(key, range) do
117-
key
118-
|> :erlang.phash2()
119-
|> :jchash.compute(range)
120-
end
121-
end
122-
123105
Where the configuration for the cache must be in your application environment,
124106
usually defined in your `config/config.exs`:
125107
126108
config :my_app, MyApp.PartitionedCache,
127-
keyslot: MyApp.PartitionedCache,
128109
primary: [
129110
gc_interval: 3_600_000,
130111
backend: :shards
@@ -315,7 +296,9 @@ defmodule Nebulex.Adapters.Partitioned do
315296
316297
Get a cluster node based on the given `key`:
317298
318-
MyCache.get_node("mykey")
299+
MyCache.find_node("mykey")
300+
301+
MyCache.find_node!("mykey")
319302
320303
Joining the cache to the cluster:
321304
@@ -348,9 +331,6 @@ defmodule Nebulex.Adapters.Partitioned do
348331
# Inherit default observable implementation
349332
use Nebulex.Adapter.Observable
350333

351-
# Inherit default keyslot implementation
352-
use Nebulex.Adapter.Keyslot
353-
354334
import Nebulex.Adapter
355335
import Nebulex.Utils
356336

@@ -388,14 +368,30 @@ defmodule Nebulex.Adapters.Partitioned do
388368
A convenience function for getting the cluster nodes.
389369
"""
390370
def nodes(name \\ get_dynamic_cache()) do
391-
Cluster.get_nodes(name)
371+
name
372+
|> lookup_meta()
373+
|> get_in([:hash_ring, :name])
374+
|> Cluster.ring_nodes()
392375
end
393376

394377
@doc """
395378
A convenience function to get the node of the given `key`.
396379
"""
397-
def get_node(name \\ get_dynamic_cache(), key) do
398-
Cluster.get_node(name, key, lookup_meta(name).keyslot)
380+
def find_node(name \\ get_dynamic_cache(), key) do
381+
name
382+
|> lookup_meta()
383+
|> get_in([:hash_ring, :name])
384+
|> Cluster.find_node(key)
385+
end
386+
387+
@doc """
388+
Same as `find_node/2` but raises an error if an error occurs.
389+
"""
390+
def find_node!(name \\ get_dynamic_cache(), key) do
391+
case find_node(name, key) do
392+
{:ok, node} -> node
393+
{:error, _} = error -> raise error
394+
end
399395
end
400396

401397
@doc """
@@ -440,16 +436,19 @@ defmodule Nebulex.Adapters.Partitioned do
440436
do: [name: camelize_and_concat([name, Primary])] ++ primary_opts,
441437
else: primary_opts
442438

443-
# Keyslot module for selecting nodes
444-
keyslot = Keyword.fetch!(opts, :keyslot)
439+
# Hash ring options
440+
hash_ring =
441+
opts
442+
|> Keyword.fetch!(:hash_ring)
443+
|> Keyword.put_new_lazy(:name, fn -> camelize_and_concat([name, Ring]) end)
445444

446445
# Prepare metadata
447446
adapter_meta = %{
448447
telemetry_prefix: telemetry_prefix,
449448
telemetry: telemetry,
450449
name: name,
451450
primary_name: primary_opts[:name],
452-
keyslot: keyslot
451+
hash_ring: hash_ring
453452
}
454453

455454
# Prepare child spec
@@ -525,6 +524,9 @@ defmodule Nebulex.Adapters.Partitioned do
525524
end
526525

527526
case map_reduce(entries, adapter_meta, action, [opts], timeout, {true, []}, reducer) do
527+
{:error, _} = error ->
528+
error
529+
528530
{true, _} ->
529531
{:ok, true}
530532

@@ -638,12 +640,13 @@ defmodule Nebulex.Adapters.Partitioned do
638640
end
639641
end
640642

641-
defp do_execute(adapter_meta, %{op: op} = query, opts) do
643+
defp do_execute(%{hash_ring: hash_ring} = adapter_meta, %{op: op} = query, opts) do
644+
ring = Keyword.fetch!(hash_ring, :name)
642645
timeout = Keyword.fetch!(opts, :timeout)
643646
query = build_query(query)
644647

645648
RPC.multicall(
646-
Cluster.get_nodes(adapter_meta.name),
649+
Cluster.ring_nodes(ring),
647650
__MODULE__,
648651
:with_dynamic_cache,
649652
[adapter_meta, op, [query, opts]],
@@ -687,11 +690,13 @@ defmodule Nebulex.Adapters.Partitioned do
687690
## Nebulex.Adapter.Transaction
688691

689692
@impl true
690-
def transaction(adapter_meta, fun, opts) do
693+
def transaction(%{hash_ring: hash_ring} = adapter_meta, fun, opts) do
694+
ring = Keyword.fetch!(hash_ring, :name)
695+
691696
opts =
692697
opts
693698
|> Options.validate_common_runtime_opts!()
694-
|> Keyword.put(:nodes, Cluster.get_nodes(adapter_meta.name))
699+
|> Keyword.put(:nodes, Cluster.ring_nodes(ring))
695700

696701
super(adapter_meta, fun, opts)
697702
end
@@ -722,8 +727,10 @@ defmodule Nebulex.Adapters.Partitioned do
722727
super(adapter_meta, :server, opts)
723728
end
724729

725-
def info(adapter_meta, :nodes, _opts) do
726-
{:ok, Cluster.get_nodes(adapter_meta.name)}
730+
def info(%{hash_ring: hash_ring}, :nodes, _opts) do
731+
ring = Keyword.fetch!(hash_ring, :name)
732+
733+
{:ok, Cluster.ring_nodes(ring)}
727734
end
728735

729736
def info(adapter_meta, :nodes_info, opts) do
@@ -778,11 +785,12 @@ defmodule Nebulex.Adapters.Partitioned do
778785
end
779786
end
780787

781-
defp fetch_nodes_info(adapter_meta, spec, opts) do
788+
defp fetch_nodes_info(%{hash_ring: hash_ring} = adapter_meta, spec, opts) do
782789
opts = Options.validate_common_runtime_opts!(opts)
790+
ring = Keyword.fetch!(hash_ring, :name)
783791

784792
RPC.multicall(
785-
Cluster.get_nodes(adapter_meta.name),
793+
Cluster.ring_nodes(ring),
786794
__MODULE__,
787795
:with_dynamic_cache,
788796
[adapter_meta, :info, [spec, opts]],
@@ -839,16 +847,18 @@ defmodule Nebulex.Adapters.Partitioned do
839847
end
840848
end
841849

842-
defp get_node(%{name: name, keyslot: keyslot}, key) do
843-
Cluster.get_node(name, key, keyslot)
850+
defp find_node(%{hash_ring: hash_ring}, key) do
851+
hash_ring
852+
|> Keyword.fetch!(:name)
853+
|> Cluster.find_node(key)
844854
end
845855

846856
defp call(adapter_meta, key, action, args, opts) do
847857
timeout = Keyword.fetch!(opts, :timeout)
848858

849-
adapter_meta
850-
|> get_node(key)
851-
|> RPC.call(__MODULE__, :with_dynamic_cache, [adapter_meta, action, args], timeout)
859+
with {:ok, node} <- find_node(adapter_meta, key) do
860+
RPC.call(node, __MODULE__, :with_dynamic_cache, [adapter_meta, action, args], timeout)
861+
end
852862
end
853863

854864
defp map_reduce(enum, meta, action, args, timeout, acc, reducer, group_fun \\ & &1) do
@@ -858,14 +868,23 @@ defmodule Nebulex.Adapters.Partitioned do
858868
{node, {__MODULE__, :with_dynamic_cache, [meta, action, [group_fun.(group) | args]]}}
859869
end)
860870
|> RPC.multi_mfa_call(timeout, acc, reducer)
871+
catch
872+
error -> error
861873
end
862874

863875
defp group_by_node(enum, adapter_meta, action) when action in [:put_all, :put_new_all] do
864-
Enum.group_by(enum, &get_node(adapter_meta, elem(&1, 0)))
876+
Enum.group_by(enum, &find_node_or_throw(adapter_meta, elem(&1, 0)))
865877
end
866878

867879
defp group_by_node(enum, adapter_meta, _action) do
868-
Enum.group_by(enum, &get_node(adapter_meta, &1))
880+
Enum.group_by(enum, &find_node_or_throw(adapter_meta, &1))
881+
end
882+
883+
defp find_node_or_throw(adapter_meta, key) do
884+
case find_node(adapter_meta, key) do
885+
{:ok, node} -> node
886+
{:error, _} = error -> throw(error)
887+
end
869888
end
870889

871890
defp handle_rpc_multi_call({res, []}, _action, fun) do
@@ -879,9 +898,11 @@ defmodule Nebulex.Adapters.Partitioned do
879898
action: action
880899
end
881900

901+
## Error formatting
902+
882903
@doc false
883-
def format_error({:rpc, {:unexpected_errors, errors}}, opts) do
884-
action = Keyword.fetch!(opts, :action)
904+
def format_error({:rpc, {:unexpected_errors, errors}}, metadata) do
905+
action = Keyword.fetch!(metadata, :action)
885906

886907
formatted_errors =
887908
Enum.map_join(errors, "\n\n", fn {{:error, reason}, node} ->

0 commit comments

Comments
 (0)