-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml |  2
-rw-r--r--  jstests/sharding/prepared_txn_metadata_refresh.js                                  | 85
-rw-r--r--  src/mongo/db/commands/find_cmd.cpp                                                 |  6
-rw-r--r--  src/mongo/db/commands/mr.cpp                                                       |  2
-rw-r--r--  src/mongo/db/db_raii.cpp                                                           |  5
-rw-r--r--  src/mongo/db/pipeline/pipeline_d.cpp                                               | 11
-rw-r--r--  src/mongo/db/pipeline/process_interface_shardsvr.cpp                               |  5
-rw-r--r--  src/mongo/db/pipeline/process_interface_standalone.h                               |  5
-rw-r--r--  src/mongo/db/query/find.cpp                                                        |  5
-rw-r--r--  src/mongo/db/query/stage_builder.cpp                                               |  8
-rw-r--r--  src/mongo/db/s/collection_sharding_runtime.cpp                                     |  3
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp                                       | 28
-rw-r--r--  src/mongo/db/s/collection_sharding_state.h                                         | 12
-rw-r--r--  src/mongo/db/s/op_observer_sharding_impl.cpp                                       |  7
-rw-r--r--  src/mongo/db/s/shard_filtering_metadata_refresh.cpp                                |  5
15 files changed, 140 insertions(+), 49 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index aa01978d011..7637f14f324 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -42,6 +42,8 @@ selector:
- jstests/sharding/write_transactions_during_migration.js
- jstests/sharding/change_stream_show_migration_events.js
- jstests/sharding/prepare_transaction_then_migrate.js
+ # Enable after SERVER-40258 is backported and becomes available in the official 4.2 binaries.
+ - jstests/sharding/prepared_txn_metadata_refresh.js
# mongos in 4.0 doesn't like an aggregation explain without stages for optimized away pipelines,
# so blacklisting the test until 4.2 becomes last-stable.
- jstests/sharding/agg_explain_fmt.js
diff --git a/jstests/sharding/prepared_txn_metadata_refresh.js b/jstests/sharding/prepared_txn_metadata_refresh.js
new file mode 100644
index 00000000000..4990a68867c
--- /dev/null
+++ b/jstests/sharding/prepared_txn_metadata_refresh.js
@@ -0,0 +1,85 @@
+/**
+ * Test to make sure that transactions don't block shard version metadata refresh.
+ * The test relies on the fact that the destination shard does not update its shard version
+ * after a migration while the doNotRefreshRecipientAfterCommit failpoint is on.
+ */
+
+(function() {
+ "use strict";
+
+ load('./jstests/libs/chunk_manipulation_util.js');
+
+ var staticMongod = MongoRunner.runMongod({}); // For startParallelOps.
+
+ let st = new ShardingTest({shards: 3, other: {shardOptions: {verbose: 1}}});
+
+ assert.commandWorked(st.s.adminCommand({enableSharding: 'test'}));
+ st.ensurePrimaryShard('test', st.shard0.shardName);
+ assert.commandWorked(st.s.adminCommand({shardCollection: 'test.user', key: {x: 1}}));
+ assert.commandWorked(st.s.adminCommand({split: 'test.user', middle: {x: 0}}));
+ assert.commandWorked(st.s.adminCommand({split: 'test.user', middle: {x: -100}}));
+ assert.commandWorked(st.s.adminCommand({split: 'test.user', middle: {x: 100}}));
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: 'test.user', find: {x: 0}, to: st.shard1.shardName}));
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: 'test.user', find: {x: 100}, to: st.shard2.shardName}));
+
+ // Send a normal write to establish the shard versions outside the transaction.
+ assert.commandWorked(st.s.getDB('test').runCommand({
+ insert: 'user',
+ documents: [{x: 0}, {x: 100}],
+ }));
+
+ let lsid = {id: UUID()};
+ let txnNumber = 0;
+
+ // Start a migration in parallel to get around the X lock acquisition at the beginning of
+ // the migration on the destination.
+ assert.commandWorked(st.rs0.getPrimary().getDB('admin').runCommand(
+ {configureFailPoint: 'doNotRefreshRecipientAfterCommit', mode: 'alwaysOn'}));
+
+ let destPrimary = st.rs2.getPrimary();
+ pauseMigrateAtStep(destPrimary, migrateStepNames.cloned);
+ var joinMoveChunk = moveChunkParallel(
+ staticMongod, st.s0.host, {x: -100}, null, 'test.user', st.shard2.shardName);
+ waitForMigrateStep(destPrimary, migrateStepNames.cloned);
+
+ assert.commandWorked(st.s.getDB('test').runCommand({
+ insert: 'user',
+ documents: [{x: 1}, {x: 101}],
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ stmtId: NumberInt(0),
+ startTransaction: true,
+ autocommit: false,
+ }));
+
+ unpauseMigrateAtStep(destPrimary, migrateStepNames.cloned);
+ joinMoveChunk();
+
+ // Make the transaction stay in the prepared state so that it holds on to the collection locks.
+ assert.commandWorked(st.rs1.getPrimary().getDB('admin').runCommand(
+ {configureFailPoint: 'hangBeforeWritingDecision', mode: 'alwaysOn'}));
+ assert.commandWorked(st.rs2.getPrimary().getDB('admin').runCommand(
+ {configureFailPoint: 'hangBeforeWritingDecision', mode: 'alwaysOn'}));
+
+ const runCommitCode = "db.adminCommand({commitTransaction: 1, lsid: " + tojson(lsid) +
+     ", txnNumber: NumberLong(" + txnNumber + "), stmtId: NumberInt(0), autocommit: false});";
+ let commitTxn = startParallelShell(runCommitCode, st.s.port);
+
+ // The insert should be able to refresh the sharding metadata even with an existing
+ // transaction holding the collection lock in MODE_IX.
+ assert.commandWorked(st.s.getDB('test').runCommand(
+ {insert: 'user', documents: [{x: -100}], maxTimeMS: 5 * 1000}));
+
+ assert.commandWorked(st.rs1.getPrimary().getDB('admin').runCommand(
+ {configureFailPoint: 'hangBeforeWritingDecision', mode: 'off'}));
+ assert.commandWorked(st.rs2.getPrimary().getDB('admin').runCommand(
+ {configureFailPoint: 'hangBeforeWritingDecision', mode: 'off'}));
+ commitTxn();
+
+ st.stop();
+ MongoRunner.stopMongod(staticMongod);
+
+})();
diff --git a/src/mongo/db/commands/find_cmd.cpp b/src/mongo/db/commands/find_cmd.cpp
index 47e08334bf9..19c0acbb486 100644
--- a/src/mongo/db/commands/find_cmd.cpp
+++ b/src/mongo/db/commands/find_cmd.cpp
@@ -49,7 +49,6 @@
#include "mongo/db/query/find_common.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/service_context.h"
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/server_read_concern_metrics.h"
@@ -487,11 +486,6 @@ public:
"Executor error during find command"));
}
- // Before saving the cursor, ensure that whatever plan we established happened with the
- // expected collection version
- auto css = CollectionShardingState::get(opCtx, nss);
- css->checkShardVersionOrThrow(opCtx);
-
// Set up the cursor for getMore.
CursorId cursorId = 0;
if (shouldSaveCursor(opCtx, collection, state, exec.get())) {
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 19276846563..e4bc431be5e 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -1460,7 +1460,7 @@ public:
const auto metadata = [&] {
AutoGetCollectionForReadCommand autoColl(opCtx, config.nss);
- return CollectionShardingState::get(opCtx, config.nss)->getCurrentMetadata();
+ return CollectionShardingState::get(opCtx, config.nss)->getOrphansFilter(opCtx);
}();
bool shouldHaveData = false;
diff --git a/src/mongo/db/db_raii.cpp b/src/mongo/db/db_raii.cpp
index fae5952f834..8cf2455629b 100644
--- a/src/mongo/db/db_raii.cpp
+++ b/src/mongo/db/db_raii.cpp
@@ -318,8 +318,9 @@ AutoGetCollectionForReadCommand::AutoGetCollectionForReadCommand(
: kDoNotChangeProfilingLevel,
deadline) {
if (!_autoCollForRead.getView()) {
- // We have both the DB and collection locked, which is the prerequisite to do a stable shard
- // version check, but we'd like to do the check after we have a satisfactory snapshot.
+ // Perform the check early so that the query planner can extract the correct shard
+ // key. Also make sure that the version is compatible in case the query planner
+ // decides to use an empty plan.
auto css = CollectionShardingState::get(opCtx, _autoCollForRead.getNss());
css->checkShardVersionOrThrow(opCtx);
}
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index ef2b73846f3..0e659a37da5 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -127,16 +127,13 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
// If the incoming operation is sharded, use the CSS to infer the filtering metadata for the
// collection, otherwise treat it as unsharded
- boost::optional<ScopedCollectionMetadata> shardMetadata =
- (OperationShardingState::isOperationVersioned(opCtx)
- ? CollectionShardingState::get(opCtx, coll->ns())->getOrphansFilter(opCtx)
- : boost::optional<ScopedCollectionMetadata>{});
+ auto shardMetadata = CollectionShardingState::get(opCtx, coll->ns())->getOrphansFilter(opCtx);
// Because 'numRecords' includes orphan documents, our initial decision to optimize the $sample
// cursor may have been mistaken. For sharded collections, build a TRIAL plan that will switch
// to a collection scan if the ratio of orphaned to owned documents encountered over the first
// 100 works() is such that we would have chosen not to optimize.
- if (shardMetadata && (*shardMetadata)->isSharded()) {
+ if (shardMetadata->isSharded()) {
// The ratio of owned to orphaned documents must be at least equal to the ratio between the
// requested sampleSize and the maximum permitted sampleSize for the original constraints to
// be satisfied. For instance, if there are 200 documents and the sampleSize is 5, then at
@@ -147,12 +144,12 @@ StatusWith<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> createRandomCursorEx
sampleSize / (numRecords * kMaxSampleRatioForRandCursor), kMaxSampleRatioForRandCursor);
// The trial plan is SHARDING_FILTER-MULTI_ITERATOR.
auto randomCursorPlan =
- std::make_unique<ShardFilterStage>(opCtx, *shardMetadata, ws.get(), root.release());
+ std::make_unique<ShardFilterStage>(opCtx, shardMetadata, ws.get(), root.release());
// The backup plan is SHARDING_FILTER-COLLSCAN.
std::unique_ptr<PlanStage> collScanPlan = std::make_unique<CollectionScan>(
opCtx, coll, CollectionScanParams{}, ws.get(), nullptr);
collScanPlan = std::make_unique<ShardFilterStage>(
- opCtx, *shardMetadata, ws.get(), collScanPlan.release());
+ opCtx, shardMetadata, ws.get(), collScanPlan.release());
// Place a TRIAL stage at the root of the plan tree, and pass it the trial and backup plans.
root = std::make_unique<TrialStage>(opCtx,
ws.get(),
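To make the ratio requirement above concrete, assuming kMaxSampleRatioForRandCursor is 0.05 as elsewhere in this file (the constant's value is not visible in this hunk): with numRecords = 200 and sampleSize = 5, the TRIAL stage requires an advanced/works ratio of at least max(5 / (200 * 0.05), 0.05) = max(0.5, 0.05) = 0.5, i.e. at least half of the documents encountered during the first 100 works() must be owned rather than orphaned for the random-cursor plan to be kept.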
diff --git a/src/mongo/db/pipeline/process_interface_shardsvr.cpp b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
index c2672dcf2a0..74508c4a10e 100644
--- a/src/mongo/db/pipeline/process_interface_shardsvr.cpp
+++ b/src/mongo/db/pipeline/process_interface_shardsvr.cpp
@@ -206,8 +206,9 @@ unique_ptr<Pipeline, PipelineDeleter> MongoInterfaceShardServer::attachCursorSou
std::unique_ptr<ShardFilterer> MongoInterfaceShardServer::getShardFilterer(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const {
- return std::make_unique<ShardFiltererImpl>(
- CollectionShardingState::get(expCtx->opCtx, expCtx->ns)->getOrphansFilter(expCtx->opCtx));
+ auto shardingMetadata =
+ CollectionShardingState::get(expCtx->opCtx, expCtx->ns)->getOrphansFilter(expCtx->opCtx);
+ return std::make_unique<ShardFiltererImpl>(std::move(shardingMetadata));
}
} // namespace mongo
diff --git a/src/mongo/db/pipeline/process_interface_standalone.h b/src/mongo/db/pipeline/process_interface_standalone.h
index 5ba8ea4f21b..1569e090c89 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.h
+++ b/src/mongo/db/pipeline/process_interface_standalone.h
@@ -57,6 +57,11 @@ public:
DBClientBase* directClient() final;
std::unique_ptr<TransactionHistoryIteratorBase> createTransactionHistoryIterator(
repl::OpTime time) const final;
+
+ /**
+ * Note: The information returned can be stale. Callers should always attach a shardVersion
+ * when sending requests against 'nss' that are based on this information.
+ */
bool isSharded(OperationContext* opCtx, const NamespaceString& nss) final;
Status insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
const NamespaceString& ns,
diff --git a/src/mongo/db/query/find.cpp b/src/mongo/db/query/find.cpp
index 0a47bc2ce0a..8f4132b1eaf 100644
--- a/src/mongo/db/query/find.cpp
+++ b/src/mongo/db/query/find.cpp
@@ -710,11 +710,6 @@ std::string runQuery(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
- // Before saving the cursor, ensure that whatever plan we established happened with the expected
- // collection version
- auto css = CollectionShardingState::get(opCtx, nss);
- css->checkShardVersionOrThrow(opCtx);
-
// Fill out CurOp based on query results. If we have a cursorid, we will fill out CurOp with
// this cursorid later.
long long ccId = 0;
diff --git a/src/mongo/db/query/stage_builder.cpp b/src/mongo/db/query/stage_builder.cpp
index ac2295c4985..190ed408770 100644
--- a/src/mongo/db/query/stage_builder.cpp
+++ b/src/mongo/db/query/stage_builder.cpp
@@ -307,11 +307,9 @@ PlanStage* buildStages(OperationContext* opCtx,
if (nullptr == childStage) {
return nullptr;
}
- return new ShardFilterStage(
- opCtx,
- CollectionShardingState::get(opCtx, collection->ns())->getOrphansFilter(opCtx),
- ws,
- childStage);
+
+ auto css = CollectionShardingState::get(opCtx, collection->ns());
+ return new ShardFilterStage(opCtx, css->getOrphansFilter(opCtx), ws, childStage);
}
case STAGE_DISTINCT_SCAN: {
const DistinctNode* dn = static_cast<const DistinctNode*>(root);
diff --git a/src/mongo/db/s/collection_sharding_runtime.cpp b/src/mongo/db/s/collection_sharding_runtime.cpp
index 6c48b8faf40..fd09b44ace5 100644
--- a/src/mongo/db/s/collection_sharding_runtime.cpp
+++ b/src/mongo/db/s/collection_sharding_runtime.cpp
@@ -96,7 +96,8 @@ void CollectionShardingRuntime::setFilteringMetadata(OperationContext* opCtx,
CollectionMetadata newMetadata) {
invariant(!newMetadata.isSharded() || !isNamespaceAlwaysUnsharded(_nss),
str::stream() << "Namespace " << _nss.ns() << " must never be sharded.");
- invariant(opCtx->lockState()->isCollectionLockedForMode(_nss, MODE_X));
+
+ auto csrLock = CollectionShardingState::CSRLock::lockExclusive(opCtx, this);
_metadataManager->setFilteringMetadata(std::move(newMetadata));
}
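This hunk is the crux of the change: installing new filtering metadata no longer requires a MODE_X collection lock, because mutual exclusion moves to the CSR's own lock. A simplified sketch of the resulting discipline, using only names that appear in this diff (surrounding machinery elided):

    // Writer (refresh path): an IX collection lock is enough, since
    // setFilteringMetadata takes the exclusive CSRLock internally (see above).
    AutoGetCollection autoColl(opCtx, nss, MODE_IX);
    CollectionShardingRuntime::get(opCtx, nss)
        ->setFilteringMetadata(opCtx, std::move(newMetadata));

    // Reader (version-check path): _getMetadataWithVersionCheckAt() takes the shared
    // CSRLock, so it observes a consistent metadata snapshot without requiring any
    // stronger collection lock than the operation already holds.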
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index e0dd987600d..feb519090e3 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -160,12 +160,8 @@ void CollectionShardingState::report(OperationContext* opCtx, BSONObjBuilder* bu
}
ScopedCollectionMetadata CollectionShardingState::getOrphansFilter(OperationContext* opCtx) {
- const auto receivedShardVersion = getOperationReceivedVersion(opCtx, _nss);
- if (!receivedShardVersion)
- return {kUnshardedCollection};
-
const auto atClusterTime = repl::ReadConcernArgs::get(opCtx).getArgsAtClusterTime();
- auto optMetadata = _getMetadata(atClusterTime);
+ auto optMetadata = _getMetadataWithVersionCheckAt(opCtx, atClusterTime);
if (!optMetadata)
return {kUnshardedCollection};
@@ -199,26 +195,34 @@ boost::optional<ChunkVersion> CollectionShardingState::getCurrentShardVersionIfK
}
void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx) {
+ (void)_getMetadataWithVersionCheckAt(opCtx, boost::none);
+}
+
+boost::optional<ScopedCollectionMetadata> CollectionShardingState::_getMetadataWithVersionCheckAt(
+ OperationContext* opCtx, const boost::optional<mongo::LogicalTime>& atClusterTime) {
const auto optReceivedShardVersion = getOperationReceivedVersion(opCtx, _nss);
if (!optReceivedShardVersion)
- return;
+ return ScopedCollectionMetadata(kUnshardedCollection);
const auto& receivedShardVersion = *optReceivedShardVersion;
if (ChunkVersion::isIgnoredVersion(receivedShardVersion)) {
- return;
+ return boost::none;
}
// An operation with read concern 'available' should never have shardVersion set.
invariant(repl::ReadConcernArgs::get(opCtx).getLevel() !=
repl::ReadConcernLevel::kAvailableReadConcern);
- const auto metadata = getCurrentMetadata();
- const auto wantedShardVersion =
- metadata->isSharded() ? metadata->getShardVersion() : ChunkVersion::UNSHARDED();
+ auto csrLock = CSRLock::lockShared(opCtx, this);
+
+ auto metadata = _getMetadata(atClusterTime);
+ auto wantedShardVersion = ChunkVersion::UNSHARDED();
+ if (metadata && (*metadata)->isSharded()) {
+ wantedShardVersion = (*metadata)->getShardVersion();
+ }
auto criticalSectionSignal = [&] {
- auto csrLock = CSRLock::lockShared(opCtx, this);
return _critSec.getSignal(opCtx->lockState()->isWriteLocked()
? ShardingMigrationCriticalSection::kWrite
: ShardingMigrationCriticalSection::kRead);
@@ -235,7 +239,7 @@ void CollectionShardingState::checkShardVersionOrThrow(OperationContext* opCtx)
}
if (receivedShardVersion.isWriteCompatibleWith(wantedShardVersion)) {
- return;
+ return metadata;
}
//
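The return value of _getMetadataWithVersionCheckAt() above is easy to misread, since it distinguishes three success shapes; a summary of the contract as this hunk defines it (the mismatch path falls outside the visible context):

    // No shardVersion sent with the operation  -> ScopedCollectionMetadata(kUnshardedCollection)
    // ChunkVersion::isIgnoredVersion(received) -> boost::none (filtering checks are skipped)
    // Received version write-compatible        -> the metadata at 'atClusterTime'
    // Otherwise                                -> throws, after consulting the critical section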
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index d62b010bec6..906c366b8fb 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -91,8 +91,9 @@ public:
* metadata object.
*
* The intended users of this method are callers which need to perform orphan filtering. Use
- * 'getCurrentMetadata' for all other cases, where just sharding-related properties of the
- * collection are necessary (e.g., isSharded or the shard key).
+ * 'getCurrentMetadata' for other cases, such as obtaining sharding-related properties of
+ * the collection that won't change while the collection IX/IS lock is held
+ * (e.g., isSharded or the shard key).
*
* The returned object is safe to access even after the collection lock has been dropped.
*/
@@ -155,6 +156,13 @@ protected:
private:
friend CSRLock;
+ /**
+ * Checks that the operation's received shard version is compatible and returns the latest
+ * version of the collection metadata, with filtering configured for 'atClusterTime' if
+ * specified.
+ */
+ boost::optional<ScopedCollectionMetadata> _getMetadataWithVersionCheckAt(
+ OperationContext* opCtx, const boost::optional<mongo::LogicalTime>& atClusterTime);
+
// Object-wide ResourceMutex to protect changes to the CollectionShardingRuntime or objects
// held within. Use only the CollectionShardingRuntimeLock to lock this mutex.
Lock::ResourceMutex _stateChangeMutex;
diff --git a/src/mongo/db/s/op_observer_sharding_impl.cpp b/src/mongo/db/s/op_observer_sharding_impl.cpp
index 2bd4165b89f..606d8ae7dfc 100644
--- a/src/mongo/db/s/op_observer_sharding_impl.cpp
+++ b/src/mongo/db/s/op_observer_sharding_impl.cpp
@@ -42,9 +42,10 @@ namespace {
const auto getIsMigrating = OperationContext::declareDecoration<bool>();
/**
- * Write operations do shard version checking, but do not perform orphan document filtering. Because
- * of this, if an update operation runs as part of a 'readConcern:snapshot' transaction, it might
- * get routed to a shard which no longer owns the chunk being written to. In such cases, throw a
+ * Write operations do shard version checking, but if an update operation runs as part of a
+ * 'readConcern:snapshot' transaction, the router could have used the metadata at the snapshot
+ * time and yet set the latest shard version on the request. This is why the write can get routed
+ * to a shard which no longer owns the chunk being written to. In such cases, throw a
* MigrationConflict exception to indicate that the transaction needs to be rolled-back and
* restarted.
*/
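A minimal sketch of the check this comment motivates; the surrounding function shape is hypothetical, but keyBelongsToMe() and ErrorCodes::MigrationConflict are the real names involved:

    // Hypothetical shape of the conflict check: if the shard's filtering metadata says
    // this shard no longer owns the key being written inside a snapshot transaction,
    // fail the write so the whole transaction is rolled back and restarted.
    if (!metadata->keyBelongsToMe(shardKey)) {
        uasserted(ErrorCodes::MigrationConflict,
                  str::stream() << "Transaction write conflicts with an in-progress chunk "
                                   "migration for " << nss.ns());
    }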
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
index 78dbafcae2d..155be7189d5 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
@@ -153,8 +153,7 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx,
if (!cm) {
// No chunk manager, so unsharded.
- // Exclusive collection lock needed since we're now changing the metadata
- AutoGetCollection autoColl(opCtx, nss, MODE_X);
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
CollectionShardingRuntime::get(opCtx, nss)
->setFilteringMetadata(opCtx, CollectionMetadata());
@@ -181,7 +180,7 @@ ChunkVersion forceShardFilteringMetadataRefresh(OperationContext* opCtx,
}
- // Exclusive collection lock needed since we're now changing the metadata
+ // An IX collection lock now suffices: setFilteringMetadata serializes metadata
+ // changes through the exclusive CSRLock instead of the collection X lock
- AutoGetCollection autoColl(opCtx, nss, MODE_X);
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
auto* const css = CollectionShardingRuntime::get(opCtx, nss);
{
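Taken together, the lock-mode downgrades in this file are what the new jstest exercises: a prepared transaction parks while holding its collection locks in MODE_IX, and the refresh path now also requests MODE_IX rather than MODE_X. In standard lock-compatibility terms (the matrix itself is not part of this diff):

    // Before: refresh requested MODE_X while the prepared txn held MODE_IX; X conflicts
    //         with IX, so the refresh (and the insert waiting on it) blocked indefinitely.
    // After:  refresh requests MODE_IX; IX is compatible with IX, so the insert's metadata
    //         refresh completes within its maxTimeMS even while the txn stays prepared.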