diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2020-02-26 12:31:41 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-26 19:46:00 +0000 |
commit | fcd2dd41189fffc6e67a8645b99974178f87ca04 (patch) | |
tree | 5a8db019a31e378e4ffa9f1b3650130c1670184d | |
parent | 4903af6f8e927b64762e5903e026b60c111385e0 (diff) | |
download | mongo-fcd2dd41189fffc6e67a8645b99974178f87ca04.tar.gz |
SERVER-45599 Backport of SERVER-39495: Move ShardingState::needCollectionMetadata under OperationShardingState
ShardingState logically contains answers to questions about whether the
current instance is node in a sharded cluster, whereas
OperationShardingState is responsible for the 'shardedness' of the
commands.
This is a partial cherry-pick from b049257fbd1d215388cffaf7544f6741dbce5b45, adapted for the 4.0 branch.
Also backports the addition of more testing for multi:true/justOne:false updates and ChangeStreams, which was taken from commit 50f6bd4d6a9428a6f1df22db792d7b55d773762c.
-rw-r--r-- | jstests/change_streams/change_stream.js | 40 | ||||
-rw-r--r-- | jstests/sharding/change_streams.js | 423 | ||||
-rw-r--r-- | src/mongo/db/catalog/capped_utils.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/catalog/create_collection.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/catalog/rename_collection.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/collection_to_capped.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/query/find.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.h | 2 | ||||
-rw-r--r-- | src/mongo/db/query/query_planner_params.h | 9 | ||||
-rw-r--r-- | src/mongo/db/s/cleanup_orphaned_cmd.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/operation_sharding_state.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/s/operation_sharding_state.h | 7 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.h | 6 | ||||
-rw-r--r-- | src/mongo/s/commands/commands_public.cpp | 3 |
18 files changed, 351 insertions, 262 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index 7c5688b0704..9f41255c599 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -132,6 +132,27 @@ }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + jsTestLog("Testing multi:true update"); + assert.writeOK(db.t1.insert({_id: 4, a: 0, b: 1})); + assert.writeOK(db.t1.insert({_id: 5, a: 0, b: 1})); + cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + assert.writeOK(db.t1.update({a: 0}, {$set: {b: 2}}, {multi: true})); + expected = [ + { + documentKey: {_id: 4}, + ns: {db: "test", coll: "t1"}, + operationType: "update", + updateDescription: {removedFields: [], updatedFields: {b: 2}} + }, + { + documentKey: {_id: 5}, + ns: {db: "test", coll: "t1"}, + operationType: "update", + updateDescription: {removedFields: [], updatedFields: {b: 2}} + } + ]; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + jsTestLog("Testing delete"); cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.remove({_id: 1})); @@ -142,6 +163,25 @@ }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + jsTestLog("Testing justOne:false delete"); + assert.writeOK(db.t1.insert({_id: 6, a: 1, b: 1})); + assert.writeOK(db.t1.insert({_id: 7, a: 1, b: 1})); + cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + assert.writeOK(db.t1.remove({a: 1}, {justOne: false})); + expected = [ + { + documentKey: {_id: 6}, + ns: {db: "test", coll: "t1"}, + operationType: "delete", + }, + { + documentKey: {_id: 7}, + ns: {db: "test", coll: "t1"}, + operationType: "delete", + } + ]; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + jsTestLog("Testing intervening write on another collection"); cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); let t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js index cd13f86678c..53c22bfb723 100644 --- a/jstests/sharding/change_streams.js +++ b/jstests/sharding/change_streams.js @@ -3,10 +3,9 @@ (function() { "use strict"; + load('jstests/aggregation/extras/utils.js'); // For assertErrorCode() + load('jstests/libs/change_stream_util.js'); load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest. - load('jstests/aggregation/extras/utils.js'); // For assertErrorCode(). - - // For supportsMajorityReadConcern(). load("jstests/multiVersion/libs/causal_consistency_helpers.js"); if (!supportsMajorityReadConcern()) { @@ -14,187 +13,241 @@ return; } - const st = new ShardingTest({ - shards: 2, - rs: { - nodes: 1, - enableMajorityReadConcern: '', - // Use a higher frequency for periodic noops to speed up the test. - setParameter: {periodicNoopIntervalSecs: 1} + function runTest(collName, shardKey) { + const st = new ShardingTest({ + shards: 2, + rs: { + nodes: 1, + enableMajorityReadConcern: '', + // Intentionally disable the periodic no-op writer in order to allow the test have + // control of advancing the cluster time. For when it is enabled later in the test, + // use a higher frequency for periodic noops to speed up the test. + setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: false} + } + }); + + const mongosDB = st.s0.getDB(jsTestName()); + assert.commandWorked(st.s0.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName); + + const mongosColl = mongosDB[collName]; + + // + // Sanity tests + // + + // Test that $sort and $group are banned from running in a $changeStream pipeline. + assertErrorCode(mongosDB.NegativeTest, + [{$changeStream: {}}, {$sort: {operationType: 1}}], + ErrorCodes.IllegalOperation); + assertErrorCode(mongosDB.NegativeTest, + [{$changeStream: {}}, {$group: {_id: '$documentKey'}}], + ErrorCodes.IllegalOperation); + + // Test that using change streams with any stages not allowed to run on mongos results in an + // error. + assertErrorCode( + mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation); + + // + // Main tests + // + + function makeShardKey(value) { + var obj = {}; + obj[shardKey] = value; + return obj; + } + + function makeShardKeyDocument(value, optExtraFields) { + var obj = {}; + if (shardKey !== '_id') + obj['_id'] = value; + obj[shardKey] = value; + return Object.assign(obj, optExtraFields); } - }); - - const mongosDB = st.s0.getDB(jsTestName()); - const mongosColl = mongosDB[jsTestName()]; - - assert.commandWorked(mongosDB.dropDatabase()); - - // Enable sharding on the test DB and ensure its primary is st.shard0.shardName. - assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); - st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); - - // Shard the test collection on _id. - assert.commandWorked( - mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); - - // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey). - assert.commandWorked( - mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); - - // Move the [0, MaxKey) chunk to st.shard1.shardName. - assert.commandWorked(mongosDB.adminCommand( - {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()})); - - // Write a document to each chunk. - assert.writeOK(mongosColl.insert({_id: -1})); - assert.writeOK(mongosColl.insert({_id: 1})); - - let changeStream = - mongosColl.aggregate([{$changeStream: {}}, {$project: {_id: 0, clusterTime: 0}}]); - - // Test that a change stream can see inserts on shard 0. - assert.writeOK(mongosColl.insert({_id: 1000})); - assert.writeOK(mongosColl.insert({_id: -1000})); - - assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert"); - assert.docEq(changeStream.next(), { - documentKey: {_id: 1000}, - fullDocument: {_id: 1000}, - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }); - - // Now do another write to shard 0, advancing that shard's clock and enabling the stream to - // return the earlier write to shard 1. - assert.writeOK(mongosColl.insert({_id: 1001})); - - assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert"); - assert.docEq(changeStream.next(), { - documentKey: {_id: -1000}, - fullDocument: {_id: -1000}, - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }); - - // Test that all changes are eventually visible due to the periodic noop writer. - assert.commandWorked( - st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true})); - assert.commandWorked( - st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true})); - assert.soon(() => changeStream.hasNext()); - - assert.docEq(changeStream.next(), { - documentKey: {_id: 1001}, - fullDocument: {_id: 1001}, - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }); - changeStream.close(); - - // Test that using change streams with any stages not allowed to run on mongos results in an - // error. - assertErrorCode( - mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation); - - // Test that it is legal to open a change stream, even if the - // 'internalQueryProhibitMergingOnMongos' parameter is set. - assert.commandWorked( - mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true})); - let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}])); - tempCursor.close(); - assert.commandWorked( - mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false})); - - // Test that $sort and $group are banned from running in a $changeStream pipeline. - assertErrorCode(mongosColl, - [{$changeStream: {}}, {$sort: {operationType: 1}}], - ErrorCodes.IllegalOperation); - assertErrorCode(mongosColl, - [{$changeStream: {}}, {$group: {_id: "$documentKey"}}], - ErrorCodes.IllegalOperation); - - assert.writeOK(mongosColl.remove({})); - // We awaited the replication of the first write, so the change stream shouldn't return it. - // Use { w: "majority" } to deal with journaling correctly, even though we only have one node. - assert.writeOK(mongosColl.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}})); - - changeStream = mongosColl.aggregate([{$changeStream: {}}, {$project: {"_id.clusterTime": 0}}]); - assert(!changeStream.hasNext()); - - // Drop the collection and test that we return a "drop" followed by an "invalidate" entry and - // close the cursor. - jsTestLog("Testing getMore command closes cursor for invalidate entries"); - mongosColl.drop(); - // Wait for the drop to actually happen. - assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase( - mongosColl.getDB(), mongosColl.getName())); - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "drop"); - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "invalidate"); - assert(changeStream.isExhausted()); - - jsTestLog("Testing aggregate command closes cursor for invalidate entries"); - // Shard the test collection on _id. - assert.commandWorked( - mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); - - // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey). - assert.commandWorked( - mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); - // Move the [0, MaxKey) chunk to st.shard1.shardName. - assert.commandWorked(mongosDB.adminCommand( - {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()})); - - // Write one document to each chunk. - assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}})); - - changeStream = mongosColl.aggregate([{$changeStream: {}}]); - assert(!changeStream.hasNext()); - - // Store a valid resume token before dropping the collection, to be used later in the test. - assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}})); - - assert.soon(() => changeStream.hasNext()); - const resumeToken = changeStream.next()._id; - - mongosColl.drop(); - - assert.soon(() => changeStream.hasNext()); - let next = changeStream.next(); - assert.eq(next.operationType, "insert"); - assert.eq(next.documentKey._id, 2); - - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "drop"); - - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "invalidate"); - - // With an explicit collation, test that we can resume from before the collection drop. - changeStream = mongosColl.watch([{$project: {_id: 0}}], - {resumeAfter: resumeToken, collation: {locale: "simple"}}); - - assert.soon(() => changeStream.hasNext()); - next = changeStream.next(); - assert.eq(next.operationType, "insert"); - assert.eq(next.documentKey, {_id: 2}); - - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "drop"); - - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "invalidate"); - - // Without an explicit collation, test that we *cannot* resume from before the collection drop. - assert.commandFailedWithCode(mongosDB.runCommand({ - aggregate: mongosColl.getName(), - pipeline: [{$changeStream: {resumeAfter: resumeToken}}], - cursor: {} - }), - ErrorCodes.InvalidResumeToken); - - st.stop(); + + jsTestLog('Testing change streams with shard key ' + shardKey); + // Shard the test collection and split it into 2 chunks: + // [MinKey, 0) - shard0, [0, MaxKey) - shard1 + st.shardColl(mongosColl, + makeShardKey(1) /* shard key */, + makeShardKey(0) /* split at */, + makeShardKey(1) /* move to shard 1 */); + + // Write a document to each chunk. + assert.writeOK(mongosColl.insert(makeShardKeyDocument(-1))); + assert.writeOK(mongosColl.insert(makeShardKeyDocument(1))); + + let changeStream = mongosColl.aggregate([{$changeStream: {}}]); + + // Test that a change stream can see inserts on shard 0. + assert.writeOK(mongosColl.insert(makeShardKeyDocument(1000))); + assert.writeOK(mongosColl.insert(makeShardKeyDocument(-1000))); + + assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert"); + assertChangeStreamEventEq(changeStream.next(), { + documentKey: makeShardKeyDocument(1000), + fullDocument: makeShardKeyDocument(1000), + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + + // Because the periodic noop writer is disabled, do another write to shard 0 in order to + // advance that shard's clock and enabling the stream to return the earlier write to shard 1 + assert.writeOK(mongosColl.insert(makeShardKeyDocument(1001))); + + assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert"); + assertChangeStreamEventEq(changeStream.next(), { + documentKey: makeShardKeyDocument(-1000), + fullDocument: makeShardKeyDocument(-1000), + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + + // Test that all changes are eventually visible due to the periodic noop writer. + assert.commandWorked( + st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true})); + assert.commandWorked( + st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true})); + + assert.soon(() => changeStream.hasNext()); + assertChangeStreamEventEq(changeStream.next(), { + documentKey: makeShardKeyDocument(1001), + fullDocument: makeShardKeyDocument(1001), + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + changeStream.close(); + + jsTestLog('Testing multi-update change streams with shard key ' + shardKey); + assert.writeOK(mongosColl.insert(makeShardKeyDocument(10, {a: 0, b: 0}))); + assert.writeOK(mongosColl.insert(makeShardKeyDocument(-10, {a: 0, b: 0}))); + changeStream = mongosColl.aggregate([{$changeStream: {}}]); + + assert.writeOK(mongosColl.update({a: 0}, {$set: {b: 2}}, {multi: true})); + + assert.soon(() => changeStream.hasNext()); + assertChangeStreamEventEq(changeStream.next(), { + operationType: "update", + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + documentKey: makeShardKeyDocument(-10), + updateDescription: {updatedFields: {b: 2}, removedFields: []}, + }); + + assert.soon(() => changeStream.hasNext()); + assertChangeStreamEventEq(changeStream.next(), { + operationType: "update", + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + documentKey: makeShardKeyDocument(10), + updateDescription: {updatedFields: {b: 2}, removedFields: []}, + }); + changeStream.close(); + + // Test that it is legal to open a change stream, even if the + // 'internalQueryProhibitMergingOnMongos' parameter is set. + assert.commandWorked( + st.s0.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true})); + let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}])); + tempCursor.close(); + assert.commandWorked( + st.s0.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false})); + + assert.writeOK(mongosColl.remove({})); + // We awaited the replication of the first write, so the change stream shouldn't return it. + // Use { w: "majority" } to deal with journaling correctly, even though we only have one + // node. + assert.writeOK( + mongosColl.insert(makeShardKeyDocument(0, {a: 1}), {writeConcern: {w: "majority"}})); + + changeStream = mongosColl.aggregate([{$changeStream: {}}]); + assert(!changeStream.hasNext()); + + // Drop the collection and test that we return a "drop" followed by an "invalidate" entry + // and close the cursor. + jsTestLog('Testing getMore command closes cursor for invalidate entries with shard key' + + shardKey); + mongosColl.drop(); + // Wait for the drop to actually happen. + assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase( + mongosColl.getDB(), mongosColl.getName())); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "drop"); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "invalidate"); + assert(changeStream.isExhausted()); + + jsTestLog('Testing aggregate command closes cursor for invalidate entries with shard key' + + shardKey); + // Shard the test collection and split it into 2 chunks: + // [MinKey, 0) - shard0, [0, MaxKey) - shard1 + st.shardColl(mongosColl, + makeShardKey(1) /* shard key */, + makeShardKey(0) /* split at */, + makeShardKey(1) /* move to shard 1 */); + + // Write one document to each chunk. + assert.writeOK( + mongosColl.insert(makeShardKeyDocument(-1), {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert(makeShardKeyDocument(1), {writeConcern: {w: "majority"}})); + + changeStream = mongosColl.aggregate([{$changeStream: {}}]); + assert(!changeStream.hasNext()); + + // Store a valid resume token before dropping the collection, to be used later in the test + assert.writeOK( + mongosColl.insert(makeShardKeyDocument(-2), {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert(makeShardKeyDocument(2), {writeConcern: {w: "majority"}})); + + assert.soon(() => changeStream.hasNext()); + const resumeToken = changeStream.next()._id; + + mongosColl.drop(); + + assert.soon(() => changeStream.hasNext()); + assertChangeStreamEventEq(changeStream.next(), { + documentKey: makeShardKeyDocument(2), + fullDocument: makeShardKeyDocument(2), + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "drop"); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "invalidate"); + + // With an explicit collation, test that we can resume from before the collection drop + changeStream = + mongosColl.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}}); + + assert.soon(() => changeStream.hasNext()); + assertChangeStreamEventEq(changeStream.next(), { + documentKey: makeShardKeyDocument(2), + fullDocument: makeShardKeyDocument(2), + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "drop"); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "invalidate"); + + // Without an explicit collation, test that we *cannot* resume from before the collection + // drop + assert.commandFailedWithCode(mongosDB.runCommand({ + aggregate: mongosColl.getName(), + pipeline: [{$changeStream: {resumeAfter: resumeToken}}], + cursor: {} + }), + ErrorCodes.InvalidResumeToken); + + st.stop(); + } + + runTest('with_id_shard_key', '_id'); + runTest('with_non_id_shard_key', 'non_id'); })(); diff --git a/src/mongo/db/catalog/capped_utils.cpp b/src/mongo/db/catalog/capped_utils.cpp index 376f879e6f1..df880bd6f4c 100644 --- a/src/mongo/db/catalog/capped_utils.cpp +++ b/src/mongo/db/catalog/capped_utils.cpp @@ -53,7 +53,9 @@ #include "mongo/db/service_context.h" #include "mongo/util/scopeguard.h" -mongo::Status mongo::emptyCapped(OperationContext* opCtx, const NamespaceString& collectionName) { +namespace mongo { + +Status emptyCapped(OperationContext* opCtx, const NamespaceString& collectionName) { AutoGetDb autoDb(opCtx, collectionName.db(), MODE_X); bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && @@ -103,20 +105,20 @@ mongo::Status mongo::emptyCapped(OperationContext* opCtx, const NamespaceString& return status; } - getGlobalServiceContext()->getOpObserver()->onEmptyCapped( - opCtx, collection->ns(), collection->uuid()); + const auto service = opCtx->getServiceContext(); + service->getOpObserver()->onEmptyCapped(opCtx, collection->ns(), collection->uuid()); wuow.commit(); return Status::OK(); } -mongo::Status mongo::cloneCollectionAsCapped(OperationContext* opCtx, - Database* db, - const std::string& shortFrom, - const std::string& shortTo, - long long size, - bool temp) { +Status cloneCollectionAsCapped(OperationContext* opCtx, + Database* db, + const std::string& shortFrom, + const std::string& shortTo, + long long size, + bool temp) { NamespaceString fromNss(db->name(), shortFrom); NamespaceString toNss(db->name(), shortTo); @@ -251,9 +253,9 @@ mongo::Status mongo::cloneCollectionAsCapped(OperationContext* opCtx, MONGO_UNREACHABLE; } -mongo::Status mongo::convertToCapped(OperationContext* opCtx, - const NamespaceString& collectionName, - long long size) { +Status convertToCapped(OperationContext* opCtx, + const NamespaceString& collectionName, + long long size) { StringData dbname = collectionName.db(); StringData shortSource = collectionName.coll(); @@ -301,3 +303,5 @@ mongo::Status mongo::convertToCapped(OperationContext* opCtx, options.stayTemp = false; return renameCollection(opCtx, longTmpName, collectionName, options); } + +} // namespace mongo diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp index a9bf373f38a..20ed630bae4 100644 --- a/src/mongo/db/catalog/create_collection.cpp +++ b/src/mongo/db/catalog/create_collection.cpp @@ -51,6 +51,7 @@ namespace mongo { namespace { + /** * Shared part of the implementation of the createCollection versions for replicated and regular * collection creation. @@ -99,8 +100,10 @@ Status createCollection(OperationContext* opCtx, return writeConflictRetry(opCtx, "create", nss.ns(), [&] { Lock::DBLock dbXLock(opCtx, nss.db(), MODE_X); + const bool shardVersionCheck = true; OldClientContext ctx(opCtx, nss.ns(), shardVersionCheck); + if (opCtx->writesAreReplicated() && !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)) { return Status(ErrorCodes::NotMaster, diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp index 6aadfe754b4..3d5b681edd4 100644 --- a/src/mongo/db/catalog/rename_collection.cpp +++ b/src/mongo/db/catalog/rename_collection.cpp @@ -144,7 +144,8 @@ Status renameCollectionCommon(OperationContext* opCtx, // We stay in source context the whole time. This is mostly to set the CurOp namespace. boost::optional<OldClientContext> ctx; - ctx.emplace(opCtx, source.ns()); + const bool shardVersionCheck = true; + ctx.emplace(opCtx, source.ns(), shardVersionCheck); auto replCoord = repl::ReplicationCoordinator::get(opCtx); bool userInitiatedWritesAndNotPrimary = diff --git a/src/mongo/db/commands/collection_to_capped.cpp b/src/mongo/db/commands/collection_to_capped.cpp index a7b3530d0e9..18635222f58 100644 --- a/src/mongo/db/commands/collection_to_capped.cpp +++ b/src/mongo/db/commands/collection_to_capped.cpp @@ -46,10 +46,7 @@ #include "mongo/db/service_context.h" namespace mongo { - -using std::unique_ptr; -using std::string; -using std::stringstream; +namespace { class CmdCloneCollectionAsCapped : public ErrmsgCommandDeprecated { public: @@ -87,9 +84,9 @@ public: out->push_back(Privilege(ResourcePattern::forExactNamespace(nss), targetActions)); } bool errmsgRun(OperationContext* opCtx, - const string& dbname, + const std::string& dbname, const BSONObj& jsobj, - string& errmsg, + std::string& errmsg, BSONObjBuilder& result) { const auto fromElt = jsobj["cloneCollectionAsCapped"]; const auto toElt = jsobj["toCollection"]; @@ -140,13 +137,13 @@ public: uassertStatusOK(status); return true; } + } cmdCloneCollectionAsCapped; -/* jan2010: - Converts the given collection to a capped collection w/ the specified size. - This command is not highly used, and is not currently supported with sharded - environments. - */ +/** + * Converts the given collection to a capped collection w/ the specified size. This command is not + * highly used, and is not currently supported with sharded environments. + */ class CmdConvertToCapped : public ErrmsgCommandDeprecated { public: CmdConvertToCapped() : ErrmsgCommandDeprecated("convertToCapped") {} @@ -168,9 +165,9 @@ public: } bool errmsgRun(OperationContext* opCtx, - const string& dbname, + const std::string& dbname, const BSONObj& jsobj, - string& errmsg, + std::string& errmsg, BSONObjBuilder& result) { const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbname, jsobj)); long long size = jsobj.getField("size").safeNumberLong(); @@ -185,4 +182,6 @@ public: } } cmdConvertToCapped; -} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 6811bb958da..4ce8c38e5f2 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -201,7 +201,7 @@ public: Collection* const collection = ctx->getCollection(); // We have a parsed query. Time to get the execution plan for it. - auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq)); + auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, std::move(cq)); if (!statusWithPlanExecutor.isOK()) { return statusWithPlanExecutor.getStatus(); } @@ -312,7 +312,7 @@ public: Collection* const collection = ctx->getCollection(); // Get the execution plan for the query. - auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq)); + auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, std::move(cq)); uassertStatusOK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 5b368e81af1..5291d665598 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -72,6 +72,7 @@ #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/s/collection_sharding_state.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/db/session_catalog.h" @@ -154,7 +155,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx } // If we're in a sharded environment, we need to filter out documents we don't own. - if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) { + if (OperationShardingState::isOperationVersioned(opCtx)) { auto shardFilterStage = stdx::make_unique<ShardFilterStage>( opCtx, CollectionShardingState::get(opCtx, collection->ns())->getOrphansFilter(opCtx), @@ -217,7 +218,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe return {cq.getStatus()}; } - return getExecutorFind(opCtx, collection, nss, std::move(cq.getValue()), plannerOpts); + return getExecutorFind(opCtx, collection, std::move(cq.getValue()), plannerOpts); } BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) { diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index ae14be74be9..c8308a0f331 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -560,8 +560,8 @@ std::string runQuery(OperationContext* opCtx, repl::ReplicationCoordinator::get(opCtx)->checkCanServeReadsFor(opCtx, nss, slaveOK)); } - // We have a parsed query. Time to get the execution plan for it. - auto exec = uassertStatusOK(getExecutorLegacyFind(opCtx, collection, nss, std::move(cq))); + // Get the execution plan for the query. + auto exec = uassertStatusOK(getExecutorLegacyFind(opCtx, collection, std::move(cq))); const QueryRequest& qr = exec->getCanonicalQuery()->getQueryRequest(); diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index ea89990bd2c..c00ab569f65 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -76,9 +76,8 @@ #include "mongo/db/query/stage_builder.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/sharding_state.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" @@ -674,7 +673,6 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack( StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind( OperationContext* opCtx, Collection* collection, - const NamespaceString& nss, unique_ptr<CanonicalQuery> canonicalQuery, PlanExecutor::YieldPolicy yieldPolicy, size_t plannerOptions) { @@ -687,7 +685,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind( return getOplogStartHack(opCtx, collection, std::move(canonicalQuery), plannerOptions); } - if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) { + if (OperationShardingState::isOperationVersioned(opCtx)) { plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER; } return getExecutor(opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions); @@ -698,25 +696,22 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind( StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( OperationContext* opCtx, Collection* collection, - const NamespaceString& nss, unique_ptr<CanonicalQuery> canonicalQuery, size_t plannerOptions) { - auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); + const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); auto yieldPolicy = readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern ? PlanExecutor::INTERRUPT_ONLY : PlanExecutor::YIELD_AUTO; return _getExecutorFind( - opCtx, collection, nss, std::move(canonicalQuery), yieldPolicy, plannerOptions); + opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions); } StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorLegacyFind( OperationContext* opCtx, Collection* collection, - const NamespaceString& nss, std::unique_ptr<CanonicalQuery> canonicalQuery) { return _getExecutorFind(opCtx, collection, - nss, std::move(canonicalQuery), PlanExecutor::YIELD_AUTO, QueryPlannerParams::DEFAULT); @@ -1368,7 +1363,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorCount( } size_t plannerOptions = QueryPlannerParams::IS_COUNT; - if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, request.getNs().ns())) { + if (OperationShardingState::isOperationVersioned(opCtx)) { plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER; } diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h index f8abcc16c13..26b7513f251 100644 --- a/src/mongo/db/query/get_executor.h +++ b/src/mongo/db/query/get_executor.h @@ -99,7 +99,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor( StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( OperationContext* opCtx, Collection* collection, - const NamespaceString& nss, std::unique_ptr<CanonicalQuery> canonicalQuery, size_t plannerOptions = QueryPlannerParams::DEFAULT); @@ -109,7 +108,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorLegacyFind( OperationContext* opCtx, Collection* collection, - const NamespaceString& nss, std::unique_ptr<CanonicalQuery> canonicalQuery); /** diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h index 9e4e8fa4f0c..cdac2cf1307 100644 --- a/src/mongo/db/query/query_planner_params.h +++ b/src/mongo/db/query/query_planner_params.h @@ -59,10 +59,11 @@ struct QueryPlannerParams { // Set this if you're running on a sharded cluster. We'll add a "drop all docs that // shouldn't be on this shard" stage before projection. // - // In order to set this, you must check - // ShardingState::needCollectionMetadata(current_namespace) in the same lock that you use to - // build the query executor. You must also wrap the PlanExecutor in a ClientCursor within - // the same lock. See the comment on ShardFilterStage for details. + // In order to set this, you must check OperationShardingState::isOperationVersioned() in + // the same lock that you use to build the query executor. You must also wrap the + // PlanExecutor in a ClientCursor within the same lock. + // + // See the comment on ShardFilterStage for details. INCLUDE_SHARD_FILTER = 1 << 2, // Set this if you don't want any plans with a blocking sort stage. All sorts must be diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp index 8bb1c5546e0..c1676b79b18 100644 --- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp +++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp @@ -81,13 +81,7 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx, { AutoGetCollection autoColl(opCtx, ns, MODE_IX); auto* const css = CollectionShardingRuntime::get(opCtx, ns); - const auto optMetadata = css->getCurrentMetadataIfKnown(); - uassert(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Unable to establish sharding status for collection " << ns.ns(), - optMetadata); - - const auto& metadata = *optMetadata; - + const auto metadata = css->getCurrentMetadata(); if (!metadata->isSharded()) { LOG(0) << "skipping orphaned data cleanup for " << ns.ns() << ", collection is not sharded"; diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp index 2c588be9ece..0600d1f6216 100644 --- a/src/mongo/db/s/operation_sharding_state.cpp +++ b/src/mongo/db/s/operation_sharding_state.cpp @@ -33,9 +33,9 @@ #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/operation_context.h" +#include "mongo/db/s/sharded_connection_info.h" namespace mongo { - namespace { const OperationContext::Decoration<OperationShardingState> shardingMetadataDecoration = @@ -49,6 +49,7 @@ const Milliseconds kMaxWaitForMovePrimaryCriticalSection = Minutes(5); // The name of the field in which the client attaches its database version. constexpr auto kDbVersionField = "databaseVersion"_sd; + } // namespace OperationShardingState::OperationShardingState() = default; @@ -61,6 +62,14 @@ OperationShardingState& OperationShardingState::get(OperationContext* opCtx) { return shardingMetadataDecoration(opCtx); } +bool OperationShardingState::isOperationVersioned(OperationContext* opCtx) { + const auto client = opCtx->getClient(); + + // Shard version information received from mongos may either be attached to the Client or + // directly to the OperationContext + return ShardedConnectionInfo::get(client, false) || get(opCtx).hasShardVersion(); +} + void OperationShardingState::setAllowImplicitCollectionCreation( const BSONElement& allowImplicitCollectionCreationElem) { if (!allowImplicitCollectionCreationElem.eoo()) { diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h index d2cf419f8e6..51914410a3e 100644 --- a/src/mongo/db/s/operation_sharding_state.h +++ b/src/mongo/db/s/operation_sharding_state.h @@ -64,6 +64,13 @@ public: static OperationShardingState& get(OperationContext* opCtx); /** + * Returns true if the the current operation was sent by the caller with shard version + * information attached, meaning that it must perform shard version checking and orphan + * filtering. + */ + static bool isOperationVersioned(OperationContext* opCtx); + + /** * Requests on a sharded collection that are broadcast without a shardVersion should not cause * the collection to be created on a shard that does not know about the collection already, * since the collection options will not be propagated. Such requests specify to disallow diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index f5e41c73f20..331c52ca2d2 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -34,8 +34,7 @@ #include "mongo/db/s/sharding_state.h" -#include "mongo/db/s/operation_sharding_state.h" -#include "mongo/db/s/sharded_connection_info.h" +#include "mongo/db/operation_context.h" #include "mongo/db/server_options.h" #include "mongo/util/log.h" @@ -117,18 +116,6 @@ OID ShardingState::clusterId() { return _clusterId; } -bool ShardingState::needCollectionMetadata(OperationContext* opCtx, const std::string& ns) { - if (!enabled()) - return false; - - Client* client = opCtx->getClient(); - - // Shard version information received from mongos may either by attached to the Client or - // directly to the OperationContext. - return ShardedConnectionInfo::get(client, false) || - OperationShardingState::get(opCtx).hasShardVersion(); -} - void ShardingState::clearForTests() { _initializationState.store(static_cast<uint32_t>(InitializationState::kNew)); } diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index fb220c9aaea..e049d0fcb3b 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -106,12 +106,6 @@ public: OID clusterId(); /** - * Returns true if this node is a shard and if the currently runnint operation must engage the - * sharding subsystem (i.e., perform version checking, orphan filtering, etc). - */ - bool needCollectionMetadata(OperationContext* opCtx, const std::string& ns); - - /** * For testing only. This is a workaround for the fact that it is not possible to get a clean * ServiceContext in between test executions. Because of this, tests which require that they get * started with a clean (uninitialized) ShardingState must invoke this in their tearDown method. diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index a5eb613114c..19f0a97c962 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -366,6 +366,9 @@ public: const NamespaceString nss(parseNs(dbName, cmdObj)); const auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); + uassert(ErrorCodes::IllegalOperation, + "You can't convertToCapped a sharded collection", + !routingInfo.cm()); // convertToCapped creates a temp collection and renames it at the end. It will require // special handling for create collection. |