diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-03-10 09:51:50 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2016-03-10 11:30:07 -0500 |
commit | 44d8a4dd0f8f27b72e2040e2bde74c552739eb23 (patch) | |
tree | 97a8934b6dd2a39a2bccfe0b479182dab789ea3f /src | |
parent | 6efa681435ec30467ca88edc449b241bb2c326bf (diff) | |
download | mongo-44d8a4dd0f8f27b72e2040e2bde74c552739eb23.tar.gz |
SERVER-22359 Move ensureShardVersionOKOrThrow to CollectionShardingState
This ensures that we will have assertions in place for the correct locks
being held.
Diffstat (limited to 'src')
22 files changed, 285 insertions, 316 deletions
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 20fd3df1466..0657d640e77 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -61,8 +61,8 @@ #include "mongo/db/query/plan_executor.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/write_concern.h" -#include "mongo/s/d_state.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -257,19 +257,18 @@ public: // Explain calls of the findAndModify command are read-only, but we take write // locks so that the timing information is more accurate. - AutoGetDb autoDb(txn, dbName, MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - - ensureShardVersionOKOrThrow(txn, nsString.ns()); - - Collection* collection = nullptr; - if (autoDb.getDb()) { - collection = autoDb.getDb()->getCollection(nsString.ns()); - } else { + AutoGetCollection autoColl(txn, nsString, MODE_IX); + if (!autoColl.getDb()) { return {ErrorCodes::NamespaceNotFound, str::stream() << "database " << dbName << " does not exist."}; } + auto css = CollectionShardingState::get(txn, nsString); + if (css) { + css->checkShardVersionOrThrow(txn); + } + + Collection* const collection = autoColl.getCollection(); auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete); if (!statusWithPlanExecutor.isOK()) { return statusWithPlanExecutor.getStatus(); @@ -293,19 +292,18 @@ public: // Explain calls of the findAndModify command are read-only, but we take write // locks so that the timing information is more accurate. 
- AutoGetDb autoDb(txn, dbName, MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - - ensureShardVersionOKOrThrow(txn, nsString.ns()); - - Collection* collection = nullptr; - if (autoDb.getDb()) { - collection = autoDb.getDb()->getCollection(nsString.ns()); - } else { + AutoGetCollection autoColl(txn, nsString, MODE_IX); + if (!autoColl.getDb()) { return {ErrorCodes::NamespaceNotFound, str::stream() << "database " << dbName << " does not exist."}; } + auto css = CollectionShardingState::get(txn, nsString); + if (css) { + css->checkShardVersionOrThrow(txn); + } + + Collection* collection = autoColl.getCollection(); auto statusWithPlanExecutor = getExecutorUpdate(txn, collection, &parsedUpdate, opDebug); if (!statusWithPlanExecutor.isOK()) { @@ -376,7 +374,6 @@ public: AutoGetOrCreateDb autoDb(txn, dbName, MODE_IX); Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - Collection* collection = autoDb.getDb()->getCollection(nsString.ns()); // Attach the namespace and database profiling level to the current op. 
{ @@ -385,13 +382,17 @@ public: ->enter_inlock(nsString.ns().c_str(), autoDb.getDb()->getProfilingLevel()); } - ensureShardVersionOKOrThrow(txn, nsString.ns()); + auto css = CollectionShardingState::get(txn, nsString); + if (css) { + css->checkShardVersionOrThrow(txn); + } Status isPrimary = checkCanAcceptWritesForDatabase(nsString); if (!isPrimary.isOK()) { return appendCommandStatus(result, isPrimary); } + Collection* const collection = autoDb.getDb()->getCollection(nsString.ns()); auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete); if (!statusWithPlanExecutor.isOK()) { return appendCommandStatus(result, statusWithPlanExecutor.getStatus()); @@ -435,7 +436,6 @@ public: AutoGetOrCreateDb autoDb(txn, dbName, MODE_IX); Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX); - Collection* collection = autoDb.getDb()->getCollection(nsString.ns()); // Attach the namespace and database profiling level to the current op. { @@ -444,13 +444,18 @@ public: ->enter_inlock(nsString.ns().c_str(), autoDb.getDb()->getProfilingLevel()); } - ensureShardVersionOKOrThrow(txn, nsString.ns()); + auto css = CollectionShardingState::get(txn, nsString); + if (css) { + css->checkShardVersionOrThrow(txn); + } Status isPrimary = checkCanAcceptWritesForDatabase(nsString); if (!isPrimary.isOK()) { return appendCommandStatus(result, isPrimary); } + Collection* collection = autoDb.getDb()->getCollection(nsString.ns()); + // Create the collection if it does not exist when performing an upsert // because the update stage does not create its own collection. 
if (!collection && args.isUpsert()) { diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 593ae3dddd4..33291845269 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -70,7 +70,6 @@ #include "mongo/s/chunk_manager.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/config.h" -#include "mongo/s/d_state.h" #include "mongo/s/grid.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/s/stale_exception.h" diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp index b0da72c5bb8..162017fc0f4 100644 --- a/src/mongo/db/db_raii.cpp +++ b/src/mongo/db/db_raii.cpp @@ -36,8 +36,8 @@ #include "mongo/db/client.h" #include "mongo/db/curop.h" #include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/stats/top.h" -#include "mongo/s/d_state.h" namespace mongo { @@ -46,9 +46,15 @@ AutoGetDb::AutoGetDb(OperationContext* txn, StringData ns, LockMode mode) AutoGetCollection::AutoGetCollection(OperationContext* txn, const NamespaceString& nss, - LockMode mode) - : _autoDb(txn, nss.db(), mode), - _collLock(txn->lockState(), nss.ns(), mode), + LockMode modeAll) + : AutoGetCollection(txn, nss, modeAll, modeAll) {} + +AutoGetCollection::AutoGetCollection(OperationContext* txn, + const NamespaceString& nss, + LockMode modeDB, + LockMode modeColl) + : _autoDb(txn, nss.db(), modeDB), + _collLock(txn->lockState(), nss.ns(), modeColl), _coll(_autoDb.getDb() ? _autoDb.getDb()->getCollection(nss) : nullptr) {} AutoGetOrCreateDb::AutoGetOrCreateDb(OperationContext* txn, StringData ns, LockMode mode) @@ -95,7 +101,10 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* txn, // We have both the DB and collection locked, which is the prerequisite to do a stable shard // version check, but we'd like to do the check after we have a satisfactory snapshot. 
- ensureShardVersionOKOrThrow(_txn, nss.ns()); + auto css = CollectionShardingState::get(txn, nss); + if (css) { + css->checkShardVersionOrThrow(txn); + } } AutoGetCollectionForRead::~AutoGetCollectionForRead() { @@ -186,7 +195,10 @@ void OldClientContext::_checkNotStale() const { case dbDelete: // here as well. break; default: - ensureShardVersionOKOrThrow(_txn, _ns); + auto css = CollectionShardingState::get(_txn, _ns); + if (css) { + css->checkShardVersionOrThrow(_txn); + } } } diff --git a/src/mongo/db/db_raii.h b/src/mongo/db/db_raii.h index 948cf014c90..654996b676c 100644 --- a/src/mongo/db/db_raii.h +++ b/src/mongo/db/db_raii.h @@ -74,7 +74,12 @@ class AutoGetCollection { MONGO_DISALLOW_COPYING(AutoGetCollection); public: - AutoGetCollection(OperationContext* txn, const NamespaceString& nss, LockMode mode); + AutoGetCollection(OperationContext* txn, const NamespaceString& nss, LockMode modeAll); + + AutoGetCollection(OperationContext* txn, + const NamespaceString& nss, + LockMode modeDB, + LockMode modeColl); Database* getDb() const { return _autoDb.getDb(); diff --git a/src/mongo/db/exec/idhack.cpp b/src/mongo/db/exec/idhack.cpp index affe2e4dc50..759ca91153f 100644 --- a/src/mongo/db/exec/idhack.cpp +++ b/src/mongo/db/exec/idhack.cpp @@ -38,7 +38,6 @@ #include "mongo/db/exec/working_set_computed_data.h" #include "mongo/db/index/btree_access_method.h" #include "mongo/db/storage/record_fetcher.h" -#include "mongo/s/d_state.h" #include "mongo/stdx/memory.h" namespace mongo { diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp index c91c7fb2110..803799d8774 100644 --- a/src/mongo/db/pipeline/document_source_cursor.cpp +++ b/src/mongo/db/pipeline/document_source_cursor.cpp @@ -30,7 +30,6 @@ #include "mongo/db/pipeline/document_source.h" - #include "mongo/db/catalog/database_holder.h" #include "mongo/db/db_raii.h" #include "mongo/db/exec/working_set_common.h" @@ -39,7 +38,6 @@ #include 
"mongo/db/query/explain.h" #include "mongo/db/query/find_common.h" #include "mongo/db/storage/storage_options.h" -#include "mongo/s/d_state.h" namespace mongo { diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index faea115ca02..ade024378e6 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -75,6 +75,7 @@ #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/db/s/collection_metadata.h" +#include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/storage/storage_options.h" #include "mongo/db/storage/oplog_hack.h" @@ -85,7 +86,6 @@ namespace mongo { using std::unique_ptr; -using std::endl; using std::string; using std::vector; using stdx::make_unique; @@ -174,7 +174,7 @@ void fillOutPlannerParams(OperationContext* txn, // If the caller wants a shard filter, make sure we're actually sharded. if (plannerParams->options & QueryPlannerParams::INCLUDE_SHARD_FILTER) { std::shared_ptr<CollectionMetadata> collMetadata = - ShardingState::get(txn)->getCollectionMetadata(canonicalQuery->ns()); + CollectionShardingState::get(txn, canonicalQuery->nss())->getMetadata(); if (collMetadata) { plannerParams->shardKey = collMetadata->getKeyPattern(); } else { @@ -259,7 +259,7 @@ Status prepareExecution(OperationContext* opCtx, if (plannerParams.options & QueryPlannerParams::INCLUDE_SHARD_FILTER) { *rootOut = new ShardFilterStage( opCtx, - ShardingState::get(opCtx)->getCollectionMetadata(collection->ns().ns()), + CollectionShardingState::get(opCtx, canonicalQuery->nss())->getMetadata(), ws, *rootOut); } @@ -648,7 +648,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorDelete(OperationContext* txn, 12050, "cannot delete from system namespace", legalClientSystemNS(nss.ns(), true)); } if (nss.ns().find('$') != string::npos) { - log() << "cannot delete from collection with reserved $ in name: " << nss << endl; + log() << 
"cannot delete from collection with reserved $ in name: " << nss; uasserted(10100, "cannot delete from collection with reserved $ in name"); } } diff --git a/src/mongo/db/range_deleter_db_env.cpp b/src/mongo/db/range_deleter_db_env.cpp index 0a4a6734913..4132dfde863 100644 --- a/src/mongo/db/range_deleter_db_env.cpp +++ b/src/mongo/db/range_deleter_db_env.cpp @@ -42,7 +42,6 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/write_concern_options.h" -#include "mongo/s/d_state.h" #include "mongo/util/log.h" namespace mongo { diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index d371b7d6040..dcada5fd460 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -84,7 +84,6 @@ #include "mongo/db/storage/storage_options.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/platform/random.h" -#include "mongo/s/d_state.h" #include "mongo/scripting/engine.h" #include "mongo/stdx/memory.h" #include "mongo/util/elapsed_tracker.h" diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp index 9eec8de16e4..e0fa35d4da2 100644 --- a/src/mongo/db/s/collection_sharding_state.cpp +++ b/src/mongo/db/s/collection_sharding_state.cpp @@ -32,15 +32,21 @@ #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/client.h" #include "mongo/db/concurrency/lock_state.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/stale_exception.h" namespace mongo { +using std::string; + CollectionShardingState::CollectionShardingState( NamespaceString nss, std::unique_ptr<CollectionMetadata> initialMetadata) : _nss(std::move(nss)), 
_metadata(std::move(initialMetadata)) {} @@ -62,10 +68,22 @@ CollectionShardingState* CollectionShardingState::get(OperationContext* txn, } void CollectionShardingState::setMetadata(std::shared_ptr<CollectionMetadata> newMetadata) { - invariant(newMetadata); _metadata = std::move(newMetadata); } +void CollectionShardingState::checkShardVersionOrThrow(OperationContext* txn) const { + string errmsg; + ChunkVersion received; + ChunkVersion wanted; + if (!_checkShardVersionOk(txn, &errmsg, &received, &wanted)) { + throw SendStaleConfigException(_nss.ns(), + str::stream() << "[" << _nss.ns() + << "] shard version not ok: " << errmsg, + received, + wanted); + } +} + bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn, const BSONObj& doc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); @@ -76,6 +94,8 @@ bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn, void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& insertedDoc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + checkShardVersionOrThrow(txn); + ShardingState::get(txn)->migrationSourceManager()->logInsertOp( txn, _nss.ns().c_str(), insertedDoc); } @@ -83,6 +103,8 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + checkShardVersionOrThrow(txn); + ShardingState::get(txn)->migrationSourceManager()->logUpdateOp( txn, _nss.ns().c_str(), updatedDoc); } @@ -90,8 +112,90 @@ void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& u void CollectionShardingState::onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId) { dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX)); + checkShardVersionOrThrow(txn); + 
ShardingState::get(txn)->migrationSourceManager()->logDeleteOp( txn, _nss.ns().c_str(), deletedDocId); } +bool CollectionShardingState::_checkShardVersionOk(OperationContext* txn, + string* errmsg, + ChunkVersion* expectedShardVersion, + ChunkVersion* actualShardVersion) const { + Client* client = txn->getClient(); + + // Operations using the DBDirectClient are unversioned. + if (client->isInDirectClient()) { + return true; + } + + if (!repl::ReplicationCoordinator::get(txn)->canAcceptWritesForDatabase(_nss.db())) { + // right now connections to secondaries aren't versioned at all + return true; + } + + const auto& oss = OperationShardingState::get(txn); + + // If there is a version attached to the OperationContext, use it as the received version. + // Otherwise, get the received version from the ShardedConnectionInfo. + if (oss.hasShardVersion()) { + *expectedShardVersion = oss.getShardVersion(_nss); + } else { + ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, false); + if (!info) { + // There is no shard version information on either 'txn' or 'client'. This means that + // the operation represented by 'txn' is unversioned, and the shard version is always OK + // for unversioned operations. + return true; + } + + *expectedShardVersion = info->getVersion(_nss.ns()); + } + + if (ChunkVersion::isIgnoredVersion(*expectedShardVersion)) { + return true; + } + + *actualShardVersion = (_metadata ? _metadata->getShardVersion() : ChunkVersion::UNSHARDED()); + + if (expectedShardVersion->isWriteCompatibleWith(*actualShardVersion)) { + return true; + } + + // + // Figure out exactly why not compatible, send appropriate error message + // The versions themselves are returned in the error, so not needed in messages here + // + + // Check epoch first, to send more meaningful message, since other parameters probably won't + // match either. 
+ if (actualShardVersion->epoch() != expectedShardVersion->epoch()) { + *errmsg = str::stream() << "version epoch mismatch detected for " << _nss.ns() << ", " + << "the collection may have been dropped and recreated"; + return false; + } + + if (!actualShardVersion->isSet() && expectedShardVersion->isSet()) { + *errmsg = str::stream() << "this shard no longer contains chunks for " << _nss.ns() << ", " + << "the collection may have been dropped"; + return false; + } + + if (actualShardVersion->isSet() && !expectedShardVersion->isSet()) { + *errmsg = str::stream() << "this shard contains versioned chunks for " << _nss.ns() << ", " + << "but no version set in request"; + return false; + } + + if (actualShardVersion->majorVersion() != expectedShardVersion->majorVersion()) { + // Could be > or < - wanted is > if this is the source of a migration, wanted < if this is + // the target of a migration + *errmsg = str::stream() << "version mismatch detected for " << _nss.ns(); + return false; + } + + // Those are all the reasons the versions can mismatch + MONGO_UNREACHABLE; +} + } // namespace mongo diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h index 73f076fb37b..f9f3255597b 100644 --- a/src/mongo/db/s/collection_sharding_state.h +++ b/src/mongo/db/s/collection_sharding_state.h @@ -37,6 +37,7 @@ namespace mongo { class BSONObj; +struct ChunkVersion; class CollectionMetadata; class OperationContext; @@ -81,6 +82,17 @@ public: */ void setMetadata(std::shared_ptr<CollectionMetadata> newMetadata); + /** + * Checks whether the shard version in the context is compatible with the shard version of the + * collection locally and if not throws SendStaleConfigException populated with the expected and + * actual versions. + * + * Because SendStaleConfigException has special semantics in terms of how a sharded command's + * response is constructed, this function should be the only means of checking for shard version + * match. 
+ */ + void checkShardVersionOrThrow(OperationContext* txn) const; + // Replication subsystem hooks. If this collection is serving as a source for migration, these // methods inform it of any changes to its contents. @@ -93,10 +105,29 @@ public: void onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId); private: + /** + * Checks whether the shard version of the operation matches that of the collection. + * + * txn - Operation context from which to retrieve the operation's expected version. + * errmsg (out) - On false return contains an explanatory error message. + * expectedShardVersion (out) - On false return contains the expected collection version on this + * shard. Obtained from the operation sharding state. + * actualShardVersion (out) - On false return contains the actual collection version on this + * shard. Obtained from the collection sharding state. + * + * Returns true if the expected collection version on the shard matches its actual version on + * the shard and false otherwise. Upon false return, the output parameters will be set. + */ + bool _checkShardVersionOk(OperationContext* txn, + std::string* errmsg, + ChunkVersion* expectedShardVersion, + ChunkVersion* actualShardVersion) const; + // Namespace to which this state belongs. const NamespaceString _nss; - // Contains all the chunks associated with this collection. This value is always non-null. + // Contains all the chunks associated with this collection. This value will be null if the + // collection is not sharded. 
std::shared_ptr<CollectionMetadata> _metadata; }; diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index 58d12719867..c9dacc78491 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -47,7 +47,6 @@ #include "mongo/db/record_id.h" #include "mongo/logger/ramlog.h" #include "mongo/s/chunk.h" -#include "mongo/s/d_state.h" #include "mongo/s/shard_key_pattern.h" #include "mongo/util/elapsed_tracker.h" #include "mongo/util/log.h" @@ -224,8 +223,6 @@ void MigrationSourceManager::done(OperationContext* txn) { void MigrationSourceManager::logInsertOp(OperationContext* txn, const char* ns, const BSONObj& obj) { - ensureShardVersionOKOrThrow(txn, ns); - dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. if (!_sessionId || (_nss != ns)) @@ -249,8 +246,6 @@ void MigrationSourceManager::logInsertOp(OperationContext* txn, void MigrationSourceManager::logUpdateOp(OperationContext* txn, const char* ns, const BSONObj& updatedDoc) { - ensureShardVersionOKOrThrow(txn, ns); - dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. if (!_sessionId || (_nss != ns)) @@ -274,8 +269,6 @@ void MigrationSourceManager::logUpdateOp(OperationContext* txn, void MigrationSourceManager::logDeleteOp(OperationContext* txn, const char* ns, const BSONObj& obj) { - ensureShardVersionOKOrThrow(txn, ns); - dassert(txn->lockState()->isWriteLocked()); // Must have Global IX. 
BSONElement idElement = obj["_id"]; diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index a125a8b155d..775104d4e6b 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -36,7 +36,7 @@ #include "mongo/client/connection_string.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/client.h" -#include "mongo/db/concurrency/lock_state.h" +#include "mongo/db/db_raii.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/optime.h" @@ -218,7 +218,10 @@ CollectionShardingState* ShardingState::getNS(const std::string& ns) { stdx::lock_guard<stdx::mutex> lk(_mutex); CollectionShardingStateMap::iterator it = _collections.find(ns); if (it == _collections.end()) { - return nullptr; + auto inserted = _collections.insert(make_pair( + ns, stdx::make_unique<CollectionShardingState>(NamespaceString(ns), nullptr))); + invariant(inserted.second); + it = std::move(inserted.first); } return it->second.get(); @@ -229,21 +232,22 @@ void ShardingState::clearCollectionMetadata() { _collections.clear(); } -// TODO we shouldn't need three ways for checking the version. Fix this. 
-bool ShardingState::hasVersion(const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return !!_collections.count(ns); -} - ChunkVersion ShardingState::getVersion(const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - CollectionShardingStateMap::const_iterator it = _collections.find(ns); - if (it != _collections.end()) { - shared_ptr<CollectionMetadata> p = it->second->getMetadata(); + shared_ptr<CollectionMetadata> p; + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + CollectionShardingStateMap::const_iterator it = _collections.find(ns); + if (it != _collections.end()) { + p = it->second->getMetadata(); + } + } + + if (p) { return p->getShardVersion(); - } else { - return ChunkVersion(0, 0, OID()); } + + return ChunkVersion::UNSHARDED(); } void ShardingState::donateChunk(OperationContext* txn, @@ -618,24 +622,16 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, const ChunkVersion& reqShardVersion, bool useRequestedVersion, ChunkVersion* latestShardVersion) { + invariant(!txn->lockState()->isLocked()); + Status status = _waitForInitialization(txn); if (!status.isOK()) return status; - // The idea here is that we're going to reload the metadata from the config server, but - // we need to do so outside any locks. When we get our result back, if the current metadata - // has changed, we may not be able to install the new metadata. - - // - // Get the initial metadata - // No DBLock is needed since the metadata is expected to change during reload. - // - - shared_ptr<CollectionMetadata> beforeMetadata; - + // We can't reload if a shard name has not yet been set { stdx::lock_guard<stdx::mutex> lk(_mutex); - // We also can't reload if a shard name has not yet been set. 
+ if (_shardName.empty()) { string errMsg = str::stream() << "cannot refresh metadata for " << ns << " before shard name has been set"; @@ -643,15 +639,24 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, warning() << errMsg; return Status(ErrorCodes::NotYetInitialized, errMsg); } + } - CollectionShardingStateMap::iterator it = _collections.find(ns); - if (it != _collections.end()) { - beforeMetadata = it->second->getMetadata(); - } + const NamespaceString nss(ns); + + // The idea here is that we're going to reload the metadata from the config server, but we need + // to do so outside any locks. When we get our result back, if the current metadata has + // changed, we may not be able to install the new metadata. + shared_ptr<CollectionMetadata> beforeMetadata; + { + ScopedTransaction transaction(txn, MODE_IS); + AutoGetCollection autoColl(txn, nss, MODE_IS); + + beforeMetadata = CollectionShardingState::get(txn, nss)->getMetadata(); } ChunkVersion beforeShardVersion; ChunkVersion beforeCollVersion; + if (beforeMetadata) { beforeShardVersion = beforeMetadata->getShardVersion(); beforeCollVersion = beforeMetadata->getCollVersion(); @@ -696,7 +701,7 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, grid.catalogManager(txn), ns, getShardName(), - fullReload ? NULL : beforeMetadata.get(), + fullReload ? nullptr : beforeMetadata.get(), remoteMetadata.get()); refreshMillis = refreshTimer.millis(); @@ -739,29 +744,11 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, // Exclusive collection lock needed since we're now potentially changing the metadata, // and don't want reads/writes to be ongoing. 
ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX); - Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X); + AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X); - // // Get the metadata now that the load has completed - // - - // Don't reload if our config server has changed or sharding is no longer enabled - if (!enabled()) { - string errMsg = str::stream() << "could not refresh metadata for " << ns - << ", sharding is no longer enabled"; - - warning() << errMsg; - return Status(ErrorCodes::NotYetInitialized, errMsg); - } - - stdx::lock_guard<stdx::mutex> lk(_mutex); - - CollectionShardingStateMap::iterator it = _collections.find(ns); - if (it != _collections.end()) { - afterMetadata = it->second->getMetadata(); - } - + auto css = CollectionShardingState::get(txn, nss); + afterMetadata = css->getMetadata(); if (afterMetadata) { afterShardVersion = afterMetadata->getShardVersion(); afterCollVersion = afterMetadata->getCollVersion(); @@ -774,7 +761,6 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, // Status status = mdLoader.promotePendingChunks(afterMetadata.get(), remoteMetadata.get()); - if (!status.isOK()) { warning() << "remote metadata for " << ns << " is inconsistent with current pending chunks" @@ -797,31 +783,22 @@ Status ShardingState::_refreshMetadata(OperationContext* txn, if (!afterCollVersion.epoch().isSet()) { // First metadata load installType = InstallType_New; - dassert(it == _collections.end()); - _collections.insert(make_pair(ns, - stdx::make_unique<CollectionShardingState>( - NamespaceString(ns), std::move(remoteMetadata)))); + css->setMetadata(std::move(remoteMetadata)); } else if (remoteCollVersion.epoch().isSet() && remoteCollVersion.epoch() == afterCollVersion.epoch()) { // Update to existing metadata installType = InstallType_Update; - - // Invariant: If CollMetadata was not found, version should be have been 0. 
- dassert(it != _collections.end()); - it->second->setMetadata(std::move(remoteMetadata)); + css->setMetadata(std::move(remoteMetadata)); } else if (remoteCollVersion.epoch().isSet()) { // New epoch detected, replacing metadata installType = InstallType_Replace; - - // Invariant: If CollMetadata was not found, version should be have been 0. - dassert(it != _collections.end()); - it->second->setMetadata(std::move(remoteMetadata)); + css->setMetadata(std::move(remoteMetadata)); } else { dassert(!remoteCollVersion.epoch().isSet()); // Drop detected installType = InstallType_Drop; - _collections.erase(it); + css->setMetadata(nullptr); } *latestShardVersion = remoteShardVersion; @@ -906,15 +883,17 @@ void ShardingState::appendInfo(OperationContext* txn, BSONObjBuilder& builder) { it != _collections.end(); ++it) { shared_ptr<CollectionMetadata> metadata = it->second->getMetadata(); - versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong()); + if (metadata) { + versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong()); + } else { + versionB.appendTimestamp(it->first, ChunkVersion::UNSHARDED().toLong()); + } } versionB.done(); } bool ShardingState::needCollectionMetadata(OperationContext* txn, const string& ns) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (!enabled()) return false; diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index 2383660ae16..19dc1be2978 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -132,8 +132,6 @@ public: */ void clearCollectionMetadata(); - bool hasVersion(const std::string& ns); - ChunkVersion getVersion(const std::string& ns); /** diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index b51f0e6d29d..6c177a05d59 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -908,17 +908,17 @@ public: auto status = grid.catalogCache()->getDatabase(txn, 
nss.db().toString()); if (!status.isOK()) { return Status(status.getStatus().code(), - stream() << "Passthrough command failed: " << command.toString() - << " on ns " << nss.ns() << ". Caused by " - << causedBy(status.getStatus())); + str::stream() << "Passthrough command failed: " << command.toString() + << " on ns " << nss.ns() << ". Caused by " + << causedBy(status.getStatus())); } shared_ptr<DBConfig> conf = status.getValue(); if (conf->isSharded(nss.ns())) { return Status(ErrorCodes::IllegalOperation, - stream() << "Passthrough command failed: " << command.toString() - << " on ns " << nss.ns() - << ". Cannot run on sharded namespace."); + str::stream() << "Passthrough command failed: " << command.toString() + << " on ns " << nss.ns() + << ". Cannot run on sharded namespace."); } const auto primaryShard = grid.shardRegistry()->getShard(txn, conf->getPrimaryId()); @@ -931,8 +931,9 @@ public: if (!conn->runCommand(nss.db().toString(), command, shardResult, options)) { conn.done(); return Status(ErrorCodes::OperationFailed, - stream() << "Passthrough command failed: " << command << " on ns " - << nss.ns() << "; result: " << shardResult); + str::stream() << "Passthrough command failed: " << command + << " on ns " << nss.ns() + << "; result: " << shardResult); } conn.done(); } catch (const DBException& ex) { diff --git a/src/mongo/s/d_state.cpp b/src/mongo/s/d_state.cpp index bf2e483b271..af47f7027a1 100644 --- a/src/mongo/s/d_state.cpp +++ b/src/mongo/s/d_state.cpp @@ -37,18 +37,15 @@ #include "mongo/db/auth/authorization_manager.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/privilege.h" -#include "mongo/db/client.h" #include "mongo/db/commands.h" #include "mongo/db/db_raii.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" -#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/collection_sharding_state.h" #include 
"mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/s/stale_exception.h" #include "mongo/util/log.h" #include "mongo/util/stringutils.h" @@ -57,7 +54,6 @@ namespace mongo { using std::shared_ptr; using std::string; using std::stringstream; -using std::vector; namespace { @@ -224,130 +220,23 @@ public: } shardingStateCmd; -/** - * @ return true if not in sharded mode - or if version for this client is ok - */ -bool shardVersionOk(OperationContext* txn, - const string& ns, - string& errmsg, - ChunkVersion& received, - ChunkVersion& wanted) { - Client* client = txn->getClient(); - - // Operations using the DBDirectClient are unversioned. - if (client->isInDirectClient()) { - return true; - } - - ShardingState* shardingState = ShardingState::get(client->getServiceContext()); - if (!shardingState->enabled()) { - return true; - } - - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nsToDatabase(ns))) { - // right now connections to secondaries aren't versioned at all - return true; - } - - auto& oss = OperationShardingState::get(txn); - - // If there is a version attached to the OperationContext, use it as the received version. - // Otherwise, get the received version from the ShardedConnectionInfo. - if (oss.hasShardVersion()) { - received = oss.getShardVersion(NamespaceString(ns)); - } else { - ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, false); - if (!info) { - // There is no shard version information on either 'txn' or 'client'. This means that - // the operation represented by 'txn' is unversioned, and the shard version is always OK - // for unversioned operations. 
- return true; - } - - received = info->getVersion(ns); - } - - if (ChunkVersion::isIgnoredVersion(received)) { - return true; - } - - wanted = shardingState->getVersion(ns); - - if (received.isWriteCompatibleWith(wanted)) - return true; - - // - // Figure out exactly why not compatible, send appropriate error message - // The versions themselves are returned in the error, so not needed in messages here - // - - // Check epoch first, to send more meaningful message, since other parameters probably - // won't match either - if (!wanted.hasEqualEpoch(received)) { - errmsg = str::stream() << "version epoch mismatch detected for " << ns << ", " - << "the collection may have been dropped and recreated"; - return false; - } - - if (!wanted.isSet() && received.isSet()) { - errmsg = str::stream() << "this shard no longer contains chunks for " << ns << ", " - << "the collection may have been dropped"; - return false; - } +} // namespace - if (wanted.isSet() && !received.isSet()) { - errmsg = str::stream() << "this shard contains versioned chunks for " << ns << ", " - << "but no version set in request"; +bool haveLocalShardingInfo(OperationContext* txn, const string& ns) { + if (!ShardingState::get(txn)->enabled()) { return false; } - if (wanted.majorVersion() != received.majorVersion()) { - // - // Could be > or < - wanted is > if this is the source of a migration, - // wanted < if this is the target of a migration - // - - errmsg = str::stream() << "version mismatch detected for " << ns << ", " - << "stored major version " << wanted.majorVersion() - << " does not match received " << received.majorVersion(); + auto css = CollectionShardingState::get(txn, ns); + if (!css->getMetadata()) { return false; } - // Those are all the reasons the versions can mismatch - MONGO_UNREACHABLE; -} - -} // namespace - -bool haveLocalShardingInfo(Client* client, const string& ns) { - if (!ShardingState::get(client->getServiceContext())->enabled()) - return false; - - if 
(!ShardingState::get(client->getServiceContext())->hasVersion(ns)) - return false; - - return ShardedConnectionInfo::get(client, false) != NULL; -} - -void ensureShardVersionOKOrThrow(OperationContext* txn, const std::string& ns) { - string errmsg; - ChunkVersion received; - ChunkVersion wanted; - if (!shardVersionOk(txn, ns, errmsg, received, wanted)) { - StringBuilder sb; - sb << "[" << ns << "] shard version not ok: " << errmsg; - throw SendStaleConfigException(ns, sb.str(), received, wanted); - } + return ShardedConnectionInfo::get(txn->getClient(), false) != nullptr; } void usingAShardConnection(const string& addr) {} -void saveGLEStats(const BSONObj& result, StringData hostString) { - // Declared in cluster_last_error_info.h. - // - // This can be called in mongod, which is unfortunate. To fix this, - // we can redesign how connection pooling works on mongod for sharded operations. -} +void saveGLEStats(const BSONObj& result, StringData hostString) {} } // namespace mongo diff --git a/src/mongo/s/d_state.h b/src/mongo/s/d_state.h index e2fd9cf79e5..5583d053d55 100644 --- a/src/mongo/s/d_state.h +++ b/src/mongo/s/d_state.h @@ -26,36 +26,17 @@ * then also delete it in the license file. */ - #pragma once #include <string> namespace mongo { -class BSONObj; -class Client; class OperationContext; -class ShardedConnectionInfo; -class NamespaceString; - -// ----------------- -// --- core --- -// ----------------- /** * @return true if we have any shard info for the ns */ -bool haveLocalShardingInfo(Client* client, const std::string& ns); +bool haveLocalShardingInfo(OperationContext* txn, const std::string& ns); -/** - * Validates whether the shard chunk version for the specified collection is up to date and if - * not, throws SendStaleConfigException. - * - * It is important (but not enforced) that method be called with the collection locked in at - * least IS mode in order to ensure that the shard version won't change. 
- * - * @param ns Complete collection namespace to be cheched. - */ -void ensureShardVersionOKOrThrow(OperationContext* txn, const std::string& ns); -} +} // namespace mongo diff --git a/src/mongo/s/stale_exception.h b/src/mongo/s/stale_exception.h index 8769da77bbe..90d6e0060e6 100644 --- a/src/mongo/s/stale_exception.h +++ b/src/mongo/s/stale_exception.h @@ -35,10 +35,8 @@ namespace mongo { -using mongoutils::str::stream; - /** - * Thrown whenever your config info for a given shard/chunk is out of date. + * Thrown whenever the config info for a given shard/chunk is out of date. */ class StaleConfigException : public AssertionException { public: @@ -48,9 +46,9 @@ public: ChunkVersion received, ChunkVersion wanted) : AssertionException( - stream() << raw << " ( ns : " << ns << ", received : " << received.toString() - << ", wanted : " << wanted.toString() << ", " - << (code == ErrorCodes::SendStaleConfig ? "send" : "recv") << " )", + str::stream() << raw << " ( ns : " << ns << ", received : " << received.toString() + << ", wanted : " << wanted.toString() << ", " + << (code == ErrorCodes::SendStaleConfig ? "send" : "recv") << " )", code), _ns(ns), _received(received), @@ -58,22 +56,24 @@ public: /** Preferred if we're rebuilding this from a thrown exception */ StaleConfigException(const std::string& raw, int code, const BSONObj& error) - : AssertionException( - stream() << raw - << " ( ns : " << (error["ns"].type() == String ? error["ns"].String() - : std::string("<unknown>")) - << ", received : " << ChunkVersion::fromBSON(error, "vReceived").toString() - << ", wanted : " << ChunkVersion::fromBSON(error, "vWanted").toString() - << ", " << (code == ErrorCodes::SendStaleConfig ? "send" : "recv") << " )", - code), + : AssertionException(str::stream() + << raw << " ( ns : " << (error["ns"].type() == String + ? 
error["ns"].String() + : std::string("<unknown>")) + << ", received : " + << ChunkVersion::fromBSON(error, "vReceived").toString() + << ", wanted : " + << ChunkVersion::fromBSON(error, "vWanted").toString() << ", " + << (code == ErrorCodes::SendStaleConfig ? "send" : "recv") << " )", + code), // For legacy reasons, we may not always get a namespace here _ns(error["ns"].type() == String ? error["ns"].String() : ""), _received(ChunkVersion::fromBSON(error, "vReceived")), _wanted(ChunkVersion::fromBSON(error, "vWanted")) {} /** - * Needs message so when we trace all exceptions on construction we get a useful - * message + * TODO: This constructor is only necessary, because ParallelSortClusteredCursor puts per-host + * stale config exceptions in a map and this requires a default constructor. */ StaleConfigException() : AssertionException("initializing empty stale config exception object", 0) {} @@ -88,26 +88,6 @@ public: return _ns; } - /** - * true if this exception would require a full reload of config data to resolve - */ - bool requiresFullReload() const { - return !_received.hasEqualEpoch(_wanted) || _received.isSet() != _wanted.isSet(); - } - - static bool parse(const std::string& big, std::string& ns, std::string& raw) { - std::string::size_type start = big.find('['); - if (start == std::string::npos) - return false; - std::string::size_type end = big.find(']', start); - if (end == std::string::npos) - return false; - - ns = big.substr(start + 1, (end - start) - 1); - raw = big.substr(end + 1); - return true; - } - ChunkVersion getVersionReceived() const { return _received; } @@ -116,14 +96,11 @@ public: return _wanted; } - StaleConfigException& operator=(const StaleConfigException& elem) { - this->_ei.msg = elem._ei.msg; - this->_ei.code = elem._ei.code; - this->_ns = elem._ns; - this->_received = elem._received; - this->_wanted = elem._wanted; - - return *this; + /** + * Returns true if this exception would require a full reload of config data to resolve. 
+ */ + bool requiresFullReload() const { + return !_received.hasEqualEpoch(_wanted) || _received.isSet() != _wanted.isSet(); } private: diff --git a/src/mongo/scripting/mozjs/db.cpp b/src/mongo/scripting/mozjs/db.cpp index 0a107ba3953..97112595037 100644 --- a/src/mongo/scripting/mozjs/db.cpp +++ b/src/mongo/scripting/mozjs/db.cpp @@ -59,8 +59,7 @@ void DBInfo::getProperty(JSContext* cx, if (o.hasOwnField(InternedString::_fullName)) { // need to check every time that the collection did not get sharded - if (haveLocalShardingInfo(opContext->getClient(), - o.getString(InternedString::_fullName))) + if (haveLocalShardingInfo(opContext, o.getString(InternedString::_fullName))) uasserted(ErrorCodes::BadValue, "can't use sharded collection from db.eval"); } } diff --git a/src/mongo/scripting/mozjs/dbcollection.cpp b/src/mongo/scripting/mozjs/dbcollection.cpp index 2cc03a752e1..8d094721f8f 100644 --- a/src/mongo/scripting/mozjs/dbcollection.cpp +++ b/src/mongo/scripting/mozjs/dbcollection.cpp @@ -75,8 +75,9 @@ void DBCollectionInfo::construct(JSContext* cx, JS::CallArgs args) { std::string fullName = ValueWriter(cx, args.get(3)).toString(); auto context = scope->getOpContext(); - if (context && haveLocalShardingInfo(context->getClient(), fullName)) + if (context && haveLocalShardingInfo(context, fullName)) { uasserted(ErrorCodes::BadValue, "can't use sharded collection from db.eval"); + } args.rval().setObjectOrNull(thisv); } diff --git a/src/mongo/shell/clientAndShell.cpp b/src/mongo/shell/clientAndShell.cpp index 2f55cef17aa..abbb03be656 100644 --- a/src/mongo/shell/clientAndShell.cpp +++ b/src/mongo/shell/clientAndShell.cpp @@ -70,7 +70,7 @@ bool inShutdown() { return dbexitCalled; } -bool haveLocalShardingInfo(Client* client, const string& ns) { +bool haveLocalShardingInfo(OperationContext* txn, const string& ns) { return false; } diff --git a/src/mongo/unittest/crutch.cpp b/src/mongo/unittest/crutch.cpp index 282c03538c4..61297a0f48c 100644 --- 
a/src/mongo/unittest/crutch.cpp +++ b/src/mongo/unittest/crutch.cpp @@ -54,7 +54,7 @@ DBClientBase* createDirectClient(OperationContext* txn) { return NULL; } -bool haveLocalShardingInfo(Client* client, const std::string& ns) { +bool haveLocalShardingInfo(OperationContext* txn, const std::string& ns) { return false; } |