From da854a0793e210a4236dedf9d8895dee1f098125 Mon Sep 17 00:00:00 2001 From: Mathias Stearn Date: Tue, 18 Oct 2016 16:23:24 -0400 Subject: SERVER-26685 Tie oplog visibility to durability (cherry picked from commit 8a2f2fc1883f5deb1b23915cd7a47686a623ba87) --- src/mongo/db/catalog/collection.cpp | 5 +- src/mongo/db/catalog/database.cpp | 7 +- src/mongo/db/concurrency/locker.h | 14 +- src/mongo/db/concurrency/locker_noop.h | 4 + src/mongo/db/exec/collection_scan.cpp | 14 ++ src/mongo/db/storage/devnull/devnull_kv_engine.cpp | 2 + .../ephemeral_for_test_record_store.h | 2 + .../db/storage/mmap_v1/heap_record_store_btree.h | 4 + .../db/storage/mmap_v1/record_store_v1_base.h | 2 + src/mongo/db/storage/record_store.h | 16 +- .../record_store_test_capped_visibility.cpp | 24 +-- .../storage/wiredtiger/wiredtiger_record_store.cpp | 162 ++++++++++++++------- .../storage/wiredtiger/wiredtiger_record_store.h | 24 ++- .../wiredtiger/wiredtiger_record_store_test.cpp | 85 ++++++++++- .../wiredtiger/wiredtiger_recovery_unit.cpp | 1 + .../wiredtiger/wiredtiger_session_cache.cpp | 2 +- src/mongo/dbtests/repltests.cpp | 30 +--- 17 files changed, 299 insertions(+), 99 deletions(-) diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 12c2506b524..0043b20a4c5 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -196,10 +196,11 @@ Collection::Collection(OperationContext* txn, Collection::~Collection() { verify(ok()); - _magic = 0; - if (_cappedNotifier) { + if (isCapped()) { + _recordStore->setCappedCallback(nullptr); _cappedNotifier->kill(); } + _magic = 0; } bool Collection::requiresIdIndex() const { diff --git a/src/mongo/db/catalog/database.cpp b/src/mongo/db/catalog/database.cpp index de014156efe..a54a5e4798c 100644 --- a/src/mongo/db/catalog/database.cpp +++ b/src/mongo/db/catalog/database.cpp @@ -386,11 +386,12 @@ Status Database::dropCollection(OperationContext* txn, StringData fullns) { Top::get(txn->getClient()->getServiceContext()).collectionDropped(fullns); - s = _dbEntry->dropCollection(txn, fullns); - - // we want to do this always + // We want to destroy the Collection object before telling the StorageEngine to destroy the + // RecordStore. _clearCollectionCache(txn, fullns, "collection dropped"); + s = _dbEntry->dropCollection(txn, fullns); + if (!s.isOK()) return s; diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h index cdc490e1379..c664b3841d7 100644 --- a/src/mongo/db/concurrency/locker.h +++ b/src/mongo/db/concurrency/locker.h @@ -49,7 +49,19 @@ public: virtual ~Locker() {} /** - * Require global lock attempts with obtain tickets from 'reading' (for MODE_S and MODE_IS), + * Returns true if this is an instance of LockerNoop. Because LockerNoop doesn't implement many + * methods, some users may need to check this first to find out what is safe to call. LockerNoop + * is only used in unittests and for a brief period at startup, so you can assume you hold the + * equivalent of a MODE_X lock when using it. + * + * TODO get rid of this once we kill LockerNoop. + */ + virtual bool isNoop() const { + return false; + } + + /** + * Require global lock attempts to obtain tickets from 'reading' (for MODE_S and MODE_IS), * and from 'writing' (for MODE_IX), which must have static lifetimes. There is no throttling * for MODE_X, as there can only ever be a single locker using this mode. The throttling is * intended to defend against arge drops in throughput under high load due to too much diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h index cfbd3234c2c..9e8d3b6300a 100644 --- a/src/mongo/db/concurrency/locker_noop.h +++ b/src/mongo/db/concurrency/locker_noop.h @@ -41,6 +41,10 @@ class LockerNoop : public Locker { public: LockerNoop() {} + virtual bool isNoop() const { + return true; + } + virtual ClientState getClientState() const { invariant(false); } diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp index 74d64c39cc8..db07439fea9 100644 --- a/src/mongo/db/exec/collection_scan.cpp +++ b/src/mongo/db/exec/collection_scan.cpp @@ -97,6 +97,20 @@ PlanStage::StageState CollectionScan::work(WorkingSetID* out) { try { if (needToMakeCursor) { const bool forward = _params.direction == CollectionScanParams::FORWARD; + + if (forward && !_params.tailable && _params.collection->ns().isOplog()) { + // Forward, non-tailable scans from the oplog need to wait until all oplog entries + // before the read begins to be visible. This isn't needed for reverse scans because + // we only hide oplog entries from forward scans, and it isn't necessary for tailing + // cursors because they ignore EOF and will eventually see all writes. Forward, + // non-tailable scans are the only case where a meaningful EOF will be seen that + // might not include writes that finished before the read started. This also must be + // done before we create the cursor as that is when we establish the endpoint for + // the cursor. + _params.collection->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible( + getOpCtx()); + } + _cursor = _params.collection->getCursor(getOpCtx(), forward); if (!_lastSeenId.isNull()) { diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp index 9c82309d5c9..d601702623f 100644 --- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp +++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp @@ -163,6 +163,8 @@ public: return Status::OK(); } + void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const override {} + virtual void updateStatsAfterRepair(OperationContext* txn, long long numRecords, long long dataSize) {} diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h index 324a30653eb..3d1ef8f111b 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h @@ -121,6 +121,8 @@ public: virtual boost::optional oplogStartHack(OperationContext* txn, const RecordId& startingPosition) const; + void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const override {} + virtual void updateStatsAfterRepair(OperationContext* txn, long long numRecords, long long dataSize) { diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h index a496f1ff31e..2c40bcb61ab 100644 --- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h +++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h @@ -154,6 +154,10 @@ public: invariant(false); } + void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const override { + invariant(false); + } + virtual void updateStatsAfterRepair(OperationContext* txn, long long numRecords, long long dataSize) { diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h index d34c5a9b3f0..6b7b6c1630e 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h @@ -256,6 +256,8 @@ public: /* return which "deleted bucket" for this size object */ static int bucket(int size); + void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const override {} + virtual void updateStatsAfterRepair(OperationContext* txn, long long numRecords, long long dataSize) { diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index b6e973a16ab..499bcbacdd2 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -127,12 +127,11 @@ struct BsonRecord { * IMPORTANT NOTE FOR DOCUMENT-LOCKING ENGINES: If you implement capped collections with a * "visibility" system such that documents that exist in your snapshot but were inserted after * the last uncommitted document are hidden, you must follow the following rules: - * - next() must never return invisible documents. + * - next() on forward cursors must never return invisible documents. * - If next() on a forward cursor hits an invisible document, it should behave as if it hit * the end of the collection. - * - When next() on a reverse cursor seeks to the end of the collection it must return the - * newest visible document. This should only return boost::none if there are no visible - * documents in the collection. + * - Reverse cursors must ignore the visibility filter. That means that they initially return the + * newest committed record in the collection and may skip over uncommitted records. * - SeekableRecordCursor::seekExact() must ignore the visibility filter and return the requested * document even if it is supposed to be invisible. * TODO SERVER-18934 Handle this above the storage engine layer so storage engines don't have to @@ -579,6 +578,15 @@ public: return Status::OK(); } + /** + * Waits for all writes that completed before this call to be visible to forward scans. + * See the comment on RecordCursor for more details about the visibility rules. + * + * It is only legal to call this on an oplog. It is illegal to call this inside a + * WriteUnitOfWork. + */ + virtual void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const = 0; + /** * Called after a repair operation is run with the recomputed numRecords and dataSize. */ diff --git a/src/mongo/db/storage/record_store_test_capped_visibility.cpp b/src/mongo/db/storage/record_store_test_capped_visibility.cpp index 1f7d7afa87c..3d1765564b5 100644 --- a/src/mongo/db/storage/record_store_test_capped_visibility.cpp +++ b/src/mongo/db/storage/record_store_test_capped_visibility.cpp @@ -69,9 +69,9 @@ TEST(RecordStore_CappedVisibility, EmptyInitialState) { RecordId lowestHiddenId = doInsert(longLivedOp, rs); - // Collection still looks empty to iteration but not seekExact. + // Collection still looks empty to forward iteration but not reverse or seekExact. ASSERT(!rs->getCursor(longLivedOp.get(), true)->next()); - ASSERT(!rs->getCursor(longLivedOp.get(), false)->next()); + ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), lowestHiddenId); ASSERT_ID_EQ(rs->getCursor(longLivedOp.get())->seekExact(lowestHiddenId), lowestHiddenId); RecordId otherId; @@ -88,21 +88,22 @@ TEST(RecordStore_CappedVisibility, EmptyInitialState) { otherId = doInsert(txn, rs); ASSERT(!rs->getCursor(txn.get(), true)->next()); - ASSERT(!rs->getCursor(txn.get(), false)->next()); + ASSERT_ID_EQ(rs->getCursor(txn.get(), false)->next(), otherId); ASSERT_ID_EQ(rs->getCursor(txn.get())->seekExact(otherId), otherId); wuow.commit(); ASSERT(!rs->getCursor(txn.get(), true)->next()); - ASSERT(!rs->getCursor(txn.get(), false)->next()); + ASSERT_ID_EQ(rs->getCursor(txn.get(), false)->next(), otherId); ASSERT_ID_EQ(rs->getCursor(txn.get())->seekExact(otherId), otherId); ASSERT(!rs->getCursor(txn.get())->seekExact(lowestHiddenId)); } + // longLivedOp is still on old snapshot so it can't see otherId yet. ASSERT(!rs->getCursor(longLivedOp.get(), true)->next()); - ASSERT(!rs->getCursor(longLivedOp.get(), false)->next()); + ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), lowestHiddenId); ASSERT_ID_EQ(rs->getCursor(longLivedOp.get())->seekExact(lowestHiddenId), lowestHiddenId); - ASSERT(!rs->getCursor(longLivedOp.get())->seekExact(otherId)); // still on old snapshot. + ASSERT(!rs->getCursor(longLivedOp.get())->seekExact(otherId)); // This makes all documents visible and lets longLivedOp get a new snapshot. longLivedWuow.commit(); @@ -140,7 +141,7 @@ TEST(RecordStore_CappedVisibility, NonEmptyInitialState) { // Collection still looks like it only has a single doc to iteration but not seekExact. ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), true)->next(), initialId); - ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), initialId); + ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), lowestHiddenId); ASSERT_ID_EQ(rs->getCursor(longLivedOp.get())->seekExact(initialId), initialId); ASSERT_ID_EQ(rs->getCursor(longLivedOp.get())->seekExact(lowestHiddenId), lowestHiddenId); @@ -159,21 +160,22 @@ TEST(RecordStore_CappedVisibility, NonEmptyInitialState) { otherId = doInsert(txn, rs); ASSERT_ID_EQ(rs->getCursor(txn.get(), true)->next(), initialId); - ASSERT_ID_EQ(rs->getCursor(txn.get(), false)->next(), initialId); + ASSERT_ID_EQ(rs->getCursor(txn.get(), false)->next(), otherId); ASSERT_ID_EQ(rs->getCursor(txn.get())->seekExact(otherId), otherId); wuow.commit(); ASSERT_ID_EQ(rs->getCursor(txn.get(), true)->next(), initialId); - ASSERT_ID_EQ(rs->getCursor(txn.get(), false)->next(), initialId); + ASSERT_ID_EQ(rs->getCursor(txn.get(), false)->next(), otherId); ASSERT_ID_EQ(rs->getCursor(txn.get())->seekExact(otherId), otherId); ASSERT(!rs->getCursor(txn.get())->seekExact(lowestHiddenId)); } + // longLivedOp is still on old snapshot so it can't see otherId yet. ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), true)->next(), initialId); - ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), initialId); + ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), lowestHiddenId); ASSERT_ID_EQ(rs->getCursor(longLivedOp.get())->seekExact(lowestHiddenId), lowestHiddenId); - ASSERT(!rs->getCursor(longLivedOp.get())->seekExact(otherId)); // still on old snapshot. + ASSERT(!rs->getCursor(longLivedOp.get())->seekExact(otherId)); // This makes all documents visible and lets longLivedOp get a new snapshot. longLivedWuow.commit(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 7edd75781c5..bd6bdceb5e9 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -39,6 +39,7 @@ #include "mongo/base/checked_cast.h" #include "mongo/bson/util/builder.h" +#include "mongo/db/client.h" #include "mongo/db/concurrency/locker.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/namespace_string.h" @@ -91,7 +92,7 @@ bool shouldUseOplogHack(OperationContext* opCtx, const std::string& uri) { } // namespace MONGO_FP_DECLARE(WTWriteConflictException); -MONGO_FP_DECLARE(WTEmulateOutOfOrderNextRecordId); +MONGO_FP_DECLARE(WTPausePrimaryOplogDurabilityLoop); const std::string kWiredTigerEngineName = "wiredTiger"; @@ -445,31 +446,7 @@ public: WT_CURSOR* c = _cursor->get(); - bool mustAdvance = !_skipNextAdvance; - if (_lastReturnedId.isNull() && !_forward && _rs._isCapped) { - // In this case we need to seek to the highest visible record. - const RecordId reverseCappedInitialSeekPoint = - _readUntilForOplog.isNull() ? _rs.lowestCappedHiddenRecord() : _readUntilForOplog; - - if (!reverseCappedInitialSeekPoint.isNull()) { - c->set_key(c, _makeKey(reverseCappedInitialSeekPoint)); - int cmp; - int seekRet = WT_OP_CHECK(c->search_near(c, &cmp)); - if (seekRet == WT_NOTFOUND) { - _eof = true; - return {}; - } - invariantWTOK(seekRet); - - // If we landed at or past the lowest hidden record, we must advance to be in - // the visible range. - mustAdvance = _rs.isCappedHidden(reverseCappedInitialSeekPoint) - ? (cmp >= 0) - : (cmp > 0); // No longer hidden. - } - } - - if (mustAdvance) { + if (!_skipNextAdvance) { // Nothing after the next line can throw WCEs. // Note that an unpositioned (or eof) WT_CURSOR returns the first/last entry in the // table when you call next/prev. @@ -486,13 +463,6 @@ public: invariantWTOK(c->get_key(c, &key)); RecordId id = _fromKey(key); - if (_forward && MONGO_FAIL_POINT(WTEmulateOutOfOrderNextRecordId)) { - log() << "WTEmulateOutOfOrderNextRecordId fail point has triggerd so RecordId is now " - "RecordId(1) instead of " << id; - // Replace the found RecordId with a (small) fake one. - id = RecordId{1}; - } - if (_forward && _lastReturnedId >= id) { log() << "WTCursor::next -- c->next_key ( " << id << ") was not greater than _lastReturnedId (" << _lastReturnedId @@ -612,6 +582,9 @@ private: if (!_rs._isCapped) return true; + if (!_forward) + return true; + if (_readUntilForOplog.isNull() || !_rs._isOplog) { // this is the normal capped case return !_rs.isCappedHidden(id); @@ -866,11 +839,18 @@ WiredTigerRecordStore::WiredTigerRecordStore(OperationContext* ctx, if (WiredTigerKVEngine::initRsOplogBackgroundThread(ns)) { _oplogStones = std::make_shared(ctx, this); } + + if (_isOplog) { + _oplogJournalThread = stdx::thread(&WiredTigerRecordStore::_oplogJournalThreadLoop, + this, + WiredTigerRecoveryUnit::get(ctx)->getSessionCache()); + } } WiredTigerRecordStore::~WiredTigerRecordStore() { { stdx::lock_guard lk(_cappedDeleterMutex); // NOLINT + stdx::lock_guard lk2(_uncommittedRecordIdsMutex); _shuttingDown = true; } @@ -882,6 +862,11 @@ WiredTigerRecordStore::~WiredTigerRecordStore() { if (_oplogStones) { _oplogStones->kill(); } + + if (_oplogJournalThread.joinable()) { + _opsWaitingForJournalCV.notify_one(); + _oplogJournalThread.join(); + } } const char* WiredTigerRecordStore::name() const { @@ -1121,6 +1106,7 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn ++docsRemoved; sizeSaved += old_value.size; + stdx::lock_guard cappedCallbackLock(_cappedCallbackMutex); if (_cappedCallback) { uassertStatusOK(_cappedCallback->aboutToDeleteCapped( txn, @@ -1337,10 +1323,25 @@ StatusWith WiredTigerRecordStore::insertRecord(OperationContext* txn, return StatusWith(records[0].id); } -void WiredTigerRecordStore::_dealtWithCappedId(SortedRecordIds::iterator it) { +void WiredTigerRecordStore::_dealtWithCappedId(SortedRecordIds::iterator it, bool didCommit) { invariant(&(*it) != NULL); stdx::lock_guard lk(_uncommittedRecordIdsMutex); - _uncommittedRecordIds.erase(it); + if (didCommit && _isOplog && *it != _oplog_highestSeen) { + // Defer removal from _uncommittedRecordIds until it is durable. We don't need to wait for + // durability of ops that didn't commit because they won't become durable. + // As an optimization, we only defer visibility until durable if new ops were created while + // we were pending. This makes single-threaded w>1 workloads faster and is safe because + // durability follows commit order for commits that are fully sequenced (B doesn't call + // commit until after A's commit call returns). + const bool wasEmpty = _opsWaitingForJournal.empty(); + _opsWaitingForJournal.push_back(it); + if (wasEmpty) { + _opsWaitingForJournalCV.notify_one(); + } + } else { + _uncommittedRecordIds.erase(it); + _opsBecameVisibleCV.notify_all(); + } } bool WiredTigerRecordStore::isCappedHidden(const RecordId& id) const { @@ -1431,10 +1432,13 @@ std::unique_ptr WiredTigerRecordStore::getCursor(Operation bool forward) const { if (_isOplog && forward) { WiredTigerRecoveryUnit* wru = WiredTigerRecoveryUnit::get(txn); - if (!wru->inActiveTxn() || wru->getOplogReadTill().isNull()) { - // if we don't have a session, we have no snapshot, so we can update our view - _oplogSetStartHack(wru); + // If we already have a snapshot we don't know what it can see, unless we know no one + // else could be writing (because we hold an exclusive lock). + if (wru->inActiveTxn() && !txn->lockState()->isNoop() && + !txn->lockState()->isCollectionLockedForMode(_ns, MODE_X)) { + throw WriteConflictException(); } + _oplogSetStartHack(wru); } return stdx::make_unique(txn, *this, forward); @@ -1627,22 +1631,78 @@ public: : _rs(rs), _it(it) {} virtual void commit() { + _rs->_dealtWithCappedId(_it, true); // Do not notify here because all committed inserts notify, always. - _rs->_dealtWithCappedId(_it); } virtual void rollback() { // Notify on rollback since it might make later commits visible. - _rs->_dealtWithCappedId(_it); + _rs->_dealtWithCappedId(_it, false); + stdx::lock_guard lk(_rs->_cappedCallbackMutex); if (_rs->_cappedCallback) _rs->_cappedCallback->notifyCappedWaitersIfNeeded(); } private: - WiredTigerRecordStore* _rs; - SortedRecordIds::iterator _it; + WiredTigerRecordStore* const _rs; + const SortedRecordIds::iterator _it; }; +void WiredTigerRecordStore::_oplogJournalThreadLoop(WiredTigerSessionCache* sessionCache) try { + Client::initThread("WTOplogJournalThread"); + while (true) { + stdx::unique_lock lk(_uncommittedRecordIdsMutex); + _opsWaitingForJournalCV.wait( + lk, [&] { return _shuttingDown || !_opsWaitingForJournal.empty(); }); + + while (!_shuttingDown && MONGO_FAIL_POINT(WTPausePrimaryOplogDurabilityLoop)) { + lk.unlock(); + sleepmillis(10); + lk.lock(); + } + + if (_shuttingDown) + return; + + decltype(_opsWaitingForJournal) opsAboutToBeJournaled = {}; + _opsWaitingForJournal.swap(opsAboutToBeJournaled); + + lk.unlock(); + sessionCache->waitUntilDurable(/*forceCheckpoint=*/false); + lk.lock(); + + for (auto&& op : opsAboutToBeJournaled) { + _uncommittedRecordIds.erase(op); + } + + _opsBecameVisibleCV.notify_all(); + lk.unlock(); + + stdx::lock_guard cappedCallbackLock(_cappedCallbackMutex); + if (_cappedCallback) { + _cappedCallback->notifyCappedWaitersIfNeeded(); + } + } +} catch (...) { + std::terminate(); +} + +void WiredTigerRecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const { + invariant(txn->lockState()->isNoop() || !txn->lockState()->inAWriteUnitOfWork()); + + // This function must not start a WT transaction, otherwise we will get stuck in an infinite + // loop of WCE handling when the getCursor() is called. + + stdx::unique_lock lk(_uncommittedRecordIdsMutex); + const auto waitingFor = _oplog_highestSeen; + while (!_uncommittedRecordIds.empty() && _uncommittedRecordIds.front() <= waitingFor) { + // We can't use a simple wait() here because we need to wake up periodically to check for + // interrupt and OperationContext::waitForConditionOrInterrupt doesn't exist on this branch. + txn->checkForInterrupt(); + _opsBecameVisibleCV.wait_for(lk, Microseconds(Seconds(10))); + } +} + void WiredTigerRecordStore::_addUncommitedRecordId_inlock(OperationContext* txn, const RecordId& id) { // todo: make this a dassert at some point @@ -1784,13 +1844,17 @@ void WiredTigerRecordStore::temp_cappedTruncateAfter(OperationContext* txn, } // Compute the number and associated sizes of the records to delete. - do { - if (_cappedCallback) { - uassertStatusOK(_cappedCallback->aboutToDeleteCapped(txn, record->id, record->data)); - } - recordsRemoved++; - bytesRemoved += record->data.size(); - } while ((record = cursor.next())); + { + stdx::lock_guard cappedCallbackLock(_cappedCallbackMutex); + do { + if (_cappedCallback) { + uassertStatusOK( + _cappedCallback->aboutToDeleteCapped(txn, record->id, record->data)); + } + recordsRemoved++; + bytesRemoved += record->data.size(); + } while ((record = cursor.next())); + } // Truncate the collection starting from the record located at 'firstRemovedId' to the end of // the collection. diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index 3335e774c4c..1fab25f8be4 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -39,8 +39,10 @@ #include "mongo/db/storage/capped_callback.h" #include "mongo/db/storage/record_store.h" #include "mongo/platform/atomic_word.h" +#include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/synchronization.h" +#include "mongo/stdx/thread.h" #include "mongo/util/fail_point_service.h" /** @@ -53,6 +55,7 @@ namespace mongo { class RecoveryUnit; class WiredTigerCursor; +class WiredTigerSessionCache; class WiredTigerRecoveryUnit; class WiredTigerSizeStorer; @@ -193,6 +196,9 @@ public: long long numRecords, long long dataSize); + + void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const override; + bool isOplog() const { return _isOplog; } @@ -201,8 +207,10 @@ public: } void setCappedCallback(CappedCallback* cb) { + stdx::lock_guard lk(_cappedCallbackMutex); _cappedCallback = cb; } + int64_t cappedMaxDocs() const; int64_t cappedMaxSize() const; @@ -255,7 +263,7 @@ private: static int64_t _makeKey(const RecordId& id); static RecordId _fromKey(int64_t k); - void _dealtWithCappedId(SortedRecordIds::iterator it); + void _dealtWithCappedId(SortedRecordIds::iterator it, bool didCommit); void _addUncommitedRecordId_inlock(OperationContext* txn, const RecordId& id); RecordId _nextId(); @@ -265,6 +273,7 @@ private: void _increaseDataSize(OperationContext* txn, int64_t amount); RecordData _getData(const WiredTigerCursor& cursor) const; void _oplogSetStartHack(WiredTigerRecoveryUnit* wru) const; + void _oplogJournalThreadLoop(WiredTigerSessionCache* sessionCache); const std::string _uri; const uint64_t _tableId; // not persisted @@ -284,6 +293,7 @@ private: AtomicInt64 _cappedSleep; AtomicInt64 _cappedSleepMS; CappedCallback* _cappedCallback; + stdx::mutex _cappedCallbackMutex; // guards _cappedCallback. // See comment in ::cappedDeleteAsNeeded int _cappedDeleteCheckCount; @@ -292,7 +302,6 @@ private: const bool _useOplogHack; SortedRecordIds _uncommittedRecordIds; - RecordId _oplog_visibleTo; RecordId _oplog_highestSeen; mutable stdx::mutex _uncommittedRecordIdsMutex; @@ -307,8 +316,19 @@ private: // Non-null if this record store is underlying the active oplog. std::shared_ptr _oplogStones; + + // These use the _uncommittedRecordIdsMutex and are only used when _isOplog is true. + stdx::condition_variable _opsWaitingForJournalCV; + mutable stdx::condition_variable _opsBecameVisibleCV; + std::vector _opsWaitingForJournal; + stdx::thread _oplogJournalThread; }; // WT failpoint to throw write conflict exceptions randomly MONGO_FP_FORWARD_DECLARE(WTWriteConflictException); + +// Prevents oplog writes from being considered durable on the primary. Once activated, new writes +// will not be considered durable until deactivated. It is unspecified whether writes that commit +// before activation will become visible while active. +MONGO_FP_FORWARD_DECLARE(WTPausePrimaryOplogDurabilityLoop); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp index 7dcc4033f70..b65c933e744 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp @@ -48,6 +48,8 @@ #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" #include "mongo/unittest/temp_dir.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/fail_point.h" +#include "mongo/util/scopeguard.h" namespace mongo { @@ -754,7 +756,7 @@ TEST(WiredTigerRecordStoreTest, CappedCursorRollover) { ASSERT(!cursor->next()); } -RecordId _oplogOrderInsertOplog(OperationContext* txn, unique_ptr& rs, int inc) { +RecordId _oplogOrderInsertOplog(OperationContext* txn, const unique_ptr& rs, int inc) { Timestamp opTime = Timestamp(5, inc); WiredTigerRecordStore* wrs = checked_cast(rs.get()); Status status = wrs->oplogDiskLocRegister(txn, opTime); @@ -831,6 +833,8 @@ TEST(WiredTigerRecordStoreTest, OplogOrder) { w1.commit(); } + rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get()); + { // now all 3 docs should be visible unique_ptr opCtx(harnessHelper->newOperationContext()); auto cursor = rs->getCursor(opCtx.get()); @@ -885,6 +889,8 @@ TEST(WiredTigerRecordStoreTest, OplogOrder) { w1.commit(); } + rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get()); + { // now all 3 docs should be visible unique_ptr opCtx(harnessHelper->newOperationContext()); auto cursor = rs->getCursor(opCtx.get()); @@ -896,6 +902,83 @@ TEST(WiredTigerRecordStoreTest, OplogOrder) { } } +// Test that even when the oplog durability loop is paused, we can still advance the commit point as +// long as the commit for each insert comes before the next insert starts. +TEST(WiredTigerRecordStoreTest, OplogDurableVisibilityInOrder) { + ON_BLOCK_EXIT([] { WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::off); }); + WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::alwaysOn); + + unique_ptr harnessHelper(new WiredTigerHarnessHelper()); + unique_ptr rs(harnessHelper->newCappedRecordStore("local.oplog.foo", 100000, -1)); + auto wtrs = checked_cast(rs.get()); + + { + auto opCtx(harnessHelper->newOperationContext()); + WriteUnitOfWork uow(opCtx.get()); + RecordId id = _oplogOrderInsertOplog(opCtx.get(), rs, 1); + ASSERT(wtrs->isCappedHidden(id)); + uow.commit(); + ASSERT(!wtrs->isCappedHidden(id)); + } + + { + auto opCtx(harnessHelper->newOperationContext()); + WriteUnitOfWork uow(opCtx.get()); + RecordId id = _oplogOrderInsertOplog(opCtx.get(), rs, 2); + ASSERT(wtrs->isCappedHidden(id)); + uow.commit(); + ASSERT(!wtrs->isCappedHidden(id)); + } +} + +// Test that Oplog entries inserted while there are hidden entries do not become visible until the +// op and all earlier ops are durable. +TEST(WiredTigerRecordStoreTest, OplogDurableVisibilityOutOfOrder) { + ON_BLOCK_EXIT([] { WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::off); }); + WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::alwaysOn); + + unique_ptr harnessHelper(new WiredTigerHarnessHelper()); + unique_ptr rs(harnessHelper->newCappedRecordStore("local.oplog.foo", 100000, -1)); + + auto wtrs = checked_cast(rs.get()); + + auto longLivedOp(harnessHelper->newOperationContext()); + WriteUnitOfWork uow(longLivedOp.get()); + RecordId id1 = _oplogOrderInsertOplog(longLivedOp.get(), rs, 1); + ASSERT(wtrs->isCappedHidden(id1)); + + + RecordId id2; + { + auto innerClient = harnessHelper->serviceContext()->makeClient("inner"); + auto opCtx(harnessHelper->newOperationContext(innerClient.get())); + WriteUnitOfWork uow(opCtx.get()); + id2 = _oplogOrderInsertOplog(opCtx.get(), rs, 2); + ASSERT(wtrs->isCappedHidden(id2)); + uow.commit(); + } + + ASSERT(wtrs->isCappedHidden(id1)); + ASSERT(wtrs->isCappedHidden(id2)); + + uow.commit(); + + ASSERT(wtrs->isCappedHidden(id1)); + ASSERT(wtrs->isCappedHidden(id2)); + + // Wait a bit and check again to make sure they don't become visible automatically. + sleepsecs(1); + ASSERT(wtrs->isCappedHidden(id1)); + ASSERT(wtrs->isCappedHidden(id2)); + + WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::off); + + rs->waitForAllEarlierOplogWritesToBeVisible(longLivedOp.get()); + + ASSERT(!wtrs->isCappedHidden(id1)); + ASSERT(!wtrs->isCappedHidden(id2)); +} + TEST(WiredTigerRecordStoreTest, StorageSizeStatisticsDisabled) { WiredTigerHarnessHelper harnessHelper("statistics=(none)"); unique_ptr rs(harnessHelper.newNonCappedRecordStore("a.b")); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index 8c002cc9e57..58fd74c2ee0 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -213,6 +213,7 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) { } _active = false; _mySnapshotId = nextSnapshotId.fetchAndAdd(1); + _oplogReadTill = RecordId(); } SnapshotId WiredTigerRecoveryUnit::getSnapshotId() const { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp index f6a1ccca150..54fff70be4d 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp @@ -211,7 +211,7 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) { JournalListener::Token token = _journalListener->getToken(); // Use the journal when available, or a checkpoint otherwise. - if (_engine->isDurable()) { + if (_engine && _engine->isDurable()) { invariantWTOK(s->log_flush(s, "sync=on")); LOG(4) << "flushed journal"; } else { diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp index f53a2545968..f01c2d88fef 100644 --- a/src/mongo/dbtests/repltests.cpp +++ b/src/mongo/dbtests/repltests.cpp @@ -152,37 +152,17 @@ protected: return count; } int opCount() { - ScopedTransaction transaction(&_txn, MODE_X); - Lock::GlobalWrite lk(_txn.lockState()); - OldClientContext ctx(&_txn, cllNS()); - - Database* db = ctx.db(); - Collection* coll = db->getCollection(cllNS()); - if (!coll) { - WriteUnitOfWork wunit(&_txn); - coll = db->createCollection(&_txn, cllNS()); - wunit.commit(); - } - - int count = 0; - auto cursor = coll->getCursor(&_txn); - while (auto record = cursor->next()) { - ++count; - } - return count; + return DBDirectClient(&_txn).query(cllNS(), BSONObj())->itcount(); } void applyAllOperations() { ScopedTransaction transaction(&_txn, MODE_X); Lock::GlobalWrite lk(_txn.lockState()); vector ops; { - OldClientContext ctx(&_txn, cllNS()); - Database* db = ctx.db(); - Collection* coll = db->getCollection(cllNS()); - - auto cursor = coll->getCursor(&_txn); - while (auto record = cursor->next()) { - ops.push_back(record->data.releaseToBson().getOwned()); + DBDirectClient db(&_txn); + auto cursor = db.query(cllNS(), BSONObj()); + while (cursor->more()) { + ops.push_back(cursor->nextSafe().getOwned()); } } { -- cgit v1.2.1