summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJames Wahlin <james@mongodb.com>2017-06-07 14:59:10 -0400
committerJames Wahlin <james@mongodb.com>2017-06-07 14:59:10 -0400
commitb85788f99e2f7d309582bd9d31ffc2769de72405 (patch)
treefdf5348758f1713163bb7ae63c081b64a15547ef /src
parent103972cebb1cdfccbc76c2afd82c824ed02fc11c (diff)
downloadmongo-b85788f99e2f7d309582bd9d31ffc2769de72405.tar.gz
Revert "SERVER-29125 Add $changeNotification stage that always outputs the single last oplog entry, unmodified"
This reverts commit bbfbb9f0ee75db566ac236c6c12daad71e3d3e10.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp12
-rw-r--r--src/mongo/db/pipeline/SConscript2
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.cpp84
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification.h75
-rw-r--r--src/mongo/db/pipeline/document_source_change_notification_test.cpp93
-rw-r--r--src/mongo/db/pipeline/lite_parsed_document_source.h7
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.h7
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp36
-rw-r--r--src/mongo/db/pipeline/pipeline_d.h3
9 files changed, 13 insertions, 306 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 123bd491456..02dd220d340 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -54,7 +54,6 @@
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
-#include "mongo/db/repl/oplog.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/storage_options.h"
#include "mongo/db/views/view.h"
@@ -271,7 +270,7 @@ Status runAggregate(OperationContext* opCtx,
const BSONObj& cmdObj,
BSONObjBuilder& result) {
// For operations on views, this will be the underlying namespace.
- NamespaceString nss = request.getNamespaceString();
+ const NamespaceString& nss = request.getNamespaceString();
// Parse the user-specified collation, if any.
std::unique_ptr<CollatorInterface> userSpecifiedCollator = request.getCollation().isEmpty()
@@ -284,11 +283,6 @@ Status runAggregate(OperationContext* opCtx,
Pipeline* unownedPipeline;
auto curOp = CurOp::get(opCtx);
{
- const LiteParsedPipeline liteParsedPipeline(request);
- if (liteParsedPipeline.startsWithChangeNotification()) {
- nss = NamespaceString(repl::rsOplogName);
- }
-
// This will throw if the sharding version for this connection is out of date. If the
// namespace is a view, the lock will be released before re-running the aggregation.
AutoGetCollectionOrViewForReadCommand ctx(opCtx, nss);
@@ -299,8 +293,8 @@ Status runAggregate(OperationContext* opCtx,
// recursively calling runAggregate(), which will re-acquire locks on the underlying
// collection. (The lock must be released because recursively acquiring locks on the
// database will prohibit yielding.)
+ const LiteParsedPipeline liteParsedPipeline(request);
if (ctx.getView() && !liteParsedPipeline.startsWithCollStats()) {
- invariant(nss != repl::rsOplogName);
// Check that the default collation of 'view' is compatible with the operation's
// collation. The check is skipped if the 'request' has the empty collation, which
// means that no collation was specified.
@@ -396,7 +390,7 @@ Status runAggregate(OperationContext* opCtx,
// This does mongod-specific stuff like creating the input PlanExecutor and adding
// it to the front of the pipeline if needed.
- PipelineD::prepareCursorSource(collection, nss, &request, pipeline.get());
+ PipelineD::prepareCursorSource(collection, &request, pipeline.get());
// Transfer ownership of the Pipeline to the PipelineProxyStage.
unownedPipeline = pipeline.get();
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 6c1ee4f7772..f8b58061d5c 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -127,7 +127,6 @@ env.CppUnitTest(
'document_source_add_fields_test.cpp',
'document_source_bucket_auto_test.cpp',
'document_source_bucket_test.cpp',
- 'document_source_change_notification_test.cpp',
'document_source_count_test.cpp',
'document_source_current_op_test.cpp',
'document_source_geo_near_test.cpp',
@@ -227,7 +226,6 @@ docSourceEnv.Library(
'document_source_add_fields.cpp',
'document_source_bucket.cpp',
'document_source_bucket_auto.cpp',
- 'document_source_change_notification.cpp',
'document_source_coll_stats.cpp',
'document_source_count.cpp',
'document_source_current_op.cpp',
diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp
deleted file mode 100644
index 5fc130d83a2..00000000000
--- a/src/mongo/db/pipeline/document_source_change_notification.cpp
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand
-
-#include "mongo/db/pipeline/document_source_change_notification.h"
-
-#include "mongo/db/pipeline/document_source_limit.h"
-#include "mongo/db/pipeline/document_source_match.h"
-#include "mongo/db/pipeline/document_source_sort.h"
-#include "mongo/db/pipeline/expression.h"
-#include "mongo/db/pipeline/lite_parsed_document_source.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-
-using boost::intrusive_ptr;
-using std::vector;
-
-REGISTER_MULTI_STAGE_ALIAS(changeNotification,
- DocumentSourceChangeNotification::LiteParsed::parse,
- DocumentSourceChangeNotification::createFromBson);
-
-BSONObj DocumentSourceChangeNotification::buildMatch(BSONElement elem, const NamespaceString& nss) {
- auto target = nss.toString();
- return BSON("$match" << BSON(
- "op" << BSON("$ne"
- << "n")
- << "ts"
- << BSON("$gt" << Timestamp())
- << "$or"
- << BSON_ARRAY(BSON("ns" << target)
- << BSON("op"
- << "c"
- << "$or"
- << BSON_ARRAY(BSON("o.renameCollection" << target)
- << BSON("o.to" << target))))));
-}
-
-vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson(
- BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) {
- // TODO: Add sharding support here (SERVER-29141).
- uassert(40470,
- "The $changeNotification stage is not supported on sharded systems.",
- !expCtx->inRouter);
- uassert(40471,
- "Only default collation is allowed when using a $changeNotification stage.",
- !expCtx->getCollator());
-
- BSONObj matchObj = buildMatch(elem, expCtx->ns);
- BSONObj sortObj = BSON("$sort" << BSON("ts" << -1));
-
- auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx);
- auto sortSource = DocumentSourceSort::createFromBson(sortObj.firstElement(), expCtx);
- auto limitSource = DocumentSourceLimit::create(expCtx, 1);
- return {matchSource, sortSource, limitSource};
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h
deleted file mode 100644
index f18ad52b26d..00000000000
--- a/src/mongo/db/pipeline/document_source_change_notification.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Copyright (C) 2017 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#pragma once
-
-#include "mongo/db/pipeline/document_source.h"
-
-namespace mongo {
-
-/**
- * The $changeNotification stage is an alias for a cursor on oplog followed by a $match stage and a
- * transform stage on mongod.
- */
-class DocumentSourceChangeNotification final {
-public:
- class LiteParsed final : public LiteParsedDocumentSource {
- public:
- static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
- const BSONElement& spec) {
- return stdx::make_unique<LiteParsed>();
- }
-
- bool isChangeNotification() const final {
- return true;
- }
-
- stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
- return stdx::unordered_set<NamespaceString>();
- }
- };
-
- /**
- * Produce the BSON for the $match stage based on a $changeNotification stage.
- */
- static BSONObj buildMatch(BSONElement elem, const NamespaceString& nss);
-
- /**
- * Parses a $changeNotification stage from 'elem' and produces the $match and transformation
- * stages required.
- */
- static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson(
- BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);
-
-private:
- // It is illegal to construct a DocumentSourceChangeNotification directly, use createFromBson()
- // instead.
- DocumentSourceChangeNotification() = default;
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp
deleted file mode 100644
index c75677507c1..00000000000
--- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Copyright (C) 2016 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects
- * for all of the code used other than as permitted herein. If you modify
- * file(s) with this exception, you may extend this exception to your
- * version of the file(s), but you are not obligated to do so. If you do not
- * wish to do so, delete this exception statement from your version. If you
- * delete this exception statement from all source files in the program,
- * then also delete it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include <boost/intrusive_ptr.hpp>
-#include <vector>
-
-#include "mongo/bson/bsonobj.h"
-#include "mongo/bson/json.h"
-#include "mongo/db/pipeline/aggregation_context_fixture.h"
-#include "mongo/db/pipeline/document.h"
-#include "mongo/db/pipeline/document_source_change_notification.h"
-#include "mongo/db/pipeline/document_source_limit.h"
-#include "mongo/db/pipeline/document_source_match.h"
-#include "mongo/db/pipeline/document_source_sort.h"
-#include "mongo/db/pipeline/document_value_test_util.h"
-#include "mongo/db/pipeline/value.h"
-#include "mongo/db/pipeline/value_comparator.h"
-#include "mongo/unittest/unittest.h"
-
-namespace mongo {
-namespace {
-
-using std::vector;
-using boost::intrusive_ptr;
-
-using ChangeNotificationStageTest = AggregationContextFixture;
-
-TEST_F(ChangeNotificationStageTest, Basic) {
- const auto spec = fromjson("{$changeNotification: {}}");
-
- vector<intrusive_ptr<DocumentSource>> result =
- DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx());
-
- ASSERT_EQUALS(result.size(), 3UL);
-
- const auto* matchStage = dynamic_cast<DocumentSourceMatch*>(result[0].get());
- ASSERT(matchStage);
- const std::string target = "unittests.pipeline_test";
- ASSERT_BSONOBJ_EQ(matchStage->getQuery(),
- BSON("op" << BSON("$ne"
- << "n")
- << "ts"
- << BSON("$gt" << Timestamp())
- << "$or"
- << BSON_ARRAY(BSON("ns" << target) << BSON(
- "op"
- << "c"
- << "$or"
- << BSON_ARRAY(BSON("o.renameCollection" << target)
- << BSON("o.to" << target))))));
-
- auto* sortStage = dynamic_cast<DocumentSourceSort*>(result[1].get());
- ASSERT(sortStage);
- BSONObjSet outputSort = sortStage->getOutputSorts();
- ASSERT_EQUALS(outputSort.count(BSON("ts" << -1)), 1U);
- ASSERT_EQUALS(outputSort.size(), 1U);
-
- const auto* limitStage = dynamic_cast<DocumentSourceLimit*>(result[2].get());
- ASSERT(limitStage);
- ASSERT_EQUALS(limitStage->getLimit(), 1);
-
- // TODO: Check explain result.
-}
-
-} // namespace
-} // namespace mongo
diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h
index da358775ab8..ad82339f36d 100644
--- a/src/mongo/db/pipeline/lite_parsed_document_source.h
+++ b/src/mongo/db/pipeline/lite_parsed_document_source.h
@@ -90,13 +90,6 @@ public:
virtual bool isCollStats() const {
return false;
}
-
- /**
- * Returns true if this is a $changeNotification stage.
- */
- virtual bool isChangeNotification() const {
- return false;
- }
};
class LiteParsedDocumentSourceDefault final : public LiteParsedDocumentSource {
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index ca2e9dfda77..c39d9f1031e 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -78,13 +78,6 @@ public:
return !_stageSpecs.empty() && _stageSpecs.front()->isCollStats();
}
- /**
- * Returns true if the pipeline begins with a $changeNotification stage.
- */
- bool startsWithChangeNotification() const {
- return !_stageSpecs.empty() && _stageSpecs.front()->isChangeNotification();
- }
-
private:
std::vector<std::unique_ptr<LiteParsedDocumentSource>> _stageSpecs;
};
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 9c0eb1ec9bd..06777b9d162 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -210,7 +210,7 @@ public:
uassert(4567, "from collection cannot be sharded", !bool(css->getMetadata()));
PipelineD::prepareCursorSource(
- autoColl.getCollection(), expCtx->ns, nullptr, pipeline.getValue().get());
+ autoColl.getCollection(), nullptr, pipeline.getValue().get());
return pipeline;
}
@@ -368,14 +368,13 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor(
OperationContext* opCtx,
Collection* collection,
- const NamespaceString& nss,
const intrusive_ptr<ExpressionContext>& pExpCtx,
BSONObj queryObj,
BSONObj projectionObj,
BSONObj sortObj,
const AggregationRequest* aggRequest,
const size_t plannerOpts) {
- auto qr = stdx::make_unique<QueryRequest>(nss);
+ auto qr = stdx::make_unique<QueryRequest>(pExpCtx->ns);
qr->setFilter(queryObj);
qr->setProj(projectionObj);
qr->setSort(sortObj);
@@ -394,7 +393,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
qr->setCollation(pExpCtx->getCollator() ? pExpCtx->getCollator()->getSpec().toBSON()
: pExpCtx->collation);
- const ExtensionsCallbackReal extensionsCallback(pExpCtx->opCtx, &nss);
+ const ExtensionsCallbackReal extensionsCallback(pExpCtx->opCtx, &pExpCtx->ns);
auto cq = CanonicalQuery::canonicalize(opCtx, std::move(qr), extensionsCallback);
@@ -413,11 +412,10 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
} // namespace
void PipelineD::prepareCursorSource(Collection* collection,
- const NamespaceString& nss,
const AggregationRequest* aggRequest,
Pipeline* pipeline) {
auto expCtx = pipeline->getContext();
- dassert(expCtx->opCtx->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IS));
+ dassert(expCtx->opCtx->lockState()->isCollectionLockedForMode(expCtx->ns.ns(), MODE_IS));
// We will be modifying the source vector as we go.
Pipeline::SourceContainer& sources = pipeline->_sources;
@@ -503,7 +501,7 @@ void PipelineD::prepareCursorSource(Collection* collection,
// Create the PlanExecutor.
auto exec = uassertStatusOK(prepareExecutor(expCtx->opCtx,
collection,
- nss,
+ expCtx->ns,
pipeline,
expCtx,
sortStage,
@@ -584,7 +582,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
// See if the query system can provide a non-blocking sort.
auto swExecutorSort = attemptToGetExecutor(opCtx,
collection,
- nss,
expCtx,
queryObj,
emptyProjection,
@@ -596,7 +593,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
// Success! Now see if the query system can also cover the projection.
auto swExecutorSortAndProj = attemptToGetExecutor(opCtx,
collection,
- nss,
expCtx,
queryObj,
*projectionObj,
@@ -643,15 +639,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
dassert(sortObj->isEmpty());
// See if the query system can cover the projection.
- auto swExecutorProj = attemptToGetExecutor(opCtx,
- collection,
- nss,
- expCtx,
- queryObj,
- *projectionObj,
- *sortObj,
- aggRequest,
- plannerOpts);
+ auto swExecutorProj = attemptToGetExecutor(
+ opCtx, collection, expCtx, queryObj, *projectionObj, *sortObj, aggRequest, plannerOpts);
if (swExecutorProj.isOK()) {
// Success! We have a covered projection.
return std::move(swExecutorProj.getValue());
@@ -665,15 +654,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep
// The query system couldn't provide a covered projection.
*projectionObj = BSONObj();
// If this doesn't work, nothing will.
- return attemptToGetExecutor(opCtx,
- collection,
- nss,
- expCtx,
- queryObj,
- *projectionObj,
- *sortObj,
- aggRequest,
- plannerOpts);
+ return attemptToGetExecutor(
+ opCtx, collection, expCtx, queryObj, *projectionObj, *sortObj, aggRequest, plannerOpts);
}
void PipelineD::addCursorSource(Collection* collection,
diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h
index 3248459fbf4..8c06a9e2f27 100644
--- a/src/mongo/db/pipeline/pipeline_d.h
+++ b/src/mongo/db/pipeline/pipeline_d.h
@@ -71,12 +71,11 @@ public:
*
* The cursor is added to the front of the pipeline's sources.
*
- * Callers must take care to ensure that 'nss' is locked in at least IS-mode.
+ * Callers must take care to ensure that 'collection' is locked in at least IS-mode.
*
* When not null, 'aggRequest' provides access to pipeline command options such as hint.
*/
static void prepareCursorSource(Collection* collection,
- const NamespaceString& nss,
const AggregationRequest* aggRequest,
Pipeline* pipeline);