summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2017-08-30 23:58:30 -0400
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2017-09-15 17:20:22 -0400
commit7626535bbcc2f90b7815cbf1a8e6d2c0bef732f1 (patch)
tree8638e4aafe02c50a616e8f319f8ed0cae068210f /src/mongo
parentc9e5bcbc0dacfa8031f3a2aaa1c6e369d0bc26c3 (diff)
downloadmongo-7626535bbcc2f90b7815cbf1a8e6d2c0bef732f1.tar.gz
SERVER-30591 Do changeStream lookups by UUID instead of namespace.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/db_raii.cpp37
-rw-r--r--src/mongo/db/db_raii.h16
-rw-r--r--src/mongo/db/pipeline/document_source.h2
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp12
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.h1
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp57
-rw-r--r--src/mongo/db/pipeline/expression_context.cpp4
-rw-r--r--src/mongo/db/pipeline/expression_context.h7
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp14
9 files changed, 131 insertions, 19 deletions
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index 018a2591ed3..07e65df9aa1 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/client.h"
#include "mongo/db/curop.h"
#include "mongo/db/repl/replication_coordinator_global.h"
@@ -134,6 +135,25 @@ AutoStatsTracker::~AutoStatsTracker() {
}
AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
+ const StringData dbName,
+ const UUID& uuid) {
+ // Lock the database since a UUID will always be in the same database even though its
+ // collection name may change.
+ Lock::DBLock dbSLock(opCtx, dbName, MODE_IS);
+
+ auto nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(uuid);
+
+ // If the UUID doesn't exist, we leave _autoColl to be boost::none.
+ if (!nss.isEmpty()) {
+ _autoColl.emplace(
+ opCtx, nss, MODE_IS, AutoGetCollection::ViewMode::kViewsForbidden, std::move(dbSLock));
+
+ // Note: this can yield.
+ _ensureMajorityCommittedSnapshotIsValid(nss, opCtx);
+ }
+}
+
+AutoGetCollectionForRead::AutoGetCollectionForRead(OperationContext* opCtx,
const NamespaceString& nss,
AutoGetCollection::ViewMode viewMode) {
_autoColl.emplace(opCtx, nss, MODE_IS, MODE_IS, viewMode);
@@ -226,6 +246,23 @@ AutoGetCollectionOrViewForReadCommand::AutoGetCollectionOrViewForReadCommand(
? _autoCollForRead->getDb()->getViewCatalog()->lookup(opCtx, nss.ns())
: nullptr) {}
+AutoGetCollectionForReadCommand::AutoGetCollectionForReadCommand(OperationContext* opCtx,
+ const StringData dbName,
+ const UUID& uuid) {
+ _autoCollForRead.emplace(opCtx, dbName, uuid);
+ if (_autoCollForRead->getCollection()) {
+ _statsTracker.emplace(opCtx,
+ _autoCollForRead->getCollection()->ns(),
+ Top::LockType::ReadLocked,
+ _autoCollForRead->getDb()->getProfilingLevel());
+
+ // We have both the DB and collection locked, which is the prerequisite to do a stable shard
+ // version check, but we'd like to do the check after we have a satisfactory snapshot.
+ auto css = CollectionShardingState::get(opCtx, _autoCollForRead->getCollection()->ns());
+ css->checkShardVersionOrThrow(opCtx);
+ }
+}
+
void AutoGetCollectionOrViewForReadCommand::releaseLocksForView() noexcept {
invariant(_view);
_view = nullptr;
diff --git a/src/mongo/db/db_raii.h b/src/mongo/db/db_raii.h
index 7ff6b8f1a1d..e251d1fe05d 100644
--- a/src/mongo/db/db_raii.h
+++ b/src/mongo/db/db_raii.h
@@ -268,6 +268,12 @@ public:
AutoGetCollectionForRead(OperationContext* opCtx, const NamespaceString& nss)
: AutoGetCollectionForRead(opCtx, nss, AutoGetCollection::ViewMode::kViewsForbidden) {}
+ AutoGetCollectionForRead(OperationContext* opCtx, const NamespaceString& nss, Lock::DBLock lock)
+ : AutoGetCollectionForRead(
+ opCtx, nss, AutoGetCollection::ViewMode::kViewsForbidden, std::move(lock)) {}
+
+ AutoGetCollectionForRead(OperationContext* opCtx, const StringData dbName, const UUID& uuid);
+
/**
* This constructor is intended for internal use and should not be used outside this file.
* AutoGetCollectionForReadCommand and AutoGetCollectionOrViewForReadCommand use 'viewMode' to
@@ -283,10 +289,16 @@ public:
AutoGetCollection::ViewMode viewMode,
Lock::DBLock lock);
Database* getDb() const {
+ if (!_autoColl) {
+ return nullptr;
+ }
return _autoColl->getDb();
}
Collection* getCollection() const {
+ if (!_autoColl) {
+ return nullptr;
+ }
return _autoColl->getCollection();
}
@@ -325,6 +337,10 @@ public:
: AutoGetCollectionForReadCommand(
opCtx, nss, AutoGetCollection::ViewMode::kViewsForbidden, std::move(lock)) {}
+ AutoGetCollectionForReadCommand(OperationContext* opCtx,
+ const StringData dbName,
+ const UUID& uuid);
+
Database* getDb() const {
return _autoCollForRead->getDb();
}
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 5c10259ce4a..462afbecb2f 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -637,6 +637,8 @@ public:
* for execution. The returned pipeline is optimized and has a cursor source prepared.
*
* This function returns a non-OK status if parsing the pipeline failed.
+ * NamespaceNotFound will be returned if ExpressionContext has a UUID and that UUID doesn't
+ * exist anymore. That should be the only case where NamespaceNotFound gets returned.
*/
virtual StatusWith<std::unique_ptr<Pipeline, Pipeline::Deleter>> makePipeline(
const std::vector<BSONObj>& rawPipeline,
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
index 30f79ab8787..7c6295ea021 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
@@ -97,10 +97,18 @@ Value DocumentSourceLookupChangePostImage::lookupPostImage(const Document& updat
updateOp, DocumentSourceChangeStream::kDocumentKeyField, BSONType::Object);
auto matchSpec = BSON("$match" << documentKey);
+ // Extract the UUID from resume token and do change stream lookups by UUID.
+ ResumeToken resumeToken(updateOp[DocumentSourceChangeStream::kIdField]);
+
// TODO SERVER-29134 we need to extract the namespace from the document and set them on the new
// ExpressionContext if we're getting notifications from an entire database.
- auto foreignExpCtx = pExpCtx->copyWith(nss);
- auto pipeline = uassertStatusOK(_mongod->makePipeline({matchSpec}, foreignExpCtx));
+ auto foreignExpCtx = pExpCtx->copyWith(nss, resumeToken.getUuid());
+ auto pipelineStatus = _mongod->makePipeline({matchSpec}, foreignExpCtx);
+ if (pipelineStatus.getStatus() == ErrorCodes::NamespaceNotFound) {
+ // We couldn't find the collection with UUID, it may have been dropped.
+ return Value(BSONNULL);
+ }
+ auto pipeline = uassertStatusOK(std::move(pipelineStatus));
if (auto first = pipeline->getNext()) {
auto lookedUpDocument = Value(*first);
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
index b11c79e44d3..c51653ddb8d 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.h
@@ -74,6 +74,7 @@ public:
deps->fields.insert(DocumentSourceChangeStream::kNamespaceField.toString());
deps->fields.insert(DocumentSourceChangeStream::kDocumentKeyField.toString());
deps->fields.insert(DocumentSourceChangeStream::kOperationTypeField.toString());
+ deps->fields.insert(DocumentSourceChangeStream::kIdField.toString());
// This stage does not restrict the output fields to a finite set, and has no impact on
// whether metadata is available or needed.
return SEE_NEXT;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
index fdeea5bffc6..66c408aa0d2 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image_test.cpp
@@ -52,7 +52,27 @@ using std::deque;
using std::vector;
// This provides access to getExpCtx(), but we'll use a different name for this test suite.
-using DocumentSourceLookupChangePostImageTest = AggregationContextFixture;
+class DocumentSourceLookupChangePostImageTest : public AggregationContextFixture {
+public:
+ /**
+ * This method is required to avoid a static initialization fiasco resulting from calling
+ * UUID::gen() in file static scope.
+ */
+ static const UUID& testUuid() {
+ static const UUID* uuid_gen = new UUID(UUID::gen());
+ return *uuid_gen;
+ }
+
+ Document makeResumeToken(ImplicitValue id = Value()) {
+ const Timestamp ts(100, 1);
+ if (id.missing()) {
+ return {{"clusterTime", Document{{"ts", ts}}}, {"uuid", testUuid()}};
+ }
+ return {{"clusterTime", Document{{"ts", ts}}},
+ {"uuid", testUuid()},
+ {"documentKey", Document{{"_id", id}}}};
+ }
+};
/**
* A mock MongodInterface which allows mocking a foreign pipeline.
@@ -92,7 +112,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingDocumentKeyO
// Mock its input with a document without a "documentKey" field.
auto mockLocalSource = DocumentSourceMock::create(
- Document{{"operationType", "update"_sd},
+ Document{{"_id", makeResumeToken(0)},
+ {"operationType", "update"_sd},
{"fullDocument", Document{{"_id", 0}}},
{"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}});
@@ -113,7 +134,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingOperationTyp
// Mock its input with a document without a "ns" field.
auto mockLocalSource = DocumentSourceMock::create(
- Document{{"documentKey", Document{{"_id", 0}}},
+ Document{{"_id", makeResumeToken(0)},
+ {"documentKey", Document{{"_id", 0}}},
{"fullDocument", Document{{"_id", 0}}},
{"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}});
@@ -134,7 +156,9 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfMissingNamespace) {
// Mock its input with a document without a "ns" field.
auto mockLocalSource = DocumentSourceMock::create(Document{
- {"documentKey", Document{{"_id", 0}}}, {"operationType", "update"_sd},
+ {"_id", makeResumeToken(0)},
+ {"documentKey", Document{{"_id", 0}}},
+ {"operationType", "update"_sd},
});
lookupChangeStage->setSource(mockLocalSource.get());
@@ -153,8 +177,11 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldHasWrongType
auto lookupChangeStage = DocumentSourceLookupChangePostImage::create(expCtx);
// Mock its input with a document without a "ns" field.
- auto mockLocalSource = DocumentSourceMock::create(
- Document{{"documentKey", Document{{"_id", 0}}}, {"operationType", "update"_sd}, {"ns", 4}});
+ auto mockLocalSource =
+ DocumentSourceMock::create(Document{{"_id", makeResumeToken(0)},
+ {"documentKey", Document{{"_id", 0}}},
+ {"operationType", "update"_sd},
+ {"ns", 4}});
lookupChangeStage->setSource(mockLocalSource.get());
@@ -173,7 +200,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfNsFieldDoesNotMatch
// Mock its input with a document without a "ns" field.
auto mockLocalSource = DocumentSourceMock::create(
- Document{{"documentKey", Document{{"_id", 0}}},
+ Document{{"_id", makeResumeToken(0)},
+ {"documentKey", Document{{"_id", 0}}},
{"operationType", "update"_sd},
{"ns", Document{{"db", "DIFFERENT"_sd}, {"coll", expCtx->ns.coll()}}}});
@@ -194,7 +222,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldErrorIfDocumentKeyIsNotUni
// Mock its input with an update document.
auto mockLocalSource = DocumentSourceMock::create(
- Document{{"documentKey", Document{{"_id", 0}}},
+ Document{{"_id", makeResumeToken(0)},
+ {"documentKey", Document{{"_id", 0}}},
{"operationType", "update"_sd},
{"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}});
@@ -217,12 +246,14 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) {
// Mock its input, pausing every other result.
auto mockLocalSource = DocumentSourceMock::create(
- {Document{{"documentKey", Document{{"_id", 0}}},
+ {Document{{"_id", makeResumeToken(0)},
+ {"documentKey", Document{{"_id", 0}}},
{"operationType", "insert"_sd},
{"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}},
{"fullDocument", Document{{"_id", 0}}}},
DocumentSource::GetNextResult::makePauseExecution(),
- Document{{"documentKey", Document{{"_id", 1}}},
+ Document{{"_id", makeResumeToken(1)},
+ {"documentKey", Document{{"_id", 1}}},
{"operationType", "update"_sd},
{"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}}},
DocumentSource::GetNextResult::makePauseExecution()});
@@ -239,7 +270,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) {
ASSERT_TRUE(next.isAdvanced());
ASSERT_DOCUMENT_EQ(
next.releaseDocument(),
- (Document{{"documentKey", Document{{"_id", 0}}},
+ (Document{{"_id", makeResumeToken(0)},
+ {"documentKey", Document{{"_id", 0}}},
{"operationType", "insert"_sd},
{"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}},
{"fullDocument", Document{{"_id", 0}}}}));
@@ -250,7 +282,8 @@ TEST_F(DocumentSourceLookupChangePostImageTest, ShouldPropagatePauses) {
ASSERT_TRUE(next.isAdvanced());
ASSERT_DOCUMENT_EQ(
next.releaseDocument(),
- (Document{{"documentKey", Document{{"_id", 1}}},
+ (Document{{"_id", makeResumeToken(1)},
+ {"documentKey", Document{{"_id", 1}}},
{"operationType", "update"_sd},
{"ns", Document{{"db", expCtx->ns.db()}, {"coll", expCtx->ns.coll()}}},
{"fullDocument", Document{{"_id", 1}}}}));
diff --git a/src/mongo/db/pipeline/expression_context.cpp b/src/mongo/db/pipeline/expression_context.cpp
index 0a6005c73b0..8f15fb5a3fe 100644
--- a/src/mongo/db/pipeline/expression_context.cpp
+++ b/src/mongo/db/pipeline/expression_context.cpp
@@ -74,9 +74,11 @@ void ExpressionContext::setCollator(std::unique_ptr<CollatorInterface> coll) {
_valueComparator = ValueComparator(_collator.get());
}
-intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns) const {
+intrusive_ptr<ExpressionContext> ExpressionContext::copyWith(NamespaceString ns,
+ boost::optional<UUID> uuid) const {
intrusive_ptr<ExpressionContext> expCtx = new ExpressionContext(std::move(ns));
+ expCtx->uuid = std::move(uuid);
expCtx->explain = explain;
expCtx->needsMerge = needsMerge;
expCtx->fromMongos = fromMongos;
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index 037f1b81383..4fe6bc8e541 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -46,6 +46,7 @@
#include "mongo/db/query/tailable_mode.h"
#include "mongo/util/intrusive_counter.h"
#include "mongo/util/string_map.h"
+#include "mongo/util/uuid.h"
namespace mongo {
@@ -88,9 +89,10 @@ public:
/**
* Returns an ExpressionContext that is identical to 'this' that can be used to execute a
- * separate aggregation pipeline on 'ns'.
+ * separate aggregation pipeline on 'ns' with the optional 'uuid'.
*/
- boost::intrusive_ptr<ExpressionContext> copyWith(NamespaceString ns) const;
+ boost::intrusive_ptr<ExpressionContext> copyWith(
+ NamespaceString ns, boost::optional<UUID> uuid = boost::none) const;
/**
* Returns the ResolvedNamespace corresponding to 'nss'. It is an error to call this method on a
@@ -125,6 +127,7 @@ public:
bool from34Mongos = false;
NamespaceString ns;
+ boost::optional<UUID> uuid;
std::string tempDir; // Defaults to empty to prevent external sorting in mongos.
OperationContext* opCtx;
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index 66d4baf9379..75fb836c319 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -204,7 +204,17 @@ public:
pipeline.getValue()->optimizePipeline();
- AutoGetCollectionForReadCommand autoColl(expCtx->opCtx, expCtx->ns);
+ boost::optional<AutoGetCollectionForReadCommand> autoColl;
+ if (expCtx->uuid) {
+ autoColl.emplace(expCtx->opCtx, expCtx->ns.db(), *expCtx->uuid);
+ if (autoColl->getCollection() == nullptr) {
+ // The UUID doesn't exist anymore.
+ return {ErrorCodes::NamespaceNotFound,
+ "No namespace with UUID " + expCtx->uuid->toString()};
+ }
+ } else {
+ autoColl.emplace(expCtx->opCtx, expCtx->ns);
+ }
// makePipeline() is only called to perform secondary aggregation requests and expects the
// collection representing the document source to be not-sharded. We confirm sharding state
@@ -217,7 +227,7 @@ public:
uassert(4567, "from collection cannot be sharded", !bool(css->getMetadata()));
PipelineD::prepareCursorSource(
- autoColl.getCollection(), expCtx->ns, nullptr, pipeline.getValue().get());
+ autoColl->getCollection(), expCtx->ns, nullptr, pipeline.getValue().get());
return pipeline;
}