diff options
author | Ian Boros <ian.boros@10gen.com> | 2018-11-28 18:38:15 -0500 |
---|---|---|
committer | Ian Boros <ian.boros@10gen.com> | 2018-12-17 16:36:20 -0500 |
commit | d40d24abc025690150ccf8009ba1facb9ed1c6b2 (patch) | |
tree | 8ba35e0d4f8c7e441ffe80de30faa24a708c436e /src/mongo | |
parent | b37b5ef7ec0ec2e502423d53e6c0d6e86b343c27 (diff) | |
download | mongo-d40d24abc025690150ccf8009ba1facb9ed1c6b2.tar.gz |
SERVER-33683 Prevent deadlock in aggregate with transactions
Diffstat (limited to 'src/mongo')
19 files changed, 377 insertions, 73 deletions
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h index be934e88707..d51d348c459 100644 --- a/src/mongo/db/pipeline/mongo_process_interface.h +++ b/src/mongo/db/pipeline/mongo_process_interface.h @@ -48,6 +48,7 @@ #include "mongo/db/pipeline/lite_parsed_document_source.h" #include "mongo/db/pipeline/value.h" #include "mongo/db/query/explain_options.h" +#include "mongo/db/resource_yielder.h" #include "mongo/db/storage/backup_cursor_state.h" #include "mongo/s/chunk_version.h" @@ -316,6 +317,8 @@ public: virtual void checkRoutingInfoEpochOrThrow(const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss, ChunkVersion targetCollectionVersion) const = 0; + + virtual std::unique_ptr<ResourceYielder> getResourceYielder() const = 0; }; } // namespace mongo diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h index 6111f3346d8..d31a12c7f74 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.h +++ b/src/mongo/db/pipeline/mongos_process_interface.h @@ -254,6 +254,10 @@ public: MONGO_UNREACHABLE; } + std::unique_ptr<ResourceYielder> getResourceYielder() const override { + return nullptr; + } + protected: BSONObj _reportCurrentOpForClient(OperationContext* opCtx, Client* client, diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index ab9caa05c93..7f40bca4140 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -1,4 +1,3 @@ - /** * Copyright (C) 2018-present MongoDB, Inc. * @@ -47,6 +46,7 @@ #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/session_catalog.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/db/stats/fill_locker_info.h" #include "mongo/db/stats/storage_stats.h" #include "mongo/db/storage/backup_cursor_hooks.h" @@ -65,6 +65,42 @@ using write_ops::UpdateOpEntry; namespace { +class MongoDResourceYielder : public ResourceYielder { +public: + void yield(OperationContext* opCtx) override { + // We're about to block. Check back in the session so that it's available to other + // threads. Note that we may block on a request to _ourselves_, meaning that we may have to + // wait for another thread which will use the same session. This step is necessary + // to prevent deadlocks. + + Session* const session = OperationContextSession::get(opCtx); + if (session) { + MongoDOperationContextSession::checkIn(opCtx); + } + _yielded = (session != nullptr); + } + + void unyield(OperationContext* opCtx) override { + if (_yielded) { + // This may block on a sub-operation on this node finishing. It's possible that while + // blocked on the network layer, another shard could have responded, theoretically + // unblocking this thread of execution. However, we must wait until the child operation + // on this shard finishes so we can get the session back. This may limit the throughput + // of the operation, but it's correct. + MongoDOperationContextSession::checkOut(opCtx, + // Assumes this is only called from the + // 'aggregate' or 'getMore' commands. The code + // which relies on this parameter does not + // distinguish/care about the difference so we + // simply always pass 'aggregate'. + "aggregate"); + } + } + +private: + bool _yielded = false; +}; + // Returns true if the field names of 'keyPattern' are exactly those in 'uniqueKeyPaths', and each // of the elements of 'keyPattern' is numeric, i.e. not "text", "$**", or any other special type of // index. @@ -549,4 +585,8 @@ std::unique_ptr<CollatorInterface> MongoInterfaceStandalone::_getCollectionDefau return collator ? collator->clone() : nullptr; } +std::unique_ptr<ResourceYielder> MongoInterfaceStandalone::getResourceYielder() const { + return std::make_unique<MongoDResourceYielder>(); +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h index f1c7fbcc910..b9f7a250849 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.h +++ b/src/mongo/db/pipeline/process_interface_standalone.h @@ -126,6 +126,8 @@ public: uasserted(51020, "unexpected request to consult sharding catalog on non-shardsvr"); } + std::unique_ptr<ResourceYielder> getResourceYielder() const override; + protected: BSONObj _reportCurrentOpForClient(OperationContext* opCtx, Client* client, diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h index b9d7befb857..1d1f4a3bd27 100644 --- a/src/mongo/db/pipeline/stub_mongo_process_interface.h +++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h @@ -200,5 +200,9 @@ public: ChunkVersion) const override { uasserted(51019, "Unexpected check of routing table"); } + + std::unique_ptr<ResourceYielder> getResourceYielder() const override { + return nullptr; + } }; } // namespace mongo diff --git a/src/mongo/db/resource_yielder.h b/src/mongo/db/resource_yielder.h new file mode 100644 index 00000000000..7702c286f81 --- /dev/null +++ b/src/mongo/db/resource_yielder.h @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/operation_context.h" + +namespace mongo { +/** + * Functions to call before and after blocking on the network layer so that resources may be given + * up so that "sub operations" running on the same node may use them. For example, a node may want + * to check in its session before waiting on the network so that a sub-operation may check it out + * and use it. This is important for preventing deadlocks. + */ +class ResourceYielder { +public: + virtual ~ResourceYielder() = default; + + virtual void yield(OperationContext*) = 0; + virtual void unyield(OperationContext*) = 0; +}; +} // namespace mongo diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index c695741d003..380e7784bdb 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -371,6 +371,13 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, createTransactionCoordinator(opCtx, *sessionOptions.getTxnNumber()); } } + + if (txnParticipant->inMultiDocumentTransaction() && !sessionOptions.getStartTransaction()) { + const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + uassert(ErrorCodes::InvalidOptions, + "Only the first command in a transaction may specify a readConcern", + readConcernArgs.isEmpty()); + } } txnParticipant->unstashTransactionResources(opCtx, invocation->definition()->getName()); diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index acb0a8e5c62..9ef31bc588b 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -200,6 +200,22 @@ MongoDOperationContextSession::MongoDOperationContextSession(OperationContext* o MongoDOperationContextSession::~MongoDOperationContextSession() = default; +void MongoDOperationContextSession::checkIn(OperationContext* opCtx) { + + if (auto txnParticipant = TransactionParticipant::get(opCtx)) { + txnParticipant->stashTransactionResources(opCtx); + } + + OperationContextSession::checkIn(opCtx); +} + +void MongoDOperationContextSession::checkOut(OperationContext* opCtx, const std::string& cmdName) { + OperationContextSession::checkOut(opCtx); + + if (auto txnParticipant = TransactionParticipant::get(opCtx)) { + txnParticipant->unstashTransactionResources(opCtx, cmdName); + } +} MongoDOperationContextSessionWithoutRefresh::MongoDOperationContextSessionWithoutRefresh( OperationContext* opCtx) diff --git a/src/mongo/db/session_catalog_mongod.h b/src/mongo/db/session_catalog_mongod.h index ca9e5b0e0ca..4aa7944c8c0 100644 --- a/src/mongo/db/session_catalog_mongod.h +++ b/src/mongo/db/session_catalog_mongod.h @@ -75,6 +75,20 @@ public: MongoDOperationContextSession(OperationContext* opCtx); ~MongoDOperationContextSession(); + /** + * This method takes an operation context with a checked-out session and allows it to be + * temporarily or permanently checked back in, in order to allow other operations to use it. + * + * May only be called if the session has actually been checked out previously. + */ + static void checkIn(OperationContext* opCtx); + + /** + * May only be called if the session is not checked out already. 'cmdType' is used to validate + * that the expected transaction flow control is being obeyed. + */ + static void checkOut(OperationContext* opCtx, const std::string& cmdName); + private: OperationContextSession _operationContextSession; }; diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 8e2b8f42da9..5f67b995cf7 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -702,10 +702,6 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx if (_txnResourceStash) { // Transaction resources already exist for this transaction. Transfer them from the // stash to the operation context. - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - uassert(ErrorCodes::InvalidOptions, - "Only the first command in a transaction may specify a readConcern", - readConcernArgs.isEmpty()); _txnResourceStash->release(opCtx); _txnResourceStash = boost::none; stdx::lock_guard<stdx::mutex> lm(_metricsMutex); diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 1c40764decd..7c493bae0d9 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -317,7 +317,7 @@ public: void abortArbitraryTransaction(); /** - * Returns whether the transaction has exceedet its expiration time. + * Returns whether the transaction has exceeded its expiration time. */ bool expired() const; diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index a8ffc392980..d1e522e4526 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -2281,44 +2281,6 @@ TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldIncreaseUntilCommit) { Microseconds(200)); } -TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldNotBeSetIfUnstashHasBadReadConcernArgs) { - auto tickSource = initMockTickSource(); - - auto sessionCheckout = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - // Initialize bad read concern args (!readConcernArgs.isEmpty()). - repl::ReadConcernArgs readConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); - repl::ReadConcernArgs::get(opCtx()) = readConcernArgs; - - // Transaction resources do not exist yet. - txnParticipant->unstashTransactionResources(opCtx(), "find"); - - tickSource->advance(Microseconds(100)); - - // The transaction machinery cannot store an empty locker. - { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } - txnParticipant->stashTransactionResources(opCtx()); - - // Time active should have increased. - ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros( - tickSource, tickSource->getTicks()), - Microseconds{100}); - - // Transaction resources already exist here and should throw an exception due to bad read - // concern arguments. - ASSERT_THROWS_CODE(txnParticipant->unstashTransactionResources(opCtx(), "find"), - AssertionException, - ErrorCodes::InvalidOptions); - - tickSource->advance(Microseconds(100)); - - // Time active should not have increased. - ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros( - tickSource, tickSource->getTicks()), - Microseconds{100}); -} - TEST_F(TransactionsMetricsTest, AdditiveMetricsObjectsShouldBeAddedTogetherUponStash) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); diff --git a/src/mongo/s/query/blocking_results_merger.cpp b/src/mongo/s/query/blocking_results_merger.cpp index b572a51705e..2431901340d 100644 --- a/src/mongo/s/query/blocking_results_merger.cpp +++ b/src/mongo/s/query/blocking_results_merger.cpp @@ -31,16 +31,51 @@ #include "mongo/platform/basic.h" #include "mongo/db/query/find_common.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/transaction_participant.h" #include "mongo/s/query/blocking_results_merger.h" namespace mongo { BlockingResultsMerger::BlockingResultsMerger(OperationContext* opCtx, AsyncResultsMergerParams&& armParams, - executor::TaskExecutor* executor) + executor::TaskExecutor* executor, + std::unique_ptr<ResourceYielder> resourceYielder) : _tailableMode(armParams.getTailableMode().value_or(TailableModeEnum::kNormal)), _executor(executor), - _arm(opCtx, executor, std::move(armParams)) {} + _arm(opCtx, executor, std::move(armParams)), + _resourceYielder(std::move(resourceYielder)) {} + +StatusWith<stdx::cv_status> BlockingResultsMerger::doWaiting( + OperationContext* opCtx, const std::function<StatusWith<stdx::cv_status>()>& waitFn) noexcept { + + if (_resourceYielder) { + try { + // The BRM interface returns Statuses. Be sure we respect that here. + _resourceYielder->yield(opCtx); + } catch (const DBException& e) { + return e.toStatus(); + } + } + + boost::optional<StatusWith<stdx::cv_status>> result; + try { + // This shouldn't throw, but we cannot enforce that. + result = waitFn(); + } catch (const DBException&) { + MONGO_UNREACHABLE; + } + + if (_resourceYielder) { + try { + _resourceYielder->unyield(opCtx); + } catch (const DBException& e) { + return e.toStatus(); + } + } + + return *result; +} StatusWith<ClusterQueryResult> BlockingResultsMerger::awaitNextWithTimeout( OperationContext* opCtx, RouterExecStage::ExecContext execCtx) { @@ -54,9 +89,10 @@ StatusWith<ClusterQueryResult> BlockingResultsMerger::awaitNextWithTimeout( } auto event = nextEventStatus.getValue(); - // Block until there are further results to return, or our time limit is exceeded. - auto waitStatus = - _executor->waitForEvent(opCtx, event, awaitDataState(opCtx).waitForInsertsDeadline); + const auto waitStatus = doWaiting(opCtx, [this, opCtx, &event]() { + return _executor->waitForEvent( + opCtx, event, awaitDataState(opCtx).waitForInsertsDeadline); + }); if (!waitStatus.isOK()) { return waitStatus.getStatus(); @@ -84,7 +120,8 @@ StatusWith<ClusterQueryResult> BlockingResultsMerger::blockUntilNext(OperationCo auto event = nextEventStatus.getValue(); // Block until there are further results to return. - auto status = _executor->waitForEvent(opCtx, event); + auto status = doWaiting( + opCtx, [this, opCtx, &event]() { return _executor->waitForEvent(opCtx, event); }); if (!status.isOK()) { return status.getStatus(); diff --git a/src/mongo/s/query/blocking_results_merger.h b/src/mongo/s/query/blocking_results_merger.h index f35ce904709..7d82cdff49c 100644 --- a/src/mongo/s/query/blocking_results_merger.h +++ b/src/mongo/s/query/blocking_results_merger.h @@ -43,7 +43,8 @@ class BlockingResultsMerger { public: BlockingResultsMerger(OperationContext* opCtx, AsyncResultsMergerParams&& arm, - executor::TaskExecutor*); + executor::TaskExecutor*, + std::unique_ptr<ResourceYielder> resourceYielder); /** * Blocks until the next result is available or an error is detected. @@ -100,6 +101,14 @@ private: */ StatusWith<executor::TaskExecutor::EventHandle> getNextEvent(); + /** + * Call the waitFn and return the result, yielding resources while waiting if necessary. + * 'waitFn' may not throw. + */ + StatusWith<stdx::cv_status> doWaiting( + OperationContext* opCtx, + const std::function<StatusWith<stdx::cv_status>()>& waitFn) noexcept; + TailableModeEnum _tailableMode; executor::TaskExecutor* _executor; @@ -110,6 +119,10 @@ private: // and pick back up waiting for it on the next call to 'next()'. executor::TaskExecutor::EventHandle _leftoverEventFromLastTimeout; AsyncResultsMerger _arm; + + // Provides interface for yielding and "unyielding" resources while waiting for results from + // the network. A value of nullptr implies that no such yielding or unyielding is necessary. + std::unique_ptr<ResourceYielder> _resourceYielder; }; } // namespace mongo diff --git a/src/mongo/s/query/blocking_results_merger_test.cpp b/src/mongo/s/query/blocking_results_merger_test.cpp index 961b3b5912c..2f76d576e3d 100644 --- a/src/mongo/s/query/blocking_results_merger_test.cpp +++ b/src/mongo/s/query/blocking_results_merger_test.cpp @@ -32,6 +32,7 @@ #include "mongo/s/query/blocking_results_merger.h" #include "mongo/s/query/results_merger_test_fixture.h" +#include "mongo/unittest/unittest.h" namespace mongo { @@ -43,9 +44,33 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilKilled) { std::vector<RemoteCursor> cursors; cursors.emplace_back( makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + BlockingResultsMerger blockingMerger(operationContext(), + makeARMParamsFromExistingCursors(std::move(cursors)), + executor(), + nullptr); + + blockingMerger.kill(operationContext()); +} + +TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilDeadlineExpires) { + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto params = makeARMParamsFromExistingCursors(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); BlockingResultsMerger blockingMerger( - operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor()); + operationContext(), std::move(params), executor(), nullptr); + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + auto next = unittest::assertGet( + blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind)); + + // The timeout should hit, and return an empty object. + ASSERT_TRUE(next.isEOF()); + }); + + future.timed_get(kFutureTimeout); blockingMerger.kill(operationContext()); } @@ -53,8 +78,10 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) { std::vector<RemoteCursor> cursors; cursors.emplace_back( makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); - BlockingResultsMerger blockingMerger( - operationContext(), makeARMParamsFromExistingCursors(std::move(cursors)), executor()); + BlockingResultsMerger blockingMerger(operationContext(), + makeARMParamsFromExistingCursors(std::move(cursors)), + executor(), + nullptr); // Issue a blocking wait for the next result asynchronously on a different thread. auto future = launchAsync([&]() { @@ -78,12 +105,57 @@ TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReady) { future.timed_get(kFutureTimeout); } +TEST_F(ResultsMergerTestFixture, ShouldBeAbleToBlockUntilNextResultIsReadyWithDeadline) { + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + auto params = makeARMParamsFromExistingCursors(std::move(cursors)); + params.setTailableMode(TailableModeEnum::kTailableAndAwaitData); + BlockingResultsMerger blockingMerger( + operationContext(), std::move(params), executor(), nullptr); + + // Used for synchronizing the background thread with this thread. + stdx::mutex mutex; + stdx::unique_lock<stdx::mutex> lk(mutex); + + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + // Will schedule a getMore. No one will send a response, so will return EOF. + auto next = unittest::assertGet(blockingMerger.next( + operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet)); + ASSERT_TRUE(next.isEOF()); + + // Block until the main thread has responded to the getMore. + stdx::unique_lock<stdx::mutex> lk(mutex); + + next = unittest::assertGet(blockingMerger.next( + operationContext(), RouterExecStage::ExecContext::kGetMoreNoResultsYet)); + ASSERT_FALSE(next.isEOF()); + ASSERT_BSONOBJ_EQ(*next.getResult(), BSON("x" << 1)); + + }); + + // Schedule the response to the getMore which will return the next result and mark the cursor as + // exhausted. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) + .toBSON(CursorResponse::ResponseType::SubsequentResponse); + }); + + // Unblock the other thread, allowing it to call next() on the BlockingResultsMerger. + lk.unlock(); + + future.timed_get(kFutureTimeout); +} + TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) { std::vector<RemoteCursor> cursors; cursors.emplace_back( makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); auto params = makeARMParamsFromExistingCursors(std::move(cursors)); - BlockingResultsMerger blockingMerger(operationContext(), std::move(params), executor()); + BlockingResultsMerger blockingMerger( + operationContext(), std::move(params), executor(), nullptr); // Issue a blocking wait for the next result asynchronously on a different thread. auto future = launchAsync([&]() { @@ -116,5 +188,81 @@ TEST_F(ResultsMergerTestFixture, ShouldBeInterruptableDuringBlockingNext) { future.timed_get(kFutureTimeout); } +TEST_F(ResultsMergerTestFixture, ShouldBeAbleToHandleExceptionWhenYielding) { + class ThrowyResourceYielder : public ResourceYielder { + public: + void yield(OperationContext*) { + uasserted(ErrorCodes::BadValue, "Simulated error"); + } + + void unyield(OperationContext*) {} + }; + + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + BlockingResultsMerger blockingMerger(operationContext(), + makeARMParamsFromExistingCursors(std::move(cursors)), + executor(), + std::make_unique<ThrowyResourceYielder>()); + + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + // Make sure that the next() call throws correctly. + const auto status = + blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind) + .getStatus(); + ASSERT_EQ(status, ErrorCodes::BadValue); + }); + + // Schedule the response to the getMore which will return the next result and mark the cursor as + // exhausted. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) + .toBSON(CursorResponse::ResponseType::SubsequentResponse); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(ResultsMergerTestFixture, ShouldBeAbleToHandleExceptionWhenUnyielding) { + class ThrowyResourceYielder : public ResourceYielder { + public: + void yield(OperationContext*) {} + + void unyield(OperationContext*) { + uasserted(ErrorCodes::BadValue, "Simulated error"); + } + }; + + std::vector<RemoteCursor> cursors; + cursors.emplace_back( + makeRemoteCursor(kTestShardIds[0], kTestShardHosts[0], CursorResponse(kTestNss, 1, {}))); + BlockingResultsMerger blockingMerger(operationContext(), + makeARMParamsFromExistingCursors(std::move(cursors)), + executor(), + std::make_unique<ThrowyResourceYielder>()); + + // Issue a blocking wait for the next result asynchronously on a different thread. + auto future = launchAsync([&]() { + // Make sure that the next() call throws correctly. + const auto status = + blockingMerger.next(operationContext(), RouterExecStage::ExecContext::kInitialFind) + .getStatus(); + ASSERT_EQ(status, ErrorCodes::BadValue); + }); + + // Schedule the response to the getMore which will return the next result and mark the cursor as + // exhausted. + onCommand([&](const auto& request) { + ASSERT(request.cmdObj["getMore"]); + return CursorResponse(kTestNss, 0LL, {BSON("x" << 1)}) + .toBSON(CursorResponse::ResponseType::SubsequentResponse); + }); + + future.timed_get(kFutureTimeout); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 12270fc1c99..3ead06bc1f1 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -105,6 +105,7 @@ Status appendCursorResponseToCommandResult(const ShardId& shardId, BSONObj createCommandForMergingShard(const AggregationRequest& request, const boost::intrusive_ptr<ExpressionContext>& mergeCtx, const ShardId& shardId, + bool mergingShardContributesData, const Pipeline* pipelineForMerging) { MutableDocument mergeCmd(request.serializeToCommandObj()); @@ -119,9 +120,19 @@ BSONObj createCommandForMergingShard(const AggregationRequest& request, : Value(Document{CollationSpec::kSimpleSpec}); } + const auto txnRouter = TransactionRouter::get(mergeCtx->opCtx); + if (txnRouter && mergingShardContributesData) { + // Don't include a readConcern since we can only include read concerns on the _first_ + // command sent to a participant per transaction. Assuming the merging shard is a + // participant, it will already have received another 'aggregate' command earlier which + // contained a readConcern. + + mergeCmd.remove("readConcern"); + } + auto aggCmd = mergeCmd.freeze().toBson(); - if (auto txnRouter = TransactionRouter::get(mergeCtx->opCtx)) { + if (txnRouter) { aggCmd = txnRouter->attachTxnFieldsIfNeeded(shardId, aggCmd); } @@ -609,19 +620,16 @@ Status dispatchMergingPipeline(const boost::intrusive_ptr<ExpressionContext>& ex // therefore must have a valid routing table. invariant(routingInfo); - // TODO SERVER-33683 allowing an aggregation within a transaction can lead to a deadlock in the - // SessionCatalog when a pipeline with a $mergeCursors sends a getMore to itself. - uassert(ErrorCodes::OperationNotSupportedInTransaction, - "Cannot specify a transaction number in combination with an aggregation on mongos when " - "merging on a shard", - !opCtx->getTxnNumber()); - - ShardId mergingShardId = pickMergingShard(opCtx, - shardDispatchResults.needsPrimaryShardMerge, - targetedShards, - routingInfo->db().primaryId()); + const ShardId mergingShardId = pickMergingShard(opCtx, + shardDispatchResults.needsPrimaryShardMerge, + targetedShards, + routingInfo->db().primaryId()); + const bool mergingShardContributesData = + std::find(targetedShards.begin(), targetedShards.end(), mergingShardId) != + targetedShards.end(); - auto mergeCmdObj = createCommandForMergingShard(request, expCtx, mergingShardId, mergePipeline); + auto mergeCmdObj = createCommandForMergingShard( + request, expCtx, mergingShardId, mergingShardContributesData, mergePipeline); // Dispatch $mergeCursors to the chosen shard, store the resulting cursor, and return. auto mergeResponse = establishMergingShardCursor( diff --git a/src/mongo/s/query/document_source_merge_cursors.cpp b/src/mongo/s/query/document_source_merge_cursors.cpp index fe1303684da..fbe93ea1b53 100644 --- a/src/mongo/s/query/document_source_merge_cursors.cpp +++ b/src/mongo/s/query/document_source_merge_cursors.cpp @@ -74,7 +74,11 @@ bool DocumentSourceMergeCursors::remotesExhausted() const { void DocumentSourceMergeCursors::populateMerger() { invariant(!_blockingResultsMerger); invariant(_armParams); - _blockingResultsMerger.emplace(pExpCtx->opCtx, std::move(*_armParams), _executor); + + _blockingResultsMerger.emplace(pExpCtx->opCtx, + std::move(*_armParams), + _executor, + pExpCtx->mongoProcessInterface->getResourceYielder()); _armParams = boost::none; // '_blockingResultsMerger' now owns the cursors. _ownCursors = false; diff --git a/src/mongo/s/query/document_source_merge_cursors.h b/src/mongo/s/query/document_source_merge_cursors.h index a3d65bcab27..5249da51a8e 100644 --- a/src/mongo/s/query/document_source_merge_cursors.h +++ b/src/mongo/s/query/document_source_merge_cursors.h @@ -89,9 +89,7 @@ public: HostTypeRequirement::kNone, DiskUseRequirement::kNoDiskUse, FacetRequirement::kNotAllowed, - // TODO SERVER-33683: Permit $mergeCursors with readConcern - // level "snapshot". - TransactionRequirement::kNotAllowed); + TransactionRequirement::kAllowed); constraints.requiresInputDocSource = false; return constraints; diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index 7833c6550c0..6c8e389eded 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -47,7 +47,7 @@ public: RouterStageMerge(OperationContext* opCtx, executor::TaskExecutor* executor, AsyncResultsMergerParams&& armParams) - : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor) {} + : RouterExecStage(opCtx), _resultsMerger(opCtx, std::move(armParams), executor, nullptr) {} StatusWith<ClusterQueryResult> next(ExecContext execCtx) final { return _resultsMerger.next(getOpCtx(), execCtx); |