Skip to content

Commit 411b567

Browse files
committed
Merge branch 'develop' of github.com:msgpack/msgpack-java into develop
2 parents d42694a + f16e762 commit 411b567

File tree

3 files changed

+151
-13
lines changed

3 files changed

+151
-13
lines changed

msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -940,12 +940,13 @@ public String unpackString()
940940
if (len > stringSizeLimit) {
941941
throw new MessageSizeException(String.format("cannot unpack a String of size larger than %,d: %,d", stringSizeLimit, len), len);
942942
}
943+
944+
resetDecoder(); // should be invoked only once per value
945+
943946
if (buffer.size() - position >= len) {
944947
return decodeStringFastPath(len);
945948
}
946949

947-
resetDecoder();
948-
949950
try {
950951
int rawRemaining = len;
951952
while (rawRemaining > 0) {
@@ -1039,10 +1040,7 @@ private String decodeStringFastPath(int length)
10391040
return s;
10401041
}
10411042
else {
1042-
resetDecoder();
1043-
ByteBuffer bb = buffer.sliceAsByteBuffer();
1044-
bb.limit(position + length);
1045-
bb.position(position);
1043+
ByteBuffer bb = buffer.sliceAsByteBuffer(position, length);
10461044
CharBuffer cb;
10471045
try {
10481046
cb = decoder.decode(bb);
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
//
2+
// MessagePack for Java
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
package org.msgpack.core.buffer;
17+
18+
import java.io.IOException;
19+
import java.util.Enumeration;
20+
21+
import static org.msgpack.core.Preconditions.checkNotNull;
22+
23+
/**
24+
* {@link MessageBufferInput} adapter for {@link MessageBufferInput} Enumeration
25+
*/
26+
public class SequenceMessageBufferInput
27+
implements MessageBufferInput
28+
{
29+
private Enumeration<? extends MessageBufferInput> sequence;
30+
private MessageBufferInput input;
31+
32+
public SequenceMessageBufferInput(Enumeration<? extends MessageBufferInput> sequence)
33+
{
34+
this.sequence = checkNotNull(sequence, "input sequence is null");
35+
try {
36+
nextInput();
37+
}
38+
catch (IOException ignore) {
39+
}
40+
}
41+
42+
@Override
43+
public MessageBuffer next() throws IOException
44+
{
45+
if (input == null) {
46+
return null;
47+
}
48+
MessageBuffer buffer = input.next();
49+
if (buffer == null) {
50+
nextInput();
51+
return next();
52+
}
53+
54+
return buffer;
55+
}
56+
57+
private void nextInput() throws IOException
58+
{
59+
if (input != null) {
60+
input.close();
61+
}
62+
63+
if (sequence.hasMoreElements()) {
64+
input = sequence.nextElement();
65+
if (input == null) {
66+
throw new NullPointerException("An element in the MessageBufferInput sequence is null");
67+
}
68+
}
69+
else {
70+
input = null;
71+
}
72+
}
73+
74+
@Override
75+
public void close() throws IOException
76+
{
77+
do {
78+
nextInput();
79+
} while (input != null);
80+
}
81+
}

msgpack-core/src/test/scala/org/msgpack/core/MessageUnpackerTest.scala

Lines changed: 66 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@ package org.msgpack.core
1717

1818
import java.io._
1919
import java.nio.ByteBuffer
20+
import java.util.Collections
2021

2122
import org.msgpack.core.buffer._
2223
import org.msgpack.value.ValueType
2324
import xerial.core.io.IOUtil._
2425

26+
import scala.collection.JavaConversions._
2527
import scala.util.Random
2628

2729
object MessageUnpackerTest {
@@ -205,6 +207,34 @@ class MessageUnpackerTest extends MessagePackSpec {
205207
builder.result()
206208
}
207209

210+
def unpackerCollectionWithVariousBuffers(data: Array[Byte], chunkSize: Int) : Seq[MessageUnpacker] = {
211+
val seqBytes = Seq.newBuilder[MessageBufferInput]
212+
val seqByteBuffers = Seq.newBuilder[MessageBufferInput]
213+
val seqDirectBuffers = Seq.newBuilder[MessageBufferInput]
214+
var left = data.length
215+
var position = 0
216+
while (left > 0) {
217+
val length = Math.min(chunkSize, left)
218+
seqBytes += new ArrayBufferInput(data, position, length)
219+
val bb = ByteBuffer.allocate(length)
220+
val db = ByteBuffer.allocateDirect(length)
221+
bb.put(data, position, length).flip()
222+
db.put(data, position, length).flip()
223+
seqByteBuffers += new ByteBufferInput(bb)
224+
seqDirectBuffers += new ByteBufferInput(db)
225+
left -= length
226+
position += length
227+
}
228+
val builder = Seq.newBuilder[MessageUnpacker]
229+
builder += MessagePack.newDefaultUnpacker(new SequenceMessageBufferInput(Collections.enumeration(seqBytes.result())))
230+
builder += MessagePack.newDefaultUnpacker(new SequenceMessageBufferInput(Collections.enumeration(seqByteBuffers.result())))
231+
if (!universal) {
232+
builder += MessagePack.newDefaultUnpacker(new SequenceMessageBufferInput(Collections.enumeration(seqDirectBuffers.result())))
233+
}
234+
235+
builder.result()
236+
}
237+
208238
"MessageUnpacker" should {
209239

210240
"parse message packed data" taggedAs ("unpack") in {
@@ -330,21 +360,50 @@ class MessageUnpackerTest extends MessagePackSpec {
330360
new SplitTest {val data = testData3(30)}.run
331361
}
332362

333-
"read numeric data at buffer boundary" taggedAs("boundary2") in {
363+
"read integer at MessageBuffer boundaries" taggedAs("integer-buffer-boundary") in {
334364
val packer = MessagePack.newDefaultBufferPacker()
335365
(0 until 1170).foreach{i =>
336366
packer.packLong(0x0011223344556677L)
337-
packer.packString("hello")
338367
}
339368
packer.close
340369
val data = packer.toByteArray
341370

342-
val unpacker = MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(data), 8192))
343-
(0 until 1170).foreach { i =>
344-
unpacker.unpackLong() shouldBe 0x0011223344556677L
345-
unpacker.unpackString() shouldBe "hello"
371+
// Boundary test
372+
withResource(MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(data), 8192))) { unpacker =>
373+
(0 until 1170).foreach { i =>
374+
unpacker.unpackLong() shouldBe 0x0011223344556677L
375+
}
376+
}
377+
378+
// Boundary test for sequences of ByteBuffer, DirectByteBuffer backed MessageInput.
379+
for (unpacker <- unpackerCollectionWithVariousBuffers(data, 32)) {
380+
(0 until 1170).foreach { i =>
381+
unpacker.unpackLong() shouldBe 0x0011223344556677L
382+
}
383+
}
384+
}
385+
386+
"read string at MessageBuffer boundaries" taggedAs("string-buffer-boundary") in {
387+
val packer = MessagePack.newDefaultBufferPacker()
388+
(0 until 1170).foreach{i =>
389+
packer.packString("hello world")
390+
}
391+
packer.close
392+
val data = packer.toByteArray
393+
394+
// Boundary test
395+
withResource(MessagePack.newDefaultUnpacker(new InputStreamBufferInput(new ByteArrayInputStream(data), 8192))) { unpacker =>
396+
(0 until 1170).foreach { i =>
397+
unpacker.unpackString() shouldBe "hello world"
398+
}
399+
}
400+
401+
// Boundary test for sequences of ByteBuffer, DirectByteBuffer backed MessageInput.
402+
for (unpacker <- unpackerCollectionWithVariousBuffers(data, 32)) {
403+
(0 until 1170).foreach { i =>
404+
unpacker.unpackString() shouldBe "hello world"
405+
}
346406
}
347-
unpacker.close()
348407
}
349408

350409
"be faster then msgpack-v6 skip" taggedAs ("cmp-skip") in {

0 commit comments

Comments
 (0)