Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions bench/bench_skiplist.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
open Saturn

let workload num_elems num_threads add remove =
let sl = Skiplist.create () in
let elems = Array.init num_elems (fun _ -> Random.int 10000) in
let push () =
Domain.spawn (fun () ->
let start_time = Unix.gettimeofday () in
for i = 0 to (num_elems - 1) / num_threads do
Domain.cpu_relax ();
let prob = Random.float 1.0 in
if prob < add then Skiplist.add sl (Random.int 10000) |> ignore
else if prob >= add && prob < add +. remove then
Skiplist.remove sl (Random.int 10000) |> ignore
else Skiplist.mem sl elems.(i) |> ignore
Copy link
Contributor

@polytypic polytypic Nov 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... The use of elems array here and above is a bit strange. A huge array is initialized and then only the fraction for a single thread is used. Why not use a Random.int here as well?

done;
start_time)
in
let threads = List.init num_threads (fun _ -> push ()) in
let start_time_threads =
List.map (fun domain -> Domain.join domain) threads
in
let end_time = Unix.gettimeofday () in
let time_diff = end_time -. List.nth start_time_threads 0 in
Copy link
Contributor

@polytypic polytypic Nov 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... The timing collection seems to just use the start time of the first domain and the time after joining all of the domains.

How about:

  1. Use a barrier to synchronize all the domains before their loops.
  2. Individually time the loop inside each domain and return that from each domain.

Various measures could then be calculated from the collection of timings. E.g. compute average of the times from each domain to get roughly the same kind of measurement as here, but taking all domains into account rather than the start time of the first domain and the end time (+ some) of the domain that finished last.

time_diff

(* A write heavy workload with threads with 50% adds and 50% removes. *)
let write_heavy_workload num_elems num_threads =
workload num_elems num_threads 0.5 0.5

(* A regular workload with 90% reads, 9% adds and 1% removes. *)
let read_heavy_workload num_elems num_threads =
workload num_elems num_threads 0.09 0.01

let moderate_heavy_workload num_elems num_threads =
workload num_elems num_threads 0.2 0.1

let balanced_heavy_workload num_elems num_threads =
workload num_elems num_threads 0.3 0.2

let bench ~workload_type ~num_elems ~num_threads () =
let workload =
if workload_type = "read_heavy" then read_heavy_workload
else if workload_type = "moderate_heavy" then moderate_heavy_workload
else if workload_type = "balanced_heavy" then balanced_heavy_workload
else write_heavy_workload
in
let results = ref [] in
for i = 1 to 10 do
let time = workload num_elems num_threads in
if i > 1 then results := time :: !results
done;
let results = List.sort Float.compare !results in
let median_time = List.nth results 4 in
let median_throughput = Float.of_int num_elems /. median_time in
Benchmark_result.create_generic ~median_time ~median_throughput
("atomic_skiplist_" ^ workload_type)
4 changes: 4 additions & 0 deletions bench/main.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ let benchmark_list =
Mpmc_queue.bench ~use_cas:true ~takers:4 ~pushers:4;
Mpmc_queue.bench ~use_cas:true ~takers:1 ~pushers:8;
Mpmc_queue.bench ~use_cas:true ~takers:8 ~pushers:1;
Bench_skiplist.bench ~workload_type:"read_heavy" ~num_elems:2000000
~num_threads:2;
Bench_skiplist.bench ~workload_type:"moderate_heavy" ~num_elems:2000000
~num_threads:2;
]
@ backoff_benchmarks

Expand Down
1 change: 1 addition & 0 deletions src/saturn.ml
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ module Single_prod_single_cons_queue =

module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Saturn_lockfree.Skiplist
module Backoff = Saturn_lockfree.Backoff
1 change: 1 addition & 0 deletions src/saturn.mli
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ module Single_prod_single_cons_queue =

module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Saturn_lockfree.Skiplist

module Backoff = Saturn_lockfree.Backoff
(** {2 Other} *)
1 change: 1 addition & 0 deletions src_lockfree/saturn_lockfree.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ module Work_stealing_deque = Ws_deque
module Single_prod_single_cons_queue = Spsc_queue
module Single_consumer_queue = Mpsc_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Skiplist
module Backoff = Backoff
1 change: 1 addition & 0 deletions src_lockfree/saturn_lockfree.mli
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ module Work_stealing_deque = Ws_deque
module Single_prod_single_cons_queue = Spsc_queue
module Single_consumer_queue = Mpsc_queue
module Relaxed_queue = Mpmc_relaxed_queue
module Skiplist = Skiplist

(** {2 Other} *)

Expand Down
193 changes: 193 additions & 0 deletions src_lockfree/skiplist.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
type 'a markable_reference = { node : 'a node; marked : bool }

and 'a node = {
key : 'a;
height : int;
next : 'a markable_reference Atomic.t array;
}

exception Failed_snip

type 'a t = { head : 'a node; max_height : int }

let min = Obj.new_block 5 5 |> Obj.obj
let max = Obj.new_block 5 5 |> Obj.obj
let null_node = { key = max; height = 0; next = [||] }

let[@inline] create_dummy_node_array sl =
Array.make (sl.max_height + 1) null_node

let[@inline] create_new_node value height =
let next =
Array.init (height + 1) (fun _ ->
Atomic.make { node = null_node; marked = false })
in
{ key = value; height; next }

let[@inline] get_random_level sl =
let rec count_level cur_level =
if Random.bool () then cur_level
else if cur_level == sl.max_height then count_level 0
else count_level (cur_level + 1)
in
if sl.max_height = 0 then 0 else count_level 0

let create ?(max_height = 10) () =
let max_height = Int.max max_height 1 in
let tail = create_new_node max (max_height - 1) in
let next =
Array.init max_height (fun _ -> Atomic.make { node = tail; marked = false })
in
let head = { key = min; height = max_height - 1; next } in
{ head; max_height = max_height - 1 }

(** Compares old_node and old_mark with the atomic reference and if they are the same then
Replaces the value in the atomic with node and mark *)
let compare_and_set_mark_ref (atomic, old_node, old_mark, node, mark) =
let ({ node = current_node; marked = current_marked } as current) =
Atomic.get atomic
in
current_node == old_node && current_marked = old_mark
&& ((current_node == node && current_marked = mark)
|| Atomic.compare_and_set atomic current { node; marked = mark })

(** Returns true if key is found within the skiplist else false;
Irrespective of return value, fills the preds and succs array with
the predecessors nodes with smaller key and successors nodes with greater than
or equal to key
*)
let find_in (key, preds, succs, sl) =
let head = sl.head in
let rec iterate (prev, curr, succ, mark, level) =
if mark then
let snip =
compare_and_set_mark_ref (prev.next.(level), curr, false, succ, false)
in
if not snip then raise Failed_snip
else
let { node = new_succ; marked = mark } = Atomic.get succ.next.(level) in
iterate (prev, succ, new_succ, mark, level)
else if curr.key != max && curr.key < key then
let { node = new_succ; marked = mark } = Atomic.get succ.next.(level) in
iterate (curr, succ, new_succ, mark, level)
else (prev, curr)
in
let rec update_arrays prev level =
let { node = curr; marked = _ } = Atomic.get prev.next.(level) in
let { node = succ; marked = mark } = Atomic.get curr.next.(level) in
try
let prev, curr = iterate (prev, curr, succ, mark, level) in
preds.(level) <- prev;
succs.(level) <- curr;
if level > 0 then update_arrays prev (level - 1)
else if curr.key == max then false
else curr.key = key
with Failed_snip -> update_arrays head sl.max_height
in
update_arrays head sl.max_height

(** Adds a new key to the skiplist sl. *)
let add sl key =
let top_level = get_random_level sl in
let preds = create_dummy_node_array sl in
let succs = create_dummy_node_array sl in
let rec repeat () =
let found = find_in (key, preds, succs, sl) in
if found then false
else
let new_node_next =
Array.map
(fun element ->
let mark_ref = { node = element; marked = false } in
Atomic.make mark_ref)
succs
Copy link
Contributor

@polytypic polytypic Nov 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Aside from the array created here being too large, I believe there might be a more subtle space leak issue.

What happens here is that a new node is being constructed and as a part of that an array of references to other nodes is created. A particular reference of this array is not updated during add nor are the references in the array created here subject to updates until a reference to the new_node is added to a predecessor node at a specific level (except when the node being added might be removed after it has been linked on level 0).

Consider the following scenario:

  1. A domain performing add is suspended after creating the new_node_next array.
  2. Another domain removes one of the nodes to which the new_node_next array has a reference at a non-zero level.
  3. The domain performing add is resumed and completes the operation.

What will happen then that the add will notice that a successor node was removed at around line 118+ as the compare_and_set_mark_ref fails. The add will then call find_in to update the preds and succs. This will not, however, update the reference in the new_node_next array, which was created based on an earlier succs. That is because the reference is at a level on which the new_node is not yet attached to the skip list (that is because the compare_and_set_mark_ref failed).

This means that after add returns, the new_node has been added to the skip list and the new_node contains a reference to a removed node. This means that the key contained in that removed node cannot be garbage collected. It will remain in memory until some call to find_in will notice that the removed node (due to marked references) and removes it. But there is no guarantee such a call will happen. It might never happen.

Am I missing something?

in
let new_node = { key; height = top_level; next = new_node_next } in
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The next array of the new node is initialized to contain as many elements as the maximum number of levels, but only a part (according to height) of those are actually used.

let pred = preds.(0) in
let succ = succs.(0) in
if
not
(compare_and_set_mark_ref
(pred.next.(0), succ, false, new_node, false))
then repeat ()
else
let rec update_levels level =
let rec set_next () =
let pred = preds.(level) in
let succ = succs.(level) in
if
compare_and_set_mark_ref
(pred.next.(level), succ, false, new_node, false)
then ()
else (
find_in (key, preds, succs, sl) |> ignore;
set_next ())
in
set_next ();
if level < top_level then update_levels (level + 1)
in
if top_level > 0 then update_levels 1;
true
in
repeat ()

let mem sl key =
let rec search (pred, curr, succ, mark, level) =
if mark then
let { node = new_succ; marked = mark } = Atomic.get succ.next.(level) in
search (pred, succ, new_succ, mark, level)
else if curr.key != max && curr.key < key then
let { node = new_succ; marked = mark } = Atomic.get succ.next.(level) in
search (curr, succ, new_succ, mark, level)
else if level > 0 then
let level = level - 1 in
let { node = curr; marked = _ } = Atomic.get pred.next.(level) in
let { node = succ; marked = mark } = Atomic.get curr.next.(level) in
search (pred, curr, succ, mark, level)
else if curr.key == max then false
else curr.key = key
in
let pred = sl.head in
let { node = curr; marked = _ } = Atomic.get pred.next.(sl.max_height) in
let { node = succ; marked = mark } = Atomic.get curr.next.(sl.max_height) in
search (pred, curr, succ, mark, sl.max_height)

let remove sl key =
let preds = create_dummy_node_array sl in
let succs = create_dummy_node_array sl in
let found = find_in (key, preds, succs, sl) in
if not found then false
else
let nodeToRemove = succs.(0) in
let nodeHeight = nodeToRemove.height in
let rec mark_levels succ level =
let _ =
compare_and_set_mark_ref
(nodeToRemove.next.(level), succ, false, succ, true)
in
let { node = succ; marked = mark } =
Atomic.get nodeToRemove.next.(level)
in
if not mark then mark_levels succ level
in
let rec update_upper_levels level =
let { node = succ; marked = mark } =
Atomic.get nodeToRemove.next.(level)
in
if not mark then mark_levels succ level;
if level > 1 then update_upper_levels (level - 1)
in
let rec update_bottom_level succ =
let iMarkedIt =
compare_and_set_mark_ref (nodeToRemove.next.(0), succ, false, succ, true)
in
let { node = succ; marked = mark } = Atomic.get succs.(0).next.(0) in
if iMarkedIt then (
find_in (key, preds, succs, sl) |> ignore;
true)
else if mark then false
else update_bottom_level succ
in
if nodeHeight > 0 then update_upper_levels nodeHeight;
let { node = succ; marked = _ } = Atomic.get nodeToRemove.next.(0) in
update_bottom_level succ
24 changes: 24 additions & 0 deletions src_lockfree/skiplist.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
(** Skiplist TODO

[key] values are compared with [=] and thus should not be functions or
objects.
*)

type 'a t
(** The type of lock-free skiplist. *)

val create : ?max_height:int -> unit -> 'a t
(** [create ~max_height ()] returns a new empty skiplist. [~max_height] is the
number of level used to distribute nodes. Its default value is 10 by default
and can not be less than 1. *)

val add : 'a t -> 'a -> bool
(** [add s v] adds [v] to [s] if [v] is not already in [s] and returns
[true]. If [v] is already in [s], it returns [false] and [v] is unchanged. *)

val remove : 'a t -> 'a -> bool
(** [remove s v] removes [v] of [s] if [v] is in [s] and returns [true]. If [v]
is not in [s], it returns [false] and [v] is unchanged. *)

val mem : 'a t -> 'a -> bool
(** [mem s v] returns [true] if v is in s and [false] otherwise. *)
22 changes: 22 additions & 0 deletions test/skiplist/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
(rule
(copy ../../src_lockfree/backoff.ml backoff.ml))

(rule
(copy ../../src_lockfree/skiplist.ml skiplist.ml))

(test
(name skiplist_dscheck)
(libraries atomic dscheck alcotest)
(modules skiplist skiplist_dscheck))

(test
(name qcheck_skiplist)
(libraries saturn qcheck qcheck-alcotest)
(modules qcheck_skiplist))

(test
(name stm_skiplist)
(modules stm_skiplist)
(libraries saturn qcheck-stm.sequential qcheck-stm.domain)
(action
(run %{test} --verbose)))
Loading