From 0954b56c34ccdf4b8d1711442610b8dff5f09625 Mon Sep 17 00:00:00 2001 From: Siddhant Chaudhary Date: Wed, 9 Aug 2023 17:08:58 +0530 Subject: [PATCH 1/3] Adding implementation of `first`. --- src/DTables.jl | 1 + src/table/dtable.jl | 23 +++++++++++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/DTables.jl b/src/DTables.jl index 59084dc..9c824e2 100644 --- a/src/DTables.jl +++ b/src/DTables.jl @@ -28,6 +28,7 @@ using Tables: materializer, partitioner, rows, + rowtable, schema, Schema diff --git a/src/table/dtable.jl b/src/table/dtable.jl index ce96fe7..149b3b6 100644 --- a/src/table/dtable.jl +++ b/src/table/dtable.jl @@ -4,7 +4,6 @@ const VTYPE = Vector{Union{Dagger.Chunk,Dagger.EagerThunk}} DTable Structure representing the distributed table based on Dagger. - The table is stored as a vector of `Chunk` structures which hold partitions of the table. That vector can also store `Dagger.EagerThunk` structures when an operation that modifies the underlying partitions was applied to it (currently only `filter`). @@ -17,7 +16,6 @@ end DTable(chunks::Vector, tabletype) = DTable(VTYPE(chunks), tabletype, nothing) DTable(chunks::Vector, tabletype, schema) = DTable(VTYPE(chunks), tabletype, schema) - """ DTable(table; tabletype=nothing) -> DTable @@ -258,6 +256,27 @@ function length(table::DTable) return sum(chunk_lengths(table)) end +function first(table::DTable, rows::UInt) + if nrow(table) == 0 + return table + end + + chunk_length = chunk_lengths(table)[1] + num_full_chunks = Int(floor(rows / chunk_length)) # number of required chunks + sink = materializer(table.tabletype) + if num_full_chunks * chunk_length == rows + required_chunks = table.chunks[1:num_full_chunks] + else + # take only the needed rows from extra chunk + needed_rows = rows - num_full_chunks * chunk_length + extra_chunk = table.chunks[num_full_chunks + 1] + extra_chunk_rows = rowtable(fetch(extra_chunk)) + new_chunk = Dagger.tochunk(sink(extra_chunk_rows[1:needed_rows])) + required_chunks = vcat(table.chunks[1:num_full_chunks], [new_chunk]) + end + return DTable(required_chunks, table.tabletype) +end + function columnnames_svector(d::DTable) colnames_tuple = determine_columnnames(d) return colnames_tuple !== nothing ? [sym for sym in colnames_tuple] : nothing From b2eab20b1bc2ef041539be224428e57203dbb18e Mon Sep 17 00:00:00 2001 From: Siddhant Chaudhary Date: Wed, 9 Aug 2023 17:14:46 +0530 Subject: [PATCH 2/3] Adding spaces. --- src/table/dtable.jl | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/table/dtable.jl b/src/table/dtable.jl index 149b3b6..28cc59a 100644 --- a/src/table/dtable.jl +++ b/src/table/dtable.jl @@ -4,6 +4,7 @@ const VTYPE = Vector{Union{Dagger.Chunk,Dagger.EagerThunk}} DTable Structure representing the distributed table based on Dagger. + The table is stored as a vector of `Chunk` structures which hold partitions of the table. That vector can also store `Dagger.EagerThunk` structures when an operation that modifies the underlying partitions was applied to it (currently only `filter`). @@ -16,6 +17,7 @@ end DTable(chunks::Vector, tabletype) = DTable(VTYPE(chunks), tabletype, nothing) DTable(chunks::Vector, tabletype, schema) = DTable(VTYPE(chunks), tabletype, schema) + """ DTable(table; tabletype=nothing) -> DTable From 063c6017967a3380c2718103b80a6075a6659fa2 Mon Sep 17 00:00:00 2001 From: Siddhant Chaudhary Date: Sat, 12 Aug 2023 02:30:42 +0530 Subject: [PATCH 3/3] Making the last chunk dtable an `EagerThunk`. --- src/table/dtable.jl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/table/dtable.jl b/src/table/dtable.jl index 28cc59a..29cc401 100644 --- a/src/table/dtable.jl +++ b/src/table/dtable.jl @@ -263,7 +263,7 @@ function first(table::DTable, rows::UInt) return table end - chunk_length = chunk_lengths(table)[1] + chunk_length = maximum(chunk_lengths(table)) num_full_chunks = Int(floor(rows / chunk_length)) # number of required chunks sink = materializer(table.tabletype) if num_full_chunks * chunk_length == rows @@ -272,8 +272,7 @@ function first(table::DTable, rows::UInt) # take only the needed rows from extra chunk needed_rows = rows - num_full_chunks * chunk_length extra_chunk = table.chunks[num_full_chunks + 1] - extra_chunk_rows = rowtable(fetch(extra_chunk)) - new_chunk = Dagger.tochunk(sink(extra_chunk_rows[1:needed_rows])) + new_chunk = Dagger.@spawn sink(rowtable(fetch(extra_chunk))[1:needed_rows]) required_chunks = vcat(table.chunks[1:num_full_chunks], [new_chunk]) end return DTable(required_chunks, table.tabletype)