summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Matthew Russotto <matthew.russotto@10gen.com> 2018-09-14 10:13:35 -0400
committer: Matthew Russotto <matthew.russotto@10gen.com> 2018-09-14 10:17:42 -0400
commit: c46faf4672c81d4801014981669d770fc65b950e (patch)
tree: 779e165fa75ed59b9c06b474f6ca5f7b6ac97f09 /src
parent: f92a1d2aaf0f4d2874f64a5e1b3c12fc66e39d4d (diff)
download: mongo-c46faf4672c81d4801014981669d770fc65b950e.tar.gz
SERVER-35821 readConcern:snapshot transactions need a read timestamp <= WT's all_committed point
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/catalog/collection_impl.cpp 19
-rw-r--r-- src/mongo/db/read_concern.cpp 6
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state.h 5
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.cpp 5
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_impl.h 1
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_mock.cpp 4
-rw-r--r-- src/mongo/db/repl/replication_coordinator_external_state_mock.h 1
-rw-r--r-- src/mongo/db/repl/replication_coordinator_impl.cpp 19
-rw-r--r-- src/mongo/db/storage/recovery_unit.h 7
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp 28
-rw-r--r-- src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h 6
-rw-r--r-- src/mongo/db/transaction_participant.cpp 8
-rw-r--r-- src/mongo/db/transaction_participant.h 8
13 files changed, 108 insertions, 9 deletions
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 4389b6b7641..b444e9bba83 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -101,6 +101,11 @@ namespace {
// Used below to fail during inserts.
MONGO_FAIL_POINT_DEFINE(failCollectionInserts);
+// Used to pause after inserting collection data and calling the opObservers. Inserts to
+// replicated collections that are not part of a multi-statement transaction will have generated
+// their OpTime and oplog entry.
+MONGO_FAIL_POINT_DEFINE(hangAfterCollectionInserts);
+
// Uses the collator factory to convert the BSON representation of a collator to a
// CollatorInterface. Returns null if the BSONObj is empty. We expect the stored collation to be
// valid, since it gets validated on collection create.
@@ -365,6 +370,20 @@ Status CollectionImpl::insertDocuments(OperationContext* opCtx,
opCtx->recoveryUnit()->onCommit(
[this](boost::optional<Timestamp>) { notifyCappedWaitersIfNeeded(); });
+ MONGO_FAIL_POINT_BLOCK(hangAfterCollectionInserts, extraData) {
+ const BSONObj& data = extraData.getData();
+ const auto collElem = data["collectionNS"];
+ // If the failpoint specifies no collection or matches the existing one, hang.
+ if (!collElem || _ns.ns() == collElem.str()) {
+ while (MONGO_FAIL_POINT(hangAfterCollectionInserts)) {
+ log() << "hangAfterCollectionInserts fail point enabled for " << _ns.toString()
+ << ". Blocking until fail point is disabled.";
+ mongo::sleepsecs(1);
+ opCtx->checkForInterrupt();
+ }
+ }
+ }
+
return Status::OK();
}
diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp
index 53372e23878..6184548f7b0 100644
--- a/src/mongo/db/read_concern.cpp
+++ b/src/mongo/db/read_concern.cpp
@@ -295,7 +295,11 @@ Status waitForReadConcern(OperationContext* opCtx,
"node needs to be a replica set member to use readConcern: snapshot"};
}
if (speculative) {
- txnParticipant->setSpeculativeTransactionOpTimeToLastApplied(opCtx);
+ txnParticipant->setSpeculativeTransactionOpTime(
+ opCtx,
+ readConcernArgs.getOriginalLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
+ ? SpeculativeTransactionOpTime::kAllCommitted
+ : SpeculativeTransactionOpTime::kLastApplied);
}
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 05b9a8e85d2..0f2ff6aafcf 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -174,6 +174,11 @@ public:
virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime) = 0;
/**
+ * Checks if the oplog exists.
+ */
+ virtual bool oplogExists(OperationContext* opCtx) = 0;
+
+ /**
* Gets the last optime of an operation performed on this host, from stable
* storage.
*/
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index cefbf8d3bd9..97dc1c9bf9f 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -621,6 +621,11 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext*
setNewTimestamp(ctx, newTime);
}
+bool ReplicationCoordinatorExternalStateImpl::oplogExists(OperationContext* opCtx) {  // Reports whether a collection is found for NamespaceString::kRsOplogNamespace.
+ AutoGetCollection oplog(opCtx, NamespaceString::kRsOplogNamespace, MODE_IS);  // Looks the oplog collection up on behalf of opCtx, requesting MODE_IS.
+ return oplog.getCollection() != nullptr;  // A null collection pointer is reported as "oplog does not exist".
+}
+
StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(
OperationContext* opCtx) {
// TODO: handle WriteConflictExceptions below
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 57969506c85..e206c02b622 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -86,6 +86,7 @@ public:
virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx);
virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote);
virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime);
+ bool oplogExists(OperationContext* opCtx) final;
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx);
virtual void closeConnections();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 614668b2a68..735cc494418 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -155,6 +155,10 @@ void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument(
void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(ServiceContext* service,
const Timestamp& newTime) {}
+bool ReplicationCoordinatorExternalStateMock::oplogExists(OperationContext* opCtx) {  // Mock implementation; opCtx is not used.
+ return true;  // The mock unconditionally reports that the oplog exists.
+}
+
StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::loadLastOpTime(
OperationContext* opCtx) {
return _lastOpTime;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index d78589bd109..115395206d2 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -76,6 +76,7 @@ public:
virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx);
virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote);
virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime);
+ bool oplogExists(OperationContext* opCtx) override;
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* opCtx);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index c7463d168f6..ed40982042b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1274,10 +1274,9 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
OpTime targetOpTime,
boost::optional<Date_t> deadline) {
if (!isMajorityCommittedRead) {
- // This assumes the read concern is "local" level.
- // We need to wait for all committed writes to be visible, even in the oplog (which uses
- // special visibility rules).
- _storage->waitForAllEarlierOplogWritesToBeVisible(opCtx);
+ if (!_externalState->oplogExists(opCtx)) {
+ return {ErrorCodes::NotYetInitialized, "The oplog does not exist."};
+ }
}
stdx::unique_lock<stdx::mutex> lock(_mutex);
@@ -1341,6 +1340,18 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
}
}
+ lock.unlock();
+
+ if (!isMajorityCommittedRead) {
+ // This assumes the read concern is "local" level.
+ // We need to wait for all committed writes to be visible, even in the oplog (which uses
+ // special visibility rules). We must do this after waiting for our target optime, because
+ // only then do we know that it will fill in all "holes" before that time. If we do it
+ // earlier, we may return when the requested optime has been reached, but other writes
+ // at optimes before that time are not yet visible.
+ _storage->waitForAllEarlierOplogWritesToBeVisible(opCtx);
+ }
+
return Status::OK();
}
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index c39807af32c..8444da4f4c8 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -160,6 +160,8 @@ public:
* - when using ReadSource::kProvided, the timestamp provided.
* - when using ReadSource::kLastAppliedSnapshot, the timestamp chosen using the storage
* engine's last applied timestamp.
+ * - when using ReadSource::kAllCommittedSnapshot, the timestamp chosen using the storage
+ * engine's all-committed timestamp.
* - when using ReadSource::kLastApplied, the last applied timestamp at which the current
* storage transaction was opened, if one is open.
* - when using ReadSource::kMajorityCommitted, the majority committed timestamp chosen by the
@@ -265,6 +267,11 @@ public:
*/
kLastAppliedSnapshot,
/**
+ * Read from the all-committed timestamp. New transactions will always read from the same
+ * timestamp and never advance.
+ */
+ kAllCommittedSnapshot,
+ /**
* Read from the timestamp provided to setTimestampReadSource.
*/
kProvided
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 1cc671dcf56..dbe8f4dc68a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -313,7 +313,8 @@ Status WiredTigerRecoveryUnit::obtainMajorityCommittedSnapshot() {
boost::optional<Timestamp> WiredTigerRecoveryUnit::getPointInTimeReadTimestamp() const {
if (_timestampReadSource == ReadSource::kProvided ||
- _timestampReadSource == ReadSource::kLastAppliedSnapshot) {
+ _timestampReadSource == ReadSource::kLastAppliedSnapshot ||
+ _timestampReadSource == ReadSource::kAllCommittedSnapshot) {
invariant(!_readAtTimestamp.isNull());
return _readAtTimestamp;
}
@@ -371,6 +372,13 @@ void WiredTigerRecoveryUnit::_txnOpen() {
}
break;
}
+ case ReadSource::kAllCommittedSnapshot: {
+ if (_readAtTimestamp.isNull()) {
+ _readAtTimestamp = _beginTransactionAtAllCommittedTimestamp(session);
+ break;
+ }
+ // Intentionally continue to the next case to read at the _readAtTimestamp.
+ }
case ReadSource::kLastAppliedSnapshot: {
// Only ever read the last applied timestamp once, and continue reusing it for
// subsequent transactions.
@@ -400,6 +408,24 @@ void WiredTigerRecoveryUnit::_txnOpen() {
_active = true;
}
+Timestamp WiredTigerRecoveryUnit::_beginTransactionAtAllCommittedTimestamp(WT_SESSION* session) {  // Begins a transaction on `session` at the all-committed timestamp and returns the read timestamp actually in effect.
+ WiredTigerBeginTxnBlock txnOpen(session, _ignorePrepared);  // NOTE(review): presumably begins the WT transaction here and undoes it unless done() is reached - confirm.
+ Timestamp txnTimestamp = Timestamp(_oplogManager->fetchAllCommittedValue(session->connection));  // All-committed value as reported by the oplog manager for this connection.
+ auto status =
+ txnOpen.setTimestamp(txnTimestamp, WiredTigerBeginTxnBlock::RoundToOldest::kRound);  // kRound: allow the requested timestamp to be rounded to oldest (see the comment below).
+ fassert(50948, status);  // Failure to set the timestamp is asserted on with code 50948.
+
+ // Since this is not in a critical section, the read timestamp might have been rounded to
+ // oldest between fetching the all-committed value and calling setTimestamp. We need to get
+ // the actual read timestamp we used.
+ char buf[(2 * 8 /*bytes in hex*/) + 1 /*nul terminator*/];  // Sized for 16 hex digits plus the terminator.
+ auto wtstatus = session->query_timestamp(session, buf, "get=read");  // Asks the session for the transaction's read timestamp; the result is parsed as hex below.
+ invariantWTOK(wtstatus);
+ uint64_t read_timestamp;  // Filled in by the parse on the next line.
+ fassert(50949, parseNumberFromStringWithBase(buf, 16, &read_timestamp));  // Base-16 parse of buf; a parse failure is asserted on with code 50949.
+ txnOpen.done();  // NOTE(review): presumably marks the begin-block as successful so the transaction stays open - confirm.
+ return Timestamp(read_timestamp);  // The queried read timestamp, which may differ from txnTimestamp if rounding occurred.
+}
Status WiredTigerRecoveryUnit::setTimestamp(Timestamp timestamp) {
_ensureSession();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index a48ff02b35c..2149c5a3e2a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -148,6 +148,12 @@ private:
void _txnClose(bool commit);
void _txnOpen();
+ /**
+ * Starts a transaction at the current all-committed timestamp.
+ * Returns the timestamp the transaction was started at.
+ */
+ Timestamp _beginTransactionAtAllCommittedTimestamp(WT_SESSION* session);
+
WiredTigerSessionCache* _sessionCache; // not owned
WiredTigerOplogManager* _oplogManager; // not owned
UniqueWiredTigerSession _session;
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 7755e0b1af0..e7efc8d2308 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -265,11 +265,15 @@ void TransactionParticipant::beginTransactionUnconditionally(TxnNumber txnNumber
_beginMultiDocumentTransaction(lg, txnNumber);
}
-void TransactionParticipant::setSpeculativeTransactionOpTimeToLastApplied(OperationContext* opCtx) {
+void TransactionParticipant::setSpeculativeTransactionOpTime(
+ OperationContext* opCtx, SpeculativeTransactionOpTime opTimeChoice) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
repl::ReplicationCoordinator* replCoord =
repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
- opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kLastAppliedSnapshot);
+ opCtx->recoveryUnit()->setTimestampReadSource(
+ opTimeChoice == SpeculativeTransactionOpTime::kAllCommitted
+ ? RecoveryUnit::ReadSource::kAllCommittedSnapshot
+ : RecoveryUnit::ReadSource::kLastAppliedSnapshot);
opCtx->recoveryUnit()->preallocateSnapshot();
auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp();
invariant(readTimestamp);
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 48264e4157e..57b1884db25 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -53,6 +53,11 @@ class OperationContext;
extern AtomicInt32 transactionLifetimeLimitSeconds;
+enum class SpeculativeTransactionOpTime {  // Choice of read timestamp source passed to TransactionParticipant::setSpeculativeTransactionOpTime.
+ kLastApplied,  // Mapped to RecoveryUnit::ReadSource::kLastAppliedSnapshot.
+ kAllCommitted,  // Mapped to RecoveryUnit::ReadSource::kAllCommittedSnapshot.
+};
+
/**
* A state machine that coordinates a distributed transaction commit with the transaction
* coordinator.
@@ -154,7 +159,8 @@ public:
/**
* Called for speculative transactions to fix the optime of the snapshot to read from.
*/
- void setSpeculativeTransactionOpTimeToLastApplied(OperationContext* opCtx);
+ void setSpeculativeTransactionOpTime(OperationContext* opCtx,
+ SpeculativeTransactionOpTime opTimeChoice);
/**
* Transfers management of transaction resources from the OperationContext to the Session.