diff options
Diffstat (limited to 'src/mongo/db/sessions_collection_sharded.cpp')
-rw-r--r-- | src/mongo/db/sessions_collection_sharded.cpp | 68 |
1 files changed, 65 insertions, 3 deletions
diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp index cf1be44431c..e65ddd0dab5 100644 --- a/src/mongo/db/sessions_collection_sharded.cpp +++ b/src/mongo/db/sessions_collection_sharded.cpp @@ -40,6 +40,8 @@ #include "mongo/rpc/op_msg.h" #include "mongo/rpc/op_msg_rpc_impls.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/client/shard.h" +#include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_find.h" #include "mongo/s/write_ops/batch_write_exec.h" @@ -78,6 +80,60 @@ Status SessionsCollectionSharded::_checkCacheForSessionsCollection(OperationCont return {ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist"}; } +std::vector<LogicalSessionId> SessionsCollectionSharded::_groupSessionIdsByOwningShard( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) { + auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( + opCtx, NamespaceString::kLogicalSessionsNamespace)); + auto cm = routingInfo.cm(); + + uassert(ErrorCodes::NamespaceNotSharded, + str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace + << " is not sharded", + cm); + + std::multimap<ShardId, LogicalSessionId> sessionIdsByOwningShard; + for (const auto& session : sessions) { + sessionIdsByOwningShard.emplace( + cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(), + session); + } + + std::vector<LogicalSessionId> sessionIdsGroupedByShard; + sessionIdsGroupedByShard.reserve(sessions.size()); + for (auto& session : sessionIdsByOwningShard) { + sessionIdsGroupedByShard.emplace_back(std::move(session.second)); + } + + return sessionIdsGroupedByShard; +} + +std::vector<LogicalSessionRecord> SessionsCollectionSharded::_groupSessionRecordsByOwningShard( + OperationContext* opCtx, const LogicalSessionRecordSet& sessions) { + auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( + opCtx, NamespaceString::kLogicalSessionsNamespace)); + auto cm = routingInfo.cm(); + + uassert(ErrorCodes::NamespaceNotSharded, + str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace + << " is not sharded", + cm); + + std::multimap<ShardId, LogicalSessionRecord> sessionsByOwningShard; + for (const auto& session : sessions) { + sessionsByOwningShard.emplace( + cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(), + session); + } + + std::vector<LogicalSessionRecord> sessionRecordsGroupedByShard; + sessionRecordsGroupedByShard.reserve(sessions.size()); + for (auto& session : sessionsByOwningShard) { + sessionRecordsGroupedByShard.emplace_back(std::move(session.second)); + } + + return sessionRecordsGroupedByShard; +} + Status SessionsCollectionSharded::setupSessionsCollection(OperationContext* opCtx) { return checkSessionsCollectionExists(opCtx); } @@ -100,7 +156,9 @@ Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx, return response.toStatus(); }; - return doRefresh(NamespaceString::kLogicalSessionsNamespace, sessions, send); + return doRefresh(NamespaceString::kLogicalSessionsNamespace, + _groupSessionRecordsByOwningShard(opCtx, sessions), + send); } Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx, @@ -117,7 +175,9 @@ Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx, return response.toStatus(); }; - return doRemove(NamespaceString::kLogicalSessionsNamespace, sessions, send); + return doRemove(NamespaceString::kLogicalSessionsNamespace, + _groupSessionIdsByOwningShard(opCtx, sessions), + send); } StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions( @@ -163,7 +223,9 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions( return replyBuilder.releaseBody(); }; - return doFetch(NamespaceString::kLogicalSessionsNamespace, sessions, send); + return doFindRemoved(NamespaceString::kLogicalSessionsNamespace, + _groupSessionIdsByOwningShard(opCtx, sessions), + send); } } // namespace mongo |