summaryrefslogtreecommitdiff
path: root/src/mongo/s/write_ops/bulk_write_exec.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/write_ops/bulk_write_exec.cpp')
-rw-r--r--src/mongo/s/write_ops/bulk_write_exec.cpp126
1 file changed, 110 insertions, 16 deletions
diff --git a/src/mongo/s/write_ops/bulk_write_exec.cpp b/src/mongo/s/write_ops/bulk_write_exec.cpp
index 0af5853acd1..724b46b34ad 100644
--- a/src/mongo/s/write_ops/bulk_write_exec.cpp
+++ b/src/mongo/s/write_ops/bulk_write_exec.cpp
@@ -33,7 +33,10 @@
#include "mongo/client/read_preference.h"
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/commands/bulk_write_gen.h"
+#include "mongo/db/commands/bulk_write_parser.h"
#include "mongo/db/database_name.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/ops/write_ops_parsers.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
@@ -59,8 +62,9 @@ const int kMaxRoundsWithoutProgress(5);
// Send and process the child batches. Each child batch is targeted at a unique shard: therefore
// one shard will have only one batch incoming.
void executeChildBatches(OperationContext* opCtx,
- const TargetedBatchMap& childBatches,
- const BulkWriteOp& bulkWriteOp) {
+ TargetedBatchMap& childBatches,
+ BulkWriteOp& bulkWriteOp,
+ // Out-param: staleness errors collected per namespace, consumed later by
+ // noteStaleResponses() to invalidate the targeters' routing caches.
+ stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) {
std::vector<AsyncRequestsSender::Request> requests;
for (auto& childBatch : childBatches) {
auto request = [&]() {
@@ -95,6 +99,54 @@ void executeChildBatches(OperationContext* opCtx,
while (!ars.done()) {
// Block until a response is available.
auto response = ars.next();
+
+ Status responseStatus = response.swResponse.getStatus();
+ // TODO (SERVER-76957): The status may not be OK, handle it.
+ invariant(responseStatus.isOK());
+
+ // Parse the raw shard response into the IDL-generated bulkWrite reply type.
+ auto bwReply = BulkWriteCommandReply::parse(IDLParserContext("bulkWrite"),
+ response.swResponse.getValue().data);
+
+ // TODO (SERVER-76958): Iterate through the cursor rather than looking only at the
+ // first batch.
+ auto cursor = bwReply.getCursor();
+ const auto& replyItems = cursor.getFirstBatch();
+ // Safe to dereference without checking find(): each shard was sent exactly one
+ // child batch (see the one-batch-per-shard note above this function), so every
+ // responding shardId must have an entry in childBatches.
+ TargetedWriteBatch* writeBatch = childBatches.find(response.shardId)->second.get();
+
+ // Capture the errors if any exist and mark the writes in the TargetedWriteBatch so that
+ // they may be re-targeted if needed.
+ bulkWriteOp.noteBatchResponse(*writeBatch, replyItems, errorsPerNamespace);
+ }
+}
+
+// For each targeter, replays any StaleConfig / StaleDbVersion errors that were collected
+// for its namespace during batch execution, so the targeter can invalidate its cached
+// routing info and be refreshed before the next targeting round.
+void noteStaleResponses(
+ OperationContext* opCtx,
+ const std::vector<std::unique_ptr<NSTargeter>>& targeters,
+ const stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) {
+ for (auto& targeter : targeters) {
+ // Errors are keyed by namespace; a targeter with no recorded errors is skipped.
+ auto errors = errorsPerNamespace.find(targeter->getNS());
+ if (errors != errorsPerNamespace.cend()) {
+ for (const auto& error : errors->second.getErrors(ErrorCodes::StaleConfig)) {
+ LOGV2_DEBUG(7279201,
+ 4,
+ "Noting stale config response.",
+ "shardId"_attr = error.endpoint.shardName,
+ "status"_attr = error.error.getStatus());
+ // NOTE(review): assumes a tracked StaleConfig error always carries
+ // StaleConfigInfo extra info — the dereference would crash otherwise; confirm.
+ targeter->noteStaleShardResponse(
+ opCtx, error.endpoint, *error.error.getStatus().extraInfo<StaleConfigInfo>());
+ }
+ for (const auto& error : errors->second.getErrors(ErrorCodes::StaleDbVersion)) {
+ LOGV2_DEBUG(7279202,
+ 4,
+ "Noting stale database response.",
+ "shardId"_attr = error.endpoint.shardName,
+ "status"_attr = error.error.getStatus());
+ targeter->noteStaleDbResponse(
+ opCtx,
+ error.endpoint,
+ *error.error.getStatus().extraInfo<StaleDbRoutingVersion>());
+ }
+ }
}
}
@@ -137,25 +189,23 @@ std::vector<BulkWriteReplyItem> execute(OperationContext* opCtx,
targeter->noteCouldNotTarget();
}
refreshedTargeter = true;
- }
+ } else {
+ stdx::unordered_map<NamespaceString, TrackedErrors> errorsPerNamespace;
- // Send the child batches and wait for responses.
- executeChildBatches(opCtx, childBatches, bulkWriteOp);
+ // Send the child batches and wait for responses.
+ executeChildBatches(opCtx, childBatches, bulkWriteOp, errorsPerNamespace);
- // 3: Abort the batch upon errors for ordered writes or transactions.
- // TODO(SERVER-72792): Remove the logic below that mimics ok responses and process real
- // batch responses.
- for (const auto& childBatch : childBatches) {
- bulkWriteOp.noteBatchResponse(*childBatch.second);
+ // If we saw any staleness errors, tell the targeters to invalidate their cache
+ // so that they may be refreshed.
+ noteStaleResponses(opCtx, targeters, errorsPerNamespace);
}
-
- // 4: Refresh the targeter(s) if we receive a target error or a stale config/db error.
if (bulkWriteOp.isFinished()) {
// No need to refresh the targeters if we are done.
break;
}
+ // Refresh the targeter(s) if we received a target error or a stale config/db error.
bool targeterChanged = false;
try {
LOGV2_DEBUG(7298200, 2, "Refreshing all targeters for bulkWrite");
@@ -337,11 +387,55 @@ void BulkWriteOp::abortBatch(const Status& status) {
dassert(isFinished());
}
-// TODO(SERVER-72792): Finish this and process real batch responses.
-void BulkWriteOp::noteBatchResponse(const TargetedWriteBatch& targetedBatch) {
- for (auto&& write : targetedBatch.getWrites()) {
+// Records one shard's reply items against the writes of the targeted batch: marks each
+// write complete or errored, cancels the remaining writes of an ordered batch after the
+// first error (so they can be retargeted/retried), and collects StaleConfig /
+// StaleDbVersion errors per namespace for the caller to feed to noteStaleResponses().
+void BulkWriteOp::noteBatchResponse(
+ TargetedWriteBatch& targetedBatch,
+ const std::vector<BulkWriteReplyItem>& replyItems,
+ stdx::unordered_map<NamespaceString, TrackedErrors>& errorsPerNamespace) {
+ LOGV2_DEBUG(7279200,
+ 4,
+ "Processing bulk write response from shard.",
+ "shard"_attr = targetedBatch.getShardId(),
+ "replyItems"_attr = replyItems);
+ // Position of the current write within this shard's batch; reply items are positional.
+ int index = -1;
+ bool ordered = _clientRequest.getOrdered();
+ boost::optional<write_ops::WriteError> lastError;
+ for (const auto& write : targetedBatch.getWrites()) {
+ ++index;
WriteOp& writeOp = _writeOps[write->writeOpRef.first];
- writeOp.noteWriteComplete(*write);
+ // TODO (SERVER-76953) : Handle unordered operations
+ // When an error is encountered on an ordered bulk write, it is impossible for any of the
+ // remaining operations to have been executed. For that reason we cancel them here so they
+ // may be retargeted and retried.
+ if (ordered && lastError) {
+ // The shard stops at the first error, so no reply item can exist past it.
+ invariant(index >= (int)replyItems.size());
+ writeOp.cancelWrites(&*lastError);
+ continue;
+ }
+
+ auto& reply = replyItems[index];
+
+ if (reply.getStatus().isOK()) {
+ writeOp.noteWriteComplete(*write);
+ } else {
+ lastError.emplace(reply.getIdx(), reply.getStatus());
+ writeOp.noteWriteError(*write, *lastError);
+
+ // Map the failed write back to its namespace via the original client request.
+ auto origWrite = BulkWriteCRUDOp(_clientRequest.getOps()[write->writeOpRef.first]);
+ auto nss = _clientRequest.getNsInfo()[origWrite.getNsInfoIdx()].getNs();
+
+ // Lazily create the per-namespace tracker, tracking only the staleness codes
+ // that trigger a routing refresh.
+ if (errorsPerNamespace.find(nss) == errorsPerNamespace.end()) {
+ TrackedErrors trackedErrors;
+ trackedErrors.startTracking(ErrorCodes::StaleConfig);
+ trackedErrors.startTracking(ErrorCodes::StaleDbVersion);
+ errorsPerNamespace.emplace(nss, trackedErrors);
+ }
+
+ auto trackedErrors = errorsPerNamespace.find(nss);
+ invariant(trackedErrors != errorsPerNamespace.end());
+ // Only staleness errors are recorded; other error codes are ignored here.
+ if (trackedErrors->second.isTracking(reply.getStatus().code())) {
+ trackedErrors->second.addError(ShardError(write->endpoint, *lastError));
+ }
+ }
}
}