Diffstat (limited to 'src/mongo/s/write_ops/bulk_write_exec_test.cpp')
-rw-r--r--  src/mongo/s/write_ops/bulk_write_exec_test.cpp  212
1 file changed, 209 insertions(+), 3 deletions(-)
diff --git a/src/mongo/s/write_ops/bulk_write_exec_test.cpp b/src/mongo/s/write_ops/bulk_write_exec_test.cpp
index 5dcd2ff15e7..c1416d7976c 100644
--- a/src/mongo/s/write_ops/bulk_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/bulk_write_exec_test.cpp
@@ -813,9 +813,8 @@ TEST_F(BulkWriteOpTest, BuildChildRequestFromTargetedWriteBatch) {
NamespaceString nss1("sonate.pacifique");
// Two different endpoints targeting the same shard for the two namespaces.
- ShardEndpoint endpoint0(ShardId("shard"),
- ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
- boost::none);
+ ShardEndpoint endpoint0(
+ shardId, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none);
ShardEndpoint endpoint1(
shardId,
ShardVersionFactory::make(ChunkVersion({OID::gen(), Timestamp(2)}, {10, 11}),
@@ -873,6 +872,213 @@ TEST_F(BulkWriteOpTest, BuildChildRequestFromTargetedWriteBatch) {
ASSERT_EQUALS(childRequest.getNsInfo()[1].getNs(), request.getNsInfo()[1].getNs());
}
+// Tests that stmtIds are correctly attached to bulkWrite requests when the operations
+// are ordered.
+TEST_F(BulkWriteOpTest, TestOrderedOpsNoExistingStmtIds) {
+ NamespaceString nss("mgmt.kids");
+
+ ShardEndpoint endpointA(ShardId("shardA"),
+ ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+ boost::none);
+ ShardEndpoint endpointB(ShardId("shardB"),
+ ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+ boost::none);
+
+ std::vector<std::unique_ptr<NSTargeter>> targeters;
+ targeters.push_back(initTargeterSplitRange(nss, endpointA, endpointB));
+
+ // Because the operations are ordered, the bulkWrite operation is broken up by shard
+ // endpoint. In other words, targeting this request will result in two batches:
+ // 1) one to shard A, and then 2) another to shard B after the first batch completes.
+ BulkWriteCommandRequest request({BulkWriteInsertOp(0, BSON("x" << -1)), // stmtId 0, shard A
+ BulkWriteInsertOp(0, BSON("x" << -2)), // stmtId 1, shard A
+ BulkWriteInsertOp(0, BSON("x" << 1)), // stmtId 2, shard B
+ BulkWriteInsertOp(0, BSON("x" << 2))}, // stmtId 3, shard B
+ {NamespaceInfoEntry(nss)});
+ request.setOrdered(true);
+
+ // Setting the txnNumber makes it a retryable write.
+ _opCtx->setLogicalSessionId(LogicalSessionId());
+ _opCtx->setTxnNumber(TxnNumber(0));
+ BulkWriteOp bulkWriteOp(_opCtx, request);
+
+ std::map<ShardId, std::unique_ptr<TargetedWriteBatch>> targeted;
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+
+ auto* batch = targeted.begin()->second.get();
+ auto childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+ auto childStmtIds = childRequest.getStmtIds();
+ ASSERT_EQUALS(childStmtIds->size(), 2u);
+ ASSERT_EQUALS(childStmtIds->at(0), 0);
+ ASSERT_EQUALS(childStmtIds->at(1), 1);
+
+ // Target again to get a batch for the operations to shard B.
+ targeted.clear();
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+
+ batch = targeted.begin()->second.get();
+ childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+ childStmtIds = childRequest.getStmtIds();
+ ASSERT_EQUALS(childStmtIds->size(), 2u);
+ ASSERT_EQUALS(childStmtIds->at(0), 2);
+ ASSERT_EQUALS(childStmtIds->at(1), 3);
+}
+
+// Tests that stmtIds are correctly attached to bulkWrite requests when the operations
+// are unordered.
+TEST_F(BulkWriteOpTest, TestUnorderedOpsNoExistingStmtIds) {
+ NamespaceString nss("zero7.spinning");
+
+ ShardEndpoint endpointA(ShardId("shardA"),
+ ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+ boost::none);
+ ShardEndpoint endpointB(ShardId("shardB"),
+ ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+ boost::none);
+
+ std::vector<std::unique_ptr<NSTargeter>> targeters;
+ targeters.push_back(initTargeterSplitRange(nss, endpointA, endpointB));
+
+ // Since the ops aren't ordered, two batches are produced in a single targeting call:
+ // 1) the ops to shard A (op 0 and op 2) form one batch, and 2) the ops to shard B
+ // (op 1 and op 3) form another. Therefore the stmtIds in the bulkWrite requests sent
+ // to the shards will be interleaved.
+ BulkWriteCommandRequest request({BulkWriteInsertOp(0, BSON("x" << -1)), // stmtId 0, shard A
+ BulkWriteInsertOp(0, BSON("x" << 1)), // stmtId 1, shard B
+ BulkWriteInsertOp(0, BSON("x" << -1)), // stmtId 2, shard A
+ BulkWriteInsertOp(0, BSON("x" << 2))}, // stmtId 3, shard B
+ {NamespaceInfoEntry(nss)});
+ request.setOrdered(false);
+
+ // Setting the txnNumber makes it a retryable write.
+ _opCtx->setLogicalSessionId(LogicalSessionId());
+ _opCtx->setTxnNumber(TxnNumber(0));
+ BulkWriteOp bulkWriteOp(_opCtx, request);
+
+ std::map<ShardId, std::unique_ptr<TargetedWriteBatch>> targeted;
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+
+ // The batch to shard A contains op 0 and op 2.
+ auto* batch = targeted[ShardId("shardA")].get();
+ auto childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+ auto childStmtIds = childRequest.getStmtIds();
+ ASSERT_EQUALS(childStmtIds->size(), 2u);
+ ASSERT_EQUALS(childStmtIds->at(0), 0);
+ ASSERT_EQUALS(childStmtIds->at(1), 2);
+
+ // The batch to shard B contains op 1 and op 3.
+ batch = targeted[ShardId("shardB")].get();
+ childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+ childStmtIds = childRequest.getStmtIds();
+ ASSERT_EQUALS(childStmtIds->size(), 2u);
+ ASSERT_EQUALS(childStmtIds->at(0), 1);
+ ASSERT_EQUALS(childStmtIds->at(1), 3);
+}
+
+// Tests that stmtIds are correctly attached to bulkWrite requests when the operations
+// are unordered and stmtIds are already attached to the request.
+TEST_F(BulkWriteOpTest, TestUnorderedOpsStmtIdsExist) {
+ NamespaceString nss("zero7.spinning");
+
+ ShardEndpoint endpointA(ShardId("shardA"),
+ ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+ boost::none);
+ ShardEndpoint endpointB(ShardId("shardB"),
+ ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+ boost::none);
+
+ std::vector<std::unique_ptr<NSTargeter>> targeters;
+ targeters.push_back(initTargeterSplitRange(nss, endpointA, endpointB));
+
+ // Since the ops aren't ordered, two batches are produced in a single targeting call:
+ // 1) the ops to shard A (op 0 and op 2) form one batch, and 2) the ops to shard B
+ // (op 1 and op 3) form another. Therefore the stmtIds in the bulkWrite requests sent
+ // to the shards will be interleaved.
+ BulkWriteCommandRequest request({BulkWriteInsertOp(0, BSON("x" << -1)), // stmtId 6, shard A
+ BulkWriteInsertOp(0, BSON("x" << 1)), // stmtId 7, shard B
+ BulkWriteInsertOp(0, BSON("x" << -1)), // stmtId 8, shard A
+ BulkWriteInsertOp(0, BSON("x" << 2))}, // stmtId 9, shard B
+ {NamespaceInfoEntry(nss)});
+ request.setOrdered(false);
+ request.setStmtIds(std::vector<int>{6, 7, 8, 9});
+
+ // Setting the txnNumber makes it a retryable write.
+ _opCtx->setLogicalSessionId(LogicalSessionId());
+ _opCtx->setTxnNumber(TxnNumber(0));
+ BulkWriteOp bulkWriteOp(_opCtx, request);
+
+ std::map<ShardId, std::unique_ptr<TargetedWriteBatch>> targeted;
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+
+ // The batch to shard A contains op 0 and op 2.
+ auto* batch = targeted[ShardId("shardA")].get();
+ auto childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+ auto childStmtIds = childRequest.getStmtIds();
+ ASSERT_EQUALS(childStmtIds->size(), 2u);
+ ASSERT_EQUALS(childStmtIds->at(0), 6);
+ ASSERT_EQUALS(childStmtIds->at(1), 8);
+
+ // The batch to shard B contains op 1 and op 3.
+ batch = targeted[ShardId("shardB")].get();
+ childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+ childStmtIds = childRequest.getStmtIds();
+ ASSERT_EQUALS(childStmtIds->size(), 2u);
+ ASSERT_EQUALS(childStmtIds->at(0), 7);
+ ASSERT_EQUALS(childStmtIds->at(1), 9);
+}
+
+// Tests that stmtIds are correctly attached to bulkWrite requests when the operations
+// are unordered and the stmtId field is set on the request.
+TEST_F(BulkWriteOpTest, TestUnorderedOpsStmtIdFieldExists) {
+ NamespaceString nss("zero7.spinning");
+
+ ShardEndpoint endpointA(ShardId("shardA"),
+ ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+ boost::none);
+ ShardEndpoint endpointB(ShardId("shardB"),
+ ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none),
+ boost::none);
+
+ std::vector<std::unique_ptr<NSTargeter>> targeters;
+ targeters.push_back(initTargeterSplitRange(nss, endpointA, endpointB));
+
+ // Since the ops aren't ordered, two batches are produced in a single targeting call:
+ // 1) the ops to shard A (op 0 and op 2) form one batch, and 2) the ops to shard B
+ // (op 1 and op 3) form another. Therefore the stmtIds in the bulkWrite requests sent
+ // to the shards will be interleaved.
+ BulkWriteCommandRequest request({BulkWriteInsertOp(0, BSON("x" << -1)), // stmtId 6, shard A
+ BulkWriteInsertOp(0, BSON("x" << 1)), // stmtId 7, shard B
+ BulkWriteInsertOp(0, BSON("x" << -1)), // stmtId 8, shard A
+ BulkWriteInsertOp(0, BSON("x" << 2))}, // stmtId 9, shard B
+ {NamespaceInfoEntry(nss)});
+ request.setOrdered(false);
+ request.setStmtId(6); // Produces stmtIds 6, 7, 8, 9
+
+ // Setting the txnNumber makes it a retryable write.
+ _opCtx->setLogicalSessionId(LogicalSessionId());
+ _opCtx->setTxnNumber(TxnNumber(0));
+ BulkWriteOp bulkWriteOp(_opCtx, request);
+
+ std::map<ShardId, std::unique_ptr<TargetedWriteBatch>> targeted;
+ ASSERT_OK(bulkWriteOp.target(targeters, false, targeted));
+
+ // The batch to shard A contains op 0 and op 2.
+ auto* batch = targeted[ShardId("shardA")].get();
+ auto childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+ auto childStmtIds = childRequest.getStmtIds();
+ ASSERT_EQUALS(childStmtIds->size(), 2u);
+ ASSERT_EQUALS(childStmtIds->at(0), 6);
+ ASSERT_EQUALS(childStmtIds->at(1), 8);
+
+ // The batch to shard B contains op 1 and op 3.
+ batch = targeted[ShardId("shardB")].get();
+ childRequest = bulkWriteOp.buildBulkCommandRequest(*batch);
+ childStmtIds = childRequest.getStmtIds();
+ ASSERT_EQUALS(childStmtIds->size(), 2u);
+ ASSERT_EQUALS(childStmtIds->at(0), 7);
+ ASSERT_EQUALS(childStmtIds->at(1), 9);
+}
+
// Test BatchItemRef.getLet().
TEST_F(BulkWriteOpTest, BatchItemRefGetLet) {
NamespaceString nss("foo.bar");