diff options
Diffstat (limited to 'src/mongo/db/storage/in_memory/in_memory_record_store.cpp')
-rw-r--r-- | src/mongo/db/storage/in_memory/in_memory_record_store.cpp | 963 |
1 files changed, 474 insertions, 489 deletions
diff --git a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp index b0c583954f6..af596f7a569 100644 --- a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp +++ b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp @@ -46,584 +46,569 @@ namespace mongo { - using std::shared_ptr; - - class InMemoryRecordStore::InsertChange : public RecoveryUnit::Change { - public: - InsertChange(Data* data, RecordId loc) :_data(data), _loc(loc) {} - virtual void commit() {} - virtual void rollback() { - Records::iterator it = _data->records.find(_loc); - if (it != _data->records.end()) { - _data->dataSize -= it->second.size; - _data->records.erase(it); - } - } - - private: - Data* const _data; - const RecordId _loc; - }; - - // Works for both removes and updates - class InMemoryRecordStore::RemoveChange : public RecoveryUnit::Change { - public: - RemoveChange(Data* data, RecordId loc, const InMemoryRecord& rec) - :_data(data), _loc(loc), _rec(rec) - {} - - virtual void commit() {} - virtual void rollback() { - Records::iterator it = _data->records.find(_loc); - if (it != _data->records.end()) { - _data->dataSize -= it->second.size; - } - - _data->dataSize += _rec.size; - _data->records[_loc] = _rec; +using std::shared_ptr; + +class InMemoryRecordStore::InsertChange : public RecoveryUnit::Change { +public: + InsertChange(Data* data, RecordId loc) : _data(data), _loc(loc) {} + virtual void commit() {} + virtual void rollback() { + Records::iterator it = _data->records.find(_loc); + if (it != _data->records.end()) { + _data->dataSize -= it->second.size; + _data->records.erase(it); } + } - private: - Data* const _data; - const RecordId _loc; - const InMemoryRecord _rec; - }; - - class InMemoryRecordStore::TruncateChange : public RecoveryUnit::Change { - public: - TruncateChange(Data* data) : _data(data), _dataSize(0) { - using std::swap; - swap(_dataSize, _data->dataSize); - swap(_records, _data->records); +private: + Data* const _data; + const RecordId _loc; +}; + +// Works for both removes and updates +class InMemoryRecordStore::RemoveChange : public RecoveryUnit::Change { +public: + RemoveChange(Data* data, RecordId loc, const InMemoryRecord& rec) + : _data(data), _loc(loc), _rec(rec) {} + + virtual void commit() {} + virtual void rollback() { + Records::iterator it = _data->records.find(_loc); + if (it != _data->records.end()) { + _data->dataSize -= it->second.size; } - virtual void commit() {} - virtual void rollback() { - using std::swap; - swap(_dataSize, _data->dataSize); - swap(_records, _data->records); - } + _data->dataSize += _rec.size; + _data->records[_loc] = _rec; + } - private: - Data* const _data; - int64_t _dataSize; - Records _records; - }; - - class InMemoryRecordStore::Cursor final : public RecordCursor { - public: - Cursor(OperationContext* txn, const InMemoryRecordStore& rs) - : _txn(txn) - , _records(rs._data->records) - , _isCapped(rs.isCapped()) - {} - - boost::optional<Record> next() final { - if (_needFirstSeek) { - _needFirstSeek = false; - _it = _records.begin(); - } - else if (!_lastMoveWasRestore && _it != _records.end()) { - ++_it; - } - _lastMoveWasRestore = false; +private: + Data* const _data; + const RecordId _loc; + const InMemoryRecord _rec; +}; + +class InMemoryRecordStore::TruncateChange : public RecoveryUnit::Change { +public: + TruncateChange(Data* data) : _data(data), _dataSize(0) { + using std::swap; + swap(_dataSize, _data->dataSize); + swap(_records, _data->records); + } - if (_it == _records.end()) return {}; - return {{_it->first, _it->second.toRecordData()}}; - } + virtual void commit() {} + virtual void rollback() { + using std::swap; + swap(_dataSize, _data->dataSize); + swap(_records, _data->records); + } - boost::optional<Record> seekExact(const RecordId& id) final { - _lastMoveWasRestore = false; - _needFirstSeek = false; - _it = _records.find(id); - if (_it == _records.end()) return {}; - return {{_it->first, _it->second.toRecordData()}}; - } +private: + Data* const _data; + int64_t _dataSize; + Records _records; +}; - void savePositioned() final { - _txn = nullptr; - if (!_needFirstSeek && !_lastMoveWasRestore) - _savedId = _it == _records.end() ? RecordId() : _it->first; - } +class InMemoryRecordStore::Cursor final : public RecordCursor { +public: + Cursor(OperationContext* txn, const InMemoryRecordStore& rs) + : _txn(txn), _records(rs._data->records), _isCapped(rs.isCapped()) {} - void saveUnpositioned() final { - _txn = nullptr; - _savedId = RecordId(); + boost::optional<Record> next() final { + if (_needFirstSeek) { + _needFirstSeek = false; + _it = _records.begin(); + } else if (!_lastMoveWasRestore && _it != _records.end()) { + ++_it; } + _lastMoveWasRestore = false; - bool restore(OperationContext* txn) final { - _txn = txn; - if (_savedId.isNull()) { - _it = _records.end(); - return true; - } + if (_it == _records.end()) + return {}; + return {{_it->first, _it->second.toRecordData()}}; + } - _it = _records.lower_bound(_savedId); - _lastMoveWasRestore = _it == _records.end() || _it->first != _savedId; + boost::optional<Record> seekExact(const RecordId& id) final { + _lastMoveWasRestore = false; + _needFirstSeek = false; + _it = _records.find(id); + if (_it == _records.end()) + return {}; + return {{_it->first, _it->second.toRecordData()}}; + } - // Capped iterators die on invalidation rather than advancing. - return !(_isCapped && _lastMoveWasRestore); - } + void savePositioned() final { + _txn = nullptr; + if (!_needFirstSeek && !_lastMoveWasRestore) + _savedId = _it == _records.end() ? RecordId() : _it->first; + } - private: - unowned_ptr<OperationContext> _txn; - Records::const_iterator _it; - bool _needFirstSeek = true; - bool _lastMoveWasRestore = false; - RecordId _savedId; // Location to restore() to. Null means EOF. - - const InMemoryRecordStore::Records& _records; - const bool _isCapped; - }; - - class InMemoryRecordStore::ReverseCursor final : public RecordCursor { - public: - ReverseCursor(OperationContext* txn, const InMemoryRecordStore& rs) - : _txn(txn) - , _records(rs._data->records) - , _isCapped(rs.isCapped()) - {} - - boost::optional<Record> next() final { - if (_needFirstSeek) { - _needFirstSeek = false; - _it = _records.rbegin(); - } - else if (!_lastMoveWasRestore && _it != _records.rend()) { - ++_it; - } - _lastMoveWasRestore = false; + void saveUnpositioned() final { + _txn = nullptr; + _savedId = RecordId(); + } - if (_it == _records.rend()) return {}; - return {{_it->first, _it->second.toRecordData()}}; + bool restore(OperationContext* txn) final { + _txn = txn; + if (_savedId.isNull()) { + _it = _records.end(); + return true; } - boost::optional<Record> seekExact(const RecordId& id) final { - _lastMoveWasRestore = false; - _needFirstSeek = false; - - auto forwardIt = _records.find(id); - if (forwardIt == _records.end()) { - _it = _records.rend(); - return {}; - } + _it = _records.lower_bound(_savedId); + _lastMoveWasRestore = _it == _records.end() || _it->first != _savedId; - // The reverse_iterator will point to the preceding element, so increment the base - // iterator to make it point past the found element. - ++forwardIt; - _it = Records::const_reverse_iterator(forwardIt); - dassert(_it != _records.rend()); - dassert(_it->first == id); - return {{_it->first, _it->second.toRecordData()}}; - } - - void savePositioned() final { - _txn = nullptr; - if (!_needFirstSeek && !_lastMoveWasRestore) - _savedId = _it == _records.rend() ? RecordId() : _it->first; - } + // Capped iterators die on invalidation rather than advancing. + return !(_isCapped && _lastMoveWasRestore); + } - void saveUnpositioned() final { - _txn = nullptr; - _savedId = RecordId(); - } +private: + unowned_ptr<OperationContext> _txn; + Records::const_iterator _it; + bool _needFirstSeek = true; + bool _lastMoveWasRestore = false; + RecordId _savedId; // Location to restore() to. Null means EOF. - bool restore(OperationContext* txn) final { - _txn = txn; - if (_savedId.isNull()) { - _it = _records.rend(); - return true; - } + const InMemoryRecordStore::Records& _records; + const bool _isCapped; +}; - // Note: upper_bound returns the first entry > _savedId and reverse_iterators - // dereference to the element before their base iterator. This combine to make this - // dereference to the first element <= _savedId which is what we want here. - _it = Records::const_reverse_iterator(_records.upper_bound(_savedId)); - _lastMoveWasRestore = _it == _records.rend() || _it->first != _savedId; +class InMemoryRecordStore::ReverseCursor final : public RecordCursor { +public: + ReverseCursor(OperationContext* txn, const InMemoryRecordStore& rs) + : _txn(txn), _records(rs._data->records), _isCapped(rs.isCapped()) {} - // Capped iterators die on invalidation rather than advancing. - return !(_isCapped && _lastMoveWasRestore); + boost::optional<Record> next() final { + if (_needFirstSeek) { + _needFirstSeek = false; + _it = _records.rbegin(); + } else if (!_lastMoveWasRestore && _it != _records.rend()) { + ++_it; } + _lastMoveWasRestore = false; - private: - unowned_ptr<OperationContext> _txn; - Records::const_reverse_iterator _it; - bool _needFirstSeek = true; - bool _lastMoveWasRestore = false; - RecordId _savedId; // Location to restore() to. Null means EOF. - const InMemoryRecordStore::Records& _records; - const bool _isCapped; - }; + if (_it == _records.rend()) + return {}; + return {{_it->first, _it->second.toRecordData()}}; + } + boost::optional<Record> seekExact(const RecordId& id) final { + _lastMoveWasRestore = false; + _needFirstSeek = false; - // - // RecordStore - // - - InMemoryRecordStore::InMemoryRecordStore(StringData ns, - std::shared_ptr<void>* dataInOut, - bool isCapped, - int64_t cappedMaxSize, - int64_t cappedMaxDocs, - CappedDocumentDeleteCallback* cappedDeleteCallback) - : RecordStore(ns), - _isCapped(isCapped), - _cappedMaxSize(cappedMaxSize), - _cappedMaxDocs(cappedMaxDocs), - _cappedDeleteCallback(cappedDeleteCallback), - _data(*dataInOut ? static_cast<Data*>(dataInOut->get()) - : new Data(NamespaceString::oplog(ns))) { - if (!*dataInOut) { - dataInOut->reset(_data); // takes ownership + auto forwardIt = _records.find(id); + if (forwardIt == _records.end()) { + _it = _records.rend(); + return {}; } - if (_isCapped) { - invariant(_cappedMaxSize > 0); - invariant(_cappedMaxDocs == -1 || _cappedMaxDocs > 0); - } - else { - invariant(_cappedMaxSize == -1); - invariant(_cappedMaxDocs == -1); - } + // The reverse_iterator will point to the preceding element, so increment the base + // iterator to make it point past the found element. + ++forwardIt; + _it = Records::const_reverse_iterator(forwardIt); + dassert(_it != _records.rend()); + dassert(_it->first == id); + return {{_it->first, _it->second.toRecordData()}}; } - const char* InMemoryRecordStore::name() const { return "InMemory"; } - - RecordData InMemoryRecordStore::dataFor( OperationContext* txn, const RecordId& loc ) const { - return recordFor(loc)->toRecordData(); + void savePositioned() final { + _txn = nullptr; + if (!_needFirstSeek && !_lastMoveWasRestore) + _savedId = _it == _records.rend() ? RecordId() : _it->first; } - const InMemoryRecordStore::InMemoryRecord* InMemoryRecordStore::recordFor( - const RecordId& loc) const { - Records::const_iterator it = _data->records.find(loc); - if ( it == _data->records.end() ) { - error() << "InMemoryRecordStore::recordFor cannot find record for " << ns() - << ":" << loc; - } - invariant(it != _data->records.end()); - return &it->second; + void saveUnpositioned() final { + _txn = nullptr; + _savedId = RecordId(); } - InMemoryRecordStore::InMemoryRecord* InMemoryRecordStore::recordFor(const RecordId& loc) { - Records::iterator it = _data->records.find(loc); - if ( it == _data->records.end() ) { - error() << "InMemoryRecordStore::recordFor cannot find record for " << ns() - << ":" << loc; + bool restore(OperationContext* txn) final { + _txn = txn; + if (_savedId.isNull()) { + _it = _records.rend(); + return true; } - invariant(it != _data->records.end()); - return &it->second; - } - bool InMemoryRecordStore::findRecord( OperationContext* txn, - const RecordId& loc, RecordData* rd ) const { - Records::const_iterator it = _data->records.find(loc); - if ( it == _data->records.end() ) { - return false; - } - *rd = it->second.toRecordData(); - return true; + // Note: upper_bound returns the first entry > _savedId and reverse_iterators + // dereference to the element before their base iterator. This combine to make this + // dereference to the first element <= _savedId which is what we want here. + _it = Records::const_reverse_iterator(_records.upper_bound(_savedId)); + _lastMoveWasRestore = _it == _records.rend() || _it->first != _savedId; + + // Capped iterators die on invalidation rather than advancing. + return !(_isCapped && _lastMoveWasRestore); } - void InMemoryRecordStore::deleteRecord(OperationContext* txn, const RecordId& loc) { - InMemoryRecord* rec = recordFor(loc); - txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *rec)); - _data->dataSize -= rec->size; - invariant(_data->records.erase(loc) == 1); +private: + unowned_ptr<OperationContext> _txn; + Records::const_reverse_iterator _it; + bool _needFirstSeek = true; + bool _lastMoveWasRestore = false; + RecordId _savedId; // Location to restore() to. Null means EOF. + const InMemoryRecordStore::Records& _records; + const bool _isCapped; +}; + + +// +// RecordStore +// + +InMemoryRecordStore::InMemoryRecordStore(StringData ns, + std::shared_ptr<void>* dataInOut, + bool isCapped, + int64_t cappedMaxSize, + int64_t cappedMaxDocs, + CappedDocumentDeleteCallback* cappedDeleteCallback) + : RecordStore(ns), + _isCapped(isCapped), + _cappedMaxSize(cappedMaxSize), + _cappedMaxDocs(cappedMaxDocs), + _cappedDeleteCallback(cappedDeleteCallback), + _data(*dataInOut ? static_cast<Data*>(dataInOut->get()) + : new Data(NamespaceString::oplog(ns))) { + if (!*dataInOut) { + dataInOut->reset(_data); // takes ownership } - bool InMemoryRecordStore::cappedAndNeedDelete(OperationContext* txn) const { - if (!_isCapped) - return false; + if (_isCapped) { + invariant(_cappedMaxSize > 0); + invariant(_cappedMaxDocs == -1 || _cappedMaxDocs > 0); + } else { + invariant(_cappedMaxSize == -1); + invariant(_cappedMaxDocs == -1); + } +} - if (_data->dataSize > _cappedMaxSize) - return true; +const char* InMemoryRecordStore::name() const { + return "InMemory"; +} - if ((_cappedMaxDocs != -1) && (numRecords(txn) > _cappedMaxDocs)) - return true; +RecordData InMemoryRecordStore::dataFor(OperationContext* txn, const RecordId& loc) const { + return recordFor(loc)->toRecordData(); +} +const InMemoryRecordStore::InMemoryRecord* InMemoryRecordStore::recordFor( + const RecordId& loc) const { + Records::const_iterator it = _data->records.find(loc); + if (it == _data->records.end()) { + error() << "InMemoryRecordStore::recordFor cannot find record for " << ns() << ":" << loc; + } + invariant(it != _data->records.end()); + return &it->second; +} + +InMemoryRecordStore::InMemoryRecord* InMemoryRecordStore::recordFor(const RecordId& loc) { + Records::iterator it = _data->records.find(loc); + if (it == _data->records.end()) { + error() << "InMemoryRecordStore::recordFor cannot find record for " << ns() << ":" << loc; + } + invariant(it != _data->records.end()); + return &it->second; +} + +bool InMemoryRecordStore::findRecord(OperationContext* txn, + const RecordId& loc, + RecordData* rd) const { + Records::const_iterator it = _data->records.find(loc); + if (it == _data->records.end()) { return false; } + *rd = it->second.toRecordData(); + return true; +} + +void InMemoryRecordStore::deleteRecord(OperationContext* txn, const RecordId& loc) { + InMemoryRecord* rec = recordFor(loc); + txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *rec)); + _data->dataSize -= rec->size; + invariant(_data->records.erase(loc) == 1); +} + +bool InMemoryRecordStore::cappedAndNeedDelete(OperationContext* txn) const { + if (!_isCapped) + return false; - void InMemoryRecordStore::cappedDeleteAsNeeded(OperationContext* txn) { - while (cappedAndNeedDelete(txn)) { - invariant(!_data->records.empty()); + if (_data->dataSize > _cappedMaxSize) + return true; - Records::iterator oldest = _data->records.begin(); - RecordId id = oldest->first; - RecordData data = oldest->second.toRecordData(); + if ((_cappedMaxDocs != -1) && (numRecords(txn) > _cappedMaxDocs)) + return true; - if (_cappedDeleteCallback) - uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, id, data)); + return false; +} - deleteRecord(txn, id); - } - } +void InMemoryRecordStore::cappedDeleteAsNeeded(OperationContext* txn) { + while (cappedAndNeedDelete(txn)) { + invariant(!_data->records.empty()); - StatusWith<RecordId> InMemoryRecordStore::extractAndCheckLocForOplog(const char* data, - int len) const { - StatusWith<RecordId> status = oploghack::extractKey(data, len); - if (!status.isOK()) - return status; + Records::iterator oldest = _data->records.begin(); + RecordId id = oldest->first; + RecordData data = oldest->second.toRecordData(); - if (!_data->records.empty() && status.getValue() <= _data->records.rbegin()->first) - return StatusWith<RecordId>(ErrorCodes::BadValue, "ts not higher than highest"); + if (_cappedDeleteCallback) + uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, id, data)); - return status; + deleteRecord(txn, id); } +} - StatusWith<RecordId> InMemoryRecordStore::insertRecord(OperationContext* txn, - const char* data, - int len, - bool enforceQuota) { - if (_isCapped && len > _cappedMaxSize) { - // We use dataSize for capped rollover and we don't want to delete everything if we know - // this won't fit. - return StatusWith<RecordId>(ErrorCodes::BadValue, - "object to insert exceeds cappedMaxSize"); - } - - InMemoryRecord rec(len); - memcpy(rec.data.get(), data, len); - - RecordId loc; - if (_data->isOplog) { - StatusWith<RecordId> status = extractAndCheckLocForOplog(data, len); - if (!status.isOK()) - return status; - loc = status.getValue(); - } - else { - loc = allocateLoc(); - } +StatusWith<RecordId> InMemoryRecordStore::extractAndCheckLocForOplog(const char* data, + int len) const { + StatusWith<RecordId> status = oploghack::extractKey(data, len); + if (!status.isOK()) + return status; - txn->recoveryUnit()->registerChange(new InsertChange(_data, loc)); - _data->dataSize += len; - _data->records[loc] = rec; + if (!_data->records.empty() && status.getValue() <= _data->records.rbegin()->first) + return StatusWith<RecordId>(ErrorCodes::BadValue, "ts not higher than highest"); - cappedDeleteAsNeeded(txn); + return status; +} - return StatusWith<RecordId>(loc); +StatusWith<RecordId> InMemoryRecordStore::insertRecord(OperationContext* txn, + const char* data, + int len, + bool enforceQuota) { + if (_isCapped && len > _cappedMaxSize) { + // We use dataSize for capped rollover and we don't want to delete everything if we know + // this won't fit. + return StatusWith<RecordId>(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"); } - StatusWith<RecordId> InMemoryRecordStore::insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) { - const int len = doc->documentSize(); - if (_isCapped && len > _cappedMaxSize) { - // We use dataSize for capped rollover and we don't want to delete everything if we know - // this won't fit. - return StatusWith<RecordId>(ErrorCodes::BadValue, - "object to insert exceeds cappedMaxSize"); - } - - InMemoryRecord rec(len); - doc->writeDocument(rec.data.get()); - - RecordId loc; - if (_data->isOplog) { - StatusWith<RecordId> status = extractAndCheckLocForOplog(rec.data.get(), len); - if (!status.isOK()) - return status; - loc = status.getValue(); - } - else { - loc = allocateLoc(); - } + InMemoryRecord rec(len); + memcpy(rec.data.get(), data, len); - txn->recoveryUnit()->registerChange(new InsertChange(_data, loc)); - _data->dataSize += len; - _data->records[loc] = rec; - - cappedDeleteAsNeeded(txn); - - return StatusWith<RecordId>(loc); + RecordId loc; + if (_data->isOplog) { + StatusWith<RecordId> status = extractAndCheckLocForOplog(data, len); + if (!status.isOK()) + return status; + loc = status.getValue(); + } else { + loc = allocateLoc(); } - StatusWith<RecordId> InMemoryRecordStore::updateRecord(OperationContext* txn, - const RecordId& loc, - const char* data, - int len, - bool enforceQuota, - UpdateNotifier* notifier ) { - InMemoryRecord* oldRecord = recordFor( loc ); - int oldLen = oldRecord->size; - - if (_isCapped && len > oldLen) { - return StatusWith<RecordId>( ErrorCodes::InternalError, - "failing update: objects in a capped ns cannot grow", - 10003 ); - } - - if (notifier) { - // The in-memory KV engine uses the invalidation framework (does not support - // doc-locking), and therefore must notify that it is updating a document. - Status callbackStatus = notifier->recordStoreGoingToUpdateInPlace(txn, loc); - if (!callbackStatus.isOK()) { - return StatusWith<RecordId>(callbackStatus); - } - } + txn->recoveryUnit()->registerChange(new InsertChange(_data, loc)); + _data->dataSize += len; + _data->records[loc] = rec; - InMemoryRecord newRecord(len); - memcpy(newRecord.data.get(), data, len); + cappedDeleteAsNeeded(txn); - txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *oldRecord)); - _data->dataSize += len - oldLen; - *oldRecord = newRecord; + return StatusWith<RecordId>(loc); +} - cappedDeleteAsNeeded(txn); - - return StatusWith<RecordId>(loc); +StatusWith<RecordId> InMemoryRecordStore::insertRecord(OperationContext* txn, + const DocWriter* doc, + bool enforceQuota) { + const int len = doc->documentSize(); + if (_isCapped && len > _cappedMaxSize) { + // We use dataSize for capped rollover and we don't want to delete everything if we know + // this won't fit. + return StatusWith<RecordId>(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"); } - bool InMemoryRecordStore::updateWithDamagesSupported() const { - // TODO: Currently the UpdateStage assumes that updateWithDamages will apply the - // damages directly to the unowned BSONObj containing the record to be modified. - // The implementation of updateWithDamages() below copies the old record to a - // a new one and then applies the damages. - // - // We should be able to enable updateWithDamages() here once this assumption is - // relaxed. - return false; - } + InMemoryRecord rec(len); + doc->writeDocument(rec.data.get()); - Status InMemoryRecordStore::updateWithDamages( OperationContext* txn, - const RecordId& loc, - const RecordData& oldRec, - const char* damageSource, - const mutablebson::DamageVector& damages ) { - InMemoryRecord* oldRecord = recordFor( loc ); - const int len = oldRecord->size; - - InMemoryRecord newRecord(len); - memcpy(newRecord.data.get(), oldRecord->data.get(), len); - - txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *oldRecord)); - *oldRecord = newRecord; - - cappedDeleteAsNeeded(txn); - - char* root = newRecord.data.get(); - mutablebson::DamageVector::const_iterator where = damages.begin(); - const mutablebson::DamageVector::const_iterator end = damages.end(); - for( ; where != end; ++where ) { - const char* sourcePtr = damageSource + where->sourceOffset; - char* targetPtr = root + where->targetOffset; - std::memcpy(targetPtr, sourcePtr, where->size); - } + RecordId loc; + if (_data->isOplog) { + StatusWith<RecordId> status = extractAndCheckLocForOplog(rec.data.get(), len); + if (!status.isOK()) + return status; + loc = status.getValue(); + } else { + loc = allocateLoc(); + } - *oldRecord = newRecord; + txn->recoveryUnit()->registerChange(new InsertChange(_data, loc)); + _data->dataSize += len; + _data->records[loc] = rec; - return Status::OK(); - } + cappedDeleteAsNeeded(txn); - std::unique_ptr<RecordCursor> InMemoryRecordStore::getCursor(OperationContext* txn, - bool forward) const { + return StatusWith<RecordId>(loc); +} - if (forward) return stdx::make_unique<Cursor>(txn, *this); - return stdx::make_unique<ReverseCursor>(txn, *this); - } +StatusWith<RecordId> InMemoryRecordStore::updateRecord(OperationContext* txn, + const RecordId& loc, + const char* data, + int len, + bool enforceQuota, + UpdateNotifier* notifier) { + InMemoryRecord* oldRecord = recordFor(loc); + int oldLen = oldRecord->size; - Status InMemoryRecordStore::truncate(OperationContext* txn) { - // Unlike other changes, TruncateChange mutates _data on construction to perform the - // truncate - txn->recoveryUnit()->registerChange(new TruncateChange(_data)); - return Status::OK(); + if (_isCapped && len > oldLen) { + return StatusWith<RecordId>( + ErrorCodes::InternalError, "failing update: objects in a capped ns cannot grow", 10003); } - void InMemoryRecordStore::temp_cappedTruncateAfter(OperationContext* txn, - RecordId end, - bool inclusive) { - Records::iterator it = inclusive ? _data->records.lower_bound(end) - : _data->records.upper_bound(end); - while(it != _data->records.end()) { - txn->recoveryUnit()->registerChange(new RemoveChange(_data, it->first, it->second)); - _data->dataSize -= it->second.size; - _data->records.erase(it++); + if (notifier) { + // The in-memory KV engine uses the invalidation framework (does not support + // doc-locking), and therefore must notify that it is updating a document. + Status callbackStatus = notifier->recordStoreGoingToUpdateInPlace(txn, loc); + if (!callbackStatus.isOK()) { + return StatusWith<RecordId>(callbackStatus); } } - Status InMemoryRecordStore::validate(OperationContext* txn, - bool full, - bool scanData, - ValidateAdaptor* adaptor, - ValidateResults* results, - BSONObjBuilder* output) { - results->valid = true; - if (scanData && full) { - for (Records::const_iterator it = _data->records.begin(); - it != _data->records.end(); ++it) { - const InMemoryRecord& rec = it->second; - size_t dataSize; - const Status status = adaptor->validate(rec.toRecordData(), &dataSize); - if (!status.isOK()) { - results->valid = false; - results->errors.push_back("invalid object detected (see logs)"); - log() << "Invalid object detected in " << _ns << ": " << status.reason(); - } - } - } + InMemoryRecord newRecord(len); + memcpy(newRecord.data.get(), data, len); - output->appendNumber( "nrecords", _data->records.size() ); + txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *oldRecord)); + _data->dataSize += len - oldLen; + *oldRecord = newRecord; - return Status::OK(); + cappedDeleteAsNeeded(txn); - } + return StatusWith<RecordId>(loc); +} - void InMemoryRecordStore::appendCustomStats( OperationContext* txn, - BSONObjBuilder* result, - double scale ) const { - result->appendBool( "capped", _isCapped ); - if ( _isCapped ) { - result->appendIntOrLL( "max", _cappedMaxDocs ); - result->appendIntOrLL( "maxSize", _cappedMaxSize / scale ); - } +bool InMemoryRecordStore::updateWithDamagesSupported() const { + // TODO: Currently the UpdateStage assumes that updateWithDamages will apply the + // damages directly to the unowned BSONObj containing the record to be modified. + // The implementation of updateWithDamages() below copies the old record to a + // a new one and then applies the damages. + // + // We should be able to enable updateWithDamages() here once this assumption is + // relaxed. + return false; +} + +Status InMemoryRecordStore::updateWithDamages(OperationContext* txn, + const RecordId& loc, + const RecordData& oldRec, + const char* damageSource, + const mutablebson::DamageVector& damages) { + InMemoryRecord* oldRecord = recordFor(loc); + const int len = oldRecord->size; + + InMemoryRecord newRecord(len); + memcpy(newRecord.data.get(), oldRecord->data.get(), len); + + txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *oldRecord)); + *oldRecord = newRecord; + + cappedDeleteAsNeeded(txn); + + char* root = newRecord.data.get(); + mutablebson::DamageVector::const_iterator where = damages.begin(); + const mutablebson::DamageVector::const_iterator end = damages.end(); + for (; where != end; ++where) { + const char* sourcePtr = damageSource + where->sourceOffset; + char* targetPtr = root + where->targetOffset; + std::memcpy(targetPtr, sourcePtr, where->size); } - Status InMemoryRecordStore::touch(OperationContext* txn, BSONObjBuilder* output) const { - if (output) { - output->append("numRanges", 1); - output->append("millis", 0); + *oldRecord = newRecord; + + return Status::OK(); +} + +std::unique_ptr<RecordCursor> InMemoryRecordStore::getCursor(OperationContext* txn, + bool forward) const { + if (forward) + return stdx::make_unique<Cursor>(txn, *this); + return stdx::make_unique<ReverseCursor>(txn, *this); +} + +Status InMemoryRecordStore::truncate(OperationContext* txn) { + // Unlike other changes, TruncateChange mutates _data on construction to perform the + // truncate + txn->recoveryUnit()->registerChange(new TruncateChange(_data)); + return Status::OK(); +} + +void InMemoryRecordStore::temp_cappedTruncateAfter(OperationContext* txn, + RecordId end, + bool inclusive) { + Records::iterator it = + inclusive ? _data->records.lower_bound(end) : _data->records.upper_bound(end); + while (it != _data->records.end()) { + txn->recoveryUnit()->registerChange(new RemoveChange(_data, it->first, it->second)); + _data->dataSize -= it->second.size; + _data->records.erase(it++); + } +} + +Status InMemoryRecordStore::validate(OperationContext* txn, + bool full, + bool scanData, + ValidateAdaptor* adaptor, + ValidateResults* results, + BSONObjBuilder* output) { + results->valid = true; + if (scanData && full) { + for (Records::const_iterator it = _data->records.begin(); it != _data->records.end(); + ++it) { + const InMemoryRecord& rec = it->second; + size_t dataSize; + const Status status = adaptor->validate(rec.toRecordData(), &dataSize); + if (!status.isOK()) { + results->valid = false; + results->errors.push_back("invalid object detected (see logs)"); + log() << "Invalid object detected in " << _ns << ": " << status.reason(); + } } - return Status::OK(); } - void InMemoryRecordStore::increaseStorageSize(OperationContext* txn, - int size, bool enforceQuota) { - // unclear what this would mean for this class. For now, just error if called. - invariant(!"increaseStorageSize not yet implemented"); - } + output->appendNumber("nrecords", _data->records.size()); - int64_t InMemoryRecordStore::storageSize(OperationContext* txn, - BSONObjBuilder* extraInfo, - int infoLevel) const { - // Note: not making use of extraInfo or infoLevel since we don't have extents - const int64_t recordOverhead = numRecords(txn) * sizeof(InMemoryRecord); - return _data->dataSize + recordOverhead; - } + return Status::OK(); +} - RecordId InMemoryRecordStore::allocateLoc() { - RecordId out = RecordId(_data->nextId++); - invariant(out < RecordId::max()); - return out; +void InMemoryRecordStore::appendCustomStats(OperationContext* txn, + BSONObjBuilder* result, + double scale) const { + result->appendBool("capped", _isCapped); + if (_isCapped) { + result->appendIntOrLL("max", _cappedMaxDocs); + result->appendIntOrLL("maxSize", _cappedMaxSize / scale); } +} - boost::optional<RecordId> InMemoryRecordStore::oplogStartHack( - OperationContext* txn, - const RecordId& startingPosition) const { - - if (!_data->isOplog) - return boost::none; - - const Records& records = _data->records; - - if (records.empty()) - return RecordId(); - - Records::const_iterator it = records.lower_bound(startingPosition); - if (it == records.end() || it->first > startingPosition) - --it; - - return it->first; +Status InMemoryRecordStore::touch(OperationContext* txn, BSONObjBuilder* output) const { + if (output) { + output->append("numRanges", 1); + output->append("millis", 0); } - -} // namespace mongo + return Status::OK(); +} + +void InMemoryRecordStore::increaseStorageSize(OperationContext* txn, int size, bool enforceQuota) { + // unclear what this would mean for this class. For now, just error if called. + invariant(!"increaseStorageSize not yet implemented"); +} + +int64_t InMemoryRecordStore::storageSize(OperationContext* txn, + BSONObjBuilder* extraInfo, + int infoLevel) const { + // Note: not making use of extraInfo or infoLevel since we don't have extents + const int64_t recordOverhead = numRecords(txn) * sizeof(InMemoryRecord); + return _data->dataSize + recordOverhead; +} + +RecordId InMemoryRecordStore::allocateLoc() { + RecordId out = RecordId(_data->nextId++); + invariant(out < RecordId::max()); + return out; +} + +boost::optional<RecordId> InMemoryRecordStore::oplogStartHack( + OperationContext* txn, const RecordId& startingPosition) const { + if (!_data->isOplog) + return boost::none; + + const Records& records = _data->records; + + if (records.empty()) + return RecordId(); + + Records::const_iterator it = records.lower_bound(startingPosition); + if (it == records.end() || it->first > startingPosition) + --it; + + return it->first; +} + +} // namespace mongo |