diff --git a/bench/bench_skiplist.ml b/bench/bench_skiplist.ml new file mode 100644 index 00000000..285dc450 --- /dev/null +++ b/bench/bench_skiplist.ml @@ -0,0 +1,57 @@ +open Saturn + +let workload num_elems num_threads add remove = + let sl = Skiplist.create () in + let elems = Array.init num_elems (fun _ -> Random.int 10000) in + let push () = + Domain.spawn (fun () -> + let start_time = Unix.gettimeofday () in + for i = 0 to (num_elems - 1) / num_threads do + Domain.cpu_relax (); + let prob = Random.float 1.0 in + if prob < add then Skiplist.add sl (Random.int 10000) |> ignore + else if prob >= add && prob < add +. remove then + Skiplist.remove sl (Random.int 10000) |> ignore + else Skiplist.mem sl elems.(i) |> ignore + done; + start_time) + in + let threads = List.init num_threads (fun _ -> push ()) in + let start_time_threads = + List.map (fun domain -> Domain.join domain) threads + in + let end_time = Unix.gettimeofday () in + let time_diff = end_time -. List.nth start_time_threads 0 in + time_diff + +(* A write heavy workload with threads with 50% adds and 50% removes. *) +let write_heavy_workload num_elems num_threads = + workload num_elems num_threads 0.5 0.5 + +(* A regular workload with 90% reads, 9% adds and 1% removes. *) +let read_heavy_workload num_elems num_threads = + workload num_elems num_threads 0.09 0.01 + +let moderate_heavy_workload num_elems num_threads = + workload num_elems num_threads 0.2 0.1 + +let balanced_heavy_workload num_elems num_threads = + workload num_elems num_threads 0.3 0.2 + +let bench ~workload_type ~num_elems ~num_threads () = + let workload = + if workload_type = "read_heavy" then read_heavy_workload + else if workload_type = "moderate_heavy" then moderate_heavy_workload + else if workload_type = "balanced_heavy" then balanced_heavy_workload + else write_heavy_workload + in + let results = ref [] in + for i = 1 to 10 do + let time = workload num_elems num_threads in + if i > 1 then results := time :: !results + done; + let results = List.sort Float.compare !results in + let median_time = List.nth results 4 in + let median_throughput = Float.of_int num_elems /. median_time in + Benchmark_result.create_generic ~median_time ~median_throughput + ("atomic_skiplist_" ^ workload_type) diff --git a/bench/main.ml b/bench/main.ml index 95632970..ecae2a54 100644 --- a/bench/main.ml +++ b/bench/main.ml @@ -16,6 +16,10 @@ let benchmark_list = Mpmc_queue.bench ~use_cas:true ~takers:4 ~pushers:4; Mpmc_queue.bench ~use_cas:true ~takers:1 ~pushers:8; Mpmc_queue.bench ~use_cas:true ~takers:8 ~pushers:1; + Bench_skiplist.bench ~workload_type:"read_heavy" ~num_elems:2000000 + ~num_threads:2; + Bench_skiplist.bench ~workload_type:"moderate_heavy" ~num_elems:2000000 + ~num_threads:2; ] @ backoff_benchmarks diff --git a/src/saturn.ml b/src/saturn.ml index d3b71e46..e0f53cad 100644 --- a/src/saturn.ml +++ b/src/saturn.ml @@ -35,4 +35,5 @@ module Single_prod_single_cons_queue = module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue module Relaxed_queue = Mpmc_relaxed_queue +module Skiplist = Saturn_lockfree.Skiplist module Backoff = Saturn_lockfree.Backoff diff --git a/src/saturn.mli b/src/saturn.mli index 1a9d56f5..b68db595 100644 --- a/src/saturn.mli +++ b/src/saturn.mli @@ -39,6 +39,7 @@ module Single_prod_single_cons_queue = module Single_consumer_queue = Saturn_lockfree.Single_consumer_queue module Relaxed_queue = Mpmc_relaxed_queue +module Skiplist = Saturn_lockfree.Skiplist module Backoff = Saturn_lockfree.Backoff (** {2 Other} *) diff --git a/src_lockfree/saturn_lockfree.ml b/src_lockfree/saturn_lockfree.ml index ca2e063a..fcbf4fbf 100644 --- a/src_lockfree/saturn_lockfree.ml +++ b/src_lockfree/saturn_lockfree.ml @@ -32,4 +32,5 @@ module Work_stealing_deque = Ws_deque module Single_prod_single_cons_queue = Spsc_queue module Single_consumer_queue = Mpsc_queue module Relaxed_queue = Mpmc_relaxed_queue +module Skiplist = Skiplist module Backoff = Backoff diff --git a/src_lockfree/saturn_lockfree.mli b/src_lockfree/saturn_lockfree.mli index d70939f5..683eb6a9 100644 --- a/src_lockfree/saturn_lockfree.mli +++ b/src_lockfree/saturn_lockfree.mli @@ -36,6 +36,7 @@ module Work_stealing_deque = Ws_deque module Single_prod_single_cons_queue = Spsc_queue module Single_consumer_queue = Mpsc_queue module Relaxed_queue = Mpmc_relaxed_queue +module Skiplist = Skiplist (** {2 Other} *) diff --git a/src_lockfree/skiplist.ml b/src_lockfree/skiplist.ml new file mode 100644 index 00000000..936eb82b --- /dev/null +++ b/src_lockfree/skiplist.ml @@ -0,0 +1,193 @@ +type 'a markable_reference = { node : 'a node; marked : bool } + +and 'a node = { + key : 'a; + height : int; + next : 'a markable_reference Atomic.t array; +} + +exception Failed_snip + +type 'a t = { head : 'a node; max_height : int } + +let min = Obj.new_block 5 5 |> Obj.obj +let max = Obj.new_block 5 5 |> Obj.obj +let null_node = { key = max; height = 0; next = [||] } + +let[@inline] create_dummy_node_array sl = + Array.make (sl.max_height + 1) null_node + +let[@inline] create_new_node value height = + let next = + Array.init (height + 1) (fun _ -> + Atomic.make { node = null_node; marked = false }) + in + { key = value; height; next } + +let[@inline] get_random_level sl = + let rec count_level cur_level = + if Random.bool () then cur_level + else if cur_level == sl.max_height then count_level 0 + else count_level (cur_level + 1) + in + if sl.max_height = 0 then 0 else count_level 0 + +let create ?(max_height = 10) () = + let max_height = Int.max max_height 1 in + let tail = create_new_node max (max_height - 1) in + let next = + Array.init max_height (fun _ -> Atomic.make { node = tail; marked = false }) + in + let head = { key = min; height = max_height - 1; next } in + { head; max_height = max_height - 1 } + +(** Compares old_node and old_mark with the atomic reference and if they are the same then + Replaces the value in the atomic with node and mark *) +let compare_and_set_mark_ref (atomic, old_node, old_mark, node, mark) = + let ({ node = current_node; marked = current_marked } as current) = + Atomic.get atomic + in + current_node == old_node && current_marked = old_mark + && ((current_node == node && current_marked = mark) + || Atomic.compare_and_set atomic current { node; marked = mark }) + +(** Returns true if key is found within the skiplist else false; + Irrespective of return value, fills the preds and succs array with + the predecessors nodes with smaller key and successors nodes with greater than + or equal to key + *) +let find_in (key, preds, succs, sl) = + let head = sl.head in + let rec iterate (prev, curr, succ, mark, level) = + if mark then + let snip = + compare_and_set_mark_ref (prev.next.(level), curr, false, succ, false) + in + if not snip then raise Failed_snip + else + let { node = new_succ; marked = mark } = Atomic.get succ.next.(level) in + iterate (prev, succ, new_succ, mark, level) + else if curr.key != max && curr.key < key then + let { node = new_succ; marked = mark } = Atomic.get succ.next.(level) in + iterate (curr, succ, new_succ, mark, level) + else (prev, curr) + in + let rec update_arrays prev level = + let { node = curr; marked = _ } = Atomic.get prev.next.(level) in + let { node = succ; marked = mark } = Atomic.get curr.next.(level) in + try + let prev, curr = iterate (prev, curr, succ, mark, level) in + preds.(level) <- prev; + succs.(level) <- curr; + if level > 0 then update_arrays prev (level - 1) + else if curr.key == max then false + else curr.key = key + with Failed_snip -> update_arrays head sl.max_height + in + update_arrays head sl.max_height + +(** Adds a new key to the skiplist sl. *) +let add sl key = + let top_level = get_random_level sl in + let preds = create_dummy_node_array sl in + let succs = create_dummy_node_array sl in + let rec repeat () = + let found = find_in (key, preds, succs, sl) in + if found then false + else + let new_node_next = + Array.map + (fun element -> + let mark_ref = { node = element; marked = false } in + Atomic.make mark_ref) + succs + in + let new_node = { key; height = top_level; next = new_node_next } in + let pred = preds.(0) in + let succ = succs.(0) in + if + not + (compare_and_set_mark_ref + (pred.next.(0), succ, false, new_node, false)) + then repeat () + else + let rec update_levels level = + let rec set_next () = + let pred = preds.(level) in + let succ = succs.(level) in + if + compare_and_set_mark_ref + (pred.next.(level), succ, false, new_node, false) + then () + else ( + find_in (key, preds, succs, sl) |> ignore; + set_next ()) + in + set_next (); + if level < top_level then update_levels (level + 1) + in + if top_level > 0 then update_levels 1; + true + in + repeat () + +let mem sl key = + let rec search (pred, curr, succ, mark, level) = + if mark then + let { node = new_succ; marked = mark } = Atomic.get succ.next.(level) in + search (pred, succ, new_succ, mark, level) + else if curr.key != max && curr.key < key then + let { node = new_succ; marked = mark } = Atomic.get succ.next.(level) in + search (curr, succ, new_succ, mark, level) + else if level > 0 then + let level = level - 1 in + let { node = curr; marked = _ } = Atomic.get pred.next.(level) in + let { node = succ; marked = mark } = Atomic.get curr.next.(level) in + search (pred, curr, succ, mark, level) + else if curr.key == max then false + else curr.key = key + in + let pred = sl.head in + let { node = curr; marked = _ } = Atomic.get pred.next.(sl.max_height) in + let { node = succ; marked = mark } = Atomic.get curr.next.(sl.max_height) in + search (pred, curr, succ, mark, sl.max_height) + +let remove sl key = + let preds = create_dummy_node_array sl in + let succs = create_dummy_node_array sl in + let found = find_in (key, preds, succs, sl) in + if not found then false + else + let nodeToRemove = succs.(0) in + let nodeHeight = nodeToRemove.height in + let rec mark_levels succ level = + let _ = + compare_and_set_mark_ref + (nodeToRemove.next.(level), succ, false, succ, true) + in + let { node = succ; marked = mark } = + Atomic.get nodeToRemove.next.(level) + in + if not mark then mark_levels succ level + in + let rec update_upper_levels level = + let { node = succ; marked = mark } = + Atomic.get nodeToRemove.next.(level) + in + if not mark then mark_levels succ level; + if level > 1 then update_upper_levels (level - 1) + in + let rec update_bottom_level succ = + let iMarkedIt = + compare_and_set_mark_ref (nodeToRemove.next.(0), succ, false, succ, true) + in + let { node = succ; marked = mark } = Atomic.get succs.(0).next.(0) in + if iMarkedIt then ( + find_in (key, preds, succs, sl) |> ignore; + true) + else if mark then false + else update_bottom_level succ + in + if nodeHeight > 0 then update_upper_levels nodeHeight; + let { node = succ; marked = _ } = Atomic.get nodeToRemove.next.(0) in + update_bottom_level succ diff --git a/src_lockfree/skiplist.mli b/src_lockfree/skiplist.mli new file mode 100644 index 00000000..bc18575e --- /dev/null +++ b/src_lockfree/skiplist.mli @@ -0,0 +1,24 @@ +(** Skiplist TODO + + [key] values are compared with [=] and thus should not be functions or + objects. +*) + +type 'a t +(** The type of lock-free skiplist. *) + +val create : ?max_height:int -> unit -> 'a t +(** [create ~max_height ()] returns a new empty skiplist. [~max_height] is the + number of level used to distribute nodes. Its default value is 10 by default + and can not be less than 1. *) + +val add : 'a t -> 'a -> bool +(** [add s v] adds [v] to [s] if [v] is not already in [s] and returns + [true]. If [v] is already in [s], it returns [false] and [v] is unchanged. *) + +val remove : 'a t -> 'a -> bool +(** [remove s v] removes [v] of [s] if [v] is in [s] and returns [true]. If [v] + is not in [s], it returns [false] and [v] is unchanged. *) + +val mem : 'a t -> 'a -> bool +(** [mem s v] returns [true] if v is in s and [false] otherwise. *) diff --git a/test/skiplist/dune b/test/skiplist/dune new file mode 100644 index 00000000..3ffaa883 --- /dev/null +++ b/test/skiplist/dune @@ -0,0 +1,22 @@ +(rule + (copy ../../src_lockfree/backoff.ml backoff.ml)) + +(rule + (copy ../../src_lockfree/skiplist.ml skiplist.ml)) + +(test + (name skiplist_dscheck) + (libraries atomic dscheck alcotest) + (modules skiplist skiplist_dscheck)) + +(test + (name qcheck_skiplist) + (libraries saturn qcheck qcheck-alcotest) + (modules qcheck_skiplist)) + +(test + (name stm_skiplist) + (modules stm_skiplist) + (libraries saturn qcheck-stm.sequential qcheck-stm.domain) + (action + (run %{test} --verbose))) diff --git a/test/skiplist/qcheck_skiplist.ml b/test/skiplist/qcheck_skiplist.ml new file mode 100644 index 00000000..94b65f81 --- /dev/null +++ b/test/skiplist/qcheck_skiplist.ml @@ -0,0 +1,182 @@ +module Skiplist = Saturn.Skiplist + +let tests_sequential = + QCheck. + [ + (* TEST 1: add*) + Test.make ~name:"add" (list int) (fun lpush -> + assume (lpush <> []); + let sl = Skiplist.create () in + let rec add_all_elems l = + match l with + | h :: t -> if Skiplist.add sl h then add_all_elems t else false + | [] -> true + in + add_all_elems lpush); + (*TEST 2: add_remove*) + Test.make ~name:"add_remove" (list int) (fun lpush -> + let lpush = List.sort_uniq Int.compare lpush in + let sl = Skiplist.create () in + List.iter (fun key -> ignore (Skiplist.add sl key)) lpush; + let rec remove_all_elems l = + match l with + | h :: t -> + if Skiplist.remove sl h then remove_all_elems t else false + | [] -> true + in + remove_all_elems lpush); + (*TEST 3: add_find*) + Test.make ~name:"add_find" (list int) (fun lpush -> + let lpush = List.sort_uniq Int.compare lpush in + let lpush = Array.of_list lpush in + let sl = Skiplist.create () in + let len = Array.length lpush in + let pos = Array.sub lpush 0 (len / 2) in + let neg = Array.sub lpush (len / 2) (len / 2) in + Array.iter (fun key -> ignore @@ Skiplist.add sl key) pos; + let rec check_pos index = + if index < len / 2 then + if Skiplist.mem sl pos.(index) then check_pos (index + 1) + else false + else true + in + let rec check_neg index = + if index < len / 2 then + if not @@ Skiplist.mem sl neg.(index) then check_neg (index + 1) + else false + else true + in + check_pos 0 && check_neg 0); + (* TEST 4: add_remove_find *) + Test.make ~name:"add_remove_find" (list int) (fun lpush -> + let lpush = List.sort_uniq Int.compare lpush in + let sl = Skiplist.create () in + List.iter (fun key -> ignore @@ Skiplist.add sl key) lpush; + List.iter (fun key -> ignore @@ Skiplist.remove sl key) lpush; + let rec not_find_all_elems l = + match l with + | h :: t -> + if not @@ Skiplist.mem sl h then not_find_all_elems t else false + | [] -> true + in + + not_find_all_elems lpush); + ] + +let tests_two_domains = + QCheck. + [ + (* TEST 1: Two domains doing multiple adds *) + Test.make ~name:"parallel_add" (pair small_nat small_nat) + (fun (npush1, npush2) -> + let sl = Skiplist.create () in + let sema = Semaphore.Binary.make false in + let lpush1 = List.init npush1 (fun i -> i) in + let lpush2 = List.init npush2 (fun i -> i + npush1) in + let work lpush = + List.map + (fun elt -> + let completed = Skiplist.add sl elt in + Domain.cpu_relax (); + completed) + lpush + in + + let domain1 = + Domain.spawn (fun () -> + Semaphore.Binary.release sema; + work lpush1) + in + let popped2 = + while not (Semaphore.Binary.try_acquire sema) do + Domain.cpu_relax () + done; + work lpush2 + in + let popped1 = Domain.join domain1 in + let rec compare_all_true l = + match l with + | true :: t -> compare_all_true t + | false :: _ -> false + | [] -> true + in + compare_all_true popped1 && compare_all_true popped2); + (* TEST 2: Two domains doing multiple one push and one pop in parallel *) + Test.make ~count:10000 ~name:"parallel_add_remove" + (pair small_nat small_nat) (fun (npush1, npush2) -> + let sl = Skiplist.create () in + let sema = Semaphore.Binary.make false in + + let lpush1 = List.init npush1 (fun i -> i) in + let lpush2 = List.init npush2 (fun i -> i + npush1) in + + let work lpush = + List.map + (fun elt -> + ignore @@ Skiplist.add sl elt; + Domain.cpu_relax (); + Skiplist.remove sl elt) + lpush + in + + let domain1 = + Domain.spawn (fun () -> + Semaphore.Binary.release sema; + work lpush1) + in + let _ = + while not (Semaphore.Binary.try_acquire sema) do + Domain.cpu_relax () + done; + work lpush2 + in + let _ = Domain.join domain1 in + + let rec check_none_present l = + match l with + | h :: t -> + if Skiplist.mem sl h then false else check_none_present t + | [] -> true + in + check_none_present lpush1 && check_none_present lpush2); + (* TEST 3: Parallel push and pop using the same elements in two domains + *) + Test.make ~name:"parallel_add_remove_same_list" (list int) (fun lpush -> + let sl = Skiplist.create () in + let sema = Semaphore.Binary.make false in + let add_all_elems l = List.map (Skiplist.add sl) l in + let remove_all_elems l = List.map (Skiplist.remove sl) l in + + let domain1 = + Domain.spawn (fun () -> + Semaphore.Binary.release sema; + Domain.cpu_relax (); + let add1 = add_all_elems lpush in + let remove1 = remove_all_elems lpush in + (add1, remove1)) + in + let _, _ = + while not (Semaphore.Binary.try_acquire sema) do + Domain.cpu_relax () + done; + let add2 = add_all_elems lpush in + let remove2 = remove_all_elems lpush in + (add2, remove2) + in + let _, _ = Domain.join domain1 in + let rec check_none_present l = + match l with + | h :: t -> + if Skiplist.mem sl h then false else check_none_present t + | [] -> true + in + check_none_present lpush); + ] + +let () = + let to_alcotest = List.map QCheck_alcotest.to_alcotest in + Alcotest.run "Skip List" + [ + ("test_sequential", to_alcotest tests_sequential); + ("tests_two_domains", to_alcotest tests_two_domains); + ] diff --git a/test/skiplist/skiplist_dscheck.ml b/test/skiplist/skiplist_dscheck.ml new file mode 100644 index 00000000..283ced0e --- /dev/null +++ b/test/skiplist/skiplist_dscheck.ml @@ -0,0 +1,97 @@ +open Skiplist + +let _two_mem () = + Atomic.trace (fun () -> + Random.init 0; + let sl = create ~max_height:2 () in + let added1 = ref false in + let found1 = ref false in + let found2 = ref false in + + Atomic.spawn (fun () -> + added1 := add sl 1; + found1 := mem sl 1); + + Atomic.spawn (fun () -> found2 := mem sl 2); + + Atomic.final (fun () -> + Atomic.check (fun () -> !added1 && !found1 && not !found2))) + +let _two_add () = + Atomic.trace (fun () -> + Random.init 0; + let sl = create ~max_height:3 () in + let added1 = ref false in + let added2 = ref false in + + Atomic.spawn (fun () -> added1 := add sl 1); + Atomic.spawn (fun () -> added2 := add sl 2); + + Atomic.final (fun () -> + Atomic.check (fun () -> !added1 && !added2 && mem sl 1 && mem sl 2))) + +let _two_add_same () = + Atomic.trace (fun () -> + Random.init 0; + let sl = create ~max_height:3 () in + let added1 = ref false in + let added2 = ref false in + + Atomic.spawn (fun () -> added1 := add sl 1); + Atomic.spawn (fun () -> added2 := add sl 1); + + Atomic.final (fun () -> + Atomic.check (fun () -> + (!added1 && not !added2) + || (((not !added1) && !added2) && mem sl 1)))) + +let _two_remove_same () = + Atomic.trace (fun () -> + Random.init 0; + let sl = create ~max_height:2 () in + let added1 = ref false in + let removed1 = ref false in + let removed2 = ref false in + + Atomic.spawn (fun () -> + added1 := add sl 1; + removed1 := remove sl 1); + Atomic.spawn (fun () -> removed2 := remove sl 1); + + Atomic.final (fun () -> + Atomic.check (fun () -> + !added1 + && ((!removed1 && not !removed2) || ((not !removed1) && !removed2)) + && not (mem sl 1)))) + +let _two_remove () = + Atomic.trace (fun () -> + Random.init 0; + let sl = create ~max_height:2 () in + let added1 = ref false in + let removed1 = ref false in + let removed2 = ref false in + + Atomic.spawn (fun () -> + added1 := add sl 1; + removed1 := remove sl 1); + Atomic.spawn (fun () -> removed2 := remove sl 2); + + Atomic.final (fun () -> + Atomic.check (fun () -> + let found1 = mem sl 1 in + !added1 && !removed1 && (not !removed2) && not found1))) + +let () = + let open Alcotest in + run "skiplist_dscheck" + [ + ( "basic", + [ + test_case "2-mem" `Slow _two_mem; + test_case "2-add-same" `Slow _two_add_same; + test_case "2-add" `Slow _two_add; + test_case "2-remove-same" `Slow _two_remove_same; + test_case "2-remove" `Slow _two_remove; + ] ); + ] diff --git a/test/skiplist/stm_skiplist.ml b/test/skiplist/stm_skiplist.ml new file mode 100644 index 00000000..0831c4c9 --- /dev/null +++ b/test/skiplist/stm_skiplist.ml @@ -0,0 +1,70 @@ +(** Sequential and Parallel model-based tests of ws_deque *) + +open QCheck +open STM +module Skiplist = Saturn.Skiplist + +module WSDConf = struct + type cmd = Mem of int | Add of int | Remove of int + + let show_cmd c = + match c with + | Mem i -> "Mem " ^ string_of_int i + | Add i -> "Add " ^ string_of_int i + | Remove i -> "Remove " ^ string_of_int i + + module Sint = Set.Make (struct + type t = int + + let compare = compare + end) + + type state = Sint.t + type sut = int Skiplist.t + + let arb_cmd _s = + let int_gen = Gen.nat in + QCheck.make ~print:show_cmd + (Gen.oneof + [ + Gen.map (fun i -> Add i) int_gen; + Gen.map (fun i -> Mem i) int_gen; + Gen.map (fun i -> Remove i) int_gen; + ]) + + let init_state = Sint.empty + let init_sut () = Skiplist.create () + let cleanup _ = () + + let next_state c s = + match c with + | Add i -> Sint.add i s + | Remove i -> Sint.remove i s + | Mem _ -> s + + let precond _ _ = true + + let run c d = + match c with + | Add i -> Res (bool, Skiplist.add d i) + | Remove i -> Res (bool, Skiplist.remove d i) + | Mem i -> Res (bool, Skiplist.mem d i) + + let postcond c (s : state) res = + match (c, res) with + | Add i, Res ((Bool, _), res) -> Sint.mem i s = not res + | Remove i, Res ((Bool, _), res) -> Sint.mem i s = res + | Mem i, Res ((Bool, _), res) -> Sint.mem i s = res + | _, _ -> false +end + +module WSDT_seq = STM_sequential.Make (WSDConf) +module WSDT_dom = STM_domain.Make (WSDConf) + +let () = + let count = 1000 in + QCheck_base_runner.run_tests_main + [ + WSDT_seq.agree_test ~count ~name:"STM Lockfree.Skiplist test sequential"; + WSDT_dom.agree_test_par ~count ~name:"STM Lockfree.Skiplist test parallel"; + ]