diff options
4 files changed, 292 insertions, 66 deletions
diff --git a/jstests/sharding/bulk_write_basic.js b/jstests/sharding/bulk_write_basic.js
new file mode 100644
index 00000000000..385518b2f43
--- /dev/null
+++ b/jstests/sharding/bulk_write_basic.js
@@ -0,0 +1,121 @@
+ * Tests that bulk write ordered operations succeed on a two shard cluster with both
+ * sharded and unsharded data.
+ * @tags: [multiversion_incompatible, featureFlagBulkWriteCommand]
+ */
+(function() {
+'use strict';
+load("jstests/libs/namespace_utils.js"); // getDBNameAndCollNameFromFullNamespace()
+const st = new ShardingTest({
+ shards: 2,
+ mongos: 2,
+ config: 1,
+ rs: {nodes: 1},
+ mongosOptions: {setParameter: {logComponentVerbosity: tojson({sharding: 4})}}
+function getCollection(ns) {
+ const [dbName, collName] = getDBNameAndCollNameFromFullNamespace(ns);
+ return st.s0.getDB(dbName)[collName];
+const banana = "test.banana";
+const orange = "";
+const staleConfigBananaLog = /7279201.*Noting stale config response.*banana/;
+const staleConfigOrangeLog = /7279201.*Noting stale config response.*orange/;
+const staleDbTest2Log = /7279202.*Noting stale database response.*test2/;
+jsTestLog("Case 1: Collection does't exist yet.");
+// Case 1: The collection doesn't exist yet. This results in a StaleConfig error on the
+// shards and consequently mongos and the shards must all refresh. Then mongos needs to
+// retry the bulk operation.
+// Connect via the first mongos. We do this so that the second mongos remains unused until
+// a later test case.
+const db_s0 = st.s0.getDB("test");
+ bulkWrite: 1,
+ ops: [{insert: 0, document: {a: 0}}, {insert: 0, document: {a: 1}}],
+ nsInfo: [{ns: banana}]
+let insertedDocs = getCollection(banana).find({}).toArray();
+assert.eq(2, insertedDocs.length, `Inserted docs: '${tojson(insertedDocs)}'`);
+assert(checkLog.checkContainsOnce(st.s0, staleConfigBananaLog));
+jsTestLog("Case 2: The collection exists for some of writes, but not for others.");
+ bulkWrite: 1,
+ ops: [
+ {insert: 0, document: {a: 2}},
+ {insert: 1, document: {a: 0}},
+ {insert: 0, document: {a: 3}}
+ ],
+ nsInfo: [{ns: banana}, {ns: orange}]
+insertedDocs = getCollection(banana).find({}).toArray();
+assert.eq(4, insertedDocs.length, `Inserted docs: '${tojson(insertedDocs)}'`);
+insertedDocs = getCollection(orange).find({}).toArray();
+assert.eq(1, insertedDocs.length, `Inserted docs: '${tojson(insertedDocs)}'`);
+assert(checkLog.checkContainsOnce(st.s0, staleConfigOrangeLog));
+jsTestLog("Case 3: StaleDbVersion when unsharded collection moves between shards.");
+const db_s1 = st.s1.getDB("test");
+// Case 3: Move the 'test2' DB back and forth across shards. This will result in bulkWrite
+// getting a StaleDbVersion error. We run this on s1 so s0 doesn't know about the change.
+assert.commandWorked(db_s1.adminCommand({movePrimary: 'test2', to: st.shard0.shardName}));
+assert.commandWorked(db_s1.adminCommand({movePrimary: 'test2', to: st.shard1.shardName}));
+// Now run the bulk write command on s0.
+ {bulkWrite: 1, ops: [{insert: 0, document: {a: 3}}], nsInfo: [{ns: orange}]}));
+insertedDocs = getCollection(orange).find({}).toArray();
+assert.eq(2, insertedDocs.length, `Inserted docs: '${tojson(insertedDocs)}'`);
+assert(checkLog.checkContainsOnce(st.s0, staleDbTest2Log));
+jsTestLog("Case 4: The collection is sharded and lives on both shards.");
+// Case 4: Shard the collection and manually move chunks so that they live on
+// both shards. We stop the balancer as well. We do all of this on s0, but then,
+// we run a bulk write command through the s1 that has a stale view of the cluster.
+jsTestLog("Shard the collection.");
+assert.commandWorked(getCollection(banana).createIndex({a: 1}));
+assert.commandWorked(db_s0.adminCommand({enableSharding: "test"}));
+assert.commandWorked(db_s0.adminCommand({shardCollection: banana, key: {a: 1}}));
+jsTestLog("Create chunks, then move them.");
+assert.commandWorked(db_s0.adminCommand({split: banana, middle: {a: 2}}));
+ db_s0.adminCommand({moveChunk: banana, find: {a: 0}, to: st.shard0.shardName}));
+ db_s0.adminCommand({moveChunk: banana, find: {a: 3}, to: st.shard1.shardName}));
+jsTestLog("Running bulk write command.");
+ bulkWrite: 1,
+ ops: [
+ {insert: 0, document: {a: -1}},
+ {insert: 1, document: {a: 1}},
+ {insert: 0, document: {a: 4}}
+ ],
+ nsInfo: [{ns: banana}, {ns: orange}]
+insertedDocs = getCollection(banana).find({}).toArray();
+assert.eq(6, insertedDocs.length, `Inserted docs: '${tojson(insertedDocs)}'`);
+insertedDocs = getCollection(orange).find({}).toArray();
+assert.eq(3, insertedDocs.length, `Inserted docs: '${tojson(insertedDocs)}'`);
+// Checklog doesn't work in this case because mongos may refresh its routing info before
+// runningthe bulkWrite command, which means that the logs we're looking for won't get printed.
+// However, since the number of documents matched up in the asserts above, it means that mongos
+// must've correctly routed the bulkWrite command.
diff --git a/src/mongo/s/write_ops/bulk_write_exec.cpp b/src/mongo/s/write_ops/bulk_write_exec.cpp
index 0af5853acd1..724b46b34ad 100644
--- a/src/mongo/s/write_ops/bulk_write_exec.cpp
+++ b/src/mongo/s/write_ops/bulk_write_exec.cpp
@@ -33,7 +33,10 @@
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/commands/bulk_write_gen.h"
+#include "mongo/db/commands/bulk_write_parser.h"
#include "mongo/db/database_name.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
@@ -59,8 +62,9 @@ const int kMaxRoundsWithoutProgress(5);
// Send and process the child batches. Each child batch is targeted at a unique shard: therefore
// one shard will have only one batch incoming.
void executeChildBatches(OperationContext* opCtx,
- const TargetedBatchMap& childBatches,
- const BulkWriteOp& bulkWriteOp) {
+ TargetedBatchMap& childBatches,
+ BulkWriteOp& bulkWriteOp,
+ stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) {
std::vector<AsyncRequestsSender::Request> requests;
for (auto& childBatch : childBatches) {
auto request = [&]() {
@@ -95,6 +99,54 @@ void executeChildBatches(OperationContext* opCtx,
while (!ars.done()) {
// Block until a response is available.
auto response =;
+ Status responseStatus = response.swResponse.getStatus();
+ // TODO (SERVER-76957): The status may not be OK, handle it.
+ invariant(responseStatus.isOK());
+ auto bwReply = BulkWriteCommandReply::parse(IDLParserContext("bulkWrite"),
+ response.swResponse.getValue().data);
+ // TODO (SERVER-76958): Iterate through the cursor rather than looking only at the
+ // first batch.
+ auto cursor = bwReply.getCursor();
+ const auto& replyItems = cursor.getFirstBatch();
+ TargetedWriteBatch* writeBatch = childBatches.find(response.shardId)->second.get();
+ // Capture the errors if any exist and mark the writes in the TargetedWriteBatch so that
+ // they may be re-targeted if needed.
+ bulkWriteOp.noteBatchResponse(*writeBatch, replyItems, errorsPerNamespace);
+ }
+void noteStaleResponses(
+ OperationContext* opCtx,
+ const std::vector<std::unique_ptr<NSTargeter>>& targeters,
+ const stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) {
+ for (auto& targeter : targeters) {
+ auto errors = errorsPerNamespace.find(targeter->getNS());
+ if (errors != errorsPerNamespace.cend()) {
+ for (const auto& error : errors->second.getErrors(ErrorCodes::StaleConfig)) {
+ LOGV2_DEBUG(7279201,
+ 4,
+ "Noting stale config response.",
+ "shardId"_attr = error.endpoint.shardName,
+ "status"_attr = error.error.getStatus());
+ targeter->noteStaleShardResponse(
+ opCtx, error.endpoint, *error.error.getStatus().extraInfo<StaleConfigInfo>());
+ }
+ for (const auto& error : errors->second.getErrors(ErrorCodes::StaleDbVersion)) {
+ LOGV2_DEBUG(7279202,
+ 4,
+ "Noting stale database response.",
+ "shardId"_attr = error.endpoint.shardName,
+ "status"_attr = error.error.getStatus());
+ targeter->noteStaleDbResponse(
+ opCtx,
+ error.endpoint,
+ *error.error.getStatus().extraInfo<StaleDbRoutingVersion>());
+ }
+ }
@@ -137,25 +189,23 @@ std::vector<BulkWriteReplyItem> execute(OperationContext* opCtx,
refreshedTargeter = true;
- }
+ } else {
+ stdx::unordered_map<NamespaceString, TrackedErrors> errorsPerNamespace;
- // Send the child batches and wait for responses.
- executeChildBatches(opCtx, childBatches, bulkWriteOp);
+ // Send the child batches and wait for responses.
+ executeChildBatches(opCtx, childBatches, bulkWriteOp, errorsPerNamespace);
- // 3: Abort the batch upon errors for ordered writes or transactions.
- // TODO(SERVER-72792): Remove the logic below that mimics ok responses and process real
- // batch responses.
- for (const auto& childBatch : childBatches) {
- bulkWriteOp.noteBatchResponse(*childBatch.second);
+ // If we saw any staleness errors, tell the targeters to invalidate their cache
+ // so that they may be refreshed.
+ noteStaleResponses(opCtx, targeters, errorsPerNamespace);
- // 4: Refresh the targeter(s) if we receive a target error or a stale config/db error.
if (bulkWriteOp.isFinished()) {
// No need to refresh the targeters if we are done.
+ // Refresh the targeter(s) if we received a target error or a stale config/db error.
bool targeterChanged = false;
try {
LOGV2_DEBUG(7298200, 2, "Refreshing all targeters for bulkWrite");
@@ -337,11 +387,55 @@ void BulkWriteOp::abortBatch(const Status& status) {
-// TODO(SERVER-72792): Finish this and process real batch responses.
-void BulkWriteOp::noteBatchResponse(const TargetedWriteBatch& targetedBatch) {
- for (auto&& write : targetedBatch.getWrites()) {
+void BulkWriteOp::noteBatchResponse(
+ TargetedWriteBatch& targetedBatch,
+ const std::vector<BulkWriteReplyItem>& replyItems,
+ stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) {
+ LOGV2_DEBUG(7279200,
+ 4,
+ "Processing bulk write response from shard.",
+ "shard"_attr = targetedBatch.getShardId(),
+ "replyItems"_attr = replyItems);
+ int index = -1;
+ bool ordered = _clientRequest.getOrdered();
+ boost::optional<write_ops::WriteError> lastError;
+ for (const auto& write : targetedBatch.getWrites()) {
+ ++index;
WriteOp& writeOp = _writeOps[write->writeOpRef.first];
- writeOp.noteWriteComplete(*write);
+ // TODO (SERVER-76953) : Handle unordered operations
+ // When an error is encountered on an ordered bulk write, it is impossible for any of the
+ // remaining operations to have been executed. For that reason we cancel them here so they
+ // may be retargeted and retried.
+ if (ordered && lastError) {
+ invariant(index >= (int)replyItems.size());
+ writeOp.cancelWrites(&*lastError);
+ continue;
+ }
+ auto& reply = replyItems[index];
+ if (reply.getStatus().isOK()) {
+ writeOp.noteWriteComplete(*write);
+ } else {
+ lastError.emplace(reply.getIdx(), reply.getStatus());
+ writeOp.noteWriteError(*write, *lastError);
+ auto origWrite = BulkWriteCRUDOp(_clientRequest.getOps()[write->writeOpRef.first]);
+ auto nss = _clientRequest.getNsInfo()[origWrite.getNsInfoIdx()].getNs();
+ if (errorsPerNamespace.find(nss) == errorsPerNamespace.end()) {
+ TrackedErrors trackedErrors;
+ trackedErrors.startTracking(ErrorCodes::StaleConfig);
+ trackedErrors.startTracking(ErrorCodes::StaleDbVersion);
+ errorsPerNamespace.emplace(nss, trackedErrors);
+ }
+ auto trackedErrors = errorsPerNamespace.find(nss);
+ invariant(trackedErrors != errorsPerNamespace.end());
+ if (trackedErrors->second.isTracking(reply.getStatus().code())) {
+ trackedErrors->second.addError(ShardError(write->endpoint, *lastError));
+ }
+ }
diff --git a/src/mongo/s/write_ops/bulk_write_exec.h b/src/mongo/s/write_ops/bulk_write_exec.h
index 51d862ad3f9..745312025bc 100644
--- a/src/mongo/s/write_ops/bulk_write_exec.h
+++ b/src/mongo/s/write_ops/bulk_write_exec.h
@@ -32,6 +32,7 @@
#include "mongo/bson/timestamp.h"
#include "mongo/client/connection_string.h"
#include "mongo/db/commands/bulk_write_gen.h"
+#include "mongo/db/commands/bulk_write_parser.h"
#include "mongo/db/repl/optime.h"
#include "mongo/s/ns_targeter.h"
#include "mongo/s/write_ops/batch_write_op.h"
@@ -127,8 +128,14 @@ public:
void abortBatch(const Status& status);
- // TODO(SERVER-72792): Finish this and process real batch responses.
- void noteBatchResponse(const TargetedWriteBatch& targetedBatch);
+ /**
+ * Processes the response to a TargetedWriteBatch. The response is captured by the vector of
+ * BulkWriteReplyItems. Sharding related errors are then grouped by namespace and captured in
+ * the map passed in.
+ */
+ void noteBatchResponse(TargetedWriteBatch& targetedBatch,
+ const std::vector<BulkWriteReplyItem>& replyItems,
+ stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace);
* Returns a vector of BulkWriteReplyItem based on the end state of each individual write in
diff --git a/src/mongo/s/write_ops/bulk_write_exec_test.cpp b/src/mongo/s/write_ops/bulk_write_exec_test.cpp
index 4fa38aabe40..5dcd2ff15e7 100644
--- a/src/mongo/s/write_ops/bulk_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/bulk_write_exec_test.cpp
@@ -905,54 +905,58 @@ public:
-TEST_F(BulkWriteExecTest, RefreshTargetersOnTargetErrors) {
- ShardId shardIdA("shardA");
- ShardId shardIdB("shardB");
- NamespaceString nss0("");
- NamespaceString nss1("");
- ShardEndpoint endpoint0(
- shardIdA, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none);
- ShardEndpoint endpoint1(
- shardIdB,
- ShardVersionFactory::make(ChunkVersion({OID::gen(), Timestamp(2)}, {10, 11}),
- boost::optional<CollectionIndexes>(boost::none)),
- boost::none);
- std::vector<std::unique_ptr<NSTargeter>> targeters;
- // Initialize the targeter so that x >= 0 values are untargetable so target call will encounter
- // an error.
- targeters.push_back(initTargeterHalfRange(nss0, endpoint0));
- targeters.push_back(initTargeterFullRange(nss1, endpoint1));
- auto targeter0 = static_cast<BulkWriteMockNSTargeter*>(targeters[0].get());
- auto targeter1 = static_cast<BulkWriteMockNSTargeter*>(targeters[1].get());
- // Only the first op would get a target error.
- BulkWriteCommandRequest request(
- {BulkWriteInsertOp(0, BSON("x" << 1)), BulkWriteInsertOp(1, BSON("x" << 1))},
- {NamespaceInfoEntry(nss0), NamespaceInfoEntry(nss1)});
- // Test unordered operations. Since only the first op is untargetable, the second op will
- // succeed without errors. But bulk_write_exec::execute would retry on targeting errors and try
- // to refresh the targeters upon targeting errors.
- request.setOrdered(false);
- auto replyItems = bulk_write_exec::execute(operationContext(), targeters, request);
- ASSERT_EQUALS(replyItems.size(), 2u);
- ASSERT_NOT_OK(replyItems[0].getStatus());
- ASSERT_OK(replyItems[1].getStatus());
- ASSERT_EQUALS(targeter0->getNumRefreshes(), 1);
- ASSERT_EQUALS(targeter1->getNumRefreshes(), 1);
- // Test ordered operations. This is mostly the same as the test case above except that we should
- // only return the first error for ordered operations.
- request.setOrdered(true);
- replyItems = bulk_write_exec::execute(operationContext(), targeters, request);
- ASSERT_EQUALS(replyItems.size(), 1u);
- ASSERT_NOT_OK(replyItems[0].getStatus());
- // We should have another refresh attempt.
- ASSERT_EQUALS(targeter0->getNumRefreshes(), 2);
- ASSERT_EQUALS(targeter1->getNumRefreshes(), 2);
+// TODO (SERVER-76953): Uncomment after mongos can handle targeting errors in unordered ops.
+// TEST_F(BulkWriteExecTest, RefreshTargetersOnTargetErrors) {
+// ShardId shardIdA("shardA");
+// ShardId shardIdB("shardB");
+// NamespaceString nss0("");
+// NamespaceString nss1("");
+// ShardEndpoint endpoint0(
+// shardIdA, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none);
+// ShardEndpoint endpoint1(
+// shardIdB,
+// ShardVersionFactory::make(ChunkVersion({OID::gen(), Timestamp(2)}, {10, 11}),
+// boost::optional<CollectionIndexes>(boost::none)),
+// boost::none);
+// std::vector<std::unique_ptr<NSTargeter>> targeters;
+// // Initialize the targeter so that x >= 0 values are untargetable so target call will
+// encounter
+// // an error.
+// targeters.push_back(initTargeterHalfRange(nss0, endpoint0));
+// targeters.push_back(initTargeterFullRange(nss1, endpoint1));
+// auto targeter0 = static_cast<BulkWriteMockNSTargeter*>(targeters[0].get());
+// auto targeter1 = static_cast<BulkWriteMockNSTargeter*>(targeters[1].get());
+// // Only the first op would get a target error.
+// BulkWriteCommandRequest request(
+// {BulkWriteInsertOp(0, BSON("x" << 1)), BulkWriteInsertOp(1, BSON("x" << 1))},
+// {NamespaceInfoEntry(nss0), NamespaceInfoEntry(nss1)});
+// // Test unordered operations. Since only the first op is untargetable, the second op will
+// // succeed without errors. But bulk_write_exec::execute would retry on targeting errors and
+// try
+// // to refresh the targeters upon targeting errors.
+// request.setOrdered(false);
+// auto replyItems = bulk_write_exec::execute(operationContext(), targeters, request);
+// ASSERT_EQUALS(replyItems.size(), 2u);
+// ASSERT_NOT_OK(replyItems[0].getStatus());
+// ASSERT_OK(replyItems[1].getStatus());
+// ASSERT_EQUALS(targeter0->getNumRefreshes(), 1);
+// ASSERT_EQUALS(targeter1->getNumRefreshes(), 1);
+// // Test ordered operations. This is mostly the same as the test case above except that we
+// should
+// // only return the first error for ordered operations.
+// request.setOrdered(true);
+// replyItems = bulk_write_exec::execute(operationContext(), targeters, request);
+// ASSERT_EQUALS(replyItems.size(), 1u);
+// ASSERT_NOT_OK(replyItems[0].getStatus());
+// // We should have another refresh attempt.
+// ASSERT_EQUALS(targeter0->getNumRefreshes(), 2);
+// ASSERT_EQUALS(targeter1->getNumRefreshes(), 2);
+// }
TEST_F(BulkWriteExecTest, CollectionDroppedBeforeRefreshingTargeters) {
ShardId shardId("shardA");