summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage/in_memory/in_memory_record_store.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/storage/in_memory/in_memory_record_store.cpp')
-rw-r--r--src/mongo/db/storage/in_memory/in_memory_record_store.cpp963
1 files changed, 474 insertions, 489 deletions
diff --git a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp
index b0c583954f6..af596f7a569 100644
--- a/src/mongo/db/storage/in_memory/in_memory_record_store.cpp
+++ b/src/mongo/db/storage/in_memory/in_memory_record_store.cpp
@@ -46,584 +46,569 @@
namespace mongo {
- using std::shared_ptr;
-
- class InMemoryRecordStore::InsertChange : public RecoveryUnit::Change {
- public:
- InsertChange(Data* data, RecordId loc) :_data(data), _loc(loc) {}
- virtual void commit() {}
- virtual void rollback() {
- Records::iterator it = _data->records.find(_loc);
- if (it != _data->records.end()) {
- _data->dataSize -= it->second.size;
- _data->records.erase(it);
- }
- }
-
- private:
- Data* const _data;
- const RecordId _loc;
- };
-
- // Works for both removes and updates
- class InMemoryRecordStore::RemoveChange : public RecoveryUnit::Change {
- public:
- RemoveChange(Data* data, RecordId loc, const InMemoryRecord& rec)
- :_data(data), _loc(loc), _rec(rec)
- {}
-
- virtual void commit() {}
- virtual void rollback() {
- Records::iterator it = _data->records.find(_loc);
- if (it != _data->records.end()) {
- _data->dataSize -= it->second.size;
- }
-
- _data->dataSize += _rec.size;
- _data->records[_loc] = _rec;
+using std::shared_ptr;
+
+class InMemoryRecordStore::InsertChange : public RecoveryUnit::Change {
+public:
+ InsertChange(Data* data, RecordId loc) : _data(data), _loc(loc) {}
+ virtual void commit() {}
+ virtual void rollback() {
+ Records::iterator it = _data->records.find(_loc);
+ if (it != _data->records.end()) {
+ _data->dataSize -= it->second.size;
+ _data->records.erase(it);
}
+ }
- private:
- Data* const _data;
- const RecordId _loc;
- const InMemoryRecord _rec;
- };
-
- class InMemoryRecordStore::TruncateChange : public RecoveryUnit::Change {
- public:
- TruncateChange(Data* data) : _data(data), _dataSize(0) {
- using std::swap;
- swap(_dataSize, _data->dataSize);
- swap(_records, _data->records);
+private:
+ Data* const _data;
+ const RecordId _loc;
+};
+
+// Works for both removes and updates
+class InMemoryRecordStore::RemoveChange : public RecoveryUnit::Change {
+public:
+ RemoveChange(Data* data, RecordId loc, const InMemoryRecord& rec)
+ : _data(data), _loc(loc), _rec(rec) {}
+
+ virtual void commit() {}
+ virtual void rollback() {
+ Records::iterator it = _data->records.find(_loc);
+ if (it != _data->records.end()) {
+ _data->dataSize -= it->second.size;
}
- virtual void commit() {}
- virtual void rollback() {
- using std::swap;
- swap(_dataSize, _data->dataSize);
- swap(_records, _data->records);
- }
+ _data->dataSize += _rec.size;
+ _data->records[_loc] = _rec;
+ }
- private:
- Data* const _data;
- int64_t _dataSize;
- Records _records;
- };
-
- class InMemoryRecordStore::Cursor final : public RecordCursor {
- public:
- Cursor(OperationContext* txn, const InMemoryRecordStore& rs)
- : _txn(txn)
- , _records(rs._data->records)
- , _isCapped(rs.isCapped())
- {}
-
- boost::optional<Record> next() final {
- if (_needFirstSeek) {
- _needFirstSeek = false;
- _it = _records.begin();
- }
- else if (!_lastMoveWasRestore && _it != _records.end()) {
- ++_it;
- }
- _lastMoveWasRestore = false;
+private:
+ Data* const _data;
+ const RecordId _loc;
+ const InMemoryRecord _rec;
+};
+
+class InMemoryRecordStore::TruncateChange : public RecoveryUnit::Change {
+public:
+ TruncateChange(Data* data) : _data(data), _dataSize(0) {
+ using std::swap;
+ swap(_dataSize, _data->dataSize);
+ swap(_records, _data->records);
+ }
- if (_it == _records.end()) return {};
- return {{_it->first, _it->second.toRecordData()}};
- }
+ virtual void commit() {}
+ virtual void rollback() {
+ using std::swap;
+ swap(_dataSize, _data->dataSize);
+ swap(_records, _data->records);
+ }
- boost::optional<Record> seekExact(const RecordId& id) final {
- _lastMoveWasRestore = false;
- _needFirstSeek = false;
- _it = _records.find(id);
- if (_it == _records.end()) return {};
- return {{_it->first, _it->second.toRecordData()}};
- }
+private:
+ Data* const _data;
+ int64_t _dataSize;
+ Records _records;
+};
- void savePositioned() final {
- _txn = nullptr;
- if (!_needFirstSeek && !_lastMoveWasRestore)
- _savedId = _it == _records.end() ? RecordId() : _it->first;
- }
+class InMemoryRecordStore::Cursor final : public RecordCursor {
+public:
+ Cursor(OperationContext* txn, const InMemoryRecordStore& rs)
+ : _txn(txn), _records(rs._data->records), _isCapped(rs.isCapped()) {}
- void saveUnpositioned() final {
- _txn = nullptr;
- _savedId = RecordId();
+ boost::optional<Record> next() final {
+ if (_needFirstSeek) {
+ _needFirstSeek = false;
+ _it = _records.begin();
+ } else if (!_lastMoveWasRestore && _it != _records.end()) {
+ ++_it;
}
+ _lastMoveWasRestore = false;
- bool restore(OperationContext* txn) final {
- _txn = txn;
- if (_savedId.isNull()) {
- _it = _records.end();
- return true;
- }
+ if (_it == _records.end())
+ return {};
+ return {{_it->first, _it->second.toRecordData()}};
+ }
- _it = _records.lower_bound(_savedId);
- _lastMoveWasRestore = _it == _records.end() || _it->first != _savedId;
+ boost::optional<Record> seekExact(const RecordId& id) final {
+ _lastMoveWasRestore = false;
+ _needFirstSeek = false;
+ _it = _records.find(id);
+ if (_it == _records.end())
+ return {};
+ return {{_it->first, _it->second.toRecordData()}};
+ }
- // Capped iterators die on invalidation rather than advancing.
- return !(_isCapped && _lastMoveWasRestore);
- }
+ void savePositioned() final {
+ _txn = nullptr;
+ if (!_needFirstSeek && !_lastMoveWasRestore)
+ _savedId = _it == _records.end() ? RecordId() : _it->first;
+ }
- private:
- unowned_ptr<OperationContext> _txn;
- Records::const_iterator _it;
- bool _needFirstSeek = true;
- bool _lastMoveWasRestore = false;
- RecordId _savedId; // Location to restore() to. Null means EOF.
-
- const InMemoryRecordStore::Records& _records;
- const bool _isCapped;
- };
-
- class InMemoryRecordStore::ReverseCursor final : public RecordCursor {
- public:
- ReverseCursor(OperationContext* txn, const InMemoryRecordStore& rs)
- : _txn(txn)
- , _records(rs._data->records)
- , _isCapped(rs.isCapped())
- {}
-
- boost::optional<Record> next() final {
- if (_needFirstSeek) {
- _needFirstSeek = false;
- _it = _records.rbegin();
- }
- else if (!_lastMoveWasRestore && _it != _records.rend()) {
- ++_it;
- }
- _lastMoveWasRestore = false;
+ void saveUnpositioned() final {
+ _txn = nullptr;
+ _savedId = RecordId();
+ }
- if (_it == _records.rend()) return {};
- return {{_it->first, _it->second.toRecordData()}};
+ bool restore(OperationContext* txn) final {
+ _txn = txn;
+ if (_savedId.isNull()) {
+ _it = _records.end();
+ return true;
}
- boost::optional<Record> seekExact(const RecordId& id) final {
- _lastMoveWasRestore = false;
- _needFirstSeek = false;
-
- auto forwardIt = _records.find(id);
- if (forwardIt == _records.end()) {
- _it = _records.rend();
- return {};
- }
+ _it = _records.lower_bound(_savedId);
+ _lastMoveWasRestore = _it == _records.end() || _it->first != _savedId;
- // The reverse_iterator will point to the preceding element, so increment the base
- // iterator to make it point past the found element.
- ++forwardIt;
- _it = Records::const_reverse_iterator(forwardIt);
- dassert(_it != _records.rend());
- dassert(_it->first == id);
- return {{_it->first, _it->second.toRecordData()}};
- }
-
- void savePositioned() final {
- _txn = nullptr;
- if (!_needFirstSeek && !_lastMoveWasRestore)
- _savedId = _it == _records.rend() ? RecordId() : _it->first;
- }
+ // Capped iterators die on invalidation rather than advancing.
+ return !(_isCapped && _lastMoveWasRestore);
+ }
- void saveUnpositioned() final {
- _txn = nullptr;
- _savedId = RecordId();
- }
+private:
+ unowned_ptr<OperationContext> _txn;
+ Records::const_iterator _it;
+ bool _needFirstSeek = true;
+ bool _lastMoveWasRestore = false;
+ RecordId _savedId; // Location to restore() to. Null means EOF.
- bool restore(OperationContext* txn) final {
- _txn = txn;
- if (_savedId.isNull()) {
- _it = _records.rend();
- return true;
- }
+ const InMemoryRecordStore::Records& _records;
+ const bool _isCapped;
+};
- // Note: upper_bound returns the first entry > _savedId and reverse_iterators
- // dereference to the element before their base iterator. This combine to make this
- // dereference to the first element <= _savedId which is what we want here.
- _it = Records::const_reverse_iterator(_records.upper_bound(_savedId));
- _lastMoveWasRestore = _it == _records.rend() || _it->first != _savedId;
+class InMemoryRecordStore::ReverseCursor final : public RecordCursor {
+public:
+ ReverseCursor(OperationContext* txn, const InMemoryRecordStore& rs)
+ : _txn(txn), _records(rs._data->records), _isCapped(rs.isCapped()) {}
- // Capped iterators die on invalidation rather than advancing.
- return !(_isCapped && _lastMoveWasRestore);
+ boost::optional<Record> next() final {
+ if (_needFirstSeek) {
+ _needFirstSeek = false;
+ _it = _records.rbegin();
+ } else if (!_lastMoveWasRestore && _it != _records.rend()) {
+ ++_it;
}
+ _lastMoveWasRestore = false;
- private:
- unowned_ptr<OperationContext> _txn;
- Records::const_reverse_iterator _it;
- bool _needFirstSeek = true;
- bool _lastMoveWasRestore = false;
- RecordId _savedId; // Location to restore() to. Null means EOF.
- const InMemoryRecordStore::Records& _records;
- const bool _isCapped;
- };
+ if (_it == _records.rend())
+ return {};
+ return {{_it->first, _it->second.toRecordData()}};
+ }
+ boost::optional<Record> seekExact(const RecordId& id) final {
+ _lastMoveWasRestore = false;
+ _needFirstSeek = false;
- //
- // RecordStore
- //
-
- InMemoryRecordStore::InMemoryRecordStore(StringData ns,
- std::shared_ptr<void>* dataInOut,
- bool isCapped,
- int64_t cappedMaxSize,
- int64_t cappedMaxDocs,
- CappedDocumentDeleteCallback* cappedDeleteCallback)
- : RecordStore(ns),
- _isCapped(isCapped),
- _cappedMaxSize(cappedMaxSize),
- _cappedMaxDocs(cappedMaxDocs),
- _cappedDeleteCallback(cappedDeleteCallback),
- _data(*dataInOut ? static_cast<Data*>(dataInOut->get())
- : new Data(NamespaceString::oplog(ns))) {
- if (!*dataInOut) {
- dataInOut->reset(_data); // takes ownership
+ auto forwardIt = _records.find(id);
+ if (forwardIt == _records.end()) {
+ _it = _records.rend();
+ return {};
}
- if (_isCapped) {
- invariant(_cappedMaxSize > 0);
- invariant(_cappedMaxDocs == -1 || _cappedMaxDocs > 0);
- }
- else {
- invariant(_cappedMaxSize == -1);
- invariant(_cappedMaxDocs == -1);
- }
+ // The reverse_iterator will point to the preceding element, so increment the base
+ // iterator to make it point past the found element.
+ ++forwardIt;
+ _it = Records::const_reverse_iterator(forwardIt);
+ dassert(_it != _records.rend());
+ dassert(_it->first == id);
+ return {{_it->first, _it->second.toRecordData()}};
}
- const char* InMemoryRecordStore::name() const { return "InMemory"; }
-
- RecordData InMemoryRecordStore::dataFor( OperationContext* txn, const RecordId& loc ) const {
- return recordFor(loc)->toRecordData();
+ void savePositioned() final {
+ _txn = nullptr;
+ if (!_needFirstSeek && !_lastMoveWasRestore)
+ _savedId = _it == _records.rend() ? RecordId() : _it->first;
}
- const InMemoryRecordStore::InMemoryRecord* InMemoryRecordStore::recordFor(
- const RecordId& loc) const {
- Records::const_iterator it = _data->records.find(loc);
- if ( it == _data->records.end() ) {
- error() << "InMemoryRecordStore::recordFor cannot find record for " << ns()
- << ":" << loc;
- }
- invariant(it != _data->records.end());
- return &it->second;
+ void saveUnpositioned() final {
+ _txn = nullptr;
+ _savedId = RecordId();
}
- InMemoryRecordStore::InMemoryRecord* InMemoryRecordStore::recordFor(const RecordId& loc) {
- Records::iterator it = _data->records.find(loc);
- if ( it == _data->records.end() ) {
- error() << "InMemoryRecordStore::recordFor cannot find record for " << ns()
- << ":" << loc;
+ bool restore(OperationContext* txn) final {
+ _txn = txn;
+ if (_savedId.isNull()) {
+ _it = _records.rend();
+ return true;
}
- invariant(it != _data->records.end());
- return &it->second;
- }
- bool InMemoryRecordStore::findRecord( OperationContext* txn,
- const RecordId& loc, RecordData* rd ) const {
- Records::const_iterator it = _data->records.find(loc);
- if ( it == _data->records.end() ) {
- return false;
- }
- *rd = it->second.toRecordData();
- return true;
+ // Note: upper_bound returns the first entry > _savedId and reverse_iterators
+ // dereference to the element before their base iterator. This combine to make this
+ // dereference to the first element <= _savedId which is what we want here.
+ _it = Records::const_reverse_iterator(_records.upper_bound(_savedId));
+ _lastMoveWasRestore = _it == _records.rend() || _it->first != _savedId;
+
+ // Capped iterators die on invalidation rather than advancing.
+ return !(_isCapped && _lastMoveWasRestore);
}
- void InMemoryRecordStore::deleteRecord(OperationContext* txn, const RecordId& loc) {
- InMemoryRecord* rec = recordFor(loc);
- txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *rec));
- _data->dataSize -= rec->size;
- invariant(_data->records.erase(loc) == 1);
+private:
+ unowned_ptr<OperationContext> _txn;
+ Records::const_reverse_iterator _it;
+ bool _needFirstSeek = true;
+ bool _lastMoveWasRestore = false;
+ RecordId _savedId; // Location to restore() to. Null means EOF.
+ const InMemoryRecordStore::Records& _records;
+ const bool _isCapped;
+};
+
+
+//
+// RecordStore
+//
+
+InMemoryRecordStore::InMemoryRecordStore(StringData ns,
+ std::shared_ptr<void>* dataInOut,
+ bool isCapped,
+ int64_t cappedMaxSize,
+ int64_t cappedMaxDocs,
+ CappedDocumentDeleteCallback* cappedDeleteCallback)
+ : RecordStore(ns),
+ _isCapped(isCapped),
+ _cappedMaxSize(cappedMaxSize),
+ _cappedMaxDocs(cappedMaxDocs),
+ _cappedDeleteCallback(cappedDeleteCallback),
+ _data(*dataInOut ? static_cast<Data*>(dataInOut->get())
+ : new Data(NamespaceString::oplog(ns))) {
+ if (!*dataInOut) {
+ dataInOut->reset(_data); // takes ownership
}
- bool InMemoryRecordStore::cappedAndNeedDelete(OperationContext* txn) const {
- if (!_isCapped)
- return false;
+ if (_isCapped) {
+ invariant(_cappedMaxSize > 0);
+ invariant(_cappedMaxDocs == -1 || _cappedMaxDocs > 0);
+ } else {
+ invariant(_cappedMaxSize == -1);
+ invariant(_cappedMaxDocs == -1);
+ }
+}
- if (_data->dataSize > _cappedMaxSize)
- return true;
+const char* InMemoryRecordStore::name() const {
+ return "InMemory";
+}
- if ((_cappedMaxDocs != -1) && (numRecords(txn) > _cappedMaxDocs))
- return true;
+RecordData InMemoryRecordStore::dataFor(OperationContext* txn, const RecordId& loc) const {
+ return recordFor(loc)->toRecordData();
+}
+const InMemoryRecordStore::InMemoryRecord* InMemoryRecordStore::recordFor(
+ const RecordId& loc) const {
+ Records::const_iterator it = _data->records.find(loc);
+ if (it == _data->records.end()) {
+ error() << "InMemoryRecordStore::recordFor cannot find record for " << ns() << ":" << loc;
+ }
+ invariant(it != _data->records.end());
+ return &it->second;
+}
+
+InMemoryRecordStore::InMemoryRecord* InMemoryRecordStore::recordFor(const RecordId& loc) {
+ Records::iterator it = _data->records.find(loc);
+ if (it == _data->records.end()) {
+ error() << "InMemoryRecordStore::recordFor cannot find record for " << ns() << ":" << loc;
+ }
+ invariant(it != _data->records.end());
+ return &it->second;
+}
+
+bool InMemoryRecordStore::findRecord(OperationContext* txn,
+ const RecordId& loc,
+ RecordData* rd) const {
+ Records::const_iterator it = _data->records.find(loc);
+ if (it == _data->records.end()) {
return false;
}
+ *rd = it->second.toRecordData();
+ return true;
+}
+
+void InMemoryRecordStore::deleteRecord(OperationContext* txn, const RecordId& loc) {
+ InMemoryRecord* rec = recordFor(loc);
+ txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *rec));
+ _data->dataSize -= rec->size;
+ invariant(_data->records.erase(loc) == 1);
+}
+
+bool InMemoryRecordStore::cappedAndNeedDelete(OperationContext* txn) const {
+ if (!_isCapped)
+ return false;
- void InMemoryRecordStore::cappedDeleteAsNeeded(OperationContext* txn) {
- while (cappedAndNeedDelete(txn)) {
- invariant(!_data->records.empty());
+ if (_data->dataSize > _cappedMaxSize)
+ return true;
- Records::iterator oldest = _data->records.begin();
- RecordId id = oldest->first;
- RecordData data = oldest->second.toRecordData();
+ if ((_cappedMaxDocs != -1) && (numRecords(txn) > _cappedMaxDocs))
+ return true;
- if (_cappedDeleteCallback)
- uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, id, data));
+ return false;
+}
- deleteRecord(txn, id);
- }
- }
+void InMemoryRecordStore::cappedDeleteAsNeeded(OperationContext* txn) {
+ while (cappedAndNeedDelete(txn)) {
+ invariant(!_data->records.empty());
- StatusWith<RecordId> InMemoryRecordStore::extractAndCheckLocForOplog(const char* data,
- int len) const {
- StatusWith<RecordId> status = oploghack::extractKey(data, len);
- if (!status.isOK())
- return status;
+ Records::iterator oldest = _data->records.begin();
+ RecordId id = oldest->first;
+ RecordData data = oldest->second.toRecordData();
- if (!_data->records.empty() && status.getValue() <= _data->records.rbegin()->first)
- return StatusWith<RecordId>(ErrorCodes::BadValue, "ts not higher than highest");
+ if (_cappedDeleteCallback)
+ uassertStatusOK(_cappedDeleteCallback->aboutToDeleteCapped(txn, id, data));
- return status;
+ deleteRecord(txn, id);
}
+}
- StatusWith<RecordId> InMemoryRecordStore::insertRecord(OperationContext* txn,
- const char* data,
- int len,
- bool enforceQuota) {
- if (_isCapped && len > _cappedMaxSize) {
- // We use dataSize for capped rollover and we don't want to delete everything if we know
- // this won't fit.
- return StatusWith<RecordId>(ErrorCodes::BadValue,
- "object to insert exceeds cappedMaxSize");
- }
-
- InMemoryRecord rec(len);
- memcpy(rec.data.get(), data, len);
-
- RecordId loc;
- if (_data->isOplog) {
- StatusWith<RecordId> status = extractAndCheckLocForOplog(data, len);
- if (!status.isOK())
- return status;
- loc = status.getValue();
- }
- else {
- loc = allocateLoc();
- }
+StatusWith<RecordId> InMemoryRecordStore::extractAndCheckLocForOplog(const char* data,
+ int len) const {
+ StatusWith<RecordId> status = oploghack::extractKey(data, len);
+ if (!status.isOK())
+ return status;
- txn->recoveryUnit()->registerChange(new InsertChange(_data, loc));
- _data->dataSize += len;
- _data->records[loc] = rec;
+ if (!_data->records.empty() && status.getValue() <= _data->records.rbegin()->first)
+ return StatusWith<RecordId>(ErrorCodes::BadValue, "ts not higher than highest");
- cappedDeleteAsNeeded(txn);
+ return status;
+}
- return StatusWith<RecordId>(loc);
+StatusWith<RecordId> InMemoryRecordStore::insertRecord(OperationContext* txn,
+ const char* data,
+ int len,
+ bool enforceQuota) {
+ if (_isCapped && len > _cappedMaxSize) {
+ // We use dataSize for capped rollover and we don't want to delete everything if we know
+ // this won't fit.
+ return StatusWith<RecordId>(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");
}
- StatusWith<RecordId> InMemoryRecordStore::insertRecord(OperationContext* txn,
- const DocWriter* doc,
- bool enforceQuota) {
- const int len = doc->documentSize();
- if (_isCapped && len > _cappedMaxSize) {
- // We use dataSize for capped rollover and we don't want to delete everything if we know
- // this won't fit.
- return StatusWith<RecordId>(ErrorCodes::BadValue,
- "object to insert exceeds cappedMaxSize");
- }
-
- InMemoryRecord rec(len);
- doc->writeDocument(rec.data.get());
-
- RecordId loc;
- if (_data->isOplog) {
- StatusWith<RecordId> status = extractAndCheckLocForOplog(rec.data.get(), len);
- if (!status.isOK())
- return status;
- loc = status.getValue();
- }
- else {
- loc = allocateLoc();
- }
+ InMemoryRecord rec(len);
+ memcpy(rec.data.get(), data, len);
- txn->recoveryUnit()->registerChange(new InsertChange(_data, loc));
- _data->dataSize += len;
- _data->records[loc] = rec;
-
- cappedDeleteAsNeeded(txn);
-
- return StatusWith<RecordId>(loc);
+ RecordId loc;
+ if (_data->isOplog) {
+ StatusWith<RecordId> status = extractAndCheckLocForOplog(data, len);
+ if (!status.isOK())
+ return status;
+ loc = status.getValue();
+ } else {
+ loc = allocateLoc();
}
- StatusWith<RecordId> InMemoryRecordStore::updateRecord(OperationContext* txn,
- const RecordId& loc,
- const char* data,
- int len,
- bool enforceQuota,
- UpdateNotifier* notifier ) {
- InMemoryRecord* oldRecord = recordFor( loc );
- int oldLen = oldRecord->size;
-
- if (_isCapped && len > oldLen) {
- return StatusWith<RecordId>( ErrorCodes::InternalError,
- "failing update: objects in a capped ns cannot grow",
- 10003 );
- }
-
- if (notifier) {
- // The in-memory KV engine uses the invalidation framework (does not support
- // doc-locking), and therefore must notify that it is updating a document.
- Status callbackStatus = notifier->recordStoreGoingToUpdateInPlace(txn, loc);
- if (!callbackStatus.isOK()) {
- return StatusWith<RecordId>(callbackStatus);
- }
- }
+ txn->recoveryUnit()->registerChange(new InsertChange(_data, loc));
+ _data->dataSize += len;
+ _data->records[loc] = rec;
- InMemoryRecord newRecord(len);
- memcpy(newRecord.data.get(), data, len);
+ cappedDeleteAsNeeded(txn);
- txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *oldRecord));
- _data->dataSize += len - oldLen;
- *oldRecord = newRecord;
+ return StatusWith<RecordId>(loc);
+}
- cappedDeleteAsNeeded(txn);
-
- return StatusWith<RecordId>(loc);
+StatusWith<RecordId> InMemoryRecordStore::insertRecord(OperationContext* txn,
+ const DocWriter* doc,
+ bool enforceQuota) {
+ const int len = doc->documentSize();
+ if (_isCapped && len > _cappedMaxSize) {
+ // We use dataSize for capped rollover and we don't want to delete everything if we know
+ // this won't fit.
+ return StatusWith<RecordId>(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");
}
- bool InMemoryRecordStore::updateWithDamagesSupported() const {
- // TODO: Currently the UpdateStage assumes that updateWithDamages will apply the
- // damages directly to the unowned BSONObj containing the record to be modified.
- // The implementation of updateWithDamages() below copies the old record to a
- // a new one and then applies the damages.
- //
- // We should be able to enable updateWithDamages() here once this assumption is
- // relaxed.
- return false;
- }
+ InMemoryRecord rec(len);
+ doc->writeDocument(rec.data.get());
- Status InMemoryRecordStore::updateWithDamages( OperationContext* txn,
- const RecordId& loc,
- const RecordData& oldRec,
- const char* damageSource,
- const mutablebson::DamageVector& damages ) {
- InMemoryRecord* oldRecord = recordFor( loc );
- const int len = oldRecord->size;
-
- InMemoryRecord newRecord(len);
- memcpy(newRecord.data.get(), oldRecord->data.get(), len);
-
- txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *oldRecord));
- *oldRecord = newRecord;
-
- cappedDeleteAsNeeded(txn);
-
- char* root = newRecord.data.get();
- mutablebson::DamageVector::const_iterator where = damages.begin();
- const mutablebson::DamageVector::const_iterator end = damages.end();
- for( ; where != end; ++where ) {
- const char* sourcePtr = damageSource + where->sourceOffset;
- char* targetPtr = root + where->targetOffset;
- std::memcpy(targetPtr, sourcePtr, where->size);
- }
+ RecordId loc;
+ if (_data->isOplog) {
+ StatusWith<RecordId> status = extractAndCheckLocForOplog(rec.data.get(), len);
+ if (!status.isOK())
+ return status;
+ loc = status.getValue();
+ } else {
+ loc = allocateLoc();
+ }
- *oldRecord = newRecord;
+ txn->recoveryUnit()->registerChange(new InsertChange(_data, loc));
+ _data->dataSize += len;
+ _data->records[loc] = rec;
- return Status::OK();
- }
+ cappedDeleteAsNeeded(txn);
- std::unique_ptr<RecordCursor> InMemoryRecordStore::getCursor(OperationContext* txn,
- bool forward) const {
+ return StatusWith<RecordId>(loc);
+}
- if (forward) return stdx::make_unique<Cursor>(txn, *this);
- return stdx::make_unique<ReverseCursor>(txn, *this);
- }
+StatusWith<RecordId> InMemoryRecordStore::updateRecord(OperationContext* txn,
+ const RecordId& loc,
+ const char* data,
+ int len,
+ bool enforceQuota,
+ UpdateNotifier* notifier) {
+ InMemoryRecord* oldRecord = recordFor(loc);
+ int oldLen = oldRecord->size;
- Status InMemoryRecordStore::truncate(OperationContext* txn) {
- // Unlike other changes, TruncateChange mutates _data on construction to perform the
- // truncate
- txn->recoveryUnit()->registerChange(new TruncateChange(_data));
- return Status::OK();
+ if (_isCapped && len > oldLen) {
+ return StatusWith<RecordId>(
+ ErrorCodes::InternalError, "failing update: objects in a capped ns cannot grow", 10003);
}
- void InMemoryRecordStore::temp_cappedTruncateAfter(OperationContext* txn,
- RecordId end,
- bool inclusive) {
- Records::iterator it = inclusive ? _data->records.lower_bound(end)
- : _data->records.upper_bound(end);
- while(it != _data->records.end()) {
- txn->recoveryUnit()->registerChange(new RemoveChange(_data, it->first, it->second));
- _data->dataSize -= it->second.size;
- _data->records.erase(it++);
+ if (notifier) {
+ // The in-memory KV engine uses the invalidation framework (does not support
+ // doc-locking), and therefore must notify that it is updating a document.
+ Status callbackStatus = notifier->recordStoreGoingToUpdateInPlace(txn, loc);
+ if (!callbackStatus.isOK()) {
+ return StatusWith<RecordId>(callbackStatus);
}
}
- Status InMemoryRecordStore::validate(OperationContext* txn,
- bool full,
- bool scanData,
- ValidateAdaptor* adaptor,
- ValidateResults* results,
- BSONObjBuilder* output) {
- results->valid = true;
- if (scanData && full) {
- for (Records::const_iterator it = _data->records.begin();
- it != _data->records.end(); ++it) {
- const InMemoryRecord& rec = it->second;
- size_t dataSize;
- const Status status = adaptor->validate(rec.toRecordData(), &dataSize);
- if (!status.isOK()) {
- results->valid = false;
- results->errors.push_back("invalid object detected (see logs)");
- log() << "Invalid object detected in " << _ns << ": " << status.reason();
- }
- }
- }
+ InMemoryRecord newRecord(len);
+ memcpy(newRecord.data.get(), data, len);
- output->appendNumber( "nrecords", _data->records.size() );
+ txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *oldRecord));
+ _data->dataSize += len - oldLen;
+ *oldRecord = newRecord;
- return Status::OK();
+ cappedDeleteAsNeeded(txn);
- }
+ return StatusWith<RecordId>(loc);
+}
- void InMemoryRecordStore::appendCustomStats( OperationContext* txn,
- BSONObjBuilder* result,
- double scale ) const {
- result->appendBool( "capped", _isCapped );
- if ( _isCapped ) {
- result->appendIntOrLL( "max", _cappedMaxDocs );
- result->appendIntOrLL( "maxSize", _cappedMaxSize / scale );
- }
+bool InMemoryRecordStore::updateWithDamagesSupported() const {
+ // TODO: Currently the UpdateStage assumes that updateWithDamages will apply the
+ // damages directly to the unowned BSONObj containing the record to be modified.
+ // The implementation of updateWithDamages() below copies the old record to a
+ // a new one and then applies the damages.
+ //
+ // We should be able to enable updateWithDamages() here once this assumption is
+ // relaxed.
+ return false;
+}
+
+Status InMemoryRecordStore::updateWithDamages(OperationContext* txn,
+ const RecordId& loc,
+ const RecordData& oldRec,
+ const char* damageSource,
+ const mutablebson::DamageVector& damages) {
+ InMemoryRecord* oldRecord = recordFor(loc);
+ const int len = oldRecord->size;
+
+ InMemoryRecord newRecord(len);
+ memcpy(newRecord.data.get(), oldRecord->data.get(), len);
+
+ txn->recoveryUnit()->registerChange(new RemoveChange(_data, loc, *oldRecord));
+ *oldRecord = newRecord;
+
+ cappedDeleteAsNeeded(txn);
+
+ char* root = newRecord.data.get();
+ mutablebson::DamageVector::const_iterator where = damages.begin();
+ const mutablebson::DamageVector::const_iterator end = damages.end();
+ for (; where != end; ++where) {
+ const char* sourcePtr = damageSource + where->sourceOffset;
+ char* targetPtr = root + where->targetOffset;
+ std::memcpy(targetPtr, sourcePtr, where->size);
}
- Status InMemoryRecordStore::touch(OperationContext* txn, BSONObjBuilder* output) const {
- if (output) {
- output->append("numRanges", 1);
- output->append("millis", 0);
+ *oldRecord = newRecord;
+
+ return Status::OK();
+}
+
+std::unique_ptr<RecordCursor> InMemoryRecordStore::getCursor(OperationContext* txn,
+ bool forward) const {
+ if (forward)
+ return stdx::make_unique<Cursor>(txn, *this);
+ return stdx::make_unique<ReverseCursor>(txn, *this);
+}
+
+Status InMemoryRecordStore::truncate(OperationContext* txn) {
+ // Unlike other changes, TruncateChange mutates _data on construction to perform the
+ // truncate
+ txn->recoveryUnit()->registerChange(new TruncateChange(_data));
+ return Status::OK();
+}
+
+void InMemoryRecordStore::temp_cappedTruncateAfter(OperationContext* txn,
+ RecordId end,
+ bool inclusive) {
+ Records::iterator it =
+ inclusive ? _data->records.lower_bound(end) : _data->records.upper_bound(end);
+ while (it != _data->records.end()) {
+ txn->recoveryUnit()->registerChange(new RemoveChange(_data, it->first, it->second));
+ _data->dataSize -= it->second.size;
+ _data->records.erase(it++);
+ }
+}
+
+Status InMemoryRecordStore::validate(OperationContext* txn,
+ bool full,
+ bool scanData,
+ ValidateAdaptor* adaptor,
+ ValidateResults* results,
+ BSONObjBuilder* output) {
+ results->valid = true;
+ if (scanData && full) {
+ for (Records::const_iterator it = _data->records.begin(); it != _data->records.end();
+ ++it) {
+ const InMemoryRecord& rec = it->second;
+ size_t dataSize;
+ const Status status = adaptor->validate(rec.toRecordData(), &dataSize);
+ if (!status.isOK()) {
+ results->valid = false;
+ results->errors.push_back("invalid object detected (see logs)");
+ log() << "Invalid object detected in " << _ns << ": " << status.reason();
+ }
}
- return Status::OK();
}
- void InMemoryRecordStore::increaseStorageSize(OperationContext* txn,
- int size, bool enforceQuota) {
- // unclear what this would mean for this class. For now, just error if called.
- invariant(!"increaseStorageSize not yet implemented");
- }
+ output->appendNumber("nrecords", _data->records.size());
- int64_t InMemoryRecordStore::storageSize(OperationContext* txn,
- BSONObjBuilder* extraInfo,
- int infoLevel) const {
- // Note: not making use of extraInfo or infoLevel since we don't have extents
- const int64_t recordOverhead = numRecords(txn) * sizeof(InMemoryRecord);
- return _data->dataSize + recordOverhead;
- }
+ return Status::OK();
+}
- RecordId InMemoryRecordStore::allocateLoc() {
- RecordId out = RecordId(_data->nextId++);
- invariant(out < RecordId::max());
- return out;
+void InMemoryRecordStore::appendCustomStats(OperationContext* txn,
+ BSONObjBuilder* result,
+ double scale) const {
+ result->appendBool("capped", _isCapped);
+ if (_isCapped) {
+ result->appendIntOrLL("max", _cappedMaxDocs);
+ result->appendIntOrLL("maxSize", _cappedMaxSize / scale);
}
+}
- boost::optional<RecordId> InMemoryRecordStore::oplogStartHack(
- OperationContext* txn,
- const RecordId& startingPosition) const {
-
- if (!_data->isOplog)
- return boost::none;
-
- const Records& records = _data->records;
-
- if (records.empty())
- return RecordId();
-
- Records::const_iterator it = records.lower_bound(startingPosition);
- if (it == records.end() || it->first > startingPosition)
- --it;
-
- return it->first;
+Status InMemoryRecordStore::touch(OperationContext* txn, BSONObjBuilder* output) const {
+ if (output) {
+ output->append("numRanges", 1);
+ output->append("millis", 0);
}
-
-} // namespace mongo
+ return Status::OK();
+}
+
+void InMemoryRecordStore::increaseStorageSize(OperationContext* txn, int size, bool enforceQuota) {
+ // unclear what this would mean for this class. For now, just error if called.
+ invariant(!"increaseStorageSize not yet implemented");
+}
+
+int64_t InMemoryRecordStore::storageSize(OperationContext* txn,
+ BSONObjBuilder* extraInfo,
+ int infoLevel) const {
+ // Note: not making use of extraInfo or infoLevel since we don't have extents
+ const int64_t recordOverhead = numRecords(txn) * sizeof(InMemoryRecord);
+ return _data->dataSize + recordOverhead;
+}
+
+RecordId InMemoryRecordStore::allocateLoc() {
+ RecordId out = RecordId(_data->nextId++);
+ invariant(out < RecordId::max());
+ return out;
+}
+
+boost::optional<RecordId> InMemoryRecordStore::oplogStartHack(
+ OperationContext* txn, const RecordId& startingPosition) const {
+ if (!_data->isOplog)
+ return boost::none;
+
+ const Records& records = _data->records;
+
+ if (records.empty())
+ return RecordId();
+
+ Records::const_iterator it = records.lower_bound(startingPosition);
+ if (it == records.end() || it->first > startingPosition)
+ --it;
+
+ return it->first;
+}
+
+} // namespace mongo