diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2019-12-29 18:12:09 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-01-25 03:47:19 +0000 |
commit | 6c45478fbdc994353541a0f05ff202cedf251d7a (patch) | |
tree | 88c318fe11b559428318fa04f6a581a0943a0d55 /src/mongo/db | |
parent | e192a049f15905b65b36b485a47d85cdee8e80e2 (diff) | |
download | mongo-6c45478fbdc994353541a0f05ff202cedf251d7a.tar.gz |
SERVER-44484 Allow change stream update lookup to retrieve post-image by _id
create mode 100644 jstests/sharding/change_streams_unsharded_update_resume.js
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/pipeline/mongos_process_interface.cpp | 111 |
1 files changed, 50 insertions, 61 deletions
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp index 6520a94e035..de70e39372e 100644 --- a/src/mongo/db/pipeline/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/mongos_process_interface.cpp @@ -61,25 +61,6 @@ using std::unique_ptr; namespace { /** - * Determines the single shard to which the given query will be targeted, and its associated - * shardVersion. Throws if the query targets more than one shard. - */ -std::pair<ShardId, ChunkVersion> getSingleTargetedShardForQuery( - OperationContext* opCtx, const CachedCollectionRoutingInfo& routingInfo, BSONObj query) { - if (auto chunkMgr = routingInfo.cm()) { - std::set<ShardId> shardIds; - chunkMgr->getShardIdsForQuery(opCtx, query, CollationSpec::kSimpleSpec, &shardIds); - uassert(ErrorCodes::ChangeStreamFatalError, - str::stream() << "Unable to target lookup query to a single shard: " - << query.toString(), - shardIds.size() == 1u); - return {*shardIds.begin(), chunkMgr->getVersion(*shardIds.begin())}; - } - - return {routingInfo.db().primaryId(), ChunkVersion::UNSHARDED()}; -} - -/** * Returns the routing information for the namespace set on the passed ExpressionContext. Also * verifies that the ExpressionContext's UUID, if present, matches that of the routing table entry. */ @@ -152,8 +133,7 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument( bool allowSpeculativeMajorityRead) { auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID); - // Create the find command to be dispatched to the shard in order to return the post-change - // document. + // Create the find command to be dispatched to the shard(s) in order to return the post-image. auto filterObj = filter.toBson(); BSONObjBuilder cmdBuilder; bool findCmdIsByUuid(foreignExpCtx->uuid); @@ -170,17 +150,22 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument( cmdBuilder.append("allowSpeculativeMajorityRead", true); } - auto shardResult = std::vector<RemoteCursor>(); + auto shardResults = std::vector<RemoteCursor>(); auto findCmd = cmdBuilder.obj(); - size_t numAttempts = 0; - while (++numAttempts <= kMaxNumStaleVersionRetries) { - // Verify that the collection exists, with the correct UUID. + for (size_t numRetries = 0; numRetries <= kMaxNumStaleVersionRetries; ++numRetries) { + // Obtain the catalog cache. If we are retrying after a stale shard error, mark this + // operation as needing to block on the next catalog cache refresh. auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache(); + catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(expCtx->opCtx, numRetries); + + // Verify that the collection exists, with the correct UUID. auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx); if (swRoutingInfo == ErrorCodes::NamespaceNotFound) { return boost::none; } auto routingInfo = uassertStatusOK(std::move(swRoutingInfo)); + + // Finalize the 'find' command object based on the routing table information. if (findCmdIsByUuid && routingInfo.cm()) { // Find by UUID and shard versioning do not work together (SERVER-31946). In the // sharded case we've already checked the UUID, so find by namespace is safe. In the @@ -191,62 +176,66 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument( findCmdIsByUuid = false; } - // Dispatch the request. This will only be sent to a single shard and only a single result - // will be returned. The 'establishCursors' method conveniently prepares the result into a - // cursor response for us. try { - // Get the ID and version of the single shard to which this query will be sent. - auto shardInfo = getSingleTargetedShardForQuery(expCtx->opCtx, routingInfo, filterObj); - - shardResult = establishCursors( + // Build the versioned requests to be dispatched to the shards. Typically, only a single + // shard will be targeted here; however, in certain cases where only the _id is present, + // we may need to scatter-gather the query to all shards in order to find the document. + auto requests = getVersionedRequestsForTargetedShards( + expCtx->opCtx, nss, routingInfo, findCmd, filterObj, CollationSpec::kSimpleSpec); + + // Dispatch the requests. The 'establishCursors' method conveniently prepares the result + // into a vector of cursor responses for us. + shardResults = establishCursors( expCtx->opCtx, Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(), nss, ReadPreferenceSetting::get(expCtx->opCtx), - {{shardInfo.first, appendShardVersion(findCmd, shardInfo.second)}}, + std::move(requests), false); - break; } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { // If it's an unsharded collection which has been deleted and re-created, we may get a // NamespaceNotFound error when looking up by UUID. return boost::none; + } catch (const ExceptionFor<ErrorCodes::StaleDbVersion>&) { + // If the database version is stale, refresh its entry in the catalog cache. + catalogCache->onStaleDatabaseVersion(nss.db(), routingInfo.db().databaseVersion()); + continue; // Try again if allowed. } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& e) { - if (auto staleInfo = e.extraInfo<StaleConfigInfo>()) { - catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection( - expCtx->opCtx, - nss, - staleInfo->getVersionWanted(), - staleInfo->getVersionReceived(), - staleInfo->getShardId()); + // If the exception provides a shardId, add it to the set of shards requiring a refresh. + // If the cache currently considers the collection to be unsharded, this will trigger an + // epoch refresh. If no shard is provided, then the epoch is stale and we must refresh. + auto staleInfo = e.extraInfo<StaleConfigInfo>(); + if (auto staleShardId = (staleInfo ? staleInfo->getShardId() : boost::none)) { + catalogCache->onStaleShardVersion(std::move(routingInfo), *staleShardId); } else { catalogCache->onEpochChange(nss); } - - catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(expCtx->opCtx, true); - continue; // Try again if allowed. } break; // Success! } - invariant(shardResult.size() == 1u); - - auto& cursor = shardResult.front().getCursorResponse(); - auto& batch = cursor.getBatch(); - - // We should have at most 1 result, and the cursor should be exhausted. - uassert(ErrorCodes::ChangeStreamFatalError, - str::stream() << "Shard cursor was unexpectedly open after lookup: " - << shardResult.front().getHostAndPort() - << ", id: " << cursor.getCursorId(), - cursor.getCursorId() == 0); - uassert(ErrorCodes::ChangeStreamFatalError, - str::stream() << "found more than one document matching " << filter.toString() << " [" - << batch.begin()->toString() << ", " - << std::next(batch.begin())->toString() << "]", - batch.size() <= 1u); + // Iterate all shard results and build a single composite batch. We also enforce the requirement + // that only a single document should have been returned from across the cluster. + std::vector<BSONObj> finalBatch; + for (auto&& shardResult : shardResults) { + auto& shardCursor = shardResult.getCursorResponse(); + finalBatch.insert( + finalBatch.end(), shardCursor.getBatch().begin(), shardCursor.getBatch().end()); + // We should have at most 1 result, and the cursor should be exhausted. + uassert(ErrorCodes::ChangeStreamFatalError, + str::stream() << "Shard cursor was unexpectedly open after lookup: " + << shardResult.getHostAndPort() + << ", id: " << shardCursor.getCursorId(), + shardCursor.getCursorId() == 0); + uassert(ErrorCodes::ChangeStreamFatalError, + str::stream() << "found more than one document matching " << filter.toString() + << " [" << finalBatch.begin()->toString() << ", " + << std::next(finalBatch.begin())->toString() << "]", + finalBatch.size() <= 1u); + } - return (!batch.empty() ? Document(batch.front()) : boost::optional<Document>{}); + return (!finalBatch.empty() ? Document(finalBatch.front()) : boost::optional<Document>{}); } BSONObj MongoSInterface::_reportCurrentOpForClient(OperationContext* opCtx, |