diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_mongod.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection.cpp | 92 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection.h | 38 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_mock.cpp | 25 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_mock.h | 30 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_rs.cpp | 70 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_rs.h | 9 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_sharded.cpp | 52 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_sharded.h | 9 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_standalone.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_standalone.h | 9 | ||||
-rw-r--r-- | src/mongo/dbtests/logical_sessions_tests.cpp | 33 | ||||
-rw-r--r-- | src/mongo/s/session_catalog_router.cpp | 2 |
14 files changed, 177 insertions, 231 deletions
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index a623b15e856..2d8834461ab 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -296,7 +296,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { } // Refresh the active sessions in the sessions collection. - uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords)); + _sessionsColl->refreshSessions(opCtx, activeSessionRecords); activeSessionsBackSwapper.dismiss(); { stdx::lock_guard<Latch> lk(_mutex); @@ -304,7 +304,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { } // Remove the ending sessions from the sessions collection. - uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions)); + _sessionsColl->removeRecords(opCtx, explicitlyEndingSessions); explicitlyEndingBackSwaper.dismiss(); { stdx::lock_guard<Latch> lk(_mutex); @@ -331,14 +331,13 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { } // think about pruning ending and active out of openCursorSessions - auto statusAndRemovedSessions = _sessionsColl->findRemovedSessions(opCtx, openCursorSessions); + try { + auto removedSessions = _sessionsColl->findRemovedSessions(opCtx, openCursorSessions); - if (statusAndRemovedSessions.isOK()) { - auto removedSessions = statusAndRemovedSessions.getValue(); for (const auto& lsid : removedSessions) { patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid)); } - } else { + } catch (...) { // Ignore errors. } diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index 253ffcc09fd..9094e585b35 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -135,8 +135,7 @@ int removeSessionsTransactionRecords(OperationContext* opCtx, return 0; // From the passed-in sessions, find the ones which are actually expired/removed - auto expiredSessionIds = - uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove)); + auto expiredSessionIds = sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove); if (expiredSessionIds.empty()) return 0; @@ -346,8 +345,7 @@ int MongoDSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx, }); // From the passed-in sessions, find the ones which are actually expired/removed - auto expiredSessionIds = - uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, lsids)); + auto expiredSessionIds = sessionsCollection.findRemovedSessions(opCtx, lsids); // Remove the session ids from the in-memory catalog for (const auto& lsid : expiredSessionIds) { diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp index 6c0f73b82fc..21e5f55b8f4 100644 --- a/src/mongo/db/sessions_collection.cpp +++ b/src/mongo/db/sessions_collection.cpp @@ -91,7 +91,7 @@ BSONObj updateQuery(const LogicalSessionRecord& record) { } template <typename TFactory, typename AddLineFn, typename SendFn, typename Container> -Status runBulkGeneric(TFactory makeT, AddLineFn addLine, SendFn sendBatch, const Container& items) { +void runBulkGeneric(TFactory makeT, AddLineFn addLine, SendFn sendBatch, const Container& items) { using T = decltype(makeT()); size_t i = 0; @@ -102,7 +102,7 @@ Status runBulkGeneric(TFactory makeT, AddLineFn addLine, SendFn sendBatch, const thing.emplace(makeT()); }; - auto sendLocalBatch = [&] { return sendBatch(thing.value()); }; + auto sendLocalBatch = [&] { sendBatch(thing.value()); }; setupBatch(); @@ -110,28 +110,23 @@ Status runBulkGeneric(TFactory makeT, AddLineFn addLine, SendFn sendBatch, const addLine(*thing, item); if (++i >= kMaxBatchSize) { - auto res = sendLocalBatch(); - if (!res.isOK()) { - return res; - } + sendLocalBatch(); setupBatch(); } } if (i > 0) { - return sendLocalBatch(); - } else { - return Status::OK(); + sendLocalBatch(); } } template <typename InitBatchFn, typename AddLineFn, typename SendBatchFn, typename Container> -Status runBulkCmd(StringData label, - InitBatchFn&& initBatch, - AddLineFn&& addLine, - SendBatchFn&& sendBatch, - const Container& items) { +void runBulkCmd(StringData label, + InitBatchFn&& initBatch, + AddLineFn&& addLine, + SendBatchFn&& sendBatch, + const Container& items) { BufBuilder buf; boost::optional<BSONObjBuilder> batchBuilder; @@ -148,10 +143,10 @@ Status runBulkCmd(StringData label, auto sendLocalBatch = [&](BSONArrayBuilder*) { entries->done(); - return sendBatch(batchBuilder->done()); + sendBatch(batchBuilder->done()); }; - return runBulkGeneric(makeBatch, addLine, sendLocalBatch, items); + runBulkGeneric(makeBatch, addLine, sendLocalBatch, items); } } // namespace @@ -164,19 +159,11 @@ SessionsCollection::~SessionsCollection() = default; SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite( const NamespaceString& ns, DBClientBase* client) { - auto send = [client, ns](BSONObj batch) -> Status { + auto send = [client, ns](BSONObj batch) { BSONObj res; if (!client->runCommand(ns.db().toString(), batch, res)) { - return getStatusFromCommandResult(res); + uassertStatusOK(getStatusFromCommandResult(res)); } - - BatchedCommandResponse response; - std::string errmsg; - if (!response.parseBSON(res, &errmsg)) { - return {ErrorCodes::FailedToParse, errmsg}; - } - - return response.toStatus(); }; return send; @@ -184,13 +171,11 @@ SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite( SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(const NamespaceString& ns, DBClientBase* client) { - auto send = [client, ns](BSONObj cmd) -> Status { + auto send = [client, ns](BSONObj cmd) { BSONObj res; if (!client->runCommand(ns.db().toString(), cmd, res)) { - return getStatusFromCommandResult(res); + uassertStatusOK(getStatusFromCommandResult(res)); } - - return Status::OK(); }; return send; @@ -198,10 +183,10 @@ SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(const N SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(const NamespaceString& ns, DBClientBase* client) { - auto send = [client, ns](BSONObj cmd) -> StatusWith<BSONObj> { + auto send = [client, ns](BSONObj cmd) -> BSONObj { BSONObj res; if (!client->runCommand(ns.db().toString(), cmd, res)) { - return getStatusFromCommandResult(res); + uassertStatusOK(getStatusFromCommandResult(res)); } return res; @@ -210,9 +195,9 @@ SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(const N return send; } -Status SessionsCollection::doRefresh(const NamespaceString& ns, - const std::vector<LogicalSessionRecord>& sessions, - SendBatchFn send) { +void SessionsCollection::doRefresh(const NamespaceString& ns, + const std::vector<LogicalSessionRecord>& sessions, + SendBatchFn send) { auto init = [ns](BSONObjBuilder* batch) { batch->append("update", ns.coll()); batch->append("ordered", false); @@ -225,12 +210,12 @@ Status SessionsCollection::doRefresh(const NamespaceString& ns, BSON("q" << lsidQuery(record) << "u" << updateQuery(record) << "upsert" << true)); }; - return runBulkCmd("updates", init, add, send, sessions); + runBulkCmd("updates", init, add, send, sessions); } -Status SessionsCollection::doRemove(const NamespaceString& ns, - const std::vector<LogicalSessionId>& sessions, - SendBatchFn send) { +void SessionsCollection::doRemove(const NamespaceString& ns, + const std::vector<LogicalSessionId>& sessions, + SendBatchFn send) { auto init = [ns](BSONObjBuilder* batch) { batch->append("delete", ns.coll()); batch->append("ordered", false); @@ -241,11 +226,12 @@ Status SessionsCollection::doRemove(const NamespaceString& ns, builder->append(BSON("q" << lsidQuery(lsid) << "limit" << 0)); }; - return runBulkCmd("deletes", init, add, send, sessions); + runBulkCmd("deletes", init, add, send, sessions); } -StatusWith<LogicalSessionIdSet> SessionsCollection::doFindRemoved( - const NamespaceString& ns, const std::vector<LogicalSessionId>& sessions, FindBatchFn send) { +LogicalSessionIdSet SessionsCollection::doFindRemoved(const NamespaceString& ns, + const std::vector<LogicalSessionId>& sessions, + FindBatchFn send) { auto makeT = [] { return std::vector<LogicalSessionId>{}; }; auto add = [](std::vector<LogicalSessionId>& batch, const LogicalSessionId& record) { @@ -257,17 +243,11 @@ StatusWith<LogicalSessionIdSet> SessionsCollection::doFindRemoved( auto wrappedSend = [&](BSONObj batch) { auto swBatchResult = send(batch); - if (!swBatchResult.isOK()) { - return swBatchResult.getStatus(); - } else { - auto result = SessionsCollectionFetchResult::parse("SessionsCollectionFetchResult"_sd, - swBatchResult.getValue()); + auto result = + SessionsCollectionFetchResult::parse("SessionsCollectionFetchResult"_sd, swBatchResult); - for (const auto& lsid : result.getCursor().getFirstBatch()) { - removed.erase(lsid.get_id()); - } - - return Status::OK(); + for (const auto& lsid : result.getCursor().getFirstBatch()) { + removed.erase(lsid.get_id()); } }; @@ -285,14 +265,10 @@ StatusWith<LogicalSessionIdSet> SessionsCollection::doFindRemoved( request.setLimit(batch.size()); request.setSingleBatch(true); - return wrappedSend(request.toBSON()); + wrappedSend(request.toBSON()); }; - auto status = runBulkGeneric(makeT, add, sendLocal, sessions); - - if (!status.isOK()) { - return status; - } + runBulkGeneric(makeT, add, sendLocal, sessions); return removed; } diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h index 96cbcb80694..58230d7a83f 100644 --- a/src/mongo/db/sessions_collection.h +++ b/src/mongo/db/sessions_collection.h @@ -62,10 +62,10 @@ public: /** * Updates the last-use times on the given sessions to be greater than - * or equal to the given time. Returns an error if a networking issue occurred. + * or equal to the given time. Throws an exception if a networking issue occurred. */ - virtual Status refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions) = 0; + virtual void refreshSessions(OperationContext* opCtx, + const LogicalSessionRecordSet& sessions) = 0; /** * Removes the authoritative records for the specified sessions. @@ -73,17 +73,17 @@ public: * Implementations should perform authentication checks to ensure that * session records may only be removed if their owner is logged in. * - * Returns an error if the removal fails, for example from a network error. + * Throws an exception if the removal fails, for example from a network error. */ - virtual Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0; + virtual void removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0; /** * Checks a set of lsids and returns the set that no longer exists * - * Returns an error if the fetch cannot occur, for example from a network error. + * Throws an exception if the fetch cannot occur, for example from a network error. */ - virtual StatusWith<LogicalSessionIdSet> findRemovedSessions( - OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0; + virtual LogicalSessionIdSet findRemovedSessions(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) = 0; /** * Generates a createIndexes command for the sessions collection TTL index. @@ -101,35 +101,35 @@ protected: /** * Makes a send function for the given client. */ - using SendBatchFn = std::function<Status(BSONObj batch)>; + using SendBatchFn = std::function<void(BSONObj batch)>; static SendBatchFn makeSendFnForCommand(const NamespaceString& ns, DBClientBase* client); static SendBatchFn makeSendFnForBatchWrite(const NamespaceString& ns, DBClientBase* client); - using FindBatchFn = std::function<StatusWith<BSONObj>(BSONObj batch)>; + using FindBatchFn = std::function<BSONObj(BSONObj batch)>; static FindBatchFn makeFindFnForCommand(const NamespaceString& ns, DBClientBase* client); /** * Formats and sends batches of refreshes for the given set of sessions. */ - Status doRefresh(const NamespaceString& ns, - const std::vector<LogicalSessionRecord>& sessions, - SendBatchFn send); + void doRefresh(const NamespaceString& ns, + const std::vector<LogicalSessionRecord>& sessions, + SendBatchFn send); /** * Formats and sends batches of deletes for the given set of sessions. */ - Status doRemove(const NamespaceString& ns, - const std::vector<LogicalSessionId>& sessions, - SendBatchFn send); + void doRemove(const NamespaceString& ns, + const std::vector<LogicalSessionId>& sessions, + SendBatchFn send); /** * Returns those lsids from the input 'sessions' array which are not present in the sessions * collection (essentially performs an inner join of 'sessions' against the sessions * collection). */ - StatusWith<LogicalSessionIdSet> doFindRemoved(const NamespaceString& ns, - const std::vector<LogicalSessionId>& sessions, - FindBatchFn send); + LogicalSessionIdSet doFindRemoved(const NamespaceString& ns, + const std::vector<LogicalSessionId>& sessions, + FindBatchFn send); }; } // namespace mongo diff --git a/src/mongo/db/sessions_collection_mock.cpp b/src/mongo/db/sessions_collection_mock.cpp index 00992744589..1919e7d5bf0 100644 --- a/src/mongo/db/sessions_collection_mock.cpp +++ b/src/mongo/db/sessions_collection_mock.cpp @@ -35,8 +35,8 @@ namespace mongo { MockSessionsCollectionImpl::MockSessionsCollectionImpl() - : _refresh([=](const LogicalSessionRecordSet& sessions) { return _refreshSessions(sessions); }), - _remove([=](const LogicalSessionIdSet& sessions) { return _removeRecords(sessions); }) {} + : _refresh([=](const LogicalSessionRecordSet& sessions) { _refreshSessions(sessions); }), + _remove([=](const LogicalSessionIdSet& sessions) { _removeRecords(sessions); }) {} void MockSessionsCollectionImpl::setRefreshHook(RefreshHook hook) { _refresh = std::move(hook); @@ -47,16 +47,16 @@ void MockSessionsCollectionImpl::setRemoveHook(RemoveHook hook) { } void MockSessionsCollectionImpl::clearHooks() { - _refresh = [=](const LogicalSessionRecordSet& sessions) { return _refreshSessions(sessions); }; - _remove = [=](const LogicalSessionIdSet& sessions) { return _removeRecords(sessions); }; + _refresh = [=](const LogicalSessionRecordSet& sessions) { _refreshSessions(sessions); }; + _remove = [=](const LogicalSessionIdSet& sessions) { _removeRecords(sessions); }; } -Status MockSessionsCollectionImpl::refreshSessions(const LogicalSessionRecordSet& sessions) { - return _refresh(sessions); +void MockSessionsCollectionImpl::refreshSessions(const LogicalSessionRecordSet& sessions) { + _refresh(sessions); } -Status MockSessionsCollectionImpl::removeRecords(const LogicalSessionIdSet& sessions) { - return _remove(std::move(sessions)); +void MockSessionsCollectionImpl::removeRecords(const LogicalSessionIdSet& sessions) { + _remove(std::move(sessions)); } void MockSessionsCollectionImpl::add(LogicalSessionRecord record) { @@ -83,25 +83,22 @@ const MockSessionsCollectionImpl::SessionMap& MockSessionsCollectionImpl::sessio return _sessions; } -Status MockSessionsCollectionImpl::_refreshSessions(const LogicalSessionRecordSet& sessions) { +void MockSessionsCollectionImpl::_refreshSessions(const LogicalSessionRecordSet& sessions) { for (auto& record : sessions) { if (!has(record.getId())) { _sessions.insert({record.getId(), record}); } } - return Status::OK(); } -Status MockSessionsCollectionImpl::_removeRecords(const LogicalSessionIdSet& sessions) { +void MockSessionsCollectionImpl::_removeRecords(const LogicalSessionIdSet& sessions) { stdx::unique_lock<Latch> lk(_mutex); for (auto& lsid : sessions) { _sessions.erase(lsid); } - - return Status::OK(); } -StatusWith<LogicalSessionIdSet> MockSessionsCollectionImpl::findRemovedSessions( +LogicalSessionIdSet MockSessionsCollectionImpl::findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) { LogicalSessionIdSet lsids; stdx::unique_lock<Latch> lk(_mutex); diff --git a/src/mongo/db/sessions_collection_mock.h b/src/mongo/db/sessions_collection_mock.h index ae118648650..1d95af63f2c 100644 --- a/src/mongo/db/sessions_collection_mock.h +++ b/src/mongo/db/sessions_collection_mock.h @@ -60,8 +60,8 @@ public: MockSessionsCollectionImpl(); - using RefreshHook = std::function<Status(const LogicalSessionRecordSet&)>; - using RemoveHook = std::function<Status(const LogicalSessionIdSet&)>; + using RefreshHook = std::function<void(const LogicalSessionRecordSet&)>; + using RemoveHook = std::function<void(const LogicalSessionIdSet&)>; // Set custom hooks to override default behavior void setRefreshHook(RefreshHook hook); @@ -71,8 +71,8 @@ public: void clearHooks(); // Forwarding methods from the MockSessionsCollection - Status refreshSessions(const LogicalSessionRecordSet& sessions); - Status removeRecords(const LogicalSessionIdSet& sessions); + void refreshSessions(const LogicalSessionRecordSet& sessions); + void removeRecords(const LogicalSessionIdSet& sessions); // Test-side methods that operate on the _sessions map void add(LogicalSessionRecord record); @@ -81,13 +81,13 @@ public: void clearSessions(); const SessionMap& sessions() const; - StatusWith<LogicalSessionIdSet> findRemovedSessions(OperationContext* opCtx, - const LogicalSessionIdSet& sessions); + LogicalSessionIdSet findRemovedSessions(OperationContext* opCtx, + const LogicalSessionIdSet& sessions); private: // Default implementations, may be overridden with custom hooks. - Status _refreshSessions(const LogicalSessionRecordSet& sessions); - Status _removeRecords(const LogicalSessionIdSet& sessions); + void _refreshSessions(const LogicalSessionRecordSet& sessions); + void _removeRecords(const LogicalSessionIdSet& sessions); Mutex _mutex = MONGO_MAKE_LATCH("MockSessionsCollectionImpl::_mutex"); SessionMap _sessions; @@ -110,17 +110,17 @@ public: void checkSessionsCollectionExists(OperationContext* opCtx) override {} - Status refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions) override { - return _impl->refreshSessions(sessions); + void refreshSessions(OperationContext* opCtx, + const LogicalSessionRecordSet& sessions) override { + _impl->refreshSessions(sessions); } - Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override { - return _impl->removeRecords(sessions); + void removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override { + _impl->removeRecords(sessions); } - StatusWith<LogicalSessionIdSet> findRemovedSessions( - OperationContext* opCtx, const LogicalSessionIdSet& sessions) override { + LogicalSessionIdSet findRemovedSessions(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) override { return _impl->findRemovedSessions(opCtx, sessions); } diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp index c67de981876..10c5c06125a 100644 --- a/src/mongo/db/sessions_collection_rs.cpp +++ b/src/mongo/db/sessions_collection_rs.cpp @@ -171,50 +171,48 @@ void SessionsCollectionRS::checkSessionsCollectionExists(OperationContext* opCtx (localLogicalSessionTimeoutMinutes * 60)); } -Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions) { +void SessionsCollectionRS::refreshSessions(OperationContext* opCtx, + const LogicalSessionRecordSet& sessions) { const std::vector<LogicalSessionRecord> sessionsVector(sessions.begin(), sessions.end()); - return _dispatch(NamespaceString::kLogicalSessionsNamespace, - opCtx, - [&] { - DBDirectClient client(opCtx); - return doRefresh(NamespaceString::kLogicalSessionsNamespace, - sessionsVector, - makeSendFnForBatchWrite( - NamespaceString::kLogicalSessionsNamespace, &client)); - }, - [&](DBClientBase* client) { - return doRefresh(NamespaceString::kLogicalSessionsNamespace, - sessionsVector, - makeSendFnForBatchWrite( - NamespaceString::kLogicalSessionsNamespace, client)); - }); + _dispatch( + NamespaceString::kLogicalSessionsNamespace, + opCtx, + [&] { + DBDirectClient client(opCtx); + doRefresh(NamespaceString::kLogicalSessionsNamespace, + sessionsVector, + makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client)); + }, + [&](DBClientBase* client) { + doRefresh(NamespaceString::kLogicalSessionsNamespace, + sessionsVector, + makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, client)); + }); } -Status SessionsCollectionRS::removeRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) { +void SessionsCollectionRS::removeRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end()); - return _dispatch(NamespaceString::kLogicalSessionsNamespace, - opCtx, - [&] { - DBDirectClient client(opCtx); - return doRemove(NamespaceString::kLogicalSessionsNamespace, - sessionsVector, - makeSendFnForBatchWrite( - NamespaceString::kLogicalSessionsNamespace, &client)); - }, - [&](DBClientBase* client) { - return doRemove(NamespaceString::kLogicalSessionsNamespace, - sessionsVector, - makeSendFnForBatchWrite( - NamespaceString::kLogicalSessionsNamespace, client)); - }); + _dispatch( + NamespaceString::kLogicalSessionsNamespace, + opCtx, + [&] { + DBDirectClient client(opCtx); + doRemove(NamespaceString::kLogicalSessionsNamespace, + sessionsVector, + makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client)); + }, + [&](DBClientBase* client) { + doRemove(NamespaceString::kLogicalSessionsNamespace, + sessionsVector, + makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, client)); + }); } -StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions( - OperationContext* opCtx, const LogicalSessionIdSet& sessions) { +LogicalSessionIdSet SessionsCollectionRS::findRemovedSessions(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end()); return _dispatch( diff --git a/src/mongo/db/sessions_collection_rs.h b/src/mongo/db/sessions_collection_rs.h index f87f48c217d..7382112e6af 100644 --- a/src/mongo/db/sessions_collection_rs.h +++ b/src/mongo/db/sessions_collection_rs.h @@ -65,15 +65,14 @@ public: * * If a step-down happens on this node as this method is running, it may fail. */ - Status refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions) override; + void refreshSessions(OperationContext* opCtx, const LogicalSessionRecordSet& sessions) override; /** * Removes the authoritative records for the specified sessions. * * If a step-down happens on this node as this method is running, it may fail. */ - Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + void removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; /** * Returns the subset of sessions from the given set that do not have entries @@ -82,8 +81,8 @@ public: * If a step-down happens on this node as this method is running, it may * return stale results. */ - StatusWith<LogicalSessionIdSet> findRemovedSessions( - OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + LogicalSessionIdSet findRemovedSessions(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) override; private: auto _makePrimaryConnection(OperationContext* opCtx); diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp index 0e91b8e8f71..243fc1b3c97 100644 --- a/src/mongo/db/sessions_collection_sharded.cpp +++ b/src/mongo/db/sessions_collection_sharded.cpp @@ -143,8 +143,8 @@ void SessionsCollectionSharded::checkSessionsCollectionExists(OperationContext* uassertStatusOK(_checkCacheForSessionsCollection(opCtx)); } -Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions) { +void SessionsCollectionSharded::refreshSessions(OperationContext* opCtx, + const LogicalSessionRecordSet& sessions) { auto send = [&](BSONObj toSend) { auto opMsg = OpMsgRequest::fromDBAndBody(NamespaceString::kLogicalSessionsNamespace.db(), toSend); @@ -157,13 +157,13 @@ Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx, return response.toStatus(); }; - return doRefresh(NamespaceString::kLogicalSessionsNamespace, - _groupSessionRecordsByOwningShard(opCtx, sessions), - send); + doRefresh(NamespaceString::kLogicalSessionsNamespace, + _groupSessionRecordsByOwningShard(opCtx, sessions), + send); } -Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) { +void SessionsCollectionSharded::removeRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { auto send = [&](BSONObj toSend) { auto opMsg = OpMsgRequest::fromDBAndBody(NamespaceString::kLogicalSessionsNamespace.db(), toSend); @@ -176,41 +176,31 @@ Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx, return response.toStatus(); }; - return doRemove(NamespaceString::kLogicalSessionsNamespace, - _groupSessionIdsByOwningShard(opCtx, sessions), - send); + doRemove(NamespaceString::kLogicalSessionsNamespace, + _groupSessionIdsByOwningShard(opCtx, sessions), + send); } -StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions( +LogicalSessionIdSet SessionsCollectionSharded::findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) { - auto send = [&](BSONObj toSend) -> StatusWith<BSONObj> { - auto qr = QueryRequest::makeFromFindCommand( - NamespaceString::kLogicalSessionsNamespace, toSend, false); - if (!qr.isOK()) { - return qr.getStatus(); - } + auto send = [&](BSONObj toSend) -> BSONObj { + auto qr = uassertStatusOK(QueryRequest::makeFromFindCommand( + NamespaceString::kLogicalSessionsNamespace, toSend, false)); const boost::intrusive_ptr<ExpressionContext> expCtx; - auto cq = CanonicalQuery::canonicalize(opCtx, - std::move(qr.getValue()), - expCtx, - ExtensionsCallbackNoop(), - MatchExpressionParser::kBanAllSpecialFeatures); - if (!cq.isOK()) { - return cq.getStatus(); - } + auto cq = uassertStatusOK( + CanonicalQuery::canonicalize(opCtx, + std::move(qr), + expCtx, + ExtensionsCallbackNoop(), + MatchExpressionParser::kBanAllSpecialFeatures)); // Do the work to generate the first batch of results. This blocks waiting to get responses // from the shard(s). std::vector<BSONObj> batch; CursorId cursorId; - try { - cursorId = ClusterFind::runQuery( - opCtx, *cq.getValue(), ReadPreferenceSetting::get(opCtx), &batch); - } catch (const DBException& ex) { - return ex.toStatus(); - } + cursorId = ClusterFind::runQuery(opCtx, *cq, ReadPreferenceSetting::get(opCtx), &batch); rpc::OpMsgReplyBuilder replyBuilder; CursorResponseBuilder::Options options; diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h index 55cd728c891..755034d91b7 100644 --- a/src/mongo/db/sessions_collection_sharded.h +++ b/src/mongo/db/sessions_collection_sharded.h @@ -60,16 +60,15 @@ public: * Updates the last-use times on the given sessions to be greater than * or equal to the current time. */ - Status refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions) override; + void refreshSessions(OperationContext* opCtx, const LogicalSessionRecordSet& sessions) override; /** * Removes the authoritative records for the specified sessions. */ - Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + void removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; - StatusWith<LogicalSessionIdSet> findRemovedSessions( - OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + LogicalSessionIdSet findRemovedSessions(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) override; protected: Status _checkCacheForSessionsCollection(OperationContext* opCtx); diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp index 9d1bb415641..83f83748d2b 100644 --- a/src/mongo/db/sessions_collection_standalone.cpp +++ b/src/mongo/db/sessions_collection_standalone.cpp @@ -95,25 +95,25 @@ void SessionsCollectionStandalone::checkSessionsCollectionExists(OperationContex (localLogicalSessionTimeoutMinutes * 60)); } -Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions) { +void SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx, + const LogicalSessionRecordSet& sessions) { const std::vector<LogicalSessionRecord> sessionsVector(sessions.begin(), sessions.end()); DBDirectClient client(opCtx); - return doRefresh(NamespaceString::kLogicalSessionsNamespace, - sessionsVector, - makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client)); + doRefresh(NamespaceString::kLogicalSessionsNamespace, + sessionsVector, + makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client)); } -Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) { +void SessionsCollectionStandalone::removeRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end()); DBDirectClient client(opCtx); - return doRemove(NamespaceString::kLogicalSessionsNamespace, - sessionsVector, - makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client)); + doRemove(NamespaceString::kLogicalSessionsNamespace, + sessionsVector, + makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client)); } -StatusWith<LogicalSessionIdSet> SessionsCollectionStandalone::findRemovedSessions( +LogicalSessionIdSet SessionsCollectionStandalone::findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) { const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end()); DBDirectClient client(opCtx); diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h index 8c6f8d1f91a..d8de38502ca 100644 --- a/src/mongo/db/sessions_collection_standalone.h +++ b/src/mongo/db/sessions_collection_standalone.h @@ -57,16 +57,15 @@ public: * Updates the last-use times on the given sessions to be greater than * or equal to the current time. */ - Status refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions) override; + void refreshSessions(OperationContext* opCtx, const LogicalSessionRecordSet& sessions) override; /** * Removes the authoritative records for the specified sessions. */ - Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + void removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; - StatusWith<LogicalSessionIdSet> findRemovedSessions( - OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + LogicalSessionIdSet findRemovedSessions(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) override; }; } // namespace mongo diff --git a/src/mongo/dbtests/logical_sessions_tests.cpp b/src/mongo/dbtests/logical_sessions_tests.cpp index 94b0123afce..2c41c92b1f2 100644 --- a/src/mongo/dbtests/logical_sessions_tests.cpp +++ b/src/mongo/dbtests/logical_sessions_tests.cpp @@ -128,8 +128,7 @@ public: ASSERT_OK(res); // Remove one record, the other stays - res = collection()->removeRecords(opCtx(), {record1.getId()}); - ASSERT_OK(res); + collection()->removeRecords(opCtx(), {record1.getId()}); auto swRecord = fetchRecord(opCtx(), record1.getId()); ASSERT(!swRecord.isOK()); @@ -149,15 +148,13 @@ public: auto thePast = now - Minutes(5); // Attempt to refresh with no active records, should succeed (and do nothing). - auto resRefresh = collection()->refreshSessions(opCtx(), LogicalSessionRecordSet{}); - ASSERT(resRefresh.isOK()); + collection()->refreshSessions(opCtx(), LogicalSessionRecordSet{}); // Attempt to refresh one active record, should succeed. auto record1 = makeRecord(thePast); auto res = insertRecord(opCtx(), record1); ASSERT_OK(res); - resRefresh = collection()->refreshSessions(opCtx(), {record1}); - ASSERT(resRefresh.isOK()); + collection()->refreshSessions(opCtx(), {record1}); // The timestamp on the refreshed record should be updated. auto swRecord = fetchRecord(opCtx(), record1.getId()); @@ -169,8 +166,7 @@ public: // Attempt to refresh a record that is not present, should upsert it. auto record2 = makeRecord(thePast); - resRefresh = collection()->refreshSessions(opCtx(), {record2}); - ASSERT(resRefresh.isOK()); + collection()->refreshSessions(opCtx(), {record2}); swRecord = fetchRecord(opCtx(), record2.getId()); ASSERT(swRecord.isOK()); @@ -195,8 +191,7 @@ public: } // Run the refresh, should succeed. - resRefresh = collection()->refreshSessions(opCtx(), toRefresh); - ASSERT(resRefresh.isOK()); + collection()->refreshSessions(opCtx(), toRefresh); // Ensure that the right number of timestamps were updated. auto n = db.count(NamespaceString(ns()), BSON("lastUse" << now)); @@ -219,9 +214,8 @@ public: LogicalSessionIdSet lsids{notInsertedRecord.getId()}; auto response = collection()->findRemovedSessions(opCtx(), lsids); - ASSERT_EQ(response.isOK(), true); - ASSERT_EQ(response.getValue().size(), 1u); - ASSERT(*(response.getValue().begin()) == notInsertedRecord.getId()); + ASSERT_EQ(response.size(), 1u); + ASSERT(*(response.begin()) == notInsertedRecord.getId()); } // if a record is there, it hasn't been removed @@ -229,8 +223,7 @@ public: LogicalSessionIdSet lsids{insertedRecord.getId()}; auto response = collection()->findRemovedSessions(opCtx(), lsids); - ASSERT_EQ(response.isOK(), true); - ASSERT_EQ(response.getValue().size(), 0u); + ASSERT_EQ(response.size(), 0u); } // We can tell the difference with multiple records @@ -238,9 +231,8 @@ public: LogicalSessionIdSet lsids{insertedRecord.getId(), notInsertedRecord.getId()}; auto response = collection()->findRemovedSessions(opCtx(), lsids); - ASSERT_EQ(response.isOK(), true); - ASSERT_EQ(response.getValue().size(), 1u); - ASSERT(*(response.getValue().begin()) == notInsertedRecord.getId()); + ASSERT_EQ(response.size(), 1u); + ASSERT(*(response.begin()) == notInsertedRecord.getId()); } // Batch logic works @@ -262,9 +254,8 @@ public: } auto response = collection()->findRemovedSessions(opCtx(), mixedRecords); - ASSERT_EQ(response.isOK(), true); - ASSERT_EQ(response.getValue().size(), 5000u); - ASSERT(response.getValue() == uninsertedRecords); + ASSERT_EQ(response.size(), 5000u); + ASSERT(response == uninsertedRecords); } } }; diff --git a/src/mongo/s/session_catalog_router.cpp b/src/mongo/s/session_catalog_router.cpp index 5c35434fe25..de5b26c3f53 100644 --- a/src/mongo/s/session_catalog_router.cpp +++ b/src/mongo/s/session_catalog_router.cpp @@ -54,7 +54,7 @@ int RouterSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx, }); // From the passed-in sessions, find the ones which are actually expired/removed - auto expiredSessionIds = uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, lsids)); + auto expiredSessionIds = sessionsCollection.findRemovedSessions(opCtx, lsids); // Remove the session ids from the in-memory catalog int numReaped = 0; |