Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/commands/find_and_modify.cpp         |  51
-rw-r--r--  src/mongo/db/commands/mr.cpp                      |   1
-rw-r--r--  src/mongo/db/db_raii.cpp                          |  24
-rw-r--r--  src/mongo/db/db_raii.h                            |   7
-rw-r--r--  src/mongo/db/exec/idhack.cpp                      |   1
-rw-r--r--  src/mongo/db/pipeline/document_source_cursor.cpp  |   2
-rw-r--r--  src/mongo/db/query/get_executor.cpp               |   8
-rw-r--r--  src/mongo/db/range_deleter_db_env.cpp             |   1
-rw-r--r--  src/mongo/db/repl/oplog.cpp                       |   1
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp      | 106
-rw-r--r--  src/mongo/db/s/collection_sharding_state.h        |  33
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp       |   7
-rw-r--r--  src/mongo/db/s/sharding_state.cpp                 | 117
-rw-r--r--  src/mongo/db/s/sharding_state.h                   |   2
14 files changed, 241 insertions, 120 deletions
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index 20fd3df1466..0657d640e77 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -61,8 +61,8 @@
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/write_concern.h"
-#include "mongo/s/d_state.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -257,19 +257,18 @@ public:
// Explain calls of the findAndModify command are read-only, but we take write
// locks so that the timing information is more accurate.
- AutoGetDb autoDb(txn, dbName, MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);
-
- ensureShardVersionOKOrThrow(txn, nsString.ns());
-
- Collection* collection = nullptr;
- if (autoDb.getDb()) {
- collection = autoDb.getDb()->getCollection(nsString.ns());
- } else {
+ AutoGetCollection autoColl(txn, nsString, MODE_IX);
+ if (!autoColl.getDb()) {
return {ErrorCodes::NamespaceNotFound,
str::stream() << "database " << dbName << " does not exist."};
}
+ auto css = CollectionShardingState::get(txn, nsString);
+ if (css) {
+ css->checkShardVersionOrThrow(txn);
+ }
+
+ Collection* const collection = autoColl.getCollection();
auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete);
if (!statusWithPlanExecutor.isOK()) {
return statusWithPlanExecutor.getStatus();
@@ -293,19 +292,18 @@ public:
// Explain calls of the findAndModify command are read-only, but we take write
// locks so that the timing information is more accurate.
- AutoGetDb autoDb(txn, dbName, MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);
-
- ensureShardVersionOKOrThrow(txn, nsString.ns());
-
- Collection* collection = nullptr;
- if (autoDb.getDb()) {
- collection = autoDb.getDb()->getCollection(nsString.ns());
- } else {
+ AutoGetCollection autoColl(txn, nsString, MODE_IX);
+ if (!autoColl.getDb()) {
return {ErrorCodes::NamespaceNotFound,
str::stream() << "database " << dbName << " does not exist."};
}
+ auto css = CollectionShardingState::get(txn, nsString);
+ if (css) {
+ css->checkShardVersionOrThrow(txn);
+ }
+
+ Collection* collection = autoColl.getCollection();
auto statusWithPlanExecutor =
getExecutorUpdate(txn, collection, &parsedUpdate, opDebug);
if (!statusWithPlanExecutor.isOK()) {
@@ -376,7 +374,6 @@ public:
AutoGetOrCreateDb autoDb(txn, dbName, MODE_IX);
Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);
- Collection* collection = autoDb.getDb()->getCollection(nsString.ns());
// Attach the namespace and database profiling level to the current op.
{
@@ -385,13 +382,17 @@ public:
->enter_inlock(nsString.ns().c_str(), autoDb.getDb()->getProfilingLevel());
}
- ensureShardVersionOKOrThrow(txn, nsString.ns());
+ auto css = CollectionShardingState::get(txn, nsString);
+ if (css) {
+ css->checkShardVersionOrThrow(txn);
+ }
Status isPrimary = checkCanAcceptWritesForDatabase(nsString);
if (!isPrimary.isOK()) {
return appendCommandStatus(result, isPrimary);
}
+ Collection* const collection = autoDb.getDb()->getCollection(nsString.ns());
auto statusWithPlanExecutor = getExecutorDelete(txn, collection, &parsedDelete);
if (!statusWithPlanExecutor.isOK()) {
return appendCommandStatus(result, statusWithPlanExecutor.getStatus());
@@ -435,7 +436,6 @@ public:
AutoGetOrCreateDb autoDb(txn, dbName, MODE_IX);
Lock::CollectionLock collLock(txn->lockState(), nsString.ns(), MODE_IX);
- Collection* collection = autoDb.getDb()->getCollection(nsString.ns());
// Attach the namespace and database profiling level to the current op.
{
@@ -444,13 +444,18 @@ public:
->enter_inlock(nsString.ns().c_str(), autoDb.getDb()->getProfilingLevel());
}
- ensureShardVersionOKOrThrow(txn, nsString.ns());
+ auto css = CollectionShardingState::get(txn, nsString);
+ if (css) {
+ css->checkShardVersionOrThrow(txn);
+ }
Status isPrimary = checkCanAcceptWritesForDatabase(nsString);
if (!isPrimary.isOK()) {
return appendCommandStatus(result, isPrimary);
}
+ Collection* collection = autoDb.getDb()->getCollection(nsString.ns());
+
// Create the collection if it does not exist when performing an upsert
// because the update stage does not create its own collection.
if (!collection && args.isUpsert()) {
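[Editor's note] The hunks above replace the free function ensureShardVersionOKOrThrow() with a check against the per-collection sharding state. A minimal sketch of the resulting command-path pattern, assuming the internal headers shown in this diff; the function name is illustrative and supporting includes are omitted:

    #include "mongo/db/db_raii.h"
    #include "mongo/db/s/collection_sharding_state.h"

    Status checkVersionedWritePathSketch(OperationContext* txn, const NamespaceString& nss) {
        // One RAII object now takes both the database IX and collection IX locks.
        AutoGetCollection autoColl(txn, nss, MODE_IX);
        if (!autoColl.getDb()) {
            return {ErrorCodes::NamespaceNotFound,
                    str::stream() << "database " << nss.db() << " does not exist."};
        }

        // CollectionShardingState replaces ensureShardVersionOKOrThrow(); the
        // command code still guards against a null return here.
        auto css = CollectionShardingState::get(txn, nss);
        if (css) {
            css->checkShardVersionOrThrow(txn);  // throws SendStaleConfigException on mismatch
        }
        return Status::OK();
    }
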
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 593ae3dddd4..33291845269 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -70,7 +70,6 @@
#include "mongo/s/chunk_manager.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/config.h"
-#include "mongo/s/d_state.h"
#include "mongo/s/grid.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/s/stale_exception.h"
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index b0da72c5bb8..162017fc0f4 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -36,8 +36,8 @@
#include "mongo/db/client.h"
#include "mongo/db/curop.h"
#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/stats/top.h"
-#include "mongo/s/d_state.h"
namespace mongo {
@@ -46,9 +46,15 @@ AutoGetDb::AutoGetDb(OperationContext* txn, StringData ns, LockMode mode)
AutoGetCollection::AutoGetCollection(OperationContext* txn,
const NamespaceString& nss,
- LockMode mode)
- : _autoDb(txn, nss.db(), mode),
- _collLock(txn->lockState(), nss.ns(), mode),
+ LockMode modeAll)
+ : AutoGetCollection(txn, nss, modeAll, modeAll) {}
+
+AutoGetCollection::AutoGetCollection(OperationContext* txn,
+ const NamespaceString& nss,
+ LockMode modeDB,
+ LockMode modeColl)
+ : _autoDb(txn, nss.db(), modeDB),
+ _collLock(txn->lockState(), nss.ns(), modeColl),
_coll(_autoDb.getDb() ? _autoDb.getDb()->getCollection(nss) : nullptr) {}
AutoGetOrCreateDb::AutoGetOrCreateDb(OperationContext* txn, StringData ns, LockMode mode)
@@ -95,7 +101,10 @@ AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* txn,
// We have both the DB and collection locked, which is the prerequisite to do a stable shard
// version check, but we'd like to do the check after we have a satisfactory snapshot.
- ensureShardVersionOKOrThrow(_txn, nss.ns());
+ auto css = CollectionShardingState::get(txn, nss);
+ if (css) {
+ css->checkShardVersionOrThrow(txn);
+ }
}
AutoGetCollectionForRead::~AutoGetCollectionForRead() {
@@ -186,7 +195,10 @@ void OldClientContext::_checkNotStale() const {
case dbDelete: // here as well.
break;
default:
- ensureShardVersionOKOrThrow(_txn, _ns);
+ auto css = CollectionShardingState::get(_txn, _ns);
+ if (css) {
+ css->checkShardVersionOrThrow(_txn);
+ }
}
}
diff --git a/src/mongo/db/db_raii.h b/src/mongo/db/db_raii.h
index 948cf014c90..654996b676c 100644
--- a/src/mongo/db/db_raii.h
+++ b/src/mongo/db/db_raii.h
@@ -74,7 +74,12 @@ class AutoGetCollection {
MONGO_DISALLOW_COPYING(AutoGetCollection);
public:
- AutoGetCollection(OperationContext* txn, const NamespaceString& nss, LockMode mode);
+ AutoGetCollection(OperationContext* txn, const NamespaceString& nss, LockMode modeAll);
+
+ AutoGetCollection(OperationContext* txn,
+ const NamespaceString& nss,
+ LockMode modeDB,
+ LockMode modeColl);
Database* getDb() const {
return _autoDb.getDb();
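[Editor's note] The new delegating constructor above lets callers pick different lock modes for the database and the collection. A hedged sketch of its use (the function name is illustrative; the MODE_IX/MODE_X combination mirrors the _refreshMetadata() change later in this diff, and supporting includes are omitted):

    #include "mongo/db/db_raii.h"

    void installMetadataSketch(OperationContext* txn, const NamespaceString& nss) {
        ScopedTransaction transaction(txn, MODE_IX);
        // Intent lock on the database, exclusive lock on the collection: enough to
        // swap the collection's sharding metadata without blocking other
        // collections in the same database.
        AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X);
        // ... mutate per-collection state here ...
    }
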
diff --git a/src/mongo/db/exec/idhack.cpp b/src/mongo/db/exec/idhack.cpp
index affe2e4dc50..759ca91153f 100644
--- a/src/mongo/db/exec/idhack.cpp
+++ b/src/mongo/db/exec/idhack.cpp
@@ -38,7 +38,6 @@
#include "mongo/db/exec/working_set_computed_data.h"
#include "mongo/db/index/btree_access_method.h"
#include "mongo/db/storage/record_fetcher.h"
-#include "mongo/s/d_state.h"
#include "mongo/stdx/memory.h"
namespace mongo {
diff --git a/src/mongo/db/pipeline/document_source_cursor.cpp b/src/mongo/db/pipeline/document_source_cursor.cpp
index c91c7fb2110..803799d8774 100644
--- a/src/mongo/db/pipeline/document_source_cursor.cpp
+++ b/src/mongo/db/pipeline/document_source_cursor.cpp
@@ -30,7 +30,6 @@
#include "mongo/db/pipeline/document_source.h"
-
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/exec/working_set_common.h"
@@ -39,7 +38,6 @@
#include "mongo/db/query/explain.h"
#include "mongo/db/query/find_common.h"
#include "mongo/db/storage/storage_options.h"
-#include "mongo/s/d_state.h"
namespace mongo {
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index faea115ca02..ade024378e6 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -75,6 +75,7 @@
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/storage/oplog_hack.h"
@@ -85,7 +86,6 @@
namespace mongo {
using std::unique_ptr;
-using std::endl;
using std::string;
using std::vector;
using stdx::make_unique;
@@ -174,7 +174,7 @@ void fillOutPlannerParams(OperationContext* txn,
// If the caller wants a shard filter, make sure we're actually sharded.
if (plannerParams->options & QueryPlannerParams::INCLUDE_SHARD_FILTER) {
std::shared_ptr<CollectionMetadata> collMetadata =
- ShardingState::get(txn)->getCollectionMetadata(canonicalQuery->ns());
+ CollectionShardingState::get(txn, canonicalQuery->nss())->getMetadata();
if (collMetadata) {
plannerParams->shardKey = collMetadata->getKeyPattern();
} else {
@@ -259,7 +259,7 @@ Status prepareExecution(OperationContext* opCtx,
if (plannerParams.options & QueryPlannerParams::INCLUDE_SHARD_FILTER) {
*rootOut = new ShardFilterStage(
opCtx,
- ShardingState::get(opCtx)->getCollectionMetadata(collection->ns().ns()),
+ CollectionShardingState::get(opCtx, canonicalQuery->nss())->getMetadata(),
ws,
*rootOut);
}
@@ -648,7 +648,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorDelete(OperationContext* txn,
12050, "cannot delete from system namespace", legalClientSystemNS(nss.ns(), true));
}
if (nss.ns().find('$') != string::npos) {
- log() << "cannot delete from collection with reserved $ in name: " << nss << endl;
+ log() << "cannot delete from collection with reserved $ in name: " << nss;
uasserted(10100, "cannot delete from collection with reserved $ in name");
}
}
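[Editor's note] Shard filtering now reads collection metadata from CollectionShardingState instead of the global ShardingState map. A hedged sketch of the lookup (the helper name is illustrative and not part of the tree):

    #include "mongo/db/s/collection_metadata.h"
    #include "mongo/db/s/collection_sharding_state.h"

    BSONObj shardKeyForFilterSketch(OperationContext* txn, const NamespaceString& nss) {
        std::shared_ptr<CollectionMetadata> metadata =
            CollectionShardingState::get(txn, nss)->getMetadata();
        // Null metadata means the collection is not sharded on this node, matching
        // the "no shard filter key" branch in fillOutPlannerParams() above.
        return metadata ? metadata->getKeyPattern() : BSONObj();
    }
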
diff --git a/src/mongo/db/range_deleter_db_env.cpp b/src/mongo/db/range_deleter_db_env.cpp
index 0a4a6734913..4132dfde863 100644
--- a/src/mongo/db/range_deleter_db_env.cpp
+++ b/src/mongo/db/range_deleter_db_env.cpp
@@ -42,7 +42,6 @@
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/write_concern_options.h"
-#include "mongo/s/d_state.h"
#include "mongo/util/log.h"
namespace mongo {
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index d371b7d6040..dcada5fd460 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -84,7 +84,6 @@
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/platform/random.h"
-#include "mongo/s/d_state.h"
#include "mongo/scripting/engine.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/elapsed_tracker.h"
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 9eec8de16e4..e0fa35d4da2 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -32,15 +32,21 @@
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/client.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/s/chunk_version.h"
+#include "mongo/s/stale_exception.h"
namespace mongo {
+using std::string;
+
CollectionShardingState::CollectionShardingState(
NamespaceString nss, std::unique_ptr<CollectionMetadata> initialMetadata)
: _nss(std::move(nss)), _metadata(std::move(initialMetadata)) {}
@@ -62,10 +68,22 @@ CollectionShardingState* CollectionShardingState::get(OperationContext* txn,
}
void CollectionShardingState::setMetadata(std::shared_ptr<CollectionMetadata> newMetadata) {
- invariant(newMetadata);
_metadata = std::move(newMetadata);
}
+void CollectionShardingState::checkShardVersionOrThrow(OperationContext* txn) const {
+ string errmsg;
+ ChunkVersion received;
+ ChunkVersion wanted;
+ if (!_checkShardVersionOk(txn, &errmsg, &received, &wanted)) {
+ throw SendStaleConfigException(_nss.ns(),
+ str::stream() << "[" << _nss.ns()
+ << "] shard version not ok: " << errmsg,
+ received,
+ wanted);
+ }
+}
+
bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn,
const BSONObj& doc) {
dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
@@ -76,6 +94,8 @@ bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* txn,
void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& insertedDoc) {
dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
+ checkShardVersionOrThrow(txn);
+
ShardingState::get(txn)->migrationSourceManager()->logInsertOp(
txn, _nss.ns().c_str(), insertedDoc);
}
@@ -83,6 +103,8 @@ void CollectionShardingState::onInsertOp(OperationContext* txn, const BSONObj& i
void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& updatedDoc) {
dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
+ checkShardVersionOrThrow(txn);
+
ShardingState::get(txn)->migrationSourceManager()->logUpdateOp(
txn, _nss.ns().c_str(), updatedDoc);
}
@@ -90,8 +112,90 @@ void CollectionShardingState::onUpdateOp(OperationContext* txn, const BSONObj& u
void CollectionShardingState::onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId) {
dassert(txn->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
+ checkShardVersionOrThrow(txn);
+
ShardingState::get(txn)->migrationSourceManager()->logDeleteOp(
txn, _nss.ns().c_str(), deletedDocId);
}
+bool CollectionShardingState::_checkShardVersionOk(OperationContext* txn,
+ string* errmsg,
+ ChunkVersion* expectedShardVersion,
+ ChunkVersion* actualShardVersion) const {
+ Client* client = txn->getClient();
+
+ // Operations using the DBDirectClient are unversioned.
+ if (client->isInDirectClient()) {
+ return true;
+ }
+
+ if (!repl::ReplicationCoordinator::get(txn)->canAcceptWritesForDatabase(_nss.db())) {
+ // right now connections to secondaries aren't versioned at all
+ return true;
+ }
+
+ const auto& oss = OperationShardingState::get(txn);
+
+ // If there is a version attached to the OperationContext, use it as the received version.
+ // Otherwise, get the received version from the ShardedConnectionInfo.
+ if (oss.hasShardVersion()) {
+ *expectedShardVersion = oss.getShardVersion(_nss);
+ } else {
+ ShardedConnectionInfo* info = ShardedConnectionInfo::get(client, false);
+ if (!info) {
+ // There is no shard version information on either 'txn' or 'client'. This means that
+ // the operation represented by 'txn' is unversioned, and the shard version is always OK
+ // for unversioned operations.
+ return true;
+ }
+
+ *expectedShardVersion = info->getVersion(_nss.ns());
+ }
+
+ if (ChunkVersion::isIgnoredVersion(*expectedShardVersion)) {
+ return true;
+ }
+
+ *actualShardVersion = (_metadata ? _metadata->getShardVersion() : ChunkVersion::UNSHARDED());
+
+ if (expectedShardVersion->isWriteCompatibleWith(*actualShardVersion)) {
+ return true;
+ }
+
+ //
+ // Figure out exactly why not compatible, send appropriate error message
+ // The versions themselves are returned in the error, so not needed in messages here
+ //
+
+ // Check epoch first, to send more meaningful message, since other parameters probably won't
+ // match either.
+ if (actualShardVersion->epoch() != expectedShardVersion->epoch()) {
+ *errmsg = str::stream() << "version epoch mismatch detected for " << _nss.ns() << ", "
+ << "the collection may have been dropped and recreated";
+ return false;
+ }
+
+ if (!actualShardVersion->isSet() && expectedShardVersion->isSet()) {
+ *errmsg = str::stream() << "this shard no longer contains chunks for " << _nss.ns() << ", "
+ << "the collection may have been dropped";
+ return false;
+ }
+
+ if (actualShardVersion->isSet() && !expectedShardVersion->isSet()) {
+ *errmsg = str::stream() << "this shard contains versioned chunks for " << _nss.ns() << ", "
+ << "but no version set in request";
+ return false;
+ }
+
+ if (actualShardVersion->majorVersion() != expectedShardVersion->majorVersion()) {
+ // Could be > or < - wanted is > if this is the source of a migration, wanted < if this is
+ // the target of a migration
+ *errmsg = str::stream() << "version mismatch detected for " << _nss.ns();
+ return false;
+ }
+
+ // Those are all the reasons the versions can mismatch
+ MONGO_UNREACHABLE;
+}
+
} // namespace mongo
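[Editor's note] The decision order in _checkShardVersionOk() above can be read in isolation: epoch mismatch first, then the two one-sided "isSet" cases, then a plain major-version mismatch. A self-contained sketch of that ordering using a hypothetical MiniVersion stand-in (not MongoDB's ChunkVersion):

    #include <iostream>
    #include <string>

    // Hypothetical, simplified stand-in for ChunkVersion: an epoch plus a major
    // version, with major == 0 meaning "no version set".
    struct MiniVersion {
        std::string epoch;
        int major = 0;
        bool isSet() const { return major != 0; }
    };

    // Mirrors the ordering of _checkShardVersionOk(): 'wanted' is the version on
    // this shard, 'received' is the version attached to the operation.
    std::string explainMismatch(const MiniVersion& wanted, const MiniVersion& received) {
        if (wanted.epoch != received.epoch)
            return "version epoch mismatch; collection may have been dropped and recreated";
        if (!wanted.isSet() && received.isSet())
            return "this shard no longer contains chunks; collection may have been dropped";
        if (wanted.isSet() && !received.isSet())
            return "this shard contains versioned chunks, but no version set in request";
        if (wanted.major != received.major)
            return "version mismatch (migration in progress on source or target)";
        return "versions are compatible";
    }

    int main() {
        std::cout << explainMismatch({"e1", 3}, {"e1", 2}) << std::endl;  // major mismatch
        std::cout << explainMismatch({"e1", 3}, {"e2", 3}) << std::endl;  // epoch mismatch
        return 0;
    }
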
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 73f076fb37b..f9f3255597b 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -37,6 +37,7 @@
namespace mongo {
class BSONObj;
+struct ChunkVersion;
class CollectionMetadata;
class OperationContext;
@@ -81,6 +82,17 @@ public:
*/
void setMetadata(std::shared_ptr<CollectionMetadata> newMetadata);
+ /**
+ * Checks whether the shard version in the context is compatible with the shard version of the
+ * collection locally and if not throws SendStaleConfigException populated with the expected and
+ * actual versions.
+ *
+ * Because SendStaleConfigException has special semantics in terms of how a sharded command's
+ * response is constructed, this function should be the only means of checking for shard version
+ * match.
+ */
+ void checkShardVersionOrThrow(OperationContext* txn) const;
+
// Replication subsystem hooks. If this collection is serving as a source for migration, these
// methods inform it of any changes to its contents.
@@ -93,10 +105,29 @@ public:
void onDeleteOp(OperationContext* txn, const BSONObj& deletedDocId);
private:
+ /**
+ * Checks whether the shard version of the operation matches that of the collection.
+ *
+ * txn - Operation context from which to retrieve the operation's expected version.
+ * errmsg (out) - On false return contains an explanatory error message.
+ * expectedShardVersion (out) - On false return contains the expected collection version on this
+ * shard. Obtained from the operation sharding state.
+ * actualShardVersion (out) - On false return contains the actual collection version on this
+ * shard. Obtained from the collection sharding state.
+ *
+ * Returns true if the expected collection version on the shard matches its actual version on
+ * the shard and false otherwise. Upon false return, the output parameters will be set.
+ */
+ bool _checkShardVersionOk(OperationContext* txn,
+ std::string* errmsg,
+ ChunkVersion* expectedShardVersion,
+ ChunkVersion* actualShardVersion) const;
+
// Namespace to which this state belongs.
const NamespaceString _nss;
- // Contains all the chunks associated with this collection. This value is always non-null.
+ // Contains all the chunks associated with this collection. This value will be null if the
+ // collection is not sharded.
std::shared_ptr<CollectionMetadata> _metadata;
};
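[Editor's note] With the nullable _metadata change above, "sharded" is expressed as non-null metadata rather than as the presence of an entry in ShardingState's map. A hedged sketch of how a caller could test this (helper not in the tree; supporting includes omitted):

    #include "mongo/db/s/collection_sharding_state.h"

    bool isShardedSketch(OperationContext* txn, const NamespaceString& nss) {
        auto css = CollectionShardingState::get(txn, nss);
        return css && css->getMetadata() != nullptr;
    }
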
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 58d12719867..c9dacc78491 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -47,7 +47,6 @@
#include "mongo/db/record_id.h"
#include "mongo/logger/ramlog.h"
#include "mongo/s/chunk.h"
-#include "mongo/s/d_state.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/util/elapsed_tracker.h"
#include "mongo/util/log.h"
@@ -224,8 +223,6 @@ void MigrationSourceManager::done(OperationContext* txn) {
void MigrationSourceManager::logInsertOp(OperationContext* txn,
const char* ns,
const BSONObj& obj) {
- ensureShardVersionOKOrThrow(txn, ns);
-
dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
if (!_sessionId || (_nss != ns))
@@ -249,8 +246,6 @@ void MigrationSourceManager::logInsertOp(OperationContext* txn,
void MigrationSourceManager::logUpdateOp(OperationContext* txn,
const char* ns,
const BSONObj& updatedDoc) {
- ensureShardVersionOKOrThrow(txn, ns);
-
dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
if (!_sessionId || (_nss != ns))
@@ -274,8 +269,6 @@ void MigrationSourceManager::logUpdateOp(OperationContext* txn,
void MigrationSourceManager::logDeleteOp(OperationContext* txn,
const char* ns,
const BSONObj& obj) {
- ensureShardVersionOKOrThrow(txn, ns);
-
dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
BSONElement idElement = obj["_id"];
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index a125a8b155d..775104d4e6b 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -36,7 +36,7 @@
#include "mongo/client/connection_string.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/client.h"
-#include "mongo/db/concurrency/lock_state.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/optime.h"
@@ -218,7 +218,10 @@ CollectionShardingState* ShardingState::getNS(const std::string& ns) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
CollectionShardingStateMap::iterator it = _collections.find(ns);
if (it == _collections.end()) {
- return nullptr;
+ auto inserted = _collections.insert(make_pair(
+ ns, stdx::make_unique<CollectionShardingState>(NamespaceString(ns), nullptr)));
+ invariant(inserted.second);
+ it = std::move(inserted.first);
}
return it->second.get();
@@ -229,21 +232,22 @@ void ShardingState::clearCollectionMetadata() {
_collections.clear();
}
-// TODO we shouldn't need three ways for checking the version. Fix this.
-bool ShardingState::hasVersion(const string& ns) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return !!_collections.count(ns);
-}
-
ChunkVersion ShardingState::getVersion(const string& ns) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- CollectionShardingStateMap::const_iterator it = _collections.find(ns);
- if (it != _collections.end()) {
- shared_ptr<CollectionMetadata> p = it->second->getMetadata();
+ shared_ptr<CollectionMetadata> p;
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ CollectionShardingStateMap::const_iterator it = _collections.find(ns);
+ if (it != _collections.end()) {
+ p = it->second->getMetadata();
+ }
+ }
+
+ if (p) {
return p->getShardVersion();
- } else {
- return ChunkVersion(0, 0, OID());
}
+
+ return ChunkVersion::UNSHARDED();
}
void ShardingState::donateChunk(OperationContext* txn,
@@ -618,24 +622,16 @@ Status ShardingState::_refreshMetadata(OperationContext* txn,
const ChunkVersion& reqShardVersion,
bool useRequestedVersion,
ChunkVersion* latestShardVersion) {
+ invariant(!txn->lockState()->isLocked());
+
Status status = _waitForInitialization(txn);
if (!status.isOK())
return status;
- // The idea here is that we're going to reload the metadata from the config server, but
- // we need to do so outside any locks. When we get our result back, if the current metadata
- // has changed, we may not be able to install the new metadata.
-
- //
- // Get the initial metadata
- // No DBLock is needed since the metadata is expected to change during reload.
- //
-
- shared_ptr<CollectionMetadata> beforeMetadata;
-
+ // We can't reload if a shard name has not yet been set
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
- // We also can't reload if a shard name has not yet been set.
+
if (_shardName.empty()) {
string errMsg = str::stream() << "cannot refresh metadata for " << ns
<< " before shard name has been set";
@@ -643,15 +639,24 @@ Status ShardingState::_refreshMetadata(OperationContext* txn,
warning() << errMsg;
return Status(ErrorCodes::NotYetInitialized, errMsg);
}
+ }
- CollectionShardingStateMap::iterator it = _collections.find(ns);
- if (it != _collections.end()) {
- beforeMetadata = it->second->getMetadata();
- }
+ const NamespaceString nss(ns);
+
+ // The idea here is that we're going to reload the metadata from the config server, but we need
+ // to do so outside any locks. When we get our result back, if the current metadata has
+ // changed, we may not be able to install the new metadata.
+ shared_ptr<CollectionMetadata> beforeMetadata;
+ {
+ ScopedTransaction transaction(txn, MODE_IS);
+ AutoGetCollection autoColl(txn, nss, MODE_IS);
+
+ beforeMetadata = CollectionShardingState::get(txn, nss)->getMetadata();
}
ChunkVersion beforeShardVersion;
ChunkVersion beforeCollVersion;
+
if (beforeMetadata) {
beforeShardVersion = beforeMetadata->getShardVersion();
beforeCollVersion = beforeMetadata->getCollVersion();
@@ -696,7 +701,7 @@ Status ShardingState::_refreshMetadata(OperationContext* txn,
grid.catalogManager(txn),
ns,
getShardName(),
- fullReload ? NULL : beforeMetadata.get(),
+ fullReload ? nullptr : beforeMetadata.get(),
remoteMetadata.get());
refreshMillis = refreshTimer.millis();
@@ -739,29 +744,11 @@ Status ShardingState::_refreshMetadata(OperationContext* txn,
// Exclusive collection lock needed since we're now potentially changing the metadata,
// and don't want reads/writes to be ongoing.
ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
+ AutoGetCollection autoColl(txn, nss, MODE_IX, MODE_X);
- //
// Get the metadata now that the load has completed
- //
-
- // Don't reload if our config server has changed or sharding is no longer enabled
- if (!enabled()) {
- string errMsg = str::stream() << "could not refresh metadata for " << ns
- << ", sharding is no longer enabled";
-
- warning() << errMsg;
- return Status(ErrorCodes::NotYetInitialized, errMsg);
- }
-
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- CollectionShardingStateMap::iterator it = _collections.find(ns);
- if (it != _collections.end()) {
- afterMetadata = it->second->getMetadata();
- }
-
+ auto css = CollectionShardingState::get(txn, nss);
+ afterMetadata = css->getMetadata();
if (afterMetadata) {
afterShardVersion = afterMetadata->getShardVersion();
afterCollVersion = afterMetadata->getCollVersion();
@@ -774,7 +761,6 @@ Status ShardingState::_refreshMetadata(OperationContext* txn,
//
Status status = mdLoader.promotePendingChunks(afterMetadata.get(), remoteMetadata.get());
-
if (!status.isOK()) {
warning() << "remote metadata for " << ns
<< " is inconsistent with current pending chunks"
@@ -797,31 +783,22 @@ Status ShardingState::_refreshMetadata(OperationContext* txn,
if (!afterCollVersion.epoch().isSet()) {
// First metadata load
installType = InstallType_New;
- dassert(it == _collections.end());
- _collections.insert(make_pair(ns,
- stdx::make_unique<CollectionShardingState>(
- NamespaceString(ns), std::move(remoteMetadata))));
+ css->setMetadata(std::move(remoteMetadata));
} else if (remoteCollVersion.epoch().isSet() &&
remoteCollVersion.epoch() == afterCollVersion.epoch()) {
// Update to existing metadata
installType = InstallType_Update;
-
- // Invariant: If CollMetadata was not found, version should be have been 0.
- dassert(it != _collections.end());
- it->second->setMetadata(std::move(remoteMetadata));
+ css->setMetadata(std::move(remoteMetadata));
} else if (remoteCollVersion.epoch().isSet()) {
// New epoch detected, replacing metadata
installType = InstallType_Replace;
-
- // Invariant: If CollMetadata was not found, version should be have been 0.
- dassert(it != _collections.end());
- it->second->setMetadata(std::move(remoteMetadata));
+ css->setMetadata(std::move(remoteMetadata));
} else {
dassert(!remoteCollVersion.epoch().isSet());
// Drop detected
installType = InstallType_Drop;
- _collections.erase(it);
+ css->setMetadata(nullptr);
}
*latestShardVersion = remoteShardVersion;
@@ -906,15 +883,17 @@ void ShardingState::appendInfo(OperationContext* txn, BSONObjBuilder& builder) {
it != _collections.end();
++it) {
shared_ptr<CollectionMetadata> metadata = it->second->getMetadata();
- versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong());
+ if (metadata) {
+ versionB.appendTimestamp(it->first, metadata->getShardVersion().toLong());
+ } else {
+ versionB.appendTimestamp(it->first, ChunkVersion::UNSHARDED().toLong());
+ }
}
versionB.done();
}
bool ShardingState::needCollectionMetadata(OperationContext* txn, const string& ns) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
if (!enabled())
return false;
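[Editor's note] The getNS() hunk above switches from "find or return null" to "find or create an empty entry under the mutex", which is why callers such as get_executor.cpp can dereference the result without a null check. A self-contained sketch of that get-or-create pattern with hypothetical stand-in types (not MongoDB code):

    #include <map>
    #include <memory>
    #include <mutex>
    #include <string>

    struct StateEntry {
        std::string nss;
        explicit StateEntry(std::string ns) : nss(std::move(ns)) {}
    };

    class Registry {
    public:
        // Returns the entry for 'ns', creating an empty one under the mutex if it
        // does not exist yet, so callers never have to handle a null return.
        StateEntry* getNS(const std::string& ns) {
            std::lock_guard<std::mutex> lk(_mutex);
            auto it = _collections.find(ns);
            if (it == _collections.end()) {
                auto inserted =
                    _collections.insert(std::make_pair(ns, std::make_unique<StateEntry>(ns)));
                it = inserted.first;
            }
            return it->second.get();
        }

    private:
        std::mutex _mutex;
        std::map<std::string, std::unique_ptr<StateEntry>> _collections;
    };
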
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index 2383660ae16..19dc1be2978 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -132,8 +132,6 @@ public:
*/
void clearCollectionMetadata();
- bool hasVersion(const std::string& ns);
-
ChunkVersion getVersion(const std::string& ns);
/**