5151 * chunk-ext = *( ";" chunk-ext-name [ "=" chunk-ext-val ] )
5252 * chunk-ext-name = token
5353 * chunk-ext-val = token / quoted-string
54+ *
55+ * trailer-part = *( header-field CRLF )
5456 * </pre>
5557 *
5658 * @see ChunkedEncodedInputStream
@@ -60,9 +62,12 @@ public class ChunkedEncodedPublisher implements Publisher<ByteBuffer> {
6062 private static final byte [] CRLF = {'\r' , '\n' };
6163 private static final byte SEMICOLON = ';' ;
6264 private static final byte EQUALS = '=' ;
65+ private static final byte COLON = ':' ;
66+ private static final byte COMMA = ',' ;
6367
6468 private final Publisher <ByteBuffer > wrapped ;
6569 private final List <ChunkExtensionProvider > extensions = new ArrayList <>();
70+ private final List <TrailerProvider > trailers = new ArrayList <>();
6671 private final int chunkSize ;
6772 private ByteBuffer chunkBuffer ;
6873 private final boolean addEmptyTrailingChunk ;
@@ -71,6 +76,7 @@ public ChunkedEncodedPublisher(Builder b) {
7176 this .wrapped = b .publisher ;
7277 this .chunkSize = b .chunkSize ;
7378 this .extensions .addAll (b .extensions );
79+ this .trailers .addAll (b .trailers );
7480 this .addEmptyTrailingChunk = b .addEmptyTrailingChunk ;
7581 }
7682
@@ -125,10 +131,9 @@ public Publisher<ByteBuffer> map(Publisher<ByteBuffer> upstream, Function<? supe
125131 return subscriber -> upstream .subscribe (MappingSubscriber .create (subscriber , mapper ));
126132 }
127133
128- // TODO: Trailing checksum
129134 private ByteBuffer encodeChunk (ByteBuffer byteBuffer ) {
130135 int contentLen = byteBuffer .remaining ();
131- byte [] chunkSizeHex = Integer .toHexString (contentLen ).getBytes (StandardCharsets .UTF_8 );
136+ byte [] chunkSizeHex = Integer .toHexString (byteBuffer . remaining () ).getBytes (StandardCharsets .UTF_8 );
132137
133138 List <Pair <byte [], byte []>> chunkExtensions = this .extensions .stream ()
134139 .map (e -> {
@@ -138,24 +143,54 @@ private ByteBuffer encodeChunk(ByteBuffer byteBuffer) {
138143
139144 int extensionsLength = calculateExtensionsLength (chunkExtensions );
140145
141- int encodedLen = chunkSizeHex .length + extensionsLength + CRLF .length + contentLen + CRLF .length ;
146+ boolean isTrailerChunk = contentLen == 0 ;
147+
148+ List <ByteBuffer > trailerData ;
149+ if (isTrailerChunk ) {
150+ trailerData = getTrailerData ();
151+ } else {
152+ trailerData = Collections .emptyList ();
153+ }
154+
155+ int trailerLen = trailerData .stream ()
156+ // + 2 for each CRLF that ends the header-field
157+ .mapToInt (t -> t .remaining () + 2 )
158+ .sum ();
159+
160+ int encodedLen = chunkSizeHex .length + extensionsLength + CRLF .length + contentLen + trailerLen + CRLF .length ;
161+
162+ if (isTrailerChunk ) {
163+ encodedLen += CRLF .length ;
164+ }
142165
143166 ByteBuffer encoded = ByteBuffer .allocate (encodedLen );
144- encoded .put (chunkSizeHex );
145167
146- chunkExtensions .forEach (p -> {
168+ encoded .put (chunkSizeHex ); // chunk-size
169+ chunkExtensions .forEach (p -> { // chunk-ext
147170 encoded .put (SEMICOLON );
148171 encoded .put (p .left ());
149172 if (p .right () != null && p .right ().length > 0 ) {
150173 encoded .put (EQUALS );
151174 encoded .put (p .right ());
152175 }
153176 });
154-
155- encoded .put (CRLF );
156- encoded .put (byteBuffer );
157177 encoded .put (CRLF );
158178
179+ // chunk-data
180+ if (byteBuffer .hasRemaining ()) {
181+ encoded .put (byteBuffer );
182+ encoded .put (CRLF );
183+ }
184+
185+ if (isTrailerChunk ) {
186+ // trailer-part
187+ trailerData .forEach (t -> {
188+ encoded .put (t );
189+ encoded .put (CRLF );
190+ });
191+ encoded .put (CRLF );
192+ }
193+
159194 encoded .flip ();
160195
161196 return encoded ;
@@ -174,6 +209,46 @@ private int calculateExtensionsLength(List<Pair<byte[], byte[]>> chunkExtensions
174209 }).sum ();
175210 }
176211
212+ private List <ByteBuffer > getTrailerData () {
213+ List <ByteBuffer > data = new ArrayList <>();
214+
215+ for (TrailerProvider provider : trailers ) {
216+ Pair <String , List <String >> trailer = provider .get ();
217+
218+ byte [] key = trailer .left ().getBytes (StandardCharsets .UTF_8 );
219+ List <byte []> values = trailer .right ()
220+ .stream ().map (v -> v .getBytes (StandardCharsets .UTF_8 ))
221+ .collect (Collectors .toList ());
222+
223+ if (values .isEmpty ()) {
224+ throw new RuntimeException (String .format ("Trailing header '%s' has no values" , trailer .left ()));
225+ }
226+
227+ int valuesLen = values .stream ().mapToInt (v -> v .length ).sum ();
228+ // name:value1,value2,..
229+ int size = key .length
230+ + 1 // colon
231+ + valuesLen
232+ + values .size () - 1 ; // commas
233+
234+ ByteBuffer trailerData = ByteBuffer .allocate (size );
235+
236+ trailerData .put (key );
237+ trailerData .put (COLON );
238+
239+ for (int i = 0 ; i < values .size (); ++i ) {
240+ trailerData .put (values .get (i ));
241+ if (i + 1 != values .size ()) {
242+ trailerData .put (COMMA );
243+ }
244+ }
245+
246+ trailerData .flip ();
247+ data .add (trailerData );
248+ }
249+ return data ;
250+ }
251+
177252 private class ChunkingSubscriber extends DelegatingSubscriber <ByteBuffer , Iterable <ByteBuffer >> {
178253 protected ChunkingSubscriber (Subscriber <? super Iterable <ByteBuffer >> subscriber ) {
179254 super (subscriber );
@@ -222,6 +297,7 @@ public static class Builder {
222297 private int chunkSize ;
223298 private boolean addEmptyTrailingChunk ;
224299 private final List <ChunkExtensionProvider > extensions = new ArrayList <>();
300+ private final List <TrailerProvider > trailers = new ArrayList <>();
225301
226302 public Builder publisher (Publisher <ByteBuffer > publisher ) {
227303 this .publisher = publisher ;
@@ -243,6 +319,11 @@ public Builder addExtension(ChunkExtensionProvider extension) {
243319 return this ;
244320 }
245321
322+ public Builder addTrailer (TrailerProvider trailerProvider ) {
323+ this .trailers .add (trailerProvider );
324+ return this ;
325+ }
326+
246327 public ChunkedEncodedPublisher build () {
247328 return new ChunkedEncodedPublisher (this );
248329 }
0 commit comments