diff --git a/dscheck.opam b/dscheck.opam index 242a1eb..a46212f 100644 --- a/dscheck.opam +++ b/dscheck.opam @@ -9,8 +9,7 @@ bug-reports: "https://github.com/ocaml-multicore/dscheck/issues" depends: [ "ocaml" {>= "5.0.0"} "dune" {>= "2.9"} - "containers" - "oseq" + "alcotest" {>= "1.6.0"} "odoc" {with-doc} ] build: [ diff --git a/dune-project b/dune-project index ddd2ce3..318d387 100644 --- a/dune-project +++ b/dune-project @@ -11,6 +11,4 @@ (package (name dscheck) (synopsis "Traced Atomics") - (depends (ocaml (>= 5.0.0)) dune containers oseq (alcotest (>= 1.6.0)))) - - + (depends (ocaml (>= 5.0.0)) dune (alcotest (>= 1.6.0)))) diff --git a/src/dune b/src/dune index 82e41aa..eef9f13 100644 --- a/src/dune +++ b/src/dune @@ -1,3 +1,2 @@ (library - (public_name dscheck) - (libraries containers oseq)) + (public_name dscheck)) diff --git a/src/tracedAtomic.ml b/src/tracedAtomic.ml index 8ec2c3d..05e33de 100644 --- a/src/tracedAtomic.ml +++ b/src/tracedAtomic.ml @@ -1,7 +1,38 @@ open Effect open Effect.Shallow -type 'a t = 'a Atomic.t * int +module Uid = struct + type t = int list + let compare = List.compare Int.compare + let to_string t = String.concat "," (List.map string_of_int t) +end + +module IdSet = Set.Make (Uid) +module IdMap = Map.Make (Uid) + +module Uid_pretty = struct + let gen = ref 0 + let cache = ref IdMap.empty + + let find t = + match IdMap.find t !cache with + | i -> i + | exception Not_found -> + let i = !gen in + incr gen ; + cache := IdMap.add t i !cache ; + i + + let to_string = function + | [] -> "_" + | t -> + let i = find t in + if i >= 0 && i < 26 + then String.make 1 (Char.chr (Char.code 'A' + i)) + else string_of_int i +end + +type 'a t = 'a Atomic.t * Uid.t type _ Effect.t += | Make : 'a -> 'a t Effect.t @@ -10,24 +41,17 @@ type _ Effect.t += | Exchange : ('a t * 'a) -> 'a Effect.t | CompareAndSwap : ('a t * 'a * 'a) -> bool Effect.t | FetchAndAdd : (int t * int) -> int Effect.t + | Spawn : (unit -> unit) -> unit Effect.t -module IntSet = Set.Make( - struct - let compare = Stdlib.compare - type t = int - end ) +type cas_result = Unknown | Success | Failed -module IntMap = Map.Make( - struct - type t = int - let compare = Int.compare - end - ) +type atomic_op = + | Start | Make | Get | Set | Exchange | FetchAndAdd | Spawn + | CompareAndSwap of cas_result ref -let _string_of_set s = - IntSet.fold (fun y x -> (string_of_int y) ^ "," ^ x) s "" - -type atomic_op = Start | Make | Get | Set | Exchange | CompareAndSwap | FetchAndAdd +let atomic_op_equal a b = match a, b with + | CompareAndSwap _, CompareAndSwap _ -> true + | _ -> a = b let atomic_op_str x = match x with @@ -36,16 +60,24 @@ let atomic_op_str x = | Get -> "get" | Set -> "set" | Exchange -> "exchange" - | CompareAndSwap -> "compare_and_swap" | FetchAndAdd -> "fetch_and_add" + | Spawn -> "spawn" + | CompareAndSwap cas -> + begin match !cas with + | Unknown -> "compare_and_swap?" + | Success -> "compare_and_swap" + | Failed -> "compare_and_no_swap" + end let tracing = ref false let finished_processes = ref 0 type process_data = { + uid : Uid.t; + mutable generator : int; mutable next_op: atomic_op; - mutable next_repr: int option; + mutable next_repr: Uid.t option; mutable resume_func : (unit, unit) handler -> unit; mutable finished : bool; } @@ -54,13 +86,12 @@ let every_func = ref (fun () -> ()) let final_func = ref (fun () -> ()) (* Atomics implementation *) -let atomics_counter = ref 1 +let atomics_counter = Atomic.make (-1) let make v = if !tracing then perform (Make v) else begin - let i = !atomics_counter in - atomics_counter := !atomics_counter + 1; - (Atomic.make v, i) + let i = Atomic.fetch_and_add atomics_counter (- 1) in + (Atomic.make v, [i]) end let get r = if !tracing then perform (Get r) else match r with | (v,_) -> Atomic.get v @@ -82,25 +113,30 @@ let incr r = ignore (fetch_and_add r 1) let decr r = ignore (fetch_and_add r (-1)) (* Tracing infrastructure *) -let processes = CCVector.create () +let processes = ref IdMap.empty +let push_process p = + assert (not (IdMap.mem p.uid !processes)) ; + processes := IdMap.add p.uid p !processes + +let get_process id = IdMap.find id !processes let update_process_data process_id f op repr = - let process_rec = CCVector.get processes process_id in + let process_rec = get_process process_id in process_rec.resume_func <- f; process_rec.next_repr <- repr; process_rec.next_op <- op let finish_process process_id = - let process_rec = CCVector.get processes process_id in + let process_rec = get_process process_id in process_rec.finished <- true; finished_processes := !finished_processes + 1 -let handler current_process_id runner = +let handler current_process runner = { retc = (fun _ -> ( - finish_process current_process_id; + finish_process current_process.uid; runner ())); exnc = (fun s -> raise s); effc = @@ -109,140 +145,265 @@ let handler current_process_id runner = | Make v -> Some (fun (k : (a, _) continuation) -> - let i = !atomics_counter in + let i = current_process.generator :: current_process.uid in + current_process.generator <- current_process.generator + 1 ; let m = (Atomic.make v, i) in - atomics_counter := !atomics_counter + 1; - update_process_data current_process_id (fun h -> continue_with k m h) Make (Some i); + update_process_data current_process.uid (fun h -> continue_with k m h) Make (Some i); runner ()) | Get (v,i) -> Some (fun (k : (a, _) continuation) -> - update_process_data current_process_id (fun h -> continue_with k (Atomic.get v) h) Get (Some i); + update_process_data current_process.uid (fun h -> continue_with k (Atomic.get v) h) Get (Some i); runner ()) | Set ((r,i), v) -> Some (fun (k : (a, _) continuation) -> - update_process_data current_process_id (fun h -> continue_with k (Atomic.set r v) h) Set (Some i); + update_process_data current_process.uid (fun h -> continue_with k (Atomic.set r v) h) Set (Some i); runner ()) | Exchange ((a,i), b) -> Some (fun (k : (a, _) continuation) -> - update_process_data current_process_id (fun h -> continue_with k (Atomic.exchange a b) h) Exchange (Some i); + update_process_data current_process.uid (fun h -> continue_with k (Atomic.exchange a b) h) Exchange (Some i); runner ()) | CompareAndSwap ((x,i), s, v) -> Some (fun (k : (a, _) continuation) -> - update_process_data current_process_id (fun h -> - continue_with k (Atomic.compare_and_set x s v) h) CompareAndSwap (Some i); + let res = ref Unknown in + update_process_data current_process.uid (fun h -> + let ok = Atomic.compare_and_set x s v in + res := if ok then Success else Failed ; + continue_with k ok h) (CompareAndSwap res) (Some i); runner ()) | FetchAndAdd ((v,i), x) -> Some (fun (k : (a, _) continuation) -> - update_process_data current_process_id (fun h -> + update_process_data current_process.uid (fun h -> continue_with k (Atomic.fetch_and_add v x) h) FetchAndAdd (Some i); runner ()) + | Spawn f -> + Some + (fun (k : (a, _) continuation) -> + let fiber_f h = continue_with (fiber f) () h in + let uid = current_process.generator :: current_process.uid in + current_process.generator <- current_process.generator + 1 ; + let new_process = + { uid; generator = 0; next_op = Start; next_repr = None; resume_func = fiber_f; finished = false } + in + update_process_data + current_process.uid + (fun h -> + push_process new_process ; + continue_with k () h) + Spawn + (Some uid); + runner ()) | _ -> None); } let spawn f = - let fiber_f h = - continue_with (fiber f) () h in - CCVector.push processes - { next_op = Start; next_repr = None; resume_func = fiber_f; finished = false } - -let rec last_element l = - match l with - | h :: [] -> h - | [] -> assert(false) - | _ :: tl -> last_element tl + if !tracing then perform (Spawn f) else failwith "spawn outside tracing" + +type proc_rec = { proc_id: Uid.t; op: atomic_op; obj_ptr : Uid.t option } +type state_cell = { + procs : proc_rec IdMap.t; + run : proc_rec; + enabled : IdSet.t; + mutable backtrack : proc_rec list IdMap.t; +} -type proc_rec = { proc_id: int; op: atomic_op; obj_ptr : int option } -type state_cell = { procs: proc_rec list; run_proc: int; run_op: atomic_op; run_ptr: int option; enabled : IntSet.t; mutable backtrack : IntSet.t } +let group_by fn = function + | [] -> [] + | first :: rest -> + let rec go previous previouses = function + | [] -> [previous::previouses] + | x::xs when fn x previous -> go x (previous::previouses) xs + | x::xs -> (previous::previouses) :: go x [] xs + in + List.rev (go first [] rest) + +let pretty_print h lst = + List.iter + (function + | steps when List.compare_length_with steps 3 <= 0 -> + List.iter (fun step -> Format.fprintf h "%s" (Uid_pretty.to_string step.proc_id)) steps + | (step :: _) as steps -> + Format.fprintf h "(%s%i)" (Uid_pretty.to_string step.proc_id) (List.length steps) + | _ -> assert false) + (group_by (fun a b -> a.proc_id = b.proc_id) lst) + +let clear_line = "\027[2K\r" let num_runs = ref 0 (* we stash the current state in case a check fails and we need to log it *) let schedule_for_checks = ref [] -let do_run init_func init_schedule = - init_func (); (*set up run *) - tracing := true; +let setup_run func init_schedule = + num_runs := !num_runs + 1; + if !num_runs mod 1000 == 0 then + Format.printf "%srun: %#i %a %!" clear_line !num_runs pretty_print !schedule_for_checks; + processes := IdMap.empty ; + finished_processes := 0; schedule_for_checks := init_schedule; - (* cache the number of processes in case it's expensive*) - let num_processes = CCVector.length processes in - (* current number of ops we are through the current run *) + tracing := true; + let uid = [] in + let fiber_f h = continue_with (fiber func) () h in + push_process + { uid; generator = 0; next_op = Start; next_repr = None; resume_func = fiber_f; finished = false } ; + tracing := false + +let do_run init_schedule = + let trace = ref [] in let rec run_trace s () = tracing := false; !every_func (); tracing := true; match s with - | [] -> if !finished_processes == num_processes then begin + | [] -> if !finished_processes == IdMap.cardinal !processes then begin tracing := false; !final_func (); - tracing := true - end - | (process_id_to_run, next_op, next_ptr) :: schedule -> begin - if !finished_processes == num_processes then - (* this should never happen *) - failwith("no enabled processes") - else - begin - let process_to_run = CCVector.get processes process_id_to_run in - assert(process_to_run.next_op = next_op); - assert(process_to_run.next_repr = next_ptr); - process_to_run.resume_func (handler process_id_to_run (run_trace schedule)) - end + tracing := true; end + | { proc_id = process_id_to_run ; op = next_op ; obj_ptr = next_ptr } :: schedule -> + let process_to_run = get_process process_id_to_run in + assert(not process_to_run.finished); + assert(atomic_op_equal process_to_run.next_op next_op); + assert(process_to_run.next_repr = next_ptr); + let run = { proc_id = process_id_to_run ; op = process_to_run.next_op ; obj_ptr = process_to_run.next_repr } in + trace := run :: !trace ; + process_to_run.resume_func (handler process_to_run (run_trace schedule)) in tracing := true; - run_trace init_schedule (); - finished_processes := 0; + run_trace (List.rev init_schedule) () ; tracing := false; - num_runs := !num_runs + 1; - if !num_runs mod 1000 == 0 then - Printf.printf "run: %d\n%!" !num_runs; - let procs = CCVector.mapi (fun i p -> { proc_id = i; op = p.next_op; obj_ptr = p.next_repr }) processes |> CCVector.to_list in - let current_enabled = CCVector.to_seq processes - |> OSeq.zip_index - |> Seq.filter (fun (_,proc) -> not proc.finished) - |> Seq.map (fun (id,_) -> id) - |> IntSet.of_seq in - CCVector.clear processes; - atomics_counter := 1; - match last_element init_schedule with - | (run_proc, run_op, run_ptr) -> - { procs; enabled = current_enabled; run_proc; run_op; run_ptr; backtrack = IntSet.empty } - -let rec explore func state clock last_access = - let s = last_element state in - List.iter (fun proc -> - let j = proc.proc_id in - let i = Option.bind proc.obj_ptr (fun ptr -> IntMap.find_opt ptr last_access) |> Option.value ~default:0 in - if i != 0 then begin - let pre_s = List.nth state (i-1) in - if IntSet.mem j pre_s.enabled then - pre_s.backtrack <- IntSet.add j pre_s.backtrack - else - pre_s.backtrack <- IntSet.union pre_s.backtrack pre_s.enabled + let procs = + IdMap.mapi (fun i p -> { proc_id = i; op = p.next_op; obj_ptr = p.next_repr }) !processes + in + let current_enabled = + IdMap.to_seq !processes + |> Seq.filter (fun (_, p) -> not p.finished) + |> Seq.map (fun (id, _) -> id) + |> IdSet.of_seq + in + let last_run = List.hd !trace in + { procs; enabled = current_enabled; run = last_run; backtrack = IdMap.empty } + +type category = + | Ignore + | Read + | Write + | Read_write + +let categorize = function + | Spawn | Start | Make -> Ignore + | Get -> Read + | Set -> Write + | Exchange | FetchAndAdd -> Read_write + | CompareAndSwap res -> + begin match !res with + | Unknown -> failwith "CAS unknown outcome" (* should not happen *) + | Success -> Read_write + | Failed -> Read + end + +let mark_backtrack proc state = + match proc.op, proc.obj_ptr, state with + | (Spawn | Start | Make), _, _ + | _, None, _ + | _, _, [] -> () + | _, Some proc_ptr, _ :: state -> + let rec go active_uids timeline acc = + match timeline with + | [] -> () + | pre :: timeline -> + let is_required = IdSet.mem pre.run.proc_id active_uids in + begin match is_required, pre.run.obj_ptr with + | true, Some ptr when ptr = proc_ptr && categorize pre.run.op <> Read -> + () + | false, Some ptr when ptr = proc_ptr -> + begin match categorize proc.op, categorize pre.run.op with + | _, Ignore -> failwith "how?" + | Read, Read -> go active_uids timeline acc + | _ -> + begin match acc, timeline with + | [], _ | _, [] -> assert false + | last :: _, pre :: _ -> + let j = last.proc_id in + if match IdMap.find_opt j pre.backtrack with + | None -> true + | Some lst -> List.length lst > List.length acc + then ( + pre.backtrack <- IdMap.add j acc pre.backtrack ; + ) + end + end + | true, None -> + go active_uids timeline (pre.run :: acc) + | true, Some ptr -> + let active_uids = IdSet.add ptr active_uids in + go active_uids timeline (pre.run :: acc) + | false, Some ptr when IdSet.mem ptr active_uids -> + begin match categorize pre.run.op with + | Read -> go active_uids timeline acc + | _ -> + let active_uids = IdSet.add pre.run.proc_id active_uids in + go active_uids timeline (pre.run :: acc) + end + | false, _ -> + go active_uids timeline acc + end + in + let active_uids = IdSet.add proc_ptr (IdSet.singleton proc.proc_id) in + go active_uids state [proc] + +let map_subtract_set map set = + IdMap.filter (fun key _ -> not (IdSet.mem key set)) map + +let rec explore func time state (explored, state_planned) current_schedule = + let s = List.hd state in + assert (IdMap.is_empty s.backtrack) ; + let dones = ref IdSet.empty in + begin match state_planned with + | [] -> + if IdSet.cardinal s.enabled > 0 then begin + let p = IdSet.min_elt s.enabled in + let init_step = IdMap.find p s.procs in + s.backtrack <- IdMap.singleton p [init_step] ; + end + | { proc_id ; _ } :: _ -> + dones := explored ; + s.backtrack <- IdMap.singleton proc_id state_planned + end ; + let is_backtracking = ref false in + while IdMap.(cardinal (map_subtract_set s.backtrack !dones)) > 0 do + let j, new_steps = IdMap.min_binding (map_subtract_set s.backtrack !dones) in + let new_explored = !dones in + dones := IdSet.add j new_explored; + let new_step, new_state_planned = + match new_steps with + | [] -> assert false + | next :: future -> next, future + in + let new_schedule = new_step :: current_schedule in + let schedule = + if !is_backtracking + then begin + setup_run func new_schedule ; + new_schedule + end + else begin + is_backtracking := true ; + schedule_for_checks := new_schedule; + [new_step] end - ) s.procs; - if IntSet.cardinal s.enabled > 0 then begin - let p = IntSet.min_elt s.enabled in - let dones = ref IntSet.empty in - s.backtrack <- IntSet.singleton p; - while IntSet.(cardinal (diff s.backtrack !dones)) > 0 do - let j = IntSet.min_elt (IntSet.diff s.backtrack !dones) in - dones := IntSet.add j !dones; - let j_proc = List.nth s.procs j in - let schedule = (List.map (fun s -> (s.run_proc, s.run_op, s.run_ptr)) state) @ [(j, j_proc.op, j_proc.obj_ptr)] in - let statedash = state @ [do_run func schedule] in - let state_time = (List.length statedash)-1 in - let new_last_access = match j_proc.obj_ptr with Some(ptr) -> IntMap.add ptr state_time last_access | None -> last_access in - let new_clock = IntMap.add j state_time clock in - explore func statedash new_clock new_last_access - done - end + in + let step = do_run schedule in + let new_state = { step with backtrack = IdMap.empty } :: state in + let new_time = time + 1 in + mark_backtrack step.run new_state; + explore func new_time new_state (new_explored, new_state_planned) new_schedule + done let every f = every_func := f @@ -250,35 +411,43 @@ let every f = let final f = final_func := f +let print_trace () = + List.iter (fun { proc_id ; op ; obj_ptr } -> + let last_run_ptr = Option.map Uid.to_string obj_ptr |> Option.value ~default:"" in + Format.printf " Process %s: %s %s@." (Uid.to_string proc_id) (atomic_op_str op) last_run_ptr + ) (List.rev !schedule_for_checks) + let check f = let tracing_at_start = !tracing in tracing := false; if not (f ()) then begin - Printf.printf "Found assertion violation at run %d:\n" !num_runs; - List.iter (fun s -> - begin match s with - | (last_run_proc, last_run_op, last_run_ptr) -> begin - let last_run_ptr = Option.map string_of_int last_run_ptr |> Option.value ~default:"" in - Printf.printf "Process %d: %s %s\n" last_run_proc (atomic_op_str last_run_op) last_run_ptr - end; - end; - ) !schedule_for_checks; - assert(false) + Format.printf "Check: found assertion violation!@." ; + assert false end; tracing := tracing_at_start let reset_state () = finished_processes := 0; - atomics_counter := 1; + Atomic.set atomics_counter 1; num_runs := 0; schedule_for_checks := []; - CCVector.clear processes; -;; - + processes := IdMap.empty let trace func = reset_state (); - let empty_state = do_run func [(0, Start, None)] :: [] in - let empty_clock = IntMap.empty in - let empty_last_access = IntMap.empty in - explore func empty_state empty_clock empty_last_access + let empty_schedule = [{ proc_id = [] ; op = Start ; obj_ptr = None }] in + setup_run func empty_schedule ; + let empty_state = do_run empty_schedule :: [] in + let empty_state_planned = (IdSet.empty, []) in + try explore func 1 empty_state empty_state_planned empty_schedule + with exn -> + Format.printf "Found error at run %d:@." !num_runs; + print_trace () ; + Format.printf "Unhandled exception: %s@." (Printexc.to_string exn) ; + Printexc.print_backtrace stdout ; + raise exn + +let trace func = + Fun.protect + (fun () -> trace func) + ~finally:(fun () -> Format.printf "@.Finished after %#i runs.@." !num_runs) diff --git a/tests/dune b/tests/dune index db241a0..8f8734a 100644 --- a/tests/dune +++ b/tests/dune @@ -7,3 +7,13 @@ (name test_naive_counter) (libraries dscheck alcotest) (modules test_naive_counter)) + +(test + (name test_simple) + (libraries dscheck alcotest) + (modules test_simple)) + +(test + (name test_bug) + (libraries dscheck alcotest) + (modules test_bug)) diff --git a/tests/test_bug.ml b/tests/test_bug.ml new file mode 100644 index 0000000..7bd1620 --- /dev/null +++ b/tests/test_bug.ml @@ -0,0 +1,30 @@ +(** https://github.com/ocaml-multicore/dscheck/pull/3#issuecomment-1366878914 *) + +module Atomic = Dscheck.TracedAtomic + +let test () = + let cancelled = Atomic.make false in + let max_requests = Atomic.make 0 in + Atomic.spawn (fun () -> + Atomic.set cancelled true; + Atomic.decr max_requests; + ); + Atomic.spawn (fun () -> + ignore (Atomic.get max_requests); + assert (Atomic.get cancelled) (* This bug should be detected *) + ) + +let test () = + match Atomic.trace test with + | exception _ -> () + | _ -> failwith "expected failure" + +let () = + let open Alcotest in + run "dscheck" + [ + ( "bug", + [ + test_case "test" `Quick test; + ] ); + ] diff --git a/tests/test_list.ml b/tests/test_list.ml index dcdc556..b1f60af 100644 --- a/tests/test_list.ml +++ b/tests/test_list.ml @@ -60,7 +60,7 @@ let () = ( "list", [ test_case "naive-1-domain" `Quick (test_list_naive_single_domain); - test_case "naive-2-domains" `Quick (test_list_naive 8); + test_case "naive-2-domains" `Quick (test_list_naive 2); test_case "naive-8-domains" `Quick (test_list_naive 8); test_case "safe" `Quick test_list_safe; ] ); diff --git a/tests/test_simple.ml b/tests/test_simple.ml new file mode 100644 index 0000000..2c158b7 --- /dev/null +++ b/tests/test_simple.ml @@ -0,0 +1,31 @@ +module Atomic = Dscheck.TracedAtomic + +(* Example from ocaml-parrafuzz presentation at the OCaml Workshop 2021: + https://www.youtube.com/watch?v=GZsUoSaIpIs *) + +let test i = + let x = Atomic.make i in + let y = Atomic.make 0 in + Atomic.spawn (fun () -> + Atomic.spawn (fun () -> if Atomic.get x = 10 then Atomic.set y 2)) ; + Atomic.set x 0 ; + Atomic.set y 1 ; + Atomic.final (fun () -> Atomic.check (fun () -> Atomic.get y <> 2)) + +let test_0 () = Atomic.trace (fun () -> test 0) + +let test_10 () = + match Atomic.trace (fun () -> test 10) with + | exception _ -> () + | _ -> failwith "expected failure" + +let () = + let open Alcotest in + run "dscheck" + [ + ( "simple", + [ + test_case "test-0" `Quick test_0; + test_case "test-10" `Quick test_10; + ] ); + ]