summaryrefslogtreecommitdiff
path: root/src/mongo/db/storage
diff options
context:
space:
mode:
authorGregory Wlodarek <gregory.wlodarek@mongodb.com>2019-01-02 21:38:14 -0500
committerGregory Wlodarek <gregory.wlodarek@mongodb.com>2019-01-02 21:51:19 -0500
commita8fabf5c4e4d5e456bcfa3510618e7ce96fb2d4e (patch)
tree009f299a9364f1eae73a9d7f803c7aca0739ae39 /src/mongo/db/storage
parent1e6e1401ffe10264404c032f9efdc77d0ffcc649 (diff)
downloadmongo-a8fabf5c4e4d5e456bcfa3510618e7ce96fb2d4e.tar.gz
SERVER-37788 Implement oplog hack and plugin oplog visibility manager in the Biggie record store
Diffstat (limited to 'src/mongo/db/storage')
-rw-r--r--src/mongo/db/storage/biggie/biggie_record_store.cpp141
-rw-r--r--src/mongo/db/storage/biggie/biggie_record_store.h28
2 files changed, 147 insertions, 22 deletions
diff --git a/src/mongo/db/storage/biggie/biggie_record_store.cpp b/src/mongo/db/storage/biggie/biggie_record_store.cpp
index d5d520858be..4ac476864ea 100644
--- a/src/mongo/db/storage/biggie/biggie_record_store.cpp
+++ b/src/mongo/db/storage/biggie/biggie_record_store.cpp
@@ -40,8 +40,10 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/storage/biggie/biggie_record_store.h"
#include "mongo/db/storage/biggie/biggie_recovery_unit.h"
+#include "mongo/db/storage/biggie/biggie_visibility_manager.h"
#include "mongo/db/storage/biggie/store.h"
#include "mongo/db/storage/key_string.h"
+#include "mongo/db/storage/oplog_hack.h"
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/hex.h"
@@ -86,7 +88,9 @@ RecordStore::RecordStore(StringData ns,
_ident(_identStr.data(), _identStr.size()),
_prefix(createKey(_ident, std::numeric_limits<int64_t>::min())),
_postfix(createKey(_ident, std::numeric_limits<int64_t>::max())),
- _cappedCallback(cappedCallback) {
+ _cappedCallback(cappedCallback),
+ _isOplog(NamespaceString::oplog(ns)),
+ _visibilityManager(std::make_unique<VisibilityManager>(this)) {
if (_isCapped) {
invariant(_cappedMaxSize > 0);
invariant(_cappedMaxDocs == -1 || _cappedMaxDocs > 0);
@@ -171,7 +175,17 @@ Status RecordStore::insertRecords(OperationContext* opCtx,
auto ru = RecoveryUnit::get(opCtx);
StringStore* workingCopy(ru->getHead());
for (auto& record : *inOutRecords) {
- int64_t thisRecordId = nextRecordId();
+ int64_t thisRecordId = 0;
+ if (_isOplog) {
+ StatusWith<RecordId> status =
+ extractAndCheckLocForOplog(opCtx, record.data.data(), record.data.size());
+ if (!status.isOK())
+ return status.getStatus();
+ thisRecordId = status.getValue().repr();
+ _visibilityManager->addUncommittedRecord(opCtx, RecordId(thisRecordId));
+ } else {
+ thisRecordId = nextRecordId();
+ }
workingCopy->insert(StringStore::value_type{
createKey(_ident, thisRecordId), std::string(record.data.data(), record.data.size())});
record.id = RecordId(thisRecordId);
@@ -203,11 +217,22 @@ Status RecordStore::insertRecordsWithDocWriter(OperationContext* opCtx,
for (size_t i = 0; i < nDocs; i++) {
const size_t len = docs[i]->documentSize();
- int64_t thisRecordId = nextRecordId();
+ std::string buf(len, '\0');
+ docs[i]->writeDocument(&buf[0]);
+
+ int64_t thisRecordId = 0;
+ if (_isOplog) {
+ StatusWith<RecordId> status = extractAndCheckLocForOplog(opCtx, buf.data(), len);
+ if (!status.isOK())
+ return status.getStatus();
+ thisRecordId = status.getValue().repr();
+ _visibilityManager->addUncommittedRecord(opCtx, RecordId(thisRecordId));
+ } else {
+ thisRecordId = nextRecordId();
+ }
std::string key = createKey(_ident, thisRecordId);
- StringStore::value_type vt{key, std::string(len, '\0')};
- docs[i]->writeDocument(&vt.second[0]);
+ StringStore::value_type vt{key, buf};
workingCopy->insert(std::move(vt));
if (idsOut)
idsOut[i] = RecordId(thisRecordId);
@@ -253,8 +278,8 @@ StatusWith<RecordData> RecordStore::updateWithDamages(OperationContext* opCtx,
std::unique_ptr<SeekableRecordCursor> RecordStore::getCursor(OperationContext* opCtx,
bool forward) const {
if (forward)
- return std::make_unique<Cursor>(opCtx, *this);
- return std::make_unique<ReverseCursor>(opCtx, *this);
+ return std::make_unique<Cursor>(opCtx, *this, _visibilityManager.get());
+ return std::make_unique<ReverseCursor>(opCtx, *this, _visibilityManager.get());
}
Status RecordStore::truncate(OperationContext* opCtx) {
@@ -374,16 +399,64 @@ Status RecordStore::touch(OperationContext* opCtx, BSONObjBuilder* output) const
return Status::OK(); // All data is already in 'cache'.
}
-void RecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const {
- // Shouldn't need to do anything here as writes are visible on commit.
-}
-
void RecordStore::updateStatsAfterRepair(OperationContext* opCtx,
long long numRecords,
long long dataSize) {
// TODO: Implement.
}
+void RecordStore::waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const {
+ _visibilityManager->waitForAllEarlierOplogWritesToBeVisible(opCtx);
+}
+
+boost::optional<RecordId> RecordStore::oplogStartHack(OperationContext* opCtx,
+ const RecordId& startingPosition) const {
+ if (!_isOplog)
+ return boost::none;
+
+ if (numRecords(opCtx) == 0)
+ return RecordId();
+
+ StringStore* workingCopy{RecoveryUnit::get(opCtx)->getHead()};
+
+ std::string key = createKey(_ident, startingPosition.repr());
+ StringStore::const_reverse_iterator it(workingCopy->upper_bound(key));
+
+ if (it == workingCopy->rend())
+ return RecordId();
+
+ RecordId rid = RecordId(extractRecordId(it->first));
+ if (rid > startingPosition)
+ return RecordId();
+
+ return rid;
+}
+
+StatusWith<RecordId> RecordStore::extractAndCheckLocForOplog(OperationContext* opCtx,
+ const char* data,
+ int len) const {
+ StatusWith<RecordId> status = oploghack::extractKey(data, len);
+ if (!status.isOK())
+ return status;
+
+ StringStore* workingCopy(RecoveryUnit::get(opCtx)->getHead());
+ StringStore::const_reverse_iterator it = workingCopy->upper_bound(_postfix);
+
+ if (numRecords(opCtx) == 0)
+ return status;
+
+ RecordId rid = RecordId(extractRecordId(it->first));
+ if (status.getValue() <= rid)
+ return StatusWith<RecordId>(ErrorCodes::BadValue,
+ str::stream() << "attempted out-of-order oplog insert of "
+ << status.getValue()
+ << " (oplog last insert was "
+ << rid
+ << " )");
+
+ return status;
+}
+
bool RecordStore::cappedAndNeedDelete(OperationContext* opCtx, StringStore* workingCopy) {
if (!_isCapped)
return false;
@@ -408,8 +481,15 @@ void RecordStore::cappedDeleteAsNeeded(OperationContext* opCtx, StringStore* wor
invariant(numRecords(opCtx) > 0);
stdx::lock_guard<stdx::mutex> cappedCallbackLock(_cappedCallbackMutex);
+ RecordId rid = RecordId(extractRecordId(recordIt->first));
+
+ if (_isOplog && _visibilityManager->isFirstHidden(rid)) {
+ // We have a record that hasn't been committed yet, so we shouldn't truncate anymore
+ // until it gets committed.
+ return;
+ }
+
if (_cappedCallback) {
- RecordId rid = RecordId(extractRecordId(recordIt->first));
RecordData rd = RecordData(recordIt->second.c_str(), recordIt->second.length());
uassertStatusOK(_cappedCallback->aboutToDeleteCapped(opCtx, rid, rd));
}
@@ -424,11 +504,15 @@ void RecordStore::cappedDeleteAsNeeded(OperationContext* opCtx, StringStore* wor
}
}
-RecordStore::Cursor::Cursor(OperationContext* opCtx, const RecordStore& rs) : opCtx(opCtx) {
+RecordStore::Cursor::Cursor(OperationContext* opCtx,
+ const RecordStore& rs,
+ VisibilityManager* visibilityManager)
+ : opCtx(opCtx), _visibilityManager(visibilityManager) {
_ident = rs._ident;
_prefix = rs._prefix;
_postfix = rs._postfix;
_isCapped = rs._isCapped;
+ _isOplog = rs._isOplog;
}
boost::optional<Record> RecordStore::Cursor::next() {
@@ -443,8 +527,13 @@ boost::optional<Record> RecordStore::Cursor::next() {
_lastMoveWasRestore = false;
if (it != workingCopy->end() && inPrefix(it->first)) {
_savedPosition = it->first;
- return Record{RecordId(extractRecordId(it->first)),
- RecordData(it->second.c_str(), it->second.length())};
+ Record nextRecord;
+ nextRecord.id = RecordId(extractRecordId(it->first));
+ nextRecord.data = RecordData(it->second.c_str(), it->second.length());
+
+ if (_isOplog && nextRecord.id >= _visibilityManager->getEarliestUncommittedRecord())
+ return boost::none;
+ return nextRecord;
}
return boost::none;
}
@@ -455,9 +544,13 @@ boost::optional<Record> RecordStore::Cursor::seekExact(const RecordId& id) {
StringStore* workingCopy(RecoveryUnit::get(opCtx)->getHead());
std::string key = createKey(_ident, id.repr());
it = workingCopy->find(key);
- if (it == workingCopy->end() || !inPrefix(it->first)) {
+
+ if (it == workingCopy->end() || !inPrefix(it->first))
return boost::none;
- }
+
+ if (_isOplog && id >= _visibilityManager->getEarliestUncommittedRecord())
+ return boost::none;
+
_needFirstSeek = false;
_savedPosition = it->first;
return Record{id, RecordData(it->second.c_str(), it->second.length())};
@@ -490,13 +583,16 @@ bool RecordStore::Cursor::inPrefix(const std::string& key_string) {
return (key_string > _prefix) && (key_string < _postfix);
}
-RecordStore::ReverseCursor::ReverseCursor(OperationContext* opCtx, const RecordStore& rs)
- : opCtx(opCtx) {
+RecordStore::ReverseCursor::ReverseCursor(OperationContext* opCtx,
+ const RecordStore& rs,
+ VisibilityManager* visibilityManager)
+ : opCtx(opCtx), _visibilityManager(visibilityManager) {
_savedPosition = boost::none;
_ident = rs._ident;
_prefix = rs._prefix;
_postfix = rs._postfix;
_isCapped = rs._isCapped;
+ _isOplog = rs._isOplog;
}
boost::optional<Record> RecordStore::ReverseCursor::next() {
@@ -515,6 +611,9 @@ boost::optional<Record> RecordStore::ReverseCursor::next() {
Record nextRecord;
nextRecord.id = RecordId(extractRecordId(it->first));
nextRecord.data = RecordData(it->second.c_str(), it->second.length());
+
+ if (_isOplog && nextRecord.id >= _visibilityManager->getEarliestUncommittedRecord())
+ return boost::none;
return nextRecord;
}
return boost::none;
@@ -530,6 +629,10 @@ boost::optional<Record> RecordStore::ReverseCursor::seekExact(const RecordId& id
it = workingCopy->rend();
return boost::none;
}
+
+ if (_isOplog && id >= _visibilityManager->getEarliestUncommittedRecord())
+ return boost::none;
+
it = StringStore::const_reverse_iterator(++canFind); // reverse iterator returns item 1 before
_savedPosition = it->first;
return Record{id, RecordData(it->second.c_str(), it->second.length())};
diff --git a/src/mongo/db/storage/biggie/biggie_record_store.h b/src/mongo/db/storage/biggie/biggie_record_store.h
index d94bf97d89a..cfbaf67e8eb 100644
--- a/src/mongo/db/storage/biggie/biggie_record_store.h
+++ b/src/mongo/db/storage/biggie/biggie_record_store.h
@@ -34,6 +34,7 @@
#include <map>
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/storage/biggie/biggie_visibility_manager.h"
#include "mongo/db/storage/biggie/store.h"
#include "mongo/db/storage/capped_callback.h"
#include "mongo/db/storage/record_store.h"
@@ -42,6 +43,7 @@
namespace mongo {
namespace biggie {
+
/**
* A RecordStore that stores all data in-memory.
*/
@@ -53,6 +55,7 @@ public:
int64_t cappedMaxSize = -1,
int64_t cappedMaxDocs = -1,
CappedCallback* cappedCallback = nullptr);
+ ~RecordStore() = default;
virtual const char* name() const;
virtual const std::string& getIdent() const;
@@ -111,7 +114,10 @@ public:
virtual Status touch(OperationContext* opCtx, BSONObjBuilder* output) const;
- void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const;
+ virtual boost::optional<RecordId> oplogStartHack(OperationContext* opCtx,
+ const RecordId& startingPosition) const;
+
+ void waitForAllEarlierOplogWritesToBeVisible(OperationContext* opCtx) const override;
virtual void updateStatsAfterRepair(OperationContext* opCtx,
long long numRecords,
@@ -139,6 +145,9 @@ private:
AtomicInt64 _numRecords{0};
std::string generateKey(const uint8_t* key, size_t key_len) const;
+ bool _isOplog;
+ std::unique_ptr<VisibilityManager> _visibilityManager;
+
/*
* This gets the next (guaranteed) unique record id.
*/
@@ -146,6 +155,10 @@ private:
return _highest_record_id.fetchAndAdd(1);
}
+ StatusWith<RecordId> extractAndCheckLocForOplog(OperationContext* opCtx,
+ const char* data,
+ int len) const;
+
bool cappedAndNeedDelete(OperationContext* opCtx, StringStore* workingCopy);
void cappedDeleteAsNeeded(OperationContext* opCtx, StringStore* workingCopy);
@@ -159,9 +172,13 @@ private:
bool _needFirstSeek = true;
bool _lastMoveWasRestore = false;
bool _isCapped;
+ bool _isOplog;
+ VisibilityManager* _visibilityManager;
public:
- Cursor(OperationContext* opCtx, const RecordStore& rs);
+ Cursor(OperationContext* opCtx,
+ const RecordStore& rs,
+ VisibilityManager* visibilityManager);
boost::optional<Record> next() final;
boost::optional<Record> seekExact(const RecordId& id) final override;
void save() final;
@@ -184,9 +201,13 @@ private:
bool _needFirstSeek = true;
bool _lastMoveWasRestore = false;
bool _isCapped;
+ bool _isOplog;
+ VisibilityManager* _visibilityManager;
public:
- ReverseCursor(OperationContext* opCtx, const RecordStore& rs);
+ ReverseCursor(OperationContext* opCtx,
+ const RecordStore& rs,
+ VisibilityManager* visibilityManager);
boost::optional<Record> next() final;
boost::optional<Record> seekExact(const RecordId& id) final override;
void save() final;
@@ -199,5 +220,6 @@ private:
bool inPrefix(const std::string& key_string);
};
};
+
} // namespace biggie
} // namespace mongo