diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2019-01-29 16:02:42 -0500 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2019-02-08 09:52:12 -0500 |
commit | 315b7197505795f21fb62e69c122833def6cf75a (patch) | |
tree | 6ca8e78862d2bed2a2f386673810d18e89bfa09a | |
parent | b2ab47954087c481722607483cc9868a6cc713e8 (diff) | |
download | mongo-315b7197505795f21fb62e69c122833def6cf75a.tar.gz |
SERVER-39274 Mongos shouldn't upconvert txn read concern level to snapshot if none is provided
-rw-r--r-- | jstests/core/txns/concurrent_drops_and_creates.js | 26 | ||||
-rw-r--r-- | jstests/core/txns/read_concerns.js | 11 | ||||
-rw-r--r-- | jstests/sharding/explain_cmd.js | 4 | ||||
-rw-r--r-- | jstests/sharding/transactions_read_concerns.js | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/read_concern_args.h | 3 | ||||
-rw-r--r-- | src/mongo/s/transaction_router.cpp | 70 | ||||
-rw-r--r-- | src/mongo/s/transaction_router.h | 11 | ||||
-rw-r--r-- | src/mongo/s/transaction_router_test.cpp | 43 |
8 files changed, 66 insertions, 114 deletions
diff --git a/jstests/core/txns/concurrent_drops_and_creates.js b/jstests/core/txns/concurrent_drops_and_creates.js index 8db02f1ae98..b458fbd41e5 100644 --- a/jstests/core/txns/concurrent_drops_and_creates.js +++ b/jstests/core/txns/concurrent_drops_and_creates.js @@ -8,8 +8,10 @@ const dbName2 = "test2"; const collNameA = "coll_A"; const collNameB = "coll_B"; - const testDB1 = db.getSiblingDB(dbName1); - const testDB2 = db.getSiblingDB(dbName2); + + const sessionOutsideTxn = db.getMongo().startSession({causalConsistency: true}); + const testDB1 = sessionOutsideTxn.getDatabase(dbName1); + const testDB2 = sessionOutsideTxn.getDatabase(dbName2); testDB1.runCommand({drop: collNameA, writeConcern: {w: "majority"}}); testDB2.runCommand({drop: collNameB, writeConcern: {w: "majority"}}); @@ -20,8 +22,8 @@ const sessionCollB = sessionDB2[collNameB]; // - // A transaction cannot write to a collection that has been dropped since the transaction - // started. + // A transaction with snapshot read concern cannot write to a collection that has been dropped + // since the transaction started. // // Ensure collection A and collection B exist. @@ -29,10 +31,12 @@ assert.commandWorked(sessionCollB.insert({})); // Start the transaction with a write to collection A. - session.startTransaction(); + session.startTransaction({readConcern: {level: "snapshot"}}); assert.commandWorked(sessionCollA.insert({})); - // Drop collection B outside of the transaction. + // Drop collection B outside of the transaction. Advance the cluster time of the session + // performing the drop to ensure it happens at a later cluster time than the transaction began. + sessionOutsideTxn.advanceClusterTime(session.getClusterTime()); assert.commandWorked(testDB2.runCommand({drop: collNameB, writeConcern: {w: "majority"}})); // We cannot write to collection B in the transaction, since it is illegal to implicitly create @@ -45,8 +49,8 @@ ErrorCodes.NoSuchTransaction); // - // A transaction cannot write to a collection that has been created since the transaction - // started. + // A transaction with snapshot read concern cannot write to a collection that has been created + // since the transaction started. // // Ensure collection A exists and collection B does not exist. @@ -54,10 +58,12 @@ testDB2.runCommand({drop: collNameB, writeConcern: {w: "majority"}}); // Start the transaction with a write to collection A. - session.startTransaction(); + session.startTransaction({readConcern: {level: "snapshot"}}); assert.commandWorked(sessionCollA.insert({})); - // Create collection B outside of the transaction. + // Create collection B outside of the transaction. Advance the cluster time of the session + // performing the drop to ensure it happens at a later cluster time than the transaction began. + sessionOutsideTxn.advanceClusterTime(session.getClusterTime()); assert.commandWorked(testDB2.runCommand({create: collNameB})); // We cannot write to collection B in the transaction, since it experienced catalog changes diff --git a/jstests/core/txns/read_concerns.js b/jstests/core/txns/read_concerns.js index 0256e49eca9..7d70ea4c8dc 100644 --- a/jstests/core/txns/read_concerns.js +++ b/jstests/core/txns/read_concerns.js @@ -20,7 +20,12 @@ // Set up the collection. assert.writeOK(sessionColl.insert({_id: 0}, {writeConcern: {w: "majority"}})); - session.startTransaction({readConcern: {level: level}}); + if (level) { + session.startTransaction({readConcern: {level: level}}); + } else { + session.startTransaction(); + } + const res = sessionDB.runCommand({find: collName}); if (supported) { assert.commandWorked(res, @@ -39,6 +44,10 @@ session.endSession(); } + // Starting a txn with no read concern level is allowed. + runTest(undefined, {causalConsistency: false}, true /*supported*/); + runTest(undefined, {causalConsistency: true}, true /*supported*/); + const kSupportedLevels = ["local", "majority", "snapshot"]; for (let level of kSupportedLevels) { runTest(level, {causalConsistency: false}, true /*supported*/); diff --git a/jstests/sharding/explain_cmd.js b/jstests/sharding/explain_cmd.js index 2c5f63753b7..5a984f5f610 100644 --- a/jstests/sharding/explain_cmd.js +++ b/jstests/sharding/explain_cmd.js @@ -141,14 +141,14 @@ }); assert.commandFailed(explain, tojson(explain)); - // Explain a changeStream, ensure an error is thrown under snapshot read concern which is the - // default read concern for a transaction. + // Explain a changeStream, ensure an error is thrown under snapshot read concern. const session = db.getMongo().startSession(); const sessionDB = session.getDatabase(db.getName()); explain = sessionDB.runCommand({ aggregate: "coll", pipeline: [{$changeStream: {}}], explain: true, + readConcern: {level: "snapshot"}, txnNumber: NumberLong(0), startTransaction: true, autocommit: false diff --git a/jstests/sharding/transactions_read_concerns.js b/jstests/sharding/transactions_read_concerns.js index 07b754706c0..914d1d8e933 100644 --- a/jstests/sharding/transactions_read_concerns.js +++ b/jstests/sharding/transactions_read_concerns.js @@ -38,7 +38,11 @@ const session = st.s.startSession(sessionOptions); const sessionDB = session.getDatabase(dbName); - session.startTransaction({readConcern: readConcern}); + if (readConcern) { + session.startTransaction({readConcern: readConcern}); + } else { + session.startTransaction(); + } // Target only the first shard. assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: -1}})); @@ -52,7 +56,7 @@ // Depending on the transaction's read concern, the new document will or will not be visible // to the next statement. - const numExpectedDocs = readConcern.level === "snapshot" ? 0 : 1; + const numExpectedDocs = readConcern && readConcern.level === "snapshot" ? 0 : 1; assert.eq(numExpectedDocs, sessionDB[collName].find({_id: 5}).itcount(), "sharded transaction with read concern " + tojson(readConcern) + @@ -65,6 +69,10 @@ assert.writeOK(sessionDB[collName].remove({_id: 5})); } + // Specifying no read concern level is allowed and should not compute a global snapshot. + runTest(st, undefined, {causalConsistency: false}); + runTest(st, undefined, {causalConsistency: true}); + const kAllowedReadConcernLevels = ["local", "majority", "snapshot"]; for (let readConcernLevel of kAllowedReadConcernLevels) { runTest(st, {level: readConcernLevel}, {causalConsistency: false}); diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h index 5fbc69d52fb..e6ef8533863 100644 --- a/src/mongo/db/repl/read_concern_args.h +++ b/src/mongo/db/repl/read_concern_args.h @@ -135,7 +135,8 @@ public: void appendInfo(BSONObjBuilder* builder) const; /** - * Returns true if any of clusterTime, opTime or level arguments are set. + * Returns true if any of clusterTime, opTime or level arguments are set. Does not differentiate + * between an unspecified read concern and an empty one (i.e. an empty BSON object). */ bool isEmpty() const; diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index b6ad3722c77..25c9d453333 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -68,7 +68,14 @@ bool isTransactionCommand(const BSONObj& cmd) { cmdName == "prepareTransaction"; } +/** + * Attaches the given atClusterTime to the readConcern object in the given command object, removing + * afterClusterTime if present. Assumes the given command object has a readConcern field and has + * readConcern level snapshot. + */ BSONObj appendAtClusterTimeToReadConcern(BSONObj cmdObj, LogicalTime atClusterTime) { + dassert(cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName)); + BSONObjBuilder cmdAtClusterTimeBob; for (auto&& elem : cmdObj) { if (elem.fieldNameStringData() == repl::ReadConcernArgs::kReadConcernFieldName) { @@ -82,15 +89,9 @@ BSONObj appendAtClusterTimeToReadConcern(BSONObj cmdObj, LogicalTime atClusterTi } } - // Transactions will upconvert a read concern with afterClusterTime but no level to have - // level snapshot, so a command may have a read concern field with no level. - // - // TODO SERVER-37237: Once read concern handling has been consolidated on mongos, this - // assertion can probably be removed. - if (!readConcernBob.hasField(repl::ReadConcernArgs::kLevelFieldName)) { - readConcernBob.append(repl::ReadConcernArgs::kLevelFieldName, - kReadConcernLevelSnapshotName); - } + dassert(readConcernBob.hasField(repl::ReadConcernArgs::kLevelFieldName) && + readConcernBob.asTempObj()[repl::ReadConcernArgs::kLevelFieldName].String() == + kReadConcernLevelSnapshotName); readConcernBob.append(repl::ReadConcernArgs::kAtClusterTimeFieldName, atClusterTime.asTimestamp()); @@ -110,13 +111,7 @@ BSONObj appendReadConcernForTxn(BSONObj cmd, if (cmd.hasField(repl::ReadConcernArgs::kReadConcernFieldName)) { repl::ReadConcernArgs existingReadConcernArgs; dassert(existingReadConcernArgs.initialize(cmd)); - // There may be no read concern level if the user only specified afterClusterTime and the - // transaction provided the default level. - // - // TODO SERVER-37237: Once read concern handling has been consolidated on mongos, this - // assertion can probably be simplified or removed. - dassert(existingReadConcernArgs.getLevel() == readConcernArgs.getLevel() || - !existingReadConcernArgs.hasLevel()); + dassert(existingReadConcernArgs.getLevel() == readConcernArgs.getLevel()); return atClusterTime ? appendAtClusterTimeToReadConcern(std::move(cmd), *atClusterTime) : cmd; @@ -133,8 +128,10 @@ BSONObjBuilder appendFieldsForStartTransaction(BSONObj cmd, repl::ReadConcernArgs readConcernArgs, boost::optional<LogicalTime> atClusterTime, bool doAppendStartTransaction) { - auto cmdWithReadConcern = - appendReadConcernForTxn(std::move(cmd), readConcernArgs, atClusterTime); + auto cmdWithReadConcern = !readConcernArgs.isEmpty() + ? appendReadConcernForTxn(std::move(cmd), readConcernArgs, atClusterTime) + : std::move(cmd); + BSONObjBuilder bob(std::move(cmdWithReadConcern)); if (doAppendStartTransaction) { @@ -236,10 +233,6 @@ void TransactionRouter::AtClusterTime::setTime(LogicalTime atClusterTime, StmtId _stmtIdSelectedAt = currentStmtId; } -bool TransactionRouter::AtClusterTime::isSet() const { - return _atClusterTime != LogicalTime::kUninitialized; -} - bool TransactionRouter::AtClusterTime::canChange(StmtId currentStmtId) const { return _stmtIdSelectedAt == kUninitializedStmtId || _stmtIdSelectedAt == currentStmtId; } @@ -278,16 +271,6 @@ BSONObj TransactionRouter::attachTxnFieldsIfNeeded(const ShardId& shardId, const return txnPart.attachTxnFieldsIfNeeded(cmdObj, true); } -void TransactionRouter::_verifyReadConcern() { - if (!_isRecoveringCommit) { - invariant(!_readConcernArgs.isEmpty()); - } - - if (_atClusterTime) { - invariant(_atClusterTime->isSet()); - } -} - void TransactionRouter::_verifyParticipantAtClusterTime(const Participant& participant) { const auto& participantAtClusterTime = participant.sharedOptions.atClusterTime; invariant(participantAtClusterTime); @@ -299,8 +282,6 @@ TransactionRouter::Participant* TransactionRouter::getParticipant(const ShardId& if (iter == _participants.end()) return nullptr; - _verifyReadConcern(); - if (_atClusterTime) { _verifyParticipantAtClusterTime(iter->second); } @@ -316,11 +297,10 @@ TransactionRouter::Participant& TransactionRouter::_createParticipant(const Shar _coordinatorId = shard.toString(); } - _verifyReadConcern(); - - auto sharedOptions = _atClusterTime - ? SharedTransactionOptions{_txnNumber, _readConcernArgs, _atClusterTime->getTime()} - : SharedTransactionOptions{_txnNumber, _readConcernArgs, boost::none}; + SharedTransactionOptions sharedOptions = { + _txnNumber, + _readConcernArgs, + _atClusterTime ? boost::optional<LogicalTime>(_atClusterTime->getTime()) : boost::none}; auto resultPair = _participants.try_emplace(shard.toString(), @@ -514,15 +494,11 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx, txnNumber > _txnNumber); auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - if (!readConcernArgs.hasLevel()) { - // Transactions started without a readConcern level will use snapshot as the default. - uassertStatusOK(readConcernArgs.upconvertReadConcernLevelToSnapshot()); - } else { - uassert(ErrorCodes::InvalidOptions, - "The first command in a transaction cannot specify a readConcern level other " - "than local, majority, or snapshot", + uassert(ErrorCodes::InvalidOptions, + "The first command in a transaction cannot specify a readConcern level other " + "than local, majority, or snapshot", + !readConcernArgs.hasLevel() || isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())); - } _readConcernArgs = readConcernArgs; } else if (action == TransactionActions::kCommit) { uassert(ErrorCodes::TransactionTooOld, diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index 9fd8925aabc..601dfc2e6d2 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -118,11 +118,6 @@ public: void setTime(LogicalTime atClusterTime, StmtId currentStmtId); /** - * True if the timestamp has been set to a non-null value. - */ - bool isSet() const; - - /** * True if the timestamp can be changed by a command running at the given statement id. */ bool canChange(StmtId currentStmtId) const; @@ -303,12 +298,6 @@ private: Participant& _createParticipant(const ShardId& shard); /** - * Asserts the transaction has a valid read concern and, if the read concern level is snapshot, - * has selected a non-null atClusterTime. - */ - void _verifyReadConcern(); - - /** * If the transaction's read concern level is snapshot, asserts the participant's atClusterTime * matches the transaction's. */ diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index d3ceafa1f3e..3f6c05637f1 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -544,7 +544,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotSpecifyReadConcernAfterFir ErrorCodes::InvalidOptions); } -TEST_F(TransactionRouterTestWithDefaultSession, UpconvertToSnapshotIfNoReadConcernLevelGiven) { +TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughNoReadConcernToParticipants) { repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); TxnNumber txnNum{3}; @@ -556,11 +556,6 @@ TEST_F(TransactionRouterTestWithDefaultSession, UpconvertToSnapshotIfNoReadConce BSONObj expectedNewObj = BSON("insert" << "test" - << "readConcern" - << BSON("level" - << "snapshot" - << "atClusterTime" - << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" @@ -577,7 +572,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, UpconvertToSnapshotIfNoReadConce } TEST_F(TransactionRouterTestWithDefaultSession, - UpconvertToSnapshotIfNoReadConcernLevelButHasAfterClusterTime) { + PassesThroughNoReadConcernLevelToParticipantsWithAfterClusterTime) { LogicalTime kAfterClusterTime(Timestamp(10, 1)); repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(kAfterClusterTime, boost::none); @@ -592,10 +587,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, BSONObj expectedNewObj = BSON("insert" << "test" << "readConcern" - << BSON("level" - << "snapshot" - << "atClusterTime" - << kAfterClusterTime.asTimestamp()) + << BSON("afterClusterTime" << kAfterClusterTime.asTimestamp()) << "startTransaction" << true << "coordinator" @@ -653,20 +645,6 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter DBException, ErrorCodes::InvalidOptions); } - - repl::ReadConcernArgs::get(operationContext()) = - repl::ReadConcernArgs(repl::OpTime(Timestamp(10, 1), 2), boost::none); - - { - - TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); - ASSERT_THROWS_CODE( - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart), - DBException, - ErrorCodes::InvalidOptions); - } } TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipants) { @@ -1827,20 +1805,5 @@ TEST_F(TransactionRouterTestWithDefaultSessionAndStartedSnapshot, ASSERT_BSONOBJ_EQ(rcLatestInMemoryAtClusterTime, newCmd["readConcern"].Obj()); } -TEST_F(TransactionRouterTestWithDefaultSessionAndStartedSnapshot, - AddingAtClusterTimeAddsLevelSnapshotIfNotThere) { - const Timestamp existingAfterClusterTime(1, 1); - - auto& txnRouter(*TransactionRouter::get(operationContext())); - auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1, - BSON("aggregate" - << "testColl" - << "readConcern" - << BSON("afterClusterTime" - << existingAfterClusterTime))); - - ASSERT_BSONOBJ_EQ(rcLatestInMemoryAtClusterTime, newCmd["readConcern"].Obj()); -} - } // unnamed namespace } // namespace mongo |