diff options
Diffstat (limited to 'src/mongo/db/pipeline/document_source_out.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_out.cpp | 293 |
1 files changed, 144 insertions, 149 deletions
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index bdb8d1d7055..858df389be8 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -32,171 +32,166 @@ namespace mongo { - using boost::intrusive_ptr; - using std::vector; - - const char DocumentSourceOut::outName[] = "$out"; - - DocumentSourceOut::~DocumentSourceOut() { - DESTRUCTOR_GUARD( - // Make sure we drop the temp collection if anything goes wrong. Errors are ignored - // here because nothing can be done about them. Additionally, if this fails and the - // collection is left behind, it will be cleaned up next time the server is started. - if (_mongod && _tempNs.size()) - _mongod->directClient()->dropCollection(_tempNs.ns()); - ) - } +using boost::intrusive_ptr; +using std::vector; - const char *DocumentSourceOut::getSourceName() const { - return outName; - } +const char DocumentSourceOut::outName[] = "$out"; - static AtomicUInt32 aggOutCounter; - void DocumentSourceOut::prepTempCollection() { - verify(_mongod); - verify(_tempNs.size() == 0); - - DBClientBase* conn = _mongod->directClient(); - - // Fail early by checking before we do any work. - uassert(17017, str::stream() << "namespace '" << _outputNs.ns() - << "' is sharded so it can't be used for $out'", - !_mongod->isSharded(_outputNs)); - - // cannot $out to capped collection - uassert(17152, str::stream() << "namespace '" << _outputNs.ns() - << "' is capped so it can't be used for $out", - !_mongod->isCapped(_outputNs)); - - _tempNs = NamespaceString(StringData(str::stream() << _outputNs.db() - << ".tmp.agg_out." - << aggOutCounter.addAndFetch(1) - )); - - // Create output collection, copying options from existing collection if any. - { - const auto infos = conn->getCollectionInfos(_outputNs.db().toString(), - BSON("name" << _outputNs.coll())); - const auto options = infos.empty() ? BSONObj() - : infos.front().getObjectField("options"); - - BSONObjBuilder cmd; - cmd << "create" << _tempNs.coll(); - cmd << "temp" << true; - cmd.appendElementsUnique(options); - - BSONObj info; - bool ok = conn->runCommand(_outputNs.db().toString(), cmd.done(), info); - uassert(16994, str::stream() << "failed to create temporary $out collection '" - << _tempNs.ns() << "': " << info.toString(), - ok); - } +DocumentSourceOut::~DocumentSourceOut() { + DESTRUCTOR_GUARD( + // Make sure we drop the temp collection if anything goes wrong. Errors are ignored + // here because nothing can be done about them. Additionally, if this fails and the + // collection is left behind, it will be cleaned up next time the server is started. + if (_mongod && _tempNs.size()) _mongod->directClient()->dropCollection(_tempNs.ns());) +} - // copy indexes on _outputNs to _tempNs - const std::list<BSONObj> indexes = conn->getIndexSpecs(_outputNs); - for (std::list<BSONObj>::const_iterator it = indexes.begin(); it != indexes.end(); ++it) { - MutableDocument index((Document(*it))); - index.remove("_id"); // indexes shouldn't have _ids but some existing ones do - index["ns"] = Value(_tempNs.ns()); - - BSONObj indexBson = index.freeze().toBson(); - conn->insert(_tempNs.getSystemIndexesCollection(), indexBson); - BSONObj err = conn->getLastErrorDetailed(); - uassert(16995, str::stream() << "copying index for $out failed." - << " index: " << indexBson - << " error: " << err, - DBClientWithCommands::getLastErrorString(err).empty()); - } - } +const char* DocumentSourceOut::getSourceName() const { + return outName; +} - void DocumentSourceOut::spill(const vector<BSONObj>& toInsert) { - BSONObj err = _mongod->insert(_tempNs, toInsert); - uassert(16996, str::stream() << "insert for $out failed: " << err, - DBClientWithCommands::getLastErrorString(err).empty()); - } +static AtomicUInt32 aggOutCounter; +void DocumentSourceOut::prepTempCollection() { + verify(_mongod); + verify(_tempNs.size() == 0); - boost::optional<Document> DocumentSourceOut::getNext() { - pExpCtx->checkForInterrupt(); - - // make sure we only write out once - if (_done) - return boost::none; - _done = true; - - verify(_mongod); - DBClientBase* conn = _mongod->directClient(); - - prepTempCollection(); - verify(_tempNs.size() != 0); - - vector<BSONObj> bufferedObjects; - int bufferedBytes = 0; - while (boost::optional<Document> next = pSource->getNext()) { - BSONObj toInsert = next->toBson(); - bufferedBytes += toInsert.objsize(); - if (!bufferedObjects.empty() && bufferedBytes > BSONObjMaxUserSize) { - spill(bufferedObjects); - bufferedObjects.clear(); - bufferedBytes = toInsert.objsize(); - } - bufferedObjects.push_back(toInsert); - } + DBClientBase* conn = _mongod->directClient(); - if (!bufferedObjects.empty()) - spill(bufferedObjects); + // Fail early by checking before we do any work. + uassert(17017, + str::stream() << "namespace '" << _outputNs.ns() + << "' is sharded so it can't be used for $out'", + !_mongod->isSharded(_outputNs)); + + // cannot $out to capped collection + uassert(17152, + str::stream() << "namespace '" << _outputNs.ns() + << "' is capped so it can't be used for $out", + !_mongod->isCapped(_outputNs)); + + _tempNs = NamespaceString(StringData(str::stream() << _outputNs.db() << ".tmp.agg_out." + << aggOutCounter.addAndFetch(1))); - // Checking again to make sure we didn't become sharded while running. - uassert(17018, str::stream() << "namespace '" << _outputNs.ns() - << "' became sharded so it can't be used for $out'", - !_mongod->isSharded(_outputNs)); + // Create output collection, copying options from existing collection if any. + { + const auto infos = + conn->getCollectionInfos(_outputNs.db().toString(), BSON("name" << _outputNs.coll())); + const auto options = infos.empty() ? BSONObj() : infos.front().getObjectField("options"); + + BSONObjBuilder cmd; + cmd << "create" << _tempNs.coll(); + cmd << "temp" << true; + cmd.appendElementsUnique(options); - BSONObj rename = BSON("renameCollection" << _tempNs.ns() - << "to" << _outputNs.ns() - << "dropTarget" << true - ); BSONObj info; - bool ok = conn->runCommand("admin", rename, info); - uassert(16997, str::stream() << "renameCollection for $out failed: " << info, + bool ok = conn->runCommand(_outputNs.db().toString(), cmd.done(), info); + uassert(16994, + str::stream() << "failed to create temporary $out collection '" << _tempNs.ns() + << "': " << info.toString(), ok); + } + + // copy indexes on _outputNs to _tempNs + const std::list<BSONObj> indexes = conn->getIndexSpecs(_outputNs); + for (std::list<BSONObj>::const_iterator it = indexes.begin(); it != indexes.end(); ++it) { + MutableDocument index((Document(*it))); + index.remove("_id"); // indexes shouldn't have _ids but some existing ones do + index["ns"] = Value(_tempNs.ns()); + + BSONObj indexBson = index.freeze().toBson(); + conn->insert(_tempNs.getSystemIndexesCollection(), indexBson); + BSONObj err = conn->getLastErrorDetailed(); + uassert(16995, + str::stream() << "copying index for $out failed." + << " index: " << indexBson << " error: " << err, + DBClientWithCommands::getLastErrorString(err).empty()); + } +} - // We don't need to drop the temp collection in our destructor if the rename succeeded. - _tempNs = NamespaceString(""); +void DocumentSourceOut::spill(const vector<BSONObj>& toInsert) { + BSONObj err = _mongod->insert(_tempNs, toInsert); + uassert(16996, + str::stream() << "insert for $out failed: " << err, + DBClientWithCommands::getLastErrorString(err).empty()); +} + +boost::optional<Document> DocumentSourceOut::getNext() { + pExpCtx->checkForInterrupt(); - // This "DocumentSource" doesn't produce output documents. This can change in the future - // if we support using $out in "tee" mode. + // make sure we only write out once + if (_done) return boost::none; - } + _done = true; - DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs, - const intrusive_ptr<ExpressionContext>& pExpCtx) - : DocumentSource(pExpCtx) - , _done(false) - , _tempNs("") // filled in by prepTempCollection - , _outputNs(outputNs) - {} - - intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( - BSONElement elem, - const intrusive_ptr<ExpressionContext> &pExpCtx) { - uassert(16990, str::stream() << "$out only supports a string argument, not " - << typeName(elem.type()), - elem.type() == String); - - NamespaceString outputNs(pExpCtx->ns.db().toString() + '.' + elem.str()); - uassert(17385, "Can't $out to special collection: " + elem.str(), - !outputNs.isSpecial()); - return new DocumentSourceOut(outputNs, pExpCtx); - } + verify(_mongod); + DBClientBase* conn = _mongod->directClient(); - Value DocumentSourceOut::serialize(bool explain) const { - massert(17000, "$out shouldn't have different db than input", - _outputNs.db() == pExpCtx->ns.db()); + prepTempCollection(); + verify(_tempNs.size() != 0); - return Value(DOC(getSourceName() << _outputNs.coll())); + vector<BSONObj> bufferedObjects; + int bufferedBytes = 0; + while (boost::optional<Document> next = pSource->getNext()) { + BSONObj toInsert = next->toBson(); + bufferedBytes += toInsert.objsize(); + if (!bufferedObjects.empty() && bufferedBytes > BSONObjMaxUserSize) { + spill(bufferedObjects); + bufferedObjects.clear(); + bufferedBytes = toInsert.objsize(); + } + bufferedObjects.push_back(toInsert); } - DocumentSource::GetDepsReturn DocumentSourceOut::getDependencies(DepsTracker* deps) const { - deps->needWholeDocument = true; - return EXHAUSTIVE_ALL; - } + if (!bufferedObjects.empty()) + spill(bufferedObjects); + + // Checking again to make sure we didn't become sharded while running. + uassert(17018, + str::stream() << "namespace '" << _outputNs.ns() + << "' became sharded so it can't be used for $out'", + !_mongod->isSharded(_outputNs)); + + BSONObj rename = + BSON("renameCollection" << _tempNs.ns() << "to" << _outputNs.ns() << "dropTarget" << true); + BSONObj info; + bool ok = conn->runCommand("admin", rename, info); + uassert(16997, str::stream() << "renameCollection for $out failed: " << info, ok); + + // We don't need to drop the temp collection in our destructor if the rename succeeded. + _tempNs = NamespaceString(""); + + // This "DocumentSource" doesn't produce output documents. This can change in the future + // if we support using $out in "tee" mode. + return boost::none; +} + +DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs, + const intrusive_ptr<ExpressionContext>& pExpCtx) + : DocumentSource(pExpCtx), + _done(false), + _tempNs("") // filled in by prepTempCollection + , + _outputNs(outputNs) {} + +intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( + BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) { + uassert(16990, + str::stream() << "$out only supports a string argument, not " << typeName(elem.type()), + elem.type() == String); + + NamespaceString outputNs(pExpCtx->ns.db().toString() + '.' + elem.str()); + uassert(17385, "Can't $out to special collection: " + elem.str(), !outputNs.isSpecial()); + return new DocumentSourceOut(outputNs, pExpCtx); +} + +Value DocumentSourceOut::serialize(bool explain) const { + massert( + 17000, "$out shouldn't have different db than input", _outputNs.db() == pExpCtx->ns.db()); + + return Value(DOC(getSourceName() << _outputNs.coll())); +} + +DocumentSource::GetDepsReturn DocumentSourceOut::getDependencies(DepsTracker* deps) const { + deps->needWholeDocument = true; + return EXHAUSTIVE_ALL; +} } |