diff options
author | Kaitlin Mahar <kaitlin.mahar@mongodb.com> | 2023-05-05 21:41:38 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-05 22:46:09 +0000 |
commit | 74c325e8f49f272bd06343fdb745b9440a59d442 (patch) | |
tree | 4c988e90f71035c20adb4f7b4e3aec4e663cb272 /src | |
parent | 0417f5dae1f8d20caec7646982a49124c2b44c5f (diff) | |
download | mongo-74c325e8f49f272bd06343fdb745b9440a59d442.tar.gz |
SERVER-74096 Add mongos subbatching tests for multi-target ops in bulkWrite command
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/s/write_ops/bulk_write_exec_test.cpp | 268 |
1 files changed, 262 insertions, 6 deletions
diff --git a/src/mongo/s/write_ops/bulk_write_exec_test.cpp b/src/mongo/s/write_ops/bulk_write_exec_test.cpp index c63579a0db4..4fa38aabe40 100644 --- a/src/mongo/s/write_ops/bulk_write_exec_test.cpp +++ b/src/mongo/s/write_ops/bulk_write_exec_test.cpp @@ -433,12 +433,268 @@ TEST_F(BulkWriteOpTest, TargetMultiOpsOrdered_DifferentShard) { ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Pending); } -// TODO(SERVER-74096): Test sub-batching logic with multi-target writes. -// 1. Test targeting ordered ops where a multi-target sub-batch must only contain writes for a -// single write op. -// 2. Test targeting unordered ops of the same namespace that target the same shard under with two -// different endpoints/shardVersions. This happens when a bulkWrite includes a multi-target write -// and a single-target write. +// Test targeting ordered ops where a multi-target sub-batch must only contain writes for a +// single write op. +TEST_F(BulkWriteOpTest, TargetMultiTargetOpsOrdered) { + ShardId shardIdA("shardA"); + ShardId shardIdB("shardB"); + NamespaceString nss0("foo.bar"); + ShardEndpoint endpointA( + shardIdA, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none); + ShardEndpoint endpointB( + shardIdB, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none); + + std::vector<std::unique_ptr<NSTargeter>> targeters; + targeters.push_back(initTargeterSplitRange(nss0, endpointA, endpointB)); + + // Ordered update and delete ops. We place multi-target ops in between single-target ops to the + // same shards, to ensure we correctly separate the multi-target ops into their own batches. + // Expected targets: + // ops[0] -> shardA + // ops[1] -> shardA and shardB + // ops[2] -> shardB + // ops[3] -> shardB + // ops[4] -> shardA and shardB + // ops[5] -> shardA + BulkWriteCommandRequest request( + { + BulkWriteUpdateOp(0, BSON("x" << -1), BSON("$set" << BSON("z" << 3))), + BulkWriteUpdateOp( + 0, BSON("x" << BSON("$gte" << -5 << "$lt" << 5)), BSON("$set" << BSON("y" << 2))), + BulkWriteUpdateOp(0, BSON("x" << 1), BSON("$set" << BSON("z" << 3))), + BulkWriteDeleteOp(0, BSON("x" << 1)), + BulkWriteDeleteOp(0, BSON("x" << BSON("$gte" << -5 << "$lt" << 5))), + BulkWriteDeleteOp(0, BSON("x" << -1)), + }, + {NamespaceInfoEntry(nss0)}); + + BulkWriteOp bulkWriteOp(_opCtx, request); + + // The resulting batches should be: + // {shardA: [ops[0]} + // {shardA: [ops[1]]}, {shardB: [ops[1]]} + // {shardB: [ops[2], ops[3]]} + // {shardA: [ops[4]]}, {shardB: [ops[4]]} + // {shardA: [ops[5]]} + + TargetedBatchMap targeted; + + // {shardA: [ops[0]} + ASSERT_OK(bulkWriteOp.target(targeters, false, targeted)); + ASSERT_EQUALS(targeted.size(), 1u); + ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1u); + ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 0); + assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointA); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(5).getWriteState(), WriteOpState_Ready); + + targeted.clear(); + + // {shardA: [ops[1]]}, {shardB: [ops[1]]} + ASSERT_OK(bulkWriteOp.target(targeters, false, targeted)); + ASSERT_EQUALS(targeted.size(), 2u); + ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1u); + ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 1); + assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointA); + ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 1u); + ASSERT_EQUALS(targeted[shardIdB]->getWrites()[0]->writeOpRef.first, 1); + assertEndpointsEqual(targeted[shardIdB]->getWrites()[0]->endpoint, endpointB); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(5).getWriteState(), WriteOpState_Ready); + + targeted.clear(); + + // {shardB: [ops[2], ops[3]]} + ASSERT_OK(bulkWriteOp.target(targeters, false, targeted)); + ASSERT_EQUALS(targeted.size(), 1u); + ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 2u); + assertEndpointsEqual(targeted[shardIdB]->getWrites()[0]->endpoint, endpointB); + ASSERT_EQUALS(targeted[shardIdB]->getWrites()[0]->writeOpRef.first, 2); + assertEndpointsEqual(targeted[shardIdB]->getWrites()[1]->endpoint, endpointB); + ASSERT_EQUALS(targeted[shardIdB]->getWrites()[1]->writeOpRef.first, 3); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(5).getWriteState(), WriteOpState_Ready); + + targeted.clear(); + + // {shardA: [ops[4]]}, {shardB: [ops[4]]} + ASSERT_OK(bulkWriteOp.target(targeters, false, targeted)); + ASSERT_EQUALS(targeted.size(), 2u); + ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1u); + ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 4); + assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointA); + ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 1u); + ASSERT_EQUALS(targeted[shardIdB]->getWrites()[0]->writeOpRef.first, 4); + assertEndpointsEqual(targeted[shardIdB]->getWrites()[0]->endpoint, endpointB); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(5).getWriteState(), WriteOpState_Ready); + + + targeted.clear(); + + // {shardA: [ops[5]]} + ASSERT_OK(bulkWriteOp.target(targeters, false, targeted)); + ASSERT_EQUALS(targeted.size(), 1u); + ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1u); + ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 5); + assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointA); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(5).getWriteState(), WriteOpState_Pending); +} + +// Test targeting unordered ops of the same namespace that target the same shard/endpoint under two +// different shardVersions. +// As discussed in SERVER-34347, because of the way that (non-transactional) multi-target writes +// disregard the shardVersion and use ChunkVersion::IGNORED, we cannot have together in a single +// sub-batch an op for a multi-target write *and* an op for a single-target write that target +// the same endpoint, because the single target write will use the actual shardVersion. +TEST_F(BulkWriteOpTest, TargetMultiOpsUnordered_OneShard_TwoEndpoints) { + ShardId shardIdA("shardA"); + ShardId shardIdB("shardB"); + NamespaceString nss0("foo.bar"); + + // The endpoints we'll use for our targeter. + ShardEndpoint endpointA( + shardIdA, + ShardVersionFactory::make(ChunkVersion({OID::gen(), Timestamp(2)}, {10, 11}), + boost::optional<CollectionIndexes>(boost::none)), + boost::none); + ShardEndpoint endpointB( + shardIdB, + ShardVersionFactory::make(ChunkVersion({OID::gen(), Timestamp(3)}, {11, 12}), + boost::optional<CollectionIndexes>(boost::none)), + boost::none); + + std::vector<std::unique_ptr<NSTargeter>> targeters; + targeters.push_back(initTargeterSplitRange(nss0, endpointA, endpointB)); + + + // Used for assertions below; equivalent to the endpoints that multi-target ops will use (same + // as those above but no shard version.) + ShardEndpoint endpointANoVersion( + shardIdA, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none); + ShardEndpoint endpointBNoVersion( + shardIdB, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none); + + // We expect the ops to target the following endpoints with/without shardVersion as indicated: + // ops[0] -> A, shardVersion included + // ops[1] -> A shardVersion ignored, B shardVersion ignored + // ops[2] -> B, shardVersion included + // ops[3] -> A shardVersion ignored, B shardVersion ignored + // ops[4] -> A shardVersion included + + // Due to the interleaving of ops, each op should end up split into its own sub-batch, since no + // two consecutive ops target the same endpoint with the same shardVersion. + BulkWriteCommandRequest request( + { + BulkWriteUpdateOp(0, BSON("x" << -1), BSON("$set" << BSON("z" << 3))), + BulkWriteUpdateOp( + 0, BSON("x" << BSON("$gte" << -5 << "$lt" << 5)), BSON("$set" << BSON("y" << 2))), + BulkWriteDeleteOp(0, BSON("x" << 1)), + BulkWriteDeleteOp(0, BSON("x" << BSON("$gte" << -5 << "$lt" << 5))), + BulkWriteInsertOp(0, BSON("x" << -2)), + }, + {NamespaceInfoEntry(nss0)}); + request.setOrdered(false); + + BulkWriteOp bulkWriteOp(_opCtx, request); + + TargetedBatchMap targeted; + + // batch with ops[0] + ASSERT_OK(bulkWriteOp.target(targeters, false, targeted)); + ASSERT_EQUALS(targeted.size(), 1u); + ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1); + ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 0); + assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointA); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready); + + targeted.clear(); + + // batch with ops[1] + ASSERT_OK(bulkWriteOp.target(targeters, false, targeted)); + ASSERT_EQUALS(targeted.size(), 2u); + ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1); + ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 1); + assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointANoVersion); + ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 1); + ASSERT_EQUALS(targeted[shardIdB]->getWrites()[0]->writeOpRef.first, 1); + assertEndpointsEqual(targeted[shardIdB]->getWrites()[0]->endpoint, endpointBNoVersion); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready); + + targeted.clear(); + + // batch with ops[2] + ASSERT_OK(bulkWriteOp.target(targeters, false, targeted)); + ASSERT_EQUALS(targeted.size(), 1u); + ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 1); + ASSERT_EQUALS(targeted[shardIdB]->getWrites()[0]->writeOpRef.first, 2); + assertEndpointsEqual(targeted[shardIdB]->getWrites()[0]->endpoint, endpointB); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Ready); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready); + + targeted.clear(); + + // batch with ops[3] + ASSERT_OK(bulkWriteOp.target(targeters, false, targeted)); + ASSERT_EQUALS(targeted.size(), 2u); + ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1); + ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 3); + assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointANoVersion); + ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 1); + ASSERT_EQUALS(targeted[shardIdB]->getWrites()[0]->writeOpRef.first, 3); + assertEndpointsEqual(targeted[shardIdB]->getWrites()[0]->endpoint, endpointBNoVersion); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready); + + targeted.clear(); + + // batch with ops[4] + ASSERT_OK(bulkWriteOp.target(targeters, false, targeted)); + ASSERT_EQUALS(targeted.size(), 1u); + ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1); + ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 4); + assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointA); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Pending); + ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Pending); +} // Test multiple unordered ops that target two different shards. TEST_F(BulkWriteOpTest, TargetMultiOpsUnordered) { |