author      Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2020-01-29 10:47:02 -0500
committer   Kaloian Manassiev <kaloian.manassiev@mongodb.com>    2020-01-29 11:37:03 -0500
commit      385511e9d05b254beb6767ed92a3cd95e83ea166 (patch)
tree        90fd740beb4ef3b89552426d4f498cb1ba27a663
parent      329618bf849fb8fdeca96703ca691874f8817947 (diff)
download    mongo-385511e9d05b254beb6767ed92a3cd95e83ea166.tar.gz
Revert "SERVER-45599 Backport of SERVER-39495: Move ShardingState::needCollectionMetadata under OperationShardingState"
This reverts commit 1a01c53df8f7c1e016c0ccbc38b77f6b3508bf65.
-rw-r--r--  jstests/change_streams/change_stream.js       40
-rw-r--r--  jstests/sharding/change_streams.js             423
-rw-r--r--  src/mongo/db/catalog/capped_utils.cpp          28
-rw-r--r--  src/mongo/db/catalog/create_collection.cpp     3
-rw-r--r--  src/mongo/db/catalog/rename_collection.cpp     3
-rw-r--r--  src/mongo/db/commands/collection_to_capped.cpp 27
-rw-r--r--  src/mongo/db/commands/find_cmd.cpp             4
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp           5
-rw-r--r--  src/mongo/db/query/find.cpp                    4
-rw-r--r--  src/mongo/db/query/get_executor.cpp            15
-rw-r--r--  src/mongo/db/query/get_executor.h              2
-rw-r--r--  src/mongo/db/query/query_planner_params.h      9
-rw-r--r--  src/mongo/db/s/cleanup_orphaned_cmd.cpp        8
-rw-r--r--  src/mongo/db/s/operation_sharding_state.cpp    11
-rw-r--r--  src/mongo/db/s/operation_sharding_state.h      7
-rw-r--r--  src/mongo/db/s/sharding_state.cpp              15
-rw-r--r--  src/mongo/db/s/sharding_state.h                6
-rw-r--r--  src/mongo/s/commands/commands_public.cpp       3
18 files changed, 262 insertions, 351 deletions
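
In short, everywhere the query layer decides whether to add a shard-filtering stage, this revert swaps the newer OperationShardingState::isOperationVersioned() check back for the older ShardingState::needCollectionMetadata() call, which takes the namespace explicitly. A minimal sketch of the decision point, using only the signatures visible in the hunks below; the helper function itself is hypothetical:

#include "mongo/db/namespace_string.h"
#include "mongo/db/query/query_planner_params.h"
#include "mongo/db/s/sharding_state.h"

namespace mongo {

// Hypothetical helper illustrating the check this commit restores: the caller passes the
// namespace explicitly and ShardingState decides whether orphan filtering is needed.
size_t computePlannerOptions(OperationContext* opCtx, const NamespaceString& nss) {
    size_t plannerOptions = QueryPlannerParams::DEFAULT;

    // Pre-revert this read: OperationShardingState::isOperationVersioned(opCtx).
    // Restored by this commit:
    if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) {
        // Documents not owned by this shard must be filtered out of the results.
        plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
    }
    return plannerOptions;
}

}  // namespace mongo
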
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index 9f41255c599..7c5688b0704 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -132,27 +132,6 @@
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
- jsTestLog("Testing multi:true update");
- assert.writeOK(db.t1.insert({_id: 4, a: 0, b: 1}));
- assert.writeOK(db.t1.insert({_id: 5, a: 0, b: 1}));
- cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
- assert.writeOK(db.t1.update({a: 0}, {$set: {b: 2}}, {multi: true}));
- expected = [
- {
- documentKey: {_id: 4},
- ns: {db: "test", coll: "t1"},
- operationType: "update",
- updateDescription: {removedFields: [], updatedFields: {b: 2}}
- },
- {
- documentKey: {_id: 5},
- ns: {db: "test", coll: "t1"},
- operationType: "update",
- updateDescription: {removedFields: [], updatedFields: {b: 2}}
- }
- ];
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
-
jsTestLog("Testing delete");
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.remove({_id: 1}));
@@ -163,25 +142,6 @@
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
- jsTestLog("Testing justOne:false delete");
- assert.writeOK(db.t1.insert({_id: 6, a: 1, b: 1}));
- assert.writeOK(db.t1.insert({_id: 7, a: 1, b: 1}));
- cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
- assert.writeOK(db.t1.remove({a: 1}, {justOne: false}));
- expected = [
- {
- documentKey: {_id: 6},
- ns: {db: "test", coll: "t1"},
- operationType: "delete",
- },
- {
- documentKey: {_id: 7},
- ns: {db: "test", coll: "t1"},
- operationType: "delete",
- }
- ];
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
-
jsTestLog("Testing intervening write on another collection");
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
let t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2});
diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js
index 53c22bfb723..cd13f86678c 100644
--- a/jstests/sharding/change_streams.js
+++ b/jstests/sharding/change_streams.js
@@ -3,9 +3,10 @@
(function() {
"use strict";
- load('jstests/aggregation/extras/utils.js'); // For assertErrorCode()
- load('jstests/libs/change_stream_util.js');
load('jstests/replsets/libs/two_phase_drops.js'); // For TwoPhaseDropCollectionTest.
+ load('jstests/aggregation/extras/utils.js'); // For assertErrorCode().
+
+ // For supportsMajorityReadConcern().
load("jstests/multiVersion/libs/causal_consistency_helpers.js");
if (!supportsMajorityReadConcern()) {
@@ -13,241 +14,187 @@
return;
}
- function runTest(collName, shardKey) {
- const st = new ShardingTest({
- shards: 2,
- rs: {
- nodes: 1,
- enableMajorityReadConcern: '',
- // Intentionally disable the periodic no-op writer in order to allow the test to have
- // control of advancing the cluster time. For when it is enabled later in the test,
- // use a higher frequency for periodic noops to speed up the test.
- setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: false}
- }
- });
-
- const mongosDB = st.s0.getDB(jsTestName());
- assert.commandWorked(st.s0.adminCommand({enableSharding: mongosDB.getName()}));
- st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);
-
- const mongosColl = mongosDB[collName];
-
- //
- // Sanity tests
- //
-
- // Test that $sort and $group are banned from running in a $changeStream pipeline.
- assertErrorCode(mongosDB.NegativeTest,
- [{$changeStream: {}}, {$sort: {operationType: 1}}],
- ErrorCodes.IllegalOperation);
- assertErrorCode(mongosDB.NegativeTest,
- [{$changeStream: {}}, {$group: {_id: '$documentKey'}}],
- ErrorCodes.IllegalOperation);
-
- // Test that using change streams with any stages not allowed to run on mongos results in an
- // error.
- assertErrorCode(
- mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation);
-
- //
- // Main tests
- //
-
- function makeShardKey(value) {
- var obj = {};
- obj[shardKey] = value;
- return obj;
- }
-
- function makeShardKeyDocument(value, optExtraFields) {
- var obj = {};
- if (shardKey !== '_id')
- obj['_id'] = value;
- obj[shardKey] = value;
- return Object.assign(obj, optExtraFields);
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {
+ nodes: 1,
+ enableMajorityReadConcern: '',
+ // Use a higher frequency for periodic noops to speed up the test.
+ setParameter: {periodicNoopIntervalSecs: 1}
}
-
- jsTestLog('Testing change streams with shard key ' + shardKey);
- // Shard the test collection and split it into 2 chunks:
- // [MinKey, 0) - shard0, [0, MaxKey) - shard1
- st.shardColl(mongosColl,
- makeShardKey(1) /* shard key */,
- makeShardKey(0) /* split at */,
- makeShardKey(1) /* move to shard 1 */);
-
- // Write a document to each chunk.
- assert.writeOK(mongosColl.insert(makeShardKeyDocument(-1)));
- assert.writeOK(mongosColl.insert(makeShardKeyDocument(1)));
-
- let changeStream = mongosColl.aggregate([{$changeStream: {}}]);
-
- // Test that a change stream can see inserts on shard 0.
- assert.writeOK(mongosColl.insert(makeShardKeyDocument(1000)));
- assert.writeOK(mongosColl.insert(makeShardKeyDocument(-1000)));
-
- assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert");
- assertChangeStreamEventEq(changeStream.next(), {
- documentKey: makeShardKeyDocument(1000),
- fullDocument: makeShardKeyDocument(1000),
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- });
-
- // Because the periodic noop writer is disabled, do another write to shard 0 in order to
- advance that shard's clock and enable the stream to return the earlier write to shard 1
- assert.writeOK(mongosColl.insert(makeShardKeyDocument(1001)));
-
- assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert");
- assertChangeStreamEventEq(changeStream.next(), {
- documentKey: makeShardKeyDocument(-1000),
- fullDocument: makeShardKeyDocument(-1000),
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- });
-
- // Test that all changes are eventually visible due to the periodic noop writer.
- assert.commandWorked(
- st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
- assert.commandWorked(
- st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
-
- assert.soon(() => changeStream.hasNext());
- assertChangeStreamEventEq(changeStream.next(), {
- documentKey: makeShardKeyDocument(1001),
- fullDocument: makeShardKeyDocument(1001),
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- });
- changeStream.close();
-
- jsTestLog('Testing multi-update change streams with shard key ' + shardKey);
- assert.writeOK(mongosColl.insert(makeShardKeyDocument(10, {a: 0, b: 0})));
- assert.writeOK(mongosColl.insert(makeShardKeyDocument(-10, {a: 0, b: 0})));
- changeStream = mongosColl.aggregate([{$changeStream: {}}]);
-
- assert.writeOK(mongosColl.update({a: 0}, {$set: {b: 2}}, {multi: true}));
-
- assert.soon(() => changeStream.hasNext());
- assertChangeStreamEventEq(changeStream.next(), {
- operationType: "update",
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- documentKey: makeShardKeyDocument(-10),
- updateDescription: {updatedFields: {b: 2}, removedFields: []},
- });
-
- assert.soon(() => changeStream.hasNext());
- assertChangeStreamEventEq(changeStream.next(), {
- operationType: "update",
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- documentKey: makeShardKeyDocument(10),
- updateDescription: {updatedFields: {b: 2}, removedFields: []},
- });
- changeStream.close();
-
- // Test that it is legal to open a change stream, even if the
- // 'internalQueryProhibitMergingOnMongos' parameter is set.
- assert.commandWorked(
- st.s0.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true}));
- let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}]));
- tempCursor.close();
- assert.commandWorked(
- st.s0.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false}));
-
- assert.writeOK(mongosColl.remove({}));
- // We awaited the replication of the first write, so the change stream shouldn't return it.
- // Use { w: "majority" } to deal with journaling correctly, even though we only have one
- // node.
- assert.writeOK(
- mongosColl.insert(makeShardKeyDocument(0, {a: 1}), {writeConcern: {w: "majority"}}));
-
- changeStream = mongosColl.aggregate([{$changeStream: {}}]);
- assert(!changeStream.hasNext());
-
- // Drop the collection and test that we return a "drop" followed by an "invalidate" entry
- // and close the cursor.
- jsTestLog('Testing getMore command closes cursor for invalidate entries with shard key' +
- shardKey);
- mongosColl.drop();
- // Wait for the drop to actually happen.
- assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
- mongosColl.getDB(), mongosColl.getName()));
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "drop");
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "invalidate");
- assert(changeStream.isExhausted());
-
- jsTestLog('Testing aggregate command closes cursor for invalidate entries with shard key' +
- shardKey);
- // Shard the test collection and split it into 2 chunks:
- // [MinKey, 0) - shard0, [0, MaxKey) - shard1
- st.shardColl(mongosColl,
- makeShardKey(1) /* shard key */,
- makeShardKey(0) /* split at */,
- makeShardKey(1) /* move to shard 1 */);
-
- // Write one document to each chunk.
- assert.writeOK(
- mongosColl.insert(makeShardKeyDocument(-1), {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert(makeShardKeyDocument(1), {writeConcern: {w: "majority"}}));
-
- changeStream = mongosColl.aggregate([{$changeStream: {}}]);
- assert(!changeStream.hasNext());
-
- // Store a valid resume token before dropping the collection, to be used later in the test
- assert.writeOK(
- mongosColl.insert(makeShardKeyDocument(-2), {writeConcern: {w: "majority"}}));
- assert.writeOK(mongosColl.insert(makeShardKeyDocument(2), {writeConcern: {w: "majority"}}));
-
- assert.soon(() => changeStream.hasNext());
- const resumeToken = changeStream.next()._id;
-
- mongosColl.drop();
-
- assert.soon(() => changeStream.hasNext());
- assertChangeStreamEventEq(changeStream.next(), {
- documentKey: makeShardKeyDocument(2),
- fullDocument: makeShardKeyDocument(2),
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- });
-
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "drop");
-
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "invalidate");
-
- // With an explicit collation, test that we can resume from before the collection drop
- changeStream =
- mongosColl.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}});
-
- assert.soon(() => changeStream.hasNext());
- assertChangeStreamEventEq(changeStream.next(), {
- documentKey: makeShardKeyDocument(2),
- fullDocument: makeShardKeyDocument(2),
- ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
- operationType: "insert",
- });
-
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "drop");
-
- assert.soon(() => changeStream.hasNext());
- assert.eq(changeStream.next().operationType, "invalidate");
-
- // Without an explicit collation, test that we *cannot* resume from before the collection
- // drop
- assert.commandFailedWithCode(mongosDB.runCommand({
- aggregate: mongosColl.getName(),
- pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
- cursor: {}
- }),
- ErrorCodes.InvalidResumeToken);
-
- st.stop();
- }
-
- runTest('with_id_shard_key', '_id');
- runTest('with_non_id_shard_key', 'non_id');
+ });
+
+ const mongosDB = st.s0.getDB(jsTestName());
+ const mongosColl = mongosDB[jsTestName()];
+
+ assert.commandWorked(mongosDB.dropDatabase());
+
+ // Enable sharding on the test DB and ensure its primary is st.shard0.shardName.
+ assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
+ st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL());
+
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+
+ // Move the [0, MaxKey) chunk to st.shard1.shardName.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+ // Write a document to each chunk.
+ assert.writeOK(mongosColl.insert({_id: -1}));
+ assert.writeOK(mongosColl.insert({_id: 1}));
+
+ let changeStream =
+ mongosColl.aggregate([{$changeStream: {}}, {$project: {_id: 0, clusterTime: 0}}]);
+
+ // Test that a change stream can see inserts on shard 0.
+ assert.writeOK(mongosColl.insert({_id: 1000}));
+ assert.writeOK(mongosColl.insert({_id: -1000}));
+
+ assert.soon(() => changeStream.hasNext(), "expected to be able to see the first insert");
+ assert.docEq(changeStream.next(), {
+ documentKey: {_id: 1000},
+ fullDocument: {_id: 1000},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+
+ // Now do another write to shard 0, advancing that shard's clock and enabling the stream to
+ // return the earlier write to shard 1.
+ assert.writeOK(mongosColl.insert({_id: 1001}));
+
+ assert.soon(() => changeStream.hasNext(), "expected to be able to see the second insert");
+ assert.docEq(changeStream.next(), {
+ documentKey: {_id: -1000},
+ fullDocument: {_id: -1000},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+
+ // Test that all changes are eventually visible due to the periodic noop writer.
+ assert.commandWorked(
+ st.rs0.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
+ assert.commandWorked(
+ st.rs1.getPrimary().adminCommand({setParameter: 1, writePeriodicNoops: true}));
+ assert.soon(() => changeStream.hasNext());
+
+ assert.docEq(changeStream.next(), {
+ documentKey: {_id: 1001},
+ fullDocument: {_id: 1001},
+ ns: {db: mongosDB.getName(), coll: mongosColl.getName()},
+ operationType: "insert",
+ });
+ changeStream.close();
+
+ // Test that using change streams with any stages not allowed to run on mongos results in an
+ // error.
+ assertErrorCode(
+ mongosColl, [{$changeStream: {}}, {$out: "shouldntWork"}], ErrorCodes.IllegalOperation);
+
+ // Test that it is legal to open a change stream, even if the
+ // 'internalQueryProhibitMergingOnMongos' parameter is set.
+ assert.commandWorked(
+ mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: true}));
+ let tempCursor = assert.doesNotThrow(() => mongosColl.aggregate([{$changeStream: {}}]));
+ tempCursor.close();
+ assert.commandWorked(
+ mongosDB.adminCommand({setParameter: 1, internalQueryProhibitMergingOnMongoS: false}));
+
+ // Test that $sort and $group are banned from running in a $changeStream pipeline.
+ assertErrorCode(mongosColl,
+ [{$changeStream: {}}, {$sort: {operationType: 1}}],
+ ErrorCodes.IllegalOperation);
+ assertErrorCode(mongosColl,
+ [{$changeStream: {}}, {$group: {_id: "$documentKey"}}],
+ ErrorCodes.IllegalOperation);
+
+ assert.writeOK(mongosColl.remove({}));
+ // We awaited the replication of the first write, so the change stream shouldn't return it.
+ // Use { w: "majority" } to deal with journaling correctly, even though we only have one node.
+ assert.writeOK(mongosColl.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}}));
+
+ changeStream = mongosColl.aggregate([{$changeStream: {}}, {$project: {"_id.clusterTime": 0}}]);
+ assert(!changeStream.hasNext());
+
+ // Drop the collection and test that we return a "drop" followed by an "invalidate" entry and
+ // close the cursor.
+ jsTestLog("Testing getMore command closes cursor for invalidate entries");
+ mongosColl.drop();
+ // Wait for the drop to actually happen.
+ assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(
+ mongosColl.getDB(), mongosColl.getName()));
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "drop");
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
+ assert(changeStream.isExhausted());
+
+ jsTestLog("Testing aggregate command closes cursor for invalidate entries");
+ // Shard the test collection on _id.
+ assert.commandWorked(
+ mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
+
+ // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey).
+ assert.commandWorked(
+ mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}}));
+ // Move the [0, MaxKey) chunk to st.shard1.shardName.
+ assert.commandWorked(mongosDB.adminCommand(
+ {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()}));
+
+ // Write one document to each chunk.
+ assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+ changeStream = mongosColl.aggregate([{$changeStream: {}}]);
+ assert(!changeStream.hasNext());
+
+ // Store a valid resume token before dropping the collection, to be used later in the test.
+ assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+
+ assert.soon(() => changeStream.hasNext());
+ const resumeToken = changeStream.next()._id;
+
+ mongosColl.drop();
+
+ assert.soon(() => changeStream.hasNext());
+ let next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey._id, 2);
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "drop");
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
+
+ // With an explicit collation, test that we can resume from before the collection drop.
+ changeStream = mongosColl.watch([{$project: {_id: 0}}],
+ {resumeAfter: resumeToken, collation: {locale: "simple"}});
+
+ assert.soon(() => changeStream.hasNext());
+ next = changeStream.next();
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.documentKey, {_id: 2});
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "drop");
+
+ assert.soon(() => changeStream.hasNext());
+ assert.eq(changeStream.next().operationType, "invalidate");
+
+ // Without an explicit collation, test that we *cannot* resume from before the collection drop.
+ assert.commandFailedWithCode(mongosDB.runCommand({
+ aggregate: mongosColl.getName(),
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ cursor: {}
+ }),
+ ErrorCodes.InvalidResumeToken);
+
+ st.stop();
})();
diff --git a/src/mongo/db/catalog/capped_utils.cpp b/src/mongo/db/catalog/capped_utils.cpp
index df880bd6f4c..376f879e6f1 100644
--- a/src/mongo/db/catalog/capped_utils.cpp
+++ b/src/mongo/db/catalog/capped_utils.cpp
@@ -53,9 +53,7 @@
#include "mongo/db/service_context.h"
#include "mongo/util/scopeguard.h"
-namespace mongo {
-
-Status emptyCapped(OperationContext* opCtx, const NamespaceString& collectionName) {
+mongo::Status mongo::emptyCapped(OperationContext* opCtx, const NamespaceString& collectionName) {
AutoGetDb autoDb(opCtx, collectionName.db(), MODE_X);
bool userInitiatedWritesAndNotPrimary = opCtx->writesAreReplicated() &&
@@ -105,20 +103,20 @@ Status emptyCapped(OperationContext* opCtx, const NamespaceString& collectionNam
return status;
}
- const auto service = opCtx->getServiceContext();
- service->getOpObserver()->onEmptyCapped(opCtx, collection->ns(), collection->uuid());
+ getGlobalServiceContext()->getOpObserver()->onEmptyCapped(
+ opCtx, collection->ns(), collection->uuid());
wuow.commit();
return Status::OK();
}
-Status cloneCollectionAsCapped(OperationContext* opCtx,
- Database* db,
- const std::string& shortFrom,
- const std::string& shortTo,
- long long size,
- bool temp) {
+mongo::Status mongo::cloneCollectionAsCapped(OperationContext* opCtx,
+ Database* db,
+ const std::string& shortFrom,
+ const std::string& shortTo,
+ long long size,
+ bool temp) {
NamespaceString fromNss(db->name(), shortFrom);
NamespaceString toNss(db->name(), shortTo);
@@ -253,9 +251,9 @@ Status cloneCollectionAsCapped(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
-Status convertToCapped(OperationContext* opCtx,
- const NamespaceString& collectionName,
- long long size) {
+mongo::Status mongo::convertToCapped(OperationContext* opCtx,
+ const NamespaceString& collectionName,
+ long long size) {
StringData dbname = collectionName.db();
StringData shortSource = collectionName.coll();
@@ -303,5 +301,3 @@ Status convertToCapped(OperationContext* opCtx,
options.stayTemp = false;
return renameCollection(opCtx, longTmpName, collectionName, options);
}
-
-} // namespace mongo
diff --git a/src/mongo/db/catalog/create_collection.cpp b/src/mongo/db/catalog/create_collection.cpp
index 20ed630bae4..a9bf373f38a 100644
--- a/src/mongo/db/catalog/create_collection.cpp
+++ b/src/mongo/db/catalog/create_collection.cpp
@@ -51,7 +51,6 @@
namespace mongo {
namespace {
-
/**
* Shared part of the implementation of the createCollection versions for replicated and regular
* collection creation.
@@ -100,10 +99,8 @@ Status createCollection(OperationContext* opCtx,
return writeConflictRetry(opCtx, "create", nss.ns(), [&] {
Lock::DBLock dbXLock(opCtx, nss.db(), MODE_X);
-
const bool shardVersionCheck = true;
OldClientContext ctx(opCtx, nss.ns(), shardVersionCheck);
-
if (opCtx->writesAreReplicated() &&
!repl::ReplicationCoordinator::get(opCtx)->canAcceptWritesFor(opCtx, nss)) {
return Status(ErrorCodes::NotMaster,
diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp
index 3d5b681edd4..6aadfe754b4 100644
--- a/src/mongo/db/catalog/rename_collection.cpp
+++ b/src/mongo/db/catalog/rename_collection.cpp
@@ -144,8 +144,7 @@ Status renameCollectionCommon(OperationContext* opCtx,
// We stay in source context the whole time. This is mostly to set the CurOp namespace.
boost::optional<OldClientContext> ctx;
- const bool shardVersionCheck = true;
- ctx.emplace(opCtx, source.ns(), shardVersionCheck);
+ ctx.emplace(opCtx, source.ns());
auto replCoord = repl::ReplicationCoordinator::get(opCtx);
bool userInitiatedWritesAndNotPrimary =
diff --git a/src/mongo/db/commands/collection_to_capped.cpp b/src/mongo/db/commands/collection_to_capped.cpp
index 18635222f58..a7b3530d0e9 100644
--- a/src/mongo/db/commands/collection_to_capped.cpp
+++ b/src/mongo/db/commands/collection_to_capped.cpp
@@ -46,7 +46,10 @@
#include "mongo/db/service_context.h"
namespace mongo {
-namespace {
+
+using std::unique_ptr;
+using std::string;
+using std::stringstream;
class CmdCloneCollectionAsCapped : public ErrmsgCommandDeprecated {
public:
@@ -84,9 +87,9 @@ public:
out->push_back(Privilege(ResourcePattern::forExactNamespace(nss), targetActions));
}
bool errmsgRun(OperationContext* opCtx,
- const std::string& dbname,
+ const string& dbname,
const BSONObj& jsobj,
- std::string& errmsg,
+ string& errmsg,
BSONObjBuilder& result) {
const auto fromElt = jsobj["cloneCollectionAsCapped"];
const auto toElt = jsobj["toCollection"];
@@ -137,13 +140,13 @@ public:
uassertStatusOK(status);
return true;
}
-
} cmdCloneCollectionAsCapped;
-/**
- * Converts the given collection to a capped collection w/ the specified size. This command is not
- * highly used, and is not currently supported with sharded environments.
- */
+/* jan2010:
+ Converts the given collection to a capped collection w/ the specified size.
+ This command is not highly used, and is not currently supported with sharded
+ environments.
+ */
class CmdConvertToCapped : public ErrmsgCommandDeprecated {
public:
CmdConvertToCapped() : ErrmsgCommandDeprecated("convertToCapped") {}
@@ -165,9 +168,9 @@ public:
}
bool errmsgRun(OperationContext* opCtx,
- const std::string& dbname,
+ const string& dbname,
const BSONObj& jsobj,
- std::string& errmsg,
+ string& errmsg,
BSONObjBuilder& result) {
const NamespaceString nss(CommandHelpers::parseNsCollectionRequired(dbname, jsobj));
long long size = jsobj.getField("size").safeNumberLong();
@@ -182,6 +185,4 @@ public:
}
} cmdConvertToCapped;
-
-} // namespace
-} // namespace mongo
+}
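
The restored comment above describes the user-facing commands these classes implement. Purely as an illustration (not part of this change), the command documents that CmdCloneCollectionAsCapped and CmdConvertToCapped parse would look roughly as follows; the collection names and sizes are made up, and the "size" field for the clone variant is assumed rather than shown in this hunk:

#include "mongo/bson/bsonmisc.h"
#include "mongo/bson/bsonobjbuilder.h"

// Made-up example command documents; field names follow the command names shown above and
// the "size" field read by CmdConvertToCapped::errmsgRun().
const mongo::BSONObj cloneCmd = BSON("cloneCollectionAsCapped"
                                     << "events"
                                     << "toCollection"
                                     << "eventsCapped"
                                     << "size" << 100000);

const mongo::BSONObj convertCmd = BSON("convertToCapped"
                                       << "events"
                                       << "size" << 100000);
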
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 4ce8c38e5f2..6811bb958da 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -201,7 +201,7 @@ public:
Collection* const collection = ctx->getCollection();
// We have a parsed query. Time to get the execution plan for it.
- auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, std::move(cq));
+ auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq));
if (!statusWithPlanExecutor.isOK()) {
return statusWithPlanExecutor.getStatus();
}
@@ -312,7 +312,7 @@ public:
Collection* const collection = ctx->getCollection();
// Get the execution plan for the query.
- auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, std::move(cq));
+ auto statusWithPlanExecutor = getExecutorFind(opCtx, collection, nss, std::move(cq));
uassertStatusOK(statusWithPlanExecutor.getStatus());
auto exec = std::move(statusWithPlanExecutor.getValue());
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 5291d665598..5b368e81af1 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -72,7 +72,6 @@
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog.h"
@@ -155,7 +154,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
}
// If we're in a sharded environment, we need to filter out documents we don't own.
- if (OperationShardingState::isOperationVersioned(opCtx)) {
+ if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, collection->ns().ns())) {
auto shardFilterStage = stdx::make_unique<ShardFilterStage>(
opCtx,
CollectionShardingState::get(opCtx, collection->ns())->getOrphansFilter(opCtx),
@@ -218,7 +217,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
return {cq.getStatus()};
}
- return getExecutorFind(opCtx, collection, std::move(cq.getValue()), plannerOpts);
+ return getExecutorFind(opCtx, collection, nss, std::move(cq.getValue()), plannerOpts);
}
BSONObj removeSortKeyMetaProjection(BSONObj projectionObj) {
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index c8308a0f331..ae14be74be9 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -560,8 +560,8 @@ std::string runQuery(OperationContext* opCtx,
repl::ReplicationCoordinator::get(opCtx)->checkCanServeReadsFor(opCtx, nss, slaveOK));
}
- // Get the execution plan for the query.
- auto exec = uassertStatusOK(getExecutorLegacyFind(opCtx, collection, std::move(cq)));
+ // We have a parsed query. Time to get the execution plan for it.
+ auto exec = uassertStatusOK(getExecutorLegacyFind(opCtx, collection, nss, std::move(cq)));
const QueryRequest& qr = exec->getCanonicalQuery()->getQueryRequest();
diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp
index c00ab569f65..ea89990bd2c 100644
--- a/src/mongo/db/query/get_executor.cpp
+++ b/src/mongo/db/query/get_executor.cpp
@@ -76,8 +76,9 @@
#include "mongo/db/query/stage_builder.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/s/collection_metadata.h"
#include "mongo/db/s/collection_sharding_state.h"
-#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/sharding_state.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
@@ -673,6 +674,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getOplogStartHack(
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind(
OperationContext* opCtx,
Collection* collection,
+ const NamespaceString& nss,
unique_ptr<CanonicalQuery> canonicalQuery,
PlanExecutor::YieldPolicy yieldPolicy,
size_t plannerOptions) {
@@ -685,7 +687,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind(
return getOplogStartHack(opCtx, collection, std::move(canonicalQuery), plannerOptions);
}
- if (OperationShardingState::isOperationVersioned(opCtx)) {
+ if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, nss.ns())) {
plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
}
return getExecutor(opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions);
@@ -696,22 +698,25 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> _getExecutorFind(
StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind(
OperationContext* opCtx,
Collection* collection,
+ const NamespaceString& nss,
unique_ptr<CanonicalQuery> canonicalQuery,
size_t plannerOptions) {
- const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ auto readConcernArgs = repl::ReadConcernArgs::get(opCtx);
auto yieldPolicy = readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern
? PlanExecutor::INTERRUPT_ONLY
: PlanExecutor::YIELD_AUTO;
return _getExecutorFind(
- opCtx, collection, std::move(canonicalQuery), yieldPolicy, plannerOptions);
+ opCtx, collection, nss, std::move(canonicalQuery), yieldPolicy, plannerOptions);
}
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorLegacyFind(
OperationContext* opCtx,
Collection* collection,
+ const NamespaceString& nss,
std::unique_ptr<CanonicalQuery> canonicalQuery) {
return _getExecutorFind(opCtx,
collection,
+ nss,
std::move(canonicalQuery),
PlanExecutor::YIELD_AUTO,
QueryPlannerParams::DEFAULT);
@@ -1363,7 +1368,7 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorCount(
}
size_t plannerOptions = QueryPlannerParams::IS_COUNT;
- if (OperationShardingState::isOperationVersioned(opCtx)) {
+ if (ShardingState::get(opCtx)->needCollectionMetadata(opCtx, request.getNs().ns())) {
plannerOptions |= QueryPlannerParams::INCLUDE_SHARD_FILTER;
}
diff --git a/src/mongo/db/query/get_executor.h b/src/mongo/db/query/get_executor.h
index 26b7513f251..f8abcc16c13 100644
--- a/src/mongo/db/query/get_executor.h
+++ b/src/mongo/db/query/get_executor.h
@@ -99,6 +99,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutor(
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind(
OperationContext* opCtx,
Collection* collection,
+ const NamespaceString& nss,
std::unique_ptr<CanonicalQuery> canonicalQuery,
size_t plannerOptions = QueryPlannerParams::DEFAULT);
@@ -108,6 +109,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorFind
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> getExecutorLegacyFind(
OperationContext* opCtx,
Collection* collection,
+ const NamespaceString& nss,
std::unique_ptr<CanonicalQuery> canonicalQuery);
/**
diff --git a/src/mongo/db/query/query_planner_params.h b/src/mongo/db/query/query_planner_params.h
index cdac2cf1307..9e4e8fa4f0c 100644
--- a/src/mongo/db/query/query_planner_params.h
+++ b/src/mongo/db/query/query_planner_params.h
@@ -59,11 +59,10 @@ struct QueryPlannerParams {
// Set this if you're running on a sharded cluster. We'll add a "drop all docs that
// shouldn't be on this shard" stage before projection.
//
- // In order to set this, you must check OperationShardingState::isOperationVersioned() in
- // the same lock that you use to build the query executor. You must also wrap the
- // PlanExecutor in a ClientCursor within the same lock.
- //
- // See the comment on ShardFilterStage for details.
+ // In order to set this, you must check
+ // ShardingState::needCollectionMetadata(current_namespace) in the same lock that you use to
+ // build the query executor. You must also wrap the PlanExecutor in a ClientCursor within
+ // the same lock. See the comment on ShardFilterStage for details.
INCLUDE_SHARD_FILTER = 1 << 2,
// Set this if you don't want any plans with a blocking sort stage. All sorts must be
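
The restored comment above spells out an ordering requirement: the needCollectionMetadata() check, the executor construction, and the ClientCursor registration must all happen under the same collection lock. A hedged sketch of that calling pattern, assuming the post-revert getExecutorFind() signature from the earlier hunks; the function, variable names, and surrounding command context are illustrative:

#include <memory>

#include "mongo/db/db_raii.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/util/assert_util.h"

namespace mongo {

// Hypothetical command body fragment; 'nss' and 'cq' would come from the parsed request.
void runFindLikeCommand(OperationContext* opCtx,
                        const NamespaceString& nss,
                        std::unique_ptr<CanonicalQuery> cq) {
    AutoGetCollectionForReadCommand ctx(opCtx, nss);
    Collection* const collection = ctx.getCollection();

    // getExecutorFind() now performs the ShardingState::needCollectionMetadata() check
    // itself, under the collection lock held by 'ctx', and adds INCLUDE_SHARD_FILTER to
    // the planner options when shard filtering is required.
    auto exec = uassertStatusOK(getExecutorFind(opCtx, collection, nss, std::move(cq)));

    // Per the comment above, any ClientCursor wrapping 'exec' must be registered while the
    // lock acquired by 'ctx' is still held.
}

}  // namespace mongo
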
diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
index c1676b79b18..8bb1c5546e0 100644
--- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp
+++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
@@ -81,7 +81,13 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx,
{
AutoGetCollection autoColl(opCtx, ns, MODE_IX);
auto* const css = CollectionShardingRuntime::get(opCtx, ns);
- const auto metadata = css->getCurrentMetadata();
+ const auto optMetadata = css->getCurrentMetadataIfKnown();
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Unable to establish sharding status for collection " << ns.ns(),
+ optMetadata);
+
+ const auto& metadata = *optMetadata;
+
if (!metadata->isSharded()) {
LOG(0) << "skipping orphaned data cleanup for " << ns.ns()
<< ", collection is not sharded";
diff --git a/src/mongo/db/s/operation_sharding_state.cpp b/src/mongo/db/s/operation_sharding_state.cpp
index 0600d1f6216..2c588be9ece 100644
--- a/src/mongo/db/s/operation_sharding_state.cpp
+++ b/src/mongo/db/s/operation_sharding_state.cpp
@@ -33,9 +33,9 @@
#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/s/sharded_connection_info.h"
namespace mongo {
+
namespace {
const OperationContext::Decoration<OperationShardingState> shardingMetadataDecoration =
@@ -49,7 +49,6 @@ const Milliseconds kMaxWaitForMovePrimaryCriticalSection = Minutes(5);
// The name of the field in which the client attaches its database version.
constexpr auto kDbVersionField = "databaseVersion"_sd;
-
} // namespace
OperationShardingState::OperationShardingState() = default;
@@ -62,14 +61,6 @@ OperationShardingState& OperationShardingState::get(OperationContext* opCtx) {
return shardingMetadataDecoration(opCtx);
}
-bool OperationShardingState::isOperationVersioned(OperationContext* opCtx) {
- const auto client = opCtx->getClient();
-
- // Shard version information received from mongos may either be attached to the Client or
- // directly to the OperationContext
- return ShardedConnectionInfo::get(client, false) || get(opCtx).hasShardVersion();
-}
-
void OperationShardingState::setAllowImplicitCollectionCreation(
const BSONElement& allowImplicitCollectionCreationElem) {
if (!allowImplicitCollectionCreationElem.eoo()) {
diff --git a/src/mongo/db/s/operation_sharding_state.h b/src/mongo/db/s/operation_sharding_state.h
index 51914410a3e..d2cf419f8e6 100644
--- a/src/mongo/db/s/operation_sharding_state.h
+++ b/src/mongo/db/s/operation_sharding_state.h
@@ -64,13 +64,6 @@ public:
static OperationShardingState& get(OperationContext* opCtx);
/**
- * Returns true if the current operation was sent by the caller with shard version
- * information attached, meaning that it must perform shard version checking and orphan
- * filtering.
- */
- static bool isOperationVersioned(OperationContext* opCtx);
-
- /**
* Requests on a sharded collection that are broadcast without a shardVersion should not cause
* the collection to be created on a shard that does not know about the collection already,
* since the collection options will not be propagated. Such requests specify to disallow
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 331c52ca2d2..f5e41c73f20 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -34,7 +34,8 @@
#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/operation_context.h"
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/sharded_connection_info.h"
#include "mongo/db/server_options.h"
#include "mongo/util/log.h"
@@ -116,6 +117,18 @@ OID ShardingState::clusterId() {
return _clusterId;
}
+bool ShardingState::needCollectionMetadata(OperationContext* opCtx, const std::string& ns) {
+ if (!enabled())
+ return false;
+
+ Client* client = opCtx->getClient();
+
+ // Shard version information received from mongos may either be attached to the Client or
+ // directly to the OperationContext.
+ return ShardedConnectionInfo::get(client, false) ||
+ OperationShardingState::get(opCtx).hasShardVersion();
+}
+
void ShardingState::clearForTests() {
_initializationState.store(static_cast<uint32_t>(InitializationState::kNew));
}
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index e049d0fcb3b..fb220c9aaea 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -106,6 +106,12 @@ public:
OID clusterId();
/**
+ * Returns true if this node is a shard and if the currently running operation must engage the
+ * sharding subsystem (i.e., perform version checking, orphan filtering, etc).
+ */
+ bool needCollectionMetadata(OperationContext* opCtx, const std::string& ns);
+
+ /**
* For testing only. This is a workaround for the fact that it is not possible to get a clean
* ServiceContext in between test executions. Because of this, tests which require that they get
* started with a clean (uninitialized) ShardingState must invoke this in their tearDown method.
diff --git a/src/mongo/s/commands/commands_public.cpp b/src/mongo/s/commands/commands_public.cpp
index 19f0a97c962..a5eb613114c 100644
--- a/src/mongo/s/commands/commands_public.cpp
+++ b/src/mongo/s/commands/commands_public.cpp
@@ -366,9 +366,6 @@ public:
const NamespaceString nss(parseNs(dbName, cmdObj));
const auto routingInfo =
uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss));
- uassert(ErrorCodes::IllegalOperation,
- "You can't convertToCapped a sharded collection",
- !routingInfo.cm());
// convertToCapped creates a temp collection and renames it at the end. It will require
// special handling for create collection.