diff options
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r-- | src/mongo/db/exec/update_stage.cpp | 339 | ||||
-rw-r--r-- | src/mongo/db/exec/update_stage.h | 104 | ||||
-rw-r--r-- | src/mongo/db/exec/upsert_stage.cpp | 301 | ||||
-rw-r--r-- | src/mongo/db/exec/upsert_stage.h | 76 |
4 files changed, 470 insertions, 350 deletions
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp index a7b307bf49c..10696c6cb65 100644 --- a/src/mongo/db/exec/update_stage.cpp +++ b/src/mongo/db/exec/update_stage.cpp @@ -39,7 +39,6 @@ #include "mongo/bson/bson_comparator_interface_base.h" #include "mongo/bson/mutable/algorithm.h" #include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/curop_failpoint_helpers.h" #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/exec/write_stage_common.h" @@ -57,11 +56,9 @@ #include "mongo/util/assert_util.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" -#include "mongo/util/transitional_tools_do_not_use/vector_spooling.h" namespace mongo { -MONGO_FAIL_POINT_DEFINE(hangBeforeUpsertPerformsInsert); MONGO_FAIL_POINT_DEFINE(hangBeforeThrowWouldChangeOwningShard); using std::string; @@ -76,59 +73,13 @@ namespace { const char idFieldName[] = "_id"; const FieldRef idFieldRef(idFieldName); -Status ensureIdFieldIsFirst(mb::Document* doc) { - mb::Element idElem = mb::findFirstChildNamed(doc->root(), idFieldName); - - if (!idElem.ok()) { - return {ErrorCodes::InvalidIdField, "_id field is missing"}; - } - - if (idElem.leftSibling().ok()) { - // Move '_id' to be the first element - Status s = idElem.remove(); - if (!s.isOK()) - return s; - s = doc->root().pushFront(idElem); - if (!s.isOK()) - return s; - } - - return Status::OK(); -} - void addObjectIDIdField(mb::Document* doc) { const auto idElem = doc->makeElementNewOID(idFieldName); - if (!idElem.ok()) - uasserted(17268, "Could not create new ObjectId '_id' field."); - + uassert(17268, "Could not create new ObjectId '_id' field.", idElem.ok()); uassertStatusOK(doc->root().pushFront(idElem)); } /** - * Uasserts if any of the paths in 'requiredPaths' are not present in 'document', or if they are - * arrays or array descendants. - */ -void assertRequiredPathsPresent(const mb::Document& document, const FieldRefSet& requiredPaths) { - for (const auto& path : requiredPaths) { - auto elem = document.root(); - for (size_t i = 0; i < (*path).numParts(); ++i) { - elem = elem[(*path).getPart(i)]; - uassert(ErrorCodes::NoSuchKey, - str::stream() << "After applying the update, the new document was missing the " - "required field '" - << (*path).dottedField() << "'", - elem.ok()); - uassert( - ErrorCodes::NotSingleValueField, - str::stream() << "After applying the update to the document, the required field '" - << (*path).dottedField() - << "' was found to be an array or array descendant.", - elem.getType() != BSONType::Array); - } - } -} - -/** * Returns true if we should throw a WriteConflictException in order to retry the operation in the * case of a conflict. Returns false if we should skip the document and keep going. */ @@ -158,19 +109,30 @@ const char* UpdateStage::kStageType = "UPDATE"; const UpdateStats UpdateStage::kEmptyUpdateStats; +// Public constructor. UpdateStage::UpdateStage(OperationContext* opCtx, const UpdateStageParams& params, WorkingSet* ws, Collection* collection, PlanStage* child) + : UpdateStage(opCtx, params, ws, collection) { + // We should never reach here if the request is an upsert. + invariant(!_params.request->isUpsert()); + _children.emplace_back(child); +} + +// Protected constructor. +UpdateStage::UpdateStage(OperationContext* opCtx, + const UpdateStageParams& params, + WorkingSet* ws, + Collection* collection) : RequiresMutableCollectionStage(kStageType, opCtx, collection), _params(params), _ws(ws), + _doc(params.driver->getDocument()), _idRetrying(WorkingSet::INVALID_ID), _idReturning(WorkingSet::INVALID_ID), - _updatedRecordIds(params.request->isMulti() ? new RecordIdSet() : NULL), - _doc(params.driver->getDocument()) { - _children.emplace_back(child); + _updatedRecordIds(params.request->isMulti() ? new RecordIdSet() : nullptr) { // Should the modifiers validate their embedded docs via storage_validation::storageValid()? // Only user updates should be checked. Any system or replication stuff should pass through. @@ -226,9 +188,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco if (getOpCtx()->writesAreReplicated() && !request->isFromMigration()) { if (metadata->isSharded() && (!OperationShardingState::isOperationVersioned(getOpCtx()) || !isFCV42)) { - auto& immutablePathsVector = metadata->getKeyPatternFields(); - immutablePaths.fillFrom( - transitional_tools_do_not_use::unspool_vector(immutablePathsVector)); + immutablePaths.fillFrom(metadata->getKeyPatternFields()); } immutablePaths.keepShortest(&idFieldRef); } @@ -270,16 +230,8 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco // neither grow nor shrink). const auto createIdField = !collection()->isCapped(); - // Ensure if _id exists it is first - status = ensureIdFieldIsFirst(&_doc); - if (status.code() == ErrorCodes::InvalidIdField) { - // Create ObjectId _id field if we are doing that - if (createIdField) { - addObjectIDIdField(&_doc); - } - } else { - uassertStatusOK(status); - } + // Ensure _id is first if it exists, and generate a new OID if appropriate. + _ensureIdFieldIsFirst(&_doc, createIdField); // See if the changes were applied in place const char* source = NULL; @@ -394,99 +346,6 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco return newObj; } -BSONObj UpdateStage::applyUpdateOpsForInsert(OperationContext* opCtx, - const CanonicalQuery* cq, - const BSONObj& query, - UpdateDriver* driver, - mutablebson::Document* doc, - bool isInternalRequest, - const NamespaceString& ns, - bool enforceOkForStorage, - UpdateStats* stats) { - // Since this is an insert (no docs found and upsert:true), we will be logging it - // as an insert in the oplog. We don't need the driver's help to build the - // oplog record, then. We also set the context of the update driver to the INSERT_CONTEXT. - // Some mods may only work in that context (e.g. $setOnInsert). - driver->setLogOp(false); - - auto* const css = CollectionShardingState::get(opCtx, ns); - auto metadata = css->getCurrentMetadata(); - - const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() && - serverGlobalParams.featureCompatibility.getVersion() == - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42; - - FieldRefSet immutablePaths; - if (metadata->isSharded() && - (!OperationShardingState::isOperationVersioned(opCtx) || !isFCV42)) { - auto& immutablePathsVector = metadata->getKeyPatternFields(); - immutablePaths.fillFrom( - transitional_tools_do_not_use::unspool_vector(immutablePathsVector)); - } - immutablePaths.keepShortest(&idFieldRef); - - if (cq) { - FieldRefSet requiredPaths; - if (metadata->isSharded()) { - const auto& shardKeyPathsVector = metadata->getKeyPatternFields(); - requiredPaths.fillFrom( - transitional_tools_do_not_use::unspool_vector(shardKeyPathsVector)); - } - requiredPaths.keepShortest(&idFieldRef); - uassertStatusOK(driver->populateDocumentWithQueryFields(*cq, requiredPaths, *doc)); - } else { - fassert(17354, CanonicalQuery::isSimpleIdQuery(query)); - BSONElement idElt = query[idFieldName]; - fassert(17352, doc->root().appendElement(idElt)); - } - - // Apply the update modifications here. Do not validate for storage, since we will validate the - // entire document after the update. However, we ensure that no immutable fields are updated. - const bool validateForStorage = false; - const bool isInsert = true; - if (isInternalRequest) { - immutablePaths.clear(); - } - Status updateStatus = - driver->update(StringData(), doc, validateForStorage, immutablePaths, isInsert); - if (!updateStatus.isOK()) { - uasserted(16836, updateStatus.reason()); - } - - // Ensure _id exists and is first - auto idAndFirstStatus = ensureIdFieldIsFirst(doc); - if (idAndFirstStatus.code() == ErrorCodes::InvalidIdField) { // _id field is missing - addObjectIDIdField(doc); - } else { - uassertStatusOK(idAndFirstStatus); - } - - // Validate that the object replacement or modifiers resulted in a document - // that contains all the required keys and can be stored if it isn't coming - // from a migration or via replication. - if (!isInternalRequest) { - if (enforceOkForStorage) { - storage_validation::storageValid(*doc); - } - FieldRefSet requiredPaths; - if (metadata->isSharded()) { - const auto& shardKeyPathsVector = metadata->getKeyPatternFields(); - requiredPaths.fillFrom( - transitional_tools_do_not_use::unspool_vector(shardKeyPathsVector)); - } - requiredPaths.keepShortest(&idFieldRef); - assertRequiredPathsPresent(*doc, requiredPaths); - } - - BSONObj newObj = doc->getObject(); - if (newObj.objsize() > BSONObjMaxUserSize) { - uasserted(17420, - str::stream() << "Document to upsert is larger than " << BSONObjMaxUserSize); - } - - return newObj; -} - bool UpdateStage::matchContainsOnlyAndedEqualityNodes(const MatchExpression& root) { if (root.matchType() == MatchExpression::EQ) { return true; @@ -563,131 +422,18 @@ bool UpdateStage::shouldRetryDuplicateKeyException(const ParsedUpdate& parsedUpd return true; } -void UpdateStage::doInsert() { - _specificStats.inserted = true; - - const UpdateRequest* request = _params.request; - bool isInternalRequest = !getOpCtx()->writesAreReplicated() || request->isFromMigration(); - - // Reset the document we will be writing to. - _doc.reset(); - - BSONObj newObj = applyUpdateOpsForInsert(getOpCtx(), - _params.canonicalQuery, - request->getQuery(), - _params.driver, - &_doc, - isInternalRequest, - request->getNamespaceString(), - _enforceOkForStorage, - &_specificStats); - - _specificStats.objInserted = newObj; - - // If this is an explain, bail out now without doing the insert. - if (request->isExplain()) { - return; - } - - // If in FCV 4.2 and this collection is sharded, check if the doc we plan to insert belongs to - // this shard. MongoS uses the query field to target a shard, and it is possible the shard key - // fields in the 'q' field belong to this shard, but those in the 'u' field do not. In this case - // we need to throw so that MongoS can target the insert to the correct shard. - if (_shouldCheckForShardKeyUpdate) { - const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() && - serverGlobalParams.featureCompatibility.getVersion() == - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42; - auto* const css = CollectionShardingState::get(getOpCtx(), collection()->ns()); - const auto& metadata = css->getCurrentMetadata(); - - if (isFCV42 && metadata->isSharded()) { - const ShardKeyPattern shardKeyPattern(metadata->getKeyPattern()); - auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newObj); - - if (!metadata->keyBelongsToMe(newShardKey)) { - // An attempt to upsert a document with a shard key value that belongs on another - // shard must either be a retryable write or inside a transaction. - uassert(ErrorCodes::IllegalOperation, - "The upsert document could not be inserted onto the shard targeted by the " - "query, since its shard key belongs on a different shard. Cross-shard " - "upserts are only allowed when running in a transaction or with " - "retryWrites: true.", - getOpCtx()->getTxnNumber()); - uasserted( - WouldChangeOwningShardInfo(request->getQuery(), newObj, true /* upsert */), - "The document we are inserting belongs on a different shard"); - } - } - } - - if (MONGO_FAIL_POINT(hangBeforeUpsertPerformsInsert)) { - CurOpFailpointHelpers::waitWhileFailPointEnabled( - &hangBeforeUpsertPerformsInsert, getOpCtx(), "hangBeforeUpsertPerformsInsert"); - } - - writeConflictRetry(getOpCtx(), "upsert", collection()->ns().ns(), [&] { - WriteUnitOfWork wunit(getOpCtx()); - uassertStatusOK(collection()->insertDocument(getOpCtx(), - InsertStatement(request->getStmtId(), newObj), - _params.opDebug, - request->isFromMigration())); - - // Technically, we should save/restore state here, but since we are going to return - // immediately after, it would just be wasted work. - wunit.commit(); - }); -} - -bool UpdateStage::doneUpdating() { +bool UpdateStage::isEOF() { // We're done updating if either the child has no more results to give us, or we've // already gotten a result back and we're not a multi-update. return _idRetrying == WorkingSet::INVALID_ID && _idReturning == WorkingSet::INVALID_ID && (child()->isEOF() || (_specificStats.nMatched > 0 && !_params.request->isMulti())); } -bool UpdateStage::needInsert() { - // We need to insert if - // 1) we haven't inserted already, - // 2) the child stage returned zero matches, and - // 3) the user asked for an upsert. - return !_specificStats.inserted && _specificStats.nMatched == 0 && _params.request->isUpsert(); -} - -bool UpdateStage::isEOF() { - return doneUpdating() && !needInsert(); -} - PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) { if (isEOF()) { return PlanStage::IS_EOF; } - if (doneUpdating()) { - // Even if we're done updating, we may have some inserting left to do. - if (needInsert()) { - - doInsert(); - - invariant(isEOF()); - if (_params.request->shouldReturnNewDocs()) { - // Want to return the document we just inserted, create it as a WorkingSetMember - // so that we can return it. - BSONObj newObj = _specificStats.objInserted; - *out = _ws->allocate(); - WorkingSetMember* member = _ws->get(*out); - member->obj = Snapshotted<BSONObj>(getOpCtx()->recoveryUnit()->getSnapshotId(), - newObj.getOwned()); - member->transitionToOwnedObj(); - return PlanStage::ADVANCED; - } - } - - // At this point either we're done updating and there was no insert to do, - // or we're done updating and we're done inserting. Either way, we're EOF. - invariant(isEOF()); - return PlanStage::IS_EOF; - } - // It is possible that after an update was applied, a WriteConflictException // occurred and prevented us from returning ADVANCED with the requested version // of the document. @@ -834,9 +580,8 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) { return PlanStage::NEED_TIME; } else if (PlanStage::IS_EOF == status) { - // The child is out of results, but we might not be done yet because we still might - // have to do an insert. - return PlanStage::NEED_TIME; + // The child is out of results, and therefore so are we. + return PlanStage::IS_EOF; } else if (PlanStage::FAILURE == status) { *out = id; // If a stage fails, it may create a status WSM to indicate why it failed, in which case @@ -855,6 +600,40 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) { return status; } +void UpdateStage::_assertRequiredPathsPresent(const mb::Document& document, + const FieldRefSet& requiredPaths) { + for (const auto& path : requiredPaths) { + auto elem = document.root(); + for (size_t i = 0; i < (*path).numParts(); ++i) { + elem = elem[(*path).getPart(i)]; + uassert(ErrorCodes::NoSuchKey, + str::stream() << "After applying the update, the new document was missing the " + "required field '" + << (*path).dottedField() << "'", + elem.ok()); + uassert( + ErrorCodes::NotSingleValueField, + str::stream() << "After applying the update to the document, the required field '" + << (*path).dottedField() + << "' was found to be an array or array descendant.", + elem.getType() != BSONType::Array); + } + } +} + +void UpdateStage::_ensureIdFieldIsFirst(mb::Document* doc, bool generateOIDIfMissing) { + mb::Element idElem = mb::findFirstChildNamed(doc->root(), idFieldName); + + // If the document has no _id and the caller has requested that we generate one, do so. + if (!idElem.ok() && generateOIDIfMissing) { + addObjectIDIdField(doc); + } else if (idElem.ok() && idElem.leftSibling().ok()) { + // If the document does have an _id but it is not the first element, move it to the front. + uassertStatusOK(idElem.remove()); + uassertStatusOK(doc->root().pushFront(idElem)); + } +} + void UpdateStage::doRestoreStateRequiresCollection() { const UpdateRequest& request = *_params.request; const NamespaceString& nsString(request.getNamespaceString()); @@ -947,13 +726,11 @@ bool UpdateStage::checkUpdateChangesShardKeyFields(ScopedCollectionMetadata meta return false; } - FieldRefSet shardKeyPaths; - const auto& shardKeyPathsVector = metadata->getKeyPatternFields(); - shardKeyPaths.fillFrom(transitional_tools_do_not_use::unspool_vector(shardKeyPathsVector)); + FieldRefSet shardKeyPaths(metadata->getKeyPatternFields()); // Assert that the updated doc has all shard key fields and none are arrays or array // descendants. - assertRequiredPathsPresent(_doc, shardKeyPaths); + _assertRequiredPathsPresent(_doc, shardKeyPaths); // We do not allow modifying shard key value without specifying the full shard key in the query. // If the query is a simple equality match on _id, then '_params.canonicalQuery' will be null. @@ -968,7 +745,7 @@ bool UpdateStage::checkUpdateChangesShardKeyFields(ScopedCollectionMetadata meta pathsupport::extractFullEqualityMatches( *(_params.canonicalQuery->root()), shardKeyPaths, &equalities) .isOK() && - equalities.size() == shardKeyPathsVector.size()); + equalities.size() == metadata->getKeyPatternFields().size()); // We do not allow updates to the shard key when 'multi' is true. uassert(ErrorCodes::InvalidOptions, diff --git a/src/mongo/db/exec/update_stage.h b/src/mongo/db/exec/update_stage.h index 93936166544..f865fb9d162 100644 --- a/src/mongo/db/exec/update_stage.h +++ b/src/mongo/db/exec/update_stage.h @@ -69,14 +69,14 @@ private: }; /** - * Execution stage responsible for updates to documents and upserts. If the prior or - * newly-updated version of the document was requested to be returned, then ADVANCED is - * returned after updating or inserting a document. Otherwise, NEED_TIME is returned after - * updating or inserting a document. + * Execution stage responsible for updates to documents. If the prior or newly-updated version of + * the document was requested to be returned, then ADVANCED is returned after updating a document. + * Otherwise, NEED_TIME is returned after updating a document if further updates are pending, + * and IS_EOF is returned if no documents were found or all updates have been performed. * * Callers of doWork() must be holding a write lock. */ -class UpdateStage final : public RequiresMutableCollectionStage { +class UpdateStage : public RequiresMutableCollectionStage { UpdateStage(const UpdateStage&) = delete; UpdateStage& operator=(const UpdateStage&) = delete; @@ -87,8 +87,8 @@ public: Collection* collection, PlanStage* child); - bool isEOF() final; - StageState doWork(WorkingSetID* out) final; + bool isEOF() override; + StageState doWork(WorkingSetID* out) override; StageType stageType() const final { return STAGE_UPDATE; @@ -119,32 +119,6 @@ public: static UpdateResult makeUpdateResult(const UpdateStats* updateStats); /** - * Computes the document to insert if the upsert flag is set to true and no matching - * documents are found in the database. The document to upsert is computing using the - * query 'cq' and the update mods contained in 'driver'. - * - * If 'cq' is NULL, which can happen for the idhack update fast path, then 'query' is - * used to compute the doc to insert instead of 'cq'. - * - * 'doc' is the mutable BSON document which you would like the update driver to use - * when computing the document to insert. - * - * Set 'isInternalRequest' to true if the upsert was issued by the replication or - * sharding systems. - * - * Returns the document to insert. - */ - static BSONObj applyUpdateOpsForInsert(OperationContext* opCtx, - const CanonicalQuery* cq, - const BSONObj& query, - UpdateDriver* driver, - mutablebson::Document* doc, - bool isInternalRequest, - const NamespaceString& ns, - bool enforceOkForStorage, - UpdateStats* stats); - - /** * Returns true if an update failure due to a given DuplicateKey error is eligible for retry. * Requires that parsedUpdate.hasParsedQuery() is true. */ @@ -152,10 +126,38 @@ public: const DuplicateKeyErrorInfo& errorInfo); protected: + UpdateStage(OperationContext* opCtx, + const UpdateStageParams& params, + WorkingSet* ws, + Collection* collection); + void doSaveStateRequiresCollection() final {} void doRestoreStateRequiresCollection() final; + void _ensureIdFieldIsFirst(mutablebson::Document* doc, bool generateOIDIfMissing); + + void _assertRequiredPathsPresent(const mutablebson::Document& document, + const FieldRefSet& requiredPaths); + + UpdateStageParams _params; + + // Not owned by us. + WorkingSet* _ws; + + // Stats + UpdateStats _specificStats; + + // True if the request should be checked for an update to the shard key. + bool _shouldCheckForShardKeyUpdate; + + // True if updated documents should be validated with storage_validation::storageValid(). + bool _enforceOkForStorage; + + // These get reused for each update. + mutablebson::Document& _doc; + mutablebson::DamageVector _damages; + private: static const UpdateStats kEmptyUpdateStats; @@ -173,24 +175,6 @@ private: BSONObj transformAndUpdate(const Snapshotted<BSONObj>& oldObj, RecordId& recordId); /** - * Computes the document to insert and inserts it into the collection. Used if the - * user requested an upsert and no matching documents were found. - */ - void doInsert(); - - /** - * Have we performed all necessary updates? Even if this is true, we might not be EOF, - * as we might still have to do an insert. - */ - bool doneUpdating(); - - /** - * Examines the stats / update request and returns whether there is still an insert left - * to do. If so then this stage is not EOF yet. - */ - bool needInsert(); - - /** * Stores 'idToRetry' in '_idRetrying' so the update can be retried during the next call to * doWork(). Always returns NEED_YIELD and sets 'out' to WorkingSet::INVALID_ID. */ @@ -210,26 +194,12 @@ private: bool checkUpdateChangesShardKeyFields(ScopedCollectionMetadata metadata, const Snapshotted<BSONObj>& oldObj); - UpdateStageParams _params; - - // Not owned by us. - WorkingSet* _ws; - // If not WorkingSet::INVALID_ID, we use this rather than asking our child what to do next. WorkingSetID _idRetrying; // If not WorkingSet::INVALID_ID, we return this member to our caller. WorkingSetID _idReturning; - // Stats - UpdateStats _specificStats; - - // True if updated documents should be validated with storage_validation::storageValid(). - bool _enforceOkForStorage; - - // True if the request should be checked for an update to the shard key. - bool _shouldCheckForShardKeyUpdate; - // If the update was in-place, we may see it again. This only matters if we're doing // a multi-update; if we're not doing a multi-update we stop after one update and we // won't see any more docs. @@ -244,10 +214,6 @@ private: // So, no matter what, we keep track of where the doc wound up. typedef stdx::unordered_set<RecordId, RecordId::Hasher> RecordIdSet; const std::unique_ptr<RecordIdSet> _updatedRecordIds; - - // These get reused for each update. - mutablebson::Document& _doc; - mutablebson::DamageVector _damages; }; } // namespace mongo diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp new file mode 100644 index 00000000000..dadbaf522f7 --- /dev/null +++ b/src/mongo/db/exec/upsert_stage.cpp @@ -0,0 +1,301 @@ +/** + * Copyright (C) 2019 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/exec/upsert_stage.h" + +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/curop_failpoint_helpers.h" +#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/update/storage_validation.h" +#include "mongo/s/would_change_owning_shard_exception.h" + +namespace mongo { + +MONGO_FAIL_POINT_DEFINE(hangBeforeUpsertPerformsInsert); + +namespace mb = mutablebson; + +namespace { + +const char idFieldName[] = "_id"; +const FieldRef idFieldRef(idFieldName); + +/** + * Populates the given FieldRefSets with the fields that are required to be present in the document + * and the fields which cannot be changed, respectively. + */ +void getRequiredAndImmutablePaths(OperationContext* opCtx, + const ScopedCollectionMetadata& metadata, + bool isInternalRequest, + FieldRefSet* requiredPaths, + FieldRefSet* immutablePaths) { + // Each document has a set of required paths and a potentially different set of immutable paths. + // If the collection is sharded, add the shard key fields to the required paths vector. + if (metadata->isSharded()) { + requiredPaths->fillFrom(metadata->getKeyPatternFields()); + } + // Add the _id field, replacing any existing paths that are prefixed by _id if present. + requiredPaths->keepShortest(&idFieldRef); + + // If this is an internal request, no fields are immutable and we leave 'immutablePaths' empty. + if (!isInternalRequest) { + // An unversioned request cannot update the shard key, so all required fields are immutable. + // The shard key is also immutable if we have not yet upgraded to 4.2 feature compatibility. + const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42; + if (!OperationShardingState::isOperationVersioned(opCtx) || !isFCV42) { + for (auto&& reqPath : *requiredPaths) { + immutablePaths->insert(reqPath); + } + } + // The _id field is always immutable to user requests, even if the shard key is mutable. + immutablePaths->keepShortest(&idFieldRef); + } +} +} // namespace + +UpsertStage::UpsertStage(OperationContext* opCtx, + const UpdateStageParams& params, + WorkingSet* ws, + Collection* collection, + PlanStage* child) + : UpdateStage(opCtx, params, ws, collection) { + // We should never create this stage for a non-upsert request. + invariant(_params.request->isUpsert()); + _children.emplace_back(child); +}; + +// We're done when updating is finished and we have either matched or inserted. +bool UpsertStage::isEOF() { + return UpdateStage::isEOF() && (_specificStats.nMatched > 0 || _specificStats.inserted); +} + +PlanStage::StageState UpsertStage::doWork(WorkingSetID* out) { + if (isEOF()) { + return StageState::IS_EOF; + } + + // First, attempt to perform the update on a matching document. + auto updateState = UpdateStage::doWork(out); + + // If the update returned anything other than EOF, just forward it along. There's a chance we + // still may find a document to update and will not have to insert anything. If it did return + // EOF and we do not need to insert a new document, return EOF immediately here. + if (updateState != PlanStage::IS_EOF || isEOF()) { + return updateState; + } + + // If the update resulted in EOF without matching anything, we must insert a new document. + invariant(updateState == PlanStage::IS_EOF && !isEOF()); + + // Since this is an insert, we will be logging it as such in the oplog. We don't need the + // driver's help to build the oplog record. We also set the 'inserted' stats flag here. + _params.driver->setLogOp(false); + _specificStats.inserted = true; + + // Determine whether this is a user-initiated or internal request. + const bool isInternalRequest = + !getOpCtx()->writesAreReplicated() || _params.request->isFromMigration(); + + // Generate the new document to be inserted. + _specificStats.objInserted = _produceNewDocumentForInsert(isInternalRequest); + + // If this is an explain, skip performing the actual insert. + if (!_params.request->isExplain()) { + _performInsert(_specificStats.objInserted); + } + + // We should always be EOF at this point. + invariant(isEOF()); + + // If we want to return the document we just inserted, create it as a WorkingSetMember. + if (_params.request->shouldReturnNewDocs()) { + BSONObj newObj = _specificStats.objInserted; + *out = _ws->allocate(); + WorkingSetMember* member = _ws->get(*out); + member->obj = + Snapshotted<BSONObj>(getOpCtx()->recoveryUnit()->getSnapshotId(), newObj.getOwned()); + member->transitionToOwnedObj(); + return PlanStage::ADVANCED; + } + + // If we don't need to return the inserted document, we're done. + return PlanStage::IS_EOF; +} + +void UpsertStage::_performInsert(BSONObj newDocument) { + // If in FCV 4.2 and this collection is sharded, check if the doc we plan to insert belongs to + // this shard. MongoS uses the query field to target a shard, and it is possible the shard key + // fields in the 'q' field belong to this shard, but those in the 'u' field do not. In this case + // we need to throw so that MongoS can target the insert to the correct shard. + if (_shouldCheckForShardKeyUpdate) { + auto* const css = CollectionShardingState::get(getOpCtx(), collection()->ns()); + const auto& metadata = css->getCurrentMetadata(); + + const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() && + serverGlobalParams.featureCompatibility.getVersion() == + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42; + + if (isFCV42 && metadata->isSharded()) { + const ShardKeyPattern shardKeyPattern(metadata->getKeyPattern()); + auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newDocument); + + if (!metadata->keyBelongsToMe(newShardKey)) { + // An attempt to upsert a document with a shard key value that belongs on another + // shard must either be a retryable write or inside a transaction. + uassert(ErrorCodes::IllegalOperation, + "The upsert document could not be inserted onto the shard targeted by the " + "query, since its shard key belongs on a different shard. Cross-shard " + "upserts are only allowed when running in a transaction or with " + "retryWrites: true.", + getOpCtx()->getTxnNumber()); + uasserted(WouldChangeOwningShardInfo( + _params.request->getQuery(), newDocument, true /* upsert */), + "The document we are inserting belongs on a different shard"); + } + } + } + + if (MONGO_unlikely(hangBeforeUpsertPerformsInsert.shouldFail())) { + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &hangBeforeUpsertPerformsInsert, getOpCtx(), "hangBeforeUpsertPerformsInsert"); + } + + writeConflictRetry(getOpCtx(), "upsert", collection()->ns().ns(), [&] { + WriteUnitOfWork wunit(getOpCtx()); + uassertStatusOK( + collection()->insertDocument(getOpCtx(), + InsertStatement(_params.request->getStmtId(), newDocument), + _params.opDebug, + _params.request->isFromMigration())); + + // Technically, we should save/restore state here, but since we are going to return + // immediately after, it would just be wasted work. + wunit.commit(); + }); +} + +BSONObj UpsertStage::_produceNewDocumentForInsert(bool isInternalRequest) { + // Obtain the sharding metadata. This will be needed to compute the required paths. The metadata + // must remain in scope since it owns the pointers used by 'requiredPaths' and 'immutablePaths'. + auto* css = CollectionShardingState::get(getOpCtx(), _params.request->getNamespaceString()); + auto metadata = css->getCurrentMetadata(); + + // Each document has a set of required paths and a potentially different set of immutable paths. + FieldRefSet requiredPaths, immutablePaths; + getRequiredAndImmutablePaths( + getOpCtx(), metadata, isInternalRequest, &requiredPaths, &immutablePaths); + + // Reset the document into which we will be writing. + _doc.reset(); + + // First: populate the document's required paths with equality predicate values from the query, + // if available. This generates the pre-image document that we will run the update against. + if (auto* cq = _params.canonicalQuery) { + uassertStatusOK(_params.driver->populateDocumentWithQueryFields(*cq, requiredPaths, _doc)); + } else { + fassert(17354, CanonicalQuery::isSimpleIdQuery(_params.request->getQuery())); + fassert(17352, _doc.root().appendElement(_params.request->getQuery()[idFieldName])); + } + + // Second: run the appropriate document generation strategy over the document to generate the + // post-image. If the update operation modifies any of the immutable paths, this will throw. + if (_params.request->shouldUpsertSuppliedDocument()) { + _generateNewDocumentFromSuppliedDoc(immutablePaths); + } else { + _generateNewDocumentFromUpdateOp(immutablePaths); + } + + // Third: ensure _id is first if it exists, and generate a new OID otherwise. + _ensureIdFieldIsFirst(&_doc, true); + + // Fourth: assert that the finished document has all required fields and is valid for storage. + _assertDocumentToBeInsertedIsValid( + _doc, requiredPaths, isInternalRequest, _enforceOkForStorage); + + // Fifth: validate that the newly-produced document does not exceed the maximum BSON user size. + auto newDocument = _doc.getObject(); + uassert(17420, + str::stream() << "Document to upsert is larger than " << BSONObjMaxUserSize, + newDocument.objsize() <= BSONObjMaxUserSize); + + return newDocument; +} + +void UpsertStage::_generateNewDocumentFromUpdateOp(const FieldRefSet& immutablePaths) { + // Use the UpdateModification from the original request to generate a new document by running + // the update over the empty (except for fields extracted from the query) document. We do not + // validate for storage until later, but we do ensure that no immutable fields are modified. + const bool validateForStorage = false; + const bool isInsert = true; + uassertStatusOK( + _params.driver->update({}, &_doc, validateForStorage, immutablePaths, isInsert)); +}; + +void UpsertStage::_generateNewDocumentFromSuppliedDoc(const FieldRefSet& immutablePaths) { + // We should never call this method unless the request has a set of update constants. + invariant(_params.request->shouldUpsertSuppliedDocument()); + invariant(_params.request->getUpdateConstants()); + + // Extract the supplied document from the constants and validate that it is an object. + auto suppliedDocElt = _params.request->getUpdateConstants()->getField("new"_sd); + invariant(suppliedDocElt.type() == BSONType::Object); + auto suppliedDoc = suppliedDocElt.embeddedObject(); + + // The supplied doc is functionally a replacement update. We need a new driver to apply it. + UpdateDriver replacementDriver(nullptr); + + // Create a new replacement-style update from the supplied document. + replacementDriver.parse({suppliedDoc}, {}); + replacementDriver.setLogOp(false); + + // We do not validate for storage, as we will validate the full document before inserting. + // However, we ensure that no immutable fields are modified. + const bool validateForStorage = false; + const bool isInsert = true; + uassertStatusOK( + replacementDriver.update({}, &_doc, validateForStorage, immutablePaths, isInsert)); +} + +void UpsertStage::_assertDocumentToBeInsertedIsValid(const mb::Document& document, + const FieldRefSet& requiredPaths, + bool isInternalRequest, + bool enforceOkForStorage) { + // For a non-internal operation, we assert that the document contains all required paths, that + // no shard key fields have arrays at any point along their paths, and that the document is + // valid for storage. Skip all such checks for an internal operation. + if (!isInternalRequest) { + if (enforceOkForStorage) { + storage_validation::storageValid(document); + } + _assertRequiredPathsPresent(document, requiredPaths); + } +} +} // namespace mongo diff --git a/src/mongo/db/exec/upsert_stage.h b/src/mongo/db/exec/upsert_stage.h new file mode 100644 index 00000000000..fe6572f9d16 --- /dev/null +++ b/src/mongo/db/exec/upsert_stage.h @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2019 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/exec/update_stage.h" + +namespace mongo { + +/** + * Execution stage for update requests with {upsert:true}. This is a specialized UpdateStage which, + * in the event that no documents match the update request's query, generates and inserts a new + * document into the collection. All logic related to the insertion phase is implemented by this + * class. + * + * If the prior or newly-updated version of the document was requested to be returned, then ADVANCED + * is returned after updating or inserting a document. Otherwise, NEED_TIME is returned after + * updating a document if further updates are pending, and IS_EOF is returned if all updates have + * been performed or if a document has been inserted. + * + * Callers of doWork() must be holding a write lock. + */ +class UpsertStage final : public UpdateStage { + UpsertStage(const UpsertStage&) = delete; + UpsertStage& operator=(const UpsertStage&) = delete; + +public: + UpsertStage(OperationContext* opCtx, + const UpdateStageParams& params, + WorkingSet* ws, + Collection* collection, + PlanStage* child); + + bool isEOF() final; + StageState doWork(WorkingSetID* out) final; + +private: + BSONObj _produceNewDocumentForInsert(bool isInternalRequest); + void _performInsert(BSONObj newDocument); + + void _generateNewDocumentFromSuppliedDoc(const FieldRefSet& immutablePaths); + void _generateNewDocumentFromUpdateOp(const FieldRefSet& immutablePaths); + + void _assertDocumentToBeInsertedIsValid(const mutablebson::Document& document, + const FieldRefSet& requiredPaths, + bool isInternalRequest, + bool enforceOkForStorage); +}; + +} // namespace mongo |