summaryrefslogtreecommitdiff
path: root/src/mongo/db/sessions_collection_sharded.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/sessions_collection_sharded.cpp')
-rw-r--r--src/mongo/db/sessions_collection_sharded.cpp68
1 files changed, 65 insertions, 3 deletions
diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp
index cf1be44431c..e65ddd0dab5 100644
--- a/src/mongo/db/sessions_collection_sharded.cpp
+++ b/src/mongo/db/sessions_collection_sharded.cpp
@@ -40,6 +40,8 @@
#include "mongo/rpc/op_msg.h"
#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/s/write_ops/batch_write_exec.h"
@@ -78,6 +80,60 @@ Status SessionsCollectionSharded::_checkCacheForSessionsCollection(OperationCont
return {ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist"};
}
+std::vector<LogicalSessionId> SessionsCollectionSharded::_groupSessionIdsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+ auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+ opCtx, NamespaceString::kLogicalSessionsNamespace));
+ auto cm = routingInfo.cm();
+
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace
+ << " is not sharded",
+ cm);
+
+ std::multimap<ShardId, LogicalSessionId> sessionIdsByOwningShard;
+ for (const auto& session : sessions) {
+ sessionIdsByOwningShard.emplace(
+ cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(),
+ session);
+ }
+
+ std::vector<LogicalSessionId> sessionIdsGroupedByShard;
+ sessionIdsGroupedByShard.reserve(sessions.size());
+ for (auto& session : sessionIdsByOwningShard) {
+ sessionIdsGroupedByShard.emplace_back(std::move(session.second));
+ }
+
+ return sessionIdsGroupedByShard;
+}
+
+std::vector<LogicalSessionRecord> SessionsCollectionSharded::_groupSessionRecordsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionRecordSet& sessions) {
+ auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+ opCtx, NamespaceString::kLogicalSessionsNamespace));
+ auto cm = routingInfo.cm();
+
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace
+ << " is not sharded",
+ cm);
+
+ std::multimap<ShardId, LogicalSessionRecord> sessionsByOwningShard;
+ for (const auto& session : sessions) {
+ sessionsByOwningShard.emplace(
+ cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(),
+ session);
+ }
+
+ std::vector<LogicalSessionRecord> sessionRecordsGroupedByShard;
+ sessionRecordsGroupedByShard.reserve(sessions.size());
+ for (auto& session : sessionsByOwningShard) {
+ sessionRecordsGroupedByShard.emplace_back(std::move(session.second));
+ }
+
+ return sessionRecordsGroupedByShard;
+}
+
Status SessionsCollectionSharded::setupSessionsCollection(OperationContext* opCtx) {
return checkSessionsCollectionExists(opCtx);
}
@@ -100,7 +156,9 @@ Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx,
return response.toStatus();
};
- return doRefresh(NamespaceString::kLogicalSessionsNamespace, sessions, send);
+ return doRefresh(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionRecordsByOwningShard(opCtx, sessions),
+ send);
}
Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
@@ -117,7 +175,9 @@ Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
return response.toStatus();
};
- return doRemove(NamespaceString::kLogicalSessionsNamespace, sessions, send);
+ return doRemove(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionIdsByOwningShard(opCtx, sessions),
+ send);
}
StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
@@ -163,7 +223,9 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
return replyBuilder.releaseBody();
};
- return doFetch(NamespaceString::kLogicalSessionsNamespace, sessions, send);
+ return doFindRemoved(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionIdsByOwningShard(opCtx, sessions),
+ send);
}
} // namespace mongo