summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/session_catalog_migration_source.h
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2017-10-11 13:32:15 -0400
committerRandolph Tan <randolph@10gen.com>2017-10-25 01:20:09 -0400
commit67f735e6705091659e2a8cf46a9285f09bcf749a (patch)
treeffcaa27ecec2babe0c2dba0452a6866a5a3ed9f3 /src/mongo/db/s/session_catalog_migration_source.h
parent5b9b9a9f04b06109b77b5522f7318c366deecf6f (diff)
downloadmongo-67f735e6705091659e2a8cf46a9285f09bcf749a.tar.gz
SERVER-30880 Handle migration of sessions with incomplete history
Diffstat (limited to 'src/mongo/db/s/session_catalog_migration_source.h')
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.h47
1 files changed, 40 insertions, 7 deletions
diff --git a/src/mongo/db/s/session_catalog_migration_source.h b/src/mongo/db/s/session_catalog_migration_source.h
index 0133281df07..2e5faf393c2 100644
--- a/src/mongo/db/s/session_catalog_migration_source.h
+++ b/src/mongo/db/s/session_catalog_migration_source.h
@@ -35,6 +35,7 @@
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/with_lock.h"
@@ -111,6 +112,36 @@ public:
void notifyNewWriteOpTime(repl::OpTime opTimestamp);
private:
+ /**
+ * An iterator for extracting session write oplogs that need to be cloned during migration.
+ */
+ class SessionOplogIterator {
+ public:
+ SessionOplogIterator(SessionTxnRecord txnRecord, int expectedRollbackId);
+
+ /**
+ * Returns true if there are more oplog entries to fetch for this session.
+ */
+ bool hasNext() const;
+
+ /**
+ * Returns the next oplog write that happened in this session. If the oplog is lost
+ * because the oplog rolled over, this will return a sentinel oplog entry instead with
+ * type 'n' and o2 field set to Session::kDeadEndSentinel. This will also mean that
+ * next subsequent calls to hasNext will return false.
+ */
+ repl::OplogEntry getNext(OperationContext* opCtx);
+
+ BSONObj toBSON() const {
+ return _record.toBSON();
+ }
+
+ private:
+ const SessionTxnRecord _record;
+ const int _initialRollbackId;
+ std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator;
+ };
+
///////////////////////////////////////////////////////////////////////////
// Methods for extracting the oplog entries from session information.
@@ -161,18 +192,20 @@ private:
const NamespaceString _ns;
- // Protects _alreadyInitialized, _sessionCatalogCursor, _writeHistoryIterator
- // _lastFetchedOplogBuffer, _lastFetchedOplog
+ // Protects _alreadyInitialized, _sessionCatalogCursor, _sessionOplogIterators
+ // _currentOplogIterator, _lastFetchedOplogBuffer, _lastFetchedOplog
stdx::mutex _sessionCloneMutex;
-
bool _alreadyInitialized = false;
- std::set<repl::OpTime> _sessionLastWriteOpTimes;
+ int _rollbackIdAtInit = 0;
+
+ // List of remaining session records that needs to be cloned.
+ std::vector<std::unique_ptr<SessionOplogIterator>> _sessionOplogIterators;
- // Iterator for oplog entries for a specific transaction.
- std::unique_ptr<TransactionHistoryIterator> _writeHistoryIterator;
+ // Points to the current session record eing cloned.
+ std::unique_ptr<SessionOplogIterator> _currentOplogIterator;
- // Used for temporarily storing oplog entries for operations that has more than one entry.
+ // Used for temporarily storng oplog entries for operations that has more than one entry.
// For example, findAndModify generates one for the actual operation and another for the
// pre/post image.
std::vector<repl::OplogEntry> _lastFetchedOplogBuffer;