diff options
Diffstat (limited to 'src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp')
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp | 326 |
1 files changed, 166 insertions, 160 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 6a33378a070..a88754cf992 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -143,7 +143,7 @@ private: OplogStones* _oplogStones; }; -WiredTigerRecordStore::OplogStones::OplogStones(OperationContext* txn, WiredTigerRecordStore* rs) +WiredTigerRecordStore::OplogStones::OplogStones(OperationContext* opCtx, WiredTigerRecordStore* rs) : _rs(rs) { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -159,7 +159,7 @@ WiredTigerRecordStore::OplogStones::OplogStones(OperationContext* txn, WiredTige _minBytesPerStone = maxSize / _numStonesToKeep; invariant(_minBytesPerStone > 0); - _calculateStones(txn); + _calculateStones(opCtx); _pokeReclaimThreadIfNeeded(); // Reclaim stones if over the limit. } @@ -227,13 +227,16 @@ void WiredTigerRecordStore::OplogStones::createNewStoneIfNeeded(RecordId lastRec } void WiredTigerRecordStore::OplogStones::updateCurrentStoneAfterInsertOnCommit( - OperationContext* txn, int64_t bytesInserted, RecordId highestInserted, int64_t countInserted) { - txn->recoveryUnit()->registerChange( + OperationContext* opCtx, + int64_t bytesInserted, + RecordId highestInserted, + int64_t countInserted) { + opCtx->recoveryUnit()->registerChange( new InsertChange(this, bytesInserted, highestInserted, countInserted)); } -void WiredTigerRecordStore::OplogStones::clearStonesOnCommit(OperationContext* txn) { - txn->recoveryUnit()->registerChange(new TruncateChange(this)); +void WiredTigerRecordStore::OplogStones::clearStonesOnCommit(OperationContext* opCtx) { + opCtx->recoveryUnit()->registerChange(new TruncateChange(this)); } void WiredTigerRecordStore::OplogStones::updateStonesAfterCappedTruncateAfter( @@ -285,9 +288,9 @@ void WiredTigerRecordStore::OplogStones::setNumStonesToKeep(size_t numStones) { _numStonesToKeep = numStones; } -void WiredTigerRecordStore::OplogStones::_calculateStones(OperationContext* txn) { - long long numRecords = _rs->numRecords(txn); - long long dataSize = _rs->dataSize(txn); +void WiredTigerRecordStore::OplogStones::_calculateStones(OperationContext* opCtx) { + long long numRecords = _rs->numRecords(opCtx); + long long dataSize = _rs->dataSize(opCtx); log() << "The size storer reports that the oplog contains " << numRecords << " records totaling to " << dataSize << " bytes"; @@ -301,7 +304,7 @@ void WiredTigerRecordStore::OplogStones::_calculateStones(OperationContext* txn) if (numRecords <= 0 || dataSize <= 0 || uint64_t(numRecords) < kMinSampleRatioForRandCursor * kRandomSamplesPerStone * _numStonesToKeep) { - _calculateStonesByScanning(txn); + _calculateStonesByScanning(opCtx); return; } @@ -311,16 +314,16 @@ void WiredTigerRecordStore::OplogStones::_calculateStones(OperationContext* txn) double estRecordsPerStone = std::ceil(_minBytesPerStone / avgRecordSize); double estBytesPerStone = estRecordsPerStone * avgRecordSize; - _calculateStonesBySampling(txn, int64_t(estRecordsPerStone), int64_t(estBytesPerStone)); + _calculateStonesBySampling(opCtx, int64_t(estRecordsPerStone), int64_t(estBytesPerStone)); } -void WiredTigerRecordStore::OplogStones::_calculateStonesByScanning(OperationContext* txn) { +void WiredTigerRecordStore::OplogStones::_calculateStonesByScanning(OperationContext* opCtx) { log() << "Scanning the oplog to determine where to place markers for truncation"; long long numRecords = 0; long long dataSize = 0; - auto cursor = _rs->getCursor(txn, true); + auto cursor = _rs->getCursor(opCtx, true); while (auto record = cursor->next()) { _currentRecords.addAndFetch(1); int64_t newCurrentBytes = _currentBytes.addAndFetch(record->data.size()); @@ -336,10 +339,10 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesByScanning(OperationCon dataSize += record->data.size(); } - _rs->updateStatsAfterRepair(txn, numRecords, dataSize); + _rs->updateStatsAfterRepair(opCtx, numRecords, dataSize); } -void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationContext* txn, +void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationContext* opCtx, int64_t estRecordsPerStone, int64_t estBytesPerStone) { Timestamp earliestOpTime; @@ -347,13 +350,13 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon { const bool forward = true; - auto cursor = _rs->getCursor(txn, forward); + auto cursor = _rs->getCursor(opCtx, forward); auto record = cursor->next(); if (!record) { // This shouldn't really happen unless the size storer values are far off from reality. // The collection is probably empty, but fall back to scanning the oplog just in case. log() << "Failed to determine the earliest optime, falling back to scanning the oplog"; - _calculateStonesByScanning(txn); + _calculateStonesByScanning(opCtx); return; } earliestOpTime = Timestamp(record->id.repr()); @@ -361,13 +364,13 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon { const bool forward = false; - auto cursor = _rs->getCursor(txn, forward); + auto cursor = _rs->getCursor(opCtx, forward); auto record = cursor->next(); if (!record) { // This shouldn't really happen unless the size storer values are far off from reality. // The collection is probably empty, but fall back to scanning the oplog just in case. log() << "Failed to determine the latest optime, falling back to scanning the oplog"; - _calculateStonesByScanning(txn); + _calculateStonesByScanning(opCtx); return; } latestOpTime = Timestamp(record->id.repr()); @@ -376,8 +379,8 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon log() << "Sampling from the oplog between " << earliestOpTime.toStringPretty() << " and " << latestOpTime.toStringPretty() << " to determine where to place markers for truncation"; - int64_t wholeStones = _rs->numRecords(txn) / estRecordsPerStone; - int64_t numSamples = kRandomSamplesPerStone * _rs->numRecords(txn) / estRecordsPerStone; + int64_t wholeStones = _rs->numRecords(opCtx) / estRecordsPerStone; + int64_t numSamples = kRandomSamplesPerStone * _rs->numRecords(opCtx) / estRecordsPerStone; log() << "Taking " << numSamples << " samples and assuming that each section of oplog contains" << " approximately " << estRecordsPerStone << " records totaling to " << estBytesPerStone @@ -391,7 +394,7 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon // approximately 'estRecordsPerStone'. Do so by oversampling the oplog, sorting the samples in // order of their RecordId, and then choosing the samples expected to be near the right edge of // each logical section. - auto cursor = _rs->getRandomCursorWithOptions(txn, extraConfig); + auto cursor = _rs->getRandomCursorWithOptions(opCtx, extraConfig); std::vector<RecordId> oplogEstimates; for (int i = 0; i < numSamples; ++i) { auto record = cursor->next(); @@ -399,7 +402,7 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon // This shouldn't really happen unless the size storer values are far off from reality. // The collection is probably empty, but fall back to scanning the oplog just in case. log() << "Failed to get enough random samples, falling back to scanning the oplog"; - _calculateStonesByScanning(txn); + _calculateStonesByScanning(opCtx); return; } oplogEstimates.push_back(record->id); @@ -418,8 +421,8 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon } // Account for the partially filled chunk. - _currentRecords.store(_rs->numRecords(txn) - estRecordsPerStone * wholeStones); - _currentBytes.store(_rs->dataSize(txn) - estBytesPerStone * wholeStones); + _currentRecords.store(_rs->numRecords(opCtx) - estRecordsPerStone * wholeStones); + _currentBytes.store(_rs->dataSize(opCtx) - estBytesPerStone * wholeStones); } void WiredTigerRecordStore::OplogStones::_pokeReclaimThreadIfNeeded() { @@ -430,12 +433,12 @@ void WiredTigerRecordStore::OplogStones::_pokeReclaimThreadIfNeeded() { class WiredTigerRecordStore::Cursor final : public SeekableRecordCursor { public: - Cursor(OperationContext* txn, const WiredTigerRecordStore& rs, bool forward = true) + Cursor(OperationContext* opCtx, const WiredTigerRecordStore& rs, bool forward = true) : _rs(rs), - _txn(txn), + _opCtx(opCtx), _forward(forward), - _readUntilForOplog(WiredTigerRecoveryUnit::get(txn)->getOplogReadTill()) { - _cursor.emplace(rs.getURI(), rs.tableId(), true, txn); + _readUntilForOplog(WiredTigerRecoveryUnit::get(opCtx)->getOplogReadTill()) { + _cursor.emplace(rs.getURI(), rs.tableId(), true, opCtx); } boost::optional<Record> next() final { @@ -519,10 +522,10 @@ public: bool restore() final { if (!_cursor) - _cursor.emplace(_rs.getURI(), _rs.tableId(), true, _txn); + _cursor.emplace(_rs.getURI(), _rs.tableId(), true, _opCtx); // This will ensure an active session exists, so any restored cursors will bind to it - invariant(WiredTigerRecoveryUnit::get(_txn)->getSession(_txn) == _cursor->getSession()); + invariant(WiredTigerRecoveryUnit::get(_opCtx)->getSession(_opCtx) == _cursor->getSession()); _skipNextAdvance = false; // If we've hit EOF, then this iterator is done and need not be restored. @@ -566,12 +569,12 @@ public: } void detachFromOperationContext() final { - _txn = nullptr; + _opCtx = nullptr; _cursor = boost::none; } - void reattachToOperationContext(OperationContext* txn) final { - _txn = txn; + void reattachToOperationContext(OperationContext* opCtx) final { + _opCtx = opCtx; // _cursor recreated in restore() to avoid risk of WT_ROLLBACK issues. } @@ -598,7 +601,7 @@ private: } const WiredTigerRecordStore& _rs; - OperationContext* _txn; + OperationContext* _opCtx; const bool _forward; bool _skipNextAdvance = false; boost::optional<WiredTigerCursor> _cursor; @@ -629,8 +632,8 @@ StatusWith<std::string> WiredTigerRecordStore::parseOptionsField(const BSONObj o class WiredTigerRecordStore::RandomCursor final : public RecordCursor { public: - RandomCursor(OperationContext* txn, const WiredTigerRecordStore& rs, StringData config) - : _cursor(nullptr), _rs(&rs), _txn(txn), _config(config.toString() + ",next_random") { + RandomCursor(OperationContext* opCtx, const WiredTigerRecordStore& rs, StringData config) + : _cursor(nullptr), _rs(&rs), _opCtx(opCtx), _config(config.toString() + ",next_random") { restore(); } @@ -668,7 +671,7 @@ public: bool restore() final { // We can't use the CursorCache since this cursor needs a special config string. - WT_SESSION* session = WiredTigerRecoveryUnit::get(_txn)->getSession(_txn)->getSession(); + WT_SESSION* session = WiredTigerRecoveryUnit::get(_opCtx)->getSession(_opCtx)->getSession(); if (!_cursor) { invariantWTOK(session->open_cursor( @@ -678,22 +681,22 @@ public: return true; } void detachFromOperationContext() final { - invariant(_txn); - _txn = nullptr; + invariant(_opCtx); + _opCtx = nullptr; if (_cursor) { invariantWTOK(_cursor->close(_cursor)); } _cursor = nullptr; } - void reattachToOperationContext(OperationContext* txn) final { - invariant(!_txn); - _txn = txn; + void reattachToOperationContext(OperationContext* opCtx) final { + invariant(!_opCtx); + _opCtx = opCtx; } private: WT_CURSOR* _cursor; const WiredTigerRecordStore* _rs; - OperationContext* _txn; + OperationContext* _opCtx; const std::string _config; }; @@ -878,11 +881,11 @@ bool WiredTigerRecordStore::inShutdown() const { return _shuttingDown; } -long long WiredTigerRecordStore::dataSize(OperationContext* txn) const { +long long WiredTigerRecordStore::dataSize(OperationContext* opCtx) const { return _dataSize.load(); } -long long WiredTigerRecordStore::numRecords(OperationContext* txn) const { +long long WiredTigerRecordStore::numRecords(OperationContext* opCtx) const { return _numRecords.load(); } @@ -900,13 +903,13 @@ int64_t WiredTigerRecordStore::cappedMaxSize() const { return _cappedMaxSize; } -int64_t WiredTigerRecordStore::storageSize(OperationContext* txn, +int64_t WiredTigerRecordStore::storageSize(OperationContext* opCtx, BSONObjBuilder* extraInfo, int infoLevel) const { if (_isEphemeral) { - return dataSize(txn); + return dataSize(opCtx); } - WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn); + WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx); StatusWith<int64_t> result = WiredTigerUtil::getStatisticsValueAs<int64_t>(session->getSession(), "statistics:" + getURI(), @@ -934,9 +937,9 @@ RecordData WiredTigerRecordStore::_getData(const WiredTigerCursor& cursor) const return RecordData(data, value.size); } -RecordData WiredTigerRecordStore::dataFor(OperationContext* txn, const RecordId& id) const { +RecordData WiredTigerRecordStore::dataFor(OperationContext* opCtx, const RecordId& id) const { // ownership passes to the shared_array created below - WiredTigerCursor curwrap(_uri, _tableId, true, txn); + WiredTigerCursor curwrap(_uri, _tableId, true, opCtx); WT_CURSOR* c = curwrap.get(); invariant(c); c->set_key(c, _makeKey(id)); @@ -946,10 +949,10 @@ RecordData WiredTigerRecordStore::dataFor(OperationContext* txn, const RecordId& return _getData(curwrap); } -bool WiredTigerRecordStore::findRecord(OperationContext* txn, +bool WiredTigerRecordStore::findRecord(OperationContext* opCtx, const RecordId& id, RecordData* out) const { - WiredTigerCursor curwrap(_uri, _tableId, true, txn); + WiredTigerCursor curwrap(_uri, _tableId, true, opCtx); WT_CURSOR* c = curwrap.get(); invariant(c); c->set_key(c, _makeKey(id)); @@ -962,12 +965,12 @@ bool WiredTigerRecordStore::findRecord(OperationContext* txn, return true; } -void WiredTigerRecordStore::deleteRecord(OperationContext* txn, const RecordId& id) { +void WiredTigerRecordStore::deleteRecord(OperationContext* opCtx, const RecordId& id) { // Deletes should never occur on a capped collection because truncation uses // WT_SESSION::truncate(). invariant(!isCapped()); - WiredTigerCursor cursor(_uri, _tableId, true, txn); + WiredTigerCursor cursor(_uri, _tableId, true, opCtx); cursor.assertInActiveTxn(); WT_CURSOR* c = cursor.get(); c->set_key(c, _makeKey(id)); @@ -983,8 +986,8 @@ void WiredTigerRecordStore::deleteRecord(OperationContext* txn, const RecordId& ret = WT_OP_CHECK(c->remove(c)); invariantWTOK(ret); - _changeNumRecords(txn, -1); - _increaseDataSize(txn, -old_length); + _changeNumRecords(opCtx, -1); + _increaseDataSize(opCtx, -old_length); } bool WiredTigerRecordStore::cappedAndNeedDelete() const { @@ -1000,7 +1003,7 @@ bool WiredTigerRecordStore::cappedAndNeedDelete() const { return false; } -int64_t WiredTigerRecordStore::cappedDeleteAsNeeded(OperationContext* txn, +int64_t WiredTigerRecordStore::cappedDeleteAsNeeded(OperationContext* opCtx, const RecordId& justInserted) { invariant(!_oplogStones); @@ -1040,20 +1043,20 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded(OperationContext* txn, } } - return cappedDeleteAsNeeded_inlock(txn, justInserted); + return cappedDeleteAsNeeded_inlock(opCtx, justInserted); } -int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn, +int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* opCtx, const RecordId& justInserted) { // we do this in a side transaction in case it aborts WiredTigerRecoveryUnit* realRecoveryUnit = - checked_cast<WiredTigerRecoveryUnit*>(txn->releaseRecoveryUnit()); + checked_cast<WiredTigerRecoveryUnit*>(opCtx->releaseRecoveryUnit()); invariant(realRecoveryUnit); WiredTigerSessionCache* sc = realRecoveryUnit->getSessionCache(); OperationContext::RecoveryUnitState const realRUstate = - txn->setRecoveryUnit(new WiredTigerRecoveryUnit(sc), OperationContext::kNotInUnitOfWork); + opCtx->setRecoveryUnit(new WiredTigerRecoveryUnit(sc), OperationContext::kNotInUnitOfWork); - WT_SESSION* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn)->getSession(); + WT_SESSION* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx)->getSession(); int64_t dataSize = _dataSize.load(); int64_t numRecords = _numRecords.load(); @@ -1065,9 +1068,9 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn docsOverCap = numRecords - _cappedMaxDocs; try { - WriteUnitOfWork wuow(txn); + WriteUnitOfWork wuow(opCtx); - WiredTigerCursor curwrap(_uri, _tableId, true, txn); + WiredTigerCursor curwrap(_uri, _tableId, true, opCtx); WT_CURSOR* truncateEnd = curwrap.get(); RecordId newestIdToDelete; int ret = 0; @@ -1109,7 +1112,7 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex); if (_cappedCallback) { uassertStatusOK(_cappedCallback->aboutToDeleteCapped( - txn, + opCtx, newestIdToDelete, RecordData(static_cast<const char*>(old_value.data), old_value.size))); } @@ -1136,7 +1139,7 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn } invariantWTOK(truncateEnd->prev(truncateEnd)); // put the cursor back where it was - WiredTigerCursor startWrap(_uri, _tableId, true, txn); + WiredTigerCursor startWrap(_uri, _tableId, true, opCtx); WT_CURSOR* truncateStart = startWrap.get(); // If we know where the start point is, set it for the truncate @@ -1153,35 +1156,35 @@ int64_t WiredTigerRecordStore::cappedDeleteAsNeeded_inlock(OperationContext* txn docsRemoved = 0; } else { invariantWTOK(ret); - _changeNumRecords(txn, -docsRemoved); - _increaseDataSize(txn, -sizeSaved); + _changeNumRecords(opCtx, -docsRemoved); + _increaseDataSize(opCtx, -sizeSaved); wuow.commit(); // Save the key for the next round _cappedFirstRecord = firstRemainingId; } } } catch (const WriteConflictException& wce) { - delete txn->releaseRecoveryUnit(); - txn->setRecoveryUnit(realRecoveryUnit, realRUstate); + delete opCtx->releaseRecoveryUnit(); + opCtx->setRecoveryUnit(realRecoveryUnit, realRUstate); log() << "got conflict truncating capped, ignoring"; return 0; } catch (...) { - delete txn->releaseRecoveryUnit(); - txn->setRecoveryUnit(realRecoveryUnit, realRUstate); + delete opCtx->releaseRecoveryUnit(); + opCtx->setRecoveryUnit(realRecoveryUnit, realRUstate); throw; } - delete txn->releaseRecoveryUnit(); - txn->setRecoveryUnit(realRecoveryUnit, realRUstate); + delete opCtx->releaseRecoveryUnit(); + opCtx->setRecoveryUnit(realRecoveryUnit, realRUstate); return docsRemoved; } -bool WiredTigerRecordStore::yieldAndAwaitOplogDeletionRequest(OperationContext* txn) { +bool WiredTigerRecordStore::yieldAndAwaitOplogDeletionRequest(OperationContext* opCtx) { // Create another reference to the oplog stones while holding a lock on the collection to // prevent it from being destructed. std::shared_ptr<OplogStones> oplogStones = _oplogStones; - Locker* locker = txn->lockState(); + Locker* locker = opCtx->lockState(); Locker::LockSnapshot snapshot; // Release any locks before waiting on the condition variable. It is illegal to access any @@ -1191,7 +1194,7 @@ bool WiredTigerRecordStore::yieldAndAwaitOplogDeletionRequest(OperationContext* // The top-level locks were freed, so also release any potential low-level (storage engine) // locks that might be held. - txn->recoveryUnit()->abandonSnapshot(); + opCtx->recoveryUnit()->abandonSnapshot(); // Wait for an oplog deletion request, or for this record store to have been destroyed. oplogStones->awaitHasExcessStonesOrDead(); @@ -1202,7 +1205,7 @@ bool WiredTigerRecordStore::yieldAndAwaitOplogDeletionRequest(OperationContext* return !oplogStones->isDead(); } -void WiredTigerRecordStore::reclaimOplog(OperationContext* txn) { +void WiredTigerRecordStore::reclaimOplog(OperationContext* opCtx) { while (auto stone = _oplogStones->peekOldestStoneIfNeeded()) { invariant(stone->lastRecord.isNormal()); @@ -1210,23 +1213,23 @@ void WiredTigerRecordStore::reclaimOplog(OperationContext* txn) { << stone->lastRecord << " to remove approximately " << stone->records << " records totaling to " << stone->bytes << " bytes"; - WiredTigerRecoveryUnit* ru = WiredTigerRecoveryUnit::get(txn); - WT_SESSION* session = ru->getSession(txn)->getSession(); + WiredTigerRecoveryUnit* ru = WiredTigerRecoveryUnit::get(opCtx); + WT_SESSION* session = ru->getSession(opCtx)->getSession(); try { - WriteUnitOfWork wuow(txn); + WriteUnitOfWork wuow(opCtx); - WiredTigerCursor startwrap(_uri, _tableId, true, txn); + WiredTigerCursor startwrap(_uri, _tableId, true, opCtx); WT_CURSOR* start = startwrap.get(); start->set_key(start, _makeKey(_oplogStones->firstRecord)); - WiredTigerCursor endwrap(_uri, _tableId, true, txn); + WiredTigerCursor endwrap(_uri, _tableId, true, opCtx); WT_CURSOR* end = endwrap.get(); end->set_key(end, _makeKey(stone->lastRecord)); invariantWTOK(session->truncate(session, nullptr, start, end, nullptr)); - _changeNumRecords(txn, -stone->records); - _increaseDataSize(txn, -stone->bytes); + _changeNumRecords(opCtx, -stone->records); + _increaseDataSize(opCtx, -stone->bytes); wuow.commit(); @@ -1244,13 +1247,13 @@ void WiredTigerRecordStore::reclaimOplog(OperationContext* txn) { << " records totaling to " << _dataSize.load() << " bytes"; } -Status WiredTigerRecordStore::insertRecords(OperationContext* txn, +Status WiredTigerRecordStore::insertRecords(OperationContext* opCtx, std::vector<Record>* records, bool enforceQuota) { - return _insertRecords(txn, records->data(), records->size()); + return _insertRecords(opCtx, records->data(), records->size()); } -Status WiredTigerRecordStore::_insertRecords(OperationContext* txn, +Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx, Record* records, size_t nRecords) { // We are kind of cheating on capped collections since we write all of them at once .... @@ -1263,7 +1266,7 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* txn, if (_isCapped && totalLength > _cappedMaxSize) return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"); - WiredTigerCursor curwrap(_uri, _tableId, true, txn); + WiredTigerCursor curwrap(_uri, _tableId, true, opCtx); curwrap.assertInActiveTxn(); WT_CURSOR* c = curwrap.get(); invariant(c); @@ -1281,7 +1284,7 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* txn, } else if (_isCapped) { stdx::lock_guard<stdx::mutex> lk(_uncommittedRecordIdsMutex); record.id = _nextId(); - _addUncommittedRecordId_inlock(txn, record.id); + _addUncommittedRecordId_inlock(opCtx, record.id); } else { record.id = _nextId(); } @@ -1305,24 +1308,25 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* txn, return wtRCToStatus(ret, "WiredTigerRecordStore::insertRecord"); } - _changeNumRecords(txn, nRecords); - _increaseDataSize(txn, totalLength); + _changeNumRecords(opCtx, nRecords); + _increaseDataSize(opCtx, totalLength); if (_oplogStones) { - _oplogStones->updateCurrentStoneAfterInsertOnCommit(txn, totalLength, highestId, nRecords); + _oplogStones->updateCurrentStoneAfterInsertOnCommit( + opCtx, totalLength, highestId, nRecords); } else { - cappedDeleteAsNeeded(txn, highestId); + cappedDeleteAsNeeded(opCtx, highestId); } return Status::OK(); } -StatusWith<RecordId> WiredTigerRecordStore::insertRecord(OperationContext* txn, +StatusWith<RecordId> WiredTigerRecordStore::insertRecord(OperationContext* opCtx, const char* data, int len, bool enforceQuota) { Record record = {RecordId(), RecordData(data, len)}; - Status status = _insertRecords(txn, &record, 1); + Status status = _insertRecords(opCtx, &record, 1); if (!status.isOK()) return StatusWith<RecordId>(status); return StatusWith<RecordId>(record.id); @@ -1362,7 +1366,7 @@ RecordId WiredTigerRecordStore::lowestCappedHiddenRecord() const { return _uncommittedRecordIds.empty() ? RecordId() : _uncommittedRecordIds.front(); } -Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* txn, +Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* opCtx, const DocWriter* const* docs, size_t nDocs, RecordId* idsOut) { @@ -1388,7 +1392,7 @@ Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* txn, } invariant(pos == (buffer.get() + totalSize)); - Status s = _insertRecords(txn, records.get(), nDocs); + Status s = _insertRecords(opCtx, records.get(), nDocs); if (!s.isOK()) return s; @@ -1401,13 +1405,13 @@ Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* txn, return s; } -Status WiredTigerRecordStore::updateRecord(OperationContext* txn, +Status WiredTigerRecordStore::updateRecord(OperationContext* opCtx, const RecordId& id, const char* data, int len, bool enforceQuota, UpdateNotifier* notifier) { - WiredTigerCursor curwrap(_uri, _tableId, true, txn); + WiredTigerCursor curwrap(_uri, _tableId, true, opCtx); curwrap.assertInActiveTxn(); WT_CURSOR* c = curwrap.get(); invariant(c); @@ -1431,9 +1435,9 @@ Status WiredTigerRecordStore::updateRecord(OperationContext* txn, ret = WT_OP_CHECK(c->insert(c)); invariantWTOK(ret); - _increaseDataSize(txn, len - old_length); + _increaseDataSize(opCtx, len - old_length); if (!_oplogStones) { - cappedDeleteAsNeeded(txn, id); + cappedDeleteAsNeeded(opCtx, id); } return Status::OK(); @@ -1444,7 +1448,7 @@ bool WiredTigerRecordStore::updateWithDamagesSupported() const { } StatusWith<RecordData> WiredTigerRecordStore::updateWithDamages( - OperationContext* txn, + OperationContext* opCtx, const RecordId& id, const RecordData& oldRec, const char* damageSource, @@ -1461,41 +1465,42 @@ void WiredTigerRecordStore::_oplogSetStartHack(WiredTigerRecoveryUnit* wru) cons } } -std::unique_ptr<SeekableRecordCursor> WiredTigerRecordStore::getCursor(OperationContext* txn, +std::unique_ptr<SeekableRecordCursor> WiredTigerRecordStore::getCursor(OperationContext* opCtx, bool forward) const { if (_isOplog && forward) { - WiredTigerRecoveryUnit* wru = WiredTigerRecoveryUnit::get(txn); + WiredTigerRecoveryUnit* wru = WiredTigerRecoveryUnit::get(opCtx); // If we already have a snapshot we don't know what it can see, unless we know no one // else could be writing (because we hold an exclusive lock). - if (wru->inActiveTxn() && !txn->lockState()->isNoop() && - !txn->lockState()->isCollectionLockedForMode(_ns, MODE_X)) { + if (wru->inActiveTxn() && !opCtx->lockState()->isNoop() && + !opCtx->lockState()->isCollectionLockedForMode(_ns, MODE_X)) { throw WriteConflictException(); } _oplogSetStartHack(wru); } - return stdx::make_unique<Cursor>(txn, *this, forward); + return stdx::make_unique<Cursor>(opCtx, *this, forward); } -std::unique_ptr<RecordCursor> WiredTigerRecordStore::getRandomCursor(OperationContext* txn) const { +std::unique_ptr<RecordCursor> WiredTigerRecordStore::getRandomCursor( + OperationContext* opCtx) const { const char* extraConfig = ""; - return getRandomCursorWithOptions(txn, extraConfig); + return getRandomCursorWithOptions(opCtx, extraConfig); } std::unique_ptr<RecordCursor> WiredTigerRecordStore::getRandomCursorWithOptions( - OperationContext* txn, StringData extraConfig) const { - return stdx::make_unique<RandomCursor>(txn, *this, extraConfig); + OperationContext* opCtx, StringData extraConfig) const { + return stdx::make_unique<RandomCursor>(opCtx, *this, extraConfig); } std::vector<std::unique_ptr<RecordCursor>> WiredTigerRecordStore::getManyCursors( - OperationContext* txn) const { + OperationContext* opCtx) const { std::vector<std::unique_ptr<RecordCursor>> cursors(1); - cursors[0] = stdx::make_unique<Cursor>(txn, *this, /*forward=*/true); + cursors[0] = stdx::make_unique<Cursor>(opCtx, *this, /*forward=*/true); return cursors; } -Status WiredTigerRecordStore::truncate(OperationContext* txn) { - WiredTigerCursor startWrap(_uri, _tableId, true, txn); +Status WiredTigerRecordStore::truncate(OperationContext* opCtx) { + WiredTigerCursor startWrap(_uri, _tableId, true, opCtx); WT_CURSOR* start = startWrap.get(); int ret = WT_OP_CHECK(start->next(start)); // Empty collections don't have anything to truncate. @@ -1504,23 +1509,23 @@ Status WiredTigerRecordStore::truncate(OperationContext* txn) { } invariantWTOK(ret); - WT_SESSION* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn)->getSession(); + WT_SESSION* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx)->getSession(); invariantWTOK(WT_OP_CHECK(session->truncate(session, NULL, start, NULL, NULL))); - _changeNumRecords(txn, -numRecords(txn)); - _increaseDataSize(txn, -dataSize(txn)); + _changeNumRecords(opCtx, -numRecords(opCtx)); + _increaseDataSize(opCtx, -dataSize(opCtx)); if (_oplogStones) { - _oplogStones->clearStonesOnCommit(txn); + _oplogStones->clearStonesOnCommit(opCtx); } return Status::OK(); } -Status WiredTigerRecordStore::compact(OperationContext* txn, +Status WiredTigerRecordStore::compact(OperationContext* opCtx, RecordStoreCompactAdaptor* adaptor, const CompactOptions* options, CompactStats* stats) { - WiredTigerSessionCache* cache = WiredTigerRecoveryUnit::get(txn)->getSessionCache(); + WiredTigerSessionCache* cache = WiredTigerRecoveryUnit::get(opCtx)->getSessionCache(); if (!cache->isEphemeral()) { UniqueWiredTigerSession session = cache->getSession(); WT_SESSION* s = session->getSession(); @@ -1530,13 +1535,13 @@ Status WiredTigerRecordStore::compact(OperationContext* txn, return Status::OK(); } -Status WiredTigerRecordStore::validate(OperationContext* txn, +Status WiredTigerRecordStore::validate(OperationContext* opCtx, ValidateCmdLevel level, ValidateAdaptor* adaptor, ValidateResults* results, BSONObjBuilder* output) { if (!_isEphemeral) { - int err = WiredTigerUtil::verifyTable(txn, _uri, &results->errors); + int err = WiredTigerUtil::verifyTable(opCtx, _uri, &results->errors); if (err == EBUSY) { const char* msg = "verify() returned EBUSY. Not treating as invalid."; warning() << msg; @@ -1558,12 +1563,12 @@ Status WiredTigerRecordStore::validate(OperationContext* txn, long long nInvalid = 0; results->valid = true; - Cursor cursor(txn, *this, true); + Cursor cursor(opCtx, *this, true); int interruptInterval = 4096; while (auto record = cursor.next()) { if (!(nrecords % interruptInterval)) - txn->checkForInterrupt(); + opCtx->checkForInterrupt(); ++nrecords; auto dataSize = record->data.size(); dataSizeTotal += dataSize; @@ -1606,7 +1611,7 @@ Status WiredTigerRecordStore::validate(OperationContext* txn, return Status::OK(); } -void WiredTigerRecordStore::appendCustomStats(OperationContext* txn, +void WiredTigerRecordStore::appendCustomStats(OperationContext* opCtx, BSONObjBuilder* result, double scale) const { result->appendBool("capped", _isCapped); @@ -1616,12 +1621,12 @@ void WiredTigerRecordStore::appendCustomStats(OperationContext* txn, result->appendIntOrLL("sleepCount", _cappedSleep.load()); result->appendIntOrLL("sleepMS", _cappedSleepMS.load()); } - WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn); + WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx); WT_SESSION* s = session->getSession(); BSONObjBuilder bob(result->subobjStart(_engineName)); { BSONObjBuilder metadata(bob.subobjStart("metadata")); - Status status = WiredTigerUtil::getApplicationMetadata(txn, getURI(), &metadata); + Status status = WiredTigerUtil::getApplicationMetadata(opCtx, getURI(), &metadata); if (!status.isOK()) { metadata.append("error", "unable to retrieve metadata"); metadata.append("code", static_cast<int>(status.code())); @@ -1630,8 +1635,8 @@ void WiredTigerRecordStore::appendCustomStats(OperationContext* txn, } std::string type, sourceURI; - WiredTigerUtil::fetchTypeAndSourceURI(txn, _uri, &type, &sourceURI); - StatusWith<std::string> metadataResult = WiredTigerUtil::getMetadata(txn, sourceURI); + WiredTigerUtil::fetchTypeAndSourceURI(opCtx, _uri, &type, &sourceURI); + StatusWith<std::string> metadataResult = WiredTigerUtil::getMetadata(opCtx, sourceURI); StringData creationStringName("creationString"); if (!metadataResult.isOK()) { BSONObjBuilder creationString(bob.subobjStart(creationStringName)); @@ -1653,7 +1658,7 @@ void WiredTigerRecordStore::appendCustomStats(OperationContext* txn, } } -Status WiredTigerRecordStore::touch(OperationContext* txn, BSONObjBuilder* output) const { +Status WiredTigerRecordStore::touch(OperationContext* opCtx, BSONObjBuilder* output) const { if (_isEphemeral) { // Everything is already in memory. return Status::OK(); @@ -1661,13 +1666,14 @@ Status WiredTigerRecordStore::touch(OperationContext* txn, BSONObjBuilder* outpu return Status(ErrorCodes::CommandNotSupported, "this storage engine does not support touch"); } -Status WiredTigerRecordStore::oplogDiskLocRegister(OperationContext* txn, const Timestamp& opTime) { +Status WiredTigerRecordStore::oplogDiskLocRegister(OperationContext* opCtx, + const Timestamp& opTime) { StatusWith<RecordId> id = oploghack::keyForOptime(opTime); if (!id.isOK()) return id.getStatus(); stdx::lock_guard<stdx::mutex> lk(_uncommittedRecordIdsMutex); - _addUncommittedRecordId_inlock(txn, id.getValue()); + _addUncommittedRecordId_inlock(opCtx, id.getValue()); return Status::OK(); } @@ -1733,38 +1739,38 @@ void WiredTigerRecordStore::_oplogJournalThreadLoop(WiredTigerSessionCache* sess std::terminate(); } -void WiredTigerRecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const { - invariant(txn->lockState()->isNoop() || !txn->lockState()->inAWriteUnitOfWork()); +void WiredTigerRecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const { + invariant(opCtx->lockState()->isNoop() || !opCtx->lockState()->inAWriteUnitOfWork()); // This function must not start a WT transaction, otherwise we will get stuck in an infinite // loop of WCE handling when the getCursor() is called. stdx::unique_lock<stdx::mutex> lk(_uncommittedRecordIdsMutex); const auto waitingFor = _oplog_highestSeen; - txn->waitForConditionOrInterrupt(_opsBecameVisibleCV, lk, [&] { + opCtx->waitForConditionOrInterrupt(_opsBecameVisibleCV, lk, [&] { return _uncommittedRecordIds.empty() || _uncommittedRecordIds.front() > waitingFor; }); } -void WiredTigerRecordStore::_addUncommittedRecordId_inlock(OperationContext* txn, RecordId id) { +void WiredTigerRecordStore::_addUncommittedRecordId_inlock(OperationContext* opCtx, RecordId id) { dassert(_uncommittedRecordIds.empty() || _uncommittedRecordIds.back() < id); SortedRecordIds::iterator it = _uncommittedRecordIds.insert(_uncommittedRecordIds.end(), id); invariant(it->isNormal()); - txn->recoveryUnit()->registerChange(new CappedInsertChange(this, it)); + opCtx->recoveryUnit()->registerChange(new CappedInsertChange(this, it)); _oplog_highestSeen = id; } boost::optional<RecordId> WiredTigerRecordStore::oplogStartHack( - OperationContext* txn, const RecordId& startingPosition) const { + OperationContext* opCtx, const RecordId& startingPosition) const { if (!_useOplogHack) return boost::none; { - WiredTigerRecoveryUnit* wru = WiredTigerRecoveryUnit::get(txn); + WiredTigerRecoveryUnit* wru = WiredTigerRecoveryUnit::get(opCtx); _oplogSetStartHack(wru); } - WiredTigerCursor cursor(_uri, _tableId, true, txn); + WiredTigerCursor cursor(_uri, _tableId, true, opCtx); WT_CURSOR* c = cursor.get(); int cmp; @@ -1782,7 +1788,7 @@ boost::optional<RecordId> WiredTigerRecordStore::oplogStartHack( return _fromKey(key); } -void WiredTigerRecordStore::updateStatsAfterRepair(OperationContext* txn, +void WiredTigerRecordStore::updateStatsAfterRepair(OperationContext* opCtx, long long numRecords, long long dataSize) { _numRecords.store(numRecords); @@ -1800,8 +1806,8 @@ RecordId WiredTigerRecordStore::_nextId() { return out; } -WiredTigerRecoveryUnit* WiredTigerRecordStore::_getRecoveryUnit(OperationContext* txn) { - return checked_cast<WiredTigerRecoveryUnit*>(txn->recoveryUnit()); +WiredTigerRecoveryUnit* WiredTigerRecordStore::_getRecoveryUnit(OperationContext* opCtx) { + return checked_cast<WiredTigerRecoveryUnit*>(opCtx->recoveryUnit()); } class WiredTigerRecordStore::NumRecordsChange : public RecoveryUnit::Change { @@ -1817,8 +1823,8 @@ private: int64_t _diff; }; -void WiredTigerRecordStore::_changeNumRecords(OperationContext* txn, int64_t diff) { - txn->recoveryUnit()->registerChange(new NumRecordsChange(this, diff)); +void WiredTigerRecordStore::_changeNumRecords(OperationContext* opCtx, int64_t diff) { + opCtx->recoveryUnit()->registerChange(new NumRecordsChange(this, diff)); if (_numRecords.fetchAndAdd(diff) < 0) _numRecords.store(std::max(diff, int64_t(0))); } @@ -1836,9 +1842,9 @@ private: int64_t _amount; }; -void WiredTigerRecordStore::_increaseDataSize(OperationContext* txn, int64_t amount) { - if (txn) - txn->recoveryUnit()->registerChange(new DataSizeChange(this, amount)); +void WiredTigerRecordStore::_increaseDataSize(OperationContext* opCtx, int64_t amount) { + if (opCtx) + opCtx->recoveryUnit()->registerChange(new DataSizeChange(this, amount)); if (_dataSize.fetchAndAdd(amount) < 0) _dataSize.store(std::max(amount, int64_t(0))); @@ -1855,10 +1861,10 @@ RecordId WiredTigerRecordStore::_fromKey(int64_t key) { return RecordId(key); } -void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* txn, +void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive) { - Cursor cursor(txn, *this); + Cursor cursor(opCtx, *this); auto record = cursor.seekExact(end); massert(28807, str::stream() << "Failed to seek to the record located at " << end, record); @@ -1869,7 +1875,7 @@ void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* txn, RecordId firstRemovedId; if (inclusive) { - Cursor reverseCursor(txn, *this, false); + Cursor reverseCursor(opCtx, *this, false); invariant(reverseCursor.seekExact(end)); auto prev = reverseCursor.next(); lastKeptId = prev ? prev->id : RecordId(); @@ -1891,7 +1897,7 @@ void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* txn, do { if (_cappedCallback) { uassertStatusOK( - _cappedCallback->aboutToDeleteCapped(txn, record->id, record->data)); + _cappedCallback->aboutToDeleteCapped(opCtx, record->id, record->data)); } recordsRemoved++; bytesRemoved += record->data.size(); @@ -1900,17 +1906,17 @@ void WiredTigerRecordStore::cappedTruncateAfter(OperationContext* txn, // Truncate the collection starting from the record located at 'firstRemovedId' to the end of // the collection. - WriteUnitOfWork wuow(txn); + WriteUnitOfWork wuow(opCtx); - WiredTigerCursor startwrap(_uri, _tableId, true, txn); + WiredTigerCursor startwrap(_uri, _tableId, true, opCtx); WT_CURSOR* start = startwrap.get(); start->set_key(start, _makeKey(firstRemovedId)); - WT_SESSION* session = WiredTigerRecoveryUnit::get(txn)->getSession(txn)->getSession(); + WT_SESSION* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx)->getSession(); invariantWTOK(session->truncate(session, nullptr, start, nullptr, nullptr)); - _changeNumRecords(txn, -recordsRemoved); - _increaseDataSize(txn, -bytesRemoved); + _changeNumRecords(opCtx, -recordsRemoved); + _increaseDataSize(opCtx, -bytesRemoved); wuow.commit(); |