-
Notifications
You must be signed in to change notification settings - Fork 30
Lockfree Skip List #65
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2e6ca9b
0efac79
ad8782a
1d49325
dcf3df2
f733944
6eb5303
20355ce
1aeae33
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| open Saturn | ||
|
|
||
| let workload num_elems num_threads add remove = | ||
| let sl = Skiplist.create () in | ||
| let elems = Array.init num_elems (fun _ -> Random.int 10000) in | ||
| let push () = | ||
| Domain.spawn (fun () -> | ||
| let start_time = Unix.gettimeofday () in | ||
| for i = 0 to (num_elems - 1) / num_threads do | ||
| Domain.cpu_relax (); | ||
| let prob = Random.float 1.0 in | ||
| if prob < add then Skiplist.add sl (Random.int 10000) |> ignore | ||
| else if prob >= add && prob < add +. remove then | ||
| Skiplist.remove sl (Random.int 10000) |> ignore | ||
| else Skiplist.mem sl elems.(i) |> ignore | ||
| done; | ||
| start_time) | ||
| in | ||
| let threads = List.init num_threads (fun _ -> push ()) in | ||
| let start_time_threads = | ||
| List.map (fun domain -> Domain.join domain) threads | ||
| in | ||
| let end_time = Unix.gettimeofday () in | ||
| let time_diff = end_time -. List.nth start_time_threads 0 in | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... The timing collection seems to just use the start time of the first domain and the time after joining all of the domains. How about:
Various measures could then be calculated from the collection of timings. E.g. compute average of the times from each domain to get roughly the same kind of measurement as here, but taking all domains into account rather than the start time of the first domain and the end time (+ some) of the domain that finished last. |
||
| time_diff | ||
|
|
||
| (* A write heavy workload with threads with 50% adds and 50% removes. *) | ||
| let write_heavy_workload num_elems num_threads = | ||
| workload num_elems num_threads 0.5 0.5 | ||
|
|
||
| (* A regular workload with 90% reads, 9% adds and 1% removes. *) | ||
| let read_heavy_workload num_elems num_threads = | ||
| workload num_elems num_threads 0.09 0.01 | ||
|
|
||
| let moderate_heavy_workload num_elems num_threads = | ||
| workload num_elems num_threads 0.2 0.1 | ||
|
|
||
| let balanced_heavy_workload num_elems num_threads = | ||
| workload num_elems num_threads 0.3 0.2 | ||
|
|
||
| let bench ~workload_type ~num_elems ~num_threads () = | ||
| let workload = | ||
| if workload_type = "read_heavy" then read_heavy_workload | ||
| else if workload_type = "moderate_heavy" then moderate_heavy_workload | ||
| else if workload_type = "balanced_heavy" then balanced_heavy_workload | ||
| else write_heavy_workload | ||
| in | ||
| let results = ref [] in | ||
| for i = 1 to 10 do | ||
| let time = workload num_elems num_threads in | ||
| if i > 1 then results := time :: !results | ||
| done; | ||
| let results = List.sort Float.compare !results in | ||
| let median_time = List.nth results 4 in | ||
| let median_throughput = Float.of_int num_elems /. median_time in | ||
| Benchmark_result.create_generic ~median_time ~median_throughput | ||
| ("atomic_skiplist_" ^ workload_type) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,193 @@ | ||
| type 'a markable_reference = { node : 'a node; marked : bool } | ||
|
|
||
| and 'a node = { | ||
| key : 'a; | ||
| height : int; | ||
| next : 'a markable_reference Atomic.t array; | ||
| } | ||
|
|
||
| exception Failed_snip | ||
|
|
||
| type 'a t = { head : 'a node; max_height : int } | ||
|
|
||
| let min = Obj.new_block 5 5 |> Obj.obj | ||
| let max = Obj.new_block 5 5 |> Obj.obj | ||
| let null_node = { key = max; height = 0; next = [||] } | ||
|
|
||
| let[@inline] create_dummy_node_array sl = | ||
| Array.make (sl.max_height + 1) null_node | ||
|
|
||
| let[@inline] create_new_node value height = | ||
| let next = | ||
| Array.init (height + 1) (fun _ -> | ||
| Atomic.make { node = null_node; marked = false }) | ||
| in | ||
| { key = value; height; next } | ||
|
|
||
| let[@inline] get_random_level sl = | ||
| let rec count_level cur_level = | ||
| if Random.bool () then cur_level | ||
| else if cur_level == sl.max_height then count_level 0 | ||
| else count_level (cur_level + 1) | ||
| in | ||
| if sl.max_height = 0 then 0 else count_level 0 | ||
|
|
||
| let create ?(max_height = 10) () = | ||
| let max_height = Int.max max_height 1 in | ||
| let tail = create_new_node max (max_height - 1) in | ||
| let next = | ||
| Array.init max_height (fun _ -> Atomic.make { node = tail; marked = false }) | ||
| in | ||
| let head = { key = min; height = max_height - 1; next } in | ||
| { head; max_height = max_height - 1 } | ||
|
|
||
| (** Compares old_node and old_mark with the atomic reference and if they are the same then | ||
| Replaces the value in the atomic with node and mark *) | ||
| let compare_and_set_mark_ref (atomic, old_node, old_mark, node, mark) = | ||
| let ({ node = current_node; marked = current_marked } as current) = | ||
| Atomic.get atomic | ||
| in | ||
| current_node == old_node && current_marked = old_mark | ||
| && ((current_node == node && current_marked = mark) | ||
| || Atomic.compare_and_set atomic current { node; marked = mark }) | ||
|
|
||
| (** Returns true if key is found within the skiplist else false; | ||
| Irrespective of return value, fills the preds and succs array with | ||
| the predecessors nodes with smaller key and successors nodes with greater than | ||
| or equal to key | ||
| *) | ||
| let find_in (key, preds, succs, sl) = | ||
| let head = sl.head in | ||
| let rec iterate (prev, curr, succ, mark, level) = | ||
| if mark then | ||
| let snip = | ||
| compare_and_set_mark_ref (prev.next.(level), curr, false, succ, false) | ||
| in | ||
| if not snip then raise Failed_snip | ||
| else | ||
| let { node = new_succ; marked = mark } = Atomic.get succ.next.(level) in | ||
| iterate (prev, succ, new_succ, mark, level) | ||
| else if curr.key != max && curr.key < key then | ||
| let { node = new_succ; marked = mark } = Atomic.get succ.next.(level) in | ||
| iterate (curr, succ, new_succ, mark, level) | ||
| else (prev, curr) | ||
| in | ||
| let rec update_arrays prev level = | ||
| let { node = curr; marked = _ } = Atomic.get prev.next.(level) in | ||
| let { node = succ; marked = mark } = Atomic.get curr.next.(level) in | ||
| try | ||
| let prev, curr = iterate (prev, curr, succ, mark, level) in | ||
| preds.(level) <- prev; | ||
| succs.(level) <- curr; | ||
| if level > 0 then update_arrays prev (level - 1) | ||
| else if curr.key == max then false | ||
| else curr.key = key | ||
| with Failed_snip -> update_arrays head sl.max_height | ||
| in | ||
| update_arrays head sl.max_height | ||
|
|
||
| (** Adds a new key to the skiplist sl. *) | ||
| let add sl key = | ||
| let top_level = get_random_level sl in | ||
| let preds = create_dummy_node_array sl in | ||
| let succs = create_dummy_node_array sl in | ||
| let rec repeat () = | ||
| let found = find_in (key, preds, succs, sl) in | ||
| if found then false | ||
| else | ||
| let new_node_next = | ||
| Array.map | ||
| (fun element -> | ||
| let mark_ref = { node = element; marked = false } in | ||
| Atomic.make mark_ref) | ||
| succs | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... Aside from the array created here being too large, I believe there might be a more subtle space leak issue. What happens here is that a new node is being constructed and as a part of that an array of references to other nodes is created. A particular reference of this array is not updated during Consider the following scenario:
What will happen then that the This means that after Am I missing something? |
||
| in | ||
| let new_node = { key; height = top_level; next = new_node_next } in | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The |
||
| let pred = preds.(0) in | ||
| let succ = succs.(0) in | ||
| if | ||
| not | ||
| (compare_and_set_mark_ref | ||
| (pred.next.(0), succ, false, new_node, false)) | ||
| then repeat () | ||
| else | ||
| let rec update_levels level = | ||
| let rec set_next () = | ||
| let pred = preds.(level) in | ||
| let succ = succs.(level) in | ||
| if | ||
| compare_and_set_mark_ref | ||
| (pred.next.(level), succ, false, new_node, false) | ||
| then () | ||
| else ( | ||
| find_in (key, preds, succs, sl) |> ignore; | ||
| set_next ()) | ||
| in | ||
| set_next (); | ||
| if level < top_level then update_levels (level + 1) | ||
| in | ||
| if top_level > 0 then update_levels 1; | ||
| true | ||
| in | ||
| repeat () | ||
|
|
||
| let mem sl key = | ||
| let rec search (pred, curr, succ, mark, level) = | ||
| if mark then | ||
| let { node = new_succ; marked = mark } = Atomic.get succ.next.(level) in | ||
| search (pred, succ, new_succ, mark, level) | ||
| else if curr.key != max && curr.key < key then | ||
| let { node = new_succ; marked = mark } = Atomic.get succ.next.(level) in | ||
| search (curr, succ, new_succ, mark, level) | ||
| else if level > 0 then | ||
| let level = level - 1 in | ||
| let { node = curr; marked = _ } = Atomic.get pred.next.(level) in | ||
| let { node = succ; marked = mark } = Atomic.get curr.next.(level) in | ||
| search (pred, curr, succ, mark, level) | ||
| else if curr.key == max then false | ||
| else curr.key = key | ||
| in | ||
| let pred = sl.head in | ||
| let { node = curr; marked = _ } = Atomic.get pred.next.(sl.max_height) in | ||
| let { node = succ; marked = mark } = Atomic.get curr.next.(sl.max_height) in | ||
| search (pred, curr, succ, mark, sl.max_height) | ||
|
|
||
| let remove sl key = | ||
| let preds = create_dummy_node_array sl in | ||
| let succs = create_dummy_node_array sl in | ||
| let found = find_in (key, preds, succs, sl) in | ||
| if not found then false | ||
| else | ||
| let nodeToRemove = succs.(0) in | ||
| let nodeHeight = nodeToRemove.height in | ||
| let rec mark_levels succ level = | ||
| let _ = | ||
| compare_and_set_mark_ref | ||
| (nodeToRemove.next.(level), succ, false, succ, true) | ||
| in | ||
| let { node = succ; marked = mark } = | ||
| Atomic.get nodeToRemove.next.(level) | ||
| in | ||
| if not mark then mark_levels succ level | ||
| in | ||
| let rec update_upper_levels level = | ||
| let { node = succ; marked = mark } = | ||
| Atomic.get nodeToRemove.next.(level) | ||
| in | ||
| if not mark then mark_levels succ level; | ||
| if level > 1 then update_upper_levels (level - 1) | ||
| in | ||
| let rec update_bottom_level succ = | ||
| let iMarkedIt = | ||
| compare_and_set_mark_ref (nodeToRemove.next.(0), succ, false, succ, true) | ||
| in | ||
| let { node = succ; marked = mark } = Atomic.get succs.(0).next.(0) in | ||
| if iMarkedIt then ( | ||
| find_in (key, preds, succs, sl) |> ignore; | ||
| true) | ||
| else if mark then false | ||
| else update_bottom_level succ | ||
| in | ||
| if nodeHeight > 0 then update_upper_levels nodeHeight; | ||
| let { node = succ; marked = _ } = Atomic.get nodeToRemove.next.(0) in | ||
| update_bottom_level succ | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| (** Skiplist TODO | ||
|
|
||
| [key] values are compared with [=] and thus should not be functions or | ||
| objects. | ||
| *) | ||
|
|
||
| type 'a t | ||
| (** The type of lock-free skiplist. *) | ||
|
|
||
| val create : ?max_height:int -> unit -> 'a t | ||
| (** [create ~max_height ()] returns a new empty skiplist. [~max_height] is the | ||
| number of level used to distribute nodes. Its default value is 10 by default | ||
| and can not be less than 1. *) | ||
|
|
||
| val add : 'a t -> 'a -> bool | ||
| (** [add s v] adds [v] to [s] if [v] is not already in [s] and returns | ||
| [true]. If [v] is already in [s], it returns [false] and [v] is unchanged. *) | ||
|
|
||
| val remove : 'a t -> 'a -> bool | ||
| (** [remove s v] removes [v] of [s] if [v] is in [s] and returns [true]. If [v] | ||
| is not in [s], it returns [false] and [v] is unchanged. *) | ||
|
|
||
| val mem : 'a t -> 'a -> bool | ||
| (** [mem s v] returns [true] if v is in s and [false] otherwise. *) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| (rule | ||
| (copy ../../src_lockfree/backoff.ml backoff.ml)) | ||
|
|
||
| (rule | ||
| (copy ../../src_lockfree/skiplist.ml skiplist.ml)) | ||
|
|
||
| (test | ||
| (name skiplist_dscheck) | ||
| (libraries atomic dscheck alcotest) | ||
| (modules skiplist skiplist_dscheck)) | ||
|
|
||
| (test | ||
| (name qcheck_skiplist) | ||
| (libraries saturn qcheck qcheck-alcotest) | ||
| (modules qcheck_skiplist)) | ||
|
|
||
| (test | ||
| (name stm_skiplist) | ||
| (modules stm_skiplist) | ||
| (libraries saturn qcheck-stm.sequential qcheck-stm.domain) | ||
| (action | ||
| (run %{test} --verbose))) |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... The use of
elemsarray here and above is a bit strange. A huge array is initialized and then only the fraction for a single thread is used. Why not use aRandom.inthere as well?