summaryrefslogtreecommitdiff
path: root/src/mongo/db/sessions_collection_sharded.cpp
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-05-03 16:21:24 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-05-09 07:40:56 -0400
commit2791817876636c0cfd60d867f31c7a83cf3f18c1 (patch)
tree3aefcb1999cccf4cb53b2401a44857549ba8722a /src/mongo/db/sessions_collection_sharded.cpp
parent1b8a9f5dc5c3314042b55e7415a2a25045b32a94 (diff)
downloadmongo-2791817876636c0cfd60d867f31c7a83cf3f18c1.tar.gz
SERVER-37837 Get rid of TransactionReaper (Part 1)
This change gets rid of the TransactionReaper's usage of the ReplicationCoordinator for checking whether it is primary or not and makes the LogicalSessionCache joinable on shutdown. It also removes the TransactionReaper's grouping per-shard optimization and moves it all under SessionCollectionSharded.
Diffstat (limited to 'src/mongo/db/sessions_collection_sharded.cpp')
-rw-r--r--src/mongo/db/sessions_collection_sharded.cpp68
1 files changed, 65 insertions, 3 deletions
diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp
index cf1be44431c..e65ddd0dab5 100644
--- a/src/mongo/db/sessions_collection_sharded.cpp
+++ b/src/mongo/db/sessions_collection_sharded.cpp
@@ -40,6 +40,8 @@
#include "mongo/rpc/op_msg.h"
#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/s/write_ops/batch_write_exec.h"
@@ -78,6 +80,60 @@ Status SessionsCollectionSharded::_checkCacheForSessionsCollection(OperationCont
return {ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist"};
}
+std::vector<LogicalSessionId> SessionsCollectionSharded::_groupSessionIdsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+ auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+ opCtx, NamespaceString::kLogicalSessionsNamespace));
+ auto cm = routingInfo.cm();
+
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace
+ << " is not sharded",
+ cm);
+
+ std::multimap<ShardId, LogicalSessionId> sessionIdsByOwningShard;
+ for (const auto& session : sessions) {
+ sessionIdsByOwningShard.emplace(
+ cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(),
+ session);
+ }
+
+ std::vector<LogicalSessionId> sessionIdsGroupedByShard;
+ sessionIdsGroupedByShard.reserve(sessions.size());
+ for (auto& session : sessionIdsByOwningShard) {
+ sessionIdsGroupedByShard.emplace_back(std::move(session.second));
+ }
+
+ return sessionIdsGroupedByShard;
+}
+
+std::vector<LogicalSessionRecord> SessionsCollectionSharded::_groupSessionRecordsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionRecordSet& sessions) {
+ auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+ opCtx, NamespaceString::kLogicalSessionsNamespace));
+ auto cm = routingInfo.cm();
+
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace
+ << " is not sharded",
+ cm);
+
+ std::multimap<ShardId, LogicalSessionRecord> sessionsByOwningShard;
+ for (const auto& session : sessions) {
+ sessionsByOwningShard.emplace(
+ cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(),
+ session);
+ }
+
+ std::vector<LogicalSessionRecord> sessionRecordsGroupedByShard;
+ sessionRecordsGroupedByShard.reserve(sessions.size());
+ for (auto& session : sessionsByOwningShard) {
+ sessionRecordsGroupedByShard.emplace_back(std::move(session.second));
+ }
+
+ return sessionRecordsGroupedByShard;
+}
+
Status SessionsCollectionSharded::setupSessionsCollection(OperationContext* opCtx) {
return checkSessionsCollectionExists(opCtx);
}
@@ -100,7 +156,9 @@ Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx,
return response.toStatus();
};
- return doRefresh(NamespaceString::kLogicalSessionsNamespace, sessions, send);
+ return doRefresh(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionRecordsByOwningShard(opCtx, sessions),
+ send);
}
Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
@@ -117,7 +175,9 @@ Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
return response.toStatus();
};
- return doRemove(NamespaceString::kLogicalSessionsNamespace, sessions, send);
+ return doRemove(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionIdsByOwningShard(opCtx, sessions),
+ send);
}
StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
@@ -163,7 +223,9 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
return replyBuilder.releaseBody();
};
- return doFetch(NamespaceString::kLogicalSessionsNamespace, sessions, send);
+ return doFindRemoved(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionIdsByOwningShard(opCtx, sessions),
+ send);
}
} // namespace mongo