Skip to content

Commit bdd0e54

Browse files
authored
Fix case when ipc stream has no record batches, only schema (#175)
* Fix case when ipc stream has no record batches, only schema. Fixes #158. While the Julia implementation currently doesn't provide a way to avoid writing any record batches, the pyarrow implementation has more fine-grained control over writing and allows closing an ipc stream without writing any record batches. In that case, on the Julia side when reading, we just need to check for this case specifically and, if so, populate some empty columns, since we currently rely on the columns being populated when record batches are read. * fix metadata
1 parent c5c77e6 commit bdd0e54

File tree

2 files changed

+24
-1
lines changed

2 files changed

+24
-1
lines changed

src/table.jl

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
217217
i += 1
218218
end
219219
end
220+
anyrecordbatches = false
220221
for batch in BatchIterator(bytes, off)
221222
# store custom_metadata of batch.msg?
222223
header = batch.msg.header
@@ -258,6 +259,7 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
258259
dictencodings[id] = DictEncoding{eltype(A), S, typeof(A)}(id, A, field.dictionary.isOrdered, values.metadata)
259260
@debug 1 "parsed dictionary batch message: id=$id, data=$values\n"
260261
elseif header isa Meta.RecordBatch
262+
anyrecordbatches = true
261263
@debug 1 "parsing record batch message: compression = $(header.compression)"
262264
put!(tsks, Threads.@spawn begin
263265
collect(VectorIterator(sch, batch, dictencodings, convert))
@@ -270,13 +272,20 @@ function Table(bytes::Vector{UInt8}, off::Integer=1, tlen::Union{Integer, Nothin
270272
wait(tsk)
271273
lu = lookup(t)
272274
ty = types(t)
275+
# 158; some implementations may send 0 record batches
276+
if !anyrecordbatches
277+
for field in sch.fields
278+
T = juliaeltype(field, buildmetadata(field), convert)
279+
push!(columns(t), T[])
280+
end
281+
end
273282
for (nm, col) in zip(names(t), columns(t))
274283
lu[nm] = col
275284
push!(ty, eltype(col))
276285
end
277286
meta = sch !== nothing ? sch.custom_metadata : nothing
278287
if meta !== nothing
279-
getfield(t, :metadata)[] = Dict(x.key=>x.value for x in meta)
288+
getfield(t, :metadata)[] = buildmetadata(meta)
280289
end
281290
return t
282291
end
@@ -337,6 +346,7 @@ end
337346
# Metadata for a field is built from the field's `custom_metadata`.
function buildmetadata(f::Meta.Field)
    return buildmetadata(f.custom_metadata)
end
338347
# Build a Dict from an iterable whose elements expose `key` and `value`
# properties; both are passed through `String` before being stored.
function buildmetadata(meta)
    return Dict(String(entry.key) => String(entry.value) for entry in meta)
end
339348
# No metadata in, no metadata out: `nothing` maps to `nothing`.
function buildmetadata(::Nothing)
    return nothing
end
349+
# Metadata already held as a `Dict{String, String}` is returned as-is
# (the same object, not a copy).
function buildmetadata(x::Dict{String, String})
    return x
end
340350

341351
function Base.iterate(x::VectorIterator, (columnidx, nodeidx, bufferidx)=(Int64(1), Int64(1), Int64(1)))
342352
columnidx > length(x.schema.fields) && return nothing

test/runtests.jl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,19 @@ c = Arrow.ToTimestamp(x)
295295
@test eltype(c) == Arrow.Timestamp{Arrow.Flatbuf.TimeUnitModule.MILLISECOND, Symbol("Europe/Paris")}
296296
@test c[1] == Arrow.Timestamp{Arrow.Flatbuf.TimeUnitModule.MILLISECOND, Symbol("Europe/Paris")}(1577836800000)
297297

298+
# 158
299+
# arrow ipc stream generated from pyarrow with no record batches
300+
bytes = UInt8[0xff, 0xff, 0xff, 0xff, 0x78, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x0c, 0x00,
301+
0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x01, 0x04, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00,
302+
0x08, 0x00, 0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x14, 0x00,
303+
0x00, 0x00, 0x10, 0x00, 0x14, 0x00, 0x08, 0x00, 0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x10, 0x00, 0x10, 0x00,
304+
0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x10, 0x00, 0x00, 0x00, 0x1c, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00,
305+
0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x61, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00, 0x08, 0x00,
306+
0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x00]
307+
tbl = Arrow.Table(bytes)
308+
@test length(tbl.a) == 0
309+
@test eltype(tbl.a) == Union{Int64, Missing}
310+
298311
end # @testset "misc"
299312

300313
end

0 commit comments

Comments
 (0)