@@ -13,9 +13,9 @@ static void
1313streaming_compress_mark (void * p )
1414{
1515 struct streaming_compress_t * sc = p ;
16+ rb_gc_mark ((VALUE )sc -> ctx );
1617 rb_gc_mark (sc -> buf );
1718 rb_gc_mark (sc -> buf_size );
18- rb_gc_mark (sc -> pos );
1919}
2020
2121static void
@@ -50,13 +50,22 @@ rb_streaming_compress_allocate(VALUE klass)
5050 sc -> ctx = NULL ;
5151 sc -> buf = Qnil ;
5252 sc -> buf_size = 0 ;
53- sc -> pos = 0 ;
5453 return obj ;
5554}
5655
5756static VALUE
5857rb_streaming_compress_initialize (int argc , VALUE * argv , VALUE obj )
5958{
59+ VALUE compression_level_value ;
60+ rb_scan_args (argc , argv , "01" , & compression_level_value );
61+
62+ int compression_level ;
63+ if (NIL_P (compression_level_value )) {
64+ compression_level = 0 ; // The default. See ZSTD_CLEVEL_DEFAULT in zstd_compress.c
65+ } else {
66+ compression_level = NUM2INT (compression_level_value );
67+ }
68+
6069 struct streaming_compress_t * sc ;
6170 TypedData_Get_Struct (obj , struct streaming_compress_t , & streaming_compress_type , sc );
6271 size_t const buffOutSize = ZSTD_CStreamOutSize ();
@@ -65,14 +74,38 @@ rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
6574 if (ctx == NULL ) {
6675 rb_raise (rb_eRuntimeError , "%s" , "ZSTD_createCCtx error" );
6776 }
68- ZSTD_CCtx_setParameter (ctx , ZSTD_c_compressionLevel , 1 );
77+ ZSTD_CCtx_setParameter (ctx , ZSTD_c_compressionLevel , compression_level );
6978 sc -> ctx = ctx ;
7079 sc -> buf = rb_str_new (NULL , buffOutSize );
7180 sc -> buf_size = buffOutSize ;
7281
7382 return obj ;
7483}
7584
85+ #define FIXNUMARG (val , ifnil ) \
86+ (NIL_P((val)) ? (ifnil) \
87+ : (FIX2INT((val))))
88+ #define ARG_CONTINUE (val ) FIXNUMARG((val), ZSTD_e_continue)
89+
90+ static VALUE
91+ no_compress (struct streaming_compress_t * sc , ZSTD_EndDirective endOp )
92+ {
93+ ZSTD_inBuffer input = { NULL , 0 , 0 };
94+ const char * output_data = RSTRING_PTR (sc -> buf );
95+ VALUE result = rb_str_new (0 , 0 );
96+ size_t ret ;
97+ do {
98+ ZSTD_outBuffer output = { (void * )output_data , sc -> buf_size , 0 };
99+
100+ size_t const ret = ZSTD_compressStream2 (sc -> ctx , & output , & input , endOp );
101+ if (ZSTD_isError (ret )) {
102+ rb_raise (rb_eRuntimeError , "flush error error code: %s" , ZSTD_getErrorName (ret ));
103+ }
104+ rb_str_cat (result , output .dst , output .pos );
105+ } while (ret > 0 );
106+ return result ;
107+ }
108+
76109static VALUE
77110rb_streaming_compress_compress (VALUE obj , VALUE src )
78111{
@@ -84,33 +117,58 @@ rb_streaming_compress_compress(VALUE obj, VALUE src)
84117 struct streaming_compress_t * sc ;
85118 TypedData_Get_Struct (obj , struct streaming_compress_t , & streaming_compress_type , sc );
86119 const char * output_data = RSTRING_PTR (sc -> buf );
87- ZSTD_outBuffer output = { (void * )output_data , sc -> buf_size , 0 };
88-
89- size_t const result = ZSTD_compressStream2 (sc -> ctx , & output , & input , ZSTD_e_continue );
90- if (ZSTD_isError (result )) {
91- rb_raise (rb_eRuntimeError , "compress error error code: %s" , ZSTD_getErrorName (result ));
120+ VALUE result = rb_str_new (0 , 0 );
121+ while (input .pos < input .size ) {
122+ ZSTD_outBuffer output = { (void * )output_data , sc -> buf_size , 0 };
123+ size_t const ret = ZSTD_compressStream2 (sc -> ctx , & output , & input , ZSTD_e_continue );
124+ if (ZSTD_isError (ret )) {
125+ rb_raise (rb_eRuntimeError , "compress error error code: %s" , ZSTD_getErrorName (ret ));
126+ }
127+ rb_str_cat (result , output .dst , output .pos );
92128 }
93- sc -> pos += output .pos ;
94- return obj ;
129+ return result ;
95130}
96131
97132static VALUE
98- rb_streaming_compress_finish (VALUE obj )
133+ rb_streaming_compress_addstr (VALUE obj , VALUE src )
99134{
100- ZSTD_inBuffer input = { NULL , 0 , 0 };
135+ StringValue (src );
136+ const char * input_data = RSTRING_PTR (src );
137+ size_t input_size = RSTRING_LEN (src );
138+ ZSTD_inBuffer input = { input_data , input_size , 0 };
101139
102140 struct streaming_compress_t * sc ;
103141 TypedData_Get_Struct (obj , struct streaming_compress_t , & streaming_compress_type , sc );
104142 const char * output_data = RSTRING_PTR (sc -> buf );
105- ZSTD_outBuffer output = { (void * )output_data , sc -> buf_size , 0 };
106143
107- size_t const result = ZSTD_compressStream2 (sc -> ctx , & output , & input , ZSTD_e_end );
108- if (ZSTD_isError (result )) {
109- rb_raise (rb_eRuntimeError , "finish error error code: %s" , ZSTD_getErrorName (result ));
144+ while (input .pos < input .size ) {
145+ ZSTD_outBuffer output = { (void * )output_data , sc -> buf_size , 0 };
146+ size_t const result = ZSTD_compressStream2 (sc -> ctx , & output , & input , ZSTD_e_continue );
147+ if (ZSTD_isError (result )) {
148+ rb_raise (rb_eRuntimeError , "compress error error code: %s" , ZSTD_getErrorName (result ));
149+ }
110150 }
111- sc -> pos += output .pos ;
112- rb_str_resize (sc -> buf , sc -> pos );
113- return sc -> buf ;
151+ return obj ;
152+ }
153+
154+
155+ static VALUE
156+ rb_streaming_compress_flush (VALUE obj )
157+ {
158+ struct streaming_compress_t * sc ;
159+ TypedData_Get_Struct (obj , struct streaming_compress_t , & streaming_compress_type , sc );
160+ VALUE result = no_compress (sc , ZSTD_e_flush );
161+ return result ;
162+ }
163+
164+
165+ static VALUE
166+ rb_streaming_compress_finish (VALUE obj )
167+ {
168+ struct streaming_compress_t * sc ;
169+ TypedData_Get_Struct (obj , struct streaming_compress_t , & streaming_compress_type , sc );
170+ VALUE result = no_compress (sc , ZSTD_e_end );
171+ return result ;
114172}
115173
116174extern VALUE rb_mZstd , cStreamingCompress ;
@@ -120,7 +178,13 @@ zstd_ruby_streaming_compress_init(void)
120178 VALUE cStreamingCompress = rb_define_class_under (rb_mZstd , "StreamingCompress" , rb_cObject );
121179 rb_define_alloc_func (cStreamingCompress , rb_streaming_compress_allocate );
122180 rb_define_method (cStreamingCompress , "initialize" , rb_streaming_compress_initialize , -1 );
123- rb_define_method (cStreamingCompress , "<<" , rb_streaming_compress_compress , 1 );
181+ rb_define_method (cStreamingCompress , "compress" , rb_streaming_compress_compress , 1 );
182+ rb_define_method (cStreamingCompress , "<<" , rb_streaming_compress_addstr , 1 );
183+ rb_define_method (cStreamingCompress , "flush" , rb_streaming_compress_flush , 0 );
124184 rb_define_method (cStreamingCompress , "finish" , rb_streaming_compress_finish , 0 );
185+
186+ rb_define_const (cStreamingCompress , "CONTINUE" , INT2FIX (ZSTD_e_continue ));
187+ rb_define_const (cStreamingCompress , "FLUSH" , INT2FIX (ZSTD_e_flush ));
188+ rb_define_const (cStreamingCompress , "END" , INT2FIX (ZSTD_e_end ));
125189}
126190
0 commit comments