diff options
author | Benety Goh <benety@mongodb.com> | 2018-10-25 19:44:15 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-10-25 19:44:15 -0400 |
commit | fc4c06660da1e121c817add86a56bbee1ef05f16 (patch) | |
tree | 0521b0b267ab32c59559e573c936eec561ca1e08 | |
parent | 7d63b4cb968325aaea632fd14c10476dd9a6117e (diff) | |
download | mongo-fc4c06660da1e121c817add86a56bbee1ef05f16.tar.gz |
SERVER-37589 rename Collection::insertDocument() overload for bulk loader and remove MultiIndexBlock reference
-rw-r--r-- | src/mongo/db/catalog/collection.h | 25 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_impl.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_impl.h | 10 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_mock.h | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.h | 5 | ||||
-rw-r--r-- | src/mongo/db/s/sharding_initialization_op_observer_test.cpp | 4 |
7 files changed, 66 insertions, 37 deletions
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index ea37ea27943..bb38fb4b25b 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -189,8 +189,15 @@ public: enum ValidationLevel { OFF, MODERATE, STRICT_V }; enum class StoreDeletedDoc { Off, On }; + /** + * Callback function for callers of insertDocumentForBulkLoader(). + */ + using OnRecordInsertedFn = stdx::function<Status(const RecordId& loc)>; + class Impl : virtual CappedCallback { public: + using OnRecordInsertedFn = Collection::OnRecordInsertedFn; + virtual ~Impl() = 0; virtual void init(OperationContext* opCtx) = 0; @@ -262,9 +269,9 @@ public: Timestamp* timestamps, size_t nDocs) = 0; - virtual Status insertDocument(OperationContext* opCtx, - const BSONObj& doc, - const std::vector<MultiIndexBlock*>& indexBlocks) = 0; + virtual Status insertDocumentForBulkLoader(OperationContext* opCtx, + const BSONObj& doc, + const OnRecordInsertedFn& onRecordInserted) = 0; virtual RecordId updateDocument(OperationContext* opCtx, const RecordId& oldLocation, @@ -511,14 +518,16 @@ public: } /** - * Inserts a document into the record store and adds it to the MultiIndexBlocks passed in. + * Inserts a document into the record store for a bulk loader that manages the index building + * outside this Collection. The bulk loader is notified with the RecordId of the document + * inserted into the RecordStore. * * NOTE: It is up to caller to commit the indexes. */ - inline Status insertDocument(OperationContext* const opCtx, - const BSONObj& doc, - const std::vector<MultiIndexBlock*>& indexBlocks) { - return this->_impl().insertDocument(opCtx, doc, indexBlocks); + inline Status insertDocumentForBulkLoader(OperationContext* const opCtx, + const BSONObj& doc, + const OnRecordInsertedFn& onRecordInserted) { + return this->_impl().insertDocumentForBulkLoader(opCtx, doc, onRecordInserted); } /** diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 08c8d95fbda..a2644f098d8 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -418,9 +418,9 @@ Status CollectionImpl::insertDocument(OperationContext* opCtx, return insertDocuments(opCtx, docs.begin(), docs.end(), opDebug, fromMigrate); } -Status CollectionImpl::insertDocument(OperationContext* opCtx, - const BSONObj& doc, - const std::vector<MultiIndexBlock*>& indexBlocks) { +Status CollectionImpl::insertDocumentForBulkLoader(OperationContext* opCtx, + const BSONObj& doc, + const OnRecordInsertedFn& onRecordInserted) { auto status = checkFailCollectionInsertsFailPoint(_ns, doc); if (!status.isOK()) { @@ -442,11 +442,9 @@ Status CollectionImpl::insertDocument(OperationContext* opCtx, if (!loc.isOK()) return loc.getStatus(); - for (auto&& indexBlock : indexBlocks) { - Status status = indexBlock->insert(doc, loc.getValue()); - if (!status.isOK()) { - return status; - } + status = onRecordInserted(loc.getValue()); + if (!status.isOK()) { + return status; } vector<InsertStatement> inserts; diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h index 5d616d01791..1eb30230065 100644 --- a/src/mongo/db/catalog/collection_impl.h +++ b/src/mongo/db/catalog/collection_impl.h @@ -182,13 +182,15 @@ public: size_t nDocs) final; /** - * Inserts a document into the record store and adds it to the MultiIndexBlocks passed in. + * Inserts a document into the record store for a bulk loader that manages the index building + * outside this Collection. The bulk loader is notified with the RecordId of the document + * inserted into the RecordStore. * * NOTE: It is up to caller to commit the indexes. */ - Status insertDocument(OperationContext* opCtx, - const BSONObj& doc, - const std::vector<MultiIndexBlock*>& indexBlocks) final; + Status insertDocumentForBulkLoader(OperationContext* opCtx, + const BSONObj& doc, + const OnRecordInsertedFn& onRecordInserted) final; /** * Updates the document @ oldLocation with newDoc. diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h index 471dba32623..f74550f0f7d 100644 --- a/src/mongo/db/catalog/collection_mock.h +++ b/src/mongo/db/catalog/collection_mock.h @@ -153,9 +153,9 @@ public: std::abort(); } - Status insertDocument(OperationContext* opCtx, - const BSONObj& doc, - const std::vector<MultiIndexBlock*>& indexBlocks) { + Status insertDocumentForBulkLoader(OperationContext* opCtx, + const BSONObj& doc, + const OnRecordInsertedFn& onRecordInserted) { std::abort(); } diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index 0231cf9749d..d2b28d1d349 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -112,22 +112,18 @@ Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::con UnreplicatedWritesBlock uwb(_opCtx.get()); for (auto iter = begin; iter != end; ++iter) { - std::vector<MultiIndexBlock*> indexers; - if (_idIndexBlock) { - indexers.push_back(_idIndexBlock.get()); - } - if (_secondaryIndexesBlock) { - indexers.push_back(_secondaryIndexesBlock.get()); - } - Status status = writeConflictRetry( _opCtx.get(), "CollectionBulkLoaderImpl::insertDocuments", _nss.ns(), [&] { WriteUnitOfWork wunit(_opCtx.get()); - if (!indexers.empty()) { + const auto& doc = *iter; + if (_idIndexBlock || _secondaryIndexesBlock) { // This flavor of insertDocument will not update any pre-existing indexes, // only the indexers passed in. - const auto status = _autoColl->getCollection()->insertDocument( - _opCtx.get(), *iter, indexers); + auto onRecordInserted = [&](const RecordId& loc) { + return _addDocumentToIndexBlocks(doc, loc); + }; + const auto status = _autoColl->getCollection()->insertDocumentForBulkLoader( + _opCtx.get(), doc, onRecordInserted); if (!status.isOK()) { return status; } @@ -135,7 +131,7 @@ Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::con // For capped collections, we use regular insertDocument, which will update // pre-existing indexes. const auto status = _autoColl->getCollection()->insertDocument( - _opCtx.get(), InsertStatement(*iter), nullptr); + _opCtx.get(), InsertStatement(doc), nullptr); if (!status.isOK()) { return status; } @@ -258,6 +254,25 @@ Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure(F task) noexc } } +Status CollectionBulkLoaderImpl::_addDocumentToIndexBlocks(const BSONObj& doc, + const RecordId& loc) { + if (_idIndexBlock) { + auto status = _idIndexBlock->insert(doc, loc); + if (!status.isOK()) { + return status.withContext("failed to add document to _id index"); + } + } + + if (_secondaryIndexesBlock) { + auto status = _secondaryIndexesBlock->insert(doc, loc); + if (!status.isOK()) { + return status.withContext("failed to add document to secondary indexes"); + } + } + + return Status::OK(); +} + CollectionBulkLoaderImpl::Stats CollectionBulkLoaderImpl::getStats() const { return _stats; } diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h index 5982e251559..2c81b5588d4 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.h +++ b/src/mongo/db/repl/collection_bulk_loader_impl.h @@ -84,6 +84,11 @@ private: template <typename F> Status _runTaskReleaseResourcesOnFailure(F task) noexcept; + /** + * Adds document and associated RecordId to index blocks after inserting into RecordStore. + */ + Status _addDocumentToIndexBlocks(const BSONObj& doc, const RecordId& loc); + ServiceContext::UniqueClient _client; ServiceContext::UniqueOperationContext _opCtx; std::unique_ptr<AutoGetCollection> _autoColl; diff --git a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp index 0c79243ca68..37f61b19653 100644 --- a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp +++ b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp @@ -118,8 +118,8 @@ TEST_F(ShardingInitializationOpObserverTest, GlobalInitDoesntGetCalledIfWriteAbo operationContext(), NamespaceString("admin.system.version"), MODE_IX); WriteUnitOfWork wuow(operationContext()); - ASSERT_OK(autoColl.getCollection()->insertDocument( - operationContext(), shardIdentity.toShardIdentityDocument(), {})); + InsertStatement stmt(shardIdentity.toShardIdentityDocument()); + ASSERT_OK(autoColl.getCollection()->insertDocument(operationContext(), stmt, nullptr)); ASSERT_EQ(0, getInitCallCount()); } |