diff options
author | Andrew Chen <andrew.chen@10gen.com> | 2020-02-28 11:25:10 -0500 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-02-28 16:39:37 +0000 |
commit | d6a9438a1dbbdddf0af13f60d2bd8f77cd3cfd11 (patch) | |
tree | ef1f80f006b6ad9116e6c07d0fe42705dd71dfc0 | |
parent | 28210b5144c11ea5cb09bf19ff98e67c36c14c90 (diff) | |
download | mongo-d6a9438a1dbbdddf0af13f60d2bd8f77cd3cfd11.tar.gz |
SERVER-46094: Now using walltime instead of ts when checking for excess oplog stones.
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp | 122 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h | 8 |
2 files changed, 88 insertions, 42 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 102a73097f5..00517e8dcd7 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -80,13 +80,21 @@ using std::unique_ptr; namespace { +struct RecordIdAndWall { + RecordId id; + Date_t wall; + + RecordIdAndWall(RecordId lastRecord, Date_t wallTime) : id(lastRecord), wall(wallTime) {} +}; + + static const int kMinimumRecordStoreVersion = 1; static const int kCurrentRecordStoreVersion = 1; // New record stores use this by default. static const int kMaximumRecordStoreVersion = 1; MONGO_STATIC_ASSERT(kCurrentRecordStoreVersion >= kMinimumRecordStoreVersion); MONGO_STATIC_ASSERT(kCurrentRecordStoreVersion <= kMaximumRecordStoreVersion); -const double kNumSecsInHour = 3600.0; +const double kNumMSInHour = 1000 * 60 * 60; void checkOplogFormatVersion(OperationContext* opCtx, const std::string& uri) { StatusWith<BSONObj> appMetadata = WiredTigerUtil::getApplicationMetadata(opCtx, uri); @@ -118,21 +126,42 @@ class WiredTigerRecordStore::OplogStones::InsertChange final : public RecoveryUn public: InsertChange(OplogStones* oplogStones, int64_t bytesInserted, - RecordId highestInserted, - int64_t countInserted) + const Record& highestInsertedRecord, + int64_t countInserted, + OperationContext* opCtx) : _oplogStones(oplogStones), _bytesInserted(bytesInserted), - _highestInserted(highestInserted), - _countInserted(countInserted) {} + _recordId(highestInsertedRecord.id), + _countInserted(countInserted), + _opCtx(opCtx) { + // We only want to initialize _wall by parsing BSONObj when we expect to need it in + // OplogStone::createNewStoneIfNeeded. + int64_t currBytes = _oplogStones->_currentBytes.load() + _bytesInserted; + if (currBytes >= _oplogStones->_minBytesPerStone) { + BSONObj obj = highestInsertedRecord.data.toBson(); + BSONElement ele = obj["wall"]; + if (!ele) { + // This shouldn't happen in normal cases, but this is needed because some tests do + // not add wall clock times. Note that, with this addition, it's possible that the + // oplog may grow larger than expected if --oplogMinRetentionHours is set. + _wall = Date_t::now(); + } else { + _wall = ele.Date(); + } + } + } void commit(boost::optional<Timestamp>) final { invariant(_bytesInserted >= 0); - invariant(_highestInserted.isValid()); + invariant(_recordId.isValid()); _oplogStones->_currentRecords.addAndFetch(_countInserted); int64_t newCurrentBytes = _oplogStones->_currentBytes.addAndFetch(_bytesInserted); - if (newCurrentBytes >= _oplogStones->_minBytesPerStone) { - _oplogStones->createNewStoneIfNeeded(_highestInserted); + if (_wall != Date_t() && newCurrentBytes >= _oplogStones->_minBytesPerStone) { + // When other InsertChanges commit concurrently, an uninitialized wallTime may delay the + // creation of a new stone. This delay is limited to the number of concurrently running + // transactions, so the size difference should be inconsequential. + _oplogStones->createNewStoneIfNeeded(_opCtx, _recordId, _wall); } } @@ -141,8 +170,10 @@ public: private: OplogStones* _oplogStones; int64_t _bytesInserted; - RecordId _highestInserted; + RecordId _recordId; int64_t _countInserted; + OperationContext* _opCtx; + Date_t _wall; }; class WiredTigerRecordStore::OplogStones::TruncateChange final : public RecoveryUnit::Change { @@ -248,11 +279,11 @@ bool WiredTigerRecordStore::OplogStones::hasExcessStones_inlock() const { return true; } - auto rc = repl::ReplicationCoordinator::get(getGlobalServiceContext()); - double lastAppliedTs = rc->getMyLastAppliedOpTime().getTimestamp().getSecs(); - double lastStoneTs = Timestamp(_stones.front().lastRecord.repr()).getSecs(); + auto nowWall = Date_t::now(); + auto lastStoneWall = _stones.front().wallTime; - double currRetentionHours = (lastAppliedTs - lastStoneTs) / kNumSecsInHour; + auto currRetentionMS = durationCount<Milliseconds>(nowWall - lastStoneWall); + double currRetentionHours = currRetentionMS / kNumMSInHour; return currRetentionHours >= minRetentionHours; } @@ -272,7 +303,9 @@ void WiredTigerRecordStore::OplogStones::popOldestStone() { _stones.pop_front(); } -void WiredTigerRecordStore::OplogStones::createNewStoneIfNeeded(RecordId lastRecord) { +void WiredTigerRecordStore::OplogStones::createNewStoneIfNeeded(OperationContext* opCtx, + RecordId lastRecord, + Date_t wallTime) { stdx::unique_lock<Latch> lk(_mutex, stdx::try_to_lock); if (!lk) { // Someone else is either already creating a new stone or popping the oldest one. In the @@ -296,7 +329,8 @@ void WiredTigerRecordStore::OplogStones::createNewStoneIfNeeded(RecordId lastRec 2, "create new oplogStone, current stones:{stones_size}", "stones_size"_attr = _stones.size()); - OplogStones::Stone stone = {_currentRecords.swap(0), _currentBytes.swap(0), lastRecord}; + + OplogStones::Stone stone(_currentRecords.swap(0), _currentBytes.swap(0), lastRecord, wallTime); _stones.push_back(stone); _pokeReclaimThreadIfNeeded(); @@ -305,10 +339,10 @@ void WiredTigerRecordStore::OplogStones::createNewStoneIfNeeded(RecordId lastRec void WiredTigerRecordStore::OplogStones::updateCurrentStoneAfterInsertOnCommit( OperationContext* opCtx, int64_t bytesInserted, - RecordId highestInserted, + const Record& highestInsertedRecord, int64_t countInserted) { - opCtx->recoveryUnit()->registerChange( - std::make_unique<InsertChange>(this, bytesInserted, highestInserted, countInserted)); + opCtx->recoveryUnit()->registerChange(std::make_unique<InsertChange>( + this, bytesInserted, highestInsertedRecord, countInserted, opCtx)); } void WiredTigerRecordStore::OplogStones::clearStonesOnCommit(OperationContext* opCtx) { @@ -415,13 +449,13 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesByScanning(OperationCon _currentRecords.addAndFetch(1); int64_t newCurrentBytes = _currentBytes.addAndFetch(record->data.size()); if (newCurrentBytes >= _minBytesPerStone) { - LOGV2_DEBUG(22385, - 1, - "Placing a marker at optime {Timestamp_record_id_repr_Pretty}", - "Timestamp_record_id_repr_Pretty"_attr = - Timestamp(record->id.repr()).toStringPretty()); + BSONObj obj = record->data.toBson(); + Date_t wallTime = obj["wall"].Date(); - OplogStones::Stone stone = {_currentRecords.swap(0), _currentBytes.swap(0), record->id}; + LOGV2_DEBUG( + 22385, 1, "Marking oplog entry as a potential future oplog truncation point."); + OplogStones::Stone stone( + _currentRecords.swap(0), _currentBytes.swap(0), record->id, wallTime); _stones.push_back(stone); } @@ -495,7 +529,7 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon // order of their RecordId, and then choosing the samples expected to be near the right edge of // each logical section. auto cursor = _rs->getRandomCursorWithOptions(opCtx, extraConfig); - std::vector<RecordId> oplogEstimates; + std::vector<RecordIdAndWall> oplogEstimates; auto lastProgressLog = Date_t::now(); for (int i = 0; i < numSamples; ++i) { auto samplingLogIntervalSeconds = gOplogSamplingLogIntervalSeconds.load(); @@ -509,7 +543,10 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon _calculateStonesByScanning(opCtx); return; } - oplogEstimates.push_back(record->id); + + BSONObj obj = record->data.toBson(); + Date_t wall = obj["wall"].Date(); + oplogEstimates.push_back(RecordIdAndWall{record->id, wall}); const auto now = Date_t::now(); if (samplingLogIntervalSeconds > 0 && @@ -521,20 +558,25 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon lastProgressLog = now; } } - std::sort(oplogEstimates.begin(), oplogEstimates.end()); + std::sort(oplogEstimates.begin(), + oplogEstimates.end(), + [](RecordIdAndWall a, RecordIdAndWall b) { return a.id < b.id; }); LOGV2(22393, "Oplog sampling complete"); for (int i = 1; i <= wholeStones; ++i) { // Use every (kRandomSamplesPerStone)th sample, starting with the // (kRandomSamplesPerStone - 1)th, as the last record for each stone. + // If parsing "wall" fails, we crash to allow user to fix their oplog. int sampleIndex = kRandomSamplesPerStone * i - 1; - RecordId lastRecord = oplogEstimates[sampleIndex]; - - LOGV2(22394, - "Placing a marker at optime {Timestamp_lastRecord_repr_Pretty}", - "Timestamp_lastRecord_repr_Pretty"_attr = - Timestamp(lastRecord.repr()).toStringPretty()); - OplogStones::Stone stone = {estRecordsPerStone, estBytesPerStone, lastRecord}; + RecordIdAndWall rIdAndWall = oplogEstimates[sampleIndex]; + LOGV2_DEBUG(22394, + 1, + "Marking oplog entry as a potential future oplog truncation point. wall: " + "{wall}, ts: {ts}", + "wall"_attr = rIdAndWall.wall, + "ts"_attr = rIdAndWall.id); + OplogStones::Stone stone( + estRecordsPerStone, estBytesPerStone, rIdAndWall.id, rIdAndWall.wall); _stones.push_back(stone); } @@ -1394,8 +1436,8 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx, WT_CURSOR* c = curwrap.get(); invariant(c); - RecordId highestId = RecordId(); - dassert(nRecords != 0); + Record highestIdRecord; + invariant(nRecords != 0); for (size_t i = 0; i < nRecords; i++) { auto& record = records[i]; if (_isOplog) { @@ -1407,8 +1449,8 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx, } else { record.id = _nextId(opCtx); } - dassert(record.id > highestId); - highestId = record.id; + dassert(record.id > highestIdRecord.id); + highestIdRecord = record; } for (size_t i = 0; i < nRecords; i++) { @@ -1443,9 +1485,9 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx, if (_oplogStones) { _oplogStones->updateCurrentStoneAfterInsertOnCommit( - opCtx, totalLength, highestId, nRecords); + opCtx, totalLength, highestIdRecord, nRecords); } else { - _cappedDeleteAsNeeded(opCtx, highestId); + _cappedDeleteAsNeeded(opCtx, highestIdRecord.id); } return Status::OK(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h index be7eb4afb7d..1f1412c0b81 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h @@ -49,6 +49,10 @@ public: int64_t records; // Approximate number of records in a chunk of the oplog. int64_t bytes; // Approximate size of records in a chunk of the oplog. RecordId lastRecord; // RecordId of the last record in a chunk of the oplog. + Date_t wallTime; // Walltime of when this chunk of the oplog was created. + + Stone(int64_t records, int64_t bytes, RecordId lastRecord, Date_t wallTime) + : records(records), bytes(bytes), lastRecord(lastRecord), wallTime(wallTime) {} }; OplogStones(OperationContext* opCtx, WiredTigerRecordStore* rs); @@ -73,11 +77,11 @@ public: void popOldestStone(); - void createNewStoneIfNeeded(RecordId lastRecord); + void createNewStoneIfNeeded(OperationContext* opCtx, RecordId lastRecord, Date_t wallTime); void updateCurrentStoneAfterInsertOnCommit(OperationContext* opCtx, int64_t bytesInserted, - RecordId highestInserted, + const Record& highestInsertedRecord, int64_t countInserted); void clearStonesOnCommit(OperationContext* opCtx); |