summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp7
-rw-r--r--src/mongo/db/pipeline/SConscript2
-rw-r--r--src/mongo/db/pipeline/document_source_out.cpp83
-rw-r--r--src/mongo/db/pipeline/document_source_out.h12
-rw-r--r--src/mongo/db/pipeline/document_source_out.idl10
-rw-r--r--src/mongo/db/pipeline/document_source_out_in_place.h5
-rw-r--r--src/mongo/db/pipeline/document_source_out_test.cpp19
-rw-r--r--src/mongo/db/pipeline/mongo_process_common.cpp14
-rw-r--r--src/mongo/db/pipeline/mongo_process_common.h4
-rw-r--r--src/mongo/db/pipeline/mongo_process_interface.h23
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.cpp4
-rw-r--r--src/mongo/db/pipeline/mongos_process_interface.h6
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.cpp10
-rw-r--r--src/mongo/db/pipeline/process_interface_shardsvr.h6
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp6
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.h6
-rw-r--r--src/mongo/db/pipeline/stub_mongo_process_interface.h11
-rw-r--r--src/mongo/s/write_ops/batch_write_exec_test.cpp39
-rw-r--r--src/mongo/s/write_ops/chunk_manager_targeter.cpp12
-rw-r--r--src/mongo/s/write_ops/chunk_manager_targeter.h13
-rw-r--r--src/mongo/s/write_ops/cluster_write.cpp5
-rw-r--r--src/mongo/s/write_ops/cluster_write.h7
22 files changed, 243 insertions, 61 deletions
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index de92a6e94d6..834a2034033 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -93,6 +93,7 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeChildRemoveOpFinishes);
MONGO_FAIL_POINT_DEFINE(hangBeforeChildRemoveOpIsPopped);
MONGO_FAIL_POINT_DEFINE(hangAfterAllChildRemoveOpsArePopped);
MONGO_FAIL_POINT_DEFINE(hangDuringBatchInsert);
+MONGO_FAIL_POINT_DEFINE(hangDuringBatchUpdate);
void updateRetryStats(OperationContext* opCtx, bool containsRetry) {
if (containsRetry) {
@@ -596,6 +597,12 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
boost::optional<AutoGetCollection> collection;
while (true) {
+ if (MONGO_FAIL_POINT(hangDuringBatchUpdate)) {
+ log() << "batch update - hangDuringBatchUpdate fail point enabled. Blocking until "
+ "fail point is disabled.";
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangDuringBatchUpdate);
+ }
+
if (MONGO_FAIL_POINT(failAllUpdates)) {
uasserted(ErrorCodes::InternalError, "failAllUpdates failpoint active!");
}
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 0717e281ce1..193fab16ab3 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -282,6 +282,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/auth',
'$BUILD_DIR/mongo/db/generic_cursor',
+ '$BUILD_DIR/mongo/s/sharding_router_api',
'field_path',
]
)
@@ -319,7 +320,6 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/s/query/async_results_merger',
- '$BUILD_DIR/mongo/s/sharding_router_api',
'mongo_process_common',
]
)
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index be19fbd681e..7012af386af 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -31,6 +31,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_out_gen.h"
#include "mongo/db/pipeline/document_source_out_in_place.h"
@@ -238,7 +239,9 @@ intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create(
NamespaceString outputNs,
const intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum mode,
- std::set<FieldPath> uniqueKey) {
+ std::set<FieldPath> uniqueKey,
+ boost::optional<OID> targetEpoch) {
+
// TODO (SERVER-36832): Allow this combination.
uassert(
50939,
@@ -272,13 +275,13 @@ intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create(
switch (mode) {
case WriteModeEnum::kModeReplaceCollection:
return new DocumentSourceOutReplaceColl(
- std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+ std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
case WriteModeEnum::kModeInsertDocuments:
return new DocumentSourceOutInPlace(
- std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+ std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
case WriteModeEnum::kModeReplaceDocuments:
return new DocumentSourceOutInPlaceReplace(
- std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+ std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
default:
MONGO_UNREACHABLE;
}
@@ -287,11 +290,13 @@ intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create(
DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs,
const intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum mode,
- std::set<FieldPath> uniqueKey)
+ std::set<FieldPath> uniqueKey,
+ boost::optional<OID> targetEpoch)
: DocumentSource(expCtx),
_writeConcern(expCtx->opCtx->getWriteConcern()),
- _done(false),
_outputNs(std::move(outputNs)),
+ _targetEpoch(targetEpoch),
+ _done(false),
_mode(mode),
_uniqueKeyFields(std::move(uniqueKey)),
_uniqueKeyIncludesId(_uniqueKeyFields.count("_id") == 1) {}
@@ -302,6 +307,7 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
auto mode = WriteModeEnum::kModeReplaceCollection;
std::set<FieldPath> uniqueKey;
NamespaceString outputNs;
+ boost::optional<OID> targetEpoch;
if (elem.type() == BSONType::String) {
outputNs = NamespaceString(expCtx->ns.db().toString() + '.' + elem.str());
uniqueKey.emplace("_id");
@@ -310,6 +316,10 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), elem.embeddedObject());
mode = spec.getMode();
+ targetEpoch = spec.getTargetEpoch();
+ uassert(50984,
+ "$out received unexpected 'targetEpoch' on mongos",
+ !(expCtx->inMongos && bool(targetEpoch)));
// Retrieve the target database from the user command, otherwise use the namespace from the
// expression context.
@@ -320,26 +330,40 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
}
// Convert unique key object to a vector of FieldPaths.
- std::vector<FieldPath> docKeyPaths = std::get<0>(
- expCtx->mongoProcessInterface->collectDocumentKeyFields(expCtx->opCtx, outputNs));
- std::set<FieldPath> docKeyPathsSet =
- std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()),
- std::make_move_iterator(docKeyPaths.end()));
if (auto userSpecifiedUniqueKey = spec.getUniqueKey()) {
uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get());
- // Skip the unique index check if the provided uniqueKey is the documentKey.
- const bool isDocumentKey = (uniqueKey == docKeyPathsSet);
-
// Make sure the uniqueKey has a supporting index. Skip this check if the command is
// sent from mongos since the uniqueKey check would've happened already.
uassert(50938,
- "Cannot find index to verify that $out's unique key will be unique",
- expCtx->fromMongos || isDocumentKey ||
+ str::stream()
+ << "Cannot find index to verify that $out's unique key will be unique: "
+ << userSpecifiedUniqueKey,
+ expCtx->fromMongos ||
expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex(
expCtx, outputNs, uniqueKey));
} else {
- uniqueKey = std::move(docKeyPathsSet);
+ if (expCtx->inMongos && mode != WriteModeEnum::kModeReplaceCollection) {
+ // In case there are multiple shards which will perform this $out in parallel, we
+ // need to figure out and attach the collection's epoch to ensure each shard is
+ // talking about the same version of the collection. This mongos will coordinate
+ // that. We force a catalog refresh to do so because there is no shard versioning
+ // protocol on this namespace. We will also figure out and attach the uniqueKey to
+ // send to the shards. We don't need to do this for 'replaceCollection' mode since
+ // that mode cannot currently target a sharded collection.
+
+ // There are cases where the aggregation could fail if the collection is dropped or
+ // re-created during or near the time of the aggregation. This is okay - we are
+ // mostly paranoid that this mongos is very stale and want to prevent returning an
+ // error if the collection was dropped a long time ago. Because of this, we are okay
+ // with piggy-backing off another thread's request to refresh the cache, simply
+ // waiting for that request to return instead of forcing another refresh.
+ targetEpoch = expCtx->mongoProcessInterface->refreshAndGetEpoch(expCtx, outputNs);
+ }
+ std::vector<FieldPath> docKeyPaths = std::get<0>(
+ expCtx->mongoProcessInterface->collectDocumentKeyFields(expCtx->opCtx, outputNs));
+ uniqueKey = std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()),
+ std::make_move_iterator(docKeyPaths.end()));
}
} else {
uasserted(16990,
@@ -347,20 +371,23 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
<< typeName(elem.type()));
}
- return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+ return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
}
Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- MutableDocument serialized(
- Document{{DocumentSourceOutSpec::kTargetCollectionFieldName, _outputNs.coll()},
- {DocumentSourceOutSpec::kTargetDbFieldName, _outputNs.db()},
- {DocumentSourceOutSpec::kModeFieldName, WriteMode_serializer(_mode)}});
- BSONObjBuilder uniqueKeyBob;
- for (auto path : _uniqueKeyFields) {
- uniqueKeyBob.append(path.fullPath(), 1);
- }
- serialized[DocumentSourceOutSpec::kUniqueKeyFieldName] = Value(uniqueKeyBob.done());
- return Value(Document{{getSourceName(), serialized.freeze()}});
+ DocumentSourceOutSpec spec;
+ spec.setTargetDb(_outputNs.db());
+ spec.setTargetCollection(_outputNs.coll());
+ spec.setMode(_mode);
+ spec.setUniqueKey([&]() {
+ BSONObjBuilder uniqueKeyBob;
+ for (auto path : _uniqueKeyFields) {
+ uniqueKeyBob.append(path.fullPath(), 1);
+ }
+ return uniqueKeyBob.obj();
+ }());
+ spec.setTargetEpoch(_targetEpoch);
+ return Value(Document{{getSourceName(), spec.toBSON()}});
}
DepsTracker::State DocumentSourceOut::getDependencies(DepsTracker* deps) const {
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 62deff0291a..c47a9a52ecc 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -72,7 +72,8 @@ public:
DocumentSourceOut(NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum mode,
- std::set<FieldPath> uniqueKey);
+ std::set<FieldPath> uniqueKey,
+ boost::optional<OID> targetEpoch);
virtual ~DocumentSourceOut() = default;
@@ -167,7 +168,7 @@ public:
*/
virtual void spill(BatchedObjects&& batch) {
pExpCtx->mongoProcessInterface->insert(
- pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern);
+ pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern, _targetEpoch);
};
/**
@@ -182,7 +183,8 @@ public:
NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum,
- std::set<FieldPath> uniqueKey = std::set<FieldPath>{"_id"});
+ std::set<FieldPath> uniqueKey = std::set<FieldPath>{"_id"},
+ boost::optional<OID> targetEpoch = boost::none);
/**
* Parses a $out stage from the user-supplied BSON.
@@ -198,11 +200,13 @@ protected:
// respect the writeConcern of the original command.
WriteConcernOptions _writeConcern;
+ const NamespaceString _outputNs;
+ boost::optional<OID> _targetEpoch;
+
private:
bool _initialized = false;
bool _done = false;
- const NamespaceString _outputNs;
WriteModeEnum _mode;
// Holds the unique key used for uniquely identifying documents. There must exist a unique index
diff --git a/src/mongo/db/pipeline/document_source_out.idl b/src/mongo/db/pipeline/document_source_out.idl
index 7ae0b5acd9f..006dfe665ab 100644
--- a/src/mongo/db/pipeline/document_source_out.idl
+++ b/src/mongo/db/pipeline/document_source_out.idl
@@ -70,3 +70,13 @@ structs:
type: object
optional: true
description: Document of fields representing the unique key.
+
+ epoch:
+ cpp_name: targetEpoch
+ type: objectid
+ optional: true
+ description: If set, the epoch found when parsed on mongos. Can be used to check if
+ a collection has since been dropped and re-created, in which case the
+ shard key may have changed. As of this writing, this also can be used
+ to detect if the collection has gone from unsharded to sharded, and
+ thus now has a shard key.
diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h
index 791d0985657..993378466d1 100644
--- a/src/mongo/db/pipeline/document_source_out_in_place.h
+++ b/src/mongo/db/pipeline/document_source_out_in_place.h
@@ -43,7 +43,7 @@ public:
using DocumentSourceOut::DocumentSourceOut;
const NamespaceString& getWriteNs() const final {
- return getOutputNs();
+ return _outputNs;
};
/**
@@ -75,7 +75,8 @@ public:
std::move(batch.objects),
_writeConcern,
upsert,
- multi);
+ multi,
+ _targetEpoch);
} catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
uassertStatusOKWithContext(ex.toStatus(),
"$out failed to update the matching document, did you "
diff --git a/src/mongo/db/pipeline/document_source_out_test.cpp b/src/mongo/db/pipeline/document_source_out_test.cpp
index 1b040ca236b..01839641dfb 100644
--- a/src/mongo/db/pipeline/document_source_out_test.cpp
+++ b/src/mongo/db/pipeline/document_source_out_test.cpp
@@ -339,6 +339,23 @@ TEST_F(DocumentSourceOutTest, FailsToParseIfUniqueKeyHasDuplicateFields) {
ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue);
}
+TEST_F(DocumentSourceOutTest, FailsToParseIfTargetEpochIsSpecifiedOnMongos) {
+ BSONObj spec = BSON("$out" << BSON("to"
+ << "test"
+ << "mode"
+ << kDefaultMode
+ << "uniqueKey"
+ << BSON("_id" << 1)
+ << "epoch"
+ << OID::gen()));
+ getExpCtx()->inMongos = true;
+ ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50984);
+
+ // Test that 'targetEpoch' is accepted if not in mongos.
+ getExpCtx()->inMongos = false;
+ ASSERT(createOutStage(spec) != nullptr);
+}
+
TEST_F(DocumentSourceOutTest, CorrectlyUsesTargetDbThatMatchesAggregationDb) {
const auto targetDbSameAsAggregationDb = getExpCtx()->ns.db();
const auto targetColl = "test"_sd;
@@ -350,7 +367,7 @@ TEST_F(DocumentSourceOutTest, CorrectlyUsesTargetDbThatMatchesAggregationDb) {
ASSERT_EQ(outStage->getOutputNs().coll(), targetColl);
}
-// TODO (SERVER-50939): Allow "replaceCollection" to a foreign database.
+// TODO (SERVER-36832): Allow "replaceCollection" to a foreign database.
TEST_F(DocumentSourceOutTest, CorrectlyUsesForeignTargetDb) {
const auto foreignDb = "someOtherDb"_sd;
const auto targetColl = "test"_sd;
diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/mongo_process_common.cpp
index 9609a4909b5..390900ce3af 100644
--- a/src/mongo/db/pipeline/mongo_process_common.cpp
+++ b/src/mongo/db/pipeline/mongo_process_common.cpp
@@ -39,6 +39,8 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/service_context.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/grid.h"
#include "mongo/util/net/socket_utils.h"
namespace mongo {
@@ -135,4 +137,16 @@ bool MongoProcessCommon::keyPatternNamesExactPaths(const BSONObj& keyPattern,
return nFieldsMatched == uniqueKeyPaths.size();
}
+boost::optional<OID> MongoProcessCommon::refreshAndGetEpoch(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss) const {
+ const bool forceRefreshFromThisThread = false;
+ auto routingInfo = uassertStatusOK(
+ Grid::get(expCtx->opCtx)
+ ->catalogCache()
+ ->getCollectionRoutingInfoWithRefresh(expCtx->opCtx, nss, forceRefreshFromThisThread));
+ if (auto chunkManager = routingInfo.cm()) {
+ return chunkManager->getVersion().epoch();
+ }
+ return boost::none;
+}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h
index 9aa6149120d..53ce182e93a 100644
--- a/src/mongo/db/pipeline/mongo_process_common.h
+++ b/src/mongo/db/pipeline/mongo_process_common.h
@@ -60,6 +60,10 @@ public:
CurrentOpTruncateMode truncateMode,
CurrentOpCursorMode cursorMode) const final;
+ virtual boost::optional<OID> refreshAndGetEpoch(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss) const final;
+
protected:
/**
* Returns a BSONObj representing a report of the operation which is currently being
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index da96b66c4b0..033d9658c51 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -108,16 +108,20 @@ public:
virtual bool isSharded(OperationContext* opCtx, const NamespaceString& ns) = 0;
/**
- * Inserts 'objs' into 'ns' and throws a UserException if the insert fails.
+ * Inserts 'objs' into 'ns' and throws a UserException if the insert fails. If 'targetEpoch' is
+ * set, throws ErrorCodes::StaleEpoch if the targeted collection does not have the same epoch or
+ * the epoch changes during the course of the insert.
*/
virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) = 0;
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) = 0;
/**
* Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException
- * if any of the updates fail.
+ * if any of the updates fail. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the
+ * targeted collection does not have the same epoch, or if the epoch changes during the update.
*/
virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
@@ -125,7 +129,8 @@ public:
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) = 0;
+ bool multi,
+ boost::optional<OID> targetEpoch) = 0;
virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
const NamespaceString& ns) = 0;
@@ -269,6 +274,16 @@ public:
virtual bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
const std::set<FieldPath>& uniqueKeyPaths) const = 0;
+
+ /**
+ * Refreshes the CatalogCache entry for the namespace 'nss', and returns the epoch associated
+ * with that namespace, if any. Note that this refresh will not necessarily force a new
+ * request to be sent to the config servers. If another thread has already requested a refresh,
+ * it will instead wait for that response.
+ */
+ virtual boost::optional<OID> refreshAndGetEpoch(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss) const = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 569b968c94f..75d69161eb0 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -104,7 +104,9 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
? index.getObjectField(IndexDescriptor::kCollationFieldName)
: CollationSpec::kSimpleSpec));
- return index.getBoolField(IndexDescriptor::kUniqueFieldName) &&
+ // SERVER-5335: The _id index does not report to be unique, but in fact is unique.
+ auto isIdIndex = index[IndexDescriptor::kIndexNameFieldName].String() == "_id_";
+ return (isIdIndex || index.getBoolField(IndexDescriptor::kUniqueFieldName)) &&
!index.hasField(IndexDescriptor::kPartialFilterExprFieldName) &&
MongoProcessCommon::keyPatternNamesExactPaths(
index.getObjectField(IndexDescriptor::kKeyPatternFieldName), uniqueKeyPaths) &&
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index ee900586ba4..16a7005d262 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -66,7 +66,8 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) final {
+ const WriteConcernOptions& wc,
+ boost::optional<OID>) final {
MONGO_UNREACHABLE;
}
@@ -76,7 +77,8 @@ public:
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) final {
+ bool multi,
+ boost::optional<OID>) final {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index c6ccdb2d7ad..b9d5fffe02b 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -134,7 +134,8 @@ std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocume
void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) {
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
@@ -144,7 +145,7 @@ void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionCont
// If applicable, attach a write concern to the batched command request.
attachWriteConcern(&insertCommand, wc);
- ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response);
+ ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response, targetEpoch);
// TODO SERVER-35403: Add more context for which shard produced the error.
uassertStatusOKWithContext(response.toStatus(), "Insert failed: ");
@@ -156,7 +157,8 @@ void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionCont
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) {
+ bool multi,
+ boost::optional<OID> targetEpoch) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
@@ -170,7 +172,7 @@ void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionCont
// If applicable, attach a write concern to the batched command request.
attachWriteConcern(&updateCommand, wc);
- ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response);
+ ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response, targetEpoch);
// TODO SERVER-35403: Add more context for which shard produced the error.
uassertStatusOKWithContext(response.toStatus(), "Update failed: ");
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index 39d550fc083..ddafac24cdb 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -53,7 +53,8 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) final;
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) final;
/**
* Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking,
@@ -65,7 +66,8 @@ public:
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) final;
+ bool multi,
+ boost::optional<OID> targetEpoch) final;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 8e874498b15..8f7113042d9 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -156,7 +156,8 @@ Update MongoInterfaceStandalone::buildUpdateOp(const NamespaceString& nss,
void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) {
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) {
auto writeResults = performInserts(
expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
@@ -179,7 +180,8 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) {
+ bool multi,
+ boost::optional<OID> targetEpoch) {
auto writeResults = performUpdates(expCtx->opCtx,
buildUpdateOp(ns,
std::move(queries),
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index de64e9b0578..7ed45fe14f1 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -59,14 +59,16 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) override;
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) override;
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) override;
+ bool multi,
+ boost::optional<OID> targetEpoch) override;
CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final;
void appendLatencyStats(OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index e822be1e7b1..d4f3a6dacb6 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -62,7 +62,8 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) override {
+ const WriteConcernOptions& wc,
+ boost::optional<OID>) override {
MONGO_UNREACHABLE;
}
@@ -72,7 +73,8 @@ public:
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) final {
+ bool multi,
+ boost::optional<OID>) final {
MONGO_UNREACHABLE;
}
@@ -177,5 +179,10 @@ public:
const std::set<FieldPath>& uniqueKeyPaths) const override {
return true;
}
+
+ boost::optional<OID> refreshAndGetEpoch(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss) const override {
+ return boost::none;
+ }
};
} // namespace mongo
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 9f6fe308408..c76b915090f 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -575,6 +575,45 @@ TEST_F(BatchWriteExecTest, NonRetryableErrorTxnNumber) {
future.timed_get(kFutureTimeout);
}
+TEST_F(BatchWriteExecTest, StaleEpochIsNotRetryable) {
+ // A StaleEpoch error is not retried.
+
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(true);
+ return writeCommandBase;
+ }());
+ insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)});
+ return insertOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ operationContext()->setLogicalSessionId(makeLogicalSessionIdForTest());
+ operationContext()->setTxnNumber(5);
+
+ BatchedCommandResponse nonRetryableErrResponse;
+ nonRetryableErrResponse.setStatus({ErrorCodes::StaleEpoch, "mock stale epoch error"});
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ ASSERT(response.getOk());
+ ASSERT_EQ(0, response.getN());
+ ASSERT(response.isErrDetailsSet());
+ ASSERT_EQUALS(response.getErrDetailsAt(0)->toStatus().code(),
+ nonRetryableErrResponse.toStatus().code());
+ ASSERT(response.getErrDetailsAt(0)->toStatus().reason().find(
+ nonRetryableErrResponse.toStatus().reason()) != std::string::npos);
+ ASSERT_EQ(1, stats.numRounds);
+ });
+
+ expectInsertsReturnError({BSON("x" << 1), BSON("x" << 2)}, nonRetryableErrResponse);
+
+ future.timed_get(kFutureTimeout);
+}
class BatchWriteExecTransactionTest : public BatchWriteExecTest {
public:
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index 0c33291a7fe..727f8505489 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -317,8 +317,9 @@ bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA,
} // namespace
-ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss)
- : _nss(nss), _needsTargetingRefresh(false) {}
+ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss,
+ boost::optional<OID> targetEpoch)
+ : _nss(nss), _needsTargetingRefresh(false), _targetEpoch(targetEpoch) {}
Status ChunkManagerTargeter::init(OperationContext* opCtx) {
@@ -334,6 +335,13 @@ Status ChunkManagerTargeter::init(OperationContext* opCtx) {
_routingInfo = std::move(routingInfoStatus.getValue());
+ if (_targetEpoch) {
+ uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _routingInfo->cm());
+ uassert(ErrorCodes::StaleEpoch,
+ "Collection has been dropped and recreated",
+ _routingInfo->cm()->getVersion().epoch() == *_targetEpoch);
+ }
+
return Status::OK();
}
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h
index d0f65e21402..c5738c6a786 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.h
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.h
@@ -58,12 +58,19 @@ class ChunkManagerTargeter : public NSTargeter {
public:
enum class UpdateType { kReplacement, kOpStyle, kUnknown };
- ChunkManagerTargeter(const NamespaceString& nss);
+ /**
+ * If 'targetEpoch' is not boost::none, throws a 'StaleEpoch' exception if the collection given
+ * by 'nss' is ever found to not have the target epoch.
+ */
+ ChunkManagerTargeter(const NamespaceString& nss,
+ boost::optional<OID> targetEpoch = boost::none);
/**
* Initializes the ChunkManagerTargeter with the latest targeting information for the
* namespace. May need to block and load information from a remote config server.
*
+ * Throws a 'StaleEpoch' exception if the collection targeted has an epoch which does not match
+ * '_targetEpoch'
* Returns !OK if the information could not be initialized.
*/
Status init(OperationContext* opCtx);
@@ -166,6 +173,10 @@ private:
// The latest loaded routing cache entry
boost::optional<CachedCollectionRoutingInfo> _routingInfo;
+ // Set to the epoch of the namespace we are targeting. If we ever refresh the catalog cache and
+ // find a new epoch, we immediately throw a StaleEpoch exception.
+ boost::optional<OID> _targetEpoch;
+
// Map of shard->remote shard version reported from stale errors
ShardVersionMap _remoteShardVersions;
};
diff --git a/src/mongo/s/write_ops/cluster_write.cpp b/src/mongo/s/write_ops/cluster_write.cpp
index be7664f3931..7ed3ac2553d 100644
--- a/src/mongo/s/write_ops/cluster_write.cpp
+++ b/src/mongo/s/write_ops/cluster_write.cpp
@@ -67,7 +67,8 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
void ClusterWriter::write(OperationContext* opCtx,
const BatchedCommandRequest& request,
BatchWriteExecStats* stats,
- BatchedCommandResponse* response) {
+ BatchedCommandResponse* response,
+ boost::optional<OID> targetEpoch) {
const NamespaceString& nss = request.getNS();
LastError::Disabled disableLastError(&LastError::get(opCtx->getClient()));
@@ -77,7 +78,7 @@ void ClusterWriter::write(OperationContext* opCtx,
Grid::get(opCtx)->catalogClient()->writeConfigServerDirect(opCtx, request, response);
} else {
{
- ChunkManagerTargeter targeter(request.getNS());
+ ChunkManagerTargeter targeter(request.getNS(), targetEpoch);
Status targetInitStatus = targeter.init(opCtx);
if (!targetInitStatus.isOK()) {
diff --git a/src/mongo/s/write_ops/cluster_write.h b/src/mongo/s/write_ops/cluster_write.h
index 96530e55fd5..ab67644bdbf 100644
--- a/src/mongo/s/write_ops/cluster_write.h
+++ b/src/mongo/s/write_ops/cluster_write.h
@@ -41,10 +41,15 @@ class OperationContext;
class ClusterWriter {
public:
+ /**
+ * If 'targetEpoch' is set, throws a 'StaleEpoch' error if the targeted namespace is found to no
+ * longer have the epoch given by 'targetEpoch'.
+ */
static void write(OperationContext* opCtx,
const BatchedCommandRequest& request,
BatchWriteExecStats* stats,
- BatchedCommandResponse* response);
+ BatchedCommandResponse* response,
+ boost::optional<OID> targetEpoch = boost::none);
};
} // namespace mongo