// wiredtiger_record_store.h /** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include #include #include "mongo/db/catalog/collection_options.h" #include "mongo/db/storage/capped_callback.h" #include "mongo/db/storage/record_store.h" #include "mongo/platform/atomic_word.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/fail_point_service.h" /** * Either executes the specified operation and returns its value or, if the * WTWriteConflictException failpoint is enabled, skips the operation and returns WT_ROLLBACK to * (possibly randomly) simulate a write conflict.
*
 * Note: the macro itself never throws. When MONGO_FAIL_POINT(WTWriteConflictException) is true it
 * evaluates to WT_ROLLBACK (WiredTiger's conflict/rollback error code) and `x` is not evaluated at
 * all; otherwise it evaluates to `x`. Turning WT_ROLLBACK into a write conflict exception is
 * presumably left to the caller's error handling -- confirm at the call sites.
 */
#define WT_OP_CHECK(x) (((MONGO_FAIL_POINT(WTWriteConflictException))) ? (WT_ROLLBACK) : (x))

namespace mongo {

class RecoveryUnit;
class WiredTigerCursor;
class WiredTigerSessionCache;
class WiredTigerRecoveryUnit;
class WiredTigerSizeStorer;

extern const std::string kWiredTigerEngineName;

// Container type of _uncommittedRecordIds; iterators into it are passed to _dealtWithCappedId().
// NOTE(review): the template argument lists of std::list here (and of StatusWith, std::vector,
// std::unique_ptr, boost::optional, std::shared_ptr and stdx::lock_guard elsewhere in this
// header) are missing in this copy, so it does not compile as-is. Presumably this one is
// std::list<RecordId>, matching _addUncommittedRecordId_inlock(..., RecordId id) -- confirm.
typedef std::list SortedRecordIds;

/**
 * RecordStore implementation backed by a WiredTiger table identified by a URI (see getURI()).
 * Supports capped collections and carries extra state that is only used when the store holds
 * the oplog (see isOplog()).
 */
class WiredTigerRecordStore final : public RecordStore {
public:
    /**
     * Parses collection options for the WiredTiger configuration string used for table creation.
     * The document 'options' is typically obtained from the 'wiredTiger' field of
     * CollectionOptions::storageEngine.
     */
    static StatusWith parseOptionsField(const BSONObj options);

    /**
     * Creates a configuration string suitable for 'config' parameter in WT_SESSION::create().
     * Configuration string is constructed from:
     *     built-in defaults
     *     storageEngine.wiredTiger.configString in 'options'
     *     'extraStrings'
     * Performs simple validation on the supplied parameters.
     * Returns error status if validation fails.
     * Note that even if this function returns an OK status, WT_SESSION::create() may still
     * fail with the constructed configuration string.
     */
    static StatusWith generateCreateString(const std::string& engineName,
                                           StringData ns,
                                           const CollectionOptions& options,
                                           StringData extraStrings);

    /**
     * 'cappedMaxSize' and 'cappedMaxDocs' default to -1; presumably -1 means "no limit" and both
     * only matter when 'isCapped' is true -- confirm in the implementation.
     * 'cappedCallback' and 'sizeStorer' may be null; 'sizeStorer' is not owned (see _sizeStorer).
     */
    WiredTigerRecordStore(OperationContext* txn,
                          StringData ns,
                          StringData uri,
                          std::string engineName,
                          bool isCapped,
                          bool isEphemeral,
                          int64_t cappedMaxSize = -1,
                          int64_t cappedMaxDocs = -1,
                          CappedCallback* cappedCallback = nullptr,
                          WiredTigerSizeStorer* sizeStorer = nullptr);

    virtual ~WiredTigerRecordStore();

    // name of the RecordStore implementation
    virtual const char* name() const;

    virtual long long dataSize(OperationContext* txn) const;

    virtual long long numRecords(OperationContext* txn) const;

    virtual bool isCapped() const;

    virtual int64_t storageSize(OperationContext* txn,
                                BSONObjBuilder* extraInfo = NULL,
                                int infoLevel = 0) const;

    // CRUD related

    virtual RecordData dataFor(OperationContext* txn, const RecordId& id) const;

    virtual bool findRecord(OperationContext* txn, const RecordId& id, RecordData* out) const;

    virtual void deleteRecord(OperationContext* txn, const RecordId& id);

    virtual Status insertRecords(OperationContext* txn,
                                 std::vector* records,
                                 bool enforceQuota);

    virtual StatusWith insertRecord(OperationContext* txn,
                                    const char* data,
                                    int len,
                                    bool enforceQuota);

    virtual Status insertRecordsWithDocWriter(OperationContext* txn,
                                              const DocWriter* const* docs,
                                              size_t nDocs,
                                              RecordId* idsOut);

    virtual Status updateRecord(OperationContext* txn,
                                const RecordId& oldLocation,
                                const char* data,
                                int len,
                                bool enforceQuota,
                                UpdateNotifier* notifier);

    virtual bool updateWithDamagesSupported() const;

    virtual StatusWith updateWithDamages(OperationContext* txn,
                                         const RecordId& id,
                                         const RecordData& oldRec,
                                         const char* damageSource,
                                         const mutablebson::DamageVector& damages);

    std::unique_ptr getCursor(OperationContext* txn, bool forward) const final;
    std::unique_ptr getRandomCursor(OperationContext* txn) const final;

    // Variant of getRandomCursor() taking 'extraConfig'; presumably appended to the WiredTiger
    // cursor configuration -- confirm in the implementation.
    std::unique_ptr getRandomCursorWithOptions(OperationContext* txn,
                                               StringData extraConfig) const;

    std::vector> getManyCursors(OperationContext* txn) const final;

    virtual Status truncate(OperationContext* txn);

    // Compaction is not offered for ephemeral (in-memory) record stores.
    virtual bool compactSupported() const {
        return !_isEphemeral;
    }
    virtual bool compactsInPlace() const {
        return true;
    }

    virtual Status compact(OperationContext* txn,
                           RecordStoreCompactAdaptor* adaptor,
                           const CompactOptions* options,
                           CompactStats* stats);

    virtual Status validate(OperationContext* txn,
                            ValidateCmdLevel level,
                            ValidateAdaptor* adaptor,
                            ValidateResults* results,
                            BSONObjBuilder* output);

    virtual void appendCustomStats(OperationContext* txn,
                                   BSONObjBuilder* result,
                                   double scale) const;

    virtual Status touch(OperationContext* txn, BSONObjBuilder* output) const;

    virtual void cappedTruncateAfter(OperationContext* txn, RecordId end, bool inclusive);

    virtual boost::optional oplogStartHack(OperationContext* txn,
                                           const RecordId& startingPosition) const;

    virtual Status oplogDiskLocRegister(OperationContext* txn, const Timestamp& opTime);

    virtual void updateStatsAfterRepair(OperationContext* txn,
                                        long long numRecords,
                                        long long dataSize);

    void waitForAllEarlierOplogWritesToBeVisible(OperationContext* txn) const override;

    bool isOplog() const {
        return _isOplog;
    }
    bool usingOplogHack() const {
        return _useOplogHack;
    }

    // Replaces the capped callback; the assignment is performed while holding
    // _cappedCallbackMutex.
    void setCappedCallback(CappedCallback* cb) {
        stdx::lock_guard lk(_cappedCallbackMutex);
        _cappedCallback = cb;
    }

    int64_t cappedMaxDocs() const;
    int64_t cappedMaxSize() const;

    const std::string& getURI() const {
        return _uri;
    }
    uint64_t tableId() const {
        return _tableId;
    }

    // 'ss' is not owned and may be null. Unlike setCappedCallback(), no lock is taken here.
    void setSizeStorer(WiredTigerSizeStorer* ss) {
        _sizeStorer = ss;
    }

    bool isCappedHidden(const RecordId& id) const;
    RecordId lowestCappedHiddenRecord() const;

    bool inShutdown() const;

    void reclaimOplog(OperationContext* txn);

    int64_t cappedDeleteAsNeeded(OperationContext* txn, const RecordId& justInserted);

    // NOTE(review): the "_inlock" suffix suggests the caller must already hold
    // cappedDeleterMutex() -- confirm against the implementation.
    int64_t cappedDeleteAsNeeded_inlock(OperationContext* txn, const RecordId& justInserted);

    boost::timed_mutex& cappedDeleterMutex() {  // NOLINT
        return _cappedDeleterMutex;
    }

    // Returns false if the oplog was dropped while waiting for a deletion request.
    bool yieldAndAwaitOplogDeletionRequest(OperationContext* txn);

    class OplogStones;

    // Exposed only for testing.
    // Returns a non-owning pointer into _oplogStones, which is non-null if this record store is
    // underlying the active oplog.
    OplogStones* oplogStones() {
        return _oplogStones.get();
    };

private:
    class Cursor;
    class RandomCursor;

    class CappedInsertChange;
    class NumRecordsChange;
    class DataSizeChange;

    static WiredTigerRecoveryUnit* _getRecoveryUnit(OperationContext* txn);

    // Conversions between RecordId and int64_t; presumably the table's integer keys -- confirm.
    static int64_t _makeKey(const RecordId& id);
    static RecordId _fromKey(int64_t k);

    void _dealtWithCappedId(SortedRecordIds::iterator it, bool didCommit);
    void _addUncommittedRecordId_inlock(OperationContext* txn, RecordId id);

    Status _insertRecords(OperationContext* txn, Record* records, size_t nRecords);

    RecordId _nextId();
    void _setId(RecordId id);
    bool cappedAndNeedDelete() const;
    void _changeNumRecords(OperationContext* txn, int64_t diff);
    void _increaseDataSize(OperationContext* txn, int64_t amount);
    RecordData _getData(const WiredTigerCursor& cursor) const;
    void _oplogSetStartHack(WiredTigerRecoveryUnit* wru) const;
    void _oplogJournalThreadLoop(WiredTigerSessionCache* sessionCache);

    // NOTE: the declaration order below fixes the members' initialization order; keep it in sync
    // with the constructor's initializer list.
    const std::string _uri;
    const uint64_t _tableId;  // not persisted

    // Canonical engine name to use for retrieving options
    const std::string _engineName;
    // The capped settings should not be updated once operations have started
    const bool _isCapped;
    // True if the storage engine is an in-memory storage engine
    const bool _isEphemeral;
    // True if the namespace of this record store starts with "local.oplog.", and false otherwise.
    const bool _isOplog;
    const int64_t _cappedMaxSize;
    const int64_t _cappedMaxSizeSlack;  // when to start applying backpressure
    const int64_t _cappedMaxDocs;
    RecordId _cappedFirstRecord;
    AtomicInt64 _cappedSleep;
    AtomicInt64 _cappedSleepMS;
    CappedCallback* _cappedCallback;
    stdx::mutex _cappedCallbackMutex;  // guards _cappedCallback.

    // See comment in ::cappedDeleteAsNeeded
    int _cappedDeleteCheckCount;
    mutable boost::timed_mutex _cappedDeleterMutex;  // NOLINT

    const bool _useOplogHack;

    SortedRecordIds _uncommittedRecordIds;
    RecordId _oplog_highestSeen;
    mutable stdx::mutex _uncommittedRecordIdsMutex;

    AtomicInt64 _nextIdNum;
    AtomicInt64 _dataSize;
    AtomicInt64 _numRecords;

    WiredTigerSizeStorer* _sizeStorer;  // not owned, can be NULL
    int _sizeStorerCounter;

    bool _shuttingDown;

    // Non-null if this record store is underlying the active oplog.
    std::shared_ptr _oplogStones;

    // These use the _uncommittedRecordIdsMutex and are only used when _isOplog is true.
    stdx::condition_variable _opsWaitingForJournalCV;
    mutable stdx::condition_variable _opsBecameVisibleCV;
    std::vector _opsWaitingForJournal;
    stdx::thread _oplogJournalThread;
};

// WT failpoint to throw write conflict exceptions randomly. It is consulted by WT_OP_CHECK, which
// makes the guarded operation evaluate to WT_ROLLBACK instead of running it.
MONGO_FP_FORWARD_DECLARE(WTWriteConflictException);

// Prevents oplog writes from being considered durable on the primary. Once activated, new writes
// will not be considered durable until deactivated. It is unspecified whether writes that commit
// before activation will become visible while active.
MONGO_FP_FORWARD_DECLARE(WTPausePrimaryOplogDurabilityLoop);
}  // namespace mongo