author | Matthew Russotto <matthew.russotto@10gen.com> | 2018-09-14 10:13:35 -0400 |
---|---|---|
committer | Matthew Russotto <matthew.russotto@10gen.com> | 2018-09-14 10:17:42 -0400 |
commit | c46faf4672c81d4801014981669d770fc65b950e (patch) | |
tree | 779e165fa75ed59b9c06b474f6ca5f7b6ac97f09 | |
parent | f92a1d2aaf0f4d2874f64a5e1b3c12fc66e39d4d (diff) | |
download | mongo-c46faf4672c81d4801014981669d770fc65b950e.tar.gz | |
SERVER-35821 readConcern:snapshot transactions need a read timestamp <= WT's all_committed point
15 files changed, 223 insertions, 9 deletions
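The scenario this commit guards against, condensed from the jstest added in the diff below into a minimal shell sketch. This is illustrative only: collection names are hypothetical, it assumes a replica-set mongod (the hangAfterCollectionInserts failpoint introduced here is not available on mongos), and the full, authoritative version is speculative_snapshot_includes_all_writes.js below.

```javascript
// Condensed, hypothetical repro: a hung uncommitted w:"majority" insert, a later local
// write, and a snapshot transaction that must not see only the later write.

// Pre-create the collections so they exist at any snapshot the transaction may pick.
assert.commandWorked(
    db.getSiblingDB("test").createCollection("coll1", {writeConcern: {w: "majority"}}));
assert.commandWorked(
    db.getSiblingDB("test").createCollection("coll2", {writeConcern: {w: "majority"}}));

// Hang a w:"majority" insert after it has written its document and oplog entry, leaving
// an uncommitted "hole" below the newest oplog entries.
assert.commandWorked(db.adminCommand({
    configureFailPoint: "hangAfterCollectionInserts",
    mode: "alwaysOn",
    data: {collectionNS: "test.coll2"}  // illustrative namespace
}));
const joinHungWrite = startParallelShell(() => {
    assert.commandWorked(
        db.getSiblingDB("test").coll2.insert({_id: "b"}, {writeConcern: {w: "majority"}}));
});

// A later write, ordered after the hung one (local write concern only).
assert.commandWorked(db.getSiblingDB("test").coll1.insert({_id: 1}));

// With this change, the speculative snapshot opens at WiredTiger's all_committed
// timestamp, which is still below the hung write, so {_id: 1} must not be visible yet.
const session = db.getMongo().startSession({causalConsistency: false});
session.startTransaction({readConcern: {level: "snapshot"}});
assert.eq(0, session.getDatabase("test").coll1.find({_id: 1}).itcount());

// Release the hung insert and finish up.
assert.commandWorked(
    db.adminCommand({configureFailPoint: "hangAfterCollectionInserts", mode: "off"}));
joinHungWrite();
session.commitTransaction();
session.endSession();
```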
diff --git a/buildscripts/resmokeconfig/suites/sharded_core_txns.yml b/buildscripts/resmokeconfig/suites/sharded_core_txns.yml
index cfcf29fc681..d0a027b2e0a 100644
--- a/buildscripts/resmokeconfig/suites/sharded_core_txns.yml
+++ b/buildscripts/resmokeconfig/suites/sharded_core_txns.yml
@@ -66,6 +66,9 @@ selector:
   - jstests/core/txns/view_reads_in_transaction.js
   - jstests/core/txns/list_collections_not_blocked_by_txn.js
   - jstests/core/txns/statement_ids_accepted.js
+
+  # Uses hangAfterCollectionInserts failpoint not available on mongos.
+  - jstests/core/txns/speculative_snapshot_includes_all_writes.js
 executor:
   archive:
     hooks:
diff --git a/jstests/core/txns/speculative_snapshot_includes_all_writes.js b/jstests/core/txns/speculative_snapshot_includes_all_writes.js
new file mode 100644
index 00000000000..ad961092bf6
--- /dev/null
+++ b/jstests/core/txns/speculative_snapshot_includes_all_writes.js
@@ -0,0 +1,112 @@
+/**
+ * A speculative snapshot must not include any writes ordered after any uncommitted writes.
+ *
+ * @tags: [uses_transactions]
+ */
+(function() {
+    "use strict";
+
+    load("jstests/libs/check_log.js");
+
+    const dbName = "test";
+    const collName = "speculative_snapshot_includes_all_writes_1";
+    const collName2 = "speculative_snapshot_includes_all_writes_2";
+    const testDB = db.getSiblingDB(dbName);
+    const testColl = testDB[collName];
+    const testColl2 = testDB[collName2];
+
+    testDB.runCommand({drop: collName, writeConcern: {w: "majority"}});
+    testDB.runCommand({drop: collName2, writeConcern: {w: "majority"}});
+
+    assert.commandWorked(testDB.createCollection(collName, {writeConcern: {w: "majority"}}));
+    assert.commandWorked(testDB.createCollection(collName2, {writeConcern: {w: "majority"}}));
+
+    const sessionOptions = {causalConsistency: false};
+    const session = db.getMongo().startSession(sessionOptions);
+    const sessionDb = session.getDatabase(dbName);
+    const sessionColl = sessionDb.getCollection(collName);
+    const sessionColl2 = sessionDb.getCollection(collName2);
+
+    const session2 = db.getMongo().startSession(sessionOptions);
+    const session2Db = session2.getDatabase(dbName);
+    const session2Coll = session2Db.getCollection(collName);
+    const session2Coll2 = session2Db.getCollection(collName2);
+
+    // Clear ramlog so checkLog can't find log messages from previous times this fail point was
+    // enabled.
+    assert.commandWorked(testDB.adminCommand({clearLog: 'global'}));
+
+    jsTest.log("Prepopulate the collections.");
+    assert.commandWorked(testColl.insert([{_id: 0}], {writeConcern: {w: "majority"}}));
+    assert.commandWorked(testColl2.insert([{_id: "a"}], {writeConcern: {w: "majority"}}));
+
+    jsTest.log("Create the uncommitted write.");
+
+    assert.commandWorked(db.adminCommand({
+        configureFailPoint: "hangAfterCollectionInserts",
+        mode: "alwaysOn",
+        data: {collectionNS: testColl2.getFullName()}
+    }));
+
+    const joinHungWrite = startParallelShell(() => {
+        assert.commandWorked(
+            db.getSiblingDB("test").speculative_snapshot_includes_all_writes_2.insert(
+                {_id: "b"}, {writeConcern: {w: "majority"}}));
+    });
+
+    checkLog.contains(
+        db.getMongo(),
+        "hangAfterCollectionInserts fail point enabled for " + testColl2.getFullName());
+
+    jsTest.log("Create a write following the uncommitted write.");
+    // Note this write must use local write concern; it cannot be majority committed until
+    // the prior uncommitted write is committed.
+    assert.commandWorked(testColl.insert([{_id: 1}]));
+
+    jsTestLog("Start a snapshot transaction.");
+
+    session.startTransaction({readConcern: {level: "snapshot"}});
+
+    assert.docEq([{_id: 0}], sessionColl.find().toArray());
+
+    assert.docEq([{_id: "a"}], sessionColl2.find().toArray());
+
+    jsTestLog("Start a majority-read transaction.");
+
+    session2.startTransaction({readConcern: {level: "majority"}});
+
+    assert.docEq([{_id: 0}, {_id: 1}], session2Coll.find().toArray());
+
+    assert.docEq([{_id: "a"}], session2Coll2.find().toArray());
+
+    jsTestLog("Allow the uncommitted write to finish.");
+    assert.commandWorked(db.adminCommand({
+        configureFailPoint: "hangAfterCollectionInserts",
+        mode: "off",
+    }));
+
+    joinHungWrite();
+
+    jsTestLog("Double-checking that writes not committed at start of snapshot cannot appear.");
+    assert.docEq([{_id: 0}], sessionColl.find().toArray());
+
+    assert.docEq([{_id: "a"}], sessionColl2.find().toArray());
+
+    assert.docEq([{_id: 0}, {_id: 1}], session2Coll.find().toArray());
+
+    assert.docEq([{_id: "a"}], session2Coll2.find().toArray());
+
+    jsTestLog("Committing transactions.");
+    session.commitTransaction();
+    session2.commitTransaction();
+
+    assert.docEq([{_id: 0}, {_id: 1}], sessionColl.find().toArray());
+
+    assert.docEq([{_id: "a"}, {_id: "b"}], sessionColl2.find().toArray());
+
+    assert.docEq([{_id: 0}, {_id: 1}], session2Coll.find().toArray());
+
+    assert.docEq([{_id: "a"}, {_id: "b"}], session2Coll2.find().toArray());
+
+    session.endSession();
+}());
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 4389b6b7641..b444e9bba83 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -101,6 +101,11 @@ namespace {
 // Used below to fail during inserts.
 MONGO_FAIL_POINT_DEFINE(failCollectionInserts);
 
+// Used to pause after inserting collection data and calling the opObservers. Inserts to
+// replicated collections that are not part of a multi-statement transaction will have generated
+// their OpTime and oplog entry.
+MONGO_FAIL_POINT_DEFINE(hangAfterCollectionInserts);
+
 // Uses the collator factory to convert the BSON representation of a collator to a
 // CollatorInterface. Returns null if the BSONObj is empty. We expect the stored collation to be
 // valid, since it gets validated on collection create.
@@ -365,6 +370,20 @@ Status CollectionImpl::insertDocuments(OperationContext* opCtx,
     opCtx->recoveryUnit()->onCommit(
         [this](boost::optional<Timestamp>) { notifyCappedWaitersIfNeeded(); });
 
+    MONGO_FAIL_POINT_BLOCK(hangAfterCollectionInserts, extraData) {
+        const BSONObj& data = extraData.getData();
+        const auto collElem = data["collectionNS"];
+        // If the failpoint specifies no collection or matches the existing one, hang.
+        if (!collElem || _ns.ns() == collElem.str()) {
+            while (MONGO_FAIL_POINT(hangAfterCollectionInserts)) {
+                log() << "hangAfterCollectionInserts fail point enabled for " << _ns.toString()
+                      << ". Blocking until fail point is disabled.";
+                mongo::sleepsecs(1);
+                opCtx->checkForInterrupt();
+            }
+        }
+    }
+
     return Status::OK();
 }
diff --git a/src/mongo/db/read_concern.cpp b/src/mongo/db/read_concern.cpp
index 53372e23878..6184548f7b0 100644
--- a/src/mongo/db/read_concern.cpp
+++ b/src/mongo/db/read_concern.cpp
@@ -295,7 +295,11 @@ Status waitForReadConcern(OperationContext* opCtx,
                     "node needs to be a replica set member to use readConcern: snapshot"};
         }
         if (speculative) {
-            txnParticipant->setSpeculativeTransactionOpTimeToLastApplied(opCtx);
+            txnParticipant->setSpeculativeTransactionOpTime(
+                opCtx,
+                readConcernArgs.getOriginalLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
+                    ? SpeculativeTransactionOpTime::kAllCommitted
+                    : SpeculativeTransactionOpTime::kLastApplied);
         }
     }
 
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 05b9a8e85d2..0f2ff6aafcf 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -174,6 +174,11 @@ public:
     virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime) = 0;
 
     /**
+     * Checks if the oplog exists.
+     */
+    virtual bool oplogExists(OperationContext* opCtx) = 0;
+
+    /**
      * Gets the last optime of an operation performed on this host, from stable
      * storage.
      */
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index cefbf8d3bd9..97dc1c9bf9f 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -621,6 +621,11 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(ServiceContext*
     setNewTimestamp(ctx, newTime);
 }
 
+bool ReplicationCoordinatorExternalStateImpl::oplogExists(OperationContext* opCtx) {
+    AutoGetCollection oplog(opCtx, NamespaceString::kRsOplogNamespace, MODE_IS);
+    return oplog.getCollection() != nullptr;
+}
+
 StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(
     OperationContext* opCtx) {
     // TODO: handle WriteConflictExceptions below
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 57969506c85..e206c02b622 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -86,6 +86,7 @@ public:
     virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx);
     virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote);
     virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime);
+    bool oplogExists(OperationContext* opCtx) final;
     virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
     virtual HostAndPort getClientHostAndPort(const OperationContext* opCtx);
     virtual void closeConnections();
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 614668b2a68..735cc494418 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -155,6 +155,10 @@ void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument(
 void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(ServiceContext* service,
                                                                  const Timestamp& newTime) {}
 
+bool ReplicationCoordinatorExternalStateMock::oplogExists(OperationContext* opCtx) {
+    return true;
+}
+
 StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::loadLastOpTime(
     OperationContext* opCtx) {
     return _lastOpTime;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index d78589bd109..115395206d2 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -76,6 +76,7 @@ public:
     virtual StatusWith<LastVote> loadLocalLastVoteDocument(OperationContext* opCtx);
     virtual Status storeLocalLastVoteDocument(OperationContext* opCtx, const LastVote& lastVote);
     virtual void setGlobalTimestamp(ServiceContext* service, const Timestamp& newTime);
+    bool oplogExists(OperationContext* opCtx) override;
    virtual StatusWith<OpTime> loadLastOpTime(OperationContext* opCtx);
    virtual void closeConnections();
    virtual void killAllUserOperations(OperationContext* opCtx);
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index c7463d168f6..ed40982042b 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -1274,10 +1274,9 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
                                                     OpTime targetOpTime,
                                                     boost::optional<Date_t> deadline) {
     if (!isMajorityCommittedRead) {
-        // This assumes the read concern is "local" level.
-        // We need to wait for all committed writes to be visible, even in the oplog (which uses
-        // special visibility rules).
-        _storage->waitForAllEarlierOplogWritesToBeVisible(opCtx);
+        if (!_externalState->oplogExists(opCtx)) {
+            return {ErrorCodes::NotYetInitialized, "The oplog does not exist."};
+        }
     }
 
     stdx::unique_lock<stdx::mutex> lock(_mutex);
@@ -1341,6 +1340,18 @@ Status ReplicationCoordinatorImpl::_waitUntilOpTime(OperationContext* opCtx,
         }
     }
 
+    lock.unlock();
+
+    if (!isMajorityCommittedRead) {
+        // This assumes the read concern is "local" level.
+        // We need to wait for all committed writes to be visible, even in the oplog (which uses
+        // special visibility rules). We must do this after waiting for our target optime, because
+        // only then do we know that it will fill in all "holes" before that time. If we do it
+        // earlier, we may return when the requested optime has been reached, but other writes
+        // at optimes before that time are not yet visible.
+        _storage->waitForAllEarlierOplogWritesToBeVisible(opCtx);
+    }
+
     return Status::OK();
 }
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index c39807af32c..8444da4f4c8 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -160,6 +160,8 @@ public:
      * - when using ReadSource::kProvided, the timestamp provided.
      * - when using ReadSource::kLastAppliedSnapshot, the timestamp chosen using the storage
      *   engine's last applied timestamp.
+     * - when using ReadSource::kAllCommittedSnapshot, the timestamp chosen using the storage
+     *   engine's all-committed timestamp.
      * - when using ReadSource::kLastApplied, the last applied timestamp at which the current
      *   storage transaction was opened, if one is open.
     * - when using ReadSource::kMajorityCommitted, the majority committed timestamp chosen by the
@@ -265,6 +267,11 @@ public:
          */
         kLastAppliedSnapshot,
         /**
+         * Read from the all-committed timestamp. New transactions will always read from the same
+         * timestamp and never advance.
+         */
+        kAllCommittedSnapshot,
+        /**
          * Read from the timestamp provided to setTimestampReadSource.
          */
         kProvided
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index 1cc671dcf56..dbe8f4dc68a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -313,7 +313,8 @@ Status WiredTigerRecoveryUnit::obtainMajorityCommittedSnapshot() {
 
 boost::optional<Timestamp> WiredTigerRecoveryUnit::getPointInTimeReadTimestamp() const {
     if (_timestampReadSource == ReadSource::kProvided ||
-        _timestampReadSource == ReadSource::kLastAppliedSnapshot) {
+        _timestampReadSource == ReadSource::kLastAppliedSnapshot ||
+        _timestampReadSource == ReadSource::kAllCommittedSnapshot) {
         invariant(!_readAtTimestamp.isNull());
         return _readAtTimestamp;
     }
@@ -371,6 +372,13 @@ void WiredTigerRecoveryUnit::_txnOpen() {
             }
             break;
         }
+        case ReadSource::kAllCommittedSnapshot: {
+            if (_readAtTimestamp.isNull()) {
+                _readAtTimestamp = _beginTransactionAtAllCommittedTimestamp(session);
+                break;
+            }
+            // Intentionally continue to the next case to read at the _readAtTimestamp.
+        }
         case ReadSource::kLastAppliedSnapshot: {
             // Only ever read the last applied timestamp once, and continue reusing it for
             // subsequent transactions.
@@ -400,6 +408,24 @@ void WiredTigerRecoveryUnit::_txnOpen() {
     _active = true;
 }
 
+Timestamp WiredTigerRecoveryUnit::_beginTransactionAtAllCommittedTimestamp(WT_SESSION* session) {
+    WiredTigerBeginTxnBlock txnOpen(session, _ignorePrepared);
+    Timestamp txnTimestamp = Timestamp(_oplogManager->fetchAllCommittedValue(session->connection));
+    auto status =
+        txnOpen.setTimestamp(txnTimestamp, WiredTigerBeginTxnBlock::RoundToOldest::kRound);
+    fassert(50948, status);
+
+    // Since this is not in a critical section, we might have rounded to oldest between
+    // calling getAllCommitted and setTimestamp. We need to get the actual read timestamp we
+    // used.
+    char buf[(2 * 8 /*bytes in hex*/) + 1 /*nul terminator*/];
+    auto wtstatus = session->query_timestamp(session, buf, "get=read");
+    invariantWTOK(wtstatus);
+    uint64_t read_timestamp;
+    fassert(50949, parseNumberFromStringWithBase(buf, 16, &read_timestamp));
+    txnOpen.done();
+    return Timestamp(read_timestamp);
+}
 Status WiredTigerRecoveryUnit::setTimestamp(Timestamp timestamp) {
     _ensureSession();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index a48ff02b35c..2149c5a3e2a 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -148,6 +148,12 @@ private:
     void _txnClose(bool commit);
     void _txnOpen();
 
+    /**
+     * Starts a transaction at the current all-committed timestamp.
+     * Returns the timestamp the transaction was started at.
+     */
+    Timestamp _beginTransactionAtAllCommittedTimestamp(WT_SESSION* session);
+
     WiredTigerSessionCache* _sessionCache;  // not owned
     WiredTigerOplogManager* _oplogManager;  // not owned
     UniqueWiredTigerSession _session;
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 7755e0b1af0..e7efc8d2308 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -265,11 +265,15 @@ void TransactionParticipant::beginTransactionUnconditionally(TxnNumber txnNumber
     _beginMultiDocumentTransaction(lg, txnNumber);
 }
 
-void TransactionParticipant::setSpeculativeTransactionOpTimeToLastApplied(OperationContext* opCtx) {
+void TransactionParticipant::setSpeculativeTransactionOpTime(
+    OperationContext* opCtx, SpeculativeTransactionOpTime opTimeChoice) {
     stdx::lock_guard<stdx::mutex> lg(_mutex);
     repl::ReplicationCoordinator* replCoord =
         repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
-    opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kLastAppliedSnapshot);
+    opCtx->recoveryUnit()->setTimestampReadSource(
+        opTimeChoice == SpeculativeTransactionOpTime::kAllCommitted
+            ? RecoveryUnit::ReadSource::kAllCommittedSnapshot
+            : RecoveryUnit::ReadSource::kLastAppliedSnapshot);
     opCtx->recoveryUnit()->preallocateSnapshot();
     auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp();
     invariant(readTimestamp);
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 48264e4157e..57b1884db25 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -53,6 +53,11 @@ class OperationContext;
 
 extern AtomicInt32 transactionLifetimeLimitSeconds;
 
+enum class SpeculativeTransactionOpTime {
+    kLastApplied,
+    kAllCommitted,
+};
+
 /**
  * A state machine that coordinates a distributed transaction commit with the transaction
  * coordinator.
@@ -154,7 +159,8 @@ public:
     /**
      * Called for speculative transactions to fix the optime of the snapshot to read from.
      */
-    void setSpeculativeTransactionOpTimeToLastApplied(OperationContext* opCtx);
+    void setSpeculativeTransactionOpTime(OperationContext* opCtx,
+                                         SpeculativeTransactionOpTime opTimeChoice);
 
     /**
      * Transfers management of transaction resources from the OperationContext to the Session.
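For reference, the hangAfterCollectionInserts failpoint added in collection_impl.cpp above is driven through the ordinary configureFailPoint command. The collectionNS field is optional: when it is omitted, every collection insert on the node hangs. A minimal shell sketch (the namespace shown is the one the new test uses; anything else here is illustrative):

```javascript
// Enable the failpoint for a single namespace; this matches the data["collectionNS"]
// check in CollectionImpl::insertDocuments above.
assert.commandWorked(db.adminCommand({
    configureFailPoint: "hangAfterCollectionInserts",
    mode: "alwaysOn",
    data: {collectionNS: "test.speculative_snapshot_includes_all_writes_2"}
}));

// ... observe the hung insert (e.g. via checkLog, as the new test does) ...

// Turn it off again; blocked inserters poll roughly once per second (mongo::sleepsecs(1))
// and also respect interruption via opCtx->checkForInterrupt().
assert.commandWorked(
    db.adminCommand({configureFailPoint: "hangAfterCollectionInserts", mode: "off"}));
```

The new jstest itself is expected to run under the replica-set transaction suites; it is excluded only from sharded_core_txns above because mongos does not implement this failpoint.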