diff options
author | kauboy26 <vishnu.kaushik@mongodb.com> | 2023-05-17 19:18:56 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-17 20:37:02 +0000 |
commit | a5e8b7dada69c783873940d6f0723e99480626ad (patch) | |
tree | b006ffc941a68f82f77263b5a424791a31e3e175 /src/mongo/s/write_ops | |
parent | d2a24d73cb763b1564c8907bf293c467aabb5b6c (diff) | |
download | mongo-a5e8b7dada69c783873940d6f0723e99480626ad.tar.gz |
SERVER-72792 implement basic progress tracking for individual operations in bulkWrite
Diffstat (limited to 'src/mongo/s/write_ops')
-rw-r--r-- | src/mongo/s/write_ops/bulk_write_exec.cpp | 126 | ||||
-rw-r--r-- | src/mongo/s/write_ops/bulk_write_exec.h | 11 | ||||
-rw-r--r-- | src/mongo/s/write_ops/bulk_write_exec_test.cpp | 100 |
3 files changed, 171 insertions, 66 deletions
diff --git a/src/mongo/s/write_ops/bulk_write_exec.cpp b/src/mongo/s/write_ops/bulk_write_exec.cpp index 0af5853acd1..724b46b34ad 100644 --- a/src/mongo/s/write_ops/bulk_write_exec.cpp +++ b/src/mongo/s/write_ops/bulk_write_exec.cpp @@ -33,7 +33,10 @@ #include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/commands/bulk_write_gen.h" +#include "mongo/db/commands/bulk_write_parser.h" #include "mongo/db/database_name.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/ops/write_ops_parsers.h" #include "mongo/db/write_concern_options.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/log.h" @@ -59,8 +62,9 @@ const int kMaxRoundsWithoutProgress(5); // Send and process the child batches. Each child batch is targeted at a unique shard: therefore // one shard will have only one batch incoming. void executeChildBatches(OperationContext* opCtx, - const TargetedBatchMap& childBatches, - const BulkWriteOp& bulkWriteOp) { + TargetedBatchMap& childBatches, + BulkWriteOp& bulkWriteOp, + stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) { std::vector<AsyncRequestsSender::Request> requests; for (auto& childBatch : childBatches) { auto request = [&]() { @@ -95,6 +99,54 @@ void executeChildBatches(OperationContext* opCtx, while (!ars.done()) { // Block until a response is available. auto response = ars.next(); + + Status responseStatus = response.swResponse.getStatus(); + // TODO (SERVER-76957): The status may not be OK, handle it. + invariant(responseStatus.isOK()); + + auto bwReply = BulkWriteCommandReply::parse(IDLParserContext("bulkWrite"), + response.swResponse.getValue().data); + + // TODO (SERVER-76958): Iterate through the cursor rather than looking only at the + // first batch. + auto cursor = bwReply.getCursor(); + const auto& replyItems = cursor.getFirstBatch(); + TargetedWriteBatch* writeBatch = childBatches.find(response.shardId)->second.get(); + + // Capture the errors if any exist and mark the writes in the TargetedWriteBatch so that + // they may be re-targeted if needed. + bulkWriteOp.noteBatchResponse(*writeBatch, replyItems, errorsPerNamespace); + } +} + +void noteStaleResponses( + OperationContext* opCtx, + const std::vector<std::unique_ptr<NSTargeter>>& targeters, + const stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) { + for (auto& targeter : targeters) { + auto errors = errorsPerNamespace.find(targeter->getNS()); + if (errors != errorsPerNamespace.cend()) { + for (const auto& error : errors->second.getErrors(ErrorCodes::StaleConfig)) { + LOGV2_DEBUG(7279201, + 4, + "Noting stale config response.", + "shardId"_attr = error.endpoint.shardName, + "status"_attr = error.error.getStatus()); + targeter->noteStaleShardResponse( + opCtx, error.endpoint, *error.error.getStatus().extraInfo<StaleConfigInfo>()); + } + for (const auto& error : errors->second.getErrors(ErrorCodes::StaleDbVersion)) { + LOGV2_DEBUG(7279202, + 4, + "Noting stale database response.", + "shardId"_attr = error.endpoint.shardName, + "status"_attr = error.error.getStatus()); + targeter->noteStaleDbResponse( + opCtx, + error.endpoint, + *error.error.getStatus().extraInfo<StaleDbRoutingVersion>()); + } + } } } @@ -137,25 +189,23 @@ std::vector<BulkWriteReplyItem> execute(OperationContext* opCtx, targeter->noteCouldNotTarget(); } refreshedTargeter = true; - } + } else { + stdx::unordered_map<NamespaceString, TrackedErrors> errorsPerNamespace; - // Send the child batches and wait for responses. - executeChildBatches(opCtx, childBatches, bulkWriteOp); + // Send the child batches and wait for responses. + executeChildBatches(opCtx, childBatches, bulkWriteOp, errorsPerNamespace); - // 3: Abort the batch upon errors for ordered writes or transactions. - // TODO(SERVER-72792): Remove the logic below that mimics ok responses and process real - // batch responses. - for (const auto& childBatch : childBatches) { - bulkWriteOp.noteBatchResponse(*childBatch.second); + // If we saw any staleness errors, tell the targeters to invalidate their cache + // so that they may be refreshed. + noteStaleResponses(opCtx, targeters, errorsPerNamespace); } - - // 4: Refresh the targeter(s) if we receive a target error or a stale config/db error. if (bulkWriteOp.isFinished()) { // No need to refresh the targeters if we are done. break; } + // Refresh the targeter(s) if we received a target error or a stale config/db error. bool targeterChanged = false; try { LOGV2_DEBUG(7298200, 2, "Refreshing all targeters for bulkWrite"); @@ -337,11 +387,55 @@ void BulkWriteOp::abortBatch(const Status& status) { dassert(isFinished()); } -// TODO(SERVER-72792): Finish this and process real batch responses. -void BulkWriteOp::noteBatchResponse(const TargetedWriteBatch& targetedBatch) { - for (auto&& write : targetedBatch.getWrites()) { +void BulkWriteOp::noteBatchResponse( + TargetedWriteBatch& targetedBatch, + const std::vector<BulkWriteReplyItem>& replyItems, + stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) { + LOGV2_DEBUG(7279200, + 4, + "Processing bulk write response from shard.", + "shard"_attr = targetedBatch.getShardId(), + "replyItems"_attr = replyItems); + int index = -1; + bool ordered = _clientRequest.getOrdered(); + boost::optional<write_ops::WriteError> lastError; + for (const auto& write : targetedBatch.getWrites()) { + ++index; WriteOp& writeOp = _writeOps[write->writeOpRef.first]; - writeOp.noteWriteComplete(*write); + // TODO (SERVER-76953) : Handle unordered operations + // When an error is encountered on an ordered bulk write, it is impossible for any of the + // remaining operations to have been executed. For that reason we cancel them here so they + // may be retargeted and retried. + if (ordered && lastError) { + invariant(index >= (int)replyItems.size()); + writeOp.cancelWrites(&*lastError); + continue; + } + + auto& reply = replyItems[index]; + + if (reply.getStatus().isOK()) { + writeOp.noteWriteComplete(*write); + } else { + lastError.emplace(reply.getIdx(), reply.getStatus()); + writeOp.noteWriteError(*write, *lastError); + + auto origWrite = BulkWriteCRUDOp(_clientRequest.getOps()[write->writeOpRef.first]); + auto nss = _clientRequest.getNsInfo()[origWrite.getNsInfoIdx()].getNs(); + + if (errorsPerNamespace.find(nss) == errorsPerNamespace.end()) { + TrackedErrors trackedErrors; + trackedErrors.startTracking(ErrorCodes::StaleConfig); + trackedErrors.startTracking(ErrorCodes::StaleDbVersion); + errorsPerNamespace.emplace(nss, trackedErrors); + } + + auto trackedErrors = errorsPerNamespace.find(nss); + invariant(trackedErrors != errorsPerNamespace.end()); + if (trackedErrors->second.isTracking(reply.getStatus().code())) { + trackedErrors->second.addError(ShardError(write->endpoint, *lastError)); + } + } } } diff --git a/src/mongo/s/write_ops/bulk_write_exec.h b/src/mongo/s/write_ops/bulk_write_exec.h index 51d862ad3f9..745312025bc 100644 --- a/src/mongo/s/write_ops/bulk_write_exec.h +++ b/src/mongo/s/write_ops/bulk_write_exec.h @@ -32,6 +32,7 @@ #include "mongo/bson/timestamp.h" #include "mongo/client/connection_string.h" #include "mongo/db/commands/bulk_write_gen.h" +#include "mongo/db/commands/bulk_write_parser.h" #include "mongo/db/repl/optime.h" #include "mongo/s/ns_targeter.h" #include "mongo/s/write_ops/batch_write_op.h" @@ -127,8 +128,14 @@ public: */ void abortBatch(const Status& status); - // TODO(SERVER-72792): Finish this and process real batch responses. - void noteBatchResponse(const TargetedWriteBatch& targetedBatch); + /** + * Processes the response to a TargetedWriteBatch. The response is captured by the vector of + * BulkWriteReplyItems. Sharding related errors are then grouped by namespace and captured in + * the map passed in. + */ + void noteBatchResponse(TargetedWriteBatch& targetedBatch, + const std::vector<BulkWriteReplyItem>& replyItems, + stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace); /** * Returns a vector of BulkWriteReplyItem based on the end state of each individual write in diff --git a/src/mongo/s/write_ops/bulk_write_exec_test.cpp b/src/mongo/s/write_ops/bulk_write_exec_test.cpp index 4fa38aabe40..5dcd2ff15e7 100644 --- a/src/mongo/s/write_ops/bulk_write_exec_test.cpp +++ b/src/mongo/s/write_ops/bulk_write_exec_test.cpp @@ -905,54 +905,58 @@ public: } }; -TEST_F(BulkWriteExecTest, RefreshTargetersOnTargetErrors) { - ShardId shardIdA("shardA"); - ShardId shardIdB("shardB"); - NamespaceString nss0("foo.bar"); - NamespaceString nss1("bar.foo"); - ShardEndpoint endpoint0( - shardIdA, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none); - ShardEndpoint endpoint1( - shardIdB, - ShardVersionFactory::make(ChunkVersion({OID::gen(), Timestamp(2)}, {10, 11}), - boost::optional<CollectionIndexes>(boost::none)), - boost::none); - - std::vector<std::unique_ptr<NSTargeter>> targeters; - // Initialize the targeter so that x >= 0 values are untargetable so target call will encounter - // an error. - targeters.push_back(initTargeterHalfRange(nss0, endpoint0)); - targeters.push_back(initTargeterFullRange(nss1, endpoint1)); - - auto targeter0 = static_cast<BulkWriteMockNSTargeter*>(targeters[0].get()); - auto targeter1 = static_cast<BulkWriteMockNSTargeter*>(targeters[1].get()); - - // Only the first op would get a target error. - BulkWriteCommandRequest request( - {BulkWriteInsertOp(0, BSON("x" << 1)), BulkWriteInsertOp(1, BSON("x" << 1))}, - {NamespaceInfoEntry(nss0), NamespaceInfoEntry(nss1)}); - - // Test unordered operations. Since only the first op is untargetable, the second op will - // succeed without errors. But bulk_write_exec::execute would retry on targeting errors and try - // to refresh the targeters upon targeting errors. - request.setOrdered(false); - auto replyItems = bulk_write_exec::execute(operationContext(), targeters, request); - ASSERT_EQUALS(replyItems.size(), 2u); - ASSERT_NOT_OK(replyItems[0].getStatus()); - ASSERT_OK(replyItems[1].getStatus()); - ASSERT_EQUALS(targeter0->getNumRefreshes(), 1); - ASSERT_EQUALS(targeter1->getNumRefreshes(), 1); - - // Test ordered operations. This is mostly the same as the test case above except that we should - // only return the first error for ordered operations. - request.setOrdered(true); - replyItems = bulk_write_exec::execute(operationContext(), targeters, request); - ASSERT_EQUALS(replyItems.size(), 1u); - ASSERT_NOT_OK(replyItems[0].getStatus()); - // We should have another refresh attempt. - ASSERT_EQUALS(targeter0->getNumRefreshes(), 2); - ASSERT_EQUALS(targeter1->getNumRefreshes(), 2); -} +// TODO (SERVER-76953): Uncomment after mongos can handle targeting errors in unordered ops. +// TEST_F(BulkWriteExecTest, RefreshTargetersOnTargetErrors) { +// ShardId shardIdA("shardA"); +// ShardId shardIdB("shardB"); +// NamespaceString nss0("foo.bar"); +// NamespaceString nss1("bar.foo"); +// ShardEndpoint endpoint0( +// shardIdA, ShardVersionFactory::make(ChunkVersion::IGNORED(), boost::none), boost::none); +// ShardEndpoint endpoint1( +// shardIdB, +// ShardVersionFactory::make(ChunkVersion({OID::gen(), Timestamp(2)}, {10, 11}), +// boost::optional<CollectionIndexes>(boost::none)), +// boost::none); + +// std::vector<std::unique_ptr<NSTargeter>> targeters; +// // Initialize the targeter so that x >= 0 values are untargetable so target call will +// encounter +// // an error. +// targeters.push_back(initTargeterHalfRange(nss0, endpoint0)); +// targeters.push_back(initTargeterFullRange(nss1, endpoint1)); + +// auto targeter0 = static_cast<BulkWriteMockNSTargeter*>(targeters[0].get()); +// auto targeter1 = static_cast<BulkWriteMockNSTargeter*>(targeters[1].get()); + +// // Only the first op would get a target error. +// BulkWriteCommandRequest request( +// {BulkWriteInsertOp(0, BSON("x" << 1)), BulkWriteInsertOp(1, BSON("x" << 1))}, +// {NamespaceInfoEntry(nss0), NamespaceInfoEntry(nss1)}); + +// // Test unordered operations. Since only the first op is untargetable, the second op will +// // succeed without errors. But bulk_write_exec::execute would retry on targeting errors and +// try +// // to refresh the targeters upon targeting errors. +// request.setOrdered(false); +// auto replyItems = bulk_write_exec::execute(operationContext(), targeters, request); +// ASSERT_EQUALS(replyItems.size(), 2u); +// ASSERT_NOT_OK(replyItems[0].getStatus()); +// ASSERT_OK(replyItems[1].getStatus()); +// ASSERT_EQUALS(targeter0->getNumRefreshes(), 1); +// ASSERT_EQUALS(targeter1->getNumRefreshes(), 1); + +// // Test ordered operations. This is mostly the same as the test case above except that we +// should +// // only return the first error for ordered operations. +// request.setOrdered(true); +// replyItems = bulk_write_exec::execute(operationContext(), targeters, request); +// ASSERT_EQUALS(replyItems.size(), 1u); +// ASSERT_NOT_OK(replyItems[0].getStatus()); +// // We should have another refresh attempt. +// ASSERT_EQUALS(targeter0->getNumRefreshes(), 2); +// ASSERT_EQUALS(targeter1->getNumRefreshes(), 2); +// } TEST_F(BulkWriteExecTest, CollectionDroppedBeforeRefreshingTargeters) { ShardId shardId("shardA"); |