summaryrefslogtreecommitdiff
path: root/src/mongo/db/query/query_request.cpp
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2017-09-05 11:23:54 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2017-09-13 14:44:09 -0400
commitfe125855b6b3e8feb9d7d666338a7f2d29d301ad (patch)
treec682c408675b895bd343dd7187de8be18e875f66 /src/mongo/db/query/query_request.cpp
parent61d1cfbf2c8521126506c12bcd2d187a7926fbe0 (diff)
downloadmongo-fe125855b6b3e8feb9d7d666338a7f2d29d301ad.tar.gz
SERVER-29142 Support $changeStream on unsharded collections.
Diffstat (limited to 'src/mongo/db/query/query_request.cpp')
-rw-r--r--src/mongo/db/query/query_request.cpp60
1 files changed, 34 insertions, 26 deletions
diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp
index 00a8915c1c4..cd09a591955 100644
--- a/src/mongo/db/query/query_request.cpp
+++ b/src/mongo/db/query/query_request.cpp
@@ -127,6 +127,8 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p
const BSONObj& cmdObj,
bool isExplain) {
qr->_explain = isExplain;
+ bool tailable = false;
+ bool awaitData = false;
// Parse the command BSON by looping through one element at a time.
BSONObjIterator it(cmdObj);
@@ -316,7 +318,7 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p
return status;
}
- qr->_tailable = el.boolean();
+ tailable = el.boolean();
} else if (fieldName == kOplogReplayField) {
Status status = checkFieldType(el, Bool);
if (!status.isOK()) {
@@ -337,7 +339,7 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p
return status;
}
- qr->_awaitData = el.boolean();
+ awaitData = el.boolean();
} else if (fieldName == kPartialResultsField) {
Status status = checkFieldType(el, Bool);
if (!status.isOK()) {
@@ -385,6 +387,11 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p
}
}
+ auto tailableMode = tailableModeFromBools(tailable, awaitData);
+ if (!tailableMode.isOK()) {
+ return tailableMode.getStatus();
+ }
+ qr->_tailableMode = tailableMode.getValue();
qr->addMetaProjection();
Status validateStatus = qr->validate();
@@ -494,8 +501,19 @@ void QueryRequest::asFindCommand(BSONObjBuilder* cmdBuilder) const {
cmdBuilder->append(kSnapshotField, true);
}
- if (_tailable) {
- cmdBuilder->append(kTailableField, true);
+ switch (_tailableMode) {
+ case TailableMode::kTailable: {
+ cmdBuilder->append(kTailableField, true);
+ break;
+ }
+ case TailableMode::kTailableAndAwaitData: {
+ cmdBuilder->append(kTailableField, true);
+ cmdBuilder->append(kAwaitDataField, true);
+ break;
+ }
+ case TailableMode::kNormal: {
+ break;
+ }
}
if (_oplogReplay) {
@@ -506,10 +524,6 @@ void QueryRequest::asFindCommand(BSONObjBuilder* cmdBuilder) const {
cmdBuilder->append(kNoCursorTimeoutField, true);
}
- if (_awaitData) {
- cmdBuilder->append(kAwaitDataField, true);
- }
-
if (_allowPartialResults) {
cmdBuilder->append(kPartialResultsField, true);
}
@@ -625,7 +639,7 @@ Status QueryRequest::validate() const {
<< _maxTimeMS);
}
- if (_tailable) {
+ if (_tailableMode != TailableMode::kNormal) {
// Tailable cursors cannot have any sort other than {$natural: 1}.
const BSONObj expectedSort = BSON(kNaturalSortField << 1);
if (!_sort.isEmpty() &&
@@ -641,10 +655,6 @@ Status QueryRequest::validate() const {
}
}
- if (_awaitData && !_tailable) {
- return Status(ErrorCodes::BadValue, "Cannot set awaitData without tailable");
- }
-
return Status::OK();
}
@@ -917,7 +927,8 @@ Status QueryRequest::initFullQuery(const BSONObj& top) {
}
_maxTimeMS = maxTimeMS.getValue();
} else if (str::equals("comment", name)) {
- // Legacy $comment can be any BSON element. Convert to string if it isn't already.
+ // Legacy $comment can be any BSON element. Convert to string if it isn't
+ // already.
if (e.type() == BSONType::String) {
_comment = e.str();
} else {
@@ -932,8 +943,11 @@ Status QueryRequest::initFullQuery(const BSONObj& top) {
int QueryRequest::getOptions() const {
int options = 0;
- if (_tailable) {
+ if (_tailableMode == TailableMode::kTailable) {
+ options |= QueryOption_CursorTailable;
+ } else if (_tailableMode == TailableMode::kTailableAndAwaitData) {
options |= QueryOption_CursorTailable;
+ options |= QueryOption_AwaitData;
}
if (_slaveOk) {
options |= QueryOption_SlaveOk;
@@ -944,9 +958,6 @@ int QueryRequest::getOptions() const {
if (_noCursorTimeout) {
options |= QueryOption_NoCursorTimeout;
}
- if (_awaitData) {
- options |= QueryOption_AwaitData;
- }
if (_exhaust) {
options |= QueryOption_Exhaust;
}
@@ -957,11 +968,12 @@ int QueryRequest::getOptions() const {
}
void QueryRequest::initFromInt(int options) {
- _tailable = (options & QueryOption_CursorTailable) != 0;
+ bool tailable = (options & QueryOption_CursorTailable) != 0;
+ bool awaitData = (options & QueryOption_AwaitData) != 0;
+ _tailableMode = uassertStatusOK(tailableModeFromBools(tailable, awaitData));
_slaveOk = (options & QueryOption_SlaveOk) != 0;
_oplogReplay = (options & QueryOption_OplogReplay) != 0;
_noCursorTimeout = (options & QueryOption_NoCursorTimeout) != 0;
- _awaitData = (options & QueryOption_AwaitData) != 0;
_exhaust = (options & QueryOption_Exhaust) != 0;
_allowPartialResults = (options & QueryOption_PartialResults) != 0;
}
@@ -1010,9 +1022,9 @@ StatusWith<BSONObj> QueryRequest::asAggregationCommand() const {
return {ErrorCodes::InvalidPipelineOperator,
str::stream() << "Option " << kSnapshotField << " not supported in aggregation."};
}
- if (_tailable) {
+ if (isTailable()) {
return {ErrorCodes::InvalidPipelineOperator,
- str::stream() << "Option " << kTailableField << " not supported in aggregation."};
+ "Tailable cursors are not supported in aggregation."};
}
if (_oplogReplay) {
return {ErrorCodes::InvalidPipelineOperator,
@@ -1024,10 +1036,6 @@ StatusWith<BSONObj> QueryRequest::asAggregationCommand() const {
str::stream() << "Option " << kNoCursorTimeoutField
<< " not supported in aggregation."};
}
- if (_awaitData) {
- return {ErrorCodes::InvalidPipelineOperator,
- str::stream() << "Option " << kAwaitDataField << " not supported in aggregation."};
- }
if (_allowPartialResults) {
return {ErrorCodes::InvalidPipelineOperator,
str::stream() << "Option " << kPartialResultsField