@@ -315,8 +315,95 @@ object Test {
315315 }
316316 }
317317
318+ /**
319+ *
320+ * @param stream
321+ * @tparam A
322+ * @return
323+ */
318324 private def makeLinear [A ](stream : StagedStream [A ]): StagedStream [A ] = {
319- ???
325+ stream match {
326+ case Linear (producer) => stream
327+ case Nested (producer, nestedf) => {
328+
329+ /** Helper function that orchestrates the handling of the function that represents an `advance: Unit => Unit`.
330+ * It reifies a nested stream as calls to `advance`. Advance encodes the step function of each nested stream.
331+ * It is used in the init of a producer of a nested stream. When an inner stream finishes, the
332+ * `currentAdvance` holds the function to the `advance` function of the earlier stream.
333+ * `makeAdvanceFunction`, for each nested stream, installs a new `advance` function that after
334+ * the stream finishes it will restore the earlier one.
335+ *
336+ * When `advance` is called the result is consumed in the continuation. Within this continuation
337+ * the resulting value should be saved in a variable.
338+ *
339+ * @param currentAdvance variable that holds a function that represents the stream at each level.
340+ * @param k the continuation that consumes a variable.
341+ * @return the quote of the orchestrated code that will be executed as
342+ */
343+ def makeAdvanceFunction [A ](currentAdvance : Var [Unit => Unit ], k : A => Expr [Unit ], stream : StagedStream [A ]): Expr [Unit ] = {
344+ stream match {
345+ case Linear (producer) =>
346+ producer.card match {
347+ case AtMost1 => producer.init(st => ' {
348+ if (~ producer.hasNext(st)) {
349+ ~ producer.step(st, k)
350+ }
351+ })
352+ case Many => producer.init(st => ' {
353+ val oldAdvance : Unit => Unit = ~ currentAdvance.get
354+ val newAdvance : Unit => Unit = { _ : Unit => {
355+ if (~ producer.hasNext(st)) {
356+ ~ producer.step(st, k)
357+ }
358+ else {
359+ ~ currentAdvance.update(' {oldAdvance})
360+ }
361+ ~ currentAdvance.update(' {newAdvance})
362+ newAdvance(_)
363+ }}
364+ })
365+ }
366+ case nested : Nested [A , bt] =>
367+ makeAdvanceFunction(currentAdvance, (a : bt) => makeAdvanceFunction(currentAdvance, k, nested.nestedf(a)), Linear (nested.producer))
368+ }
369+ }
370+
371+ val newProducer = new Producer [A ] {
372+ // _1: if the stream has ended,
373+ // _2: the current element,
374+ // _3: the step of the inner most steam
375+ type St = (Var [Boolean ], Var [A ], Var [Unit => Unit ])
376+ val card : Cardinality = Many
377+
378+ def init (k : St => Expr [Unit ]): Expr [Unit ] = {
379+ producer.init(st => ' {
380+ Var (' { (_ : Unit ) => ()}){ advf => {
381+ Var (' { true }) { hasNext => {
382+ // Var('{ null.asInstanceOf[A] }) { curr => {
383+ // def adv() = {
384+ // ~hasNext.update(producer.hasNext(st))
385+ // if(~hasNext.get) {
386+ // ~producer.step(st, el => '{
387+ // ~makeAdvanceFunction(advf, ((a: A) => curr.update('{a})), nestedf(el))
388+ // })
389+ // }
390+ // }
391+ // ~k((flag, current, advf))
392+ ???
393+ // }}
394+ }}
395+ }}
396+ })
397+ }
398+
399+ def step (st : St , k : A => Expr [Unit ]): Expr [Unit ] = ???
400+
401+ def hasNext (st : St ): Expr [Boolean ] = ???
402+ }
403+
404+ Linear (newProducer)
405+ }
406+ }
320407 }
321408
322409 private def pushLinear [A , B , C ](producer : Producer [A ], nestedProducer : Producer [B ], nestedf : (B => StagedStream [C ])): StagedStream [(A , C )] = {
0 commit comments