summaryrefslogtreecommitdiff
path: root/src/mongo/s/client/scc_fast_query_handler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/client/scc_fast_query_handler.cpp')
-rw-r--r--src/mongo/s/client/scc_fast_query_handler.cpp324
1 files changed, 153 insertions, 171 deletions
diff --git a/src/mongo/s/client/scc_fast_query_handler.cpp b/src/mongo/s/client/scc_fast_query_handler.cpp
index f09dd767a21..20a8f1a0193 100644
--- a/src/mongo/s/client/scc_fast_query_handler.cpp
+++ b/src/mongo/s/client/scc_fast_query_handler.cpp
@@ -45,214 +45,196 @@
namespace mongo {
- using std::unique_ptr;
- using std::endl;
- using std::string;
- using std::vector;
-
- /**
- * This parameter turns on fastest config reads for auth data only - *.system.users collections
- * and the "usersInfo" command. This should be enough to prevent a non-responsive config from
- * hanging other operations in a cluster.
- */
- MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestAuthConfigReads, bool, false);
-
- /**
- * TESTING ONLY.
- *
- * This parameter turns on fastest config reads for *all* config.* collections except those
- * deemed extremely critical (config.version, config.locks).
- *
- * NOT FOR PRODUCTION USE.
- */
- MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestMetadataConfigReads, bool, false);
+using std::unique_ptr;
+using std::endl;
+using std::string;
+using std::vector;
- //
- // The shared environment for MultiHostQueries
- //
-
- namespace {
-
- class MultiQueryEnv : public MultiHostQueryOp::SystemEnv {
- public:
-
- virtual ~MultiQueryEnv() {
- }
-
- Date_t currentTimeMillis();
+/**
+ * This parameter turns on fastest config reads for auth data only - *.system.users collections
+ * and the "usersInfo" command. This should be enough to prevent a non-responsive config from
+ * hanging other operations in a cluster.
+ */
+MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestAuthConfigReads, bool, false);
- StatusWith<DBClientCursor*> doBlockingQuery(const ConnectionString& host,
- const QuerySpec& query);
- };
+/**
+ * TESTING ONLY.
+ *
+ * This parameter turns on fastest config reads for *all* config.* collections except those
+ * deemed extremely critical (config.version, config.locks).
+ *
+ * NOT FOR PRODUCTION USE.
+ */
+MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestMetadataConfigReads, bool, false);
- StatusWith<DBClientCursor*> MultiQueryEnv::doBlockingQuery(const ConnectionString& host,
- const QuerySpec& query) {
+//
+// The shared environment for MultiHostQueries
+//
- //
- // Note that this function may be active during shutdown. This means that we must
- // handle connection pool shutdown exceptions (uasserts). The results of this
- // operation must then be correctly discarded by the calling thread.
- //
+namespace {
- unique_ptr<DBClientCursor> cursor;
+class MultiQueryEnv : public MultiHostQueryOp::SystemEnv {
+public:
+ virtual ~MultiQueryEnv() {}
- try {
+ Date_t currentTimeMillis();
- ScopedDbConnection conn(host, 30.0 /* timeout secs */);
+ StatusWith<DBClientCursor*> doBlockingQuery(const ConnectionString& host,
+ const QuerySpec& query);
+};
- cursor = conn->query(query.ns(),
- query.filter(),
- query.ntoreturn(),
- query.ntoskip(),
- query.fieldsPtr(),
- query.options(),
- 0);
+StatusWith<DBClientCursor*> MultiQueryEnv::doBlockingQuery(const ConnectionString& host,
+ const QuerySpec& query) {
+ //
+ // Note that this function may be active during shutdown. This means that we must
+ // handle connection pool shutdown exceptions (uasserts). The results of this
+ // operation must then be correctly discarded by the calling thread.
+ //
- if ( NULL == cursor.get()) {
+ unique_ptr<DBClientCursor> cursor;
- // Confusingly, exceptions here are written to the log, not thrown
+ try {
+ ScopedDbConnection conn(host, 30.0 /* timeout secs */);
- StringBuilder builder;
- builder << "error querying server " << host.toString()
- << ", could not create cursor for query";
+ cursor = conn->query(query.ns(),
+ query.filter(),
+ query.ntoreturn(),
+ query.ntoskip(),
+ query.fieldsPtr(),
+ query.options(),
+ 0);
- warning() << builder.str() << endl;
- return StatusWith<DBClientCursor*>(ErrorCodes::HostUnreachable, builder.str());
- }
+ if (NULL == cursor.get()) {
+ // Confusingly, exceptions here are written to the log, not thrown
- // Confusingly, this *detaches* the cursor from the connection it was established
- // on, further getMore calls will use a (potentially different) ScopedDbConnection.
- //
- // Calls done() too.
- cursor->attach(&conn);
- }
- catch (const DBException& ex) {
- return StatusWith<DBClientCursor*>(ex.toStatus());
- }
+ StringBuilder builder;
+ builder << "error querying server " << host.toString()
+ << ", could not create cursor for query";
- return StatusWith<DBClientCursor*>(cursor.release());
+ warning() << builder.str() << endl;
+ return StatusWith<DBClientCursor*>(ErrorCodes::HostUnreachable, builder.str());
}
- Date_t MultiQueryEnv::currentTimeMillis() {
- return jsTime();
- }
- }
-
- // Shared networking environment which executes queries.
- // NOTE: This environment must stay in scope as long as per-host threads are executing queries -
- // i.e. for the lifetime of the server.
- // Once the thread pools are disposed and connections shut down, the per-host threads should
- // be self-contained and correctly shut down after discarding the results.
- static MultiQueryEnv* _multiQueryEnv;
-
- namespace {
-
+ // Confusingly, this *detaches* the cursor from the connection it was established
+ // on, further getMore calls will use a (potentially different) ScopedDbConnection.
//
- // Create the shared multi-query environment at startup
- //
-
- MONGO_INITIALIZER(InitMultiQueryEnv)(InitializerContext* context) {
- // Leaked intentionally
- _multiQueryEnv = new MultiQueryEnv();
- return Status::OK();
- }
+ // Calls done() too.
+ cursor->attach(&conn);
+ } catch (const DBException& ex) {
+ return StatusWith<DBClientCursor*>(ex.toStatus());
}
- //
- // Per-SCC handling of queries
- //
+ return StatusWith<DBClientCursor*>(cursor.release());
+}
- SCCFastQueryHandler::SCCFastQueryHandler() :
- _queryThreads(1, false) {
- }
+Date_t MultiQueryEnv::currentTimeMillis() {
+ return jsTime();
+}
+}
- bool SCCFastQueryHandler::canHandleQuery(const string& ns, Query query) {
+// Shared networking environment which executes queries.
+// NOTE: This environment must stay in scope as long as per-host threads are executing queries -
+// i.e. for the lifetime of the server.
+// Once the thread pools are disposed and connections shut down, the per-host threads should
+// be self-contained and correctly shut down after discarding the results.
+static MultiQueryEnv* _multiQueryEnv;
- if (!internalSCCAllowFastestAuthConfigReads &&
- !internalSCCAllowFastestMetadataConfigReads) {
- return false;
- }
+namespace {
- //
- // More operations can be added here
- //
- // NOTE: Not all operations actually pass through the SCC _queryOnActive path - notable
- // exceptions include anything related to direct query ops and direct operations for
- // connection maintenance.
- //
+//
+// Create the shared multi-query environment at startup
+//
- NamespaceString nss(ns);
- if (nss.isCommand()) {
- BSONObj cmdObj = query.getFilter();
- string cmdName = cmdObj.firstElement().fieldName();
- if (cmdName == "usersInfo")
- return true;
- }
- else if (nss.coll() == "system.users") {
- return true;
- }
+MONGO_INITIALIZER(InitMultiQueryEnv)(InitializerContext* context) {
+ // Leaked intentionally
+ _multiQueryEnv = new MultiQueryEnv();
+ return Status::OK();
+}
+}
- //
- // Allow fastest config reads for all collections except for those involved in locks and
- // cluster versioning.
- //
+//
+// Per-SCC handling of queries
+//
- if (!internalSCCAllowFastestMetadataConfigReads)
- return false;
+SCCFastQueryHandler::SCCFastQueryHandler() : _queryThreads(1, false) {}
- if (nss.db() != "config")
- return false;
+bool SCCFastQueryHandler::canHandleQuery(const string& ns, Query query) {
+ if (!internalSCCAllowFastestAuthConfigReads && !internalSCCAllowFastestMetadataConfigReads) {
+ return false;
+ }
- if (nss.coll() != "version" && nss.coll() != "locks" && nss.coll() != "lockpings") {
- return true;
- }
+ //
+ // More operations can be added here
+ //
+ // NOTE: Not all operations actually pass through the SCC _queryOnActive path - notable
+ // exceptions include anything related to direct query ops and direct operations for
+ // connection maintenance.
+ //
- return false;
+ NamespaceString nss(ns);
+ if (nss.isCommand()) {
+ BSONObj cmdObj = query.getFilter();
+ string cmdName = cmdObj.firstElement().fieldName();
+ if (cmdName == "usersInfo")
+ return true;
+ } else if (nss.coll() == "system.users") {
+ return true;
}
- static vector<ConnectionString> getHosts(const vector<string> hostStrings) {
+ //
+ // Allow fastest config reads for all collections except for those involved in locks and
+ // cluster versioning.
+ //
- vector<ConnectionString> hosts;
- for (vector<string>::const_iterator it = hostStrings.begin(); it != hostStrings.end();
- ++it) {
+ if (!internalSCCAllowFastestMetadataConfigReads)
+ return false;
- string errMsg;
- ConnectionString host;
- hosts.push_back(ConnectionString::parse(*it, errMsg));
- invariant( hosts.back().type() != ConnectionString::INVALID );
- }
+ if (nss.db() != "config")
+ return false;
- return hosts;
+ if (nss.coll() != "version" && nss.coll() != "locks" && nss.coll() != "lockpings") {
+ return true;
}
- unique_ptr<DBClientCursor> SCCFastQueryHandler::handleQuery(const vector<string>& hostStrings,
- const string& ns,
- Query query,
- int nToReturn,
- int nToSkip,
- const BSONObj* fieldsToReturn,
- int queryOptions,
- int batchSize) {
-
- MultiHostQueryOp queryOp(_multiQueryEnv, &_queryThreads);
-
- QuerySpec querySpec(ns,
- query.obj,
- fieldsToReturn ? *fieldsToReturn : BSONObj(),
- nToSkip,
- nToReturn,
- queryOptions);
-
- // TODO: Timeout must be passed down here as well - 30s timeout may not be applicable for
- // all operations handled.
- StatusWith<DBClientCursor*> status = queryOp.queryAny(getHosts(hostStrings),
- querySpec,
- 30 * 1000);
- uassertStatusOK(status.getStatus());
-
- unique_ptr<DBClientCursor> cursor(status.getValue());
- return cursor;
+ return false;
+}
+
+static vector<ConnectionString> getHosts(const vector<string> hostStrings) {
+ vector<ConnectionString> hosts;
+ for (vector<string>::const_iterator it = hostStrings.begin(); it != hostStrings.end(); ++it) {
+ string errMsg;
+ ConnectionString host;
+ hosts.push_back(ConnectionString::parse(*it, errMsg));
+ invariant(hosts.back().type() != ConnectionString::INVALID);
}
+ return hosts;
}
+unique_ptr<DBClientCursor> SCCFastQueryHandler::handleQuery(const vector<string>& hostStrings,
+ const string& ns,
+ Query query,
+ int nToReturn,
+ int nToSkip,
+ const BSONObj* fieldsToReturn,
+ int queryOptions,
+ int batchSize) {
+ MultiHostQueryOp queryOp(_multiQueryEnv, &_queryThreads);
+
+ QuerySpec querySpec(ns,
+ query.obj,
+ fieldsToReturn ? *fieldsToReturn : BSONObj(),
+ nToSkip,
+ nToReturn,
+ queryOptions);
+
+ // TODO: Timeout must be passed down here as well - 30s timeout may not be applicable for
+ // all operations handled.
+ StatusWith<DBClientCursor*> status =
+ queryOp.queryAny(getHosts(hostStrings), querySpec, 30 * 1000);
+ uassertStatusOK(status.getStatus());
+
+ unique_ptr<DBClientCursor> cursor(status.getValue());
+ return cursor;
+}
+}