diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-09-05 11:23:54 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-09-13 14:44:09 -0400 |
commit | fe125855b6b3e8feb9d7d666338a7f2d29d301ad (patch) | |
tree | c682c408675b895bd343dd7187de8be18e875f66 /src/mongo/s/query | |
parent | 61d1cfbf2c8521126506c12bcd2d187a7926fbe0 (diff) | |
download | mongo-fe125855b6b3e8feb9d7d666338a7f2d29d301ad.tar.gz |
SERVER-29142 Support $changeStream on unsharded collections.
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r-- | src/mongo/s/query/async_results_merger.cpp | 18 | ||||
-rw-r--r-- | src/mongo/s/query/async_results_merger_test.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_client_cursor_params.h | 9 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 5 | ||||
-rw-r--r-- | src/mongo/s/query/store_possible_cursor.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/query/store_possible_cursor.h | 4 |
7 files changed, 26 insertions, 19 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 50886944bb2..b867c7b43df 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -102,7 +102,7 @@ bool AsyncResultsMerger::remotesExhausted_inlock() { Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { stdx::lock_guard<stdx::mutex> lk(_mutex); - if (!_params->isTailable || !_params->isAwaitData) { + if (_params->tailableMode != TailableMode::kTailableAndAwaitData) { return Status(ErrorCodes::BadValue, "maxTimeMS can only be used with getMore for tailable, awaitData cursors"); } @@ -155,7 +155,7 @@ bool AsyncResultsMerger::ready_inlock() { bool AsyncResultsMerger::readySorted_inlock() { // Tailable cursors cannot have a sort. - invariant(!_params->isTailable); + invariant(_params->tailableMode == TailableMode::kNormal); for (const auto& remote : _remotes) { if (!remote.hasNext() && !remote.exhausted()) { @@ -203,7 +203,7 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() { ClusterQueryResult AsyncResultsMerger::nextReadySorted() { // Tailable cursors cannot have a sort. - invariant(!_params->isTailable); + invariant(_params->tailableMode == TailableMode::kNormal); if (_mergeQueue.empty()) { return {}; @@ -237,7 +237,8 @@ ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() { ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front(); _remotes[_gettingFromRemote].docBuffer.pop(); - if (_params->isTailable && !_remotes[_gettingFromRemote].hasNext()) { + if (_params->tailableMode != TailableMode::kNormal && + !_remotes[_gettingFromRemote].hasNext()) { // The cursor is tailable and we're about to return the last buffered result. This // means that the next value returned should be boost::none to indicate the end of // the batch. @@ -413,6 +414,10 @@ void AsyncResultsMerger::handleBatchResponse( cbData.response.isOK() ? parseCursorResponse(cbData.response.data, remote) : cbData.response.status); if (!cursorResponseStatus.isOK()) { + if (cursorResponseStatus == ErrorCodes::ExceededTimeLimit && + _params->tailableMode != TailableMode::kNormal) { + // We timed out before hearing back from the shard, + } remote.status = cursorResponseStatus.getStatus(); // Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We // remove the unreachable host entirely from consideration by marking it as exhausted. @@ -444,7 +449,7 @@ void AsyncResultsMerger::handleBatchResponse( // be boost::none in order to indicate the end of the batch. // (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from // one shard means the end of the overall batch). - if (_params->isTailable && !remote.hasNext()) { + if (_params->tailableMode != TailableMode::kNormal && !remote.hasNext()) { _eofNext = true; } @@ -453,7 +458,8 @@ void AsyncResultsMerger::handleBatchResponse( // // We do not ask for the next batch if the cursor is tailable, as batches received from remote // tailable cursors should be passed through to the client without asking for more batches. - if (!_params->isTailable && !remote.hasNext() && !remote.exhausted()) { + if (_params->tailableMode == TailableMode::kNormal && !remote.hasNext() && + !remote.exhausted()) { remote.status = askForNextBatch_inlock(remoteIndex); if (!remote.status.isOK()) { return; diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp index 6076a559867..071ec469a1b 100644 --- a/src/mongo/s/query/async_results_merger_test.cpp +++ b/src/mongo/s/query/async_results_merger_test.cpp @@ -127,8 +127,7 @@ protected: _params->limit = qr->getLimit(); _params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize(); _params->skip = qr->getSkip(); - _params->isTailable = qr->isTailable(); - _params->isAwaitData = qr->isAwaitData(); + _params->tailableMode = qr->getTailableMode(); _params->isAllowPartialResults = qr->isAllowPartialResults(); } diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index f286cee408e..9ec302ea579 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -109,7 +109,7 @@ void ClusterClientCursorImpl::detachFromOperationContext() { } bool ClusterClientCursorImpl::isTailable() const { - return _params.isTailable; + return _params.tailableMode != TailableMode::kNormal; } UserNameIterator ClusterClientCursorImpl::getAuthenticatedUsers() const { diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index 1b4d76124c3..e3a5e4f62cb 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -39,6 +39,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/pipeline.h" #include "mongo/db/query/cursor_response.h" +#include "mongo/db/query/tailable_mode.h" #include "mongo/s/client/shard.h" #include "mongo/util/net/hostandport.h" @@ -114,11 +115,9 @@ struct ClusterClientCursorParams { // If set, we use this pipeline to merge the output of aggregations on each remote. std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline; - // Whether this cursor is tailing a capped collection. - bool isTailable = false; - - // Whether this cursor has the awaitData option set. - bool isAwaitData = false; + // Whether this cursor is tailing a capped collection, and whether it has the awaitData option + // set. + TailableMode tailableMode = TailableMode::kNormal; // Set if a readPreference must be respected throughout the lifetime of the cursor. boost::optional<ReadPreferenceSetting> readPreference; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 57050b408b6..ba53581905b 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -190,8 +190,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, params.limit = query.getQueryRequest().getLimit(); params.batchSize = query.getQueryRequest().getEffectiveBatchSize(); params.skip = query.getQueryRequest().getSkip(); - params.isTailable = query.getQueryRequest().isTailable(); - params.isAwaitData = query.getQueryRequest().isAwaitData(); + params.tailableMode = query.getQueryRequest().getTailableMode(); params.isAllowPartialResults = query.getQueryRequest().isAllowPartialResults(); // This is the batchSize passed to each subsequent getMore command issued by the cursor. We @@ -209,7 +208,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, } // Tailable cursors can't have a sort, which should have already been validated. - invariant(params.sort.isEmpty() || !params.isTailable); + invariant(params.sort.isEmpty() || !query.getQueryRequest().isTailable()); const auto qrToForward = transformQueryForShards(query.getQueryRequest()); if (!qrToForward.isOK()) { diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp index 506ac226636..f611e612d2a 100644 --- a/src/mongo/s/query/store_possible_cursor.cpp +++ b/src/mongo/s/query/store_possible_cursor.cpp @@ -46,7 +46,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const BSONObj& cmdResult, const NamespaceString& requestedNss, executor::TaskExecutor* executor, - ClusterCursorManager* cursorManager) { + ClusterCursorManager* cursorManager, + TailableMode tailableMode) { if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) { return cmdResult; } @@ -68,6 +69,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, CursorResponse(incomingCursorResponse.getValue().getNSS(), incomingCursorResponse.getValue().getCursorId(), {})); + params.tailableMode = tailableMode; auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params)); diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h index 14d2942d66d..75a4e76bf24 100644 --- a/src/mongo/s/query/store_possible_cursor.h +++ b/src/mongo/s/query/store_possible_cursor.h @@ -30,6 +30,7 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/query/tailable_mode.h" #include "mongo/s/shard_id.h" namespace mongo { @@ -72,6 +73,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx, const BSONObj& cmdResult, const NamespaceString& requestedNss, executor::TaskExecutor* executor, - ClusterCursorManager* cursorManager); + ClusterCursorManager* cursorManager, + TailableMode tailableMode = TailableMode::kNormal); } // namespace mongo |