diff options
-rw-r--r-- | src/mongo/db/SConscript | 16 | ||||
-rw-r--r-- | src/mongo/db/clientcursor.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/cursor_manager.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/cursor_manager.h | 2 | ||||
-rw-r--r-- | src/mongo/db/cursor_server_params.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/cursor_server_params.h | 43 | ||||
-rw-r--r-- | src/mongo/db/logical_session_id.idl | 50 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection.cpp | 67 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection.h | 15 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_mock.h | 5 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_rs.cpp | 93 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_rs.h | 3 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_sharded.cpp | 49 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_sharded.h | 3 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_standalone.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_standalone.h | 3 | ||||
-rw-r--r-- | src/mongo/dbtests/cursor_manager_test.cpp | 15 | ||||
-rw-r--r-- | src/mongo/dbtests/logical_sessions_tests.cpp | 65 | ||||
-rw-r--r-- | src/mongo/s/query/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/s/query/cluster_cursor_cleanup_job.cpp | 21 |
20 files changed, 451 insertions, 81 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 1302ce14683..204dcfe02fc 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -636,8 +636,9 @@ env.Library( "$BUILD_DIR/mongo/db/logical_session_cache", "$BUILD_DIR/mongo/db/logical_session_id", "$BUILD_DIR/mongo/util/background_job", - "query/query", + "cursor_server_params", "background", + "query/query", ], ) @@ -1042,6 +1043,9 @@ env.Library( 'logical_session_id', 'sessions_collection', ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/s/query/cluster_query', + ], ) env.Library( @@ -1448,6 +1452,16 @@ env.Library( ) env.Library( + target='cursor_server_params', + source=[ + 'cursor_server_params.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/server_parameters', + ], +) + +env.Library( target='ttl_collection_cache', source=[ 'ttl_collection_cache.cpp', diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index 4f169b2fa1f..52007343a9c 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -45,10 +45,10 @@ #include "mongo/db/commands.h" #include "mongo/db/commands/server_status.h" #include "mongo/db/commands/server_status_metric.h" +#include "mongo/db/cursor_server_params.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_global.h" -#include "mongo/db/server_parameters.h" #include "mongo/util/background.h" #include "mongo/util/concurrency/idle_thread_block.h" #include "mongo/util/exit.h" @@ -71,8 +71,6 @@ static ServerStatusMetricField<Counter64> dCursorStatsOpenNoTimeout("cursor.open static ServerStatusMetricField<Counter64> dCursorStatusTimedout("cursor.timedOut", &cursorStatsTimedOut); -MONGO_EXPORT_SERVER_PARAMETER(clientCursorMonitorFrequencySecs, int, 4); - long long ClientCursor::totalOpen() { return cursorStatsOpen.get(); } @@ -287,7 +285,7 @@ public: CursorManager::timeoutCursorsGlobal(opCtx.get(), now)); } MONGO_IDLE_THREAD_BLOCK; - sleepsecs(clientCursorMonitorFrequencySecs.load()); + sleepsecs(getClientCursorMonitorFrequencySecs()); } } }; diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp index a4b5712965f..364116a0fd1 100644 --- a/src/mongo/db/cursor_manager.cpp +++ b/src/mongo/db/cursor_manager.cpp @@ -39,6 +39,7 @@ #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" #include "mongo/db/client.h" +#include "mongo/db/cursor_server_params.h" #include "mongo/db/db_raii.h" #include "mongo/db/kill_sessions_common.h" #include "mongo/db/logical_session_cache.h" @@ -55,13 +56,6 @@ namespace mongo { using std::vector; -constexpr Minutes CursorManager::kDefaultCursorTimeoutMinutes; - -MONGO_EXPORT_SERVER_PARAMETER( - cursorTimeoutMillis, - int, - durationCount<Milliseconds>(CursorManager::kDefaultCursorTimeoutMinutes)); - constexpr int CursorManager::kNumPartitions; namespace { @@ -451,7 +445,7 @@ bool CursorManager::cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_ if (cursor->isNoTimeout() || cursor->_isPinned) { return false; } - return (now - cursor->_lastUseDate) >= Milliseconds(cursorTimeoutMillis.load()); + return (now - cursor->_lastUseDate) >= Milliseconds(getCursorTimeoutMillis()); } std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) { diff --git a/src/mongo/db/cursor_manager.h b/src/mongo/db/cursor_manager.h index c2fdc12e93c..7c113c89235 100644 --- a/src/mongo/db/cursor_manager.h +++ b/src/mongo/db/cursor_manager.h @@ -77,8 +77,6 @@ class PlanExecutor; */ class CursorManager { public: - // The number of minutes a cursor is allowed to be idle before timing out. - static constexpr Minutes kDefaultCursorTimeoutMinutes{10}; using RegistrationToken = Partitioned<unordered_set<PlanExecutor*>>::PartitionId; /** diff --git a/src/mongo/db/cursor_server_params.cpp b/src/mongo/db/cursor_server_params.cpp new file mode 100644 index 00000000000..39c3038c0b8 --- /dev/null +++ b/src/mongo/db/cursor_server_params.cpp @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/cursor_server_params.h" + +#include "mongo/db/server_parameters.h" + +namespace mongo { +namespace { + +static constexpr Minutes kDefaultCursorTimeoutMinutes{10}; + +MONGO_EXPORT_SERVER_PARAMETER(clientCursorMonitorFrequencySecs, int, 4); +MONGO_EXPORT_SERVER_PARAMETER(cursorTimeoutMillis, + long long, + durationCount<Milliseconds>(kDefaultCursorTimeoutMinutes)); + +} // namespace + +int getClientCursorMonitorFrequencySecs() { + return clientCursorMonitorFrequencySecs.load(); +} + +long long getCursorTimeoutMillis() { + return cursorTimeoutMillis.load(); +} + +Milliseconds getDefaultCursorTimeoutMillis() { + return kDefaultCursorTimeoutMinutes; +} + +} // namespace mongo diff --git a/src/mongo/db/cursor_server_params.h b/src/mongo/db/cursor_server_params.h new file mode 100644 index 00000000000..eadc7f6d0b1 --- /dev/null +++ b/src/mongo/db/cursor_server_params.h @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/util/duration.h" + +namespace mongo { + +int getClientCursorMonitorFrequencySecs(); + +// Period of time after which mortal cursors are killed for inactivity. Configurable with server +// parameter "cursorTimeoutMillis". +long long getCursorTimeoutMillis(); + +Milliseconds getDefaultCursorTimeoutMillis(); + +} // namespace mongo diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl index 24b58226f51..96310498441 100644 --- a/src/mongo/db/logical_session_id.idl +++ b/src/mongo/db/logical_session_id.idl @@ -112,3 +112,53 @@ structs: operation executes." type: TxnNumber optional: true + + SessionsCollectionFetchResultIndividualResult: + description: "Individual result" + strict: true + fields: + _id: LogicalSessionId + + SessionsCollectionFetchResultCursor: + description: "Cursor object" + strict: false + fields: + firstBatch: array<SessionsCollectionFetchResultIndividualResult> + + SessionsCollectionFetchResult: + description: "Parser for pulling out the fetch results from SessionsCollection::fetch" + strict: false + fields: + cursor: SessionsCollectionFetchResultCursor + + SessionsCollectionFetchRequestFilterId: + description: "Id" + strict: true + fields: + $in: + type: array<LogicalSessionId> + cpp_name: "in" + + SessionsCollectionFetchRequestFilter: + description: "filter" + strict: true + fields: + _id: SessionsCollectionFetchRequestFilterId + + SessionsCollectionFetchRequestProjection: + description: "projection" + strict: true + fields: + _id: int + + SessionsCollectionFetchRequest: + description: "Parser for forming the fetch request for SessionsCollection::fetch" + strict: true + fields: + find: namespacestring + filter: SessionsCollectionFetchRequestFilter + projection: SessionsCollectionFetchRequestProjection + batchSize: int + singleBatch: bool + allowPartialResults: bool + limit: int diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp index aa43c706ddb..d1618901350 100644 --- a/src/mongo/db/sessions_collection.cpp +++ b/src/mongo/db/sessions_collection.cpp @@ -180,6 +180,19 @@ SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(DBClien return send; } +SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(DBClientBase* client) { + auto send = [client](BSONObj cmd) -> StatusWith<BSONObj> { + BSONObj res; + if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), cmd, res)) { + return getStatusFromCommandResult(res); + } + + return res; + }; + + return send; +} + Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions, Date_t refreshTime, SendBatchFn send) { @@ -232,4 +245,58 @@ Status SessionsCollection::doRemoveExternal(const LogicalSessionIdSet& sessions, return Status::OK(); } +StatusWith<LogicalSessionIdSet> SessionsCollection::doFetch(const LogicalSessionIdSet& sessions, + FindBatchFn send) { + auto makeT = [] { return std::vector<LogicalSessionId>{}; }; + + auto add = [](std::vector<LogicalSessionId>& batch, const LogicalSessionId& record) { + batch.push_back(record); + }; + + LogicalSessionIdSet removed = sessions; + + auto wrappedSend = [&](BSONObj batch) { + auto swBatchResult = send(batch); + + if (!swBatchResult.isOK()) { + return swBatchResult.getStatus(); + } else { + auto result = SessionsCollectionFetchResult::parse("SessionsCollectionFetchResult"_sd, + swBatchResult.getValue()); + + for (const auto& lsid : result.getCursor().getFirstBatch()) { + removed.erase(lsid.get_id()); + } + + return Status::OK(); + } + }; + + auto sendLocal = [&](std::vector<LogicalSessionId>& batch) { + SessionsCollectionFetchRequest request; + + request.setFind(NamespaceString{SessionsCollection::kSessionsCollection}); + request.setFilter({}); + request.getFilter().set_id({}); + request.getFilter().get_id().setIn(batch); + + request.setProjection({}); + request.getProjection().set_id(1); + request.setBatchSize(batch.size()); + request.setLimit(batch.size()); + request.setAllowPartialResults(true); + request.setSingleBatch(true); + + return wrappedSend(request.toBSON()); + }; + + auto status = runBulkGeneric(makeT, add, sendLocal, sessions); + + if (!status.isOK()) { + return status; + } + + return removed; +} + } // namespace mongo diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h index 553a74224a4..3c9a5d65fb1 100644 --- a/src/mongo/db/sessions_collection.h +++ b/src/mongo/db/sessions_collection.h @@ -71,6 +71,14 @@ public: */ virtual Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0; + /** + * Checks a set of lsids and returns the set that no longer exists + * + * Returns an error if the fetch cannot occur, for example from a network error. + */ + virtual StatusWith<LogicalSessionIdSet> findRemovedSessions( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0; + protected: /** * Makes a send function for the given client. @@ -78,6 +86,8 @@ protected: using SendBatchFn = stdx::function<Status(BSONObj batch)>; SendBatchFn makeSendFnForCommand(DBClientBase* client); SendBatchFn makeSendFnForBatchWrite(DBClientBase* client); + using FindBatchFn = stdx::function<StatusWith<BSONObj>(BSONObj batch)>; + FindBatchFn makeFindFnForCommand(DBClientBase* client); /** * Formats and sends batches of refreshes for the given set of sessions. @@ -92,6 +102,11 @@ protected: */ Status doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send); Status doRemoveExternal(const LogicalSessionIdSet& sessions, SendBatchFn send); + + /** + * Formats and sends batches of fetches for the given set of sessions. + */ + StatusWith<LogicalSessionIdSet> doFetch(const LogicalSessionIdSet& sessions, FindBatchFn send); }; } // namespace mongo diff --git a/src/mongo/db/sessions_collection_mock.h b/src/mongo/db/sessions_collection_mock.h index cf0a88c324c..f88d877b17b 100644 --- a/src/mongo/db/sessions_collection_mock.h +++ b/src/mongo/db/sessions_collection_mock.h @@ -111,6 +111,11 @@ public: return _impl->removeRecords(sessions); } + StatusWith<LogicalSessionIdSet> findRemovedSessions( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) override { + return LogicalSessionIdSet{}; + } + private: std::shared_ptr<MockSessionsCollectionImpl> _impl; }; diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp index f153d31f385..89387af9531 100644 --- a/src/mongo/db/sessions_collection_rs.cpp +++ b/src/mongo/db/sessions_collection_rs.cpp @@ -84,7 +84,8 @@ Status makePrimaryConnection(OperationContext* opCtx, boost::optional<ScopedDbCo } template <typename Callback> -Status runIfStandaloneOrPrimary(OperationContext* opCtx, Callback callback) { +auto runIfStandaloneOrPrimary(OperationContext* opCtx, Callback callback) + -> boost::optional<decltype(std::declval<Callback>()())> { Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IX); Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, MODE_IX); @@ -93,61 +94,71 @@ Status runIfStandaloneOrPrimary(OperationContext* opCtx, Callback callback) { return callback(); } - return {ErrorCodes::NotMaster, "Cannot perform a local write"}; + return boost::none; } -} // namespace - -Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions, - Date_t refreshTime) { - bool ran = false; - - // If we are the primary, write directly to ourself. - auto status = runIfStandaloneOrPrimary(opCtx, [&] { - ran = true; - DBDirectClient client(opCtx); - return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client)); - }); - - if (ran) { - return status; - } - - // If we are not writeable, then send refreshSessions cmd to the primary. +template <typename Callback> +auto sendToPrimary(OperationContext* opCtx, Callback callback) + -> decltype(std::declval<Callback>()(static_cast<DBClientBase*>(nullptr))) { boost::optional<ScopedDbConnection> conn; auto res = makePrimaryConnection(opCtx, &conn); if (!res.isOK()) { return res; } - return doRefreshExternal(sessions, refreshTime, makeSendFnForCommand(conn->get())); + return callback(conn->get()); } -Status SessionsCollectionRS::removeRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) { - bool ran = false; - +template <typename LocalCallback, typename RemoteCallback> +auto dispatch(OperationContext* opCtx, LocalCallback localCallback, RemoteCallback remoteCallback) + -> decltype(std::declval<RemoteCallback>()(static_cast<DBClientBase*>(nullptr))) { // If we are the primary, write directly to ourself. - auto status = runIfStandaloneOrPrimary(opCtx, [&] { - ran = true; - DBDirectClient client(opCtx); - return doRemove(sessions, makeSendFnForBatchWrite(&client)); - }); - - if (ran) { - return status; - } + auto result = runIfStandaloneOrPrimary(opCtx, [&] { return localCallback(); }); - // If we are not writeable, then send endSessions cmd to the primary - boost::optional<ScopedDbConnection> conn; - auto res = makePrimaryConnection(opCtx, &conn); - if (!res.isOK()) { - return res; + if (result) { + return *result; } - return doRemoveExternal(sessions, makeSendFnForCommand(conn->get())); + return sendToPrimary(opCtx, remoteCallback); } +} // namespace + +Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx, + const LogicalSessionRecordSet& sessions, + Date_t refreshTime) { + return dispatch(opCtx, + [&] { + DBDirectClient client(opCtx); + return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client)); + }, + [&](DBClientBase* client) { + return doRefreshExternal( + sessions, refreshTime, makeSendFnForCommand(client)); + }); +} + +Status SessionsCollectionRS::removeRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { + return dispatch(opCtx, + [&] { + DBDirectClient client(opCtx); + return doRemove(sessions, makeSendFnForBatchWrite(&client)); + }, + [&](DBClientBase* client) { + return doRemoveExternal(sessions, makeSendFnForCommand(client)); + }); +} + +StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) { + return dispatch( + opCtx, + [&] { + DBDirectClient client(opCtx); + return doFetch(sessions, makeFindFnForCommand(&client)); + }, + [&](DBClientBase* client) { return doFetch(sessions, makeFindFnForCommand(client)); }); +} } // namespace mongo diff --git a/src/mongo/db/sessions_collection_rs.h b/src/mongo/db/sessions_collection_rs.h index 799f4564291..a273dd88955 100644 --- a/src/mongo/db/sessions_collection_rs.h +++ b/src/mongo/db/sessions_collection_rs.h @@ -65,6 +65,9 @@ public: * Removes the authoritative records for the specified sessions. */ Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + + StatusWith<LogicalSessionIdSet> findRemovedSessions( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; }; } // namespace mongo diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp index 46ad4ce5a21..6c35d317b26 100644 --- a/src/mongo/db/sessions_collection_sharded.cpp +++ b/src/mongo/db/sessions_collection_sharded.cpp @@ -30,8 +30,12 @@ #include "mongo/db/sessions_collection_sharded.h" +#include "mongo/db/matcher/extensions_callback_noop.h" #include "mongo/db/operation_context.h" +#include "mongo/db/query/canonical_query.h" +#include "mongo/db/query/query_request.h" #include "mongo/s/commands/cluster_write.h" +#include "mongo/s/query/cluster_find.h" #include "mongo/s/write_ops/batch_write_exec.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" @@ -93,5 +97,50 @@ Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx, return doRemove(sessions, send); } +StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) { + + auto send = [&](BSONObj toSend) -> StatusWith<BSONObj> { + const NamespaceString nss(SessionsCollection::kSessionsFullNS); + + auto qr = QueryRequest::makeFromFindCommand(nss, toSend, false); + if (!qr.isOK()) { + return qr.getStatus(); + } + + const boost::intrusive_ptr<ExpressionContext> expCtx; + auto cq = CanonicalQuery::canonicalize(opCtx, + std::move(qr.getValue()), + expCtx, + ExtensionsCallbackNoop(), + MatchExpressionParser::kAllowAllSpecialFeatures & + ~MatchExpressionParser::AllowedFeatures::kExpr); + if (!cq.isOK()) { + return cq.getStatus(); + } + + // Do the work to generate the first batch of results. This blocks waiting to get responses + // from the shard(s). + std::vector<BSONObj> batch; + BSONObj viewDefinition; + auto cursorId = ClusterFind::runQuery( + opCtx, *cq.getValue(), ReadPreferenceSetting::get(opCtx), &batch, &viewDefinition); + + if (!cursorId.isOK()) { + return cursorId.getStatus(); + } + + BSONObjBuilder result; + CursorResponseBuilder firstBatch(/*firstBatch*/ true, &result); + for (const auto& obj : batch) { + firstBatch.append(obj); + } + firstBatch.done(cursorId.getValue(), nss.ns()); + + return result.obj(); + }; + + return doFetch(sessions, send); +} } // namespace mongo diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h index ddf15ba11d6..cb0a2e6fd9e 100644 --- a/src/mongo/db/sessions_collection_sharded.h +++ b/src/mongo/db/sessions_collection_sharded.h @@ -55,6 +55,9 @@ public: * Removes the authoritative records for the specified sessions. */ Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + + StatusWith<LogicalSessionIdSet> findRemovedSessions( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; }; } // namespace mongo diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp index 79f36e66fd6..8187b19ad06 100644 --- a/src/mongo/db/sessions_collection_standalone.cpp +++ b/src/mongo/db/sessions_collection_standalone.cpp @@ -57,4 +57,10 @@ Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx, return doRemove(sessions, makeSendFnForBatchWrite(&client)); } +StatusWith<LogicalSessionIdSet> SessionsCollectionStandalone::findRemovedSessions( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) { + DBDirectClient client(opCtx); + return doFetch(sessions, makeFindFnForCommand(&client)); +} + } // namespace mongo diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h index a164117fe44..2c1bfe32096 100644 --- a/src/mongo/db/sessions_collection_standalone.h +++ b/src/mongo/db/sessions_collection_standalone.h @@ -54,6 +54,9 @@ public: * Removes the authoritative records for the specified sessions. */ Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + + StatusWith<LogicalSessionIdSet> findRemovedSessions( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; }; } // namespace mongo diff --git a/src/mongo/dbtests/cursor_manager_test.cpp b/src/mongo/dbtests/cursor_manager_test.cpp index 6505a64bea0..5b96f891823 100644 --- a/src/mongo/dbtests/cursor_manager_test.cpp +++ b/src/mongo/dbtests/cursor_manager_test.cpp @@ -36,6 +36,7 @@ #include "mongo/db/client.h" #include "mongo/db/clientcursor.h" #include "mongo/db/cursor_manager.h" +#include "mongo/db/cursor_server_params.h" #include "mongo/db/exec/queued_data_stage.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/exec/working_set_common.h" @@ -286,7 +287,7 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) { ASSERT_EQ(0UL, cursorManager->timeoutCursors(_opCtx.get(), Date_t())); - clock->advance(Milliseconds(CursorManager::kDefaultCursorTimeoutMinutes)); + clock->advance(getDefaultCursorTimeoutMillis()); ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now())); ASSERT_EQ(0UL, cursorManager->numCursors()); @@ -309,7 +310,7 @@ TEST_F(CursorManagerTest, InactivePinnedCursorShouldNotTimeout) { {makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()}); // The pin is still in scope, so it should not time out. - clock->advance(Milliseconds(CursorManager::kDefaultCursorTimeoutMinutes)); + clock->advance(getDefaultCursorTimeoutMillis()); ASSERT_EQ(0UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now())); } @@ -330,7 +331,7 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsShouldTimeout) { _opCtx.get(), collectionGoingAway, "KilledCursorsShouldTimeoutTest"); // Advance the clock to simulate time passing. - clock->advance(Milliseconds(CursorManager::kDefaultCursorTimeoutMinutes)); + clock->advance(getDefaultCursorTimeoutMillis()); ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now())); ASSERT_EQ(0UL, cursorManager->numCursors()); @@ -352,7 +353,7 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsThatAreStillPinnedShouldNotTimeou _opCtx.get(), collectionGoingAway, "KilledCursorsShouldTimeoutTest"); // Advance the clock to simulate time passing. - clock->advance(Milliseconds(CursorManager::kDefaultCursorTimeoutMinutes)); + clock->advance(getDefaultCursorTimeoutMillis()); // The pin is still in scope, so it should not time out. ASSERT_EQ(0UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now())); @@ -384,7 +385,7 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) { // We should be able to time out the unused cursor, but the one we used should stay alive. ASSERT_EQ(2UL, cursorManager->numCursors()); - clock->advance(Milliseconds(CursorManager::kDefaultCursorTimeoutMinutes) - Milliseconds(1)); + clock->advance(getDefaultCursorTimeoutMillis() - Milliseconds(1)); ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now())); ASSERT_EQ(1UL, cursorManager->numCursors()); @@ -407,7 +408,7 @@ TEST_F(CursorManagerTest, CursorShouldNotTimeOutUntilIdleForLongEnoughAfterBeing _opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()}); // Advance the clock to simulate time passing. - clock->advance(CursorManager::kDefaultCursorTimeoutMinutes + Milliseconds(1)); + clock->advance(getDefaultCursorTimeoutMillis() + Milliseconds(1)); // Make sure the pinned cursor does not time out, before or after unpinning it. ASSERT_EQ(1UL, cursorManager->numCursors()); @@ -422,7 +423,7 @@ TEST_F(CursorManagerTest, CursorShouldNotTimeOutUntilIdleForLongEnoughAfterBeing // Advance the clock to simulate more time passing, then assert that the now-inactive cursor // times out. - clock->advance(CursorManager::kDefaultCursorTimeoutMinutes + Milliseconds(1)); + clock->advance(getDefaultCursorTimeoutMillis() + Milliseconds(1)); ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now())); ASSERT_EQ(0UL, cursorManager->numCursors()); } diff --git a/src/mongo/dbtests/logical_sessions_tests.cpp b/src/mongo/dbtests/logical_sessions_tests.cpp index 9c8d773b6a3..51614181623 100644 --- a/src/mongo/dbtests/logical_sessions_tests.cpp +++ b/src/mongo/dbtests/logical_sessions_tests.cpp @@ -205,6 +205,70 @@ public: } }; +// Test that finding entries in this collection works. +class SessionsCollectionStandaloneFindTest : public SessionsCollectionStandaloneTest { +public: + void run() { + DBDirectClient db(opCtx()); + auto notInsertedRecord = makeRecord(); + + auto insertedRecord = makeRecord(); + ASSERT(insertRecord(opCtx(), insertedRecord).isOK()); + + // if a record isn't there, it's been removed + { + LogicalSessionIdSet lsids{notInsertedRecord.getId()}; + + auto response = collection()->findRemovedSessions(opCtx(), lsids); + ASSERT_EQ(response.isOK(), true); + ASSERT_EQ(response.getValue().size(), 1u); + ASSERT(*(response.getValue().begin()) == notInsertedRecord.getId()); + } + + // if a record is there, it hasn't been removed + { + LogicalSessionIdSet lsids{insertedRecord.getId()}; + + auto response = collection()->findRemovedSessions(opCtx(), lsids); + ASSERT_EQ(response.isOK(), true); + ASSERT_EQ(response.getValue().size(), 0u); + } + + // We can tell the difference with multiple records + { + LogicalSessionIdSet lsids{insertedRecord.getId(), notInsertedRecord.getId()}; + + auto response = collection()->findRemovedSessions(opCtx(), lsids); + ASSERT_EQ(response.isOK(), true); + ASSERT_EQ(response.getValue().size(), 1u); + ASSERT(*(response.getValue().begin()) == notInsertedRecord.getId()); + } + + // Batch logic works + { + LogicalSessionIdSet insertedRecords; + LogicalSessionIdSet uninsertedRecords; + LogicalSessionIdSet mixedRecords; + + for (int i = 0; i < 5000; ++i) { + auto insertedRecord = makeRecord(); + ASSERT(insertRecord(opCtx(), insertedRecord).isOK()); + insertedRecords.insert(insertedRecord.getId()); + + auto uninsertedRecord = makeRecord(); + uninsertedRecords.insert(uninsertedRecord.getId()); + + mixedRecords.insert(insertedRecord.getId()); + mixedRecords.insert(uninsertedRecord.getId()); + } + + auto response = collection()->findRemovedSessions(opCtx(), mixedRecords); + ASSERT_EQ(response.isOK(), true); + ASSERT_EQ(response.getValue().size(), 5000u); + ASSERT(response.getValue() == uninsertedRecords); + } + } +}; class All : public Suite { public: All() : Suite("logical_sessions") {} @@ -212,6 +276,7 @@ public: void setupTests() { add<SessionsCollectionStandaloneRemoveTest>(); add<SessionsCollectionStandaloneRefreshTest>(); + add<SessionsCollectionStandaloneFindTest>(); } }; diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript index 311a5b47a01..460dcf7912e 100644 --- a/src/mongo/s/query/SConscript +++ b/src/mongo/s/query/SConscript @@ -159,6 +159,7 @@ env.Library( "cluster_cursor_cleanup_job.cpp", ], LIBDEPS=[ + "$BUILD_DIR/mongo/db/cursor_server_params", "$BUILD_DIR/mongo/s/coreshard", "$BUILD_DIR/mongo/util/background_job", ], diff --git a/src/mongo/s/query/cluster_cursor_cleanup_job.cpp b/src/mongo/s/query/cluster_cursor_cleanup_job.cpp index dcf19acf775..963f306b2b2 100644 --- a/src/mongo/s/query/cluster_cursor_cleanup_job.cpp +++ b/src/mongo/s/query/cluster_cursor_cleanup_job.cpp @@ -31,6 +31,7 @@ #include "mongo/s/query/cluster_cursor_cleanup_job.h" #include "mongo/db/client.h" +#include "mongo/db/cursor_server_params.h" #include "mongo/db/server_parameters.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_cursor_manager.h" @@ -40,22 +41,6 @@ namespace mongo { -namespace { - -// Period of time after which mortal cursors are killed for inactivity. Configurable with server -// parameter "cursorTimeoutMillis". -AtomicInt64 cursorTimeoutMillis(durationCount<Milliseconds>(Minutes(10))); - -ExportedServerParameter<long long, ServerParameterType::kStartupAndRuntime> - cursorTimeoutMillisConfig(ServerParameterSet::getGlobal(), - "cursorTimeoutMillis", - &cursorTimeoutMillis); - -// Frequency with which ClusterCursorCleanupJob is run. -MONGO_EXPORT_SERVER_PARAMETER(clientCursorMonitorFrequencySecs, long long, 4); - -} // namespace - ClusterCursorCleanupJob clusterCursorCleanupJob; std::string ClusterCursorCleanupJob::name() const { @@ -72,7 +57,7 @@ void ClusterCursorCleanupJob::run() { while (!globalInShutdownDeprecated()) { // Mirroring the behavior in CursorManager::timeoutCursors(), a negative value for // cursorTimeoutMillis has the same effect as a 0 value: cursors are cleaned immediately. - auto cursorTimeoutValue = cursorTimeoutMillis.load(); + auto cursorTimeoutValue = getCursorTimeoutMillis(); const auto opCtx = client->makeOperationContext(); Date_t cutoff = (cursorTimeoutValue > 0) ? (Date_t::now() - Milliseconds(cursorTimeoutValue)) @@ -81,7 +66,7 @@ void ClusterCursorCleanupJob::run() { manager->incrementCursorsTimedOut(manager->reapZombieCursors(opCtx.get())); MONGO_IDLE_THREAD_BLOCK; - sleepsecs(clientCursorMonitorFrequencySecs.load()); + sleepsecs(getClientCursorMonitorFrequencySecs()); } } |