diff options
Diffstat (limited to 'src/mongo/s/client/scc_fast_query_handler.cpp')
-rw-r--r-- | src/mongo/s/client/scc_fast_query_handler.cpp | 324 |
1 files changed, 153 insertions, 171 deletions
diff --git a/src/mongo/s/client/scc_fast_query_handler.cpp b/src/mongo/s/client/scc_fast_query_handler.cpp index f09dd767a21..20a8f1a0193 100644 --- a/src/mongo/s/client/scc_fast_query_handler.cpp +++ b/src/mongo/s/client/scc_fast_query_handler.cpp @@ -45,214 +45,196 @@ namespace mongo { - using std::unique_ptr; - using std::endl; - using std::string; - using std::vector; - - /** - * This parameter turns on fastest config reads for auth data only - *.system.users collections - * and the "usersInfo" command. This should be enough to prevent a non-responsive config from - * hanging other operations in a cluster. - */ - MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestAuthConfigReads, bool, false); - - /** - * TESTING ONLY. - * - * This parameter turns on fastest config reads for *all* config.* collections except those - * deemed extremely critical (config.version, config.locks). - * - * NOT FOR PRODUCTION USE. - */ - MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestMetadataConfigReads, bool, false); +using std::unique_ptr; +using std::endl; +using std::string; +using std::vector; - // - // The shared environment for MultiHostQueries - // - - namespace { - - class MultiQueryEnv : public MultiHostQueryOp::SystemEnv { - public: - - virtual ~MultiQueryEnv() { - } - - Date_t currentTimeMillis(); +/** + * This parameter turns on fastest config reads for auth data only - *.system.users collections + * and the "usersInfo" command. This should be enough to prevent a non-responsive config from + * hanging other operations in a cluster. + */ +MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestAuthConfigReads, bool, false); - StatusWith<DBClientCursor*> doBlockingQuery(const ConnectionString& host, - const QuerySpec& query); - }; +/** + * TESTING ONLY. + * + * This parameter turns on fastest config reads for *all* config.* collections except those + * deemed extremely critical (config.version, config.locks). + * + * NOT FOR PRODUCTION USE. + */ +MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestMetadataConfigReads, bool, false); - StatusWith<DBClientCursor*> MultiQueryEnv::doBlockingQuery(const ConnectionString& host, - const QuerySpec& query) { +// +// The shared environment for MultiHostQueries +// - // - // Note that this function may be active during shutdown. This means that we must - // handle connection pool shutdown exceptions (uasserts). The results of this - // operation must then be correctly discarded by the calling thread. - // +namespace { - unique_ptr<DBClientCursor> cursor; +class MultiQueryEnv : public MultiHostQueryOp::SystemEnv { +public: + virtual ~MultiQueryEnv() {} - try { + Date_t currentTimeMillis(); - ScopedDbConnection conn(host, 30.0 /* timeout secs */); + StatusWith<DBClientCursor*> doBlockingQuery(const ConnectionString& host, + const QuerySpec& query); +}; - cursor = conn->query(query.ns(), - query.filter(), - query.ntoreturn(), - query.ntoskip(), - query.fieldsPtr(), - query.options(), - 0); +StatusWith<DBClientCursor*> MultiQueryEnv::doBlockingQuery(const ConnectionString& host, + const QuerySpec& query) { + // + // Note that this function may be active during shutdown. This means that we must + // handle connection pool shutdown exceptions (uasserts). The results of this + // operation must then be correctly discarded by the calling thread. + // - if ( NULL == cursor.get()) { + unique_ptr<DBClientCursor> cursor; - // Confusingly, exceptions here are written to the log, not thrown + try { + ScopedDbConnection conn(host, 30.0 /* timeout secs */); - StringBuilder builder; - builder << "error querying server " << host.toString() - << ", could not create cursor for query"; + cursor = conn->query(query.ns(), + query.filter(), + query.ntoreturn(), + query.ntoskip(), + query.fieldsPtr(), + query.options(), + 0); - warning() << builder.str() << endl; - return StatusWith<DBClientCursor*>(ErrorCodes::HostUnreachable, builder.str()); - } + if (NULL == cursor.get()) { + // Confusingly, exceptions here are written to the log, not thrown - // Confusingly, this *detaches* the cursor from the connection it was established - // on, further getMore calls will use a (potentially different) ScopedDbConnection. - // - // Calls done() too. - cursor->attach(&conn); - } - catch (const DBException& ex) { - return StatusWith<DBClientCursor*>(ex.toStatus()); - } + StringBuilder builder; + builder << "error querying server " << host.toString() + << ", could not create cursor for query"; - return StatusWith<DBClientCursor*>(cursor.release()); + warning() << builder.str() << endl; + return StatusWith<DBClientCursor*>(ErrorCodes::HostUnreachable, builder.str()); } - Date_t MultiQueryEnv::currentTimeMillis() { - return jsTime(); - } - } - - // Shared networking environment which executes queries. - // NOTE: This environment must stay in scope as long as per-host threads are executing queries - - // i.e. for the lifetime of the server. - // Once the thread pools are disposed and connections shut down, the per-host threads should - // be self-contained and correctly shut down after discarding the results. - static MultiQueryEnv* _multiQueryEnv; - - namespace { - + // Confusingly, this *detaches* the cursor from the connection it was established + // on, further getMore calls will use a (potentially different) ScopedDbConnection. // - // Create the shared multi-query environment at startup - // - - MONGO_INITIALIZER(InitMultiQueryEnv)(InitializerContext* context) { - // Leaked intentionally - _multiQueryEnv = new MultiQueryEnv(); - return Status::OK(); - } + // Calls done() too. + cursor->attach(&conn); + } catch (const DBException& ex) { + return StatusWith<DBClientCursor*>(ex.toStatus()); } - // - // Per-SCC handling of queries - // + return StatusWith<DBClientCursor*>(cursor.release()); +} - SCCFastQueryHandler::SCCFastQueryHandler() : - _queryThreads(1, false) { - } +Date_t MultiQueryEnv::currentTimeMillis() { + return jsTime(); +} +} - bool SCCFastQueryHandler::canHandleQuery(const string& ns, Query query) { +// Shared networking environment which executes queries. +// NOTE: This environment must stay in scope as long as per-host threads are executing queries - +// i.e. for the lifetime of the server. +// Once the thread pools are disposed and connections shut down, the per-host threads should +// be self-contained and correctly shut down after discarding the results. +static MultiQueryEnv* _multiQueryEnv; - if (!internalSCCAllowFastestAuthConfigReads && - !internalSCCAllowFastestMetadataConfigReads) { - return false; - } +namespace { - // - // More operations can be added here - // - // NOTE: Not all operations actually pass through the SCC _queryOnActive path - notable - // exceptions include anything related to direct query ops and direct operations for - // connection maintenance. - // +// +// Create the shared multi-query environment at startup +// - NamespaceString nss(ns); - if (nss.isCommand()) { - BSONObj cmdObj = query.getFilter(); - string cmdName = cmdObj.firstElement().fieldName(); - if (cmdName == "usersInfo") - return true; - } - else if (nss.coll() == "system.users") { - return true; - } +MONGO_INITIALIZER(InitMultiQueryEnv)(InitializerContext* context) { + // Leaked intentionally + _multiQueryEnv = new MultiQueryEnv(); + return Status::OK(); +} +} - // - // Allow fastest config reads for all collections except for those involved in locks and - // cluster versioning. - // +// +// Per-SCC handling of queries +// - if (!internalSCCAllowFastestMetadataConfigReads) - return false; +SCCFastQueryHandler::SCCFastQueryHandler() : _queryThreads(1, false) {} - if (nss.db() != "config") - return false; +bool SCCFastQueryHandler::canHandleQuery(const string& ns, Query query) { + if (!internalSCCAllowFastestAuthConfigReads && !internalSCCAllowFastestMetadataConfigReads) { + return false; + } - if (nss.coll() != "version" && nss.coll() != "locks" && nss.coll() != "lockpings") { - return true; - } + // + // More operations can be added here + // + // NOTE: Not all operations actually pass through the SCC _queryOnActive path - notable + // exceptions include anything related to direct query ops and direct operations for + // connection maintenance. + // - return false; + NamespaceString nss(ns); + if (nss.isCommand()) { + BSONObj cmdObj = query.getFilter(); + string cmdName = cmdObj.firstElement().fieldName(); + if (cmdName == "usersInfo") + return true; + } else if (nss.coll() == "system.users") { + return true; } - static vector<ConnectionString> getHosts(const vector<string> hostStrings) { + // + // Allow fastest config reads for all collections except for those involved in locks and + // cluster versioning. + // - vector<ConnectionString> hosts; - for (vector<string>::const_iterator it = hostStrings.begin(); it != hostStrings.end(); - ++it) { + if (!internalSCCAllowFastestMetadataConfigReads) + return false; - string errMsg; - ConnectionString host; - hosts.push_back(ConnectionString::parse(*it, errMsg)); - invariant( hosts.back().type() != ConnectionString::INVALID ); - } + if (nss.db() != "config") + return false; - return hosts; + if (nss.coll() != "version" && nss.coll() != "locks" && nss.coll() != "lockpings") { + return true; } - unique_ptr<DBClientCursor> SCCFastQueryHandler::handleQuery(const vector<string>& hostStrings, - const string& ns, - Query query, - int nToReturn, - int nToSkip, - const BSONObj* fieldsToReturn, - int queryOptions, - int batchSize) { - - MultiHostQueryOp queryOp(_multiQueryEnv, &_queryThreads); - - QuerySpec querySpec(ns, - query.obj, - fieldsToReturn ? *fieldsToReturn : BSONObj(), - nToSkip, - nToReturn, - queryOptions); - - // TODO: Timeout must be passed down here as well - 30s timeout may not be applicable for - // all operations handled. - StatusWith<DBClientCursor*> status = queryOp.queryAny(getHosts(hostStrings), - querySpec, - 30 * 1000); - uassertStatusOK(status.getStatus()); - - unique_ptr<DBClientCursor> cursor(status.getValue()); - return cursor; + return false; +} + +static vector<ConnectionString> getHosts(const vector<string> hostStrings) { + vector<ConnectionString> hosts; + for (vector<string>::const_iterator it = hostStrings.begin(); it != hostStrings.end(); ++it) { + string errMsg; + ConnectionString host; + hosts.push_back(ConnectionString::parse(*it, errMsg)); + invariant(hosts.back().type() != ConnectionString::INVALID); } + return hosts; } +unique_ptr<DBClientCursor> SCCFastQueryHandler::handleQuery(const vector<string>& hostStrings, + const string& ns, + Query query, + int nToReturn, + int nToSkip, + const BSONObj* fieldsToReturn, + int queryOptions, + int batchSize) { + MultiHostQueryOp queryOp(_multiQueryEnv, &_queryThreads); + + QuerySpec querySpec(ns, + query.obj, + fieldsToReturn ? *fieldsToReturn : BSONObj(), + nToSkip, + nToReturn, + queryOptions); + + // TODO: Timeout must be passed down here as well - 30s timeout may not be applicable for + // all operations handled. + StatusWith<DBClientCursor*> status = + queryOp.queryAny(getHosts(hostStrings), querySpec, 30 * 1000); + uassertStatusOK(status.getStatus()); + + unique_ptr<DBClientCursor> cursor(status.getValue()); + return cursor; +} +} |