From 04a653db3c1734b2e7d8cb612e56f303115be4e3 Mon Sep 17 00:00:00 2001
From: Rishab Joshi
Date: Fri, 3 Dec 2021 16:54:58 +0000
Subject: SERVER-44484 Changestream with updateLookup uasserts on updates from before collection was sharded.

---
 .../change_streams_unsharded_update_resume.js       | 63 +++++++++++++++
 src/mongo/db/pipeline/mongos_process_interface.cpp  | 90 ++++++++++------
 src/mongo/s/cluster_commands_helpers.cpp            | 20 +++++
 src/mongo/s/cluster_commands_helpers.h              | 12 +++
 4 files changed, 136 insertions(+), 49 deletions(-)
 create mode 100644 jstests/sharding/change_streams_unsharded_update_resume.js

diff --git a/jstests/sharding/change_streams_unsharded_update_resume.js b/jstests/sharding/change_streams_unsharded_update_resume.js
new file mode 100644
index 00000000000..1b4cad51941
--- /dev/null
+++ b/jstests/sharding/change_streams_unsharded_update_resume.js
@@ -0,0 +1,63 @@
+/**
+ * Tests that the post-image of an update which occurred while the collection was unsharded can
+ * still be looked up after the collection becomes sharded. Exercises the fix for SERVER-44484.
+ * @tags: [uses_change_streams, requires_fcv_44]
+ */
+(function() {
+"use strict";
+
+// Start a new sharded cluster with 2 nodes and obtain references to the test DB and collection.
+const st = new ShardingTest({
+    shards: 2,
+    mongos: 1,
+    rs: {nodes: 1, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}}
+});
+
+const mongosDB = st.s0.getDB(jsTestName());
+const mongosColl = mongosDB.test;
+
+// Open a change stream on the unsharded collection.
+let csCur = mongosColl.watch([], {fullDocument: "updateLookup"});
+
+// Insert one document with a known _id into the unsharded collection, and obtain its resume token.
+assert.commandWorked(mongosColl.insert({_id: 0, a: -100}));
+assert.soon(() => csCur.hasNext());
+const insertEvent = csCur.next();
+assert.eq(insertEvent.operationType, "insert");
+
+// Update the document and confirm that we can see the updateLookup fullDocument.
+assert.commandWorked(mongosColl.update({_id: 0}, {$set: {updated: true}}));
+assert.soon(() => csCur.hasNext());
+let updateEvent = csCur.next();
+assert.eq(updateEvent.operationType, "update");
+assert.docEq(updateEvent.fullDocument, {_id: 0, a: -100, updated: true});
+
+// Now shard the collection on {a: 1} and move the upper chunk to the other shard.
+assert.commandWorked(mongosColl.createIndex({a: 1}));
+st.shardColl(mongosColl, {a: 1}, {a: 0}, {a: 0});
+
+// Resume a change stream just after the initial insert. We expect the update lookup to succeed,
+// despite the fact that only the _id and not the entire documentKey was recorded in the oplog.
+csCur = mongosColl.watch([], {resumeAfter: insertEvent._id, fullDocument: "updateLookup"});
+assert.soon(() => csCur.hasNext());
+updateEvent = csCur.next();
+assert.eq(updateEvent.operationType, "update");
+assert.docEq(updateEvent.fullDocument, {_id: 0, a: -100, updated: true});
+
+// Insert a second document with the same _id on the second shard.
+assert.commandWorked(mongosColl.insert({_id: 0, a: 100}));
+
+// Now that two documents with the same _id are present, the update lookup fails.
+csCur = mongosColl.watch([], {resumeAfter: insertEvent._id, fullDocument: "updateLookup"});
+assert.soon(() => {
+    try {
+        assert.eq(csCur.hasNext(), false, () => tojson(csCur.next()));
+        return false;
+    } catch (ex) {
+        assert.eq(ex.code, ErrorCodes.ChangeStreamFatalError);
+        return true;
+    }
+});
+
+st.stop();
+})();
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index bffbe6e5b75..6a38971e08e 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -60,25 +60,6 @@ using std::unique_ptr;
 
 namespace {
 
-/**
- * Determines the single shard to which the given query will be targeted, and its associated
- * shardVersion. Throws if the query targets more than one shard.
- */
-std::pair<ShardId, ChunkVersion> getSingleTargetedShardForQuery(
-    OperationContext* opCtx, const CachedCollectionRoutingInfo& routingInfo, BSONObj query) {
-    if (auto chunkMgr = routingInfo.cm()) {
-        std::set<ShardId> shardIds;
-        chunkMgr->getShardIdsForQuery(opCtx, query, CollationSpec::kSimpleSpec, &shardIds);
-        uassert(ErrorCodes::ChangeStreamFatalError,
-                str::stream() << "Unable to target lookup query to a single shard: "
-                              << query.toString(),
-                shardIds.size() == 1u);
-        return {*shardIds.begin(), chunkMgr->getVersion(*shardIds.begin())};
-    }
-
-    return {routingInfo.db().primaryId(), ChunkVersion::UNSHARDED()};
-}
-
 /**
  * Returns the routing information for the namespace set on the passed ExpressionContext. Also
  * verifies that the ExpressionContext's UUID, if present, matches that of the routing table entry.
@@ -152,8 +133,7 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument(
     bool allowSpeculativeMajorityRead) {
     auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID);
 
-    // Create the find command to be dispatched to the shard in order to return the post-change
-    // document.
+    // Create the find command to be dispatched to the shard(s) in order to return the post-image.
     auto filterObj = filter.toBson();
     BSONObjBuilder cmdBuilder;
     bool findCmdIsByUuid(foreignExpCtx->uuid);
@@ -171,17 +151,20 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument(
         cmdBuilder.append("allowSpeculativeMajorityRead", true);
     }
 
-    auto shardResult = std::vector<RemoteCursor>();
+    auto shardResults = std::vector<RemoteCursor>();
     auto findCmd = cmdBuilder.obj();
     size_t numAttempts = 0;
     while (++numAttempts <= kMaxNumStaleVersionRetries) {
-        // Verify that the collection exists, with the correct UUID.
+        // Obtain the catalog cache. If we are retrying after a stale shard error, mark this
+        // operation as needing to block on the next catalog cache refresh.
        auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
        auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx);
        if (swRoutingInfo == ErrorCodes::NamespaceNotFound) {
            return boost::none;
        }
        auto routingInfo = uassertStatusOK(std::move(swRoutingInfo));
+
+        // Finalize the 'find' command object based on the routing table information.
        if (findCmdIsByUuid && routingInfo.cm()) {
            // Find by UUID and shard versioning do not work together (SERVER-31946). In the
            // sharded case we've already checked the UUID, so find by namespace is safe. In the
@@ -192,25 +175,31 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument(
             findCmdIsByUuid = false;
         }
 
-        // Get the ID and version of the single shard to which this query will be sent.
-        auto shardInfo = getSingleTargetedShardForQuery(expCtx->opCtx, routingInfo, filterObj);
-
-        // Dispatch the request. This will only be sent to a single shard and only a single result
-        // will be returned. The 'establishCursors' method conveniently prepares the result into a
-        // cursor response for us.
         try {
-            shardResult = establishCursors(
+            // Build the versioned requests to be dispatched to the shards. Typically, only a single
+            // shard will be targeted here; however, in certain cases where only the _id is present,
+            // we may need to scatter-gather the query to all shards in order to find the document.
+            auto requests = getVersionedRequestsForTargetedShards(
+                expCtx->opCtx, nss, routingInfo, findCmd, filterObj, CollationSpec::kSimpleSpec);
+
+            // Dispatch the requests. The 'establishCursors' method conveniently prepares the result
+            // into a vector of cursor responses for us.
+            shardResults = establishCursors(
                 expCtx->opCtx,
                 Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
                 nss,
                 ReadPreferenceSetting::get(expCtx->opCtx),
-                {{shardInfo.first, appendShardVersion(findCmd, shardInfo.second)}},
+                std::move(requests),
                 false);
             break;
         } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
             // If it's an unsharded collection which has been deleted and re-created, we may get a
             // NamespaceNotFound error when looking up by UUID.
             return boost::none;
+        } catch (const ExceptionFor<ErrorCodes::StaleDbVersion>&) {
+            // If the database version is stale, refresh its entry in the catalog cache.
+            catalogCache->onStaleDatabaseVersion(nss.db(), routingInfo.db().databaseVersion());
+            continue;  // Try again if allowed.
         } catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>&) {
             // If we hit a stale shardVersion exception, invalidate the routing table cache.
             catalogCache->onStaleShardVersion(std::move(routingInfo));
@@ -219,25 +208,28 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument(
         break;  // Success!
     }
 
-    invariant(shardResult.size() == 1u);
-
-    auto& cursor = shardResult.front().getCursorResponse();
-    auto& batch = cursor.getBatch();
-
-    // We should have at most 1 result, and the cursor should be exhausted.
-    uassert(ErrorCodes::ChangeStreamFatalError,
-            str::stream() << "Shard cursor was unexpectedly open after lookup: "
-                          << shardResult.front().getHostAndPort()
-                          << ", id: " << cursor.getCursorId(),
-            cursor.getCursorId() == 0);
-    uassert(ErrorCodes::ChangeStreamFatalError,
-            str::stream() << "found more than one document matching " << filter.toString() << " ["
-                          << batch.begin()->toString() << ", "
-                          << std::next(batch.begin())->toString() << "]",
-            batch.size() <= 1u);
+    // Iterate all shard results and build a single composite batch. We also enforce the
+    // requirement that only a single document should have been returned from across the cluster.
+    std::vector<BSONObj> finalBatch;
+    for (auto&& shardResult : shardResults) {
+        auto& shardCursor = shardResult.getCursorResponse();
+        finalBatch.insert(
+            finalBatch.end(), shardCursor.getBatch().begin(), shardCursor.getBatch().end());
+        // We should have at most 1 result, and the cursor should be exhausted.
+        uassert(ErrorCodes::ChangeStreamFatalError,
+                str::stream() << "Shard cursor was unexpectedly open after lookup: "
+                              << shardResult.getHostAndPort()
+                              << ", id: " << shardCursor.getCursorId(),
+                shardCursor.getCursorId() == 0);
+        uassert(ErrorCodes::ChangeStreamFatalError,
+                str::stream() << "found more than one document matching " << filter.toString()
+                              << " [" << finalBatch.begin()->toString() << ", "
+                              << std::next(finalBatch.begin())->toString() << "]",
+                finalBatch.size() <= 1u);
+    }
 
-    return (!batch.empty() ? Document(batch.front()) : boost::optional<Document>{});
-}
+    return (!finalBatch.empty() ? Document(finalBatch.front()) : boost::optional<Document>{});
+}
 
 BSONObj MongoSInterface::_reportCurrentOpForClient(OperationContext* opCtx,
                                                    Client* client,
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index f5f5c0fb812..fea6ded53a5 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -563,6 +563,26 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx,
     return {routingInfo.db().primaryId()};
 }
 
+std::vector<std::pair<ShardId, BSONObj>> getVersionedRequestsForTargetedShards(
+    OperationContext* opCtx,
+    const NamespaceString& nss,
+    const CachedCollectionRoutingInfo& routingInfo,
+    const BSONObj& cmdObj,
+    const BSONObj& query,
+    const BSONObj& collation) {
+    std::vector<std::pair<ShardId, BSONObj>> requests;
+    auto ars_requests =
+        buildVersionedRequestsForTargetedShards(opCtx, nss, routingInfo, cmdObj, query, collation);
+    std::transform(std::make_move_iterator(ars_requests.begin()),
+                   std::make_move_iterator(ars_requests.end()),
+                   std::back_inserter(requests),
+                   [](auto&& ars) {
+                       return std::pair<ShardId, BSONObj>(std::move(ars.shardId),
+                                                          std::move(ars.cmdObj));
+                   });
+    return requests;
+}
+
 StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd(
     OperationContext* opCtx, const NamespaceString& nss) {
     auto catalogCache = Grid::get(opCtx)->catalogCache();
diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h
index a2b1c6465f3..9b097f92a09 100644
--- a/src/mongo/s/cluster_commands_helpers.h
+++ b/src/mongo/s/cluster_commands_helpers.h
@@ -217,6 +217,18 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx,
                                             const BSONObj& query,
                                             const BSONObj& collation);
 
+/**
+ * Determines the shard(s) to which the given query will be targeted, and builds a separate
+ * versioned copy of the command object for each such shard.
+ */
+std::vector<std::pair<ShardId, BSONObj>> getVersionedRequestsForTargetedShards(
+    OperationContext* opCtx,
+    const NamespaceString& nss,
+    const CachedCollectionRoutingInfo& routingInfo,
+    const BSONObj& cmdObj,
+    const BSONObj& query,
+    const BSONObj& collation);
+
 /**
  * If the command is running in a transaction, returns the proper routing table to use for targeting
  * shards. If there is no active transaction or the transaction is not running with snapshot level
-- 
cgit v1.2.1
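
The scatter-gather merge that this patch adds to lookupSingleDocument() can be illustrated outside the server tree. Below is a minimal standalone sketch, under simplifying assumptions: ShardBatch and mergeLookupResults are hypothetical names invented here, not MongoDB source symbols, plain strings stand in for BSONObj, and exceptions stand in for the uassert/ChangeStreamFatalError machinery. It shows the two invariants the patch enforces while concatenating the per-shard batches: every remote cursor must already be exhausted, and at most one document may match across the whole cluster.

#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

// Stand-in for one shard's cursor response: the shard's name, whether its cursor
// was exhausted (cursor id 0), and the documents returned in the first batch.
struct ShardBatch {
    std::string shardName;
    bool cursorExhausted;
    std::vector<std::string> documents;
};

// Combine the per-shard batches into one composite batch, throwing (where the real
// code uasserts with ChangeStreamFatalError) if a cursor is still open or if more
// than one document matched the filter across all targeted shards.
std::optional<std::string> mergeLookupResults(const std::vector<ShardBatch>& shardResults) {
    std::vector<std::string> finalBatch;
    for (const auto& shard : shardResults) {
        if (!shard.cursorExhausted) {
            throw std::runtime_error("Shard cursor was unexpectedly open after lookup: " +
                                     shard.shardName);
        }
        finalBatch.insert(finalBatch.end(), shard.documents.begin(), shard.documents.end());
        if (finalBatch.size() > 1u) {
            throw std::runtime_error("found more than one document matching the filter");
        }
    }
    if (finalBatch.empty()) {
        return std::nullopt;
    }
    return finalBatch.front();
}

int main() {
    // Two shards were targeted by the _id-only filter; only one holds the post-image.
    std::vector<ShardBatch> results{{"shard0", true, {}},
                                    {"shard1", true, {"{_id: 0, a: -100, updated: true}"}}};
    return mergeLookupResults(results).has_value() ? 0 : 1;
}

As in the patch, the duplicate check runs inside the loop, so a second matching document (the situation the new jstest provokes by inserting {_id: 0} on both shards) surfaces as an error as soon as it is seen rather than after all shard responses have been merged.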