summary refs log tree commit diff
path: root/src/mongo/s/query/document_source_update_on_add_shard.cpp
diff options
context:
space:
mode:
author Bernard Gorman <bernard.gorman@mongodb.com> 2019-10-17 20:26:03 +0000
committer evergreen <evergreen@mongodb.com> 2019-10-17 20:26:03 +0000
commit 97cc7b5838db4ef13ede3149c44bceca8f5c2977 (patch)
tree 38d9b78346777033545f99048f947559a67a9ab1 /src/mongo/s/query/document_source_update_on_add_shard.cpp
parent 1daf063435aa3f748840c476f8fa3bd13d7d2a68 (diff)
download mongo-97cc7b5838db4ef13ede3149c44bceca8f5c2977.tar.gz
SERVER-42723 New shard with new database can be ignored by change streams
Diffstat (limited to 'src/mongo/s/query/document_source_update_on_add_shard.cpp')
-rw-r--r-- src/mongo/s/query/document_source_update_on_add_shard.cpp 72
1 files changed, 43 insertions, 29 deletions
diff --git a/src/mongo/s/query/document_source_update_on_add_shard.cpp b/src/mongo/s/query/document_source_update_on_add_shard.cpp
index d94a99518b7..4ae4318c997 100644
--- a/src/mongo/s/query/document_source_update_on_add_shard.cpp
+++ b/src/mongo/s/query/document_source_update_on_add_shard.cpp
@@ -32,6 +32,7 @@
#include <algorithm>
#include "mongo/db/pipeline/document_source_change_stream.h"
+#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/async_results_merger_params_gen.h"
@@ -40,10 +41,20 @@
namespace mongo {
namespace {
-// Returns true if the change stream document has an 'operationType' of 'newShardDetected'.
-bool needsUpdate(const Document& childResult) {
- return childResult[DocumentSourceChangeStream::kOperationTypeField].getStringData() ==
- DocumentSourceChangeStream::kNewShardDetectedOpType;
+// Returns true if the change stream document is a 'config.shards' or 'kNewShardDetected' event.
+bool isShardConfigEvent(const Document& eventDoc) {
+ // TODO SERVER-44039: we continue to generate 'kNewShardDetected' events for compatibility
+ // with 4.2, even though we no longer rely on them to detect new shards. We swallow the event
+ // here. We may wish to remove this mechanism entirely in 4.6, or retain it for future cases
+ // where a change stream is targeted to a subset of shards. See SERVER-44039 for details.
+ if (eventDoc[DocumentSourceChangeStream::kOperationTypeField].getStringData() ==
+ DocumentSourceChangeStream::kNewShardDetectedOpType) {
+ return true;
+ }
+ auto nsObj = eventDoc[DocumentSourceChangeStream::kNamespaceField];
+ return nsObj.getType() == BSONType::Object &&
+ nsObj["db"_sd].getStringData() == ShardType::ConfigNS.db() &&
+ nsObj["coll"_sd].getStringData() == ShardType::ConfigNS.coll();
}
} // namespace
@@ -69,14 +80,19 @@ DocumentSourceUpdateOnAddShard::DocumentSourceUpdateOnAddShard(
: DocumentSource(kStageName, expCtx),
_executor(std::move(executor)),
_mergeCursors(mergeCursors),
- _shardsWithCursors(std::move(shardsWithCursors)),
+ _shardsWithCursors(shardsWithCursors.begin(), shardsWithCursors.end()),
_cmdToRunOnNewShards(cmdToRunOnNewShards.getOwned()) {}
DocumentSource::GetNextResult DocumentSourceUpdateOnAddShard::doGetNext() {
auto childResult = pSource->getNext();
- while (childResult.isAdvanced() && needsUpdate(childResult.getDocument())) {
- addNewShardCursors(childResult.getDocument());
+ // If this is an insertion into the 'config.shards' collection, open a cursor on the new shard.
+ while (childResult.isAdvanced() && isShardConfigEvent(childResult.getDocument())) {
+ auto opType = childResult.getDocument()[DocumentSourceChangeStream::kOperationTypeField];
+ if (opType.getStringData() == DocumentSourceChangeStream::kInsertOpType) {
+ addNewShardCursors(childResult.getDocument());
+ }
+ // For shard removal or update, we do nothing. We also swallow kNewShardDetectedOpType.
childResult = pSource->getNext();
}
return childResult;
@@ -88,41 +104,39 @@ void DocumentSourceUpdateOnAddShard::addNewShardCursors(const Document& newShard
std::vector<RemoteCursor> DocumentSourceUpdateOnAddShard::establishShardCursorsOnNewShards(
const Document& newShardDetectedObj) {
- auto* opCtx = pExpCtx->opCtx;
// Reload the shard registry. We need to ensure a reload initiated after calling this method
- // caused the reload, otherwise we aren't guaranteed to get all the new shards.
- auto* shardRegistry = Grid::get(opCtx)->shardRegistry();
- if (!shardRegistry->reload(opCtx)) {
+ // caused the reload, otherwise we may not see the new shard, so we perform a "hard" reload.
+ auto* opCtx = pExpCtx->opCtx;
+ if (!Grid::get(opCtx)->shardRegistry()->reload(opCtx)) {
// A 'false' return from shardRegistry.reload() means a reload was already in progress and
// it completed before reload() returned. So another reload(), regardless of return value,
// will ensure a reload started after the first call to reload().
- shardRegistry->reload(opCtx);
+ Grid::get(opCtx)->shardRegistry()->reload(opCtx);
}
- std::vector<ShardId> shardIds, newShardIds;
- shardRegistry->getAllShardIdsNoReload(&shardIds);
- std::sort(_shardsWithCursors.begin(), _shardsWithCursors.end());
- std::sort(shardIds.begin(), shardIds.end());
- std::set_difference(shardIds.begin(),
- shardIds.end(),
- _shardsWithCursors.begin(),
- _shardsWithCursors.end(),
- std::back_inserter(newShardIds));
+ // Parse the new shard's information from the document inserted into 'config.shards'.
+ auto newShardSpec = newShardDetectedObj[DocumentSourceChangeStream::kFullDocumentField];
+ auto newShard = uassertStatusOK(ShardType::fromBSON(newShardSpec.getDocument().toBson()));
- auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
- _cmdToRunOnNewShards,
- newShardDetectedObj[DocumentSourceChangeStream::kIdField].getDocument());
- std::vector<std::pair<ShardId, BSONObj>> requests;
- for (const auto& shardId : newShardIds) {
- requests.emplace_back(shardId, cmdObj);
- _shardsWithCursors.push_back(shardId);
+ // Make sure we are not attempting to open a cursor on a shard that already has one.
+ if (!_shardsWithCursors.insert(newShard.getName()).second) {
+ return {};
}
+
+ // We must start the new cursor from the moment at which the shard became visible.
+ const auto newShardAddedTime = LogicalTime{
+ newShardDetectedObj[DocumentSourceChangeStream::kClusterTimeField].getTimestamp()};
+ auto resumeTokenForNewShard =
+ ResumeToken::makeHighWaterMarkToken(newShardAddedTime.addTicks(1).asTimestamp());
+ auto cmdObj = DocumentSourceChangeStream::replaceResumeTokenInCommand(
+ _cmdToRunOnNewShards, resumeTokenForNewShard.toDocument());
+
const bool allowPartialResults = false; // partial results are not allowed
return establishCursors(opCtx,
_executor,
pExpCtx->ns,
ReadPreferenceSetting::get(opCtx),
- requests,
+ {{newShard.getName(), cmdObj}},
allowPartialResults);
}