summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2017-09-05 11:23:54 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2017-09-13 14:44:09 -0400
commitfe125855b6b3e8feb9d7d666338a7f2d29d301ad (patch)
treec682c408675b895bd343dd7187de8be18e875f66 /src/mongo
parent61d1cfbf2c8521126506c12bcd2d187a7926fbe0 (diff)
downloadmongo-fe125855b6b3e8feb9d7d666338a7f2d29d301ad.tar.gz
SERVER-29142 Support $changeStream on unsharded collections.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/commands/run_aggregate.cpp4
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h4
-rw-r--r--src/mongo/db/pipeline/expression_context.h5
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp9
-rw-r--r--src/mongo/db/query/SConscript3
-rw-r--r--src/mongo/db/query/canonical_query_test.cpp2
-rw-r--r--src/mongo/db/query/plan_executor.cpp8
-rw-r--r--src/mongo/db/query/query_request.cpp60
-rw-r--r--src/mongo/db/query/query_request.h27
-rw-r--r--src/mongo/db/query/query_request_test.cpp10
-rw-r--r--src/mongo/db/query/tailable_mode.cpp48
-rw-r--r--src/mongo/db/query/tailable_mode.h48
-rw-r--r--src/mongo/dbtests/documentsourcetests.cpp12
-rw-r--r--src/mongo/dbtests/query_plan_executor.cpp19
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp34
-rw-r--r--src/mongo/s/commands/cluster_aggregate.h10
-rw-r--r--src/mongo/s/query/async_results_merger.cpp18
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp3
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h9
-rw-r--r--src/mongo/s/query/cluster_find.cpp5
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp4
-rw-r--r--src/mongo/s/query/store_possible_cursor.h4
23 files changed, 230 insertions, 118 deletions
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index 3350f43e4b5..c8d63487229 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -409,7 +409,7 @@ Status runAggregate(OperationContext* opCtx,
expCtx->tempDir = storageGlobalParams.dbpath + "/_tmp";
if (liteParsedPipeline.hasChangeStream()) {
- expCtx->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData;
+ expCtx->tailableMode = TailableMode::kTailableAndAwaitData;
}
auto pipeline = uassertStatusOK(Pipeline::parse(request.getPipeline(), expCtx));
@@ -471,7 +471,7 @@ Status runAggregate(OperationContext* opCtx,
AuthorizationSession::get(opCtx->getClient())->getAuthenticatedUserNames(),
opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot(),
cmdObj);
- if (expCtx->tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData) {
+ if (expCtx->tailableMode == TailableMode::kTailableAndAwaitData) {
cursorParams.setTailable(true);
cursorParams.setAwaitData(true);
}
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 1b637abbd6c..a33322b80dd 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -62,10 +62,6 @@ public:
PrivilegeVector requiredPrivileges(bool isMongos) const final {
return {};
}
-
- bool allowedToForwardFromMongos() const final {
- return false;
- }
};
class Transformation : public DocumentSourceSingleDocumentTransformation::TransformerInterface {
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index ad7052968c1..037f1b81383 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -43,6 +43,7 @@
#include "mongo/db/pipeline/variables.h"
#include "mongo/db/query/collation/collator_interface.h"
#include "mongo/db/query/explain_options.h"
+#include "mongo/db/query/tailable_mode.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/string_map.h"
@@ -58,8 +59,6 @@ public:
std::vector<BSONObj> pipeline;
};
- enum class TailableMode { kNormal, kTailableAndAwaitData };
-
/**
* Constructs an ExpressionContext to be used for Pipeline parsing and evaluation.
* 'resolvedNamespaces' maps collection names (not full namespaces) to ResolvedNamespaces.
@@ -107,7 +106,7 @@ public:
* Convenience call that returns true if the tailableMode indicate a tailable query.
*/
bool isTailable() const {
- return tailableMode == ExpressionContext::TailableMode::kTailableAndAwaitData;
+ return tailableMode == TailableMode::kTailableAndAwaitData;
}
// The explain verbosity requested by the user, or boost::none if no explain was requested.
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 7f6a7c5fc49..66d4baf9379 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -400,14 +400,7 @@ StatusWith<std::unique_ptr<PlanExecutor, PlanExecutor::Deleter>> attemptToGetExe
const AggregationRequest* aggRequest,
const size_t plannerOpts) {
auto qr = stdx::make_unique<QueryRequest>(nss);
- switch (pExpCtx->tailableMode) {
- case ExpressionContext::TailableMode::kNormal:
- break;
- case ExpressionContext::TailableMode::kTailableAndAwaitData:
- qr->setTailable(true);
- qr->setAwaitData(true);
- break;
- }
+ qr->setTailableMode(pExpCtx->tailableMode);
qr->setOplogReplay(oplogReplay);
qr->setFilter(queryObj);
qr->setProj(projectionObj);
diff --git a/src/mongo/db/query/SConscript b/src/mongo/db/query/SConscript
index 657e485348e..6701dc560f7 100644
--- a/src/mongo/db/query/SConscript
+++ b/src/mongo/db/query/SConscript
@@ -241,7 +241,8 @@ env.Library(
env.Library(
target="query_request",
source=[
- "query_request.cpp"
+ "query_request.cpp",
+ "tailable_mode.cpp"
],
LIBDEPS=[
"$BUILD_DIR/mongo/base",
diff --git a/src/mongo/db/query/canonical_query_test.cpp b/src/mongo/db/query/canonical_query_test.cpp
index 691079a9560..04048242c64 100644
--- a/src/mongo/db/query/canonical_query_test.cpp
+++ b/src/mongo/db/query/canonical_query_test.cpp
@@ -169,7 +169,7 @@ TEST(CanonicalQueryTest, IsValidText) {
TEST(CanonicalQueryTest, IsValidTextTailable) {
// Filter inside QueryRequest is not used.
auto qr = stdx::make_unique<QueryRequest>(nss);
- qr->setTailable(true);
+ qr->setTailableMode(TailableMode::kTailable);
ASSERT_OK(qr->validate());
// Invalid: TEXT and tailable.
diff --git a/src/mongo/db/query/plan_executor.cpp b/src/mongo/db/query/plan_executor.cpp
index dbeb1d56b71..ed443a2b09d 100644
--- a/src/mongo/db/query/plan_executor.cpp
+++ b/src/mongo/db/query/plan_executor.cpp
@@ -422,7 +422,7 @@ PlanExecutor::ExecState PlanExecutor::getNextSnapshotted(Snapshotted<BSONObj>* o
bool PlanExecutor::shouldWaitForInserts() {
// If this is an awaitData-respecting operation and we have time left and we're not interrupted,
// we should wait for inserts.
- if (_cq && _cq->getQueryRequest().isTailable() && _cq->getQueryRequest().isAwaitData() &&
+ if (_cq && _cq->getQueryRequest().isTailableAndAwaitData() &&
mongo::shouldWaitForInserts(_opCtx) && _opCtx->checkForInterruptNoAssert().isOK() &&
_opCtx->getRemainingMaxTimeMicros() > Microseconds::zero()) {
// We expect awaitData cursors to be yielding.
@@ -689,9 +689,9 @@ void PlanExecutor::enqueue(const BSONObj& obj) {
PlanExecutor::ExecState PlanExecutor::swallowTimeoutIfAwaitData(
Status yieldError, Snapshotted<BSONObj>* errorObj) const {
if (yieldError == ErrorCodes::ExceededTimeLimit) {
- if (_cq && _cq->getQueryRequest().isTailable() && _cq->getQueryRequest().isAwaitData()) {
- // If the cursor is tailable then exceeding the time limit should not
- // destroy this PlanExecutor, we should just stop waiting for inserts.
+ if (_cq && _cq->getQueryRequest().isTailableAndAwaitData()) {
+ // If the cursor is tailable then exceeding the time limit should not destroy this
+ // PlanExecutor, we should just stop waiting for inserts.
return PlanExecutor::IS_EOF;
}
}
diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp
index 00a8915c1c4..cd09a591955 100644
--- a/src/mongo/db/query/query_request.cpp
+++ b/src/mongo/db/query/query_request.cpp
@@ -127,6 +127,8 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p
const BSONObj& cmdObj,
bool isExplain) {
qr->_explain = isExplain;
+ bool tailable = false;
+ bool awaitData = false;
// Parse the command BSON by looping through one element at a time.
BSONObjIterator it(cmdObj);
@@ -316,7 +318,7 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p
return status;
}
- qr->_tailable = el.boolean();
+ tailable = el.boolean();
} else if (fieldName == kOplogReplayField) {
Status status = checkFieldType(el, Bool);
if (!status.isOK()) {
@@ -337,7 +339,7 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p
return status;
}
- qr->_awaitData = el.boolean();
+ awaitData = el.boolean();
} else if (fieldName == kPartialResultsField) {
Status status = checkFieldType(el, Bool);
if (!status.isOK()) {
@@ -385,6 +387,11 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p
}
}
+ auto tailableMode = tailableModeFromBools(tailable, awaitData);
+ if (!tailableMode.isOK()) {
+ return tailableMode.getStatus();
+ }
+ qr->_tailableMode = tailableMode.getValue();
qr->addMetaProjection();
Status validateStatus = qr->validate();
@@ -494,8 +501,19 @@ void QueryRequest::asFindCommand(BSONObjBuilder* cmdBuilder) const {
cmdBuilder->append(kSnapshotField, true);
}
- if (_tailable) {
- cmdBuilder->append(kTailableField, true);
+ switch (_tailableMode) {
+ case TailableMode::kTailable: {
+ cmdBuilder->append(kTailableField, true);
+ break;
+ }
+ case TailableMode::kTailableAndAwaitData: {
+ cmdBuilder->append(kTailableField, true);
+ cmdBuilder->append(kAwaitDataField, true);
+ break;
+ }
+ case TailableMode::kNormal: {
+ break;
+ }
}
if (_oplogReplay) {
@@ -506,10 +524,6 @@ void QueryRequest::asFindCommand(BSONObjBuilder* cmdBuilder) const {
cmdBuilder->append(kNoCursorTimeoutField, true);
}
- if (_awaitData) {
- cmdBuilder->append(kAwaitDataField, true);
- }
-
if (_allowPartialResults) {
cmdBuilder->append(kPartialResultsField, true);
}
@@ -625,7 +639,7 @@ Status QueryRequest::validate() const {
<< _maxTimeMS);
}
- if (_tailable) {
+ if (_tailableMode != TailableMode::kNormal) {
// Tailable cursors cannot have any sort other than {$natural: 1}.
const BSONObj expectedSort = BSON(kNaturalSortField << 1);
if (!_sort.isEmpty() &&
@@ -641,10 +655,6 @@ Status QueryRequest::validate() const {
}
}
- if (_awaitData && !_tailable) {
- return Status(ErrorCodes::BadValue, "Cannot set awaitData without tailable");
- }
-
return Status::OK();
}
@@ -917,7 +927,8 @@ Status QueryRequest::initFullQuery(const BSONObj& top) {
}
_maxTimeMS = maxTimeMS.getValue();
} else if (str::equals("comment", name)) {
- // Legacy $comment can be any BSON element. Convert to string if it isn't already.
+ // Legacy $comment can be any BSON element. Convert to string if it isn't
+ // already.
if (e.type() == BSONType::String) {
_comment = e.str();
} else {
@@ -932,8 +943,11 @@ Status QueryRequest::initFullQuery(const BSONObj& top) {
int QueryRequest::getOptions() const {
int options = 0;
- if (_tailable) {
+ if (_tailableMode == TailableMode::kTailable) {
+ options |= QueryOption_CursorTailable;
+ } else if (_tailableMode == TailableMode::kTailableAndAwaitData) {
options |= QueryOption_CursorTailable;
+ options |= QueryOption_AwaitData;
}
if (_slaveOk) {
options |= QueryOption_SlaveOk;
@@ -944,9 +958,6 @@ int QueryRequest::getOptions() const {
if (_noCursorTimeout) {
options |= QueryOption_NoCursorTimeout;
}
- if (_awaitData) {
- options |= QueryOption_AwaitData;
- }
if (_exhaust) {
options |= QueryOption_Exhaust;
}
@@ -957,11 +968,12 @@ int QueryRequest::getOptions() const {
}
void QueryRequest::initFromInt(int options) {
- _tailable = (options & QueryOption_CursorTailable) != 0;
+ bool tailable = (options & QueryOption_CursorTailable) != 0;
+ bool awaitData = (options & QueryOption_AwaitData) != 0;
+ _tailableMode = uassertStatusOK(tailableModeFromBools(tailable, awaitData));
_slaveOk = (options & QueryOption_SlaveOk) != 0;
_oplogReplay = (options & QueryOption_OplogReplay) != 0;
_noCursorTimeout = (options & QueryOption_NoCursorTimeout) != 0;
- _awaitData = (options & QueryOption_AwaitData) != 0;
_exhaust = (options & QueryOption_Exhaust) != 0;
_allowPartialResults = (options & QueryOption_PartialResults) != 0;
}
@@ -1010,9 +1022,9 @@ StatusWith<BSONObj> QueryRequest::asAggregationCommand() const {
return {ErrorCodes::InvalidPipelineOperator,
str::stream() << "Option " << kSnapshotField << " not supported in aggregation."};
}
- if (_tailable) {
+ if (isTailable()) {
return {ErrorCodes::InvalidPipelineOperator,
- str::stream() << "Option " << kTailableField << " not supported in aggregation."};
+ "Tailable cursors are not supported in aggregation."};
}
if (_oplogReplay) {
return {ErrorCodes::InvalidPipelineOperator,
@@ -1024,10 +1036,6 @@ StatusWith<BSONObj> QueryRequest::asAggregationCommand() const {
str::stream() << "Option " << kNoCursorTimeoutField
<< " not supported in aggregation."};
}
- if (_awaitData) {
- return {ErrorCodes::InvalidPipelineOperator,
- str::stream() << "Option " << kAwaitDataField << " not supported in aggregation."};
- }
if (_allowPartialResults) {
return {ErrorCodes::InvalidPipelineOperator,
str::stream() << "Option " << kPartialResultsField
diff --git a/src/mongo/db/query/query_request.h b/src/mongo/db/query/query_request.h
index fa0f269b4b5..dbe89f8d1d3 100644
--- a/src/mongo/db/query/query_request.h
+++ b/src/mongo/db/query/query_request.h
@@ -35,6 +35,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/query/tailable_mode.h"
namespace mongo {
@@ -331,11 +332,20 @@ public:
}
bool isTailable() const {
- return _tailable;
+ return _tailableMode == TailableMode::kTailable ||
+ _tailableMode == TailableMode::kTailableAndAwaitData;
}
- void setTailable(bool tailable) {
- _tailable = tailable;
+ bool isTailableAndAwaitData() const {
+ return _tailableMode == TailableMode::kTailableAndAwaitData;
+ }
+
+ void setTailableMode(TailableMode tailableMode) {
+ _tailableMode = tailableMode;
+ }
+
+ TailableMode getTailableMode() const {
+ return _tailableMode;
}
bool isSlaveOk() const {
@@ -362,14 +372,6 @@ public:
_noCursorTimeout = noCursorTimeout;
}
- bool isAwaitData() const {
- return _awaitData;
- }
-
- void setAwaitData(bool awaitData) {
- _awaitData = awaitData;
- }
-
bool isExhaust() const {
return _exhaust;
}
@@ -510,11 +512,10 @@ private:
bool _hasReadPref = false;
// Options that can be specified in the OP_QUERY 'flags' header.
- bool _tailable = false;
+ TailableMode _tailableMode = TailableMode::kNormal;
bool _slaveOk = false;
bool _oplogReplay = false;
bool _noCursorTimeout = false;
- bool _awaitData = false;
bool _exhaust = false;
bool _allowPartialResults = false;
diff --git a/src/mongo/db/query/query_request_test.cpp b/src/mongo/db/query/query_request_test.cpp
index 4910b769af9..a838cfd1882 100644
--- a/src/mongo/db/query/query_request_test.cpp
+++ b/src/mongo/db/query/query_request_test.cpp
@@ -444,7 +444,7 @@ TEST(QueryRequestTest, ParseFromCommandAllFlagsTrue) {
ASSERT(!qr->isSlaveOk());
ASSERT(qr->isOplogReplay());
ASSERT(qr->isNoCursorTimeout());
- ASSERT(qr->isAwaitData());
+ ASSERT(qr->isTailableAndAwaitData());
ASSERT(qr->isAllowPartialResults());
}
@@ -1047,7 +1047,7 @@ TEST(QueryRequestTest, DefaultQueryParametersCorrect) {
ASSERT_EQUALS(false, qr->isSlaveOk());
ASSERT_EQUALS(false, qr->isOplogReplay());
ASSERT_EQUALS(false, qr->isNoCursorTimeout());
- ASSERT_EQUALS(false, qr->isAwaitData());
+ ASSERT_EQUALS(false, qr->isTailableAndAwaitData());
ASSERT_EQUALS(false, qr->isExhaust());
ASSERT_EQUALS(false, qr->isAllowPartialResults());
}
@@ -1220,7 +1220,7 @@ TEST(QueryRequestTest, ConvertToAggregationWithSnapshotFails) {
TEST(QueryRequestTest, ConvertToAggregationWithTailableFails) {
QueryRequest qr(testns);
- qr.setTailable(true);
+ qr.setTailableMode(TailableMode::kTailable);
ASSERT_NOT_OK(qr.asAggregationCommand());
}
@@ -1238,7 +1238,7 @@ TEST(QueryRequestTest, ConvertToAggregationWithNoCursorTimeoutFails) {
TEST(QueryRequestTest, ConvertToAggregationWithAwaitDataFails) {
QueryRequest qr(testns);
- qr.setAwaitData(true);
+ qr.setTailableMode(TailableMode::kTailableAndAwaitData);
ASSERT_NOT_OK(qr.asAggregationCommand());
}
@@ -1388,7 +1388,7 @@ TEST(QueryRequestTest, ParseFromLegacyQuery) {
ASSERT_EQ(qr->isSlaveOk(), false);
ASSERT_EQ(qr->isOplogReplay(), false);
ASSERT_EQ(qr->isNoCursorTimeout(), false);
- ASSERT_EQ(qr->isAwaitData(), false);
+ ASSERT_EQ(qr->isTailable(), false);
ASSERT_EQ(qr->isExhaust(), true);
ASSERT_EQ(qr->isAllowPartialResults(), false);
ASSERT_EQ(qr->getOptions(), QueryOption_Exhaust);
diff --git a/src/mongo/db/query/tailable_mode.cpp b/src/mongo/db/query/tailable_mode.cpp
new file mode 100644
index 00000000000..b19a1988672
--- /dev/null
+++ b/src/mongo/db/query/tailable_mode.cpp
@@ -0,0 +1,48 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/query/tailable_mode.h"
+
+namespace mongo {
+
+StatusWith<TailableMode> tailableModeFromBools(bool isTailable, bool isAwaitData) {
+ if (isTailable) {
+ if (isAwaitData) {
+ return TailableMode::kTailableAndAwaitData;
+ }
+ return TailableMode::kTailable;
+ } else if (isAwaitData) {
+ return {ErrorCodes::FailedToParse,
+ "Cannot set 'awaitData' without also setting 'tailable'"};
+ }
+ return TailableMode::kNormal;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/query/tailable_mode.h b/src/mongo/db/query/tailable_mode.h
new file mode 100644
index 00000000000..92c0fe9292e
--- /dev/null
+++ b/src/mongo/db/query/tailable_mode.h
@@ -0,0 +1,48 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/error_codes.h"
+#include "mongo/base/status_with.h"
+
+namespace mongo {
+
+enum class TailableMode {
+ kNormal,
+ kTailable,
+ kTailableAndAwaitData,
+};
+
+/**
+ * Returns a TailableMode from two booleans, returning ErrorCodes::FailedToParse if awaitData is
+ * set without tailable.
+ */
+StatusWith<TailableMode> tailableModeFromBools(bool isTailable, bool isAwaitData);
+
+} // namespace mongo
diff --git a/src/mongo/dbtests/documentsourcetests.cpp b/src/mongo/dbtests/documentsourcetests.cpp
index f5bd327705f..9d5fc0b5162 100644
--- a/src/mongo/dbtests/documentsourcetests.cpp
+++ b/src/mongo/dbtests/documentsourcetests.cpp
@@ -331,8 +331,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorStillUsableAfterTimeout)
opCtx(), collScanParams, workingSet.get(), matchExpression.get());
auto queryRequest = stdx::make_unique<QueryRequest>(nss);
queryRequest->setFilter(filter);
- queryRequest->setTailable(true);
- queryRequest->setAwaitData(true);
+ queryRequest->setTailableMode(TailableMode::kTailableAndAwaitData);
auto canonicalQuery = unittest::assertGet(
CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr));
auto planExecutor =
@@ -344,7 +343,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorStillUsableAfterTimeout)
PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT));
// Make a DocumentSourceCursor.
- ctx()->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData;
+ ctx()->tailableMode = TailableMode::kTailableAndAwaitData;
// DocumentSourceCursor expects a PlanExecutor that has had its state saved.
planExecutor->saveState();
auto cursor =
@@ -382,7 +381,7 @@ TEST_F(DocumentSourceCursorTest, NonAwaitDataCursorShouldErrorAfterTimeout) {
PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT));
// Make a DocumentSourceCursor.
- ctx()->tailableMode = ExpressionContext::TailableMode::kNormal;
+ ctx()->tailableMode = TailableMode::kNormal;
// DocumentSourceCursor expects a PlanExecutor that has had its state saved.
planExecutor->saveState();
auto cursor =
@@ -412,6 +411,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil
opCtx(), collScanParams, workingSet.get(), matchExpression.get());
auto queryRequest = stdx::make_unique<QueryRequest>(nss);
queryRequest->setFilter(filter);
+ queryRequest->setTailableMode(TailableMode::kTailableAndAwaitData);
auto canonicalQuery = unittest::assertGet(
CanonicalQuery::canonicalize(opCtx(), std::move(queryRequest), nullptr));
auto planExecutor =
@@ -423,7 +423,7 @@ TEST_F(DocumentSourceCursorTest, TailableAwaitDataCursorShouldErrorAfterBeingKil
PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED));
// Make a DocumentSourceCursor.
- ctx()->tailableMode = ExpressionContext::TailableMode::kTailableAndAwaitData;
+ ctx()->tailableMode = TailableMode::kTailableAndAwaitData;
// DocumentSourceCursor expects a PlanExecutor that has had its state saved.
planExecutor->saveState();
auto cursor =
@@ -461,7 +461,7 @@ TEST_F(DocumentSourceCursorTest, NormalCursorShouldErrorAfterBeingKilled) {
PlanExecutor::YieldPolicy::ALWAYS_MARK_KILLED));
// Make a DocumentSourceCursor.
- ctx()->tailableMode = ExpressionContext::TailableMode::kNormal;
+ ctx()->tailableMode = TailableMode::kNormal;
// DocumentSourceCursor expects a PlanExecutor that has had its state saved.
planExecutor->saveState();
auto cursor =
diff --git a/src/mongo/dbtests/query_plan_executor.cpp b/src/mongo/dbtests/query_plan_executor.cpp
index 99f4548b631..99a45b8a402 100644
--- a/src/mongo/dbtests/query_plan_executor.cpp
+++ b/src/mongo/dbtests/query_plan_executor.cpp
@@ -99,8 +99,7 @@ public:
Collection* coll,
BSONObj& filterObj,
PlanExecutor::YieldPolicy yieldPolicy = PlanExecutor::YieldPolicy::YIELD_MANUAL,
- bool tailable = false,
- bool awaitData = false) {
+ TailableMode tailableMode = TailableMode::kNormal) {
CollectionScanParams csparams;
csparams.collection = coll;
csparams.direction = CollectionScanParams::FORWARD;
@@ -109,8 +108,7 @@ public:
// Canonicalize the query.
auto qr = stdx::make_unique<QueryRequest>(nss);
qr->setFilter(filterObj);
- qr->setTailable(tailable);
- qr->setAwaitData(awaitData);
+ qr->setTailableMode(tailableMode);
auto statusWithCQ = CanonicalQuery::canonicalize(&_opCtx, std::move(qr));
ASSERT_OK(statusWithCQ.getStatus());
unique_ptr<CanonicalQuery> cq = std::move(statusWithCQ.getValue());
@@ -306,10 +304,10 @@ TEST_F(PlanExecutorTest, ShouldReportEOFIfExceedsTimeLimitDuringYieldButIsTailab
BSONObj filterObj = fromjson("{_id: {$gt: 0}}");
Collection* coll = ctx.getCollection();
- const bool tailable = true;
- const bool awaitData = true;
- auto exec = makeCollScanExec(
- coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, tailable, awaitData);
+ auto exec = makeCollScanExec(coll,
+ filterObj,
+ PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT,
+ TailableMode::kTailableAndAwaitData);
BSONObj resultObj;
ASSERT_EQ(PlanExecutor::IS_EOF, exec->getNext(&resultObj, nullptr));
@@ -323,9 +321,8 @@ TEST_F(PlanExecutorTest, ShouldNotSwallowExceedsTimeLimitDuringYieldButIsTailabl
BSONObj filterObj = fromjson("{_id: {$gt: 0}}");
Collection* coll = ctx.getCollection();
- const bool tailable = true;
- auto exec =
- makeCollScanExec(coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, tailable);
+ auto exec = makeCollScanExec(
+ coll, filterObj, PlanExecutor::YieldPolicy::ALWAYS_TIME_OUT, TailableMode::kTailable);
BSONObj resultObj;
ASSERT_EQ(PlanExecutor::DEAD, exec->getNext(&resultObj, nullptr));
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index fc8bab48bf4..39ac66d1f9d 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -435,9 +435,18 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
// If this aggregation is on an unsharded collection, pass through to the primary shard.
if (!executionNsRoutingInfo.cm() && !namespaces.executionNss.isCollectionlessAggregateNS() &&
liteParsedPipeline.allowedToPassthroughFromMongos()) {
- return aggPassthrough(
- opCtx, namespaces, executionNsRoutingInfo.primary()->getId(), request, cmdObj, result);
+ return aggPassthrough(opCtx,
+ namespaces,
+ executionNsRoutingInfo.primary()->getId(),
+ cmdObj,
+ request,
+ liteParsedPipeline,
+ result);
}
+ // TODO SERVER-29141 support $changeStream on sharded collections.
+ uassert(40622,
+ "$changeStream is not yet supported on sharded collections",
+ !liteParsedPipeline.hasChangeStream());
std::unique_ptr<CollatorInterface> collation;
if (!request.getCollation().isEmpty()) {
@@ -668,8 +677,9 @@ void ClusterAggregate::uassertAllShardsSupportExplain(
Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
const Namespaces& namespaces,
const ShardId& shardId,
- const AggregationRequest& aggRequest,
BSONObj cmdObj,
+ const AggregationRequest& aggRequest,
+ const LiteParsedPipeline& liteParsedPipeline,
BSONObjBuilder* out) {
// Temporary hack. See comment on declaration for details.
auto swShard = Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId);
@@ -704,14 +714,16 @@ Status ClusterAggregate::aggPassthrough(OperationContext* opCtx,
// The merging shard is remote, so if a response was received, a HostAndPort must have been
// set.
invariant(cmdResponse.hostAndPort);
- result = uassertStatusOK(
- storePossibleCursor(opCtx,
- shard->getId(),
- *cmdResponse.hostAndPort,
- cmdResponse.response,
- namespaces.requestedNss,
- Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
- Grid::get(opCtx)->getCursorManager()));
+ result = uassertStatusOK(storePossibleCursor(
+ opCtx,
+ shard->getId(),
+ *cmdResponse.hostAndPort,
+ cmdResponse.response,
+ namespaces.requestedNss,
+ Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
+ Grid::get(opCtx)->getCursorManager(),
+ liteParsedPipeline.hasChangeStream() ? TailableMode::kTailableAndAwaitData
+ : TailableMode::kNormal));
}
// First append the properly constructed writeConcernError. It will then be skipped
diff --git a/src/mongo/s/commands/cluster_aggregate.h b/src/mongo/s/commands/cluster_aggregate.h
index 71c6709d262..740cc62c610 100644
--- a/src/mongo/s/commands/cluster_aggregate.h
+++ b/src/mongo/s/commands/cluster_aggregate.h
@@ -43,6 +43,7 @@
namespace mongo {
+class LiteParsedPipeline;
class OperationContext;
class ShardId;
@@ -98,11 +99,12 @@ private:
const AggregationRequest& aggRequest,
BSONObj cmd);
- static Status aggPassthrough(OperationContext* opCtx,
- const Namespaces& namespaces,
- const ShardId& shardId,
- const AggregationRequest& aggRequest,
+ static Status aggPassthrough(OperationContext*,
+ const Namespaces&,
+ const ShardId&,
BSONObj cmd,
+ const AggregationRequest&,
+ const LiteParsedPipeline&,
BSONObjBuilder* result);
};
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 50886944bb2..b867c7b43df 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -102,7 +102,7 @@ bool AsyncResultsMerger::remotesExhausted_inlock() {
Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (!_params->isTailable || !_params->isAwaitData) {
+ if (_params->tailableMode != TailableMode::kTailableAndAwaitData) {
return Status(ErrorCodes::BadValue,
"maxTimeMS can only be used with getMore for tailable, awaitData cursors");
}
@@ -155,7 +155,7 @@ bool AsyncResultsMerger::ready_inlock() {
bool AsyncResultsMerger::readySorted_inlock() {
// Tailable cursors cannot have a sort.
- invariant(!_params->isTailable);
+ invariant(_params->tailableMode == TailableMode::kNormal);
for (const auto& remote : _remotes) {
if (!remote.hasNext() && !remote.exhausted()) {
@@ -203,7 +203,7 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
ClusterQueryResult AsyncResultsMerger::nextReadySorted() {
// Tailable cursors cannot have a sort.
- invariant(!_params->isTailable);
+ invariant(_params->tailableMode == TailableMode::kNormal);
if (_mergeQueue.empty()) {
return {};
@@ -237,7 +237,8 @@ ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() {
ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front();
_remotes[_gettingFromRemote].docBuffer.pop();
- if (_params->isTailable && !_remotes[_gettingFromRemote].hasNext()) {
+ if (_params->tailableMode != TailableMode::kNormal &&
+ !_remotes[_gettingFromRemote].hasNext()) {
// The cursor is tailable and we're about to return the last buffered result. This
// means that the next value returned should be boost::none to indicate the end of
// the batch.
@@ -413,6 +414,10 @@ void AsyncResultsMerger::handleBatchResponse(
cbData.response.isOK() ? parseCursorResponse(cbData.response.data, remote)
: cbData.response.status);
if (!cursorResponseStatus.isOK()) {
+ if (cursorResponseStatus == ErrorCodes::ExceededTimeLimit &&
+ _params->tailableMode != TailableMode::kNormal) {
+ // We timed out before hearing back from the shard,
+ }
remote.status = cursorResponseStatus.getStatus();
// Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
// remove the unreachable host entirely from consideration by marking it as exhausted.
@@ -444,7 +449,7 @@ void AsyncResultsMerger::handleBatchResponse(
// be boost::none in order to indicate the end of the batch.
// (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from
// one shard means the end of the overall batch).
- if (_params->isTailable && !remote.hasNext()) {
+ if (_params->tailableMode != TailableMode::kNormal && !remote.hasNext()) {
_eofNext = true;
}
@@ -453,7 +458,8 @@ void AsyncResultsMerger::handleBatchResponse(
//
// We do not ask for the next batch if the cursor is tailable, as batches received from remote
// tailable cursors should be passed through to the client without asking for more batches.
- if (!_params->isTailable && !remote.hasNext() && !remote.exhausted()) {
+ if (_params->tailableMode == TailableMode::kNormal && !remote.hasNext() &&
+ !remote.exhausted()) {
remote.status = askForNextBatch_inlock(remoteIndex);
if (!remote.status.isOK()) {
return;
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 6076a559867..071ec469a1b 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -127,8 +127,7 @@ protected:
_params->limit = qr->getLimit();
_params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize();
_params->skip = qr->getSkip();
- _params->isTailable = qr->isTailable();
- _params->isAwaitData = qr->isAwaitData();
+ _params->tailableMode = qr->getTailableMode();
_params->isAllowPartialResults = qr->isAllowPartialResults();
}
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index f286cee408e..9ec302ea579 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -109,7 +109,7 @@ void ClusterClientCursorImpl::detachFromOperationContext() {
}
bool ClusterClientCursorImpl::isTailable() const {
- return _params.isTailable;
+ return _params.tailableMode != TailableMode::kNormal;
}
UserNameIterator ClusterClientCursorImpl::getAuthenticatedUsers() const {
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 1b4d76124c3..e3a5e4f62cb 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -39,6 +39,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/query/tailable_mode.h"
#include "mongo/s/client/shard.h"
#include "mongo/util/net/hostandport.h"
@@ -114,11 +115,9 @@ struct ClusterClientCursorParams {
// If set, we use this pipeline to merge the output of aggregations on each remote.
std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline;
- // Whether this cursor is tailing a capped collection.
- bool isTailable = false;
-
- // Whether this cursor has the awaitData option set.
- bool isAwaitData = false;
+ // Whether this cursor is tailing a capped collection, and whether it has the awaitData option
+ // set.
+ TailableMode tailableMode = TailableMode::kNormal;
// Set if a readPreference must be respected throughout the lifetime of the cursor.
boost::optional<ReadPreferenceSetting> readPreference;
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 57050b408b6..ba53581905b 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -190,8 +190,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
params.limit = query.getQueryRequest().getLimit();
params.batchSize = query.getQueryRequest().getEffectiveBatchSize();
params.skip = query.getQueryRequest().getSkip();
- params.isTailable = query.getQueryRequest().isTailable();
- params.isAwaitData = query.getQueryRequest().isAwaitData();
+ params.tailableMode = query.getQueryRequest().getTailableMode();
params.isAllowPartialResults = query.getQueryRequest().isAllowPartialResults();
// This is the batchSize passed to each subsequent getMore command issued by the cursor. We
@@ -209,7 +208,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
}
// Tailable cursors can't have a sort, which should have already been validated.
- invariant(params.sort.isEmpty() || !params.isTailable);
+ invariant(params.sort.isEmpty() || !query.getQueryRequest().isTailable());
const auto qrToForward = transformQueryForShards(query.getQueryRequest());
if (!qrToForward.isOK()) {
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 506ac226636..f611e612d2a 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -46,7 +46,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const BSONObj& cmdResult,
const NamespaceString& requestedNss,
executor::TaskExecutor* executor,
- ClusterCursorManager* cursorManager) {
+ ClusterCursorManager* cursorManager,
+ TailableMode tailableMode) {
if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) {
return cmdResult;
}
@@ -68,6 +69,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
CursorResponse(incomingCursorResponse.getValue().getNSS(),
incomingCursorResponse.getValue().getCursorId(),
{}));
+ params.tailableMode = tailableMode;
auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params));
diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h
index 14d2942d66d..75a4e76bf24 100644
--- a/src/mongo/s/query/store_possible_cursor.h
+++ b/src/mongo/s/query/store_possible_cursor.h
@@ -30,6 +30,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/query/tailable_mode.h"
#include "mongo/s/shard_id.h"
namespace mongo {
@@ -72,6 +73,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const BSONObj& cmdResult,
const NamespaceString& requestedNss,
executor::TaskExecutor* executor,
- ClusterCursorManager* cursorManager);
+ ClusterCursorManager* cursorManager,
+ TailableMode tailableMode = TailableMode::kNormal);
} // namespace mongo