diff options
-rw-r--r-- | src/mongo/s/write_ops/batch_write_exec_test.cpp | 585 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.h | 5 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op_test.cpp | 135 | ||||
-rw-r--r-- | src/mongo/s/write_ops/mock_ns_targeter.h | 10 | ||||
-rw-r--r-- | src/mongo/s/write_ops/write_op.cpp | 25 | ||||
-rw-r--r-- | src/mongo/s/write_ops/write_op.h | 3 | ||||
-rw-r--r-- | src/mongo/s/write_ops/write_op_test.cpp | 79 |
8 files changed, 698 insertions, 149 deletions
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index 9e14e6c5964..b475f1178ef 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -30,6 +30,7 @@ #include "mongo/platform/basic.h" +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/client/remote_command_targeter_factory_mock.h" #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/commands.h" @@ -37,6 +38,7 @@ #include "mongo/s/catalog/type_shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/sharding_router_test_fixture.h" +#include "mongo/s/stale_exception.h" #include "mongo/s/write_ops/batch_write_exec.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" @@ -47,11 +49,24 @@ namespace mongo { namespace { -const HostAndPort kTestShardHost = HostAndPort("FakeHost", 12345); +using executor::RemoteCommandRequest; + const HostAndPort kTestConfigShardHost = HostAndPort("FakeConfigHost", 12345); -const std::string shardName = "FakeShard"; + +const HostAndPort kTestShardHost1 = HostAndPort("FakeHost1", 12345); +const std::string kShardName1 = "FakeShard1"; +const HostAndPort kTestShardHost2 = HostAndPort("FakeHost2", 12345); +const std::string kShardName2 = "FakeShard2"; + const int kMaxRoundsWithoutProgress = 5; +write_ops::UpdateOpEntry buildUpdate(const BSONObj& q, const BSONObj& u) { + write_ops::UpdateOpEntry entry; + entry.setQ(q); + entry.setU(u); + return entry; +} + /** * Mimics a single shard backend for a particular collection which can be initialized with a * set of write command results to return. @@ -65,29 +80,38 @@ public: ShardingTestFixture::setUp(); setRemote(HostAndPort("ClientHost", 12345)); - // Set up the RemoteCommandTargeter for the config shard. + // Set up the RemoteCommandTargeter for the config shard configTargeter()->setFindHostReturnValue(kTestConfigShardHost); + // Add a RemoteCommandTargeter for the data shard2 + targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHost1), [] { + std::unique_ptr<RemoteCommandTargeterMock> targeter( + std::make_unique<RemoteCommandTargeterMock>()); + targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHost1)); + targeter->setFindHostReturnValue(kTestShardHost1); + return targeter; + }()); + + targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHost2), [] { + std::unique_ptr<RemoteCommandTargeterMock> targeter( + std::make_unique<RemoteCommandTargeterMock>()); + targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHost2)); + targeter->setFindHostReturnValue(kTestShardHost2); + return targeter; + }()); - // Add a RemoteCommandTargeter for the data shard. - std::unique_ptr<RemoteCommandTargeterMock> targeter( - stdx::make_unique<RemoteCommandTargeterMock>()); - targeter->setConnectionStringReturnValue(ConnectionString(kTestShardHost)); - targeter->setFindHostReturnValue(kTestShardHost); - targeterFactory()->addTargeterToReturn(ConnectionString(kTestShardHost), - std::move(targeter)); - - // Set up the shard registry to contain the fake shard. - ShardType shardType; - shardType.setName(shardName); - shardType.setHost(kTestShardHost.toString()); - std::vector<ShardType> shards{shardType}; - setupShards(shards); - - // Set up the namespace targeter to target the fake shard. - nsTargeter.init(nss, - {MockRange(ShardEndpoint(shardName, ChunkVersion::IGNORED()), - BSON("x" << MINKEY), - BSON("x" << MAXKEY))}); + // Set up the shard registry to contain the fake shards + setupShards({[] { + ShardType shardType; + shardType.setName(kShardName1); + shardType.setHost(kTestShardHost1.toString()); + return shardType; + }(), + [] { + ShardType shardType; + shardType.setName(kShardName2); + shardType.setHost(kTestShardHost2.toString()); + return shardType; + }()}); } void expectInsertsReturnSuccess(const std::vector<BSONObj>& expected) { @@ -197,10 +221,13 @@ public: }); } - ConnectionString shardHost{kTestShardHost}; - NamespaceString nss{"foo.bar"}; + const NamespaceString nss{"foo.bar"}; - MockNSTargeter nsTargeter; + MockNSTargeter singleShardNSTargeter{ + nss, + {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, OID::gen())), + BSON("x" << MINKEY), + BSON("x" << MAXKEY))}}; }; // @@ -224,7 +251,8 @@ TEST_F(BatchWriteExecTest, SingleOp) { auto future = launchAsync([&] { BatchedCommandResponse response; BatchWriteExecStats stats; - BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); + BatchWriteExec::executeBatch( + operationContext(), singleShardNSTargeter, request, &response, &stats); ASSERT(response.getOk()); ASSERT_EQ(1LL, response.getN()); ASSERT_EQ(1, stats.numRounds); @@ -260,7 +288,8 @@ TEST_F(BatchWriteExecTest, MultiOpLarge) { auto future = launchAsync([&] { BatchedCommandResponse response; BatchWriteExecStats stats; - BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); + BatchWriteExec::executeBatch( + operationContext(), singleShardNSTargeter, request, &response, &stats); ASSERT(response.getOk()); ASSERT_EQUALS(response.getN(), kNumDocsToInsert); @@ -292,7 +321,8 @@ TEST_F(BatchWriteExecTest, SingleOpError) { auto future = launchAsync([&] { BatchedCommandResponse response; BatchWriteExecStats stats; - BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); + BatchWriteExec::executeBatch( + operationContext(), singleShardNSTargeter, request, &response, &stats); ASSERT(response.getOk()); ASSERT_EQ(0, response.getN()); ASSERT(response.isErrDetailsSet()); @@ -308,6 +338,473 @@ TEST_F(BatchWriteExecTest, SingleOpError) { future.timed_get(kFutureTimeout); } +void serializeErrorToBSON(const Status& status, BSONObjBuilder* builder) { + builder->append("code", status.code()); + builder->append("codeName", ErrorCodes::errorString(status.code())); + builder->append("errmsg", status.reason()); + + if (auto ei = status.extraInfo()) + ei->serialize(builder); +} + +TEST_F(BatchWriteExecTest, StaleShardVersionReturnedFromBatchWithSingleMultiWrite) { + BatchedCommandRequest request([&] { + write_ops::Update updateOp(nss); + updateOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(false); + return writeCommandBase; + }()); + updateOp.setUpdates({buildUpdate(BSON("_id" << 100), BSON("Key" << 100))}); + return updateOp; + }()); + request.setWriteConcern(BSONObj()); + + static const auto epoch = OID::gen(); + + class MultiShardTargeter : public MockNSTargeter { + public: + using MockNSTargeter::MockNSTargeter; + + StatusWith<std::vector<ShardEndpoint>> targetUpdate( + OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { + return std::vector<ShardEndpoint>{ + ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), + ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))}; + } + }; + + MultiShardTargeter multiShardNSTargeter( + nss, + {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), + BSON("x" << MINKEY), + BSON("x" << 0)), + MockRange(ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch)), + BSON("x" << 0), + BSON("x" << MAXKEY))}); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + BatchWriteExec::executeBatch( + operationContext(), multiShardNSTargeter, request, &response, &stats); + + return response; + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost1, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(1); + + return response.toBSON(); + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost2, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(0); + response.addToErrDetails([&] { + WriteErrorDetail* errDetail = new WriteErrorDetail(); + errDetail->setIndex(0); + errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"}); + errDetail->setErrInfo([&] { + Status ssvStatus(StaleConfigInfo(nss, + ChunkVersion(101, 200, epoch), + ChunkVersion(105, 200, epoch)), + "Stale shard version"); + BSONObjBuilder builder; + serializeErrorToBSON(ssvStatus, &builder); + return builder.obj(); + }()); + return errDetail; + }()); + + return response.toBSON(); + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost2, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(2); + + return response.toBSON(); + }); + + auto response = future.timed_get(kFutureTimeout); + + ASSERT_OK(response.getTopLevelStatus()); + ASSERT_EQ(3, response.getNModified()); +} + +TEST_F(BatchWriteExecTest, + RetryableErrorReturnedFromMultiWriteWithShard1AllOKShard2AllStaleShardVersion) { + BatchedCommandRequest request([&] { + write_ops::Update updateOp(nss); + updateOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(false); + return writeCommandBase; + }()); + updateOp.setUpdates({buildUpdate(BSON("id" << 150), BSON("x" << 1)), + buildUpdate(BSON("id" << 200), BSON("y" << 2))}); + return updateOp; + }()); + request.setWriteConcern(BSONObj()); + + static const auto epoch = OID::gen(); + + // This allows the batch to target each write operation + // to a specific shard (kShardName2), to perform this test + class MultiShardTargeter : public MockNSTargeter { + public: + using MockNSTargeter::MockNSTargeter; + + StatusWith<std::vector<ShardEndpoint>> targetUpdate( + OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { + return std::vector<ShardEndpoint>{ + ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), + ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))}; + } + }; + + MultiShardTargeter multiShardNSTargeter( + nss, + {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), + BSON("sk" << MINKEY), + BSON("sk" << 10)), + MockRange(ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch)), + BSON("sk" << 10), + BSON("sk" << MAXKEY))}); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + BatchWriteExec::executeBatch( + operationContext(), multiShardNSTargeter, request, &response, &stats); + + return response; + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost1, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(1); + + return response.toBSON(); + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost2, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(0); + response.addToErrDetails([&] { + WriteErrorDetail* errDetail = new WriteErrorDetail(); + errDetail->setIndex(0); + errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"}); + errDetail->setErrInfo([&] { + Status ssvStatus(StaleConfigInfo(nss, + ChunkVersion(101, 200, epoch), + ChunkVersion(105, 200, epoch)), + "Stale shard version"); + BSONObjBuilder builder; + serializeErrorToBSON(ssvStatus, &builder); + return builder.obj(); + }()); + return errDetail; + }()); + response.addToErrDetails([&] { + WriteErrorDetail* errDetail = new WriteErrorDetail(); + errDetail->setIndex(1); + errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"}); + errDetail->setErrInfo([&] { + Status ssvStatus(StaleConfigInfo(nss, + ChunkVersion(101, 200, epoch), + ChunkVersion(105, 200, epoch)), + "Stale shard version"); + BSONObjBuilder builder; + serializeErrorToBSON(ssvStatus, &builder); + return builder.obj(); + }()); + return errDetail; + }()); + + return response.toBSON(); + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost2, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(2); + + return response.toBSON(); + }); + + auto response = future.timed_get(kFutureTimeout); + ASSERT_OK(response.getTopLevelStatus()); + ASSERT_EQ(3, response.getNModified()); +} + +TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromMultiWriteWithShard1FirstOKShard2FirstErr) { + BatchedCommandRequest request([&] { + write_ops::Update updateOp(nss); + updateOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(false); + return writeCommandBase; + }()); + updateOp.setUpdates({buildUpdate(BSON("id" << 150), BSON("x" << 1)), + buildUpdate(BSON("id" << 200), BSON("y" << 2))}); + return updateOp; + }()); + request.setWriteConcern(BSONObj()); + + static const auto epoch = OID::gen(); + + // This allows the batch to target each write operation + // to a specific shard (kShardName2), to perform this test + class MultiShardTargeter : public MockNSTargeter { + public: + using MockNSTargeter::MockNSTargeter; + + StatusWith<std::vector<ShardEndpoint>> targetUpdate( + OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { + return std::vector<ShardEndpoint>{ + ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), + ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))}; + } + }; + + MultiShardTargeter multiShardNSTargeter( + nss, + {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), + BSON("sk" << MINKEY), + BSON("sk" << 10)), + MockRange(ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch)), + BSON("sk" << 10), + BSON("sk" << MAXKEY))}); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + BatchWriteExec::executeBatch( + operationContext(), multiShardNSTargeter, request, &response, &stats); + + return response; + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost1, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(0); + response.addToErrDetails([&] { + WriteErrorDetail* errDetail = new WriteErrorDetail(); + errDetail->setIndex(1); + errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"}); + errDetail->setErrInfo([&] { + Status ssvStatus(StaleConfigInfo(nss, + ChunkVersion(101, 200, epoch), + ChunkVersion(105, 200, epoch)), + "Stale shard version"); + BSONObjBuilder builder; + serializeErrorToBSON(ssvStatus, &builder); + return builder.obj(); + }()); + return errDetail; + }()); + + return response.toBSON(); + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost2, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(0); + response.addToErrDetails([&] { + WriteErrorDetail* errDetail = new WriteErrorDetail(); + errDetail->setIndex(0); + errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"}); + errDetail->setErrInfo([&] { + Status ssvStatus(StaleConfigInfo(nss, + ChunkVersion(101, 200, epoch), + ChunkVersion(105, 200, epoch)), + "Stale shard version"); + BSONObjBuilder builder; + serializeErrorToBSON(ssvStatus, &builder); + return builder.obj(); + }()); + return errDetail; + }()); + + return response.toBSON(); + }); + + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost1, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(1); + + return response.toBSON(); + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost2, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(1); + + return response.toBSON(); + }); + + auto response = future.timed_get(kFutureTimeout); + ASSERT_OK(response.getTopLevelStatus()); + ASSERT_EQ(2, response.getNModified()); +} + +TEST_F(BatchWriteExecTest, RetryableErrorReturnedFromMultiWriteWithShard1FirstOKShard2FirstOK) { + BatchedCommandRequest request([&] { + write_ops::Update updateOp(nss); + updateOp.setWriteCommandBase([] { + write_ops::WriteCommandBase writeCommandBase; + writeCommandBase.setOrdered(false); + return writeCommandBase; + }()); + updateOp.setUpdates({buildUpdate(BSON("id" << 150), BSON("x" << 1)), + buildUpdate(BSON("id" << 200), BSON("y" << 2))}); + return updateOp; + }()); + request.setWriteConcern(BSONObj()); + + static const auto epoch = OID::gen(); + + // This allows the batch to target each write operation + // to a specific shard (kShardName2), to perform this test + class MultiShardTargeter : public MockNSTargeter { + public: + using MockNSTargeter::MockNSTargeter; + + StatusWith<std::vector<ShardEndpoint>> targetUpdate( + OperationContext* opCtx, const write_ops::UpdateOpEntry& updateDoc) const override { + return std::vector<ShardEndpoint>{ + ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), + ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch))}; + } + }; + + MultiShardTargeter multiShardNSTargeter( + nss, + {MockRange(ShardEndpoint(kShardName1, ChunkVersion(100, 200, epoch)), + BSON("sk" << MINKEY), + BSON("sk" << 10)), + MockRange(ShardEndpoint(kShardName2, ChunkVersion(101, 200, epoch)), + BSON("sk" << 10), + BSON("sk" << MAXKEY))}); + + auto future = launchAsync([&] { + BatchedCommandResponse response; + BatchWriteExecStats stats; + BatchWriteExec::executeBatch( + operationContext(), multiShardNSTargeter, request, &response, &stats); + + return response; + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost1, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(0); + response.addToErrDetails([&] { + WriteErrorDetail* errDetail = new WriteErrorDetail(); + errDetail->setIndex(1); + errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"}); + errDetail->setErrInfo([&] { + Status ssvStatus(StaleConfigInfo(nss, + ChunkVersion(101, 200, epoch), + ChunkVersion(105, 200, epoch)), + "Stale shard version"); + BSONObjBuilder builder; + serializeErrorToBSON(ssvStatus, &builder); + return builder.obj(); + }()); + return errDetail; + }()); + + return response.toBSON(); + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost2, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(0); + response.addToErrDetails([&] { + WriteErrorDetail* errDetail = new WriteErrorDetail(); + errDetail->setIndex(1); + errDetail->setStatus({ErrorCodes::StaleShardVersion, "Stale shard version"}); + errDetail->setErrInfo([&] { + Status ssvStatus(StaleConfigInfo(nss, + ChunkVersion(101, 200, epoch), + ChunkVersion(105, 200, epoch)), + "Stale shard version"); + BSONObjBuilder builder; + serializeErrorToBSON(ssvStatus, &builder); + return builder.obj(); + }()); + return errDetail; + }()); + + return response.toBSON(); + }); + + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost1, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(1); + + return response.toBSON(); + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(kTestShardHost2, request.target); + + BatchedCommandResponse response; + response.setStatus(Status::OK()); + response.setNModified(1); + + return response.toBSON(); + }); + + auto response = future.timed_get(kFutureTimeout); + ASSERT_OK(response.getTopLevelStatus()); + ASSERT_EQ(2, response.getNModified()); +} + // // Test retryable errors // @@ -329,7 +826,8 @@ TEST_F(BatchWriteExecTest, StaleOp) { auto future = launchAsync([&] { BatchedCommandResponse response; BatchWriteExecStats stats; - BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); + BatchWriteExec::executeBatch( + operationContext(), singleShardNSTargeter, request, &response, &stats); ASSERT(response.getOk()); ASSERT_EQUALS(1, stats.numStaleBatches); @@ -359,7 +857,8 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) { auto future = launchAsync([&] { BatchedCommandResponse response; BatchWriteExecStats stats; - BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); + BatchWriteExec::executeBatch( + operationContext(), singleShardNSTargeter, request, &response, &stats); ASSERT(response.getOk()); ASSERT_EQUALS(3, stats.numStaleBatches); @@ -377,10 +876,11 @@ TEST_F(BatchWriteExecTest, MultiStaleOp) { future.timed_get(kFutureTimeout); } -TEST_F(BatchWriteExecTest, TooManyStaleOp) { - // Retry op in exec too many times (without refresh) b/c of stale config (the mock nsTargeter - // doesn't report progress on refresh). We should report a no progress error for everything in - // the batch. +TEST_F(BatchWriteExecTest, TooManyStaleShardOp) { + // Retry op in exec too many times (without refresh) b/c of stale config (the mock + // singleShardNSTargeter doesn't report progress on refresh). We should report a no progress + // error for everything in the batch. + BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); insertOp.setWriteCommandBase([] { @@ -396,7 +896,8 @@ TEST_F(BatchWriteExecTest, TooManyStaleOp) { auto future = launchAsync([&] { BatchedCommandResponse response; BatchWriteExecStats stats; - BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); + BatchWriteExec::executeBatch( + operationContext(), singleShardNSTargeter, request, &response, &stats); ASSERT(response.getOk()); ASSERT_EQ(0, response.getN()); ASSERT(response.isErrDetailsSet()); @@ -444,7 +945,8 @@ TEST_F(BatchWriteExecTest, RetryableWritesLargeBatch) { auto future = launchAsync([&] { BatchedCommandResponse response; BatchWriteExecStats stats; - BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); + BatchWriteExec::executeBatch( + operationContext(), singleShardNSTargeter, request, &response, &stats); ASSERT(response.getOk()); ASSERT_EQUALS(response.getN(), kNumDocsToInsert); @@ -478,7 +980,8 @@ TEST_F(BatchWriteExecTest, RetryableErrorNoTxnNumber) { auto future = launchAsync([&] { BatchedCommandResponse response; BatchWriteExecStats stats; - BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); + BatchWriteExec::executeBatch( + operationContext(), singleShardNSTargeter, request, &response, &stats); ASSERT(response.getOk()); ASSERT_EQ(0, response.getN()); @@ -519,7 +1022,8 @@ TEST_F(BatchWriteExecTest, RetryableErrorTxnNumber) { auto future = launchAsync([&] { BatchedCommandResponse response; BatchWriteExecStats stats; - BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); + BatchWriteExec::executeBatch( + operationContext(), singleShardNSTargeter, request, &response, &stats); ASSERT(response.getOk()); ASSERT(!response.isErrDetailsSet()); @@ -556,7 +1060,8 @@ TEST_F(BatchWriteExecTest, NonRetryableErrorTxnNumber) { auto future = launchAsync([&] { BatchedCommandResponse response; BatchWriteExecStats stats; - BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats); + BatchWriteExec::executeBatch( + operationContext(), singleShardNSTargeter, request, &response, &stats); ASSERT(response.getOk()); ASSERT_EQ(0, response.getN()); diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 229ce247335..14e3fce88c1 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -839,4 +839,9 @@ const std::vector<ShardError>& TrackedErrors::getErrors(int errCode) const { return _errorMap.find(errCode)->second; } +void TargetedWriteBatch::addWrite(TargetedWrite* targetedWrite, int estWriteSize) { + _writes.mutableVector().push_back(targetedWrite); + _estimatedSizeBytes += estWriteSize; +} + } // namespace mongo diff --git a/src/mongo/s/write_ops/batch_write_op.h b/src/mongo/s/write_ops/batch_write_op.h index 73f30018edb..5ea5f1261c5 100644 --- a/src/mongo/s/write_ops/batch_write_op.h +++ b/src/mongo/s/write_ops/batch_write_op.h @@ -262,10 +262,7 @@ public: /** * TargetedWrite is owned here once given to the TargetedWriteBatch. */ - void addWrite(TargetedWrite* targetedWrite, int estWriteSize) { - _writes.mutableVector().push_back(targetedWrite); - _estimatedSizeBytes += estWriteSize; - } + void addWrite(TargetedWrite* targetedWrite, int estWriteSize); private: // Where to send the batch diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp index 04700a2aa60..2a125d62aa8 100644 --- a/src/mongo/s/write_ops/batch_write_op_test.cpp +++ b/src/mongo/s/write_ops/batch_write_op_test.cpp @@ -41,26 +41,21 @@ namespace mongo { namespace { -void initTargeterFullRange(const NamespaceString& nss, - const ShardEndpoint& endpoint, - MockNSTargeter* targeter) { - targeter->init(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))}); +auto initTargeterFullRange(const NamespaceString& nss, const ShardEndpoint& endpoint) { + return MockNSTargeter(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))}); } -void initTargeterSplitRange(const NamespaceString& nss, +auto initTargeterSplitRange(const NamespaceString& nss, const ShardEndpoint& endpointA, - const ShardEndpoint& endpointB, - MockNSTargeter* targeter) { - targeter->init(nss, - {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)), - MockRange(endpointB, BSON("x" << 0), BSON("x" << MAXKEY))}); + const ShardEndpoint& endpointB) { + return MockNSTargeter(nss, + {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)), + MockRange(endpointB, BSON("x" << 0), BSON("x" << MAXKEY))}); } -void initTargeterHalfRange(const NamespaceString& nss, - const ShardEndpoint& endpoint, - MockNSTargeter* targeter) { +auto initTargeterHalfRange(const NamespaceString& nss, const ShardEndpoint& endpoint) { // x >= 0 values are untargetable - targeter->init(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << 0))}); + return MockNSTargeter(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << 0))}); } write_ops::DeleteOpEntry buildDelete(const BSONObj& query, bool multi) { @@ -130,8 +125,8 @@ using BatchWriteOpTest = WriteOpTestFixture; TEST_F(BatchWriteOpTest, SingleOp) { NamespaceString nss("foo.bar"); ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterFullRange(nss, endpoint, &targeter); + + auto targeter = initTargeterFullRange(nss, endpoint); // Do single-target, single doc batch write op BatchedCommandRequest request([&] { @@ -163,8 +158,8 @@ TEST_F(BatchWriteOpTest, SingleOp) { TEST_F(BatchWriteOpTest, SingleError) { NamespaceString nss("foo.bar"); ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterFullRange(nss, endpoint, &targeter); + + auto targeter = initTargeterFullRange(nss, endpoint); // Do single-target, single doc batch write op BatchedCommandRequest request([&] { @@ -202,8 +197,8 @@ TEST_F(BatchWriteOpTest, SingleError) { TEST_F(BatchWriteOpTest, SingleTargetError) { NamespaceString nss("foo.bar"); ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterHalfRange(nss, endpoint, &targeter); + + auto targeter = initTargeterHalfRange(nss, endpoint); // Do untargetable delete op BatchedCommandRequest request([&] { @@ -237,8 +232,8 @@ TEST_F(BatchWriteOpTest, SingleTargetError) { TEST_F(BatchWriteOpTest, SingleWriteConcernErrorOrdered) { NamespaceString nss("foo.bar"); ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterFullRange(nss, endpoint, &targeter); + + auto targeter = initTargeterFullRange(nss, endpoint); BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); @@ -279,8 +274,8 @@ TEST_F(BatchWriteOpTest, SingleWriteConcernErrorOrdered) { TEST_F(BatchWriteOpTest, SingleStaleError) { NamespaceString nss("foo.bar"); ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterFullRange(nss, endpoint, &targeter); + + auto targeter = initTargeterFullRange(nss, endpoint); BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); @@ -333,8 +328,8 @@ TEST_F(BatchWriteOpTest, SingleStaleError) { TEST_F(BatchWriteOpTest, MultiOpSameShardOrdered) { NamespaceString nss("foo.bar"); ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterFullRange(nss, endpoint, &targeter); + + auto targeter = initTargeterFullRange(nss, endpoint); // Do single-target, multi-doc batch write op BatchedCommandRequest request([&] { @@ -370,8 +365,8 @@ TEST_F(BatchWriteOpTest, MultiOpSameShardOrdered) { TEST_F(BatchWriteOpTest, MultiOpSameShardUnordered) { NamespaceString nss("foo.bar"); ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterFullRange(nss, endpoint, &targeter); + + auto targeter = initTargeterFullRange(nss, endpoint); // Do single-target, multi-doc batch write op BatchedCommandRequest request([&] { @@ -414,8 +409,8 @@ TEST_F(BatchWriteOpTest, MultiOpTwoShardsOrdered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); // Do multi-target, multi-doc batch write op BatchedCommandRequest request([&] { @@ -481,8 +476,8 @@ TEST_F(BatchWriteOpTest, MultiOpTwoShardsUnordered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); // Do multi-target, multi-doc batch write op BatchedCommandRequest request([&] { @@ -527,8 +522,8 @@ TEST_F(BatchWriteOpTest, MultiOpTwoShardsEachOrdered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); // Do multi-target, multi-doc batch write op BatchedCommandRequest request([&] { @@ -583,8 +578,8 @@ TEST_F(BatchWriteOpTest, MultiOpTwoShardsEachUnordered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); // Do multi-target, multi-doc batch write op BatchedCommandRequest request([&] { @@ -631,8 +626,8 @@ TEST_F(BatchWriteOpTest, MultiOpOneOrTwoShardsOrdered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); BatchedCommandRequest request([&] { write_ops::Delete deleteOp(nss); @@ -726,8 +721,8 @@ TEST_F(BatchWriteOpTest, MultiOpOneOrTwoShardsUnordered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); BatchedCommandRequest request([&] { write_ops::Update updateOp(nss); @@ -780,8 +775,8 @@ TEST_F(BatchWriteOpTest, MultiOpSingleShardErrorUnordered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); @@ -841,8 +836,8 @@ TEST_F(BatchWriteOpTest, MultiOpTwoShardErrorsUnordered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); @@ -899,8 +894,8 @@ TEST_F(BatchWriteOpTest, MultiOpPartialSingleShardErrorUnordered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); BatchedCommandRequest request([&] { write_ops::Delete deleteOp(nss); @@ -962,8 +957,8 @@ TEST_F(BatchWriteOpTest, MultiOpPartialSingleShardErrorOrdered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); BatchedCommandRequest request([&] { write_ops::Delete deleteOp(nss); @@ -1023,8 +1018,8 @@ TEST_F(BatchWriteOpTest, MultiOpPartialSingleShardErrorOrdered) { TEST_F(BatchWriteOpTest, MultiOpErrorAndWriteConcernErrorUnordered) { NamespaceString nss("foo.bar"); ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterFullRange(nss, endpoint, &targeter); + + auto targeter = initTargeterFullRange(nss, endpoint); BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); @@ -1068,8 +1063,8 @@ TEST_F(BatchWriteOpTest, SingleOpErrorAndWriteConcernErrorOrdered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); BatchedCommandRequest request([&] { write_ops::Update updateOp(nss); @@ -1123,8 +1118,8 @@ TEST_F(BatchWriteOpTest, SingleOpErrorAndWriteConcernErrorOrdered) { TEST_F(BatchWriteOpTest, MultiOpFailedTargetOrdered) { NamespaceString nss("foo.bar"); ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterHalfRange(nss, endpoint, &targeter); + + auto targeter = initTargeterHalfRange(nss, endpoint); BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); @@ -1178,8 +1173,8 @@ TEST_F(BatchWriteOpTest, MultiOpFailedTargetOrdered) { TEST_F(BatchWriteOpTest, MultiOpFailedTargetUnordered) { NamespaceString nss("foo.bar"); ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterHalfRange(nss, endpoint, &targeter); + + auto targeter = initTargeterHalfRange(nss, endpoint); BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); @@ -1233,8 +1228,8 @@ TEST_F(BatchWriteOpTest, MultiOpFailedBatchOrdered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); @@ -1281,8 +1276,8 @@ TEST_F(BatchWriteOpTest, MultiOpFailedBatchUnordered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); @@ -1338,8 +1333,8 @@ TEST_F(BatchWriteOpTest, MultiOpAbortOrdered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); @@ -1383,8 +1378,8 @@ TEST_F(BatchWriteOpTest, MultiOpAbortUnordered) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); @@ -1424,8 +1419,8 @@ TEST_F(BatchWriteOpTest, MultiOpTwoWCErrors) { NamespaceString nss("foo.bar"); ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion::IGNORED()); ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterSplitRange(nss, endpointA, endpointB, &targeter); + + auto targeter = initTargeterSplitRange(nss, endpointA, endpointB); BatchedCommandRequest request([&] { write_ops::Insert insertOp(nss); @@ -1473,8 +1468,8 @@ using BatchWriteOpLimitTests = WriteOpTestFixture; TEST_F(BatchWriteOpLimitTests, OneBigDoc) { NamespaceString nss("foo.bar"); ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterFullRange(nss, endpoint, &targeter); + + auto targeter = initTargeterFullRange(nss, endpoint); // Create a BSONObj (slightly) bigger than the maximum size by including a max-size string const std::string bigString(BSONObjMaxUserSize, 'x'); @@ -1509,8 +1504,8 @@ TEST_F(BatchWriteOpLimitTests, OneBigDoc) { TEST_F(BatchWriteOpLimitTests, OneBigOneSmall) { NamespaceString nss("foo.bar"); ShardEndpoint endpoint(ShardId("shard"), ChunkVersion::IGNORED()); - MockNSTargeter targeter; - initTargeterFullRange(nss, endpoint, &targeter); + + auto targeter = initTargeterFullRange(nss, endpoint); // Create a BSONObj (slightly) bigger than the maximum size by including a max-size string const std::string bigString(BSONObjMaxUserSize, 'x'); diff --git a/src/mongo/s/write_ops/mock_ns_targeter.h b/src/mongo/s/write_ops/mock_ns_targeter.h index e2bb79300d5..1eee9dfc0ea 100644 --- a/src/mongo/s/write_ops/mock_ns_targeter.h +++ b/src/mongo/s/write_ops/mock_ns_targeter.h @@ -59,12 +59,10 @@ struct MockRange { */ class MockNSTargeter : public NSTargeter { public: - void init(const NamespaceString& nss, std::vector<MockRange> mockRanges) { - ASSERT(nss.isValid()); - _nss = nss; - - ASSERT(!mockRanges.empty()); - _mockRanges = std::move(mockRanges); + MockNSTargeter(const NamespaceString& nss, std::vector<MockRange> mockRanges) + : _nss(nss), _mockRanges(std::move(mockRanges)) { + ASSERT(_nss.isValid()); + ASSERT(!_mockRanges.empty()); } const NamespaceString& getNS() const { diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index c1627b70375..d37a4afdd2e 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -95,19 +95,23 @@ Status WriteOp::targetWrites(OperationContext* opCtx, auto& endpoints = swEndpoints.getValue(); for (auto&& endpoint : endpoints) { - _childOps.emplace_back(this); + // if the operation was already successfull on that shard, there is no need to repeat the + // write + if (!_successfulShardSet.count(endpoint.shardName)) { + _childOps.emplace_back(this); - WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1); + WriteOpRef ref(_itemRef.getItemIndex(), _childOps.size() - 1); - // For now, multiple endpoints imply no versioning - we can't retry half a multi-write - if (endpoints.size() > 1u) { - endpoint.shardVersion = ChunkVersion::IGNORED(); - } + // For now, multiple endpoints imply no versioning - we can't retry half a multi-write + if (endpoints.size() > 1u) { + endpoint.shardVersion = ChunkVersion::IGNORED(); + } - targetedWrites->push_back(new TargetedWrite(std::move(endpoint), ref)); + targetedWrites->push_back(new TargetedWrite(std::move(endpoint), ref)); - _childOps.back().pendingWrite = targetedWrites->back(); - _childOps.back().state = WriteOpState_Pending; + _childOps.back().pendingWrite = targetedWrites->back(); + _childOps.back().state = WriteOpState_Pending; + } } _state = WriteOpState_Pending; @@ -175,8 +179,6 @@ void WriteOp::_updateOpState() { } if (!childErrors.empty() && isRetryError) { - // Since we're using broadcast mode for multi-shard writes, which cannot SCE - invariant(childErrors.size() == 1u); _state = WriteOpState_Ready; } else if (!childErrors.empty()) { _error.reset(new WriteErrorDetail); @@ -213,6 +215,7 @@ void WriteOp::noteWriteComplete(const TargetedWrite& targetedWrite) { const WriteOpRef& ref = targetedWrite.writeOpRef; auto& childOp = _childOps[ref.second]; + _successfulShardSet.emplace(targetedWrite.endpoint.shardName); childOp.pendingWrite = NULL; childOp.endpoint.reset(new ShardEndpoint(targetedWrite.endpoint)); childOp.state = WriteOpState_Completed; diff --git a/src/mongo/s/write_ops/write_op.h b/src/mongo/s/write_ops/write_op.h index 69e320b98d3..85004557953 100644 --- a/src/mongo/s/write_ops/write_op.h +++ b/src/mongo/s/write_ops/write_op.h @@ -177,6 +177,9 @@ private: // filled when state == _Error std::unique_ptr<WriteErrorDetail> _error; + + // stores the shards where this write operation succeeded + stdx::unordered_set<ShardId, ShardId::Hasher> _successfulShardSet; }; /** diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp index 2ede012b967..a561c312de7 100644 --- a/src/mongo/s/write_ops/write_op_test.cpp +++ b/src/mongo/s/write_ops/write_op_test.cpp @@ -104,8 +104,7 @@ TEST(WriteOpTests, TargetSingle) { WriteOp writeOp(BatchItemRef(&request, 0)); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready); - MockNSTargeter targeter; - targeter.init(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))}); + MockNSTargeter targeter(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))}); OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); @@ -139,11 +138,10 @@ TEST(WriteOpTests, TargetMultiOneShard) { WriteOp writeOp(BatchItemRef(&request, 0)); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready); - MockNSTargeter targeter; - targeter.init(nss, - {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)), - MockRange(endpointB, BSON("x" << 0), BSON("x" << 10)), - MockRange(endpointC, BSON("x" << 10), BSON("x" << MAXKEY))}); + MockNSTargeter targeter(nss, + {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)), + MockRange(endpointB, BSON("x" << 0), BSON("x" << 10)), + MockRange(endpointC, BSON("x" << 10), BSON("x" << MAXKEY))}); OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); @@ -178,11 +176,10 @@ TEST(WriteOpTests, TargetMultiAllShards) { WriteOp writeOp(BatchItemRef(&request, 0)); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready); - MockNSTargeter targeter; - targeter.init(nss, - {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)), - MockRange(endpointB, BSON("x" << 0), BSON("x" << 10)), - MockRange(endpointC, BSON("x" << 10), BSON("x" << MAXKEY))}); + MockNSTargeter targeter(nss, + {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)), + MockRange(endpointB, BSON("x" << 0), BSON("x" << 10)), + MockRange(endpointC, BSON("x" << 10), BSON("x" << MAXKEY))}); OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); @@ -206,6 +203,55 @@ TEST(WriteOpTests, TargetMultiAllShards) { ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Completed); } +TEST(WriteOpTests, TargetMultiAllShardsAndErrorSingleChildOp) { + OperationContextNoop opCtx; + + NamespaceString nss("foo.bar"); + ShardEndpoint endpointA(ShardId("shardA"), ChunkVersion(10, 0, OID())); + ShardEndpoint endpointB(ShardId("shardB"), ChunkVersion(20, 0, OID())); + + BatchedCommandRequest request([&] { + write_ops::Delete deleteOp(nss); + deleteOp.setDeletes({buildDelete(BSON("x" << GTE << -1 << LT << 1), false)}); + return deleteOp; + }()); + + // Do multi-target write op + WriteOp writeOp(BatchItemRef(&request, 0)); + ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready); + + MockNSTargeter targeter(nss, + {MockRange(endpointA, BSON("x" << MINKEY), BSON("x" << 0)), + MockRange(endpointB, BSON("x" << 0), BSON("x" << MAXKEY))}); + + OwnedPointerVector<TargetedWrite> targetedOwned; + std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); + Status status = writeOp.targetWrites(&opCtx, targeter, &targeted); + + ASSERT(status.isOK()); + ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(targeted.size(), 2u); + sortByEndpoint(&targeted); + ASSERT_EQUALS(targeted[0]->endpoint.shardName, endpointA.shardName); + ASSERT(ChunkVersion::isIgnoredVersion(targeted[0]->endpoint.shardVersion)); + ASSERT_EQUALS(targeted[1]->endpoint.shardName, endpointB.shardName); + ASSERT(ChunkVersion::isIgnoredVersion(targeted[1]->endpoint.shardVersion)); + + // Simulate retryable error. + WriteErrorDetail retryableError; + retryableError.setIndex(0); + retryableError.setStatus({ErrorCodes::StaleShardVersion, "simulate ssv error for test"}); + writeOp.noteWriteError(*targeted[0], retryableError); + + // State should not change until we have result from all nodes. + ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Pending); + + writeOp.noteWriteComplete(*targeted[1]); + + // State resets back to ready because of retryable error. + ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready); +} + // Single error after targeting test TEST(WriteOpTests, ErrorSingle) { OperationContextNoop opCtx; @@ -224,8 +270,7 @@ TEST(WriteOpTests, ErrorSingle) { WriteOp writeOp(BatchItemRef(&request, 0)); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready); - MockNSTargeter targeter; - targeter.init(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))}); + MockNSTargeter targeter(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))}); OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); @@ -265,8 +310,7 @@ TEST(WriteOpTests, CancelSingle) { WriteOp writeOp(BatchItemRef(&request, 0)); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready); - MockNSTargeter targeter; - targeter.init(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))}); + MockNSTargeter targeter(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))}); OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); @@ -304,8 +348,7 @@ TEST(WriteOpTests, RetrySingleOp) { WriteOp writeOp(BatchItemRef(&request, 0)); ASSERT_EQUALS(writeOp.getWriteState(), WriteOpState_Ready); - MockNSTargeter targeter; - targeter.init(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))}); + MockNSTargeter targeter(nss, {MockRange(endpoint, BSON("x" << MINKEY), BSON("x" << MAXKEY))}); OwnedPointerVector<TargetedWrite> targetedOwned; std::vector<TargetedWrite*>& targeted = targetedOwned.mutableVector(); |