// wiredtiger_kv_engine.cpp
/**
* Copyright (C) 2014 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
#include "mongo/db/storage/wiredtiger/wiredtiger_kv_engine.h"
#include
#include
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_index.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_size_storer.h"
#include "mongo/db/storage/wiredtiger/wiredtiger_util.h"
#include "mongo/util/log.h"
#include "mongo/util/processinfo.h"
#include "mongo/util/scopeguard.h"
#if !defined(__has_feature)
#define __has_feature(x) 0
#endif
namespace mongo {
using std::set;
using std::string;
namespace {
// Error callback installed as WT_EVENT_HANDLER::handle_error by the engine
// constructor. Writes the code and message to the error log, then fasserts
// (28558) that the code is not WT_PANIC. If anything in that path throws, the
// process is terminated rather than letting the exception leave this function.
// 'handler' and 'session' are unused. Always returns 0.
int mdb_handle_error(WT_EVENT_HANDLER *handler, WT_SESSION *session,
                     int errorCode, const char *message) {
    try {
        error() << "WiredTiger (" << errorCode << ") " << message;

        const bool isPanic = ( errorCode == WT_PANIC );
        fassert( 28558, !isPanic );
    }
    catch (...) {
        // No exception may escape this callback.
        std::terminate();
    }

    return 0;
}
// Informational-message callback installed as WT_EVENT_HANDLER::handle_message.
// Forwards 'message' to the regular log, prefixed with "WiredTiger ".
// 'handler' and 'session' are unused. Any exception raised while logging ends
// the process via std::terminate(). Always returns 0.
int mdb_handle_message( WT_EVENT_HANDLER *handler,
                        WT_SESSION *session,
                        const char *message ) {
    try {
        log() << "WiredTiger " << message;
    }
    catch (...) {
        // No exception may escape this callback.
        std::terminate();
    }

    return 0;
}
// Progress callback installed as WT_EVENT_HANDLER::handle_progress.
// Logs the operation name followed by its progress counter.
// 'handler' and 'session' are unused. Any exception raised while logging ends
// the process via std::terminate(). Always returns 0.
int mdb_handle_progress( WT_EVENT_HANDLER *handler,
                         WT_SESSION *session,
                         const char *operation,
                         uint64_t progress ) {
    try {
        log() << "WiredTiger progress " << operation << " " << progress;
    }
    catch (...) {
        // No exception may escape this callback.
        std::terminate();
    }

    return 0;
}
// Close callback installed as WT_EVENT_HANDLER::handle_close.
// Intentionally a no-op: none of the arguments are inspected and 0 is
// returned unconditionally.
int mdb_handle_close( WT_EVENT_HANDLER *handler,
                      WT_SESSION *session,
                      WT_CURSOR *cursor ) {
    return 0;
}
}
// Opens (creating if necessary) the WiredTiger database located at 'path'.
//
//   path             - database directory handed to wiredtiger_open().
//   extraOpenOptions - appended verbatim to the end of the generated
//                      wiredtiger_open() configuration string.
//   durable          - when true, a "journal" subdirectory is created under
//                      'path' if it does not exist and the "log=(...)" section
//                      is added to the configuration.
//   repair           - when true and the size storer table already exists, it
//                      is run through _salvageIfNeeded() before being loaded.
//
// An EINVAL result from wiredtiger_open() triggers fassertFailedNoTrace(28561);
// any other non-zero result is reported through msgassertedNoTrace(28595).
// A failure to create the journal directory is logged and rethrown.
WiredTigerKVEngine::WiredTigerKVEngine( const std::string& path,
                                        const std::string& extraOpenOptions,
                                        bool durable,
                                        bool repair )
    : _path( path ),
      _durable( durable ),
      _sizeStorerSyncTracker( 100000, 60 * 1000 ) {

    _eventHandler.handle_error = mdb_handle_error;
    _eventHandler.handle_message = mdb_handle_message;
    _eventHandler.handle_progress = mdb_handle_progress;
    _eventHandler.handle_close = mdb_handle_close;

    size_t cacheSizeGB = wiredTigerGlobalOptions.cacheSizeGB;
    if (cacheSizeGB == 0) {
        // Since the user didn't provide a cache size, choose a reasonable default value:
        // half of the reported physical memory, rounded down to whole gigabytes.
        ProcessInfo pi;
        unsigned long long memSizeMB = pi.getMemSizeMB();
        if ( memSizeMB > 0 ) {
            double cacheMB = memSizeMB / 2;
            cacheSizeGB = static_cast<size_t>( cacheMB / 1024 );
        }
        // Never emit "cache_size=0G". This also covers the case where the memory
        // size could not be determined (reported as 0).
        if ( cacheSizeGB < 1 )
            cacheSizeGB = 1;
    }

    if ( _durable ) {
        boost::filesystem::path journalPath = path;
        journalPath /= "journal";
        if ( !boost::filesystem::exists( journalPath ) ) {
            try {
                boost::filesystem::create_directory( journalPath );
            }
            catch( const std::exception& e ) {
                log() << "error creating journal dir " << journalPath.string() << ' ' << e.what();
                throw;
            }
        }
    }

    // Assemble the wiredtiger_open() configuration string.
    std::stringstream ss;
    ss << "create,";
    ss << "cache_size=" << cacheSizeGB << "G,";
    ss << "session_max=20000,";
    ss << "eviction=(threads_max=4),";
    ss << "statistics=(fast),";
    if ( _durable ) {
        ss << "log=(enabled=true,archive=true,path=journal,compressor=";
        ss << wiredTigerGlobalOptions.journalCompressor << "),";
    }
    ss << "checkpoint=(wait=" << wiredTigerGlobalOptions.checkpointDelaySecs;
    ss << ",log_size=2GB),";
    ss << "statistics_log=(wait=" << wiredTigerGlobalOptions.statisticsLogDelaySecs << "),";
    ss << extraOpenOptions;
    string config = ss.str();
    log() << "wiredtiger_open config: " << config;

    int ret = wiredtiger_open(path.c_str(), &_eventHandler, config.c_str(), &_conn);
    // Invalid argument (EINVAL) is usually caused by invalid configuration string.
    // We still fassert() but without a stack trace.
    if (ret == EINVAL) {
        fassertFailedNoTrace(28561);
    }
    else if (ret != 0) {
        Status s(wtRCToStatus(ret));
        msgassertedNoTrace(28595, s.reason());
    }

    _sessionCache.reset( new WiredTigerSessionCache( this ) );

    _sizeStorerUri = "table:sizeStorer";
    {
        WiredTigerSession session(_conn);
        if (repair && _hasUri(session.getSession(), _sizeStorerUri)) {
            log() << "Repairing size cache";
            fassertNoTrace(28577, _salvageIfNeeded(_sizeStorerUri.c_str()));
        }

        // Transfer ownership to _sizeStorer before loading, so the object is not
        // leaked should loadFrom() exit with an exception.
        _sizeStorer.reset( new WiredTigerSizeStorer() );
        _sizeStorer->loadFrom( &session, _sizeStorerUri );
    }
}
// Runs cleanShutdown() if the connection is still open, then releases the
// size storer followed by the session cache.
WiredTigerKVEngine::~WiredTigerKVEngine() {
    const bool connectionOpen = ( _conn != NULL );
    if ( connectionOpen ) {
        cleanShutdown();
    }

    // Release order is kept explicit: size storer first, session cache second.
    _sizeStorer.reset( NULL );
    _sessionCache.reset( NULL );
}
// Flushes the cached size information and closes the WiredTiger connection.
// After a successful close _conn is NULL, so calling this again only logs the
// shutdown message and does nothing else.
void WiredTigerKVEngine::cleanShutdown() {
    log() << "WiredTigerKVEngine shutting down";
    if ( !_conn ) {
        // Already shut down. syncSizeInfo() would otherwise try to open a
        // session on a NULL connection.
        return;
    }

    syncSizeInfo(true);

    // this must be the last thing we do before _conn->close();
    _sessionCache->shuttingDown();

    // Outside address-sanitizer builds the connection is closed with
    // "leak_memory=true"; under ASan no close configuration is passed
    // (presumably so the sanitizer sees WiredTiger free its memory -- confirm).
#if !__has_feature(address_sanitizer)
    const char* config = "leak_memory=true";
#else
    const char* config = NULL;
#endif
    invariantWTOK( _conn->close(_conn, config) );
    _conn = NULL;
}
// Called ahead of a rename. Records the current record count and data size of
// 'originalRecordStore' in the size storer under the ident's URI and forces a
// synchronous flush of the size information. 'fromNS' and 'toNS' are not
// consulted. Always returns Status::OK().
Status WiredTigerKVEngine::okToRename( OperationContext* opCtx,
                                       StringData fromNS,
                                       StringData toNS,
                                       StringData ident,
                                       const RecordStore* originalRecordStore ) const {
    const string uri = _uri( ident );

    _sizeStorer->store( uri,
                        originalRecordStore->numRecords( opCtx ),
                        originalRecordStore->dataSize( opCtx ) );

    const bool flushSynchronously = true;
    syncSizeInfo( flushSynchronously );

    return Status::OK();
}
// Returns the size WiredTigerUtil::getIdentSize() reports for the table backing
// 'ident', queried through the session attached to the operation's recovery unit.
int64_t WiredTigerKVEngine::getIdentSize( OperationContext* opCtx,
                                          StringData ident ) {
    WiredTigerSession* wtSession = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx);
    WT_SESSION* rawSession = wtSession->getSession();

    const string tableUri = _uri(ident);
    return WiredTigerUtil::getIdentSize( rawSession, tableUri );
}
// Verifies the table backing 'ident' and salvages it if verification fails
// (see _salvageIfNeeded). All cursors cached on the operation's session are
// closed first.
Status WiredTigerKVEngine::repairIdent( OperationContext* opCtx,
                                        StringData ident ) {
    WiredTigerSession* opSession = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx);
    opSession->closeAllCursors();

    const string tableUri = _uri(ident);
    return _salvageIfNeeded( tableUri.c_str() );
}
// Runs WT_SESSION::verify on 'uri' and, only if that fails with something other
// than EBUSY, runs WT_SESSION::salvage.
//
// Returns OK when verify succeeds or reports EBUSY; otherwise returns the
// status derived from the salvage return code (prefixed "Salvage failed:").
Status WiredTigerKVEngine::_salvageIfNeeded(const char* uri) {
    // Using a side session to avoid transactional issues
    WiredTigerSession sideSession(_conn);
    WT_SESSION* wt = sideSession.getSession();

    const int verifyRc = (wt->verify)(wt, uri, NULL);
    switch (verifyRc) {
    case 0:
        log() << "Verify succeeded on uri " << uri << ". Not salvaging.";
        return Status::OK();

    case EBUSY:
        // SERVER-16457: verify and salvage are occasionally failing with EBUSY. For now we
        // lie and return OK to avoid breaking tests. This block should go away when that
        // ticket is resolved.
        error() << "Verify on " << uri << " failed with EBUSY. Assuming no salvage is needed.";
        return Status::OK();

    default:
        break;
    }

    // TODO need to cleanup the sizeStorer cache after salvaging.
    log() << "Verify failed on uri " << uri << ". Running a salvage operation.";
    const int salvageRc = wt->salvage(wt, uri, NULL);
    return wtRCToStatus(salvageRc, "Salvage failed:");
}
int WiredTigerKVEngine::flushAllFiles( bool sync ) {
LOG(1) << "WiredTigerKVEngine::flushAllFiles";
syncSizeInfo(true);
WiredTigerSession session(_conn);
WT_SESSION* s = session.getSession();
invariantWTOK( s->checkpoint(s, NULL ) );
return 1;
}
void WiredTigerKVEngine::syncSizeInfo( bool sync ) const {
if ( !_sizeStorer )
return;
try {
WiredTigerSession session(_conn);
WT_SESSION* s = session.getSession();
invariantWTOK( s->begin_transaction( s, sync ? "sync=true" : NULL ) );
_sizeStorer->storeInto( &session, _sizeStorerUri );
invariantWTOK( s->commit_transaction( s, NULL ) );
}
catch (const WriteConflictException&) {
// ignore, it means someone else is doing it
}
}
// Allocates a new WiredTigerRecoveryUnit bound to this engine's session cache.
// The returned object is heap-allocated with 'new'; the caller receives the
// raw pointer.
RecoveryUnit* WiredTigerKVEngine::newRecoveryUnit() {
    WiredTigerRecoveryUnit* unit = new WiredTigerRecoveryUnit( _sessionCache.get() );
    return unit;
}
// Stores the extra configuration text that createRecordStore() passes to
// WiredTigerRecordStore::generateCreateString().
void WiredTigerKVEngine::setRecordStoreExtraOptions(
        const std::string& options ) {
    _rsOptions = options;
}
// Stores the extra configuration text that createSortedDataInterface() passes
// to WiredTigerIndex::generateCreateString().
void WiredTigerKVEngine::setSortedDataInterfaceExtraOptions(
        const std::string& options ) {
    _indexOptions = options;
}
// Creates the WiredTiger table that will back a record store.
//
//   ns      - namespace, forwarded to generateCreateString().
//   ident   - storage ident; any directory components in it are created under
//             the database path first (see _checkIdentPath).
//   options - collection options, forwarded to generateCreateString().
//
// Returns the generateCreateString() error if the configuration could not be
// built, otherwise the status derived from WT_SESSION::create.
Status WiredTigerKVEngine::createRecordStore( OperationContext* opCtx,
                                              StringData ns,
                                              StringData ident,
                                              const CollectionOptions& options ) {
    _checkIdentPath( ident );

    StatusWith<std::string> result =
        WiredTigerRecordStore::generateCreateString(ns, options, _rsOptions);
    if (!result.isOK()) {
        return result.getStatus();
    }
    std::string config = result.getValue();

    string uri = _uri( ident );
    LOG(2) << "WiredTigerKVEngine::createRecordStore uri: " << uri << " config: " << config;

    // Open the session only once the configuration is known to be valid, so
    // the error path above does not pay for a session it never uses.
    WiredTigerSession session(_conn);
    WT_SESSION* s = session.getSession();
    return wtRCToStatus( s->create( s, uri.c_str(), config.c_str() ) );
}
// Instantiates a WiredTigerRecordStore over the table backing 'ident'.
//
// For non-capped collections the capped size and max-docs arguments are both
// -1. For capped collections a zero 'cappedSize' is replaced by 4096 and a
// zero 'cappedMaxDocs' by -1. The record store is always given this engine's
// size storer and a NULL capped-delete callback argument.
RecordStore* WiredTigerKVEngine::getRecordStore( OperationContext* opCtx,
                                                 StringData ns,
                                                 StringData ident,
                                                 const CollectionOptions& options ) {
    if ( !options.capped ) {
        return new WiredTigerRecordStore( opCtx, ns, _uri(ident),
                                          false, -1, -1, NULL, _sizeStorer.get() );
    }

    return new WiredTigerRecordStore( opCtx, ns, _uri(ident), options.capped,
                                      options.cappedSize ? options.cappedSize : 4096,
                                      options.cappedMaxDocs ? options.cappedMaxDocs : -1,
                                      NULL,
                                      _sizeStorer.get() );
}
// Maps a storage ident to its WiredTiger object URI by prefixing "table:".
string WiredTigerKVEngine::_uri( StringData ident ) const {
    string uri( "table:" );
    uri += ident.toString();
    return uri;
}
// Creates the WiredTiger table that will back an index.
//
//   ident - storage ident; any directory components in it are created under
//           the database path first (see _checkIdentPath).
//   desc  - index descriptor, forwarded to generateCreateString().
//
// Returns the generateCreateString() error if the configuration could not be
// built, otherwise the status derived from WiredTigerIndex::Create.
Status WiredTigerKVEngine::createSortedDataInterface( OperationContext* opCtx,
                                                      StringData ident,
                                                      const IndexDescriptor* desc ) {
    _checkIdentPath( ident );

    StatusWith<std::string> result =
        WiredTigerIndex::generateCreateString(_indexOptions, *desc);
    if (!result.isOK()) {
        return result.getStatus();
    }

    std::string config = result.getValue();

    LOG(2) << "WiredTigerKVEngine::createSortedDataInterface ident: " << ident
           << " config: " << config;
    return wtRCToStatus(WiredTigerIndex::Create(opCtx, _uri(ident), config));
}
// Instantiates the index implementation for 'ident': WiredTigerIndexUnique when
// the descriptor reports a unique index, WiredTigerIndexStandard otherwise.
// The returned object is heap-allocated with 'new'.
SortedDataInterface* WiredTigerKVEngine::getSortedDataInterface( OperationContext* opCtx,
                                                                 StringData ident,
                                                                 const IndexDescriptor* desc ) {
    if ( desc->unique() ) {
        return new WiredTigerIndexUnique( opCtx, _uri( ident ), desc );
    }
    else {
        return new WiredTigerIndexStandard( opCtx, _uri( ident ), desc );
    }
}
// Requests that the table backing 'ident' be dropped. The outcome of _drop()
// (dropped immediately vs. queued for later) is deliberately not inspected;
// Status::OK() is returned either way.
Status WiredTigerKVEngine::dropIdent( OperationContext* opCtx,
                                      StringData ident ) {
    (void)_drop( ident );
    return Status::OK();
}
// Attempts a forced WT_SESSION::drop of the table backing 'ident'.
//
// Returns true when the table was dropped right away. On EBUSY the URI is
// added to the pending-drop set, every cached session is closed, and false is
// returned. Any other return code trips invariantWTOK.
bool WiredTigerKVEngine::_drop( StringData ident ) {
    const string uri = _uri( ident );

    WiredTigerSession session(_conn);
    WT_SESSION* wt = session.getSession();

    const int ret = wt->drop( wt, uri.c_str(), "force" );
    LOG(1) << "WT drop of " << uri << " res " << ret;

    if ( ret == 0 ) {
        // yay, it worked
        return true;
    }

    if ( ret != EBUSY ) {
        // unexpected failure
        invariantWTOK( ret );
        return false;
    }

    // EBUSY is expected, queue it up
    {
        boost::mutex::scoped_lock lk( _identToDropMutex );
        _identToDrop.insert( uri );
    }
    _sessionCache->closeAll();
    return false;
}
// Reports whether any drops are waiting in the pending-drop set.
//
// As a side effect, when the size-storer sync tracker says its interval has
// elapsed, the tracker is reset and a non-synchronous size-info flush is run
// before the queue is examined.
bool WiredTigerKVEngine::haveDropsQueued() const {
    const bool syncDue = _sizeStorerSyncTracker.intervalHasElapsed();
    if ( syncDue ) {
        _sizeStorerSyncTracker.resetLastTime();
        syncSizeInfo(false);
    }

    boost::mutex::scoped_lock lk( _identToDropMutex );
    const bool queueEmpty = _identToDrop.empty();
    return !queueEmpty;
}
void WiredTigerKVEngine::dropAllQueued() {
set mine;
{
boost::mutex::scoped_lock lk( _identToDropMutex );
mine = _identToDrop;
}
set deleted;
{
WiredTigerSession session(_conn);
for ( set::const_iterator it = mine.begin(); it != mine.end(); ++it ) {
string uri = *it;
int ret = session.getSession()->drop( session.getSession(), uri.c_str(), "force" );
LOG(1) << "WT queued drop of " << uri << " res " << ret;
if ( ret == 0 ) {
deleted.insert( uri );
continue;
}
if ( ret == EBUSY ) {
// leave in qeuue
continue;
}
invariantWTOK( ret );
}
}
{
boost::mutex::scoped_lock lk( _identToDropMutex );
for ( set::const_iterator it = deleted.begin(); it != deleted.end(); ++it ) {
_identToDrop.erase( *it );
}
}
}
// Capability flag: this engine always reports support for document locking.
bool WiredTigerKVEngine::supportsDocLocking() const {
    const bool supported = true;
    return supported;
}
// Capability flag: this engine always reports support for directory-per-db.
bool WiredTigerKVEngine::supportsDirectoryPerDB() const {
    const bool supported = true;
    return supported;
}
// Returns whether the table backing 'ident' is present in the WiredTiger
// metadata, looked up through the session of the operation's recovery unit.
bool WiredTigerKVEngine::hasIdent(OperationContext* opCtx, StringData ident) const {
    WiredTigerSession* opSession = WiredTigerRecoveryUnit::get(opCtx)->getSession(opCtx);
    WT_SESSION* wt = opSession->getSession();
    return _hasUri( wt, _uri(ident) );
}
// Looks 'uri' up in the "metadata:" cursor of 'session'.
//
// Returns false if opening the metadata cursor yields ENOENT; any other
// open failure trips invariantWTOK. Otherwise returns true exactly when the
// cursor search returns 0. The cursor is closed on every exit path.
bool WiredTigerKVEngine::_hasUri(WT_SESSION* session, const std::string& uri) const {
    // can't use WiredTigerCursor since this is called from constructor.
    WT_CURSOR* c = NULL;
    const int openRc = session->open_cursor(session, "metadata:", NULL, NULL, &c);
    if (openRc == ENOENT) {
        return false;
    }
    invariantWTOK(openRc);
    ON_BLOCK_EXIT(c->close, c);

    c->set_key(c, uri.c_str());
    const int searchRc = c->search(c);
    return searchRc == 0;
}
// Lists the idents of all tables recorded in the WiredTiger metadata.
//
// Walks the "metadata:" cursor and keeps every key of the form "table:<ident>",
// except the engine's own "sizeStorer" table. Keys without a ':' or with a
// different type prefix are skipped. Returns an empty vector if no metadata
// cursor could be obtained.
std::vector<std::string> WiredTigerKVEngine::getAllIdents( OperationContext* opCtx ) const {
    std::vector<std::string> all;
    WiredTigerCursor cursor( "metadata:", WiredTigerSession::kMetadataCursorId, false, opCtx );
    WT_CURSOR* c = cursor.get();
    if ( !c )
        return all;

    while ( c->next(c) == 0 ) {
        // Check the result before touching 'raw': if get_key fails the pointer
        // would otherwise be read uninitialized.
        const char* raw = NULL;
        invariantWTOK( c->get_key(c, &raw ) );
        StringData key(raw);
        size_t idx = key.find( ':' );
        if ( idx == string::npos )
            continue;
        StringData type = key.substr( 0, idx );
        if ( type != "table" )
            continue;

        StringData ident = key.substr(idx+1);
        if ( ident == "sizeStorer" )
            continue;

        all.push_back( ident.toString() );
    }

    return all;
}
// Passes 'str' to WT_CONNECTION::reconfigure on the engine's connection and
// returns that call's result code unchanged.
int WiredTigerKVEngine::reconfigure(const char* str) {
    const int rc = _conn->reconfigure(_conn, str);
    return rc;
}
// Ensures that every directory component named in 'ident' exists beneath the
// database path. For each '/' in the ident, the prefix up to that slash is
// treated as a directory relative to _path and created if missing. A failure
// to create a directory is logged and the exception is rethrown.
void WiredTigerKVEngine::_checkIdentPath( StringData ident ) {
    for ( size_t slash = ident.find( '/', 0 );
          slash != string::npos;
          slash = ident.find( '/', slash + 1 ) ) {

        StringData dir = ident.substr( 0, slash );

        boost::filesystem::path subdir = _path;
        subdir /= dir.toString();

        if ( boost::filesystem::exists( subdir ) ) {
            continue;
        }

        LOG(1) << "creating subdirectory: " << dir;
        try {
            boost::filesystem::create_directory( subdir );
        }
        catch (const std::exception& e) {
            error() << "error creating path " << subdir.string() << ' ' << e.what();
            throw;
        }
    }
}
}