From e9638fa4456bbb4fd2492f84c6ec82e37dded8e2 Mon Sep 17 00:00:00 2001 From: Jason Rassi Date: Tue, 29 Mar 2016 16:04:42 -0400 Subject: SERVER-23294 DocumentSourceMergeCursors accept any valid cursor ns --- src/mongo/db/pipeline/document_source.h | 22 ++++++++--- .../db/pipeline/document_source_merge_cursors.cpp | 46 +++++++++++++--------- src/mongo/s/commands/cluster_pipeline_cmd.cpp | 21 +++++----- 3 files changed, 54 insertions(+), 35 deletions(-) diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h index 6c9dee2aa39..a59dfe6999d 100644 --- a/src/mongo/db/pipeline/document_source.h +++ b/src/mongo/db/pipeline/document_source.h @@ -678,7 +678,16 @@ private: class DocumentSourceMergeCursors : public DocumentSource { public: - typedef std::vector> CursorIds; + struct CursorDescriptor { + CursorDescriptor(ConnectionString connectionString, std::string ns, CursorId cursorId) + : connectionString(std::move(connectionString)), + ns(std::move(ns)), + cursorId(cursorId) {} + + ConnectionString connectionString; + std::string ns; + CursorId cursorId; + }; // virtuals from DocumentSource boost::optional getNext(); @@ -693,7 +702,8 @@ public: BSONElement elem, const boost::intrusive_ptr& pExpCtx); static boost::intrusive_ptr create( - const CursorIds& cursorIds, const boost::intrusive_ptr& pExpCtx); + std::vector cursorDescriptors, + const boost::intrusive_ptr& pExpCtx); /** Returns non-owning pointers to cursors managed by this stage. * Call this instead of getNext() if you want access to the raw streams. @@ -709,7 +719,7 @@ public: private: struct CursorAndConnection { - CursorAndConnection(ConnectionString host, NamespaceString ns, CursorId id); + CursorAndConnection(const CursorDescriptor& cursorDescriptor); ScopedDbConnection connection; DBClientCursor cursor; }; @@ -717,14 +727,14 @@ private: // using list to enable removing arbitrary elements typedef std::list> Cursors; - DocumentSourceMergeCursors(const CursorIds& cursorIds, + DocumentSourceMergeCursors(std::vector cursorDescriptors, const boost::intrusive_ptr& pExpCtx); - // Converts _cursorIds into active _cursors. + // Converts _cursorDescriptors into active _cursors. void start(); // This is the description of cursors to merge. - const CursorIds _cursorIds; + const std::vector _cursorDescriptors; // These are the actual cursors we are merging. Created lazily. Cursors _cursors; diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp index 47939919c9d..6056a2f7646 100644 --- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp +++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp @@ -39,8 +39,9 @@ using std::string; using std::vector; DocumentSourceMergeCursors::DocumentSourceMergeCursors( - const CursorIds& cursorIds, const intrusive_ptr& pExpCtx) - : DocumentSource(pExpCtx), _cursorIds(cursorIds), _unstarted(true) {} + std::vector cursorDescriptors, + const intrusive_ptr& pExpCtx) + : DocumentSource(pExpCtx), _cursorDescriptors(std::move(cursorDescriptors)), _unstarted(true) {} REGISTER_DOCUMENT_SOURCE(mergeCursors, DocumentSourceMergeCursors::createFromBson); @@ -49,8 +50,9 @@ const char* DocumentSourceMergeCursors::getSourceName() const { } intrusive_ptr DocumentSourceMergeCursors::create( - const CursorIds& cursorIds, const intrusive_ptr& pExpCtx) { - return new DocumentSourceMergeCursors(cursorIds, pExpCtx); + std::vector cursorDescriptors, + const intrusive_ptr& pExpCtx) { + return new DocumentSourceMergeCursors(std::move(cursorDescriptors), pExpCtx); } intrusive_ptr DocumentSourceMergeCursors::createFromBson( @@ -59,33 +61,40 @@ intrusive_ptr DocumentSourceMergeCursors::createFromBson( string("Expected an Array, but got a ") + typeName(elem.type()), elem.type() == Array); - CursorIds cursorIds; + std::vector cursorDescriptors; BSONObj array = elem.embeddedObject(); BSONForEach(cursor, array) { massert(17027, string("Expected an Object, but got a ") + typeName(cursor.type()), cursor.type() == Object); - cursorIds.push_back( - make_pair(ConnectionString(HostAndPort(cursor["host"].String())), cursor["id"].Long())); + // The cursor descriptors for the merge cursors stage used to lack an "ns" field; "ns" was + // understood to be the expression context namespace in that case. For mixed-version + // compatibility, we accept both the old and new formats here. + std::string cursorNs = cursor["ns"] ? cursor["ns"].String() : pExpCtx->ns.ns(); + + cursorDescriptors.emplace_back(ConnectionString(HostAndPort(cursor["host"].String())), + std::move(cursorNs), + cursor["id"].Long()); } - return new DocumentSourceMergeCursors(cursorIds, pExpCtx); + return new DocumentSourceMergeCursors(std::move(cursorDescriptors), pExpCtx); } Value DocumentSourceMergeCursors::serialize(bool explain) const { vector cursors; - for (size_t i = 0; i < _cursorIds.size(); i++) { + for (size_t i = 0; i < _cursorDescriptors.size(); i++) { cursors.push_back(Value( - DOC("host" << Value(_cursorIds[i].first.toString()) << "id" << _cursorIds[i].second))); + DOC("host" << Value(_cursorDescriptors[i].connectionString.toString()) << "ns" + << _cursorDescriptors[i].ns << "id" << _cursorDescriptors[i].cursorId))); } return Value(DOC(getSourceName() << Value(cursors))); } -DocumentSourceMergeCursors::CursorAndConnection::CursorAndConnection(ConnectionString host, - NamespaceString nss, - CursorId id) - : connection(host), cursor(connection.get(), nss.ns(), id, 0, 0) {} +DocumentSourceMergeCursors::CursorAndConnection::CursorAndConnection( + const CursorDescriptor& cursorDescriptor) + : connection(cursorDescriptor.connectionString), + cursor(connection.get(), cursorDescriptor.ns, cursorDescriptor.cursorId, 0, 0) {} vector DocumentSourceMergeCursors::getCursors() { verify(_unstarted); @@ -102,18 +111,17 @@ void DocumentSourceMergeCursors::start() { _unstarted = false; // open each cursor and send message asking for a batch - for (CursorIds::const_iterator it = _cursorIds.begin(); it != _cursorIds.end(); ++it) { - _cursors.push_back( - std::make_shared(it->first, pExpCtx->ns, it->second)); + for (auto&& cursorDescriptor : _cursorDescriptors) { + _cursors.push_back(std::make_shared(cursorDescriptor)); verify(_cursors.back()->connection->lazySupported()); _cursors.back()->cursor.initLazy(); // shouldn't block } // wait for all cursors to return a batch // TODO need a way to keep cursors alive if some take longer than 10 minutes. - for (Cursors::const_iterator it = _cursors.begin(); it != _cursors.end(); ++it) { + for (auto&& cursor : _cursors) { bool retry = false; - bool ok = (*it)->cursor.initLazyFinish(retry); // blocks here for first batch + bool ok = cursor->cursor.initLazyFinish(retry); // blocks here for first batch uassert( 17028, "error reading response from " + _cursors.back()->connection->toString(), ok); diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp index 5d34c4fb2b7..b3401a8c52f 100644 --- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp +++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp @@ -209,8 +209,8 @@ public: return reply["ok"].trueValue(); } - DocumentSourceMergeCursors::CursorIds cursorIds = parseCursors(shardResults, fullns); - pipeline->addInitialSource(DocumentSourceMergeCursors::create(cursorIds, mergeCtx)); + pipeline->addInitialSource( + DocumentSourceMergeCursors::create(parseCursors(shardResults), mergeCtx)); MutableDocument mergeCmd(pipeline->serialize()); mergeCmd["cursor"] = Value(cmdObj["cursor"]); @@ -251,8 +251,8 @@ public: } private: - DocumentSourceMergeCursors::CursorIds parseCursors( - const vector& shardResults, const string& fullns); + std::vector parseCursors( + const vector& shardResults); void killAllCursors(const vector& shardResults); void uassertAllShardsSupportExplain(const vector& shardResults); @@ -271,10 +271,10 @@ private: int queryOptions); } clusterPipelineCmd; -DocumentSourceMergeCursors::CursorIds PipelineCommand::parseCursors( - const vector& shardResults, const string& fullns) { +std::vector PipelineCommand::parseCursors( + const vector& shardResults) { try { - DocumentSourceMergeCursors::CursorIds cursors; + std::vector cursors; for (size_t i = 0; i < shardResults.size(); i++) { BSONObj result = shardResults[i].result; @@ -309,10 +309,11 @@ DocumentSourceMergeCursors::CursorIds PipelineCommand::parseCursors( massert(17025, str::stream() << "shard " << shardResults[i].shardTargetId - << " returned different ns: " << cursor["ns"], - cursor["ns"].String() == fullns); + << " returned invalid ns: " << cursor["ns"], + NamespaceString(cursor["ns"].String()).isValid()); - cursors.push_back(std::make_pair(shardResults[i].target, cursor["id"].Long())); + cursors.emplace_back( + shardResults[i].target, cursor["ns"].String(), cursor["id"].Long()); } return cursors; -- cgit v1.2.1