summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2016-10-18 16:23:24 -0400
committerMathias Stearn <redbeard0531@gmail.com>2016-11-14 18:45:25 -0500
commitda854a0793e210a4236dedf9d8895dee1f098125 (patch)
tree8dc2868af77e603f3691bb33cb6d506e7dafd945
parent0b3f66a03f21ce0aa0d86f3035ce4b67bcbbcba4 (diff)
downloadmongo-da854a0793e210a4236dedf9d8895dee1f098125.tar.gz
SERVER-26685 Tie oplog visibility to durability
(cherry picked from commit 8a2f2fc1883f5deb1b23915cd7a47686a623ba87)
-rw-r--r--src/mongo/db/catalog/collection.cpp5
-rw-r--r--src/mongo/db/catalog/database.cpp7
-rw-r--r--src/mongo/db/concurrency/locker.h14
-rw-r--r--src/mongo/db/concurrency/locker_noop.h4
-rw-r--r--src/mongo/db/exec/collection_scan.cpp14
-rw-r--r--src/mongo/db/storage/devnull/devnull_kv_engine.cpp2
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h2
-rw-r--r--src/mongo/db/storage/mmap_v1/heap_record_store_btree.h4
-rw-r--r--src/mongo/db/storage/mmap_v1/record_store_v1_base.h2
-rw-r--r--src/mongo/db/storage/record_store.h16
-rw-r--r--src/mongo/db/storage/record_store_test_capped_visibility.cpp24
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp162
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h24
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp85
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp1
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp2
-rw-r--r--src/mongo/dbtests/repltests.cpp30
17 files changed, 299 insertions, 99 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index 12c2506b524..0043b20a4c5 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -196,10 +196,11 @@ Collection::Collection(OperationContext* txn,
Collection::~Collection() {
verify(ok());
- _magic = 0;
- if (_cappedNotifier) {
+ if (isCapped()) {
+ _recordStore->setCappedCallback(nullptr);
_cappedNotifier->kill();
}
+ _magic = 0;
}
bool Collection::requiresIdIndex() const {
diff --git a/src/mongo/db/catalog/database.cpp b/src/mongo/db/catalog/database.cpp
index de014156efe..a54a5e4798c 100644
--- a/src/mongo/db/catalog/database.cpp
+++ b/src/mongo/db/catalog/database.cpp
@@ -386,11 +386,12 @@ Status Database::dropCollection(OperationContext* txn, StringData fullns) {
Top::get(txn->getClient()->getServiceContext()).collectionDropped(fullns);
- s = _dbEntry->dropCollection(txn, fullns);
-
- // we want to do this always
+ // We want to destroy the Collection object before telling the StorageEngine to destroy the
+ // RecordStore.
_clearCollectionCache(txn, fullns, "collection dropped");
+ s = _dbEntry->dropCollection(txn, fullns);
+
if (!s.isOK())
return s;
diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h
index cdc490e1379..c664b3841d7 100644
--- a/src/mongo/db/concurrency/locker.h
+++ b/src/mongo/db/concurrency/locker.h
@@ -49,7 +49,19 @@ public:
virtual ~Locker() {}
/**
- * Require global lock attempts with obtain tickets from 'reading' (for MODE_S and MODE_IS),
+ * Returns true if this is an instance of LockerNoop. Because LockerNoop doesn't implement many
+ * methods, some users may need to check this first to find out what is safe to call. LockerNoop
+ * is only used in unittests and for a brief period at startup, so you can assume you hold the
+ * equivalent of a MODE_X lock when using it.
+ *
+ * TODO get rid of this once we kill LockerNoop.
+ */
+ virtual bool isNoop() const {
+ return false;
+ }
+
+ /**
+ * Require global lock attempts to obtain tickets from 'reading' (for MODE_S and MODE_IS),
* and from 'writing' (for MODE_IX), which must have static lifetimes. There is no throttling
* for MODE_X, as there can only ever be a single locker using this mode. The throttling is
* intended to defend against arge drops in throughput under high load due to too much
diff --git a/src/mongo/db/concurrency/locker_noop.h b/src/mongo/db/concurrency/locker_noop.h
index cfbd3234c2c..9e8d3b6300a 100644
--- a/src/mongo/db/concurrency/locker_noop.h
+++ b/src/mongo/db/concurrency/locker_noop.h
@@ -41,6 +41,10 @@ class LockerNoop : public Locker {
public:
LockerNoop() {}
+ virtual bool isNoop() const {
+ return true;
+ }
+
virtual ClientState getClientState() const {
invariant(false);
}
diff --git a/src/mongo/db/exec/collection_scan.cpp b/src/mongo/db/exec/collection_scan.cpp
index 74d64c39cc8..db07439fea9 100644
--- a/src/mongo/db/exec/collection_scan.cpp
+++ b/src/mongo/db/exec/collection_scan.cpp
@@ -97,6 +97,20 @@ PlanStage::StageState CollectionScan::work(WorkingSetID* out) {
try {
if (needToMakeCursor) {
const bool forward = _params.direction == CollectionScanParams::FORWARD;
+
+ if (forward && !_params.tailable && _params.collection->ns().isOplog()) {
+ // Forward, non-tailable scans from the oplog need to wait until all oplog entries
+ // before the read begins to be visible. This isn't needed for reverse scans because
+ // we only hide oplog entries from forward scans, and it isn't necessary for tailing
+ // cursors because they ignore EOF and will eventually see all writes. Forward,
+ // non-tailable scans are the only case where a meaningful EOF will be seen that
+ // might not include writes that finished before the read started. This also must be
+ // done before we create the cursor as that is when we establish the endpoint for
+ // the cursor.
+ _params.collection->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(
+ getOpCtx());
+ }
+
_cursor = _params.collection->getCursor(getOpCtx(), forward);
if (!_lastSeenId.isNull()) {
diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
index 9c82309d5c9..d601702623f 100644
--- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
+++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp
@@ -163,6 +163,8 @@ public:
return Status::OK();
}
+ void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const override {}
+
virtual void updateStatsAfterRepair(OperationContext* txn,
long long numRecords,
long long dataSize) {}
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h
index 324a30653eb..3d1ef8f111b 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h
@@ -121,6 +121,8 @@ public:
virtual boost::optional<RecordId> oplogStartHack(OperationContext* txn,
const RecordId& startingPosition) const;
+ void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const override {}
+
virtual void updateStatsAfterRepair(OperationContext* txn,
long long numRecords,
long long dataSize) {
diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h
index a496f1ff31e..2c40bcb61ab 100644
--- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h
+++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h
@@ -154,6 +154,10 @@ public:
invariant(false);
}
+ void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const override {
+ invariant(false);
+ }
+
virtual void updateStatsAfterRepair(OperationContext* txn,
long long numRecords,
long long dataSize) {
diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h
index d34c5a9b3f0..6b7b6c1630e 100644
--- a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h
+++ b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h
@@ -256,6 +256,8 @@ public:
/* return which "deleted bucket" for this size object */
static int bucket(int size);
+ void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const override {}
+
virtual void updateStatsAfterRepair(OperationContext* txn,
long long numRecords,
long long dataSize) {
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index b6e973a16ab..499bcbacdd2 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -127,12 +127,11 @@ struct BsonRecord {
* IMPORTANT NOTE FOR DOCUMENT-LOCKING ENGINES: If you implement capped collections with a
* "visibility" system such that documents that exist in your snapshot but were inserted after
* the last uncommitted document are hidden, you must follow the following rules:
- * - next() must never return invisible documents.
+ * - next() on forward cursors must never return invisible documents.
* - If next() on a forward cursor hits an invisible document, it should behave as if it hit
* the end of the collection.
- * - When next() on a reverse cursor seeks to the end of the collection it must return the
- * newest visible document. This should only return boost::none if there are no visible
- * documents in the collection.
+ * - Reverse cursors must ignore the visibility filter. That means that they initially return the
+ * newest committed record in the collection and may skip over uncommitted records.
* - SeekableRecordCursor::seekExact() must ignore the visibility filter and return the requested
* document even if it is supposed to be invisible.
* TODO SERVER-18934 Handle this above the storage engine layer so storage engines don't have to
@@ -580,6 +579,15 @@ public:
}
/**
+ * Waits for all writes that completed before this call to be visible to forward scans.
+ * See the comment on RecordCursor for more details about the visibility rules.
+ *
+ * It is only legal to call this on an oplog. It is illegal to call this inside a
+ * WriteUnitOfWork.
+ */
+ virtual void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const = 0;
+
+ /**
* Called after a repair operation is run with the recomputed numRecords and dataSize.
*/
virtual void updateStatsAfterRepair(OperationContext* txn,
diff --git a/src/mongo/db/storage/record_store_test_capped_visibility.cpp b/src/mongo/db/storage/record_store_test_capped_visibility.cpp
index 1f7d7afa87c..3d1765564b5 100644
--- a/src/mongo/db/storage/record_store_test_capped_visibility.cpp
+++ b/src/mongo/db/storage/record_store_test_capped_visibility.cpp
@@ -69,9 +69,9 @@ TEST(RecordStore_CappedVisibility, EmptyInitialState) {
RecordId lowestHiddenId = doInsert(longLivedOp, rs);
- // Collection still looks empty to iteration but not seekExact.
+ // Collection still looks empty to forward iteration but not reverse or seekExact.
ASSERT(!rs->getCursor(longLivedOp.get(), true)->next());
- ASSERT(!rs->getCursor(longLivedOp.get(), false)->next());
+ ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), lowestHiddenId);
ASSERT_ID_EQ(rs->getCursor(longLivedOp.get())->seekExact(lowestHiddenId), lowestHiddenId);
RecordId otherId;
@@ -88,21 +88,22 @@ TEST(RecordStore_CappedVisibility, EmptyInitialState) {
otherId = doInsert(txn, rs);
ASSERT(!rs->getCursor(txn.get(), true)->next());
- ASSERT(!rs->getCursor(txn.get(), false)->next());
+ ASSERT_ID_EQ(rs->getCursor(txn.get(), false)->next(), otherId);
ASSERT_ID_EQ(rs->getCursor(txn.get())->seekExact(otherId), otherId);
wuow.commit();
ASSERT(!rs->getCursor(txn.get(), true)->next());
- ASSERT(!rs->getCursor(txn.get(), false)->next());
+ ASSERT_ID_EQ(rs->getCursor(txn.get(), false)->next(), otherId);
ASSERT_ID_EQ(rs->getCursor(txn.get())->seekExact(otherId), otherId);
ASSERT(!rs->getCursor(txn.get())->seekExact(lowestHiddenId));
}
+ // longLivedOp is still on old snapshot so it can't see otherId yet.
ASSERT(!rs->getCursor(longLivedOp.get(), true)->next());
- ASSERT(!rs->getCursor(longLivedOp.get(), false)->next());
+ ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), lowestHiddenId);
ASSERT_ID_EQ(rs->getCursor(longLivedOp.get())->seekExact(lowestHiddenId), lowestHiddenId);
- ASSERT(!rs->getCursor(longLivedOp.get())->seekExact(otherId)); // still on old snapshot.
+ ASSERT(!rs->getCursor(longLivedOp.get())->seekExact(otherId));
// This makes all documents visible and lets longLivedOp get a new snapshot.
longLivedWuow.commit();
@@ -140,7 +141,7 @@ TEST(RecordStore_CappedVisibility, NonEmptyInitialState) {
// Collection still looks like it only has a single doc to iteration but not seekExact.
ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), true)->next(), initialId);
- ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), initialId);
+ ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), lowestHiddenId);
ASSERT_ID_EQ(rs->getCursor(longLivedOp.get())->seekExact(initialId), initialId);
ASSERT_ID_EQ(rs->getCursor(longLivedOp.get())->seekExact(lowestHiddenId), lowestHiddenId);
@@ -159,21 +160,22 @@ TEST(RecordStore_CappedVisibility, NonEmptyInitialState) {
otherId = doInsert(txn, rs);
ASSERT_ID_EQ(rs->getCursor(txn.get(), true)->next(), initialId);
- ASSERT_ID_EQ(rs->getCursor(txn.get(), false)->next(), initialId);
+ ASSERT_ID_EQ(rs->getCursor(txn.get(), false)->next(), otherId);
ASSERT_ID_EQ(rs->getCursor(txn.get())->seekExact(otherId), otherId);
wuow.commit();
ASSERT_ID_EQ(rs->getCursor(txn.get(), true)->next(), initialId);
- ASSERT_ID_EQ(rs->getCursor(txn.get(), false)->next(), initialId);
+ ASSERT_ID_EQ(rs->getCursor(txn.get(), false)->next(), otherId);
ASSERT_ID_EQ(rs->getCursor(txn.get())->seekExact(otherId), otherId);
ASSERT(!rs->getCursor(txn.get())->seekExact(lowestHiddenId));
}
+ // longLivedOp is still on old snapshot so it can't see otherId yet.
ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), true)->next(), initialId);
- ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), initialId);
+ ASSERT_ID_EQ(rs->getCursor(longLivedOp.get(), false)->next(), lowestHiddenId);
ASSERT_ID_EQ(rs->getCursor(longLivedOp.get())->seekExact(lowestHiddenId), lowestHiddenId);
- ASSERT(!rs->getCursor(longLivedOp.get())->seekExact(otherId)); // still on old snapshot.
+ ASSERT(!rs->getCursor(longLivedOp.get())->seekExact(otherId));
// This makes all documents visible and lets longLivedOp get a new snapshot.
longLivedWuow.commit();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 7edd75781c5..bd6bdceb5e9 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -39,6 +39,7 @@
#include "mongo/base/checked_cast.h"
#include "mongo/bson/util/builder.h"
+#include "mongo/db/client.h"
#include "mongo/db/concurrency/locker.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/namespace_string.h"
@@ -91,7 +92,7 @@ bool shouldUseOplogHack(OperationContext* opCtx, const std::string& uri) {
} // namespace
MONGO_FP_DECLARE(WTWriteConflictException);
-MONGO_FP_DECLARE(WTEmulateOutOfOrderNextRecordId);
+MONGO_FP_DECLARE(WTPausePrimaryOplogDurabilityLoop);
const std::string kWiredTigerEngineName = "wiredTiger";
@@ -445,31 +446,7 @@ public:
WT_CURSOR* c = _cursor->get();
- bool mustAdvance = !_skipNextAdvance;
- if (_lastReturnedId.isNull() && !_forward && _rs._isCapped) {
- // In this case we need to seek to the highest visible record.
- const RecordId reverseCappedInitialSeekPoint =
- _readUntilForOplog.isNull() ? _rs.lowestCappedHiddenRecord() : _readUntilForOplog;
-
- if (!reverseCappedInitialSeekPoint.isNull()) {
- c->set_key(c, _makeKey(reverseCappedInitialSeekPoint));
- int cmp;
- int seekRet = WT_OP_CHECK(c->search_near(c, &cmp));
- if (seekRet == WT_NOTFOUND) {
- _eof = true;
- return {};
- }
- invariantWTOK(seekRet);
-
- // If we landed at or past the lowest hidden record, we must advance to be in
- // the visible range.
- mustAdvance = _rs.isCappedHidden(reverseCappedInitialSeekPoint)
- ? (cmp >= 0)
- : (cmp > 0); // No longer hidden.
- }
- }
-
- if (mustAdvance) {
+ if (!_skipNextAdvance) {
// Nothing after the next line can throw WCEs.
// Note that an unpositioned (or eof) WT_CURSOR returns the first/last entry in the
// table when you call next/prev.
@@ -486,13 +463,6 @@ public:
invariantWTOK(c->get_key(c, &key));
RecordId id = _fromKey(key);
- if (_forward && MONGO_FAIL_POINT(WTEmulateOutOfOrderNextRecordId)) {
- log() << "WTEmulateOutOfOrderNextRecordId fail point has triggerd so RecordId is now "
- "RecordId(1) instead of " << id;
- // Replace the found RecordId with a (small) fake one.
- id = RecordId{1};
- }
-
if (_forward && _lastReturnedId >= id) {
log() << "WTCursor::next -- c->next_key ( " << id
<< ") was not greater than _lastReturnedId (" << _lastReturnedId
@@ -612,6 +582,9 @@ private:
if (!_rs._isCapped)
return true;
+ if (!_forward)
+ return true;
+
if (_readUntilForOplog.isNull() || !_rs._isOplog) {
// this is the normal capped case
return !_rs.isCappedHidden(id);
@@ -866,11 +839,18 @@ WiredTigerRecordStore::WiredTigerRecordStore(OperationContext* ctx,
if (WiredTigerKVEngine::initRsOplogBackgroundThread(ns)) {
_oplogStones = std::make_shared<OplogStones>(ctx, this);
}
+
+ if (_isOplog) {
+ _oplogJournalThread = stdx::thread(&WiredTigerRecordStore::_oplogJournalThreadLoop,
+ this,
+ WiredTigerRecoveryUnit::get(ctx)->getSessionCache());
+ }
}
WiredTigerRecordStore::~WiredTigerRecordStore() {
{
stdx::lock_guard<boost::timed_mutex> lk(_cappedDeleterMutex); // NOLINT
+ stdx::lock_guard<stdx::mutex> lk2(_uncommittedRecordIdsMutex);
_shuttingDown = true;
}
@@ -882,6 +862,11 @@ WiredTigerRecordStore::~WiredTigerRecordStore() {
if (_oplogStones) {
_oplogStones->kill();
}
+
+ if (_oplogJournalThread.joinable()) {
+ _opsWaitingForJournalCV.notify_one();
+ _oplogJournalThread.join();
+ }
}
const char* WiredTigerRecordStore::name() const {
@@ -1121,6 +1106,7 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn
++docsRemoved;
sizeSaved += old_value.size;
+ stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex);
if (_cappedCallback) {
uassertStatusOK(_cappedCallback->aboutToDeleteCapped(
txn,
@@ -1337,10 +1323,25 @@ StatusWith<RecordId> WiredTigerRecordStore::insertRecord(OperationContext* txn,
return StatusWith<RecordId>(records[0].id);
}
-void WiredTigerRecordStore::_dealtWithCappedId(SortedRecordIds::iterator it) {
+void WiredTigerRecordStore::_dealtWithCappedId(SortedRecordIds::iterator it, bool didCommit) {
invariant(&(*it) != NULL);
stdx::lock_guard<stdx::mutex> lk(_uncommittedRecordIdsMutex);
- _uncommittedRecordIds.erase(it);
+ if (didCommit && _isOplog && *it != _oplog_highestSeen) {
+ // Defer removal from _uncommittedRecordIds until it is durable. We don't need to wait for
+ // durability of ops that didn't commit because they won't become durable.
+ // As an optimization, we only defer visibility until durable if new ops were created while
+ // we were pending. This makes single-threaded w>1 workloads faster and is safe because
+ // durability follows commit order for commits that are fully sequenced (B doesn't call
+ // commit until after A's commit call returns).
+ const bool wasEmpty = _opsWaitingForJournal.empty();
+ _opsWaitingForJournal.push_back(it);
+ if (wasEmpty) {
+ _opsWaitingForJournalCV.notify_one();
+ }
+ } else {
+ _uncommittedRecordIds.erase(it);
+ _opsBecameVisibleCV.notify_all();
+ }
}
bool WiredTigerRecordStore::isCappedHidden(const RecordId& id) const {
@@ -1431,10 +1432,13 @@ std::unique_ptr<SeekableRecordCursor> WiredTigerRecordStore::getCursor(Operation
bool forward) const {
if (_isOplog && forward) {
WiredTigerRecoveryUnit* wru = WiredTigerRecoveryUnit::get(txn);
- if (!wru->inActiveTxn() || wru->getOplogReadTill().isNull()) {
- // if we don't have a session, we have no snapshot, so we can update our view
- _oplogSetStartHack(wru);
+ // If we already have a snapshot we don't know what it can see, unless we know no one
+ // else could be writing (because we hold an exclusive lock).
+ if (wru->inActiveTxn() && !txn->lockState()->isNoop() &&
+ !txn->lockState()->isCollectionLockedForMode(_ns, MODE_X)) {
+ throw WriteConflictException();
}
+ _oplogSetStartHack(wru);
}
return stdx::make_unique<Cursor>(txn, *this, forward);
@@ -1627,22 +1631,78 @@ public:
: _rs(rs), _it(it) {}
virtual void commit() {
+ _rs->_dealtWithCappedId(_it, true);
// Do not notify here because all committed inserts notify, always.
- _rs->_dealtWithCappedId(_it);
}
virtual void rollback() {
// Notify on rollback since it might make later commits visible.
- _rs->_dealtWithCappedId(_it);
+ _rs->_dealtWithCappedId(_it, false);
+ stdx::lock_guard<stdx::mutex> lk(_rs->_cappedCallbackMutex);
if (_rs->_cappedCallback)
_rs->_cappedCallback->notifyCappedWaitersIfNeeded();
}
private:
- WiredTigerRecordStore* _rs;
- SortedRecordIds::iterator _it;
+ WiredTigerRecordStore* const _rs;
+ const SortedRecordIds::iterator _it;
};
+void WiredTigerRecordStore::_oplogJournalThreadLoop(WiredTigerSessionCache* sessionCache) try {
+ Client::initThread("WTOplogJournalThread");
+ while (true) {
+ stdx::unique_lock<stdx::mutex> lk(_uncommittedRecordIdsMutex);
+ _opsWaitingForJournalCV.wait(
+ lk, [&] { return _shuttingDown || !_opsWaitingForJournal.empty(); });
+
+ while (!_shuttingDown && MONGO_FAIL_POINT(WTPausePrimaryOplogDurabilityLoop)) {
+ lk.unlock();
+ sleepmillis(10);
+ lk.lock();
+ }
+
+ if (_shuttingDown)
+ return;
+
+ decltype(_opsWaitingForJournal) opsAboutToBeJournaled = {};
+ _opsWaitingForJournal.swap(opsAboutToBeJournaled);
+
+ lk.unlock();
+ sessionCache->waitUntilDurable(/*forceCheckpoint=*/false);
+ lk.lock();
+
+ for (auto&& op : opsAboutToBeJournaled) {
+ _uncommittedRecordIds.erase(op);
+ }
+
+ _opsBecameVisibleCV.notify_all();
+ lk.unlock();
+
+ stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex);
+ if (_cappedCallback) {
+ _cappedCallback->notifyCappedWaitersIfNeeded();
+ }
+ }
+} catch (...) {
+ std::terminate();
+}
+
+void WiredTigerRecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const {
+ invariant(txn->lockState()->isNoop() || !txn->lockState()->inAWriteUnitOfWork());
+
+ // This function must not start a WT transaction, otherwise we will get stuck in an infinite
+ // loop of WCE handling when the getCursor() is called.
+
+ stdx::unique_lock<stdx::mutex> lk(_uncommittedRecordIdsMutex);
+ const auto waitingFor = _oplog_highestSeen;
+ while (!_uncommittedRecordIds.empty() && _uncommittedRecordIds.front() <= waitingFor) {
+ // We can't use a simple wait() here because we need to wake up periodically to check for
+ // interrupt and OperationContext::waitForConditionOrInterrupt doesn't exist on this branch.
+ txn->checkForInterrupt();
+ _opsBecameVisibleCV.wait_for(lk, Microseconds(Seconds(10)));
+ }
+}
+
void WiredTigerRecordStore::_addUncommitedRecordId_inlock(OperationContext* txn,
const RecordId& id) {
// todo: make this a dassert at some point
@@ -1784,13 +1844,17 @@ void WiredTigerRecordStore::temp_cappedTruncateAfter(OperationContext* txn,
}
// Compute the number and associated sizes of the records to delete.
- do {
- if (_cappedCallback) {
- uassertStatusOK(_cappedCallback->aboutToDeleteCapped(txn, record->id, record->data));
- }
- recordsRemoved++;
- bytesRemoved += record->data.size();
- } while ((record = cursor.next()));
+ {
+ stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex);
+ do {
+ if (_cappedCallback) {
+ uassertStatusOK(
+ _cappedCallback->aboutToDeleteCapped(txn, record->id, record->data));
+ }
+ recordsRemoved++;
+ bytesRemoved += record->data.size();
+ } while ((record = cursor.next()));
+ }
// Truncate the collection starting from the record located at 'firstRemovedId' to the end of
// the collection.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index 3335e774c4c..1fab25f8be4 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -39,8 +39,10 @@
#include "mongo/db/storage/capped_callback.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/synchronization.h"
+#include "mongo/stdx/thread.h"
#include "mongo/util/fail_point_service.h"
/**
@@ -53,6 +55,7 @@ namespace mongo {
class RecoveryUnit;
class WiredTigerCursor;
+class WiredTigerSessionCache;
class WiredTigerRecoveryUnit;
class WiredTigerSizeStorer;
@@ -193,6 +196,9 @@ public:
long long numRecords,
long long dataSize);
+
+ void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const override;
+
bool isOplog() const {
return _isOplog;
}
@@ -201,8 +207,10 @@ public:
}
void setCappedCallback(CappedCallback* cb) {
+ stdx::lock_guard<stdx::mutex> lk(_cappedCallbackMutex);
_cappedCallback = cb;
}
+
int64_t cappedMaxDocs() const;
int64_t cappedMaxSize() const;
@@ -255,7 +263,7 @@ private:
static int64_t _makeKey(const RecordId& id);
static RecordId _fromKey(int64_t k);
- void _dealtWithCappedId(SortedRecordIds::iterator it);
+ void _dealtWithCappedId(SortedRecordIds::iterator it, bool didCommit);
void _addUncommitedRecordId_inlock(OperationContext* txn, const RecordId& id);
RecordId _nextId();
@@ -265,6 +273,7 @@ private:
void _increaseDataSize(OperationContext* txn, int64_t amount);
RecordData _getData(const WiredTigerCursor& cursor) const;
void _oplogSetStartHack(WiredTigerRecoveryUnit* wru) const;
+ void _oplogJournalThreadLoop(WiredTigerSessionCache* sessionCache);
const std::string _uri;
const uint64_t _tableId; // not persisted
@@ -284,6 +293,7 @@ private:
AtomicInt64 _cappedSleep;
AtomicInt64 _cappedSleepMS;
CappedCallback* _cappedCallback;
+ stdx::mutex _cappedCallbackMutex; // guards _cappedCallback.
// See comment in ::cappedDeleteAsNeeded
int _cappedDeleteCheckCount;
@@ -292,7 +302,6 @@ private:
const bool _useOplogHack;
SortedRecordIds _uncommittedRecordIds;
- RecordId _oplog_visibleTo;
RecordId _oplog_highestSeen;
mutable stdx::mutex _uncommittedRecordIdsMutex;
@@ -307,8 +316,19 @@ private:
// Non-null if this record store is underlying the active oplog.
std::shared_ptr<OplogStones> _oplogStones;
+
+ // These use the _uncommittedRecordIdsMutex and are only used when _isOplog is true.
+ stdx::condition_variable _opsWaitingForJournalCV;
+ mutable stdx::condition_variable _opsBecameVisibleCV;
+ std::vector<SortedRecordIds::iterator> _opsWaitingForJournal;
+ stdx::thread _oplogJournalThread;
};
// WT failpoint to throw write conflict exceptions randomly
MONGO_FP_FORWARD_DECLARE(WTWriteConflictException);
+
+// Prevents oplog writes from being considered durable on the primary. Once activated, new writes
+// will not be considered durable until deactivated. It is unspecified whether writes that commit
+// before activation will become visible while active.
+MONGO_FP_FORWARD_DECLARE(WTPausePrimaryOplogDurabilityLoop);
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
index 7dcc4033f70..b65c933e744 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
@@ -48,6 +48,8 @@
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
#include "mongo/unittest/temp_dir.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/fail_point.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -754,7 +756,7 @@ TEST(WiredTigerRecordStoreTest, CappedCursorRollover) {
ASSERT(!cursor->next());
}
-RecordId _oplogOrderInsertOplog(OperationContext* txn, unique_ptr<RecordStore>& rs, int inc) {
+RecordId _oplogOrderInsertOplog(OperationContext* txn, const unique_ptr<RecordStore>& rs, int inc) {
Timestamp opTime = Timestamp(5, inc);
WiredTigerRecordStore* wrs = checked_cast<WiredTigerRecordStore*>(rs.get());
Status status = wrs->oplogDiskLocRegister(txn, opTime);
@@ -831,6 +833,8 @@ TEST(WiredTigerRecordStoreTest, OplogOrder) {
w1.commit();
}
+ rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get());
+
{ // now all 3 docs should be visible
unique_ptr<OperationContext> opCtx(harnessHelper->newOperationContext());
auto cursor = rs->getCursor(opCtx.get());
@@ -885,6 +889,8 @@ TEST(WiredTigerRecordStoreTest, OplogOrder) {
w1.commit();
}
+ rs->waitForAllEarlierOplogWritesToBeVisible(harnessHelper->newOperationContext().get());
+
{ // now all 3 docs should be visible
unique_ptr<OperationContext> opCtx(harnessHelper->newOperationContext());
auto cursor = rs->getCursor(opCtx.get());
@@ -896,6 +902,83 @@ TEST(WiredTigerRecordStoreTest, OplogOrder) {
}
}
+// Test that even when the oplog durability loop is paused, we can still advance the commit point as
+// long as the commit for each insert comes before the next insert starts.
+TEST(WiredTigerRecordStoreTest, OplogDurableVisibilityInOrder) {
+ ON_BLOCK_EXIT([] { WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::off); });
+ WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::alwaysOn);
+
+ unique_ptr<WiredTigerHarnessHelper> harnessHelper(new WiredTigerHarnessHelper());
+ unique_ptr<RecordStore> rs(harnessHelper->newCappedRecordStore("local.oplog.foo", 100000, -1));
+ auto wtrs = checked_cast<WiredTigerRecordStore*>(rs.get());
+
+ {
+ auto opCtx(harnessHelper->newOperationContext());
+ WriteUnitOfWork uow(opCtx.get());
+ RecordId id = _oplogOrderInsertOplog(opCtx.get(), rs, 1);
+ ASSERT(wtrs->isCappedHidden(id));
+ uow.commit();
+ ASSERT(!wtrs->isCappedHidden(id));
+ }
+
+ {
+ auto opCtx(harnessHelper->newOperationContext());
+ WriteUnitOfWork uow(opCtx.get());
+ RecordId id = _oplogOrderInsertOplog(opCtx.get(), rs, 2);
+ ASSERT(wtrs->isCappedHidden(id));
+ uow.commit();
+ ASSERT(!wtrs->isCappedHidden(id));
+ }
+}
+
+// Test that Oplog entries inserted while there are hidden entries do not become visible until the
+// op and all earlier ops are durable.
+TEST(WiredTigerRecordStoreTest, OplogDurableVisibilityOutOfOrder) {
+ ON_BLOCK_EXIT([] { WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::off); });
+ WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::alwaysOn);
+
+ unique_ptr<WiredTigerHarnessHelper> harnessHelper(new WiredTigerHarnessHelper());
+ unique_ptr<RecordStore> rs(harnessHelper->newCappedRecordStore("local.oplog.foo", 100000, -1));
+
+ auto wtrs = checked_cast<WiredTigerRecordStore*>(rs.get());
+
+ auto longLivedOp(harnessHelper->newOperationContext());
+ WriteUnitOfWork uow(longLivedOp.get());
+ RecordId id1 = _oplogOrderInsertOplog(longLivedOp.get(), rs, 1);
+ ASSERT(wtrs->isCappedHidden(id1));
+
+
+ RecordId id2;
+ {
+ auto innerClient = harnessHelper->serviceContext()->makeClient("inner");
+ auto opCtx(harnessHelper->newOperationContext(innerClient.get()));
+ WriteUnitOfWork uow(opCtx.get());
+ id2 = _oplogOrderInsertOplog(opCtx.get(), rs, 2);
+ ASSERT(wtrs->isCappedHidden(id2));
+ uow.commit();
+ }
+
+ ASSERT(wtrs->isCappedHidden(id1));
+ ASSERT(wtrs->isCappedHidden(id2));
+
+ uow.commit();
+
+ ASSERT(wtrs->isCappedHidden(id1));
+ ASSERT(wtrs->isCappedHidden(id2));
+
+ // Wait a bit and check again to make sure they don't become visible automatically.
+ sleepsecs(1);
+ ASSERT(wtrs->isCappedHidden(id1));
+ ASSERT(wtrs->isCappedHidden(id2));
+
+ WTPausePrimaryOplogDurabilityLoop.setMode(FailPoint::off);
+
+ rs->waitForAllEarlierOplogWritesToBeVisible(longLivedOp.get());
+
+ ASSERT(!wtrs->isCappedHidden(id1));
+ ASSERT(!wtrs->isCappedHidden(id2));
+}
+
TEST(WiredTigerRecordStoreTest, StorageSizeStatisticsDisabled) {
WiredTigerHarnessHelper harnessHelper("statistics=(none)");
unique_ptr<RecordStore> rs(harnessHelper.newNonCappedRecordStore("a.b"));
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 8c002cc9e57..58fd74c2ee0 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -213,6 +213,7 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) {
}
_active = false;
_mySnapshotId = nextSnapshotId.fetchAndAdd(1);
+ _oplogReadTill = RecordId();
}
SnapshotId WiredTigerRecoveryUnit::getSnapshotId() const {
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
index f6a1ccca150..54fff70be4d 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_session_cache.cpp
@@ -211,7 +211,7 @@ void WiredTigerSessionCache::waitUntilDurable(bool forceCheckpoint) {
JournalListener::Token token = _journalListener->getToken();
// Use the journal when available, or a checkpoint otherwise.
- if (_engine->isDurable()) {
+ if (_engine && _engine->isDurable()) {
invariantWTOK(s->log_flush(s, "sync=on"));
LOG(4) << "flushed journal";
} else {
diff --git a/src/mongo/dbtests/repltests.cpp b/src/mongo/dbtests/repltests.cpp
index f53a2545968..f01c2d88fef 100644
--- a/src/mongo/dbtests/repltests.cpp
+++ b/src/mongo/dbtests/repltests.cpp
@@ -152,37 +152,17 @@ protected:
return count;
}
int opCount() {
- ScopedTransaction transaction(&_txn, MODE_X);
- Lock::GlobalWrite lk(_txn.lockState());
- OldClientContext ctx(&_txn, cllNS());
-
- Database* db = ctx.db();
- Collection* coll = db->getCollection(cllNS());
- if (!coll) {
- WriteUnitOfWork wunit(&_txn);
- coll = db->createCollection(&_txn, cllNS());
- wunit.commit();
- }
-
- int count = 0;
- auto cursor = coll->getCursor(&_txn);
- while (auto record = cursor->next()) {
- ++count;
- }
- return count;
+ return DBDirectClient(&_txn).query(cllNS(), BSONObj())->itcount();
}
void applyAllOperations() {
ScopedTransaction transaction(&_txn, MODE_X);
Lock::GlobalWrite lk(_txn.lockState());
vector<BSONObj> ops;
{
- OldClientContext ctx(&_txn, cllNS());
- Database* db = ctx.db();
- Collection* coll = db->getCollection(cllNS());
-
- auto cursor = coll->getCursor(&_txn);
- while (auto record = cursor->next()) {
- ops.push_back(record->data.releaseToBson().getOwned());
+ DBDirectClient db(&_txn);
+ auto cursor = db.query(cllNS(), BSONObj());
+ while (cursor->more()) {
+ ops.push_back(cursor->nextSafe().getOwned());
}
}
{