diff options
author | Allison Easton <allison.easton@mongodb.com> | 2019-07-12 14:58:28 -0400 |
---|---|---|
committer | Allison Easton <allison.easton@mongodb.com> | 2019-07-12 14:58:28 -0400 |
commit | 49b66d842d58eba32c72bb9ef8f3cd6fd7d0c9ed (patch) | |
tree | 987f04224da223f3e903b2fa021113c4e7dda98c /src/mongo/db/repl/collection_bulk_loader_impl.cpp | |
parent | 4bdf1fd63dc793a6d36d1f9da660c9747478206b (diff) | |
download | mongo-49b66d842d58eba32c72bb9ef8f3cd6fd7d0c9ed.tar.gz |
SERVER-41530 For uncapped collections, CollectionBulkLoaderImpl::insertDocuments
should batch the documents and commit the batch in a single WriteUnitOfWork
Diffstat (limited to 'src/mongo/db/repl/collection_bulk_loader_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.cpp | 116 |
1 files changed, 74 insertions, 42 deletions
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index 40471bb6e0a..393f41f11f2 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -44,6 +44,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/collection_bulk_loader_impl.h" +#include "mongo/db/repl/repl_server_parameters_gen.h" #include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" @@ -113,58 +114,89 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex }); } -Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin, - const std::vector<BSONObj>::const_iterator end) { - return _runTaskReleaseResourcesOnFailure([&] { - UnreplicatedWritesBlock uwb(_opCtx.get()); - - for (auto iter = begin; iter != end; ++iter) { - boost::optional<RecordId> loc; - const auto& doc = *iter; - Status status = writeConflictRetry( - _opCtx.get(), "CollectionBulkLoaderImpl::insertDocuments", _nss.ns(), [&] { - WriteUnitOfWork wunit(_opCtx.get()); - if (_idIndexBlock || _secondaryIndexesBlock) { - auto onRecordInserted = [&](const RecordId& location) { - loc = location; - return Status::OK(); - }; - // This version of insert will not update any indexes. - const auto status = _autoColl->getCollection()->insertDocumentForBulkLoader( - _opCtx.get(), doc, onRecordInserted); - if (!status.isOK()) { - return status; - } - } else { - // For capped collections, we use regular insertDocument, which will update - // pre-existing indexes. - const auto status = _autoColl->getCollection()->insertDocument( - _opCtx.get(), InsertStatement(doc), nullptr); - if (!status.isOK()) { - return status; - } +Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection( + const std::vector<BSONObj>::const_iterator begin, + const std::vector<BSONObj>::const_iterator end) { + auto iter = begin; + while (iter != end) { + std::vector<RecordId> locs; + Status status = writeConflictRetry( + _opCtx.get(), "CollectionBulkLoaderImpl/insertDocomuntsUncapped", _nss.ns(), [&] { + WriteUnitOfWork wunit(_opCtx.get()); + auto insertIter = iter; + int bytesInBlock = 0; + locs.clear(); + + auto onRecordInserted = [&](const RecordId& location) { + locs.emplace_back(location); + return Status::OK(); + }; + + while (insertIter != end && bytesInBlock < collectionBulkLoaderBatchSizeInBytes) { + const auto& doc = *insertIter++; + bytesInBlock += doc.objsize(); + // This version of insert will not update any indexes. + const auto status = _autoColl->getCollection()->insertDocumentForBulkLoader( + _opCtx.get(), doc, onRecordInserted); + if (!status.isOK()) { + return status; } + } - wunit.commit(); + wunit.commit(); + return Status::OK(); + }); - return Status::OK(); - }); + if (!status.isOK()) { + return status; + } + // Inserts index entries into the external sorter. This will not update + // pre-existing indexes. + for (size_t index = 0; index < locs.size(); ++index) { + status = _addDocumentToIndexBlocks(*iter++, locs.at(index)); if (!status.isOK()) { return status; } + } + } + return Status::OK(); +} - if (loc) { - // Inserts index entries into the external sorter. This will not update - // pre-existing indexes. - status = _addDocumentToIndexBlocks(doc, loc.get()); - } +Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection( + const std::vector<BSONObj>::const_iterator begin, + const std::vector<BSONObj>::const_iterator end) { + for (auto iter = begin; iter != end; ++iter) { + const auto& doc = *iter; + Status status = writeConflictRetry( + _opCtx.get(), "CollectionBulkLoaderImpl/insertDocumentsCapped", _nss.ns(), [&] { + WriteUnitOfWork wunit(_opCtx.get()); + // For capped collections, we use regular insertDocument, which + // will update pre-existing indexes. + const auto status = _autoColl->getCollection()->insertDocument( + _opCtx.get(), InsertStatement(doc), nullptr); + if (!status.isOK()) { + return status; + } + wunit.commit(); + return Status::OK(); + }); + if (!status.isOK()) { + return status; + } + } + return Status::OK(); +} - if (!status.isOK()) { - return status; - } +Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin, + const std::vector<BSONObj>::const_iterator end) { + return _runTaskReleaseResourcesOnFailure([&] { + UnreplicatedWritesBlock uwb(_opCtx.get()); + if (_idIndexBlock || _secondaryIndexesBlock) { + return _insertDocumentsForUncappedCollection(begin, end); + } else { + return _insertDocumentsForCappedCollection(begin, end); } - return Status::OK(); }); } |