@@ -69,15 +69,12 @@ function Base.bind(t::Trajectory{<:Any,<:Any,<:AsyncInsertSampleRatioController}
6969 bind (t. controler. ch_out, task)
7070end
7171
72- # !!! by default we assume `x` is a complete example which contains all the traces
73- # When doing partial inserting, the result of undefined
74- function Base. push! (t:: Trajectory , x)
75- push! (t. container, x)
76- on_insert! (t. controller, 1 )
77- end
78-
7972Base. setindex! (t:: Trajectory , v, I... ) = setindex! (t. container, v, I... )
8073
74+ # ####
75+ # in
76+ # ####
77+
8178struct CallMsg
8279 f:: Any
8380 args:: Tuple
@@ -93,32 +90,33 @@ function Base.append!(t::Trajectory, x)
9390 on_insert! (t. controller, length (x))
9491end
9592
96- # !!! bypass the controller
97- sample (t:: Trajectory ) = sample (t. sampler, t. container)
93+ # !!! by default we assume `x` is a complete example which contains all the traces
94+ # When doing partial inserting, the result of undefined
95+ function Base. push! (t:: Trajectory , x)
96+ push! (t. container, x)
97+ on_insert! (t)
98+ end
9899
99- on_sample! (t:: Trajectory ) = on_sample! (t. controller)
100+ on_insert! (t:: Trajectory ) = on_insert! (t, 1 )
101+ on_insert! (t:: Trajectory , n:: Int ) = on_insert! (t. controller, n)
100102
101- function Base. take! (t:: Trajectory )
102- if on_sample! (t)
103- sample (t. sampler, t. container) |> t. transformer
104- else
105- nothing
106- end
107- end
103+ # ####
104+ # out
105+ # ####
108106
109- function Base. iterate (t:: Trajectory )
110- x = take! (t)
111- if isnothing (x)
112- nothing
113- else
114- x, true
115- end
116- end
107+ SampleGenerator (t:: Trajectory ) = SampleGenerator (t. sampler, t. container)
108+
109+ on_sample! (t:: Trajectory ) = on_sample! (t. controller)
110+ sample (t:: Trajectory ) = sample (t. sampler, t. container)
111+
112+ """
113+ Keep sampling batches from the trajectory until the trajectory is not ready to
114+ be sampled yet due to the `controller`.
115+ """
116+ iter (t:: Trajectory ) = Iterators. takewhile (_ -> on_sample! (t), Iterators. cycle (SampleGenerator (t)))
117117
118- Base. iterate (t:: Trajectory , state) = iterate (t)
118+ Base. iterate (t:: Trajectory , args... ) = iterate (iter (t), args... )
119+ Base. IteratorSize (t:: Trajectory ) = Base. IteratorSize (iter (t))
119120
120121Base. iterate (t:: Trajectory{<:Any,<:Any,<:AsyncInsertSampleRatioController} , args... ) = iterate (t. controller. ch_out, args... )
121- Base. take! (t:: Trajectory{<:Any,<:Any,<:AsyncInsertSampleRatioController} ) = take! (t. controller. ch_out)
122-
123- Base. IteratorSize (:: Trajectory{<:Any,<:Any,<:AsyncInsertSampleRatioController} ) = Base. IsInfinite ()
124- Base. IteratorSize (:: Trajectory ) = Base. SizeUnknown ()
122+ Base. IteratorSize (t:: Trajectory{<:Any,<:Any,<:AsyncInsertSampleRatioController} ) = Base. IteratorSize (t. controller. ch_out)
0 commit comments