// wiredtiger_kv_engine.cpp /** * Copyright (C) 2014 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage #include "mongo/platform/basic.h" #ifdef _WIN32 #define NVALGRIND #endif #include #include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h" #include #include #include #include "mongo/base/error_codes.h" #include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/bson/dotted_path_support.h" #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/client.h" #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/concurrency/locker.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/journal_listener.h" #include "mongo/db/storage/storage_options.h" #include "mongo/db/storage/wiredtiger/wiredtiger_customization_hooks.h" #include "mongo/db/storage/wiredtiger/wiredtiger_extensions.h" #include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h" #include "mongo/db/storage/wiredtiger/wiredtiger_index.h" #include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" #include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h" #include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" #include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h" #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" #include "mongo/stdx/memory.h" #include "mongo/util/background.h" #include "mongo/util/concurrency/idle_thread_block.h" #include "mongo/util/concurrency/ticketholder.h" #include "mongo/util/exit.h" #include "mongo/util/log.h" #include "mongo/util/processinfo.h" #include "mongo/util/scopeguard.h" #include "mongo/util/time_support.h" #if !defined(__has_feature) #define __has_feature(x) 0 #endif namespace mongo { using std::set; using std::string; namespace dps = ::mongo::dotted_path_support; class WiredTigerKVEngine::WiredTigerJournalFlusher : public BackgroundJob { public: explicit WiredTigerJournalFlusher(WiredTigerSessionCache* sessionCache) : BackgroundJob(false /* deleteSelf */), _sessionCache(sessionCache) {} virtual string name() const { return "WTJournalFlusher"; } virtual void run() { Client::initThread(name().c_str()); LOG(1) << "starting " << name() << " thread"; while (!_shuttingDown.load()) { try { _sessionCache->waitUntilDurable(false); } catch (const UserException& e) { invariant(e.getCode() == ErrorCodes::ShutdownInProgress); } int ms = storageGlobalParams.journalCommitIntervalMs; if (!ms) { ms = 100; } MONGO_IDLE_THREAD_BLOCK; sleepmillis(ms); } LOG(1) << "stopping " << name() << " thread"; } void shutdown() { _shuttingDown.store(true); wait(); } private: WiredTigerSessionCache* _sessionCache; std::atomic _shuttingDown{false}; // NOLINT }; namespace { class TicketServerParameter : public ServerParameter { MONGO_DISALLOW_COPYING(TicketServerParameter); public: TicketServerParameter(TicketHolder* holder, const std::string& name) : ServerParameter(ServerParameterSet::getGlobal(), name, true, true), _holder(holder) {} virtual void append(OperationContext* txn, BSONObjBuilder& b, const std::string& name) { b.append(name, _holder->outof()); } virtual Status set(const BSONElement& newValueElement) { if (!newValueElement.isNumber()) return Status(ErrorCodes::BadValue, str::stream() << name() << " has to be a number"); return _set(newValueElement.numberInt()); } virtual Status setFromString(const std::string& str) { int num = 0; Status status = parseNumberFromString(str, &num); if (!status.isOK()) return status; return _set(num); } Status _set(int newNum) { if (newNum <= 0) { return Status(ErrorCodes::BadValue, str::stream() << name() << " has to be > 0"); } return _holder->resize(newNum); } private: TicketHolder* _holder; }; TicketHolder openWriteTransaction(128); TicketServerParameter openWriteTransactionParam(&openWriteTransaction, "wiredTigerConcurrentWriteTransactions"); TicketHolder openReadTransaction(128); TicketServerParameter openReadTransactionParam(&openReadTransaction, "wiredTigerConcurrentReadTransactions"); } // namespace WiredTigerKVEngine::WiredTigerKVEngine(const std::string& canonicalName, const std::string& path, ClockSource* cs, const std::string& extraOpenOptions, size_t cacheSizeMB, bool durable, bool ephemeral, bool repair, bool readOnly) : _eventHandler(WiredTigerUtil::defaultEventHandlers()), _canonicalName(canonicalName), _path(path), _sizeStorerSyncTracker(cs, 100000, Seconds(60)), _durable(durable), _ephemeral(ephemeral), _readOnly(readOnly) { boost::filesystem::path journalPath = path; journalPath /= "journal"; if (_durable) { if (!boost::filesystem::exists(journalPath)) { try { boost::filesystem::create_directory(journalPath); } catch (std::exception& e) { log() << "error creating journal dir " << journalPath.string() << ' ' << e.what(); throw; } } } _previousCheckedDropsQueued = Date_t::now(); std::stringstream ss; ss << "create,"; ss << "cache_size=" << cacheSizeMB << "M,"; ss << "session_max=20000,"; ss << "eviction=(threads_min=4,threads_max=4),"; ss << "config_base=false,"; ss << "statistics=(fast),"; // The setting may have a later setting override it if not using the journal. We make it // unconditional here because even nojournal may need this setting if it is a transition // from using the journal. if (!_readOnly) { // If we're readOnly skip all WAL-related settings. ss << "log=(enabled=true,archive=true,path=journal,compressor="; ss << wiredTigerGlobalOptions.journalCompressor << "),"; ss << "file_manager=(close_idle_time=100000),"; //~28 hours, will put better fix in 3.1.x ss << "checkpoint=(wait=" << wiredTigerGlobalOptions.checkpointDelaySecs; ss << ",log_size=2GB),"; ss << "statistics_log=(wait=" << wiredTigerGlobalOptions.statisticsLogDelaySecs << "),"; } ss << WiredTigerCustomizationHooks::get(getGlobalServiceContext()) ->getTableCreateConfig("system"); ss << WiredTigerExtensions::get(getGlobalServiceContext())->getOpenExtensionsConfig(); ss << extraOpenOptions; if (_readOnly) { invariant(!_durable); ss << "readonly=true,"; } if (!_durable && !_readOnly) { // If we started without the journal, but previously used the journal then open with the // WT log enabled to perform any unclean shutdown recovery and then close and reopen in // the normal path without the journal. if (boost::filesystem::exists(journalPath)) { string config = ss.str(); log() << "Detected WT journal files. Running recovery from last checkpoint."; log() << "journal to nojournal transition config: " << config; int ret = wiredtiger_open(path.c_str(), &_eventHandler, config.c_str(), &_conn); if (ret == EINVAL) { fassertFailedNoTrace(28717); } else if (ret != 0) { Status s(wtRCToStatus(ret)); msgassertedNoTrace(28718, s.reason()); } invariantWTOK(_conn->close(_conn, NULL)); } // This setting overrides the earlier setting because it is later in the config string. ss << ",log=(enabled=false),"; } string config = ss.str(); log() << "wiredtiger_open config: " << config; int ret = wiredtiger_open(path.c_str(), &_eventHandler, config.c_str(), &_conn); // Invalid argument (EINVAL) is usually caused by invalid configuration string. // We still fassert() but without a stack trace. if (ret == EINVAL) { fassertFailedNoTrace(28561); } else if (ret != 0) { Status s(wtRCToStatus(ret)); msgassertedNoTrace(28595, s.reason()); } _sessionCache.reset(new WiredTigerSessionCache(this)); if (_durable && !_ephemeral) { _journalFlusher = stdx::make_unique(_sessionCache.get()); _journalFlusher->go(); } _sizeStorerUri = "table:sizeStorer"; WiredTigerSession session(_conn); if (!_readOnly && repair && _hasUri(session.getSession(), _sizeStorerUri)) { log() << "Repairing size cache"; fassertNoTrace(28577, _salvageIfNeeded(_sizeStorerUri.c_str())); } _sizeStorer.reset(new WiredTigerSizeStorer(_conn, _sizeStorerUri)); _sizeStorer->fillCache(); Locker::setGlobalThrottling(&openReadTransaction, &openWriteTransaction); } WiredTigerKVEngine::~WiredTigerKVEngine() { if (_conn) { cleanShutdown(); } _sessionCache.reset(NULL); } void WiredTigerKVEngine::appendGlobalStats(BSONObjBuilder& b) { BSONObjBuilder bb(b.subobjStart("concurrentTransactions")); { BSONObjBuilder bbb(bb.subobjStart("write")); bbb.append("out", openWriteTransaction.used()); bbb.append("available", openWriteTransaction.available()); bbb.append("totalTickets", openWriteTransaction.outof()); bbb.done(); } { BSONObjBuilder bbb(bb.subobjStart("read")); bbb.append("out", openReadTransaction.used()); bbb.append("available", openReadTransaction.available()); bbb.append("totalTickets", openReadTransaction.outof()); bbb.done(); } bb.done(); } void WiredTigerKVEngine::cleanShutdown() { log() << "WiredTigerKVEngine shutting down"; if (!_readOnly) syncSizeInfo(true); if (_conn) { // these must be the last things we do before _conn->close(); if (_journalFlusher) _journalFlusher->shutdown(); _sizeStorer.reset(); _sessionCache->shuttingDown(); // We want WiredTiger to leak memory for faster shutdown except when we are running tools to // look for memory leaks. #if !__has_feature(address_sanitizer) bool leak_memory = true; #else bool leak_memory = false; #endif const char* config = nullptr; if (RUNNING_ON_VALGRIND) { leak_memory = false; } if (leak_memory) { config = "leak_memory=true"; } invariantWTOK(_conn->close(_conn, config)); _conn = NULL; } } Status WiredTigerKVEngine::okToRename(OperationContext* opCtx, StringData fromNS, StringData toNS, StringData ident, const RecordStore* originalRecordStore) const { _sizeStorer->storeToCache( _uri(ident), originalRecordStore->numRecords(opCtx), originalRecordStore->dataSize(opCtx)); syncSizeInfo(true); return Status::OK(); } int64_t WiredTigerKVEngine::getIdentSize(OperationContext* opCtx, StringData ident) { WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx); return WiredTigerUtil::getIdentSize(session->getSession(), _uri(ident)); } Status WiredTigerKVEngine::repairIdent(OperationContext* opCtx, StringData ident) { WiredTigerSession* session = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx); string uri = _uri(ident); session->closeAllCursors(uri); _sessionCache->closeAllCursors(uri); if (isEphemeral()) { return Status::OK(); } return _salvageIfNeeded(uri.c_str()); } Status WiredTigerKVEngine::_salvageIfNeeded(const char* uri) { // Using a side session to avoid transactional issues WiredTigerSession sessionWrapper(_conn); WT_SESSION* session = sessionWrapper.getSession(); int rc = (session->verify)(session, uri, NULL); if (rc == 0) { log() << "Verify succeeded on uri " << uri << ". Not salvaging."; return Status::OK(); } if (rc == EBUSY) { // SERVER-16457: verify and salvage are occasionally failing with EBUSY. For now we // lie and return OK to avoid breaking tests. This block should go away when that ticket // is resolved. error() << "Verify on " << uri << " failed with EBUSY. Assuming no salvage is needed."; return Status::OK(); } // TODO need to cleanup the sizeStorer cache after salvaging. log() << "Verify failed on uri " << uri << ". Running a salvage operation."; return wtRCToStatus(session->salvage(session, uri, NULL), "Salvage failed:"); } int WiredTigerKVEngine::flushAllFiles(bool sync) { LOG(1) << "WiredTigerKVEngine::flushAllFiles"; if (_ephemeral) { return 0; } syncSizeInfo(true); _sessionCache->waitUntilDurable(true); return 1; } Status WiredTigerKVEngine::beginBackup(OperationContext* txn) { invariant(!_backupSession); // This cursor will be freed by the backupSession being closed as the session is uncached auto session = stdx::make_unique(_conn); WT_CURSOR* c = NULL; WT_SESSION* s = session->getSession(); int ret = WT_OP_CHECK(s->open_cursor(s, "backup:", NULL, NULL, &c)); if (ret != 0) { return wtRCToStatus(ret); } _backupSession = std::move(session); return Status::OK(); } void WiredTigerKVEngine::endBackup(OperationContext* txn) { _backupSession.reset(); } void WiredTigerKVEngine::syncSizeInfo(bool sync) const { if (!_sizeStorer) return; try { _sizeStorer->syncCache(sync); } catch (const WriteConflictException&) { // ignore, we'll try again later. } } RecoveryUnit* WiredTigerKVEngine::newRecoveryUnit() { return new WiredTigerRecoveryUnit(_sessionCache.get()); } void WiredTigerKVEngine::setRecordStoreExtraOptions(const std::string& options) { _rsOptions = options; } void WiredTigerKVEngine::setSortedDataInterfaceExtraOptions(const std::string& options) { _indexOptions = options; } Status WiredTigerKVEngine::createRecordStore(OperationContext* opCtx, StringData ns, StringData ident, const CollectionOptions& options) { _checkIdentPath(ident); WiredTigerSession session(_conn); StatusWith result = WiredTigerRecordStore::generateCreateString(_canonicalName, ns, options, _rsOptions); if (!result.isOK()) { return result.getStatus(); } std::string config = result.getValue(); string uri = _uri(ident); WT_SESSION* s = session.getSession(); LOG(2) << "WiredTigerKVEngine::createRecordStore uri: " << uri << " config: " << config; return wtRCToStatus(s->create(s, uri.c_str(), config.c_str())); } std::unique_ptr WiredTigerKVEngine::getRecordStore(OperationContext* opCtx, StringData ns, StringData ident, const CollectionOptions& options) { if (options.capped) { return stdx::make_unique( opCtx, ns, _uri(ident), _canonicalName, options.capped, _ephemeral, options.cappedSize ? options.cappedSize : 4096, options.cappedMaxDocs ? options.cappedMaxDocs : -1, nullptr, _sizeStorer.get()); } else { return stdx::make_unique(opCtx, ns, _uri(ident), _canonicalName, false, _ephemeral, -1, -1, nullptr, _sizeStorer.get()); } } string WiredTigerKVEngine::_uri(StringData ident) const { return string("table:") + ident.toString(); } Status WiredTigerKVEngine::createSortedDataInterface(OperationContext* opCtx, StringData ident, const IndexDescriptor* desc) { _checkIdentPath(ident); std::string collIndexOptions; const Collection* collection = desc->getCollection(); // Treat 'collIndexOptions' as an empty string when the collection member of 'desc' is NULL in // order to allow for unit testing WiredTigerKVEngine::createSortedDataInterface(). if (collection) { const CollectionCatalogEntry* cce = collection->getCatalogEntry(); const CollectionOptions collOptions = cce->getCollectionOptions(opCtx); if (!collOptions.indexOptionDefaults["storageEngine"].eoo()) { BSONObj storageEngineOptions = collOptions.indexOptionDefaults["storageEngine"].Obj(); collIndexOptions = dps::extractElementAtPath(storageEngineOptions, _canonicalName + ".configString") .valuestrsafe(); } } StatusWith result = WiredTigerIndex::generateCreateString( _canonicalName, _indexOptions, collIndexOptions, *desc); if (!result.isOK()) { return result.getStatus(); } std::string config = result.getValue(); LOG(2) << "WiredTigerKVEngine::createSortedDataInterface ident: " << ident << " config: " << config; return wtRCToStatus(WiredTigerIndex::Create(opCtx, _uri(ident), config)); } SortedDataInterface* WiredTigerKVEngine::getSortedDataInterface(OperationContext* opCtx, StringData ident, const IndexDescriptor* desc) { if (desc->unique()) return new WiredTigerIndexUnique(opCtx, _uri(ident), desc); return new WiredTigerIndexStandard(opCtx, _uri(ident), desc); } Status WiredTigerKVEngine::dropIdent(OperationContext* opCtx, StringData ident) { _drop(ident); return Status::OK(); } bool WiredTigerKVEngine::_drop(StringData ident) { string uri = _uri(ident); _sessionCache->closeAllCursors(uri); WiredTigerSession session(_conn); int ret = session.getSession()->drop( session.getSession(), uri.c_str(), "force,checkpoint_wait=false"); LOG(1) << "WT drop of " << uri << " res " << ret; if (ret == 0) { // yay, it worked return true; } if (ret == EBUSY) { // this is expected, queue it up { stdx::lock_guard lk(_identToDropMutex); _identToDrop.push_front(uri); } _sessionCache->closeCursorsForQueuedDrops(); return false; } invariantWTOK(ret); return false; } std::list WiredTigerKVEngine::filterCursorsWithQueuedDrops( std::list* cache) { std::list toDrop; stdx::lock_guard lk(_identToDropMutex); if (_identToDrop.empty()) return toDrop; for (auto i = cache->begin(); i != cache->end();) { if (!i->_cursor || std::find(_identToDrop.begin(), _identToDrop.end(), std::string(i->_cursor->uri)) == _identToDrop.end()) { ++i; continue; } toDrop.push_back(*i); i = cache->erase(i); } return toDrop; } bool WiredTigerKVEngine::haveDropsQueued() const { Date_t now = Date_t::now(); Milliseconds delta = now - _previousCheckedDropsQueued; if (!_readOnly && _sizeStorerSyncTracker.intervalHasElapsed()) { _sizeStorerSyncTracker.resetLastTime(); syncSizeInfo(false); } // We only want to check the queue max once per second or we'll thrash if (delta < Milliseconds(1000)) return false; _previousCheckedDropsQueued = now; // Don't wait for the mutex: if we can't get it, report that no drops are queued. stdx::unique_lock lk(_identToDropMutex, stdx::defer_lock); return lk.try_lock() && !_identToDrop.empty(); } void WiredTigerKVEngine::dropSomeQueuedIdents() { int numInQueue; WiredTigerSession session(_conn); { stdx::lock_guard lk(_identToDropMutex); numInQueue = _identToDrop.size(); } int numToDelete = 10; int tenPercentQueue = numInQueue * 0.1; if (tenPercentQueue > 10) numToDelete = tenPercentQueue; LOG(1) << "WT Queue is: " << numInQueue << " attempting to drop: " << numToDelete << " tables"; for (int i = 0; i < numToDelete; i++) { string uri; { stdx::lock_guard lk(_identToDropMutex); if (_identToDrop.empty()) break; uri = _identToDrop.front(); _identToDrop.pop_front(); } int ret = session.getSession()->drop( session.getSession(), uri.c_str(), "force,checkpoint_wait=false"); LOG(1) << "WT queued drop of " << uri << " res " << ret; if (ret == EBUSY) { stdx::lock_guard lk(_identToDropMutex); _identToDrop.push_back(uri); } else { invariantWTOK(ret); } } } bool WiredTigerKVEngine::supportsDocLocking() const { return true; } bool WiredTigerKVEngine::supportsDirectoryPerDB() const { return true; } bool WiredTigerKVEngine::hasIdent(OperationContext* opCtx, StringData ident) const { return _hasUri(WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx)->getSession(), _uri(ident)); } bool WiredTigerKVEngine::_hasUri(WT_SESSION* session, const std::string& uri) const { // can't use WiredTigerCursor since this is called from constructor. WT_CURSOR* c = NULL; int ret = session->open_cursor(session, "metadata:", NULL, NULL, &c); if (ret == ENOENT) return false; invariantWTOK(ret); ON_BLOCK_EXIT(c->close, c); c->set_key(c, uri.c_str()); return c->search(c) == 0; } std::vector WiredTigerKVEngine::getAllIdents(OperationContext* opCtx) const { std::vector all; WiredTigerCursor cursor("metadata:", WiredTigerSession::kMetadataTableId, false, opCtx); WT_CURSOR* c = cursor.get(); if (!c) return all; while (c->next(c) == 0) { const char* raw; c->get_key(c, &raw); StringData key(raw); size_t idx = key.find(':'); if (idx == string::npos) continue; StringData type = key.substr(0, idx); if (type != "table") continue; StringData ident = key.substr(idx + 1); if (ident == "sizeStorer") continue; all.push_back(ident.toString()); } return all; } int WiredTigerKVEngine::reconfigure(const char* str) { return _conn->reconfigure(_conn, str); } void WiredTigerKVEngine::_checkIdentPath(StringData ident) { size_t start = 0; size_t idx; while ((idx = ident.find('/', start)) != string::npos) { StringData dir = ident.substr(0, idx); boost::filesystem::path subdir = _path; subdir /= dir.toString(); if (!boost::filesystem::exists(subdir)) { LOG(1) << "creating subdirectory: " << dir; try { boost::filesystem::create_directory(subdir); } catch (const std::exception& e) { error() << "error creating path " << subdir.string() << ' ' << e.what(); throw; } } start = idx + 1; } } void WiredTigerKVEngine::setJournalListener(JournalListener* jl) { return _sessionCache->setJournalListener(jl); } }