author     Ian Boros <ian.boros@10gen.com>    2018-11-28 18:38:15 -0500
committer  Ian Boros <ian.boros@10gen.com>    2018-12-17 16:36:20 -0500
commit     d40d24abc025690150ccf8009ba1facb9ed1c6b2 (patch)
tree       8ba35e0d4f8c7e441ffe80de30faa24a708c436e /src
parent     b37b5ef7ec0ec2e502423d53e6c0d6e86b343c27 (diff)
download   mongo-d40d24abc025690150ccf8009ba1facb9ed1c6b2.tar.gz
SERVER-33683 Prevent deadlock in aggregate with transactions
Diffstat (limited to 'src')
-rw-r--r--  src/mongo/db/pipeline/mongo_process_interface.h          |   3
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.h         |   4
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.cpp   |  42
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.h     |   2
-rw-r--r--  src/mongo/db/pipeline/stub_mongo_process_interface.h     |   4
-rw-r--r--  src/mongo/db/resource_yielder.h                          |  48
-rw-r--r--  src/mongo/db/service_entry_point_common.cpp              |   7
-rw-r--r--  src/mongo/db/session_catalog_mongod.cpp                  |  16
-rw-r--r--  src/mongo/db/session_catalog_mongod.h                    |  14
-rw-r--r--  src/mongo/db/transaction_participant.cpp                 |   4
-rw-r--r--  src/mongo/db/transaction_participant.h                   |   2
-rw-r--r--  src/mongo/db/transaction_participant_test.cpp            |  38
-rw-r--r--  src/mongo/s/query/blocking_results_merger.cpp            |  49
-rw-r--r--  src/mongo/s/query/blocking_results_merger.h              |  15
-rw-r--r--  src/mongo/s/query/blocking_results_merger_test.cpp       | 156
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp                  |  34
-rw-r--r--  src/mongo/s/query/document_source_merge_cursors.cpp      |   6
-rw-r--r--  src/mongo/s/query/document_source_merge_cursors.h        |   4
-rw-r--r--  src/mongo/s/query/router_stage_merge.h                   |   2
19 files changed, 377 insertions, 73 deletions
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index be934e88707..d51d348c459 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -48,6 +48,7 @@
#include "mongo/db/pipeline/lite_parsed_document_source.h"
#include "mongo/db/pipeline/value.h"
#include "mongo/db/query/explain_options.h"
+#include "mongo/db/resource_yielder.h"
#include "mongo/db/storage/backup_cursor_state.h"
#include "mongo/s/chunk_version.h"
@@ -316,6 +317,8 @@ public:
virtual void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
ChunkVersion targetCollectionVersion) const = 0;
+
+ virtual std::unique_ptr<ResourceYielder> getResourceYielder() const = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index 6111f3346d8..d31a12c7f74 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -254,6 +254,10 @@ public:
MONGO_UNREACHABLE;
}
+ std::unique_ptr<ResourceYielder> getResourceYielder() const override {
+ return nullptr;
+ }
+
protected:
BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
Client* client,
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index ab9caa05c93..7f40bca4140 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -1,4 +1,3 @@
-
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
@@ -47,6 +46,7 @@
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/session_catalog.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/stats/fill_locker_info.h"
#include "mongo/db/stats/storage_stats.h"
#include "mongo/db/storage/backup_cursor_hooks.h"
@@ -65,6 +65,42 @@ using write_ops::UpdateOpEntry;
namespace {
+class MongoDResourceYielder : public ResourceYielder {
+public:
+ void yield(OperationContext* opCtx) override {
+ // We're about to block. Check back in the session so that it's available to other
+ // threads. Note that we may block on a request to _ourselves_, meaning that we may have to
+ // wait for another thread which will use the same session. This step is necessary
+ // to prevent deadlocks.
+
+ Session* const session = OperationContextSession::get(opCtx);
+ if (session) {
+ MongoDOperationContextSession::checkIn(opCtx);
+ }
+ _yielded = (session != nullptr);
+ }
+
+ void unyield(OperationContext* opCtx) override {
+ if (_yielded) {
+ // This may block on a sub-operation on this node finishing. It's possible that while
+ // blocked on the network layer, another shard could have responded, theoretically
+ // unblocking this thread of execution. However, we must wait until the child operation
+ // on this shard finishes so we can get the session back. This may limit the throughput
+ // of the operation, but it's correct.
+ MongoDOperationContextSession::checkOut(opCtx,
+ // Assumes this is only called from the
+ // 'aggregate' or 'getMore' commands. The code
+ // which relies on this parameter does not
+ // distinguish/care about the difference so we
+ // simply always pass 'aggregate'.
+ "aggregate");
+ }
+ }
+
+private:
+ bool _yielded = false;
+};
+
// Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and each
// of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special type of
// index.
@@ -549,4 +585,8 @@ std::unique_ptr<CollatorInterface> MongoInterfaceStandalone::_getCollectionDefau
return collator ? collator->clone() : nullptr;
}
+std::unique_ptr<ResourceYielder> MongoInterfaceStandalone::getResourceYielder() const {
+ return std::make_unique<MongoDResourceYielder>();
+}
+
} // namespace mongo
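
The comments in MongoDResourceYielder::yield() and unyield() above describe the scenario this patch guards against: an operation holding a checked-out session can end up waiting on a sub-operation on the same node that needs that very session. The following standalone sketch is an illustrative analogue in plain C++ -- a mutex-guarded "slot" stands in for the session; none of it is MongoDB API. The outer task releases the slot before waiting on the inner task (mirroring yield()) and re-acquires it afterwards (mirroring unyield()).

    // Illustrative analogue only -- not MongoDB code. A single mutex-guarded
    // "slot" plays the role of the checked-out session.
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    std::mutex m;
    std::condition_variable cv;
    bool slotFree = true;    // the "session"
    bool innerDone = false;  // result of the "sub operation"

    void acquireSlot() {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [] { return slotFree; });
        slotFree = false;
    }

    void releaseSlot() {
        { std::lock_guard<std::mutex> lk(m); slotFree = true; }
        cv.notify_all();
    }

    int main() {
        acquireSlot();                 // outer operation checks out the session

        std::thread inner([] {         // sub-operation on the same node
            acquireSlot();             // needs the same session
            releaseSlot();
            { std::lock_guard<std::mutex> lk(m); innerDone = true; }
            cv.notify_all();
        });

        releaseSlot();                 // yield(): check the session back in
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] { return innerDone; });  // block on the sub-operation
        }
        acquireSlot();                 // unyield(): check the session back out

        inner.join();
        releaseSlot();
        std::cout << "finished without deadlocking\n";
    }

If the outer task skipped the release before waiting, the inner task could never acquire the slot and both would block forever -- the deadlock SERVER-33683 describes for a pipeline with a $mergeCursors sending a getMore back to the merging shard itself.
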
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index f1c7fbcc910..b9f7a250849 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -126,6 +126,8 @@ public:
uasserted(51020, "unexpected request to consult sharding catalog on non-shardsvr");
}
+ std::unique_ptr<ResourceYielder> getResourceYielder() const override;
+
protected:
BSONObj _reportCurrentOpForClient(OperationContext* opCtx,
Client* client,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index b9d7befb857..1d1f4a3bd27 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -200,5 +200,9 @@ public:
ChunkVersion) const override {
uasserted(51019, "Unexpected check of routing table");
}
+
+ std::unique_ptr<ResourceYielder> getResourceYielder() const override {
+ return nullptr;
+ }
};
} // namespace mongo
diff --git a/src/mongo/db/resource_yielder.h b/src/mongo/db/resource_yielder.h
new file mode 100644
index 00000000000..7702c286f81
--- /dev/null
+++ b/src/mongo/db/resource_yielder.h
@@ -0,0 +1,48 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/operation_context.h"
+
+namespace mongo {
+/**
+ * Functions to call before and after blocking on the network layer, allowing resources to be
+ * given up so that "sub operations" running on the same node may use them. For example, a node
+ * may want to check in its session before waiting on the network so that a sub-operation may
+ * check it out and use it. This is important for preventing deadlocks.
+ */
+class ResourceYielder {
+public:
+ virtual ~ResourceYielder() = default;
+
+ virtual void yield(OperationContext*) = 0;
+ virtual void unyield(OperationContext*) = 0;
+};
+} // namespace mongo
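
The interface is deliberately tiny: yield() is called just before a blocking wait, unyield() just after, and callers must tolerate a null yielder (the mongos and stub implementations return nullptr). A minimal sketch of that calling convention follows; waitWithYield() and blockingNetworkWait() are hypothetical stand-ins, and the actual call sites are in BlockingResultsMerger::doWaiting() further down.

    // Hypothetical caller, for illustration only; blockingNetworkWait() is a
    // placeholder for whatever actually blocks on the network.
    Status waitWithYield(OperationContext* opCtx, ResourceYielder* yielder) {
        if (yielder) {
            yielder->yield(opCtx);    // e.g. check the session back in
        }
        Status waitStatus = blockingNetworkWait(opCtx);
        if (yielder) {
            yielder->unyield(opCtx);  // e.g. check the session back out
        }
        return waitStatus;
    }

The real call site additionally converts exceptions thrown by yield()/unyield() into Statuses, since the results-merger interface is Status-based.
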
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index c695741d003..380e7784bdb 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -371,6 +371,13 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx,
createTransactionCoordinator(opCtx, *sessionOptions.getTxnNumber());
}
}
+
+ if (txnParticipant->inMultiDocumentTransaction() && !sessionOptions.getStartTransaction()) {
+ const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ uassert(ErrorCodes::InvalidOptions,
+ "Only the first command in a transaction may specify a readConcern",
+ readConcernArgs.isEmpty());
+ }
}
txnParticipant->unstashTransactionResources(opCtx, invocation->definition()->getName());
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index acb0a8e5c62..9ef31bc588b 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -200,6 +200,22 @@ MongoDOperationContextSession::MongoDOperationContextSession(OperationContext* o
MongoDOperationContextSession::~MongoDOperationContextSession() = default;
+void MongoDOperationContextSession::checkIn(OperationContext* opCtx) {
+
+ if (auto txnParticipant = TransactionParticipant::get(opCtx)) {
+ txnParticipant->stashTransactionResources(opCtx);
+ }
+
+ OperationContextSession::checkIn(opCtx);
+}
+
+void MongoDOperationContextSession::checkOut(OperationContext* opCtx, const std::string& cmdName) {
+ OperationContextSession::checkOut(opCtx);
+
+ if (auto txnParticipant = TransactionParticipant::get(opCtx)) {
+ txnParticipant->unstashTransactionResources(opCtx, cmdName);
+ }
+}
MongoDOperationContextSessionWithoutRefresh::MongoDOperationContextSessionWithoutRefresh(
OperationContext* opCtx)
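
The ordering in these two helpers matters: checkIn() stashes the transaction resources on the TransactionParticipant while the session is still checked out, and only then releases the session; checkOut() re-acquires the session first and then re-attaches the stashed resources to the operation context. A hypothetical helper showing how a blocking wait could be bracketed with them -- blockOnNetwork() is a placeholder, and the 'aggregate' command name mirrors what MongoDResourceYielder passes:

    // Hypothetical helper, for illustration only. Assumes the session was
    // checked out earlier by the command invoking this.
    void waitWithSessionCheckedIn(OperationContext* opCtx) {
        // Stash the transaction resources and release the session so a
        // sub-operation on this node can check it out.
        MongoDOperationContextSession::checkIn(opCtx);

        blockOnNetwork(opCtx);  // placeholder for the real blocking wait

        // Re-acquire the session and re-attach the stashed resources.
        MongoDOperationContextSession::checkOut(opCtx, "aggregate");
    }
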
diff --git a/src/mongo/db/session_catalog_mongod.h b/src/mongo/db/session_catalog_mongod.h
index ca9e5b0e0ca..4aa7944c8c0 100644
--- a/src/mongo/db/session_catalog_mongod.h
+++ b/src/mongo/db/session_catalog_mongod.h
@@ -75,6 +75,20 @@ public:
MongoDOperationContextSession(OperationContext* opCtx);
~MongoDOperationContextSession();
+ /**
+ * This method takes an operation context with a checked-out session and allows it to be
+ * temporarily or permanently checked back in, in order to allow other operations to use it.
+ *
+ * May only be called if the session has actually been checked out previously.
+ */
+ static void checkIn(OperationContext* opCtx);
+
+ /**
+ * May only be called if the session is not checked out already. 'cmdName' is used to validate
+ * that the expected transaction flow control is being obeyed.
+ */
+ static void checkOut(OperationContext* opCtx, const std::string& cmdName);
+
private:
OperationContextSession _operationContextSession;
};
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 8e2b8f42da9..5f67b995cf7 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -702,10 +702,6 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
if (_txnResourceStash) {
// Transaction resources already exist for this transaction. Transfer them from the
// stash to the operation context.
- auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- uassert(ErrorCodes::InvalidOptions,
- "Only the first command in a transaction may specify a readConcern",
- readConcernArgs.isEmpty());
_txnResourceStash->release(opCtx);
_txnResourceStash = boost::none;
stdx::lock_guard<stdx::mutex> lm(_metricsMutex);
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 1c40764decd..7c493bae0d9 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -317,7 +317,7 @@ public:
void abortArbitraryTransaction();
/**
- * Returns whether the transaction has exceedet its expiration time.
+ * Returns whether the transaction has exceeded its expiration time.
*/
bool expired() const;
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index a8ffc392980..d1e522e4526 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -2281,44 +2281,6 @@ TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldIncreaseUntilCommit) {
Microseconds(200));
}
-TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldNotBeSetIfUnstashHasBadReadConcernArgs) {
- auto tickSource = initMockTickSource();
-
- auto sessionCheckout = checkOutSession();
- auto txnParticipant = TransactionParticipant::get(opCtx());
-
- // Initialize bad read concern args (!readConcernArgs.isEmpty()).
- repl::ReadConcernArgs readConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
- repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
-
- // Transaction resources do not exist yet.
- txnParticipant->unstashTransactionResources(opCtx(), "find");
-
- tickSource->advance(Microseconds(100));
-
- // The transaction machinery cannot store an empty locker.
- { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
- txnParticipant->stashTransactionResources(opCtx());
-
- // Time active should have increased.
- ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
- tickSource, tickSource->getTicks()),
- Microseconds{100});
-
- // Transaction resources already exist here and should throw an exception due to bad read
- // concern arguments.
- ASSERT_THROWS_CODE(txnParticipant->unstashTransactionResources(opCtx(), "find"),
- AssertionException,
- ErrorCodes::InvalidOptions);
-
- tickSource->advance(Microseconds(100));
-
- // Time active should not have increased.
- ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
- tickSource, tickSource->getTicks()),
- Microseconds{100});
-}
-
TEST_F(TransactionsMetricsTest, AdditiveMetricsObjectsShouldBeAddedTogetherUponStash) {
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
diff --git a/src/mongo/s/query/blocking_results_merger.cpp b/src/mongo/s/query/blocking_results_merger.cpp
index b572a51705e..2431901340d 100644
--- a/src/mongo/s/query/blocking_results_merger.cpp
+++ b/src/mongo/s/query/blocking_results_merger.cpp
@@ -31,16 +31,51 @@
#include "mongo/platform/basic.h"
#include "mongo/db/query/find_common.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/s/query/blocking_results_merger.h"
namespace mongo {
BlockingResultsMerger::BlockingResultsMerger(OperationContext* opCtx,
AsyncResultsMergerParams&& armParams,
- executor::TaskExecutor* executor)
+ executor::TaskExecutor* executor,
+ std::unique_ptr<ResourceYielder> resourceYielder)
: _tailableMode(armParams.getTailableMode().value_or(TailableModeEnum::kNormal)),
_executor(executor),
- _arm(opCtx, executor, std::move(armParams)) {}
+ _arm(opCtx, executor, std::move(armParams)),
+ _resourceYielder(std::move(resourceYielder)) {}
+
+StatusWith<stdx::cv_status> BlockingResultsMerger::doWaiting(
+ OperationContext* opCtx, const std::function<StatusWith<stdx::cv_status>()>& waitFn) noexcept {
+
+ if (_resourceYielder) {
+ try {
+ // The BRM interface returns Statuses. Be sure we respect that here.
+ _resourceYielder->yield(opCtx);
+ } catch (const DBException& e) {
+ return e.toStatus();
+ }
+ }
+
+ boost::optional<StatusWith<stdx::cv_status>> result;
+ try {
+ // This shouldn't throw, but we cannot enforce that.
+ result = waitFn();
+ } catch (const DBException&) {
+ MONGO_UNREACHABLE;
+ }
+
+ if (_resourceYielder) {
+ try {
+ _resourceYielder->unyield(opCtx);
+ } catch (const DBException& e) {
+ return e.toStatus();
+ }
+ }
+
+ return *result;
+}
StatusWith<ClusterQueryResult> BlockingResultsMerger::awaitNextWithTimeout(
OperationContext* opCtx, RouterExecStage::ExecContext execCtx) {
@@ -54,9 +89,10 @@ StatusWith<ClusterQueryResult> BlockingResultsMerger::awaitNextWithTimeout(
}
auto event = nextEventStatus.getValue();
- // Block until there are further results to return, or our time limit is exceeded.
- auto waitStatus =
- _executor->waitForEvent(opCtx, event, awaitDataState(opCtx).waitForInsertsDeadline);
+ const auto waitStatus = doWaiting(opCtx, [this, opCtx, &event]() {
+ return _executor->waitForEvent(
+ opCtx, event, awaitDataState(opCtx).waitForInsertsDeadline);
+ });
if (!waitStatus.isOK()) {
return waitStatus.getStatus();
@@ -84,7 +120,8 @@ StatusWith<ClusterQueryResult> BlockingResultsMerger::blockUntilNext(OperationCo
auto event = nextEventStatus.getValue();
// Block until there are further results to return.
- auto status = _executor->waitForEvent(opCtx, event);
+ auto status = doWaiting(
+ opCtx, [this, opCtx, &event]() { return _executor->waitForEvent(opCtx, event); });
if (!status.isOK()) {
return status.getStatus();
diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h
index f35ce904709..7d82cdff49c 100644
--- a/src/mongo/s/query/blocking_results_merger.h
+++ b/src/mongo/s/query/blocking_results_merger.h
@@ -43,7 +43,8 @@ class BlockingResultsMerger {
public:
BlockingResultsMerger(OperationContext* opCtx,
AsyncResultsMergerParams&& arm,
- executor::TaskExecutor*);
+ executor::TaskExecutor*,
+ std::unique_ptr<ResourceYielder> resourceYielder);
/**
* Blocks until the next result is available or an error is detected.
@@ -100,6 +101,14 @@ private:
*/
StatusWith<executor::TaskExecutor::EventHandle> getNextEvent();
+ /**
+ * Call the waitFn and return the result, yielding resources while waiting if necessary.
+ * 'waitFn' may not throw.
+ */
+ StatusWith<stdx::cv_status> doWaiting(
+ OperationContext* opCtx,
+ const std::function<StatusWith<stdx::cv_status>()>& waitFn) noexcept;
+
TailableModeEnum _tailableMode;
executor::TaskExecutor* _executor;
@@ -110,6 +119,10 @@ private:
// and pick back up waiting for it on the next call to 'next()'.
executor::TaskExecutor::EventHandle _leftoverEventFromLastTimeout;
AsyncResultsMerger _arm;
+
+ // Provides interface for yielding and "unyielding" resources while waiting for results from
+ // the network. A value of nullptr implies that no such yielding or unyielding is necessary.
+ std::unique_ptr<ResourceYielder> _resourceYielder;
};
} // namespace mongo
diff --git a/src/mongo/s/query/blocking_results_merger_test.cpp b/src/mongo/s/query/blocking_results_merger_test.cpp
index 961b3b5912c..2f76d576e3d 100644
--- a/src/mongo/s/query/blocking_results_merger_test.cpp
+++ b/src/mongo/s/query/blocking_results_merger_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/s/query/blocking_results_merger.h"
#include "mongo/s/query/results_merger_test_fixture.h"
+#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -43,9 +44,33 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilKilled) {
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ BlockingResultsMerger blockingMerger(operationContext(),
+ makeARMParamsFromExistingCursors(std::move(cursors)),
+ executor(),
+ nullptr);
+
+ blockingMerger.kill(operationContext());
+}
+
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilDeadlineExpires) {
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto params = makeARMParamsFromExistingCursors(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
BlockingResultsMerger blockingMerger(
- operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor());
+ operationContext(), std::move(params), executor(), nullptr);
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ auto next = unittest::assertGet(
+ blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind));
+
+ // The timeout should hit, and return an empty object.
+ ASSERT_TRUE(next.isEOF());
+ });
+
+ future.timed_get(kFutureTimeout);
blockingMerger.kill(operationContext());
}
@@ -53,8 +78,10 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) {
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
- BlockingResultsMerger blockingMerger(
- operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor());
+ BlockingResultsMerger blockingMerger(operationContext(),
+ makeARMParamsFromExistingCursors(std::move(cursors)),
+ executor(),
+ nullptr);
// Issue a blocking wait for the next result asynchronously on a different thread.
auto future = launchAsync([&]() {
@@ -78,12 +105,57 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) {
future.timed_get(kFutureTimeout);
}
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReadyWithDeadline) {
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ auto params = makeARMParamsFromExistingCursors(std::move(cursors));
+ params.setTailableMode(TailableModeEnum::kTailableAndAwaitData);
+ BlockingResultsMerger blockingMerger(
+ operationContext(), std::move(params), executor(), nullptr);
+
+ // Used for synchronizing the background thread with this thread.
+ stdx::mutex mutex;
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ // Will schedule a getMore. No one will send a response, so will return EOF.
+ auto next = unittest::assertGet(blockingMerger.next(
+ operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet));
+ ASSERT_TRUE(next.isEOF());
+
+ // Block until the main thread has responded to the getMore.
+ stdx::unique_lock<stdx::mutex> lk(mutex);
+
+ next = unittest::assertGet(blockingMerger.next(
+ operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet));
+ ASSERT_FALSE(next.isEOF());
+ ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1));
+
+ });
+
+ // Schedule the response to the getMore which will return the next result and mark the cursor as
+ // exhausted.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ });
+
+ // Unblock the other thread, allowing it to call next() on the BlockingResultsMerger.
+ lk.unlock();
+
+ future.timed_get(kFutureTimeout);
+}
+
TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) {
std::vector<RemoteCursor> cursors;
cursors.emplace_back(
makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
auto params = makeARMParamsFromExistingCursors(std::move(cursors));
- BlockingResultsMerger blockingMerger(operationContext(), std::move(params), executor());
+ BlockingResultsMerger blockingMerger(
+ operationContext(), std::move(params), executor(), nullptr);
// Issue a blocking wait for the next result asynchronously on a different thread.
auto future = launchAsync([&]() {
@@ -116,5 +188,81 @@ TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) {
future.timed_get(kFutureTimeout);
}
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToHandleExceptionWhenYielding) {
+ class ThrowyResourceYielder : public ResourceYielder {
+ public:
+ void yield(OperationContext*) {
+ uasserted(ErrorCodes::BadValue, "Simulated error");
+ }
+
+ void unyield(OperationContext*) {}
+ };
+
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ BlockingResultsMerger blockingMerger(operationContext(),
+ makeARMParamsFromExistingCursors(std::move(cursors)),
+ executor(),
+ std::make_unique<ThrowyResourceYielder>());
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ // Make sure that the next() call throws correctly.
+ const auto status =
+ blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind)
+ .getStatus();
+ ASSERT_EQ(status, ErrorCodes::BadValue);
+ });
+
+ // Schedule the response to the getMore which will return the next result and mark the cursor as
+ // exhausted.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(ResultsMergerTestFixture, ShouldBeAbleToHandleExceptionWhenUnyielding) {
+ class ThrowyResourceYielder : public ResourceYielder {
+ public:
+ void yield(OperationContext*) {}
+
+ void unyield(OperationContext*) {
+ uasserted(ErrorCodes::BadValue, "Simulated error");
+ }
+ };
+
+ std::vector<RemoteCursor> cursors;
+ cursors.emplace_back(
+ makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {})));
+ BlockingResultsMerger blockingMerger(operationContext(),
+ makeARMParamsFromExistingCursors(std::move(cursors)),
+ executor(),
+ std::make_unique<ThrowyResourceYielder>());
+
+ // Issue a blocking wait for the next result asynchronously on a different thread.
+ auto future = launchAsync([&]() {
+ // Make sure that the next() call throws correctly.
+ const auto status =
+ blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind)
+ .getStatus();
+ ASSERT_EQ(status, ErrorCodes::BadValue);
+ });
+
+ // Schedule the response to the getMore which will return the next result and mark the cursor as
+ // exhausted.
+ onCommand([&](const auto& request) {
+ ASSERT(request.cmdObj["getMore"]);
+ return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)})
+ .toBSON(CursorResponse::ResponseType::SubsequentResponse);
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
} // namespace
} // namespace mongo
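
ThrowyResourceYielder above exercises the error paths. A non-throwing test double along the same lines could verify that yield() and unyield() are always invoked in pairs; a sketch (the class name is hypothetical):

    // Hypothetical test-only yielder: records call counts so a test can assert
    // that every yield() is matched by an unyield().
    class CountingResourceYielder : public ResourceYielder {
    public:
        void yield(OperationContext*) override {
            ++yieldCount;
        }
        void unyield(OperationContext*) override {
            ++unyieldCount;
        }
        int yieldCount = 0;
        int unyieldCount = 0;
    };

As with the throwing variant, ownership passes to the BlockingResultsMerger via std::make_unique, so a test that wants to inspect the counters afterwards would keep a raw pointer to the yielder before moving it in.
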
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 12270fc1c99..3ead06bc1f1 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -105,6 +105,7 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId,
BSONObj createCommandForMergingShard(const AggregationRequest& request,
const boost::intrusive_ptr<ExpressionContext>& mergeCtx,
const ShardId& shardId,
+ bool mergingShardContributesData,
const Pipeline* pipelineForMerging) {
MutableDocument mergeCmd(request.serializeToCommandObj());
@@ -119,9 +120,19 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request,
: Value(Document{CollationSpec::kSimpleSpec});
}
+ const auto txnRouter = TransactionRouter::get(mergeCtx->opCtx);
+ if (txnRouter && mergingShardContributesData) {
+ // Don't include a readConcern since we can only include read concerns on the _first_
+ // command sent to a participant per transaction. Assuming the merging shard is a
+ // participant, it will already have received another 'aggregate' command earlier which
+ // contained a readConcern.
+
+ mergeCmd.remove("readConcern");
+ }
+
auto aggCmd = mergeCmd.freeze().toBson();
- if (auto txnRouter = TransactionRouter::get(mergeCtx->opCtx)) {
+ if (txnRouter) {
aggCmd = txnRouter->attachTxnFieldsIfNeeded(shardId, aggCmd);
}
@@ -609,19 +620,16 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex
// therefore must have a valid routing table.
invariant(routingInfo);
- // TODO SERVER-33683 allowing an aggregation within a transaction can lead to a deadlock in the
- // SessionCatalog when a pipeline with a $mergeCursors sends a getMore to itself.
- uassert(ErrorCodes::OperationNotSupportedInTransaction,
- "Cannot specify a transaction number in combination with an aggregation on mongos when "
- "merging on a shard",
- !opCtx->getTxnNumber());
-
- ShardId mergingShardId = pickMergingShard(opCtx,
- shardDispatchResults.needsPrimaryShardMerge,
- targetedShards,
- routingInfo->db().primaryId());
+ const ShardId mergingShardId = pickMergingShard(opCtx,
+ shardDispatchResults.needsPrimaryShardMerge,
+ targetedShards,
+ routingInfo->db().primaryId());
+ const bool mergingShardContributesData =
+ std::find(targetedShards.begin(), targetedShards.end(), mergingShardId) !=
+ targetedShards.end();
- auto mergeCmdObj = createCommandForMergingShard(request, expCtx, mergingShardId, mergePipeline);
+ auto mergeCmdObj = createCommandForMergingShard(
+ request, expCtx, mergingShardId, mergingShardContributesData, mergePipeline);
// Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return.
auto mergeResponse = establishMergingShardCursor(
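
The readConcern stripping happens on the already-serialized merge command. A minimal sketch of the Document/MutableDocument manipulation involved (the field values below are made up for illustration):

    // Illustration only: strip readConcern from a command document, as
    // createCommandForMergingShard() now does when the merging shard already
    // participates in the transaction.
    MutableDocument mergeCmd(Document{{"aggregate", "coll"_sd},
                                      {"pipeline", Value(std::vector<Value>{})},
                                      {"readConcern", Document{{"level", "snapshot"_sd}}}});
    mergeCmd.remove("readConcern");
    BSONObj aggCmd = mergeCmd.freeze().toBson();
    // aggCmd now carries only the "aggregate" and "pipeline" fields; the
    // merging shard reuses the readConcern it received with the first command
    // sent to it in the transaction.
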
diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp
index fe1303684da..fbe93ea1b53 100644
--- a/src/mongo/s/query/document_source_merge_cursors.cpp
+++ b/src/mongo/s/query/document_source_merge_cursors.cpp
@@ -74,7 +74,11 @@ bool DocumentSourceMergeCursors::remotesExhausted() const {
void DocumentSourceMergeCursors::populateMerger() {
invariant(!_blockingResultsMerger);
invariant(_armParams);
- _blockingResultsMerger.emplace(pExpCtx->opCtx, std::move(*_armParams), _executor);
+
+ _blockingResultsMerger.emplace(pExpCtx->opCtx,
+ std::move(*_armParams),
+ _executor,
+ pExpCtx->mongoProcessInterface->getResourceYielder());
_armParams = boost::none;
// '_blockingResultsMerger' now owns the cursors.
_ownCursors = false;
diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h
index a3d65bcab27..5249da51a8e 100644
--- a/src/mongo/s/query/document_source_merge_cursors.h
+++ b/src/mongo/s/query/document_source_merge_cursors.h
@@ -89,9 +89,7 @@ public:
HostTypeRequirement::kNone,
DiskUseRequirement::kNoDiskUse,
FacetRequirement::kNotAllowed,
- // TODO SERVER-33683: Permit $mergeCursors with readConcern
- // level "snapshot".
- TransactionRequirement::kNotAllowed);
+ TransactionRequirement::kAllowed);
constraints.requiresInputDocSource = false;
return constraints;
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 7833c6550c0..6c8e389eded 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -47,7 +47,7 @@ public:
RouterStageMerge(OperationContext* opCtx,
executor::TaskExecutor* executor,
AsyncResultsMergerParams&& armParams)
- : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor) {}
+ : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor, nullptr) {}
StatusWith<ClusterQueryResult> next(ExecContext execCtx) final {
return _resultsMerger.next(getOpCtx(), execCtx);