summaryrefslogtreecommitdiff
path: root/src/mongo/s/query/cluster_find.cpp
diff options
context:
space:
mode:
authorMathias Stearn <mathias@10gen.com>2018-01-08 17:42:04 -0500
committerMathias Stearn <mathias@10gen.com>2018-01-17 17:10:47 -0500
commit84c3f83421b262028081296e4c387d29691ea376 (patch)
tree802785c4a4cd2f6a2f7ffb8c2a2193f7c06e1313 /src/mongo/s/query/cluster_find.cpp
parent076a499621f7b423d6541dc96254e7c947c33413 (diff)
downloadmongo-84c3f83421b262028081296e4c387d29691ea376.tar.gz
SERVER-32586 Convert ResolvedView to use ErrorExtraInfo
Diffstat (limited to 'src/mongo/s/query/cluster_find.cpp')
-rw-r--r--src/mongo/s/query/cluster_find.cpp131
1 files changed, 47 insertions, 84 deletions
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index c9d473bfcf8..f8e7a69ed33 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -163,13 +163,12 @@ StatusWith<std::unique_ptr<QueryRequest>> transformQueryForShards(
return std::move(newQR);
}
-StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
- const CanonicalQuery& query,
- const ReadPreferenceSetting& readPref,
- ChunkManager* chunkManager,
- std::shared_ptr<Shard> primary,
- std::vector<BSONObj>* results,
- BSONObj* viewDefinition) {
+CursorId runQueryWithoutRetrying(OperationContext* opCtx,
+ const CanonicalQuery& query,
+ const ReadPreferenceSetting& readPref,
+ ChunkManager* chunkManager,
+ std::shared_ptr<Shard> primary,
+ std::vector<BSONObj>* results) {
auto shardRegistry = Grid::get(opCtx)->shardRegistry();
// Get the set of shards on which we will run the query.
@@ -187,11 +186,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
&shardIds);
for (auto id : shardIds) {
- auto shardStatus = shardRegistry->getShard(opCtx, id);
- if (!shardStatus.isOK()) {
- return shardStatus.getStatus();
- }
- shards.emplace_back(shardStatus.getValue());
+ shards.emplace_back(uassertStatusOK(shardRegistry->getShard(opCtx, id)));
}
}
@@ -236,12 +231,8 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
// Tailable cursors can't have a sort, which should have already been validated.
invariant(params.sort.isEmpty() || !query.getQueryRequest().isTailable());
- const auto qrToForward =
- transformQueryForShards(query.getQueryRequest(), appendGeoNearDistanceProjection);
- if (!qrToForward.isOK()) {
- return qrToForward.getStatus();
- }
-
+ const auto qrToForward = uassertStatusOK(
+ transformQueryForShards(query.getQueryRequest(), appendGeoNearDistanceProjection));
// Construct the find command that we will use to establish cursors, attaching the shardVersion.
std::vector<std::pair<ShardId, BSONObj>> requests;
@@ -249,7 +240,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
invariant(!shard->isConfig() || shard->getConnString().type() != ConnectionString::INVALID);
BSONObjBuilder cmdBuilder;
- qrToForward.getValue()->asFindCommand(&cmdBuilder);
+ qrToForward->asFindCommand(&cmdBuilder);
if (chunkManager) {
ChunkVersion version(chunkManager->getVersion(shard->getId()));
@@ -264,34 +255,21 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
// Establish the cursors with a consistent shardVersion across shards.
- auto swCursors = establishCursors(opCtx,
+ params.remotes = establishCursors(opCtx,
Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
query.nss(),
readPref,
requests,
- query.getQueryRequest().isAllowPartialResults(),
- viewDefinition);
- if (!swCursors.isOK()) {
- // Make sure a viewDefinition was set if the find was on a view.
- if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == swCursors.getStatus().code()) {
- if (!viewDefinition) {
- return {ErrorCodes::InternalError,
- str::stream() << "Missing resolved view definition, but remote returned "
- << ErrorCodes::errorString(swCursors.getStatus().code())};
- }
- }
- return swCursors.getStatus();
- }
+ query.getQueryRequest().isAllowPartialResults());
// Determine whether the cursor we may eventually register will be single- or multi-target.
- const auto cursorType = swCursors.getValue().size() > 1
+ const auto cursorType = params.remotes.size() > 1
? ClusterCursorManager::CursorType::MultiTarget
: ClusterCursorManager::CursorType::SingleTarget;
// Transfer the established cursors to a ClusterClientCursor.
- params.remotes = std::move(swCursors.getValue());
auto ccc = ClusterClientCursorImpl::make(
opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params));
@@ -301,13 +279,9 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
int bytesBuffered = 0;
while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) {
- auto next = ccc->next(RouterExecStage::ExecContext::kInitialFind);
-
- if (!next.isOK()) {
- return next.getStatus();
- }
+ auto next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind));
- if (next.getValue().isEOF()) {
+ if (next.isEOF()) {
// We reached end-of-stream. If the cursor is not tailable, then we mark it as
// exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even
// when we reach end-of-stream. However, if all the remote cursors are exhausted, there
@@ -318,7 +292,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
break;
}
- auto nextObj = *next.getValue().getResult();
+ auto nextObj = *next.getResult();
// If adding this object will cause us to exceed the message size limit, then we stash it
// for later.
@@ -351,28 +325,27 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx,
const auto cursorLifetime = query.getQueryRequest().isNoCursorTimeout()
? ClusterCursorManager::CursorLifetime::Immortal
: ClusterCursorManager::CursorLifetime::Mortal;
- return cursorManager->registerCursor(
- opCtx, ccc.releaseCursor(), query.nss(), cursorType, cursorLifetime);
+ return uassertStatusOK(cursorManager->registerCursor(
+ opCtx, ccc.releaseCursor(), query.nss(), cursorType, cursorLifetime));
}
} // namespace
const size_t ClusterFind::kMaxStaleConfigRetries = 10;
-StatusWith<CursorId> ClusterFind::runQuery(OperationContext* opCtx,
- const CanonicalQuery& query,
- const ReadPreferenceSetting& readPref,
- std::vector<BSONObj>* results,
- BSONObj* viewDefinition) {
+CursorId ClusterFind::runQuery(OperationContext* opCtx,
+ const CanonicalQuery& query,
+ const ReadPreferenceSetting& readPref,
+ std::vector<BSONObj>* results) {
invariant(results);
// Projection on the reserved sort key field is illegal in mongos.
if (query.getQueryRequest().getProj().hasField(ClusterClientCursorParams::kSortKeyField)) {
- return {ErrorCodes::BadValue,
- str::stream() << "Projection contains illegal field '"
- << ClusterClientCursorParams::kSortKeyField
- << "': "
- << query.getQueryRequest().getProj()};
+ uasserted(ErrorCodes::BadValue,
+ str::stream() << "Projection contains illegal field '"
+ << ClusterClientCursorParams::kSortKeyField
+ << "': "
+ << query.getQueryRequest().getProj());
}
auto const catalogCache = Grid::get(opCtx)->catalogCache();
@@ -385,43 +358,33 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* opCtx,
// If the database doesn't exist, we successfully return an empty result set without
// creating a cursor.
return CursorId(0);
- } else if (!routingInfoStatus.isOK()) {
- return routingInfoStatus.getStatus();
- }
-
- auto& routingInfo = routingInfoStatus.getValue();
-
- auto cursorId = runQueryWithoutRetrying(opCtx,
- query,
- readPref,
- routingInfo.cm().get(),
- routingInfo.primary(),
- results,
- viewDefinition);
- if (cursorId.isOK()) {
- return cursorId;
}
- const auto& status = cursorId.getStatus();
+ auto routingInfo = uassertStatusOK(routingInfoStatus);
- if (!ErrorCodes::isStaleShardingError(status.code()) &&
- status != ErrorCodes::ShardNotFound) {
- // Errors other than trying to reach a non existent shard or receiving a stale
- // metadata message from MongoD are fatal to the operation. Network errors and
- // replication retries happen at the level of the AsyncResultsMerger.
- return status;
- }
+ try {
+ return runQueryWithoutRetrying(
+ opCtx, query, readPref, routingInfo.cm().get(), routingInfo.primary(), results);
+ } catch (const DBException& ex) {
+ if (!ErrorCodes::isStaleShardingError(ex.code()) &&
+ ex.code() != ErrorCodes::ShardNotFound) {
+ // Errors other than trying to reach a non existent shard or receiving a stale
+ // metadata message from MongoD are fatal to the operation. Network errors and
+ // replication retries happen at the level of the AsyncResultsMerger.
+ throw;
+ }
- LOG(1) << "Received error status for query " << redact(query.toStringShort())
- << " on attempt " << retries << " of " << kMaxStaleConfigRetries << ": "
- << redact(status);
+ LOG(1) << "Received error status for query " << redact(query.toStringShort())
+ << " on attempt " << retries << " of " << kMaxStaleConfigRetries << ": "
+ << redact(ex);
- catalogCache->onStaleConfigError(std::move(routingInfo));
+ catalogCache->onStaleConfigError(std::move(routingInfo));
+ }
}
- return {ErrorCodes::StaleShardVersion,
- str::stream() << "Retried " << kMaxStaleConfigRetries
- << " times without successfully establishing shard version."};
+ uasserted(ErrorCodes::StaleShardVersion,
+ str::stream() << "Retried " << kMaxStaleConfigRetries
+ << " times without successfully establishing shard version.");
}
StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx,