summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/sharding_util.cpp
diff options
context:
space:
mode:
authorJordi Serra Torrens <jordi.serra-torrens@mongodb.com>2021-02-17 12:54:44 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-02-25 08:49:19 +0000
commit1d3c1ca0ddb2c984ca66381cc39565cbec01c645 (patch)
tree9f3d69748bd8eaee8fa2af56792981f722699769 /src/mongo/db/s/sharding_util.cpp
parentd1a3ee71e82eb35a7f1e1ea6fcee4fa920317346 (diff)
downloadmongo-1d3c1ca0ddb2c984ca66381cc39565cbec01c645.tar.gz
SERVER-53861: Implement stop migrations procedure for DDL operations
Diffstat (limited to 'src/mongo/db/s/sharding_util.cpp')
-rw-r--r--src/mongo/db/s/sharding_util.cpp103
1 files changed, 103 insertions, 0 deletions
diff --git a/src/mongo/db/s/sharding_util.cpp b/src/mongo/db/s/sharding_util.cpp
new file mode 100644
index 00000000000..7f0d5080bdb
--- /dev/null
+++ b/src/mongo/db/s/sharding_util.cpp
@@ -0,0 +1,103 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/sharding_util.h"
+
+#include "mongo/db/commands.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/async_requests_sender.h"
+#include "mongo/s/request_types/flush_routing_table_cache_updates_gen.h"
+
+namespace mongo {
+
+namespace sharding_util {
+
+void tellShardsToRefreshCollection(OperationContext* opCtx,
+ const std::vector<ShardId>& shardIds,
+ const NamespaceString& nss,
+ const std::shared_ptr<executor::TaskExecutor>& executor) {
+ auto cmd = _flushRoutingTableCacheUpdatesWithWriteConcern(nss);
+ cmd.setSyncFromConfig(true);
+ cmd.setDbName(nss.db());
+ auto cmdObj = CommandHelpers::appendMajorityWriteConcern(cmd.toBSON({}));
+
+ sendCommandToShards(opCtx, cmdObj, shardIds, nss, executor);
+}
+
+void sendCommandToShards(OperationContext* opCtx,
+ const BSONObj& command,
+ const std::vector<ShardId>& shardIds,
+ const NamespaceString& nss,
+ const std::shared_ptr<executor::TaskExecutor>& executor) {
+ std::vector<AsyncRequestsSender::Request> requests;
+ for (const auto& shardId : shardIds) {
+ requests.emplace_back(shardId, command);
+ }
+
+ if (!requests.empty()) {
+ // The _flushRoutingTableCacheUpdatesWithWriteConcern command will fail with a
+ // QueryPlanKilled error response if the config.cache.chunks collection is dropped
+ // concurrently. The config.cache.chunks collection is dropped by the shard when it detects
+ // the sharded collection's epoch having changed. We use kIdempotentOrCursorInvalidated so
+ // the ARS automatically retries in that situation.
+ AsyncRequestsSender ars(opCtx,
+ executor,
+ "admin",
+ requests,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kIdempotentOrCursorInvalidated);
+
+ while (!ars.done()) {
+ // Retrieve the responses and throw at the first failure.
+ auto response = ars.next();
+
+ auto generateErrorContext = [&]() -> std::string {
+ return str::stream()
+ << "Unable to _flushRoutingTableCacheUpdatesWithWriteConcern for namespace "
+ << nss.ns() << " on " << response.shardId;
+ };
+
+ auto shardResponse =
+ uassertStatusOKWithContext(std::move(response.swResponse), generateErrorContext());
+
+ auto status = getStatusFromCommandResult(shardResponse.data);
+ uassertStatusOKWithContext(status, generateErrorContext());
+
+ auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
+ uassertStatusOKWithContext(wcStatus, generateErrorContext());
+ }
+ }
+}
+
+} // namespace sharding_util
+} // namespace mongo