summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/replsets/initial_sync_fail_insert_once.js38
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/catalog/collection.cpp33
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.cpp110
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.h10
-rw-r--r--src/mongo/db/repl/task_runner.cpp2
6 files changed, 149 insertions, 45 deletions
diff --git a/jstests/replsets/initial_sync_fail_insert_once.js b/jstests/replsets/initial_sync_fail_insert_once.js
new file mode 100644
index 00000000000..9c13095e22f
--- /dev/null
+++ b/jstests/replsets/initial_sync_fail_insert_once.js
@@ -0,0 +1,38 @@
+/**
+ * Tests that initial sync can complete after a failed insert to a cloned collection.
+ */
+
+(function() {
+ var name = 'initial_sync_fail_insert_once';
+ var replSet = new ReplSetTest({
+ name: name,
+ nodes: 2,
+ });
+
+ replSet.startSet();
+ replSet.initiate();
+ var primary = replSet.getPrimary();
+ var secondary = replSet.getSecondary();
+
+ var coll = primary.getDB('test').getCollection(name);
+ assert.writeOK(coll.insert({_id: 0, x: 1}, {writeConcern: {w: 2}}));
+
+ jsTest.log("Enabling Failpoint failCollectionInserts on " + tojson(secondary));
+ assert.commandWorked(secondary.getDB("admin").adminCommand({
+ configureFailPoint: "failCollectionInserts",
+ mode: {times: 2},
+ data: {collectionNS: coll.getFullName()}
+ }));
+
+ jsTest.log("Issuing RESYNC command to " + tojson(secondary));
+ assert.commandWorked(secondary.getDB("admin").runCommand({resync: 1}));
+
+ replSet.awaitReplication();
+ replSet.awaitSecondaryNodes();
+
+ assert.eq(1, secondary.getDB("test")[name].count());
+ assert.docEq({_id: 0, x: 1}, secondary.getDB("test")[name].findOne());
+
+ jsTest.log("Stopping repl set test; finished.");
+ replSet.stopSet();
+})();
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 50a38f41816..0cac2a6528f 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -190,6 +190,7 @@ error_code("IncompatibleServerVersion", 188)
error_code("PrimarySteppedDown", 189)
error_code("MasterSlaveConnectionFailure", 190)
error_code("BalancerLostDistributedLock", 191)
+error_code("FailPointEnabled", 192)
# Non-sequential error codes (for compatibility only)
error_code("SocketException", 9001)
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index 8d82d6ff395..b4651f41d5a 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -68,11 +68,16 @@
#include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h"
#include "mongo/db/auth/user_document_parser.h" // XXX-ANDY
+#include "mongo/util/fail_point.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
+
+// Used below to fail during inserts.
+MONGO_FP_DECLARE(failCollectionInserts);
+
const auto bannedExpressionsInValidators = std::set<StringData>{
"$geoNear", "$near", "$nearSphere", "$text", "$where",
};
@@ -369,6 +374,20 @@ Status Collection::insertDocuments(OperationContext* txn,
OpDebug* opDebug,
bool enforceQuota,
bool fromMigrate) {
+
+ MONGO_FAIL_POINT_BLOCK(failCollectionInserts, extraData) {
+ const BSONObj& data = extraData.getData();
+ const auto collElem = data["collectionNS"];
+ // If the failpoint specifies no collection or matches the existing one, fail.
+ if (!collElem || _ns == collElem.str()) {
+ const std::string msg = str::stream()
+ << "Failpoint (failCollectionInserts) has been enabled (" << data
+ << "), so rejecting insert (first doc): " << *begin;
+ log() << msg;
+ return {ErrorCodes::FailPointEnabled, msg};
+ }
+ }
+
// Should really be done in the collection object at creation and updated on index create.
const bool hasIdIndex = _indexCatalog.findIdIndex(txn);
@@ -418,6 +437,20 @@ Status Collection::insertDocument(OperationContext* txn,
const BSONObj& doc,
const std::vector<MultiIndexBlock*>& indexBlocks,
bool enforceQuota) {
+
+ MONGO_FAIL_POINT_BLOCK(failCollectionInserts, extraData) {
+ const BSONObj& data = extraData.getData();
+ const auto collElem = data["collectionNS"];
+ // If the failpoint specifies no collection or matches the existing one, fail.
+ if (!collElem || _ns == collElem.str()) {
+ const std::string msg = str::stream()
+ << "Failpoint (failCollectionInserts) has been enabled (" << data
+ << "), so rejecting insert: " << doc;
+ log() << msg;
+ return {ErrorCodes::FailPointEnabled, msg};
+ }
+ }
+
{
auto status = checkValidation(txn, doc);
if (!status.isOK())
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index cf31132e024..3f8e78d46ca 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -46,6 +46,7 @@
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
@@ -64,8 +65,8 @@ CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(OperationContext* txn,
_txn(txn),
_coll(coll),
_nss{coll->ns()},
- _idIndexBlock{txn, coll},
- _secondaryIndexesBlock{txn, coll},
+ _idIndexBlock(stdx::make_unique<MultiIndexBlock>(txn, coll)),
+ _secondaryIndexesBlock(stdx::make_unique<MultiIndexBlock>(txn, coll)),
_idIndexSpec(idIndexSpec) {
invariant(txn);
invariant(coll);
@@ -91,18 +92,21 @@ Status CollectionBulkLoaderImpl::init(OperationContext* txn,
invariant(coll);
invariant(txn->getClient() == &cc());
if (secondaryIndexSpecs.size()) {
- _hasSecondaryIndexes = true;
- _secondaryIndexesBlock.ignoreUniqueConstraint();
- auto status = _secondaryIndexesBlock.init(secondaryIndexSpecs);
+ _secondaryIndexesBlock->ignoreUniqueConstraint();
+ auto status = _secondaryIndexesBlock->init(secondaryIndexSpecs);
if (!status.isOK()) {
return status;
}
+ } else {
+ _secondaryIndexesBlock.reset();
}
if (!_idIndexSpec.isEmpty()) {
- auto status = _idIndexBlock.init(_idIndexSpec);
+ auto status = _idIndexBlock->init(_idIndexSpec);
if (!status.isOK()) {
return status;
}
+ } else {
+ _idIndexBlock.reset();
}
return Status::OK();
@@ -111,36 +115,37 @@ Status CollectionBulkLoaderImpl::init(OperationContext* txn,
Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
const std::vector<BSONObj>::const_iterator end) {
int count = 0;
- return _runner->runSynchronousTask([begin, end, &count, this](OperationContext* txn) -> Status {
- invariant(txn);
+ return _runTaskReleaseResourcesOnFailure(
+ [begin, end, &count, this](OperationContext* txn) -> Status {
+ invariant(txn);
- for (auto iter = begin; iter != end; ++iter) {
- std::vector<MultiIndexBlock*> indexers;
- if (!_idIndexSpec.isEmpty()) {
- indexers.push_back(&_idIndexBlock);
- }
- if (_hasSecondaryIndexes) {
- indexers.push_back(&_secondaryIndexesBlock);
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- WriteUnitOfWork wunit(txn);
- const auto status = _coll->insertDocument(txn, *iter, indexers, false);
- if (!status.isOK()) {
- return status;
+ for (auto iter = begin; iter != end; ++iter) {
+ std::vector<MultiIndexBlock*> indexers;
+ if (_idIndexBlock) {
+ indexers.push_back(_idIndexBlock.get());
}
- wunit.commit();
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- _txn, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns());
+ if (_secondaryIndexesBlock) {
+ indexers.push_back(_secondaryIndexesBlock.get());
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork wunit(txn);
+ const auto status = _coll->insertDocument(txn, *iter, indexers, false);
+ if (!status.isOK()) {
+ return status;
+ }
+ wunit.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ _txn, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns());
- ++count;
- }
- return Status::OK();
- });
+ ++count;
+ }
+ return Status::OK();
+ });
}
Status CollectionBulkLoaderImpl::commit() {
- return _runner->runSynchronousTask(
+ return _runTaskReleaseResourcesOnFailure(
[this](OperationContext* txn) -> Status {
_stats.startBuildingIndexes = Date_t::now();
LOG(2) << "Creating indexes for ns: " << _nss.ns();
@@ -149,9 +154,9 @@ Status CollectionBulkLoaderImpl::commit() {
// Commit before deleting dups, so the dups will be removed from secondary indexes when
// deleted.
- if (_hasSecondaryIndexes) {
+ if (_secondaryIndexesBlock) {
std::set<RecordId> secDups;
- auto status = _secondaryIndexesBlock.doneInserting(&secDups);
+ auto status = _secondaryIndexesBlock->doneInserting(&secDups);
if (!status.isOK()) {
return status;
}
@@ -163,18 +168,18 @@ Status CollectionBulkLoaderImpl::commit() {
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork wunit(txn);
- _secondaryIndexesBlock.commit();
+ _secondaryIndexesBlock->commit();
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
_txn, "CollectionBulkLoaderImpl::commit", _nss.ns());
}
- if (!_idIndexSpec.isEmpty()) {
+ if (_idIndexBlock) {
// Delete dups.
std::set<RecordId> dups;
// Do not do inside a WriteUnitOfWork (required by doneInserting).
- auto status = _idIndexBlock.doneInserting(&dups);
+ auto status = _idIndexBlock->doneInserting(&dups);
if (!status.isOK()) {
return status;
}
@@ -196,7 +201,7 @@ Status CollectionBulkLoaderImpl::commit() {
// Commit _id index, without dups.
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork wunit(txn);
- _idIndexBlock.commit();
+ _idIndexBlock->commit();
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
@@ -206,15 +211,40 @@ Status CollectionBulkLoaderImpl::commit() {
LOG(2) << "Done creating indexes for ns: " << _nss.ns()
<< ", stats: " << _stats.toString();
- // release locks.
- _autoColl.reset(nullptr);
- _autoDB.reset(nullptr);
- _coll = nullptr;
+ _releaseResources();
return Status::OK();
},
TaskRunner::NextAction::kDisposeOperationContext);
}
+void CollectionBulkLoaderImpl::_releaseResources() {
+ if (_secondaryIndexesBlock) {
+ _secondaryIndexesBlock.reset();
+ }
+
+ if (_idIndexBlock) {
+ _idIndexBlock.reset();
+ }
+
+ // release locks.
+ _coll = nullptr;
+ _autoColl.reset(nullptr);
+ _autoDB.reset(nullptr);
+}
+
+Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure(
+ TaskRunner::SynchronousTask task, TaskRunner::NextAction nextAction) {
+ auto newTask = [this, &task](OperationContext* txn) -> Status {
+ ScopeGuard guard = MakeGuard(&CollectionBulkLoaderImpl::_releaseResources, this);
+ const auto status = task(txn);
+ if (status.isOK()) {
+ guard.Dismiss();
+ }
+ return status;
+ };
+ return _runner->runSynchronousTask(newTask, nextAction);
+}
+
CollectionBulkLoaderImpl::Stats CollectionBulkLoaderImpl::getStats() const {
return _stats;
}
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h
index 1fa6033a104..b66baeb2bae 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.h
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.h
@@ -84,6 +84,11 @@ public:
virtual BSONObj toBSON() const override;
private:
+ void _releaseResources();
+ Status _runTaskReleaseResourcesOnFailure(
+ TaskRunner::SynchronousTask task,
+ TaskRunner::NextAction nextAction = TaskRunner::NextAction::kKeepOperationContext);
+
std::unique_ptr<OldThreadPool> _threadPool;
std::unique_ptr<TaskRunner> _runner;
std::unique_ptr<AutoGetCollection> _autoColl;
@@ -91,9 +96,8 @@ private:
OperationContext* _txn = nullptr;
Collection* _coll = nullptr;
NamespaceString _nss;
- MultiIndexBlock _idIndexBlock;
- MultiIndexBlock _secondaryIndexesBlock;
- bool _hasSecondaryIndexes = false;
+ std::unique_ptr<MultiIndexBlock> _idIndexBlock;
+ std::unique_ptr<MultiIndexBlock> _secondaryIndexesBlock;
BSONObj _idIndexSpec;
Stats _stats;
};
diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp
index 2f0bc184738..210718bba3e 100644
--- a/src/mongo/db/repl/task_runner.cpp
+++ b/src/mongo/db/repl/task_runner.cpp
@@ -227,9 +227,7 @@ Status TaskRunner::runSynchronousTask(SynchronousTask func, TaskRunner::NextActi
} else {
// Run supplied function.
try {
- log() << "starting to run synchronous task on runner.";
returnStatus = func(txn);
- log() << "done running the synchronous task.";
} catch (...) {
returnStatus = exceptionToStatus();
error() << "Exception thrown in runSynchronousTask: " << redact(returnStatus);