summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/session_catalog_migration_source.h
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2022-03-18 23:52:31 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-19 00:24:26 +0000
commitc141ef8536d51f05a6fa4017de20286d154d09e1 (patch)
tree022bd1ffe70d91657498a2237ca8e49ed12de74b /src/mongo/db/s/session_catalog_migration_source.h
parent6f2f79447a83f1cccb208d815e049699c8f86fbe (diff)
downloadmongo-c141ef8536d51f05a6fa4017de20286d154d09e1.tar.gz
SERVER-63494 Transfer history for retryable transactions with more than one oplog entry across migrations
Diffstat (limited to 'src/mongo/db/s/session_catalog_migration_source.h')
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h53
1 files changed, 42 insertions, 11 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 3603ddfb055..f3b87baf7fe 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -177,6 +177,8 @@ private:
*/
class SessionOplogIterator {
public:
+ enum class EntryType { kRetryableWrite, kNonRetryableTransaction, kRetryableTransaction };
+
SessionOplogIterator(SessionTxnRecord txnRecord, int expectedRollbackId);
/**
@@ -199,6 +201,8 @@ private:
private:
const SessionTxnRecord _record;
const int _initialRollbackId;
+ const EntryType _entryType;
+
std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator;
};
@@ -242,6 +246,23 @@ private:
*/
bool _fetchNextNewWriteOplog(OperationContext* opCtx);
+ /**
+ * Same as notifyNewWriteOpTime but must be called while holding the _newOplogMutex.
+ */
+ void _notifyNewWriteOpTime(WithLock,
+ repl::OpTime opTimestamp,
+ EntryAtOpTimeType entryAtOpTimeType);
+
+ /*
+ * Derives retryable write oplog entries from the given retryable internal transaction applyOps
+ * oplog entry, and adds the ones that are related to the migration the given oplog buffer. Must
+ * be called while holding the mutex that protects the buffer.
+ */
+ void _extractOplogEntriesForInternalTransactionForRetryableWrite(
+ WithLock,
+ const repl::OplogEntry& applyOplogEntry,
+ std::vector<repl::OplogEntry>* oplogBuffer);
+
// Namespace for which the migration is happening
const NamespaceString _ns;
@@ -252,26 +273,31 @@ private:
const ChunkRange _chunkRange;
const ShardKeyPattern _keyPattern;
- // Protects _sessionCatalogCursor, _sessionOplogIterators, _currentOplogIterator,
- // _lastFetchedOplogBuffer, _lastFetchedOplog
+ // Protects _sessionOplogIterators, _currentOplogIterator, _lastFetchedOplog,
+ // _lastFetchedOplogImage and _unprocessedOplogBuffer.
Mutex _sessionCloneMutex =
MONGO_MAKE_LATCH("SessionCatalogMigrationSource::_sessionCloneMutex");
// List of remaining session records that needs to be cloned.
std::vector<std::unique_ptr<SessionOplogIterator>> _sessionOplogIterators;
- // Points to the current session record eing cloned.
+ // Points to the current session record being cloned.
std::unique_ptr<SessionOplogIterator> _currentOplogIterator;
- // Used for temporarily storng oplog entries for operations that has more than one entry.
- // For example, findAndModify generates one for the actual operation and another for the
- // pre/post image.
- std::vector<repl::OplogEntry> _lastFetchedOplogBuffer;
- // Used to store the last fetched oplog. This enables calling get multiple times.
+ // Used to store the last fetched and processed oplog entry from _currentOplogIterator. This
+ // enables calling get() multiple times.
boost::optional<repl::OplogEntry> _lastFetchedOplog;
- // Protects _newWriteTsList, _lastFetchedNewWriteOplog, _state, _newOplogNotification
+ // Used to store the pre/post image for _lastFetchedNewWriteOplog if there is one.
+ boost::optional<repl::OplogEntry> _lastFetchedOplogImage;
+
+ // Used to store the last fetched oplog entries from _currentOplogIterator that have not been
+ // processed.
+ std::vector<repl::OplogEntry> _unprocessedOplogBuffer;
+
+ // Protects _newWriteOpTimeList, _lastFetchedNewWriteOplog, _lastFetchedNewWriteOplogImage,
+ // _unprocessedNewWriteOplogBuffer, _state, _newOplogNotification.
Mutex _newOplogMutex = MONGO_MAKE_LATCH("SessionCatalogMigrationSource::_newOplogMutex");
// The average size of documents in config.transactions.
@@ -280,12 +306,17 @@ private:
// Stores oplog opTime of new writes that are coming in.
std::list<std::pair<repl::OpTime, EntryAtOpTimeType>> _newWriteOpTimeList;
- // Used to store the last fetched oplog from _newWriteTsList.
+ // Used to store the last fetched and processed oplog entry from _newWriteOpTimeList. This
+ // enables calling get() multiple times.
boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplog;
- // Used to store an image for `_lastFetchedNewWriteOplog` if there is one.
+ // Used to store the pre/post image oplog entry when _lastFetchedNewWriteOplog if there is one.
boost::optional<repl::OplogEntry> _lastFetchedNewWriteOplogImage;
+ // Used to store the last fetched oplog entries from _newWriteOpTimeList that have not been
+ // processed.
+ std::vector<repl::OplogEntry> _unprocessedNewWriteOplogBuffer;
+
// Stores the current state.
State _state{State::kActive};