summaryrefslogtreecommitdiff
path: root/src/mongo/db/exec
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/exec')
-rw-r--r--src/mongo/db/exec/update_stage.cpp339
-rw-r--r--src/mongo/db/exec/update_stage.h104
-rw-r--r--src/mongo/db/exec/upsert_stage.cpp301
-rw-r--r--src/mongo/db/exec/upsert_stage.h76
4 files changed, 470 insertions, 350 deletions
diff --git a/src/mongo/db/exec/update_stage.cpp b/src/mongo/db/exec/update_stage.cpp
index a7b307bf49c..10696c6cb65 100644
--- a/src/mongo/db/exec/update_stage.cpp
+++ b/src/mongo/db/exec/update_stage.cpp
@@ -39,7 +39,6 @@
#include "mongo/bson/bson_comparator_interface_base.h"
#include "mongo/bson/mutable/algorithm.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/curop_failpoint_helpers.h"
#include "mongo/db/exec/scoped_timer.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/exec/write_stage_common.h"
@@ -57,11 +56,9 @@
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
-#include "mongo/util/transitional_tools_do_not_use/vector_spooling.h"
namespace mongo {
-MONGO_FAIL_POINT_DEFINE(hangBeforeUpsertPerformsInsert);
MONGO_FAIL_POINT_DEFINE(hangBeforeThrowWouldChangeOwningShard);
using std::string;
@@ -76,59 +73,13 @@ namespace {
const char idFieldName[] = "_id";
const FieldRef idFieldRef(idFieldName);
-Status ensureIdFieldIsFirst(mb::Document* doc) {
- mb::Element idElem = mb::findFirstChildNamed(doc->root(), idFieldName);
-
- if (!idElem.ok()) {
- return {ErrorCodes::InvalidIdField, "_id field is missing"};
- }
-
- if (idElem.leftSibling().ok()) {
- // Move '_id' to be the first element
- Status s = idElem.remove();
- if (!s.isOK())
- return s;
- s = doc->root().pushFront(idElem);
- if (!s.isOK())
- return s;
- }
-
- return Status::OK();
-}
-
void addObjectIDIdField(mb::Document* doc) {
const auto idElem = doc->makeElementNewOID(idFieldName);
- if (!idElem.ok())
- uasserted(17268, "Could not create new ObjectId '_id' field.");
-
+ uassert(17268, "Could not create new ObjectId '_id' field.", idElem.ok());
uassertStatusOK(doc->root().pushFront(idElem));
}
/**
- * Uasserts if any of the paths in 'requiredPaths' are not present in 'document', or if they are
- * arrays or array descendants.
- */
-void assertRequiredPathsPresent(const mb::Document& document, const FieldRefSet& requiredPaths) {
- for (const auto& path : requiredPaths) {
- auto elem = document.root();
- for (size_t i = 0; i < (*path).numParts(); ++i) {
- elem = elem[(*path).getPart(i)];
- uassert(ErrorCodes::NoSuchKey,
- str::stream() << "After applying the update, the new document was missing the "
- "required field '"
- << (*path).dottedField() << "'",
- elem.ok());
- uassert(
- ErrorCodes::NotSingleValueField,
- str::stream() << "After applying the update to the document, the required field '"
- << (*path).dottedField()
- << "' was found to be an array or array descendant.",
- elem.getType() != BSONType::Array);
- }
- }
-}
-
-/**
* Returns true if we should throw a WriteConflictException in order to retry the operation in the
* case of a conflict. Returns false if we should skip the document and keep going.
*/
@@ -158,19 +109,30 @@ const char* UpdateStage::kStageType = "UPDATE";
const UpdateStats UpdateStage::kEmptyUpdateStats;
+// Public constructor.
UpdateStage::UpdateStage(OperationContext* opCtx,
const UpdateStageParams& params,
WorkingSet* ws,
Collection* collection,
PlanStage* child)
+ : UpdateStage(opCtx, params, ws, collection) {
+ // We should never reach here if the request is an upsert.
+ invariant(!_params.request->isUpsert());
+ _children.emplace_back(child);
+}
+
+// Protected constructor.
+UpdateStage::UpdateStage(OperationContext* opCtx,
+ const UpdateStageParams& params,
+ WorkingSet* ws,
+ Collection* collection)
: RequiresMutableCollectionStage(kStageType, opCtx, collection),
_params(params),
_ws(ws),
+ _doc(params.driver->getDocument()),
_idRetrying(WorkingSet::INVALID_ID),
_idReturning(WorkingSet::INVALID_ID),
- _updatedRecordIds(params.request->isMulti() ? new RecordIdSet() : NULL),
- _doc(params.driver->getDocument()) {
- _children.emplace_back(child);
+ _updatedRecordIds(params.request->isMulti() ? new RecordIdSet() : nullptr) {
// Should the modifiers validate their embedded docs via storage_validation::storageValid()?
// Only user updates should be checked. Any system or replication stuff should pass through.
@@ -226,9 +188,7 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
if (getOpCtx()->writesAreReplicated() && !request->isFromMigration()) {
if (metadata->isSharded() &&
(!OperationShardingState::isOperationVersioned(getOpCtx()) || !isFCV42)) {
- auto& immutablePathsVector = metadata->getKeyPatternFields();
- immutablePaths.fillFrom(
- transitional_tools_do_not_use::unspool_vector(immutablePathsVector));
+ immutablePaths.fillFrom(metadata->getKeyPatternFields());
}
immutablePaths.keepShortest(&idFieldRef);
}
@@ -270,16 +230,8 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
// neither grow nor shrink).
const auto createIdField = !collection()->isCapped();
- // Ensure if _id exists it is first
- status = ensureIdFieldIsFirst(&_doc);
- if (status.code() == ErrorCodes::InvalidIdField) {
- // Create ObjectId _id field if we are doing that
- if (createIdField) {
- addObjectIDIdField(&_doc);
- }
- } else {
- uassertStatusOK(status);
- }
+ // Ensure _id is first if it exists, and generate a new OID if appropriate.
+ _ensureIdFieldIsFirst(&_doc, createIdField);
// See if the changes were applied in place
const char* source = NULL;
@@ -394,99 +346,6 @@ BSONObj UpdateStage::transformAndUpdate(const Snapshotted<BSONObj>& oldObj, Reco
return newObj;
}
-BSONObj UpdateStage::applyUpdateOpsForInsert(OperationContext* opCtx,
- const CanonicalQuery* cq,
- const BSONObj& query,
- UpdateDriver* driver,
- mutablebson::Document* doc,
- bool isInternalRequest,
- const NamespaceString& ns,
- bool enforceOkForStorage,
- UpdateStats* stats) {
- // Since this is an insert (no docs found and upsert:true), we will be logging it
- // as an insert in the oplog. We don't need the driver's help to build the
- // oplog record, then. We also set the context of the update driver to the INSERT_CONTEXT.
- // Some mods may only work in that context (e.g. $setOnInsert).
- driver->setLogOp(false);
-
- auto* const css = CollectionShardingState::get(opCtx, ns);
- auto metadata = css->getCurrentMetadata();
-
- const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() &&
- serverGlobalParams.featureCompatibility.getVersion() ==
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42;
-
- FieldRefSet immutablePaths;
- if (metadata->isSharded() &&
- (!OperationShardingState::isOperationVersioned(opCtx) || !isFCV42)) {
- auto& immutablePathsVector = metadata->getKeyPatternFields();
- immutablePaths.fillFrom(
- transitional_tools_do_not_use::unspool_vector(immutablePathsVector));
- }
- immutablePaths.keepShortest(&idFieldRef);
-
- if (cq) {
- FieldRefSet requiredPaths;
- if (metadata->isSharded()) {
- const auto& shardKeyPathsVector = metadata->getKeyPatternFields();
- requiredPaths.fillFrom(
- transitional_tools_do_not_use::unspool_vector(shardKeyPathsVector));
- }
- requiredPaths.keepShortest(&idFieldRef);
- uassertStatusOK(driver->populateDocumentWithQueryFields(*cq, requiredPaths, *doc));
- } else {
- fassert(17354, CanonicalQuery::isSimpleIdQuery(query));
- BSONElement idElt = query[idFieldName];
- fassert(17352, doc->root().appendElement(idElt));
- }
-
- // Apply the update modifications here. Do not validate for storage, since we will validate the
- // entire document after the update. However, we ensure that no immutable fields are updated.
- const bool validateForStorage = false;
- const bool isInsert = true;
- if (isInternalRequest) {
- immutablePaths.clear();
- }
- Status updateStatus =
- driver->update(StringData(), doc, validateForStorage, immutablePaths, isInsert);
- if (!updateStatus.isOK()) {
- uasserted(16836, updateStatus.reason());
- }
-
- // Ensure _id exists and is first
- auto idAndFirstStatus = ensureIdFieldIsFirst(doc);
- if (idAndFirstStatus.code() == ErrorCodes::InvalidIdField) { // _id field is missing
- addObjectIDIdField(doc);
- } else {
- uassertStatusOK(idAndFirstStatus);
- }
-
- // Validate that the object replacement or modifiers resulted in a document
- // that contains all the required keys and can be stored if it isn't coming
- // from a migration or via replication.
- if (!isInternalRequest) {
- if (enforceOkForStorage) {
- storage_validation::storageValid(*doc);
- }
- FieldRefSet requiredPaths;
- if (metadata->isSharded()) {
- const auto& shardKeyPathsVector = metadata->getKeyPatternFields();
- requiredPaths.fillFrom(
- transitional_tools_do_not_use::unspool_vector(shardKeyPathsVector));
- }
- requiredPaths.keepShortest(&idFieldRef);
- assertRequiredPathsPresent(*doc, requiredPaths);
- }
-
- BSONObj newObj = doc->getObject();
- if (newObj.objsize() > BSONObjMaxUserSize) {
- uasserted(17420,
- str::stream() << "Document to upsert is larger than " << BSONObjMaxUserSize);
- }
-
- return newObj;
-}
-
bool UpdateStage::matchContainsOnlyAndedEqualityNodes(const MatchExpression& root) {
if (root.matchType() == MatchExpression::EQ) {
return true;
@@ -563,131 +422,18 @@ bool UpdateStage::shouldRetryDuplicateKeyException(const ParsedUpdate& parsedUpd
return true;
}
-void UpdateStage::doInsert() {
- _specificStats.inserted = true;
-
- const UpdateRequest* request = _params.request;
- bool isInternalRequest = !getOpCtx()->writesAreReplicated() || request->isFromMigration();
-
- // Reset the document we will be writing to.
- _doc.reset();
-
- BSONObj newObj = applyUpdateOpsForInsert(getOpCtx(),
- _params.canonicalQuery,
- request->getQuery(),
- _params.driver,
- &_doc,
- isInternalRequest,
- request->getNamespaceString(),
- _enforceOkForStorage,
- &_specificStats);
-
- _specificStats.objInserted = newObj;
-
- // If this is an explain, bail out now without doing the insert.
- if (request->isExplain()) {
- return;
- }
-
- // If in FCV 4.2 and this collection is sharded, check if the doc we plan to insert belongs to
- // this shard. MongoS uses the query field to target a shard, and it is possible the shard key
- // fields in the 'q' field belong to this shard, but those in the 'u' field do not. In this case
- // we need to throw so that MongoS can target the insert to the correct shard.
- if (_shouldCheckForShardKeyUpdate) {
- const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() &&
- serverGlobalParams.featureCompatibility.getVersion() ==
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42;
- auto* const css = CollectionShardingState::get(getOpCtx(), collection()->ns());
- const auto& metadata = css->getCurrentMetadata();
-
- if (isFCV42 && metadata->isSharded()) {
- const ShardKeyPattern shardKeyPattern(metadata->getKeyPattern());
- auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newObj);
-
- if (!metadata->keyBelongsToMe(newShardKey)) {
- // An attempt to upsert a document with a shard key value that belongs on another
- // shard must either be a retryable write or inside a transaction.
- uassert(ErrorCodes::IllegalOperation,
- "The upsert document could not be inserted onto the shard targeted by the "
- "query, since its shard key belongs on a different shard. Cross-shard "
- "upserts are only allowed when running in a transaction or with "
- "retryWrites: true.",
- getOpCtx()->getTxnNumber());
- uasserted(
- WouldChangeOwningShardInfo(request->getQuery(), newObj, true /* upsert */),
- "The document we are inserting belongs on a different shard");
- }
- }
- }
-
- if (MONGO_FAIL_POINT(hangBeforeUpsertPerformsInsert)) {
- CurOpFailpointHelpers::waitWhileFailPointEnabled(
- &hangBeforeUpsertPerformsInsert, getOpCtx(), "hangBeforeUpsertPerformsInsert");
- }
-
- writeConflictRetry(getOpCtx(), "upsert", collection()->ns().ns(), [&] {
- WriteUnitOfWork wunit(getOpCtx());
- uassertStatusOK(collection()->insertDocument(getOpCtx(),
- InsertStatement(request->getStmtId(), newObj),
- _params.opDebug,
- request->isFromMigration()));
-
- // Technically, we should save/restore state here, but since we are going to return
- // immediately after, it would just be wasted work.
- wunit.commit();
- });
-}
-
-bool UpdateStage::doneUpdating() {
+bool UpdateStage::isEOF() {
// We're done updating if either the child has no more results to give us, or we've
// already gotten a result back and we're not a multi-update.
return _idRetrying == WorkingSet::INVALID_ID && _idReturning == WorkingSet::INVALID_ID &&
(child()->isEOF() || (_specificStats.nMatched > 0 && !_params.request->isMulti()));
}
-bool UpdateStage::needInsert() {
- // We need to insert if
- // 1) we haven't inserted already,
- // 2) the child stage returned zero matches, and
- // 3) the user asked for an upsert.
- return !_specificStats.inserted && _specificStats.nMatched == 0 && _params.request->isUpsert();
-}
-
-bool UpdateStage::isEOF() {
- return doneUpdating() && !needInsert();
-}
-
PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) {
if (isEOF()) {
return PlanStage::IS_EOF;
}
- if (doneUpdating()) {
- // Even if we're done updating, we may have some inserting left to do.
- if (needInsert()) {
-
- doInsert();
-
- invariant(isEOF());
- if (_params.request->shouldReturnNewDocs()) {
- // Want to return the document we just inserted, create it as a WorkingSetMember
- // so that we can return it.
- BSONObj newObj = _specificStats.objInserted;
- *out = _ws->allocate();
- WorkingSetMember* member = _ws->get(*out);
- member->obj = Snapshotted<BSONObj>(getOpCtx()->recoveryUnit()->getSnapshotId(),
- newObj.getOwned());
- member->transitionToOwnedObj();
- return PlanStage::ADVANCED;
- }
- }
-
- // At this point either we're done updating and there was no insert to do,
- // or we're done updating and we're done inserting. Either way, we're EOF.
- invariant(isEOF());
- return PlanStage::IS_EOF;
- }
-
// It is possible that after an update was applied, a WriteConflictException
// occurred and prevented us from returning ADVANCED with the requested version
// of the document.
@@ -834,9 +580,8 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) {
return PlanStage::NEED_TIME;
} else if (PlanStage::IS_EOF == status) {
- // The child is out of results, but we might not be done yet because we still might
- // have to do an insert.
- return PlanStage::NEED_TIME;
+ // The child is out of results, and therefore so are we.
+ return PlanStage::IS_EOF;
} else if (PlanStage::FAILURE == status) {
*out = id;
// If a stage fails, it may create a status WSM to indicate why it failed, in which case
@@ -855,6 +600,40 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) {
return status;
}
+void UpdateStage::_assertRequiredPathsPresent(const mb::Document& document,
+ const FieldRefSet& requiredPaths) {
+ for (const auto& path : requiredPaths) {
+ auto elem = document.root();
+ for (size_t i = 0; i < (*path).numParts(); ++i) {
+ elem = elem[(*path).getPart(i)];
+ uassert(ErrorCodes::NoSuchKey,
+ str::stream() << "After applying the update, the new document was missing the "
+ "required field '"
+ << (*path).dottedField() << "'",
+ elem.ok());
+ uassert(
+ ErrorCodes::NotSingleValueField,
+ str::stream() << "After applying the update to the document, the required field '"
+ << (*path).dottedField()
+ << "' was found to be an array or array descendant.",
+ elem.getType() != BSONType::Array);
+ }
+ }
+}
+
+void UpdateStage::_ensureIdFieldIsFirst(mb::Document* doc, bool generateOIDIfMissing) {
+ mb::Element idElem = mb::findFirstChildNamed(doc->root(), idFieldName);
+
+ // If the document has no _id and the caller has requested that we generate one, do so.
+ if (!idElem.ok() && generateOIDIfMissing) {
+ addObjectIDIdField(doc);
+ } else if (idElem.ok() && idElem.leftSibling().ok()) {
+ // If the document does have an _id but it is not the first element, move it to the front.
+ uassertStatusOK(idElem.remove());
+ uassertStatusOK(doc->root().pushFront(idElem));
+ }
+}
+
void UpdateStage::doRestoreStateRequiresCollection() {
const UpdateRequest& request = *_params.request;
const NamespaceString& nsString(request.getNamespaceString());
@@ -947,13 +726,11 @@ bool UpdateStage::checkUpdateChangesShardKeyFields(ScopedCollectionMetadata meta
return false;
}
- FieldRefSet shardKeyPaths;
- const auto& shardKeyPathsVector = metadata->getKeyPatternFields();
- shardKeyPaths.fillFrom(transitional_tools_do_not_use::unspool_vector(shardKeyPathsVector));
+ FieldRefSet shardKeyPaths(metadata->getKeyPatternFields());
// Assert that the updated doc has all shard key fields and none are arrays or array
// descendants.
- assertRequiredPathsPresent(_doc, shardKeyPaths);
+ _assertRequiredPathsPresent(_doc, shardKeyPaths);
// We do not allow modifying shard key value without specifying the full shard key in the query.
// If the query is a simple equality match on _id, then '_params.canonicalQuery' will be null.
@@ -968,7 +745,7 @@ bool UpdateStage::checkUpdateChangesShardKeyFields(ScopedCollectionMetadata meta
pathsupport::extractFullEqualityMatches(
*(_params.canonicalQuery->root()), shardKeyPaths, &equalities)
.isOK() &&
- equalities.size() == shardKeyPathsVector.size());
+ equalities.size() == metadata->getKeyPatternFields().size());
// We do not allow updates to the shard key when 'multi' is true.
uassert(ErrorCodes::InvalidOptions,
diff --git a/src/mongo/db/exec/update_stage.h b/src/mongo/db/exec/update_stage.h
index 93936166544..f865fb9d162 100644
--- a/src/mongo/db/exec/update_stage.h
+++ b/src/mongo/db/exec/update_stage.h
@@ -69,14 +69,14 @@ private:
};
/**
- * Execution stage responsible for updates to documents and upserts. If the prior or
- * newly-updated version of the document was requested to be returned, then ADVANCED is
- * returned after updating or inserting a document. Otherwise, NEED_TIME is returned after
- * updating or inserting a document.
+ * Execution stage responsible for updates to documents. If the prior or newly-updated version of
+ * the document was requested to be returned, then ADVANCED is returned after updating a document.
+ * Otherwise, NEED_TIME is returned after updating a document if further updates are pending,
+ * and IS_EOF is returned if no documents were found or all updates have been performed.
*
* Callers of doWork() must be holding a write lock.
*/
-class UpdateStage final : public RequiresMutableCollectionStage {
+class UpdateStage : public RequiresMutableCollectionStage {
UpdateStage(const UpdateStage&) = delete;
UpdateStage& operator=(const UpdateStage&) = delete;
@@ -87,8 +87,8 @@ public:
Collection* collection,
PlanStage* child);
- bool isEOF() final;
- StageState doWork(WorkingSetID* out) final;
+ bool isEOF() override;
+ StageState doWork(WorkingSetID* out) override;
StageType stageType() const final {
return STAGE_UPDATE;
@@ -119,32 +119,6 @@ public:
static UpdateResult makeUpdateResult(const UpdateStats* updateStats);
/**
- * Computes the document to insert if the upsert flag is set to true and no matching
- * documents are found in the database. The document to upsert is computing using the
- * query 'cq' and the update mods contained in 'driver'.
- *
- * If 'cq' is NULL, which can happen for the idhack update fast path, then 'query' is
- * used to compute the doc to insert instead of 'cq'.
- *
- * 'doc' is the mutable BSON document which you would like the update driver to use
- * when computing the document to insert.
- *
- * Set 'isInternalRequest' to true if the upsert was issued by the replication or
- * sharding systems.
- *
- * Returns the document to insert.
- */
- static BSONObj applyUpdateOpsForInsert(OperationContext* opCtx,
- const CanonicalQuery* cq,
- const BSONObj& query,
- UpdateDriver* driver,
- mutablebson::Document* doc,
- bool isInternalRequest,
- const NamespaceString& ns,
- bool enforceOkForStorage,
- UpdateStats* stats);
-
- /**
* Returns true if an update failure due to a given DuplicateKey error is eligible for retry.
* Requires that parsedUpdate.hasParsedQuery() is true.
*/
@@ -152,10 +126,38 @@ public:
const DuplicateKeyErrorInfo& errorInfo);
protected:
+ UpdateStage(OperationContext* opCtx,
+ const UpdateStageParams& params,
+ WorkingSet* ws,
+ Collection* collection);
+
void doSaveStateRequiresCollection() final {}
void doRestoreStateRequiresCollection() final;
+ void _ensureIdFieldIsFirst(mutablebson::Document* doc, bool generateOIDIfMissing);
+
+ void _assertRequiredPathsPresent(const mutablebson::Document& document,
+ const FieldRefSet& requiredPaths);
+
+ UpdateStageParams _params;
+
+ // Not owned by us.
+ WorkingSet* _ws;
+
+ // Stats
+ UpdateStats _specificStats;
+
+ // True if the request should be checked for an update to the shard key.
+ bool _shouldCheckForShardKeyUpdate;
+
+ // True if updated documents should be validated with storage_validation::storageValid().
+ bool _enforceOkForStorage;
+
+ // These get reused for each update.
+ mutablebson::Document& _doc;
+ mutablebson::DamageVector _damages;
+
private:
static const UpdateStats kEmptyUpdateStats;
@@ -173,24 +175,6 @@ private:
BSONObj transformAndUpdate(const Snapshotted<BSONObj>& oldObj, RecordId& recordId);
/**
- * Computes the document to insert and inserts it into the collection. Used if the
- * user requested an upsert and no matching documents were found.
- */
- void doInsert();
-
- /**
- * Have we performed all necessary updates? Even if this is true, we might not be EOF,
- * as we might still have to do an insert.
- */
- bool doneUpdating();
-
- /**
- * Examines the stats / update request and returns whether there is still an insert left
- * to do. If so then this stage is not EOF yet.
- */
- bool needInsert();
-
- /**
* Stores 'idToRetry' in '_idRetrying' so the update can be retried during the next call to
* doWork(). Always returns NEED_YIELD and sets 'out' to WorkingSet::INVALID_ID.
*/
@@ -210,26 +194,12 @@ private:
bool checkUpdateChangesShardKeyFields(ScopedCollectionMetadata metadata,
const Snapshotted<BSONObj>& oldObj);
- UpdateStageParams _params;
-
- // Not owned by us.
- WorkingSet* _ws;
-
// If not WorkingSet::INVALID_ID, we use this rather than asking our child what to do next.
WorkingSetID _idRetrying;
// If not WorkingSet::INVALID_ID, we return this member to our caller.
WorkingSetID _idReturning;
- // Stats
- UpdateStats _specificStats;
-
- // True if updated documents should be validated with storage_validation::storageValid().
- bool _enforceOkForStorage;
-
- // True if the request should be checked for an update to the shard key.
- bool _shouldCheckForShardKeyUpdate;
-
// If the update was in-place, we may see it again. This only matters if we're doing
// a multi-update; if we're not doing a multi-update we stop after one update and we
// won't see any more docs.
@@ -244,10 +214,6 @@ private:
// So, no matter what, we keep track of where the doc wound up.
typedef stdx::unordered_set<RecordId, RecordId::Hasher> RecordIdSet;
const std::unique_ptr<RecordIdSet> _updatedRecordIds;
-
- // These get reused for each update.
- mutablebson::Document& _doc;
- mutablebson::DamageVector _damages;
};
} // namespace mongo
diff --git a/src/mongo/db/exec/upsert_stage.cpp b/src/mongo/db/exec/upsert_stage.cpp
new file mode 100644
index 00000000000..dadbaf522f7
--- /dev/null
+++ b/src/mongo/db/exec/upsert_stage.cpp
@@ -0,0 +1,301 @@
+/**
+ * Copyright (C) 2019 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/exec/upsert_stage.h"
+
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/curop_failpoint_helpers.h"
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/update/storage_validation.h"
+#include "mongo/s/would_change_owning_shard_exception.h"
+
+namespace mongo {
+
+MONGO_FAIL_POINT_DEFINE(hangBeforeUpsertPerformsInsert);
+
+namespace mb = mutablebson;
+
+namespace {
+
+const char idFieldName[] = "_id";
+const FieldRef idFieldRef(idFieldName);
+
+/**
+ * Populates the given FieldRefSets with the fields that are required to be present in the document
+ * and the fields which cannot be changed, respectively.
+ */
+void getRequiredAndImmutablePaths(OperationContext* opCtx,
+ const ScopedCollectionMetadata& metadata,
+ bool isInternalRequest,
+ FieldRefSet* requiredPaths,
+ FieldRefSet* immutablePaths) {
+ // Each document has a set of required paths and a potentially different set of immutable paths.
+ // If the collection is sharded, add the shard key fields to the required paths vector.
+ if (metadata->isSharded()) {
+ requiredPaths->fillFrom(metadata->getKeyPatternFields());
+ }
+ // Add the _id field, replacing any existing paths that are prefixed by _id if present.
+ requiredPaths->keepShortest(&idFieldRef);
+
+ // If this is an internal request, no fields are immutable and we leave 'immutablePaths' empty.
+ if (!isInternalRequest) {
+ // An unversioned request cannot update the shard key, so all required fields are immutable.
+ // The shard key is also immutable if we have not yet upgraded to 4.2 feature compatibility.
+ const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.getVersion() ==
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42;
+ if (!OperationShardingState::isOperationVersioned(opCtx) || !isFCV42) {
+ for (auto&& reqPath : *requiredPaths) {
+ immutablePaths->insert(reqPath);
+ }
+ }
+ // The _id field is always immutable to user requests, even if the shard key is mutable.
+ immutablePaths->keepShortest(&idFieldRef);
+ }
+}
+} // namespace
+
+UpsertStage::UpsertStage(OperationContext* opCtx,
+ const UpdateStageParams& params,
+ WorkingSet* ws,
+ Collection* collection,
+ PlanStage* child)
+ : UpdateStage(opCtx, params, ws, collection) {
+ // We should never create this stage for a non-upsert request.
+ invariant(_params.request->isUpsert());
+ _children.emplace_back(child);
+};
+
+// We're done when updating is finished and we have either matched or inserted.
+bool UpsertStage::isEOF() {
+ return UpdateStage::isEOF() && (_specificStats.nMatched > 0 || _specificStats.inserted);
+}
+
+PlanStage::StageState UpsertStage::doWork(WorkingSetID* out) {
+ if (isEOF()) {
+ return StageState::IS_EOF;
+ }
+
+ // First, attempt to perform the update on a matching document.
+ auto updateState = UpdateStage::doWork(out);
+
+ // If the update returned anything other than EOF, just forward it along. There's a chance we
+ // still may find a document to update and will not have to insert anything. If it did return
+ // EOF and we do not need to insert a new document, return EOF immediately here.
+ if (updateState != PlanStage::IS_EOF || isEOF()) {
+ return updateState;
+ }
+
+ // If the update resulted in EOF without matching anything, we must insert a new document.
+ invariant(updateState == PlanStage::IS_EOF && !isEOF());
+
+ // Since this is an insert, we will be logging it as such in the oplog. We don't need the
+ // driver's help to build the oplog record. We also set the 'inserted' stats flag here.
+ _params.driver->setLogOp(false);
+ _specificStats.inserted = true;
+
+ // Determine whether this is a user-initiated or internal request.
+ const bool isInternalRequest =
+ !getOpCtx()->writesAreReplicated() || _params.request->isFromMigration();
+
+ // Generate the new document to be inserted.
+ _specificStats.objInserted = _produceNewDocumentForInsert(isInternalRequest);
+
+ // If this is an explain, skip performing the actual insert.
+ if (!_params.request->isExplain()) {
+ _performInsert(_specificStats.objInserted);
+ }
+
+ // We should always be EOF at this point.
+ invariant(isEOF());
+
+ // If we want to return the document we just inserted, create it as a WorkingSetMember.
+ if (_params.request->shouldReturnNewDocs()) {
+ BSONObj newObj = _specificStats.objInserted;
+ *out = _ws->allocate();
+ WorkingSetMember* member = _ws->get(*out);
+ member->obj =
+ Snapshotted<BSONObj>(getOpCtx()->recoveryUnit()->getSnapshotId(), newObj.getOwned());
+ member->transitionToOwnedObj();
+ return PlanStage::ADVANCED;
+ }
+
+ // If we don't need to return the inserted document, we're done.
+ return PlanStage::IS_EOF;
+}
+
+void UpsertStage::_performInsert(BSONObj newDocument) {
+ // If in FCV 4.2 and this collection is sharded, check if the doc we plan to insert belongs to
+ // this shard. MongoS uses the query field to target a shard, and it is possible the shard key
+ // fields in the 'q' field belong to this shard, but those in the 'u' field do not. In this case
+ // we need to throw so that MongoS can target the insert to the correct shard.
+ if (_shouldCheckForShardKeyUpdate) {
+ auto* const css = CollectionShardingState::get(getOpCtx(), collection()->ns());
+ const auto& metadata = css->getCurrentMetadata();
+
+ const auto isFCV42 = serverGlobalParams.featureCompatibility.isVersionInitialized() &&
+ serverGlobalParams.featureCompatibility.getVersion() ==
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42;
+
+ if (isFCV42 && metadata->isSharded()) {
+ const ShardKeyPattern shardKeyPattern(metadata->getKeyPattern());
+ auto newShardKey = shardKeyPattern.extractShardKeyFromDoc(newDocument);
+
+ if (!metadata->keyBelongsToMe(newShardKey)) {
+ // An attempt to upsert a document with a shard key value that belongs on another
+ // shard must either be a retryable write or inside a transaction.
+ uassert(ErrorCodes::IllegalOperation,
+ "The upsert document could not be inserted onto the shard targeted by the "
+ "query, since its shard key belongs on a different shard. Cross-shard "
+ "upserts are only allowed when running in a transaction or with "
+ "retryWrites: true.",
+ getOpCtx()->getTxnNumber());
+ uasserted(WouldChangeOwningShardInfo(
+ _params.request->getQuery(), newDocument, true /* upsert */),
+ "The document we are inserting belongs on a different shard");
+ }
+ }
+ }
+
+ if (MONGO_unlikely(hangBeforeUpsertPerformsInsert.shouldFail())) {
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &hangBeforeUpsertPerformsInsert, getOpCtx(), "hangBeforeUpsertPerformsInsert");
+ }
+
+ writeConflictRetry(getOpCtx(), "upsert", collection()->ns().ns(), [&] {
+ WriteUnitOfWork wunit(getOpCtx());
+ uassertStatusOK(
+ collection()->insertDocument(getOpCtx(),
+ InsertStatement(_params.request->getStmtId(), newDocument),
+ _params.opDebug,
+ _params.request->isFromMigration()));
+
+ // Technically, we should save/restore state here, but since we are going to return
+ // immediately after, it would just be wasted work.
+ wunit.commit();
+ });
+}
+
+BSONObj UpsertStage::_produceNewDocumentForInsert(bool isInternalRequest) {
+ // Obtain the sharding metadata. This will be needed to compute the required paths. The metadata
+ // must remain in scope since it owns the pointers used by 'requiredPaths' and 'immutablePaths'.
+ auto* css = CollectionShardingState::get(getOpCtx(), _params.request->getNamespaceString());
+ auto metadata = css->getCurrentMetadata();
+
+ // Each document has a set of required paths and a potentially different set of immutable paths.
+ FieldRefSet requiredPaths, immutablePaths;
+ getRequiredAndImmutablePaths(
+ getOpCtx(), metadata, isInternalRequest, &requiredPaths, &immutablePaths);
+
+ // Reset the document into which we will be writing.
+ _doc.reset();
+
+ // First: populate the document's required paths with equality predicate values from the query,
+ // if available. This generates the pre-image document that we will run the update against.
+ if (auto* cq = _params.canonicalQuery) {
+ uassertStatusOK(_params.driver->populateDocumentWithQueryFields(*cq, requiredPaths, _doc));
+ } else {
+ fassert(17354, CanonicalQuery::isSimpleIdQuery(_params.request->getQuery()));
+ fassert(17352, _doc.root().appendElement(_params.request->getQuery()[idFieldName]));
+ }
+
+ // Second: run the appropriate document generation strategy over the document to generate the
+ // post-image. If the update operation modifies any of the immutable paths, this will throw.
+ if (_params.request->shouldUpsertSuppliedDocument()) {
+ _generateNewDocumentFromSuppliedDoc(immutablePaths);
+ } else {
+ _generateNewDocumentFromUpdateOp(immutablePaths);
+ }
+
+ // Third: ensure _id is first if it exists, and generate a new OID otherwise.
+ _ensureIdFieldIsFirst(&_doc, true);
+
+ // Fourth: assert that the finished document has all required fields and is valid for storage.
+ _assertDocumentToBeInsertedIsValid(
+ _doc, requiredPaths, isInternalRequest, _enforceOkForStorage);
+
+ // Fifth: validate that the newly-produced document does not exceed the maximum BSON user size.
+ auto newDocument = _doc.getObject();
+ uassert(17420,
+ str::stream() << "Document to upsert is larger than " << BSONObjMaxUserSize,
+ newDocument.objsize() <= BSONObjMaxUserSize);
+
+ return newDocument;
+}
+
+void UpsertStage::_generateNewDocumentFromUpdateOp(const FieldRefSet& immutablePaths) {
+ // Use the UpdateModification from the original request to generate a new document by running
+ // the update over the empty (except for fields extracted from the query) document. We do not
+ // validate for storage until later, but we do ensure that no immutable fields are modified.
+ const bool validateForStorage = false;
+ const bool isInsert = true;
+ uassertStatusOK(
+ _params.driver->update({}, &_doc, validateForStorage, immutablePaths, isInsert));
+};
+
+void UpsertStage::_generateNewDocumentFromSuppliedDoc(const FieldRefSet& immutablePaths) {
+ // We should never call this method unless the request has a set of update constants.
+ invariant(_params.request->shouldUpsertSuppliedDocument());
+ invariant(_params.request->getUpdateConstants());
+
+ // Extract the supplied document from the constants and validate that it is an object.
+ auto suppliedDocElt = _params.request->getUpdateConstants()->getField("new"_sd);
+ invariant(suppliedDocElt.type() == BSONType::Object);
+ auto suppliedDoc = suppliedDocElt.embeddedObject();
+
+ // The supplied doc is functionally a replacement update. We need a new driver to apply it.
+ UpdateDriver replacementDriver(nullptr);
+
+ // Create a new replacement-style update from the supplied document.
+ replacementDriver.parse({suppliedDoc}, {});
+ replacementDriver.setLogOp(false);
+
+ // We do not validate for storage, as we will validate the full document before inserting.
+ // However, we ensure that no immutable fields are modified.
+ const bool validateForStorage = false;
+ const bool isInsert = true;
+ uassertStatusOK(
+ replacementDriver.update({}, &_doc, validateForStorage, immutablePaths, isInsert));
+}
+
+void UpsertStage::_assertDocumentToBeInsertedIsValid(const mb::Document& document,
+ const FieldRefSet& requiredPaths,
+ bool isInternalRequest,
+ bool enforceOkForStorage) {
+ // For a non-internal operation, we assert that the document contains all required paths, that
+ // no shard key fields have arrays at any point along their paths, and that the document is
+ // valid for storage. Skip all such checks for an internal operation.
+ if (!isInternalRequest) {
+ if (enforceOkForStorage) {
+ storage_validation::storageValid(document);
+ }
+ _assertRequiredPathsPresent(document, requiredPaths);
+ }
+}
+} // namespace mongo
diff --git a/src/mongo/db/exec/upsert_stage.h b/src/mongo/db/exec/upsert_stage.h
new file mode 100644
index 00000000000..fe6572f9d16
--- /dev/null
+++ b/src/mongo/db/exec/upsert_stage.h
@@ -0,0 +1,76 @@
+/**
+ * Copyright (C) 2019 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/exec/update_stage.h"
+
+namespace mongo {
+
+/**
+ * Execution stage for update requests with {upsert:true}. This is a specialized UpdateStage which,
+ * in the event that no documents match the update request's query, generates and inserts a new
+ * document into the collection. All logic related to the insertion phase is implemented by this
+ * class.
+ *
+ * If the prior or newly-updated version of the document was requested to be returned, then ADVANCED
+ * is returned after updating or inserting a document. Otherwise, NEED_TIME is returned after
+ * updating a document if further updates are pending, and IS_EOF is returned if all updates have
+ * been performed or if a document has been inserted.
+ *
+ * Callers of doWork() must be holding a write lock.
+ */
+class UpsertStage final : public UpdateStage {
+ UpsertStage(const UpsertStage&) = delete;
+ UpsertStage& operator=(const UpsertStage&) = delete;
+
+public:
+ UpsertStage(OperationContext* opCtx,
+ const UpdateStageParams& params,
+ WorkingSet* ws,
+ Collection* collection,
+ PlanStage* child);
+
+ bool isEOF() final;
+ StageState doWork(WorkingSetID* out) final;
+
+private:
+ BSONObj _produceNewDocumentForInsert(bool isInternalRequest);
+ void _performInsert(BSONObj newDocument);
+
+ void _generateNewDocumentFromSuppliedDoc(const FieldRefSet& immutablePaths);
+ void _generateNewDocumentFromUpdateOp(const FieldRefSet& immutablePaths);
+
+ void _assertDocumentToBeInsertedIsValid(const mutablebson::Document& document,
+ const FieldRefSet& requiredPaths,
+ bool isInternalRequest,
+ bool enforceOkForStorage);
+};
+
+} // namespace mongo