/**
* Copyright 2011 (c) 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/util/destructor_guard.h"
namespace mongo {
using boost::intrusive_ptr;
using std::vector;
DocumentSourceOut::~DocumentSourceOut() {
DESTRUCTOR_GUARD(
// Make sure we drop the temp collection if anything goes wrong. Errors are ignored
// here because nothing can be done about them. Additionally, if this fails and the
// collection is left behind, it will be cleaned up next time the server is started.
if (_mongod && _tempNs.size()) _mongod->directClient()->dropCollection(_tempNs.ns());)
}
REGISTER_DOCUMENT_SOURCE(out, DocumentSourceOut::createFromBson);
const char* DocumentSourceOut::getSourceName() const {
return "$out";
}
static AtomicUInt32 aggOutCounter;
void DocumentSourceOut::prepTempCollection() {
verify(_mongod);
verify(_tempNs.size() == 0);
DBClientBase* conn = _mongod->directClient();
// Fail early by checking before we do any work.
uassert(17017,
str::stream() << "namespace '" << _outputNs.ns()
<< "' is sharded so it can't be used for $out'",
!_mongod->isSharded(_outputNs));
// cannot $out to capped collection
uassert(17152,
str::stream() << "namespace '" << _outputNs.ns()
<< "' is capped so it can't be used for $out",
!_mongod->isCapped(_outputNs));
_tempNs = NamespaceString(StringData(str::stream() << _outputNs.db() << ".tmp.agg_out."
<< aggOutCounter.addAndFetch(1)));
// Create output collection, copying options from existing collection if any.
{
const auto infos =
conn->getCollectionInfos(_outputNs.db().toString(), BSON("name" << _outputNs.coll()));
const auto options = infos.empty() ? BSONObj() : infos.front().getObjectField("options");
BSONObjBuilder cmd;
cmd << "create" << _tempNs.coll();
cmd << "temp" << true;
cmd.appendElementsUnique(options);
BSONObj info;
bool ok = conn->runCommand(_outputNs.db().toString(), cmd.done(), info);
uassert(16994,
str::stream() << "failed to create temporary $out collection '" << _tempNs.ns()
<< "': "
<< info.toString(),
ok);
}
// copy indexes on _outputNs to _tempNs
const std::list indexes = conn->getIndexSpecs(_outputNs.ns());
for (std::list::const_iterator it = indexes.begin(); it != indexes.end(); ++it) {
MutableDocument index((Document(*it)));
index.remove("_id"); // indexes shouldn't have _ids but some existing ones do
index["ns"] = Value(_tempNs.ns());
BSONObj indexBson = index.freeze().toBson();
conn->insert(_tempNs.getSystemIndexesCollection(), indexBson);
BSONObj err = conn->getLastErrorDetailed();
uassert(16995,
str::stream() << "copying index for $out failed."
<< " index: "
<< indexBson
<< " error: "
<< err,
DBClientWithCommands::getLastErrorString(err).empty());
}
}
void DocumentSourceOut::spill(const vector& toInsert) {
BSONObj err = _mongod->insert(_tempNs, toInsert);
uassert(16996,
str::stream() << "insert for $out failed: " << err,
DBClientWithCommands::getLastErrorString(err).empty());
}
boost::optional DocumentSourceOut::getNext() {
pExpCtx->checkForInterrupt();
// make sure we only write out once
if (_done)
return boost::none;
_done = true;
verify(_mongod);
DBClientBase* conn = _mongod->directClient();
prepTempCollection();
verify(_tempNs.size() != 0);
vector bufferedObjects;
int bufferedBytes = 0;
while (boost::optional next = pSource->getNext()) {
BSONObj toInsert = next->toBson();
bufferedBytes += toInsert.objsize();
if (!bufferedObjects.empty() && bufferedBytes > BSONObjMaxUserSize) {
spill(bufferedObjects);
bufferedObjects.clear();
bufferedBytes = toInsert.objsize();
}
bufferedObjects.push_back(toInsert);
}
if (!bufferedObjects.empty())
spill(bufferedObjects);
// Checking again to make sure we didn't become sharded while running.
uassert(17018,
str::stream() << "namespace '" << _outputNs.ns()
<< "' became sharded so it can't be used for $out'",
!_mongod->isSharded(_outputNs));
BSONObj rename =
BSON("renameCollection" << _tempNs.ns() << "to" << _outputNs.ns() << "dropTarget" << true);
BSONObj info;
bool ok = conn->runCommand("admin", rename, info);
uassert(16997, str::stream() << "renameCollection for $out failed: " << info, ok);
// We don't need to drop the temp collection in our destructor if the rename succeeded.
_tempNs = NamespaceString("");
// This "DocumentSource" doesn't produce output documents. This can change in the future
// if we support using $out in "tee" mode.
return boost::none;
}
DocumentSourceOut::DocumentSourceOut(const NamespaceString& outputNs,
const intrusive_ptr& pExpCtx)
: DocumentSourceNeedsMongod(pExpCtx),
_done(false),
_tempNs("") // filled in by prepTempCollection
,
_outputNs(outputNs) {}
intrusive_ptr DocumentSourceOut::createFromBson(
BSONElement elem, const intrusive_ptr& pExpCtx) {
uassert(16990,
str::stream() << "$out only supports a string argument, not " << typeName(elem.type()),
elem.type() == String);
uassert(ErrorCodes::InvalidOptions,
"$out can only be used with the 'local' read concern level",
!pExpCtx->opCtx->recoveryUnit()->isReadingFromMajorityCommittedSnapshot());
NamespaceString outputNs(pExpCtx->ns.db().toString() + '.' + elem.str());
uassert(17385, "Can't $out to special collection: " + elem.str(), !outputNs.isSpecial());
return new DocumentSourceOut(outputNs, pExpCtx);
}
Value DocumentSourceOut::serialize(bool explain) const {
massert(
17000, "$out shouldn't have different db than input", _outputNs.db() == pExpCtx->ns.db());
return Value(DOC(getSourceName() << _outputNs.coll()));
}
DocumentSource::GetDepsReturn DocumentSourceOut::getDependencies(DepsTracker* deps) const {
deps->needWholeDocument = true;
return EXHAUSTIVE_ALL;
}
}