summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAllison Easton <allison.easton@mongodb.com>2019-07-12 14:58:28 -0400
committerAllison Easton <allison.easton@mongodb.com>2019-07-12 14:58:28 -0400
commit49b66d842d58eba32c72bb9ef8f3cd6fd7d0c9ed (patch)
tree987f04224da223f3e903b2fa021113c4e7dda98c
parent4bdf1fd63dc793a6d36d1f9da660c9747478206b (diff)
downloadmongo-49b66d842d58eba32c72bb9ef8f3cd6fd7d0c9ed.tar.gz
SERVER-41530 For uncapped collections, CollectionBulkLoaderImpl::insertDocuments
should batch the documents and commit the batch in a single WriteUnitOfWork
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.cpp116
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.h14
3 files changed, 89 insertions, 42 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 5af7dda53c9..68e102c8925 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -229,6 +229,7 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/database_holder',
'$BUILD_DIR/mongo/db/storage/oplog_cap_maintainer_thread',
'$BUILD_DIR/mongo/db/logical_clock',
+ 'repl_server_parameters',
],
)
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index 40471bb6e0a..393f41f11f2 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/collection_bulk_loader_impl.h"
+#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
#include "mongo/util/scopeguard.h"
@@ -113,58 +114,89 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex
});
}
-Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
- const std::vector<BSONObj>::const_iterator end) {
- return _runTaskReleaseResourcesOnFailure([&] {
- UnreplicatedWritesBlock uwb(_opCtx.get());
-
- for (auto iter = begin; iter != end; ++iter) {
- boost::optional<RecordId> loc;
- const auto& doc = *iter;
- Status status = writeConflictRetry(
- _opCtx.get(), "CollectionBulkLoaderImpl::insertDocuments", _nss.ns(), [&] {
- WriteUnitOfWork wunit(_opCtx.get());
- if (_idIndexBlock || _secondaryIndexesBlock) {
- auto onRecordInserted = [&](const RecordId& location) {
- loc = location;
- return Status::OK();
- };
- // This version of insert will not update any indexes.
- const auto status = _autoColl->getCollection()->insertDocumentForBulkLoader(
- _opCtx.get(), doc, onRecordInserted);
- if (!status.isOK()) {
- return status;
- }
- } else {
- // For capped collections, we use regular insertDocument, which will update
- // pre-existing indexes.
- const auto status = _autoColl->getCollection()->insertDocument(
- _opCtx.get(), InsertStatement(doc), nullptr);
- if (!status.isOK()) {
- return status;
- }
+Status CollectionBulkLoaderImpl::_insertDocumentsForUncappedCollection(
+ const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end) {
+ auto iter = begin;
+ while (iter != end) {
+ std::vector<RecordId> locs;
+ Status status = writeConflictRetry(
+            _opCtx.get(), "CollectionBulkLoaderImpl/insertDocumentsUncapped", _nss.ns(), [&] {
+ WriteUnitOfWork wunit(_opCtx.get());
+ auto insertIter = iter;
+ int bytesInBlock = 0;
+ locs.clear();
+
+ auto onRecordInserted = [&](const RecordId& location) {
+ locs.emplace_back(location);
+ return Status::OK();
+ };
+
+ while (insertIter != end && bytesInBlock < collectionBulkLoaderBatchSizeInBytes) {
+ const auto& doc = *insertIter++;
+ bytesInBlock += doc.objsize();
+ // This version of insert will not update any indexes.
+ const auto status = _autoColl->getCollection()->insertDocumentForBulkLoader(
+ _opCtx.get(), doc, onRecordInserted);
+ if (!status.isOK()) {
+ return status;
}
+ }
- wunit.commit();
+ wunit.commit();
+ return Status::OK();
+ });
- return Status::OK();
- });
+ if (!status.isOK()) {
+ return status;
+ }
+ // Inserts index entries into the external sorter. This will not update
+ // pre-existing indexes.
+ for (size_t index = 0; index < locs.size(); ++index) {
+ status = _addDocumentToIndexBlocks(*iter++, locs.at(index));
if (!status.isOK()) {
return status;
}
+ }
+ }
+ return Status::OK();
+}
- if (loc) {
- // Inserts index entries into the external sorter. This will not update
- // pre-existing indexes.
- status = _addDocumentToIndexBlocks(doc, loc.get());
- }
+Status CollectionBulkLoaderImpl::_insertDocumentsForCappedCollection(
+ const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end) {
+ for (auto iter = begin; iter != end; ++iter) {
+ const auto& doc = *iter;
+ Status status = writeConflictRetry(
+ _opCtx.get(), "CollectionBulkLoaderImpl/insertDocumentsCapped", _nss.ns(), [&] {
+ WriteUnitOfWork wunit(_opCtx.get());
+ // For capped collections, we use regular insertDocument, which
+ // will update pre-existing indexes.
+ const auto status = _autoColl->getCollection()->insertDocument(
+ _opCtx.get(), InsertStatement(doc), nullptr);
+ if (!status.isOK()) {
+ return status;
+ }
+ wunit.commit();
+ return Status::OK();
+ });
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+ return Status::OK();
+}
- if (!status.isOK()) {
- return status;
- }
+Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end) {
+ return _runTaskReleaseResourcesOnFailure([&] {
+ UnreplicatedWritesBlock uwb(_opCtx.get());
+ if (_idIndexBlock || _secondaryIndexesBlock) {
+ return _insertDocumentsForUncappedCollection(begin, end);
+ } else {
+ return _insertDocumentsForCappedCollection(begin, end);
}
- return Status::OK();
});
}
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h
index 5e03fcbda5d..afb6df03bc2 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.h
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.h
@@ -84,6 +84,20 @@ private:
Status _runTaskReleaseResourcesOnFailure(const F& task) noexcept;
/**
+ * For capped collections, each document will be inserted in its own WriteUnitOfWork.
+ */
+ Status _insertDocumentsForCappedCollection(const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end);
+
+ /**
+ * For uncapped collections, we will insert documents in batches of size
+ * collectionBulkLoaderBatchSizeInBytes or up to one document size greater. All insertions in a
+ * given batch will be inserted in one WriteUnitOfWork.
+ */
+ Status _insertDocumentsForUncappedCollection(const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end);
+
+ /**
* Adds document and associated RecordId to index blocks after inserting into RecordStore.
*/
Status _addDocumentToIndexBlocks(const BSONObj& doc, const RecordId& loc);