summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Chen <andrew.chen@10gen.com>2020-02-28 11:25:10 -0500
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-02-28 16:39:37 +0000
commitd6a9438a1dbbdddf0af13f60d2bd8f77cd3cfd11 (patch)
treeef1f80f006b6ad9116e6c07d0fe42705dd71dfc0
parent28210b5144c11ea5cb09bf19ff98e67c36c14c90 (diff)
downloadmongo-d6a9438a1dbbdddf0af13f60d2bd8f77cd3cfd11.tar.gz
SERVER-46094: Now using walltime instead of ts when checking for excess oplog stones.
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp122
-rw-r--r--src/mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h8
2 files changed, 88 insertions, 42 deletions
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
index 102a73097f5..00517e8dcd7 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp
@@ -80,13 +80,21 @@ using std::unique_ptr;
namespace {
+struct RecordIdAndWall {
+ RecordId id;
+ Date_t wall;
+
+ RecordIdAndWall(RecordId lastRecord, Date_t wallTime) : id(lastRecord), wall(wallTime) {}
+};
+
+
static const int kMinimumRecordStoreVersion = 1;
static const int kCurrentRecordStoreVersion = 1; // New record stores use this by default.
static const int kMaximumRecordStoreVersion = 1;
MONGO_STATIC_ASSERT(kCurrentRecordStoreVersion >= kMinimumRecordStoreVersion);
MONGO_STATIC_ASSERT(kCurrentRecordStoreVersion <= kMaximumRecordStoreVersion);
-const double kNumSecsInHour = 3600.0;
+const double kNumMSInHour = 1000 * 60 * 60;
void checkOplogFormatVersion(OperationContext* opCtx, const std::string& uri) {
StatusWith<BSONObj> appMetadata = WiredTigerUtil::getApplicationMetadata(opCtx, uri);
@@ -118,21 +126,42 @@ class WiredTigerRecordStore::OplogStones::InsertChange final : public RecoveryUn
public:
InsertChange(OplogStones* oplogStones,
int64_t bytesInserted,
- RecordId highestInserted,
- int64_t countInserted)
+ const Record& highestInsertedRecord,
+ int64_t countInserted,
+ OperationContext* opCtx)
: _oplogStones(oplogStones),
_bytesInserted(bytesInserted),
- _highestInserted(highestInserted),
- _countInserted(countInserted) {}
+ _recordId(highestInsertedRecord.id),
+ _countInserted(countInserted),
+ _opCtx(opCtx) {
+ // We only want to initialize _wall by parsing BSONObj when we expect to need it in
+ // OplogStone::createNewStoneIfNeeded.
+ int64_t currBytes = _oplogStones->_currentBytes.load() + _bytesInserted;
+ if (currBytes >= _oplogStones->_minBytesPerStone) {
+ BSONObj obj = highestInsertedRecord.data.toBson();
+ BSONElement ele = obj["wall"];
+ if (!ele) {
+ // This shouldn't happen in normal cases, but this is needed because some tests do
+ // not add wall clock times. Note that, with this addition, it's possible that the
+ // oplog may grow larger than expected if --oplogMinRetentionHours is set.
+ _wall = Date_t::now();
+ } else {
+ _wall = ele.Date();
+ }
+ }
+ }
void commit(boost::optional<Timestamp>) final {
invariant(_bytesInserted >= 0);
- invariant(_highestInserted.isValid());
+ invariant(_recordId.isValid());
_oplogStones->_currentRecords.addAndFetch(_countInserted);
int64_t newCurrentBytes = _oplogStones->_currentBytes.addAndFetch(_bytesInserted);
- if (newCurrentBytes >= _oplogStones->_minBytesPerStone) {
- _oplogStones->createNewStoneIfNeeded(_highestInserted);
+ if (_wall != Date_t() && newCurrentBytes >= _oplogStones->_minBytesPerStone) {
+ // When other InsertChanges commit concurrently, an uninitialized wallTime may delay the
+ // creation of a new stone. This delay is limited to the number of concurrently running
+ // transactions, so the size difference should be inconsequential.
+ _oplogStones->createNewStoneIfNeeded(_opCtx, _recordId, _wall);
}
}
@@ -141,8 +170,10 @@ public:
private:
OplogStones* _oplogStones;
int64_t _bytesInserted;
- RecordId _highestInserted;
+ RecordId _recordId;
int64_t _countInserted;
+ OperationContext* _opCtx;
+ Date_t _wall;
};
class WiredTigerRecordStore::OplogStones::TruncateChange final : public RecoveryUnit::Change {
@@ -248,11 +279,11 @@ bool WiredTigerRecordStore::OplogStones::hasExcessStones_inlock() const {
return true;
}
- auto rc = repl::ReplicationCoordinator::get(getGlobalServiceContext());
- double lastAppliedTs = rc->getMyLastAppliedOpTime().getTimestamp().getSecs();
- double lastStoneTs = Timestamp(_stones.front().lastRecord.repr()).getSecs();
+ auto nowWall = Date_t::now();
+ auto lastStoneWall = _stones.front().wallTime;
- double currRetentionHours = (lastAppliedTs - lastStoneTs) / kNumSecsInHour;
+ auto currRetentionMS = durationCount<Milliseconds>(nowWall - lastStoneWall);
+ double currRetentionHours = currRetentionMS / kNumMSInHour;
return currRetentionHours >= minRetentionHours;
}
@@ -272,7 +303,9 @@ void WiredTigerRecordStore::OplogStones::popOldestStone() {
_stones.pop_front();
}
-void WiredTigerRecordStore::OplogStones::createNewStoneIfNeeded(RecordId lastRecord) {
+void WiredTigerRecordStore::OplogStones::createNewStoneIfNeeded(OperationContext* opCtx,
+ RecordId lastRecord,
+ Date_t wallTime) {
stdx::unique_lock<Latch> lk(_mutex, stdx::try_to_lock);
if (!lk) {
// Someone else is either already creating a new stone or popping the oldest one. In the
@@ -296,7 +329,8 @@ void WiredTigerRecordStore::OplogStones::createNewStoneIfNeeded(RecordId lastRec
2,
"create new oplogStone, current stones:{stones_size}",
"stones_size"_attr = _stones.size());
- OplogStones::Stone stone = {_currentRecords.swap(0), _currentBytes.swap(0), lastRecord};
+
+ OplogStones::Stone stone(_currentRecords.swap(0), _currentBytes.swap(0), lastRecord, wallTime);
_stones.push_back(stone);
_pokeReclaimThreadIfNeeded();
@@ -305,10 +339,10 @@ void WiredTigerRecordStore::OplogStones::createNewStoneIfNeeded(RecordId lastRec
void WiredTigerRecordStore::OplogStones::updateCurrentStoneAfterInsertOnCommit(
OperationContext* opCtx,
int64_t bytesInserted,
- RecordId highestInserted,
+ const Record& highestInsertedRecord,
int64_t countInserted) {
- opCtx->recoveryUnit()->registerChange(
- std::make_unique<InsertChange>(this, bytesInserted, highestInserted, countInserted));
+ opCtx->recoveryUnit()->registerChange(std::make_unique<InsertChange>(
+ this, bytesInserted, highestInsertedRecord, countInserted, opCtx));
}
void WiredTigerRecordStore::OplogStones::clearStonesOnCommit(OperationContext* opCtx) {
@@ -415,13 +449,13 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesByScanning(OperationCon
_currentRecords.addAndFetch(1);
int64_t newCurrentBytes = _currentBytes.addAndFetch(record->data.size());
if (newCurrentBytes >= _minBytesPerStone) {
- LOGV2_DEBUG(22385,
- 1,
- "Placing a marker at optime {Timestamp_record_id_repr_Pretty}",
- "Timestamp_record_id_repr_Pretty"_attr =
- Timestamp(record->id.repr()).toStringPretty());
+ BSONObj obj = record->data.toBson();
+ Date_t wallTime = obj["wall"].Date();
- OplogStones::Stone stone = {_currentRecords.swap(0), _currentBytes.swap(0), record->id};
+ LOGV2_DEBUG(
+ 22385, 1, "Marking oplog entry as a potential future oplog truncation point.");
+ OplogStones::Stone stone(
+ _currentRecords.swap(0), _currentBytes.swap(0), record->id, wallTime);
_stones.push_back(stone);
}
@@ -495,7 +529,7 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon
// order of their RecordId, and then choosing the samples expected to be near the right edge of
// each logical section.
auto cursor = _rs->getRandomCursorWithOptions(opCtx, extraConfig);
- std::vector<RecordId> oplogEstimates;
+ std::vector<RecordIdAndWall> oplogEstimates;
auto lastProgressLog = Date_t::now();
for (int i = 0; i < numSamples; ++i) {
auto samplingLogIntervalSeconds = gOplogSamplingLogIntervalSeconds.load();
@@ -509,7 +543,10 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon
_calculateStonesByScanning(opCtx);
return;
}
- oplogEstimates.push_back(record->id);
+
+ BSONObj obj = record->data.toBson();
+ Date_t wall = obj["wall"].Date();
+ oplogEstimates.push_back(RecordIdAndWall{record->id, wall});
const auto now = Date_t::now();
if (samplingLogIntervalSeconds > 0 &&
@@ -521,20 +558,25 @@ void WiredTigerRecordStore::OplogStones::_calculateStonesBySampling(OperationCon
lastProgressLog = now;
}
}
- std::sort(oplogEstimates.begin(), oplogEstimates.end());
+ std::sort(oplogEstimates.begin(),
+ oplogEstimates.end(),
+ [](RecordIdAndWall a, RecordIdAndWall b) { return a.id < b.id; });
LOGV2(22393, "Oplog sampling complete");
for (int i = 1; i <= wholeStones; ++i) {
// Use every (kRandomSamplesPerStone)th sample, starting with the
// (kRandomSamplesPerStone - 1)th, as the last record for each stone.
+ // If parsing "wall" fails, we crash to allow user to fix their oplog.
int sampleIndex = kRandomSamplesPerStone * i - 1;
- RecordId lastRecord = oplogEstimates[sampleIndex];
-
- LOGV2(22394,
- "Placing a marker at optime {Timestamp_lastRecord_repr_Pretty}",
- "Timestamp_lastRecord_repr_Pretty"_attr =
- Timestamp(lastRecord.repr()).toStringPretty());
- OplogStones::Stone stone = {estRecordsPerStone, estBytesPerStone, lastRecord};
+ RecordIdAndWall rIdAndWall = oplogEstimates[sampleIndex];
+ LOGV2_DEBUG(22394,
+ 1,
+ "Marking oplog entry as a potential future oplog truncation point. wall: "
+ "{wall}, ts: {ts}",
+ "wall"_attr = rIdAndWall.wall,
+ "ts"_attr = rIdAndWall.id);
+ OplogStones::Stone stone(
+ estRecordsPerStone, estBytesPerStone, rIdAndWall.id, rIdAndWall.wall);
_stones.push_back(stone);
}
@@ -1394,8 +1436,8 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
WT_CURSOR* c = curwrap.get();
invariant(c);
- RecordId highestId = RecordId();
- dassert(nRecords != 0);
+ Record highestIdRecord;
+ invariant(nRecords != 0);
for (size_t i = 0; i < nRecords; i++) {
auto& record = records[i];
if (_isOplog) {
@@ -1407,8 +1449,8 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
} else {
record.id = _nextId(opCtx);
}
- dassert(record.id > highestId);
- highestId = record.id;
+ dassert(record.id > highestIdRecord.id);
+ highestIdRecord = record;
}
for (size_t i = 0; i < nRecords; i++) {
@@ -1443,9 +1485,9 @@ Status WiredTigerRecordStore::_insertRecords(OperationContext* opCtx,
if (_oplogStones) {
_oplogStones->updateCurrentStoneAfterInsertOnCommit(
- opCtx, totalLength, highestId, nRecords);
+ opCtx, totalLength, highestIdRecord, nRecords);
} else {
- _cappedDeleteAsNeeded(opCtx, highestId);
+ _cappedDeleteAsNeeded(opCtx, highestIdRecord.id);
}
return Status::OK();
diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h
index be7eb4afb7d..1f1412c0b81 100644
--- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h
+++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store_oplog_stones.h
@@ -49,6 +49,10 @@ public:
int64_t records; // Approximate number of records in a chunk of the oplog.
int64_t bytes; // Approximate size of records in a chunk of the oplog.
RecordId lastRecord; // RecordId of the last record in a chunk of the oplog.
+ Date_t wallTime; // Walltime of when this chunk of the oplog was created.
+
+ Stone(int64_t records, int64_t bytes, RecordId lastRecord, Date_t wallTime)
+ : records(records), bytes(bytes), lastRecord(lastRecord), wallTime(wallTime) {}
};
OplogStones(OperationContext* opCtx, WiredTigerRecordStore* rs);
@@ -73,11 +77,11 @@ public:
void popOldestStone();
- void createNewStoneIfNeeded(RecordId lastRecord);
+ void createNewStoneIfNeeded(OperationContext* opCtx, RecordId lastRecord, Date_t wallTime);
void updateCurrentStoneAfterInsertOnCommit(OperationContext* opCtx,
int64_t bytesInserted,
- RecordId highestInserted,
+ const Record& highestInsertedRecord,
int64_t countInserted);
void clearStonesOnCommit(OperationContext* opCtx);