diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/commands/find_and_modify.cpp | 51 | ||||
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/db_raii.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/db_raii.h | 7 | ||||
-rw-r--r-- | src/mongo/db/exec/idhack.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_cursor.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/range_deleter_db_env.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 1 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.cpp | 106 | ||||
-rw-r--r-- | src/mongo/db/s/collection_sharding_state.h | 33 | ||||
-rw-r--r-- | src/mongo/db/s/migration_source_manager.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 117 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.h | 2 |
14 files changed, 241 insertions, 120 deletions
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 20fd3df1466..0657d640e77 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -61,8 +61,8 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/write_concern.h" -#include "mongo/s/d_state.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -257,19 +257,18 @@ public: // Explain calls of the findAndModify command are read-only, but we take write // locks so that the timing information is more accurate. - AutoGetDb autoDb(txn, dbName, MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - - ensureShardVersionOKOrThrow(txn, nsString.ns()); - - Collection* collection = nullptr; - if (autoDb.getDb()) { - collection = autoDb.getDb()->getCollection(nsString.ns()); - } else { + AutoGetCollection autoColl(txn, nsString, MODE_IX); + if (!autoColl.getDb()) { return {ErrorCodes::NamespaceNotFound, str::stream() << "database " << dbName << " does not exist."}; } + auto css = CollectionShardingState::get(txn, nsString); + if (css) { + css->checkShardVersionOrThrow(txn); + } + + Collection* const collection = autoColl.getCollection(); auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete); if (!statusWithPlanExecutor.isOK()) { return statusWithPlanExecutor.getStatus(); @@ -293,19 +292,18 @@ public: // Explain calls of the findAndModify command are read-only, but we take write // locks so that the timing information is more accurate. - AutoGetDb autoDb(txn, dbName, MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - - ensureShardVersionOKOrThrow(txn, nsString.ns()); - - Collection* collection = nullptr; - if (autoDb.getDb()) { - collection = autoDb.getDb()->getCollection(nsString.ns()); - } else { + AutoGetCollection autoColl(txn, nsString, MODE_IX); + if (!autoColl.getDb()) { return {ErrorCodes::NamespaceNotFound, str::stream() << "database " << dbName << " does not exist."}; } + auto css = CollectionShardingState::get(txn, nsString); + if (css) { + css->checkShardVersionOrThrow(txn); + } + + Collection* collection = autoColl.getCollection(); auto statusWithPlanExecutor = getExecutorUpdate(txn, collection, &parsedUpdate, opDebug); if (!statusWithPlanExecutor.isOK()) { @@ -376,7 +374,6 @@ public: AutoGetOrCreateDb autoDb(txn, dbName, MODE_IX); Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - Collection* collection = autoDb.getDb()->getCollection(nsString.ns()); // Attach the namespace and database profiling level to the current op. { @@ -385,13 +382,17 @@ public: ->enter_inlock(nsString.ns().c_str(), autoDb.getDb()->getProfilingLevel()); } - ensureShardVersionOKOrThrow(txn, nsString.ns()); + auto css = CollectionShardingState::get(txn, nsString); + if (css) { + css->checkShardVersionOrThrow(txn); + } Status isPrimary = checkCanAcceptWritesForDatabase(nsString); if (!isPrimary.isOK()) { return appendCommandStatus(result, isPrimary); } + Collection* const collection = autoDb.getDb()->getCollection(nsString.ns()); auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete); if (!statusWithPlanExecutor.isOK()) { return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); @@ -435,7 +436,6 @@ public: AutoGetOrCreateDb autoDb(txn, dbName, MODE_IX); Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - Collection* collection = autoDb.getDb()->getCollection(nsString.ns()); // Attach the namespace and database profiling level to the current op. { @@ -444,13 +444,18 @@ public: ->enter_inlock(nsString.ns().c_str(), autoDb.getDb()->getProfilingLevel()); } - ensureShardVersionOKOrThrow(txn, nsString.ns()); + auto css = CollectionShardingState::get(txn, nsString); + if (css) { + css->checkShardVersionOrThrow(txn); + } Status isPrimary = checkCanAcceptWritesForDatabase(nsString); if (!isPrimary.isOK()) { return appendCommandStatus(result, isPrimary); } + Collection* collection = autoDb.getDb()->getCollection(nsString.ns()); + // Create the collection if it does not exist when performing an upsert // because the update stage does not create its own collection. if (!collection && args.isUpsert()) { diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 593ae3dddd4..33291845269 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -70,7 +70,6 @@ #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/config.h" -#include "mongo/s/d_state.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/s/stale_exception.h" diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index b0da72c5bb8..162017fc0f4 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -36,8 +36,8 @@ #include "mongo/db/client.h" #include "mongo/db/curop.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/stats/top.h" -#include "mongo/s/d_state.h" namespace mongo { @@ -46,9 +46,15 @@ AutoGetDb::AutoGetDb(OperationContext* txn, StringData ns, LockMode mode) AutoGetCollection::AutoGetCollection(OperationContext* txn, const NamespaceString& nss, - LockMode mode) - : _autoDb(txn, nss.db(), mode), - _collLock(txn->lockState(), nss.ns(), mode), + LockMode modeAll) + : AutoGetCollection(txn, nss, modeAll, modeAll) {} + +AutoGetCollection::AutoGetCollection(OperationContext* txn, + const NamespaceString& nss, + LockMode modeDB, + LockMode modeColl) + : _autoDb(txn, nss.db(), modeDB), + _collLock(txn->lockState(), nss.ns(), modeColl), _coll(_autoDb.getDb() ? _autoDb.getDb()->getCollection(nss) : nullptr) {} AutoGetOrCreateDb::AutoGetOrCreateDb(OperationContext* txn, StringData ns, LockMode mode) @@ -95,7 +101,10 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* txn, // We have both the DB and collection locked, which is the prerequisite to do a stable shard // version check, but we'd like to do the check after we have a satisfactory snapshot. - ensureShardVersionOKOrThrow(_txn, nss.ns()); + auto css = CollectionShardingState::get(txn, nss); + if (css) { + css->checkShardVersionOrThrow(txn); + } } AutoGetCollectionForRead::~AutoGetCollectionForRead() { @@ -186,7 +195,10 @@ void OldClientContext::_checkNotStale() const { case dbDelete: // here as well. break; default: - ensureShardVersionOKOrThrow(_txn, _ns); + auto css = CollectionShardingState::get(_txn, _ns); + if (css) { + css->checkShardVersionOrThrow(_txn); + } } } diff --git a/src/mongo/db/db_raii.h b/src/mongo/db/db_raii.h index 948cf014c90..654996b676c 100644 --- a/src/mongo/db/db_raii.h +++ b/src/mongo/db/db_raii.h @@ -74,7 +74,12 @@ class AutoGetCollection { MONGO_DISALLOW_COPYING(AutoGetCollection); public: - AutoGetCollection(OperationContext* txn, const NamespaceString& nss, LockMode mode); + AutoGetCollection(OperationContext* txn, const NamespaceString& nss, LockMode modeAll); + + AutoGetCollection(OperationContext* txn, + const NamespaceString& nss, + LockMode modeDB, + LockMode modeColl); Database* getDb() const { return _autoDb.getDb(); diff --git a/src/mongo/db/exec/idhack.cpp b/src/mongo/db/exec/idhack.cpp index affe2e4dc50..759ca91153f 100644 --- a/src/mongo/db/exec/idhack.cpp +++ b/src/mongo/db/exec/idhack.cpp @@ -38,7 +38,6 @@ #include "mongo/db/exec/working_set_computed_data.h" #include "mongo/db/index/btree_access_method.h" #include "mongo/db/storage/record_fetcher.h" -#include "mongo/s/d_state.h" #include "mongo/stdx/memory.h" namespace mongo { diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index c91c7fb2110..803799d8774 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -30,7 +30,6 @@ #include "mongo/db/pipeline/document_source.h" - #include "mongo/db/catalog/database_holder.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/working_set_common.h" @@ -39,7 +38,6 @@ #include "mongo/db/query/explain.h" #include "mongo/db/query/find_common.h" #include "mongo/db/storage/storage_options.h" -#include "mongo/s/d_state.h" namespace mongo { diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index faea115ca02..ade024378e6 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -75,6 +75,7 @@ #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/storage/storage_options.h" #include "mongo/db/storage/oplog_hack.h" @@ -85,7 +86,6 @@ namespace mongo { using std::unique_ptr; -using std::endl; using std::string; using std::vector; using stdx::make_unique; @@ -174,7 +174,7 @@ void fillOutPlannerParams(OperationContext* txn, // If the caller wants a shard filter, make sure we're actually sharded. if (plannerParams->options & QueryPlannerParams::INCLUDE_SHARD_FILTER) { std::shared_ptr<CollectionMetadata> collMetadata = - ShardingState::get(txn)->getCollectionMetadata(canonicalQuery->ns()); + CollectionShardingState::get(txn, canonicalQuery->nss())->getMetadata(); if (collMetadata) { plannerParams->shardKey = collMetadata->getKeyPattern(); } else { @@ -259,7 +259,7 @@ Status prepareExecution(OperationContext* opCtx, if (plannerParams.options & QueryPlannerParams::INCLUDE_SHARD_FILTER) { *rootOut = new ShardFilterStage( opCtx, - ShardingState::get(opCtx)->getCollectionMetadata(collection->ns().ns()), + CollectionShardingState::get(opCtx, canonicalQuery->nss())->getMetadata(), ws, *rootOut); } @@ -648,7 +648,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorDelete(OperationContext* txn, 12050, "cannot delete from system namespace", legalClientSystemNS(nss.ns(), true)); } if (nss.ns().find('$') != string::npos) { - log() << "cannot delete from collection with reserved $ in name: " << nss << endl; + log() << "cannot delete from collection with reserved $ in name: " << nss; uasserted(10100, "cannot delete from collection with reserved $ in name"); } } diff --git a/src/mongo/db/range_deleter_db_env.cpp b/src/mongo/db/range_deleter_db_env.cpp index 0a4a6734913..4132dfde863 100644 --- a/src/mongo/db/range_deleter_db_env.cpp +++ b/src/mongo/db/range_deleter_db_env.cpp @@ -42,7 +42,6 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/d_state.h" #include "mongo/util/log.h" namespace mongo { diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index d371b7d6040..dcada5fd460 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -84,7 +84,6 @@ #include "mongo/db/storage/storage_options.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/platform/random.h" -#include "mongo/s/d_state.h" #include "mongo/scripting/engine.h" #include "mongo/stdx/memory.h" #include "mongo/util/elapsed_tracker.h" diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 9eec8de16e4..e0fa35d4da2 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -32,15 +32,21 @@ #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/client.h" #include "mongo/db/concurrency/lock_state.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/stale_exception.h" namespace mongo { +using std::string; + CollectionShardingState::CollectionShardingState( NamespaceString nss, std::unique_ptr<CollectionMetadata> initialMetadata) : _nss(std::move(nss)), _metadata(std::move(initialMetadata)) {} @@ -62,10 +68,22 @@ CollectionShardingState* CollectionShardingState::get(OperationContext* txn, } void CollectionShardingState::setMetadata(std::shared_ptr<CollectionMetadata> newMetadata) { - invariant(newMetadata); _metadata = std::move(newMetadata); } +void CollectionShardingState::checkShardVersionOrThrow(OperationContext* txn) const { + string errmsg; + ChunkVersion received; + ChunkVersion wanted; + if (!_checkShardVersionOk(txn, &errmsg, &received, &wanted)) { + throw SendStaleConfigException(_nss.ns(), + str::stream() << "[" << _nss.ns() + << "] shard version not ok: " << errmsg, + received, + wanted); + } +} + bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn, const BSONObj& doc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); @@ -76,6 +94,8 @@ bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn, void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& insertedDoc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + checkShardVersionOrThrow(txn); + ShardingState::get(txn)->migrationSourceManager()->logInsertOp( txn, _nss.ns().c_str(), insertedDoc); } @@ -83,6 +103,8 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + checkShardVersionOrThrow(txn); + ShardingState::get(txn)->migrationSourceManager()->logUpdateOp( txn, _nss.ns().c_str(), updatedDoc); } @@ -90,8 +112,90 @@ void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& u void CollectionShardingState::onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + checkShardVersionOrThrow(txn); + ShardingState::get(txn)->migrationSourceManager()->logDeleteOp( txn, _nss.ns().c_str(), deletedDocId); } +bool CollectionShardingState::_checkShardVersionOk(OperationContext* txn, + string* errmsg, + ChunkVersion* expectedShardVersion, + ChunkVersion* actualShardVersion) const { + Client* client = txn->getClient(); + + // Operations using the DBDirectClient are unversioned. + if (client->isInDirectClient()) { + return true; + } + + if (!repl::ReplicationCoordinator::get(txn)->canAcceptWritesForDatabase(_nss.db())) { + // right now connections to secondaries aren't versioned at all + return true; + } + + const auto& oss = OperationShardingState::get(txn); + + // If there is a version attached to the OperationContext, use it as the received version. + // Otherwise, get the received version from the ShardedConnectionInfo. + if (oss.hasShardVersion()) { + *expectedShardVersion = oss.getShardVersion(_nss); + } else { + ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, false); + if (!info) { + // There is no shard version information on either 'txn' or 'client'. This means that + // the operation represented by 'txn' is unversioned, and the shard version is always OK + // for unversioned operations. + return true; + } + + *expectedShardVersion = info->getVersion(_nss.ns()); + } + + if (ChunkVersion::isIgnoredVersion(*expectedShardVersion)) { + return true; + } + + *actualShardVersion = (_metadata ? _metadata->getShardVersion() : ChunkVersion::UNSHARDED()); + + if (expectedShardVersion->isWriteCompatibleWith(*actualShardVersion)) { + return true; + } + + // + // Figure out exactly why not compatible, send appropriate error message + // The versions themselves are returned in the error, so not needed in messages here + // + + // Check epoch first, to send more meaningful message, since other parameters probably won't + // match either. + if (actualShardVersion->epoch() != expectedShardVersion->epoch()) { + *errmsg = str::stream() << "version epoch mismatch detected for " << _nss.ns() << ", " + << "the collection may have been dropped and recreated"; + return false; + } + + if (!actualShardVersion->isSet() && expectedShardVersion->isSet()) { + *errmsg = str::stream() << "this shard no longer contains chunks for " << _nss.ns() << ", " + << "the collection may have been dropped"; + return false; + } + + if (actualShardVersion->isSet() && !expectedShardVersion->isSet()) { + *errmsg = str::stream() << "this shard contains versioned chunks for " << _nss.ns() << ", " + << "but no version set in request"; + return false; + } + + if (actualShardVersion->majorVersion() != expectedShardVersion->majorVersion()) { + // Could be > or < - wanted is > if this is the source of a migration, wanted < if this is + // the target of a migration + *errmsg = str::stream() << "version mismatch detected for " << _nss.ns(); + return false; + } + + // Those are all the reasons the versions can mismatch + MONGO_UNREACHABLE; +} + } // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 73f076fb37b..f9f3255597b 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -37,6 +37,7 @@ namespace mongo { class BSONObj; +struct ChunkVersion; class CollectionMetadata; class OperationContext; @@ -81,6 +82,17 @@ public: */ void setMetadata(std::shared_ptr<CollectionMetadata> newMetadata); + /** + * Checks whether the shard version in the context is compatible with the shard version of the + * collection locally and if not throws SendStaleConfigException populated with the expected and + * actual versions. + * + * Because SendStaleConfigException has special semantics in terms of how a sharded command's + * response is constructed, this function should be the only means of checking for shard version + * match. + */ + void checkShardVersionOrThrow(OperationContext* txn) const; + // Replication subsystem hooks. If this collection is serving as a source for migration, these // methods inform it of any changes to its contents. @@ -93,10 +105,29 @@ public: void onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId); private: + /** + * Checks whether the shard version of the operation matches that of the collection. + * + * txn - Operation context from which to retrieve the operation's expected version. + * errmsg (out) - On false return contains an explanatory error message. + * expectedShardVersion (out) - On false return contains the expected collection version on this + * shard. Obtained from the operation sharding state. + * actualShardVersion (out) - On false return contains the actual collection version on this + * shard. Obtained from the collection sharding state. + * + * Returns true if the expected collection version on the shard matches its actual version on + * the shard and false otherwise. Upon false return, the output parameters will be set. + */ + bool _checkShardVersionOk(OperationContext* txn, + std::string* errmsg, + ChunkVersion* expectedShardVersion, + ChunkVersion* actualShardVersion) const; + // Namespace to which this state belongs. const NamespaceString _nss; - // Contains all the chunks associated with this collection. This value is always non-null. + // Contains all the chunks associated with this collection. This value will be null if the + // collection is not sharded. std::shared_ptr<CollectionMetadata> _metadata; }; diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 58d12719867..c9dacc78491 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -47,7 +47,6 @@ #include "mongo/db/record_id.h" #include "mongo/logger/ramlog.h" #include "mongo/s/chunk.h" -#include "mongo/s/d_state.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/util/elapsed_tracker.h" #include "mongo/util/log.h" @@ -224,8 +223,6 @@ void MigrationSourceManager::done(OperationContext* txn) { void MigrationSourceManager::logInsertOp(OperationContext* txn, const char* ns, const BSONObj& obj) { - ensureShardVersionOKOrThrow(txn, ns); - dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. if (!_sessionId || (_nss != ns)) @@ -249,8 +246,6 @@ void MigrationSourceManager::logInsertOp(OperationContext* txn, void MigrationSourceManager::logUpdateOp(OperationContext* txn, const char* ns, const BSONObj& updatedDoc) { - ensureShardVersionOKOrThrow(txn, ns); - dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. if (!_sessionId || (_nss != ns)) @@ -274,8 +269,6 @@ void MigrationSourceManager::logUpdateOp(OperationContext* txn, void MigrationSourceManager::logDeleteOp(OperationContext* txn, const char* ns, const BSONObj& obj) { - ensureShardVersionOKOrThrow(txn, ns); - dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. BSONElement idElement = obj["_id"]; diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index a125a8b155d..775104d4e6b 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -36,7 +36,7 @@ #include "mongo/client/connection_string.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/client.h" -#include "mongo/db/concurrency/lock_state.h" +#include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" @@ -218,7 +218,10 @@ CollectionShardingState* ShardingState::getNS(const std::string& ns) { stdx::lock_guard<stdx::mutex> lk(_mutex); CollectionShardingStateMap::iterator it = _collections.find(ns); if (it == _collections.end()) { - return nullptr; + auto inserted = _collections.insert(make_pair( + ns, stdx::make_unique<CollectionShardingState>(NamespaceString(ns), nullptr))); + invariant(inserted.second); + it = std::move(inserted.first); } return it->second.get(); @@ -229,21 +232,22 @@ void ShardingState::clearCollectionMetadata() { _collections.clear(); } -// TODO we shouldn't need three ways for checking the version. Fix this. -bool ShardingState::hasVersion(const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return !!_collections.count(ns); -} - ChunkVersion ShardingState::getVersion(const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - CollectionShardingStateMap::const_iterator it = _collections.find(ns); - if (it != _collections.end()) { - shared_ptr<CollectionMetadata> p = it->second->getMetadata(); + shared_ptr<CollectionMetadata> p; + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + CollectionShardingStateMap::const_iterator it = _collections.find(ns); + if (it != _collections.end()) { + p = it->second->getMetadata(); + } + } + + if (p) { return p->getShardVersion(); - } else { - return ChunkVersion(0, 0, OID()); } + + return ChunkVersion::UNSHARDED(); } void ShardingState::donateChunk(OperationContext* txn, @@ -618,24 +622,16 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, const ChunkVersion& reqShardVersion, bool useRequestedVersion, ChunkVersion* latestShardVersion) { + invariant(!txn->lockState()->isLocked()); + Status status = _waitForInitialization(txn); if (!status.isOK()) return status; - // The idea here is that we're going to reload the metadata from the config server, but - // we need to do so outside any locks. When we get our result back, if the current metadata - // has changed, we may not be able to install the new metadata. - - // - // Get the initial metadata - // No DBLock is needed since the metadata is expected to change during reload. - // - - shared_ptr<CollectionMetadata> beforeMetadata; - + // We can't reload if a shard name has not yet been set { stdx::lock_guard<stdx::mutex> lk(_mutex); - // We also can't reload if a shard name has not yet been set. + if (_shardName.empty()) { string errMsg = str::stream() << "cannot refresh metadata for " << ns << " before shard name has been set"; @@ -643,15 +639,24 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, warning() << errMsg; return Status(ErrorCodes::NotYetInitialized, errMsg); } + } - CollectionShardingStateMap::iterator it = _collections.find(ns); - if (it != _collections.end()) { - beforeMetadata = it->second->getMetadata(); - } + const NamespaceString nss(ns); + + // The idea here is that we're going to reload the metadata from the config server, but we need + // to do so outside any locks. When we get our result back, if the current metadata has + // changed, we may not be able to install the new metadata. + shared_ptr<CollectionMetadata> beforeMetadata; + { + ScopedTransaction transaction(txn, MODE_IS); + AutoGetCollection autoColl(txn, nss, MODE_IS); + + beforeMetadata = CollectionShardingState::get(txn, nss)->getMetadata(); } ChunkVersion beforeShardVersion; ChunkVersion beforeCollVersion; + if (beforeMetadata) { beforeShardVersion = beforeMetadata->getShardVersion(); beforeCollVersion = beforeMetadata->getCollVersion(); @@ -696,7 +701,7 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, grid.catalogManager(txn), ns, getShardName(), - fullReload ? NULL : beforeMetadata.get(), + fullReload ? nullptr : beforeMetadata.get(), remoteMetadata.get()); refreshMillis = refreshTimer.millis(); @@ -739,29 +744,11 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, // Exclusive collection lock needed since we're now potentially changing the metadata, // and don't want reads/writes to be ongoing. ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); + AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X); - // // Get the metadata now that the load has completed - // - - // Don't reload if our config server has changed or sharding is no longer enabled - if (!enabled()) { - string errMsg = str::stream() << "could not refresh metadata for " << ns - << ", sharding is no longer enabled"; - - warning() << errMsg; - return Status(ErrorCodes::NotYetInitialized, errMsg); - } - - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionShardingStateMap::iterator it = _collections.find(ns); - if (it != _collections.end()) { - afterMetadata = it->second->getMetadata(); - } - + auto css = CollectionShardingState::get(txn, nss); + afterMetadata = css->getMetadata(); if (afterMetadata) { afterShardVersion = afterMetadata->getShardVersion(); afterCollVersion = afterMetadata->getCollVersion(); @@ -774,7 +761,6 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, // Status status = mdLoader.promotePendingChunks(afterMetadata.get(), remoteMetadata.get()); - if (!status.isOK()) { warning() << "remote metadata for " << ns << " is inconsistent with current pending chunks" @@ -797,31 +783,22 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, if (!afterCollVersion.epoch().isSet()) { // First metadata load installType = InstallType_New; - dassert(it == _collections.end()); - _collections.insert(make_pair(ns, - stdx::make_unique<CollectionShardingState>( - NamespaceString(ns), std::move(remoteMetadata)))); + css->setMetadata(std::move(remoteMetadata)); } else if (remoteCollVersion.epoch().isSet() && remoteCollVersion.epoch() == afterCollVersion.epoch()) { // Update to existing metadata installType = InstallType_Update; - - // Invariant: If CollMetadata was not found, version should be have been 0. - dassert(it != _collections.end()); - it->second->setMetadata(std::move(remoteMetadata)); + css->setMetadata(std::move(remoteMetadata)); } else if (remoteCollVersion.epoch().isSet()) { // New epoch detected, replacing metadata installType = InstallType_Replace; - - // Invariant: If CollMetadata was not found, version should be have been 0. - dassert(it != _collections.end()); - it->second->setMetadata(std::move(remoteMetadata)); + css->setMetadata(std::move(remoteMetadata)); } else { dassert(!remoteCollVersion.epoch().isSet()); // Drop detected installType = InstallType_Drop; - _collections.erase(it); + css->setMetadata(nullptr); } *latestShardVersion = remoteShardVersion; @@ -906,15 +883,17 @@ void ShardingState::appendInfo(OperationContext* txn, BSONObjBuilder& builder) { it != _collections.end(); ++it) { shared_ptr<CollectionMetadata> metadata = it->second->getMetadata(); - versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong()); + if (metadata) { + versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong()); + } else { + versionB.appendTimestamp(it->first, ChunkVersion::UNSHARDED().toLong()); + } } versionB.done(); } bool ShardingState::needCollectionMetadata(OperationContext* txn, const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (!enabled()) return false; diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index 2383660ae16..19dc1be2978 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -132,8 +132,6 @@ public: */ void clearCollectionMetadata(); - bool hasVersion(const std::string& ns); - ChunkVersion getVersion(const std::string& ns); /** |