author     Charlie Swanson <charlie.swanson@mongodb.com>    2018-09-18 13:23:54 -0400
committer  Charlie Swanson <charlie.swanson@mongodb.com>    2018-11-01 16:59:04 -0400
commit     9282c05723eb9f15a6591613007ebe68561c88cb (patch)
tree       df797690af51e981ca3845bd124c88fb93d88b69
parent     b43765f2caa4c152204f7361c28e091247443c18 (diff)
download   mongo-9282c05723eb9f15a6591613007ebe68561c88cb.tar.gz
SERVER-36813 Be careful when choosing default uniqueKey
Before choosing a default uniqueKey, refresh the catalog cache so that the mongos serving the request is reasonably up to date. Additionally, communicate the epoch used to choose the uniqueKey from the mongos to the shards, and raise an error from the shards (terminating the aggregation) if the epoch of the targeted collection ever changes.
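For illustration, the mongos now serializes its observed epoch into the $out stage it forwards to the shards (the new test below checks for this via the shards' currentOp output). A sketch of the resulting shard-bound stage, with made-up values - note that 'epoch' is internal-only, and a mongos will reject it in user commands (error code 50984):

    // Shape of a shard-bound $out spec after this change; field names follow the
    // IDL and tests in this patch, values are illustrative.
    {
        $out: {
            to: "target",
            mode: "insertDocuments",
            uniqueKey: {sk: 1, _id: 1},                  // defaulted after a forced cache refresh
            epoch: ObjectId("5ba1f1e32178356a2a7b6b3e")  // epoch observed during that refresh
        }
    }

If a shard ever finds the target collection with a different epoch, the write fails with StaleEpoch and the aggregation is terminated.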
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml  1
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml  5
-rw-r--r--  jstests/sharding/out_stale_unique_key.js  170
-rw-r--r--  jstests/sharding/out_unique_key_requires_index.js  3
-rw-r--r--  src/mongo/db/ops/write_ops_exec.cpp  7
-rw-r--r--  src/mongo/db/pipeline/SConscript  2
-rw-r--r--  src/mongo/db/pipeline/document_source_out.cpp  83
-rw-r--r--  src/mongo/db/pipeline/document_source_out.h  12
-rw-r--r--  src/mongo/db/pipeline/document_source_out.idl  10
-rw-r--r--  src/mongo/db/pipeline/document_source_out_in_place.h  5
-rw-r--r--  src/mongo/db/pipeline/document_source_out_test.cpp  19
-rw-r--r--  src/mongo/db/pipeline/mongo_process_common.cpp  14
-rw-r--r--  src/mongo/db/pipeline/mongo_process_common.h  4
-rw-r--r--  src/mongo/db/pipeline/mongo_process_interface.h  23
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.cpp  4
-rw-r--r--  src/mongo/db/pipeline/mongos_process_interface.h  6
-rw-r--r--  src/mongo/db/pipeline/process_interface_shardsvr.cpp  10
-rw-r--r--  src/mongo/db/pipeline/process_interface_shardsvr.h  6
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.cpp  6
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.h  6
-rw-r--r--  src/mongo/db/pipeline/stub_mongo_process_interface.h  11
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec_test.cpp  39
-rw-r--r--  src/mongo/s/write_ops/chunk_manager_targeter.cpp  12
-rw-r--r--  src/mongo/s/write_ops/chunk_manager_targeter.h  13
-rw-r--r--  src/mongo/s/write_ops/cluster_write.cpp  5
-rw-r--r--  src/mongo/s/write_ops/cluster_write.h  7
26 files changed, 419 insertions, 64 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 5121ce36812..9921398ded5 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -36,6 +36,7 @@ selector:
- jstests/sharding/out_command_options.js
- jstests/sharding/out_from_stale_mongos.js
- jstests/sharding/out_hashed_shard_key.js
+ - jstests/sharding/out_stale_unique_key.js
- jstests/sharding/out_to_existing.js
- jstests/sharding/out_to_non_existing.js
- jstests/sharding/out_unique_key.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index aeb804c4a21..cbb084ae70b 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -348,14 +348,15 @@ selector:
- jstests/sharding/collation_targeting_inherited.js
- jstests/sharding/geo_near_random1.js
- jstests/sharding/geo_near_random2.js
- - jstests/sharding/out_with_chunk_migrations.js
+ - jstests/sharding/out_command_options.js
- jstests/sharding/out_from_stale_mongos.js
- jstests/sharding/out_hashed_shard_key.js
- - jstests/sharding/out_command_options.js
+ - jstests/sharding/out_stale_unique_key.js
- jstests/sharding/out_to_existing.js
- jstests/sharding/out_to_non_existing.js
- jstests/sharding/out_unique_key.js
- jstests/sharding/out_unique_key_requires_index.js
+ - jstests/sharding/out_with_chunk_migrations.js
- jstests/sharding/restart_transactions.js
- jstests/sharding/shard7.js
- jstests/sharding/shard_collection_existing_zones.js
diff --git a/jstests/sharding/out_stale_unique_key.js b/jstests/sharding/out_stale_unique_key.js
new file mode 100644
index 00000000000..d42fffa0b51
--- /dev/null
+++ b/jstests/sharding/out_stale_unique_key.js
@@ -0,0 +1,170 @@
+// Tests that an $out stage is able to default the uniqueKey to the correct value - even if one or
+// more of the involved nodes has a stale cache of the routing information.
+(function() {
+ "use strict";
+
+ const st = new ShardingTest({shards: 2, mongos: 2});
+
+ const dbName = "out_stale_unique_key";
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+
+ const source = st.s0.getDB(dbName).source;
+ const target = st.s0.getDB(dbName).target;
+
+ // Test that an $out through a stale mongos can still use the correct uniqueKey and succeed.
+ (function testDefaultUniqueKeyIsRecent() {
+ const freshMongos = st.s0;
+ const staleMongos = st.s1;
+
+ // Set up two collections for an aggregate with an $out: the source collection will be
+ // sharded amongst the two shards, and the target collection will be left unsharded,
+ // though the stale mongos will be led to believe it is sharded.
+ const staleMongosDB = staleMongos.getDB(dbName);
+ st.shardColl(source, {_id: 1}, {_id: 0}, {_id: 1});
+
+ (function setupStaleMongos() {
+ // Shard the collection through 'staleMongos', setting up 'staleMongos' to believe the
+ // collection is sharded by {sk: 1, _id: 1}.
+ assert.commandWorked(staleMongosDB.adminCommand(
+ {shardCollection: target.getFullName(), key: {sk: 1, _id: 1}}));
+ // Perform a query through that mongos to ensure the cache is populated.
+ assert.eq(0, staleMongosDB[target.getName()].find().itcount());
+
+ // Drop the collection from the other mongos - it is no longer sharded but the stale
+ // mongos doesn't know that yet.
+ target.drop();
+ }());
+
+ // At this point 'staleMongos' will believe that the target collection is sharded. This
+ // should not prevent it from running an $out without a uniqueKey specified. Specifically,
+ // the mongos should force a refresh of its cache before defaulting the uniqueKey.
+ assert.commandWorked(source.insert({_id: 'seed'}));
+
+ // If we had used the stale uniqueKey, this aggregation would fail since the documents do
+ // not have an 'sk' field.
+ assert.doesNotThrow(() => staleMongosDB[source.getName()].aggregate(
+ [{$out: {to: target.getName(), mode: 'insertDocuments'}}]));
+ assert.eq(target.find().toArray(), [{_id: 'seed'}]);
+ target.drop();
+ }());
+
+ // Test that if the collection is dropped and re-sharded during the course of the
+ // aggregation, the operation will fail rather than proceed with the old shard key.
+ function testEpochChangeDuringAgg({outSpec, failpoint, failpointData}) {
+ target.drop();
+ if (outSpec.hasOwnProperty("uniqueKey")) {
+ assert.commandWorked(target.createIndex(outSpec.uniqueKey, {unique: true}));
+ assert.commandWorked(
+ st.s.adminCommand({shardCollection: target.getFullName(), key: outSpec.uniqueKey}));
+ } else {
+ assert.commandWorked(
+ st.s.adminCommand({shardCollection: target.getFullName(), key: {sk: 1, _id: 1}}));
+ }
+
+ // Use a failpoint to make the query feeding into the aggregate hang while we drop the
+ // collection.
+ [st.rs0.getPrimary(), st.rs1.getPrimary()].forEach((mongod) => {
+ assert.commandWorked(mongod.adminCommand(
+ {configureFailPoint: failpoint, mode: "alwaysOn", data: failpointData || {}}));
+ });
+ let parallelShellJoiner;
+ try {
+ let parallelCode = `
+ const source = db.getSiblingDB("${dbName}").${source.getName()};
+ const error = assert.throws(() => source.aggregate([
+ {$addFields: {sk: "$_id"}},
+ {$out: ${tojsononeline(outSpec)}}
+ ]));
+ assert.eq(error.code, ErrorCodes.StaleEpoch);
+ `;
+
+ if (outSpec.hasOwnProperty("uniqueKey")) {
+ // If a user specifies their own uniqueKey, we don't need to fail the aggregation if
+ // the collection is dropped and recreated or the epoch otherwise changes. We are
+ // allowed to fail such an operation should we choose to in the future, but for now
+ // we don't expect to: we do nothing special on mongos to ensure the catalog cache
+ // is up to date in this case, so we do not want to attach mongos's believed epoch
+ // to the command for the shards.
+ parallelCode = `
+ const source = db.getSiblingDB("${dbName}").${source.getName()};
+ assert.doesNotThrow(() => source.aggregate([
+ {$addFields: {sk: "$_id"}},
+ {$out: ${tojsononeline(outSpec)}}
+ ]));
+ `;
+ }
+
+ parallelShellJoiner = startParallelShell(parallelCode, st.s.port);
+
+ // Wait for the merging $out to appear in the currentOp output from the shards. We
+ // should see that the $out stage has an 'epoch' field serialized from the mongos.
+ const getAggOps = function() {
+ return st.s.getDB("admin")
+ .aggregate([
+ {$currentOp: {}},
+ {$match: {"cursor.originatingCommand.pipeline": {$exists: true}}}
+ ])
+ .toArray();
+ };
+ const hasOutRunning = function() {
+ return getAggOps()
+ .filter((op) => {
+ const pipeline = op.cursor.originatingCommand.pipeline;
+ return pipeline.length > 0 &&
+ pipeline[pipeline.length - 1].hasOwnProperty("$out");
+ })
+ .length >= 1;
+ };
+ assert.soon(hasOutRunning, () => tojson(getAggOps()));
+
+ // Drop the collection so that the epoch changes.
+ target.drop();
+ } finally {
+ [st.rs0.getPrimary(), st.rs1.getPrimary()].forEach((mongod) => {
+ assert.commandWorked(
+ mongod.adminCommand({configureFailPoint: failpoint, mode: "off"}));
+ });
+ }
+ parallelShellJoiner();
+ }
+
+ // Insert enough documents to force a yield.
+ const bulk = source.initializeUnorderedBulkOp();
+ for (let i = 0; i < 1000; ++i) {
+ bulk.insert({_id: i});
+ }
+ assert.commandWorked(bulk.execute());
+
+ testEpochChangeDuringAgg({
+ outSpec: {to: target.getName(), mode: "insertDocuments"},
+ failpoint: "setYieldAllLocksHang",
+ failpointData: {namespace: source.getFullName()}
+ });
+ testEpochChangeDuringAgg({
+ outSpec: {to: target.getName(), mode: "insertDocuments", uniqueKey: {sk: 1}},
+ failpoint: "setYieldAllLocksHang",
+ failpointData: {namespace: source.getFullName()}
+ });
+ testEpochChangeDuringAgg({
+ outSpec: {to: target.getName(), mode: "replaceDocuments"},
+ failpoint: "setYieldAllLocksHang",
+ failpointData: {namespace: source.getFullName()}
+ });
+ testEpochChangeDuringAgg({
+ outSpec: {to: target.getName(), mode: "replaceDocuments", uniqueKey: {sk: 1}},
+ failpoint: "setYieldAllLocksHang",
+ failpointData: {namespace: source.getFullName()}
+ });
+
+ // Test with some different failpoints to verify that we detect an epoch change in the
+ // middle of the inserts or updates.
+ testEpochChangeDuringAgg({
+ outSpec: {to: target.getName(), mode: "insertDocuments"},
+ failpoint: "hangDuringBatchInsert"
+ });
+ testEpochChangeDuringAgg({
+ outSpec: {to: target.getName(), mode: "replaceDocuments"},
+ failpoint: "hangDuringBatchUpdate"
+ });
+
+ st.stop();
+}());
diff --git a/jstests/sharding/out_unique_key_requires_index.js b/jstests/sharding/out_unique_key_requires_index.js
index d6fc6b8894c..ba85f855fc8 100644
--- a/jstests/sharding/out_unique_key_requires_index.js
+++ b/jstests/sharding/out_unique_key_requires_index.js
@@ -270,7 +270,8 @@
testAgainstDB(mongosDB);
// Then test against a foreign database, with the same expected behavior.
- testAgainstDB(foreignDB);
+ // TODO SERVER-37871 re-enable this test.
+ // testAgainstDB(foreignDB);
st.stop();
})();
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index de92a6e94d6..834a2034033 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -93,6 +93,7 @@ MONGO_FAIL_POINT_DEFINE(hangBeforeChildRemoveOpFinishes);
MONGO_FAIL_POINT_DEFINE(hangBeforeChildRemoveOpIsPopped);
MONGO_FAIL_POINT_DEFINE(hangAfterAllChildRemoveOpsArePopped);
MONGO_FAIL_POINT_DEFINE(hangDuringBatchInsert);
+MONGO_FAIL_POINT_DEFINE(hangDuringBatchUpdate);
void updateRetryStats(OperationContext* opCtx, bool containsRetry) {
if (containsRetry) {
@@ -596,6 +597,12 @@ static SingleWriteResult performSingleUpdateOp(OperationContext* opCtx,
boost::optional<AutoGetCollection> collection;
while (true) {
+ if (MONGO_FAIL_POINT(hangDuringBatchUpdate)) {
+ log() << "batch update - hangDuringBatchUpdate fail point enabled. Blocking until "
+ "fail point is disabled.";
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangDuringBatchUpdate);
+ }
+
if (MONGO_FAIL_POINT(failAllUpdates)) {
uasserted(ErrorCodes::InternalError, "failAllUpdates failpoint active!");
}
diff --git a/src/mongo/db/pipeline/SConscript b/src/mongo/db/pipeline/SConscript
index 0717e281ce1..193fab16ab3 100644
--- a/src/mongo/db/pipeline/SConscript
+++ b/src/mongo/db/pipeline/SConscript
@@ -282,6 +282,7 @@ env.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/db/auth/auth',
'$BUILD_DIR/mongo/db/generic_cursor',
+ '$BUILD_DIR/mongo/s/sharding_router_api',
'field_path',
]
)
@@ -319,7 +320,6 @@ env.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/s/query/async_results_merger',
- '$BUILD_DIR/mongo/s/sharding_router_api',
'mongo_process_common',
]
)
diff --git a/src/mongo/db/pipeline/document_source_out.cpp b/src/mongo/db/pipeline/document_source_out.cpp
index be19fbd681e..7012af386af 100644
--- a/src/mongo/db/pipeline/document_source_out.cpp
+++ b/src/mongo/db/pipeline/document_source_out.cpp
@@ -31,6 +31,7 @@
#include "mongo/platform/basic.h"
#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/pipeline/document_path_support.h"
#include "mongo/db/pipeline/document_source_out.h"
#include "mongo/db/pipeline/document_source_out_gen.h"
#include "mongo/db/pipeline/document_source_out_in_place.h"
@@ -238,7 +239,9 @@ intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create(
NamespaceString outputNs,
const intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum mode,
- std::set<FieldPath> uniqueKey) {
+ std::set<FieldPath> uniqueKey,
+ boost::optional<OID> targetEpoch) {
+
// TODO (SERVER-36832): Allow this combination.
uassert(
50939,
@@ -272,13 +275,13 @@ intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create(
switch (mode) {
case WriteModeEnum::kModeReplaceCollection:
return new DocumentSourceOutReplaceColl(
- std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+ std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
case WriteModeEnum::kModeInsertDocuments:
return new DocumentSourceOutInPlace(
- std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+ std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
case WriteModeEnum::kModeReplaceDocuments:
return new DocumentSourceOutInPlaceReplace(
- std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+ std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
default:
MONGO_UNREACHABLE;
}
@@ -287,11 +290,13 @@ intrusive_ptr<DocumentSourceOut> DocumentSourceOut::create(
DocumentSourceOut::DocumentSourceOut(NamespaceString outputNs,
const intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum mode,
- std::set<FieldPath> uniqueKey)
+ std::set<FieldPath> uniqueKey,
+ boost::optional<OID> targetEpoch)
: DocumentSource(expCtx),
_writeConcern(expCtx->opCtx->getWriteConcern()),
- _done(false),
_outputNs(std::move(outputNs)),
+ _targetEpoch(targetEpoch),
+ _done(false),
_mode(mode),
_uniqueKeyFields(std::move(uniqueKey)),
_uniqueKeyIncludesId(_uniqueKeyFields.count("_id") == 1) {}
@@ -302,6 +307,7 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
auto mode = WriteModeEnum::kModeReplaceCollection;
std::set<FieldPath> uniqueKey;
NamespaceString outputNs;
+ boost::optional<OID> targetEpoch;
if (elem.type() == BSONType::String) {
outputNs = NamespaceString(expCtx->ns.db().toString() + '.' + elem.str());
uniqueKey.emplace("_id");
@@ -310,6 +316,10 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
DocumentSourceOutSpec::parse(IDLParserErrorContext("$out"), elem.embeddedObject());
mode = spec.getMode();
+ targetEpoch = spec.getTargetEpoch();
+ uassert(50984,
+ "$out received unexpected 'targetEpoch' on mongos",
+ !(expCtx->inMongos && bool(targetEpoch)));
// Retrieve the target database from the user command, otherwise use the namespace from the
// expression context.
@@ -320,26 +330,40 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
}
// Convert unique key object to a vector of FieldPaths.
- std::vector<FieldPath> docKeyPaths = std::get<0>(
- expCtx->mongoProcessInterface->collectDocumentKeyFields(expCtx->opCtx, outputNs));
- std::set<FieldPath> docKeyPathsSet =
- std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()),
- std::make_move_iterator(docKeyPaths.end()));
if (auto userSpecifiedUniqueKey = spec.getUniqueKey()) {
uniqueKey = parseUniqueKeyFromSpec(userSpecifiedUniqueKey.get());
- // Skip the unique index check if the provided uniqueKey is the documentKey.
- const bool isDocumentKey = (uniqueKey == docKeyPathsSet);
-
// Make sure the uniqueKey has a supporting index. Skip this check if the command is
// sent from mongos since the uniqueKey check would've happened already.
uassert(50938,
- "Cannot find index to verify that $out's unique key will be unique",
- expCtx->fromMongos || isDocumentKey ||
+ str::stream()
+ << "Cannot find index to verify that $out's unique key will be unique: "
+ << userSpecifiedUniqueKey,
+ expCtx->fromMongos ||
expCtx->mongoProcessInterface->uniqueKeyIsSupportedByIndex(
expCtx, outputNs, uniqueKey));
} else {
- uniqueKey = std::move(docKeyPathsSet);
+ if (expCtx->inMongos && mode != WriteModeEnum::kModeReplaceCollection) {
+ // In case there are multiple shards which will perform this $out in parallel, we
+ // need to figure out and attach the collection's epoch to ensure each shard is
+ // talking about the same version of the collection. This mongos will coordinate
+ // that. We force a catalog refresh to do so because there is no shard versioning
+ // protocol on this namespace. We will also figure out and attach the uniqueKey to
+ // send to the shards. We don't need to do this for 'replaceCollection' mode since
+ // that mode cannot currently target a sharded collection.
+
+ // There are cases where the aggregation could fail if the collection is dropped or
+ // re-created during or near the time of the aggregation. This is okay - we are
+ // mostly paranoid that this mongos is very stale and want to prevent returning an
+ // error if the collection was dropped a long time ago. Because of this, we are okay
+ // with piggy-backing off another thread's request to refresh the cache, simply
+ // waiting for that request to return instead of forcing another refresh.
+ targetEpoch = expCtx->mongoProcessInterface->refreshAndGetEpoch(expCtx, outputNs);
+ }
+ std::vector<FieldPath> docKeyPaths = std::get<0>(
+ expCtx->mongoProcessInterface->collectDocumentKeyFields(expCtx->opCtx, outputNs));
+ uniqueKey = std::set<FieldPath>(std::make_move_iterator(docKeyPaths.begin()),
+ std::make_move_iterator(docKeyPaths.end()));
}
} else {
uasserted(16990,
@@ -347,20 +371,23 @@ intrusive_ptr<DocumentSource> DocumentSourceOut::createFromBson(
<< typeName(elem.type()));
}
- return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey));
+ return create(std::move(outputNs), expCtx, mode, std::move(uniqueKey), targetEpoch);
}
Value DocumentSourceOut::serialize(boost::optional<ExplainOptions::Verbosity> explain) const {
- MutableDocument serialized(
- Document{{DocumentSourceOutSpec::kTargetCollectionFieldName, _outputNs.coll()},
- {DocumentSourceOutSpec::kTargetDbFieldName, _outputNs.db()},
- {DocumentSourceOutSpec::kModeFieldName, WriteMode_serializer(_mode)}});
- BSONObjBuilder uniqueKeyBob;
- for (auto path : _uniqueKeyFields) {
- uniqueKeyBob.append(path.fullPath(), 1);
- }
- serialized[DocumentSourceOutSpec::kUniqueKeyFieldName] = Value(uniqueKeyBob.done());
- return Value(Document{{getSourceName(), serialized.freeze()}});
+ DocumentSourceOutSpec spec;
+ spec.setTargetDb(_outputNs.db());
+ spec.setTargetCollection(_outputNs.coll());
+ spec.setMode(_mode);
+ spec.setUniqueKey([&]() {
+ BSONObjBuilder uniqueKeyBob;
+ for (auto path : _uniqueKeyFields) {
+ uniqueKeyBob.append(path.fullPath(), 1);
+ }
+ return uniqueKeyBob.obj();
+ }());
+ spec.setTargetEpoch(_targetEpoch);
+ return Value(Document{{getSourceName(), spec.toBSON()}});
}
DepsTracker::State DocumentSourceOut::getDependencies(DepsTracker* deps) const {
diff --git a/src/mongo/db/pipeline/document_source_out.h b/src/mongo/db/pipeline/document_source_out.h
index 62deff0291a..c47a9a52ecc 100644
--- a/src/mongo/db/pipeline/document_source_out.h
+++ b/src/mongo/db/pipeline/document_source_out.h
@@ -72,7 +72,8 @@ public:
DocumentSourceOut(NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum mode,
- std::set<FieldPath> uniqueKey);
+ std::set<FieldPath> uniqueKey,
+ boost::optional<OID> targetEpoch);
virtual ~DocumentSourceOut() = default;
@@ -167,7 +168,7 @@ public:
*/
virtual void spill(BatchedObjects&& batch) {
pExpCtx->mongoProcessInterface->insert(
- pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern);
+ pExpCtx, getWriteNs(), std::move(batch.objects), _writeConcern, _targetEpoch);
};
/**
@@ -182,7 +183,8 @@ public:
NamespaceString outputNs,
const boost::intrusive_ptr<ExpressionContext>& expCtx,
WriteModeEnum,
- std::set<FieldPath> uniqueKey = std::set<FieldPath>{"_id"});
+ std::set<FieldPath> uniqueKey = std::set<FieldPath>{"_id"},
+ boost::optional<OID> targetEpoch = boost::none);
/**
* Parses a $out stage from the user-supplied BSON.
@@ -198,11 +200,13 @@ protected:
// respect the writeConcern of the original command.
WriteConcernOptions _writeConcern;
+ const NamespaceString _outputNs;
+ boost::optional<OID> _targetEpoch;
+
private:
bool _initialized = false;
bool _done = false;
- const NamespaceString _outputNs;
WriteModeEnum _mode;
// Holds the unique key used for uniquely identifying documents. There must exist a unique index
diff --git a/src/mongo/db/pipeline/document_source_out.idl b/src/mongo/db/pipeline/document_source_out.idl
index 7ae0b5acd9f..006dfe665ab 100644
--- a/src/mongo/db/pipeline/document_source_out.idl
+++ b/src/mongo/db/pipeline/document_source_out.idl
@@ -70,3 +70,13 @@ structs:
type: object
optional: true
description: Document of fields representing the unique key.
+
+ epoch:
+ cpp_name: targetEpoch
+ type: objectid
+ optional: true
description: If set, the epoch found when parsed on mongos. Can be used to check if
a collection has since been dropped and re-created, in which case the
shard key may have changed. As of this writing, it can also be used
to detect that the collection has gone from unsharded to sharded, and
thus now has a shard key.
diff --git a/src/mongo/db/pipeline/document_source_out_in_place.h b/src/mongo/db/pipeline/document_source_out_in_place.h
index 791d0985657..993378466d1 100644
--- a/src/mongo/db/pipeline/document_source_out_in_place.h
+++ b/src/mongo/db/pipeline/document_source_out_in_place.h
@@ -43,7 +43,7 @@ public:
using DocumentSourceOut::DocumentSourceOut;
const NamespaceString& getWriteNs() const final {
- return getOutputNs();
+ return _outputNs;
};
/**
@@ -75,7 +75,8 @@ public:
std::move(batch.objects),
_writeConcern,
upsert,
- multi);
+ multi,
+ _targetEpoch);
} catch (const ExceptionFor<ErrorCodes::ImmutableField>& ex) {
uassertStatusOKWithContext(ex.toStatus(),
"$out failed to update the matching document, did you "
diff --git a/src/mongo/db/pipeline/document_source_out_test.cpp b/src/mongo/db/pipeline/document_source_out_test.cpp
index 1b040ca236b..01839641dfb 100644
--- a/src/mongo/db/pipeline/document_source_out_test.cpp
+++ b/src/mongo/db/pipeline/document_source_out_test.cpp
@@ -339,6 +339,23 @@ TEST_F(DocumentSourceOutTest, FailsToParseIfUniqueKeyHasDuplicateFields) {
ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, ErrorCodes::BadValue);
}
+TEST_F(DocumentSourceOutTest, FailsToParseIfTargetEpochIsSpecifiedOnMongos) {
+ BSONObj spec = BSON("$out" << BSON("to"
+ << "test"
+ << "mode"
+ << kDefaultMode
+ << "uniqueKey"
+ << BSON("_id" << 1)
+ << "epoch"
+ << OID::gen()));
+ getExpCtx()->inMongos = true;
+ ASSERT_THROWS_CODE(createOutStage(spec), AssertionException, 50984);
+
+ // Test that 'targetEpoch' is accepted if not in mongos.
+ getExpCtx()->inMongos = false;
+ ASSERT(createOutStage(spec) != nullptr);
+}
+
TEST_F(DocumentSourceOutTest, CorrectlyUsesTargetDbThatMatchesAggregationDb) {
const auto targetDbSameAsAggregationDb = getExpCtx()->ns.db();
const auto targetColl = "test"_sd;
@@ -350,7 +367,7 @@ TEST_F(DocumentSourceOutTest, CorrectlyUsesTargetDbThatMatchesAggregationDb) {
ASSERT_EQ(outStage->getOutputNs().coll(), targetColl);
}
-// TODO (SERVER-50939): Allow "replaceCollection" to a foreign database.
+// TODO (SERVER-36832): Allow "replaceCollection" to a foreign database.
TEST_F(DocumentSourceOutTest, CorrectlyUsesForeignTargetDb) {
const auto foreignDb = "someOtherDb"_sd;
const auto targetColl = "test"_sd;
diff --git a/src/mongo/db/pipeline/mongo_process_common.cpp b/src/mongo/db/pipeline/mongo_process_common.cpp
index 9609a4909b5..390900ce3af 100644
--- a/src/mongo/db/pipeline/mongo_process_common.cpp
+++ b/src/mongo/db/pipeline/mongo_process_common.cpp
@@ -39,6 +39,8 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/pipeline/expression_context.h"
#include "mongo/db/service_context.h"
+#include "mongo/s/catalog_cache.h"
+#include "mongo/s/grid.h"
#include "mongo/util/net/socket_utils.h"
namespace mongo {
@@ -135,4 +137,16 @@ bool MongoProcessCommon::keyPatternNamesExactPaths(const BSONObj& keyPattern,
return nFieldsMatched == uniqueKeyPaths.size();
}
+boost::optional<OID> MongoProcessCommon::refreshAndGetEpoch(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx, const NamespaceString& nss) const {
+ const bool forceRefreshFromThisThread = false;
+ auto routingInfo = uassertStatusOK(
+ Grid::get(expCtx->opCtx)
+ ->catalogCache()
+ ->getCollectionRoutingInfoWithRefresh(expCtx->opCtx, nss, forceRefreshFromThisThread));
+ if (auto chunkManager = routingInfo.cm()) {
+ return chunkManager->getVersion().epoch();
+ }
+ return boost::none;
+}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongo_process_common.h b/src/mongo/db/pipeline/mongo_process_common.h
index 9aa6149120d..53ce182e93a 100644
--- a/src/mongo/db/pipeline/mongo_process_common.h
+++ b/src/mongo/db/pipeline/mongo_process_common.h
@@ -60,6 +60,10 @@ public:
CurrentOpTruncateMode truncateMode,
CurrentOpCursorMode cursorMode) const final;
+ virtual boost::optional<OID> refreshAndGetEpoch(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss) const final;
+
protected:
/**
* Returns a BSONObj representing a report of the operation which is currently being
diff --git a/src/mongo/db/pipeline/mongo_process_interface.h b/src/mongo/db/pipeline/mongo_process_interface.h
index da96b66c4b0..033d9658c51 100644
--- a/src/mongo/db/pipeline/mongo_process_interface.h
+++ b/src/mongo/db/pipeline/mongo_process_interface.h
@@ -108,16 +108,20 @@ public:
virtual bool isSharded(OperationContext* opCtx, const NamespaceString& ns) = 0;
/**
- * Inserts 'objs' into 'ns' and throws a UserException if the insert fails.
+ * Inserts 'objs' into 'ns' and throws a UserException if the insert fails. If 'targetEpoch' is
+ * set, throws ErrorCodes::StaleEpoch if the targeted collection does not have the same epoch or
+ * the epoch changes during the course of the insert.
*/
virtual void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) = 0;
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) = 0;
/**
* Updates the documents matching 'queries' with the objects 'updates'. Throws a UserException
- * if any of the updates fail.
+ * if any of the updates fail. If 'targetEpoch' is set, throws ErrorCodes::StaleEpoch if the
+ * targeted collection does not have the same epoch, or if the epoch changes during the update.
*/
virtual void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
@@ -125,7 +129,8 @@ public:
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) = 0;
+ bool multi,
+ boost::optional<OID> targetEpoch) = 0;
virtual CollectionIndexUsageMap getIndexStats(OperationContext* opCtx,
const NamespaceString& ns) = 0;
@@ -269,6 +274,16 @@ public:
virtual bool uniqueKeyIsSupportedByIndex(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& nss,
const std::set<FieldPath>& uniqueKeyPaths) const = 0;
+
+ /**
+ * Refreshes the CatalogCache entry for the namespace 'nss', and returns the epoch associated
+ * with that namespace, if any. Note that this refresh will not necessarily force a new
+ * request to be sent to the config servers. If another thread has already requested a refresh,
+ * it will instead wait for that response.
+ */
+ virtual boost::optional<OID> refreshAndGetEpoch(
+ const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss) const = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/mongos_process_interface.cpp b/src/mongo/db/pipeline/mongos_process_interface.cpp
index 569b968c94f..75d69161eb0 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.cpp
+++ b/src/mongo/db/pipeline/mongos_process_interface.cpp
@@ -104,7 +104,9 @@ bool supportsUniqueKey(const boost::intrusive_ptr<ExpressionContext>& expCtx,
? index.getObjectField(IndexDescriptor::kCollationFieldName)
: CollationSpec::kSimpleSpec));
- return index.getBoolField(IndexDescriptor::kUniqueFieldName) &&
+ // SERVER-5335: The _id index does not report itself as unique, but it is in fact unique.
+ auto isIdIndex = index[IndexDescriptor::kIndexNameFieldName].String() == "_id_";
+ return (isIdIndex || index.getBoolField(IndexDescriptor::kUniqueFieldName)) &&
!index.hasField(IndexDescriptor::kPartialFilterExprFieldName) &&
MongoProcessCommon::keyPatternNamesExactPaths(
index.getObjectField(IndexDescriptor::kKeyPatternFieldName), uniqueKeyPaths) &&
diff --git a/src/mongo/db/pipeline/mongos_process_interface.h b/src/mongo/db/pipeline/mongos_process_interface.h
index ee900586ba4..16a7005d262 100644
--- a/src/mongo/db/pipeline/mongos_process_interface.h
+++ b/src/mongo/db/pipeline/mongos_process_interface.h
@@ -66,7 +66,8 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) final {
+ const WriteConcernOptions& wc,
+ boost::optional<OID>) final {
MONGO_UNREACHABLE;
}
@@ -76,7 +77,8 @@ public:
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) final {
+ bool multi,
+ boost::optional<OID>) final {
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index c6ccdb2d7ad..b9d5fffe02b 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -134,7 +134,8 @@ std::pair<std::vector<FieldPath>, bool> MongoInterfaceShardServer::collectDocume
void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) {
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
@@ -144,7 +145,7 @@ void MongoInterfaceShardServer::insert(const boost::intrusive_ptr<ExpressionCont
// If applicable, attach a write concern to the batched command request.
attachWriteConcern(&insertCommand, wc);
- ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response);
+ ClusterWriter::write(expCtx->opCtx, insertCommand, &stats, &response, targetEpoch);
// TODO SERVER-35403: Add more context for which shard produced the error.
uassertStatusOKWithContext(response.toStatus(), "Insert failed: ");
@@ -156,7 +157,8 @@ void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionCont
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) {
+ bool multi,
+ boost::optional<OID> targetEpoch) {
BatchedCommandResponse response;
BatchWriteExecStats stats;
@@ -170,7 +172,7 @@ void MongoInterfaceShardServer::update(const boost::intrusive_ptr<ExpressionCont
// If applicable, attach a write concern to the batched command request.
attachWriteConcern(&updateCommand, wc);
- ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response);
+ ClusterWriter::write(expCtx->opCtx, updateCommand, &stats, &response, targetEpoch);
// TODO SERVER-35403: Add more context for which shard produced the error.
uassertStatusOKWithContext(response.toStatus(), "Update failed: ");
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.h b/src/mongo/db/pipeline/process_interface_shardsvr.h
index 39d550fc083..ddafac24cdb 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.h
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.h
@@ -53,7 +53,8 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) final;
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) final;
/**
* Replaces the documents matching 'queries' with 'updates' using the ClusterWriter for locking,
@@ -65,7 +66,8 @@ public:
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) final;
+ bool multi,
+ boost::optional<OID> targetEpoch) final;
};
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 8e874498b15..8f7113042d9 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -156,7 +156,8 @@ Update MongoInterfaceStandalone::buildUpdateOp(const NamespaceString& nss,
void MongoInterfaceStandalone::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) {
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) {
auto writeResults = performInserts(
expCtx->opCtx, buildInsertOp(ns, std::move(objs), expCtx->bypassDocumentValidation));
@@ -179,7 +180,8 @@ void MongoInterfaceStandalone::update(const boost::intrusive_ptr<ExpressionConte
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) {
+ bool multi,
+ boost::optional<OID> targetEpoch) {
auto writeResults = performUpdates(expCtx->opCtx,
buildUpdateOp(ns,
std::move(queries),
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index de64e9b0578..7ed45fe14f1 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -59,14 +59,16 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) override;
+ const WriteConcernOptions& wc,
+ boost::optional<OID> targetEpoch) override;
void update(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& queries,
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) override;
+ bool multi,
+ boost::optional<OID> targetEpoch) override;
CollectionIndexUsageMap getIndexStats(OperationContext* opCtx, const NamespaceString& ns) final;
void appendLatencyStats(OperationContext* opCtx,
const NamespaceString& nss,
diff --git a/src/mongo/db/pipeline/stub_mongo_process_interface.h b/src/mongo/db/pipeline/stub_mongo_process_interface.h
index e822be1e7b1..d4f3a6dacb6 100644
--- a/src/mongo/db/pipeline/stub_mongo_process_interface.h
+++ b/src/mongo/db/pipeline/stub_mongo_process_interface.h
@@ -62,7 +62,8 @@ public:
void insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
std::vector<BSONObj>&& objs,
- const WriteConcernOptions& wc) override {
+ const WriteConcernOptions& wc,
+ boost::optional<OID>) override {
MONGO_UNREACHABLE;
}
@@ -72,7 +73,8 @@ public:
std::vector<BSONObj>&& updates,
const WriteConcernOptions& wc,
bool upsert,
- bool multi) final {
+ bool multi,
+ boost::optional<OID>) final {
MONGO_UNREACHABLE;
}
@@ -177,5 +179,10 @@ public:
const std::set<FieldPath>& uniqueKeyPaths) const override {
return true;
}
+
+ boost::optional<OID> refreshAndGetEpoch(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const NamespaceString& nss) const override {
+ return boost::none;
+ }
};
} // namespace mongo
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 9f6fe308408..c76b915090f 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -575,6 +575,45 @@ TEST_F(BatchWriteExecTest, NonRetryableErrorTxnNumber) {
future.timed_get(kFutureTimeout);
}
+TEST_F(BatchWriteExecTest, StaleEpochIsNotRetryable) {
+ // A StaleEpoch error is not retried.
+
+ BatchedCommandRequest request([&] {
+ write_ops::Insert insertOp(nss);
+ insertOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase writeCommandBase;
+ writeCommandBase.setOrdered(true);
+ return writeCommandBase;
+ }());
+ insertOp.setDocuments({BSON("x" << 1), BSON("x" << 2)});
+ return insertOp;
+ }());
+ request.setWriteConcern(BSONObj());
+
+ operationContext()->setLogicalSessionId(makeLogicalSessionIdForTest());
+ operationContext()->setTxnNumber(5);
+
+ BatchedCommandResponse nonRetryableErrResponse;
+ nonRetryableErrResponse.setStatus({ErrorCodes::StaleEpoch, "mock stale epoch error"});
+
+ auto future = launchAsync([&] {
+ BatchedCommandResponse response;
+ BatchWriteExecStats stats;
+ BatchWriteExec::executeBatch(operationContext(), nsTargeter, request, &response, &stats);
+ ASSERT(response.getOk());
+ ASSERT_EQ(0, response.getN());
+ ASSERT(response.isErrDetailsSet());
+ ASSERT_EQUALS(response.getErrDetailsAt(0)->toStatus().code(),
+ nonRetryableErrResponse.toStatus().code());
+ ASSERT(response.getErrDetailsAt(0)->toStatus().reason().find(
+ nonRetryableErrResponse.toStatus().reason()) != std::string::npos);
+ ASSERT_EQ(1, stats.numRounds);
+ });
+
+ expectInsertsReturnError({BSON("x" << 1), BSON("x" << 2)}, nonRetryableErrResponse);
+
+ future.timed_get(kFutureTimeout);
+}
class BatchWriteExecTransactionTest : public BatchWriteExecTest {
public:
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.cpp b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
index 0c33291a7fe..727f8505489 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.cpp
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.cpp
@@ -317,8 +317,9 @@ bool wasMetadataRefreshed(const std::shared_ptr<ChunkManager>& managerA,
} // namespace
-ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss)
- : _nss(nss), _needsTargetingRefresh(false) {}
+ChunkManagerTargeter::ChunkManagerTargeter(const NamespaceString& nss,
+ boost::optional<OID> targetEpoch)
+ : _nss(nss), _needsTargetingRefresh(false), _targetEpoch(targetEpoch) {}
Status ChunkManagerTargeter::init(OperationContext* opCtx) {
@@ -334,6 +335,13 @@ Status ChunkManagerTargeter::init(OperationContext* opCtx) {
_routingInfo = std::move(routingInfoStatus.getValue());
+ if (_targetEpoch) {
+ uassert(ErrorCodes::StaleEpoch, "Collection has been dropped", _routingInfo->cm());
+ uassert(ErrorCodes::StaleEpoch,
+ "Collection has been dropped and recreated",
+ _routingInfo->cm()->getVersion().epoch() == *_targetEpoch);
+ }
+
return Status::OK();
}
diff --git a/src/mongo/s/write_ops/chunk_manager_targeter.h b/src/mongo/s/write_ops/chunk_manager_targeter.h
index d0f65e21402..c5738c6a786 100644
--- a/src/mongo/s/write_ops/chunk_manager_targeter.h
+++ b/src/mongo/s/write_ops/chunk_manager_targeter.h
@@ -58,12 +58,19 @@ class ChunkManagerTargeter : public NSTargeter {
public:
enum class UpdateType { kReplacement, kOpStyle, kUnknown };
- ChunkManagerTargeter(const NamespaceString& nss);
+ /**
+ * If 'targetEpoch' is not boost::none, throws a 'StaleEpoch' exception if the collection given
+ * by 'nss' is ever found to not have the target epoch.
+ */
+ ChunkManagerTargeter(const NamespaceString& nss,
+ boost::optional<OID> targetEpoch = boost::none);
/**
* Initializes the ChunkManagerTargeter with the latest targeting information for the
* namespace. May need to block and load information from a remote config server.
*
+ * Throws a 'StaleEpoch' exception if the targeted collection has an epoch which does not
+ * match '_targetEpoch'.
* Returns !OK if the information could not be initialized.
*/
Status init(OperationContext* opCtx);
@@ -166,6 +173,10 @@ private:
// The latest loaded routing cache entry
boost::optional<CachedCollectionRoutingInfo> _routingInfo;
+ // Set to the epoch of the namespace we are targeting. If we ever refresh the catalog cache and
+ // find a new epoch, we immediately throw a StaleEpoch exception.
+ boost::optional<OID> _targetEpoch;
+
// Map of shard->remote shard version reported from stale errors
ShardVersionMap _remoteShardVersions;
};
diff --git a/src/mongo/s/write_ops/cluster_write.cpp b/src/mongo/s/write_ops/cluster_write.cpp
index be7664f3931..7ed3ac2553d 100644
--- a/src/mongo/s/write_ops/cluster_write.cpp
+++ b/src/mongo/s/write_ops/cluster_write.cpp
@@ -67,7 +67,8 @@ void toBatchError(const Status& status, BatchedCommandResponse* response) {
void ClusterWriter::write(OperationContext* opCtx,
const BatchedCommandRequest& request,
BatchWriteExecStats* stats,
- BatchedCommandResponse* response) {
+ BatchedCommandResponse* response,
+ boost::optional<OID> targetEpoch) {
const NamespaceString& nss = request.getNS();
LastError::Disabled disableLastError(&LastError::get(opCtx->getClient()));
@@ -77,7 +78,7 @@ void ClusterWriter::write(OperationContext* opCtx,
Grid::get(opCtx)->catalogClient()->writeConfigServerDirect(opCtx, request, response);
} else {
{
- ChunkManagerTargeter targeter(request.getNS());
+ ChunkManagerTargeter targeter(request.getNS(), targetEpoch);
Status targetInitStatus = targeter.init(opCtx);
if (!targetInitStatus.isOK()) {
diff --git a/src/mongo/s/write_ops/cluster_write.h b/src/mongo/s/write_ops/cluster_write.h
index 96530e55fd5..ab67644bdbf 100644
--- a/src/mongo/s/write_ops/cluster_write.h
+++ b/src/mongo/s/write_ops/cluster_write.h
@@ -41,10 +41,15 @@ class OperationContext;
class ClusterWriter {
public:
+ /**
+ * If 'targetEpoch' is set, throws a 'StaleEpoch' error if the targeted namespace is found to no
+ * longer have the epoch given by 'targetEpoch'.
+ */
static void write(OperationContext* opCtx,
const BatchedCommandRequest& request,
BatchWriteExecStats* stats,
- BatchedCommandResponse* response);
+ BatchedCommandResponse* response,
+ boost::optional<OID> targetEpoch = boost::none);
};
} // namespace mongo