author     Bernard Gorman <bernard.gorman@gmail.com>    2019-12-29 18:12:09 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>    2020-01-25 03:47:19 +0000
commit     6c45478fbdc994353541a0f05ff202cedf251d7a (patch)
tree       88c318fe11b559428318fa04f6a581a0943a0d55
parent     e192a049f15905b65b36b485a47d85cdee8e80e2 (diff)
download   mongo-6c45478fbdc994353541a0f05ff202cedf251d7a.tar.gz
SERVER-44484 Allow change stream update lookup to retrieve post-image by _id
create mode 100644 jstests/sharding/change_streams_unsharded_update_resume.js
-rw-r--r--  jstests/sharding/change_streams_unsharded_update_resume.js              |  63
-rw-r--r--  jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js |  28
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.cpp                       | 111
-rw-r--r--  src/mongo/s/cluster_commands_helpers.cpp                                 |  20
-rw-r--r--  src/mongo/s/cluster_commands_helpers.h                                   |  12
5 files changed, 160 insertions(+), 74 deletions(-)
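For context on the change itself: while a collection is unsharded, each update's oplog entry records only {_id: ...} as the documentKey; once the collection is sharded, the shard-key fields are recorded alongside the _id. A change stream that resumes from before the collection was sharded therefore replays update events whose documentKey lacks the shard key, and this commit teaches mongos to look up their post-images by _id alone, scatter-gathering the lookup to every shard when necessary. A minimal shell sketch of the two documentKey shapes (the collection name and shard key {a: 1} are illustrative, not taken from this commit):

// Hypothetical shell session illustrating the two documentKey shapes.
const cs = db.test.watch([], {fullDocument: "updateLookup"});

// While the collection is unsharded, an update event's documentKey carries only the _id:
//   {operationType: "update", documentKey: {_id: 0}, ...}

// After sh.shardCollection("test.test", {a: 1}), the shard key is recorded as well:
//   {operationType: "update", documentKey: {a: -100, _id: 0}, ...}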
diff --git a/jstests/sharding/change_streams_unsharded_update_resume.js b/jstests/sharding/change_streams_unsharded_update_resume.js
new file mode 100644
index 00000000000..281701144c4
--- /dev/null
+++ b/jstests/sharding/change_streams_unsharded_update_resume.js
@@ -0,0 +1,63 @@
+/**
+ * Tests that the post-image of an update which occurred while the collection was unsharded can
+ * still be looked up after the collection becomes sharded. Exercises the fix for SERVER-44484.
+ * @tags: [uses_change_streams, requires_fcv_44]
+ */
+(function() {
+"use strict";
+
+// Start a new sharded cluster with two shards and obtain references to the test DB and collection.
+const st = new ShardingTest({
+ shards: 2,
+ mongos: 1,
+ rs: {nodes: 1, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}}
+});
+
+const mongosDB = st.s0.getDB(jsTestName());
+const mongosColl = mongosDB.test;
+
+// Open a change stream on the unsharded collection.
+let csCur = mongosColl.watch([], {fullDocument: "updateLookup"});
+
+// Insert one document with a known _id into the unsharded collection, and obtain its resume token.
+assert.commandWorked(mongosColl.insert({_id: 0, a: -100}));
+assert.soon(() => csCur.hasNext());
+const insertEvent = csCur.next();
+assert.eq(insertEvent.operationType, "insert");
+
+// Update the document and confirm that we can see the updateLookup fullDocument.
+assert.commandWorked(mongosColl.update({_id: 0}, {$set: {updated: true}}));
+assert.soon(() => csCur.hasNext());
+let updateEvent = csCur.next();
+assert.eq(updateEvent.operationType, "update");
+assert.docEq(updateEvent.fullDocument, {_id: 0, a: -100, updated: true});
+
+// Now shard the collection on {a: 1} and move the upper chunk to the other shard.
+assert.commandWorked(mongosColl.createIndex({a: 1}));
+st.shardColl(mongosColl, {a: 1}, {a: 0}, {a: 0});
+
+// Resume a change stream just after the initial insert. We expect the update lookup to succeed,
+// despite the fact that only the _id and not the entire documentKey was recorded in the oplog.
+csCur = mongosColl.watch([], {resumeAfter: insertEvent._id, fullDocument: "updateLookup"});
+assert.soon(() => csCur.hasNext());
+updateEvent = csCur.next();
+assert.eq(updateEvent.operationType, "update");
+assert.docEq(updateEvent.fullDocument, {_id: 0, a: -100, updated: true});
+
+// Insert a second document with the same _id on the second shard.
+assert.commandWorked(mongosColl.insert({_id: 0, a: 100}));
+
+// Two documents with the same _id are now present, so the _id-only update lookup is ambiguous and fails.
+csCur = mongosColl.watch([], {resumeAfter: insertEvent._id, fullDocument: "updateLookup"});
+assert.soon(() => {
+ try {
+ assert.eq(csCur.hasNext(), false, () => tojson(csCur.next()));
+ return false;
+ } catch (ex) {
+ assert.eq(ex.code, ErrorCodes.ChangeStreamFatalError);
+ return true;
+ }
+});
+
+st.stop();
+})();
\ No newline at end of file
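A usage note on the final assertion above: once two documents share an _id on different shards, a lookup keyed on _id alone is ambiguous, so the server reports ChangeStreamFatalError rather than returning an arbitrary match. A minimal sketch of how a shell consumer might observe this, assuming a previously saved resume token in 'token':

// Sketch only: iterate a resumed stream and surface the ambiguity to the caller.
const cur = db.test.watch([], {resumeAfter: token, fullDocument: "updateLookup"});
try {
    while (cur.hasNext()) {
        printjson(cur.next());
    }
} catch (ex) {
    // Two documents matched the _id-only documentKey; retrying cannot resolve this.
    assert.eq(ex.code, ErrorCodes.ChangeStreamFatalError);
}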
diff --git a/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js b/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
index a5c8ae8df9f..df091e01283 100644
--- a/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
+++ b/jstests/sharding/change_streams_update_lookup_shard_metadata_missing.js
@@ -31,7 +31,7 @@ st.ensurePrimaryShard(mongosDB.getName(), shard0.getURL());
st.shardColl(mongosColl, {a: 1}, {a: 0}, {a: 1});
// Open a change stream on the collection.
-const csCursor = mongosColl.watch();
+let csCursor = mongosColl.watch();
// Write one document onto shard0 and obtain its resume token.
assert.commandWorked(mongosColl.insert({_id: 0, a: -100}));
@@ -39,14 +39,14 @@ assert.soon(() => csCursor.hasNext());
const resumeToken = csCursor.next()._id;
-// get any secondary
+// Obtain a reference to any secondary.
const newPrimary = st.rs0.getSecondary();
-let shards = st.s.getDB('config').shards.find().toArray();
// Step up one of the Secondaries, which will not have any sharding metadata loaded.
st.rs0.stepUpNoAwaitReplication(newPrimary);
-// make sure the mongos refreshes it's connections to the shard
+// Make sure the mongos refreshes its connections to the shard.
+let shards = st.s.getDB('config').shards.find().toArray();
let primary = {};
do {
let connPoolStats = st.s0.adminCommand({connPoolStats: 1});
@@ -62,17 +62,19 @@ do {
assert.soonNoExcept(
() => assert.commandWorked(mongosColl.update({_id: 0}, {$set: {updated: true}}, false, true)));
-// Resume the change stream with {fullDocument: 'updateLookup'}.
-const cmdRes = assert.commandWorked(mongosColl.runCommand("aggregate", {
- pipeline: [{$changeStream: {resumeAfter: resumeToken, fullDocument: "updateLookup"}}],
- cursor: {}
-}));
-
+// Resume the change stream with {fullDocument: 'updateLookup'}. Update lookup can successfully
+// identify the document based on its _id alone so long as the _id is unique in the collection, so
+// this alone does not prove that the multi-update actually wrote its shard key into the oplog.
+csCursor = mongosColl.watch([], {resumeAfter: resumeToken, fullDocument: "updateLookup"});
assert.soon(() => csCursor.hasNext());
+assert.docEq(csCursor.next().fullDocument, {_id: 0, a: -100, updated: true});
-const updateObj = csCursor.next();
-
-assert.eq(true, updateObj.updateDescription.updatedFields.updated);
+// Now insert a new document with the same _id on the other shard. Update lookup will be able to
+// distinguish between the two, proving that they both have full shard keys available.
+assert.commandWorked(mongosColl.insert({_id: 0, a: 100}));
+csCursor = mongosColl.watch([], {resumeAfter: resumeToken, fullDocument: "updateLookup"});
+assert.soon(() => csCursor.hasNext());
+assert.docEq(csCursor.next().fullDocument, {_id: 0, a: -100, updated: true});
st.stop();
})();
\ No newline at end of file
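The strengthened assertions above prove more than the old updatedFields check did: with a second {_id: 0} document living on the other shard, update lookup can only return the correct post-image if the multi-update's oplog entries recorded each document's full shard key. A minimal sketch of the kind of multi-update under test (collection name and values illustrative):

// Sketch: a multi-update issued through mongos; each per-document oplog entry
// must carry that document's shard key for updateLookup to disambiguate later.
assert.commandWorked(db.test.update({}, {$set: {updated: true}}, {multi: true}));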
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 6520a94e035..de70e39372e 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -61,25 +61,6 @@ using std::unique_ptr;
namespace {
/**
- * Determines the single shard to which the given query will be targeted, and its associated
- * shardVersion. Throws if the query targets more than one shard.
- */
-std::pair<ShardId, ChunkVersion> getSingleTargetedShardForQuery(
- OperationContext* opCtx, const CachedCollectionRoutingInfo& routingInfo, BSONObj query) {
- if (auto chunkMgr = routingInfo.cm()) {
- std::set<ShardId> shardIds;
- chunkMgr->getShardIdsForQuery(opCtx, query, CollationSpec::kSimpleSpec, &shardIds);
- uassert(ErrorCodes::ChangeStreamFatalError,
- str::stream() << "Unable to target lookup query to a single shard: "
- << query.toString(),
- shardIds.size() == 1u);
- return {*shardIds.begin(), chunkMgr->getVersion(*shardIds.begin())};
- }
-
- return {routingInfo.db().primaryId(), ChunkVersion::UNSHARDED()};
-}
-
-/**
* Returns the routing information for the namespace set on the passed ExpressionContext. Also
* verifies that the ExpressionContext's UUID, if present, matches that of the routing table entry.
*/
@@ -152,8 +133,7 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument(
bool allowSpeculativeMajorityRead) {
auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID);
- // Create the find command to be dispatched to the shard in order to return the post-change
- // document.
+ // Create the find command to be dispatched to the shard(s) in order to return the post-image.
auto filterObj = filter.toBson();
BSONObjBuilder cmdBuilder;
bool findCmdIsByUuid(foreignExpCtx->uuid);
@@ -170,17 +150,22 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument(
cmdBuilder.append("allowSpeculativeMajorityRead", true);
}
- auto shardResult = std::vector<RemoteCursor>();
+ auto shardResults = std::vector<RemoteCursor>();
auto findCmd = cmdBuilder.obj();
- size_t numAttempts = 0;
- while (++numAttempts <= kMaxNumStaleVersionRetries) {
- // Verify that the collection exists, with the correct UUID.
+ for (size_t numRetries = 0; numRetries <= kMaxNumStaleVersionRetries; ++numRetries) {
+ // Obtain the catalog cache. If we are retrying after a stale shard error, mark this
+ // operation as needing to block on the next catalog cache refresh.
auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
+ catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(expCtx->opCtx, numRetries);
+
+ // Verify that the collection exists, with the correct UUID.
auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx);
if (swRoutingInfo == ErrorCodes::NamespaceNotFound) {
return boost::none;
}
auto routingInfo = uassertStatusOK(std::move(swRoutingInfo));
+
+ // Finalize the 'find' command object based on the routing table information.
if (findCmdIsByUuid && routingInfo.cm()) {
// Find by UUID and shard versioning do not work together (SERVER-31946). In the
// sharded case we've already checked the UUID, so find by namespace is safe. In the
@@ -191,62 +176,66 @@ boost::optional<Document> MongoSInterface::lookupSingleDocument(
findCmdIsByUuid = false;
}
- // Dispatch the request. This will only be sent to a single shard and only a single result
- // will be returned. The 'establishCursors' method conveniently prepares the result into a
- // cursor response for us.
try {
- // Get the ID and version of the single shard to which this query will be sent.
- auto shardInfo = getSingleTargetedShardForQuery(expCtx->opCtx, routingInfo, filterObj);
-
- shardResult = establishCursors(
+ // Build the versioned requests to be dispatched to the shards. Typically, only a single
+ // shard will be targeted here; however, in certain cases where only the _id is present,
+ // we may need to scatter-gather the query to all shards in order to find the document.
+ auto requests = getVersionedRequestsForTargetedShards(
+ expCtx->opCtx, nss, routingInfo, findCmd, filterObj, CollationSpec::kSimpleSpec);
+
+ // Dispatch the requests. The 'establishCursors' method conveniently prepares the result
+ // into a vector of cursor responses for us.
+ shardResults = establishCursors(
expCtx->opCtx,
Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
nss,
ReadPreferenceSetting::get(expCtx->opCtx),
- {{shardInfo.first, appendShardVersion(findCmd, shardInfo.second)}},
+ std::move(requests),
false);
- break;
} catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
// If it's an unsharded collection which has been deleted and re-created, we may get a
// NamespaceNotFound error when looking up by UUID.
return boost::none;
+ } catch (const ExceptionFor<ErrorCodes::StaleDbVersion>&) {
+ // If the database version is stale, refresh its entry in the catalog cache.
+ catalogCache->onStaleDatabaseVersion(nss.db(), routingInfo.db().databaseVersion());
+ continue; // Try again if allowed.
} catch (const ExceptionForCat<ErrorCategory::StaleShardVersionError>& e) {
- if (auto staleInfo = e.extraInfo<StaleConfigInfo>()) {
- catalogCache->invalidateShardOrEntireCollectionEntryForShardedCollection(
- expCtx->opCtx,
- nss,
- staleInfo->getVersionWanted(),
- staleInfo->getVersionReceived(),
- staleInfo->getShardId());
+ // If the exception provides a shardId, add it to the set of shards requiring a refresh.
+ // If the cache currently considers the collection to be unsharded, this will trigger an
+ // epoch refresh. If no shard is provided, then the epoch is stale and we must refresh.
+ auto staleInfo = e.extraInfo<StaleConfigInfo>();
+ if (auto staleShardId = (staleInfo ? staleInfo->getShardId() : boost::none)) {
+ catalogCache->onStaleShardVersion(std::move(routingInfo), *staleShardId);
} else {
catalogCache->onEpochChange(nss);
}
-
- catalogCache->setOperationShouldBlockBehindCatalogCacheRefresh(expCtx->opCtx, true);
-
continue; // Try again if allowed.
}
break; // Success!
}
- invariant(shardResult.size() == 1u);
-
- auto& cursor = shardResult.front().getCursorResponse();
- auto& batch = cursor.getBatch();
-
- // We should have at most 1 result, and the cursor should be exhausted.
- uassert(ErrorCodes::ChangeStreamFatalError,
- str::stream() << "Shard cursor was unexpectedly open after lookup: "
- << shardResult.front().getHostAndPort()
- << ", id: " << cursor.getCursorId(),
- cursor.getCursorId() == 0);
- uassert(ErrorCodes::ChangeStreamFatalError,
- str::stream() << "found more than one document matching " << filter.toString() << " ["
- << batch.begin()->toString() << ", "
- << std::next(batch.begin())->toString() << "]",
- batch.size() <= 1u);
+ // Iterate all shard results and build a single composite batch. We also enforce the requirement
+ // that only a single document should have been returned from across the cluster.
+ std::vector<BSONObj> finalBatch;
+ for (auto&& shardResult : shardResults) {
+ auto& shardCursor = shardResult.getCursorResponse();
+ finalBatch.insert(
+ finalBatch.end(), shardCursor.getBatch().begin(), shardCursor.getBatch().end());
+ // We should have at most 1 result, and the cursor should be exhausted.
+ uassert(ErrorCodes::ChangeStreamFatalError,
+ str::stream() << "Shard cursor was unexpectedly open after lookup: "
+ << shardResult.getHostAndPort()
+ << ", id: " << shardCursor.getCursorId(),
+ shardCursor.getCursorId() == 0);
+ uassert(ErrorCodes::ChangeStreamFatalError,
+ str::stream() << "found more than one document matching " << filter.toString()
+ << " [" << finalBatch.begin()->toString() << ", "
+ << std::next(finalBatch.begin())->toString() << "]",
+ finalBatch.size() <= 1u);
+ }
- return (!batch.empty() ? Document(batch.front()) : boost::optional<Document>{});
+ return (!finalBatch.empty() ? Document(finalBatch.front()) : boost::optional<Document>{});
}
BSONObj MongoSInterface::_reportCurrentOpForClient(OperationContext* opCtx,
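To summarize the new control flow above: the lookup loop retries on stale database and shard versions, marking each retry to block behind a catalog cache refresh, and then folds the batches from however many shards were targeted into a single composite batch that must contain at most one document. A JavaScript rendering of just the merge step, for illustration only (the real implementation is the C++ above; shard batches are modeled as plain arrays and the helper name is hypothetical):

// Illustrative sketch of the composite-batch check, not server code.
function mergeLookupBatches(shardBatches) {
    const finalBatch = [];
    for (const batch of shardBatches) {
        finalBatch.push(...batch);
        // Mirrors the ChangeStreamFatalError uassert: at most one match cluster-wide.
        assert.lte(finalBatch.length, 1,
                   "found more than one document matching the lookup filter: " + tojson(finalBatch));
    }
    return finalBatch.length === 1 ? finalBatch[0] : null;
}

// For example, one shard returns the post-image and the others return empty batches:
assert.docEq(mergeLookupBatches([[], [{_id: 0, a: -100, updated: true}], []]),
             {_id: 0, a: -100, updated: true});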
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index b52daddc718..2df30dd8d33 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -659,6 +659,26 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx,
return {routingInfo.db().primaryId()};
}
+std::vector<std::pair<ShardId, BSONObj>> getVersionedRequestsForTargetedShards(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CachedCollectionRoutingInfo& routingInfo,
+ const BSONObj& cmdObj,
+ const BSONObj& query,
+ const BSONObj& collation) {
+ std::vector<std::pair<ShardId, BSONObj>> requests;
+ auto ars_requests =
+ buildVersionedRequestsForTargetedShards(opCtx, nss, routingInfo, cmdObj, query, collation);
+ std::transform(std::make_move_iterator(ars_requests.begin()),
+ std::make_move_iterator(ars_requests.end()),
+ std::back_inserter(requests),
+ [](auto&& ars) {
+ return std::pair<ShardId, BSONObj>(std::move(ars.shardId),
+ std::move(ars.cmdObj));
+ });
+ return requests;
+}
+
StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd(
OperationContext* opCtx, const NamespaceString& nss) {
auto catalogCache = Grid::get(opCtx)->catalogCache();
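The new helper exists because establishCursors consumes (ShardId, BSONObj) pairs, while buildVersionedRequestsForTargetedShards produces AsyncRequestsSender::Request objects; the std::transform above simply unpacks one into the other for each targeted shard. Conceptually it is a map over the targeted shards; a hedged JavaScript sketch (field names are illustrative, not the C++ types):

// Sketch: pair each targeted shard with a shard-version-stamped copy of the command.
function getVersionedRequests(targetedShards, cmdObj) {
    return targetedShards.map((shard) => ({
        shardId: shard.id,
        // The server-side analogue appends the routing table's version for this shard.
        cmdObj: Object.assign({}, cmdObj, {shardVersion: shard.version}),
    }));
}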
diff --git a/src/mongo/s/cluster_commands_helpers.h b/src/mongo/s/cluster_commands_helpers.h
index b14a681a824..f76d9aeada7 100644
--- a/src/mongo/s/cluster_commands_helpers.h
+++ b/src/mongo/s/cluster_commands_helpers.h
@@ -261,6 +261,18 @@ std::set<ShardId> getTargetedShardsForQuery(OperationContext* opCtx,
const BSONObj& collation);
/**
+ * Determines the shard(s) to which the given query will be targeted, and builds a separate
+ * versioned copy of the command object for each such shard.
+ */
+std::vector<std::pair<ShardId, BSONObj>> getVersionedRequestsForTargetedShards(
+ OperationContext* opCtx,
+ const NamespaceString& nss,
+ const CachedCollectionRoutingInfo& routingInfo,
+ const BSONObj& cmdObj,
+ const BSONObj& query,
+ const BSONObj& collation);
+
+/**
* If the command is running in a transaction, returns the proper routing table to use for targeting
* shards. If there is no active transaction or the transaction is not running with snapshot level
* read concern, the latest routing table is returned, otherwise a historical routing table is