diff options
author | Brett Nawrocki <brett.nawrocki@mongodb.com> | 2021-09-16 20:28:58 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-09-21 22:10:12 +0000 |
commit | 31f1068f5273f1dd55378eda3fb4bdc441e0a15e (patch) | |
tree | 534b79539507bfc77a9f3e523b44880fca9318a0 | |
parent | 14b059e544578774d7f7ee3aacdc6ac47a15bb4c (diff) | |
download | mongo-31f1068f5273f1dd55378eda3fb4bdc441e0a15e.tar.gz |
SERVER-60094 Add shard version to internal write
ReshardingOplogApplicationRules::_applyDelete_inlock() has special logic
to, upon removing a document from the temporary resharding collection,
move a document from one of the other stash collections (i.e. one
associated with a different donor shard) into its place. The
runWithTransaction() helper does so by using AlternativeSessionRegion
which will construct a separate OperationContext. The write being
performed in this separate OperationContext won't have shard version
attached for the temporary resharding collection like the original one
had. Therefore, call initializeClientRoutingVersions() on this context
to set the version.
5 files changed, 72 insertions, 23 deletions
diff --git a/jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js b/jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js index 723c4c1fc3c..f7360d2430c 100644 --- a/jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js +++ b/jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js @@ -39,6 +39,13 @@ reshardingTest.withReshardingInBackground( // // recovered from the config server. reshardingTest.stepUpNewPrimaryOnShard(recipientShardNames[0]); assert.commandWorked(sourceCollection.insert({oldKey: 1, newKey: 2})); + + /* TODO SERVER-59721: Enable tests for update and remove + reshardingTest.stepUpNewPrimaryOnShard(recipientShardNames[0]); + assert.commandWorked(sourceCollection.update({oldKey: 1, newKey: 2}, {$set: {extra: 3}})); + + reshardingTest.stepUpNewPrimaryOnShard(recipientShardNames[0]); + assert.commandWorked(sourceCollection.remove({oldKey: 1, newKey: 2}, {justOne: true})); */ }); reshardingTest.teardown(); diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp index bc21e6a5ef8..77f71f6d822 100644 --- a/src/mongo/db/catalog_raii.cpp +++ b/src/mongo/db/catalog_raii.cpp @@ -131,8 +131,7 @@ AutoGetCollection::AutoGetCollection(OperationContext* opCtx, _coll = catalog->lookupCollectionByNamespace(opCtx, _resolvedNss); invariant(!nsOrUUID.uuid() || _coll, str::stream() << "Collection for " << _resolvedNss.ns() - << " disappeared after successufully resolving " - << nsOrUUID.toString()); + << " disappeared after successfully resolving " << nsOrUUID.toString()); if (_coll) { // If we are in a transaction, we cannot yield and wait when there are pending catalog diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index f704bb10181..9bce751d916 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -45,6 +45,7 @@ #include "mongo/db/query/get_executor.h" #include "mongo/db/query/plan_executor.h" #include "mongo/db/repl/oplog_applier_utils.h" +#include "mongo/db/s/operation_sharding_state.h" #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_server_parameters_gen.h" #include "mongo/db/session_catalog_mongod.h" @@ -59,7 +60,9 @@ Date_t getDeadline(OperationContext* opCtx) { Milliseconds(resharding::gReshardingOplogApplierMaxLockRequestTimeoutMillis.load()); } -void runWithTransaction(OperationContext* opCtx, unique_function<void(OperationContext*)> func) { +void runWithTransaction(OperationContext* opCtx, + const NamespaceString& nss, + unique_function<void(OperationContext*)> func) { AlternativeSessionRegion asr(opCtx); auto* const client = asr.opCtx()->getClient(); { @@ -73,6 +76,13 @@ void runWithTransaction(OperationContext* opCtx, unique_function<void(OperationC asr.opCtx()->setTxnNumber(txnNumber); asr.opCtx()->setInMultiDocumentTransaction(); + // ReshardingOpObserver depends on the collection metadata being known when processing writes to + // the temporary resharding collection. We attach shard version IGNORED to the write operations + // and leave it to ReshardingOplogBatchApplier::applyBatch() to retry on a StaleConfig exception + // to allow the collection metadata information to be recovered. + auto& oss = OperationShardingState::get(asr.opCtx()); + oss.initializeClientRoutingVersions(nss, ChunkVersion::IGNORED(), boost::none); + MongoDOperationContextSession ocs(asr.opCtx()); auto txnParticipant = TransactionParticipant::get(asr.opCtx()); @@ -423,7 +433,7 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt // We must run 'findByIdAndNoopUpdate' in the same storage transaction as the ops run in the // single replica set transaction that is executed if we apply rule #4, so we therefore must run // 'findByIdAndNoopUpdate' as a part of the single replica set transaction. - runWithTransaction(opCtx, [this, idQuery](OperationContext* opCtx) { + runWithTransaction(opCtx, _outputNss, [this, idQuery](OperationContext* opCtx) { AutoGetCollection autoCollOutput(opCtx, _outputNss, MODE_IX, diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp index d43a94b708c..ee732a68207 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp @@ -82,7 +82,7 @@ SemiFuture<void> ReshardingOplogBatchApplier::applyBatch( } else { // ReshardingOpObserver depends on the collection metadata being known // when processing writes to the temporary resharding collection. We - // attach shard version IGNORED to the insert operations and retry once + // attach shard version IGNORED to the write operations and retry once // on a StaleConfig exception to allow the collection metadata // information to be recovered. auto& oss = OperationShardingState::get(opCtx.get()); diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp index 02889248639..4be2c87a008 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp @@ -45,11 +45,13 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/s/collection_sharding_runtime.h" #include "mongo/db/s/op_observer_sharding_impl.h" #include "mongo/db/s/resharding/resharding_data_copy_util.h" #include "mongo/db/s/resharding/resharding_metrics.h" #include "mongo/db/s/resharding/resharding_oplog_application.h" #include "mongo/db/s/resharding_util.h" +#include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/s/catalog/type_chunk.h" @@ -64,11 +66,11 @@ public: void setUp() override { ServiceContextMongoDTest::setUp(); - auto serviceContext = getServiceContext(); - // Initialize sharding components as a shard server. serverGlobalParams.clusterRole = ClusterRole::ShardServer; + auto serviceContext = getServiceContext(); + ShardingState::get(serviceContext)->setInitialized(_myDonorId.toString(), OID::gen()); { auto opCtx = makeOperationContext(); auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext); @@ -100,6 +102,14 @@ public: opCtx.get(), nss, CollectionOptions{}); } + { + AutoGetCollection autoColl(opCtx.get(), _outputNss, MODE_X); + CollectionShardingRuntime::get(opCtx.get(), _outputNss) + ->setFilteringMetadata( + opCtx.get(), + CollectionMetadata(makeChunkManagerForOutputCollection(), _myDonorId)); + } + _metrics = std::make_unique<ReshardingMetrics>(getServiceContext()); _applier = std::make_unique<ReshardingOplogApplicationRules>( _outputNss, @@ -235,6 +245,30 @@ public: } private: + ChunkManager makeChunkManager(const OID& epoch, + const ShardId& shardId, + const NamespaceString& nss, + const UUID& uuid, + const BSONObj& shardKey, + const std::vector<ChunkType>& chunks) { + auto rt = RoutingTableHistory::makeNew(nss, + uuid, + shardKey, + nullptr /* defaultCollator */, + false /* unique */, + epoch, + Timestamp(), + boost::none /* timeseriesFields */, + boost::none /* reshardingFields */, + boost::none /* chunkSizeBytes */, + true /* allowMigrations */, + chunks); + return ChunkManager(shardId, + DatabaseVersion(UUID::gen(), Timestamp()), + makeStandaloneRoutingTableHistory(std::move(rt)), + boost::none /* clusterTime */); + } + ChunkManager makeChunkManagerForSourceCollection() { // Create three chunks, two that are owned by this donor shard and one owned by some other // shard. The chunk for {sk: null} is owned by this donor shard to allow test cases to omit @@ -257,23 +291,21 @@ private: ChunkVersion(100, 2, epoch, Timestamp()), _myDonorId}}; - auto rt = RoutingTableHistory::makeNew(_sourceNss, - _sourceUUID, - BSON(_currentShardKey << 1), - nullptr /* defaultCollator */, - false /* unique */, - std::move(epoch), - Timestamp(), - boost::none /* timeseriesFields */, - boost::none /* reshardingFields */, - boost::none /* chunkSizeBytes */, - true /* allowMigrations */, - chunks); + return makeChunkManager( + epoch, _myDonorId, _sourceNss, _sourceUUID, BSON(_currentShardKey << 1), chunks); + } - return ChunkManager(_myDonorId, - DatabaseVersion(UUID::gen(), Timestamp()), - makeStandaloneRoutingTableHistory(std::move(rt)), - boost::none /* clusterTime */); + ChunkManager makeChunkManagerForOutputCollection() { + const OID epoch = OID::gen(); + const CollectionUUID outputUuid = UUID::gen(); + std::vector<ChunkType> chunks = { + ChunkType{outputUuid, + ChunkRange{BSON(_newShardKey << MINKEY), BSON(_newShardKey << MAXKEY)}, + ChunkVersion(100, 0, epoch, Timestamp()), + _myDonorId}}; + + return makeChunkManager( + epoch, _myDonorId, _outputNss, outputUuid, BSON(_newShardKey << 1), chunks); } RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt) { @@ -283,6 +315,7 @@ private: } const StringData _currentShardKey = "sk"; + const StringData _newShardKey = "new_sk"; const NamespaceString _sourceNss{"test_crud", "collection_being_resharded"}; const CollectionUUID _sourceUUID = UUID::gen(); |