author     Allison Easton <allison.easton@mongodb.com>        2022-08-08 15:14:03 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>   2022-08-08 15:42:02 +0000
commit     c0e5ce18648ccb6356c00db4a5fca05e24a9bfcd (patch)
tree       7d89c71118ff53b0d4e10f0a21113cecebcdd032
parent     d32fafd0ec9829d1c6c1a9c3bfb2f1dd15004688 (diff)
SERVER-56185 Investigate possible improvements with session migration and a chunk migration's critical section
(cherry picked from commit 6e8bf2cc701d81c11ad3b898cb4c394933034ac3)
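
At a high level, the patch makes the donor shard account for untransferred session (config.transactions) data before entering the migration critical section: the recipient must report the steady/catchup state, the session catalog source must be in its catch-up phase, and the estimated session backlog must stay under a bound that scales with the configured max chunk size. A minimal standalone C++ sketch of that bound follows; the constant values are assumptions based on common MongoDB defaults (16 MB BSON user size limit, 64 MB default chunk size), not taken from this branch.

#include <cstdint>
#include <iostream>

// Assumed defaults (hedged): BSONObjMaxUserSize = 16 MB, default max chunk size = 64 MB.
constexpr int64_t kBSONObjMaxUserSize = 16 * 1024 * 1024;
constexpr int64_t kDefaultMaxChunkSizeBytes = 64 * 1024 * 1024;

// Mirrors the patch's maxUntransferredSessionsSize expression: the permitted
// backlog of untransferred session data grows linearly with the chunk size.
int64_t maxUntransferredSessionsSize(int64_t maxChunkSizeBytes) {
    return kBSONObjMaxUserSize * maxChunkSizeBytes / kDefaultMaxChunkSizeBytes;
}

int main() {
    // At the assumed default chunk size, the bound equals the full 16 MB BSON limit.
    std::cout << maxUntransferredSessionsSize(kDefaultMaxChunkSizeBytes) << " bytes\n";
    return 0;
}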
-rw-r--r--  src/mongo/bson/bsonobj.h                                         2
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp         16
-rw-r--r--  src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp     4
-rw-r--r--  src/mongo/db/s/session_catalog_migration_source.cpp             17
-rw-r--r--  src/mongo/db/s/session_catalog_migration_source.h               12
-rw-r--r--  src/mongo/db/s/session_catalog_migration_source_test.cpp       101
6 files changed, 148 insertions(+), 4 deletions(-)
diff --git a/src/mongo/bson/bsonobj.h b/src/mongo/bson/bsonobj.h
index f22c9391bb6..230ddc84ff4 100644
--- a/src/mongo/bson/bsonobj.h
+++ b/src/mongo/bson/bsonobj.h
@@ -121,7 +121,7 @@ public:
using ComparisonRules = BSONComparatorInterfaceBase<BSONObj>::ComparisonRules;
using ComparisonRulesSet = BSONComparatorInterfaceBase<BSONObj>::ComparisonRulesSet;
- static const char kMinBSONLength = 5;
+ static constexpr char kMinBSONLength = 5;
/** Construct an empty BSONObj -- that is, {}. */
BSONObj() {
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index 2ed87344465..ee26d0d0edf 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -55,6 +55,7 @@
#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/balancer_configuration.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/util/elapsed_tracker.h"
@@ -1131,7 +1132,8 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
"docsRemainingToClone"_attr = cloneLocsRemaining);
}
- if (res["state"].String() == "steady") {
+ if (res["state"].String() == "steady" && _sessionCatalogSource->inCatchupPhase() &&
+ _sessionCatalogSource->untransferredCatchUpDataSize() == 0) {
if (cloneLocsRemaining != 0 ||
(_jumboChunkCloneState && _forceJumbo &&
PlanExecutor::IS_EOF != _jumboChunkCloneState->clonerState)) {
@@ -1156,14 +1158,20 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
supportsCriticalSectionDuringCatchUp = true;
}
- if (res["state"].String() == "catchup" && supportsCriticalSectionDuringCatchUp) {
+ if ((res["state"].String() == "steady" || res["state"].String() == "catchup") &&
+ _sessionCatalogSource->inCatchupPhase() && supportsCriticalSectionDuringCatchUp) {
int64_t estimatedUntransferredModsSize =
_untransferredDeletesCounter * _averageObjectIdSize +
_untransferredUpsertsCounter * _averageObjectSizeForCloneLocs;
auto estimatedUntransferredChunkPercentage =
(std::min(_args.getMaxChunkSizeBytes(), estimatedUntransferredModsSize) * 100) /
_args.getMaxChunkSizeBytes();
- if (estimatedUntransferredChunkPercentage < maxCatchUpPercentageBeforeBlockingWrites) {
+ int64_t estimateUntransferredSessionsSize =
+ _sessionCatalogSource->untransferredCatchUpDataSize();
+ int64_t maxUntransferredSessionsSize = BSONObjMaxUserSize *
+ _args.getMaxChunkSizeBytes() / ChunkSizeSettingsType::kDefaultMaxChunkSizeBytes;
+ if (estimatedUntransferredChunkPercentage < maxCatchUpPercentageBeforeBlockingWrites &&
+ estimateUntransferredSessionsSize < maxUntransferredSessionsSize) {
// The recipient is sufficiently caught-up with the writes on the donor.
// Block writes, so that it can drain everything.
LOGV2_DEBUG(5630700,
@@ -1173,6 +1181,8 @@ Status MigrationChunkClonerSourceLegacy::_checkRecipientCloningStatus(OperationC
"_untransferredDeletesCounter"_attr = _untransferredDeletesCounter,
"_averageObjectSizeForCloneLocs"_attr = _averageObjectSizeForCloneLocs,
"_averageObjectIdSize"_attr = _averageObjectIdSize,
+ "untransferredSessionDataInBytes"_attr =
+ estimateUntransferredSessionsSize,
"maxChunksSizeBytes"_attr = _args.getMaxChunkSizeBytes(),
"_sessionId"_attr = _sessionId.toString());
return Status::OK();
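
For reference, the gate implemented by the hunk above can be restated as a self-contained predicate. The parameter names below are simplified stand-ins for the cloner's members, and maxCatchUpPercentageBeforeBlockingWrites is passed in explicitly rather than read from a server parameter; this is a sketch, not the actual implementation.

#include <algorithm>
#include <cstdint>
#include <iostream>

// Simplified restatement of the patched catch-up check; all inputs are
// explicit parameters instead of cloner state.
bool recipientSufficientlyCaughtUp(int64_t untransferredModsBytes,
                                   int64_t untransferredSessionBytes,
                                   int64_t maxChunkSizeBytes,
                                   int64_t maxCatchUpPercentage,
                                   int64_t maxUntransferredSessionsBytes) {
    // Clamp the mods estimate to one chunk before converting to a percentage,
    // matching the std::min in the patch.
    const int64_t pct =
        std::min(maxChunkSizeBytes, untransferredModsBytes) * 100 / maxChunkSizeBytes;
    return pct < maxCatchUpPercentage &&
        untransferredSessionBytes < maxUntransferredSessionsBytes;
}

int main() {
    // Example: 1 MB of pending mods and 2 MB of session data against a 64 MB
    // chunk with a 10% catch-up threshold and a 16 MB session bound -> true.
    std::cout << std::boolalpha
              << recipientSufficientlyCaughtUp(1 << 20, 2 << 20, 64 << 20, 10, 16 << 20)
              << "\n";
    return 0;
}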
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
index 8750348e8cc..feff389a508 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy_test.cpp
@@ -71,6 +71,10 @@ protected:
void setUp() override {
ShardServerTestFixture::setUp();
+ auto opCtx = operationContext();
+ DBDirectClient client(opCtx);
+ client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+
// TODO: SERVER-26919 set the flag on the mock repl coordinator just for the window where it
// actually needs to bypass the op observer.
replicationCoordinator()->alwaysAllowWrites(true);
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 4061f8735f6..8249e2abc54 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -228,6 +228,14 @@ SessionCatalogMigrationSource::SessionCatalogMigrationSource(OperationContext* o
WriteConcernOptions majority(
WriteConcernOptions::kMajority, WriteConcernOptions::SyncMode::UNSET, 0);
uassertStatusOK(waitForWriteConcern(opCtx, opTimeToWait, majority, &result));
+
+ AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS);
+ Collection* const collection = autoColl.getCollection();
+ // Session docs contain at least LSID, TxnNumber, Timestamp, and some BSON overhead.
+ const int64_t defaultSessionDocSize =
+ sizeof(LogicalSessionId) + sizeof(TxnNumber) + sizeof(Timestamp) + 16;
+ _averageSessionDocSize =
+ collection ? collection->averageObjectSize(opCtx) : defaultSessionDocSize;
}
bool SessionCatalogMigrationSource::hasMoreOplog() {
@@ -239,6 +247,15 @@ bool SessionCatalogMigrationSource::hasMoreOplog() {
return _hasNewWrites(lk);
}
+bool SessionCatalogMigrationSource::inCatchupPhase() {
+ return !_hasMoreOplogFromSessionCatalog();
+}
+
+int64_t SessionCatalogMigrationSource::untransferredCatchUpDataSize() {
+ invariant(inCatchupPhase());
+ return _newWriteOpTimeList.size() * _averageSessionDocSize;
+}
+
void SessionCatalogMigrationSource::onCommitCloneStarted() {
stdx::lock_guard<Latch> _lk(_newOplogMutex);
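
The new untransferredCatchUpDataSize() is a deliberately coarse estimate: the number of pending new-write opTimes multiplied by the average config.transactions document size sampled at construction, falling back to a small fixed size when the collection does not exist. A standalone sketch with placeholder types follows; the stub type and the byte values are illustrative assumptions, not the real repl::OpTime or sizeof results.

#include <cstdint>
#include <iostream>
#include <list>

// Placeholder for repl::OpTime; only the number of pending entries matters here.
struct OpTimeStub {};

// Fallback in the spirit of the patch's sizeof(LogicalSessionId) +
// sizeof(TxnNumber) + sizeof(Timestamp) + 16; byte values are illustrative.
constexpr int64_t kDefaultSessionDocSize = 48 + 8 + 8 + 16;

int64_t untransferredCatchUpDataSize(const std::list<OpTimeStub>& newWriteOpTimes,
                                     int64_t averageSessionDocSize) {
    return static_cast<int64_t>(newWriteOpTimes.size()) * averageSessionDocSize;
}

int main() {
    std::list<OpTimeStub> pending{OpTimeStub{}, OpTimeStub{}};
    // Two pending writes at the fallback size -> 160 bytes of estimated backlog.
    std::cout << untransferredCatchUpDataSize(pending, kDefaultSessionDocSize) << "\n";
    return 0;
}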
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 9e138af12a1..32b9db37111 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -103,6 +103,16 @@ public:
bool hasMoreOplog();
/**
+ * Returns true if the majority committed oplog entries are drained and false otherwise.
+ */
+ bool inCatchupPhase();
+
+ /**
+ * Returns the estimated bytes of data left to transfer in _newWriteOpTimeList.
+ */
+ int64_t untransferredCatchUpDataSize();
+
+ /**
* Attempts to fetch the next oplog entry. Returns true if it was able to fetch anything.
*/
bool fetchNextOplog(OperationContext* opCtx);
@@ -251,6 +261,8 @@ private:
// Protects _newWriteTsList, _lastFetchedNewWriteOplog, _state, _newOplogNotification
Mutex _newOplogMutex = MONGO_MAKE_LATCH("SessionCatalogMigrationSource::_newOplogMutex");
+ // The average size of documents in config.transactions.
+ uint64_t _averageSessionDocSize;
// Stores oplog opTime of new writes that are coming in.
std::list<std::pair<repl::OpTime, EntryAtOpTimeType>> _newWriteOpTimeList;
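
Note the calling contract implied by the comments above: untransferredCatchUpDataSize() is only meaningful (and, per the invariant in the implementation, only legal) once inCatchupPhase() returns true. A hypothetical caller-side helper illustrating that ordering; SessionSourceStub is an invented stand-in, not the real class.

#include <cstdint>

// Invented stand-in exposing the two new accessors with fixed values.
struct SessionSourceStub {
    bool catchupPhase = true;
    int64_t backlogBytes = 4096;
    bool inCatchupPhase() const { return catchupPhase; }
    int64_t untransferredCatchUpDataSize() const { return backlogBytes; }
};

// Check the phase first, then the backlog, so the size accessor is never
// consulted outside the catch-up phase (mirroring the patch's invariant).
bool sessionBacklogSmallEnough(const SessionSourceStub& source, int64_t maxBacklogBytes) {
    return source.inCatchupPhase() &&
        source.untransferredCatchUpDataSize() < maxBacklogBytes;
}

int main() {
    SessionSourceStub source;
    return sessionBacklogSmallEnough(source, 16 * 1024 * 1024) ? 0 : 1;
}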
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index 98e02f6b8c0..b1aa6686080 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -117,6 +117,8 @@ TEST_F(SessionCatalogMigrationSourceTest, NoSessionsToTransferShouldNotHaveOplog
SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
ASSERT_FALSE(migrationSource.hasMoreOplog());
+ ASSERT_TRUE(migrationSource.inCatchupPhase());
+ ASSERT_EQ(0, migrationSource.untransferredCatchUpDataSize());
}
TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
@@ -1172,5 +1174,104 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWritesContainingWrite
ASSERT_FALSE(migrationSource.hasMoreOplog());
}
+TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithCommittedWrites) {
+ DBDirectClient client(opCtx());
+ client.createCollection(NamespaceString::kSessionTransactionsTableNamespace.ns());
+ // Enter an oplog entry before creating SessionCatalogMigrationSource to set config.transactions
+ // average object size to the size of this entry.
+ auto entry = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kInsert, // op type
+ BSON("x" << 0), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement ids
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry);
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(makeLogicalSessionIdForTest());
+ sessionRecord.setTxnNum(1);
+ sessionRecord.setLastWriteOpTime(entry.getOpTime());
+ sessionRecord.setLastWriteDate(entry.getWallClockTime());
+
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ // Check for the initial state of the SessionCatalogMigrationSource, and drain the majority
+ // committed session writes.
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ ASSERT_FALSE(migrationSource.inCatchupPhase());
+ migrationSource.fetchNextOplog(opCtx());
+ migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_FALSE(migrationSource.hasMoreOplog());
+
+ // Test inCatchupPhase() and untransferredCatchUpDataSize() with new writes.
+ auto entry2 = makeOplogEntry(
+ repl::OpTime(Timestamp(53, 345), 2), // optime
+ repl::OpTypeEnum::kInsert, // op type
+ BSON("x" << 1), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement ids
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry2);
+ migrationSource.notifyNewWriteOpTime(
+ entry2.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
+
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ ASSERT_TRUE(migrationSource.inCatchupPhase());
+ ASSERT_EQ(migrationSource.untransferredCatchUpDataSize(), sessionRecord.toBSON().objsize());
+
+ auto entry3 = makeOplogEntry(
+ repl::OpTime(Timestamp(54, 345), 2), // optime
+ repl::OpTypeEnum::kInsert, // op type
+ BSON("x" << 1), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement ids
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry3);
+ migrationSource.notifyNewWriteOpTime(
+ entry3.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
+
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ ASSERT_TRUE(migrationSource.inCatchupPhase());
+ ASSERT_EQ(migrationSource.untransferredCatchUpDataSize(), 2 * sessionRecord.toBSON().objsize());
+
+ // Drain new writes and check untransferred data size.
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+
+ ASSERT_FALSE(migrationSource.hasMoreOplog());
+ ASSERT_TRUE(migrationSource.inCatchupPhase());
+ ASSERT_EQ(0, migrationSource.untransferredCatchUpDataSize());
+}
+
+TEST_F(SessionCatalogMigrationSourceTest, UntransferredDataSizeWithNoCommittedWrites) {
+ SessionCatalogMigrationSource migrationSource(opCtx(), kNs, kChunkRange, kShardKey);
+
+ auto entry = makeOplogEntry(
+ repl::OpTime(Timestamp(52, 345), 2), // optime
+ repl::OpTypeEnum::kInsert, // op type
+ BSON("x" << 0), // o
+ boost::none, // o2
+ Date_t::now(), // wall clock time
+ 0, // statement ids
+ repl::OpTime(Timestamp(0, 0), 0)); // optime of previous write within same transaction
+ insertOplogEntry(entry);
+ migrationSource.notifyNewWriteOpTime(
+ entry.getOpTime(), SessionCatalogMigrationSource::EntryAtOpTimeType::kRetryableWrite);
+
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ ASSERT_TRUE(migrationSource.inCatchupPhase());
+ // Average object size is default since the config.transactions collection does not exist.
+ const int64_t defaultSessionDocSize =
+ sizeof(LogicalSessionId) + sizeof(TxnNumber) + sizeof(Timestamp) + 16;
+ ASSERT_EQ(migrationSource.untransferredCatchUpDataSize(), defaultSessionDocSize);
+}
+
} // namespace
} // namespace mongo