diff options
Diffstat (limited to 'src')
26 files changed, 242 insertions, 173 deletions
diff --git a/src/mongo/db/commands/cleanup_orphaned_cmd.cpp b/src/mongo/db/commands/cleanup_orphaned_cmd.cpp index 8d7a3088bee..4769ae6a4a3 100644 --- a/src/mongo/db/commands/cleanup_orphaned_cmd.cpp +++ b/src/mongo/db/commands/cleanup_orphaned_cmd.cpp @@ -33,7 +33,6 @@ #include <string> #include <vector> -#include "mongo/base/init.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" @@ -45,25 +44,22 @@ #include "mongo/db/range_arithmetic.h" #include "mongo/db/range_deleter_service.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/service_context.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/sharding_state.h" #include "mongo/util/log.h" +namespace mongo { + +using std::string; +using str::stream; + namespace { -using mongo::WriteConcernOptions; const int kDefaultWTimeoutMs = 60 * 1000; const WriteConcernOptions DefaultWriteConcern(WriteConcernOptions::kMajority, WriteConcernOptions::NONE, kDefaultWTimeoutMs); -} - -namespace mongo { - -using std::endl; -using std::string; - -using mongoutils::str::stream; enum CleanupResult { CleanupResult_Done, CleanupResult_Continue, CleanupResult_Error }; @@ -85,10 +81,11 @@ CleanupResult cleanupOrphanedData(OperationContext* txn, string* errMsg) { BSONObj startingFromKey = startingFromKeyConst; - CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(ns.toString()); + std::shared_ptr<CollectionMetadata> metadata = + ShardingState::get(getGlobalServiceContext())->getCollectionMetadata(ns.toString()); if (!metadata || metadata->getKeyPattern().isEmpty()) { warning() << "skipping orphaned data cleanup for " << ns.toString() - << ", collection is not sharded" << endl; + << ", collection is not sharded"; return CleanupResult_Done; } @@ -99,7 +96,7 @@ CleanupResult cleanupOrphanedData(OperationContext* txn, *errMsg = stream() << "could not cleanup orphaned data, start key " << startingFromKey << " does not match shard key pattern " << keyPattern; - warning() << *errMsg << endl; + warning() << *errMsg; return CleanupResult_Error; } } else { @@ -109,7 +106,7 @@ CleanupResult cleanupOrphanedData(OperationContext* txn, KeyRange orphanRange; if (!metadata->getNextOrphanRange(startingFromKey, &orphanRange)) { LOG(1) << "orphaned data cleanup requested for " << ns.toString() << " starting from " - << startingFromKey << ", no orphan ranges remain" << endl; + << startingFromKey << ", no orphan ranges remain"; return CleanupResult_Done; } @@ -121,7 +118,7 @@ CleanupResult cleanupOrphanedData(OperationContext* txn, LOG(1) << "orphaned data cleanup requested for " << ns.toString() << " starting from " << startingFromKey << ", removing next orphan range" - << " [" << orphanRange.minKey << "," << orphanRange.maxKey << ")" << endl; + << " [" << orphanRange.minKey << "," << orphanRange.maxKey << ")"; // Metadata snapshot may be stale now, but deleter checks metadata again in write lock // before delete. @@ -135,7 +132,7 @@ CleanupResult cleanupOrphanedData(OperationContext* txn, deleterOptions.removeSaverReason = "cleanup-cmd"; if (!getDeleter()->deleteNow(txn, deleterOptions, errMsg)) { - warning() << *errMsg << endl; + warning() << *errMsg; return CleanupResult_Error; } @@ -258,18 +255,19 @@ public: writeConcern.wTimeout = kDefaultWTimeoutMs; } - if (!shardingState.enabled()) { + if (!ShardingState::get(getGlobalServiceContext())->enabled()) { errmsg = str::stream() << "server is not part of a sharded cluster or " << "the sharding metadata is not yet initialized."; return false; } ChunkVersion shardVersion; - status = shardingState.refreshMetadataNow(txn, ns, &shardVersion); + status = ShardingState::get(getGlobalServiceContext()) + ->refreshMetadataNow(txn, ns, &shardVersion); if (!status.isOK()) { if (status.code() == ErrorCodes::RemoteChangeDetected) { warning() << "Shard version in transition detected while refreshing " - << "metadata for " << ns << " at version " << shardVersion << endl; + << "metadata for " << ns << " at version " << shardVersion; } else { errmsg = str::stream() << "failed to refresh shard metadata: " << status.reason(); return false; @@ -292,16 +290,12 @@ public: return true; } -}; + +} cleanupOrphanedCmd; BSONField<string> CleanupOrphanedCommand::nsField("cleanupOrphaned"); BSONField<BSONObj> CleanupOrphanedCommand::startingFromKeyField("startingFromKey"); BSONField<BSONObj> CleanupOrphanedCommand::stoppedAtKeyField("stoppedAtKey"); -MONGO_INITIALIZER(RegisterCleanupOrphanedCommand)(InitializerContext* context) { - // Leaked intentionally: a Command registers itself when constructed. - new CleanupOrphanedCommand(); - return Status::OK(); -} - +} // namespace } // namespace mongo diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp index 890d76d0929..9b9277cafdc 100644 --- a/src/mongo/db/commands/create_indexes.cpp +++ b/src/mongo/db/commands/create_indexes.cpp @@ -292,9 +292,10 @@ private: const BSONObj& newIdxKey) { invariant(txn->lockState()->isCollectionLockedForMode(ns, MODE_X)); - if (shardingState.enabled()) { + if (ShardingState::get(getGlobalServiceContext())->enabled()) { std::shared_ptr<CollectionMetadata> metadata( - shardingState.getCollectionMetadata(ns.toString())); + ShardingState::get(getGlobalServiceContext()) + ->getCollectionMetadata(ns.toString())); if (metadata) { ShardKeyPattern shardKeyPattern(metadata->getKeyPattern()); if (!shardKeyPattern.isUniqueIndexCompatible(newIdxKey)) { diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index b0875bc764e..f25010c3ce0 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -171,9 +171,9 @@ public: * 6) Save state for getMore. * 7) Generate response to send to the client. * - * TODO: Rather than using the sharding version available in thread-local storage - * (i.e. call to shardingState.needCollectionMetadata() below), shard version - * information should be passed as part of the command parameter. + * TODO: Rather than using the sharding version available in thread-local storage (i.e. the + * call to ShardingState::needCollectionMetadata() below), shard version information + * should be passed as part of the command parameter. */ bool run(OperationContext* txn, const std::string& dbname, @@ -236,10 +236,12 @@ public: const int dbProfilingLevel = ctx.getDb() ? ctx.getDb()->getProfilingLevel() : serverGlobalParams.defaultProfile; + ShardingState* const shardingState = ShardingState::get(txn); + // It is possible that the sharding version will change during yield while we are // retrieving a plan executor. If this happens we will throw an error and mongos will // retry. - const ChunkVersion shardingVersionAtStart = shardingState.getVersion(nss.ns()); + const ChunkVersion shardingVersionAtStart = shardingState->getVersion(nss.ns()); // 3) Get the execution plan for the query. auto statusWithPlanExecutor = @@ -247,18 +249,19 @@ public: if (!statusWithPlanExecutor.isOK()) { return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); } + std::unique_ptr<PlanExecutor> exec = std::move(statusWithPlanExecutor.getValue()); // TODO: Currently, chunk ranges are kept around until all ClientCursors created while // the chunk belonged on this node are gone. Separating chunk lifetime management from // ClientCursor should allow this check to go away. - if (!shardingState.getVersion(nss.ns()).isWriteCompatibleWith(shardingVersionAtStart)) { + if (!shardingState->getVersion(nss.ns()).isWriteCompatibleWith(shardingVersionAtStart)) { // Version changed while retrieving a PlanExecutor. Terminate the operation, // signaling that mongos should retry. throw SendStaleConfigException(nss.ns(), "version changed during find command", shardingVersionAtStart, - shardingState.getVersion(nss.ns())); + shardingState->getVersion(nss.ns())); } if (!collection) { diff --git a/src/mongo/db/commands/merge_chunks_cmd.cpp b/src/mongo/db/commands/merge_chunks_cmd.cpp index 12fccf9ed5e..7c184a58a45 100644 --- a/src/mongo/db/commands/merge_chunks_cmd.cpp +++ b/src/mongo/db/commands/merge_chunks_cmd.cpp @@ -28,13 +28,13 @@ #include "mongo/platform/basic.h" -#include "mongo/base/init.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/privilege.h" #include "mongo/db/commands.h" #include "mongo/db/field_parser.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/service_context.h" #include "mongo/db/s/sharding_state.h" #include "mongo/s/d_merge.h" @@ -44,6 +44,8 @@ using std::string; using std::stringstream; using std::vector; +namespace { + /** * Mongod-side command for merging chunks. */ @@ -135,13 +137,13 @@ public: // // This might be the first call from mongos, so we may need to pass the config and shard - // information to initialize the shardingState. + // information to initialize the ShardingState::get(getGlobalServiceContext())-> // string config; FieldParser::FieldState extracted = FieldParser::extract(cmdObj, configField, &config, &errmsg); - if (!shardingState.enabled()) { + if (!ShardingState::get(getGlobalServiceContext())->enabled()) { if (!extracted || extracted == FieldParser::FIELD_NONE) { errmsg = "sharding state must be enabled or " @@ -149,7 +151,7 @@ public: return false; } - shardingState.initialize(config); + ShardingState::get(getGlobalServiceContext())->initialize(config); } // ShardName is optional, but might not be set yet @@ -159,7 +161,7 @@ public: if (!extracted) return false; if (extracted != FieldParser::FIELD_NONE) { - shardingState.gotShardName(shardName); + ShardingState::get(getGlobalServiceContext())->gotShardName(shardName); } // @@ -173,7 +175,8 @@ public: return mergeChunks(txn, NamespaceString(ns), minKey, maxKey, epoch, &errmsg); } -}; + +} mergeChunksCmd; BSONField<string> MergeChunksCommand::nsField("mergeChunks"); BSONField<vector<BSONObj>> MergeChunksCommand::boundsField("bounds"); @@ -182,9 +185,5 @@ BSONField<string> MergeChunksCommand::configField("config"); BSONField<string> MergeChunksCommand::shardNameField("shardName"); BSONField<OID> MergeChunksCommand::epochField("epoch"); -MONGO_INITIALIZER(InitMergeChunksCommand)(InitializerContext* context) { - // Leaked intentionally: a Command registers itself when constructed. - new MergeChunksCommand(); - return Status::OK(); -} -} +} // namespace +} // namespace mongo diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 6c45af50775..307f848d3f9 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -1299,8 +1299,10 @@ public: // Get metadata before we check our version, to make sure it doesn't increment // in the meantime. Need to do this in the same lock scope as the block. - if (shardingState.needCollectionMetadata(client, config.ns)) { - collMetadata = shardingState.getCollectionMetadata(config.ns); + if (ShardingState::get(getGlobalServiceContext()) + ->needCollectionMetadata(client, config.ns)) { + collMetadata = + ShardingState::get(getGlobalServiceContext())->getCollectionMetadata(config.ns); } } @@ -1623,7 +1625,7 @@ public: // Fetch result from other shards 1 chunk at a time. It would be better to do // just one big $or query, but then the sorting would not be efficient. - const string shardName = shardingState.getShardName(); + const string shardName = ShardingState::get(getGlobalServiceContext())->getShardName(); const ChunkMap& chunkMap = cm->getChunkMap(); for (ChunkMap::const_iterator it = chunkMap.begin(); it != chunkMap.end(); ++it) { diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index d6475bd8cca..e2db3bee324 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -273,20 +273,22 @@ void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request, dassert(requestMetadata); // Make sure our shard name is set or is the same as what was set previously - if (shardingState.setShardName(requestMetadata->getShardName())) { + if (ShardingState::get(getGlobalServiceContext()) + ->setShardName(requestMetadata->getShardName())) { // // First, we refresh metadata if we need to based on the requested version. // ChunkVersion latestShardVersion; - shardingState.refreshMetadataIfNeeded(_txn, - request.getTargetingNS(), - requestMetadata->getShardVersion(), - &latestShardVersion); + ShardingState::get(getGlobalServiceContext()) + ->refreshMetadataIfNeeded(_txn, + request.getTargetingNS(), + requestMetadata->getShardVersion(), + &latestShardVersion); // Report if we're still changing our metadata // TODO: Better reporting per-collection - if (shardingState.inCriticalMigrateSection()) { + if (ShardingState::get(getGlobalServiceContext())->inCriticalMigrateSection()) { noteInCriticalSection(writeErrors.back()); } @@ -309,12 +311,14 @@ void WriteBatchExecutor::executeBatch(const BatchedCommandRequest& request, if (requestShardVersion.isOlderThan(latestShardVersion) && !requestShardVersion.isWriteCompatibleWith(latestShardVersion)) { - while (shardingState.inCriticalMigrateSection()) { + while ( + ShardingState::get(getGlobalServiceContext())->inCriticalMigrateSection()) { log() << "write request to old shard version " << requestMetadata->getShardVersion().toString() << " waiting for migration commit" << endl; - shardingState.waitTillNotInCriticalSection(10 /* secs */); + ShardingState::get(getGlobalServiceContext()) + ->waitTillNotInCriticalSection(10 /* secs */); } } } @@ -395,7 +399,6 @@ static void buildStaleError(const ChunkVersion& shardVersionRecvd, } static bool checkShardVersion(OperationContext* txn, - ShardingState* shardingState, const BatchedCommandRequest& request, WriteOpResult* result) { const NamespaceString& nss = request.getTargetingNSS(); @@ -406,6 +409,7 @@ static bool checkShardVersion(OperationContext* txn, ? request.getMetadata()->getShardVersion() : ChunkVersion::IGNORED(); + ShardingState* shardingState = ShardingState::get(getGlobalServiceContext()); if (shardingState->enabled()) { CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns()); @@ -445,7 +449,6 @@ static void buildUniqueIndexError(const BSONObj& keyPattern, } static bool checkIndexConstraints(OperationContext* txn, - ShardingState* shardingState, const BatchedCommandRequest& request, WriteOpResult* result) { const NamespaceString& nss = request.getTargetingNSS(); @@ -454,6 +457,7 @@ static bool checkIndexConstraints(OperationContext* txn, if (!request.isUniqueIndexRequest()) return true; + ShardingState* shardingState = ShardingState::get(getGlobalServiceContext()); if (shardingState->enabled()) { CollectionMetadataPtr metadata = shardingState->getCollectionMetadata(nss.ns()); @@ -944,10 +948,10 @@ bool WriteBatchExecutor::ExecInsertsState::_lockAndCheckImpl(WriteOpResult* resu if (!checkIsMasterForDatabase(nss, result)) { return false; } - if (!checkShardVersion(txn, &shardingState, *request, result)) { + if (!checkShardVersion(txn, *request, result)) { return false; } - if (!checkIndexConstraints(txn, &shardingState, *request, result)) { + if (!checkIndexConstraints(txn, *request, result)) { return false; } @@ -1181,8 +1185,9 @@ static void multiUpdate(OperationContext* txn, return; } - if (!checkShardVersion(txn, &shardingState, *updateItem.getRequest(), result)) + if (!checkShardVersion(txn, *updateItem.getRequest(), result)) { return; + } Database* const db = dbHolder().get(txn, nsString.db()); @@ -1324,7 +1329,7 @@ static void multiRemove(OperationContext* txn, } // Check version once we're locked - if (!checkShardVersion(txn, &shardingState, *removeItem.getRequest(), result)) { + if (!checkShardVersion(txn, *removeItem.getRequest(), result)) { // Version error return; } diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index 45d34aff374..8c219052961 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -47,28 +47,23 @@ #include "mongo/db/auth/user_management_commands_parser.h" #include "mongo/db/auth/user_name.h" #include "mongo/db/background.h" -#include "mongo/db/clientcursor.h" #include "mongo/db/catalog/coll_mod.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/catalog/create_collection.h" #include "mongo/db/catalog/drop_collection.h" #include "mongo/db/catalog/drop_database.h" -#include "mongo/db/catalog/database_catalog_entry.h" #include "mongo/db/clientcursor.h" #include "mongo/db/commands.h" #include "mongo/db/commands/server_status.h" #include "mongo/db/commands/shutdown.h" #include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/db.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/dbhelpers.h" -#include "mongo/db/service_context_d.h" -#include "mongo/db/service_context.h" #include "mongo/db/index_builder.h" -#include "mongo/db/index/index_access_method.h" #include "mongo/db/index/index_descriptor.h" +#include "mongo/db/index/index_access_method.h" #include "mongo/db/instance.h" #include "mongo/db/introspect.h" #include "mongo/db/jsobj.h" @@ -676,6 +671,7 @@ public: while (c->more()) PRINT(c->nextSafe()); } + } cmdFileMD5; @@ -1316,7 +1312,7 @@ bool Command::run(OperationContext* txn, // TODO: refactor out of here as part of SERVER-18326 bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - if (isReplSet && shardingState.enabled()) { + if (isReplSet && ShardingState::get(txn)->enabled()) { rpc::ShardingMetadata( repl::ReplClientInfo::forClient(txn->getClient()).getLastOp().getTimestamp(), replCoord->getElectionId()).writeToMetadata(&metadataBob); diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index 6618ef14b9b..d561b18beff 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -388,13 +388,13 @@ long long Helpers::removeRange(OperationContext* txn, // We should never be able to turn off the sharding state once enabled, but // in the future we might want to. - verify(shardingState.enabled()); + verify(ShardingState::get(getGlobalServiceContext())->enabled()); bool docIsOrphan; // In write lock, so will be the most up-to-date version std::shared_ptr<CollectionMetadata> metadataNow = - shardingState.getCollectionMetadata(ns); + ShardingState::get(getGlobalServiceContext())->getCollectionMetadata(ns); if (metadataNow) { ShardKeyPattern kp(metadataNow->getKeyPattern()); BSONObj key = kp.extractShardKeyFromDoc(obj); diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index a327d2c18f3..1b79dadcb50 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -28,6 +28,7 @@ #include "mongo/platform/basic.h" +#include "mongo/db/client.h" #include "mongo/db/operation_context.h" #include "mongo/util/assert_util.h" @@ -37,8 +38,11 @@ namespace mongo { OperationContext::OperationContext(Client* client, unsigned int opId, Locker* locker) : _client(client), _opId(opId), _locker(locker) {} +ServiceContext* OperationContext::getServiceContext() const { + return _client->getServiceContext(); +} + Client* OperationContext::getClient() const { - invariant(_client); return _client; } diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 741279f48e0..6dd13e9d095 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -42,6 +42,7 @@ class Client; class CurOp; class Locker; class ProgressMeter; +class ServiceContext; class StringData; class WriteUnitOfWork; @@ -137,6 +138,11 @@ public: virtual std::string getNS() const = 0; /** + * Returns the service context under which this operation context runs. + */ + ServiceContext* getServiceContext() const; + + /** * Returns the client under which this context runs. */ Client* getClient() const; diff --git a/src/mongo/db/ops/update_lifecycle_impl.cpp b/src/mongo/db/ops/update_lifecycle_impl.cpp index fe0b9710f30..ff8d76bc71d 100644 --- a/src/mongo/db/ops/update_lifecycle_impl.cpp +++ b/src/mongo/db/ops/update_lifecycle_impl.cpp @@ -43,8 +43,8 @@ namespace mongo { namespace { std::shared_ptr<CollectionMetadata> getMetadata(const NamespaceString& nsString) { - if (shardingState.enabled()) { - return shardingState.getCollectionMetadata(nsString.ns()); + if (ShardingState::get(getGlobalServiceContext())->enabled()) { + return ShardingState::get(getGlobalServiceContext())->getCollectionMetadata(nsString.ns()); } return nullptr; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index d71b36c49a6..43ba3dcb2dd 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -40,6 +40,7 @@ #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/query_planner.h" +#include "mongo/db/service_context.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" @@ -65,7 +66,9 @@ public: bool isSharded(const NamespaceString& ns) final { const ChunkVersion unsharded(0, 0, OID()); - return !(shardingState.getVersion(ns.ns()).isWriteCompatibleWith(unsharded)); + return !(ShardingState::get(getGlobalServiceContext()) + ->getVersion(ns.ns()) + .isWriteCompatibleWith(unsharded)); } bool isCapped(const NamespaceString& ns) final { diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index e90e4d18779..75d9b86e9e0 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -566,7 +566,8 @@ std::string runQuery(OperationContext* txn, } // We freak out later if this changes before we're done with the query. - const ChunkVersion shardingVersionAtStart = shardingState.getVersion(nss.ns()); + const ChunkVersion shardingVersionAtStart = + ShardingState::get(getGlobalServiceContext())->getVersion(nss.ns()); // Handle query option $maxTimeMS (not used with commands). curop.setMaxTimeMicros(static_cast<unsigned long long>(pq.getMaxTimeMS()) * 1000); @@ -640,13 +641,16 @@ std::string runQuery(OperationContext* txn, // TODO: Currently, chunk ranges are kept around until all ClientCursors created while the // chunk belonged on this node are gone. Separating chunk lifetime management from // ClientCursor should allow this check to go away. - if (!shardingState.getVersion(nss.ns()).isWriteCompatibleWith(shardingVersionAtStart)) { + if (!ShardingState::get(getGlobalServiceContext()) + ->getVersion(nss.ns()) + .isWriteCompatibleWith(shardingVersionAtStart)) { // if the version changed during the query we might be missing some data and its safe to // send this as mongos can resend at this point - throw SendStaleConfigException(nss.ns(), - "version changed during initial query", - shardingVersionAtStart, - shardingState.getVersion(nss.ns())); + throw SendStaleConfigException( + nss.ns(), + "version changed during initial query", + shardingVersionAtStart, + ShardingState::get(getGlobalServiceContext())->getVersion(nss.ns())); } // Fill out curop based on query results. If we have a cursorid, we will fill out curop with diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index bd32c486c53..00e09a99e62 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -170,7 +170,8 @@ void fillOutPlannerParams(OperationContext* txn, // If the caller wants a shard filter, make sure we're actually sharded. if (plannerParams->options & QueryPlannerParams::INCLUDE_SHARD_FILTER) { std::shared_ptr<CollectionMetadata> collMetadata = - shardingState.getCollectionMetadata(canonicalQuery->ns()); + ShardingState::get(getGlobalServiceContext()) + ->getCollectionMetadata(canonicalQuery->ns()); if (collMetadata) { plannerParams->shardKey = collMetadata->getKeyPattern(); } else { @@ -245,8 +246,10 @@ Status prepareExecution(OperationContext* opCtx, // Might have to filter out orphaned docs. if (plannerParams.options & QueryPlannerParams::INCLUDE_SHARD_FILTER) { - *rootOut = new ShardFilterStage( - shardingState.getCollectionMetadata(collection->ns().ns()), ws, *rootOut); + *rootOut = new ShardFilterStage(ShardingState::get(getGlobalServiceContext()) + ->getCollectionMetadata(collection->ns().ns()), + ws, + *rootOut); } // There might be a projection. The idhack stage will always fetch the full @@ -470,8 +473,10 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutor(OperationContext* txn, // Might have to filter out orphaned docs. if (plannerOptions & QueryPlannerParams::INCLUDE_SHARD_FILTER) { - root = make_unique<ShardFilterStage>( - shardingState.getCollectionMetadata(collection->ns().ns()), ws.get(), root.release()); + root = make_unique<ShardFilterStage>(ShardingState::get(getGlobalServiceContext()) + ->getCollectionMetadata(collection->ns().ns()), + ws.get(), + root.release()); } return PlanExecutor::make(txn, std::move(ws), std::move(root), collection, yieldPolicy); @@ -599,7 +604,8 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorFind(OperationContext* txn, } size_t options = QueryPlannerParams::DEFAULT; - if (shardingState.needCollectionMetadata(txn->getClient(), nss.ns())) { + if (ShardingState::get(getGlobalServiceContext()) + ->needCollectionMetadata(txn->getClient(), nss.ns())) { options |= QueryPlannerParams::INCLUDE_SHARD_FILTER; } return getExecutor( diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h index 9e0b1a32fb3..cfa41b08f83 100644 --- a/src/mongo/db/query/query_planner_params.h +++ b/src/mongo/db/query/query_planner_params.h @@ -58,7 +58,8 @@ struct QueryPlannerParams { // shouldn't be on this shard" stage before projection. // // In order to set this, you must check - // shardingState.needCollectionMetadata(current_namespace) in the same lock that you use + // ShardingState::get(getGlobalServiceContext())->needCollectionMetadata(current_namespace) + // in the same lock that you use // to build the query executor. You must also wrap the PlanExecutor in a ClientCursor // within the same lock. See the comment on ShardFilterStage for details. INCLUDE_SHARD_FILTER = 1 << 2, diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp index 07db4afa5fa..c0c29efa91a 100644 --- a/src/mongo/db/query/stage_builder.cpp +++ b/src/mongo/db/query/stage_builder.cpp @@ -283,8 +283,10 @@ PlanStage* buildStages(OperationContext* txn, if (NULL == childStage) { return NULL; } - return new ShardFilterStage( - shardingState.getCollectionMetadata(collection->ns().ns()), ws, childStage); + return new ShardFilterStage(ShardingState::get(getGlobalServiceContext()) + ->getCollectionMetadata(collection->ns().ns()), + ws, + childStage); } else if (STAGE_KEEP_MUTATIONS == root->getType()) { const KeepMutationsNode* km = static_cast<const KeepMutationsNode*>(root); PlanStage* childStage = buildStages(txn, collection, qsol, km->children[0], ws); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 489bc753b99..438eec81859 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -302,7 +302,7 @@ void ReplicationCoordinatorExternalStateImpl::killAllUserOperations(OperationCon } void ReplicationCoordinatorExternalStateImpl::clearShardingState() { - shardingState.clearCollectionMetadata(); + ShardingState::get(getGlobalServiceContext())->clearCollectionMetadata(); } void ReplicationCoordinatorExternalStateImpl::signalApplierToChooseNewSyncSource() { diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index dbcabce83c1..1ca8eea8c1e 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -58,8 +58,11 @@ using std::shared_ptr; using std::string; using std::vector; -// Global sharding state instance -ShardingState shardingState; +namespace { + +const auto getShardingState = ServiceContext::declareDecoration<ShardingState>(); + +} // namespace bool isMongos() { return false; @@ -71,6 +74,14 @@ ShardingState::ShardingState() ShardingState::~ShardingState() = default; +ShardingState* ShardingState::get(ServiceContext* serviceContext) { + return &getShardingState(serviceContext); +} + +ShardingState* ShardingState::get(OperationContext* operationContext) { + return ShardingState::get(operationContext->getServiceContext()); +} + bool ShardingState::enabled() { stdx::lock_guard<stdx::mutex> lk(_mutex); return _enabled; diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index b4c666ab5bd..2a9d835027f 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -46,6 +46,7 @@ struct ChunkVersion; class Client; class CollectionMetadata; class OperationContext; +class ServiceContext; class Status; /** @@ -58,6 +59,17 @@ public: ShardingState(); ~ShardingState(); + /** + * Retrieves the sharding state object associated with the specified service context. This + * method must only be called if ShardingState decoration has been created on the service + * context, otherwise it will fassert. In other words, it may only be called on MongoD and + * tests, which specifically require and instantiate ShardingState. + * + * Returns the instance's ShardingState. + */ + static ShardingState* get(ServiceContext* serviceContext); + static ShardingState* get(OperationContext* operationContext); + bool enabled(); std::string getConfigServer(); @@ -304,7 +316,4 @@ private: CollectionMetadataMap _collMetadata; }; -// Global sharding state instance -extern ShardingState shardingState; - } // namespace mongo diff --git a/src/mongo/dbtests/config_server_fixture.cpp b/src/mongo/dbtests/config_server_fixture.cpp index 062f08d1bdd..0c6705c19a6 100644 --- a/src/mongo/dbtests/config_server_fixture.cpp +++ b/src/mongo/dbtests/config_server_fixture.cpp @@ -35,6 +35,7 @@ #include <list> #include "mongo/dbtests/dbtests.h" +#include "mongo/db/service_context.h" #include "mongo/db/s/sharding_state.h" #include "mongo/s/catalog/type_chunk.h" #include "mongo/s/catalog/legacy/legacy_dist_lock_manager.h" @@ -75,8 +76,8 @@ void ConfigServerFixture::setUp() { BSON(ChunkType::ns() << 1 << ChunkType::DEPRECATED_lastmod() << 1))); ConnectionString connStr(uassertStatusOK(ConnectionString::parse("$dummy:10000"))); - shardingState.initialize(connStr.toString()); - shardingState.gotShardName(shardName()); + ShardingState::get(getGlobalServiceContext())->initialize(connStr.toString()); + ShardingState::get(getGlobalServiceContext())->gotShardName(shardName()); } void ConfigServerFixture::clearServer() { @@ -108,7 +109,7 @@ void ConfigServerFixture::dumpServer() { } void ConfigServerFixture::tearDown() { - shardingState.clearCollectionMetadata(); + ShardingState::get(getGlobalServiceContext())->clearCollectionMetadata(); clearServer(); // Make all connections redirect to the direct client diff --git a/src/mongo/dbtests/framework.cpp b/src/mongo/dbtests/framework.cpp index 944b39c5056..5691f490c45 100644 --- a/src/mongo/dbtests/framework.cpp +++ b/src/mongo/dbtests/framework.cpp @@ -119,7 +119,7 @@ int runDbTests(int argc, char** argv) { getGlobalServiceContext()->initializeGlobalStorageEngine(); // Initialize the sharding state so we can run starding tests in isolation - shardingState.initialize("$dummy:10000"); + ShardingState::get(getGlobalServiceContext())->initialize("$dummy:10000"); // Note: ShardingState::initialize also initializes the distLockMgr. auto distLockMgr = diff --git a/src/mongo/dbtests/merge_chunk_tests.cpp b/src/mongo/dbtests/merge_chunk_tests.cpp index a7b51248418..c80692f0271 100644 --- a/src/mongo/dbtests/merge_chunk_tests.cpp +++ b/src/mongo/dbtests/merge_chunk_tests.cpp @@ -29,6 +29,7 @@ #include "mongo/platform/basic.h" #include "mongo/db/range_arithmetic.h" +#include "mongo/db/service_context.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/sharding_state.h" #include "mongo/dbtests/config_server_fixture.h" @@ -232,8 +233,9 @@ TEST_F(MergeChunkTests, BasicMerge) { // Get latest version ChunkVersion latestVersion; - shardingState.refreshMetadataNow(&_txn, nss.ns(), &latestVersion); - shardingState.resetMetadata(nss.ns()); + ShardingState::get(getGlobalServiceContext()) + ->refreshMetadataNow(&_txn, nss.ns(), &latestVersion); + ShardingState::get(getGlobalServiceContext())->resetMetadata(nss.ns()); // Do merge string errMsg; @@ -242,7 +244,8 @@ TEST_F(MergeChunkTests, BasicMerge) { ASSERT(result); // Verify result - CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(nss.ns()); + CollectionMetadataPtr metadata = + ShardingState::get(getGlobalServiceContext())->getCollectionMetadata(nss.ns()); ChunkType chunk; ASSERT(metadata->getNextChunk(BSON("x" << 0), &chunk)); @@ -269,8 +272,9 @@ TEST_F(MergeChunkTests, BasicMergeMinMax) { // Get latest version ChunkVersion latestVersion; - shardingState.refreshMetadataNow(&_txn, nss.ns(), &latestVersion); - shardingState.resetMetadata(nss.ns()); + ShardingState::get(getGlobalServiceContext()) + ->refreshMetadataNow(&_txn, nss.ns(), &latestVersion); + ShardingState::get(getGlobalServiceContext())->resetMetadata(nss.ns()); // Do merge string errMsg; @@ -279,7 +283,8 @@ TEST_F(MergeChunkTests, BasicMergeMinMax) { ASSERT(result); // Verify result - CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(nss.ns()); + CollectionMetadataPtr metadata = + ShardingState::get(getGlobalServiceContext())->getCollectionMetadata(nss.ns()); ChunkType chunk; ASSERT(metadata->getNextChunk(BSON("x" << MINKEY), &chunk)); @@ -308,8 +313,9 @@ TEST_F(MergeChunkTests, CompoundMerge) { // Get latest version ChunkVersion latestVersion; - shardingState.refreshMetadataNow(&_txn, nss.ns(), &latestVersion); - shardingState.resetMetadata(nss.ns()); + ShardingState::get(getGlobalServiceContext()) + ->refreshMetadataNow(&_txn, nss.ns(), &latestVersion); + ShardingState::get(getGlobalServiceContext())->resetMetadata(nss.ns()); // Do merge string errMsg; @@ -319,7 +325,8 @@ TEST_F(MergeChunkTests, CompoundMerge) { ASSERT(result); // Verify result - CollectionMetadataPtr metadata = shardingState.getCollectionMetadata(nss.ns()); + CollectionMetadataPtr metadata = + ShardingState::get(getGlobalServiceContext())->getCollectionMetadata(nss.ns()); ChunkType chunk; ASSERT(metadata->getNextChunk(BSON("x" << 0 << "y" << 1), &chunk)); diff --git a/src/mongo/s/d_merge.cpp b/src/mongo/s/d_merge.cpp index 7c40db13bcf..3fb9b5c7b9c 100644 --- a/src/mongo/s/d_merge.cpp +++ b/src/mongo/s/d_merge.cpp @@ -72,7 +72,8 @@ bool mergeChunks(OperationContext* txn, // Get sharding state up-to-date // - ConnectionString configLoc = ConnectionString::parse(shardingState.getConfigServer(), *errMsg); + ConnectionString configLoc = ConnectionString::parse( + ShardingState::get(txn)->getConfigServer(), *errMsg); if (!configLoc.isValid()) { warning() << *errMsg; return false; @@ -95,12 +96,14 @@ bool mergeChunks(OperationContext* txn, return false; } + ShardingState* shardingState = ShardingState::get(txn); + // // We now have the collection lock, refresh metadata to latest version and sanity check // ChunkVersion shardVersion; - Status status = shardingState.refreshMetadataNow(txn, nss.ns(), &shardVersion); + Status status = shardingState->refreshMetadataNow(txn, nss.ns(), &shardVersion); if (!status.isOK()) { *errMsg = str::stream() << "could not merge chunks, failed to refresh metadata for " @@ -120,7 +123,8 @@ bool mergeChunks(OperationContext* txn, return false; } - shared_ptr<CollectionMetadata> metadata = shardingState.getCollectionMetadata(nss.ns()); + shared_ptr<CollectionMetadata> metadata = + shardingState->getCollectionMetadata(nss.ns()); if (!metadata || metadata->getKeyPattern().isEmpty()) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() @@ -155,7 +159,7 @@ bool mergeChunks(OperationContext* txn, itChunk.setMin(minKey); itChunk.setMax(minKey); itChunk.setNS(nss.ns()); - itChunk.setShard(shardingState.getShardName()); + itChunk.setShard(shardingState->getShardName()); while (itChunk.getMax().woCompare(maxKey) < 0 && metadata->getNextChunk(itChunk.getMax(), &itChunk)) { @@ -165,7 +169,8 @@ bool mergeChunks(OperationContext* txn, if (chunksToMerge.empty()) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " range starting at " << minKey << " and ending at " << maxKey - << " does not belong to shard " << shardingState.getShardName(); + << " does not belong to shard " + << shardingState->getShardName(); warning() << *errMsg; return false; @@ -183,7 +188,7 @@ bool mergeChunks(OperationContext* txn, if (!minKeyInRange) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " range starting at " << minKey << " does not belong to shard " - << shardingState.getShardName(); + << shardingState->getShardName(); warning() << *errMsg; return false; @@ -197,7 +202,7 @@ bool mergeChunks(OperationContext* txn, if (!maxKeyInRange) { *errMsg = stream() << "could not merge chunks, collection " << nss.ns() << " range ending at " << maxKey << " does not belong to shard " - << shardingState.getShardName(); + << shardingState->getShardName(); warning() << *errMsg; return false; @@ -255,7 +260,8 @@ bool mergeChunks(OperationContext* txn, ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock writeLk(txn->lockState(), nss.db(), MODE_IX); Lock::CollectionLock collLock(txn->lockState(), nss.ns(), MODE_X); - shardingState.mergeChunks(txn, nss.ns(), minKey, maxKey, mergeVersion); + + shardingState->mergeChunks(txn, nss.ns(), minKey, maxKey, mergeVersion); } // diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index 82f432b8925..b98c02dada8 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -62,7 +62,6 @@ #include "mongo/db/range_deleter_service.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/service_context.h" #include "mongo/db/storage/mmap_v1/dur.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/sharded_connection_info.h" @@ -1122,7 +1121,7 @@ public: // This could be the first call that enables sharding - make sure we initialize the // sharding state for this shard. - if (!shardingState.enabled()) { + if (!ShardingState::get(txn)->enabled()) { if (cmdObj["configdb"].type() != String) { errmsg = "sharding not enabled"; warning() << errmsg; @@ -1130,11 +1129,11 @@ public: } const string configdb = cmdObj["configdb"].String(); - shardingState.initialize(configdb); + ShardingState::get(txn)->initialize(configdb); } // Initialize our current shard name in the shard state if needed - shardingState.gotShardName(fromShardName); + ShardingState::get(txn)->gotShardName(fromShardName); // Make sure we're as up-to-date as possible with shard information // This catches the case where we had to previously changed a shard's host by @@ -1142,7 +1141,7 @@ public: Shard::reloadShardInfo(); ConnectionString configLoc = - ConnectionString::parse(shardingState.getConfigServer(), errmsg); + ConnectionString::parse(ShardingState::get(txn)->getConfigServer(), errmsg); if (!configLoc.isValid()) { warning() << errmsg; return false; @@ -1189,7 +1188,8 @@ public: // Always refresh our metadata remotely ChunkVersion origShardVersion; - Status refreshStatus = shardingState.refreshMetadataNow(txn, ns, &origShardVersion); + Status refreshStatus = + ShardingState::get(txn)->refreshMetadataNow(txn, ns, &origShardVersion); if (!refreshStatus.isOK()) { errmsg = str::stream() << "moveChunk cannot start migrate of chunk " @@ -1228,7 +1228,7 @@ public: // Get collection metadata const std::shared_ptr<CollectionMetadata> origCollMetadata( - shardingState.getCollectionMetadata(ns)); + ShardingState::get(txn)->getCollectionMetadata(ns)); // With nonzero shard version, we must have metadata invariant(NULL != origCollMetadata); @@ -1312,7 +1312,8 @@ public: recvChunkStartBuilder.append("min", min); recvChunkStartBuilder.append("max", max); recvChunkStartBuilder.append("shardKeyPattern", shardKeyPattern); - recvChunkStartBuilder.append("configServer", shardingState.getConfigServer()); + recvChunkStartBuilder.append("configServer", + ShardingState::get(txn)->getConfigServer()); recvChunkStartBuilder.append("secondaryThrottle", isSecondaryThrottle); // Follow the same convention in moveChunk. @@ -1466,12 +1467,12 @@ public: ScopedTransaction transaction(txn, MODE_IX); Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); - verify(myVersion > shardingState.getVersion(ns)); + verify(myVersion > ShardingState::get(txn)->getVersion(ns)); // bump the metadata's version up and "forget" about the chunk being moved // this is not the commit point but in practice the state in this shard won't // until the commit it done - shardingState.donateChunk(txn, ns, min, max, myVersion); + ShardingState::get(txn)->donateChunk(txn, ns, min, max, myVersion); } log() << "moveChunk setting version to: " << myVersion << migrateLog; @@ -1508,7 +1509,7 @@ public: // revert the chunk manager back to the state before "forgetting" about the // chunk - shardingState.undoDonateChunk(txn, ns, origCollMetadata); + ShardingState::get(txn)->undoDonateChunk(txn, ns, origCollMetadata); } log() << "Shard version successfully reset to clean up failed migration"; @@ -1562,7 +1563,7 @@ public: // well. we can figure that out by grabbing the metadata installed on 5.a const std::shared_ptr<CollectionMetadata> bumpedCollMetadata( - shardingState.getCollectionMetadata(ns)); + ShardingState::get(txn)->getCollectionMetadata(ns)); if (bumpedCollMetadata->getNumChunks() > 0) { // get another chunk on that shard ChunkType bumpChunk; @@ -1634,7 +1635,7 @@ public: if (MONGO_FAIL_POINT(failMigrationApplyOps)) { throw SocketException(SocketException::RECV_ERROR, - shardingState.getConfigServer()); + ShardingState::get(txn)->getConfigServer()); } } catch (const DBException& ex) { warning() << ex << migrateLog; @@ -1658,7 +1659,7 @@ public: // Revert the metadata back to the state before "forgetting" // about the chunk. - shardingState.undoDonateChunk(txn, ns, origCollMetadata); + ShardingState::get(txn)->undoDonateChunk(txn, ns, origCollMetadata); } log() << "Shard version successfully reset to clean up failed migration"; @@ -1890,7 +1891,7 @@ public: Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); string errMsg; - if (!shardingState.forgetPending(txn, ns, min, max, epoch, &errMsg)) { + if (!ShardingState::get(txn)->forgetPending(txn, ns, min, max, epoch, &errMsg)) { warning() << errMsg; } } @@ -2065,7 +2066,7 @@ public: Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); - if (!shardingState.notePending(txn, ns, min, max, epoch, &errmsg)) { + if (!ShardingState::get(txn)->notePending(txn, ns, min, max, epoch, &errmsg)) { warning() << errmsg; setState(FAIL); return; @@ -2682,10 +2683,10 @@ public: return false; } - if (!shardingState.enabled()) { + if (!ShardingState::get(txn)->enabled()) { if (!cmdObj["configServer"].eoo()) { dassert(cmdObj["configServer"].type() == String); - shardingState.initialize(cmdObj["configServer"].String()); + ShardingState::get(txn)->initialize(cmdObj["configServer"].String()); } else { errmsg = str::stream() << "cannot start recv'ing chunk, " @@ -2698,7 +2699,7 @@ public: if (!cmdObj["toShardName"].eoo()) { dassert(cmdObj["toShardName"].type() == String); - shardingState.gotShardName(cmdObj["toShardName"].String()); + ShardingState::get(txn)->gotShardName(cmdObj["toShardName"].String()); } string ns = cmdObj.firstElement().String(); @@ -2710,7 +2711,7 @@ public: // We force the remote refresh here to make the behavior consistent and predictable, // generally we'd refresh anyway, and to be paranoid. ChunkVersion currentVersion; - Status status = shardingState.refreshMetadataNow(txn, ns, ¤tVersion); + Status status = ShardingState::get(txn)->refreshMetadataNow(txn, ns, ¤tVersion); if (!status.isOK()) { errmsg = str::stream() << "cannot start recv'ing chunk " diff --git a/src/mongo/s/d_split.cpp b/src/mongo/s/d_split.cpp index 977fe575e55..d0350b51072 100644 --- a/src/mongo/s/d_split.cpp +++ b/src/mongo/s/d_split.cpp @@ -614,9 +614,11 @@ public: // Get sharding state up-to-date // + ShardingState* shardingState = ShardingState::get(txn); + // This could be the first call that enables sharding - make sure we initialize the // sharding state for this shard. - if (!shardingState.enabled()) { + if (!shardingState->enabled()) { if (cmdObj["configdb"].type() != String) { errmsg = "sharding not enabled"; warning() << errmsg << endl; @@ -624,16 +626,15 @@ public: } const string configdb = cmdObj["configdb"].String(); - shardingState.initialize(configdb); + shardingState->initialize(configdb); } // Initialize our current shard name in the shard state if needed - shardingState.gotShardName(shardName); + shardingState->gotShardName(shardName); - ConnectionString configLoc = - ConnectionString::parse(shardingState.getConfigServer(), errmsg); - if (!configLoc.isValid()) { - warning() << errmsg; + auto configLocStatus = ConnectionString::parse(shardingState->getConfigServer()); + if (!configLocStatus.isOK()) { + warning() << configLocStatus.getStatus(); return false; } @@ -657,7 +658,8 @@ public: // Always check our version remotely ChunkVersion shardVersion; - Status refreshStatus = shardingState.refreshMetadataNow(txn, ns, &shardVersion); + Status refreshStatus = ShardingState::get(getGlobalServiceContext()) + ->refreshMetadataNow(txn, ns, &shardVersion); if (!refreshStatus.isOK()) { errmsg = str::stream() << "splitChunk cannot split chunk " @@ -696,7 +698,7 @@ public: // Get collection metadata const std::shared_ptr<CollectionMetadata> collMetadata( - shardingState.getCollectionMetadata(ns)); + ShardingState::get(getGlobalServiceContext())->getCollectionMetadata(ns)); // With nonzero shard version, we must have metadata invariant(NULL != collMetadata); @@ -832,12 +834,14 @@ public: // other chunk version, so it's also implicitly the newCollVersion ChunkVersion newShardVersion = collVersion; - // Increment the minor version once, shardingState.splitChunk increments once + // Increment the minor version once, + // ShardingState::get(getGlobalServiceContext())->splitChunk increments once // per split point (resulting in the correct final shard/collection version) // TODO: Revisit this interface, it's a bit clunky newShardVersion.incMinor(); - shardingState.splitChunk(txn, ns, min, max, splitKeys, newShardVersion); + ShardingState::get(getGlobalServiceContext()) + ->splitChunk(txn, ns, min, max, splitKeys, newShardVersion); } // diff --git a/src/mongo/s/d_state.cpp b/src/mongo/s/d_state.cpp index 81422a68d7c..ecf692f1617 100644 --- a/src/mongo/s/d_state.cpp +++ b/src/mongo/s/d_state.cpp @@ -91,10 +91,10 @@ public: bool haveLocalShardingInfo(Client* client, const string& ns) { - if (!shardingState.enabled()) + if (!ShardingState::get(client->getServiceContext())->enabled()) return false; - if (!shardingState.hasVersion(ns)) + if (!ShardingState::get(client->getServiceContext())->hasVersion(ns)) return false; return ShardedConnectionInfo::get(client, false) != NULL; @@ -170,15 +170,16 @@ public: return false; } - if (shardingState.enabled()) { - if (configdb == shardingState.getConfigServer()) + if (ShardingState::get(txn)->enabled()) { + if (configdb == ShardingState::get(txn)->getConfigServer()) return true; result.append("configdb", - BSON("stored" << shardingState.getConfigServer() << "given" << configdb)); + BSON("stored" << ShardingState::get(txn)->getConfigServer() << "given" + << configdb)); errmsg = str::stream() << "mongos specified a different config database string : " - << "stored : " << shardingState.getConfigServer() + << "stored : " << ShardingState::get(txn)->getConfigServer() << " vs given : " << configdb; return false; } @@ -190,7 +191,7 @@ public: } if (locked) { - shardingState.initialize(configdb); + ShardingState::get(txn)->initialize(configdb); return true; } @@ -240,7 +241,8 @@ public: if (cmdObj["shard"].type() == String) { // The shard host is also sent when using setShardVersion, report this host if there // is an error. - shardingState.gotShardNameAndHost(cmdObj["shard"].String(), cmdObj["shardHost"].str()); + ShardingState::get(txn) + ->gotShardNameAndHost(cmdObj["shard"].String(), cmdObj["shardHost"].str()); } // Handle initial shard connection @@ -280,7 +282,7 @@ public: // step 3 const ChunkVersion oldVersion = info->getVersion(ns); - const ChunkVersion globalVersion = shardingState.getVersion(ns); + const ChunkVersion globalVersion = ShardingState::get(txn)->getVersion(ns); oldVersion.addToBSON(result, "oldVersion"); @@ -332,9 +334,9 @@ public: // TODO: Refactor all of this if (version < globalVersion && version.hasEqualEpoch(globalVersion)) { - while (shardingState.inCriticalMigrateSection()) { + while (ShardingState::get(txn)->inCriticalMigrateSection()) { log() << "waiting till out of critical section"; - shardingState.waitTillNotInCriticalSection(10); + ShardingState::get(txn)->waitTillNotInCriticalSection(10); } errmsg = str::stream() << "shard global version for collection is higher " << "than trying to set to '" << ns << "'"; @@ -348,9 +350,9 @@ public: if (!globalVersion.isSet() && !authoritative) { // Needed b/c when the last chunk is moved off a shard, // the version gets reset to zero, which should require a reload. - while (shardingState.inCriticalMigrateSection()) { + while (ShardingState::get(txn)->inCriticalMigrateSection()) { log() << "waiting till out of critical section"; - shardingState.waitTillNotInCriticalSection(10); + ShardingState::get(txn)->waitTillNotInCriticalSection(10); } // need authoritative for first look @@ -364,7 +366,8 @@ public: } ChunkVersion currVersion; - Status status = shardingState.refreshMetadataIfNeeded(txn, ns, version, &currVersion); + Status status = + ShardingState::get(txn)->refreshMetadataIfNeeded(txn, ns, version, &currVersion); if (!status.isOK()) { // The reload itself was interrupted or confused here @@ -459,13 +462,13 @@ public: return false; } - if (shardingState.enabled()) { - result.append("configServer", shardingState.getConfigServer()); + if (ShardingState::get(txn)->enabled()) { + result.append("configServer", ShardingState::get(txn)->getConfigServer()); } else { result.append("configServer", ""); } - result.appendTimestamp("global", shardingState.getVersion(ns).toLong()); + result.appendTimestamp("global", ShardingState::get(txn)->getVersion(ns).toLong()); ShardedConnectionInfo* const info = ShardedConnectionInfo::get(txn->getClient(), false); result.appendBool("inShardedMode", info != NULL); @@ -476,7 +479,8 @@ public: } if (cmdObj["fullMetadata"].trueValue()) { - shared_ptr<CollectionMetadata> metadata = shardingState.getCollectionMetadata(ns); + shared_ptr<CollectionMetadata> metadata = + ShardingState::get(txn)->getCollectionMetadata(ns); if (metadata) { result.append("metadata", metadata->toBSON()); } else { @@ -515,7 +519,7 @@ public: Lock::DBLock dbXLock(txn->lockState(), dbname, MODE_X); OldClientContext ctx(txn, dbname); - shardingState.appendInfo(result); + ShardingState::get(txn)->appendInfo(result); return true; } @@ -530,8 +534,10 @@ static bool shardVersionOk(Client* client, string& errmsg, ChunkVersion& received, ChunkVersion& wanted) { - if (!shardingState.enabled()) + ShardingState* shardingState = ShardingState::get(client->getServiceContext()); + if (!shardingState->enabled()) { return true; + } if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nsToDatabase(ns))) { // right now connections to secondaries aren't versioned at all @@ -559,7 +565,7 @@ static bool shardVersionOk(Client* client, return true; } - wanted = shardingState.getVersion(ns); + wanted = shardingState->getVersion(ns); if (received.isWriteCompatibleWith(wanted)) return true; @@ -602,9 +608,7 @@ static bool shardVersionOk(Client* client, } // Those are all the reasons the versions can mismatch - verify(false); - - return false; + MONGO_UNREACHABLE; } void ensureShardVersionOKOrThrow(Client* client, const std::string& ns) { |