summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_out.cpp
diff options
context:
space:
mode:
authorTed Tuckman <ted.tuckman@mongodb.com>2019-11-15 19:50:52 +0000
committerevergreen <evergreen@mongodb.com>2019-11-15 19:50:52 +0000
commit6258369bc2d74e69a5e1fd8e025a291550aeb368 (patch)
treebe895955944d025eb4dd94f1771ef883cb3515cc /src/mongo/db/pipeline/document_source_out.cpp
parentab0e1e6875faa56155eb4777be66b575f6b48395 (diff)
downloadmongo-6258369bc2d74e69a5e1fd8e025a291550aeb368.tar.gz
SERVER-42693 Add renameAndPreserveOptions command and allow $out to output to different DB
Diffstat (limited to 'src/mongo/db/pipeline/document_source_out.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp95
1 files changed, 60 insertions, 35 deletions
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index 860d9028bec..fc390947621 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -41,16 +41,18 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
+#include "mongo/util/uuid.h"
namespace mongo {
using namespace fmt::literals;
-static AtomicWord<unsigned> aggOutCounter;
-
MONGO_FAIL_POINT_DEFINE(hangWhileBuildingDocumentSourceOutBatch);
REGISTER_DOCUMENT_SOURCE(out,
DocumentSourceOut::LiteParsed::parse,
DocumentSourceOut::createFromBson);
+REGISTER_DOCUMENT_SOURCE(internalOutToDifferentDB,
+ DocumentSourceOut::LiteParsed::parseToDifferentDB,
+ DocumentSourceOut::createFromBsonToDifferentDB);
DocumentSourceOut::~DocumentSourceOut() {
DESTRUCTOR_GUARD(
@@ -72,18 +74,42 @@ DocumentSourceOut::~DocumentSourceOut() {
[this] { pExpCtx->mongoProcessInterface->setOperationContext(pExpCtx->opCtx); });
pExpCtx->mongoProcessInterface->setOperationContext(cleanupOpCtx.get());
- pExpCtx->mongoProcessInterface->directClient()->dropCollection(_tempNs.ns());
+ pExpCtx->mongoProcessInterface->dropCollection(cleanupOpCtx.get(), _tempNs);
});
}
+std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::parseToDifferentDB(
+ const AggregationRequest& request, const BSONElement& spec) {
+
+ auto specObj = spec.Obj();
+ auto dbElem = specObj["db"];
+ auto collElem = specObj["coll"];
+ uassert(16994,
+ str::stream() << kStageName << " must have db and coll string arguments",
+ dbElem.type() == BSONType::String && collElem.type() == BSONType::String);
+ NamespaceString targetNss{dbElem.String(), collElem.String()};
+ uassert(ErrorCodes::InvalidNamespace,
+ "Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()),
+ targetNss.isValid());
+
+ ActionSet actions{ActionType::insert, ActionType::remove};
+ if (request.shouldBypassDocumentValidation()) {
+ actions.addAction(ActionType::bypassDocumentValidation);
+ }
+
+ PrivilegeVector privileges{Privilege(ResourcePattern::forExactNamespace(targetNss), actions)};
+
+ return std::make_unique<DocumentSourceOut::LiteParsed>(std::move(targetNss),
+ std::move(privileges));
+}
+
std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::parse(
const AggregationRequest& request, const BSONElement& spec) {
- uassert(ErrorCodes::TypeMismatch,
- "{} stage requires a string argument, but found {}"_format(kStageName,
- typeName(spec.type())),
+ uassert(16990,
+ "{} only supports a string argument, but found {}"_format(kStageName,
+ typeName(spec.type())),
spec.type() == BSONType::String);
-
NamespaceString targetNss{request.getNamespaceString().db(), spec.valueStringData()};
uassert(ErrorCodes::InvalidNamespace,
"Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()),
@@ -103,16 +129,19 @@ std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::pa
void DocumentSourceOut::initialize() {
DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx);
- DBClientBase* conn = pExpCtx->mongoProcessInterface->directClient();
-
const auto& outputNs = getOutputNs();
- _tempNs = NamespaceString(str::stream()
- << outputNs.db() << ".tmp.agg_out." << aggOutCounter.addAndFetch(1));
+ // We will write all results into a temporary collection, then rename the temporary collection
+ // to be the target collection once we are done.
+ _tempNs = NamespaceString(str::stream() << outputNs.db() << ".tmp.agg_out." << UUID::gen());
// Save the original collection options and index specs so we can check they didn't change
// during computation.
- _originalOutOptions = pExpCtx->mongoProcessInterface->getCollectionOptions(outputNs);
- _originalIndexes = conn->getIndexSpecs(outputNs);
+ _originalOutOptions =
+ // The uuid field is considered an option, but cannot be passed to createCollection.
+ pExpCtx->mongoProcessInterface->getCollectionOptions(pExpCtx->opCtx, outputNs)
+ .removeField("uuid");
+ _originalIndexes = pExpCtx->mongoProcessInterface->getIndexSpecs(
+ pExpCtx->opCtx, outputNs, false /* includeBuildUUIDs */);
// Check if it's capped to make sure we have a chance of succeeding before we do all the work.
// If the collection becomes capped during processing, the collection options will have changed,
@@ -121,11 +150,6 @@ void DocumentSourceOut::initialize() {
"namespace '{}' is capped so it can't be used for {}"_format(outputNs.ns(), kStageName),
_originalOutOptions["capped"].eoo());
- // We will write all results into a temporary collection, then rename the temporary
- // collection to be the target collection once we are done.
- _tempNs = NamespaceString(str::stream()
- << outputNs.db() << ".tmp.agg_out." << aggOutCounter.addAndFetch(1));
-
// Create temp collection, copying options from the existing output collection if any.
{
BSONObjBuilder cmd;
@@ -133,11 +157,8 @@ void DocumentSourceOut::initialize() {
cmd << "temp" << true;
cmd.appendElementsUnique(_originalOutOptions);
- BSONObj info;
- uassert(16994,
- "failed to create temporary {} collection '{}': {}"_format(
- kStageName, _tempNs.ns(), getStatusFromCommandResult(info).reason()),
- conn->runCommand(outputNs.db().toString(), cmd.done(), info));
+ pExpCtx->mongoProcessInterface->createCollection(
+ pExpCtx->opCtx, _tempNs.db().toString(), cmd.done());
}
if (_originalIndexes.empty()) {
@@ -148,7 +169,7 @@ void DocumentSourceOut::initialize() {
try {
std::vector<BSONObj> tempNsIndexes = {std::begin(_originalIndexes),
std::end(_originalIndexes)};
- conn->createIndexes(_tempNs.ns(), tempNsIndexes);
+ pExpCtx->mongoProcessInterface->createIndexes(pExpCtx->opCtx, _tempNs, tempNsIndexes);
} catch (DBException& ex) {
ex.addContext("Copying indexes for $out failed");
throw;
@@ -177,7 +198,11 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create(
"{} is not supported when the output collection is in a different "
"database"_format(kStageName),
outputNs.db() == expCtx->ns.db());
+ return createAndAllowDifferentDB(outputNs, expCtx);
+}
+boost::intrusive_ptr<DocumentSource> DocumentSourceOut::createAndAllowDifferentDB(
+ NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
uassert(ErrorCodes::OperationNotSupportedInTransaction,
"{} cannot be used in a transaction"_format(kStageName),
!expCtx->inMultiDocumentTransaction);
@@ -191,10 +216,6 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create(
"Invalid {} target namespace, {}"_format(kStageName, outputNs.ns()),
outputNs.isValid());
- uassert(17017,
- "{} is not supported to an existing *sharded* output collection"_format(kStageName),
- !expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs));
-
uassert(17385,
"Can't {} to special collection: {}"_format(kStageName, outputNs.coll()),
!outputNs.isSystem());
@@ -208,20 +229,24 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create(
boost::intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- uassert(16990,
+ uassert(31278,
"{} only supports a string argument, but found {}"_format(kStageName,
typeName(elem.type())),
elem.type() == BSONType::String);
-
return create({expCtx->ns.db(), elem.str()}, expCtx);
}
-Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- massert(17000,
- "{} shouldn't have different db than input"_format(kStageName),
- _outputNs.db() == pExpCtx->ns.db());
+boost::intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBsonToDifferentDB(
+ BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
- return Value(DOC(getSourceName() << _outputNs.coll()));
+ auto nsObj = elem.Obj();
+ return createAndAllowDifferentDB(NamespaceString(nsObj["db"].String(), nsObj["coll"].String()),
+ expCtx);
+}
+Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
+ return _toDifferentDB
+ ? Value(DOC(getSourceName() << DOC("db" << _outputNs.db() << "coll" << _outputNs.coll())))
+ : Value(DOC(getSourceName() << _outputNs.coll()));
}
void DocumentSourceOut::waitWhileFailPointEnabled() {