/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#pragma once
#include
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
#include "mongo/platform/atomic_word.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
namespace mongo {
class OperationContext;
class RecordId;
// Keep "milestones" against the oplog to efficiently remove the old records when the collection
// grows beyond its desired maximum size.
class WiredTigerRecordStore::OplogStones {
public:
struct Stone {
int64_t records; // Approximate number of records in a chunk of the oplog.
int64_t bytes; // Approximate size of records in a chunk of the oplog.
RecordId lastRecord; // RecordId of the last record in a chunk of the oplog.
};
OplogStones(OperationContext* txn, WiredTigerRecordStore* rs);
bool isDead();
void kill();
bool hasExcessStones() const {
return _stones.size() > _numStonesToKeep;
}
void awaitHasExcessStonesOrDead();
boost::optional peekOldestStoneIfNeeded() const;
void popOldestStone();
void createNewStoneIfNeeded(RecordId lastRecord);
void updateCurrentStoneAfterInsertOnCommit(OperationContext* txn,
int64_t bytesInserted,
RecordId highestInserted,
int64_t countInserted);
void clearStonesOnCommit(OperationContext* txn);
// Updates the metadata about the oplog stones after a rollback occurs.
void updateStonesAfterCappedTruncateAfter(int64_t recordsRemoved,
int64_t bytesRemoved,
RecordId firstRemovedId);
// The start point of where to truncate next. Used by the background reclaim thread to
// efficiently truncate records with WiredTiger by skipping over tombstones, etc.
RecordId firstRecord;
//
// The following methods are public only for use in tests.
//
size_t numStones() const {
stdx::lock_guard lk(_mutex);
return _stones.size();
}
int64_t currentBytes() const {
return _currentBytes.load();
}
int64_t currentRecords() const {
return _currentRecords.load();
}
void setMinBytesPerStone(int64_t size);
void setNumStonesToKeep(size_t numStones);
private:
class InsertChange;
class TruncateChange;
void _calculateStones(OperationContext* txn);
void _calculateStonesByScanning(OperationContext* txn);
void _calculateStonesBySampling(OperationContext* txn,
int64_t estRecordsPerStone,
int64_t estBytesPerStone);
void _pokeReclaimThreadIfNeeded();
static const uint64_t kRandomSamplesPerStone = 10;
WiredTigerRecordStore* _rs;
stdx::mutex _oplogReclaimMutex;
stdx::condition_variable _oplogReclaimCv;
// True if '_rs' has been destroyed, e.g. due to repairDatabase being called on the "local"
// database, and false otherwise.
bool _isDead = false;
// Maximum number of stones to keep in the deque before the background reclaim thread should
// truncate the oldest ones. Does not include the stone currently being filled. This value
// should not be changed after initialization.
size_t _numStonesToKeep;
// Minimum number of bytes the stone being filled should contain before it gets added to the
// deque of oplog stones. This value should not be changed after initialization.
int64_t _minBytesPerStone;
AtomicInt64 _currentRecords; // Number of records in the stone being filled.
AtomicInt64 _currentBytes; // Number of bytes in the stone being filled.
mutable stdx::mutex _mutex; // Protects against concurrent access to the deque of oplog stones.
std::deque _stones; // front = oldest, back = newest.
};
} // namespace mongo