diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-09-05 11:23:54 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-09-13 14:44:09 -0400 |
commit | fe125855b6b3e8feb9d7d666338a7f2d29d301ad (patch) | |
tree | c682c408675b895bd343dd7187de8be18e875f66 /src/mongo/db/query/query_request.cpp | |
parent | 61d1cfbf2c8521126506c12bcd2d187a7926fbe0 (diff) | |
download | mongo-fe125855b6b3e8feb9d7d666338a7f2d29d301ad.tar.gz |
SERVER-29142 Support $changeStream on unsharded collections.
Diffstat (limited to 'src/mongo/db/query/query_request.cpp')
-rw-r--r-- | src/mongo/db/query/query_request.cpp | 60 |
1 files changed, 34 insertions, 26 deletions
diff --git a/src/mongo/db/query/query_request.cpp b/src/mongo/db/query/query_request.cpp index 00a8915c1c4..cd09a591955 100644 --- a/src/mongo/db/query/query_request.cpp +++ b/src/mongo/db/query/query_request.cpp @@ -127,6 +127,8 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p const BSONObj& cmdObj, bool isExplain) { qr->_explain = isExplain; + bool tailable = false; + bool awaitData = false; // Parse the command BSON by looping through one element at a time. BSONObjIterator it(cmdObj); @@ -316,7 +318,7 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p return status; } - qr->_tailable = el.boolean(); + tailable = el.boolean(); } else if (fieldName == kOplogReplayField) { Status status = checkFieldType(el, Bool); if (!status.isOK()) { @@ -337,7 +339,7 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p return status; } - qr->_awaitData = el.boolean(); + awaitData = el.boolean(); } else if (fieldName == kPartialResultsField) { Status status = checkFieldType(el, Bool); if (!status.isOK()) { @@ -385,6 +387,11 @@ StatusWith<unique_ptr<QueryRequest>> QueryRequest::parseFromFindCommand(unique_p } } + auto tailableMode = tailableModeFromBools(tailable, awaitData); + if (!tailableMode.isOK()) { + return tailableMode.getStatus(); + } + qr->_tailableMode = tailableMode.getValue(); qr->addMetaProjection(); Status validateStatus = qr->validate(); @@ -494,8 +501,19 @@ void QueryRequest::asFindCommand(BSONObjBuilder* cmdBuilder) const { cmdBuilder->append(kSnapshotField, true); } - if (_tailable) { - cmdBuilder->append(kTailableField, true); + switch (_tailableMode) { + case TailableMode::kTailable: { + cmdBuilder->append(kTailableField, true); + break; + } + case TailableMode::kTailableAndAwaitData: { + cmdBuilder->append(kTailableField, true); + cmdBuilder->append(kAwaitDataField, true); + break; + } + case TailableMode::kNormal: { + break; + } } if (_oplogReplay) { @@ -506,10 +524,6 @@ void QueryRequest::asFindCommand(BSONObjBuilder* cmdBuilder) const { cmdBuilder->append(kNoCursorTimeoutField, true); } - if (_awaitData) { - cmdBuilder->append(kAwaitDataField, true); - } - if (_allowPartialResults) { cmdBuilder->append(kPartialResultsField, true); } @@ -625,7 +639,7 @@ Status QueryRequest::validate() const { << _maxTimeMS); } - if (_tailable) { + if (_tailableMode != TailableMode::kNormal) { // Tailable cursors cannot have any sort other than {$natural: 1}. const BSONObj expectedSort = BSON(kNaturalSortField << 1); if (!_sort.isEmpty() && @@ -641,10 +655,6 @@ Status QueryRequest::validate() const { } } - if (_awaitData && !_tailable) { - return Status(ErrorCodes::BadValue, "Cannot set awaitData without tailable"); - } - return Status::OK(); } @@ -917,7 +927,8 @@ Status QueryRequest::initFullQuery(const BSONObj& top) { } _maxTimeMS = maxTimeMS.getValue(); } else if (str::equals("comment", name)) { - // Legacy $comment can be any BSON element. Convert to string if it isn't already. + // Legacy $comment can be any BSON element. Convert to string if it isn't + // already. if (e.type() == BSONType::String) { _comment = e.str(); } else { @@ -932,8 +943,11 @@ Status QueryRequest::initFullQuery(const BSONObj& top) { int QueryRequest::getOptions() const { int options = 0; - if (_tailable) { + if (_tailableMode == TailableMode::kTailable) { + options |= QueryOption_CursorTailable; + } else if (_tailableMode == TailableMode::kTailableAndAwaitData) { options |= QueryOption_CursorTailable; + options |= QueryOption_AwaitData; } if (_slaveOk) { options |= QueryOption_SlaveOk; @@ -944,9 +958,6 @@ int QueryRequest::getOptions() const { if (_noCursorTimeout) { options |= QueryOption_NoCursorTimeout; } - if (_awaitData) { - options |= QueryOption_AwaitData; - } if (_exhaust) { options |= QueryOption_Exhaust; } @@ -957,11 +968,12 @@ int QueryRequest::getOptions() const { } void QueryRequest::initFromInt(int options) { - _tailable = (options & QueryOption_CursorTailable) != 0; + bool tailable = (options & QueryOption_CursorTailable) != 0; + bool awaitData = (options & QueryOption_AwaitData) != 0; + _tailableMode = uassertStatusOK(tailableModeFromBools(tailable, awaitData)); _slaveOk = (options & QueryOption_SlaveOk) != 0; _oplogReplay = (options & QueryOption_OplogReplay) != 0; _noCursorTimeout = (options & QueryOption_NoCursorTimeout) != 0; - _awaitData = (options & QueryOption_AwaitData) != 0; _exhaust = (options & QueryOption_Exhaust) != 0; _allowPartialResults = (options & QueryOption_PartialResults) != 0; } @@ -1010,9 +1022,9 @@ StatusWith<BSONObj> QueryRequest::asAggregationCommand() const { return {ErrorCodes::InvalidPipelineOperator, str::stream() << "Option " << kSnapshotField << " not supported in aggregation."}; } - if (_tailable) { + if (isTailable()) { return {ErrorCodes::InvalidPipelineOperator, - str::stream() << "Option " << kTailableField << " not supported in aggregation."}; + "Tailable cursors are not supported in aggregation."}; } if (_oplogReplay) { return {ErrorCodes::InvalidPipelineOperator, @@ -1024,10 +1036,6 @@ StatusWith<BSONObj> QueryRequest::asAggregationCommand() const { str::stream() << "Option " << kNoCursorTimeoutField << " not supported in aggregation."}; } - if (_awaitData) { - return {ErrorCodes::InvalidPipelineOperator, - str::stream() << "Option " << kAwaitDataField << " not supported in aggregation."}; - } if (_allowPartialResults) { return {ErrorCodes::InvalidPipelineOperator, str::stream() << "Option " << kPartialResultsField |