author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2018-08-21 10:50:04 -0400
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-01-26 11:13:38 +0000
commit     1a01c53df8f7c1e016c0ccbc38b77f6b3508bf65 (patch)
tree       854c0a0469aa9218b0d13a2e23a4d5e9cb67dbfa
parent     fe4ced8f98d731883e5a4511d434716629e457a8 (diff)
SERVER-45599 Backport of SERVER-39495: Move ShardingState::needCollectionMetadata under OperationShardingState
ShardingState logically contains answers to questions about whether the current instance is a node in a sharded cluster, whereas OperationShardingState is responsible for the 'shardedness' of the commands. This is a partial cherry-pick from b049257fbd1d215388cffaf7544f6741dbce5b45, adapted for the 4.0 branch. Also backports the addition of more testing for multi:true/justOne:false updates and ChangeStreams, which was taken from commit 50f6bd4d6a9428a6f1df22db792d7b55d773762c.
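At the call sites, this backport swaps the node-level ShardingState check for the new static helper on OperationShardingState (see the get_executor.cpp and pipeline_d.cpp hunks below). The following is a minimal sketch of the before/after shape using the headers and names from this patch; the wrapper function itself is hypothetical and exists only to make the fragment self-contained:

    #include "mongo/db/operation_context.h"
    #include "mongo/db/query/query_planner_params.h"
    #include "mongo/db/s/operation_sharding_state.h"

    namespace mongo {

    // Hypothetical helper illustrating the call-site change made in _getExecutorFind() and
    // getExecutorCount() by this patch.
    size_t chooseShardFilterOptions(OperationContext* opCtx) {
        size_t plannerOptions = QueryPlannerParams::DEFAULT;

        // Before this patch (removed): the check went through the node-level ShardingState
        // and required the namespace string:
        //     if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) { ... }

        // After this patch: the per-operation state answers whether the command carried shard
        // version information (attached either to the Client via ShardedConnectionInfo or to
        // the OperationContext), which is what mandates shard version checking and orphan
        // filtering.
        if (OperationShardingState::isOperationVersioned(opCtx)) {
            plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
        }
        return plannerOptions;
    }

    }  // namespace mongo

The distinction matches the commit message: ShardingState only knows whether this node belongs to a sharded cluster, while only the operation itself knows whether the caller attached a shard version, so the filtering decision now keys off the operation.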
-rw-r--r--  jstests/change_streams/change_stream.js         |  40
-rw-r--r--  jstests/sharding/change_streams.js              | 423
-rw-r--r--  src/mongo/db/catalog/capped_utils.cpp           |  28
-rw-r--r--  src/mongo/db/catalog/create_collection.cpp      |   3
-rw-r--r--  src/mongo/db/catalog/rename_collection.cpp      |   3
-rw-r--r--  src/mongo/db/commands/collection_to_capped.cpp  |  27
-rw-r--r--  src/mongo/db/commands/find_cmd.cpp              |   4
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp            |   5
-rw-r--r--  src/mongo/db/query/find.cpp                     |   4
-rw-r--r--  src/mongo/db/query/get_executor.cpp             |  15
-rw-r--r--  src/mongo/db/query/get_executor.h               |   2
-rw-r--r--  src/mongo/db/query/query_planner_params.h       |   9
-rw-r--r--  src/mongo/db/s/cleanup_orphaned_cmd.cpp         |   8
-rw-r--r--  src/mongo/db/s/operation_sharding_state.cpp     |  11
-rw-r--r--  src/mongo/db/s/operation_sharding_state.h       |   7
-rw-r--r--  src/mongo/db/s/sharding_state.cpp               |  15
-rw-r--r--  src/mongo/db/s/sharding_state.h                 |   6
-rw-r--r--  src/mongo/s/commands/commands_public.cpp        |   3
18 files changed, 351 insertions, 262 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index 7c5688b0704..9f41255c599 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -132,6 +132,27 @@
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+ jsTestLog("Testing multi:true update");
+ assert.writeOK(db.t1.insert({_id: 4, a: 0, b: 1}));
+ assert.writeOK(db.t1.insert({_id: 5, a: 0, b: 1}));
+ cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ assert.writeOK(db.t1.update({a: 0}, {$set: {b: 2}}, {multi: true}));
+ expected = [
+ {
+ documentKey: {_id: 4},
+ ns: {db: "test", coll: "t1"},
+ operationType: "update",
+ updateDescription: {removedFields: [], updatedFields: {b: 2}}
+ },
+ {
+ documentKey: {_id: 5},
+ ns: {db: "test", coll: "t1"},
+ operationType: "update",
+ updateDescription: {removedFields: [], updatedFields: {b: 2}}
+ }
+ ];
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+
jsTestLog("Testing delete");
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.remove({_id: 1}));
@@ -142,6 +163,25 @@
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+ jsTestLog("Testing justOne:false delete");
+ assert.writeOK(db.t1.insert({_id: 6, a: 1, b: 1}));
+ assert.writeOK(db.t1.insert({_id: 7, a: 1, b: 1}));
+ cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ assert.writeOK(db.t1.remove({a: 1}, {justOne: false}));
+ expected = [
+ {
+ documentKey: {_id: 6},
+ ns: {db: "test", coll: "t1"},
+ operationType: "delete",
+ },
+ {
+ documentKey: {_id: 7},
+ ns: {db: "test", coll: "t1"},
+ operationType: "delete",
+ }
+ ];
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+
jsTestLog("Testing intervening write on another collection");
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
let t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2});
diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js
index cd13f86678c..53c22bfb723 100644
--- a/jstests/sharding/change_streams.js
+++ b/jstests/sharding/change_streams.js
@@ -3,10 +3,9 @@
(function() {
"use strict";
+ load('jstests/aggregation/extras/utils.js'); // For assertErrorCode()
+ load('jstests/libs/change_stream_util.js');
load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
- load('jstests/aggregation/extras/utils.js'); // For assertErrorCode().
-
- // For supportsMajorityReadConcern().
load("jstests/multiVersion/libs/causal_consistency_helpers.js");
if (!supportsMajorityReadConcern()) {
@@ -14,187 +13,241 @@
return;
}
- const st = new ShardingTest({
- shards: 2,
- rs: {
- nodes: 1,
- enableMajorityReadConcern: '',
- // Use a higher frequency for periodic noops to speed up the test.
- setParameter: {periodicNoopIntervalSecs: 1}
+ function runTest(collName, shardKey) {
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ enableMajorityReadConcern: '',
+ // Intentionally disable the periodic no-op writer in order to allow the test to have
+ // control of advancing the cluster time. When it is enabled later in the test, use a
+ // higher frequency for periodic noops to speed up the test.
+ setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: false}
+ }
+ });
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ assert.commandWorked(st.s0.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
+
+ const mongosColl = mongosDB[collName];
+
+ //
+ // Sanity tests
+ //
+
+ // Test that $sort and $group are banned from running in a $changeStream pipeline.
+ assertErrorCode(mongosDB.NegativeTest,
+ [{$changeStream: {}}, {$sort: {operationType: 1}}],
+ ErrorCodes.IllegalOperation);
+ assertErrorCode(mongosDB.NegativeTest,
+ [{$changeStream: {}}, {$group: {_id: '$documentKey'}}],
+ ErrorCodes.IllegalOperation);
+
+ // Test that using change streams with any stages not allowed to run on mongos results in an
+ // error.
+ assertErrorCode(
+ mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation);
+
+ //
+ // Main tests
+ //
+
+ function makeShardKey(value) {
+ var obj = {};
+ obj[shardKey] = value;
+ return obj;
+ }
+
+ function makeShardKeyDocument(value, optExtraFields) {
+ var obj = {};
+ if (shardKey !== '_id')
+ obj['_id'] = value;
+ obj[shardKey] = value;
+ return Object.assign(obj, optExtraFields);
}
- });
-
- const mongosDB = st.s0.getDB(jsTestName());
- const mongosColl = mongosDB[jsTestName()];
-
- assert.commandWorked(mongosDB.dropDatabase());
-
- // Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
- assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
- st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
-
- // Shard the test collection on _id.
- assert.commandWorked(
- mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
-
- // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
- assert.commandWorked(
- mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
-
- // Move the [0, MaxKey) chunk to st.shard1.shardName.
- assert.commandWorked(mongosDB.adminCommand(
- {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
-
- // Write a document to each chunk.
- assert.writeOK(mongosColl.insert({_id: -1}));
- assert.writeOK(mongosColl.insert({_id: 1}));
-
- let changeStream =
- mongosColl.aggregate([{$changeStream: {}}, {$project: {_id: 0, clusterTime: 0}}]);
-
- // Test that a change stream can see inserts on shard 0.
- assert.writeOK(mongosColl.insert({_id: 1000}));
- assert.writeOK(mongosColl.insert({_id: -1000}));
-
- assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert");
- assert.docEq(changeStream.next(), {
- documentKey: {_id: 1000},
- fullDocument: {_id: 1000},
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- });
-
- // Now do another write to shard 0, advancing that shard's clock and enabling the stream to
- // return the earlier write to shard 1.
- assert.writeOK(mongosColl.insert({_id: 1001}));
-
- assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert");
- assert.docEq(changeStream.next(), {
- documentKey: {_id: -1000},
- fullDocument: {_id: -1000},
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- });
-
- // Test that all changes are eventually visible due to the periodic noop writer.
- assert.commandWorked(
- st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
- assert.commandWorked(
- st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
- assert.soon(() => changeStream.hasNext());
-
- assert.docEq(changeStream.next(), {
- documentKey: {_id: 1001},
- fullDocument: {_id: 1001},
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- });
- changeStream.close();
-
- // Test that using change streams with any stages not allowed to run on mongos results in an
- // error.
- assertErrorCode(
- mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation);
-
- // Test that it is legal to open a change stream, even if the
- // 'internalQueryProhibitMergingOnMongos' parameter is set.
- assert.commandWorked(
- mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true}));
- let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}]));
- tempCursor.close();
- assert.commandWorked(
- mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false}));
-
- // Test that $sort and $group are banned from running in a $changeStream pipeline.
- assertErrorCode(mongosColl,
- [{$changeStream: {}}, {$sort: {operationType: 1}}],
- ErrorCodes.IllegalOperation);
- assertErrorCode(mongosColl,
- [{$changeStream: {}}, {$group: {_id: "$documentKey"}}],
- ErrorCodes.IllegalOperation);
-
- assert.writeOK(mongosColl.remove({}));
- // We awaited the replication of the first write, so the change stream shouldn't return it.
- // Use { w: "majority" } to deal with journaling correctly, even though we only have one node.
- assert.writeOK(mongosColl.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}}));
-
- changeStream = mongosColl.aggregate([{$changeStream: {}}, {$project: {"_id.clusterTime": 0}}]);
- assert(!changeStream.hasNext());
-
- // Drop the collection and test that we return a "drop" followed by an "invalidate" entry and
- // close the cursor.
- jsTestLog("Testing getMore command closes cursor for invalidate entries");
- mongosColl.drop();
- // Wait for the drop to actually happen.
- assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
- mongosColl.getDB(), mongosColl.getName()));
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "drop");
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "invalidate");
- assert(changeStream.isExhausted());
-
- jsTestLog("Testing aggregate command closes cursor for invalidate entries");
- // Shard the test collection on _id.
- assert.commandWorked(
- mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
-
- // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
- assert.commandWorked(
- mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
- // Move the [0, MaxKey) chunk to st.shard1.shardName.
- assert.commandWorked(mongosDB.adminCommand(
- {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
-
- // Write one document to each chunk.
- assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
-
- changeStream = mongosColl.aggregate([{$changeStream: {}}]);
- assert(!changeStream.hasNext());
-
- // Store a valid resume token before dropping the collection, to be used later in the test.
- assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
-
- assert.soon(() => changeStream.hasNext());
- const resumeToken = changeStream.next()._id;
-
- mongosColl.drop();
-
- assert.soon(() => changeStream.hasNext());
- let next = changeStream.next();
- assert.eq(next.operationType, "insert");
- assert.eq(next.documentKey._id, 2);
-
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "drop");
-
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "invalidate");
-
- // With an explicit collation, test that we can resume from before the collection drop.
- changeStream = mongosColl.watch([{$project: {_id: 0}}],
- {resumeAfter: resumeToken, collation: {locale: "simple"}});
-
- assert.soon(() => changeStream.hasNext());
- next = changeStream.next();
- assert.eq(next.operationType, "insert");
- assert.eq(next.documentKey, {_id: 2});
-
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "drop");
-
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "invalidate");
-
- // Without an explicit collation, test that we *cannot* resume from before the collection drop.
- assert.commandFailedWithCode(mongosDB.runCommand({
- aggregate: mongosColl.getName(),
- pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
- cursor: {}
- }),
- ErrorCodes.InvalidResumeToken);
-
- st.stop();
+
+ jsTestLog('Testing change streams with shard key ' + shardKey);
+ // Shard the test collection and split it into 2 chunks:
+ // [MinKey, 0) - shard0, [0, MaxKey) - shard1
+ st.shardColl(mongosColl,
+ makeShardKey(1) /* shard key */,
+ makeShardKey(0) /* split at */,
+ makeShardKey(1) /* move to shard 1 */);
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(-1)));
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(1)));
+
+ let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+
+ // Test that a change stream can see inserts on shard 0.
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(1000)));
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(-1000)));
+
+ assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert");
+ assertChangeStreamEventEq(changeStream.next(), {
+ documentKey: makeShardKeyDocument(1000),
+ fullDocument: makeShardKeyDocument(1000),
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+
+ // Because the periodic noop writer is disabled, do another write to shard 0 in order to
+ // advance that shard's clock, enabling the stream to return the earlier write to shard 1.
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(1001)));
+
+ assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert");
+ assertChangeStreamEventEq(changeStream.next(), {
+ documentKey: makeShardKeyDocument(-1000),
+ fullDocument: makeShardKeyDocument(-1000),
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+
+ // Test that all changes are eventually visible due to the periodic noop writer.
+ assert.commandWorked(
+ st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
+ assert.commandWorked(
+ st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
+
+ assert.soon(() => changeStream.hasNext());
+ assertChangeStreamEventEq(changeStream.next(), {
+ documentKey: makeShardKeyDocument(1001),
+ fullDocument: makeShardKeyDocument(1001),
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+ changeStream.close();
+
+ jsTestLog('Testing multi-update change streams with shard key ' + shardKey);
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(10, {a: 0, b: 0})));
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(-10, {a: 0, b: 0})));
+ changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+
+ assert.writeOK(mongosColl.update({a: 0}, {$set: {b: 2}}, {multi: true}));
+
+ assert.soon(() => changeStream.hasNext());
+ assertChangeStreamEventEq(changeStream.next(), {
+ operationType: "update",
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ documentKey: makeShardKeyDocument(-10),
+ updateDescription: {updatedFields: {b: 2}, removedFields: []},
+ });
+
+ assert.soon(() => changeStream.hasNext());
+ assertChangeStreamEventEq(changeStream.next(), {
+ operationType: "update",
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ documentKey: makeShardKeyDocument(10),
+ updateDescription: {updatedFields: {b: 2}, removedFields: []},
+ });
+ changeStream.close();
+
+ // Test that it is legal to open a change stream, even if the
+ // 'internalQueryProhibitMergingOnMongos' parameter is set.
+ assert.commandWorked(
+ st.s0.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true}));
+ let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}]));
+ tempCursor.close();
+ assert.commandWorked(
+ st.s0.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false}));
+
+ assert.writeOK(mongosColl.remove({}));
+ // We awaited the replication of the first write, so the change stream shouldn't return it.
+ // Use { w: "majority" } to deal with journaling correctly, even though we only have one
+ // node.
+ assert.writeOK(
+ mongosColl.insert(makeShardKeyDocument(0, {a: 1}), {writeConcern: {w: "majority"}}));
+
+ changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+ assert(!changeStream.hasNext());
+
+ // Drop the collection and test that we return a "drop" followed by an "invalidate" entry
+ // and close the cursor.
+ jsTestLog('Testing getMore command closes cursor for invalidate entries with shard key ' +
+ shardKey);
+ mongosColl.drop();
+ // Wait for the drop to actually happen.
+ assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
+ mongosColl.getDB(), mongosColl.getName()));
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "drop");
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
+ assert(changeStream.isExhausted());
+
+ jsTestLog('Testing aggregate command closes cursor for invalidate entries with shard key ' +
+ shardKey);
+ // Shard the test collection and split it into 2 chunks:
+ // [MinKey, 0) - shard0, [0, MaxKey) - shard1
+ st.shardColl(mongosColl,
+ makeShardKey(1) /* shard key */,
+ makeShardKey(0) /* split at */,
+ makeShardKey(1) /* move to shard 1 */);
+
+ // Write one document to each chunk.
+ assert.writeOK(
+ mongosColl.insert(makeShardKeyDocument(-1), {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(1), {writeConcern: {w: "majority"}}));
+
+ changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+ assert(!changeStream.hasNext());
+
+ // Store a valid resume token before dropping the collection, to be used later in the test
+ assert.writeOK(
+ mongosColl.insert(makeShardKeyDocument(-2), {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert(makeShardKeyDocument(2), {writeConcern: {w: "majority"}}));
+
+ assert.soon(() => changeStream.hasNext());
+ const resumeToken = changeStream.next()._id;
+
+ mongosColl.drop();
+
+ assert.soon(() => changeStream.hasNext());
+ assertChangeStreamEventEq(changeStream.next(), {
+ documentKey: makeShardKeyDocument(2),
+ fullDocument: makeShardKeyDocument(2),
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "drop");
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
+
+ // With an explicit collation, test that we can resume from before the collection drop
+ changeStream =
+ mongosColl.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}});
+
+ assert.soon(() => changeStream.hasNext());
+ assertChangeStreamEventEq(changeStream.next(), {
+ documentKey: makeShardKeyDocument(2),
+ fullDocument: makeShardKeyDocument(2),
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "drop");
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
+
+ // Without an explicit collation, test that we *cannot* resume from before the collection
+ // drop
+ assert.commandFailedWithCode(mongosDB.runCommand({
+ aggregate: mongosColl.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ cursor: {}
+ }),
+ ErrorCodes.InvalidResumeToken);
+
+ st.stop();
+ }
+
+ runTest('with_id_shard_key', '_id');
+ runTest('with_non_id_shard_key', 'non_id');
})();
diff --git a/src/mongo/db/catalog/capped_utils.cpp b/src/mongo/db/catalog/capped_utils.cpp
index 376f879e6f1..df880bd6f4c 100644
--- a/src/mongo/db/catalog/capped_utils.cpp
+++ b/src/mongo/db/catalog/capped_utils.cpp
@@ -53,7 +53,9 @@
#include "mongo/db/service_context.h"
#include "mongo/util/scopeguard.h"
-mongo::Status mongo::emptyCapped(OperationContext* opCtx, const NamespaceString& collectionName) {
+namespace mongo {
+
+Status emptyCapped(OperationContext* opCtx, const NamespaceString& collectionName) {
AutoGetDb autoDb(opCtx, collectionName.db(), MODE_X);
bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() &&
@@ -103,20 +105,20 @@ mongo::Status mongo::emptyCapped(OperationContext* opCtx, const NamespaceString&
return status;
}
- getGlobalServiceContext()->getOpObserver()->onEmptyCapped(
- opCtx, collection->ns(), collection->uuid());
+ const auto service = opCtx->getServiceContext();
+ service->getOpObserver()->onEmptyCapped(opCtx, collection->ns(), collection->uuid());
wuow.commit();
return Status::OK();
}
-mongo::Status mongo::cloneCollectionAsCapped(OperationContext* opCtx,
- Database* db,
- const std::string& shortFrom,
- const std::string& shortTo,
- long long size,
- bool temp) {
+Status cloneCollectionAsCapped(OperationContext* opCtx,
+ Database* db,
+ const std::string& shortFrom,
+ const std::string& shortTo,
+ long long size,
+ bool temp) {
NamespaceString fromNss(db->name(), shortFrom);
NamespaceString toNss(db->name(), shortTo);
@@ -251,9 +253,9 @@ mongo::Status mongo::cloneCollectionAsCapped(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
-mongo::Status mongo::convertToCapped(OperationContext* opCtx,
- const NamespaceString& collectionName,
- long long size) {
+Status convertToCapped(OperationContext* opCtx,
+ const NamespaceString& collectionName,
+ long long size) {
StringData dbname = collectionName.db();
StringData shortSource = collectionName.coll();
@@ -301,3 +303,5 @@ mongo::Status mongo::convertToCapped(OperationContext* opCtx,
options.stayTemp = false;
return renameCollection(opCtx, longTmpName, collectionName, options);
}
+
+} // namespace mongo
diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp
index a9bf373f38a..20ed630bae4 100644
--- a/src/mongo/db/catalog/create_collection.cpp
+++ b/src/mongo/db/catalog/create_collection.cpp
@@ -51,6 +51,7 @@
namespace mongo {
namespace {
+
/**
* Shared part of the implementation of the createCollection versions for replicated and regular
* collection creation.
@@ -99,8 +100,10 @@ Status createCollection(OperationContext* opCtx,
return writeConflictRetry(opCtx, "create", nss.ns(), [&] {
Lock::DBLock dbXLock(opCtx, nss.db(), MODE_X);
+
const bool shardVersionCheck = true;
OldClientContext ctx(opCtx, nss.ns(), shardVersionCheck);
+
if (opCtx->writesAreReplicated() &&
!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)) {
return Status(ErrorCodes::NotMaster,
diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp
index 6aadfe754b4..3d5b681edd4 100644
--- a/src/mongo/db/catalog/rename_collection.cpp
+++ b/src/mongo/db/catalog/rename_collection.cpp
@@ -144,7 +144,8 @@ Status renameCollectionCommon(OperationContext* opCtx,
// We stay in source context the whole time. This is mostly to set the CurOp namespace.
boost::optional<OldClientContext> ctx;
- ctx.emplace(opCtx, source.ns());
+ const bool shardVersionCheck = true;
+ ctx.emplace(opCtx, source.ns(), shardVersionCheck);
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
bool userInitiatedWritesAndNotPrimary =
diff --git a/src/mongo/db/commands/collection_to_capped.cpp b/src/mongo/db/commands/collection_to_capped.cpp
index a7b3530d0e9..18635222f58 100644
--- a/src/mongo/db/commands/collection_to_capped.cpp
+++ b/src/mongo/db/commands/collection_to_capped.cpp
@@ -46,10 +46,7 @@
#include "mongo/db/service_context.h"
namespace mongo {
-
-using std::unique_ptr;
-using std::string;
-using std::stringstream;
+namespace {
class CmdCloneCollectionAsCapped : public ErrmsgCommandDeprecated {
public:
@@ -87,9 +84,9 @@ public:
out->push_back(Privilege(ResourcePattern::forExactNamespace(nss), targetActions));
}
bool errmsgRun(OperationContext* opCtx,
- const string& dbname,
+ const std::string& dbname,
const BSONObj& jsobj,
- string& errmsg,
+ std::string& errmsg,
BSONObjBuilder& result) {
const auto fromElt = jsobj["cloneCollectionAsCapped"];
const auto toElt = jsobj["toCollection"];
@@ -140,13 +137,13 @@ public:
uassertStatusOK(status);
return true;
}
+
} cmdCloneCollectionAsCapped;
-/* jan2010:
- Converts the given collection to a capped collection w/ the specified size.
- This command is not highly used, and is not currently supported with sharded
- environments.
- */
+/**
+ * Converts the given collection to a capped collection w/ the specified size. This command is not
+ * highly used, and is not currently supported with sharded environments.
+ */
class CmdConvertToCapped : public ErrmsgCommandDeprecated {
public:
CmdConvertToCapped() : ErrmsgCommandDeprecated("convertToCapped") {}
@@ -168,9 +165,9 @@ public:
}
bool errmsgRun(OperationContext* opCtx,
- const string& dbname,
+ const std::string& dbname,
const BSONObj& jsobj,
- string& errmsg,
+ std::string& errmsg,
BSONObjBuilder& result) {
const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbname, jsobj));
long long size = jsobj.getField("size").safeNumberLong();
@@ -185,4 +182,6 @@ public:
}
} cmdConvertToCapped;
-}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 6811bb958da..4ce8c38e5f2 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -201,7 +201,7 @@ public:
Collection* const collection = ctx->getCollection();
// We have a parsed query. Time to get the execution plan for it.
- auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq));
+ auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, std::move(cq));
if (!statusWithPlanExecutor.isOK()) {
return statusWithPlanExecutor.getStatus();
}
@@ -312,7 +312,7 @@ public:
Collection* const collection = ctx->getCollection();
// Get the execution plan for the query.
- auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq));
+ auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, std::move(cq));
uassertStatusOK(statusWithPlanExecutor.getStatus());
auto exec = std::move(statusWithPlanExecutor.getValue());
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 5b368e81af1..5291d665598 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -72,6 +72,7 @@
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog.h"
@@ -154,7 +155,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
}
// If we're in a sharded environment, we need to filter out documents we don't own.
- if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) {
+ if (OperationShardingState::isOperationVersioned(opCtx)) {
auto shardFilterStage = stdx::make_unique<ShardFilterStage>(
opCtx,
CollectionShardingState::get(opCtx, collection->ns())->getOrphansFilter(opCtx),
@@ -217,7 +218,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
return {cq.getStatus()};
}
- return getExecutorFind(opCtx, collection, nss, std::move(cq.getValue()), plannerOpts);
+ return getExecutorFind(opCtx, collection, std::move(cq.getValue()), plannerOpts);
}
BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) {
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index ae14be74be9..c8308a0f331 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -560,8 +560,8 @@ std::string runQuery(OperationContext* opCtx,
repl::ReplicationCoordinator::get(opCtx)->checkCanServeReadsFor(opCtx, nss, slaveOK));
}
- // We have a parsed query. Time to get the execution plan for it.
- auto exec = uassertStatusOK(getExecutorLegacyFind(opCtx, collection, nss, std::move(cq)));
+ // Get the execution plan for the query.
+ auto exec = uassertStatusOK(getExecutorLegacyFind(opCtx, collection, std::move(cq)));
const QueryRequest& qr = exec->getCanonicalQuery()->getQueryRequest();
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index ea89990bd2c..c00ab569f65 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -76,9 +76,8 @@
#include "mongo/db/query/stage_builder.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
@@ -674,7 +673,6 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack(
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind(
OperationContext* opCtx,
Collection* collection,
- const NamespaceString& nss,
unique_ptr<CanonicalQuery> canonicalQuery,
PlanExecutor::YieldPolicy yieldPolicy,
size_t plannerOptions) {
@@ -687,7 +685,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind(
return getOplogStartHack(opCtx, collection, std::move(canonicalQuery), plannerOptions);
}
- if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) {
+ if (OperationShardingState::isOperationVersioned(opCtx)) {
plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
}
return getExecutor(opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions);
@@ -698,25 +696,22 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind(
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind(
OperationContext* opCtx,
Collection* collection,
- const NamespaceString& nss,
unique_ptr<CanonicalQuery> canonicalQuery,
size_t plannerOptions) {
- auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
auto yieldPolicy = readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
? PlanExecutor::INTERRUPT_ONLY
: PlanExecutor::YIELD_AUTO;
return _getExecutorFind(
- opCtx, collection, nss, std::move(canonicalQuery), yieldPolicy, plannerOptions);
+ opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions);
}
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorLegacyFind(
OperationContext* opCtx,
Collection* collection,
- const NamespaceString& nss,
std::unique_ptr<CanonicalQuery> canonicalQuery) {
return _getExecutorFind(opCtx,
collection,
- nss,
std::move(canonicalQuery),
PlanExecutor::YIELD_AUTO,
QueryPlannerParams::DEFAULT);
@@ -1368,7 +1363,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorCount(
}
size_t plannerOptions = QueryPlannerParams::IS_COUNT;
- if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, request.getNs().ns())) {
+ if (OperationShardingState::isOperationVersioned(opCtx)) {
plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
}
diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h
index f8abcc16c13..26b7513f251 100644
--- a/src/mongo/db/query/get_executor.h
+++ b/src/mongo/db/query/get_executor.h
@@ -99,7 +99,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor(
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind(
OperationContext* opCtx,
Collection* collection,
- const NamespaceString& nss,
std::unique_ptr<CanonicalQuery> canonicalQuery,
size_t plannerOptions = QueryPlannerParams::DEFAULT);
@@ -109,7 +108,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorLegacyFind(
OperationContext* opCtx,
Collection* collection,
- const NamespaceString& nss,
std::unique_ptr<CanonicalQuery> canonicalQuery);
/**
diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h
index 9e4e8fa4f0c..cdac2cf1307 100644
--- a/src/mongo/db/query/query_planner_params.h
+++ b/src/mongo/db/query/query_planner_params.h
@@ -59,10 +59,11 @@ struct QueryPlannerParams {
// Set this if you're running on a sharded cluster. We'll add a "drop all docs that
// shouldn't be on this shard" stage before projection.
//
- // In order to set this, you must check
- // ShardingState::needCollectionMetadata(current_namespace) in the same lock that you use to
- // build the query executor. You must also wrap the PlanExecutor in a ClientCursor within
- // the same lock. See the comment on ShardFilterStage for details.
+ // In order to set this, you must check OperationShardingState::isOperationVersioned() in
+ // the same lock that you use to build the query executor. You must also wrap the
+ // PlanExecutor in a ClientCursor within the same lock.
+ //
+ // See the comment on ShardFilterStage for details.
INCLUDE_SHARD_FILTER = 1 << 2,
// Set this if you don't want any plans with a blocking sort stage. All sorts must be
diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
index 8bb1c5546e0..c1676b79b18 100644
--- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp
+++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
@@ -81,13 +81,7 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx,
{
AutoGetCollection autoColl(opCtx, ns, MODE_IX);
auto* const css = CollectionShardingRuntime::get(opCtx, ns);
- const auto optMetadata = css->getCurrentMetadataIfKnown();
- uassert(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Unable to establish sharding status for collection " << ns.ns(),
- optMetadata);
-
- const auto& metadata = *optMetadata;
-
+ const auto metadata = css->getCurrentMetadata();
if (!metadata->isSharded()) {
LOG(0) << "skipping orphaned data cleanup for " << ns.ns()
<< ", collection is not sharded";
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index 2c588be9ece..0600d1f6216 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -33,9 +33,9 @@
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/s/sharded_connection_info.h"
namespace mongo {
-
namespace {
const OperationContext::Decoration<OperationShardingState> shardingMetadataDecoration =
@@ -49,6 +49,7 @@ const Milliseconds kMaxWaitForMovePrimaryCriticalSection = Minutes(5);
// The name of the field in which the client attaches its database version.
constexpr auto kDbVersionField = "databaseVersion"_sd;
+
} // namespace
OperationShardingState::OperationShardingState() = default;
@@ -61,6 +62,14 @@ OperationShardingState& OperationShardingState::get(OperationContext* opCtx) {
return shardingMetadataDecoration(opCtx);
}
+bool OperationShardingState::isOperationVersioned(OperationContext* opCtx) {
+ const auto client = opCtx->getClient();
+
+ // Shard version information received from mongos may either be attached to the Client or
+ // directly to the OperationContext
+ return ShardedConnectionInfo::get(client, false) || get(opCtx).hasShardVersion();
+}
+
void OperationShardingState::setAllowImplicitCollectionCreation(
const BSONElement& allowImplicitCollectionCreationElem) {
if (!allowImplicitCollectionCreationElem.eoo()) {
diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h
index d2cf419f8e6..51914410a3e 100644
--- a/src/mongo/db/s/operation_sharding_state.h
+++ b/src/mongo/db/s/operation_sharding_state.h
@@ -64,6 +64,13 @@ public:
static OperationShardingState& get(OperationContext* opCtx);
/**
+ * Returns true if the current operation was sent by the caller with shard version
+ * information attached, meaning that it must perform shard version checking and orphan
+ * filtering.
+ */
+ static bool isOperationVersioned(OperationContext* opCtx);
+
+ /**
* Requests on a sharded collection that are broadcast without a shardVersion should not cause
* the collection to be created on a shard that does not know about the collection already,
* since the collection options will not be propagated. Such requests specify to disallow
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index f5e41c73f20..331c52ca2d2 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -34,8 +34,7 @@
#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/s/sharded_connection_info.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/server_options.h"
#include "mongo/util/log.h"
@@ -117,18 +116,6 @@ OID ShardingState::clusterId() {
return _clusterId;
}
-bool ShardingState::needCollectionMetadata(OperationContext* opCtx, const std::string& ns) {
- if (!enabled())
- return false;
-
- Client* client = opCtx->getClient();
-
- // Shard version information received from mongos may either by attached to the Client or
- // directly to the OperationContext.
- return ShardedConnectionInfo::get(client, false) ||
- OperationShardingState::get(opCtx).hasShardVersion();
-}
-
void ShardingState::clearForTests() {
_initializationState.store(static_cast<uint32_t>(InitializationState::kNew));
}
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index fb220c9aaea..e049d0fcb3b 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -106,12 +106,6 @@ public:
OID clusterId();
/**
- * Returns true if this node is a shard and if the currently runnint operation must engage the
- * sharding subsystem (i.e., perform version checking, orphan filtering, etc).
- */
- bool needCollectionMetadata(OperationContext* opCtx, const std::string& ns);
-
- /**
* For testing only. This is a workaround for the fact that it is not possible to get a clean
* ServiceContext in between test executions. Because of this, tests which require that they get
* started with a clean (uninitialized) ShardingState must invoke this in their tearDown method.
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index a5eb613114c..19f0a97c962 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -366,6 +366,9 @@ public:
const NamespaceString nss(parseNs(dbName, cmdObj));
const auto routingInfo =
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
+ uassert(ErrorCodes::IllegalOperation,
+ "You can't convertToCapped a sharded collection",
+ !routingInfo.cm());
// convertToCapped creates a temp collection and renames it at the end. It will require
// special handling for create collection.