Skip to content

Commit b868d8c

Browse files
authored
Use State to track the order of MongoDB transactions (#1742)
Summary: This PR adds functionality to track the order of transactions as they are parsed. It adds the streamID of each request to the state vector at parsing time which will then be used to iterate over at stitching time. Adding this will mainly help the stitching process when trying to stitch a request with N `moreToCome` responses. **Motivation behind this change:** The stitching implementation relies on the new interface using `absl::flat_hash_map` to store the `streamID` and a deque of request/response frames. We then use a response led matching algorithm where we loop through the response map and stitch the first response frame in a deque with its corresponding request frame. A response pairs with a request when both frames share the same `streamID`, the response frame's `streamID` is its `responseTo` and the request frame's `streamID` is its `requestID`. MongoDB's `OP_MSG` wire protocol has the concept of `more_to_come` which means that the server could send `N` responses back to a singular request by the client. Each frame in the series of the `N` responses are linked by the `responseTo` of the frame matching the `requestID` of the previous response frame, similar to a singly linked list. The head response frame's `responseTo` will be the `requestID` of the request frame. Note: the `requestID` of each frame in the `N more_to_come` frames is random and unique. At the time of stitching, if we do not use state to track the order of transactions we would iterate over the response map in a "random" order and could iterate over the `more_to_come` response frames out of order. We could lose context on how the `more_to_come` frames are linked due to not knowing the head response frame and if we were to iterate over the end of the `more_to_come` message before looping through all prior `more_to_come` frames in the message they would be dropped since we do not know which request those frames are responding to. To solve this issue, tracking the order of transactions' `streamIDs` to iterate over would ensure that could use the response led stitching approach and find the complete `more_to_come` message for a given request. **New test case:** The new test case checks to make sure the state's `stream_order` vector is correctly populated with the order of `streamIDs` as we parse new request frames (transactions). The test case parses 3 frames and expects that the state's `stream_order` after parsing the first frame to contain `std::pair<917, false>` since the first frame's `requestID` is 917. It expects the `stream_order` to contain `std::pair<917, false>`, `std::pair<444, false>` after parsing the second request frame since that frame's requestID is 444 and so on. Related issues: #640 Type of change: /kind feature Test Plan: Modified the existing tests and added another test to make sure the vector is populated correctly. --------- Signed-off-by: Kartik Pattaswamy <kpattaswamy@pixielabs.ai>
1 parent 1cd1ac5 commit b868d8c

File tree

4 files changed

+117
-23
lines changed

4 files changed

+117
-23
lines changed

src/stirling/source_connectors/socket_tracer/protocols/mongodb/parse.cc

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
* SPDX-License-Identifier: Apache-2.0
1717
*/
1818
#include <string>
19+
#include <utility>
1920

2021
#include "src/stirling/source_connectors/socket_tracer/protocols/mongodb/decode.h"
2122
#include "src/stirling/source_connectors/socket_tracer/protocols/mongodb/parse.h"
@@ -27,7 +28,7 @@ namespace stirling {
2728
namespace protocols {
2829
namespace mongodb {
2930

30-
ParseState ParseFrame(message_type_t type, std::string_view* buf, Frame* frame) {
31+
ParseState ParseFrame(message_type_t type, std::string_view* buf, Frame* frame, State* state) {
3132
if (type != message_type_t::kRequest && type != message_type_t::kResponse) {
3233
return ParseState::kInvalid;
3334
}
@@ -72,6 +73,10 @@ ParseState ParseFrame(message_type_t type, std::string_view* buf, Frame* frame)
7273
ParseState parse_state = mongodb::ProcessPayload(&decoder, frame);
7374
if (parse_state == ParseState::kSuccess) {
7475
*buf = decoder.Buf();
76+
77+
if (type == message_type_t::kRequest) {
78+
state->stream_order.push_back(std::pair(frame->request_id, false));
79+
}
7580
}
7681

7782
return parse_state;
@@ -80,8 +85,9 @@ ParseState ParseFrame(message_type_t type, std::string_view* buf, Frame* frame)
8085
} // namespace mongodb
8186

8287
template <>
83-
ParseState ParseFrame(message_type_t type, std::string_view* buf, mongodb::Frame* frame, NoState*) {
84-
return mongodb::ParseFrame(type, buf, frame);
88+
ParseState ParseFrame(message_type_t type, std::string_view* buf, mongodb::Frame* frame,
89+
mongodb::StateWrapper* state) {
90+
return mongodb::ParseFrame(type, buf, frame, &state->global);
8591
}
8692

8793
template <>

src/stirling/source_connectors/socket_tracer/protocols/mongodb/parse.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,13 @@ namespace stirling {
2828
namespace protocols {
2929
namespace mongodb {
3030

31-
ParseState ParseFrame(BinaryDecoder* decoder, Frame* frame);
31+
ParseState ParseFrame(BinaryDecoder* decoder, Frame* frame, State* state);
3232

3333
} // namespace mongodb
3434

3535
template <>
36-
ParseState ParseFrame(message_type_t type, std::string_view* buf, mongodb::Frame* frame, NoState*);
36+
ParseState ParseFrame(message_type_t type, std::string_view* buf, mongodb::Frame* frame,
37+
mongodb::StateWrapper* state);
3738

3839
template <>
3940
size_t FindFrameBoundary<mongodb::Frame>(message_type_t type, std::string_view buf,

src/stirling/source_connectors/socket_tracer/protocols/mongodb/parse_test.cc

Lines changed: 83 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020

2121
#include <string>
2222
#include <utility>
23+
#include <vector>
2324

2425
#include "src/common/testing/testing.h"
2526

2627
namespace px {
2728
namespace stirling {
2829
namespace protocols {
30+
namespace mongodb {
2931

3032
// clang-format off
3133

@@ -351,7 +353,9 @@ TEST_F(MongoDBParserTest, ParseFrameWhenNeedsMoreHeaderData) {
351353
auto frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBNeedMoreHeaderData));
352354

353355
mongodb::Frame frame;
354-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
356+
StateWrapper state_order{};
357+
358+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame, &state_order);
355359

356360
EXPECT_EQ(state, ParseState::kNeedsMoreData);
357361
}
@@ -360,7 +364,9 @@ TEST_F(MongoDBParserTest, ParseFrameWhenNeedsMoreData) {
360364
auto frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBNeedMoreData));
361365

362366
mongodb::Frame frame;
363-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
367+
StateWrapper state_order{};
368+
369+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame, &state_order);
364370

365371
EXPECT_EQ(state, ParseState::kNeedsMoreData);
366372
}
@@ -369,7 +375,9 @@ TEST_F(MongoDBParserTest, ParseFrameWhenNotValidType) {
369375
auto frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBInvalidType));
370376

371377
mongodb::Frame frame;
372-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
378+
StateWrapper state_order{};
379+
380+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame, &state_order);
373381

374382
EXPECT_EQ(state, ParseState::kInvalid);
375383
}
@@ -378,7 +386,9 @@ TEST_F(MongoDBParserTest, ParseFrameWhenUnsupportedType) {
378386
auto frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBUnsupportedType));
379387

380388
mongodb::Frame frame;
381-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
389+
StateWrapper state_order{};
390+
391+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame, &state_order);
382392

383393
EXPECT_EQ(state, ParseState::kIgnored);
384394
}
@@ -387,7 +397,9 @@ TEST_F(MongoDBParserTest, ParseFrameInvalidFlagBits) {
387397
auto frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBInvalidFlagBits));
388398

389399
mongodb::Frame frame;
390-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
400+
StateWrapper state_order{};
401+
402+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame, &state_order);
391403

392404
EXPECT_EQ(state, ParseState::kInvalid);
393405
}
@@ -396,7 +408,9 @@ TEST_F(MongoDBParserTest, ParseFrameValidFlagBitsSet) {
396408
auto frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBValidFlagBitsSet));
397409

398410
mongodb::Frame frame;
399-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
411+
StateWrapper state_order{};
412+
413+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame, &state_order);
400414

401415
EXPECT_TRUE(frame.checksum_present);
402416
EXPECT_TRUE(frame.more_to_come);
@@ -408,7 +422,9 @@ TEST_F(MongoDBParserTest, ParseFrameInvalidChecksum) {
408422
auto frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBMissingChecksum));
409423

410424
mongodb::Frame frame;
411-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
425+
StateWrapper state_order{};
426+
427+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame, &state_order);
412428

413429
EXPECT_TRUE(frame.checksum_present);
414430
EXPECT_EQ(state, ParseState::kInvalid);
@@ -418,7 +434,9 @@ TEST_F(MongoDBParserTest, ParseFrameInvalidKindByte) {
418434
auto frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBInvalidKindByte));
419435

420436
mongodb::Frame frame;
421-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
437+
StateWrapper state_order{};
438+
439+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame, &state_order);
422440

423441
EXPECT_EQ(state, ParseState::kInvalid);
424442
}
@@ -428,7 +446,9 @@ TEST_F(MongoDBParserTest, ParseFrameInvalidSeqIdentifier) {
428446
CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBInvalidSeqIdentifier));
429447

430448
mongodb::Frame frame;
431-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
449+
StateWrapper state_order{};
450+
451+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame, &state_order);
432452

433453
EXPECT_EQ(state, ParseState::kInvalid);
434454
}
@@ -437,7 +457,9 @@ TEST_F(MongoDBParserTest, ParseFrameEmptyDocument) {
437457
auto frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBEmptyDocument));
438458

439459
mongodb::Frame frame;
440-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
460+
StateWrapper state_order{};
461+
462+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame, &state_order);
441463

442464
EXPECT_EQ(frame.sections[0].kind, 0);
443465
EXPECT_EQ(frame.sections[0].length, 4);
@@ -452,7 +474,9 @@ TEST_F(MongoDBParserTest, ParseFrameValidRequest) {
452474
auto frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBValidRequest));
453475

454476
mongodb::Frame frame;
455-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
477+
StateWrapper state_order{};
478+
479+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame, &state_order);
456480

457481
EXPECT_EQ(frame.length, 178);
458482
EXPECT_EQ(frame.request_id, 444);
@@ -468,7 +492,9 @@ TEST_F(MongoDBParserTest, ParseFrameValidResponse) {
468492
auto frame_view = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBValidResponse));
469493

470494
mongodb::Frame frame;
471-
ParseState state = ParseFrame(message_type_t::kResponse, &frame_view, &frame);
495+
StateWrapper state_order{};
496+
497+
ParseState state = ParseFrame(message_type_t::kResponse, &frame_view, &frame, &state_order);
472498

473499
EXPECT_EQ(frame.length, 45);
474500
EXPECT_EQ(frame.request_id, 917);
@@ -485,7 +511,9 @@ TEST_F(MongoDBParserTest, ParseFrameValidRequestTwoSections) {
485511
CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBValidRequestTwoSections));
486512

487513
mongodb::Frame frame;
488-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame);
514+
StateWrapper state_order{};
515+
516+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &frame, &state_order);
489517

490518
EXPECT_EQ(frame.length, 157);
491519
EXPECT_EQ(frame.request_id, 1144108930);
@@ -504,7 +532,9 @@ TEST_F(MongoDBParserTest, ParseFrameValidResponseTwoSections) {
504532
CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBValidResponseTwoSections));
505533

506534
mongodb::Frame frame;
507-
ParseState state = ParseFrame(message_type_t::kResponse, &frame_view, &frame);
535+
StateWrapper state_order{};
536+
537+
ParseState state = ParseFrame(message_type_t::kResponse, &frame_view, &frame, &state_order);
508538

509539
EXPECT_EQ(frame.length, 45);
510540
EXPECT_EQ(frame.request_id, 444);
@@ -521,7 +551,10 @@ TEST_F(MongoDBParserTest, ParseValidFrameAndInvalidFrame) {
521551
CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBValidRequestAndInvalidRequest));
522552

523553
mongodb::Frame valid_frame;
524-
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &valid_frame);
554+
StateWrapper state_order{};
555+
556+
ParseState state = ParseFrame(message_type_t::kRequest, &frame_view, &valid_frame, &state_order);
557+
525558
EXPECT_EQ(valid_frame.length, 45);
526559
EXPECT_EQ(valid_frame.request_id, 917);
527560
EXPECT_EQ(valid_frame.response_to, 444);
@@ -532,11 +565,44 @@ TEST_F(MongoDBParserTest, ParseValidFrameAndInvalidFrame) {
532565
EXPECT_EQ(state, ParseState::kSuccess);
533566

534567
mongodb::Frame invalid_frame;
535-
state = ParseFrame(message_type_t::kRequest, &frame_view, &invalid_frame);
568+
state = ParseFrame(message_type_t::kRequest, &frame_view, &invalid_frame, &state_order);
536569
EXPECT_EQ(state, ParseState::kInvalid);
537570
}
538571

539-
namespace mongodb {} // namespace mongodb
572+
TEST_F(MongoDBParserTest, ValidateStateOrderFromFrames) {
573+
// Setup three different request frames.
574+
auto frame_view_1 = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBValidFlagBitsSet));
575+
auto frame_view_2 = CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBValidRequest));
576+
auto frame_view_3 =
577+
CreateStringView<char>(CharArrayStringView<uint8_t>(mongoDBValidRequestTwoSections));
578+
579+
// Initialize what the state's stream order should look like after the first insertion.
580+
std::vector<std::pair<mongodb::stream_id_t, bool>> stream_order{{917, false}};
581+
582+
StateWrapper state_order{};
583+
584+
mongodb::Frame frame_1;
585+
ParseState state_1 = ParseFrame(message_type_t::kRequest, &frame_view_1, &frame_1, &state_order);
586+
EXPECT_EQ(frame_1.request_id, 917);
587+
EXPECT_EQ(state_order.global.stream_order, stream_order);
588+
EXPECT_EQ(state_1, ParseState::kSuccess);
589+
590+
stream_order.push_back({444, false});
591+
mongodb::Frame frame_2;
592+
ParseState state_2 = ParseFrame(message_type_t::kRequest, &frame_view_2, &frame_2, &state_order);
593+
EXPECT_EQ(frame_2.request_id, 444);
594+
EXPECT_EQ(state_order.global.stream_order, stream_order);
595+
EXPECT_EQ(state_2, ParseState::kSuccess);
596+
597+
stream_order.push_back({1144108930, false});
598+
mongodb::Frame frame_3;
599+
ParseState state_3 = ParseFrame(message_type_t::kRequest, &frame_view_3, &frame_3, &state_order);
600+
EXPECT_EQ(frame_3.request_id, 1144108930);
601+
EXPECT_EQ(state_order.global.stream_order, stream_order);
602+
EXPECT_EQ(state_3, ParseState::kSuccess);
603+
}
604+
605+
} // namespace mongodb
540606
} // namespace protocols
541607
} // namespace stirling
542608
} // namespace px

src/stirling/source_connectors/socket_tracer/protocols/mongodb/types.h

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
#pragma once
2020

2121
#include <string>
22+
#include <utility>
23+
2224
#include <vector>
2325

2426
#include "src/stirling/source_connectors/socket_tracer/protocols/common/event_parser.h"
@@ -153,10 +155,29 @@ struct Record {
153155
}
154156
};
155157

158+
// The stream_order state tracks which stream_id to stitch first.
159+
// In more detail, the MongoDB wire protocol can link responses together in a more_to_come scenario
160+
// where each response points to a new response via the streamID. To stitch correctly, we record the
161+
// first streamID on the request side in each such sequence during frame parsing and store that in
162+
// the stream_order vector. The second item in the pair is a boolean flag that is initially false,
163+
// but set to true during the stitching process to indicate that the transaction has been processsed
164+
// and can be removed from the vector.
165+
using stream_id_t = int32_t;
166+
struct State {
167+
std::vector<std::pair<mongodb::stream_id_t, bool>> stream_order;
168+
};
169+
170+
struct StateWrapper {
171+
State global;
172+
std::monostate send;
173+
std::monostate recv;
174+
};
175+
156176
struct ProtocolTraits : public BaseProtocolTraits<Record> {
157177
using frame_type = Frame;
158178
using record_type = Record;
159-
using state_type = NoState;
179+
using state_type = StateWrapper;
180+
using key_type = stream_id_t;
160181
};
161182

162183
} // namespace mongodb

0 commit comments

Comments
 (0)