diff options
| field | value | date |
|---|---|---|
| author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2016-07-26 15:23:10 -0400 |
| committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2016-07-26 15:23:51 -0400 |
| commit | 1febb4ceb0e59743b0a49af35db10c4c689aa130 (patch) | |
| tree | 431445a395fd4fbd1e7c94bf1c4151bba714088d /src/mongo/s | |
| parent | 7daf57f28f564329e92b8779cf12845776b958b3 (diff) | |
| download | mongo-1febb4ceb0e59743b0a49af35db10c4c689aa130.tar.gz | |
SERVER-24615 Add support for OperationContext in EgressMetadataHook
Diffstat (limited to 'src/mongo/s')
31 files changed, 165 insertions, 49 deletions
diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp index 3b24561ade6..209c90b7106 100644 --- a/src/mongo/s/balancer/migration_manager.cpp +++ b/src/mongo/s/balancer/migration_manager.cpp @@ -178,7 +178,7 @@ void MigrationManager::_executeMigrations(OperationContext* txn, continue; } - RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj); + RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj, txn); StatusWith<RemoteCommandResponse> remoteCommandResponse( Status{ErrorCodes::InternalError, "Uninitialized value"}); diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp index f8aec3aea0c..bd587160121 100644 --- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp +++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp @@ -269,7 +269,7 @@ StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAdd } executor::RemoteCommandRequest request( - host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), Seconds(30)); + host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), nullptr, Seconds(30)); StatusWith<executor::RemoteCommandResponse> swResponse = Status(ErrorCodes::InternalError, "Internal error running command"); @@ -1167,7 +1167,7 @@ void ShardingCatalogManagerImpl::_scheduleAddShardTask( } executor::RemoteCommandRequest request( - swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), Seconds(30)); + swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), nullptr, Seconds(30)); const RemoteCommandCallbackFn callback = stdx::bind(&ShardingCatalogManagerImpl::_handleAddShardTaskResponse, diff --git a/src/mongo/s/client/dbclient_multi_command.cpp b/src/mongo/s/client/dbclient_multi_command.cpp index 2f3e6ae67f8..9b086c15aa3 100644 --- a/src/mongo/s/client/dbclient_multi_command.cpp +++ 
b/src/mongo/s/client/dbclient_multi_command.cpp @@ -31,6 +31,7 @@ #include "mongo/s/client/dbclient_multi_command.h" #include "mongo/db/audit.h" +#include "mongo/db/client.h" #include "mongo/db/dbmessage.h" #include "mongo/db/wire_version.h" #include "mongo/rpc/factory.h" @@ -88,7 +89,9 @@ static void sayAsCmd(DBClientBase* conn, StringData dbName, const BSONObj& cmdOb BSONObjBuilder metadataBob; metadataBob.appendElements(upconvertedMetadata); if (conn->getRequestMetadataWriter()) { - conn->getRequestMetadataWriter()(&metadataBob, conn->getServerAddress()); + conn->getRequestMetadataWriter()((haveClient() ? cc().getOperationContext() : nullptr), + &metadataBob, + conn->getServerAddress()); } requestBuilder->setDatabase(dbName); diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp index 2fff4b54520..79b7d493863 100644 --- a/src/mongo/s/client/shard_remote.cpp +++ b/src/mongo/s/client/shard_remote.cpp @@ -212,6 +212,7 @@ Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn, dbName, cmdWithMaxTimeMS, _getMetadataForCommand(readPref), + txn, isConfig() ? 
kConfigCommandTimeout : executor::RemoteCommandRequest::kNoTimeout); StatusWith<RemoteCommandResponse> swResponse = diff --git a/src/mongo/s/client/sharding_connection_hook.cpp b/src/mongo/s/client/sharding_connection_hook.cpp index b838c50e10c..b24bccd3381 100644 --- a/src/mongo/s/client/sharding_connection_hook.cpp +++ b/src/mongo/s/client/sharding_connection_hook.cpp @@ -77,9 +77,11 @@ void ShardingConnectionHook::onCreate(DBClientBase* conn) { return _egressHook->readReplyMetadata(target, metadataObj); }); } - conn->setRequestMetadataWriter([this](BSONObjBuilder* metadataBob, StringData hostStringData) { - return _egressHook->writeRequestMetadata(_shardedConnections, hostStringData, metadataBob); - }); + conn->setRequestMetadataWriter( + [this](OperationContext* txn, BSONObjBuilder* metadataBob, StringData hostStringData) { + return _egressHook->writeRequestMetadata( + _shardedConnections, txn, hostStringData, metadataBob); + }); if (conn->type() == ConnectionString::MASTER) { diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp index 9dd3e10c1f3..2d1919769ad 100644 --- a/src/mongo/s/query/async_results_merger.cpp +++ b/src/mongo/s/query/async_results_merger.cpp @@ -114,6 +114,11 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return Status::OK(); } +void AsyncResultsMerger::setOperationContext(OperationContext* txn) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _params.txn = txn; +} + bool AsyncResultsMerger::ready() { stdx::lock_guard<stdx::mutex> lk(_mutex); return ready_inlock(); @@ -290,8 +295,11 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) { cmdObj = *remote.initialCmdObj; } - executor::RemoteCommandRequest request( - remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _metadataObj); + executor::RemoteCommandRequest request(remote.getTargetHost(), + _params.nsString.db().toString(), + cmdObj, + _metadataObj, + _params.txn); auto 
callbackStatus = _executor->scheduleRemoteCommand( request, @@ -560,7 +568,7 @@ void AsyncResultsMerger::scheduleKillCursors_inlock() { BSONObj cmdObj = KillCursorsRequest(_params.nsString, {*remote.cursorId}).toBSON(); executor::RemoteCommandRequest request( - remote.getTargetHost(), _params.nsString.db().toString(), cmdObj); + remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _params.txn); _executor->scheduleRemoteCommand( request, diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h index b4d04a9c7ad..8956dca52c8 100644 --- a/src/mongo/s/query/async_results_merger.h +++ b/src/mongo/s/query/async_results_merger.h @@ -101,6 +101,13 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout); /** + * Update the operation context for remote requests. + * + * Network requests depend on having a valid operation context for user initiated actions. + */ + void setOperationContext(OperationContext* txn); + + /** * Returns true if there is no need to schedule remote work in order to take the next action. * This means that either * --there is a buffered result which we can return, diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h index 55f2412f7c8..47f4e46f89a 100644 --- a/src/mongo/s/query/cluster_client_cursor.h +++ b/src/mongo/s/query/cluster_client_cursor.h @@ -35,6 +35,7 @@ namespace mongo { +class OperationContext; template <typename T> class StatusWith; @@ -105,6 +106,13 @@ public: * the cursor is not tailable + awaitData). */ virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0; + + /** + * Update the operation context for remote requests. + * + * Network requests depend on having a valid operation context for user initiated actions. 
+ */ + virtual void setOperationContext(OperationContext* txn) = 0; }; } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp index c40dafa91ee..d37b78f2a58 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.cpp +++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp @@ -113,6 +113,10 @@ Status ClusterClientCursorImpl::setAwaitDataTimeout(Milliseconds awaitDataTimeou return _root->setAwaitDataTimeout(awaitDataTimeout); } +void ClusterClientCursorImpl::setOperationContext(OperationContext* txn) { + return _root->setOperationContext(txn); +} + std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan( executor::TaskExecutor* executor, ClusterClientCursorParams&& params) { const auto skip = params.skip; diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h index b16dae9f9d3..d8645bb7834 100644 --- a/src/mongo/s/query/cluster_client_cursor_impl.h +++ b/src/mongo/s/query/cluster_client_cursor_impl.h @@ -105,6 +105,8 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + void setOperationContext(OperationContext* txn) final; + private: /** * Constructs a cluster client cursor. 
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp index 4cf9418fc7f..2b10928449e 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.cpp +++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp @@ -97,4 +97,9 @@ Status ClusterClientCursorMock::setAwaitDataTimeout(Milliseconds awaitDataTimeou MONGO_UNREACHABLE; } + +void ClusterClientCursorMock::setOperationContext(OperationContext* txn) { + // Do nothing +} + } // namespace mongo diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h index 67efae2181a..3749a8abb19 100644 --- a/src/mongo/s/query/cluster_client_cursor_mock.h +++ b/src/mongo/s/query/cluster_client_cursor_mock.h @@ -55,6 +55,8 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + void setOperationContext(OperationContext* txn) final; + /** * Returns true unless marked as having non-exhausted remote cursors via * markRemotesNotExhausted(). diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h index 5e21b25ea8b..fce3bcf12cb 100644 --- a/src/mongo/s/query/cluster_client_cursor_params.h +++ b/src/mongo/s/query/cluster_client_cursor_params.h @@ -137,6 +137,10 @@ struct ClusterClientCursorParams { // Whether the client indicated that it is willing to receive partial results in the case of an // unreachable host. bool isAllowPartialResults = false; + + // OperationContext of the calling thread. Used to append Client dependent metadata to remote + // requests. 
+ OperationContext* txn; }; } // mongo diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 924a472d7a5..cca70a8dbdf 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -152,6 +152,11 @@ Status ClusterCursorManager::PinnedCursor::setAwaitDataTimeout(Milliseconds awai return _cursor->setAwaitDataTimeout(awaitDataTimeout); } +void ClusterCursorManager::PinnedCursor::setOperationContext(OperationContext* txn) { + return _cursor->setOperationContext(txn); +} + + void ClusterCursorManager::PinnedCursor::returnAndKillCursor() { invariant(_cursor); @@ -245,7 +250,7 @@ StatusWith<CursorId> ClusterCursorManager::registerCursor( } StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCursor( - const NamespaceString& nss, CursorId cursorId) { + const NamespaceString& nss, CursorId cursorId, OperationContext* txn) { // Read the clock out of the lock. const auto now = _clockSource->now(); @@ -269,6 +274,7 @@ StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCur } entry->setLastActive(now); + cursor->setOperationContext(txn); // Note that pinning a cursor transfers ownership of the underlying ClusterClientCursor object // to the pin; the CursorEntry is left with a null ClusterClientCursor. 
@@ -283,11 +289,15 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu invariant(cursor); + // Reset OperationContext so that non-user initiated operations do not try to use an invalid + // operation context + cursor->setOperationContext(nullptr); const bool remotesExhausted = cursor->remotesExhausted(); CursorEntry* entry = getEntry_inlock(nss, cursorId); invariant(entry); + entry->returnCursor(std::move(cursor)); if (cursorState == CursorState::NotExhausted || entry->getKillPending()) { @@ -390,6 +400,7 @@ std::size_t ClusterCursorManager::reapZombieCursors() { } lk.unlock(); + zombieCursor.getValue()->setOperationContext(nullptr); zombieCursor.getValue()->kill(); zombieCursor.getValue().reset(); lk.lock(); diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index 7770cc741c8..a1d4b28ba68 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -42,6 +42,7 @@ namespace mongo { class ClockSource; +class OperationContext; template <typename T> class StatusWith; @@ -201,6 +202,14 @@ public: */ Status setAwaitDataTimeout(Milliseconds awaitDataTimeout); + + /** + * Update the operation context for remote requests. + * + * Network requests depend on having a valid operation context for user initiated actions. + */ + void setOperationContext(OperationContext* txn); + private: // ClusterCursorManager is a friend so that its methods can call the PinnedCursor // constructor declared below, which is private to prevent clients from calling it directly. @@ -278,7 +287,9 @@ public: * * Does not block. */ - StatusWith<PinnedCursor> checkOutCursor(const NamespaceString& nss, CursorId cursorId); + StatusWith<PinnedCursor> checkOutCursor(const NamespaceString& nss, + CursorId cursorId, + OperationContext* txn); /** * Informs the manager that the given cursor should be killed. 
The cursor need not necessarily diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp index 0edda882977..ce65558f865 100644 --- a/src/mongo/s/query/cluster_cursor_manager_test.cpp +++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp @@ -113,7 +113,7 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); auto nextResult = pinnedCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); @@ -143,7 +143,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId); + auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(checkedOutCursor.getStatus()); ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId()); auto nextResult = checkedOutCursor.getValue().next(); @@ -170,7 +170,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) { ClusterCursorManager::CursorLifetime::Mortal)); } for (int i = 0; i < numCursors; ++i) { - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i]); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i], nullptr); ASSERT_OK(pinnedCursor.getStatus()); auto nextResult = pinnedCursor.getValue().next(); ASSERT_OK(nextResult.getStatus()); @@ -189,9 +189,10 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, 
cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); - ASSERT_EQ(ErrorCodes::CursorInUse, getManager()->checkOutCursor(nss, cursorId).getStatus()); + ASSERT_EQ(ErrorCodes::CursorInUse, + getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); } // Test that checking out a killed cursor returns an error with code ErrorCodes::CursorNotFound. @@ -202,12 +203,14 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorKilled) { ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); ASSERT_OK(getManager()->killCursor(nss, cursorId)); - ASSERT_EQ(ErrorCodes::CursorNotFound, getManager()->checkOutCursor(nss, cursorId).getStatus()); + ASSERT_EQ(ErrorCodes::CursorNotFound, + getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); } // Test that checking out an unknown cursor returns an error with code ErrorCodes::CursorNotFound. TEST_F(ClusterCursorManagerTest, CheckOutCursorUnknown) { - ASSERT_EQ(ErrorCodes::CursorNotFound, getManager()->checkOutCursor(nss, 5).getStatus()); + ASSERT_EQ(ErrorCodes::CursorNotFound, + getManager()->checkOutCursor(nss, 5, nullptr).getStatus()); } // Test that checking out a unknown cursor returns an error with code ErrorCodes::CursorNotFound, @@ -221,7 +224,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) { ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); ASSERT_EQ(ErrorCodes::CursorNotFound, - getManager()->checkOutCursor(incorrectNamespace, cursorId).getStatus()); + getManager()->checkOutCursor(incorrectNamespace, cursorId, nullptr).getStatus()); } // Test that checking out a unknown cursor returns an error with code ErrorCodes::CursorNotFound, @@ -233,7 +236,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) { ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); ASSERT_EQ(ErrorCodes::CursorNotFound, - getManager()->checkOutCursor(nss, 
cursorId + 1).getStatus()); + getManager()->checkOutCursor(nss, cursorId + 1, nullptr).getStatus()); } // Test that checking out a cursor updates the 'last active' time associated with the cursor to the @@ -246,7 +249,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) { ClusterCursorManager::CursorLifetime::Mortal)); Date_t cursorRegistrationTime = getClockSource()->now(); getClockSource()->advance(Milliseconds(1)); - auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId); + auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(checkedOutCursor.getStatus()); checkedOutCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); getManager()->killMortalCursorsInactiveSince(cursorRegistrationTime); @@ -262,7 +265,7 @@ TEST_F(ClusterCursorManagerTest, KillCursorBasic) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(getManager()->killCursor(nss, pinnedCursor.getValue().getCursorId())); pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); @@ -434,7 +437,7 @@ TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipPinned) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT(!isMockCursorKilled(0)); getManager()->reapZombieCursors(); ASSERT(!isMockCursorKilled(0)); @@ -484,7 +487,7 @@ TEST_F(ClusterCursorManagerTest, StatsPinCursor) { nss, ClusterCursorManager::CursorType::NamespaceSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + 
auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); } @@ -542,7 +545,7 @@ TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) { nss, ClusterCursorManager::CursorType::NamespaceSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); ASSERT_OK(getManager()->killCursor(nss, cursorId)); ASSERT_EQ(0U, getManager()->stats().cursorsPinned); @@ -555,7 +558,7 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) { nss, ClusterCursorManager::CursorType::NamespaceSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(pinnedCursor.getValue().next().getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsSharded); @@ -570,7 +573,7 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(pinnedCursor.getValue().next().getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsNotSharded); @@ -586,7 +589,7 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(pinnedCursor.getValue().next().getStatus()); 
ASSERT_EQ(1U, getManager()->stats().cursorsPinned); @@ -602,7 +605,7 @@ TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(pinnedCursor.getValue().next().getStatus()); ASSERT_EQ(1U, getManager()->stats().cursorsPinned); @@ -685,13 +688,13 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto registeredCursor = getManager()->checkOutCursor(nss, cursorId); + auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted); ASSERT_EQ(0, registeredCursor.getValue().getCursorId()); - auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId); + auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(checkedOutCursor.getStatus()); } @@ -703,7 +706,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto registeredCursor = getManager()->checkOutCursor(nss, cursorId); + auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); @@ -714,7 +717,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) { // Cursor should have been destroyed without ever being 
killed. To be sure that the cursor has // not been marked kill pending but not yet destroyed (i.e. that the cursor is not a zombie), we // reapZombieCursors() and check that the cursor still has not been killed. - ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId).getStatus()); + ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); ASSERT(!isMockCursorKilled(0)); getManager()->reapZombieCursors(); ASSERT(!isMockCursorKilled(0)); @@ -734,7 +737,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto registeredCursor = getManager()->checkOutCursor(nss, cursorId); + auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(registeredCursor.getStatus()); ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId()); ASSERT_NE(0, cursorId); @@ -743,7 +746,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust ASSERT_EQ(0, registeredCursor.getValue().getCursorId()); // Cursor should be kill pending, so it will be killed during reaping. 
- ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId).getStatus()); + ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); ASSERT(!isMockCursorKilled(0)); getManager()->reapZombieCursors(); ASSERT(isMockCursorKilled(0)); @@ -757,7 +760,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); pinnedCursor = ClusterCursorManager::PinnedCursor(); ASSERT(!isMockCursorKilled(0)); getManager()->reapZombieCursors(); @@ -772,7 +775,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorDestructorKill) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); } ASSERT(!isMockCursorKilled(0)); getManager()->reapZombieCursors(); @@ -790,7 +793,7 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_FALSE(pinnedCursor.getValue().remotesExhausted()); } @@ -802,7 +805,7 @@ TEST_F(ClusterCursorManagerTest, DoNotReapKilledPinnedCursors) { nss, ClusterCursorManager::CursorType::NamespaceNotSharded, ClusterCursorManager::CursorLifetime::Mortal)); - auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId); + auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr); ASSERT_OK(pinnedCursor.getStatus()); ASSERT_OK(getManager()->killCursor(nss, cursorId)); ASSERT(!isMockCursorKilled(0)); @@ 
-850,7 +853,7 @@ TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) { ASSERT(isMockCursorKilled(0)); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, - getManager()->checkOutCursor(nss, cursorId).getStatus()); + getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus()); } } // namespace diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index 8f7b260e7a1..57d6ece2c87 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -180,6 +180,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn, params.isTailable = query.getQueryRequest().isTailable(); params.isAwaitData = query.getQueryRequest().isAwaitData(); params.isAllowPartialResults = query.getQueryRequest().isAllowPartialResults(); + params.txn = txn; // This is the batchSize passed to each subsequent getMore command issued by the cursor. We // usually use the batchSize associated with the initial find, but as it is illegal to send a @@ -358,7 +359,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn, const GetMoreRequest& request) { auto cursorManager = grid.getCursorManager(); - auto pinnedCursor = cursorManager->checkOutCursor(request.nss, request.cursorid); + auto pinnedCursor = cursorManager->checkOutCursor(request.nss, request.cursorid, txn); if (!pinnedCursor.isOK()) { return pinnedCursor.getStatus(); } diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h index 1f2dbdf6c7c..0e10d9edff2 100644 --- a/src/mongo/s/query/router_exec_stage.h +++ b/src/mongo/s/query/router_exec_stage.h @@ -37,6 +37,8 @@ namespace mongo { +class OperationContext; + /** * This is the lightweight mongoS analogue of the PlanStage abstraction used to execute queries on * mongoD (see mongo/db/plan_stage.h). @@ -85,6 +87,13 @@ public: */ virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0; + /** + * Update the operation context for remote requests. 
+ * + * Network requests depend on having a valid operation context for user initiated actions. + */ + virtual void setOperationContext(OperationContext* txn) = 0; + protected: /** * Returns an unowned pointer to the child stage, or nullptr if there is no child. diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp index 9a9f77fbf00..5f7db02ec7b 100644 --- a/src/mongo/s/query/router_stage_limit.cpp +++ b/src/mongo/s/query/router_stage_limit.cpp @@ -67,4 +67,8 @@ Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); } +void RouterStageLimit::setOperationContext(OperationContext* txn) { + return getChildStage()->setOperationContext(txn); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h index 0db06c30c3b..366a964f2a4 100644 --- a/src/mongo/s/query/router_stage_limit.h +++ b/src/mongo/s/query/router_stage_limit.h @@ -47,6 +47,8 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + void setOperationContext(OperationContext* txn) final; + private: long long _limit; diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp index 527bc0f0063..0e9304d9952 100644 --- a/src/mongo/s/query/router_stage_merge.cpp +++ b/src/mongo/s/query/router_stage_merge.cpp @@ -68,4 +68,8 @@ Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return _arm.setAwaitDataTimeout(awaitDataTimeout); } +void RouterStageMerge::setOperationContext(OperationContext* txn) { + return _arm.setOperationContext(txn); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index d74870f8a94..a75d5a46f8d 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -53,6 +53,8 @@ public: Status setAwaitDataTimeout(Milliseconds 
awaitDataTimeout) final; + void setOperationContext(OperationContext* txn) final; + private: // Not owned here. executor::TaskExecutor* _executor; diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp index 179635bbb08..daad6fe6d07 100644 --- a/src/mongo/s/query/router_stage_mock.cpp +++ b/src/mongo/s/query/router_stage_mock.cpp @@ -73,6 +73,10 @@ Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return Status::OK(); } +void RouterStageMock::setOperationContext(OperationContext* txn) { + // Do nothing +} + StatusWith<Milliseconds> RouterStageMock::getAwaitDataTimeout() { if (!_awaitDataTimeout) { return Status(ErrorCodes::BadValue, "no awaitData timeout set"); diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h index b83e2879096..255ae75b595 100644 --- a/src/mongo/s/query/router_stage_mock.h +++ b/src/mongo/s/query/router_stage_mock.h @@ -51,6 +51,8 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + void setOperationContext(OperationContext* txn) final; + /** * Queues a BSONObj to be returned. 
*/ diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp index 16e6f9407a4..949182e3a1a 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.cpp +++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp @@ -69,4 +69,8 @@ Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeo return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); } +void RouterStageRemoveSortKey::setOperationContext(OperationContext* txn) { + return getChildStage()->setOperationContext(txn); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h index 6ef60012a4d..f7965312d47 100644 --- a/src/mongo/s/query/router_stage_remove_sortkey.h +++ b/src/mongo/s/query/router_stage_remove_sortkey.h @@ -48,6 +48,8 @@ public: bool remotesExhausted() final; Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + + void setOperationContext(OperationContext* txn) final; }; } // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp index 536c3d173a2..af746d5e430 100644 --- a/src/mongo/s/query/router_stage_skip.cpp +++ b/src/mongo/s/query/router_stage_skip.cpp @@ -68,4 +68,8 @@ Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) { return getChildStage()->setAwaitDataTimeout(awaitDataTimeout); } +void RouterStageSkip::setOperationContext(OperationContext* txn) { + return getChildStage()->setOperationContext(txn); +} + } // namespace mongo diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h index 35994d31e3e..430d3748c91 100644 --- a/src/mongo/s/query/router_stage_skip.h +++ b/src/mongo/s/query/router_stage_skip.h @@ -47,6 +47,8 @@ public: Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final; + void setOperationContext(OperationContext* txn) final; + private: long long _skip; diff --git 
a/src/mongo/s/sharding_egress_metadata_hook.cpp b/src/mongo/s/sharding_egress_metadata_hook.cpp index 0ed82987534..49ee7c03d60 100644 --- a/src/mongo/s/sharding_egress_metadata_hook.cpp +++ b/src/mongo/s/sharding_egress_metadata_hook.cpp @@ -50,10 +50,11 @@ namespace rpc { using std::shared_ptr; Status ShardingEgressMetadataHook::writeRequestMetadata(bool shardedConnection, + OperationContext* txn, const StringData target, BSONObjBuilder* metadataBob) { try { - audit::writeImpersonatedUsersToMetadata(metadataBob); + audit::writeImpersonatedUsersToMetadata(txn, metadataBob); if (!shardedConnection) { return Status::OK(); } @@ -64,10 +65,11 @@ Status ShardingEgressMetadataHook::writeRequestMetadata(bool shardedConnection, } } -Status ShardingEgressMetadataHook::writeRequestMetadata(const HostAndPort& target, +Status ShardingEgressMetadataHook::writeRequestMetadata(OperationContext* txn, + const HostAndPort& target, BSONObjBuilder* metadataBob) { try { - audit::writeImpersonatedUsersToMetadata(metadataBob); + audit::writeImpersonatedUsersToMetadata(txn, metadataBob); rpc::ConfigServerMetadata(_getConfigServerOpTime()).writeToMetadata(metadataBob); return Status::OK(); } catch (...) 
{ diff --git a/src/mongo/s/sharding_egress_metadata_hook.h b/src/mongo/s/sharding_egress_metadata_hook.h index c510aefaab4..df105c813bf 100644 --- a/src/mongo/s/sharding_egress_metadata_hook.h +++ b/src/mongo/s/sharding_egress_metadata_hook.h @@ -46,7 +46,9 @@ public: virtual ~ShardingEgressMetadataHook() = default; Status readReplyMetadata(const HostAndPort& replySource, const BSONObj& metadataObj) override; - Status writeRequestMetadata(const HostAndPort& target, BSONObjBuilder* metadataBob) override; + Status writeRequestMetadata(OperationContext* txn, + const HostAndPort& target, + BSONObjBuilder* metadataBob) override; // These overloaded methods exist to allow ShardingConnectionHook, which is soon to be // deprecated, to use the logic in ShardingEgressMetadataHook instead of duplicating the @@ -55,6 +57,7 @@ public: // contact. Status readReplyMetadata(const StringData replySource, const BSONObj& metadataObj); Status writeRequestMetadata(bool shardedConnection, + OperationContext* txn, const StringData target, BSONObjBuilder* metadataBob); diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp index 61f0ed8d66f..6dde6355a82 100644 --- a/src/mongo/s/sharding_initialization.cpp +++ b/src/mongo/s/sharding_initialization.cpp @@ -95,13 +95,13 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool( std::unique_ptr<NetworkInterface> fixedNet, - std::unique_ptr<rpc::EgressMetadataHook> metadataHook) { + rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder) { std::vector<std::unique_ptr<executor::TaskExecutor>> executors; for (size_t i = 0; i < TaskExecutorPool::getSuggestedPoolSize(); ++i) { auto net = executor::makeNetworkInterface( "NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i), stdx::make_unique<ShardingNetworkConnectionHook>(), - std::move(metadataHook)); + metadataHookBuilder()); auto netPtr = net.get(); auto exec = 
stdx::make_unique<ThreadPoolTaskExecutor>( stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net)); @@ -148,7 +148,7 @@ Status initializeGlobalShardingState(OperationContext* txn, stdx::make_unique<ShardingNetworkConnectionHook>(), hookBuilder()); auto networkPtr = network.get(); - auto executorPool = makeTaskExecutorPool(std::move(network), hookBuilder()); + auto executorPool = makeTaskExecutorPool(std::move(network), hookBuilder); executorPool->startup(); auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS)); |