summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSara Golemon <sara.golemon@mongodb.com>2017-08-08 13:34:39 -0400
committerSara Golemon <sara.golemon@mongodb.com>2017-08-23 09:18:17 -0400
commita09f19822fb2a1e5b662bd8d542dd8e2f2607fc6 (patch)
tree4d396a07bdf8db1752aa4952211f4b46490459b7 /src
parent3b8719aecf7541ee83738d9241bfcbc1281b6ed2 (diff)
downloadmongo-a09f19822fb2a1e5b662bd8d542dd8e2f2607fc6.tar.gz
SERVER-29628 $listLocalSessions aggregation stage
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/auth/action_types.txt1
-rw-r--r--src/mongo/db/auth/authorization_session.cpp4
-rw-r--r--src/mongo/db/auth/role_graph_builtin_roles.cpp2
-rw-r--r--src/mongo/db/logical_session_cache.h18
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp32
-rw-r--r--src/mongo/db/logical_session_cache_impl.h9
-rw-r--r--src/mongo/db/logical_session_cache_noop.h13
-rw-r--r--src/mongo/db/logical_session_id_helpers.cpp23
-rw-r--r--src/mongo/db/logical_session_id_helpers.h10
-rw-r--r--src/mongo/db/pipeline/SConscript2
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h4
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_sessions.cpp166
-rw-r--r--src/mongo/db/pipeline/document_source_list_local_sessions.h118
-rw-r--r--src/mongo/db/pipeline/document_sources.idl18
-rw-r--r--src/mongo/db/pipeline/lite_parsed_document_source.h7
-rw-r--r--src/mongo/db/pipeline/lite_parsed_pipeline.h9
-rw-r--r--src/mongo/s/commands/cluster_aggregate.cpp6
17 files changed, 428 insertions, 14 deletions
diff --git a/src/mongo/db/auth/action_types.txt b/src/mongo/db/auth/action_types.txt
index d3b2745d319..06185ac9de9 100644
--- a/src/mongo/db/auth/action_types.txt
+++ b/src/mongo/db/auth/action_types.txt
@@ -70,6 +70,7 @@
"listCollections",
"listDatabases",
"listIndexes",
+"listSessions",
"listShards",
"logRotate",
"moveChunk",
diff --git a/src/mongo/db/auth/authorization_session.cpp b/src/mongo/db/auth/authorization_session.cpp
index 201acb11b00..86cba41f616 100644
--- a/src/mongo/db/auth/authorization_session.cpp
+++ b/src/mongo/db/auth/authorization_session.cpp
@@ -200,10 +200,10 @@ User* AuthorizationSession::getSingleUser() {
if (userNameItr.more()) {
userName = userNameItr.next();
if (userNameItr.more()) {
- uasserted(ErrorCodes::Unauthorized, "there are no users authenticated");
+ uasserted(ErrorCodes::Unauthorized, "too many users are authenticated");
}
} else {
- uasserted(ErrorCodes::Unauthorized, "too many users are authenticated");
+ uasserted(ErrorCodes::Unauthorized, "there are no users authenticated");
}
return lookupUser(userName);
diff --git a/src/mongo/db/auth/role_graph_builtin_roles.cpp b/src/mongo/db/auth/role_graph_builtin_roles.cpp
index 36cc6b4c15a..0ab6e26d052 100644
--- a/src/mongo/db/auth/role_graph_builtin_roles.cpp
+++ b/src/mongo/db/auth/role_graph_builtin_roles.cpp
@@ -186,6 +186,7 @@ MONGO_INITIALIZER(AuthorizationBuiltinRoles)(InitializerContext* context) {
<< ActionType::getShardMap
<< ActionType::hostInfo
<< ActionType::listDatabases
+ << ActionType::listSessions // clusterManager gets this also
<< ActionType::listShards // clusterManager gets this also
<< ActionType::netstat
<< ActionType::replSetGetConfig // clusterManager gets this also
@@ -238,6 +239,7 @@ MONGO_INITIALIZER(AuthorizationBuiltinRoles)(InitializerContext* context) {
<< ActionType::resync // hostManager gets this also
<< ActionType::addShard
<< ActionType::removeShard
+ << ActionType::listSessions // clusterMonitor gets this also
<< ActionType::listShards // clusterMonitor gets this also
<< ActionType::flushRouterConfig // hostManager gets this also
<< ActionType::cleanupOrphaned;
diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h
index dfc1772013a..8e7ba166874 100644
--- a/src/mongo/db/logical_session_cache.h
+++ b/src/mongo/db/logical_session_cache.h
@@ -28,6 +28,8 @@
#pragma once
+#include <boost/optional.hpp>
+
#include "mongo/base/status.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/refresh_sessions_gen.h"
@@ -103,6 +105,22 @@ public:
* Returns the number of session records currently in the cache.
*/
virtual size_t size() = 0;
+
+ /**
+ * Ennumerate all LogicalSessionId keys currently in the cache.
+ */
+ virtual std::vector<LogicalSessionId> listIds() const = 0;
+
+ /**
+ * Ennumerate all LogicalSessionId keys in the cache for the given UserDigests.
+ */
+ virtual std::vector<LogicalSessionId> listIds(
+ const std::vector<SHA256Block>& userDigest) const = 0;
+
+ /**
+ * Retrieve a LogicalSessionRecord by LogicalSessionId, if it exists in the cache.
+ */
+ virtual boost::optional<LogicalSessionRecord> peekCached(const LogicalSessionId& id) const = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index c6dd6df3630..5cb38a51b08 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -263,4 +263,36 @@ boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::_addToCache(
return _cache.add(record.getId(), std::move(record));
}
+std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ std::vector<LogicalSessionId> ret;
+ ret.reserve(_cache.size());
+ for (const auto& id : _cache) {
+ ret.push_back(id.first);
+ }
+ return ret;
+}
+
+std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
+ const std::vector<SHA256Block>& userDigests) const {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ std::vector<LogicalSessionId> ret;
+ for (const auto& it : _cache) {
+ if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) !=
+ userDigests.cend()) {
+ ret.push_back(it.first);
+ }
+ }
+ return ret;
+}
+
+boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached(
+ const LogicalSessionId& id) const {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ const auto it = _cache.cfind(id);
+ if (it == _cache.cend()) {
+ return boost::none;
+ }
+ return it->second;
+}
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h
index 92b8dd7301c..47a2f897f3b 100644
--- a/src/mongo/db/logical_session_cache_impl.h
+++ b/src/mongo/db/logical_session_cache_impl.h
@@ -123,6 +123,13 @@ public:
size_t size() override;
+ std::vector<LogicalSessionId> listIds() const override;
+
+ std::vector<LogicalSessionId> listIds(
+ const std::vector<SHA256Block>& userDigest) const override;
+
+ boost::optional<LogicalSessionRecord> peekCached(const LogicalSessionId& id) const override;
+
private:
/**
* Internal methods to handle scheduling and perform refreshes for active
@@ -147,7 +154,7 @@ private:
std::unique_ptr<ServiceLiason> _service;
std::unique_ptr<SessionsCollection> _sessionsColl;
- stdx::mutex _cacheMutex;
+ mutable stdx::mutex _cacheMutex;
LRUCache<LogicalSessionId, LogicalSessionRecord, LogicalSessionIdHash> _cache;
};
diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h
index 59785ff240e..7c5e0be1784 100644
--- a/src/mongo/db/logical_session_cache_noop.h
+++ b/src/mongo/db/logical_session_cache_noop.h
@@ -73,6 +73,19 @@ public:
size_t size() override {
return 0;
}
+
+ std::vector<LogicalSessionId> listIds() const override {
+ return {};
+ }
+
+ std::vector<LogicalSessionId> listIds(
+ const std::vector<SHA256Block>& userDigest) const override {
+ return {};
+ }
+
+ boost::optional<LogicalSessionRecord> peekCached(const LogicalSessionId& id) const override {
+ return boost::none;
+ }
};
} // namespace mongo
diff --git a/src/mongo/db/logical_session_id_helpers.cpp b/src/mongo/db/logical_session_id_helpers.cpp
index 6e47690a111..e3de10a12b7 100644
--- a/src/mongo/db/logical_session_id_helpers.cpp
+++ b/src/mongo/db/logical_session_id_helpers.cpp
@@ -32,26 +32,26 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/auth/user.h"
+#include "mongo/db/auth/user_name.h"
+#include "mongo/db/logical_session_cache.h"
#include "mongo/db/operation_context.h"
namespace mongo {
-namespace {
-
/**
* This is a safe hash that will not collide with a username because all full usernames include an
* '@' character.
*/
const auto kNoAuthDigest = SHA256Block::computeHash(reinterpret_cast<const uint8_t*>(""), 0);
-SHA256Block lookupUserDigest(OperationContext* opCtx) {
+SHA256Block getLogicalSessionUserDigestForLoggedInUser(const OperationContext* opCtx) {
auto client = opCtx->getClient();
ServiceContext* serviceContext = client->getServiceContext();
if (AuthorizationManager::get(serviceContext)->isAuthEnabled()) {
UserName userName;
- auto user = AuthorizationSession::get(client)->getSingleUser();
+ const auto user = AuthorizationSession::get(client)->getSingleUser();
invariant(user);
return user->getDigest();
@@ -60,7 +60,14 @@ SHA256Block lookupUserDigest(OperationContext* opCtx) {
}
}
-} // namespace
+SHA256Block getLogicalSessionUserDigestFor(StringData user, StringData db) {
+ if (user.empty() && db.empty()) {
+ return kNoAuthDigest;
+ }
+ const UserName un(user, db);
+ const auto& fn = un.getFullName();
+ return SHA256Block::computeHash({ConstDataRange(fn.c_str(), fn.size())});
+}
LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& fromClient,
OperationContext* opCtx,
@@ -81,11 +88,11 @@ LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& fromClient
}) ||
authSession->isAuthorizedForPrivilege(Privilege(
ResourcePattern::forClusterResource(), ActionType::impersonate)) ||
- lookupUserDigest(opCtx) == fromClient.getUid());
+ getLogicalSessionUserDigestForLoggedInUser(opCtx) == fromClient.getUid());
lsid.setUid(*fromClient.getUid());
} else {
- lsid.setUid(lookupUserDigest(opCtx));
+ lsid.setUid(getLogicalSessionUserDigestForLoggedInUser(opCtx));
}
return lsid;
@@ -95,7 +102,7 @@ LogicalSessionId makeLogicalSessionId(OperationContext* opCtx) {
LogicalSessionId id{};
id.setId(UUID::gen());
- id.setUid(lookupUserDigest(opCtx));
+ id.setUid(getLogicalSessionUserDigestForLoggedInUser(opCtx));
return id;
}
diff --git a/src/mongo/db/logical_session_id_helpers.h b/src/mongo/db/logical_session_id_helpers.h
index 2b4c2e76e61..a913ac3c769 100644
--- a/src/mongo/db/logical_session_id_helpers.h
+++ b/src/mongo/db/logical_session_id_helpers.h
@@ -37,6 +37,16 @@
namespace mongo {
/**
+ * Get the currently logged in user's UID digest.
+ */
+SHA256Block getLogicalSessionUserDigestForLoggedInUser(const OperationContext* opCtx);
+
+/**
+ * Get a user digest for a specific user/db identifier.
+ */
+SHA256Block getLogicalSessionUserDigestFor(StringData user, StringData db);
+
+/**
* Factory functions to generate logical session records.
*/
LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& lsid,
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 1d622a2c8cf..ce3a9c4c43e 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -244,6 +244,7 @@ docSourceEnv.Library(
'document_source_internal_inhibit_optimization.cpp',
'document_source_internal_split_pipeline.cpp',
'document_source_limit.cpp',
+ 'document_source_list_local_sessions.cpp',
'document_source_match.cpp',
'document_source_merge_cursors.cpp',
'document_source_mock.cpp',
@@ -263,6 +264,7 @@ docSourceEnv.Library(
'$BUILD_DIR/mongo/client/clientdriver',
'$BUILD_DIR/mongo/db/bson/dotted_path_support',
'$BUILD_DIR/mongo/db/index/key_generator',
+ '$BUILD_DIR/mongo/db/logical_session_cache_impl',
'$BUILD_DIR/mongo/db/matcher/expression_algo',
'$BUILD_DIR/mongo/db/matcher/expressions',
'$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source',
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 39aed452ed4..93dcd5483b8 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -61,6 +61,10 @@ public:
PrivilegeVector requiredPrivileges(bool isMongos) const final {
return {};
}
+
+ bool allowedToForwardFromMongos() const final {
+ return false;
+ }
};
class Transformation : public DocumentSourceSingleDocumentTransformation::TransformerInterface {
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.cpp b/src/mongo/db/pipeline/document_source_list_local_sessions.cpp
new file mode 100644
index 00000000000..af4d454d208
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.cpp
@@ -0,0 +1,166 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/auth/user_name.h"
+#include "mongo/db/logical_session_id_helpers.h"
+#include "mongo/db/pipeline/document_source_list_local_sessions.h"
+#include "mongo/db/pipeline/document_sources_gen.h"
+
+namespace mongo {
+
+REGISTER_DOCUMENT_SOURCE(listLocalSessions,
+ DocumentSourceListLocalSessions::LiteParsed::parse,
+ DocumentSourceListLocalSessions::createFromBson);
+
+const char* DocumentSourceListLocalSessions::kStageName = "$listLocalSessions";
+
+DocumentSource::GetNextResult DocumentSourceListLocalSessions::getNext() {
+ pExpCtx->checkForInterrupt();
+
+ while (!_ids.empty()) {
+ const auto& id = _ids.back();
+ _ids.pop_back();
+
+ const auto& record = _cache->peekCached(id);
+ if (!record) {
+ // It's possible for SessionRecords to have expired while we're walking
+ continue;
+ }
+ return Document(record->toBSON());
+ }
+
+ return GetNextResult::makeEOF();
+}
+
+boost::intrusive_ptr<DocumentSource> DocumentSourceListLocalSessions::createFromBson(
+ BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) {
+
+ uassert(
+ ErrorCodes::InvalidNamespace,
+ str::stream() << kStageName
+ << " must be run against the database with {aggregate: 1}, not a collection",
+ pExpCtx->ns.isCollectionlessAggregateNS());
+
+ return new DocumentSourceListLocalSessions(pExpCtx, listSessionsParseSpec(kStageName, spec));
+}
+
+DocumentSourceListLocalSessions::DocumentSourceListLocalSessions(
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx, const ListSessionsSpec& spec)
+ : DocumentSource(pExpCtx), _spec(spec) {
+ const auto& opCtx = pExpCtx->opCtx;
+ _cache = LogicalSessionCache::get(opCtx);
+ if (_spec.getAllUsers()) {
+ invariant(!_spec.getUsers() || _spec.getUsers()->empty());
+ _ids = _cache->listIds();
+ } else {
+ _ids = _cache->listIds(listSessionsUsersToDigests(_spec.getUsers().get()));
+ }
+}
+
+namespace {
+ListSessionsUser getUserNameForLoggedInUser(const OperationContext* opCtx) {
+ auto* client = opCtx->getClient();
+
+ ListSessionsUser user;
+ if (AuthorizationManager::get(client->getServiceContext())->isAuthEnabled()) {
+ const auto& userName = AuthorizationSession::get(client)->getSingleUser()->getName();
+ user.setUser(userName.getUser());
+ user.setDb(userName.getDB());
+ } else {
+ user.setUser("");
+ user.setDb("");
+ }
+ return user;
+}
+
+bool operator==(const ListSessionsUser& user1, const ListSessionsUser& user2) {
+ return std::tie(user1.getUser(), user1.getDb()) == std::tie(user2.getUser(), user2.getDb());
+}
+} // namespace
+
+} // namespace mongo
+
+std::vector<mongo::SHA256Block> mongo::listSessionsUsersToDigests(
+ const std::vector<ListSessionsUser>& users) {
+ std::vector<SHA256Block> ret;
+ ret.reserve(users.size());
+ for (const auto& user : users) {
+ ret.push_back(getLogicalSessionUserDigestFor(user.getUser(), user.getDb()));
+ }
+ return ret;
+}
+
+mongo::PrivilegeVector mongo::listSessionsRequiredPrivileges(const ListSessionsSpec& spec) {
+ const auto needsPrivs = ([spec]() {
+ if (spec.getAllUsers()) {
+ return true;
+ }
+ // parseSpec should ensure users is non-empty.
+ invariant(spec.getUsers());
+
+ const auto& myName =
+ getUserNameForLoggedInUser(Client::getCurrent()->getOperationContext());
+ const auto& users = spec.getUsers().get();
+ return !std::all_of(
+ users.cbegin(), users.cend(), [myName](const auto& name) { return myName == name; });
+ })();
+
+ if (needsPrivs) {
+ return {Privilege(ResourcePattern::forClusterResource(), ActionType::listSessions)};
+ } else {
+ return PrivilegeVector();
+ }
+}
+
+mongo::ListSessionsSpec mongo::listSessionsParseSpec(StringData stageName,
+ const BSONElement& spec) {
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << stageName << " options must be specified in an object, but found: "
+ << typeName(spec.type()),
+ spec.type() == BSONType::Object);
+
+ IDLParserErrorContext ctx(stageName);
+ auto ret = ListSessionsSpec::parse(ctx, spec.Obj());
+
+ uassert(ErrorCodes::UnsupportedFormat,
+ str::stream() << stageName
+ << " may not specify {allUsers:true} and {users:[...]} at the same time",
+ !ret.getAllUsers() || !ret.getUsers() || ret.getUsers()->empty());
+
+ if (!ret.getAllUsers() && (!ret.getUsers() || ret.getUsers()->empty())) {
+ // Implicit request for self
+ const auto& userName =
+ getUserNameForLoggedInUser(Client::getCurrent()->getOperationContext());
+ ret.setUsers(std::vector<ListSessionsUser>({userName}));
+ }
+
+ return ret;
+}
diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h
new file mode 100644
index 00000000000..8c386f0ecaa
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h
@@ -0,0 +1,118 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/crypto/sha256_block.h"
+#include "mongo/db/logical_session_cache.h"
+#include "mongo/db/pipeline/document_source.h"
+#include "mongo/db/pipeline/document_sources_gen.h"
+#include "mongo/db/pipeline/lite_parsed_document_source.h"
+
+namespace mongo {
+
+ListSessionsSpec listSessionsParseSpec(StringData stageName, const BSONElement& spec);
+PrivilegeVector listSessionsRequiredPrivileges(const ListSessionsSpec& spec);
+std::vector<SHA256Block> listSessionsUsersToDigests(const std::vector<ListSessionsUser>& users);
+
+/**
+ * Produces one document per session in the local cache if 'allUsers' is specified
+ * as true, and returns just sessions for the currently logged in user if
+ * 'allUsers' is specified as false, or not specified at all.
+ */
+class DocumentSourceListLocalSessions final : public DocumentSource {
+public:
+ static const char* kStageName;
+
+ class LiteParsed final : public LiteParsedDocumentSource {
+ public:
+ static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
+ const BSONElement& spec) {
+
+ return stdx::make_unique<LiteParsed>(listSessionsParseSpec(kStageName, spec));
+ }
+
+ explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {}
+
+ stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final {
+ return stdx::unordered_set<NamespaceString>();
+ }
+
+ PrivilegeVector requiredPrivileges(bool isMongos) const final {
+ return listSessionsRequiredPrivileges(_spec);
+ }
+
+ bool isInitialSource() const final {
+ return true;
+ }
+
+ bool allowedToForwardFromMongos() const final {
+ return false;
+ }
+
+ private:
+ const ListSessionsSpec _spec;
+ };
+
+ GetNextResult getNext() final;
+
+ const char* getSourceName() const final {
+ return kStageName;
+ }
+
+ Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final {
+ return Value(Document{{getSourceName(), _spec.toBSON()}});
+ }
+
+ StageConstraints constraints() const final {
+ StageConstraints constraints;
+ constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst;
+ constraints.hostRequirement = StageConstraints::HostTypeRequirement::kAnyShardOrMongoS;
+ constraints.requiresInputDocSource = false;
+ constraints.isAllowedInsideFacetStage = false;
+ constraints.isIndependentOfAnyCollection = true;
+ return constraints;
+ }
+
+ static boost::intrusive_ptr<DocumentSource> createFromBson(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+
+private:
+ DocumentSourceListLocalSessions(const boost::intrusive_ptr<ExpressionContext>& pExpCtx,
+ const ListSessionsSpec& spec);
+
+ const ListSessionsSpec _spec;
+ const LogicalSessionCache* _cache;
+ std::vector<LogicalSessionId> _ids;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl
index 52f6803f69b..e329a286c5e 100644
--- a/src/mongo/db/pipeline/document_sources.idl
+++ b/src/mongo/db/pipeline/document_sources.idl
@@ -90,6 +90,24 @@ structs:
type: timestamp
description: The timestamp of the logical time
+ ListSessionsUser:
+ description: "A struct representing a $listSessions/$listLocalSessions User"
+ strict: true
+ fields:
+ user: string
+ db: string
+
+ ListSessionsSpec:
+ description: "$listSessions and $listLocalSessions pipeline spec"
+ strict: true
+ fields:
+ allUsers:
+ type: bool
+ default: false
+ users:
+ type: array<ListSessionsUser>
+ optional: true
+
ResumeTokenInternal:
description: The internal format of a resume token. For use by the ResumeToken class
only.
diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h
index 60e2e8bd8ee..bfe69d6af00 100644
--- a/src/mongo/db/pipeline/lite_parsed_document_source.h
+++ b/src/mongo/db/pipeline/lite_parsed_document_source.h
@@ -111,6 +111,13 @@ public:
virtual bool isInitialSource() const {
return false;
}
+
+ /**
+ * Returns true if this stage may be forwarded to shards from a mongos.
+ */
+ virtual bool allowedToForwardFromMongos() const {
+ return true;
+ }
};
class LiteParsedDocumentSourceDefault final : public LiteParsedDocumentSource {
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index a83a1e3c8a0..7a9f7625604 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -102,6 +102,15 @@ public:
});
}
+ /**
+ * Returns false if the pipeline has any stage which must be run locally on mongos.
+ */
+ bool allowedToForwardFromMongos() const {
+ return std::all_of(_stageSpecs.cbegin(), _stageSpecs.cend(), [](const auto& spec) {
+ return spec->allowedToForwardFromMongos();
+ });
+ }
+
private:
std::vector<std::unique_ptr<LiteParsedDocumentSource>> _stageSpecs;
};
diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp
index b6c326b53cb..4788de5b5a1 100644
--- a/src/mongo/s/commands/cluster_aggregate.cpp
+++ b/src/mongo/s/commands/cluster_aggregate.cpp
@@ -415,10 +415,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx,
StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces;
LiteParsedPipeline liteParsedPipeline(request);
- // TODO SERVER-29141 support $changeStream on mongos.
+ // TODO SERVER-29141 support forcing pipeline to run on Mongos.
uassert(40567,
- "$changeStream is not yet supported on mongos",
- !liteParsedPipeline.hasChangeStream());
+ "Unable to force mongos-only stage to run on mongos",
+ liteParsedPipeline.allowedToForwardFromMongos());
for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) {
const auto resolvedNsRoutingInfo =