author    | Allison Easton <allison.easton@mongodb.com>      | 2022-08-08 15:14:03 +0000
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-08 15:42:02 +0000
commit    | c0e5ce18648ccb6356c00db4a5fca05e24a9bfcd (patch)
tree      | 7d89c71118ff53b0d4e10f0a21113cecebcdd032
parent    | d32fafd0ec9829d1c6c1a9c3bfb2f1dd15004688 (diff)
download  | mongo-c0e5ce18648ccb6356c00db4a5fca05e24a9bfcd.tar.gz
SERVER-56185 Investigate possible improvements with session migration and a chunk migration's critical section
(cherry picked from commit 6e8bf2cc701d81c11ad3b898cb4c394933034ac3)
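In rough terms, this change makes the donor enter a chunk migration's critical section only once the untransferred session-catalog backlog is small enough: the backlog is estimated as the number of pending session oplog entries times the average config.transactions document size, and the allowed cap scales BSONObjMaxUserSize by maxChunkSizeBytes / ChunkSizeSettingsType::kDefaultMaxChunkSizeBytes. Below is a minimal standalone sketch of that gating arithmetic only, assuming the usual 16 MB BSON user-size limit and 64 MB default chunk size and using made-up backlog numbers; the authoritative check is MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus() in the diff that follows.

// Sketch only: illustrates the catch-up gating arithmetic added by this patch.
// All concrete numbers below are assumptions for the example, not server state.
#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
    const int64_t kBSONObjMaxUserSize = 16 * 1024 * 1024;        // assumed 16 MB BSON limit
    const int64_t kDefaultMaxChunkSizeBytes = 64 * 1024 * 1024;  // assumed 64 MB default chunk size
    const int64_t maxChunkSizeBytes = 64 * 1024 * 1024;          // migration's configured chunk size
    const int maxCatchUpPercentageBeforeBlockingWrites = 10;     // assumed threshold

    // Made-up backlog figures standing in for the donor's counters.
    const int64_t estimatedUntransferredModsSize = 1 * 1024 * 1024;  // pending deletes/upserts
    const int64_t estimateUntransferredSessionsSize = 512 * 1024;    // pending session entries

    // Same shape as the patched check: percentage of a chunk still to drain...
    const int64_t estimatedUntransferredChunkPercentage =
        (std::min(maxChunkSizeBytes, estimatedUntransferredModsSize) * 100) / maxChunkSizeBytes;
    // ...and the new cap on untransferred session data, scaled by the chunk size.
    const int64_t maxUntransferredSessionsSize =
        kBSONObjMaxUserSize * maxChunkSizeBytes / kDefaultMaxChunkSizeBytes;

    const bool enterCriticalSection =
        estimatedUntransferredChunkPercentage < maxCatchUpPercentageBeforeBlockingWrites &&
        estimateUntransferredSessionsSize < maxUntransferredSessionsSize;
    std::cout << "enter critical section: " << std::boolalpha << enterCriticalSection << "\n";
    return 0;
}

With these illustrative numbers the donor would proceed; a session backlog at or above maxUntransferredSessionsSize would keep it in the catch-up phase instead.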
-rw-r--r-- | src/mongo/bson/bsonobj.h                                      |   2
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp       |  16
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp  |   4
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source.cpp           |  17
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source.h             |  12
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source_test.cpp      | 101
6 files changed, 148 insertions, 4 deletions
diff --git a/src/mongo/bson/bsonobj.h b/src/mongo/bson/bsonobj.h
index f22c9391bb6..230ddc84ff4 100644
--- a/src/mongo/bson/bsonobj.h
+++ b/src/mongo/bson/bsonobj.h
@@ -121,7 +121,7 @@ public:
     using ComparisonRules = BSONComparatorInterfaceBase<BSONObj>::ComparisonRules;
     using ComparisonRulesSet = BSONComparatorInterfaceBase<BSONObj>::ComparisonRulesSet;
 
-    static const char kMinBSONLength = 5;
+    static constexpr char kMinBSONLength = 5;
 
     /** Construct an empty BSONObj -- that is, {}. */
     BSONObj() {
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 2ed87344465..ee26d0d0edf 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -55,6 +55,7 @@
 #include "mongo/executor/task_executor_pool.h"
 #include "mongo/logv2/log.h"
 #include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/balancer_configuration.h"
 #include "mongo/s/client/shard_registry.h"
 #include "mongo/s/grid.h"
 #include "mongo/util/elapsed_tracker.h"
@@ -1131,7 +1132,8 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
                   "docsRemainingToClone"_attr = cloneLocsRemaining);
     }
 
-    if (res["state"].String() == "steady") {
+    if (res["state"].String() == "steady" && _sessionCatalogSource->inCatchupPhase() &&
+        _sessionCatalogSource->untransferredCatchUpDataSize() == 0) {
         if (cloneLocsRemaining != 0 ||
             (_jumboChunkCloneState && _forceJumbo &&
              PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) {
@@ -1156,14 +1158,20 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
         supportsCriticalSectionDuringCatchUp = true;
     }
 
-    if (res["state"].String() == "catchup" && supportsCriticalSectionDuringCatchUp) {
+    if ((res["state"].String() == "steady" || res["state"].String() == "catchup") &&
+        _sessionCatalogSource->inCatchupPhase() && supportsCriticalSectionDuringCatchUp) {
         int64_t estimatedUntransferredModsSize = _untransferredDeletesCounter * _averageObjectIdSize +
             _untransferredUpsertsCounter * _averageObjectSizeForCloneLocs;
         auto estimatedUntransferredChunkPercentage =
             (std::min(_args.getMaxChunkSizeBytes(), estimatedUntransferredModsSize) * 100) /
             _args.getMaxChunkSizeBytes();
-        if (estimatedUntransferredChunkPercentage < maxCatchUpPercentageBeforeBlockingWrites) {
+        int64_t estimateUntransferredSessionsSize =
+            _sessionCatalogSource->untransferredCatchUpDataSize();
+        int64_t maxUntransferredSessionsSize = BSONObjMaxUserSize *
+            _args.getMaxChunkSizeBytes() / ChunkSizeSettingsType::kDefaultMaxChunkSizeBytes;
+        if (estimatedUntransferredChunkPercentage < maxCatchUpPercentageBeforeBlockingWrites &&
+            estimateUntransferredSessionsSize < maxUntransferredSessionsSize) {
             // The recipient is sufficiently caught-up with the writes on the donor.
             // Block writes, so that it can drain everything.
             LOGV2_DEBUG(5630700,
@@ -1173,6 +1181,8 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
                         "_untransferredDeletesCounter"_attr = _untransferredDeletesCounter,
                         "_averageObjectSizeForCloneLocs"_attr = _averageObjectSizeForCloneLocs,
                         "_averageObjectIdSize"_attr = _averageObjectIdSize,
+                        "untransferredSessionDataInBytes"_attr =
+                            estimateUntransferredSessionsSize,
                         "maxChunksSizeBytes"_attr = _args.getMaxChunkSizeBytes(),
                         "_sessionId"_attr = _sessionId.toString());
             return Status::OK();
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index 8750348e8cc..feff389a508 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -71,6 +71,10 @@ protected:
     void setUp() override {
         ShardServerTestFixture::setUp();
 
+        auto opCtx = operationContext();
+        DBDirectClient client(opCtx);
+        client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+
         // TODO: SERVER-26919 set the flag on the mock repl coordinator just for the window where it
         // actually needs to bypass the op observer.
         replicationCoordinator()->alwaysAllowWrites(true);
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 4061f8735f6..8249e2abc54 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -228,6 +228,14 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o
         WriteConcernOptions majority(
             WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0);
         uassertStatusOK(waitForWriteConcern(opCtx, opTimeToWait, majority, &result));
+
+        AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS);
+        Collection* const collection = autoColl.getCollection();
+        // Session docs contain at least LSID, TxnNumber, Timestamp, and some BSON overhead.
+        const int64_t defaultSessionDocSize =
+            sizeof(LogicalSessionId) + sizeof(TxnNumber) + sizeof(Timestamp) + 16;
+        _averageSessionDocSize =
+            collection ? collection->averageObjectSize(opCtx) : defaultSessionDocSize;
     }
 
 bool SessionCatalogMigrationSource::hasMoreOplog() {
@@ -239,6 +247,15 @@ bool SessionCatalogMigrationSource::hasMoreOplog() {
     return _hasNewWrites(lk);
 }
 
+bool SessionCatalogMigrationSource::inCatchupPhase() {
+    return !_hasMoreOplogFromSessionCatalog();
+}
+
+int64_t SessionCatalogMigrationSource::untransferredCatchUpDataSize() {
+    invariant(inCatchupPhase());
+    return _newWriteOpTimeList.size() * _averageSessionDocSize;
+}
+
 void SessionCatalogMigrationSource::onCommitCloneStarted() {
     stdx::lock_guard<Latch> _lk(_newOplogMutex);
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 9e138af12a1..32b9db37111 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -103,6 +103,16 @@ public:
     bool hasMoreOplog();
 
     /**
+     * Returns true if the majority committed oplog entries are drained and false otherwise.
+     */
+    bool inCatchupPhase();
+
+    /**
+     * Returns the estimated bytes of data left to transfer in _newWriteOpTimeList.
+     */
+    int64_t untransferredCatchUpDataSize();
+
+    /**
      * Attempts to fetch the next oplog entry. Returns true if it was able to fetch anything.
      */
     bool fetchNextOplog(OperationContext* opCtx);
@@ -251,6 +261,8 @@ private:
     // Protects _newWriteTsList, _lastFetchedNewWriteOplog, _state, _newOplogNotification
     Mutex _newOplogMutex = MONGO_MAKE_LATCH("SessionCatalogMigrationSource::_newOplogMutex");
 
+    // The average size of documents in config.transactions.
+    uint64_t _averageSessionDocSize;
 
     // Stores oplog opTime of new writes that are coming in.
     std::list<std::pair<repl::OpTime, EntryAtOpTimeType>> _newWriteOpTimeList;
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index 98e02f6b8c0..b1aa6686080 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -117,6 +117,8 @@ TEST_F(SessionCatalogMigrationSourceTest, NoSessionsToTransferShouldNotHaveOplog
     SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
     ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
     ASSERT_FALSE(migrationSource.hasMoreOplog());
+    ASSERT_TRUE(migrationSource.inCatchupPhase());
+    ASSERT_EQ(0, migrationSource.untransferredCatchUpDataSize());
 }
 
 TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
@@ -1172,5 +1174,104 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite
     ASSERT_FALSE(migrationSource.hasMoreOplog());
 }
 
+TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithCommittedWrites) {
+    DBDirectClient client(opCtx());
+    client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+    // Enter an oplog entry before creating SessionCatalogMigrationSource to set config.transactions
+    // average object size to the size of this entry.
+    auto entry = makeOplogEntry(
+        repl::OpTime(Timestamp(52, 345), 2),  // optime
+        repl::OpTypeEnum::kInsert,            // op type
+        BSON("x" << 0),                       // o
+        boost::none,                          // o2
+        Date_t::now(),                        // wall clock time
+        0,                                    // statement ids
+        repl::OpTime(Timestamp(0, 0), 0));    // optime of previous write within same transaction
+    insertOplogEntry(entry);
+
+    SessionTxnRecord sessionRecord;
+    sessionRecord.setSessionId(makeLogicalSessionIdForTest());
+    sessionRecord.setTxnNum(1);
+    sessionRecord.setLastWriteOpTime(entry.getOpTime());
+    sessionRecord.setLastWriteDate(entry.getWallClockTime());
+
+    client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+    // Check for the initial state of the SessionCatalogMigrationSource, and drain the majority
+    // committed session writes.
+    SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+    ASSERT_TRUE(migrationSource.hasMoreOplog());
+    ASSERT_FALSE(migrationSource.inCatchupPhase());
+    migrationSource.fetchNextOplog(opCtx());
+    migrationSource.getLastFetchedOplog();
+    ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+    ASSERT_FALSE(migrationSource.hasMoreOplog());
+
+    // Test inCatchupPhase() and untransferredCatchUpDataSize() with new writes.
+    auto entry2 = makeOplogEntry(
+        repl::OpTime(Timestamp(53, 345), 2),  // optime
+        repl::OpTypeEnum::kInsert,            // op type
+        BSON("x" << 1),                       // o
+        boost::none,                          // o2
+        Date_t::now(),                        // wall clock time
+        0,                                    // statement ids
+        repl::OpTime(Timestamp(0, 0), 0));    // optime of previous write within same transaction
+    insertOplogEntry(entry2);
+    migrationSource.notifyNewWriteOpTime(
+        entry2.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
+
+    ASSERT_TRUE(migrationSource.hasMoreOplog());
+    ASSERT_TRUE(migrationSource.inCatchupPhase());
+    ASSERT_EQ(migrationSource.untransferredCatchUpDataSize(), sessionRecord.toBSON().objsize());
+
+    auto entry3 = makeOplogEntry(
+        repl::OpTime(Timestamp(54, 345), 2),  // optime
+        repl::OpTypeEnum::kInsert,            // op type
+        BSON("x" << 1),                       // o
+        boost::none,                          // o2
+        Date_t::now(),                        // wall clock time
+        0,                                    // statement ids
+        repl::OpTime(Timestamp(0, 0), 0));    // optime of previous write within same transaction
+    insertOplogEntry(entry3);
+    migrationSource.notifyNewWriteOpTime(
+        entry3.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
+
+    ASSERT_TRUE(migrationSource.hasMoreOplog());
+    ASSERT_TRUE(migrationSource.inCatchupPhase());
+    ASSERT_EQ(migrationSource.untransferredCatchUpDataSize(), 2 * sessionRecord.toBSON().objsize());
+
+    // Drain new writes and check untransferred data size.
+    ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+    ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+    ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+
+    ASSERT_FALSE(migrationSource.hasMoreOplog());
+    ASSERT_TRUE(migrationSource.inCatchupPhase());
+    ASSERT_EQ(0, migrationSource.untransferredCatchUpDataSize());
+}
+
+TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithNoCommittedWrites) {
+    SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+
+    auto entry = makeOplogEntry(
+        repl::OpTime(Timestamp(52, 345), 2),  // optime
+        repl::OpTypeEnum::kInsert,            // op type
+        BSON("x" << 0),                       // o
+        boost::none,                          // o2
+        Date_t::now(),                        // wall clock time
+        0,                                    // statement ids
+        repl::OpTime(Timestamp(0, 0), 0));    // optime of previous write within same transaction
+    insertOplogEntry(entry);
+    migrationSource.notifyNewWriteOpTime(
+        entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
+
+    ASSERT_TRUE(migrationSource.hasMoreOplog());
+    ASSERT_TRUE(migrationSource.inCatchupPhase());
+    // Average object size is default since the config.transactions collection does not exist.
+    const int64_t defaultSessionDocSize =
+        sizeof(LogicalSessionId) + sizeof(TxnNumber) + sizeof(Timestamp) + 16;
+    ASSERT_EQ(migrationSource.untransferredCatchUpDataSize(), defaultSessionDocSize);
+}
+
 }  // namespace
 }  // namespace mongo