diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-03-10 09:51:50 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-03-10 11:30:07 -0500 |
commit | 44d8a4dd0f8f27b72e2040e2bde74c552739eb23 (patch) | |
tree | 97a8934b6dd2a39a2bccfe0b479182dab789ea3f /src/mongo | |
parent | 6efa681435ec30467ca88edc449b241bb2c326bf (diff) | |
download | mongo-44d8a4dd0f8f27b72e2040e2bde74c552739eb23.tar.gz |
SERVER-22359 Move ensureShardVersionOkOrThrow to CollectionShardingState
This ensures that assertions are in place to verify that the correct
locks are held when the shard version check is performed.
Diffstat (limited to 'src/mongo')
22 files changed, 285 insertions, 316 deletions
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 20fd3df1466..0657d640e77 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -61,8 +61,8 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/write_concern.h" -#include "mongo/s/d_state.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -257,19 +257,18 @@ public: // Explain calls of the findAndModify command are read-only, but we take write // locks so that the timing information is more accurate. - AutoGetDb autoDb(txn, dbName, MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - - ensureShardVersionOKOrThrow(txn, nsString.ns()); - - Collection* collection = nullptr; - if (autoDb.getDb()) { - collection = autoDb.getDb()->getCollection(nsString.ns()); - } else { + AutoGetCollection autoColl(txn, nsString, MODE_IX); + if (!autoColl.getDb()) { return {ErrorCodes::NamespaceNotFound, str::stream() << "database " << dbName << " does not exist."}; } + auto css = CollectionShardingState::get(txn, nsString); + if (css) { + css->checkShardVersionOrThrow(txn); + } + + Collection* const collection = autoColl.getCollection(); auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete); if (!statusWithPlanExecutor.isOK()) { return statusWithPlanExecutor.getStatus(); @@ -293,19 +292,18 @@ public: // Explain calls of the findAndModify command are read-only, but we take write // locks so that the timing information is more accurate. 
- AutoGetDb autoDb(txn, dbName, MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - - ensureShardVersionOKOrThrow(txn, nsString.ns()); - - Collection* collection = nullptr; - if (autoDb.getDb()) { - collection = autoDb.getDb()->getCollection(nsString.ns()); - } else { + AutoGetCollection autoColl(txn, nsString, MODE_IX); + if (!autoColl.getDb()) { return {ErrorCodes::NamespaceNotFound, str::stream() << "database " << dbName << " does not exist."}; } + auto css = CollectionShardingState::get(txn, nsString); + if (css) { + css->checkShardVersionOrThrow(txn); + } + + Collection* collection = autoColl.getCollection(); auto statusWithPlanExecutor = getExecutorUpdate(txn, collection, &parsedUpdate, opDebug); if (!statusWithPlanExecutor.isOK()) { @@ -376,7 +374,6 @@ public: AutoGetOrCreateDb autoDb(txn, dbName, MODE_IX); Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - Collection* collection = autoDb.getDb()->getCollection(nsString.ns()); // Attach the namespace and database profiling level to the current op. 
{ @@ -385,13 +382,17 @@ public: ->enter_inlock(nsString.ns().c_str(), autoDb.getDb()->getProfilingLevel()); } - ensureShardVersionOKOrThrow(txn, nsString.ns()); + auto css = CollectionShardingState::get(txn, nsString); + if (css) { + css->checkShardVersionOrThrow(txn); + } Status isPrimary = checkCanAcceptWritesForDatabase(nsString); if (!isPrimary.isOK()) { return appendCommandStatus(result, isPrimary); } + Collection* const collection = autoDb.getDb()->getCollection(nsString.ns()); auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete); if (!statusWithPlanExecutor.isOK()) { return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); @@ -435,7 +436,6 @@ public: AutoGetOrCreateDb autoDb(txn, dbName, MODE_IX); Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - Collection* collection = autoDb.getDb()->getCollection(nsString.ns()); // Attach the namespace and database profiling level to the current op. { @@ -444,13 +444,18 @@ public: ->enter_inlock(nsString.ns().c_str(), autoDb.getDb()->getProfilingLevel()); } - ensureShardVersionOKOrThrow(txn, nsString.ns()); + auto css = CollectionShardingState::get(txn, nsString); + if (css) { + css->checkShardVersionOrThrow(txn); + } Status isPrimary = checkCanAcceptWritesForDatabase(nsString); if (!isPrimary.isOK()) { return appendCommandStatus(result, isPrimary); } + Collection* collection = autoDb.getDb()->getCollection(nsString.ns()); + // Create the collection if it does not exist when performing an upsert // because the update stage does not create its own collection. 
if (!collection && args.isUpsert()) { diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 593ae3dddd4..33291845269 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -70,7 +70,6 @@ #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/config.h" -#include "mongo/s/d_state.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/s/stale_exception.h" diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index b0da72c5bb8..162017fc0f4 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -36,8 +36,8 @@ #include "mongo/db/client.h" #include "mongo/db/curop.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/stats/top.h" -#include "mongo/s/d_state.h" namespace mongo { @@ -46,9 +46,15 @@ AutoGetDb::AutoGetDb(OperationContext* txn, StringData ns, LockMode mode) AutoGetCollection::AutoGetCollection(OperationContext* txn, const NamespaceString& nss, - LockMode mode) - : _autoDb(txn, nss.db(), mode), - _collLock(txn->lockState(), nss.ns(), mode), + LockMode modeAll) + : AutoGetCollection(txn, nss, modeAll, modeAll) {} + +AutoGetCollection::AutoGetCollection(OperationContext* txn, + const NamespaceString& nss, + LockMode modeDB, + LockMode modeColl) + : _autoDb(txn, nss.db(), modeDB), + _collLock(txn->lockState(), nss.ns(), modeColl), _coll(_autoDb.getDb() ? _autoDb.getDb()->getCollection(nss) : nullptr) {} AutoGetOrCreateDb::AutoGetOrCreateDb(OperationContext* txn, StringData ns, LockMode mode) @@ -95,7 +101,10 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* txn, // We have both the DB and collection locked, which is the prerequisite to do a stable shard // version check, but we'd like to do the check after we have a satisfactory snapshot. 
- ensureShardVersionOKOrThrow(_txn, nss.ns()); + auto css = CollectionShardingState::get(txn, nss); + if (css) { + css->checkShardVersionOrThrow(txn); + } } AutoGetCollectionForRead::~AutoGetCollectionForRead() { @@ -186,7 +195,10 @@ void OldClientContext::_checkNotStale() const { case dbDelete: // here as well. break; default: - ensureShardVersionOKOrThrow(_txn, _ns); + auto css = CollectionShardingState::get(_txn, _ns); + if (css) { + css->checkShardVersionOrThrow(_txn); + } } } diff --git a/src/mongo/db/db_raii.h b/src/mongo/db/db_raii.h index 948cf014c90..654996b676c 100644 --- a/src/mongo/db/db_raii.h +++ b/src/mongo/db/db_raii.h @@ -74,7 +74,12 @@ class AutoGetCollection { MONGO_DISALLOW_COPYING(AutoGetCollection); public: - AutoGetCollection(OperationContext* txn, const NamespaceString& nss, LockMode mode); + AutoGetCollection(OperationContext* txn, const NamespaceString& nss, LockMode modeAll); + + AutoGetCollection(OperationContext* txn, + const NamespaceString& nss, + LockMode modeDB, + LockMode modeColl); Database* getDb() const { return _autoDb.getDb(); diff --git a/src/mongo/db/exec/idhack.cpp b/src/mongo/db/exec/idhack.cpp index affe2e4dc50..759ca91153f 100644 --- a/src/mongo/db/exec/idhack.cpp +++ b/src/mongo/db/exec/idhack.cpp @@ -38,7 +38,6 @@ #include "mongo/db/exec/working_set_computed_data.h" #include "mongo/db/index/btree_access_method.h" #include "mongo/db/storage/record_fetcher.h" -#include "mongo/s/d_state.h" #include "mongo/stdx/memory.h" namespace mongo { diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index c91c7fb2110..803799d8774 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -30,7 +30,6 @@ #include "mongo/db/pipeline/document_source.h" - #include "mongo/db/catalog/database_holder.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/working_set_common.h" @@ -39,7 +38,6 @@ #include 
"mongo/db/query/explain.h" #include "mongo/db/query/find_common.h" #include "mongo/db/storage/storage_options.h" -#include "mongo/s/d_state.h" namespace mongo { diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index faea115ca02..ade024378e6 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -75,6 +75,7 @@ #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/storage/storage_options.h" #include "mongo/db/storage/oplog_hack.h" @@ -85,7 +86,6 @@ namespace mongo { using std::unique_ptr; -using std::endl; using std::string; using std::vector; using stdx::make_unique; @@ -174,7 +174,7 @@ void fillOutPlannerParams(OperationContext* txn, // If the caller wants a shard filter, make sure we're actually sharded. if (plannerParams->options & QueryPlannerParams::INCLUDE_SHARD_FILTER) { std::shared_ptr<CollectionMetadata> collMetadata = - ShardingState::get(txn)->getCollectionMetadata(canonicalQuery->ns()); + CollectionShardingState::get(txn, canonicalQuery->nss())->getMetadata(); if (collMetadata) { plannerParams->shardKey = collMetadata->getKeyPattern(); } else { @@ -259,7 +259,7 @@ Status prepareExecution(OperationContext* opCtx, if (plannerParams.options & QueryPlannerParams::INCLUDE_SHARD_FILTER) { *rootOut = new ShardFilterStage( opCtx, - ShardingState::get(opCtx)->getCollectionMetadata(collection->ns().ns()), + CollectionShardingState::get(opCtx, canonicalQuery->nss())->getMetadata(), ws, *rootOut); } @@ -648,7 +648,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorDelete(OperationContext* txn, 12050, "cannot delete from system namespace", legalClientSystemNS(nss.ns(), true)); } if (nss.ns().find('$') != string::npos) { - log() << "cannot delete from collection with reserved $ in name: " << nss << endl; + log() << 
"cannot delete from collection with reserved $ in name: " << nss; uasserted(10100, "cannot delete from collection with reserved $ in name"); } } diff --git a/src/mongo/db/range_deleter_db_env.cpp b/src/mongo/db/range_deleter_db_env.cpp index 0a4a6734913..4132dfde863 100644 --- a/src/mongo/db/range_deleter_db_env.cpp +++ b/src/mongo/db/range_deleter_db_env.cpp @@ -42,7 +42,6 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/d_state.h" #include "mongo/util/log.h" namespace mongo { diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index d371b7d6040..dcada5fd460 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -84,7 +84,6 @@ #include "mongo/db/storage/storage_options.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/platform/random.h" -#include "mongo/s/d_state.h" #include "mongo/scripting/engine.h" #include "mongo/stdx/memory.h" #include "mongo/util/elapsed_tracker.h" diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 9eec8de16e4..e0fa35d4da2 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -32,15 +32,21 @@ #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/client.h" #include "mongo/db/concurrency/lock_state.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/stale_exception.h" namespace mongo { +using std::string; + CollectionShardingState::CollectionShardingState( NamespaceString nss, std::unique_ptr<CollectionMetadata> initialMetadata) : _nss(std::move(nss)), 
_metadata(std::move(initialMetadata)) {} @@ -62,10 +68,22 @@ CollectionShardingState* CollectionShardingState::get(OperationContext* txn, } void CollectionShardingState::setMetadata(std::shared_ptr<CollectionMetadata> newMetadata) { - invariant(newMetadata); _metadata = std::move(newMetadata); } +void CollectionShardingState::checkShardVersionOrThrow(OperationContext* txn) const { + string errmsg; + ChunkVersion received; + ChunkVersion wanted; + if (!_checkShardVersionOk(txn, &errmsg, &received, &wanted)) { + throw SendStaleConfigException(_nss.ns(), + str::stream() << "[" << _nss.ns() + << "] shard version not ok: " << errmsg, + received, + wanted); + } +} + bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn, const BSONObj& doc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); @@ -76,6 +94,8 @@ bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn, void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& insertedDoc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + checkShardVersionOrThrow(txn); + ShardingState::get(txn)->migrationSourceManager()->logInsertOp( txn, _nss.ns().c_str(), insertedDoc); } @@ -83,6 +103,8 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + checkShardVersionOrThrow(txn); + ShardingState::get(txn)->migrationSourceManager()->logUpdateOp( txn, _nss.ns().c_str(), updatedDoc); } @@ -90,8 +112,90 @@ void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& u void CollectionShardingState::onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + checkShardVersionOrThrow(txn); + 
ShardingState::get(txn)->migrationSourceManager()->logDeleteOp( txn, _nss.ns().c_str(), deletedDocId); } +bool CollectionShardingState::_checkShardVersionOk(OperationContext* txn, + string* errmsg, + ChunkVersion* expectedShardVersion, + ChunkVersion* actualShardVersion) const { + Client* client = txn->getClient(); + + // Operations using the DBDirectClient are unversioned. + if (client->isInDirectClient()) { + return true; + } + + if (!repl::ReplicationCoordinator::get(txn)->canAcceptWritesForDatabase(_nss.db())) { + // right now connections to secondaries aren't versioned at all + return true; + } + + const auto& oss = OperationShardingState::get(txn); + + // If there is a version attached to the OperationContext, use it as the received version. + // Otherwise, get the received version from the ShardedConnectionInfo. + if (oss.hasShardVersion()) { + *expectedShardVersion = oss.getShardVersion(_nss); + } else { + ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, false); + if (!info) { + // There is no shard version information on either 'txn' or 'client'. This means that + // the operation represented by 'txn' is unversioned, and the shard version is always OK + // for unversioned operations. + return true; + } + + *expectedShardVersion = info->getVersion(_nss.ns()); + } + + if (ChunkVersion::isIgnoredVersion(*expectedShardVersion)) { + return true; + } + + *actualShardVersion = (_metadata ? _metadata->getShardVersion() : ChunkVersion::UNSHARDED()); + + if (expectedShardVersion->isWriteCompatibleWith(*actualShardVersion)) { + return true; + } + + // + // Figure out exactly why not compatible, send appropriate error message + // The versions themselves are returned in the error, so not needed in messages here + // + + // Check epoch first, to send more meaningful message, since other parameters probably won't + // match either. 
+ if (actualShardVersion->epoch() != expectedShardVersion->epoch()) { + *errmsg = str::stream() << "version epoch mismatch detected for " << _nss.ns() << ", " + << "the collection may have been dropped and recreated"; + return false; + } + + if (!actualShardVersion->isSet() && expectedShardVersion->isSet()) { + *errmsg = str::stream() << "this shard no longer contains chunks for " << _nss.ns() << ", " + << "the collection may have been dropped"; + return false; + } + + if (actualShardVersion->isSet() && !expectedShardVersion->isSet()) { + *errmsg = str::stream() << "this shard contains versioned chunks for " << _nss.ns() << ", " + << "but no version set in request"; + return false; + } + + if (actualShardVersion->majorVersion() != expectedShardVersion->majorVersion()) { + // Could be > or < - wanted is > if this is the source of a migration, wanted < if this is + // the target of a migration + *errmsg = str::stream() << "version mismatch detected for " << _nss.ns(); + return false; + } + + // Those are all the reasons the versions can mismatch + MONGO_UNREACHABLE; +} + } // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 73f076fb37b..f9f3255597b 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -37,6 +37,7 @@ namespace mongo { class BSONObj; +struct ChunkVersion; class CollectionMetadata; class OperationContext; @@ -81,6 +82,17 @@ public: */ void setMetadata(std::shared_ptr<CollectionMetadata> newMetadata); + /** + * Checks whether the shard version in the context is compatible with the shard version of the + * collection locally and if not throws SendStaleConfigException populated with the expected and + * actual versions. + * + * Because SendStaleConfigException has special semantics in terms of how a sharded command's + * response is constructed, this function should be the only means of checking for shard version + * match. 
+ */ + void checkShardVersionOrThrow(OperationContext* txn) const; + // Replication subsystem hooks. If this collection is serving as a source for migration, these // methods inform it of any changes to its contents. @@ -93,10 +105,29 @@ public: void onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId); private: + /** + * Checks whether the shard version of the operation matches that of the collection. + * + * txn - Operation context from which to retrieve the operation's expected version. + * errmsg (out) - On false return contains an explanatory error message. + * expectedShardVersion (out) - On false return contains the expected collection version on this + * shard. Obtained from the operation sharding state. + * actualShardVersion (out) - On false return contains the actual collection version on this + * shard. Obtained from the collection sharding state. + * + * Returns true if the expected collection version on the shard matches its actual version on + * the shard and false otherwise. Upon false return, the output parameters will be set. + */ + bool _checkShardVersionOk(OperationContext* txn, + std::string* errmsg, + ChunkVersion* expectedShardVersion, + ChunkVersion* actualShardVersion) const; + // Namespace to which this state belongs. const NamespaceString _nss; - // Contains all the chunks associated with this collection. This value is always non-null. + // Contains all the chunks associated with this collection. This value will be null if the + // collection is not sharded. 
std::shared_ptr<CollectionMetadata> _metadata; }; diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 58d12719867..c9dacc78491 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -47,7 +47,6 @@ #include "mongo/db/record_id.h" #include "mongo/logger/ramlog.h" #include "mongo/s/chunk.h" -#include "mongo/s/d_state.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/util/elapsed_tracker.h" #include "mongo/util/log.h" @@ -224,8 +223,6 @@ void MigrationSourceManager::done(OperationContext* txn) { void MigrationSourceManager::logInsertOp(OperationContext* txn, const char* ns, const BSONObj& obj) { - ensureShardVersionOKOrThrow(txn, ns); - dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. if (!_sessionId || (_nss != ns)) @@ -249,8 +246,6 @@ void MigrationSourceManager::logInsertOp(OperationContext* txn, void MigrationSourceManager::logUpdateOp(OperationContext* txn, const char* ns, const BSONObj& updatedDoc) { - ensureShardVersionOKOrThrow(txn, ns); - dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. if (!_sessionId || (_nss != ns)) @@ -274,8 +269,6 @@ void MigrationSourceManager::logUpdateOp(OperationContext* txn, void MigrationSourceManager::logDeleteOp(OperationContext* txn, const char* ns, const BSONObj& obj) { - ensureShardVersionOKOrThrow(txn, ns); - dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. 
BSONElement idElement = obj["_id"]; diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index a125a8b155d..775104d4e6b 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -36,7 +36,7 @@ #include "mongo/client/connection_string.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/client.h" -#include "mongo/db/concurrency/lock_state.h" +#include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" @@ -218,7 +218,10 @@ CollectionShardingState* ShardingState::getNS(const std::string& ns) { stdx::lock_guard<stdx::mutex> lk(_mutex); CollectionShardingStateMap::iterator it = _collections.find(ns); if (it == _collections.end()) { - return nullptr; + auto inserted = _collections.insert(make_pair( + ns, stdx::make_unique<CollectionShardingState>(NamespaceString(ns), nullptr))); + invariant(inserted.second); + it = std::move(inserted.first); } return it->second.get(); @@ -229,21 +232,22 @@ void ShardingState::clearCollectionMetadata() { _collections.clear(); } -// TODO we shouldn't need three ways for checking the version. Fix this. 
-bool ShardingState::hasVersion(const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return !!_collections.count(ns); -} - ChunkVersion ShardingState::getVersion(const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - CollectionShardingStateMap::const_iterator it = _collections.find(ns); - if (it != _collections.end()) { - shared_ptr<CollectionMetadata> p = it->second->getMetadata(); + shared_ptr<CollectionMetadata> p; + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + CollectionShardingStateMap::const_iterator it = _collections.find(ns); + if (it != _collections.end()) { + p = it->second->getMetadata(); + } + } + + if (p) { return p->getShardVersion(); - } else { - return ChunkVersion(0, 0, OID()); } + + return ChunkVersion::UNSHARDED(); } void ShardingState::donateChunk(OperationContext* txn, @@ -618,24 +622,16 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, const ChunkVersion& reqShardVersion, bool useRequestedVersion, ChunkVersion* latestShardVersion) { + invariant(!txn->lockState()->isLocked()); + Status status = _waitForInitialization(txn); if (!status.isOK()) return status; - // The idea here is that we're going to reload the metadata from the config server, but - // we need to do so outside any locks. When we get our result back, if the current metadata - // has changed, we may not be able to install the new metadata. - - // - // Get the initial metadata - // No DBLock is needed since the metadata is expected to change during reload. - // - - shared_ptr<CollectionMetadata> beforeMetadata; - + // We can't reload if a shard name has not yet been set { stdx::lock_guard<stdx::mutex> lk(_mutex); - // We also can't reload if a shard name has not yet been set. 
+ if (_shardName.empty()) { string errMsg = str::stream() << "cannot refresh metadata for " << ns << " before shard name has been set"; @@ -643,15 +639,24 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, warning() << errMsg; return Status(ErrorCodes::NotYetInitialized, errMsg); } + } - CollectionShardingStateMap::iterator it = _collections.find(ns); - if (it != _collections.end()) { - beforeMetadata = it->second->getMetadata(); - } + const NamespaceString nss(ns); + + // The idea here is that we're going to reload the metadata from the config server, but we need + // to do so outside any locks. When we get our result back, if the current metadata has + // changed, we may not be able to install the new metadata. + shared_ptr<CollectionMetadata> beforeMetadata; + { + ScopedTransaction transaction(txn, MODE_IS); + AutoGetCollection autoColl(txn, nss, MODE_IS); + + beforeMetadata = CollectionShardingState::get(txn, nss)->getMetadata(); } ChunkVersion beforeShardVersion; ChunkVersion beforeCollVersion; + if (beforeMetadata) { beforeShardVersion = beforeMetadata->getShardVersion(); beforeCollVersion = beforeMetadata->getCollVersion(); @@ -696,7 +701,7 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, grid.catalogManager(txn), ns, getShardName(), - fullReload ? NULL : beforeMetadata.get(), + fullReload ? nullptr : beforeMetadata.get(), remoteMetadata.get()); refreshMillis = refreshTimer.millis(); @@ -739,29 +744,11 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, // Exclusive collection lock needed since we're now potentially changing the metadata, // and don't want reads/writes to be ongoing. 
ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); + AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X); - // // Get the metadata now that the load has completed - // - - // Don't reload if our config server has changed or sharding is no longer enabled - if (!enabled()) { - string errMsg = str::stream() << "could not refresh metadata for " << ns - << ", sharding is no longer enabled"; - - warning() << errMsg; - return Status(ErrorCodes::NotYetInitialized, errMsg); - } - - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionShardingStateMap::iterator it = _collections.find(ns); - if (it != _collections.end()) { - afterMetadata = it->second->getMetadata(); - } - + auto css = CollectionShardingState::get(txn, nss); + afterMetadata = css->getMetadata(); if (afterMetadata) { afterShardVersion = afterMetadata->getShardVersion(); afterCollVersion = afterMetadata->getCollVersion(); @@ -774,7 +761,6 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, // Status status = mdLoader.promotePendingChunks(afterMetadata.get(), remoteMetadata.get()); - if (!status.isOK()) { warning() << "remote metadata for " << ns << " is inconsistent with current pending chunks" @@ -797,31 +783,22 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, if (!afterCollVersion.epoch().isSet()) { // First metadata load installType = InstallType_New; - dassert(it == _collections.end()); - _collections.insert(make_pair(ns, - stdx::make_unique<CollectionShardingState>( - NamespaceString(ns), std::move(remoteMetadata)))); + css->setMetadata(std::move(remoteMetadata)); } else if (remoteCollVersion.epoch().isSet() && remoteCollVersion.epoch() == afterCollVersion.epoch()) { // Update to existing metadata installType = InstallType_Update; - - // Invariant: If CollMetadata was not found, version should be have been 0. 
- dassert(it != _collections.end()); - it->second->setMetadata(std::move(remoteMetadata)); + css->setMetadata(std::move(remoteMetadata)); } else if (remoteCollVersion.epoch().isSet()) { // New epoch detected, replacing metadata installType = InstallType_Replace; - - // Invariant: If CollMetadata was not found, version should be have been 0. - dassert(it != _collections.end()); - it->second->setMetadata(std::move(remoteMetadata)); + css->setMetadata(std::move(remoteMetadata)); } else { dassert(!remoteCollVersion.epoch().isSet()); // Drop detected installType = InstallType_Drop; - _collections.erase(it); + css->setMetadata(nullptr); } *latestShardVersion = remoteShardVersion; @@ -906,15 +883,17 @@ void ShardingState::appendInfo(OperationContext* txn, BSONObjBuilder& builder) { it != _collections.end(); ++it) { shared_ptr<CollectionMetadata> metadata = it->second->getMetadata(); - versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong()); + if (metadata) { + versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong()); + } else { + versionB.appendTimestamp(it->first, ChunkVersion::UNSHARDED().toLong()); + } } versionB.done(); } bool ShardingState::needCollectionMetadata(OperationContext* txn, const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (!enabled()) return false; diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index 2383660ae16..19dc1be2978 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -132,8 +132,6 @@ public: */ void clearCollectionMetadata(); - bool hasVersion(const std::string& ns); - ChunkVersion getVersion(const std::string& ns); /** diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index b51f0e6d29d..6c177a05d59 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -908,17 +908,17 @@ public: auto status = grid.catalogCache()->getDatabase(txn, 
nss.db().toString()); if (!status.isOK()) { return Status(status.getStatus().code(), - stream() << "Passthrough command failed: " << command.toString() - << " on ns " << nss.ns() << ". Caused by " - << causedBy(status.getStatus())); + str::stream() << "Passthrough command failed: " << command.toString() + << " on ns " << nss.ns() << ". Caused by " + << causedBy(status.getStatus())); } shared_ptr<DBConfig> conf = status.getValue(); if (conf->isSharded(nss.ns())) { return Status(ErrorCodes::IllegalOperation, - stream() << "Passthrough command failed: " << command.toString() - << " on ns " << nss.ns() - << ". Cannot run on sharded namespace."); + str::stream() << "Passthrough command failed: " << command.toString() + << " on ns " << nss.ns() + << ". Cannot run on sharded namespace."); } const auto primaryShard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId()); @@ -931,8 +931,9 @@ public: if (!conn->runCommand(nss.db().toString(), command, shardResult, options)) { conn.done(); return Status(ErrorCodes::OperationFailed, - stream() << "Passthrough command failed: " << command << " on ns " - << nss.ns() << "; result: " << shardResult); + str::stream() << "Passthrough command failed: " << command + << " on ns " << nss.ns() + << "; result: " << shardResult); } conn.done(); } catch (const DBException& ex) { diff --git a/src/mongo/s/d_state.cpp b/src/mongo/s/d_state.cpp index bf2e483b271..af47f7027a1 100644 --- a/src/mongo/s/d_state.cpp +++ b/src/mongo/s/d_state.cpp @@ -37,18 +37,15 @@ #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/privilege.h" -#include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/db_raii.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" -#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/collection_sharding_state.h" #include 
"mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/s/stale_exception.h" #include "mongo/util/log.h" #include "mongo/util/stringutils.h" @@ -57,7 +54,6 @@ namespace mongo { using std::shared_ptr; using std::string; using std::stringstream; -using std::vector; namespace { @@ -224,130 +220,23 @@ public: } shardingStateCmd; -/** - * @ return true if not in sharded mode - or if version for this client is ok - */ -bool shardVersionOk(OperationContext* txn, - const string& ns, - string& errmsg, - ChunkVersion& received, - ChunkVersion& wanted) { - Client* client = txn->getClient(); - - // Operations using the DBDirectClient are unversioned. - if (client->isInDirectClient()) { - return true; - } - - ShardingState* shardingState = ShardingState::get(client->getServiceContext()); - if (!shardingState->enabled()) { - return true; - } - - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nsToDatabase(ns))) { - // right now connections to secondaries aren't versioned at all - return true; - } - - auto& oss = OperationShardingState::get(txn); - - // If there is a version attached to the OperationContext, use it as the received version. - // Otherwise, get the received version from the ShardedConnectionInfo. - if (oss.hasShardVersion()) { - received = oss.getShardVersion(NamespaceString(ns)); - } else { - ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, false); - if (!info) { - // There is no shard version information on either 'txn' or 'client'. This means that - // the operation represented by 'txn' is unversioned, and the shard version is always OK - // for unversioned operations. 
- return true; - } - - received = info->getVersion(ns); - } - - if (ChunkVersion::isIgnoredVersion(received)) { - return true; - } - - wanted = shardingState->getVersion(ns); - - if (received.isWriteCompatibleWith(wanted)) - return true; - - // - // Figure out exactly why not compatible, send appropriate error message - // The versions themselves are returned in the error, so not needed in messages here - // - - // Check epoch first, to send more meaningful message, since other parameters probably - // won't match either - if (!wanted.hasEqualEpoch(received)) { - errmsg = str::stream() << "version epoch mismatch detected for " << ns << ", " - << "the collection may have been dropped and recreated"; - return false; - } - - if (!wanted.isSet() && received.isSet()) { - errmsg = str::stream() << "this shard no longer contains chunks for " << ns << ", " - << "the collection may have been dropped"; - return false; - } +} // namespace - if (wanted.isSet() && !received.isSet()) { - errmsg = str::stream() << "this shard contains versioned chunks for " << ns << ", " - << "but no version set in request"; +bool haveLocalShardingInfo(OperationContext* txn, const string& ns) { + if (!ShardingState::get(txn)->enabled()) { return false; } - if (wanted.majorVersion() != received.majorVersion()) { - // - // Could be > or < - wanted is > if this is the source of a migration, - // wanted < if this is the target of a migration - // - - errmsg = str::stream() << "version mismatch detected for " << ns << ", " - << "stored major version " << wanted.majorVersion() - << " does not match received " << received.majorVersion(); + auto css = CollectionShardingState::get(txn, ns); + if (!css->getMetadata()) { return false; } - // Those are all the reasons the versions can mismatch - MONGO_UNREACHABLE; -} - -} // namespace - -bool haveLocalShardingInfo(Client* client, const string& ns) { - if (!ShardingState::get(client->getServiceContext())->enabled()) - return false; - - if 
(!ShardingState::get(client->getServiceContext())->hasVersion(ns)) - return false; - - return ShardedConnectionInfo::get(client, false) != NULL; -} - -void ensureShardVersionOKOrThrow(OperationContext* txn, const std::string& ns) { - string errmsg; - ChunkVersion received; - ChunkVersion wanted; - if (!shardVersionOk(txn, ns, errmsg, received, wanted)) { - StringBuilder sb; - sb << "[" << ns << "] shard version not ok: " << errmsg; - throw SendStaleConfigException(ns, sb.str(), received, wanted); - } + return ShardedConnectionInfo::get(txn->getClient(), false) != nullptr; } void usingAShardConnection(const string& addr) {} -void saveGLEStats(const BSONObj& result, StringData hostString) { - // Declared in cluster_last_error_info.h. - // - // This can be called in mongod, which is unfortunate. To fix this, - // we can redesign how connection pooling works on mongod for sharded operations. -} +void saveGLEStats(const BSONObj& result, StringData hostString) {} } // namespace mongo diff --git a/src/mongo/s/d_state.h b/src/mongo/s/d_state.h index e2fd9cf79e5..5583d053d55 100644 --- a/src/mongo/s/d_state.h +++ b/src/mongo/s/d_state.h @@ -26,36 +26,17 @@ * then also delete it in the license file. */ - #pragma once #include <string> namespace mongo { -class BSONObj; -class Client; class OperationContext; -class ShardedConnectionInfo; -class NamespaceString; - -// ----------------- -// --- core --- -// ----------------- /** * @return true if we have any shard info for the ns */ -bool haveLocalShardingInfo(Client* client, const std::string& ns); +bool haveLocalShardingInfo(OperationContext* txn, const std::string& ns); -/** - * Validates whether the shard chunk version for the specified collection is up to date and if - * not, throws SendStaleConfigException. - * - * It is important (but not enforced) that method be called with the collection locked in at - * least IS mode in order to ensure that the shard version won't change. 
- * - * @param ns Complete collection namespace to be cheched. - */ -void ensureShardVersionOKOrThrow(OperationContext* txn, const std::string& ns); -} +} // namespace mongo diff --git a/src/mongo/s/stale_exception.h b/src/mongo/s/stale_exception.h index 8769da77bbe..90d6e0060e6 100644 --- a/src/mongo/s/stale_exception.h +++ b/src/mongo/s/stale_exception.h @@ -35,10 +35,8 @@ namespace mongo { -using mongoutils::str::stream; - /** - * Thrown whenever your config info for a given shard/chunk is out of date. + * Thrown whenever the config info for a given shard/chunk is out of date. */ class StaleConfigException : public AssertionException { public: @@ -48,9 +46,9 @@ public: ChunkVersion received, ChunkVersion wanted) : AssertionException( - stream() << raw << " ( ns : " << ns << ", received : " << received.toString() - << ", wanted : " << wanted.toString() << ", " - << (code == ErrorCodes::SendStaleConfig ? "send" : "recv") << " )", + str::stream() << raw << " ( ns : " << ns << ", received : " << received.toString() + << ", wanted : " << wanted.toString() << ", " + << (code == ErrorCodes::SendStaleConfig ? "send" : "recv") << " )", code), _ns(ns), _received(received), @@ -58,22 +56,24 @@ public: /** Preferred if we're rebuilding this from a thrown exception */ StaleConfigException(const std::string& raw, int code, const BSONObj& error) - : AssertionException( - stream() << raw - << " ( ns : " << (error["ns"].type() == String ? error["ns"].String() - : std::string("<unknown>")) - << ", received : " << ChunkVersion::fromBSON(error, "vReceived").toString() - << ", wanted : " << ChunkVersion::fromBSON(error, "vWanted").toString() - << ", " << (code == ErrorCodes::SendStaleConfig ? "send" : "recv") << " )", - code), + : AssertionException(str::stream() + << raw << " ( ns : " << (error["ns"].type() == String + ? 
error["ns"].String() + : std::string("<unknown>")) + << ", received : " + << ChunkVersion::fromBSON(error, "vReceived").toString() + << ", wanted : " + << ChunkVersion::fromBSON(error, "vWanted").toString() << ", " + << (code == ErrorCodes::SendStaleConfig ? "send" : "recv") << " )", + code), // For legacy reasons, we may not always get a namespace here _ns(error["ns"].type() == String ? error["ns"].String() : ""), _received(ChunkVersion::fromBSON(error, "vReceived")), _wanted(ChunkVersion::fromBSON(error, "vWanted")) {} /** - * Needs message so when we trace all exceptions on construction we get a useful - * message + * TODO: This constructor is only necessary, because ParallelSortClusteredCursor puts per-host + * stale config exceptions in a map and this requires a default constructor. */ StaleConfigException() : AssertionException("initializing empty stale config exception object", 0) {} @@ -88,26 +88,6 @@ public: return _ns; } - /** - * true if this exception would require a full reload of config data to resolve - */ - bool requiresFullReload() const { - return !_received.hasEqualEpoch(_wanted) || _received.isSet() != _wanted.isSet(); - } - - static bool parse(const std::string& big, std::string& ns, std::string& raw) { - std::string::size_type start = big.find('['); - if (start == std::string::npos) - return false; - std::string::size_type end = big.find(']', start); - if (end == std::string::npos) - return false; - - ns = big.substr(start + 1, (end - start) - 1); - raw = big.substr(end + 1); - return true; - } - ChunkVersion getVersionReceived() const { return _received; } @@ -116,14 +96,11 @@ public: return _wanted; } - StaleConfigException& operator=(const StaleConfigException& elem) { - this->_ei.msg = elem._ei.msg; - this->_ei.code = elem._ei.code; - this->_ns = elem._ns; - this->_received = elem._received; - this->_wanted = elem._wanted; - - return *this; + /** + * Returns true if this exception would require a full reload of config data to resolve. 
+ */ + bool requiresFullReload() const { + return !_received.hasEqualEpoch(_wanted) || _received.isSet() != _wanted.isSet(); } private: diff --git a/src/mongo/scripting/mozjs/db.cpp b/src/mongo/scripting/mozjs/db.cpp index 0a107ba3953..97112595037 100644 --- a/src/mongo/scripting/mozjs/db.cpp +++ b/src/mongo/scripting/mozjs/db.cpp @@ -59,8 +59,7 @@ void DBInfo::getProperty(JSContext* cx, if (o.hasOwnField(InternedString::_fullName)) { // need to check every time that the collection did not get sharded - if (haveLocalShardingInfo(opContext->getClient(), - o.getString(InternedString::_fullName))) + if (haveLocalShardingInfo(opContext, o.getString(InternedString::_fullName))) uasserted(ErrorCodes::BadValue, "can't use sharded collection from db.eval"); } } diff --git a/src/mongo/scripting/mozjs/dbcollection.cpp b/src/mongo/scripting/mozjs/dbcollection.cpp index 2cc03a752e1..8d094721f8f 100644 --- a/src/mongo/scripting/mozjs/dbcollection.cpp +++ b/src/mongo/scripting/mozjs/dbcollection.cpp @@ -75,8 +75,9 @@ void DBCollectionInfo::construct(JSContext* cx, JS::CallArgs args) { std::string fullName = ValueWriter(cx, args.get(3)).toString(); auto context = scope->getOpContext(); - if (context && haveLocalShardingInfo(context->getClient(), fullName)) + if (context && haveLocalShardingInfo(context, fullName)) { uasserted(ErrorCodes::BadValue, "can't use sharded collection from db.eval"); + } args.rval().setObjectOrNull(thisv); } diff --git a/src/mongo/shell/clientAndShell.cpp b/src/mongo/shell/clientAndShell.cpp index 2f55cef17aa..abbb03be656 100644 --- a/src/mongo/shell/clientAndShell.cpp +++ b/src/mongo/shell/clientAndShell.cpp @@ -70,7 +70,7 @@ bool inShutdown() { return dbexitCalled; } -bool haveLocalShardingInfo(Client* client, const string& ns) { +bool haveLocalShardingInfo(OperationContext* txn, const string& ns) { return false; } diff --git a/src/mongo/unittest/crutch.cpp b/src/mongo/unittest/crutch.cpp index 282c03538c4..61297a0f48c 100644 --- 
a/src/mongo/unittest/crutch.cpp +++ b/src/mongo/unittest/crutch.cpp @@ -54,7 +54,7 @@ DBClientBase* createDirectClient(OperationContext* txn) { return NULL; } -bool haveLocalShardingInfo(Client* client, const std::string& ns) { +bool haveLocalShardingInfo(OperationContext* txn, const std::string& ns) { return false; } |