Skip to content

Commit a5f5289

Browse files
authored
Add pledgeinsize (#64)
* Add pledgeinsize * Add tests for pledgeinsize * test errors * Add coverage for upstream tests * test unknown pledgeinsize * fix tests for 32 bit * remove dead code * bump compat * remove unrelated changes * check if `pledgeinsize` is defined * remove unrelated changes * fix tests
1 parent eef39ab commit a5f5289

File tree

3 files changed

+103
-20
lines changed

3 files changed

+103
-20
lines changed

src/compression.jl

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -180,15 +180,30 @@ function TranscodingStreams.startproc(codec::ZstdCompressor, mode::Symbol, err::
180180
end
181181
# TODO Allow setting other parameters here.
182182
end
183-
code = reset!(codec.cstream, 0 #=unknown source size=#)
184-
if iserror(code)
185-
# This is unreachable according to zstd.h
186-
err[] = ErrorException("zstd error resetting context.")
187-
return :error
188-
end
183+
reset!(codec.cstream)
189184
return :ok
190185
end
191186

187+
@static if isdefined(TranscodingStreams, :pledgeinsize) # Defined in v0.11.3
188+
function TranscodingStreams.pledgeinsize(codec::ZstdCompressor, insize::Int64, err::Error)::Symbol
189+
if codec.cstream.ptr == C_NULL
190+
error("`startproc` must be called before `pledgeinsize`")
191+
end
192+
srcsize = if signbit(insize)
193+
ZSTD_CONTENTSIZE_UNKNOWN
194+
else
195+
Culonglong(insize)
196+
end
197+
code = LibZstd.ZSTD_CCtx_setPledgedSrcSize(codec.cstream, srcsize)
198+
if iserror(code)
199+
err[] = ErrorException("zstd error setting pledged source size")
200+
:error
201+
else
202+
:ok
203+
end
204+
end
205+
end
206+
192207
function TranscodingStreams.process(codec::ZstdCompressor, input::Memory, output::Memory, err::Error)
193208
if codec.cstream.ptr == C_NULL
194209
error("startproc must be called before process")

src/libzstd.jl

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -60,23 +60,17 @@ Base.unsafe_convert(::Type{Ptr{LibZstd.ZSTD_CCtx}}, cstream::CStream) = cstream.
6060
Base.unsafe_convert(::Type{Ptr{InBuffer}}, cstream::CStream) = Base.unsafe_convert(Ptr{InBuffer}, cstream.ibuffer)
6161
Base.unsafe_convert(::Type{Ptr{OutBuffer}}, cstream::CStream) = Base.unsafe_convert(Ptr{OutBuffer}, cstream.obuffer)
6262

63-
function reset!(cstream::CStream, srcsize::Integer)
63+
function reset!(cstream::CStream)
6464
# ZSTD_resetCStream is deprecated
6565
# https://github.com/facebook/zstd/blob/9d2a45a705e22ad4817b41442949cd0f78597154/lib/zstd.h#L2253-L2272
6666
res = LibZstd.ZSTD_CCtx_reset(cstream, LibZstd.ZSTD_reset_session_only)
67-
if iserror(res)
68-
return res
69-
end
70-
if srcsize == 0
71-
# From zstd.h:
72-
# Note: ZSTD_resetCStream() interprets pledgedSrcSize == 0 as ZSTD_CONTENTSIZE_UNKNOWN, but
73-
# ZSTD_CCtx_setPledgedSrcSize() does not do the same, so ZSTD_CONTENTSIZE_UNKNOWN must be
74-
# explicitly specified.
75-
srcsize = ZSTD_CONTENTSIZE_UNKNOWN
76-
end
7767
reset!(cstream.ibuffer)
7868
reset!(cstream.obuffer)
79-
return LibZstd.ZSTD_CCtx_setPledgedSrcSize(cstream, srcsize)
69+
if iserror(res)
70+
# According to zstd.h "Resetting session never fails" so this branch should be unreachable.
71+
error("unreachable")
72+
end
73+
return
8074
end
8175

8276
"""

test/runtests.jl

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,17 @@ include("utils.jl")
130130
@test CodecZstd.find_decompressed_size(v) == 22
131131

132132
codec = ZstdCompressor
133-
buffer3 = transcode(codec, b"Hello")
134-
buffer4 = transcode(codec, b"World!")
133+
sink = IOBuffer()
134+
s = TranscodingStream(codec(), sink; stop_on_end=true)
135+
write(s, b"Hello")
136+
close(s)
137+
buffer3 = take!(sink)
135138
@test CodecZstd.find_decompressed_size(buffer3) == CodecZstd.ZSTD_CONTENTSIZE_UNKNOWN
139+
sink = IOBuffer()
140+
s = TranscodingStream(codec(), sink; stop_on_end=true)
141+
write(s, b"Hello")
142+
close(s)
143+
buffer4 = take!(sink)
136144
@test CodecZstd.find_decompressed_size(buffer4) == CodecZstd.ZSTD_CONTENTSIZE_UNKNOWN
137145

138146
write(iob, buffer1)
@@ -156,6 +164,68 @@ include("utils.jl")
156164
@test CodecZstd.find_decompressed_size(v) == CodecZstd.ZSTD_CONTENTSIZE_ERROR
157165
end
158166

167+
if isdefined(TranscodingStreams, :pledgeinsize)
168+
@testset "pledgeinsize" begin
169+
# when pledgeinsize is available transcode should save the
170+
# decompressed size in a header
171+
for n in [0:30; 1000; 1000000;]
172+
v = transcode(ZstdCompressor, rand(UInt8, n))
173+
@test CodecZstd.find_decompressed_size(v) == n
174+
end
175+
176+
# Test what happens if pledgeinsize promise is broken
177+
d1 = zeros(UInt8, 10000)
178+
d2 = zeros(UInt8, 10000)
179+
GC.@preserve d1 d2 begin
180+
@testset "too many bytes" begin
181+
m1 = TranscodingStreams.Memory(pointer(d1), 1000)
182+
m2 = TranscodingStreams.Memory(pointer(d2), 1000)
183+
codec = ZstdCompressor()
184+
e = TranscodingStreams.Error()
185+
@test TranscodingStreams.startproc(codec, :read, e) === :ok
186+
@test TranscodingStreams.pledgeinsize(codec, Int64(10), e) === :ok
187+
@test TranscodingStreams.process(codec, m1, m2, e) === (0, 0, :error)
188+
@test e[] == ErrorException("zstd compression error: Src size is incorrect")
189+
TranscodingStreams.finalize(codec)
190+
end
191+
@testset "too few bytes" begin
192+
m1 = TranscodingStreams.Memory(pointer(d1), 10)
193+
m2 = TranscodingStreams.Memory(pointer(d2), 1000)
194+
codec = ZstdCompressor()
195+
e = TranscodingStreams.Error()
196+
@test TranscodingStreams.startproc(codec, :read, e) === :ok
197+
@test TranscodingStreams.pledgeinsize(codec, Int64(10000), e) === :ok
198+
@test TranscodingStreams.process(codec, m1, m2, e)[3] === :ok
199+
m1 = TranscodingStreams.Memory(pointer(d1), 0)
200+
@test TranscodingStreams.process(codec, m1, m2, e)[3] === :error
201+
@test e[] == ErrorException("zstd compression error: Src size is incorrect")
202+
TranscodingStreams.finalize(codec)
203+
end
204+
@testset "set pledgeinsize after process" begin
205+
m1 = TranscodingStreams.Memory(pointer(d1), 1000)
206+
m2 = TranscodingStreams.Memory(pointer(d2), 1000)
207+
codec = ZstdCompressor()
208+
e = TranscodingStreams.Error()
209+
@test TranscodingStreams.startproc(codec, :read, e) === :ok
210+
@test TranscodingStreams.process(codec, m1, m2, e)[3] === :ok
211+
@test TranscodingStreams.pledgeinsize(codec, Int64(10000), e) === :error
212+
@test e[] == ErrorException("zstd error setting pledged source size")
213+
TranscodingStreams.finalize(codec)
214+
end
215+
@testset "set unknown pledgeinsize" begin
216+
m1 = TranscodingStreams.Memory(pointer(d1), 1000)
217+
m2 = TranscodingStreams.Memory(pointer(d2), 1000)
218+
codec = ZstdCompressor()
219+
e = TranscodingStreams.Error()
220+
@test TranscodingStreams.startproc(codec, :read, e) === :ok
221+
@test TranscodingStreams.pledgeinsize(codec, Int64(-1), e) === :ok
222+
@test TranscodingStreams.process(codec, m1, m2, e)[3] === :ok
223+
TranscodingStreams.finalize(codec)
224+
end
225+
end
226+
end
227+
end
228+
159229
include("compress_endOp.jl")
160230
include("static_only_tests.jl")
161231

@@ -195,6 +265,10 @@ include("utils.jl")
195265
TranscodingStreams.finalize(codec)
196266
data = [0x00,0x01]
197267
GC.@preserve data let m = TranscodingStreams.Memory(pointer(data), length(data))
268+
try
269+
TranscodingStreams.pledgeinsize(codec, Int64(10), TranscodingStreams.Error())
270+
catch
271+
end
198272
try
199273
TranscodingStreams.expectedsize(codec, m)
200274
catch

0 commit comments

Comments
 (0)