diff options
22 files changed, 106 insertions, 20 deletions
diff --git a/jstests/noPassthroughWithMongod/query_oplogreplay.js b/jstests/noPassthroughWithMongod/query_oplogreplay.js index a06a7bc2c8e..36e286f917d 100644 --- a/jstests/noPassthroughWithMongod/query_oplogreplay.js +++ b/jstests/noPassthroughWithMongod/query_oplogreplay.js @@ -199,9 +199,11 @@ assert(!cursor.hasNext()); } + jsTestLog("Non-oplog."); // Test that oplog replay on a non-oplog collection succeeds. test(db.jstests_query_oplogreplay); + jsTestLog("Oplog."); // Test that oplog replay on the actual oplog succeeds. test(db.getSiblingDB("local").oplog.jstests_query_oplogreplay); diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 785dfa51864..7fac9df71c1 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -901,6 +901,9 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; } + const bool orderedCommit = true; + _storage->oplogDiskLocRegister( + opCtx.get(), optimeWithHash.opTime.getTimestamp(), orderedCommit); } stdx::lock_guard<stdx::mutex> lock(_mutex); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index ec622e6ff1b..32eda65ce6b 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -158,8 +158,8 @@ void _getNextOpTimes(OperationContext* opCtx, auto ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp(); lastSetTimestamp = ts; newTimestampNotifier.notify_all(); - - fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts)); + const bool orderedCommit = false; + fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit)); for (std::size_t i = 0; i < count; i++) { slotsOut[i].opTime = {Timestamp(ts.asULL() + i), term}; diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index be79c228483..a65dd305fac 100644 --- 
a/src/mongo/db/repl/storage_interface.h +++ b/src/mongo/db/repl/storage_interface.h @@ -371,6 +371,16 @@ public: * batch. */ virtual void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) = 0; + + /** + * Registers a timestamp with the storage engine so that it can enforce oplog visibility rules. + * orderedCommit - specifies whether the timestamp provided is ordered w.r.t. commits; that is, + * all commits with older timestamps have already occurred, and any commits with newer + * timestamps have not yet occurred. + */ + virtual void oplogDiskLocRegister(OperationContext* opCtx, + const Timestamp& ts, + bool orderedCommit) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 58a6eaf21f2..71442a94c57 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -1096,5 +1096,13 @@ void StorageInterfaceImpl::waitForAllEarlierOplogWritesToBeVisible(OperationCont oplog.getCollection()->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx); } +void StorageInterfaceImpl::oplogDiskLocRegister(OperationContext* opCtx, + const Timestamp& ts, + bool orderedCommit) { + AutoGetCollection oplog(opCtx, NamespaceString::kRsOplogNamespace, MODE_IS); + fassert( + 28557, + oplog.getCollection()->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit)); +} } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 761875dd0bf..4d0b6f32db9 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -168,6 +168,9 @@ public: Status isAdminDbValid(OperationContext* opCtx) override; void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) override; + void oplogDiskLocRegister(OperationContext* opCtx, + const Timestamp& ts, + bool orderedCommit) override; private: const 
NamespaceString _rollbackIdNss; diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index b2b3d22f276..2e36f4a78cc 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -305,6 +305,12 @@ public: return; } + void oplogDiskLocRegister(OperationContext* opCtx, + const Timestamp& ts, + bool orderedCommit) override { + return; + } + // Testing functions. CreateCollectionForBulkFn createCollectionForBulkFn = [](const NamespaceString& nss, diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 6c95ca70e47..7e1e5b5d3ae 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -900,7 +900,12 @@ void SyncTail::oplogApplication(OplogBuffer* oplogBuffer, ReplicationCoordinator << lastAppliedOpTimeAtEndOfBatch.toString() << " in the middle of batch application"); - // 4. Finalize this batch. We are at a consistent optime if our current optime is >= the + // 4. Update oplog visibility by notifying the storage engine of the new oplog entries. + const bool orderedCommit = true; + StorageInterface::get(&opCtx)->oplogDiskLocRegister( + &opCtx, lastOpTimeInBatch.getTimestamp(), orderedCommit); + + // 5. Finalize this batch. We are at a consistent optime if our current optime is >= the // current 'minValid' optime. auto consistency = (lastOpTimeInBatch >= minValid) ? 
ReplicationCoordinator::DataConsistency::Consistent diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h index ad6f9091295..87d46bef8a9 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h @@ -74,6 +74,8 @@ public: return SnapshotId(); } + virtual void setOrderedCommit(bool orderedCommit) {} + private: typedef std::shared_ptr<Change> ChangePtr; typedef std::vector<ChangePtr> Changes; diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h index 52f717d29b2..b2c6dc0f20c 100644 --- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h +++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h @@ -65,6 +65,8 @@ public: return SnapshotId(); } + virtual void setOrderedCommit(bool orderedCommit) {} + private: /** * Marks writes for journaling, if enabled, and then commits all other Changes in order. 
diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h index c5d5299a9b7..507f16c8ec0 100644 --- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h +++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h @@ -208,6 +208,8 @@ public: return SnapshotId(); } + virtual void setOrderedCommit(bool orderedCommit) {} + // ----------------------- void notifyInsert(HeapRecordStoreBtree* rs, const RecordId& loc); diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.h b/src/mongo/db/storage/mobile/mobile_recovery_unit.h index e6cf5c9415c..a919276351f 100644 --- a/src/mongo/db/storage/mobile/mobile_recovery_unit.h +++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.h @@ -87,6 +87,8 @@ public: return checked_cast<MobileRecoveryUnit*>(opCtx->recoveryUnit()); } + void setOrderedCommit(bool orderedCommit) override {} + private: void _abort(); void _commit(); diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index c84dbdcf1f3..e48297d530b 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -624,8 +624,15 @@ public: * * Since this is called inside of a WriteUnitOfWork while holding a std::mutex, it is * illegal to acquire any LockManager locks inside of this function. - */ - virtual Status oplogDiskLocRegister(OperationContext* opCtx, const Timestamp& opTime) { + * + * If `orderedCommit` is true, the storage engine can assume the input `opTime` has become + * visible in the oplog. Otherwise the storage engine must continue to maintain its own + * visibility management. Calls with `orderedCommit` true will not be concurrent with calls of + * `orderedCommit` false. 
+ */ + virtual Status oplogDiskLocRegister(OperationContext* opCtx, + const Timestamp& opTime, + bool orderedCommit) { return Status::OK(); } diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index 857885fbf18..0ac797e17c3 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -305,6 +305,8 @@ public: */ virtual void setRollbackWritesDisabled() = 0; + virtual void setOrderedCommit(bool orderedCommit) = 0; + protected: RecoveryUnit() {} repl::ReplicationCoordinator::Mode _replicationMode = repl::ReplicationCoordinator::modeNone; diff --git a/src/mongo/db/storage/recovery_unit_noop.h b/src/mongo/db/storage/recovery_unit_noop.h index d8f7e69adb1..0dbc41de3d5 100644 --- a/src/mongo/db/storage/recovery_unit_noop.h +++ b/src/mongo/db/storage/recovery_unit_noop.h @@ -80,6 +80,8 @@ public: return SnapshotId(); } + virtual void setOrderedCommit(bool orderedCommit) {} + private: std::vector<std::unique_ptr<Change>> _changes; }; diff --git a/src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp b/src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp index 90de4f17d61..d85d6e35e16 100644 --- a/src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp +++ b/src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp @@ -41,7 +41,7 @@ StatusWith<RecordId> insertBSON(ServiceContext::UniqueOperationContext& opCtx, const Timestamp& opTime) { BSONObj obj = BSON("ts" << opTime); WriteUnitOfWork wuow(opCtx.get()); - Status status = rs->oplogDiskLocRegister(opCtx.get(), opTime); + Status status = rs->oplogDiskLocRegister(opCtx.get(), opTime, false); if (!status.isOK()) return StatusWith<RecordId>(status); StatusWith<RecordId> res = @@ -55,7 +55,7 @@ RecordId _oplogOrderInsertOplog(OperationContext* opCtx, const std::unique_ptr<RecordStore>& rs, int inc) { Timestamp opTime = Timestamp(5, inc); - Status status = rs->oplogDiskLocRegister(opCtx, opTime); + Status status = 
rs->oplogDiskLocRegister(opCtx, opTime, false); ASSERT_OK(status); BSONObj obj = BSON("ts" << opTime); StatusWith<RecordId> res = rs->insertRecord(opCtx, obj.objdata(), obj.objsize(), opTime, false); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp index cd541d262f0..577e507bcbc 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp @@ -222,8 +222,11 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop( sessionCache->waitUntilDurable(/*forceCheckpoint=*/false, false); lk.lock(); - // Publish the new timestamp value. - _setOplogReadTimestamp(lk, newTimestamp); + // Publish the new timestamp value. Avoid going backward. + auto oldTimestamp = getOplogReadTimestamp(); + if (newTimestamp > oldTimestamp) { + _setOplogReadTimestamp(lk, newTimestamp); + } lk.unlock(); // Wake up any await_data cursors and tell them more data might be visible now. diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 1aaf4f83d94..33ef2418c72 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -1145,8 +1145,12 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx, if (timestamps[i].isNull() && _isOplog) { // If the timestamp is 0, that probably means someone inserted a document directly // into the oplog. In this case, use the RecordId as the timestamp, since they are - // one and the same. + // one and the same. Setting this transaction to be unordered will trigger a journal + // flush. Because these are direct writes into the oplog, the machinery to trigger a + // journal flush is bypassed. A followup oplog read will require a fresh visibility + // value to make progress. 
ts = Timestamp(record.id.repr()); + opCtx->recoveryUnit()->setOrderedCommit(false); } else { ts = timestamps[i]; } @@ -1684,11 +1688,19 @@ void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* opCtx, } Status WiredTigerRecordStore::oplogDiskLocRegister(OperationContext* opCtx, - const Timestamp& opTime) { - // This labels the current transaction with a timestamp. - // This is required for oplog visibility to work correctly, as WiredTiger uses the transaction - // list to determine where there are holes in the oplog. - return opCtx->recoveryUnit()->setTimestamp(opTime); + const Timestamp& ts, + bool orderedCommit) { + opCtx->recoveryUnit()->setOrderedCommit(orderedCommit); + if (!orderedCommit) { + // This labels the current transaction with a timestamp. + // This is required for oplog visibility to work correctly, as WiredTiger uses the + // transaction list to determine where there are holes in the oplog. + return opCtx->recoveryUnit()->setTimestamp(ts); + } + // This handles non-primary (secondary) state behavior; we simply set the oplog visibility read + // timestamp here, as there cannot be visible holes prior to the opTime passed in. 
+ _kvEngine->getOplogManager()->setOplogReadTimestamp(ts); + return Status::OK(); } // Cursor Base: diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index 80b3b44af47..ddec68527c4 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -213,7 +213,9 @@ public: virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx, const RecordId& startingPosition) const; - virtual Status oplogDiskLocRegister(OperationContext* opCtx, const Timestamp& opTime); + virtual Status oplogDiskLocRegister(OperationContext* opCtx, + const Timestamp& opTime, + bool orderedCommit); virtual void updateStatsAfterRepair(OperationContext* opCtx, long long numRecords, diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp index 230d1fc28b6..253b44eb6c4 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp @@ -212,7 +212,7 @@ StatusWith<RecordId> insertBSON(ServiceContext::UniqueOperationContext& opCtx, WriteUnitOfWork wuow(opCtx.get()); WiredTigerRecordStore* wrs = checked_cast<WiredTigerRecordStore*>(rs.get()); invariant(wrs); - Status status = wrs->oplogDiskLocRegister(opCtx.get(), opTime); + Status status = wrs->oplogDiskLocRegister(opCtx.get(), opTime, false); if (!status.isOK()) return StatusWith<RecordId>(status); StatusWith<RecordId> res = @@ -265,7 +265,7 @@ RecordId _oplogOrderInsertOplog(OperationContext* opCtx, const unique_ptr<RecordStore>& rs, int inc) { Timestamp opTime = Timestamp(5, inc); - Status status = rs->oplogDiskLocRegister(opCtx, opTime); + Status status = rs->oplogDiskLocRegister(opCtx, opTime, false); ASSERT_OK(status); BSONObj obj = BSON("ts" << opTime); StatusWith<RecordId> res = rs->insertRecord(opCtx, 
obj.objdata(), obj.objsize(), opTime, false); @@ -424,7 +424,7 @@ StatusWith<RecordId> insertBSONWithSize(OperationContext* opCtx, WriteUnitOfWork wuow(opCtx); WiredTigerRecordStore* wtrs = checked_cast<WiredTigerRecordStore*>(rs); invariant(wtrs); - Status status = wtrs->oplogDiskLocRegister(opCtx, opTime); + Status status = wtrs->oplogDiskLocRegister(opCtx, opTime, false); if (!status.isOK()) { return StatusWith<RecordId>(status); } diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index 5fe19158f9e..2e38a052b8b 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -207,7 +207,12 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) { } if (_isTimestamped) { - _oplogManager->triggerJournalFlush(); + if (!_orderedCommit) { + // We only need to update oplog visibility where commits can be out-of-order with + // respect to their assigned optime and such commits might otherwise be visible. + // This should happen only on primary nodes. + _oplogManager->triggerJournalFlush(); + } _isTimestamped = false; } invariantWTOK(wtRet); @@ -215,6 +220,7 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) { _active = false; _mySnapshotId = nextSnapshotId.fetchAndAdd(1); _isOplogReader = false; + _orderedCommit = true; // Default value is true; we assume all writes are ordered. 
} SnapshotId WiredTigerRecoveryUnit::getSnapshotId() const { diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h index 5207cab95ab..19b110a9f1e 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h @@ -93,6 +93,10 @@ public: void setRollbackWritesDisabled() override {} + virtual void setOrderedCommit(bool orderedCommit) override { + _orderedCommit = orderedCommit; + } + // ---- WT STUFF WiredTigerSession* getSession(); @@ -148,6 +152,9 @@ private: bool _inUnitOfWork; bool _active; bool _isTimestamped = false; + // Commits are assumed ordered. Unordered commits are assumed to always need to reserve a + // new optime, and thus always call oplogDiskLocRegister() on the record store. + bool _orderedCommit = true; Timestamp _commitTimestamp; uint64_t _mySnapshotId; Timestamp _majorityCommittedSnapshot; |