summaryrefslogtreecommitdiff
path: root/src/mongo/s/client/parallel.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/client/parallel.cpp')
-rw-r--r--src/mongo/s/client/parallel.cpp238
1 files changed, 169 insertions, 69 deletions
diff --git a/src/mongo/s/client/parallel.cpp b/src/mongo/s/client/parallel.cpp
index fae0d1c7c47..83360c2f958 100644
--- a/src/mongo/s/client/parallel.cpp
+++ b/src/mongo/s/client/parallel.cpp
@@ -39,6 +39,7 @@
#include "mongo/client/replica_set_monitor.h"
#include "mongo/db/bson/dotted_path_support.h"
#include "mongo/db/query/query_request.h"
+#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard_connection.h"
@@ -376,9 +377,11 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(
if (sampler.tick()) {
const DBClientReplicaSet* repl = dynamic_cast<const DBClientReplicaSet*>(rawConn);
dassert(repl);
- warning() << "Primary for " << repl->getServerAddress()
- << " was down before, bypassing setShardVersion."
- << " The local replica set view and targeting may be stale.";
+ LOGV2_WARNING(
+ 22695,
+ "Primary for {repl_getServerAddress} was down before, bypassing "
+ "setShardVersion. The local replica set view and targeting may be stale.",
+ "repl_getServerAddress"_attr = repl->getServerAddress());
}
return;
@@ -387,8 +390,11 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(
try {
if (state->conn->setVersion()) {
- LOG(2) << "pcursor: needed to set remote version on connection to value "
- << "compatible with " << vinfo;
+ LOGV2_DEBUG(22678,
+ 2,
+ "pcursor: needed to set remote version on connection to value compatible "
+ "with {vinfo}",
+ "vinfo"_attr = vinfo);
}
} catch (const DBException& dbExcep) {
auto errCode = dbExcep.code();
@@ -403,9 +409,10 @@ void ParallelSortClusteredCursor::setupVersionAndHandleSlaveOk(
const DBClientReplicaSet* repl =
dynamic_cast<const DBClientReplicaSet*>(state->conn->getRawConn());
dassert(repl);
- warning() << "Cannot contact primary for " << repl->getServerAddress()
- << " to check shard version."
- << " The local replica set view and targeting may be stale.";
+ LOGV2_WARNING(22696,
+ "Cannot contact primary for {repl_getServerAddress} to check shard "
+ "version. The local replica set view and targeting may be stale.",
+ "repl_getServerAddress"_attr = repl->getServerAddress());
}
} else {
throw;
@@ -425,7 +432,12 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
prefix = "creating";
}
}
- LOG(2) << "pcursor: " << prefix << " pcursor over " << _qSpec << " and " << _cInfo;
+ LOGV2_DEBUG(22679,
+ 2,
+ "pcursor: {prefix} pcursor over {qSpec} and {cInfo}",
+ "prefix"_attr = prefix,
+ "qSpec"_attr = _qSpec,
+ "cInfo"_attr = _cInfo);
shared_ptr<ChunkManager> manager;
shared_ptr<Shard> primary;
@@ -470,14 +482,22 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
const auto& shardId = cmEntry.first;
if (shardIds.find(shardId) == shardIds.end()) {
- LOG(2) << "pcursor: closing cursor on shard " << shardId
- << " as the connection is no longer required by " << vinfo;
+ LOGV2_DEBUG(22680,
+ 2,
+ "pcursor: closing cursor on shard {shardId} as the connection is no longer "
+ "required by {vinfo}",
+ "shardId"_attr = shardId,
+ "vinfo"_attr = vinfo);
cmEntry.second.cleanup(true);
}
}
- LOG(2) << "pcursor: initializing over " << shardIds.size() << " shards required by " << vinfo;
+ LOGV2_DEBUG(22681,
+ 2,
+ "pcursor: initializing over {shardIds_size} shards required by {vinfo}",
+ "shardIds_size"_attr = shardIds.size(),
+ "vinfo"_attr = vinfo);
// Don't retry indefinitely for whatever reason
_totalTries++;
@@ -486,8 +506,11 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
for (const ShardId& shardId : shardIds) {
auto& mdata = _cursorMap[shardId];
- LOG(2) << "pcursor: initializing on shard " << shardId << ", current connection state is "
- << mdata.toBSON();
+ LOGV2_DEBUG(22682,
+ 2,
+ "pcursor: initializing on shard {shardId}, current connection state is {mdata}",
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON());
// This may be the first time connecting to this shard, if so we can get an error here
try {
@@ -500,11 +523,11 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
bool compatibleManager = true;
if (primary && !state->primary)
- warning() << "Collection becoming unsharded detected";
+ LOGV2_WARNING(22697, "Collection becoming unsharded detected");
if (manager && !state->manager)
- warning() << "Collection becoming sharded detected";
+ LOGV2_WARNING(22698, "Collection becoming sharded detected");
if (primary && state->primary && primary != state->primary)
- warning() << "Weird shift of primary detected";
+ LOGV2_WARNING(22699, "Weird shift of primary detected");
compatiblePrimary = primary && state->primary && primary == state->primary;
compatibleManager =
@@ -609,9 +632,14 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
mdata.finished = true;
}
- LOG(2) << "pcursor: initialized " << (isCommand() ? "command " : "query ")
- << (lazyInit ? "(lazily) " : "(full) ") << "on shard " << shardId
- << ", current connection state is " << mdata.toBSON();
+ LOGV2_DEBUG(22683,
+ 2,
+ "pcursor: initialized {isCommand_command_query}{lazyInit_lazily_full}on "
+ "shard {shardId}, current connection state is {mdata}",
+ "isCommand_command_query"_attr = (isCommand() ? "command " : "query "),
+ "lazyInit_lazily_full"_attr = (lazyInit ? "(lazily) " : "(full) "),
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON());
} catch (StaleConfigException& e) {
// Our version isn't compatible with the current version anymore on at least one shard,
// need to retry immediately
@@ -623,21 +651,32 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
->invalidateShardOrEntireCollectionEntryForShardedCollection(
opCtx, nss, e->getVersionWanted(), e->getVersionReceived(), e->getShardId());
- LOG(1) << "stale config of ns " << staleNS << " during initialization, will retry"
- << causedBy(redact(e));
+ LOGV2_DEBUG(
+ 22684,
+ 1,
+ "stale config of ns {staleNS} during initialization, will retry{causedBy_e}",
+ "staleNS"_attr = staleNS,
+ "causedBy_e"_attr = causedBy(redact(e)));
// This is somewhat strange
if (staleNS != nss) {
- warning() << "versioned ns " << nss.ns() << " doesn't match stale config namespace "
- << staleNS;
+ LOGV2_WARNING(
+ 22700,
+ "versioned ns {nss_ns} doesn't match stale config namespace {staleNS}",
+ "nss_ns"_attr = nss.ns(),
+ "staleNS"_attr = staleNS);
}
// Restart with new chunk manager
startInit(opCtx);
return;
} catch (NetworkException& e) {
- warning() << "socket exception when initializing on " << shardId
- << ", current connection state is " << mdata.toBSON() << causedBy(redact(e));
+ LOGV2_WARNING(22701,
+ "socket exception when initializing on {shardId}, current connection "
+ "state is {mdata}{causedBy_e}",
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON(),
+ "causedBy_e"_attr = causedBy(redact(e)));
mdata.errored = true;
if (returnPartial) {
mdata.cleanup(true);
@@ -645,8 +684,12 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
}
throw;
} catch (DBException& e) {
- warning() << "db exception when initializing on " << shardId
- << ", current connection state is " << mdata.toBSON() << causedBy(redact(e));
+ LOGV2_WARNING(22702,
+ "db exception when initializing on {shardId}, current connection state "
+ "is {mdata}{causedBy_e}",
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON(),
+ "causedBy_e"_attr = causedBy(redact(e)));
mdata.errored = true;
if (returnPartial && e.code() == 15925 /* From above! */) {
mdata.cleanup(true);
@@ -654,13 +697,20 @@ void ParallelSortClusteredCursor::startInit(OperationContext* opCtx) {
}
throw;
} catch (std::exception& e) {
- warning() << "exception when initializing on " << shardId
- << ", current connection state is " << mdata.toBSON() << causedBy(e);
+ LOGV2_WARNING(22703,
+ "exception when initializing on {shardId}, current connection state is "
+ "{mdata}{causedBy_e}",
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON(),
+ "causedBy_e"_attr = causedBy(e));
mdata.errored = true;
throw;
} catch (...) {
- warning() << "unknown exception when initializing on " << shardId
- << ", current connection state is " << mdata.toBSON();
+ LOGV2_WARNING(22704,
+ "unknown exception when initializing on {shardId}, current connection "
+ "state is {mdata}",
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON());
mdata.errored = true;
throw;
}
@@ -709,14 +759,20 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) {
bool retry = false;
map<string, StaleConfigException> staleNSExceptions;
- LOG(2) << "pcursor: finishing over " << _cursorMap.size() << " shards";
+ LOGV2_DEBUG(22685,
+ 2,
+ "pcursor: finishing over {cursorMap_size} shards",
+ "cursorMap_size"_attr = _cursorMap.size());
for (auto& cmEntry : _cursorMap) {
const auto& shardId = cmEntry.first;
auto& mdata = cmEntry.second;
- LOG(2) << "pcursor: finishing on shard " << shardId << ", current connection state is "
- << mdata.toBSON();
+ LOGV2_DEBUG(22686,
+ 2,
+ "pcursor: finishing on shard {shardId}, current connection state is {mdata}",
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON());
// Ignore empty conns for now
if (!mdata.pcState)
@@ -763,8 +819,12 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) {
// Finalize state
state->cursor->attach(state->conn.get()); // Closes connection for us
- LOG(2) << "pcursor: finished on shard " << shardId
- << ", current connection state is " << mdata.toBSON();
+ LOGV2_DEBUG(
+ 22687,
+ 2,
+ "pcursor: finished on shard {shardId}, current connection state is {mdata}",
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON());
}
} catch (StaleConfigException& e) {
retry = true;
@@ -778,8 +838,12 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) {
mdata.cleanup(true);
continue;
} catch (NetworkException& e) {
- warning() << "socket exception when finishing on " << shardId
- << ", current connection state is " << mdata.toBSON() << causedBy(redact(e));
+ LOGV2_WARNING(22705,
+ "socket exception when finishing on {shardId}, current connection state "
+ "is {mdata}{causedBy_e}",
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON(),
+ "causedBy_e"_attr = causedBy(redact(e)));
mdata.errored = true;
if (returnPartial) {
mdata.cleanup(true);
@@ -790,9 +854,12 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) {
// NOTE: RECV() WILL NOT THROW A SOCKET EXCEPTION - WE GET THIS AS ERROR 15988 FROM
// ABOVE
if (e.code() == 15988) {
- warning() << "exception when receiving data from " << shardId
- << ", current connection state is " << mdata.toBSON()
- << causedBy(redact(e));
+ LOGV2_WARNING(22706,
+ "exception when receiving data from {shardId}, current connection "
+ "state is {mdata}{causedBy_e}",
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON(),
+ "causedBy_e"_attr = causedBy(redact(e)));
mdata.errored = true;
if (returnPartial) {
@@ -804,23 +871,36 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) {
// the InvalidBSON exception indicates that the BSON is malformed ->
// don't print/call "mdata.toBSON()" to avoid unexpected errors e.g. a segfault
if (e.code() == ErrorCodes::InvalidBSON)
- warning() << "bson is malformed :: db exception when finishing on " << shardId
- << causedBy(redact(e));
+ LOGV2_WARNING(
+ 22707,
+ "bson is malformed :: db exception when finishing on {shardId}{causedBy_e}",
+ "shardId"_attr = shardId,
+ "causedBy_e"_attr = causedBy(redact(e)));
else
- warning() << "db exception when finishing on " << shardId
- << ", current connection state is " << mdata.toBSON()
- << causedBy(redact(e));
+ LOGV2_WARNING(22708,
+ "db exception when finishing on {shardId}, current connection "
+ "state is {mdata}{causedBy_e}",
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON(),
+ "causedBy_e"_attr = causedBy(redact(e)));
mdata.errored = true;
throw;
}
} catch (std::exception& e) {
- warning() << "exception when finishing on " << shardId
- << ", current connection state is " << mdata.toBSON() << causedBy(e);
+ LOGV2_WARNING(22709,
+ "exception when finishing on {shardId}, current connection state is "
+ "{mdata}{causedBy_e}",
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON(),
+ "causedBy_e"_attr = causedBy(e));
mdata.errored = true;
throw;
} catch (...) {
- warning() << "unknown exception when finishing on " << shardId
- << ", current connection state is " << mdata.toBSON();
+ LOGV2_WARNING(22710,
+ "unknown exception when finishing on {shardId}, current connection state "
+ "is {mdata}",
+ "shardId"_attr = shardId,
+ "mdata"_attr = mdata.toBSON());
mdata.errored = true;
throw;
}
@@ -844,13 +924,20 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) {
staleNS, *ex->getShardId());
}
- LOG(1) << "stale config of ns " << staleNS << " on finishing query, will retry"
- << causedBy(redact(ex));
+ LOGV2_DEBUG(
+ 22688,
+ 1,
+ "stale config of ns {staleNS} on finishing query, will retry{causedBy_ex}",
+ "staleNS"_attr = staleNS,
+ "causedBy_ex"_attr = causedBy(redact(ex)));
// This is somewhat strange
if (staleNS.ns() != ns) {
- warning() << "versioned ns " << ns << " doesn't match stale config namespace "
- << staleNS;
+ LOGV2_WARNING(
+ 22711,
+ "versioned ns {ns} doesn't match stale config namespace {staleNS}",
+ "ns"_attr = ns,
+ "staleNS"_attr = staleNS);
}
}
}
@@ -867,7 +954,7 @@ void ParallelSortClusteredCursor::finishInit(OperationContext* opCtx) {
// Erase empty stuff
if (!mdata.pcState) {
- log() << "PCursor erasing empty state " << mdata.toBSON();
+ LOGV2(22689, "PCursor erasing empty state {mdata}", "mdata"_attr = mdata.toBSON());
_cursorMap.erase(i++);
continue;
} else {
@@ -957,13 +1044,16 @@ void ParallelSortClusteredCursor::_oldInit(OperationContext* opCtx) {
bool firstPass = retryQueries.size() == 0;
if (!firstPass) {
- log() << "retrying " << (returnPartial ? "(partial) " : "")
- << "parallel connection to ";
+ LOGV2(22690,
+ "retrying {returnPartial_partial}parallel connection to ",
+ "returnPartial_partial"_attr = (returnPartial ? "(partial) " : ""));
for (set<int>::const_iterator it = retryQueries.begin(); it != retryQueries.end();
++it) {
- log() << serverHosts[*it] << ", ";
+ LOGV2(22691, "{serverHosts_it}, ", "serverHosts_it"_attr = serverHosts[*it]);
}
- log() << finishedQueries << " finished queries.";
+ LOGV2(22692,
+ "{finishedQueries} finished queries.",
+ "finishedQueries"_attr = finishedQueries);
}
size_t num = 0;
@@ -1008,9 +1098,15 @@ void ParallelSortClusteredCursor::_oldInit(OperationContext* opCtx) {
break;
}
- LOG(5) << "ParallelSortClusteredCursor::init server:" << serverHost << " ns:" << _ns
- << " query:" << redact(_query) << " fields:" << redact(_fields)
- << " options: " << _options;
+ LOGV2_DEBUG(22693,
+ 5,
+ "ParallelSortClusteredCursor::init server:{serverHost} ns:{ns} "
+ "query:{query} fields:{fields} options: {options}",
+ "serverHost"_attr = serverHost,
+ "ns"_attr = _ns,
+ "query"_attr = redact(_query),
+ "fields"_attr = redact(_fields),
+ "options"_attr = _options);
if (!_cursors[i].get())
_cursors[i].reset(
@@ -1061,8 +1157,10 @@ void ParallelSortClusteredCursor::_oldInit(OperationContext* opCtx) {
try {
if (!_cursors[i].get()->initLazyFinish(retry)) {
- warning() << "invalid result from " << conns[i]->getHost()
- << (retry ? ", retrying" : "");
+ LOGV2_WARNING(22712,
+ "invalid result from {conns_i_getHost}{retry_retrying}",
+ "conns_i_getHost"_attr = conns[i]->getHost(),
+ "retry_retrying"_attr = (retry ? ", retrying" : ""));
_cursors[i].reset(nullptr, nullptr);
if (!retry) {
@@ -1166,12 +1264,14 @@ void ParallelSortClusteredCursor::_oldInit(OperationContext* opCtx) {
} else if (throwException) {
uasserted(14827, errMsg.str());
} else {
- warning() << redact(errMsg.str());
+ LOGV2_WARNING(22713, "{errMsg_str}", "errMsg_str"_attr = redact(errMsg.str()));
}
}
if (retries > 0)
- log() << "successfully finished parallel query after " << retries << " retries";
+ LOGV2(22694,
+ "successfully finished parallel query after {retries} retries",
+ "retries"_attr = retries);
}
bool ParallelSortClusteredCursor::more() {
@@ -1264,9 +1364,9 @@ void ParallelConnectionMetadata::cleanup(bool full) {
bool retry = false;
pcState->cursor->initLazyFinish(retry);
} catch (std::exception&) {
- warning() << "exception closing cursor";
+ LOGV2_WARNING(22714, "exception closing cursor");
} catch (...) {
- warning() << "unknown exception closing cursor";
+ LOGV2_WARNING(22715, "unknown exception closing cursor");
}
}
}