summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAllison Easton <allison.easton@mongodb.com>2019-06-25 09:11:09 -0400
committerAllison Easton <allison.easton@mongodb.com>2019-06-25 09:11:09 -0400
commit54ca8a7112746c7637a295b6d57b6f2c3b4df9b7 (patch)
treeff3960dfd30fc7d97a16fb4140b0f8ba88b92b8a
parent4ecc335c610d457e5e063c8710c1820833fe45d9 (diff)
downloadmongo-54ca8a7112746c7637a295b6d57b6f2c3b4df9b7.tar.gz
SERVER-41529 Prevent dangling index records by calling
_addDocumentToIndexBlock outside of writeConflictRetry block.
-rw-r--r--jstests/replsets/initial_sync_write_conflict.js34
-rw-r--r--src/mongo/db/catalog/collection_impl.cpp11
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.cpp22
3 files changed, 58 insertions, 9 deletions
diff --git a/jstests/replsets/initial_sync_write_conflict.js b/jstests/replsets/initial_sync_write_conflict.js
new file mode 100644
index 00000000000..20fc4fea93f
--- /dev/null
+++ b/jstests/replsets/initial_sync_write_conflict.js
@@ -0,0 +1,34 @@
+/*
+ * Tests that initial sync is successfully able to clone the collection and build
+ * index without any orphan index entries even if a WriteConflictException is
+ * thrown while inserting documents into collections.
+ */
+
+const testName = "write_conflict_exception";
+
+// Start a 2 node replica set.
+const replSet = new ReplSetTest({name: testName, nodes: [{}, {rsConfig: {priority: 0}}]});
+
+jsTest.log("Starting test");
+replSet.startSet();
+replSet.initiate();
+
+var secondary = replSet.getSecondary();
+
+// Start and restart secondary with fail point that throws exception enabled.
+jsTest.log("Stopping secondary");
+replSet.stop(secondary);
+jsTest.log("Re-starting secondary ");
+secondary = replSet.start(secondary, {
+ startClean: true,
+ setParameter: {"failpoint.failAfterBulkLoadDocInsert": "{'mode': {'times': 1}}"}
+});
+
+// Wait for everything to be synced.
+jsTest.log("Waiting for initial sync to succeed");
+replSet.waitForState(secondary, ReplSetTest.State.SECONDARY);
+
+// If the index table contains any entries pointing to invalid document(RecordID), then
+// validateCollections called during replica stopSet will capture the index corruption and throw
+// error.
+replSet.stopSet(); \ No newline at end of file
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 99d7394478d..ff9dcdd6691 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -53,6 +53,7 @@
#include "mongo/db/clientcursor.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/index/index_descriptor.h"
@@ -97,6 +98,9 @@ MONGO_FAIL_POINT_DEFINE(failCollectionInserts);
// }
MONGO_FAIL_POINT_DEFINE(hangAfterCollectionInserts);
+// This fail point throws a WriteConflictException after a successful call to insertRecords.
+MONGO_FAIL_POINT_DEFINE(failAfterBulkLoadDocInsert);
+
/**
* Checks the 'failCollectionInserts' fail point at the beginning of an insert operation to see if
* the insert should fail. Returns Status::OK if The function should proceed with the insertion.
@@ -473,8 +477,11 @@ Status CollectionImpl::insertDocumentForBulkLoader(OperationContext* opCtx,
return loc.getStatus();
status = onRecordInserted(loc.getValue());
- if (!status.isOK()) {
- return status;
+
+ if (MONGO_FAIL_POINT(failAfterBulkLoadDocInsert)) {
+ log() << "Failpoint failAfterBulkLoadDocInsert enabled for " << _ns.ns()
+ << ". Throwing WriteConflictException.";
+ throw WriteConflictException();
}
vector<InsertStatement> inserts;
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index 68911ca6c84..40471bb6e0a 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -115,21 +115,21 @@ Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndex
Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
const std::vector<BSONObj>::const_iterator end) {
- int count = 0;
return _runTaskReleaseResourcesOnFailure([&] {
UnreplicatedWritesBlock uwb(_opCtx.get());
for (auto iter = begin; iter != end; ++iter) {
+ boost::optional<RecordId> loc;
+ const auto& doc = *iter;
Status status = writeConflictRetry(
_opCtx.get(), "CollectionBulkLoaderImpl::insertDocuments", _nss.ns(), [&] {
WriteUnitOfWork wunit(_opCtx.get());
- const auto& doc = *iter;
if (_idIndexBlock || _secondaryIndexesBlock) {
- // This flavor of insertDocument will not update any pre-existing indexes,
- // only the indexers passed in.
- auto onRecordInserted = [&](const RecordId& loc) {
- return _addDocumentToIndexBlocks(doc, loc);
+ auto onRecordInserted = [&](const RecordId& location) {
+ loc = location;
+ return Status::OK();
};
+ // This version of insert will not update any indexes.
const auto status = _autoColl->getCollection()->insertDocumentForBulkLoader(
_opCtx.get(), doc, onRecordInserted);
if (!status.isOK()) {
@@ -154,7 +154,15 @@ Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::con
return status;
}
- ++count;
+ if (loc) {
+ // Inserts index entries into the external sorter. This will not update
+ // pre-existing indexes.
+ status = _addDocumentToIndexBlocks(doc, loc.get());
+ }
+
+ if (!status.isOK()) {
+ return status;
+ }
}
return Status::OK();
});