diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-09-05 11:23:54 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-09-13 14:44:09 -0400 |
commit | fe125855b6b3e8feb9d7d666338a7f2d29d301ad (patch) | |
tree | c682c408675b895bd343dd7187de8be18e875f66 /src/mongo | |
parent | 61d1cfbf2c8521126506c12bcd2d187a7926fbe0 (diff) | |
download | mongo-fe125855b6b3e8feb9d7d666338a7f2d29d301ad.tar.gz |
SERVER-29142 Support $changeStream on unsharded collections.
Diffstat (limited to 'src/mongo')
23 files changed, 230 insertions, 118 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp index 3350f43e4b5..c8d63487229 100644 --- a/src/mongo/db/commands/run_aggregate.cpp +++ b/src/mongo/db/commands/run_aggregate.cpp @@ -409,7 +409,7 @@ Status runAggregate(OperationContext* opCtx, expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp"; if (liteParsedPipeline.hasChangeStream()) { - expCtx->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData; + expCtx->tailableMode = TailableMode::kTailableAndAwaitData; } auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx)); @@ -471,7 +471,7 @@ Status runAggregate(OperationContext* opCtx, AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(), opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(), cmdObj); - if (expCtx->tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData) { + if (expCtx->tailableMode == TailableMode::kTailableAndAwaitData) { cursorParams.setTailable(true); cursorParams.setAwaitData(true); } diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 1b637abbd6c..a33322b80dd 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -62,10 +62,6 @@ public: PrivilegeVector requiredPrivileges(bool isMongos) const final { return {}; } - - bool allowedToForwardFromMongos() const final { - return false; - } }; class Transformation : public DocumentSourceSingleDocumentTransformation::TransformerInterface { diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index ad7052968c1..037f1b81383 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -43,6 +43,7 @@ #include "mongo/db/pipeline/variables.h" #include "mongo/db/query/collation/collator_interface.h" #include "mongo/db/query/explain_options.h" +#include "mongo/db/query/tailable_mode.h" #include "mongo/util/intrusive_counter.h" #include "mongo/util/string_map.h" @@ -58,8 +59,6 @@ public: std::vector<BSONObj> pipeline; }; - enum class TailableMode { kNormal, kTailableAndAwaitData }; - /** * Constructs an ExpressionContext to be used for Pipeline parsing and evaluation. * 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces. @@ -107,7 +106,7 @@ public: * Convenience call that returns true if the tailableMode indicate a tailable query. */ bool isTailable() const { - return tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData; + return tailableMode == TailableMode::kTailableAndAwaitData; } // The explain verbosity requested by the user, or boost::none if no explain was requested. diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index 7f6a7c5fc49..66d4baf9379 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -400,14 +400,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe const AggregationRequest* aggRequest, const size_t plannerOpts) { auto qr = stdx::make_unique<QueryRequest>(nss); - switch (pExpCtx->tailableMode) { - case ExpressionContext::TailableMode::kNormal: - break; - case ExpressionContext::TailableMode::kTailableAndAwaitData: - qr->setTailable(true); - qr->setAwaitData(true); - break; - } + qr->setTailableMode(pExpCtx->tailableMode); qr->setOplogReplay(oplogReplay); qr->setFilter(queryObj); qr->setProj(projectionObj); diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript index 657e485348e..6701dc560f7 100644 --- a/src/mongo/db/query/SConscript +++ b/src/mongo/db/query/SConscript @@ -241,7 +241,8 @@ env.Library( env.Library( target="query_request", source=[ - "query_request.cpp" + "query_request.cpp", + "tailable_mode.cpp" ], LIBDEPS=[ "$BUILD_DIR/mongo/base", diff --git a/src/mongo/db/query/canonical_query_test.cpp b/src/mongo/db/query/canonical_query_test.cpp index 691079a9560..04048242c64 100644 --- a/src/mongo/db/query/canonical_query_test.cpp +++ b/src/mongo/db/query/canonical_query_test.cpp @@ -169,7 +169,7 @@ TEST(CanonicalQueryTest, IsValidText) { TEST(CanonicalQueryTest, IsValidTextTailable) { // Filter inside QueryRequest is not used. auto qr = stdx::make_unique<QueryRequest>(nss); - qr->setTailable(true); + qr->setTailableMode(TailableMode::kTailable); ASSERT_OK(qr->validate()); // Invalid: TEXT and tailable. diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp index dbeb1d56b71..ed443a2b09d 100644 --- a/src/mongo/db/query/plan_executor.cpp +++ b/src/mongo/db/query/plan_executor.cpp @@ -422,7 +422,7 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o bool PlanExecutor::shouldWaitForInserts() { // If this is an awaitData-respecting operation and we have time left and we're not interrupted, // we should wait for inserts. - if (_cq && _cq->getQueryRequest().isTailable() && _cq->getQueryRequest().isAwaitData() && + if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() && mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() && _opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) { // We expect awaitData cursors to be yielding. @@ -689,9 +689,9 @@ void PlanExecutor::enqueue(const BSONObj& obj) { PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData( Status yieldError, Snapshotted<BSONObj>* errorObj) const { if (yieldError == ErrorCodes::ExceededTimeLimit) { - if (_cq && _cq->getQueryRequest().isTailable() && _cq->getQueryRequest().isAwaitData()) { - // If the cursor is tailable then exceeding the time limit should not - // destroy this PlanExecutor, we should just stop waiting for inserts. + if (_cq && _cq->getQueryRequest().isTailableAndAwaitData()) { + // If the cursor is tailable then exceeding the time limit should not destroy this + // PlanExecutor, we should just stop waiting for inserts. return PlanExecutor::IS_EOF; } } diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp index 00a8915c1c4..cd09a591955 100644 --- a/src/mongo/db/query/query_request.cpp +++ b/src/mongo/db/query/query_request.cpp @@ -127,6 +127,8 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p const BSONObj& cmdObj, bool isExplain) { qr->_explain = isExplain; + bool tailable = false; + bool awaitData = false; // Parse the command BSON by looping through one element at a time. BSONObjIterator it(cmdObj); @@ -316,7 +318,7 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p return status; } - qr->_tailable = el.boolean(); + tailable = el.boolean(); } else if (fieldName == kOplogReplayField) { Status status = checkFieldType(el, Bool); if (!status.isOK()) { @@ -337,7 +339,7 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p return status; } - qr->_awaitData = el.boolean(); + awaitData = el.boolean(); } else if (fieldName == kPartialResultsField) { Status status = checkFieldType(el, Bool); if (!status.isOK()) { @@ -385,6 +387,11 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p } } + auto tailableMode = tailableModeFromBools(tailable, awaitData); + if (!tailableMode.isOK()) { + return tailableMode.getStatus(); + } + qr->_tailableMode = tailableMode.getValue(); qr->addMetaProjection(); Status validateStatus = qr->validate(); @@ -494,8 +501,19 @@ void QueryRequest::asFindCommand(BSONObjBuilder* cmdBuilder) const { cmdBuilder->append(kSnapshotField, true); } - if (_tailable) { - cmdBuilder->append(kTailableField, true); + switch (_tailableMode) { + case TailableMode::kTailable: { + cmdBuilder->append(kTailableField, true); + break; + } + case TailableMode::kTailableAndAwaitData: { + cmdBuilder->append(kTailableField, true); + cmdBuilder->append(kAwaitDataField, true); + break; + } + case TailableMode::kNormal: { + break; + } } if (_oplogReplay) { @@ -506,10 +524,6 @@ void QueryRequest::asFindCommand(BSONObjBuilder* cmdBuilder) const { cmdBuilder->append(kNoCursorTimeoutField, true); } - if (_awaitData) { - cmdBuilder->append(kAwaitDataField, true); - } - if (_allowPartialResults) { cmdBuilder->append(kPartialResultsField, true); } @@ -625,7 +639,7 @@ Status QueryRequest::validate() const { << _maxTimeMS); } - if (_tailable) { + if (_tailableMode != TailableMode::kNormal) { // Tailable cursors cannot have any sort other than {$natural: 1}. const BSONObj expectedSort = BSON(kNaturalSortField << 1); if (!_sort.isEmpty() && @@ -641,10 +655,6 @@ Status QueryRequest::validate() const { } } - if (_awaitData && !_tailable) { - return Status(ErrorCodes::BadValue, "Cannot set awaitData without tailable"); - } - return Status::OK(); } @@ -917,7 +927,8 @@ Status QueryRequest::initFullQuery(const BSONObj& top) { } _maxTimeMS = maxTimeMS.getValue(); } else if (str::equals("comment", name)) { - // Legacy $comment can be any BSON element. Convert to string if it isn't already. + // Legacy $comment can be any BSON element. Convert to string if it isn't + // already. if (e.type() == BSONType::String) { _comment = e.str(); } else { @@ -932,8 +943,11 @@ Status QueryRequest::initFullQuery(const BSONObj& top) { int QueryRequest::getOptions() const { int options = 0; - if (_tailable) { + if (_tailableMode == TailableMode::kTailable) { + options |= QueryOption_CursorTailable; + } else if (_tailableMode == TailableMode::kTailableAndAwaitData) { options |= QueryOption_CursorTailable; + options |= QueryOption_AwaitData; } if (_slaveOk) { options |= QueryOption_SlaveOk; @@ -944,9 +958,6 @@ int QueryRequest::getOptions() const { if (_noCursorTimeout) { options |= QueryOption_NoCursorTimeout; } - if (_awaitData) { - options |= QueryOption_AwaitData; - } if (_exhaust) { options |= QueryOption_Exhaust; } @@ -957,11 +968,12 @@ int QueryRequest::getOptions() const { } void QueryRequest::initFromInt(int options) { - _tailable = (options & QueryOption_CursorTailable) != 0; + bool tailable = (options & QueryOption_CursorTailable) != 0; + bool awaitData = (options & QueryOption_AwaitData) != 0; + _tailableMode = uassertStatusOK(tailableModeFromBools(tailable, awaitData)); _slaveOk = (options & QueryOption_SlaveOk) != 0; _oplogReplay = (options & QueryOption_OplogReplay) != 0; _noCursorTimeout = (options & QueryOption_NoCursorTimeout) != 0; - _awaitData = (options & QueryOption_AwaitData) != 0; _exhaust = (options & QueryOption_Exhaust) != 0; _allowPartialResults = (options & QueryOption_PartialResults) != 0; } @@ -1010,9 +1022,9 @@ StatusWith<BSONObj> QueryRequest::asAggregationCommand() const { return {ErrorCodes::InvalidPipelineOperator, str::stream() << "Option " << kSnapshotField << " not supported in aggregation."}; } - if (_tailable) { + if (isTailable()) { return {ErrorCodes::InvalidPipelineOperator, - str::stream() << "Option " << kTailableField << " not supported in aggregation."}; + "Tailable cursors are not supported in aggregation."}; } if (_oplogReplay) { return {ErrorCodes::InvalidPipelineOperator, @@ -1024,10 +1036,6 @@ StatusWith<BSONObj> QueryRequest::asAggregationCommand() const { str::stream() << "Option " << kNoCursorTimeoutField << " not supported in aggregation."}; } - if (_awaitData) { - return {ErrorCodes::InvalidPipelineOperator, - str::stream() << "Option " << kAwaitDataField << " not supported in aggregation."}; - } if (_allowPartialResults) { return {ErrorCodes::InvalidPipelineOperator, str::stream() << "Option " << kPartialResultsField diff --git a/src/mongo/db/query/query_request.h b/src/mongo/db/query/query_request.h index fa0f269b4b5..dbe89f8d1d3 100644 --- a/src/mongo/db/query/query_request.h +++ b/src/mongo/db/query/query_request.h @@ -35,6 +35,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/query/tailable_mode.h" namespace mongo { @@ -331,11 +332,20 @@ public: } bool isTailable() const { - return _tailable; + return _tailableMode == TailableMode::kTailable || + _tailableMode == TailableMode::kTailableAndAwaitData; } - void setTailable(bool tailable) { - _tailable = tailable; + bool isTailableAndAwaitData() const { + return _tailableMode == TailableMode::kTailableAndAwaitData; + } + + void setTailableMode(TailableMode tailableMode) { + _tailableMode = tailableMode; + } + + TailableMode getTailableMode() const { + return _tailableMode; } bool isSlaveOk() const { @@ -362,14 +372,6 @@ public: _noCursorTimeout = noCursorTimeout; } - bool isAwaitData() const { - return _awaitData; - } - - void setAwaitData(bool awaitData) { - _awaitData = awaitData; - } - bool isExhaust() const { return _exhaust; } @@ -510,11 +512,10 @@ private: bool _hasReadPref = false; // Options that can be specified in the OP_QUERY 'flags' header. - bool _tailable = false; + TailableMode _tailableMode = TailableMode::kNormal; bool _slaveOk = false; bool _oplogReplay = false; bool _noCursorTimeout = false; - bool _awaitData = false; bool _exhaust = false; bool _allowPartialResults = false; diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp index 4910b769af9..a838cfd1882 100644 --- a/src/mongo/db/query/query_request_test.cpp +++ b/src/mongo/db/query/query_request_test.cpp @@ -444,7 +444,7 @@ TEST(QueryRequestTest, ParseFromCommandAllFlagsTrue) { ASSERT(!qr->isSlaveOk()); ASSERT(qr->isOplogReplay()); ASSERT(qr->isNoCursorTimeout()); - ASSERT(qr->isAwaitData()); + ASSERT(qr->isTailableAndAwaitData()); ASSERT(qr->isAllowPartialResults()); } @@ -1047,7 +1047,7 @@ TEST(QueryRequestTest, DefaultQueryParametersCorrect) { ASSERT_EQUALS(false, qr->isSlaveOk()); ASSERT_EQUALS(false, qr->isOplogReplay()); ASSERT_EQUALS(false, qr->isNoCursorTimeout()); - ASSERT_EQUALS(false, qr->isAwaitData()); + ASSERT_EQUALS(false, qr->isTailableAndAwaitData()); ASSERT_EQUALS(false, qr->isExhaust()); ASSERT_EQUALS(false, qr->isAllowPartialResults()); } @@ -1220,7 +1220,7 @@ TEST(QueryRequestTest, ConvertToAggregationWithSnapshotFails) { TEST(QueryRequestTest, ConvertToAggregationWithTailableFails) { QueryRequest qr(testns); - qr.setTailable(true); + qr.setTailableMode(TailableMode::kTailable); ASSERT_NOT_OK(qr.asAggregationCommand()); } @@ -1238,7 +1238,7 @@ TEST(QueryRequestTest, ConvertToAggregationWithNoCursorTimeoutFails) { TEST(QueryRequestTest, ConvertToAggregationWithAwaitDataFails) { QueryRequest qr(testns); - qr.setAwaitData(true); + qr.setTailableMode(TailableMode::kTailableAndAwaitData); ASSERT_NOT_OK(qr.asAggregationCommand()); } @@ -1388,7 +1388,7 @@ TEST(QueryRequestTest, ParseFromLegacyQuery) { ASSERT_EQ(qr->isSlaveOk(), false); ASSERT_EQ(qr->isOplogReplay(), false); ASSERT_EQ(qr->isNoCursorTimeout(), false); - ASSERT_EQ(qr->isAwaitData(), false); + ASSERT_EQ(qr->isTailable(), false); ASSERT_EQ(qr->isExhaust(), true); ASSERT_EQ(qr->isAllowPartialResults(), false); ASSERT_EQ(qr->getOptions(), QueryOption_Exhaust); diff --git a/src/mongo/db/query/tailable_mode.cpp b/src/mongo/db/query/tailable_mode.cpp new file mode 100644 index 00000000000..b19a1988672 --- /dev/null +++ b/src/mongo/db/query/tailable_mode.cpp @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/query/tailable_mode.h" + +namespace mongo { + +StatusWith<TailableMode> tailableModeFromBools(bool isTailable, bool isAwaitData) { + if (isTailable) { + if (isAwaitData) { + return TailableMode::kTailableAndAwaitData; + } + return TailableMode::kTailable; + } else if (isAwaitData) { + return {ErrorCodes::FailedToParse, + "Cannot set 'awaitData' without also setting 'tailable'"}; + } + return TailableMode::kNormal; +} + +} // namespace mongo diff --git a/src/mongo/db/query/tailable_mode.h b/src/mongo/db/query/tailable_mode.h new file mode 100644 index 00000000000..92c0fe9292e --- /dev/null +++ b/src/mongo/db/query/tailable_mode.h @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/base/error_codes.h" +#include "mongo/base/status_with.h" + +namespace mongo { + +enum class TailableMode { + kNormal, + kTailable, + kTailableAndAwaitData, +}; + +/** + * Returns a TailableMode from two booleans, returning ErrorCodes::FailedToParse if awaitData is + * set without tailable. + */ +StatusWith<TailableMode> tailableModeFromBools(bool isTailable, bool isAwaitData); + +} // namespace mongo diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp index f5bd327705f..9d5fc0b5162 100644 --- a/src/mongo/dbtests/documentsourcetests.cpp +++ b/src/mongo/dbtests/documentsourcetests.cpp @@ -331,8 +331,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorStillUsableAfterTimeout) opCtx(), collScanParams, workingSet.get(), matchExpression.get()); auto queryRequest = stdx::make_unique<QueryRequest>(nss); queryRequest->setFilter(filter); - queryRequest->setTailable(true); - queryRequest->setAwaitData(true); + queryRequest->setTailableMode(TailableMode::kTailableAndAwaitData); auto canonicalQuery = unittest::assertGet( CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); auto planExecutor = @@ -344,7 +343,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorStillUsableAfterTimeout) PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT)); // Make a DocumentSourceCursor. - ctx()->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData; + ctx()->tailableMode = TailableMode::kTailableAndAwaitData; // DocumentSourceCursor expects a PlanExecutor that has had its state saved. planExecutor->saveState(); auto cursor = @@ -382,7 +381,7 @@ TEST_F(DocumentSourceCursorTest, NonAwaitDataCursorShouldErrorAfterTimeout) { PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT)); // Make a DocumentSourceCursor. - ctx()->tailableMode = ExpressionContext::TailableMode::kNormal; + ctx()->tailableMode = TailableMode::kNormal; // DocumentSourceCursor expects a PlanExecutor that has had its state saved. planExecutor->saveState(); auto cursor = @@ -412,6 +411,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil opCtx(), collScanParams, workingSet.get(), matchExpression.get()); auto queryRequest = stdx::make_unique<QueryRequest>(nss); queryRequest->setFilter(filter); + queryRequest->setTailableMode(TailableMode::kTailableAndAwaitData); auto canonicalQuery = unittest::assertGet( CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr)); auto planExecutor = @@ -423,7 +423,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED)); // Make a DocumentSourceCursor. - ctx()->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData; + ctx()->tailableMode = TailableMode::kTailableAndAwaitData; // DocumentSourceCursor expects a PlanExecutor that has had its state saved. planExecutor->saveState(); auto cursor = @@ -461,7 +461,7 @@ TEST_F(DocumentSourceCursorTest, NormalCursorShouldErrorAfterBeingKilled) { PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED)); // Make a DocumentSourceCursor. - ctx()->tailableMode = ExpressionContext::TailableMode::kNormal; + ctx()->tailableMode = TailableMode::kNormal; // DocumentSourceCursor expects a PlanExecutor that has had its state saved. planExecutor->saveState(); auto cursor = diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp index 99f4548b631..99a45b8a402 100644 --- a/src/mongo/dbtests/query_plan_executor.cpp +++ b/src/mongo/dbtests/query_plan_executor.cpp @@ -99,8 +99,7 @@ public: Collection* coll, BSONObj& filterObj, PlanExecutor::YieldPolicy yieldPolicy = PlanExecutor::YieldPolicy::YIELD_MANUAL, - bool tailable = false, - bool awaitData = false) { + TailableMode tailableMode = TailableMode::kNormal) { CollectionScanParams csparams; csparams.collection = coll; csparams.direction = CollectionScanParams::FORWARD; @@ -109,8 +108,7 @@ public: // Canonicalize the query. auto qr = stdx::make_unique<QueryRequest>(nss); qr->setFilter(filterObj); - qr->setTailable(tailable); - qr->setAwaitData(awaitData); + qr->setTailableMode(tailableMode); auto statusWithCQ = CanonicalQuery::canonicalize(&_opCtx, std::move(qr)); ASSERT_OK(statusWithCQ.getStatus()); unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue()); @@ -306,10 +304,10 @@ TEST_F(PlanExecutorTest, ShouldReportEOFIfExceedsTimeLimitDuringYieldButIsTailab BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); Collection* coll = ctx.getCollection(); - const bool tailable = true; - const bool awaitData = true; - auto exec = makeCollScanExec( - coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, tailable, awaitData); + auto exec = makeCollScanExec(coll, + filterObj, + PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, + TailableMode::kTailableAndAwaitData); BSONObj resultObj; ASSERT_EQ(PlanExecutor::IS_EOF, exec->getNext(&resultObj, nullptr)); @@ -323,9 +321,8 @@ TEST_F(PlanExecutorTest, ShouldNotSwallowExceedsTimeLimitDuringYieldButIsTailabl BSONObj filterObj = fromjson("{_id: {$gt: 0}}"); Collection* coll = ctx.getCollection(); - const bool tailable = true; - auto exec = - makeCollScanExec(coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, tailable); + auto exec = makeCollScanExec( + coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, TailableMode::kTailable); BSONObj resultObj; ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr)); diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index fc8bab48bf4..39ac66d1f9d 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -435,9 +435,18 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, // If this aggregation is on an unsharded collection, pass through to the primary shard. if (!executionNsRoutingInfo.cm() && !namespaces.executionNss.isCollectionlessAggregateNS() && liteParsedPipeline.allowedToPassthroughFromMongos()) { - return aggPassthrough( - opCtx, namespaces, executionNsRoutingInfo.primary()->getId(), request, cmdObj, result); + return aggPassthrough(opCtx, + namespaces, + executionNsRoutingInfo.primary()->getId(), + cmdObj, + request, + liteParsedPipeline, + result); } + // TODO SERVER-29141 support $changeStream on sharded collections. + uassert(40622, + "$changeStream is not yet supported on sharded collections", + !liteParsedPipeline.hasChangeStream()); std::unique_ptr<CollatorInterface> collation; if (!request.getCollation().isEmpty()) { @@ -668,8 +677,9 @@ void ClusterAggregate::uassertAllShardsSupportExplain( Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, const Namespaces& namespaces, const ShardId& shardId, - const AggregationRequest& aggRequest, BSONObj cmdObj, + const AggregationRequest& aggRequest, + const LiteParsedPipeline& liteParsedPipeline, BSONObjBuilder* out) { // Temporary hack. See comment on declaration for details. auto swShard = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId); @@ -704,14 +714,16 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx, // The merging shard is remote, so if a response was received, a HostAndPort must have been // set. invariant(cmdResponse.hostAndPort); - result = uassertStatusOK( - storePossibleCursor(opCtx, - shard->getId(), - *cmdResponse.hostAndPort, - cmdResponse.response, - namespaces.requestedNss, - Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), - Grid::get(opCtx)->getCursorManager())); + result = uassertStatusOK(storePossibleCursor( + opCtx, + shard->getId(), + *cmdResponse.hostAndPort, + cmdResponse.response, + namespaces.requestedNss, + Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), + Grid::get(opCtx)->getCursorManager(), + liteParsedPipeline.hasChangeStream() ? TailableMode::kTailableAndAwaitData + : TailableMode::kNormal)); } // First append the properly constructed writeConcernError. It will then be skipped diff --git a/src/mongo/s/commands/cluster_aggregate.h b/src/mongo/s/commands/cluster_aggregate.h index 71c6709d262..740cc62c610 100644 --- a/src/mongo/s/commands/cluster_aggregate.h +++ b/src/mongo/s/commands/cluster_aggregate.h @@ -43,6 +43,7 @@ namespace mongo { +class LiteParsedPipeline; class OperationContext; class ShardId; @@ -98,11 +99,12 @@ private: const AggregationRequest& aggRequest, BSONObj cmd); - static Status aggPassthrough(OperationContext* opCtx, - const Namespaces& namespaces, - const ShardId& shardId, - const AggregationRequest& aggRequest, + static Status aggPassthrough(OperationContext*, + const Namespaces&, + const ShardId&, BSONObj cmd, + const AggregationRequest&, + const LiteParsedPipeline&, BSONObjBuilder* result); }; diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 50886944bb2..b867c7b43df 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -102,7 +102,7 @@ bool AsyncResultsMerger::remotesExhausted_inlock() { Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (!_params->isTailable || !_params->isAwaitData) { + if (_params->tailableMode != TailableMode::kTailableAndAwaitData) { return Status(ErrorCodes::BadValue, "maxTimeMS can only be used with getMore for tailable, awaitData cursors"); } @@ -155,7 +155,7 @@ bool AsyncResultsMerger::ready_inlock() { bool AsyncResultsMerger::readySorted_inlock() { // Tailable cursors cannot have a sort. - invariant(!_params->isTailable); + invariant(_params->tailableMode == TailableMode::kNormal); for (const auto& remote : _remotes) { if (!remote.hasNext() && !remote.exhausted()) { @@ -203,7 +203,7 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() { ClusterQueryResult AsyncResultsMerger::nextReadySorted() { // Tailable cursors cannot have a sort. - invariant(!_params->isTailable); + invariant(_params->tailableMode == TailableMode::kNormal); if (_mergeQueue.empty()) { return {}; @@ -237,7 +237,8 @@ ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() { ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front(); _remotes[_gettingFromRemote].docBuffer.pop(); - if (_params->isTailable && !_remotes[_gettingFromRemote].hasNext()) { + if (_params->tailableMode != TailableMode::kNormal && + !_remotes[_gettingFromRemote].hasNext()) { // The cursor is tailable and we're about to return the last buffered result. This // means that the next value returned should be boost::none to indicate the end of // the batch. @@ -413,6 +414,10 @@ void AsyncResultsMerger::handleBatchResponse( cbData.response.isOK() ? parseCursorResponse(cbData.response.data, remote) : cbData.response.status); if (!cursorResponseStatus.isOK()) { + if (cursorResponseStatus == ErrorCodes::ExceededTimeLimit && + _params->tailableMode != TailableMode::kNormal) { + // We timed out before hearing back from the shard, + } remote.status = cursorResponseStatus.getStatus(); // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We // remove the unreachable host entirely from consideration by marking it as exhausted. @@ -444,7 +449,7 @@ void AsyncResultsMerger::handleBatchResponse( // be boost::none in order to indicate the end of the batch. // (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from // one shard means the end of the overall batch). - if (_params->isTailable && !remote.hasNext()) { + if (_params->tailableMode != TailableMode::kNormal && !remote.hasNext()) { _eofNext = true; } @@ -453,7 +458,8 @@ void AsyncResultsMerger::handleBatchResponse( // // We do not ask for the next batch if the cursor is tailable, as batches received from remote // tailable cursors should be passed through to the client without asking for more batches. - if (!_params->isTailable && !remote.hasNext() && !remote.exhausted()) { + if (_params->tailableMode == TailableMode::kNormal && !remote.hasNext() && + !remote.exhausted()) { remote.status = askForNextBatch_inlock(remoteIndex); if (!remote.status.isOK()) { return; diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 6076a559867..071ec469a1b 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -127,8 +127,7 @@ protected: _params->limit = qr->getLimit(); _params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize(); _params->skip = qr->getSkip(); - _params->isTailable = qr->isTailable(); - _params->isAwaitData = qr->isAwaitData(); + _params->tailableMode = qr->getTailableMode(); _params->isAllowPartialResults = qr->isAllowPartialResults(); } diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index f286cee408e..9ec302ea579 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -109,7 +109,7 @@ void ClusterClientCursorImpl::detachFromOperationContext() { } bool ClusterClientCursorImpl::isTailable() const { - return _params.isTailable; + return _params.tailableMode != TailableMode::kNormal; } UserNameIterator ClusterClientCursorImpl::getAuthenticatedUsers() const { diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index 1b4d76124c3..e3a5e4f62cb 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -39,6 +39,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/cursor_response.h" +#include "mongo/db/query/tailable_mode.h" #include "mongo/s/client/shard.h" #include "mongo/util/net/hostandport.h" @@ -114,11 +115,9 @@ struct ClusterClientCursorParams { // If set, we use this pipeline to merge the output of aggregations on each remote. std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline; - // Whether this cursor is tailing a capped collection. - bool isTailable = false; - - // Whether this cursor has the awaitData option set. - bool isAwaitData = false; + // Whether this cursor is tailing a capped collection, and whether it has the awaitData option + // set. + TailableMode tailableMode = TailableMode::kNormal; // Set if a readPreference must be respected throughout the lifetime of the cursor. boost::optional<ReadPreferenceSetting> readPreference; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 57050b408b6..ba53581905b 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -190,8 +190,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, params.limit = query.getQueryRequest().getLimit(); params.batchSize = query.getQueryRequest().getEffectiveBatchSize(); params.skip = query.getQueryRequest().getSkip(); - params.isTailable = query.getQueryRequest().isTailable(); - params.isAwaitData = query.getQueryRequest().isAwaitData(); + params.tailableMode = query.getQueryRequest().getTailableMode(); params.isAllowPartialResults = query.getQueryRequest().isAllowPartialResults(); // This is the batchSize passed to each subsequent getMore command issued by the cursor. We @@ -209,7 +208,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, } // Tailable cursors can't have a sort, which should have already been validated. - invariant(params.sort.isEmpty() || !params.isTailable); + invariant(params.sort.isEmpty() || !query.getQueryRequest().isTailable()); const auto qrToForward = transformQueryForShards(query.getQueryRequest()); if (!qrToForward.isOK()) { diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index 506ac226636..f611e612d2a 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -46,7 +46,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const BSONObj& cmdResult, const NamespaceString& requestedNss, executor::TaskExecutor* executor, - ClusterCursorManager* cursorManager) { + ClusterCursorManager* cursorManager, + TailableMode tailableMode) { if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) { return cmdResult; } @@ -68,6 +69,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, CursorResponse(incomingCursorResponse.getValue().getNSS(), incomingCursorResponse.getValue().getCursorId(), {})); + params.tailableMode = tailableMode; auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params)); diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h index 14d2942d66d..75a4e76bf24 100644 --- a/src/mongo/s/query/store_possible_cursor.h +++ b/src/mongo/s/query/store_possible_cursor.h @@ -30,6 +30,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/query/tailable_mode.h" #include "mongo/s/shard_id.h" namespace mongo { @@ -72,6 +73,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const BSONObj& cmdResult, const NamespaceString& requestedNss, executor::TaskExecutor* executor, - ClusterCursorManager* cursorManager); + ClusterCursorManager* cursorManager, + TailableMode tailableMode = TailableMode::kNormal); } // namespace mongo |