Skip to content

Commit 4bfdc4f

Browse files
abhinavdangetichiyoung
authored andcommitted
Address possible data races in ActiveStream context
WARNING: ThreadSanitizer: data race (pid=27028) Read of size 8 at 0x7d480000b1f8 by main thread (mutexes: write M32941632, write M1367, write M32940809): #0 void STATWRITER_NAMESPACE::add_casted_stat<unsigned long>(char const*, unsigned long const&, void (*)(char const*, unsigned short, char const*, unsigned int, void const*), void const*) /home/abhinav/couchbase/ep-engine/src/statwriter.h:45 (ep.so+0x000000037825) couchbase#1 ActiveStream::addStats(void (*)(char const*, unsigned short, char const*, unsigned int, void const*), void const*) /home/abhinav/couchbase/ep-engine/src/dcp/stream.cc:477 (ep.so+0x000000071d16) couchbase#2 DcpProducer::addStats(void (*)(char const*, unsigned short, char const*, unsigned int, void const*), void const*) /home/abhinav/couchbase/ep-engine/src/dcp/producer.cc:602 (ep.so+0x000000068057) couchbase#3 ConnStatBuilder::operator()(SingleThreadedRCPtr<ConnHandler>&) /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:3887 (ep.so+0x0000000e13e1) couchbase#4 EventuallyPersistentEngine::doDcpStats(void const*, void (*)(char const*, unsigned short, char const*, unsigned int, void const*)) /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:4144 (ep.so+0x0000000c151a) couchbase#5 EventuallyPersistentEngine::getStats(void const*, char const*, int, void (*)(char const*, unsigned short, char const*, unsigned int, void const*)) /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:4564 (ep.so+0x0000000c5405) #6 EvpGetStats(engine_interface*, void const*, char const*, int, void (*)(char const*, unsigned short, char const*, unsigned int, void const*)) /home/abhinav/couchbase/ep-engine/src/ep_engine.cc:213 (ep.so+0x0000000b422e) #7 mock_get_stats(engine_interface*, void const*, char const*, int, void (*)(char const*, unsigned short, char const*, unsigned int, void const*)) /home/abhinav/couchbase/memcached/programs/engine_testapp/engine_testapp.cc:239 (engine_testapp+0x0000000ba9ad) #8 get_int_stat(engine_interface*, engine_interface_v1*, char const*, char const*) /home/abhinav/couchbase/ep-engine/tests/ep_test_apis.cc:990 (ep_testsuite.so+0x0000000aeb81) #9 dcp_stream(engine_interface*, engine_interface_v1*, char const*, void const*, unsigned short, unsigned int, unsigned long, unsigned long, unsigned long, unsigned long, unsigned long, int, int, int, int, bool, bool, unsigned char, bool, unsigned long*, bool) /home/abhinav/couchbase/ep-engine/tests/ep_testsuite.cc:4090 (ep_testsuite.so+0x00000009790c) #10 test_dcp_producer_stream_req_dgm(engine_interface*, engine_interface_v1*) /home/abhinav/couchbase/ep-engine/tests/ep_testsuite.cc:4564 (ep_testsuite.so+0x000000077604) #11 execute_test(test, char const*, char const*) /home/abhinav/couchbase/memcached/programs/engine_testapp/engine_testapp.cc:1090 (engine_testapp+0x0000000b946c) #12 __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287 (libc.so.6+0x000000021ec4) Previous write of size 8 at 0x7d480000b1f8 by thread T9 (mutexes: write M32940880, write M32940855): #0 ActiveStream::backfillReceived(Item*, backfill_source_t) /home/abhinav/couchbase/ep-engine/src/dcp/stream.cc:287 (ep.so+0x00000007054e) couchbase#1 DiskCallback::callback(GetValue&) /home/abhinav/couchbase/ep-engine/src/dcp/backfill.cc:94 (ep.so+0x000000056067) couchbase#2 CouchKVStore::recordDbDump(_db*, _docinfo*, void*) /home/abhinav/couchbase/ep-engine/src/couch-kvstore/couch-kvstore.cc:1757 (ep.so+0x00000018103f) couchbase#3 recordDbDumpC(_db*, _docinfo*, void*) /home/abhinav/couchbase/ep-engine/src/couch-kvstore/couch-kvstore.cc:66 (ep.so+0x00000017fcc5) couchbase#4 lookup_callback(couchfile_lookup_request*, _sized_buf const*, _sized_buf const*) /home/abhinav/couchbase/couchstore/src/couch_db.cc:767 (libcouchstore.so+0x00000000d7f5) couchbase#5 btree_lookup_inner(couchfile_lookup_request*, unsigned long, int, int) /home/abhinav/couchbase/couchstore/src/btree_read.cc:99 (libcouchstore.so+0x00000000b5b2) #6 btree_lookup_inner(couchfile_lookup_request*, unsigned long, int, int) /home/abhinav/couchbase/couchstore/src/btree_read.cc:69 (libcouchstore.so+0x00000000b370) #7 btree_lookup_inner(couchfile_lookup_request*, unsigned long, int, int) /home/abhinav/couchbase/couchstore/src/btree_read.cc:69 (libcouchstore.so+0x00000000b370) #8 btree_lookup /home/abhinav/couchbase/couchstore/src/btree_read.cc:131 (libcouchstore.so+0x00000000b00c) #9 couchstore_changes_since /home/abhinav/couchbase/couchstore/src/couch_db.cc:812 (libcouchstore.so+0x00000000d601) #10 CouchKVStore::scan(ScanContext*) /home/abhinav/couchbase/ep-engine/src/couch-kvstore/couch-kvstore.cc:1264 (ep.so+0x00000017f77e) #11 DCPBackfill::scan() /home/abhinav/couchbase/ep-engine/src/dcp/backfill.cc:193 (ep.so+0x000000057672) #12 DCPBackfill::run() /home/abhinav/couchbase/ep-engine/src/dcp/backfill.cc:118 (ep.so+0x000000056647) #13 BackfillManager::backfill() /home/abhinav/couchbase/ep-engine/src/dcp/backfill-manager.cc:240 (ep.so+0x0000000508d5) #14 BackfillManagerTask::run() /home/abhinav/couchbase/ep-engine/src/dcp/backfill-manager.cc:43 (ep.so+0x00000005052f) #15 ExecutorThread::run() /home/abhinav/couchbase/ep-engine/src/executorthread.cc:112 (ep.so+0x0000000f8796) #16 launch_executor_thread(void*) /home/abhinav/couchbase/ep-engine/src/executorthread.cc:33 (ep.so+0x0000000f8335) #17 platform_thread_wrap /home/abhinav/couchbase/platform/src/cb_pthreads.c:23 (libplatform.so.0.1.0+0x000000003d31) Change-Id: I166917524b5fcad285b3623ff160e875c316d983 Reviewed-on: http://review.couchbase.org/55781 Tested-by: buildbot <build@couchbase.com> Reviewed-by: Chiyoung Seo <chiyoung@couchbase.com>
1 parent 6db8d9b commit 4bfdc4f

File tree

2 files changed

+24
-22
lines changed

2 files changed

+24
-22
lines changed

src/dcp/stream.cc

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ bool ActiveStream::backfillReceived(Item* itm, backfill_source_t backfill_source
284284
pushToReadyQ(new MutationResponse(itm, opaque_,
285285
prepareExtendedMetaData(itm->getVBucketId(),
286286
itm->getConflictResMode())));
287-
lastReadSeqno = itm->getBySeqno();
287+
lastReadSeqno.store(itm->getBySeqno());
288288

289289
if (!itemsReady) {
290290
itemsReady = true;
@@ -313,7 +313,7 @@ void ActiveStream::completeBackfill() {
313313
"%" PRIu64 " items read from disk, %" PRIu64 " from memory,"
314314
" last seqno read: %" PRIu64,
315315
producer->logHeader(), vb_, uint64_t(backfillItems.disk.load()),
316-
uint64_t(backfillItems.memory.load()), lastReadSeqno);
316+
uint64_t(backfillItems.memory.load()), lastReadSeqno.load());
317317

318318
if (!itemsReady) {
319319
itemsReady = true;
@@ -380,7 +380,7 @@ DcpResponse* ActiveStream::backfillPhase() {
380380

381381
if (!isBackfillTaskRunning && readyQ.empty()) {
382382
backfillRemaining.store(0, std::memory_order_relaxed);
383-
if (lastReadSeqno >= end_seqno_) {
383+
if (lastReadSeqno.load() >= end_seqno_) {
384384
endStream(END_STREAM_OK);
385385
} else if (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER) {
386386
transitionState(STREAM_TAKEOVER_SEND);
@@ -403,7 +403,7 @@ DcpResponse* ActiveStream::inMemoryPhase() {
403403
return nextQueuedItem();
404404
}
405405

406-
if (lastSentSeqno >= end_seqno_) {
406+
if (lastSentSeqno.load() >= end_seqno_) {
407407
endStream(END_STREAM_OK);
408408
} else {
409409
nextCheckpointItem();
@@ -470,11 +470,11 @@ void ActiveStream::addStats(ADD_STAT add_stat, const void *c) {
470470
snprintf(buffer, bsize, "%s:stream_%d_backfill_sent", name_.c_str(), vb_);
471471
add_casted_stat(buffer, backfillItems.sent, add_stat, c);
472472
snprintf(buffer, bsize, "%s:stream_%d_memory_phase", name_.c_str(), vb_);
473-
add_casted_stat(buffer, itemsFromMemoryPhase, add_stat, c);
473+
add_casted_stat(buffer, itemsFromMemoryPhase.load(), add_stat, c);
474474
snprintf(buffer, bsize, "%s:stream_%d_last_sent_seqno", name_.c_str(), vb_);
475-
add_casted_stat(buffer, lastSentSeqno, add_stat, c);
475+
add_casted_stat(buffer, lastSentSeqno.load(), add_stat, c);
476476
snprintf(buffer, bsize, "%s:stream_%d_last_read_seqno", name_.c_str(), vb_);
477-
add_casted_stat(buffer, lastReadSeqno, add_stat, c);
477+
add_casted_stat(buffer, lastReadSeqno.load(), add_stat, c);
478478
snprintf(buffer, bsize, "%s:stream_%d_ready_queue_memory", name_.c_str(), vb_);
479479
add_casted_stat(buffer, getReadyQueueMemory(), add_stat, c);
480480
snprintf(buffer, bsize, "%s:stream_%d_items_ready", name_.c_str(), vb_);
@@ -538,7 +538,8 @@ DcpResponse* ActiveStream::nextQueuedItem() {
538538
if (response->getEvent() == DCP_MUTATION ||
539539
response->getEvent() == DCP_DELETION ||
540540
response->getEvent() == DCP_EXPIRATION) {
541-
lastSentSeqno = dynamic_cast<MutationResponse*>(response)->getBySeqno();
541+
lastSentSeqno.store(
542+
dynamic_cast<MutationResponse*>(response)->getBySeqno());
542543

543544
if (state_ == STREAM_BACKFILLING) {
544545
backfillItems.sent++;
@@ -585,7 +586,7 @@ void ActiveStream::nextCheckpointItem() {
585586
if (qi->getOperation() == queue_op_set ||
586587
qi->getOperation() == queue_op_del) {
587588
curChkSeqno = qi->getBySeqno();
588-
lastReadSeqno = qi->getBySeqno();
589+
lastReadSeqno.store(qi->getBySeqno());
589590

590591
mutations.push_back(new MutationResponse(qi, opaque_,
591592
prepareExtendedMetaData(qi->getVBucketId(),
@@ -682,7 +683,7 @@ void ActiveStream::endStream(end_stream_status_t reason) {
682683
"%" PRIu64 " was last seqno sent, reason: %s",
683684
producer->logHeader(), vb_,
684685
uint64_t(backfillItems.sent.load()),
685-
uint64_t(itemsFromMemoryPhase), lastSentSeqno,
686+
uint64_t(itemsFromMemoryPhase.load()), lastSentSeqno.load(),
686687
getEndStreamStatusStr(reason));
687688
}
688689
}
@@ -696,17 +697,17 @@ void ActiveStream::scheduleBackfill() {
696697

697698
CursorRegResult result =
698699
vbucket->checkpointManager.registerCursorBySeqno(name_,
699-
lastReadSeqno);
700+
lastReadSeqno.load());
700701
curChkSeqno = result.first;
701702
bool isFirstItem = result.second;
702703

703-
if (lastReadSeqno > curChkSeqno) {
704+
if (lastReadSeqno.load() > curChkSeqno) {
704705
throw std::logic_error("ActiveStream::scheduleBackfill: "
705-
"lastReadSeqno (which is " + std::to_string(lastReadSeqno) +
706+
"lastReadSeqno (which is " + std::to_string(lastReadSeqno.load()) +
706707
") is greater than curChkSeqno (which is " +
707708
std::to_string(curChkSeqno) + ")");
708709
}
709-
uint64_t backfillStart = lastReadSeqno + 1;
710+
uint64_t backfillStart = lastReadSeqno.load() + 1;
710711

711712
/* We need to find the minimum seqno that needs to be backfilled in
712713
* order to make sure that we don't miss anything when transitioning
@@ -841,20 +842,20 @@ size_t ActiveStream::getItemsRemaining() {
841842
uint64_t high_seqno = vbucket->getHighSeqno();
842843

843844
if (end_seqno_ < high_seqno) {
844-
if (end_seqno_ > lastSentSeqno) {
845-
return (end_seqno_ - lastSentSeqno);
845+
if (end_seqno_ > lastSentSeqno.load()) {
846+
return (end_seqno_ - lastSentSeqno.load());
846847
}
847848
} else {
848-
if (high_seqno > lastSentSeqno) {
849-
return (high_seqno - lastSentSeqno);
849+
if (high_seqno > lastSentSeqno.load()) {
850+
return (high_seqno - lastSentSeqno.load());
850851
}
851852
}
852853

853854
return 0;
854855
}
855856

856857
uint64_t ActiveStream::getLastSentSeqno() {
857-
return lastSentSeqno;
858+
return lastSentSeqno.load();
858859
}
859860

860861
ExtendedMetaData* ActiveStream::prepareExtendedMetaData(uint16_t vBucketId,

src/dcp/stream.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,10 +250,10 @@ class ActiveStream : public Stream {
250250
uint8_t conflictResMode);
251251

252252
//! The last sequence number queued from disk or memory
253-
uint64_t lastReadSeqno;
253+
AtomicValue<uint64_t> lastReadSeqno;
254254

255255
//! The last sequence number sent to the network layer
256-
uint64_t lastSentSeqno;
256+
AtomicValue<uint64_t> lastSentSeqno;
257257

258258
//! The last known seqno pointed to by the checkpoint cursor
259259
uint64_t curChkSeqno;
@@ -274,8 +274,9 @@ class ActiveStream : public Stream {
274274
AtomicValue<size_t> disk;
275275
AtomicValue<size_t> sent;
276276
} backfillItems;
277+
277278
//! The amount of items that have been sent during the memory phase
278-
size_t itemsFromMemoryPhase;
279+
AtomicValue<size_t> itemsFromMemoryPhase;
279280

280281
//! Whether ot not this is the first snapshot marker sent
281282
bool firstMarkerSent;

0 commit comments

Comments
 (0)