/**
* Copyright 2011 (c) 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/destructor_guard.h"
namespace mongo {
using boost::intrusive_ptr;
using std::vector;
/**
 * Best-effort cleanup: if a temporary collection name is still recorded (getNext() clears it only
 * after the final rename succeeds), drop that temporary collection.
 */
DocumentSourceOut::~DocumentSourceOut() {
    // Errors are ignored here because a destructor has no way to report them. If the drop fails
    // and the collection is left behind, it will be cleaned up the next time the server starts.
    DESTRUCTOR_GUARD(if (_mongod && _tempNs.size() > 0) {
        _mongod->directClient()->dropCollection(_tempNs.ns());
    });
}
/**
 * Validates a {$out: <string>} stage argument and resolves its target namespace.
 *
 * The target namespace is built from the aggregation request's database and the string argument.
 *
 * Throws (uassert) with code 40325 if the argument is not a string, and with code 40326 if the
 * resulting namespace is not valid.
 *
 * NOTE(review): the template arguments of the return type and of stdx::make_unique appear to
 * have been stripped from this copy of the file; restore them from the header declaration.
 */
std::unique_ptr DocumentSourceOut::liteParse(
const AggregationRequest& request, const BSONElement& spec) {
// The argument must be the target collection name, given as a string.
uassert(40325,
str::stream() << "$out stage requires a string argument, but found "
<< typeName(spec.type()),
spec.type() == BSONType::String);
// $out always targets the same database the aggregation request was issued against.
NamespaceString targetNss(request.getNamespaceString().db(), spec.valueStringData());
uassert(40326,
str::stream() << "Invalid $out target namespace, " << targetNss.ns(),
targetNss.isValid());
return stdx::make_unique(std::move(targetNss));
}
// Register $out with its lightweight parser (liteParse) and its full parser (createFromBson).
// Presumably the macro keys this registration on the stage name derived from `out`; see the
// REGISTER_DOCUMENT_SOURCE definition to confirm.
REGISTER_DOCUMENT_SOURCE(out, DocumentSourceOut::liteParse, DocumentSourceOut::createFromBson);
/**
 * Returns the stage name "$out"; serialize() also uses it as the field name of the serialized
 * stage.
 */
const char* DocumentSourceOut::getSourceName() const {
    static const char kStageName[] = "$out";
    return kStageName;
}
// Incremented by initialize() to give each $out stage in this process a distinct temporary
// collection name of the form "<db>.tmp.agg_out.<n>".
static AtomicUInt32 aggOutCounter;
/**
 * Lazily prepares the temporary collection this stage writes into; called by the first getNext().
 *
 * Records the target collection's current options and index specs so that the final rename can
 * check they did not change while the pipeline ran. Rejects sharded or capped targets up front.
 * Then creates a uniquely named temporary collection in the target's database, carrying the same
 * options and indexes.
 *
 * Throws (uassert) with code 17017 if the target is sharded, 17152 if it is capped, 16994 if the
 * temporary collection cannot be created, and 16995 if an index cannot be copied onto it.
 */
void DocumentSourceOut::initialize() {
    invariant(_mongod);
    DBClientBase* conn = _mongod->directClient();

    // Save the original collection options and index specs so we can check they didn't change
    // during computation.
    _originalOutOptions = _mongod->getCollectionOptions(_outputNs);
    _originalIndexes = conn->getIndexSpecs(_outputNs.ns());

    // Check if it's sharded or capped to make sure we have a chance of succeeding before we do all
    // the work. If the collection becomes capped during processing, the collection options will
    // have changed, and the $out will fail. If it becomes sharded during processing, the final
    // rename will fail.
    uassert(17017,
            str::stream() << "namespace '" << _outputNs.ns()
                          << "' is sharded so it can't be used for $out",
            !_mongod->isSharded(_outputNs));
    uassert(17152,
            str::stream() << "namespace '" << _outputNs.ns()
                          << "' is capped so it can't be used for $out",
            _originalOutOptions["capped"].eoo());

    // We will write all results into a temporary collection, then rename the temporary collection
    // to be the target collection once we are done.
    _tempNs = NamespaceString(str::stream() << _outputNs.db() << ".tmp.agg_out."
                                            << aggOutCounter.addAndFetch(1));

    // Create the temporary collection, copying options from the existing target collection (if
    // any). The explicit "create"/"temp" fields are appended first; appendElementsUnique() then
    // adds the saved options (presumably skipping field names that are already present).
    {
        BSONObjBuilder cmd;
        cmd << "create" << _tempNs.coll();
        cmd << "temp" << true;
        cmd.appendElementsUnique(_originalOutOptions);

        BSONObj info;
        bool ok = conn->runCommand(_outputNs.db().toString(), cmd.done(), info);
        uassert(16994,
                str::stream() << "failed to create temporary $out collection '" << _tempNs.ns()
                              << "': "
                              << info.toString(),
                ok);
    }

    // Copy each of the target's indexes onto the temporary collection.
    for (const auto& originalIndexSpec : _originalIndexes) {
        // Extra parentheses avoid parsing this line as a function declaration.
        MutableDocument index((Document(originalIndexSpec)));
        index.remove("_id");  // indexes shouldn't have _ids but some existing ones do
        index["ns"] = Value(_tempNs.ns());

        BSONObj indexBson = index.freeze().toBson();
        conn->insert(_tempNs.getSystemIndexesCollection(), indexBson);
        BSONObj err = conn->getLastErrorDetailed();
        uassert(16995,
                str::stream() << "copying index for $out failed."
                              << " index: "
                              << indexBson
                              << " error: "
                              << err,
                DBClientWithCommands::getLastErrorString(err).empty());
    }

    _initialized = true;
}
void DocumentSourceOut::spill(const vector& toInsert) {
BSONObj err = _mongod->insert(_tempNs, toInsert);
uassert(16996,
str::stream() << "insert for $out failed: " << err,
DBClientWithCommands::getLastErrorString(err).empty());
}
/**
 * Drains the upstream stage into the temporary collection.
 *
 * Inserts are batched so that each batch either totals at most BSONObjMaxUserSize bytes or
 * consists of a single (larger) document.
 *
 * - On a pause from upstream: flushes whatever has been buffered and propagates the pause.
 * - On EOF: renames the temporary collection over the target (dropTarget: true), as long as the
 *   target's options and indexes still match those recorded by initialize(). Throws (uassert,
 *   code 16997) if that rename fails.
 *
 * $out produces no output documents of its own, so the result is always EOF once finished.
 */
DocumentSource::GetNextResult DocumentSourceOut::getNext() {
    pExpCtx->checkForInterrupt();

    if (_done) {
        return GetNextResult::makeEOF();
    }

    if (!_initialized) {
        initialize();
    }

    // Insert all documents into temp collection, batching to perform vectored inserts.
    vector<BSONObj> bufferedObjects;
    int bufferedBytes = 0;

    auto nextInput = pSource->getNext();
    for (; nextInput.isAdvanced(); nextInput = pSource->getNext()) {
        BSONObj toInsert = nextInput.releaseDocument().toBson();
        const int docSize = toInsert.objsize();

        bufferedBytes += docSize;
        if (!bufferedObjects.empty() && bufferedBytes > BSONObjMaxUserSize) {
            // Flush the current batch; this document starts the next one.
            spill(bufferedObjects);
            bufferedObjects.clear();
            bufferedBytes = docSize;
        }
        // The size was captured above, so the object can be moved rather than copied.
        bufferedObjects.push_back(std::move(toInsert));
    }

    if (!bufferedObjects.empty()) {
        spill(bufferedObjects);
    }

    switch (nextInput.getStatus()) {
        case GetNextResult::ReturnStatus::kAdvanced: {
            MONGO_UNREACHABLE;  // We consumed all advances above.
        }
        case GetNextResult::ReturnStatus::kPauseExecution: {
            return nextInput;  // Propagate the pause.
        }
        case GetNextResult::ReturnStatus::kEOF: {
            auto renameCommandObj =
                BSON("renameCollection" << _tempNs.ns() << "to" << _outputNs.ns() << "dropTarget"
                                        << true);
            auto status = _mongod->renameIfOptionsAndIndexesHaveNotChanged(
                renameCommandObj, _outputNs, _originalOutOptions, _originalIndexes);
            uassert(16997, str::stream() << "$out failed: " << status.reason(), status.isOK());

            // We don't need to drop the temp collection in our destructor if the rename succeeded.
            _tempNs = {};

            _done = true;

            // $out doesn't currently produce any outputs.
            return nextInput;
        }
    }
    MONGO_UNREACHABLE;
}
/**
 * Records the target namespace only. The temporary collection is named and created lazily by
 * initialize(), on the first getNext() call. createFromBson() is the visible factory for this
 * stage.
 *
 * NOTE(review): _initialized is not set here, but getNext() reads it. This relies on the header
 * giving it a default member initializer (e.g. `bool _initialized = false;`) — confirm.
 */
DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs,
                                     const intrusive_ptr<ExpressionContext>& pExpCtx)
    : DocumentSourceNeedsMongod(pExpCtx),
      _done(false),
      _tempNs(""),  // Filled in by initialize(), called from the first getNext().
      _outputNs(outputNs) {}
/**
 * Builds a DocumentSourceOut from a {$out: "<collection>"} stage spec.
 *
 * The output goes to "<collection>" in the database the pipeline runs against.
 *
 * Throws:
 *  - 16990 (uassert) if the argument is not a string;
 *  - ErrorCodes::InvalidOptions if the operation is reading from a majority-committed snapshot;
 *  - 17385 if the target is a special collection.
 */
intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
    BSONElement elem, const intrusive_ptr<ExpressionContext>& pExpCtx) {
    uassert(16990,
            str::stream() << "$out only supports a string argument, not " << typeName(elem.type()),
            elem.type() == BSONType::String);

    uassert(ErrorCodes::InvalidOptions,
            "$out can only be used with the 'local' read concern level",
            !pExpCtx->opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot());

    // The target always lives in the same database as the pipeline's input namespace.
    NamespaceString outputNs(pExpCtx->ns.db().toString() + '.' + elem.str());
    uassert(17385, "Can't $out to special collection: " + elem.str(), !outputNs.isSpecial());

    return new DocumentSourceOut(outputNs, pExpCtx);
}
/**
 * Serializes this stage back to {$out: "<collection>"}.
 *
 * Only the collection name is emitted, so the output database must equal the pipeline's database;
 * this is checked with massert (code 17000). The explain verbosity is not used.
 */
Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
    massert(
        17000, "$out shouldn't have different db than input", _outputNs.db() == pExpCtx->ns.db());

    return Value(DOC(getSourceName() << _outputNs.coll()));
}
/**
 * Every input document is converted to BSON in full and inserted, so $out needs the whole document
 * from upstream. The returned value marks this dependency set as exhaustive.
 */
DocumentSource::GetDepsReturn DocumentSourceOut::getDependencies(DepsTracker* deps) const {
    // No field can be projected away upstream: all of them end up in the output collection.
    deps->needWholeDocument = true;

    return GetDepsReturn::EXHAUSTIVE_ALL;
}
}