summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/logical_session_id.h5
-rw-r--r--src/mongo/db/repl/mock_repl_coord_server_fixture.cpp24
-rw-r--r--src/mongo/db/repl/mock_repl_coord_server_fixture.h2
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp41
-rw-r--r--src/mongo/db/s/migration_chunk_cloner_source_legacy.h2
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp15
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp109
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp105
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h47
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp131
-rw-r--r--src/mongo/db/session.cpp36
-rw-r--r--src/mongo/db/session.h6
-rw-r--r--src/mongo/db/session_test.cpp75
13 files changed, 541 insertions, 57 deletions
diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h
index 24629bdd050..a769c19d0f9 100644
--- a/src/mongo/db/logical_session_id.h
+++ b/src/mongo/db/logical_session_id.h
@@ -40,7 +40,12 @@ namespace mongo {
using TxnNumber = std::int64_t;
using StmtId = std::int32_t;
+// Default value for unassigned statementId.
const StmtId kUninitializedStmtId = -1;
+
+// Used as a substitute statementId for oplog entries that were truncated and lost.
+const StmtId kIncompleteHistoryStmtId = -2;
+
const TxnNumber kUninitializedTxnNumber = -1;
class BSONObjBuilder;
diff --git a/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp b/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp
index 73c0209864a..d690484b0ce 100644
--- a/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp
+++ b/src/mongo/db/repl/mock_repl_coord_server_fixture.cpp
@@ -28,18 +28,22 @@
#include "mongo/platform/basic.h"
-#include "mongo/db/repl/mock_repl_coord_server_fixture.h"
-
#include "mongo/db/client.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/mock_repl_coord_server_fixture.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/replication_consistency_markers_mock.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/repl/replication_recovery_mock.h"
+#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/stdx/memory.h"
namespace mongo {
@@ -48,13 +52,27 @@ void MockReplCoordServerFixture::setUp() {
_opCtx = cc().makeOperationContext();
+ auto service = getServiceContext();
+
+ _storageInterface = new repl::StorageInterfaceMock();
+ repl::StorageInterface::set(service,
+ std::unique_ptr<repl::StorageInterface>(_storageInterface));
+ ASSERT_TRUE(_storageInterface == repl::StorageInterface::get(service));
+
+ repl::ReplicationProcess::set(service,
+ stdx::make_unique<repl::ReplicationProcess>(
+ _storageInterface,
+ stdx::make_unique<repl::ReplicationConsistencyMarkersMock>(),
+ stdx::make_unique<repl::ReplicationRecoveryMock>()));
+
+ ASSERT_OK(repl::ReplicationProcess::get(service)->initializeRollbackID(opCtx()));
+
// Insert code path assumes existence of repl coordinator!
repl::ReplSettings replSettings;
replSettings.setReplSetString(
ConnectionString::forReplicaSet("sessionTxnStateTest", {HostAndPort("a:1")}).toString());
replSettings.setMaster(true);
- auto service = getServiceContext();
repl::ReplicationCoordinator::set(
service, stdx::make_unique<repl::ReplicationCoordinatorMock>(service, replSettings));
diff --git a/src/mongo/db/repl/mock_repl_coord_server_fixture.h b/src/mongo/db/repl/mock_repl_coord_server_fixture.h
index 71f1fa06fec..fb1b71f4d0a 100644
--- a/src/mongo/db/repl/mock_repl_coord_server_fixture.h
+++ b/src/mongo/db/repl/mock_repl_coord_server_fixture.h
@@ -37,6 +37,7 @@ class OperationContext;
namespace repl {
class OplogEntry;
+class StorageInterfaceMock;
}
/**
@@ -58,6 +59,7 @@ public:
private:
ServiceContext::UniqueOperationContext _opCtx;
+ repl::StorageInterfaceMock* _storageInterface;
};
} // namespace mongo
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
index a4c146c1ade..05436465040 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.cpp
@@ -164,12 +164,14 @@ public:
MONGO_UNREACHABLE;
}
- if (!_prePostImageOpTime.isNull()) {
- _cloner->_sessionCatalogSource.notifyNewWriteOpTime(_prePostImageOpTime);
- }
+ if (auto sessionSource = _cloner->_sessionCatalogSource.get()) {
+ if (!_prePostImageOpTime.isNull()) {
+ sessionSource->notifyNewWriteOpTime(_prePostImageOpTime);
+ }
- if (!_opTime.isNull()) {
- _cloner->_sessionCatalogSource.notifyNewWriteOpTime(_opTime);
+ if (!_opTime.isNull()) {
+ sessionSource->notifyNewWriteOpTime(_opTime);
+ }
}
}
@@ -192,8 +194,7 @@ MigrationChunkClonerSourceLegacy::MigrationChunkClonerSourceLegacy(MoveChunkRequ
_sessionId(MigrationSessionId::generate(_args.getFromShardId().toString(),
_args.getToShardId().toString())),
_donorConnStr(std::move(donorConnStr)),
- _recipientHost(std::move(recipientHost)),
- _sessionCatalogSource(_args.getNss()) {}
+ _recipientHost(std::move(recipientHost)) {}
MigrationChunkClonerSourceLegacy::~MigrationChunkClonerSourceLegacy() {
invariant(_state == kDone);
@@ -204,7 +205,14 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
invariant(_state == kNew);
invariant(!opCtx->lockState()->isLocked());
- _sessionCatalogSource.init(opCtx);
+ auto const replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet) {
+ _sessionCatalogSource = stdx::make_unique<SessionCatalogMigrationSource>(_args.getNss());
+ _sessionCatalogSource->init(opCtx);
+
+ // Prime up the session migration source if there are oplog entries to migrate.
+ _sessionCatalogSource->fetchNextOplog(opCtx);
+ }
// Load the ids of the currently available documents
auto storeCurrentLocsStatus = _storeCurrentLocs(opCtx);
@@ -212,9 +220,6 @@ Status MigrationChunkClonerSourceLegacy::startClone(OperationContext* opCtx) {
return storeCurrentLocsStatus;
}
- // Prime up the session migration source if there are oplog entries to migrate.
- _sessionCatalogSource.fetchNextOplog(opCtx);
-
// Tell the recipient shard to start cloning
BSONObjBuilder cmdBuilder;
StartChunkCloneRequest::appendAsCommand(&cmdBuilder,
@@ -338,7 +343,7 @@ Status MigrationChunkClonerSourceLegacy::commitClone(OperationContext* opCtx) {
if (responseStatus.isOK()) {
_cleanup(opCtx);
- if (_sessionCatalogSource.hasMoreOplog()) {
+ if (_sessionCatalogSource && _sessionCatalogSource->hasMoreOplog()) {
return {ErrorCodes::SessionTransferIncomplete,
"destination shard finished committing but there are still some session "
"metadata that needs to be transferred"};
@@ -735,12 +740,16 @@ repl::OpTime MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(
repl::OpTime opTimeToWait;
auto seenOpTimeTerm = repl::OpTime::kUninitializedTerm;
- while (_sessionCatalogSource.hasMoreOplog()) {
- auto result = _sessionCatalogSource.getLastFetchedOplog();
+ if (!_sessionCatalogSource) {
+ return {};
+ }
+
+ while (_sessionCatalogSource->hasMoreOplog()) {
+ auto result = _sessionCatalogSource->getLastFetchedOplog();
if (!result.oplog) {
// Last fetched turned out empty, try to see if there are more
- _sessionCatalogSource.fetchNextOplog(opCtx);
+ _sessionCatalogSource->fetchNextOplog(opCtx);
continue;
}
@@ -764,7 +773,7 @@ repl::OpTime MigrationChunkClonerSourceLegacy::nextSessionMigrationBatch(
}
arrBuilder->append(oplogDoc);
- _sessionCatalogSource.fetchNextOplog(opCtx);
+ _sessionCatalogSource->fetchNextOplog(opCtx);
if (result.shouldWaitForMajority) {
if (opTimeToWait < newOpTime) {
diff --git a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
index 5bf180bcea7..1bdad9ee4c7 100644
--- a/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
+++ b/src/mongo/db/s/migration_chunk_cloner_source_legacy.h
@@ -201,7 +201,7 @@ private:
// during the cloning stage
std::unique_ptr<PlanExecutor, PlanExecutor::Deleter> _deleteNotifyExec;
- SessionCatalogMigrationSource _sessionCatalogSource;
+ std::unique_ptr<SessionCatalogMigrationSource> _sessionCatalogSource;
// Protects the entries below
stdx::mutex _mutex;
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 863d268257d..cdf0f0a778d 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -245,8 +245,16 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
scopedSession->beginTxn(opCtx, result.txnNum);
- if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) {
- return lastResult;
+ if (stmtId != kIncompleteHistoryStmtId) {
+ try {
+ if (scopedSession->checkStatementExecuted(opCtx, result.txnNum, stmtId)) {
+ return lastResult;
+ }
+ } catch (const DBException& excep) {
+ if (excep.code() != ErrorCodes::IncompleteTransactionHistory) {
+ throw;
+ }
+ }
}
BSONObj object(result.isPrePostImage
@@ -291,8 +299,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
!oplogOpTime.isNull());
// Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post
- // image, because
- // the next oplog will contain the real operation.
+ // image, because the next oplog will contain the real operation.
if (!result.isPrePostImage) {
scopedSession->onWriteOpCompletedOnPrimary(
opCtx, result.txnNum, {stmtId}, oplogOpTime);
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 2bcc64ce537..053b7f4f28d 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -172,6 +172,24 @@ public:
}
}
+ void checkStatementExecuted(OperationContext* opCtx,
+ Session* session,
+ TxnNumber txnNumber,
+ StmtId stmtId) {
+ auto oplog = session->checkStatementExecuted(opCtx, txnNumber, stmtId);
+ ASSERT_TRUE(oplog);
+ }
+
+ void checkStatementExecuted(OperationContext* opCtx,
+ Session* session,
+ TxnNumber txnNumber,
+ StmtId stmtId,
+ repl::OplogEntry& expectedOplog) {
+ auto oplog = session->checkStatementExecuted(opCtx, txnNumber, stmtId);
+ ASSERT_TRUE(oplog);
+ checkOplogWithNestedOplog(expectedOplog, *oplog);
+ }
+
void insertDocWithSessionInfo(const OperationSessionInfo& sessionInfo,
const NamespaceString& ns,
const BSONObj& doc,
@@ -307,6 +325,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
+
+ checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
+ checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2);
+ checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn) {
@@ -347,6 +369,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
+
+ checkStatementExecuted(opCtx, session.get(), txnNum, 5, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparateBatches) {
@@ -394,6 +418,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
+
+ checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
+ checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2);
+ checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3);
}
TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) {
@@ -439,6 +467,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
+
+ checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
}
{
@@ -452,6 +482,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
+
+ checkStatementExecuted(opCtx, session.get(), 42, 45, oplog2);
+ checkStatementExecuted(opCtx, session.get(), 42, 5, oplog3);
}
}
@@ -511,6 +544,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
checkOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
+
+ checkStatementExecuted(opCtx, session.get(), 2, 23);
+ checkStatementExecuted(opCtx, session.get(), 2, 45);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindAndModify) {
@@ -597,6 +633,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
ASSERT_BSONOBJ_EQ(preImageOplog.getObject(), newPreImageOplog.getObject());
ASSERT_TRUE(newPreImageOplog.getObject2());
ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty());
+
+ checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFindAndModify) {
@@ -682,6 +720,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
ASSERT_BSONOBJ_EQ(postImageOplog.getObject(), newPostImageOplog.getObject());
ASSERT_TRUE(newPostImageOplog.getObject2());
ASSERT_TRUE(newPostImageOplog.getObject2().value().isEmpty());
+
+ checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModifySplitIn2Batches) {
@@ -770,6 +810,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
ASSERT_BSONOBJ_EQ(preImageOplog.getObject(), newPreImageOplog.getObject());
ASSERT_TRUE(newPreImageOplog.getObject2());
ASSERT_TRUE(newPreImageOplog.getObject2().value().isEmpty());
+
+ checkStatementExecuted(opCtx, session.get(), 2, 45, updateOplog);
}
TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
@@ -820,6 +862,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
oplog.getObject());
ASSERT_FALSE(historyIter.hasNext());
+
+ checkStatementExecuted(opCtx, session.get(), 20, 0);
}
TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwrittenByOldMigrateTxn) {
@@ -871,6 +915,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
ASSERT_BSONOBJ_EQ(BSON("_id"
<< "newerSess"),
oplog.getObject());
+
+ checkStatementExecuted(opCtx, session.get(), 20, 0);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldJoinProperlyAfterNetworkError) {
@@ -1042,6 +1088,10 @@ TEST_F(SessionCatalogMigrationDestinationTest,
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
+
+ checkStatementExecuted(opCtx, session.get(), 2, 0);
+ checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
+ checkStatementExecuted(opCtx, session.get(), 2, 45, oplog2);
}
TEST_F(SessionCatalogMigrationDestinationTest, ShouldErrorForConsecutivePreImageOplog) {
@@ -1323,6 +1373,65 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
ASSERT_BSONOBJ_EQ(BSON("_id" << 46), firstInsertOplog.getObject());
ASSERT_TRUE(firstInsertOplog.getStatementId());
ASSERT_EQ(30, *firstInsertOplog.getStatementId());
+
+ checkStatementExecuted(opCtx, session.get(), 19, 23, oplog1);
+ checkStatementExecuted(opCtx, session.get(), 19, 30);
+ checkStatementExecuted(opCtx, session.get(), 19, 45, oplog3);
+}
+
+TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory) {
+ const NamespaceString kNs("a.b");
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionCatalogMigrationDestination sessionMigration(kFromShard, migrationId());
+ sessionMigration.start(getServiceContext());
+ sessionMigration.finish();
+
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(sessionId);
+ sessionInfo.setTxnNumber(2);
+
+ OplogEntry oplog1(
+ OpTime(Timestamp(100, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 100));
+ oplog1.setOperationSessionInfo(sessionInfo);
+ oplog1.setStatementId(23);
+
+ OplogEntry oplog2(
+ OpTime(Timestamp(80, 2), 1), 0, OpTypeEnum::kNoop, kNs, 0, {}, Session::kDeadEndSentinel);
+ oplog2.setOperationSessionInfo(sessionInfo);
+ oplog2.setStatementId(kIncompleteHistoryStmtId);
+
+ OplogEntry oplog3(OpTime(Timestamp(60, 2), 1), 0, OpTypeEnum::kInsert, kNs, 0, BSON("x" << 60));
+ oplog3.setOperationSessionInfo(sessionInfo);
+ oplog3.setStatementId(5);
+
+ returnOplog({oplog1, oplog2, oplog3});
+ // migration always fetches at least twice to transition from committing to done.
+ returnOplog({});
+ returnOplog({});
+
+ sessionMigration.join();
+
+ ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
+
+ auto opCtx = operationContext();
+ auto session = getSessionWithTxn(opCtx, sessionId, 2);
+ TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplog(oplog2, historyIter.next(opCtx));
+
+ ASSERT_TRUE(historyIter.hasNext());
+ checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
+
+ ASSERT_FALSE(historyIter.hasNext());
+
+ checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
+ checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3);
+ ASSERT_THROWS(session->checkStatementExecuted(opCtx, 2, 38), AssertionException);
}
} // namespace
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 021eb013c73..723c2151334 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -35,9 +35,12 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/session.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/db/write_concern.h"
+#include "mongo/platform/random.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/mongoutils/str.h"
@@ -46,6 +49,8 @@ namespace mongo {
namespace {
+PseudoRandom hashGenerator(std::unique_ptr<SecureRandom>(SecureRandom::create())->nextInt64());
+
boost::optional<repl::OplogEntry> fetchPrePostImageOplog(OperationContext* opCtx,
const repl::OplogEntry& oplog) {
auto opTimeToFetch = oplog.getPreImageOpTime();
@@ -97,14 +102,16 @@ bool SessionCatalogMigrationSource::fetchNextOplog(OperationContext* opCtx) {
}
bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationContext* opCtx) {
- if (_writeHistoryIterator) {
- if (_writeHistoryIterator->hasNext()) {
- auto nextOplog = _writeHistoryIterator->next(opCtx);
+ if (_currentOplogIterator) {
+ if (_currentOplogIterator->hasNext()) {
+ auto nextOplog = _currentOplogIterator->getNext(opCtx);
+ auto nextStmtId = nextOplog.getStatementId();
// Note: This is an optimization based on the assumption that it is not possible to be
// touching different namespaces in the same transaction.
- if (nextOplog.getNamespace() != _ns) {
- _writeHistoryIterator.reset();
+ if (!nextStmtId || (nextStmtId && *nextStmtId != kIncompleteHistoryStmtId &&
+ nextOplog.getNamespace() != _ns)) {
+ _currentOplogIterator.reset();
return false;
}
@@ -118,7 +125,7 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte
return true;
} else {
- _writeHistoryIterator.reset();
+ _currentOplogIterator.reset();
}
}
@@ -127,7 +134,8 @@ bool SessionCatalogMigrationSource::_handleWriteHistory(WithLock, OperationConte
bool SessionCatalogMigrationSource::_hasMoreOplogFromSessionCatalog() {
stdx::lock_guard<stdx::mutex> _lk(_sessionCloneMutex);
- return _lastFetchedOplog || !_lastFetchedOplogBuffer.empty();
+ return _lastFetchedOplog || !_lastFetchedOplogBuffer.empty() ||
+ !_sessionOplogIterators.empty() || _currentOplogIterator;
}
// Important: The no-op oplog entry for findAndModify should always be returned first before the
@@ -153,12 +161,10 @@ bool SessionCatalogMigrationSource::_fetchNextOplogFromSessionCatalog(OperationC
return true;
}
- while (!_sessionLastWriteOpTimes.empty()) {
- auto lowestOpTimeIter = _sessionLastWriteOpTimes.begin();
- auto nextOpTime = *lowestOpTimeIter;
- _sessionLastWriteOpTimes.erase(lowestOpTimeIter);
+ while (!_sessionOplogIterators.empty()) {
+ _currentOplogIterator = std::move(_sessionOplogIterators.back());
+ _sessionOplogIterators.pop_back();
- _writeHistoryIterator = stdx::make_unique<TransactionHistoryIterator>(nextOpTime);
if (_handleWriteHistory(lk, opCtx)) {
return true;
}
@@ -219,16 +225,22 @@ void SessionCatalogMigrationSource::notifyNewWriteOpTime(repl::OpTime opTime) {
void SessionCatalogMigrationSource::init(OperationContext* opCtx) {
invariant(!_alreadyInitialized);
+ _rollbackIdAtInit = uassertStatusOK(repl::ReplicationProcess::get(opCtx)->getRollbackID(opCtx));
+
DBDirectClient client(opCtx);
- auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), {});
+ Query query;
+ // Sort is not needed for correctness. This is just for making it easier to write deterministic
+ // tests.
+ query.sort(BSON("_id" << 1));
+ auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), query);
- std::set<repl::OpTime> opTimes;
+ std::vector<std::unique_ptr<SessionOplogIterator>> sessionOplogIterators;
while (cursor->more()) {
auto nextSession = SessionTxnRecord::parse(
IDLParserErrorContext("Session migration cloning"), cursor->next());
- auto opTime = nextSession.getLastWriteOpTime();
- if (!opTime.isNull()) {
- opTimes.insert(nextSession.getLastWriteOpTime());
+ if (!nextSession.getLastWriteOpTime().isNull()) {
+ sessionOplogIterators.push_back(
+ stdx::make_unique<SessionOplogIterator>(std::move(nextSession), _rollbackIdAtInit));
}
}
@@ -255,7 +267,64 @@ void SessionCatalogMigrationSource::init(OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lk(_sessionCloneMutex);
_alreadyInitialized = true;
- _sessionLastWriteOpTimes.swap(opTimes);
+ _sessionOplogIterators.swap(sessionOplogIterators);
+}
+
+SessionCatalogMigrationSource::SessionOplogIterator::SessionOplogIterator(
+ SessionTxnRecord txnRecord, int expectedRollbackId)
+ : _record(std::move(txnRecord)), _initialRollbackId(expectedRollbackId) {
+ _writeHistoryIterator =
+ stdx::make_unique<TransactionHistoryIterator>(_record.getLastWriteOpTime());
+}
+
+bool SessionCatalogMigrationSource::SessionOplogIterator::hasNext() const {
+ return _writeHistoryIterator && _writeHistoryIterator->hasNext();
+}
+
+repl::OplogEntry SessionCatalogMigrationSource::SessionOplogIterator::getNext(
+ OperationContext* opCtx) {
+ try {
+ // Note: during SessionCatalogMigrationSource::init, we inserted a document and wait for it
+ // to committed to the majority. In addition, the TransactionHistoryIterator uses OpTime
+ // to query for the oplog. This means that if we can successfully fetch the oplog, we are
+ // guaranteed that they are majority committed. If we can't fetch the oplog, it can either
+ // mean that the oplog has been rolled over or was rolled back.
+ return _writeHistoryIterator->next(opCtx);
+ } catch (const AssertionException& excep) {
+ if (excep.code() == ErrorCodes::IncompleteTransactionHistory) {
+ // Note: no need to check if in replicaSet mode because having an iterator implies
+ // oplog exists.
+ auto rollbackId =
+ uassertStatusOK(repl::ReplicationProcess::get(opCtx)->getRollbackID(opCtx));
+
+ uassert(40656,
+ str::stream() << "rollback detected, rollbackId was " << _initialRollbackId
+ << " but is now "
+ << rollbackId,
+ rollbackId == _initialRollbackId);
+
+ // If the rollbackId hasn't changed, this means that the oplog has been truncated.
+ // So, we return the special "write history lost" sentinel.
+ repl::OplogEntry oplog({},
+ hashGenerator.nextInt64(),
+ repl::OpTypeEnum::kNoop,
+ {},
+ repl::OplogEntry::kOplogVersion,
+ {},
+ Session::kDeadEndSentinel);
+ OperationSessionInfo sessionInfo;
+ sessionInfo.setSessionId(_record.getSessionId());
+ sessionInfo.setTxnNumber(_record.getTxnNum());
+ oplog.setOperationSessionInfo(sessionInfo);
+ oplog.setStatementId(kIncompleteHistoryStmtId);
+
+ _writeHistoryIterator.reset();
+
+ return oplog;
+ }
+
+ throw;
+ }
}
} // namespace mongo
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 0133281df07..2e5faf393c2 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -35,6 +35,7 @@
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -111,6 +112,36 @@ public:
void notifyNewWriteOpTime(repl::OpTime opTimestamp);
private:
+ /**
+ * An iterator for extracting session write oplogs that need to be cloned during migration.
+ */
+ class SessionOplogIterator {
+ public:
+ SessionOplogIterator(SessionTxnRecord txnRecord, int expectedRollbackId);
+
+ /**
+ * Returns true if there are more oplog entries to fetch for this session.
+ */
+ bool hasNext() const;
+
+ /**
+ * Returns the next oplog write that happened in this session. If the oplog is lost
+ * because the oplog rolled over, this will return a sentinel oplog entry instead with
+ * type 'n' and o2 field set to Session::kDeadEndSentinel. This will also mean that
+ * next subsequent calls to hasNext will return false.
+ */
+ repl::OplogEntry getNext(OperationContext* opCtx);
+
+ BSONObj toBSON() const {
+ return _record.toBSON();
+ }
+
+ private:
+ const SessionTxnRecord _record;
+ const int _initialRollbackId;
+ std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator;
+ };
+
///////////////////////////////////////////////////////////////////////////
// Methods for extracting the oplog entries from session information.
@@ -161,18 +192,20 @@ private:
const NamespaceString _ns;
- // Protects _alreadyInitialized, _sessionCatalogCursor, _writeHistoryIterator
- // _lastFetchedOplogBuffer, _lastFetchedOplog
+ // Protects _alreadyInitialized, _sessionCatalogCursor, _sessionOplogIterators
+ // _currentOplogIterator, _lastFetchedOplogBuffer, _lastFetchedOplog
stdx::mutex _sessionCloneMutex;
-
bool _alreadyInitialized = false;
- std::set<repl::OpTime> _sessionLastWriteOpTimes;
+ int _rollbackIdAtInit = 0;
+
+ // List of remaining session records that needs to be cloned.
+ std::vector<std::unique_ptr<SessionOplogIterator>> _sessionOplogIterators;
- // Iterator for oplog entries for a specific transaction.
- std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator;
+ // Points to the current session record eing cloned.
+ std::unique_ptr<SessionOplogIterator> _currentOplogIterator;
- // Used for temporarily storing oplog entries for operations that has more than one entry.
+ // Used for temporarily storng oplog entries for operations that has more than one entry.
// For example, findAndModify generates one for the actual operation and another for the
// pre/post image.
std::vector<repl::OplogEntry> _lastFetchedOplogBuffer;
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index c1e707c0c9f..2bd48ce08dc 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -31,7 +31,9 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/repl/mock_repl_coord_server_fixture.h"
+#include "mongo/db/repl/replication_process.h"
#include "mongo/db/s/session_catalog_migration_source.h"
+#include "mongo/db/session.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/unittest/unittest.h"
@@ -58,11 +60,13 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithTwoWrites) {
repl::OplogEntry entry1(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry1.setStatementId(0);
insertOplogEntry(entry1);
repl::OplogEntry entry2(
repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50));
entry2.setPrevWriteOpTimeInTransaction(entry1.getOpTime());
+ entry2.setStatementId(1);
insertOplogEntry(entry2);
SessionTxnRecord sessionRecord;
@@ -104,9 +108,11 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
repl::OplogEntry entry1a(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry1a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry1a.setStatementId(0);
repl::OplogEntry entry1b(
repl::OpTime(Timestamp(67, 54801), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("y" << 50));
+ entry1b.setStatementId(1);
entry1b.setPrevWriteOpTimeInTransaction(entry1a.getOpTime());
SessionTxnRecord sessionRecord1;
@@ -121,10 +127,12 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
repl::OplogEntry entry2a(
repl::OpTime(Timestamp(43, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30));
entry2a.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry2a.setStatementId(3);
repl::OplogEntry entry2b(
repl::OpTime(Timestamp(789, 13), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50));
entry2b.setPrevWriteOpTimeInTransaction(entry2a.getOpTime());
+ entry2b.setStatementId(4);
SessionTxnRecord sessionRecord2;
sessionRecord2.setSessionId(makeLogicalSessionIdForTest());
@@ -162,15 +170,23 @@ TEST_F(SessionCatalogMigrationSourceTest, TwoSessionWithTwoWrites) {
}
};
- checkNextBatch(entry1b, entry1a);
+ if (sessionRecord1.getSessionId().toBSON().woCompare(sessionRecord2.getSessionId().toBSON()) <
+ 0) {
+ checkNextBatch(entry2b, entry2a);
- ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
- ASSERT_TRUE(migrationSource.hasMoreOplog());
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
- checkNextBatch(entry2b, entry2a);
+ checkNextBatch(entry1b, entry1a);
- ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
- ASSERT_FALSE(migrationSource.hasMoreOplog());
+ } else {
+ checkNextBatch(entry1b, entry1a);
+
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+
+ checkNextBatch(entry2b, entry2a);
+ }
}
// It is currently not possible to have 2 findAndModify operations in one transaction, but this
@@ -181,17 +197,20 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
repl::OplogEntry entry1(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 30));
entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry1.setStatementId(0);
insertOplogEntry(entry1);
repl::OplogEntry entry2(
repl::OpTime(Timestamp(52, 346), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("y" << 50));
entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
entry2.setPreImageOpTime(entry1.getOpTime());
+ entry2.setStatementId(1);
insertOplogEntry(entry2);
repl::OplogEntry entry3(
repl::OpTime(Timestamp(73, 5), 2), 0, repl::OpTypeEnum::kNoop, kNs, BSON("x" << 20));
entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry3.setStatementId(2);
insertOplogEntry(entry3);
repl::OplogEntry entry4(repl::OpTime(Timestamp(73, 6), 2),
@@ -202,6 +221,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OneSessionWithFindAndModifyPreImageAnd
BSON("$inc" << BSON("x" << 1)));
entry4.setPrevWriteOpTimeInTransaction(entry2.getOpTime());
entry4.setPostImageOpTime(entry3.getOpTime());
+ entry4.setStatementId(3);
insertOplogEntry(entry4);
SessionTxnRecord sessionRecord;
@@ -236,6 +256,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
repl::OplogEntry entry1(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry1.setStatementId(0);
insertOplogEntry(entry1);
SessionTxnRecord sessionRecord1;
@@ -254,6 +275,7 @@ TEST_F(SessionCatalogMigrationSourceTest, OplogWithOtherNsShouldBeIgnored) {
NamespaceString("x.y"),
BSON("x" << 30));
entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry2.setStatementId(1);
insertOplogEntry(entry2);
SessionTxnRecord sessionRecord2;
@@ -284,6 +306,7 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
repl::OplogEntry entry1(
repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
entry1.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry1.setStatementId(0);
insertOplogEntry(entry1);
SessionTxnRecord sessionRecord1;
@@ -298,11 +321,13 @@ TEST_F(SessionCatalogMigrationSourceTest, SessionDumpWithMultipleNewWrites) {
repl::OplogEntry entry2(
repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30));
entry2.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry2.setStatementId(1);
insertOplogEntry(entry2);
repl::OplogEntry entry3(
repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40));
entry3.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry3.setStatementId(2);
insertOplogEntry(entry3);
SessionCatalogMigrationSource migrationSource(kNs);
@@ -366,6 +391,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
kNs,
BSON("x" << 30));
entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry.setStatementId(0);
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(entry.getOpTime());
@@ -385,6 +411,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
repl::OplogEntry entry(
repl::OpTime(Timestamp(53, 12), 2), 0, repl::OpTypeEnum::kDelete, kNs, BSON("x" << 30));
entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry.setStatementId(1);
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(entry.getOpTime());
@@ -403,6 +430,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
repl::OplogEntry entry(
repl::OpTime(Timestamp(55, 12), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("z" << 40));
entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(0, 0), 0));
+ entry.setStatementId(2);
insertOplogEntry(entry);
migrationSource.notifyNewWriteOpTime(entry.getOpTime());
@@ -418,6 +446,97 @@ TEST_F(SessionCatalogMigrationSourceTest, ShouldBeAbleInsertNewWritesAfterBuffer
}
}
+TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHistory) {
+ const NamespaceString kNs("a.b");
+
+ repl::OplogEntry entry(
+ repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
+ entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(40, 1), 2));
+ entry.setStatementId(0);
+ insertOplogEntry(entry);
+
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(sessionId);
+ sessionRecord.setTxnNum(31);
+ sessionRecord.setLastWriteOpTime(entry.getOpTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(kNs);
+ migrationSource.init(opCtx());
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+
+ {
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON());
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+ }
+
+ {
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+
+ auto oplog = *nextOplogResult.oplog;
+ ASSERT_TRUE(oplog.getObject2());
+ ASSERT_BSONOBJ_EQ(Session::kDeadEndSentinel, *oplog.getObject2());
+ ASSERT_TRUE(oplog.getStatementId());
+ ASSERT_EQ(kIncompleteHistoryStmtId, *oplog.getStatementId());
+
+ auto sessionInfo = oplog.getOperationSessionInfo();
+ ASSERT_TRUE(sessionInfo.getSessionId());
+ ASSERT_EQ(sessionId, *sessionInfo.getSessionId());
+ ASSERT_TRUE(sessionInfo.getTxnNumber());
+ ASSERT_EQ(31, *sessionInfo.getTxnNumber());
+ }
+
+ ASSERT_FALSE(migrationSource.fetchNextOplog(opCtx()));
+ ASSERT_FALSE(migrationSource.hasMoreOplog());
+}
+
+TEST_F(SessionCatalogMigrationSourceTest, ShouldAssertWhenRollbackDetected) {
+ const NamespaceString kNs("a.b");
+
+ repl::OplogEntry entry(
+ repl::OpTime(Timestamp(52, 345), 2), 0, repl::OpTypeEnum::kInsert, kNs, BSON("x" << 30));
+ entry.setPrevWriteOpTimeInTransaction(repl::OpTime(Timestamp(40, 1), 2));
+ entry.setStatementId(0);
+ insertOplogEntry(entry);
+
+ const auto sessionId = makeLogicalSessionIdForTest();
+
+ SessionTxnRecord sessionRecord;
+ sessionRecord.setSessionId(sessionId);
+ sessionRecord.setTxnNum(31);
+ sessionRecord.setLastWriteOpTime(entry.getOpTime());
+
+ DBDirectClient client(opCtx());
+ client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), sessionRecord.toBSON());
+
+ SessionCatalogMigrationSource migrationSource(kNs);
+ migrationSource.init(opCtx());
+ ASSERT_TRUE(migrationSource.fetchNextOplog(opCtx()));
+
+ {
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+ auto nextOplogResult = migrationSource.getLastFetchedOplog();
+ ASSERT_FALSE(nextOplogResult.shouldWaitForMajority);
+ // Cannot compare directly because of SERVER-31356
+ ASSERT_BSONOBJ_EQ(entry.toBSON(), nextOplogResult.oplog->toBSON());
+ }
+
+ ASSERT_OK(repl::ReplicationProcess::get(opCtx())->incrementRollbackID(opCtx()));
+
+ ASSERT_THROWS(migrationSource.fetchNextOplog(opCtx()), AssertionException);
+ ASSERT_TRUE(migrationSource.hasMoreOplog());
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 7c66713b13c..8acfb4cb2d0 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -102,6 +102,8 @@ MONGO_FP_DECLARE(onPrimaryTransactionalWrite);
} // namespace
+const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1));
+
Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
@@ -131,11 +133,21 @@ void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
CommittedStatementTimestampMap activeTxnCommittedStatements;
+ bool hasIncompleteHistory = false;
if (lastWrittenTxnRecord) {
auto it = TransactionHistoryIterator(lastWrittenTxnRecord->getLastWriteOpTime());
while (it.hasNext()) {
const auto entry = it.next(opCtx);
invariant(entry.getStatementId());
+
+ if (*entry.getStatementId() == kIncompleteHistoryStmtId) {
+ // Only the dead end sentinel can have this id for oplog write history.
+ invariant(entry.getObject2());
+ invariant(entry.getObject2()->woCompare(kDeadEndSentinel) == 0);
+ hasIncompleteHistory = true;
+ continue;
+ }
+
const auto insertRes = activeTxnCommittedStatements.emplace(*entry.getStatementId(),
entry.getOpTime());
if (!insertRes.second) {
@@ -152,6 +164,8 @@ void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
ul.lock();
+ _hasIncompleteHistory = hasIncompleteHistory;
+
// Protect against concurrent refreshes or invalidations
if (!_isValid && _numInvalidations == numInvalidations) {
_isValid = true;
@@ -232,6 +246,7 @@ void Session::invalidate() {
_activeTxnNumber = kUninitializedTxnNumber;
_activeTxnCommittedStatements.clear();
+ _hasIncompleteHistory = false;
}
repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const {
@@ -250,7 +265,17 @@ boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationConte
StmtId stmtId) const {
const auto stmtTimestamp = [&] {
stdx::lock_guard<stdx::mutex> lg(_mutex);
- return _checkStatementExecuted(lg, txnNumber, stmtId);
+ auto result = _checkStatementExecuted(lg, txnNumber, stmtId);
+
+ if (!result) {
+ uassert(ErrorCodes::IncompleteTransactionHistory,
+ str::stream() << "incomplete history detected for lsid: " << _sessionId.toBSON()
+ << ", txnNum: "
+ << txnNumber,
+ !_hasIncompleteHistory);
+ }
+
+ return result;
}();
if (!stmtTimestamp)
@@ -289,6 +314,7 @@ void Session::_beginTxn(WithLock wl, TxnNumber txnNumber) {
_activeTxnNumber = txnNumber;
_activeTxnCommittedStatements.clear();
+ _hasIncompleteHistory = false;
}
void Session::_checkValid(WithLock) const {
@@ -316,8 +342,9 @@ boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl,
_checkIsActiveTransaction(wl, txnNumber);
const auto it = _activeTxnCommittedStatements.find(stmtId);
- if (it == _activeTxnCommittedStatements.end())
+ if (it == _activeTxnCommittedStatements.end()) {
return boost::none;
+ }
invariant(_lastWrittenSessionRecord);
invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber);
@@ -396,6 +423,11 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
if (newTxnNumber == _activeTxnNumber) {
for (const auto stmtId : stmtIdsWritten) {
+ if (stmtId == kIncompleteHistoryStmtId) {
+ _hasIncompleteHistory = true;
+ continue;
+ }
+
const auto insertRes =
_activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
if (!insertRes.second) {
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index ffc0b7b59f5..f627ca8468d 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -55,6 +55,8 @@ class Session {
MONGO_DISALLOW_COPYING(Session);
public:
+ static const BSONObj kDeadEndSentinel;
+
explicit Session(LogicalSessionId sessionId);
const LogicalSessionId& getSessionId() const {
@@ -169,6 +171,10 @@ private:
// happen during refresh
int _numInvalidations{0};
+ // Set to true if incomplete history is detected. For example, when the oplog to a write was
+ // truncated because it was too old.
+ bool _hasIncompleteHistory{false};
+
// Caches what is known to be the last written transaction record for the session
boost::optional<SessionTxnRecord> _lastWrittenSessionRecord;
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index 168232a2c25..15cbcd8ba27 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -338,5 +338,80 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 0));
}
+TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ const TxnNumber txnNum = 2;
+
+ OperationSessionInfo osi;
+ osi.setSessionId(sessionId);
+ osi.setTxnNumber(txnNum);
+
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+ session.beginTxn(opCtx(), txnNum);
+
+ auto firstOpTime = ([&]() {
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
+
+ auto opTime = repl::logOp(opCtx(),
+ "i",
+ kNss,
+ kUUID,
+ BSON("x" << 1),
+ &Session::kDeadEndSentinel,
+ false,
+ osi,
+ 1,
+ {});
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime);
+ wuow.commit();
+
+ return opTime;
+ })();
+
+ {
+ repl::OplogLink link;
+ link.prevOpTime = firstOpTime;
+
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
+
+ auto opTime = repl::logOp(opCtx(),
+ "n",
+ kNss,
+ kUUID,
+ {},
+ &Session::kDeadEndSentinel,
+ false,
+ osi,
+ kIncompleteHistoryStmtId,
+ link);
+
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime);
+ wuow.commit();
+ }
+
+ {
+ auto oplog = session.checkStatementExecuted(opCtx(), txnNum, 1);
+ ASSERT_TRUE(oplog);
+ ASSERT_EQ(firstOpTime, oplog->getOpTime());
+ }
+
+ ASSERT_THROWS(session.checkStatementExecuted(opCtx(), txnNum, 2), AssertionException);
+
+ // Should have the same behavior after loading state from storage.
+ session.invalidate();
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ {
+ auto oplog = session.checkStatementExecuted(opCtx(), txnNum, 1);
+ ASSERT_TRUE(oplog);
+ ASSERT_EQ(firstOpTime, oplog->getOpTime());
+ }
+
+ ASSERT_THROWS(session.checkStatementExecuted(opCtx(), txnNum, 2), AssertionException);
+}
+
} // namespace
} // namespace mongo