diff options
author | Sara Golemon <sara.golemon@mongodb.com> | 2017-08-08 13:34:39 -0400 |
---|---|---|
committer | Sara Golemon <sara.golemon@mongodb.com> | 2017-08-23 09:18:17 -0400 |
commit | a09f19822fb2a1e5b662bd8d542dd8e2f2607fc6 (patch) | |
tree | 4d396a07bdf8db1752aa4952211f4b46490459b7 /src | |
parent | 3b8719aecf7541ee83738d9241bfcbc1281b6ed2 (diff) | |
download | mongo-a09f19822fb2a1e5b662bd8d542dd8e2f2607fc6.tar.gz |
SERVER-29628 $listLocalSessions aggregation stage
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/auth/action_types.txt | 1 | ||||
-rw-r--r-- | src/mongo/db/auth/authorization_session.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/auth/role_graph_builtin_roles.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache.h | 18 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.h | 9 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_noop.h | 13 | ||||
-rw-r--r-- | src/mongo/db/logical_session_id_helpers.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/logical_session_id_helpers.h | 10 | ||||
-rw-r--r-- | src/mongo/db/pipeline/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_change_stream.h | 4 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_list_local_sessions.cpp | 166 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_source_list_local_sessions.h | 118 | ||||
-rw-r--r-- | src/mongo/db/pipeline/document_sources.idl | 18 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_document_source.h | 7 | ||||
-rw-r--r-- | src/mongo/db/pipeline/lite_parsed_pipeline.h | 9 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_aggregate.cpp | 6 |
17 files changed, 428 insertions, 14 deletions
diff --git a/src/mongo/db/auth/action_types.txt b/src/mongo/db/auth/action_types.txt index d3b2745d319..06185ac9de9 100644 --- a/src/mongo/db/auth/action_types.txt +++ b/src/mongo/db/auth/action_types.txt @@ -70,6 +70,7 @@ "listCollections", "listDatabases", "listIndexes", +"listSessions", "listShards", "logRotate", "moveChunk", diff --git a/src/mongo/db/auth/authorization_session.cpp b/src/mongo/db/auth/authorization_session.cpp index 201acb11b00..86cba41f616 100644 --- a/src/mongo/db/auth/authorization_session.cpp +++ b/src/mongo/db/auth/authorization_session.cpp @@ -200,10 +200,10 @@ User* AuthorizationSession::getSingleUser() { if (userNameItr.more()) { userName = userNameItr.next(); if (userNameItr.more()) { - uasserted(ErrorCodes::Unauthorized, "there are no users authenticated"); + uasserted(ErrorCodes::Unauthorized, "too many users are authenticated"); } } else { - uasserted(ErrorCodes::Unauthorized, "too many users are authenticated"); + uasserted(ErrorCodes::Unauthorized, "there are no users authenticated"); } return lookupUser(userName); diff --git a/src/mongo/db/auth/role_graph_builtin_roles.cpp b/src/mongo/db/auth/role_graph_builtin_roles.cpp index 36cc6b4c15a..0ab6e26d052 100644 --- a/src/mongo/db/auth/role_graph_builtin_roles.cpp +++ b/src/mongo/db/auth/role_graph_builtin_roles.cpp @@ -186,6 +186,7 @@ MONGO_INITIALIZER(AuthorizationBuiltinRoles)(InitializerContext* context) { << ActionType::getShardMap << ActionType::hostInfo << ActionType::listDatabases + << ActionType::listSessions // clusterManager gets this also << ActionType::listShards // clusterManager gets this also << ActionType::netstat << ActionType::replSetGetConfig // clusterManager gets this also @@ -238,6 +239,7 @@ MONGO_INITIALIZER(AuthorizationBuiltinRoles)(InitializerContext* context) { << ActionType::resync // hostManager gets this also << ActionType::addShard << ActionType::removeShard + << ActionType::listSessions // clusterMonitor gets this also << ActionType::listShards // clusterMonitor gets this also << ActionType::flushRouterConfig // hostManager gets this also << ActionType::cleanupOrphaned; diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h index dfc1772013a..8e7ba166874 100644 --- a/src/mongo/db/logical_session_cache.h +++ b/src/mongo/db/logical_session_cache.h @@ -28,6 +28,8 @@ #pragma once +#include <boost/optional.hpp> + #include "mongo/base/status.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/refresh_sessions_gen.h" @@ -103,6 +105,22 @@ public: * Returns the number of session records currently in the cache. */ virtual size_t size() = 0; + + /** + * Ennumerate all LogicalSessionId keys currently in the cache. + */ + virtual std::vector<LogicalSessionId> listIds() const = 0; + + /** + * Ennumerate all LogicalSessionId keys in the cache for the given UserDigests. + */ + virtual std::vector<LogicalSessionId> listIds( + const std::vector<SHA256Block>& userDigest) const = 0; + + /** + * Retrieve a LogicalSessionRecord by LogicalSessionId, if it exists in the cache. + */ + virtual boost::optional<LogicalSessionRecord> peekCached(const LogicalSessionId& id) const = 0; }; } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index c6dd6df3630..5cb38a51b08 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -263,4 +263,36 @@ boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::_addToCache( return _cache.add(record.getId(), std::move(record)); } +std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + std::vector<LogicalSessionId> ret; + ret.reserve(_cache.size()); + for (const auto& id : _cache) { + ret.push_back(id.first); + } + return ret; +} + +std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds( + const std::vector<SHA256Block>& userDigests) const { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + std::vector<LogicalSessionId> ret; + for (const auto& it : _cache) { + if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) != + userDigests.cend()) { + ret.push_back(it.first); + } + } + return ret; +} + +boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached( + const LogicalSessionId& id) const { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + const auto it = _cache.cfind(id); + if (it == _cache.cend()) { + return boost::none; + } + return it->second; +} } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h index 92b8dd7301c..47a2f897f3b 100644 --- a/src/mongo/db/logical_session_cache_impl.h +++ b/src/mongo/db/logical_session_cache_impl.h @@ -123,6 +123,13 @@ public: size_t size() override; + std::vector<LogicalSessionId> listIds() const override; + + std::vector<LogicalSessionId> listIds( + const std::vector<SHA256Block>& userDigest) const override; + + boost::optional<LogicalSessionRecord> peekCached(const LogicalSessionId& id) const override; + private: /** * Internal methods to handle scheduling and perform refreshes for active @@ -147,7 +154,7 @@ private: std::unique_ptr<ServiceLiason> _service; std::unique_ptr<SessionsCollection> _sessionsColl; - stdx::mutex _cacheMutex; + mutable stdx::mutex _cacheMutex; LRUCache<LogicalSessionId, LogicalSessionRecord, LogicalSessionIdHash> _cache; }; diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h index 59785ff240e..7c5e0be1784 100644 --- a/src/mongo/db/logical_session_cache_noop.h +++ b/src/mongo/db/logical_session_cache_noop.h @@ -73,6 +73,19 @@ public: size_t size() override { return 0; } + + std::vector<LogicalSessionId> listIds() const override { + return {}; + } + + std::vector<LogicalSessionId> listIds( + const std::vector<SHA256Block>& userDigest) const override { + return {}; + } + + boost::optional<LogicalSessionRecord> peekCached(const LogicalSessionId& id) const override { + return boost::none; + } }; } // namespace mongo diff --git a/src/mongo/db/logical_session_id_helpers.cpp b/src/mongo/db/logical_session_id_helpers.cpp index 6e47690a111..e3de10a12b7 100644 --- a/src/mongo/db/logical_session_id_helpers.cpp +++ b/src/mongo/db/logical_session_id_helpers.cpp @@ -32,26 +32,26 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/auth/user.h" +#include "mongo/db/auth/user_name.h" +#include "mongo/db/logical_session_cache.h" #include "mongo/db/operation_context.h" namespace mongo { -namespace { - /** * This is a safe hash that will not collide with a username because all full usernames include an * '@' character. */ const auto kNoAuthDigest = SHA256Block::computeHash(reinterpret_cast<const uint8_t*>(""), 0); -SHA256Block lookupUserDigest(OperationContext* opCtx) { +SHA256Block getLogicalSessionUserDigestForLoggedInUser(const OperationContext* opCtx) { auto client = opCtx->getClient(); ServiceContext* serviceContext = client->getServiceContext(); if (AuthorizationManager::get(serviceContext)->isAuthEnabled()) { UserName userName; - auto user = AuthorizationSession::get(client)->getSingleUser(); + const auto user = AuthorizationSession::get(client)->getSingleUser(); invariant(user); return user->getDigest(); @@ -60,7 +60,14 @@ SHA256Block lookupUserDigest(OperationContext* opCtx) { } } -} // namespace +SHA256Block getLogicalSessionUserDigestFor(StringData user, StringData db) { + if (user.empty() && db.empty()) { + return kNoAuthDigest; + } + const UserName un(user, db); + const auto& fn = un.getFullName(); + return SHA256Block::computeHash({ConstDataRange(fn.c_str(), fn.size())}); +} LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& fromClient, OperationContext* opCtx, @@ -81,11 +88,11 @@ LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& fromClient }) || authSession->isAuthorizedForPrivilege(Privilege( ResourcePattern::forClusterResource(), ActionType::impersonate)) || - lookupUserDigest(opCtx) == fromClient.getUid()); + getLogicalSessionUserDigestForLoggedInUser(opCtx) == fromClient.getUid()); lsid.setUid(*fromClient.getUid()); } else { - lsid.setUid(lookupUserDigest(opCtx)); + lsid.setUid(getLogicalSessionUserDigestForLoggedInUser(opCtx)); } return lsid; @@ -95,7 +102,7 @@ LogicalSessionId makeLogicalSessionId(OperationContext* opCtx) { LogicalSessionId id{}; id.setId(UUID::gen()); - id.setUid(lookupUserDigest(opCtx)); + id.setUid(getLogicalSessionUserDigestForLoggedInUser(opCtx)); return id; } diff --git a/src/mongo/db/logical_session_id_helpers.h b/src/mongo/db/logical_session_id_helpers.h index 2b4c2e76e61..a913ac3c769 100644 --- a/src/mongo/db/logical_session_id_helpers.h +++ b/src/mongo/db/logical_session_id_helpers.h @@ -37,6 +37,16 @@ namespace mongo { /** + * Get the currently logged in user's UID digest. + */ +SHA256Block getLogicalSessionUserDigestForLoggedInUser(const OperationContext* opCtx); + +/** + * Get a user digest for a specific user/db identifier. + */ +SHA256Block getLogicalSessionUserDigestFor(StringData user, StringData db); + +/** * Factory functions to generate logical session records. */ LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& lsid, diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript index 1d622a2c8cf..ce3a9c4c43e 100644 --- a/src/mongo/db/pipeline/SConscript +++ b/src/mongo/db/pipeline/SConscript @@ -244,6 +244,7 @@ docSourceEnv.Library( 'document_source_internal_inhibit_optimization.cpp', 'document_source_internal_split_pipeline.cpp', 'document_source_limit.cpp', + 'document_source_list_local_sessions.cpp', 'document_source_match.cpp', 'document_source_merge_cursors.cpp', 'document_source_mock.cpp', @@ -263,6 +264,7 @@ docSourceEnv.Library( '$BUILD_DIR/mongo/client/clientdriver', '$BUILD_DIR/mongo/db/bson/dotted_path_support', '$BUILD_DIR/mongo/db/index/key_generator', + '$BUILD_DIR/mongo/db/logical_session_cache_impl', '$BUILD_DIR/mongo/db/matcher/expression_algo', '$BUILD_DIR/mongo/db/matcher/expressions', '$BUILD_DIR/mongo/db/pipeline/lite_parsed_document_source', diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 39aed452ed4..93dcd5483b8 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -61,6 +61,10 @@ public: PrivilegeVector requiredPrivileges(bool isMongos) const final { return {}; } + + bool allowedToForwardFromMongos() const final { + return false; + } }; class Transformation : public DocumentSourceSingleDocumentTransformation::TransformerInterface { diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.cpp b/src/mongo/db/pipeline/document_source_list_local_sessions.cpp new file mode 100644 index 00000000000..af4d454d208 --- /dev/null +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.cpp @@ -0,0 +1,166 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/auth/user_name.h" +#include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/pipeline/document_source_list_local_sessions.h" +#include "mongo/db/pipeline/document_sources_gen.h" + +namespace mongo { + +REGISTER_DOCUMENT_SOURCE(listLocalSessions, + DocumentSourceListLocalSessions::LiteParsed::parse, + DocumentSourceListLocalSessions::createFromBson); + +const char* DocumentSourceListLocalSessions::kStageName = "$listLocalSessions"; + +DocumentSource::GetNextResult DocumentSourceListLocalSessions::getNext() { + pExpCtx->checkForInterrupt(); + + while (!_ids.empty()) { + const auto& id = _ids.back(); + _ids.pop_back(); + + const auto& record = _cache->peekCached(id); + if (!record) { + // It's possible for SessionRecords to have expired while we're walking + continue; + } + return Document(record->toBSON()); + } + + return GetNextResult::makeEOF(); +} + +boost::intrusive_ptr<DocumentSource> DocumentSourceListLocalSessions::createFromBson( + BSONElement spec, const boost::intrusive_ptr<ExpressionContext>& pExpCtx) { + + uassert( + ErrorCodes::InvalidNamespace, + str::stream() << kStageName + << " must be run against the database with {aggregate: 1}, not a collection", + pExpCtx->ns.isCollectionlessAggregateNS()); + + return new DocumentSourceListLocalSessions(pExpCtx, listSessionsParseSpec(kStageName, spec)); +} + +DocumentSourceListLocalSessions::DocumentSourceListLocalSessions( + const boost::intrusive_ptr<ExpressionContext>& pExpCtx, const ListSessionsSpec& spec) + : DocumentSource(pExpCtx), _spec(spec) { + const auto& opCtx = pExpCtx->opCtx; + _cache = LogicalSessionCache::get(opCtx); + if (_spec.getAllUsers()) { + invariant(!_spec.getUsers() || _spec.getUsers()->empty()); + _ids = _cache->listIds(); + } else { + _ids = _cache->listIds(listSessionsUsersToDigests(_spec.getUsers().get())); + } +} + +namespace { +ListSessionsUser getUserNameForLoggedInUser(const OperationContext* opCtx) { + auto* client = opCtx->getClient(); + + ListSessionsUser user; + if (AuthorizationManager::get(client->getServiceContext())->isAuthEnabled()) { + const auto& userName = AuthorizationSession::get(client)->getSingleUser()->getName(); + user.setUser(userName.getUser()); + user.setDb(userName.getDB()); + } else { + user.setUser(""); + user.setDb(""); + } + return user; +} + +bool operator==(const ListSessionsUser& user1, const ListSessionsUser& user2) { + return std::tie(user1.getUser(), user1.getDb()) == std::tie(user2.getUser(), user2.getDb()); +} +} // namespace + +} // namespace mongo + +std::vector<mongo::SHA256Block> mongo::listSessionsUsersToDigests( + const std::vector<ListSessionsUser>& users) { + std::vector<SHA256Block> ret; + ret.reserve(users.size()); + for (const auto& user : users) { + ret.push_back(getLogicalSessionUserDigestFor(user.getUser(), user.getDb())); + } + return ret; +} + +mongo::PrivilegeVector mongo::listSessionsRequiredPrivileges(const ListSessionsSpec& spec) { + const auto needsPrivs = ([spec]() { + if (spec.getAllUsers()) { + return true; + } + // parseSpec should ensure users is non-empty. + invariant(spec.getUsers()); + + const auto& myName = + getUserNameForLoggedInUser(Client::getCurrent()->getOperationContext()); + const auto& users = spec.getUsers().get(); + return !std::all_of( + users.cbegin(), users.cend(), [myName](const auto& name) { return myName == name; }); + })(); + + if (needsPrivs) { + return {Privilege(ResourcePattern::forClusterResource(), ActionType::listSessions)}; + } else { + return PrivilegeVector(); + } +} + +mongo::ListSessionsSpec mongo::listSessionsParseSpec(StringData stageName, + const BSONElement& spec) { + uassert(ErrorCodes::TypeMismatch, + str::stream() << stageName << " options must be specified in an object, but found: " + << typeName(spec.type()), + spec.type() == BSONType::Object); + + IDLParserErrorContext ctx(stageName); + auto ret = ListSessionsSpec::parse(ctx, spec.Obj()); + + uassert(ErrorCodes::UnsupportedFormat, + str::stream() << stageName + << " may not specify {allUsers:true} and {users:[...]} at the same time", + !ret.getAllUsers() || !ret.getUsers() || ret.getUsers()->empty()); + + if (!ret.getAllUsers() && (!ret.getUsers() || ret.getUsers()->empty())) { + // Implicit request for self + const auto& userName = + getUserNameForLoggedInUser(Client::getCurrent()->getOperationContext()); + ret.setUsers(std::vector<ListSessionsUser>({userName})); + } + + return ret; +} diff --git a/src/mongo/db/pipeline/document_source_list_local_sessions.h b/src/mongo/db/pipeline/document_source_list_local_sessions.h new file mode 100644 index 00000000000..8c386f0ecaa --- /dev/null +++ b/src/mongo/db/pipeline/document_source_list_local_sessions.h @@ -0,0 +1,118 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <vector> + +#include "mongo/bson/bsonmisc.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/crypto/sha256_block.h" +#include "mongo/db/logical_session_cache.h" +#include "mongo/db/pipeline/document_source.h" +#include "mongo/db/pipeline/document_sources_gen.h" +#include "mongo/db/pipeline/lite_parsed_document_source.h" + +namespace mongo { + +ListSessionsSpec listSessionsParseSpec(StringData stageName, const BSONElement& spec); +PrivilegeVector listSessionsRequiredPrivileges(const ListSessionsSpec& spec); +std::vector<SHA256Block> listSessionsUsersToDigests(const std::vector<ListSessionsUser>& users); + +/** + * Produces one document per session in the local cache if 'allUsers' is specified + * as true, and returns just sessions for the currently logged in user if + * 'allUsers' is specified as false, or not specified at all. + */ +class DocumentSourceListLocalSessions final : public DocumentSource { +public: + static const char* kStageName; + + class LiteParsed final : public LiteParsedDocumentSource { + public: + static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request, + const BSONElement& spec) { + + return stdx::make_unique<LiteParsed>(listSessionsParseSpec(kStageName, spec)); + } + + explicit LiteParsed(const ListSessionsSpec& spec) : _spec(spec) {} + + stdx::unordered_set<NamespaceString> getInvolvedNamespaces() const final { + return stdx::unordered_set<NamespaceString>(); + } + + PrivilegeVector requiredPrivileges(bool isMongos) const final { + return listSessionsRequiredPrivileges(_spec); + } + + bool isInitialSource() const final { + return true; + } + + bool allowedToForwardFromMongos() const final { + return false; + } + + private: + const ListSessionsSpec _spec; + }; + + GetNextResult getNext() final; + + const char* getSourceName() const final { + return kStageName; + } + + Value serialize(boost::optional<ExplainOptions::Verbosity> explain = boost::none) const final { + return Value(Document{{getSourceName(), _spec.toBSON()}}); + } + + StageConstraints constraints() const final { + StageConstraints constraints; + constraints.requiredPosition = StageConstraints::PositionRequirement::kFirst; + constraints.hostRequirement = StageConstraints::HostTypeRequirement::kAnyShardOrMongoS; + constraints.requiresInputDocSource = false; + constraints.isAllowedInsideFacetStage = false; + constraints.isIndependentOfAnyCollection = true; + return constraints; + } + + static boost::intrusive_ptr<DocumentSource> createFromBson( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx); + +private: + DocumentSourceListLocalSessions(const boost::intrusive_ptr<ExpressionContext>& pExpCtx, + const ListSessionsSpec& spec); + + const ListSessionsSpec _spec; + const LogicalSessionCache* _cache; + std::vector<LogicalSessionId> _ids; +}; + +} // namespace mongo diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl index 52f6803f69b..e329a286c5e 100644 --- a/src/mongo/db/pipeline/document_sources.idl +++ b/src/mongo/db/pipeline/document_sources.idl @@ -90,6 +90,24 @@ structs: type: timestamp description: The timestamp of the logical time + ListSessionsUser: + description: "A struct representing a $listSessions/$listLocalSessions User" + strict: true + fields: + user: string + db: string + + ListSessionsSpec: + description: "$listSessions and $listLocalSessions pipeline spec" + strict: true + fields: + allUsers: + type: bool + default: false + users: + type: array<ListSessionsUser> + optional: true + ResumeTokenInternal: description: The internal format of a resume token. For use by the ResumeToken class only. diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h index 60e2e8bd8ee..bfe69d6af00 100644 --- a/src/mongo/db/pipeline/lite_parsed_document_source.h +++ b/src/mongo/db/pipeline/lite_parsed_document_source.h @@ -111,6 +111,13 @@ public: virtual bool isInitialSource() const { return false; } + + /** + * Returns true if this stage may be forwarded to shards from a mongos. + */ + virtual bool allowedToForwardFromMongos() const { + return true; + } }; class LiteParsedDocumentSourceDefault final : public LiteParsedDocumentSource { diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h index a83a1e3c8a0..7a9f7625604 100644 --- a/src/mongo/db/pipeline/lite_parsed_pipeline.h +++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h @@ -102,6 +102,15 @@ public: }); } + /** + * Returns false if the pipeline has any stage which must be run locally on mongos. + */ + bool allowedToForwardFromMongos() const { + return std::all_of(_stageSpecs.cbegin(), _stageSpecs.cend(), [](const auto& spec) { + return spec->allowedToForwardFromMongos(); + }); + } + private: std::vector<std::unique_ptr<LiteParsedDocumentSource>> _stageSpecs; }; diff --git a/src/mongo/s/commands/cluster_aggregate.cpp b/src/mongo/s/commands/cluster_aggregate.cpp index b6c326b53cb..4788de5b5a1 100644 --- a/src/mongo/s/commands/cluster_aggregate.cpp +++ b/src/mongo/s/commands/cluster_aggregate.cpp @@ -415,10 +415,10 @@ Status ClusterAggregate::runAggregate(OperationContext* opCtx, StringMap<ExpressionContext::ResolvedNamespace> resolvedNamespaces; LiteParsedPipeline liteParsedPipeline(request); - // TODO SERVER-29141 support $changeStream on mongos. + // TODO SERVER-29141 support forcing pipeline to run on Mongos. uassert(40567, - "$changeStream is not yet supported on mongos", - !liteParsedPipeline.hasChangeStream()); + "Unable to force mongos-only stage to run on mongos", + liteParsedPipeline.allowedToForwardFromMongos()); for (auto&& nss : liteParsedPipeline.getInvolvedNamespaces()) { const auto resolvedNsRoutingInfo = |