summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2019-01-29 16:02:42 -0500
committerJack Mulrow <jack.mulrow@mongodb.com>2019-02-08 09:52:12 -0500
commit315b7197505795f21fb62e69c122833def6cf75a (patch)
tree6ca8e78862d2bed2a2f386673810d18e89bfa09a
parentb2ab47954087c481722607483cc9868a6cc713e8 (diff)
downloadmongo-315b7197505795f21fb62e69c122833def6cf75a.tar.gz
SERVER-39274 Mongos shouldn't upconvert txn read concern level to snapshot if none is provided
-rw-r--r--jstests/core/txns/concurrent_drops_and_creates.js26
-rw-r--r--jstests/core/txns/read_concerns.js11
-rw-r--r--jstests/sharding/explain_cmd.js4
-rw-r--r--jstests/sharding/transactions_read_concerns.js12
-rw-r--r--src/mongo/db/repl/read_concern_args.h3
-rw-r--r--src/mongo/s/transaction_router.cpp70
-rw-r--r--src/mongo/s/transaction_router.h11
-rw-r--r--src/mongo/s/transaction_router_test.cpp43
8 files changed, 66 insertions, 114 deletions
diff --git a/jstests/core/txns/concurrent_drops_and_creates.js b/jstests/core/txns/concurrent_drops_and_creates.js
index 8db02f1ae98..b458fbd41e5 100644
--- a/jstests/core/txns/concurrent_drops_and_creates.js
+++ b/jstests/core/txns/concurrent_drops_and_creates.js
@@ -8,8 +8,10 @@
const dbName2 = "test2";
const collNameA = "coll_A";
const collNameB = "coll_B";
- const testDB1 = db.getSiblingDB(dbName1);
- const testDB2 = db.getSiblingDB(dbName2);
+
+ const sessionOutsideTxn = db.getMongo().startSession({causalConsistency: true});
+ const testDB1 = sessionOutsideTxn.getDatabase(dbName1);
+ const testDB2 = sessionOutsideTxn.getDatabase(dbName2);
testDB1.runCommand({drop: collNameA, writeConcern: {w: "majority"}});
testDB2.runCommand({drop: collNameB, writeConcern: {w: "majority"}});
@@ -20,8 +22,8 @@
const sessionCollB = sessionDB2[collNameB];
//
- // A transaction cannot write to a collection that has been dropped since the transaction
- // started.
+ // A transaction with snapshot read concern cannot write to a collection that has been dropped
+ // since the transaction started.
//
// Ensure collection A and collection B exist.
@@ -29,10 +31,12 @@
assert.commandWorked(sessionCollB.insert({}));
// Start the transaction with a write to collection A.
- session.startTransaction();
+ session.startTransaction({readConcern: {level: "snapshot"}});
assert.commandWorked(sessionCollA.insert({}));
- // Drop collection B outside of the transaction.
+ // Drop collection B outside of the transaction. Advance the cluster time of the session
+ // performing the drop to ensure it happens at a later cluster time than the transaction began.
+ sessionOutsideTxn.advanceClusterTime(session.getClusterTime());
assert.commandWorked(testDB2.runCommand({drop: collNameB, writeConcern: {w: "majority"}}));
// We cannot write to collection B in the transaction, since it is illegal to implicitly create
@@ -45,8 +49,8 @@
ErrorCodes.NoSuchTransaction);
//
- // A transaction cannot write to a collection that has been created since the transaction
- // started.
+ // A transaction with snapshot read concern cannot write to a collection that has been created
+ // since the transaction started.
//
// Ensure collection A exists and collection B does not exist.
@@ -54,10 +58,12 @@
testDB2.runCommand({drop: collNameB, writeConcern: {w: "majority"}});
// Start the transaction with a write to collection A.
- session.startTransaction();
+ session.startTransaction({readConcern: {level: "snapshot"}});
assert.commandWorked(sessionCollA.insert({}));
- // Create collection B outside of the transaction.
+ // Create collection B outside of the transaction. Advance the cluster time of the session
+ // performing the drop to ensure it happens at a later cluster time than the transaction began.
+ sessionOutsideTxn.advanceClusterTime(session.getClusterTime());
assert.commandWorked(testDB2.runCommand({create: collNameB}));
// We cannot write to collection B in the transaction, since it experienced catalog changes
diff --git a/jstests/core/txns/read_concerns.js b/jstests/core/txns/read_concerns.js
index 0256e49eca9..7d70ea4c8dc 100644
--- a/jstests/core/txns/read_concerns.js
+++ b/jstests/core/txns/read_concerns.js
@@ -20,7 +20,12 @@
// Set up the collection.
assert.writeOK(sessionColl.insert({_id: 0}, {writeConcern: {w: "majority"}}));
- session.startTransaction({readConcern: {level: level}});
+ if (level) {
+ session.startTransaction({readConcern: {level: level}});
+ } else {
+ session.startTransaction();
+ }
+
const res = sessionDB.runCommand({find: collName});
if (supported) {
assert.commandWorked(res,
@@ -39,6 +44,10 @@
session.endSession();
}
+ // Starting a txn with no read concern level is allowed.
+ runTest(undefined, {causalConsistency: false}, true /*supported*/);
+ runTest(undefined, {causalConsistency: true}, true /*supported*/);
+
const kSupportedLevels = ["local", "majority", "snapshot"];
for (let level of kSupportedLevels) {
runTest(level, {causalConsistency: false}, true /*supported*/);
diff --git a/jstests/sharding/explain_cmd.js b/jstests/sharding/explain_cmd.js
index 2c5f63753b7..5a984f5f610 100644
--- a/jstests/sharding/explain_cmd.js
+++ b/jstests/sharding/explain_cmd.js
@@ -141,14 +141,14 @@
});
assert.commandFailed(explain, tojson(explain));
- // Explain a changeStream, ensure an error is thrown under snapshot read concern which is the
- // default read concern for a transaction.
+ // Explain a changeStream, ensure an error is thrown under snapshot read concern.
const session = db.getMongo().startSession();
const sessionDB = session.getDatabase(db.getName());
explain = sessionDB.runCommand({
aggregate: "coll",
pipeline: [{$changeStream: {}}],
explain: true,
+ readConcern: {level: "snapshot"},
txnNumber: NumberLong(0),
startTransaction: true,
autocommit: false
diff --git a/jstests/sharding/transactions_read_concerns.js b/jstests/sharding/transactions_read_concerns.js
index 07b754706c0..914d1d8e933 100644
--- a/jstests/sharding/transactions_read_concerns.js
+++ b/jstests/sharding/transactions_read_concerns.js
@@ -38,7 +38,11 @@
const session = st.s.startSession(sessionOptions);
const sessionDB = session.getDatabase(dbName);
- session.startTransaction({readConcern: readConcern});
+ if (readConcern) {
+ session.startTransaction({readConcern: readConcern});
+ } else {
+ session.startTransaction();
+ }
// Target only the first shard.
assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: -1}}));
@@ -52,7 +56,7 @@
// Depending on the transaction's read concern, the new document will or will not be visible
// to the next statement.
- const numExpectedDocs = readConcern.level === "snapshot" ? 0 : 1;
+ const numExpectedDocs = readConcern && readConcern.level === "snapshot" ? 0 : 1;
assert.eq(numExpectedDocs,
sessionDB[collName].find({_id: 5}).itcount(),
"sharded transaction with read concern " + tojson(readConcern) +
@@ -65,6 +69,10 @@
assert.writeOK(sessionDB[collName].remove({_id: 5}));
}
+ // Specifying no read concern level is allowed and should not compute a global snapshot.
+ runTest(st, undefined, {causalConsistency: false});
+ runTest(st, undefined, {causalConsistency: true});
+
const kAllowedReadConcernLevels = ["local", "majority", "snapshot"];
for (let readConcernLevel of kAllowedReadConcernLevels) {
runTest(st, {level: readConcernLevel}, {causalConsistency: false});
diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h
index 5fbc69d52fb..e6ef8533863 100644
--- a/src/mongo/db/repl/read_concern_args.h
+++ b/src/mongo/db/repl/read_concern_args.h
@@ -135,7 +135,8 @@ public:
void appendInfo(BSONObjBuilder* builder) const;
/**
- * Returns true if any of clusterTime, opTime or level arguments are set.
+ * Returns true if any of clusterTime, opTime or level arguments are set. Does not differentiate
+ * between an unspecified read concern and an empty one (i.e. an empty BSON object).
*/
bool isEmpty() const;
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index b6ad3722c77..25c9d453333 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -68,7 +68,14 @@ bool isTransactionCommand(const BSONObj& cmd) {
cmdName == "prepareTransaction";
}
+/**
+ * Attaches the given atClusterTime to the readConcern object in the given command object, removing
+ * afterClusterTime if present. Assumes the given command object has a readConcern field and has
+ * readConcern level snapshot.
+ */
BSONObj appendAtClusterTimeToReadConcern(BSONObj cmdObj, LogicalTime atClusterTime) {
+ dassert(cmdObj.hasField(repl::ReadConcernArgs::kReadConcernFieldName));
+
BSONObjBuilder cmdAtClusterTimeBob;
for (auto&& elem : cmdObj) {
if (elem.fieldNameStringData() == repl::ReadConcernArgs::kReadConcernFieldName) {
@@ -82,15 +89,9 @@ BSONObj appendAtClusterTimeToReadConcern(BSONObj cmdObj, LogicalTime atClusterTi
}
}
- // Transactions will upconvert a read concern with afterClusterTime but no level to have
- // level snapshot, so a command may have a read concern field with no level.
- //
- // TODO SERVER-37237: Once read concern handling has been consolidated on mongos, this
- // assertion can probably be removed.
- if (!readConcernBob.hasField(repl::ReadConcernArgs::kLevelFieldName)) {
- readConcernBob.append(repl::ReadConcernArgs::kLevelFieldName,
- kReadConcernLevelSnapshotName);
- }
+ dassert(readConcernBob.hasField(repl::ReadConcernArgs::kLevelFieldName) &&
+ readConcernBob.asTempObj()[repl::ReadConcernArgs::kLevelFieldName].String() ==
+ kReadConcernLevelSnapshotName);
readConcernBob.append(repl::ReadConcernArgs::kAtClusterTimeFieldName,
atClusterTime.asTimestamp());
@@ -110,13 +111,7 @@ BSONObj appendReadConcernForTxn(BSONObj cmd,
if (cmd.hasField(repl::ReadConcernArgs::kReadConcernFieldName)) {
repl::ReadConcernArgs existingReadConcernArgs;
dassert(existingReadConcernArgs.initialize(cmd));
- // There may be no read concern level if the user only specified afterClusterTime and the
- // transaction provided the default level.
- //
- // TODO SERVER-37237: Once read concern handling has been consolidated on mongos, this
- // assertion can probably be simplified or removed.
- dassert(existingReadConcernArgs.getLevel() == readConcernArgs.getLevel() ||
- !existingReadConcernArgs.hasLevel());
+ dassert(existingReadConcernArgs.getLevel() == readConcernArgs.getLevel());
return atClusterTime ? appendAtClusterTimeToReadConcern(std::move(cmd), *atClusterTime)
: cmd;
@@ -133,8 +128,10 @@ BSONObjBuilder appendFieldsForStartTransaction(BSONObj cmd,
repl::ReadConcernArgs readConcernArgs,
boost::optional<LogicalTime> atClusterTime,
bool doAppendStartTransaction) {
- auto cmdWithReadConcern =
- appendReadConcernForTxn(std::move(cmd), readConcernArgs, atClusterTime);
+ auto cmdWithReadConcern = !readConcernArgs.isEmpty()
+ ? appendReadConcernForTxn(std::move(cmd), readConcernArgs, atClusterTime)
+ : std::move(cmd);
+
BSONObjBuilder bob(std::move(cmdWithReadConcern));
if (doAppendStartTransaction) {
@@ -236,10 +233,6 @@ void TransactionRouter::AtClusterTime::setTime(LogicalTime atClusterTime, StmtId
_stmtIdSelectedAt = currentStmtId;
}
-bool TransactionRouter::AtClusterTime::isSet() const {
- return _atClusterTime != LogicalTime::kUninitialized;
-}
-
bool TransactionRouter::AtClusterTime::canChange(StmtId currentStmtId) const {
return _stmtIdSelectedAt == kUninitializedStmtId || _stmtIdSelectedAt == currentStmtId;
}
@@ -278,16 +271,6 @@ BSONObj TransactionRouter::attachTxnFieldsIfNeeded(const ShardId& shardId, const
return txnPart.attachTxnFieldsIfNeeded(cmdObj, true);
}
-void TransactionRouter::_verifyReadConcern() {
- if (!_isRecoveringCommit) {
- invariant(!_readConcernArgs.isEmpty());
- }
-
- if (_atClusterTime) {
- invariant(_atClusterTime->isSet());
- }
-}
-
void TransactionRouter::_verifyParticipantAtClusterTime(const Participant& participant) {
const auto& participantAtClusterTime = participant.sharedOptions.atClusterTime;
invariant(participantAtClusterTime);
@@ -299,8 +282,6 @@ TransactionRouter::Participant* TransactionRouter::getParticipant(const ShardId&
if (iter == _participants.end())
return nullptr;
- _verifyReadConcern();
-
if (_atClusterTime) {
_verifyParticipantAtClusterTime(iter->second);
}
@@ -316,11 +297,10 @@ TransactionRouter::Participant& TransactionRouter::_createParticipant(const Shar
_coordinatorId = shard.toString();
}
- _verifyReadConcern();
-
- auto sharedOptions = _atClusterTime
- ? SharedTransactionOptions{_txnNumber, _readConcernArgs, _atClusterTime->getTime()}
- : SharedTransactionOptions{_txnNumber, _readConcernArgs, boost::none};
+ SharedTransactionOptions sharedOptions = {
+ _txnNumber,
+ _readConcernArgs,
+ _atClusterTime ? boost::optional<LogicalTime>(_atClusterTime->getTime()) : boost::none};
auto resultPair =
_participants.try_emplace(shard.toString(),
@@ -514,15 +494,11 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx,
txnNumber > _txnNumber);
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- if (!readConcernArgs.hasLevel()) {
- // Transactions started without a readConcern level will use snapshot as the default.
- uassertStatusOK(readConcernArgs.upconvertReadConcernLevelToSnapshot());
- } else {
- uassert(ErrorCodes::InvalidOptions,
- "The first command in a transaction cannot specify a readConcern level other "
- "than local, majority, or snapshot",
+ uassert(ErrorCodes::InvalidOptions,
+ "The first command in a transaction cannot specify a readConcern level other "
+ "than local, majority, or snapshot",
+ !readConcernArgs.hasLevel() ||
isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel()));
- }
_readConcernArgs = readConcernArgs;
} else if (action == TransactionActions::kCommit) {
uassert(ErrorCodes::TransactionTooOld,
diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h
index 9fd8925aabc..601dfc2e6d2 100644
--- a/src/mongo/s/transaction_router.h
+++ b/src/mongo/s/transaction_router.h
@@ -118,11 +118,6 @@ public:
void setTime(LogicalTime atClusterTime, StmtId currentStmtId);
/**
- * True if the timestamp has been set to a non-null value.
- */
- bool isSet() const;
-
- /**
* True if the timestamp can be changed by a command running at the given statement id.
*/
bool canChange(StmtId currentStmtId) const;
@@ -303,12 +298,6 @@ private:
Participant& _createParticipant(const ShardId& shard);
/**
- * Asserts the transaction has a valid read concern and, if the read concern level is snapshot,
- * has selected a non-null atClusterTime.
- */
- void _verifyReadConcern();
-
- /**
* If the transaction's read concern level is snapshot, asserts the participant's atClusterTime
* matches the transaction's.
*/
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index d3ceafa1f3e..3f6c05637f1 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -544,7 +544,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotSpecifyReadConcernAfterFir
ErrorCodes::InvalidOptions);
}
-TEST_F(TransactionRouterTestWithDefaultSession, UpconvertToSnapshotIfNoReadConcernLevelGiven) {
+TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughNoReadConcernToParticipants) {
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
TxnNumber txnNum{3};
@@ -556,11 +556,6 @@ TEST_F(TransactionRouterTestWithDefaultSession, UpconvertToSnapshotIfNoReadConce
BSONObj expectedNewObj = BSON("insert"
<< "test"
- << "readConcern"
- << BSON("level"
- << "snapshot"
- << "atClusterTime"
- << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction"
<< true
<< "coordinator"
@@ -577,7 +572,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, UpconvertToSnapshotIfNoReadConce
}
TEST_F(TransactionRouterTestWithDefaultSession,
- UpconvertToSnapshotIfNoReadConcernLevelButHasAfterClusterTime) {
+ PassesThroughNoReadConcernLevelToParticipantsWithAfterClusterTime) {
LogicalTime kAfterClusterTime(Timestamp(10, 1));
repl::ReadConcernArgs::get(operationContext()) =
repl::ReadConcernArgs(kAfterClusterTime, boost::none);
@@ -592,10 +587,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
BSONObj expectedNewObj = BSON("insert"
<< "test"
<< "readConcern"
- << BSON("level"
- << "snapshot"
- << "atClusterTime"
- << kAfterClusterTime.asTimestamp())
+ << BSON("afterClusterTime" << kAfterClusterTime.asTimestamp())
<< "startTransaction"
<< true
<< "coordinator"
@@ -653,20 +645,6 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter
DBException,
ErrorCodes::InvalidOptions);
}
-
- repl::ReadConcernArgs::get(operationContext()) =
- repl::ReadConcernArgs(repl::OpTime(Timestamp(10, 1), 2), boost::none);
-
- {
-
- TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
- ASSERT_THROWS_CODE(
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart),
- DBException,
- ErrorCodes::InvalidOptions);
- }
}
TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipants) {
@@ -1827,20 +1805,5 @@ TEST_F(TransactionRouterTestWithDefaultSessionAndStartedSnapshot,
ASSERT_BSONOBJ_EQ(rcLatestInMemoryAtClusterTime, newCmd["readConcern"].Obj());
}
-TEST_F(TransactionRouterTestWithDefaultSessionAndStartedSnapshot,
- AddingAtClusterTimeAddsLevelSnapshotIfNotThere) {
- const Timestamp existingAfterClusterTime(1, 1);
-
- auto& txnRouter(*TransactionRouter::get(operationContext()));
- auto newCmd = txnRouter.attachTxnFieldsIfNeeded(shard1,
- BSON("aggregate"
- << "testColl"
- << "readConcern"
- << BSON("afterClusterTime"
- << existingAfterClusterTime)));
-
- ASSERT_BSONOBJ_EQ(rcLatestInMemoryAtClusterTime, newCmd["readConcern"].Obj());
-}
-
} // unnamed namespace
} // namespace mongo