summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorMark Benvenuto <mark.benvenuto@mongodb.com>2016-07-26 15:23:10 -0400
committerMark Benvenuto <mark.benvenuto@mongodb.com>2016-07-26 15:23:51 -0400
commit1febb4ceb0e59743b0a49af35db10c4c689aa130 (patch)
tree431445a395fd4fbd1e7c94bf1c4151bba714088d /src/mongo/s
parent7daf57f28f564329e92b8779cf12845776b958b3 (diff)
downloadmongo-1febb4ceb0e59743b0a49af35db10c4c689aa130.tar.gz
SERVER-24615 Add support for OperationContext in EgressMetadataHook
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/balancer/migration_manager.cpp2
-rw-r--r--src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp4
-rw-r--r--src/mongo/s/client/dbclient_multi_command.cpp5
-rw-r--r--src/mongo/s/client/shard_remote.cpp1
-rw-r--r--src/mongo/s/client/sharding_connection_hook.cpp8
-rw-r--r--src/mongo/s/query/async_results_merger.cpp14
-rw-r--r--src/mongo/s/query/async_results_merger.h7
-rw-r--r--src/mongo/s/query/cluster_client_cursor.h8
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.cpp4
-rw-r--r--src/mongo/s/query/cluster_client_cursor_impl.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.cpp5
-rw-r--r--src/mongo/s/query/cluster_client_cursor_mock.h2
-rw-r--r--src/mongo/s/query/cluster_client_cursor_params.h4
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp13
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h13
-rw-r--r--src/mongo/s/query/cluster_cursor_manager_test.cpp61
-rw-r--r--src/mongo/s/query/cluster_find.cpp3
-rw-r--r--src/mongo/s/query/router_exec_stage.h9
-rw-r--r--src/mongo/s/query/router_stage_limit.cpp4
-rw-r--r--src/mongo/s/query/router_stage_limit.h2
-rw-r--r--src/mongo/s/query/router_stage_merge.cpp4
-rw-r--r--src/mongo/s/query/router_stage_merge.h2
-rw-r--r--src/mongo/s/query/router_stage_mock.cpp4
-rw-r--r--src/mongo/s/query/router_stage_mock.h2
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.cpp4
-rw-r--r--src/mongo/s/query/router_stage_remove_sortkey.h2
-rw-r--r--src/mongo/s/query/router_stage_skip.cpp4
-rw-r--r--src/mongo/s/query/router_stage_skip.h2
-rw-r--r--src/mongo/s/sharding_egress_metadata_hook.cpp8
-rw-r--r--src/mongo/s/sharding_egress_metadata_hook.h5
-rw-r--r--src/mongo/s/sharding_initialization.cpp6
31 files changed, 165 insertions, 49 deletions
diff --git a/src/mongo/s/balancer/migration_manager.cpp b/src/mongo/s/balancer/migration_manager.cpp
index 3b24561ade6..209c90b7106 100644
--- a/src/mongo/s/balancer/migration_manager.cpp
+++ b/src/mongo/s/balancer/migration_manager.cpp
@@ -178,7 +178,7 @@ void MigrationManager::_executeMigrations(OperationContext* txn,
continue;
}
- RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj);
+ RemoteCommandRequest remoteRequest(host.getValue(), "admin", moveChunkRequestObj, txn);
StatusWith<RemoteCommandResponse> remoteCommandResponse(
Status{ErrorCodes::InternalError, "Uninitialized value"});
diff --git a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
index f8aec3aea0c..bd587160121 100644
--- a/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
+++ b/src/mongo/s/catalog/replset/sharding_catalog_manager_impl.cpp
@@ -269,7 +269,7 @@ StatusWith<Shard::CommandResponse> ShardingCatalogManagerImpl::_runCommandForAdd
}
executor::RemoteCommandRequest request(
- host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), Seconds(30));
+ host.getValue(), dbName, cmdObj, rpc::makeEmptyMetadata(), nullptr, Seconds(30));
StatusWith<executor::RemoteCommandResponse> swResponse =
Status(ErrorCodes::InternalError, "Internal error running command");
@@ -1167,7 +1167,7 @@ void ShardingCatalogManagerImpl::_scheduleAddShardTask(
}
executor::RemoteCommandRequest request(
- swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), Seconds(30));
+ swHost.getValue(), "admin", commandRequest, rpc::makeEmptyMetadata(), nullptr, Seconds(30));
const RemoteCommandCallbackFn callback =
stdx::bind(&ShardingCatalogManagerImpl::_handleAddShardTaskResponse,
diff --git a/src/mongo/s/client/dbclient_multi_command.cpp b/src/mongo/s/client/dbclient_multi_command.cpp
index 2f3e6ae67f8..9b086c15aa3 100644
--- a/src/mongo/s/client/dbclient_multi_command.cpp
+++ b/src/mongo/s/client/dbclient_multi_command.cpp
@@ -31,6 +31,7 @@
#include "mongo/s/client/dbclient_multi_command.h"
#include "mongo/db/audit.h"
+#include "mongo/db/client.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/wire_version.h"
#include "mongo/rpc/factory.h"
@@ -88,7 +89,9 @@ static void sayAsCmd(DBClientBase* conn, StringData dbName, const BSONObj& cmdOb
BSONObjBuilder metadataBob;
metadataBob.appendElements(upconvertedMetadata);
if (conn->getRequestMetadataWriter()) {
- conn->getRequestMetadataWriter()(&metadataBob, conn->getServerAddress());
+ conn->getRequestMetadataWriter()((haveClient() ? cc().getOperationContext() : nullptr),
+ &metadataBob,
+ conn->getServerAddress());
}
requestBuilder->setDatabase(dbName);
diff --git a/src/mongo/s/client/shard_remote.cpp b/src/mongo/s/client/shard_remote.cpp
index 2fff4b54520..79b7d493863 100644
--- a/src/mongo/s/client/shard_remote.cpp
+++ b/src/mongo/s/client/shard_remote.cpp
@@ -212,6 +212,7 @@ Shard::HostWithResponse ShardRemote::_runCommand(OperationContext* txn,
dbName,
cmdWithMaxTimeMS,
_getMetadataForCommand(readPref),
+ txn,
isConfig() ? kConfigCommandTimeout
: executor::RemoteCommandRequest::kNoTimeout);
StatusWith<RemoteCommandResponse> swResponse =
diff --git a/src/mongo/s/client/sharding_connection_hook.cpp b/src/mongo/s/client/sharding_connection_hook.cpp
index b838c50e10c..b24bccd3381 100644
--- a/src/mongo/s/client/sharding_connection_hook.cpp
+++ b/src/mongo/s/client/sharding_connection_hook.cpp
@@ -77,9 +77,11 @@ void ShardingConnectionHook::onCreate(DBClientBase* conn) {
return _egressHook->readReplyMetadata(target, metadataObj);
});
}
- conn->setRequestMetadataWriter([this](BSONObjBuilder* metadataBob, StringData hostStringData) {
- return _egressHook->writeRequestMetadata(_shardedConnections, hostStringData, metadataBob);
- });
+ conn->setRequestMetadataWriter(
+ [this](OperationContext* txn, BSONObjBuilder* metadataBob, StringData hostStringData) {
+ return _egressHook->writeRequestMetadata(
+ _shardedConnections, txn, hostStringData, metadataBob);
+ });
if (conn->type() == ConnectionString::MASTER) {
diff --git a/src/mongo/s/query/async_results_merger.cpp b/src/mongo/s/query/async_results_merger.cpp
index 9dd3e10c1f3..2d1919769ad 100644
--- a/src/mongo/s/query/async_results_merger.cpp
+++ b/src/mongo/s/query/async_results_merger.cpp
@@ -114,6 +114,11 @@ Status AsyncResultsMerger::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return Status::OK();
}
+void AsyncResultsMerger::setOperationContext(OperationContext* txn) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _params.txn = txn;
+}
+
bool AsyncResultsMerger::ready() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return ready_inlock();
@@ -290,8 +295,11 @@ Status AsyncResultsMerger::askForNextBatch_inlock(size_t remoteIndex) {
cmdObj = *remote.initialCmdObj;
}
- executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _metadataObj);
+ executor::RemoteCommandRequest request(remote.getTargetHost(),
+ _params.nsString.db().toString(),
+ cmdObj,
+ _metadataObj,
+ _params.txn);
auto callbackStatus = _executor->scheduleRemoteCommand(
request,
@@ -560,7 +568,7 @@ void AsyncResultsMerger::scheduleKillCursors_inlock() {
BSONObj cmdObj = KillCursorsRequest(_params.nsString, {*remote.cursorId}).toBSON();
executor::RemoteCommandRequest request(
- remote.getTargetHost(), _params.nsString.db().toString(), cmdObj);
+ remote.getTargetHost(), _params.nsString.db().toString(), cmdObj, _params.txn);
_executor->scheduleRemoteCommand(
request,
diff --git a/src/mongo/s/query/async_results_merger.h b/src/mongo/s/query/async_results_merger.h
index b4d04a9c7ad..8956dca52c8 100644
--- a/src/mongo/s/query/async_results_merger.h
+++ b/src/mongo/s/query/async_results_merger.h
@@ -101,6 +101,13 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout);
/**
+ * Update the operation context for remote requests.
+ *
+ * Network requests depend on having a valid operation context for user initiated actions.
+ */
+ void setOperationContext(OperationContext* txn);
+
+ /**
* Returns true if there is no need to schedule remote work in order to take the next action.
* This means that either
* --there is a buffered result which we can return,
diff --git a/src/mongo/s/query/cluster_client_cursor.h b/src/mongo/s/query/cluster_client_cursor.h
index 55f2412f7c8..47f4e46f89a 100644
--- a/src/mongo/s/query/cluster_client_cursor.h
+++ b/src/mongo/s/query/cluster_client_cursor.h
@@ -35,6 +35,7 @@
namespace mongo {
+class OperationContext;
template <typename T>
class StatusWith;
@@ -105,6 +106,13 @@ public:
* the cursor is not tailable + awaitData).
*/
virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
+
+ /**
+ * Update the operation context for remote requests.
+ *
+ * Network requests depend on having a valid operation context for user initiated actions.
+ */
+ virtual void setOperationContext(OperationContext* txn) = 0;
};
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.cpp b/src/mongo/s/query/cluster_client_cursor_impl.cpp
index c40dafa91ee..d37b78f2a58 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_impl.cpp
@@ -113,6 +113,10 @@ Status ClusterClientCursorImpl::setAwaitDataTimeout(Milliseconds awaitDataTimeou
return _root->setAwaitDataTimeout(awaitDataTimeout);
}
+void ClusterClientCursorImpl::setOperationContext(OperationContext* txn) {
+ return _root->setOperationContext(txn);
+}
+
std::unique_ptr<RouterExecStage> ClusterClientCursorImpl::buildMergerPlan(
executor::TaskExecutor* executor, ClusterClientCursorParams&& params) {
const auto skip = params.skip;
diff --git a/src/mongo/s/query/cluster_client_cursor_impl.h b/src/mongo/s/query/cluster_client_cursor_impl.h
index b16dae9f9d3..d8645bb7834 100644
--- a/src/mongo/s/query/cluster_client_cursor_impl.h
+++ b/src/mongo/s/query/cluster_client_cursor_impl.h
@@ -105,6 +105,8 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ void setOperationContext(OperationContext* txn) final;
+
private:
/**
* Constructs a cluster client cursor.
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.cpp b/src/mongo/s/query/cluster_client_cursor_mock.cpp
index 4cf9418fc7f..2b10928449e 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.cpp
+++ b/src/mongo/s/query/cluster_client_cursor_mock.cpp
@@ -97,4 +97,9 @@ Status ClusterClientCursorMock::setAwaitDataTimeout(Milliseconds awaitDataTimeou
MONGO_UNREACHABLE;
}
+
+void ClusterClientCursorMock::setOperationContext(OperationContext* txn) {
+ // Do nothing
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/cluster_client_cursor_mock.h b/src/mongo/s/query/cluster_client_cursor_mock.h
index 67efae2181a..3749a8abb19 100644
--- a/src/mongo/s/query/cluster_client_cursor_mock.h
+++ b/src/mongo/s/query/cluster_client_cursor_mock.h
@@ -55,6 +55,8 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ void setOperationContext(OperationContext* txn) final;
+
/**
* Returns true unless marked as having non-exhausted remote cursors via
* markRemotesNotExhausted().
diff --git a/src/mongo/s/query/cluster_client_cursor_params.h b/src/mongo/s/query/cluster_client_cursor_params.h
index 5e21b25ea8b..fce3bcf12cb 100644
--- a/src/mongo/s/query/cluster_client_cursor_params.h
+++ b/src/mongo/s/query/cluster_client_cursor_params.h
@@ -137,6 +137,10 @@ struct ClusterClientCursorParams {
// Whether the client indicated that it is willing to receive partial results in the case of an
// unreachable host.
bool isAllowPartialResults = false;
+
+ // OperationContext of the calling thread. Used to append Client dependent metadata to remote
+ // requests.
+ OperationContext* txn;
};
} // mongo
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 924a472d7a5..cca70a8dbdf 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -152,6 +152,11 @@ Status ClusterCursorManager::PinnedCursor::setAwaitDataTimeout(Milliseconds awai
return _cursor->setAwaitDataTimeout(awaitDataTimeout);
}
+void ClusterCursorManager::PinnedCursor::setOperationContext(OperationContext* txn) {
+ return _cursor->setOperationContext(txn);
+}
+
+
void ClusterCursorManager::PinnedCursor::returnAndKillCursor() {
invariant(_cursor);
@@ -245,7 +250,7 @@ StatusWith<CursorId> ClusterCursorManager::registerCursor(
}
StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCursor(
- const NamespaceString& nss, CursorId cursorId) {
+ const NamespaceString& nss, CursorId cursorId, OperationContext* txn) {
// Read the clock out of the lock.
const auto now = _clockSource->now();
@@ -269,6 +274,7 @@ StatusWith<ClusterCursorManager::PinnedCursor> ClusterCursorManager::checkOutCur
}
entry->setLastActive(now);
+ cursor->setOperationContext(txn);
// Note that pinning a cursor transfers ownership of the underlying ClusterClientCursor object
// to the pin; the CursorEntry is left with a null ClusterClientCursor.
@@ -283,11 +289,15 @@ void ClusterCursorManager::checkInCursor(std::unique_ptr<ClusterClientCursor> cu
invariant(cursor);
+ // Reset OperationContext so that non-user initiated operations do not try to use an invalid
+ // operation context
+ cursor->setOperationContext(nullptr);
const bool remotesExhausted = cursor->remotesExhausted();
CursorEntry* entry = getEntry_inlock(nss, cursorId);
invariant(entry);
+
entry->returnCursor(std::move(cursor));
if (cursorState == CursorState::NotExhausted || entry->getKillPending()) {
@@ -390,6 +400,7 @@ std::size_t ClusterCursorManager::reapZombieCursors() {
}
lk.unlock();
+ zombieCursor.getValue()->setOperationContext(nullptr);
zombieCursor.getValue()->kill();
zombieCursor.getValue().reset();
lk.lock();
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index 7770cc741c8..a1d4b28ba68 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -42,6 +42,7 @@
namespace mongo {
class ClockSource;
+class OperationContext;
template <typename T>
class StatusWith;
@@ -201,6 +202,14 @@ public:
*/
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout);
+
+ /**
+ * Update the operation context for remote requests.
+ *
+ * Network requests depend on having a valid operation context for user initiated actions.
+ */
+ void setOperationContext(OperationContext* txn);
+
private:
// ClusterCursorManager is a friend so that its methods can call the PinnedCursor
// constructor declared below, which is private to prevent clients from calling it directly.
@@ -278,7 +287,9 @@ public:
*
* Does not block.
*/
- StatusWith<PinnedCursor> checkOutCursor(const NamespaceString& nss, CursorId cursorId);
+ StatusWith<PinnedCursor> checkOutCursor(const NamespaceString& nss,
+ CursorId cursorId,
+ OperationContext* txn);
/**
* Informs the manager that the given cursor should be killed. The cursor need not necessarily
diff --git a/src/mongo/s/query/cluster_cursor_manager_test.cpp b/src/mongo/s/query/cluster_cursor_manager_test.cpp
index 0edda882977..ce65558f865 100644
--- a/src/mongo/s/query/cluster_cursor_manager_test.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager_test.cpp
@@ -113,7 +113,7 @@ TEST_F(ClusterCursorManagerTest, RegisterCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
auto nextResult = pinnedCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
@@ -143,7 +143,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorBasic) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(checkedOutCursor.getStatus());
ASSERT_EQ(cursorId, checkedOutCursor.getValue().getCursorId());
auto nextResult = checkedOutCursor.getValue().next();
@@ -170,7 +170,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorMultipleCursors) {
ClusterCursorManager::CursorLifetime::Mortal));
}
for (int i = 0; i < numCursors; ++i) {
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i]);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorIds[i], nullptr);
ASSERT_OK(pinnedCursor.getStatus());
auto nextResult = pinnedCursor.getValue().next();
ASSERT_OK(nextResult.getStatus());
@@ -189,9 +189,10 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorPinned) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
- ASSERT_EQ(ErrorCodes::CursorInUse, getManager()->checkOutCursor(nss, cursorId).getStatus());
+ ASSERT_EQ(ErrorCodes::CursorInUse,
+ getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
}
// Test that checking out a killed cursor returns an error with code ErrorCodes::CursorNotFound.
@@ -202,12 +203,14 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorKilled) {
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
ASSERT_OK(getManager()->killCursor(nss, cursorId));
- ASSERT_EQ(ErrorCodes::CursorNotFound, getManager()->checkOutCursor(nss, cursorId).getStatus());
+ ASSERT_EQ(ErrorCodes::CursorNotFound,
+ getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
}
// Test that checking out an unknown cursor returns an error with code ErrorCodes::CursorNotFound.
TEST_F(ClusterCursorManagerTest, CheckOutCursorUnknown) {
- ASSERT_EQ(ErrorCodes::CursorNotFound, getManager()->checkOutCursor(nss, 5).getStatus());
+ ASSERT_EQ(ErrorCodes::CursorNotFound,
+ getManager()->checkOutCursor(nss, 5, nullptr).getStatus());
}
// Test that checking out a unknown cursor returns an error with code ErrorCodes::CursorNotFound,
@@ -221,7 +224,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongNamespace) {
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
ASSERT_EQ(ErrorCodes::CursorNotFound,
- getManager()->checkOutCursor(incorrectNamespace, cursorId).getStatus());
+ getManager()->checkOutCursor(incorrectNamespace, cursorId, nullptr).getStatus());
}
// Test that checking out a unknown cursor returns an error with code ErrorCodes::CursorNotFound,
@@ -233,7 +236,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorWrongCursorId) {
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
ASSERT_EQ(ErrorCodes::CursorNotFound,
- getManager()->checkOutCursor(nss, cursorId + 1).getStatus());
+ getManager()->checkOutCursor(nss, cursorId + 1, nullptr).getStatus());
}
// Test that checking out a cursor updates the 'last active' time associated with the cursor to the
@@ -246,7 +249,7 @@ TEST_F(ClusterCursorManagerTest, CheckOutCursorUpdateActiveTime) {
ClusterCursorManager::CursorLifetime::Mortal));
Date_t cursorRegistrationTime = getClockSource()->now();
getClockSource()->advance(Milliseconds(1));
- auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(checkedOutCursor.getStatus());
checkedOutCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
getManager()->killMortalCursorsInactiveSince(cursorRegistrationTime);
@@ -262,7 +265,7 @@ TEST_F(ClusterCursorManagerTest, KillCursorBasic) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(getManager()->killCursor(nss, pinnedCursor.getValue().getCursorId()));
pinnedCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
@@ -434,7 +437,7 @@ TEST_F(ClusterCursorManagerTest, ReapZombieCursorsSkipPinned) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT(!isMockCursorKilled(0));
getManager()->reapZombieCursors();
ASSERT(!isMockCursorKilled(0));
@@ -484,7 +487,7 @@ TEST_F(ClusterCursorManagerTest, StatsPinCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
}
@@ -542,7 +545,7 @@ TEST_F(ClusterCursorManagerTest, StatsKillPinnedCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
ASSERT_OK(getManager()->killCursor(nss, cursorId));
ASSERT_EQ(0U, getManager()->stats().cursorsPinned);
@@ -555,7 +558,7 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustShardedCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(pinnedCursor.getValue().next().getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsSharded);
@@ -570,7 +573,7 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustNotShardedCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(pinnedCursor.getValue().next().getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsNotSharded);
@@ -586,7 +589,7 @@ TEST_F(ClusterCursorManagerTest, StatsExhaustPinnedCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(pinnedCursor.getValue().next().getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
@@ -602,7 +605,7 @@ TEST_F(ClusterCursorManagerTest, StatsCheckInWithoutExhaustingPinnedCursor) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(pinnedCursor.getValue().next().getStatus());
ASSERT_EQ(1U, getManager()->stats().cursorsPinned);
@@ -685,13 +688,13 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorNotExhausted) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto registeredCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
registeredCursor.getValue().returnCursor(ClusterCursorManager::CursorState::NotExhausted);
ASSERT_EQ(0, registeredCursor.getValue().getCursorId());
- auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto checkedOutCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(checkedOutCursor.getStatus());
}
@@ -703,7 +706,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto registeredCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
@@ -714,7 +717,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhausted) {
// Cursor should have been destroyed without ever being killed. To be sure that the cursor has
// not been marked kill pending but not yet destroyed (i.e. that the cursor is not a zombie), we
// reapZombieCursors() and check that the cursor still has not been killed.
- ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId).getStatus());
+ ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
ASSERT(!isMockCursorKilled(0));
getManager()->reapZombieCursors();
ASSERT(!isMockCursorKilled(0));
@@ -734,7 +737,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto registeredCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto registeredCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(registeredCursor.getStatus());
ASSERT_EQ(cursorId, registeredCursor.getValue().getCursorId());
ASSERT_NE(0, cursorId);
@@ -743,7 +746,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorReturnCursorExhaustedWithNonExhaust
ASSERT_EQ(0, registeredCursor.getValue().getCursorId());
// Cursor should be kill pending, so it will be killed during reaping.
- ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId).getStatus());
+ ASSERT_NOT_OK(getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
ASSERT(!isMockCursorKilled(0));
getManager()->reapZombieCursors();
ASSERT(isMockCursorKilled(0));
@@ -757,7 +760,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorMoveAssignmentKill) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
pinnedCursor = ClusterCursorManager::PinnedCursor();
ASSERT(!isMockCursorKilled(0));
getManager()->reapZombieCursors();
@@ -772,7 +775,7 @@ TEST_F(ClusterCursorManagerTest, PinnedCursorDestructorKill) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
}
ASSERT(!isMockCursorKilled(0));
getManager()->reapZombieCursors();
@@ -790,7 +793,7 @@ TEST_F(ClusterCursorManagerTest, RemotesExhausted) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_FALSE(pinnedCursor.getValue().remotesExhausted());
}
@@ -802,7 +805,7 @@ TEST_F(ClusterCursorManagerTest, DoNotReapKilledPinnedCursors) {
nss,
ClusterCursorManager::CursorType::NamespaceNotSharded,
ClusterCursorManager::CursorLifetime::Mortal));
- auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId);
+ auto pinnedCursor = getManager()->checkOutCursor(nss, cursorId, nullptr);
ASSERT_OK(pinnedCursor.getStatus());
ASSERT_OK(getManager()->killCursor(nss, cursorId));
ASSERT(!isMockCursorKilled(0));
@@ -850,7 +853,7 @@ TEST_F(ClusterCursorManagerTest, CannotCheckoutCursorDuringShutdown) {
ASSERT(isMockCursorKilled(0));
ASSERT_EQUALS(ErrorCodes::ShutdownInProgress,
- getManager()->checkOutCursor(nss, cursorId).getStatus());
+ getManager()->checkOutCursor(nss, cursorId, nullptr).getStatus());
}
} // namespace
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 8f7b260e7a1..57d6ece2c87 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -180,6 +180,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* txn,
params.isTailable = query.getQueryRequest().isTailable();
params.isAwaitData = query.getQueryRequest().isAwaitData();
params.isAllowPartialResults = query.getQueryRequest().isAllowPartialResults();
+ params.txn = txn;
// This is the batchSize passed to each subsequent getMore command issued by the cursor. We
// usually use the batchSize associated with the initial find, but as it is illegal to send a
@@ -358,7 +359,7 @@ StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* txn,
const GetMoreRequest& request) {
auto cursorManager = grid.getCursorManager();
- auto pinnedCursor = cursorManager->checkOutCursor(request.nss, request.cursorid);
+ auto pinnedCursor = cursorManager->checkOutCursor(request.nss, request.cursorid, txn);
if (!pinnedCursor.isOK()) {
return pinnedCursor.getStatus();
}
diff --git a/src/mongo/s/query/router_exec_stage.h b/src/mongo/s/query/router_exec_stage.h
index 1f2dbdf6c7c..0e10d9edff2 100644
--- a/src/mongo/s/query/router_exec_stage.h
+++ b/src/mongo/s/query/router_exec_stage.h
@@ -37,6 +37,8 @@
namespace mongo {
+class OperationContext;
+
/**
* This is the lightweight mongoS analogue of the PlanStage abstraction used to execute queries on
* mongoD (see mongo/db/plan_stage.h).
@@ -85,6 +87,13 @@ public:
*/
virtual Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) = 0;
+ /**
+ * Update the operation context for remote requests.
+ *
+ * Network requests depend on having a valid operation context for user initiated actions.
+ */
+ virtual void setOperationContext(OperationContext* txn) = 0;
+
protected:
/**
* Returns an unowned pointer to the child stage, or nullptr if there is no child.
diff --git a/src/mongo/s/query/router_stage_limit.cpp b/src/mongo/s/query/router_stage_limit.cpp
index 9a9f77fbf00..5f7db02ec7b 100644
--- a/src/mongo/s/query/router_stage_limit.cpp
+++ b/src/mongo/s/query/router_stage_limit.cpp
@@ -67,4 +67,8 @@ Status RouterStageLimit::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
}
+void RouterStageLimit::setOperationContext(OperationContext* txn) {
+ return getChildStage()->setOperationContext(txn);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_limit.h b/src/mongo/s/query/router_stage_limit.h
index 0db06c30c3b..366a964f2a4 100644
--- a/src/mongo/s/query/router_stage_limit.h
+++ b/src/mongo/s/query/router_stage_limit.h
@@ -47,6 +47,8 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ void setOperationContext(OperationContext* txn) final;
+
private:
long long _limit;
diff --git a/src/mongo/s/query/router_stage_merge.cpp b/src/mongo/s/query/router_stage_merge.cpp
index 527bc0f0063..0e9304d9952 100644
--- a/src/mongo/s/query/router_stage_merge.cpp
+++ b/src/mongo/s/query/router_stage_merge.cpp
@@ -68,4 +68,8 @@ Status RouterStageMerge::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return _arm.setAwaitDataTimeout(awaitDataTimeout);
}
+void RouterStageMerge::setOperationContext(OperationContext* txn) {
+ return _arm.setOperationContext(txn);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index d74870f8a94..a75d5a46f8d 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -53,6 +53,8 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ void setOperationContext(OperationContext* txn) final;
+
private:
// Not owned here.
executor::TaskExecutor* _executor;
diff --git a/src/mongo/s/query/router_stage_mock.cpp b/src/mongo/s/query/router_stage_mock.cpp
index 179635bbb08..daad6fe6d07 100644
--- a/src/mongo/s/query/router_stage_mock.cpp
+++ b/src/mongo/s/query/router_stage_mock.cpp
@@ -73,6 +73,10 @@ Status RouterStageMock::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return Status::OK();
}
+void RouterStageMock::setOperationContext(OperationContext* txn) {
+ // Do nothing
+}
+
StatusWith<Milliseconds> RouterStageMock::getAwaitDataTimeout() {
if (!_awaitDataTimeout) {
return Status(ErrorCodes::BadValue, "no awaitData timeout set");
diff --git a/src/mongo/s/query/router_stage_mock.h b/src/mongo/s/query/router_stage_mock.h
index b83e2879096..255ae75b595 100644
--- a/src/mongo/s/query/router_stage_mock.h
+++ b/src/mongo/s/query/router_stage_mock.h
@@ -51,6 +51,8 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ void setOperationContext(OperationContext* txn) final;
+
/**
* Queues a BSONObj to be returned.
*/
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.cpp b/src/mongo/s/query/router_stage_remove_sortkey.cpp
index 16e6f9407a4..949182e3a1a 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.cpp
+++ b/src/mongo/s/query/router_stage_remove_sortkey.cpp
@@ -69,4 +69,8 @@ Status RouterStageRemoveSortKey::setAwaitDataTimeout(Milliseconds awaitDataTimeo
return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
}
+void RouterStageRemoveSortKey::setOperationContext(OperationContext* txn) {
+ return getChildStage()->setOperationContext(txn);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_remove_sortkey.h b/src/mongo/s/query/router_stage_remove_sortkey.h
index 6ef60012a4d..f7965312d47 100644
--- a/src/mongo/s/query/router_stage_remove_sortkey.h
+++ b/src/mongo/s/query/router_stage_remove_sortkey.h
@@ -48,6 +48,8 @@ public:
bool remotesExhausted() final;
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+
+ void setOperationContext(OperationContext* txn) final;
};
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.cpp b/src/mongo/s/query/router_stage_skip.cpp
index 536c3d173a2..af746d5e430 100644
--- a/src/mongo/s/query/router_stage_skip.cpp
+++ b/src/mongo/s/query/router_stage_skip.cpp
@@ -68,4 +68,8 @@ Status RouterStageSkip::setAwaitDataTimeout(Milliseconds awaitDataTimeout) {
return getChildStage()->setAwaitDataTimeout(awaitDataTimeout);
}
+void RouterStageSkip::setOperationContext(OperationContext* txn) {
+ return getChildStage()->setOperationContext(txn);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/query/router_stage_skip.h b/src/mongo/s/query/router_stage_skip.h
index 35994d31e3e..430d3748c91 100644
--- a/src/mongo/s/query/router_stage_skip.h
+++ b/src/mongo/s/query/router_stage_skip.h
@@ -47,6 +47,8 @@ public:
Status setAwaitDataTimeout(Milliseconds awaitDataTimeout) final;
+ void setOperationContext(OperationContext* txn) final;
+
private:
long long _skip;
diff --git a/src/mongo/s/sharding_egress_metadata_hook.cpp b/src/mongo/s/sharding_egress_metadata_hook.cpp
index 0ed82987534..49ee7c03d60 100644
--- a/src/mongo/s/sharding_egress_metadata_hook.cpp
+++ b/src/mongo/s/sharding_egress_metadata_hook.cpp
@@ -50,10 +50,11 @@ namespace rpc {
using std::shared_ptr;
Status ShardingEgressMetadataHook::writeRequestMetadata(bool shardedConnection,
+ OperationContext* txn,
const StringData target,
BSONObjBuilder* metadataBob) {
try {
- audit::writeImpersonatedUsersToMetadata(metadataBob);
+ audit::writeImpersonatedUsersToMetadata(txn, metadataBob);
if (!shardedConnection) {
return Status::OK();
}
@@ -64,10 +65,11 @@ Status ShardingEgressMetadataHook::writeRequestMetadata(bool shardedConnection,
}
}
-Status ShardingEgressMetadataHook::writeRequestMetadata(const HostAndPort& target,
+Status ShardingEgressMetadataHook::writeRequestMetadata(OperationContext* txn,
+ const HostAndPort& target,
BSONObjBuilder* metadataBob) {
try {
- audit::writeImpersonatedUsersToMetadata(metadataBob);
+ audit::writeImpersonatedUsersToMetadata(txn, metadataBob);
rpc::ConfigServerMetadata(_getConfigServerOpTime()).writeToMetadata(metadataBob);
return Status::OK();
} catch (...) {
diff --git a/src/mongo/s/sharding_egress_metadata_hook.h b/src/mongo/s/sharding_egress_metadata_hook.h
index c510aefaab4..df105c813bf 100644
--- a/src/mongo/s/sharding_egress_metadata_hook.h
+++ b/src/mongo/s/sharding_egress_metadata_hook.h
@@ -46,7 +46,9 @@ public:
virtual ~ShardingEgressMetadataHook() = default;
Status readReplyMetadata(const HostAndPort& replySource, const BSONObj& metadataObj) override;
- Status writeRequestMetadata(const HostAndPort& target, BSONObjBuilder* metadataBob) override;
+ Status writeRequestMetadata(OperationContext* txn,
+ const HostAndPort& target,
+ BSONObjBuilder* metadataBob) override;
// These overloaded methods exist to allow ShardingConnectionHook, which is soon to be
// deprecated, to use the logic in ShardingEgressMetadataHook instead of duplicating the
@@ -55,6 +57,7 @@ public:
// contact.
Status readReplyMetadata(const StringData replySource, const BSONObj& metadataObj);
Status writeRequestMetadata(bool shardedConnection,
+ OperationContext* txn,
const StringData target,
BSONObjBuilder* metadataBob);
diff --git a/src/mongo/s/sharding_initialization.cpp b/src/mongo/s/sharding_initialization.cpp
index 61f0ed8d66f..6dde6355a82 100644
--- a/src/mongo/s/sharding_initialization.cpp
+++ b/src/mongo/s/sharding_initialization.cpp
@@ -95,13 +95,13 @@ std::unique_ptr<ShardingCatalogClient> makeCatalogClient(ServiceContext* service
std::unique_ptr<TaskExecutorPool> makeTaskExecutorPool(
std::unique_ptr<NetworkInterface> fixedNet,
- std::unique_ptr<rpc::EgressMetadataHook> metadataHook) {
+ rpc::ShardingEgressMetadataHookBuilder metadataHookBuilder) {
std::vector<std::unique_ptr<executor::TaskExecutor>> executors;
for (size_t i = 0; i < TaskExecutorPool::getSuggestedPoolSize(); ++i) {
auto net = executor::makeNetworkInterface(
"NetworkInterfaceASIO-TaskExecutorPool-" + std::to_string(i),
stdx::make_unique<ShardingNetworkConnectionHook>(),
- std::move(metadataHook));
+ metadataHookBuilder());
auto netPtr = net.get();
auto exec = stdx::make_unique<ThreadPoolTaskExecutor>(
stdx::make_unique<NetworkInterfaceThreadPool>(netPtr), std::move(net));
@@ -148,7 +148,7 @@ Status initializeGlobalShardingState(OperationContext* txn,
stdx::make_unique<ShardingNetworkConnectionHook>(),
hookBuilder());
auto networkPtr = network.get();
- auto executorPool = makeTaskExecutorPool(std::move(network), hookBuilder());
+ auto executorPool = makeTaskExecutorPool(std::move(network), hookBuilder);
executorPool->startup();
auto shardRegistry(stdx::make_unique<ShardRegistry>(std::move(shardFactory), configCS));