summary | refs | log | tree | commit | diff
path: root/src/mongo/s
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- src/mongo/s/query/blocking_results_merger.cpp | 49
-rw-r--r-- src/mongo/s/query/blocking_results_merger.h | 15
-rw-r--r-- src/mongo/s/query/blocking_results_merger_test.cpp | 156
-rw-r--r-- src/mongo/s/query/cluster_aggregate.cpp | 34
-rw-r--r-- src/mongo/s/query/document_source_merge_cursors.cpp | 6
-rw-r--r-- src/mongo/s/query/document_source_merge_cursors.h | 4
-rw-r--r-- src/mongo/s/query/router_stage_merge.h | 2
7 files changed, 237 insertions, 29 deletions
diff --git a/src/mongo/s/query/blocking_results_merger.cpp b/src/mongo/s/query/blocking_results_merger.cpp
index b572a51705e..2431901340d 100644
--- a/src/mongo/s/query/blocking_results_merger.cpp
+++ b/src/mongo/s/query/blocking_results_merger.cpp
@@ -31,16 +31,51 @@
#include "mongo/platform/basic.h"
#include "mongo/db/query/find_common.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/s/query/blocking_results_merger.h"
namespace mongo {
BlockingResultsMerger::BlockingResultsMerger(OperationContext* opCtx,
AsyncResultsMergerParams&& armParams,
- executor::TaskExecutor* executor)
+ executor::TaskExecutor* executor,
+ std::unique_ptr<ResourceYielder> resourceYielder)
: _tailableMode(armParams.getTailableMode().value_or(TailableModeEnum::kNormal)),
_executor(executor),
- _arm(opCtx, executor, std::move(armParams)) {}
+ _arm(opCtx, executor, std::move(armParams)),
+ _resourceYielder(std::move(resourceYielder)) {}
+
+StatusWith<stdx::cv_status> BlockingResultsMerger::doWaiting(
+    OperationContext* opCtx, const std::function<StatusWith<stdx::cv_status>()>& waitFn) noexcept {
+    // The yielder is optional: a null pointer means there is nothing to yield or unyield around
+    // the wait.
+    const bool mustYield = static_cast<bool>(_resourceYielder);
+
+    // Callers of the BlockingResultsMerger expect a Status rather than an exception, so a failed
+    // yield is converted here and the wait is never attempted.
+    if (mustYield) {
+        try {
+            _resourceYielder->yield(opCtx);
+        } catch (const DBException& yieldError) {
+            return yieldError.toStatus();
+        }
+    }
+
+    // 'waitFn' is expected not to throw, but nothing enforces that; a DBException escaping it is
+    // treated as unreachable.
+    boost::optional<StatusWith<stdx::cv_status>> waitOutcome;
+    try {
+        waitOutcome = waitFn();
+    } catch (const DBException&) {
+        MONGO_UNREACHABLE;
+    }
+
+    // Unyield whatever the wait returned. A failure here is reported in place of the wait's own
+    // result.
+    if (mustYield) {
+        try {
+            _resourceYielder->unyield(opCtx);
+        } catch (const DBException& unyieldError) {
+            return unyieldError.toStatus();
+        }
+    }
+
+    return *waitOutcome;
+}
StatusWith<ClusterQueryResult> BlockingResultsMerger::awaitNextWithTimeout(
OperationContext* opCtx, RouterExecStage::ExecContext execCtx) {
@@ -54,9 +89,10 @@ StatusWith<ClusterQueryResult> BlockingResultsMerger::awaitNextWithTimeout(
}
auto event = nextEventStatus.getValue();
- // Block until there are further results to return, or our time limit is exceeded.
- auto waitStatus =
- _executor->waitForEvent(opCtx, event, awaitDataState(opCtx).waitForInsertsDeadline);
+ const auto waitStatus = doWaiting(opCtx, [this, opCtx, &event]() {
+ return _executor->waitForEvent(
+ opCtx, event, awaitDataState(opCtx).waitForInsertsDeadline);
+ });
if (!waitStatus.isOK()) {
return waitStatus.getStatus();
@@ -84,7 +120,8 @@ StatusWith<ClusterQueryResult> BlockingResultsMerger::blockUntilNext(OperationCo
auto event = nextEventStatus.getValue();
// Block until there are further results to return.
- auto status = _executor->waitForEvent(opCtx, event);
+ auto status = doWaiting(
+ opCtx, [this, opCtx, &event]() { return _executor->waitForEvent(opCtx, event); });
if (!status.isOK()) {
return status.getStatus();
diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h
index f35ce904709..7d82cdff49c 100644
--- a/src/mongo/s/query/blocking_results_merger.h
+++ b/src/mongo/s/query/blocking_results_merger.h
@@ -43,7 +43,8 @@ class BlockingResultsMerger {
public:
BlockingResultsMerger(OperationContext* opCtx,
AsyncResultsMergerParams&& arm,
- executor::TaskExecutor*);
+ executor::TaskExecutor*,
+ std::unique_ptr<ResourceYielder> resourceYielder);
/**
* Blocks until the next result is available or an error is detected.
@@ -100,6 +101,14 @@ private:
*/
StatusWith<executor::TaskExecutor::EventHandle> getNextEvent();
+ /**
+ * Call the waitFn and return the result, yielding resources while waiting if necessary.
+ * 'waitFn' must not throw: doWaiting() is noexcept and treats a DBException from it as unreachable.
+ */
+ StatusWith<stdx::cv_status> doWaiting(
+ OperationContext* opCtx,
+ const std::function<StatusWith<stdx::cv_status>()>& waitFn) noexcept;
+
TailableModeEnum _tailableMode;
executor::TaskExecutor* _executor;
@@ -110,6 +119,10 @@ private:
// and pick back up waiting for it on the next call to 'next()'.
executor::TaskExecutor::EventHandle _leftoverEventFromLastTimeout;
AsyncResultsMerger _arm;
+
+ // Provides interface for yielding and "unyielding" resources while waiting for results from
+ // the network. A value of nullptr implies that no such yielding or unyielding is necessary.
+ std::unique_ptr<ResourceYielder> _resourceYielder;
};
} // namespace mongo
diff --git a/src/mongo/s/query/blocking_results_merger_test.cpp b/src/mongo/s/query/blocking_results_merger_test.cpp
index 961b3b5912c..2f76d576e3d 100644
--- a/src/mongo/s/query/blocking_results_merger_test.cpp
+++ b/src/mongo/s/query/blocking_results_merger_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/s/query/blocking_results_merger.h"
#include "mongo/s/query/results_merger_test_fixture.h"
+#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -43,9 +44,33 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilKilled) {
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ BlockingResultsMerger blockingMerger(operationContext(),
+ makeARMParamsFromExistingCursors(std::move(cursors)),
+ executor(),
+ nullptr);
+
+ blockingMerger.kill(operationContext());
+}
+
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilDeadlineExpires) {
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto params = makeARMParamsFromExistingCursors(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
BlockingResultsMerger blockingMerger(
- operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor());
+ operationContext(), std::move(params), executor(), nullptr);
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ auto next = unittest::assertGet(
+ blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind));
+
+ // The timeout should hit, and return an empty object.
+ ASSERT_TRUE(next.isEOF());
+ });
+
+ future.timed_get(kFutureTimeout);
blockingMerger.kill(operationContext());
}
@@ -53,8 +78,10 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) {
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- BlockingResultsMerger blockingMerger(
- operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor());
+ BlockingResultsMerger blockingMerger(operationContext(),
+ makeARMParamsFromExistingCursors(std::move(cursors)),
+ executor(),
+ nullptr);
// Issue a blocking wait for the next result asynchronously on a different thread.
auto future = launchAsync([&]() {
@@ -78,12 +105,57 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) {
future.timed_get(kFutureTimeout);
}
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReadyWithDeadline) {
+    // One remote cursor (id 1) with an empty first batch, merged in tailable/awaitData mode.
+    std::vector<RemoteCursor> cursors;
+    cursors.emplace_back(
+        makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+    auto armParams = makeARMParamsFromExistingCursors(std::move(cursors));
+    armParams.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+    BlockingResultsMerger blockingMerger(
+        operationContext(), std::move(armParams), executor(), nullptr);
+
+    // The main thread holds this mutex until it has answered the getMore; the background thread
+    // blocks on it between its two next() calls.
+    stdx::mutex mutex;
+    stdx::unique_lock<stdx::mutex> mainThreadLock(mutex);
+
+    // Run the blocking next() calls on a different thread.
+    auto nextResultFuture = launchAsync([&]() {
+        // The first call schedules a getMore. Nobody has replied yet, so it comes back as EOF.
+        auto firstResult = unittest::assertGet(blockingMerger.next(
+            operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet));
+        ASSERT_TRUE(firstResult.isEOF());
+
+        // Wait here until the main thread has responded to the getMore and released the mutex.
+        stdx::unique_lock<stdx::mutex> responseDeliveredLock(mutex);
+
+        // The second call should now surface the document from the getMore response.
+        auto secondResult = unittest::assertGet(blockingMerger.next(
+            operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet));
+        ASSERT_FALSE(secondResult.isEOF());
+        ASSERT_BSONOBJ_EQ(*secondResult.getResult(), BSON("x" << 1));
+    });
+
+    // Answer the getMore with a single document and cursor id 0, which marks the remote cursor as
+    // exhausted.
+    onCommand([&](const auto& request) {
+        ASSERT(request.cmdObj["getMore"]);
+        return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+            .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+    });
+
+    // Let the background thread proceed to its second next() call.
+    mainThreadLock.unlock();
+
+    nextResultFuture.timed_get(kFutureTimeout);
+}
+
+
TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) {
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
auto params = makeARMParamsFromExistingCursors(std::move(cursors));
- BlockingResultsMerger blockingMerger(operationContext(), std::move(params), executor());
+ BlockingResultsMerger blockingMerger(
+ operationContext(), std::move(params), executor(), nullptr);
// Issue a blocking wait for the next result asynchronously on a different thread.
auto future = launchAsync([&]() {
@@ -116,5 +188,81 @@ TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) {
future.timed_get(kFutureTimeout);
}
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToHandleExceptionWhenYielding) {
+    // A yielder whose yield() always fails. 'override' makes the compiler verify that these
+    // really replace the ResourceYielder hooks the merger invokes.
+    class ThrowyResourceYielder : public ResourceYielder {
+    public:
+        void yield(OperationContext*) override {
+            uasserted(ErrorCodes::BadValue, "Simulated error");
+        }
+
+        void unyield(OperationContext*) override {}
+    };
+
+    std::vector<RemoteCursor> cursors;
+    cursors.emplace_back(
+        makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+    BlockingResultsMerger blockingMerger(operationContext(),
+                                         makeARMParamsFromExistingCursors(std::move(cursors)),
+                                         executor(),
+                                         std::make_unique<ThrowyResourceYielder>());
+
+    // Issue a blocking wait for the next result asynchronously on a different thread.
+    auto future = launchAsync([&]() {
+        // next() must not throw: the exception raised by yield() has to come back to the caller
+        // as a non-OK Status carrying the same error code.
+        const auto status =
+            blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind)
+                .getStatus();
+        ASSERT_EQ(status, ErrorCodes::BadValue);
+    });
+
+    // Schedule the response to the getMore which will return the next result and mark the cursor as
+    // exhausted.
+    onCommand([&](const auto& request) {
+        ASSERT(request.cmdObj["getMore"]);
+        return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+            .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+    });
+
+    future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToHandleExceptionWhenUnyielding) {
+    // A yielder whose unyield() always fails. 'override' makes the compiler verify that these
+    // really replace the ResourceYielder hooks the merger invokes.
+    class ThrowyResourceYielder : public ResourceYielder {
+    public:
+        void yield(OperationContext*) override {}
+
+        void unyield(OperationContext*) override {
+            uasserted(ErrorCodes::BadValue, "Simulated error");
+        }
+    };
+
+    std::vector<RemoteCursor> cursors;
+    cursors.emplace_back(
+        makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+    BlockingResultsMerger blockingMerger(operationContext(),
+                                         makeARMParamsFromExistingCursors(std::move(cursors)),
+                                         executor(),
+                                         std::make_unique<ThrowyResourceYielder>());
+
+    // Issue a blocking wait for the next result asynchronously on a different thread.
+    auto future = launchAsync([&]() {
+        // next() must not throw: the exception raised by unyield() has to come back to the caller
+        // as a non-OK Status carrying the same error code.
+        const auto status =
+            blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind)
+                .getStatus();
+        ASSERT_EQ(status, ErrorCodes::BadValue);
+    });
+
+    // Schedule the response to the getMore which will return the next result and mark the cursor as
+    // exhausted.
+    onCommand([&](const auto& request) {
+        ASSERT(request.cmdObj["getMore"]);
+        return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+            .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+    });
+
+    future.timed_get(kFutureTimeout);
+}
+
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 12270fc1c99..3ead06bc1f1 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -105,6 +105,7 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId,
BSONObj createCommandForMergingShard(const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const ShardId& shardId,
+ bool mergingShardContributesData,
const Pipeline* pipelineForMerging) {
MutableDocument mergeCmd(request.serializeToCommandObj());
@@ -119,9 +120,19 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request,
: Value(Document{CollationSpec::kSimpleSpec});
}
+ const auto txnRouter = TransactionRouter::get(mergeCtx->opCtx);
+ if (txnRouter && mergingShardContributesData) {
+ // Don't include a readConcern since we can only include read concerns on the _first_
+ // command sent to a participant per transaction. Assuming the merging shard is a
+ // participant, it will already have received another 'aggregate' command earlier which
+ // contained a readConcern.
+
+ mergeCmd.remove("readConcern");
+ }
+
auto aggCmd = mergeCmd.freeze().toBson();
- if (auto txnRouter = TransactionRouter::get(mergeCtx->opCtx)) {
+ if (txnRouter) {
aggCmd = txnRouter->attachTxnFieldsIfNeeded(shardId, aggCmd);
}
@@ -609,19 +620,16 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
// therefore must have a valid routing table.
invariant(routingInfo);
- // TODO SERVER-33683 allowing an aggregation within a transaction can lead to a deadlock in the
- // SessionCatalog when a pipeline with a $mergeCursors sends a getMore to itself.
- uassert(ErrorCodes::OperationNotSupportedInTransaction,
- "Cannot specify a transaction number in combination with an aggregation on mongos when "
- "merging on a shard",
- !opCtx->getTxnNumber());
-
- ShardId mergingShardId = pickMergingShard(opCtx,
- shardDispatchResults.needsPrimaryShardMerge,
- targetedShards,
- routingInfo->db().primaryId());
+ const ShardId mergingShardId = pickMergingShard(opCtx,
+ shardDispatchResults.needsPrimaryShardMerge,
+ targetedShards,
+ routingInfo->db().primaryId());
+ const bool mergingShardContributesData =
+ std::find(targetedShards.begin(), targetedShards.end(), mergingShardId) !=
+ targetedShards.end();
- auto mergeCmdObj = createCommandForMergingShard(request, expCtx, mergingShardId, mergePipeline);
+ auto mergeCmdObj = createCommandForMergingShard(
+ request, expCtx, mergingShardId, mergingShardContributesData, mergePipeline);
// Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return.
auto mergeResponse = establishMergingShardCursor(
diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp
index fe1303684da..fbe93ea1b53 100644
--- a/src/mongo/s/query/document_source_merge_cursors.cpp
+++ b/src/mongo/s/query/document_source_merge_cursors.cpp
@@ -74,7 +74,11 @@ bool DocumentSourceMergeCursors::remotesExhausted() const {
void DocumentSourceMergeCursors::populateMerger() {
invariant(!_blockingResultsMerger);
invariant(_armParams);
- _blockingResultsMerger.emplace(pExpCtx->opCtx, std::move(*_armParams), _executor);
+
+ _blockingResultsMerger.emplace(pExpCtx->opCtx,
+ std::move(*_armParams),
+ _executor,
+ pExpCtx->mongoProcessInterface->getResourceYielder());
_armParams = boost::none;
// '_blockingResultsMerger' now owns the cursors.
_ownCursors = false;
diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h
index a3d65bcab27..5249da51a8e 100644
--- a/src/mongo/s/query/document_source_merge_cursors.h
+++ b/src/mongo/s/query/document_source_merge_cursors.h
@@ -89,9 +89,7 @@ public:
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
- // TODO SERVER-33683: Permit $mergeCursors with readConcern
- // level "snapshot".
- TransactionRequirement::kNotAllowed);
+ TransactionRequirement::kAllowed);
constraints.requiresInputDocSource = false;
return constraints;
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 7833c6550c0..6c8e389eded 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -47,7 +47,7 @@ public:
RouterStageMerge(OperationContext* opCtx,
executor::TaskExecutor* executor,
AsyncResultsMergerParams&& armParams)
- : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor) {}
+ : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor, nullptr) {}
StatusWith<ClusterQueryResult> next(ExecContext execCtx) final {
return _resultsMerger.next(getOpCtx(), execCtx);