diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-07-09 16:50:30 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2015-07-11 05:49:49 -0400 |
commit | ac42068ddeaae27d2cd5cfc4808915491e5097c7 (patch) | |
tree | eda7ced79e545b3fce7f6ca0d892d330e932f0a1 /src/mongo/db | |
parent | 15c72c8570c63e2e6ba0a3d339a8286d0be604db (diff) | |
download | mongo-ac42068ddeaae27d2cd5cfc4808915491e5097c7.tar.gz |
SERVER-18084 Move code out of d_state.h/.cpp
Move the ShardingState/ShardedConnectionInfo classes out of d_state and
put them in separate sources under mongo/db.
No functional changes.
Diffstat (limited to 'src/mongo/db')
23 files changed, 1392 insertions, 85 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index f0d632fe070..08ff1f3f687 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -19,6 +19,7 @@ env.SConscript( 'pipeline', 'query', 'repl', + 's', 'sorter', 'stats', 'storage', @@ -636,6 +637,7 @@ serveronlyLibdeps = [ "repl/rslog", "repl/sync_tail", "repl/topology_coordinator_impl", + "s/sharding", "startup_warnings_mongod", "stats/counters", "stats/top", diff --git a/src/mongo/db/commands/cleanup_orphaned_cmd.cpp b/src/mongo/db/commands/cleanup_orphaned_cmd.cpp index a58b8cd4932..3af1687c790 100644 --- a/src/mongo/db/commands/cleanup_orphaned_cmd.cpp +++ b/src/mongo/db/commands/cleanup_orphaned_cmd.cpp @@ -45,8 +45,8 @@ #include "mongo/db/range_arithmetic.h" #include "mongo/db/range_deleter_service.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/collection_metadata.h" -#include "mongo/s/d_state.h" #include "mongo/util/log.h" namespace { diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index d941c3fa58e..8db9d4576b8 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -47,7 +47,7 @@ #include "mongo/db/op_observer.h" #include "mongo/db/ops/insert.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/s/d_state.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/shard_key_pattern.h" namespace mongo { diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index a300563de62..b0875bc764e 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -47,9 +47,9 @@ #include "mongo/db/query/explain.h" #include "mongo/db/query/find.h" #include "mongo/db/query/get_executor.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/server_parameters.h" #include "mongo/db/stats/counters.h" -#include "mongo/s/d_state.h" #include 
"mongo/s/stale_exception.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" diff --git a/src/mongo/db/commands/merge_chunks_cmd.cpp b/src/mongo/db/commands/merge_chunks_cmd.cpp index 1ee9d397dd7..12fccf9ed5e 100644 --- a/src/mongo/db/commands/merge_chunks_cmd.cpp +++ b/src/mongo/db/commands/merge_chunks_cmd.cpp @@ -26,6 +26,8 @@ * it in the license file. */ +#include "mongo/platform/basic.h" + #include "mongo/base/init.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_session.h" @@ -33,7 +35,7 @@ #include "mongo/db/commands.h" #include "mongo/db/field_parser.h" #include "mongo/db/namespace_string.h" -#include "mongo/s/d_state.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/d_merge.h" namespace mongo { @@ -147,7 +149,7 @@ public: return false; } - ShardingState::initialize(config); + shardingState.initialize(config); } // ShardName is optional, but might not be set yet diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index b56672a8763..5c6f38dbf48 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -32,7 +32,6 @@ #include "mongo/db/commands/mr.h" - #include "mongo/client/connpool.h" #include "mongo/client/parallel.h" #include "mongo/db/auth/authorization_session.h" @@ -56,6 +55,8 @@ #include "mongo/db/query/query_planner.h" #include "mongo/db/range_preserver.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/sharded_connection_info.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/collection_metadata.h" @@ -510,9 +511,9 @@ void State::appendResults(BSONObjBuilder& final) { BSONArrayBuilder b((int)(_size * 1.2)); // _size is data size, doesn't count overhead and keys - for (InMemory::iterator i = _temp->begin(); i != _temp->end(); ++i) { - BSONObj key = i->first; - BSONList& all = i->second; + for (const auto& entry : *_temp) { + const 
BSONObj& key = entry.first; + const BSONList& all = entry.second; verify(all.size() == 1); diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index 7e00d55f234..428b9873133 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -66,11 +66,12 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" +#include "mongo/db/s/sharded_connection_info.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/stats/counters.h" #include "mongo/db/stats/top.h" #include "mongo/db/write_concern.h" #include "mongo/s/collection_metadata.h" -#include "mongo/s/d_state.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/s/stale_exception.h" #include "mongo/s/write_ops/batched_upsert_detail.h" diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 5e49cd26b37..45d34aff374 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -1,32 +1,30 @@ -// dbcommands.cpp - /** -* Copyright (C) 2012-2014 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. 
-* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2012-2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand @@ -79,7 +77,6 @@ #include "mongo/db/lasterror.h" #include "mongo/db/namespace_string.h" #include "mongo/db/op_observer.h" -#include "mongo/db/operation_context_impl.h" #include "mongo/db/ops/insert.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/internal_plans.h" @@ -91,15 +88,13 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/storage/mmap_v1/dur_stats.h" -#include "mongo/db/storage/storage_engine.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/write_concern.h" #include "mongo/rpc/request_interface.h" #include "mongo/rpc/reply_builder_interface.h" #include "mongo/rpc/metadata.h" #include "mongo/rpc/metadata/server_selection_metadata.h" #include "mongo/rpc/metadata/sharding_metadata.h" -#include "mongo/s/d_state.h" #include "mongo/s/stale_exception.h" // for SendStaleConfigException #include "mongo/scripting/engine.h" #include "mongo/util/fail_point_service.h" diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index 0fe1949c452..9580fd07114 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -61,7 +61,7 @@ #include "mongo/db/storage_options.h" #include "mongo/db/write_concern.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/d_state.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" diff --git a/src/mongo/db/exec/shard_filter.cpp 
b/src/mongo/db/exec/shard_filter.cpp index d65ef0c0f10..8b28453caf8 100644 --- a/src/mongo/db/exec/shard_filter.cpp +++ b/src/mongo/db/exec/shard_filter.cpp @@ -28,17 +28,21 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery +#include "mongo/platform/basic.h" + #include "mongo/db/exec/shard_filter.h" #include "mongo/db/exec/filter.h" #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/s/collection_metadata.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" namespace mongo { +using std::shared_ptr; using std::unique_ptr; using std::vector; using stdx::make_unique; @@ -46,7 +50,7 @@ using stdx::make_unique; // static const char* ShardFilterStage::kStageType = "SHARDING_FILTER"; -ShardFilterStage::ShardFilterStage(const CollectionMetadataPtr& metadata, +ShardFilterStage::ShardFilterStage(const shared_ptr<CollectionMetadata>& metadata, WorkingSet* ws, PlanStage* child) : _ws(ws), _child(child), _commonStats(kStageType), _metadata(metadata) {} diff --git a/src/mongo/db/exec/shard_filter.h b/src/mongo/db/exec/shard_filter.h index cc99525bfcc..a61eefc2f23 100644 --- a/src/mongo/db/exec/shard_filter.h +++ b/src/mongo/db/exec/shard_filter.h @@ -28,14 +28,12 @@ #pragma once - #include "mongo/db/exec/plan_stage.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/record_id.h" -#include "mongo/s/d_state.h" namespace mongo { +class CollectionMetadata; + /** * This stage drops documents that didn't belong to the shard we're executing on at the time of * construction. 
This matches the contract for sharded cursorids which guarantees that a @@ -73,7 +71,9 @@ namespace mongo { */ class ShardFilterStage : public PlanStage { public: - ShardFilterStage(const CollectionMetadataPtr& metadata, WorkingSet* ws, PlanStage* child); + ShardFilterStage(const std::shared_ptr<CollectionMetadata>& metadata, + WorkingSet* ws, + PlanStage* child); virtual ~ShardFilterStage(); virtual bool isEOF(); @@ -107,7 +107,7 @@ private: // Note: it is important that this is the metadata from the time this stage is constructed. // See class comment for details. - const CollectionMetadataPtr _metadata; + const std::shared_ptr<CollectionMetadata> _metadata; }; } // namespace mongo diff --git a/src/mongo/db/instance.cpp b/src/mongo/db/instance.cpp index b4831e0a6cb..586c59f7278 100644 --- a/src/mongo/db/instance.cpp +++ b/src/mongo/db/instance.cpp @@ -1,32 +1,30 @@ -// instance.cpp - /** -* Copyright (C) 2008 10gen Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. 
If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2008-2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand @@ -79,6 +77,7 @@ #include "mongo/db/query/get_executor.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/service_context.h" #include "mongo/db/stats/counters.h" #include "mongo/db/storage/storage_engine.h" @@ -94,7 +93,6 @@ #include "mongo/rpc/metadata.h" #include "mongo/rpc/request_interface.h" #include "mongo/s/catalog/catalog_manager.h" -#include "mongo/s/d_state.h" #include "mongo/s/grid.h" #include "mongo/s/stale_exception.h" // for SendStaleConfigException #include "mongo/scripting/engine.h" diff --git a/src/mongo/db/ops/update_lifecycle_impl.cpp b/src/mongo/db/ops/update_lifecycle_impl.cpp index 95c15f2d04b..b003169edf2 100644 --- a/src/mongo/db/ops/update_lifecycle_impl.cpp +++ b/src/mongo/db/ops/update_lifecycle_impl.cpp @@ -26,14 +26,16 @@ * it in the license file. */ +#include "mongo/platform/basic.h" + #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/client.h" #include "mongo/db/catalog/database.h" #include "mongo/db/field_ref.h" #include "mongo/db/catalog/collection.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/s/chunk_version.h" -#include "mongo/s/d_state.h" namespace mongo { namespace { diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 136d1ca7f29..d71b36c49a6 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -30,7 +30,6 @@ #include "mongo/db/pipeline/pipeline_d.h" - #include "mongo/client/dbclientinterface.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" @@ -41,7 +40,8 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/query_planner.h" -#include "mongo/s/d_state.h" +#include "mongo/db/s/sharded_connection_info.h" +#include 
"mongo/db/s/sharding_state.h" namespace mongo { diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index 2991ce21928..e90e4d18779 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -50,11 +50,11 @@ #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_planner_params.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/storage_options.h" #include "mongo/s/chunk_version.h" -#include "mongo/s/d_state.h" #include "mongo/s/stale_exception.h" #include "mongo/stdx/memory.h" #include "mongo/util/fail_point_service.h" diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index aaccdd66d90..86bcc3beddf 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -70,9 +70,9 @@ #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/storage_options.h" #include "mongo/db/storage/oplog_hack.h" -#include "mongo/s/d_state.h" #include "mongo/scripting/engine.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp index 15bb04c1305..07db4afa5fa 100644 --- a/src/mongo/db/query/stage_builder.cpp +++ b/src/mongo/db/query/stage_builder.cpp @@ -28,6 +28,8 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery +#include "mongo/platform/basic.h" + #include "mongo/db/query/stage_builder.h" #include "mongo/db/client.h" @@ -51,6 +53,7 @@ #include "mongo/db/index/fts_access_method.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/util/log.h" namespace mongo { diff --git 
a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 611a089a541..489bc753b99 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -55,9 +55,9 @@ #include "mongo/db/repl/snapshot_thread.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/executor/network_interface.h" -#include "mongo/s/d_state.h" #include "mongo/stdx/functional.h" #include "mongo/stdx/thread.h" #include "mongo/util/assert_util.h" diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript new file mode 100644 index 00000000000..1ed15c8bfac --- /dev/null +++ b/src/mongo/db/s/SConscript @@ -0,0 +1,17 @@ +# -*- mode: python -*- + +Import("env") + +env.Library( + target='sharding', + source=[ + 'sharded_connection_info.cpp', + 'sharding_state.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base/base', + '$BUILD_DIR/mongo/bson/bson', + '$BUILD_DIR/mongo/bson/util/bson_extract', + '$BUILD_DIR/mongo/db/common', + ] +) diff --git a/src/mongo/db/s/sharded_connection_info.cpp b/src/mongo/db/s/sharded_connection_info.cpp new file mode 100644 index 00000000000..e3029bd950a --- /dev/null +++ b/src/mongo/db/s/sharded_connection_info.cpp @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharded_connection_info.h" + +#include <boost/optional.hpp> + +#include "mongo/client/global_conn_pool.h" +#include "mongo/db/client.h" +#include "mongo/db/operation_context.h" +#include "mongo/s/chunk_version.h" +#include "mongo/s/client/shard_connection.h" +#include "mongo/s/client/sharding_connection_hook.h" +#include "mongo/util/log.h" + +namespace mongo { + +namespace { + +const auto clientSCI = Client::declareDecoration<boost::optional<ShardedConnectionInfo>>(); + +} // namespace + +ShardedConnectionInfo::ShardedConnectionInfo() { + _forceVersionOk = false; +} + +ShardedConnectionInfo::~ShardedConnectionInfo() = default; + +ShardedConnectionInfo* ShardedConnectionInfo::get(Client* client, bool create) { + auto& current = clientSCI(client); + + if (!current && create) { + LOG(1) << "entering shard mode for connection"; + current = boost::in_place(); + } + + return current ? 
&current.value() : nullptr; +} + +void ShardedConnectionInfo::reset(Client* client) { + clientSCI(client) = boost::none; +} + +const ChunkVersion ShardedConnectionInfo::getVersion(const std::string& ns) const { + NSVersionMap::const_iterator it = _versions.find(ns); + if (it != _versions.end()) { + return it->second; + } else { + return ChunkVersion(0, 0, OID()); + } +} + +void ShardedConnectionInfo::setVersion(const std::string& ns, const ChunkVersion& version) { + _versions[ns] = version; +} + +void ShardedConnectionInfo::addHook() { + static stdx::mutex lock; + static bool done = false; + + stdx::lock_guard<stdx::mutex> lk(lock); + if (!done) { + log() << "first cluster operation detected, adding sharding hook to enable versioning " + "and authentication to remote servers"; + + globalConnPool.addHook(new ShardingConnectionHook(false)); + shardConnectionPool.addHook(new ShardingConnectionHook(true)); + + done = true; + } +} + +} // namespace mongo diff --git a/src/mongo/db/s/sharded_connection_info.h b/src/mongo/db/s/sharded_connection_info.h new file mode 100644 index 00000000000..482e5910cf4 --- /dev/null +++ b/src/mongo/db/s/sharded_connection_info.h @@ -0,0 +1,82 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <map> +#include <string> + +#include "mongo/base/disallow_copying.h" + +namespace mongo { + +struct ChunkVersion; +class Client; + +/** + * There is one instance of these per each connection from mongos. Holds version state for each + * namespace. 
+ */ +class ShardedConnectionInfo { + MONGO_DISALLOW_COPYING(ShardedConnectionInfo); + +public: + ShardedConnectionInfo(); + ~ShardedConnectionInfo(); + + const ChunkVersion getVersion(const std::string& ns) const; + void setVersion(const std::string& ns, const ChunkVersion& version); + + static ShardedConnectionInfo* get(Client* client, bool create); + static void reset(Client* client); + static void addHook(); + + bool inForceVersionOkMode() const { + return _forceVersionOk; + } + + void enterForceVersionOkMode() { + _forceVersionOk = true; + } + void leaveForceVersionOkMode() { + _forceVersionOk = false; + } + +private: + typedef std::map<std::string, ChunkVersion> NSVersionMap; + + // Map from a namespace string to the chunk version with which this connection has been + // initialized for the specified namespace + NSVersionMap _versions; + + // If this is true, then chunk versions aren't checked, and all operations are allowed + bool _forceVersionOk; +}; + + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp new file mode 100644 index 00000000000..f6442c69744 --- /dev/null +++ b/src/mongo/db/s/sharding_state.cpp @@ -0,0 +1,787 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/sharding_state.h" + +#include "mongo/client/remote_command_targeter_factory_impl.h" +#include "mongo/db/client.h" +#include "mongo/db/concurrency/lock_state.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/db/s/sharded_connection_info.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/task_executor.h" +#include "mongo/s/catalog/catalog_manager.h" +#include "mongo/s/catalog/legacy/catalog_manager_legacy.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/chunk_version.h" +#include "mongo/s/collection_metadata.h" +#include "mongo/s/grid.h" +#include "mongo/s/metadata_loader.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/net/sock.h" + +namespace mongo { + +using std::shared_ptr; +using std::string; +using std::vector; + +// Global sharding state instance +ShardingState shardingState; + +ShardingState::ShardingState() + : _enabled(false), + _configServerTickets(3 /* max number of 
concurrent config server refresh threads */) {} + +ShardingState::~ShardingState() = default; + +bool ShardingState::enabled() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _enabled; +} + +string ShardingState::getConfigServer() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_enabled); + + return grid.catalogManager()->connectionString().toString(); +} + +string ShardingState::getShardName() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + invariant(_enabled); + + return _shardName; +} + +void ShardingState::initialize(const string& server) { + uassert(18509, + "Unable to obtain host name during sharding initialization.", + !getHostName().empty()); + + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_enabled) { + // TODO: Do we need to throw exception if the config servers have changed from what we + // already have in place? How do we test for that? + return; + } + + ShardedConnectionInfo::addHook(); + + std::string errmsg; + ConnectionString configServerCS = ConnectionString::parse(server, errmsg); + uassert(28633, + str::stream() << "Invalid config server connection string: " << errmsg, + configServerCS.isValid()); + + auto catalogManager = stdx::make_unique<CatalogManagerLegacy>(); + uassertStatusOK(catalogManager->init(configServerCS)); + + auto shardRegistry(stdx::make_unique<ShardRegistry>( + stdx::make_unique<RemoteCommandTargeterFactoryImpl>(), + stdx::make_unique<repl::ReplicationExecutor>( + executor::makeNetworkInterface().release(), nullptr, 0), + nullptr, + catalogManager.get())); + shardRegistry->startup(); + + grid.init(std::move(catalogManager), std::move(shardRegistry)); + + _enabled = true; +} + +// TODO: Consolidate and eliminate these various ways of setting / validating shard names +bool ShardingState::setShardName(const string& name) { + return setShardNameAndHost(name, ""); +} + +bool ShardingState::setShardNameAndHost(const string& name, const string& host) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if 
(_shardName.size() == 0) { + // TODO SERVER-2299 remotely verify the name is sound w.r.t IPs + _shardName = name; + + string clientAddr = cc().clientAddress(true); + + log() << "remote client " << clientAddr << " initialized this host " + << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name; + + return true; + } + + if (_shardName == name) + return true; + + string clientAddr = cc().clientAddress(true); + + warning() << "remote client " << clientAddr << " tried to initialize this host " + << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name + << ", but shard name was previously initialized as " << _shardName; + + return false; +} + +void ShardingState::gotShardName(const string& name) { + gotShardNameAndHost(name, ""); +} + +void ShardingState::gotShardNameAndHost(const string& name, const string& host) { + if (setShardNameAndHost(name, host)) { + return; + } + + const string clientAddr = cc().clientAddress(true); + + StringBuilder sb; + + // Same error as above, to match for reporting + sb << "remote client " << clientAddr << " tried to initialize this host " + << (host.empty() ? string("") : string("(") + host + ") ") << "as shard " << name + << ", but shard name was previously initialized as " << _shardName; + + msgasserted(13298, sb.str()); +} + +void ShardingState::clearCollectionMetadata() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _collMetadata.clear(); +} + +// TODO we shouldn't need three ways for checking the version. Fix this. 
+bool ShardingState::hasVersion(const string& ns) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + return it != _collMetadata.end(); +} + +bool ShardingState::hasVersion(const string& ns, ChunkVersion& version) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + if (it == _collMetadata.end()) + return false; + + shared_ptr<CollectionMetadata> p = it->second; + version = p->getShardVersion(); + return true; +} + +ChunkVersion ShardingState::getVersion(const string& ns) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + if (it != _collMetadata.end()) { + shared_ptr<CollectionMetadata> p = it->second; + return p->getShardVersion(); + } else { + return ChunkVersion(0, 0, OID()); + } +} + +void ShardingState::donateChunk(OperationContext* txn, + const string& ns, + const BSONObj& min, + const BSONObj& max, + ChunkVersion version) { + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + verify(it != _collMetadata.end()); + shared_ptr<CollectionMetadata> p = it->second; + + // empty shards should have version 0 + version = (p->getNumChunks() > 1) ? 
version : ChunkVersion(0, 0, p->getCollVersion().epoch()); + + ChunkType chunk; + chunk.setMin(min); + chunk.setMax(max); + string errMsg; + + shared_ptr<CollectionMetadata> cloned(p->cloneMigrate(chunk, version, &errMsg)); + // uassert to match old behavior, TODO: report errors w/o throwing + uassert(16855, errMsg, NULL != cloned.get()); + + // TODO: a bit dangerous to have two different zero-version states - no-metadata and + // no-version + _collMetadata[ns] = cloned; +} + +void ShardingState::undoDonateChunk(OperationContext* txn, + const string& ns, + shared_ptr<CollectionMetadata> prevMetadata) { + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + stdx::lock_guard<stdx::mutex> lk(_mutex); + + log() << "ShardingState::undoDonateChunk acquired _mutex"; + + CollectionMetadataMap::iterator it = _collMetadata.find(ns); + verify(it != _collMetadata.end()); + it->second = prevMetadata; +} + +bool ShardingState::notePending(OperationContext* txn, + const string& ns, + const BSONObj& min, + const BSONObj& max, + const OID& epoch, + string* errMsg) { + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + if (it == _collMetadata.end()) { + *errMsg = str::stream() << "could not note chunk " + << "[" << min << "," << max << ")" + << " as pending because the local metadata for " << ns + << " has changed"; + + return false; + } + + shared_ptr<CollectionMetadata> metadata = it->second; + + // This can currently happen because drops aren't synchronized with in-migrations + // The idea for checking this here is that in the future we shouldn't have this problem + if (metadata->getCollVersion().epoch() != epoch) { + *errMsg = str::stream() << "could not note chunk " + << "[" << min << "," << max << ")" + << " as pending because the epoch for " << ns + << " has changed from " << epoch << " to " + << 
metadata->getCollVersion().epoch(); + + return false; + } + + ChunkType chunk; + chunk.setMin(min); + chunk.setMax(max); + + shared_ptr<CollectionMetadata> cloned(metadata->clonePlusPending(chunk, errMsg)); + if (!cloned) + return false; + + _collMetadata[ns] = cloned; + return true; +} + +bool ShardingState::forgetPending(OperationContext* txn, + const string& ns, + const BSONObj& min, + const BSONObj& max, + const OID& epoch, + string* errMsg) { + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + if (it == _collMetadata.end()) { + *errMsg = str::stream() << "no need to forget pending chunk " + << "[" << min << "," << max << ")" + << " because the local metadata for " << ns << " has changed"; + + return false; + } + + shared_ptr<CollectionMetadata> metadata = it->second; + + // This can currently happen because drops aren't synchronized with in-migrations + // The idea for checking this here is that in the future we shouldn't have this problem + if (metadata->getCollVersion().epoch() != epoch) { + *errMsg = str::stream() << "no need to forget pending chunk " + << "[" << min << "," << max << ")" + << " because the epoch for " << ns << " has changed from " << epoch + << " to " << metadata->getCollVersion().epoch(); + + return false; + } + + ChunkType chunk; + chunk.setMin(min); + chunk.setMax(max); + + shared_ptr<CollectionMetadata> cloned(metadata->cloneMinusPending(chunk, errMsg)); + if (!cloned) + return false; + + _collMetadata[ns] = cloned; + return true; +} + +void ShardingState::splitChunk(OperationContext* txn, + const string& ns, + const BSONObj& min, + const BSONObj& max, + const vector<BSONObj>& splitKeys, + ChunkVersion version) { + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + 
verify(it != _collMetadata.end()); + + ChunkType chunk; + chunk.setMin(min); + chunk.setMax(max); + string errMsg; + + shared_ptr<CollectionMetadata> cloned( + it->second->cloneSplit(chunk, splitKeys, version, &errMsg)); + // uassert to match old behavior, TODO: report errors w/o throwing + uassert(16857, errMsg, NULL != cloned.get()); + + _collMetadata[ns] = cloned; +} + +void ShardingState::mergeChunks(OperationContext* txn, + const string& ns, + const BSONObj& minKey, + const BSONObj& maxKey, + ChunkVersion mergedVersion) { + invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + verify(it != _collMetadata.end()); + + string errMsg; + + shared_ptr<CollectionMetadata> cloned( + it->second->cloneMerge(minKey, maxKey, mergedVersion, &errMsg)); + // uassert to match old behavior, TODO: report errors w/o throwing + uassert(17004, errMsg, NULL != cloned.get()); + + _collMetadata[ns] = cloned; +} + +void ShardingState::resetMetadata(const string& ns) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + warning() << "resetting metadata for " << ns << ", this should only be used in testing"; + + _collMetadata.erase(ns); +} + +Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn, + const string& ns, + const ChunkVersion& reqShardVersion, + ChunkVersion* latestShardVersion) { + // The _configServerTickets serializes this process such that only a small number of threads + // can try to refresh at the same time. + + LOG(2) << "metadata refresh requested for " << ns << " at shard version " << reqShardVersion; + + // + // Queuing of refresh requests starts here when remote reload is needed. This may take time. + // TODO: Explicitly expose the queuing discipline. 
+ // + + _configServerTickets.waitForTicket(); + TicketHolderReleaser needTicketFrom(&_configServerTickets); + + // + // Fast path - check if the requested version is at a higher version than the current + // metadata version or a different epoch before verifying against config server. + // + + shared_ptr<CollectionMetadata> storedMetadata; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + CollectionMetadataMap::iterator it = _collMetadata.find(ns); + if (it != _collMetadata.end()) + storedMetadata = it->second; + } + ChunkVersion storedShardVersion; + if (storedMetadata) + storedShardVersion = storedMetadata->getShardVersion(); + *latestShardVersion = storedShardVersion; + + if (storedShardVersion >= reqShardVersion && + storedShardVersion.epoch() == reqShardVersion.epoch()) { + // Don't need to remotely reload if we're in the same epoch with a >= version + return Status::OK(); + } + + // + // Slow path - remotely reload + // + // Cases: + // A) Initial config load and/or secondary take-over. + // B) Migration TO this shard finished, notified by mongos. + // C) Dropping a collection, notified (currently) by mongos. + // D) Stale client wants to reload metadata with a different *epoch*, so we aren't sure. 
+ + if (storedShardVersion.epoch() != reqShardVersion.epoch()) { + // Need to remotely reload if our epochs aren't the same, to verify + LOG(1) << "metadata change requested for " << ns << ", from shard version " + << storedShardVersion << " to " << reqShardVersion + << ", need to verify with config server"; + } else { + // Need to remotely reload since our epochs aren't the same but our version is greater + LOG(1) << "metadata version update requested for " << ns << ", from shard version " + << storedShardVersion << " to " << reqShardVersion + << ", need to verify with config server"; + } + + return doRefreshMetadata(txn, ns, reqShardVersion, true, latestShardVersion); +} + +Status ShardingState::refreshMetadataNow(OperationContext* txn, + const string& ns, + ChunkVersion* latestShardVersion) { + return doRefreshMetadata(txn, ns, ChunkVersion(0, 0, OID()), false, latestShardVersion); +} + +Status ShardingState::doRefreshMetadata(OperationContext* txn, + const string& ns, + const ChunkVersion& reqShardVersion, + bool useRequestedVersion, + ChunkVersion* latestShardVersion) { + // The idea here is that we're going to reload the metadata from the config server, but + // we need to do so outside any locks. When we get our result back, if the current metadata + // has changed, we may not be able to install the new metadata. + + // + // Get the initial metadata + // No DBLock is needed since the metadata is expected to change during reload. + // + + shared_ptr<CollectionMetadata> beforeMetadata; + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + // We can't reload if sharding is not enabled - i.e. without a config server location + if (!_enabled) { + string errMsg = str::stream() << "cannot refresh metadata for " << ns + << " before sharding has been enabled"; + + warning() << errMsg; + return Status(ErrorCodes::NotYetInitialized, errMsg); + } + + // We also can't reload if a shard name has not yet been set. 
+ if (_shardName.empty()) { + string errMsg = str::stream() << "cannot refresh metadata for " << ns + << " before shard name has been set"; + + warning() << errMsg; + return Status(ErrorCodes::NotYetInitialized, errMsg); + } + + CollectionMetadataMap::iterator it = _collMetadata.find(ns); + if (it != _collMetadata.end()) { + beforeMetadata = it->second; + } + } + + ChunkVersion beforeShardVersion; + ChunkVersion beforeCollVersion; + if (beforeMetadata) { + beforeShardVersion = beforeMetadata->getShardVersion(); + beforeCollVersion = beforeMetadata->getCollVersion(); + } + + *latestShardVersion = beforeShardVersion; + + // + // Determine whether we need to diff or fully reload + // + + bool fullReload = false; + if (!beforeMetadata) { + // We don't have any metadata to reload from + fullReload = true; + } else if (useRequestedVersion && reqShardVersion.epoch() != beforeShardVersion.epoch()) { + // It's not useful to use the metadata as a base because we think the epoch will differ + fullReload = true; + } + + // + // Load the metadata from the remote server, start construction + // + + LOG(0) << "remotely refreshing metadata for " << ns + << (useRequestedVersion + ? string(" with requested shard version ") + reqShardVersion.toString() + : "") + << (fullReload ? ", current shard version is " : " based on current shard version ") + << beforeShardVersion << ", current metadata version is " << beforeCollVersion; + + string errMsg; + + MetadataLoader mdLoader; + CollectionMetadata* remoteMetadataRaw = new CollectionMetadata(); + shared_ptr<CollectionMetadata> remoteMetadata(remoteMetadataRaw); + + Timer refreshTimer; + Status status = mdLoader.makeCollectionMetadata(grid.catalogManager(), + ns, + getShardName(), + fullReload ? 
NULL : beforeMetadata.get(), + remoteMetadataRaw); + long long refreshMillis = refreshTimer.millis(); + + if (status.code() == ErrorCodes::NamespaceNotFound) { + remoteMetadata.reset(); + remoteMetadataRaw = NULL; + } else if (!status.isOK()) { + warning() << "could not remotely refresh metadata for " << ns << causedBy(status.reason()); + + return status; + } + + ChunkVersion remoteShardVersion; + ChunkVersion remoteCollVersion; + if (remoteMetadata) { + remoteShardVersion = remoteMetadata->getShardVersion(); + remoteCollVersion = remoteMetadata->getCollVersion(); + } + + // + // Get ready to install loaded metadata if needed + // + + shared_ptr<CollectionMetadata> afterMetadata; + ChunkVersion afterShardVersion; + ChunkVersion afterCollVersion; + ChunkVersion::VersionChoice choice; + + // If we choose to install the new metadata, this describes the kind of install + enum InstallType { + InstallType_New, + InstallType_Update, + InstallType_Replace, + InstallType_Drop, + InstallType_None + } installType = InstallType_None; // compiler complains otherwise + + { + // Exclusive collection lock needed since we're now potentially changing the metadata, + // and don't want reads/writes to be ongoing. 
+ ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); + Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); + + // + // Get the metadata now that the load has completed + // + + stdx::lock_guard<stdx::mutex> lk(_mutex); + + // Don't reload if our config server has changed or sharding is no longer enabled + if (!_enabled) { + string errMsg = str::stream() << "could not refresh metadata for " << ns + << ", sharding is no longer enabled"; + + warning() << errMsg; + return Status(ErrorCodes::NotYetInitialized, errMsg); + } + + CollectionMetadataMap::iterator it = _collMetadata.find(ns); + if (it != _collMetadata.end()) + afterMetadata = it->second; + + if (afterMetadata) { + afterShardVersion = afterMetadata->getShardVersion(); + afterCollVersion = afterMetadata->getCollVersion(); + } + + *latestShardVersion = afterShardVersion; + + // + // Resolve newer pending chunks with the remote metadata, finish construction + // + + status = mdLoader.promotePendingChunks(afterMetadata.get(), remoteMetadataRaw); + + if (!status.isOK()) { + warning() << "remote metadata for " << ns + << " is inconsistent with current pending chunks" + << causedBy(status.reason()); + + return status; + } + + // + // Compare the 'before', 'after', and 'remote' versions/epochs and choose newest + // Zero-epochs (sentinel value for "dropped" collections), are tested by + // !epoch.isSet(). 
+ // + + choice = ChunkVersion::chooseNewestVersion( + beforeCollVersion, afterCollVersion, remoteCollVersion); + + if (choice == ChunkVersion::VersionChoice_Remote) { + dassert(!remoteCollVersion.epoch().isSet() || remoteShardVersion >= beforeShardVersion); + + if (!afterCollVersion.epoch().isSet()) { + // First metadata load + installType = InstallType_New; + dassert(it == _collMetadata.end()); + _collMetadata.insert(make_pair(ns, remoteMetadata)); + } else if (remoteCollVersion.epoch().isSet() && + remoteCollVersion.epoch() == afterCollVersion.epoch()) { + // Update to existing metadata + installType = InstallType_Update; + + // Invariant: If CollMetadata was not found, version should be have been 0. + dassert(it != _collMetadata.end()); + it->second = remoteMetadata; + } else if (remoteCollVersion.epoch().isSet()) { + // New epoch detected, replacing metadata + installType = InstallType_Replace; + + // Invariant: If CollMetadata was not found, version should be have been 0. + dassert(it != _collMetadata.end()); + it->second = remoteMetadata; + } else { + dassert(!remoteCollVersion.epoch().isSet()); + + // Drop detected + installType = InstallType_Drop; + _collMetadata.erase(it); + } + + *latestShardVersion = remoteShardVersion; + } + } + // End _mutex + // End DBWrite + + // + // Do messaging based on what happened above + // + string localShardVersionMsg = beforeShardVersion.epoch() == afterShardVersion.epoch() + ? 
afterShardVersion.toString() + : beforeShardVersion.toString() + " / " + afterShardVersion.toString(); + + if (choice == ChunkVersion::VersionChoice_Unknown) { + string errMsg = str::stream() + << "need to retry loading metadata for " << ns + << ", collection may have been dropped or recreated during load" + << " (loaded shard version : " << remoteShardVersion.toString() + << ", stored shard versions : " << localShardVersionMsg << ", took " << refreshMillis + << "ms)"; + + warning() << errMsg; + return Status(ErrorCodes::RemoteChangeDetected, errMsg); + } + + if (choice == ChunkVersion::VersionChoice_Local) { + LOG(0) << "metadata of collection " << ns + << " already up to date (shard version : " << afterShardVersion.toString() + << ", took " << refreshMillis << "ms)"; + return Status::OK(); + } + + dassert(choice == ChunkVersion::VersionChoice_Remote); + + switch (installType) { + case InstallType_New: + LOG(0) << "collection " << ns << " was previously unsharded" + << ", new metadata loaded with shard version " << remoteShardVersion; + break; + case InstallType_Update: + LOG(0) << "updating metadata for " << ns << " from shard version " + << localShardVersionMsg << " to shard version " << remoteShardVersion; + break; + case InstallType_Replace: + LOG(0) << "replacing metadata for " << ns << " at shard version " + << localShardVersionMsg << " with a new epoch (shard version " + << remoteShardVersion << ")"; + break; + case InstallType_Drop: + LOG(0) << "dropping metadata for " << ns << " at shard version " << localShardVersionMsg + << ", took " << refreshMillis << "ms"; + break; + default: + verify(false); + break; + } + + if (installType != InstallType_Drop) { + LOG(0) << "collection version was loaded at version " << remoteCollVersion << ", took " + << refreshMillis << "ms"; + } + + return Status::OK(); +} + +void ShardingState::appendInfo(BSONObjBuilder& builder) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + builder.appendBool("enabled", _enabled); + if 
(!_enabled) { + return; + } + + builder.append("configServer", grid.catalogManager()->connectionString().toString()); + builder.append("shardName", _shardName); + + BSONObjBuilder versionB(builder.subobjStart("versions")); + for (CollectionMetadataMap::const_iterator it = _collMetadata.begin(); + it != _collMetadata.end(); + ++it) { + shared_ptr<CollectionMetadata> metadata = it->second; + versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong()); + } + + versionB.done(); +} + +bool ShardingState::needCollectionMetadata(Client* client, const string& ns) const { + if (!_enabled) + return false; + + if (!ShardedConnectionInfo::get(client, false)) + return false; + + return true; +} + +shared_ptr<CollectionMetadata> ShardingState::getCollectionMetadata(const string& ns) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + CollectionMetadataMap::const_iterator it = _collMetadata.find(ns); + if (it == _collMetadata.end()) { + return shared_ptr<CollectionMetadata>(); + } else { + return it->second; + } +} + +} // namespace mongo diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h new file mode 100644 index 00000000000..b4c666ab5bd --- /dev/null +++ b/src/mongo/db/s/sharding_state.h @@ -0,0 +1,310 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <map> +#include <string> +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/bson/oid.h" +#include "mongo/stdx/memory.h" +#include "mongo/stdx/mutex.h" +#include "mongo/util/concurrency/ticketholder.h" + +namespace mongo { + +class BSONObj; +class BSONObjBuilder; +struct ChunkVersion; +class Client; +class CollectionMetadata; +class OperationContext; +class Status; + +/** + * Represents the sharding state for the running instance. One per instance. + */ +class ShardingState { + MONGO_DISALLOW_COPYING(ShardingState); + +public: + ShardingState(); + ~ShardingState(); + + bool enabled(); + + std::string getConfigServer(); + std::string getShardName(); + + // Initialize sharding state and begin authenticating outgoing connections and handling + // shard versions. If this is not run before sharded operations occur auth will not work + // and versions will not be tracked. 
+ void initialize(const std::string& server); + + void gotShardName(const std::string& name); + bool setShardName(const std::string& name); // Same as above, does not throw + + // Helpers for SetShardVersion which report the host name sent to this shard when the shard + // name does not match. Do not use in other places. + // TODO: Remove once SSV is deprecated + void gotShardNameAndHost(const std::string& name, const std::string& host); + bool setShardNameAndHost(const std::string& name, const std::string& host); + + /** + * Clears the collection metadata cache after step down. + */ + void clearCollectionMetadata(); + + // versioning support + + bool hasVersion(const std::string& ns); + bool hasVersion(const std::string& ns, ChunkVersion& version); + ChunkVersion getVersion(const std::string& ns); + + /** + * If the metadata for 'ns' at this shard is at or above the requested version, + * 'reqShardVersion', returns OK and fills in 'latestShardVersion' with the latest shard + * version. The latter is always greater or equal than 'reqShardVersion' if in the same + * epoch. + * + * Otherwise, falls back to refreshMetadataNow. + * + * This call blocks if there are more than N threads + * currently refreshing metadata. (N is the number of + * tickets in ShardingState::_configServerTickets, + * currently 3.) + * + * Locking Note: + * + Must NOT be called with the write lock because this call may go into the network, + * and deadlocks may occur with shard-as-a-config. Therefore, nothing here guarantees + * that 'latestShardVersion' is indeed the current one on return. + */ + Status refreshMetadataIfNeeded(OperationContext* txn, + const std::string& ns, + const ChunkVersion& reqShardVersion, + ChunkVersion* latestShardVersion); + + /** + * Refreshes collection metadata by asking the config server for the latest information. + * Starts a new config server request. 
+ * + * Locking Notes: + * + Must NOT be called with the write lock because this call may go into the network, + * and deadlocks may occur with shard-as-a-config. Therefore, nothing here guarantees + * that 'latestShardVersion' is indeed the current one on return. + * + * + Because this call must not be issued with the DBLock held, by the time the config + * server sent us back the collection metadata information, someone else may have + * updated the previously stored collection metadata. There are cases when one can't + * tell which of updated or loaded metadata are the freshest. There are also cases where + * the data coming from configs do not correspond to a consistent snapshot. + * In these cases, return RemoteChangeDetected. (This usually means this call needs to + * be issued again, at caller discretion) + * + * @return OK if remote metadata successfully loaded (may or may not have been installed) + * @return RemoteChangeDetected if something changed while reloading and we may retry + * @return !OK if something else went wrong during reload + * @return latestShardVersion the version that is now stored for this collection + */ + Status refreshMetadataNow(OperationContext* txn, + const std::string& ns, + ChunkVersion* latestShardVersion); + + void appendInfo(BSONObjBuilder& b); + + // querying support + + bool needCollectionMetadata(Client* client, const std::string& ns) const; + std::shared_ptr<CollectionMetadata> getCollectionMetadata(const std::string& ns); + + // chunk migrate and split support + + /** + * Creates and installs a new chunk metadata for a given collection by "forgetting" about + * one of its chunks. The new metadata uses the provided version, which has to be higher + * than the current metadata's shard version. + * + * One exception: if the forgotten chunk is the last one in this shard for the collection, + * version has to be 0. + * + * If it runs successfully, clients need to grab the new version to access the collection. 
+ * + * LOCKING NOTE: + * Only safe to do inside the + * + * @param ns the collection + * @param min max the chunk to eliminate from the current metadata + * @param version at which the new metadata should be at + */ + void donateChunk(OperationContext* txn, + const std::string& ns, + const BSONObj& min, + const BSONObj& max, + ChunkVersion version); + + /** + * Creates and installs new chunk metadata for a given collection by reclaiming a previously + * donated chunk. The previous metadata's shard version has to be provided. + * + * If it runs successfully, clients that became stale by the previous donateChunk will be + * able to access the collection again. + * + * Note: If a migration has aborted but not yet unregistered a pending chunk, replacing the + * metadata may leave the chunk as pending - this is not dangerous and should be rare, but + * will require a stepdown to fully recover. + * + * @param ns the collection + * @param prevMetadata the previous metadata before we donated a chunk + */ + void undoDonateChunk(OperationContext* txn, + const std::string& ns, + std::shared_ptr<CollectionMetadata> prevMetadata); + + /** + * Remembers a chunk range between 'min' and 'max' as a range which will have data migrated + * into it. This data can then be protected against cleanup of orphaned data. + * + * Overlapping pending ranges will be removed, so it is only safe to use this when you know + * your metadata view is definitive, such as at the start of a migration. + * + * @return false with errMsg if the range is owned by this shard + */ + bool notePending(OperationContext* txn, + const std::string& ns, + const BSONObj& min, + const BSONObj& max, + const OID& epoch, + std::string* errMsg); + + /** + * Stops tracking a chunk range between 'min' and 'max' that previously was having data + * migrated into it. This data is no longer protected against cleanup of orphaned data. 
+ * + * To avoid removing pending ranges of other operations, ensure that this is only used when + * a migration is still active. + * TODO: Because migrations may currently be active when a collection drops, an epoch is + * necessary to ensure the pending metadata change is still applicable. + * + * @return false with errMsg if the range is owned by the shard or the epoch of the metadata + * has changed + */ + bool forgetPending(OperationContext* txn, + const std::string& ns, + const BSONObj& min, + const BSONObj& max, + const OID& epoch, + std::string* errMsg); + + /** + * Creates and installs a new chunk metadata for a given collection by splitting one of its + * chunks in two or more. The version for the first split chunk should be provided. The + * subsequent chunks' version would be the latter with the minor portion incremented. + * + * The effect on clients will depend on the version used. If the major portion is the same + * as the current shards, clients shouldn't perceive the split. + * + * @param ns the collection + * @param min max the chunk that should be split + * @param splitKeys point in which to split + * @param version at which the new metadata should be at + */ + void splitChunk(OperationContext* txn, + const std::string& ns, + const BSONObj& min, + const BSONObj& max, + const std::vector<BSONObj>& splitKeys, + ChunkVersion version); + + /** + * Creates and installs a new chunk metadata for a given collection by merging a range of + * chunks ['minKey', 'maxKey') into a single chunk with version 'mergedVersion'. + * The current metadata must overlap the range completely and minKey and maxKey must not + * divide an existing chunk. + * + * The merged chunk version must have a greater version than the current shard version, + * and if it has a greater major version clients will need to reload metadata. 
+ * + * @param ns the collection + * @param minKey maxKey the range which should be merged + * @param newShardVersion the shard version the newly merged chunk should have + */ + void mergeChunks(OperationContext* txn, + const std::string& ns, + const BSONObj& minKey, + const BSONObj& maxKey, + ChunkVersion mergedVersion); + + bool inCriticalMigrateSection(); + + /** + * @return true if we are NOT in the critical section + */ + bool waitTillNotInCriticalSection(int maxSecondsToWait); + + /** + * TESTING ONLY + * Uninstalls the metadata for a given collection. + */ + void resetMetadata(const std::string& ns); + +private: + // Map from a namespace into the metadata we need for each collection on this shard + typedef std::map<std::string, std::shared_ptr<CollectionMetadata>> CollectionMetadataMap; + + /** + * Refreshes collection metadata by asking the config server for the latest information. + * May or may not be based on a requested version. + */ + Status doRefreshMetadata(OperationContext* txn, + const std::string& ns, + const ChunkVersion& reqShardVersion, + bool useRequestedVersion, + ChunkVersion* latestShardVersion); + + // protects state below + stdx::mutex _mutex; + + // Whether ::initialize has been called + bool _enabled; + + // Sets the shard name for this host (comes through setShardVersion) + std::string _shardName; + + // protects accessing the config server + // Using a ticket holder so we can have multiple redundant tries at any given time + mutable TicketHolder _configServerTickets; + + CollectionMetadataMap _collMetadata; +}; + +// Global sharding state instance +extern ShardingState shardingState; + +} // namespace mongo |