summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2017-08-29 15:34:20 -0400
committerJason Carey <jcarey@argv.me>2017-08-31 16:10:18 -0400
commitc351caa6815218c5b4a9801342ccbb1b050f6aea (patch)
treeac65b981a61218ff9384a0f68a89d9b3141ea4bb
parented619087e8dc51eb13578f5ebdd60f8ffee750aa (diff)
downloadmongo-c351caa6815218c5b4a9801342ccbb1b050f6aea.tar.gz
SERVER-30805 add LSC::findRemovedSessions()
Implements a findRemovedSessions method for the logical session collection and impls for the various backends.
-rw-r--r--src/mongo/db/SConscript16
-rw-r--r--src/mongo/db/clientcursor.cpp6
-rw-r--r--src/mongo/db/cursor_manager.cpp10
-rw-r--r--src/mongo/db/cursor_manager.h2
-rw-r--r--src/mongo/db/cursor_server_params.cpp59
-rw-r--r--src/mongo/db/cursor_server_params.h43
-rw-r--r--src/mongo/db/logical_session_id.idl50
-rw-r--r--src/mongo/db/sessions_collection.cpp67
-rw-r--r--src/mongo/db/sessions_collection.h15
-rw-r--r--src/mongo/db/sessions_collection_mock.h5
-rw-r--r--src/mongo/db/sessions_collection_rs.cpp93
-rw-r--r--src/mongo/db/sessions_collection_rs.h3
-rw-r--r--src/mongo/db/sessions_collection_sharded.cpp49
-rw-r--r--src/mongo/db/sessions_collection_sharded.h3
-rw-r--r--src/mongo/db/sessions_collection_standalone.cpp6
-rw-r--r--src/mongo/db/sessions_collection_standalone.h3
-rw-r--r--src/mongo/dbtests/cursor_manager_test.cpp15
-rw-r--r--src/mongo/dbtests/logical_sessions_tests.cpp65
-rw-r--r--src/mongo/s/query/SConscript1
-rw-r--r--src/mongo/s/query/cluster_cursor_cleanup_job.cpp21
20 files changed, 451 insertions, 81 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 1302ce14683..204dcfe02fc 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -636,8 +636,9 @@ env.Library(
"$BUILD_DIR/mongo/db/logical_session_cache",
"$BUILD_DIR/mongo/db/logical_session_id",
"$BUILD_DIR/mongo/util/background_job",
- "query/query",
+ "cursor_server_params",
"background",
+ "query/query",
],
)
@@ -1042,6 +1043,9 @@ env.Library(
'logical_session_id',
'sessions_collection',
],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/s/query/cluster_query',
+ ],
)
env.Library(
@@ -1448,6 +1452,16 @@ env.Library(
)
env.Library(
+ target='cursor_server_params',
+ source=[
+ 'cursor_server_params.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/server_parameters',
+ ],
+)
+
+env.Library(
target='ttl_collection_cache',
source=[
'ttl_collection_cache.cpp',
diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp
index 4f169b2fa1f..52007343a9c 100644
--- a/src/mongo/db/clientcursor.cpp
+++ b/src/mongo/db/clientcursor.cpp
@@ -45,10 +45,10 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/server_status.h"
#include "mongo/db/commands/server_status_metric.h"
+#include "mongo/db/cursor_server_params.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
-#include "mongo/db/server_parameters.h"
#include "mongo/util/background.h"
#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/exit.h"
@@ -71,8 +71,6 @@ static ServerStatusMetricField<Counter64> dCursorStatsOpenNoTimeout("cursor.open
static ServerStatusMetricField<Counter64> dCursorStatusTimedout("cursor.timedOut",
&cursorStatsTimedOut);
-MONGO_EXPORT_SERVER_PARAMETER(clientCursorMonitorFrequencySecs, int, 4);
-
long long ClientCursor::totalOpen() {
return cursorStatsOpen.get();
}
@@ -287,7 +285,7 @@ public:
CursorManager::timeoutCursorsGlobal(opCtx.get(), now));
}
MONGO_IDLE_THREAD_BLOCK;
- sleepsecs(clientCursorMonitorFrequencySecs.load());
+ sleepsecs(getClientCursorMonitorFrequencySecs());
}
}
};
diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp
index a4b5712965f..364116a0fd1 100644
--- a/src/mongo/db/cursor_manager.cpp
+++ b/src/mongo/db/cursor_manager.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/client.h"
+#include "mongo/db/cursor_server_params.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/kill_sessions_common.h"
#include "mongo/db/logical_session_cache.h"
@@ -55,13 +56,6 @@
namespace mongo {
using std::vector;
-constexpr Minutes CursorManager::kDefaultCursorTimeoutMinutes;
-
-MONGO_EXPORT_SERVER_PARAMETER(
- cursorTimeoutMillis,
- int,
- durationCount<Milliseconds>(CursorManager::kDefaultCursorTimeoutMinutes));
-
constexpr int CursorManager::kNumPartitions;
namespace {
@@ -451,7 +445,7 @@ bool CursorManager::cursorShouldTimeout_inlock(const ClientCursor* cursor, Date_
if (cursor->isNoTimeout() || cursor->_isPinned) {
return false;
}
- return (now - cursor->_lastUseDate) >= Milliseconds(cursorTimeoutMillis.load());
+ return (now - cursor->_lastUseDate) >= Milliseconds(getCursorTimeoutMillis());
}
std::size_t CursorManager::timeoutCursors(OperationContext* opCtx, Date_t now) {
diff --git a/src/mongo/db/cursor_manager.h b/src/mongo/db/cursor_manager.h
index c2fdc12e93c..7c113c89235 100644
--- a/src/mongo/db/cursor_manager.h
+++ b/src/mongo/db/cursor_manager.h
@@ -77,8 +77,6 @@ class PlanExecutor;
*/
class CursorManager {
public:
- // The number of minutes a cursor is allowed to be idle before timing out.
- static constexpr Minutes kDefaultCursorTimeoutMinutes{10};
using RegistrationToken = Partitioned<unordered_set<PlanExecutor*>>::PartitionId;
/**
diff --git a/src/mongo/db/cursor_server_params.cpp b/src/mongo/db/cursor_server_params.cpp
new file mode 100644
index 00000000000..39c3038c0b8
--- /dev/null
+++ b/src/mongo/db/cursor_server_params.cpp
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/cursor_server_params.h"
+
+#include "mongo/db/server_parameters.h"
+
+namespace mongo {
+namespace {
+
+static constexpr Minutes kDefaultCursorTimeoutMinutes{10};
+
+MONGO_EXPORT_SERVER_PARAMETER(clientCursorMonitorFrequencySecs, int, 4);
+MONGO_EXPORT_SERVER_PARAMETER(cursorTimeoutMillis,
+ long long,
+ durationCount<Milliseconds>(kDefaultCursorTimeoutMinutes));
+
+} // namespace
+
+int getClientCursorMonitorFrequencySecs() {
+ return clientCursorMonitorFrequencySecs.load();
+}
+
+long long getCursorTimeoutMillis() {
+ return cursorTimeoutMillis.load();
+}
+
+Milliseconds getDefaultCursorTimeoutMillis() {
+ return kDefaultCursorTimeoutMinutes;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/cursor_server_params.h b/src/mongo/db/cursor_server_params.h
new file mode 100644
index 00000000000..eadc7f6d0b1
--- /dev/null
+++ b/src/mongo/db/cursor_server_params.h
@@ -0,0 +1,43 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/util/duration.h"
+
+namespace mongo {
+
+int getClientCursorMonitorFrequencySecs();
+
+// Period of time after which mortal cursors are killed for inactivity. Configurable with server
+// parameter "cursorTimeoutMillis".
+long long getCursorTimeoutMillis();
+
+Milliseconds getDefaultCursorTimeoutMillis();
+
+} // namespace mongo
diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl
index 24b58226f51..96310498441 100644
--- a/src/mongo/db/logical_session_id.idl
+++ b/src/mongo/db/logical_session_id.idl
@@ -112,3 +112,53 @@ structs:
operation executes."
type: TxnNumber
optional: true
+
+ SessionsCollectionFetchResultIndividualResult:
+ description: "Individual result"
+ strict: true
+ fields:
+ _id: LogicalSessionId
+
+ SessionsCollectionFetchResultCursor:
+ description: "Cursor object"
+ strict: false
+ fields:
+ firstBatch: array<SessionsCollectionFetchResultIndividualResult>
+
+ SessionsCollectionFetchResult:
+ description: "Parser for pulling out the fetch results from SessionsCollection::fetch"
+ strict: false
+ fields:
+ cursor: SessionsCollectionFetchResultCursor
+
+ SessionsCollectionFetchRequestFilterId:
+ description: "Id"
+ strict: true
+ fields:
+ $in:
+ type: array<LogicalSessionId>
+ cpp_name: "in"
+
+ SessionsCollectionFetchRequestFilter:
+ description: "filter"
+ strict: true
+ fields:
+ _id: SessionsCollectionFetchRequestFilterId
+
+ SessionsCollectionFetchRequestProjection:
+ description: "projection"
+ strict: true
+ fields:
+ _id: int
+
+ SessionsCollectionFetchRequest:
+ description: "Parser for forming the fetch request for SessionsCollection::fetch"
+ strict: true
+ fields:
+ find: namespacestring
+ filter: SessionsCollectionFetchRequestFilter
+ projection: SessionsCollectionFetchRequestProjection
+ batchSize: int
+ singleBatch: bool
+ allowPartialResults: bool
+ limit: int
diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp
index aa43c706ddb..d1618901350 100644
--- a/src/mongo/db/sessions_collection.cpp
+++ b/src/mongo/db/sessions_collection.cpp
@@ -180,6 +180,19 @@ SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(DBClien
return send;
}
+SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(DBClientBase* client) {
+ auto send = [client](BSONObj cmd) -> StatusWith<BSONObj> {
+ BSONObj res;
+ if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), cmd, res)) {
+ return getStatusFromCommandResult(res);
+ }
+
+ return res;
+ };
+
+ return send;
+}
+
Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions,
Date_t refreshTime,
SendBatchFn send) {
@@ -232,4 +245,58 @@ Status SessionsCollection::doRemoveExternal(const LogicalSessionIdSet& sessions,
return Status::OK();
}
+StatusWith<LogicalSessionIdSet> SessionsCollection::doFetch(const LogicalSessionIdSet& sessions,
+ FindBatchFn send) {
+ auto makeT = [] { return std::vector<LogicalSessionId>{}; };
+
+ auto add = [](std::vector<LogicalSessionId>& batch, const LogicalSessionId& record) {
+ batch.push_back(record);
+ };
+
+ LogicalSessionIdSet removed = sessions;
+
+ auto wrappedSend = [&](BSONObj batch) {
+ auto swBatchResult = send(batch);
+
+ if (!swBatchResult.isOK()) {
+ return swBatchResult.getStatus();
+ } else {
+ auto result = SessionsCollectionFetchResult::parse("SessionsCollectionFetchResult"_sd,
+ swBatchResult.getValue());
+
+ for (const auto& lsid : result.getCursor().getFirstBatch()) {
+ removed.erase(lsid.get_id());
+ }
+
+ return Status::OK();
+ }
+ };
+
+ auto sendLocal = [&](std::vector<LogicalSessionId>& batch) {
+ SessionsCollectionFetchRequest request;
+
+ request.setFind(NamespaceString{SessionsCollection::kSessionsCollection});
+ request.setFilter({});
+ request.getFilter().set_id({});
+ request.getFilter().get_id().setIn(batch);
+
+ request.setProjection({});
+ request.getProjection().set_id(1);
+ request.setBatchSize(batch.size());
+ request.setLimit(batch.size());
+ request.setAllowPartialResults(true);
+ request.setSingleBatch(true);
+
+ return wrappedSend(request.toBSON());
+ };
+
+ auto status = runBulkGeneric(makeT, add, sendLocal, sessions);
+
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return removed;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h
index 553a74224a4..3c9a5d65fb1 100644
--- a/src/mongo/db/sessions_collection.h
+++ b/src/mongo/db/sessions_collection.h
@@ -71,6 +71,14 @@ public:
*/
virtual Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0;
+ /**
+ * Checks a set of lsids and returns the set that no longer exists
+ *
+ * Returns an error if the fetch cannot occur, for example from a network error.
+ */
+ virtual StatusWith<LogicalSessionIdSet> findRemovedSessions(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0;
+
protected:
/**
* Makes a send function for the given client.
@@ -78,6 +86,8 @@ protected:
using SendBatchFn = stdx::function<Status(BSONObj batch)>;
SendBatchFn makeSendFnForCommand(DBClientBase* client);
SendBatchFn makeSendFnForBatchWrite(DBClientBase* client);
+ using FindBatchFn = stdx::function<StatusWith<BSONObj>(BSONObj batch)>;
+ FindBatchFn makeFindFnForCommand(DBClientBase* client);
/**
* Formats and sends batches of refreshes for the given set of sessions.
@@ -92,6 +102,11 @@ protected:
*/
Status doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send);
Status doRemoveExternal(const LogicalSessionIdSet& sessions, SendBatchFn send);
+
+ /**
+ * Formats and sends batches of fetches for the given set of sessions.
+ */
+ StatusWith<LogicalSessionIdSet> doFetch(const LogicalSessionIdSet& sessions, FindBatchFn send);
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_mock.h b/src/mongo/db/sessions_collection_mock.h
index cf0a88c324c..f88d877b17b 100644
--- a/src/mongo/db/sessions_collection_mock.h
+++ b/src/mongo/db/sessions_collection_mock.h
@@ -111,6 +111,11 @@ public:
return _impl->removeRecords(sessions);
}
+ StatusWith<LogicalSessionIdSet> findRemovedSessions(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) override {
+ return LogicalSessionIdSet{};
+ }
+
private:
std::shared_ptr<MockSessionsCollectionImpl> _impl;
};
diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp
index f153d31f385..89387af9531 100644
--- a/src/mongo/db/sessions_collection_rs.cpp
+++ b/src/mongo/db/sessions_collection_rs.cpp
@@ -84,7 +84,8 @@ Status makePrimaryConnection(OperationContext* opCtx, boost::optional<ScopedDbCo
}
template <typename Callback>
-Status runIfStandaloneOrPrimary(OperationContext* opCtx, Callback callback) {
+auto runIfStandaloneOrPrimary(OperationContext* opCtx, Callback callback)
+ -> boost::optional<decltype(std::declval<Callback>()())> {
Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IX);
Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, MODE_IX);
@@ -93,61 +94,71 @@ Status runIfStandaloneOrPrimary(OperationContext* opCtx, Callback callback) {
return callback();
}
- return {ErrorCodes::NotMaster, "Cannot perform a local write"};
+ return boost::none;
}
-} // namespace
-
-Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
- const LogicalSessionRecordSet& sessions,
- Date_t refreshTime) {
- bool ran = false;
-
- // If we are the primary, write directly to ourself.
- auto status = runIfStandaloneOrPrimary(opCtx, [&] {
- ran = true;
- DBDirectClient client(opCtx);
- return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client));
- });
-
- if (ran) {
- return status;
- }
-
- // If we are not writeable, then send refreshSessions cmd to the primary.
+template <typename Callback>
+auto sendToPrimary(OperationContext* opCtx, Callback callback)
+ -> decltype(std::declval<Callback>()(static_cast<DBClientBase*>(nullptr))) {
boost::optional<ScopedDbConnection> conn;
auto res = makePrimaryConnection(opCtx, &conn);
if (!res.isOK()) {
return res;
}
- return doRefreshExternal(sessions, refreshTime, makeSendFnForCommand(conn->get()));
+ return callback(conn->get());
}
-Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
- const LogicalSessionIdSet& sessions) {
- bool ran = false;
-
+template <typename LocalCallback, typename RemoteCallback>
+auto dispatch(OperationContext* opCtx, LocalCallback localCallback, RemoteCallback remoteCallback)
+ -> decltype(std::declval<RemoteCallback>()(static_cast<DBClientBase*>(nullptr))) {
// If we are the primary, write directly to ourself.
- auto status = runIfStandaloneOrPrimary(opCtx, [&] {
- ran = true;
- DBDirectClient client(opCtx);
- return doRemove(sessions, makeSendFnForBatchWrite(&client));
- });
-
- if (ran) {
- return status;
- }
+ auto result = runIfStandaloneOrPrimary(opCtx, [&] { return localCallback(); });
- // If we are not writeable, then send endSessions cmd to the primary
- boost::optional<ScopedDbConnection> conn;
- auto res = makePrimaryConnection(opCtx, &conn);
- if (!res.isOK()) {
- return res;
+ if (result) {
+ return *result;
}
- return doRemoveExternal(sessions, makeSendFnForCommand(conn->get()));
+ return sendToPrimary(opCtx, remoteCallback);
}
+} // namespace
+
+Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
+ const LogicalSessionRecordSet& sessions,
+ Date_t refreshTime) {
+ return dispatch(opCtx,
+ [&] {
+ DBDirectClient client(opCtx);
+ return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client));
+ },
+ [&](DBClientBase* client) {
+ return doRefreshExternal(
+ sessions, refreshTime, makeSendFnForCommand(client));
+ });
+}
+
+Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) {
+ return dispatch(opCtx,
+ [&] {
+ DBDirectClient client(opCtx);
+ return doRemove(sessions, makeSendFnForBatchWrite(&client));
+ },
+ [&](DBClientBase* client) {
+ return doRemoveExternal(sessions, makeSendFnForCommand(client));
+ });
+}
+
+StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+ return dispatch(
+ opCtx,
+ [&] {
+ DBDirectClient client(opCtx);
+ return doFetch(sessions, makeFindFnForCommand(&client));
+ },
+ [&](DBClientBase* client) { return doFetch(sessions, makeFindFnForCommand(client)); });
+}
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_rs.h b/src/mongo/db/sessions_collection_rs.h
index 799f4564291..a273dd88955 100644
--- a/src/mongo/db/sessions_collection_rs.h
+++ b/src/mongo/db/sessions_collection_rs.h
@@ -65,6 +65,9 @@ public:
* Removes the authoritative records for the specified sessions.
*/
Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
+
+ StatusWith<LogicalSessionIdSet> findRemovedSessions(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp
index 46ad4ce5a21..6c35d317b26 100644
--- a/src/mongo/db/sessions_collection_sharded.cpp
+++ b/src/mongo/db/sessions_collection_sharded.cpp
@@ -30,8 +30,12 @@
#include "mongo/db/sessions_collection_sharded.h"
+#include "mongo/db/matcher/extensions_callback_noop.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/query/canonical_query.h"
+#include "mongo/db/query/query_request.h"
#include "mongo/s/commands/cluster_write.h"
+#include "mongo/s/query/cluster_find.h"
#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -93,5 +97,50 @@ Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
return doRemove(sessions, send);
}
+StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+
+ auto send = [&](BSONObj toSend) -> StatusWith<BSONObj> {
+ const NamespaceString nss(SessionsCollection::kSessionsFullNS);
+
+ auto qr = QueryRequest::makeFromFindCommand(nss, toSend, false);
+ if (!qr.isOK()) {
+ return qr.getStatus();
+ }
+
+ const boost::intrusive_ptr<ExpressionContext> expCtx;
+ auto cq = CanonicalQuery::canonicalize(opCtx,
+ std::move(qr.getValue()),
+ expCtx,
+ ExtensionsCallbackNoop(),
+ MatchExpressionParser::kAllowAllSpecialFeatures &
+ ~MatchExpressionParser::AllowedFeatures::kExpr);
+ if (!cq.isOK()) {
+ return cq.getStatus();
+ }
+
+ // Do the work to generate the first batch of results. This blocks waiting to get responses
+ // from the shard(s).
+ std::vector<BSONObj> batch;
+ BSONObj viewDefinition;
+ auto cursorId = ClusterFind::runQuery(
+ opCtx, *cq.getValue(), ReadPreferenceSetting::get(opCtx), &batch, &viewDefinition);
+
+ if (!cursorId.isOK()) {
+ return cursorId.getStatus();
+ }
+
+ BSONObjBuilder result;
+ CursorResponseBuilder firstBatch(/*firstBatch*/ true, &result);
+ for (const auto& obj : batch) {
+ firstBatch.append(obj);
+ }
+ firstBatch.done(cursorId.getValue(), nss.ns());
+
+ return result.obj();
+ };
+
+ return doFetch(sessions, send);
+}
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h
index ddf15ba11d6..cb0a2e6fd9e 100644
--- a/src/mongo/db/sessions_collection_sharded.h
+++ b/src/mongo/db/sessions_collection_sharded.h
@@ -55,6 +55,9 @@ public:
* Removes the authoritative records for the specified sessions.
*/
Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
+
+ StatusWith<LogicalSessionIdSet> findRemovedSessions(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp
index 79f36e66fd6..8187b19ad06 100644
--- a/src/mongo/db/sessions_collection_standalone.cpp
+++ b/src/mongo/db/sessions_collection_standalone.cpp
@@ -57,4 +57,10 @@ Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx,
return doRemove(sessions, makeSendFnForBatchWrite(&client));
}
+StatusWith<LogicalSessionIdSet> SessionsCollectionStandalone::findRemovedSessions(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+ DBDirectClient client(opCtx);
+ return doFetch(sessions, makeFindFnForCommand(&client));
+}
+
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h
index a164117fe44..2c1bfe32096 100644
--- a/src/mongo/db/sessions_collection_standalone.h
+++ b/src/mongo/db/sessions_collection_standalone.h
@@ -54,6 +54,9 @@ public:
* Removes the authoritative records for the specified sessions.
*/
Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
+
+ StatusWith<LogicalSessionIdSet> findRemovedSessions(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
};
} // namespace mongo
diff --git a/src/mongo/dbtests/cursor_manager_test.cpp b/src/mongo/dbtests/cursor_manager_test.cpp
index 6505a64bea0..5b96f891823 100644
--- a/src/mongo/dbtests/cursor_manager_test.cpp
+++ b/src/mongo/dbtests/cursor_manager_test.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/client.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/cursor_manager.h"
+#include "mongo/db/cursor_server_params.h"
#include "mongo/db/exec/queued_data_stage.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/exec/working_set_common.h"
@@ -286,7 +287,7 @@ TEST_F(CursorManagerTest, InactiveCursorShouldTimeout) {
ASSERT_EQ(0UL, cursorManager->timeoutCursors(_opCtx.get(), Date_t()));
- clock->advance(Milliseconds(CursorManager::kDefaultCursorTimeoutMinutes));
+ clock->advance(getDefaultCursorTimeoutMillis());
ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now()));
ASSERT_EQ(0UL, cursorManager->numCursors());
@@ -309,7 +310,7 @@ TEST_F(CursorManagerTest, InactivePinnedCursorShouldNotTimeout) {
{makeFakePlanExecutor(), NamespaceString{"test.collection"}, {}, false, BSONObj()});
// The pin is still in scope, so it should not time out.
- clock->advance(Milliseconds(CursorManager::kDefaultCursorTimeoutMinutes));
+ clock->advance(getDefaultCursorTimeoutMillis());
ASSERT_EQ(0UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now()));
}
@@ -330,7 +331,7 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsShouldTimeout) {
_opCtx.get(), collectionGoingAway, "KilledCursorsShouldTimeoutTest");
// Advance the clock to simulate time passing.
- clock->advance(Milliseconds(CursorManager::kDefaultCursorTimeoutMinutes));
+ clock->advance(getDefaultCursorTimeoutMillis());
ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now()));
ASSERT_EQ(0UL, cursorManager->numCursors());
@@ -352,7 +353,7 @@ TEST_F(CursorManagerTest, InactiveKilledCursorsThatAreStillPinnedShouldNotTimeou
_opCtx.get(), collectionGoingAway, "KilledCursorsShouldTimeoutTest");
// Advance the clock to simulate time passing.
- clock->advance(Milliseconds(CursorManager::kDefaultCursorTimeoutMinutes));
+ clock->advance(getDefaultCursorTimeoutMillis());
// The pin is still in scope, so it should not time out.
ASSERT_EQ(0UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now()));
@@ -384,7 +385,7 @@ TEST_F(CursorManagerTest, UsingACursorShouldUpdateTimeOfLastUse) {
// We should be able to time out the unused cursor, but the one we used should stay alive.
ASSERT_EQ(2UL, cursorManager->numCursors());
- clock->advance(Milliseconds(CursorManager::kDefaultCursorTimeoutMinutes) - Milliseconds(1));
+ clock->advance(getDefaultCursorTimeoutMillis() - Milliseconds(1));
ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now()));
ASSERT_EQ(1UL, cursorManager->numCursors());
@@ -407,7 +408,7 @@ TEST_F(CursorManagerTest, CursorShouldNotTimeOutUntilIdleForLongEnoughAfterBeing
_opCtx.get(), {makeFakePlanExecutor(), kTestNss, {}, false, BSONObj()});
// Advance the clock to simulate time passing.
- clock->advance(CursorManager::kDefaultCursorTimeoutMinutes + Milliseconds(1));
+ clock->advance(getDefaultCursorTimeoutMillis() + Milliseconds(1));
// Make sure the pinned cursor does not time out, before or after unpinning it.
ASSERT_EQ(1UL, cursorManager->numCursors());
@@ -422,7 +423,7 @@ TEST_F(CursorManagerTest, CursorShouldNotTimeOutUntilIdleForLongEnoughAfterBeing
// Advance the clock to simulate more time passing, then assert that the now-inactive cursor
// times out.
- clock->advance(CursorManager::kDefaultCursorTimeoutMinutes + Milliseconds(1));
+ clock->advance(getDefaultCursorTimeoutMillis() + Milliseconds(1));
ASSERT_EQ(1UL, cursorManager->timeoutCursors(_opCtx.get(), clock->now()));
ASSERT_EQ(0UL, cursorManager->numCursors());
}
diff --git a/src/mongo/dbtests/logical_sessions_tests.cpp b/src/mongo/dbtests/logical_sessions_tests.cpp
index 9c8d773b6a3..51614181623 100644
--- a/src/mongo/dbtests/logical_sessions_tests.cpp
+++ b/src/mongo/dbtests/logical_sessions_tests.cpp
@@ -205,6 +205,70 @@ public:
}
};
+// Test that finding entries in this collection works.
+class SessionsCollectionStandaloneFindTest : public SessionsCollectionStandaloneTest {
+public:
+ void run() {
+ DBDirectClient db(opCtx());
+ auto notInsertedRecord = makeRecord();
+
+ auto insertedRecord = makeRecord();
+ ASSERT(insertRecord(opCtx(), insertedRecord).isOK());
+
+ // if a record isn't there, it's been removed
+ {
+ LogicalSessionIdSet lsids{notInsertedRecord.getId()};
+
+ auto response = collection()->findRemovedSessions(opCtx(), lsids);
+ ASSERT_EQ(response.isOK(), true);
+ ASSERT_EQ(response.getValue().size(), 1u);
+ ASSERT(*(response.getValue().begin()) == notInsertedRecord.getId());
+ }
+
+ // if a record is there, it hasn't been removed
+ {
+ LogicalSessionIdSet lsids{insertedRecord.getId()};
+
+ auto response = collection()->findRemovedSessions(opCtx(), lsids);
+ ASSERT_EQ(response.isOK(), true);
+ ASSERT_EQ(response.getValue().size(), 0u);
+ }
+
+ // We can tell the difference with multiple records
+ {
+ LogicalSessionIdSet lsids{insertedRecord.getId(), notInsertedRecord.getId()};
+
+ auto response = collection()->findRemovedSessions(opCtx(), lsids);
+ ASSERT_EQ(response.isOK(), true);
+ ASSERT_EQ(response.getValue().size(), 1u);
+ ASSERT(*(response.getValue().begin()) == notInsertedRecord.getId());
+ }
+
+ // Batch logic works
+ {
+ LogicalSessionIdSet insertedRecords;
+ LogicalSessionIdSet uninsertedRecords;
+ LogicalSessionIdSet mixedRecords;
+
+ for (int i = 0; i < 5000; ++i) {
+ auto insertedRecord = makeRecord();
+ ASSERT(insertRecord(opCtx(), insertedRecord).isOK());
+ insertedRecords.insert(insertedRecord.getId());
+
+ auto uninsertedRecord = makeRecord();
+ uninsertedRecords.insert(uninsertedRecord.getId());
+
+ mixedRecords.insert(insertedRecord.getId());
+ mixedRecords.insert(uninsertedRecord.getId());
+ }
+
+ auto response = collection()->findRemovedSessions(opCtx(), mixedRecords);
+ ASSERT_EQ(response.isOK(), true);
+ ASSERT_EQ(response.getValue().size(), 5000u);
+ ASSERT(response.getValue() == uninsertedRecords);
+ }
+ }
+};
class All : public Suite {
public:
All() : Suite("logical_sessions") {}
@@ -212,6 +276,7 @@ public:
void setupTests() {
add<SessionsCollectionStandaloneRemoveTest>();
add<SessionsCollectionStandaloneRefreshTest>();
+ add<SessionsCollectionStandaloneFindTest>();
}
};
diff --git a/src/mongo/s/query/SConscript b/src/mongo/s/query/SConscript
index 311a5b47a01..460dcf7912e 100644
--- a/src/mongo/s/query/SConscript
+++ b/src/mongo/s/query/SConscript
@@ -159,6 +159,7 @@ env.Library(
"cluster_cursor_cleanup_job.cpp",
],
LIBDEPS=[
+ "$BUILD_DIR/mongo/db/cursor_server_params",
"$BUILD_DIR/mongo/s/coreshard",
"$BUILD_DIR/mongo/util/background_job",
],
diff --git a/src/mongo/s/query/cluster_cursor_cleanup_job.cpp b/src/mongo/s/query/cluster_cursor_cleanup_job.cpp
index dcf19acf775..963f306b2b2 100644
--- a/src/mongo/s/query/cluster_cursor_cleanup_job.cpp
+++ b/src/mongo/s/query/cluster_cursor_cleanup_job.cpp
@@ -31,6 +31,7 @@
#include "mongo/s/query/cluster_cursor_cleanup_job.h"
#include "mongo/db/client.h"
+#include "mongo/db/cursor_server_params.h"
#include "mongo/db/server_parameters.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
@@ -40,22 +41,6 @@
namespace mongo {
-namespace {
-
-// Period of time after which mortal cursors are killed for inactivity. Configurable with server
-// parameter "cursorTimeoutMillis".
-AtomicInt64 cursorTimeoutMillis(durationCount<Milliseconds>(Minutes(10)));
-
-ExportedServerParameter<long long, ServerParameterType::kStartupAndRuntime>
- cursorTimeoutMillisConfig(ServerParameterSet::getGlobal(),
- "cursorTimeoutMillis",
- &cursorTimeoutMillis);
-
-// Frequency with which ClusterCursorCleanupJob is run.
-MONGO_EXPORT_SERVER_PARAMETER(clientCursorMonitorFrequencySecs, long long, 4);
-
-} // namespace
-
ClusterCursorCleanupJob clusterCursorCleanupJob;
std::string ClusterCursorCleanupJob::name() const {
@@ -72,7 +57,7 @@ void ClusterCursorCleanupJob::run() {
while (!globalInShutdownDeprecated()) {
// Mirroring the behavior in CursorManager::timeoutCursors(), a negative value for
// cursorTimeoutMillis has the same effect as a 0 value: cursors are cleaned immediately.
- auto cursorTimeoutValue = cursorTimeoutMillis.load();
+ auto cursorTimeoutValue = getCursorTimeoutMillis();
const auto opCtx = client->makeOperationContext();
Date_t cutoff = (cursorTimeoutValue > 0)
? (Date_t::now() - Milliseconds(cursorTimeoutValue))
@@ -81,7 +66,7 @@ void ClusterCursorCleanupJob::run() {
manager->incrementCursorsTimedOut(manager->reapZombieCursors(opCtx.get()));
MONGO_IDLE_THREAD_BLOCK;
- sleepsecs(clientCursorMonitorFrequencySecs.load());
+ sleepsecs(getClientCursorMonitorFrequencySecs());
}
}