summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp')
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp326
1 files changed, 166 insertions, 160 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 6a33378a070..a88754cf992 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -143,7 +143,7 @@ private:
OplogStones* _oplogStones;
};
-WiredTigerRecordStore::OplogStones::OplogStones(OperationContext* txn, WiredTigerRecordStore* rs)
+WiredTigerRecordStore::OplogStones::OplogStones(OperationContext* opCtx, WiredTigerRecordStore* rs)
: _rs(rs) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -159,7 +159,7 @@ WiredTigerRecordStore::OplogStones::OplogStones(OperationContext* txn, WiredTige
_minBytesPerStone = maxSize / _numStonesToKeep;
invariant(_minBytesPerStone > 0);
- _calculateStones(txn);
+ _calculateStones(opCtx);
_pokeReclaimThreadIfNeeded(); // Reclaim stones if over the limit.
}
@@ -227,13 +227,16 @@ void WiredTigerRecordStore::OplogStones::createNewStoneIfNeeded(RecordId lastRec
}
void WiredTigerRecordStore::OplogStones::updateCurrentStoneAfterInsertOnCommit(
- OperationContext* txn, int64_t bytesInserted, RecordId highestInserted, int64_t countInserted) {
- txn->recoveryUnit()->registerChange(
+ OperationContext* opCtx,
+ int64_t bytesInserted,
+ RecordId highestInserted,
+ int64_t countInserted) {
+ opCtx->recoveryUnit()->registerChange(
new InsertChange(this, bytesInserted, highestInserted, countInserted));
}
-void WiredTigerRecordStore::OplogStones::clearStonesOnCommit(OperationContext* txn) {
- txn->recoveryUnit()->registerChange(new TruncateChange(this));
+void WiredTigerRecordStore::OplogStones::clearStonesOnCommit(OperationContext* opCtx) {
+ opCtx->recoveryUnit()->registerChange(new TruncateChange(this));
}
void WiredTigerRecordStore::OplogStones::updateStonesAfterCappedTruncateAfter(
@@ -285,9 +288,9 @@ void WiredTigerRecordStore::OplogStones::setNumStonesToKeep(size_t numStones) {
_numStonesToKeep = numStones;
}
-void WiredTigerRecordStore::OplogStones::_calculateStones(OperationContext* txn) {
- long long numRecords = _rs->numRecords(txn);
- long long dataSize = _rs->dataSize(txn);
+void WiredTigerRecordStore::OplogStones::_calculateStones(OperationContext* opCtx) {
+ long long numRecords = _rs->numRecords(opCtx);
+ long long dataSize = _rs->dataSize(opCtx);
log() << "The size storer reports that the oplog contains " << numRecords
<< " records totaling to " << dataSize << " bytes";
@@ -301,7 +304,7 @@ void WiredTigerRecordStore::OplogStones::_calculateStones(OperationContext* txn)
if (numRecords <= 0 || dataSize <= 0 ||
uint64_t(numRecords) <
kMinSampleRatioForRandCursor * kRandomSamplesPerStone * _numStonesToKeep) {
- _calculateStonesByScanning(txn);
+ _calculateStonesByScanning(opCtx);
return;
}
@@ -311,16 +314,16 @@ void WiredTigerRecordStore::OplogStones::_calculateStones(OperationContext* txn)
double estRecordsPerStone = std::ceil(_minBytesPerStone / avgRecordSize);
double estBytesPerStone = estRecordsPerStone * avgRecordSize;
- _calculateStonesBySampling(txn, int64_t(estRecordsPerStone), int64_t(estBytesPerStone));
+ _calculateStonesBySampling(opCtx, int64_t(estRecordsPerStone), int64_t(estBytesPerStone));
}
-void WiredTigerRecordStore::OplogStones::_calculateStonesByScanning(OperationContext* txn) {
+void WiredTigerRecordStore::OplogStones::_calculateStonesByScanning(OperationContext* opCtx) {
log() << "Scanning the oplog to determine where to place markers for truncation";
long long numRecords = 0;
long long dataSize = 0;
- auto cursor = _rs->getCursor(txn, true);
+ auto cursor = _rs->getCursor(opCtx, true);
while (auto record = cursor->next()) {
_currentRecords.addAndFetch(1);
int64_t newCurrentBytes = _currentBytes.addAndFetch(record->data.size());
@@ -336,10 +339,10 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesByScanning(OperationCon
dataSize += record->data.size();
}
- _rs->updateStatsAfterRepair(txn, numRecords, dataSize);
+ _rs->updateStatsAfterRepair(opCtx, numRecords, dataSize);
}
-void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationContext* txn,
+void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationContext* opCtx,
int64_t estRecordsPerStone,
int64_t estBytesPerStone) {
Timestamp earliestOpTime;
@@ -347,13 +350,13 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon
{
const bool forward = true;
- auto cursor = _rs->getCursor(txn, forward);
+ auto cursor = _rs->getCursor(opCtx, forward);
auto record = cursor->next();
if (!record) {
// This shouldn't really happen unless the size storer values are far off from reality.
// The collection is probably empty, but fall back to scanning the oplog just in case.
log() << "Failed to determine the earliest optime, falling back to scanning the oplog";
- _calculateStonesByScanning(txn);
+ _calculateStonesByScanning(opCtx);
return;
}
earliestOpTime = Timestamp(record->id.repr());
@@ -361,13 +364,13 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon
{
const bool forward = false;
- auto cursor = _rs->getCursor(txn, forward);
+ auto cursor = _rs->getCursor(opCtx, forward);
auto record = cursor->next();
if (!record) {
// This shouldn't really happen unless the size storer values are far off from reality.
// The collection is probably empty, but fall back to scanning the oplog just in case.
log() << "Failed to determine the latest optime, falling back to scanning the oplog";
- _calculateStonesByScanning(txn);
+ _calculateStonesByScanning(opCtx);
return;
}
latestOpTime = Timestamp(record->id.repr());
@@ -376,8 +379,8 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon
log() << "Sampling from the oplog between " << earliestOpTime.toStringPretty() << " and "
<< latestOpTime.toStringPretty() << " to determine where to place markers for truncation";
- int64_t wholeStones = _rs->numRecords(txn) / estRecordsPerStone;
- int64_t numSamples = kRandomSamplesPerStone * _rs->numRecords(txn) / estRecordsPerStone;
+ int64_t wholeStones = _rs->numRecords(opCtx) / estRecordsPerStone;
+ int64_t numSamples = kRandomSamplesPerStone * _rs->numRecords(opCtx) / estRecordsPerStone;
log() << "Taking " << numSamples << " samples and assuming that each section of oplog contains"
<< " approximately " << estRecordsPerStone << " records totaling to " << estBytesPerStone
@@ -391,7 +394,7 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon
// approximately 'estRecordsPerStone'. Do so by oversampling the oplog, sorting the samples in
// order of their RecordId, and then choosing the samples expected to be near the right edge of
// each logical section.
- auto cursor = _rs->getRandomCursorWithOptions(txn, extraConfig);
+ auto cursor = _rs->getRandomCursorWithOptions(opCtx, extraConfig);
std::vector<RecordId> oplogEstimates;
for (int i = 0; i < numSamples; ++i) {
auto record = cursor->next();
@@ -399,7 +402,7 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon
// This shouldn't really happen unless the size storer values are far off from reality.
// The collection is probably empty, but fall back to scanning the oplog just in case.
log() << "Failed to get enough random samples, falling back to scanning the oplog";
- _calculateStonesByScanning(txn);
+ _calculateStonesByScanning(opCtx);
return;
}
oplogEstimates.push_back(record->id);
@@ -418,8 +421,8 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon
}
// Account for the partially filled chunk.
- _currentRecords.store(_rs->numRecords(txn) - estRecordsPerStone * wholeStones);
- _currentBytes.store(_rs->dataSize(txn) - estBytesPerStone * wholeStones);
+ _currentRecords.store(_rs->numRecords(opCtx) - estRecordsPerStone * wholeStones);
+ _currentBytes.store(_rs->dataSize(opCtx) - estBytesPerStone * wholeStones);
}
void WiredTigerRecordStore::OplogStones::_pokeReclaimThreadIfNeeded() {
@@ -430,12 +433,12 @@ void WiredTigerRecordStore::OplogStones::_pokeReclaimThreadIfNeeded() {
class WiredTigerRecordStore::Cursor final : public SeekableRecordCursor {
public:
- Cursor(OperationContext* txn, const WiredTigerRecordStore& rs, bool forward = true)
+ Cursor(OperationContext* opCtx, const WiredTigerRecordStore& rs, bool forward = true)
: _rs(rs),
- _txn(txn),
+ _opCtx(opCtx),
_forward(forward),
- _readUntilForOplog(WiredTigerRecoveryUnit::get(txn)->getOplogReadTill()) {
- _cursor.emplace(rs.getURI(), rs.tableId(), true, txn);
+ _readUntilForOplog(WiredTigerRecoveryUnit::get(opCtx)->getOplogReadTill()) {
+ _cursor.emplace(rs.getURI(), rs.tableId(), true, opCtx);
}
boost::optional<Record> next() final {
@@ -519,10 +522,10 @@ public:
bool restore() final {
if (!_cursor)
- _cursor.emplace(_rs.getURI(), _rs.tableId(), true, _txn);
+ _cursor.emplace(_rs.getURI(), _rs.tableId(), true, _opCtx);
// This will ensure an active session exists, so any restored cursors will bind to it
- invariant(WiredTigerRecoveryUnit::get(_txn)->getSession(_txn) == _cursor->getSession());
+ invariant(WiredTigerRecoveryUnit::get(_opCtx)->getSession(_opCtx) == _cursor->getSession());
_skipNextAdvance = false;
// If we've hit EOF, then this iterator is done and need not be restored.
@@ -566,12 +569,12 @@ public:
}
void detachFromOperationContext() final {
- _txn = nullptr;
+ _opCtx = nullptr;
_cursor = boost::none;
}
- void reattachToOperationContext(OperationContext* txn) final {
- _txn = txn;
+ void reattachToOperationContext(OperationContext* opCtx) final {
+ _opCtx = opCtx;
// _cursor recreated in restore() to avoid risk of WT_ROLLBACK issues.
}
@@ -598,7 +601,7 @@ private:
}
const WiredTigerRecordStore& _rs;
- OperationContext* _txn;
+ OperationContext* _opCtx;
const bool _forward;
bool _skipNextAdvance = false;
boost::optional<WiredTigerCursor> _cursor;
@@ -629,8 +632,8 @@ StatusWith<std::string> WiredTigerRecordStore::parseOptionsField(const BSONObj o
class WiredTigerRecordStore::RandomCursor final : public RecordCursor {
public:
- RandomCursor(OperationContext* txn, const WiredTigerRecordStore& rs, StringData config)
- : _cursor(nullptr), _rs(&rs), _txn(txn), _config(config.toString() + ",next_random") {
+ RandomCursor(OperationContext* opCtx, const WiredTigerRecordStore& rs, StringData config)
+ : _cursor(nullptr), _rs(&rs), _opCtx(opCtx), _config(config.toString() + ",next_random") {
restore();
}
@@ -668,7 +671,7 @@ public:
bool restore() final {
// We can't use the CursorCache since this cursor needs a special config string.
- WT_SESSION* session = WiredTigerRecoveryUnit::get(_txn)->getSession(_txn)->getSession();
+ WT_SESSION* session = WiredTigerRecoveryUnit::get(_opCtx)->getSession(_opCtx)->getSession();
if (!_cursor) {
invariantWTOK(session->open_cursor(
@@ -678,22 +681,22 @@ public:
return true;
}
void detachFromOperationContext() final {
- invariant(_txn);
- _txn = nullptr;
+ invariant(_opCtx);
+ _opCtx = nullptr;
if (_cursor) {
invariantWTOK(_cursor->close(_cursor));
}
_cursor = nullptr;
}
- void reattachToOperationContext(OperationContext* txn) final {
- invariant(!_txn);
- _txn = txn;
+ void reattachToOperationContext(OperationContext* opCtx) final {
+ invariant(!_opCtx);
+ _opCtx = opCtx;
}
private:
WT_CURSOR* _cursor;
const WiredTigerRecordStore* _rs;
- OperationContext* _txn;
+ OperationContext* _opCtx;
const std::string _config;
};
@@ -878,11 +881,11 @@ bool WiredTigerRecordStore::inShutdown() const {
return _shuttingDown;
}
-long long WiredTigerRecordStore::dataSize(OperationContext* txn) const {
+long long WiredTigerRecordStore::dataSize(OperationContext* opCtx) const {
return _dataSize.load();
}
-long long WiredTigerRecordStore::numRecords(OperationContext* txn) const {
+long long WiredTigerRecordStore::numRecords(OperationContext* opCtx) const {
return _numRecords.load();
}
@@ -900,13 +903,13 @@ int64_t WiredTigerRecordStore::cappedMaxSize() const {
return _cappedMaxSize;
}
-int64_t WiredTigerRecordStore::storageSize(OperationContext* txn,
+int64_t WiredTigerRecordStore::storageSize(OperationContext* opCtx,
BSONObjBuilder* extraInfo,
int infoLevel) const {
if (_isEphemeral) {
- return dataSize(txn);
+ return dataSize(opCtx);
}
- WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn);
+ WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx);
StatusWith<int64_t> result =
WiredTigerUtil::getStatisticsValueAs<int64_t>(session->getSession(),
"statistics:" + getURI(),
@@ -934,9 +937,9 @@ RecordData WiredTigerRecordStore::_getData(const WiredTigerCursor& cursor) const
return RecordData(data, value.size);
}
-RecordData WiredTigerRecordStore::dataFor(OperationContext* txn, const RecordId& id) const {
+RecordData WiredTigerRecordStore::dataFor(OperationContext* opCtx, const RecordId& id) const {
// ownership passes to the shared_array created below
- WiredTigerCursor curwrap(_uri, _tableId, true, txn);
+ WiredTigerCursor curwrap(_uri, _tableId, true, opCtx);
WT_CURSOR* c = curwrap.get();
invariant(c);
c->set_key(c, _makeKey(id));
@@ -946,10 +949,10 @@ RecordData WiredTigerRecordStore::dataFor(OperationContext* txn, const RecordId&
return _getData(curwrap);
}
-bool WiredTigerRecordStore::findRecord(OperationContext* txn,
+bool WiredTigerRecordStore::findRecord(OperationContext* opCtx,
const RecordId& id,
RecordData* out) const {
- WiredTigerCursor curwrap(_uri, _tableId, true, txn);
+ WiredTigerCursor curwrap(_uri, _tableId, true, opCtx);
WT_CURSOR* c = curwrap.get();
invariant(c);
c->set_key(c, _makeKey(id));
@@ -962,12 +965,12 @@ bool WiredTigerRecordStore::findRecord(OperationContext* txn,
return true;
}
-void WiredTigerRecordStore::deleteRecord(OperationContext* txn, const RecordId& id) {
+void WiredTigerRecordStore::deleteRecord(OperationContext* opCtx, const RecordId& id) {
// Deletes should never occur on a capped collection because truncation uses
// WT_SESSION::truncate().
invariant(!isCapped());
- WiredTigerCursor cursor(_uri, _tableId, true, txn);
+ WiredTigerCursor cursor(_uri, _tableId, true, opCtx);
cursor.assertInActiveTxn();
WT_CURSOR* c = cursor.get();
c->set_key(c, _makeKey(id));
@@ -983,8 +986,8 @@ void WiredTigerRecordStore::deleteRecord(OperationContext* txn, const RecordId&
ret = WT_OP_CHECK(c->remove(c));
invariantWTOK(ret);
- _changeNumRecords(txn, -1);
- _increaseDataSize(txn, -old_length);
+ _changeNumRecords(opCtx, -1);
+ _increaseDataSize(opCtx, -old_length);
}
bool WiredTigerRecordStore::cappedAndNeedDelete() const {
@@ -1000,7 +1003,7 @@ bool WiredTigerRecordStore::cappedAndNeedDelete() const {
return false;
}
-int64_t WiredTigerRecordStore::cappedDeleteAsNeeded(OperationContext* txn,
+int64_t WiredTigerRecordStore::cappedDeleteAsNeeded(OperationContext* opCtx,
const RecordId& justInserted) {
invariant(!_oplogStones);
@@ -1040,20 +1043,20 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded(OperationContext* txn,
}
}
- return cappedDeleteAsNeeded_inlock(txn, justInserted);
+ return cappedDeleteAsNeeded_inlock(opCtx, justInserted);
}
-int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn,
+int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* opCtx,
const RecordId& justInserted) {
// we do this in a side transaction in case it aborts
WiredTigerRecoveryUnit* realRecoveryUnit =
- checked_cast<WiredTigerRecoveryUnit*>(txn->releaseRecoveryUnit());
+ checked_cast<WiredTigerRecoveryUnit*>(opCtx->releaseRecoveryUnit());
invariant(realRecoveryUnit);
WiredTigerSessionCache* sc = realRecoveryUnit->getSessionCache();
OperationContext::RecoveryUnitState const realRUstate =
- txn->setRecoveryUnit(new WiredTigerRecoveryUnit(sc), OperationContext::kNotInUnitOfWork);
+ opCtx->setRecoveryUnit(new WiredTigerRecoveryUnit(sc), OperationContext::kNotInUnitOfWork);
- WT_SESSION* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn)->getSession();
+ WT_SESSION* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx)->getSession();
int64_t dataSize = _dataSize.load();
int64_t numRecords = _numRecords.load();
@@ -1065,9 +1068,9 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn
docsOverCap = numRecords - _cappedMaxDocs;
try {
- WriteUnitOfWork wuow(txn);
+ WriteUnitOfWork wuow(opCtx);
- WiredTigerCursor curwrap(_uri, _tableId, true, txn);
+ WiredTigerCursor curwrap(_uri, _tableId, true, opCtx);
WT_CURSOR* truncateEnd = curwrap.get();
RecordId newestIdToDelete;
int ret = 0;
@@ -1109,7 +1112,7 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn
stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex);
if (_cappedCallback) {
uassertStatusOK(_cappedCallback->aboutToDeleteCapped(
- txn,
+ opCtx,
newestIdToDelete,
RecordData(static_cast<const char*>(old_value.data), old_value.size)));
}
@@ -1136,7 +1139,7 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn
}
invariantWTOK(truncateEnd->prev(truncateEnd)); // put the cursor back where it was
- WiredTigerCursor startWrap(_uri, _tableId, true, txn);
+ WiredTigerCursor startWrap(_uri, _tableId, true, opCtx);
WT_CURSOR* truncateStart = startWrap.get();
// If we know where the start point is, set it for the truncate
@@ -1153,35 +1156,35 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn
docsRemoved = 0;
} else {
invariantWTOK(ret);
- _changeNumRecords(txn, -docsRemoved);
- _increaseDataSize(txn, -sizeSaved);
+ _changeNumRecords(opCtx, -docsRemoved);
+ _increaseDataSize(opCtx, -sizeSaved);
wuow.commit();
// Save the key for the next round
_cappedFirstRecord = firstRemainingId;
}
}
} catch (const WriteConflictException& wce) {
- delete txn->releaseRecoveryUnit();
- txn->setRecoveryUnit(realRecoveryUnit, realRUstate);
+ delete opCtx->releaseRecoveryUnit();
+ opCtx->setRecoveryUnit(realRecoveryUnit, realRUstate);
log() << "got conflict truncating capped, ignoring";
return 0;
} catch (...) {
- delete txn->releaseRecoveryUnit();
- txn->setRecoveryUnit(realRecoveryUnit, realRUstate);
+ delete opCtx->releaseRecoveryUnit();
+ opCtx->setRecoveryUnit(realRecoveryUnit, realRUstate);
throw;
}
- delete txn->releaseRecoveryUnit();
- txn->setRecoveryUnit(realRecoveryUnit, realRUstate);
+ delete opCtx->releaseRecoveryUnit();
+ opCtx->setRecoveryUnit(realRecoveryUnit, realRUstate);
return docsRemoved;
}
-bool WiredTigerRecordStore::yieldAndAwaitOplogDeletionRequest(OperationContext* txn) {
+bool WiredTigerRecordStore::yieldAndAwaitOplogDeletionRequest(OperationContext* opCtx) {
// Create another reference to the oplog stones while holding a lock on the collection to
// prevent it from being destructed.
std::shared_ptr<OplogStones> oplogStones = _oplogStones;
- Locker* locker = txn->lockState();
+ Locker* locker = opCtx->lockState();
Locker::LockSnapshot snapshot;
// Release any locks before waiting on the condition variable. It is illegal to access any
@@ -1191,7 +1194,7 @@ bool WiredTigerRecordStore::yieldAndAwaitOplogDeletionRequest(OperationContext*
// The top-level locks were freed, so also release any potential low-level (storage engine)
// locks that might be held.
- txn->recoveryUnit()->abandonSnapshot();
+ opCtx->recoveryUnit()->abandonSnapshot();
// Wait for an oplog deletion request, or for this record store to have been destroyed.
oplogStones->awaitHasExcessStonesOrDead();
@@ -1202,7 +1205,7 @@ bool WiredTigerRecordStore::yieldAndAwaitOplogDeletionRequest(OperationContext*
return !oplogStones->isDead();
}
-void WiredTigerRecordStore::reclaimOplog(OperationContext* txn) {
+void WiredTigerRecordStore::reclaimOplog(OperationContext* opCtx) {
while (auto stone = _oplogStones->peekOldestStoneIfNeeded()) {
invariant(stone->lastRecord.isNormal());
@@ -1210,23 +1213,23 @@ void WiredTigerRecordStore::reclaimOplog(OperationContext* txn) {
<< stone->lastRecord << " to remove approximately " << stone->records
<< " records totaling to " << stone->bytes << " bytes";
- WiredTigerRecoveryUnit* ru = WiredTigerRecoveryUnit::get(txn);
- WT_SESSION* session = ru->getSession(txn)->getSession();
+ WiredTigerRecoveryUnit* ru = WiredTigerRecoveryUnit::get(opCtx);
+ WT_SESSION* session = ru->getSession(opCtx)->getSession();
try {
- WriteUnitOfWork wuow(txn);
+ WriteUnitOfWork wuow(opCtx);
- WiredTigerCursor startwrap(_uri, _tableId, true, txn);
+ WiredTigerCursor startwrap(_uri, _tableId, true, opCtx);
WT_CURSOR* start = startwrap.get();
start->set_key(start, _makeKey(_oplogStones->firstRecord));
- WiredTigerCursor endwrap(_uri, _tableId, true, txn);
+ WiredTigerCursor endwrap(_uri, _tableId, true, opCtx);
WT_CURSOR* end = endwrap.get();
end->set_key(end, _makeKey(stone->lastRecord));
invariantWTOK(session->truncate(session, nullptr, start, end, nullptr));
- _changeNumRecords(txn, -stone->records);
- _increaseDataSize(txn, -stone->bytes);
+ _changeNumRecords(opCtx, -stone->records);
+ _increaseDataSize(opCtx, -stone->bytes);
wuow.commit();
@@ -1244,13 +1247,13 @@ void WiredTigerRecordStore::reclaimOplog(OperationContext* txn) {
<< " records totaling to " << _dataSize.load() << " bytes";
}
-Status WiredTigerRecordStore::insertRecords(OperationContext* txn,
+Status WiredTigerRecordStore::insertRecords(OperationContext* opCtx,
std::vector<Record>* records,
bool enforceQuota) {
- return _insertRecords(txn, records->data(), records->size());
+ return _insertRecords(opCtx, records->data(), records->size());
}
-Status WiredTigerRecordStore::_insertRecords(OperationContext* txn,
+Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
Record* records,
size_t nRecords) {
// We are kind of cheating on capped collections since we write all of them at once ....
@@ -1263,7 +1266,7 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* txn,
if (_isCapped && totalLength > _cappedMaxSize)
return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");
- WiredTigerCursor curwrap(_uri, _tableId, true, txn);
+ WiredTigerCursor curwrap(_uri, _tableId, true, opCtx);
curwrap.assertInActiveTxn();
WT_CURSOR* c = curwrap.get();
invariant(c);
@@ -1281,7 +1284,7 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* txn,
} else if (_isCapped) {
stdx::lock_guard<stdx::mutex> lk(_uncommittedRecordIdsMutex);
record.id = _nextId();
- _addUncommittedRecordId_inlock(txn, record.id);
+ _addUncommittedRecordId_inlock(opCtx, record.id);
} else {
record.id = _nextId();
}
@@ -1305,24 +1308,25 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* txn,
return wtRCToStatus(ret, "WiredTigerRecordStore::insertRecord");
}
- _changeNumRecords(txn, nRecords);
- _increaseDataSize(txn, totalLength);
+ _changeNumRecords(opCtx, nRecords);
+ _increaseDataSize(opCtx, totalLength);
if (_oplogStones) {
- _oplogStones->updateCurrentStoneAfterInsertOnCommit(txn, totalLength, highestId, nRecords);
+ _oplogStones->updateCurrentStoneAfterInsertOnCommit(
+ opCtx, totalLength, highestId, nRecords);
} else {
- cappedDeleteAsNeeded(txn, highestId);
+ cappedDeleteAsNeeded(opCtx, highestId);
}
return Status::OK();
}
-StatusWith<RecordId> WiredTigerRecordStore::insertRecord(OperationContext* txn,
+StatusWith<RecordId> WiredTigerRecordStore::insertRecord(OperationContext* opCtx,
const char* data,
int len,
bool enforceQuota) {
Record record = {RecordId(), RecordData(data, len)};
- Status status = _insertRecords(txn, &record, 1);
+ Status status = _insertRecords(opCtx, &record, 1);
if (!status.isOK())
return StatusWith<RecordId>(status);
return StatusWith<RecordId>(record.id);
@@ -1362,7 +1366,7 @@ RecordId WiredTigerRecordStore::lowestCappedHiddenRecord() const {
return _uncommittedRecordIds.empty() ? RecordId() : _uncommittedRecordIds.front();
}
-Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* txn,
+Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* opCtx,
const DocWriter* const* docs,
size_t nDocs,
RecordId* idsOut) {
@@ -1388,7 +1392,7 @@ Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* txn,
}
invariant(pos == (buffer.get() + totalSize));
- Status s = _insertRecords(txn, records.get(), nDocs);
+ Status s = _insertRecords(opCtx, records.get(), nDocs);
if (!s.isOK())
return s;
@@ -1401,13 +1405,13 @@ Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* txn,
return s;
}
-Status WiredTigerRecordStore::updateRecord(OperationContext* txn,
+Status WiredTigerRecordStore::updateRecord(OperationContext* opCtx,
const RecordId& id,
const char* data,
int len,
bool enforceQuota,
UpdateNotifier* notifier) {
- WiredTigerCursor curwrap(_uri, _tableId, true, txn);
+ WiredTigerCursor curwrap(_uri, _tableId, true, opCtx);
curwrap.assertInActiveTxn();
WT_CURSOR* c = curwrap.get();
invariant(c);
@@ -1431,9 +1435,9 @@ Status WiredTigerRecordStore::updateRecord(OperationContext* txn,
ret = WT_OP_CHECK(c->insert(c));
invariantWTOK(ret);
- _increaseDataSize(txn, len - old_length);
+ _increaseDataSize(opCtx, len - old_length);
if (!_oplogStones) {
- cappedDeleteAsNeeded(txn, id);
+ cappedDeleteAsNeeded(opCtx, id);
}
return Status::OK();
@@ -1444,7 +1448,7 @@ bool WiredTigerRecordStore::updateWithDamagesSupported() const {
}
StatusWith<RecordData> WiredTigerRecordStore::updateWithDamages(
- OperationContext* txn,
+ OperationContext* opCtx,
const RecordId& id,
const RecordData& oldRec,
const char* damageSource,
@@ -1461,41 +1465,42 @@ void WiredTigerRecordStore::_oplogSetStartHack(WiredTigerRecoveryUnit* wru) cons
}
}
-std::unique_ptr<SeekableRecordCursor> WiredTigerRecordStore::getCursor(OperationContext* txn,
+std::unique_ptr<SeekableRecordCursor> WiredTigerRecordStore::getCursor(OperationContext* opCtx,
bool forward) const {
if (_isOplog && forward) {
- WiredTigerRecoveryUnit* wru = WiredTigerRecoveryUnit::get(txn);
+ WiredTigerRecoveryUnit* wru = WiredTigerRecoveryUnit::get(opCtx);
// If we already have a snapshot we don't know what it can see, unless we know no one
// else could be writing (because we hold an exclusive lock).
- if (wru->inActiveTxn() && !txn->lockState()->isNoop() &&
- !txn->lockState()->isCollectionLockedForMode(_ns, MODE_X)) {
+ if (wru->inActiveTxn() && !opCtx->lockState()->isNoop() &&
+ !opCtx->lockState()->isCollectionLockedForMode(_ns, MODE_X)) {
throw WriteConflictException();
}
_oplogSetStartHack(wru);
}
- return stdx::make_unique<Cursor>(txn, *this, forward);
+ return stdx::make_unique<Cursor>(opCtx, *this, forward);
}
-std::unique_ptr<RecordCursor> WiredTigerRecordStore::getRandomCursor(OperationContext* txn) const {
+std::unique_ptr<RecordCursor> WiredTigerRecordStore::getRandomCursor(
+ OperationContext* opCtx) const {
const char* extraConfig = "";
- return getRandomCursorWithOptions(txn, extraConfig);
+ return getRandomCursorWithOptions(opCtx, extraConfig);
}
std::unique_ptr<RecordCursor> WiredTigerRecordStore::getRandomCursorWithOptions(
- OperationContext* txn, StringData extraConfig) const {
- return stdx::make_unique<RandomCursor>(txn, *this, extraConfig);
+ OperationContext* opCtx, StringData extraConfig) const {
+ return stdx::make_unique<RandomCursor>(opCtx, *this, extraConfig);
}
std::vector<std::unique_ptr<RecordCursor>> WiredTigerRecordStore::getManyCursors(
- OperationContext* txn) const {
+ OperationContext* opCtx) const {
std::vector<std::unique_ptr<RecordCursor>> cursors(1);
- cursors[0] = stdx::make_unique<Cursor>(txn, *this, /*forward=*/true);
+ cursors[0] = stdx::make_unique<Cursor>(opCtx, *this, /*forward=*/true);
return cursors;
}
-Status WiredTigerRecordStore::truncate(OperationContext* txn) {
- WiredTigerCursor startWrap(_uri, _tableId, true, txn);
+Status WiredTigerRecordStore::truncate(OperationContext* opCtx) {
+ WiredTigerCursor startWrap(_uri, _tableId, true, opCtx);
WT_CURSOR* start = startWrap.get();
int ret = WT_OP_CHECK(start->next(start));
// Empty collections don't have anything to truncate.
@@ -1504,23 +1509,23 @@ Status WiredTigerRecordStore::truncate(OperationContext* txn) {
}
invariantWTOK(ret);
- WT_SESSION* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn)->getSession();
+ WT_SESSION* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx)->getSession();
invariantWTOK(WT_OP_CHECK(session->truncate(session, NULL, start, NULL, NULL)));
- _changeNumRecords(txn, -numRecords(txn));
- _increaseDataSize(txn, -dataSize(txn));
+ _changeNumRecords(opCtx, -numRecords(opCtx));
+ _increaseDataSize(opCtx, -dataSize(opCtx));
if (_oplogStones) {
- _oplogStones->clearStonesOnCommit(txn);
+ _oplogStones->clearStonesOnCommit(opCtx);
}
return Status::OK();
}
-Status WiredTigerRecordStore::compact(OperationContext* txn,
+Status WiredTigerRecordStore::compact(OperationContext* opCtx,
RecordStoreCompactAdaptor* adaptor,
const CompactOptions* options,
CompactStats* stats) {
- WiredTigerSessionCache* cache = WiredTigerRecoveryUnit::get(txn)->getSessionCache();
+ WiredTigerSessionCache* cache = WiredTigerRecoveryUnit::get(opCtx)->getSessionCache();
if (!cache->isEphemeral()) {
UniqueWiredTigerSession session = cache->getSession();
WT_SESSION* s = session->getSession();
@@ -1530,13 +1535,13 @@ Status WiredTigerRecordStore::compact(OperationContext* txn,
return Status::OK();
}
-Status WiredTigerRecordStore::validate(OperationContext* txn,
+Status WiredTigerRecordStore::validate(OperationContext* opCtx,
ValidateCmdLevel level,
ValidateAdaptor* adaptor,
ValidateResults* results,
BSONObjBuilder* output) {
if (!_isEphemeral) {
- int err = WiredTigerUtil::verifyTable(txn, _uri, &results->errors);
+ int err = WiredTigerUtil::verifyTable(opCtx, _uri, &results->errors);
if (err == EBUSY) {
const char* msg = "verify() returned EBUSY. Not treating as invalid.";
warning() << msg;
@@ -1558,12 +1563,12 @@ Status WiredTigerRecordStore::validate(OperationContext* txn,
long long nInvalid = 0;
results->valid = true;
- Cursor cursor(txn, *this, true);
+ Cursor cursor(opCtx, *this, true);
int interruptInterval = 4096;
while (auto record = cursor.next()) {
if (!(nrecords % interruptInterval))
- txn->checkForInterrupt();
+ opCtx->checkForInterrupt();
++nrecords;
auto dataSize = record->data.size();
dataSizeTotal += dataSize;
@@ -1606,7 +1611,7 @@ Status WiredTigerRecordStore::validate(OperationContext* txn,
return Status::OK();
}
-void WiredTigerRecordStore::appendCustomStats(OperationContext* txn,
+void WiredTigerRecordStore::appendCustomStats(OperationContext* opCtx,
BSONObjBuilder* result,
double scale) const {
result->appendBool("capped", _isCapped);
@@ -1616,12 +1621,12 @@ void WiredTigerRecordStore::appendCustomStats(OperationContext* txn,
result->appendIntOrLL("sleepCount", _cappedSleep.load());
result->appendIntOrLL("sleepMS", _cappedSleepMS.load());
}
- WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn);
+ WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx);
WT_SESSION* s = session->getSession();
BSONObjBuilder bob(result->subobjStart(_engineName));
{
BSONObjBuilder metadata(bob.subobjStart("metadata"));
- Status status = WiredTigerUtil::getApplicationMetadata(txn, getURI(), &metadata);
+ Status status = WiredTigerUtil::getApplicationMetadata(opCtx, getURI(), &metadata);
if (!status.isOK()) {
metadata.append("error", "unable to retrieve metadata");
metadata.append("code", static_cast<int>(status.code()));
@@ -1630,8 +1635,8 @@ void WiredTigerRecordStore::appendCustomStats(OperationContext* txn,
}
std::string type, sourceURI;
- WiredTigerUtil::fetchTypeAndSourceURI(txn, _uri, &type, &sourceURI);
- StatusWith<std::string> metadataResult = WiredTigerUtil::getMetadata(txn, sourceURI);
+ WiredTigerUtil::fetchTypeAndSourceURI(opCtx, _uri, &type, &sourceURI);
+ StatusWith<std::string> metadataResult = WiredTigerUtil::getMetadata(opCtx, sourceURI);
StringData creationStringName("creationString");
if (!metadataResult.isOK()) {
BSONObjBuilder creationString(bob.subobjStart(creationStringName));
@@ -1653,7 +1658,7 @@ void WiredTigerRecordStore::appendCustomStats(OperationContext* txn,
}
}
-Status WiredTigerRecordStore::touch(OperationContext* txn, BSONObjBuilder* output) const {
+Status WiredTigerRecordStore::touch(OperationContext* opCtx, BSONObjBuilder* output) const {
if (_isEphemeral) {
// Everything is already in memory.
return Status::OK();
@@ -1661,13 +1666,14 @@ Status WiredTigerRecordStore::touch(OperationContext* txn, BSONObjBuilder* outpu
return Status(ErrorCodes::CommandNotSupported, "this storage engine does not support touch");
}
-Status WiredTigerRecordStore::oplogDiskLocRegister(OperationContext* txn, const Timestamp& opTime) {
+Status WiredTigerRecordStore::oplogDiskLocRegister(OperationContext* opCtx,
+ const Timestamp& opTime) {
StatusWith<RecordId> id = oploghack::keyForOptime(opTime);
if (!id.isOK())
return id.getStatus();
stdx::lock_guard<stdx::mutex> lk(_uncommittedRecordIdsMutex);
- _addUncommittedRecordId_inlock(txn, id.getValue());
+ _addUncommittedRecordId_inlock(opCtx, id.getValue());
return Status::OK();
}
@@ -1733,38 +1739,38 @@ void WiredTigerRecordStore::_oplogJournalThreadLoop(WiredTigerSessionCache* sess
std::terminate();
}
-void WiredTigerRecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const {
- invariant(txn->lockState()->isNoop() || !txn->lockState()->inAWriteUnitOfWork());
+void WiredTigerRecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const {
+ invariant(opCtx->lockState()->isNoop() || !opCtx->lockState()->inAWriteUnitOfWork());
// This function must not start a WT transaction, otherwise we will get stuck in an infinite
// loop of WCE handling when the getCursor() is called.
stdx::unique_lock<stdx::mutex> lk(_uncommittedRecordIdsMutex);
const auto waitingFor = _oplog_highestSeen;
- txn->waitForConditionOrInterrupt(_opsBecameVisibleCV, lk, [&] {
+ opCtx->waitForConditionOrInterrupt(_opsBecameVisibleCV, lk, [&] {
return _uncommittedRecordIds.empty() || _uncommittedRecordIds.front() > waitingFor;
});
}
-void WiredTigerRecordStore::_addUncommittedRecordId_inlock(OperationContext* txn, RecordId id) {
+void WiredTigerRecordStore::_addUncommittedRecordId_inlock(OperationContext* opCtx, RecordId id) {
dassert(_uncommittedRecordIds.empty() || _uncommittedRecordIds.back() < id);
SortedRecordIds::iterator it = _uncommittedRecordIds.insert(_uncommittedRecordIds.end(), id);
invariant(it->isNormal());
- txn->recoveryUnit()->registerChange(new CappedInsertChange(this, it));
+ opCtx->recoveryUnit()->registerChange(new CappedInsertChange(this, it));
_oplog_highestSeen = id;
}
boost::optional<RecordId> WiredTigerRecordStore::oplogStartHack(
- OperationContext* txn, const RecordId& startingPosition) const {
+ OperationContext* opCtx, const RecordId& startingPosition) const {
if (!_useOplogHack)
return boost::none;
{
- WiredTigerRecoveryUnit* wru = WiredTigerRecoveryUnit::get(txn);
+ WiredTigerRecoveryUnit* wru = WiredTigerRecoveryUnit::get(opCtx);
_oplogSetStartHack(wru);
}
- WiredTigerCursor cursor(_uri, _tableId, true, txn);
+ WiredTigerCursor cursor(_uri, _tableId, true, opCtx);
WT_CURSOR* c = cursor.get();
int cmp;
@@ -1782,7 +1788,7 @@ boost::optional<RecordId> WiredTigerRecordStore::oplogStartHack(
return _fromKey(key);
}
-void WiredTigerRecordStore::updateStatsAfterRepair(OperationContext* txn,
+void WiredTigerRecordStore::updateStatsAfterRepair(OperationContext* opCtx,
long long numRecords,
long long dataSize) {
_numRecords.store(numRecords);
@@ -1800,8 +1806,8 @@ RecordId WiredTigerRecordStore::_nextId() {
return out;
}
-WiredTigerRecoveryUnit* WiredTigerRecordStore::_getRecoveryUnit(OperationContext* txn) {
- return checked_cast<WiredTigerRecoveryUnit*>(txn->recoveryUnit());
+WiredTigerRecoveryUnit* WiredTigerRecordStore::_getRecoveryUnit(OperationContext* opCtx) {
+ return checked_cast<WiredTigerRecoveryUnit*>(opCtx->recoveryUnit());
}
class WiredTigerRecordStore::NumRecordsChange : public RecoveryUnit::Change {
@@ -1817,8 +1823,8 @@ private:
int64_t _diff;
};
-void WiredTigerRecordStore::_changeNumRecords(OperationContext* txn, int64_t diff) {
- txn->recoveryUnit()->registerChange(new NumRecordsChange(this, diff));
+void WiredTigerRecordStore::_changeNumRecords(OperationContext* opCtx, int64_t diff) {
+ opCtx->recoveryUnit()->registerChange(new NumRecordsChange(this, diff));
if (_numRecords.fetchAndAdd(diff) < 0)
_numRecords.store(std::max(diff, int64_t(0)));
}
@@ -1836,9 +1842,9 @@ private:
int64_t _amount;
};
-void WiredTigerRecordStore::_increaseDataSize(OperationContext* txn, int64_t amount) {
- if (txn)
- txn->recoveryUnit()->registerChange(new DataSizeChange(this, amount));
+void WiredTigerRecordStore::_increaseDataSize(OperationContext* opCtx, int64_t amount) {
+ if (opCtx)
+ opCtx->recoveryUnit()->registerChange(new DataSizeChange(this, amount));
if (_dataSize.fetchAndAdd(amount) < 0)
_dataSize.store(std::max(amount, int64_t(0)));
@@ -1855,10 +1861,10 @@ RecordId WiredTigerRecordStore::_fromKey(int64_t key) {
return RecordId(key);
}
-void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* txn,
+void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* opCtx,
RecordId end,
bool inclusive) {
- Cursor cursor(txn, *this);
+ Cursor cursor(opCtx, *this);
auto record = cursor.seekExact(end);
massert(28807, str::stream() << "Failed to seek to the record located at " << end, record);
@@ -1869,7 +1875,7 @@ void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* txn,
RecordId firstRemovedId;
if (inclusive) {
- Cursor reverseCursor(txn, *this, false);
+ Cursor reverseCursor(opCtx, *this, false);
invariant(reverseCursor.seekExact(end));
auto prev = reverseCursor.next();
lastKeptId = prev ? prev->id : RecordId();
@@ -1891,7 +1897,7 @@ void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* txn,
do {
if (_cappedCallback) {
uassertStatusOK(
- _cappedCallback->aboutToDeleteCapped(txn, record->id, record->data));
+ _cappedCallback->aboutToDeleteCapped(opCtx, record->id, record->data));
}
recordsRemoved++;
bytesRemoved += record->data.size();
@@ -1900,17 +1906,17 @@ void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* txn,
// Truncate the collection starting from the record located at 'firstRemovedId' to the end of
// the collection.
- WriteUnitOfWork wuow(txn);
+ WriteUnitOfWork wuow(opCtx);
- WiredTigerCursor startwrap(_uri, _tableId, true, txn);
+ WiredTigerCursor startwrap(_uri, _tableId, true, opCtx);
WT_CURSOR* start = startwrap.get();
start->set_key(start, _makeKey(firstRemovedId));
- WT_SESSION* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn)->getSession();
+ WT_SESSION* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx)->getSession();
invariantWTOK(session->truncate(session, nullptr, start, nullptr, nullptr));
- _changeNumRecords(txn, -recordsRemoved);
- _increaseDataSize(txn, -bytesRemoved);
+ _changeNumRecords(opCtx, -recordsRemoved);
+ _increaseDataSize(opCtx, -bytesRemoved);
wuow.commit();