/* * Copyright (C) 2010 10gen Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding #include "mongo/platform/basic.h" #include "mongo/s/strategy.h" #include "mongo/base/status.h" #include "mongo/base/owned_pointer_vector.h" #include "mongo/bson/util/builder.h" #include "mongo/bson/util/bson_extract.h" #include "mongo/client/connpool.h" #include "mongo/client/dbclientcursor.h" #include "mongo/db/audit.h" #include "mongo/db/auth/action_type.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/commands.h" #include "mongo/db/max_time.h" #include "mongo/db/server_parameters.h" #include "mongo/db/namespace_string.h" #include "mongo/db/query/lite_parsed_query.h" #include "mongo/db/stats/counters.h" #include "mongo/s/bson_serializable.h" #include "mongo/s/catalog/catalog_cache.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/chunk_manager.h" #include "mongo/s/chunk_version.h" #include "mongo/s/config.h" #include "mongo/s/cursors.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_find.h" #include "mongo/s/request.h" #include "mongo/s/stale_exception.h" #include "mongo/s/version_manager.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batch_upconvert.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" #include "mongo/util/timer.h" namespace mongo { using std::unique_ptr; using std::shared_ptr; using std::set; using std::string; using std::stringstream; using std::vector; namespace { // A spigot to enable the ClusterClientCursor codepath MONGO_EXPORT_SERVER_PARAMETER(useClusterClientCursor, bool, false); } // namespace static bool _isSystemIndexes(const char* ns) { return nsToCollectionSubstring(ns) == "system.indexes"; } /** * Returns true if request is a query for sharded indexes. */ static bool doShardedIndexQuery(Request& r, const QuerySpec& qSpec) { // Extract the ns field from the query, which may be embedded within the "query" or // "$query" field. auto nsField = qSpec.filter()["ns"]; if (nsField.eoo()) { return false; } const NamespaceString indexNSSQuery(nsField.str()); auto status = grid.catalogCache()->getDatabase(indexNSSQuery.db().toString()); if (!status.isOK()) { return false; } shared_ptr config = status.getValue(); if (!config->isSharded(indexNSSQuery.ns())) { return false; } // if you are querying on system.indexes, we need to make sure we go to a shard // that actually has chunks. This is not a perfect solution (what if you just // look at all indexes), but better than doing nothing. ShardPtr shard; ChunkManagerPtr cm; config->getChunkManagerOrPrimary(indexNSSQuery.ns(), cm, shard); if (cm) { set shardIds; cm->getAllShardIds(&shardIds); verify(shardIds.size() > 0); shard = grid.shardRegistry()->getShard(*shardIds.begin()); } ShardConnection dbcon(shard->getConnString(), r.getns()); DBClientBase& c = dbcon.conn(); string actualServer; Message response; bool ok = c.call(r.m(), response, true, &actualServer); uassert(10200, "mongos: error calling db", ok); { QueryResult::View qr = response.singleData().view2ptr(); if (qr.getResultFlags() & ResultFlag_ShardConfigStale) { dbcon.done(); // Version is zero b/c this is deprecated codepath throw RecvStaleConfigException(r.getns(), "Strategy::doQuery", ChunkVersion(0, 0, OID()), ChunkVersion(0, 0, OID())); } } r.reply(response, actualServer.size() ? actualServer : c.getServerAddress()); dbcon.done(); return true; } void Strategy::queryOp(Request& r) { verify(!NamespaceString(r.getns()).isCommand()); Timer queryTimer; globalOpCounters.gotQuery(); QueryMessage q(r.d()); NamespaceString ns(q.ns); ClientBasic* client = ClientBasic::getCurrent(); AuthorizationSession* authSession = AuthorizationSession::get(client); Status status = authSession->checkAuthForQuery(ns, q.query); audit::logQueryAuthzCheck(client, ns, q.query, status.code()); uassertStatusOK(status); LOG(3) << "query: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn << " options: " << q.queryOptions; if (q.ntoreturn == 1 && strstr(q.ns, ".$cmd")) throw UserException(8010, "something is wrong, shouldn't see a command here"); if (q.queryOptions & QueryOption_Exhaust) { uasserted(18526, string("the 'exhaust' query option is invalid for mongos queries: ") + q.ns + " " + q.query.toString()); } // Spigot which controls whether OP_QUERY style find on mongos uses the new ClusterClientCursor // code path. // TODO: Delete the spigot and always use the new code. if (useClusterClientCursor) { auto txn = cc().makeOperationContext(); ReadPreferenceSetting readPreference(ReadPreference::PrimaryOnly, TagSet::primaryOnly()); BSONElement rpElem; auto readPrefExtractStatus = bsonExtractTypedField( q.query, LiteParsedQuery::kFindCommandReadPrefField, mongo::Object, &rpElem); if (readPrefExtractStatus.isOK()) { auto parsedRps = ReadPreferenceSetting::fromBSON(rpElem.Obj()); uassertStatusOK(parsedRps.getStatus()); readPreference = parsedRps.getValue(); } else if (readPrefExtractStatus != ErrorCodes::NoSuchKey) { uassertStatusOK(readPrefExtractStatus); } auto canonicalQuery = CanonicalQuery::canonicalize(q); uassertStatusOK(canonicalQuery.getStatus()); // Do the work to generate the first batch of results. This blocks waiting to get responses // from the shard(s). std::vector batch; // 0 means the cursor is exhausted and // otherwise we assume that a cursor with the returned id can be retrieved via the // ClusterCursorManager auto cursorId = ClusterFind::runQuery(txn.get(), *canonicalQuery.getValue(), readPreference, &batch); uassertStatusOK(cursorId.getStatus()); // Build the response document. // TODO: this constant should be shared between mongos and mongod, and should // not be inside ShardedClientCursor. BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE); int numResults = 0; for (const auto& obj : batch) { buffer.appendBuf((void*)obj.objdata(), obj.objsize()); numResults++; } replyToQuery(0, // query result flags r.p(), r.m(), buffer.buf(), buffer.len(), numResults, 0, // startingFrom cursorId.getValue()); return; } QuerySpec qSpec((string)q.ns, q.query, q.fields, q.ntoskip, q.ntoreturn, q.queryOptions); // Parse "$maxTimeMS". StatusWith maxTimeMS = LiteParsedQuery::parseMaxTimeMSQuery(q.query); uassert(17233, maxTimeMS.getStatus().reason(), maxTimeMS.isOK()); if (_isSystemIndexes(q.ns) && doShardedIndexQuery(r, qSpec)) { return; } ParallelSortClusteredCursor* cursor = new ParallelSortClusteredCursor(qSpec, CommandInfo()); verify(cursor); // TODO: Move out to Request itself, not strategy based try { cursor->init(); if (qSpec.isExplain()) { BSONObjBuilder explain_builder; cursor->explain(explain_builder); explain_builder.appendNumber("executionTimeMillis", static_cast(queryTimer.millis())); BSONObj b = explain_builder.obj(); replyToQuery(0, r.p(), r.m(), b); delete (cursor); return; } } catch (...) { delete cursor; throw; } // TODO: Revisit all of this when we revisit the sharded cursor cache if (cursor->getNumQueryShards() != 1) { // More than one shard (or zero), manage with a ShardedClientCursor // NOTE: We may also have *zero* shards here when the returnPartial flag is set. // Currently the code in ShardedClientCursor handles this. ShardedClientCursorPtr cc(new ShardedClientCursor(q, cursor)); BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE); int docCount = 0; const int startFrom = cc->getTotalSent(); bool hasMore = cc->sendNextBatch(q.ntoreturn, buffer, docCount); if (hasMore) { LOG(5) << "storing cursor : " << cc->getId(); int cursorLeftoverMillis = maxTimeMS.getValue() - queryTimer.millis(); if (maxTimeMS.getValue() == 0) { // 0 represents "no limit". cursorLeftoverMillis = kMaxTimeCursorNoTimeLimit; } else if (cursorLeftoverMillis <= 0) { cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired; } cursorCache.store(cc, cursorLeftoverMillis); } replyToQuery(0, r.p(), r.m(), buffer.buf(), buffer.len(), docCount, startFrom, hasMore ? cc->getId() : 0); } else { // Only one shard is used // Remote cursors are stored remotely, we shouldn't need this around. unique_ptr cursorDeleter(cursor); ShardPtr shard = cursor->getQueryShard(); verify(shard.get()); DBClientCursorPtr shardCursor = cursor->getShardCursor(shard->getId()); // Implicitly stores the cursor in the cache r.reply(*(shardCursor->getMessage()), shardCursor->originalHost()); // We don't want to kill the cursor remotely if there's still data left shardCursor->decouple(); } } void Strategy::clientCommandOp(Request& r) { QueryMessage q(r.d()); LOG(3) << "command: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn << " options: " << q.queryOptions; if (q.queryOptions & QueryOption_Exhaust) { uasserted(18527, string("the 'exhaust' query option is invalid for mongos commands: ") + q.ns + " " + q.query.toString()); } NamespaceString nss(r.getns()); // Regular queries are handled in strategy_shard.cpp verify(nss.isCommand() || nss.isSpecialCommand()); if (handleSpecialNamespaces(r, q)) return; int loops = 5; while (true) { BSONObjBuilder builder; try { BSONObj cmdObj = q.query; { BSONElement e = cmdObj.firstElement(); if (e.type() == Object && (e.fieldName()[0] == '$' ? str::equals("query", e.fieldName() + 1) : str::equals("query", e.fieldName()))) { // Extract the embedded query object. if (cmdObj.hasField(Query::ReadPrefField.name())) { // The command has a read preference setting. We don't want // to lose this information so we copy this to a new field // called $queryOptions.$readPreference BSONObjBuilder finalCmdObjBuilder; finalCmdObjBuilder.appendElements(e.embeddedObject()); BSONObjBuilder queryOptionsBuilder( finalCmdObjBuilder.subobjStart("$queryOptions")); queryOptionsBuilder.append(cmdObj[Query::ReadPrefField.name()]); queryOptionsBuilder.done(); cmdObj = finalCmdObjBuilder.obj(); } else { cmdObj = e.embeddedObject(); } } } Command::runAgainstRegistered(q.ns, cmdObj, builder, q.queryOptions); BSONObj x = builder.done(); replyToQuery(0, r.p(), r.m(), x); return; } catch (StaleConfigException& e) { if (loops <= 0) throw e; loops--; log() << "retrying command: " << q.query; // For legacy reasons, ns may not actually be set in the exception :-( string staleNS = e.getns(); if (staleNS.size() == 0) staleNS = q.ns; ShardConnection::checkMyConnectionVersions(staleNS); if (loops < 4) versionManager.forceRemoteCheckShardVersionCB(staleNS); } catch (AssertionException& e) { Command::appendCommandStatus(builder, e.toStatus()); BSONObj x = builder.done(); replyToQuery(0, r.p(), r.m(), x); return; } } } // TODO: remove after MongoDB 3.2 bool Strategy::handleSpecialNamespaces(Request& r, QueryMessage& q) { const char* ns = strstr(r.getns(), ".$cmd.sys."); if (!ns) return false; ns += 10; BSONObjBuilder reply; const auto upgradeToRealCommand = [&r, &q, &reply](StringData commandName) { BSONObjBuilder cmdBob; cmdBob.append(commandName, 1); cmdBob.appendElements(q.query); // fields are validated by Commands auto interposedCmd = cmdBob.done(); NamespaceString nss(r.getns()); NamespaceString interposedNss(nss.db(), "$cmd"); Command::runAgainstRegistered( interposedNss.ns().c_str(), interposedCmd, reply, q.queryOptions); }; if (strcmp(ns, "inprog") == 0) { upgradeToRealCommand("currentOp"); } else if (strcmp(ns, "killop") == 0) { upgradeToRealCommand("killOp"); } else if (strcmp(ns, "unlock") == 0) { reply.append("err", "can't do unlock through mongos"); } else { warning() << "unknown sys command [" << ns << "]"; return false; } BSONObj x = reply.done(); replyToQuery(0, r.p(), r.m(), x); return true; } void Strategy::commandOp(const string& db, const BSONObj& command, int options, const string& versionedNS, const BSONObj& targetingQuery, vector* results) { QuerySpec qSpec(db + ".$cmd", command, BSONObj(), 0, 1, options); ParallelSortClusteredCursor cursor(qSpec, CommandInfo(versionedNS, targetingQuery)); // Initialize the cursor cursor.init(); set shardIds; cursor.getQueryShardIds(shardIds); for (const ShardId& shardId : shardIds) { CommandResult result; result.shardTargetId = shardId; result.target = fassertStatusOK( 28739, ConnectionString::parse(cursor.getShardCursor(shardId)->originalHost())); result.result = cursor.getShardCursor(shardId)->peekFirst().getOwned(); results->push_back(result); } } Status Strategy::commandOpUnsharded(const std::string& db, const BSONObj& command, int options, const std::string& versionedNS, CommandResult* cmdResult) { // Note that this implementation will not handle targeting retries and fails when the // sharding metadata is too stale auto status = grid.catalogCache()->getDatabase(db); if (!status.isOK()) { mongoutils::str::stream ss; ss << "Passthrough command failed: " << command.toString() << " on ns " << versionedNS << ". Caused by " << causedBy(status.getStatus()); return Status(ErrorCodes::IllegalOperation, ss); } shared_ptr conf = status.getValue(); if (conf->isSharded(versionedNS)) { mongoutils::str::stream ss; ss << "Passthrough command failed: " << command.toString() << " on ns " << versionedNS << ". Cannot run on sharded namespace."; return Status(ErrorCodes::IllegalOperation, ss); } const auto primaryShard = grid.shardRegistry()->getShard(conf->getPrimaryId()); BSONObj shardResult; try { ShardConnection conn(primaryShard->getConnString(), ""); // TODO: this can throw a stale config when mongos is not up-to-date -- fix. if (!conn->runCommand(db, command, shardResult, options)) { conn.done(); return Status(ErrorCodes::OperationFailed, str::stream() << "Passthrough command failed: " << command << " on ns " << versionedNS << "; result: " << shardResult); } conn.done(); } catch (const DBException& ex) { return ex.toStatus(); } // Fill out the command result. cmdResult->shardTargetId = conf->getPrimaryId(); cmdResult->result = shardResult; cmdResult->target = primaryShard->getConnString(); return Status::OK(); } void Strategy::getMore(Request& r) { Timer getMoreTimer; const char* ns = r.getns(); const int ntoreturn = r.d().pullInt(); const long long id = r.d().pullInt64(); // TODO: Handle stale config exceptions here from coll being dropped or sharded during op // for now has same semantics as legacy request const NamespaceString nss(ns); auto statusGetDb = grid.catalogCache()->getDatabase(nss.db().toString()); if (statusGetDb == ErrorCodes::DatabaseNotFound) { cursorCache.remove(id); replyToQuery(ResultFlag_CursorNotFound, r.p(), r.m(), 0, 0, 0); return; } uassertStatusOK(statusGetDb); shared_ptr config = statusGetDb.getValue(); ShardPtr primary; ChunkManagerPtr info; config->getChunkManagerOrPrimary(ns, info, primary); // // TODO: Cleanup cursor cache, consolidate into single codepath // const string host = cursorCache.getRef(id); ShardedClientCursorPtr cursor = cursorCache.get(id); int cursorMaxTimeMS = cursorCache.getMaxTimeMS(id); // Cursor ids should not overlap between sharded and unsharded cursors massert(17012, str::stream() << "duplicate sharded and unsharded cursor id " << id << " detected for " << ns << ", duplicated on host " << host, NULL == cursorCache.get(id).get() || host.empty()); ClientBasic* client = ClientBasic::getCurrent(); NamespaceString nsString(ns); AuthorizationSession* authSession = AuthorizationSession::get(client); Status status = authSession->checkAuthForGetMore(nsString, id); audit::logGetMoreAuthzCheck(client, nsString, id, status.code()); uassertStatusOK(status); if (!host.empty()) { LOG(3) << "single getmore: " << ns; // we used ScopedDbConnection because we don't get about config versions // not deleting data is handled elsewhere // and we don't want to call setShardVersion ScopedDbConnection conn(host); Message response; bool ok = conn->callRead(r.m(), response); uassert(10204, "dbgrid: getmore: error calling db", ok); bool hasMore = (response.singleData().getCursor() != 0); if (!hasMore) { cursorCache.removeRef(id); } r.reply(response, "" /*conn->getServerAddress() */); conn.done(); return; } else if (cursor) { if (cursorMaxTimeMS == kMaxTimeCursorTimeLimitExpired) { cursorCache.remove(id); uasserted(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit"); } // TODO: Try to match logic of mongod, where on subsequent getMore() we pull lots more data? BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE); int docCount = 0; const int startFrom = cursor->getTotalSent(); bool hasMore = cursor->sendNextBatch(ntoreturn, buffer, docCount); if (hasMore) { // still more data cursor->accessed(); if (cursorMaxTimeMS != kMaxTimeCursorNoTimeLimit) { // Update remaining amount of time in cursor cache. int cursorLeftoverMillis = cursorMaxTimeMS - getMoreTimer.millis(); if (cursorLeftoverMillis <= 0) { cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired; } cursorCache.updateMaxTimeMS(id, cursorLeftoverMillis); } } else { // we've exhausted the cursor cursorCache.remove(id); } replyToQuery(0, r.p(), r.m(), buffer.buf(), buffer.len(), docCount, startFrom, hasMore ? cursor->getId() : 0); return; } else { LOG(3) << "could not find cursor " << id << " in cache for " << ns; replyToQuery(ResultFlag_CursorNotFound, r.p(), r.m(), 0, 0, 0); return; } } void Strategy::writeOp(int op, Request& r) { // make sure we have a last error dassert(&LastError::get(cc())); OwnedPointerVector requestsOwned; vector& requests = requestsOwned.mutableVector(); msgToBatchRequests(r.m(), &requests); for (vector::iterator it = requests.begin(); it != requests.end(); ++it) { // Multiple commands registered to last error as multiple requests if (it != requests.begin()) LastError::get(cc()).startRequest(); BatchedCommandRequest* request = *it; // Adjust namespaces for command NamespaceString fullNS(request->getNS()); string cmdNS = fullNS.getCommandNS(); // We only pass in collection name to command request->setNS(fullNS); BSONObjBuilder builder; BSONObj requestBSON = request->toBSON(); { // Disable the last error object for the duration of the write cmd LastError::Disabled disableLastError(&LastError::get(cc())); Command::runAgainstRegistered(cmdNS.c_str(), requestBSON, builder, 0); } BatchedCommandResponse response; bool parsed = response.parseBSON(builder.done(), NULL); (void)parsed; // for compile dassert(parsed && response.isValid(NULL)); // Populate the lastError object based on the write response LastError::get(cc()).reset(); bool hadError = batchErrorToLastError(*request, response, &LastError::get(cc())); // Check if this is an ordered batch and we had an error which should stop processing if (request->getOrdered() && hadError) break; } } }