summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Rassi <rassi@10gen.com>2016-03-29 16:04:42 -0400
committerJason Rassi <rassi@10gen.com>2016-03-29 17:01:33 -0400
commite9638fa4456bbb4fd2492f84c6ec82e37dded8e2 (patch)
treef5228aaea5e441c04f8a4d3e78e62f8d87182b71
parent42930d4ffc88527489f8c60c80a0d57bb95519e5 (diff)
downloadmongo-e9638fa4456bbb4fd2492f84c6ec82e37dded8e2.tar.gz
SERVER-23294 DocumentSourceMergeCursors accept any valid cursor ns
-rw-r--r--src/mongo/db/pipeline/document_source.h22
-rw-r--r--src/mongo/db/pipeline/document_source_merge_cursors.cpp46
-rw-r--r--src/mongo/s/commands/cluster_pipeline_cmd.cpp21
3 files changed, 54 insertions, 35 deletions
diff --git a/src/mongo/db/pipeline/document_source.h b/src/mongo/db/pipeline/document_source.h
index 6c9dee2aa39..a59dfe6999d 100644
--- a/src/mongo/db/pipeline/document_source.h
+++ b/src/mongo/db/pipeline/document_source.h
@@ -678,7 +678,16 @@ private:
class DocumentSourceMergeCursors : public DocumentSource {
public:
- typedef std::vector<std::pair<ConnectionString, CursorId>> CursorIds;
+ struct CursorDescriptor {
+ CursorDescriptor(ConnectionString connectionString, std::string ns, CursorId cursorId)
+ : connectionString(std::move(connectionString)),
+ ns(std::move(ns)),
+ cursorId(cursorId) {}
+
+ ConnectionString connectionString;
+ std::string ns;
+ CursorId cursorId;
+ };
// virtuals from DocumentSource
boost::optional<Document> getNext();
@@ -693,7 +702,8 @@ public:
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
static boost::intrusive_ptr<DocumentSource> create(
- const CursorIds& cursorIds, const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
+ std::vector<CursorDescriptor> cursorDescriptors,
+ const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
/** Returns non-owning pointers to cursors managed by this stage.
* Call this instead of getNext() if you want access to the raw streams.
@@ -709,7 +719,7 @@ public:
private:
struct CursorAndConnection {
- CursorAndConnection(ConnectionString host, NamespaceString ns, CursorId id);
+ CursorAndConnection(const CursorDescriptor& cursorDescriptor);
ScopedDbConnection connection;
DBClientCursor cursor;
};
@@ -717,14 +727,14 @@ private:
// using list to enable removing arbitrary elements
typedef std::list<std::shared_ptr<CursorAndConnection>> Cursors;
- DocumentSourceMergeCursors(const CursorIds& cursorIds,
+ DocumentSourceMergeCursors(std::vector<CursorDescriptor> cursorDescriptors,
const boost::intrusive_ptr<ExpressionContext>& pExpCtx);
- // Converts _cursorIds into active _cursors.
+ // Converts _cursorDescriptors into active _cursors.
void start();
// This is the description of cursors to merge.
- const CursorIds _cursorIds;
+ const std::vector<CursorDescriptor> _cursorDescriptors;
// These are the actual cursors we are merging. Created lazily.
Cursors _cursors;
diff --git a/src/mongo/db/pipeline/document_source_merge_cursors.cpp b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
index 47939919c9d..6056a2f7646 100644
--- a/src/mongo/db/pipeline/document_source_merge_cursors.cpp
+++ b/src/mongo/db/pipeline/document_source_merge_cursors.cpp
@@ -39,8 +39,9 @@ using std::string;
using std::vector;
DocumentSourceMergeCursors::DocumentSourceMergeCursors(
- const CursorIds& cursorIds, const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx), _cursorIds(cursorIds), _unstarted(true) {}
+ std::vector<CursorDescriptor> cursorDescriptors,
+ const intrusive_ptr<ExpressionContext>& pExpCtx)
+ : DocumentSource(pExpCtx), _cursorDescriptors(std::move(cursorDescriptors)), _unstarted(true) {}
REGISTER_DOCUMENT_SOURCE(mergeCursors, DocumentSourceMergeCursors::createFromBson);
@@ -49,8 +50,9 @@ const char* DocumentSourceMergeCursors::getSourceName() const {
}
intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::create(
- const CursorIds& cursorIds, const intrusive_ptr<ExpressionContext>& pExpCtx) {
- return new DocumentSourceMergeCursors(cursorIds, pExpCtx);
+ std::vector<CursorDescriptor> cursorDescriptors,
+ const intrusive_ptr<ExpressionContext>& pExpCtx) {
+ return new DocumentSourceMergeCursors(std::move(cursorDescriptors), pExpCtx);
}
intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
@@ -59,33 +61,40 @@ intrusive_ptr<DocumentSource> DocumentSourceMergeCursors::createFromBson(
string("Expected an Array, but got a ") + typeName(elem.type()),
elem.type() == Array);
- CursorIds cursorIds;
+ std::vector<CursorDescriptor> cursorDescriptors;
BSONObj array = elem.embeddedObject();
BSONForEach(cursor, array) {
massert(17027,
string("Expected an Object, but got a ") + typeName(cursor.type()),
cursor.type() == Object);
- cursorIds.push_back(
- make_pair(ConnectionString(HostAndPort(cursor["host"].String())), cursor["id"].Long()));
+ // The cursor descriptors for the merge cursors stage used to lack an "ns" field; "ns" was
+ // understood to be the expression context namespace in that case. For mixed-version
+ // compatibility, we accept both the old and new formats here.
+ std::string cursorNs = cursor["ns"] ? cursor["ns"].String() : pExpCtx->ns.ns();
+
+ cursorDescriptors.emplace_back(ConnectionString(HostAndPort(cursor["host"].String())),
+ std::move(cursorNs),
+ cursor["id"].Long());
}
- return new DocumentSourceMergeCursors(cursorIds, pExpCtx);
+ return new DocumentSourceMergeCursors(std::move(cursorDescriptors), pExpCtx);
}
Value DocumentSourceMergeCursors::serialize(bool explain) const {
vector<Value> cursors;
- for (size_t i = 0; i < _cursorIds.size(); i++) {
+ for (size_t i = 0; i < _cursorDescriptors.size(); i++) {
cursors.push_back(Value(
- DOC("host" << Value(_cursorIds[i].first.toString()) << "id" << _cursorIds[i].second)));
+ DOC("host" << Value(_cursorDescriptors[i].connectionString.toString()) << "ns"
+ << _cursorDescriptors[i].ns << "id" << _cursorDescriptors[i].cursorId)));
}
return Value(DOC(getSourceName() << Value(cursors)));
}
-DocumentSourceMergeCursors::CursorAndConnection::CursorAndConnection(ConnectionString host,
- NamespaceString nss,
- CursorId id)
- : connection(host), cursor(connection.get(), nss.ns(), id, 0, 0) {}
+DocumentSourceMergeCursors::CursorAndConnection::CursorAndConnection(
+ const CursorDescriptor& cursorDescriptor)
+ : connection(cursorDescriptor.connectionString),
+ cursor(connection.get(), cursorDescriptor.ns, cursorDescriptor.cursorId, 0, 0) {}
vector<DBClientCursor*> DocumentSourceMergeCursors::getCursors() {
verify(_unstarted);
@@ -102,18 +111,17 @@ void DocumentSourceMergeCursors::start() {
_unstarted = false;
// open each cursor and send message asking for a batch
- for (CursorIds::const_iterator it = _cursorIds.begin(); it != _cursorIds.end(); ++it) {
- _cursors.push_back(
- std::make_shared<CursorAndConnection>(it->first, pExpCtx->ns, it->second));
+ for (auto&& cursorDescriptor : _cursorDescriptors) {
+ _cursors.push_back(std::make_shared<CursorAndConnection>(cursorDescriptor));
verify(_cursors.back()->connection->lazySupported());
_cursors.back()->cursor.initLazy(); // shouldn't block
}
// wait for all cursors to return a batch
// TODO need a way to keep cursors alive if some take longer than 10 minutes.
- for (Cursors::const_iterator it = _cursors.begin(); it != _cursors.end(); ++it) {
+ for (auto&& cursor : _cursors) {
bool retry = false;
- bool ok = (*it)->cursor.initLazyFinish(retry); // blocks here for first batch
+ bool ok = cursor->cursor.initLazyFinish(retry); // blocks here for first batch
uassert(
17028, "error reading response from " + _cursors.back()->connection->toString(), ok);
diff --git a/src/mongo/s/commands/cluster_pipeline_cmd.cpp b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
index 5d34c4fb2b7..b3401a8c52f 100644
--- a/src/mongo/s/commands/cluster_pipeline_cmd.cpp
+++ b/src/mongo/s/commands/cluster_pipeline_cmd.cpp
@@ -209,8 +209,8 @@ public:
return reply["ok"].trueValue();
}
- DocumentSourceMergeCursors::CursorIds cursorIds = parseCursors(shardResults, fullns);
- pipeline->addInitialSource(DocumentSourceMergeCursors::create(cursorIds, mergeCtx));
+ pipeline->addInitialSource(
+ DocumentSourceMergeCursors::create(parseCursors(shardResults), mergeCtx));
MutableDocument mergeCmd(pipeline->serialize());
mergeCmd["cursor"] = Value(cmdObj["cursor"]);
@@ -251,8 +251,8 @@ public:
}
private:
- DocumentSourceMergeCursors::CursorIds parseCursors(
- const vector<Strategy::CommandResult>& shardResults, const string& fullns);
+ std::vector<DocumentSourceMergeCursors::CursorDescriptor> parseCursors(
+ const vector<Strategy::CommandResult>& shardResults);
void killAllCursors(const vector<Strategy::CommandResult>& shardResults);
void uassertAllShardsSupportExplain(const vector<Strategy::CommandResult>& shardResults);
@@ -271,10 +271,10 @@ private:
int queryOptions);
} clusterPipelineCmd;
-DocumentSourceMergeCursors::CursorIds PipelineCommand::parseCursors(
- const vector<Strategy::CommandResult>& shardResults, const string& fullns) {
+std::vector<DocumentSourceMergeCursors::CursorDescriptor> PipelineCommand::parseCursors(
+ const vector<Strategy::CommandResult>& shardResults) {
try {
- DocumentSourceMergeCursors::CursorIds cursors;
+ std::vector<DocumentSourceMergeCursors::CursorDescriptor> cursors;
for (size_t i = 0; i < shardResults.size(); i++) {
BSONObj result = shardResults[i].result;
@@ -309,10 +309,11 @@ DocumentSourceMergeCursors::CursorIds PipelineCommand::parseCursors(
massert(17025,
str::stream() << "shard " << shardResults[i].shardTargetId
- << " returned different ns: " << cursor["ns"],
- cursor["ns"].String() == fullns);
+ << " returned invalid ns: " << cursor["ns"],
+ NamespaceString(cursor["ns"].String()).isValid());
- cursors.push_back(std::make_pair(shardResults[i].target, cursor["id"].Long()));
+ cursors.emplace_back(
+ shardResults[i].target, cursor["ns"].String(), cursor["id"].Long());
}
return cursors;