Diffstat (limited to 'src')
22 files changed, 243 insertions, 61 deletions
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index de92a6e94d6..834a2034033 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -93,6 +93,7 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeChildRemoveOpFinishes);
 MONGO_FAIL_POINT_DEFINE(hangBeforeChildRemoveOpIsPopped);
 MONGO_FAIL_POINT_DEFINE(hangAfterAllChildRemoveOpsArePopped);
 MONGO_FAIL_POINT_DEFINE(hangDuringBatchInsert);
+MONGO_FAIL_POINT_DEFINE(hangDuringBatchUpdate);
 
 void updateRetryStats(OperationContext* opCtx, bool containsRetry) {
     if (containsRetry) {
@@ -596,6 +597,12 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
     boost::optional<AutoGetCollection> collection;
     while (true) {
+        if (MONGO_FAIL_POINT(hangDuringBatchUpdate)) {
+            log() << "batch update - hangDuringBatchUpdate fail point enabled. Blocking until "
+                     "fail point is disabled.";
+            MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangDuringBatchUpdate);
+        }
+
         if (MONGO_FAIL_POINT(failAllUpdates)) {
             uasserted(ErrorCodes::InternalError, "failAllUpdates failpoint active!");
         }
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 0717e281ce1..193fab16ab3 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -282,6 +282,7 @@ env.Library(
     LIBDEPS=[
         '$BUILD_DIR/mongo/db/auth/auth',
         '$BUILD_DIR/mongo/db/generic_cursor',
+        '$BUILD_DIR/mongo/s/sharding_router_api',
         'field_path',
     ]
 )
@@ -319,7 +320,6 @@ env.Library(
     ],
     LIBDEPS=[
         '$BUILD_DIR/mongo/s/query/async_results_merger',
-        '$BUILD_DIR/mongo/s/sharding_router_api',
         'mongo_process_common',
     ]
 )
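A fail point like the new hangDuringBatchUpdate is normally flipped on while a concurrent update runs and then turned off to release it. A minimal sketch of how a C++ test might toggle it (hypothetical helper, not part of this patch; assumes the 4.1-era fail point registry API from "mongo/util/fail_point_service.h"):

    #include "mongo/util/fail_point_service.h"

    namespace mongo {
    // Hypothetical helper (not part of the patch): turns the new fail point on or off so that
    // performSingleUpdateOp() blocks at the top of its retry loop until the point is disabled.
    inline void toggleHangDuringBatchUpdate(bool enable) {
        auto failPoint = getGlobalFailPointRegistry()->getFailPoint("hangDuringBatchUpdate");
        failPoint->setMode(enable ? FailPoint::alwaysOn : FailPoint::off);
    }
    }  // namespace mongo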
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index be19fbd681e..7012af386af 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -31,6 +31,7 @@
 #include "mongo/platform/basic.h"
 
 #include "mongo/db/ops/write_ops.h"
+#include "mongo/db/pipeline/document_path_support.h"
 #include "mongo/db/pipeline/document_source_out.h"
 #include "mongo/db/pipeline/document_source_out_gen.h"
 #include "mongo/db/pipeline/document_source_out_in_place.h"
@@ -238,7 +239,9 @@ intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create(
     NamespaceString outputNs,
     const intrusive_ptr<ExpressionContext>& expCtx,
     WriteModeEnum mode,
-    std::set<FieldPath> uniqueKey) {
+    std::set<FieldPath> uniqueKey,
+    boost::optional<OID> targetEpoch) {
+    // TODO (SERVER-36832): Allow this combination.
     uassert(
         50939,
@@ -272,13 +275,13 @@ intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create(
     switch (mode) {
         case WriteModeEnum::kModeReplaceCollection:
             return new DocumentSourceOutReplaceColl(
-                std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+                std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
         case WriteModeEnum::kModeInsertDocuments:
             return new DocumentSourceOutInPlace(
-                std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+                std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
         case WriteModeEnum::kModeReplaceDocuments:
             return new DocumentSourceOutInPlaceReplace(
-                std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+                std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
         default:
             MONGO_UNREACHABLE;
     }
@@ -287,11 +290,13 @@ intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create(
 DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs,
                                      const intrusive_ptr<ExpressionContext>& expCtx,
                                      WriteModeEnum mode,
-                                     std::set<FieldPath> uniqueKey)
+                                     std::set<FieldPath> uniqueKey,
+                                     boost::optional<OID> targetEpoch)
     : DocumentSource(expCtx),
       _writeConcern(expCtx->opCtx->getWriteConcern()),
-      _done(false),
       _outputNs(std::move(outputNs)),
+      _targetEpoch(targetEpoch),
+      _done(false),
       _mode(mode),
       _uniqueKeyFields(std::move(uniqueKey)),
       _uniqueKeyIncludesId(_uniqueKeyFields.count("_id") == 1) {}
@@ -302,6 +307,7 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
     auto mode = WriteModeEnum::kModeReplaceCollection;
     std::set<FieldPath> uniqueKey;
     NamespaceString outputNs;
+    boost::optional<OID> targetEpoch;
     if (elem.type() == BSONType::String) {
         outputNs = NamespaceString(expCtx->ns.db().toString() + '.' + elem.str());
         uniqueKey.emplace("_id");
@@ -310,6 +316,10 @@
             DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), elem.embeddedObject());
         mode = spec.getMode();
+        targetEpoch = spec.getTargetEpoch();
+        uassert(50984,
+                "$out received unexpected 'targetEpoch' on mongos",
+                !(expCtx->inMongos && bool(targetEpoch)));
 
         // Retrieve the target database from the user command, otherwise use the namespace from the
         // expression context.
@@ -320,26 +330,40 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
         }
 
         // Convert unique key object to a vector of FieldPaths.
-        std::vector<FieldPath> docKeyPaths = std::get<0>(
-            expCtx->mongoProcessInterface->collectDocumentKeyFields(expCtx->opCtx, outputNs));
-        std::set<FieldPath> docKeyPathsSet =
-            std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()),
-                                std::make_move_iterator(docKeyPaths.end()));
         if (auto userSpecifiedUniqueKey = spec.getUniqueKey()) {
             uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get());
-            // Skip the unique index check if the provided uniqueKey is the documentKey.
-            const bool isDocumentKey = (uniqueKey == docKeyPathsSet);
-
             // Make sure the uniqueKey has a supporting index. Skip this check if the command is
             // sent from mongos since the uniqueKey check would've happened already.
             uassert(50938,
-                    "Cannot find index to verify that $out's unique key will be unique",
-                    expCtx->fromMongos || isDocumentKey ||
+                    str::stream()
+                        << "Cannot find index to verify that $out's unique key will be unique: "
+                        << userSpecifiedUniqueKey,
+                    expCtx->fromMongos ||
                         expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex(
                             expCtx, outputNs, uniqueKey));
         } else {
-            uniqueKey = std::move(docKeyPathsSet);
+            if (expCtx->inMongos && mode != WriteModeEnum::kModeReplaceCollection) {
+                // In case there are multiple shards which will perform this $out in parallel, we
+                // need to figure out and attach the collection's epoch to ensure each shard is
+                // talking about the same version of the collection. This mongos will coordinate
+                // that. We force a catalog refresh to do so because there is no shard versioning
+                // protocol on this namespace. We will also figure out and attach the uniqueKey to
+                // send to the shards. We don't need to do this for 'replaceCollection' mode since
+                // that mode cannot currently target a sharded collection.
+
+                // There are cases where the aggregation could fail if the collection is dropped or
+                // re-created during or near the time of the aggregation. This is okay - we are
+                // mostly paranoid that this mongos is very stale and want to prevent returning an
+                // error if the collection was dropped a long time ago. Because of this, we are okay
+                // with piggy-backing off another thread's request to refresh the cache, simply
+                // waiting for that request to return instead of forcing another refresh.
+                targetEpoch = expCtx->mongoProcessInterface->refreshAndGetEpoch(expCtx, outputNs);
+            }
+            std::vector<FieldPath> docKeyPaths = std::get<0>(
+                expCtx->mongoProcessInterface->collectDocumentKeyFields(expCtx->opCtx, outputNs));
+            uniqueKey = std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()),
+                                            std::make_move_iterator(docKeyPaths.end()));
         }
     } else {
         uasserted(16990,
@@ -347,20 +371,23 @@
                   << typeName(elem.type()));
     }
 
-    return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+    return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
 }
 
 Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
-    MutableDocument serialized(
-        Document{{DocumentSourceOutSpec::kTargetCollectionFieldName, _outputNs.coll()},
-                 {DocumentSourceOutSpec::kTargetDbFieldName, _outputNs.db()},
-                 {DocumentSourceOutSpec::kModeFieldName, WriteMode_serializer(_mode)}});
-    BSONObjBuilder uniqueKeyBob;
-    for (auto path : _uniqueKeyFields) {
-        uniqueKeyBob.append(path.fullPath(), 1);
-    }
-    serialized[DocumentSourceOutSpec::kUniqueKeyFieldName] = Value(uniqueKeyBob.done());
-    return Value(Document{{getSourceName(), serialized.freeze()}});
+    DocumentSourceOutSpec spec;
+    spec.setTargetDb(_outputNs.db());
+    spec.setTargetCollection(_outputNs.coll());
+    spec.setMode(_mode);
+    spec.setUniqueKey([&]() {
+        BSONObjBuilder uniqueKeyBob;
+        for (auto path : _uniqueKeyFields) {
+            uniqueKeyBob.append(path.fullPath(), 1);
+        }
+        return uniqueKeyBob.obj();
+    }());
+    spec.setTargetEpoch(_targetEpoch);
+    return Value(Document{{getSourceName(), spec.toBSON()}});
 }
 
 DepsTracker::State DocumentSourceOut::getDependencies(DepsTracker* deps) const {
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 62deff0291a..c47a9a52ecc 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -72,7 +72,8 @@ public:
     DocumentSourceOut(NamespaceString outputNs,
                       const boost::intrusive_ptr<ExpressionContext>& expCtx,
                       WriteModeEnum mode,
-                      std::set<FieldPath> uniqueKey);
+                      std::set<FieldPath> uniqueKey,
+                      boost::optional<OID> targetEpoch);
 
     virtual ~DocumentSourceOut() = default;
 
@@ -167,7 +168,7 @@ public:
      */
     virtual void spill(BatchedObjects&& batch) {
         pExpCtx->mongoProcessInterface->insert(
-            pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern);
+            pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern, _targetEpoch);
     };
 
     /**
@@ -182,7 +183,8 @@ public:
         NamespaceString outputNs,
         const boost::intrusive_ptr<ExpressionContext>& expCtx,
         WriteModeEnum,
-        std::set<FieldPath> uniqueKey = std::set<FieldPath>{"_id"});
+        std::set<FieldPath> uniqueKey = std::set<FieldPath>{"_id"},
+        boost::optional<OID> targetEpoch = boost::none);
 
     /**
      * Parses a $out stage from the user-supplied BSON.
@@ -198,11 +200,13 @@ protected:
     // respect the writeConcern of the original command.
     WriteConcernOptions _writeConcern;
 
+    const NamespaceString _outputNs;
+    boost::optional<OID> _targetEpoch;
+
 private:
     bool _initialized = false;
     bool _done = false;
 
-    const NamespaceString _outputNs;
     WriteModeEnum _mode;
 
     // Holds the unique key used for uniquely identifying documents. There must exist a unique index
diff --git a/src/mongo/db/pipeline/document_source_out.idl b/src/mongo/db/pipeline/document_source_out.idl
index 7ae0b5acd9f..006dfe665ab 100644
--- a/src/mongo/db/pipeline/document_source_out.idl
+++ b/src/mongo/db/pipeline/document_source_out.idl
@@ -70,3 +70,13 @@ structs:
                 type: object
                 optional: true
                 description: Document of fields representing the unique key.
+
+            epoch:
+                cpp_name: targetEpoch
+                type: objectid
+                optional: true
+                description: If set, the epoch found when parsed on mongos. Can be used to check if
+                             a collection has since been dropped and re-created, in which case the
+                             shard key may have changed. As of this writing, this also can be used
+                             to detect if the collection has gone from unsharded to sharded, and
+                             thus now has a shard key.
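For reference, the document form of $out that this IDL describes (and that mongos now forwards to the shards) would look roughly as follows once the epoch is attached. This is a minimal sketch: the 'to', 'mode', 'uniqueKey', and 'epoch' field names come from the parser test later in this patch, while the 'db' field name and the mode string are assumptions based on the IDL's targetDb/mode fields.

    #include "mongo/db/jsobj.h"

    namespace mongo {
    // Illustrative only: builds the BSON shape that DocumentSourceOut::createFromBson() would
    // parse on a shard once mongos has attached a target epoch. Values here are made up.
    inline BSONObj makeOutSpecWithEpoch() {
        return BSON("$out" << BSON("to"
                                   << "target_coll"
                                   << "db"  // assumed wire name for the IDL's targetDb field
                                   << "test"
                                   << "mode"
                                   << "insertDocuments"
                                   << "uniqueKey"
                                   << BSON("_id" << 1)
                                   << "epoch"
                                   << OID::gen()));  // in practice comes from refreshAndGetEpoch()
    }
    }  // namespace mongo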
diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h
index 791d0985657..993378466d1 100644
--- a/src/mongo/db/pipeline/document_source_out_in_place.h
+++ b/src/mongo/db/pipeline/document_source_out_in_place.h
@@ -43,7 +43,7 @@ public:
     using DocumentSourceOut::DocumentSourceOut;
 
     const NamespaceString& getWriteNs() const final {
-        return getOutputNs();
+        return _outputNs;
     };
 
     /**
@@ -75,7 +75,8 @@ public:
                 std::move(batch.objects),
                 _writeConcern,
                 upsert,
-                multi);
+                multi,
+                _targetEpoch);
         } catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
             uassertStatusOKWithContext(ex.toStatus(),
                                        "$out failed to update the matching document, did you "
diff --git a/src/mongo/db/pipeline/document_source_out_test.cpp b/src/mongo/db/pipeline/document_source_out_test.cpp
index 1b040ca236b..01839641dfb 100644
--- a/src/mongo/db/pipeline/document_source_out_test.cpp
+++ b/src/mongo/db/pipeline/document_source_out_test.cpp
@@ -339,6 +339,23 @@ TEST_F(DocumentSourceOutTest, FailsToParseIfUniqueKeyHasDuplicateFields) {
     ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue);
 }
 
+TEST_F(DocumentSourceOutTest, FailsToParseIfTargetEpochIsSpecifiedOnMongos) {
+    BSONObj spec = BSON("$out" << BSON("to"
+                                       << "test"
+                                       << "mode"
+                                       << kDefaultMode
+                                       << "uniqueKey"
+                                       << BSON("_id" << 1)
+                                       << "epoch"
+                                       << OID::gen()));
+    getExpCtx()->inMongos = true;
+    ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50984);
+
+    // Test that 'targetEpoch' is accepted if not in mongos.
+    getExpCtx()->inMongos = false;
+    ASSERT(createOutStage(spec) != nullptr);
+}
+
 TEST_F(DocumentSourceOutTest, CorrectlyUsesTargetDbThatMatchesAggregationDb) {
     const auto targetDbSameAsAggregationDb = getExpCtx()->ns.db();
     const auto targetColl = "test"_sd;
@@ -350,7 +367,7 @@ TEST_F(DocumentSourceOutTest, CorrectlyUsesTargetDbThatMatchesAggregationDb) {
     ASSERT_EQ(outStage->getOutputNs().coll(), targetColl);
 }
 
-// TODO (SERVER-50939): Allow "replaceCollection" to a foreign database.
+// TODO (SERVER-36832): Allow "replaceCollection" to a foreign database.
 TEST_F(DocumentSourceOutTest, CorrectlyUsesForeignTargetDb) {
     const auto foreignDb = "someOtherDb"_sd;
     const auto targetColl = "test"_sd;
diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/mongo_process_common.cpp
index 9609a4909b5..390900ce3af 100644
--- a/src/mongo/db/pipeline/mongo_process_common.cpp
+++ b/src/mongo/db/pipeline/mongo_process_common.cpp
@@ -39,6 +39,8 @@
 #include "mongo/db/operation_context.h"
 #include "mongo/db/pipeline/expression_context.h"
 #include "mongo/db/service_context.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/grid.h"
 #include "mongo/util/net/socket_utils.h"
 
 namespace mongo {
@@ -135,4 +137,16 @@ bool MongoProcessCommon::keyPatternNamesExactPaths(const BSONObj& keyPattern,
     return nFieldsMatched == uniqueKeyPaths.size();
 }
 
+boost::optional<OID> MongoProcessCommon::refreshAndGetEpoch(
+    const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss) const {
+    const bool forceRefreshFromThisThread = false;
+    auto routingInfo = uassertStatusOK(
+        Grid::get(expCtx->opCtx)
+            ->catalogCache()
+            ->getCollectionRoutingInfoWithRefresh(expCtx->opCtx, nss, forceRefreshFromThisThread));
+    if (auto chunkManager = routingInfo.cm()) {
+        return chunkManager->getVersion().epoch();
+    }
+    return boost::none;
+}
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h
index 9aa6149120d..53ce182e93a 100644
--- a/src/mongo/db/pipeline/mongo_process_common.h
+++ b/src/mongo/db/pipeline/mongo_process_common.h
@@ -60,6 +60,10 @@ public:
                                CurrentOpTruncateMode truncateMode,
                                CurrentOpCursorMode cursorMode) const final;
 
+    virtual boost::optional<OID> refreshAndGetEpoch(
+        const boost::intrusive_ptr<ExpressionContext>& expCtx,
+        const NamespaceString& nss) const final;
+
 protected:
     /**
      * Returns a BSONObj representing a report of the operation which is currently being
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index da96b66c4b0..033d9658c51 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -108,16 +108,20 @@ public:
     virtual bool isSharded(OperationContext* opCtx, const NamespaceString& ns) = 0;
 
     /**
-     * Inserts 'objs' into 'ns' and throws a UserException if the insert fails.
+     * Inserts 'objs' into 'ns' and throws a UserException if the insert fails. If 'targetEpoch' is
+     * set, throws ErrorCodes::StaleEpoch if the targeted collection does not have the same epoch or
+     * the epoch changes during the course of the insert.
      */
     virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                         const NamespaceString& ns,
                         std::vector<BSONObj>&& objs,
-                        const WriteConcernOptions& wc) = 0;
+                        const WriteConcernOptions& wc,
+                        boost::optional<OID> targetEpoch) = 0;
 
     /**
      * Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException
-     * if any of the updates fail.
+     * if any of the updates fail. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the
+     * targeted collection does not have the same epoch, or if the epoch changes during the update.
      */
     virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                         const NamespaceString& ns,
@@ -125,7 +129,8 @@ public:
                         std::vector<BSONObj>&& updates,
                         const WriteConcernOptions& wc,
                         bool upsert,
-                        bool multi) = 0;
+                        bool multi,
+                        boost::optional<OID> targetEpoch) = 0;
 
     virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
                                                   const NamespaceString& ns) = 0;
@@ -269,6 +274,16 @@ public:
     virtual bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                              const NamespaceString& nss,
                                              const std::set<FieldPath>& uniqueKeyPaths) const = 0;
+
+    /**
+     * Refreshes the CatalogCache entry for the namespace 'nss', and returns the epoch associated
+     * with that namespace, if any. Note that this refresh will not necessarily force a new
+     * request to be sent to the config servers. If another thread has already requested a refresh,
+     * it will instead wait for that response.
+     */
+    virtual boost::optional<OID> refreshAndGetEpoch(
+        const boost::intrusive_ptr<ExpressionContext>& expCtx,
+        const NamespaceString& nss) const = 0;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 569b968c94f..75d69161eb0 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -104,7 +104,9 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                            ? index.getObjectField(IndexDescriptor::kCollationFieldName)
                            : CollationSpec::kSimpleSpec));
-    return index.getBoolField(IndexDescriptor::kUniqueFieldName) &&
+    // SERVER-5335: The _id index does not report to be unique, but in fact is unique.
+    auto isIdIndex = index[IndexDescriptor::kIndexNameFieldName].String() == "_id_";
+    return (isIdIndex || index.getBoolField(IndexDescriptor::kUniqueFieldName)) &&
         !index.hasField(IndexDescriptor::kPartialFilterExprFieldName) &&
         MongoProcessCommon::keyPatternNamesExactPaths(
             index.getObjectField(IndexDescriptor::kKeyPatternFieldName), uniqueKeyPaths) &&
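The SERVER-5335 carve-out above is needed because the _id index spec returned by listIndexes does not carry a 'unique' field even though the index is in fact unique. A minimal sketch of the distinction (illustrative spec and hypothetical helper name, not part of this patch):

    #include "mongo/db/jsobj.h"

    namespace mongo {
    // Illustrative only: a typical _id index spec as reported by listIndexes. It has no
    // "unique" field, which is why supportsUniqueKey() must special-case the _id index.
    inline bool idIndexWouldPassUniquenessCheck() {
        BSONObj idIndexSpec = BSON("v" << 2 << "key" << BSON("_id" << 1) << "name"
                                       << "_id_");
        const bool reportsUnique = idIndexSpec.getBoolField("unique");  // false: field is absent
        const bool isIdIndex = idIndexSpec["name"].String() == "_id_";  // true
        return isIdIndex || reportsUnique;
    }
    }  // namespace mongo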
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index ee900586ba4..16a7005d262 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -66,7 +66,8 @@ public:
     void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                 const NamespaceString& ns,
                 std::vector<BSONObj>&& objs,
-                const WriteConcernOptions& wc) final {
+                const WriteConcernOptions& wc,
+                boost::optional<OID>) final {
         MONGO_UNREACHABLE;
     }
 
@@ -76,7 +77,8 @@ public:
                 std::vector<BSONObj>&& updates,
                 const WriteConcernOptions& wc,
                 bool upsert,
-                bool multi) final {
+                bool multi,
+                boost::optional<OID>) final {
         MONGO_UNREACHABLE;
     }
 
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index c6ccdb2d7ad..b9d5fffe02b 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -134,7 +134,8 @@ std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocume
 void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                        const NamespaceString& ns,
                                        std::vector<BSONObj>&& objs,
-                                       const WriteConcernOptions& wc) {
+                                       const WriteConcernOptions& wc,
+                                       boost::optional<OID> targetEpoch) {
     BatchedCommandResponse response;
     BatchWriteExecStats stats;
 
@@ -144,7 +145,7 @@ void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionCont
     // If applicable, attach a write concern to the batched command request.
     attachWriteConcern(&insertCommand, wc);
 
-    ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response);
+    ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response, targetEpoch);
 
     // TODO SERVER-35403: Add more context for which shard produced the error.
     uassertStatusOKWithContext(response.toStatus(), "Insert failed: ");
@@ -156,7 +157,8 @@ void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionCont
                                        std::vector<BSONObj>&& updates,
                                        const WriteConcernOptions& wc,
                                        bool upsert,
-                                       bool multi) {
+                                       bool multi,
+                                       boost::optional<OID> targetEpoch) {
     BatchedCommandResponse response;
     BatchWriteExecStats stats;
 
@@ -170,7 +172,7 @@ void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionCont
     // If applicable, attach a write concern to the batched command request.
     attachWriteConcern(&updateCommand, wc);
 
-    ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response);
+    ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response, targetEpoch);
 
     // TODO SERVER-35403: Add more context for which shard produced the error.
     uassertStatusOKWithContext(response.toStatus(), "Update failed: ");
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index 39d550fc083..ddafac24cdb 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -53,7 +53,8 @@ public:
     void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                 const NamespaceString& ns,
                 std::vector<BSONObj>&& objs,
-                const WriteConcernOptions& wc) final;
+                const WriteConcernOptions& wc,
+                boost::optional<OID> targetEpoch) final;
 
     /**
      * Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking,
@@ -65,7 +66,8 @@ public:
                 std::vector<BSONObj>&& updates,
                 const WriteConcernOptions& wc,
                 bool upsert,
-                bool multi) final;
+                bool multi,
+                boost::optional<OID> targetEpoch) final;
 };
 
 }  // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 8e874498b15..8f7113042d9 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -156,7 +156,8 @@ Update MongoInterfaceStandalone::buildUpdateOp(const NamespaceString& nss,
 void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                                       const NamespaceString& ns,
                                       std::vector<BSONObj>&& objs,
-                                      const WriteConcernOptions& wc) {
+                                      const WriteConcernOptions& wc,
+                                      boost::optional<OID> targetEpoch) {
     auto writeResults = performInserts(
         expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
 
@@ -179,7 +180,8 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte
                                       std::vector<BSONObj>&& updates,
                                       const WriteConcernOptions& wc,
                                       bool upsert,
-                                      bool multi) {
+                                      bool multi,
+                                      boost::optional<OID> targetEpoch) {
     auto writeResults =
         performUpdates(expCtx->opCtx,
                        buildUpdateOp(ns,
                                      std::move(queries),
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index de64e9b0578..7ed45fe14f1 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -59,14 +59,16 @@ public:
     void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                 const NamespaceString& ns,
                 std::vector<BSONObj>&& objs,
-                const WriteConcernOptions& wc) override;
+                const WriteConcernOptions& wc,
+                boost::optional<OID> targetEpoch) override;
     void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                 const NamespaceString& ns,
                 std::vector<BSONObj>&& queries,
                 std::vector<BSONObj>&& updates,
                 const WriteConcernOptions& wc,
                 bool upsert,
-                bool multi) override;
+                bool multi,
+                boost::optional<OID> targetEpoch) override;
 
     CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final;
     void appendLatencyStats(OperationContext* opCtx,
                             const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index e822be1e7b1..d4f3a6dacb6 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -62,7 +62,8 @@ public:
     void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
                 const NamespaceString& ns,
                 std::vector<BSONObj>&& objs,
-                const WriteConcernOptions& wc) override {
+                const WriteConcernOptions& wc,
+                boost::optional<OID>) override {
         MONGO_UNREACHABLE;
     }
 
@@ -72,7 +73,8 @@ public:
                 std::vector<BSONObj>&& updates,
                 const WriteConcernOptions& wc,
                 bool upsert,
-                bool multi) final {
+                bool multi,
+                boost::optional<OID>) final {
         MONGO_UNREACHABLE;
    }
 
@@ -177,5 +179,10 @@ public:
                                       const std::set<FieldPath>& uniqueKeyPaths) const override {
         return true;
     }
+
+    boost::optional<OID> refreshAndGetEpoch(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+                                            const NamespaceString& nss) const override {
+        return boost::none;
+    }
 };
 
 }  // namespace mongo
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 9f6fe308408..c76b915090f 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -575,6 +575,45 @@ TEST_F(BatchWriteExecTest, NonRetryableErrorTxnNumber) {
     future.timed_get(kFutureTimeout);
 }
 
+TEST_F(BatchWriteExecTest, StaleEpochIsNotRetryable) {
+    // A StaleEpoch error is not retried.
+
+    BatchedCommandRequest request([&] {
+        write_ops::Insert insertOp(nss);
+        insertOp.setWriteCommandBase([] {
+            write_ops::WriteCommandBase writeCommandBase;
+            writeCommandBase.setOrdered(true);
+            return writeCommandBase;
+        }());
+        insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)});
+        return insertOp;
+    }());
+    request.setWriteConcern(BSONObj());
+
+    operationContext()->setLogicalSessionId(makeLogicalSessionIdForTest());
+    operationContext()->setTxnNumber(5);
+
+    BatchedCommandResponse nonRetryableErrResponse;
+    nonRetryableErrResponse.setStatus({ErrorCodes::StaleEpoch, "mock stale epoch error"});
+
+    auto future = launchAsync([&] {
+        BatchedCommandResponse response;
+        BatchWriteExecStats stats;
+        BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+        ASSERT(response.getOk());
+        ASSERT_EQ(0, response.getN());
+        ASSERT(response.isErrDetailsSet());
+        ASSERT_EQUALS(response.getErrDetailsAt(0)->toStatus().code(),
+                      nonRetryableErrResponse.toStatus().code());
+        ASSERT(response.getErrDetailsAt(0)->toStatus().reason().find(
+                   nonRetryableErrResponse.toStatus().reason()) != std::string::npos);
+        ASSERT_EQ(1, stats.numRounds);
+    });
+
+    expectInsertsReturnError({BSON("x" << 1), BSON("x" << 2)}, nonRetryableErrResponse);
+
+    future.timed_get(kFutureTimeout);
+}
 
 class BatchWriteExecTransactionTest : public BatchWriteExecTest {
 public:
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index 0c33291a7fe..727f8505489 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -317,8 +317,9 @@ bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA,
 
 }  // namespace
 
-ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss)
-    : _nss(nss), _needsTargetingRefresh(false) {}
+ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss,
+                                           boost::optional<OID> targetEpoch)
+    : _nss(nss), _needsTargetingRefresh(false), _targetEpoch(targetEpoch) {}
 
 Status ChunkManagerTargeter::init(OperationContext* opCtx) {
 
@@ -334,6 +335,13 @@ Status ChunkManagerTargeter::init(OperationContext* opCtx) {
 
     _routingInfo = std::move(routingInfoStatus.getValue());
 
+    if (_targetEpoch) {
+        uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _routingInfo->cm());
+        uassert(ErrorCodes::StaleEpoch,
+                "Collection has been dropped and recreated",
+                _routingInfo->cm()->getVersion().epoch() == *_targetEpoch);
+    }
+
     return Status::OK();
 }
 
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h
index d0f65e21402..c5738c6a786 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.h
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.h
@@ -58,12 +58,19 @@ class ChunkManagerTargeter : public NSTargeter {
 public:
     enum class UpdateType { kReplacement, kOpStyle, kUnknown };
 
-    ChunkManagerTargeter(const NamespaceString& nss);
+    /**
+     * If 'targetEpoch' is not boost::none, throws a 'StaleEpoch' exception if the collection given
+     * by 'nss' is ever found to not have the target epoch.
+     */
+    ChunkManagerTargeter(const NamespaceString& nss,
+                         boost::optional<OID> targetEpoch = boost::none);
 
     /**
      * Initializes the ChunkManagerTargeter with the latest targeting information for the
     * namespace. May need to block and load information from a remote config server.
      *
+     * Throws a 'StaleEpoch' exception if the collection targeted has an epoch which does not match
+     * '_targetEpoch'
      * Returns !OK if the information could not be initialized.
      */
     Status init(OperationContext* opCtx);
 
@@ -166,6 +173,10 @@ private:
     // The latest loaded routing cache entry
     boost::optional<CachedCollectionRoutingInfo> _routingInfo;
 
+    // Set to the epoch of the namespace we are targeting. If we ever refresh the catalog cache and
+    // find a new epoch, we immediately throw a StaleEpoch exception.
+    boost::optional<OID> _targetEpoch;
+
     // Map of shard->remote shard version reported from stale errors
     ShardVersionMap _remoteShardVersions;
 };
diff --git a/src/mongo/s/write_ops/cluster_write.cpp b/src/mongo/s/write_ops/cluster_write.cpp
index be7664f3931..7ed3ac2553d 100644
--- a/src/mongo/s/write_ops/cluster_write.cpp
+++ b/src/mongo/s/write_ops/cluster_write.cpp
@@ -67,7 +67,8 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
 void ClusterWriter::write(OperationContext* opCtx,
                           const BatchedCommandRequest& request,
                           BatchWriteExecStats* stats,
-                          BatchedCommandResponse* response) {
+                          BatchedCommandResponse* response,
+                          boost::optional<OID> targetEpoch) {
     const NamespaceString& nss = request.getNS();
 
     LastError::Disabled disableLastError(&LastError::get(opCtx->getClient()));
@@ -77,7 +78,7 @@ void ClusterWriter::write(OperationContext* opCtx,
         Grid::get(opCtx)->catalogClient()->writeConfigServerDirect(opCtx, request, response);
     } else {
         {
-            ChunkManagerTargeter targeter(request.getNS());
+            ChunkManagerTargeter targeter(request.getNS(), targetEpoch);
 
             Status targetInitStatus = targeter.init(opCtx);
             if (!targetInitStatus.isOK()) {
diff --git a/src/mongo/s/write_ops/cluster_write.h b/src/mongo/s/write_ops/cluster_write.h
index 96530e55fd5..ab67644bdbf 100644
--- a/src/mongo/s/write_ops/cluster_write.h
+++ b/src/mongo/s/write_ops/cluster_write.h
@@ -41,10 +41,15 @@ class OperationContext;
 
 class ClusterWriter {
 public:
+    /**
+     * If 'targetEpoch' is set, throws a 'StaleEpoch' error if the targeted namespace is found to no
+     * longer have the epoch given by 'targetEpoch'.
+     */
     static void write(OperationContext* opCtx,
                       const BatchedCommandRequest& request,
                       BatchWriteExecStats* stats,
-                      BatchedCommandResponse* response);
+                      BatchedCommandResponse* response,
+                      boost::optional<OID> targetEpoch = boost::none);
 };
 
 }  // namespace mongo
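Taken together, the plumbing above lets a shard pin its $out writes to the epoch captured on mongos. A minimal sketch of how a caller might use the extended ClusterWriter::write() (hypothetical function name, not part of this patch; exactly where the StaleEpoch error surfaces depends on how ClusterWriter converts targeter failures):

    #include "mongo/s/write_ops/cluster_write.h"

    namespace mongo {
    // Hypothetical caller: pins a batched write to the epoch captured when the aggregation was
    // planned. If the collection has been dropped and re-created since, the ChunkManagerTargeter
    // built inside ClusterWriter::write() raises ErrorCodes::StaleEpoch instead of silently
    // writing against a collection with a different shard key.
    inline void writePinnedToEpoch(OperationContext* opCtx,
                                   const BatchedCommandRequest& request,
                                   boost::optional<OID> targetEpoch) {
        BatchedCommandResponse response;
        BatchWriteExecStats stats;
        ClusterWriter::write(opCtx, request, &stats, &response, targetEpoch);
        uassertStatusOK(response.toStatus());  // surfaces StaleEpoch and any other write error
    }
    }  // namespace mongo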