diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-07-09 16:50:30 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-07-11 05:49:49 -0400 |
commit | ac42068ddeaae27d2cd5cfc4808915491e5097c7 (patch) | |
tree | eda7ced79e545b3fce7f6ca0d892d330e932f0a1 /src/mongo | |
parent | 15c72c8570c63e2e6ba0a3d339a8286d0be604db (diff) | |
download | mongo-ac42068ddeaae27d2cd5cfc4808915491e5097c7.tar.gz |
SERVER-18084 Move code out of d_state.h/.cpp
Move the ShardingState/ShardedConnectionInfo classes out of d_state and
put them in separate sources under mongo/db.
No functional changes.
Diffstat (limited to 'src/mongo')
31 files changed, 1508 insertions, 1277 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index f0d632fe070..08ff1f3f687 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -19,6 +19,7 @@ env.SConscript( 'pipeline', 'query', 'repl', + 's', 'sorter', 'stats', 'storage', @@ -636,6 +637,7 @@ serveronlyLibdeps = [ "repl/rslog", "repl/sync_tail", "repl/topology_coordinator_impl", + "s/sharding", "startup_warnings_mongod", "stats/counters", "stats/top", diff --git a/src/mongo/db/commands/cleanup_orphaned_cmd.cpp b/src/mongo/db/commands/cleanup_orphaned_cmd.cpp index a58b8cd4932..3af1687c790 100644 --- a/src/mongo/db/commands/cleanup_orphaned_cmd.cpp +++ b/src/mongo/db/commands/cleanup_orphaned_cmd.cpp @@ -45,8 +45,8 @@ #include "mongo/db/range_arithmetic.h" #include "mongo/db/range_deleter_service.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/collection_metadata.h" -#include "mongo/s/d_state.h" #include "mongo/util/log.h" namespace { diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index d941c3fa58e..8db9d4576b8 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -47,7 +47,7 @@ #include "mongo/db/op_observer.h" #include "mongo/db/ops/insert.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/s/d_state.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/shard_key_pattern.h" namespace mongo { diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index a300563de62..b0875bc764e 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -47,9 +47,9 @@ #include "mongo/db/query/explain.h" #include "mongo/db/query/find.h" #include "mongo/db/query/get_executor.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/server_parameters.h" #include "mongo/db/stats/counters.h" -#include "mongo/s/d_state.h" #include 
"mongo/s/stale_exception.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" diff --git a/src/mongo/db/commands/merge_chunks_cmd.cpp b/src/mongo/db/commands/merge_chunks_cmd.cpp index 1ee9d397dd7..12fccf9ed5e 100644 --- a/src/mongo/db/commands/merge_chunks_cmd.cpp +++ b/src/mongo/db/commands/merge_chunks_cmd.cpp @@ -26,6 +26,8 @@ * it in the license file. */ +#include "mongo/platform/basic.h" + #include "mongo/base/init.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_session.h" @@ -33,7 +35,7 @@ #include "mongo/db/commands.h" #include "mongo/db/field_parser.h" #include "mongo/db/namespace_string.h" -#include "mongo/s/d_state.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/d_merge.h" namespace mongo { @@ -147,7 +149,7 @@ public: return false; } - ShardingState::initialize(config); + shardingState.initialize(config); } // ShardName is optional, but might not be set yet diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index b56672a8763..5c6f38dbf48 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -32,7 +32,6 @@ #include "mongo/db/commands/mr.h" - #include "mongo/client/connpool.h" #include "mongo/client/parallel.h" #include "mongo/db/auth/authorization_session.h" @@ -56,6 +55,8 @@ #include "mongo/db/query/query_planner.h" #include "mongo/db/range_preserver.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/sharded_connection_info.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/collection_metadata.h" @@ -510,9 +511,9 @@ void State::appendResults(BSONObjBuilder& final) { BSONArrayBuilder b((int)(_size * 1.2)); // _size is data size, doesn't count overhead and keys - for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) { - BSONObj key = i->first; - BSONList& all = i->second; + for (const auto& entry : *_temp) { + const 
BSONObj& key = entry.first; + const BSONList& all = entry.second; verify(all.size() == 1); diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index 7e00d55f234..428b9873133 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -66,11 +66,12 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" +#include "mongo/db/s/sharded_connection_info.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/top.h" #include "mongo/db/write_concern.h" #include "mongo/s/collection_metadata.h" -#include "mongo/s/d_state.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/s/stale_exception.h" #include "mongo/s/write_ops/batched_upsert_detail.h" diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 5e49cd26b37..45d34aff374 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -1,32 +1,30 @@ -// dbcommands.cpp - /** -* Copyright (C) 2012-2014 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. 
-* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2012-2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand @@ -79,7 +77,6 @@ #include "mongo/db/lasterror.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" -#include "mongo/db/operation_context_impl.h" #include "mongo/db/ops/insert.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/internal_plans.h" @@ -91,15 +88,13 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/storage/mmap_v1/dur_stats.h" -#include "mongo/db/storage/storage_engine.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/write_concern.h" #include "mongo/rpc/request_interface.h" #include "mongo/rpc/reply_builder_interface.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/rpc/metadata/sharding_metadata.h" -#include "mongo/s/d_state.h" #include "mongo/s/stale_exception.h" // for SendStaleConfigException #include "mongo/scripting/engine.h" #include "mongo/util/fail_point_service.h" diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index 0fe1949c452..9580fd07114 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -61,7 +61,7 @@ #include "mongo/db/storage_options.h" #include "mongo/db/write_concern.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/d_state.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" diff --git a/src/mongo/db/exec/shard_filter.cpp 
b/src/mongo/db/exec/shard_filter.cpp index d65ef0c0f10..8b28453caf8 100644 --- a/src/mongo/db/exec/shard_filter.cpp +++ b/src/mongo/db/exec/shard_filter.cpp @@ -28,17 +28,21 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery +#include "mongo/platform/basic.h" + #include "mongo/db/exec/shard_filter.h" #include "mongo/db/exec/filter.h" #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/s/collection_metadata.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" namespace mongo { +using std::shared_ptr; using std::unique_ptr; using std::vector; using stdx::make_unique; @@ -46,7 +50,7 @@ using stdx::make_unique; // static const char* ShardFilterStage::kStageType = "SHARDING_FILTER"; -ShardFilterStage::ShardFilterStage(const CollectionMetadataPtr& metadata, +ShardFilterStage::ShardFilterStage(const shared_ptr<CollectionMetadata>& metadata, WorkingSet* ws, PlanStage* child) : _ws(ws), _child(child), _commonStats(kStageType), _metadata(metadata) {} diff --git a/src/mongo/db/exec/shard_filter.h b/src/mongo/db/exec/shard_filter.h index cc99525bfcc..a61eefc2f23 100644 --- a/src/mongo/db/exec/shard_filter.h +++ b/src/mongo/db/exec/shard_filter.h @@ -28,14 +28,12 @@ #pragma once - #include "mongo/db/exec/plan_stage.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/record_id.h" -#include "mongo/s/d_state.h" namespace mongo { +class CollectionMetadata; + /** * This stage drops documents that didn't belong to the shard we're executing on at the time of * construction. 
This matches the contract for sharded cursorids which guarantees that a @@ -73,7 +71,9 @@ namespace mongo { */ class ShardFilterStage : public PlanStage { public: - ShardFilterStage(const CollectionMetadataPtr& metadata, WorkingSet* ws, PlanStage* child); + ShardFilterStage(const std::shared_ptr<CollectionMetadata>& metadata, + WorkingSet* ws, + PlanStage* child); virtual ~ShardFilterStage(); virtual bool isEOF(); @@ -107,7 +107,7 @@ private: // Note: it is important that this is the metadata from the time this stage is constructed. // See class comment for details. - const CollectionMetadataPtr _metadata; + const std::shared_ptr<CollectionMetadata> _metadata; }; } // namespace mongo diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index b4831e0a6cb..586c59f7278 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -1,32 +1,30 @@ -// instance.cpp - /** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. 
If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2008-2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand @@ -79,6 +77,7 @@ #include "mongo/db/query/get_executor.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/counters.h" #include "mongo/db/storage/storage_engine.h" @@ -94,7 +93,6 @@ #include "mongo/rpc/metadata.h" #include "mongo/rpc/request_interface.h" #include "mongo/s/catalog/catalog_manager.h" -#include "mongo/s/d_state.h" #include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" // for SendStaleConfigException #include "mongo/scripting/engine.h" diff --git a/src/mongo/db/ops/update_lifecycle_impl.cpp b/src/mongo/db/ops/update_lifecycle_impl.cpp index 95c15f2d04b..b003169edf2 100644 --- a/src/mongo/db/ops/update_lifecycle_impl.cpp +++ b/src/mongo/db/ops/update_lifecycle_impl.cpp @@ -26,14 +26,16 @@ * it in the license file. */ +#include "mongo/platform/basic.h" + #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/client.h" #include "mongo/db/catalog/database.h" #include "mongo/db/field_ref.h" #include "mongo/db/catalog/collection.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/chunk_version.h" -#include "mongo/s/d_state.h" namespace mongo { namespace { diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 136d1ca7f29..d71b36c49a6 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -30,7 +30,6 @@ #include "mongo/db/pipeline/pipeline_d.h" - #include "mongo/client/dbclientinterface.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" @@ -41,7 +40,8 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/query_planner.h" -#include "mongo/s/d_state.h" +#include "mongo/db/s/sharded_connection_info.h" +#include 
"mongo/db/s/sharding_state.h" namespace mongo { diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 2991ce21928..e90e4d18779 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -50,11 +50,11 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_planner_params.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/storage_options.h" #include "mongo/s/chunk_version.h" -#include "mongo/s/d_state.h" #include "mongo/s/stale_exception.h" #include "mongo/stdx/memory.h" #include "mongo/util/fail_point_service.h" diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index aaccdd66d90..86bcc3beddf 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -70,9 +70,9 @@ #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/storage_options.h" #include "mongo/db/storage/oplog_hack.h" -#include "mongo/s/d_state.h" #include "mongo/scripting/engine.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp index 15bb04c1305..07db4afa5fa 100644 --- a/src/mongo/db/query/stage_builder.cpp +++ b/src/mongo/db/query/stage_builder.cpp @@ -28,6 +28,8 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery +#include "mongo/platform/basic.h" + #include "mongo/db/query/stage_builder.h" #include "mongo/db/client.h" @@ -51,6 +53,7 @@ #include "mongo/db/index/fts_access_method.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/util/log.h" namespace mongo { diff --git 
a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 611a089a541..489bc753b99 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -55,9 +55,9 @@ #include "mongo/db/repl/snapshot_thread.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/executor/network_interface.h" -#include "mongo/s/d_state.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/thread.h" #include "mongo/util/assert_util.h" diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript new file mode 100644 index 00000000000..1ed15c8bfac --- /dev/null +++ b/src/mongo/db/s/SConscript @@ -0,0 +1,17 @@ +# -*- mode: python -*- + +Import("env") + +env.Library( + target='sharding', + source=[ + 'sharded_connection_info.cpp', + 'sharding_state.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base/base', + '$BUILD_DIR/mongo/bson/bson', + '$BUILD_DIR/mongo/bson/util/bson_extract', + '$BUILD_DIR/mongo/db/common', + ] +) diff --git a/src/mongo/db/s/sharded_connection_info.cpp b/src/mongo/db/s/sharded_connection_info.cpp new file mode 100644 index 00000000000..e3029bd950a --- /dev/null +++ b/src/mongo/db/s/sharded_connection_info.cpp @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharded_connection_info.h" + +#include <boost/optional.hpp> + +#include "mongo/client/global_conn_pool.h" +#include "mongo/db/client.h" +#include "mongo/db/operation_context.h" +#include "mongo/s/chunk_version.h" +#include "mongo/s/client/shard_connection.h" +#include "mongo/s/client/sharding_connection_hook.h" +#include "mongo/util/log.h" + +namespace mongo { + +namespace { + +const auto clientSCI = Client::declareDecoration<boost::optional<ShardedConnectionInfo>>(); + +} // namespace + +ShardedConnectionInfo::ShardedConnectionInfo() { + _forceVersionOk = false; +} + +ShardedConnectionInfo::~ShardedConnectionInfo() = default; + +ShardedConnectionInfo* ShardedConnectionInfo::get(Client* client, bool create) { + auto& current = clientSCI(client); + + if (!current && create) { + LOG(1) << "entering shard mode for connection"; + current = boost::in_place(); + } + + return current ? 
¤t.value() : nullptr; +} + +void ShardedConnectionInfo::reset(Client* client) { + clientSCI(client) = boost::none; +} + +const ChunkVersion ShardedConnectionInfo::getVersion(const std::string& ns) const { + NSVersionMap::const_iterator it = _versions.find(ns); + if (it != _versions.end()) { + return it->second; + } else { + return ChunkVersion(0, 0, OID()); + } +} + +void ShardedConnectionInfo::setVersion(const std::string& ns, const ChunkVersion& version) { + _versions[ns] = version; +} + +void ShardedConnectionInfo::addHook() { + static stdx::mutex lock; + static bool done = false; + + stdx::lock_guard<stdx::mutex> lk(lock); + if (!done) { + log() << "first cluster operation detected, adding sharding hook to enable versioning " + "and authentication to remote servers"; + + globalConnPool.addHook(new ShardingConnectionHook(false)); + shardConnectionPool.addHook(new ShardingConnectionHook(true)); + + done = true; + } +} + +} // namespace mongo diff --git a/src/mongo/db/s/sharded_connection_info.h b/src/mongo/db/s/sharded_connection_info.h new file mode 100644 index 00000000000..482e5910cf4 --- /dev/null +++ b/src/mongo/db/s/sharded_connection_info.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <map> +#include <string> + +#include "mongo/base/disallow_copying.h" + +namespace mongo { + +struct ChunkVersion; +class Client; + +/** + * There is one instance of these per each connection from mongos. Holds version state for each + * namespace. 
+ */ +class ShardedConnectionInfo { + MONGO_DISALLOW_COPYING(ShardedConnectionInfo); + +public: + ShardedConnectionInfo(); + ~ShardedConnectionInfo(); + + const ChunkVersion getVersion(const std::string& ns) const; + void setVersion(const std::string& ns, const ChunkVersion& version); + + static ShardedConnectionInfo* get(Client* client, bool create); + static void reset(Client* client); + static void addHook(); + + bool inForceVersionOkMode() const { + return _forceVersionOk; + } + + void enterForceVersionOkMode() { + _forceVersionOk = true; + } + void leaveForceVersionOkMode() { + _forceVersionOk = false; + } + +private: + typedef std::map<std::string, ChunkVersion> NSVersionMap; + + // Map from a namespace string to the chunk version with which this connection has been + // initialized for the specified namespace + NSVersionMap _versions; + + // If this is true, then chunk versions aren't checked, and all operations are allowed + bool _forceVersionOk; +}; + + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp new file mode 100644 index 00000000000..f6442c69744 --- /dev/null +++ b/src/mongo/db/s/sharding_state.cpp @@ -0,0 +1,787 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharding_state.h" + +#include "mongo/client/remote_command_targeter_factory_impl.h" +#include "mongo/db/client.h" +#include "mongo/db/concurrency/lock_state.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/db/s/sharded_connection_info.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/catalog/catalog_manager.h" +#include "mongo/s/catalog/legacy/catalog_manager_legacy.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/chunk_version.h" +#include "mongo/s/collection_metadata.h" +#include "mongo/s/grid.h" +#include "mongo/s/metadata_loader.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/net/sock.h" + +namespace mongo { + +using std::shared_ptr; +using std::string; +using std::vector; + +// Global sharding state instance +ShardingState shardingState; + +ShardingState::ShardingState() + : _enabled(false), + _configServerTickets(3 /* max number of 
concurrent config server refresh threads */) {} + +ShardingState::~ShardingState() = default; + +bool ShardingState::enabled() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _enabled; +} + +string ShardingState::getConfigServer() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_enabled); + + return grid.catalogManager()->connectionString().toString(); +} + +string ShardingState::getShardName() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_enabled); + + return _shardName; +} + +void ShardingState::initialize(const string& server) { + uassert(18509, + "Unable to obtain host name during sharding initialization.", + !getHostName().empty()); + + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_enabled) { + // TODO: Do we need to throw exception if the config servers have changed from what we + // already have in place? How do we test for that? + return; + } + + ShardedConnectionInfo::addHook(); + + std::string errmsg; + ConnectionString configServerCS = ConnectionString::parse(server, errmsg); + uassert(28633, + str::stream() << "Invalid config server connection string: " << errmsg, + configServerCS.isValid()); + + auto catalogManager = stdx::make_unique<CatalogManagerLegacy>(); + uassertStatusOK(catalogManager->init(configServerCS)); + + auto shardRegistry(stdx::make_unique<ShardRegistry>( + stdx::make_unique<RemoteCommandTargeterFactoryImpl>(), + stdx::make_unique<repl::ReplicationExecutor>( + executor::makeNetworkInterface().release(), nullptr, 0), + nullptr, + catalogManager.get())); + shardRegistry->startup(); + + grid.init(std::move(catalogManager), std::move(shardRegistry)); + + _enabled = true; +} + +// TODO: Consolidate and eliminate these various ways of setting / validating shard names +bool ShardingState::setShardName(const string& name) { + return setShardNameAndHost(name, ""); +} + +bool ShardingState::setShardNameAndHost(const string& name, const string& host) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if 
(_shardName.size() == 0) { + // TODO SERVER-2299 remotely verify the name is sound w.r.t IPs + _shardName = name; + + string clientAddr = cc().clientAddress(true); + + log() << "remote client " << clientAddr << " initialized this host " + << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name; + + return true; + } + + if (_shardName == name) + return true; + + string clientAddr = cc().clientAddress(true); + + warning() << "remote client " << clientAddr << " tried to initialize this host " + << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name + << ", but shard name was previously initialized as " << _shardName; + + return false; +} + +void ShardingState::gotShardName(const string& name) { + gotShardNameAndHost(name, ""); +} + +void ShardingState::gotShardNameAndHost(const string& name, const string& host) { + if (setShardNameAndHost(name, host)) { + return; + } + + const string clientAddr = cc().clientAddress(true); + + StringBuilder sb; + + // Same error as above, to match for reporting + sb << "remote client " << clientAddr << " tried to initialize this host " + << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name + << ", but shard name was previously initialized as " << _shardName; + + msgasserted(13298, sb.str()); +} + +void ShardingState::clearCollectionMetadata() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _collMetadata.clear(); +} + +// TODO we shouldn't need three ways for checking the version. Fix this. 
// Returns true if any collection metadata (at any version) is cached for 'ns'.
bool ShardingState::hasVersion(const string& ns) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
    return it != _collMetadata.end();
}

// Like hasVersion(ns), but additionally fills 'version' with the cached shard version.
bool ShardingState::hasVersion(const string& ns, ChunkVersion& version) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
    if (it == _collMetadata.end())
        return false;

    shared_ptr<CollectionMetadata> p = it->second;
    version = p->getShardVersion();
    return true;
}

// Returns the cached shard version for 'ns', or the zero version (0|0, null epoch) when no
// metadata is cached.
ChunkVersion ShardingState::getVersion(const string& ns) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
    if (it != _collMetadata.end()) {
        shared_ptr<CollectionMetadata> p = it->second;
        return p->getShardVersion();
    } else {
        return ChunkVersion(0, 0, OID());
    }
}

// Installs new metadata for 'ns' that no longer owns the chunk [min, max) (migration out).
// Precondition: caller holds the collection X lock and metadata for 'ns' must exist.
void ShardingState::donateChunk(OperationContext* txn,
                                const string& ns,
                                const BSONObj& min,
                                const BSONObj& max,
                                ChunkVersion version) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
    verify(it != _collMetadata.end());
    shared_ptr<CollectionMetadata> p = it->second;

    // empty shards should have version 0
    version = (p->getNumChunks() > 1) ? version : ChunkVersion(0, 0, p->getCollVersion().epoch());

    ChunkType chunk;
    chunk.setMin(min);
    chunk.setMax(max);
    string errMsg;

    shared_ptr<CollectionMetadata> cloned(p->cloneMigrate(chunk, version, &errMsg));
    // uassert to match old behavior, TODO: report errors w/o throwing
    uassert(16855, errMsg, NULL != cloned.get());

    // TODO: a bit dangerous to have two different zero-version states - no-metadata and
    // no-version
    _collMetadata[ns] = cloned;
}

// Restores the pre-donateChunk metadata for 'ns' after an aborted migration.
// Precondition: caller holds the collection X lock and metadata for 'ns' must exist.
void ShardingState::undoDonateChunk(OperationContext* txn,
                                    const string& ns,
                                    shared_ptr<CollectionMetadata> prevMetadata) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    log() << "ShardingState::undoDonateChunk acquired _mutex";

    CollectionMetadataMap::iterator it = _collMetadata.find(ns);
    verify(it != _collMetadata.end());
    it->second = prevMetadata;
}

// Marks [min, max) as a pending (incoming-migration) range in the metadata for 'ns',
// provided the metadata still exists and is still at the expected 'epoch'. Returns false
// with 'errMsg' filled in when the metadata changed underneath the caller.
bool ShardingState::notePending(OperationContext* txn,
                                const string& ns,
                                const BSONObj& min,
                                const BSONObj& max,
                                const OID& epoch,
                                string* errMsg) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
    if (it == _collMetadata.end()) {
        *errMsg = str::stream() << "could not note chunk "
                                << "[" << min << "," << max << ")"
                                << " as pending because the local metadata for " << ns
                                << " has changed";

        return false;
    }

    shared_ptr<CollectionMetadata> metadata = it->second;

    // This can currently happen because drops aren't synchronized with in-migrations
    // The idea for checking this here is that in the future we shouldn't have this problem
    if (metadata->getCollVersion().epoch() != epoch) {
        *errMsg = str::stream() << "could not note chunk "
                                << "[" << min << "," << max << ")"
                                << " as pending because the epoch for " << ns
                                << " has changed from " << epoch << " to "
                                << metadata->getCollVersion().epoch();

        return false;
    }

    ChunkType chunk;
    chunk.setMin(min);
    chunk.setMax(max);

    shared_ptr<CollectionMetadata> cloned(metadata->clonePlusPending(chunk, errMsg));
    if (!cloned)
        return false;

    _collMetadata[ns] = cloned;
    return true;
}

// Removes the pending marker for [min, max) from the metadata for 'ns' (inverse of
// notePending), subject to the same existence and epoch checks.
bool ShardingState::forgetPending(OperationContext* txn,
                                  const string& ns,
                                  const BSONObj& min,
                                  const BSONObj& max,
                                  const OID& epoch,
                                  string* errMsg) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
    if (it == _collMetadata.end()) {
        *errMsg = str::stream() << "no need to forget pending chunk "
                                << "[" << min << "," << max << ")"
                                << " because the local metadata for " << ns << " has changed";

        return false;
    }

    shared_ptr<CollectionMetadata> metadata = it->second;

    // This can currently happen because drops aren't synchronized with in-migrations
    // The idea for checking this here is that in the future we shouldn't have this problem
    if (metadata->getCollVersion().epoch() != epoch) {
        *errMsg = str::stream() << "no need to forget pending chunk "
                                << "[" << min << "," << max << ")"
                                << " because the epoch for " << ns << " has changed from " << epoch
                                << " to " << metadata->getCollVersion().epoch();

        return false;
    }

    ChunkType chunk;
    chunk.setMin(min);
    chunk.setMax(max);

    shared_ptr<CollectionMetadata> cloned(metadata->cloneMinusPending(chunk, errMsg));
    if (!cloned)
        return false;

    _collMetadata[ns] = cloned;
    return true;
}

// Installs new metadata for 'ns' with the chunk [min, max) split at 'splitKeys'.
// Precondition: caller holds the collection X lock and metadata for 'ns' must exist.
void ShardingState::splitChunk(OperationContext* txn,
                               const string& ns,
                               const BSONObj& min,
                               const BSONObj& max,
                               const vector<BSONObj>& splitKeys,
                               ChunkVersion version) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
    verify(it != _collMetadata.end());

    ChunkType chunk;
    chunk.setMin(min);
    chunk.setMax(max);
    string errMsg;

    shared_ptr<CollectionMetadata> cloned(
        it->second->cloneSplit(chunk, splitKeys, version, &errMsg));
    // uassert to match old behavior, TODO: report errors w/o throwing
    uassert(16857, errMsg, NULL != cloned.get());

    _collMetadata[ns] = cloned;
}

// Installs new metadata for 'ns' with the chunks covering [minKey, maxKey) merged into a
// single chunk at 'mergedVersion'.
// Precondition: caller holds the collection X lock and metadata for 'ns' must exist.
void ShardingState::mergeChunks(OperationContext* txn,
                                const string& ns,
                                const BSONObj& minKey,
                                const BSONObj& maxKey,
                                ChunkVersion mergedVersion) {
    invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X));
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
    verify(it != _collMetadata.end());

    string errMsg;

    shared_ptr<CollectionMetadata> cloned(
        it->second->cloneMerge(minKey, maxKey, mergedVersion, &errMsg));
    // uassert to match old behavior, TODO: report errors w/o throwing
    uassert(17004, errMsg, NULL != cloned.get());

    _collMetadata[ns] = cloned;
}

// TESTING ONLY: discards the cached metadata for 'ns'.
void ShardingState::resetMetadata(const string& ns) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    warning() << "resetting metadata for " << ns << ", this should only be used in testing";

    _collMetadata.erase(ns);
}

// Refreshes the metadata for 'ns' from the config server only when the cached shard version
// is stale relative to 'reqShardVersion' (lower version or different epoch). Fast path
// returns OK without any network round trip.
Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn,
                                              const string& ns,
                                              const ChunkVersion& reqShardVersion,
                                              ChunkVersion* latestShardVersion) {
    // The _configServerTickets serializes this process such that only a small number of threads
    // can try to refresh at the same time.

    LOG(2) << "metadata refresh requested for " << ns << " at shard version " << reqShardVersion;

    //
    // Queuing of refresh requests starts here when remote reload is needed. This may take time.
    // TODO: Explicitly expose the queuing discipline.
    //

    _configServerTickets.waitForTicket();
    TicketHolderReleaser needTicketFrom(&_configServerTickets);

    //
    // Fast path - check if the requested version is at a higher version than the current
    // metadata version or a different epoch before verifying against config server.
    //

    shared_ptr<CollectionMetadata> storedMetadata;
    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);
        CollectionMetadataMap::iterator it = _collMetadata.find(ns);
        if (it != _collMetadata.end())
            storedMetadata = it->second;
    }
    ChunkVersion storedShardVersion;
    if (storedMetadata)
        storedShardVersion = storedMetadata->getShardVersion();
    *latestShardVersion = storedShardVersion;

    if (storedShardVersion >= reqShardVersion &&
        storedShardVersion.epoch() == reqShardVersion.epoch()) {
        // Don't need to remotely reload if we're in the same epoch with a >= version
        return Status::OK();
    }

    //
    // Slow path - remotely reload
    //
    // Cases:
    // A) Initial config load and/or secondary take-over.
    // B) Migration TO this shard finished, notified by mongos.
    // C) Dropping a collection, notified (currently) by mongos.
    // D) Stale client wants to reload metadata with a different *epoch*, so we aren't sure.

    if (storedShardVersion.epoch() != reqShardVersion.epoch()) {
        // Need to remotely reload if our epochs aren't the same, to verify
        LOG(1) << "metadata change requested for " << ns << ", from shard version "
               << storedShardVersion << " to " << reqShardVersion
               << ", need to verify with config server";
    } else {
        // Epochs match here, but the requested shard version is newer than the stored one
        // (the fast path above already returned for stored >= requested in the same epoch)
        LOG(1) << "metadata version update requested for " << ns << ", from shard version "
               << storedShardVersion << " to " << reqShardVersion
               << ", need to verify with config server";
    }

    return doRefreshMetadata(txn, ns, reqShardVersion, true, latestShardVersion);
}

// Unconditionally refreshes the metadata for 'ns' from the config server.
Status ShardingState::refreshMetadataNow(OperationContext* txn,
                                         const string& ns,
                                         ChunkVersion* latestShardVersion) {
    return doRefreshMetadata(txn, ns, ChunkVersion(0, 0, OID()), false, latestShardVersion);
}

// Core refresh: loads metadata from the config server outside any locks, then re-acquires
// the collection X lock plus _mutex and installs the loaded metadata only if it is newer
// than whatever may have been installed concurrently.
Status ShardingState::doRefreshMetadata(OperationContext* txn,
                                        const string& ns,
                                        const ChunkVersion& reqShardVersion,
                                        bool useRequestedVersion,
                                        ChunkVersion* latestShardVersion) {
    // The idea here is that we're going to reload the metadata from the config server, but
    // we need to do so outside any locks. When we get our result back, if the current metadata
    // has changed, we may not be able to install the new metadata.

    //
    // Get the initial metadata
    // No DBLock is needed since the metadata is expected to change during reload.
    //

    shared_ptr<CollectionMetadata> beforeMetadata;

    {
        stdx::lock_guard<stdx::mutex> lk(_mutex);

        // We can't reload if sharding is not enabled - i.e. without a config server location
        if (!_enabled) {
            string errMsg = str::stream() << "cannot refresh metadata for " << ns
                                          << " before sharding has been enabled";

            warning() << errMsg;
            return Status(ErrorCodes::NotYetInitialized, errMsg);
        }

        // We also can't reload if a shard name has not yet been set.
        if (_shardName.empty()) {
            string errMsg = str::stream() << "cannot refresh metadata for " << ns
                                          << " before shard name has been set";

            warning() << errMsg;
            return Status(ErrorCodes::NotYetInitialized, errMsg);
        }

        CollectionMetadataMap::iterator it = _collMetadata.find(ns);
        if (it != _collMetadata.end()) {
            beforeMetadata = it->second;
        }
    }

    ChunkVersion beforeShardVersion;
    ChunkVersion beforeCollVersion;
    if (beforeMetadata) {
        beforeShardVersion = beforeMetadata->getShardVersion();
        beforeCollVersion = beforeMetadata->getCollVersion();
    }

    *latestShardVersion = beforeShardVersion;

    //
    // Determine whether we need to diff or fully reload
    //

    bool fullReload = false;
    if (!beforeMetadata) {
        // We don't have any metadata to reload from
        fullReload = true;
    } else if (useRequestedVersion && reqShardVersion.epoch() != beforeShardVersion.epoch()) {
        // It's not useful to use the metadata as a base because we think the epoch will differ
        fullReload = true;
    }

    //
    // Load the metadata from the remote server, start construction
    //

    LOG(0) << "remotely refreshing metadata for " << ns
           << (useRequestedVersion
                   ? string(" with requested shard version ") + reqShardVersion.toString()
                   : "")
           << (fullReload ? ", current shard version is " : " based on current shard version ")
           << beforeShardVersion << ", current metadata version is " << beforeCollVersion;

    string errMsg;

    MetadataLoader mdLoader;
    CollectionMetadata* remoteMetadataRaw = new CollectionMetadata();
    shared_ptr<CollectionMetadata> remoteMetadata(remoteMetadataRaw);

    Timer refreshTimer;
    Status status = mdLoader.makeCollectionMetadata(grid.catalogManager(),
                                                    ns,
                                                    getShardName(),
                                                    fullReload ? NULL : beforeMetadata.get(),
                                                    remoteMetadataRaw);
    long long refreshMillis = refreshTimer.millis();

    if (status.code() == ErrorCodes::NamespaceNotFound) {
        // Collection no longer exists remotely - treated as a potential drop below
        remoteMetadata.reset();
        remoteMetadataRaw = NULL;
    } else if (!status.isOK()) {
        warning() << "could not remotely refresh metadata for " << ns << causedBy(status.reason());

        return status;
    }

    ChunkVersion remoteShardVersion;
    ChunkVersion remoteCollVersion;
    if (remoteMetadata) {
        remoteShardVersion = remoteMetadata->getShardVersion();
        remoteCollVersion = remoteMetadata->getCollVersion();
    }

    //
    // Get ready to install loaded metadata if needed
    //

    shared_ptr<CollectionMetadata> afterMetadata;
    ChunkVersion afterShardVersion;
    ChunkVersion afterCollVersion;
    ChunkVersion::VersionChoice choice;

    // If we choose to install the new metadata, this describes the kind of install
    enum InstallType {
        InstallType_New,
        InstallType_Update,
        InstallType_Replace,
        InstallType_Drop,
        InstallType_None
    } installType = InstallType_None;  // compiler complains otherwise

    {
        // Exclusive collection lock needed since we're now potentially changing the metadata,
        // and don't want reads/writes to be ongoing.
        ScopedTransaction transaction(txn, MODE_IX);
        Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
        Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);

        //
        // Get the metadata now that the load has completed
        //

        stdx::lock_guard<stdx::mutex> lk(_mutex);

        // Don't reload if our config server has changed or sharding is no longer enabled
        if (!_enabled) {
            string errMsg = str::stream() << "could not refresh metadata for " << ns
                                          << ", sharding is no longer enabled";

            warning() << errMsg;
            return Status(ErrorCodes::NotYetInitialized, errMsg);
        }

        CollectionMetadataMap::iterator it = _collMetadata.find(ns);
        if (it != _collMetadata.end())
            afterMetadata = it->second;

        if (afterMetadata) {
            afterShardVersion = afterMetadata->getShardVersion();
            afterCollVersion = afterMetadata->getCollVersion();
        }

        *latestShardVersion = afterShardVersion;

        //
        // Resolve newer pending chunks with the remote metadata, finish construction
        //

        status = mdLoader.promotePendingChunks(afterMetadata.get(), remoteMetadataRaw);

        if (!status.isOK()) {
            warning() << "remote metadata for " << ns
                      << " is inconsistent with current pending chunks"
                      << causedBy(status.reason());

            return status;
        }

        //
        // Compare the 'before', 'after', and 'remote' versions/epochs and choose newest
        // Zero-epochs (sentinel value for "dropped" collections), are tested by
        // !epoch.isSet().
        //

        choice = ChunkVersion::chooseNewestVersion(
            beforeCollVersion, afterCollVersion, remoteCollVersion);

        if (choice == ChunkVersion::VersionChoice_Remote) {
            dassert(!remoteCollVersion.epoch().isSet() || remoteShardVersion >= beforeShardVersion);

            if (!afterCollVersion.epoch().isSet()) {
                // First metadata load
                installType = InstallType_New;
                dassert(it == _collMetadata.end());
                _collMetadata.insert(make_pair(ns, remoteMetadata));
            } else if (remoteCollVersion.epoch().isSet() &&
                       remoteCollVersion.epoch() == afterCollVersion.epoch()) {
                // Update to existing metadata
                installType = InstallType_Update;

                // Invariant: If CollMetadata was not found, version should have been 0.
                dassert(it != _collMetadata.end());
                it->second = remoteMetadata;
            } else if (remoteCollVersion.epoch().isSet()) {
                // New epoch detected, replacing metadata
                installType = InstallType_Replace;

                // Invariant: If CollMetadata was not found, version should have been 0.
                dassert(it != _collMetadata.end());
                it->second = remoteMetadata;
            } else {
                dassert(!remoteCollVersion.epoch().isSet());

                // Drop detected
                installType = InstallType_Drop;
                _collMetadata.erase(it);
            }

            *latestShardVersion = remoteShardVersion;
        }
    }
    // End _mutex
    // End DBWrite

    //
    // Do messaging based on what happened above
    //
    string localShardVersionMsg = beforeShardVersion.epoch() == afterShardVersion.epoch()
        ? afterShardVersion.toString()
        : beforeShardVersion.toString() + " / " + afterShardVersion.toString();

    if (choice == ChunkVersion::VersionChoice_Unknown) {
        string errMsg = str::stream()
            << "need to retry loading metadata for " << ns
            << ", collection may have been dropped or recreated during load"
            << " (loaded shard version : " << remoteShardVersion.toString()
            << ", stored shard versions : " << localShardVersionMsg << ", took " << refreshMillis
            << "ms)";

        warning() << errMsg;
        return Status(ErrorCodes::RemoteChangeDetected, errMsg);
    }

    if (choice == ChunkVersion::VersionChoice_Local) {
        LOG(0) << "metadata of collection " << ns
               << " already up to date (shard version : " << afterShardVersion.toString()
               << ", took " << refreshMillis << "ms)";
        return Status::OK();
    }

    dassert(choice == ChunkVersion::VersionChoice_Remote);

    switch (installType) {
        case InstallType_New:
            LOG(0) << "collection " << ns << " was previously unsharded"
                   << ", new metadata loaded with shard version " << remoteShardVersion;
            break;
        case InstallType_Update:
            LOG(0) << "updating metadata for " << ns << " from shard version "
                   << localShardVersionMsg << " to shard version " << remoteShardVersion;
            break;
        case InstallType_Replace:
            LOG(0) << "replacing metadata for " << ns << " at shard version "
                   << localShardVersionMsg << " with a new epoch (shard version "
                   << remoteShardVersion << ")";
            break;
        case InstallType_Drop:
            LOG(0) << "dropping metadata for " << ns << " at shard version " << localShardVersionMsg
                   << ", took " << refreshMillis << "ms";
            break;
        default:
            verify(false);
            break;
    }

    if (installType != InstallType_Drop) {
        LOG(0) << "collection version was loaded at version " << remoteCollVersion << ", took "
               << refreshMillis << "ms";
    }

    return Status::OK();
}

// Appends the sharding state (enabled flag, config server, shard name, per-collection shard
// versions) to 'builder' for diagnostic commands.
void ShardingState::appendInfo(BSONObjBuilder& builder) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    builder.appendBool("enabled", _enabled);
    if (!_enabled) {
        return;
    }

    builder.append("configServer", grid.catalogManager()->connectionString().toString());
    builder.append("shardName", _shardName);

    BSONObjBuilder versionB(builder.subobjStart("versions"));
    for (CollectionMetadataMap::const_iterator it = _collMetadata.begin();
         it != _collMetadata.end();
         ++it) {
        shared_ptr<CollectionMetadata> metadata = it->second;
        versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong());
    }

    versionB.done();
}

// Whether operations for this client/namespace must consult collection metadata: requires
// sharding to be enabled and the connection to be version-aware (has ShardedConnectionInfo).
// NOTE(review): reads _enabled without holding _mutex, unlike the other accessors - presumably
// a benign racy read, but worth confirming.
bool ShardingState::needCollectionMetadata(Client* client, const string& ns) const {
    if (!_enabled)
        return false;

    if (!ShardedConnectionInfo::get(client, false))
        return false;

    return true;
}

// Returns the cached metadata for 'ns', or an empty shared_ptr when none is cached.
shared_ptr<CollectionMetadata> ShardingState::getCollectionMetadata(const string& ns) {
    stdx::lock_guard<stdx::mutex> lk(_mutex);

    CollectionMetadataMap::const_iterator it = _collMetadata.find(ns);
    if (it == _collMetadata.end()) {
        return shared_ptr<CollectionMetadata>();
    } else {
        return it->second;
    }
}

}  // namespace mongo
/**
 * Copyright (C) 2015 MongoDB Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects
 * for all of the code used other than as permitted herein. If you modify
 * file(s) with this exception, you may extend this exception to your
 * version of the file(s), but you are not obligated to do so. If you do not
 * wish to do so, delete this exception statement from your version. If you
 * delete this exception statement from all source files in the program,
 * then also delete it in the license file.
 */

#pragma once

#include <map>
#include <string>
#include <vector>

#include "mongo/base/disallow_copying.h"
#include "mongo/bson/oid.h"
#include "mongo/stdx/memory.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/ticketholder.h"

namespace mongo {

class BSONObj;
class BSONObjBuilder;
struct ChunkVersion;
class Client;
class CollectionMetadata;
class OperationContext;
class Status;

/**
 * Represents the sharding state for the running instance. One per instance.
 */
class ShardingState {
    MONGO_DISALLOW_COPYING(ShardingState);

public:
    ShardingState();
    ~ShardingState();

    // Whether initialize() has completed successfully.
    bool enabled();

    // Accessors for the config server connection string and this shard's name. Both require
    // sharding to have been initialized (see implementation).
    std::string getConfigServer();
    std::string getShardName();

    // Initialize sharding state and begin authenticating outgoing connections and handling
    // shard versions. If this is not run before sharded operations occur auth will not work
    // and versions will not be tracked.
    void initialize(const std::string& server);

    // gotShardName throws on a conflicting shard name; setShardName returns false instead.
    void gotShardName(const std::string& name);
    bool setShardName(const std::string& name);  // Same as above, does not throw

    // Helpers for SetShardVersion which report the host name sent to this shard when the shard
    // name does not match. Do not use in other places.
    // TODO: Remove once SSV is deprecated
    void gotShardNameAndHost(const std::string& name, const std::string& host);
    bool setShardNameAndHost(const std::string& name, const std::string& host);

    /**
     * Clears the collection metadata cache after step down.
     */
    void clearCollectionMetadata();

    // versioning support

    bool hasVersion(const std::string& ns);
    bool hasVersion(const std::string& ns, ChunkVersion& version);
    ChunkVersion getVersion(const std::string& ns);

    /**
     * If the metadata for 'ns' at this shard is at or above the requested version,
     * 'reqShardVersion', returns OK and fills in 'latestShardVersion' with the latest shard
     * version. The latter is always greater or equal than 'reqShardVersion' if in the same
     * epoch.
     *
     * Otherwise, falls back to refreshMetadataNow.
     *
     * This call blocks if there are more than N threads
     * currently refreshing metadata. (N is the number of
     * tickets in ShardingState::_configServerTickets,
     * currently 3.)
     *
     * Locking Note:
     *   + Must NOT be called with the write lock because this call may go into the network,
     *     and deadlocks may occur with shard-as-a-config. Therefore, nothing here guarantees
     *     that 'latestShardVersion' is indeed the current one on return.
     */
    Status refreshMetadataIfNeeded(OperationContext* txn,
                                   const std::string& ns,
                                   const ChunkVersion& reqShardVersion,
                                   ChunkVersion* latestShardVersion);

    /**
     * Refreshes collection metadata by asking the config server for the latest information.
     * Starts a new config server request.
     *
     * Locking Notes:
     *   + Must NOT be called with the write lock because this call may go into the network,
     *     and deadlocks may occur with shard-as-a-config. Therefore, nothing here guarantees
     *     that 'latestShardVersion' is indeed the current one on return.
     *
     *   + Because this call must not be issued with the DBLock held, by the time the config
     *     server sent us back the collection metadata information, someone else may have
     *     updated the previously stored collection metadata. There are cases when one can't
     *     tell which of updated or loaded metadata are the freshest. There are also cases where
     *     the data coming from configs do not correspond to a consistent snapshot.
     *     In these cases, return RemoteChangeDetected. (This usually means this call needs to
     *     be issued again, at caller discretion)
     *
     * @return OK if remote metadata successfully loaded (may or may not have been installed)
     * @return RemoteChangeDetected if something changed while reloading and we may retry
     * @return !OK if something else went wrong during reload
     * @return latestShardVersion the version that is now stored for this collection
     */
    Status refreshMetadataNow(OperationContext* txn,
                              const std::string& ns,
                              ChunkVersion* latestShardVersion);

    // Appends diagnostic info (enabled flag, config server, shard name, collection versions).
    void appendInfo(BSONObjBuilder& b);

    // querying support

    bool needCollectionMetadata(Client* client, const std::string& ns) const;
    std::shared_ptr<CollectionMetadata> getCollectionMetadata(const std::string& ns);

    // chunk migrate and split support

    /**
     * Creates and installs a new chunk metadata for a given collection by "forgetting" about
     * one of its chunks. The new metadata uses the provided version, which has to be higher
     * than the current metadata's shard version.
     *
     * One exception: if the forgotten chunk is the last one in this shard for the collection,
     * version has to be 0.
     *
     * If it runs successfully, clients need to grab the new version to access the collection.
     *
     * LOCKING NOTE:
     * Only safe to do inside the exclusive (MODE_X) collection lock - the implementation
     * asserts this via an invariant on the caller's lock state.
     *
     * @param ns the collection
     * @param min max the chunk to eliminate from the current metadata
     * @param version at which the new metadata should be at
     */
    void donateChunk(OperationContext* txn,
                     const std::string& ns,
                     const BSONObj& min,
                     const BSONObj& max,
                     ChunkVersion version);

    /**
     * Creates and installs new chunk metadata for a given collection by reclaiming a previously
     * donated chunk. The previous metadata's shard version has to be provided.
     *
     * If it runs successfully, clients that became stale by the previous donateChunk will be
     * able to access the collection again.
     *
     * Note: If a migration has aborted but not yet unregistered a pending chunk, replacing the
     * metadata may leave the chunk as pending - this is not dangerous and should be rare, but
     * will require a stepdown to fully recover.
     *
     * @param ns the collection
     * @param prevMetadata the previous metadata before we donated a chunk
     */
    void undoDonateChunk(OperationContext* txn,
                         const std::string& ns,
                         std::shared_ptr<CollectionMetadata> prevMetadata);

    /**
     * Remembers a chunk range between 'min' and 'max' as a range which will have data migrated
     * into it. This data can then be protected against cleanup of orphaned data.
     *
     * Overlapping pending ranges will be removed, so it is only safe to use this when you know
     * your metadata view is definitive, such as at the start of a migration.
     *
     * @return false with errMsg if the range is owned by this shard
     */
    bool notePending(OperationContext* txn,
                     const std::string& ns,
                     const BSONObj& min,
                     const BSONObj& max,
                     const OID& epoch,
                     std::string* errMsg);

    /**
     * Stops tracking a chunk range between 'min' and 'max' that previously was having data
     * migrated into it. This data is no longer protected against cleanup of orphaned data.
     *
     * To avoid removing pending ranges of other operations, ensure that this is only used when
     * a migration is still active.
     * TODO: Because migrations may currently be active when a collection drops, an epoch is
     * necessary to ensure the pending metadata change is still applicable.
     *
     * @return false with errMsg if the range is owned by the shard or the epoch of the metadata
     * has changed
     */
    bool forgetPending(OperationContext* txn,
                       const std::string& ns,
                       const BSONObj& min,
                       const BSONObj& max,
                       const OID& epoch,
                       std::string* errMsg);

    /**
     * Creates and installs a new chunk metadata for a given collection by splitting one of its
     * chunks in two or more. The version for the first split chunk should be provided. The
     * subsequent chunks' version would be the latter with the minor portion incremented.
     *
     * The effect on clients will depend on the version used. If the major portion is the same
     * as the current shards, clients shouldn't perceive the split.
     *
     * @param ns the collection
     * @param min max the chunk that should be split
     * @param splitKeys point in which to split
     * @param version at which the new metadata should be at
     */
    void splitChunk(OperationContext* txn,
                    const std::string& ns,
                    const BSONObj& min,
                    const BSONObj& max,
                    const std::vector<BSONObj>& splitKeys,
                    ChunkVersion version);

    /**
     * Creates and installs a new chunk metadata for a given collection by merging a range of
     * chunks ['minKey', 'maxKey') into a single chunk with version 'mergedVersion'.
     * The current metadata must overlap the range completely and minKey and maxKey must not
     * divide an existing chunk.
     *
     * The merged chunk version must have a greater version than the current shard version,
     * and if it has a greater major version clients will need to reload metadata.
     *
     * @param ns the collection
     * @param minKey maxKey the range which should be merged
     * @param newShardVersion the shard version the newly merged chunk should have
     */
    void mergeChunks(OperationContext* txn,
                     const std::string& ns,
                     const BSONObj& minKey,
                     const BSONObj& maxKey,
                     ChunkVersion mergedVersion);

    bool inCriticalMigrateSection();

    /**
     * @return true if we are NOT in the critical section
     */
    bool waitTillNotInCriticalSection(int maxSecondsToWait);

    /**
     * TESTING ONLY
     * Uninstalls the metadata for a given collection.
     */
    void resetMetadata(const std::string& ns);

private:
    // Map from a namespace into the metadata we need for each collection on this shard
    typedef std::map<std::string, std::shared_ptr<CollectionMetadata>> CollectionMetadataMap;

    /**
     * Refreshes collection metadata by asking the config server for the latest information.
     * May or may not be based on a requested version.
     */
    Status doRefreshMetadata(OperationContext* txn,
                             const std::string& ns,
                             const ChunkVersion& reqShardVersion,
                             bool useRequestedVersion,
                             ChunkVersion* latestShardVersion);

    // protects state below
    stdx::mutex _mutex;

    // Whether ::initialize has been called
    bool _enabled;

    // Sets the shard name for this host (comes through setShardVersion)
    std::string _shardName;

    // protects accessing the config server
    // Using a ticket holder so we can have multiple redundant tries at any given time
    mutable TicketHolder _configServerTickets;

    // Cached collection metadata, keyed by namespace
    CollectionMetadataMap _collMetadata;
};

// Global sharding state instance
extern ShardingState shardingState;

}  // namespace mongo
"mongo/db/s/sharding_state.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/legacy/legacy_dist_lock_manager.h" #include "mongo/s/catalog/type_config_version.h" -#include "mongo/s/d_state.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" namespace mongo { using std::unique_ptr; -using std::endl; using std::list; using std::string; @@ -89,7 +88,7 @@ void ConfigServerFixture::clearVersion() { } void ConfigServerFixture::dumpServer() { - log() << "Dumping virtual config server to log..." << endl; + log() << "Dumping virtual config server to log..."; list<string> collectionNames(_client.getCollectionNames("config")); @@ -99,11 +98,11 @@ void ConfigServerFixture::dumpServer() { unique_ptr<DBClientCursor> cursor(_client.query(collection, BSONObj()).release()); ASSERT(cursor.get() != NULL); - log() << "Dumping collection " << collection << endl; + log() << "Dumping collection " << collection; while (cursor->more()) { BSONObj obj = cursor->nextSafe(); - log() << obj.toString() << endl; + log() << obj.toString(); } } } @@ -119,4 +118,5 @@ void ConfigServerFixture::tearDown() { DBException::traceExceptions = false; } -} + +} // namespace mongo diff --git a/src/mongo/dbtests/framework.cpp b/src/mongo/dbtests/framework.cpp index 4b645fd537b..944b39c5056 100644 --- a/src/mongo/dbtests/framework.cpp +++ b/src/mongo/dbtests/framework.cpp @@ -1,32 +1,30 @@ -// framework.cpp - /** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. 
-* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects -* for all of the code used other than as permitted herein. If you modify -* file(s) with this exception, you may extend this exception to your -* version of the file(s), but you are not obligated to do so. If you do not -* wish to do so, delete this exception statement from your version. If you -* delete this exception statement from all source files in the program, -* then also delete it in the license file. -*/ + * Copyright (C) 2008-2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault @@ -35,19 +33,15 @@ #include "mongo/dbtests/framework.h" #include <string> -#include <vector> -#include "mongo/base/initializer.h" #include "mongo/base/status.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/lock_state.h" -#include "mongo/db/service_context_d.h" #include "mongo/db/service_context.h" -#include "mongo/db/ops/update.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/dbtests/dbtests.h" #include "mongo/dbtests/framework_options.h" #include "mongo/s/catalog/catalog_manager.h" -#include "mongo/s/d_state.h" #include "mongo/s/grid.h" #include "mongo/s/catalog/legacy/legacy_dist_lock_manager.h" #include "mongo/stdx/mutex.h" @@ -57,23 +51,19 @@ #include "mongo/util/log.h" #include "mongo/util/version.h" -namespace moe = mongo::optionenvironment; - namespace mongo { -using std::endl; -using std::string; - -namespace dbtests { +namespace { stdx::mutex globalCurrentTestNameMutex; std::string globalCurrentTestName; class TestWatchDog : public BackgroundJob { public: - virtual string name() const { + virtual std::string name() const { return "TestWatchDog"; } + virtual void run() { int minutesRunning = 0; std::string lastRunningTestName, currentTestName; @@ -98,12 +88,11 @@ public: } if (minutesRunning > 30) { - log() << currentTestName << " has been running for more than 30 minutes. aborting." 
- << endl; + log() << currentTestName << " has been running for more than 30 minutes. aborting."; ::abort(); } else if (minutesRunning > 1) { warning() << currentTestName << " has been running for more than " - << minutesRunning - 1 << " minutes." << endl; + << minutesRunning - 1 << " minutes."; // See what is stuck getGlobalLockManager()->dump(); @@ -112,6 +101,10 @@ public: } }; +} // namespace + +namespace dbtests { + int runDbTests(int argc, char** argv) { frameworkGlobalParams.perfHist = 1; frameworkGlobalParams.seed = time(0); @@ -138,27 +131,34 @@ int runDbTests(int argc, char** argv) { TestWatchDog twd; twd.go(); - int ret = ::mongo::unittest::Suite::run(frameworkGlobalParams.suites, - frameworkGlobalParams.filter, - frameworkGlobalParams.runsPerTest); + int ret = unittest::Suite::run(frameworkGlobalParams.suites, + frameworkGlobalParams.filter, + frameworkGlobalParams.runsPerTest); - - exitCleanly((ExitCode)ret); // so everything shuts down cleanly + // So everything shuts down cleanly + exitCleanly((ExitCode)ret); return ret; } + } // namespace dbtests +namespace unittest { + +void onCurrentTestNameChange(const std::string& testName) { + stdx::lock_guard<stdx::mutex> lk(globalCurrentTestNameMutex); + globalCurrentTestName = testName; +} + +} // namespace unittest + #ifdef _WIN32 namespace ntservice { + bool shouldStartService() { return false; } -} + +} // namespace ntservice #endif } // namespace mongo - -void mongo::unittest::onCurrentTestNameChange(const std::string& testName) { - stdx::lock_guard<stdx::mutex> lk(mongo::dbtests::globalCurrentTestNameMutex); - mongo::dbtests::globalCurrentTestName = testName; -} diff --git a/src/mongo/dbtests/merge_chunk_tests.cpp b/src/mongo/dbtests/merge_chunk_tests.cpp index 9d9a0397d6e..16e010edf27 100644 --- a/src/mongo/dbtests/merge_chunk_tests.cpp +++ b/src/mongo/dbtests/merge_chunk_tests.cpp @@ -26,14 +26,16 @@ * then also delete it in the license file. 
*/ +#include "mongo/platform/basic.h" + #include "mongo/db/range_arithmetic.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/dbtests/config_server_fixture.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/type_collection.h" #include "mongo/s/chunk.h" // for genID #include "mongo/s/chunk_version.h" #include "mongo/s/collection_metadata.h" -#include "mongo/s/d_state.h" #include "mongo/s/d_merge.h" #include "mongo/unittest/unittest.h" @@ -331,4 +333,4 @@ TEST_F(MergeChunkTests, CompoundMerge) { assertWrittenAsMerged(ranges); } -} // end namespace +} // namespace mongo diff --git a/src/mongo/s/d_merge.cpp b/src/mongo/s/d_merge.cpp index 859d9dc4a00..8f96a6fce3b 100644 --- a/src/mongo/s/d_merge.cpp +++ b/src/mongo/s/d_merge.cpp @@ -33,13 +33,15 @@ #include <vector> #include "mongo/client/connpool.h" +#include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/chunk.h" +#include "mongo/s/collection_metadata.h" #include "mongo/s/config.h" -#include "mongo/s/d_state.h" #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -47,7 +49,7 @@ namespace mongo { -using std::endl; +using std::shared_ptr; using std::string; using mongoutils::str::stream; @@ -59,8 +61,6 @@ static BSONObj buildMergeLogEntry(const std::vector<ChunkType>&, const ChunkVersion&, const ChunkVersion&); -static bool isEmptyChunk(const ChunkType&); - bool mergeChunks(OperationContext* txn, const NamespaceString& nss, const BSONObj& minKey, @@ -73,7 +73,7 @@ bool mergeChunks(OperationContext* txn, ConnectionString configLoc = ConnectionString::parse(shardingState.getConfigServer(), *errMsg); if (!configLoc.isValid()) { - warning() << *errMsg << endl; + warning() << *errMsg; return false; } @@ -90,7 +90,7 @@ bool 
mergeChunks(OperationContext* txn, << " to merge chunks in [" << minKey << "," << maxKey << ")" << causedBy(scopedDistLock.getStatus()); - warning() << *errMsg << endl; + warning() << *errMsg; return false; } @@ -105,7 +105,7 @@ bool mergeChunks(OperationContext* txn, *errMsg = str::stream() << "could not merge chunks, failed to refresh metadata for " << nss.ns() << causedBy(status.reason()); - warning() << *errMsg << endl; + warning() << *errMsg; return false; } @@ -115,17 +115,17 @@ bool mergeChunks(OperationContext* txn, << "(sent epoch : " << epoch.toString() << ", current epoch : " << shardVersion.epoch().toString() << ")"; - warning() << *errMsg << endl; + warning() << *errMsg; return false; } - CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(nss.ns()); + shared_ptr<CollectionMetadata> metadata = shardingState.getCollectionMetadata(nss.ns()); if (!metadata || metadata->getKeyPattern().isEmpty()) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " is not sharded"; - warning() << *errMsg << endl; + warning() << *errMsg; return false; } @@ -137,7 +137,7 @@ bool mergeChunks(OperationContext* txn, << " for collection " << nss.ns() << " with key pattern " << metadata->getKeyPattern(); - warning() << *errMsg << endl; + warning() << *errMsg; return false; } @@ -166,7 +166,7 @@ bool mergeChunks(OperationContext* txn, << " range starting at " << minKey << " and ending at " << maxKey << " does not belong to shard " << shardingState.getShardName(); - warning() << *errMsg << endl; + warning() << *errMsg; return false; } @@ -184,7 +184,7 @@ bool mergeChunks(OperationContext* txn, << " range starting at " << minKey << " does not belong to shard " << shardingState.getShardName(); - warning() << *errMsg << endl; + warning() << *errMsg; return false; } @@ -198,7 +198,7 @@ bool mergeChunks(OperationContext* txn, << " range ending at " << maxKey << " does not belong to shard " << shardingState.getShardName(); - warning() << *errMsg 
<< endl; + warning() << *errMsg; return false; } @@ -212,7 +212,7 @@ bool mergeChunks(OperationContext* txn, << (!validRangeStartKey && !validRangeEndKey ? " or " : "") << (!validRangeEndKey ? "ending at " + maxKey.toString() : ""); - warning() << *errMsg << endl; + warning() << *errMsg; return false; } @@ -220,7 +220,7 @@ bool mergeChunks(OperationContext* txn, *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " already contains chunk for " << rangeToString(minKey, maxKey); - warning() << *errMsg << endl; + warning() << *errMsg; return false; } @@ -232,7 +232,7 @@ bool mergeChunks(OperationContext* txn, << " has a hole in the range " << rangeToString(minKey, maxKey) << " at " << rangeToString(chunksToMerge[i - 1].getMax(), chunksToMerge[i].getMin()); - warning() << *errMsg << endl; + warning() << *errMsg; return false; } } diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index f8266c05db5..5b143d59b32 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -64,6 +64,8 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/mmap_v1/dur.h" +#include "mongo/db/s/sharded_connection_info.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/write_concern.h" #include "mongo/logger/ramlog.h" #include "mongo/s/catalog/catalog_manager.h" @@ -1125,8 +1127,9 @@ public: warning() << errmsg; return false; } - string configdb = cmdObj["configdb"].String(); - ShardingState::initialize(configdb); + + const string configdb = cmdObj["configdb"].String(); + shardingState.initialize(configdb); } // Initialize our current shard name in the shard state if needed @@ -1223,7 +1226,8 @@ public: } // Get collection metadata - const CollectionMetadataPtr origCollMetadata(shardingState.getCollectionMetadata(ns)); + const std::shared_ptr<CollectionMetadata> origCollMetadata( + shardingState.getCollectionMetadata(ns)); // With nonzero shard version, we must 
have metadata invariant(NULL != origCollMetadata); @@ -2679,7 +2683,7 @@ public: if (!shardingState.enabled()) { if (!cmdObj["configServer"].eoo()) { dassert(cmdObj["configServer"].type() == String); - ShardingState::initialize(cmdObj["configServer"].String()); + shardingState.initialize(cmdObj["configServer"].String()); } else { errmsg = str::stream() << "cannot start recv'ing chunk, " diff --git a/src/mongo/s/d_split.cpp b/src/mongo/s/d_split.cpp index 3004f50c1cb..fba6f67f6f1 100644 --- a/src/mongo/s/d_split.cpp +++ b/src/mongo/s/d_split.cpp @@ -53,12 +53,12 @@ #include "mongo/db/instance.h" #include "mongo/db/jsobj.h" #include "mongo/db/query/internal_plans.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/catalog/catalog_manager.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/chunk.h" #include "mongo/s/chunk_version.h" #include "mongo/s/config.h" -#include "mongo/s/d_state.h" #include "mongo/s/catalog/dist_lock_manager.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" @@ -623,8 +623,9 @@ public: warning() << errmsg << endl; return false; } - string configdb = cmdObj["configdb"].String(); - ShardingState::initialize(configdb); + + const string configdb = cmdObj["configdb"].String(); + shardingState.initialize(configdb); } // Initialize our current shard name in the shard state if needed diff --git a/src/mongo/s/d_state.cpp b/src/mongo/s/d_state.cpp index bb0735ee53d..e2c438a3b38 100644 --- a/src/mongo/s/d_state.cpp +++ b/src/mongo/s/d_state.cpp @@ -34,13 +34,9 @@ #include "mongo/s/d_state.h" -#include <map> -#include <string> #include <vector> #include "mongo/client/connpool.h" -#include "mongo/client/global_conn_pool.h" -#include "mongo/client/remote_command_targeter_factory_impl.h" #include "mongo/db/auth/action_set.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_manager.h" @@ -53,17 +49,12 @@ #include "mongo/db/lasterror.h" #include "mongo/db/operation_context.h" #include 
"mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/repl/replication_executor.h" +#include "mongo/db/s/sharded_connection_info.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/wire_version.h" -#include "mongo/executor/network_interface_factory.h" -#include "mongo/executor/task_executor.h" -#include "mongo/s/catalog/legacy/catalog_manager_legacy.h" -#include "mongo/s/client/shard_connection.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/client/sharding_connection_hook.h" +#include "mongo/s/collection_metadata.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" -#include "mongo/s/metadata_loader.h" #include "mongo/s/stale_exception.h" #include "mongo/stdx/memory.h" #include "mongo/util/concurrency/mutex.h" @@ -73,797 +64,24 @@ namespace mongo { -using boost::optional; -using std::endl; +using std::shared_ptr; using std::string; using std::stringstream; using std::vector; -namespace { -const auto clientSCI = Client::declareDecoration<optional<ShardedConnectionInfo>>(); -} // namespace - bool isMongos() { return false; } - -// -----ShardingState START ---- - -ShardingState::ShardingState() - : _enabled(false), - _configServerTickets(3 /* max number of concurrent config server refresh threads */) {} - -bool ShardingState::enabled() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _enabled; -} - -string ShardingState::getConfigServer() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - invariant(_enabled); - - return grid.catalogManager()->connectionString().toString(); -} - -void ShardingState::initialize(const string& server) { - uassert(18509, - "Unable to obtain host name during sharding initialization.", - !getHostName().empty()); - - shardingState._initialize(server); -} - -// TODO: Consolidate and eliminate these various ways of setting / validating shard names -bool ShardingState::setShardName(const string& name) { - return setShardNameAndHost(name, ""); -} - -std::string ShardingState::getShardName() { - 
stdx::lock_guard<stdx::mutex> lk(_mutex); - return _shardName; -} - -bool ShardingState::setShardNameAndHost(const string& name, const string& host) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_shardName.size() == 0) { - // TODO SERVER-2299 remotely verify the name is sound w.r.t IPs - _shardName = name; - - string clientAddr = cc().clientAddress(true); - - log() << "remote client " << clientAddr << " initialized this host " - << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name; - - return true; - } - - if (_shardName == name) - return true; - - string clientAddr = cc().clientAddress(true); - - warning() << "remote client " << clientAddr << " tried to initialize this host " - << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name - << ", but shard name was previously initialized as " << _shardName; - - return false; -} - -void ShardingState::gotShardName(const string& name) { - gotShardNameAndHost(name, ""); -} - -void ShardingState::gotShardNameAndHost(const string& name, const string& host) { - if (setShardNameAndHost(name, host)) - return; - - string clientAddr = cc().clientAddress(true); - stringstream ss; - - // Same error as above, to match for reporting - ss << "remote client " << clientAddr << " tried to initialize this host " - << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name - << ", but shard name was previously initialized as " << _shardName; - - msgasserted(13298, ss.str()); -} - -void ShardingState::clearCollectionMetadata() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _collMetadata.clear(); -} - -// TODO we shouldn't need three ways for checking the version. Fix this. 
-bool ShardingState::hasVersion(const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); - return it != _collMetadata.end(); -} - -bool ShardingState::hasVersion(const string& ns, ChunkVersion& version) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); - if (it == _collMetadata.end()) - return false; - - CollectionMetadataPtr p = it->second; - version = p->getShardVersion(); - return true; -} - -ChunkVersion ShardingState::getVersion(const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); - if (it != _collMetadata.end()) { - CollectionMetadataPtr p = it->second; - return p->getShardVersion(); - } else { - return ChunkVersion(0, 0, OID()); - } -} - -void ShardingState::donateChunk(OperationContext* txn, - const string& ns, - const BSONObj& min, - const BSONObj& max, - ChunkVersion version) { - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); - verify(it != _collMetadata.end()); - CollectionMetadataPtr p = it->second; - - // empty shards should have version 0 - version = (p->getNumChunks() > 1) ? 
version : ChunkVersion(0, 0, p->getCollVersion().epoch()); - - ChunkType chunk; - chunk.setMin(min); - chunk.setMax(max); - string errMsg; - - CollectionMetadataPtr cloned(p->cloneMigrate(chunk, version, &errMsg)); - // uassert to match old behavior, TODO: report errors w/o throwing - uassert(16855, errMsg, NULL != cloned.get()); - - // TODO: a bit dangerous to have two different zero-version states - no-metadata and - // no-version - _collMetadata[ns] = cloned; -} - -void ShardingState::undoDonateChunk(OperationContext* txn, - const string& ns, - CollectionMetadataPtr prevMetadata) { - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - stdx::lock_guard<stdx::mutex> lk(_mutex); - - log() << "ShardingState::undoDonateChunk acquired _mutex" << endl; - - CollectionMetadataMap::iterator it = _collMetadata.find(ns); - verify(it != _collMetadata.end()); - it->second = prevMetadata; -} - -bool ShardingState::notePending(OperationContext* txn, - const string& ns, - const BSONObj& min, - const BSONObj& max, - const OID& epoch, - string* errMsg) { - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); - if (it == _collMetadata.end()) { - *errMsg = str::stream() << "could not note chunk " - << "[" << min << "," << max << ")" - << " as pending because the local metadata for " << ns - << " has changed"; - - return false; - } - - CollectionMetadataPtr metadata = it->second; - - // This can currently happen because drops aren't synchronized with in-migrations - // The idea for checking this here is that in the future we shouldn't have this problem - if (metadata->getCollVersion().epoch() != epoch) { - *errMsg = str::stream() << "could not note chunk " - << "[" << min << "," << max << ")" - << " as pending because the epoch for " << ns - << " has changed from " << epoch << " to " - << metadata->getCollVersion().epoch(); - - return 
false; - } - - ChunkType chunk; - chunk.setMin(min); - chunk.setMax(max); - - CollectionMetadataPtr cloned(metadata->clonePlusPending(chunk, errMsg)); - if (!cloned) - return false; - - _collMetadata[ns] = cloned; - return true; -} - -bool ShardingState::forgetPending(OperationContext* txn, - const string& ns, - const BSONObj& min, - const BSONObj& max, - const OID& epoch, - string* errMsg) { - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); - if (it == _collMetadata.end()) { - *errMsg = str::stream() << "no need to forget pending chunk " - << "[" << min << "," << max << ")" - << " because the local metadata for " << ns << " has changed"; - - return false; - } - - CollectionMetadataPtr metadata = it->second; - - // This can currently happen because drops aren't synchronized with in-migrations - // The idea for checking this here is that in the future we shouldn't have this problem - if (metadata->getCollVersion().epoch() != epoch) { - *errMsg = str::stream() << "no need to forget pending chunk " - << "[" << min << "," << max << ")" - << " because the epoch for " << ns << " has changed from " << epoch - << " to " << metadata->getCollVersion().epoch(); - - return false; - } - - ChunkType chunk; - chunk.setMin(min); - chunk.setMax(max); - - CollectionMetadataPtr cloned(metadata->cloneMinusPending(chunk, errMsg)); - if (!cloned) - return false; - - _collMetadata[ns] = cloned; - return true; -} - -void ShardingState::splitChunk(OperationContext* txn, - const string& ns, - const BSONObj& min, - const BSONObj& max, - const vector<BSONObj>& splitKeys, - ChunkVersion version) { - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); - verify(it != _collMetadata.end()); - - ChunkType chunk; - chunk.setMin(min); - 
chunk.setMax(max); - string errMsg; - - CollectionMetadataPtr cloned(it->second->cloneSplit(chunk, splitKeys, version, &errMsg)); - // uassert to match old behavior, TODO: report errors w/o throwing - uassert(16857, errMsg, NULL != cloned.get()); - - _collMetadata[ns] = cloned; -} - -void ShardingState::mergeChunks(OperationContext* txn, - const string& ns, - const BSONObj& minKey, - const BSONObj& maxKey, - ChunkVersion mergedVersion) { - invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); - verify(it != _collMetadata.end()); - - string errMsg; - - CollectionMetadataPtr cloned(it->second->cloneMerge(minKey, maxKey, mergedVersion, &errMsg)); - // uassert to match old behavior, TODO: report errors w/o throwing - uassert(17004, errMsg, NULL != cloned.get()); - - _collMetadata[ns] = cloned; -} - -void ShardingState::resetMetadata(const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - warning() << "resetting metadata for " << ns << ", this should only be used in testing" << endl; - - _collMetadata.erase(ns); -} - -Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn, - const string& ns, - const ChunkVersion& reqShardVersion, - ChunkVersion* latestShardVersion) { - // The _configServerTickets serializes this process such that only a small number of threads - // can try to refresh at the same time. - - LOG(2) << "metadata refresh requested for " << ns << " at shard version " << reqShardVersion - << endl; - - // - // Queuing of refresh requests starts here when remote reload is needed. This may take time. - // TODO: Explicitly expose the queuing discipline. 
- // - - _configServerTickets.waitForTicket(); - TicketHolderReleaser needTicketFrom(&_configServerTickets); - - // - // Fast path - check if the requested version is at a higher version than the current - // metadata version or a different epoch before verifying against config server. - // - - CollectionMetadataPtr storedMetadata; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - CollectionMetadataMap::iterator it = _collMetadata.find(ns); - if (it != _collMetadata.end()) - storedMetadata = it->second; - } - ChunkVersion storedShardVersion; - if (storedMetadata) - storedShardVersion = storedMetadata->getShardVersion(); - *latestShardVersion = storedShardVersion; - - if (storedShardVersion >= reqShardVersion && - storedShardVersion.epoch() == reqShardVersion.epoch()) { - // Don't need to remotely reload if we're in the same epoch with a >= version - return Status::OK(); - } - - // - // Slow path - remotely reload - // - // Cases: - // A) Initial config load and/or secondary take-over. - // B) Migration TO this shard finished, notified by mongos. - // C) Dropping a collection, notified (currently) by mongos. - // D) Stale client wants to reload metadata with a different *epoch*, so we aren't sure. 
- - if (storedShardVersion.epoch() != reqShardVersion.epoch()) { - // Need to remotely reload if our epochs aren't the same, to verify - LOG(1) << "metadata change requested for " << ns << ", from shard version " - << storedShardVersion << " to " << reqShardVersion - << ", need to verify with config server" << endl; - } else { - // Need to remotely reload since our epochs aren't the same but our version is greater - LOG(1) << "metadata version update requested for " << ns << ", from shard version " - << storedShardVersion << " to " << reqShardVersion - << ", need to verify with config server" << endl; - } - - return doRefreshMetadata(txn, ns, reqShardVersion, true, latestShardVersion); -} - -Status ShardingState::refreshMetadataNow(OperationContext* txn, - const string& ns, - ChunkVersion* latestShardVersion) { - return doRefreshMetadata(txn, ns, ChunkVersion(0, 0, OID()), false, latestShardVersion); -} - -void ShardingState::_initialize(const string& server) { - // Ensure only one caller at a time initializes - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (_enabled) { - // TODO: Do we need to throw exception if the config servers have changed from what we - // already have in place? How do we test for that? 
- return; - } - - ShardedConnectionInfo::addHook(); - - std::string errmsg; - ConnectionString configServerCS = ConnectionString::parse(server, errmsg); - uassert(28633, - str::stream() << "Invalid config server connection string: " << errmsg, - configServerCS.isValid()); - - auto catalogManager = stdx::make_unique<CatalogManagerLegacy>(); - uassertStatusOK(catalogManager->init(configServerCS)); - - auto shardRegistry(stdx::make_unique<ShardRegistry>( - stdx::make_unique<RemoteCommandTargeterFactoryImpl>(), - stdx::make_unique<repl::ReplicationExecutor>( - executor::makeNetworkInterface().release(), nullptr, 0), - nullptr, - catalogManager.get())); - shardRegistry->startup(); - - grid.init(std::move(catalogManager), std::move(shardRegistry)); - - _enabled = true; -} - -Status ShardingState::doRefreshMetadata(OperationContext* txn, - const string& ns, - const ChunkVersion& reqShardVersion, - bool useRequestedVersion, - ChunkVersion* latestShardVersion) { - // The idea here is that we're going to reload the metadata from the config server, but - // we need to do so outside any locks. When we get our result back, if the current metadata - // has changed, we may not be able to install the new metadata. - - // - // Get the initial metadata - // No DBLock is needed since the metadata is expected to change during reload. - // - - CollectionMetadataPtr beforeMetadata; - - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - // We can't reload if sharding is not enabled - i.e. without a config server location - if (!_enabled) { - string errMsg = str::stream() << "cannot refresh metadata for " << ns - << " before sharding has been enabled"; - - warning() << errMsg; - return Status(ErrorCodes::NotYetInitialized, errMsg); - } - - // We also can't reload if a shard name has not yet been set. 
- if (_shardName.empty()) { - string errMsg = str::stream() << "cannot refresh metadata for " << ns - << " before shard name has been set"; - - warning() << errMsg; - return Status(ErrorCodes::NotYetInitialized, errMsg); - } - - CollectionMetadataMap::iterator it = _collMetadata.find(ns); - if (it != _collMetadata.end()) { - beforeMetadata = it->second; - } - } - - ChunkVersion beforeShardVersion; - ChunkVersion beforeCollVersion; - if (beforeMetadata) { - beforeShardVersion = beforeMetadata->getShardVersion(); - beforeCollVersion = beforeMetadata->getCollVersion(); - } - - *latestShardVersion = beforeShardVersion; - - // - // Determine whether we need to diff or fully reload - // - - bool fullReload = false; - if (!beforeMetadata) { - // We don't have any metadata to reload from - fullReload = true; - } else if (useRequestedVersion && reqShardVersion.epoch() != beforeShardVersion.epoch()) { - // It's not useful to use the metadata as a base because we think the epoch will differ - fullReload = true; - } - - // - // Load the metadata from the remote server, start construction - // - - LOG(0) << "remotely refreshing metadata for " << ns - << (useRequestedVersion - ? string(" with requested shard version ") + reqShardVersion.toString() - : "") - << (fullReload ? ", current shard version is " : " based on current shard version ") - << beforeShardVersion << ", current metadata version is " << beforeCollVersion << endl; - - string errMsg; - - MetadataLoader mdLoader; - CollectionMetadata* remoteMetadataRaw = new CollectionMetadata(); - CollectionMetadataPtr remoteMetadata(remoteMetadataRaw); - - Timer refreshTimer; - Status status = mdLoader.makeCollectionMetadata(grid.catalogManager(), - ns, - getShardName(), - fullReload ? 
NULL : beforeMetadata.get(), - remoteMetadataRaw); - long long refreshMillis = refreshTimer.millis(); - - if (status.code() == ErrorCodes::NamespaceNotFound) { - remoteMetadata.reset(); - remoteMetadataRaw = NULL; - } else if (!status.isOK()) { - warning() << "could not remotely refresh metadata for " << ns << causedBy(status.reason()) - << endl; - - return status; - } - - ChunkVersion remoteShardVersion; - ChunkVersion remoteCollVersion; - if (remoteMetadata) { - remoteShardVersion = remoteMetadata->getShardVersion(); - remoteCollVersion = remoteMetadata->getCollVersion(); - } - - // - // Get ready to install loaded metadata if needed - // - - CollectionMetadataPtr afterMetadata; - ChunkVersion afterShardVersion; - ChunkVersion afterCollVersion; - ChunkVersion::VersionChoice choice; - - // If we choose to install the new metadata, this describes the kind of install - enum InstallType { - InstallType_New, - InstallType_Update, - InstallType_Replace, - InstallType_Drop, - InstallType_None - } installType = InstallType_None; // compiler complains otherwise - - { - // Exclusive collection lock needed since we're now potentially changing the metadata, - // and don't want reads/writes to be ongoing. 
- ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); - - // - // Get the metadata now that the load has completed - // - - stdx::lock_guard<stdx::mutex> lk(_mutex); - - // Don't reload if our config server has changed or sharding is no longer enabled - if (!_enabled) { - string errMsg = str::stream() << "could not refresh metadata for " << ns - << ", sharding is no longer enabled"; - - warning() << errMsg; - return Status(ErrorCodes::NotYetInitialized, errMsg); - } - - CollectionMetadataMap::iterator it = _collMetadata.find(ns); - if (it != _collMetadata.end()) - afterMetadata = it->second; - - if (afterMetadata) { - afterShardVersion = afterMetadata->getShardVersion(); - afterCollVersion = afterMetadata->getCollVersion(); - } - - *latestShardVersion = afterShardVersion; - - // - // Resolve newer pending chunks with the remote metadata, finish construction - // - - status = mdLoader.promotePendingChunks(afterMetadata.get(), remoteMetadataRaw); - - if (!status.isOK()) { - warning() << "remote metadata for " << ns - << " is inconsistent with current pending chunks" << causedBy(status.reason()) - << endl; - - return status; - } - - // - // Compare the 'before', 'after', and 'remote' versions/epochs and choose newest - // Zero-epochs (sentinel value for "dropped" collections), are tested by - // !epoch.isSet(). 
- // - - choice = ChunkVersion::chooseNewestVersion( - beforeCollVersion, afterCollVersion, remoteCollVersion); - - if (choice == ChunkVersion::VersionChoice_Remote) { - dassert(!remoteCollVersion.epoch().isSet() || remoteShardVersion >= beforeShardVersion); - - if (!afterCollVersion.epoch().isSet()) { - // First metadata load - installType = InstallType_New; - dassert(it == _collMetadata.end()); - _collMetadata.insert(make_pair(ns, remoteMetadata)); - } else if (remoteCollVersion.epoch().isSet() && - remoteCollVersion.epoch() == afterCollVersion.epoch()) { - // Update to existing metadata - installType = InstallType_Update; - - // Invariant: If CollMetadata was not found, version should be have been 0. - dassert(it != _collMetadata.end()); - it->second = remoteMetadata; - } else if (remoteCollVersion.epoch().isSet()) { - // New epoch detected, replacing metadata - installType = InstallType_Replace; - - // Invariant: If CollMetadata was not found, version should be have been 0. - dassert(it != _collMetadata.end()); - it->second = remoteMetadata; - } else { - dassert(!remoteCollVersion.epoch().isSet()); - - // Drop detected - installType = InstallType_Drop; - _collMetadata.erase(it); - } - - *latestShardVersion = remoteShardVersion; - } - } - // End _mutex - // End DBWrite - - // - // Do messaging based on what happened above - // - string localShardVersionMsg = beforeShardVersion.epoch() == afterShardVersion.epoch() - ? 
afterShardVersion.toString() - : beforeShardVersion.toString() + " / " + afterShardVersion.toString(); - - if (choice == ChunkVersion::VersionChoice_Unknown) { - string errMsg = str::stream() - << "need to retry loading metadata for " << ns - << ", collection may have been dropped or recreated during load" - << " (loaded shard version : " << remoteShardVersion.toString() - << ", stored shard versions : " << localShardVersionMsg << ", took " << refreshMillis - << "ms)"; - - warning() << errMsg; - return Status(ErrorCodes::RemoteChangeDetected, errMsg); - } - - if (choice == ChunkVersion::VersionChoice_Local) { - LOG(0) << "metadata of collection " << ns - << " already up to date (shard version : " << afterShardVersion.toString() - << ", took " << refreshMillis << "ms)" << endl; - return Status::OK(); - } - - dassert(choice == ChunkVersion::VersionChoice_Remote); - - switch (installType) { - case InstallType_New: - LOG(0) << "collection " << ns << " was previously unsharded" - << ", new metadata loaded with shard version " << remoteShardVersion << endl; - break; - case InstallType_Update: - LOG(0) << "updating metadata for " << ns << " from shard version " - << localShardVersionMsg << " to shard version " << remoteShardVersion << endl; - break; - case InstallType_Replace: - LOG(0) << "replacing metadata for " << ns << " at shard version " - << localShardVersionMsg << " with a new epoch (shard version " - << remoteShardVersion << ")" << endl; - break; - case InstallType_Drop: - LOG(0) << "dropping metadata for " << ns << " at shard version " << localShardVersionMsg - << ", took " << refreshMillis << "ms" << endl; - break; - default: - verify(false); - break; - } - - if (installType != InstallType_Drop) { - LOG(0) << "collection version was loaded at version " << remoteCollVersion << ", took " - << refreshMillis << "ms" << endl; - } - - return Status::OK(); -} - -void ShardingState::appendInfo(BSONObjBuilder& builder) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - 
builder.appendBool("enabled", _enabled); - if (!_enabled) { - return; - } - - builder.append("configServer", grid.catalogManager()->connectionString().toString()); - builder.append("shardName", _shardName); - - BSONObjBuilder versionB(builder.subobjStart("versions")); - for (CollectionMetadataMap::const_iterator it = _collMetadata.begin(); - it != _collMetadata.end(); - ++it) { - CollectionMetadataPtr metadata = it->second; - versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong()); - } - - versionB.done(); +ShardForceVersionOkModeBlock::ShardForceVersionOkModeBlock(Client* client) { + info = ShardedConnectionInfo::get(client, false); + if (info) + info->enterForceVersionOkMode(); } -bool ShardingState::needCollectionMetadata(Client* client, const string& ns) const { - if (!_enabled) - return false; - - if (!ShardedConnectionInfo::get(client, false)) - return false; - - return true; -} - -CollectionMetadataPtr ShardingState::getCollectionMetadata(const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); - if (it == _collMetadata.end()) { - return CollectionMetadataPtr(); - } else { - return it->second; - } -} - -ShardingState shardingState; - -// -----ShardingState END ---- - -// -----ShardedConnectionInfo START ---- - -ShardedConnectionInfo::ShardedConnectionInfo() { - _forceVersionOk = false; -} - -ShardedConnectionInfo* ShardedConnectionInfo::get(Client* client, bool create) { - auto& current = clientSCI(client); - - if (!current && create) { - LOG(1) << "entering shard mode for connection" << endl; - current = boost::in_place(); - } - - return current ? 
¤t.value() : nullptr; -} - -void ShardedConnectionInfo::reset(Client* client) { - clientSCI(client) = boost::none; -} - -const ChunkVersion ShardedConnectionInfo::getVersion(const string& ns) const { - NSVersionMap::const_iterator it = _versions.find(ns); - if (it != _versions.end()) { - return it->second; - } else { - return ChunkVersion(0, 0, OID()); - } -} - -void ShardedConnectionInfo::setVersion(const string& ns, const ChunkVersion& version) { - _versions[ns] = version; -} - -void ShardedConnectionInfo::addHook() { - static stdx::mutex lock; - static bool done = false; - - stdx::lock_guard<stdx::mutex> lk(lock); - if (!done) { - log() << "first cluster operation detected, adding sharding hook to enable versioning " - "and authentication to remote servers"; - - globalConnPool.addHook(new ShardingConnectionHook(false)); - shardConnectionPool.addHook(new ShardingConnectionHook(true)); - done = true; - } +ShardForceVersionOkModeBlock::~ShardForceVersionOkModeBlock() { + if (info) + info->leaveForceVersionOkMode(); } class MongodShardCommand : public Command { @@ -978,7 +196,7 @@ public: } if (locked) { - ShardingState::initialize(configdb); + shardingState.initialize(configdb); return true; } @@ -1121,7 +339,7 @@ public: // TODO: Refactor all of this if (version < globalVersion && version.hasEqualEpoch(globalVersion)) { while (shardingState.inCriticalMigrateSection()) { - log() << "waiting till out of critical section" << endl; + log() << "waiting till out of critical section"; shardingState.waitTillNotInCriticalSection(10); } errmsg = str::stream() << "shard global version for collection is higher " @@ -1137,7 +355,7 @@ public: // Needed b/c when the last chunk is moved off a shard, // the version gets reset to zero, which should require a reload. 
while (shardingState.inCriticalMigrateSection()) { - log() << "waiting till out of critical section" << endl; + log() << "waiting till out of critical section"; shardingState.waitTillNotInCriticalSection(10); } @@ -1162,7 +380,7 @@ public: << ", stored shard version is " << currVersion.toString() << causedBy(status.reason()); - warning() << errmsg << endl; + warning() << errmsg; result.append("ns", ns); version.addToBSON(result, "version"); @@ -1179,7 +397,7 @@ public: << ", requested version is " << version.toString() << " but found version " << currVersion.toString(); - OCCASIONALLY warning() << errmsg << endl; + OCCASIONALLY warning() << errmsg; // WARNING: the exact fields below are important for compatibility with mongos // version reload. @@ -1264,7 +482,7 @@ public: } if (cmdObj["fullMetadata"].trueValue()) { - CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(ns); + shared_ptr<CollectionMetadata> metadata = shardingState.getCollectionMetadata(ns); if (metadata) { result.append("metadata", metadata->toBSON()); } else { diff --git a/src/mongo/s/d_state.h b/src/mongo/s/d_state.h index 917f9fe6ab9..d435e621c9a 100644 --- a/src/mongo/s/d_state.h +++ b/src/mongo/s/d_state.h @@ -29,319 +29,18 @@ #pragma once -#include "mongo/db/client.h" -#include "mongo/db/jsobj.h" -#include "mongo/s/collection_metadata.h" -#include "mongo/s/chunk_version.h" -#include "mongo/util/concurrency/ticketholder.h" -#include "mongo/util/net/message.h" +#include <string> namespace mongo { -class Database; -class RecordId; +class BSONObj; +class Client; class OperationContext; - -// -------------- -// --- global state --- -// -------------- - -class ShardingState { -public: - ShardingState(); - - bool enabled(); - std::string getConfigServer(); - - // Initialize sharding state and begin authenticating outgoing connections and handling - // shard versions. If this is not run before sharded operations occur auth will not work - // and versions will not be tracked. 
- static void initialize(const std::string& server); - - void gotShardName(const std::string& name); - bool setShardName(const std::string& name); // Same as above, does not throw - std::string getShardName(); - - // Helpers for SetShardVersion which report the host name sent to this shard when the shard - // name does not match. Do not use in other places. - // TODO: Remove once SSV is deprecated - void gotShardNameAndHost(const std::string& name, const std::string& host); - bool setShardNameAndHost(const std::string& name, const std::string& host); - - /** - * Clears the collection metadata cache after step down. - */ - void clearCollectionMetadata(); - - // versioning support - - bool hasVersion(const std::string& ns); - bool hasVersion(const std::string& ns, ChunkVersion& version); - ChunkVersion getVersion(const std::string& ns); - - /** - * If the metadata for 'ns' at this shard is at or above the requested version, - * 'reqShardVersion', returns OK and fills in 'latestShardVersion' with the latest shard - * version. The latter is always greater or equal than 'reqShardVersion' if in the same - * epoch. - * - * Otherwise, falls back to refreshMetadataNow. - * - * This call blocks if there are more than N threads - * currently refreshing metadata. (N is the number of - * tickets in ShardingState::_configServerTickets, - * currently 3.) - * - * Locking Note: - * + Must NOT be called with the write lock because this call may go into the network, - * and deadlocks may occur with shard-as-a-config. Therefore, nothing here guarantees - * that 'latestShardVersion' is indeed the current one on return. - */ - Status refreshMetadataIfNeeded(OperationContext* txn, - const std::string& ns, - const ChunkVersion& reqShardVersion, - ChunkVersion* latestShardVersion); - - /** - * Refreshes collection metadata by asking the config server for the latest information. - * Starts a new config server request. 
- * - * Locking Notes: - * + Must NOT be called with the write lock because this call may go into the network, - * and deadlocks may occur with shard-as-a-config. Therefore, nothing here guarantees - * that 'latestShardVersion' is indeed the current one on return. - * - * + Because this call must not be issued with the DBLock held, by the time the config - * server sent us back the collection metadata information, someone else may have - * updated the previously stored collection metadata. There are cases when one can't - * tell which of updated or loaded metadata are the freshest. There are also cases where - * the data coming from configs do not correspond to a consistent snapshot. - * In these cases, return RemoteChangeDetected. (This usually means this call needs to - * be issued again, at caller discretion) - * - * @return OK if remote metadata successfully loaded (may or may not have been installed) - * @return RemoteChangeDetected if something changed while reloading and we may retry - * @return !OK if something else went wrong during reload - * @return latestShardVersion the version that is now stored for this collection - */ - Status refreshMetadataNow(OperationContext* txn, - const std::string& ns, - ChunkVersion* latestShardVersion); - - void appendInfo(BSONObjBuilder& b); - - // querying support - - bool needCollectionMetadata(Client* client, const std::string& ns) const; - CollectionMetadataPtr getCollectionMetadata(const std::string& ns); - - // chunk migrate and split support - - /** - * Creates and installs a new chunk metadata for a given collection by "forgetting" about - * one of its chunks. The new metadata uses the provided version, which has to be higher - * than the current metadata's shard version. - * - * One exception: if the forgotten chunk is the last one in this shard for the collection, - * version has to be 0. - * - * If it runs successfully, clients need to grab the new version to access the collection. 
- * - * LOCKING NOTE: - * Only safe to do inside the - * - * @param ns the collection - * @param min max the chunk to eliminate from the current metadata - * @param version at which the new metadata should be at - */ - void donateChunk(OperationContext* txn, - const std::string& ns, - const BSONObj& min, - const BSONObj& max, - ChunkVersion version); - - /** - * Creates and installs new chunk metadata for a given collection by reclaiming a previously - * donated chunk. The previous metadata's shard version has to be provided. - * - * If it runs successfully, clients that became stale by the previous donateChunk will be - * able to access the collection again. - * - * Note: If a migration has aborted but not yet unregistered a pending chunk, replacing the - * metadata may leave the chunk as pending - this is not dangerous and should be rare, but - * will require a stepdown to fully recover. - * - * @param ns the collection - * @param prevMetadata the previous metadata before we donated a chunk - */ - void undoDonateChunk(OperationContext* txn, - const std::string& ns, - CollectionMetadataPtr prevMetadata); - - /** - * Remembers a chunk range between 'min' and 'max' as a range which will have data migrated - * into it. This data can then be protected against cleanup of orphaned data. - * - * Overlapping pending ranges will be removed, so it is only safe to use this when you know - * your metadata view is definitive, such as at the start of a migration. - * - * @return false with errMsg if the range is owned by this shard - */ - bool notePending(OperationContext* txn, - const std::string& ns, - const BSONObj& min, - const BSONObj& max, - const OID& epoch, - std::string* errMsg); - - /** - * Stops tracking a chunk range between 'min' and 'max' that previously was having data - * migrated into it. This data is no longer protected against cleanup of orphaned data. 
- * - * To avoid removing pending ranges of other operations, ensure that this is only used when - * a migration is still active. - * TODO: Because migrations may currently be active when a collection drops, an epoch is - * necessary to ensure the pending metadata change is still applicable. - * - * @return false with errMsg if the range is owned by the shard or the epoch of the metadata - * has changed - */ - bool forgetPending(OperationContext* txn, - const std::string& ns, - const BSONObj& min, - const BSONObj& max, - const OID& epoch, - std::string* errMsg); - - /** - * Creates and installs a new chunk metadata for a given collection by splitting one of its - * chunks in two or more. The version for the first split chunk should be provided. The - * subsequent chunks' version would be the latter with the minor portion incremented. - * - * The effect on clients will depend on the version used. If the major portion is the same - * as the current shards, clients shouldn't perceive the split. - * - * @param ns the collection - * @param min max the chunk that should be split - * @param splitKeys point in which to split - * @param version at which the new metadata should be at - */ - void splitChunk(OperationContext* txn, - const std::string& ns, - const BSONObj& min, - const BSONObj& max, - const std::vector<BSONObj>& splitKeys, - ChunkVersion version); - - /** - * Creates and installs a new chunk metadata for a given collection by merging a range of - * chunks ['minKey', 'maxKey') into a single chunk with version 'mergedVersion'. - * The current metadata must overlap the range completely and minKey and maxKey must not - * divide an existing chunk. - * - * The merged chunk version must have a greater version than the current shard version, - * and if it has a greater major version clients will need to reload metadata. 
- * - * @param ns the collection - * @param minKey maxKey the range which should be merged - * @param newShardVersion the shard version the newly merged chunk should have - */ - void mergeChunks(OperationContext* txn, - const std::string& ns, - const BSONObj& minKey, - const BSONObj& maxKey, - ChunkVersion mergedVersion); - - bool inCriticalMigrateSection(); - - /** - * @return true if we are NOT in the critical section - */ - bool waitTillNotInCriticalSection(int maxSecondsToWait); - - /** - * TESTING ONLY - * Uninstalls the metadata for a given collection. - */ - void resetMetadata(const std::string& ns); - -private: - void _initialize(const std::string& server); - - /** - * Refreshes collection metadata by asking the config server for the latest information. - * May or may not be based on a requested version. - */ - Status doRefreshMetadata(OperationContext* txn, - const std::string& ns, - const ChunkVersion& reqShardVersion, - bool useRequestedVersion, - ChunkVersion* latestShardVersion); - - // protects state below - stdx::mutex _mutex; - - // Whether ::initialize has been called - bool _enabled; - - // Sets the shard name for this host (comes through setShardVersion) - std::string _shardName; - - // protects accessing the config server - // Using a ticket holder so we can have multiple redundant tries at any given time - mutable TicketHolder _configServerTickets; - - // Map from a namespace into the metadata we need for each collection on this shard - typedef std::map<std::string, CollectionMetadataPtr> CollectionMetadataMap; - CollectionMetadataMap _collMetadata; -}; - -extern ShardingState shardingState; - -/** - * one per connection from mongos - * holds version state for each namespace - */ -class ShardedConnectionInfo { -public: - ShardedConnectionInfo(); - - const ChunkVersion getVersion(const std::string& ns) const; - void setVersion(const std::string& ns, const ChunkVersion& version); - - static ShardedConnectionInfo* get(Client* client, bool create); 
- static void reset(Client* client); - static void addHook(); - - bool inForceVersionOkMode() const { - return _forceVersionOk; - } - - void enterForceVersionOkMode() { - _forceVersionOk = true; - } - void leaveForceVersionOkMode() { - _forceVersionOk = false; - } - -private: - // if this is true, then chunk version #s aren't check, and all ops are allowed - bool _forceVersionOk; - - typedef std::map<std::string, ChunkVersion> NSVersionMap; - NSVersionMap _versions; -}; +class ShardedConnectionInfo; struct ShardForceVersionOkModeBlock { - ShardForceVersionOkModeBlock(Client* client) { - info = ShardedConnectionInfo::get(client, false); - if (info) - info->enterForceVersionOkMode(); - } - ~ShardForceVersionOkModeBlock() { - if (info) - info->leaveForceVersionOkMode(); - } + ShardForceVersionOkModeBlock(Client* client); + ~ShardForceVersionOkModeBlock(); ShardedConnectionInfo* info; }; |