diff options
4 files changed, 212 insertions, 130 deletions
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 0cacc08e41c..b257bd6bc24 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -146,12 +146,15 @@ public: Status nextModsBatch(OperationContext* opCtx, Database* db, BSONObjBuilder* builder); /** - * Appends to the buffer oplogs that contain session information for this migration. - * If this function returns a valid OpTime, this means that the oplog appended are - * not guaranteed to be majority committed and the caller has to use wait for the - * returned opTime to be majority committed. If the underlying SessionCatalogMigrationSource - * does not exist, that means this node is running as a standalone and doesn't support retryable - * writes, so we return boost::none. + * Appends to 'arrBuilder' oplog entries which wrote to the currently migrated chunk and contain + * session information. + * + * If this function returns a valid OpTime, this means that the oplog entries appended are not + * guaranteed to be majority committed and the caller has to wait for the returned opTime to be + * majority committed before returning them to the destination shard. + * + * If the underlying SessionCatalogMigrationSource does not exist, that means this node is + * running as a standalone and doesn't support retryable writes, so we return boost::none. * * This waiting is necessary because session migration is only allowed to send out committed * entries, as opposed to chunk migration, which can send out uncommitted documents. 
With chunk diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 3b9969bebfb..5353a4e6d5d 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -59,10 +59,11 @@ const WriteConcernOptions kMajorityWC(WriteConcernOptions::kMajority, Milliseconds(0)); struct ProcessOplogResult { - bool isPrePostImage = false; - repl::OpTime oplogTime; LogicalSessionId sessionId; TxnNumber txnNum; + + repl::OpTime oplogTime; + bool isPrePostImage = false; }; /** @@ -140,12 +141,13 @@ repl::OplogLink extractPrePostImageTs(const ProcessOplogResult& lastResult, * Parses the oplog into an oplog entry and makes sure that it contains the expected fields. */ repl::OplogEntry parseOplog(const BSONObj& oplogBSON) { - auto oplogStatus = repl::OplogEntry::parse(oplogBSON); - uassertStatusOK(oplogStatus.getStatus()); + auto oplogEntry = uassertStatusOK(repl::OplogEntry::parse(oplogBSON)); - auto oplogEntry = oplogStatus.getValue(); + // Session oplog entries must always contain wall clock time, because we will not be + // transferring anything from a previous version of the server + invariant(oplogEntry.getWallClockTime()); - auto sessionInfo = oplogEntry.getOperationSessionInfo(); + const auto& sessionInfo = oplogEntry.getOperationSessionInfo(); uassert(ErrorCodes::UnsupportedFormat, str::stream() << "oplog with opTime " << oplogEntry.getTimestamp().toString() @@ -204,8 +206,12 @@ BSONObj getNextSessionOplogBatch(OperationContext* opCtx, ProcessOplogResult processSessionOplog(OperationContext* opCtx, const BSONObj& oplogBSON, const ProcessOplogResult& lastResult) { - ProcessOplogResult result; auto oplogEntry = parseOplog(oplogBSON); + const auto& sessionInfo = oplogEntry.getOperationSessionInfo(); + + ProcessOplogResult result; + result.sessionId = *sessionInfo.getSessionId(); + result.txnNum = *sessionInfo.getTxnNumber(); BSONObj 
object2; if (oplogEntry.getOpType() == repl::OpTypeEnum::kNoop) { @@ -231,22 +237,15 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, << ", oplog ts: " << oplogEntry.getTimestamp().toString() << ": " - << redact(oplogBSON), + << oplogBSON, !lastResult.isPrePostImage); } } else { object2 = oplogBSON; // TODO: strip redundant info? } - const auto& sessionInfo = oplogEntry.getOperationSessionInfo(); - result.sessionId = sessionInfo.getSessionId().value(); - result.txnNum = sessionInfo.getTxnNumber().value(); const auto stmtId = *oplogEntry.getStatementId(); - // Session oplog entries must always contain wall clock time, because we will not be - // transferring anything from a previous version of the server - invariant(oplogEntry.getWallClockTime()); - auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId); scopedSession->beginTxn(opCtx, result.txnNum); @@ -255,6 +254,11 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, return lastResult; } } catch (const DBException& ex) { + // If the transaction chain was truncated on the recipient shard, then we are most likely + // copying from a session that hasn't been touched on the recipient shard for a very long + // time but could be recent on the donor. + // + // We continue copying regardless to get the entire transaction from the donor. 
if (ex.code() != ErrorCodes::IncompleteTransactionHistory) { throw; } @@ -297,7 +301,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, oplogLink, OplogSlot()); - auto oplogOpTime = result.oplogTime; + const auto& oplogOpTime = result.oplogTime; uassert(40633, str::stream() << "Failed to create new oplog entry for oplog with opTime: " << oplogEntry.getOpTime().toString() @@ -341,8 +345,21 @@ void SessionCatalogMigrationDestination::start(ServiceContext* service) { _isStateChanged.notify_all(); } - _thread = stdx::thread(stdx::bind( - &SessionCatalogMigrationDestination::_retrieveSessionStateFromSource, this, service)); + _thread = stdx::thread([=] { + try { + _retrieveSessionStateFromSource(service); + } catch (const DBException& ex) { + if (ex.code() == ErrorCodes::CommandNotFound) { + // TODO: remove this after v3.7 + // + // This means that the donor shard is running at an older version so it is safe to + // just end this because there is no session information to transfer. + return; + } + + _errorOccurred(ex.toString()); + } + }); } void SessionCatalogMigrationDestination::finish() { @@ -390,85 +407,67 @@ void SessionCatalogMigrationDestination::_retrieveSessionStateFromSource(Service } } - try { - auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId); - BSONArray oplogArray(nextBatch[kOplogField].Obj()); - BSONArrayIteratorSorted oplogIter(oplogArray); - - if (!oplogIter.more()) { - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - if (_state == State::Committing) { - // The migration is considered done only when it gets an empty result from - // the source shard while this is in state committing. This is to make sure - // that it doesn't miss any new oplog created between the time window where - // this depleted the buffer from the source shard and receiving the commit - // command. 
- if (oplogDrainedAfterCommiting) { - break; - } - - oplogDrainedAfterCommiting = true; + auto nextBatch = getNextSessionOplogBatch(opCtx, _fromShard, _migrationSessionId); + BSONArray oplogArray(nextBatch[kOplogField].Obj()); + BSONArrayIteratorSorted oplogIter(oplogArray); + + if (!oplogIter.more()) { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_state == State::Committing) { + // The migration is considered done only when it gets an empty result from the + // source shard while this is in state committing. This is to make sure that it + // doesn't miss any new oplog created between the time window where this + // depleted the buffer from the source shard and receiving the commit command. + if (oplogDrainedAfterCommiting) { + break; } - } - - WriteConcernResult wcResult; - auto wcStatus = - waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &wcResult); - if (!wcStatus.isOK()) { - _errorOccurred(wcStatus.toString()); - return; - } - // We depleted the buffer at least once, transition to ready for commit. - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - // Note: only transition to "ready to commit" if state is not error/force stop. - if (_state == State::Migrating) { - _state = State::ReadyToCommit; - _isStateChanged.notify_all(); - } + oplogDrainedAfterCommiting = true; } + } - if (lastOpTimeWaited == lastResult.oplogTime) { - // We got an empty result at least twice in a row from the source shard so - // space it out a little bit so we don't hammer the shard. - opCtx->sleepFor(Milliseconds(200)); + WriteConcernResult unusedWCResult; + uassertStatusOK( + waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult)); + + // We depleted the buffer at least once, transition to ready for commit. + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + // Note: only transition to "ready to commit" if state is not error/force stop. 
+ if (_state == State::Migrating) { + _state = State::ReadyToCommit; + _isStateChanged.notify_all(); } + } - lastOpTimeWaited = lastResult.oplogTime; + if (lastOpTimeWaited == lastResult.oplogTime) { + // We got an empty result at least twice in a row from the source shard so space it + // out a little bit so we don't hammer the shard + opCtx->sleepFor(Milliseconds(200)); } - while (oplogIter.more()) { + lastOpTimeWaited = lastResult.oplogTime; + } + + while (oplogIter.more()) { + try { lastResult = processSessionOplog(opCtx, oplogIter.next().Obj(), lastResult); - } - } catch (const DBException& excep) { - if (excep.code() == ErrorCodes::ConflictingOperationInProgress || - excep.code() == ErrorCodes::TransactionTooOld) { - // This means that the server has a newer txnNumber than the oplog being migrated, - // so just skip it. - continue; - } + } catch (const DBException& ex) { + if (ex.code() == ErrorCodes::ConflictingOperationInProgress || + ex.code() == ErrorCodes::TransactionTooOld) { + // This means that the server has a newer txnNumber than the oplog being + // migrated, so just skip it + continue; + } - if (excep.code() == ErrorCodes::CommandNotFound) { - // TODO: remove this after v3.7 - // - // This means that the donor shard is running at an older version so it is safe to - // just end this because there is no session information to transfer. 
- break; + throw; } - - _errorOccurred(excep.toString()); - return; } } - WriteConcernResult wcResult; - auto wcStatus = waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &wcResult); - if (!wcStatus.isOK()) { - _errorOccurred(wcStatus.toString()); - return; - } + WriteConcernResult unusedWCResult; + uassertStatusOK(waitForWriteConcern(opCtx, lastResult.oplogTime, kMajorityWC, &unusedWCResult)); { stdx::lock_guard<stdx::mutex> lk(_mutex); diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index fe71b57e88c..4614ab30602 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -220,7 +220,7 @@ public: Session* session, TxnNumber txnNumber, StmtId stmtId, - repl::OplogEntry& expectedOplog) { + const repl::OplogEntry& expectedOplog) { auto oplog = session->checkStatementExecuted(opCtx, txnNumber, stmtId); ASSERT_TRUE(oplog); checkOplogWithNestedOplog(expectedOplog, *oplog); @@ -1510,41 +1510,38 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem } TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory) { - const auto sessionId = makeLogicalSessionIdForTest(); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(makeLogicalSessionIdForTest()); + sessionInfo.setTxnNumber(2); + const std::vector<repl::OplogEntry> oplogEntries{ + makeOplogEntry(OpTime(Timestamp(100, 2), 1), // optime + OpTypeEnum::kInsert, // op type + BSON("x" << 100), // o + boost::none, // o2 + sessionInfo, // session info + Date_t::now(), // wall clock time + 23), // statement id + makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime + OpTypeEnum::kNoop, // op type + {}, // o + Session::kDeadEndSentinel, // o2 + sessionInfo, // session info + Date_t::now(), // wall clock time + kIncompleteHistoryStmtId), // statement id + 
makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime + OpTypeEnum::kInsert, // op type + BSON("x" << 60), // o + boost::none, // o2 + sessionInfo, // session info + Date_t::now(), // wall clock time + 5)}; // statement id SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); sessionMigration.start(getServiceContext()); sessionMigration.finish(); - OperationSessionInfo sessionInfo; - sessionInfo.setSessionId(sessionId); - sessionInfo.setTxnNumber(2); - - auto oplog1 = makeOplogEntry(OpTime(Timestamp(100, 2), 1), // optime - OpTypeEnum::kInsert, // op type - BSON("x" << 100), // o - boost::none, // o2 - sessionInfo, // session info - Date_t::now(), // wall clock time - 23); // statement id - - auto oplog2 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime - OpTypeEnum::kNoop, // op type - {}, // o - Session::kDeadEndSentinel, // o2 - sessionInfo, // session info - Date_t::now(), // wall clock time - kIncompleteHistoryStmtId); // statement id + returnOplog(oplogEntries); - auto oplog3 = makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime - OpTypeEnum::kInsert, // op type - BSON("x" << 60), // o - boost::none, // o2 - sessionInfo, // session info - Date_t::now(), // wall clock time - 5); // statement id - - returnOplog({oplog1, oplog2, oplog3}); // migration always fetches at least twice to transition from committing to done. 
returnOplog({}); returnOplog({}); @@ -1553,26 +1550,110 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); - auto opCtx = operationContext(); - auto session = getSessionWithTxn(opCtx, sessionId, 2); + const auto opCtx = operationContext(); + auto session = getSessionWithTxn(opCtx, *sessionInfo.getSessionId(), 2); TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); - checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); + checkOplogWithNestedOplog(oplogEntries[2], historyIter.next(opCtx)); ASSERT_TRUE(historyIter.hasNext()); - checkOplog(oplog2, historyIter.next(opCtx)); + checkOplog(oplogEntries[1], historyIter.next(opCtx)); ASSERT_TRUE(historyIter.hasNext()); - checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); + checkOplogWithNestedOplog(oplogEntries[0], historyIter.next(opCtx)); ASSERT_FALSE(historyIter.hasNext()); - checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1); - checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3); + checkStatementExecuted(opCtx, session.get(), 2, 23, oplogEntries[0]); + checkStatementExecuted(opCtx, session.get(), 2, 5, oplogEntries[2]); ASSERT_THROWS(session->checkStatementExecuted(opCtx, 2, 38), AssertionException); } -} // namespace +TEST_F(SessionCatalogMigrationDestinationTest, + OplogEntriesWithOldTransactionFollowedByUpToDateEntries) { + auto opCtx = operationContext(); + + OperationSessionInfo sessionInfo1; + sessionInfo1.setSessionId(makeLogicalSessionIdForTest()); + sessionInfo1.setTxnNumber(2); + { + // "Start" a new transaction on session 1, so that migrating the entries above will result + // in TransactionTooOld. This should not preclude the entries for session 2 from getting + // applied. 
+ auto scopedSession = + SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, *sessionInfo1.getSessionId()); + scopedSession->refreshFromStorageIfNeeded(opCtx); + scopedSession->beginTxn(opCtx, 3); + } + + OperationSessionInfo sessionInfo2; + sessionInfo2.setSessionId(makeLogicalSessionIdForTest()); + sessionInfo2.setTxnNumber(15); + + const std::vector<repl::OplogEntry> oplogEntries{ + // Session 1 entries + makeOplogEntry(OpTime(Timestamp(100, 2), 1), // optime + OpTypeEnum::kInsert, // op type + BSON("x" << 100), // o + boost::none, // o2 + sessionInfo1, // session info + Date_t::now(), // wall clock time + 23), // statement id + + // Session 2 entries + makeOplogEntry(OpTime(Timestamp(50, 2), 1), // optime + OpTypeEnum::kInsert, // op type + BSON("x" << 50), // o + boost::none, // o2 + sessionInfo2, // session info + Date_t::now(), // wall clock time + 56), // statement id + makeOplogEntry(OpTime(Timestamp(20, 2), 1), // optime + OpTypeEnum::kInsert, // op type + BSON("x" << 20), // o + boost::none, // o2 + sessionInfo2, // session info + Date_t::now(), // wall clock time + 55)}; // statement id + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + returnOplog(oplogEntries); + + // migration always fetches at least twice to transition from committing to done. 
+ returnOplog({}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + // Check nothing was written for session 1 + { + auto session1 = getSessionWithTxn(opCtx, *sessionInfo1.getSessionId(), 3); + ASSERT(session1->getLastWriteOpTime(3).isNull()); + } + + // Check session 2 was correctly updated + { + auto session2 = getSessionWithTxn(opCtx, *sessionInfo2.getSessionId(), 15); + + TransactionHistoryIterator historyIter(session2->getLastWriteOpTime(15)); + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplogEntries[2], historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplogEntries[1], historyIter.next(opCtx)); + + ASSERT_FALSE(historyIter.hasNext()); + checkStatementExecuted(opCtx, session2.get(), 15, 56, oplogEntries[1]); + checkStatementExecuted(opCtx, session2.get(), 15, 55, oplogEntries[2]); + } +} + +} // namespace } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index 5b1e76a1e18..5df4116685b 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -41,15 +41,14 @@ #include "mongo/unittest/unittest.h" namespace mongo { +namespace { using executor::RemoteCommandRequest; -namespace { +const NamespaceString kNs("a.b"); class SessionCatalogMigrationSourceTest : public MockReplCoordServerFixture {}; -const NamespaceString kNs("a.b"); - /** * Creates an OplogEntry with given parameters and preset defaults for this test suite. */ |