summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2020-08-18 20:27:42 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-08-28 19:16:19 +0000
commit0816d7c7cb8ba73aaf2780eb9b295509e5b4b9f7 (patch)
treea360c82d8ee0f943be69be4634b0173445c1d23c /src
parent7bc9d6f843b1ca2df2bdc00a895d825acda4f446 (diff)
downloadmongo-0816d7c7cb8ba73aaf2780eb9b295509e5b4b9f7.tar.gz
SERVER-49567 Create refresh mechanism to refresh a list of given shards
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/resharding_util.cpp51
-rw-r--r--src/mongo/db/s/resharding_util.h11
-rw-r--r--src/mongo/db/s/resharding_util_refresh_test.cpp122
4 files changed, 186 insertions, 0 deletions
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index e8b56544b60..c00293a975d 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -140,6 +140,7 @@ env.Library(
'$BUILD_DIR/mongo/db/pipeline/expression_context',
'$BUILD_DIR/mongo/db/pipeline/pipeline',
'$BUILD_DIR/mongo/db/storage/write_unit_of_work',
+ '$BUILD_DIR/mongo/s/async_requests_sender',
'$BUILD_DIR/mongo/s/common_s',
'$BUILD_DIR/mongo/s/grid',
],
@@ -523,6 +524,7 @@ env.CppUnitTest(
'balancer/type_migration_test.cpp',
'config_server_op_observer_test.cpp',
'vector_clock_config_server_test.cpp',
+ 'resharding_util_refresh_test.cpp',
'resharding_util_test.cpp',
],
LIBDEPS=[
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 5939c83f97b..d3335c21988 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -27,6 +27,8 @@
* it in the license file.
*/
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
#include "mongo/platform/basic.h"
#include "mongo/bson/bsonobj.h"
@@ -38,10 +40,59 @@
#include "mongo/db/pipeline/document_source_sort.h"
#include "mongo/db/s/resharding_util.h"
#include "mongo/db/storage/write_unit_of_work.h"
+#include "mongo/logv2/log.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/async_requests_sender.h"
#include "mongo/s/grid.h"
+#include "mongo/s/request_types/flush_routing_table_cache_updates_gen.h"
namespace mongo {
+void tellShardsToRefresh(OperationContext* opCtx,
+ const std::vector<ShardId>& shardIds,
+ const NamespaceString& nss,
+ std::shared_ptr<executor::TaskExecutor> executor) {
+ auto cmd = _flushRoutingTableCacheUpdatesWithWriteConcern(nss);
+ cmd.setSyncFromConfig(true);
+ cmd.setDbName(nss.db());
+ auto cmdObj =
+ cmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+
+ std::vector<AsyncRequestsSender::Request> requests;
+ for (const auto& shardId : shardIds) {
+ requests.emplace_back(shardId, cmdObj);
+ }
+
+ if (!requests.empty()) {
+ AsyncRequestsSender ars(opCtx,
+ executor,
+ "admin",
+ requests,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ Shard::RetryPolicy::kIdempotent);
+
+ while (!ars.done()) {
+ // Retrieve the responses and throw at the first failure.
+ auto response = ars.next();
+
+ auto generateErrorContext = [&]() -> std::string {
+ return str::stream()
+ << "Unable to _flushRoutingTableCacheUpdatesWithWriteConcern for namespace "
+ << nss.ns() << " on " << response.shardId;
+ };
+
+ auto shardResponse =
+ uassertStatusOKWithContext(std::move(response.swResponse), generateErrorContext());
+
+ auto status = getStatusFromCommandResult(shardResponse.data);
+ uassertStatusOKWithContext(status, generateErrorContext());
+
+ auto wcStatus = getWriteConcernStatusFromCommandResult(shardResponse.data);
+ uassertStatusOKWithContext(wcStatus, generateErrorContext());
+ }
+ }
+}
+
void checkForHolesAndOverlapsInChunks(std::vector<ReshardedChunk>& chunks,
const KeyPattern& keyPattern) {
std::sort(chunks.begin(), chunks.end(), [](const ReshardedChunk& a, const ReshardedChunk& b) {
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index db10d68c4fd..517ef531b29 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -36,14 +36,25 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/s/resharding/donor_oplog_id_gen.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/s/catalog/type_tags.h"
#include "mongo/s/resharded_chunk_gen.h"
+#include "mongo/s/shard_id.h"
namespace mongo {
constexpr auto kReshardingOplogPrePostImageOps = "prePostImageOps"_sd;
/**
+ * Sends _flushRoutingTableCacheUpdatesWithWriteConcern to a list of shards. Throws if one of the
+ * shards fails to refresh.
+ */
+void tellShardsToRefresh(OperationContext* opCtx,
+ const std::vector<ShardId>& shardIds,
+ const NamespaceString& nss,
+ std::shared_ptr<executor::TaskExecutor> executor);
+
+/**
* Asserts that there is not a hole or overlap in the chunks.
*/
void checkForHolesAndOverlapsInChunks(std::vector<ReshardedChunk>& chunks,
diff --git a/src/mongo/db/s/resharding_util_refresh_test.cpp b/src/mongo/db/s/resharding_util_refresh_test.cpp
new file mode 100644
index 00000000000..f13a484f857
--- /dev/null
+++ b/src/mongo/db/s/resharding_util_refresh_test.cpp
@@ -0,0 +1,122 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/client/remote_command_targeter_mock.h"
+#include "mongo/db/s/config/config_server_test_fixture.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
+#include "mongo/logv2/log.h"
+#include "mongo/s/catalog/type_shard.h"
+#include "mongo/s/shard_id.h"
+
+namespace mongo {
+namespace {
+
+const std::vector<ShardId> kShardIdList{{"s1"}, {"s2"}};
+const std::vector<ShardType> kShards{{"s1", "s1:123"}, {"s2", "s2:123"}};
+
+const Status kMockStatus = {ErrorCodes::InternalError, "test"};
+const BSONObj kMockErrorRes = BSON("ok" << 0 << "code" << kMockStatus.code());
+
+const BSONObj kMockWriteConcernError = BSON("code" << ErrorCodes::WriteConcernFailed << "errmsg"
+ << "Mock");
+const BSONObj kMockResWithWriteConcernError =
+ BSON("ok" << 1 << "writeConcernError" << kMockWriteConcernError);
+
+const Status kRetryableError{ErrorCodes::HostUnreachable, "RetryableError for test"};
+
+class ReshardingRefresherTest : public ConfigServerTestFixture {
+protected:
+ void setUp() {
+ ConfigServerTestFixture::setUp();
+
+ for (const auto& shard : kShards) {
+ targeterFactory()->addTargeterToReturn(
+ ConnectionString(HostAndPort{shard.getHost()}), [&] {
+ auto targeter = std::make_unique<RemoteCommandTargeterMock>();
+ targeter->setFindHostReturnValue(HostAndPort{shard.getHost()});
+ return targeter;
+ }());
+ }
+
+ setupShards(kShards);
+ shardRegistry()->reload(operationContext());
+ }
+};
+
+TEST_F(ReshardingRefresherTest, refresherTwoShardsSucceed) {
+ auto opCtx = operationContext();
+ auto nss = NamespaceString("mydb", "mycoll");
+ auto future = launchAsync([&] { tellShardsToRefresh(opCtx, kShardIdList, nss, executor()); });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) { return BSON("ok" << 1); });
+ onCommand([&](const executor::RemoteCommandRequest& request) { return BSON("ok" << 1); });
+
+ future.default_timed_get();
+}
+
+TEST_F(ReshardingRefresherTest, refresherTwoShardsFirstErrors) {
+ auto opCtx = operationContext();
+ auto nss = NamespaceString("mydb", "mycoll");
+ auto future = launchAsync([&] { tellShardsToRefresh(opCtx, kShardIdList, nss, executor()); });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) { return kMockErrorRes; });
+
+ ASSERT_THROWS_CODE(future.default_timed_get(), DBException, kMockStatus.code());
+}
+
+TEST_F(ReshardingRefresherTest, refresherTwoShardsSecondErrors) {
+ auto opCtx = operationContext();
+ auto nss = NamespaceString("mydb", "mycoll");
+ auto future = launchAsync([&] { tellShardsToRefresh(opCtx, kShardIdList, nss, executor()); });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) { return BSON("ok" << 1); });
+ onCommand([&](const executor::RemoteCommandRequest& request) { return kMockErrorRes; });
+
+ ASSERT_THROWS_CODE(future.default_timed_get(), DBException, kMockStatus.code());
+}
+
+TEST_F(ReshardingRefresherTest, refresherTwoShardsWriteConcernFailed) {
+ auto opCtx = operationContext();
+ auto nss = NamespaceString("mydb", "mycoll");
+ auto future = launchAsync([&] { tellShardsToRefresh(opCtx, kShardIdList, nss, executor()); });
+
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return kMockResWithWriteConcernError;
+ });
+
+ ASSERT_THROWS_CODE(future.default_timed_get(), DBException, ErrorCodes::WriteConcernFailed);
+}
+
+} // namespace
+} // namespace mongo