diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2017-06-08 17:32:39 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2017-06-08 17:32:39 -0400 |
commit | cce59be2127c2f30d9e98b895c9d79e0a3b2e157 (patch) | |
tree | 1bdcd1b007e60537812ad649412318aa9a2a5f25 | |
parent | 23f38f73d21b2dce00f1bc2239c941ea43c59b03 (diff) | |
download | mongo-cce59be2127c2f30d9e98b895c9d79e0a3b2e157.tar.gz |
SERVER-29492 Remove task runner from collection bulk loader.
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.cpp | 251 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_bulk_loader_impl.h | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/collection_cloner_test.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/database_cloner_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/databases_cloner_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/initial_syncer_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_impl.cpp | 140 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_mock.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/storage_interface_mock.h | 4 |
10 files changed, 222 insertions, 219 deletions
diff --git a/src/mongo/db/repl/collection_bulk_loader.h b/src/mongo/db/repl/collection_bulk_loader.h index 3884c16673c..dfe7ef0f5bb 100644 --- a/src/mongo/db/repl/collection_bulk_loader.h +++ b/src/mongo/db/repl/collection_bulk_loader.h @@ -45,7 +45,7 @@ class CollectionBulkLoader { public: virtual ~CollectionBulkLoader() = default; - virtual Status init(Collection* coll, const std::vector<BSONObj>& indexSpecs) = 0; + virtual Status init(const std::vector<BSONObj>& indexSpecs) = 0; /** * Inserts the documents into the collection record store, and indexes them with the * MultiIndexBlock on the side. diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp index efcaa10231c..abfe67a63cb 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -51,51 +51,68 @@ namespace mongo { namespace repl { -CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(OperationContext* opCtx, - Collection* coll, - const BSONObj idIndexSpec, - std::unique_ptr<OldThreadPool> threadPool, - std::unique_ptr<TaskRunner> runner, - std::unique_ptr<AutoGetOrCreateDb> autoDb, - std::unique_ptr<AutoGetCollection> autoColl) - : _threadPool(std::move(threadPool)), - _runner(std::move(runner)), - _autoColl(std::move(autoColl)), - _autoDB(std::move(autoDb)), - _opCtx(opCtx), - _coll(coll), - _nss{coll->ns()}, - _idIndexBlock(stdx::make_unique<MultiIndexBlock>(opCtx, coll)), - _secondaryIndexesBlock(stdx::make_unique<MultiIndexBlock>(opCtx, coll)), - _idIndexSpec(idIndexSpec) { - invariant(opCtx); - invariant(coll); - invariant(_runner); - invariant(_autoDB); - invariant(_autoColl); - invariant(_autoDB->getDb()); - invariant(_autoColl->getDb() == _autoDB->getDb()); +namespace { + +/** + * Utility class to temporarily swap which client is bound to the running thread. + * + * Use this class to bind a client to the current thread for the duration of the + * AlternativeClientRegion's lifetime, restoring the prior client, if any, at the + * end of the block. + */ +class AlternativeClientRegion { +public: + explicit AlternativeClientRegion(ServiceContext::UniqueClient& clientToUse) + : _alternateClient(&clientToUse) { + invariant(clientToUse); + if (Client::getCurrent()) { + _originalClient = Client::releaseCurrent(); + } + Client::setCurrent(std::move(*_alternateClient)); + } + + ~AlternativeClientRegion() { + *_alternateClient = Client::releaseCurrent(); + if (_originalClient) { + Client::setCurrent(std::move(_originalClient)); + } + } + +private: + ServiceContext::UniqueClient _originalClient; + ServiceContext::UniqueClient* const _alternateClient; +}; + +} // namespace + +CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(ServiceContext::UniqueClient&& client, + ServiceContext::UniqueOperationContext&& opCtx, + std::unique_ptr<AutoGetCollection>&& autoColl, + const BSONObj& idIndexSpec) + : _client{std::move(client)}, + _opCtx{std::move(opCtx)}, + _autoColl{std::move(autoColl)}, + _nss{_autoColl->getCollection()->ns()}, + _idIndexBlock(stdx::make_unique<MultiIndexBlock>(_opCtx.get(), _autoColl->getCollection())), + _secondaryIndexesBlock( + stdx::make_unique<MultiIndexBlock>(_opCtx.get(), _autoColl->getCollection())), + _idIndexSpec(idIndexSpec.getOwned()) { + + invariant(_opCtx); + invariant(_autoColl->getCollection()); } CollectionBulkLoaderImpl::~CollectionBulkLoaderImpl() { - DESTRUCTOR_GUARD({ - _releaseResources(); - _runner->cancel(); - _runner->join(); - _threadPool->join(); - }) + AlternativeClientRegion acr(_client); + DESTRUCTOR_GUARD({ _releaseResources(); }) } -Status CollectionBulkLoaderImpl::init(Collection* coll, - const std::vector<BSONObj>& secondaryIndexSpecs) { +Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndexSpecs) { return _runTaskReleaseResourcesOnFailure( - [coll, &secondaryIndexSpecs, this](OperationContext* opCtx) -> Status { - invariant(opCtx); - invariant(coll); - invariant(opCtx->getClient() == &cc()); + [ coll = _autoColl->getCollection(), &secondaryIndexSpecs, this ]()->Status { // All writes in CollectionBulkLoaderImpl should be unreplicated. // The opCtx is accessed indirectly through _secondaryIndexesBlock. - UnreplicatedWritesBlock uwb(opCtx); + UnreplicatedWritesBlock uwb(_opCtx.get()); std::vector<BSONObj> specs(secondaryIndexSpecs); // This enforces the buildIndexes setting in the replica set configuration. _secondaryIndexesBlock->removeExistingIndexes(&specs); @@ -124,10 +141,8 @@ Status CollectionBulkLoaderImpl::init(Collection* coll, Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin, const std::vector<BSONObj>::const_iterator end) { int count = 0; - return _runTaskReleaseResourcesOnFailure([begin, end, &count, this]( - OperationContext* opCtx) -> Status { - invariant(opCtx); - UnreplicatedWritesBlock uwb(opCtx); + return _runTaskReleaseResourcesOnFailure([&]() -> Status { + UnreplicatedWritesBlock uwb(_opCtx.get()); for (auto iter = begin; iter != end; ++iter) { std::vector<MultiIndexBlock*> indexers; @@ -138,18 +153,20 @@ Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::con indexers.push_back(_secondaryIndexesBlock.get()); } MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork wunit(opCtx); + WriteUnitOfWork wunit(_opCtx.get()); if (!indexers.empty()) { // This flavor of insertDocument will not update any pre-existing indexes, only // the indexers passed in. - const auto status = _coll->insertDocument(opCtx, *iter, indexers, false); + const auto status = _autoColl->getCollection()->insertDocument( + _opCtx.get(), *iter, indexers, false); if (!status.isOK()) { return status; } } else { // For capped collections, we use regular insertDocument, which will update // pre-existing indexes. - const auto status = _coll->insertDocument(opCtx, *iter, nullptr, false, false); + const auto status = _autoColl->getCollection()->insertDocument( + _opCtx.get(), *iter, nullptr, false); if (!status.isOK()) { return status; } @@ -158,7 +175,7 @@ Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::con wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - _opCtx, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns()); + _opCtx.get(), "CollectionBulkLoaderImpl::insertDocuments", _nss.ns()); ++count; } @@ -167,80 +184,76 @@ Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::con } Status CollectionBulkLoaderImpl::commit() { - return _runTaskReleaseResourcesOnFailure( - [this](OperationContext* opCtx) -> Status { - _stats.startBuildingIndexes = Date_t::now(); - LOG(2) << "Creating indexes for ns: " << _nss.ns(); - invariant(opCtx->getClient() == &cc()); - invariant(opCtx == _opCtx); - UnreplicatedWritesBlock uwb(opCtx); - - // Commit before deleting dups, so the dups will be removed from secondary indexes when - // deleted. - if (_secondaryIndexesBlock) { - std::set<RecordId> secDups; - auto status = _secondaryIndexesBlock->doneInserting(&secDups); - if (!status.isOK()) { - return status; - } - if (secDups.size()) { - return Status{ErrorCodes::UserDataInconsistent, - str::stream() << "Found " << secDups.size() - << " duplicates on secondary index(es) even though " - "MultiIndexBlock::ignoreUniqueConstraint set."}; - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork wunit(opCtx); - _secondaryIndexesBlock->commit(); - wunit.commit(); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - _opCtx, "CollectionBulkLoaderImpl::commit", _nss.ns()); - } + return _runTaskReleaseResourcesOnFailure([this]() -> Status { + _stats.startBuildingIndexes = Date_t::now(); + LOG(2) << "Creating indexes for ns: " << _nss.ns(); + UnreplicatedWritesBlock uwb(_opCtx.get()); - if (_idIndexBlock) { - // Delete dups. - std::set<RecordId> dups; - // Do not do inside a WriteUnitOfWork (required by doneInserting). - auto status = _idIndexBlock->doneInserting(&dups); - if (!status.isOK()) { - return status; - } + // Commit before deleting dups, so the dups will be removed from secondary indexes when + // deleted. + if (_secondaryIndexesBlock) { + std::set<RecordId> secDups; + auto status = _secondaryIndexesBlock->doneInserting(&secDups); + if (!status.isOK()) { + return status; + } + if (secDups.size()) { + return Status{ErrorCodes::UserDataInconsistent, + str::stream() << "Found " << secDups.size() + << " duplicates on secondary index(es) even though " + "MultiIndexBlock::ignoreUniqueConstraint set."}; + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(_opCtx.get()); + _secondaryIndexesBlock->commit(); + wunit.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns()); + } - for (auto&& it : dups) { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork wunit(_opCtx); - _coll->deleteDocument(_opCtx, - it, - nullptr /** OpDebug **/, - false /* fromMigrate */, - true /* noWarn */); - wunit.commit(); - } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - _opCtx, "CollectionBulkLoaderImpl::commit", _nss.ns()); - } + if (_idIndexBlock) { + // Delete dups. + std::set<RecordId> dups; + // Do not do inside a WriteUnitOfWork (required by doneInserting). + auto status = _idIndexBlock->doneInserting(&dups); + if (!status.isOK()) { + return status; + } - // Commit _id index, without dups. + for (auto&& it : dups) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - WriteUnitOfWork wunit(opCtx); - _idIndexBlock->commit(); + WriteUnitOfWork wunit(_opCtx.get()); + _autoColl->getCollection()->deleteDocument(_opCtx.get(), + it, + nullptr /** OpDebug **/, + false /* fromMigrate */, + true /* noWarn */); wunit.commit(); } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( - _opCtx, "CollectionBulkLoaderImpl::commit", _nss.ns()); + _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns()); } - _stats.endBuildingIndexes = Date_t::now(); - LOG(2) << "Done creating indexes for ns: " << _nss.ns() - << ", stats: " << _stats.toString(); - _releaseResources(); - return Status::OK(); - }, - TaskRunner::NextAction::kDisposeOperationContext); + // Commit _id index, without dups. + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(_opCtx.get()); + _idIndexBlock->commit(); + wunit.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns()); + } + _stats.endBuildingIndexes = Date_t::now(); + LOG(2) << "Done creating indexes for ns: " << _nss.ns() << ", stats: " << _stats.toString(); + + _releaseResources(); + return Status::OK(); + }); } void CollectionBulkLoaderImpl::_releaseResources() { + invariant(&cc() == _opCtx->getClient()); if (_secondaryIndexesBlock) { // A valid Client is required to drop unfinished indexes. Client::initThreadIfNotAlready(); @@ -254,22 +267,26 @@ void CollectionBulkLoaderImpl::_releaseResources() { } // release locks. - _coll = nullptr; - _autoColl.reset(nullptr); - _autoDB.reset(nullptr); + _autoColl.reset(); } -Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure( - TaskRunner::SynchronousTask task, TaskRunner::NextAction nextAction) { - auto newTask = [this, &task](OperationContext* opCtx) -> Status { - ScopeGuard guard = MakeGuard(&CollectionBulkLoaderImpl::_releaseResources, this); - const auto status = task(opCtx); +template <typename F> +Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure(F task) noexcept { + + AlternativeClientRegion acr(_client); + ScopeGuard guard = MakeGuard(&CollectionBulkLoaderImpl::_releaseResources, this); + try { + const auto status = [&task]() noexcept { + return task(); + } + (); if (status.isOK()) { guard.Dismiss(); } return status; - }; - return _runner->runSynchronousTask(newTask, nextAction); + } catch (...) { + std::terminate(); + } } CollectionBulkLoaderImpl::Stats CollectionBulkLoaderImpl::getStats() const { diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h index 17b4741ec8f..37eb5a9f6e4 100644 --- a/src/mongo/db/repl/collection_bulk_loader_impl.h +++ b/src/mongo/db/repl/collection_bulk_loader_impl.h @@ -38,8 +38,6 @@ #include "mongo/db/namespace_string.h" #include "mongo/db/repl/collection_bulk_loader.h" #include "mongo/db/repl/storage_interface.h" -#include "mongo/db/repl/task_runner.h" -#include "mongo/util/concurrency/old_thread_pool.h" namespace mongo { namespace repl { @@ -61,16 +59,13 @@ public: BSONObj toBSON() const; }; - CollectionBulkLoaderImpl(OperationContext* opCtx, - Collection* coll, - const BSONObj idIndexSpec, - std::unique_ptr<OldThreadPool> threadPool, - std::unique_ptr<TaskRunner> runner, - std::unique_ptr<AutoGetOrCreateDb> autoDB, - std::unique_ptr<AutoGetCollection> autoColl); + CollectionBulkLoaderImpl(ServiceContext::UniqueClient&& client, + ServiceContext::UniqueOperationContext&& opCtx, + std::unique_ptr<AutoGetCollection>&& autoColl, + const BSONObj& idIndexSpec); virtual ~CollectionBulkLoaderImpl(); - virtual Status init(Collection* coll, const std::vector<BSONObj>& secondaryIndexSpecs) override; + virtual Status init(const std::vector<BSONObj>& secondaryIndexSpecs) override; virtual Status insertDocuments(const std::vector<BSONObj>::const_iterator begin, const std::vector<BSONObj>::const_iterator end) override; @@ -83,16 +78,13 @@ public: private: void _releaseResources(); - Status _runTaskReleaseResourcesOnFailure( - TaskRunner::SynchronousTask task, - TaskRunner::NextAction nextAction = TaskRunner::NextAction::kKeepOperationContext); - std::unique_ptr<OldThreadPool> _threadPool; - std::unique_ptr<TaskRunner> _runner; + template <typename F> + Status _runTaskReleaseResourcesOnFailure(F task) noexcept; + + ServiceContext::UniqueClient _client; + ServiceContext::UniqueOperationContext _opCtx; std::unique_ptr<AutoGetCollection> _autoColl; - std::unique_ptr<AutoGetOrCreateDb> _autoDB; - OperationContext* _opCtx = nullptr; - Collection* _coll = nullptr; NamespaceString _nss; std::unique_ptr<MultiIndexBlock> _idIndexBlock; std::unique_ptr<MultiIndexBlock> _secondaryIndexesBlock; diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp index 417159caef4..61ac3482ce3 100644 --- a/src/mongo/db/repl/collection_cloner_test.cpp +++ b/src/mongo/db/repl/collection_cloner_test.cpp @@ -90,8 +90,7 @@ void CollectionClonerTest::setUp() { const CollectionOptions& options, const BSONObj idIndexSpec, const std::vector<BSONObj>& secondaryIndexSpecs) { - (_loader = new CollectionBulkLoaderMock(&collectionStats)) - ->init(nullptr, secondaryIndexSpecs); + (_loader = new CollectionBulkLoaderMock(&collectionStats))->init(secondaryIndexSpecs); return StatusWith<std::unique_ptr<CollectionBulkLoader>>( std::unique_ptr<CollectionBulkLoader>(_loader)); @@ -345,7 +344,7 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) { collNss = theNss; collOptions = theOptions; collIndexSpecs = theIndexSpecs; - loader->init(nullptr, theIndexSpecs); + loader->init(theIndexSpecs); return std::unique_ptr<CollectionBulkLoader>(loader); }; diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp index 3db5ce40eb8..364e737fba7 100644 --- a/src/mongo/db/repl/database_cloner_test.cpp +++ b/src/mongo/db/repl/database_cloner_test.cpp @@ -101,7 +101,7 @@ void DatabaseClonerTest::setUp() { const std::vector<BSONObj>& secondaryIndexSpecs) { const auto collInfo = &_collections[nss]; (collInfo->loader = new CollectionBulkLoaderMock(&collInfo->stats)) - ->init(nullptr, secondaryIndexSpecs); + ->init(secondaryIndexSpecs); return StatusWith<std::unique_ptr<CollectionBulkLoader>>( std::unique_ptr<CollectionBulkLoader>(collInfo->loader)); diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp index 8752f30adaa..f6a50786e47 100644 --- a/src/mongo/db/repl/databases_cloner_test.cpp +++ b/src/mongo/db/repl/databases_cloner_test.cpp @@ -176,7 +176,7 @@ protected: log() << "reusing collection during test which may cause problems, ns:" << nss; } (collInfo->loader = new CollectionBulkLoaderMock(&collInfo->stats)) - ->init(nullptr, secondaryIndexSpecs); + ->init(secondaryIndexSpecs); return StatusWith<std::unique_ptr<CollectionBulkLoader>>( std::unique_ptr<CollectionBulkLoader>(collInfo->loader)); diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 6dd7bfd5f66..570c57840e5 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -269,7 +269,7 @@ protected: log() << "reusing collection during test which may cause problems, ns:" << nss; } (collInfo->loader = new CollectionBulkLoaderMock(&collInfo->stats)) - ->init(nullptr, secondaryIndexSpecs); + ->init(secondaryIndexSpecs); return StatusWith<std::unique_ptr<CollectionBulkLoader>>( std::unique_ptr<CollectionBulkLoader>(collInfo->loader)); diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index 17d1077c559..09fc5c8eaa2 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -70,10 +70,8 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/rollback_gen.h" -#include "mongo/db/repl/task_runner.h" #include "mongo/db/service_context.h" #include "mongo/util/assert_util.h" -#include "mongo/util/concurrency/old_thread_pool.h" #include "mongo/util/log.h" #include "mongo/util/mongoutils/str.h" @@ -169,95 +167,95 @@ StorageInterfaceImpl::createCollectionForBulkLoading( const std::vector<BSONObj>& secondaryIndexSpecs) { LOG(2) << "StorageInterfaceImpl::createCollectionForBulkLoading called for ns: " << nss.ns(); - auto threadPool = - stdx::make_unique<OldThreadPool>(1, str::stream() << "InitialSyncInserters-" << nss.ns()); - std::unique_ptr<TaskRunner> runner = stdx::make_unique<TaskRunner>(threadPool.get()); - // Setup cond_var for signalling when done. - std::unique_ptr<CollectionBulkLoader> loaderToReturn; - Collection* collection; + class StashClient { + public: + StashClient() { + if (Client::getCurrent()) { + _stashedClient = Client::releaseCurrent(); + } + } + ~StashClient() { + if (Client::getCurrent()) { + Client::releaseCurrent(); + } + if (_stashedClient) { + Client::setCurrent(std::move(_stashedClient)); + } + } - auto status = runner->runSynchronousTask([&](OperationContext* opCtx) -> Status { - // We are not replicating nor validating writes under this OperationContext*. - // The OperationContext* is used for all writes to the (newly) cloned collection. - UnreplicatedWritesBlock uwb(opCtx); - documentValidationDisabled(opCtx) = true; + private: + ServiceContext::UniqueClient _stashedClient; + } stash; + Client::setCurrent( + getGlobalServiceContext()->makeClient(str::stream() << nss.ns() << " loader")); + auto opCtx = cc().makeOperationContext(); - // Retry if WCE. - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - // Get locks and create the collection. - auto db = stdx::make_unique<AutoGetOrCreateDb>(opCtx, nss.db(), MODE_IX); - auto coll = stdx::make_unique<AutoGetCollection>(opCtx, nss, MODE_X); - collection = coll->getCollection(); + // We are not replicating nor validating writes under this OperationContext*. + // The OperationContext* is used for all writes to the (newly) cloned collection. + UnreplicatedWritesBlock uwb(opCtx.get()); + documentValidationDisabled(opCtx.get()) = true; - if (collection) { - return {ErrorCodes::NamespaceExists, "Collection already exists."}; - } + std::unique_ptr<AutoGetCollection> autoColl; + // Retry if WCE. + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + // Get locks and create the collection. + AutoGetOrCreateDb db(opCtx.get(), nss.db(), MODE_X); + AutoGetCollection coll(opCtx.get(), nss, MODE_IX); + if (coll.getCollection()) { + return {ErrorCodes::NamespaceExists, + str::stream() << "Collection " << nss.ns() << " already exists."}; + } + { // Create the collection. - WriteUnitOfWork wunit(opCtx); - collection = db->getDb()->createCollection(opCtx, nss.ns(), options, false); - invariant(collection); + WriteUnitOfWork wunit(opCtx.get()); + fassert(40332, db.getDb()->createCollection(opCtx.get(), nss.ns(), options, false)); wunit.commit(); + } - // Build empty capped indexes. Capped indexes cannot be build by the MultiIndexBlock - // because the cap might delete documents off the back while we are inserting them into - // the front. - if (options.capped) { - WriteUnitOfWork wunit(opCtx); - if (!idIndexSpec.isEmpty()) { - auto status = collection->getIndexCatalog()->createIndexOnEmptyCollection( - opCtx, idIndexSpec); - if (!status.getStatus().isOK()) { - return status.getStatus(); - } + autoColl = stdx::make_unique<AutoGetCollection>(opCtx.get(), nss, MODE_IX); + + // Build empty capped indexes. Capped indexes cannot be built by the MultiIndexBlock + // because the cap might delete documents off the back while we are inserting them into + // the front. + if (options.capped) { + WriteUnitOfWork wunit(opCtx.get()); + if (!idIndexSpec.isEmpty()) { + auto status = + autoColl->getCollection()->getIndexCatalog()->createIndexOnEmptyCollection( + opCtx.get(), idIndexSpec); + if (!status.getStatus().isOK()) { + return status.getStatus(); } - for (auto&& spec : secondaryIndexSpecs) { - auto status = - collection->getIndexCatalog()->createIndexOnEmptyCollection(opCtx, spec); - if (!status.getStatus().isOK()) { - return status.getStatus(); - } + } + for (auto&& spec : secondaryIndexSpecs) { + auto status = + autoColl->getCollection()->getIndexCatalog()->createIndexOnEmptyCollection( + opCtx.get(), spec); + if (!status.getStatus().isOK()) { + return status.getStatus(); } - wunit.commit(); } - - - coll = stdx::make_unique<AutoGetCollection>(opCtx, nss, MODE_IX); - - // Move locks into loader, so it now controls their lifetime. - auto loader = stdx::make_unique<CollectionBulkLoaderImpl>(opCtx, - collection, - options.capped ? BSONObj() - : idIndexSpec, - std::move(threadPool), - std::move(runner), - std::move(db), - std::move(coll)); - - // Move the loader into the StatusWith. - loaderToReturn = std::move(loader); - return Status::OK(); + wunit.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "beginCollectionClone", nss.ns()); - MONGO_UNREACHABLE; - }); - - if (!status.isOK()) { - return status; } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx.get(), "beginCollectionClone", nss.ns()); - invariant(collection); + // Move locks into loader, so it now controls their lifetime. + auto loader = + stdx::make_unique<CollectionBulkLoaderImpl>(Client::releaseCurrent(), + std::move(opCtx), + std::move(autoColl), + options.capped ? BSONObj() : idIndexSpec); - status = loaderToReturn->init(collection, - options.capped ? std::vector<BSONObj>() : secondaryIndexSpecs); + auto status = loader->init(options.capped ? std::vector<BSONObj>() : secondaryIndexSpecs); if (!status.isOK()) { return status; } - return std::move(loaderToReturn); + return {std::move(loader)}; } - Status StorageInterfaceImpl::insertDocument(OperationContext* opCtx, const NamespaceString& nss, const BSONObj& doc) { diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp index 64ebecdc573..89fb5831083 100644 --- a/src/mongo/db/repl/storage_interface_mock.cpp +++ b/src/mongo/db/repl/storage_interface_mock.cpp @@ -67,11 +67,10 @@ Status StorageInterfaceMock::incrementRollbackID(OperationContext* opCtx) { _rbid++; return Status::OK(); } -Status CollectionBulkLoaderMock::init(Collection* coll, - const std::vector<BSONObj>& secondaryIndexSpecs) { +Status CollectionBulkLoaderMock::init(const std::vector<BSONObj>& secondaryIndexSpecs) { LOG(1) << "CollectionBulkLoaderMock::init called"; stats->initCalled = true; - return initFn(coll, secondaryIndexSpecs); + return Status::OK(); }; Status CollectionBulkLoaderMock::insertDocuments(const std::vector<BSONObj>::const_iterator begin, diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 74a6bac4abc..fc6efeb989b 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -53,7 +53,7 @@ class CollectionBulkLoaderMock : public CollectionBulkLoader { public: CollectionBulkLoaderMock(CollectionMockStats* collStats) : stats(collStats){}; virtual ~CollectionBulkLoaderMock() = default; - virtual Status init(Collection* coll, const std::vector<BSONObj>& secondaryIndexSpecs) override; + virtual Status init(const std::vector<BSONObj>& secondaryIndexSpecs) override; virtual Status insertDocuments(const std::vector<BSONObj>::const_iterator begin, const std::vector<BSONObj>::const_iterator end) override; @@ -75,8 +75,6 @@ public: const std::vector<BSONObj>::const_iterator) { return Status::OK(); }; stdx::function<Status()> abortFn = []() { return Status::OK(); }; stdx::function<Status()> commitFn = []() { return Status::OK(); }; - stdx::function<Status(Collection* coll, const std::vector<BSONObj>& secondaryIndexSpecs)> - initFn = [](Collection*, const std::vector<BSONObj>&) { return Status::OK(); }; }; class StorageInterfaceMock : public StorageInterface { |