Skip to content

Commit 446da7d

Browse files
committed
Add ByteBuffer input support again
1 parent 734359f commit 446da7d

File tree

9 files changed

+398
-128
lines changed

9 files changed

+398
-128
lines changed

msgpack-core/src/main/java/org/msgpack/core/MessagePack.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.msgpack.core;
1717

1818
import org.msgpack.core.buffer.ArrayBufferInput;
19+
import org.msgpack.core.buffer.ByteBufferInput;
1920
import org.msgpack.core.buffer.ChannelBufferInput;
2021
import org.msgpack.core.buffer.ChannelBufferOutput;
2122
import org.msgpack.core.buffer.InputStreamBufferInput;
@@ -25,6 +26,7 @@
2526

2627
import java.io.InputStream;
2728
import java.io.OutputStream;
29+
import java.nio.ByteBuffer;
2830
import java.nio.channels.ReadableByteChannel;
2931
import java.nio.channels.WritableByteChannel;
3032
import java.nio.charset.Charset;
@@ -236,6 +238,17 @@ public static MessageUnpacker newDefaultUnpacker(byte[] contents, int offset, in
236238
return DEFAULT_UNPACKER_CONFIG.newUnpacker(contents, offset, length);
237239
}
238240

241+
/**
242+
* Create an unpacker that reads the data from a given ByteBuffer
243+
*
244+
* @param contents
245+
* @return
246+
*/
247+
public static MessageUnpacker newDefaultUnpacker(ByteBuffer contents)
248+
{
249+
return DEFAULT_UNPACKER_CONFIG.newUnpacker(contents);
250+
}
251+
239252
/**
240253
* MessagePacker configuration.
241254
*/
@@ -524,6 +537,17 @@ public MessageUnpacker newUnpacker(byte[] contents, int offset, int length)
524537
return newUnpacker(new ArrayBufferInput(contents, offset, length));
525538
}
526539

540+
/**
541+
* Create an unpacker that reads the data from a given ByteBuffer
542+
*
543+
* @param contents
544+
* @return
545+
*/
546+
public MessageUnpacker newUnpacker(ByteBuffer contents)
547+
{
548+
return newUnpacker(new ByteBufferInput(contents));
549+
}
550+
527551
/**
528552
* Allow unpackBinaryHeader to read str format family (default: true)
529553
*/

msgpack-core/src/main/java/org/msgpack/core/MessagePacker.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -531,8 +531,7 @@ else if (s.length() < (1 << 8)) {
531531
throw new IllegalArgumentException("Unexpected UTF-8 encoder state");
532532
}
533533
// move 1 byte backward to expand 3-byte header region to 3 bytes
534-
buffer.putBytes(position + 3,
535-
buffer.array(), buffer.arrayOffset() + position + 2, written);
534+
buffer.putMessageBuffer(position + 3, buffer, position + 2, written);
536535
// write 3-byte header
537536
buffer.putByte(position++, STR16);
538537
buffer.putShort(position, (short) written);
@@ -560,8 +559,7 @@ else if (s.length() < (1 << 16)) {
560559
throw new IllegalArgumentException("Unexpected UTF-8 encoder state");
561560
}
562561
// move 2 bytes backward to expand 3-byte header region to 5 bytes
563-
buffer.putBytes(position + 5,
564-
buffer.array(), buffer.arrayOffset() + position + 3, written);
562+
buffer.putMessageBuffer(position + 5, buffer, position + 3, written);
565563
// write 3-byte header header
566564
buffer.putByte(position++, STR32);
567565
buffer.putInt(position, written);

msgpack-core/src/main/java/org/msgpack/core/MessageUnpacker.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -213,14 +213,9 @@ private MessageBuffer prepareNumberBuffer(int readLength)
213213
// fill the temporary buffer from the current data fragment and
214214
// next fragment(s).
215215

216-
// TODO buffer.array() doesn't work if MessageBuffer is allocated by
217-
// newDirectBuffer. dd copy method to MessageBuffer to solve this issue.
218-
219216
int off = 0;
220217
if (remaining > 0) {
221-
numberBuffer.putBytes(0,
222-
buffer.array(), buffer.arrayOffset() + position,
223-
remaining);
218+
numberBuffer.putMessageBuffer(0, buffer, position, remaining);
224219
readLength -= remaining;
225220
off += remaining;
226221
}
@@ -229,16 +224,12 @@ private MessageBuffer prepareNumberBuffer(int readLength)
229224
nextBuffer();
230225
int nextSize = buffer.size();
231226
if (nextSize >= readLength) {
232-
numberBuffer.putBytes(off,
233-
buffer.array(), buffer.arrayOffset(),
234-
readLength);
227+
numberBuffer.putMessageBuffer(off, buffer, 0, readLength);
235228
position = readLength;
236229
break;
237230
}
238231
else {
239-
numberBuffer.putBytes(off,
240-
buffer.array(), buffer.arrayOffset(),
241-
nextSize);
232+
numberBuffer.putMessageBuffer(off, buffer, 0, nextSize);
242233
readLength -= nextSize;
243234
off += nextSize;
244235
}
@@ -1041,7 +1032,8 @@ private void handleCoderError(CoderResult cr)
10411032
private String decodeStringFastPath(int length)
10421033
{
10431034
if (actionOnMalformedString == CodingErrorAction.REPLACE &&
1044-
actionOnUnmappableString == CodingErrorAction.REPLACE) {
1035+
actionOnUnmappableString == CodingErrorAction.REPLACE &&
1036+
buffer.hasArray()) {
10451037
String s = new String(buffer.array(), buffer.arrayOffset() + position, length, MessagePack.UTF8);
10461038
position += length;
10471039
return s;
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
//
2+
// MessagePack for Java
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
package org.msgpack.core.buffer;
17+
18+
import java.io.IOException;
19+
import java.nio.ByteBuffer;
20+
21+
import static org.msgpack.core.Preconditions.checkNotNull;
22+
23+
/**
24+
* {@link MessageBufferInput} adapter for {@link java.nio.ByteBuffer}
25+
*/
26+
public class ByteBufferInput
27+
implements MessageBufferInput
28+
{
29+
private ByteBuffer input;
30+
private boolean isRead = false;
31+
32+
public ByteBufferInput(ByteBuffer input)
33+
{
34+
this.input = checkNotNull(input, "input ByteBuffer is null");
35+
}
36+
37+
/**
38+
* Reset buffer. This method doesn't close the old resource.
39+
*
40+
* @param input new buffer
41+
* @return the old resource
42+
*/
43+
public ByteBuffer reset(ByteBuffer input)
44+
{
45+
ByteBuffer old = this.input;
46+
this.input = input;
47+
isRead = false;
48+
return old;
49+
}
50+
51+
@Override
52+
public MessageBuffer next()
53+
throws IOException
54+
{
55+
if (isRead) {
56+
return null;
57+
}
58+
59+
isRead = true;
60+
return MessageBuffer.wrap(input);
61+
}
62+
63+
@Override
64+
public void close()
65+
throws IOException
66+
{
67+
// Nothing to do
68+
}
69+
}

msgpack-core/src/main/java/org/msgpack/core/buffer/MessageBuffer.java

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class MessageBuffer
4343
* Reference to MessageBuffer Constructors
4444
*/
4545
private static final Constructor<?> mbArrConstructor;
46+
private static final Constructor<?> mbBBConstructor;
4647

4748
/**
4849
* The offset from the object memory header to its byte array data
@@ -145,6 +146,11 @@ public class MessageBuffer
145146
Constructor<?> mbArrCstr = bufferCls.getDeclaredConstructor(byte[].class, int.class, int.class);
146147
mbArrCstr.setAccessible(true);
147148
mbArrConstructor = mbArrCstr;
149+
150+
// MessageBufferX(ByteBuffer) constructor
151+
Constructor<?> mbBBCstr = bufferCls.getDeclaredConstructor(ByteBuffer.class);
152+
mbBBCstr.setAccessible(true);
153+
mbBBConstructor = mbBBCstr;
148154
}
149155
catch (Exception e) {
150156
e.printStackTrace(System.err);
@@ -170,6 +176,12 @@ public class MessageBuffer
170176
*/
171177
protected final int size;
172178

179+
/**
180+
* Reference is used to hold a reference to an object that holds the underlying memory so that it cannot be
181+
* released by the garbage collector.
182+
*/
183+
protected final ByteBuffer reference;
184+
173185
public static MessageBuffer allocate(int length)
174186
{
175187
return wrap(new byte[length]);
@@ -185,6 +197,11 @@ public static MessageBuffer wrap(byte[] array, int offset, int length)
185197
return newMessageBuffer(array, offset, length);
186198
}
187199

200+
public static MessageBuffer wrap(ByteBuffer bb)
201+
{
202+
return newMessageBuffer(bb).slice(bb.position(), bb.remaining());
203+
}
204+
188205
/**
189206
* Creates a new MessageBuffer instance backed by a java heap array
190207
*
@@ -202,11 +219,32 @@ private static MessageBuffer newMessageBuffer(byte[] arr, int off, int len)
202219
}
203220
}
204221

222+
/**
223+
* Creates a new MessageBuffer instance backed by ByteBuffer
224+
*
225+
* @param bb
226+
* @return
227+
*/
228+
private static MessageBuffer newMessageBuffer(ByteBuffer bb)
229+
{
230+
checkNotNull(bb);
231+
try {
232+
// We need to use reflection to create MessageBuffer instances in order to prevent TypeProfile generation for getInt method. TypeProfile will be
233+
// generated to resolve one of the method references when two or more classes overrides the method.
234+
return (MessageBuffer) mbBBConstructor.newInstance(bb);
235+
} catch (Exception e) {
236+
throw new RuntimeException(e);
237+
}
238+
}
239+
205240
public static void releaseBuffer(MessageBuffer buffer)
206241
{
207242
if (isUniversalBuffer || buffer.base instanceof byte[]) {
208243
// We have nothing to do. Wait until the garbage-collector collects this array object
209244
}
245+
else if (DirectBufferAccess.isDirectByteBufferInstance(buffer.base)) {
246+
DirectBufferAccess.clean(buffer.base);
247+
}
210248
else {
211249
// Maybe cannot reach here
212250
unsafe.freeMemory(buffer.address);
@@ -225,13 +263,43 @@ public static void releaseBuffer(MessageBuffer buffer)
225263
this.base = arr;
226264
this.address = ARRAY_BYTE_BASE_OFFSET + offset;
227265
this.size = length;
266+
this.reference = null;
267+
}
268+
269+
/**
270+
* Create a MessageBuffer instance from a given ByteBuffer instance
271+
*
272+
* @param bb
273+
*/
274+
MessageBuffer(ByteBuffer bb)
275+
{
276+
if (bb.isDirect()) {
277+
if (isUniversalBuffer) {
278+
throw new IllegalStateException("Cannot create MessageBuffer from DirectBuffer");
279+
}
280+
// Direct buffer or off-heap memory
281+
this.base = null;
282+
this.address = DirectBufferAccess.getAddress(bb);
283+
this.size = bb.capacity();
284+
this.reference = bb;
285+
}
286+
else if (bb.hasArray()) {
287+
this.base = bb.array();
288+
this.address = ARRAY_BYTE_BASE_OFFSET;
289+
this.size = bb.array().length;
290+
this.reference = null;
291+
}
292+
else {
293+
throw new IllegalArgumentException("Only the array-backed ByteBuffer or DirectBuffer are supported");
294+
}
228295
}
229296

230297
protected MessageBuffer(Object base, long address, int length)
231298
{
232299
this.base = base;
233300
this.address = address;
234301
this.size = length;
302+
this.reference = null;
235303
}
236304

237305
/**
@@ -393,6 +461,11 @@ else if (src.hasArray()) {
393461
}
394462
}
395463

464+
public void putMessageBuffer(int index, MessageBuffer src, int srcOffset, int len)
465+
{
466+
unsafe.copyMemory(src.base, src.address + srcOffset, base, address + index, len);
467+
}
468+
396469
/**
397470
* Create a ByteBuffer view of the range [index, index+length) of this memory
398471
*
@@ -402,7 +475,13 @@ else if (src.hasArray()) {
402475
*/
403476
public ByteBuffer sliceAsByteBuffer(int index, int length)
404477
{
405-
return ByteBuffer.wrap((byte[]) base, (int) ((address - ARRAY_BYTE_BASE_OFFSET) + index), length);
478+
if (hasArray()) {
479+
return ByteBuffer.wrap((byte[]) base, (int) ((address - ARRAY_BYTE_BASE_OFFSET) + index), length);
480+
}
481+
else {
482+
assert (!isUniversalBuffer);
483+
return DirectBufferAccess.newByteBuffer(address, index, length, reference);
484+
}
406485
}
407486

408487
/**
@@ -415,6 +494,11 @@ public ByteBuffer sliceAsByteBuffer()
415494
return sliceAsByteBuffer(0, size());
416495
}
417496

497+
public boolean hasArray()
498+
{
499+
return base instanceof byte[];
500+
}
501+
418502
/**
419503
* Get a copy of this buffer
420504
*

0 commit comments

Comments
 (0)