summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Storch <david.storch@mongodb.com>2019-10-24 22:27:47 +0000
committerevergreen <evergreen@mongodb.com>2019-10-24 22:27:47 +0000
commitf22a062b373ccdb5aa9232a2da435918b54c358e (patch)
treef10d356535a1bdccc57b1f51e19f632f5493a20f
parentb4aa65fde0a61a8a8e2f7c439f928215b23c14e0 (diff)
downloadmongo-f22a062b373ccdb5aa9232a2da435918b54c358e.tar.gz
SERVER-43685 Move snapshot id handling above storage API.
This ensures that all storage engines support tracking of snapshot ids, and by extenion, allows the query system to rely on the validity of these ids for all engines.
-rw-r--r--src/mongo/db/concurrency/d_concurrency_test.cpp7
-rw-r--r--src/mongo/db/exec/working_set_common.cpp6
-rw-r--r--src/mongo/db/exec/write_stage_common.cpp7
-rw-r--r--src/mongo/db/storage/SConscript1
-rw-r--r--src/mongo/db/storage/biggie/biggie_recovery_unit.cpp10
-rw-r--r--src/mongo/db/storage/biggie/biggie_recovery_unit.h12
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp6
-rw-r--r--src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h13
-rw-r--r--src/mongo/db/storage/mobile/mobile_recovery_unit.cpp6
-rw-r--r--src/mongo/db/storage/mobile/mobile_recovery_unit.h13
-rw-r--r--src/mongo/db/storage/recovery_unit.cpp15
-rw-r--r--src/mongo/db/storage/recovery_unit.h29
-rw-r--r--src/mongo/db/storage/recovery_unit_noop.h42
-rw-r--r--src/mongo/db/storage/recovery_unit_test_harness.cpp32
-rw-r--r--src/mongo/db/storage/snapshot.h8
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp32
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h11
17 files changed, 148 insertions, 102 deletions
diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp
index 5af7a4f5595..92c4a943075 100644
--- a/src/mongo/db/concurrency/d_concurrency_test.cpp
+++ b/src/mongo/db/concurrency/d_concurrency_test.cpp
@@ -2052,11 +2052,12 @@ TEST_F(DConcurrencyTestFixture, CompatibleFirstStress) {
namespace {
class RecoveryUnitMock : public RecoveryUnitNoop {
public:
- virtual void abandonSnapshot() {
+ bool activeTransaction = true;
+
+private:
+ void doAbandonSnapshot() override {
activeTransaction = false;
}
-
- bool activeTransaction = true;
};
} // namespace
diff --git a/src/mongo/db/exec/working_set_common.cpp b/src/mongo/db/exec/working_set_common.cpp
index 571e7a25b4f..fa8198341fa 100644
--- a/src/mongo/db/exec/working_set_common.cpp
+++ b/src/mongo/db/exec/working_set_common.cpp
@@ -66,9 +66,9 @@ bool WorkingSetCommon::fetch(OperationContext* opCtx,
if (member->getState() == WorkingSetMember::RID_AND_IDX) {
for (size_t i = 0; i < member->keyData.size(); i++) {
auto&& memberKey = member->keyData[i];
- // For storage engines that support document-level concurrency, if this key was obtained
- // in the current snapshot, then move on to the next key.
- if (supportsDocLocking() && memberKey.snapshotId == currentSnapshotId) {
+ // If this key was obtained in the current snapshot, then move on to the next key. There
+ // is no way for this key to be inconsistent with the document it points to.
+ if (memberKey.snapshotId == currentSnapshotId) {
continue;
}
diff --git a/src/mongo/db/exec/write_stage_common.cpp b/src/mongo/db/exec/write_stage_common.cpp
index 5d62016cf25..74a3d846a2a 100644
--- a/src/mongo/db/exec/write_stage_common.cpp
+++ b/src/mongo/db/exec/write_stage_common.cpp
@@ -46,13 +46,8 @@ bool ensureStillMatches(const Collection* collection,
const CanonicalQuery* cq) {
// If the snapshot changed, then we have to make sure we have the latest copy of the doc and
// that it still matches.
- //
- // Storage engines that don't support document-level concurrency also do not track snapshot ids.
- // Those storage engines always need to check whether the document still matches, as the
- // document we are planning to delete may have already been deleted or updated during yield.
WorkingSetMember* member = ws->get(id);
- if (!supportsDocLocking() ||
- opCtx->recoveryUnit()->getSnapshotId() != member->doc.snapshotId()) {
+ if (opCtx->recoveryUnit()->getSnapshotId() != member->doc.snapshotId()) {
std::unique_ptr<SeekableRecordCursor> cursor(collection->getCursor(opCtx));
if (!WorkingSetCommon::fetch(opCtx, ws, id, cursor)) {
diff --git a/src/mongo/db/storage/SConscript b/src/mongo/db/storage/SConscript
index 9d7a10e54b0..ad3be3a8604 100644
--- a/src/mongo/db/storage/SConscript
+++ b/src/mongo/db/storage/SConscript
@@ -271,6 +271,7 @@ env.Library(
"$BUILD_DIR/mongo/base",
"$BUILD_DIR/mongo/db/storage/storage_options",
'$BUILD_DIR/mongo/util/fail_point',
+ 'recovery_unit_base',
],
)
diff --git a/src/mongo/db/storage/biggie/biggie_recovery_unit.cpp b/src/mongo/db/storage/biggie/biggie_recovery_unit.cpp
index 4786b7de7c9..d330607e9fa 100644
--- a/src/mongo/db/storage/biggie/biggie_recovery_unit.cpp
+++ b/src/mongo/db/storage/biggie/biggie_recovery_unit.cpp
@@ -53,7 +53,7 @@ void RecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
_setState(State::kInactiveInUnitOfWork);
}
-void RecoveryUnit::commitUnitOfWork() {
+void RecoveryUnit::doCommitUnitOfWork() {
invariant(_inUnitOfWork(), toString(_getState()));
if (_dirty) {
@@ -86,7 +86,7 @@ void RecoveryUnit::commitUnitOfWork() {
_setState(State::kInactive);
}
-void RecoveryUnit::abortUnitOfWork() {
+void RecoveryUnit::doAbortUnitOfWork() {
invariant(_inUnitOfWork(), toString(_getState()));
_abort();
}
@@ -96,16 +96,12 @@ bool RecoveryUnit::waitUntilDurable(OperationContext* opCtx) {
return true; // This is an in-memory storage engine.
}
-void RecoveryUnit::abandonSnapshot() {
+void RecoveryUnit::doAbandonSnapshot() {
invariant(!_inUnitOfWork(), toString(_getState()));
_forked = false;
_dirty = false;
}
-SnapshotId RecoveryUnit::getSnapshotId() const {
- return SnapshotId();
-}
-
bool RecoveryUnit::forkIfNeeded() {
if (_forked)
return false;
diff --git a/src/mongo/db/storage/biggie/biggie_recovery_unit.h b/src/mongo/db/storage/biggie/biggie_recovery_unit.h
index 13a61067955..9b4a3bb7e3e 100644
--- a/src/mongo/db/storage/biggie/biggie_recovery_unit.h
+++ b/src/mongo/db/storage/biggie/biggie_recovery_unit.h
@@ -46,8 +46,6 @@ public:
~RecoveryUnit();
void beginUnitOfWork(OperationContext* opCtx) override final;
- void commitUnitOfWork() override final;
- void abortUnitOfWork() override final;
bool inActiveTxn() const {
return _inUnitOfWork();
@@ -55,10 +53,6 @@ public:
virtual bool waitUntilDurable(OperationContext* opCtx) override;
- virtual void abandonSnapshot() override;
-
- virtual SnapshotId getSnapshotId() const override;
-
virtual void setOrderedCommit(bool orderedCommit) override;
// Biggie specific function declarations below.
@@ -80,6 +74,12 @@ public:
static RecoveryUnit* get(OperationContext* opCtx);
private:
+ void doCommitUnitOfWork() override final;
+
+ void doAbortUnitOfWork() override final;
+
+ void doAbandonSnapshot() override final;
+
void _abort();
std::function<void()> _waitUntilDurableCallback;
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
index dd819c9ad48..8d1ac9735a7 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.cpp
@@ -47,7 +47,7 @@ void EphemeralForTestRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
_setState(State::kInactiveInUnitOfWork);
}
-void EphemeralForTestRecoveryUnit::commitUnitOfWork() {
+void EphemeralForTestRecoveryUnit::doCommitUnitOfWork() {
invariant(_inUnitOfWork(), toString(_getState()));
_setState(State::kCommitting);
@@ -70,7 +70,7 @@ void EphemeralForTestRecoveryUnit::commitUnitOfWork() {
_setState(State::kInactive);
}
-void EphemeralForTestRecoveryUnit::abortUnitOfWork() {
+void EphemeralForTestRecoveryUnit::doAbortUnitOfWork() {
invariant(_inUnitOfWork(), toString(_getState()));
_setState(State::kAborting);
@@ -100,7 +100,7 @@ bool EphemeralForTestRecoveryUnit::inActiveTxn() const {
return _inUnitOfWork();
}
-void EphemeralForTestRecoveryUnit::abandonSnapshot() {
+void EphemeralForTestRecoveryUnit::doAbandonSnapshot() {
invariant(!_inUnitOfWork(), toString(_getState()));
}
diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
index 02e1727c44c..568a8fd19a5 100644
--- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
+++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_recovery_unit.h
@@ -47,23 +47,15 @@ public:
virtual ~EphemeralForTestRecoveryUnit();
void beginUnitOfWork(OperationContext* opCtx) final;
- void commitUnitOfWork() final;
- void abortUnitOfWork() final;
virtual bool waitUntilDurable(OperationContext* opCtx);
bool inActiveTxn() const;
- virtual void abandonSnapshot();
-
Status obtainMajorityCommittedSnapshot() final;
virtual void registerChange(std::unique_ptr<Change> change);
- virtual SnapshotId getSnapshotId() const {
- return SnapshotId();
- }
-
virtual void setOrderedCommit(bool orderedCommit) {}
virtual void prepareUnitOfWork() override {}
@@ -89,6 +81,11 @@ public:
}
private:
+ void doCommitUnitOfWork() final;
+ void doAbortUnitOfWork() final;
+
+ void doAbandonSnapshot() final;
+
typedef std::vector<std::shared_ptr<Change>> Changes;
Changes _changes;
diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
index 05141f78c11..3cc2433870b 100644
--- a/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
+++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.cpp
@@ -99,7 +99,7 @@ void MobileRecoveryUnit::beginUnitOfWork(OperationContext* opCtx) {
_txnOpen(opCtx, false);
}
-void MobileRecoveryUnit::commitUnitOfWork() {
+void MobileRecoveryUnit::doCommitUnitOfWork() {
invariant(_inUnitOfWork(), toString(_getState()));
RECOVERY_UNIT_TRACE() << "Unit of work commited, marked inactive.";
@@ -107,7 +107,7 @@ void MobileRecoveryUnit::commitUnitOfWork() {
_commit();
}
-void MobileRecoveryUnit::abortUnitOfWork() {
+void MobileRecoveryUnit::doAbortUnitOfWork() {
invariant(_inUnitOfWork(), toString(_getState()));
RECOVERY_UNIT_TRACE() << "Unit of work aborted, marked inactive.";
@@ -149,7 +149,7 @@ bool MobileRecoveryUnit::waitUntilDurable(OperationContext* opCtx) {
return true;
}
-void MobileRecoveryUnit::abandonSnapshot() {
+void MobileRecoveryUnit::doAbandonSnapshot() {
invariant(!_inUnitOfWork(), toString(_getState()));
if (_isActive()) {
// We can't be in a WriteUnitOfWork, so it is safe to rollback.
diff --git a/src/mongo/db/storage/mobile/mobile_recovery_unit.h b/src/mongo/db/storage/mobile/mobile_recovery_unit.h
index 163d2354837..6304cde8a5f 100644
--- a/src/mongo/db/storage/mobile/mobile_recovery_unit.h
+++ b/src/mongo/db/storage/mobile/mobile_recovery_unit.h
@@ -52,16 +52,8 @@ public:
virtual ~MobileRecoveryUnit();
void beginUnitOfWork(OperationContext* opCtx) override;
- void commitUnitOfWork() override;
- void abortUnitOfWork() override;
bool waitUntilDurable(OperationContext* opCtx) override;
- void abandonSnapshot() override;
-
- SnapshotId getSnapshotId() const override {
- return SnapshotId();
- }
-
MobileSession* getSession(OperationContext* opCtx, bool readOnly = true);
MobileSession* getSessionNoTxn(OperationContext* opCtx);
@@ -81,6 +73,11 @@ public:
void setOrderedCommit(bool orderedCommit) override {}
private:
+ void doCommitUnitOfWork() override;
+ void doAbortUnitOfWork() override;
+
+ void doAbandonSnapshot() override;
+
void _abort();
void _commit();
diff --git a/src/mongo/db/storage/recovery_unit.cpp b/src/mongo/db/storage/recovery_unit.cpp
index 27f433d0c5f..70471edc713 100644
--- a/src/mongo/db/storage/recovery_unit.cpp
+++ b/src/mongo/db/storage/recovery_unit.cpp
@@ -35,6 +35,21 @@
#include "mongo/util/log.h"
namespace mongo {
+namespace {
+// SnapshotIds need to be globally unique, as they are used in a WorkingSetMember to
+// determine if documents changed, but a different recovery unit may be used across a getMore,
+// so there is a chance the snapshot ID will be reused.
+AtomicWord<unsigned long long> nextSnapshotId{1};
+} // namespace
+
+RecoveryUnit::RecoveryUnit() {
+ assignNextSnapshotId();
+}
+
+void RecoveryUnit::assignNextSnapshotId() {
+ _mySnapshotId = nextSnapshotId.fetchAndAdd(1);
+}
+
void RecoveryUnit::registerChange(std::unique_ptr<Change> change) {
invariant(_inUnitOfWork(), toString(_getState()));
_changes.push_back(std::move(change));
diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h
index 4a03a9a71a7..d0b2ffca17b 100644
--- a/src/mongo/db/storage/recovery_unit.h
+++ b/src/mongo/db/storage/recovery_unit.h
@@ -132,7 +132,10 @@ public:
*
* Should be called through WriteUnitOfWork rather than directly.
*/
- virtual void commitUnitOfWork() = 0;
+ void commitUnitOfWork() {
+ assignNextSnapshotId();
+ doCommitUnitOfWork();
+ }
/**
* Marks the end of a unit of work and rolls back all changes registered by calls to onRollback
@@ -141,7 +144,10 @@ public:
*
* Should be called through WriteUnitOfWork rather than directly.
*/
- virtual void abortUnitOfWork() = 0;
+ void abortUnitOfWork() {
+ assignNextSnapshotId();
+ doAbortUnitOfWork();
+ }
/**
* Transitions the active unit of work to the "prepared" state. Must be called after
@@ -209,7 +215,10 @@ public:
* If there is an open transaction, it is closed. On return no transaction is active. This
* cannot be called inside of a WriteUnitOfWork, and should fail if it is.
*/
- virtual void abandonSnapshot() = 0;
+ void abandonSnapshot() {
+ assignNextSnapshotId();
+ doAbandonSnapshot();
+ }
/**
* Informs the RecoveryUnit that a snapshot will be needed soon, if one was not already
@@ -264,7 +273,9 @@ public:
*
* This is unrelated to Timestamp which must be globally comparable.
*/
- virtual SnapshotId getSnapshotId() const = 0;
+ SnapshotId getSnapshotId() const {
+ return SnapshotId{_mySnapshotId};
+ }
/**
* Sets a timestamp to assign to future writes in a transaction.
@@ -569,7 +580,7 @@ public:
}
protected:
- RecoveryUnit() {}
+ RecoveryUnit();
/**
* Returns the current state.
@@ -609,9 +620,17 @@ protected:
bool _mustBeTimestamped = false;
private:
+ // Sets the snapshot associated with this RecoveryUnit to a new globally unique id number.
+ void assignNextSnapshotId();
+
+ virtual void doAbandonSnapshot() = 0;
+ virtual void doCommitUnitOfWork() = 0;
+ virtual void doAbortUnitOfWork() = 0;
+
typedef std::vector<std::unique_ptr<Change>> Changes;
Changes _changes;
State _state = State::kInactive;
+ uint64_t _mySnapshotId;
};
} // namespace mongo
diff --git a/src/mongo/db/storage/recovery_unit_noop.h b/src/mongo/db/storage/recovery_unit_noop.h
index c9fee6ca4b2..335021ea8ff 100644
--- a/src/mongo/db/storage/recovery_unit_noop.h
+++ b/src/mongo/db/storage/recovery_unit_noop.h
@@ -41,7 +41,23 @@ class OperationContext;
class RecoveryUnitNoop : public RecoveryUnit {
public:
void beginUnitOfWork(OperationContext* opCtx) final {}
- void commitUnitOfWork() final {
+
+ virtual bool waitUntilDurable(OperationContext* opCtx) {
+ return true;
+ }
+
+ virtual void registerChange(std::unique_ptr<Change> change) {
+ _changes.push_back(std::move(change));
+ }
+
+ virtual void setOrderedCommit(bool orderedCommit) {}
+
+ bool inActiveTxn() const {
+ return false;
+ }
+
+private:
+ void doCommitUnitOfWork() final {
for (auto& change : _changes) {
try {
change->commit(boost::none);
@@ -51,7 +67,8 @@ public:
}
_changes.clear();
}
- void abortUnitOfWork() final {
+
+ void doAbortUnitOfWork() final {
for (auto it = _changes.rbegin(); it != _changes.rend(); ++it) {
try {
(*it)->rollback();
@@ -62,27 +79,8 @@ public:
_changes.clear();
}
- virtual void abandonSnapshot() {}
+ virtual void doAbandonSnapshot() {}
- virtual bool waitUntilDurable(OperationContext* opCtx) {
- return true;
- }
-
- virtual void registerChange(std::unique_ptr<Change> change) {
- _changes.push_back(std::move(change));
- }
-
- virtual SnapshotId getSnapshotId() const {
- return SnapshotId();
- }
-
- virtual void setOrderedCommit(bool orderedCommit) {}
-
- bool inActiveTxn() const {
- return false;
- }
-
-private:
std::vector<std::unique_ptr<Change>> _changes;
};
diff --git a/src/mongo/db/storage/recovery_unit_test_harness.cpp b/src/mongo/db/storage/recovery_unit_test_harness.cpp
index 86382c10785..198dfc0c657 100644
--- a/src/mongo/db/storage/recovery_unit_test_harness.cpp
+++ b/src/mongo/db/storage/recovery_unit_test_harness.cpp
@@ -139,6 +139,38 @@ TEST_F(RecoveryUnitTestHarness, CheckInActiveTxnWithAbort) {
ASSERT_FALSE(ru->inActiveTxn());
}
+TEST_F(RecoveryUnitTestHarness, BeginningUnitOfWorkDoesNotIncrementSnapshotId) {
+ auto snapshotIdBefore = ru->getSnapshotId();
+ ru->beginUnitOfWork(opCtx.get());
+ ASSERT_EQ(snapshotIdBefore, ru->getSnapshotId());
+ ru->abortUnitOfWork();
+}
+
+TEST_F(RecoveryUnitTestHarness, NewlyAllocatedRecoveryUnitHasNewSnapshotId) {
+ auto newRu = harnessHelper->newRecoveryUnit();
+ ASSERT_NE(newRu->getSnapshotId(), ru->getSnapshotId());
+}
+
+TEST_F(RecoveryUnitTestHarness, AbandonSnapshotIncrementsSnapshotId) {
+ auto snapshotIdBefore = ru->getSnapshotId();
+ ru->abandonSnapshot();
+ ASSERT_NE(snapshotIdBefore, ru->getSnapshotId());
+}
+
+TEST_F(RecoveryUnitTestHarness, CommitUnitOfWorkIncrementsSnapshotId) {
+ auto snapshotIdBefore = ru->getSnapshotId();
+ ru->beginUnitOfWork(opCtx.get());
+ ru->commitUnitOfWork();
+ ASSERT_NE(snapshotIdBefore, ru->getSnapshotId());
+}
+
+TEST_F(RecoveryUnitTestHarness, AbortUnitOfWorkIncrementsSnapshotId) {
+ auto snapshotIdBefore = ru->getSnapshotId();
+ ru->beginUnitOfWork(opCtx.get());
+ ru->abortUnitOfWork();
+ ASSERT_NE(snapshotIdBefore, ru->getSnapshotId());
+}
+
DEATH_TEST_F(RecoveryUnitTestHarness, RegisterChangeMustBeInUnitOfWork, "invariant") {
int count = 0;
opCtx->recoveryUnit()->registerChange(std::make_unique<TestChange>(&count));
diff --git a/src/mongo/db/storage/snapshot.h b/src/mongo/db/storage/snapshot.h
index 75870a7791f..bbb9e4580cd 100644
--- a/src/mongo/db/storage/snapshot.h
+++ b/src/mongo/db/storage/snapshot.h
@@ -68,6 +68,14 @@ private:
uint64_t _id;
};
+inline std::ostream& operator<<(std::ostream& stream, const SnapshotId& snapshotId) {
+ return stream << "SnapshotId(" << snapshotId.toNumber() << ")";
+}
+
+inline StringBuilder& operator<<(StringBuilder& stream, const SnapshotId& snapshotId) {
+ return stream << "SnapshotId(" << snapshotId.toNumber() << ")";
+}
+
template <typename T>
class Snapshotted {
public:
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
index e6abd29ed08..825a82b5e6e 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp
@@ -52,11 +52,6 @@ namespace {
// because the recovery unit may not ever actually be in a prepared state.
MONGO_FAIL_POINT_DEFINE(WTAlwaysNotifyPrepareConflictWaiters);
-// SnapshotIds need to be globally unique, as they are used in a WorkingSetMember to
-// determine if documents changed, but a different recovery unit may be used across a getMore,
-// so there is a chance the snapshot ID will be reused.
-AtomicWord<unsigned long long> nextSnapshotId{1};
-
logger::LogSeverity kSlowTransactionSeverity = logger::LogSeverity::Debug(1);
} // namespace
@@ -164,9 +159,7 @@ WiredTigerRecoveryUnit::WiredTigerRecoveryUnit(WiredTigerSessionCache* sc)
WiredTigerRecoveryUnit::WiredTigerRecoveryUnit(WiredTigerSessionCache* sc,
WiredTigerOplogManager* oplogManager)
- : _sessionCache(sc),
- _oplogManager(oplogManager),
- _mySnapshotId(nextSnapshotId.fetchAndAdd(1)) {}
+ : _sessionCache(sc), _oplogManager(oplogManager) {}
WiredTigerRecoveryUnit::~WiredTigerRecoveryUnit() {
invariant(!_inUnitOfWork(), toString(_getState()));
@@ -235,12 +228,12 @@ void WiredTigerRecoveryUnit::prepareUnitOfWork() {
invariantWTOK(s->prepare_transaction(s, conf.c_str()));
}
-void WiredTigerRecoveryUnit::commitUnitOfWork() {
+void WiredTigerRecoveryUnit::doCommitUnitOfWork() {
invariant(_inUnitOfWork(), toString(_getState()));
_commit();
}
-void WiredTigerRecoveryUnit::abortUnitOfWork() {
+void WiredTigerRecoveryUnit::doAbortUnitOfWork() {
invariant(_inUnitOfWork(), toString(_getState()));
_abort();
}
@@ -305,7 +298,7 @@ WiredTigerSession* WiredTigerRecoveryUnit::getSessionNoTxn() {
return session;
}
-void WiredTigerRecoveryUnit::abandonSnapshot() {
+void WiredTigerRecoveryUnit::doAbandonSnapshot() {
invariant(!_inUnitOfWork(), toString(_getState()));
if (_isActive()) {
// Can't be in a WriteUnitOfWork, so safe to rollback
@@ -327,8 +320,9 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) {
// `serverGlobalParams.slowMs` can be set to values <= 0. In those cases, give logging a
// break.
if (transactionTime >= std::max(1, serverGlobalParams.slowMS)) {
- LOG(kSlowTransactionSeverity) << "Slow WT transaction. Lifetime of SnapshotId "
- << _mySnapshotId << " was " << transactionTime << "ms";
+ LOG(kSlowTransactionSeverity)
+ << "Slow WT transaction. Lifetime of SnapshotId " << getSnapshotId().toNumber()
+ << " was " << transactionTime << "ms";
}
}
@@ -353,11 +347,11 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) {
}
wtRet = s->commit_transaction(s, conf.str().c_str());
- LOG(3) << "WT commit_transaction for snapshot id " << _mySnapshotId;
+ LOG(3) << "WT commit_transaction for snapshot id " << getSnapshotId().toNumber();
} else {
wtRet = s->rollback_transaction(s, nullptr);
invariant(!wtRet);
- LOG(3) << "WT rollback_transaction for snapshot id " << _mySnapshotId;
+ LOG(3) << "WT rollback_transaction for snapshot id " << getSnapshotId().toNumber();
}
if (_isTimestamped) {
@@ -385,18 +379,12 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) {
_prepareTimestamp = Timestamp();
_durableTimestamp = Timestamp();
_roundUpPreparedTimestamps = RoundUpPreparedTimestamps::kNoRound;
- _mySnapshotId = nextSnapshotId.fetchAndAdd(1);
_isOplogReader = false;
_oplogVisibleTs = boost::none;
_orderedCommit = true; // Default value is true; we assume all writes are ordered.
_mustBeTimestamped = false;
}
-SnapshotId WiredTigerRecoveryUnit::getSnapshotId() const {
- // TODO: use actual wiredtiger txn id
- return SnapshotId(_mySnapshotId);
-}
-
Status WiredTigerRecoveryUnit::obtainMajorityCommittedSnapshot() {
invariant(_timestampReadSource == ReadSource::kMajorityCommitted);
auto snapshotName = _sessionCache->snapshotManager().getMinSnapshotForNextCommittedRead();
@@ -538,7 +526,7 @@ void WiredTigerRecoveryUnit::_txnOpen() {
}
}
- LOG(3) << "WT begin_transaction for snapshot id " << _mySnapshotId;
+ LOG(3) << "WT begin_transaction for snapshot id " << getSnapshotId().toNumber();
}
Timestamp WiredTigerRecoveryUnit::_beginTransactionAtAllDurableTimestamp(WT_SESSION* session) {
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
index f456dca4b7a..3217be3f8f5 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h
@@ -104,23 +104,18 @@ public:
void beginUnitOfWork(OperationContext* opCtx) override;
void prepareUnitOfWork() override;
- void commitUnitOfWork() override;
- void abortUnitOfWork() override;
bool waitUntilDurable(OperationContext* opCtx) override;
bool waitUntilUnjournaledWritesDurable(OperationContext* opCtx,
bool stableCheckpoint = true) override;
- void abandonSnapshot() override;
void preallocateSnapshot() override;
Status obtainMajorityCommittedSnapshot() override;
boost::optional<Timestamp> getPointInTimeReadTimestamp() override;
- SnapshotId getSnapshotId() const override;
-
Status setTimestamp(Timestamp timestamp) override;
void setCommitTimestamp(Timestamp timestamp) override;
@@ -205,6 +200,11 @@ public:
static void appendGlobalStats(BSONObjBuilder& b);
private:
+ void doCommitUnitOfWork() override;
+ void doAbortUnitOfWork() override;
+
+ void doAbandonSnapshot() override;
+
void _abort();
void _commit();
@@ -252,7 +252,6 @@ private:
Timestamp _durableTimestamp;
Timestamp _prepareTimestamp;
boost::optional<Timestamp> _lastTimestampSet;
- uint64_t _mySnapshotId;
Timestamp _majorityCommittedSnapshot;
Timestamp _readAtTimestamp;
std::unique_ptr<Timer> _timer;