|
4 | 4 | import java.util.Collection; |
5 | 5 | import java.util.LinkedList; |
6 | 6 |
|
| 7 | +import org.slf4j.Logger; |
| 8 | +import org.slf4j.LoggerFactory; |
| 9 | + |
7 | 10 | class LengthPrefixedByteArrayProcessor { |
8 | 11 |
|
9 | 12 | private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); |
10 | 13 | private Byte firstLengthByteBuffer; //Only used if we've received a single byte at the start of a message |
11 | 14 | private int targetLength = 0; |
| 15 | + |
| 16 | + private final static Logger LOGGER = LoggerFactory.getLogger(LengthPrefixedByteArrayProcessor.class); |
12 | 17 |
|
13 | 18 | public synchronized Collection<byte[]> handle(byte[] data) { |
14 | 19 | Collection<byte[]> results = new LinkedList<>(); |
15 | 20 | int pos = 0; |
| 21 | + LOGGER.trace("Received message of length {}. Existing buffer is {}", data.length, buffer.size()); |
16 | 22 | if (buffer.size() == 0) { |
17 | 23 | while(data.length - pos > 18) { |
18 | 24 | int targetLength = (data[0] & 0xFF) + (data[1] & 0xFF) * 256 + 16 + 2; |
| 25 | + LOGGER.trace("Attempting to read message of length {}", targetLength); |
19 | 26 | if (data.length >= pos + targetLength) { |
20 | 27 | byte[] b = new byte[targetLength - 2]; |
21 | 28 | System.arraycopy(data, pos+2, b, 0, targetLength-2); |
22 | 29 | results.add(b); |
| 30 | + LOGGER.trace("Read complete message"); |
23 | 31 | pos = pos + targetLength; |
24 | 32 | } else { |
| 33 | + LOGGER.trace("Not enough data available"); |
25 | 34 | break; |
26 | 35 | } |
27 | 36 | } |
28 | 37 | } |
29 | 38 | if (data.length > pos) { |
| 39 | + LOGGER.trace("Remaining data available"); |
30 | 40 | step(data, pos, results); |
31 | 41 | } |
| 42 | + LOGGER.trace("Returning {} results", results.size()); |
32 | 43 | return results; |
33 | 44 | } |
34 | 45 |
|
35 | 46 | private void step(byte[] data, int pos, Collection<byte[]> results) { |
36 | | - if (buffer.size() == 0 && data.length == 1 + pos) { |
| 47 | + LOGGER.trace("Performing step operation on buffer of length {} with pos {}", data.length, pos); |
| 48 | + if (targetLength == 0 && data.length == 1 + pos) { |
37 | 49 | firstLengthByteBuffer = data[pos]; |
| 50 | + LOGGER.trace("Received a single byte message, storing byte {} for later", firstLengthByteBuffer); |
38 | 51 | return; |
39 | 52 | } |
40 | | - if (buffer.size() == 0) { |
| 53 | + if (targetLength == 0) { |
41 | 54 | if (firstLengthByteBuffer != null) { |
42 | | - targetLength = firstLengthByteBuffer + data[pos] * 256 + 16; |
| 55 | + targetLength = (firstLengthByteBuffer & 0xFF) + (data[pos] & 0xFF) * 256 + 16; |
43 | 56 | pos += 1; |
| 57 | + LOGGER.trace("Received the second byte after storing the first byte. New length is {}", targetLength); |
44 | 58 | } else { |
45 | | - targetLength = data[pos] + data[pos+1] * 256 + 16; |
| 59 | + targetLength = (data[pos] & 0xFF) + (data[pos+1] & 0xFF) * 256 + 16; |
46 | 60 | pos += 2; |
| 61 | + LOGGER.trace("targetLength is {}", targetLength); |
47 | 62 | } |
48 | 63 | } |
49 | 64 | int toWrite = targetLength - buffer.size(); |
50 | | - if (toWrite > data.length - pos) { |
| 65 | + if (toWrite <= data.length - pos) { |
51 | 66 | //We have a complete message |
| 67 | + LOGGER.trace("Received a complete message"); |
52 | 68 | buffer.write(data, pos, toWrite); |
53 | 69 | results.add(buffer.toByteArray()); |
54 | 70 | buffer.reset(); |
55 | | - step(data, pos + toWrite, results); |
| 71 | + targetLength=0; |
| 72 | + if (pos + toWrite > data.length) { |
| 73 | + step(data, pos + toWrite, results); |
| 74 | + } |
56 | 75 | } else { |
| 76 | + LOGGER.trace("Storing {} bytes in buffer until we receive the complete {}", data.length-pos, targetLength); |
57 | 77 | buffer.write(data, pos, data.length - pos); |
58 | 78 | } |
59 | 79 | } |
|
0 commit comments