summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_out.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_out.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp293
1 files changed, 144 insertions, 149 deletions
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index bdb8d1d7055..858df389be8 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -32,171 +32,166 @@
namespace mongo {
- using boost::intrusive_ptr;
- using std::vector;
-
- const char DocumentSourceOut::outName[] = "$out";
-
- DocumentSourceOut::~DocumentSourceOut() {
- DESTRUCTOR_GUARD(
- // Make sure we drop the temp collection if anything goes wrong. Errors are ignored
- // here because nothing can be done about them. Additionally, if this fails and the
- // collection is left behind, it will be cleaned up next time the server is started.
- if (_mongod && _tempNs.size())
- _mongod->directClient()->dropCollection(_tempNs.ns());
- )
- }
+using boost::intrusive_ptr;
+using std::vector;
- const char *DocumentSourceOut::getSourceName() const {
- return outName;
- }
+const char DocumentSourceOut::outName[] = "$out";
- static AtomicUInt32 aggOutCounter;
- void DocumentSourceOut::prepTempCollection() {
- verify(_mongod);
- verify(_tempNs.size() == 0);
-
- DBClientBase* conn = _mongod->directClient();
-
- // Fail early by checking before we do any work.
- uassert(17017, str::stream() << "namespace '" << _outputNs.ns()
- << "' is sharded so it can't be used for $out'",
- !_mongod->isSharded(_outputNs));
-
- // cannot $out to capped collection
- uassert(17152, str::stream() << "namespace '" << _outputNs.ns()
- << "' is capped so it can't be used for $out",
- !_mongod->isCapped(_outputNs));
-
- _tempNs = NamespaceString(StringData(str::stream() << _outputNs.db()
- << ".tmp.agg_out."
- << aggOutCounter.addAndFetch(1)
- ));
-
- // Create output collection, copying options from existing collection if any.
- {
- const auto infos = conn->getCollectionInfos(_outputNs.db().toString(),
- BSON("name" << _outputNs.coll()));
- const auto options = infos.empty() ? BSONObj()
- : infos.front().getObjectField("options");
-
- BSONObjBuilder cmd;
- cmd << "create" << _tempNs.coll();
- cmd << "temp" << true;
- cmd.appendElementsUnique(options);
-
- BSONObj info;
- bool ok = conn->runCommand(_outputNs.db().toString(), cmd.done(), info);
- uassert(16994, str::stream() << "failed to create temporary $out collection '"
- << _tempNs.ns() << "': " << info.toString(),
- ok);
- }
+DocumentSourceOut::~DocumentSourceOut() {
+ DESTRUCTOR_GUARD(
+ // Make sure we drop the temp collection if anything goes wrong. Errors are ignored
+ // here because nothing can be done about them. Additionally, if this fails and the
+ // collection is left behind, it will be cleaned up next time the server is started.
+ if (_mongod && _tempNs.size()) _mongod->directClient()->dropCollection(_tempNs.ns());)
+}
- // copy indexes on _outputNs to _tempNs
- const std::list<BSONObj> indexes = conn->getIndexSpecs(_outputNs);
- for (std::list<BSONObj>::const_iterator it = indexes.begin(); it != indexes.end(); ++it) {
- MutableDocument index((Document(*it)));
- index.remove("_id"); // indexes shouldn't have _ids but some existing ones do
- index["ns"] = Value(_tempNs.ns());
-
- BSONObj indexBson = index.freeze().toBson();
- conn->insert(_tempNs.getSystemIndexesCollection(), indexBson);
- BSONObj err = conn->getLastErrorDetailed();
- uassert(16995, str::stream() << "copying index for $out failed."
- << " index: " << indexBson
- << " error: " << err,
- DBClientWithCommands::getLastErrorString(err).empty());
- }
- }
+const char* DocumentSourceOut::getSourceName() const {
+ return outName;
+}
- void DocumentSourceOut::spill(const vector<BSONObj>& toInsert) {
- BSONObj err = _mongod->insert(_tempNs, toInsert);
- uassert(16996, str::stream() << "insert for $out failed: " << err,
- DBClientWithCommands::getLastErrorString(err).empty());
- }
+static AtomicUInt32 aggOutCounter;
+void DocumentSourceOut::prepTempCollection() {
+ verify(_mongod);
+ verify(_tempNs.size() == 0);
- boost::optional<Document> DocumentSourceOut::getNext() {
- pExpCtx->checkForInterrupt();
-
- // make sure we only write out once
- if (_done)
- return boost::none;
- _done = true;
-
- verify(_mongod);
- DBClientBase* conn = _mongod->directClient();
-
- prepTempCollection();
- verify(_tempNs.size() != 0);
-
- vector<BSONObj> bufferedObjects;
- int bufferedBytes = 0;
- while (boost::optional<Document> next = pSource->getNext()) {
- BSONObj toInsert = next->toBson();
- bufferedBytes += toInsert.objsize();
- if (!bufferedObjects.empty() && bufferedBytes > BSONObjMaxUserSize) {
- spill(bufferedObjects);
- bufferedObjects.clear();
- bufferedBytes = toInsert.objsize();
- }
- bufferedObjects.push_back(toInsert);
- }
+ DBClientBase* conn = _mongod->directClient();
- if (!bufferedObjects.empty())
- spill(bufferedObjects);
+ // Fail early by checking before we do any work.
+ uassert(17017,
+ str::stream() << "namespace '" << _outputNs.ns()
+ << "' is sharded so it can't be used for $out'",
+ !_mongod->isSharded(_outputNs));
+
+ // cannot $out to capped collection
+ uassert(17152,
+ str::stream() << "namespace '" << _outputNs.ns()
+ << "' is capped so it can't be used for $out",
+ !_mongod->isCapped(_outputNs));
+
+ _tempNs = NamespaceString(StringData(str::stream() << _outputNs.db() << ".tmp.agg_out."
+ << aggOutCounter.addAndFetch(1)));
- // Checking again to make sure we didn't become sharded while running.
- uassert(17018, str::stream() << "namespace '" << _outputNs.ns()
- << "' became sharded so it can't be used for $out'",
- !_mongod->isSharded(_outputNs));
+ // Create output collection, copying options from existing collection if any.
+ {
+ const auto infos =
+ conn->getCollectionInfos(_outputNs.db().toString(), BSON("name" << _outputNs.coll()));
+ const auto options = infos.empty() ? BSONObj() : infos.front().getObjectField("options");
+
+ BSONObjBuilder cmd;
+ cmd << "create" << _tempNs.coll();
+ cmd << "temp" << true;
+ cmd.appendElementsUnique(options);
- BSONObj rename = BSON("renameCollection" << _tempNs.ns()
- << "to" << _outputNs.ns()
- << "dropTarget" << true
- );
BSONObj info;
- bool ok = conn->runCommand("admin", rename, info);
- uassert(16997, str::stream() << "renameCollection for $out failed: " << info,
+ bool ok = conn->runCommand(_outputNs.db().toString(), cmd.done(), info);
+ uassert(16994,
+ str::stream() << "failed to create temporary $out collection '" << _tempNs.ns()
+ << "': " << info.toString(),
ok);
+ }
+
+ // copy indexes on _outputNs to _tempNs
+ const std::list<BSONObj> indexes = conn->getIndexSpecs(_outputNs);
+ for (std::list<BSONObj>::const_iterator it = indexes.begin(); it != indexes.end(); ++it) {
+ MutableDocument index((Document(*it)));
+ index.remove("_id"); // indexes shouldn't have _ids but some existing ones do
+ index["ns"] = Value(_tempNs.ns());
+
+ BSONObj indexBson = index.freeze().toBson();
+ conn->insert(_tempNs.getSystemIndexesCollection(), indexBson);
+ BSONObj err = conn->getLastErrorDetailed();
+ uassert(16995,
+ str::stream() << "copying index for $out failed."
+ << " index: " << indexBson << " error: " << err,
+ DBClientWithCommands::getLastErrorString(err).empty());
+ }
+}
- // We don't need to drop the temp collection in our destructor if the rename succeeded.
- _tempNs = NamespaceString("");
+void DocumentSourceOut::spill(const vector<BSONObj>& toInsert) {
+ BSONObj err = _mongod->insert(_tempNs, toInsert);
+ uassert(16996,
+ str::stream() << "insert for $out failed: " << err,
+ DBClientWithCommands::getLastErrorString(err).empty());
+}
+
+boost::optional<Document> DocumentSourceOut::getNext() {
+ pExpCtx->checkForInterrupt();
- // This "DocumentSource" doesn't produce output documents. This can change in the future
- // if we support using $out in "tee" mode.
+ // make sure we only write out once
+ if (_done)
return boost::none;
- }
+ _done = true;
- DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs,
- const intrusive_ptr<ExpressionContext>& pExpCtx)
- : DocumentSource(pExpCtx)
- , _done(false)
- , _tempNs("") // filled in by prepTempCollection
- , _outputNs(outputNs)
- {}
-
- intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
- BSONElement elem,
- const intrusive_ptr<ExpressionContext> &pExpCtx) {
- uassert(16990, str::stream() << "$out only supports a string argument, not "
- << typeName(elem.type()),
- elem.type() == String);
-
- NamespaceString outputNs(pExpCtx->ns.db().toString() + '.' + elem.str());
- uassert(17385, "Can't $out to special collection: " + elem.str(),
- !outputNs.isSpecial());
- return new DocumentSourceOut(outputNs, pExpCtx);
- }
+ verify(_mongod);
+ DBClientBase* conn = _mongod->directClient();
- Value DocumentSourceOut::serialize(bool explain) const {
- massert(17000, "$out shouldn't have different db than input",
- _outputNs.db() == pExpCtx->ns.db());
+ prepTempCollection();
+ verify(_tempNs.size() != 0);
- return Value(DOC(getSourceName() << _outputNs.coll()));
+ vector<BSONObj> bufferedObjects;
+ int bufferedBytes = 0;
+ while (boost::optional<Document> next = pSource->getNext()) {
+ BSONObj toInsert = next->toBson();
+ bufferedBytes += toInsert.objsize();
+ if (!bufferedObjects.empty() && bufferedBytes > BSONObjMaxUserSize) {
+ spill(bufferedObjects);
+ bufferedObjects.clear();
+ bufferedBytes = toInsert.objsize();
+ }
+ bufferedObjects.push_back(toInsert);
}
- DocumentSource::GetDepsReturn DocumentSourceOut::getDependencies(DepsTracker* deps) const {
- deps->needWholeDocument = true;
- return EXHAUSTIVE_ALL;
- }
+ if (!bufferedObjects.empty())
+ spill(bufferedObjects);
+
+ // Checking again to make sure we didn't become sharded while running.
+ uassert(17018,
+ str::stream() << "namespace '" << _outputNs.ns()
+ << "' became sharded so it can't be used for $out'",
+ !_mongod->isSharded(_outputNs));
+
+ BSONObj rename =
+ BSON("renameCollection" << _tempNs.ns() << "to" << _outputNs.ns() << "dropTarget" << true);
+ BSONObj info;
+ bool ok = conn->runCommand("admin", rename, info);
+ uassert(16997, str::stream() << "renameCollection for $out failed: " << info, ok);
+
+ // We don't need to drop the temp collection in our destructor if the rename succeeded.
+ _tempNs = NamespaceString("");
+
+ // This "DocumentSource" doesn't produce output documents. This can change in the future
+ // if we support using $out in "tee" mode.
+ return boost::none;
+}
+
+DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs,
+ const intrusive_ptr<ExpressionContext>& pExpCtx)
+ : DocumentSource(pExpCtx),
+ _done(false),
+ _tempNs("") // filled in by prepTempCollection
+ ,
+ _outputNs(outputNs) {}
+
+intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
+ BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
+ uassert(16990,
+ str::stream() << "$out only supports a string argument, not " << typeName(elem.type()),
+ elem.type() == String);
+
+ NamespaceString outputNs(pExpCtx->ns.db().toString() + '.' + elem.str());
+ uassert(17385, "Can't $out to special collection: " + elem.str(), !outputNs.isSpecial());
+ return new DocumentSourceOut(outputNs, pExpCtx);
+}
+
+Value DocumentSourceOut::serialize(bool explain) const {
+ massert(
+ 17000, "$out shouldn't have different db than input", _outputNs.db() == pExpCtx->ns.db());
+
+ return Value(DOC(getSourceName() << _outputNs.coll()));
+}
+
+DocumentSource::GetDepsReturn DocumentSourceOut::getDependencies(DepsTracker* deps) const {
+ deps->needWholeDocument = true;
+ return EXHAUSTIVE_ALL;
+}
}