-
Notifications
You must be signed in to change notification settings - Fork 7
Fix memory leak #14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix memory leak #14
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -48,6 +48,7 @@ type process_data = { | |
| mutable next_repr: int option; | ||
| mutable resume_func : (unit, unit) handler -> unit; | ||
| mutable finished : bool; | ||
| mutable discontinue_f : unit -> unit; | ||
| } | ||
|
|
||
| let every_func = ref (fun () -> ()) | ||
|
|
@@ -82,18 +83,33 @@ let incr r = ignore (fetch_and_add r 1) | |
| let decr r = ignore (fetch_and_add r (-1)) | ||
|
|
||
| (* Tracing infrastructure *) | ||
|
|
||
| exception Terminated_early | ||
|
|
||
| let discontinue k () = | ||
| discontinue_with k Terminated_early ({ | ||
| retc = (fun _ -> ()); | ||
| exnc = (function Terminated_early -> () | e -> raise e); | ||
| effc = (fun (type a) (_ : a Effect.t) -> None); | ||
| }) | ||
| ;; | ||
|
|
||
| let processes = CCVector.create () | ||
|
|
||
| let update_process_data process_id f op repr = | ||
| let update_process_data process_id f op repr k = | ||
| let process_rec = CCVector.get processes process_id in | ||
| process_rec.resume_func <- f; | ||
| process_rec.next_repr <- repr; | ||
| process_rec.next_op <- op | ||
| process_rec.next_op <- op; | ||
| process_rec.discontinue_f <- discontinue k | ||
| ;; | ||
|
|
||
| let finish_process process_id = | ||
| let process_rec = CCVector.get processes process_id in | ||
| process_rec.finished <- true; | ||
| process_rec.discontinue_f <- (fun () -> ()); | ||
| finished_processes := !finished_processes + 1 | ||
| ;; | ||
|
|
||
| let handler current_process_id runner = | ||
| { | ||
|
|
@@ -112,44 +128,44 @@ let handler current_process_id runner = | |
| let i = !atomics_counter in | ||
| let m = (Atomic.make v, i) in | ||
| atomics_counter := !atomics_counter + 1; | ||
| update_process_data current_process_id (fun h -> continue_with k m h) Make (Some i); | ||
| update_process_data current_process_id (fun h -> continue_with k m h) Make (Some i) k; | ||
| runner ()) | ||
| | Get (v,i) -> | ||
| Some | ||
| (fun (k : (a, _) continuation) -> | ||
| update_process_data current_process_id (fun h -> continue_with k (Atomic.get v) h) Get (Some i); | ||
| update_process_data current_process_id (fun h -> continue_with k (Atomic.get v) h) Get (Some i) k; | ||
| runner ()) | ||
| | Set ((r,i), v) -> | ||
| Some | ||
| (fun (k : (a, _) continuation) -> | ||
| update_process_data current_process_id (fun h -> continue_with k (Atomic.set r v) h) Set (Some i); | ||
| update_process_data current_process_id (fun h -> continue_with k (Atomic.set r v) h) Set (Some i) k; | ||
| runner ()) | ||
| | Exchange ((a,i), b) -> | ||
| Some | ||
| (fun (k : (a, _) continuation) -> | ||
| update_process_data current_process_id (fun h -> continue_with k (Atomic.exchange a b) h) Exchange (Some i); | ||
| update_process_data current_process_id (fun h -> continue_with k (Atomic.exchange a b) h) Exchange (Some i) k; | ||
| runner ()) | ||
| | CompareAndSwap ((x,i), s, v) -> | ||
| Some | ||
| (fun (k : (a, _) continuation) -> | ||
| update_process_data current_process_id (fun h -> | ||
| continue_with k (Atomic.compare_and_set x s v) h) CompareAndSwap (Some i); | ||
| continue_with k (Atomic.compare_and_set x s v) h) CompareAndSwap (Some i) k; | ||
| runner ()) | ||
| | FetchAndAdd ((v,i), x) -> | ||
| Some | ||
| (fun (k : (a, _) continuation) -> | ||
| update_process_data current_process_id (fun h -> | ||
| continue_with k (Atomic.fetch_and_add v x) h) FetchAndAdd (Some i); | ||
| continue_with k (Atomic.fetch_and_add v x) h) FetchAndAdd (Some i) k; | ||
| runner ()) | ||
| | _ -> | ||
| None); | ||
| } | ||
|
|
||
| let spawn f = | ||
| let fiber_f h = | ||
| continue_with (fiber f) () h in | ||
| let fiber_f = fiber f in | ||
| let resume_func = continue_with fiber_f () in | ||
| CCVector.push processes | ||
| { next_op = Start; next_repr = None; resume_func = fiber_f; finished = false } | ||
| { next_op = Start; next_repr = None; resume_func; finished = false; discontinue_f = discontinue fiber_f} | ||
|
|
||
| let rec last_element l = | ||
| match l with | ||
|
|
@@ -208,6 +224,7 @@ let do_run init_func init_schedule = | |
| |> Seq.filter (fun (_,proc) -> not proc.finished) | ||
| |> Seq.map (fun (id,_) -> id) | ||
| |> IntSet.of_seq in | ||
| CCVector.iter (fun proc -> proc.discontinue_f ()) processes; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This seems like a fairly straightforward fix to me (i.e. discontinue the continuations after being done with them). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm I think this is a solution to the wrong problem. The reason why we end up with pending continuations is that the evaluation of a program while it should just do
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I disagree.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess it's a matter of point of view, but I find the code more complex if it's covering up the leak rather than not having it in the first place. It adds code that shouldn't be necessary. There are multiple ways of fixing a bug, so it's worth reflecting on which direction this is pushing the codebase towards. I don't know how to make
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This is very much where I'm coming from. Discontinuing continuations is the smallest step we can take towards correct OCaml code. I don't think that reverting of either of the patches is going to be a big job, but it's going to be far easier to revert fix of a memory leak, than backpedal on locking core algorithm into DFS and sacrificing pure Reduction to 0.4s is admirable but dscheck is still going to explode on n+2 on a complex test rather than n. It's not going to give us a qualitative difference. Conversely, if we modernize pruning in core algo, we might not even need per-state optimizations. We can still look at the first commit in #3 because it's arguably the best we have. But we should not hold off on stabilizing dscheck testsuites.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This PR addresses precisely the origin of the leak. Continuations are cancellable for a reason. Yes, we can force the algorithm to execute continuations to the end instead but that's not in any way mutually exclusive with discontinuing ones we don't need. In fact both mechanisms could coexist.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also doesn't the same argument apply in reverse? #3 is a far more restricting change to the core algo and we might end up needing to revert it as soon as we try to improve on the exponential. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I mean, your PR is cleaning up after the leak. The origin of the leak is that the continuations are not executed fully. There's nothing to discontinue once this is addressed. Cancelling the continuations is incompatible with running them to the end. I don't know that there's a core algorithm to preserve, I only know that we are wasting resources by aborting the continuations early. But I'm probably not going to convince you at this point :P It seems more productive if you carry on with your plan and revisit the "run continuations to the end" later.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
This PR terminates continuations in both ways. Likewise, if #3 needed to sometimes step off depth-first (e.g. for particular op), it would have to start discontinuing at those points only. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well the search order got nothing to do with wasting runs by cancelling them. Issue #9 will be a valid reason for cleaning up though! Anyway, I'm sorry if I was too blunt in my feedback. We can keep arguing but I don't think we understand each other. My worst outcome has already been reached if you've convinced yourself that dscheck performances isn't worth improving.. Solving sudoku is exponential but there are tricks to speed it up! I really hope you will go on to write some cool optimizations, the code will tell you what's slow and it's a lot of fun :) |
||
| CCVector.clear processes; | ||
| atomics_counter := 1; | ||
| match last_element init_schedule with | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| (* | ||
| * Copyright (c) 2015, Théo Laurent <theo.laurent@ens.fr> | ||
| * Copyright (c) 2015, KC Sivaramakrishnan <sk826@cl.cam.ac.uk> | ||
| * | ||
| * Permission to use, copy, modify, and/or distribute this software for any | ||
| * purpose with or without fee is hereby granted, provided that the above | ||
| * copyright notice and this permission notice appear in all copies. | ||
| * | ||
| * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES | ||
| * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF | ||
| * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR | ||
| * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES | ||
| * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN | ||
| * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF | ||
| * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | ||
| *) | ||
|
|
||
| (* Michael-Scott queue copied over from [lockfree](github.com/ocaml-multicore/lockfree) *) | ||
| module Atomic = Dscheck.TracedAtomic | ||
|
|
||
| type 'a node = Nil | Next of 'a * 'a node Atomic.t | ||
| type 'a t = { head : 'a node Atomic.t; tail : 'a node Atomic.t } | ||
|
|
||
| let create () = | ||
| let head = Next (Obj.magic (), Atomic.make Nil) in | ||
| { head = Atomic.make head; tail = Atomic.make head } | ||
|
|
||
| let is_empty q = | ||
| match Atomic.get q.head with | ||
| | Nil -> failwith "MSQueue.is_empty: impossible" | ||
| | Next (_, x) -> ( match Atomic.get x with Nil -> true | _ -> false) | ||
|
|
||
| let pop q = | ||
| let rec loop () = | ||
| let s = Atomic.get q.head in | ||
| let nhead = | ||
| match s with | ||
| | Nil -> failwith "MSQueue.pop: impossible" | ||
| | Next (_, x) -> Atomic.get x | ||
| in | ||
| match nhead with | ||
| | Nil -> None | ||
| | Next (v, _) when Atomic.compare_and_set q.head s nhead -> Some v | ||
| | _ -> loop () | ||
| in | ||
| loop () | ||
|
|
||
| let push q v = | ||
| let rec find_tail_and_enq curr_end node = | ||
| if Atomic.compare_and_set curr_end Nil node then () | ||
| else | ||
| match Atomic.get curr_end with | ||
| | Nil -> find_tail_and_enq curr_end node | ||
| | Next (_, n) -> find_tail_and_enq n node | ||
| in | ||
| let newnode = Next (v, Atomic.make Nil) in | ||
| let tail = Atomic.get q.tail in | ||
| match tail with | ||
| | Nil -> failwith "HW_MSQueue.push: impossible" | ||
| | Next (_, n) -> | ||
| find_tail_and_enq n newnode; | ||
| ignore (Atomic.compare_and_set q.tail tail newnode) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| module Atomic = Dscheck.TracedAtomic | ||
|
|
||
| let drain queue = | ||
| let remaining = ref 0 in | ||
| while not (Michael_scott_queue.is_empty queue) do | ||
| remaining := !remaining + 1; | ||
| assert (Option.is_some (Michael_scott_queue.pop queue)) | ||
| done; | ||
| !remaining | ||
|
|
||
| let producer_consumer () = | ||
| Atomic.trace (fun () -> | ||
| let queue = Michael_scott_queue.create () in | ||
| let items_total = 4 in | ||
|
|
||
| Atomic.spawn (fun () -> | ||
| for _ = 1 to items_total do | ||
| Michael_scott_queue.push queue 0 | ||
| done); | ||
|
|
||
| (* consumer *) | ||
| let popped = ref 0 in | ||
| Atomic.spawn (fun () -> | ||
| for _ = 1 to items_total do | ||
| match Michael_scott_queue.pop queue with | ||
| | None -> () | ||
| | Some _ -> popped := !popped + 1 | ||
| done); | ||
|
|
||
| (* checks*) | ||
| Atomic.final (fun () -> | ||
| Atomic.check (fun () -> | ||
| let remaining = drain queue in | ||
| !popped + remaining = items_total))) | ||
|
|
||
| let () = | ||
| let open Alcotest in | ||
| run "michael_scott_queue_dscheck" | ||
| [ ("basic", [ test_case "1-producer-1-consumer" `Slow producer_consumer ]) ] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Drive-by-comment: 😅
I think this should be a
with-testdependencyThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
haha, thanks!