summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- jstests/noPassthroughWithMongod/query_oplogreplay.js 2
-rw-r--r-- src/mongo/db/repl/initial_syncer.cpp 3
-rw-r--r-- src/mongo/db/repl/oplog.cpp 4
-rw-r--r-- src/mongo/db/repl/storage_interface.h 10
-rw-r--r-- src/mongo/db/repl/storage_interface_impl.cpp 8
-rw-r--r-- src/mongo/db/repl/storage_interface_impl.h 3
-rw-r--r-- src/mongo/db/repl/storage_interface_mock.h 6
-rw-r--r-- src/mongo/db/repl/sync_tail.cpp 7
-rw-r--r-- src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h 2
-rw-r--r-- src/mongo/db/storage/mmap_v1/dur_recovery_unit.h 2
-rw-r--r-- src/mongo/db/storage/mmap_v1/heap_record_store_btree.h 2
-rw-r--r-- src/mongo/db/storage/mobile/mobile_recovery_unit.h 2
-rw-r--r-- src/mongo/db/storage/record_store.h 11
-rw-r--r-- src/mongo/db/storage/recovery_unit.h 2
-rw-r--r-- src/mongo/db/storage/recovery_unit_noop.h 2
-rw-r--r-- src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp 4
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp 7
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp 24
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h 4
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp 6
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp 8
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h 7
22 files changed, 106 insertions, 20 deletions
diff --git a/jstests/noPassthroughWithMongod/query_oplogreplay.js b/jstests/noPassthroughWithMongod/query_oplogreplay.js
index a06a7bc2c8e..36e286f917d 100644
--- a/jstests/noPassthroughWithMongod/query_oplogreplay.js
+++ b/jstests/noPassthroughWithMongod/query_oplogreplay.js
@@ -199,9 +199,11 @@
assert(!cursor.hasNext());
}
+ jsTestLog("Non-oplog.");
// Test that oplog replay on a non-oplog collection succeeds.
test(db.jstests_query_oplogreplay);
+ jsTestLog("Oplog.");
// Test that oplog replay on the actual oplog succeeds.
test(db.getSiblingDB("local").oplog.jstests_query_oplogreplay);
diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp
index 785dfa51864..7fac9df71c1 100644
--- a/src/mongo/db/repl/initial_syncer.cpp
+++ b/src/mongo/db/repl/initial_syncer.cpp
@@ -901,6 +901,9 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp(
onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status);
return;
}
+ const bool orderedCommit = true;
+ _storage->oplogDiskLocRegister(
+ opCtx.get(), optimeWithHash.opTime.getTimestamp(), orderedCommit);
}
stdx::lock_guard<stdx::mutex> lock(_mutex);
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index ec622e6ff1b..32eda65ce6b 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -158,8 +158,8 @@ void _getNextOpTimes(OperationContext* opCtx,
auto ts = LogicalClock::get(opCtx)->reserveTicks(count).asTimestamp();
lastSetTimestamp = ts;
newTimestampNotifier.notify_all();
-
- fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts));
+ const bool orderedCommit = false;
+ fassert(28560, oplog->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit));
for (std::size_t i = 0; i < count; i++) {
slotsOut[i].opTime = {Timestamp(ts.asULL() + i), term};
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index be79c228483..a65dd305fac 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -371,6 +371,16 @@ public:
* batch.
*/
virtual void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) = 0;
+
+ /**
+ * Registers a timestamp with the storage engine so that it can enforce oplog visibility rules.
+ * orderedCommit - specifies whether the timestamp provided is ordered w.r.t. commits; that is,
+ * all commits with older timestamps have already occurred, and any commits with newer
+ * timestamps have not yet occurred.
+ */
+ virtual void oplogDiskLocRegister(OperationContext* opCtx,
+ const Timestamp& ts,
+ bool orderedCommit) = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 58a6eaf21f2..71442a94c57 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -1096,5 +1096,13 @@ void StorageInterfaceImpl::waitForAllEarlierOplogWritesToBeVisible(OperationCont
oplog.getCollection()->getRecordStore()->waitForAllEarlierOplogWritesToBeVisible(opCtx);
}
+void StorageInterfaceImpl::oplogDiskLocRegister(OperationContext* opCtx,
+ const Timestamp& ts,
+ bool orderedCommit) {
+ AutoGetCollection oplog(opCtx, NamespaceString::kRsOplogNamespace, MODE_IS);
+ fassert(
+ 28557,
+ oplog.getCollection()->getRecordStore()->oplogDiskLocRegister(opCtx, ts, orderedCommit));
+}
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 761875dd0bf..4d0b6f32db9 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -168,6 +168,9 @@ public:
Status isAdminDbValid(OperationContext* opCtx) override;
void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) override;
+ void oplogDiskLocRegister(OperationContext* opCtx,
+ const Timestamp& ts,
+ bool orderedCommit) override;
private:
const NamespaceString _rollbackIdNss;
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index b2b3d22f276..2e36f4a78cc 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -305,6 +305,12 @@ public:
return;
}
+ void oplogDiskLocRegister(OperationContext* opCtx,
+ const Timestamp& ts,
+ bool orderedCommit) override {
+ return;
+ }
+
// Testing functions.
CreateCollectionForBulkFn createCollectionForBulkFn =
[](const NamespaceString& nss,
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 6c95ca70e47..7e1e5b5d3ae 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -900,7 +900,12 @@ void SyncTail::oplogApplication(OplogBuffer* oplogBuffer, ReplicationCoordinator
<< lastAppliedOpTimeAtEndOfBatch.toString()
<< " in the middle of batch application");
- // 4. Finalize this batch. We are at a consistent optime if our current optime is >= the
+ // 4. Update oplog visibility by notifying the storage engine of the new oplog entries.
+ const bool orderedCommit = true;
+ StorageInterface::get(&opCtx)->oplogDiskLocRegister(
+ &opCtx, lastOpTimeInBatch.getTimestamp(), orderedCommit);
+
+ // 5. Finalize this batch. We are at a consistent optime if our current optime is >= the
// current 'minValid' optime.
auto consistency = (lastOpTimeInBatch >= minValid)
? ReplicationCoordinator::DataConsistency::Consistent
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
index ad6f9091295..87d46bef8a9 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
@@ -74,6 +74,8 @@ public:
return SnapshotId();
}
+ virtual void setOrderedCommit(bool orderedCommit) {}
+
private:
typedef std::shared_ptr<Change> ChangePtr;
typedef std::vector<ChangePtr> Changes;
diff --git a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h
index 52f717d29b2..b2c6dc0f20c 100644
--- a/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h
+++ b/src/mongo/db/storage/mmap_v1/dur_recovery_unit.h
@@ -65,6 +65,8 @@ public:
return SnapshotId();
}
+ virtual void setOrderedCommit(bool orderedCommit) {}
+
private:
/**
* Marks writes for journaling, if enabled, and then commits all other Changes in order.
diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h
index c5d5299a9b7..507f16c8ec0 100644
--- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h
+++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h
@@ -208,6 +208,8 @@ public:
return SnapshotId();
}
+ virtual void setOrderedCommit(bool orderedCommit) {}
+
// -----------------------
void notifyInsert(HeapRecordStoreBtree* rs, const RecordId& loc);
diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.h b/src/mongo/db/storage/mobile/mobile_recovery_unit.h
index e6cf5c9415c..a919276351f 100644
--- a/src/mongo/db/storage/mobile/mobile_recovery_unit.h
+++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.h
@@ -87,6 +87,8 @@ public:
return checked_cast<MobileRecoveryUnit*>(opCtx->recoveryUnit());
}
+ void setOrderedCommit(bool orderedCommit) override {}
+
private:
void _abort();
void _commit();
diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h
index c84dbdcf1f3..e48297d530b 100644
--- a/src/mongo/db/storage/record_store.h
+++ b/src/mongo/db/storage/record_store.h
@@ -624,8 +624,15 @@ public:
*
* Since this is called inside of a WriteUnitOfWork while holding a std::mutex, it is
* illegal to acquire any LockManager locks inside of this function.
- */
- virtual Status oplogDiskLocRegister(OperationContext* opCtx, const Timestamp& opTime) {
+ *
+ * If `orderedCommit` is true, the storage engine can assume the input `opTime` has become
+ * visible in the oplog. Otherwise the storage engine must continue to maintain its own
+ * visibility management. Calls with `orderedCommit` true will not be concurrent with calls of
+ * `orderedCommit` false.
+ */
+ virtual Status oplogDiskLocRegister(OperationContext* opCtx,
+ const Timestamp& opTime,
+ bool orderedCommit) {
return Status::OK();
}
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index 857885fbf18..0ac797e17c3 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -305,6 +305,8 @@ public:
*/
virtual void setRollbackWritesDisabled() = 0;
+ virtual void setOrderedCommit(bool orderedCommit) = 0;
+
protected:
RecoveryUnit() {}
repl::ReplicationCoordinator::Mode _replicationMode = repl::ReplicationCoordinator::modeNone;
diff --git a/src/mongo/db/storage/recovery_unit_noop.h b/src/mongo/db/storage/recovery_unit_noop.h
index d8f7e69adb1..0dbc41de3d5 100644
--- a/src/mongo/db/storage/recovery_unit_noop.h
+++ b/src/mongo/db/storage/recovery_unit_noop.h
@@ -80,6 +80,8 @@ public:
return SnapshotId();
}
+ virtual void setOrderedCommit(bool orderedCommit) {}
+
private:
std::vector<std::unique_ptr<Change>> _changes;
};
diff --git a/src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp b/src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp
index 90de4f17d61..d85d6e35e16 100644
--- a/src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp
+++ b/src/mongo/db/storage/wiredtiger/record_store_test_oplog.cpp
@@ -41,7 +41,7 @@ StatusWith<RecordId> insertBSON(ServiceContext::UniqueOperationContext& opCtx,
const Timestamp& opTime) {
BSONObj obj = BSON("ts" << opTime);
WriteUnitOfWork wuow(opCtx.get());
- Status status = rs->oplogDiskLocRegister(opCtx.get(), opTime);
+ Status status = rs->oplogDiskLocRegister(opCtx.get(), opTime, false);
if (!status.isOK())
return StatusWith<RecordId>(status);
StatusWith<RecordId> res =
@@ -55,7 +55,7 @@ RecordId _oplogOrderInsertOplog(OperationContext* opCtx,
const std::unique_ptr<RecordStore>& rs,
int inc) {
Timestamp opTime = Timestamp(5, inc);
- Status status = rs->oplogDiskLocRegister(opCtx, opTime);
+ Status status = rs->oplogDiskLocRegister(opCtx, opTime, false);
ASSERT_OK(status);
BSONObj obj = BSON("ts" << opTime);
StatusWith<RecordId> res = rs->insertRecord(opCtx, obj.objdata(), obj.objsize(), opTime, false);
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
index cd541d262f0..577e507bcbc 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
@@ -222,8 +222,11 @@ void WiredTigerOplogManager::_oplogJournalThreadLoop(
sessionCache->waitUntilDurable(/*forceCheckpoint=*/false, false);
lk.lock();
- // Publish the new timestamp value.
- _setOplogReadTimestamp(lk, newTimestamp);
+ // Publish the new timestamp value. Avoid going backward.
+ auto oldTimestamp = getOplogReadTimestamp();
+ if (newTimestamp > oldTimestamp) {
+ _setOplogReadTimestamp(lk, newTimestamp);
+ }
lk.unlock();
// Wake up any await_data cursors and tell them more data might be visible now.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 1aaf4f83d94..33ef2418c72 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -1145,8 +1145,12 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
if (timestamps[i].isNull() && _isOplog) {
// If the timestamp is 0, that probably means someone inserted a document directly
// into the oplog. In this case, use the RecordId as the timestamp, since they are
- // one and the same.
+ // one and the same. Setting this transaction to be unordered will trigger a journal
+ // flush. Because these are direct writes into the oplog, the machinery to trigger a
+ // journal flush is bypassed. A followup oplog read will require a fresh visibility
+ // value to make progress.
ts = Timestamp(record.id.repr());
+ opCtx->recoveryUnit()->setOrderedCommit(false);
} else {
ts = timestamps[i];
}
@@ -1684,11 +1688,19 @@ void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* opCtx,
}
Status WiredTigerRecordStore::oplogDiskLocRegister(OperationContext* opCtx,
- const Timestamp& opTime) {
- // This labels the current transaction with a timestamp.
- // This is required for oplog visibility to work correctly, as WiredTiger uses the transaction
- // list to determine where there are holes in the oplog.
- return opCtx->recoveryUnit()->setTimestamp(opTime);
+ const Timestamp& ts,
+ bool orderedCommit) {
+ opCtx->recoveryUnit()->setOrderedCommit(orderedCommit);
+ if (!orderedCommit) {
+ // This labels the current transaction with a timestamp.
+ // This is required for oplog visibility to work correctly, as WiredTiger uses the
+ // transaction list to determine where there are holes in the oplog.
+ return opCtx->recoveryUnit()->setTimestamp(ts);
+ }
+ // This handles non-primary (secondary) state behavior; we simply set the oplog visibility read
+ // timestamp here, as there cannot be visible holes prior to the opTime passed in.
+ _kvEngine->getOplogManager()->setOplogReadTimestamp(ts);
+ return Status::OK();
}
// Cursor Base:
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
index 80b3b44af47..ddec68527c4 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h
@@ -213,7 +213,9 @@ public:
virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx,
const RecordId& startingPosition) const;
- virtual Status oplogDiskLocRegister(OperationContext* opCtx, const Timestamp& opTime);
+ virtual Status oplogDiskLocRegister(OperationContext* opCtx,
+ const Timestamp& opTime,
+ bool orderedCommit);
virtual void updateStatsAfterRepair(OperationContext* opCtx,
long long numRecords,
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
index 230d1fc28b6..253b44eb6c4 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_test.cpp
@@ -212,7 +212,7 @@ StatusWith<RecordId> insertBSON(ServiceContext::UniqueOperationContext& opCtx,
WriteUnitOfWork wuow(opCtx.get());
WiredTigerRecordStore* wrs = checked_cast<WiredTigerRecordStore*>(rs.get());
invariant(wrs);
- Status status = wrs->oplogDiskLocRegister(opCtx.get(), opTime);
+ Status status = wrs->oplogDiskLocRegister(opCtx.get(), opTime, false);
if (!status.isOK())
return StatusWith<RecordId>(status);
StatusWith<RecordId> res =
@@ -265,7 +265,7 @@ RecordId _oplogOrderInsertOplog(OperationContext* opCtx,
const unique_ptr<RecordStore>& rs,
int inc) {
Timestamp opTime = Timestamp(5, inc);
- Status status = rs->oplogDiskLocRegister(opCtx, opTime);
+ Status status = rs->oplogDiskLocRegister(opCtx, opTime, false);
ASSERT_OK(status);
BSONObj obj = BSON("ts" << opTime);
StatusWith<RecordId> res = rs->insertRecord(opCtx, obj.objdata(), obj.objsize(), opTime, false);
@@ -424,7 +424,7 @@ StatusWith<RecordId> insertBSONWithSize(OperationContext* opCtx,
WriteUnitOfWork wuow(opCtx);
WiredTigerRecordStore* wtrs = checked_cast<WiredTigerRecordStore*>(rs);
invariant(wtrs);
- Status status = wtrs->oplogDiskLocRegister(opCtx, opTime);
+ Status status = wtrs->oplogDiskLocRegister(opCtx, opTime, false);
if (!status.isOK()) {
return StatusWith<RecordId>(status);
}
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 5fe19158f9e..2e38a052b8b 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -207,7 +207,12 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) {
}
if (_isTimestamped) {
- _oplogManager->triggerJournalFlush();
+ if (!_orderedCommit) {
+ // We only need to update oplog visibility where commits can be out-of-order with
+ // respect to their assigned optime and such commits might otherwise be visible.
+ // This should happen only on primary nodes.
+ _oplogManager->triggerJournalFlush();
+ }
_isTimestamped = false;
}
invariantWTOK(wtRet);
@@ -215,6 +220,7 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) {
_active = false;
_mySnapshotId = nextSnapshotId.fetchAndAdd(1);
_isOplogReader = false;
+ _orderedCommit = true; // Default value is true; we assume all writes are ordered.
}
SnapshotId WiredTigerRecoveryUnit::getSnapshotId() const {
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index 5207cab95ab..19b110a9f1e 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -93,6 +93,10 @@ public:
void setRollbackWritesDisabled() override {}
+ virtual void setOrderedCommit(bool orderedCommit) override {
+ _orderedCommit = orderedCommit;
+ }
+
// ---- WT STUFF
WiredTigerSession* getSession();
@@ -148,6 +152,9 @@ private:
bool _inUnitOfWork;
bool _active;
bool _isTimestamped = false;
+ // Commits are assumed ordered. Unordered commits are assumed to always need to reserve a
+ // new optime, and thus always call oplogDiskLocRegister() on the record store.
+ bool _orderedCommit = true;
Timestamp _commitTimestamp;
uint64_t _mySnapshotId;
Timestamp _majorityCommittedSnapshot;