diff options
author | Gregory Wlodarek <gregory.wlodarek@mongodb.com> | 2019-01-02 21:38:14 -0500 |
---|---|---|
committer | Gregory Wlodarek <gregory.wlodarek@mongodb.com> | 2019-01-02 21:51:19 -0500 |
commit | a8fabf5c4e4d5e456bcfa3510618e7ce96fb2d4e (patch) | |
tree | 009f299a9364f1eae73a9d7f803c7aca0739ae39 /src/mongo/db/storage | |
parent | 1e6e1401ffe10264404c032f9efdc77d0ffcc649 (diff) | |
download | mongo-a8fabf5c4e4d5e456bcfa3510618e7ce96fb2d4e.tar.gz |
SERVER-37788 Implement oplog hack and plug in the oplog visibility manager in the Biggie record store
Diffstat (limited to 'src/mongo/db/storage')
-rw-r--r-- | src/mongo/db/storage/biggie/biggie_record_store.cpp | 141 | ||||
-rw-r--r-- | src/mongo/db/storage/biggie/biggie_record_store.h | 28 |
2 files changed, 147 insertions, 22 deletions
diff --git a/src/mongo/db/storage/biggie/biggie_record_store.cpp b/src/mongo/db/storage/biggie/biggie_record_store.cpp index d5d520858be..4ac476864ea 100644 --- a/src/mongo/db/storage/biggie/biggie_record_store.cpp +++ b/src/mongo/db/storage/biggie/biggie_record_store.cpp @@ -40,8 +40,10 @@ #include "mongo/db/operation_context.h" #include "mongo/db/storage/biggie/biggie_record_store.h" #include "mongo/db/storage/biggie/biggie_recovery_unit.h" +#include "mongo/db/storage/biggie/biggie_visibility_manager.h" #include "mongo/db/storage/biggie/store.h" #include "mongo/db/storage/key_string.h" +#include "mongo/db/storage/oplog_hack.h" #include "mongo/db/storage/write_unit_of_work.h" #include "mongo/stdx/memory.h" #include "mongo/util/hex.h" @@ -86,7 +88,9 @@ RecordStore::RecordStore(StringData ns, _ident(_identStr.data(), _identStr.size()), _prefix(createKey(_ident, std::numeric_limits<int64_t>::min())), _postfix(createKey(_ident, std::numeric_limits<int64_t>::max())), - _cappedCallback(cappedCallback) { + _cappedCallback(cappedCallback), + _isOplog(NamespaceString::oplog(ns)), + _visibilityManager(std::make_unique<VisibilityManager>(this)) { if (_isCapped) { invariant(_cappedMaxSize > 0); invariant(_cappedMaxDocs == -1 || _cappedMaxDocs > 0); @@ -171,7 +175,17 @@ Status RecordStore::insertRecords(OperationContext* opCtx, auto ru = RecoveryUnit::get(opCtx); StringStore* workingCopy(ru->getHead()); for (auto& record : *inOutRecords) { - int64_t thisRecordId = nextRecordId(); + int64_t thisRecordId = 0; + if (_isOplog) { + StatusWith<RecordId> status = + extractAndCheckLocForOplog(opCtx, record.data.data(), record.data.size()); + if (!status.isOK()) + return status.getStatus(); + thisRecordId = status.getValue().repr(); + _visibilityManager->addUncommittedRecord(opCtx, RecordId(thisRecordId)); + } else { + thisRecordId = nextRecordId(); + } workingCopy->insert(StringStore::value_type{ createKey(_ident, thisRecordId), std::string(record.data.data(), record.data.size())}); 
record.id = RecordId(thisRecordId); @@ -203,11 +217,22 @@ Status RecordStore::insertRecordsWithDocWriter(OperationContext* opCtx, for (size_t i = 0; i < nDocs; i++) { const size_t len = docs[i]->documentSize(); - int64_t thisRecordId = nextRecordId(); + std::string buf(len, '\0'); + docs[i]->writeDocument(&buf[0]); + + int64_t thisRecordId = 0; + if (_isOplog) { + StatusWith<RecordId> status = extractAndCheckLocForOplog(opCtx, buf.data(), len); + if (!status.isOK()) + return status.getStatus(); + thisRecordId = status.getValue().repr(); + _visibilityManager->addUncommittedRecord(opCtx, RecordId(thisRecordId)); + } else { + thisRecordId = nextRecordId(); + } std::string key = createKey(_ident, thisRecordId); - StringStore::value_type vt{key, std::string(len, '\0')}; - docs[i]->writeDocument(&vt.second[0]); + StringStore::value_type vt{key, buf}; workingCopy->insert(std::move(vt)); if (idsOut) idsOut[i] = RecordId(thisRecordId); @@ -253,8 +278,8 @@ StatusWith<RecordData> RecordStore::updateWithDamages(OperationContext* opCtx, std::unique_ptr<SeekableRecordCursor> RecordStore::getCursor(OperationContext* opCtx, bool forward) const { if (forward) - return std::make_unique<Cursor>(opCtx, *this); - return std::make_unique<ReverseCursor>(opCtx, *this); + return std::make_unique<Cursor>(opCtx, *this, _visibilityManager.get()); + return std::make_unique<ReverseCursor>(opCtx, *this, _visibilityManager.get()); } Status RecordStore::truncate(OperationContext* opCtx) { @@ -374,16 +399,64 @@ Status RecordStore::touch(OperationContext* opCtx, BSONObjBuilder* output) const return Status::OK(); // All data is already in 'cache'. } -void RecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const { - // Shouldn't need to do anything here as writes are visible on commit. -} - void RecordStore::updateStatsAfterRepair(OperationContext* opCtx, long long numRecords, long long dataSize) { // TODO: Implement. 
} +void RecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const { + _visibilityManager->waitForAllEarlierOplogWritesToBeVisible(opCtx); +} + +boost::optional<RecordId> RecordStore::oplogStartHack(OperationContext* opCtx, + const RecordId& startingPosition) const { + if (!_isOplog) + return boost::none; + + if (numRecords(opCtx) == 0) + return RecordId(); + + StringStore* workingCopy{RecoveryUnit::get(opCtx)->getHead()}; + + std::string key = createKey(_ident, startingPosition.repr()); + StringStore::const_reverse_iterator it(workingCopy->upper_bound(key)); + + if (it == workingCopy->rend()) + return RecordId(); + + RecordId rid = RecordId(extractRecordId(it->first)); + if (rid > startingPosition) + return RecordId(); + + return rid; +} + +StatusWith<RecordId> RecordStore::extractAndCheckLocForOplog(OperationContext* opCtx, + const char* data, + int len) const { + StatusWith<RecordId> status = oploghack::extractKey(data, len); + if (!status.isOK()) + return status; + + StringStore* workingCopy(RecoveryUnit::get(opCtx)->getHead()); + StringStore::const_reverse_iterator it = workingCopy->upper_bound(_postfix); + + if (numRecords(opCtx) == 0) + return status; + + RecordId rid = RecordId(extractRecordId(it->first)); + if (status.getValue() <= rid) + return StatusWith<RecordId>(ErrorCodes::BadValue, + str::stream() << "attempted out-of-order oplog insert of " + << status.getValue() + << " (oplog last insert was " + << rid + << " )"); + + return status; +} + bool RecordStore::cappedAndNeedDelete(OperationContext* opCtx, StringStore* workingCopy) { if (!_isCapped) return false; @@ -408,8 +481,15 @@ void RecordStore::cappedDeleteAsNeeded(OperationContext* opCtx, StringStore* wor invariant(numRecords(opCtx) > 0); stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex); + RecordId rid = RecordId(extractRecordId(recordIt->first)); + + if (_isOplog && _visibilityManager->isFirstHidden(rid)) { + // We have a record that hasn't been 
committed yet, so we shouldn't truncate anymore + // until it gets committed. + return; + } + if (_cappedCallback) { - RecordId rid = RecordId(extractRecordId(recordIt->first)); RecordData rd = RecordData(recordIt->second.c_str(), recordIt->second.length()); uassertStatusOK(_cappedCallback->aboutToDeleteCapped(opCtx, rid, rd)); } @@ -424,11 +504,15 @@ void RecordStore::cappedDeleteAsNeeded(OperationContext* opCtx, StringStore* wor } } -RecordStore::Cursor::Cursor(OperationContext* opCtx, const RecordStore& rs) : opCtx(opCtx) { +RecordStore::Cursor::Cursor(OperationContext* opCtx, + const RecordStore& rs, + VisibilityManager* visibilityManager) + : opCtx(opCtx), _visibilityManager(visibilityManager) { _ident = rs._ident; _prefix = rs._prefix; _postfix = rs._postfix; _isCapped = rs._isCapped; + _isOplog = rs._isOplog; } boost::optional<Record> RecordStore::Cursor::next() { @@ -443,8 +527,13 @@ boost::optional<Record> RecordStore::Cursor::next() { _lastMoveWasRestore = false; if (it != workingCopy->end() && inPrefix(it->first)) { _savedPosition = it->first; - return Record{RecordId(extractRecordId(it->first)), - RecordData(it->second.c_str(), it->second.length())}; + Record nextRecord; + nextRecord.id = RecordId(extractRecordId(it->first)); + nextRecord.data = RecordData(it->second.c_str(), it->second.length()); + + if (_isOplog && nextRecord.id >= _visibilityManager->getEarliestUncommittedRecord()) + return boost::none; + return nextRecord; } return boost::none; } @@ -455,9 +544,13 @@ boost::optional<Record> RecordStore::Cursor::seekExact(const RecordId& id) { StringStore* workingCopy(RecoveryUnit::get(opCtx)->getHead()); std::string key = createKey(_ident, id.repr()); it = workingCopy->find(key); - if (it == workingCopy->end() || !inPrefix(it->first)) { + + if (it == workingCopy->end() || !inPrefix(it->first)) return boost::none; - } + + if (_isOplog && id >= _visibilityManager->getEarliestUncommittedRecord()) + return boost::none; + _needFirstSeek = false; 
_savedPosition = it->first; return Record{id, RecordData(it->second.c_str(), it->second.length())}; @@ -490,13 +583,16 @@ bool RecordStore::Cursor::inPrefix(const std::string& key_string) { return (key_string > _prefix) && (key_string < _postfix); } -RecordStore::ReverseCursor::ReverseCursor(OperationContext* opCtx, const RecordStore& rs) - : opCtx(opCtx) { +RecordStore::ReverseCursor::ReverseCursor(OperationContext* opCtx, + const RecordStore& rs, + VisibilityManager* visibilityManager) + : opCtx(opCtx), _visibilityManager(visibilityManager) { _savedPosition = boost::none; _ident = rs._ident; _prefix = rs._prefix; _postfix = rs._postfix; _isCapped = rs._isCapped; + _isOplog = rs._isOplog; } boost::optional<Record> RecordStore::ReverseCursor::next() { @@ -515,6 +611,9 @@ boost::optional<Record> RecordStore::ReverseCursor::next() { Record nextRecord; nextRecord.id = RecordId(extractRecordId(it->first)); nextRecord.data = RecordData(it->second.c_str(), it->second.length()); + + if (_isOplog && nextRecord.id >= _visibilityManager->getEarliestUncommittedRecord()) + return boost::none; return nextRecord; } return boost::none; @@ -530,6 +629,10 @@ boost::optional<Record> RecordStore::ReverseCursor::seekExact(const RecordId& id it = workingCopy->rend(); return boost::none; } + + if (_isOplog && id >= _visibilityManager->getEarliestUncommittedRecord()) + return boost::none; + it = StringStore::const_reverse_iterator(++canFind); // reverse iterator returns item 1 before _savedPosition = it->first; return Record{id, RecordData(it->second.c_str(), it->second.length())}; diff --git a/src/mongo/db/storage/biggie/biggie_record_store.h b/src/mongo/db/storage/biggie/biggie_record_store.h index d94bf97d89a..cfbaf67e8eb 100644 --- a/src/mongo/db/storage/biggie/biggie_record_store.h +++ b/src/mongo/db/storage/biggie/biggie_record_store.h @@ -34,6 +34,7 @@ #include <map> #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/storage/biggie/biggie_visibility_manager.h" 
#include "mongo/db/storage/biggie/store.h" #include "mongo/db/storage/capped_callback.h" #include "mongo/db/storage/record_store.h" @@ -42,6 +43,7 @@ namespace mongo { namespace biggie { + /** * A RecordStore that stores all data in-memory. */ @@ -53,6 +55,7 @@ public: int64_t cappedMaxSize = -1, int64_t cappedMaxDocs = -1, CappedCallback* cappedCallback = nullptr); + ~RecordStore() = default; virtual const char* name() const; virtual const std::string& getIdent() const; @@ -111,7 +114,10 @@ public: virtual Status touch(OperationContext* opCtx, BSONObjBuilder* output) const; - void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const; + virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx, + const RecordId& startingPosition) const; + + void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const override; virtual void updateStatsAfterRepair(OperationContext* opCtx, long long numRecords, @@ -139,6 +145,9 @@ private: AtomicInt64 _numRecords{0}; std::string generateKey(const uint8_t* key, size_t key_len) const; + bool _isOplog; + std::unique_ptr<VisibilityManager> _visibilityManager; + /* * This gets the next (guaranteed) unique record id. 
*/ @@ -146,6 +155,10 @@ private: return _highest_record_id.fetchAndAdd(1); } + StatusWith<RecordId> extractAndCheckLocForOplog(OperationContext* opCtx, + const char* data, + int len) const; + bool cappedAndNeedDelete(OperationContext* opCtx, StringStore* workingCopy); void cappedDeleteAsNeeded(OperationContext* opCtx, StringStore* workingCopy); @@ -159,9 +172,13 @@ private: bool _needFirstSeek = true; bool _lastMoveWasRestore = false; bool _isCapped; + bool _isOplog; + VisibilityManager* _visibilityManager; public: - Cursor(OperationContext* opCtx, const RecordStore& rs); + Cursor(OperationContext* opCtx, + const RecordStore& rs, + VisibilityManager* visibilityManager); boost::optional<Record> next() final; boost::optional<Record> seekExact(const RecordId& id) final override; void save() final; @@ -184,9 +201,13 @@ private: bool _needFirstSeek = true; bool _lastMoveWasRestore = false; bool _isCapped; + bool _isOplog; + VisibilityManager* _visibilityManager; public: - ReverseCursor(OperationContext* opCtx, const RecordStore& rs); + ReverseCursor(OperationContext* opCtx, + const RecordStore& rs, + VisibilityManager* visibilityManager); boost::optional<Record> next() final; boost::optional<Record> seekExact(const RecordId& id) final override; void save() final; @@ -199,5 +220,6 @@ private: bool inPrefix(const std::string& key_string); }; }; + } // namespace biggie } // namespace mongo |