summaryrefslogtreecommitdiff
path: root/src/mongo/db/sessions_collection_rs.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/sessions_collection_rs.cpp')
-rw-r--r--src/mongo/db/sessions_collection_rs.cpp90
1 files changed, 71 insertions, 19 deletions
diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp
index 89387af9531..f4d5f44b453 100644
--- a/src/mongo/db/sessions_collection_rs.cpp
+++ b/src/mongo/db/sessions_collection_rs.cpp
@@ -84,13 +84,16 @@ Status makePrimaryConnection(OperationContext* opCtx, boost::optional<ScopedDbCo
}
template <typename Callback>
-auto runIfStandaloneOrPrimary(OperationContext* opCtx, Callback callback)
+auto runIfStandaloneOrPrimary(const NamespaceString& ns,
+ LockMode mode,
+ OperationContext* opCtx,
+ Callback callback)
-> boost::optional<decltype(std::declval<Callback>()())> {
- Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IX);
- Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, MODE_IX);
+ Lock::DBLock lk(opCtx, ns.db(), mode);
+ Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, mode);
auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
- if (coord->canAcceptWritesForDatabase(opCtx, SessionsCollection::kSessionsDb)) {
+ if (coord->canAcceptWritesForDatabase(opCtx, ns.db())) {
return callback();
}
@@ -110,10 +113,14 @@ auto sendToPrimary(OperationContext* opCtx, Callback callback)
}
template <typename LocalCallback, typename RemoteCallback>
-auto dispatch(OperationContext* opCtx, LocalCallback localCallback, RemoteCallback remoteCallback)
+auto dispatch(const NamespaceString& ns,
+ LockMode mode,
+ OperationContext* opCtx,
+ LocalCallback localCallback,
+ RemoteCallback remoteCallback)
-> decltype(std::declval<RemoteCallback>()(static_cast<DBClientBase*>(nullptr))) {
// If we are the primary, write directly to ourself.
- auto result = runIfStandaloneOrPrimary(opCtx, [&] { return localCallback(); });
+ auto result = runIfStandaloneOrPrimary(ns, mode, opCtx, [&] { return localCallback(); });
if (result) {
return *result;
@@ -127,38 +134,83 @@ auto dispatch(OperationContext* opCtx, LocalCallback localCallback, RemoteCallba
Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
const LogicalSessionRecordSet& sessions,
Date_t refreshTime) {
- return dispatch(opCtx,
+ return dispatch(
+ kSessionsNamespaceString,
+ MODE_IX,
+ opCtx,
+ [&] {
+ DBDirectClient client(opCtx);
+ return doRefresh(kSessionsNamespaceString,
+ sessions,
+ refreshTime,
+ makeSendFnForBatchWrite(kSessionsNamespaceString, &client));
+ },
+ [&](DBClientBase* client) {
+ return doRefreshExternal(kSessionsNamespaceString,
+ sessions,
+ refreshTime,
+ makeSendFnForCommand(kSessionsNamespaceString, client));
+ });
+}
+
+Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) {
+ return dispatch(kSessionsNamespaceString,
+ MODE_IX,
+ opCtx,
[&] {
DBDirectClient client(opCtx);
- return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client));
+ return doRemove(kSessionsNamespaceString,
+ sessions,
+ makeSendFnForBatchWrite(kSessionsNamespaceString, &client));
},
[&](DBClientBase* client) {
- return doRefreshExternal(
- sessions, refreshTime, makeSendFnForCommand(client));
+ return doRemoveExternal(
+ kSessionsNamespaceString,
+ sessions,
+ makeSendFnForCommand(kSessionsNamespaceString, client));
});
}
-Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions) {
- return dispatch(opCtx,
+StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+ return dispatch(kSessionsNamespaceString,
+ MODE_IS,
+ opCtx,
[&] {
DBDirectClient client(opCtx);
- return doRemove(sessions, makeSendFnForBatchWrite(&client));
+ return doFetch(kSessionsNamespaceString,
+ sessions,
+ makeFindFnForCommand(kSessionsNamespaceString, &client));
},
[&](DBClientBase* client) {
- return doRemoveExternal(sessions, makeSendFnForCommand(client));
+ return doFetch(kSessionsNamespaceString,
+ sessions,
+ makeFindFnForCommand(kSessionsNamespaceString, client));
});
}
-StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions(
- OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+Status SessionsCollectionRS::removeTransactionRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) {
return dispatch(
+ kSessionsNamespaceString,
+ MODE_IX,
opCtx,
[&] {
DBDirectClient client(opCtx);
- return doFetch(sessions, makeFindFnForCommand(&client));
+ return doRemove(NamespaceString::kSessionTransactionsTableNamespace,
+ sessions,
+ makeSendFnForBatchWrite(
+ NamespaceString::kSessionTransactionsTableNamespace, &client));
},
- [&](DBClientBase* client) { return doFetch(sessions, makeFindFnForCommand(client)); });
+ [](DBClientBase*) {
+ return Status(ErrorCodes::NotMaster, "Not eligible to remove transaction records");
+ });
+}
+
+Status SessionsCollectionRS::removeTransactionRecordsHelper(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) {
+ return SessionsCollectionRS{}.removeTransactionRecords(opCtx, sessions);
}
} // namespace mongo