summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorBlake Oler <blake.oler@mongodb.com>2020-04-08 19:38:49 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-04-13 15:45:54 +0000
commit8cf0fbc5815cb1c75a698f8f1e1ac38cca1bfa65 (patch)
tree50b5fd30b7747904fdebf5ad369969f264d6d540 /src/mongo
parent17ada65dd6eef3867c2a2890780002729c0abe41 (diff)
downloadmongo-8cf0fbc5815cb1c75a698f8f1e1ac38cca1bfa65.tar.gz
SERVER-47426 Remove ParallelSortClusteredCursor
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/client/parallel.cpp1503
-rw-r--r--src/mongo/s/client/parallel.h176
-rw-r--r--src/mongo/s/commands/strategy.cpp30
4 files changed, 0 insertions, 1710 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index a16fb3b1ee8..709e0710c97 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -37,7 +37,6 @@ env.Library(
env.Library(
target='sharding_legacy_api',
source=[
- 'client/parallel.cpp',
'client/shard_connection.cpp',
env.Idlc('client/shard_connection.idl')[0],
'client/version_manager.cpp',
diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp
deleted file mode 100644
index 5161808c140..00000000000
--- a/src/mongo/s/client/parallel.cpp
+++ /dev/null
@@ -1,1503 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/s/client/parallel.h"
-
-#include "mongo/client/constants.h"
-#include "mongo/client/dbclient_cursor.h"
-#include "mongo/client/dbclient_rs.h"
-#include "mongo/client/replica_set_monitor.h"
-#include "mongo/db/bson/dotted_path_support.h"
-#include "mongo/db/query/query_request.h"
-#include "mongo/logv2/log.h"
-#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/client/shard_connection.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/grid.h"
-#include "mongo/util/net/socket_exception.h"
-
-namespace mongo {
-
-using std::map;
-using std::set;
-using std::shared_ptr;
-using std::string;
-using std::vector;
-
-namespace dps = ::mongo::dotted_path_support;
-
-namespace {
-
-/**
- * Throws an exception wrapping the error document in this cursor when the error flag is set.
- */
-void throwCursorError(DBClientCursor* cursor) {
- verify(cursor);
-
- if (cursor->hasResultFlag(ResultFlag_ErrSet)) {
- uassertStatusOK(getStatusFromCommandResult(cursor->next()));
- }
-}
-
-} // namespace
-
-struct ParallelConnectionState {
- ParallelConnectionState() : count(0), done(false) {}
-
- std::string toString() const;
-
- BSONObj toBSON() const;
-
- // Please do not reorder. cursor destructor can use conn.
- // On a related note, never attempt to cleanup these pointers manually.
- std::shared_ptr<ShardConnection> conn;
- std::shared_ptr<DBClientCursor> cursor;
-
- // Version information
- std::shared_ptr<ChunkManager> manager;
- std::shared_ptr<Shard> primary;
-
- // Cursor status information
- long long count;
- bool done;
-};
-
-struct ParallelConnectionMetadata {
- ParallelConnectionMetadata()
- : retryNext(false), initialized(false), finished(false), completed(false), errored(false) {}
-
- ~ParallelConnectionMetadata() {
- cleanup(true);
- }
-
- void cleanup(bool full = true);
-
- std::shared_ptr<ParallelConnectionState> pcState;
-
- bool retryNext;
-
- bool initialized;
- bool finished;
- bool completed;
-
- bool errored;
-
- BSONObj toBSON() const;
-
- std::string toString() const {
- return str::stream() << "PCMData : " << toBSON();
- }
-};
-
-/**
- * Helper class to manage ownership of opened cursors while merging results.
- *
- * TODO: Choose one set of ownership semantics so that this isn't needed - merge sort via mapreduce
- * is the main issue since it has no metadata and this holder owns the cursors.
- */
-class DBClientCursorHolder {
-public:
- DBClientCursorHolder() = default;
-
- void reset(DBClientCursor* cursor, ParallelConnectionMetadata* pcmData) {
- _cursor.reset(cursor);
- _pcmData.reset(pcmData);
- }
-
- DBClientCursor* get() {
- return _cursor.get();
- }
-
- ParallelConnectionMetadata* getMData() {
- return _pcmData.get();
- }
-
- void release() {
- _cursor.release();
- _pcmData.release();
- }
-
-private:
- std::unique_ptr<DBClientCursor> _cursor;
- std::unique_ptr<ParallelConnectionMetadata> _pcmData;
-};
-
-// -------- ParallelSortClusteredCursor -----------
-
-ParallelSortClusteredCursor::ParallelSortClusteredCursor(const QuerySpec& qSpec,
- const CommandInfo& cInfo)
- : _qSpec(qSpec), _cInfo(cInfo), _totalTries(0) {
- _done = false;
- _didInit = false;
-
- _finishCons();
-}
-
-ParallelSortClusteredCursor::ParallelSortClusteredCursor(const set<string>& servers,
- const string& ns,
- const Query& q,
- int options,
- const BSONObj& fields)
- : _servers(servers) {
- _sortKey = q.getSort().copy();
- _needToSkip = 0;
-
- _done = false;
- _didInit = false;
-
- // Populate legacy fields
- _ns = ns;
- _query = q.obj.getOwned();
- _options = options;
- _fields = fields.getOwned();
- _batchSize = 0;
-
- _finishCons();
-}
-
-ParallelSortClusteredCursor::~ParallelSortClusteredCursor() {
- // WARNING: Commands (in particular M/R) connect via _oldInit() directly to shards
- bool isDirectShardCursor = _cursorMap.empty();
-
- // Non-direct shard cursors are owned by the _cursorMap, so we release
- // them in the array here. Direct shard cursors clean themselves.
- if (!isDirectShardCursor) {
- for (int i = 0; i < _numServers; i++)
- _cursors[i].release();
- }
-
- delete[] _cursors;
- _cursors = nullptr;
-
- // Clear out our metadata after removing legacy cursor data
- _cursorMap.clear();
-
- // Just to be sure
- _done = true;
-}
-
-void ParallelSortClusteredCursor::init(OperationContext* opCtx) {
- if (_didInit)
- return;
- _didInit = true;
-
- if (!_qSpec.isEmpty()) {
- fullInit(opCtx);
- } else {
- // You can only get here by using the legacy constructor
- // TODO: Eliminate this
- _oldInit(opCtx);
- }
-}
-
-void ParallelSortClusteredCursor::_finishCons() {
- _numServers = _servers.size();
- _lastFrom = 0;
- _cursors = nullptr;
-
- if (!_qSpec.isEmpty()) {
- _needToSkip = _qSpec.ntoskip();
- _cursors = nullptr;
- _sortKey = _qSpec.sort();
- _fields = _qSpec.fields();
- }
-
- // Partition sort key fields into (a) text meta fields and (b) all other fields.
- set<string> textMetaSortKeyFields;
- set<string> normalSortKeyFields;
-
- // Transform _sortKey fields {a:{$meta:"textScore"}} into {a:-1}, in order to apply the
- // merge sort for text metadata in the correct direction.
- BSONObjBuilder transformedSortKeyBuilder;
-
- BSONObjIterator sortKeyIt(_sortKey);
- while (sortKeyIt.more()) {
- BSONElement e = sortKeyIt.next();
- if (QueryRequest::isTextScoreMeta(e)) {
- textMetaSortKeyFields.insert(e.fieldName());
- transformedSortKeyBuilder.append(e.fieldName(), -1);
- } else {
- normalSortKeyFields.insert(e.fieldName());
- transformedSortKeyBuilder.append(e);
- }
- }
- _sortKey = transformedSortKeyBuilder.obj();
-
- // Verify that that all text metadata sort fields are in the projection. For all other sort
- // fields, copy them into the projection if they are missing (and if projection is
- // negative).
- if (!_sortKey.isEmpty() && !_fields.isEmpty()) {
- BSONObjBuilder b;
- bool isNegative = false;
- {
- BSONObjIterator i(_fields);
- while (i.more()) {
- BSONElement e = i.next();
- b.append(e);
-
- string fieldName = e.fieldName();
-
- if (QueryRequest::isTextScoreMeta(e)) {
- textMetaSortKeyFields.erase(fieldName);
- } else {
- // exact field
- bool found = normalSortKeyFields.erase(fieldName);
-
- // subfields
- set<string>::const_iterator begin =
- normalSortKeyFields.lower_bound(fieldName + ".\x00");
- set<string>::const_iterator end =
- normalSortKeyFields.lower_bound(fieldName + ".\xFF");
- normalSortKeyFields.erase(begin, end);
-
- if (!e.trueValue()) {
- uassert(13431,
- "have to have sort key in projection and removing it",
- !found && begin == end);
- } else if (!e.isABSONObj()) {
- isNegative = true;
- }
- }
- }
- }
-
- if (isNegative) {
- for (set<string>::const_iterator it(normalSortKeyFields.begin()),
- end(normalSortKeyFields.end());
- it != end;
- ++it) {
- b.append(*it, 1);
- }
- }
-
- _fields = b.obj();
- }
-
- if (!_qSpec.isEmpty()) {
- _qSpec.setFields(_fields);
- }
-
- uassert(
- 17306, "have to have all text meta sort keys in projection", textMetaSortKeyFields.empty());
-}
-
-void ParallelSortClusteredCursor::fullInit(OperationContext* opCtx) {
- startInit(opCtx);
- finishInit(opCtx);
-}
-
-void ParallelSortClusteredCursor::_markStaleNS(const NamespaceString& staleNS,
- const StaleConfigException& e) {
- if (_staleNSMap.find(staleNS.ns()) == _staleNSMap.end()) {
- _staleNSMap[staleNS.ns()] = 1;
- }
-
- const int tries = ++_staleNSMap[staleNS.ns()];
-
- if (tries >= 5) {
- uassertStatusOK(e.toStatus("too many retries of stale version info"));
- }
-}
-
-void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(
- OperationContext* opCtx,
- std::shared_ptr<ParallelConnectionState> state,
- const ShardId& shardId,
- std::shared_ptr<Shard> primary,
- const NamespaceString& ns,
- const string& vinfo,
- std::shared_ptr<ChunkManager> manager) {
- if (manager) {
- state->manager = manager;
- } else if (primary) {
- state->primary = primary;
- }
-
- verify(!primary || shardId == primary->getId());
-
- // Setup conn
- if (!state->conn) {
- const auto shard =
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
- state->conn.reset(new ShardConnection(opCtx, shard->getConnString(), ns.ns(), manager));
- }
-
- const DBClientBase* rawConn = state->conn->getRawConn();
- bool allowShardVersionFailure = rawConn->type() == ConnectionString::SET &&
- DBClientReplicaSet::isSecondaryQuery(_qSpec.ns(), _qSpec.query(), _qSpec.options());
-
- // Skip shard version checking if primary is known to be down.
- if (allowShardVersionFailure) {
- const DBClientReplicaSet* replConn = dynamic_cast<const DBClientReplicaSet*>(rawConn);
- invariant(replConn);
- auto rsMonitor = ReplicaSetMonitor::get(replConn->getSetName());
- uassert(16388,
- str::stream() << "cannot access unknown replica set: " << replConn->getSetName(),
- rsMonitor != nullptr);
- if (!rsMonitor->isKnownToHaveGoodPrimary()) {
- state->conn->donotCheckVersion();
-
- // A side effect of this short circuiting is the mongos will not be able figure out
- // that the primary is now up on it's own and has to rely on other threads to refresh
- // node states.
-
- static Occasionally sampler;
- if (sampler.tick()) {
- const DBClientReplicaSet* repl = dynamic_cast<const DBClientReplicaSet*>(rawConn);
- dassert(repl);
- LOGV2_WARNING(
- 22695,
- "Primary for {replicaSetAddress} was down before, bypassing "
- "setShardVersion. The local replica set view and targeting may be stale.",
- "Bypassing setShardVersion because replica set view and targeting may be stale "
- "from primary having been down",
- "replicaSetAddress"_attr = repl->getServerAddress());
- }
-
- return;
- }
- }
-
- try {
- if (state->conn->setVersion()) {
- LOGV2_DEBUG(22678,
- 2,
- "pcursor: needed to set remote version on connection to value compatible "
- "with {shardVersion}",
- "pcursor: needed to set remote version on connection",
- "shardVersion"_attr = vinfo);
- }
- } catch (const DBException& dbExcep) {
- auto errCode = dbExcep.code();
- if (allowShardVersionFailure &&
- (ErrorCodes::isNotMasterError(errCode) || ErrorCodes::isNetworkError(errCode) ||
- errCode == ErrorCodes::FailedToSatisfyReadPreference)) {
- // It's okay if we don't set the version when talking to a secondary, we can
- // be stale in any case.
-
- static Occasionally sampler;
- if (sampler.tick()) {
- const DBClientReplicaSet* repl =
- dynamic_cast<const DBClientReplicaSet*>(state->conn->getRawConn());
- dassert(repl);
- LOGV2_WARNING(
- 22696,
- "Cannot contact replica set {replicaSetAdress} to check shard version. "
- "The local replica set view and targeting may be stale",
- "Cannot contact replica set to check shard version. "
- "The local replica set view and targeting may be stale",
- "replicaSetAddress"_attr = repl->getServerAddress());
- }
- } else {
- throw;
- }
- }
-}
-
-void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
- const bool returnPartial = (_qSpec.options() & QueryOption_PartialResults);
- const NamespaceString nss(!_cInfo.isEmpty() ? _cInfo.versionedNS : _qSpec.ns());
-
- string prefix;
- if (MONGO_unlikely(shouldLog(logv2::LogSeverity::Debug(2)))) {
- if (_totalTries > 0) {
- prefix = str::stream() << "retrying (" << _totalTries << " tries)";
- } else {
- prefix = "creating";
- }
- }
- LOGV2_DEBUG(22679,
- 2,
- "pcursor: {prefix} pcursor over {query} and {command}",
- "pcursor",
- "prefix"_attr = prefix,
- "query"_attr = _qSpec,
- "command"_attr = _cInfo);
-
- shared_ptr<ChunkManager> manager;
- shared_ptr<Shard> primary;
-
- {
- auto routingInfoStatus =
- Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
- if (routingInfoStatus != ErrorCodes::NamespaceNotFound) {
- auto routingInfo = uassertStatusOK(std::move(routingInfoStatus));
- manager = routingInfo.cm();
- // ParallelSortClusteredCursor has two states - either !cm && primary, which means
- // unsharded collection, or cm && !primary, which means sharded collection.
- if (!manager) {
- primary = routingInfo.db().primary();
- }
- }
- }
-
- set<ShardId> shardIds;
- string vinfo;
-
- if (manager) {
- if (MONGO_unlikely(shouldLog(logv2::LogSeverity::Debug(2)))) {
- vinfo = str::stream() << "[" << manager->getns().ns() << " @ "
- << manager->getVersion().toString() << "]";
- }
-
- manager->getShardIdsForQuery(opCtx,
- !_cInfo.isEmpty() ? _cInfo.cmdFilter : _qSpec.filter(),
- !_cInfo.isEmpty() ? _cInfo.cmdCollation : BSONObj(),
- &shardIds);
- } else if (primary) {
- if (MONGO_unlikely(shouldLog(logv2::LogSeverity::Debug(2)))) {
- vinfo = str::stream() << "[unsharded @ " << primary->toString() << "]";
- }
-
- shardIds.insert(primary->getId());
- }
-
- // Close all cursors on extra shards first, as these will be invalid
- for (auto& cmEntry : _cursorMap) {
- const auto& shardId = cmEntry.first;
-
- if (shardIds.find(shardId) == shardIds.end()) {
- LOGV2_DEBUG(22680,
- 2,
- "pcursor: closing cursor on shard {shardId} as the connection is no longer "
- "required by {shardVersion}",
- "pcursor: closing cursor as the connection is no longer required",
- "shardId"_attr = shardId,
- "shardVersion"_attr = vinfo);
-
- cmEntry.second.cleanup(true);
- }
- }
-
- LOGV2_DEBUG(22681,
- 2,
- "pcursor: initializing over {shardsNumber} shards required by {shardVersion}",
- "pcursor: initializing for all shards",
- "shardsNumber"_attr = shardIds.size(),
- "shardVersion"_attr = vinfo);
-
- // Don't retry indefinitely for whatever reason
- _totalTries++;
- uassert(15986, "too many retries in total", _totalTries < 10);
-
- for (const ShardId& shardId : shardIds) {
- auto& mdata = _cursorMap[shardId];
-
- LOGV2_DEBUG(22682,
- 2,
- "pcursor: initializing on shard {shardId}, current connection state is "
- "{connectionState}",
- "pcursor: initializing on shard",
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON());
-
- // This may be the first time connecting to this shard, if so we can get an error here
- try {
- if (mdata.initialized) {
- invariant(mdata.pcState);
-
- auto state = mdata.pcState;
-
- bool compatiblePrimary = true;
- bool compatibleManager = true;
-
- if (primary && !state->primary)
- LOGV2_WARNING(22697, "Collection becoming unsharded detected");
- if (manager && !state->manager)
- LOGV2_WARNING(22698, "Collection becoming sharded detected");
- if (primary && state->primary && primary != state->primary)
- LOGV2_WARNING(22699, "Weird shift of primary detected");
-
- compatiblePrimary = primary && state->primary && primary == state->primary;
- compatibleManager =
- manager && state->manager && manager->compatibleWith(*state->manager, shardId);
-
- if (compatiblePrimary || compatibleManager) {
- // If we're compatible, don't need to retry unless forced
- if (!mdata.retryNext)
- continue;
- // Do partial cleanup
- mdata.cleanup(false);
- } else {
- // Force total cleanup of connection if no longer compatible
- mdata.cleanup(true);
- }
- } else {
- // Cleanup connection if we're not yet initialized
- mdata.cleanup(false);
- }
-
- mdata.pcState = std::make_shared<ParallelConnectionState>();
- auto state = mdata.pcState;
-
- setupVersionAndHandleSlaveOk(opCtx, state, shardId, primary, nss, vinfo, manager);
-
- const string& ns = _qSpec.ns();
-
- // Setup cursor
- if (!state->cursor) {
- //
- // Here we decide whether to split the query limits up for multiple shards.
- // NOTE: There's a subtle issue here, in that it's possible we target a single
- // shard first, but are stale, and then target multiple shards, or vice-versa.
- // In both these cases, we won't re-use the old cursor created here, since the
- // shard version must have changed on the single shard between queries.
- //
-
- if (shardIds.size() > 1) {
- // Query limits split for multiple shards
-
- state->cursor.reset(new DBClientCursor(
- state->conn->get(),
- NamespaceString(ns),
- _qSpec.query(),
- isCommand() ? 1 : 0, // nToReturn (0 if query indicates multi)
- 0, // nToSkip
- // Does this need to be a ptr?
- _qSpec.fields().isEmpty() ? nullptr
- : _qSpec.fieldsData(), // fieldsToReturn
- _qSpec.options(), // options
- // NtoReturn is weird.
- // If zero, it means use default size, so we do that for all cursors
- // If positive, it's the batch size (we don't want this cursor limiting
- // results), that's done at a higher level
- // If negative, it's the batch size, but we don't create a cursor - so we
- // don't want to create a child cursor either.
- // Either way, if non-zero, we want to pull back the batch size + the skip
- // amount as quickly as possible. Potentially, for a cursor on a single
- // shard or if we keep better track of chunks, we can actually add the skip
- // value into the cursor and/or make some assumptions about the return value
- // size ( (batch size + skip amount) / num_servers ).
- _qSpec.ntoreturn() == 0
- ? 0
- : (_qSpec.ntoreturn() > 0
- ? _qSpec.ntoreturn() + _qSpec.ntoskip()
- : _qSpec.ntoreturn() - _qSpec.ntoskip()))); // batchSize
- } else {
- // Single shard query
-
- state->cursor.reset(new DBClientCursor(
- state->conn->get(),
- NamespaceString(ns),
- _qSpec.query(),
- _qSpec.ntoreturn(), // nToReturn
- _qSpec.ntoskip(), // nToSkip
- // Does this need to be a ptr?
- _qSpec.fields().isEmpty() ? nullptr
- : _qSpec.fieldsData(), // fieldsToReturn
- _qSpec.options(), // options
- 0)); // batchSize
- }
- }
-
- bool lazyInit = state->conn->get()->lazySupported();
- if (lazyInit) {
- // Need to keep track if this is a second or third try for replica sets
- state->cursor->initLazy(mdata.retryNext);
- mdata.retryNext = false;
- mdata.initialized = true;
- } else {
- bool success = state->cursor->init();
-
- // Without full initialization, throw an exception
- uassert(15987,
- str::stream()
- << "could not fully initialize cursor on shard " << shardId
- << ", current connection state is " << mdata.toBSON().toString(),
- success);
-
- mdata.retryNext = false;
- mdata.initialized = true;
- mdata.finished = true;
- }
-
- LOGV2_DEBUG(22683,
- 2,
- "pcursor: initialized {commandType} ({lazyInit}) on shard {shardId}, "
- "current connection state is {connectionState}",
- "pcursor: initialized command",
- "commandType"_attr = (isCommand() ? "command" : "query"),
- "lazyInit"_attr = (lazyInit ? "lazily" : "full"),
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON());
- } catch (StaleConfigException& e) {
- // Our version isn't compatible with the current version anymore on at least one shard,
- // need to retry immediately
- NamespaceString staleNS(e->getNss());
- _markStaleNS(staleNS, e);
-
- Grid::get(opCtx)
- ->catalogCache()
- ->invalidateShardOrEntireCollectionEntryForShardedCollection(
- opCtx, nss, e->getVersionWanted(), e->getVersionReceived(), e->getShardId());
-
- LOGV2_DEBUG(22684,
- 1,
- "Error initializing pcursor, stale config for namespace {namespace}, "
- "caused by {error}, will retry",
- "Error initializing pcursor, will retry",
- "namespace"_attr = staleNS,
- "error"_attr = redact(e));
-
- // This is somewhat strange
- if (staleNS != nss) {
- LOGV2_WARNING(22700,
- "Versioned namespace {namespace} doesn't match stale config "
- "namespace {staleNamespace}",
- "Versioned namespace doesn't match stale config namespace",
- "namespace"_attr = nss.ns(),
- "staleNamespace"_attr = staleNS);
- }
-
- // Restart with new chunk manager
- startInit(opCtx);
- return;
- } catch (NetworkException& e) {
- LOGV2_WARNING(22701,
- "Error initializing pcursor on {shardId} with current connection state "
- "{connectionState} caused by {error}",
- "Error initializing pcursor",
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON(),
- "error"_attr = redact(e));
- mdata.errored = true;
- if (returnPartial) {
- mdata.cleanup(true);
- continue;
- }
- throw;
- } catch (DBException& e) {
- LOGV2_WARNING(22702,
- "Error initializing pcursor on {shardId} with current connection state "
- "{connectionState} caused by {error}",
- "Error initializing pcursor",
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON(),
- "error"_attr = redact(e));
- mdata.errored = true;
- if (returnPartial && e.code() == 15925 /* From above! */) {
- mdata.cleanup(true);
- continue;
- }
- throw;
- } catch (std::exception& e) {
- LOGV2_WARNING(22703,
- "Error initializing pcursor on {shardId} with current connection state "
- "{connectionState} caused by {error}",
- "Error initializing pcursor",
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON(),
- "error"_attr = redact(e.what()));
- mdata.errored = true;
- throw;
- } catch (...) {
- LOGV2_WARNING(22704,
- "Error initializing pcursor on {shardId} with current connection state "
- "{connectionState} caused by {error}",
- "Error initializing pcursor",
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON(),
- "error"_attr = "uknownError");
- mdata.errored = true;
- throw;
- }
- }
-
- // Sanity check final init'ed connections
- for (const auto& cmEntry : _cursorMap) {
- const auto& shardId = cmEntry.first;
- const auto& mdata = cmEntry.second;
-
- if (!mdata.pcState) {
- continue;
- }
-
- // Make sure all state is in shards
- invariant(shardIds.find(shardId) != shardIds.end());
- invariant(mdata.initialized == true);
-
- if (!mdata.completed) {
- invariant(mdata.pcState->conn->ok());
- }
-
- invariant(mdata.pcState->cursor);
- invariant(mdata.pcState->primary || mdata.pcState->manager);
- invariant(!mdata.retryNext);
-
- if (mdata.completed) {
- invariant(mdata.finished);
- }
-
- if (mdata.finished) {
- invariant(mdata.initialized);
- }
-
- if (!returnPartial) {
- invariant(mdata.initialized);
- }
- }
-}
-
-void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) {
- bool returnPartial = (_qSpec.options() & QueryOption_PartialResults);
- bool specialVersion = _cInfo.versionedNS.size() > 0;
- string ns = specialVersion ? _cInfo.versionedNS : _qSpec.ns();
-
- bool retry = false;
- map<string, StaleConfigException> staleNSExceptions;
-
- LOGV2_DEBUG(22685,
- 2,
- "pcursor: finishing over {shardsCount} shards",
- "pcursor: finishing initialization",
- "shardsCount"_attr = _cursorMap.size());
-
- for (auto& cmEntry : _cursorMap) {
- const auto& shardId = cmEntry.first;
- auto& mdata = cmEntry.second;
-
- LOGV2_DEBUG(22686,
- 2,
- "pcursor: finishing initialization on shard {shardId}, current connection "
- "state is {connectionState}",
- "pcursor: finishing initialization for shard",
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON());
-
- // Ignore empty conns for now
- if (!mdata.pcState)
- continue;
-
- auto state = mdata.pcState;
-
- try {
- // Sanity checks
- if (!mdata.completed)
- verify(state->conn && state->conn->ok());
- verify(state->cursor);
- verify(state->manager || state->primary);
- verify(!state->manager || !state->primary);
-
-
- // If we weren't init'ing lazily, ignore this
- if (!mdata.finished) {
- mdata.finished = true;
-
- // Mark the cursor as non-retry by default
- mdata.retryNext = false;
-
- if (!state->cursor->initLazyFinish(mdata.retryNext)) {
- if (!mdata.retryNext) {
- uassert(15988, "error querying server", false);
- } else {
- retry = true;
- continue;
- }
- }
-
- mdata.completed = false;
- }
-
- if (!mdata.completed) {
- mdata.completed = true;
-
- // Make sure we didn't get an error we should rethrow
- // TODO : Refactor this to something better
- throwCursorStale(state->cursor.get());
- throwCursorError(state->cursor.get());
-
- // Finalize state
- state->cursor->attach(state->conn.get()); // Closes connection for us
-
- LOGV2_DEBUG(22687,
- 2,
- "pcursor: finished on shard {shardId}, current connection state is "
- "{connectionState}",
- "pcursor: finished initialization for shard",
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON());
- }
- } catch (StaleConfigException& e) {
- retry = true;
-
- std::string staleNS = e->getNss().ns();
-
- // Will retry all at once
- staleNSExceptions.emplace(staleNS, e);
-
- // Fully clear this cursor, as it needs to be re-established
- mdata.cleanup(true);
- continue;
- } catch (NetworkException& e) {
- LOGV2_WARNING(22705,
- "Error finalizing pcursor initialization for shard {shardId} with "
- "connection state {connectionState} caused by {error}",
- "Error finalizing pcursor initialization for shard",
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON(),
- "error"_attr = redact(e));
- mdata.errored = true;
- if (returnPartial) {
- mdata.cleanup(true);
- continue;
- }
- throw;
- } catch (DBException& e) {
- // NOTE: RECV() WILL NOT THROW A SOCKET EXCEPTION - WE GET THIS AS ERROR 15988 FROM
- // ABOVE
- if (e.code() == 15988) {
- LOGV2_WARNING(22706,
- "Error finalizing pcursor initialization for shard {shardId} with "
- "connection state {connectionState} caused by {error}",
- "Error finalizing pcursor initialization for shard",
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON(),
- "error"_attr = redact(e));
-
- mdata.errored = true;
- if (returnPartial) {
- mdata.cleanup(true);
- continue;
- }
- throw;
- } else {
- // the InvalidBSON exception indicates that the BSON is malformed ->
- // don't print/call "mdata.toBSON()" to avoid unexpected errors e.g. a segfault
- if (e.code() == ErrorCodes::InvalidBSON)
- LOGV2_WARNING(22707,
- "Error finalizing pcursor initialization for shard "
- "{shardId} caused by {error}",
- "Error finalizing pcursor initialization for shard",
- "shardId"_attr = shardId,
- "error"_attr = redact(e));
- else
- LOGV2_WARNING(
- 22708,
- "Error finalizing pcursor initialization for shard {shardId} with "
- "connection state {connectionState} caused by {error}",
- "Error finalizing pcursor initialization for shard",
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON(),
- "error"_attr = redact(e));
- mdata.errored = true;
- throw;
- }
- } catch (std::exception& e) {
- LOGV2_WARNING(22709,
- "Error finalizing pcursor initialization for shard {shardId} with "
- "connection state {connectionState} caused by {error}",
- "Error finalizing pcursor initialization for shard",
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON(),
- "error"_attr = redact(e.what()));
- mdata.errored = true;
- throw;
- } catch (...) {
- LOGV2_WARNING(
- 22710,
- "Unknown error finalizing pcursor initialization for shard {shardId} with "
- "connection state {connectionState}",
- "Unknown error finalizing pcursor initialization for shard",
- "shardId"_attr = shardId,
- "connectionState"_attr = mdata.toBSON());
- mdata.errored = true;
- throw;
- }
- }
-
- // Retry logic for single refresh of namespaces / retry init'ing connections
- if (retry) {
- // Refresh stale namespaces
- if (staleNSExceptions.size()) {
- for (const auto& exEntry : staleNSExceptions) {
- const NamespaceString staleNS(exEntry.first);
- const StaleConfigException& ex = exEntry.second;
-
- _markStaleNS(staleNS, ex);
- // If we don't have a shardId (a valid case being receiving an exception from a
- // v4.2 binary), we have to set the entire collection as stale.
- if (!ex->getShardId()) {
- Grid::get(opCtx)->catalogCache()->onEpochChange(staleNS);
- } else {
- Grid::get(opCtx)->catalogCache()->invalidateShardForShardedCollection(
- staleNS, *ex->getShardId());
- }
-
- LOGV2_DEBUG(22688,
- 1,
- "Found stale config for namespace {namespace} on finishing query "
- "caused by {error}",
- "Found stale config for namespace",
- "namespace"_attr = staleNS,
- "error"_attr = redact(ex));
-
- // This is somewhat strange
- if (staleNS.ns() != ns) {
- LOGV2_WARNING(22711,
- "Versioned namespace {namespace} doesn't match stale config "
- "namespace {staleNamespace}",
- "Versioned namespace doesn't match stale config namespace",
- "namespace"_attr = ns,
- "staleNamespace"_attr = staleNS);
- }
- }
- }
-
- // Re-establish connections we need to
- startInit(opCtx);
- finishInit(opCtx);
- return;
- }
-
- // Sanity check and clean final connections
- for (auto i = _cursorMap.begin(); i != _cursorMap.end();) {
- auto& mdata = i->second;
-
- // Erase empty stuff
- if (!mdata.pcState) {
- LOGV2(22689,
- "PCursor erasing empty connection state {connectionState}",
- "PCursor erasing empty connection state",
- "connectionState"_attr = mdata.toBSON());
- _cursorMap.erase(i++);
- continue;
- } else {
- ++i;
- }
-
- // Make sure all state is in shards
- verify(mdata.initialized == true);
- verify(mdata.finished == true);
- verify(mdata.completed == true);
- verify(!mdata.pcState->conn->ok());
- verify(mdata.pcState->cursor);
- verify(mdata.pcState->primary || mdata.pcState->manager);
- }
-
- // TODO : More cleanup of metadata?
-
- // LEGACY STUFF NOW
-
- _cursors = new DBClientCursorHolder[_cursorMap.size()];
-
- // Put the cursors in the legacy format
- int index = 0;
- for (auto& cmEntry : _cursorMap) {
- const auto& shardId = cmEntry.first;
- auto& mdata = cmEntry.second;
-
- _cursors[index].reset(mdata.pcState->cursor.get(), &mdata);
-
- const auto shard =
- uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
- _servers.insert(shard->getConnString().toString());
-
- index++;
- }
-
- _numServers = _cursorMap.size();
-}
-
-void ParallelSortClusteredCursor::getQueryShardIds(set<ShardId>& shardIds) const {
- std::transform(_cursorMap.begin(),
- _cursorMap.end(),
- std::inserter(shardIds, shardIds.end()),
- [](const ShardCursorsMap::value_type& pair) { return pair.first; });
-}
-
-std::shared_ptr<DBClientCursor> ParallelSortClusteredCursor::getShardCursor(
- const ShardId& shardId) const {
- auto it = _cursorMap.find(shardId);
- if (it == _cursorMap.end()) {
- return nullptr;
- }
-
- return it->second.pcState->cursor;
-}
-
-// DEPRECATED (but still used by map/reduce)
-void ParallelSortClusteredCursor::_oldInit(OperationContext* opCtx) {
- // make sure we're not already initialized
- verify(!_cursors);
- _cursors = new DBClientCursorHolder[_numServers];
-
- bool returnPartial = (_options & QueryOption_PartialResults);
-
- vector<string> serverHosts(_servers.begin(), _servers.end());
- set<int> retryQueries;
- int finishedQueries = 0;
-
- vector<shared_ptr<ShardConnection>> conns;
- vector<string> servers;
-
- // Since we may get all sorts of errors, record them all as they come and throw them later if
- // necessary
- vector<string> staleConfigExs;
- vector<string> socketExs;
- vector<string> otherExs;
- bool allConfigStale = false;
-
- int retries = -1;
-
- // Loop through all the queries until we've finished or gotten a socket exception on all of them
- // We break early for non-socket exceptions, and socket exceptions if we aren't returning
- // partial results
- do {
- retries++;
-
- bool firstPass = retryQueries.size() == 0;
-
- if (!firstPass) {
- vector<StringData> serverHostsToRetry;
- serverHostsToRetry.reserve(retryQueries.size());
- transform(retryQueries.begin(),
- retryQueries.end(),
- std::back_inserter(serverHostsToRetry),
- [&](const auto& query) { return serverHosts[query]; });
- LOGV2(22690,
- "Finished queries {finishedQueriesCount}, retrying parallel connection to "
- "{serverHostsToRetry}",
- "Retrying parallel connection",
- "returnPartial"_attr = returnPartial,
- "finishedQueriesCount"_attr = finishedQueries,
- "serverHostsToRetry"_attr = serverHostsToRetry);
- }
-
- size_t num = 0;
- for (vector<string>::const_iterator it = serverHosts.begin(); it != serverHosts.end();
- ++it) {
- size_t i = num++;
-
- const string& serverHost = *it;
-
- // If we're not retrying this cursor on later passes, continue
- if (!firstPass && retryQueries.find(i) == retryQueries.end())
- continue;
-
- const string errLoc = " @ " + serverHost;
-
- if (firstPass) {
- // This may be the first time connecting to this shard, if so we can get an error
- // here
- try {
- conns.push_back(shared_ptr<ShardConnection>(new ShardConnection(
- opCtx, uassertStatusOK(ConnectionString::parse(serverHost)), _ns)));
- } catch (std::exception& e) {
- socketExs.push_back(e.what() + errLoc);
- if (!returnPartial) {
- num--;
- break;
- }
-
- conns.push_back(shared_ptr<ShardConnection>());
- continue;
- }
-
- servers.push_back(serverHost);
- }
-
- if (conns[i]->setVersion()) {
- conns[i]->done();
-
- // Version is zero b/c this is deprecated codepath
- staleConfigExs.push_back(str::stream() << "stale config detected for " << _ns
- << " in ParallelCursor::_init " << errLoc);
- break;
- }
-
- LOGV2_DEBUG(22693,
- 5,
- "ParallelSortClusteredCursor initialization",
- "serverHost"_attr = serverHost,
- "namespace"_attr = _ns,
- "query"_attr = redact(_query),
- "fields"_attr = redact(_fields),
- "options"_attr = _options);
-
- if (!_cursors[i].get())
- _cursors[i].reset(
- new DBClientCursor(conns[i]->get(),
- NamespaceString(_ns),
- _query,
- 0, // nToReturn
- 0, // nToSkip
- _fields.isEmpty() ? nullptr : &_fields, // fieldsToReturn
- _options,
- _batchSize == 0 ? 0 : _batchSize + _needToSkip // batchSize
- ),
- nullptr);
-
- try {
- _cursors[i].get()->initLazy(!firstPass);
- } catch (NetworkException& e) {
- socketExs.push_back(e.what() + errLoc);
- _cursors[i].reset(nullptr, nullptr);
- conns[i]->done();
- if (!returnPartial)
- break;
- } catch (std::exception& e) {
- otherExs.push_back(e.what() + errLoc);
- _cursors[i].reset(nullptr, nullptr);
- conns[i]->done();
- break;
- }
- }
-
- // Go through all the potentially started cursors and finish initializing them or log any
- // errors and potentially retry
- // TODO: Better error classification would make this easier, errors are indicated in all
- // sorts of ways here that we need to trap.
- for (size_t i = 0; i < num; i++) {
- const string errLoc = " @ " + serverHosts[i];
-
- if (!_cursors[i].get() || (!firstPass && retryQueries.find(i) == retryQueries.end())) {
- if (conns[i])
- conns[i].get()->done();
- continue;
- }
-
- verify(conns[i]);
- retryQueries.erase(i);
-
- bool retry = false;
-
- try {
- if (!_cursors[i].get()->initLazyFinish(retry)) {
- LOGV2_WARNING(22712,
- "Got invalid result from {host}",
- "Got invalid result",
- "host"_attr = conns[i]->getHost(),
- "retrying"_attr = retry);
- _cursors[i].reset(nullptr, nullptr);
-
- if (!retry) {
- socketExs.push_back(str::stream()
- << "error querying server: " << servers[i]);
- conns[i]->done();
- } else {
- retryQueries.insert(i);
- }
-
- continue;
- }
- } catch (StaleConfigException& e) {
- // Our stored configuration data is actually stale, we need to reload it
- // when we throw our exception
- allConfigStale = true;
-
- staleConfigExs.push_back(
- (string) "stale config detected when receiving response for " + e.toString() +
- errLoc);
- _cursors[i].reset(nullptr, nullptr);
- conns[i]->done();
- continue;
- } catch (NetworkException& e) {
- socketExs.push_back(e.what() + errLoc);
- _cursors[i].reset(nullptr, nullptr);
- conns[i]->done();
- continue;
- } catch (std::exception& e) {
- otherExs.push_back(e.what() + errLoc);
- _cursors[i].reset(nullptr, nullptr);
- conns[i]->done();
- continue;
- }
-
- try {
- _cursors[i].get()->attach(conns[i].get()); // this calls done on conn
- // Rethrow stale config or other errors
- throwCursorStale(_cursors[i].get());
- throwCursorError(_cursors[i].get());
-
- finishedQueries++;
- } catch (StaleConfigException& e) {
- // Our stored configuration data is actually stale, we need to reload it
- // when we throw our exception
- allConfigStale = true;
-
- staleConfigExs.push_back((string) "stale config detected for " + e.toString() +
- errLoc);
- _cursors[i].reset(nullptr, nullptr);
- conns[i]->done();
- continue;
- } catch (std::exception& e) {
- otherExs.push_back(e.what() + errLoc);
- _cursors[i].reset(nullptr, nullptr);
- conns[i]->done();
- continue;
- }
- }
-
- // Don't exceed our max retries, should not happen
- verify(retries < 5);
- } while (retryQueries.size() > 0 /* something to retry */ &&
- (socketExs.size() == 0 || returnPartial) /* no conn issues */ &&
- staleConfigExs.size() == 0 /* no config issues */ &&
- otherExs.size() == 0 /* no other issues */);
-
- // Assert that our conns are all closed!
- for (vector<shared_ptr<ShardConnection>>::iterator i = conns.begin(); i < conns.end(); ++i) {
- verify(!(*i) || !(*i)->ok());
- }
-
- // Handle errors we got during initialization.
- // If we're returning partial results, we can ignore socketExs, but nothing else
- // Log a warning in any case, so we don't lose these messages
- bool throwException = (socketExs.size() > 0 && !returnPartial) || staleConfigExs.size() > 0 ||
- otherExs.size() > 0;
-
- if (socketExs.size() > 0 || staleConfigExs.size() > 0 || otherExs.size() > 0) {
- vector<string> errMsgs;
-
- errMsgs.insert(errMsgs.end(), staleConfigExs.begin(), staleConfigExs.end());
- errMsgs.insert(errMsgs.end(), otherExs.begin(), otherExs.end());
- errMsgs.insert(errMsgs.end(), socketExs.begin(), socketExs.end());
-
- std::stringstream errMsg;
- errMsg << "could not initialize cursor across all shards because : ";
- for (vector<string>::iterator i = errMsgs.begin(); i != errMsgs.end(); i++) {
- if (i != errMsgs.begin())
- errMsg << " :: and :: ";
- errMsg << *i;
- }
-
- if (throwException && staleConfigExs.size() > 0) {
- // Version is zero b/c this is deprecated codepath
- uasserted(StaleConfigInfo(NamespaceString(_ns),
- ChunkVersion(0, 0, OID()),
- ChunkVersion(0, 0, OID()),
- boost::none),
- errMsg.str());
- } else if (throwException) {
- uasserted(14827, errMsg.str());
- } else {
- LOGV2_WARNING(22713,
- "Error while initializing cursor across all shards caused by {error}",
- "Error while initializing cursor across all shards",
- "error"_attr = redact(errMsg.str()));
- }
- }
-
- if (retries > 0)
- LOGV2(22694,
- "Successfully finished parallel query after {retries} retries",
- "Successfully finished parallel query",
- "retries"_attr = retries);
-}
-
-bool ParallelSortClusteredCursor::more() {
- if (_needToSkip > 0) {
- int n = _needToSkip;
- _needToSkip = 0;
-
- while (n > 0 && more()) {
- next();
- n--;
- }
-
- _needToSkip = n;
- }
-
- for (int i = 0; i < _numServers; i++) {
- if (_cursors[i].get() && _cursors[i].get()->more())
- return true;
- }
-
- return false;
-}
-
-BSONObj ParallelSortClusteredCursor::next() {
- BSONObj best = BSONObj();
- int bestFrom = -1;
-
- for (int j = 0; j < _numServers; j++) {
- // Iterate _numServers times, starting one past the last server we used.
- // This means we actually start at server #1, not #0, but shouldn't matter
-
- int i = (j + _lastFrom + 1) % _numServers;
-
- // Check to see if the cursor is finished
- if (!_cursors[i].get() || !_cursors[i].get()->more()) {
- if (_cursors[i].getMData())
- _cursors[i].getMData()->pcState->done = true;
- continue;
- }
-
- // We know we have at least one result in this cursor
- BSONObj me = _cursors[i].get()->peekFirst();
-
- // If this is the first non-empty cursor, save the result as best
- if (bestFrom < 0) {
- best = me;
- bestFrom = i;
- if (_sortKey.isEmpty())
- break;
- continue;
- }
-
- // Otherwise compare the result to the current best result
- int comp = dps::compareObjectsAccordingToSort(best, me, _sortKey, true);
- if (comp < 0)
- continue;
-
- best = me;
- bestFrom = i;
- }
-
- _lastFrom = bestFrom;
-
- uassert(10019, "no more elements", bestFrom >= 0);
- _cursors[bestFrom].get()->next();
-
- // Make sure the result data won't go away after the next call to more()
- if (!_cursors[bestFrom].get()->moreInCurrentBatch()) {
- best = best.getOwned();
- }
-
- if (_cursors[bestFrom].getMData())
- _cursors[bestFrom].getMData()->pcState->count++;
-
- return best;
-}
-
-void ParallelConnectionMetadata::cleanup(bool full) {
- if (full || errored)
- retryNext = false;
-
- if (!retryNext && pcState) {
- if (initialized && !errored) {
- verify(pcState->cursor);
- verify(pcState->conn);
-
- if (!finished && pcState->conn->ok()) {
- try {
- // Complete the call if only halfway done
- bool retry = false;
- pcState->cursor->initLazyFinish(retry);
- } catch (std::exception&) {
- LOGV2_WARNING(22714, "Exception closing cursor");
- } catch (...) {
- LOGV2_WARNING(22715, "Unknown exception closing cursor");
- }
- }
- }
-
- // Double-check conn is closed
- if (pcState->conn) {
- pcState->conn->done();
- }
-
- pcState.reset();
- } else
- verify(finished || !initialized);
-
- initialized = false;
- finished = false;
- completed = false;
- errored = false;
-}
-
-BSONObj ParallelConnectionMetadata::toBSON() const {
- return BSON("state" << (pcState ? pcState->toBSON() : BSONObj()) << "retryNext" << retryNext
- << "init" << initialized << "finish" << finished << "errored" << errored);
-}
-
-std::string ParallelConnectionState::toString() const {
- return str::stream() << "PCState : " << toBSON();
-}
-
-BSONObj ParallelConnectionState::toBSON() const {
- BSONObj cursorPeek = BSON("no cursor"
- << "");
- if (cursor) {
- vector<BSONObj> v;
- cursor->peek(v, 1);
- if (v.size() == 0)
- cursorPeek = BSON("no data"
- << "");
- else
- cursorPeek = BSON("" << v[0]);
- }
-
- BSONObj stateObj =
- BSON("conn" << (conn ? (conn->ok() ? conn->conn().toString() : "(done)") : "") << "vinfo"
- << (manager ? (str::stream() << manager->getns().ns() << " @ "
- << manager->getVersion().toString())
- : primary->toString()));
-
- // Append cursor data if exists
- BSONObjBuilder stateB;
- stateB.appendElements(stateObj);
- if (!cursor)
- stateB.append("cursor", "(none)");
- else {
- vector<BSONObj> v;
- cursor->peek(v, 1);
- if (v.size() == 0)
- stateB.append("cursor", "(empty)");
- else
- stateB.append("cursor", v[0]);
- }
-
- stateB.append("count", count);
- stateB.append("done", done);
-
- return stateB.obj().getOwned();
-}
-
-void throwCursorStale(DBClientCursor* cursor) {
- invariant(cursor);
-
- if (cursor->hasResultFlag(ResultFlag_ShardConfigStale)) {
- BSONObj error;
- cursor->peekError(&error);
- uasserted(StaleConfigInfo::parseFromCommandError(error),
- "query returned a stale config error");
- }
-
- if (NamespaceString(cursor->getns()).isCommand()) {
- // Commands that care about versioning (like the count or geoNear command) sometimes return
- // with the stale config error code, but don't set the ShardConfigStale result flag on the
- // cursor.
- //
- // TODO: Standardize stale config reporting.
- BSONObj res = cursor->peekFirst();
- auto status = getStatusFromCommandResult(res);
- if (status == ErrorCodes::StaleConfig) {
- uassertStatusOK(status.withContext("command returned a stale config error"));
- }
- }
-}
-
-} // namespace mongo
diff --git a/src/mongo/s/client/parallel.h b/src/mongo/s/client/parallel.h
deleted file mode 100644
index 05616aa2a5b..00000000000
--- a/src/mongo/s/client/parallel.h
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <set>
-#include <string>
-
-#include "mongo/client/query.h"
-#include "mongo/client/query_spec.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/s/client/shard.h"
-#include "mongo/s/stale_exception.h"
-
-namespace mongo {
-
-class ChunkManager;
-class DBClientCursor;
-class DBClientCursorHolder;
-class OperationContext;
-struct ParallelConnectionMetadata;
-struct ParallelConnectionState;
-
-struct CommandInfo {
- CommandInfo() = default;
-
- CommandInfo(const std::string& vns, const BSONObj& filter, const BSONObj& collation)
- : versionedNS(vns), cmdFilter(filter), cmdCollation(collation) {}
-
- bool isEmpty() const {
- return versionedNS.empty();
- }
-
- std::string toString() const {
- return str::stream() << "CInfo "
- << BSON("v_ns" << versionedNS << "filter" << cmdFilter << "collation"
- << cmdCollation);
- }
-
- std::string versionedNS;
- BSONObj cmdFilter;
- BSONObj cmdCollation;
-};
-
-/**
- * Runs a query in parallel across N servers, enforcing compatible chunk versions for queries
- * across all shards.
- *
- * If CommandInfo is provided, the ParallelCursor does not use the direct .$cmd namespace in the
- * query spec, but instead enforces versions across another namespace specified by CommandInfo.
- * This is to support commands like:
- * db.runCommand({ fileMD5 : "<coll name>" })
- *
- * There is a deprecated legacy mode as well which effectively does a merge-sort across a number
- * of servers, but does not correctly enforce versioning (used only in mapreduce).
- */
-class ParallelSortClusteredCursor {
-public:
- ParallelSortClusteredCursor(const QuerySpec& qSpec, const CommandInfo& cInfo);
-
- // DEPRECATED legacy constructor for pure mergesort functionality - do not use
- ParallelSortClusteredCursor(const std::set<std::string>& servers,
- const std::string& ns,
- const Query& q,
- int options = 0,
- const BSONObj& fields = BSONObj());
-
- ~ParallelSortClusteredCursor();
-
- void init(OperationContext* opCtx);
-
- bool more();
-
- BSONObj next();
-
- /**
- * Returns the set of shards with open cursors.
- */
- void getQueryShardIds(std::set<ShardId>& shardIds) const;
-
- std::shared_ptr<DBClientCursor> getShardCursor(const ShardId& shardId) const;
-
-private:
- using ShardCursorsMap = std::map<ShardId, ParallelConnectionMetadata>;
-
- void fullInit(OperationContext* opCtx);
- void startInit(OperationContext* opCtx);
- void finishInit(OperationContext* opCtx);
-
- bool isCommand() {
- return NamespaceString(_qSpec.ns()).isCommand();
- }
-
- void _finishCons();
-
- void _markStaleNS(const NamespaceString& staleNS, const StaleConfigException& e);
-
- bool _didInit;
- bool _done;
-
- QuerySpec _qSpec;
- CommandInfo _cInfo;
-
- // Count round-trips req'd for namespaces and total
- std::map<std::string, int> _staleNSMap;
-
- int _totalTries;
-
- ShardCursorsMap _cursorMap;
-
- // LEGACY BELOW
- int _numServers;
- int _lastFrom;
- std::set<std::string> _servers;
- BSONObj _sortKey;
-
- DBClientCursorHolder* _cursors;
- int _needToSkip;
-
- /**
- * Setups the shard version of the connection. When using a replica
- * set connection and the primary cannot be reached, the version
- * will not be set if the slaveOk flag is set.
- */
- void setupVersionAndHandleSlaveOk(OperationContext* opCtx,
- std::shared_ptr<ParallelConnectionState> state /* in & out */,
- const ShardId& shardId,
- std::shared_ptr<Shard> primary /* in */,
- const NamespaceString& ns,
- const std::string& vinfo,
- std::shared_ptr<ChunkManager> manager /* in */);
-
- // LEGACY init - Needed for map reduce
- void _oldInit(OperationContext* opCtx);
-
- // LEGACY - Needed ONLY for _oldInit
- std::string _ns;
- BSONObj _query;
- int _options;
- BSONObj _fields;
- int _batchSize;
-};
-
-/**
- * Throws a StaleConfigException wrapping the stale error document in this cursor when the
- * ShardConfigStale flag is set or a command returns a ErrorCodes::StaleConfig error code.
- */
-void throwCursorStale(DBClientCursor* cursor);
-
-} // namespace mongo
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 33190ef2630..b47987ca7a2 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -72,7 +72,6 @@
#include "mongo/rpc/op_msg.h"
#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/s/catalog_cache.h"
-#include "mongo/s/client/parallel.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/cluster_commands_helpers.h"
@@ -1073,35 +1072,6 @@ DbResponse Strategy::clientCommand(OperationContext* opCtx, const Message& m) {
return dbResponse;
}
-void Strategy::commandOp(OperationContext* opCtx,
- const std::string& db,
- const BSONObj& command,
- const std::string& versionedNS,
- const BSONObj& targetingQuery,
- const BSONObj& targetingCollation,
- std::vector<CommandResult>* results) {
- QuerySpec qSpec(db + ".$cmd", command, BSONObj(), 0, 1, 0);
-
- ParallelSortClusteredCursor cursor(
- qSpec, CommandInfo(versionedNS, targetingQuery, targetingCollation));
-
- // Initialize the cursor
- cursor.init(opCtx);
-
- std::set<ShardId> shardIds;
- cursor.getQueryShardIds(shardIds);
-
- for (const ShardId& shardId : shardIds) {
- CommandResult result;
- result.shardTargetId = shardId;
-
- result.target =
- fassert(34417, ConnectionString::parse(cursor.getShardCursor(shardId)->originalHost()));
- result.result = cursor.getShardCursor(shardId)->peekFirst().getOwned();
- results->push_back(result);
- }
-}
-
DbResponse Strategy::getMore(OperationContext* opCtx, const NamespaceString& nss, DbMessage* dbm) {
const int ntoreturn = dbm->pullInt();
uassert(