diff options
author | Ted Tuckman <ted.tuckman@mongodb.com> | 2019-11-15 19:50:52 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-11-15 19:50:52 +0000 |
commit | 6258369bc2d74e69a5e1fd8e025a291550aeb368 (patch) | |
tree | be895955944d025eb4dd94f1771ef883cb3515cc /src/mongo/db/pipeline/document_source_out.cpp | |
parent | ab0e1e6875faa56155eb4777be66b575f6b48395 (diff) | |
download | mongo-6258369bc2d74e69a5e1fd8e025a291550aeb368.tar.gz |
SERVER-42693 Add renameAndPreserveOptions command and allow $out to output to different DB
Diffstat (limited to 'src/mongo/db/pipeline/document_source_out.cpp')
-rw-r--r-- | src/mongo/db/pipeline/document_source_out.cpp | 95 |
1 files changed, 60 insertions, 35 deletions
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp index 860d9028bec..fc390947621 100644 --- a/src/mongo/db/pipeline/document_source_out.cpp +++ b/src/mongo/db/pipeline/document_source_out.cpp @@ -41,16 +41,18 @@ #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" +#include "mongo/util/uuid.h" namespace mongo { using namespace fmt::literals; -static AtomicWord<unsigned> aggOutCounter; - MONGO_FAIL_POINT_DEFINE(hangWhileBuildingDocumentSourceOutBatch); REGISTER_DOCUMENT_SOURCE(out, DocumentSourceOut::LiteParsed::parse, DocumentSourceOut::createFromBson); +REGISTER_DOCUMENT_SOURCE(internalOutToDifferentDB, + DocumentSourceOut::LiteParsed::parseToDifferentDB, + DocumentSourceOut::createFromBsonToDifferentDB); DocumentSourceOut::~DocumentSourceOut() { DESTRUCTOR_GUARD( @@ -72,18 +74,42 @@ DocumentSourceOut::~DocumentSourceOut() { [this] { pExpCtx->mongoProcessInterface->setOperationContext(pExpCtx->opCtx); }); pExpCtx->mongoProcessInterface->setOperationContext(cleanupOpCtx.get()); - pExpCtx->mongoProcessInterface->directClient()->dropCollection(_tempNs.ns()); + pExpCtx->mongoProcessInterface->dropCollection(cleanupOpCtx.get(), _tempNs); }); } +std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::parseToDifferentDB( + const AggregationRequest& request, const BSONElement& spec) { + + auto specObj = spec.Obj(); + auto dbElem = specObj["db"]; + auto collElem = specObj["coll"]; + uassert(16994, + str::stream() << kStageName << " must have db and coll string arguments", + dbElem.type() == BSONType::String && collElem.type() == BSONType::String); + NamespaceString targetNss{dbElem.String(), collElem.String()}; + uassert(ErrorCodes::InvalidNamespace, + "Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()), + targetNss.isValid()); + + ActionSet actions{ActionType::insert, ActionType::remove}; + if (request.shouldBypassDocumentValidation()) { + actions.addAction(ActionType::bypassDocumentValidation); + } + + PrivilegeVector privileges{Privilege(ResourcePattern::forExactNamespace(targetNss), actions)}; + + return std::make_unique<DocumentSourceOut::LiteParsed>(std::move(targetNss), + std::move(privileges)); +} + std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::parse( const AggregationRequest& request, const BSONElement& spec) { - uassert(ErrorCodes::TypeMismatch, - "{} stage requires a string argument, but found {}"_format(kStageName, - typeName(spec.type())), + uassert(16990, + "{} only supports a string argument, but found {}"_format(kStageName, + typeName(spec.type())), spec.type() == BSONType::String); - NamespaceString targetNss{request.getNamespaceString().db(), spec.valueStringData()}; uassert(ErrorCodes::InvalidNamespace, "Invalid {} target namespace, {}"_format(kStageName, targetNss.ns()), @@ -103,16 +129,19 @@ std::unique_ptr<DocumentSourceOut::LiteParsed> DocumentSourceOut::LiteParsed::pa void DocumentSourceOut::initialize() { DocumentSourceWriteBlock writeBlock(pExpCtx->opCtx); - DBClientBase* conn = pExpCtx->mongoProcessInterface->directClient(); - const auto& outputNs = getOutputNs(); - _tempNs = NamespaceString(str::stream() - << outputNs.db() << ".tmp.agg_out." << aggOutCounter.addAndFetch(1)); + // We will write all results into a temporary collection, then rename the temporary collection + // to be the target collection once we are done. + _tempNs = NamespaceString(str::stream() << outputNs.db() << ".tmp.agg_out." << UUID::gen()); // Save the original collection options and index specs so we can check they didn't change // during computation. - _originalOutOptions = pExpCtx->mongoProcessInterface->getCollectionOptions(outputNs); - _originalIndexes = conn->getIndexSpecs(outputNs); + _originalOutOptions = + // The uuid field is considered an option, but cannot be passed to createCollection. + pExpCtx->mongoProcessInterface->getCollectionOptions(pExpCtx->opCtx, outputNs) + .removeField("uuid"); + _originalIndexes = pExpCtx->mongoProcessInterface->getIndexSpecs( + pExpCtx->opCtx, outputNs, false /* includeBuildUUIDs */); // Check if it's capped to make sure we have a chance of succeeding before we do all the work. // If the collection becomes capped during processing, the collection options will have changed, @@ -121,11 +150,6 @@ void DocumentSourceOut::initialize() { "namespace '{}' is capped so it can't be used for {}"_format(outputNs.ns(), kStageName), _originalOutOptions["capped"].eoo()); - // We will write all results into a temporary collection, then rename the temporary - // collection to be the target collection once we are done. - _tempNs = NamespaceString(str::stream() - << outputNs.db() << ".tmp.agg_out." << aggOutCounter.addAndFetch(1)); - // Create temp collection, copying options from the existing output collection if any. { BSONObjBuilder cmd; @@ -133,11 +157,8 @@ void DocumentSourceOut::initialize() { cmd << "temp" << true; cmd.appendElementsUnique(_originalOutOptions); - BSONObj info; - uassert(16994, - "failed to create temporary {} collection '{}': {}"_format( - kStageName, _tempNs.ns(), getStatusFromCommandResult(info).reason()), - conn->runCommand(outputNs.db().toString(), cmd.done(), info)); + pExpCtx->mongoProcessInterface->createCollection( + pExpCtx->opCtx, _tempNs.db().toString(), cmd.done()); } if (_originalIndexes.empty()) { @@ -148,7 +169,7 @@ void DocumentSourceOut::initialize() { try { std::vector<BSONObj> tempNsIndexes = {std::begin(_originalIndexes), std::end(_originalIndexes)}; - conn->createIndexes(_tempNs.ns(), tempNsIndexes); + pExpCtx->mongoProcessInterface->createIndexes(pExpCtx->opCtx, _tempNs, tempNsIndexes); } catch (DBException& ex) { ex.addContext("Copying indexes for $out failed"); throw; @@ -177,7 +198,11 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create( "{} is not supported when the output collection is in a different " "database"_format(kStageName), outputNs.db() == expCtx->ns.db()); + return createAndAllowDifferentDB(outputNs, expCtx); +} +boost::intrusive_ptr<DocumentSource> DocumentSourceOut::createAndAllowDifferentDB( + NamespaceString outputNs, const boost::intrusive_ptr<ExpressionContext>& expCtx) { uassert(ErrorCodes::OperationNotSupportedInTransaction, "{} cannot be used in a transaction"_format(kStageName), !expCtx->inMultiDocumentTransaction); @@ -191,10 +216,6 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create( "Invalid {} target namespace, {}"_format(kStageName, outputNs.ns()), outputNs.isValid()); - uassert(17017, - "{} is not supported to an existing *sharded* output collection"_format(kStageName), - !expCtx->mongoProcessInterface->isSharded(expCtx->opCtx, outputNs)); - uassert(17385, "Can't {} to special collection: {}"_format(kStageName, outputNs.coll()), !outputNs.isSystem()); @@ -208,20 +229,24 @@ boost::intrusive_ptr<DocumentSource> DocumentSourceOut::create( boost::intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson( BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { - uassert(16990, + uassert(31278, "{} only supports a string argument, but found {}"_format(kStageName, typeName(elem.type())), elem.type() == BSONType::String); - return create({expCtx->ns.db(), elem.str()}, expCtx); } -Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { - massert(17000, - "{} shouldn't have different db than input"_format(kStageName), - _outputNs.db() == pExpCtx->ns.db()); +boost::intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBsonToDifferentDB( + BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) { - return Value(DOC(getSourceName() << _outputNs.coll())); + auto nsObj = elem.Obj(); + return createAndAllowDifferentDB(NamespaceString(nsObj["db"].String(), nsObj["coll"].String()), + expCtx); +} +Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const { + return _toDifferentDB + ? Value(DOC(getSourceName() << DOC("db" << _outputNs.db() << "coll" << _outputNs.coll()))) + : Value(DOC(getSourceName() << _outputNs.coll())); } void DocumentSourceOut::waitWhileFailPointEnabled() { |