author     Matthew Russotto <matthew.russotto@10gen.com>  2018-09-14 10:13:35 -0400
committer  Matthew Russotto <matthew.russotto@10gen.com>  2018-09-14 10:17:42 -0400
commit     c46faf4672c81d4801014981669d770fc65b950e (patch)
tree       779e165fa75ed59b9c06b474f6ca5f7b6ac97f09
parent     f92a1d2aaf0f4d2874f64a5e1b3c12fc66e39d4d (diff)
download   mongo-c46faf4672c81d4801014981669d770fc65b950e.tar.gz
SERVER-35821 readConcern:snapshot transactions need a read timestamp <= WT's all_committed point
-rw-r--r--  buildscripts/resmokeconfig/suites/sharded_core_txns.yml               3
-rw-r--r--  jstests/core/txns/speculative_snapshot_includes_all_writes.js       112
-rw-r--r--  src/mongo/db/catalog/collection_impl.cpp                              19
-rw-r--r--  src/mongo/db/read_concern.cpp                                          6
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state.h             5
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp      5
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.h        1
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.cpp      4
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.h        1
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp                    19
-rw-r--r--  src/mongo/db/storage/recovery_unit.h                                   7
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp          28
-rw-r--r--  src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h             6
-rw-r--r--  src/mongo/db/transaction_participant.cpp                               8
-rw-r--r--  src/mongo/db/transaction_participant.h                                 8
15 files changed, 223 insertions, 9 deletions
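
The fix turns on WiredTiger's "all_committed" point: the largest timestamp T such that no commit at or before T is still in flight. A speculative snapshot transaction that reads at T can never observe a write ordered after an uncommitted write. The following is a minimal standalone model of that invariant; it is illustrative plain C++, not MongoDB code, and all names are assumed:

    // Minimal standalone model of the all_committed point (illustrative).
    // all_committed is the largest timestamp with no in-flight commit at or
    // before it, so reading there can never expose a "hole".
    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <set>

    class CommitWindow {
    public:
        void beginCommit(uint64_t ts) {
            _inFlight.insert(ts);
        }
        void finishCommit(uint64_t ts) {
            _inFlight.erase(_inFlight.find(ts));
            _lastCommitted = std::max(_lastCommitted, ts);
        }
        // Largest timestamp such that every commit at or before it finished.
        uint64_t allCommitted() const {
            return _inFlight.empty() ? _lastCommitted : *_inFlight.begin() - 1;
        }

    private:
        std::multiset<uint64_t> _inFlight;
        uint64_t _lastCommitted = 0;
    };

    int main() {
        CommitWindow w;
        w.beginCommit(10);   // W1: held open (the failpoint in this patch)
        w.beginCommit(11);   // W2: ordered after W1...
        w.finishCommit(11);  // ...but commits first
        // Reading at the last committed timestamp (11) would see W2 but not W1.
        // Reading at all_committed (9) sees neither, which is consistent.
        assert(w.allCommitted() == 9);
        w.finishCommit(10);
        assert(w.allCommitted() == 11);
        return 0;
    }
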
diff --git a/buildscripts/resmokeconfig/suites/sharded_core_txns.yml b/buildscripts/resmokeconfig/suites/sharded_core_txns.yml
index cfcf29fc681..d0a027b2e0a 100644
--- a/buildscripts/resmokeconfig/suites/sharded_core_txns.yml
+++ b/buildscripts/resmokeconfig/suites/sharded_core_txns.yml
@@ -66,6 +66,9 @@ selector:
   - jstests/core/txns/view_reads_in_transaction.js
   - jstests/core/txns/list_collections_not_blocked_by_txn.js
   - jstests/core/txns/statement_ids_accepted.js
+
+  # Uses hangAfterCollectionInserts failpoint not available on mongos.
+  - jstests/core/txns/speculative_snapshot_includes_all_writes.js
 executor:
   archive:
     hooks:
diff --git a/jstests/core/txns/speculative_snapshot_includes_all_writes.js b/jstests/core/txns/speculative_snapshot_includes_all_writes.js
new file mode 100644
index 00000000000..ad961092bf6
--- /dev/null
+++ b/jstests/core/txns/speculative_snapshot_includes_all_writes.js
@@ -0,0 +1,112 @@
+/**
+ * A speculative snapshot must not include any writes ordered after any uncommitted writes.
+ *
+ * @tags: [uses_transactions]
+ */
+(function() {
+    "use strict";
+
+    load("jstests/libs/check_log.js");
+
+    const dbName = "test";
+    const collName = "speculative_snapshot_includes_all_writes_1";
+    const collName2 = "speculative_snapshot_includes_all_writes_2";
+    const testDB = db.getSiblingDB(dbName);
+    const testColl = testDB[collName];
+    const testColl2 = testDB[collName2];
+
+    testDB.runCommand({drop: collName, writeConcern: {w: "majority"}});
+    testDB.runCommand({drop: collName2, writeConcern: {w: "majority"}});
+
+    assert.commandWorked(testDB.createCollection(collName, {writeConcern: {w: "majority"}}));
+    assert.commandWorked(testDB.createCollection(collName2, {writeConcern: {w: "majority"}}));
+
+    const sessionOptions = {causalConsistency: false};
+    const session = db.getMongo().startSession(sessionOptions);
+    const sessionDb = session.getDatabase(dbName);
+    const sessionColl = sessionDb.getCollection(collName);
+    const sessionColl2 = sessionDb.getCollection(collName2);
+
+    const session2 = db.getMongo().startSession(sessionOptions);
+    const session2Db = session2.getDatabase(dbName);
+    const session2Coll = session2Db.getCollection(collName);
+    const session2Coll2 = session2Db.getCollection(collName2);
+
+    // Clear ramlog so checkLog can't find log messages from previous times this fail point was
+    // enabled.
+    assert.commandWorked(testDB.adminCommand({clearLog: 'global'}));
+
+    jsTest.log("Prepopulate the collections.");
+    assert.commandWorked(testColl.insert([{_id: 0}], {writeConcern: {w: "majority"}}));
+    assert.commandWorked(testColl2.insert([{_id: "a"}], {writeConcern: {w: "majority"}}));
+
+    jsTest.log("Create the uncommitted write.");
+
+    assert.commandWorked(db.adminCommand({
+        configureFailPoint: "hangAfterCollectionInserts",
+        mode: "alwaysOn",
+        data: {collectionNS: testColl2.getFullName()}
+    }));
+
+    const joinHungWrite = startParallelShell(() => {
+        assert.commandWorked(
+            db.getSiblingDB("test").speculative_snapshot_includes_all_writes_2.insert(
+                {_id: "b"}, {writeConcern: {w: "majority"}}));
+    });
+
+    checkLog.contains(
+        db.getMongo(),
+        "hangAfterCollectionInserts fail point enabled for " + testColl2.getFullName());
+
+    jsTest.log("Create a write following the uncommitted write.");
+    // Note this write must use local write concern; it cannot be majority committed until
+    // the prior uncommitted write is committed.
+    assert.commandWorked(testColl.insert([{_id: 1}]));
+
+    jsTestLog("Start a snapshot transaction.");
+
+    session.startTransaction({readConcern: {level: "snapshot"}});
+
+    assert.docEq([{_id: 0}], sessionColl.find().toArray());
+
+    assert.docEq([{_id: "a"}], sessionColl2.find().toArray());
+
+    jsTestLog("Start a majority-read transaction.");
+
+    session2.startTransaction({readConcern: {level: "majority"}});
+
+    assert.docEq([{_id: 0}, {_id: 1}], session2Coll.find().toArray());
+
+    assert.docEq([{_id: "a"}], session2Coll2.find().toArray());
+
+    jsTestLog("Allow the uncommitted write to finish.");
+    assert.commandWorked(db.adminCommand({
+        configureFailPoint: "hangAfterCollectionInserts",
+        mode: "off",
+    }));
+
+    joinHungWrite();
+
+    jsTestLog("Double-checking that writes not committed at start of snapshot cannot appear.");
+    assert.docEq([{_id: 0}], sessionColl.find().toArray());
+
+    assert.docEq([{_id: "a"}], sessionColl2.find().toArray());
+
+    assert.docEq([{_id: 0}, {_id: 1}], session2Coll.find().toArray());
+
+    assert.docEq([{_id: "a"}], session2Coll2.find().toArray());
+
+    jsTestLog("Committing transactions.");
+    session.commitTransaction();
+    session2.commitTransaction();
+
+    assert.docEq([{_id: 0}, {_id: 1}], sessionColl.find().toArray());
+
+    assert.docEq([{_id: "a"}, {_id: "b"}], sessionColl2.find().toArray());
+
+    assert.docEq([{_id: 0}, {_id: 1}], session2Coll.find().toArray());
+
+    assert.docEq([{_id: "a"}, {_id: "b"}], session2Coll2.find().toArray());
+
+    session.endSession();
+}());
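
At its core, the test pins a snapshot while one write is hung and then verifies the snapshot never changes, even after the hung write commits. Stripped of the failpoint machinery, the invariant is plain snapshot isolation; the sketch below models it with standard containers and is illustrative only:

    // Illustrative sketch of the invariant the test asserts: a snapshot pinned
    // at time T keeps returning the same documents regardless of later commits.
    #include <cassert>
    #include <map>
    #include <string>

    int main() {
        using Collection = std::map<std::string, int>;  // _id -> payload
        Collection live = {{"0", 0}};  // majority-committed state at snapshot time
        Collection snapshot = live;    // the speculative snapshot pins this state

        live["1"] = 1;  // the write that followed the hung insert
        live["b"] = 2;  // the hung insert, finally committed

        assert(snapshot.count("1") == 0);  // the pinned transaction never sees them
        assert(snapshot.count("b") == 0);
        assert(live.count("1") == 1);      // a transaction started now sees both
        assert(live.count("b") == 1);
        return 0;
    }
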
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 4389b6b7641..b444e9bba83 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -101,6 +101,11 @@ namespace {
 // Used below to fail during inserts.
 MONGO_FAIL_POINT_DEFINE(failCollectionInserts);
 
+// Used to pause after inserting collection data and calling the opObservers. Inserts to
+// replicated collections that are not part of a multi-statement transaction will have generated
+// their OpTime and oplog entry.
+MONGO_FAIL_POINT_DEFINE(hangAfterCollectionInserts);
+
 // Uses the collator factory to convert the BSON representation of a collator to a
 // CollatorInterface. Returns null if the BSONObj is empty. We expect the stored collation to be
 // valid, since it gets validated on collection create.
@@ -365,6 +370,20 @@ Status CollectionImpl::insertDocuments(OperationContext* opCtx,
     opCtx->recoveryUnit()->onCommit(
         [this](boost::optional<Timestamp>) { notifyCappedWaitersIfNeeded(); });
 
+    MONGO_FAIL_POINT_BLOCK(hangAfterCollectionInserts, extraData) {
+        const BSONObj& data = extraData.getData();
+        const auto collElem = data["collectionNS"];
+        // If the failpoint specifies no collection or matches the existing one, hang.
+        if (!collElem || _ns.ns() == collElem.str()) {
+            while (MONGO_FAIL_POINT(hangAfterCollectionInserts)) {
+                log() << "hangAfterCollectionInserts fail point enabled for " << _ns.toString()
+                      << ". Blocking until fail point is disabled.";
+                mongo::sleepsecs(1);
+                opCtx->checkForInterrupt();
+            }
+        }
+    }
+
     return Status::OK();
 }
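
The hang loop above follows the usual MongoDB failpoint pattern: poll once per second, log while blocked, and let the operation be killed via checkForInterrupt(). A standalone approximation of that shape, where a std::atomic stands in for the fail point and an exception for interruption (names are illustrative, not MongoDB code):

    // Standalone approximation of the hang loop (illustrative).
    #include <atomic>
    #include <chrono>
    #include <iostream>
    #include <stdexcept>
    #include <string>
    #include <thread>

    std::atomic<bool> hangAfterInserts{true};  // stands in for the fail point
    std::atomic<bool> interrupted{false};      // stands in for opCtx interruption

    void hangIfEnabled(const std::string& ns) {
        while (hangAfterInserts.load()) {
            std::cout << "hangAfterCollectionInserts fail point enabled for " << ns
                      << ". Blocking until fail point is disabled." << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
            if (interrupted.load())  // ~opCtx->checkForInterrupt()
                throw std::runtime_error("interrupted");
        }
    }

    int main() {
        std::thread toggler([] {
            std::this_thread::sleep_for(std::chrono::seconds(2));
            hangAfterInserts = false;  // ~configureFailPoint {mode: "off"}
        });
        hangIfEnabled("test.speculative_snapshot_includes_all_writes_2");
        toggler.join();
        return 0;
    }

Polling with a sleep keeps the failpoint cheap when disabled, while the interrupt check ensures a hung operation can still be killed.
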
diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp
index 53372e23878..6184548f7b0 100644
--- a/src/mongo/db/read_concern.cpp
+++ b/src/mongo/db/read_concern.cpp
@@ -295,7 +295,11 @@ Status waitForReadConcern(OperationContext* opCtx,
                     "node needs to be a replica set member to use readConcern: snapshot"};
         }
         if (speculative) {
-            txnParticipant->setSpeculativeTransactionOpTimeToLastApplied(opCtx);
+            txnParticipant->setSpeculativeTransactionOpTime(
+                opCtx,
+                readConcernArgs.getOriginalLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
+                    ? SpeculativeTransactionOpTime::kAllCommitted
+                    : SpeculativeTransactionOpTime::kLastApplied);
         }
     }
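
The decision point is small but important: only transactions whose original read concern was snapshot get pinned to the all-committed point, while speculative majority transactions keep reading at lastApplied. Isolated as a pure function (the enum names mirror the patch; everything else is an illustrative sketch):

    // The read-source choice from waitForReadConcern, isolated (illustrative).
    #include <cassert>

    enum class ReadConcernLevel { kLocal, kMajority, kSnapshot };
    enum class SpeculativeTransactionOpTime { kLastApplied, kAllCommitted };

    SpeculativeTransactionOpTime chooseSpeculativeOpTime(ReadConcernLevel originalLevel) {
        return originalLevel == ReadConcernLevel::kSnapshot
            ? SpeculativeTransactionOpTime::kAllCommitted
            : SpeculativeTransactionOpTime::kLastApplied;
    }

    int main() {
        assert(chooseSpeculativeOpTime(ReadConcernLevel::kSnapshot) ==
               SpeculativeTransactionOpTime::kAllCommitted);
        assert(chooseSpeculativeOpTime(ReadConcernLevel::kMajority) ==
               SpeculativeTransactionOpTime::kLastApplied);
        return 0;
    }

Note the use of getOriginalLevel(): by this point the effective level may have been rewritten, so the choice keys off what the client originally requested.
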
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 05b9a8e85d2..0f2ff6aafcf 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -174,6 +174,11 @@ public:
     virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime) = 0;
 
     /**
+     * Checks if the oplog exists.
+     */
+    virtual bool oplogExists(OperationContext* opCtx) = 0;
+
+    /**
      * Gets the last optime of an operation performed on this host, from stable
      * storage.
      */
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index cefbf8d3bd9..97dc1c9bf9f 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -621,6 +621,11 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext*
     setNewTimestamp(ctx, newTime);
 }
 
+bool ReplicationCoordinatorExternalStateImpl::oplogExists(OperationContext* opCtx) {
+    AutoGetCollection oplog(opCtx, NamespaceString::kRsOplogNamespace, MODE_IS);
+    return oplog.getCollection() != nullptr;
+}
+
 StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(
     OperationContext* opCtx) {
     // TODO: handle WriteConflictExceptions below
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 57969506c85..e206c02b622 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -86,6 +86,7 @@ public:
     virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx);
     virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote);
     virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime);
+    bool oplogExists(OperationContext* opCtx) final;
     virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
     virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx);
     virtual void closeConnections();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 614668b2a68..735cc494418 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -155,6 +155,10 @@ void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument(
 void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(ServiceContext* service,
                                                                  const Timestamp& newTime) {}
 
+bool ReplicationCoordinatorExternalStateMock::oplogExists(OperationContext* opCtx) {
+    return true;
+}
+
 StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::loadLastOpTime(
     OperationContext* opCtx) {
     return _lastOpTime;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index d78589bd109..115395206d2 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -76,6 +76,7 @@ public:
     virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx);
     virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote);
     virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime);
+    bool oplogExists(OperationContext* opCtx) override;
    virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
     virtual void closeConnections();
     virtual void killAllUserOperations(OperationContext* opCtx);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index c7463d168f6..ed40982042b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1274,10 +1274,9 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
                                                     OpTime targetOpTime,
                                                     boost::optional<Date_t> deadline) {
     if (!isMajorityCommittedRead) {
-        // This assumes the read concern is "local" level.
-        // We need to wait for all committed writes to be visible, even in the oplog (which uses
-        // special visibility rules).
-        _storage->waitForAllEarlierOplogWritesToBeVisible(opCtx);
+        if (!_externalState->oplogExists(opCtx)) {
+            return {ErrorCodes::NotYetInitialized, "The oplog does not exist."};
+        }
     }
 
     stdx::unique_lock<stdx::mutex> lock(_mutex);
@@ -1341,6 +1340,18 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
         }
     }
 
+    lock.unlock();
+
+    if (!isMajorityCommittedRead) {
+        // This assumes the read concern is "local" level.
+        // We need to wait for all committed writes to be visible, even in the oplog (which uses
+        // special visibility rules). We must do this after waiting for our target optime, because
+        // only then do we know that it will fill in all "holes" before that time. If we do it
+        // earlier, we may return when the requested optime has been reached, but other writes
+        // at optimes before that time are not yet visible.
+        _storage->waitForAllEarlierOplogWritesToBeVisible(opCtx);
+    }
+
     return Status::OK();
 }
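
The reordering here is the subtle part: waiting for oplog visibility is only meaningful after the target optime has been reached, since reaching it is what guarantees all earlier holes get filled. A condition-variable sketch of the two-phase wait (illustrative names; MongoDB's actual synchronization differs):

    // Two-phase wait sketch: first reach the target optime, then wait for the
    // visibility point ("no holes below it") to catch up. Illustrative only.
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>
    #include <thread>

    struct ReplState {
        std::mutex m;
        std::condition_variable cv;
        uint64_t lastApplied = 0;  // how far replication has applied
        uint64_t allVisible = 0;   // oplog visibility point (no holes below it)

        void waitUntilOpTime(uint64_t target) {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [&] { return lastApplied >= target; });
            // Only now do we know the oplog will fill every hole before
            // `target`; waiting for visibility first could return too early.
            cv.wait(lk, [&] { return allVisible >= target; });
        }
    };

    int main() {
        ReplState s;
        std::thread writer([&] {
            {
                std::lock_guard<std::mutex> lk(s.m);
                s.lastApplied = 5;  // target optime reached...
            }
            s.cv.notify_all();
            {
                std::lock_guard<std::mutex> lk(s.m);
                s.allVisible = 5;   // ...then earlier writes become visible
            }
            s.cv.notify_all();
        });
        s.waitUntilOpTime(5);
        writer.join();
        return 0;
    }
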
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index c39807af32c..8444da4f4c8 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -160,6 +160,8 @@ public:
      * - when using ReadSource::kProvided, the timestamp provided.
      * - when using ReadSource::kLastAppliedSnapshot, the timestamp chosen using the storage
      *   engine's last applied timestamp.
+     * - when using ReadSource::kAllCommittedSnapshot, the timestamp chosen using the storage
+     *   engine's all-committed timestamp.
      * - when using ReadSource::kLastApplied, the last applied timestamp at which the current
      *   storage transaction was opened, if one is open.
      * - when using ReadSource::kMajorityCommitted, the majority committed timestamp chosen by the
@@ -265,6 +267,11 @@ public:
      */
     kLastAppliedSnapshot,
     /**
+     * Read from the all-committed timestamp. New transactions will always read from the same
+     * timestamp and never advance.
+     */
+    kAllCommittedSnapshot,
+    /**
      * Read from the timestamp provided to setTimestampReadSource.
      */
     kProvided
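
kAllCommittedSnapshot joins kProvided and kLastAppliedSnapshot as the sources that pin a single timestamp for the life of the transaction, which is what getPointInTimeReadTimestamp() reports. A reduced sketch of that mapping (the enum mirrors this header; the function is illustrative and omits how the remaining sources behave):

    // Which read sources pin a fixed point-in-time timestamp (reduced sketch;
    // the real getPointInTimeReadTimestamp handles the other sources too).
    #include <cassert>
    #include <cstdint>
    #include <optional>

    enum class ReadSource {
        kUnset,
        kNoTimestamp,
        kMajorityCommitted,
        kLastApplied,
        kLastAppliedSnapshot,
        kAllCommittedSnapshot,
        kProvided,
    };

    std::optional<uint64_t> pinnedReadTimestamp(ReadSource source, uint64_t readAt) {
        switch (source) {
            case ReadSource::kProvided:
            case ReadSource::kLastAppliedSnapshot:
            case ReadSource::kAllCommittedSnapshot:
                return readAt;  // fixed once chosen; never advances
            default:
                return std::nullopt;
        }
    }

    int main() {
        assert(pinnedReadTimestamp(ReadSource::kAllCommittedSnapshot, 42).value() == 42);
        assert(!pinnedReadTimestamp(ReadSource::kNoTimestamp, 42).has_value());
        return 0;
    }
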
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 1cc671dcf56..dbe8f4dc68a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -313,7 +313,8 @@ Status WiredTigerRecoveryUnit::obtainMajorityCommittedSnapshot() {
 
 boost::optional<Timestamp> WiredTigerRecoveryUnit::getPointInTimeReadTimestamp() const {
     if (_timestampReadSource == ReadSource::kProvided ||
-        _timestampReadSource == ReadSource::kLastAppliedSnapshot) {
+        _timestampReadSource == ReadSource::kLastAppliedSnapshot ||
+        _timestampReadSource == ReadSource::kAllCommittedSnapshot) {
         invariant(!_readAtTimestamp.isNull());
         return _readAtTimestamp;
     }
@@ -371,6 +372,13 @@ void WiredTigerRecoveryUnit::_txnOpen() {
             }
             break;
         }
+        case ReadSource::kAllCommittedSnapshot: {
+            if (_readAtTimestamp.isNull()) {
+                _readAtTimestamp = _beginTransactionAtAllCommittedTimestamp(session);
+                break;
+            }
+            // Intentionally continue to the next case to read at the _readAtTimestamp.
+        }
         case ReadSource::kLastAppliedSnapshot: {
             // Only ever read the last applied timestamp once, and continue reusing it for
             // subsequent transactions.
@@ -400,6 +408,24 @@ void WiredTigerRecoveryUnit::_txnOpen() {
     _active = true;
 }
 
+Timestamp WiredTigerRecoveryUnit::_beginTransactionAtAllCommittedTimestamp(WT_SESSION* session) {
+    WiredTigerBeginTxnBlock txnOpen(session, _ignorePrepared);
+    Timestamp txnTimestamp = Timestamp(_oplogManager->fetchAllCommittedValue(session->connection));
+    auto status =
+        txnOpen.setTimestamp(txnTimestamp, WiredTigerBeginTxnBlock::RoundToOldest::kRound);
+    fassert(50948, status);
+
+    // Since this is not in a critical section, we might have rounded to oldest between
+    // calling getAllCommitted and setTimestamp. We need to get the actual read timestamp we
+    // used.
+    char buf[(2 * 8 /*bytes in hex*/) + 1 /*nul terminator*/];
+    auto wtstatus = session->query_timestamp(session, buf, "get=read");
+    invariantWTOK(wtstatus);
+    uint64_t read_timestamp;
+    fassert(50949, parseNumberFromStringWithBase(buf, 16, &read_timestamp));
+    txnOpen.done();
+    return Timestamp(read_timestamp);
+}
+
 Status WiredTigerRecoveryUnit::setTimestamp(Timestamp timestamp) {
     _ensureSession();
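
Because the all-committed value is fetched and the transaction opened outside any critical section, the requested timestamp may get rounded up to the oldest timestamp in between, so the code asks WiredTiger which read timestamp it actually used; query_timestamp returns it as a hex string. The parsing step in isolation (illustrative; the patch uses parseNumberFromStringWithBase rather than strtoull):

    // Round-tripping a 64-bit timestamp through WiredTiger's hex-string form,
    // as query_timestamp(..., "get=read") returns it. Illustrative only.
    #include <cassert>
    #include <cinttypes>
    #include <cstdint>
    #include <cstdio>
    #include <cstdlib>

    int main() {
        const uint64_t actualReadTs = UINT64_C(0x5b9b6f2e00000001);

        char buf[(2 * 8 /*bytes in hex*/) + 1 /*nul terminator*/];
        std::snprintf(buf, sizeof(buf), "%" PRIx64, actualReadTs);  // what WT writes

        // ~parseNumberFromStringWithBase(buf, 16, &read_timestamp)
        uint64_t parsed = std::strtoull(buf, nullptr, 16);
        assert(parsed == actualReadTs);
        return 0;
    }

Trusting the engine's answer rather than the value originally requested is what makes the subsequent invariant on the read timestamp safe.
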
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index a48ff02b35c..2149c5a3e2a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -148,6 +148,12 @@ private:
     void _txnClose(bool commit);
     void _txnOpen();
 
+    /**
+     * Starts a transaction at the current all-committed timestamp.
+     * Returns the timestamp the transaction was started at.
+     */
+    Timestamp _beginTransactionAtAllCommittedTimestamp(WT_SESSION* session);
+
     WiredTigerSessionCache* _sessionCache;  // not owned
     WiredTigerOplogManager* _oplogManager;  // not owned
     UniqueWiredTigerSession _session;
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 7755e0b1af0..e7efc8d2308 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -265,11 +265,15 @@ void TransactionParticipant::beginTransactionUnconditionally(TxnNumber txnNumber
     _beginMultiDocumentTransaction(lg, txnNumber);
 }
 
-void TransactionParticipant::setSpeculativeTransactionOpTimeToLastApplied(OperationContext* opCtx) {
+void TransactionParticipant::setSpeculativeTransactionOpTime(
+    OperationContext* opCtx, SpeculativeTransactionOpTime opTimeChoice) {
     stdx::lock_guard<stdx::mutex> lg(_mutex);
     repl::ReplicationCoordinator* replCoord =
         repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
-    opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kLastAppliedSnapshot);
+    opCtx->recoveryUnit()->setTimestampReadSource(
+        opTimeChoice == SpeculativeTransactionOpTime::kAllCommitted
+            ? RecoveryUnit::ReadSource::kAllCommittedSnapshot
+            : RecoveryUnit::ReadSource::kLastAppliedSnapshot);
     opCtx->recoveryUnit()->preallocateSnapshot();
     auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp();
     invariant(readTimestamp);
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 48264e4157e..57b1884db25 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -53,6 +53,11 @@ class OperationContext;
 
 extern AtomicInt32 transactionLifetimeLimitSeconds;
 
+enum class SpeculativeTransactionOpTime {
+    kLastApplied,
+    kAllCommitted,
+};
+
 /**
  * A state machine that coordinates a distributed transaction commit with the transaction
  * coordinator.
@@ -154,7 +159,8 @@ public:
     /**
     * Called for speculative transactions to fix the optime of the snapshot to read from.
     */
-    void setSpeculativeTransactionOpTimeToLastApplied(OperationContext* opCtx);
+    void setSpeculativeTransactionOpTime(OperationContext* opCtx,
+                                         SpeculativeTransactionOpTime opTimeChoice);
 
     /**
     * Transfers management of transaction resources from the OperationContext to the Session.