diff options
-rw-r--r-- | jstests/replsets/initial_sync_fail_insert_once.js | 38 | ||||
-rw-r--r-- | src/mongo/base/error_codes.err | 1 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.cpp | 110 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.h | 10 | ||||
-rw-r--r-- | src/mongo/db/repl/task_runner.cpp | 2 |
6 files changed, 149 insertions, 45 deletions
diff --git a/jstests/replsets/initial_sync_fail_insert_once.js b/jstests/replsets/initial_sync_fail_insert_once.js new file mode 100644 index 00000000000..9c13095e22f --- /dev/null +++ b/jstests/replsets/initial_sync_fail_insert_once.js @@ -0,0 +1,38 @@ +/** + * Tests that initial sync can complete after a failed insert to a cloned collection. + */ + +(function() { + var name = 'initial_sync_fail_insert_once'; + var replSet = new ReplSetTest({ + name: name, + nodes: 2, + }); + + replSet.startSet(); + replSet.initiate(); + var primary = replSet.getPrimary(); + var secondary = replSet.getSecondary(); + + var coll = primary.getDB('test').getCollection(name); + assert.writeOK(coll.insert({_id: 0, x: 1}, {writeConcern: {w: 2}})); + + jsTest.log("Enabling Failpoint failCollectionInserts on " + tojson(secondary)); + assert.commandWorked(secondary.getDB("admin").adminCommand({ + configureFailPoint: "failCollectionInserts", + mode: {times: 2}, + data: {collectionNS: coll.getFullName()} + })); + + jsTest.log("Issuing RESYNC command to " + tojson(secondary)); + assert.commandWorked(secondary.getDB("admin").runCommand({resync: 1})); + + replSet.awaitReplication(); + replSet.awaitSecondaryNodes(); + + assert.eq(1, secondary.getDB("test")[name].count()); + assert.docEq({_id: 0, x: 1}, secondary.getDB("test")[name].findOne()); + + jsTest.log("Stopping repl set test; finished."); + replSet.stopSet(); +})(); diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 50a38f41816..0cac2a6528f 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -190,6 +190,7 @@ error_code("IncompatibleServerVersion", 188) error_code("PrimarySteppedDown", 189) error_code("MasterSlaveConnectionFailure", 190) error_code("BalancerLostDistributedLock", 191) +error_code("FailPointEnabled", 192) # Non-sequential error codes (for compatibility only) error_code("SocketException", 9001) diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 8d82d6ff395..b4651f41d5a 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -68,11 +68,16 @@ #include "mongo/db/storage/wiredtiger/wiredtiger_record_store.h" #include "mongo/db/auth/user_document_parser.h" // XXX-ANDY +#include "mongo/util/fail_point.h" #include "mongo/util/log.h" namespace mongo { namespace { + +// Used below to fail during inserts. +MONGO_FP_DECLARE(failCollectionInserts); + const auto bannedExpressionsInValidators = std::set<StringData>{ "$geoNear", "$near", "$nearSphere", "$text", "$where", }; @@ -369,6 +374,20 @@ Status Collection::insertDocuments(OperationContext* txn, OpDebug* opDebug, bool enforceQuota, bool fromMigrate) { + + MONGO_FAIL_POINT_BLOCK(failCollectionInserts, extraData) { + const BSONObj& data = extraData.getData(); + const auto collElem = data["collectionNS"]; + // If the failpoint specifies no collection or matches the existing one, fail. + if (!collElem || _ns == collElem.str()) { + const std::string msg = str::stream() + << "Failpoint (failCollectionInserts) has been enabled (" << data + << "), so rejecting insert (first doc): " << *begin; + log() << msg; + return {ErrorCodes::FailPointEnabled, msg}; + } + } + // Should really be done in the collection object at creation and updated on index create. const bool hasIdIndex = _indexCatalog.findIdIndex(txn); @@ -418,6 +437,20 @@ Status Collection::insertDocument(OperationContext* txn, const BSONObj& doc, const std::vector<MultiIndexBlock*>& indexBlocks, bool enforceQuota) { + + MONGO_FAIL_POINT_BLOCK(failCollectionInserts, extraData) { + const BSONObj& data = extraData.getData(); + const auto collElem = data["collectionNS"]; + // If the failpoint specifies no collection or matches the existing one, fail. + if (!collElem || _ns == collElem.str()) { + const std::string msg = str::stream() + << "Failpoint (failCollectionInserts) has been enabled (" << data + << "), so rejecting insert: " << doc; + log() << msg; + return {ErrorCodes::FailPointEnabled, msg}; + } + } + { auto status = checkValidation(txn, doc); if (!status.isOK()) diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index cf31132e024..3f8e78d46ca 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -46,6 +46,7 @@ #include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" +#include "mongo/util/scopeguard.h" namespace mongo { namespace repl { @@ -64,8 +65,8 @@ CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(OperationContext* txn, _txn(txn), _coll(coll), _nss{coll->ns()}, - _idIndexBlock{txn, coll}, - _secondaryIndexesBlock{txn, coll}, + _idIndexBlock(stdx::make_unique<MultiIndexBlock>(txn, coll)), + _secondaryIndexesBlock(stdx::make_unique<MultiIndexBlock>(txn, coll)), _idIndexSpec(idIndexSpec) { invariant(txn); invariant(coll); @@ -91,18 +92,21 @@ Status CollectionBulkLoaderImpl::init(OperationContext* txn, invariant(coll); invariant(txn->getClient() == &cc()); if (secondaryIndexSpecs.size()) { - _hasSecondaryIndexes = true; - _secondaryIndexesBlock.ignoreUniqueConstraint(); - auto status = _secondaryIndexesBlock.init(secondaryIndexSpecs); + _secondaryIndexesBlock->ignoreUniqueConstraint(); + auto status = _secondaryIndexesBlock->init(secondaryIndexSpecs); if (!status.isOK()) { return status; } + } else { + _secondaryIndexesBlock.reset(); } if (!_idIndexSpec.isEmpty()) { - auto status = _idIndexBlock.init(_idIndexSpec); + auto status = _idIndexBlock->init(_idIndexSpec); if (!status.isOK()) { return status; } + } else { + _idIndexBlock.reset(); } return Status::OK(); @@ -111,36 +115,37 @@ Status CollectionBulkLoaderImpl::init(OperationContext* txn, Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin, const std::vector<BSONObj>::const_iterator end) { int count = 0; - return _runner->runSynchronousTask([begin, end, &count, this](OperationContext* txn) -> Status { - invariant(txn); + return _runTaskReleaseResourcesOnFailure( + [begin, end, &count, this](OperationContext* txn) -> Status { + invariant(txn); - for (auto iter = begin; iter != end; ++iter) { - std::vector<MultiIndexBlock*> indexers; - if (!_idIndexSpec.isEmpty()) { - indexers.push_back(&_idIndexBlock); - } - if (_hasSecondaryIndexes) { - indexers.push_back(&_secondaryIndexesBlock); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork wunit(txn); - const auto status = _coll->insertDocument(txn, *iter, indexers, false); - if (!status.isOK()) { - return status; + for (auto iter = begin; iter != end; ++iter) { + std::vector<MultiIndexBlock*> indexers; + if (_idIndexBlock) { + indexers.push_back(_idIndexBlock.get()); } - wunit.commit(); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - _txn, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns()); + if (_secondaryIndexesBlock) { + indexers.push_back(_secondaryIndexesBlock.get()); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(txn); + const auto status = _coll->insertDocument(txn, *iter, indexers, false); + if (!status.isOK()) { + return status; + } + wunit.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + _txn, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns()); - ++count; - } - return Status::OK(); - }); + ++count; + } + return Status::OK(); + }); } Status CollectionBulkLoaderImpl::commit() { - return _runner->runSynchronousTask( + return _runTaskReleaseResourcesOnFailure( [this](OperationContext* txn) -> Status { _stats.startBuildingIndexes = Date_t::now(); LOG(2) << "Creating indexes for ns: " << _nss.ns(); @@ -149,9 +154,9 @@ Status CollectionBulkLoaderImpl::commit() { // Commit before deleting dups, so the dups will be removed from secondary indexes when // deleted. - if (_hasSecondaryIndexes) { + if (_secondaryIndexesBlock) { std::set<RecordId> secDups; - auto status = _secondaryIndexesBlock.doneInserting(&secDups); + auto status = _secondaryIndexesBlock->doneInserting(&secDups); if (!status.isOK()) { return status; } @@ -163,18 +168,18 @@ Status CollectionBulkLoaderImpl::commit() { } MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { WriteUnitOfWork wunit(txn); - _secondaryIndexesBlock.commit(); + _secondaryIndexesBlock->commit(); wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( _txn, "CollectionBulkLoaderImpl::commit", _nss.ns()); } - if (!_idIndexSpec.isEmpty()) { + if (_idIndexBlock) { // Delete dups. std::set<RecordId> dups; // Do not do inside a WriteUnitOfWork (required by doneInserting). - auto status = _idIndexBlock.doneInserting(&dups); + auto status = _idIndexBlock->doneInserting(&dups); if (!status.isOK()) { return status; } @@ -196,7 +201,7 @@ Status CollectionBulkLoaderImpl::commit() { // Commit _id index, without dups. MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { WriteUnitOfWork wunit(txn); - _idIndexBlock.commit(); + _idIndexBlock->commit(); wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( @@ -206,15 +211,40 @@ Status CollectionBulkLoaderImpl::commit() { LOG(2) << "Done creating indexes for ns: " << _nss.ns() << ", stats: " << _stats.toString(); - // release locks. - _autoColl.reset(nullptr); - _autoDB.reset(nullptr); - _coll = nullptr; + _releaseResources(); return Status::OK(); }, TaskRunner::NextAction::kDisposeOperationContext); } +void CollectionBulkLoaderImpl::_releaseResources() { + if (_secondaryIndexesBlock) { + _secondaryIndexesBlock.reset(); + } + + if (_idIndexBlock) { + _idIndexBlock.reset(); + } + + // release locks. + _coll = nullptr; + _autoColl.reset(nullptr); + _autoDB.reset(nullptr); +} + +Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure( + TaskRunner::SynchronousTask task, TaskRunner::NextAction nextAction) { + auto newTask = [this, &task](OperationContext* txn) -> Status { + ScopeGuard guard = MakeGuard(&CollectionBulkLoaderImpl::_releaseResources, this); + const auto status = task(txn); + if (status.isOK()) { + guard.Dismiss(); + } + return status; + }; + return _runner->runSynchronousTask(newTask, nextAction); +} + CollectionBulkLoaderImpl::Stats CollectionBulkLoaderImpl::getStats() const { return _stats; } diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h index 1fa6033a104..b66baeb2bae 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.h +++ b/src/mongo/db/repl/collection_bulk_loader_impl.h @@ -84,6 +84,11 @@ public: virtual BSONObj toBSON() const override; private: + void _releaseResources(); + Status _runTaskReleaseResourcesOnFailure( + TaskRunner::SynchronousTask task, + TaskRunner::NextAction nextAction = TaskRunner::NextAction::kKeepOperationContext); + std::unique_ptr<OldThreadPool> _threadPool; std::unique_ptr<TaskRunner> _runner; std::unique_ptr<AutoGetCollection> _autoColl; @@ -91,9 +96,8 @@ private: OperationContext* _txn = nullptr; Collection* _coll = nullptr; NamespaceString _nss; - MultiIndexBlock _idIndexBlock; - MultiIndexBlock _secondaryIndexesBlock; - bool _hasSecondaryIndexes = false; + std::unique_ptr<MultiIndexBlock> _idIndexBlock; + std::unique_ptr<MultiIndexBlock> _secondaryIndexesBlock; BSONObj _idIndexSpec; Stats _stats; }; diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp index 2f0bc184738..210718bba3e 100644 --- a/src/mongo/db/repl/task_runner.cpp +++ b/src/mongo/db/repl/task_runner.cpp @@ -227,9 +227,7 @@ Status TaskRunner::runSynchronousTask(SynchronousTask func, TaskRunner::NextActi } else { // Run supplied function. try { - log() << "starting to run synchronous task on runner."; returnStatus = func(txn); - log() << "done running the synchronous task."; } catch (...) { returnStatus = exceptionToStatus(); error() << "Exception thrown in runSynchronousTask: " << redact(returnStatus); |