diff options
Diffstat (limited to 'src/mongo/s/write_ops/bulk_write_exec.cpp')
-rw-r--r-- | src/mongo/s/write_ops/bulk_write_exec.cpp | 126 |
1 files changed, 110 insertions, 16 deletions
diff --git a/src/mongo/s/write_ops/bulk_write_exec.cpp b/src/mongo/s/write_ops/bulk_write_exec.cpp index 0af5853acd1..724b46b34ad 100644 --- a/src/mongo/s/write_ops/bulk_write_exec.cpp +++ b/src/mongo/s/write_ops/bulk_write_exec.cpp @@ -33,7 +33,10 @@ #include "mongo/client/read_preference.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/commands/bulk_write_gen.h" +#include "mongo/db/commands/bulk_write_parser.h" #include "mongo/db/database_name.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/ops/write_ops_parsers.h" #include "mongo/db/write_concern_options.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/log.h" @@ -59,8 +62,9 @@ const int kMaxRoundsWithoutProgress(5); // Send and process the child batches. Each child batch is targeted at a unique shard: therefore // one shard will have only one batch incoming. void executeChildBatches(OperationContext* opCtx, - const TargetedBatchMap& childBatches, - const BulkWriteOp& bulkWriteOp) { + TargetedBatchMap& childBatches, + BulkWriteOp& bulkWriteOp, + stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) { std::vector<AsyncRequestsSender::Request> requests; for (auto& childBatch : childBatches) { auto request = [&]() { @@ -95,6 +99,54 @@ void executeChildBatches(OperationContext* opCtx, while (!ars.done()) { // Block until a response is available. auto response = ars.next(); + + Status responseStatus = response.swResponse.getStatus(); + // TODO (SERVER-76957): The status may not be OK, handle it. + invariant(responseStatus.isOK()); + + auto bwReply = BulkWriteCommandReply::parse(IDLParserContext("bulkWrite"), + response.swResponse.getValue().data); + + // TODO (SERVER-76958): Iterate through the cursor rather than looking only at the + // first batch. + auto cursor = bwReply.getCursor(); + const auto& replyItems = cursor.getFirstBatch(); + TargetedWriteBatch* writeBatch = childBatches.find(response.shardId)->second.get(); + + // Capture the errors if any exist and mark the writes in the TargetedWriteBatch so that + // they may be re-targeted if needed. + bulkWriteOp.noteBatchResponse(*writeBatch, replyItems, errorsPerNamespace); + } +} + +void noteStaleResponses( + OperationContext* opCtx, + const std::vector<std::unique_ptr<NSTargeter>>& targeters, + const stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) { + for (auto& targeter : targeters) { + auto errors = errorsPerNamespace.find(targeter->getNS()); + if (errors != errorsPerNamespace.cend()) { + for (const auto& error : errors->second.getErrors(ErrorCodes::StaleConfig)) { + LOGV2_DEBUG(7279201, + 4, + "Noting stale config response.", + "shardId"_attr = error.endpoint.shardName, + "status"_attr = error.error.getStatus()); + targeter->noteStaleShardResponse( + opCtx, error.endpoint, *error.error.getStatus().extraInfo<StaleConfigInfo>()); + } + for (const auto& error : errors->second.getErrors(ErrorCodes::StaleDbVersion)) { + LOGV2_DEBUG(7279202, + 4, + "Noting stale database response.", + "shardId"_attr = error.endpoint.shardName, + "status"_attr = error.error.getStatus()); + targeter->noteStaleDbResponse( + opCtx, + error.endpoint, + *error.error.getStatus().extraInfo<StaleDbRoutingVersion>()); + } + } } } @@ -137,25 +189,23 @@ std::vector<BulkWriteReplyItem> execute(OperationContext* opCtx, targeter->noteCouldNotTarget(); } refreshedTargeter = true; - } + } else { + stdx::unordered_map<NamespaceString, TrackedErrors> errorsPerNamespace; - // Send the child batches and wait for responses. - executeChildBatches(opCtx, childBatches, bulkWriteOp); + // Send the child batches and wait for responses. + executeChildBatches(opCtx, childBatches, bulkWriteOp, errorsPerNamespace); - // 3: Abort the batch upon errors for ordered writes or transactions. - // TODO(SERVER-72792): Remove the logic below that mimics ok responses and process real - // batch responses. - for (const auto& childBatch : childBatches) { - bulkWriteOp.noteBatchResponse(*childBatch.second); + // If we saw any staleness errors, tell the targeters to invalidate their cache + // so that they may be refreshed. + noteStaleResponses(opCtx, targeters, errorsPerNamespace); } - - // 4: Refresh the targeter(s) if we receive a target error or a stale config/db error. if (bulkWriteOp.isFinished()) { // No need to refresh the targeters if we are done. break; } + // Refresh the targeter(s) if we received a target error or a stale config/db error. bool targeterChanged = false; try { LOGV2_DEBUG(7298200, 2, "Refreshing all targeters for bulkWrite"); @@ -337,11 +387,55 @@ void BulkWriteOp::abortBatch(const Status& status) { dassert(isFinished()); } -// TODO(SERVER-72792): Finish this and process real batch responses. -void BulkWriteOp::noteBatchResponse(const TargetedWriteBatch& targetedBatch) { - for (auto&& write : targetedBatch.getWrites()) { +void BulkWriteOp::noteBatchResponse( + TargetedWriteBatch& targetedBatch, + const std::vector<BulkWriteReplyItem>& replyItems, + stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) { + LOGV2_DEBUG(7279200, + 4, + "Processing bulk write response from shard.", + "shard"_attr = targetedBatch.getShardId(), + "replyItems"_attr = replyItems); + int index = -1; + bool ordered = _clientRequest.getOrdered(); + boost::optional<write_ops::WriteError> lastError; + for (const auto& write : targetedBatch.getWrites()) { + ++index; WriteOp& writeOp = _writeOps[write->writeOpRef.first]; - writeOp.noteWriteComplete(*write); + // TODO (SERVER-76953) : Handle unordered operations + // When an error is encountered on an ordered bulk write, it is impossible for any of the + // remaining operations to have been executed. For that reason we cancel them here so they + // may be retargeted and retried. + if (ordered && lastError) { + invariant(index >= (int)replyItems.size()); + writeOp.cancelWrites(&*lastError); + continue; + } + + auto& reply = replyItems[index]; + + if (reply.getStatus().isOK()) { + writeOp.noteWriteComplete(*write); + } else { + lastError.emplace(reply.getIdx(), reply.getStatus()); + writeOp.noteWriteError(*write, *lastError); + + auto origWrite = BulkWriteCRUDOp(_clientRequest.getOps()[write->writeOpRef.first]); + auto nss = _clientRequest.getNsInfo()[origWrite.getNsInfoIdx()].getNs(); + + if (errorsPerNamespace.find(nss) == errorsPerNamespace.end()) { + TrackedErrors trackedErrors; + trackedErrors.startTracking(ErrorCodes::StaleConfig); + trackedErrors.startTracking(ErrorCodes::StaleDbVersion); + errorsPerNamespace.emplace(nss, trackedErrors); + } + + auto trackedErrors = errorsPerNamespace.find(nss); + invariant(trackedErrors != errorsPerNamespace.end()); + if (trackedErrors->second.isTracking(reply.getStatus().code())) { + trackedErrors->second.addError(ShardError(write->endpoint, *lastError)); + } + } } } |