summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/mongos_process_interface.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/mongos_process_interface.cpp')
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.cpp90
1 files changed, 41 insertions, 49 deletions
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index bffbe6e5b75..6a38971e08e 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -61,25 +61,6 @@ using std::unique_ptr;
namespace {
/**
- * Determines the single shard to which the given query will be targeted, and its associated
- * shardVersion. Throws if the query targets more than one shard.
- */
-std::pair<ShardId, ChunkVersion> getSingleTargetedShardForQuery(
- OperationContext* opCtx, const CachedCollectionRoutingInfo& routingInfo, BSONObj query) {
- if (auto chunkMgr = routingInfo.cm()) {
- std::set<ShardId> shardIds;
- chunkMgr->getShardIdsForQuery(opCtx, query, CollationSpec::kSimpleSpec, &shardIds);
- uassert(ErrorCodes::ChangeStreamFatalError,
- str::stream() << "Unable to target lookup query to a single shard: "
- << query.toString(),
- shardIds.size() == 1u);
- return {*shardIds.begin(), chunkMgr->getVersion(*shardIds.begin())};
- }
-
- return {routingInfo.db().primaryId(), ChunkVersion::UNSHARDED()};
-}
-
-/**
* Returns the routing information for the namespace set on the passed ExpressionContext. Also
* verifies that the ExpressionContext's UUID, if present, matches that of the routing table entry.
*/
@@ -152,8 +133,7 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument(
bool allowSpeculativeMajorityRead) {
auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID);
- // Create the find command to be dispatched to the shard in order to return the post-change
- // document.
+ // Create the find command to be dispatched to the shard(s) in order to return the post-image.
auto filterObj = filter.toBson();
BSONObjBuilder cmdBuilder;
bool findCmdIsByUuid(foreignExpCtx->uuid);
@@ -171,17 +151,20 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument(
cmdBuilder.append("allowSpeculativeMajorityRead", true);
}
- auto shardResult = std::vector<RemoteCursor>();
+ auto shardResults = std::vector<RemoteCursor>();
auto findCmd = cmdBuilder.obj();
size_t numAttempts = 0;
while (++numAttempts <= kMaxNumStaleVersionRetries) {
- // Verify that the collection exists, with the correct UUID.
+ // Obtain the catalog cache. If we are retrying after a stale shard error, mark this
+ // operation as needing to block on the next catalog cache refresh.
auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx);
if (swRoutingInfo == ErrorCodes::NamespaceNotFound) {
return boost::none;
}
auto routingInfo = uassertStatusOK(std::move(swRoutingInfo));
+
+ // Finalize the 'find' command object based on the routing table information.
if (findCmdIsByUuid && routingInfo.cm()) {
// Find by UUID and shard versioning do not work together (SERVER-31946). In the
// sharded case we've already checked the UUID, so find by namespace is safe. In the
@@ -192,25 +175,31 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument(
findCmdIsByUuid = false;
}
- // Get the ID and version of the single shard to which this query will be sent.
- auto shardInfo = getSingleTargetedShardForQuery(expCtx->opCtx, routingInfo, filterObj);
-
- // Dispatch the request. This will only be sent to a single shard and only a single result
- // will be returned. The 'establishCursors' method conveniently prepares the result into a
- // cursor response for us.
try {
- shardResult = establishCursors(
+ // Build the versioned requests to be dispatched to the shards. Typically, only a single
+ // shard will be targeted here; however, in certain cases where only the _id is present,
+ // we may need to scatter-gather the query to all shards in order to find the document.
+ auto requests = getVersionedRequestsForTargetedShards(
+ expCtx->opCtx, nss, routingInfo, findCmd, filterObj, CollationSpec::kSimpleSpec);
+
+ // Dispatch the requests. The 'establishCursors' method conveniently prepares the result
+ // into a vector of cursor responses for us.
+ shardResults = establishCursors(
expCtx->opCtx,
Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
nss,
ReadPreferenceSetting::get(expCtx->opCtx),
- {{shardInfo.first, appendShardVersion(findCmd, shardInfo.second)}},
+ std::move(requests),
false);
break;
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
// If it's an unsharded collection which has been deleted and re-created, we may get a
// NamespaceNotFound error when looking up by UUID.
return boost::none;
+ } catch (const ExceptionFor<ErrorCodes::StaleDbVersion>&) {
+ // If the database version is stale, refresh its entry in the catalog cache.
+ catalogCache->onStaleDatabaseVersion(nss.db(), routingInfo.db().databaseVersion());
+ continue; // Try again if allowed.
} catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>&) {
// If we hit a stale shardVersion exception, invalidate the routing table cache.
catalogCache->onStaleShardVersion(std::move(routingInfo));
@@ -219,25 +208,28 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument(
break; // Success!
}
- invariant(shardResult.size() == 1u);
-
- auto& cursor = shardResult.front().getCursorResponse();
- auto& batch = cursor.getBatch();
-
- // We should have at most 1 result, and the cursor should be exhausted.
- uassert(ErrorCodes::ChangeStreamFatalError,
- str::stream() << "Shard cursor was unexpectedly open after lookup: "
- << shardResult.front().getHostAndPort()
- << ", id: " << cursor.getCursorId(),
- cursor.getCursorId() == 0);
- uassert(ErrorCodes::ChangeStreamFatalError,
- str::stream() << "found more than one document matching " << filter.toString() << " ["
- << batch.begin()->toString() << ", "
- << std::next(batch.begin())->toString() << "]",
- batch.size() <= 1u);
+ // Iterate all shard results and build a single composite batch. We also enforce the requirement
+ // that only a single document should have been returned from across the cluster.
+ std::vector<BSONObj> finalBatch;
+ for (auto&& shardResult : shardResults) {
+ auto& shardCursor = shardResult.getCursorResponse();
+ finalBatch.insert(
+ finalBatch.end(), shardCursor.getBatch().begin(), shardCursor.getBatch().end());
+ // We should have at most 1 result, and the cursor should be exhausted.
+ uassert(ErrorCodes::ChangeStreamFatalError,
+ str::stream() << "Shard cursor was unexpectedly open after lookup: "
+ << shardResult.getHostAndPort()
+ << ", id: " << shardCursor.getCursorId(),
+ shardCursor.getCursorId() == 0);
+ uassert(ErrorCodes::ChangeStreamFatalError,
+ str::stream() << "found more than one document matching " << filter.toString()
+ << " [" << finalBatch.begin()->toString() << ", "
+ << std::next(finalBatch.begin())->toString() << "]",
+ finalBatch.size() <= 1u);
+ }
- return (!batch.empty() ? Document(batch.front()) : boost::optional<Document>{});
-}
+ return (!finalBatch.empty() ? Document(finalBatch.front()) : boost::optional<Document>{});
+} // namespace mongo
BSONObj MongoSInterface::_reportCurrentOpForClient(OperationContext* opCtx,
Client* client,