author     Brett Nawrocki <brett.nawrocki@mongodb.com>       2021-09-16 20:28:58 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-09-21 22:10:12 +0000
commit     31f1068f5273f1dd55378eda3fb4bdc441e0a15e (patch)
tree       534b79539507bfc77a9f3e523b44880fca9318a0
parent     14b059e544578774d7f7ee3aacdc6ac47a15bb4c (diff)
SERVER-60094 Add shard version to internal write
ReshardingOplogApplicationRules::_applyDelete_inlock() has special logic to, upon removing a document from the temporary resharding collection, move a document from one of the other stash collections (i.e. one associated with a different donor shard) into its place. The runWithTransaction() helper performs this move using AlternativeSessionRegion, which constructs a separate OperationContext. The write performed through this separate OperationContext won't have a shard version attached for the temporary resharding collection the way the original operation did. Therefore, call initializeClientRoutingVersions() on this context to set the version.
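In essence, the patched helper attaches ChunkVersion::IGNORED for the output namespace to the alternate session's OperationContext before the transaction body runs. An abridged sketch of the patched runWithTransaction() (elisions marked; see the full diff below):

void runWithTransaction(OperationContext* opCtx,
                        const NamespaceString& nss,
                        unique_function<void(OperationContext*)> func) {
    // AlternativeSessionRegion creates a fresh Client and OperationContext,
    // which starts out with no routing information attached.
    AlternativeSessionRegion asr(opCtx);
    ...
    // Attach shard version IGNORED so ReshardingOpObserver can resolve the
    // collection metadata; ReshardingOplogBatchApplier::applyBatch() retries
    // once on a StaleConfig exception to let the metadata be recovered.
    auto& oss = OperationShardingState::get(asr.opCtx());
    oss.initializeClientRoutingVersions(nss, ChunkVersion::IGNORED(), boost::none);
    ...
}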
-rw-r--r--  jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js   |  7
-rw-r--r--  src/mongo/db/catalog_raii.cpp                                         |  3
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_application.cpp            | 14
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp          |  2
-rw-r--r--  src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp  | 69
5 files changed, 72 insertions(+), 23 deletions(-)
diff --git a/jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js b/jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js
index 723c4c1fc3c..f7360d2430c 100644
--- a/jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js
+++ b/jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js
@@ -39,6 +39,13 @@ reshardingTest.withReshardingInBackground( //
// recovered from the config server.
reshardingTest.stepUpNewPrimaryOnShard(recipientShardNames[0]);
assert.commandWorked(sourceCollection.insert({oldKey: 1, newKey: 2}));
+
+ /* TODO SERVER-59721: Enable tests for update and remove
+ reshardingTest.stepUpNewPrimaryOnShard(recipientShardNames[0]);
+ assert.commandWorked(sourceCollection.update({oldKey: 1, newKey: 2}, {$set: {extra: 3}}));
+
+ reshardingTest.stepUpNewPrimaryOnShard(recipientShardNames[0]);
+ assert.commandWorked(sourceCollection.remove({oldKey: 1, newKey: 2}, {justOne: true})); */
});
reshardingTest.teardown();
diff --git a/src/mongo/db/catalog_raii.cpp b/src/mongo/db/catalog_raii.cpp
index bc21e6a5ef8..77f71f6d822 100644
--- a/src/mongo/db/catalog_raii.cpp
+++ b/src/mongo/db/catalog_raii.cpp
@@ -131,8 +131,7 @@ AutoGetCollection::AutoGetCollection(OperationContext* opCtx,
_coll = catalog->lookupCollectionByNamespace(opCtx, _resolvedNss);
invariant(!nsOrUUID.uuid() || _coll,
str::stream() << "Collection for " << _resolvedNss.ns()
- << " disappeared after successufully resolving "
- << nsOrUUID.toString());
+ << " disappeared after successfully resolving " << nsOrUUID.toString());
if (_coll) {
// If we are in a transaction, we cannot yield and wait when there are pending catalog
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
index f704bb10181..9bce751d916 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/repl/oplog_applier_utils.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_server_parameters_gen.h"
#include "mongo/db/session_catalog_mongod.h"
@@ -59,7 +60,9 @@ Date_t getDeadline(OperationContext* opCtx) {
Milliseconds(resharding::gReshardingOplogApplierMaxLockRequestTimeoutMillis.load());
}
-void runWithTransaction(OperationContext* opCtx, unique_function<void(OperationContext*)> func) {
+void runWithTransaction(OperationContext* opCtx,
+ const NamespaceString& nss,
+ unique_function<void(OperationContext*)> func) {
AlternativeSessionRegion asr(opCtx);
auto* const client = asr.opCtx()->getClient();
{
@@ -73,6 +76,13 @@ void runWithTransaction(OperationContext* opCtx, unique_function<void(OperationC
asr.opCtx()->setTxnNumber(txnNumber);
asr.opCtx()->setInMultiDocumentTransaction();
+ // ReshardingOpObserver depends on the collection metadata being known when processing writes to
+ // the temporary resharding collection. We attach shard version IGNORED to the write operations
+ // and leave it to ReshardingOplogBatchApplier::applyBatch() to retry on a StaleConfig exception
+ // to allow the collection metadata information to be recovered.
+ auto& oss = OperationShardingState::get(asr.opCtx());
+ oss.initializeClientRoutingVersions(nss, ChunkVersion::IGNORED(), boost::none);
+
MongoDOperationContextSession ocs(asr.opCtx());
auto txnParticipant = TransactionParticipant::get(asr.opCtx());
@@ -423,7 +433,7 @@ void ReshardingOplogApplicationRules::_applyDelete_inlock(OperationContext* opCt
// We must run 'findByIdAndNoopUpdate' in the same storage transaction as the ops run in the
// single replica set transaction that is executed if we apply rule #4, so we therefore must run
// 'findByIdAndNoopUpdate' as a part of the single replica set transaction.
- runWithTransaction(opCtx, [this, idQuery](OperationContext* opCtx) {
+ runWithTransaction(opCtx, _outputNss, [this, idQuery](OperationContext* opCtx) {
AutoGetCollection autoCollOutput(opCtx,
_outputNss,
MODE_IX,
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp
index d43a94b708c..ee732a68207 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier.cpp
@@ -82,7 +82,7 @@ SemiFuture<void> ReshardingOplogBatchApplier::applyBatch(
} else {
// ReshardingOpObserver depends on the collection metadata being known
// when processing writes to the temporary resharding collection. We
- // attach shard version IGNORED to the insert operations and retry once
+ // attach shard version IGNORED to the write operations and retry once
// on a StaleConfig exception to allow the collection metadata
// information to be recovered.
auto& oss = OperationShardingState::get(opCtx.get());
diff --git a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
index 02889248639..4be2c87a008 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_crud_application_test.cpp
@@ -45,11 +45,13 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
#include "mongo/db/s/op_observer_sharding_impl.h"
#include "mongo/db/s/resharding/resharding_data_copy_util.h"
#include "mongo/db/s/resharding/resharding_metrics.h"
#include "mongo/db/s/resharding/resharding_oplog_application.h"
#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/s/catalog/type_chunk.h"
@@ -64,11 +66,11 @@ public:
void setUp() override {
ServiceContextMongoDTest::setUp();
- auto serviceContext = getServiceContext();
-
// Initialize sharding components as a shard server.
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ auto serviceContext = getServiceContext();
+ ShardingState::get(serviceContext)->setInitialized(_myDonorId.toString(), OID::gen());
{
auto opCtx = makeOperationContext();
auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(serviceContext);
@@ -100,6 +102,14 @@ public:
opCtx.get(), nss, CollectionOptions{});
}
+ {
+ AutoGetCollection autoColl(opCtx.get(), _outputNss, MODE_X);
+ CollectionShardingRuntime::get(opCtx.get(), _outputNss)
+ ->setFilteringMetadata(
+ opCtx.get(),
+ CollectionMetadata(makeChunkManagerForOutputCollection(), _myDonorId));
+ }
+
_metrics = std::make_unique<ReshardingMetrics>(getServiceContext());
_applier = std::make_unique<ReshardingOplogApplicationRules>(
_outputNss,
@@ -235,6 +245,30 @@ public:
}
private:
+ ChunkManager makeChunkManager(const OID& epoch,
+ const ShardId& shardId,
+ const NamespaceString& nss,
+ const UUID& uuid,
+ const BSONObj& shardKey,
+ const std::vector<ChunkType>& chunks) {
+ auto rt = RoutingTableHistory::makeNew(nss,
+ uuid,
+ shardKey,
+ nullptr /* defaultCollator */,
+ false /* unique */,
+ epoch,
+ Timestamp(),
+ boost::none /* timeseriesFields */,
+ boost::none /* reshardingFields */,
+ boost::none /* chunkSizeBytes */,
+ true /* allowMigrations */,
+ chunks);
+ return ChunkManager(shardId,
+ DatabaseVersion(UUID::gen(), Timestamp()),
+ makeStandaloneRoutingTableHistory(std::move(rt)),
+ boost::none /* clusterTime */);
+ }
+
ChunkManager makeChunkManagerForSourceCollection() {
// Create three chunks, two that are owned by this donor shard and one owned by some other
// shard. The chunk for {sk: null} is owned by this donor shard to allow test cases to omit
@@ -257,23 +291,21 @@ private:
ChunkVersion(100, 2, epoch, Timestamp()),
_myDonorId}};
- auto rt = RoutingTableHistory::makeNew(_sourceNss,
- _sourceUUID,
- BSON(_currentShardKey << 1),
- nullptr /* defaultCollator */,
- false /* unique */,
- std::move(epoch),
- Timestamp(),
- boost::none /* timeseriesFields */,
- boost::none /* reshardingFields */,
- boost::none /* chunkSizeBytes */,
- true /* allowMigrations */,
- chunks);
+ return makeChunkManager(
+ epoch, _myDonorId, _sourceNss, _sourceUUID, BSON(_currentShardKey << 1), chunks);
+ }
- return ChunkManager(_myDonorId,
- DatabaseVersion(UUID::gen(), Timestamp()),
- makeStandaloneRoutingTableHistory(std::move(rt)),
- boost::none /* clusterTime */);
+ ChunkManager makeChunkManagerForOutputCollection() {
+ const OID epoch = OID::gen();
+ const CollectionUUID outputUuid = UUID::gen();
+ std::vector<ChunkType> chunks = {
+ ChunkType{outputUuid,
+ ChunkRange{BSON(_newShardKey << MINKEY), BSON(_newShardKey << MAXKEY)},
+ ChunkVersion(100, 0, epoch, Timestamp()),
+ _myDonorId}};
+
+ return makeChunkManager(
+ epoch, _myDonorId, _outputNss, outputUuid, BSON(_newShardKey << 1), chunks);
}
RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt) {
@@ -283,6 +315,7 @@ private:
}
const StringData _currentShardKey = "sk";
+ const StringData _newShardKey = "new_sk";
const NamespaceString _sourceNss{"test_crud", "collection_being_resharded"};
const CollectionUUID _sourceUUID = UUID::gen();