summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorKaitlin Mahar <kaitlin.mahar@mongodb.com>2023-05-05 21:41:38 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-05-05 22:46:09 +0000
commit74c325e8f49f272bd06343fdb745b9440a59d442 (patch)
tree4c988e90f71035c20adb4f7b4e3aec4e663cb272 /src/mongo/s
parent0417f5dae1f8d20caec7646982a49124c2b44c5f (diff)
downloadmongo-74c325e8f49f272bd06343fdb745b9440a59d442.tar.gz
SERVER-74096 Add mongos subbatching tests for multi-target ops in bulkWrite command
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/write_ops/bulk_write_exec_test.cpp268
1 files changed, 262 insertions, 6 deletions
diff --git a/src/mongo/s/write_ops/bulk_write_exec_test.cpp b/src/mongo/s/write_ops/bulk_write_exec_test.cpp
index c63579a0db4..4fa38aabe40 100644
--- a/src/mongo/s/write_ops/bulk_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/bulk_write_exec_test.cpp
@@ -433,12 +433,268 @@ TEST_F(BulkWriteOpTest, TargetMultiOpsOrdered_DifferentShard) {
ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Pending);
}
-// TODO(SERVER-74096): Test sub-batching logic with multi-target writes.
-// 1. Test targeting ordered ops where a multi-target sub-batch must only contain writes for a
-// single write op.
-// 2. Test targeting unordered ops of the same namespace that target the same shard under with two
-// different endpoints/shardVersions. This happens when a bulkWrite includes a multi-target write
-// and a single-target write.
+// Test targeting ordered ops where a multi-target sub-batch must only contain writes for a
+// single write op.
+TEST_F(BulkWriteOpTest, TargetMultiTargetOpsOrdered) {
+ ShardId shardIdA("shardA");
+ ShardId shardIdB("shardB");
+ NamespaceString nss0("foo.bar");
+ ShardEndpoint endpointA(
+ shardIdA, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none);
+ ShardEndpoint endpointB(
+ shardIdB, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none);
+
+ std::vector<std::unique_ptr<NSTargeter>> targeters;
+ targeters.push_back(initTargeterSplitRange(nss0, endpointA, endpointB));
+
+ // Ordered update and delete ops. We place multi-target ops in between single-target ops to the
+ // same shards, to ensure we correctly separate the multi-target ops into their own batches.
+ // Expected targets:
+ // ops[0] -> shardA
+ // ops[1] -> shardA and shardB
+ // ops[2] -> shardB
+ // ops[3] -> shardB
+ // ops[4] -> shardA and shardB
+ // ops[5] -> shardA
+ BulkWriteCommandRequest request(
+ {
+ BulkWriteUpdateOp(0, BSON("x" << -1), BSON("$set" << BSON("z" << 3))),
+ BulkWriteUpdateOp(
+ 0, BSON("x" << BSON("$gte" << -5 << "$lt" << 5)), BSON("$set" << BSON("y" << 2))),
+ BulkWriteUpdateOp(0, BSON("x" << 1), BSON("$set" << BSON("z" << 3))),
+ BulkWriteDeleteOp(0, BSON("x" << 1)),
+ BulkWriteDeleteOp(0, BSON("x" << BSON("$gte" << -5 << "$lt" << 5))),
+ BulkWriteDeleteOp(0, BSON("x" << -1)),
+ },
+ {NamespaceInfoEntry(nss0)});
+
+ BulkWriteOp bulkWriteOp(_opCtx, request);
+
+ // The resulting batches should be:
+ // {shardA: [ops[0]}
+ // {shardA: [ops[1]]}, {shardB: [ops[1]]}
+ // {shardB: [ops[2], ops[3]]}
+ // {shardA: [ops[4]]}, {shardB: [ops[4]]}
+ // {shardA: [ops[5]]}
+
+ TargetedBatchMap targeted;
+
+ // {shardA: [ops[0]}
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+ ASSERT_EQUALS(targeted.size(), 1u);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1u);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 0);
+ assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointA);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(5).getWriteState(), WriteOpState_Ready);
+
+ targeted.clear();
+
+ // {shardA: [ops[1]]}, {shardB: [ops[1]]}
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+ ASSERT_EQUALS(targeted.size(), 2u);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1u);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 1);
+ assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointA);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 1u);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites()[0]->writeOpRef.first, 1);
+ assertEndpointsEqual(targeted[shardIdB]->getWrites()[0]->endpoint, endpointB);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(5).getWriteState(), WriteOpState_Ready);
+
+ targeted.clear();
+
+ // {shardB: [ops[2], ops[3]]}
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+ ASSERT_EQUALS(targeted.size(), 1u);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 2u);
+ assertEndpointsEqual(targeted[shardIdB]->getWrites()[0]->endpoint, endpointB);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites()[0]->writeOpRef.first, 2);
+ assertEndpointsEqual(targeted[shardIdB]->getWrites()[1]->endpoint, endpointB);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites()[1]->writeOpRef.first, 3);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(5).getWriteState(), WriteOpState_Ready);
+
+ targeted.clear();
+
+ // {shardA: [ops[4]]}, {shardB: [ops[4]]}
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+ ASSERT_EQUALS(targeted.size(), 2u);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1u);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 4);
+ assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointA);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 1u);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites()[0]->writeOpRef.first, 4);
+ assertEndpointsEqual(targeted[shardIdB]->getWrites()[0]->endpoint, endpointB);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(5).getWriteState(), WriteOpState_Ready);
+
+
+ targeted.clear();
+
+ // {shardA: [ops[5]]}
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+ ASSERT_EQUALS(targeted.size(), 1u);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1u);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 5);
+ assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointA);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(5).getWriteState(), WriteOpState_Pending);
+}
+
+// Test targeting unordered ops of the same namespace that target the same shard/endpoint under two
+// different shardVersions.
+// As discussed in SERVER-34347, because of the way that (non-transactional) multi-target writes
+// disregard the shardVersion and use ChunkVersion::IGNORED, we cannot have together in a single
+// sub-batch an op for a multi-target write *and* an op for a single-target write that target
+// the same endpoint, because the single target write will use the actual shardVersion.
+TEST_F(BulkWriteOpTest, TargetMultiOpsUnordered_OneShard_TwoEndpoints) {
+ ShardId shardIdA("shardA");
+ ShardId shardIdB("shardB");
+ NamespaceString nss0("foo.bar");
+
+ // The endpoints we'll use for our targeter.
+ ShardEndpoint endpointA(
+ shardIdA,
+ ShardVersionFactory::make(ChunkVersion({OID::gen(), Timestamp(2)}, {10, 11}),
+ boost::optional<CollectionIndexes>(boost::none)),
+ boost::none);
+ ShardEndpoint endpointB(
+ shardIdB,
+ ShardVersionFactory::make(ChunkVersion({OID::gen(), Timestamp(3)}, {11, 12}),
+ boost::optional<CollectionIndexes>(boost::none)),
+ boost::none);
+
+ std::vector<std::unique_ptr<NSTargeter>> targeters;
+ targeters.push_back(initTargeterSplitRange(nss0, endpointA, endpointB));
+
+
+ // Used for assertions below; equivalent to the endpoints that multi-target ops will use (same
+ // as those above but no shard version.)
+ ShardEndpoint endpointANoVersion(
+ shardIdA, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none);
+ ShardEndpoint endpointBNoVersion(
+ shardIdB, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none);
+
+ // We expect the ops to target the following endpoints with/without shardVersion as indicated:
+ // ops[0] -> A, shardVersion included
+ // ops[1] -> A shardVersion ignored, B shardVersion ignored
+ // ops[2] -> B, shardVersion included
+ // ops[3] -> A shardVersion ignored, B shardVersion ignored
+ // ops[4] -> A shardVersion included
+
+ // Due to the interleaving of ops, each op should end up split into its own sub-batch, since no
+ // two consecutive ops target the same endpoint with the same shardVersion.
+ BulkWriteCommandRequest request(
+ {
+ BulkWriteUpdateOp(0, BSON("x" << -1), BSON("$set" << BSON("z" << 3))),
+ BulkWriteUpdateOp(
+ 0, BSON("x" << BSON("$gte" << -5 << "$lt" << 5)), BSON("$set" << BSON("y" << 2))),
+ BulkWriteDeleteOp(0, BSON("x" << 1)),
+ BulkWriteDeleteOp(0, BSON("x" << BSON("$gte" << -5 << "$lt" << 5))),
+ BulkWriteInsertOp(0, BSON("x" << -2)),
+ },
+ {NamespaceInfoEntry(nss0)});
+ request.setOrdered(false);
+
+ BulkWriteOp bulkWriteOp(_opCtx, request);
+
+ TargetedBatchMap targeted;
+
+ // batch with ops[0]
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+ ASSERT_EQUALS(targeted.size(), 1u);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 0);
+ assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointA);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready);
+
+ targeted.clear();
+
+ // batch with ops[1]
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+ ASSERT_EQUALS(targeted.size(), 2u);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 1);
+ assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointANoVersion);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 1);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites()[0]->writeOpRef.first, 1);
+ assertEndpointsEqual(targeted[shardIdB]->getWrites()[0]->endpoint, endpointBNoVersion);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready);
+
+ targeted.clear();
+
+ // batch with ops[2]
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+ ASSERT_EQUALS(targeted.size(), 1u);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 1);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites()[0]->writeOpRef.first, 2);
+ assertEndpointsEqual(targeted[shardIdB]->getWrites()[0]->endpoint, endpointB);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Ready);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready);
+
+ targeted.clear();
+
+ // batch with ops[3]
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+ ASSERT_EQUALS(targeted.size(), 2u);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 3);
+ assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointANoVersion);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites().size(), 1);
+ ASSERT_EQUALS(targeted[shardIdB]->getWrites()[0]->writeOpRef.first, 3);
+ assertEndpointsEqual(targeted[shardIdB]->getWrites()[0]->endpoint, endpointBNoVersion);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Ready);
+
+ targeted.clear();
+
+ // batch with ops[4]
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+ ASSERT_EQUALS(targeted.size(), 1u);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites().size(), 1);
+ ASSERT_EQUALS(targeted[shardIdA]->getWrites()[0]->writeOpRef.first, 4);
+ assertEndpointsEqual(targeted[shardIdA]->getWrites()[0]->endpoint, endpointA);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(0).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(1).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(2).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(3).getWriteState(), WriteOpState_Pending);
+ ASSERT_EQUALS(bulkWriteOp.getWriteOp_forTest(4).getWriteState(), WriteOpState_Pending);
+}
// Test multiple unordered ops that target two different shards.
TEST_F(BulkWriteOpTest, TargetMultiOpsUnordered) {