summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/collection_bulk_loader_impl.cpp
diff options
context:
space:
mode:
authorAllison Easton <allison.easton@mongodb.com>2019-07-12 14:58:28 -0400
committerAllison Easton <allison.easton@mongodb.com>2019-07-12 14:58:28 -0400
commit49b66d842d58eba32c72bb9ef8f3cd6fd7d0c9ed (patch)
tree987f04224da223f3e903b2fa021113c4e7dda98c /src/mongo/db/repl/collection_bulk_loader_impl.cpp
parent4bdf1fd63dc793a6d36d1f9da660c9747478206b (diff)
downloadmongo-49b66d842d58eba32c72bb9ef8f3cd6fd7d0c9ed.tar.gz
SERVER-41530 For uncapped collections, CollectionBulkLoaderImpl::insertDocuments
should batch the documents and commit the batch in a single WriteUnitOfWork
Diffstat (limited to 'src/mongo/db/repl/collection_bulk_loader_impl.cpp')
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.cpp116
1 files changed, 74 insertions, 42 deletions
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index 40471bb6e0a..393f41f11f2 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/collection_bulk_loader_impl.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -113,58 +114,89 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex
});
}
-Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
- const std::vector<BSONObj>::const_iterator end) {
- return _runTaskReleaseResourcesOnFailure([&] {
- UnreplicatedWritesBlock uwb(_opCtx.get());
-
- for (auto iter = begin; iter != end; ++iter) {
- boost::optional<RecordId> loc;
- const auto& doc = *iter;
- Status status = writeConflictRetry(
- _opCtx.get(), "CollectionBulkLoaderImpl::insertDocuments", _nss.ns(), [&] {
- WriteUnitOfWork wunit(_opCtx.get());
- if (_idIndexBlock || _secondaryIndexesBlock) {
- auto onRecordInserted = [&](const RecordId& location) {
- loc = location;
- return Status::OK();
- };
- // This version of insert will not update any indexes.
- const auto status = _autoColl->getCollection()->insertDocumentForBulkLoader(
- _opCtx.get(), doc, onRecordInserted);
- if (!status.isOK()) {
- return status;
- }
- } else {
- // For capped collections, we use regular insertDocument, which will update
- // pre-existing indexes.
- const auto status = _autoColl->getCollection()->insertDocument(
- _opCtx.get(), InsertStatement(doc), nullptr);
- if (!status.isOK()) {
- return status;
- }
+Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection(
+ const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end) {
+ auto iter = begin;
+ while (iter != end) {
+ std::vector<RecordId> locs;
+ Status status = writeConflictRetry(
+ _opCtx.get(), "CollectionBulkLoaderImpl/insertDocomuntsUncapped", _nss.ns(), [&] {
+ WriteUnitOfWork wunit(_opCtx.get());
+ auto insertIter = iter;
+ int bytesInBlock = 0;
+ locs.clear();
+
+ auto onRecordInserted = [&](const RecordId& location) {
+ locs.emplace_back(location);
+ return Status::OK();
+ };
+
+ while (insertIter != end && bytesInBlock < collectionBulkLoaderBatchSizeInBytes) {
+ const auto& doc = *insertIter++;
+ bytesInBlock += doc.objsize();
+ // This version of insert will not update any indexes.
+ const auto status = _autoColl->getCollection()->insertDocumentForBulkLoader(
+ _opCtx.get(), doc, onRecordInserted);
+ if (!status.isOK()) {
+ return status;
}
+ }
- wunit.commit();
+ wunit.commit();
+ return Status::OK();
+ });
- return Status::OK();
- });
+ if (!status.isOK()) {
+ return status;
+ }
+ // Inserts index entries into the external sorter. This will not update
+ // pre-existing indexes.
+ for (size_t index = 0; index < locs.size(); ++index) {
+ status = _addDocumentToIndexBlocks(*iter++, locs.at(index));
if (!status.isOK()) {
return status;
}
+ }
+ }
+ return Status::OK();
+}
- if (loc) {
- // Inserts index entries into the external sorter. This will not update
- // pre-existing indexes.
- status = _addDocumentToIndexBlocks(doc, loc.get());
- }
+Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection(
+ const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end) {
+ for (auto iter = begin; iter != end; ++iter) {
+ const auto& doc = *iter;
+ Status status = writeConflictRetry(
+ _opCtx.get(), "CollectionBulkLoaderImpl/insertDocumentsCapped", _nss.ns(), [&] {
+ WriteUnitOfWork wunit(_opCtx.get());
+ // For capped collections, we use regular insertDocument, which
+ // will update pre-existing indexes.
+ const auto status = _autoColl->getCollection()->insertDocument(
+ _opCtx.get(), InsertStatement(doc), nullptr);
+ if (!status.isOK()) {
+ return status;
+ }
+ wunit.commit();
+ return Status::OK();
+ });
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+ return Status::OK();
+}
- if (!status.isOK()) {
- return status;
- }
+Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end) {
+ return _runTaskReleaseResourcesOnFailure([&] {
+ UnreplicatedWritesBlock uwb(_opCtx.get());
+ if (_idIndexBlock || _secondaryIndexesBlock) {
+ return _insertDocumentsForUncappedCollection(begin, end);
+ } else {
+ return _insertDocumentsForCappedCollection(begin, end);
}
- return Status::OK();
});
}