diff options
4 files changed, 81 insertions, 0 deletions
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index e690b35a618..5bf95d6cb36 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -530,6 +530,18 @@ public: MONGO_UNREACHABLE; } + /** + * If supported, this method returns the timestamp value for the latest storage engine committed + * oplog document. Note that this method will not include uncommitted writes on the input + * OperationContext. A new transaction is always created and destroyed to service this call. + * + * Unsupported RecordStores return the OplogOperationUnsupported error code. + */ + virtual StatusWith<Timestamp> getLatestOplogTimestamp(OperationContext* opCtx) const { + return Status(ErrorCodes::OplogOperationUnsupported, + "The current storage engine doesn't support an optimized implementation for " + "getting the latest oplog timestamp."); + } protected: std::string _ns; diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 6defe16f35e..82aa7dc61c5 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -1362,6 +1362,24 @@ void WiredTigerRecordStore::notifyCappedWaitersIfNeeded() { } } +StatusWith<Timestamp> WiredTigerRecordStore::getLatestOplogTimestamp( + OperationContext* opCtx) const { + invariant(_isOplog); + invariant(opCtx->lockState()->isReadLocked()); + + WiredTigerSessionCache* cache = WiredTigerRecoveryUnit::get(opCtx)->getSessionCache(); + auto sessRaii = cache->getSession(); + WT_SESSION* sess = sessRaii->getSession(); + WT_CURSOR* cursor; + invariantWTOK(sess->open_cursor(sess, _uri.c_str(), nullptr, nullptr, &cursor)); + invariantWTOK(cursor->prev(cursor)); + + RecordId recordId = getKey(cursor); + invariantWTOK(sess->reset(sess)); + + return {Timestamp(static_cast<unsigned long long>(recordId.repr()))}; +} + Status WiredTigerRecordStore::updateRecord(OperationContext* opCtx, const RecordId& id, const char* data, diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index e647af70929..30985dcd465 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -250,6 +250,8 @@ public: void reclaimOplog(OperationContext* opCtx) override; + StatusWith<Timestamp> getLatestOplogTimestamp(OperationContext* opCtx) const override; + /** * The `recoveryTimestamp` is when replication recovery would need to replay from for * recoverable rollback, or restart for durable engines. `reclaimOplog` will not diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp index 36094cf945a..9225fd8bc80 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp @@ -866,5 +866,54 @@ TEST(WiredTigerRecordStoreTest, OplogStones_AscendingOrder) { } } +TEST(WiredTigerRecordStoreTest, GetLatestOplogTest) { + unique_ptr<RecordStoreHarnessHelper> harnessHelper(newRecordStoreHarnessHelper()); + unique_ptr<RecordStore> rs(harnessHelper->newCappedRecordStore("local.oplog.rs", 100000, -1)); + + auto wtrs = checked_cast<WiredTigerRecordStore*>(rs.get()); + + // 1) Initialize the top of oplog to "1". + ServiceContext::UniqueOperationContext op1(harnessHelper->newOperationContext()); + op1->recoveryUnit()->beginUnitOfWork(op1.get()); + Timestamp tsOne = + Timestamp(static_cast<unsigned long long>(_oplogOrderInsertOplog(op1.get(), rs, 1).repr())); + op1->recoveryUnit()->commitUnitOfWork(); + // Asserting on a recovery unit without a snapshot. + ASSERT_EQ(tsOne, wtrs->getLatestOplogTimestamp(op1.get())); + + // 2) Open a hole at time "2". + op1->recoveryUnit()->beginUnitOfWork(op1.get()); + // Don't save the return value because the compiler complains about unused variables. + _oplogOrderInsertOplog(op1.get(), rs, 2); + // Querying with the recovery unit with a snapshot will not return the uncommitted value. + ASSERT_EQ(tsOne, wtrs->getLatestOplogTimestamp(op1.get())); + + // Store the client with an uncommitted transaction. Create a new, concurrent client. + auto client1 = Client::releaseCurrent(); + Client::initThread("client2"); + + ServiceContext::UniqueOperationContext op2(harnessHelper->newOperationContext()); + op2->recoveryUnit()->beginUnitOfWork(op2.get()); + Timestamp tsThree = + Timestamp(static_cast<unsigned long long>(_oplogOrderInsertOplog(op2.get(), rs, 3).repr())); + // Before committing, the query still only sees timestamp "1". + ASSERT_EQ(tsOne, wtrs->getLatestOplogTimestamp(op2.get())); + op2->recoveryUnit()->commitUnitOfWork(); + // After committing, three is the top of oplog. + ASSERT_EQ(tsThree, wtrs->getLatestOplogTimestamp(op2.get())); + + // Destroy client2. + op2.reset(); + Client::releaseCurrent(); + // Reinstall client 1. + Client::setCurrent(std::move(client1)); + + // A new query with client 1 will see timestamp "3". + ASSERT_EQ(tsThree, wtrs->getLatestOplogTimestamp(op1.get())); + op1->recoveryUnit()->commitUnitOfWork(); + // Committing the write at timestamp "2" does not change the top of oplog result. + ASSERT_EQ(tsThree, wtrs->getLatestOplogTimestamp(op1.get())); +} + } // namespace } // namespace mongo |