diff options
-rw-r--r-- | src/mongo/db/logical_session_id.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/mock_repl_coord_server_fixture.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/repl/mock_repl_coord_server_fixture.h | 2 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp | 41 | ||||
-rw-r--r-- | src/mongo/db/s/migration_chunk_cloner_source_legacy.h | 2 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination_test.cpp | 109 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source.cpp | 105 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source.h | 47 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_source_test.cpp | 131 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/session.h | 6 | ||||
-rw-r--r-- | src/mongo/db/session_test.cpp | 75 |
13 files changed, 541 insertions, 57 deletions
diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h index 24629bdd050..a769c19d0f9 100644 --- a/src/mongo/db/logical_session_id.h +++ b/src/mongo/db/logical_session_id.h @@ -40,7 +40,12 @@ namespace mongo { using TxnNumber = std::int64_t; using StmtId = std::int32_t; +// Default value for unassigned statementId. const StmtId kUninitializedStmtId = -1; + +// Used as a substitute statementId for oplog entries that were truncated and lost. +const StmtId kIncompleteHistoryStmtId = -2; + const TxnNumber kUninitializedTxnNumber = -1; class BSONObjBuilder; diff --git a/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp b/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp index 73c0209864a..d690484b0ce 100644 --- a/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp +++ b/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp @@ -28,18 +28,22 @@ #include "mongo/platform/basic.h" -#include "mongo/db/repl/mock_repl_coord_server_fixture.h" - #include "mongo/db/client.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/mock_repl_coord_server_fixture.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/replication_consistency_markers_mock.h" #include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/replication_recovery_mock.h" +#include "mongo/db/repl/storage_interface_mock.h" #include "mongo/db/service_context.h" #include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/stdx/memory.h" namespace mongo { @@ -48,13 +52,27 @@ void MockReplCoordServerFixture::setUp() { _opCtx = cc().makeOperationContext(); + auto service = getServiceContext(); + + _storageInterface = new repl::StorageInterfaceMock(); + repl::StorageInterface::set(service, + std::unique_ptr<repl::StorageInterface>(_storageInterface)); + ASSERT_TRUE(_storageInterface == repl::StorageInterface::get(service)); + + repl::ReplicationProcess::set(service, + stdx::make_unique<repl::ReplicationProcess>( + _storageInterface, + stdx::make_unique<repl::ReplicationConsistencyMarkersMock>(), + stdx::make_unique<repl::ReplicationRecoveryMock>())); + + ASSERT_OK(repl::ReplicationProcess::get(service)->initializeRollbackID(opCtx())); + // Insert code path assumes existence of repl coordinator! repl::ReplSettings replSettings; replSettings.setReplSetString( ConnectionString::forReplicaSet("sessionTxnStateTest", {HostAndPort("a:1")}).toString()); replSettings.setMaster(true); - auto service = getServiceContext(); repl::ReplicationCoordinator::set( service, stdx::make_unique<repl::ReplicationCoordinatorMock>(service, replSettings)); diff --git a/src/mongo/db/repl/mock_repl_coord_server_fixture.h b/src/mongo/db/repl/mock_repl_coord_server_fixture.h index 71f1fa06fec..fb1b71f4d0a 100644 --- a/src/mongo/db/repl/mock_repl_coord_server_fixture.h +++ b/src/mongo/db/repl/mock_repl_coord_server_fixture.h @@ -37,6 +37,7 @@ class OperationContext; namespace repl { class OplogEntry; +class StorageInterfaceMock; } /** @@ -58,6 +59,7 @@ public: private: ServiceContext::UniqueOperationContext _opCtx; + repl::StorageInterfaceMock* _storageInterface; }; } // namespace mongo diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp index a4c146c1ade..05436465040 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp @@ -164,12 +164,14 @@ public: MONGO_UNREACHABLE; } - if (!_prePostImageOpTime.isNull()) { - _cloner->_sessionCatalogSource.notifyNewWriteOpTime(_prePostImageOpTime); - } + if (auto sessionSource = _cloner->_sessionCatalogSource.get()) { + if (!_prePostImageOpTime.isNull()) { + sessionSource->notifyNewWriteOpTime(_prePostImageOpTime); + } - if (!_opTime.isNull()) { - _cloner->_sessionCatalogSource.notifyNewWriteOpTime(_opTime); + if (!_opTime.isNull()) { + sessionSource->notifyNewWriteOpTime(_opTime); + } } } @@ -192,8 +194,7 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ _sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(), _args.getToShardId().toString())), _donorConnStr(std::move(donorConnStr)), - _recipientHost(std::move(recipientHost)), - _sessionCatalogSource(_args.getNss()) {} + _recipientHost(std::move(recipientHost)) {} MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() { invariant(_state == kDone); @@ -204,7 +205,14 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { invariant(_state == kNew); invariant(!opCtx->lockState()->isLocked()); - _sessionCatalogSource.init(opCtx); + auto const replCoord = repl::ReplicationCoordinator::get(opCtx); + if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) { + _sessionCatalogSource = stdx::make_unique<SessionCatalogMigrationSource>(_args.getNss()); + _sessionCatalogSource->init(opCtx); + + // Prime up the session migration source if there are oplog entries to migrate. + _sessionCatalogSource->fetchNextOplog(opCtx); + } // Load the ids of the currently available documents auto storeCurrentLocsStatus = _storeCurrentLocs(opCtx); @@ -212,9 +220,6 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) { return storeCurrentLocsStatus; } - // Prime up the session migration source if there are oplog entries to migrate. - _sessionCatalogSource.fetchNextOplog(opCtx); - // Tell the recipient shard to start cloning BSONObjBuilder cmdBuilder; StartChunkCloneRequest::appendAsCommand(&cmdBuilder, @@ -338,7 +343,7 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) { if (responseStatus.isOK()) { _cleanup(opCtx); - if (_sessionCatalogSource.hasMoreOplog()) { + if (_sessionCatalogSource && _sessionCatalogSource->hasMoreOplog()) { return {ErrorCodes::SessionTransferIncomplete, "destination shard finished committing but there are still some session " "metadata that needs to be transferred"}; @@ -735,12 +740,16 @@ repl::OpTime MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch( repl::OpTime opTimeToWait; auto seenOpTimeTerm = repl::OpTime::kUninitializedTerm; - while (_sessionCatalogSource.hasMoreOplog()) { - auto result = _sessionCatalogSource.getLastFetchedOplog(); + if (!_sessionCatalogSource) { + return {}; + } + + while (_sessionCatalogSource->hasMoreOplog()) { + auto result = _sessionCatalogSource->getLastFetchedOplog(); if (!result.oplog) { // Last fetched turned out empty, try to see if there are more - _sessionCatalogSource.fetchNextOplog(opCtx); + _sessionCatalogSource->fetchNextOplog(opCtx); continue; } @@ -764,7 +773,7 @@ repl::OpTime MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch( } arrBuilder->append(oplogDoc); - _sessionCatalogSource.fetchNextOplog(opCtx); + _sessionCatalogSource->fetchNextOplog(opCtx); if (result.shouldWaitForMajority) { if (opTimeToWait < newOpTime) { diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h index 5bf180bcea7..1bdad9ee4c7 100644 --- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h +++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h @@ -201,7 +201,7 @@ private: // during the cloning stage std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _deleteNotifyExec; - SessionCatalogMigrationSource _sessionCatalogSource; + std::unique_ptr<SessionCatalogMigrationSource> _sessionCatalogSource; // Protects the entries below stdx::mutex _mutex; diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 863d268257d..cdf0f0a778d 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -245,8 +245,16 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId); scopedSession->beginTxn(opCtx, result.txnNum); - if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) { - return lastResult; + if (stmtId != kIncompleteHistoryStmtId) { + try { + if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) { + return lastResult; + } + } catch (const DBException& excep) { + if (excep.code() != ErrorCodes::IncompleteTransactionHistory) { + throw; + } + } } BSONObj object(result.isPrePostImage @@ -291,8 +299,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, !oplogOpTime.isNull()); // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post - // image, because - // the next oplog will contain the real operation. + // image, because the next oplog will contain the real operation. if (!result.isPrePostImage) { scopedSession->onWriteOpCompletedOnPrimary( opCtx, result.txnNum, {stmtId}, oplogOpTime); diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 2bcc64ce537..053b7f4f28d 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -172,6 +172,24 @@ public: } } + void checkStatementExecuted(OperationContext* opCtx, + Session* session, + TxnNumber txnNumber, + StmtId stmtId) { + auto oplog = session->checkStatementExecuted(opCtx, txnNumber, stmtId); + ASSERT_TRUE(oplog); + } + + void checkStatementExecuted(OperationContext* opCtx, + Session* session, + TxnNumber txnNumber, + StmtId stmtId, + repl::OplogEntry& expectedOplog) { + auto oplog = session->checkStatementExecuted(opCtx, txnNumber, stmtId); + ASSERT_TRUE(oplog); + checkOplogWithNestedOplog(expectedOplog, *oplog); + } + void insertDocWithSessionInfo(const OperationSessionInfo& sessionInfo, const NamespaceString& ns, const BSONObj& doc, @@ -307,6 +325,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) { checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); ASSERT_FALSE(historyIter.hasNext()); + + checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1); + checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2); + checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3); } TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn) { @@ -347,6 +369,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); ASSERT_FALSE(historyIter.hasNext()); + + checkStatementExecuted(opCtx, session.get(), txnNum, 5, oplog3); } TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparateBatches) { @@ -394,6 +418,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); ASSERT_FALSE(historyIter.hasNext()); + + checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1); + checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2); + checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3); } TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) { @@ -439,6 +467,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); ASSERT_FALSE(historyIter.hasNext()); + + checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1); } { @@ -452,6 +482,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx)); ASSERT_FALSE(historyIter.hasNext()); + + checkStatementExecuted(opCtx, session.get(), 42, 45, oplog2); + checkStatementExecuted(opCtx, session.get(), 42, 5, oplog3); } } @@ -511,6 +544,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) checkOplog(oplog1, historyIter.next(opCtx)); ASSERT_FALSE(historyIter.hasNext()); + + checkStatementExecuted(opCtx, session.get(), 2, 23); + checkStatementExecuted(opCtx, session.get(), 2, 45); } TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindAndModify) { @@ -597,6 +633,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA ASSERT_BSONOBJ_EQ(preImageOplog.getObject(), newPreImageOplog.getObject()); ASSERT_TRUE(newPreImageOplog.getObject2()); ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty()); + + checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog); } TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFindAndModify) { @@ -682,6 +720,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind ASSERT_BSONOBJ_EQ(postImageOplog.getObject(), newPostImageOplog.getObject()); ASSERT_TRUE(newPostImageOplog.getObject2()); ASSERT_TRUE(newPostImageOplog.getObject2().value().isEmpty()); + + checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog); } TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModifySplitIn2Batches) { @@ -770,6 +810,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify ASSERT_BSONOBJ_EQ(preImageOplog.getObject(), newPreImageOplog.getObject()); ASSERT_TRUE(newPreImageOplog.getObject2()); ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty()); + + checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog); } TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) { @@ -820,6 +862,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) { oplog.getObject()); ASSERT_FALSE(historyIter.hasNext()); + + checkStatementExecuted(opCtx, session.get(), 20, 0); } TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwrittenByOldMigrateTxn) { @@ -871,6 +915,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt ASSERT_BSONOBJ_EQ(BSON("_id" << "newerSess"), oplog.getObject()); + + checkStatementExecuted(opCtx, session.get(), 20, 0); } TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkError) { @@ -1042,6 +1088,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); ASSERT_FALSE(historyIter.hasNext()); + + checkStatementExecuted(opCtx, session.get(), 2, 0); + checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1); + checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2); } TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImageOplog) { @@ -1323,6 +1373,65 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem ASSERT_BSONOBJ_EQ(BSON("_id" << 46), firstInsertOplog.getObject()); ASSERT_TRUE(firstInsertOplog.getStatementId()); ASSERT_EQ(30, *firstInsertOplog.getStatementId()); + + checkStatementExecuted(opCtx, session.get(), 19, 23, oplog1); + checkStatementExecuted(opCtx, session.get(), 19, 30); + checkStatementExecuted(opCtx, session.get(), 19, 45, oplog3); +} + +TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory) { + const NamespaceString kNs("a.b"); + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId()); + sessionMigration.start(getServiceContext()); + sessionMigration.finish(); + + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(sessionId); + sessionInfo.setTxnNumber(2); + + OplogEntry oplog1( + OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100)); + oplog1.setOperationSessionInfo(sessionInfo); + oplog1.setStatementId(23); + + OplogEntry oplog2( + OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, {}, Session::kDeadEndSentinel); + oplog2.setOperationSessionInfo(sessionInfo); + oplog2.setStatementId(kIncompleteHistoryStmtId); + + OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60)); + oplog3.setOperationSessionInfo(sessionInfo); + oplog3.setStatementId(5); + + returnOplog({oplog1, oplog2, oplog3}); + // migration always fetches at least twice to transition from committing to done. + returnOplog({}); + returnOplog({}); + + sessionMigration.join(); + + ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); + + auto opCtx = operationContext(); + auto session = getSessionWithTxn(opCtx, sessionId, 2); + TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplog(oplog2, historyIter.next(opCtx)); + + ASSERT_TRUE(historyIter.hasNext()); + checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); + + ASSERT_FALSE(historyIter.hasNext()); + + checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1); + checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3); + ASSERT_THROWS(session->checkStatementExecuted(opCtx, 2, 38), AssertionException); } } // namespace diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 021eb013c73..723c2151334 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -35,9 +35,12 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/replication_process.h" +#include "mongo/db/session.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/transaction_history_iterator.h" #include "mongo/db/write_concern.h" +#include "mongo/platform/random.h" #include "mongo/stdx/memory.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" @@ -46,6 +49,8 @@ namespace mongo { namespace { +PseudoRandom hashGenerator(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64()); + boost::optional<repl::OplogEntry> fetchPrePostImageOplog(OperationContext* opCtx, const repl::OplogEntry& oplog) { auto opTimeToFetch = oplog.getPreImageOpTime(); @@ -97,14 +102,16 @@ bool SessionCatalogMigrationSource::fetchNextOplog(OperationContext* opCtx) { } bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) { - if (_writeHistoryIterator) { - if (_writeHistoryIterator->hasNext()) { - auto nextOplog = _writeHistoryIterator->next(opCtx); + if (_currentOplogIterator) { + if (_currentOplogIterator->hasNext()) { + auto nextOplog = _currentOplogIterator->getNext(opCtx); + auto nextStmtId = nextOplog.getStatementId(); // Note: This is an optimization based on the assumption that it is not possible to be // touching different namespaces in the same transaction. - if (nextOplog.getNamespace() != _ns) { - _writeHistoryIterator.reset(); + if (!nextStmtId || (nextStmtId && *nextStmtId != kIncompleteHistoryStmtId && + nextOplog.getNamespace() != _ns)) { + _currentOplogIterator.reset(); return false; } @@ -118,7 +125,7 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte return true; } else { - _writeHistoryIterator.reset(); + _currentOplogIterator.reset(); } } @@ -127,7 +134,8 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte bool SessionCatalogMigrationSource::_hasMoreOplogFromSessionCatalog() { stdx::lock_guard<stdx::mutex> _lk(_sessionCloneMutex); - return _lastFetchedOplog || !_lastFetchedOplogBuffer.empty(); + return _lastFetchedOplog || !_lastFetchedOplogBuffer.empty() || + !_sessionOplogIterators.empty() || _currentOplogIterator; } // Important: The no-op oplog entry for findAndModify should always be returned first before the @@ -153,12 +161,10 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC return true; } - while (!_sessionLastWriteOpTimes.empty()) { - auto lowestOpTimeIter = _sessionLastWriteOpTimes.begin(); - auto nextOpTime = *lowestOpTimeIter; - _sessionLastWriteOpTimes.erase(lowestOpTimeIter); + while (!_sessionOplogIterators.empty()) { + _currentOplogIterator = std::move(_sessionOplogIterators.back()); + _sessionOplogIterators.pop_back(); - _writeHistoryIterator = stdx::make_unique<TransactionHistoryIterator>(nextOpTime); if (_handleWriteHistory(lk, opCtx)) { return true; } @@ -219,16 +225,22 @@ void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) { void SessionCatalogMigrationSource::init(OperationContext* opCtx) { invariant(!_alreadyInitialized); + _rollbackIdAtInit = uassertStatusOK(repl::ReplicationProcess::get(opCtx)->getRollbackID(opCtx)); + DBDirectClient client(opCtx); - auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), {}); + Query query; + // Sort is not needed for correctness. This is just for making it easier to write deterministic + // tests. + query.sort(BSON("_id" << 1)); + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), query); - std::set<repl::OpTime> opTimes; + std::vector<std::unique_ptr<SessionOplogIterator>> sessionOplogIterators; while (cursor->more()) { auto nextSession = SessionTxnRecord::parse( IDLParserErrorContext("Session migration cloning"), cursor->next()); - auto opTime = nextSession.getLastWriteOpTime(); - if (!opTime.isNull()) { - opTimes.insert(nextSession.getLastWriteOpTime()); + if (!nextSession.getLastWriteOpTime().isNull()) { + sessionOplogIterators.push_back( + stdx::make_unique<SessionOplogIterator>(std::move(nextSession), _rollbackIdAtInit)); } } @@ -255,7 +267,64 @@ void SessionCatalogMigrationSource::init(OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lk(_sessionCloneMutex); _alreadyInitialized = true; - _sessionLastWriteOpTimes.swap(opTimes); + _sessionOplogIterators.swap(sessionOplogIterators); +} + +SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator( + SessionTxnRecord txnRecord, int expectedRollbackId) + : _record(std::move(txnRecord)), _initialRollbackId(expectedRollbackId) { + _writeHistoryIterator = + stdx::make_unique<TransactionHistoryIterator>(_record.getLastWriteOpTime()); +} + +bool SessionCatalogMigrationSource::SessionOplogIterator::hasNext() const { + return _writeHistoryIterator && _writeHistoryIterator->hasNext(); +} + +repl::OplogEntry SessionCatalogMigrationSource::SessionOplogIterator::getNext( + OperationContext* opCtx) { + try { + // Note: during SessionCatalogMigrationSource::init, we inserted a document and wait for it + // to committed to the majority. In addition, the TransactionHistoryIterator uses OpTime + // to query for the oplog. This means that if we can successfully fetch the oplog, we are + // guaranteed that they are majority committed. If we can't fetch the oplog, it can either + // mean that the oplog has been rolled over or was rolled back. + return _writeHistoryIterator->next(opCtx); + } catch (const AssertionException& excep) { + if (excep.code() == ErrorCodes::IncompleteTransactionHistory) { + // Note: no need to check if in replicaSet mode because having an iterator implies + // oplog exists. + auto rollbackId = + uassertStatusOK(repl::ReplicationProcess::get(opCtx)->getRollbackID(opCtx)); + + uassert(40656, + str::stream() << "rollback detected, rollbackId was " << _initialRollbackId + << " but is now " + << rollbackId, + rollbackId == _initialRollbackId); + + // If the rollbackId hasn't changed, this means that the oplog has been truncated. + // So, we return the special "write history lost" sentinel. + repl::OplogEntry oplog({}, + hashGenerator.nextInt64(), + repl::OpTypeEnum::kNoop, + {}, + repl::OplogEntry::kOplogVersion, + {}, + Session::kDeadEndSentinel); + OperationSessionInfo sessionInfo; + sessionInfo.setSessionId(_record.getSessionId()); + sessionInfo.setTxnNumber(_record.getTxnNum()); + oplog.setOperationSessionInfo(sessionInfo); + oplog.setStatementId(kIncompleteHistoryStmtId); + + _writeHistoryIterator.reset(); + + return oplog; + } + + throw; + } } } // namespace mongo diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h index 0133281df07..2e5faf393c2 100644 --- a/src/mongo/db/s/session_catalog_migration_source.h +++ b/src/mongo/db/s/session_catalog_migration_source.h @@ -35,6 +35,7 @@ #include "mongo/client/dbclientcursor.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/session_txn_record_gen.h" #include "mongo/db/transaction_history_iterator.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/with_lock.h" @@ -111,6 +112,36 @@ public: void notifyNewWriteOpTime(repl::OpTime opTimestamp); private: + /** + * An iterator for extracting session write oplogs that need to be cloned during migration. + */ + class SessionOplogIterator { + public: + SessionOplogIterator(SessionTxnRecord txnRecord, int expectedRollbackId); + + /** + * Returns true if there are more oplog entries to fetch for this session. + */ + bool hasNext() const; + + /** + * Returns the next oplog write that happened in this session. If the oplog is lost + * because the oplog rolled over, this will return a sentinel oplog entry instead with + * type 'n' and o2 field set to Session::kDeadEndSentinel. This will also mean that + * next subsequent calls to hasNext will return false. + */ + repl::OplogEntry getNext(OperationContext* opCtx); + + BSONObj toBSON() const { + return _record.toBSON(); + } + + private: + const SessionTxnRecord _record; + const int _initialRollbackId; + std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator; + }; + /////////////////////////////////////////////////////////////////////////// // Methods for extracting the oplog entries from session information. @@ -161,18 +192,20 @@ private: const NamespaceString _ns; - // Protects _alreadyInitialized, _sessionCatalogCursor, _writeHistoryIterator - // _lastFetchedOplogBuffer, _lastFetchedOplog + // Protects _alreadyInitialized, _sessionCatalogCursor, _sessionOplogIterators + // _currentOplogIterator, _lastFetchedOplogBuffer, _lastFetchedOplog stdx::mutex _sessionCloneMutex; - bool _alreadyInitialized = false; - std::set<repl::OpTime> _sessionLastWriteOpTimes; + int _rollbackIdAtInit = 0; + + // List of remaining session records that needs to be cloned. + std::vector<std::unique_ptr<SessionOplogIterator>> _sessionOplogIterators; - // Iterator for oplog entries for a specific transaction. - std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator; + // Points to the current session record eing cloned. + std::unique_ptr<SessionOplogIterator> _currentOplogIterator; - // Used for temporarily storing oplog entries for operations that has more than one entry. + // Used for temporarily storng oplog entries for operations that has more than one entry. // For example, findAndModify generates one for the actual operation and another for the // pre/post image. std::vector<repl::OplogEntry> _lastFetchedOplogBuffer; diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index c1e707c0c9f..2bd48ce08dc 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -31,7 +31,9 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/repl/mock_repl_coord_server_fixture.h" +#include "mongo/db/repl/replication_process.h" #include "mongo/db/s/session_catalog_migration_source.h" +#include "mongo/db/session.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/executor/remote_command_request.h" #include "mongo/unittest/unittest.h" @@ -58,11 +60,13 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) { repl::OplogEntry entry1( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry1.setStatementId(0); insertOplogEntry(entry1); repl::OplogEntry entry2( repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50)); entry2.setPrevWriteOpTimeInTransaction(entry1.getOpTime()); + entry2.setStatementId(1); insertOplogEntry(entry2); SessionTxnRecord sessionRecord; @@ -104,9 +108,11 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { repl::OplogEntry entry1a( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry1a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry1a.setStatementId(0); repl::OplogEntry entry1b( repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50)); + entry1b.setStatementId(1); entry1b.setPrevWriteOpTimeInTransaction(entry1a.getOpTime()); SessionTxnRecord sessionRecord1; @@ -121,10 +127,12 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { repl::OplogEntry entry2a( repl::OpTime(Timestamp(43, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30)); entry2a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry2a.setStatementId(3); repl::OplogEntry entry2b( repl::OpTime(Timestamp(789, 13), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50)); entry2b.setPrevWriteOpTimeInTransaction(entry2a.getOpTime()); + entry2b.setStatementId(4); SessionTxnRecord sessionRecord2; sessionRecord2.setSessionId(makeLogicalSessionIdForTest()); @@ -162,15 +170,23 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) { } }; - checkNextBatch(entry1b, entry1a); + if (sessionRecord1.getSessionId().toBSON().woCompare(sessionRecord2.getSessionId().toBSON()) < + 0) { + checkNextBatch(entry2b, entry2a); - ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); - ASSERT_TRUE(migrationSource.hasMoreOplog()); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_TRUE(migrationSource.hasMoreOplog()); - checkNextBatch(entry2b, entry2a); + checkNextBatch(entry1b, entry1a); - ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); - ASSERT_FALSE(migrationSource.hasMoreOplog()); + } else { + checkNextBatch(entry1b, entry1a); + + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_TRUE(migrationSource.hasMoreOplog()); + + checkNextBatch(entry2b, entry2a); + } } // It is currently not possible to have 2 findAndModify operations in one transaction, but this @@ -181,17 +197,20 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd repl::OplogEntry entry1( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 30)); entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry1.setStatementId(0); insertOplogEntry(entry1); repl::OplogEntry entry2( repl::OpTime(Timestamp(52, 346), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50)); entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); entry2.setPreImageOpTime(entry1.getOpTime()); + entry2.setStatementId(1); insertOplogEntry(entry2); repl::OplogEntry entry3( repl::OpTime(Timestamp(73, 5), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 20)); entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry3.setStatementId(2); insertOplogEntry(entry3); repl::OplogEntry entry4(repl::OpTime(Timestamp(73, 6), 2), @@ -202,6 +221,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd BSON("$inc" << BSON("x" << 1))); entry4.setPrevWriteOpTimeInTransaction(entry2.getOpTime()); entry4.setPostImageOpTime(entry3.getOpTime()); + entry4.setStatementId(3); insertOplogEntry(entry4); SessionTxnRecord sessionRecord; @@ -236,6 +256,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { repl::OplogEntry entry1( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry1.setStatementId(0); insertOplogEntry(entry1); SessionTxnRecord sessionRecord1; @@ -254,6 +275,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) { NamespaceString("x.y"), BSON("x" << 30)); entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry2.setStatementId(1); insertOplogEntry(entry2); SessionTxnRecord sessionRecord2; @@ -284,6 +306,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { repl::OplogEntry entry1( repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry1.setStatementId(0); insertOplogEntry(entry1); SessionTxnRecord sessionRecord1; @@ -298,11 +321,13 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) { repl::OplogEntry entry2( repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30)); entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry2.setStatementId(1); insertOplogEntry(entry2); repl::OplogEntry entry3( repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40)); entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry3.setStatementId(2); insertOplogEntry(entry3); SessionCatalogMigrationSource migrationSource(kNs); @@ -366,6 +391,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer kNs, BSON("x" << 30)); entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry.setStatementId(0); insertOplogEntry(entry); migrationSource.notifyNewWriteOpTime(entry.getOpTime()); @@ -385,6 +411,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer repl::OplogEntry entry( repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30)); entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry.setStatementId(1); insertOplogEntry(entry); migrationSource.notifyNewWriteOpTime(entry.getOpTime()); @@ -403,6 +430,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer repl::OplogEntry entry( repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40)); entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0)); + entry.setStatementId(2); insertOplogEntry(entry); migrationSource.notifyNewWriteOpTime(entry.getOpTime()); @@ -418,6 +446,97 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer } } +TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHistory) { + const NamespaceString kNs("a.b"); + + repl::OplogEntry entry( + repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); + entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(40, 1), 2)); + entry.setStatementId(0); + insertOplogEntry(entry); + + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(sessionId); + sessionRecord.setTxnNum(31); + sessionRecord.setLastWriteOpTime(entry.getOpTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(kNs); + migrationSource.init(opCtx()); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + + { + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON()); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + } + + { + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + + auto oplog = *nextOplogResult.oplog; + ASSERT_TRUE(oplog.getObject2()); + ASSERT_BSONOBJ_EQ(Session::kDeadEndSentinel, *oplog.getObject2()); + ASSERT_TRUE(oplog.getStatementId()); + ASSERT_EQ(kIncompleteHistoryStmtId, *oplog.getStatementId()); + + auto sessionInfo = oplog.getOperationSessionInfo(); + ASSERT_TRUE(sessionInfo.getSessionId()); + ASSERT_EQ(sessionId, *sessionInfo.getSessionId()); + ASSERT_TRUE(sessionInfo.getTxnNumber()); + ASSERT_EQ(31, *sessionInfo.getTxnNumber()); + } + + ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx())); + ASSERT_FALSE(migrationSource.hasMoreOplog()); +} + +TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) { + const NamespaceString kNs("a.b"); + + repl::OplogEntry entry( + repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30)); + entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(40, 1), 2)); + entry.setStatementId(0); + insertOplogEntry(entry); + + const auto sessionId = makeLogicalSessionIdForTest(); + + SessionTxnRecord sessionRecord; + sessionRecord.setSessionId(sessionId); + sessionRecord.setTxnNum(31); + sessionRecord.setLastWriteOpTime(entry.getOpTime()); + + DBDirectClient client(opCtx()); + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON()); + + SessionCatalogMigrationSource migrationSource(kNs); + migrationSource.init(opCtx()); + ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx())); + + { + ASSERT_TRUE(migrationSource.hasMoreOplog()); + auto nextOplogResult = migrationSource.getLastFetchedOplog(); + ASSERT_FALSE(nextOplogResult.shouldWaitForMajority); + // Cannot compare directly because of SERVER-31356 + ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON()); + } + + ASSERT_OK(repl::ReplicationProcess::get(opCtx())->incrementRollbackID(opCtx())); + + ASSERT_THROWS(migrationSource.fetchNextOplog(opCtx()), AssertionException); + ASSERT_TRUE(migrationSource.hasMoreOplog()); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 7c66713b13c..8acfb4cb2d0 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -102,6 +102,8 @@ MONGO_FP_DECLARE(onPrimaryTransactionalWrite); } // namespace +const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1)); + Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) { @@ -131,11 +133,21 @@ void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) { CommittedStatementTimestampMap activeTxnCommittedStatements; + bool hasIncompleteHistory = false; if (lastWrittenTxnRecord) { auto it = TransactionHistoryIterator(lastWrittenTxnRecord->getLastWriteOpTime()); while (it.hasNext()) { const auto entry = it.next(opCtx); invariant(entry.getStatementId()); + + if (*entry.getStatementId() == kIncompleteHistoryStmtId) { + // Only the dead end sentinel can have this id for oplog write history. + invariant(entry.getObject2()); + invariant(entry.getObject2()->woCompare(kDeadEndSentinel) == 0); + hasIncompleteHistory = true; + continue; + } + const auto insertRes = activeTxnCommittedStatements.emplace(*entry.getStatementId(), entry.getOpTime()); if (!insertRes.second) { @@ -152,6 +164,8 @@ void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) { ul.lock(); + _hasIncompleteHistory = hasIncompleteHistory; + // Protect against concurrent refreshes or invalidations if (!_isValid && _numInvalidations == numInvalidations) { _isValid = true; @@ -232,6 +246,7 @@ void Session::invalidate() { _activeTxnNumber = kUninitializedTxnNumber; _activeTxnCommittedStatements.clear(); + _hasIncompleteHistory = false; } repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const { @@ -250,7 +265,17 @@ boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationConte StmtId stmtId) const { const auto stmtTimestamp = [&] { stdx::lock_guard<stdx::mutex> lg(_mutex); - return _checkStatementExecuted(lg, txnNumber, stmtId); + auto result = _checkStatementExecuted(lg, txnNumber, stmtId); + + if (!result) { + uassert(ErrorCodes::IncompleteTransactionHistory, + str::stream() << "incomplete history detected for lsid: " << _sessionId.toBSON() + << ", txnNum: " + << txnNumber, + !_hasIncompleteHistory); + } + + return result; }(); if (!stmtTimestamp) @@ -289,6 +314,7 @@ void Session::_beginTxn(WithLock wl, TxnNumber txnNumber) { _activeTxnNumber = txnNumber; _activeTxnCommittedStatements.clear(); + _hasIncompleteHistory = false; } void Session::_checkValid(WithLock) const { @@ -316,8 +342,9 @@ boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl, _checkIsActiveTransaction(wl, txnNumber); const auto it = _activeTxnCommittedStatements.find(stmtId); - if (it == _activeTxnCommittedStatements.end()) + if (it == _activeTxnCommittedStatements.end()) { return boost::none; + } invariant(_lastWrittenSessionRecord); invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber); @@ -396,6 +423,11 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, if (newTxnNumber == _activeTxnNumber) { for (const auto stmtId : stmtIdsWritten) { + if (stmtId == kIncompleteHistoryStmtId) { + _hasIncompleteHistory = true; + continue; + } + const auto insertRes = _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); if (!insertRes.second) { diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index ffc0b7b59f5..f627ca8468d 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -55,6 +55,8 @@ class Session { MONGO_DISALLOW_COPYING(Session); public: + static const BSONObj kDeadEndSentinel; + explicit Session(LogicalSessionId sessionId); const LogicalSessionId& getSessionId() const { @@ -169,6 +171,10 @@ private: // happen during refresh int _numInvalidations{0}; + // Set to true if incomplete history is detected. For example, when the oplog to a write was + // truncated because it was too old. + bool _hasIncompleteHistory{false}; + // Caches what is known to be the last written transaction record for the session boost::optional<SessionTxnRecord> _lastWrittenSessionRecord; diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index 168232a2c25..15cbcd8ba27 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -338,5 +338,80 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 0)); } +TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { + const auto sessionId = makeLogicalSessionIdForTest(); + const TxnNumber txnNum = 2; + + OperationSessionInfo osi; + osi.setSessionId(sessionId); + osi.setTxnNumber(txnNum); + + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + session.beginTxn(opCtx(), txnNum); + + auto firstOpTime = ([&]() { + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); + WriteUnitOfWork wuow(opCtx()); + + auto opTime = repl::logOp(opCtx(), + "i", + kNss, + kUUID, + BSON("x" << 1), + &Session::kDeadEndSentinel, + false, + osi, + 1, + {}); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime); + wuow.commit(); + + return opTime; + })(); + + { + repl::OplogLink link; + link.prevOpTime = firstOpTime; + + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); + WriteUnitOfWork wuow(opCtx()); + + auto opTime = repl::logOp(opCtx(), + "n", + kNss, + kUUID, + {}, + &Session::kDeadEndSentinel, + false, + osi, + kIncompleteHistoryStmtId, + link); + + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime); + wuow.commit(); + } + + { + auto oplog = session.checkStatementExecuted(opCtx(), txnNum, 1); + ASSERT_TRUE(oplog); + ASSERT_EQ(firstOpTime, oplog->getOpTime()); + } + + ASSERT_THROWS(session.checkStatementExecuted(opCtx(), txnNum, 2), AssertionException); + + // Should have the same behavior after loading state from storage. + session.invalidate(); + session.refreshFromStorageIfNeeded(opCtx()); + + { + auto oplog = session.checkStatementExecuted(opCtx(), txnNum, 1); + ASSERT_TRUE(oplog); + ASSERT_EQ(firstOpTime, oplog->getOpTime()); + } + + ASSERT_THROWS(session.checkStatementExecuted(opCtx(), txnNum, 2), AssertionException); +} + } // namespace } // namespace mongo |