author     Jack Mulrow <jack.mulrow@mongodb.com>     2018-08-29 14:37:23 -0400
committer  Jack Mulrow <jack.mulrow@mongodb.com>     2018-09-06 18:46:12 -0400
commit     3257bb958a29a23d5101a16c49d97536e5fba056 (patch)
tree       54871c65289f89b954e2e91f183aac00ea730443 /src
parent     b4f3327901f58083d376064a6d89680325b26964 (diff)
download   mongo-3257bb958a29a23d5101a16c49d97536e5fba056.tar.gz
SERVER-36557 Compute atClusterTime for cluster batch writes and findAndModify
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/s/commands/SConscript                           |   7
-rw-r--r--  src/mongo/s/commands/cluster_command_test_fixture.cpp     | 108
-rw-r--r--  src/mongo/s/commands/cluster_command_test_fixture.h       |  39
-rw-r--r--  src/mongo/s/commands/cluster_delete_test.cpp              |  76
-rw-r--r--  src/mongo/s/commands/cluster_distinct_test.cpp            | 100
-rw-r--r--  src/mongo/s/commands/cluster_find_and_modify_cmd.cpp      |   8
-rw-r--r--  src/mongo/s/commands/cluster_find_and_modify_test.cpp     |  81
-rw-r--r--  src/mongo/s/commands/cluster_insert_test.cpp              |  76
-rw-r--r--  src/mongo/s/commands/cluster_update_test.cpp              |  76
-rw-r--r--  src/mongo/s/commands/cluster_write_cmd.cpp                |   5
-rw-r--r--  src/mongo/s/query/cluster_aggregate_test.cpp              |  68
-rw-r--r--  src/mongo/s/transaction/at_cluster_time_util.cpp          |  13
-rw-r--r--  src/mongo/s/transaction/compute_at_cluster_time_test.cpp  |  16
-rw-r--r--  src/mongo/s/transaction/transaction_router.cpp            |  24
-rw-r--r--  src/mongo/s/transaction/transaction_router.h              |   6
-rw-r--r--  src/mongo/s/write_ops/SConscript                          |   1
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec.cpp                |  13
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec_test.cpp           | 163
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp                  |   4
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.h                    |   7
20 files changed, 735 insertions, 156 deletions
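
For orientation before the per-file diffs: the heart of this change is the rule mongos uses to choose a transaction's atClusterTime for cluster batch writes and findAndModify: take the latest time the router knows about (the logical clock, or a shard's cached last committed opTime when only one shard is targeted) and never let it fall below the request's afterClusterTime. The standalone sketch below condenses that rule; LogicalTime and selectAtClusterTime here are simplified stand-ins for illustration, not the server's actual types.

#include <cassert>
#include <cstdint>
#include <optional>

// Simplified stand-in for the server's LogicalTime (a Timestamp of seconds + increment).
struct LogicalTime {
    uint32_t secs = 0;
    uint32_t inc = 0;
    bool operator<(const LogicalTime& other) const {
        return secs != other.secs ? secs < other.secs : inc < other.inc;
    }
};

// Condensed selection rule: start from the latest time the router knows about and
// clamp it up to afterClusterTime when the client supplied one, so the chosen
// atClusterTime is always greater than or equal to afterClusterTime.
LogicalTime selectAtClusterTime(LogicalTime latestKnownTime,
                                std::optional<LogicalTime> afterClusterTime) {
    if (afterClusterTime && latestKnownTime < *afterClusterTime) {
        return *afterClusterTime;
    }
    return latestKnownTime;
}

int main() {
    // afterClusterTime ahead of the router's clock: it becomes the lower bound.
    LogicalTime chosen = selectAtClusterTime({10, 1}, LogicalTime{50, 2});
    assert(!(chosen < LogicalTime{50, 2}));

    // No afterClusterTime: the latest known time is used as-is.
    chosen = selectAtClusterTime({10, 1}, std::nullopt);
    assert(chosen.secs == 10 && chosen.inc == 1);
    return 0;
}

The same rule appears below in at_cluster_time_util::computeAtClusterTimeForOneShard and TransactionRouter::setAtClusterTimeToLatestTime, and it is what the new SnapshotReadConcernWithAfterClusterTime tests assert on the requests sent to shards.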
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 3c6fe218bfd..7e49e001480 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -146,13 +146,18 @@ env.Library(
)
env.CppUnitTest(
- target="cluster_distinct_test",
+ target="cluster_commands_test",
source=[
+ "cluster_delete_test.cpp",
"cluster_distinct_test.cpp",
+ "cluster_find_and_modify_test.cpp",
+ "cluster_insert_test.cpp",
+ "cluster_update_test.cpp",
],
LIBDEPS=[
'cluster_commands',
'cluster_command_test_fixture',
+ '$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/auth/saslauth',
],
)
diff --git a/src/mongo/s/commands/cluster_command_test_fixture.cpp b/src/mongo/s/commands/cluster_command_test_fixture.cpp
index 2d3bec52e9c..e9c44a154ef 100644
--- a/src/mongo/s/commands/cluster_command_test_fixture.cpp
+++ b/src/mongo/s/commands/cluster_command_test_fixture.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/logical_clock.h"
#include "mongo/db/logical_session_cache_noop.h"
#include "mongo/db/logical_time_validator.h"
+#include "mongo/s/cluster_last_error_info.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -62,6 +63,26 @@ void ClusterCommandTestFixture::setUp() {
LogicalTimeValidator::set(getServiceContext(), std::move(validator));
LogicalSessionCache::set(getServiceContext(), stdx::make_unique<LogicalSessionCacheNoop>());
+
+ loadRoutingTableWithTwoChunksAndTwoShards(kNss);
+}
+
+BSONObj ClusterCommandTestFixture::_makeCmd(BSONObj cmdObj, bool includeAfterClusterTime) {
+ BSONObjBuilder bob(cmdObj);
+ // Each command runs in a new session.
+ bob.append("lsid", makeLogicalSessionIdForTest().toBSON());
+ bob.append("txnNumber", TxnNumber(1));
+ bob.append("autocommit", false);
+ bob.append("startTransaction", true);
+
+ BSONObjBuilder readConcernBob = bob.subobjStart(repl::ReadConcernArgs::kReadConcernFieldName);
+ readConcernBob.append("level", "snapshot");
+ if (includeAfterClusterTime) {
+ readConcernBob.append("afterClusterTime", kAfterClusterTime);
+ }
+
+ readConcernBob.doneFast();
+ return bob.obj();
}
void ClusterCommandTestFixture::expectReturnsError(ErrorCodes::Error code) {
@@ -79,6 +100,15 @@ DbResponse ClusterCommandTestFixture::runCommand(BSONObj cmd) {
const auto opMsgRequest = OpMsgRequest::fromDBAndBody(kNss.db(), cmd);
+ // Ensure the clusterGLE on the Client has not yet been initialized.
+ ASSERT(!ClusterLastErrorInfo::get(client.get()));
+
+ // Initialize the cluster last error info for the client with a new request.
+ ClusterLastErrorInfo::get(client.get()) = std::make_shared<ClusterLastErrorInfo>();
+ ASSERT(ClusterLastErrorInfo::get(client.get()));
+ auto clusterGLE = ClusterLastErrorInfo::get(client.get());
+ clusterGLE->newRequest();
+
return Strategy::clientCommand(opCtx.get(), opMsgRequest.serialize());
}
@@ -142,4 +172,82 @@ void ClusterCommandTestFixture::runCommandMaxErrors(BSONObj cmd,
future.timed_get(kFutureTimeout);
}
+void ClusterCommandTestFixture::testNoErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd) {
+
+ // Target one shard.
+ runCommandSuccessful(_makeCmd(targetedCmd), true);
+
+ // Target all shards.
+ if (!scatterGatherCmd.isEmpty()) {
+ runCommandSuccessful(_makeCmd(scatterGatherCmd), false);
+ }
+}
+
+void ClusterCommandTestFixture::testRetryOnSnapshotError(BSONObj targetedCmd,
+ BSONObj scatterGatherCmd) {
+ // Target one shard.
+ runCommandOneError(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true);
+ runCommandOneError(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true);
+
+ // Target all shards
+ if (!scatterGatherCmd.isEmpty()) {
+ runCommandOneError(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false);
+ runCommandOneError(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false);
+ }
+}
+
+void ClusterCommandTestFixture::testMaxRetriesSnapshotErrors(BSONObj targetedCmd,
+ BSONObj scatterGatherCmd) {
+ // Target one shard.
+ runCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true);
+ runCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true);
+
+ // Target all shards
+ if (!scatterGatherCmd.isEmpty()) {
+ runCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false);
+ runCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false);
+ }
+}
+
+void ClusterCommandTestFixture::testAttachesAtClusterTimeForSnapshotReadConcern(
+ BSONObj targetedCmd, BSONObj scatterGatherCmd) {
+
+ auto containsAtClusterTime = [](const executor::RemoteCommandRequest& request) {
+ ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
+ };
+
+ // Target one shard.
+ runCommandInspectRequests(_makeCmd(targetedCmd), containsAtClusterTime, true);
+
+ // Target all shards.
+ if (!scatterGatherCmd.isEmpty()) {
+ runCommandInspectRequests(_makeCmd(scatterGatherCmd), containsAtClusterTime, false);
+ }
+}
+
+void ClusterCommandTestFixture::testSnapshotReadConcernWithAfterClusterTime(
+ BSONObj targetedCmd, BSONObj scatterGatherCmd) {
+
+ auto containsAtClusterTimeNoAfterClusterTime =
+ [&](const executor::RemoteCommandRequest& request) {
+ ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
+ ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo());
+
+ // The chosen atClusterTime should be greater than or equal to the request's
+ // afterClusterTime.
+ ASSERT_GTE(LogicalTime(request.cmdObj["readConcern"]["atClusterTime"].timestamp()),
+ LogicalTime(kAfterClusterTime));
+ };
+
+ // Target one shard.
+ runCommandInspectRequests(
+ _makeCmd(targetedCmd, true), containsAtClusterTimeNoAfterClusterTime, true);
+
+ // Target all shards.
+ if (!scatterGatherCmd.isEmpty()) {
+ runCommandInspectRequests(
+ _makeCmd(scatterGatherCmd, true), containsAtClusterTimeNoAfterClusterTime, false);
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_command_test_fixture.h b/src/mongo/s/commands/cluster_command_test_fixture.h
index 9ddf001d080..71cc2f2e830 100644
--- a/src/mongo/s/commands/cluster_command_test_fixture.h
+++ b/src/mongo/s/commands/cluster_command_test_fixture.h
@@ -45,6 +45,8 @@ protected:
const LogicalTime kInMemoryLogicalTime = LogicalTime(Timestamp(10, 1));
+ const Timestamp kAfterClusterTime = Timestamp(50, 2);
+
void setUp() override;
virtual void expectInspectRequest(int shardIndex, InspectionCallback cb) = 0;
@@ -62,6 +64,43 @@ protected:
void runCommandInspectRequests(BSONObj cmd, InspectionCallback cb, bool isTargeted);
void runCommandMaxErrors(BSONObj cmd, ErrorCodes::Error code, bool isTargeted);
+
+ /**
+ * Verifies that running the given commands through mongos will succeed.
+ */
+ void testNoErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd = BSONObj());
+
+ /**
+ * Verifies that the given commands will retry on a snapshot error.
+ */
+ void testRetryOnSnapshotError(BSONObj targetedCmd, BSONObj scatterGatherCmd = BSONObj());
+
+ /**
+ * Verifies that the given commands will retry up to the max retry attempts on snapshot
+ * errors, then return the final errors they receive.
+ */
+ void testMaxRetriesSnapshotErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd = BSONObj());
+
+ /**
+ * Verifies that atClusterTime is attached to the given commands.
+ */
+ void testAttachesAtClusterTimeForSnapshotReadConcern(BSONObj targetedCmd,
+ BSONObj scatterGatherCmd = BSONObj());
+
+ /**
+ * Verifies that the chosen atClusterTime is greater than or equal to each command's
+ * afterClusterTime.
+ */
+ void testSnapshotReadConcernWithAfterClusterTime(BSONObj targetedCmd,
+ BSONObj scatterGatherCmd = BSONObj());
+
+private:
+ /**
+ * Makes a new command object from the given one by appending a snapshot read
+ * concern and the appropriate transaction options. If includeAfterClusterTime
+ * is true, also appends afterClusterTime to the read concern.
+ */
+ BSONObj _makeCmd(BSONObj cmdObj, bool includeAfterClusterTime = false);
};
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_delete_test.cpp b/src/mongo/s/commands/cluster_delete_test.cpp
new file mode 100644
index 00000000000..275e33254ec
--- /dev/null
+++ b/src/mongo/s/commands/cluster_delete_test.cpp
@@ -0,0 +1,76 @@
+/**
+* Copyright (C) 2018 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects
+* for all of the code used other than as permitted herein. If you modify
+* file(s) with this exception, you may extend this exception to your
+* version of the file(s), but you are not obligated to do so. If you do not
+* wish to do so, delete this exception statement from your version. If you
+* delete this exception statement from all source files in the program,
+* then also delete it in the license file.
+*/
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/commands/cluster_command_test_fixture.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+class ClusterDeleteTest : public ClusterCommandTestFixture {
+protected:
+ const BSONObj kDeleteCmdTargeted{
+ fromjson("{delete: 'coll', deletes: [{q: {'_id': -1}, limit: 0}]}")};
+
+ const BSONObj kDeleteCmdScatterGather{
+ fromjson("{delete: 'coll', deletes: [{q: {'_id': {$gte: -1}}, limit: 0}]}")};
+
+ void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ cb(request);
+ return BSON("n" << 1);
+ });
+ }
+
+ void expectReturnsSuccess(int shardIndex) override {
+ onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ return BSON("n" << 1);
+ });
+ }
+};
+
+TEST_F(ClusterDeleteTest, NoErrors) {
+ testNoErrors(kDeleteCmdTargeted, kDeleteCmdScatterGather);
+}
+
+TEST_F(ClusterDeleteTest, AttachesAtClusterTimeForSnapshotReadConcern) {
+ testAttachesAtClusterTimeForSnapshotReadConcern(kDeleteCmdTargeted, kDeleteCmdScatterGather);
+}
+
+TEST_F(ClusterDeleteTest, SnapshotReadConcernWithAfterClusterTime) {
+ testSnapshotReadConcernWithAfterClusterTime(kDeleteCmdTargeted, kDeleteCmdScatterGather);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_distinct_test.cpp b/src/mongo/s/commands/cluster_distinct_test.cpp
index 16c2468e3a1..2224d0f3adb 100644
--- a/src/mongo/s/commands/cluster_distinct_test.cpp
+++ b/src/mongo/s/commands/cluster_distinct_test.cpp
@@ -38,39 +38,10 @@ namespace {
class ClusterDistinctTest : public ClusterCommandTestFixture {
protected:
- const Timestamp kAfterClusterTime = Timestamp(50, 2);
-
const BSONObj kDistinctCmdTargeted{
- fromjson("{distinct: 'coll', key: 'x', query: {'_id': {$lt: -1}}, autocommit: false, "
- "txnNumber: NumberLong(1), startTransaction: true}")};
-
- const BSONObj kDistinctCmdScatterGather{
- fromjson("{distinct: 'coll', key: '_id', autocommit: false, txnNumber: NumberLong(1), "
- "startTransaction: true}")};
-
- BSONObj appendLogicalSessionIdAndSnapshotReadConcern(BSONObj cmdObj,
- bool includeAfterClusterTime) {
- BSONObjBuilder bob(cmdObj);
- bob.append("lsid", makeLogicalSessionIdForTest().toBSON());
- BSONObjBuilder readConcernBob =
- bob.subobjStart(repl::ReadConcernArgs::kReadConcernFieldName);
- readConcernBob.append("level", "snapshot");
- if (includeAfterClusterTime) {
- readConcernBob.append("afterClusterTime", kAfterClusterTime);
- }
- readConcernBob.doneFast();
- return bob.obj();
- }
-
- BSONObj distinctCmdTargeted(bool includeAfterClusterTime = false) {
- return appendLogicalSessionIdAndSnapshotReadConcern(kDistinctCmdTargeted,
- includeAfterClusterTime);
- }
+ fromjson("{distinct: 'coll', key: 'x', query: {'_id': {$lt: -1}}}")};
- BSONObj distinctCmdScatterGather(bool includeAfterClusterTime = false) {
- return appendLogicalSessionIdAndSnapshotReadConcern(kDistinctCmdScatterGather,
- includeAfterClusterTime);
- }
+ const BSONObj kDistinctCmdScatterGather{fromjson("{distinct: 'coll', key: '_id'}")};
void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
@@ -89,79 +60,24 @@ protected:
};
TEST_F(ClusterDistinctTest, NoErrors) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- // Target one shard.
- runCommandSuccessful(distinctCmdTargeted(), true);
-
- // Target all shards.
- runCommandSuccessful(distinctCmdScatterGather(), false);
+ testNoErrors(kDistinctCmdTargeted, kDistinctCmdScatterGather);
}
-// Verify distinct through mongos will retry on a snapshot error.
TEST_F(ClusterDistinctTest, RetryOnSnapshotError) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- // Target one shard.
- runCommandOneError(distinctCmdTargeted(), ErrorCodes::SnapshotUnavailable, true);
- runCommandOneError(distinctCmdTargeted(), ErrorCodes::SnapshotTooOld, true);
-
- // Target all shards
- runCommandOneError(distinctCmdScatterGather(), ErrorCodes::SnapshotUnavailable, false);
- runCommandOneError(distinctCmdScatterGather(), ErrorCodes::SnapshotTooOld, false);
+ testRetryOnSnapshotError(kDistinctCmdTargeted, kDistinctCmdScatterGather);
}
-// Verify distinct commands will retry up to its max retry attempts on snapshot errors
-// then return the final error it receives.
TEST_F(ClusterDistinctTest, MaxRetriesSnapshotErrors) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- // Target one shard.
- runCommandMaxErrors(distinctCmdTargeted(), ErrorCodes::SnapshotUnavailable, true);
- runCommandMaxErrors(distinctCmdTargeted(), ErrorCodes::SnapshotTooOld, true);
-
- // Target all shards
- runCommandMaxErrors(distinctCmdScatterGather(), ErrorCodes::SnapshotUnavailable, false);
- runCommandMaxErrors(distinctCmdScatterGather(), ErrorCodes::SnapshotTooOld, false);
+ testMaxRetriesSnapshotErrors(kDistinctCmdTargeted, kDistinctCmdScatterGather);
}
TEST_F(ClusterDistinctTest, AttachesAtClusterTimeForSnapshotReadConcern) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- auto containsAtClusterTime = [](const executor::RemoteCommandRequest& request) {
- ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
- };
-
- // Target one shard.
- runCommandInspectRequests(distinctCmdTargeted(), containsAtClusterTime, true);
-
- // Target all shards.
- runCommandInspectRequests(distinctCmdScatterGather(), containsAtClusterTime, false);
+ testAttachesAtClusterTimeForSnapshotReadConcern(kDistinctCmdTargeted,
+ kDistinctCmdScatterGather);
}
TEST_F(ClusterDistinctTest, SnapshotReadConcernWithAfterClusterTime) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- auto containsAtClusterTimeNoAfterClusterTime =
- [&](const executor::RemoteCommandRequest& request) {
- ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
- ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo());
-
- // The chosen atClusterTime should be greater than or equal to the request's
- // afterClusterTime.
- ASSERT_GTE(LogicalTime(request.cmdObj["readConcern"]["atClusterTime"].timestamp()),
- LogicalTime(kAfterClusterTime));
- };
-
- // Target one shard.
- runCommandInspectRequests(distinctCmdTargeted(true /*includeAfterClusterTime*/),
- containsAtClusterTimeNoAfterClusterTime,
- true);
-
- // Target all shards.
- runCommandInspectRequests(distinctCmdScatterGather(true /*includeAfterClusterTime*/),
- containsAtClusterTimeNoAfterClusterTime,
- false);
+ testSnapshotReadConcernWithAfterClusterTime(kDistinctCmdTargeted, kDistinctCmdScatterGather);
}
} // namespace
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 6f61f915358..55ce1e94578 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -45,6 +45,7 @@
#include "mongo/s/commands/strategy.h"
#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
+#include "mongo/s/transaction/transaction_router.h"
#include "mongo/s/write_ops/cluster_write.h"
#include "mongo/util/timer.h"
@@ -202,6 +203,10 @@ private:
const NamespaceString& nss,
const BSONObj& cmdObj,
BSONObjBuilder* result) {
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter->setAtClusterTimeToLatestTime(opCtx);
+ }
+
const auto response = [&] {
std::vector<AsyncRequestsSender::Request> requests;
requests.emplace_back(
@@ -226,7 +231,8 @@ private:
uassertStatusOK(response.status);
const auto responseStatus = getStatusFromCommandResult(response.data);
- if (ErrorCodes::isNeedRetargettingError(responseStatus.code())) {
+ if (ErrorCodes::isNeedRetargettingError(responseStatus.code()) ||
+ ErrorCodes::isSnapshotError(responseStatus.code())) {
// Command code traps this exception and re-runs
uassertStatusOK(responseStatus.withContext("findAndModify"));
}
diff --git a/src/mongo/s/commands/cluster_find_and_modify_test.cpp b/src/mongo/s/commands/cluster_find_and_modify_test.cpp
new file mode 100644
index 00000000000..f6ff28774ac
--- /dev/null
+++ b/src/mongo/s/commands/cluster_find_and_modify_test.cpp
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/commands/cluster_command_test_fixture.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+class ClusterFindAndModifyTest : public ClusterCommandTestFixture {
+protected:
+ const BSONObj kFindAndModifyCmdTargeted{
+ fromjson("{findAndModify: 'coll', query: {'_id': -1}, update: {$set: {x: 1}}}")};
+
+ void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ cb(request);
+ return BSON("_id" << -1);
+ });
+ }
+
+ void expectReturnsSuccess(int shardIndex) override {
+ onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ return BSON("_id" << -1);
+ });
+ }
+};
+
+TEST_F(ClusterFindAndModifyTest, NoErrors) {
+ testNoErrors(kFindAndModifyCmdTargeted);
+}
+
+TEST_F(ClusterFindAndModifyTest, RetryOnSnapshotError) {
+ testRetryOnSnapshotError(kFindAndModifyCmdTargeted);
+}
+
+TEST_F(ClusterFindAndModifyTest, MaxRetriesSnapshotErrors) {
+ testMaxRetriesSnapshotErrors(kFindAndModifyCmdTargeted);
+}
+
+TEST_F(ClusterFindAndModifyTest, AttachesAtClusterTimeForSnapshotReadConcern) {
+ testAttachesAtClusterTimeForSnapshotReadConcern(kFindAndModifyCmdTargeted);
+}
+
+TEST_F(ClusterFindAndModifyTest, SnapshotReadConcernWithAfterClusterTime) {
+ testSnapshotReadConcernWithAfterClusterTime(kFindAndModifyCmdTargeted);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_insert_test.cpp b/src/mongo/s/commands/cluster_insert_test.cpp
new file mode 100644
index 00000000000..652b9bada42
--- /dev/null
+++ b/src/mongo/s/commands/cluster_insert_test.cpp
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/commands.h"
+#include "mongo/s/commands/cluster_command_test_fixture.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+class ClusterInsertTest : public ClusterCommandTestFixture {
+protected:
+ const BSONObj kInsertCmdTargeted{fromjson("{insert: 'coll', documents: [{'_id': -1}]}")};
+
+ const BSONObj kInsertCmdScatterGather{
+ fromjson("{insert: 'coll', documents: [{'_id': -1}, {'_id': 1}], ordered: false}")};
+
+ void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ cb(request);
+ return BSON("nInserted" << 1);
+ });
+ }
+
+ void expectReturnsSuccess(int shardIndex) override {
+ onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ return BSON("nInserted" << 1);
+ });
+ }
+};
+
+TEST_F(ClusterInsertTest, NoErrors) {
+ testNoErrors(kInsertCmdTargeted, kInsertCmdScatterGather);
+}
+
+TEST_F(ClusterInsertTest, AttachesAtClusterTimeForSnapshotReadConcern) {
+ testAttachesAtClusterTimeForSnapshotReadConcern(kInsertCmdTargeted, kInsertCmdScatterGather);
+}
+
+TEST_F(ClusterInsertTest, SnapshotReadConcernWithAfterClusterTime) {
+ testSnapshotReadConcernWithAfterClusterTime(kInsertCmdTargeted, kInsertCmdScatterGather);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_update_test.cpp b/src/mongo/s/commands/cluster_update_test.cpp
new file mode 100644
index 00000000000..5e5f07df41a
--- /dev/null
+++ b/src/mongo/s/commands/cluster_update_test.cpp
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/commands/cluster_command_test_fixture.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+class ClusterUpdateTest : public ClusterCommandTestFixture {
+protected:
+ const BSONObj kUpdateCmdTargeted{
+ fromjson("{update: 'coll', updates: [{q: {'_id': -1}, u: {$set: {x: 1}}}]}")};
+
+ const BSONObj kUpdateCmdScatterGather{fromjson(
+ "{update: 'coll', updates: [{q: {'_id': {$gte: -1}}, u: {$set: {x: 1}}, multi: true}]}")};
+
+ void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ cb(request);
+ return BSON("nMatched" << 1 << "nUpserted" << 0 << "nModified" << 1);
+ });
+ }
+
+ void expectReturnsSuccess(int shardIndex) override {
+ onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ return BSON("nMatched" << 1 << "nUpserted" << 0 << "nModified" << 1);
+ });
+ }
+};
+
+TEST_F(ClusterUpdateTest, NoErrors) {
+ testNoErrors(kUpdateCmdTargeted, kUpdateCmdScatterGather);
+}
+
+TEST_F(ClusterUpdateTest, AttachesAtClusterTimeForSnapshotReadConcern) {
+ testAttachesAtClusterTimeForSnapshotReadConcern(kUpdateCmdTargeted, kUpdateCmdScatterGather);
+}
+
+TEST_F(ClusterUpdateTest, SnapshotReadConcernWithAfterClusterTime) {
+ testSnapshotReadConcernWithAfterClusterTime(kUpdateCmdTargeted, kUpdateCmdScatterGather);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 9ff50ec8c59..adc98ad498d 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -45,6 +45,7 @@
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/commands/cluster_explain.h"
#include "mongo/s/grid.h"
+#include "mongo/s/transaction/transaction_router.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/s/write_ops/chunk_manager_targeter.h"
@@ -246,6 +247,10 @@ private:
batchedRequest.setAllowImplicitCreate(false);
}
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter->setAtClusterTimeToLatestTime(opCtx);
+ }
+
BatchWriteExecStats stats;
BatchedCommandResponse response;
ClusterWriter::write(opCtx, batchedRequest, &stats, &response);
diff --git a/src/mongo/s/query/cluster_aggregate_test.cpp b/src/mongo/s/query/cluster_aggregate_test.cpp
index 517faba1295..1ad895c97a6 100644
--- a/src/mongo/s/query/cluster_aggregate_test.cpp
+++ b/src/mongo/s/query/cluster_aggregate_test.cpp
@@ -42,27 +42,11 @@ protected:
const BSONObj kAggregateCmdTargeted{
fromjson("{aggregate: 'coll', pipeline: [{$match: {_id: 0}}], explain: false, "
"allowDiskUse: false, fromMongos: true, "
- "cursor: {batchSize: 10}, maxTimeMS: 100, readConcern: {level: 'snapshot'}, "
- "autocommit: false, txnNumber: NumberLong(1), startTransaction: true}")};
+ "cursor: {batchSize: 10}, maxTimeMS: 100}")};
const BSONObj kAggregateCmdScatterGather{fromjson(
"{aggregate: 'coll', pipeline: [], explain: false, allowDiskUse: false, fromMongos: true, "
- "cursor: {batchSize: 10}, readConcern: {level: 'snapshot'}, autocommit: false, txnNumber: "
- "NumberLong(1), startTransaction: true}")};
-
- BSONObj appendLogicalSessionId(BSONObj cmdObj) {
- BSONObjBuilder bob(cmdObj);
- bob.append("lsid", makeLogicalSessionIdForTest().toBSON());
- return bob.obj();
- }
-
- BSONObj aggregateCmdTargeted() {
- return appendLogicalSessionId(kAggregateCmdTargeted);
- }
-
- BSONObj aggregateCmdScatterGather() {
- return appendLogicalSessionId(kAggregateCmdScatterGather);
- }
+ "cursor: {batchSize: 10}}")};
// The index of the shard expected to receive the response is used to prevent different shards
// from returning documents with the same shard key. This is expected to be 0 for queries
@@ -91,54 +75,24 @@ protected:
};
TEST_F(ClusterAggregateTest, NoErrors) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- // Target one shard.
- runCommandSuccessful(aggregateCmdTargeted(), true);
-
- // Target all shards.
- runCommandSuccessful(aggregateCmdScatterGather(), false);
+ testNoErrors(kAggregateCmdTargeted, kAggregateCmdScatterGather);
}
-// Verify aggregate through mongos will retry on a snapshot error.
TEST_F(ClusterAggregateTest, RetryOnSnapshotError) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- // Target one shard.
- runCommandOneError(aggregateCmdTargeted(), ErrorCodes::SnapshotUnavailable, true);
- runCommandOneError(aggregateCmdTargeted(), ErrorCodes::SnapshotTooOld, true);
+ testRetryOnSnapshotError(kAggregateCmdTargeted, kAggregateCmdScatterGather);
+}
- // Target all shards
- runCommandOneError(aggregateCmdScatterGather(), ErrorCodes::SnapshotUnavailable, false);
- runCommandOneError(aggregateCmdScatterGather(), ErrorCodes::SnapshotTooOld, false);
+TEST_F(ClusterAggregateTest, MaxRetriesSnapshotErrors) {
+ testMaxRetriesSnapshotErrors(kAggregateCmdTargeted, kAggregateCmdScatterGather);
}
TEST_F(ClusterAggregateTest, AttachesAtClusterTimeForSnapshotReadConcern) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- auto containsAtClusterTime = [](const executor::RemoteCommandRequest& request) {
- ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
- };
-
- // Target one shard.
- runCommandInspectRequests(aggregateCmdTargeted(), containsAtClusterTime, true);
-
- // Target all shards.
- runCommandInspectRequests(aggregateCmdScatterGather(), containsAtClusterTime, false);
+ testAttachesAtClusterTimeForSnapshotReadConcern(kAggregateCmdTargeted,
+ kAggregateCmdScatterGather);
}
-// Verify aggregate commands will retry up to its max retry attempts on snapshot errors
-// then return the final error it receives.
-TEST_F(ClusterAggregateTest, MaxRetriesSnapshotErrors) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- // Target one shard.
- runCommandMaxErrors(aggregateCmdTargeted(), ErrorCodes::SnapshotUnavailable, true);
- runCommandMaxErrors(aggregateCmdTargeted(), ErrorCodes::SnapshotTooOld, true);
-
- // Target all shards
- runCommandMaxErrors(aggregateCmdScatterGather(), ErrorCodes::SnapshotUnavailable, false);
- runCommandMaxErrors(aggregateCmdScatterGather(), ErrorCodes::SnapshotTooOld, false);
+TEST_F(ClusterAggregateTest, SnapshotReadConcernWithAfterClusterTime) {
+ testSnapshotReadConcernWithAfterClusterTime(kAggregateCmdTargeted, kAggregateCmdScatterGather);
}
} // namespace
diff --git a/src/mongo/s/transaction/at_cluster_time_util.cpp b/src/mongo/s/transaction/at_cluster_time_util.cpp
index 0a1c66dce03..1264a785047 100644
--- a/src/mongo/s/transaction/at_cluster_time_util.cpp
+++ b/src/mongo/s/transaction/at_cluster_time_util.cpp
@@ -118,11 +118,20 @@ boost::optional<LogicalTime> computeAtClusterTimeForOneShard(OperationContext* o
uassert(ErrorCodes::ShardNotFound, str::stream() << "Could not find shard " << shardId, shard);
// Return the cached last committed opTime for the shard if there is one, otherwise return the
- // lastest cluster time from the logical clock.
+ // latest cluster time from the logical clock.
auto lastCommittedOpTime = shard->getLastCommittedOpTime();
- return lastCommittedOpTime != LogicalTime::kUninitialized
+
+ auto atClusterTime = lastCommittedOpTime != LogicalTime::kUninitialized
? lastCommittedOpTime
: LogicalClock::get(opCtx)->getClusterTime();
+
+ // If the user passed afterClusterTime, atClusterTime must be greater than or equal to it.
+ const auto afterClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAfterClusterTime();
+ if (afterClusterTime && *afterClusterTime > atClusterTime) {
+ return afterClusterTime;
+ }
+
+ return atClusterTime;
}
} // namespace at_cluster_time_util
diff --git a/src/mongo/s/transaction/compute_at_cluster_time_test.cpp b/src/mongo/s/transaction/compute_at_cluster_time_test.cpp
index 5bca0643238..61efcabe61d 100644
--- a/src/mongo/s/transaction/compute_at_cluster_time_test.cpp
+++ b/src/mongo/s/transaction/compute_at_cluster_time_test.cpp
@@ -268,6 +268,10 @@ TEST_F(AtClusterTimeTargetingTest, AfterClusterTime) {
ASSERT(computedTime);
ASSERT_GTE(*computedTime, afterClusterTime);
+ computedTime = at_cluster_time_util::computeAtClusterTimeForOneShard(operationContext(), s0);
+ ASSERT(computedTime);
+ ASSERT_GTE(*computedTime, afterClusterTime);
+
// Target all shards.
computedTime = at_cluster_time_util::computeAtClusterTime(
operationContext(), true, {s0, s1}, kNss, kEmptyQuery, kEmptyCollation);
@@ -286,6 +290,10 @@ TEST_F(AtClusterTimeTargetingTest, AfterClusterTime) {
ASSERT(computedTime);
ASSERT_GTE(*computedTime, afterClusterTime);
+ computedTime = at_cluster_time_util::computeAtClusterTimeForOneShard(operationContext(), s0);
+ ASSERT(computedTime);
+ ASSERT_GTE(*computedTime, afterClusterTime);
+
// Target all shards.
computedTime = at_cluster_time_util::computeAtClusterTime(
operationContext(), true, {s0, s1}, kNss, kEmptyQuery, kEmptyCollation);
@@ -329,6 +337,10 @@ TEST_F(AtClusterTimeTargetingTest, AfterClusterTimeLowerBound) {
ASSERT(computedTime);
ASSERT_EQ(*computedTime, afterClusterTime);
+ computedTime = at_cluster_time_util::computeAtClusterTimeForOneShard(operationContext(), s0);
+ ASSERT(computedTime);
+ ASSERT_EQ(*computedTime, afterClusterTime);
+
// Target one shard with a last committed optime less than afterClusterTime. The computed value
// should still equal afterClusterTime.
LogicalTime time1(Timestamp(1, 1));
@@ -339,6 +351,10 @@ TEST_F(AtClusterTimeTargetingTest, AfterClusterTimeLowerBound) {
operationContext(), true, {s0}, kNss, kEmptyQuery, kEmptyCollation);
ASSERT(computedTime);
ASSERT_EQ(*computedTime, afterClusterTime);
+
+ computedTime = at_cluster_time_util::computeAtClusterTimeForOneShard(operationContext(), s0);
+ ASSERT(computedTime);
+ ASSERT_EQ(*computedTime, afterClusterTime);
}
} // namespace
diff --git a/src/mongo/s/transaction/transaction_router.cpp b/src/mongo/s/transaction/transaction_router.cpp
index 9477bb643b7..823b6e8c813 100644
--- a/src/mongo/s/transaction/transaction_router.cpp
+++ b/src/mongo/s/transaction/transaction_router.cpp
@@ -244,9 +244,9 @@ TransactionRouter::Participant& TransactionRouter::getOrCreateParticipant(const
auto participant =
TransactionRouter::Participant(isFirstParticipant, _txnNumber, _readConcernArgs);
- // TODO SERVER-36557: Every command that starts a cross-shard transaction should
- // compute atClusterTime with snapshot read concern. Hence, we should be able to
- // add an invariant here to ensure that atClusterTime is not none.
+ // TODO SERVER-36589: Once mongos aborts transactions by only sending abortTransaction to shards
+ // that have been successfully contacted we should be able to add an invariant here to ensure
+ // that an atClusterTime has been chosen if the read concern level is snapshot.
if (_atClusterTime) {
participant.setAtClusterTime(*_atClusterTime);
}
@@ -299,6 +299,24 @@ void TransactionRouter::computeAtClusterTimeForOneShard(OperationContext* opCtx,
}
}
+void TransactionRouter::setAtClusterTimeToLatestTime(OperationContext* opCtx) {
+ if (_atClusterTime ||
+ _readConcernArgs.getLevel() != repl::ReadConcernLevel::kSnapshotReadConcern) {
+ return;
+ }
+
+ auto atClusterTime = LogicalClock::get(opCtx)->getClusterTime();
+
+ // If the user passed afterClusterTime, the transaction's atClusterTime must be chosen so it
+ // is greater than or equal to that value.
+ auto afterClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAfterClusterTime();
+ if (afterClusterTime && *afterClusterTime > atClusterTime) {
+ atClusterTime = *afterClusterTime;
+ }
+
+ _atClusterTime = atClusterTime;
+}
+
void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx,
TxnNumber txnNumber,
bool startTransaction) {
diff --git a/src/mongo/s/transaction/transaction_router.h b/src/mongo/s/transaction/transaction_router.h
index 22447620f55..5a8acc9e97b 100644
--- a/src/mongo/s/transaction/transaction_router.h
+++ b/src/mongo/s/transaction/transaction_router.h
@@ -126,6 +126,12 @@ public:
*/
void computeAtClusterTimeForOneShard(OperationContext* opCtx, const ShardId& shardId);
+ /**
+ * Sets the atClusterTime for the current transaction to the latest time in the router's logical
+ * clock, or to the request's afterClusterTime if that is greater.
+ */
+ void setAtClusterTimeToLatestTime(OperationContext* opCtx);
+
bool isCheckedOut();
const LogicalSessionId& getSessionId() const;
diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript
index 593d65a4d23..6aa140a7722 100644
--- a/src/mongo/s/write_ops/SConscript
+++ b/src/mongo/s/write_ops/SConscript
@@ -34,6 +34,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/s/async_requests_sender',
'$BUILD_DIR/mongo/s/commands/cluster_commands_helpers',
+ '$BUILD_DIR/mongo/s/transaction/router_session',
'batch_write_types',
],
)
diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp
index 120d7ca33f8..844730e1c40 100644
--- a/src/mongo/s/write_ops/batch_write_exec.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec.cpp
@@ -42,6 +42,7 @@
#include "mongo/s/async_requests_sender.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
+#include "mongo/s/transaction/transaction_router.h"
#include "mongo/s/write_ops/batch_write_op.h"
#include "mongo/s/write_ops/write_error_detail.h"
#include "mongo/util/log.h"
@@ -270,6 +271,18 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx,
LOG(4) << "Write results received from " << shardHost.toString() << ": "
<< redact(batchedCommandResponse.toString());
+ // If we are in a transaction, we must fail the whole batch.
+ if (TransactionRouter::get(opCtx)) {
+ // Note: this returns a bad status if any part of the batch failed.
+ auto batchStatus = batchedCommandResponse.toStatus();
+ if (!batchStatus.isOK()) {
+ batchOp.forgetTargetedBatchesOnTransactionAbortingError();
+ uassertStatusOK(batchStatus.withContext(
+ str::stream() << "Encountered error from " << shardHost.toString()
+ << " during a transaction"));
+ }
+ }
+
// Dispatch was ok, note response
batchOp.noteBatchResponse(*batch, batchedCommandResponse, &trackedErrors);
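
The transaction branch added above changes the executor's error-handling model: outside a transaction the batch executor records per-write errors and keeps dispatching, but inside a transaction any failed shard response fails the whole batch, since aborting the transaction rolls back whatever the shards already applied. A minimal sketch of that control-flow split follows; ShardResponse and processBatchResponses are hypothetical names used only for illustration, not types from the patch.

#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical, simplified view of one shard's reply to a dispatched batch.
struct ShardResponse {
    std::string shardHost;
    bool ok = true;          // false if the command or any write in it failed
    std::string errMessage;  // populated when ok == false
};

// Outside a transaction, errors are accumulated and reported per write; inside a
// transaction the first failing shard response throws and fails the whole batch.
std::vector<std::string> processBatchResponses(const std::vector<ShardResponse>& responses,
                                               bool inTransaction) {
    std::vector<std::string> accumulatedErrors;
    for (const auto& response : responses) {
        if (response.ok) {
            continue;
        }
        if (inTransaction) {
            // Mirrors forgetting the targeted batches and uasserting in the hunk above.
            throw std::runtime_error("Encountered error from " + response.shardHost +
                                     " during a transaction: " + response.errMessage);
        }
        accumulatedErrors.push_back(response.shardHost + ": " + response.errMessage);
    }
    return accumulatedErrors;
}

int main() {
    std::vector<ShardResponse> responses{{"shard0:27017", true, ""},
                                         {"shard1:27017", false, "SnapshotUnavailable"}};

    // Non-transactional path: the error is recorded and the batch continues.
    auto errors = processBatchResponses(responses, /*inTransaction=*/false);

    // Transactional path: the same responses fail the whole batch.
    try {
        processBatchResponses(responses, /*inTransaction=*/true);
    } catch (const std::runtime_error&) {
        // The caller would abort the transaction here.
    }
    return errors.size() == 1 ? 0 : 1;
}

The new BatchWriteExecTransactionTest cases further down exercise exactly this path for command errors, write errors, and write concern errors.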
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 6509ad29e6b..0ea95d1d4fe 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/sharding_router_test_fixture.h"
+#include "mongo/s/transaction/transaction_router.h"
#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -571,5 +572,167 @@ TEST_F(BatchWriteExecTest, NonRetryableErrorTxnNumber) {
future.timed_get(kFutureTimeout);
}
+
+class BatchWriteExecTransactionTest : public BatchWriteExecTest {
+public:
+ const TxnNumber kTxnNumber = 5;
+
+ void setUp() override {
+ BatchWriteExecTest::setUp();
+
+ operationContext()->setLogicalSessionId(makeLogicalSessionIdForTest());
+ operationContext()->setTxnNumber(kTxnNumber);
+ repl::ReadConcernArgs::get(operationContext()) =
+ repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern);
+
+ _scopedSession.emplace(operationContext());
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+ txnRouter->checkOut();
+ txnRouter->beginOrContinueTxn(operationContext(), kTxnNumber, true);
+ }
+
+ void tearDown() override {
+ _scopedSession.reset();
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+
+ BatchWriteExecTest::tearDown();
+ }
+
+private:
+ boost::optional<ScopedRouterSession> _scopedSession;
+};
+
+TEST_F(BatchWriteExecTransactionTest, ErrorInBatchThrows_CommandError) {
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)});
+ return insertOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ ASSERT_THROWS_CODE(BatchWriteExec::executeBatch(
+ operationContext(), nsTargeter, request, &response, &stats),
+ AssertionException,
+ ErrorCodes::UnknownError);
+ });
+
+ BatchedCommandResponse failedResponse;
+ failedResponse.setStatus({ErrorCodes::UnknownError, "dummy error"});
+
+ expectInsertsReturnError({BSON("x" << 1), BSON("x" << 2)}, failedResponse);
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(BatchWriteExecTransactionTest, ErrorInBatchThrows_WriteError) {
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)});
+ return insertOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ ASSERT_THROWS_CODE(BatchWriteExec::executeBatch(
+ operationContext(), nsTargeter, request, &response, &stats),
+ AssertionException,
+ ErrorCodes::StaleShardVersion);
+ });
+
+ // Any write error works, using SSV for convenience.
+ expectInsertsReturnStaleVersionErrors({BSON("x" << 1), BSON("x" << 2)});
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(BatchWriteExecTransactionTest, ErrorInBatchThrows_WriteErrorOrdered) {
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(true);
+ return writeCommandBase;
+ }());
+ insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)});
+ return insertOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ ASSERT_THROWS_CODE(BatchWriteExec::executeBatch(
+ operationContext(), nsTargeter, request, &response, &stats),
+ AssertionException,
+ ErrorCodes::StaleShardVersion);
+ });
+
+ // Any write error works, using SSV for convenience.
+ expectInsertsReturnStaleVersionErrors({BSON("x" << 1), BSON("x" << 2)});
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(BatchWriteExecTransactionTest, ErrorInBatchThrows_WriteConcernError) {
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)});
+ return insertOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ ASSERT_THROWS_CODE(BatchWriteExec::executeBatch(
+ operationContext(), nsTargeter, request, &response, &stats),
+ AssertionException,
+ ErrorCodes::NetworkTimeout);
+ });
+
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ const auto opMsgRequest = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj);
+ const auto insertOp = InsertOp::parse(opMsgRequest);
+ ASSERT_EQUALS(nss, insertOp.getNamespace());
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setN(1);
+
+ auto wcError = stdx::make_unique<WriteConcernErrorDetail>();
+ WriteConcernResult wcRes;
+ wcRes.err = "timeout";
+ wcRes.wTimedOut = true;
+ wcError->setStatus({ErrorCodes::NetworkTimeout, "Failed to wait for write concern"});
+ wcError->setErrInfo(BSON("wtimeout" << true));
+ response.setWriteConcernError(wcError.release());
+
+ return response.toBSON();
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index a5f0824c5f7..1812ed4713a 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -679,6 +679,10 @@ void BatchWriteOp::abortBatch(const WriteErrorDetail& error) {
dassert(isFinished());
}
+void BatchWriteOp::forgetTargetedBatchesOnTransactionAbortingError() {
+ _targeted.clear();
+}
+
bool BatchWriteOp::isFinished() {
const size_t numWriteOps = _clientRequest.sizeWriteOps();
const bool orderedOps = _clientRequest.getWriteCommandBase().getOrdered();
diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h
index 6b1856e7a3d..13418231ea0 100644
--- a/src/mongo/s/write_ops/batch_write_op.h
+++ b/src/mongo/s/write_ops/batch_write_op.h
@@ -173,6 +173,13 @@ public:
void abortBatch(const WriteErrorDetail& error);
/**
+ * Disposes of all tracked targeted batches when an error is encountered during a transaction.
+ * This is safe because any partially written data on shards will be rolled back if mongos
+ * decides to abort.
+ */
+ void forgetTargetedBatchesOnTransactionAbortingError();
+
+ /**
* Returns false if the batch write op needs more processing.
*/
bool isFinished();