diff options
author | Mathias Stearn <mathias@10gen.com> | 2018-01-08 17:42:04 -0500 |
---|---|---|
committer | Mathias Stearn <mathias@10gen.com> | 2018-01-17 17:10:47 -0500 |
commit | 84c3f83421b262028081296e4c387d29691ea376 (patch) | |
tree | 802785c4a4cd2f6a2f7ffb8c2a2193f7c06e1313 /src/mongo/s/query/cluster_find.cpp | |
parent | 076a499621f7b423d6541dc96254e7c947c33413 (diff) | |
download | mongo-84c3f83421b262028081296e4c387d29691ea376.tar.gz |
SERVER-32586 Convert ResolvedView to use ErrorExtraInfo
Diffstat (limited to 'src/mongo/s/query/cluster_find.cpp')
-rw-r--r-- | src/mongo/s/query/cluster_find.cpp | 131 |
1 files changed, 47 insertions, 84 deletions
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index c9d473bfcf8..f8e7a69ed33 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -163,13 +163,12 @@ StatusWith<std::unique_ptr<QueryRequest>> transformQueryForShards( return std::move(newQR); } -StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, - const CanonicalQuery& query, - const ReadPreferenceSetting& readPref, - ChunkManager* chunkManager, - std::shared_ptr<Shard> primary, - std::vector<BSONObj>* results, - BSONObj* viewDefinition) { +CursorId runQueryWithoutRetrying(OperationContext* opCtx, + const CanonicalQuery& query, + const ReadPreferenceSetting& readPref, + ChunkManager* chunkManager, + std::shared_ptr<Shard> primary, + std::vector<BSONObj>* results) { auto shardRegistry = Grid::get(opCtx)->shardRegistry(); // Get the set of shards on which we will run the query. @@ -187,11 +186,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, &shardIds); for (auto id : shardIds) { - auto shardStatus = shardRegistry->getShard(opCtx, id); - if (!shardStatus.isOK()) { - return shardStatus.getStatus(); - } - shards.emplace_back(shardStatus.getValue()); + shards.emplace_back(uassertStatusOK(shardRegistry->getShard(opCtx, id))); } } @@ -236,12 +231,8 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, // Tailable cursors can't have a sort, which should have already been validated. invariant(params.sort.isEmpty() || !query.getQueryRequest().isTailable()); - const auto qrToForward = - transformQueryForShards(query.getQueryRequest(), appendGeoNearDistanceProjection); - if (!qrToForward.isOK()) { - return qrToForward.getStatus(); - } - + const auto qrToForward = uassertStatusOK( + transformQueryForShards(query.getQueryRequest(), appendGeoNearDistanceProjection)); // Construct the find command that we will use to establish cursors, attaching the shardVersion. std::vector<std::pair<ShardId, BSONObj>> requests; @@ -249,7 +240,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, invariant(!shard->isConfig() || shard->getConnString().type() != ConnectionString::INVALID); BSONObjBuilder cmdBuilder; - qrToForward.getValue()->asFindCommand(&cmdBuilder); + qrToForward->asFindCommand(&cmdBuilder); if (chunkManager) { ChunkVersion version(chunkManager->getVersion(shard->getId())); @@ -264,34 +255,21 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, // Establish the cursors with a consistent shardVersion across shards. - auto swCursors = establishCursors(opCtx, + params.remotes = establishCursors(opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), query.nss(), readPref, requests, - query.getQueryRequest().isAllowPartialResults(), - viewDefinition); - if (!swCursors.isOK()) { - // Make sure a viewDefinition was set if the find was on a view. - if (ErrorCodes::CommandOnShardedViewNotSupportedOnMongod == swCursors.getStatus().code()) { - if (!viewDefinition) { - return {ErrorCodes::InternalError, - str::stream() << "Missing resolved view definition, but remote returned " - << ErrorCodes::errorString(swCursors.getStatus().code())}; - } - } - return swCursors.getStatus(); - } + query.getQueryRequest().isAllowPartialResults()); // Determine whether the cursor we may eventually register will be single- or multi-target. - const auto cursorType = swCursors.getValue().size() > 1 + const auto cursorType = params.remotes.size() > 1 ? ClusterCursorManager::CursorType::MultiTarget : ClusterCursorManager::CursorType::SingleTarget; // Transfer the established cursors to a ClusterClientCursor. - params.remotes = std::move(swCursors.getValue()); auto ccc = ClusterClientCursorImpl::make( opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), std::move(params)); @@ -301,13 +279,9 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, int bytesBuffered = 0; while (!FindCommon::enoughForFirstBatch(query.getQueryRequest(), results->size())) { - auto next = ccc->next(RouterExecStage::ExecContext::kInitialFind); - - if (!next.isOK()) { - return next.getStatus(); - } + auto next = uassertStatusOK(ccc->next(RouterExecStage::ExecContext::kInitialFind)); - if (next.getValue().isEOF()) { + if (next.isEOF()) { // We reached end-of-stream. If the cursor is not tailable, then we mark it as // exhausted. If it is tailable, usually we keep it open (i.e. "NotExhausted") even // when we reach end-of-stream. However, if all the remote cursors are exhausted, there @@ -318,7 +292,7 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, break; } - auto nextObj = *next.getValue().getResult(); + auto nextObj = *next.getResult(); // If adding this object will cause us to exceed the message size limit, then we stash it // for later. @@ -351,28 +325,27 @@ StatusWith<CursorId> runQueryWithoutRetrying(OperationContext* opCtx, const auto cursorLifetime = query.getQueryRequest().isNoCursorTimeout() ? ClusterCursorManager::CursorLifetime::Immortal : ClusterCursorManager::CursorLifetime::Mortal; - return cursorManager->registerCursor( - opCtx, ccc.releaseCursor(), query.nss(), cursorType, cursorLifetime); + return uassertStatusOK(cursorManager->registerCursor( + opCtx, ccc.releaseCursor(), query.nss(), cursorType, cursorLifetime)); } } // namespace const size_t ClusterFind::kMaxStaleConfigRetries = 10; -StatusWith<CursorId> ClusterFind::runQuery(OperationContext* opCtx, - const CanonicalQuery& query, - const ReadPreferenceSetting& readPref, - std::vector<BSONObj>* results, - BSONObj* viewDefinition) { +CursorId ClusterFind::runQuery(OperationContext* opCtx, + const CanonicalQuery& query, + const ReadPreferenceSetting& readPref, + std::vector<BSONObj>* results) { invariant(results); // Projection on the reserved sort key field is illegal in mongos. if (query.getQueryRequest().getProj().hasField(ClusterClientCursorParams::kSortKeyField)) { - return {ErrorCodes::BadValue, - str::stream() << "Projection contains illegal field '" - << ClusterClientCursorParams::kSortKeyField - << "': " - << query.getQueryRequest().getProj()}; + uasserted(ErrorCodes::BadValue, + str::stream() << "Projection contains illegal field '" + << ClusterClientCursorParams::kSortKeyField + << "': " + << query.getQueryRequest().getProj()); } auto const catalogCache = Grid::get(opCtx)->catalogCache(); @@ -385,43 +358,33 @@ StatusWith<CursorId> ClusterFind::runQuery(OperationContext* opCtx, // If the database doesn't exist, we successfully return an empty result set without // creating a cursor. return CursorId(0); - } else if (!routingInfoStatus.isOK()) { - return routingInfoStatus.getStatus(); - } - - auto& routingInfo = routingInfoStatus.getValue(); - - auto cursorId = runQueryWithoutRetrying(opCtx, - query, - readPref, - routingInfo.cm().get(), - routingInfo.primary(), - results, - viewDefinition); - if (cursorId.isOK()) { - return cursorId; } - const auto& status = cursorId.getStatus(); + auto routingInfo = uassertStatusOK(routingInfoStatus); - if (!ErrorCodes::isStaleShardingError(status.code()) && - status != ErrorCodes::ShardNotFound) { - // Errors other than trying to reach a non existent shard or receiving a stale - // metadata message from MongoD are fatal to the operation. Network errors and - // replication retries happen at the level of the AsyncResultsMerger. - return status; - } + try { + return runQueryWithoutRetrying( + opCtx, query, readPref, routingInfo.cm().get(), routingInfo.primary(), results); + } catch (const DBException& ex) { + if (!ErrorCodes::isStaleShardingError(ex.code()) && + ex.code() != ErrorCodes::ShardNotFound) { + // Errors other than trying to reach a non existent shard or receiving a stale + // metadata message from MongoD are fatal to the operation. Network errors and + // replication retries happen at the level of the AsyncResultsMerger. + throw; + } - LOG(1) << "Received error status for query " << redact(query.toStringShort()) - << " on attempt " << retries << " of " << kMaxStaleConfigRetries << ": " - << redact(status); + LOG(1) << "Received error status for query " << redact(query.toStringShort()) + << " on attempt " << retries << " of " << kMaxStaleConfigRetries << ": " + << redact(ex); - catalogCache->onStaleConfigError(std::move(routingInfo)); + catalogCache->onStaleConfigError(std::move(routingInfo)); + } } - return {ErrorCodes::StaleShardVersion, - str::stream() << "Retried " << kMaxStaleConfigRetries - << " times without successfully establishing shard version."}; + uasserted(ErrorCodes::StaleShardVersion, + str::stream() << "Retried " << kMaxStaleConfigRetries + << " times without successfully establishing shard version."); } StatusWith<CursorResponse> ClusterFind::runGetMore(OperationContext* opCtx, |