diff options
-rw-r--r-- | jstests/change_streams/change_stream.js | 40 | ||||
-rw-r--r-- | jstests/sharding/change_streams.js | 423 | ||||
-rw-r--r-- | src/mongo/db/catalog/capped_utils.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/catalog/create_collection.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/catalog/rename_collection.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/collection_to_capped.cpp | 27 | ||||
-rw-r--r-- | src/mongo/db/commands/find_cmd.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/query/find.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/query/get_executor.h | 2 | ||||
-rw-r--r-- | src/mongo/db/query/query_planner_params.h | 9 | ||||
-rw-r--r-- | src/mongo/db/s/cleanup_orphaned_cmd.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/s/operation_sharding_state.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/s/operation_sharding_state.h | 7 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_state.h | 6 | ||||
-rw-r--r-- | src/mongo/s/commands/commands_public.cpp | 3 |
18 files changed, 262 insertions, 351 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index 9f41255c599..7c5688b0704 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -132,27 +132,6 @@ }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); - jsTestLog("Testing multi:true update"); - assert.writeOK(db.t1.insert({_id: 4, a: 0, b: 1})); - assert.writeOK(db.t1.insert({_id: 5, a: 0, b: 1})); - cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); - assert.writeOK(db.t1.update({a: 0}, {$set: {b: 2}}, {multi: true})); - expected = [ - { - documentKey: {_id: 4}, - ns: {db: "test", coll: "t1"}, - operationType: "update", - updateDescription: {removedFields: [], updatedFields: {b: 2}} - }, - { - documentKey: {_id: 5}, - ns: {db: "test", coll: "t1"}, - operationType: "update", - updateDescription: {removedFields: [], updatedFields: {b: 2}} - } - ]; - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); - jsTestLog("Testing delete"); cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.remove({_id: 1})); @@ -163,25 +142,6 @@ }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); - jsTestLog("Testing justOne:false delete"); - assert.writeOK(db.t1.insert({_id: 6, a: 1, b: 1})); - assert.writeOK(db.t1.insert({_id: 7, a: 1, b: 1})); - cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); - assert.writeOK(db.t1.remove({a: 1}, {justOne: false})); - expected = [ - { - documentKey: {_id: 6}, - ns: {db: "test", coll: "t1"}, - operationType: "delete", - }, - { - documentKey: {_id: 7}, - ns: {db: "test", coll: "t1"}, - operationType: "delete", - } - ]; - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); - jsTestLog("Testing intervening write on another collection"); cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); let t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js index 53c22bfb723..cd13f86678c 100644 --- a/jstests/sharding/change_streams.js +++ b/jstests/sharding/change_streams.js @@ -3,9 +3,10 @@ (function() { "use strict"; - load('jstests/aggregation/extras/utils.js'); // For assertErrorCode() - load('jstests/libs/change_stream_util.js'); load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest. + load('jstests/aggregation/extras/utils.js'); // For assertErrorCode(). + + // For supportsMajorityReadConcern(). load("jstests/multiVersion/libs/causal_consistency_helpers.js"); if (!supportsMajorityReadConcern()) { @@ -13,241 +14,187 @@ return; } - function runTest(collName, shardKey) { - const st = new ShardingTest({ - shards: 2, - rs: { - nodes: 1, - enableMajorityReadConcern: '', - // Intentionally disable the periodic no-op writer in order to allow the test have - // control of advancing the cluster time. For when it is enabled later in the test, - // use a higher frequency for periodic noops to speed up the test. - setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: false} - } - }); - - const mongosDB = st.s0.getDB(jsTestName()); - assert.commandWorked(st.s0.adminCommand({enableSharding: mongosDB.getName()})); - st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName); - - const mongosColl = mongosDB[collName]; - - // - // Sanity tests - // - - // Test that $sort and $group are banned from running in a $changeStream pipeline. - assertErrorCode(mongosDB.NegativeTest, - [{$changeStream: {}}, {$sort: {operationType: 1}}], - ErrorCodes.IllegalOperation); - assertErrorCode(mongosDB.NegativeTest, - [{$changeStream: {}}, {$group: {_id: '$documentKey'}}], - ErrorCodes.IllegalOperation); - - // Test that using change streams with any stages not allowed to run on mongos results in an - // error. - assertErrorCode( - mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation); - - // - // Main tests - // - - function makeShardKey(value) { - var obj = {}; - obj[shardKey] = value; - return obj; - } - - function makeShardKeyDocument(value, optExtraFields) { - var obj = {}; - if (shardKey !== '_id') - obj['_id'] = value; - obj[shardKey] = value; - return Object.assign(obj, optExtraFields); + const st = new ShardingTest({ + shards: 2, + rs: { + nodes: 1, + enableMajorityReadConcern: '', + // Use a higher frequency for periodic noops to speed up the test. + setParameter: {periodicNoopIntervalSecs: 1} } - - jsTestLog('Testing change streams with shard key ' + shardKey); - // Shard the test collection and split it into 2 chunks: - // [MinKey, 0) - shard0, [0, MaxKey) - shard1 - st.shardColl(mongosColl, - makeShardKey(1) /* shard key */, - makeShardKey(0) /* split at */, - makeShardKey(1) /* move to shard 1 */); - - // Write a document to each chunk. - assert.writeOK(mongosColl.insert(makeShardKeyDocument(-1))); - assert.writeOK(mongosColl.insert(makeShardKeyDocument(1))); - - let changeStream = mongosColl.aggregate([{$changeStream: {}}]); - - // Test that a change stream can see inserts on shard 0. - assert.writeOK(mongosColl.insert(makeShardKeyDocument(1000))); - assert.writeOK(mongosColl.insert(makeShardKeyDocument(-1000))); - - assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert"); - assertChangeStreamEventEq(changeStream.next(), { - documentKey: makeShardKeyDocument(1000), - fullDocument: makeShardKeyDocument(1000), - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }); - - // Because the periodic noop writer is disabled, do another write to shard 0 in order to - // advance that shard's clock and enabling the stream to return the earlier write to shard 1 - assert.writeOK(mongosColl.insert(makeShardKeyDocument(1001))); - - assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert"); - assertChangeStreamEventEq(changeStream.next(), { - documentKey: makeShardKeyDocument(-1000), - fullDocument: makeShardKeyDocument(-1000), - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }); - - // Test that all changes are eventually visible due to the periodic noop writer. - assert.commandWorked( - st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true})); - assert.commandWorked( - st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true})); - - assert.soon(() => changeStream.hasNext()); - assertChangeStreamEventEq(changeStream.next(), { - documentKey: makeShardKeyDocument(1001), - fullDocument: makeShardKeyDocument(1001), - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }); - changeStream.close(); - - jsTestLog('Testing multi-update change streams with shard key ' + shardKey); - assert.writeOK(mongosColl.insert(makeShardKeyDocument(10, {a: 0, b: 0}))); - assert.writeOK(mongosColl.insert(makeShardKeyDocument(-10, {a: 0, b: 0}))); - changeStream = mongosColl.aggregate([{$changeStream: {}}]); - - assert.writeOK(mongosColl.update({a: 0}, {$set: {b: 2}}, {multi: true})); - - assert.soon(() => changeStream.hasNext()); - assertChangeStreamEventEq(changeStream.next(), { - operationType: "update", - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - documentKey: makeShardKeyDocument(-10), - updateDescription: {updatedFields: {b: 2}, removedFields: []}, - }); - - assert.soon(() => changeStream.hasNext()); - assertChangeStreamEventEq(changeStream.next(), { - operationType: "update", - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - documentKey: makeShardKeyDocument(10), - updateDescription: {updatedFields: {b: 2}, removedFields: []}, - }); - changeStream.close(); - - // Test that it is legal to open a change stream, even if the - // 'internalQueryProhibitMergingOnMongos' parameter is set. - assert.commandWorked( - st.s0.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true})); - let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}])); - tempCursor.close(); - assert.commandWorked( - st.s0.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false})); - - assert.writeOK(mongosColl.remove({})); - // We awaited the replication of the first write, so the change stream shouldn't return it. - // Use { w: "majority" } to deal with journaling correctly, even though we only have one - // node. - assert.writeOK( - mongosColl.insert(makeShardKeyDocument(0, {a: 1}), {writeConcern: {w: "majority"}})); - - changeStream = mongosColl.aggregate([{$changeStream: {}}]); - assert(!changeStream.hasNext()); - - // Drop the collection and test that we return a "drop" followed by an "invalidate" entry - // and close the cursor. - jsTestLog('Testing getMore command closes cursor for invalidate entries with shard key' + - shardKey); - mongosColl.drop(); - // Wait for the drop to actually happen. - assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase( - mongosColl.getDB(), mongosColl.getName())); - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "drop"); - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "invalidate"); - assert(changeStream.isExhausted()); - - jsTestLog('Testing aggregate command closes cursor for invalidate entries with shard key' + - shardKey); - // Shard the test collection and split it into 2 chunks: - // [MinKey, 0) - shard0, [0, MaxKey) - shard1 - st.shardColl(mongosColl, - makeShardKey(1) /* shard key */, - makeShardKey(0) /* split at */, - makeShardKey(1) /* move to shard 1 */); - - // Write one document to each chunk. - assert.writeOK( - mongosColl.insert(makeShardKeyDocument(-1), {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert(makeShardKeyDocument(1), {writeConcern: {w: "majority"}})); - - changeStream = mongosColl.aggregate([{$changeStream: {}}]); - assert(!changeStream.hasNext()); - - // Store a valid resume token before dropping the collection, to be used later in the test - assert.writeOK( - mongosColl.insert(makeShardKeyDocument(-2), {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert(makeShardKeyDocument(2), {writeConcern: {w: "majority"}})); - - assert.soon(() => changeStream.hasNext()); - const resumeToken = changeStream.next()._id; - - mongosColl.drop(); - - assert.soon(() => changeStream.hasNext()); - assertChangeStreamEventEq(changeStream.next(), { - documentKey: makeShardKeyDocument(2), - fullDocument: makeShardKeyDocument(2), - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }); - - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "drop"); - - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "invalidate"); - - // With an explicit collation, test that we can resume from before the collection drop - changeStream = - mongosColl.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}}); - - assert.soon(() => changeStream.hasNext()); - assertChangeStreamEventEq(changeStream.next(), { - documentKey: makeShardKeyDocument(2), - fullDocument: makeShardKeyDocument(2), - ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, - operationType: "insert", - }); - - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "drop"); - - assert.soon(() => changeStream.hasNext()); - assert.eq(changeStream.next().operationType, "invalidate"); - - // Without an explicit collation, test that we *cannot* resume from before the collection - // drop - assert.commandFailedWithCode(mongosDB.runCommand({ - aggregate: mongosColl.getName(), - pipeline: [{$changeStream: {resumeAfter: resumeToken}}], - cursor: {} - }), - ErrorCodes.InvalidResumeToken); - - st.stop(); - } - - runTest('with_id_shard_key', '_id'); - runTest('with_non_id_shard_key', 'non_id'); + }); + + const mongosDB = st.s0.getDB(jsTestName()); + const mongosColl = mongosDB[jsTestName()]; + + assert.commandWorked(mongosDB.dropDatabase()); + + // Enable sharding on the test DB and ensure its primary is st.shard0.shardName. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); + + // Shard the test collection on _id. + assert.commandWorked( + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey). + assert.commandWorked( + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); + + // Move the [0, MaxKey) chunk to st.shard1.shardName. + assert.commandWorked(mongosDB.adminCommand( + {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()})); + + // Write a document to each chunk. + assert.writeOK(mongosColl.insert({_id: -1})); + assert.writeOK(mongosColl.insert({_id: 1})); + + let changeStream = + mongosColl.aggregate([{$changeStream: {}}, {$project: {_id: 0, clusterTime: 0}}]); + + // Test that a change stream can see inserts on shard 0. + assert.writeOK(mongosColl.insert({_id: 1000})); + assert.writeOK(mongosColl.insert({_id: -1000})); + + assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert"); + assert.docEq(changeStream.next(), { + documentKey: {_id: 1000}, + fullDocument: {_id: 1000}, + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + + // Now do another write to shard 0, advancing that shard's clock and enabling the stream to + // return the earlier write to shard 1. + assert.writeOK(mongosColl.insert({_id: 1001})); + + assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert"); + assert.docEq(changeStream.next(), { + documentKey: {_id: -1000}, + fullDocument: {_id: -1000}, + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + + // Test that all changes are eventually visible due to the periodic noop writer. + assert.commandWorked( + st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true})); + assert.commandWorked( + st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true})); + assert.soon(() => changeStream.hasNext()); + + assert.docEq(changeStream.next(), { + documentKey: {_id: 1001}, + fullDocument: {_id: 1001}, + ns: {db: mongosDB.getName(), coll: mongosColl.getName()}, + operationType: "insert", + }); + changeStream.close(); + + // Test that using change streams with any stages not allowed to run on mongos results in an + // error. + assertErrorCode( + mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation); + + // Test that it is legal to open a change stream, even if the + // 'internalQueryProhibitMergingOnMongos' parameter is set. + assert.commandWorked( + mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true})); + let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}])); + tempCursor.close(); + assert.commandWorked( + mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false})); + + // Test that $sort and $group are banned from running in a $changeStream pipeline. + assertErrorCode(mongosColl, + [{$changeStream: {}}, {$sort: {operationType: 1}}], + ErrorCodes.IllegalOperation); + assertErrorCode(mongosColl, + [{$changeStream: {}}, {$group: {_id: "$documentKey"}}], + ErrorCodes.IllegalOperation); + + assert.writeOK(mongosColl.remove({})); + // We awaited the replication of the first write, so the change stream shouldn't return it. + // Use { w: "majority" } to deal with journaling correctly, even though we only have one node. + assert.writeOK(mongosColl.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}})); + + changeStream = mongosColl.aggregate([{$changeStream: {}}, {$project: {"_id.clusterTime": 0}}]); + assert(!changeStream.hasNext()); + + // Drop the collection and test that we return a "drop" followed by an "invalidate" entry and + // close the cursor. + jsTestLog("Testing getMore command closes cursor for invalidate entries"); + mongosColl.drop(); + // Wait for the drop to actually happen. + assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase( + mongosColl.getDB(), mongosColl.getName())); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "drop"); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "invalidate"); + assert(changeStream.isExhausted()); + + jsTestLog("Testing aggregate command closes cursor for invalidate entries"); + // Shard the test collection on _id. + assert.commandWorked( + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); + + // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey). + assert.commandWorked( + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); + // Move the [0, MaxKey) chunk to st.shard1.shardName. + assert.commandWorked(mongosDB.adminCommand( + {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()})); + + // Write one document to each chunk. + assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}})); + + changeStream = mongosColl.aggregate([{$changeStream: {}}]); + assert(!changeStream.hasNext()); + + // Store a valid resume token before dropping the collection, to be used later in the test. + assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}})); + + assert.soon(() => changeStream.hasNext()); + const resumeToken = changeStream.next()._id; + + mongosColl.drop(); + + assert.soon(() => changeStream.hasNext()); + let next = changeStream.next(); + assert.eq(next.operationType, "insert"); + assert.eq(next.documentKey._id, 2); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "drop"); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "invalidate"); + + // With an explicit collation, test that we can resume from before the collection drop. + changeStream = mongosColl.watch([{$project: {_id: 0}}], + {resumeAfter: resumeToken, collation: {locale: "simple"}}); + + assert.soon(() => changeStream.hasNext()); + next = changeStream.next(); + assert.eq(next.operationType, "insert"); + assert.eq(next.documentKey, {_id: 2}); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "drop"); + + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "invalidate"); + + // Without an explicit collation, test that we *cannot* resume from before the collection drop. + assert.commandFailedWithCode(mongosDB.runCommand({ + aggregate: mongosColl.getName(), + pipeline: [{$changeStream: {resumeAfter: resumeToken}}], + cursor: {} + }), + ErrorCodes.InvalidResumeToken); + + st.stop(); })(); diff --git a/src/mongo/db/catalog/capped_utils.cpp b/src/mongo/db/catalog/capped_utils.cpp index df880bd6f4c..376f879e6f1 100644 --- a/src/mongo/db/catalog/capped_utils.cpp +++ b/src/mongo/db/catalog/capped_utils.cpp @@ -53,9 +53,7 @@ #include "mongo/db/service_context.h" #include "mongo/util/scopeguard.h" -namespace mongo { - -Status emptyCapped(OperationContext* opCtx, const NamespaceString& collectionName) { +mongo::Status mongo::emptyCapped(OperationContext* opCtx, const NamespaceString& collectionName) { AutoGetDb autoDb(opCtx, collectionName.db(), MODE_X); bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() && @@ -105,20 +103,20 @@ Status emptyCapped(OperationContext* opCtx, const NamespaceString& collectionNam return status; } - const auto service = opCtx->getServiceContext(); - service->getOpObserver()->onEmptyCapped(opCtx, collection->ns(), collection->uuid()); + getGlobalServiceContext()->getOpObserver()->onEmptyCapped( + opCtx, collection->ns(), collection->uuid()); wuow.commit(); return Status::OK(); } -Status cloneCollectionAsCapped(OperationContext* opCtx, - Database* db, - const std::string& shortFrom, - const std::string& shortTo, - long long size, - bool temp) { +mongo::Status mongo::cloneCollectionAsCapped(OperationContext* opCtx, + Database* db, + const std::string& shortFrom, + const std::string& shortTo, + long long size, + bool temp) { NamespaceString fromNss(db->name(), shortFrom); NamespaceString toNss(db->name(), shortTo); @@ -253,9 +251,9 @@ Status cloneCollectionAsCapped(OperationContext* opCtx, MONGO_UNREACHABLE; } -Status convertToCapped(OperationContext* opCtx, - const NamespaceString& collectionName, - long long size) { +mongo::Status mongo::convertToCapped(OperationContext* opCtx, + const NamespaceString& collectionName, + long long size) { StringData dbname = collectionName.db(); StringData shortSource = collectionName.coll(); @@ -303,5 +301,3 @@ Status convertToCapped(OperationContext* opCtx, options.stayTemp = false; return renameCollection(opCtx, longTmpName, collectionName, options); } - -} // namespace mongo diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp index 20ed630bae4..a9bf373f38a 100644 --- a/src/mongo/db/catalog/create_collection.cpp +++ b/src/mongo/db/catalog/create_collection.cpp @@ -51,7 +51,6 @@ namespace mongo { namespace { - /** * Shared part of the implementation of the createCollection versions for replicated and regular * collection creation. @@ -100,10 +99,8 @@ Status createCollection(OperationContext* opCtx, return writeConflictRetry(opCtx, "create", nss.ns(), [&] { Lock::DBLock dbXLock(opCtx, nss.db(), MODE_X); - const bool shardVersionCheck = true; OldClientContext ctx(opCtx, nss.ns(), shardVersionCheck); - if (opCtx->writesAreReplicated() && !repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)) { return Status(ErrorCodes::NotMaster, diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp index 3d5b681edd4..6aadfe754b4 100644 --- a/src/mongo/db/catalog/rename_collection.cpp +++ b/src/mongo/db/catalog/rename_collection.cpp @@ -144,8 +144,7 @@ Status renameCollectionCommon(OperationContext* opCtx, // We stay in source context the whole time. This is mostly to set the CurOp namespace. boost::optional<OldClientContext> ctx; - const bool shardVersionCheck = true; - ctx.emplace(opCtx, source.ns(), shardVersionCheck); + ctx.emplace(opCtx, source.ns()); auto replCoord = repl::ReplicationCoordinator::get(opCtx); bool userInitiatedWritesAndNotPrimary = diff --git a/src/mongo/db/commands/collection_to_capped.cpp b/src/mongo/db/commands/collection_to_capped.cpp index 18635222f58..a7b3530d0e9 100644 --- a/src/mongo/db/commands/collection_to_capped.cpp +++ b/src/mongo/db/commands/collection_to_capped.cpp @@ -46,7 +46,10 @@ #include "mongo/db/service_context.h" namespace mongo { -namespace { + +using std::unique_ptr; +using std::string; +using std::stringstream; class CmdCloneCollectionAsCapped : public ErrmsgCommandDeprecated { public: @@ -84,9 +87,9 @@ public: out->push_back(Privilege(ResourcePattern::forExactNamespace(nss), targetActions)); } bool errmsgRun(OperationContext* opCtx, - const std::string& dbname, + const string& dbname, const BSONObj& jsobj, - std::string& errmsg, + string& errmsg, BSONObjBuilder& result) { const auto fromElt = jsobj["cloneCollectionAsCapped"]; const auto toElt = jsobj["toCollection"]; @@ -137,13 +140,13 @@ public: uassertStatusOK(status); return true; } - } cmdCloneCollectionAsCapped; -/** - * Converts the given collection to a capped collection w/ the specified size. This command is not - * highly used, and is not currently supported with sharded environments. - */ +/* jan2010: + Converts the given collection to a capped collection w/ the specified size. + This command is not highly used, and is not currently supported with sharded + environments. + */ class CmdConvertToCapped : public ErrmsgCommandDeprecated { public: CmdConvertToCapped() : ErrmsgCommandDeprecated("convertToCapped") {} @@ -165,9 +168,9 @@ public: } bool errmsgRun(OperationContext* opCtx, - const std::string& dbname, + const string& dbname, const BSONObj& jsobj, - std::string& errmsg, + string& errmsg, BSONObjBuilder& result) { const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbname, jsobj)); long long size = jsobj.getField("size").safeNumberLong(); @@ -182,6 +185,4 @@ public: } } cmdConvertToCapped; - -} // namespace -} // namespace mongo +} diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp index 4ce8c38e5f2..6811bb958da 100644 --- a/src/mongo/db/commands/find_cmd.cpp +++ b/src/mongo/db/commands/find_cmd.cpp @@ -201,7 +201,7 @@ public: Collection* const collection = ctx->getCollection(); // We have a parsed query. Time to get the execution plan for it. - auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, std::move(cq)); + auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq)); if (!statusWithPlanExecutor.isOK()) { return statusWithPlanExecutor.getStatus(); } @@ -312,7 +312,7 @@ public: Collection* const collection = ctx->getCollection(); // Get the execution plan for the query. - auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, std::move(cq)); + auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq)); uassertStatusOK(statusWithPlanExecutor.getStatus()); auto exec = std::move(statusWithPlanExecutor.getValue()); diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 5291d665598..5b368e81af1 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -72,7 +72,6 @@ #include "mongo/db/query/plan_summary_stats.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" #include "mongo/db/session_catalog.h" @@ -155,7 +154,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx } // If we're in a sharded environment, we need to filter out documents we don't own. - if (OperationShardingState::isOperationVersioned(opCtx)) { + if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) { auto shardFilterStage = stdx::make_unique<ShardFilterStage>( opCtx, CollectionShardingState::get(opCtx, collection->ns())->getOrphansFilter(opCtx), @@ -218,7 +217,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe return {cq.getStatus()}; } - return getExecutorFind(opCtx, collection, std::move(cq.getValue()), plannerOpts); + return getExecutorFind(opCtx, collection, nss, std::move(cq.getValue()), plannerOpts); } BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) { diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp index c8308a0f331..ae14be74be9 100644 --- a/src/mongo/db/query/find.cpp +++ b/src/mongo/db/query/find.cpp @@ -560,8 +560,8 @@ std::string runQuery(OperationContext* opCtx, repl::ReplicationCoordinator::get(opCtx)->checkCanServeReadsFor(opCtx, nss, slaveOK)); } - // Get the execution plan for the query. - auto exec = uassertStatusOK(getExecutorLegacyFind(opCtx, collection, std::move(cq))); + // We have a parsed query. Time to get the execution plan for it. + auto exec = uassertStatusOK(getExecutorLegacyFind(opCtx, collection, nss, std::move(cq))); const QueryRequest& qr = exec->getCanonicalQuery()->getQueryRequest(); diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index c00ab569f65..ea89990bd2c 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -76,8 +76,9 @@ #include "mongo/db/query/stage_builder.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/s/collection_metadata.h" #include "mongo/db/s/collection_sharding_state.h" -#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" @@ -673,6 +674,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack( StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind( OperationContext* opCtx, Collection* collection, + const NamespaceString& nss, unique_ptr<CanonicalQuery> canonicalQuery, PlanExecutor::YieldPolicy yieldPolicy, size_t plannerOptions) { @@ -685,7 +687,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind( return getOplogStartHack(opCtx, collection, std::move(canonicalQuery), plannerOptions); } - if (OperationShardingState::isOperationVersioned(opCtx)) { + if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) { plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER; } return getExecutor(opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions); @@ -696,22 +698,25 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind( StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( OperationContext* opCtx, Collection* collection, + const NamespaceString& nss, unique_ptr<CanonicalQuery> canonicalQuery, size_t plannerOptions) { - const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + auto readConcernArgs = repl::ReadConcernArgs::get(opCtx); auto yieldPolicy = readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern ? PlanExecutor::INTERRUPT_ONLY : PlanExecutor::YIELD_AUTO; return _getExecutorFind( - opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions); + opCtx, collection, nss, std::move(canonicalQuery), yieldPolicy, plannerOptions); } StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorLegacyFind( OperationContext* opCtx, Collection* collection, + const NamespaceString& nss, std::unique_ptr<CanonicalQuery> canonicalQuery) { return _getExecutorFind(opCtx, collection, + nss, std::move(canonicalQuery), PlanExecutor::YIELD_AUTO, QueryPlannerParams::DEFAULT); @@ -1363,7 +1368,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorCount( } size_t plannerOptions = QueryPlannerParams::IS_COUNT; - if (OperationShardingState::isOperationVersioned(opCtx)) { + if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, request.getNs().ns())) { plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER; } diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h index 26b7513f251..f8abcc16c13 100644 --- a/src/mongo/db/query/get_executor.h +++ b/src/mongo/db/query/get_executor.h @@ -99,6 +99,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor( StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind( OperationContext* opCtx, Collection* collection, + const NamespaceString& nss, std::unique_ptr<CanonicalQuery> canonicalQuery, size_t plannerOptions = QueryPlannerParams::DEFAULT); @@ -108,6 +109,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorLegacyFind( OperationContext* opCtx, Collection* collection, + const NamespaceString& nss, std::unique_ptr<CanonicalQuery> canonicalQuery); /** diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h index cdac2cf1307..9e4e8fa4f0c 100644 --- a/src/mongo/db/query/query_planner_params.h +++ b/src/mongo/db/query/query_planner_params.h @@ -59,11 +59,10 @@ struct QueryPlannerParams { // Set this if you're running on a sharded cluster. We'll add a "drop all docs that // shouldn't be on this shard" stage before projection. // - // In order to set this, you must check OperationShardingState::isOperationVersioned() in - // the same lock that you use to build the query executor. You must also wrap the - // PlanExecutor in a ClientCursor within the same lock. - // - // See the comment on ShardFilterStage for details. + // In order to set this, you must check + // ShardingState::needCollectionMetadata(current_namespace) in the same lock that you use to + // build the query executor. You must also wrap the PlanExecutor in a ClientCursor within + // the same lock. See the comment on ShardFilterStage for details. INCLUDE_SHARD_FILTER = 1 << 2, // Set this if you don't want any plans with a blocking sort stage. All sorts must be diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp index c1676b79b18..8bb1c5546e0 100644 --- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp +++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp @@ -81,7 +81,13 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx, { AutoGetCollection autoColl(opCtx, ns, MODE_IX); auto* const css = CollectionShardingRuntime::get(opCtx, ns); - const auto metadata = css->getCurrentMetadata(); + const auto optMetadata = css->getCurrentMetadataIfKnown(); + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Unable to establish sharding status for collection " << ns.ns(), + optMetadata); + + const auto& metadata = *optMetadata; + if (!metadata->isSharded()) { LOG(0) << "skipping orphaned data cleanup for " << ns.ns() << ", collection is not sharded"; diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp index 0600d1f6216..2c588be9ece 100644 --- a/src/mongo/db/s/operation_sharding_state.cpp +++ b/src/mongo/db/s/operation_sharding_state.cpp @@ -33,9 +33,9 @@ #include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/operation_context.h" -#include "mongo/db/s/sharded_connection_info.h" namespace mongo { + namespace { const OperationContext::Decoration<OperationShardingState> shardingMetadataDecoration = @@ -49,7 +49,6 @@ const Milliseconds kMaxWaitForMovePrimaryCriticalSection = Minutes(5); // The name of the field in which the client attaches its database version. constexpr auto kDbVersionField = "databaseVersion"_sd; - } // namespace OperationShardingState::OperationShardingState() = default; @@ -62,14 +61,6 @@ OperationShardingState& OperationShardingState::get(OperationContext* opCtx) { return shardingMetadataDecoration(opCtx); } -bool OperationShardingState::isOperationVersioned(OperationContext* opCtx) { - const auto client = opCtx->getClient(); - - // Shard version information received from mongos may either be attached to the Client or - // directly to the OperationContext - return ShardedConnectionInfo::get(client, false) || get(opCtx).hasShardVersion(); -} - void OperationShardingState::setAllowImplicitCollectionCreation( const BSONElement& allowImplicitCollectionCreationElem) { if (!allowImplicitCollectionCreationElem.eoo()) { diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h index 51914410a3e..d2cf419f8e6 100644 --- a/src/mongo/db/s/operation_sharding_state.h +++ b/src/mongo/db/s/operation_sharding_state.h @@ -64,13 +64,6 @@ public: static OperationShardingState& get(OperationContext* opCtx); /** - * Returns true if the the current operation was sent by the caller with shard version - * information attached, meaning that it must perform shard version checking and orphan - * filtering. - */ - static bool isOperationVersioned(OperationContext* opCtx); - - /** * Requests on a sharded collection that are broadcast without a shardVersion should not cause * the collection to be created on a shard that does not know about the collection already, * since the collection options will not be propagated. Such requests specify to disallow diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp index 331c52ca2d2..f5e41c73f20 100644 --- a/src/mongo/db/s/sharding_state.cpp +++ b/src/mongo/db/s/sharding_state.cpp @@ -34,7 +34,8 @@ #include "mongo/db/s/sharding_state.h" -#include "mongo/db/operation_context.h" +#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/s/sharded_connection_info.h" #include "mongo/db/server_options.h" #include "mongo/util/log.h" @@ -116,6 +117,18 @@ OID ShardingState::clusterId() { return _clusterId; } +bool ShardingState::needCollectionMetadata(OperationContext* opCtx, const std::string& ns) { + if (!enabled()) + return false; + + Client* client = opCtx->getClient(); + + // Shard version information received from mongos may either by attached to the Client or + // directly to the OperationContext. + return ShardedConnectionInfo::get(client, false) || + OperationShardingState::get(opCtx).hasShardVersion(); +} + void ShardingState::clearForTests() { _initializationState.store(static_cast<uint32_t>(InitializationState::kNew)); } diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h index e049d0fcb3b..fb220c9aaea 100644 --- a/src/mongo/db/s/sharding_state.h +++ b/src/mongo/db/s/sharding_state.h @@ -106,6 +106,12 @@ public: OID clusterId(); /** + * Returns true if this node is a shard and if the currently runnint operation must engage the + * sharding subsystem (i.e., perform version checking, orphan filtering, etc). + */ + bool needCollectionMetadata(OperationContext* opCtx, const std::string& ns); + + /** * For testing only. This is a workaround for the fact that it is not possible to get a clean * ServiceContext in between test executions. Because of this, tests which require that they get * started with a clean (uninitialized) ShardingState must invoke this in their tearDown method. diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp index 19f0a97c962..a5eb613114c 100644 --- a/src/mongo/s/commands/commands_public.cpp +++ b/src/mongo/s/commands/commands_public.cpp @@ -366,9 +366,6 @@ public: const NamespaceString nss(parseNs(dbName, cmdObj)); const auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss)); - uassert(ErrorCodes::IllegalOperation, - "You can't convertToCapped a sharded collection", - !routingInfo.cm()); // convertToCapped creates a temp collection and renames it at the end. It will require // special handling for create collection. |