summary | refs | log | tree | commit | diff
path: root/src/mongo/db
diff options
context:
space:
mode:
author: Judah Schvimer <judah@mongodb.com> 2018-03-20 15:30:00 -0400
committer: Judah Schvimer <judah@mongodb.com> 2018-03-20 15:30:00 -0400
commit: 2ee6908f1c73dd50d6425e3462ccac2582deb2f3 (patch)
tree: f62b6719c1f353a26bca9c91644fe490542835d7 /src/mongo/db
parent: 56b93cc67319cfb85fc8fdae36549bcbc0701065 (diff)
download: mongo-2ee6908f1c73dd50d6425e3462ccac2582deb2f3.tar.gz
SERVER-33743 Use all_committed to set lastApplied on primary nodes
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- src/mongo/db/repl/oplog.cpp 22
-rw-r--r-- src/mongo/db/storage/devnull/devnull_kv_engine.h 4
-rw-r--r-- src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h 4
-rw-r--r-- src/mongo/db/storage/kv/kv_engine.h 5
-rw-r--r-- src/mongo/db/storage/kv/kv_storage_engine.cpp 4
-rw-r--r-- src/mongo/db/storage/kv/kv_storage_engine.h 2
-rw-r--r-- src/mongo/db/storage/mmap_v1/mmap_v1_engine.h 4
-rw-r--r-- src/mongo/db/storage/mobile/mobile_kv_engine.h 4
-rw-r--r-- src/mongo/db/storage/storage_engine.h 8
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp 22
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h 2
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp 4
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h 4
13 files changed, 79 insertions, 10 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index f0ea23b3b22..6ba0e8b3154 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -377,9 +377,29 @@ void _logOpsInner(OperationContext* opCtx,
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
opCtx->recoveryUnit()->onCommit([opCtx, replCoord, finalOpTime] {
+
+ auto lastAppliedTimestamp = finalOpTime.getTimestamp();
+ const auto storageEngine = opCtx->getServiceContext()->getGlobalStorageEngine();
+ if (storageEngine->supportsDocLocking()) {
+ // If the storage engine supports document level locking, then it is possible for
+ // oplog writes to commit out of order. In that case, we only want to set our last
+ // applied optime to the all committed timestamp to ensure that all operations earlier
+ // than the last applied optime have been storage-committed. We are guaranteed that
+ // whatever operation occurred at the all committed timestamp occurred during the same
+ // term as 'finalOpTime'. When a primary enters a new term, it first commits a
+ // 'new primary' oplog entry in the new term before accepting any new writes. This
+ // will ensure that the all committed timestamp is in the new term before any client
+ // writes are committed.
+ lastAppliedTimestamp = storageEngine->getAllCommittedTimestamp(opCtx);
+ }
+
// Optimes on the primary should always represent consistent database states.
replCoord->setMyLastAppliedOpTimeForward(
- finalOpTime, ReplicationCoordinator::DataConsistency::Consistent);
+ OpTime(lastAppliedTimestamp, finalOpTime.getTerm()),
+ ReplicationCoordinator::DataConsistency::Consistent);
+
+ // We set the last op on the client to 'finalOpTime', because that contains the timestamp
+ // of the operation that the client actually performed.
ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime);
});
}
diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.h b/src/mongo/db/storage/devnull/devnull_kv_engine.h
index 85fb6b80a86..2f6d1ca75f2 100644
--- a/src/mongo/db/storage/devnull/devnull_kv_engine.h
+++ b/src/mongo/db/storage/devnull/devnull_kv_engine.h
@@ -112,6 +112,10 @@ public:
void setJournalListener(JournalListener* jl) final {}
+ virtual Timestamp getAllCommittedTimestamp(OperationContext* opCtx) const override {
+ return Timestamp();
+ }
+
private:
std::shared_ptr<void> _catalogInfo;
};
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h
index f2544ba5d1d..6574ff10beb 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_engine.h
@@ -110,6 +110,10 @@ public:
_journalListener = jl;
}
+ virtual Timestamp getAllCommittedTimestamp(OperationContext* opCtx) const override {
+ MONGO_UNREACHABLE;
+ }
+
private:
typedef StringMap<std::shared_ptr<void>> DataMap;
diff --git a/src/mongo/db/storage/kv/kv_engine.h b/src/mongo/db/storage/kv/kv_engine.h
index 4d5f65f3418..0e989c7e0a0 100644
--- a/src/mongo/db/storage/kv/kv_engine.h
+++ b/src/mongo/db/storage/kv/kv_engine.h
@@ -283,6 +283,11 @@ public:
}
/**
+ * See `StorageEngine::getAllCommittedTimestamp`
+ */
+ virtual Timestamp getAllCommittedTimestamp(OperationContext* opCtx) const = 0;
+
+ /**
* See `StorageEngine::supportsReadConcernSnapshot`
*/
virtual bool supportsReadConcernSnapshot() const {
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.cpp b/src/mongo/db/storage/kv/kv_storage_engine.cpp
index 19cbded2aec..0730ed8e597 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.cpp
+++ b/src/mongo/db/storage/kv/kv_storage_engine.cpp
@@ -602,4 +602,8 @@ bool KVStorageEngine::supportsReadConcernSnapshot() const {
void KVStorageEngine::replicationBatchIsComplete() const {
return _engine->replicationBatchIsComplete();
}
+
+Timestamp KVStorageEngine::getAllCommittedTimestamp(OperationContext* opCtx) const {
+ return _engine->getAllCommittedTimestamp(opCtx);
+}
} // namespace mongo
diff --git a/src/mongo/db/storage/kv/kv_storage_engine.h b/src/mongo/db/storage/kv/kv_storage_engine.h
index 6d1f5b3a489..50b4f23e65d 100644
--- a/src/mongo/db/storage/kv/kv_storage_engine.h
+++ b/src/mongo/db/storage/kv/kv_storage_engine.h
@@ -125,6 +125,8 @@ public:
virtual boost::optional<Timestamp> getRecoveryTimestamp() const override;
+ virtual Timestamp getAllCommittedTimestamp(OperationContext* opCtx) const override;
+
bool supportsReadConcernSnapshot() const final;
virtual void replicationBatchIsComplete() const override;
diff --git a/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h b/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h
index 0d7c6b3711e..afc3472cbcf 100644
--- a/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h
+++ b/src/mongo/db/storage/mmap_v1/mmap_v1_engine.h
@@ -104,6 +104,10 @@ public:
void setJournalListener(JournalListener* jl) final;
+ Timestamp getAllCommittedTimestamp(OperationContext* opCtx) const override {
+ MONGO_UNREACHABLE;
+ }
+
private:
static void _listDatabases(const std::string& directory, std::vector<std::string>* out);
diff --git a/src/mongo/db/storage/mobile/mobile_kv_engine.h b/src/mongo/db/storage/mobile/mobile_kv_engine.h
index fc3f767b6a4..c6c577d5c08 100644
--- a/src/mongo/db/storage/mobile/mobile_kv_engine.h
+++ b/src/mongo/db/storage/mobile/mobile_kv_engine.h
@@ -116,6 +116,10 @@ public:
_journalListener = jl;
}
+ virtual Timestamp getAllCommittedTimestamp(OperationContext* opCtx) const override {
+ MONGO_UNREACHABLE;
+ }
+
private:
mutable stdx::mutex _mutex;
void _initDBPath(const std::string& path);
diff --git a/src/mongo/db/storage/storage_engine.h b/src/mongo/db/storage/storage_engine.h
index c3b02246013..5a75f792f27 100644
--- a/src/mongo/db/storage/storage_engine.h
+++ b/src/mongo/db/storage/storage_engine.h
@@ -382,6 +382,14 @@ public:
return std::vector<CollectionIndexNamePair>();
};
+ /**
+ * Returns the all committed timestamp. All transactions with timestamps earlier than the
+ * all committed timestamp are committed. Only storage engines that support document level
+ * locking must provide an implementation. Other storage engines may provide a no-op
+ * implementation.
+ */
+ virtual Timestamp getAllCommittedTimestamp(OperationContext* opCtx) const = 0;
+
protected:
/**
* The destructor will never be called. See cleanShutdown instead.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
index 5f699c1ff48..06c7bbcf329 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.cpp
@@ -1028,15 +1028,6 @@ void WiredTigerKVEngine::setStableTimestamp(Timestamp stableTimestamp) {
return;
}
- const auto oplogReadTimestamp = Timestamp(_oplogManager->getOplogReadTimestamp());
- if (!oplogReadTimestamp.isNull() && stableTimestamp > oplogReadTimestamp) {
- // When a replica set has one voting node, replication can advance the commit point ahead
- // of the current oplog read visibility. Allowing that to happen in storage can result in
- // logically previous transactions trying to commit behind this updated stable
- // timestamp. Instead, pin the stable timestamp to the oplog read timestamp.
- stableTimestamp = oplogReadTimestamp;
- }
-
const bool keepOldBehavior = true;
// Communicate to WiredTiger what the "stable timestamp" is. Timestamp-aware checkpoints will
// only persist to disk transactions committed with a timestamp earlier than the "stable
@@ -1072,6 +1063,15 @@ void WiredTigerKVEngine::_setOldestTimestamp(Timestamp oldestTimestamp, bool for
// Nothing to set yet.
return;
}
+ const auto oplogReadTimestamp = Timestamp(_oplogManager->getOplogReadTimestamp());
+ if (!force && !oplogReadTimestamp.isNull() && oldestTimestamp > oplogReadTimestamp) {
+ // Oplog visibility is updated asynchronously from replication updating the commit point.
+ // When force is not set, lag the `oldest_timestamp` to the possibly stale oplog read
+ // timestamp value. This guarantees an oplog reader's `read_timestamp` can always
+ // be serviced. When force is set, we respect the caller's request and do not lag the
+ // oldest timestamp.
+ oldestTimestamp = oplogReadTimestamp;
+ }
char oldestTSConfigString["force=true,oldest_timestamp=,commit_timestamp="_sd.size() +
(2 * 8 * 2) /* 2 timestamps of 16 hexadecimal digits each */ +
@@ -1143,6 +1143,10 @@ StatusWith<Timestamp> WiredTigerKVEngine::recoverToStableTimestamp() {
"WT does not support recover to stable timestamp yet.");
}
+Timestamp WiredTigerKVEngine::getAllCommittedTimestamp(OperationContext* opCtx) const {
+ return Timestamp(_oplogManager->fetchAllCommittedValue(opCtx));
+}
+
boost::optional<Timestamp> WiredTigerKVEngine::getRecoveryTimestamp() const {
if (!supportsRecoverToStableTimestamp()) {
severe() << "WiredTiger is configured to not support recover to a stable timestamp";
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
index 9ad010a0957..146bce6e702 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h
@@ -180,6 +180,8 @@ public:
virtual boost::optional<Timestamp> getRecoveryTimestamp() const override;
+ virtual Timestamp getAllCommittedTimestamp(OperationContext* opCtx) const override;
+
bool supportsReadConcernSnapshot() const final;
// wiredtiger specific
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
index 577e507bcbc..19b37e4274a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.cpp
@@ -249,6 +249,10 @@ void WiredTigerOplogManager::_setOplogReadTimestamp(WithLock, uint64_t newTimest
LOG(2) << "setting new oplogReadTimestamp: " << newTimestamp;
}
+uint64_t WiredTigerOplogManager::fetchAllCommittedValue(OperationContext* opCtx) {
+ return _fetchAllCommittedValue(WiredTigerRecoveryUnit::get(opCtx)->getSessionCache()->conn());
+}
+
uint64_t WiredTigerOplogManager::_fetchAllCommittedValue(WT_CONNECTION* conn) {
// Fetch the latest all_committed value from the storage engine. This value will be a
// timestamp that has no holes (uncommitted transactions with lower timestamps) behind it.
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h
index ac3bdc03c2c..cfa444d0b37 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_oplog_manager.h
@@ -78,6 +78,10 @@ public:
void waitForAllEarlierOplogWritesToBeVisible(const WiredTigerRecordStore* oplogRecordStore,
OperationContext* opCtx) const;
+ // Returns the all committed timestamp. All transactions with timestamps earlier than the
+ // all committed timestamp are committed.
+ uint64_t fetchAllCommittedValue(OperationContext* opCtx);
+
private:
void _oplogJournalThreadLoop(WiredTigerSessionCache* sessionCache,
WiredTigerRecordStore* oplogRecordStore) noexcept;