diff options
author | Greg Studer <greg@10gen.com> | 2014-04-28 16:54:17 -0400 |
---|---|---|
committer | Greg Studer <greg@10gen.com> | 2014-05-15 14:23:15 -0400 |
commit | 945ed48cc77ecbc97f7fdc6f7a06c8968a7a14c5 (patch) | |
tree | eb08e75a0bfd47649c426541b1fa02b886455089 | |
parent | fef6805061b31e1c6269a438c1922f17db72213b (diff) | |
download | mongo-945ed48cc77ecbc97f7fdc6f7a06c8968a7a14c5.tar.gz |
SERVER-11332 hookup of fastest query to SyncClusterConnection
(cherry picked from commit d2e4b7d17a8b4a406f053e39f692f394d66e6b11)
-rw-r--r-- | src/mongo/client/syncclusterconnection.cpp | 19 | ||||
-rw-r--r-- | src/mongo/client/syncclusterconnection.h | 46 | ||||
-rw-r--r-- | src/mongo/s/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/s/scc_fast_query_handler.cpp | 248 | ||||
-rw-r--r-- | src/mongo/s/scc_fast_query_handler.h | 76 | ||||
-rw-r--r-- | src/mongo/s/shard.cpp | 10 |
6 files changed, 399 insertions, 3 deletions
diff --git a/src/mongo/client/syncclusterconnection.cpp b/src/mongo/client/syncclusterconnection.cpp index b3a56d02ad6..a4324a3376b 100644 --- a/src/mongo/client/syncclusterconnection.cpp +++ b/src/mongo/client/syncclusterconnection.cpp @@ -291,9 +291,28 @@ namespace mongo { return isOk( info ); } + void SyncClusterConnection::attachQueryHandler( QueryHandler* handler ) { + _customQueryHandler.reset( handler ); + } + auto_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip, const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) { + if ( _customQueryHandler && _customQueryHandler->canHandleQuery( ns, query ) ) { + + LOG( 2 ) << "custom query handler used for query on " << ns << ": " + << query.toString() << endl; + + return _customQueryHandler->handleQuery( _connAddresses, + ns, + query, + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + batchSize ); + } + for ( size_t i=0; i<_conns.size(); i++ ) { try { auto_ptr<DBClientCursor> cursor = diff --git a/src/mongo/client/syncclusterconnection.h b/src/mongo/client/syncclusterconnection.h index 792e5bdeac8..0efd49fdae1 100644 --- a/src/mongo/client/syncclusterconnection.h +++ b/src/mongo/client/syncclusterconnection.h @@ -47,6 +47,8 @@ namespace mongo { using DBClientBase::update; using DBClientBase::remove; + class QueryHandler; + /** * @param commaSeparated should be 3 hosts comma separated */ @@ -116,6 +118,13 @@ namespace mongo { virtual void setRunCommandHook(DBClientWithCommands::RunCommandHookFunc func); virtual void setPostRunCommandHook(DBClientWithCommands::PostRunCommandHookFunc func); + /** + * Allow custom query processing through an external (e.g. mongos-only) service. + * + * Takes ownership of attached handler. + */ + void attachQueryHandler( QueryHandler* handler ); + protected: virtual void _auth(const BSONObj& params); @@ -132,14 +141,47 @@ namespace mongo { string _address; vector<string> _connAddresses; vector<DBClientConnection*> _conns; - map<string,int> _lockTypes; - mongo::mutex _mutex; vector<BSONObj> _lastErrors; + // Optionally attached by user + scoped_ptr<QueryHandler> _customQueryHandler; + + mongo::mutex _mutex; + map<string,int> _lockTypes; + // End mutex + double _socketTimeout; }; + /** + * Interface for custom query processing for the SCC. + * Allows plugging different host query behaviors for different types of queries. + */ + class SyncClusterConnection::QueryHandler { + public: + + virtual ~QueryHandler() {}; + + /** + * Returns true if the query can be processed using this handler. + */ + virtual bool canHandleQuery( const string& ns, Query query ) = 0; + + /** + * Returns a cursor on one of the hosts with the desired results for the query. + * May throw or return an empty auto_ptr on failure. + */ + virtual auto_ptr<DBClientCursor> handleQuery( const vector<string>& hosts, + const string &ns, + Query query, + int nToReturn, + int nToSkip, + const BSONObj *fieldsToReturn, + int queryOptions, + int batchSize ) = 0; + }; + class MONGO_CLIENT_API UpdateNotTheSame : public UserException { public: UpdateNotTheSame( int code , const string& msg , const vector<string>& addrs , const vector<BSONObj>& lastErrors ) diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index f8119eb56ba..ee756c9fd9d 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -345,7 +345,8 @@ env.Library( 'cluster_write.cpp', 'dbclient_multi_command.cpp', 'dbclient_shard_resolver.cpp', - 'write_ops/dbclient_safe_writer.cpp' + 'scc_fast_query_handler.cpp', + 'write_ops/dbclient_safe_writer.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/bson', diff --git a/src/mongo/s/scc_fast_query_handler.cpp b/src/mongo/s/scc_fast_query_handler.cpp new file mode 100644 index 00000000000..86e22927525 --- /dev/null +++ b/src/mongo/s/scc_fast_query_handler.cpp @@ -0,0 +1,248 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/s/scc_fast_query_handler.h" + +#include <vector> + +#include "mongo/base/init.h" +#include "mongo/bson/util/builder.h" +#include "mongo/client/connpool.h" +#include "mongo/client/dbclientcursor.h" +#include "mongo/client/dbclientinterface.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/server_parameters.h" + +namespace mongo { + + /** + * This parameter turns on fastest config reads for auth data only - *.system.users collections + * and the "usersInfo" command. This should be enough to prevent a non-responsive config from + * hanging other operations in a cluster. + */ + MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestAuthConfigReads, bool, false); + + /** + * TESTING ONLY. + * + * This parameter turns on fastest config reads for *all* config.* collections except those + * deemed extremely critical (config.version, config.locks). + * + * NOT FOR PRODUCTION USE. + */ + MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestMetadataConfigReads, bool, false); + + // + // The shared environment for MultiHostQueries + // + + namespace { + + class MultiQueryEnv : public MultiHostQueryOp::SystemEnv { + public: + + virtual ~MultiQueryEnv() { + } + + Date_t currentTimeMillis(); + + StatusWith<DBClientCursor*> doBlockingQuery(const ConnectionString& host, + const QuerySpec& query); + }; + + StatusWith<DBClientCursor*> MultiQueryEnv::doBlockingQuery(const ConnectionString& host, + const QuerySpec& query) { + + // + // Note that this function may be active during shutdown. This means that we must + // handle connection pool shutdown exceptions (uasserts). The results of this + // operation must then be correctly discarded by the calling thread. + // + + auto_ptr<DBClientCursor> cursor; + + try { + + ScopedDbConnection conn(host, 30.0 /* timeout secs */); + + cursor = conn->query(query.ns(), + query.filter(), + query.ntoreturn(), + query.ntoskip(), + query.fieldsPtr(), + query.options(), + 0); + + if ( NULL == cursor.get()) { + + // Confusingly, exceptions here are written to the log, not thrown + + StringBuilder builder; + builder << "error querying server " << host.toString() + << ", could not create cursor for query"; + + warning() << builder.str() << endl; + return StatusWith<DBClientCursor*>(ErrorCodes::HostUnreachable, builder.str()); + } + + // Confusingly, this *detaches* the cursor from the connection it was established + // on, further getMore calls will use a (potentially different) ScopedDbConnection. + // + // Calls done() too. + cursor->attach(&conn); + } + catch (const DBException& ex) { + return StatusWith<DBClientCursor*>(ex.toStatus()); + } + + return StatusWith<DBClientCursor*>(cursor.release()); + } + + Date_t MultiQueryEnv::currentTimeMillis() { + return jsTime(); + } + } + + // Shared networking environment which executes queries. + // NOTE: This environment must stay in scope as long as per-host threads are executing queries - + // i.e. for the lifetime of the server. + // Once the thread pools are disposed and connections shut down, the per-host threads should + // be self-contained and correctly shut down after discarding the results. + static MultiQueryEnv* _multiQueryEnv; + + namespace { + + // + // Create the shared multi-query environment at startup + // + + MONGO_INITIALIZER(InitMultiQueryEnv)(InitializerContext* context) { + // Leaked intentionally + _multiQueryEnv = new MultiQueryEnv(); + return Status::OK(); + } + } + + // + // Per-SCC handling of queries + // + + SCCFastQueryHandler::SCCFastQueryHandler() : + _queryThreads(1, false) { + } + + bool SCCFastQueryHandler::canHandleQuery(const string& ns, Query query) { + + if (!internalSCCAllowFastestAuthConfigReads && + !internalSCCAllowFastestMetadataConfigReads) { + return false; + } + + // + // More operations can be added here + // + // NOTE: Not all operations actually pass through the SCC _queryOnActive path - notable + // exceptions include anything related to direct query ops and direct operations for + // connection maintenance. + // + + NamespaceString nss(ns); + if (nss.isCommand()) { + BSONObj cmdObj = query.getFilter(); + string cmdName = cmdObj.firstElement().fieldName(); + if (cmdName == "usersInfo") + return true; + } + else if (nss.coll() == "system.users") { + return true; + } + + // + // Allow fastest config reads for all collections except for those involved in locks and + // cluster versioning. + // + + if (!internalSCCAllowFastestMetadataConfigReads) + return false; + + if (nss.db() != "config") + return false; + + if (nss.coll() != "version" && nss.coll() != "locks" && nss.coll() != "lockpings") { + return true; + } + + return false; + } + + static vector<ConnectionString> getHosts(const vector<string> hostStrings) { + + vector<ConnectionString> hosts; + for (vector<string>::const_iterator it = hostStrings.begin(); it != hostStrings.end(); + ++it) { + + string errMsg; + ConnectionString host; + hosts.push_back(ConnectionString::parse(*it, errMsg)); + invariant( hosts.back().type() != ConnectionString::INVALID ); + } + + return hosts; + } + + auto_ptr<DBClientCursor> SCCFastQueryHandler::handleQuery(const vector<string>& hostStrings, + const string& ns, + Query query, + int nToReturn, + int nToSkip, + const BSONObj* fieldsToReturn, + int queryOptions, + int batchSize) { + + MultiHostQueryOp queryOp(_multiQueryEnv, &_queryThreads); + + QuerySpec querySpec(ns, + query.obj, + fieldsToReturn ? *fieldsToReturn : BSONObj(), + nToSkip, + nToReturn, + queryOptions); + + // TODO: Timeout must be passed down here as well - 30s timeout may not be applicable for + // all operations handled. + StatusWith<DBClientCursor*> status = queryOp.queryAny(getHosts(hostStrings), + querySpec, + 30 * 1000); + uassertStatusOK(status.getStatus()); + + auto_ptr<DBClientCursor> cursor(status.getValue()); + return cursor; + } + +} + diff --git a/src/mongo/s/scc_fast_query_handler.h b/src/mongo/s/scc_fast_query_handler.h new file mode 100644 index 00000000000..76c81dab6c8 --- /dev/null +++ b/src/mongo/s/scc_fast_query_handler.h @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/client/syncclusterconnection.h" +#include "mongo/s/multi_host_query.h" + +namespace mongo { + + /** + * Query handler that plugs in to a SyncClusterConnection and allows query on fastest host + * (if enabled). + * + * Glue code which shields the MultiHostQuery and server parameters from the separate client + * module which knows about neither. + * + * There is a *single* SCCFastQueryHandler for every SCC. Each SCCFastQueryHandler contains + * its own thread pool (lazily initialized) so that at maximum there is a thread-per-SCC-host + * and this thread may have an open connection to the host until it times out. + * If using the "fastestConfigReads" options, clients must be ready for the additional thread + * and connection load when configs are slow. + */ + class SCCFastQueryHandler : public SyncClusterConnection::QueryHandler { + public: + + SCCFastQueryHandler(); + + virtual ~SCCFastQueryHandler() { + } + + virtual bool canHandleQuery(const string& ns, Query query); + + virtual auto_ptr<DBClientCursor> handleQuery(const vector<string>& hostStrings, + const string &ns, + Query query, + int nToReturn, + int nToSkip, + const BSONObj *fieldsToReturn, + int queryOptions, + int batchSize); + + private: + + // The thread pool itself is scoped to the handler and SCC, and lazily creates threads + // per-host as needed. This ensures query starvation cannot occur due to other active + // client threads - though a thread must be created for every client. + HostThreadPools _queryThreads; + }; + +} diff --git a/src/mongo/s/shard.cpp b/src/mongo/s/shard.cpp index 0d86a24a496..2a319765724 100644 --- a/src/mongo/s/shard.cpp +++ b/src/mongo/s/shard.cpp @@ -49,6 +49,7 @@ #include "mongo/s/client_info.h" #include "mongo/s/config.h" #include "mongo/s/request.h" +#include "mongo/s/scc_fast_query_handler.h" #include "mongo/s/type_shard.h" #include "mongo/s/version_manager.h" @@ -497,6 +498,15 @@ namespace mongo { // to the end of every runCommand. mongod uses this information to produce auditing // records attributed to the proper authenticated user(s). conn->setRunCommandHook(boost::bind(&audit::appendImpersonatedUsers, _1)); + + // For every SCC created, add a hook that will allow fastest-config-first config reads if + // the appropriate server options are set. + if ( conn->type() == ConnectionString::SYNC ) { + SyncClusterConnection* scc = dynamic_cast<SyncClusterConnection*>( conn ); + if ( scc ) { + scc->attachQueryHandler( new SCCFastQueryHandler ); + } + } } void ShardingConnectionHook::onDestroy( DBClientBase * conn ) { |