summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Daniel Gottlieb <daniel.gottlieb@mongodb.com> 2019-10-23 17:48:25 +0000
committer: evergreen <evergreen@mongodb.com> 2019-10-23 17:48:25 +0000
commit: 19b9e3fc31506fd6d8e2f629e5d3f3d0dbf80918 (patch)
tree: 3a84d3ff1ce38ec9466ec51a3d046b09e81853d9
parent: 7e7a45333cbab5ded49543ee2b09611f77792a51 (diff)
download: mongo-19b9e3fc31506fd6d8e2f629e5d3f3d0dbf80918.tar.gz
SERVER-43656: Add an optimized method for querying the latest entry in the oplog.
-rw-r--r-- src/mongo/db/storage/record_store.h 12
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp 18
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h 2
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp 49
4 files changed, 81 insertions, 0 deletions
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index e690b35a618..5bf95d6cb36 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -530,6 +530,18 @@ public:
MONGO_UNREACHABLE;
}
+ /**
+  * Reports the timestamp of the most recent oplog document that the storage engine has
+  * committed, for engines that can answer this cheaply.
+  *
+  * Writes made on `opCtx` that have not yet been committed are never reflected in the result; the
+  * call is serviced by a transaction that is created and destroyed for this purpose alone.
+  *
+  * This default implementation is for RecordStores without such support: it ignores `opCtx` and
+  * always returns the OplogOperationUnsupported error code.
+  */
+ virtual StatusWith<Timestamp> getLatestOplogTimestamp(OperationContext* opCtx) const {
+     const Status unsupported(
+         ErrorCodes::OplogOperationUnsupported,
+         "The current storage engine doesn't support an optimized implementation for "
+         "getting the latest oplog timestamp.");
+     return unsupported;
+ }
protected:
std::string _ns;
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 6defe16f35e..82aa7dc61c5 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -1362,6 +1362,24 @@ void WiredTigerRecordStore::notifyCappedWaitersIfNeeded() {
}
}
+/**
+ * Returns the timestamp encoded in the key of the last record of the oplog table.
+ *
+ * The lookup runs on a session obtained from the session cache and a cursor opened solely for
+ * this call, rather than on any cursor already held by `opCtx`.
+ *
+ * Preconditions (enforced with invariants): this record store is the oplog and `opCtx` holds a
+ * read lock.
+ *
+ * @param opCtx operation context used to reach the WiredTiger session cache.
+ * @return the latest oplog timestamp, or ErrorCodes::CollectionIsEmpty when the oplog table
+ *         contains no records.
+ */
+StatusWith<Timestamp> WiredTigerRecordStore::getLatestOplogTimestamp(
+    OperationContext* opCtx) const {
+    invariant(_isOplog);
+    invariant(opCtx->lockState()->isReadLocked());
+
+    WiredTigerSessionCache* cache = WiredTigerRecoveryUnit::get(opCtx)->getSessionCache();
+    auto sessRaii = cache->getSession();
+    WT_SESSION* sess = sessRaii->getSession();
+    WT_CURSOR* cursor = nullptr;
+    invariantWTOK(sess->open_cursor(sess, _uri.c_str(), nullptr, nullptr, &cursor));
+
+    // The cursor is opened directly on the session, so it must be closed here on every exit
+    // path; otherwise each call would leave one more open cursor on the cached session. The
+    // session reset from the original implementation is kept so the session is handed back in
+    // the same state as before.
+    const auto releaseCursor = [&] {
+        invariantWTOK(cursor->close(cursor));
+        invariantWTOK(sess->reset(sess));
+    };
+
+    // Calling prev() on a freshly opened cursor positions it on the last record of the table.
+    // An empty table yields WT_NOTFOUND, which is an expected condition to report to the caller
+    // rather than an invariant failure.
+    const int ret = cursor->prev(cursor);
+    if (ret == WT_NOTFOUND) {
+        releaseCursor();
+        return Status(ErrorCodes::CollectionIsEmpty, "oplog is empty");
+    }
+    invariantWTOK(ret);
+
+    // Read the key while the cursor is still positioned, then release it.
+    const RecordId recordId = getKey(cursor);
+    releaseCursor();
+
+    // The oplog RecordId's integer representation is reinterpreted as a Timestamp.
+    return {Timestamp(static_cast<unsigned long long>(recordId.repr()))};
+}
+
Status WiredTigerRecordStore::updateRecord(OperationContext* opCtx,
const RecordId& id,
const char* data,
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index e647af70929..30985dcd465 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -250,6 +250,8 @@ public:
void reclaimOplog(OperationContext* opCtx) override;
+ // Returns the timestamp of the latest committed oplog entry held by this record store. The
+ // full contract is documented on the base-class declaration that this method overrides.
+ StatusWith<Timestamp> getLatestOplogTimestamp(OperationContext* opCtx) const override;
+
/**
* The `recoveryTimestamp` is when replication recovery would need to replay from for
* recoverable rollback, or restart for durable engines. `reclaimOplog` will not
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
index 36094cf945a..9225fd8bc80 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
@@ -866,5 +866,54 @@ TEST(WiredTigerRecordStoreTest, OplogStones_AscendingOrder) {
}
}
+// Exercises getLatestOplogTimestamp() against committed and uncommitted oplog inserts made by two
+// clients. The expected result throughout is the highest timestamp among committed entries:
+// uncommitted inserts are never reported, not even to the operation that made them.
+TEST(WiredTigerRecordStoreTest, GetLatestOplogTest) {
+ unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper());
+ // Capped store created under the oplog namespace. NOTE(review): the trailing arguments are
+ // presumably the capped size in bytes and "no document limit" -- confirm against the harness.
+ unique_ptr<RecordStore> rs(harnessHelper->newCappedRecordStore("local.oplog.rs", 100000, -1));
+
+ // The method under test is called through the concrete WiredTiger type.
+ auto wtrs = checked_cast<WiredTigerRecordStore*>(rs.get());
+
+ // 1) Initialize the top of oplog to "1".
+ ServiceContext::UniqueOperationContext op1(harnessHelper->newOperationContext());
+ op1->recoveryUnit()->beginUnitOfWork(op1.get());
+ // The value returned by the insert helper is converted to the Timestamp the query should report.
+ Timestamp tsOne =
+ Timestamp(static_cast<unsigned long long>(_oplogOrderInsertOplog(op1.get(), rs, 1).repr()));
+ op1->recoveryUnit()->commitUnitOfWork();
+ // Asserting on a recovery unit without a snapshot.
+ ASSERT_EQ(tsOne, wtrs->getLatestOplogTimestamp(op1.get()));
+
+ // 2) Open a hole at time "2": the insert is made inside a unit of work that stays uncommitted
+ // until the very end of the test.
+ op1->recoveryUnit()->beginUnitOfWork(op1.get());
+ // Don't save the return value because the compiler complains about unused variables.
+ _oplogOrderInsertOplog(op1.get(), rs, 2);
+ // Querying with the recovery unit with a snapshot will not return the uncommitted value.
+ ASSERT_EQ(tsOne, wtrs->getLatestOplogTimestamp(op1.get()));
+
+ // Store the client with an uncommitted transaction. Create a new, concurrent client.
+ auto client1 = Client::releaseCurrent();
+ Client::initThread("client2");
+
+ // 3) From the second client, insert at time "3" while the write at time "2" is still pending.
+ ServiceContext::UniqueOperationContext op2(harnessHelper->newOperationContext());
+ op2->recoveryUnit()->beginUnitOfWork(op2.get());
+ Timestamp tsThree =
+ Timestamp(static_cast<unsigned long long>(_oplogOrderInsertOplog(op2.get(), rs, 3).repr()));
+ // Before committing, the query still only sees timestamp "1".
+ ASSERT_EQ(tsOne, wtrs->getLatestOplogTimestamp(op2.get()));
+ op2->recoveryUnit()->commitUnitOfWork();
+ // After committing, three is the top of oplog.
+ ASSERT_EQ(tsThree, wtrs->getLatestOplogTimestamp(op2.get()));
+
+ // Destroy client2: its operation context is reset first, then the released client is discarded.
+ op2.reset();
+ Client::releaseCurrent();
+ // Reinstall client 1.
+ Client::setCurrent(std::move(client1));
+
+ // A new query with client 1 will see timestamp "3", even though its own write at "2" is still
+ // uncommitted.
+ ASSERT_EQ(tsThree, wtrs->getLatestOplogTimestamp(op1.get()));
+ op1->recoveryUnit()->commitUnitOfWork();
+ // Committing the write at timestamp "2" does not change the top of oplog result.
+ ASSERT_EQ(tsThree, wtrs->getLatestOplogTimestamp(op1.get()));
+}
+
} // namespace
} // namespace mongo