// wiredtiger_index.cpp

/**
 *    Copyright (C) 2014 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage #include "mongo/platform/basic.h" #include "mongo/db/storage/wiredtiger/wiredtiger_index.h" #include #include "mongo/db/json.h" #include "mongo/db/catalog/index_catalog_entry.h" #include "mongo/db/index/index_descriptor.h" #include "mongo/db/storage/key_string.h" #include "mongo/db/storage/wiredtiger/wiredtiger_global_options.h" #include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" #include "mongo/db/storage/wiredtiger/wiredtiger_session_cache.h" #include "mongo/db/storage/wiredtiger/wiredtiger_util.h" #include "mongo/db/storage_options.h" #include "mongo/util/log.h" #include "mongo/util/assert_util.h" #include "mongo/util/mongoutils/str.h" #if 0 #define TRACE_CURSOR log() << "WT index (" << (const void*)&_idx << ") " #define TRACE_INDEX log() << "WT index (" << (const void*)this << ") " #else #define TRACE_CURSOR if ( 0 ) log() #define TRACE_INDEX if ( 0 ) log() #endif namespace mongo { namespace { static const int TempKeyMaxSize = 1024; // this goes away with SERVER-3372 static const WiredTigerItem emptyItem(NULL, 0); bool shouldCheckIndexVersions = true; static const int kMinimumIndexVersion = 6; static const int kCurrentIndexVersion = 6; // New indexes use this by default. 
static const int kMaximumIndexVersion = 6; BOOST_STATIC_ASSERT(kCurrentIndexVersion >= kMinimumIndexVersion); BOOST_STATIC_ASSERT(kCurrentIndexVersion <= kMaximumIndexVersion); bool hasFieldNames(const BSONObj& obj) { BSONForEach(e, obj) { if (e.fieldName()[0]) return true; } return false; } BSONObj stripFieldNames(const BSONObj& query) { if (!hasFieldNames(query)) return query; BSONObjBuilder bb; BSONForEach(e, query) { bb.appendAs(e, StringData()); } return bb.obj(); } // taken from btree_logic.cpp Status dupKeyError(const BSONObj& key) { StringBuilder sb; sb << "E11000 duplicate key error "; sb << "dup key: " << key; return Status(ErrorCodes::DuplicateKey, sb.str()); } Status checkKeySize(const BSONObj& key) { if ( key.objsize() >= TempKeyMaxSize ) { string msg = mongoutils::str::stream() << "WiredTigerIndex::insert: key too large to index, failing " << ' ' << key.objsize() << ' ' << key; return Status(ErrorCodes::KeyTooLong, msg); } return Status::OK(); } } // namespace // static void WiredTigerIndex::disableVersionCheckForRepair() { shouldCheckIndexVersions = false; } // static StatusWith WiredTigerIndex::parseIndexOptions(const BSONObj& options) { BSONForEach(elem, options) { if (elem.fieldNameStringData() == "configString") { if (elem.type() != String) { return StatusWith(ErrorCodes::TypeMismatch, str::stream() << "configString must be a string. " << "Not adding 'configString' value " << elem << " to index configuration"); } if (elem.valueStringData().empty()) { return StatusWith(ErrorCodes::InvalidOptions, "configString must be not be an empty string."); } return StatusWith(elem.String()); } else { // Return error on first unrecognized field. 
return StatusWith(ErrorCodes::InvalidOptions, str::stream() << '\'' << elem.fieldNameStringData() << '\'' << " is not a supported option."); } } return StatusWith(ErrorCodes::BadValue, "Storage engine options document must not be empty."); } // static StatusWith WiredTigerIndex::generateCreateString(const std::string& extraConfig, const IndexDescriptor& desc) { str::stream ss; // Separate out a prefix and suffix in the default string. User configuration will // override values in the prefix, but not values in the suffix. ss << "type=file,leaf_page_max=16k,"; if (wiredTigerGlobalOptions.useIndexPrefixCompression) { ss << "prefix_compression,"; } ss << "block_compressor=" << wiredTigerGlobalOptions.indexBlockCompressor << ","; ss << extraConfig; // Validate configuration object. // Raise an error about unrecognized fields that may be introduced in newer versions of // this storage engine. // Ensure that 'configString' field is a string. Raise an error if this is not the case. BSONElement storageEngineElement = desc.getInfoElement("storageEngine"); if (storageEngineElement.isABSONObj()) { BSONObj storageEngine = storageEngineElement.Obj(); StatusWith parseStatus = parseIndexOptions(storageEngine.getObjectField(kWiredTigerEngineName)); if (!parseStatus.isOK()) { return parseStatus; } if (!parseStatus.getValue().empty()) { ss << "," << parseStatus.getValue(); } } // WARNING: No user-specified config can appear below this line. These options are required // for correct behavior of the server. // Indexes need to store the metadata for collation to work as expected. 
ss << ",key_format=u,value_format=u"; // Index metadata ss << ",app_metadata=(" << "formatVersion=" << kCurrentIndexVersion << ',' << "infoObj=" << desc.infoObj().jsonString() << "),"; LOG(3) << "index create string: " << ss.ss.str(); return StatusWith(ss); } int WiredTigerIndex::Create(OperationContext* txn, const std::string& uri, const std::string& config) { WT_SESSION* s = WiredTigerRecoveryUnit::get( txn )->getSession()->getSession(); LOG(1) << "create uri: " << uri << " config: " << config; return s->create(s, uri.c_str(), config.c_str()); } WiredTigerIndex::WiredTigerIndex(OperationContext* ctx, const std::string& uri, const IndexDescriptor* desc) : _ordering(Ordering::make(desc->keyPattern())), _uri( uri ), _instanceId( WiredTigerSession::genCursorId() ) { if (shouldCheckIndexVersions) { Status versionStatus = WiredTigerUtil::checkApplicationMetadataFormatVersion(ctx, uri, kMinimumIndexVersion, kMaximumIndexVersion); if (!versionStatus.isOK()) { fassertFailedWithStatusNoTrace(28579, versionStatus); } } } Status WiredTigerIndex::insert(OperationContext* txn, const BSONObj& key, const RecordId& loc, bool dupsAllowed) { invariant(loc.isNormal()); dassert(!hasFieldNames(key)); Status s = checkKeySize(key); if (!s.isOK()) return s; WiredTigerCursor curwrap(_uri, _instanceId, false, txn); curwrap.assertInActiveTxn(); WT_CURSOR *c = curwrap.get(); return _insert( c, key, loc, dupsAllowed ); } void WiredTigerIndex::unindex(OperationContext* txn, const BSONObj& key, const RecordId& loc, bool dupsAllowed ) { invariant(loc.isNormal()); dassert(!hasFieldNames(key)); WiredTigerCursor curwrap(_uri, _instanceId, false, txn); curwrap.assertInActiveTxn(); WT_CURSOR *c = curwrap.get(); invariant( c ); _unindex( c, key, loc, dupsAllowed ); } void WiredTigerIndex::fullValidate(OperationContext* txn, bool full, long long *numKeysOut, BSONObjBuilder* output) const { boost::scoped_ptr cursor(newCursor(txn, 1)); cursor->locate( minKey, RecordId::min() ); long long count = 0; 
TRACE_INDEX << " fullValidate"; while ( !cursor->isEOF() ) { TRACE_INDEX << "\t" << cursor->getKey(); cursor->advance(); count++; } if ( numKeysOut ) { *numKeysOut = count; } // Nothing further to do if 'full' validation is not requested. if (!full) { return; } invariant(output); appendCustomStats(txn, output, 1); } bool WiredTigerIndex::appendCustomStats(OperationContext* txn, BSONObjBuilder* output, double scale) const { { BSONObjBuilder metadata(output->subobjStart("metadata")); Status status = WiredTigerUtil::getApplicationMetadata(txn, uri(), &metadata); if (!status.isOK()) { metadata.append("error", "unable to retrieve metadata"); metadata.append("code", static_cast(status.code())); metadata.append("reason", status.reason()); } } std::string type, sourceURI; WiredTigerUtil::fetchTypeAndSourceURI(txn, _uri, &type, &sourceURI); StatusWith metadataResult = WiredTigerUtil::getMetadata(txn, sourceURI); StringData creationStringName("creationString"); if (!metadataResult.isOK()) { BSONObjBuilder creationString(output->subobjStart(creationStringName)); creationString.append("error", "unable to retrieve creation config"); creationString.append("code", static_cast(metadataResult.getStatus().code())); creationString.append("reason", metadataResult.getStatus().reason()); } else { output->append(creationStringName, metadataResult.getValue()); // Type can be "lsm" or "file" output->append("type", type); } WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(); WT_SESSION* s = session->getSession(); Status status = WiredTigerUtil::exportTableToBSON(s, "statistics:" + uri(), "statistics=(fast)", output); if (!status.isOK()) { output->append("error", "unable to retrieve statistics"); output->append("code", static_cast(status.code())); output->append("reason", status.reason()); } return true; } Status WiredTigerIndex::dupKeyCheck( OperationContext* txn, const BSONObj& key, const RecordId& loc) { invariant(!hasFieldNames(key)); invariant(unique()); 
WiredTigerCursor curwrap(_uri, _instanceId, false, txn); WT_CURSOR *c = curwrap.get(); if ( isDup(c, key, loc) ) return dupKeyError(key); return Status::OK(); } bool WiredTigerIndex::isEmpty(OperationContext* txn) { WiredTigerCursor curwrap(_uri, _instanceId, false, txn); WT_CURSOR *c = curwrap.get(); if (!c) return true; int ret = c->next(c); if (ret == WT_NOTFOUND) return true; invariantWTOK(ret); return false; } long long WiredTigerIndex::getSpaceUsedBytes( OperationContext* txn ) const { WiredTigerSession* session = WiredTigerRecoveryUnit::get(txn)->getSession(); return static_cast( WiredTigerUtil::getIdentSize( session->getSession(), _uri ) ); } bool WiredTigerIndex::isDup(WT_CURSOR *c, const BSONObj& key, const RecordId& loc ) { invariant( unique() ); // First check whether the key exists. KeyString data = KeyString::make( key, _ordering ); WiredTigerItem item( data.getBuffer(), data.getSize() ); c->set_key( c, item.Get() ); int ret = c->search(c); if ( ret == WT_NOTFOUND ) return false; invariantWTOK( ret ); // If the key exists, check if we already have this loc at this key. If so, we don't // consider that to be a dup. WT_ITEM value; invariantWTOK( c->get_value(c,&value) ); BufReader br(value.data, value.size); while (br.remaining()) { if (KeyString::decodeRecordId(&br) == loc) return false; KeyString::TypeBits::fromBuffer(&br); // Just calling this to advance reader. } return true; } Status WiredTigerIndex::initAsEmpty(OperationContext* txn) { // No-op return Status::OK(); } /** * Base class for WiredTigerIndex bulk builders. * * Manages the bulk cursor used by bulk builders. 
*/ class WiredTigerIndex::BulkBuilder : public SortedDataBuilderInterface { public: BulkBuilder(WiredTigerIndex* idx, OperationContext* txn) : _ordering(idx->_ordering) , _txn(txn) , _session(WiredTigerRecoveryUnit::get(_txn)->getSessionCache()->getSession()) , _cursor(openBulkCursor(idx)) {} ~BulkBuilder() { _cursor->close(_cursor); WiredTigerRecoveryUnit::get(_txn)->getSessionCache()->releaseSession(_session); } protected: WT_CURSOR* openBulkCursor(WiredTigerIndex* idx) { // Open cursors can cause bulk open_cursor to fail with EBUSY. // TODO any other cases that could cause EBUSY? WiredTigerSession* outerSession = WiredTigerRecoveryUnit::get(_txn)->getSession(); outerSession->closeAllCursors(); // Not using cursor cache since we need to set "bulk". WT_CURSOR* cursor; // We use our own session to ensure we aren't in a transaction. WT_SESSION* session = _session->getSession(); int err = session->open_cursor(session, idx->uri().c_str(), NULL, "bulk", &cursor); if (!err) return cursor; warning() << "failed to create WiredTiger bulk cursor: " << wiredtiger_strerror(err); warning() << "falling back to non-bulk cursor for index " << idx->uri(); invariantWTOK(session->open_cursor(session, idx->uri().c_str(), NULL, NULL, &cursor)); return cursor; } const Ordering _ordering; OperationContext* const _txn; WiredTigerSession* const _session; WT_CURSOR* const _cursor; }; /** * Bulk builds a non-unique index. */ class WiredTigerIndex::StandardBulkBuilder : public BulkBuilder { public: StandardBulkBuilder(WiredTigerIndex* idx, OperationContext* txn) : BulkBuilder(idx, txn), _idx(idx) { } Status addKey(const BSONObj& key, const RecordId& loc) { { const Status s = checkKeySize(key); if (!s.isOK()) return s; } KeyString data = KeyString::make( key, _idx->_ordering, loc ); // Can't use WiredTigerCursor since we aren't using the cache. 
WiredTigerItem item(data.getBuffer(), data.getSize()); _cursor->set_key(_cursor, item.Get() ); WiredTigerItem valueItem = data.getTypeBits().isAllZeros() ? emptyItem : WiredTigerItem(data.getTypeBits().getBuffer(), data.getTypeBits().getSize()); _cursor->set_value(_cursor, valueItem.Get()); invariantWTOK(_cursor->insert(_cursor)); invariantWTOK(_cursor->reset(_cursor)); return Status::OK(); } void commit(bool mayInterrupt) { // TODO do we still need this? // this is bizarre, but required as part of the contract WriteUnitOfWork uow( _txn ); uow.commit(); } private: WiredTigerIndex* _idx; }; /** * Bulk builds a unique index. * * In order to support unique indexes in dupsAllowed mode this class only does an actual insert * after it sees a key after the one we are trying to insert. This allows us to gather up all * duplicate locs and insert them all together. This is necessary since bulk cursors can only * append data. */ class WiredTigerIndex::UniqueBulkBuilder : public BulkBuilder { public: UniqueBulkBuilder(WiredTigerIndex* idx, OperationContext* txn, bool dupsAllowed) : BulkBuilder(idx, txn), _idx(idx), _dupsAllowed(dupsAllowed) { } Status addKey(const BSONObj& newKey, const RecordId& loc) { { const Status s = checkKeySize(newKey); if (!s.isOK()) return s; } const int cmp = newKey.woCompare(_key, _ordering); if (cmp != 0) { if (!_key.isEmpty()) { // _key.isEmpty() is only true on the first call to addKey(). invariant(cmp > 0); // newKey must be > the last key // We are done with dups of the last key so we can insert it now. doInsert(); } invariant(_records.empty()); } else { // Dup found! if (!_dupsAllowed) { return dupKeyError(newKey); } // If we get here, we are in the weird mode where dups are allowed on a unique // index, so add ourselves to the list of duplicate locs. This also replaces the // _key which is correct since any dups seen later are likely to be newer. 
} _key = newKey.getOwned(); _keyString.resetToKey(_key, _idx->ordering()); _records.push_back(std::make_pair(loc, _keyString.getTypeBits())); return Status::OK(); } void commit(bool mayInterrupt) { WriteUnitOfWork uow( _txn ); if (!_records.empty()) { // This handles inserting the last unique key. doInsert(); } uow.commit(); } private: void doInsert() { invariant(!_records.empty()); KeyString value; for (size_t i = 0; i < _records.size(); i++) { value.appendRecordId(_records[i].first); // When there is only one record, we can omit AllZeros TypeBits. Otherwise they need // to be included. if (!(_records[i].second.isAllZeros() && _records.size() == 1)) { value.appendTypeBits(_records[i].second); } } WiredTigerItem keyItem( _keyString.getBuffer(), _keyString.getSize() ); WiredTigerItem valueItem(value.getBuffer(), value.getSize()); _cursor->set_key(_cursor, keyItem.Get()); _cursor->set_value(_cursor, valueItem.Get()); invariantWTOK(_cursor->insert(_cursor)); invariantWTOK(_cursor->reset(_cursor)); _records.clear(); } WiredTigerIndex* _idx; const bool _dupsAllowed; BSONObj _key; KeyString _keyString; std::vector > _records; }; namespace { /** * Implements the basic WT_CURSOR functionality used by both unique and standard indexes. */ class WiredTigerIndexCursorBase : public SortedDataInterface::Cursor { public: WiredTigerIndexCursorBase(const WiredTigerIndex& idx, OperationContext *txn, bool forward) : _txn(txn), _cursor(idx.uri(), idx.instanceId(), false, txn), _idx(idx), _forward(forward), _eof(true), _isKeyCurrent(false) { } virtual int getDirection() const { return _forward ? 1 : -1; } virtual bool isEOF() const { return _eof; } virtual bool pointsToSamePlaceAs(const SortedDataInterface::Cursor& genOther) const { const WiredTigerIndexCursorBase& other = dynamic_cast(genOther); if ( _eof && other._eof ) return true; else if ( _eof || other._eof ) return false; // Check the locs first since they are likely to differ and comparing them is fast. 
if ( getRecordId() != other.getRecordId() ) return false; loadKeyIfNeeded(); other.loadKeyIfNeeded(); return _key.getSize() == other._key.getSize() && memcmp(_key.getBuffer(), other._key.getBuffer(), _key.getSize()) == 0; } bool locate(const BSONObj &key, const RecordId& loc) { const BSONObj finalKey = stripFieldNames(key); fillKey(finalKey, loc); bool result = _locate(loc); // An explicit search at the start of the range should always return false if (loc == RecordId::min() || loc == RecordId::max() ) return false; return result; } void advanceTo(const BSONObj &keyBegin, int keyBeginLen, bool afterKey, const vector& keyEnd, const vector& keyEndInclusive) { // TODO: don't go to a bson obj then to a KeyString, go straight BSONObj key = IndexEntryComparison::makeQueryObject( keyBegin, keyBeginLen, afterKey, keyEnd, keyEndInclusive, getDirection() ); fillKey(key, RecordId()); _locate(RecordId()); } void customLocate(const BSONObj& keyBegin, int keyBeginLen, bool afterKey, const vector& keyEnd, const vector& keyEndInclusive) { advanceTo(keyBegin, keyBeginLen, afterKey, keyEnd, keyEndInclusive); } BSONObj getKey() const { if (_isKeyCurrent && !_keyBson.isEmpty()) return _keyBson; loadKeyIfNeeded(); _keyBson = KeyString::toBson(_key.getBuffer(), _key.getSize(), _idx.ordering(), getTypeBits()); TRACE_INDEX << " returning key: " << _keyBson; return _keyBson; } void savePosition() { _savedForCheck = _txn->recoveryUnit(); if ( !wt_keeptxnopen() && !_eof ) { loadKeyIfNeeded(); _savedLoc = getRecordId(); _cursor.reset(); } _txn = NULL; } void restorePosition( OperationContext *txn ) { // Update the session handle with our new operation context. _txn = txn; invariant( _savedForCheck == txn->recoveryUnit() ); if ( !wt_keeptxnopen() && !_eof ) { // Ensure an active session exists, so any restored cursors will bind to it WiredTigerRecoveryUnit::get(txn)->getSession(); _locate(_savedLoc); } } protected: // Uses _key for the key. 
virtual bool _locate(RecordId loc) = 0; // Must invalidateCache() virtual void fillKey(const BSONObj& key, RecordId loc) = 0; virtual const KeyString::TypeBits& getTypeBits() const = 0; void advanceWTCursor() { invalidateCache(); WT_CURSOR *c = _cursor.get(); int ret = _forward ? c->next(c) : c->prev(c); if ( ret == WT_NOTFOUND ) { _eof = true; return; } invariantWTOK(ret); _eof = false; } // Seeks to _key. Returns true on exact match. bool seekWTCursor() { invalidateCache(); WT_CURSOR *c = _cursor.get(); int cmp = -1; const WiredTigerItem keyItem(_key.getBuffer(), _key.getSize()); c->set_key(c, keyItem.Get()); int ret = c->search_near(c, &cmp); if ( ret == WT_NOTFOUND ) { _eof = true; TRACE_CURSOR << "\t not found"; return false; } invariantWTOK( ret ); _eof = false; TRACE_CURSOR << "\t cmp: " << cmp; if (cmp == 0) { // Found it! This means _key must be current. Double check in DEV mode. _isKeyCurrent = true; dassertKeyCacheIsValid(); return true; } // Make sure we land on a matching key if (_forward) { // We need to be >= if (cmp < 0) { ret = c->next(c); } } else { // We need to be <= if (cmp > 0) { ret = c->prev(c); } } if (ret == WT_NOTFOUND) { _eof = true; TRACE_CURSOR << "\t eof " << ret << " _forward: " << _forward; } else { invariantWTOK(ret); } return false; } void loadKeyIfNeeded() const { if (_isKeyCurrent) { dassertKeyCacheIsValid(); return; } WT_CURSOR *c = _cursor.get(); WT_ITEM item; invariantWTOK(c->get_key(c, &item)); _key.resetFromBuffer(item.data, item.size); _isKeyCurrent = true; } virtual void invalidateCache() { _isKeyCurrent = false; _keyBson = BSONObj(); } virtual void dassertKeyCacheIsValid() const { DEV { invariant(_isKeyCurrent); WT_ITEM item; WT_CURSOR *c = _cursor.get(); invariantWTOK(c->get_key(c, &item)); invariant(item.size == _key.getSize()); invariant(memcmp(item.data, _key.getBuffer(), item.size) == 0); } } OperationContext *_txn; WiredTigerCursor _cursor; const WiredTigerIndex& _idx; // not owned const bool _forward; bool _eof; 
// For save/restorePosition RecoveryUnit* _savedForCheck; RecordId _savedLoc; // These are all lazily loaded caches. mutable BSONObj _keyBson; // if isEmpty, it is invalid and must be loaded from _key. mutable bool _isKeyCurrent; // true if _key matches where the cursor is pointing mutable KeyString _key; }; class WiredTigerIndexStandardCursor : public WiredTigerIndexCursorBase { public: WiredTigerIndexStandardCursor(const WiredTigerIndex& idx, OperationContext *txn, bool forward) : WiredTigerIndexCursorBase(idx, txn, forward), _isTypeBitsValid(false) { } virtual void invalidateCache() { WiredTigerIndexCursorBase::invalidateCache(); _loc = RecordId(); _isTypeBitsValid = false; } virtual void fillKey(const BSONObj& key, RecordId loc) { TRACE_CURSOR << " fillKey " << key << " " << loc << (_forward ? " forward" : " backward"); // Null cursors should start at the zero key to maintain search ordering in the // collator. // Reverse cursors should start on the last matching key. if (loc.isNull()) loc = _forward ? 
RecordId::min() : RecordId::max(); _key.resetToKey(key, _idx.ordering(), loc); invalidateCache(); } virtual bool _locate(RecordId loc) { // loc already encoded in _key return seekWTCursor(); } virtual RecordId getRecordId() const { if ( _eof ) return RecordId(); if (_loc.isNull()) { loadKeyIfNeeded(); _loc = KeyString::decodeRecordIdAtEnd(_key.getBuffer(), _key.getSize()); } dassert(!_loc.isNull()); return _loc; } virtual void advance() { // Advance on a cursor at the end is a no-op if (_eof) return; advanceWTCursor(); } virtual const KeyString::TypeBits& getTypeBits() const { if (!_isTypeBitsValid) { WT_CURSOR *c = _cursor.get(); WT_ITEM item; invariantWTOK(c->get_value(c, &item)); BufReader br(item.data, item.size); _typeBits.resetFromBuffer(&br); _isTypeBitsValid = true; } return _typeBits; } private: mutable RecordId _loc; mutable bool _isTypeBitsValid; mutable KeyString::TypeBits _typeBits; }; class WiredTigerIndexUniqueCursor : public WiredTigerIndexCursorBase { public: WiredTigerIndexUniqueCursor(const WiredTigerIndex& idx, OperationContext *txn, bool forward) : WiredTigerIndexCursorBase(idx, txn, forward), _recordsIndex(0) { } virtual void invalidateCache() { WiredTigerIndexCursorBase::invalidateCache(); _records.clear(); } virtual void fillKey(const BSONObj& key, RecordId loc) { TRACE_CURSOR << " fillKey " << key << " " << loc << (_forward ? " forward" : " backward"); invalidateCache(); _key.resetToKey(key, _idx.ordering()); // loc doesn't go in _key for unique indexes } virtual bool _locate(RecordId loc) { if (!seekWTCursor()) { // If didn't seek to exact key, start at beginning of wherever we ended up. return false; } dassert(!_eof); if ( loc.isNull() ) { // Null loc means means start and beginning or end of array as needed. // so nothing to do return true; } // If we get here we need to make sure we are positioned at the correct point of the // _records vector. 
TRACE_CURSOR << "\t in weird"; if ( _forward ) { while (getRecordId() < loc) { _recordsIndex++; if (_recordsIndex == _records.size()) { // This means we exhausted the scan and didn't find a record in range. advanceWTCursor(); return false; } } } else { while (getRecordId() > loc) { _recordsIndex++; if (_recordsIndex == _records.size()) { advanceWTCursor(); return false; } } } return true; } virtual RecordId getRecordId() const { if ( _eof ) return RecordId(); loadValueIfNeeded(); dassert(!_records[_recordsIndex].first.isNull()); return _records[_recordsIndex].first; } virtual void advance() { // Advance on a cursor at the end is a no-op if ( _eof ) return; // We may just be advancing within the RecordIds for this key. loadValueIfNeeded(); _recordsIndex++; if (_recordsIndex == _records.size()) { advanceWTCursor(); } } virtual const KeyString::TypeBits& getTypeBits() const { invariant(!_eof); loadValueIfNeeded(); return _records[_recordsIndex].second; } private: void loadValueIfNeeded() const { if (!_records.empty()) return; _recordsIndex = 0; WT_CURSOR *c = _cursor.get(); WT_ITEM item; invariantWTOK( c->get_value(c, &item ) ); BufReader br(item.data, item.size); while (br.remaining()) { RecordId loc = KeyString::decodeRecordId(&br); _records.push_back(std::make_pair(loc, KeyString::TypeBits::fromBuffer(&br))); } invariant(!_records.empty()); if (!_forward) std::reverse(_records.begin(), _records.end()); } mutable size_t _recordsIndex; mutable std::vector > _records; }; } // namespace WiredTigerIndexUnique::WiredTigerIndexUnique( OperationContext* ctx, const std::string& uri, const IndexDescriptor* desc ) : WiredTigerIndex( ctx, uri, desc ) { } SortedDataInterface::Cursor* WiredTigerIndexUnique::newCursor(OperationContext* txn, int direction) const { invariant((direction == 1) || (direction == -1)); return new WiredTigerIndexUniqueCursor(*this, txn, direction == 1); } SortedDataBuilderInterface* WiredTigerIndexUnique::getBulkBuilder(OperationContext* txn, bool 
dupsAllowed) { return new UniqueBulkBuilder(this, txn, dupsAllowed); } Status WiredTigerIndexUnique::_insert( WT_CURSOR* c, const BSONObj& key, const RecordId& loc, bool dupsAllowed ) { const KeyString data = KeyString::make( key, _ordering ); WiredTigerItem keyItem( data.getBuffer(), data.getSize() ); KeyString value = KeyString::make(loc); if (!data.getTypeBits().isAllZeros()) value.appendTypeBits(data.getTypeBits()); WiredTigerItem valueItem(value.getBuffer(), value.getSize()); c->set_key( c, keyItem.Get() ); c->set_value( c, valueItem.Get() ); int ret = c->insert( c ); if ( ret == WT_ROLLBACK && !dupsAllowed ) { // if there is a conflict on a unique key, it means there is a dup key // even if someone else is deleting at the same time, its ok to fail this // insert as a dup key as it a race return dupKeyError(key); } else if ( ret != WT_DUPLICATE_KEY ) { return wtRCToStatus( ret ); } // we might be in weird mode where there might be multiple values // we put them all in the "list" // Note that we can't omit AllZeros when there are multiple locs for a value. When we remove // down to a single value, it will be cleaned up. 
ret = c->search(c); invariantWTOK( ret ); WT_ITEM old; invariantWTOK( c->get_value(c, &old ) ); bool insertedLoc = false; value.resetToEmpty(); BufReader br(old.data, old.size); while (br.remaining()) { RecordId locInIndex = KeyString::decodeRecordId(&br); if (loc == locInIndex) return Status::OK(); // already in index if (loc < locInIndex) { value.appendRecordId(loc); value.appendTypeBits(data.getTypeBits()); insertedLoc = true; } // Copy from old to new value value.appendRecordId(locInIndex); value.appendTypeBits(KeyString::TypeBits::fromBuffer(&br)); } if (!dupsAllowed) return dupKeyError(key); if (!insertedLoc) { // This loc is higher than all currently in the index for this key value.appendRecordId(loc); value.appendTypeBits(data.getTypeBits()); } valueItem = WiredTigerItem(value.getBuffer(), value.getSize()); c->set_value( c, valueItem.Get() ); return wtRCToStatus( c->update( c ) ); } void WiredTigerIndexUnique::_unindex( WT_CURSOR* c, const BSONObj& key, const RecordId& loc, bool dupsAllowed ) { KeyString data = KeyString::make( key, _ordering ); WiredTigerItem keyItem( data.getBuffer(), data.getSize() ); c->set_key( c, keyItem.Get() ); if ( !dupsAllowed ) { // nice and clear int ret = c->remove(c); if (ret == WT_NOTFOUND) { return; } invariantWTOK(ret); return; } // dups are allowed, so we have to deal with a vector of RecordIds. int ret = c->search(c); if ( ret == WT_NOTFOUND ) return; invariantWTOK( ret ); WT_ITEM old; invariantWTOK( c->get_value(c, &old ) ); bool foundLoc = false; std::vector > records; BufReader br(old.data, old.size); while (br.remaining()) { RecordId locInIndex = KeyString::decodeRecordId(&br); KeyString::TypeBits typeBits = KeyString::TypeBits::fromBuffer(&br); if (loc == locInIndex) { if (records.empty() && !br.remaining()) { // This is the common case: we are removing the only loc for this key. // Remove the whole entry. 
invariantWTOK(c->remove(c)); return; } foundLoc = true; continue; } records.push_back(std::make_pair(locInIndex, typeBits)); } if (!foundLoc) { warning().stream() << loc << " not found in the index for key " << key; return; // nothing to do } // Put other locs for this key back in the index. KeyString newValue; invariant(!records.empty()); for (size_t i = 0; i < records.size(); i++) { newValue.appendRecordId(records[i].first); // When there is only one record, we can omit AllZeros TypeBits. Otherwise they need // to be included. if (!(records[i].second.isAllZeros() && records.size() == 1)) { newValue.appendTypeBits(records[i].second); } } WiredTigerItem valueItem = WiredTigerItem(newValue.getBuffer(), newValue.getSize()); c->set_value( c, valueItem.Get() ); invariantWTOK( c->update( c ) ); } // ------------------------------ WiredTigerIndexStandard::WiredTigerIndexStandard( OperationContext* ctx, const std::string& uri, const IndexDescriptor* desc ) : WiredTigerIndex( ctx, uri, desc ) { } SortedDataInterface::Cursor* WiredTigerIndexStandard::newCursor(OperationContext* txn, int direction) const { invariant((direction == 1) || (direction == -1)); return new WiredTigerIndexStandardCursor(*this, txn, direction == 1); } SortedDataBuilderInterface* WiredTigerIndexStandard::getBulkBuilder(OperationContext* txn, bool dupsAllowed) { // We aren't unique so dups better be allowed. invariant(dupsAllowed); return new StandardBulkBuilder(this, txn); } Status WiredTigerIndexStandard::_insert( WT_CURSOR* c, const BSONObj& keyBson, const RecordId& loc, bool dupsAllowed ) { invariant( dupsAllowed ); TRACE_INDEX << " key: " << keyBson << " loc: " << loc; KeyString key = KeyString::make( keyBson, _ordering, loc ); WiredTigerItem keyItem( key.getBuffer(), key.getSize() ); WiredTigerItem valueItem = key.getTypeBits().isAllZeros() ? 
emptyItem : WiredTigerItem(key.getTypeBits().getBuffer(), key.getTypeBits().getSize()); c->set_key(c, keyItem.Get()); c->set_value(c, valueItem.Get()); int ret = c->insert( c ); if ( ret != WT_DUPLICATE_KEY ) return wtRCToStatus( ret ); // If the record was already in the index, we just return OK. // This can happen, for example, when building a background index while documents are being // written and reindexed. return Status::OK(); } void WiredTigerIndexStandard::_unindex( WT_CURSOR* c, const BSONObj& key, const RecordId& loc, bool dupsAllowed ) { invariant( dupsAllowed ); KeyString data = KeyString::make( key, _ordering, loc ); WiredTigerItem item( data.getBuffer(), data.getSize() ); c->set_key(c, item.Get() ); int ret = c->remove(c); if (ret != WT_NOTFOUND) { invariantWTOK(ret); } } // ---------------- for compatability with rc4 and previous ------ int index_collator_customize(WT_COLLATOR *coll, WT_SESSION *s, const char *uri, WT_CONFIG_ITEM *metadata, WT_COLLATOR **collp) { fassertFailedWithStatusNoTrace(28580, Status(ErrorCodes::UnsupportedFormat, str::stream() << "Found an index from an unsupported RC version." << " Please restart with --repair to fix.")); } extern "C" MONGO_COMPILER_API_EXPORT int index_collator_extension(WT_CONNECTION *conn, WT_CONFIG_ARG *cfg) { static WT_COLLATOR idx_static; idx_static.customize = index_collator_customize; return conn->add_collator(conn, "mongo_index", &idx_static, NULL); } } // namespace mongo