diff options
Diffstat (limited to 'src/mongo/db/sessions_collection_rs.cpp')
-rw-r--r-- | src/mongo/db/sessions_collection_rs.cpp | 90 |
1 files changed, 71 insertions, 19 deletions
diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp index 89387af9531..f4d5f44b453 100644 --- a/src/mongo/db/sessions_collection_rs.cpp +++ b/src/mongo/db/sessions_collection_rs.cpp @@ -84,13 +84,16 @@ Status makePrimaryConnection(OperationContext* opCtx, boost::optional<ScopedDbCo } template <typename Callback> -auto runIfStandaloneOrPrimary(OperationContext* opCtx, Callback callback) +auto runIfStandaloneOrPrimary(const NamespaceString& ns, + LockMode mode, + OperationContext* opCtx, + Callback callback) -> boost::optional<decltype(std::declval<Callback>()())> { - Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IX); - Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, MODE_IX); + Lock::DBLock lk(opCtx, ns.db(), mode); + Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, mode); auto coord = mongo::repl::ReplicationCoordinator::get(opCtx); - if (coord->canAcceptWritesForDatabase(opCtx, SessionsCollection::kSessionsDb)) { + if (coord->canAcceptWritesForDatabase(opCtx, ns.db())) { return callback(); } @@ -110,10 +113,14 @@ auto sendToPrimary(OperationContext* opCtx, Callback callback) } template <typename LocalCallback, typename RemoteCallback> -auto dispatch(OperationContext* opCtx, LocalCallback localCallback, RemoteCallback remoteCallback) +auto dispatch(const NamespaceString& ns, + LockMode mode, + OperationContext* opCtx, + LocalCallback localCallback, + RemoteCallback remoteCallback) -> decltype(std::declval<RemoteCallback>()(static_cast<DBClientBase*>(nullptr))) { // If we are the primary, write directly to ourself. - auto result = runIfStandaloneOrPrimary(opCtx, [&] { return localCallback(); }); + auto result = runIfStandaloneOrPrimary(ns, mode, opCtx, [&] { return localCallback(); }); if (result) { return *result; @@ -127,38 +134,83 @@ auto dispatch(OperationContext* opCtx, LocalCallback localCallback, RemoteCallba Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx, const LogicalSessionRecordSet& sessions, Date_t refreshTime) { - return dispatch(opCtx, + return dispatch( + kSessionsNamespaceString, + MODE_IX, + opCtx, + [&] { + DBDirectClient client(opCtx); + return doRefresh(kSessionsNamespaceString, + sessions, + refreshTime, + makeSendFnForBatchWrite(kSessionsNamespaceString, &client)); + }, + [&](DBClientBase* client) { + return doRefreshExternal(kSessionsNamespaceString, + sessions, + refreshTime, + makeSendFnForCommand(kSessionsNamespaceString, client)); + }); +} + +Status SessionsCollectionRS::removeRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { + return dispatch(kSessionsNamespaceString, + MODE_IX, + opCtx, [&] { DBDirectClient client(opCtx); - return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client)); + return doRemove(kSessionsNamespaceString, + sessions, + makeSendFnForBatchWrite(kSessionsNamespaceString, &client)); }, [&](DBClientBase* client) { - return doRefreshExternal( - sessions, refreshTime, makeSendFnForCommand(client)); + return doRemoveExternal( + kSessionsNamespaceString, + sessions, + makeSendFnForCommand(kSessionsNamespaceString, client)); }); } -Status SessionsCollectionRS::removeRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) { - return dispatch(opCtx, +StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) { + return dispatch(kSessionsNamespaceString, + MODE_IS, + opCtx, [&] { DBDirectClient client(opCtx); - return doRemove(sessions, makeSendFnForBatchWrite(&client)); + return doFetch(kSessionsNamespaceString, + sessions, + makeFindFnForCommand(kSessionsNamespaceString, &client)); }, [&](DBClientBase* client) { - return doRemoveExternal(sessions, makeSendFnForCommand(client)); + return doFetch(kSessionsNamespaceString, + sessions, + makeFindFnForCommand(kSessionsNamespaceString, client)); }); } -StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions( - OperationContext* opCtx, const LogicalSessionIdSet& sessions) { +Status SessionsCollectionRS::removeTransactionRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { return dispatch( + kSessionsNamespaceString, + MODE_IX, opCtx, [&] { DBDirectClient client(opCtx); - return doFetch(sessions, makeFindFnForCommand(&client)); + return doRemove(NamespaceString::kSessionTransactionsTableNamespace, + sessions, + makeSendFnForBatchWrite( + NamespaceString::kSessionTransactionsTableNamespace, &client)); }, - [&](DBClientBase* client) { return doFetch(sessions, makeFindFnForCommand(client)); }); + [](DBClientBase*) { + return Status(ErrorCodes::NotMaster, "Not eligible to remove transaction records"); + }); +} + +Status SessionsCollectionRS::removeTransactionRecordsHelper(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { + return SessionsCollectionRS{}.removeTransactionRecords(opCtx, sessions); } } // namespace mongo |