diff options
author | James Wahlin <james@mongodb.com> | 2017-06-07 14:59:10 -0400 |
---|---|---|
committer | James Wahlin <james@mongodb.com> | 2017-06-07 14:59:10 -0400 |
commit | b85788f99e2f7d309582bd9d31ffc2769de72405 (patch) | |
tree | fdf5348758f1713163bb7ae63c081b64a15547ef /src | |
parent | 103972cebb1cdfccbc76c2afd82c824ed02fc11c (diff) | |
download | mongo-b85788f99e2f7d309582bd9d31ffc2769de72405.tar.gz |
Revert "SERVER-29125 Add $changeNotification stage that always outputs the single last oplog entry, unmodified"
This reverts commit bbfbb9f0ee75db566ac236c6c12daad71e3d3e10.
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/commands/run_aggregate.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification.cpp | 84 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification.h | 75 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_notification_test.cpp | 93 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_document_source.h | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_pipeline.h | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.h | 3 |
9 files changed, 13 insertions, 306 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 123bd491456..02dd220d340 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -54,7 +54,6 @@ #include "mongo/db/query/find_common.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_summary_stats.h" -#include "mongo/db/repl/oplog.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/storage_options.h" #include "mongo/db/views/view.h" @@ -271,7 +270,7 @@ Status runAggregate(OperationContext* opCtx, const BSONObj& cmdObj, BSONObjBuilder& result) { // For operations on views, this will be the underlying namespace. - NamespaceString nss = request.getNamespaceString(); + const NamespaceString& nss = request.getNamespaceString(); // Parse the user-specified collation, if any. std::unique_ptr<CollatorInterface> userSpecifiedCollator = request.getCollation().isEmpty() @@ -284,11 +283,6 @@ Status runAggregate(OperationContext* opCtx, Pipeline* unownedPipeline; auto curOp = CurOp::get(opCtx); { - const LiteParsedPipeline liteParsedPipeline(request); - if (liteParsedPipeline.startsWithChangeNotification()) { - nss = NamespaceString(repl::rsOplogName); - } - // This will throw if the sharding version for this connection is out of date. If the // namespace is a view, the lock will be released before re-running the aggregation. AutoGetCollectionOrViewForReadCommand ctx(opCtx, nss); @@ -299,8 +293,8 @@ Status runAggregate(OperationContext* opCtx, // recursively calling runAggregate(), which will re-acquire locks on the underlying // collection. (The lock must be released because recursively acquiring locks on the // database will prohibit yielding.) + const LiteParsedPipeline liteParsedPipeline(request); if (ctx.getView() && !liteParsedPipeline.startsWithCollStats()) { - invariant(nss != repl::rsOplogName); // Check that the default collation of 'view' is compatible with the operation's // collation. The check is skipped if the 'request' has the empty collation, which // means that no collation was specified. @@ -396,7 +390,7 @@ Status runAggregate(OperationContext* opCtx, // This does mongod-specific stuff like creating the input PlanExecutor and adding // it to the front of the pipeline if needed. - PipelineD::prepareCursorSource(collection, nss, &request, pipeline.get()); + PipelineD::prepareCursorSource(collection, &request, pipeline.get()); // Transfer ownership of the Pipeline to the PipelineProxyStage. unownedPipeline = pipeline.get(); diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 6c1ee4f7772..f8b58061d5c 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -127,7 +127,6 @@ env.CppUnitTest( 'document_source_add_fields_test.cpp', 'document_source_bucket_auto_test.cpp', 'document_source_bucket_test.cpp', - 'document_source_change_notification_test.cpp', 'document_source_count_test.cpp', 'document_source_current_op_test.cpp', 'document_source_geo_near_test.cpp', @@ -227,7 +226,6 @@ docSourceEnv.Library( 'document_source_add_fields.cpp', 'document_source_bucket.cpp', 'document_source_bucket_auto.cpp', - 'document_source_change_notification.cpp', 'document_source_coll_stats.cpp', 'document_source_count.cpp', 'document_source_current_op.cpp', diff --git a/src/mongo/db/pipeline/document_source_change_notification.cpp b/src/mongo/db/pipeline/document_source_change_notification.cpp deleted file mode 100644 index 5fc130d83a2..00000000000 --- a/src/mongo/db/pipeline/document_source_change_notification.cpp +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kCommand - -#include "mongo/db/pipeline/document_source_change_notification.h" - -#include "mongo/db/pipeline/document_source_limit.h" -#include "mongo/db/pipeline/document_source_match.h" -#include "mongo/db/pipeline/document_source_sort.h" -#include "mongo/db/pipeline/expression.h" -#include "mongo/db/pipeline/lite_parsed_document_source.h" -#include "mongo/util/log.h" - -namespace mongo { - -using boost::intrusive_ptr; -using std::vector; - -REGISTER_MULTI_STAGE_ALIAS(changeNotification, - DocumentSourceChangeNotification::LiteParsed::parse, - DocumentSourceChangeNotification::createFromBson); - -BSONObj DocumentSourceChangeNotification::buildMatch(BSONElement elem, const NamespaceString& nss) { - auto target = nss.toString(); - return BSON("$match" << BSON( - "op" << BSON("$ne" - << "n") - << "ts" - << BSON("$gt" << Timestamp()) - << "$or" - << BSON_ARRAY(BSON("ns" << target) - << BSON("op" - << "c" - << "$or" - << BSON_ARRAY(BSON("o.renameCollection" << target) - << BSON("o.to" << target)))))); -} - -vector<intrusive_ptr<DocumentSource>> DocumentSourceChangeNotification::createFromBson( - BSONElement elem, const intrusive_ptr<ExpressionContext>& expCtx) { - // TODO: Add sharding support here (SERVER-29141). - uassert(40470, - "The $changeNotification stage is not supported on sharded systems.", - !expCtx->inRouter); - uassert(40471, - "Only default collation is allowed when using a $changeNotification stage.", - !expCtx->getCollator()); - - BSONObj matchObj = buildMatch(elem, expCtx->ns); - BSONObj sortObj = BSON("$sort" << BSON("ts" << -1)); - - auto matchSource = DocumentSourceMatch::createFromBson(matchObj.firstElement(), expCtx); - auto sortSource = DocumentSourceSort::createFromBson(sortObj.firstElement(), expCtx); - auto limitSource = DocumentSourceLimit::create(expCtx, 1); - return {matchSource, sortSource, limitSource}; -} - -} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_notification.h b/src/mongo/db/pipeline/document_source_change_notification.h deleted file mode 100644 index f18ad52b26d..00000000000 --- a/src/mongo/db/pipeline/document_source_change_notification.h +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Copyright (C) 2017 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#pragma once - -#include "mongo/db/pipeline/document_source.h" - -namespace mongo { - -/** - * The $changeNotification stage is an alias for a cursor on oplog followed by a $match stage and a - * transform stage on mongod. - */ -class DocumentSourceChangeNotification final { -public: - class LiteParsed final : public LiteParsedDocumentSource { - public: - static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, - const BSONElement& spec) { - return stdx::make_unique<LiteParsed>(); - } - - bool isChangeNotification() const final { - return true; - } - - stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { - return stdx::unordered_set<NamespaceString>(); - } - }; - - /** - * Produce the BSON for the $match stage based on a $changeNotification stage. - */ - static BSONObj buildMatch(BSONElement elem, const NamespaceString& nss); - - /** - * Parses a $changeNotification stage from 'elem' and produces the $match and transformation - * stages required. - */ - static std::vector<boost::intrusive_ptr<DocumentSource>> createFromBson( - BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx); - -private: - // It is illegal to construct a DocumentSourceChangeNotification directly, use createFromBson() - // instead. - DocumentSourceChangeNotification() = default; -}; - -} // namespace mongo diff --git a/src/mongo/db/pipeline/document_source_change_notification_test.cpp b/src/mongo/db/pipeline/document_source_change_notification_test.cpp deleted file mode 100644 index c75677507c1..00000000000 --- a/src/mongo/db/pipeline/document_source_change_notification_test.cpp +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Copyright (C) 2016 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects - * for all of the code used other than as permitted herein. If you modify - * file(s) with this exception, you may extend this exception to your - * version of the file(s), but you are not obligated to do so. If you do not - * wish to do so, delete this exception statement from your version. If you - * delete this exception statement from all source files in the program, - * then also delete it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include <boost/intrusive_ptr.hpp> -#include <vector> - -#include "mongo/bson/bsonobj.h" -#include "mongo/bson/json.h" -#include "mongo/db/pipeline/aggregation_context_fixture.h" -#include "mongo/db/pipeline/document.h" -#include "mongo/db/pipeline/document_source_change_notification.h" -#include "mongo/db/pipeline/document_source_limit.h" -#include "mongo/db/pipeline/document_source_match.h" -#include "mongo/db/pipeline/document_source_sort.h" -#include "mongo/db/pipeline/document_value_test_util.h" -#include "mongo/db/pipeline/value.h" -#include "mongo/db/pipeline/value_comparator.h" -#include "mongo/unittest/unittest.h" - -namespace mongo { -namespace { - -using std::vector; -using boost::intrusive_ptr; - -using ChangeNotificationStageTest = AggregationContextFixture; - -TEST_F(ChangeNotificationStageTest, Basic) { - const auto spec = fromjson("{$changeNotification: {}}"); - - vector<intrusive_ptr<DocumentSource>> result = - DocumentSourceChangeNotification::createFromBson(spec.firstElement(), getExpCtx()); - - ASSERT_EQUALS(result.size(), 3UL); - - const auto* matchStage = dynamic_cast<DocumentSourceMatch*>(result[0].get()); - ASSERT(matchStage); - const std::string target = "unittests.pipeline_test"; - ASSERT_BSONOBJ_EQ(matchStage->getQuery(), - BSON("op" << BSON("$ne" - << "n") - << "ts" - << BSON("$gt" << Timestamp()) - << "$or" - << BSON_ARRAY(BSON("ns" << target) << BSON( - "op" - << "c" - << "$or" - << BSON_ARRAY(BSON("o.renameCollection" << target) - << BSON("o.to" << target)))))); - - auto* sortStage = dynamic_cast<DocumentSourceSort*>(result[1].get()); - ASSERT(sortStage); - BSONObjSet outputSort = sortStage->getOutputSorts(); - ASSERT_EQUALS(outputSort.count(BSON("ts" << -1)), 1U); - ASSERT_EQUALS(outputSort.size(), 1U); - - const auto* limitStage = dynamic_cast<DocumentSourceLimit*>(result[2].get()); - ASSERT(limitStage); - ASSERT_EQUALS(limitStage->getLimit(), 1); - - // TODO: Check explain result. -} - -} // namespace -} // namespace mongo diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h index da358775ab8..ad82339f36d 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.h +++ b/src/mongo/db/pipeline/lite_parsed_document_source.h @@ -90,13 +90,6 @@ public: virtual bool isCollStats() const { return false; } - - /** - * Returns true if this is a $changeNotification stage. - */ - virtual bool isChangeNotification() const { - return false; - } }; class LiteParsedDocumentSourceDefault final : public LiteParsedDocumentSource { diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index ca2e9dfda77..c39d9f1031e 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -78,13 +78,6 @@ public: return !_stageSpecs.empty() && _stageSpecs.front()->isCollStats(); } - /** - * Returns true if the pipeline begins with a $changeNotification stage. - */ - bool startsWithChangeNotification() const { - return !_stageSpecs.empty() && _stageSpecs.front()->isChangeNotification(); - } - private: std::vector<std::unique_ptr<LiteParsedDocumentSource>> _stageSpecs; }; diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 9c0eb1ec9bd..06777b9d162 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -210,7 +210,7 @@ public: uassert(4567, "from collection cannot be sharded", !bool(css->getMetadata())); PipelineD::prepareCursorSource( - autoColl.getCollection(), expCtx->ns, nullptr, pipeline.getValue().get()); + autoColl.getCollection(), nullptr, pipeline.getValue().get()); return pipeline; } @@ -368,14 +368,13 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExecutor( OperationContext* opCtx, Collection* collection, - const NamespaceString& nss, const intrusive_ptr<ExpressionContext>& pExpCtx, BSONObj queryObj, BSONObj projectionObj, BSONObj sortObj, const AggregationRequest* aggRequest, const size_t plannerOpts) { - auto qr = stdx::make_unique<QueryRequest>(nss); + auto qr = stdx::make_unique<QueryRequest>(pExpCtx->ns); qr->setFilter(queryObj); qr->setProj(projectionObj); qr->setSort(sortObj); @@ -394,7 +393,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe qr->setCollation(pExpCtx->getCollator() ? pExpCtx->getCollator()->getSpec().toBSON() : pExpCtx->collation); - const ExtensionsCallbackReal extensionsCallback(pExpCtx->opCtx, &nss); + const ExtensionsCallbackReal extensionsCallback(pExpCtx->opCtx, &pExpCtx->ns); auto cq = CanonicalQuery::canonicalize(opCtx, std::move(qr), extensionsCallback); @@ -413,11 +412,10 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe } // namespace void PipelineD::prepareCursorSource(Collection* collection, - const NamespaceString& nss, const AggregationRequest* aggRequest, Pipeline* pipeline) { auto expCtx = pipeline->getContext(); - dassert(expCtx->opCtx->lockState()->isCollectionLockedForMode(nss.ns(), MODE_IS)); + dassert(expCtx->opCtx->lockState()->isCollectionLockedForMode(expCtx->ns.ns(), MODE_IS)); // We will be modifying the source vector as we go. Pipeline::SourceContainer& sources = pipeline->_sources; @@ -503,7 +501,7 @@ void PipelineD::prepareCursorSource(Collection* collection, // Create the PlanExecutor. auto exec = uassertStatusOK(prepareExecutor(expCtx->opCtx, collection, - nss, + expCtx->ns, pipeline, expCtx, sortStage, @@ -584,7 +582,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep // See if the query system can provide a non-blocking sort. auto swExecutorSort = attemptToGetExecutor(opCtx, collection, - nss, expCtx, queryObj, emptyProjection, @@ -596,7 +593,6 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep // Success! Now see if the query system can also cover the projection. auto swExecutorSortAndProj = attemptToGetExecutor(opCtx, collection, - nss, expCtx, queryObj, *projectionObj, @@ -643,15 +639,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep dassert(sortObj->isEmpty()); // See if the query system can cover the projection. - auto swExecutorProj = attemptToGetExecutor(opCtx, - collection, - nss, - expCtx, - queryObj, - *projectionObj, - *sortObj, - aggRequest, - plannerOpts); + auto swExecutorProj = attemptToGetExecutor( + opCtx, collection, expCtx, queryObj, *projectionObj, *sortObj, aggRequest, plannerOpts); if (swExecutorProj.isOK()) { // Success! We have a covered projection. return std::move(swExecutorProj.getValue()); @@ -665,15 +654,8 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> PipelineD::prep // The query system couldn't provide a covered projection. *projectionObj = BSONObj(); // If this doesn't work, nothing will. - return attemptToGetExecutor(opCtx, - collection, - nss, - expCtx, - queryObj, - *projectionObj, - *sortObj, - aggRequest, - plannerOpts); + return attemptToGetExecutor( + opCtx, collection, expCtx, queryObj, *projectionObj, *sortObj, aggRequest, plannerOpts); } void PipelineD::addCursorSource(Collection* collection, diff --git a/src/mongo/db/pipeline/pipeline_d.h b/src/mongo/db/pipeline/pipeline_d.h index 3248459fbf4..8c06a9e2f27 100644 --- a/src/mongo/db/pipeline/pipeline_d.h +++ b/src/mongo/db/pipeline/pipeline_d.h @@ -71,12 +71,11 @@ public: * * The cursor is added to the front of the pipeline's sources. * - * Callers must take care to ensure that 'nss' is locked in at least IS-mode. + * Callers must take care to ensure that 'collection' is locked in at least IS-mode. * * When not null, 'aggRequest' provides access to pipeline command options such as hint. */ static void prepareCursorSource(Collection* collection, - const NamespaceString& nss, const AggregationRequest* aggRequest, Pipeline* pipeline); |