From a5e8b7dada69c783873940d6f0723e99480626ad Mon Sep 17 00:00:00 2001
From: kauboy26
Date: Wed, 17 May 2023 19:18:56 +0000
Subject: SERVER-72792 implement basic progress tracking for individual operations in bulkWrite
---
 jstests/sharding/bulk_write_basic.js           | 121 ++++++++++++++++++++++++
 src/mongo/s/write_ops/bulk_write_exec.cpp      | 126 +++++++++++++++++++++----
 src/mongo/s/write_ops/bulk_write_exec.h        |  11 ++-
 src/mongo/s/write_ops/bulk_write_exec_test.cpp | 100 ++++++++++----------
 4 files changed, 292 insertions(+), 66 deletions(-)
 create mode 100644 jstests/sharding/bulk_write_basic.js

diff --git a/jstests/sharding/bulk_write_basic.js b/jstests/sharding/bulk_write_basic.js
new file mode 100644
index 00000000000..385518b2f43
--- /dev/null
+++ b/jstests/sharding/bulk_write_basic.js
@@ -0,0 +1,121 @@
+/*
+ * Tests that bulk write ordered operations succeed on a two-shard cluster with both
+ * sharded and unsharded data.
+ * @tags: [multiversion_incompatible, featureFlagBulkWriteCommand]
+ */
+
+(function() {
+'use strict';
+
+load("jstests/libs/namespace_utils.js");  // getDBNameAndCollNameFromFullNamespace()
+
+const st = new ShardingTest({
+    shards: 2,
+    mongos: 2,
+    config: 1,
+    rs: {nodes: 1},
+    mongosOptions: {setParameter: {logComponentVerbosity: tojson({sharding: 4})}}
+});
+
+function getCollection(ns) {
+    const [dbName, collName] = getDBNameAndCollNameFromFullNamespace(ns);
+    return st.s0.getDB(dbName)[collName];
+}
+
+const banana = "test.banana";
+const orange = "test2.orange";
+
+const staleConfigBananaLog = /7279201.*Noting stale config response.*banana/;
+const staleConfigOrangeLog = /7279201.*Noting stale config response.*orange/;
+const staleDbTest2Log = /7279202.*Noting stale database response.*test2/;
+
+jsTestLog("Case 1: Collection doesn't exist yet.");
+// Case 1: The collection doesn't exist yet. This results in a StaleConfig error on the
+// shards, and consequently mongos and the shards must all refresh. Then mongos needs to
+// retry the bulk operation.
+
+// Connect via the first mongos. We do this so that the second mongos remains unused until
+// a later test case.
+const db_s0 = st.s0.getDB("test");
+assert.commandWorked(db_s0.adminCommand({
+    bulkWrite: 1,
+    ops: [{insert: 0, document: {a: 0}}, {insert: 0, document: {a: 1}}],
+    nsInfo: [{ns: banana}]
+}));
+
+let insertedDocs = getCollection(banana).find({}).toArray();
+assert.eq(2, insertedDocs.length, `Inserted docs: '${tojson(insertedDocs)}'`);
+assert(checkLog.checkContainsOnce(st.s0, staleConfigBananaLog));
+
+jsTestLog("Case 2: The collection exists for some of the writes, but not for others.");
+assert.commandWorked(db_s0.adminCommand({
+    bulkWrite: 1,
+    ops: [
+        {insert: 0, document: {a: 2}},
+        {insert: 1, document: {a: 0}},
+        {insert: 0, document: {a: 3}}
+    ],
+    nsInfo: [{ns: banana}, {ns: orange}]
+}));
+
+insertedDocs = getCollection(banana).find({}).toArray();
+assert.eq(4, insertedDocs.length, `Inserted docs: '${tojson(insertedDocs)}'`);
+insertedDocs = getCollection(orange).find({}).toArray();
+assert.eq(1, insertedDocs.length, `Inserted docs: '${tojson(insertedDocs)}'`);
+assert(checkLog.checkContainsOnce(st.s0, staleConfigOrangeLog));
+
+jsTestLog("Case 3: StaleDbVersion when an unsharded collection moves between shards.");
+const db_s1 = st.s1.getDB("test");
+// Case 3: Move the 'test2' DB back and forth across shards. This will result in bulkWrite
+// getting a StaleDbVersion error. We run this on s1 so s0 doesn't know about the change.
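+// Moving the primary both ways guarantees at least one actual move (and therefore a
+// database version bump) no matter which shard currently holds 'test2'.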
+assert.commandWorked(db_s1.adminCommand({movePrimary: 'test2', to: st.shard0.shardName}));
+assert.commandWorked(db_s1.adminCommand({movePrimary: 'test2', to: st.shard1.shardName}));
+
+// Now run the bulk write command on s0.
+assert.commandWorked(db_s0.adminCommand(
+    {bulkWrite: 1, ops: [{insert: 0, document: {a: 3}}], nsInfo: [{ns: orange}]}));
+insertedDocs = getCollection(orange).find({}).toArray();
+assert.eq(2, insertedDocs.length, `Inserted docs: '${tojson(insertedDocs)}'`);
+assert(checkLog.checkContainsOnce(st.s0, staleDbTest2Log));
+
+jsTestLog("Case 4: The collection is sharded and lives on both shards.");
+// Case 4: Shard the collection and manually move chunks so that they live on
+// both shards. We stop the balancer as well. We do all of this on s0, but then
+// we run a bulk write command through s1, which has a stale view of the cluster.
+assert.commandWorked(st.stopBalancer());
+
+jsTestLog("Shard the collection.");
+assert.commandWorked(getCollection(banana).createIndex({a: 1}));
+assert.commandWorked(db_s0.adminCommand({enableSharding: "test"}));
+assert.commandWorked(db_s0.adminCommand({shardCollection: banana, key: {a: 1}}));
+
+jsTestLog("Create chunks, then move them.");
+assert.commandWorked(db_s0.adminCommand({split: banana, middle: {a: 2}}));
+assert.commandWorked(
+    db_s0.adminCommand({moveChunk: banana, find: {a: 0}, to: st.shard0.shardName}));
+assert.commandWorked(
+    db_s0.adminCommand({moveChunk: banana, find: {a: 3}, to: st.shard1.shardName}));
+
+jsTestLog("Running bulk write command.");
+assert.commandWorked(db_s1.adminCommand({
+    bulkWrite: 1,
+    ops: [
+        {insert: 0, document: {a: -1}},
+        {insert: 1, document: {a: 1}},
+        {insert: 0, document: {a: 4}}
+    ],
+    nsInfo: [{ns: banana}, {ns: orange}]
+}));
+
+insertedDocs = getCollection(banana).find({}).toArray();
+assert.eq(6, insertedDocs.length, `Inserted docs: '${tojson(insertedDocs)}'`);
+insertedDocs = getCollection(orange).find({}).toArray();
+assert.eq(3, insertedDocs.length, `Inserted docs: '${tojson(insertedDocs)}'`);
+
+// checkLog doesn't work in this case because mongos may refresh its routing info before
+// running the bulkWrite command, which means that the logs we're looking for won't get
+// printed. However, since the document counts in the asserts above match, mongos must
+// have routed the bulkWrite command correctly.
+
+st.stop();
+})();
diff --git a/src/mongo/s/write_ops/bulk_write_exec.cpp b/src/mongo/s/write_ops/bulk_write_exec.cpp
index 0af5853acd1..724b46b34ad 100644
--- a/src/mongo/s/write_ops/bulk_write_exec.cpp
+++ b/src/mongo/s/write_ops/bulk_write_exec.cpp
@@ -33,7 +33,10 @@
 #include "mongo/client/read_preference.h"
 #include "mongo/client/remote_command_targeter.h"
 #include "mongo/db/commands/bulk_write_gen.h"
+#include "mongo/db/commands/bulk_write_parser.h"
 #include "mongo/db/database_name.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/ops/write_ops_parsers.h"
 #include "mongo/db/write_concern_options.h"
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/logv2/log.h"
@@ -59,8 +62,9 @@ const int kMaxRoundsWithoutProgress(5);
 
 // Send and process the child batches. Each child batch is targeted at a unique shard: therefore
 // one shard will have only one batch incoming.
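+// Any staleness errors noted while processing the responses are collected per namespace in
+// 'errorsPerNamespace' so that the caller can mark the corresponding targeters as stale.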
 void executeChildBatches(OperationContext* opCtx,
-                         const TargetedBatchMap& childBatches,
-                         const BulkWriteOp& bulkWriteOp) {
+                         TargetedBatchMap& childBatches,
+                         BulkWriteOp& bulkWriteOp,
+                         stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) {
     std::vector<AsyncRequestsSender::Request> requests;
     for (auto& childBatch : childBatches) {
         auto request = [&]() {
@@ -95,6 +99,54 @@ void executeChildBatches(OperationContext* opCtx,
     while (!ars.done()) {
         // Block until a response is available.
         auto response = ars.next();
+
+        Status responseStatus = response.swResponse.getStatus();
+        // TODO (SERVER-76957): The status may not be OK, handle it.
+        invariant(responseStatus.isOK());
+
+        auto bwReply = BulkWriteCommandReply::parse(IDLParserContext("bulkWrite"),
+                                                    response.swResponse.getValue().data);
+
+        // TODO (SERVER-76958): Iterate through the cursor rather than looking only at the
+        // first batch.
+        auto cursor = bwReply.getCursor();
+        const auto& replyItems = cursor.getFirstBatch();
+        TargetedWriteBatch* writeBatch = childBatches.find(response.shardId)->second.get();
+
+        // Capture the errors if any exist and mark the writes in the TargetedWriteBatch so that
+        // they may be re-targeted if needed.
+        bulkWriteOp.noteBatchResponse(*writeBatch, replyItems, errorsPerNamespace);
+    }
+}
+
+void noteStaleResponses(
+    OperationContext* opCtx,
+    const std::vector<std::unique_ptr<NSTargeter>>& targeters,
+    const stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) {
+    for (auto& targeter : targeters) {
+        auto errors = errorsPerNamespace.find(targeter->getNS());
+        if (errors != errorsPerNamespace.cend()) {
+            for (const auto& error : errors->second.getErrors(ErrorCodes::StaleConfig)) {
+                LOGV2_DEBUG(7279201,
+                            4,
+                            "Noting stale config response.",
+                            "shardId"_attr = error.endpoint.shardName,
+                            "status"_attr = error.error.getStatus());
+                targeter->noteStaleShardResponse(
+                    opCtx, error.endpoint, *error.error.getStatus().extraInfo<StaleConfigInfo>());
+            }
+            for (const auto& error : errors->second.getErrors(ErrorCodes::StaleDbVersion)) {
+                LOGV2_DEBUG(7279202,
+                            4,
+                            "Noting stale database response.",
+                            "shardId"_attr = error.endpoint.shardName,
+                            "status"_attr = error.error.getStatus());
+                targeter->noteStaleDbResponse(
+                    opCtx,
+                    error.endpoint,
+                    *error.error.getStatus().extraInfo<StaleDbRoutingVersion>());
+            }
+        }
     }
 }
 
@@ -137,25 +189,23 @@ std::vector<BulkWriteReplyItem> execute(OperationContext* opCtx,
                 targeter->noteCouldNotTarget();
             }
             refreshedTargeter = true;
-        }
+        } else {
+            stdx::unordered_map<NamespaceString, TrackedErrors> errorsPerNamespace;
 
-        // Send the child batches and wait for responses.
-        executeChildBatches(opCtx, childBatches, bulkWriteOp);
+            // Send the child batches and wait for responses.
+            executeChildBatches(opCtx, childBatches, bulkWriteOp, errorsPerNamespace);
 
-        // 3: Abort the batch upon errors for ordered writes or transactions.
-        // TODO(SERVER-72792): Remove the logic below that mimics ok responses and process real
-        // batch responses.
-        for (const auto& childBatch : childBatches) {
-            bulkWriteOp.noteBatchResponse(*childBatch.second);
+            // If we saw any staleness errors, tell the targeters to invalidate their cache
+            // so that they may be refreshed.
+            noteStaleResponses(opCtx, targeters, errorsPerNamespace);
         }
-
-        // 4: Refresh the targeter(s) if we receive a target error or a stale config/db error.
         if (bulkWriteOp.isFinished()) {
             // No need to refresh the targeters if we are done.
             break;
         }
 
+        // Refresh the targeter(s) if we received a target error or a stale config/db error.
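+        // The staleness noted above marks the targeters' routing info as invalid, so this
+        // refresh will fetch up-to-date routing information before the next round of targeting.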
     bool targeterChanged = false;
     try {
         LOGV2_DEBUG(7298200, 2, "Refreshing all targeters for bulkWrite");
@@ -337,11 +387,55 @@ void BulkWriteOp::abortBatch(const Status& status) {
     dassert(isFinished());
 }
 
-// TODO(SERVER-72792): Finish this and process real batch responses.
-void BulkWriteOp::noteBatchResponse(const TargetedWriteBatch& targetedBatch) {
-    for (auto&& write : targetedBatch.getWrites()) {
+void BulkWriteOp::noteBatchResponse(
+    TargetedWriteBatch& targetedBatch,
+    const std::vector<BulkWriteReplyItem>& replyItems,
+    stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) {
+    LOGV2_DEBUG(7279200,
+                4,
+                "Processing bulk write response from shard.",
+                "shard"_attr = targetedBatch.getShardId(),
+                "replyItems"_attr = replyItems);
+    int index = -1;
+    bool ordered = _clientRequest.getOrdered();
+    boost::optional<write_ops::WriteError> lastError;
+    for (const auto& write : targetedBatch.getWrites()) {
+        ++index;
         WriteOp& writeOp = _writeOps[write->writeOpRef.first];
-        writeOp.noteWriteComplete(*write);
+        // TODO (SERVER-76953): Handle unordered operations.
+        // When an error is encountered on an ordered bulk write, it is impossible for any of the
+        // remaining operations to have been executed. For that reason we cancel them here so they
+        // may be retargeted and retried.
+        if (ordered && lastError) {
+            invariant(index >= (int)replyItems.size());
+            writeOp.cancelWrites(&*lastError);
+            continue;
+        }
+
+        auto& reply = replyItems[index];
+
+        if (reply.getStatus().isOK()) {
+            writeOp.noteWriteComplete(*write);
+        } else {
+            lastError.emplace(reply.getIdx(), reply.getStatus());
+            writeOp.noteWriteError(*write, *lastError);
+
+            auto origWrite = BulkWriteCRUDOp(_clientRequest.getOps()[write->writeOpRef.first]);
+            auto nss = _clientRequest.getNsInfo()[origWrite.getNsInfoIdx()].getNs();
+
+            if (errorsPerNamespace.find(nss) == errorsPerNamespace.end()) {
+                TrackedErrors trackedErrors;
+                trackedErrors.startTracking(ErrorCodes::StaleConfig);
+                trackedErrors.startTracking(ErrorCodes::StaleDbVersion);
+                errorsPerNamespace.emplace(nss, trackedErrors);
+            }
+
+            auto trackedErrors = errorsPerNamespace.find(nss);
+            invariant(trackedErrors != errorsPerNamespace.end());
+            if (trackedErrors->second.isTracking(reply.getStatus().code())) {
+                trackedErrors->second.addError(ShardError(write->endpoint, *lastError));
+            }
+        }
     }
 }
 
diff --git a/src/mongo/s/write_ops/bulk_write_exec.h b/src/mongo/s/write_ops/bulk_write_exec.h
index 51d862ad3f9..745312025bc 100644
--- a/src/mongo/s/write_ops/bulk_write_exec.h
+++ b/src/mongo/s/write_ops/bulk_write_exec.h
@@ -32,6 +32,7 @@
 #include "mongo/bson/timestamp.h"
 #include "mongo/client/connection_string.h"
 #include "mongo/db/commands/bulk_write_gen.h"
+#include "mongo/db/commands/bulk_write_parser.h"
 #include "mongo/db/repl/optime.h"
 #include "mongo/s/ns_targeter.h"
 #include "mongo/s/write_ops/batch_write_op.h"
@@ -127,8 +128,14 @@ public:
      */
     void abortBatch(const Status& status);
 
-    // TODO(SERVER-72792): Finish this and process real batch responses.
-    void noteBatchResponse(const TargetedWriteBatch& targetedBatch);
+    /**
+     * Processes the response to a TargetedWriteBatch. The response is captured by the vector of
+     * BulkWriteReplyItems. Sharding-related errors are then grouped by namespace and captured in
+     * the map passed in.
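+     * For ordered writes, any writes that follow the first error are cancelled so that they can
+     * be retargeted and retried on a later round.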
+     */
+    void noteBatchResponse(TargetedWriteBatch& targetedBatch,
+                           const std::vector<BulkWriteReplyItem>& replyItems,
+                           stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace);
 
     /**
      * Returns a vector of BulkWriteReplyItem based on the end state of each individual write in
diff --git a/src/mongo/s/write_ops/bulk_write_exec_test.cpp b/src/mongo/s/write_ops/bulk_write_exec_test.cpp
index 4fa38aabe40..5dcd2ff15e7 100644
--- a/src/mongo/s/write_ops/bulk_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/bulk_write_exec_test.cpp
@@ -905,54 +905,58 @@ public:
     }
 };
 
-TEST_F(BulkWriteExecTest, RefreshTargetersOnTargetErrors) {
-    ShardId shardIdA("shardA");
-    ShardId shardIdB("shardB");
-    NamespaceString nss0("foo.bar");
-    NamespaceString nss1("bar.foo");
-    ShardEndpoint endpoint0(
-        shardIdA, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none);
-    ShardEndpoint endpoint1(
-        shardIdB,
-        ShardVersionFactory::make(ChunkVersion({OID::gen(), Timestamp(2)}, {10, 11}),
-                                  boost::optional<CollectionIndexes>(boost::none)),
-        boost::none);
-
-    std::vector<std::unique_ptr<NSTargeter>> targeters;
-    // Initialize the targeter so that x >= 0 values are untargetable so target call will encounter
-    // an error.
-    targeters.push_back(initTargeterHalfRange(nss0, endpoint0));
-    targeters.push_back(initTargeterFullRange(nss1, endpoint1));
-
-    auto targeter0 = static_cast<BulkWriteMockNSTargeter*>(targeters[0].get());
-    auto targeter1 = static_cast<BulkWriteMockNSTargeter*>(targeters[1].get());
-
-    // Only the first op would get a target error.
-    BulkWriteCommandRequest request(
-        {BulkWriteInsertOp(0, BSON("x" << 1)), BulkWriteInsertOp(1, BSON("x" << 1))},
-        {NamespaceInfoEntry(nss0), NamespaceInfoEntry(nss1)});
-
-    // Test unordered operations. Since only the first op is untargetable, the second op will
-    // succeed without errors. But bulk_write_exec::execute would retry on targeting errors and try
-    // to refresh the targeters upon targeting errors.
-    request.setOrdered(false);
-    auto replyItems = bulk_write_exec::execute(operationContext(), targeters, request);
-    ASSERT_EQUALS(replyItems.size(), 2u);
-    ASSERT_NOT_OK(replyItems[0].getStatus());
-    ASSERT_OK(replyItems[1].getStatus());
-    ASSERT_EQUALS(targeter0->getNumRefreshes(), 1);
-    ASSERT_EQUALS(targeter1->getNumRefreshes(), 1);
-
-    // Test ordered operations. This is mostly the same as the test case above except that we
-    // should only return the first error for ordered operations.
-    request.setOrdered(true);
-    replyItems = bulk_write_exec::execute(operationContext(), targeters, request);
-    ASSERT_EQUALS(replyItems.size(), 1u);
-    ASSERT_NOT_OK(replyItems[0].getStatus());
-    // We should have another refresh attempt.
-    ASSERT_EQUALS(targeter0->getNumRefreshes(), 2);
-    ASSERT_EQUALS(targeter1->getNumRefreshes(), 2);
-}
+// TODO (SERVER-76953): Uncomment after mongos can handle targeting errors in unordered ops.
+// TEST_F(BulkWriteExecTest, RefreshTargetersOnTargetErrors) {
+//     ShardId shardIdA("shardA");
+//     ShardId shardIdB("shardB");
+//     NamespaceString nss0("foo.bar");
+//     NamespaceString nss1("bar.foo");
+//     ShardEndpoint endpoint0(
+//         shardIdA, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none);
+//     ShardEndpoint endpoint1(
+//         shardIdB,
+//         ShardVersionFactory::make(ChunkVersion({OID::gen(), Timestamp(2)}, {10, 11}),
+//                                   boost::optional<CollectionIndexes>(boost::none)),
+//         boost::none);
+
+//     std::vector<std::unique_ptr<NSTargeter>> targeters;
+//     // Initialize the targeter so that x >= 0 values are untargetable, so the target call will
+//     // encounter an error.
+//     targeters.push_back(initTargeterHalfRange(nss0, endpoint0));
+//     targeters.push_back(initTargeterFullRange(nss1, endpoint1));

+//     auto targeter0 = static_cast<BulkWriteMockNSTargeter*>(targeters[0].get());
+//     auto targeter1 = static_cast<BulkWriteMockNSTargeter*>(targeters[1].get());

+//     // Only the first op would get a target error.
+//     BulkWriteCommandRequest request(
+//         {BulkWriteInsertOp(0, BSON("x" << 1)), BulkWriteInsertOp(1, BSON("x" << 1))},
+//         {NamespaceInfoEntry(nss0), NamespaceInfoEntry(nss1)});

+//     // Test unordered operations. Since only the first op is untargetable, the second op will
+//     // succeed without errors. But bulk_write_exec::execute would retry on targeting errors
+//     // and try to refresh the targeters upon targeting errors.
+//     request.setOrdered(false);
+//     auto replyItems = bulk_write_exec::execute(operationContext(), targeters, request);
+//     ASSERT_EQUALS(replyItems.size(), 2u);
+//     ASSERT_NOT_OK(replyItems[0].getStatus());
+//     ASSERT_OK(replyItems[1].getStatus());
+//     ASSERT_EQUALS(targeter0->getNumRefreshes(), 1);
+//     ASSERT_EQUALS(targeter1->getNumRefreshes(), 1);

+//     // Test ordered operations. This is mostly the same as the test case above except that we
+//     // should only return the first error for ordered operations.
+//     request.setOrdered(true);
+//     replyItems = bulk_write_exec::execute(operationContext(), targeters, request);
+//     ASSERT_EQUALS(replyItems.size(), 1u);
+//     ASSERT_NOT_OK(replyItems[0].getStatus());
+//     // We should have another refresh attempt.
+//     ASSERT_EQUALS(targeter0->getNumRefreshes(), 2);
+//     ASSERT_EQUALS(targeter1->getNumRefreshes(), 2);
+// }
 
 TEST_F(BulkWriteExecTest, CollectionDroppedBeforeRefreshingTargeters) {
     ShardId shardId("shardA");
-- 
cgit v1.2.1