summaryrefslogtreecommitdiff
path: root/src/mongo/s/commands
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2018-08-29 14:37:23 -0400
committerJack Mulrow <jack.mulrow@mongodb.com>2018-09-06 18:46:12 -0400
commit3257bb958a29a23d5101a16c49d97536e5fba056 (patch)
tree54871c65289f89b954e2e91f183aac00ea730443 /src/mongo/s/commands
parentb4f3327901f58083d376064a6d89680325b26964 (diff)
downloadmongo-3257bb958a29a23d5101a16c49d97536e5fba056.tar.gz
SERVER-36557 Compute atClusterTime for cluster batch writes and findAndModify
Diffstat (limited to 'src/mongo/s/commands')
-rw-r--r--src/mongo/s/commands/SConscript7
-rw-r--r--src/mongo/s/commands/cluster_command_test_fixture.cpp108
-rw-r--r--src/mongo/s/commands/cluster_command_test_fixture.h39
-rw-r--r--src/mongo/s/commands/cluster_delete_test.cpp76
-rw-r--r--src/mongo/s/commands/cluster_distinct_test.cpp100
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp8
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_test.cpp81
-rw-r--r--src/mongo/s/commands/cluster_insert_test.cpp76
-rw-r--r--src/mongo/s/commands/cluster_update_test.cpp76
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp5
10 files changed, 482 insertions, 94 deletions
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 3c6fe218bfd..7e49e001480 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -146,13 +146,18 @@ env.Library(
)
env.CppUnitTest(
- target="cluster_distinct_test",
+ target="cluster_commands_test",
source=[
+ "cluster_delete_test.cpp",
"cluster_distinct_test.cpp",
+ "cluster_find_and_modify_test.cpp",
+ "cluster_insert_test.cpp",
+ "cluster_update_test.cpp",
],
LIBDEPS=[
'cluster_commands',
'cluster_command_test_fixture',
+ '$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/auth/saslauth',
],
)
diff --git a/src/mongo/s/commands/cluster_command_test_fixture.cpp b/src/mongo/s/commands/cluster_command_test_fixture.cpp
index 2d3bec52e9c..e9c44a154ef 100644
--- a/src/mongo/s/commands/cluster_command_test_fixture.cpp
+++ b/src/mongo/s/commands/cluster_command_test_fixture.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/logical_clock.h"
#include "mongo/db/logical_session_cache_noop.h"
#include "mongo/db/logical_time_validator.h"
+#include "mongo/s/cluster_last_error_info.h"
#include "mongo/util/log.h"
namespace mongo {
@@ -62,6 +63,26 @@ void ClusterCommandTestFixture::setUp() {
LogicalTimeValidator::set(getServiceContext(), std::move(validator));
LogicalSessionCache::set(getServiceContext(), stdx::make_unique<LogicalSessionCacheNoop>());
+
+ loadRoutingTableWithTwoChunksAndTwoShards(kNss);
+}
+
+BSONObj ClusterCommandTestFixture::_makeCmd(BSONObj cmdObj, bool includeAfterClusterTime) {
+ BSONObjBuilder bob(cmdObj);
+ // Each command runs in a new session.
+ bob.append("lsid", makeLogicalSessionIdForTest().toBSON());
+ bob.append("txnNumber", TxnNumber(1));
+ bob.append("autocommit", false);
+ bob.append("startTransaction", true);
+
+ BSONObjBuilder readConcernBob = bob.subobjStart(repl::ReadConcernArgs::kReadConcernFieldName);
+ readConcernBob.append("level", "snapshot");
+ if (includeAfterClusterTime) {
+ readConcernBob.append("afterClusterTime", kAfterClusterTime);
+ }
+
+ readConcernBob.doneFast();
+ return bob.obj();
}
void ClusterCommandTestFixture::expectReturnsError(ErrorCodes::Error code) {
@@ -79,6 +100,15 @@ DbResponse ClusterCommandTestFixture::runCommand(BSONObj cmd) {
const auto opMsgRequest = OpMsgRequest::fromDBAndBody(kNss.db(), cmd);
+ // Ensure the clusterGLE on the Client has not yet been initialized.
+ ASSERT(!ClusterLastErrorInfo::get(client.get()));
+
+ // Initialize the cluster last error info for the client with a new request.
+ ClusterLastErrorInfo::get(client.get()) = std::make_shared<ClusterLastErrorInfo>();
+ ASSERT(ClusterLastErrorInfo::get(client.get()));
+ auto clusterGLE = ClusterLastErrorInfo::get(client.get());
+ clusterGLE->newRequest();
+
return Strategy::clientCommand(opCtx.get(), opMsgRequest.serialize());
}
@@ -142,4 +172,82 @@ void ClusterCommandTestFixture::runCommandMaxErrors(BSONObj cmd,
future.timed_get(kFutureTimeout);
}
+void ClusterCommandTestFixture::testNoErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd) {
+
+ // Target one shard.
+ runCommandSuccessful(_makeCmd(targetedCmd), true);
+
+ // Target all shards.
+ if (!scatterGatherCmd.isEmpty()) {
+ runCommandSuccessful(_makeCmd(scatterGatherCmd), false);
+ }
+}
+
+void ClusterCommandTestFixture::testRetryOnSnapshotError(BSONObj targetedCmd,
+ BSONObj scatterGatherCmd) {
+ // Target one shard.
+ runCommandOneError(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true);
+ runCommandOneError(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true);
+
+ // Target all shards
+ if (!scatterGatherCmd.isEmpty()) {
+ runCommandOneError(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false);
+ runCommandOneError(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false);
+ }
+}
+
+void ClusterCommandTestFixture::testMaxRetriesSnapshotErrors(BSONObj targetedCmd,
+ BSONObj scatterGatherCmd) {
+ // Target one shard.
+ runCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true);
+ runCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true);
+
+ // Target all shards
+ if (!scatterGatherCmd.isEmpty()) {
+ runCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false);
+ runCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false);
+ }
+}
+
+void ClusterCommandTestFixture::testAttachesAtClusterTimeForSnapshotReadConcern(
+ BSONObj targetedCmd, BSONObj scatterGatherCmd) {
+
+ auto containsAtClusterTime = [](const executor::RemoteCommandRequest& request) {
+ ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
+ };
+
+ // Target one shard.
+ runCommandInspectRequests(_makeCmd(targetedCmd), containsAtClusterTime, true);
+
+ // Target all shards.
+ if (!scatterGatherCmd.isEmpty()) {
+ runCommandInspectRequests(_makeCmd(scatterGatherCmd), containsAtClusterTime, false);
+ }
+}
+
+void ClusterCommandTestFixture::testSnapshotReadConcernWithAfterClusterTime(
+ BSONObj targetedCmd, BSONObj scatterGatherCmd) {
+
+ auto containsAtClusterTimeNoAfterClusterTime =
+ [&](const executor::RemoteCommandRequest& request) {
+ ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
+ ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo());
+
+ // The chosen atClusterTime should be greater than or equal to the request's
+ // afterClusterTime.
+ ASSERT_GTE(LogicalTime(request.cmdObj["readConcern"]["atClusterTime"].timestamp()),
+ LogicalTime(kAfterClusterTime));
+ };
+
+ // Target one shard.
+ runCommandInspectRequests(
+ _makeCmd(targetedCmd, true), containsAtClusterTimeNoAfterClusterTime, true);
+
+ // Target all shards.
+ if (!scatterGatherCmd.isEmpty()) {
+ runCommandInspectRequests(
+ _makeCmd(scatterGatherCmd, true), containsAtClusterTimeNoAfterClusterTime, false);
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_command_test_fixture.h b/src/mongo/s/commands/cluster_command_test_fixture.h
index 9ddf001d080..71cc2f2e830 100644
--- a/src/mongo/s/commands/cluster_command_test_fixture.h
+++ b/src/mongo/s/commands/cluster_command_test_fixture.h
@@ -45,6 +45,8 @@ protected:
const LogicalTime kInMemoryLogicalTime = LogicalTime(Timestamp(10, 1));
+ const Timestamp kAfterClusterTime = Timestamp(50, 2);
+
void setUp() override;
virtual void expectInspectRequest(int shardIndex, InspectionCallback cb) = 0;
@@ -62,6 +64,43 @@ protected:
void runCommandInspectRequests(BSONObj cmd, InspectionCallback cb, bool isTargeted);
void runCommandMaxErrors(BSONObj cmd, ErrorCodes::Error code, bool isTargeted);
+
+ /**
+ * Verifies that running the given commands through mongos will succeed.
+ */
+ void testNoErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd = BSONObj());
+
+ /**
+ * Verifies that the given commands will retry on a snapshot error.
+ */
+ void testRetryOnSnapshotError(BSONObj targetedCmd, BSONObj scatterGatherCmd = BSONObj());
+
+ /**
+ * Verifies that the given commands will retry up to the max retry attempts on snapshot
+ * errors then return the final errors they receive.
+ */
+ void testMaxRetriesSnapshotErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd = BSONObj());
+
+ /**
+ * Verifies that atClusterTime is attached to the given commands.
+ */
+ void testAttachesAtClusterTimeForSnapshotReadConcern(BSONObj targetedCmd,
+ BSONObj scatterGatherCmd = BSONObj());
+
+ /**
+ * Verifies that the chosen atClusterTime is greater than or equal to each command's
+ * afterClusterTime.
+ */
+ void testSnapshotReadConcernWithAfterClusterTime(BSONObj targetedCmd,
+ BSONObj scatterGatherCmd = BSONObj());
+
+private:
+ /**
+ * Makes a new command object from the one given by apppending read concern
+ * snapshot and the appropriate transaction options. If includeAfterClusterTime
+ * is true, also appends afterClusterTime to the read concern.
+ */
+ BSONObj _makeCmd(BSONObj cmdObj, bool includeAfterClusterTime = false);
};
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_delete_test.cpp b/src/mongo/s/commands/cluster_delete_test.cpp
new file mode 100644
index 00000000000..275e33254ec
--- /dev/null
+++ b/src/mongo/s/commands/cluster_delete_test.cpp
@@ -0,0 +1,76 @@
+/**
+* Copyright (C) 2018 MongoDB Inc.
+*
+* This program is free software: you can redistribute it and/or modify
+* it under the terms of the GNU Affero General Public License, version 3,
+* as published by the Free Software Foundation.
+*
+* This program is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+* GNU Affero General Public License for more details.
+*
+* You should have received a copy of the GNU Affero General Public License
+* along with this program. If not, see <http://www.gnu.org/licenses/>.
+*
+* As a special exception, the copyright holders give permission to link the
+* code of portions of this program with the OpenSSL library under certain
+* conditions as described in each individual source file and distribute
+* linked combinations including the program with the OpenSSL library. You
+* must comply with the GNU Affero General Public License in all respects
+* for all of the code used other than as permitted herein. If you modify
+* file(s) with this exception, you may extend this exception to your
+* version of the file(s), but you are not obligated to do so. If you do not
+* wish to do so, delete this exception statement from your version. If you
+* delete this exception statement from all source files in the program,
+* then also delete it in the license file.
+*/
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/commands/cluster_command_test_fixture.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+class ClusterDeleteTest : public ClusterCommandTestFixture {
+protected:
+ const BSONObj kDeleteCmdTargeted{
+ fromjson("{delete: 'coll', deletes: [{q: {'_id': -1}, limit: 0}]}")};
+
+ const BSONObj kDeleteCmdScatterGather{
+ fromjson("{delete: 'coll', deletes: [{q: {'_id': {$gte: -1}}, limit: 0}]}")};
+
+ void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ cb(request);
+ return BSON("n" << 1);
+ });
+ }
+
+ void expectReturnsSuccess(int shardIndex) override {
+ onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ return BSON("n" << 1);
+ });
+ }
+};
+
+TEST_F(ClusterDeleteTest, NoErrors) {
+ testNoErrors(kDeleteCmdTargeted, kDeleteCmdScatterGather);
+}
+
+TEST_F(ClusterDeleteTest, AttachesAtClusterTimeForSnapshotReadConcern) {
+ testAttachesAtClusterTimeForSnapshotReadConcern(kDeleteCmdTargeted, kDeleteCmdScatterGather);
+}
+
+TEST_F(ClusterDeleteTest, SnapshotReadConcernWithAfterClusterTime) {
+ testSnapshotReadConcernWithAfterClusterTime(kDeleteCmdTargeted, kDeleteCmdScatterGather);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_distinct_test.cpp b/src/mongo/s/commands/cluster_distinct_test.cpp
index 16c2468e3a1..2224d0f3adb 100644
--- a/src/mongo/s/commands/cluster_distinct_test.cpp
+++ b/src/mongo/s/commands/cluster_distinct_test.cpp
@@ -38,39 +38,10 @@ namespace {
class ClusterDistinctTest : public ClusterCommandTestFixture {
protected:
- const Timestamp kAfterClusterTime = Timestamp(50, 2);
-
const BSONObj kDistinctCmdTargeted{
- fromjson("{distinct: 'coll', key: 'x', query: {'_id': {$lt: -1}}, autocommit: false, "
- "txnNumber: NumberLong(1), startTransaction: true}")};
-
- const BSONObj kDistinctCmdScatterGather{
- fromjson("{distinct: 'coll', key: '_id', autocommit: false, txnNumber: NumberLong(1), "
- "startTransaction: true}")};
-
- BSONObj appendLogicalSessionIdAndSnapshotReadConcern(BSONObj cmdObj,
- bool includeAfterClusterTime) {
- BSONObjBuilder bob(cmdObj);
- bob.append("lsid", makeLogicalSessionIdForTest().toBSON());
- BSONObjBuilder readConcernBob =
- bob.subobjStart(repl::ReadConcernArgs::kReadConcernFieldName);
- readConcernBob.append("level", "snapshot");
- if (includeAfterClusterTime) {
- readConcernBob.append("afterClusterTime", kAfterClusterTime);
- }
- readConcernBob.doneFast();
- return bob.obj();
- }
-
- BSONObj distinctCmdTargeted(bool includeAfterClusterTime = false) {
- return appendLogicalSessionIdAndSnapshotReadConcern(kDistinctCmdTargeted,
- includeAfterClusterTime);
- }
+ fromjson("{distinct: 'coll', key: 'x', query: {'_id': {$lt: -1}}}")};
- BSONObj distinctCmdScatterGather(bool includeAfterClusterTime = false) {
- return appendLogicalSessionIdAndSnapshotReadConcern(kDistinctCmdScatterGather,
- includeAfterClusterTime);
- }
+ const BSONObj kDistinctCmdScatterGather{fromjson("{distinct: 'coll', key: '_id'}")};
void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
@@ -89,79 +60,24 @@ protected:
};
TEST_F(ClusterDistinctTest, NoErrors) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- // Target one shard.
- runCommandSuccessful(distinctCmdTargeted(), true);
-
- // Target all shards.
- runCommandSuccessful(distinctCmdScatterGather(), false);
+ testNoErrors(kDistinctCmdTargeted, kDistinctCmdScatterGather);
}
-// Verify distinct through mongos will retry on a snapshot error.
TEST_F(ClusterDistinctTest, RetryOnSnapshotError) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- // Target one shard.
- runCommandOneError(distinctCmdTargeted(), ErrorCodes::SnapshotUnavailable, true);
- runCommandOneError(distinctCmdTargeted(), ErrorCodes::SnapshotTooOld, true);
-
- // Target all shards
- runCommandOneError(distinctCmdScatterGather(), ErrorCodes::SnapshotUnavailable, false);
- runCommandOneError(distinctCmdScatterGather(), ErrorCodes::SnapshotTooOld, false);
+ testRetryOnSnapshotError(kDistinctCmdTargeted, kDistinctCmdScatterGather);
}
-// Verify distinct commands will retry up to its max retry attempts on snapshot errors
-// then return the final error it receives.
TEST_F(ClusterDistinctTest, MaxRetriesSnapshotErrors) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- // Target one shard.
- runCommandMaxErrors(distinctCmdTargeted(), ErrorCodes::SnapshotUnavailable, true);
- runCommandMaxErrors(distinctCmdTargeted(), ErrorCodes::SnapshotTooOld, true);
-
- // Target all shards
- runCommandMaxErrors(distinctCmdScatterGather(), ErrorCodes::SnapshotUnavailable, false);
- runCommandMaxErrors(distinctCmdScatterGather(), ErrorCodes::SnapshotTooOld, false);
+ testMaxRetriesSnapshotErrors(kDistinctCmdTargeted, kDistinctCmdScatterGather);
}
TEST_F(ClusterDistinctTest, AttachesAtClusterTimeForSnapshotReadConcern) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- auto containsAtClusterTime = [](const executor::RemoteCommandRequest& request) {
- ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
- };
-
- // Target one shard.
- runCommandInspectRequests(distinctCmdTargeted(), containsAtClusterTime, true);
-
- // Target all shards.
- runCommandInspectRequests(distinctCmdScatterGather(), containsAtClusterTime, false);
+ testAttachesAtClusterTimeForSnapshotReadConcern(kDistinctCmdTargeted,
+ kDistinctCmdScatterGather);
}
TEST_F(ClusterDistinctTest, SnapshotReadConcernWithAfterClusterTime) {
- loadRoutingTableWithTwoChunksAndTwoShards(kNss);
-
- auto containsAtClusterTimeNoAfterClusterTime =
- [&](const executor::RemoteCommandRequest& request) {
- ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo());
- ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo());
-
- // The chosen atClusterTime should be greater than or equal to the request's
- // afterClusterTime.
- ASSERT_GTE(LogicalTime(request.cmdObj["readConcern"]["atClusterTime"].timestamp()),
- LogicalTime(kAfterClusterTime));
- };
-
- // Target one shard.
- runCommandInspectRequests(distinctCmdTargeted(true /*includeAfterClusterTime*/),
- containsAtClusterTimeNoAfterClusterTime,
- true);
-
- // Target all shards.
- runCommandInspectRequests(distinctCmdScatterGather(true /*includeAfterClusterTime*/),
- containsAtClusterTimeNoAfterClusterTime,
- false);
+ testSnapshotReadConcernWithAfterClusterTime(kDistinctCmdTargeted, kDistinctCmdScatterGather);
}
} // namespace
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 6f61f915358..55ce1e94578 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -45,6 +45,7 @@
#include "mongo/s/commands/strategy.h"
#include "mongo/s/grid.h"
#include "mongo/s/stale_exception.h"
+#include "mongo/s/transaction/transaction_router.h"
#include "mongo/s/write_ops/cluster_write.h"
#include "mongo/util/timer.h"
@@ -202,6 +203,10 @@ private:
const NamespaceString& nss,
const BSONObj& cmdObj,
BSONObjBuilder* result) {
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter->setAtClusterTimeToLatestTime(opCtx);
+ }
+
const auto response = [&] {
std::vector<AsyncRequestsSender::Request> requests;
requests.emplace_back(
@@ -226,7 +231,8 @@ private:
uassertStatusOK(response.status);
const auto responseStatus = getStatusFromCommandResult(response.data);
- if (ErrorCodes::isNeedRetargettingError(responseStatus.code())) {
+ if (ErrorCodes::isNeedRetargettingError(responseStatus.code()) ||
+ ErrorCodes::isSnapshotError(responseStatus.code())) {
// Command code traps this exception and re-runs
uassertStatusOK(responseStatus.withContext("findAndModify"));
}
diff --git a/src/mongo/s/commands/cluster_find_and_modify_test.cpp b/src/mongo/s/commands/cluster_find_and_modify_test.cpp
new file mode 100644
index 00000000000..f6ff28774ac
--- /dev/null
+++ b/src/mongo/s/commands/cluster_find_and_modify_test.cpp
@@ -0,0 +1,81 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/commands/cluster_command_test_fixture.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+class ClusterFindAndModifyTest : public ClusterCommandTestFixture {
+protected:
+ const BSONObj kFindAndModifyCmdTargeted{
+ fromjson("{findAndModify: 'coll', query: {'_id': -1}, update: {$set: {x: 1}}}")};
+
+ void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ cb(request);
+ return BSON("_id" << -1);
+ });
+ }
+
+ void expectReturnsSuccess(int shardIndex) override {
+ onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ return BSON("_id" << -1);
+ });
+ }
+};
+
+TEST_F(ClusterFindAndModifyTest, NoErrors) {
+ testNoErrors(kFindAndModifyCmdTargeted);
+}
+
+TEST_F(ClusterFindAndModifyTest, RetryOnSnapshotError) {
+ testRetryOnSnapshotError(kFindAndModifyCmdTargeted);
+}
+
+TEST_F(ClusterFindAndModifyTest, MaxRetriesSnapshotErrors) {
+ testMaxRetriesSnapshotErrors(kFindAndModifyCmdTargeted);
+}
+
+TEST_F(ClusterFindAndModifyTest, AttachesAtClusterTimeForSnapshotReadConcern) {
+ testAttachesAtClusterTimeForSnapshotReadConcern(kFindAndModifyCmdTargeted);
+}
+
+TEST_F(ClusterFindAndModifyTest, SnapshotReadConcernWithAfterClusterTime) {
+ testSnapshotReadConcernWithAfterClusterTime(kFindAndModifyCmdTargeted);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_insert_test.cpp b/src/mongo/s/commands/cluster_insert_test.cpp
new file mode 100644
index 00000000000..652b9bada42
--- /dev/null
+++ b/src/mongo/s/commands/cluster_insert_test.cpp
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/commands.h"
+#include "mongo/s/commands/cluster_command_test_fixture.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+class ClusterInsertTest : public ClusterCommandTestFixture {
+protected:
+ const BSONObj kInsertCmdTargeted{fromjson("{insert: 'coll', documents: [{'_id': -1}]}")};
+
+ const BSONObj kInsertCmdScatterGather{
+ fromjson("{insert: 'coll', documents: [{'_id': -1}, {'_id': 1}], ordered: false}")};
+
+ void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ cb(request);
+ return BSON("nInserted" << 1);
+ });
+ }
+
+ void expectReturnsSuccess(int shardIndex) override {
+ onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ return BSON("nInserted" << 1);
+ });
+ }
+};
+
+TEST_F(ClusterInsertTest, NoErrors) {
+ testNoErrors(kInsertCmdTargeted, kInsertCmdScatterGather);
+}
+
+TEST_F(ClusterInsertTest, AttachesAtClusterTimeForSnapshotReadConcern) {
+ testAttachesAtClusterTimeForSnapshotReadConcern(kInsertCmdTargeted, kInsertCmdScatterGather);
+}
+
+TEST_F(ClusterInsertTest, SnapshotReadConcernWithAfterClusterTime) {
+ testSnapshotReadConcernWithAfterClusterTime(kInsertCmdTargeted, kInsertCmdScatterGather);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_update_test.cpp b/src/mongo/s/commands/cluster_update_test.cpp
new file mode 100644
index 00000000000..5e5f07df41a
--- /dev/null
+++ b/src/mongo/s/commands/cluster_update_test.cpp
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/s/commands/cluster_command_test_fixture.h"
+#include "mongo/util/log.h"
+
+namespace mongo {
+namespace {
+
+class ClusterUpdateTest : public ClusterCommandTestFixture {
+protected:
+ const BSONObj kUpdateCmdTargeted{
+ fromjson("{update: 'coll', updates: [{q: {'_id': -1}, u: {$set: {x: 1}}}]}")};
+
+ const BSONObj kUpdateCmdScatterGather{fromjson(
+ "{update: 'coll', updates: [{q: {'_id': {$gte: -1}}, u: {$set: {x: 1}}, multi: true}]}")};
+
+ void expectInspectRequest(int shardIndex, InspectionCallback cb) override {
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ cb(request);
+ return BSON("nMatched" << 1 << "nUpserted" << 0 << "nModified" << 1);
+ });
+ }
+
+ void expectReturnsSuccess(int shardIndex) override {
+ onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) {
+ ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData());
+ return BSON("nMatched" << 1 << "nUpserted" << 0 << "nModified" << 1);
+ });
+ }
+};
+
+TEST_F(ClusterUpdateTest, NoErrors) {
+ testNoErrors(kUpdateCmdTargeted, kUpdateCmdScatterGather);
+}
+
+TEST_F(ClusterUpdateTest, AttachesAtClusterTimeForSnapshotReadConcern) {
+ testAttachesAtClusterTimeForSnapshotReadConcern(kUpdateCmdTargeted, kUpdateCmdScatterGather);
+}
+
+TEST_F(ClusterUpdateTest, SnapshotReadConcernWithAfterClusterTime) {
+ testSnapshotReadConcernWithAfterClusterTime(kUpdateCmdTargeted, kUpdateCmdScatterGather);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 9ff50ec8c59..adc98ad498d 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -45,6 +45,7 @@
#include "mongo/s/cluster_last_error_info.h"
#include "mongo/s/commands/cluster_explain.h"
#include "mongo/s/grid.h"
+#include "mongo/s/transaction/transaction_router.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/s/write_ops/chunk_manager_targeter.h"
@@ -246,6 +247,10 @@ private:
batchedRequest.setAllowImplicitCreate(false);
}
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter->setAtClusterTimeToLatestTime(opCtx);
+ }
+
BatchWriteExecStats stats;
BatchedCommandResponse response;
ClusterWriter::write(opCtx, batchedRequest, &stats, &response);