diff options
field | value | date |
---|---|---|
author | Mathias Stearn <mathias@10gen.com> | 2016-05-06 12:45:55 -0400 |
committer | Mathias Stearn <mathias@10gen.com> | 2016-05-09 17:52:12 -0400 |
commit | 839a4d88a9b4627f0aa69ad6faba2c36e10b41ae (patch) | |
tree | b44c6c20b8b9d0e87229d652206935afd647c1ba /src | |
parent | 955304d50df6c94bb10e757736e531ce91627d23 (diff) | |
download | mongo-839a4d88a9b4627f0aa69ad6faba2c36e10b41ae.tar.gz | |
SERVER-24005 Pass batches of DocWriters all the way down to the RecordStore
This further amortizes the costs of bookkeeping work over the whole batch.
Diffstat (limited to 'src')
19 files changed, 227 insertions, 134 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 40c2cd100a6..84c596c25ef 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -324,24 +324,25 @@ StatusWithMatchExpression Collection::parseValidator(const BSONObj& validator) c return statusWithMatcher; } -Status Collection::insertDocument(OperationContext* txn, const DocWriter* doc, bool enforceQuota) { - invariant(!_validator || documentValidationDisabled(txn)); +Status Collection::insertDocumentsForOplog(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs) { dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); - invariant(!_indexCatalog.haveAnyIndexes()); // eventually can implement, just not done - if (_mustTakeCappedLockOnInsert) - synchronizeOnCappedInFlightResource(txn->lockState(), _ns); - - StatusWith<RecordId> loc = _recordStore->insertRecord(txn, doc, _enforceQuota(enforceQuota)); - if (!loc.isOK()) - return loc.getStatus(); + // Since this is only for the OpLog, we can assume these for simplicity. + // This also means that we do not need to forward this object to the OpObserver, which is good + // because it would defeat the purpose of using DocWriter. + invariant(!_validator); + invariant(!_indexCatalog.haveAnyIndexes()); + invariant(!_mustTakeCappedLockOnInsert); - // we cannot call into the OpObserver here because the document being written is not present - // fortunately, this is currently only used for adding entries to the oplog. 
+ Status status = _recordStore->insertRecordsWithDocWriter(txn, docs, nDocs); + if (!status.isOK()) + return status; txn->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); }); - return loc.getStatus(); + return status; } @@ -439,7 +440,8 @@ Status Collection::_insertDocuments(OperationContext* txn, OpDebug* opDebug) { dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); - if (isCapped() && _indexCatalog.haveAnyIndexes() && std::distance(begin, end) > 1) { + const size_t count = std::distance(begin, end); + if (isCapped() && _indexCatalog.haveAnyIndexes() && count > 1) { // We require that inserts to indexed capped collections be done one-at-a-time to avoid the // possibility that a later document causes an earlier document to be deleted before it can // be indexed. @@ -456,6 +458,7 @@ Status Collection::_insertDocuments(OperationContext* txn, } std::vector<Record> records; + records.reserve(count); for (auto it = begin; it != end; it++) { Record record = {RecordId(), RecordData(it->objdata(), it->objsize())}; records.push_back(record); @@ -465,6 +468,7 @@ Status Collection::_insertDocuments(OperationContext* txn, return status; std::vector<BsonRecord> bsonRecords; + bsonRecords.reserve(count); int recordIndex = 0; for (auto it = begin; it != end; it++) { RecordId loc = records[recordIndex++].id; diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index f5e4e0fe700..3f8c51e6756 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -293,7 +293,9 @@ public: * Callers must ensure no document validation is performed for this collection when calling * this method. 
*/ - Status insertDocument(OperationContext* txn, const DocWriter* doc, bool enforceQuota); + Status insertDocumentsForOplog(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs); Status insertDocument(OperationContext* txn, const BSONObj& doc, diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 2566562b1da..a58de781952 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -375,8 +375,7 @@ void _logOpsInner(OperationContext* txn, // we jump through a bunch of hoops here to avoid copying the obj buffer twice -- // instead we do a single copy to the destination in the record store. - for (size_t i = 0; i < nWriters; i++) - checkOplogInsert(oplogCollection->insertDocument(txn, writers[i], false)); + checkOplogInsert(oplogCollection->insertDocumentsForOplog(txn, writers, nWriters)); // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back. if (updateReplOpTime) { diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp index 65a16414dcc..2b1a70d34a1 100644 --- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp +++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp @@ -104,11 +104,17 @@ public: return StatusWith<RecordId>(RecordId(6, 4)); } - virtual StatusWith<RecordId> insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) { - _numInserts++; - return StatusWith<RecordId>(RecordId(6, 4)); + virtual Status insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut) { + _numInserts += nDocs; + if (idsOut) { + for (size_t i = 0; i < nDocs; i++) { + idsOut[i] = RecordId(6, 4); + } + } + return Status::OK(); } virtual Status updateRecord(OperationContext* txn, diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp index 
ddee4d85a75..6969242dd73 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp @@ -398,36 +398,42 @@ StatusWith<RecordId> EphemeralForTestRecordStore::insertRecord(OperationContext* return StatusWith<RecordId>(loc); } -StatusWith<RecordId> EphemeralForTestRecordStore::insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) { - const int len = doc->documentSize(); - if (_isCapped && len > _cappedMaxSize) { - // We use dataSize for capped rollover and we don't want to delete everything if we know - // this won't fit. - return StatusWith<RecordId>(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"); - } +Status EphemeralForTestRecordStore::insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut) { + for (size_t i = 0; i < nDocs; i++) { + const int len = docs[i]->documentSize(); + if (_isCapped && len > _cappedMaxSize) { + // We use dataSize for capped rollover and we don't want to delete everything if we know + // this won't fit. 
+ return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"); + } - EphemeralForTestRecord rec(len); - doc->writeDocument(rec.data.get()); + EphemeralForTestRecord rec(len); + docs[i]->writeDocument(rec.data.get()); + + RecordId loc; + if (_data->isOplog) { + StatusWith<RecordId> status = extractAndCheckLocForOplog(rec.data.get(), len); + if (!status.isOK()) + return status.getStatus(); + loc = status.getValue(); + } else { + loc = allocateLoc(); + } - RecordId loc; - if (_data->isOplog) { - StatusWith<RecordId> status = extractAndCheckLocForOplog(rec.data.get(), len); - if (!status.isOK()) - return status; - loc = status.getValue(); - } else { - loc = allocateLoc(); - } + txn->recoveryUnit()->registerChange(new InsertChange(_data, loc)); + _data->dataSize += len; + _data->records[loc] = rec; - txn->recoveryUnit()->registerChange(new InsertChange(_data, loc)); - _data->dataSize += len; - _data->records[loc] = rec; + cappedDeleteAsNeeded(txn); - cappedDeleteAsNeeded(txn); + if (idsOut) + idsOut[i] = loc; + } - return StatusWith<RecordId>(loc); + return Status::OK(); } Status EphemeralForTestRecordStore::updateRecord(OperationContext* txn, diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h index 8df67ece618..905fbca7806 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h @@ -65,9 +65,10 @@ public: int len, bool enforceQuota); - virtual StatusWith<RecordId> insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota); + virtual Status insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut); virtual Status updateRecord(OperationContext* txn, const RecordId& oldLocation, diff --git a/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp 
b/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp index fe33bf86d11..ff42dd15c16 100644 --- a/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp +++ b/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp @@ -1822,7 +1822,7 @@ void BtreeLogic<BtreeLayout>::split(OperationContext* txn, } } -class DummyDocWriter : public DocWriter { +class DummyDocWriter final : public DocWriter { public: DummyDocWriter(size_t sz) : _sz(sz) {} virtual void writeDocument(char* buf) const { /* no-op */ @@ -1848,7 +1848,7 @@ Status BtreeLogic<BtreeLayout>::initAsEmpty(OperationContext* txn) { template <class BtreeLayout> DiskLoc BtreeLogic<BtreeLayout>::_addBucket(OperationContext* txn) { DummyDocWriter docWriter(BtreeLayout::BucketSize); - StatusWith<RecordId> loc = _recordStore->insertRecord(txn, &docWriter, false); + StatusWith<RecordId> loc = _recordStore->insertRecordWithDocWriter(txn, &docWriter); // XXX: remove this(?) or turn into massert or sanely bubble it back up. uassertStatusOK(loc.getStatus()); diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp index 934f9807628..123a94fdeb9 100644 --- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp +++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp @@ -78,18 +78,25 @@ StatusWith<RecordId> HeapRecordStoreBtree::insertRecord(OperationContext* txn, return StatusWith<RecordId>(loc); } -StatusWith<RecordId> HeapRecordStoreBtree::insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) { - MmapV1RecordHeader rec(doc->documentSize()); - doc->writeDocument(rec.data.get()); +Status HeapRecordStoreBtree::insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut) { + // This class is only for unit tests of the mmapv1 btree code and this is how it is called. + // If that ever changes, this class will need to be fixed. 
+ invariant(nDocs == 1); + invariant(idsOut); + + MmapV1RecordHeader rec(docs[0]->documentSize()); + docs[0]->writeDocument(rec.data.get()); const RecordId loc = allocateLoc(); _records[loc] = rec; + *idsOut = loc; HeapRecordStoreBtreeRecoveryUnit::notifyInsert(txn, this, loc); - return StatusWith<RecordId>(loc); + return Status::OK(); } RecordId HeapRecordStoreBtree::allocateLoc() { diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h index 32dec97358e..744b87b00ca 100644 --- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h +++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h @@ -60,9 +60,10 @@ public: int len, bool enforceQuota); - virtual StatusWith<RecordId> insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota); + virtual Status insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut); virtual long long numRecords(OperationContext* txn) const { return _records.size(); diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp index 790f35e98c2..57bd2640e14 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp @@ -294,35 +294,43 @@ DiskLoc RecordStoreV1Base::getPrevRecordInExtent(OperationContext* txn, const Di return result; } -StatusWith<RecordId> RecordStoreV1Base::insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) { - int docSize = doc->documentSize(); - if (docSize < 4) { - return StatusWith<RecordId>(ErrorCodes::InvalidLength, "record has to be >= 4 bytes"); - } - const int lenWHdr = docSize + MmapV1RecordHeader::HeaderSize; - if (lenWHdr > MaxAllowedAllocation) { - return StatusWith<RecordId>(ErrorCodes::InvalidLength, "record has to be <= 16.5MB"); - } - const int lenToAlloc = - (doc->addPadding() && 
shouldPadInserts()) ? quantizeAllocationSpace(lenWHdr) : lenWHdr; +Status RecordStoreV1Base::insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut) { + for (size_t i = 0; i < nDocs; i++) { + int docSize = docs[i]->documentSize(); + if (docSize < 4) { + return Status(ErrorCodes::InvalidLength, "record has to be >= 4 bytes"); + } + const int lenWHdr = docSize + MmapV1RecordHeader::HeaderSize; + if (lenWHdr > MaxAllowedAllocation) { + return Status(ErrorCodes::InvalidLength, "record has to be <= 16.5MB"); + } + const int lenToAlloc = (docs[i]->addPadding() && shouldPadInserts()) + ? quantizeAllocationSpace(lenWHdr) + : lenWHdr; - StatusWith<DiskLoc> loc = allocRecord(txn, lenToAlloc, enforceQuota); - if (!loc.isOK()) - return StatusWith<RecordId>(loc.getStatus()); + StatusWith<DiskLoc> loc = allocRecord(txn, lenToAlloc, /*enforceQuota=*/false); + if (!loc.isOK()) + return loc.getStatus(); - MmapV1RecordHeader* r = recordFor(loc.getValue()); - fassert(17319, r->lengthWithHeaders() >= lenWHdr); + MmapV1RecordHeader* r = recordFor(loc.getValue()); + fassert(17319, r->lengthWithHeaders() >= lenWHdr); - r = reinterpret_cast<MmapV1RecordHeader*>(txn->recoveryUnit()->writingPtr(r, lenWHdr)); - doc->writeDocument(r->data()); + r = reinterpret_cast<MmapV1RecordHeader*>(txn->recoveryUnit()->writingPtr(r, lenWHdr)); + docs[i]->writeDocument(r->data()); - _addRecordToRecListInExtent(txn, r, loc.getValue()); + _addRecordToRecListInExtent(txn, r, loc.getValue()); - _details->incrementStats(txn, r->netLength(), 1); + _details->incrementStats(txn, r->netLength(), 1); - return StatusWith<RecordId>(loc.getValue().toRecordId()); + if (idsOut) + idsOut[i] = loc.getValue().toRecordId(); + } + + + return Status::OK(); } diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h index c9ee22a40da..53b86129e13 100644 --- 
a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h @@ -39,7 +39,6 @@ namespace mongo { class DeletedRecord; -class DocWriter; class ExtentManager; class MmapV1RecordHeader; class OperationContext; @@ -195,9 +194,10 @@ public: int len, bool enforceQuota); - StatusWith<RecordId> insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota); + Status insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut) final; virtual Status updateRecord(OperationContext* txn, const RecordId& oldLocation, diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp index 6803e5a3bc7..aa29b7f0174 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp @@ -255,7 +255,7 @@ vector<std::unique_ptr<RecordCursor>> SimpleRecordStoreV1::getManyCursors( return cursors; } -class CompactDocWriter : public DocWriter { +class CompactDocWriter final : public DocWriter { public: /** * param allocationSize - allocation size WITH header @@ -368,7 +368,7 @@ void SimpleRecordStoreV1::_compactExtent(OperationContext* txn, // start of the compact, this insert will allocate a record in a new extent. // See the comment in compact() for more details. 
CompactDocWriter writer(recOld, rawDataSize, allocationSize); - StatusWith<RecordId> status = insertRecord(txn, &writer, false); + StatusWith<RecordId> status = insertRecordWithDocWriter(txn, &writer); uassertStatusOK(status.getStatus()); const MmapV1RecordHeader* newRec = recordFor(DiskLoc::fromRecordId(status.getValue())); diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_test.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_test.cpp index e4e85168b01..573d4975fbf 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_test.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_test.cpp @@ -94,7 +94,7 @@ BSONObj docForRecordSize(int size) { return x; } -class BsonDocWriter : public DocWriter { +class BsonDocWriter final : public DocWriter { public: BsonDocWriter(const BSONObj& obj, bool padding) : _obj(obj), _padding(padding) {} @@ -174,7 +174,7 @@ TEST(SimpleRecordStoreV1, AllocQuantizedWithDocWriter) { SimpleRecordStoreV1 rs(&txn, myns, md, &em, false); BsonDocWriter docWriter(docForRecordSize(300), true); - StatusWith<RecordId> result = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> result = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT(result.isOK()); // The length of the allocated record is quantized. @@ -193,7 +193,7 @@ TEST(SimpleRecordStoreV1, AllocNonQuantizedDocWriter) { SimpleRecordStoreV1 rs(&txn, myns + "$x", md, &em, false); BsonDocWriter docWriter(docForRecordSize(300), false); - StatusWith<RecordId> result = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> result = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT(result.isOK()); // The length of the allocated record is not quantized. 
@@ -210,7 +210,7 @@ TEST(SimpleRecordStoreV1, AllocAlignedDocWriter) { SimpleRecordStoreV1 rs(&txn, myns + "$x", md, &em, false); BsonDocWriter docWriter(docForRecordSize(298), false); - StatusWith<RecordId> result = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> result = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT(result.isOK()); ASSERT_EQUALS(300, rs.dataFor(&txn, result.getValue()).size() + MmapV1RecordHeader::HeaderSize); @@ -230,7 +230,7 @@ TEST(SimpleRecordStoreV1, AllocUseQuantizedDeletedRecordWithoutSplit) { } BsonDocWriter docWriter(docForRecordSize(300), true); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { @@ -255,7 +255,7 @@ TEST(SimpleRecordStoreV1, AllocUseQuantizedDeletedRecordWithSplit) { } BsonDocWriter docWriter(docForRecordSize(300), true); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { @@ -280,7 +280,7 @@ TEST(SimpleRecordStoreV1, AllocUseNonQuantizedDeletedRecordWithoutSplit) { } BsonDocWriter docWriter(docForRecordSize(300), false); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { @@ -305,7 +305,7 @@ TEST(SimpleRecordStoreV1, AllocUseNonQuantizedDeletedRecordWithSplit) { } BsonDocWriter docWriter(docForRecordSize(300), false); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { @@ -332,7 +332,7 @@ TEST(SimpleRecordStoreV1, GrabBagIsUsed) { } BsonDocWriter 
docWriter(docForRecordSize(256), false); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { @@ -360,7 +360,7 @@ TEST(SimpleRecordStoreV1, GrabBagIsPoppedEvenIfUnneeded) { } BsonDocWriter docWriter(docForRecordSize(1000), false); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { @@ -388,7 +388,7 @@ TEST(SimpleRecordStoreV1, GrabBagIsPoppedEvenIfUnusable) { } BsonDocWriter docWriter(docForRecordSize(8 * 1024 * 1024), false); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index ea0ad7c69b7..648250148ba 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -45,7 +45,6 @@ class CappedCallback; class Collection; struct CompactOptions; struct CompactStats; -class DocWriter; class MAdvise; class NamespaceDetails; class OperationContext; @@ -62,12 +61,15 @@ class ValidateAdaptor; */ class DocWriter { public: - virtual ~DocWriter() {} virtual void writeDocument(char* buf) const = 0; virtual size_t documentSize() const = 0; virtual bool addPadding() const { return true; } + +protected: + // Can't delete through base pointer. 
+ ~DocWriter() = default; }; /** @@ -370,10 +372,6 @@ public: int len, bool enforceQuota) = 0; - virtual StatusWith<RecordId> insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) = 0; - virtual Status insertRecords(OperationContext* txn, std::vector<Record>* records, bool enforceQuota) { @@ -389,6 +387,31 @@ public: } /** + * Inserts nDocs documents into this RecordStore using the DocWriter interface. + * + * This allows the storage engine to reserve space for a record and have it built in-place + * rather than building the record then copying it into its destination. + * + * On success, if idsOut is non-null the RecordIds of the inserted records will be written into + * it. It must have space for nDocs RecordIds. + */ + virtual Status insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut = nullptr) = 0; + + /** + * A thin wrapper around insertRecordsWithDocWriter() to simplify handling of single DocWriters. + */ + StatusWith<RecordId> insertRecordWithDocWriter(OperationContext* txn, const DocWriter* doc) { + RecordId out; + Status status = insertRecordsWithDocWriter(txn, &doc, 1, &out); + if (!status.isOK()) + return status; + return out; + } + + /** * @param notifier - Only used by record stores which do not support doc-locking. Called only * in the case of an in-place update. Called just before the in-place write * occurs. 
diff --git a/src/mongo/db/storage/record_store_test_docwriter.h b/src/mongo/db/storage/record_store_test_docwriter.h index b6032f2509a..f788c8a2541 100644 --- a/src/mongo/db/storage/record_store_test_docwriter.h +++ b/src/mongo/db/storage/record_store_test_docwriter.h @@ -35,7 +35,7 @@ namespace mongo { namespace { -class StringDocWriter : public DocWriter { +class StringDocWriter final : public DocWriter { public: StringDocWriter(const std::string& data, bool padding) : _data(data), _padding(padding) {} diff --git a/src/mongo/db/storage/record_store_test_harness.cpp b/src/mongo/db/storage/record_store_test_harness.cpp index 8fc234ab9cd..75561e1429b 100644 --- a/src/mongo/db/storage/record_store_test_harness.cpp +++ b/src/mongo/db/storage/record_store_test_harness.cpp @@ -97,7 +97,7 @@ TEST(RecordStoreTestHarness, Simple1) { } namespace { -class DummyDocWriter : public DocWriter { +class DummyDocWriter final : public DocWriter { public: virtual ~DummyDocWriter() {} virtual void writeDocument(char* buf) const { @@ -125,7 +125,7 @@ TEST(RecordStoreTestHarness, Simple1InsertDocWroter) { { WriteUnitOfWork uow(opCtx.get()); DummyDocWriter dw; - StatusWith<RecordId> res = rs->insertRecord(opCtx.get(), &dw, false); + StatusWith<RecordId> res = rs->insertRecordWithDocWriter(opCtx.get(), &dw); ASSERT_OK(res.getStatus()); loc1 = res.getValue(); uow.commit(); diff --git a/src/mongo/db/storage/record_store_test_insertrecord.cpp b/src/mongo/db/storage/record_store_test_insertrecord.cpp index 4f62256c8e1..03f8bcebc7a 100644 --- a/src/mongo/db/storage/record_store_test_insertrecord.cpp +++ b/src/mongo/db/storage/record_store_test_insertrecord.cpp @@ -127,7 +127,7 @@ TEST(RecordStoreTestHarness, InsertRecordUsingDocWriter) { StringDocWriter docWriter("my record", false); WriteUnitOfWork uow(opCtx.get()); - StatusWith<RecordId> res = rs->insertRecord(opCtx.get(), &docWriter, false); + StatusWith<RecordId> res = rs->insertRecordWithDocWriter(opCtx.get(), &docWriter); 
ASSERT_OK(res.getStatus()); loc = res.getValue(); uow.commit(); @@ -161,7 +161,7 @@ TEST(RecordStoreTestHarness, InsertMultipleRecordsUsingDocWriter) { StringDocWriter docWriter(ss.str(), false); WriteUnitOfWork uow(opCtx.get()); - StatusWith<RecordId> res = rs->insertRecord(opCtx.get(), &docWriter, false); + StatusWith<RecordId> res = rs->insertRecordWithDocWriter(opCtx.get(), &docWriter); ASSERT_OK(res.getStatus()); locs[i] = res.getValue(); uow.commit(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 87fd30bb27d..ef1442342a0 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -1261,11 +1261,17 @@ void WiredTigerRecordStore::reclaimOplog(OperationContext* txn) { Status WiredTigerRecordStore::insertRecords(OperationContext* txn, std::vector<Record>* records, bool enforceQuota) { + return _insertRecords(txn, records->data(), records->size()); +} + +Status WiredTigerRecordStore::_insertRecords(OperationContext* txn, + Record* records, + size_t nRecords) { // We are kind of cheating on capped collections since we write all of them at once .... // Simplest way out would be to just block vector writes for everything except oplog ? 
- int totalLength = 0; - for (auto& record : *records) - totalLength += record.data.size(); + int64_t totalLength = 0; + for (size_t i = 0; i < nRecords; i++) + totalLength += records[i].data.size(); // caller will retry one element at a time if (_isCapped && totalLength > _cappedMaxSize) @@ -1277,8 +1283,9 @@ Status WiredTigerRecordStore::insertRecords(OperationContext* txn, invariant(c); RecordId highestId = RecordId(); - dassert(!records->empty()); - for (auto& record : *records) { + dassert(nRecords != 0); + for (size_t i = 0; i < nRecords; i++) { + auto& record = records[i]; if (_useOplogHack) { StatusWith<RecordId> status = oploghack::extractKey(record.data.data(), record.data.size()); @@ -1302,7 +1309,8 @@ Status WiredTigerRecordStore::insertRecords(OperationContext* txn, _oplog_highestSeen = highestId; } - for (auto& record : *records) { + for (size_t i = 0; i < nRecords; i++) { + auto& record = records[i]; c->set_key(c, _makeKey(record.id)); WiredTigerItem value(record.data.data(), record.data.size()); c->set_value(c, value.Get()); @@ -1311,12 +1319,11 @@ Status WiredTigerRecordStore::insertRecords(OperationContext* txn, return wtRCToStatus(ret, "WiredTigerRecordStore::insertRecord"); } - _changeNumRecords(txn, records->size()); + _changeNumRecords(txn, nRecords); _increaseDataSize(txn, totalLength); if (_oplogStones) { - _oplogStones->updateCurrentStoneAfterInsertOnCommit( - txn, totalLength, highestId, records->size()); + _oplogStones->updateCurrentStoneAfterInsertOnCommit(txn, totalLength, highestId, nRecords); } else { cappedDeleteAsNeeded(txn, highestId); } @@ -1328,13 +1335,11 @@ StatusWith<RecordId> WiredTigerRecordStore::insertRecord(OperationContext* txn, const char* data, int len, bool enforceQuota) { - std::vector<Record> records; Record record = {RecordId(), RecordData(data, len)}; - records.push_back(record); - Status status = insertRecords(txn, &records, enforceQuota); + Status status = _insertRecords(txn, &record, 1); if (!status.isOK()) 
return StatusWith<RecordId>(status); - return StatusWith<RecordId>(records[0].id); + return StatusWith<RecordId>(record.id); } void WiredTigerRecordStore::_dealtWithCappedId(SortedRecordIds::iterator it) { @@ -1356,15 +1361,43 @@ RecordId WiredTigerRecordStore::lowestCappedHiddenRecord() const { return _uncommittedRecordIds.empty() ? RecordId() : _uncommittedRecordIds.front(); } -StatusWith<RecordId> WiredTigerRecordStore::insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) { - const int len = doc->documentSize(); +Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut) { + std::unique_ptr<Record[]> records(new Record[nDocs]); + + // First get all the sizes so we can allocate a single buffer for all documents. Eventually it + // would be nice if we could either hand off the buffers to WT without copying or write them + // in-place as we do with MMAPv1, but for now this is the best we can do. + size_t totalSize = 0; + for (size_t i = 0; i < nDocs; i++) { + const size_t docSize = docs[i]->documentSize(); + records[i].data = RecordData(nullptr, docSize); // We fill in the real ptr in next loop. 
+ totalSize += docSize; + } - std::unique_ptr<char[]> buf(new char[len]); - doc->writeDocument(buf.get()); + std::unique_ptr<char[]> buffer(new char[totalSize]); + char* pos = buffer.get(); + for (size_t i = 0; i < nDocs; i++) { + docs[i]->writeDocument(pos); + const size_t size = records[i].data.size(); + records[i].data = RecordData(pos, size); + pos += size; + } + invariant(pos == (buffer.get() + totalSize)); + + Status s = _insertRecords(txn, records.get(), nDocs); + if (!s.isOK()) + return s; + + if (idsOut) { + for (size_t i = 0; i < nDocs; i++) { + idsOut[i] = records[i].id; + } + } - return insertRecord(txn, buf.get(), len, enforceQuota); + return s; } Status WiredTigerRecordStore::updateRecord(OperationContext* txn, diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index 67e64464c03..6200196cb02 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -127,9 +127,10 @@ public: int len, bool enforceQuota); - virtual StatusWith<RecordId> insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota); + virtual Status insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut); virtual Status updateRecord(OperationContext* txn, const RecordId& oldLocation, @@ -257,6 +258,8 @@ private: void _dealtWithCappedId(SortedRecordIds::iterator it); void _addUncommitedRecordId_inlock(OperationContext* txn, const RecordId& id); + Status _insertRecords(OperationContext* txn, Record* records, size_t nRecords); + RecordId _nextId(); void _setId(RecordId id); bool cappedAndNeedDelete() const; |