From 904d1646e4e2a959f6815eadd9a291a8e2305d82 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Wed, 1 Apr 2020 19:53:44 +0200 Subject: [PATCH 01/26] Initial draft caching --- DESCRIPTION | 1 + R/PipeOp.R | 27 +++++++++++++++-- tests/testthat/test_caching.R | 56 +++++++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 3 deletions(-) create mode 100644 tests/testthat/test_caching.R diff --git a/DESCRIPTION b/DESCRIPTION index a0c56003d..b21082007 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -45,6 +45,7 @@ Imports: mlr3misc (>= 0.1.4), paradox, R6, + R.cache, withr Suggests: ggplot2, diff --git a/R/PipeOp.R b/R/PipeOp.R index 8027763ff..62f1281ce 100644 --- a/R/PipeOp.R +++ b/R/PipeOp.R @@ -241,7 +241,17 @@ PipeOp = R6Class("PipeOp", return(named_list(self$output$name, NO_OP)) } input = check_types(self, input, "input", "train") - output = private$.train(input) + print(self$hash) + print(digest(private$.param_set, algo = "xxhash64")) + a <<- private$.param_set + R.cache::evalWithMemoization({ + output = private$.train(input) + }, + key = list(map_chr(input, get_hash), self$hash) + ) + print(self$hash) + print(digest(private$.param_set, algo = "xxhash64")) + b <<- private$.param_set output = check_types(self, output, "output", "train") output }, @@ -255,9 +265,15 @@ PipeOp = R6Class("PipeOp", if (is_noop(self$state)) { stopf("Pipeop %s got NO_OP during train but no NO_OP during predict.", self$id) } - input = check_types(self, input, "input", "predict") - output = private$.predict(input) + + R.cache::evalWithMemoization({ + output = private$.predict(input) + }, + key = list(map_chr(input, get_hash), self$hash) + ) + print(digest(list(class(self), self$id), algo = "xxhash64")) + print(digest(self$param_set, algo = "xxhash64")) output = check_types(self, output, "output", "predict") output } @@ -366,3 +382,8 @@ check_types = function(self, data, direction, operation) { names(data) = typetable$name data } + +get_hash = function(x) { + if (!is.null(x$hash)) return(x$hash) + digest(x, algo = "xxhash64") +} \ No newline at end of file diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R new file mode 100644 index 000000000..af36d414b --- /dev/null +++ b/tests/testthat/test_caching.R @@ -0,0 +1,56 @@ +context("Caching") + +test_that("Bagging Pipeline", { + library("mlr3learners") + library(profvis) + library(ggplot2) + + tsk = tsk("iris") + + po$hash + po$train(list(tsk)) + po$hash + + po = po("scale") + print(po$hash) + R.cache::clearCache(prompt = FALSE) + # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) + po$train(list(tsk)) + po$train(list(tsk)) + # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) + + po = po("nop") + print(po$hash) + R.cache::clearCache(prompt = FALSE) + # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) + po$train(list(tsk)) + po$train(list(tsk)) + # R.cache::findCache(list(map(list(ts + + a = po("scale") + a$hash + a$param_set$values$center = FALSE + a$hash + + for (kk in 1:5) { + cat(sprintf("Iteration #%d:\n", kk)) + res <- evalWithMemoization({ + cat("Evaluating expression...") + a <- 1 + b <- 2 + c <- 4 + Sys.sleep(1) + cat("done\n") + b + }) + print(res) + + # Sanity checks + stopifnot(a == 1 && b == 2 && c == 4) + + # Clean up + rm(a, b, c) + } # for (kk ...) + +}) + From 2f9253cebb50773e9ba80934c9b37ff5455362a0 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Wed, 1 Apr 2020 20:28:51 +0200 Subject: [PATCH 02/26] Initial tests --- R/PipeOp.R | 8 +-- tests/testthat/test_caching.R | 123 ++++++++++++++++++++++------------ 2 files changed, 81 insertions(+), 50 deletions(-) diff --git a/R/PipeOp.R b/R/PipeOp.R index 62f1281ce..c0aedadd5 100644 --- a/R/PipeOp.R +++ b/R/PipeOp.R @@ -241,17 +241,13 @@ PipeOp = R6Class("PipeOp", return(named_list(self$output$name, NO_OP)) } input = check_types(self, input, "input", "train") - print(self$hash) - print(digest(private$.param_set, algo = "xxhash64")) - a <<- private$.param_set + keya <<- list(map_chr(input, get_hash), self$hash) R.cache::evalWithMemoization({ output = private$.train(input) }, key = list(map_chr(input, get_hash), self$hash) ) - print(self$hash) - print(digest(private$.param_set, algo = "xxhash64")) - b <<- private$.param_set + keyb <<- list(map_chr(input, get_hash), self$hash) output = check_types(self, output, "output", "train") output }, diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index af36d414b..79766c078 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -1,56 +1,91 @@ context("Caching") -test_that("Bagging Pipeline", { - library("mlr3learners") - library(profvis) - library(ggplot2) +test_that("Caching works for test hash pipeop", { + + PipeOpTestHash = R6Class("PipeOpTestHash", + inherit = PipeOp, + public = list( + initialize = function(id = "test.hash", param_set = ParamSet$new()) { + super$initialize(id = id, param_set = param_set, + input = data.table(name = "input", train = "*", predict = "*"), + output = data.table(name = "output", train = "*", predict = "*") + ) + }), + private = list( + .train = function(inputs) { + Sys.sleep(1) + self$state = list() + inputs + }, + .predict = function(inputs) { + Sys.sleep(1) + inputs + } + ) + ) + + # FIXME: + # This could fail if load is very high, nonetheless I would keep it. tsk = tsk("iris") - - po$hash - po$train(list(tsk)) - po$hash + po = PipeOpTestHash$new() - po = po("scale") - print(po$hash) + # Takes > 1 second R.cache::clearCache(prompt = FALSE) - # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) + st = Sys.time() po$train(list(tsk)) - po$train(list(tsk)) - # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) + expect_true(st < Sys.time() - 1) - po = po("nop") - print(po$hash) - R.cache::clearCache(prompt = FALSE) - # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) + # takes < 1 second + st = Sys.time() po$train(list(tsk)) - po$train(list(tsk)) - # R.cache::findCache(list(map(list(ts - - a = po("scale") - a$hash - a$param_set$values$center = FALSE - a$hash - - for (kk in 1:5) { - cat(sprintf("Iteration #%d:\n", kk)) - res <- evalWithMemoization({ - cat("Evaluating expression...") - a <- 1 - b <- 2 - c <- 4 - Sys.sleep(1) - cat("done\n") - b - }) - print(res) - - # Sanity checks - stopifnot(a == 1 && b == 2 && c == 4) - - # Clean up - rm(a, b, c) - } # for (kk ...) + expect_true(st > Sys.time() - 1) + + # Takes > 1 second + st = Sys.time() + po$predict(list(tsk)) + expect_true(st < Sys.time() - 1) + + # takes < 1 second + st = Sys.time() + po$predict(list(tsk)) + expect_true(st > Sys.time() - 1) }) + +# test_that("Caching works for scale", { +# po = po("scale") +# old_hash = po$hash + +# po$train(list(tsk)) +# R.cache::saveCache(key = keya, "a") +# R.cache::loadCache(key = keya) +# R.cache::loadCache(key = keyb) + +# R.cache::findCache(key = list(map(list(tsk), "hash"), po$hash)) +# R.cache::findCache(key = list(map(list(tsk), "hash"), old_hash)) + +# po$train(list(tsk)) +# # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) + +# tsk = tsk("zoo") +# print(po$hash) +# po$train(list(tsk)) +# print(po$hash) + + +# po = po("nop") +# print(po$hash) +# R.cache::clearCache(prompt = FALSE) +# # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) +# po$train(list(tsk)) +# po$train(list(tsk)) + + +# a = po("scale") +# a$hash +# a$param_set$values$center = FALSE +# a$hash + +# }) From b1b567924ee510a6fa2d68f7b9ec8287a009888d Mon Sep 17 00:00:00 2001 From: pfistfl Date: Thu, 2 Apr 2020 01:14:34 +0200 Subject: [PATCH 03/26] Caching md --- R/PipeOp.R | 16 ++++---- attic/caching.md | 101 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 9 deletions(-) create mode 100644 attic/caching.md diff --git a/R/PipeOp.R b/R/PipeOp.R index c0aedadd5..a672a8430 100644 --- a/R/PipeOp.R +++ b/R/PipeOp.R @@ -241,14 +241,13 @@ PipeOp = R6Class("PipeOp", return(named_list(self$output$name, NO_OP)) } input = check_types(self, input, "input", "train") - keya <<- list(map_chr(input, get_hash), self$hash) + # caching R.cache::evalWithMemoization({ - output = private$.train(input) - }, - key = list(map_chr(input, get_hash), self$hash) + result = list(private$.train(input), self$state) + }, key = list(map_chr(input, get_hash), self$hash) ) - keyb <<- list(map_chr(input, get_hash), self$hash) - output = check_types(self, output, "output", "train") + if (is.null(self$state)) state = result$state + output = check_types(self, result$output, "output", "train") output }, predict = function(input) { @@ -262,14 +261,13 @@ PipeOp = R6Class("PipeOp", stopf("Pipeop %s got NO_OP during train but no NO_OP during predict.", self$id) } input = check_types(self, input, "input", "predict") - R.cache::evalWithMemoization({ output = private$.predict(input) }, key = list(map_chr(input, get_hash), self$hash) ) - print(digest(list(class(self), self$id), algo = "xxhash64")) - print(digest(self$param_set, algo = "xxhash64")) + # print(digest(list(class(self), self$id), algo = "xxhash64")) + # print(digest(self$param_set, algo = "xxhash64")) output = check_types(self, output, "output", "predict") output } diff --git a/attic/caching.md b/attic/caching.md new file mode 100644 index 000000000..c8cddfa82 --- /dev/null +++ b/attic/caching.md @@ -0,0 +1,101 @@ +# Caching + +These docs describe `oportunistic caching`, i.e. caching after a first function call. +If the same function is executed twice in parallel, this does not save any time/cores. +The example currently uses the `R.cache` package by Henrik Bengtsson for caching. +This is just a very simple caching package, that provides a clean, simple API, could +theoretically be replaced by other packages. + + +## Implementation Details + +Ideally we would like to do caching on an abstract level, +perhaps within the PipeOp base-classes `$train` function. +A very nice point would be to wrap the call to `private$.train`. +This would make complexity very manageable. + +`R.cache::evalWithMemoization` memoizes the provided expression. +The `hash` is computed from its `key` argument. + + +Possible solution: adjust `PipeOp` in `PipeOp.R` +``` +train = function(input) { + ... + t = check_types(self, input, "input", "train") + # caching + R.cache::evalWithMemoization({ + result = list(private$.train(input), self$state) #(see A below) + }, key = list(map_chr(input, get_hash), self$hash) + ) + if (is.null(self$state)) state = result$state #(see A below) + output = check_types(self, result$output, "output", "train") + output + }, + predict = function(input) { + ... + input = check_types(self, input, "input", "predict") + R.cache::evalWithMemoization({ + output = private$.predict(input) + }, + key = list(map_chr(input, get_hash), self$hash) + ) + output = check_types(self, output, "output", "predict") + output + } + ), +``` + +where `get_hash` is: +``` +get_hash = function(x) { + if (!is.null(x$hash)) return(x$hash) + digest(x, algo = "xxhash64") +} +``` + +## Possible problems: + +A) Unfortunately `private$.train()` is not a pure function, but + instead has side-effects: + - sets a `$state` + - ... (others?) + +If we can ensure that the only side-effect of `$.train` is a modified state, +we could also memoize the state during `$train` (see above). + +If other fields are updated, we need to have a list of fields that are updated or go a different route. + +## Further Issues: + +F) Should caching be optional? + Probably yes! + +G) How do we globally enable/disable caching? + 1. global option + < ugly, might not work with parallelization. > + + 2. caching can be turned on in `Graph` | `GraphLearner` + ``` + Graph = R6Class( + ... + caching = TRUE, + ... + ) + ``` + `GraphLearner` gets an active binding to turn caching of it's graph on/off. + Could also be added as an arg to the `GraphLearner`s constructor. + + The caching of individual steps is then done by adjusting calls to `graph_reduce`: + `graph_reduce(..., caching = self$caching)` + +H) Should caching be disabled for some `PipeOp`s? + Yes, possible solution: New field in each `PipeOp`: `cached`. + Caching for a pipeop only happens if `cached = TRUE`. + Can also be manually changed to disable caching for any pipeop. + +Open Questions: + - How do `$train` and `$predict` know whether to do caching or not? + Add a second argument `caching`? + - How do caching and `parallelization` interact? + - Does `R.cache::evalWithMemoization`s `key` arg need anything else? From 7557694e9ab7f6dbc3b66852465f13cb7c4c146a Mon Sep 17 00:00:00 2001 From: pfistfl Date: Thu, 2 Apr 2020 09:53:11 +0200 Subject: [PATCH 04/26] more comments --- attic/caching.md | 49 ++++++++++++++++++++++++++++++ tests/testthat/test_caching.R | 57 ++++++++++++++++++----------------- 2 files changed, 78 insertions(+), 28 deletions(-) diff --git a/attic/caching.md b/attic/caching.md index c8cddfa82..f9ba414c2 100644 --- a/attic/caching.md +++ b/attic/caching.md @@ -6,6 +6,9 @@ The example currently uses the `R.cache` package by Henrik Bengtsson for caching This is just a very simple caching package, that provides a clean, simple API, could theoretically be replaced by other packages. +Caching can / should be done on two levels: + - caching of individual pipeops + - caching of full graphs ## Implementation Details @@ -54,6 +57,11 @@ get_hash = function(x) { } ``` +**Alternative:** + +Caching could also be done in `reduce_graph`. This would also simplify caching +whole graph vs. single pipeops. + ## Possible problems: A) Unfortunately `private$.train()` is not a pure function, but @@ -99,3 +107,44 @@ Open Questions: Add a second argument `caching`? - How do caching and `parallelization` interact? - Does `R.cache::evalWithMemoization`s `key` arg need anything else? + - If `state` is obtained from a stochastic function, how do we want this to behave? + +From @mb706: + +- PipeOps should contain metadata about whether they are deterministic or not, and whether + their .train() and .predict() results are the same whenever the input to both is the same (use common vs. separate cache) + + **Possible solution** + + 1. Add a new field: + ``` + cacheable = TRUE # or deterministic = TRUE + ``` + only `PipeOp`s where this holds are beeing cached. + + 2. For `cacheable = FALSE`, the `.Random.seed` is added to the caching `key`. + This would allow to cache reproducible workflows. + +- with some operations it may make more sense to save just the $state and not the result. + Then during $train() the caching mechanism can set the state from cache and call $.predict(). + + Question: How do we decide this? We should maybe think about an **API** for this. + +### Caching a full graph + +- caching in mlrCPO was a wrapper-PipeOp, we could also have that here. + Pro: For multiple operations only the last output needs to be saved; makes the configuration of different caching mechanisms easier. + Cons: We get the drawbacks of wrapping: the graph structure gets obscured. Also when wrapping multiple operations and just one of them is nondeterministic everything falls apart. We may want a ppl() function that wraps a graph optimally so that linear deterministic segments are cached together and only the output of the last PipeOp is kept. (Also works for arbitrary Graphs). + + Comments: + - Caching the graph: Yes! + Caching segments of the graph? + This makes things unneccessarily complicated. We could instead either cache the whole graph **or** if any po is nondeterministic, cache only deterministic pipeops. + + - **Possible solution** + 1. Wrap the graph as described above with pro's, con's. + + 2. Cache the graph's `$reduce_graph` method in `$train, $predict` (in `Graph.R`) + similarly to how `PipeOp`s are cached above. + This is only possible if all po's in a graph are deterministic. + diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index 79766c078..598f6fdba 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -54,38 +54,39 @@ test_that("Caching works for test hash pipeop", { }) -# test_that("Caching works for scale", { -# po = po("scale") -# old_hash = po$hash +test_that("Caching works for scale", { + + old_hash = po$hash -# po$train(list(tsk)) -# R.cache::saveCache(key = keya, "a") -# R.cache::loadCache(key = keya) -# R.cache::loadCache(key = keyb) - -# R.cache::findCache(key = list(map(list(tsk), "hash"), po$hash)) -# R.cache::findCache(key = list(map(list(tsk), "hash"), old_hash)) - -# po$train(list(tsk)) -# # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) + po$train(list(tsk)) + R.cache::saveCache(key = keya, "a") + R.cache::loadCache(key = keya) + R.cache::loadCache(key = keyb) -# tsk = tsk("zoo") -# print(po$hash) -# po$train(list(tsk)) -# print(po$hash) + R.cache::findCache(key = list(map(list(tsk), "hash"), po$hash)) + R.cache::findCache(key = list(map(list(tsk), "hash"), old_hash)) + po$train(list(tsk)) + # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) + + po = po("scale") + tsk = tsk("zoo") + po$train(list(tsk)) + + po = po("scale") + po$state -# po = po("nop") -# print(po$hash) -# R.cache::clearCache(prompt = FALSE) -# # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) -# po$train(list(tsk)) -# po$train(list(tsk)) + po = po("nop") + print(po$hash) + R.cache::clearCache(prompt = FALSE) + # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) + po$train(list(tsk)) + po$train(list(tsk)) -# a = po("scale") -# a$hash -# a$param_set$values$center = FALSE -# a$hash + a = po("scale") + a$hash + a$param_set$values$center = FALSE + a$hash -# }) +}) From cf807854e29878b9f5ad707e127231ba670c9a94 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Thu, 2 Apr 2020 10:08:59 +0200 Subject: [PATCH 05/26] Move caching to graph_reduce --- R/Graph.R | 13 ++++++++++++- R/PipeOp.R | 22 +++------------------- 2 files changed, 15 insertions(+), 20 deletions(-) diff --git a/R/Graph.R b/R/Graph.R index 31c3cb6cf..0c49cd5c1 100644 --- a/R/Graph.R +++ b/R/Graph.R @@ -531,7 +531,13 @@ graph_reduce = function(self, input, fun, single_input) { input = input_tbl$payload names(input) = input_tbl$name - output = op[[fun]](input) + R.cache::evalWithMemoization( + {res_out = list(output = op[[fun]](input), state = op$state)}, + key = list(map_chr(input, get_hash), op$hash) + ) + if (is.null(op$state) && fun == "train") op = res_out$state # write cached state + output = res_out$output + if (self$keep_results) { op$.result = output } @@ -601,3 +607,8 @@ predict.Graph = function(object, newdata, ...) { } result } + +get_hash = function(x) { + if (!is.null(x$hash)) return(x$hash) + digest(x, algo = "xxhash64") +} \ No newline at end of file diff --git a/R/PipeOp.R b/R/PipeOp.R index a672a8430..232674d18 100644 --- a/R/PipeOp.R +++ b/R/PipeOp.R @@ -241,13 +241,8 @@ PipeOp = R6Class("PipeOp", return(named_list(self$output$name, NO_OP)) } input = check_types(self, input, "input", "train") - # caching - R.cache::evalWithMemoization({ - result = list(private$.train(input), self$state) - }, key = list(map_chr(input, get_hash), self$hash) - ) - if (is.null(self$state)) state = result$state - output = check_types(self, result$output, "output", "train") + output = list(private$.train(input), self$state) + output = check_types(self, output, "output", "train") output }, predict = function(input) { @@ -261,13 +256,7 @@ PipeOp = R6Class("PipeOp", stopf("Pipeop %s got NO_OP during train but no NO_OP during predict.", self$id) } input = check_types(self, input, "input", "predict") - R.cache::evalWithMemoization({ - output = private$.predict(input) - }, - key = list(map_chr(input, get_hash), self$hash) - ) - # print(digest(list(class(self), self$id), algo = "xxhash64")) - # print(digest(self$param_set, algo = "xxhash64")) + output = private$.predict(input) output = check_types(self, output, "output", "predict") output } @@ -376,8 +365,3 @@ check_types = function(self, data, direction, operation) { names(data) = typetable$name data } - -get_hash = function(x) { - if (!is.null(x$hash)) return(x$hash) - digest(x, algo = "xxhash64") -} \ No newline at end of file From 27e84f35b4ca96b98183685083bd9c5fbc8519dc Mon Sep 17 00:00:00 2001 From: pfistfl Date: Thu, 2 Apr 2020 10:15:57 +0200 Subject: [PATCH 06/26] Add current state --- attic/caching.md | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/attic/caching.md b/attic/caching.md index f9ba414c2..620da78ab 100644 --- a/attic/caching.md +++ b/attic/caching.md @@ -20,8 +20,7 @@ This would make complexity very manageable. `R.cache::evalWithMemoization` memoizes the provided expression. The `hash` is computed from its `key` argument. - -Possible solution: adjust `PipeOp` in `PipeOp.R` +~~Possible solution: adjust `PipeOp` in `PipeOp.R` ``` train = function(input) { ... @@ -48,6 +47,20 @@ train = function(input) { } ), ``` +~~ + +or alternatively in `graph_reduce`: + +The call to `op[[fun]](input)` calls the `PipeOp's` "train" and "predict" fun. + +``` + R.cache::evalWithMemoization( + {res_out = list(output = op[[fun]](input), state = op$state)}, + key = list(map_chr(input, get_hash), op$hash) + ) + if (is.null(op$state) && fun == "train") op = res_out$state # write cached state + output = res_out$output +``` where `get_hash` is: ``` From 86529b9a62df92f0d4d13291d912d9daeb0b6eae Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 3 Apr 2020 15:11:52 +0200 Subject: [PATCH 07/26] Document first caching draft --- R/Graph.R | 35 ++++++++++--- R/PipeOp.R | 27 +++++++++- R/PipeOpBranch.R | 3 +- R/PipeOpChunk.R | 14 +++++- R/PipeOpClassBalancing.R | 4 +- R/PipeOpCopy.R | 3 +- R/PipeOpImputeHist.R | 4 +- R/PipeOpImputeSample.R | 4 +- R/PipeOpNOP.R | 3 +- R/PipeOpProxy.R | 21 ++++++++ R/PipeOpSmote.R | 4 +- R/PipeOpThreshold.R | 3 +- R/PipeOpUnbranch.R | 3 +- attic/caching.md | 104 +++++++++++++++++++-------------------- man/Graph.Rd | 2 + man/PipeOp.Rd | 6 +++ 16 files changed, 167 insertions(+), 73 deletions(-) diff --git a/R/Graph.R b/R/Graph.R index 0c49cd5c1..0bb19e253 100644 --- a/R/Graph.R +++ b/R/Graph.R @@ -58,6 +58,8 @@ #' (and therefore their `$param_set$values`) and a hash of `$edges`. #' * `keep_results` :: `logical(1)` \cr #' Whether to store intermediate results in the [`PipeOp`]'s `$.result` slot, mostly for debugging purposes. Default `FALSE`. +#' * `cache` :: `logical(1)` \cr +#' Whether to cache individual [`PipeOp`]'s during "train" and "predict". Default `FALSE`. #' #' @section Methods: #' * `ids(sorted = FALSE)` \cr @@ -112,6 +114,7 @@ #' (`any`, `logical(1)`) -> `list` of `any` \cr #' Predict with the [`Graph`] by calling all the [`PipeOp`]'s `$train` methods. Input and output, as well as the function #' of the `single_input` argument, are analogous to `$train()`. +#' #' #' @name Graph #' @family mlr3pipelines backend related @@ -399,6 +402,14 @@ Graph = R6Class("Graph", } else { map(self$pipeops, "state") } + }, + cache = function(val) { + if (!missing(val)) { + assert_flag(val) + private$.cache = val + } else { + private$.cache = val + } } ), @@ -411,7 +422,8 @@ Graph = R6Class("Graph", value ) }, - .param_set = NULL + .param_set = NULL, + .cache = FALSE ) ) @@ -531,13 +543,11 @@ graph_reduce = function(self, input, fun, single_input) { input = input_tbl$payload names(input) = input_tbl$name - R.cache::evalWithMemoization( - {res_out = list(output = op[[fun]](input), state = op$state)}, - key = list(map_chr(input, get_hash), op$hash) - ) - if (is.null(op$state) && fun == "train") op = res_out$state # write cached state - output = res_out$output - + if (self$cache && po$cache) { + output = cached_pipeop_eval(self, op, fun, input) + } else { + output = op[[fun]](input) + } if (self$keep_results) { op$.result = output } @@ -611,4 +621,13 @@ predict.Graph = function(object, newdata, ...) { get_hash = function(x) { if (!is.null(x$hash)) return(x$hash) digest(x, algo = "xxhash64") +} + +cached_pipeop_eval = function(self, op, fun, input) { + R.cache::evalWithMemoization( + {res_out = list(output = op[[fun]](input), state = op$state)}, + key = list(map_chr(input, get_hash), op$hash) + ) + if (is.null(op$state) && fun == "train") op = res_out$state # write cached state + output = res_out$output } \ No newline at end of file diff --git a/R/PipeOp.R b/R/PipeOp.R index 385790ef9..a6370718c 100644 --- a/R/PipeOp.R +++ b/R/PipeOp.R @@ -115,6 +115,13 @@ #' If the [`Graph`]'s `$keep_results` flag is set to `TRUE`, then the intermediate Results of `$train()` and `$predict()` #' are saved to this slot, exactly as they are returned by these functions. This is mainly for debugging purposes #' and done, if requested, by the [`Graph`] backend itself; it should *not* be done explicitly by `private$.train()` or `private$.predict()`. +#' * `cache` :: `logical(1)` \cr +#' Whether to cache the [`PipeOp`]'s state and or output during "train" and "predict". Defaults to `TRUE`. +#' A [`PipeOp`] can only be cached if it is deterministic. +#' * `stochastic` :: `character` \cr +#' Whether a [`PipeOp`] is stochastic during `"train"`, `"predict"`, both, or not at all `character(0)`. +#' Defaults to `character(0)` (deterministic). +#' #' #' @section Methods: #' * `train(input)`\cr @@ -295,6 +302,22 @@ PipeOp = R6Class("PipeOp", hash = function() { digest(list(class(self), self$id, self$param_set$values), algo = "xxhash64") + }, + cache = function(val) { + if (!missing(val)) { + assert_flag(val) + private$.cache = val + } else { + private$.cache = val + } + }, + stochastic = function(val) { + if (!missing(val)) { + assert_subset(val, c("train", "predict")) + private$.stochastic = val + } else { + private$.stochastic = val + } } ), @@ -317,7 +340,9 @@ PipeOp = R6Class("PipeOp", .predict = function(input) stop("abstract"), .param_set = NULL, .param_set_source = NULL, - .id = NULL + .id = NULL, + .cache = TRUE, + .stochastic = character(0) ) ) diff --git a/R/PipeOpBranch.R b/R/PipeOpBranch.R index f02cae9f7..8986c4459 100644 --- a/R/PipeOpBranch.R +++ b/R/PipeOpBranch.R @@ -117,7 +117,8 @@ PipeOpBranch = R6Class("PipeOpBranch", ret = named_list(self$output$name, NO_OP) ret[[self$param_set$values$selection]] = inputs[[1]] ret - } + }, + .cache = FALSE ) ) diff --git a/R/PipeOpChunk.R b/R/PipeOpChunk.R index 2f784ac5d..b2b7470cf 100644 --- a/R/PipeOpChunk.R +++ b/R/PipeOpChunk.R @@ -75,6 +75,17 @@ PipeOpChunk = R6Class("PipeOpChunk", ) } ), + active = list( + stochastic = function(val) { + if (!missing(val)) { + assert_subset(val, c("train", "predict")) + private$.stochastic = val + } else { + if (self$param_set$values$shuffle) return("train") + character(0) + } + } + ), private = list( .train = function(inputs) { self$state = list() @@ -88,7 +99,8 @@ PipeOpChunk = R6Class("PipeOpChunk", }, .predict = function(inputs) { rep(inputs, self$outnum) - } + }, + .cache = FALSE ) ) diff --git a/R/PipeOpClassBalancing.R b/R/PipeOpClassBalancing.R index 2790a2f9f..b0736367f 100644 --- a/R/PipeOpClassBalancing.R +++ b/R/PipeOpClassBalancing.R @@ -160,7 +160,9 @@ PipeOpClassBalancing = R6Class("PipeOpClassBalancing", task_filter_ex(task, new_ids) }, - .predict_task = identity + .predict_task = identity, + .cache = FALSE, + .stochastic = "train" ) ) diff --git a/R/PipeOpCopy.R b/R/PipeOpCopy.R index 2dc9a7b11..fe3ba709b 100644 --- a/R/PipeOpCopy.R +++ b/R/PipeOpCopy.R @@ -99,7 +99,8 @@ PipeOpCopy = R6Class("PipeOpCopy", }, .predict = function(inputs) { rep_len(inputs, self$outnum) - } + }, + .cache = FALSE ) ) diff --git a/R/PipeOpImputeHist.R b/R/PipeOpImputeHist.R index e44b85f71..a5bd0fb0e 100644 --- a/R/PipeOpImputeHist.R +++ b/R/PipeOpImputeHist.R @@ -74,7 +74,9 @@ PipeOpImputeHist = R6Class("PipeOpImputeHist", } feature[is.na(feature)] = sampled feature - } + }, + .cache = FALSE, + .stochastic = c("train", "predict") ) ) diff --git a/R/PipeOpImputeSample.R b/R/PipeOpImputeSample.R index f19171950..177bdb50b 100644 --- a/R/PipeOpImputeSample.R +++ b/R/PipeOpImputeSample.R @@ -85,7 +85,9 @@ PipeOpImputeSample = R6Class("PipeOpImputeSample", feature[is.na(feature)] = sample(model, outlen, replace = TRUE) } feature - } + }, + .cache = FALSE, + .stochastic = c("train", "predict") ) ) diff --git a/R/PipeOpNOP.R b/R/PipeOpNOP.R index c2d70c5c8..87235cd72 100644 --- a/R/PipeOpNOP.R +++ b/R/PipeOpNOP.R @@ -75,7 +75,8 @@ PipeOpNOP = R6Class("PipeOpNOP", .predict = function(inputs) { inputs - } + }, + .cache = FALSE ) ) diff --git a/R/PipeOpProxy.R b/R/PipeOpProxy.R index 56600f825..c9725e58d 100644 --- a/R/PipeOpProxy.R +++ b/R/PipeOpProxy.R @@ -105,6 +105,27 @@ PipeOpProxy = R6Class("PipeOpProxy", ) } ), + active = list( + cache = function(val) { + if (!missing(val)) { + assert_flag(val) + self$param_set$values$content$cache = val + } else { + self$param_set$values$content$cache + } + }, + stochastic = function(val) { + if (!missing(val)) { + assert_subset(val, c("train", "predict")) + if (inherits(self$param_set$values$content, "Graph")) + stop("'stochastic' not be set when content is a graph!") + else + self$param_set$values$content$stochastic = val + } else { + self$param_set$values$content$stochastic + } + } + ), private = list( .param_set = NULL, .param_set_source = NULL, diff --git a/R/PipeOpSmote.R b/R/PipeOpSmote.R index 3f56ebef7..4c8a042ea 100644 --- a/R/PipeOpSmote.R +++ b/R/PipeOpSmote.R @@ -106,7 +106,9 @@ PipeOpSmote = R6Class("PipeOpSmote", } setnames(st, "class", task$target_names) task$rbind(st) - } + }, + .cache = FALSE, + .stochastic = "train" ) ) diff --git a/R/PipeOpThreshold.R b/R/PipeOpThreshold.R index 6ba93507d..df67f3b14 100644 --- a/R/PipeOpThreshold.R +++ b/R/PipeOpThreshold.R @@ -82,7 +82,8 @@ PipeOpThreshold = R6Class("PipeOpThreshold", } list(prd$set_threshold(thr)) - } + }, + .cache = FALSE ) ) diff --git a/R/PipeOpUnbranch.R b/R/PipeOpUnbranch.R index bcb4e6753..6255bfd7f 100644 --- a/R/PipeOpUnbranch.R +++ b/R/PipeOpUnbranch.R @@ -88,7 +88,8 @@ PipeOpUnbranch = R6Class("PipeOpUnbranch", }, .predict = function(inputs) { filter_noop(inputs) - } + }, + .cache = FALSE ) ) diff --git a/attic/caching.md b/attic/caching.md index 620da78ab..735611f43 100644 --- a/attic/caching.md +++ b/attic/caching.md @@ -6,60 +6,55 @@ The example currently uses the `R.cache` package by Henrik Bengtsson for caching This is just a very simple caching package, that provides a clean, simple API, could theoretically be replaced by other packages. -Caching can / should be done on two levels: - - caching of individual pipeops - - caching of full graphs + +## Preliminaries + +- Pipelines should be cached at a PipeOp level, as there are rarely situations where + caching a full Graph would be required (e.g. tuning a graph requires caching of individual steps). + +- PipeOps could either cache `state` and `result` during training or alternatively only `state` + when `predict` is comparatively cheap and the same transform steps can be done during `train` + and `predict`. For now we will call the latter pipeops `"predict_like_train"`. + This should be annotated in each `PipeOp`. Can default to `"predict_like_train"`. + +- PipeOps can be **stochastic**, either during `"train"`, `"predict"`, `"both"` or `"deterministic"`. + Implementation suggestion: + ``` + stochastic = c("train", "predict", "character(0)") # "character(0)" means not stochastic. + ``` + This needs to be annotated in each `PipeOp`. Could default to `"deterministic"`. + +- Caching can be turned on / off for individual a full `Graph` or individual `PipeOps`. + API for this could e.g. be: + - `Graph` has a `cache` slot, can be set to `TRUE` or `FALSE`, Default `FALSE`? + - `PipeOp` has a `cache` slot, can be set to `TRUE` or `FALSE`, Default `TRUE`? + `PipeOp`s that should never be cached (stochastic, meta, ...) are set to `FALSE`. + - If `Graph$cache && PipeOp$cache`, caching is active. + ## Implementation Details -Ideally we would like to do caching on an abstract level, -perhaps within the PipeOp base-classes `$train` function. -A very nice point would be to wrap the call to `private$.train`. -This would make complexity very manageable. +Ideally we would like to do caching on an abstract level, instead of writing a caching mechanism +for each `PipeOp`. `R.cache::evalWithMemoization` memoizes the provided expression. The `hash` is computed from its `key` argument. -~~Possible solution: adjust `PipeOp` in `PipeOp.R` -``` -train = function(input) { - ... - t = check_types(self, input, "input", "train") - # caching - R.cache::evalWithMemoization({ - result = list(private$.train(input), self$state) #(see A below) - }, key = list(map_chr(input, get_hash), self$hash) - ) - if (is.null(self$state)) state = result$state #(see A below) - output = check_types(self, result$output, "output", "train") - output - }, - predict = function(input) { - ... - input = check_types(self, input, "input", "predict") - R.cache::evalWithMemoization({ - output = private$.predict(input) - }, - key = list(map_chr(input, get_hash), self$hash) - ) - output = check_types(self, output, "output", "predict") - output - } - ), -``` -~~ - -or alternatively in `graph_reduce`: +Possible solution: apply caching in `graph_reduce` (`Graph.R`): -The call to `op[[fun]](input)` calls the `PipeOp's` "train" and "predict" fun. +The call to `op[[fun]](input)` calls each `PipeOp's` "train" and "predict" fun. ``` - R.cache::evalWithMemoization( - {res_out = list(output = op[[fun]](input), state = op$state)}, - key = list(map_chr(input, get_hash), op$hash) - ) - if (is.null(op$state) && fun == "train") op = res_out$state # write cached state - output = res_out$output + if (self$cache && op$cache) { # caching can be enabled / disabled + R.cache::evalWithMemoization( + {res_out = list(output = op[[fun]](input), state = op$state)}, + key = list(map_chr(input, get_hash), op$hash) # hash of input and hash of pipeop (latter includes param_vals) + ) + if (is.null(op$state) && fun == "train") op = res_out$state # write cached state + output = res_out$output + } else { + output = list(output = op[[fun]](input), state = op$state) + } ``` where `get_hash` is: @@ -70,21 +65,15 @@ get_hash = function(x) { } ``` -**Alternative:** - -Caching could also be done in `reduce_graph`. This would also simplify caching -whole graph vs. single pipeops. ## Possible problems: A) Unfortunately `private$.train()` is not a pure function, but instead has side-effects: - sets a `$state` - - ... (others?) If we can ensure that the only side-effect of `$.train` is a modified state, we could also memoize the state during `$train` (see above). - If other fields are updated, we need to have a list of fields that are updated or go a different route. ## Further Issues: @@ -110,10 +99,7 @@ G) How do we globally enable/disable caching? The caching of individual steps is then done by adjusting calls to `graph_reduce`: `graph_reduce(..., caching = self$caching)` -H) Should caching be disabled for some `PipeOp`s? - Yes, possible solution: New field in each `PipeOp`: `cached`. - Caching for a pipeop only happens if `cached = TRUE`. - Can also be manually changed to disable caching for any pipeop. +H) Caching for some `PipeOp`s can be manually changed to disable caching for any pipeop. Open Questions: - How do `$train` and `$predict` know whether to do caching or not? @@ -161,3 +147,13 @@ From @mb706: similarly to how `PipeOp`s are cached above. This is only possible if all po's in a graph are deterministic. + +### Caching non-deterministic `PipeOp`s + +This could be done if we add `Random.seed` to the `key`. +Additionally we would have to advance the `Random.seed` properly. +This could be added in future work, but might not be relevant now. + +It should be possible to enforce caching for stochastic `PipeOp`s. +Example: I want to evaluate choices (branches) made after or before a stochastic pipeop. + This would allow me to circumvent stochasticity. \ No newline at end of file diff --git a/man/Graph.Rd b/man/Graph.Rd index 2f0694850..3fd235781 100644 --- a/man/Graph.Rd +++ b/man/Graph.Rd @@ -66,6 +66,8 @@ Stores a checksum calculated on the \code{\link{Graph}} configuration, which inc (and therefore their \verb{$param_set$values}) and a hash of \verb{$edges}. \item \code{keep_results} :: \code{logical(1)} \cr Whether to store intermediate results in the \code{\link{PipeOp}}'s \verb{$.result} slot, mostly for debugging purposes. Default \code{FALSE}. +\item \code{cache} :: \code{logical(1)} \cr +Whether to cache individual \code{\link{PipeOp}}'s during "train" and "predict". Default \code{FALSE}. } } diff --git a/man/PipeOp.Rd b/man/PipeOp.Rd index 2e536ecfb..02bcd6b24 100644 --- a/man/PipeOp.Rd +++ b/man/PipeOp.Rd @@ -120,6 +120,12 @@ binding and calculate the hash as \verb{digest(list(super$hash, ), If the \code{\link{Graph}}'s \verb{$keep_results} flag is set to \code{TRUE}, then the intermediate Results of \verb{$train()} and \verb{$predict()} are saved to this slot, exactly as they are returned by these functions. This is mainly for debugging purposes and done, if requested, by the \code{\link{Graph}} backend itself; it should \emph{not} be done explicitly by \code{private$.train()} or \code{private$.predict()}. +\item \code{cache} :: \code{logical(1)} \cr +Whether to cache the \code{\link{PipeOp}}'s state and or output during "train" and "predict". Defaults to \code{TRUE}. +A \code{\link{PipeOp}} can only be cached if it is deterministic. +\item \code{stochastic} :: \code{character} \cr +Whether a \code{\link{PipeOp}} is stochastic during \code{"train"}, \code{"predict"}, both, or not at all \code{character(0)}. +Defaults to \code{character(0)} (deterministic). } } From 259b730829f44c50e0ee3ee331098ca42dbf7851 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 3 Apr 2020 18:30:18 +0200 Subject: [PATCH 08/26] cached_pipeo_eval_fun --- R/Graph.R | 63 +++++++++++++++++++++++++++++++++++++++--------- attic/caching.md | 11 ++++++++- 2 files changed, 62 insertions(+), 12 deletions(-) diff --git a/R/Graph.R b/R/Graph.R index 0bb19e253..a2653f540 100644 --- a/R/Graph.R +++ b/R/Graph.R @@ -543,11 +543,8 @@ graph_reduce = function(self, input, fun, single_input) { input = input_tbl$payload names(input) = input_tbl$name - if (self$cache && po$cache) { - output = cached_pipeop_eval(self, op, fun, input) - } else { - output = op[[fun]](input) - } + output = cached_pipeop_eval(self, op, fun, input) + if (self$keep_results) { op$.result = output } @@ -623,11 +620,55 @@ get_hash = function(x) { digest(x, algo = "xxhash64") } + +# Cached train/predict of a PipeOp. +# 1) Caching is only performed if graph and po have `cache = TRUE` +# 2) Additonally caching is only performed if 'fun' (train / predict) is not stochastic +# for a given PipeOp +# 3) During training we have two options +# - Cache only state: +# This is possible if the train transform is the same as the predict transform +# and predict is comparatively cheap (i.e. filters). +# - Cache state and output +# (All other cases) cached_pipeop_eval = function(self, op, fun, input) { - R.cache::evalWithMemoization( - {res_out = list(output = op[[fun]](input), state = op$state)}, - key = list(map_chr(input, get_hash), op$hash) - ) - if (is.null(op$state) && fun == "train") op = res_out$state # write cached state - output = res_out$output + if (FALSE) { # turn caching off for unit tests for now, turn back on when stuff works again. + if (self$cache && po$cache) { + cache_key = list(map_chr(input, get_hash), op$hash) + if (fun == "train") { + if (fun %nin% op$stochastic) { + # Two options: cache state (can predict on train set using state during train) + # Or: do not cache state () (if upper is not possible) + if (cache_state) { + R.cache::evalWithMemoization({ + op[[fun]](input) + state = op$state + }, key = cache_key) + # Set state if PipeOp was cached + if (is.null(op$state) && fun == "train") op$state = state + # We call "predict" on train inputs, this avoids storing the outputs + # during training on disk. This is only possible for some pipeops. + output = op[["predict"]](input) + } else { + R.cache::evalWithMemoization({ + result = list(output = op[[fun]](input), state = op$state) + }, key = cache_key) + # Set state if PipeOp was cached + if (is.null(op$state) && fun == "train") op$state = result$state + output = result$output + } + return(output) + } + } else if (fun == "predict") { + if (fun %nin% op$stochastic) { + R.cache::evalWithMemoization( + {output = op[[fun]](input)}, + key = cache_key) + return(output) + } + } + } + } + # No caching fallback, anything where we do not run into conditions above + return(op[[fun]](input)) } \ No newline at end of file diff --git a/attic/caching.md b/attic/caching.md index 735611f43..a17862d52 100644 --- a/attic/caching.md +++ b/attic/caching.md @@ -156,4 +156,13 @@ This could be added in future work, but might not be relevant now. It should be possible to enforce caching for stochastic `PipeOp`s. Example: I want to evaluate choices (branches) made after or before a stochastic pipeop. - This would allow me to circumvent stochasticity. \ No newline at end of file + This would allow me to circumvent stochasticity. + + +### User Control for caching + +This basically could be handled via `R.cache`'s functionality, but should somehow be documented. + +### Testthat + +How do we disable caching during `unit` tests. \ No newline at end of file From 3ef805fb3219cd4a415c8cf5a13d08fa99773afa Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 3 Apr 2020 18:33:52 +0200 Subject: [PATCH 09/26] Errors in AB --- R/Graph.R | 4 ++-- R/PipeOp.R | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/R/Graph.R b/R/Graph.R index a2653f540..6ca2a27c5 100644 --- a/R/Graph.R +++ b/R/Graph.R @@ -408,7 +408,7 @@ Graph = R6Class("Graph", assert_flag(val) private$.cache = val } else { - private$.cache = val + private$.cache } } ), @@ -639,7 +639,7 @@ cached_pipeop_eval = function(self, op, fun, input) { if (fun %nin% op$stochastic) { # Two options: cache state (can predict on train set using state during train) # Or: do not cache state () (if upper is not possible) - if (cache_state) { + if (po$cache_state) { R.cache::evalWithMemoization({ op[[fun]](input) state = op$state diff --git a/R/PipeOp.R b/R/PipeOp.R index a6370718c..316a649bc 100644 --- a/R/PipeOp.R +++ b/R/PipeOp.R @@ -308,7 +308,7 @@ PipeOp = R6Class("PipeOp", assert_flag(val) private$.cache = val } else { - private$.cache = val + private$.cache } }, stochastic = function(val) { @@ -316,7 +316,7 @@ PipeOp = R6Class("PipeOp", assert_subset(val, c("train", "predict")) private$.stochastic = val } else { - private$.stochastic = val + private$.stochastic } } ), From 778708884f4d39ea79f7f5e3ad6703528f3d8b18 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 3 Apr 2020 18:48:25 +0200 Subject: [PATCH 10/26] Bug --- R/PipeOp.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/PipeOp.R b/R/PipeOp.R index 316a649bc..c32700627 100644 --- a/R/PipeOp.R +++ b/R/PipeOp.R @@ -247,7 +247,7 @@ PipeOp = R6Class("PipeOp", return(named_list(self$output$name, NO_OP)) } input = check_types(self, input, "input", "train") - output = list(private$.train(input), self$state) + output = private$.train(input) output = check_types(self, output, "output", "train") output }, From 416ae54744ac03d9e9eebf9ba0054d5e157a8339 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 3 Apr 2020 19:01:57 +0200 Subject: [PATCH 11/26] Disable tests --- R/Graph.R | 2 +- tests/testthat/test_caching.R | 160 +++++++++++++++++----------------- 2 files changed, 81 insertions(+), 81 deletions(-) diff --git a/R/Graph.R b/R/Graph.R index 6ca2a27c5..6fb9aa71b 100644 --- a/R/Graph.R +++ b/R/Graph.R @@ -671,4 +671,4 @@ cached_pipeop_eval = function(self, op, fun, input) { } # No caching fallback, anything where we do not run into conditions above return(op[[fun]](input)) -} \ No newline at end of file +} diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index 598f6fdba..969693a33 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -1,92 +1,92 @@ context("Caching") -test_that("Caching works for test hash pipeop", { - - PipeOpTestHash = R6Class("PipeOpTestHash", - inherit = PipeOp, - public = list( - initialize = function(id = "test.hash", param_set = ParamSet$new()) { - super$initialize(id = id, param_set = param_set, - input = data.table(name = "input", train = "*", predict = "*"), - output = data.table(name = "output", train = "*", predict = "*") - ) - }), - private = list( - .train = function(inputs) { - Sys.sleep(1) - self$state = list() - inputs - }, - .predict = function(inputs) { - Sys.sleep(1) - inputs - } - ) - ) - - # FIXME: - # This could fail if load is very high, nonetheless I would keep it. - - tsk = tsk("iris") - po = PipeOpTestHash$new() - - # Takes > 1 second - R.cache::clearCache(prompt = FALSE) - st = Sys.time() - po$train(list(tsk)) - expect_true(st < Sys.time() - 1) - - # takes < 1 second - st = Sys.time() - po$train(list(tsk)) - expect_true(st > Sys.time() - 1) - - # Takes > 1 second - st = Sys.time() - po$predict(list(tsk)) - expect_true(st < Sys.time() - 1) - - # takes < 1 second - st = Sys.time() - po$predict(list(tsk)) - expect_true(st > Sys.time() - 1) - -}) - - -test_that("Caching works for scale", { +# test_that("Caching works for test hash pipeop", { + +# PipeOpTestHash = R6Class("PipeOpTestHash", +# inherit = PipeOp, +# public = list( +# initialize = function(id = "test.hash", param_set = ParamSet$new()) { +# super$initialize(id = id, param_set = param_set, +# input = data.table(name = "input", train = "*", predict = "*"), +# output = data.table(name = "output", train = "*", predict = "*") +# ) +# }), +# private = list( +# .train = function(inputs) { +# Sys.sleep(1) +# self$state = list() +# inputs +# }, +# .predict = function(inputs) { +# Sys.sleep(1) +# inputs +# } +# ) +# ) + +# # FIXME: +# # This could fail if load is very high, nonetheless I would keep it. + +# tsk = tsk("iris") +# po = PipeOpTestHash$new() + +# # Takes > 1 second +# R.cache::clearCache(prompt = FALSE) +# st = Sys.time() +# po$train(list(tsk)) +# expect_true(st < Sys.time() - 1) + +# # takes < 1 second +# st = Sys.time() +# po$train(list(tsk)) +# expect_true(st > Sys.time() - 1) + +# # Takes > 1 second +# st = Sys.time() +# po$predict(list(tsk)) +# expect_true(st < Sys.time() - 1) + +# # takes < 1 second +# st = Sys.time() +# po$predict(list(tsk)) +# expect_true(st > Sys.time() - 1) + +# }) + + +# test_that("Caching works for scale", { - old_hash = po$hash +# old_hash = po$hash - po$train(list(tsk)) - R.cache::saveCache(key = keya, "a") - R.cache::loadCache(key = keya) - R.cache::loadCache(key = keyb) +# po$train(list(tsk)) +# R.cache::saveCache(key = keya, "a") +# R.cache::loadCache(key = keya) +# R.cache::loadCache(key = keyb) - R.cache::findCache(key = list(map(list(tsk), "hash"), po$hash)) - R.cache::findCache(key = list(map(list(tsk), "hash"), old_hash)) +# R.cache::findCache(key = list(map(list(tsk), "hash"), po$hash)) +# R.cache::findCache(key = list(map(list(tsk), "hash"), old_hash)) - po$train(list(tsk)) - # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) +# po$train(list(tsk)) +# # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) - po = po("scale") - tsk = tsk("zoo") - po$train(list(tsk)) +# po = po("scale") +# tsk = tsk("zoo") +# po$train(list(tsk)) - po = po("scale") - po$state +# po = po("scale") +# po$state - po = po("nop") - print(po$hash) - R.cache::clearCache(prompt = FALSE) - # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) - po$train(list(tsk)) - po$train(list(tsk)) +# po = po("nop") +# print(po$hash) +# R.cache::clearCache(prompt = FALSE) +# # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) +# po$train(list(tsk)) +# po$train(list(tsk)) - a = po("scale") - a$hash - a$param_set$values$center = FALSE - a$hash +# a = po("scale") +# a$hash +# a$param_set$values$center = FALSE +# a$hash -}) +# }) From bbad9d19ed528f417b35bddcc8982fa2c729523a Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 3 Apr 2020 19:37:38 +0200 Subject: [PATCH 12/26] Add caching --- R/PipeOp.R | 6 ++++++ R/PipeOpClassBalancing.R | 3 ++- R/PipeOpProxy.R | 7 +++++++ R/PipeOpSubsample.R | 4 ++-- R/PipeOpThreshold.R | 3 ++- attic/caching.md | 23 ++++++++++++++++++++--- 6 files changed, 39 insertions(+), 7 deletions(-) diff --git a/R/PipeOp.R b/R/PipeOp.R index c32700627..67927343d 100644 --- a/R/PipeOp.R +++ b/R/PipeOp.R @@ -318,6 +318,12 @@ PipeOp = R6Class("PipeOp", } else { private$.stochastic } + }, + cache_state = function(val) { + if (!missing(val)) { + stop("cache_state is read-only!") + } + private$.cache_state } ), diff --git a/R/PipeOpClassBalancing.R b/R/PipeOpClassBalancing.R index b0736367f..60094881f 100644 --- a/R/PipeOpClassBalancing.R +++ b/R/PipeOpClassBalancing.R @@ -162,7 +162,8 @@ PipeOpClassBalancing = R6Class("PipeOpClassBalancing", .predict_task = identity, .cache = FALSE, - .stochastic = "train" + .stochastic = "train", + .cache_state = FALSE ) ) diff --git a/R/PipeOpProxy.R b/R/PipeOpProxy.R index c9725e58d..519903d0b 100644 --- a/R/PipeOpProxy.R +++ b/R/PipeOpProxy.R @@ -124,6 +124,13 @@ PipeOpProxy = R6Class("PipeOpProxy", } else { self$param_set$values$content$stochastic } + }, + cache_state = function(val) { + if (!missing(val)) { + stop("cache_state is read-only!") + } else { + self$param_set$values$content$cache_state + } } ), private = list( diff --git a/R/PipeOpSubsample.R b/R/PipeOpSubsample.R index f746a3b2f..cea2606e0 100644 --- a/R/PipeOpSubsample.R +++ b/R/PipeOpSubsample.R @@ -93,8 +93,8 @@ PipeOpSubsample = R6Class("PipeOpSubsample", self$state = list() task_filter_ex(task, keep) }, - - .predict_task = identity + .predict_task = identity, + .cache_state = FALSE ) ) diff --git a/R/PipeOpThreshold.R b/R/PipeOpThreshold.R index df67f3b14..a90f5c1b0 100644 --- a/R/PipeOpThreshold.R +++ b/R/PipeOpThreshold.R @@ -83,7 +83,8 @@ PipeOpThreshold = R6Class("PipeOpThreshold", list(prd$set_threshold(thr)) }, - .cache = FALSE + .cache = FALSE, + .cache_state = FALSE ) ) diff --git a/attic/caching.md b/attic/caching.md index a17862d52..1abe5205a 100644 --- a/attic/caching.md +++ b/attic/caching.md @@ -22,7 +22,7 @@ theoretically be replaced by other packages. ``` stochastic = c("train", "predict", "character(0)") # "character(0)" means not stochastic. ``` - This needs to be annotated in each `PipeOp`. Could default to `"deterministic"`. + This needs to be annotated in each `PipeOp`. Could default to deterministic. - Caching can be turned on / off for individual a full `Graph` or individual `PipeOps`. API for this could e.g. be: @@ -32,6 +32,22 @@ theoretically be replaced by other packages. - If `Graph$cache && PipeOp$cache`, caching is active. +**Current implementation:** + +- `PipeOp` gets the following new slots: + - `stochastic`: can be c("train", "predict", character(0)). Default `character(0)`, set for some pos. + - `cache`: Whether the `PipeOp` should be cached. Default `TRUE`, set to `FALSE` for some pos. + - `cache_state`: Whether it is sufficient to cache the `$state`. + + Those slots are `xxx` AB's pointing to `private$.xxxx` + +- `Graph` gets the following new slots: + - `cache`: Whether the `Graph` should be cached. Default `TRUE`, set to `FALSE` for some pos. + +- New function called within `graph_reduce`: `cached_pipeop_eval`. See **Implementation Details** below. + + + ## Implementation Details Ideally we would like to do caching on an abstract level, instead of writing a caching mechanism @@ -40,12 +56,13 @@ for each `PipeOp`. `R.cache::evalWithMemoization` memoizes the provided expression. The `hash` is computed from its `key` argument. -Possible solution: apply caching in `graph_reduce` (`Graph.R`): +Possible solution: apply caching during `graph_reduce` (`Graph.R`): The call to `op[[fun]](input)` calls each `PipeOp's` "train" and "predict" fun. +Note: This is a simplified version, see the actual implementation `cached_pipeop_eval` in `graph.R`. ``` - if (self$cache && op$cache) { # caching can be enabled / disabled + if (graph$cache && op$cache) { # caching can be enabled / disabled on graph and pipeop level R.cache::evalWithMemoization( {res_out = list(output = op[[fun]](input), state = op$state)}, key = list(map_chr(input, get_hash), op$hash) # hash of input and hash of pipeop (latter includes param_vals) From 1287e77ab3f7c4da43dd478ce7bc8eb8c87568e3 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 3 Apr 2020 19:51:27 +0200 Subject: [PATCH 13/26] Test ABs --- R/PipeOp.R | 1 + attic/caching.md | 20 ++++++++++---------- tests/testthat/helper_functions.R | 3 +++ tests/testthat/test_Graph.R | 7 +++++++ tests/testthat/test_PipeOp.R | 12 ++++++++++++ 5 files changed, 33 insertions(+), 10 deletions(-) diff --git a/R/PipeOp.R b/R/PipeOp.R index 67927343d..6f77837b3 100644 --- a/R/PipeOp.R +++ b/R/PipeOp.R @@ -348,6 +348,7 @@ PipeOp = R6Class("PipeOp", .param_set_source = NULL, .id = NULL, .cache = TRUE, + .cache_state = TRUE, .stochastic = character(0) ) ) diff --git a/attic/caching.md b/attic/caching.md index 1abe5205a..54c9f6477 100644 --- a/attic/caching.md +++ b/attic/caching.md @@ -62,16 +62,16 @@ The call to `op[[fun]](input)` calls each `PipeOp's` "train" and "predict" fun. Note: This is a simplified version, see the actual implementation `cached_pipeop_eval` in `graph.R`. ``` - if (graph$cache && op$cache) { # caching can be enabled / disabled on graph and pipeop level - R.cache::evalWithMemoization( - {res_out = list(output = op[[fun]](input), state = op$state)}, - key = list(map_chr(input, get_hash), op$hash) # hash of input and hash of pipeop (latter includes param_vals) - ) - if (is.null(op$state) && fun == "train") op = res_out$state # write cached state - output = res_out$output - } else { - output = list(output = op[[fun]](input), state = op$state) - } + if (graph$cache && op$cache) { # caching can be enabled / disabled on graph and pipeop level + R.cache::evalWithMemoization( + {res_out = list(output = op[[fun]](input), state = op$state)}, + key = list(map_chr(input, get_hash), op$hash) # hash of input and hash of pipeop (latter includes param_vals) + ) + if (is.null(op$state) && fun == "train") op = res_out$state # write cached state + output = res_out$output + } else { + output = list(output = op[[fun]](input), state = op$state) + } ``` where `get_hash` is: diff --git a/tests/testthat/helper_functions.R b/tests/testthat/helper_functions.R index 28501545f..7b14fe673 100644 --- a/tests/testthat/helper_functions.R +++ b/tests/testthat/helper_functions.R @@ -99,6 +99,9 @@ expect_pipeop = function(po) { expect_names(names(po$output), permutation.of = c("name", "train", "predict")) expect_int(po$innum, lower = 1) expect_int(po$outnum, lower = 1) + expect_flag(po$cache) + expect_flag(po$cache_state) + expect_character(po$stochastic) # at least one of "train" or "predict" must be in every parameter's tag testthat::expect_true(every(po$param_set$tags, function(x) length(intersect(c("train", "predict"), x)) > 0)) diff --git a/tests/testthat/test_Graph.R b/tests/testthat/test_Graph.R index b140693ff..200db9db1 100644 --- a/tests/testthat/test_Graph.R +++ b/tests/testthat/test_Graph.R @@ -365,3 +365,10 @@ test_that("Graph with vararg input", { gr$train(list(1, t1, t2, 2, 3), single_input = FALSE)) }) + +test_that("Caching ABs", { + gr = as_graph(po("scale")) + expect_true(!gr$cache) + gr$cache = TRUE + expect_true(gr$cache) +}) diff --git a/tests/testthat/test_PipeOp.R b/tests/testthat/test_PipeOp.R index 0342f0525..c870e18ee 100644 --- a/tests/testthat/test_PipeOp.R +++ b/tests/testthat/test_PipeOp.R @@ -67,3 +67,15 @@ test_that("Errors occur for inputs", { po$param_set = ParamSet$new() }, "read-only") }) + +test_that("Caching ABs", { + po = po("scale") + expect_true(po$cache) + expect_true(po$cache_state) + expect_true(length(po$stochastic) == 0L) + po$cache = FALSE + expect_error({po$cache_state = TRUE}) + po$stochastic = "train" + expect_true(!po$cache) + expect_true(po$stochastic == "train") +}) From 5b58cb727f85c86ca8d7bdeb9799316f731943aa Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 3 Apr 2020 23:18:51 +0200 Subject: [PATCH 14/26] Docs and finish up tests --- R/Graph.R | 43 ++++--- R/PipeOp.R | 26 ++-- attic/caching.md | 51 ++++++-- man/Graph.Rd | 7 ++ man/PipeOp.Rd | 14 ++- tests/testthat/test_caching.R | 226 +++++++++++++++++++++------------- 6 files changed, 239 insertions(+), 128 deletions(-) diff --git a/R/Graph.R b/R/Graph.R index 6fb9aa71b..f9fd39f0b 100644 --- a/R/Graph.R +++ b/R/Graph.R @@ -60,6 +60,13 @@ #' Whether to store intermediate results in the [`PipeOp`]'s `$.result` slot, mostly for debugging purposes. Default `FALSE`. #' * `cache` :: `logical(1)` \cr #' Whether to cache individual [`PipeOp`]'s during "train" and "predict". Default `FALSE`. +#' Caching is performed using the [`R.cache`](R.cache::R.cache) package. +#' Caching can be disabled globally using `getOption("R.cache.enabled", TRUE)`. +#' By default, files are cached in `R.cache::getCacheRootPath()`. +#' For more information on how to set the cache path or retrieve cached items please consider +#' the [`R.cache`](R.cache::R.cache) documentation. +#' Caching can be fine-controlled for each [`PipeOp`] by adjusting individual [`PipeOp`]'s +#' `cache`, `cache_state` and `stochastic` fields. #' #' @section Methods: #' * `ids(sorted = FALSE)` \cr @@ -615,12 +622,6 @@ predict.Graph = function(object, newdata, ...) { result } -get_hash = function(x) { - if (!is.null(x$hash)) return(x$hash) - digest(x, algo = "xxhash64") -} - - # Cached train/predict of a PipeOp. # 1) Caching is only performed if graph and po have `cache = TRUE` # 2) Additonally caching is only performed if 'fun' (train / predict) is not stochastic @@ -632,34 +633,35 @@ get_hash = function(x) { # - Cache state and output # (All other cases) cached_pipeop_eval = function(self, op, fun, input) { - if (FALSE) { # turn caching off for unit tests for now, turn back on when stuff works again. - if (self$cache && po$cache) { + + if (self$cache && op$cache) { cache_key = list(map_chr(input, get_hash), op$hash) if (fun == "train") { if (fun %nin% op$stochastic) { # Two options: cache state (can predict on train set using state during train) # Or: do not cache state () (if upper is not possible) - if (po$cache_state) { + if (op$cache_state) { + # only cache state R.cache::evalWithMemoization({ op[[fun]](input) - state = op$state + state = op$state }, key = cache_key) - # Set state if PipeOp was cached + # Set state if PipeOp was cached (and "train" was therefore not called) if (is.null(op$state) && fun == "train") op$state = state # We call "predict" on train inputs, this avoids storing the outputs - # during training on disk. This is only possible for some pipeops. - output = op[["predict"]](input) + # during training on disk. This is only possible for pipeops where 'cache_state' is TRUE. + return(cached_pipeop_eval(self, op, "predict", input)) } else { + # Otherwise we cache state and input R.cache::evalWithMemoization({ result = list(output = op[[fun]](input), state = op$state) }, key = cache_key) # Set state if PipeOp was cached if (is.null(op$state) && fun == "train") op$state = result$state - output = result$output + return(result$output) } - return(output) } - } else if (fun == "predict") { + } else if (fun == "predict" && !op$cache_state) { if (fun %nin% op$stochastic) { R.cache::evalWithMemoization( {output = op[[fun]](input)}, @@ -668,7 +670,14 @@ cached_pipeop_eval = function(self, op, fun, input) { } } } - } # No caching fallback, anything where we do not run into conditions above return(op[[fun]](input)) } + +get_hash = function(x) { + hash = try(x$hash, silent = TRUE) + if (inherits(hash, "try-error") || is.null(hash)) + hash = digest(x, algo = "xxhash64") + hash +} + diff --git a/R/PipeOp.R b/R/PipeOp.R index 6f77837b3..361e43f6a 100644 --- a/R/PipeOp.R +++ b/R/PipeOp.R @@ -117,10 +117,18 @@ #' and done, if requested, by the [`Graph`] backend itself; it should *not* be done explicitly by `private$.train()` or `private$.predict()`. #' * `cache` :: `logical(1)` \cr #' Whether to cache the [`PipeOp`]'s state and or output during "train" and "predict". Defaults to `TRUE`. -#' A [`PipeOp`] can only be cached if it is deterministic. +#' See the `cache` field in [`Graph`] for more detailed information on caching as well as `cache_state` and +#' `stochastic` below. +#' A [`PipeOp`] is only cached if it is deterministic. +#' * `cache_state` :: `logical(1)` \cr +#' Whether the [`PipeOp`]s behaviour during training is equal to behaviour during prediction +#' (other then setting a state). In this case, only the [`PipeOp`]s state is cached. +#' This avoids caching possibly large intermediate results. +#' Defaults to `TRUE`. #' * `stochastic` :: `character` \cr -#' Whether a [`PipeOp`] is stochastic during `"train"`, `"predict"`, both, or not at all `character(0)`. -#' Defaults to `character(0)` (deterministic). +#' Whether a [`PipeOp`] is stochastic during `"train"`, `"predict"`, or not at all: `character(0)`. +#' Defaults to `character(0)` (deterministic). Stochastic [`PipeOp`]s are not cached during the +#' respective phase. #' #' #' @section Methods: @@ -311,6 +319,12 @@ PipeOp = R6Class("PipeOp", private$.cache } }, + cache_state = function(val) { + if (!missing(val)) { + stop("cache_state is read-only!") + } + private$.cache_state + }, stochastic = function(val) { if (!missing(val)) { assert_subset(val, c("train", "predict")) @@ -318,12 +332,6 @@ PipeOp = R6Class("PipeOp", } else { private$.stochastic } - }, - cache_state = function(val) { - if (!missing(val)) { - stop("cache_state is read-only!") - } - private$.cache_state } ), diff --git a/attic/caching.md b/attic/caching.md index 54c9f6477..1aa076c10 100644 --- a/attic/caching.md +++ b/attic/caching.md @@ -62,23 +62,54 @@ The call to `op[[fun]](input)` calls each `PipeOp's` "train" and "predict" fun. Note: This is a simplified version, see the actual implementation `cached_pipeop_eval` in `graph.R`. ``` - if (graph$cache && op$cache) { # caching can be enabled / disabled on graph and pipeop level - R.cache::evalWithMemoization( - {res_out = list(output = op[[fun]](input), state = op$state)}, - key = list(map_chr(input, get_hash), op$hash) # hash of input and hash of pipeop (latter includes param_vals) - ) - if (is.null(op$state) && fun == "train") op = res_out$state # write cached state - output = res_out$output - } else { - output = list(output = op[[fun]](input), state = op$state) +cached_pipeop_eval = function(self, op, fun, input) { + + if (self$cache && op$cache) { + cache_key = list(map_chr(input, get_hash), op$hash) + if (fun == "train") { + if (fun %nin% op$stochastic) { + # Two options: cache state (can predict on train set using state during train) + # Or: do not cache state () (if upper is not possible) + if (op$cache_state) { + R.cache::evalWithMemoization({ + op[[fun]](input) + state = op$state + }, key = cache_key) + # Set state if PipeOp was cached + if (is.null(op$state) && fun == "train") op$state = state + # We call "predict" on train inputs, this avoids storing the outputs + # during training on disk. This is only possible for some pipeops. + cached_pipeop_eval(self, op, "predict", input) + } else { + R.cache::evalWithMemoization({ + result = list(output = op[[fun]](input), state = op$state) + }, key = cache_key) + # Set state if PipeOp was cached + if (is.null(op$state) && fun == "train") op$state = result$state + return(result$output) + } + } + } else if (fun == "predict" && !op$cache_state) { + if (fun %nin% op$stochastic) { + R.cache::evalWithMemoization( + {output = op[[fun]](input)}, + key = cache_key) + return(output) + } + } } + # No caching fallback, anything where we do not run into conditions above + return(op[[fun]](input)) +} ``` where `get_hash` is: ``` get_hash = function(x) { - if (!is.null(x$hash)) return(x$hash) + hash = try(x$hash, silent = TRUE) + if (inherits(hash, "try-error")) digest(x, algo = "xxhash64") + return(hash) } ``` diff --git a/man/Graph.Rd b/man/Graph.Rd index 3fd235781..ed9c492be 100644 --- a/man/Graph.Rd +++ b/man/Graph.Rd @@ -68,6 +68,13 @@ Stores a checksum calculated on the \code{\link{Graph}} configuration, which inc Whether to store intermediate results in the \code{\link{PipeOp}}'s \verb{$.result} slot, mostly for debugging purposes. Default \code{FALSE}. \item \code{cache} :: \code{logical(1)} \cr Whether to cache individual \code{\link{PipeOp}}'s during "train" and "predict". Default \code{FALSE}. +Caching is performed using the \href{R.cache::R.cache}{\code{R.cache}} package. +Caching can be disabled globally using \code{getOption("R.cache.enabled", TRUE)}. +By default, files are cached in \code{R.cache::getCacheRootPath()}. +For more information on how to set the cache path or retrieve cached items please consider +the \href{R.cache::R.cache}{\code{R.cache}} documentation. +Caching can be fine-controlled for each \code{\link{PipeOp}} by adjusting individual \code{\link{PipeOp}}'s +\code{cache}, \code{cache_state} and \code{stochastic} fields. } } diff --git a/man/PipeOp.Rd b/man/PipeOp.Rd index 02bcd6b24..eef6737e5 100644 --- a/man/PipeOp.Rd +++ b/man/PipeOp.Rd @@ -122,10 +122,18 @@ are saved to this slot, exactly as they are returned by these functions. This is and done, if requested, by the \code{\link{Graph}} backend itself; it should \emph{not} be done explicitly by \code{private$.train()} or \code{private$.predict()}. \item \code{cache} :: \code{logical(1)} \cr Whether to cache the \code{\link{PipeOp}}'s state and or output during "train" and "predict". Defaults to \code{TRUE}. -A \code{\link{PipeOp}} can only be cached if it is deterministic. +See the \code{cache} field in \code{\link{Graph}} for more detailed information on caching as well as \code{cache_state} and +\code{stochastic} below. +A \code{\link{PipeOp}} is only cached if it is deterministic. +\item \code{cache_state} :: \code{logical(1)} \cr +Whether the \code{\link{PipeOp}}s behaviour during training is equal to behaviour during prediction +(other then setting a state). In this case, only the \code{\link{PipeOp}}s state is cached. +This avoids caching possibly large intermediate results. +Defaults to \code{TRUE}. \item \code{stochastic} :: \code{character} \cr -Whether a \code{\link{PipeOp}} is stochastic during \code{"train"}, \code{"predict"}, both, or not at all \code{character(0)}. -Defaults to \code{character(0)} (deterministic). +Whether a \code{\link{PipeOp}} is stochastic during \code{"train"}, \code{"predict"}, or not at all: \code{character(0)}. +Defaults to \code{character(0)} (deterministic). Stochastic \code{\link{PipeOp}}s are not cached during the +respective phase. } } diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index 969693a33..a31d146bc 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -1,92 +1,140 @@ context("Caching") -# test_that("Caching works for test hash pipeop", { - -# PipeOpTestHash = R6Class("PipeOpTestHash", -# inherit = PipeOp, -# public = list( -# initialize = function(id = "test.hash", param_set = ParamSet$new()) { -# super$initialize(id = id, param_set = param_set, -# input = data.table(name = "input", train = "*", predict = "*"), -# output = data.table(name = "output", train = "*", predict = "*") -# ) -# }), -# private = list( -# .train = function(inputs) { -# Sys.sleep(1) -# self$state = list() -# inputs -# }, -# .predict = function(inputs) { -# Sys.sleep(1) -# inputs -# } -# ) -# ) - -# # FIXME: -# # This could fail if load is very high, nonetheless I would keep it. - -# tsk = tsk("iris") -# po = PipeOpTestHash$new() - -# # Takes > 1 second -# R.cache::clearCache(prompt = FALSE) -# st = Sys.time() -# po$train(list(tsk)) -# expect_true(st < Sys.time() - 1) - -# # takes < 1 second -# st = Sys.time() -# po$train(list(tsk)) -# expect_true(st > Sys.time() - 1) - -# # Takes > 1 second -# st = Sys.time() -# po$predict(list(tsk)) -# expect_true(st < Sys.time() - 1) - -# # takes < 1 second -# st = Sys.time() -# po$predict(list(tsk)) -# expect_true(st > Sys.time() - 1) - -# }) - - -# test_that("Caching works for scale", { - -# old_hash = po$hash - -# po$train(list(tsk)) -# R.cache::saveCache(key = keya, "a") -# R.cache::loadCache(key = keya) -# R.cache::loadCache(key = keyb) - -# R.cache::findCache(key = list(map(list(tsk), "hash"), po$hash)) -# R.cache::findCache(key = list(map(list(tsk), "hash"), old_hash)) - -# po$train(list(tsk)) -# # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) - -# po = po("scale") -# tsk = tsk("zoo") -# po$train(list(tsk)) - -# po = po("scale") -# po$state - -# po = po("nop") -# print(po$hash) -# R.cache::clearCache(prompt = FALSE) -# # R.cache::findCache(list(map(list(tsk), "hash"), po_hash)) -# po$train(list(tsk)) -# po$train(list(tsk)) +test_that("Caching works for test hash pipeop", { + + PipeOpTestHash = R6Class("PipeOpTestHash", + inherit = PipeOp, + public = list( + initialize = function(id = "test.hash", param_set = ParamSet$new()) { + super$initialize(id = id, param_set = param_set, + input = data.table(name = "input", train = "*", predict = "*"), + output = data.table(name = "output", train = "*", predict = "*") + ) + }), + active = list( + cache_state = function(val) { + if (missing(val)) + return(private$.cache_state) + private$.cache_state = val + } + ), + private = list( + .train = function(inputs) { + Sys.sleep(1) + message("sleeping train") + self$state = list("train") + inputs + }, + .predict = function(inputs) { + if (inputs[[1]] == "predict") { + Sys.sleep(1) + message("sleeping predict") + } + inputs + }, + .cache = TRUE, + .cache_state = TRUE + ) + ) + + # caching is only enabled for graphs + gr = as_graph(PipeOpTestHash$new()) + gr$cache = TRUE + + # takes > 1 second + R.cache::clearCache(prompt = FALSE) + st = Sys.time() + expect_message(gr$train("train"), "sleeping train") + expect_true(st < Sys.time() - 1) + + # takes > 1 second + st = Sys.time() + expect_message(gr$predict("predict"), "sleeping predict") + expect_true(st < Sys.time() - 1) + + # cached train takes < 1 second + st = Sys.time() + expect_silent(gr$train("train")) + expect_true(gr$train("train") == "train") + expect_true(st > (Sys.time() - 1)) + + # uncached (cach_state) predict takes > 1 second + st = Sys.time() + expect_message(gr$predict("predict"), "sleeping predict") + expect_true(st < Sys.time() - 1) + + # Obtain result from cache: + key = list(map_chr(list(input = "train"), get_hash), PipeOpTestHash$new()$hash) + # R.cache appends the expression to the key before storing + expr = substitute({ + op[[fun]](input) + state = op$state + }) + key = c(list(expr = expr), key) + expect_equal(R.cache::loadCache(key)$results[[1]], "train") + + + # PO stochastic is respected----------------------- + po = PipeOpTestHash$new() + po$stochastic = c("train", "predict") + po$cache_state = FALSE + gr = as_graph(po) + gr$cache = TRUE + + # takes > 1 second + R.cache::clearCache(prompt = FALSE) + st = Sys.time() + expect_message(gr$train("train"), "sleeping train") + expect_true(st < Sys.time() - 1) + + # takes > 1 second + st = Sys.time() + expect_message(gr$predict("predict"), "sleeping predict") + expect_true(st < Sys.time() - 1) + # Nothing was cached: + expect_true(all(list.files(R.cache::getCacheRootPath()) == "README.txt")) + + + # PO cache = FALSE is respected ----------------------- + po = PipeOpTestHash$new() + po$cache = FALSE + gr = as_graph(po) + gr$cache = TRUE + + # takes > 1 second + R.cache::clearCache(prompt = FALSE) + st = Sys.time() + expect_message(gr$train("train"), "sleeping train") + expect_true(st < Sys.time() - 1) + expect_true(all(list.files(R.cache::getCacheRootPath()) == "README.txt")) - -# a = po("scale") -# a$hash -# a$param_set$values$center = FALSE -# a$hash - -# }) + + # PO cache_state = FALSE is respected------------------- + po = PipeOpTestHash$new() + po$cache_state = FALSE + gr = as_graph(po) + gr$cache = TRUE + + # takes > 1 second + R.cache::clearCache(prompt = FALSE) + st = Sys.time() + expect_message(gr$train("train"), "sleeping train") + expect_true(st < Sys.time() - 1) + + # takes > 1 second + st = Sys.time() + expect_message(gr$predict("predict"), "sleeping predict") + expect_true(st < Sys.time() - 1) + + # cached train takes < 1 second + st = Sys.time() + expect_silent(gr$train("train")) + expect_true(gr$train("train") == "train") + expect_true(st > (Sys.time() - 1)) + + # cached predict takes < 1 second + st = Sys.time() + expect_message(gr$predict("predict"), "sleeping predict") + expect_true(st > Sys.time() - 1) + expect_true(length(list.files(R.cache::getCacheRootPath())) == 3) +}) \ No newline at end of file From eef1240e4d88d1baca36fef9dccf477050d349a7 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 3 Apr 2020 23:19:27 +0200 Subject: [PATCH 15/26] Docs and finish up tests --- R/PipeOp.R | 2 +- man/PipeOp.Rd | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/R/PipeOp.R b/R/PipeOp.R index 361e43f6a..6dfd79b4c 100644 --- a/R/PipeOp.R +++ b/R/PipeOp.R @@ -117,7 +117,7 @@ #' and done, if requested, by the [`Graph`] backend itself; it should *not* be done explicitly by `private$.train()` or `private$.predict()`. #' * `cache` :: `logical(1)` \cr #' Whether to cache the [`PipeOp`]'s state and or output during "train" and "predict". Defaults to `TRUE`. -#' See the `cache` field in [`Graph`] for more detailed information on caching as well as `cache_state` and +#' See the `cache` field in [`Graph`] for more detailed information on caching, as well as `cache_state` and #' `stochastic` below. #' A [`PipeOp`] is only cached if it is deterministic. #' * `cache_state` :: `logical(1)` \cr diff --git a/man/PipeOp.Rd b/man/PipeOp.Rd index eef6737e5..cafd28b6f 100644 --- a/man/PipeOp.Rd +++ b/man/PipeOp.Rd @@ -122,7 +122,7 @@ are saved to this slot, exactly as they are returned by these functions. This is and done, if requested, by the \code{\link{Graph}} backend itself; it should \emph{not} be done explicitly by \code{private$.train()} or \code{private$.predict()}. \item \code{cache} :: \code{logical(1)} \cr Whether to cache the \code{\link{PipeOp}}'s state and or output during "train" and "predict". Defaults to \code{TRUE}. -See the \code{cache} field in \code{\link{Graph}} for more detailed information on caching as well as \code{cache_state} and +See the \code{cache} field in \code{\link{Graph}} for more detailed information on caching, as well as \code{cache_state} and \code{stochastic} below. A \code{\link{PipeOp}} is only cached if it is deterministic. \item \code{cache_state} :: \code{logical(1)} \cr From 639dc1145952ee72683d1e04f92b525da4492970 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 3 Apr 2020 23:22:24 +0200 Subject: [PATCH 16/26] Fix tests --- tests/testthat/test_caching.R | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index a31d146bc..e4fc62f03 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -1,6 +1,7 @@ context("Caching") test_that("Caching works for test hash pipeop", { + skip_on_cran() PipeOpTestHash = R6Class("PipeOpTestHash", inherit = PipeOp, @@ -134,7 +135,7 @@ test_that("Caching works for test hash pipeop", { # cached predict takes < 1 second st = Sys.time() - expect_message(gr$predict("predict"), "sleeping predict") + expect_silent(gr$predict("predict")) expect_true(st > Sys.time() - 1) expect_true(length(list.files(R.cache::getCacheRootPath())) == 3) }) \ No newline at end of file From 4b4b2bea7ca723fe251490b3410e5986c1427312 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 3 Apr 2020 23:36:26 +0200 Subject: [PATCH 17/26] fix up proxy --- R/PipeOpProxy.R | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/R/PipeOpProxy.R b/R/PipeOpProxy.R index 519903d0b..89612c33c 100644 --- a/R/PipeOpProxy.R +++ b/R/PipeOpProxy.R @@ -108,8 +108,7 @@ PipeOpProxy = R6Class("PipeOpProxy", active = list( cache = function(val) { if (!missing(val)) { - assert_flag(val) - self$param_set$values$content$cache = val + self$param_set$values$content$cache = assert_flag(val) } else { self$param_set$values$content$cache } @@ -122,6 +121,7 @@ PipeOpProxy = R6Class("PipeOpProxy", else self$param_set$values$content$stochastic = val } else { + if (inherits(self$param_set$values$content, "Graph")) return(character(0)) self$param_set$values$content$stochastic } }, @@ -129,6 +129,7 @@ PipeOpProxy = R6Class("PipeOpProxy", if (!missing(val)) { stop("cache_state is read-only!") } else { + if (inherits(self$param_set$values$content, "Graph")) return(TRUE) self$param_set$values$content$cache_state } } From fd147c43b8d596c7b8c10543f84e938484a13876 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Sat, 4 Apr 2020 01:00:40 +0200 Subject: [PATCH 18/26] polish docs --- R/Graph.R | 9 +++------ R/PipeOp.R | 8 +++----- man/Graph.Rd | 2 +- man/PipeOp.Rd | 2 +- tests/testthat/test_caching.R | 2 +- 5 files changed, 9 insertions(+), 14 deletions(-) diff --git a/R/Graph.R b/R/Graph.R index f9fd39f0b..9be3c1ce0 100644 --- a/R/Graph.R +++ b/R/Graph.R @@ -61,7 +61,7 @@ #' * `cache` :: `logical(1)` \cr #' Whether to cache individual [`PipeOp`]'s during "train" and "predict". Default `FALSE`. #' Caching is performed using the [`R.cache`](R.cache::R.cache) package. -#' Caching can be disabled globally using `getOption("R.cache.enabled", TRUE)`. +#' Caching can be disabled/enabled globally using `getOption("R.cache.enabled", TRUE)`. #' By default, files are cached in `R.cache::getCacheRootPath()`. #' For more information on how to set the cache path or retrieve cached items please consider #' the [`R.cache`](R.cache::R.cache) documentation. @@ -121,7 +121,6 @@ #' (`any`, `logical(1)`) -> `list` of `any` \cr #' Predict with the [`Graph`] by calling all the [`PipeOp`]'s `$train` methods. Input and output, as well as the function #' of the `single_input` argument, are analogous to `$train()`. -#' #' #' @name Graph #' @family mlr3pipelines backend related @@ -412,8 +411,7 @@ Graph = R6Class("Graph", }, cache = function(val) { if (!missing(val)) { - assert_flag(val) - private$.cache = val + private$.cache = assert_flag(val) } else { private$.cache } @@ -551,7 +549,6 @@ graph_reduce = function(self, input, fun, single_input) { names(input) = input_tbl$name output = cached_pipeop_eval(self, op, fun, input) - if (self$keep_results) { op$.result = output } @@ -662,6 +659,7 @@ cached_pipeop_eval = function(self, op, fun, input) { } } } else if (fun == "predict" && !op$cache_state) { + # during predict, only cache if cache_state is FALSE and op is not stochastic. if (fun %nin% op$stochastic) { R.cache::evalWithMemoization( {output = op[[fun]](input)}, @@ -680,4 +678,3 @@ get_hash = function(x) { hash = digest(x, algo = "xxhash64") hash } - diff --git a/R/PipeOp.R b/R/PipeOp.R index 6dfd79b4c..c7500978e 100644 --- a/R/PipeOp.R +++ b/R/PipeOp.R @@ -119,7 +119,6 @@ #' Whether to cache the [`PipeOp`]'s state and or output during "train" and "predict". Defaults to `TRUE`. #' See the `cache` field in [`Graph`] for more detailed information on caching, as well as `cache_state` and #' `stochastic` below. -#' A [`PipeOp`] is only cached if it is deterministic. #' * `cache_state` :: `logical(1)` \cr #' Whether the [`PipeOp`]s behaviour during training is equal to behaviour during prediction #' (other then setting a state). In this case, only the [`PipeOp`]s state is cached. @@ -129,6 +128,7 @@ #' Whether a [`PipeOp`] is stochastic during `"train"`, `"predict"`, or not at all: `character(0)`. #' Defaults to `character(0)` (deterministic). Stochastic [`PipeOp`]s are not cached during the #' respective phase. +#' A [`PipeOp`] is only cached if it is deterministic. #' #' #' @section Methods: @@ -313,8 +313,7 @@ PipeOp = R6Class("PipeOp", }, cache = function(val) { if (!missing(val)) { - assert_flag(val) - private$.cache = val + private$.cache = assert_flag(val) } else { private$.cache } @@ -327,8 +326,7 @@ PipeOp = R6Class("PipeOp", }, stochastic = function(val) { if (!missing(val)) { - assert_subset(val, c("train", "predict")) - private$.stochastic = val + private$.stochastic = assert_subset(val, c("train", "predict")) } else { private$.stochastic } diff --git a/man/Graph.Rd b/man/Graph.Rd index ed9c492be..07bdb33eb 100644 --- a/man/Graph.Rd +++ b/man/Graph.Rd @@ -69,7 +69,7 @@ Whether to store intermediate results in the \code{\link{PipeOp}}'s \verb{$.resu \item \code{cache} :: \code{logical(1)} \cr Whether to cache individual \code{\link{PipeOp}}'s during "train" and "predict". Default \code{FALSE}. Caching is performed using the \href{R.cache::R.cache}{\code{R.cache}} package. -Caching can be disabled globally using \code{getOption("R.cache.enabled", TRUE)}. +Caching can be disabled/enabled globally using \code{getOption("R.cache.enabled", TRUE)}. By default, files are cached in \code{R.cache::getCacheRootPath()}. For more information on how to set the cache path or retrieve cached items please consider the \href{R.cache::R.cache}{\code{R.cache}} documentation. diff --git a/man/PipeOp.Rd b/man/PipeOp.Rd index cafd28b6f..dd9502609 100644 --- a/man/PipeOp.Rd +++ b/man/PipeOp.Rd @@ -124,7 +124,6 @@ and done, if requested, by the \code{\link{Graph}} backend itself; it should \em Whether to cache the \code{\link{PipeOp}}'s state and or output during "train" and "predict". Defaults to \code{TRUE}. See the \code{cache} field in \code{\link{Graph}} for more detailed information on caching, as well as \code{cache_state} and \code{stochastic} below. -A \code{\link{PipeOp}} is only cached if it is deterministic. \item \code{cache_state} :: \code{logical(1)} \cr Whether the \code{\link{PipeOp}}s behaviour during training is equal to behaviour during prediction (other then setting a state). In this case, only the \code{\link{PipeOp}}s state is cached. @@ -134,6 +133,7 @@ Defaults to \code{TRUE}. Whether a \code{\link{PipeOp}} is stochastic during \code{"train"}, \code{"predict"}, or not at all: \code{character(0)}. Defaults to \code{character(0)} (deterministic). Stochastic \code{\link{PipeOp}}s are not cached during the respective phase. +A \code{\link{PipeOp}} is only cached if it is deterministic. } } diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index e4fc62f03..3e49c850b 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -138,4 +138,4 @@ test_that("Caching works for test hash pipeop", { expect_silent(gr$predict("predict")) expect_true(st > Sys.time() - 1) expect_true(length(list.files(R.cache::getCacheRootPath())) == 3) -}) \ No newline at end of file +}) From 12cae423e3d12783fcf1348c4df1f44a3b2c5645 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Sat, 4 Apr 2020 08:39:58 +0200 Subject: [PATCH 19/26] move R.cache to suggests, cache tests to tempfile --- DESCRIPTION | 4 ++-- R/Graph.R | 1 + tests/testthat/test_caching.R | 9 +++++++++ 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index f4af1ca4c..9f8474c2d 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -45,7 +45,6 @@ Imports: mlr3misc (>= 0.1.4), paradox, R6, - R.cache, withr Suggests: ggplot2, @@ -67,7 +66,8 @@ Suggests: fastICA, kernlab, smotefamily, - evaluate + evaluate, + R.cache VignetteBuilder: knitr Remotes: diff --git a/R/Graph.R b/R/Graph.R index 9be3c1ce0..e5f68400a 100644 --- a/R/Graph.R +++ b/R/Graph.R @@ -632,6 +632,7 @@ predict.Graph = function(object, newdata, ...) { cached_pipeop_eval = function(self, op, fun, input) { if (self$cache && op$cache) { + require_namespaces("R.cache") cache_key = list(map_chr(input, get_hash), op$hash) if (fun == "train") { if (fun %nin% op$stochastic) { diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index 3e49c850b..349b4791f 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -3,6 +3,15 @@ context("Caching") test_that("Caching works for test hash pipeop", { skip_on_cran() + # cache to tempdir + old_tmpdir = R.cache::getCacheRootPath() + test_tmpdir = tempdir() + R.cache::setCacheRootPath(test_tmpdir) + on.exit({ + R.cache::setCacheRootPath(old_tmpdir) + unlink(test_tmpdir, recursive = TRUE) + }) + PipeOpTestHash = R6Class("PipeOpTestHash", inherit = PipeOp, public = list( From f096252f6634ef3fa45b94d53fe228acdf563102 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Sat, 4 Apr 2020 09:29:25 +0200 Subject: [PATCH 20/26] no on exit --- tests/testthat/test_caching.R | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index 349b4791f..f196edec3 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -7,10 +7,6 @@ test_that("Caching works for test hash pipeop", { old_tmpdir = R.cache::getCacheRootPath() test_tmpdir = tempdir() R.cache::setCacheRootPath(test_tmpdir) - on.exit({ - R.cache::setCacheRootPath(old_tmpdir) - unlink(test_tmpdir, recursive = TRUE) - }) PipeOpTestHash = R6Class("PipeOpTestHash", inherit = PipeOp, @@ -147,4 +143,8 @@ test_that("Caching works for test hash pipeop", { expect_silent(gr$predict("predict")) expect_true(st > Sys.time() - 1) expect_true(length(list.files(R.cache::getCacheRootPath())) == 3) + + # Reset old cachepath + R.cache::setCacheRootPath(old_tmpdir) + unlink(test_tmpdir, recursive = TRUE) }) From b5f18f1dd1e3caff78358bf3ecd94a5981ba0e0e Mon Sep 17 00:00:00 2001 From: pfistfl Date: Sat, 4 Apr 2020 10:19:22 +0200 Subject: [PATCH 21/26] move to imports --- DESCRIPTION | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/DESCRIPTION b/DESCRIPTION index 9f8474c2d..f4af1ca4c 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -45,6 +45,7 @@ Imports: mlr3misc (>= 0.1.4), paradox, R6, + R.cache, withr Suggests: ggplot2, @@ -66,8 +67,7 @@ Suggests: fastICA, kernlab, smotefamily, - evaluate, - R.cache + evaluate VignetteBuilder: knitr Remotes: From 336748c0bc56e5c90ce08296467a27cee0b5f484 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Mon, 6 Apr 2020 11:42:46 +0200 Subject: [PATCH 22/26] Disable caching tests --- tests/testthat/test_caching.R | 292 +++++++++++++++++----------------- 1 file changed, 146 insertions(+), 146 deletions(-) diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index f196edec3..5a69f593f 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -1,150 +1,150 @@ context("Caching") -test_that("Caching works for test hash pipeop", { - skip_on_cran() - - # cache to tempdir - old_tmpdir = R.cache::getCacheRootPath() - test_tmpdir = tempdir() - R.cache::setCacheRootPath(test_tmpdir) - - PipeOpTestHash = R6Class("PipeOpTestHash", - inherit = PipeOp, - public = list( - initialize = function(id = "test.hash", param_set = ParamSet$new()) { - super$initialize(id = id, param_set = param_set, - input = data.table(name = "input", train = "*", predict = "*"), - output = data.table(name = "output", train = "*", predict = "*") - ) - }), - active = list( - cache_state = function(val) { - if (missing(val)) - return(private$.cache_state) - private$.cache_state = val - } - ), - private = list( - .train = function(inputs) { - Sys.sleep(1) - message("sleeping train") - self$state = list("train") - inputs - }, - .predict = function(inputs) { - if (inputs[[1]] == "predict") { - Sys.sleep(1) - message("sleeping predict") - } - inputs - }, - .cache = TRUE, - .cache_state = TRUE - ) - ) - - # caching is only enabled for graphs - gr = as_graph(PipeOpTestHash$new()) - gr$cache = TRUE - - # takes > 1 second - R.cache::clearCache(prompt = FALSE) - st = Sys.time() - expect_message(gr$train("train"), "sleeping train") - expect_true(st < Sys.time() - 1) - - # takes > 1 second - st = Sys.time() - expect_message(gr$predict("predict"), "sleeping predict") - expect_true(st < Sys.time() - 1) - - # cached train takes < 1 second - st = Sys.time() - expect_silent(gr$train("train")) - expect_true(gr$train("train") == "train") - expect_true(st > (Sys.time() - 1)) - - # uncached (cach_state) predict takes > 1 second - st = Sys.time() - expect_message(gr$predict("predict"), "sleeping predict") - expect_true(st < Sys.time() - 1) - - # Obtain result from cache: - key = list(map_chr(list(input = "train"), get_hash), PipeOpTestHash$new()$hash) - # R.cache appends the expression to the key before storing - expr = substitute({ - op[[fun]](input) - state = op$state - }) - key = c(list(expr = expr), key) - expect_equal(R.cache::loadCache(key)$results[[1]], "train") - - - # PO stochastic is respected----------------------- - po = PipeOpTestHash$new() - po$stochastic = c("train", "predict") - po$cache_state = FALSE - gr = as_graph(po) - gr$cache = TRUE - - # takes > 1 second - R.cache::clearCache(prompt = FALSE) - st = Sys.time() - expect_message(gr$train("train"), "sleeping train") - expect_true(st < Sys.time() - 1) - - # takes > 1 second - st = Sys.time() - expect_message(gr$predict("predict"), "sleeping predict") - expect_true(st < Sys.time() - 1) - # Nothing was cached: - expect_true(all(list.files(R.cache::getCacheRootPath()) == "README.txt")) - - - # PO cache = FALSE is respected ----------------------- - po = PipeOpTestHash$new() - po$cache = FALSE - gr = as_graph(po) - gr$cache = TRUE - - # takes > 1 second - R.cache::clearCache(prompt = FALSE) - st = Sys.time() - expect_message(gr$train("train"), "sleeping train") - expect_true(st < Sys.time() - 1) - expect_true(all(list.files(R.cache::getCacheRootPath()) == "README.txt")) +# test_that("Caching works for test hash pipeop", { +# skip_on_cran() + +# # cache to tempdir +# old_tmpdir = R.cache::getCacheRootPath() +# test_tmpdir = tempdir() +# R.cache::setCacheRootPath(test_tmpdir) + +# PipeOpTestHash = R6Class("PipeOpTestHash", +# inherit = PipeOp, +# public = list( +# initialize = function(id = "test.hash", param_set = ParamSet$new()) { +# super$initialize(id = id, param_set = param_set, +# input = data.table(name = "input", train = "*", predict = "*"), +# output = data.table(name = "output", train = "*", predict = "*") +# ) +# }), +# active = list( +# cache_state = function(val) { +# if (missing(val)) +# return(private$.cache_state) +# private$.cache_state = val +# } +# ), +# private = list( +# .train = function(inputs) { +# Sys.sleep(1) +# message("sleeping train") +# self$state = list("train") +# inputs +# }, +# .predict = function(inputs) { +# if (inputs[[1]] == "predict") { +# Sys.sleep(1) +# message("sleeping predict") +# } +# inputs +# }, +# .cache = TRUE, +# .cache_state = TRUE +# ) +# ) + +# # caching is only enabled for graphs +# gr = as_graph(PipeOpTestHash$new()) +# gr$cache = TRUE + +# # takes > 1 second +# R.cache::clearCache(prompt = FALSE) +# st = Sys.time() +# expect_message(gr$train("train"), "sleeping train") +# expect_true(st < Sys.time() - 1) + +# # takes > 1 second +# st = Sys.time() +# expect_message(gr$predict("predict"), "sleeping predict") +# expect_true(st < Sys.time() - 1) + +# # cached train takes < 1 second +# st = Sys.time() +# expect_silent(gr$train("train")) +# expect_true(gr$train("train") == "train") +# expect_true(st > (Sys.time() - 1)) + +# # uncached (cach_state) predict takes > 1 second +# st = Sys.time() +# expect_message(gr$predict("predict"), "sleeping predict") +# expect_true(st < Sys.time() - 1) + +# # Obtain result from cache: +# key = list(map_chr(list(input = "train"), get_hash), PipeOpTestHash$new()$hash) +# # R.cache appends the expression to the key before storing +# expr = substitute({ +# op[[fun]](input) +# state = op$state +# }) +# key = c(list(expr = expr), key) +# expect_equal(R.cache::loadCache(key)$results[[1]], "train") + + +# # PO stochastic is respected----------------------- +# po = PipeOpTestHash$new() +# po$stochastic = c("train", "predict") +# po$cache_state = FALSE +# gr = as_graph(po) +# gr$cache = TRUE + +# # takes > 1 second +# R.cache::clearCache(prompt = FALSE) +# st = Sys.time() +# expect_message(gr$train("train"), "sleeping train") +# expect_true(st < Sys.time() - 1) + +# # takes > 1 second +# st = Sys.time() +# expect_message(gr$predict("predict"), "sleeping predict") +# expect_true(st < Sys.time() - 1) +# # Nothing was cached: +# expect_true(all(list.files(R.cache::getCacheRootPath()) == "README.txt")) + + +# # PO cache = FALSE is respected ----------------------- +# po = PipeOpTestHash$new() +# po$cache = FALSE +# gr = as_graph(po) +# gr$cache = TRUE + +# # takes > 1 second +# R.cache::clearCache(prompt = FALSE) +# st = Sys.time() +# expect_message(gr$train("train"), "sleeping train") +# expect_true(st < Sys.time() - 1) +# expect_true(all(list.files(R.cache::getCacheRootPath()) == "README.txt")) - # PO cache_state = FALSE is respected------------------- - po = PipeOpTestHash$new() - po$cache_state = FALSE - gr = as_graph(po) - gr$cache = TRUE - - # takes > 1 second - R.cache::clearCache(prompt = FALSE) - st = Sys.time() - expect_message(gr$train("train"), "sleeping train") - expect_true(st < Sys.time() - 1) - - # takes > 1 second - st = Sys.time() - expect_message(gr$predict("predict"), "sleeping predict") - expect_true(st < Sys.time() - 1) - - # cached train takes < 1 second - st = Sys.time() - expect_silent(gr$train("train")) - expect_true(gr$train("train") == "train") - expect_true(st > (Sys.time() - 1)) - - # cached predict takes < 1 second - st = Sys.time() - expect_silent(gr$predict("predict")) - expect_true(st > Sys.time() - 1) - expect_true(length(list.files(R.cache::getCacheRootPath())) == 3) - - # Reset old cachepath - R.cache::setCacheRootPath(old_tmpdir) - unlink(test_tmpdir, recursive = TRUE) -}) +# # PO cache_state = FALSE is respected------------------- +# po = PipeOpTestHash$new() +# po$cache_state = FALSE +# gr = as_graph(po) +# gr$cache = TRUE + +# # takes > 1 second +# R.cache::clearCache(prompt = FALSE) +# st = Sys.time() +# expect_message(gr$train("train"), "sleeping train") +# expect_true(st < Sys.time() - 1) + +# # takes > 1 second +# st = Sys.time() +# expect_message(gr$predict("predict"), "sleeping predict") +# expect_true(st < Sys.time() - 1) + +# # cached train takes < 1 second +# st = Sys.time() +# expect_silent(gr$train("train")) +# expect_true(gr$train("train") == "train") +# expect_true(st > (Sys.time() - 1)) + +# # cached predict takes < 1 second +# st = Sys.time() +# expect_silent(gr$predict("predict")) +# expect_true(st > Sys.time() - 1) +# expect_true(length(list.files(R.cache::getCacheRootPath())) == 3) + +# # Reset old cachepath +# R.cache::setCacheRootPath(old_tmpdir) +# unlink(test_tmpdir, recursive = TRUE) +# }) From 2884fcd63f3a8a97ad5aa4a68ff367a3bb700da1 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Thu, 16 Apr 2020 17:33:15 +0200 Subject: [PATCH 23/26] re-enable tests --- tests/testthat/test_caching.R | 292 +++++++++++++++++----------------- 1 file changed, 146 insertions(+), 146 deletions(-) diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index 5a69f593f..25ccb982a 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -1,150 +1,150 @@ context("Caching") -# test_that("Caching works for test hash pipeop", { -# skip_on_cran() - -# # cache to tempdir -# old_tmpdir = R.cache::getCacheRootPath() -# test_tmpdir = tempdir() -# R.cache::setCacheRootPath(test_tmpdir) - -# PipeOpTestHash = R6Class("PipeOpTestHash", -# inherit = PipeOp, -# public = list( -# initialize = function(id = "test.hash", param_set = ParamSet$new()) { -# super$initialize(id = id, param_set = param_set, -# input = data.table(name = "input", train = "*", predict = "*"), -# output = data.table(name = "output", train = "*", predict = "*") -# ) -# }), -# active = list( -# cache_state = function(val) { -# if (missing(val)) -# return(private$.cache_state) -# private$.cache_state = val -# } -# ), -# private = list( -# .train = function(inputs) { -# Sys.sleep(1) -# message("sleeping train") -# self$state = list("train") -# inputs -# }, -# .predict = function(inputs) { -# if (inputs[[1]] == "predict") { -# Sys.sleep(1) -# message("sleeping predict") -# } -# inputs -# }, -# .cache = TRUE, -# .cache_state = TRUE -# ) -# ) - -# # caching is only enabled for graphs -# gr = as_graph(PipeOpTestHash$new()) -# gr$cache = TRUE - -# # takes > 1 second -# R.cache::clearCache(prompt = FALSE) -# st = Sys.time() -# expect_message(gr$train("train"), "sleeping train") -# expect_true(st < Sys.time() - 1) - -# # takes > 1 second -# st = Sys.time() -# expect_message(gr$predict("predict"), "sleeping predict") -# expect_true(st < Sys.time() - 1) - -# # cached train takes < 1 second -# st = Sys.time() -# expect_silent(gr$train("train")) -# expect_true(gr$train("train") == "train") -# expect_true(st > (Sys.time() - 1)) - -# # uncached (cach_state) predict takes > 1 second -# st = Sys.time() -# expect_message(gr$predict("predict"), "sleeping predict") -# expect_true(st < Sys.time() - 1) - -# # Obtain result from cache: -# key = list(map_chr(list(input = "train"), get_hash), PipeOpTestHash$new()$hash) -# # R.cache appends the expression to the key before storing -# expr = substitute({ -# op[[fun]](input) -# state = op$state -# }) -# key = c(list(expr = expr), key) -# expect_equal(R.cache::loadCache(key)$results[[1]], "train") - - -# # PO stochastic is respected----------------------- -# po = PipeOpTestHash$new() -# po$stochastic = c("train", "predict") -# po$cache_state = FALSE -# gr = as_graph(po) -# gr$cache = TRUE - -# # takes > 1 second -# R.cache::clearCache(prompt = FALSE) -# st = Sys.time() -# expect_message(gr$train("train"), "sleeping train") -# expect_true(st < Sys.time() - 1) - -# # takes > 1 second -# st = Sys.time() -# expect_message(gr$predict("predict"), "sleeping predict") -# expect_true(st < Sys.time() - 1) -# # Nothing was cached: -# expect_true(all(list.files(R.cache::getCacheRootPath()) == "README.txt")) - - -# # PO cache = FALSE is respected ----------------------- -# po = PipeOpTestHash$new() -# po$cache = FALSE -# gr = as_graph(po) -# gr$cache = TRUE - -# # takes > 1 second -# R.cache::clearCache(prompt = FALSE) -# st = Sys.time() -# expect_message(gr$train("train"), "sleeping train") -# expect_true(st < Sys.time() - 1) -# expect_true(all(list.files(R.cache::getCacheRootPath()) == "README.txt")) +test_that("Caching works for test hash pipeop", { + skip_on_cran() + + # cache to tempdir + # old_tmpdir = R.cache::getCacheRootPath() + # test_tmpdir = tempdir() + # R.cache::setCacheRootPath(test_tmpdir) + + PipeOpTestHash = R6Class("PipeOpTestHash", + inherit = PipeOp, + public = list( + initialize = function(id = "test.hash", param_set = ParamSet$new()) { + super$initialize(id = id, param_set = param_set, + input = data.table(name = "input", train = "*", predict = "*"), + output = data.table(name = "output", train = "*", predict = "*") + ) + }), + active = list( + cache_state = function(val) { + if (missing(val)) + return(private$.cache_state) + private$.cache_state = val + } + ), + private = list( + .train = function(inputs) { + Sys.sleep(1) + message("sleeping train") + self$state = list("train") + inputs + }, + .predict = function(inputs) { + if (inputs[[1]] == "predict") { + Sys.sleep(1) + message("sleeping predict") + } + inputs + }, + .cache = TRUE, + .cache_state = TRUE + ) + ) + + # caching is only enabled for graphs + gr = as_graph(PipeOpTestHash$new()) + gr$cache = TRUE + + # takes > 1 second + R.cache::clearCache(prompt = FALSE) + st = Sys.time() + expect_message(gr$train("train"), "sleeping train") + expect_true(st < Sys.time() - 1) + + # takes > 1 second + st = Sys.time() + expect_message(gr$predict("predict"), "sleeping predict") + expect_true(st < Sys.time() - 1) + + # cached train takes < 1 second + st = Sys.time() + expect_silent(gr$train("train")) + expect_true(gr$train("train") == "train") + expect_true(st > (Sys.time() - 1)) + + # uncached (cach_state) predict takes > 1 second + st = Sys.time() + expect_message(gr$predict("predict"), "sleeping predict") + expect_true(st < Sys.time() - 1) + + # Obtain result from cache: + key = list(map_chr(list(input = "train"), get_hash), PipeOpTestHash$new()$hash) + # R.cache appends the expression to the key before storing + expr = substitute({ + op[[fun]](input) + state = op$state + }) + key = c(list(expr = expr), key) + expect_equal(R.cache::loadCache(key)$results[[1]], "train") + + + # PO stochastic is respected----------------------- + po = PipeOpTestHash$new() + po$stochastic = c("train", "predict") + po$cache_state = FALSE + gr = as_graph(po) + gr$cache = TRUE + + # takes > 1 second + R.cache::clearCache(prompt = FALSE) + st = Sys.time() + expect_message(gr$train("train"), "sleeping train") + expect_true(st < Sys.time() - 1) + + # takes > 1 second + st = Sys.time() + expect_message(gr$predict("predict"), "sleeping predict") + expect_true(st < Sys.time() - 1) + # Nothing was cached: + expect_true(all(list.files(R.cache::getCacheRootPath()) == "README.txt")) + + + # PO cache = FALSE is respected ----------------------- + po = PipeOpTestHash$new() + po$cache = FALSE + gr = as_graph(po) + gr$cache = TRUE + + # takes > 1 second + R.cache::clearCache(prompt = FALSE) + st = Sys.time() + expect_message(gr$train("train"), "sleeping train") + expect_true(st < Sys.time() - 1) + expect_true(all(list.files(R.cache::getCacheRootPath()) == "README.txt")) -# # PO cache_state = FALSE is respected------------------- -# po = PipeOpTestHash$new() -# po$cache_state = FALSE -# gr = as_graph(po) -# gr$cache = TRUE - -# # takes > 1 second -# R.cache::clearCache(prompt = FALSE) -# st = Sys.time() -# expect_message(gr$train("train"), "sleeping train") -# expect_true(st < Sys.time() - 1) - -# # takes > 1 second -# st = Sys.time() -# expect_message(gr$predict("predict"), "sleeping predict") -# expect_true(st < Sys.time() - 1) - -# # cached train takes < 1 second -# st = Sys.time() -# expect_silent(gr$train("train")) -# expect_true(gr$train("train") == "train") -# expect_true(st > (Sys.time() - 1)) - -# # cached predict takes < 1 second -# st = Sys.time() -# expect_silent(gr$predict("predict")) -# expect_true(st > Sys.time() - 1) -# expect_true(length(list.files(R.cache::getCacheRootPath())) == 3) - -# # Reset old cachepath -# R.cache::setCacheRootPath(old_tmpdir) -# unlink(test_tmpdir, recursive = TRUE) -# }) + # PO cache_state = FALSE is respected------------------- + po = PipeOpTestHash$new() + po$cache_state = FALSE + gr = as_graph(po) + gr$cache = TRUE + + # takes > 1 second + R.cache::clearCache(prompt = FALSE) + st = Sys.time() + expect_message(gr$train("train"), "sleeping train") + expect_true(st < Sys.time() - 1) + + # takes > 1 second + st = Sys.time() + expect_message(gr$predict("predict"), "sleeping predict") + expect_true(st < Sys.time() - 1) + + # cached train takes < 1 second + st = Sys.time() + expect_silent(gr$train("train")) + expect_true(gr$train("train") == "train") + expect_true(st > (Sys.time() - 1)) + + # cached predict takes < 1 second + st = Sys.time() + expect_silent(gr$predict("predict")) + expect_true(st > Sys.time() - 1) + expect_true(length(list.files(R.cache::getCacheRootPath())) == 3) + + # # Reset old cachepath + # R.cache::setCacheRootPath(old_tmpdir) + # unlink(test_tmpdir, recursive = TRUE) +}) From d1c7d09a5ac58aa1c8515740d464a26d5c77ef4b Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 17 Apr 2020 11:00:09 +0200 Subject: [PATCH 24/26] improve comments, cache tests tempfile --- R/Graph.R | 28 +++++++++++++++++----------- tests/testthat/test_caching.R | 11 ++++++----- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/R/Graph.R b/R/Graph.R index 0c5c7d77a..48861c556 100644 --- a/R/Graph.R +++ b/R/Graph.R @@ -628,15 +628,19 @@ predict.Graph = function(object, newdata, ...) { } # Cached train/predict of a PipeOp. -# 1) Caching is only performed if graph and po have `cache = TRUE` -# 2) Additonally caching is only performed if 'fun' (train / predict) is not stochastic -# for a given PipeOp +# 1) Caching of a PipeOp only performed if graph and po have `cache = TRUE`, +# i.e both the Graph AND the PipeOp want to be cached. +# 2) Additonally caching is only performed if 'train' or 'predict' is not stochastic +# for a given PipeOp. This can be obtained from `.$stochastic` and can be set +# for each PipeOp. # 3) During training we have two options -# - Cache only state: +# Each PipeOp stores whether it wants to do I. or II. in `.$cache_state`. +# I. Cache only state: # This is possible if the train transform is the same as the predict transform # and predict is comparatively cheap (i.e. filters). -# - Cache state and output +# II. Cache state and output # (All other cases) + cached_pipeop_eval = function(self, op, fun, input) { if (self$cache && op$cache) { @@ -644,10 +648,11 @@ cached_pipeop_eval = function(self, op, fun, input) { cache_key = list(map_chr(input, get_hash), op$hash) if (fun == "train") { if (fun %nin% op$stochastic) { - # Two options: cache state (can predict on train set using state during train) - # Or: do not cache state () (if upper is not possible) + # Two options: + # I. cache state (can predict on train set using state during train) + # II. do not cache state () (if I. is not possible) if (op$cache_state) { - # only cache state + # only cache state (I.) R.cache::evalWithMemoization({ op[[fun]](input) state = op$state @@ -655,14 +660,15 @@ cached_pipeop_eval = function(self, op, fun, input) { # Set state if PipeOp was cached (and "train" was therefore not called) if (is.null(op$state) && fun == "train") op$state = state # We call "predict" on train inputs, this avoids storing the outputs - # during training on disk. This is only possible for pipeops where 'cache_state' is TRUE. + # during training on disk. + # This is only done for pipeops where 'cache_state' is TRUE. return(cached_pipeop_eval(self, op, "predict", input)) } else { - # Otherwise we cache state and input + # Otherwise we cache state and input (II.) R.cache::evalWithMemoization({ result = list(output = op[[fun]](input), state = op$state) }, key = cache_key) - # Set state if PipeOp was cached + # Set state if PipeOp was cached before (and thus no state was set) if (is.null(op$state) && fun == "train") op$state = result$state return(result$output) } diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index 25ccb982a..366c61fdf 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -2,11 +2,12 @@ context("Caching") test_that("Caching works for test hash pipeop", { skip_on_cran() + require("R.cache") # cache to tempdir - # old_tmpdir = R.cache::getCacheRootPath() - # test_tmpdir = tempdir() - # R.cache::setCacheRootPath(test_tmpdir) + old_tmpdir = R.cache::getCacheRootPath() + test_tmpdir = tempdir() + R.cache::setCacheRootPath(test_tmpdir) PipeOpTestHash = R6Class("PipeOpTestHash", inherit = PipeOp, @@ -145,6 +146,6 @@ test_that("Caching works for test hash pipeop", { expect_true(length(list.files(R.cache::getCacheRootPath())) == 3) # # Reset old cachepath - # R.cache::setCacheRootPath(old_tmpdir) - # unlink(test_tmpdir, recursive = TRUE) + R.cache::setCacheRootPath(old_tmpdir) + unlink(test_tmpdir, recursive = TRUE) }) From 2547fb01f845d65a3d51b67e84ac3afbbf3416e4 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Fri, 17 Apr 2020 11:17:33 +0200 Subject: [PATCH 25/26] fix caching tempdir issues --- tests/testthat/test_caching.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index 366c61fdf..bfc515bb7 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -6,7 +6,7 @@ test_that("Caching works for test hash pipeop", { # cache to tempdir old_tmpdir = R.cache::getCacheRootPath() - test_tmpdir = tempdir() + test_tmpdir = dir.create(paste0(tempdir(), "R.cache")) R.cache::setCacheRootPath(test_tmpdir) PipeOpTestHash = R6Class("PipeOpTestHash", @@ -147,5 +147,5 @@ test_that("Caching works for test hash pipeop", { # # Reset old cachepath R.cache::setCacheRootPath(old_tmpdir) - unlink(test_tmpdir, recursive = TRUE) + unlink(list.files(test_tmpdir), recursive = TRUE) }) From 2b752116ee533af7b93229f6582bb04723adc253 Mon Sep 17 00:00:00 2001 From: pfistfl Date: Sat, 18 Apr 2020 08:58:14 +0200 Subject: [PATCH 26/26] unlink testdir properly --- tests/testthat/test_caching.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/testthat/test_caching.R b/tests/testthat/test_caching.R index bfc515bb7..d5e7b266a 100644 --- a/tests/testthat/test_caching.R +++ b/tests/testthat/test_caching.R @@ -147,5 +147,5 @@ test_that("Caching works for test hash pipeop", { # # Reset old cachepath R.cache::setCacheRootPath(old_tmpdir) - unlink(list.files(test_tmpdir), recursive = TRUE) + unlink(test_tmpdir, recursive = TRUE) })