diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-08-29 14:37:23 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-09-06 18:46:12 -0400 |
commit | 3257bb958a29a23d5101a16c49d97536e5fba056 (patch) | |
tree | 54871c65289f89b954e2e91f183aac00ea730443 /src | |
parent | b4f3327901f58083d376064a6d89680325b26964 (diff) | |
download | mongo-3257bb958a29a23d5101a16c49d97536e5fba056.tar.gz |
SERVER-36557 Compute atClusterTime for cluster batch writes and findAndModify
Diffstat (limited to 'src')
20 files changed, 735 insertions, 156 deletions
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 3c6fe218bfd..7e49e001480 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -146,13 +146,18 @@ env.Library( ) env.CppUnitTest( - target="cluster_distinct_test", + target="cluster_commands_test", source=[ + "cluster_delete_test.cpp", "cluster_distinct_test.cpp", + "cluster_find_and_modify_test.cpp", + "cluster_insert_test.cpp", + "cluster_update_test.cpp", ], LIBDEPS=[ 'cluster_commands', 'cluster_command_test_fixture', + '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/auth/saslauth', ], ) diff --git a/src/mongo/s/commands/cluster_command_test_fixture.cpp b/src/mongo/s/commands/cluster_command_test_fixture.cpp index 2d3bec52e9c..e9c44a154ef 100644 --- a/src/mongo/s/commands/cluster_command_test_fixture.cpp +++ b/src/mongo/s/commands/cluster_command_test_fixture.cpp @@ -39,6 +39,7 @@ #include "mongo/db/logical_clock.h" #include "mongo/db/logical_session_cache_noop.h" #include "mongo/db/logical_time_validator.h" +#include "mongo/s/cluster_last_error_info.h" #include "mongo/util/log.h" namespace mongo { @@ -62,6 +63,26 @@ void ClusterCommandTestFixture::setUp() { LogicalTimeValidator::set(getServiceContext(), std::move(validator)); LogicalSessionCache::set(getServiceContext(), stdx::make_unique<LogicalSessionCacheNoop>()); + + loadRoutingTableWithTwoChunksAndTwoShards(kNss); +} + +BSONObj ClusterCommandTestFixture::_makeCmd(BSONObj cmdObj, bool includeAfterClusterTime) { + BSONObjBuilder bob(cmdObj); + // Each command runs in a new session. + bob.append("lsid", makeLogicalSessionIdForTest().toBSON()); + bob.append("txnNumber", TxnNumber(1)); + bob.append("autocommit", false); + bob.append("startTransaction", true); + + BSONObjBuilder readConcernBob = bob.subobjStart(repl::ReadConcernArgs::kReadConcernFieldName); + readConcernBob.append("level", "snapshot"); + if (includeAfterClusterTime) { + readConcernBob.append("afterClusterTime", kAfterClusterTime); + } + + readConcernBob.doneFast(); + return bob.obj(); } void ClusterCommandTestFixture::expectReturnsError(ErrorCodes::Error code) { @@ -79,6 +100,15 @@ DbResponse ClusterCommandTestFixture::runCommand(BSONObj cmd) { const auto opMsgRequest = OpMsgRequest::fromDBAndBody(kNss.db(), cmd); + // Ensure the clusterGLE on the Client has not yet been initialized. + ASSERT(!ClusterLastErrorInfo::get(client.get())); + + // Initialize the cluster last error info for the client with a new request. + ClusterLastErrorInfo::get(client.get()) = std::make_shared<ClusterLastErrorInfo>(); + ASSERT(ClusterLastErrorInfo::get(client.get())); + auto clusterGLE = ClusterLastErrorInfo::get(client.get()); + clusterGLE->newRequest(); + return Strategy::clientCommand(opCtx.get(), opMsgRequest.serialize()); } @@ -142,4 +172,82 @@ void ClusterCommandTestFixture::runCommandMaxErrors(BSONObj cmd, future.timed_get(kFutureTimeout); } +void ClusterCommandTestFixture::testNoErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd) { + + // Target one shard. + runCommandSuccessful(_makeCmd(targetedCmd), true); + + // Target all shards. + if (!scatterGatherCmd.isEmpty()) { + runCommandSuccessful(_makeCmd(scatterGatherCmd), false); + } +} + +void ClusterCommandTestFixture::testRetryOnSnapshotError(BSONObj targetedCmd, + BSONObj scatterGatherCmd) { + // Target one shard. + runCommandOneError(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true); + runCommandOneError(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true); + + // Target all shards + if (!scatterGatherCmd.isEmpty()) { + runCommandOneError(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false); + runCommandOneError(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false); + } +} + +void ClusterCommandTestFixture::testMaxRetriesSnapshotErrors(BSONObj targetedCmd, + BSONObj scatterGatherCmd) { + // Target one shard. + runCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true); + runCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true); + + // Target all shards + if (!scatterGatherCmd.isEmpty()) { + runCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false); + runCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false); + } +} + +void ClusterCommandTestFixture::testAttachesAtClusterTimeForSnapshotReadConcern( + BSONObj targetedCmd, BSONObj scatterGatherCmd) { + + auto containsAtClusterTime = [](const executor::RemoteCommandRequest& request) { + ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo()); + }; + + // Target one shard. + runCommandInspectRequests(_makeCmd(targetedCmd), containsAtClusterTime, true); + + // Target all shards. + if (!scatterGatherCmd.isEmpty()) { + runCommandInspectRequests(_makeCmd(scatterGatherCmd), containsAtClusterTime, false); + } +} + +void ClusterCommandTestFixture::testSnapshotReadConcernWithAfterClusterTime( + BSONObj targetedCmd, BSONObj scatterGatherCmd) { + + auto containsAtClusterTimeNoAfterClusterTime = + [&](const executor::RemoteCommandRequest& request) { + ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo()); + ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo()); + + // The chosen atClusterTime should be greater than or equal to the request's + // afterClusterTime. + ASSERT_GTE(LogicalTime(request.cmdObj["readConcern"]["atClusterTime"].timestamp()), + LogicalTime(kAfterClusterTime)); + }; + + // Target one shard. + runCommandInspectRequests( + _makeCmd(targetedCmd, true), containsAtClusterTimeNoAfterClusterTime, true); + + // Target all shards. + if (!scatterGatherCmd.isEmpty()) { + runCommandInspectRequests( + _makeCmd(scatterGatherCmd, true), containsAtClusterTimeNoAfterClusterTime, false); + } +} + } // namespace mongo diff --git a/src/mongo/s/commands/cluster_command_test_fixture.h b/src/mongo/s/commands/cluster_command_test_fixture.h index 9ddf001d080..71cc2f2e830 100644 --- a/src/mongo/s/commands/cluster_command_test_fixture.h +++ b/src/mongo/s/commands/cluster_command_test_fixture.h @@ -45,6 +45,8 @@ protected: const LogicalTime kInMemoryLogicalTime = LogicalTime(Timestamp(10, 1)); + const Timestamp kAfterClusterTime = Timestamp(50, 2); + void setUp() override; virtual void expectInspectRequest(int shardIndex, InspectionCallback cb) = 0; @@ -62,6 +64,43 @@ protected: void runCommandInspectRequests(BSONObj cmd, InspectionCallback cb, bool isTargeted); void runCommandMaxErrors(BSONObj cmd, ErrorCodes::Error code, bool isTargeted); + + /** + * Verifies that running the given commands through mongos will succeed. + */ + void testNoErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd = BSONObj()); + + /** + * Verifies that the given commands will retry on a snapshot error. + */ + void testRetryOnSnapshotError(BSONObj targetedCmd, BSONObj scatterGatherCmd = BSONObj()); + + /** + * Verifies that the given commands will retry up to the max retry attempts on snapshot + * errors then return the final errors they receive. + */ + void testMaxRetriesSnapshotErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd = BSONObj()); + + /** + * Verifies that atClusterTime is attached to the given commands. + */ + void testAttachesAtClusterTimeForSnapshotReadConcern(BSONObj targetedCmd, + BSONObj scatterGatherCmd = BSONObj()); + + /** + * Verifies that the chosen atClusterTime is greater than or equal to each command's + * afterClusterTime. + */ + void testSnapshotReadConcernWithAfterClusterTime(BSONObj targetedCmd, + BSONObj scatterGatherCmd = BSONObj()); + +private: + /** + * Makes a new command object from the one given by apppending read concern + * snapshot and the appropriate transaction options. If includeAfterClusterTime + * is true, also appends afterClusterTime to the read concern. + */ + BSONObj _makeCmd(BSONObj cmdObj, bool includeAfterClusterTime = false); }; } // namespace mongo diff --git a/src/mongo/s/commands/cluster_delete_test.cpp b/src/mongo/s/commands/cluster_delete_test.cpp new file mode 100644 index 00000000000..275e33254ec --- /dev/null +++ b/src/mongo/s/commands/cluster_delete_test.cpp @@ -0,0 +1,76 @@ +/** +* Copyright (C) 2018 MongoDB Inc. +* +* This program is free software: you can redistribute it and/or modify +* it under the terms of the GNU Affero General Public License, version 3, +* as published by the Free Software Foundation. +* +* This program is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +* GNU Affero General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with this program. If not, see <http://www.gnu.org/licenses/>. +* +* As a special exception, the copyright holders give permission to link the +* code of portions of this program with the OpenSSL library under certain +* conditions as described in each individual source file and distribute +* linked combinations including the program with the OpenSSL library. You +* must comply with the GNU Affero General Public License in all respects +* for all of the code used other than as permitted herein. If you modify +* file(s) with this exception, you may extend this exception to your +* version of the file(s), but you are not obligated to do so. If you do not +* wish to do so, delete this exception statement from your version. If you +* delete this exception statement from all source files in the program, +* then also delete it in the license file. +*/ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/s/commands/cluster_command_test_fixture.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +class ClusterDeleteTest : public ClusterCommandTestFixture { +protected: + const BSONObj kDeleteCmdTargeted{ + fromjson("{delete: 'coll', deletes: [{q: {'_id': -1}, limit: 0}]}")}; + + const BSONObj kDeleteCmdScatterGather{ + fromjson("{delete: 'coll', deletes: [{q: {'_id': {$gte: -1}}, limit: 0}]}")}; + + void expectInspectRequest(int shardIndex, InspectionCallback cb) override { + onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData()); + cb(request); + return BSON("n" << 1); + }); + } + + void expectReturnsSuccess(int shardIndex) override { + onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData()); + return BSON("n" << 1); + }); + } +}; + +TEST_F(ClusterDeleteTest, NoErrors) { + testNoErrors(kDeleteCmdTargeted, kDeleteCmdScatterGather); +} + +TEST_F(ClusterDeleteTest, AttachesAtClusterTimeForSnapshotReadConcern) { + testAttachesAtClusterTimeForSnapshotReadConcern(kDeleteCmdTargeted, kDeleteCmdScatterGather); +} + +TEST_F(ClusterDeleteTest, SnapshotReadConcernWithAfterClusterTime) { + testSnapshotReadConcernWithAfterClusterTime(kDeleteCmdTargeted, kDeleteCmdScatterGather); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/commands/cluster_distinct_test.cpp b/src/mongo/s/commands/cluster_distinct_test.cpp index 16c2468e3a1..2224d0f3adb 100644 --- a/src/mongo/s/commands/cluster_distinct_test.cpp +++ b/src/mongo/s/commands/cluster_distinct_test.cpp @@ -38,39 +38,10 @@ namespace { class ClusterDistinctTest : public ClusterCommandTestFixture { protected: - const Timestamp kAfterClusterTime = Timestamp(50, 2); - const BSONObj kDistinctCmdTargeted{ - fromjson("{distinct: 'coll', key: 'x', query: {'_id': {$lt: -1}}, autocommit: false, " - "txnNumber: NumberLong(1), startTransaction: true}")}; - - const BSONObj kDistinctCmdScatterGather{ - fromjson("{distinct: 'coll', key: '_id', autocommit: false, txnNumber: NumberLong(1), " - "startTransaction: true}")}; - - BSONObj appendLogicalSessionIdAndSnapshotReadConcern(BSONObj cmdObj, - bool includeAfterClusterTime) { - BSONObjBuilder bob(cmdObj); - bob.append("lsid", makeLogicalSessionIdForTest().toBSON()); - BSONObjBuilder readConcernBob = - bob.subobjStart(repl::ReadConcernArgs::kReadConcernFieldName); - readConcernBob.append("level", "snapshot"); - if (includeAfterClusterTime) { - readConcernBob.append("afterClusterTime", kAfterClusterTime); - } - readConcernBob.doneFast(); - return bob.obj(); - } - - BSONObj distinctCmdTargeted(bool includeAfterClusterTime = false) { - return appendLogicalSessionIdAndSnapshotReadConcern(kDistinctCmdTargeted, - includeAfterClusterTime); - } + fromjson("{distinct: 'coll', key: 'x', query: {'_id': {$lt: -1}}}")}; - BSONObj distinctCmdScatterGather(bool includeAfterClusterTime = false) { - return appendLogicalSessionIdAndSnapshotReadConcern(kDistinctCmdScatterGather, - includeAfterClusterTime); - } + const BSONObj kDistinctCmdScatterGather{fromjson("{distinct: 'coll', key: '_id'}")}; void expectInspectRequest(int shardIndex, InspectionCallback cb) override { onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { @@ -89,79 +60,24 @@ protected: }; TEST_F(ClusterDistinctTest, NoErrors) { - loadRoutingTableWithTwoChunksAndTwoShards(kNss); - - // Target one shard. - runCommandSuccessful(distinctCmdTargeted(), true); - - // Target all shards. - runCommandSuccessful(distinctCmdScatterGather(), false); + testNoErrors(kDistinctCmdTargeted, kDistinctCmdScatterGather); } -// Verify distinct through mongos will retry on a snapshot error. TEST_F(ClusterDistinctTest, RetryOnSnapshotError) { - loadRoutingTableWithTwoChunksAndTwoShards(kNss); - - // Target one shard. - runCommandOneError(distinctCmdTargeted(), ErrorCodes::SnapshotUnavailable, true); - runCommandOneError(distinctCmdTargeted(), ErrorCodes::SnapshotTooOld, true); - - // Target all shards - runCommandOneError(distinctCmdScatterGather(), ErrorCodes::SnapshotUnavailable, false); - runCommandOneError(distinctCmdScatterGather(), ErrorCodes::SnapshotTooOld, false); + testRetryOnSnapshotError(kDistinctCmdTargeted, kDistinctCmdScatterGather); } -// Verify distinct commands will retry up to its max retry attempts on snapshot errors -// then return the final error it receives. TEST_F(ClusterDistinctTest, MaxRetriesSnapshotErrors) { - loadRoutingTableWithTwoChunksAndTwoShards(kNss); - - // Target one shard. - runCommandMaxErrors(distinctCmdTargeted(), ErrorCodes::SnapshotUnavailable, true); - runCommandMaxErrors(distinctCmdTargeted(), ErrorCodes::SnapshotTooOld, true); - - // Target all shards - runCommandMaxErrors(distinctCmdScatterGather(), ErrorCodes::SnapshotUnavailable, false); - runCommandMaxErrors(distinctCmdScatterGather(), ErrorCodes::SnapshotTooOld, false); + testMaxRetriesSnapshotErrors(kDistinctCmdTargeted, kDistinctCmdScatterGather); } TEST_F(ClusterDistinctTest, AttachesAtClusterTimeForSnapshotReadConcern) { - loadRoutingTableWithTwoChunksAndTwoShards(kNss); - - auto containsAtClusterTime = [](const executor::RemoteCommandRequest& request) { - ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo()); - }; - - // Target one shard. - runCommandInspectRequests(distinctCmdTargeted(), containsAtClusterTime, true); - - // Target all shards. - runCommandInspectRequests(distinctCmdScatterGather(), containsAtClusterTime, false); + testAttachesAtClusterTimeForSnapshotReadConcern(kDistinctCmdTargeted, + kDistinctCmdScatterGather); } TEST_F(ClusterDistinctTest, SnapshotReadConcernWithAfterClusterTime) { - loadRoutingTableWithTwoChunksAndTwoShards(kNss); - - auto containsAtClusterTimeNoAfterClusterTime = - [&](const executor::RemoteCommandRequest& request) { - ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo()); - ASSERT(request.cmdObj["readConcern"]["afterClusterTime"].eoo()); - - // The chosen atClusterTime should be greater than or equal to the request's - // afterClusterTime. - ASSERT_GTE(LogicalTime(request.cmdObj["readConcern"]["atClusterTime"].timestamp()), - LogicalTime(kAfterClusterTime)); - }; - - // Target one shard. - runCommandInspectRequests(distinctCmdTargeted(true /*includeAfterClusterTime*/), - containsAtClusterTimeNoAfterClusterTime, - true); - - // Target all shards. - runCommandInspectRequests(distinctCmdScatterGather(true /*includeAfterClusterTime*/), - containsAtClusterTimeNoAfterClusterTime, - false); + testSnapshotReadConcernWithAfterClusterTime(kDistinctCmdTargeted, kDistinctCmdScatterGather); } } // namespace diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 6f61f915358..55ce1e94578 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -45,6 +45,7 @@ #include "mongo/s/commands/strategy.h" #include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" +#include "mongo/s/transaction/transaction_router.h" #include "mongo/s/write_ops/cluster_write.h" #include "mongo/util/timer.h" @@ -202,6 +203,10 @@ private: const NamespaceString& nss, const BSONObj& cmdObj, BSONObjBuilder* result) { + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter->setAtClusterTimeToLatestTime(opCtx); + } + const auto response = [&] { std::vector<AsyncRequestsSender::Request> requests; requests.emplace_back( @@ -226,7 +231,8 @@ private: uassertStatusOK(response.status); const auto responseStatus = getStatusFromCommandResult(response.data); - if (ErrorCodes::isNeedRetargettingError(responseStatus.code())) { + if (ErrorCodes::isNeedRetargettingError(responseStatus.code()) || + ErrorCodes::isSnapshotError(responseStatus.code())) { // Command code traps this exception and re-runs uassertStatusOK(responseStatus.withContext("findAndModify")); } diff --git a/src/mongo/s/commands/cluster_find_and_modify_test.cpp b/src/mongo/s/commands/cluster_find_and_modify_test.cpp new file mode 100644 index 00000000000..f6ff28774ac --- /dev/null +++ b/src/mongo/s/commands/cluster_find_and_modify_test.cpp @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/s/commands/cluster_command_test_fixture.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +class ClusterFindAndModifyTest : public ClusterCommandTestFixture { +protected: + const BSONObj kFindAndModifyCmdTargeted{ + fromjson("{findAndModify: 'coll', query: {'_id': -1}, update: {$set: {x: 1}}}")}; + + void expectInspectRequest(int shardIndex, InspectionCallback cb) override { + onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData()); + cb(request); + return BSON("_id" << -1); + }); + } + + void expectReturnsSuccess(int shardIndex) override { + onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData()); + return BSON("_id" << -1); + }); + } +}; + +TEST_F(ClusterFindAndModifyTest, NoErrors) { + testNoErrors(kFindAndModifyCmdTargeted); +} + +TEST_F(ClusterFindAndModifyTest, RetryOnSnapshotError) { + testRetryOnSnapshotError(kFindAndModifyCmdTargeted); +} + +TEST_F(ClusterFindAndModifyTest, MaxRetriesSnapshotErrors) { + testMaxRetriesSnapshotErrors(kFindAndModifyCmdTargeted); +} + +TEST_F(ClusterFindAndModifyTest, AttachesAtClusterTimeForSnapshotReadConcern) { + testAttachesAtClusterTimeForSnapshotReadConcern(kFindAndModifyCmdTargeted); +} + +TEST_F(ClusterFindAndModifyTest, SnapshotReadConcernWithAfterClusterTime) { + testSnapshotReadConcernWithAfterClusterTime(kFindAndModifyCmdTargeted); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/commands/cluster_insert_test.cpp b/src/mongo/s/commands/cluster_insert_test.cpp new file mode 100644 index 00000000000..652b9bada42 --- /dev/null +++ b/src/mongo/s/commands/cluster_insert_test.cpp @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/db/commands.h" +#include "mongo/s/commands/cluster_command_test_fixture.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +class ClusterInsertTest : public ClusterCommandTestFixture { +protected: + const BSONObj kInsertCmdTargeted{fromjson("{insert: 'coll', documents: [{'_id': -1}]}")}; + + const BSONObj kInsertCmdScatterGather{ + fromjson("{insert: 'coll', documents: [{'_id': -1}, {'_id': 1}], ordered: false}")}; + + void expectInspectRequest(int shardIndex, InspectionCallback cb) override { + onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData()); + cb(request); + return BSON("nInserted" << 1); + }); + } + + void expectReturnsSuccess(int shardIndex) override { + onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData()); + return BSON("nInserted" << 1); + }); + } +}; + +TEST_F(ClusterInsertTest, NoErrors) { + testNoErrors(kInsertCmdTargeted, kInsertCmdScatterGather); +} + +TEST_F(ClusterInsertTest, AttachesAtClusterTimeForSnapshotReadConcern) { + testAttachesAtClusterTimeForSnapshotReadConcern(kInsertCmdTargeted, kInsertCmdScatterGather); +} + +TEST_F(ClusterInsertTest, SnapshotReadConcernWithAfterClusterTime) { + testSnapshotReadConcernWithAfterClusterTime(kInsertCmdTargeted, kInsertCmdScatterGather); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/commands/cluster_update_test.cpp b/src/mongo/s/commands/cluster_update_test.cpp new file mode 100644 index 00000000000..5e5f07df41a --- /dev/null +++ b/src/mongo/s/commands/cluster_update_test.cpp @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + +#include "mongo/platform/basic.h" + +#include "mongo/s/commands/cluster_command_test_fixture.h" +#include "mongo/util/log.h" + +namespace mongo { +namespace { + +class ClusterUpdateTest : public ClusterCommandTestFixture { +protected: + const BSONObj kUpdateCmdTargeted{ + fromjson("{update: 'coll', updates: [{q: {'_id': -1}, u: {$set: {x: 1}}}]}")}; + + const BSONObj kUpdateCmdScatterGather{fromjson( + "{update: 'coll', updates: [{q: {'_id': {$gte: -1}}, u: {$set: {x: 1}}, multi: true}]}")}; + + void expectInspectRequest(int shardIndex, InspectionCallback cb) override { + onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData()); + cb(request); + return BSON("nMatched" << 1 << "nUpserted" << 0 << "nModified" << 1); + }); + } + + void expectReturnsSuccess(int shardIndex) override { + onCommandForPoolExecutor([this, shardIndex](const executor::RemoteCommandRequest& request) { + ASSERT_EQ(kNss.coll(), request.cmdObj.firstElement().valueStringData()); + return BSON("nMatched" << 1 << "nUpserted" << 0 << "nModified" << 1); + }); + } +}; + +TEST_F(ClusterUpdateTest, NoErrors) { + testNoErrors(kUpdateCmdTargeted, kUpdateCmdScatterGather); +} + +TEST_F(ClusterUpdateTest, AttachesAtClusterTimeForSnapshotReadConcern) { + testAttachesAtClusterTimeForSnapshotReadConcern(kUpdateCmdTargeted, kUpdateCmdScatterGather); +} + +TEST_F(ClusterUpdateTest, SnapshotReadConcernWithAfterClusterTime) { + testSnapshotReadConcernWithAfterClusterTime(kUpdateCmdTargeted, kUpdateCmdScatterGather); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 9ff50ec8c59..adc98ad498d 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -45,6 +45,7 @@ #include "mongo/s/cluster_last_error_info.h" #include "mongo/s/commands/cluster_explain.h" #include "mongo/s/grid.h" +#include "mongo/s/transaction/transaction_router.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/s/write_ops/chunk_manager_targeter.h" @@ -246,6 +247,10 @@ private: batchedRequest.setAllowImplicitCreate(false); } + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter->setAtClusterTimeToLatestTime(opCtx); + } + BatchWriteExecStats stats; BatchedCommandResponse response; ClusterWriter::write(opCtx, batchedRequest, &stats, &response); diff --git a/src/mongo/s/query/cluster_aggregate_test.cpp b/src/mongo/s/query/cluster_aggregate_test.cpp index 517faba1295..1ad895c97a6 100644 --- a/src/mongo/s/query/cluster_aggregate_test.cpp +++ b/src/mongo/s/query/cluster_aggregate_test.cpp @@ -42,27 +42,11 @@ protected: const BSONObj kAggregateCmdTargeted{ fromjson("{aggregate: 'coll', pipeline: [{$match: {_id: 0}}], explain: false, " "allowDiskUse: false, fromMongos: true, " - "cursor: {batchSize: 10}, maxTimeMS: 100, readConcern: {level: 'snapshot'}, " - "autocommit: false, txnNumber: NumberLong(1), startTransaction: true}")}; + "cursor: {batchSize: 10}, maxTimeMS: 100}")}; const BSONObj kAggregateCmdScatterGather{fromjson( "{aggregate: 'coll', pipeline: [], explain: false, allowDiskUse: false, fromMongos: true, " - "cursor: {batchSize: 10}, readConcern: {level: 'snapshot'}, autocommit: false, txnNumber: " - "NumberLong(1), startTransaction: true}")}; - - BSONObj appendLogicalSessionId(BSONObj cmdObj) { - BSONObjBuilder bob(cmdObj); - bob.append("lsid", makeLogicalSessionIdForTest().toBSON()); - return bob.obj(); - } - - BSONObj aggregateCmdTargeted() { - return appendLogicalSessionId(kAggregateCmdTargeted); - } - - BSONObj aggregateCmdScatterGather() { - return appendLogicalSessionId(kAggregateCmdScatterGather); - } + "cursor: {batchSize: 10}}")}; // The index of the shard expected to receive the response is used to prevent different shards // from returning documents with the same shard key. This is expected to be 0 for queries @@ -91,54 +75,24 @@ protected: }; TEST_F(ClusterAggregateTest, NoErrors) { - loadRoutingTableWithTwoChunksAndTwoShards(kNss); - - // Target one shard. - runCommandSuccessful(aggregateCmdTargeted(), true); - - // Target all shards. - runCommandSuccessful(aggregateCmdScatterGather(), false); + testNoErrors(kAggregateCmdTargeted, kAggregateCmdScatterGather); } -// Verify aggregate through mongos will retry on a snapshot error. TEST_F(ClusterAggregateTest, RetryOnSnapshotError) { - loadRoutingTableWithTwoChunksAndTwoShards(kNss); - - // Target one shard. - runCommandOneError(aggregateCmdTargeted(), ErrorCodes::SnapshotUnavailable, true); - runCommandOneError(aggregateCmdTargeted(), ErrorCodes::SnapshotTooOld, true); + testRetryOnSnapshotError(kAggregateCmdTargeted, kAggregateCmdScatterGather); +} - // Target all shards - runCommandOneError(aggregateCmdScatterGather(), ErrorCodes::SnapshotUnavailable, false); - runCommandOneError(aggregateCmdScatterGather(), ErrorCodes::SnapshotTooOld, false); +TEST_F(ClusterAggregateTest, MaxRetriesSnapshotErrors) { + testMaxRetriesSnapshotErrors(kAggregateCmdTargeted, kAggregateCmdScatterGather); } TEST_F(ClusterAggregateTest, AttachesAtClusterTimeForSnapshotReadConcern) { - loadRoutingTableWithTwoChunksAndTwoShards(kNss); - - auto containsAtClusterTime = [](const executor::RemoteCommandRequest& request) { - ASSERT(!request.cmdObj["readConcern"]["atClusterTime"].eoo()); - }; - - // Target one shard. - runCommandInspectRequests(aggregateCmdTargeted(), containsAtClusterTime, true); - - // Target all shards. - runCommandInspectRequests(aggregateCmdScatterGather(), containsAtClusterTime, false); + testAttachesAtClusterTimeForSnapshotReadConcern(kAggregateCmdTargeted, + kAggregateCmdScatterGather); } -// Verify aggregate commands will retry up to its max retry attempts on snapshot errors -// then return the final error it receives. -TEST_F(ClusterAggregateTest, MaxRetriesSnapshotErrors) { - loadRoutingTableWithTwoChunksAndTwoShards(kNss); - - // Target one shard. - runCommandMaxErrors(aggregateCmdTargeted(), ErrorCodes::SnapshotUnavailable, true); - runCommandMaxErrors(aggregateCmdTargeted(), ErrorCodes::SnapshotTooOld, true); - - // Target all shards - runCommandMaxErrors(aggregateCmdScatterGather(), ErrorCodes::SnapshotUnavailable, false); - runCommandMaxErrors(aggregateCmdScatterGather(), ErrorCodes::SnapshotTooOld, false); +TEST_F(ClusterAggregateTest, SnapshotReadConcernWithAfterClusterTime) { + testSnapshotReadConcernWithAfterClusterTime(kAggregateCmdTargeted, kAggregateCmdScatterGather); } } // namespace diff --git a/src/mongo/s/transaction/at_cluster_time_util.cpp b/src/mongo/s/transaction/at_cluster_time_util.cpp index 0a1c66dce03..1264a785047 100644 --- a/src/mongo/s/transaction/at_cluster_time_util.cpp +++ b/src/mongo/s/transaction/at_cluster_time_util.cpp @@ -118,11 +118,20 @@ boost::optional<LogicalTime> computeAtClusterTimeForOneShard(OperationContext* o uassert(ErrorCodes::ShardNotFound, str::stream() << "Could not find shard " << shardId, shard); // Return the cached last committed opTime for the shard if there is one, otherwise return the - // lastest cluster time from the logical clock. + // latest cluster time from the logical clock. auto lastCommittedOpTime = shard->getLastCommittedOpTime(); - return lastCommittedOpTime != LogicalTime::kUninitialized + + auto atClusterTime = lastCommittedOpTime != LogicalTime::kUninitialized ? lastCommittedOpTime : LogicalClock::get(opCtx)->getClusterTime(); + + // If the user passed afterClusterTime, atClusterTime must be greater than or equal to it. + const auto afterClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAfterClusterTime(); + if (afterClusterTime && *afterClusterTime > atClusterTime) { + return afterClusterTime; + } + + return atClusterTime; } } // namespace at_cluster_time_util diff --git a/src/mongo/s/transaction/compute_at_cluster_time_test.cpp b/src/mongo/s/transaction/compute_at_cluster_time_test.cpp index 5bca0643238..61efcabe61d 100644 --- a/src/mongo/s/transaction/compute_at_cluster_time_test.cpp +++ b/src/mongo/s/transaction/compute_at_cluster_time_test.cpp @@ -268,6 +268,10 @@ TEST_F(AtClusterTimeTargetingTest, AfterClusterTime) { ASSERT(computedTime); ASSERT_GTE(*computedTime, afterClusterTime); + computedTime = at_cluster_time_util::computeAtClusterTimeForOneShard(operationContext(), s0); + ASSERT(computedTime); + ASSERT_GTE(*computedTime, afterClusterTime); + // Target all shards. computedTime = at_cluster_time_util::computeAtClusterTime( operationContext(), true, {s0, s1}, kNss, kEmptyQuery, kEmptyCollation); @@ -286,6 +290,10 @@ TEST_F(AtClusterTimeTargetingTest, AfterClusterTime) { ASSERT(computedTime); ASSERT_GTE(*computedTime, afterClusterTime); + computedTime = at_cluster_time_util::computeAtClusterTimeForOneShard(operationContext(), s0); + ASSERT(computedTime); + ASSERT_GTE(*computedTime, afterClusterTime); + // Target all shards. computedTime = at_cluster_time_util::computeAtClusterTime( operationContext(), true, {s0, s1}, kNss, kEmptyQuery, kEmptyCollation); @@ -329,6 +337,10 @@ TEST_F(AtClusterTimeTargetingTest, AfterClusterTimeLowerBound) { ASSERT(computedTime); ASSERT_EQ(*computedTime, afterClusterTime); + computedTime = at_cluster_time_util::computeAtClusterTimeForOneShard(operationContext(), s0); + ASSERT(computedTime); + ASSERT_EQ(*computedTime, afterClusterTime); + // Target one shard with a last committed optime less than afterClusterTime. The computed value // should still equal afterClusterTime. LogicalTime time1(Timestamp(1, 1)); @@ -339,6 +351,10 @@ TEST_F(AtClusterTimeTargetingTest, AfterClusterTimeLowerBound) { operationContext(), true, {s0}, kNss, kEmptyQuery, kEmptyCollation); ASSERT(computedTime); ASSERT_EQ(*computedTime, afterClusterTime); + + computedTime = at_cluster_time_util::computeAtClusterTimeForOneShard(operationContext(), s0); + ASSERT(computedTime); + ASSERT_EQ(*computedTime, afterClusterTime); } } // namespace diff --git a/src/mongo/s/transaction/transaction_router.cpp b/src/mongo/s/transaction/transaction_router.cpp index 9477bb643b7..823b6e8c813 100644 --- a/src/mongo/s/transaction/transaction_router.cpp +++ b/src/mongo/s/transaction/transaction_router.cpp @@ -244,9 +244,9 @@ TransactionRouter::Participant& TransactionRouter::getOrCreateParticipant(const auto participant = TransactionRouter::Participant(isFirstParticipant, _txnNumber, _readConcernArgs); - // TODO SERVER-36557: Every command that starts a cross-shard transaction should - // compute atClusterTime with snapshot read concern. Hence, we should be able to - // add an invariant here to ensure that atClusterTime is not none. + // TODO SERVER-36589: Once mongos aborts transactions by only sending abortTransaction to shards + // that have been successfully contacted we should be able to add an invariant here to ensure + // that an atClusterTime has been chosen if the read concern level is snapshot. if (_atClusterTime) { participant.setAtClusterTime(*_atClusterTime); } @@ -299,6 +299,24 @@ void TransactionRouter::computeAtClusterTimeForOneShard(OperationContext* opCtx, } } +void TransactionRouter::setAtClusterTimeToLatestTime(OperationContext* opCtx) { + if (_atClusterTime || + _readConcernArgs.getLevel() != repl::ReadConcernLevel::kSnapshotReadConcern) { + return; + } + + auto atClusterTime = LogicalClock::get(opCtx)->getClusterTime(); + + // If the user passed afterClusterTime, atClusterTime for the transaction must be selected so it + // is at least equal to or greater than it. + auto afterClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAfterClusterTime(); + if (afterClusterTime && *afterClusterTime > atClusterTime) { + atClusterTime = *afterClusterTime; + } + + _atClusterTime = atClusterTime; +} + void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx, TxnNumber txnNumber, bool startTransaction) { diff --git a/src/mongo/s/transaction/transaction_router.h b/src/mongo/s/transaction/transaction_router.h index 22447620f55..5a8acc9e97b 100644 --- a/src/mongo/s/transaction/transaction_router.h +++ b/src/mongo/s/transaction/transaction_router.h @@ -126,6 +126,12 @@ public: */ void computeAtClusterTimeForOneShard(OperationContext* opCtx, const ShardId& shardId); + /** + * Sets the atClusterTime for the current transaction to the latest time in the router's logical + * clock. + */ + void setAtClusterTimeToLatestTime(OperationContext* opCtx); + bool isCheckedOut(); const LogicalSessionId& getSessionId() const; diff --git a/src/mongo/s/write_ops/SConscript b/src/mongo/s/write_ops/SConscript index 593d65a4d23..6aa140a7722 100644 --- a/src/mongo/s/write_ops/SConscript +++ b/src/mongo/s/write_ops/SConscript @@ -34,6 +34,7 @@ env.Library( LIBDEPS=[ '$BUILD_DIR/mongo/s/async_requests_sender', '$BUILD_DIR/mongo/s/commands/cluster_commands_helpers', + '$BUILD_DIR/mongo/s/transaction/router_session', 'batch_write_types', ], ) diff --git a/src/mongo/s/write_ops/batch_write_exec.cpp b/src/mongo/s/write_ops/batch_write_exec.cpp index 120d7ca33f8..844730e1c40 100644 --- a/src/mongo/s/write_ops/batch_write_exec.cpp +++ b/src/mongo/s/write_ops/batch_write_exec.cpp @@ -42,6 +42,7 @@ #include "mongo/s/async_requests_sender.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" +#include "mongo/s/transaction/transaction_router.h" #include "mongo/s/write_ops/batch_write_op.h" #include "mongo/s/write_ops/write_error_detail.h" #include "mongo/util/log.h" @@ -270,6 +271,18 @@ void BatchWriteExec::executeBatch(OperationContext* opCtx, LOG(4) << "Write results received from " << shardHost.toString() << ": " << redact(batchedCommandResponse.toString()); + // If we are in a transaction, we must fail the whole batch. + if (TransactionRouter::get(opCtx)) { + // Note: this returns a bad status if any part of the batch failed. + auto batchStatus = batchedCommandResponse.toStatus(); + if (!batchStatus.isOK()) { + batchOp.forgetTargetedBatchesOnTransactionAbortingError(); + uassertStatusOK(batchStatus.withContext( + str::stream() << "Encountered error from " << shardHost.toString() + << " during a transaction")); + } + } + // Dispatch was ok, note response batchOp.noteBatchResponse(*batch, batchedCommandResponse, &trackedErrors); diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index 6509ad29e6b..0ea95d1d4fe 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -35,6 +35,7 @@ #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/sharding_router_test_fixture.h" +#include "mongo/s/transaction/transaction_router.h" #include "mongo/s/write_ops/batch_write_exec.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" @@ -571,5 +572,167 @@ TEST_F(BatchWriteExecTest, NonRetryableErrorTxnNumber) { future.timed_get(kFutureTimeout); } + +class BatchWriteExecTransactionTest : public BatchWriteExecTest { +public: + const TxnNumber kTxnNumber = 5; + + void setUp() override { + BatchWriteExecTest::setUp(); + + operationContext()->setLogicalSessionId(makeLogicalSessionIdForTest()); + operationContext()->setTxnNumber(kTxnNumber); + repl::ReadConcernArgs::get(operationContext()) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern); + + _scopedSession.emplace(operationContext()); + + auto txnRouter = TransactionRouter::get(operationContext()); + txnRouter->checkOut(); + txnRouter->beginOrContinueTxn(operationContext(), kTxnNumber, true); + } + + void tearDown() override { + _scopedSession.reset(); + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + + BatchWriteExecTest::tearDown(); + } + +private: + boost::optional<ScopedRouterSession> _scopedSession; +}; + +TEST_F(BatchWriteExecTransactionTest, ErrorInBatchThrows_CommandError) { + BatchedCommandRequest request([&] { + write_ops::Insert insertOp(nss); + insertOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(false); + return writeCommandBase; + }()); + insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)}); + return insertOp; + }()); + request.setWriteConcern(BSONObj()); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + ASSERT_THROWS_CODE(BatchWriteExec::executeBatch( + operationContext(), nsTargeter, request, &response, &stats), + AssertionException, + ErrorCodes::UnknownError); + }); + + BatchedCommandResponse failedResponse; + failedResponse.setStatus({ErrorCodes::UnknownError, "dummy error"}); + + expectInsertsReturnError({BSON("x" << 1), BSON("x" << 2)}, failedResponse); + + future.timed_get(kFutureTimeout); +} + +TEST_F(BatchWriteExecTransactionTest, ErrorInBatchThrows_WriteError) { + BatchedCommandRequest request([&] { + write_ops::Insert insertOp(nss); + insertOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(false); + return writeCommandBase; + }()); + insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)}); + return insertOp; + }()); + request.setWriteConcern(BSONObj()); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + ASSERT_THROWS_CODE(BatchWriteExec::executeBatch( + operationContext(), nsTargeter, request, &response, &stats), + AssertionException, + ErrorCodes::StaleShardVersion); + }); + + // Any write error works, using SSV for convenience. + expectInsertsReturnStaleVersionErrors({BSON("x" << 1), BSON("x" << 2)}); + + future.timed_get(kFutureTimeout); +} + +TEST_F(BatchWriteExecTransactionTest, ErrorInBatchThrows_WriteErrorOrdered) { + BatchedCommandRequest request([&] { + write_ops::Insert insertOp(nss); + insertOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(true); + return writeCommandBase; + }()); + insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)}); + return insertOp; + }()); + request.setWriteConcern(BSONObj()); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + ASSERT_THROWS_CODE(BatchWriteExec::executeBatch( + operationContext(), nsTargeter, request, &response, &stats), + AssertionException, + ErrorCodes::StaleShardVersion); + }); + + // Any write error works, using SSV for convenience. + expectInsertsReturnStaleVersionErrors({BSON("x" << 1), BSON("x" << 2)}); + + future.timed_get(kFutureTimeout); +} + +TEST_F(BatchWriteExecTransactionTest, ErrorInBatchThrows_WriteConcernError) { + BatchedCommandRequest request([&] { + write_ops::Insert insertOp(nss); + insertOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(false); + return writeCommandBase; + }()); + insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)}); + return insertOp; + }()); + request.setWriteConcern(BSONObj()); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + ASSERT_THROWS_CODE(BatchWriteExec::executeBatch( + operationContext(), nsTargeter, request, &response, &stats), + AssertionException, + ErrorCodes::NetworkTimeout); + }); + + onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { + const auto opMsgRequest = OpMsgRequest::fromDBAndBody(request.dbname, request.cmdObj); + const auto insertOp = InsertOp::parse(opMsgRequest); + ASSERT_EQUALS(nss, insertOp.getNamespace()); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setN(1); + + auto wcError = stdx::make_unique<WriteConcernErrorDetail>(); + WriteConcernResult wcRes; + wcRes.err = "timeout"; + wcRes.wTimedOut = true; + wcError->setStatus({ErrorCodes::NetworkTimeout, "Failed to wait for write concern"}); + wcError->setErrInfo(BSON("wtimeout" << true)); + response.setWriteConcernError(wcError.release()); + + return response.toBSON(); + }); + + future.timed_get(kFutureTimeout); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index a5f0824c5f7..1812ed4713a 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -679,6 +679,10 @@ void BatchWriteOp::abortBatch(const WriteErrorDetail& error) { dassert(isFinished()); } +void BatchWriteOp::forgetTargetedBatchesOnTransactionAbortingError() { + _targeted.clear(); +} + bool BatchWriteOp::isFinished() { const size_t numWriteOps = _clientRequest.sizeWriteOps(); const bool orderedOps = _clientRequest.getWriteCommandBase().getOrdered(); diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h index 6b1856e7a3d..13418231ea0 100644 --- a/src/mongo/s/write_ops/batch_write_op.h +++ b/src/mongo/s/write_ops/batch_write_op.h @@ -173,6 +173,13 @@ public: void abortBatch(const WriteErrorDetail& error); /** + * Disposes of all tracked targeted batches when an error is encountered during a transaction. + * This is safe because any partially written data on shards will be rolled back if mongos + * decides to abort. + */ + void forgetTargetedBatchesOnTransactionAbortingError(); + + /** * Returns false if the batch write op needs more processing. */ bool isFinished(); |