@@ -4,6 +4,7 @@ struct streaming_compress_t {
44 ZSTD_CCtx * ctx ;
55 VALUE buf ;
66 size_t buf_size ;
7+ VALUE pending ; /* accumulate compressed bytes produced by write() */
78};
89
910static void
@@ -12,8 +13,10 @@ streaming_compress_mark(void *p)
1213 struct streaming_compress_t * sc = p ;
1314#ifdef HAVE_RB_GC_MARK_MOVABLE
1415 rb_gc_mark_movable (sc -> buf );
16+ rb_gc_mark_movable (sc -> pending );
1517#else
1618 rb_gc_mark (sc -> buf );
19+ rb_gc_mark (sc -> pending );
1720#endif
1821}
1922
@@ -40,6 +43,7 @@ streaming_compress_compact(void *p)
4043{
4144 struct streaming_compress_t * sc = p ;
4245 sc -> buf = rb_gc_location (sc -> buf );
46+ sc -> pending = rb_gc_location (sc -> pending );
4347}
4448#endif
4549
@@ -64,6 +68,7 @@ rb_streaming_compress_allocate(VALUE klass)
6468 sc -> ctx = NULL ;
6569 sc -> buf = Qnil ;
6670 sc -> buf_size = 0 ;
71+ sc -> pending = Qnil ;
6772 return obj ;
6873}
6974
@@ -86,6 +91,7 @@ rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
8691 sc -> ctx = ctx ;
8792 sc -> buf = rb_str_new (NULL , buffOutSize );
8893 sc -> buf_size = buffOutSize ;
94+ sc -> pending = rb_str_new (0 , 0 );
8995
9096 return obj ;
9197}
@@ -142,7 +148,6 @@ static VALUE
142148rb_streaming_compress_write (int argc , VALUE * argv , VALUE obj )
143149{
144150 size_t total = 0 ;
145- VALUE result = rb_str_new (0 , 0 );
146151 struct streaming_compress_t * sc ;
147152 TypedData_Get_Struct (obj , struct streaming_compress_t , & streaming_compress_type , sc );
148153 const char * output_data = RSTRING_PTR (sc -> buf );
@@ -160,6 +165,10 @@ rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
160165 if (ZSTD_isError (ret )) {
161166 rb_raise (rb_eRuntimeError , "compress error error code: %s" , ZSTD_getErrorName (ret ));
162167 }
168+ /* collect produced bytes */
169+ if (output .pos > 0 ) {
170+ rb_str_cat (sc -> pending , output .dst , output .pos );
171+ }
163172 total += RSTRING_LEN (str );
164173 }
165174 }
@@ -192,17 +201,23 @@ rb_streaming_compress_flush(VALUE obj)
192201{
193202 struct streaming_compress_t * sc ;
194203 TypedData_Get_Struct (obj , struct streaming_compress_t , & streaming_compress_type , sc );
195- VALUE result = no_compress (sc , ZSTD_e_flush );
196- return result ;
204+ VALUE drained = no_compress (sc , ZSTD_e_flush );
205+ rb_str_cat (sc -> pending , RSTRING_PTR (drained ), RSTRING_LEN (drained ));
206+ VALUE out = sc -> pending ;
207+ sc -> pending = rb_str_new (0 , 0 );
208+ return out ;
197209}
198210
199211static VALUE
200212rb_streaming_compress_finish (VALUE obj )
201213{
202214 struct streaming_compress_t * sc ;
203215 TypedData_Get_Struct (obj , struct streaming_compress_t , & streaming_compress_type , sc );
204- VALUE result = no_compress (sc , ZSTD_e_end );
205- return result ;
216+ VALUE drained = no_compress (sc , ZSTD_e_end );
217+ rb_str_cat (sc -> pending , RSTRING_PTR (drained ), RSTRING_LEN (drained ));
218+ VALUE out = sc -> pending ;
219+ sc -> pending = rb_str_new (0 , 0 );
220+ return out ;
206221}
207222
208223extern VALUE rb_mZstd , cStreamingCompress ;
0 commit comments