summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/client/syncclusterconnection.cpp19
-rw-r--r--src/mongo/client/syncclusterconnection.h46
-rw-r--r--src/mongo/s/SConscript3
-rw-r--r--src/mongo/s/scc_fast_query_handler.cpp248
-rw-r--r--src/mongo/s/scc_fast_query_handler.h76
-rw-r--r--src/mongo/s/shard.cpp10
6 files changed, 399 insertions, 3 deletions
diff --git a/src/mongo/client/syncclusterconnection.cpp b/src/mongo/client/syncclusterconnection.cpp
index b3a56d02ad6..a4324a3376b 100644
--- a/src/mongo/client/syncclusterconnection.cpp
+++ b/src/mongo/client/syncclusterconnection.cpp
@@ -291,9 +291,28 @@ namespace mongo {
return isOk( info );
}
+ void SyncClusterConnection::attachQueryHandler( QueryHandler* handler ) {
+ _customQueryHandler.reset( handler );
+ }
+
auto_ptr<DBClientCursor> SyncClusterConnection::_queryOnActive(const string &ns, Query query, int nToReturn, int nToSkip,
const BSONObj *fieldsToReturn, int queryOptions, int batchSize ) {
+ if ( _customQueryHandler && _customQueryHandler->canHandleQuery( ns, query ) ) {
+
+ LOG( 2 ) << "custom query handler used for query on " << ns << ": "
+ << query.toString() << endl;
+
+ return _customQueryHandler->handleQuery( _connAddresses,
+ ns,
+ query,
+ nToReturn,
+ nToSkip,
+ fieldsToReturn,
+ queryOptions,
+ batchSize );
+ }
+
for ( size_t i=0; i<_conns.size(); i++ ) {
try {
auto_ptr<DBClientCursor> cursor =
diff --git a/src/mongo/client/syncclusterconnection.h b/src/mongo/client/syncclusterconnection.h
index 792e5bdeac8..0efd49fdae1 100644
--- a/src/mongo/client/syncclusterconnection.h
+++ b/src/mongo/client/syncclusterconnection.h
@@ -47,6 +47,8 @@ namespace mongo {
using DBClientBase::update;
using DBClientBase::remove;
+ class QueryHandler;
+
/**
* @param commaSeparated should be 3 hosts comma separated
*/
@@ -116,6 +118,13 @@ namespace mongo {
virtual void setRunCommandHook(DBClientWithCommands::RunCommandHookFunc func);
virtual void setPostRunCommandHook(DBClientWithCommands::PostRunCommandHookFunc func);
+ /**
+ * Allow custom query processing through an external (e.g. mongos-only) service.
+ *
+ * Takes ownership of attached handler.
+ */
+ void attachQueryHandler( QueryHandler* handler );
+
protected:
virtual void _auth(const BSONObj& params);
@@ -132,14 +141,47 @@ namespace mongo {
string _address;
vector<string> _connAddresses;
vector<DBClientConnection*> _conns;
- map<string,int> _lockTypes;
- mongo::mutex _mutex;
vector<BSONObj> _lastErrors;
+ // Optionally attached by user
+ scoped_ptr<QueryHandler> _customQueryHandler;
+
+ mongo::mutex _mutex;
+ map<string,int> _lockTypes;
+ // End mutex
+
double _socketTimeout;
};
+ /**
+ * Interface for custom query processing for the SCC.
+ * Allows plugging different host query behaviors for different types of queries.
+ */
+ class SyncClusterConnection::QueryHandler {
+ public:
+
+ virtual ~QueryHandler() {};
+
+ /**
+ * Returns true if the query can be processed using this handler.
+ */
+ virtual bool canHandleQuery( const string& ns, Query query ) = 0;
+
+ /**
+ * Returns a cursor on one of the hosts with the desired results for the query.
+ * May throw or return an empty auto_ptr on failure.
+ */
+ virtual auto_ptr<DBClientCursor> handleQuery( const vector<string>& hosts,
+ const string &ns,
+ Query query,
+ int nToReturn,
+ int nToSkip,
+ const BSONObj *fieldsToReturn,
+ int queryOptions,
+ int batchSize ) = 0;
+ };
+
class MONGO_CLIENT_API UpdateNotTheSame : public UserException {
public:
UpdateNotTheSame( int code , const string& msg , const vector<string>& addrs , const vector<BSONObj>& lastErrors )
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 09d30c9169a..e8cae53a9df 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -345,7 +345,8 @@ env.Library(
'cluster_write.cpp',
'dbclient_multi_command.cpp',
'dbclient_shard_resolver.cpp',
- 'write_ops/dbclient_safe_writer.cpp'
+ 'scc_fast_query_handler.cpp',
+ 'write_ops/dbclient_safe_writer.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/bson',
diff --git a/src/mongo/s/scc_fast_query_handler.cpp b/src/mongo/s/scc_fast_query_handler.cpp
new file mode 100644
index 00000000000..86e22927525
--- /dev/null
+++ b/src/mongo/s/scc_fast_query_handler.cpp
@@ -0,0 +1,248 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/s/scc_fast_query_handler.h"
+
+#include <vector>
+
+#include "mongo/base/init.h"
+#include "mongo/bson/util/builder.h"
+#include "mongo/client/connpool.h"
+#include "mongo/client/dbclientcursor.h"
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/server_parameters.h"
+
+namespace mongo {
+
+ /**
+ * This parameter turns on fastest config reads for auth data only - *.system.users collections
+ * and the "usersInfo" command. This should be enough to prevent a non-responsive config from
+ * hanging other operations in a cluster.
+ */
+ MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestAuthConfigReads, bool, false);
+
+ /**
+ * TESTING ONLY.
+ *
+ * This parameter turns on fastest config reads for *all* config.* collections except those
+ * deemed extremely critical (config.version, config.locks).
+ *
+ * NOT FOR PRODUCTION USE.
+ */
+ MONGO_EXPORT_SERVER_PARAMETER(internalSCCAllowFastestMetadataConfigReads, bool, false);
+
+ //
+ // The shared environment for MultiHostQueries
+ //
+
+ namespace {
+
+ class MultiQueryEnv : public MultiHostQueryOp::SystemEnv {
+ public:
+
+ virtual ~MultiQueryEnv() {
+ }
+
+ Date_t currentTimeMillis();
+
+ StatusWith<DBClientCursor*> doBlockingQuery(const ConnectionString& host,
+ const QuerySpec& query);
+ };
+
+ StatusWith<DBClientCursor*> MultiQueryEnv::doBlockingQuery(const ConnectionString& host,
+ const QuerySpec& query) {
+
+ //
+ // Note that this function may be active during shutdown. This means that we must
+ // handle connection pool shutdown exceptions (uasserts). The results of this
+ // operation must then be correctly discarded by the calling thread.
+ //
+
+ auto_ptr<DBClientCursor> cursor;
+
+ try {
+
+ ScopedDbConnection conn(host, 30.0 /* timeout secs */);
+
+ cursor = conn->query(query.ns(),
+ query.filter(),
+ query.ntoreturn(),
+ query.ntoskip(),
+ query.fieldsPtr(),
+ query.options(),
+ 0);
+
+ if ( NULL == cursor.get()) {
+
+ // Confusingly, exceptions here are written to the log, not thrown
+
+ StringBuilder builder;
+ builder << "error querying server " << host.toString()
+ << ", could not create cursor for query";
+
+ warning() << builder.str() << endl;
+ return StatusWith<DBClientCursor*>(ErrorCodes::HostUnreachable, builder.str());
+ }
+
+ // Confusingly, this *detaches* the cursor from the connection it was established
+ // on, further getMore calls will use a (potentially different) ScopedDbConnection.
+ //
+ // Calls done() too.
+ cursor->attach(&conn);
+ }
+ catch (const DBException& ex) {
+ return StatusWith<DBClientCursor*>(ex.toStatus());
+ }
+
+ return StatusWith<DBClientCursor*>(cursor.release());
+ }
+
+ Date_t MultiQueryEnv::currentTimeMillis() {
+ return jsTime();
+ }
+ }
+
+ // Shared networking environment which executes queries.
+ // NOTE: This environment must stay in scope as long as per-host threads are executing queries -
+ // i.e. for the lifetime of the server.
+ // Once the thread pools are disposed and connections shut down, the per-host threads should
+ // be self-contained and correctly shut down after discarding the results.
+ static MultiQueryEnv* _multiQueryEnv;
+
+ namespace {
+
+ //
+ // Create the shared multi-query environment at startup
+ //
+
+ MONGO_INITIALIZER(InitMultiQueryEnv)(InitializerContext* context) {
+ // Leaked intentionally
+ _multiQueryEnv = new MultiQueryEnv();
+ return Status::OK();
+ }
+ }
+
+ //
+ // Per-SCC handling of queries
+ //
+
+ SCCFastQueryHandler::SCCFastQueryHandler() :
+ _queryThreads(1, false) {
+ }
+
+ bool SCCFastQueryHandler::canHandleQuery(const string& ns, Query query) {
+
+ if (!internalSCCAllowFastestAuthConfigReads &&
+ !internalSCCAllowFastestMetadataConfigReads) {
+ return false;
+ }
+
+ //
+ // More operations can be added here
+ //
+ // NOTE: Not all operations actually pass through the SCC _queryOnActive path - notable
+ // exceptions include anything related to direct query ops and direct operations for
+ // connection maintenance.
+ //
+
+ NamespaceString nss(ns);
+ if (nss.isCommand()) {
+ BSONObj cmdObj = query.getFilter();
+ string cmdName = cmdObj.firstElement().fieldName();
+ if (cmdName == "usersInfo")
+ return true;
+ }
+ else if (nss.coll() == "system.users") {
+ return true;
+ }
+
+ //
+ // Allow fastest config reads for all collections except for those involved in locks and
+ // cluster versioning.
+ //
+
+ if (!internalSCCAllowFastestMetadataConfigReads)
+ return false;
+
+ if (nss.db() != "config")
+ return false;
+
+ if (nss.coll() != "version" && nss.coll() != "locks" && nss.coll() != "lockpings") {
+ return true;
+ }
+
+ return false;
+ }
+
+ static vector<ConnectionString> getHosts(const vector<string> hostStrings) {
+
+ vector<ConnectionString> hosts;
+ for (vector<string>::const_iterator it = hostStrings.begin(); it != hostStrings.end();
+ ++it) {
+
+ string errMsg;
+ ConnectionString host;
+ hosts.push_back(ConnectionString::parse(*it, errMsg));
+ invariant( hosts.back().type() != ConnectionString::INVALID );
+ }
+
+ return hosts;
+ }
+
+ auto_ptr<DBClientCursor> SCCFastQueryHandler::handleQuery(const vector<string>& hostStrings,
+ const string& ns,
+ Query query,
+ int nToReturn,
+ int nToSkip,
+ const BSONObj* fieldsToReturn,
+ int queryOptions,
+ int batchSize) {
+
+ MultiHostQueryOp queryOp(_multiQueryEnv, &_queryThreads);
+
+ QuerySpec querySpec(ns,
+ query.obj,
+ fieldsToReturn ? *fieldsToReturn : BSONObj(),
+ nToSkip,
+ nToReturn,
+ queryOptions);
+
+ // TODO: Timeout must be passed down here as well - 30s timeout may not be applicable for
+ // all operations handled.
+ StatusWith<DBClientCursor*> status = queryOp.queryAny(getHosts(hostStrings),
+ querySpec,
+ 30 * 1000);
+ uassertStatusOK(status.getStatus());
+
+ auto_ptr<DBClientCursor> cursor(status.getValue());
+ return cursor;
+ }
+
+}
+
diff --git a/src/mongo/s/scc_fast_query_handler.h b/src/mongo/s/scc_fast_query_handler.h
new file mode 100644
index 00000000000..76c81dab6c8
--- /dev/null
+++ b/src/mongo/s/scc_fast_query_handler.h
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2014 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/client/syncclusterconnection.h"
+#include "mongo/s/multi_host_query.h"
+
+namespace mongo {
+
+ /**
+ * Query handler that plugs in to a SyncClusterConnection and allows query on fastest host
+ * (if enabled).
+ *
+ * Glue code which shields the MultiHostQuery and server parameters from the separate client
+ * module which knows about neither.
+ *
+ * There is a *single* SCCFastQueryHandler for every SCC. Each SCCFastQueryHandler contains
+ * its own thread pool (lazily initialized) so that at maximum there is a thread-per-SCC-host
+ * and this thread may have an open connection to the host until it times out.
+ * If using the "fastestConfigReads" options, clients must be ready for the additional thread
+ * and connection load when configs are slow.
+ */
+ class SCCFastQueryHandler : public SyncClusterConnection::QueryHandler {
+ public:
+
+ SCCFastQueryHandler();
+
+ virtual ~SCCFastQueryHandler() {
+ }
+
+ virtual bool canHandleQuery(const string& ns, Query query);
+
+ virtual auto_ptr<DBClientCursor> handleQuery(const vector<string>& hostStrings,
+ const string &ns,
+ Query query,
+ int nToReturn,
+ int nToSkip,
+ const BSONObj *fieldsToReturn,
+ int queryOptions,
+ int batchSize);
+
+ private:
+
+ // The thread pool itself is scoped to the handler and SCC, and lazily creates threads
+ // per-host as needed. This ensures query starvation cannot occur due to other active
+ // client threads - though a thread must be created for every client.
+ HostThreadPools _queryThreads;
+ };
+
+}
diff --git a/src/mongo/s/shard.cpp b/src/mongo/s/shard.cpp
index f80637bb960..5631eeafb0a 100644
--- a/src/mongo/s/shard.cpp
+++ b/src/mongo/s/shard.cpp
@@ -49,6 +49,7 @@
#include "mongo/s/client_info.h"
#include "mongo/s/config.h"
#include "mongo/s/request.h"
+#include "mongo/s/scc_fast_query_handler.h"
#include "mongo/s/type_shard.h"
#include "mongo/s/version_manager.h"
@@ -497,6 +498,15 @@ namespace mongo {
// to the end of every runCommand. mongod uses this information to produce auditing
// records attributed to the proper authenticated user(s).
conn->setRunCommandHook(boost::bind(&audit::appendImpersonatedUsers, _1));
+
+ // For every SCC created, add a hook that will allow fastest-config-first config reads if
+ // the appropriate server options are set.
+ if ( conn->type() == ConnectionString::SYNC ) {
+ SyncClusterConnection* scc = dynamic_cast<SyncClusterConnection*>( conn );
+ if ( scc ) {
+ scc->attachQueryHandler( new SCCFastQueryHandler );
+ }
+ }
}
void ShardingConnectionHook::onDestroy( DBClientBase * conn ) {