diff options
Diffstat (limited to 'src')
19 files changed, 227 insertions, 134 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 40c2cd100a6..84c596c25ef 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -324,24 +324,25 @@ StatusWithMatchExpression Collection::parseValidator(const BSONObj& validator) c return statusWithMatcher; } -Status Collection::insertDocument(OperationContext* txn, const DocWriter* doc, bool enforceQuota) { - invariant(!_validator || documentValidationDisabled(txn)); +Status Collection::insertDocumentsForOplog(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs) { dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); - invariant(!_indexCatalog.haveAnyIndexes()); // eventually can implement, just not done - if (_mustTakeCappedLockOnInsert) - synchronizeOnCappedInFlightResource(txn->lockState(), _ns); - - StatusWith<RecordId> loc = _recordStore->insertRecord(txn, doc, _enforceQuota(enforceQuota)); - if (!loc.isOK()) - return loc.getStatus(); + // Since this is only for the OpLog, we can assume these for simplicity. + // This also means that we do not need to forward this object to the OpObserver, which is good + // because it would defeat the purpose of using DocWriter. + invariant(!_validator); + invariant(!_indexCatalog.haveAnyIndexes()); + invariant(!_mustTakeCappedLockOnInsert); - // we cannot call into the OpObserver here because the document being written is not present - // fortunately, this is currently only used for adding entries to the oplog. + Status status = _recordStore->insertRecordsWithDocWriter(txn, docs, nDocs); + if (!status.isOK()) + return status; txn->recoveryUnit()->onCommit([this]() { notifyCappedWaitersIfNeeded(); }); - return loc.getStatus(); + return status; } @@ -439,7 +440,8 @@ Status Collection::_insertDocuments(OperationContext* txn, OpDebug* opDebug) { dassert(txn->lockState()->isCollectionLockedForMode(ns().toString(), MODE_IX)); - if (isCapped() && _indexCatalog.haveAnyIndexes() && std::distance(begin, end) > 1) { + const size_t count = std::distance(begin, end); + if (isCapped() && _indexCatalog.haveAnyIndexes() && count > 1) { // We require that inserts to indexed capped collections be done one-at-a-time to avoid the // possibility that a later document causes an earlier document to be deleted before it can // be indexed. @@ -456,6 +458,7 @@ Status Collection::_insertDocuments(OperationContext* txn, } std::vector<Record> records; + records.reserve(count); for (auto it = begin; it != end; it++) { Record record = {RecordId(), RecordData(it->objdata(), it->objsize())}; records.push_back(record); @@ -465,6 +468,7 @@ Status Collection::_insertDocuments(OperationContext* txn, return status; std::vector<BsonRecord> bsonRecords; + bsonRecords.reserve(count); int recordIndex = 0; for (auto it = begin; it != end; it++) { RecordId loc = records[recordIndex++].id; diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index f5e4e0fe700..3f8c51e6756 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -293,7 +293,9 @@ public: * Callers must ensure no document validation is performed for this collection when calling * this method. */ - Status insertDocument(OperationContext* txn, const DocWriter* doc, bool enforceQuota); + Status insertDocumentsForOplog(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs); Status insertDocument(OperationContext* txn, const BSONObj& doc, diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 2566562b1da..a58de781952 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -375,8 +375,7 @@ void _logOpsInner(OperationContext* txn, // we jump through a bunch of hoops here to avoid copying the obj buffer twice -- // instead we do a single copy to the destination in the record store. - for (size_t i = 0; i < nWriters; i++) - checkOplogInsert(oplogCollection->insertDocument(txn, writers[i], false)); + checkOplogInsert(oplogCollection->insertDocumentsForOplog(txn, writers, nWriters)); // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back. if (updateReplOpTime) { diff --git a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp index 65a16414dcc..2b1a70d34a1 100644 --- a/src/mongo/db/storage/devnull/devnull_kv_engine.cpp +++ b/src/mongo/db/storage/devnull/devnull_kv_engine.cpp @@ -104,11 +104,17 @@ public: return StatusWith<RecordId>(RecordId(6, 4)); } - virtual StatusWith<RecordId> insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) { - _numInserts++; - return StatusWith<RecordId>(RecordId(6, 4)); + virtual Status insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut) { + _numInserts += nDocs; + if (idsOut) { + for (size_t i = 0; i < nDocs; i++) { + idsOut[i] = RecordId(6, 4); + } + } + return Status::OK(); } virtual Status updateRecord(OperationContext* txn, diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp index ddee4d85a75..6969242dd73 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.cpp @@ -398,36 +398,42 @@ StatusWith<RecordId> EphemeralForTestRecordStore::insertRecord(OperationContext* return StatusWith<RecordId>(loc); } -StatusWith<RecordId> EphemeralForTestRecordStore::insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) { - const int len = doc->documentSize(); - if (_isCapped && len > _cappedMaxSize) { - // We use dataSize for capped rollover and we don't want to delete everything if we know - // this won't fit. - return StatusWith<RecordId>(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"); - } +Status EphemeralForTestRecordStore::insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut) { + for (size_t i = 0; i < nDocs; i++) { + const int len = docs[i]->documentSize(); + if (_isCapped && len > _cappedMaxSize) { + // We use dataSize for capped rollover and we don't want to delete everything if we know + // this won't fit. + return Status(ErrorCodes::BadValue, "object to insert exceeds cappedMaxSize"); + } - EphemeralForTestRecord rec(len); - doc->writeDocument(rec.data.get()); + EphemeralForTestRecord rec(len); + docs[i]->writeDocument(rec.data.get()); + + RecordId loc; + if (_data->isOplog) { + StatusWith<RecordId> status = extractAndCheckLocForOplog(rec.data.get(), len); + if (!status.isOK()) + return status.getStatus(); + loc = status.getValue(); + } else { + loc = allocateLoc(); + } - RecordId loc; - if (_data->isOplog) { - StatusWith<RecordId> status = extractAndCheckLocForOplog(rec.data.get(), len); - if (!status.isOK()) - return status; - loc = status.getValue(); - } else { - loc = allocateLoc(); - } + txn->recoveryUnit()->registerChange(new InsertChange(_data, loc)); + _data->dataSize += len; + _data->records[loc] = rec; - txn->recoveryUnit()->registerChange(new InsertChange(_data, loc)); - _data->dataSize += len; - _data->records[loc] = rec; + cappedDeleteAsNeeded(txn); - cappedDeleteAsNeeded(txn); + if (idsOut) + idsOut[i] = loc; + } - return StatusWith<RecordId>(loc); + return Status::OK(); } Status EphemeralForTestRecordStore::updateRecord(OperationContext* txn, diff --git a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h index 8df67ece618..905fbca7806 100644 --- a/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h +++ b/src/mongo/db/storage/ephemeral_for_test/ephemeral_for_test_record_store.h @@ -65,9 +65,10 @@ public: int len, bool enforceQuota); - virtual StatusWith<RecordId> insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota); + virtual Status insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut); virtual Status updateRecord(OperationContext* txn, const RecordId& oldLocation, diff --git a/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp b/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp index fe33bf86d11..ff42dd15c16 100644 --- a/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp +++ b/src/mongo/db/storage/mmap_v1/btree/btree_logic.cpp @@ -1822,7 +1822,7 @@ void BtreeLogic<BtreeLayout>::split(OperationContext* txn, } } -class DummyDocWriter : public DocWriter { +class DummyDocWriter final : public DocWriter { public: DummyDocWriter(size_t sz) : _sz(sz) {} virtual void writeDocument(char* buf) const { /* no-op */ @@ -1848,7 +1848,7 @@ Status BtreeLogic<BtreeLayout>::initAsEmpty(OperationContext* txn) { template <class BtreeLayout> DiskLoc BtreeLogic<BtreeLayout>::_addBucket(OperationContext* txn) { DummyDocWriter docWriter(BtreeLayout::BucketSize); - StatusWith<RecordId> loc = _recordStore->insertRecord(txn, &docWriter, false); + StatusWith<RecordId> loc = _recordStore->insertRecordWithDocWriter(txn, &docWriter); // XXX: remove this(?) or turn into massert or sanely bubble it back up. uassertStatusOK(loc.getStatus()); diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp index 934f9807628..123a94fdeb9 100644 --- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp +++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.cpp @@ -78,18 +78,25 @@ StatusWith<RecordId> HeapRecordStoreBtree::insertRecord(OperationContext* txn, return StatusWith<RecordId>(loc); } -StatusWith<RecordId> HeapRecordStoreBtree::insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) { - MmapV1RecordHeader rec(doc->documentSize()); - doc->writeDocument(rec.data.get()); +Status HeapRecordStoreBtree::insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut) { + // This class is only for unit tests of the mmapv1 btree code and this is how it is called. + // If that ever changes, this class will need to be fixed. + invariant(nDocs == 1); + invariant(idsOut); + + MmapV1RecordHeader rec(docs[0]->documentSize()); + docs[0]->writeDocument(rec.data.get()); const RecordId loc = allocateLoc(); _records[loc] = rec; + *idsOut = loc; HeapRecordStoreBtreeRecoveryUnit::notifyInsert(txn, this, loc); - return StatusWith<RecordId>(loc); + return Status::OK(); } RecordId HeapRecordStoreBtree::allocateLoc() { diff --git a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h index 32dec97358e..744b87b00ca 100644 --- a/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h +++ b/src/mongo/db/storage/mmap_v1/heap_record_store_btree.h @@ -60,9 +60,10 @@ public: int len, bool enforceQuota); - virtual StatusWith<RecordId> insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota); + virtual Status insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut); virtual long long numRecords(OperationContext* txn) const { return _records.size(); diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp index 790f35e98c2..57bd2640e14 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_base.cpp @@ -294,35 +294,43 @@ DiskLoc RecordStoreV1Base::getPrevRecordInExtent(OperationContext* txn, const Di return result; } -StatusWith<RecordId> RecordStoreV1Base::insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) { - int docSize = doc->documentSize(); - if (docSize < 4) { - return StatusWith<RecordId>(ErrorCodes::InvalidLength, "record has to be >= 4 bytes"); - } - const int lenWHdr = docSize + MmapV1RecordHeader::HeaderSize; - if (lenWHdr > MaxAllowedAllocation) { - return StatusWith<RecordId>(ErrorCodes::InvalidLength, "record has to be <= 16.5MB"); - } - const int lenToAlloc = - (doc->addPadding() && shouldPadInserts()) ? quantizeAllocationSpace(lenWHdr) : lenWHdr; +Status RecordStoreV1Base::insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut) { + for (size_t i = 0; i < nDocs; i++) { + int docSize = docs[i]->documentSize(); + if (docSize < 4) { + return Status(ErrorCodes::InvalidLength, "record has to be >= 4 bytes"); + } + const int lenWHdr = docSize + MmapV1RecordHeader::HeaderSize; + if (lenWHdr > MaxAllowedAllocation) { + return Status(ErrorCodes::InvalidLength, "record has to be <= 16.5MB"); + } + const int lenToAlloc = (docs[i]->addPadding() && shouldPadInserts()) + ? quantizeAllocationSpace(lenWHdr) + : lenWHdr; - StatusWith<DiskLoc> loc = allocRecord(txn, lenToAlloc, enforceQuota); - if (!loc.isOK()) - return StatusWith<RecordId>(loc.getStatus()); + StatusWith<DiskLoc> loc = allocRecord(txn, lenToAlloc, /*enforceQuota=*/false); + if (!loc.isOK()) + return loc.getStatus(); - MmapV1RecordHeader* r = recordFor(loc.getValue()); - fassert(17319, r->lengthWithHeaders() >= lenWHdr); + MmapV1RecordHeader* r = recordFor(loc.getValue()); + fassert(17319, r->lengthWithHeaders() >= lenWHdr); - r = reinterpret_cast<MmapV1RecordHeader*>(txn->recoveryUnit()->writingPtr(r, lenWHdr)); - doc->writeDocument(r->data()); + r = reinterpret_cast<MmapV1RecordHeader*>(txn->recoveryUnit()->writingPtr(r, lenWHdr)); + docs[i]->writeDocument(r->data()); - _addRecordToRecListInExtent(txn, r, loc.getValue()); + _addRecordToRecListInExtent(txn, r, loc.getValue()); - _details->incrementStats(txn, r->netLength(), 1); + _details->incrementStats(txn, r->netLength(), 1); - return StatusWith<RecordId>(loc.getValue().toRecordId()); + if (idsOut) + idsOut[i] = loc.getValue().toRecordId(); + } + + + return Status::OK(); } diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h index c9ee22a40da..53b86129e13 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_base.h +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_base.h @@ -39,7 +39,6 @@ namespace mongo { class DeletedRecord; -class DocWriter; class ExtentManager; class MmapV1RecordHeader; class OperationContext; @@ -195,9 +194,10 @@ public: int len, bool enforceQuota); - StatusWith<RecordId> insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota); + Status insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut) final; virtual Status updateRecord(OperationContext* txn, const RecordId& oldLocation, diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp index 6803e5a3bc7..aa29b7f0174 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple.cpp @@ -255,7 +255,7 @@ vector<std::unique_ptr<RecordCursor>> SimpleRecordStoreV1::getManyCursors( return cursors; } -class CompactDocWriter : public DocWriter { +class CompactDocWriter final : public DocWriter { public: /** * param allocationSize - allocation size WITH header @@ -368,7 +368,7 @@ void SimpleRecordStoreV1::_compactExtent(OperationContext* txn, // start of the compact, this insert will allocate a record in a new extent. // See the comment in compact() for more details. CompactDocWriter writer(recOld, rawDataSize, allocationSize); - StatusWith<RecordId> status = insertRecord(txn, &writer, false); + StatusWith<RecordId> status = insertRecordWithDocWriter(txn, &writer); uassertStatusOK(status.getStatus()); const MmapV1RecordHeader* newRec = recordFor(DiskLoc::fromRecordId(status.getValue())); diff --git a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_test.cpp b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_test.cpp index e4e85168b01..573d4975fbf 100644 --- a/src/mongo/db/storage/mmap_v1/record_store_v1_simple_test.cpp +++ b/src/mongo/db/storage/mmap_v1/record_store_v1_simple_test.cpp @@ -94,7 +94,7 @@ BSONObj docForRecordSize(int size) { return x; } -class BsonDocWriter : public DocWriter { +class BsonDocWriter final : public DocWriter { public: BsonDocWriter(const BSONObj& obj, bool padding) : _obj(obj), _padding(padding) {} @@ -174,7 +174,7 @@ TEST(SimpleRecordStoreV1, AllocQuantizedWithDocWriter) { SimpleRecordStoreV1 rs(&txn, myns, md, &em, false); BsonDocWriter docWriter(docForRecordSize(300), true); - StatusWith<RecordId> result = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> result = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT(result.isOK()); // The length of the allocated record is quantized. @@ -193,7 +193,7 @@ TEST(SimpleRecordStoreV1, AllocNonQuantizedDocWriter) { SimpleRecordStoreV1 rs(&txn, myns + "$x", md, &em, false); BsonDocWriter docWriter(docForRecordSize(300), false); - StatusWith<RecordId> result = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> result = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT(result.isOK()); // The length of the allocated record is not quantized. @@ -210,7 +210,7 @@ TEST(SimpleRecordStoreV1, AllocAlignedDocWriter) { SimpleRecordStoreV1 rs(&txn, myns + "$x", md, &em, false); BsonDocWriter docWriter(docForRecordSize(298), false); - StatusWith<RecordId> result = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> result = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT(result.isOK()); ASSERT_EQUALS(300, rs.dataFor(&txn, result.getValue()).size() + MmapV1RecordHeader::HeaderSize); @@ -230,7 +230,7 @@ TEST(SimpleRecordStoreV1, AllocUseQuantizedDeletedRecordWithoutSplit) { } BsonDocWriter docWriter(docForRecordSize(300), true); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { @@ -255,7 +255,7 @@ TEST(SimpleRecordStoreV1, AllocUseQuantizedDeletedRecordWithSplit) { } BsonDocWriter docWriter(docForRecordSize(300), true); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { @@ -280,7 +280,7 @@ TEST(SimpleRecordStoreV1, AllocUseNonQuantizedDeletedRecordWithoutSplit) { } BsonDocWriter docWriter(docForRecordSize(300), false); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { @@ -305,7 +305,7 @@ TEST(SimpleRecordStoreV1, AllocUseNonQuantizedDeletedRecordWithSplit) { } BsonDocWriter docWriter(docForRecordSize(300), false); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { @@ -332,7 +332,7 @@ TEST(SimpleRecordStoreV1, GrabBagIsUsed) { } BsonDocWriter docWriter(docForRecordSize(256), false); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { @@ -360,7 +360,7 @@ TEST(SimpleRecordStoreV1, GrabBagIsPoppedEvenIfUnneeded) { } BsonDocWriter docWriter(docForRecordSize(1000), false); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { @@ -388,7 +388,7 @@ TEST(SimpleRecordStoreV1, GrabBagIsPoppedEvenIfUnusable) { } BsonDocWriter docWriter(docForRecordSize(8 * 1024 * 1024), false); - StatusWith<RecordId> actualLocation = rs.insertRecord(&txn, &docWriter, false); + StatusWith<RecordId> actualLocation = rs.insertRecordWithDocWriter(&txn, &docWriter); ASSERT_OK(actualLocation.getStatus()); { diff --git a/src/mongo/db/storage/record_store.h b/src/mongo/db/storage/record_store.h index ea0ad7c69b7..648250148ba 100644 --- a/src/mongo/db/storage/record_store.h +++ b/src/mongo/db/storage/record_store.h @@ -45,7 +45,6 @@ class CappedCallback; class Collection; struct CompactOptions; struct CompactStats; -class DocWriter; class MAdvise; class NamespaceDetails; class OperationContext; @@ -62,12 +61,15 @@ class ValidateAdaptor; */ class DocWriter { public: - virtual ~DocWriter() {} virtual void writeDocument(char* buf) const = 0; virtual size_t documentSize() const = 0; virtual bool addPadding() const { return true; } + +protected: + // Can't delete through base pointer. + ~DocWriter() = default; }; /** @@ -370,10 +372,6 @@ public: int len, bool enforceQuota) = 0; - virtual StatusWith<RecordId> insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) = 0; - virtual Status insertRecords(OperationContext* txn, std::vector<Record>* records, bool enforceQuota) { @@ -389,6 +387,31 @@ public: } /** + * Inserts nDocs documents into this RecordStore using the DocWriter interface. + * + * This allows the storage engine to reserve space for a record and have it built in-place + * rather than building the record then copying it into its destination. + * + * On success, if idsOut is non-null the RecordIds of the inserted records will be written into + * it. It must have space for nDocs RecordIds. + */ + virtual Status insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut = nullptr) = 0; + + /** + * A thin wrapper around insertRecordsWithDocWriter() to simplify handling of single DocWriters. + */ + StatusWith<RecordId> insertRecordWithDocWriter(OperationContext* txn, const DocWriter* doc) { + RecordId out; + Status status = insertRecordsWithDocWriter(txn, &doc, 1, &out); + if (!status.isOK()) + return status; + return out; + } + + /** * @param notifier - Only used by record stores which do not support doc-locking. Called only * in the case of an in-place update. Called just before the in-place write * occurs. diff --git a/src/mongo/db/storage/record_store_test_docwriter.h b/src/mongo/db/storage/record_store_test_docwriter.h index b6032f2509a..f788c8a2541 100644 --- a/src/mongo/db/storage/record_store_test_docwriter.h +++ b/src/mongo/db/storage/record_store_test_docwriter.h @@ -35,7 +35,7 @@ namespace mongo { namespace { -class StringDocWriter : public DocWriter { +class StringDocWriter final : public DocWriter { public: StringDocWriter(const std::string& data, bool padding) : _data(data), _padding(padding) {} diff --git a/src/mongo/db/storage/record_store_test_harness.cpp b/src/mongo/db/storage/record_store_test_harness.cpp index 8fc234ab9cd..75561e1429b 100644 --- a/src/mongo/db/storage/record_store_test_harness.cpp +++ b/src/mongo/db/storage/record_store_test_harness.cpp @@ -97,7 +97,7 @@ TEST(RecordStoreTestHarness, Simple1) { } namespace { -class DummyDocWriter : public DocWriter { +class DummyDocWriter final : public DocWriter { public: virtual ~DummyDocWriter() {} virtual void writeDocument(char* buf) const { @@ -125,7 +125,7 @@ TEST(RecordStoreTestHarness, Simple1InsertDocWroter) { { WriteUnitOfWork uow(opCtx.get()); DummyDocWriter dw; - StatusWith<RecordId> res = rs->insertRecord(opCtx.get(), &dw, false); + StatusWith<RecordId> res = rs->insertRecordWithDocWriter(opCtx.get(), &dw); ASSERT_OK(res.getStatus()); loc1 = res.getValue(); uow.commit(); diff --git a/src/mongo/db/storage/record_store_test_insertrecord.cpp b/src/mongo/db/storage/record_store_test_insertrecord.cpp index 4f62256c8e1..03f8bcebc7a 100644 --- a/src/mongo/db/storage/record_store_test_insertrecord.cpp +++ b/src/mongo/db/storage/record_store_test_insertrecord.cpp @@ -127,7 +127,7 @@ TEST(RecordStoreTestHarness, InsertRecordUsingDocWriter) { StringDocWriter docWriter("my record", false); WriteUnitOfWork uow(opCtx.get()); - StatusWith<RecordId> res = rs->insertRecord(opCtx.get(), &docWriter, false); + StatusWith<RecordId> res = rs->insertRecordWithDocWriter(opCtx.get(), &docWriter); ASSERT_OK(res.getStatus()); loc = res.getValue(); uow.commit(); @@ -161,7 +161,7 @@ TEST(RecordStoreTestHarness, InsertMultipleRecordsUsingDocWriter) { StringDocWriter docWriter(ss.str(), false); WriteUnitOfWork uow(opCtx.get()); - StatusWith<RecordId> res = rs->insertRecord(opCtx.get(), &docWriter, false); + StatusWith<RecordId> res = rs->insertRecordWithDocWriter(opCtx.get(), &docWriter); ASSERT_OK(res.getStatus()); locs[i] = res.getValue(); uow.commit(); diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp index 87fd30bb27d..ef1442342a0 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.cpp @@ -1261,11 +1261,17 @@ void WiredTigerRecordStore::reclaimOplog(OperationContext* txn) { Status WiredTigerRecordStore::insertRecords(OperationContext* txn, std::vector<Record>* records, bool enforceQuota) { + return _insertRecords(txn, records->data(), records->size()); +} + +Status WiredTigerRecordStore::_insertRecords(OperationContext* txn, + Record* records, + size_t nRecords) { // We are kind of cheating on capped collections since we write all of them at once .... // Simplest way out would be to just block vector writes for everything except oplog ? - int totalLength = 0; - for (auto& record : *records) - totalLength += record.data.size(); + int64_t totalLength = 0; + for (size_t i = 0; i < nRecords; i++) + totalLength += records[i].data.size(); // caller will retry one element at a time if (_isCapped && totalLength > _cappedMaxSize) @@ -1277,8 +1283,9 @@ Status WiredTigerRecordStore::insertRecords(OperationContext* txn, invariant(c); RecordId highestId = RecordId(); - dassert(!records->empty()); - for (auto& record : *records) { + dassert(nRecords != 0); + for (size_t i = 0; i < nRecords; i++) { + auto& record = records[i]; if (_useOplogHack) { StatusWith<RecordId> status = oploghack::extractKey(record.data.data(), record.data.size()); @@ -1302,7 +1309,8 @@ Status WiredTigerRecordStore::insertRecords(OperationContext* txn, _oplog_highestSeen = highestId; } - for (auto& record : *records) { + for (size_t i = 0; i < nRecords; i++) { + auto& record = records[i]; c->set_key(c, _makeKey(record.id)); WiredTigerItem value(record.data.data(), record.data.size()); c->set_value(c, value.Get()); @@ -1311,12 +1319,11 @@ Status WiredTigerRecordStore::insertRecords(OperationContext* txn, return wtRCToStatus(ret, "WiredTigerRecordStore::insertRecord"); } - _changeNumRecords(txn, records->size()); + _changeNumRecords(txn, nRecords); _increaseDataSize(txn, totalLength); if (_oplogStones) { - _oplogStones->updateCurrentStoneAfterInsertOnCommit( - txn, totalLength, highestId, records->size()); + _oplogStones->updateCurrentStoneAfterInsertOnCommit(txn, totalLength, highestId, nRecords); } else { cappedDeleteAsNeeded(txn, highestId); } @@ -1328,13 +1335,11 @@ StatusWith<RecordId> WiredTigerRecordStore::insertRecord(OperationContext* txn, const char* data, int len, bool enforceQuota) { - std::vector<Record> records; Record record = {RecordId(), RecordData(data, len)}; - records.push_back(record); - Status status = insertRecords(txn, &records, enforceQuota); + Status status = _insertRecords(txn, &record, 1); if (!status.isOK()) return StatusWith<RecordId>(status); - return StatusWith<RecordId>(records[0].id); + return StatusWith<RecordId>(record.id); } void WiredTigerRecordStore::_dealtWithCappedId(SortedRecordIds::iterator it) { @@ -1356,15 +1361,43 @@ RecordId WiredTigerRecordStore::lowestCappedHiddenRecord() const { return _uncommittedRecordIds.empty() ? RecordId() : _uncommittedRecordIds.front(); } -StatusWith<RecordId> WiredTigerRecordStore::insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota) { - const int len = doc->documentSize(); +Status WiredTigerRecordStore::insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut) { + std::unique_ptr<Record[]> records(new Record[nDocs]); + + // First get all the sizes so we can allocate a single buffer for all documents. Eventually it + // would be nice if we could either hand off the buffers to WT without copying or write them + // in-place as we do with MMAPv1, but for now this is the best we can do. + size_t totalSize = 0; + for (size_t i = 0; i < nDocs; i++) { + const size_t docSize = docs[i]->documentSize(); + records[i].data = RecordData(nullptr, docSize); // We fill in the real ptr in next loop. + totalSize += docSize; + } - std::unique_ptr<char[]> buf(new char[len]); - doc->writeDocument(buf.get()); + std::unique_ptr<char[]> buffer(new char[totalSize]); + char* pos = buffer.get(); + for (size_t i = 0; i < nDocs; i++) { + docs[i]->writeDocument(pos); + const size_t size = records[i].data.size(); + records[i].data = RecordData(pos, size); + pos += size; + } + invariant(pos == (buffer.get() + totalSize)); + + Status s = _insertRecords(txn, records.get(), nDocs); + if (!s.isOK()) + return s; + + if (idsOut) { + for (size_t i = 0; i < nDocs; i++) { + idsOut[i] = records[i].id; + } + } - return insertRecord(txn, buf.get(), len, enforceQuota); + return s; } Status WiredTigerRecordStore::updateRecord(OperationContext* txn, diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h index 67e64464c03..6200196cb02 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_record_store.h @@ -127,9 +127,10 @@ public: int len, bool enforceQuota); - virtual StatusWith<RecordId> insertRecord(OperationContext* txn, - const DocWriter* doc, - bool enforceQuota); + virtual Status insertRecordsWithDocWriter(OperationContext* txn, + const DocWriter* const* docs, + size_t nDocs, + RecordId* idsOut); virtual Status updateRecord(OperationContext* txn, const RecordId& oldLocation, @@ -257,6 +258,8 @@ private: void _dealtWithCappedId(SortedRecordIds::iterator it); void _addUncommitedRecordId_inlock(OperationContext* txn, const RecordId& id); + Status _insertRecords(OperationContext* txn, Record* records, size_t nRecords); + RecordId _nextId(); void _setId(RecordId id); bool cappedAndNeedDelete() const; |