From 7ce60b59b24a893bb6494089ea4db8e19901f48d Mon Sep 17 00:00:00 2001 From: kauboy26 Date: Wed, 17 May 2023 20:55:33 +0000 Subject: SERVER-72989 Attach stmtIds to bulkWrite request sent by mongos --- src/mongo/s/write_ops/bulk_write_exec_test.cpp | 212 ++++++++++++++++++++++++- 1 file changed, 209 insertions(+), 3 deletions(-) (limited to 'src/mongo/s/write_ops/bulk_write_exec_test.cpp') diff --git a/src/mongo/s/write_ops/bulk_write_exec_test.cpp b/src/mongo/s/write_ops/bulk_write_exec_test.cpp index 5dcd2ff15e7..c1416d7976c 100644 --- a/src/mongo/s/write_ops/bulk_write_exec_test.cpp +++ b/src/mongo/s/write_ops/bulk_write_exec_test.cpp @@ -813,9 +813,8 @@ TEST_F(BulkWriteOpTest, BuildChildRequestFromTargetedWriteBatch) { NamespaceString nss1("sonate.pacifique"); // Two different endpoints targeting the same shard for the two namespaces. - ShardEndpoint endpoint0(ShardId("shard"), - ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), - boost::none); + ShardEndpoint endpoint0( + shardId, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none); ShardEndpoint endpoint1( shardId, ShardVersionFactory::make(ChunkVersion({OID::gen(), Timestamp(2)}, {10, 11}), @@ -873,6 +872,213 @@ TEST_F(BulkWriteOpTest, BuildChildRequestFromTargetedWriteBatch) { ASSERT_EQUALS(childRequest.getNsInfo()[1].getNs(), request.getNsInfo()[1].getNs()); } +// Tests that stmtIds are correctly attached to bulkWrite requests when the operations +// are ordered. 
+TEST_F(BulkWriteOpTest, TestOrderedOpsNoExistingStmtIds) {
+    NamespaceString nss("mgmt.kids");
+
+    ShardEndpoint endpointA(ShardId("shardA"),
+                            ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+                            boost::none);
+    ShardEndpoint endpointB(ShardId("shardB"),
+                            ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+                            boost::none);
+
+    std::vector<std::unique_ptr<NSTargeter>> targeters;
+    targeters.push_back(initTargeterSplitRange(nss, endpointA, endpointB));
+
+    // Because the operations are ordered, the bulkWrite operation is broken up by shard
+    // endpoint. In other words, targeting this request will result in two batches:
+    // 1) to shard A, and then 2) another to shard B after the first batch is complete.
+    BulkWriteCommandRequest request({BulkWriteInsertOp(0, BSON("x" << -1)),  // stmtId 0, shard A
+                                     BulkWriteInsertOp(0, BSON("x" << -2)),  // stmtId 1, shard A
+                                     BulkWriteInsertOp(0, BSON("x" << 1)),   // stmtId 2, shard B
+                                     BulkWriteInsertOp(0, BSON("x" << 2))},  // stmtId 3, shard B
+                                    {NamespaceInfoEntry(nss)});
+    request.setOrdered(true);
+
+    // Setting the txnNumber makes it a retryable write.
+    _opCtx->setLogicalSessionId(LogicalSessionId());
+    _opCtx->setTxnNumber(TxnNumber(0));
+    BulkWriteOp bulkWriteOp(_opCtx, request);
+
+    std::map<ShardId, std::unique_ptr<TargetedWriteBatch>> targeted;
+    ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+
+    auto* batch = targeted.begin()->second.get();
+    auto childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+    auto childStmtIds = childRequest.getStmtIds();
+    ASSERT_EQUALS(childStmtIds->size(), 2u);
+    ASSERT_EQUALS(childStmtIds->at(0), 0);
+    ASSERT_EQUALS(childStmtIds->at(1), 1);
+
+    // Target again to get a batch for the operations to shard B.
+    targeted.clear();
+    ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+
+    batch = targeted.begin()->second.get();
+    childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+    childStmtIds = childRequest.getStmtIds();
+    ASSERT_EQUALS(childStmtIds->size(), 2u);
+    ASSERT_EQUALS(childStmtIds->at(0), 2);
+    ASSERT_EQUALS(childStmtIds->at(1), 3);
+}
+
+// Tests that stmtIds are correctly attached to bulkWrite requests when the operations
+// are unordered.
+TEST_F(BulkWriteOpTest, TestUnorderedOpsNoExistingStmtIds) {
+    NamespaceString nss("zero7.spinning");
+
+    ShardEndpoint endpointA(ShardId("shardA"),
+                            ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+                            boost::none);
+    ShardEndpoint endpointB(ShardId("shardB"),
+                            ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+                            boost::none);
+
+    std::vector<std::unique_ptr<NSTargeter>> targeters;
+    targeters.push_back(initTargeterSplitRange(nss, endpointA, endpointB));
+
+    // Since the ops aren't ordered, two batches are produced on a single targeting call:
+    // 1) the ops to shard A (op 0 and op 2) are a batch and 2) the ops to shard B (op 1
+    // and op 3) are a batch. Therefore the stmtIds in the bulkWrite request sent to the shards
+    // will be interleaved.
+    BulkWriteCommandRequest request({BulkWriteInsertOp(0, BSON("x" << -1)),  // stmtId 0, shard A
+                                     BulkWriteInsertOp(0, BSON("x" << 1)),   // stmtId 1, shard B
+                                     BulkWriteInsertOp(0, BSON("x" << -1)),  // stmtId 2, shard A
+                                     BulkWriteInsertOp(0, BSON("x" << 2))},  // stmtId 3, shard B
+                                    {NamespaceInfoEntry(nss)});
+    request.setOrdered(false);
+
+    // Setting the txnNumber makes it a retryable write.
+    _opCtx->setLogicalSessionId(LogicalSessionId());
+    _opCtx->setTxnNumber(TxnNumber(0));
+    BulkWriteOp bulkWriteOp(_opCtx, request);
+
+    std::map<ShardId, std::unique_ptr<TargetedWriteBatch>> targeted;
+    ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+
+    // The batch to shard A contains op 0 and op 2.
+    auto* batch = targeted[ShardId("shardA")].get();
+    auto childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+    auto childStmtIds = childRequest.getStmtIds();
+    ASSERT_EQUALS(childStmtIds->size(), 2u);
+    ASSERT_EQUALS(childStmtIds->at(0), 0);
+    ASSERT_EQUALS(childStmtIds->at(1), 2);
+
+    // The batch to shard B contains op 1 and op 3.
+    batch = targeted[ShardId("shardB")].get();
+    childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+    childStmtIds = childRequest.getStmtIds();
+    ASSERT_EQUALS(childStmtIds->size(), 2u);
+    ASSERT_EQUALS(childStmtIds->at(0), 1);
+    ASSERT_EQUALS(childStmtIds->at(1), 3);
+}
+
+// Tests that stmtIds are correctly attached to bulkWrite requests when the operations
+// are unordered and stmtIds are attached to the request already.
+TEST_F(BulkWriteOpTest, TestUnorderedOpsStmtIdsExist) {
+    NamespaceString nss("zero7.spinning");
+
+    ShardEndpoint endpointA(ShardId("shardA"),
+                            ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+                            boost::none);
+    ShardEndpoint endpointB(ShardId("shardB"),
+                            ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+                            boost::none);
+
+    std::vector<std::unique_ptr<NSTargeter>> targeters;
+    targeters.push_back(initTargeterSplitRange(nss, endpointA, endpointB));
+
+    // Since the ops aren't ordered, two batches are produced on a single targeting call:
+    // 1) the ops to shard A (op 0 and op 2) are a batch and 2) the ops to shard B (op 1
+    // and op 3) are a batch. Therefore the stmtIds in the bulkWrite request sent to the shards
+    // will be interleaved.
+    BulkWriteCommandRequest request({BulkWriteInsertOp(0, BSON("x" << -1)),  // stmtId 6, shard A
+                                     BulkWriteInsertOp(0, BSON("x" << 1)),   // stmtId 7, shard B
+                                     BulkWriteInsertOp(0, BSON("x" << -1)),  // stmtId 8, shard A
+                                     BulkWriteInsertOp(0, BSON("x" << 2))},  // stmtId 9, shard B
+                                    {NamespaceInfoEntry(nss)});
+    request.setOrdered(false);
+    request.setStmtIds(std::vector<int>{6, 7, 8, 9});
+
+    // Setting the txnNumber makes it a retryable write.
+    _opCtx->setLogicalSessionId(LogicalSessionId());
+    _opCtx->setTxnNumber(TxnNumber(0));
+    BulkWriteOp bulkWriteOp(_opCtx, request);
+
+    std::map<ShardId, std::unique_ptr<TargetedWriteBatch>> targeted;
+    ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+
+    // The batch to shard A contains op 0 and op 2.
+    auto* batch = targeted[ShardId("shardA")].get();
+    auto childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+    auto childStmtIds = childRequest.getStmtIds();
+    ASSERT_EQUALS(childStmtIds->size(), 2u);
+    ASSERT_EQUALS(childStmtIds->at(0), 6);
+    ASSERT_EQUALS(childStmtIds->at(1), 8);
+
+    // The batch to shard B contains op 1 and op 3.
+    batch = targeted[ShardId("shardB")].get();
+    childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+    childStmtIds = childRequest.getStmtIds();
+    ASSERT_EQUALS(childStmtIds->size(), 2u);
+    ASSERT_EQUALS(childStmtIds->at(0), 7);
+    ASSERT_EQUALS(childStmtIds->at(1), 9);
+}
+
+// Tests that stmtIds are correctly attached to bulkWrite requests when the operations
+// are unordered and the stmtId field exists.
+TEST_F(BulkWriteOpTest, TestUnorderedOpsStmtIdFieldExists) {
+    NamespaceString nss("zero7.spinning");
+
+    ShardEndpoint endpointA(ShardId("shardA"),
+                            ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+                            boost::none);
+    ShardEndpoint endpointB(ShardId("shardB"),
+                            ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+                            boost::none);
+
+    std::vector<std::unique_ptr<NSTargeter>> targeters;
+    targeters.push_back(initTargeterSplitRange(nss, endpointA, endpointB));
+
+    // Since the ops aren't ordered, two batches are produced on a single targeting call:
+    // 1) the ops to shard A (op 0 and op 2) are a batch and 2) the ops to shard B (op 1
+    // and op 3) are a batch. Therefore the stmtIds in the bulkWrite request sent to the shards
+    // will be interleaved.
+    BulkWriteCommandRequest request({BulkWriteInsertOp(0, BSON("x" << -1)),  // stmtId 6, shard A
+                                     BulkWriteInsertOp(0, BSON("x" << 1)),   // stmtId 7, shard B
+                                     BulkWriteInsertOp(0, BSON("x" << -1)),  // stmtId 8, shard A
+                                     BulkWriteInsertOp(0, BSON("x" << 2))},  // stmtId 9, shard B
+                                    {NamespaceInfoEntry(nss)});
+    request.setOrdered(false);
+    request.setStmtId(6);  // Produces stmtIds 6, 7, 8, 9
+
+    // Setting the txnNumber makes it a retryable write.
+    _opCtx->setLogicalSessionId(LogicalSessionId());
+    _opCtx->setTxnNumber(TxnNumber(0));
+    BulkWriteOp bulkWriteOp(_opCtx, request);
+
+    std::map<ShardId, std::unique_ptr<TargetedWriteBatch>> targeted;
+    ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+
+    // The batch to shard A contains op 0 and op 2.
+    auto* batch = targeted[ShardId("shardA")].get();
+    auto childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+    auto childStmtIds = childRequest.getStmtIds();
+    ASSERT_EQUALS(childStmtIds->size(), 2u);
+    ASSERT_EQUALS(childStmtIds->at(0), 6);
+    ASSERT_EQUALS(childStmtIds->at(1), 8);
+
+    // The batch to shard B contains op 1 and op 3.
+    batch = targeted[ShardId("shardB")].get();
+    childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+    childStmtIds = childRequest.getStmtIds();
+    ASSERT_EQUALS(childStmtIds->size(), 2u);
+    ASSERT_EQUALS(childStmtIds->at(0), 7);
+    ASSERT_EQUALS(childStmtIds->at(1), 9);
+}
+
 // Test BatchItemRef.getLet().
 TEST_F(BulkWriteOpTest, BatchItemRefGetLet) {
     NamespaceString nss("foo.bar");
-- 
cgit v1.2.1