author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2020-01-29 10:47:13 -0500
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2020-01-29 11:37:03 -0500
commit | c25809db3532d2af31321648790e775bd420e600 (patch)
tree | 779ebc7776b03a4cc400f8dc7750381338271eb6
parent | a7e9c2223a6aa92f6648d3800d251d8335cd1881 (diff)
download | mongo-c25809db3532d2af31321648790e775bd420e600.tar.gz
Revert "SERVER-45599 Backport of SERVER-32198: Split CollectionShardingState::getMetadata into three methods"
This reverts commit a414e4ceafb45dc6ebf4daeb9198f0a7f3fb189c.
32 files changed, 522 insertions, 570 deletions
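
The change being reverted had split `CollectionShardingState::getMetadata` into three accessors with distinct semantics (`getMetadataForOperation`, `getCurrentMetadata`, and `getCurrentMetadataIfKnown`). This revert collapses all call sites back onto the single `getMetadata(opCtx)` accessor. A declaration-level sketch of the two API shapes, taken from the collection_sharding_state.h hunk below (not compilable on its own; the surrounding class and types are elided):

```cpp
// Before the revert (the SERVER-32198 backport): three accessors. Callers
// could distinguish "filtering metadata not loaded yet" (boost::none) from a
// known UNSHARDED state.
ScopedCollectionMetadata getMetadataForOperation(OperationContext* opCtx);
ScopedCollectionMetadata getCurrentMetadata();
boost::optional<ScopedCollectionMetadata> getCurrentMetadataIfKnown();
boost::optional<ChunkVersion> getCurrentShardVersionIfKnown();

// After the revert: a single accessor. Unknown metadata is folded into the
// UNSHARDED case, so callers can no longer observe "not known yet".
ScopedCollectionMetadata getMetadata(OperationContext* opCtx);
```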
diff --git a/jstests/sharding/cursor_valid_after_shard_stepdown.js b/jstests/sharding/cursor_valid_after_shard_stepdown.js
index 01e7ee57c30..ea0dd72338f 100644
--- a/jstests/sharding/cursor_valid_after_shard_stepdown.js
+++ b/jstests/sharding/cursor_valid_after_shard_stepdown.js
@@ -38,11 +38,10 @@ TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
     assert.eq(0, getMoreCursor.id);
     assert.eq(2, getMoreCursor.nextBatch[0].x);
 
-    // After stepdown, the shard version will be reset
     var shardVersionAfterStepdown =
         assert.commandWorked(st.rs0.getPrimary().adminCommand({getShardVersion: 'TestDB.TestColl'}))
             .global;
-    assert.eq("UNKNOWN", shardVersionAfterStepdown);
+    assert.eq(Timestamp(0, 0), shardVersionAfterStepdown);
 
     st.stop();
 })();
diff --git a/jstests/sharding/dump_coll_metadata.js b/jstests/sharding/dump_coll_metadata.js
index c1254985dd6..30726f4746d 100644
--- a/jstests/sharding/dump_coll_metadata.js
+++ b/jstests/sharding/dump_coll_metadata.js
@@ -25,13 +25,13 @@
     assert.eq(metadata.chunks.length, 1);
     assert.eq(metadata.pending.length, 0);
-    assert.eq(metadata.chunks[0][0]._id, MinKey);
-    assert.eq(metadata.chunks[0][1]._id, MaxKey);
-    assert.eq(metadata.shardVersion, result.global);
+    assert(metadata.chunks[0][0]._id + "" == MinKey + "");
+    assert(metadata.chunks[0][1]._id + "" == MaxKey + "");
+    assert(metadata.shardVersion + "" == result.global + "");
 
     // Make sure a collection with no metadata still returns the metadata field
-    assert.neq(shardAdmin.runCommand({getShardVersion: coll + "xyz", fullMetadata: true}).metadata,
-               undefined);
+    assert(shardAdmin.runCommand({getShardVersion: coll + "xyz", fullMetadata: true}).metadata !=
+           undefined);
 
     // Make sure we get multiple chunks after a split and refresh -- splits by themselves do not
     // cause the shard to refresh.
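
The jstests changes above show the user-visible effect of the revert: since `getCurrentMetadataIfKnown` is gone, a shard can no longer report `"UNKNOWN"` after its filtering metadata is reset by a stepdown, and `getShardVersion` goes back to reporting `Timestamp(0, 0)` (the UNSHARDED sentinel). A sketch of the restored reporting path, mirroring the get_shard_version_command.cpp hunk further down in this diff:

```cpp
// Sketch of the reverted getShardVersion reporting logic (see the
// get_shard_version_command.cpp hunk below). With no notion of "metadata
// unknown", the command always reports a timestamp and falls back to
// ChunkVersion::UNSHARDED(), i.e. Timestamp(0, 0).
const auto metadata = css->getMetadata(opCtx);
if (metadata->isSharded()) {
    result.appendTimestamp("global", metadata->getShardVersion().toLong());
} else {
    result.appendTimestamp("global", ChunkVersion::UNSHARDED().toLong());
}
```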
diff --git a/jstests/sharding/features1.js b/jstests/sharding/features1.js
index 2fabc5a514b..4f36d5675d6 100644
--- a/jstests/sharding/features1.js
+++ b/jstests/sharding/features1.js
@@ -1,63 +1,81 @@
 (function() {
-    'use strict';
 
     var s = new ShardingTest({name: "features1", shards: 2, mongos: 1});
-    assert.commandWorked(s.s0.adminCommand({enablesharding: "test"}));
+
+    s.adminCommand({enablesharding: "test"});
     s.ensurePrimaryShard('test', s.shard1.shardName);
 
     // ---- can't shard system namespaces ----
-    assert.commandFailed(s.s0.adminCommand({shardcollection: "test.system.blah", key: {num: 1}}),
-                         "shard system namespace");
+
+    assert(!s.admin.runCommand({shardcollection: "test.system.blah", key: {num: 1}}).ok,
+           "shard system namespace");
 
     // ---- setup test.foo -----
-    assert.commandWorked(s.s0.adminCommand({shardcollection: "test.foo", key: {num: 1}}));
-
-    let db = s.s0.getDB("test");
-    assert.commandWorked(db.foo.createIndex({y: 1}));
+    s.adminCommand({shardcollection: "test.foo", key: {num: 1}});
+
+    db = s.getDB("test");
+
+    a = s._connections[0].getDB("test");
+    b = s._connections[1].getDB("test");
 
-    assert.commandWorked(s.s0.adminCommand({split: "test.foo", middle: {num: 10}}));
-    assert.commandWorked(s.s0.adminCommand(
-        {movechunk: "test.foo", find: {num: 20}, to: s.getOther(s.getPrimaryShard("test")).name}));
+    db.foo.ensureIndex({y: 1});
 
-    assert.writeOK(db.foo.insert({num: 5}));
-    assert.writeOK(db.foo.save({num: 15}));
+    s.adminCommand({split: "test.foo", middle: {num: 10}});
+    s.adminCommand(
+        {movechunk: "test.foo", find: {num: 20}, to: s.getOther(s.getPrimaryShard("test")).name});
 
-    let a = s.rs0.getPrimary().getDB("test");
-    let b = s.rs1.getPrimary().getDB("test");
+    db.foo.save({num: 5});
+    db.foo.save({num: 15});
+
+    s.sync();
 
     // ---- make sure shard key index is everywhere ----
+
     assert.eq(3, a.foo.getIndexKeys().length, "a index 1");
     assert.eq(3, b.foo.getIndexKeys().length, "b index 1");
 
     // ---- make sure if you add an index it goes everywhere ------
-    assert.commandWorked(db.foo.createIndex({x: 1}));
+
+    db.foo.ensureIndex({x: 1});
+
+    s.sync();
+
     assert.eq(4, a.foo.getIndexKeys().length, "a index 2");
     assert.eq(4, b.foo.getIndexKeys().length, "b index 2");
 
-    // ---- no unique indexes allowed that do not include the shard key ------
-    assert.commandFailed(db.foo.createIndex({z: 1}, true));
+    // ---- no unique indexes ------
+
+    db.foo.ensureIndex({z: 1}, true);
+
+    s.sync();
+
     assert.eq(4, a.foo.getIndexKeys().length, "a index 3");
     assert.eq(4, b.foo.getIndexKeys().length, "b index 3");
 
-    // ---- unique indexes that include the shard key are allowed ------
-    assert.commandWorked(db.foo.createIndex({num: 1, bar: 1}, true));
+    db.foo.ensureIndex({num: 1, bar: 1}, true);
+    s.sync();
     assert.eq(5, b.foo.getIndexKeys().length, "c index 3");
 
-    // ---- can't shard thing with unique indexes ------
-    assert.commandWorked(db.foo2.createIndex({a: 1}));
+    // ---- can't shard thing with unique indexes
+
+    db.foo2.ensureIndex({a: 1});
+    s.sync();
     printjson(db.foo2.getIndexes());
-    assert.commandWorked(s.s0.adminCommand({shardcollection: "test.foo2", key: {num: 1}}),
-                         "shard with index");
+    assert(s.admin.runCommand({shardcollection: "test.foo2", key: {num: 1}}).ok,
+           "shard with index");
 
-    assert.commandWorked(db.foo3.createIndex({a: 1}, true));
+    db.foo3.ensureIndex({a: 1}, true);
+    s.sync();
     printjson(db.foo3.getIndexes());
-    assert.commandFailed(s.s0.adminCommand({shardcollection: "test.foo3", key: {num: 1}}),
-                         "shard with unique index");
+    assert(!s.admin.runCommand({shardcollection: "test.foo3", key: {num: 1}}).ok,
+           "shard with unique index");
 
-    assert.commandWorked(db.foo7.createIndex({num: 1, a: 1}, true));
+    db.foo7.ensureIndex({num: 1, a: 1}, true);
+    s.sync();
     printjson(db.foo7.getIndexes());
-    assert.commandWorked(s.s0.adminCommand({shardcollection: "test.foo7", key: {num: 1}}),
-                         "shard with ok unique index");
+    assert(s.admin.runCommand({shardcollection: "test.foo7", key: {num: 1}}).ok,
+           "shard with ok unique index");
 
     // ----- eval -----
 
@@ -106,16 +124,17 @@
     assert.eq(0, result.ok, "eval should not work for sharded collection in cluster");
 
     // ---- unique shard key ----
-    assert.commandWorked(
-        s.s0.adminCommand({shardcollection: "test.foo4", key: {num: 1}, unique: true}),
-        "shard with index and unique");
-    assert.commandWorked(s.s0.adminCommand({split: "test.foo4", middle: {num: 10}}));
-    assert.commandWorked(s.s0.adminCommand(
-        {movechunk: "test.foo4", find: {num: 20}, to: s.getOther(s.getPrimaryShard("test")).name}));
-    assert.writeOK(db.foo4.insert({num: 5}));
-    assert.writeOK(db.foo4.insert({num: 15}));
+
+    assert(s.admin.runCommand({shardcollection: "test.foo4", key: {num: 1}, unique: true}).ok,
+           "shard with index and unique");
+    s.adminCommand({split: "test.foo4", middle: {num: 10}});
+
+    s.admin.runCommand(
+        {movechunk: "test.foo4", find: {num: 20}, to: s.getOther(s.getPrimaryShard("test")).name});
+    assert.writeOK(db.foo4.save({num: 5}));
+    assert.writeOK(db.foo4.save({num: 15}));
+    s.sync();
 
     assert.eq(1, a.foo4.count(), "ua1");
     assert.eq(1, b.foo4.count(), "ub1");
@@ -126,17 +145,16 @@
     assert(b.foo4.getIndexes()[1].unique, "ub3");
 
     assert.eq(2, db.foo4.count(), "uc1");
-    assert.writeOK(db.foo4.insert({num: 7}));
+    db.foo4.save({num: 7});
     assert.eq(3, db.foo4.count(), "uc2");
-    assert.writeError(db.foo4.insert({num: 7}));
+    assert.writeError(db.foo4.save({num: 7}));
     assert.eq(3, db.foo4.count(), "uc4");
 
     // --- don't let you convertToCapped ----
     assert(!db.foo4.isCapped(), "ca1");
     assert(!a.foo4.isCapped(), "ca2");
     assert(!b.foo4.isCapped(), "ca3");
-
-    assert.commandFailed(db.foo4.convertToCapped(30000), "ca30");
+    assert(!db.foo4.convertToCapped(30000).ok, "ca30");
     assert(!db.foo4.isCapped(), "ca4");
     assert(!a.foo4.isCapped(), "ca5");
     assert(!b.foo4.isCapped(), "ca6");
@@ -148,9 +166,11 @@
     assert(db.foo4a.isCapped(), "ca8");
 
     // --- don't let you shard a capped collection
+
     db.createCollection("foo5", {capped: true, size: 30000});
     assert(db.foo5.isCapped(), "cb1");
-    assert.commandFailed(s.s0.adminCommand({shardcollection: "test.foo5", key: {num: 1}}));
+    var res = s.admin.runCommand({shardcollection: "test.foo5", key: {num: 1}});
+    assert(!res.ok, "shard capped: " + tojson(res));
 
     // ----- group ----
 
@@ -200,18 +220,21 @@
     });
 
     // ---- can't shard non-empty collection without index -----
-    assert.writeOK(db.foo8.insert({a: 1}));
-    assert.commandFailed(s.s0.adminCommand({shardcollection: "test.foo8", key: {a: 1}}),
-                         "non-empty collection");
+
+    assert.writeOK(db.foo8.save({a: 1}));
+    assert(!s.admin.runCommand({shardcollection: "test.foo8", key: {a: 1}}).ok,
+           "non-empty collection");
 
     // ---- can't shard non-empty collection with null values in shard key ----
-    assert.writeOK(db.foo9.insert({b: 1}));
-    assert.commandWorked(db.foo9.createIndex({a: 1}));
-    assert.commandFailed(s.s0.adminCommand({shardcollection: "test.foo9", key: {a: 1}}),
-                         "entry with null value");
+
+    assert.writeOK(db.foo9.save({b: 1}));
+    db.foo9.ensureIndex({a: 1});
+    assert(!s.admin.runCommand({shardcollection: "test.foo9", key: {a: 1}}).ok,
+           "entry with null value");
 
     // --- listDatabases ---
-    var r = db.getMongo().getDBs();
+
+    r = db.getMongo().getDBs();
     assert.eq(3, r.databases.length, tojson(r));
     assert.eq("number", typeof(r.totalSize), "listDatabases 3 : " + tojson(r));
 
@@ -222,4 +245,5 @@
     assert.commandWorked(s.s0.adminCommand({flushRouterConfig: 'TestDB.TestColl'}));
 
     s.stop();
+
 })();
diff --git a/src/mongo/base/global_initializer_registerer.cpp b/src/mongo/base/global_initializer_registerer.cpp
index 67b83dfd6a1..7c458470649 100644
--- a/src/mongo/base/global_initializer_registerer.cpp
+++ b/src/mongo/base/global_initializer_registerer.cpp
@@ -50,6 +50,7 @@ GlobalInitializerRegisterer::GlobalInitializerRegisterer(std::string name,
                                             std::move(prerequisites),
                                             std::move(dependents));
 
+
     if (Status::OK() != status) {
         std::cerr << "Attempt to add global initializer failed, status: " << status << std::endl;
         ::abort();
diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp
index a9bf373f38a..da4ecaf40ab 100644
--- a/src/mongo/db/catalog/create_collection.cpp
+++ b/src/mongo/db/catalog/create_collection.cpp
@@ -89,14 +89,6 @@ Status createCollection(OperationContext* opCtx,
               !options["capped"].trueValue() || options["size"].isNumber() ||
                   options.hasField("$nExtents"));
 
-    CollectionOptions collectionOptions;
-    {
-        Status status = collectionOptions.parse(options, kind);
-        if (!status.isOK()) {
-            return status;
-        }
-    }
-
     return writeConflictRetry(opCtx, "create", nss.ns(), [&] {
         Lock::DBLock dbXLock(opCtx, nss.db(), MODE_X);
         const bool shardVersionCheck = true;
@@ -107,6 +99,12 @@ Status createCollection(OperationContext* opCtx,
                           str::stream() << "Not primary while creating collection " << nss.ns());
         }
 
+        CollectionOptions collectionOptions;
+        Status status = collectionOptions.parse(options, kind);
+        if (!status.isOK()) {
+            return status;
+        }
+
         if (collectionOptions.isView()) {
             // If the `system.views` collection does not exist, create it in a separate
             // WriteUnitOfWork.
@@ -119,8 +117,9 @@ Status createCollection(OperationContext* opCtx,
 
         // Create collection.
         const bool createDefaultIndexes = true;
-        Status status = Database::userCreateNS(
+        status = Database::userCreateNS(
             opCtx, ctx.db(), nss.ns(), collectionOptions, createDefaultIndexes, idIndex);
+
         if (!status.isOK()) {
            return status;
         }
diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp
index 6aadfe754b4..d300df28d52 100644
--- a/src/mongo/db/catalog/rename_collection.cpp
+++ b/src/mongo/db/catalog/rename_collection.cpp
@@ -171,10 +171,10 @@ Status renameCollectionCommon(OperationContext* opCtx,
 
     // Make sure the source collection is not sharded.
     {
-        auto* const css = CollectionShardingState::get(opCtx, source);
-        const auto metadata = css->getCurrentMetadata();
-        if (metadata->isSharded())
+        auto const css = CollectionShardingState::get(opCtx, source);
+        if (css->getMetadata(opCtx)->isSharded()) {
             return {ErrorCodes::IllegalOperation, "source namespace cannot be sharded"};
+        }
     }
 
     // Disallow renaming from a replicated to an unreplicated collection or vice versa.
@@ -211,10 +211,10 @@ Status renameCollectionCommon(OperationContext* opCtx,
     }
 
     {
-        auto* const css = CollectionShardingState::get(opCtx, target);
-        const auto metadata = css->getCurrentMetadata();
-        if (metadata->isSharded())
+        auto const css = CollectionShardingState::get(opCtx, target);
+        if (css->getMetadata(opCtx)->isSharded()) {
             return {ErrorCodes::IllegalOperation, "cannot rename to a sharded collection"};
+        }
     }
 
     // RenameCollectionForCommand cannot drop target by renaming.
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 0c419beeb36..b0717ce99c7 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -347,7 +347,7 @@ env.Library(
         '$BUILD_DIR/mongo/db/repl/oplog',
         '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
         '$BUILD_DIR/mongo/db/rw_concern_d',
-        '$BUILD_DIR/mongo/db/s/sharding_runtime_d',
+        '$BUILD_DIR/mongo/db/s/sharding_catalog_manager',
         '$BUILD_DIR/mongo/s/sharding_legacy_api',
         '$BUILD_DIR/mongo/util/net/ssl_manager',
         'core',
diff --git a/src/mongo/db/commands/count_cmd.cpp b/src/mongo/db/commands/count_cmd.cpp
index 50cd6752024..cd2891f6266 100644
--- a/src/mongo/db/commands/count_cmd.cpp
+++ b/src/mongo/db/commands/count_cmd.cpp
@@ -152,8 +152,7 @@ public:
 
         // Prevent chunks from being cleaned up during yields - this allows us to only check the
         // version on initial entry into count.
-        auto rangePreserver =
-            CollectionShardingState::get(opCtx, nss)->getMetadataForOperation(opCtx);
+        auto rangePreserver = CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
 
         auto statusWithPlanExecutor =
             getExecutorCount(opCtx, collection, request.getValue(), true /*explain*/);
@@ -206,8 +205,7 @@ public:
 
         // Prevent chunks from being cleaned up during yields - this allows us to only check the
         // version on initial entry into count.
-        auto rangePreserver =
-            CollectionShardingState::get(opCtx, nss)->getMetadataForOperation(opCtx);
+        auto rangePreserver = CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
 
         auto statusWithPlanExecutor =
             getExecutorCount(opCtx, collection, request.getValue(), false /*explain*/);
diff --git a/src/mongo/db/commands/create_indexes.cpp b/src/mongo/db/commands/create_indexes.cpp
index fd71731c664..167d5f3161b 100644
--- a/src/mongo/db/commands/create_indexes.cpp
+++ b/src/mongo/db/commands/create_indexes.cpp
@@ -234,6 +234,8 @@ std::vector<BSONObj> resolveDefaultsAndRemoveExistingIndexes(OperationContext* o
     return specs;
 }
 
+}  // namespace
+
 /**
  * { createIndexes : "bar", indexes : [ { ns : "test.bar", key : { x : 1 }, name: "x_1" } ] }
  */
@@ -241,17 +243,16 @@ class CmdCreateIndex : public ErrmsgCommandDeprecated {
 public:
     CmdCreateIndex() : ErrmsgCommandDeprecated(kCommandName) {}
 
-    bool supportsWriteConcern(const BSONObj& cmd) const override {
+    virtual bool supportsWriteConcern(const BSONObj& cmd) const override {
         return true;
     }
-
     AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
         return AllowedOnSecondary::kNever;
     }
 
-    Status checkAuthForCommand(Client* client,
-                               const std::string& dbname,
-                               const BSONObj& cmdObj) const override {
+    virtual Status checkAuthForCommand(Client* client,
+                                       const std::string& dbname,
+                                       const BSONObj& cmdObj) const {
         ActionSet actions;
         actions.addAction(ActionType::createIndex);
         Privilege p(parseResourcePattern(dbname, cmdObj), actions);
@@ -260,22 +261,26 @@ public:
         return Status(ErrorCodes::Unauthorized, "Unauthorized");
     }
 
-    bool errmsgRun(OperationContext* opCtx,
-                   const string& dbname,
-                   const BSONObj& cmdObj,
-                   string& errmsg,
-                   BSONObjBuilder& result) override {
+    virtual bool errmsgRun(OperationContext* opCtx,
+                           const string& dbname,
+                           const BSONObj& cmdObj,
+                           string& errmsg,
+                           BSONObjBuilder& result) {
         const NamespaceString ns(CommandHelpers::parseNsCollectionRequired(dbname, cmdObj));
 
-        uassertStatusOK(userAllowedWriteNS(ns));
-        // Disallow users from creating new indexes on config.transactions since the sessions code
-        // was optimized to not update indexes
+        Status status = userAllowedWriteNS(ns);
+        uassertStatusOK(status);
+
+        // Disallow users from creating new indexes on config.transactions since the sessions
+        // code was optimized to not update indexes.
         uassert(ErrorCodes::IllegalOperation,
                 str::stream() << "not allowed to create index on " << ns.ns(),
                 ns != NamespaceString::kSessionTransactionsTableNamespace);
 
-        auto specs = uassertStatusOK(
-            parseAndValidateIndexSpecs(opCtx, ns, cmdObj, serverGlobalParams.featureCompatibility));
+        auto specsWithStatus =
+            parseAndValidateIndexSpecs(opCtx, ns, cmdObj, serverGlobalParams.featureCompatibility);
+        uassertStatusOK(specsWithStatus.getStatus());
+        auto specs = std::move(specsWithStatus.getValue());
 
         // Do not use AutoGetOrCreateDb because we may relock the database in mode X.
         Lock::DBLock dbLock(opCtx, ns.db(), MODE_IX);
@@ -330,7 +335,8 @@ public:
             uasserted(ErrorCodes::CommandNotSupportedOnView, errmsg);
         }
 
-        uassertStatusOK(userAllowedCreateNS(ns.db(), ns.coll()));
+        status = userAllowedCreateNS(ns.db(), ns.coll());
+        uassertStatusOK(status);
 
         writeConflictRetry(opCtx, kCommandName, ns.ns(), [&] {
             WriteUnitOfWork wunit(opCtx);
@@ -362,7 +368,8 @@ public:
         for (size_t i = 0; i < specs.size(); i++) {
             const BSONObj& spec = specs[i];
             if (spec["unique"].trueValue()) {
-                _checkUniqueIndexConstraints(opCtx, ns, spec["key"].Obj());
+                status = checkUniqueIndexConstraints(opCtx, ns, spec["key"].Obj());
+                uassertStatusOK(status);
             }
         }
 
@@ -428,24 +435,24 @@ public:
     }
 
 private:
-    static void _checkUniqueIndexConstraints(OperationContext* opCtx,
-                                             const NamespaceString& nss,
-                                             const BSONObj& newIdxKey) {
+    static Status checkUniqueIndexConstraints(OperationContext* opCtx,
+                                              const NamespaceString& nss,
+                                              const BSONObj& newIdxKey) {
         invariant(opCtx->lockState()->isCollectionLockedForMode(nss.ns(), MODE_X));
 
-        const auto metadata = CollectionShardingState::get(opCtx, nss)->getCurrentMetadata();
-        if (!metadata->isSharded())
-            return;
+        auto metadata = CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
+        if (metadata->isSharded()) {
+            ShardKeyPattern shardKeyPattern(metadata->getKeyPattern());
+            if (!shardKeyPattern.isUniqueIndexCompatible(newIdxKey)) {
+                return Status(ErrorCodes::CannotCreateIndex,
+                              str::stream() << "cannot create unique index over " << newIdxKey
+                                            << " with shard key pattern "
+                                            << shardKeyPattern.toBSON());
+            }
+        }
+
-        const ShardKeyPattern shardKeyPattern(metadata->getKeyPattern());
-        uassert(ErrorCodes::CannotCreateIndex,
-                str::stream() << "cannot create unique index over " << newIdxKey
-                              << " with shard key pattern "
-                              << shardKeyPattern.toBSON(),
-                shardKeyPattern.isUniqueIndexCompatible(newIdxKey));
+        return Status::OK();
     }
-
 } cmdCreateIndex;
-
-}  // namespace
-}  // namespace mongo
+}
diff --git a/src/mongo/db/commands/geo_near_cmd.cpp b/src/mongo/db/commands/geo_near_cmd.cpp
index 336ce83ebba..e014a13e02c 100644
--- a/src/mongo/db/commands/geo_near_cmd.cpp
+++ b/src/mongo/db/commands/geo_near_cmd.cpp
@@ -231,8 +231,7 @@ public:
 
         // Prevent chunks from being cleaned up during yields - this allows us to only check the
         // version on initial entry into geoNear.
-        auto rangePreserver =
-            CollectionShardingState::get(opCtx, nss)->getMetadataForOperation(opCtx);
+        auto rangePreserver = CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
 
         const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
         const PlanExecutor::YieldPolicy yieldPolicy =
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 4a23c78b7fb..3637948a1e7 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -62,7 +62,8 @@
 #include "mongo/db/query/plan_summary_stats.h"
 #include "mongo/db/query/query_planner.h"
 #include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
 #include "mongo/db/s/sharding_state.h"
 #include "mongo/db/server_options.h"
 #include "mongo/db/service_context.h"
@@ -453,9 +454,6 @@ void State::prepTempCollection() {
                                     << status.code());
         }
         wuow.commit();
-
-        CollectionShardingRuntime::get(_opCtx, _config.incLong)
-            ->setFilteringMetadata(_opCtx, CollectionMetadata());
     });
 }
 
@@ -505,7 +503,6 @@ void State::prepTempCollection() {
 
     CollectionOptions options = finalOptions;
     options.temp = true;
-
     // If a UUID for the final output collection was sent by mongos (i.e., the final output
     // collection is sharded), use the UUID mongos sent when creating the temp collection.
     // When the temp collection is renamed to the final output collection, the UUID will be
@@ -537,9 +534,6 @@ void State::prepTempCollection() {
                 _opCtx, _config.tempNamespace, uuid, *it, false);
         }
         wuow.commit();
-
-        CollectionShardingRuntime::get(_opCtx, _config.tempNamespace)
-            ->setFilteringMetadata(_opCtx, CollectionMetadata());
     });
 }
 
@@ -1432,9 +1426,12 @@ public:
 
         uassert(16149, "cannot run map reduce without the js engine", getGlobalScriptEngine());
 
-        const auto metadata = [&] {
+        // Prevent sharding state from changing during the MR.
+        const auto collMetadata = [&] {
+            // Get metadata before we check our version, to make sure it doesn't increment in the
+            // meantime
             AutoGetCollectionForReadCommand autoColl(opCtx, config.nss);
-            return CollectionShardingState::get(opCtx, config.nss)->getMetadataForOperation(opCtx);
+            return CollectionShardingState::get(opCtx, config.nss)->getMetadata(opCtx);
         }();
 
         bool shouldHaveData = false;
@@ -1502,13 +1499,17 @@ public:
             const ExtensionsCallbackReal extensionsCallback(opCtx, &config.nss);
 
             const boost::intrusive_ptr<ExpressionContext> expCtx;
-            auto cq = uassertStatusOKWithContext(
+            auto statusWithCQ =
                 CanonicalQuery::canonicalize(opCtx,
                                              std::move(qr),
                                              expCtx,
                                              extensionsCallback,
-                                             MatchExpressionParser::kAllowAllSpecialFeatures),
-                str::stream() << "Can't canonicalize query " << config.filter);
+                                             MatchExpressionParser::kAllowAllSpecialFeatures);
+            if (!statusWithCQ.isOK()) {
+                uasserted(17238, "Can't canonicalize query " + config.filter.toString());
+                return 0;
+            }
+            std::unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
 
             unique_ptr<PlanExecutor, PlanExecutor::Deleter> exec;
             {
@@ -1535,37 +1536,38 @@ public:
 
                 Timer mt;
 
+                // go through each doc
                 BSONObj o;
                 PlanExecutor::ExecState execState;
                 while (PlanExecutor::ADVANCED == (execState = exec->getNext(&o, NULL))) {
-                    o = o.getOwned();  // The object will be accessed outside of collection lock
-
-                    // Check to see if this is a new object we don't own yet because of a chunk
-                    // migration
-                    if (metadata->isSharded()) {
-                        ShardKeyPattern kp(metadata->getKeyPattern());
-                        if (!metadata->keyBelongsToMe(kp.extractShardKeyFromDoc(o))) {
+                    o = o.getOwned();  // we will be accessing outside of the lock
+                    // check to see if this is a new object we don't own yet
+                    // because of a chunk migration
+                    if (collMetadata->isSharded()) {
+                        ShardKeyPattern kp(collMetadata->getKeyPattern());
+                        if (!collMetadata->keyBelongsToMe(kp.extractShardKeyFromDoc(o))) {
                             continue;
                         }
                     }
 
+                    // do map
                     if (config.verbose)
                         mt.reset();
-
                     config.mapper->map(o);
-
                     if (config.verbose)
                         mapTime += mt.micros();
 
-                    // Check if the state accumulated so far needs to be written to a collection.
-                    // This may yield the DB lock temporarily and then acquire it again.
+                    // Check if the state accumulated so far needs to be written to a
+                    // collection. This may yield the DB lock temporarily and then
+                    // acquire it again.
+                    //
                     numInputs++;
                     if (numInputs % 100 == 0) {
                         Timer t;
 
-                        // TODO: As an optimization, we might want to do the save/restore state and
-                        // yield inside the reduceAndSpillInMemoryState method, so it only happens
-                        // if necessary.
+                        // TODO: As an optimization, we might want to do the save/restore
+                        // state and yield inside the reduceAndSpillInMemoryState method, so
+                        // it only happens if necessary.
                         exec->saveState();
 
                         scopedAutoDb.reset();
diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp
index eb568bfb8e4..559673940db 100644
--- a/src/mongo/db/exec/update.cpp
+++ b/src/mongo/db/exec/update.cpp
@@ -135,7 +135,7 @@ bool shouldRestartUpdateIfNoLongerMatches(const UpdateStageParams& params) {
 
 const std::vector<std::unique_ptr<FieldRef>>* getImmutableFields(OperationContext* opCtx,
                                                                  const NamespaceString& ns) {
-    auto metadata = CollectionShardingState::get(opCtx, ns)->getCurrentMetadata();
+    auto metadata = CollectionShardingState::get(opCtx, ns)->getMetadata(opCtx);
     if (metadata->isSharded()) {
         const std::vector<std::unique_ptr<FieldRef>>& fields = metadata->getKeyPatternFields();
         // Return shard-keys as immutable for the update system.
@@ -288,12 +288,13 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
     RecordId newRecordId;
     OplogUpdateEntryArgs args;
     if (!request->isExplain()) {
-        auto* const css = CollectionShardingState::get(getOpCtx(), _collection->ns());
+        invariant(_collection);
+        auto* css = CollectionShardingState::get(getOpCtx(), _collection->ns());
         args.nss = _collection->ns();
         args.uuid = _collection->uuid();
         args.stmtId = request->getStmtId();
         args.update = logObj;
-        auto metadata = css->getCurrentMetadata();
+        auto metadata = css->getMetadata(getOpCtx());
         args.criteria = metadata->extractDocumentKey(newObj);
         uassert(16980,
                 "Multi-update operations require all documents to have an '_id' field",
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 15abf920fa6..35cfdb77237 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -311,7 +311,7 @@ OpTimeBundle replLogApplyOps(OperationContext* opCtx,
 BSONObj OpObserverImpl::getDocumentKey(OperationContext* opCtx,
                                        NamespaceString const& nss,
                                        BSONObj const& doc) {
-    auto metadata = CollectionShardingState::get(opCtx, nss)->getMetadataForOperation(opCtx);
+    auto metadata = CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
     return metadata->extractDocumentKey(doc).getOwned();
 }
 
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 9523261eb8a..0661430956f 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -157,7 +157,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
     if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) {
         auto shardFilterStage = stdx::make_unique<ShardFilterStage>(
             opCtx,
-            CollectionShardingState::get(opCtx, collection->ns())->getMetadataForOperation(opCtx),
+            CollectionShardingState::get(opCtx, collection->ns())->getMetadata(opCtx),
             ws.get(),
             stage.release());
         return PlanExecutor::make(opCtx,
@@ -610,7 +610,7 @@ DBClientBase* PipelineD::MongoDInterface::directClient() {
 bool PipelineD::MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
     AutoGetCollectionForReadCommand autoColl(opCtx, nss);
     auto const css = CollectionShardingState::get(opCtx, nss);
-    return css->getCurrentMetadata()->isSharded();
+    return css->getMetadata(opCtx)->isSharded();
 }
 
 BSONObj PipelineD::MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
@@ -752,7 +752,7 @@ Status PipelineD::MongoDInterface::attachCursorSourceToPipeline(
         auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns);
         uassert(4567,
                 str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded",
-                !css->getCurrentMetadata()->isSharded());
+                !css->getMetadata(expCtx->opCtx)->isSharded());
 
         PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline);
 
@@ -800,7 +800,7 @@ std::pair<std::vector<FieldPath>, bool> PipelineD::MongoDInterface::collectDocum
 
     auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata {
         AutoGetCollection autoColl(opCtx, nss, MODE_IS);
-        return CollectionShardingState::get(opCtx, nss)->getCurrentMetadata();
+        return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
     }();
 
     // Collection is not sharded or UUID mismatch implies collection has been dropped and recreated
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index defba5f3c69..4f7f4477081 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -178,8 +178,8 @@ void fillOutPlannerParams(OperationContext* opCtx,
 
     // If the caller wants a shard filter, make sure we're actually sharded.
     if (plannerParams->options & QueryPlannerParams::INCLUDE_SHARD_FILTER) {
-        auto collMetadata = CollectionShardingState::get(opCtx, canonicalQuery->nss())
-                                ->getMetadataForOperation(opCtx);
+        auto collMetadata =
+            CollectionShardingState::get(opCtx, canonicalQuery->nss())->getMetadata(opCtx);
         if (collMetadata->isSharded()) {
             plannerParams->shardKey = collMetadata->getKeyPattern();
         } else {
@@ -313,8 +313,7 @@ StatusWith<PrepareExecutionResult> prepareExecution(OperationContext* opCtx,
         if (plannerParams.options & QueryPlannerParams::INCLUDE_SHARD_FILTER) {
             root = make_unique<ShardFilterStage>(
                 opCtx,
-                CollectionShardingState::get(opCtx, canonicalQuery->nss())
-                    ->getMetadataForOperation(opCtx),
+                CollectionShardingState::get(opCtx, canonicalQuery->nss())->getMetadata(opCtx),
                 ws,
                 root.release());
         }
diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp
index c1390d0d18c..c23997773dd 100644
--- a/src/mongo/db/query/stage_builder.cpp
+++ b/src/mongo/db/query/stage_builder.cpp
@@ -302,11 +302,11 @@ PlanStage* buildStages(OperationContext* opCtx,
             if (nullptr == childStage) {
                 return nullptr;
             }
-            return new ShardFilterStage(opCtx,
-                                        CollectionShardingState::get(opCtx, collection->ns())
-                                            ->getMetadataForOperation(opCtx),
-                                        ws,
-                                        childStage);
+            return new ShardFilterStage(
+                opCtx,
+                CollectionShardingState::get(opCtx, collection->ns())->getMetadata(opCtx),
+                ws,
+                childStage);
         }
         case STAGE_KEEP_MUTATIONS: {
             const KeepMutationsNode* km = static_cast<const KeepMutationsNode*>(root);
diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
index 8bb1c5546e0..3cf9ec3728d 100644
--- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp
+++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
@@ -81,16 +81,11 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx,
     {
         AutoGetCollection autoColl(opCtx, ns, MODE_IX);
         auto* const css = CollectionShardingRuntime::get(opCtx, ns);
-        const auto optMetadata = css->getCurrentMetadataIfKnown();
-        uassert(ErrorCodes::ConflictingOperationInProgress,
-                str::stream() << "Unable to establish sharding status for collection " << ns.ns(),
-                optMetadata);
-
-        const auto& metadata = *optMetadata;
+        auto metadata = css->getMetadata(opCtx);
 
         if (!metadata->isSharded()) {
-            LOG(0) << "skipping orphaned data cleanup for " << ns.ns()
-                   << ", collection is not sharded";
+            log() << "skipping orphaned data cleanup for " << ns.toString()
+                  << ", collection is not sharded";
             return CleanupResult_Done;
         }
 
diff --git a/src/mongo/db/s/collection_metadata_filtering_test.cpp b/src/mongo/db/s/collection_metadata_filtering_test.cpp
index 46db156c36a..7f777dfa179 100644
--- a/src/mongo/db/s/collection_metadata_filtering_test.cpp
+++ b/src/mongo/db/s/collection_metadata_filtering_test.cpp
@@ -32,7 +32,6 @@
 
 #include "mongo/db/catalog_raii.h"
 #include "mongo/db/s/collection_sharding_runtime.h"
-#include "mongo/db/s/operation_sharding_state.h"
 #include "mongo/s/catalog/type_chunk.h"
 #include "mongo/s/shard_server_test_fixture.h"
 
@@ -108,12 +107,6 @@ protected:
         }
 
         _manager->setFilteringMetadata(CollectionMetadata(cm, ShardId("0")));
-
-        auto& oss = OperationShardingState::get(operationContext());
-        const auto version = cm->getVersion(ShardId("0"));
-        BSONObjBuilder builder;
-        version.appendToCommand(&builder);
-        oss.initializeClientRoutingVersions(kNss, builder.obj());
     }
 
     std::shared_ptr<MetadataManager> _manager;
@@ -141,7 +134,7 @@ TEST_F(CollectionMetadataFilteringTest, FilterDocumentsInTheFuture) {
         AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
         auto* const css = CollectionShardingState::get(operationContext(), kNss);
-        testFn(css->getMetadataForOperation(operationContext()));
+        testFn(css->getMetadata(operationContext()));
     }
 
     {
@@ -172,7 +165,7 @@ TEST_F(CollectionMetadataFilteringTest, FilterDocumentsInThePast) {
         AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
         auto* const css = CollectionShardingState::get(operationContext(), kNss);
-        testFn(css->getMetadataForOperation(operationContext()));
+        testFn(css->getMetadata(operationContext()));
     }
 
     {
@@ -211,7 +204,7 @@ TEST_F(CollectionMetadataFilteringTest, FilterDocumentsTooFarInThePastThrowsStal
         AutoGetCollection autoColl(operationContext(), kNss, MODE_IS);
         auto* const css = CollectionShardingState::get(operationContext(), kNss);
-        testFn(css->getMetadataForOperation(operationContext()));
+        testFn(css->getMetadata(operationContext()));
     }
 
     {
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp
index 6fc514ca2a7..f6f92850408 100644
--- a/src/mongo/db/s/collection_sharding_runtime.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime.cpp
@@ -45,30 +45,6 @@ namespace {
 // How long to wait before starting cleanup of an emigrated chunk range
 MONGO_EXPORT_SERVER_PARAMETER(orphanCleanupDelaySecs, int, 900);  // 900s = 15m
 
-/**
- * Returns whether the specified namespace is used for sharding-internal purposes only and can never
- * be marked as anything other than UNSHARDED, because the call sites which reference these
- * collections are not prepared to handle StaleConfig errors.
- */
-bool isNamespaceAlwaysUnsharded(const NamespaceString& nss) {
-    // There should never be a case to mark as sharded collections which are on the config server
-    if (serverGlobalParams.clusterRole != ClusterRole::ShardServer)
-        return true;
-
-    // Local and admin never have sharded collections
-    if (nss.db() == NamespaceString::kLocalDb || nss.db() == NamespaceString::kAdminDb)
-        return true;
-
-    // Certain config collections can never be sharded
-    if (nss == NamespaceString::kSessionTransactionsTableNamespace)
-        return true;
-
-    if (nss.isSystemDotProfile())
-        return true;
-
-    return false;
-}
-
 }  // namespace
 
 CollectionShardingRuntime::CollectionShardingRuntime(ServiceContext* sc,
@@ -76,11 +52,7 @@ CollectionShardingRuntime::CollectionShardingRuntime(ServiceContext* sc,
                                                      executor::TaskExecutor* rangeDeleterExecutor)
     : CollectionShardingState(nss),
       _nss(std::move(nss)),
-      _metadataManager(std::make_shared<MetadataManager>(sc, _nss, rangeDeleterExecutor)) {
-    if (isNamespaceAlwaysUnsharded(_nss)) {
-        _metadataManager->setFilteringMetadata(CollectionMetadata());
-    }
-}
+      _metadataManager(std::make_shared<MetadataManager>(sc, _nss, rangeDeleterExecutor)) {}
 
 CollectionShardingRuntime* CollectionShardingRuntime::get(OperationContext* opCtx,
                                                           const NamespaceString& nss) {
@@ -90,17 +62,13 @@ CollectionShardingRuntime* CollectionShardingRuntime::get(OperationContext* opCt
 
 void CollectionShardingRuntime::setFilteringMetadata(OperationContext* opCtx,
                                                      CollectionMetadata newMetadata) {
-    invariant(!newMetadata.isSharded() || !isNamespaceAlwaysUnsharded(_nss),
-              str::stream() << "Namespace " << _nss.ns() << " must never be sharded.");
     invariant(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_X));
 
     _metadataManager->setFilteringMetadata(std::move(newMetadata));
 }
 
 void CollectionShardingRuntime::clearFilteringMetadata() {
-    if (!isNamespaceAlwaysUnsharded(_nss)) {
-        _metadataManager->clearFilteringMetadata();
-    }
+    _metadataManager->clearFilteringMetadata();
 }
 
 auto CollectionShardingRuntime::beginReceive(ChunkRange const& range) -> CleanupNotification {
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index f65f262a029..45645383969 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -34,7 +34,6 @@
 
 #include "mongo/db/s/collection_sharding_state.h"
 
-#include "mongo/db/repl/read_concern_args.h"
 #include "mongo/db/s/operation_sharding_state.h"
 #include "mongo/db/s/sharded_connection_info.h"
 #include "mongo/s/stale_exception.h"
@@ -73,9 +72,8 @@ public:
         stdx::lock_guard<stdx::mutex> lg(_mutex);
 
         for (auto& coll : _collections) {
-            const auto optMetadata = coll.second->getCurrentMetadataIfKnown();
-            if (optMetadata) {
-                const auto& metadata = *optMetadata;
+            ScopedCollectionMetadata metadata = coll.second->getMetadata(opCtx);
+            if (metadata->isSharded()) {
                 versionB.appendTimestamp(coll.first, metadata->getShardVersion().toLong());
             }
         }
@@ -111,28 +109,6 @@ private:
 
 const auto kUnshardedCollection = std::make_shared<UnshardedCollection>();
 
-ChunkVersion getOperationReceivedVersion(OperationContext* opCtx, const NamespaceString& nss) {
-    auto& oss = OperationShardingState::get(opCtx);
-
-    // If there is a version attached to the OperationContext, use it as the received version,
-    // otherwise get the received version from the ShardedConnectionInfo
-    if (oss.hasShardVersion()) {
-        return oss.getShardVersion(nss);
-    } else if (auto const info = ShardedConnectionInfo::get(opCtx->getClient(), false)) {
-        auto connectionShardVersion = info->getVersion(nss.ns());
-
-        // For backwards compatibility with map/reduce, which can access up to 2 sharded collections
-        // in a single call, the lack of version for a namespace on the collection must be treated
-        // as UNSHARDED
-        return connectionShardVersion.value_or(ChunkVersion::UNSHARDED());
-    } else {
-        // There is no shard version information on either 'opCtx' or 'client'. This means that the
-        // operation represented by 'opCtx' is unversioned, and the shard version is always OK for
-        // unversioned operations.
-        return ChunkVersion::IGNORED();
-    }
-}
-
 }  // namespace
 
 CollectionShardingState::CollectionShardingState(NamespaceString nss) : _nss(std::move(nss)) {}
@@ -151,49 +127,37 @@ void CollectionShardingState::report(OperationContext* opCtx, BSONObjBuilder* bu
     collectionsMap->report(opCtx, builder);
 }
 
-ScopedCollectionMetadata CollectionShardingState::getMetadataForOperation(OperationContext* opCtx) {
-    const auto receivedShardVersion = getOperationReceivedVersion(opCtx, _nss);
-
-    if (ChunkVersion::isIgnoredVersion(receivedShardVersion)) {
-        return {kUnshardedCollection};
-    }
-
+ScopedCollectionMetadata CollectionShardingState::getMetadata(OperationContext* opCtx) {
     const auto atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
     auto optMetadata = _getMetadata(atClusterTime);
-
     if (!optMetadata)
         return {kUnshardedCollection};
 
-    return {std::move(*optMetadata)};
-}
-
-ScopedCollectionMetadata CollectionShardingState::getCurrentMetadata() {
-    auto optMetadata = _getMetadata(boost::none);
-
-    if (!optMetadata)
-        return {kUnshardedCollection};
-
-    return {std::move(*optMetadata)};
-}
-
-boost::optional<ScopedCollectionMetadata> CollectionShardingState::getCurrentMetadataIfKnown() {
-    return _getMetadata(boost::none);
-}
-
-boost::optional<ChunkVersion> CollectionShardingState::getCurrentShardVersionIfKnown() {
-    const auto optMetadata = _getMetadata(boost::none);
-    if (!optMetadata)
-        return boost::none;
-
-    const auto& metadata = *optMetadata;
-    if (!metadata->isSharded())
-        return ChunkVersion::UNSHARDED();
-
-    return metadata->getCollVersion();
+    return std::move(*optMetadata);
 }
 
 void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx) {
-    const auto receivedShardVersion = getOperationReceivedVersion(opCtx, _nss);
+    auto& oss = OperationShardingState::get(opCtx);
+
+    const auto receivedShardVersion = [&] {
+        // If there is a version attached to the OperationContext, use it as the received version,
+        // otherwise get the received version from the ShardedConnectionInfo
+        if (oss.hasShardVersion()) {
+            return oss.getShardVersion(_nss);
+        } else if (auto const info = ShardedConnectionInfo::get(opCtx->getClient(), false)) {
+            auto connectionShardVersion = info->getVersion(_nss.ns());
+
+            // For backwards compatibility with map/reduce, which can access up to 2 sharded
+            // collections in a single call, the lack of version for a namespace on the collection
+            // must be treated as UNSHARDED
+            return connectionShardVersion.value_or(ChunkVersion::UNSHARDED());
+        } else {
+            // There is no shard version information on either 'opCtx' or 'client'. This means that
+            // the operation represented by 'opCtx' is unversioned, and the shard version is always
+            // OK for unversioned operations.
+            return ChunkVersion::IGNORED();
+        }
+    }();
 
     if (ChunkVersion::isIgnoredVersion(receivedShardVersion)) {
         return;
@@ -203,7 +167,8 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx)
     invariant(repl::ReadConcernArgs::get(opCtx).getLevel() !=
               repl::ReadConcernLevel::kAvailableReadConcern);
 
-    const auto metadata = getMetadataForOperation(opCtx);
+    // Set this for error messaging purposes before potentially returning false.
+    auto metadata = getMetadata(opCtx);
     const auto wantedShardVersion =
         metadata->isSharded() ? metadata->getShardVersion() : ChunkVersion::UNSHARDED();
 
@@ -213,7 +178,6 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx)
     if (criticalSectionSignal) {
         // Set migration critical section on operation sharding state: operation will wait for the
         // migration to finish before returning failure and retrying.
-        auto& oss = OperationShardingState::get(opCtx);
         oss.setMigrationCriticalSectionSignal(criticalSectionSignal);
 
         uasserted(StaleConfigInfo(_nss, receivedShardVersion, wantedShardVersion),
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index e964dbced12..77e11534f73 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -73,28 +73,14 @@ public:
     static void report(OperationContext* opCtx, BSONObjBuilder* builder);
 
     /**
-     * Returns the chunk filtering metadata that the current operation should be using for that
-     * collection or otherwise throws if it has not been loaded yet. If the operation does not
-     * require a specific shard version, returns an UNSHARDED metadata. The returned object is safe
-     * to access outside of collection lock.
+     * Returns the chunk filtering metadata for the collection. The returned object is safe to
+     * access outside of collection lock.
      *
      * If the operation context contains an 'atClusterTime' property, the returned filtering
     * metadata will be tied to a specific point in time. Otherwise it will reference the latest
     * time available.
     */
-    ScopedCollectionMetadata getMetadataForOperation(OperationContext* opCtx);
-    ScopedCollectionMetadata getCurrentMetadata();
-
-    /**
-     * Returns boost::none if the filtering metadata for the collection is not known yet. Otherwise
-     * returns the most recently refreshed from the config server metadata or shard version.
-     *
-     * These methods do not check for the shard version that the operation requires and should only
-     * be used for cases such as checking whether a particular config server update has taken
-     * effect.
-     */
-    boost::optional<ScopedCollectionMetadata> getCurrentMetadataIfKnown();
-    boost::optional<ChunkVersion> getCurrentShardVersionIfKnown();
+    ScopedCollectionMetadata getMetadata(OperationContext* opCtx);
 
     /**
     * Checks whether the shard version in the operation context is compatible with the shard
diff --git a/src/mongo/db/s/collection_sharding_state_test.cpp b/src/mongo/db/s/collection_sharding_state_test.cpp
index a8361bd1490..f5c4a3aad43 100644
--- a/src/mongo/db/s/collection_sharding_state_test.cpp
+++ b/src/mongo/db/s/collection_sharding_state_test.cpp
@@ -33,7 +33,6 @@
 #include "mongo/db/catalog_raii.h"
 #include "mongo/db/s/collection_sharding_runtime.h"
 #include "mongo/db/s/op_observer_sharding_impl.h"
-#include "mongo/db/s/operation_sharding_state.h"
 #include "mongo/db/s/type_shard_identity.h"
 #include "mongo/s/shard_server_test_fixture.h"
 
@@ -59,25 +58,12 @@ CollectionMetadata makeAMetadata(BSONObj const& keyPattern) {
     return CollectionMetadata(std::move(cm), ShardId("this"));
 }
 
-class DeleteStateTest : public ShardServerTestFixture {
-protected:
-    void setCollectionFilteringMetadata(CollectionMetadata metadata) {
-        AutoGetCollection autoColl(operationContext(), kTestNss, MODE_X);
-        auto* const css = CollectionShardingRuntime::get(operationContext(), kTestNss);
-        css->setFilteringMetadata(operationContext(), std::move(metadata));
-
-        auto& oss = OperationShardingState::get(operationContext());
-        const auto version = metadata.getShardVersion();
-        BSONObjBuilder builder;
-        version.appendToCommand(&builder);
-        oss.initializeClientRoutingVersions(kTestNss, builder.obj());
-    }
-};
+using DeleteStateTest = ShardServerTestFixture;
 
 TEST_F(DeleteStateTest, MakeDeleteStateUnsharded) {
-    setCollectionFilteringMetadata(CollectionMetadata());
-
     AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX);
+    auto* const css = CollectionShardingRuntime::get(operationContext(), kTestNss);
+    css->setFilteringMetadata(operationContext(), CollectionMetadata());
 
     auto doc = BSON("key3"
                     << "abc"
@@ -97,10 +83,11 @@ TEST_F(DeleteStateTest, MakeDeleteStateUnsharded) {
 }
 
 TEST_F(DeleteStateTest, MakeDeleteStateShardedWithoutIdInShardKey) {
-    // Push a CollectionMetadata with a shard key not including "_id"...
-    setCollectionFilteringMetadata(makeAMetadata(BSON("key" << 1 << "key3" << 1)));
-
     AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX);
+    auto* const css = CollectionShardingRuntime::get(operationContext(), kTestNss);
+
+    // Push a CollectionMetadata with a shard key not including "_id"...
+    css->setFilteringMetadata(operationContext(), makeAMetadata(BSON("key" << 1 << "key3" << 1)));
 
     // The order of fields in `doc` deliberately does not match the shard key
     auto doc = BSON("key3"
@@ -122,10 +109,12 @@ TEST_F(DeleteStateTest, MakeDeleteStateShardedWithoutIdInShardKey) {
 }
 
 TEST_F(DeleteStateTest, MakeDeleteStateShardedWithIdInShardKey) {
-    // Push a CollectionMetadata with a shard key that does have "_id" in the middle...
-    setCollectionFilteringMetadata(makeAMetadata(BSON("key" << 1 << "_id" << 1 << "key2" << 1)));
-
     AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX);
+    auto* const css = CollectionShardingRuntime::get(operationContext(), kTestNss);
+
+    // Push a CollectionMetadata with a shard key that does have "_id" in the middle...
+    css->setFilteringMetadata(operationContext(),
+                              makeAMetadata(BSON("key" << 1 << "_id" << 1 << "key2" << 1)));
 
     // The order of fields in `doc` deliberately does not match the shard key
     auto doc = BSON("key2" << true << "key3"
@@ -145,11 +134,13 @@ TEST_F(DeleteStateTest, MakeDeleteStateShardedWithIdInShardKey) {
 }
 
 TEST_F(DeleteStateTest, MakeDeleteStateShardedWithIdHashInShardKey) {
-    // Push a CollectionMetadata with a shard key "_id", hashed.
-    setCollectionFilteringMetadata(makeAMetadata(BSON("_id"
-                                                      << "hashed")));
-
     AutoGetCollection autoColl(operationContext(), kTestNss, MODE_IX);
+    auto* const css = CollectionShardingRuntime::get(operationContext(), kTestNss);
+
+    // Push a CollectionMetadata with a shard key "_id", hashed.
+    auto aMetadata = makeAMetadata(BSON("_id"
+                                        << "hashed"));
+    css->setFilteringMetadata(operationContext(), std::move(aMetadata));
 
     auto doc = BSON("key2" << true << "_id"
                            << "hello"
diff --git a/src/mongo/db/s/get_shard_version_command.cpp b/src/mongo/db/s/get_shard_version_command.cpp
index a51ce749549..a67a90efafb 100644
--- a/src/mongo/db/s/get_shard_version_command.cpp
+++ b/src/mongo/db/s/get_shard_version_command.cpp
@@ -111,37 +111,27 @@ public:
         AutoGetCollection autoColl(opCtx, nss, MODE_IS);
         auto* const css = CollectionShardingRuntime::get(opCtx, nss);
 
-        const auto optMetadata = css->getCurrentMetadataIfKnown();
-        if (!optMetadata) {
-            result.append("global", "UNKNOWN");
-
-            if (cmdObj["fullMetadata"].trueValue()) {
-                result.append("metadata", BSONObj());
-            }
+        const auto metadata = css->getMetadata(opCtx);
+        if (metadata->isSharded()) {
+            result.appendTimestamp("global", metadata->getShardVersion().toLong());
         } else {
-            const auto& metadata = *optMetadata;
+            result.appendTimestamp("global", ChunkVersion::UNSHARDED().toLong());
+        }
 
+        if (cmdObj["fullMetadata"].trueValue()) {
+            BSONObjBuilder metadataBuilder(result.subobjStart("metadata"));
             if (metadata->isSharded()) {
-                result.appendTimestamp("global", metadata->getShardVersion().toLong());
-            } else {
-                result.appendTimestamp("global", ChunkVersion::UNSHARDED().toLong());
-            }
-
-            if (cmdObj["fullMetadata"].trueValue()) {
-                BSONObjBuilder metadataBuilder(result.subobjStart("metadata"));
-                if (metadata->isSharded()) {
-                    metadata->toBSONBasic(metadataBuilder);
+                metadata->toBSONBasic(metadataBuilder);
 
-                    BSONArrayBuilder chunksArr(metadataBuilder.subarrayStart("chunks"));
-                    metadata->toBSONChunks(chunksArr);
-                    chunksArr.doneFast();
+                BSONArrayBuilder chunksArr(metadataBuilder.subarrayStart("chunks"));
+                metadata->toBSONChunks(chunksArr);
+                chunksArr.doneFast();
 
-                    BSONArrayBuilder pendingArr(metadataBuilder.subarrayStart("pending"));
-                    css->toBSONPending(pendingArr);
-                    pendingArr.doneFast();
-                }
-                metadataBuilder.doneFast();
+                BSONArrayBuilder pendingArr(metadataBuilder.subarrayStart("pending"));
+                css->toBSONPending(pendingArr);
+                pendingArr.doneFast();
             }
+            metadataBuilder.doneFast();
         }
 
         return true;
diff --git a/src/mongo/db/s/merge_chunks_command.cpp b/src/mongo/db/s/merge_chunks_command.cpp
index 1208f5a8b52..a4d42284520 100644
--- a/src/mongo/db/s/merge_chunks_command.cpp
+++ b/src/mongo/db/s/merge_chunks_command.cpp
@@ -51,46 +51,55 @@
 #include "mongo/util/mongoutils/str.h"
 
 namespace mongo {
+
+using std::string;
+using std::vector;
+using str::stream;
+
 namespace {
 
 bool checkMetadataForSuccess(OperationContext* opCtx,
                              const NamespaceString& nss,
-                             const OID& epoch,
-                             const ChunkRange& chunkRange) {
+                             const BSONObj& minKey,
+                             const BSONObj& maxKey) {
     const auto metadataAfterMerge = [&] {
         AutoGetCollection autoColl(opCtx, nss, MODE_IS);
-        return CollectionShardingState::get(opCtx, nss)->getCurrentMetadata();
+        return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
     }();
 
     uassert(ErrorCodes::StaleEpoch,
-            str::stream() << "Collection " << nss.ns() << " changed since merge start",
-            metadataAfterMerge->getCollVersion().epoch() == epoch);
+            str::stream() << "Collection " << nss.ns() << " became unsharded",
+            metadataAfterMerge->isSharded());
 
     ChunkType chunk;
-    if (!metadataAfterMerge->getNextChunk(chunkRange.getMin(), &chunk)) {
+    if (!metadataAfterMerge->getNextChunk(minKey, &chunk)) {
         return false;
     }
 
-    return chunk.getMin().woCompare(chunkRange.getMin()) == 0 &&
-        chunk.getMax().woCompare(chunkRange.getMax()) == 0;
+    return chunk.getMin().woCompare(minKey) == 0 && chunk.getMax().woCompare(maxKey) == 0;
 }
 
-void mergeChunks(OperationContext* opCtx,
-                 const NamespaceString& nss,
-                 const BSONObj& minKey,
-                 const BSONObj& maxKey,
-                 const OID& epoch) {
-    const std::string whyMessage = str::stream() << "merging chunks in " << nss.ns() << " from "
-                                                 << minKey << " to " << maxKey;
-    auto scopedDistLock = uassertStatusOKWithContext(
-        Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock(
-            opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout),
-        str::stream() << "could not acquire collection lock for " << nss.ns()
-                      << " to merge chunks in ["
-                      << redact(minKey)
-                      << ", "
-                      << redact(maxKey)
-                      << ")");
+Status mergeChunks(OperationContext* opCtx,
+                   const NamespaceString& nss,
+                   const BSONObj& minKey,
+                   const BSONObj& maxKey,
+                   const OID& epoch) {
+    // Get the distributed lock
+    // TODO(SERVER-25086): Remove distLock acquisition from merge chunk
+    const string whyMessage = stream() << "merging chunks in " << nss.ns() << " from " << minKey
+                                       << " to " << maxKey;
+
+    auto scopedDistLock = Grid::get(opCtx)->catalogClient()->getDistLockManager()->lock(
+        opCtx, nss.ns(), whyMessage, DistLockManager::kSingleLockAttemptTimeout);
+
+    if (!scopedDistLock.isOK()) {
+        std::string context = stream() << "could not acquire collection lock for " << nss.ns()
+                                       << " to merge chunks in [" << redact(minKey) << ", "
+                                       << redact(maxKey) << ")";
+
+        warning() << context << causedBy(scopedDistLock.getStatus());
+        return scopedDistLock.getStatus().withContext(context);
+    }
 
     auto const shardingState = ShardingState::get(opCtx);
 
@@ -100,32 +109,39 @@ void mergeChunks(OperationContext* opCtx,
 
     const auto metadata = [&] {
         AutoGetCollection autoColl(opCtx, nss, MODE_IS);
-        return CollectionShardingState::get(opCtx, nss)->getCurrentMetadata();
+        return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
     }();
 
-    uassert(ErrorCodes::StaleEpoch,
-            str::stream() << "Collection " << nss.ns() << " became unsharded",
-            metadata->isSharded());
+    if (!metadata->isSharded()) {
+        std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
+                                      << " is not sharded";
+
+        warning() << errmsg;
+        return {ErrorCodes::StaleEpoch, errmsg};
+    }
 
     const auto shardVersion = metadata->getShardVersion();
-    uassert(ErrorCodes::StaleEpoch,
-            str::stream() << "could not merge chunks, collection " << nss.ns()
-                          << " has changed since merge was sent (sent epoch: "
-                          << epoch.toString()
-                          << ", current epoch: "
-                          << shardVersion.epoch()
-                          << ")",
-            !epoch.isSet() || shardVersion.epoch() == epoch);
-
-    uassert(ErrorCodes::IllegalOperation,
-            str::stream() << "could not merge chunks, the range "
-                          << redact(ChunkRange(minKey, maxKey).toString())
-                          << " is not valid"
-                          << " for collection "
-                          << nss.ns()
-                          << " with key pattern "
-                          << metadata->getKeyPattern().toString(),
-            metadata->isValidKey(minKey) && metadata->isValidKey(maxKey));
+
+    if (epoch.isSet() && shardVersion.epoch() != epoch) {
+        std::string errmsg = stream()
+            << "could not merge chunks, collection " << nss.ns()
+            << " has changed since merge was sent (sent epoch: " << epoch.toString()
+            << ", current epoch: " << shardVersion.epoch() << ")";
+
+        warning() << errmsg;
+        return {ErrorCodes::StaleEpoch, errmsg};
+    }
+
+    if (!metadata->isValidKey(minKey) || !metadata->isValidKey(maxKey)) {
+        std::string errmsg = stream() << "could not merge chunks, the range "
+                                      << redact(ChunkRange(minKey, maxKey).toString())
+                                      << " is not valid"
+                                      << " for collection " << nss.ns() << " with key pattern "
+                                      << metadata->getKeyPattern().toString();
+
+        warning() << errmsg;
+        return Status(ErrorCodes::IllegalOperation, errmsg);
+    }
 
     //
     // Get merged chunk information
@@ -144,15 +160,15 @@ void mergeChunks(OperationContext* opCtx,
         chunksToMerge.push_back(itChunk);
     }
 
-    uassert(ErrorCodes::IllegalOperation,
-            str::stream() << "could not merge chunks, collection " << nss.ns()
-                          << " range starting at "
-                          << redact(minKey)
-                          << " and ending at "
-                          << redact(maxKey)
-                          << " does not belong to shard "
-                          << shardingState->shardId(),
-            !chunksToMerge.empty());
+    if (chunksToMerge.empty()) {
+        std::string errmsg = stream()
+            << "could not merge chunks, collection " << nss.ns() << " range starting at "
+            << redact(minKey) << " and ending at " << redact(maxKey)
+            << " does not belong to shard " << shardingState->shardId();
+
+        warning() << errmsg;
+        return Status(ErrorCodes::IllegalOperation, errmsg);
+    }
 
     //
     // Validate the range starts and ends at chunks and has no holes, error if not valid
    //
@@ -163,56 +179,65 @@ void mergeChunks(OperationContext* opCtx,
 
     // minKey is inclusive
     bool minKeyInRange = rangeContains(firstDocMin, firstDocMax, minKey);
-    uassert(ErrorCodes::IllegalOperation,
-            str::stream() << "could not merge chunks, collection " << nss.ns()
-                          << " range starting at "
-                          << redact(minKey)
-                          << " does not belong to shard "
-                          << shardingState->shardId(),
-            minKeyInRange);
+    if (!minKeyInRange) {
+        std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
+                                      << " range starting at " << redact(minKey)
+                                      << " does not belong to shard " << shardingState->shardId();
+
+        warning() << errmsg;
+        return Status(ErrorCodes::IllegalOperation, errmsg);
+    }
 
     BSONObj lastDocMin = chunksToMerge.back().getMin();
     BSONObj lastDocMax = chunksToMerge.back().getMax();
     // maxKey is exclusive
     bool maxKeyInRange = lastDocMin.woCompare(maxKey) < 0 && lastDocMax.woCompare(maxKey) >= 0;
-    uassert(ErrorCodes::IllegalOperation,
-            str::stream() << "could not merge chunks, collection " << nss.ns()
-                          << " range ending at "
-                          << redact(maxKey)
-                          << " does not belong to shard "
-                          << shardingState->shardId(),
-            maxKeyInRange);
+    if (!maxKeyInRange) {
+        std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
+                                      << " range ending at " << redact(maxKey)
+                                      << " does not belong to shard " << shardingState->shardId();
+
+        warning() << errmsg;
+        return Status(ErrorCodes::IllegalOperation, errmsg);
+    }
 
     bool validRangeStartKey = firstDocMin.woCompare(minKey) == 0;
     bool validRangeEndKey = lastDocMax.woCompare(maxKey) == 0;
-    uassert(ErrorCodes::IllegalOperation,
-            str::stream() << "could not merge chunks, collection " << nss.ns()
-                          << " does not contain a chunk "
-                          << (!validRangeStartKey ? "starting at " + redact(minKey.toString()) : "")
-                          << (!validRangeStartKey && !validRangeEndKey ? " or " : "")
-                          << (!validRangeEndKey ? "ending at " + redact(maxKey.toString()) : ""),
-            validRangeStartKey && validRangeEndKey);
+    if (!validRangeStartKey || !validRangeEndKey) {
+        std::string errmsg = stream()
+            << "could not merge chunks, collection " << nss.ns() << " does not contain a chunk "
+            << (!validRangeStartKey ? "starting at " + redact(minKey.toString()) : "")
+            << (!validRangeStartKey && !validRangeEndKey ? " or " : "")
+            << (!validRangeEndKey ? "ending at " + redact(maxKey.toString()) : "");
+
+        warning() << errmsg;
+        return Status(ErrorCodes::IllegalOperation, errmsg);
+    }
+
+    if (chunksToMerge.size() == 1) {
+        std::string errmsg = stream() << "could not merge chunks, collection " << nss.ns()
+                                      << " already contains chunk for "
+                                      << redact(ChunkRange(minKey, maxKey).toString());
+
+        warning() << errmsg;
+        return Status(ErrorCodes::IllegalOperation, errmsg);
+    }
 
-    uassert(ErrorCodes::IllegalOperation,
-            str::stream() << "could not merge chunks, collection " << nss.ns()
-                          << " already contains chunk for "
-                          << ChunkRange(minKey, maxKey).toString(),
-            chunksToMerge.size() > 1);
 
     // Look for hole in range
     for (size_t i = 1; i < chunksToMerge.size(); ++i) {
-        uassert(
-            ErrorCodes::IllegalOperation,
-            str::stream()
-                << "could not merge chunks, collection "
-                << nss.ns()
-                << " has a hole in the range "
-                << ChunkRange(minKey, maxKey).toString()
-                << " at "
-                << ChunkRange(chunksToMerge[i - 1].getMax(), chunksToMerge[i].getMin()).toString(),
-            chunksToMerge[i - 1].getMax().woCompare(chunksToMerge[i].getMin()) == 0);
+        if (chunksToMerge[i - 1].getMax().woCompare(chunksToMerge[i].getMin()) != 0) {
+            std::string errmsg = stream()
+                << "could not merge chunks, collection " << nss.ns() << " has a hole in the range "
+                << redact(ChunkRange(minKey, maxKey).toString()) << " at "
+                << redact(ChunkRange(chunksToMerge[i - 1].getMax(), chunksToMerge[i].getMin())
+                              .toString());
+
+            warning() << errmsg;
+            return Status(ErrorCodes::IllegalOperation, errmsg);
+        }
     }
 
     //
@@ -226,33 +251,42 @@ void mergeChunks(OperationContext* opCtx,
 
     auto configCmdObj =
         request.toConfigCommandBSON(ShardingCatalogClient::kMajorityWriteConcern.toBSON());
 
-    auto cmdResponse =
-        uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommand(
-            opCtx,
-            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-            "admin",
-            configCmdObj,
-            Shard::RetryPolicy::kIdempotent));
+    auto cmdResponseStatus = Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommand(
+        opCtx,
+        ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+        "admin",
+        configCmdObj,
+        Shard::RetryPolicy::kIdempotent);
 
     // Refresh metadata to pick up new chunk definitions (regardless of the results returned from
     // running _configsvrCommitChunkMerge).
     forceShardFilteringMetadataRefresh(opCtx, nss);
 
+    // If we failed to get any response from the config server at all, despite retries, then we
+    // should just go ahead and fail the whole operation.
+    if (!cmdResponseStatus.isOK()) {
+        return cmdResponseStatus.getStatus();
+    }
 
     // If _configsvrCommitChunkMerge returned an error, look at this shard's metadata to determine
     // if the merge actually did happen. This can happen if there's a network error getting the
     // response from the first call to _configsvrCommitChunkMerge, but it actually succeeds, thus
     // the automatic retry fails with a precondition violation, for example.
- auto commandStatus = std::move(cmdResponse.commandStatus); - auto writeConcernStatus = std::move(cmdResponse.writeConcernStatus); + auto commandStatus = std::move(cmdResponseStatus.getValue().commandStatus); + auto writeConcernStatus = std::move(cmdResponseStatus.getValue().writeConcernStatus); if ((!commandStatus.isOK() || !writeConcernStatus.isOK()) && - checkMetadataForSuccess(opCtx, nss, epoch, ChunkRange(minKey, maxKey))) { + checkMetadataForSuccess(opCtx, nss, minKey, maxKey)) { + LOG(1) << "mergeChunk [" << redact(minKey) << "," << redact(maxKey) << ") has already been committed."; + } else if (!commandStatus.isOK()) { + return commandStatus.withContext("Failed to commit chunk merge"); + } else if (!writeConcernStatus.isOK()) { + return writeConcernStatus.withContext("Failed to commit chunk merge"); } - uassertStatusOKWithContext(commandStatus, "Failed to commit chunk merge"); - uassertStatusOKWithContext(writeConcernStatus, "Failed to commit chunk merge"); + return Status::OK(); } class MergeChunksCommand : public ErrmsgCommandDeprecated { @@ -291,22 +325,22 @@ public: } // Required - static BSONField<std::string> nsField; - static BSONField<std::vector<BSONObj>> boundsField; + static BSONField<string> nsField; + static BSONField<vector<BSONObj>> boundsField; // Optional, if the merge is only valid for a particular epoch static BSONField<OID> epochField; bool errmsgRun(OperationContext* opCtx, - const std::string& dbname, + const string& dbname, const BSONObj& cmdObj, - std::string& errmsg, + string& errmsg, BSONObjBuilder& result) override { uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); const NamespaceString nss(parseNs(dbname, cmdObj)); - std::vector<BSONObj> bounds; + vector<BSONObj> bounds; if (!FieldParser::extract(cmdObj, boundsField, &bounds, &errmsg)) { return false; } @@ -340,14 +374,15 @@ public: return false; } - mergeChunks(opCtx, nss, minKey, maxKey, epoch); + auto mergeStatus = mergeChunks(opCtx, nss, minKey, maxKey, epoch); + uassertStatusOK(mergeStatus); return true; } } mergeChunksCmd; -BSONField<std::string> MergeChunksCommand::nsField("mergeChunks"); -BSONField<std::vector<BSONObj>> MergeChunksCommand::boundsField("bounds"); +BSONField<string> MergeChunksCommand::nsField("mergeChunks"); +BSONField<vector<BSONObj>> MergeChunksCommand::boundsField("bounds"); BSONField<OID> MergeChunksCommand::epochField("epoch"); } // namespace diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 49a350d096b..6bf3f7718b3 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -1161,14 +1161,14 @@ CollectionShardingRuntime::CleanupNotification MigrationDestinationManager::_not AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_X); auto* const css = CollectionShardingRuntime::get(opCtx, _nss); - const auto optMetadata = css->getCurrentMetadataIfKnown(); + + auto metadata = css->getMetadata(opCtx); // This can currently happen because drops aren't synchronized with in-migrations. The idea for // checking this here is that in the future we shouldn't have this problem. 
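The hunks above convert mergeChunks back from the exception-based uassert style to a
Status-returning style that logs and propagates errors explicitly. As a rough sketch
(not code from this patch; doWork and conditionHolds are hypothetical), the two
conventions compare as follows, using the Status, uassert, warning() and str::stream
utilities that appear in this diff:

    // Status style (restored here): log the failure, then hand it to the caller,
    // which must check and propagate the returned Status itself.
    Status doWork(bool conditionHolds) {
        if (!conditionHolds) {
            std::string errmsg = str::stream() << "could not do work";
            warning() << errmsg;
            return {ErrorCodes::IllegalOperation, errmsg};
        }
        return Status::OK();
    }

    // Exception style (being reverted): uassert throws a DBException carrying the
    // same code and message, which the command dispatch layer converts into an
    // error response, so intermediate callers need no explicit checks.
    void doWorkThrowing(bool conditionHolds) {
        uassert(ErrorCodes::IllegalOperation, "could not do work", conditionHolds);
    }

This is also why the command body now ends with uassertStatusOK(mergeStatus): the
boundary between the two styles moves up into MergeChunksCommand::errmsgRun.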
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 49a350d096b..6bf3f7718b3 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -1161,14 +1161,14 @@ CollectionShardingRuntime::CleanupNotification MigrationDestinationManager::_not
    AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_X);

    auto* const css = CollectionShardingRuntime::get(opCtx, _nss);
-    const auto optMetadata = css->getCurrentMetadataIfKnown();
+
+    auto metadata = css->getMetadata(opCtx);

    // This can currently happen because drops aren't synchronized with in-migrations. The idea for
    // checking this here is that in the future we shouldn't have this problem.
-    if (!optMetadata || !(*optMetadata)->isSharded() ||
-        (*optMetadata)->getCollVersion().epoch() != _epoch) {
+    if (!metadata->isSharded() || metadata->getCollVersion().epoch() != _epoch) {
        return Status{ErrorCodes::StaleShardVersion,
-                      str::stream() << "Not marking chunk " << redact(range.toString())
+                      str::stream() << "not noting chunk " << redact(range.toString())
                                    << " as pending because the epoch of " << _nss.ns()
                                    << " changed"};

@@ -1192,14 +1192,14 @@ void MigrationDestinationManager::_forgetPending(OperationContext* opCtx, ChunkR
    UninterruptibleLockGuard noInterrupt(opCtx->lockState());
    AutoGetCollection autoColl(opCtx, _nss, MODE_IX, MODE_X);

    auto* const css = CollectionShardingRuntime::get(opCtx, _nss);
-    const auto optMetadata = css->getCurrentMetadataIfKnown();
+
+    auto metadata = css->getMetadata(opCtx);

    // This can currently happen because drops aren't synchronized with in-migrations. The idea for
    // checking this here is that in the future we shouldn't have this problem.
-    if (!optMetadata || !(*optMetadata)->isSharded() ||
-        (*optMetadata)->getCollVersion().epoch() != _epoch) {
-        LOG(0) << "No need to forget pending chunk " << redact(range.toString())
-               << " because the epoch for " << _nss.ns() << " changed";
+    if (!metadata->isSharded() || metadata->getCollVersion().epoch() != _epoch) {
+        log() << "no need to forget pending chunk " << redact(range.toString())
+              << " because the epoch for " << _nss.ns() << " changed";
        return;
    }
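Both _notePending and _forgetPending above now gate on the same condition. A short
sketch of that epoch guard (the helper name epochStillMatches is hypothetical; the
calls are the ones shown in this diff): a collection drop and recreate produces a
new epoch, so pending-chunk bookkeeping recorded against the old incarnation must be
abandoned rather than applied to the new one.

    bool epochStillMatches(const ScopedCollectionMetadata& metadata, const OID& expectedEpoch) {
        // Unsharded metadata carries no epoch to compare, so it never matches.
        return metadata->isSharded() && metadata->getCollVersion().epoch() == expectedEpoch;
    }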
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 15ae251d65f..57937d9c5fe 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -112,6 +112,19 @@ void refreshRecipientRoutingTable(OperationContext* opCtx,
    executor->scheduleRemoteCommand(request, noOp).getStatus().ignore();
 }

+Status checkCollectionEpochMatches(const ScopedCollectionMetadata& metadata, OID expectedEpoch) {
+    if (metadata->isSharded() && metadata->getCollVersion().epoch() == expectedEpoch)
+        return Status::OK();
+
+    return {ErrorCodes::IncompatibleShardingMetadata,
+            str::stream() << "The collection was dropped or recreated since the migration began. "
+                          << "Expected collection epoch: "
+                          << expectedEpoch.toString()
+                          << ", but found: "
+                          << (metadata->isSharded() ? metadata->getCollVersion().epoch().toString()
+                                                    : "unsharded collection.")};
+}
+
 }  // namespace

 MONGO_FAIL_POINT_DEFINE(doNotRefreshRecipientAfterCommit);
@@ -157,15 +170,9 @@ MigrationSourceManager::MigrationSourceManager(OperationContext* opCtx,
        collectionUUID = autoColl.getCollection()->uuid().value();
    }

-    auto optMetadata =
-        CollectionShardingState::get(opCtx, getNss())->getCurrentMetadataIfKnown();
-    uassert(ErrorCodes::ConflictingOperationInProgress,
-            "The collection's sharding state was cleared by a concurrent operation",
-            optMetadata);
-
-    auto& metadata = *optMetadata;
+    auto metadata = CollectionShardingState::get(opCtx, getNss())->getMetadata(opCtx);
    uassert(ErrorCodes::IncompatibleShardingMetadata,
-            "Cannot move chunks for an unsharded collection",
+            str::stream() << "cannot move chunks for an unsharded collection",
            metadata->isSharded());

    return std::make_tuple(std::move(metadata), std::move(collectionUUID));
@@ -234,7 +241,14 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) {
    {
        // Register for notifications from the replication subsystem
-        const auto metadata = _getCurrentMetadataAndCheckEpoch(opCtx);
+        UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+        AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
+        auto* const css = CollectionShardingRuntime::get(opCtx, getNss());
+
+        const auto metadata = css->getMetadata(opCtx);
+        Status status = checkCollectionEpochMatches(metadata, _collectionEpoch);
+        if (!status.isOK())
+            return status;

        // Having the metadata manager registered on the collection sharding state is what indicates
        // that a chunk on that collection is being migrated. With an active migration, write
@@ -243,9 +257,6 @@ Status MigrationSourceManager::startClone(OperationContext* opCtx) {
        _cloneDriver = stdx::make_unique<MigrationChunkClonerSourceLegacy>(
            _args, metadata->getKeyPattern(), _donorConnStr, _recipientHost);

-        UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-        AutoGetCollection autoColl(opCtx, getNss(), MODE_IX, MODE_X);
-        auto* const css = CollectionShardingRuntime::get(opCtx, getNss());
        invariant(nullptr == std::exchange(msmForCsr(css), this));
        _state = kCloning;
    }
@@ -285,7 +296,19 @@ Status MigrationSourceManager::enterCriticalSection(OperationContext* opCtx) {
    _stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());
    _cloneAndCommitTimer.reset();

-    _notifyChangeStreamsOnRecipientFirstChunk(opCtx, _getCurrentMetadataAndCheckEpoch(opCtx));
+    {
+        const auto metadata = [&] {
+            UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+            AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
+            return CollectionShardingState::get(opCtx, _args.getNss())->getMetadata(opCtx);
+        }();
+
+        Status status = checkCollectionEpochMatches(metadata, _collectionEpoch);
+        if (!status.isOK())
+            return status;
+
+        _notifyChangeStreamsOnRecipientFirstChunk(opCtx, metadata);
+    }

    // Mark the shard as running critical operation, which requires recovery on crash.
    //
@@ -361,7 +384,15 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
    BSONObjBuilder builder;

    {
-        const auto metadata = _getCurrentMetadataAndCheckEpoch(opCtx);
+        const auto metadata = [&] {
+            UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+            AutoGetCollection autoColl(opCtx, _args.getNss(), MODE_IS);
+            return CollectionShardingState::get(opCtx, _args.getNss())->getMetadata(opCtx);
+        }();
+
+        Status status = checkCollectionEpochMatches(metadata, _collectionEpoch);
+        if (!status.isOK())
+            return status;

        boost::optional<ChunkType> controlChunkType = boost::none;
        ChunkType differentChunk;
@@ -504,7 +535,18 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
                          << "' after commit failed");
    }

-    const auto refreshedMetadata = _getCurrentMetadataAndCheckEpoch(opCtx);
+    auto refreshedMetadata = [&] {
+        UninterruptibleLockGuard noInterrupt(opCtx->lockState());
+        AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
+        return CollectionShardingState::get(opCtx, getNss())->getMetadata(opCtx);
+    }();
+
+    if (!refreshedMetadata->isSharded()) {
+        return {ErrorCodes::NamespaceNotSharded,
+                str::stream() << "Chunk move failed because collection '" << getNss().ns()
+                              << "' is no longer sharded. The migration commit error was: "
+                              << migrationCommitStatus.toString()};
+    }

    if (refreshedMetadata->keyBelongsToMe(_args.getMinKey())) {
        // This condition may only happen if the migration commit has failed for any reason
@@ -529,8 +571,8 @@
    }

    // Migration succeeded
-    LOG(0) << "Migration succeeded and updated collection version to "
-           << refreshedMetadata->getCollVersion();
+    log() << "Migration succeeded and updated collection version to "
+          << refreshedMetadata->getCollVersion();

    MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeLeavingCriticalSection);

@@ -619,32 +661,6 @@ void MigrationSourceManager::cleanupOnError(OperationContext* opCtx) {
    }
 }

-ScopedCollectionMetadata MigrationSourceManager::_getCurrentMetadataAndCheckEpoch(
-    OperationContext* opCtx) {
-    auto metadata = [&] {
-        UninterruptibleLockGuard noInterrupt(opCtx->lockState());
-        AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
-        auto* const css = CollectionShardingRuntime::get(opCtx, getNss());
-
-        const auto optMetadata = css->getCurrentMetadataIfKnown();
-        uassert(ErrorCodes::ConflictingOperationInProgress,
-                "The collection's sharding state was cleared by a concurrent operation",
-                optMetadata);
-        return *optMetadata;
-    }();
-
-    uassert(ErrorCodes::ConflictingOperationInProgress,
-            str::stream() << "The collection was dropped or recreated since the migration began. "
-                          << "Expected collection epoch: "
-                          << _collectionEpoch.toString()
-                          << ", but found: "
-                          << (metadata->isSharded() ? metadata->getCollVersion().epoch().toString()
-                                                    : "unsharded collection."),
-            metadata->isSharded() && metadata->getCollVersion().epoch() == _collectionEpoch);
-
-    return metadata;
-}
-
 void MigrationSourceManager::_notifyChangeStreamsOnRecipientFirstChunk(
    OperationContext* opCtx, const ScopedCollectionMetadata& metadata) {
    // If this is not the first donation, there is nothing to be done
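With the _getCurrentMetadataAndCheckEpoch helper removed, migration_source_manager.cpp
now repeats one idiom at every step. A condensed sketch (assuming the types shown in
this diff; nss and expectedEpoch stand in for the member fields): an immediately-invoked
lambda holds the collection lock only long enough to copy out the metadata snapshot,
and the epoch check runs after the lock is released, which works because
ScopedCollectionMetadata keeps the snapshot alive on its own.

    const auto metadata = [&] {
        UninterruptibleLockGuard noInterrupt(opCtx->lockState());
        AutoGetCollection autoColl(opCtx, nss, MODE_IS);
        return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
    }();  // collection lock released here; the snapshot remains valid

    Status status = checkCollectionEpochMatches(metadata, expectedEpoch);
    if (!status.isOK())
        return status;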
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index eaaacdf0590..c053b73736e 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -185,8 +185,6 @@ private:
    // comments explaining the various state transitions.
    enum State { kCreated, kCloning, kCloneCaughtUp, kCriticalSection, kCloneCompleted, kDone };

-    ScopedCollectionMetadata _getCurrentMetadataAndCheckEpoch(OperationContext* opCtx);
-
    /**
     * If this donation moves the first chunk to the recipient (i.e., the recipient didn't have any
     * chunks), this function writes a no-op message to the oplog, so that change stream will notice
diff --git a/src/mongo/db/s/set_shard_version_command.cpp b/src/mongo/db/s/set_shard_version_command.cpp
index 2d99d322752..99a7a0b3bb6 100644
--- a/src/mongo/db/s/set_shard_version_command.cpp
+++ b/src/mongo/db/s/set_shard_version_command.cpp
@@ -232,12 +232,11 @@ public:
            boost::optional<Lock::CollectionLock> collLock;
            collLock.emplace(opCtx->lockState(), nss.ns(), MODE_IS);

-            auto* const css = CollectionShardingState::get(opCtx, nss);
+            auto const css = CollectionShardingState::get(opCtx, nss);
            const ChunkVersion collectionShardVersion = [&] {
-                auto optMetadata = css->getCurrentMetadataIfKnown();
-                return (optMetadata && (*optMetadata)->isSharded())
-                    ? (*optMetadata)->getShardVersion()
-                    : ChunkVersion::UNSHARDED();
+                auto metadata = css->getMetadata(opCtx);
+                return metadata->isSharded() ? metadata->getShardVersion()
+                                             : ChunkVersion::UNSHARDED();
            }();

            if (requestedVersion.isWriteCompatibleWith(collectionShardVersion)) {
@@ -351,13 +350,11 @@ public:
        {
            AutoGetCollection autoColl(opCtx, nss, MODE_IS);

-            const ChunkVersion currVersion = [&] {
-                auto* const css = CollectionShardingState::get(opCtx, nss);
-                auto optMetadata = css->getCurrentMetadataIfKnown();
-                return (optMetadata && (*optMetadata)->isSharded())
-                    ? (*optMetadata)->getShardVersion()
-                    : ChunkVersion::UNSHARDED();
-            }();
+            ChunkVersion currVersion = ChunkVersion::UNSHARDED();
+            auto metadata = CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
+            if (metadata->isSharded()) {
+                currVersion = metadata->getShardVersion();
+            }

            if (!status.isOK()) {
                // The reload itself was interrupted or confused here
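Both set_shard_version hunks above restore the same derivation: getMetadata() always
yields a usable metadata object, so "collection not sharded" maps to the UNSHARDED
sentinel version rather than to a separate "unknown" state. A minimal sketch of the
idiom, using only calls that appear in this diff:

    const ChunkVersion collectionShardVersion = [&] {
        auto metadata = CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
        // Sharded: report the real shard version; unsharded: report the sentinel.
        return metadata->isSharded() ? metadata->getShardVersion() : ChunkVersion::UNSHARDED();
    }();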
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
index fbcc31a0557..1fb93a6213f 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
@@ -74,16 +74,19 @@ void onShardVersionMismatch(OperationContext* opCtx,
    const auto currentShardVersion = [&] {
        AutoGetCollection autoColl(opCtx, nss, MODE_IS);
-        return CollectionShardingState::get(opCtx, nss)->getCurrentShardVersionIfKnown();
+        const auto currentMetadata = CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
+        if (currentMetadata->isSharded()) {
+            return currentMetadata->getShardVersion();
+        }
+
+        return ChunkVersion::UNSHARDED();
    }();

-    if (currentShardVersion) {
-        if (currentShardVersion->epoch() == shardVersionReceived.epoch() &&
-            currentShardVersion->majorVersion() >= shardVersionReceived.majorVersion()) {
-            // Don't need to remotely reload if we're in the same epoch and the requested version is
-            // smaller than the one we know about. This means that the remote side is behind.
-            return;
-        }
+    if (currentShardVersion.epoch() == shardVersionReceived.epoch() &&
+        currentShardVersion.majorVersion() >= shardVersionReceived.majorVersion()) {
+        // Don't need to remotely reload if we're in the same epoch and the requested version is
+        // smaller than the one we know about. This means that the remote side is behind.
+        return;
    }

    forceShardFilteringMetadataRefresh(opCtx, nss, forceRefreshFromThisThread);
@@ -133,69 +136,58 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx,
    invariant(!opCtx->lockState()->isLocked());
    invariant(!opCtx->getClient()->isInDirectClient());

-    auto* const shardingState = ShardingState::get(opCtx);
+    auto const shardingState = ShardingState::get(opCtx);
    invariant(shardingState->canAcceptShardedCommands());

-    auto routingInfo =
+    const auto routingInfo =
        uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfoWithRefresh(
            opCtx, nss, forceRefreshFromThisThread));
-    auto cm = routingInfo.cm();
+    const auto cm = routingInfo.cm();

    if (!cm) {
        // No chunk manager, so unsharded.

        // Exclusive collection lock needed since we're now changing the metadata
        AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
-        CollectionShardingRuntime::get(opCtx, nss)
-            ->setFilteringMetadata(opCtx, CollectionMetadata());
+
+        auto* const css = CollectionShardingRuntime::get(opCtx, nss);
+        css->setFilteringMetadata(opCtx, CollectionMetadata());

        return ChunkVersion::UNSHARDED();
    }

-    // Optimistic check with only IS lock in order to avoid threads piling up on the collection X
-    // lock below
    {
        AutoGetCollection autoColl(opCtx, nss, MODE_IS);
-        auto optMetadata = CollectionShardingState::get(opCtx, nss)->getCurrentMetadataIfKnown();
+        auto metadata = CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);

        // We already have newer version
-        if (optMetadata) {
-            const auto& metadata = *optMetadata;
-            if (metadata->isSharded() &&
-                metadata->getCollVersion().epoch() == cm->getVersion().epoch() &&
-                metadata->getCollVersion() >= cm->getVersion()) {
-                LOG(1) << "Skipping refresh of metadata for " << nss << " "
-                       << metadata->getCollVersion() << " with an older " << cm->getVersion();
-                return metadata->getShardVersion();
-            }
+        if (metadata->isSharded() &&
+            metadata->getCollVersion().epoch() == cm->getVersion().epoch() &&
+            metadata->getCollVersion() >= cm->getVersion()) {
+            LOG(1) << "Skipping refresh of metadata for " << nss << " "
+                   << metadata->getCollVersion() << " with an older " << cm->getVersion();
+            return metadata->getShardVersion();
        }
    }

    // Exclusive collection lock needed since we're now changing the metadata
    AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
+
    auto* const css = CollectionShardingRuntime::get(opCtx, nss);

-    {
-        auto optMetadata = CollectionShardingState::get(opCtx, nss)->getCurrentMetadataIfKnown();
+    auto metadata = css->getMetadata(opCtx);

-        // We already have newer version
-        if (optMetadata) {
-            const auto& metadata = *optMetadata;
-            if (metadata->isSharded() &&
-                metadata->getCollVersion().epoch() == cm->getVersion().epoch() &&
-                metadata->getCollVersion() >= cm->getVersion()) {
-                LOG(1) << "Skipping refresh of metadata for " << nss << " "
-                       << metadata->getCollVersion() << " with an older " << cm->getVersion();
-                return metadata->getShardVersion();
-            }
-        }
+    // We already have newer version
+    if (metadata->isSharded() && metadata->getCollVersion().epoch() == cm->getVersion().epoch() &&
+        metadata->getCollVersion() >= cm->getVersion()) {
+        LOG(1) << "Skipping refresh of metadata for " << nss << " " << metadata->getCollVersion()
+               << " with an older " << cm->getVersion();
+        return metadata->getShardVersion();
    }

-    CollectionMetadata metadata(std::move(cm), shardingState->shardId());
-    const auto newShardVersion = metadata.getShardVersion();
+    css->setFilteringMetadata(opCtx, CollectionMetadata(cm, shardingState->shardId()));

-    css->setFilteringMetadata(opCtx, std::move(metadata));
-    return newShardVersion;
+    return css->getMetadata(opCtx)->getShardVersion();
 }

 Status onDbVersionMismatchNoExcept(
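Even after the revert, forceShardFilteringMetadataRefresh above keeps a double-checked
shape: read the metadata under the shared IS lock first, take the exclusive X lock
only when an install looks necessary, and re-check after the lock upgrade because
another thread may have refreshed in between. A condensed sketch of that locking
pattern (isUpToDate and installNewMetadata are hypothetical stand-ins for the version
comparison and the setFilteringMetadata call):

    {
        AutoGetCollection autoColl(opCtx, nss, MODE_IS);  // cheap shared lock
        if (isUpToDate(CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx)))
            return;  // fast path: most callers never touch the exclusive lock
    }

    AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);  // exclusive lock
    if (isUpToDate(CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx)))
        return;  // lost the race: another thread installed newer metadata first

    installNewMetadata();  // still holding MODE_X, so installs cannot interleave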
diff --git a/src/mongo/db/s/shard_server_op_observer.cpp b/src/mongo/db/s/shard_server_op_observer.cpp
index 3078fd59396..622225a9977 100644
--- a/src/mongo/db/s/shard_server_op_observer.cpp
+++ b/src/mongo/db/s/shard_server_op_observer.cpp
@@ -206,8 +206,8 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx,
                                      std::vector<InsertStatement>::const_iterator begin,
                                      std::vector<InsertStatement>::const_iterator end,
                                      bool fromMigrate) {
-    auto* const css = CollectionShardingState::get(opCtx, nss);
-    const auto metadata = css->getMetadataForOperation(opCtx);
+    auto const css = CollectionShardingState::get(opCtx, nss);
+    const auto metadata = css->getMetadata(opCtx);

    for (auto it = begin; it != end; ++it) {
        const auto& insertedDoc = it->doc;
@@ -232,8 +232,8 @@ void ShardServerOpObserver::onInserts(OperationContext* opCtx,
 }

 void ShardServerOpObserver::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& args) {
-    auto* const css = CollectionShardingState::get(opCtx, args.nss);
-    const auto metadata = css->getMetadataForOperation(opCtx);
+    auto const css = CollectionShardingState::get(opCtx, args.nss);
+    const auto metadata = css->getMetadata(opCtx);

    if (args.nss == NamespaceString::kShardConfigCollectionsNamespace) {
        // Notification of routing table changes are only needed on secondaries
diff --git a/src/mongo/db/s/split_chunk.cpp b/src/mongo/db/s/split_chunk.cpp
index d3c0a6908db..f82f8f56bf2 100644
--- a/src/mongo/db/s/split_chunk.cpp
+++ b/src/mongo/db/s/split_chunk.cpp
@@ -96,17 +96,16 @@ bool checkIfSingleDoc(OperationContext* opCtx,
 */
 bool checkMetadataForSuccessfulSplitChunk(OperationContext* opCtx,
                                          const NamespaceString& nss,
-                                          const OID& epoch,
                                          const ChunkRange& chunkRange,
                                          const std::vector<BSONObj>& splitKeys) {
    const auto metadataAfterSplit = [&] {
        AutoGetCollection autoColl(opCtx, nss, MODE_IS);
-        return CollectionShardingState::get(opCtx, nss)->getCurrentMetadata();
+        return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
    }();

    uassert(ErrorCodes::StaleEpoch,
-            str::stream() << "Collection " << nss.ns() << " changed since split start",
-            metadataAfterSplit->getCollVersion().epoch() == epoch);
+            str::stream() << "Collection " << nss.ns() << " became unsharded",
+            metadataAfterSplit->isSharded());

    auto newChunkBounds(splitKeys);
    auto startKey = chunkRange.getMin();
@@ -209,8 +208,7 @@ StatusWith<boost::optional<ChunkRange>> splitChunk(OperationContext* opCtx,
    if (!commandStatus.isOK() || !writeConcernStatus.isOK()) {
        forceShardFilteringMetadataRefresh(opCtx, nss);

-        if (checkMetadataForSuccessfulSplitChunk(
-                opCtx, nss, expectedCollectionEpoch, chunkRange, splitKeys)) {
+        if (checkMetadataForSuccessfulSplitChunk(opCtx, nss, chunkRange, splitKeys)) {
            // Split was committed.
        } else if (!commandStatus.isOK()) {
            return commandStatus;
diff --git a/src/mongo/s/stale_exception.cpp b/src/mongo/s/stale_exception.cpp
index 42fc136e1b4..67e8c7b8ba8 100644
--- a/src/mongo/s/stale_exception.cpp
+++ b/src/mongo/s/stale_exception.cpp
@@ -44,7 +44,7 @@ MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(StaleDbRoutingVersion);
 boost::optional<ChunkVersion> extractOptionalVersion(const BSONObj& obj, StringData field) {
    auto swChunkVersion = ChunkVersion::parseLegacyWithField(obj, field);
    if (swChunkVersion == ErrorCodes::NoSuchKey)
-        return boost::none;
+        return ChunkVersion::UNSHARDED();

    return uassertStatusOK(std::move(swChunkVersion));
 }
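Finally, the stale_exception.cpp change folds the "no version field present" case into
the same convention: absence of a legacy shard-version field is reported as UNSHARDED
rather than as boost::none. A brief annotated restatement of that parse (the
surrounding function is shown in the hunk above; only the comments are added here):

    auto swChunkVersion = ChunkVersion::parseLegacyWithField(obj, field);
    if (swChunkVersion == ErrorCodes::NoSuchKey)   // StatusWith compares directly against codes
        return ChunkVersion::UNSHARDED();          // missing field now means "unsharded"
    return uassertStatusOK(std::move(swChunkVersion));  // any other parse error still throws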