summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2018-10-25 19:44:15 -0400
committerBenety Goh <benety@mongodb.com>2018-10-25 19:44:15 -0400
commitfc4c06660da1e121c817add86a56bbee1ef05f16 (patch)
tree0521b0b267ab32c59559e573c936eec561ca1e08
parent7d63b4cb968325aaea632fd14c10476dd9a6117e (diff)
downloadmongo-fc4c06660da1e121c817add86a56bbee1ef05f16.tar.gz
SERVER-37589 rename Collection::insertDocument() overload for bulk loader and remove MultiIndexBlock reference
-rw-r--r--src/mongo/db/catalog/collection.h25
-rw-r--r--src/mongo/db/catalog/collection_impl.cpp14
-rw-r--r--src/mongo/db/catalog/collection_impl.h10
-rw-r--r--src/mongo/db/catalog/collection_mock.h6
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.cpp39
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.h5
-rw-r--r--src/mongo/db/s/sharding_initialization_op_observer_test.cpp4
7 files changed, 66 insertions, 37 deletions
diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h
index ea37ea27943..bb38fb4b25b 100644
--- a/src/mongo/db/catalog/collection.h
+++ b/src/mongo/db/catalog/collection.h
@@ -189,8 +189,15 @@ public:
enum ValidationLevel { OFF, MODERATE, STRICT_V };
enum class StoreDeletedDoc { Off, On };
+ /**
+ * Callback function for callers of insertDocumentForBulkLoader().
+ */
+ using OnRecordInsertedFn = stdx::function<Status(const RecordId& loc)>;
+
class Impl : virtual CappedCallback {
public:
+ using OnRecordInsertedFn = Collection::OnRecordInsertedFn;
+
virtual ~Impl() = 0;
virtual void init(OperationContext* opCtx) = 0;
@@ -262,9 +269,9 @@ public:
Timestamp* timestamps,
size_t nDocs) = 0;
- virtual Status insertDocument(OperationContext* opCtx,
- const BSONObj& doc,
- const std::vector<MultiIndexBlock*>& indexBlocks) = 0;
+ virtual Status insertDocumentForBulkLoader(OperationContext* opCtx,
+ const BSONObj& doc,
+ const OnRecordInsertedFn& onRecordInserted) = 0;
virtual RecordId updateDocument(OperationContext* opCtx,
const RecordId& oldLocation,
@@ -511,14 +518,16 @@ public:
}
/**
- * Inserts a document into the record store and adds it to the MultiIndexBlocks passed in.
+ * Inserts a document into the record store for a bulk loader that manages the index building
+ * outside this Collection. The bulk loader is notified with the RecordId of the document
+ * inserted into the RecordStore.
*
* NOTE: It is up to caller to commit the indexes.
*/
- inline Status insertDocument(OperationContext* const opCtx,
- const BSONObj& doc,
- const std::vector<MultiIndexBlock*>& indexBlocks) {
- return this->_impl().insertDocument(opCtx, doc, indexBlocks);
+ inline Status insertDocumentForBulkLoader(OperationContext* const opCtx,
+ const BSONObj& doc,
+ const OnRecordInsertedFn& onRecordInserted) {
+ return this->_impl().insertDocumentForBulkLoader(opCtx, doc, onRecordInserted);
}
/**
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 08c8d95fbda..a2644f098d8 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -418,9 +418,9 @@ Status CollectionImpl::insertDocument(OperationContext* opCtx,
return insertDocuments(opCtx, docs.begin(), docs.end(), opDebug, fromMigrate);
}
-Status CollectionImpl::insertDocument(OperationContext* opCtx,
- const BSONObj& doc,
- const std::vector<MultiIndexBlock*>& indexBlocks) {
+Status CollectionImpl::insertDocumentForBulkLoader(OperationContext* opCtx,
+ const BSONObj& doc,
+ const OnRecordInsertedFn& onRecordInserted) {
auto status = checkFailCollectionInsertsFailPoint(_ns, doc);
if (!status.isOK()) {
@@ -442,11 +442,9 @@ Status CollectionImpl::insertDocument(OperationContext* opCtx,
if (!loc.isOK())
return loc.getStatus();
- for (auto&& indexBlock : indexBlocks) {
- Status status = indexBlock->insert(doc, loc.getValue());
- if (!status.isOK()) {
- return status;
- }
+ status = onRecordInserted(loc.getValue());
+ if (!status.isOK()) {
+ return status;
}
vector<InsertStatement> inserts;
diff --git a/src/mongo/db/catalog/collection_impl.h b/src/mongo/db/catalog/collection_impl.h
index 5d616d01791..1eb30230065 100644
--- a/src/mongo/db/catalog/collection_impl.h
+++ b/src/mongo/db/catalog/collection_impl.h
@@ -182,13 +182,15 @@ public:
size_t nDocs) final;
/**
- * Inserts a document into the record store and adds it to the MultiIndexBlocks passed in.
+ * Inserts a document into the record store for a bulk loader that manages the index building
+ * outside this Collection. The bulk loader is notified with the RecordId of the document
+ * inserted into the RecordStore.
*
* NOTE: It is up to caller to commit the indexes.
*/
- Status insertDocument(OperationContext* opCtx,
- const BSONObj& doc,
- const std::vector<MultiIndexBlock*>& indexBlocks) final;
+ Status insertDocumentForBulkLoader(OperationContext* opCtx,
+ const BSONObj& doc,
+ const OnRecordInsertedFn& onRecordInserted) final;
/**
* Updates the document @ oldLocation with newDoc.
diff --git a/src/mongo/db/catalog/collection_mock.h b/src/mongo/db/catalog/collection_mock.h
index 471dba32623..f74550f0f7d 100644
--- a/src/mongo/db/catalog/collection_mock.h
+++ b/src/mongo/db/catalog/collection_mock.h
@@ -153,9 +153,9 @@ public:
std::abort();
}
- Status insertDocument(OperationContext* opCtx,
- const BSONObj& doc,
- const std::vector<MultiIndexBlock*>& indexBlocks) {
+ Status insertDocumentForBulkLoader(OperationContext* opCtx,
+ const BSONObj& doc,
+ const OnRecordInsertedFn& onRecordInserted) {
std::abort();
}
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index 0231cf9749d..d2b28d1d349 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -112,22 +112,18 @@ Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::con
UnreplicatedWritesBlock uwb(_opCtx.get());
for (auto iter = begin; iter != end; ++iter) {
- std::vector<MultiIndexBlock*> indexers;
- if (_idIndexBlock) {
- indexers.push_back(_idIndexBlock.get());
- }
- if (_secondaryIndexesBlock) {
- indexers.push_back(_secondaryIndexesBlock.get());
- }
-
Status status = writeConflictRetry(
_opCtx.get(), "CollectionBulkLoaderImpl::insertDocuments", _nss.ns(), [&] {
WriteUnitOfWork wunit(_opCtx.get());
- if (!indexers.empty()) {
+ const auto& doc = *iter;
+ if (_idIndexBlock || _secondaryIndexesBlock) {
// This flavor of insertDocument will not update any pre-existing indexes,
// only the indexers passed in.
- const auto status = _autoColl->getCollection()->insertDocument(
- _opCtx.get(), *iter, indexers);
+ auto onRecordInserted = [&](const RecordId& loc) {
+ return _addDocumentToIndexBlocks(doc, loc);
+ };
+ const auto status = _autoColl->getCollection()->insertDocumentForBulkLoader(
+ _opCtx.get(), doc, onRecordInserted);
if (!status.isOK()) {
return status;
}
@@ -135,7 +131,7 @@ Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::con
// For capped collections, we use regular insertDocument, which will update
// pre-existing indexes.
const auto status = _autoColl->getCollection()->insertDocument(
- _opCtx.get(), InsertStatement(*iter), nullptr);
+ _opCtx.get(), InsertStatement(doc), nullptr);
if (!status.isOK()) {
return status;
}
@@ -258,6 +254,25 @@ Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure(F task) noexc
}
}
+Status CollectionBulkLoaderImpl::_addDocumentToIndexBlocks(const BSONObj& doc,
+ const RecordId& loc) {
+ if (_idIndexBlock) {
+ auto status = _idIndexBlock->insert(doc, loc);
+ if (!status.isOK()) {
+ return status.withContext("failed to add document to _id index");
+ }
+ }
+
+ if (_secondaryIndexesBlock) {
+ auto status = _secondaryIndexesBlock->insert(doc, loc);
+ if (!status.isOK()) {
+ return status.withContext("failed to add document to secondary indexes");
+ }
+ }
+
+ return Status::OK();
+}
+
CollectionBulkLoaderImpl::Stats CollectionBulkLoaderImpl::getStats() const {
return _stats;
}
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h
index 5982e251559..2c81b5588d4 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.h
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.h
@@ -84,6 +84,11 @@ private:
template <typename F>
Status _runTaskReleaseResourcesOnFailure(F task) noexcept;
+ /**
+ * Adds document and associated RecordId to index blocks after inserting into RecordStore.
+ */
+ Status _addDocumentToIndexBlocks(const BSONObj& doc, const RecordId& loc);
+
ServiceContext::UniqueClient _client;
ServiceContext::UniqueOperationContext _opCtx;
std::unique_ptr<AutoGetCollection> _autoColl;
diff --git a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp
index 0c79243ca68..37f61b19653 100644
--- a/src/mongo/db/s/sharding_initialization_op_observer_test.cpp
+++ b/src/mongo/db/s/sharding_initialization_op_observer_test.cpp
@@ -118,8 +118,8 @@ TEST_F(ShardingInitializationOpObserverTest, GlobalInitDoesntGetCalledIfWriteAbo
operationContext(), NamespaceString("admin.system.version"), MODE_IX);
WriteUnitOfWork wuow(operationContext());
- ASSERT_OK(autoColl.getCollection()->insertDocument(
- operationContext(), shardIdentity.toShardIdentityDocument(), {}));
+ InsertStatement stmt(shardIdentity.toShardIdentityDocument());
+ ASSERT_OK(autoColl.getCollection()->insertDocument(operationContext(), stmt, nullptr));
ASSERT_EQ(0, getInitCallCount());
}