summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage/mobile/mobile_record_store.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/storage/mobile/mobile_record_store.cpp')
-rw-r--r--src/mongo/db/storage/mobile/mobile_record_store.cpp773
1 files changed, 773 insertions, 0 deletions
diff --git a/src/mongo/db/storage/mobile/mobile_record_store.cpp b/src/mongo/db/storage/mobile/mobile_record_store.cpp
new file mode 100644
index 00000000000..01c8422ce3f
--- /dev/null
+++ b/src/mongo/db/storage/mobile/mobile_record_store.cpp
@@ -0,0 +1,773 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/storage/mobile/mobile_record_store.h"
+
+#include <sqlite3.h>
+
+#include "mongo/base/static_assert.h"
+#include "mongo/db/catalog/collection_options.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/storage/mobile/mobile_recovery_unit.h"
+#include "mongo/db/storage/mobile/mobile_session.h"
+#include "mongo/db/storage/mobile/mobile_sqlite_statement.h"
+#include "mongo/db/storage/mobile/mobile_util.h"
+#include "mongo/db/storage/oplog_hack.h"
+#include "mongo/db/storage/recovery_unit.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+
+class MobileRecordStore::Cursor final : public SeekableRecordCursor {
+public:
+ Cursor(OperationContext* opCtx,
+ const MobileRecordStore& rs,
+ const std::string& path,
+ const std::string& ident,
+ bool forward)
+ : _opCtx(opCtx), _isCapped(rs.isCapped()), _forward(forward) {
+
+ str::stream cursorQuery;
+ cursorQuery << "SELECT rec_id, data from \"" << ident << "\" "
+ << "WHERE rec_id " << (forward ? '>' : '<') << " ? "
+ << "ORDER BY rec_id " << (forward ? "ASC" : "DESC") << ';';
+
+ MobileSession* session = MobileRecoveryUnit::get(_opCtx)->getSession(_opCtx);
+ _stmt = stdx::make_unique<SqliteStatement>(*session, cursorQuery);
+
+ _startIdNum = (forward ? RecordId::min().repr() : RecordId::max().repr());
+ _savedId = RecordId(_startIdNum);
+
+ _stmt->bindInt(0, _savedId.repr());
+ }
+
+ boost::optional<Record> next() final {
+ if (_eof) {
+ return {};
+ }
+
+ int status = _stmt->step();
+ // Reached end of result rows.
+ if (status == SQLITE_DONE) {
+ _eof = true;
+ _savedId = RecordId(_startIdNum);
+ return {};
+ }
+
+ // Checks no error was thrown and that step retrieved a row.
+ checkStatus(status, SQLITE_ROW, "_stmt->step() in MobileCursor's next");
+
+ long long recId = _stmt->getColInt(0);
+ const void* data = _stmt->getColBlob(1);
+ int64_t dataSize = _stmt->getColBytes(1);
+
+ _savedId = RecordId(recId);
+ // The data returned from sqlite3_column_blob is only valid until the next call to
+ // sqlite3_step. Using getOwned copies the buffer so the data is not invalidated.
+ return {{_savedId, RecordData(static_cast<const char*>(data), dataSize).getOwned()}};
+ }
+
+ boost::optional<Record> seekExact(const RecordId& id) final {
+ // Set the saved position and use save/restore to reprepare the SQL statement so that
+ // the cursor restarts at the parameter id.
+ int decr = (_forward ? -1 : 1);
+ _savedId = RecordId(id.repr() + decr);
+ _eof = false;
+
+ save();
+ restore();
+
+ boost::optional<Record> rec = next();
+ return rec;
+ }
+
+ void save() final {
+ _resetStatement();
+ }
+
+ void saveUnpositioned() final {
+ save();
+ _savedId = RecordId(_startIdNum);
+ }
+
+ bool restore() final {
+ if (_eof) {
+ return true;
+ }
+
+ bool isValid = true;
+
+ if (_isCapped) {
+ // Check that the cursor's saved position still exists. If not, it has been invalidated
+ // by capped deletion.
+ RecordId oldPosition = _savedId;
+ int decr = _forward ? -1 : 1;
+ if (_savedId == RecordId::min() || _savedId == RecordId::max()) {
+ decr = 0;
+ }
+
+ _resetStatement();
+ _stmt->bindInt(0, static_cast<int64_t>(_savedId.repr()) + decr);
+ boost::optional<Record> newPosition = next();
+
+ if (!newPosition || newPosition->id != oldPosition) {
+ // The cursor's current position was invalidated.
+ isValid = false;
+ }
+
+ _savedId = oldPosition;
+ _eof = false;
+ }
+
+ _resetStatement();
+ _stmt->bindInt(0, _savedId.repr());
+
+ return isValid;
+ }
+
+ void detachFromOperationContext() final {
+ _opCtx = nullptr;
+ }
+
+ void reattachToOperationContext(OperationContext* opCtx) final {
+ _opCtx = opCtx;
+ }
+
+private:
+ /**
+ * Resets the prepared statement.
+ */
+ void _resetStatement() {
+ _stmt->reset();
+ }
+
+ OperationContext* _opCtx;
+ std::unique_ptr<SqliteStatement> _stmt;
+
+ bool _eof = false;
+
+ // Saved location for restoring. RecordId(0) means EOF.
+ RecordId _savedId;
+
+ // Default start ID number that is specific to cursor direction.
+ int64_t _startIdNum;
+
+ const bool _isCapped;
+ const bool _forward;
+};
+
+MobileRecordStore::MobileRecordStore(OperationContext* opCtx,
+ StringData ns,
+ const std::string& path,
+ const std::string& ident,
+ const CollectionOptions& options)
+ : RecordStore(ns),
+ _path(path),
+ _ident(ident),
+ _isOplog(NamespaceString::oplog(ns)),
+ _isCapped(options.capped),
+ _cappedMaxSize(options.cappedSize > 4096 ? options.cappedSize : 4096),
+ _cappedMaxDocs(options.cappedMaxDocs) {
+
+ // Mobile SE doesn't support creating an oplog, assert now
+ massert(ErrorCodes::IllegalOperation,
+ "Replication is not supported by the mobile storage engine",
+ !_isOplog);
+
+ // Determines the nextId to be used for a new record.
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSession(opCtx);
+ std::string maxRecIdQuery = "SELECT IFNULL(MAX(rec_id), 0) FROM \"" + _ident + "\";";
+ SqliteStatement maxRecIdStmt(*session, maxRecIdQuery);
+
+ maxRecIdStmt.step(SQLITE_ROW);
+
+ long long nextId = maxRecIdStmt.getColInt(0);
+ _nextIdNum.store(nextId + 1);
+}
+
+const char* MobileRecordStore::name() const {
+ return "Mobile";
+}
+
+void MobileRecordStore::_initDataSizeIfNeeded_inlock(OperationContext* opCtx) const {
+ if (_isDataSizeInitialized) {
+ return;
+ }
+
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSession(opCtx);
+ std::string dataSizeQuery = "SELECT IFNULL(SUM(LENGTH(data)), 0) FROM \"" + _ident + "\";";
+ SqliteStatement dataSizeStmt(*session, dataSizeQuery);
+
+ dataSizeStmt.step(SQLITE_ROW);
+ int64_t dataSize = dataSizeStmt.getColInt(0);
+
+ _dataSize = dataSize;
+ _isDataSizeInitialized = true;
+}
+
+long long MobileRecordStore::dataSize(OperationContext* opCtx) const {
+ stdx::lock_guard<stdx::mutex> lock(_dataSizeMutex);
+ _initDataSizeIfNeeded_inlock(opCtx);
+ return _dataSize;
+}
+
+void MobileRecordStore::_initNumRecsIfNeeded_inlock(OperationContext* opCtx) const {
+ if (_isNumRecsInitialized) {
+ return;
+ }
+
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSession(opCtx);
+ std::string numRecordsQuery = "SELECT COUNT(*) FROM \"" + _ident + "\";";
+ SqliteStatement numRecordsStmt(*session, numRecordsQuery);
+
+ numRecordsStmt.step(SQLITE_ROW);
+
+ int64_t numRecs = numRecordsStmt.getColInt(0);
+
+ _numRecs = numRecs;
+ _isNumRecsInitialized = true;
+}
+
+long long MobileRecordStore::numRecords(OperationContext* opCtx) const {
+ stdx::lock_guard<stdx::mutex> lock(_numRecsMutex);
+ _initNumRecsIfNeeded_inlock(opCtx);
+ return _numRecs;
+}
+
+RecordData MobileRecordStore::dataFor(OperationContext* opCtx, const RecordId& recId) const {
+ RecordData recData;
+ bool recFound = findRecord(opCtx, recId, &recData);
+ invariant(recFound);
+ return recData;
+}
+
+bool MobileRecordStore::findRecord(OperationContext* opCtx,
+ const RecordId& recId,
+ RecordData* rd) const {
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSession(opCtx);
+ std::string sqlQuery = "SELECT data FROM \"" + _ident + "\" WHERE rec_id = ?;";
+ SqliteStatement stmt(*session, sqlQuery);
+
+ stmt.bindInt(0, recId.repr());
+
+ int status = stmt.step();
+ if (status == SQLITE_DONE) {
+ return false;
+ }
+ checkStatus(status, SQLITE_ROW, "sqlite3_step");
+
+ const void* recData = stmt.getColBlob(0);
+ int nBytes = stmt.getColBytes(0);
+ *rd = RecordData(static_cast<const char*>(recData), nBytes).getOwned();
+ return true;
+}
+
+void MobileRecordStore::deleteRecord(OperationContext* opCtx, const RecordId& recId) {
+ invariant(!_isCapped);
+
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSession(opCtx);
+ std::string dataSizeQuery =
+ "SELECT IFNULL(LENGTH(data), 0) FROM \"" + _ident + "\" WHERE rec_id = ?;";
+ SqliteStatement dataSizeStmt(*session, dataSizeQuery);
+ dataSizeStmt.bindInt(0, recId.repr());
+ dataSizeStmt.step(SQLITE_ROW);
+
+ int64_t dataSizeBefore = dataSizeStmt.getColInt(0);
+ _changeNumRecs(opCtx, -1);
+ _changeDataSize(opCtx, -dataSizeBefore);
+
+ std::string deleteQuery = "DELETE FROM \"" + _ident + "\" WHERE rec_id = ?;";
+ SqliteStatement deleteStmt(*session, deleteQuery);
+ deleteStmt.bindInt(0, recId.repr());
+ deleteStmt.step(SQLITE_DONE);
+}
+
+bool MobileRecordStore::_isCappedAndNeedsDelete(int64_t numRecs, int64_t numBytes) {
+ if (!_isCapped) {
+ return false;
+ }
+
+ return numBytes > _cappedMaxSize || (_cappedMaxDocs > 0 && numRecs > _cappedMaxDocs);
+}
+
+void MobileRecordStore::_notifyCappedCallbackIfNeeded_inlock(OperationContext* opCtx,
+ RecordId recId,
+ const RecordData& recData) {
+ if (!_cappedCallback) {
+ return;
+ }
+
+ uassertStatusOK(_cappedCallback->aboutToDeleteCapped(opCtx, recId, recData));
+}
+
+void MobileRecordStore::_doCappedDelete(OperationContext* opCtx,
+ SqliteStatement& stmt,
+ const std::string& direction,
+ int64_t startRecId) {
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSession(opCtx);
+
+ bool isStartRecIdKnown = startRecId;
+
+ int64_t numRecs = numRecords(opCtx), numBytes = dataSize(opCtx);
+ int64_t numRecsRemoved = 0, numBytesRemoved = 0;
+ {
+ stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex);
+
+ // Update how many records and how many bytes of data are about to be removed. Notify the
+ // capped callback of each record that will be deleted.
+ while ((isStartRecIdKnown && stmt.step() == SQLITE_ROW) ||
+ (!isStartRecIdKnown &&
+ _isCappedAndNeedsDelete(numRecs - numRecsRemoved, numBytes - numBytesRemoved) &&
+ stmt.step() == SQLITE_ROW)) {
+ int64_t id = stmt.getColInt(0);
+ if (!isStartRecIdKnown) {
+ startRecId = id;
+ }
+
+ const void* data = stmt.getColBlob(1);
+ int64_t size = stmt.getColBytes(1);
+
+ numRecsRemoved++;
+ numBytesRemoved += size;
+
+ _notifyCappedCallbackIfNeeded_inlock(
+ opCtx, RecordId(id), RecordData(static_cast<const char*>(data), size));
+ }
+ }
+
+ WriteUnitOfWork wuow(opCtx);
+
+ _changeNumRecs(opCtx, -numRecsRemoved);
+ _changeDataSize(opCtx, -numBytesRemoved);
+
+ str::stream cappedDeleteQuery;
+ cappedDeleteQuery << "DELETE FROM \"" << _ident << "\" WHERE rec_id " << direction << " ?;";
+ SqliteStatement cappedDeleteStmt(*session, cappedDeleteQuery);
+ cappedDeleteStmt.bindInt(0, startRecId);
+ cappedDeleteStmt.step(SQLITE_DONE);
+
+ wuow.commit();
+}
+
+void MobileRecordStore::_cappedDeleteIfNeeded(OperationContext* opCtx) {
+ if (!_isCappedAndNeedsDelete(numRecords(opCtx), dataSize(opCtx))) {
+ return;
+ }
+
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSession(opCtx);
+
+ std::string recordRemovalQuery =
+ "SELECT rec_id, data FROM \"" + _ident + "\" ORDER BY rec_id ASC;";
+ SqliteStatement recordRemovalStmt(*session, recordRemovalQuery);
+
+ _doCappedDelete(opCtx, recordRemovalStmt, "<=");
+}
+
+StatusWith<RecordId> MobileRecordStore::insertRecord(
+ OperationContext* opCtx, const char* data, int len, Timestamp, bool enforceQuota) {
+ // Inserts record into SQLite table (or replaces if duplicate record id).
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSession(opCtx);
+
+ if (_isCapped && len > _cappedMaxSize) {
+ return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize");
+ }
+
+ _changeNumRecs(opCtx, 1);
+ _changeDataSize(opCtx, len);
+
+ std::string insertQuery =
+ "INSERT OR REPLACE INTO \"" + _ident + "\"(rec_id, data) VALUES(?, ?);";
+ SqliteStatement insertStmt(*session, insertQuery);
+ RecordId recId = _nextId();
+ insertStmt.bindInt(0, recId.repr());
+ insertStmt.bindBlob(1, data, len);
+ insertStmt.step(SQLITE_DONE);
+
+ _cappedDeleteIfNeeded(opCtx);
+
+ return StatusWith<RecordId>(recId);
+}
+
+Status MobileRecordStore::insertRecordsWithDocWriter(OperationContext* opCtx,
+ const DocWriter* const* docs,
+ const Timestamp* timestamps,
+ size_t nDocs,
+ RecordId* idsOut) {
+ // Calculates the total size of the data buffer.
+ size_t totalSize = 0;
+ for (size_t i = 0; i < nDocs; i++) {
+ totalSize += docs[i]->documentSize();
+ }
+
+ std::unique_ptr<char[]> buffer(new char[totalSize]);
+ char* pos = buffer.get();
+ for (size_t i = 0; i < nDocs; i++) {
+ docs[i]->writeDocument(pos);
+ size_t docLen = docs[i]->documentSize();
+ StatusWith<RecordId> res = insertRecord(opCtx, pos, docLen, timestamps[i], true);
+ idsOut[i] = res.getValue();
+ pos += docLen;
+ }
+
+ return Status::OK();
+}
+
+Status MobileRecordStore::updateRecord(OperationContext* opCtx,
+ const RecordId& recId,
+ const char* data,
+ int len,
+ bool enforceQuota,
+ UpdateNotifier* notifier) {
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSession(opCtx);
+ std::string dataSizeQuery =
+ "SELECT IFNULL(LENGTH(data), 0) FROM \"" + _ident + "\" WHERE rec_id = ?;";
+ SqliteStatement dataSizeStmt(*session, dataSizeQuery);
+ dataSizeStmt.bindInt(0, recId.repr());
+ dataSizeStmt.step(SQLITE_ROW);
+
+ int64_t dataSizeBefore = dataSizeStmt.getColInt(0);
+ if (_isCapped && dataSizeBefore != len) {
+ return Status(ErrorCodes::IllegalOperation, "cannot change the size of a document");
+ }
+ _changeDataSize(opCtx, -dataSizeBefore + len);
+
+ if (notifier) {
+ fassertStatusOK(37054, notifier->recordStoreGoingToUpdateInPlace(opCtx, recId));
+ }
+
+ std::string updateQuery = "UPDATE \"" + _ident + "\" SET data = ? " + "WHERE rec_id = ?;";
+ SqliteStatement updateStmt(*session, updateQuery);
+ updateStmt.bindBlob(0, data, len);
+ updateStmt.bindInt(1, recId.repr());
+ updateStmt.step(SQLITE_DONE);
+
+ return Status::OK();
+}
+
+bool MobileRecordStore::updateWithDamagesSupported() const {
+ return false;
+}
+
+StatusWith<RecordData> MobileRecordStore::updateWithDamages(
+ OperationContext* opCtx,
+ const RecordId& recId,
+ const RecordData& oldRec,
+ const char* damageSource,
+ const mutablebson::DamageVector& damages) {
+ return RecordData();
+}
+
+std::unique_ptr<SeekableRecordCursor> MobileRecordStore::getCursor(OperationContext* opCtx,
+ bool forward) const {
+ return stdx::make_unique<Cursor>(opCtx, *this, _path, _ident, forward);
+}
+
+/**
+ * SQLite does not directly support truncate. The SQLite documentation recommends dropping then
+ * recreating the table rather than deleting all the contents of a table.
+ */
+Status MobileRecordStore::truncate(OperationContext* opCtx) {
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSession(opCtx);
+
+ int64_t numRecsBefore = numRecords(opCtx);
+ _changeNumRecs(opCtx, -numRecsBefore);
+ int64_t dataSizeBefore = dataSize(opCtx);
+ _changeDataSize(opCtx, -dataSizeBefore);
+
+ std::string dropQuery = "DROP TABLE \"" + _ident + "\";";
+ SqliteStatement::execQuery(session, dropQuery);
+ MobileRecordStore::create(opCtx, _ident);
+
+ return Status::OK();
+}
+
+/**
+ * The method throws an assertion if the capped truncate results in an emptied table.
+ */
+void MobileRecordStore::cappedTruncateAfter(OperationContext* opCtx, RecordId end, bool inclusive) {
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSession(opCtx);
+
+ // Check that the table will not be empty after performing deletes.
+ str::stream recordsRemainingQuery;
+ recordsRemainingQuery << "SELECT * FROM \"" << _ident << "\" "
+ << "WHERE rec_id " << (inclusive ? "< " : "<= ") << end.repr()
+ << " LIMIT 1;";
+
+ SqliteStatement recordsRemainingStmt(*session, recordsRemainingQuery);
+ int status = recordsRemainingStmt.step();
+ if (status == SQLITE_DONE) {
+ massert(
+ 37003, str::stream() << "Cannot perform cappedTruncateAfter with end " << end, false);
+ }
+ checkStatus(status, SQLITE_ROW, "sqlite3_step");
+
+ str::stream recordsRemovedQuery;
+ recordsRemovedQuery << "SELECT rec_id, data FROM \"" << _ident << "\" "
+ << "WHERE rec_id " << (inclusive ? ">= " : "> ") << end.repr();
+ SqliteStatement recordsRemovedStmt(*session, recordsRemovedQuery);
+
+ _doCappedDelete(opCtx, recordsRemovedStmt, (inclusive ? ">=" : ">"), end.repr());
+}
+
+Status MobileRecordStore::compact(OperationContext* opCtx,
+ RecordStoreCompactAdaptor* adaptor,
+ const CompactOptions* options,
+ CompactStats* stats) {
+ return Status::OK();
+}
+
+/**
+ * Note: on full validation, this validates the entire database file, not just the table used by
+ * this record store.
+ */
+Status MobileRecordStore::validate(OperationContext* opCtx,
+ ValidateCmdLevel level,
+ ValidateAdaptor* adaptor,
+ ValidateResults* results,
+ BSONObjBuilder* output) {
+ if (level == kValidateFull) {
+ doValidate(opCtx, results);
+ }
+
+ if (!results->valid) {
+ // The database was corrupt, so return without checking the table.
+ return Status::OK();
+ }
+
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSession(opCtx);
+ try {
+ std::string selectQuery = "SELECT rec_id, data FROM \"" + _ident + "\";";
+ SqliteStatement selectStmt(*session, selectQuery);
+
+ int interruptInterval = 4096;
+ long long actualNumRecs = 0;
+ long long actualDataSize = 0;
+ long long numInvalidRecs = 0;
+
+ int status;
+ while ((status = selectStmt.step()) == SQLITE_ROW) {
+ if (!(actualNumRecs % interruptInterval)) {
+ opCtx->checkForInterrupt();
+ }
+
+ long long id = selectStmt.getColInt(0);
+ const void* data = selectStmt.getColBlob(1);
+ int dataSize = selectStmt.getColBytes(1);
+
+ ++actualNumRecs;
+ actualDataSize += dataSize;
+
+ RecordId recId(id);
+ RecordData recData(reinterpret_cast<const char*>(data), dataSize);
+
+ size_t validatedSize;
+ Status status = adaptor->validate(recId, recData, &validatedSize);
+
+ if (!status.isOK() || validatedSize != static_cast<size_t>(dataSize)) {
+ if (results->valid) {
+ std::string errMsg = "detected one or more invalid documents";
+ validateLogAndAppendError(results, errMsg);
+ }
+
+ ++numInvalidRecs;
+ log() << "document at location " << recId << " is corrupted";
+ }
+ }
+
+ if (status == SQLITE_CORRUPT) {
+ uasserted(ErrorCodes::UnknownError, sqlite3_errstr(status));
+ }
+ checkStatus(status, SQLITE_DONE, "sqlite3_step");
+
+ // Verify that _numRecs and _dataSize are accurate.
+ int64_t cachedNumRecs = numRecords(opCtx);
+ if (_resetNumRecsIfNeeded(opCtx, actualNumRecs)) {
+ str::stream errMsg;
+ errMsg << "cached number of records does not match actual number of records - ";
+ errMsg << "cached number of records = " << cachedNumRecs << "; ";
+ errMsg << "actual number of records = " << actualNumRecs;
+ validateLogAndAppendError(results, errMsg);
+ }
+ int64_t cachedDataSize = dataSize(opCtx);
+ if (_resetDataSizeIfNeeded(opCtx, actualDataSize)) {
+ str::stream errMsg;
+ errMsg << "cached data size does not match actual data size - ";
+ errMsg << "cached data size = " << cachedDataSize << "; ";
+ errMsg << "actual data size = " << actualDataSize;
+ validateLogAndAppendError(results, errMsg);
+ }
+
+ if (level == kValidateFull) {
+ output->append("nInvalidDocuments", numInvalidRecs);
+ }
+ output->appendNumber("nrecords", actualNumRecs);
+
+ } catch (const DBException& e) {
+ std::string errMsg = "record store is corrupt, could not read documents - " + e.toString();
+ validateLogAndAppendError(results, errMsg);
+ }
+
+ return Status::OK();
+}
+
+void MobileRecordStore::appendCustomStats(OperationContext* opCtx,
+ BSONObjBuilder* result,
+ double scale) const {
+ result->appendBool("capped", _isCapped);
+ if (_isCapped) {
+ result->appendIntOrLL("max", _cappedMaxDocs);
+ result->appendIntOrLL("maxSize", static_cast<long long>(_cappedMaxSize / scale));
+ }
+}
+
+Status MobileRecordStore::touch(OperationContext* opCtx, BSONObjBuilder* output) const {
+ return Status(ErrorCodes::CommandNotSupported, "this storage engine does not support touch");
+}
+
+/**
+ * Note: does not accurately return the size of the table on disk. Instead, it returns the number of
+ * bytes used to store the BSON documents.
+ */
+int64_t MobileRecordStore::storageSize(OperationContext* opCtx,
+ BSONObjBuilder* extraInfo,
+ int infoLevel) const {
+ return dataSize(opCtx);
+}
+
+RecordId MobileRecordStore::_nextId() {
+ RecordId out = RecordId(_nextIdNum.fetchAndAdd(1));
+ invariant(out.isNormal());
+ return out;
+}
+
+/**
+ * Keeps track of the changes to the number of records.
+ */
+class MobileRecordStore::NumRecsChange final : public RecoveryUnit::Change {
+public:
+ NumRecsChange(MobileRecordStore* rs, int64_t diff) : _rs(rs), _diff(diff) {}
+
+ void commit() override {}
+
+ void rollback() override {
+ stdx::lock_guard<stdx::mutex> lock(_rs->_numRecsMutex);
+ _rs->_numRecs -= _diff;
+ }
+
+private:
+ MobileRecordStore* _rs;
+ int64_t _diff;
+};
+
+void MobileRecordStore::_changeNumRecs(OperationContext* opCtx, int64_t diff) {
+ stdx::lock_guard<stdx::mutex> lock(_numRecsMutex);
+ opCtx->recoveryUnit()->registerChange(new NumRecsChange(this, diff));
+ _initNumRecsIfNeeded_inlock(opCtx);
+ _numRecs += diff;
+}
+
+bool MobileRecordStore::_resetNumRecsIfNeeded(OperationContext* opCtx, int64_t newNumRecs) {
+ bool wasReset = false;
+ int64_t currNumRecs = numRecords(opCtx);
+ if (currNumRecs != newNumRecs) {
+ wasReset = true;
+ stdx::lock_guard<stdx::mutex> lock(_numRecsMutex);
+ _numRecs = newNumRecs;
+ }
+ return wasReset;
+}
+
+/**
+ * Keeps track of the total data size.
+ */
+class MobileRecordStore::DataSizeChange final : public RecoveryUnit::Change {
+public:
+ DataSizeChange(MobileRecordStore* rs, int64_t diff) : _rs(rs), _diff(diff) {}
+
+ void commit() override {}
+
+ void rollback() override {
+ stdx::lock_guard<stdx::mutex> lock(_rs->_dataSizeMutex);
+ _rs->_dataSize -= _diff;
+ }
+
+private:
+ MobileRecordStore* _rs;
+ int64_t _diff;
+};
+
+void MobileRecordStore::_changeDataSize(OperationContext* opCtx, int64_t diff) {
+ stdx::lock_guard<stdx::mutex> lock(_dataSizeMutex);
+ opCtx->recoveryUnit()->registerChange(new DataSizeChange(this, diff));
+ _initDataSizeIfNeeded_inlock(opCtx);
+ _dataSize += diff;
+}
+
+bool MobileRecordStore::_resetDataSizeIfNeeded(OperationContext* opCtx, int64_t newDataSize) {
+ bool wasReset = false;
+ int64_t currDataSize = dataSize(opCtx);
+
+ if (currDataSize != _dataSize) {
+ wasReset = true;
+ stdx::lock_guard<stdx::mutex> lock(_dataSizeMutex);
+ _dataSize = newDataSize;
+ }
+ return wasReset;
+}
+
+Status MobileRecordStore::updateCappedSize(OperationContext* opCtx, long long cappedSize) {
+ _cappedMaxSize = cappedSize;
+ return Status::OK();
+}
+
+boost::optional<RecordId> MobileRecordStore::oplogStartHack(
+ OperationContext* opCtx, const RecordId& startingPosition) const {
+ return {};
+}
+
+/**
+ * Creates a new record store within SQLite.
+ * The method is not transactional. Callers are responsible for handling transactional semantics.
+ */
+void MobileRecordStore::create(OperationContext* opCtx, const std::string& ident) {
+ MobileSession* session = MobileRecoveryUnit::get(opCtx)->getSessionNoTxn(opCtx);
+ std::string sqlQuery =
+ "CREATE TABLE IF NOT EXISTS \"" + ident + "\"(rec_id INT, data BLOB, PRIMARY KEY(rec_id));";
+ SqliteStatement::execQuery(session, sqlQuery);
+}
+
+} // namespace mongo