summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/s/write_ops/batch_write_exec_test.cpp585
-rw-r--r--src/mongo/s/write_ops/batch_write_op.cpp5
-rw-r--r--src/mongo/s/write_ops/batch_write_op.h5
-rw-r--r--src/mongo/s/write_ops/batch_write_op_test.cpp135
-rw-r--r--src/mongo/s/write_ops/mock_ns_targeter.h10
-rw-r--r--src/mongo/s/write_ops/write_op.cpp25
-rw-r--r--src/mongo/s/write_ops/write_op.h3
-rw-r--r--src/mongo/s/write_ops/write_op_test.cpp79
8 files changed, 698 insertions, 149 deletions
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 9e14e6c5964..b475f1178ef 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -30,6 +30,7 @@
#include "mongo/platform/basic.h"
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/client/remote_command_targeter_factory_mock.h"
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands.h"
@@ -37,6 +38,7 @@
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/sharding_router_test_fixture.h"
+#include "mongo/s/stale_exception.h"
#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -47,11 +49,24 @@
namespace mongo {
namespace {
-const HostAndPort kTestShardHost = HostAndPort("FakeHost", 12345);
+using executor::RemoteCommandRequest;
+
const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345);
-const std::string shardName = "FakeShard";
+
+const HostAndPort kTestShardHost1 = HostAndPort("FakeHost1", 12345);
+const std::string kShardName1 = "FakeShard1";
+const HostAndPort kTestShardHost2 = HostAndPort("FakeHost2", 12345);
+const std::string kShardName2 = "FakeShard2";
+
const int kMaxRoundsWithoutProgress = 5;
+write_ops::UpdateOpEntry buildUpdate(const BSONObj& q, const BSONObj& u) {
+ write_ops::UpdateOpEntry entry;
+ entry.setQ(q);
+ entry.setU(u);
+ return entry;
+}
+
/**
* Mimics a single shard backend for a particular collection which can be initialized with a
* set of write command results to return.
@@ -65,29 +80,38 @@ public:
ShardingTestFixture::setUp();
setRemote(HostAndPort("ClientHost", 12345));
- // Set up the RemoteCommandTargeter for the config shard.
+ // Set up the RemoteCommandTargeter for the config shard
configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
+ // Add a RemoteCommandTargeter for the data shard2
+ targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHost1), [] {
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ std::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHost1));
+ targeter->setFindHostReturnValue(kTestShardHost1);
+ return targeter;
+ }());
+
+ targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHost2), [] {
+ std::unique_ptr<RemoteCommandTargeterMock> targeter(
+ std::make_unique<RemoteCommandTargeterMock>());
+ targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHost2));
+ targeter->setFindHostReturnValue(kTestShardHost2);
+ return targeter;
+ }());
- // Add a RemoteCommandTargeter for the data shard.
- std::unique_ptr<RemoteCommandTargeterMock> targeter(
- stdx::make_unique<RemoteCommandTargeterMock>());
- targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHost));
- targeter->setFindHostReturnValue(kTestShardHost);
- targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHost),
- std::move(targeter));
-
- // Set up the shard registry to contain the fake shard.
- ShardType shardType;
- shardType.setName(shardName);
- shardType.setHost(kTestShardHost.toString());
- std::vector<ShardType> shards{shardType};
- setupShards(shards);
-
- // Set up the namespace targeter to target the fake shard.
- nsTargeter.init(nss,
- {MockRange(ShardEndpoint(shardName, ChunkVersion::IGNORED()),
- BSON("x" << MINKEY),
- BSON("x" << MAXKEY))});
+ // Set up the shard registry to contain the fake shards
+ setupShards({[] {
+ ShardType shardType;
+ shardType.setName(kShardName1);
+ shardType.setHost(kTestShardHost1.toString());
+ return shardType;
+ }(),
+ [] {
+ ShardType shardType;
+ shardType.setName(kShardName2);
+ shardType.setHost(kTestShardHost2.toString());
+ return shardType;
+ }()});
}
void expectInsertsReturnSuccess(const std::vector<BSONObj>& expected) {
@@ -197,10 +221,13 @@ public:
});
}
- ConnectionString shardHost{kTestShardHost};
- NamespaceString nss{"foo.bar"};
+ const NamespaceString nss{"foo.bar"};
- MockNSTargeter nsTargeter;
+ MockNSTargeter singleShardNSTargeter{
+ nss,
+ {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, OID::gen())),
+ BSON("x" << MINKEY),
+ BSON("x" << MAXKEY))}};
};
//
@@ -224,7 +251,8 @@ TEST_F(BatchWriteExecTest, SingleOp) {
auto future = launchAsync([&] {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ BatchWriteExec::executeBatch(
+ operationContext(), singleShardNSTargeter, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQ(1LL, response.getN());
ASSERT_EQ(1, stats.numRounds);
@@ -260,7 +288,8 @@ TEST_F(BatchWriteExecTest, MultiOpLarge) {
auto future = launchAsync([&] {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ BatchWriteExec::executeBatch(
+ operationContext(), singleShardNSTargeter, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQUALS(response.getN(), kNumDocsToInsert);
@@ -292,7 +321,8 @@ TEST_F(BatchWriteExecTest, SingleOpError) {
auto future = launchAsync([&] {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ BatchWriteExec::executeBatch(
+ operationContext(), singleShardNSTargeter, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQ(0, response.getN());
ASSERT(response.isErrDetailsSet());
@@ -308,6 +338,473 @@ TEST_F(BatchWriteExecTest, SingleOpError) {
future.timed_get(kFutureTimeout);
}
+void serializeErrorToBSON(const Status& status, BSONObjBuilder* builder) {
+ builder->append("code", status.code());
+ builder->append("codeName", ErrorCodes::errorString(status.code()));
+ builder->append("errmsg", status.reason());
+
+ if (auto ei = status.extraInfo())
+ ei->serialize(builder);
+}
+
+TEST_F(BatchWriteExecTest, StaleShardVersionReturnedFromBatchWithSingleMultiWrite) {
+ BatchedCommandRequest request([&] {
+ write_ops::Update updateOp(nss);
+ updateOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ updateOp.setUpdates({buildUpdate(BSON("_id" << 100), BSON("Key" << 100))});
+ return updateOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ static const auto epoch = OID::gen();
+
+ class MultiShardTargeter : public MockNSTargeter {
+ public:
+ using MockNSTargeter::MockNSTargeter;
+
+ StatusWith<std::vector<ShardEndpoint>> targetUpdate(
+ OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
+ return std::vector<ShardEndpoint>{
+ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
+ ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))};
+ }
+ };
+
+ MultiShardTargeter multiShardNSTargeter(
+ nss,
+ {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
+ BSON("x" << MINKEY),
+ BSON("x" << 0)),
+ MockRange(ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch)),
+ BSON("x" << 0),
+ BSON("x" << MAXKEY))});
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(
+ operationContext(), multiShardNSTargeter, request, &response, &stats);
+
+ return response;
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost1, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(1);
+
+ return response.toBSON();
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost2, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(0);
+ response.addToErrDetails([&] {
+ WriteErrorDetail* errDetail = new WriteErrorDetail();
+ errDetail->setIndex(0);
+ errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"});
+ errDetail->setErrInfo([&] {
+ Status ssvStatus(StaleConfigInfo(nss,
+ ChunkVersion(101, 200, epoch),
+ ChunkVersion(105, 200, epoch)),
+ "Stale shard version");
+ BSONObjBuilder builder;
+ serializeErrorToBSON(ssvStatus, &builder);
+ return builder.obj();
+ }());
+ return errDetail;
+ }());
+
+ return response.toBSON();
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost2, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(2);
+
+ return response.toBSON();
+ });
+
+ auto response = future.timed_get(kFutureTimeout);
+
+ ASSERT_OK(response.getTopLevelStatus());
+ ASSERT_EQ(3, response.getNModified());
+}
+
+TEST_F(BatchWriteExecTest,
+ RetryableErrorReturnedFromMultiWriteWithShard1AllOKShard2AllStaleShardVersion) {
+ BatchedCommandRequest request([&] {
+ write_ops::Update updateOp(nss);
+ updateOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ updateOp.setUpdates({buildUpdate(BSON("id" << 150), BSON("x" << 1)),
+ buildUpdate(BSON("id" << 200), BSON("y" << 2))});
+ return updateOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ static const auto epoch = OID::gen();
+
+ // This allows the batch to target each write operation
+ // to a specific shard (kShardName2), to perform this test
+ class MultiShardTargeter : public MockNSTargeter {
+ public:
+ using MockNSTargeter::MockNSTargeter;
+
+ StatusWith<std::vector<ShardEndpoint>> targetUpdate(
+ OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
+ return std::vector<ShardEndpoint>{
+ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
+ ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))};
+ }
+ };
+
+ MultiShardTargeter multiShardNSTargeter(
+ nss,
+ {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
+ BSON("sk" << MINKEY),
+ BSON("sk" << 10)),
+ MockRange(ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch)),
+ BSON("sk" << 10),
+ BSON("sk" << MAXKEY))});
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(
+ operationContext(), multiShardNSTargeter, request, &response, &stats);
+
+ return response;
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost1, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(1);
+
+ return response.toBSON();
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost2, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(0);
+ response.addToErrDetails([&] {
+ WriteErrorDetail* errDetail = new WriteErrorDetail();
+ errDetail->setIndex(0);
+ errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"});
+ errDetail->setErrInfo([&] {
+ Status ssvStatus(StaleConfigInfo(nss,
+ ChunkVersion(101, 200, epoch),
+ ChunkVersion(105, 200, epoch)),
+ "Stale shard version");
+ BSONObjBuilder builder;
+ serializeErrorToBSON(ssvStatus, &builder);
+ return builder.obj();
+ }());
+ return errDetail;
+ }());
+ response.addToErrDetails([&] {
+ WriteErrorDetail* errDetail = new WriteErrorDetail();
+ errDetail->setIndex(1);
+ errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"});
+ errDetail->setErrInfo([&] {
+ Status ssvStatus(StaleConfigInfo(nss,
+ ChunkVersion(101, 200, epoch),
+ ChunkVersion(105, 200, epoch)),
+ "Stale shard version");
+ BSONObjBuilder builder;
+ serializeErrorToBSON(ssvStatus, &builder);
+ return builder.obj();
+ }());
+ return errDetail;
+ }());
+
+ return response.toBSON();
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost2, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(2);
+
+ return response.toBSON();
+ });
+
+ auto response = future.timed_get(kFutureTimeout);
+ ASSERT_OK(response.getTopLevelStatus());
+ ASSERT_EQ(3, response.getNModified());
+}
+
+TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromMultiWriteWithShard1FirstOKShard2FirstErr) {
+ BatchedCommandRequest request([&] {
+ write_ops::Update updateOp(nss);
+ updateOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ updateOp.setUpdates({buildUpdate(BSON("id" << 150), BSON("x" << 1)),
+ buildUpdate(BSON("id" << 200), BSON("y" << 2))});
+ return updateOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ static const auto epoch = OID::gen();
+
+ // This allows the batch to target each write operation
+ // to a specific shard (kShardName2), to perform this test
+ class MultiShardTargeter : public MockNSTargeter {
+ public:
+ using MockNSTargeter::MockNSTargeter;
+
+ StatusWith<std::vector<ShardEndpoint>> targetUpdate(
+ OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
+ return std::vector<ShardEndpoint>{
+ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
+ ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))};
+ }
+ };
+
+ MultiShardTargeter multiShardNSTargeter(
+ nss,
+ {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
+ BSON("sk" << MINKEY),
+ BSON("sk" << 10)),
+ MockRange(ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch)),
+ BSON("sk" << 10),
+ BSON("sk" << MAXKEY))});
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(
+ operationContext(), multiShardNSTargeter, request, &response, &stats);
+
+ return response;
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost1, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(0);
+ response.addToErrDetails([&] {
+ WriteErrorDetail* errDetail = new WriteErrorDetail();
+ errDetail->setIndex(1);
+ errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"});
+ errDetail->setErrInfo([&] {
+ Status ssvStatus(StaleConfigInfo(nss,
+ ChunkVersion(101, 200, epoch),
+ ChunkVersion(105, 200, epoch)),
+ "Stale shard version");
+ BSONObjBuilder builder;
+ serializeErrorToBSON(ssvStatus, &builder);
+ return builder.obj();
+ }());
+ return errDetail;
+ }());
+
+ return response.toBSON();
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost2, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(0);
+ response.addToErrDetails([&] {
+ WriteErrorDetail* errDetail = new WriteErrorDetail();
+ errDetail->setIndex(0);
+ errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"});
+ errDetail->setErrInfo([&] {
+ Status ssvStatus(StaleConfigInfo(nss,
+ ChunkVersion(101, 200, epoch),
+ ChunkVersion(105, 200, epoch)),
+ "Stale shard version");
+ BSONObjBuilder builder;
+ serializeErrorToBSON(ssvStatus, &builder);
+ return builder.obj();
+ }());
+ return errDetail;
+ }());
+
+ return response.toBSON();
+ });
+
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost1, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(1);
+
+ return response.toBSON();
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost2, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(1);
+
+ return response.toBSON();
+ });
+
+ auto response = future.timed_get(kFutureTimeout);
+ ASSERT_OK(response.getTopLevelStatus());
+ ASSERT_EQ(2, response.getNModified());
+}
+
+TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromMultiWriteWithShard1FirstOKShard2FirstOK) {
+ BatchedCommandRequest request([&] {
+ write_ops::Update updateOp(nss);
+ updateOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(false);
+ return writeCommandBase;
+ }());
+ updateOp.setUpdates({buildUpdate(BSON("id" << 150), BSON("x" << 1)),
+ buildUpdate(BSON("id" << 200), BSON("y" << 2))});
+ return updateOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ static const auto epoch = OID::gen();
+
+ // This allows the batch to target each write operation
+ // to a specific shard (kShardName2), to perform this test
+ class MultiShardTargeter : public MockNSTargeter {
+ public:
+ using MockNSTargeter::MockNSTargeter;
+
+ StatusWith<std::vector<ShardEndpoint>> targetUpdate(
+ OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override {
+ return std::vector<ShardEndpoint>{
+ ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
+ ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))};
+ }
+ };
+
+ MultiShardTargeter multiShardNSTargeter(
+ nss,
+ {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)),
+ BSON("sk" << MINKEY),
+ BSON("sk" << 10)),
+ MockRange(ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch)),
+ BSON("sk" << 10),
+ BSON("sk" << MAXKEY))});
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(
+ operationContext(), multiShardNSTargeter, request, &response, &stats);
+
+ return response;
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost1, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(0);
+ response.addToErrDetails([&] {
+ WriteErrorDetail* errDetail = new WriteErrorDetail();
+ errDetail->setIndex(1);
+ errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"});
+ errDetail->setErrInfo([&] {
+ Status ssvStatus(StaleConfigInfo(nss,
+ ChunkVersion(101, 200, epoch),
+ ChunkVersion(105, 200, epoch)),
+ "Stale shard version");
+ BSONObjBuilder builder;
+ serializeErrorToBSON(ssvStatus, &builder);
+ return builder.obj();
+ }());
+ return errDetail;
+ }());
+
+ return response.toBSON();
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost2, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(0);
+ response.addToErrDetails([&] {
+ WriteErrorDetail* errDetail = new WriteErrorDetail();
+ errDetail->setIndex(1);
+ errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"});
+ errDetail->setErrInfo([&] {
+ Status ssvStatus(StaleConfigInfo(nss,
+ ChunkVersion(101, 200, epoch),
+ ChunkVersion(105, 200, epoch)),
+ "Stale shard version");
+ BSONObjBuilder builder;
+ serializeErrorToBSON(ssvStatus, &builder);
+ return builder.obj();
+ }());
+ return errDetail;
+ }());
+
+ return response.toBSON();
+ });
+
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost1, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(1);
+
+ return response.toBSON();
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(kTestShardHost2, request.target);
+
+ BatchedCommandResponse response;
+ response.setStatus(Status::OK());
+ response.setNModified(1);
+
+ return response.toBSON();
+ });
+
+ auto response = future.timed_get(kFutureTimeout);
+ ASSERT_OK(response.getTopLevelStatus());
+ ASSERT_EQ(2, response.getNModified());
+}
+
//
// Test retryable errors
//
@@ -329,7 +826,8 @@ TEST_F(BatchWriteExecTest, StaleOp) {
auto future = launchAsync([&] {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ BatchWriteExec::executeBatch(
+ operationContext(), singleShardNSTargeter, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQUALS(1, stats.numStaleBatches);
@@ -359,7 +857,8 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) {
auto future = launchAsync([&] {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ BatchWriteExec::executeBatch(
+ operationContext(), singleShardNSTargeter, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQUALS(3, stats.numStaleBatches);
@@ -377,10 +876,11 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) {
future.timed_get(kFutureTimeout);
}
-TEST_F(BatchWriteExecTest, TooManyStaleOp) {
- // Retry op in exec too many times (without refresh) b/c of stale config (the mock nsTargeter
- // doesn't report progress on refresh). We should report a no progress error for everything in
- // the batch.
+TEST_F(BatchWriteExecTest, TooManyStaleShardOp) {
+ // Retry op in exec too many times (without refresh) b/c of stale config (the mock
+ // singleShardNSTargeter doesn't report progress on refresh). We should report a no progress
+ // error for everything in the batch.
+
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
insertOp.setWriteCommandBase([] {
@@ -396,7 +896,8 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) {
auto future = launchAsync([&] {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ BatchWriteExec::executeBatch(
+ operationContext(), singleShardNSTargeter, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQ(0, response.getN());
ASSERT(response.isErrDetailsSet());
@@ -444,7 +945,8 @@ TEST_F(BatchWriteExecTest, RetryableWritesLargeBatch) {
auto future = launchAsync([&] {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ BatchWriteExec::executeBatch(
+ operationContext(), singleShardNSTargeter, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQUALS(response.getN(), kNumDocsToInsert);
@@ -478,7 +980,8 @@ TEST_F(BatchWriteExecTest, RetryableErrorNoTxnNumber) {
auto future = launchAsync([&] {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ BatchWriteExec::executeBatch(
+ operationContext(), singleShardNSTargeter, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQ(0, response.getN());
@@ -519,7 +1022,8 @@ TEST_F(BatchWriteExecTest, RetryableErrorTxnNumber) {
auto future = launchAsync([&] {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ BatchWriteExec::executeBatch(
+ operationContext(), singleShardNSTargeter, request, &response, &stats);
ASSERT(response.getOk());
ASSERT(!response.isErrDetailsSet());
@@ -556,7 +1060,8 @@ TEST_F(BatchWriteExecTest, NonRetryableErrorTxnNumber) {
auto future = launchAsync([&] {
BatchedCommandResponse response;
BatchWriteExecStats stats;
- BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ BatchWriteExec::executeBatch(
+ operationContext(), singleShardNSTargeter, request, &response, &stats);
ASSERT(response.getOk());
ASSERT_EQ(0, response.getN());
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 229ce247335..14e3fce88c1 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -839,4 +839,9 @@ const std::vector<ShardError>& TrackedErrors::getErrors(int errCode) const {
return _errorMap.find(errCode)->second;
}
+void TargetedWriteBatch::addWrite(TargetedWrite* targetedWrite, int estWriteSize) {
+ _writes.mutableVector().push_back(targetedWrite);
+ _estimatedSizeBytes += estWriteSize;
+}
+
} // namespace mongo
diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h
index 73f30018edb..5ea5f1261c5 100644
--- a/src/mongo/s/write_ops/batch_write_op.h
+++ b/src/mongo/s/write_ops/batch_write_op.h
@@ -262,10 +262,7 @@ public:
/**
* TargetedWrite is owned here once given to the TargetedWriteBatch.
*/
- void addWrite(TargetedWrite* targetedWrite, int estWriteSize) {
- _writes.mutableVector().push_back(targetedWrite);
- _estimatedSizeBytes += estWriteSize;
- }
+ void addWrite(TargetedWrite* targetedWrite, int estWriteSize);
private:
// Where to send the batch
diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp
index 04700a2aa60..2a125d62aa8 100644
--- a/src/mongo/s/write_ops/batch_write_op_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_op_test.cpp
@@ -41,26 +41,21 @@
namespace mongo {
namespace {
-void initTargeterFullRange(const NamespaceString& nss,
- const ShardEndpoint& endpoint,
- MockNSTargeter* targeter) {
- targeter->init(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))});
+auto initTargeterFullRange(const NamespaceString& nss, const ShardEndpoint& endpoint) {
+ return MockNSTargeter(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))});
}
-void initTargeterSplitRange(const NamespaceString& nss,
+auto initTargeterSplitRange(const NamespaceString& nss,
const ShardEndpoint& endpointA,
- const ShardEndpoint& endpointB,
- MockNSTargeter* targeter) {
- targeter->init(nss,
- {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)),
- MockRange(endpointB, BSON("x" << 0), BSON("x" << MAXKEY))});
+ const ShardEndpoint& endpointB) {
+ return MockNSTargeter(nss,
+ {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)),
+ MockRange(endpointB, BSON("x" << 0), BSON("x" << MAXKEY))});
}
-void initTargeterHalfRange(const NamespaceString& nss,
- const ShardEndpoint& endpoint,
- MockNSTargeter* targeter) {
+auto initTargeterHalfRange(const NamespaceString& nss, const ShardEndpoint& endpoint) {
// x >= 0 values are untargetable
- targeter->init(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << 0))});
+ return MockNSTargeter(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << 0))});
}
write_ops::DeleteOpEntry buildDelete(const BSONObj& query, bool multi) {
@@ -130,8 +125,8 @@ using BatchWriteOpTest = WriteOpTestFixture;
TEST_F(BatchWriteOpTest, SingleOp) {
NamespaceString nss("foo.bar");
ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterFullRange(nss, endpoint, &targeter);
+
+ auto targeter = initTargeterFullRange(nss, endpoint);
// Do single-target, single doc batch write op
BatchedCommandRequest request([&] {
@@ -163,8 +158,8 @@ TEST_F(BatchWriteOpTest, SingleOp) {
TEST_F(BatchWriteOpTest, SingleError) {
NamespaceString nss("foo.bar");
ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterFullRange(nss, endpoint, &targeter);
+
+ auto targeter = initTargeterFullRange(nss, endpoint);
// Do single-target, single doc batch write op
BatchedCommandRequest request([&] {
@@ -202,8 +197,8 @@ TEST_F(BatchWriteOpTest, SingleError) {
TEST_F(BatchWriteOpTest, SingleTargetError) {
NamespaceString nss("foo.bar");
ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterHalfRange(nss, endpoint, &targeter);
+
+ auto targeter = initTargeterHalfRange(nss, endpoint);
// Do untargetable delete op
BatchedCommandRequest request([&] {
@@ -237,8 +232,8 @@ TEST_F(BatchWriteOpTest, SingleTargetError) {
TEST_F(BatchWriteOpTest, SingleWriteConcernErrorOrdered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterFullRange(nss, endpoint, &targeter);
+
+ auto targeter = initTargeterFullRange(nss, endpoint);
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
@@ -279,8 +274,8 @@ TEST_F(BatchWriteOpTest, SingleWriteConcernErrorOrdered) {
TEST_F(BatchWriteOpTest, SingleStaleError) {
NamespaceString nss("foo.bar");
ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterFullRange(nss, endpoint, &targeter);
+
+ auto targeter = initTargeterFullRange(nss, endpoint);
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
@@ -333,8 +328,8 @@ TEST_F(BatchWriteOpTest, SingleStaleError) {
TEST_F(BatchWriteOpTest, MultiOpSameShardOrdered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterFullRange(nss, endpoint, &targeter);
+
+ auto targeter = initTargeterFullRange(nss, endpoint);
// Do single-target, multi-doc batch write op
BatchedCommandRequest request([&] {
@@ -370,8 +365,8 @@ TEST_F(BatchWriteOpTest, MultiOpSameShardOrdered) {
TEST_F(BatchWriteOpTest, MultiOpSameShardUnordered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterFullRange(nss, endpoint, &targeter);
+
+ auto targeter = initTargeterFullRange(nss, endpoint);
// Do single-target, multi-doc batch write op
BatchedCommandRequest request([&] {
@@ -414,8 +409,8 @@ TEST_F(BatchWriteOpTest, MultiOpTwoShardsOrdered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
// Do multi-target, multi-doc batch write op
BatchedCommandRequest request([&] {
@@ -481,8 +476,8 @@ TEST_F(BatchWriteOpTest, MultiOpTwoShardsUnordered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
// Do multi-target, multi-doc batch write op
BatchedCommandRequest request([&] {
@@ -527,8 +522,8 @@ TEST_F(BatchWriteOpTest, MultiOpTwoShardsEachOrdered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
// Do multi-target, multi-doc batch write op
BatchedCommandRequest request([&] {
@@ -583,8 +578,8 @@ TEST_F(BatchWriteOpTest, MultiOpTwoShardsEachUnordered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
// Do multi-target, multi-doc batch write op
BatchedCommandRequest request([&] {
@@ -631,8 +626,8 @@ TEST_F(BatchWriteOpTest, MultiOpOneOrTwoShardsOrdered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
BatchedCommandRequest request([&] {
write_ops::Delete deleteOp(nss);
@@ -726,8 +721,8 @@ TEST_F(BatchWriteOpTest, MultiOpOneOrTwoShardsUnordered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
BatchedCommandRequest request([&] {
write_ops::Update updateOp(nss);
@@ -780,8 +775,8 @@ TEST_F(BatchWriteOpTest, MultiOpSingleShardErrorUnordered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
@@ -841,8 +836,8 @@ TEST_F(BatchWriteOpTest, MultiOpTwoShardErrorsUnordered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
@@ -899,8 +894,8 @@ TEST_F(BatchWriteOpTest, MultiOpPartialSingleShardErrorUnordered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
BatchedCommandRequest request([&] {
write_ops::Delete deleteOp(nss);
@@ -962,8 +957,8 @@ TEST_F(BatchWriteOpTest, MultiOpPartialSingleShardErrorOrdered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
BatchedCommandRequest request([&] {
write_ops::Delete deleteOp(nss);
@@ -1023,8 +1018,8 @@ TEST_F(BatchWriteOpTest, MultiOpPartialSingleShardErrorOrdered) {
TEST_F(BatchWriteOpTest, MultiOpErrorAndWriteConcernErrorUnordered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterFullRange(nss, endpoint, &targeter);
+
+ auto targeter = initTargeterFullRange(nss, endpoint);
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
@@ -1068,8 +1063,8 @@ TEST_F(BatchWriteOpTest, SingleOpErrorAndWriteConcernErrorOrdered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
BatchedCommandRequest request([&] {
write_ops::Update updateOp(nss);
@@ -1123,8 +1118,8 @@ TEST_F(BatchWriteOpTest, SingleOpErrorAndWriteConcernErrorOrdered) {
TEST_F(BatchWriteOpTest, MultiOpFailedTargetOrdered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterHalfRange(nss, endpoint, &targeter);
+
+ auto targeter = initTargeterHalfRange(nss, endpoint);
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
@@ -1178,8 +1173,8 @@ TEST_F(BatchWriteOpTest, MultiOpFailedTargetOrdered) {
TEST_F(BatchWriteOpTest, MultiOpFailedTargetUnordered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterHalfRange(nss, endpoint, &targeter);
+
+ auto targeter = initTargeterHalfRange(nss, endpoint);
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
@@ -1233,8 +1228,8 @@ TEST_F(BatchWriteOpTest, MultiOpFailedBatchOrdered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
@@ -1281,8 +1276,8 @@ TEST_F(BatchWriteOpTest, MultiOpFailedBatchUnordered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
@@ -1338,8 +1333,8 @@ TEST_F(BatchWriteOpTest, MultiOpAbortOrdered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
@@ -1383,8 +1378,8 @@ TEST_F(BatchWriteOpTest, MultiOpAbortUnordered) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
@@ -1424,8 +1419,8 @@ TEST_F(BatchWriteOpTest, MultiOpTwoWCErrors) {
NamespaceString nss("foo.bar");
ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED());
ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterSplitRange(nss, endpointA, endpointB, &targeter);
+
+ auto targeter = initTargeterSplitRange(nss, endpointA, endpointB);
BatchedCommandRequest request([&] {
write_ops::Insert insertOp(nss);
@@ -1473,8 +1468,8 @@ using BatchWriteOpLimitTests = WriteOpTestFixture;
TEST_F(BatchWriteOpLimitTests, OneBigDoc) {
NamespaceString nss("foo.bar");
ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterFullRange(nss, endpoint, &targeter);
+
+ auto targeter = initTargeterFullRange(nss, endpoint);
// Create a BSONObj (slightly) bigger than the maximum size by including a max-size string
const std::string bigString(BSONObjMaxUserSize, 'x');
@@ -1509,8 +1504,8 @@ TEST_F(BatchWriteOpLimitTests, OneBigDoc) {
TEST_F(BatchWriteOpLimitTests, OneBigOneSmall) {
NamespaceString nss("foo.bar");
ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED());
- MockNSTargeter targeter;
- initTargeterFullRange(nss, endpoint, &targeter);
+
+ auto targeter = initTargeterFullRange(nss, endpoint);
// Create a BSONObj (slightly) bigger than the maximum size by including a max-size string
const std::string bigString(BSONObjMaxUserSize, 'x');
diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h
index e2bb79300d5..1eee9dfc0ea 100644
--- a/src/mongo/s/write_ops/mock_ns_targeter.h
+++ b/src/mongo/s/write_ops/mock_ns_targeter.h
@@ -59,12 +59,10 @@ struct MockRange {
*/
class MockNSTargeter : public NSTargeter {
public:
- void init(const NamespaceString& nss, std::vector<MockRange> mockRanges) {
- ASSERT(nss.isValid());
- _nss = nss;
-
- ASSERT(!mockRanges.empty());
- _mockRanges = std::move(mockRanges);
+ MockNSTargeter(const NamespaceString& nss, std::vector<MockRange> mockRanges)
+ : _nss(nss), _mockRanges(std::move(mockRanges)) {
+ ASSERT(_nss.isValid());
+ ASSERT(!_mockRanges.empty());
}
const NamespaceString& getNS() const {
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp
index c1627b70375..d37a4afdd2e 100644
--- a/src/mongo/s/write_ops/write_op.cpp
+++ b/src/mongo/s/write_ops/write_op.cpp
@@ -95,19 +95,23 @@ Status WriteOp::targetWrites(OperationContext* opCtx,
auto& endpoints = swEndpoints.getValue();
for (auto&& endpoint : endpoints) {
- _childOps.emplace_back(this);
+ // if the operation was already successfull on that shard, there is no need to repeat the
+ // write
+ if (!_successfulShardSet.count(endpoint.shardName)) {
+ _childOps.emplace_back(this);
- WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1);
+ WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1);
- // For now, multiple endpoints imply no versioning - we can't retry half a multi-write
- if (endpoints.size() > 1u) {
- endpoint.shardVersion = ChunkVersion::IGNORED();
- }
+ // For now, multiple endpoints imply no versioning - we can't retry half a multi-write
+ if (endpoints.size() > 1u) {
+ endpoint.shardVersion = ChunkVersion::IGNORED();
+ }
- targetedWrites->push_back(new TargetedWrite(std::move(endpoint), ref));
+ targetedWrites->push_back(new TargetedWrite(std::move(endpoint), ref));
- _childOps.back().pendingWrite = targetedWrites->back();
- _childOps.back().state = WriteOpState_Pending;
+ _childOps.back().pendingWrite = targetedWrites->back();
+ _childOps.back().state = WriteOpState_Pending;
+ }
}
_state = WriteOpState_Pending;
@@ -175,8 +179,6 @@ void WriteOp::_updateOpState() {
}
if (!childErrors.empty() && isRetryError) {
- // Since we're using broadcast mode for multi-shard writes, which cannot SCE
- invariant(childErrors.size() == 1u);
_state = WriteOpState_Ready;
} else if (!childErrors.empty()) {
_error.reset(new WriteErrorDetail);
@@ -213,6 +215,7 @@ void WriteOp::noteWriteComplete(const TargetedWrite& targetedWrite) {
const WriteOpRef& ref = targetedWrite.writeOpRef;
auto& childOp = _childOps[ref.second];
+ _successfulShardSet.emplace(targetedWrite.endpoint.shardName);
childOp.pendingWrite = NULL;
childOp.endpoint.reset(new ShardEndpoint(targetedWrite.endpoint));
childOp.state = WriteOpState_Completed;
diff --git a/src/mongo/s/write_ops/write_op.h b/src/mongo/s/write_ops/write_op.h
index 69e320b98d3..85004557953 100644
--- a/src/mongo/s/write_ops/write_op.h
+++ b/src/mongo/s/write_ops/write_op.h
@@ -177,6 +177,9 @@ private:
// filled when state == _Error
std::unique_ptr<WriteErrorDetail> _error;
+
+ // stores the shards where this write operation succeeded
+ stdx::unordered_set<ShardId, ShardId::Hasher> _successfulShardSet;
};
/**
diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp
index 2ede012b967..a561c312de7 100644
--- a/src/mongo/s/write_ops/write_op_test.cpp
+++ b/src/mongo/s/write_ops/write_op_test.cpp
@@ -104,8 +104,7 @@ TEST(WriteOpTests, TargetSingle) {
WriteOp writeOp(BatchItemRef(&request, 0));
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready);
- MockNSTargeter targeter;
- targeter.init(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))});
+ MockNSTargeter targeter(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))});
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
@@ -139,11 +138,10 @@ TEST(WriteOpTests, TargetMultiOneShard) {
WriteOp writeOp(BatchItemRef(&request, 0));
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready);
- MockNSTargeter targeter;
- targeter.init(nss,
- {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)),
- MockRange(endpointB, BSON("x" << 0), BSON("x" << 10)),
- MockRange(endpointC, BSON("x" << 10), BSON("x" << MAXKEY))});
+ MockNSTargeter targeter(nss,
+ {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)),
+ MockRange(endpointB, BSON("x" << 0), BSON("x" << 10)),
+ MockRange(endpointC, BSON("x" << 10), BSON("x" << MAXKEY))});
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
@@ -178,11 +176,10 @@ TEST(WriteOpTests, TargetMultiAllShards) {
WriteOp writeOp(BatchItemRef(&request, 0));
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready);
- MockNSTargeter targeter;
- targeter.init(nss,
- {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)),
- MockRange(endpointB, BSON("x" << 0), BSON("x" << 10)),
- MockRange(endpointC, BSON("x" << 10), BSON("x" << MAXKEY))});
+ MockNSTargeter targeter(nss,
+ {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)),
+ MockRange(endpointB, BSON("x" << 0), BSON("x" << 10)),
+ MockRange(endpointC, BSON("x" << 10), BSON("x" << MAXKEY))});
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
@@ -206,6 +203,55 @@ TEST(WriteOpTests, TargetMultiAllShards) {
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Completed);
}
+TEST(WriteOpTests, TargetMultiAllShardsAndErrorSingleChildOp) {
+ OperationContextNoop opCtx;
+
+ NamespaceString nss("foo.bar");
+ ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion(10, 0, OID()));
+ ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion(20, 0, OID()));
+
+ BatchedCommandRequest request([&] {
+ write_ops::Delete deleteOp(nss);
+ deleteOp.setDeletes({buildDelete(BSON("x" << GTE << -1 << LT << 1), false)});
+ return deleteOp;
+ }());
+
+ // Do multi-target write op
+ WriteOp writeOp(BatchItemRef(&request, 0));
+ ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready);
+
+ MockNSTargeter targeter(nss,
+ {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)),
+ MockRange(endpointB, BSON("x" << 0), BSON("x" << MAXKEY))});
+
+ OwnedPointerVector<TargetedWrite> targetedOwned;
+ std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
+ Status status = writeOp.targetWrites(&opCtx, targeter, &targeted);
+
+ ASSERT(status.isOK());
+ ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(targeted.size(), 2u);
+ sortByEndpoint(&targeted);
+ ASSERT_EQUALS(targeted[0]->endpoint.shardName, endpointA.shardName);
+ ASSERT(ChunkVersion::isIgnoredVersion(targeted[0]->endpoint.shardVersion));
+ ASSERT_EQUALS(targeted[1]->endpoint.shardName, endpointB.shardName);
+ ASSERT(ChunkVersion::isIgnoredVersion(targeted[1]->endpoint.shardVersion));
+
+ // Simulate retryable error.
+ WriteErrorDetail retryableError;
+ retryableError.setIndex(0);
+ retryableError.setStatus({ErrorCodes::StaleShardVersion, "simulate ssv error for test"});
+ writeOp.noteWriteError(*targeted[0], retryableError);
+
+ // State should not change until we have result from all nodes.
+ ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending);
+
+ writeOp.noteWriteComplete(*targeted[1]);
+
+ // State resets back to ready because of retryable error.
+ ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready);
+}
+
// Single error after targeting test
TEST(WriteOpTests, ErrorSingle) {
OperationContextNoop opCtx;
@@ -224,8 +270,7 @@ TEST(WriteOpTests, ErrorSingle) {
WriteOp writeOp(BatchItemRef(&request, 0));
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready);
- MockNSTargeter targeter;
- targeter.init(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))});
+ MockNSTargeter targeter(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))});
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
@@ -265,8 +310,7 @@ TEST(WriteOpTests, CancelSingle) {
WriteOp writeOp(BatchItemRef(&request, 0));
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready);
- MockNSTargeter targeter;
- targeter.init(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))});
+ MockNSTargeter targeter(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))});
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();
@@ -304,8 +348,7 @@ TEST(WriteOpTests, RetrySingleOp) {
WriteOp writeOp(BatchItemRef(&request, 0));
ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready);
- MockNSTargeter targeter;
- targeter.init(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))});
+ MockNSTargeter targeter(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))});
OwnedPointerVector<TargetedWrite> targetedOwned;
std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector();