author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2016-03-07 14:22:47 -0500
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2016-03-16 18:53:09 -0400
commit     ee5fbdd8540d93d2e0d6fa19ba9a5595bb1829cb (patch)
tree       2ff83a961d227161d6171708162b595cf35ab7bd /src/mongo/db/s
parent     28399f947dda32a33509ddaebc650b0aa95fa0dd (diff)
SERVER-22997 Unify wait for migrate critical section and metadata refresh
This change gets rid of commands explicitly waiting for the migration critical section to be established. Instead, any operation that encounters a stale version installs a wait object on the OperationContext, which can then be used to wait for the critical section to complete outside of any locks. The metadata refresh now also happens outside of locks.
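The wait object in question (the new MigrationSourceManager::CriticalSectionState below) is essentially a one-shot latch around a condition variable. A minimal sketch of the pattern, using the standard library directly in place of mongo's stdx wrappers:

#include <chrono>
#include <condition_variable>
#include <mutex>

// One-shot latch: constructed "inside the critical section" and can only
// transition out. Mirrors CriticalSectionState from the patch below.
class CriticalSectionLatch {
public:
    // Returns true if the critical section ended before the timeout expired.
    bool waitUntilOutOfCriticalSection(std::chrono::microseconds timeout) {
        std::unique_lock<std::mutex> lock(_mutex);
        return _cv.wait_for(lock, timeout, [this] { return !_inCriticalSection; });
    }

    // Called exactly once, when the migration leaves its critical section;
    // wakes every waiter blocked in waitUntilOutOfCriticalSection().
    void exitCriticalSection() {
        std::lock_guard<std::mutex> lock(_mutex);
        _inCriticalSection = false;
        _cv.notify_all();
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _inCriticalSection{true};
};

Because the flag only ever moves from true to false, a waiter that observes it as false can return without re-checking, and notify_all() needs to be called at most once.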
Diffstat (limited to 'src/mongo/db/s')
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp  |  12
-rw-r--r--  src/mongo/db/s/migration_impl.cpp             |   9
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp   |  61
-rw-r--r--  src/mongo/db/s/migration_source_manager.h     |  53
-rw-r--r--  src/mongo/db/s/operation_sharding_state.cpp   |  64
-rw-r--r--  src/mongo/db/s/operation_sharding_state.h     |  28
-rw-r--r--  src/mongo/db/s/set_shard_version_command.cpp  | 263
-rw-r--r--  src/mongo/db/s/sharding_state.cpp             |  83
-rw-r--r--  src/mongo/db/s/sharding_state.h               |  29
9 files changed, 359 insertions(+), 243 deletions(-)
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index e0fa35d4da2..acff32f0b43 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -68,6 +68,11 @@ CollectionShardingState* CollectionShardingState::get(OperationContext* txn,
}
void CollectionShardingState::setMetadata(std::shared_ptr<CollectionMetadata> newMetadata) {
+ if (newMetadata) {
+ invariant(!newMetadata->getCollVersion().isWriteCompatibleWith(ChunkVersion::UNSHARDED()));
+ invariant(!newMetadata->getShardVersion().isWriteCompatibleWith(ChunkVersion::UNSHARDED()));
+ }
+
_metadata = std::move(newMetadata);
}
@@ -76,6 +81,13 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* txn) co
ChunkVersion received;
ChunkVersion wanted;
if (!_checkShardVersionOk(txn, &errmsg, &received, &wanted)) {
+ // Set migration critical section in case we failed because of migration
+ auto migrationCritSec =
+ ShardingState::get(txn)->migrationSourceManager()->getMigrationCriticalSection();
+ if (migrationCritSec) {
+ OperationShardingState::get(txn).setMigrationCriticalSection(migrationCritSec);
+ }
+
throw SendStaleConfigException(_nss.ns(),
str::stream() << "[" << _nss.ns()
<< "] shard version not ok: " << errmsg,
diff --git a/src/mongo/db/s/migration_impl.cpp b/src/mongo/db/s/migration_impl.cpp
index 873fc6c6f5c..d3dd4e30442 100644
--- a/src/mongo/db/s/migration_impl.cpp
+++ b/src/mongo/db/s/migration_impl.cpp
@@ -69,6 +69,7 @@ BSONObj createRecvChunkCommitRequest(const MigrationSessionId& sessionId) {
}
MONGO_FP_DECLARE(failMigrationCommit);
+MONGO_FP_DECLARE(hangBeforeCommitMigration);
MONGO_FP_DECLARE(hangBeforeLeavingCriticalSection);
} // namespace
@@ -133,12 +134,12 @@ Status ChunkMoveOperationState::initialize(const BSONObj& cmdObj) {
_toShardCS = toShard->getConnString();
}
- auto& operationVersion = OperationShardingState::get(_txn);
- if (!operationVersion.hasShardVersion()) {
+ auto& oss = OperationShardingState::get(_txn);
+ if (!oss.hasShardVersion()) {
return Status{ErrorCodes::InvalidOptions, "moveChunk command is missing shard version"};
}
- _collectionVersion = operationVersion.getShardVersion(_nss);
+ _collectionVersion = oss.getShardVersion(_nss);
return Status::OK();
}
@@ -385,6 +386,8 @@ Status ChunkMoveOperationState::commitMigration(const MigrationSessionId& sessio
preCond.append(b.obj());
}
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeCommitMigration);
+
fassertStatusOK(34431,
grid.catalogManager(_txn)->applyChunkOpsDeprecated(
_txn, updates.arr(), preCond.arr(), _nss.ns(), nextVersion));
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index c9dacc78491..980620de09e 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -48,6 +48,7 @@
#include "mongo/logger/ramlog.h"
#include "mongo/s/chunk.h"
#include "mongo/s/shard_key_pattern.h"
+#include "mongo/stdx/memory.h"
#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/log.h"
@@ -209,8 +210,11 @@ void MigrationSourceManager::done(OperationContext* txn) {
_sessionId = boost::none;
_deleteNotifyExec.reset(NULL);
- _inCriticalSection = false;
- _inCriticalSectionCV.notify_all();
+
+ if (_critSec) {
+ _critSec->exitCriticalSection();
+ _critSec = nullptr;
+ }
_deleted.clear();
_reload.clear();
@@ -577,30 +581,23 @@ long long MigrationSourceManager::mbUsed() const {
return _memoryUsed / (1024 * 1024);
}
-bool MigrationSourceManager::getInCriticalSection() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _inCriticalSection;
-}
-
void MigrationSourceManager::setInCriticalSection(bool inCritSec) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- _inCriticalSection = inCritSec;
- _inCriticalSectionCV.notify_all();
-}
-bool MigrationSourceManager::waitTillNotInCriticalSection(int maxSecondsToWait) {
- const auto deadline = stdx::chrono::system_clock::now() + Seconds(maxSecondsToWait);
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- while (_inCriticalSection) {
- log() << "Waiting for " << maxSecondsToWait
- << " seconds for the migration critical section to end";
-
- if (stdx::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline)) {
- return false;
- }
+ if (inCritSec) {
+ invariant(!_critSec);
+ _critSec = std::make_shared<CriticalSectionState>();
+ } else {
+ invariant(_critSec);
+ _critSec->exitCriticalSection();
+ _critSec = nullptr;
}
+}
- return true;
+std::shared_ptr<MigrationSourceManager::CriticalSectionState>
+MigrationSourceManager::getMigrationCriticalSection() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _critSec;
}
bool MigrationSourceManager::isActive() const {
@@ -649,4 +646,26 @@ NamespaceString MigrationSourceManager::_getNS() const {
return _nss;
}
+MigrationSourceManager::CriticalSectionState::CriticalSectionState() = default;
+
+bool MigrationSourceManager::CriticalSectionState::waitUntilOutOfCriticalSection(
+ Microseconds waitTimeout) {
+ const auto waitDeadline = stdx::chrono::system_clock::now() + waitTimeout;
+
+ stdx::unique_lock<stdx::mutex> sl(_criticalSectionMutex);
+ while (_inCriticalSection) {
+ if (stdx::cv_status::timeout == _criticalSectionCV.wait_until(sl, waitDeadline)) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void MigrationSourceManager::CriticalSectionState::exitCriticalSection() {
+ stdx::unique_lock<stdx::mutex> sl(_criticalSectionMutex);
+ _inCriticalSection = false;
+ _criticalSectionCV.notify_all();
+}
+
} // namespace mongo
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 8b5504fd1c9..5c9cf07751e 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -49,6 +49,8 @@ class MigrationSourceManager {
MONGO_DISALLOW_COPYING(MigrationSourceManager);
public:
+ class CriticalSectionState;
+
MigrationSourceManager();
~MigrationSourceManager();
@@ -127,18 +129,12 @@ public:
long long mbUsed() const;
- bool getInCriticalSection() const;
-
void setInCriticalSection(bool inCritSec);
- /**
- * Blocks until the "in critical section" state changes and returns true if we are NOT in the
- * critical section
- */
- bool waitTillNotInCriticalSection(int maxSecondsToWait);
-
bool isActive() const;
+ std::shared_ptr<CriticalSectionState> getMigrationCriticalSection();
+
private:
friend class LogOpForShardingHandler;
@@ -175,11 +171,6 @@ private:
mutable stdx::mutex _mutex;
- stdx::condition_variable _inCriticalSectionCV; // (M)
-
- // Is migration currently in critical section. This can be used to block new writes.
- bool _inCriticalSection{false}; // (M)
-
std::unique_ptr<PlanExecutor> _deleteNotifyExec; // (M)
// List of _id of documents that were modified that must be re-cloned.
@@ -203,6 +194,42 @@ private:
// List of record id that needs to be transferred from here to the other side.
std::set<RecordId> _cloneLocs; // (C)
+
+ // This value is set when setInCriticalSection is called with a true argument, and is signalled
+ // and cleared when it is called with a false argument.
+ std::shared_ptr<CriticalSectionState> _critSec;
+};
+
+/**
+ * This object is instantiated once the migration logic enters critical section. It contains all
+ * the state which is associated with being in a critical section, such as the bumped metadata
+ * version (which has not yet been reflected on the config server).
+ */
+class MigrationSourceManager::CriticalSectionState {
+ MONGO_DISALLOW_COPYING(CriticalSectionState);
+
+public:
+ CriticalSectionState();
+
+ /**
+ * Blocks until the critical section completes. Returns true if the wait succeeded and the
+ * critical section is no longer active, or false if the waitTimeout was exceeded.
+ */
+ bool waitUntilOutOfCriticalSection(Microseconds waitTimeout);
+
+ /**
+ * To be called when the critical section has completed. Signals any threads sitting blocked in
+ * waitUntilOutOfCriticalSection. Must only be used once for the lifetime of this object.
+ */
+ void exitCriticalSection();
+
+private:
+ // Only moves from true to false, exactly once, under the critical section mutex, at which point
+ // the condition variable is signalled.
+ bool _inCriticalSection{true};
+
+ stdx::mutex _criticalSectionMutex;
+ stdx::condition_variable _criticalSectionCV;
};
} // namespace mongo
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index 906c491aac3..40617c08138 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -39,7 +39,8 @@ namespace {
const OperationContext::Decoration<OperationShardingState> shardingMetadataDecoration =
OperationContext::declareDecoration<OperationShardingState>();
-const ChunkVersion kUnshardedVersion(ChunkVersion::UNSHARDED());
+// Max time to wait for the migration critical section to complete
+const Minutes kMaxWaitForMigrationCriticalSection(5);
} // namespace
@@ -49,12 +50,12 @@ OperationShardingState& OperationShardingState::get(OperationContext* txn) {
return shardingMetadataDecoration(txn);
}
-void OperationShardingState::initializeShardVersion(NamespaceString ns,
+void OperationShardingState::initializeShardVersion(NamespaceString nss,
const BSONElement& shardVersionElt) {
invariant(!hasShardVersion());
- if (ns.isSystemDotIndexes()) {
- setShardVersion(std::move(ns), ChunkVersion::IGNORED());
+ if (nss.isSystemDotIndexes()) {
+ setShardVersion(std::move(nss), ChunkVersion::IGNORED());
return;
}
@@ -70,31 +71,54 @@ void OperationShardingState::initializeShardVersion(NamespaceString ns,
return;
}
- setShardVersion(std::move(ns), std::move(newVersion));
+ setShardVersion(std::move(nss), std::move(newVersion));
}
bool OperationShardingState::hasShardVersion() const {
return _hasVersion;
}
-const ChunkVersion& OperationShardingState::getShardVersion(const NamespaceString& ns) const {
- if (_ns != ns) {
- return kUnshardedVersion;
+ChunkVersion OperationShardingState::getShardVersion(const NamespaceString& nss) const {
+ if (_ns != nss) {
+ return ChunkVersion::UNSHARDED();
}
return _shardVersion;
}
-void OperationShardingState::setShardVersion(NamespaceString ns, ChunkVersion newVersion) {
+void OperationShardingState::setShardVersion(NamespaceString nss, ChunkVersion newVersion) {
// This currently supports only setting the shard version for one namespace.
- invariant(!_hasVersion || _ns == ns);
- invariant(!ns.isSystemDotIndexes() || ChunkVersion::isIgnoredVersion(newVersion));
+ invariant(!_hasVersion || _ns == nss);
+ invariant(!nss.isSystemDotIndexes() || ChunkVersion::isIgnoredVersion(newVersion));
- _ns = std::move(ns);
+ _ns = std::move(nss);
_shardVersion = std::move(newVersion);
_hasVersion = true;
}
+bool OperationShardingState::waitForMigrationCriticalSection(OperationContext* txn) {
+ // Must not block while holding a lock
+ invariant(!txn->lockState()->isLocked());
+
+ if (_migrationCriticalSection) {
+ const Microseconds operationRemainingTime(Microseconds(txn->getRemainingMaxTimeMicros()));
+ _migrationCriticalSection->waitUntilOutOfCriticalSection(
+ durationCount<Microseconds>(operationRemainingTime)
+ ? operationRemainingTime
+ : kMaxWaitForMigrationCriticalSection);
+ _migrationCriticalSection = nullptr;
+ return true;
+ }
+
+ return false;
+}
+
+void OperationShardingState::setMigrationCriticalSection(
+ std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec) {
+ invariant(critSec);
+ _migrationCriticalSection = std::move(critSec);
+}
+
void OperationShardingState::_clear() {
_hasVersion = false;
_shardVersion = ChunkVersion();
@@ -104,21 +128,21 @@ void OperationShardingState::_clear() {
OperationShardingState::IgnoreVersioningBlock::IgnoreVersioningBlock(OperationContext* txn,
const NamespaceString& ns)
: _txn(txn), _ns(ns) {
- auto& operationVersion = OperationShardingState::get(txn);
- _hadOriginalVersion = operationVersion._hasVersion;
+ auto& oss = OperationShardingState::get(txn);
+ _hadOriginalVersion = oss._hasVersion;
if (_hadOriginalVersion) {
- _originalVersion = operationVersion.getShardVersion(ns);
+ _originalVersion = oss.getShardVersion(ns);
}
- operationVersion.setShardVersion(ns, ChunkVersion::IGNORED());
+ oss.setShardVersion(ns, ChunkVersion::IGNORED());
}
OperationShardingState::IgnoreVersioningBlock::~IgnoreVersioningBlock() {
- auto& operationVersion = OperationShardingState::get(_txn);
- invariant(ChunkVersion::isIgnoredVersion(operationVersion.getShardVersion(_ns)));
+ auto& oss = OperationShardingState::get(_txn);
+ invariant(ChunkVersion::isIgnoredVersion(oss.getShardVersion(_ns)));
if (_hadOriginalVersion) {
- operationVersion.setShardVersion(_ns, _originalVersion);
+ oss.setShardVersion(_ns, _originalVersion);
} else {
- operationVersion._clear();
+ oss._clear();
}
}
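The .cpp diff above also pins down the wait bound: use the operation's remaining max time when one was set, otherwise fall back to the five-minute kMaxWaitForMigrationCriticalSection cap. A hedged sketch of that protocol with stand-in types (not the server's API; it only reuses the CriticalSectionLatch sketch from the top of the page):

#include <chrono>
#include <memory>

// Stand-in for the real OperationShardingState: stash under lock, wait (once)
// outside of locks, mirroring setMigrationCriticalSection() and
// waitForMigrationCriticalSection() from the patch above.
class OperationShardingStateSketch {
public:
    // Called while the failed version check still holds the collection lock.
    void setMigrationCriticalSection(std::shared_ptr<CriticalSectionLatch> critSec) {
        _migrationCritSec = std::move(critSec);
    }

    // Called after all locks are released. Returns false if there was nothing
    // to wait for; clears the handle so the wait happens at most once.
    bool waitForMigrationCriticalSection(std::chrono::microseconds remainingTime) {
        if (!_migrationCritSec)
            return false;
        const auto timeout = remainingTime.count()
            ? remainingTime
            : std::chrono::microseconds(std::chrono::minutes(5));  // the 5-minute cap
        _migrationCritSec->waitUntilOutOfCriticalSection(timeout);
        _migrationCritSec = nullptr;
        return true;
    }

private:
    std::shared_ptr<CriticalSectionLatch> _migrationCritSec;
};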
diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h
index 7aa2fc919d3..f52cbd9f702 100644
--- a/src/mongo/db/s/operation_sharding_state.h
+++ b/src/mongo/db/s/operation_sharding_state.h
@@ -32,6 +32,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/s/migration_source_manager.h"
#include "mongo/s/chunk_version.h"
namespace mongo {
@@ -69,7 +70,7 @@ public:
* This initialization may only be performed once for the lifetime of the object, which
* coincides with the lifetime of the request.
*/
- void initializeShardVersion(NamespaceString ns, const BSONElement& shardVersionElement);
+ void initializeShardVersion(NamespaceString nss, const BSONElement& shardVersionElement);
/**
* Returns whether or not there is a shard version associated with this operation.
@@ -84,12 +85,29 @@ public:
* Returns ChunkVersion::UNSHARDED() if this operation has no shard version information
* for the requested namespace.
*/
- const ChunkVersion& getShardVersion(const NamespaceString& ns) const;
+ ChunkVersion getShardVersion(const NamespaceString& nss) const;
/**
* Stores the given chunk version of a namespace into this object.
*/
- void setShardVersion(NamespaceString ns, ChunkVersion newVersion);
+ void setShardVersion(NamespaceString nss, ChunkVersion newVersion);
+
+ /**
+ * This call is a no-op if there isn't a currently active migration critical section. Otherwise
+ * it will wait for the critical section to complete, up to the remaining operation time.
+ *
+ * Returns true if the call actually waited because of the migration critical section (regardless
+ * of whether it timed out or not), false if there was no active migration critical section.
+ */
+ bool waitForMigrationCriticalSection(OperationContext* txn);
+
+ /**
+ * Setting this value indicates that when the version check failed, there was an active
+ * migration for the namespace and that it would be prudent to wait for the critical section to
+ * complete before retrying so the router doesn't make wasteful requests.
+ */
+ void setMigrationCriticalSection(
+ std::shared_ptr<MigrationSourceManager::CriticalSectionState> critSec);
private:
/**
@@ -101,6 +119,10 @@ private:
bool _hasVersion = false;
ChunkVersion _shardVersion;
NamespaceString _ns;
+
+ // This value will only be non-null if the version check during operation execution failed due to
+ // a stale version and there was a migration for that namespace that was in its critical section.
+ std::shared_ptr<MigrationSourceManager::CriticalSectionState> _migrationCriticalSection;
};
/**
diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp
index 04710cd8c95..b4f5f9170cb 100644
--- a/src/mongo/db/s/set_shard_version_command.cpp
+++ b/src/mongo/db/s/set_shard_version_command.cpp
@@ -37,9 +37,12 @@
#include "mongo/db/auth/privilege.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/wire_version.h"
@@ -152,157 +155,191 @@ public:
return false;
}
+ const NamespaceString nss(ns);
+
+ // Backwards compatibility for SERVER-23119
+ if (!nss.isValid()) {
+ warning() << "Invalid namespace used for setShardVersion: " << ns;
+ return true;
+ }
// we can run on a slave up to here
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(
- nsToDatabase(ns))) {
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesForDatabase(nss.db())) {
result.append("errmsg", "not master");
result.append("note", "from post init in setShardVersion");
return false;
}
// step 2
- ChunkVersion version =
+ const ChunkVersion requestedVersion =
uassertStatusOK(ChunkVersion::parseFromBSONForSetShardVersion(cmdObj));
- // step 3
- const ChunkVersion oldVersion = info->getVersion(ns);
- const ChunkVersion globalVersion = shardingState->getVersion(ns);
+ // step 3 - Actual version checking
+ const ChunkVersion connectionVersion = info->getVersion(ns);
+ connectionVersion.addToBSON(result, "oldVersion");
+
+ {
+ // Use a stable collection metadata while performing the checks
+ boost::optional<AutoGetCollection> autoColl;
+ autoColl.emplace(txn, nss, MODE_IS);
+
+ auto css = CollectionShardingState::get(txn, nss);
+ const ChunkVersion collectionShardVersion =
+ (css->getMetadata() ? css->getMetadata()->getShardVersion()
+ : ChunkVersion::UNSHARDED());
+
+ if (requestedVersion.isWriteCompatibleWith(collectionShardVersion)) {
+ // mongos and mongod agree!
+ if (!connectionVersion.isWriteCompatibleWith(requestedVersion)) {
+ if (connectionVersion < collectionShardVersion &&
+ connectionVersion.epoch() == collectionShardVersion.epoch()) {
+ info->setVersion(ns, requestedVersion);
+ } else if (authoritative) {
+ // this means there was a drop and our version is reset
+ info->setVersion(ns, requestedVersion);
+ } else {
+ result.append("ns", ns);
+ result.appendBool("need_authoritative", true);
+ errmsg = "verifying drop on '" + ns + "'";
+ return false;
+ }
+ }
- oldVersion.addToBSON(result, "oldVersion");
+ return true;
+ }
- if (version.isWriteCompatibleWith(globalVersion)) {
- // mongos and mongod agree!
- if (!oldVersion.isWriteCompatibleWith(version)) {
- if (oldVersion < globalVersion && oldVersion.hasEqualEpoch(globalVersion)) {
- info->setVersion(ns, version);
- } else if (authoritative) {
- // this means there was a drop and our version is reset
- info->setVersion(ns, version);
- } else {
- result.append("ns", ns);
+ // step 4
+ // Cases below all either return OR fall-through to remote metadata reload.
+ const bool isDropRequested =
+ !requestedVersion.isSet() && collectionShardVersion.isSet();
+
+ if (isDropRequested) {
+ if (!authoritative) {
result.appendBool("need_authoritative", true);
- errmsg = "verifying drop on '" + ns + "'";
+ result.append("ns", ns);
+ collectionShardVersion.addToBSON(result, "globalVersion");
+ errmsg = "dropping needs to be authoritative";
return false;
}
- }
-
- return true;
- }
- // step 4
- // Cases below all either return OR fall-through to remote metadata reload.
- const bool isDropRequested = !version.isSet() && globalVersion.isSet();
-
- if (isDropRequested) {
- if (!authoritative) {
- result.appendBool("need_authoritative", true);
- result.append("ns", ns);
- globalVersion.addToBSON(result, "globalVersion");
- errmsg = "dropping needs to be authoritative";
- return false;
- }
-
- // Fall through to metadata reload below
- } else {
- // Not Dropping
+ // Fall through to metadata reload below
+ } else {
+ // Not Dropping
- // TODO: Refactor all of this
- if (version < oldVersion && version.hasEqualEpoch(oldVersion)) {
- errmsg = str::stream() << "this connection already had a newer version "
- << "of collection '" << ns << "'";
- result.append("ns", ns);
- version.addToBSON(result, "newVersion");
- globalVersion.addToBSON(result, "globalVersion");
- return false;
- }
+ // TODO: Refactor all of this
+ if (requestedVersion < connectionVersion &&
+ requestedVersion.epoch() == connectionVersion.epoch()) {
+ errmsg = str::stream() << "this connection already had a newer version "
+ << "of collection '" << ns << "'";
+ result.append("ns", ns);
+ requestedVersion.addToBSON(result, "newVersion");
+ collectionShardVersion.addToBSON(result, "globalVersion");
+ return false;
+ }
- // TODO: Refactor all of this
- if (version < globalVersion && version.hasEqualEpoch(globalVersion)) {
- while (shardingState->inCriticalMigrateSection()) {
- log() << "waiting till out of critical section";
- shardingState->waitTillNotInCriticalSection(10);
+ // TODO: Refactor all of this
+ if (requestedVersion < collectionShardVersion &&
+ requestedVersion.epoch() == collectionShardVersion.epoch()) {
+ auto critSec =
+ shardingState->migrationSourceManager()->getMigrationCriticalSection();
+ if (critSec) {
+ autoColl.reset();
+ log() << "waiting till out of critical section";
+ critSec->waitUntilOutOfCriticalSection(Seconds(10));
+ }
+
+ errmsg = str::stream() << "shard global version for collection is higher "
+ << "than trying to set to '" << ns << "'";
+ result.append("ns", ns);
+ requestedVersion.addToBSON(result, "version");
+ collectionShardVersion.addToBSON(result, "globalVersion");
+ result.appendBool("reloadConfig", true);
+ return false;
}
- errmsg = str::stream() << "shard global version for collection is higher "
- << "than trying to set to '" << ns << "'";
- result.append("ns", ns);
- version.addToBSON(result, "version");
- globalVersion.addToBSON(result, "globalVersion");
- result.appendBool("reloadConfig", true);
- return false;
- }
- if (!globalVersion.isSet() && !authoritative) {
- // Needed b/c when the last chunk is moved off a shard,
- // the version gets reset to zero, which should require a reload.
- while (shardingState->inCriticalMigrateSection()) {
- log() << "waiting till out of critical section";
- shardingState->waitTillNotInCriticalSection(10);
+ if (!collectionShardVersion.isSet() && !authoritative) {
+ // Needed b/c when the last chunk is moved off a shard, the version gets reset
+ // to zero, which should require a reload.
+ auto critSec =
+ shardingState->migrationSourceManager()->getMigrationCriticalSection();
+ if (critSec) {
+ autoColl.reset();
+ log() << "waiting till out of critical section";
+ critSec->waitUntilOutOfCriticalSection(Seconds(10));
+ }
+
+ // need authoritative for first look
+ result.append("ns", ns);
+ result.appendBool("need_authoritative", true);
+ errmsg = "first time for collection '" + ns + "'";
+ return false;
}
- // need authoritative for first look
- result.append("ns", ns);
- result.appendBool("need_authoritative", true);
- errmsg = "first time for collection '" + ns + "'";
- return false;
+ // Fall through to metadata reload below
}
-
- // Fall through to metadata reload below
}
- ChunkVersion currVersion;
- Status status = shardingState->refreshMetadataIfNeeded(txn, ns, version, &currVersion);
+ Status status = shardingState->onStaleShardVersion(txn, nss, requestedVersion);
+
+ {
+ AutoGetCollection autoColl(txn, nss, MODE_IS);
- if (!status.isOK()) {
- // The reload itself was interrupted or confused here
+ ChunkVersion currVersion = ChunkVersion::UNSHARDED();
+ auto collMetadata = CollectionShardingState::get(txn, nss)->getMetadata();
+ if (collMetadata) {
+ currVersion = collMetadata->getShardVersion();
+ }
- errmsg = str::stream() << "could not refresh metadata for " << ns
- << " with requested shard version " << version.toString()
- << ", stored shard version is " << currVersion.toString()
- << causedBy(status.reason());
+ if (!status.isOK()) {
+ // The reload itself was interrupted or confused here
- warning() << errmsg;
+ errmsg = str::stream()
+ << "could not refresh metadata for " << ns << " with requested shard version "
+ << requestedVersion.toString() << ", stored shard version is "
+ << currVersion.toString() << causedBy(status.reason());
- result.append("ns", ns);
- version.addToBSON(result, "version");
- currVersion.addToBSON(result, "globalVersion");
- result.appendBool("reloadConfig", true);
+ warning() << errmsg;
- return false;
- } else if (!version.isWriteCompatibleWith(currVersion)) {
- // We reloaded a version that doesn't match the version mongos was trying to
- // set.
+ result.append("ns", ns);
+ requestedVersion.addToBSON(result, "version");
+ currVersion.addToBSON(result, "globalVersion");
+ result.appendBool("reloadConfig", true);
- errmsg = str::stream() << "requested shard version differs from"
- << " config shard version for " << ns
- << ", requested version is " << version.toString()
- << " but found version " << currVersion.toString();
+ return false;
+ } else if (!requestedVersion.isWriteCompatibleWith(currVersion)) {
+ // We reloaded a version that doesn't match the version mongos was trying to
+ // set.
+ errmsg = str::stream() << "requested shard version differs from"
+ << " config shard version for " << ns
+ << ", requested version is " << requestedVersion.toString()
+ << " but found version " << currVersion.toString();
- OCCASIONALLY warning() << errmsg;
+ OCCASIONALLY warning() << errmsg;
- // WARNING: the exact fields below are important for compatibility with mongos
- // version reload.
+ // WARNING: the exact fields below are important for compatibility with mongos
+ // version reload.
- result.append("ns", ns);
- currVersion.addToBSON(result, "globalVersion");
+ result.append("ns", ns);
+ currVersion.addToBSON(result, "globalVersion");
+
+ // If this was a reset of a collection or the last chunk moved out, inform mongos to
+ // do a full reload.
+ if (currVersion.epoch() != requestedVersion.epoch() || !currVersion.isSet()) {
+ result.appendBool("reloadConfig", true);
+ // Zero-version also needed to trigger full mongos reload, sadly
+ // TODO: Make this saner, and less impactful (full reload on last chunk is bad)
+ ChunkVersion(0, 0, OID()).addToBSON(result, "version");
+ // For debugging
+ requestedVersion.addToBSON(result, "origVersion");
+ } else {
+ requestedVersion.addToBSON(result, "version");
+ }
- // If this was a reset of a collection or the last chunk moved out, inform mongos to
- // do a full reload.
- if (currVersion.epoch() != version.epoch() || !currVersion.isSet()) {
- result.appendBool("reloadConfig", true);
- // Zero-version also needed to trigger full mongos reload, sadly
- // TODO: Make this saner, and less impactful (full reload on last chunk is bad)
- ChunkVersion(0, 0, OID()).addToBSON(result, "version");
- // For debugging
- version.addToBSON(result, "origVersion");
- } else {
- version.addToBSON(result, "version");
+ return false;
}
-
- return false;
}
- info->setVersion(ns, version);
+ info->setVersion(ns, requestedVersion);
return true;
}
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 775104d4e6b..2baf2f513cb 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -425,14 +425,6 @@ void ShardingState::mergeChunks(OperationContext* txn,
it->second->setMetadata(std::move(cloned));
}
-bool ShardingState::inCriticalMigrateSection() {
- return _migrationSourceManager.getInCriticalSection();
-}
-
-bool ShardingState::waitTillNotInCriticalSection(int maxSecondsToWait) {
- return _migrationSourceManager.waitTillNotInCriticalSection(maxSecondsToWait);
-}
-
void ShardingState::resetMetadata(const string& ns) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -441,47 +433,45 @@ void ShardingState::resetMetadata(const string& ns) {
_collections.erase(ns);
}
-Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn,
- const string& ns,
- const ChunkVersion& reqShardVersion,
- ChunkVersion* latestShardVersion) {
- // The _configServerTickets serializes this process such that only a small number of threads
- // can try to refresh at the same time.
-
- LOG(2) << "metadata refresh requested for " << ns << " at shard version " << reqShardVersion;
+Status ShardingState::onStaleShardVersion(OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkVersion& expectedVersion) {
+ invariant(!txn->lockState()->isLocked());
+ invariant(enabled());
- //
- // Queuing of refresh requests starts here when remote reload is needed. This may take time.
- // TODO: Explicitly expose the queuing discipline.
- //
+ LOG(2) << "metadata refresh requested for " << nss.ns() << " at shard version "
+ << expectedVersion;
- _configServerTickets.waitForTicket();
- TicketHolderReleaser needTicketFrom(&_configServerTickets);
+ // Ensure any ongoing migrations have completed
+ auto& oss = OperationShardingState::get(txn);
+ oss.waitForMigrationCriticalSection(txn);
- //
- // Fast path - check if the requested version is at a higher version than the current
- // metadata version or a different epoch before verifying against config server.
- //
+ ChunkVersion collectionShardVersion;
- shared_ptr<CollectionMetadata> storedMetadata;
+ // Fast path - check if the requested version is at a higher version than the current metadata
+ // version or a different epoch before verifying against config server.
{
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- CollectionShardingStateMap::iterator it = _collections.find(ns);
- if (it != _collections.end())
- storedMetadata = it->second->getMetadata();
- }
+ AutoGetCollection autoColl(txn, nss, MODE_IS);
- ChunkVersion storedShardVersion;
- if (storedMetadata)
- storedShardVersion = storedMetadata->getShardVersion();
- *latestShardVersion = storedShardVersion;
+ shared_ptr<CollectionMetadata> storedMetadata =
+ CollectionShardingState::get(txn, nss)->getMetadata();
+ if (storedMetadata) {
+ collectionShardVersion = storedMetadata->getShardVersion();
+ }
- if (storedShardVersion >= reqShardVersion &&
- storedShardVersion.epoch() == reqShardVersion.epoch()) {
- // Don't need to remotely reload if we're in the same epoch with a >= version
- return Status::OK();
+ if (collectionShardVersion >= expectedVersion &&
+ collectionShardVersion.epoch() == expectedVersion.epoch()) {
+ // Don't need to remotely reload if we're in the same epoch and the requested version is
+ // smaller than the one we know about. This means that the remote side is behind.
+ return Status::OK();
+ }
}
+ // The _configServerTickets serializes this process such that only a small number of threads can
+ // try to refresh at the same time
+ _configServerTickets.waitForTicket();
+ TicketHolderReleaser needTicketFrom(&_configServerTickets);
+
//
// Slow path - remotely reload
//
@@ -491,19 +481,20 @@ Status ShardingState::refreshMetadataIfNeeded(OperationContext* txn,
// C) Dropping a collection, notified (currently) by mongos.
// D) Stale client wants to reload metadata with a different *epoch*, so we aren't sure.
- if (storedShardVersion.epoch() != reqShardVersion.epoch()) {
+ if (collectionShardVersion.epoch() != expectedVersion.epoch()) {
// Need to remotely reload if our epochs aren't the same, to verify
- LOG(1) << "metadata change requested for " << ns << ", from shard version "
- << storedShardVersion << " to " << reqShardVersion
+ LOG(1) << "metadata change requested for " << nss.ns() << ", from shard version "
+ << collectionShardVersion << " to " << expectedVersion
<< ", need to verify with config server";
} else {
// Need to remotely reload since the epochs are the same but the requested version is greater
- LOG(1) << "metadata version update requested for " << ns << ", from shard version "
- << storedShardVersion << " to " << reqShardVersion
+ LOG(1) << "metadata version update requested for " << nss.ns() << ", from shard version "
+ << collectionShardVersion << " to " << expectedVersion
<< ", need to verify with config server";
}
- return _refreshMetadata(txn, ns, reqShardVersion, true, latestShardVersion);
+ ChunkVersion unusedLatestShardVersion;
+ return _refreshMetadata(txn, nss.ns(), expectedVersion, true, &unusedLatestShardVersion);
}
Status ShardingState::refreshMetadataNow(OperationContext* txn,
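The reordered onStaleShardVersion above keeps the cheap check first: under an IS lock, a matching epoch plus a cached version at or above the expected one means the requester is the stale side and no reload is needed; only the slow path queues on the config-server ticket pool. A rough sketch of that discipline, with a C++20 counting_semaphore standing in for mongo's TicketHolder and deliberately simplified version fields:

#include <semaphore>

struct VersionSketch {
    long long combined;  // stands in for ChunkVersion's major/minor pair
    long long epoch;     // stands in for the collection epoch (an OID in mongo)
};

// Stand-in for _configServerTickets: only a few threads may perform a remote
// metadata reload at a time; the rest queue here.
std::counting_semaphore<3> gConfigServerTickets{3};

void onStaleShardVersionSketch(const VersionSketch& cached, const VersionSketch& expected) {
    // (The real code first waits out any migration critical section; omitted here.)

    // Fast path (under an IS collection lock in the real code): same epoch and
    // our cached version already covers the expected one, so the remote side
    // is the one that is behind. Nothing to reload.
    if (cached.epoch == expected.epoch && cached.combined >= expected.combined)
        return;

    gConfigServerTickets.acquire();  // waitForTicket()
    // ... contact the config server and install the refreshed metadata ...
    gConfigServerTickets.release();  // what TicketHolderReleaser does on scope exit
}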
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index 19dc1be2978..defecd2aafa 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -135,24 +135,12 @@ public:
ChunkVersion getVersion(const std::string& ns);
/**
- * If the metadata for 'ns' at this shard is at or above the requested version,
- * 'reqShardVersion', returns OK and fills in 'latestShardVersion' with the latest shard
- * version. The latter is always greater or equal than 'reqShardVersion' if in the same epoch.
- *
- * Otherwise, falls back to refreshMetadataNow.
- *
- * This call blocks if there are more than _configServerTickets threads currently refreshing
- * metadata (currently set to 3).
- *
- * Locking Note:
- * + Must NOT be called with the write lock because this call may go into the network,
- * and deadlocks may occur with shard-as-a-config. Therefore, nothing here guarantees
- * that 'latestShardVersion' is indeed the current one on return.
+ * Refreshes the local metadata based on whether the expected version is higher than what we
+ * have cached.
*/
- Status refreshMetadataIfNeeded(OperationContext* txn,
- const std::string& ns,
- const ChunkVersion& reqShardVersion,
- ChunkVersion* latestShardVersion);
+ Status onStaleShardVersion(OperationContext* txn,
+ const NamespaceString& nss,
+ const ChunkVersion& expectedVersion);
/**
* Refreshes collection metadata by asking the config server for the latest information.
@@ -303,13 +291,6 @@ public:
const BSONObj& maxKey,
ChunkVersion mergedVersion);
- bool inCriticalMigrateSection();
-
- /**
- * @return true if we are NOT in the critical section
- */
- bool waitTillNotInCriticalSection(int maxSecondsToWait);
-
/**
* TESTING ONLY
* Uninstalls the metadata for a given collection.
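Putting the pieces together, here is a small, self-contained driver reusing the CriticalSectionLatch sketch from the top of the page (the thread bodies are illustrative, not server code). The shared_ptr handoff is the key design point: MigrationSourceManager::done() can drop its own _critSec reference while a stalled operation still holds one, so the latch stays valid until the last waiter returns.

#include <chrono>
#include <iostream>
#include <memory>
#include <thread>

int main() {
    // Published by setInCriticalSection(true) in the real code.
    auto critSec = std::make_shared<CriticalSectionLatch>();

    std::thread migration([critSec] {
        // Commit the chunk move against the config server, then leave the
        // critical section (done() / setInCriticalSection(false)).
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        critSec->exitCriticalSection();
    });

    // An operation hit SendStaleConfigException; the handle it stashed keeps
    // the latch alive even after the manager resets its own _critSec.
    std::shared_ptr<CriticalSectionLatch> stashed = critSec;

    // ... all locks released here, as the invariant in the patch requires ...

    if (stashed->waitUntilOutOfCriticalSection(std::chrono::seconds(10)))
        std::cout << "critical section over; safe to refresh metadata and retry\n";

    migration.join();
}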