/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork #include "mongo/platform/basic.h" #include "mongo/s/client/parallel.h" #include "mongo/client/constants.h" #include "mongo/client/dbclient_cursor.h" #include "mongo/client/dbclient_rs.h" #include "mongo/client/replica_set_monitor.h" #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/query/query_request.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard_connection.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_exception.h" namespace mongo { using std::shared_ptr; using std::map; using std::set; using std::string; using std::vector; namespace dps = ::mongo::dotted_path_support; namespace { /** * Throws an exception wrapping the error document in this cursor when the error flag is set. */ void throwCursorError(DBClientCursor* cursor) { verify(cursor); if (cursor->hasResultFlag(ResultFlag_ErrSet)) { uassertStatusOK(getStatusFromCommandResult(cursor->next())); } } } // namespace struct ParallelConnectionState { ParallelConnectionState() : count(0), done(false) {} std::string toString() const; BSONObj toBSON() const; // Please do not reorder. cursor destructor can use conn. // On a related note, never attempt to cleanup these pointers manually. 
std::shared_ptr conn; std::shared_ptr cursor; // Version information std::shared_ptr manager; std::shared_ptr primary; // Cursor status information long long count; bool done; }; struct ParallelConnectionMetadata { ParallelConnectionMetadata() : retryNext(false), initialized(false), finished(false), completed(false), errored(false) {} ~ParallelConnectionMetadata() { cleanup(true); } void cleanup(bool full = true); std::shared_ptr pcState; bool retryNext; bool initialized; bool finished; bool completed; bool errored; BSONObj toBSON() const; std::string toString() const { return str::stream() << "PCMData : " << toBSON(); } }; /** * Helper class to manage ownership of opened cursors while merging results. * * TODO: Choose one set of ownership semantics so that this isn't needed - merge sort via mapreduce * is the main issue since it has no metadata and this holder owns the cursors. */ class DBClientCursorHolder { public: DBClientCursorHolder() = default; void reset(DBClientCursor* cursor, ParallelConnectionMetadata* pcmData) { _cursor.reset(cursor); _pcmData.reset(pcmData); } DBClientCursor* get() { return _cursor.get(); } ParallelConnectionMetadata* getMData() { return _pcmData.get(); } void release() { _cursor.release(); _pcmData.release(); } private: std::unique_ptr _cursor; std::unique_ptr _pcmData; }; // -------- ParallelSortClusteredCursor ----------- ParallelSortClusteredCursor::ParallelSortClusteredCursor(const QuerySpec& qSpec, const CommandInfo& cInfo) : _qSpec(qSpec), _cInfo(cInfo), _totalTries(0) { _done = false; _didInit = false; _finishCons(); } ParallelSortClusteredCursor::ParallelSortClusteredCursor(const set& servers, const string& ns, const Query& q, int options, const BSONObj& fields) : _servers(servers) { _sortKey = q.getSort().copy(); _needToSkip = 0; _done = false; _didInit = false; // Populate legacy fields _ns = ns; _query = q.obj.getOwned(); _options = options; _fields = fields.getOwned(); _batchSize = 0; _finishCons(); } 
ParallelSortClusteredCursor::~ParallelSortClusteredCursor() { // WARNING: Commands (in particular M/R) connect via _oldInit() directly to shards bool isDirectShardCursor = _cursorMap.empty(); // Non-direct shard cursors are owned by the _cursorMap, so we release // them in the array here. Direct shard cursors clean themselves. if (!isDirectShardCursor) { for (int i = 0; i < _numServers; i++) _cursors[i].release(); } delete[] _cursors; _cursors = 0; // Clear out our metadata after removing legacy cursor data _cursorMap.clear(); // Just to be sure _done = true; } void ParallelSortClusteredCursor::init(OperationContext* opCtx) { if (_didInit) return; _didInit = true; if (!_qSpec.isEmpty()) { fullInit(opCtx); } else { // You can only get here by using the legacy constructor // TODO: Eliminate this _oldInit(opCtx); } } void ParallelSortClusteredCursor::_finishCons() { _numServers = _servers.size(); _lastFrom = 0; _cursors = 0; if (!_qSpec.isEmpty()) { _needToSkip = _qSpec.ntoskip(); _cursors = 0; _sortKey = _qSpec.sort(); _fields = _qSpec.fields(); } // Partition sort key fields into (a) text meta fields and (b) all other fields. set textMetaSortKeyFields; set normalSortKeyFields; // Transform _sortKey fields {a:{$meta:"textScore"}} into {a:-1}, in order to apply the // merge sort for text metadata in the correct direction. BSONObjBuilder transformedSortKeyBuilder; BSONObjIterator sortKeyIt(_sortKey); while (sortKeyIt.more()) { BSONElement e = sortKeyIt.next(); if (QueryRequest::isTextScoreMeta(e)) { textMetaSortKeyFields.insert(e.fieldName()); transformedSortKeyBuilder.append(e.fieldName(), -1); } else { normalSortKeyFields.insert(e.fieldName()); transformedSortKeyBuilder.append(e); } } _sortKey = transformedSortKeyBuilder.obj(); // Verify that that all text metadata sort fields are in the projection. For all other sort // fields, copy them into the projection if they are missing (and if projection is // negative). 
if (!_sortKey.isEmpty() && !_fields.isEmpty()) { BSONObjBuilder b; bool isNegative = false; { BSONObjIterator i(_fields); while (i.more()) { BSONElement e = i.next(); b.append(e); string fieldName = e.fieldName(); if (QueryRequest::isTextScoreMeta(e)) { textMetaSortKeyFields.erase(fieldName); } else { // exact field bool found = normalSortKeyFields.erase(fieldName); // subfields set::const_iterator begin = normalSortKeyFields.lower_bound(fieldName + ".\x00"); set::const_iterator end = normalSortKeyFields.lower_bound(fieldName + ".\xFF"); normalSortKeyFields.erase(begin, end); if (!e.trueValue()) { uassert(13431, "have to have sort key in projection and removing it", !found && begin == end); } else if (!e.isABSONObj()) { isNegative = true; } } } } if (isNegative) { for (set::const_iterator it(normalSortKeyFields.begin()), end(normalSortKeyFields.end()); it != end; ++it) { b.append(*it, 1); } } _fields = b.obj(); } if (!_qSpec.isEmpty()) { _qSpec.setFields(_fields); } uassert( 17306, "have to have all text meta sort keys in projection", textMetaSortKeyFields.empty()); } void ParallelSortClusteredCursor::fullInit(OperationContext* opCtx) { startInit(opCtx); finishInit(opCtx); } void ParallelSortClusteredCursor::_markStaleNS(const NamespaceString& staleNS, const StaleConfigException& e) { if (_staleNSMap.find(staleNS.ns()) == _staleNSMap.end()) { _staleNSMap[staleNS.ns()] = 1; } const int tries = ++_staleNSMap[staleNS.ns()]; if (tries >= 5) { uassertStatusOK(e.toStatus("too many retries of stale version info")); } } void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk( OperationContext* opCtx, std::shared_ptr state, const ShardId& shardId, std::shared_ptr primary, const NamespaceString& ns, const string& vinfo, std::shared_ptr manager) { if (manager) { state->manager = manager; } else if (primary) { state->primary = primary; } verify(!primary || shardId == primary->getId()); // Setup conn if (!state->conn) { const auto shard = 
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); state->conn.reset(new ShardConnection(opCtx, shard->getConnString(), ns.ns(), manager)); } const DBClientBase* rawConn = state->conn->getRawConn(); bool allowShardVersionFailure = rawConn->type() == ConnectionString::SET && DBClientReplicaSet::isSecondaryQuery(_qSpec.ns(), _qSpec.query(), _qSpec.options()); // Skip shard version checking if primary is known to be down. if (allowShardVersionFailure) { const DBClientReplicaSet* replConn = dynamic_cast(rawConn); invariant(replConn); ReplicaSetMonitorPtr rsMonitor = ReplicaSetMonitor::get(replConn->getSetName()); uassert(16388, str::stream() << "cannot access unknown replica set: " << replConn->getSetName(), rsMonitor != nullptr); if (!rsMonitor->isKnownToHaveGoodPrimary()) { state->conn->donotCheckVersion(); // A side effect of this short circuiting is the mongos will not be able figure out // that the primary is now up on it's own and has to rely on other threads to refresh // node states. static Occasionally sampler; if (sampler.tick()) { const DBClientReplicaSet* repl = dynamic_cast(rawConn); dassert(repl); warning() << "Primary for " << repl->getServerAddress() << " was down before, bypassing setShardVersion." << " The local replica set view and targeting may be stale."; } return; } } try { if (state->conn->setVersion()) { LOG(2) << "pcursor: needed to set remote version on connection to value " << "compatible with " << vinfo; } } catch (const DBException& dbExcep) { auto errCode = dbExcep.code(); if (allowShardVersionFailure && (ErrorCodes::isNotMasterError(errCode) || ErrorCodes::isNetworkError(errCode) || errCode == ErrorCodes::FailedToSatisfyReadPreference)) { // It's okay if we don't set the version when talking to a secondary, we can // be stale in any case. 
static Occasionally sampler; if (sampler.tick()) { const DBClientReplicaSet* repl = dynamic_cast(state->conn->getRawConn()); dassert(repl); warning() << "Cannot contact primary for " << repl->getServerAddress() << " to check shard version." << " The local replica set view and targeting may be stale."; } } else { throw; } } } void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) { const bool returnPartial = (_qSpec.options() & QueryOption_PartialResults); const NamespaceString nss(!_cInfo.isEmpty() ? _cInfo.versionedNS : _qSpec.ns()); string prefix; if (MONGO_unlikely(shouldLog(logger::LogSeverity::Debug(2)))) { if (_totalTries > 0) { prefix = str::stream() << "retrying (" << _totalTries << " tries)"; } else { prefix = "creating"; } } LOG(2) << "pcursor: " << prefix << " pcursor over " << _qSpec << " and " << _cInfo; shared_ptr manager; shared_ptr primary; { auto routingInfoStatus = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss); if (routingInfoStatus != ErrorCodes::NamespaceNotFound) { auto routingInfo = uassertStatusOK(std::move(routingInfoStatus)); manager = routingInfo.cm(); // ParallelSortClusteredCursor has two states - either !cm && primary, which means // unsharded collection, or cm && !primary, which means sharded collection. if (!manager) { primary = routingInfo.db().primary(); } } } set shardIds; string vinfo; if (manager) { if (MONGO_unlikely(shouldLog(logger::LogSeverity::Debug(2)))) { vinfo = str::stream() << "[" << manager->getns().ns() << " @ " << manager->getVersion().toString() << "]"; } manager->getShardIdsForQuery(opCtx, !_cInfo.isEmpty() ? _cInfo.cmdFilter : _qSpec.filter(), !_cInfo.isEmpty() ? 
_cInfo.cmdCollation : BSONObj(), &shardIds); } else if (primary) { if (MONGO_unlikely(shouldLog(logger::LogSeverity::Debug(2)))) { vinfo = str::stream() << "[unsharded @ " << primary->toString() << "]"; } shardIds.insert(primary->getId()); } // Close all cursors on extra shards first, as these will be invalid for (auto& cmEntry : _cursorMap) { const auto& shardId = cmEntry.first; if (shardIds.find(shardId) == shardIds.end()) { LOG(2) << "pcursor: closing cursor on shard " << shardId << " as the connection is no longer required by " << vinfo; cmEntry.second.cleanup(true); } } LOG(2) << "pcursor: initializing over " << shardIds.size() << " shards required by " << vinfo; // Don't retry indefinitely for whatever reason _totalTries++; uassert(15986, "too many retries in total", _totalTries < 10); for (const ShardId& shardId : shardIds) { auto& mdata = _cursorMap[shardId]; LOG(2) << "pcursor: initializing on shard " << shardId << ", current connection state is " << mdata.toBSON(); // This may be the first time connecting to this shard, if so we can get an error here try { if (mdata.initialized) { invariant(mdata.pcState); auto state = mdata.pcState; bool compatiblePrimary = true; bool compatibleManager = true; if (primary && !state->primary) warning() << "Collection becoming unsharded detected"; if (manager && !state->manager) warning() << "Collection becoming sharded detected"; if (primary && state->primary && primary != state->primary) warning() << "Weird shift of primary detected"; compatiblePrimary = primary && state->primary && primary == state->primary; compatibleManager = manager && state->manager && manager->compatibleWith(*state->manager, shardId); if (compatiblePrimary || compatibleManager) { // If we're compatible, don't need to retry unless forced if (!mdata.retryNext) continue; // Do partial cleanup mdata.cleanup(false); } else { // Force total cleanup of connection if no longer compatible mdata.cleanup(true); } } else { // Cleanup connection if we're not 
yet initialized mdata.cleanup(false); } mdata.pcState = std::make_shared(); auto state = mdata.pcState; setupVersionAndHandleSlaveOk(opCtx, state, shardId, primary, nss, vinfo, manager); const string& ns = _qSpec.ns(); // Setup cursor if (!state->cursor) { // // Here we decide whether to split the query limits up for multiple shards. // NOTE: There's a subtle issue here, in that it's possible we target a single // shard first, but are stale, and then target multiple shards, or vice-versa. // In both these cases, we won't re-use the old cursor created here, since the // shard version must have changed on the single shard between queries. // if (shardIds.size() > 1) { // Query limits split for multiple shards state->cursor.reset(new DBClientCursor( state->conn->get(), NamespaceString(ns), _qSpec.query(), isCommand() ? 1 : 0, // nToReturn (0 if query indicates multi) 0, // nToSkip // Does this need to be a ptr? _qSpec.fields().isEmpty() ? 0 : _qSpec.fieldsData(), // fieldsToReturn _qSpec.options(), // options // NtoReturn is weird. // If zero, it means use default size, so we do that for all cursors // If positive, it's the batch size (we don't want this cursor limiting // results), that's done at a higher level // If negative, it's the batch size, but we don't create a cursor - so we // don't want to create a child cursor either. // Either way, if non-zero, we want to pull back the batch size + the skip // amount as quickly as possible. Potentially, for a cursor on a single // shard or if we keep better track of chunks, we can actually add the skip // value into the cursor and/or make some assumptions about the return value // size ( (batch size + skip amount) / num_servers ). _qSpec.ntoreturn() == 0 ? 0 : (_qSpec.ntoreturn() > 0 ? 
_qSpec.ntoreturn() + _qSpec.ntoskip() : _qSpec.ntoreturn() - _qSpec.ntoskip()))); // batchSize } else { // Single shard query state->cursor.reset(new DBClientCursor( state->conn->get(), NamespaceString(ns), _qSpec.query(), _qSpec.ntoreturn(), // nToReturn _qSpec.ntoskip(), // nToSkip // Does this need to be a ptr? _qSpec.fields().isEmpty() ? 0 : _qSpec.fieldsData(), // fieldsToReturn _qSpec.options(), // options 0)); // batchSize } } bool lazyInit = state->conn->get()->lazySupported(); if (lazyInit) { // Need to keep track if this is a second or third try for replica sets state->cursor->initLazy(mdata.retryNext); mdata.retryNext = false; mdata.initialized = true; } else { bool success = state->cursor->init(); // Without full initialization, throw an exception uassert(15987, str::stream() << "could not fully initialize cursor on shard " << shardId << ", current connection state is " << mdata.toBSON().toString(), success); mdata.retryNext = false; mdata.initialized = true; mdata.finished = true; } LOG(2) << "pcursor: initialized " << (isCommand() ? "command " : "query ") << (lazyInit ? 
"(lazily) " : "(full) ") << "on shard " << shardId << ", current connection state is " << mdata.toBSON(); } catch (StaleConfigException& e) { // Our version isn't compatible with the current version anymore on at least one shard, // need to retry immediately NamespaceString staleNS(e->getNss()); _markStaleNS(staleNS, e); Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNS); LOG(1) << "stale config of ns " << staleNS << " during initialization, will retry" << causedBy(redact(e)); // This is somewhat strange if (staleNS != nss) { warning() << "versioned ns " << nss.ns() << " doesn't match stale config namespace " << staleNS; } // Restart with new chunk manager startInit(opCtx); return; } catch (NetworkException& e) { warning() << "socket exception when initializing on " << shardId << ", current connection state is " << mdata.toBSON() << causedBy(redact(e)); mdata.errored = true; if (returnPartial) { mdata.cleanup(true); continue; } throw; } catch (DBException& e) { warning() << "db exception when initializing on " << shardId << ", current connection state is " << mdata.toBSON() << causedBy(redact(e)); mdata.errored = true; if (returnPartial && e.code() == 15925 /* From above! */) { mdata.cleanup(true); continue; } throw; } catch (std::exception& e) { warning() << "exception when initializing on " << shardId << ", current connection state is " << mdata.toBSON() << causedBy(e); mdata.errored = true; throw; } catch (...) 
{ warning() << "unknown exception when initializing on " << shardId << ", current connection state is " << mdata.toBSON(); mdata.errored = true; throw; } } // Sanity check final init'ed connections for (const auto& cmEntry : _cursorMap) { const auto& shardId = cmEntry.first; const auto& mdata = cmEntry.second; if (!mdata.pcState) { continue; } // Make sure all state is in shards invariant(shardIds.find(shardId) != shardIds.end()); invariant(mdata.initialized == true); if (!mdata.completed) { invariant(mdata.pcState->conn->ok()); } invariant(mdata.pcState->cursor); invariant(mdata.pcState->primary || mdata.pcState->manager); invariant(!mdata.retryNext); if (mdata.completed) { invariant(mdata.finished); } if (mdata.finished) { invariant(mdata.initialized); } if (!returnPartial) { invariant(mdata.initialized); } } } void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) { bool returnPartial = (_qSpec.options() & QueryOption_PartialResults); bool specialVersion = _cInfo.versionedNS.size() > 0; string ns = specialVersion ? 
_cInfo.versionedNS : _qSpec.ns(); bool retry = false; map staleNSExceptions; LOG(2) << "pcursor: finishing over " << _cursorMap.size() << " shards"; for (auto& cmEntry : _cursorMap) { const auto& shardId = cmEntry.first; auto& mdata = cmEntry.second; LOG(2) << "pcursor: finishing on shard " << shardId << ", current connection state is " << mdata.toBSON(); // Ignore empty conns for now if (!mdata.pcState) continue; auto state = mdata.pcState; try { // Sanity checks if (!mdata.completed) verify(state->conn && state->conn->ok()); verify(state->cursor); verify(state->manager || state->primary); verify(!state->manager || !state->primary); // If we weren't init'ing lazily, ignore this if (!mdata.finished) { mdata.finished = true; // Mark the cursor as non-retry by default mdata.retryNext = false; if (!state->cursor->initLazyFinish(mdata.retryNext)) { if (!mdata.retryNext) { uassert(15988, "error querying server", false); } else { retry = true; continue; } } mdata.completed = false; } if (!mdata.completed) { mdata.completed = true; // Make sure we didn't get an error we should rethrow // TODO : Refactor this to something better throwCursorStale(state->cursor.get()); throwCursorError(state->cursor.get()); // Finalize state state->cursor->attach(state->conn.get()); // Closes connection for us LOG(2) << "pcursor: finished on shard " << shardId << ", current connection state is " << mdata.toBSON(); } } catch (StaleConfigException& e) { retry = true; std::string staleNS = e->getNss().ns(); // Will retry all at once staleNSExceptions.emplace(staleNS, e); // Fully clear this cursor, as it needs to be re-established mdata.cleanup(true); continue; } catch (NetworkException& e) { warning() << "socket exception when finishing on " << shardId << ", current connection state is " << mdata.toBSON() << causedBy(redact(e)); mdata.errored = true; if (returnPartial) { mdata.cleanup(true); continue; } throw; } catch (DBException& e) { // NOTE: RECV() WILL NOT THROW A SOCKET EXCEPTION - WE 
GET THIS AS ERROR 15988 FROM // ABOVE if (e.code() == 15988) { warning() << "exception when receiving data from " << shardId << ", current connection state is " << mdata.toBSON() << causedBy(redact(e)); mdata.errored = true; if (returnPartial) { mdata.cleanup(true); continue; } throw; } else { // the InvalidBSON exception indicates that the BSON is malformed -> // don't print/call "mdata.toBSON()" to avoid unexpected errors e.g. a segfault if (e.code() == ErrorCodes::InvalidBSON) warning() << "bson is malformed :: db exception when finishing on " << shardId << causedBy(redact(e)); else warning() << "db exception when finishing on " << shardId << ", current connection state is " << mdata.toBSON() << causedBy(redact(e)); mdata.errored = true; throw; } } catch (std::exception& e) { warning() << "exception when finishing on " << shardId << ", current connection state is " << mdata.toBSON() << causedBy(e); mdata.errored = true; throw; } catch (...) { warning() << "unknown exception when finishing on " << shardId << ", current connection state is " << mdata.toBSON(); mdata.errored = true; throw; } } // Retry logic for single refresh of namespaces / retry init'ing connections if (retry) { // Refresh stale namespaces if (staleNSExceptions.size()) { for (const auto& exEntry : staleNSExceptions) { const NamespaceString staleNS(exEntry.first); const StaleConfigException& ex = exEntry.second; _markStaleNS(staleNS, ex); Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNS); LOG(1) << "stale config of ns " << staleNS << " on finishing query, will retry" << causedBy(redact(ex)); // This is somewhat strange if (staleNS.ns() != ns) { warning() << "versioned ns " << ns << " doesn't match stale config namespace " << staleNS; } } } // Re-establish connections we need to startInit(opCtx); finishInit(opCtx); return; } // Sanity check and clean final connections for (auto i = _cursorMap.begin(); i != _cursorMap.end();) { auto& mdata = i->second; // Erase empty stuff if 
(!mdata.pcState) { log() << "PCursor erasing empty state " << mdata.toBSON(); _cursorMap.erase(i++); continue; } else { ++i; } // Make sure all state is in shards verify(mdata.initialized == true); verify(mdata.finished == true); verify(mdata.completed == true); verify(!mdata.pcState->conn->ok()); verify(mdata.pcState->cursor); verify(mdata.pcState->primary || mdata.pcState->manager); } // TODO : More cleanup of metadata? // LEGACY STUFF NOW _cursors = new DBClientCursorHolder[_cursorMap.size()]; // Put the cursors in the legacy format int index = 0; for (auto& cmEntry : _cursorMap) { const auto& shardId = cmEntry.first; auto& mdata = cmEntry.second; _cursors[index].reset(mdata.pcState->cursor.get(), &mdata); const auto shard = uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId)); _servers.insert(shard->getConnString().toString()); index++; } _numServers = _cursorMap.size(); } void ParallelSortClusteredCursor::getQueryShardIds(set& shardIds) const { std::transform(_cursorMap.begin(), _cursorMap.end(), std::inserter(shardIds, shardIds.end()), [](const ShardCursorsMap::value_type& pair) { return pair.first; }); } std::shared_ptr ParallelSortClusteredCursor::getShardCursor( const ShardId& shardId) const { auto it = _cursorMap.find(shardId); if (it == _cursorMap.end()) { return nullptr; } return it->second.pcState->cursor; } // DEPRECATED (but still used by map/reduce) void ParallelSortClusteredCursor::_oldInit(OperationContext* opCtx) { // make sure we're not already initialized verify(!_cursors); _cursors = new DBClientCursorHolder[_numServers]; bool returnPartial = (_options & QueryOption_PartialResults); vector serverHosts(_servers.begin(), _servers.end()); set retryQueries; int finishedQueries = 0; vector> conns; vector servers; // Since we may get all sorts of errors, record them all as they come and throw them later if // necessary vector staleConfigExs; vector socketExs; vector otherExs; bool allConfigStale = false; int retries = -1; // 
Loop through all the queries until we've finished or gotten a socket exception on all of them // We break early for non-socket exceptions, and socket exceptions if we aren't returning // partial results do { retries++; bool firstPass = retryQueries.size() == 0; if (!firstPass) { log() << "retrying " << (returnPartial ? "(partial) " : "") << "parallel connection to "; for (set::const_iterator it = retryQueries.begin(); it != retryQueries.end(); ++it) { log() << serverHosts[*it] << ", "; } log() << finishedQueries << " finished queries."; } size_t num = 0; for (vector::const_iterator it = serverHosts.begin(); it != serverHosts.end(); ++it) { size_t i = num++; const string& serverHost = *it; // If we're not retrying this cursor on later passes, continue if (!firstPass && retryQueries.find(i) == retryQueries.end()) continue; const string errLoc = " @ " + serverHost; if (firstPass) { // This may be the first time connecting to this shard, if so we can get an error // here try { conns.push_back(shared_ptr(new ShardConnection( opCtx, uassertStatusOK(ConnectionString::parse(serverHost)), _ns))); } catch (std::exception& e) { socketExs.push_back(e.what() + errLoc); if (!returnPartial) { num--; break; } conns.push_back(shared_ptr()); continue; } servers.push_back(serverHost); } if (conns[i]->setVersion()) { conns[i]->done(); // Version is zero b/c this is deprecated codepath staleConfigExs.push_back(str::stream() << "stale config detected for " << _ns << " in ParallelCursor::_init " << errLoc); break; } LOG(5) << "ParallelSortClusteredCursor::init server:" << serverHost << " ns:" << _ns << " query:" << redact(_query) << " fields:" << redact(_fields) << " options: " << _options; if (!_cursors[i].get()) _cursors[i].reset( new DBClientCursor(conns[i]->get(), NamespaceString(_ns), _query, 0, // nToReturn 0, // nToSkip _fields.isEmpty() ? 0 : &_fields, // fieldsToReturn _options, _batchSize == 0 ? 
0 : _batchSize + _needToSkip // batchSize ), NULL); try { _cursors[i].get()->initLazy(!firstPass); } catch (NetworkException& e) { socketExs.push_back(e.what() + errLoc); _cursors[i].reset(NULL, NULL); conns[i]->done(); if (!returnPartial) break; } catch (std::exception& e) { otherExs.push_back(e.what() + errLoc); _cursors[i].reset(NULL, NULL); conns[i]->done(); break; } } // Go through all the potentially started cursors and finish initializing them or log any // errors and potentially retry // TODO: Better error classification would make this easier, errors are indicated in all // sorts of ways here that we need to trap. for (size_t i = 0; i < num; i++) { const string errLoc = " @ " + serverHosts[i]; if (!_cursors[i].get() || (!firstPass && retryQueries.find(i) == retryQueries.end())) { if (conns[i]) conns[i].get()->done(); continue; } verify(conns[i]); retryQueries.erase(i); bool retry = false; try { if (!_cursors[i].get()->initLazyFinish(retry)) { warning() << "invalid result from " << conns[i]->getHost() << (retry ? 
", retrying" : ""); _cursors[i].reset(NULL, NULL); if (!retry) { socketExs.push_back(str::stream() << "error querying server: " << servers[i]); conns[i]->done(); } else { retryQueries.insert(i); } continue; } } catch (StaleConfigException& e) { // Our stored configuration data is actually stale, we need to reload it // when we throw our exception allConfigStale = true; staleConfigExs.push_back( (string) "stale config detected when receiving response for " + e.toString() + errLoc); _cursors[i].reset(NULL, NULL); conns[i]->done(); continue; } catch (NetworkException& e) { socketExs.push_back(e.what() + errLoc); _cursors[i].reset(NULL, NULL); conns[i]->done(); continue; } catch (std::exception& e) { otherExs.push_back(e.what() + errLoc); _cursors[i].reset(NULL, NULL); conns[i]->done(); continue; } try { _cursors[i].get()->attach(conns[i].get()); // this calls done on conn // Rethrow stale config or other errors throwCursorStale(_cursors[i].get()); throwCursorError(_cursors[i].get()); finishedQueries++; } catch (StaleConfigException& e) { // Our stored configuration data is actually stale, we need to reload it // when we throw our exception allConfigStale = true; staleConfigExs.push_back((string) "stale config detected for " + e.toString() + errLoc); _cursors[i].reset(NULL, NULL); conns[i]->done(); continue; } catch (std::exception& e) { otherExs.push_back(e.what() + errLoc); _cursors[i].reset(NULL, NULL); conns[i]->done(); continue; } } // Don't exceed our max retries, should not happen verify(retries < 5); } while (retryQueries.size() > 0 /* something to retry */ && (socketExs.size() == 0 || returnPartial) /* no conn issues */ && staleConfigExs.size() == 0 /* no config issues */ && otherExs.size() == 0 /* no other issues */); // Assert that our conns are all closed! for (vector>::iterator i = conns.begin(); i < conns.end(); ++i) { verify(!(*i) || !(*i)->ok()); } // Handle errors we got during initialization. 
// If we're returning partial results, we can ignore socketExs, but nothing else // Log a warning in any case, so we don't lose these messages bool throwException = (socketExs.size() > 0 && !returnPartial) || staleConfigExs.size() > 0 || otherExs.size() > 0; if (socketExs.size() > 0 || staleConfigExs.size() > 0 || otherExs.size() > 0) { vector errMsgs; errMsgs.insert(errMsgs.end(), staleConfigExs.begin(), staleConfigExs.end()); errMsgs.insert(errMsgs.end(), otherExs.begin(), otherExs.end()); errMsgs.insert(errMsgs.end(), socketExs.begin(), socketExs.end()); std::stringstream errMsg; errMsg << "could not initialize cursor across all shards because : "; for (vector::iterator i = errMsgs.begin(); i != errMsgs.end(); i++) { if (i != errMsgs.begin()) errMsg << " :: and :: "; errMsg << *i; } if (throwException && staleConfigExs.size() > 0) { // Version is zero b/c this is deprecated codepath uasserted(StaleConfigInfo(NamespaceString(_ns), ChunkVersion(0, 0, OID()), ChunkVersion(0, 0, OID())), errMsg.str()); } else if (throwException) { uasserted(14827, errMsg.str()); } else { warning() << redact(errMsg.str()); } } if (retries > 0) log() << "successfully finished parallel query after " << retries << " retries"; } bool ParallelSortClusteredCursor::more() { if (_needToSkip > 0) { int n = _needToSkip; _needToSkip = 0; while (n > 0 && more()) { next(); n--; } _needToSkip = n; } for (int i = 0; i < _numServers; i++) { if (_cursors[i].get() && _cursors[i].get()->more()) return true; } return false; } BSONObj ParallelSortClusteredCursor::next() { BSONObj best = BSONObj(); int bestFrom = -1; for (int j = 0; j < _numServers; j++) { // Iterate _numServers times, starting one past the last server we used. 
// This means we actually start at server #1, not #0, but shouldn't matter int i = (j + _lastFrom + 1) % _numServers; // Check to see if the cursor is finished if (!_cursors[i].get() || !_cursors[i].get()->more()) { if (_cursors[i].getMData()) _cursors[i].getMData()->pcState->done = true; continue; } // We know we have at least one result in this cursor BSONObj me = _cursors[i].get()->peekFirst(); // If this is the first non-empty cursor, save the result as best if (bestFrom < 0) { best = me; bestFrom = i; if (_sortKey.isEmpty()) break; continue; } // Otherwise compare the result to the current best result int comp = dps::compareObjectsAccordingToSort(best, me, _sortKey, true); if (comp < 0) continue; best = me; bestFrom = i; } _lastFrom = bestFrom; uassert(10019, "no more elements", bestFrom >= 0); _cursors[bestFrom].get()->next(); // Make sure the result data won't go away after the next call to more() if (!_cursors[bestFrom].get()->moreInCurrentBatch()) { best = best.getOwned(); } if (_cursors[bestFrom].getMData()) _cursors[bestFrom].getMData()->pcState->count++; return best; } void ParallelConnectionMetadata::cleanup(bool full) { if (full || errored) retryNext = false; if (!retryNext && pcState) { if (initialized && !errored) { verify(pcState->cursor); verify(pcState->conn); if (!finished && pcState->conn->ok()) { try { // Complete the call if only halfway done bool retry = false; pcState->cursor->initLazyFinish(retry); } catch (std::exception&) { warning() << "exception closing cursor"; } catch (...) { warning() << "unknown exception closing cursor"; } } } // Double-check conn is closed if (pcState->conn) { pcState->conn->done(); } pcState.reset(); } else verify(finished || !initialized); initialized = false; finished = false; completed = false; errored = false; } BSONObj ParallelConnectionMetadata::toBSON() const { return BSON("state" << (pcState ? 
pcState->toBSON() : BSONObj()) << "retryNext" << retryNext << "init" << initialized << "finish" << finished << "errored" << errored); } std::string ParallelConnectionState::toString() const { return str::stream() << "PCState : " << toBSON(); } BSONObj ParallelConnectionState::toBSON() const { BSONObj cursorPeek = BSON("no cursor" << ""); if (cursor) { vector v; cursor->peek(v, 1); if (v.size() == 0) cursorPeek = BSON("no data" << ""); else cursorPeek = BSON("" << v[0]); } BSONObj stateObj = BSON("conn" << (conn ? (conn->ok() ? conn->conn().toString() : "(done)") : "") << "vinfo" << (manager ? (str::stream() << manager->getns().ns() << " @ " << manager->getVersion().toString()) : primary->toString())); // Append cursor data if exists BSONObjBuilder stateB; stateB.appendElements(stateObj); if (!cursor) stateB.append("cursor", "(none)"); else { vector v; cursor->peek(v, 1); if (v.size() == 0) stateB.append("cursor", "(empty)"); else stateB.append("cursor", v[0]); } stateB.append("count", count); stateB.append("done", done); return stateB.obj().getOwned(); } void throwCursorStale(DBClientCursor* cursor) { invariant(cursor); if (cursor->hasResultFlag(ResultFlag_ShardConfigStale)) { BSONObj error; cursor->peekError(&error); uasserted(StaleConfigInfo::parseFromCommandError(error), "query returned a stale config error"); } if (NamespaceString(cursor->getns()).isCommand()) { // Commands that care about versioning (like the count or geoNear command) sometimes return // with the stale config error code, but don't set the ShardConfigStale result flag on the // cursor. // // TODO: Standardize stale config reporting. BSONObj res = cursor->peekFirst(); auto status = getStatusFromCommandResult(res); if (status == ErrorCodes::StaleConfig) { uassertStatusOK(status.withContext("command returned a stale config error")); } } } } // namespace mongo