/**
* Copyright 2010 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork
#include "mongo/platform/basic.h"
#include "mongo/s/client/parallel.h"
#include "mongo/client/constants.h"
#include "mongo/client/dbclient_rs.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/query/query_request.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_connection.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/util/log.h"
#include "mongo/util/net/socket_exception.h"
namespace mongo {
using std::shared_ptr;
using std::map;
using std::set;
using std::string;
using std::vector;
namespace dps = ::mongo::dotted_path_support;
namespace {

// Log level / label for parallel-cursor ("pcursor") tracing; used as LOG(pc) throughout this file.
LabeledLevel pc("pcursor", 2);

/**
 * Throws an exception wrapping the error document in this cursor when the error flag is set.
 */
void throwCursorError(DBClientCursor* cursor) {
    verify(cursor);

    if (cursor->hasResultFlag(ResultFlag_ErrSet)) {
        // The next document on an errored cursor is the error object; convert it to a
        // Status and throw.
        uassertStatusOK(getStatusFromCommandResult(cursor->next()));
    }
}

}  // namespace
struct ParallelConnectionState {
ParallelConnectionState() : count(0), done(false) {}
std::string toString() const;
BSONObj toBSON() const;
// Please do not reorder. cursor destructor can use conn.
// On a related note, never attempt to cleanup these pointers manually.
std::shared_ptr conn;
std::shared_ptr cursor;
// Version information
std::shared_ptr manager;
std::shared_ptr primary;
// Cursor status information
long long count;
bool done;
};
struct ParallelConnectionMetadata {
ParallelConnectionMetadata()
: retryNext(false), initialized(false), finished(false), completed(false), errored(false) {}
~ParallelConnectionMetadata() {
cleanup(true);
}
void cleanup(bool full = true);
std::shared_ptr pcState;
bool retryNext;
bool initialized;
bool finished;
bool completed;
bool errored;
BSONObj toBSON() const;
std::string toString() const {
return str::stream() << "PCMData : " << toBSON();
}
};
/**
* Helper class to manage ownership of opened cursors while merging results.
*
* TODO: Choose one set of ownership semantics so that this isn't needed - merge sort via mapreduce
* is the main issue since it has no metadata and this holder owns the cursors.
*/
class DBClientCursorHolder {
public:
DBClientCursorHolder() = default;
void reset(DBClientCursor* cursor, ParallelConnectionMetadata* pcmData) {
_cursor.reset(cursor);
_pcmData.reset(pcmData);
}
DBClientCursor* get() {
return _cursor.get();
}
ParallelConnectionMetadata* getMData() {
return _pcmData.get();
}
void release() {
_cursor.release();
_pcmData.release();
}
private:
std::unique_ptr _cursor;
std::unique_ptr _pcmData;
};
// -------- ParallelSortClusteredCursor -----------
// Constructs a cursor for the sharded query/command path. Sort-key and projection
// normalization happens in _finishCons().
ParallelSortClusteredCursor::ParallelSortClusteredCursor(const QuerySpec& qSpec,
                                                         const CommandInfo& cInfo)
    : _qSpec(qSpec), _cInfo(cInfo), _totalTries(0) {
    _done = false;
    _didInit = false;

    _finishCons();
}
// Legacy constructor: connects directly to an explicit set of server hosts (used by
// map/reduce via _oldInit()). Restores the stripped set<string> parameter type.
ParallelSortClusteredCursor::ParallelSortClusteredCursor(const set<string>& servers,
                                                         const string& ns,
                                                         const Query& q,
                                                         int options,
                                                         const BSONObj& fields)
    : _servers(servers) {
    _sortKey = q.getSort().copy();
    _needToSkip = 0;

    _done = false;
    _didInit = false;

    // Populate legacy fields
    _ns = ns;
    _query = q.obj.getOwned();
    _options = options;
    _fields = fields.getOwned();
    _batchSize = 0;

    _finishCons();
}
ParallelSortClusteredCursor::~ParallelSortClusteredCursor() {
    // WARNING: Commands (in particular M/R) connect via _oldInit() directly to shards
    bool isDirectShardCursor = _cursorMap.empty();

    // Non-direct shard cursors are owned by the _cursorMap, so we release
    // them in the array here. Direct shard cursors clean themselves.
    if (!isDirectShardCursor) {
        for (int i = 0; i < _numServers; i++)
            _cursors[i].release();
    }

    delete[] _cursors;
    _cursors = nullptr;  // use nullptr rather than 0 for pointer reset

    // Clear out our metadata after removing legacy cursor data
    _cursorMap.clear();

    // Just to be sure
    _done = true;
}
void ParallelSortClusteredCursor::init(OperationContext* opCtx) {
    // Initialization is idempotent - only the first call does any work.
    if (_didInit) {
        return;
    }
    _didInit = true;

    if (_qSpec.isEmpty()) {
        // An empty query spec means we were built through the legacy constructor.
        // TODO: Eliminate this
        _oldInit();
    } else {
        fullInit(opCtx);
    }
}
void ParallelSortClusteredCursor::_finishCons() {
_numServers = _servers.size();
_lastFrom = 0;
_cursors = 0;
if (!_qSpec.isEmpty()) {
_needToSkip = _qSpec.ntoskip();
_cursors = 0;
_sortKey = _qSpec.sort();
_fields = _qSpec.fields();
}
// Partition sort key fields into (a) text meta fields and (b) all other fields.
set textMetaSortKeyFields;
set normalSortKeyFields;
// Transform _sortKey fields {a:{$meta:"textScore"}} into {a:-1}, in order to apply the
// merge sort for text metadata in the correct direction.
BSONObjBuilder transformedSortKeyBuilder;
BSONObjIterator sortKeyIt(_sortKey);
while (sortKeyIt.more()) {
BSONElement e = sortKeyIt.next();
if (QueryRequest::isTextScoreMeta(e)) {
textMetaSortKeyFields.insert(e.fieldName());
transformedSortKeyBuilder.append(e.fieldName(), -1);
} else {
normalSortKeyFields.insert(e.fieldName());
transformedSortKeyBuilder.append(e);
}
}
_sortKey = transformedSortKeyBuilder.obj();
// Verify that that all text metadata sort fields are in the projection. For all other sort
// fields, copy them into the projection if they are missing (and if projection is
// negative).
if (!_sortKey.isEmpty() && !_fields.isEmpty()) {
BSONObjBuilder b;
bool isNegative = false;
{
BSONObjIterator i(_fields);
while (i.more()) {
BSONElement e = i.next();
b.append(e);
string fieldName = e.fieldName();
if (QueryRequest::isTextScoreMeta(e)) {
textMetaSortKeyFields.erase(fieldName);
} else {
// exact field
bool found = normalSortKeyFields.erase(fieldName);
// subfields
set::const_iterator begin =
normalSortKeyFields.lower_bound(fieldName + ".\x00");
set::const_iterator end =
normalSortKeyFields.lower_bound(fieldName + ".\xFF");
normalSortKeyFields.erase(begin, end);
if (!e.trueValue()) {
uassert(13431,
"have to have sort key in projection and removing it",
!found && begin == end);
} else if (!e.isABSONObj()) {
isNegative = true;
}
}
}
}
if (isNegative) {
for (set::const_iterator it(normalSortKeyFields.begin()),
end(normalSortKeyFields.end());
it != end;
++it) {
b.append(*it, 1);
}
}
_fields = b.obj();
}
if (!_qSpec.isEmpty()) {
_qSpec.setFields(_fields);
}
uassert(
17306, "have to have all text meta sort keys in projection", textMetaSortKeyFields.empty());
}
// Two-phase initialization: startInit() targets shards and (possibly lazily) sends the
// queries; finishInit() completes any lazy handshakes and drives stale-config retries.
void ParallelSortClusteredCursor::fullInit(OperationContext* opCtx) {
    startInit(opCtx);
    finishInit(opCtx);
}
// Records one more stale-config occurrence for this namespace and aborts (by rethrowing
// the stale-config error as a status) once the retry budget is exhausted.
void ParallelSortClusteredCursor::_markStaleNS(const NamespaceString& staleNS,
                                               const StaleConfigException& e) {
    auto mapIt = _staleNSMap.find(staleNS.ns());
    if (mapIt == _staleNSMap.end()) {
        // First occurrence starts the counter at 1 (and is immediately bumped below).
        mapIt = _staleNSMap.emplace(staleNS.ns(), 1).first;
    }

    const int tries = ++(mapIt->second);

    if (tries >= 5) {
        uassertStatusOK(e.toStatus("too many retries of stale version info"));
    }
}
void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(
OperationContext* opCtx,
std::shared_ptr state,
const ShardId& shardId,
std::shared_ptr primary,
const NamespaceString& ns,
const string& vinfo,
std::shared_ptr manager) {
if (manager) {
state->manager = manager;
} else if (primary) {
state->primary = primary;
}
verify(!primary || shardId == primary->getId());
// Setup conn
if (!state->conn) {
const auto shard =
uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
state->conn.reset(new ShardConnection(shard->getConnString(), ns.ns(), manager));
}
const DBClientBase* rawConn = state->conn->getRawConn();
bool allowShardVersionFailure = rawConn->type() == ConnectionString::SET &&
DBClientReplicaSet::isSecondaryQuery(_qSpec.ns(), _qSpec.query(), _qSpec.options());
// Skip shard version checking if primary is known to be down.
if (allowShardVersionFailure) {
const DBClientReplicaSet* replConn = dynamic_cast(rawConn);
invariant(replConn);
ReplicaSetMonitorPtr rsMonitor = ReplicaSetMonitor::get(replConn->getSetName());
uassert(16388,
str::stream() << "cannot access unknown replica set: " << replConn->getSetName(),
rsMonitor != nullptr);
if (!rsMonitor->isKnownToHaveGoodPrimary()) {
state->conn->donotCheckVersion();
// A side effect of this short circuiting is the mongos will not be able figure out
// that the primary is now up on it's own and has to rely on other threads to refresh
// node states.
OCCASIONALLY {
const DBClientReplicaSet* repl = dynamic_cast(rawConn);
dassert(repl);
warning() << "Primary for " << repl->getServerAddress()
<< " was down before, bypassing setShardVersion."
<< " The local replica set view and targeting may be stale.";
}
return;
}
}
try {
if (state->conn->setVersion()) {
LOG(pc) << "needed to set remote version on connection to value "
<< "compatible with " << vinfo;
}
} catch (const DBException& dbExcep) {
auto errCode = dbExcep.code();
if (allowShardVersionFailure &&
(ErrorCodes::isNotMasterError(errCode) || ErrorCodes::isNetworkError(errCode) ||
errCode == ErrorCodes::FailedToSatisfyReadPreference)) {
// It's okay if we don't set the version when talking to a secondary, we can
// be stale in any case.
OCCASIONALLY {
const DBClientReplicaSet* repl =
dynamic_cast(state->conn->getRawConn());
dassert(repl);
warning() << "Cannot contact primary for " << repl->getServerAddress()
<< " to check shard version."
<< " The local replica set view and targeting may be stale.";
}
} else {
throw;
}
}
}
// Phase 1 of initialization: resolves routing info for the namespace, targets the relevant
// shards, establishes (or re-uses compatible) per-shard connections and cursors, and sends
// the queries (lazily where supported). Recurses once per stale-config retry.
void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
    const bool returnPartial = (_qSpec.options() & QueryOption_PartialResults);
    const NamespaceString nss(!_cInfo.isEmpty() ? _cInfo.versionedNS : _qSpec.ns());

    string prefix;
    if (MONGO_unlikely(shouldLog(pc))) {
        if (_totalTries > 0) {
            prefix = str::stream() << "retrying (" << _totalTries << " tries)";
        } else {
            prefix = "creating";
        }
    }
    LOG(pc) << prefix << " pcursor over " << _qSpec << " and " << _cInfo;

    shared_ptr<ChunkManager> manager;
    shared_ptr<Shard> primary;

    {
        auto routingInfoStatus =
            Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
        if (routingInfoStatus != ErrorCodes::NamespaceNotFound) {
            auto routingInfo = uassertStatusOK(std::move(routingInfoStatus));
            manager = routingInfo.cm();

            // ParallelSortClusteredCursor has two states - either !cm && primary, which means
            // unsharded collection, or cm && !primary, which means sharded collection.
            if (!manager) {
                primary = routingInfo.db().primary();
            }
        }
    }

    set<ShardId> shardIds;
    string vinfo;

    if (manager) {
        if (MONGO_unlikely(shouldLog(pc))) {
            vinfo = str::stream() << "[" << manager->getns().ns() << " @ "
                                  << manager->getVersion().toString() << "]";
        }

        manager->getShardIdsForQuery(opCtx,
                                     !_cInfo.isEmpty() ? _cInfo.cmdFilter : _qSpec.filter(),
                                     !_cInfo.isEmpty() ? _cInfo.cmdCollation : BSONObj(),
                                     &shardIds);
    } else if (primary) {
        if (MONGO_unlikely(shouldLog(pc))) {
            vinfo = str::stream() << "[unsharded @ " << primary->toString() << "]";
        }

        shardIds.insert(primary->getId());
    }

    // Close all cursors on extra shards first, as these will be invalid
    for (auto& cmEntry : _cursorMap) {
        const auto& shardId = cmEntry.first;

        if (shardIds.find(shardId) == shardIds.end()) {
            LOG(pc) << "closing cursor on shard " << shardId
                    << " as the connection is no longer required by " << vinfo;

            cmEntry.second.cleanup(true);
        }
    }

    LOG(pc) << "initializing over " << shardIds.size() << " shards required by " << vinfo;

    // Don't retry indefinitely for whatever reason
    _totalTries++;
    uassert(15986, "too many retries in total", _totalTries < 10);

    for (const ShardId& shardId : shardIds) {
        auto& mdata = _cursorMap[shardId];

        LOG(pc) << "initializing on shard " << shardId << ", current connection state is "
                << mdata.toBSON();

        // This may be the first time connecting to this shard, if so we can get an error here
        try {
            if (mdata.initialized) {
                invariant(mdata.pcState);

                auto state = mdata.pcState;

                bool compatiblePrimary = true;
                bool compatibleManager = true;

                if (primary && !state->primary)
                    warning() << "Collection becoming unsharded detected";
                if (manager && !state->manager)
                    warning() << "Collection becoming sharded detected";
                if (primary && state->primary && primary != state->primary)
                    warning() << "Weird shift of primary detected";

                compatiblePrimary = primary && state->primary && primary == state->primary;
                compatibleManager =
                    manager && state->manager && manager->compatibleWith(*state->manager, shardId);

                if (compatiblePrimary || compatibleManager) {
                    // If we're compatible, don't need to retry unless forced
                    if (!mdata.retryNext)
                        continue;
                    // Do partial cleanup
                    mdata.cleanup(false);
                } else {
                    // Force total cleanup of connection if no longer compatible
                    mdata.cleanup(true);
                }
            } else {
                // Cleanup connection if we're not yet initialized
                mdata.cleanup(false);
            }

            mdata.pcState = std::make_shared<ParallelConnectionState>();
            auto state = mdata.pcState;

            setupVersionAndHandleSlaveOk(opCtx, state, shardId, primary, nss, vinfo, manager);

            const string& ns = _qSpec.ns();

            // Setup cursor
            if (!state->cursor) {
                //
                // Here we decide whether to split the query limits up for multiple shards.
                // NOTE: There's a subtle issue here, in that it's possible we target a single
                // shard first, but are stale, and then target multiple shards, or vice-versa.
                // In both these cases, we won't re-use the old cursor created here, since the
                // shard version must have changed on the single shard between queries.
                //

                if (shardIds.size() > 1) {
                    // Query limits split for multiple shards
                    state->cursor.reset(new DBClientCursor(
                        state->conn->get(),
                        ns,
                        _qSpec.query(),
                        isCommand() ? 1 : 0,  // nToReturn (0 if query indicates multi)
                        0,                    // nToSkip
                        // Does this need to be a ptr?
                        _qSpec.fields().isEmpty() ? 0 : _qSpec.fieldsData(),  // fieldsToReturn
                        _qSpec.options(),                                     // options
                        // NtoReturn is weird.
                        // If zero, it means use default size, so we do that for all cursors
                        // If positive, it's the batch size (we don't want this cursor limiting
                        // results), that's done at a higher level
                        // If negative, it's the batch size, but we don't create a cursor - so we
                        // don't want to create a child cursor either.
                        // Either way, if non-zero, we want to pull back the batch size + the skip
                        // amount as quickly as possible. Potentially, for a cursor on a single
                        // shard or if we keep better track of chunks, we can actually add the skip
                        // value into the cursor and/or make some assumptions about the return value
                        // size ( (batch size + skip amount) / num_servers ).
                        _qSpec.ntoreturn() == 0
                            ? 0
                            : (_qSpec.ntoreturn() > 0
                                   ? _qSpec.ntoreturn() + _qSpec.ntoskip()
                                   : _qSpec.ntoreturn() - _qSpec.ntoskip())));  // batchSize
                } else {
                    // Single shard query
                    state->cursor.reset(new DBClientCursor(
                        state->conn->get(),
                        ns,
                        _qSpec.query(),
                        _qSpec.ntoreturn(),  // nToReturn
                        _qSpec.ntoskip(),    // nToSkip
                        // Does this need to be a ptr?
                        _qSpec.fields().isEmpty() ? 0 : _qSpec.fieldsData(),  // fieldsToReturn
                        _qSpec.options(),                                     // options
                        0));                                                  // batchSize
                }
            }

            bool lazyInit = state->conn->get()->lazySupported();
            if (lazyInit) {
                // Need to keep track if this is a second or third try for replica sets
                state->cursor->initLazy(mdata.retryNext);
                mdata.retryNext = false;
                mdata.initialized = true;
            } else {
                bool success = state->cursor->init();

                // Without full initialization, throw an exception
                uassert(15987,
                        str::stream() << "could not fully initialize cursor on shard " << shardId
                                      << ", current connection state is "
                                      << mdata.toBSON().toString(),
                        success);

                mdata.retryNext = false;
                mdata.initialized = true;
                mdata.finished = true;
            }

            LOG(pc) << "initialized " << (isCommand() ? "command " : "query ")
                    << (lazyInit ? "(lazily) " : "(full) ") << "on shard " << shardId
                    << ", current connection state is " << mdata.toBSON();
        } catch (StaleConfigException& e) {
            // Our version isn't compatible with the current version anymore on at least one shard,
            // need to retry immediately
            NamespaceString staleNS(e->getNss());

            _markStaleNS(staleNS, e);
            Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNS);

            LOG(1) << "stale config of ns " << staleNS << " during initialization, will retry"
                   << causedBy(redact(e));

            // This is somewhat strange
            if (staleNS != nss) {
                warning() << "versioned ns " << nss.ns() << " doesn't match stale config namespace "
                          << staleNS;
            }

            // Restart with new chunk manager
            startInit(opCtx);
            return;
        } catch (NetworkException& e) {
            warning() << "socket exception when initializing on " << shardId
                      << ", current connection state is " << mdata.toBSON() << causedBy(redact(e));
            mdata.errored = true;
            if (returnPartial) {
                mdata.cleanup(true);
                continue;
            }
            throw;
        } catch (DBException& e) {
            warning() << "db exception when initializing on " << shardId
                      << ", current connection state is " << mdata.toBSON() << causedBy(redact(e));
            mdata.errored = true;
            if (returnPartial && e.code() == 15925 /* From above! */) {
                mdata.cleanup(true);
                continue;
            }
            throw;
        } catch (std::exception& e) {
            warning() << "exception when initializing on " << shardId
                      << ", current connection state is " << mdata.toBSON() << causedBy(e);
            mdata.errored = true;
            throw;
        } catch (...) {
            warning() << "unknown exception when initializing on " << shardId
                      << ", current connection state is " << mdata.toBSON();
            mdata.errored = true;
            throw;
        }
    }

    // Sanity check final init'ed connections
    for (const auto& cmEntry : _cursorMap) {
        const auto& shardId = cmEntry.first;
        const auto& mdata = cmEntry.second;

        if (!mdata.pcState) {
            continue;
        }

        // Make sure all state is in shards
        invariant(shardIds.find(shardId) != shardIds.end());
        invariant(mdata.initialized == true);

        if (!mdata.completed) {
            invariant(mdata.pcState->conn->ok());
        }

        invariant(mdata.pcState->cursor);
        invariant(mdata.pcState->primary || mdata.pcState->manager);
        invariant(!mdata.retryNext);

        if (mdata.completed) {
            invariant(mdata.finished);
        }

        if (mdata.finished) {
            invariant(mdata.initialized);
        }

        if (!returnPartial) {
            invariant(mdata.initialized);
        }
    }
}
// Phase 2 of initialization: completes lazy cursor handshakes, rethrows per-shard errors
// (honoring partial-results mode), retries stale namespaces once en masse, and finally
// repackages the per-shard cursors into the legacy _cursors array.
void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) {
    bool returnPartial = (_qSpec.options() & QueryOption_PartialResults);
    bool specialVersion = _cInfo.versionedNS.size() > 0;
    string ns = specialVersion ? _cInfo.versionedNS : _qSpec.ns();

    bool retry = false;
    map<string, StaleConfigException> staleNSExceptions;

    LOG(pc) << "finishing over " << _cursorMap.size() << " shards";

    for (auto& cmEntry : _cursorMap) {
        const auto& shardId = cmEntry.first;
        auto& mdata = cmEntry.second;

        LOG(pc) << "finishing on shard " << shardId << ", current connection state is "
                << mdata.toBSON();

        // Ignore empty conns for now
        if (!mdata.pcState)
            continue;

        auto state = mdata.pcState;

        try {
            // Sanity checks
            if (!mdata.completed)
                verify(state->conn && state->conn->ok());
            verify(state->cursor);
            verify(state->manager || state->primary);
            verify(!state->manager || !state->primary);

            // If we weren't init'ing lazily, ignore this
            if (!mdata.finished) {
                mdata.finished = true;

                // Mark the cursor as non-retry by default
                mdata.retryNext = false;

                if (!state->cursor->initLazyFinish(mdata.retryNext)) {
                    if (!mdata.retryNext) {
                        uassert(15988, "error querying server", false);
                    } else {
                        retry = true;
                        continue;
                    }
                }

                mdata.completed = false;
            }

            if (!mdata.completed) {
                mdata.completed = true;

                // Make sure we didn't get an error we should rethrow
                // TODO : Refactor this to something better
                throwCursorStale(state->cursor.get());
                throwCursorError(state->cursor.get());

                // Finalize state
                state->cursor->attach(state->conn.get());  // Closes connection for us

                LOG(pc) << "finished on shard " << shardId << ", current connection state is "
                        << mdata.toBSON();
            }
        } catch (StaleConfigException& e) {
            retry = true;

            std::string staleNS = e->getNss().ns();

            // Will retry all at once
            staleNSExceptions.emplace(staleNS, e);

            // Fully clear this cursor, as it needs to be re-established
            mdata.cleanup(true);
            continue;
        } catch (NetworkException& e) {
            warning() << "socket exception when finishing on " << shardId
                      << ", current connection state is " << mdata.toBSON() << causedBy(redact(e));
            mdata.errored = true;
            if (returnPartial) {
                mdata.cleanup(true);
                continue;
            }
            throw;
        } catch (DBException& e) {
            // NOTE: RECV() WILL NOT THROW A SOCKET EXCEPTION - WE GET THIS AS ERROR 15988 FROM
            // ABOVE
            if (e.code() == 15988) {
                warning() << "exception when receiving data from " << shardId
                          << ", current connection state is " << mdata.toBSON()
                          << causedBy(redact(e));

                mdata.errored = true;
                if (returnPartial) {
                    mdata.cleanup(true);
                    continue;
                }
                throw;
            } else {
                // the InvalidBSON exception indicates that the BSON is malformed ->
                // don't print/call "mdata.toBSON()" to avoid unexpected errors e.g. a segfault
                if (e.code() == ErrorCodes::InvalidBSON)
                    warning() << "bson is malformed :: db exception when finishing on " << shardId
                              << causedBy(redact(e));
                else
                    warning() << "db exception when finishing on " << shardId
                              << ", current connection state is " << mdata.toBSON()
                              << causedBy(redact(e));
                mdata.errored = true;
                throw;
            }
        } catch (std::exception& e) {
            warning() << "exception when finishing on " << shardId
                      << ", current connection state is " << mdata.toBSON() << causedBy(e);
            mdata.errored = true;
            throw;
        } catch (...) {
            warning() << "unknown exception when finishing on " << shardId
                      << ", current connection state is " << mdata.toBSON();
            mdata.errored = true;
            throw;
        }
    }

    // Retry logic for single refresh of namespaces / retry init'ing connections
    if (retry) {
        // Refresh stale namespaces
        if (staleNSExceptions.size()) {
            for (const auto& exEntry : staleNSExceptions) {
                const NamespaceString staleNS(exEntry.first);
                const StaleConfigException& ex = exEntry.second;

                _markStaleNS(staleNS, ex);
                Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNS);

                LOG(1) << "stale config of ns " << staleNS << " on finishing query, will retry"
                       << causedBy(redact(ex));

                // This is somewhat strange
                if (staleNS.ns() != ns) {
                    warning() << "versioned ns " << ns << " doesn't match stale config namespace "
                              << staleNS;
                }
            }
        }

        // Re-establish connections we need to
        startInit(opCtx);
        finishInit(opCtx);
        return;
    }

    // Sanity check and clean final connections
    for (auto i = _cursorMap.begin(); i != _cursorMap.end();) {
        auto& mdata = i->second;

        // Erase empty stuff
        if (!mdata.pcState) {
            log() << "PCursor erasing empty state " << mdata.toBSON();
            _cursorMap.erase(i++);
            continue;
        } else {
            ++i;
        }

        // Make sure all state is in shards
        verify(mdata.initialized == true);
        verify(mdata.finished == true);
        verify(mdata.completed == true);
        verify(!mdata.pcState->conn->ok());
        verify(mdata.pcState->cursor);
        verify(mdata.pcState->primary || mdata.pcState->manager);
    }

    // TODO : More cleanup of metadata?

    // LEGACY STUFF NOW

    _cursors = new DBClientCursorHolder[_cursorMap.size()];

    // Put the cursors in the legacy format
    int index = 0;
    for (auto& cmEntry : _cursorMap) {
        const auto& shardId = cmEntry.first;
        auto& mdata = cmEntry.second;

        // NOTE: the holder does NOT own &mdata - the destructor release()s these entries.
        _cursors[index].reset(mdata.pcState->cursor.get(), &mdata);

        const auto shard =
            uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, shardId));
        _servers.insert(shard->getConnString().toString());

        index++;
    }

    _numServers = _cursorMap.size();
}
void ParallelSortClusteredCursor::getQueryShardIds(set& shardIds) const {
std::transform(_cursorMap.begin(),
_cursorMap.end(),
std::inserter(shardIds, shardIds.end()),
[](const ShardCursorsMap::value_type& pair) { return pair.first; });
}
std::shared_ptr ParallelSortClusteredCursor::getShardCursor(
const ShardId& shardId) const {
auto it = _cursorMap.find(shardId);
if (it == _cursorMap.end()) {
return nullptr;
}
return it->second.pcState->cursor;
}
// DEPRECATED (but still used by map/reduce)
void ParallelSortClusteredCursor::_oldInit() {
// make sure we're not already initialized
verify(!_cursors);
_cursors = new DBClientCursorHolder[_numServers];
bool returnPartial = (_options & QueryOption_PartialResults);
vector serverHosts(_servers.begin(), _servers.end());
set retryQueries;
int finishedQueries = 0;
vector> conns;
vector servers;
// Since we may get all sorts of errors, record them all as they come and throw them later if
// necessary
vector staleConfigExs;
vector socketExs;
vector otherExs;
bool allConfigStale = false;
int retries = -1;
// Loop through all the queries until we've finished or gotten a socket exception on all of them
// We break early for non-socket exceptions, and socket exceptions if we aren't returning
// partial results
do {
retries++;
bool firstPass = retryQueries.size() == 0;
if (!firstPass) {
log() << "retrying " << (returnPartial ? "(partial) " : "")
<< "parallel connection to ";
for (set::const_iterator it = retryQueries.begin(); it != retryQueries.end();
++it) {
log() << serverHosts[*it] << ", ";
}
log() << finishedQueries << " finished queries.";
}
size_t num = 0;
for (vector::const_iterator it = serverHosts.begin(); it != serverHosts.end();
++it) {
size_t i = num++;
const string& serverHost = *it;
// If we're not retrying this cursor on later passes, continue
if (!firstPass && retryQueries.find(i) == retryQueries.end())
continue;
const string errLoc = " @ " + serverHost;
if (firstPass) {
// This may be the first time connecting to this shard, if so we can get an error
// here
try {
conns.push_back(shared_ptr(new ShardConnection(
uassertStatusOK(ConnectionString::parse(serverHost)), _ns)));
} catch (std::exception& e) {
socketExs.push_back(e.what() + errLoc);
if (!returnPartial) {
num--;
break;
}
conns.push_back(shared_ptr());
continue;
}
servers.push_back(serverHost);
}
if (conns[i]->setVersion()) {
conns[i]->done();
// Version is zero b/c this is deprecated codepath
staleConfigExs.push_back(str::stream() << "stale config detected for " << _ns
<< " in ParallelCursor::_init "
<< errLoc);
break;
}
LOG(5) << "ParallelSortClusteredCursor::init server:" << serverHost << " ns:" << _ns
<< " query:" << redact(_query) << " fields:" << redact(_fields)
<< " options: " << _options;
if (!_cursors[i].get())
_cursors[i].reset(
new DBClientCursor(conns[i]->get(),
_ns,
_query,
0, // nToReturn
0, // nToSkip
_fields.isEmpty() ? 0 : &_fields, // fieldsToReturn
_options,
_batchSize == 0 ? 0 : _batchSize + _needToSkip // batchSize
),
NULL);
try {
_cursors[i].get()->initLazy(!firstPass);
} catch (NetworkException& e) {
socketExs.push_back(e.what() + errLoc);
_cursors[i].reset(NULL, NULL);
conns[i]->done();
if (!returnPartial)
break;
} catch (std::exception& e) {
otherExs.push_back(e.what() + errLoc);
_cursors[i].reset(NULL, NULL);
conns[i]->done();
break;
}
}
// Go through all the potentially started cursors and finish initializing them or log any
// errors and potentially retry
// TODO: Better error classification would make this easier, errors are indicated in all
// sorts of ways here that we need to trap.
for (size_t i = 0; i < num; i++) {
const string errLoc = " @ " + serverHosts[i];
if (!_cursors[i].get() || (!firstPass && retryQueries.find(i) == retryQueries.end())) {
if (conns[i])
conns[i].get()->done();
continue;
}
verify(conns[i]);
retryQueries.erase(i);
bool retry = false;
try {
if (!_cursors[i].get()->initLazyFinish(retry)) {
warning() << "invalid result from " << conns[i]->getHost()
<< (retry ? ", retrying" : "");
_cursors[i].reset(NULL, NULL);
if (!retry) {
socketExs.push_back(str::stream() << "error querying server: "
<< servers[i]);
conns[i]->done();
} else {
retryQueries.insert(i);
}
continue;
}
} catch (StaleConfigException& e) {
// Our stored configuration data is actually stale, we need to reload it
// when we throw our exception
allConfigStale = true;
staleConfigExs.push_back(
(string) "stale config detected when receiving response for " + e.toString() +
errLoc);
_cursors[i].reset(NULL, NULL);
conns[i]->done();
continue;
} catch (NetworkException& e) {
socketExs.push_back(e.what() + errLoc);
_cursors[i].reset(NULL, NULL);
conns[i]->done();
continue;
} catch (std::exception& e) {
otherExs.push_back(e.what() + errLoc);
_cursors[i].reset(NULL, NULL);
conns[i]->done();
continue;
}
try {
_cursors[i].get()->attach(conns[i].get()); // this calls done on conn
// Rethrow stale config or other errors
throwCursorStale(_cursors[i].get());
throwCursorError(_cursors[i].get());
finishedQueries++;
} catch (StaleConfigException& e) {
// Our stored configuration data is actually stale, we need to reload it
// when we throw our exception
allConfigStale = true;
staleConfigExs.push_back((string) "stale config detected for " + e.toString() +
errLoc);
_cursors[i].reset(NULL, NULL);
conns[i]->done();
continue;
} catch (std::exception& e) {
otherExs.push_back(e.what() + errLoc);
_cursors[i].reset(NULL, NULL);
conns[i]->done();
continue;
}
}
// Don't exceed our max retries, should not happen
verify(retries < 5);
} while (retryQueries.size() > 0 /* something to retry */ &&
(socketExs.size() == 0 || returnPartial) /* no conn issues */ &&
staleConfigExs.size() == 0 /* no config issues */ &&
otherExs.size() == 0 /* no other issues */);
// Assert that our conns are all closed!
for (vector>::iterator i = conns.begin(); i < conns.end(); ++i) {
verify(!(*i) || !(*i)->ok());
}
// Handle errors we got during initialization.
// If we're returning partial results, we can ignore socketExs, but nothing else
// Log a warning in any case, so we don't lose these messages
bool throwException = (socketExs.size() > 0 && !returnPartial) || staleConfigExs.size() > 0 ||
otherExs.size() > 0;
if (socketExs.size() > 0 || staleConfigExs.size() > 0 || otherExs.size() > 0) {
vector errMsgs;
errMsgs.insert(errMsgs.end(), staleConfigExs.begin(), staleConfigExs.end());
errMsgs.insert(errMsgs.end(), otherExs.begin(), otherExs.end());
errMsgs.insert(errMsgs.end(), socketExs.begin(), socketExs.end());
std::stringstream errMsg;
errMsg << "could not initialize cursor across all shards because : ";
for (vector::iterator i = errMsgs.begin(); i != errMsgs.end(); i++) {
if (i != errMsgs.begin())
errMsg << " :: and :: ";
errMsg << *i;
}
if (throwException && staleConfigExs.size() > 0) {
// Version is zero b/c this is deprecated codepath
uasserted(StaleConfigInfo(NamespaceString(_ns),
ChunkVersion(0, 0, OID()),
ChunkVersion(0, 0, OID())),
errMsg.str());
} else if (throwException) {
uasserted(14827, errMsg.str());
} else {
warning() << redact(errMsg.str());
}
}
if (retries > 0)
log() << "successfully finished parallel query after " << retries << " retries";
}
// Returns true if any per-shard cursor still has documents, after first consuming any
// deferred skip amount.
bool ParallelSortClusteredCursor::more() {
    if (_needToSkip > 0) {
        // _needToSkip is zeroed before the recursive more()/next() calls below so they
        // don't re-enter this skip loop; whatever couldn't be skipped is restored after.
        int n = _needToSkip;
        _needToSkip = 0;

        while (n > 0 && more()) {
            next();
            n--;
        }

        _needToSkip = n;
    }

    // More results exist if any individual shard cursor has more.
    for (int i = 0; i < _numServers; i++) {
        if (_cursors[i].get() && _cursors[i].get()->more())
            return true;
    }

    return false;
}
// Returns the next document in merged (sort-key) order: peeks the head of every live
// shard cursor, picks the smallest according to _sortKey (or the first non-empty cursor
// when unsorted), and advances only the winning cursor.
BSONObj ParallelSortClusteredCursor::next() {
    BSONObj best = BSONObj();
    int bestFrom = -1;

    for (int j = 0; j < _numServers; j++) {
        // Iterate _numServers times, starting one past the last server we used.
        // This means we actually start at server #1, not #0, but shouldn't matter
        int i = (j + _lastFrom + 1) % _numServers;

        // Check to see if the cursor is finished
        if (!_cursors[i].get() || !_cursors[i].get()->more()) {
            if (_cursors[i].getMData())
                _cursors[i].getMData()->pcState->done = true;
            continue;
        }

        // We know we have at least one result in this cursor
        BSONObj me = _cursors[i].get()->peekFirst();

        // If this is the first non-empty cursor, save the result as best
        if (bestFrom < 0) {
            best = me;
            bestFrom = i;
            // Without a sort key any document will do - take the first available.
            if (_sortKey.isEmpty())
                break;
            continue;
        }

        // Otherwise compare the result to the current best result
        int comp = dps::compareObjectsAccordingToSort(best, me, _sortKey, true);
        if (comp < 0)
            continue;

        best = me;
        bestFrom = i;
    }

    _lastFrom = bestFrom;

    uassert(10019, "no more elements", bestFrom >= 0);

    // Consume the peeked document from the winning cursor.
    _cursors[bestFrom].get()->next();

    // Make sure the result data won't go away after the next call to more()
    if (!_cursors[bestFrom].get()->moreInCurrentBatch()) {
        best = best.getOwned();
    }

    if (_cursors[bestFrom].getMData())
        _cursors[bestFrom].getMData()->pcState->count++;

    return best;
}
// Tears down the per-shard connection state. A full cleanup (or any cleanup after an
// error) also clears the retry flag; a partial cleanup preserves it so the cursor can be
// re-sent on the next init pass. All lifecycle flags are reset regardless.
void ParallelConnectionMetadata::cleanup(bool full) {
    if (full || errored)
        retryNext = false;

    if (!retryNext && pcState) {
        if (initialized && !errored) {
            verify(pcState->cursor);
            verify(pcState->conn);

            if (!finished && pcState->conn->ok()) {
                try {
                    // Complete the call if only halfway done
                    bool retry = false;
                    pcState->cursor->initLazyFinish(retry);
                } catch (std::exception&) {
                    warning() << "exception closing cursor";
                } catch (...) {
                    warning() << "unknown exception closing cursor";
                }
            }
        }

        // Double-check conn is closed
        if (pcState->conn) {
            pcState->conn->done();
        }

        pcState.reset();
    } else
        verify(finished || !initialized);

    initialized = false;
    finished = false;
    completed = false;
    errored = false;
}
// Serializes the metadata flags (and nested connection state, when present) for
// diagnostic logging.
BSONObj ParallelConnectionMetadata::toBSON() const {
    BSONObjBuilder builder;
    builder.append("state", pcState ? pcState->toBSON() : BSONObj());
    builder.append("retryNext", retryNext);
    builder.append("init", initialized);
    builder.append("finish", finished);
    builder.append("errored", errored);
    return builder.obj();
}
// Human-readable form of toBSON(), used in log/diagnostic messages.
std::string ParallelConnectionState::toString() const {
    return str::stream() << "PCState : " << toBSON();
}
// Serializes this connection state for diagnostics: connection status, routing ("vinfo")
// information, a one-document cursor peek, and the consumption counters.
// (The original also built an unused "cursorPeek" object up front; that dead local has
// been removed - the peek below is what actually ends up in the output.)
BSONObj ParallelConnectionState::toBSON() const {
    BSONObj stateObj =
        BSON("conn" << (conn ? (conn->ok() ? conn->conn().toString() : "(done)") : "") << "vinfo"
                    << (manager ? (str::stream() << manager->getns().ns() << " @ "
                                                 << manager->getVersion().toString())
                                : primary->toString()));

    // Append cursor data if exists
    BSONObjBuilder stateB;
    stateB.appendElements(stateObj);

    if (!cursor)
        stateB.append("cursor", "(none)");
    else {
        vector<BSONObj> v;
        cursor->peek(v, 1);
        if (v.size() == 0)
            stateB.append("cursor", "(empty)");
        else
            stateB.append("cursor", v[0]);
    }

    stateB.append("count", count);
    stateB.append("done", done);

    return stateB.obj().getOwned();
}
// Rethrows a stale-config condition reported by the given cursor, whether it arrived as
// the ShardConfigStale result flag or (for commands) as a StaleConfig error code in the
// command response body.
void throwCursorStale(DBClientCursor* cursor) {
    invariant(cursor);

    if (cursor->hasResultFlag(ResultFlag_ShardConfigStale)) {
        BSONObj error;
        cursor->peekError(&error);
        uasserted(StaleConfigInfo::parseFromCommandError(error),
                  "query returned a stale config error");
    }

    if (NamespaceString(cursor->getns()).isCommand()) {
        // Commands that care about versioning (like the count or geoNear command) sometimes return
        // with the stale config error code, but don't set the ShardConfigStale result flag on the
        // cursor.
        //
        // TODO: Standardize stale config reporting.
        BSONObj res = cursor->peekFirst();
        auto status = getStatusFromCommandResult(res);
        if (status == ErrorCodes::StaleConfig) {
            uassertStatusOK(status.withContext("command returned a stale config error"));
        }
    }
}
} // namespace mongo