summaryrefslogtreecommitdiff
path: root/src/mongo/s/query
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2017-09-05 11:23:54 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2017-09-13 14:44:09 -0400
commitfe125855b6b3e8feb9d7d666338a7f2d29d301ad (patch)
treec682c408675b895bd343dd7187de8be18e875f66 /src/mongo/s/query
parent61d1cfbf2c8521126506c12bcd2d187a7926fbe0 (diff)
downloadmongo-fe125855b6b3e8feb9d7d666338a7f2d29d301ad.tar.gz
SERVER-29142 Support $changeStream on unsharded collections.
Diffstat (limited to 'src/mongo/s/query')
-rw-r--r--src/mongo/s/query/async_results_merger.cpp18
-rw-r--r--src/mongo/s/query/async_results_merger_test.cpp3
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h9
-rw-r--r--src/mongo/s/query/cluster_find.cpp5
-rw-r--r--src/mongo/s/query/store_possible_cursor.cpp4
-rw-r--r--src/mongo/s/query/store_possible_cursor.h4
7 files changed, 26 insertions, 19 deletions
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 50886944bb2..b867c7b43df 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -102,7 +102,7 @@ bool AsyncResultsMerger::remotesExhausted_inlock() {
Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- if (!_params->isTailable || !_params->isAwaitData) {
+ if (_params->tailableMode != TailableMode::kTailableAndAwaitData) {
return Status(ErrorCodes::BadValue,
"maxTimeMS can only be used with getMore for tailable, awaitData cursors");
}
@@ -155,7 +155,7 @@ bool AsyncResultsMerger::ready_inlock() {
bool AsyncResultsMerger::readySorted_inlock() {
// Tailable cursors cannot have a sort.
- invariant(!_params->isTailable);
+ invariant(_params->tailableMode == TailableMode::kNormal);
for (const auto& remote : _remotes) {
if (!remote.hasNext() && !remote.exhausted()) {
@@ -203,7 +203,7 @@ StatusWith<ClusterQueryResult> AsyncResultsMerger::nextReady() {
ClusterQueryResult AsyncResultsMerger::nextReadySorted() {
// Tailable cursors cannot have a sort.
- invariant(!_params->isTailable);
+ invariant(_params->tailableMode == TailableMode::kNormal);
if (_mergeQueue.empty()) {
return {};
@@ -237,7 +237,8 @@ ClusterQueryResult AsyncResultsMerger::nextReadyUnsorted() {
ClusterQueryResult front = _remotes[_gettingFromRemote].docBuffer.front();
_remotes[_gettingFromRemote].docBuffer.pop();
- if (_params->isTailable && !_remotes[_gettingFromRemote].hasNext()) {
+ if (_params->tailableMode != TailableMode::kNormal &&
+ !_remotes[_gettingFromRemote].hasNext()) {
// The cursor is tailable and we're about to return the last buffered result. This
// means that the next value returned should be boost::none to indicate the end of
// the batch.
@@ -413,6 +414,10 @@ void AsyncResultsMerger::handleBatchResponse(
cbData.response.isOK() ? parseCursorResponse(cbData.response.data, remote)
: cbData.response.status);
if (!cursorResponseStatus.isOK()) {
+ if (cursorResponseStatus == ErrorCodes::ExceededTimeLimit &&
+ _params->tailableMode != TailableMode::kNormal) {
+ // We timed out before hearing back from the shard,
+ }
remote.status = cursorResponseStatus.getStatus();
// Unreachable host errors are swallowed if the 'allowPartialResults' option is set. We
// remove the unreachable host entirely from consideration by marking it as exhausted.
@@ -444,7 +449,7 @@ void AsyncResultsMerger::handleBatchResponse(
// be boost::none in order to indicate the end of the batch.
// (Note: tailable cursors are only valid on unsharded collections, so the end of the batch from
// one shard means the end of the overall batch).
- if (_params->isTailable && !remote.hasNext()) {
+ if (_params->tailableMode != TailableMode::kNormal && !remote.hasNext()) {
_eofNext = true;
}
@@ -453,7 +458,8 @@ void AsyncResultsMerger::handleBatchResponse(
//
// We do not ask for the next batch if the cursor is tailable, as batches received from remote
// tailable cursors should be passed through to the client without asking for more batches.
- if (!_params->isTailable && !remote.hasNext() && !remote.exhausted()) {
+ if (_params->tailableMode == TailableMode::kNormal && !remote.hasNext() &&
+ !remote.exhausted()) {
remote.status = askForNextBatch_inlock(remoteIndex);
if (!remote.status.isOK()) {
return;
diff --git a/src/mongo/s/query/async_results_merger_test.cpp b/src/mongo/s/query/async_results_merger_test.cpp
index 6076a559867..071ec469a1b 100644
--- a/src/mongo/s/query/async_results_merger_test.cpp
+++ b/src/mongo/s/query/async_results_merger_test.cpp
@@ -127,8 +127,7 @@ protected:
_params->limit = qr->getLimit();
_params->batchSize = getMoreBatchSize ? getMoreBatchSize : qr->getBatchSize();
_params->skip = qr->getSkip();
- _params->isTailable = qr->isTailable();
- _params->isAwaitData = qr->isAwaitData();
+ _params->tailableMode = qr->getTailableMode();
_params->isAllowPartialResults = qr->isAllowPartialResults();
}
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index f286cee408e..9ec302ea579 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -109,7 +109,7 @@ void ClusterClientCursorImpl::detachFromOperationContext() {
}
bool ClusterClientCursorImpl::isTailable() const {
- return _params.isTailable;
+ return _params.tailableMode != TailableMode::kNormal;
}
UserNameIterator ClusterClientCursorImpl::getAuthenticatedUsers() const {
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 1b4d76124c3..e3a5e4f62cb 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -39,6 +39,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/pipeline.h"
#include "mongo/db/query/cursor_response.h"
+#include "mongo/db/query/tailable_mode.h"
#include "mongo/s/client/shard.h"
#include "mongo/util/net/hostandport.h"
@@ -114,11 +115,9 @@ struct ClusterClientCursorParams {
// If set, we use this pipeline to merge the output of aggregations on each remote.
std::unique_ptr<Pipeline, Pipeline::Deleter> mergePipeline;
- // Whether this cursor is tailing a capped collection.
- bool isTailable = false;
-
- // Whether this cursor has the awaitData option set.
- bool isAwaitData = false;
+ // Whether this cursor is tailing a capped collection, and whether it has the awaitData option
+ // set.
+ TailableMode tailableMode = TailableMode::kNormal;
// Set if a readPreference must be respected throughout the lifetime of the cursor.
boost::optional<ReadPreferenceSetting> readPreference;
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 57050b408b6..ba53581905b 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -190,8 +190,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
params.limit = query.getQueryRequest().getLimit();
params.batchSize = query.getQueryRequest().getEffectiveBatchSize();
params.skip = query.getQueryRequest().getSkip();
- params.isTailable = query.getQueryRequest().isTailable();
- params.isAwaitData = query.getQueryRequest().isAwaitData();
+ params.tailableMode = query.getQueryRequest().getTailableMode();
params.isAllowPartialResults = query.getQueryRequest().isAllowPartialResults();
// This is the batchSize passed to each subsequent getMore command issued by the cursor. We
@@ -209,7 +208,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
}
// Tailable cursors can't have a sort, which should have already been validated.
- invariant(params.sort.isEmpty() || !params.isTailable);
+ invariant(params.sort.isEmpty() || !query.getQueryRequest().isTailable());
const auto qrToForward = transformQueryForShards(query.getQueryRequest());
if (!qrToForward.isOK()) {
diff --git a/src/mongo/s/query/store_possible_cursor.cpp b/src/mongo/s/query/store_possible_cursor.cpp
index 506ac226636..f611e612d2a 100644
--- a/src/mongo/s/query/store_possible_cursor.cpp
+++ b/src/mongo/s/query/store_possible_cursor.cpp
@@ -46,7 +46,8 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const BSONObj& cmdResult,
const NamespaceString& requestedNss,
executor::TaskExecutor* executor,
- ClusterCursorManager* cursorManager) {
+ ClusterCursorManager* cursorManager,
+ TailableMode tailableMode) {
if (!cmdResult["ok"].trueValue() || !cmdResult.hasField("cursor")) {
return cmdResult;
}
@@ -68,6 +69,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
CursorResponse(incomingCursorResponse.getValue().getNSS(),
incomingCursorResponse.getValue().getCursorId(),
{}));
+ params.tailableMode = tailableMode;
auto ccc = ClusterClientCursorImpl::make(opCtx, executor, std::move(params));
diff --git a/src/mongo/s/query/store_possible_cursor.h b/src/mongo/s/query/store_possible_cursor.h
index 14d2942d66d..75a4e76bf24 100644
--- a/src/mongo/s/query/store_possible_cursor.h
+++ b/src/mongo/s/query/store_possible_cursor.h
@@ -30,6 +30,7 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/query/tailable_mode.h"
#include "mongo/s/shard_id.h"
namespace mongo {
@@ -72,6 +73,7 @@ StatusWith<BSONObj> storePossibleCursor(OperationContext* opCtx,
const BSONObj& cmdResult,
const NamespaceString& requestedNss,
executor::TaskExecutor* executor,
- ClusterCursorManager* cursorManager);
+ ClusterCursorManager* cursorManager,
+ TailableMode tailableMode = TailableMode::kNormal);
} // namespace mongo