summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2017-06-08 17:32:39 -0400
committerAndy Schwerin <schwerin@mongodb.com>2017-06-08 17:32:39 -0400
commitcce59be2127c2f30d9e98b895c9d79e0a3b2e157 (patch)
tree1bdcd1b007e60537812ad649412318aa9a2a5f25
parent23f38f73d21b2dce00f1bc2239c941ea43c59b03 (diff)
downloadmongo-cce59be2127c2f30d9e98b895c9d79e0a3b2e157.tar.gz
SERVER-29492 Remove task runner from collection bulk loader.
-rw-r--r--src/mongo/db/repl/collection_bulk_loader.h2
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.cpp251
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.h28
-rw-r--r--src/mongo/db/repl/collection_cloner_test.cpp5
-rw-r--r--src/mongo/db/repl/database_cloner_test.cpp2
-rw-r--r--src/mongo/db/repl/databases_cloner_test.cpp2
-rw-r--r--src/mongo/db/repl/initial_syncer_test.cpp2
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp140
-rw-r--r--src/mongo/db/repl/storage_interface_mock.cpp5
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h4
10 files changed, 222 insertions, 219 deletions
diff --git a/src/mongo/db/repl/collection_bulk_loader.h b/src/mongo/db/repl/collection_bulk_loader.h
index 3884c16673c..dfe7ef0f5bb 100644
--- a/src/mongo/db/repl/collection_bulk_loader.h
+++ b/src/mongo/db/repl/collection_bulk_loader.h
@@ -45,7 +45,7 @@ class CollectionBulkLoader {
public:
virtual ~CollectionBulkLoader() = default;
- virtual Status init(Collection* coll, const std::vector<BSONObj>& indexSpecs) = 0;
+ virtual Status init(const std::vector<BSONObj>& indexSpecs) = 0;
/**
* Inserts the documents into the collection record store, and indexes them with the
* MultiIndexBlock on the side.
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
index efcaa10231c..abfe67a63cb 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.cpp
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -51,51 +51,68 @@
namespace mongo {
namespace repl {
-CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(OperationContext* opCtx,
- Collection* coll,
- const BSONObj idIndexSpec,
- std::unique_ptr<OldThreadPool> threadPool,
- std::unique_ptr<TaskRunner> runner,
- std::unique_ptr<AutoGetOrCreateDb> autoDb,
- std::unique_ptr<AutoGetCollection> autoColl)
- : _threadPool(std::move(threadPool)),
- _runner(std::move(runner)),
- _autoColl(std::move(autoColl)),
- _autoDB(std::move(autoDb)),
- _opCtx(opCtx),
- _coll(coll),
- _nss{coll->ns()},
- _idIndexBlock(stdx::make_unique<MultiIndexBlock>(opCtx, coll)),
- _secondaryIndexesBlock(stdx::make_unique<MultiIndexBlock>(opCtx, coll)),
- _idIndexSpec(idIndexSpec) {
- invariant(opCtx);
- invariant(coll);
- invariant(_runner);
- invariant(_autoDB);
- invariant(_autoColl);
- invariant(_autoDB->getDb());
- invariant(_autoColl->getDb() == _autoDB->getDb());
+namespace {
+
+/**
+ * Utility class to temporarily swap which client is bound to the running thread.
+ *
+ * Use this class to bind a client to the current thread for the duration of the
+ * AlternativeClientRegion's lifetime, restoring the prior client, if any, at the
+ * end of the block.
+ */
+class AlternativeClientRegion {
+public:
+ explicit AlternativeClientRegion(ServiceContext::UniqueClient& clientToUse)
+ : _alternateClient(&clientToUse) {
+ invariant(clientToUse);
+ if (Client::getCurrent()) {
+ _originalClient = Client::releaseCurrent();
+ }
+ Client::setCurrent(std::move(*_alternateClient));
+ }
+
+ ~AlternativeClientRegion() {
+ *_alternateClient = Client::releaseCurrent();
+ if (_originalClient) {
+ Client::setCurrent(std::move(_originalClient));
+ }
+ }
+
+private:
+ ServiceContext::UniqueClient _originalClient;
+ ServiceContext::UniqueClient* const _alternateClient;
+};
+
+} // namespace
+
+CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(ServiceContext::UniqueClient&& client,
+ ServiceContext::UniqueOperationContext&& opCtx,
+ std::unique_ptr<AutoGetCollection>&& autoColl,
+ const BSONObj& idIndexSpec)
+ : _client{std::move(client)},
+ _opCtx{std::move(opCtx)},
+ _autoColl{std::move(autoColl)},
+ _nss{_autoColl->getCollection()->ns()},
+ _idIndexBlock(stdx::make_unique<MultiIndexBlock>(_opCtx.get(), _autoColl->getCollection())),
+ _secondaryIndexesBlock(
+ stdx::make_unique<MultiIndexBlock>(_opCtx.get(), _autoColl->getCollection())),
+ _idIndexSpec(idIndexSpec.getOwned()) {
+
+ invariant(_opCtx);
+ invariant(_autoColl->getCollection());
}
CollectionBulkLoaderImpl::~CollectionBulkLoaderImpl() {
- DESTRUCTOR_GUARD({
- _releaseResources();
- _runner->cancel();
- _runner->join();
- _threadPool->join();
- })
+ AlternativeClientRegion acr(_client);
+ DESTRUCTOR_GUARD({ _releaseResources(); })
}
-Status CollectionBulkLoaderImpl::init(Collection* coll,
- const std::vector<BSONObj>& secondaryIndexSpecs) {
+Status CollectionBulkLoaderImpl::init(const std::vector<BSONObj>& secondaryIndexSpecs) {
return _runTaskReleaseResourcesOnFailure(
- [coll, &secondaryIndexSpecs, this](OperationContext* opCtx) -> Status {
- invariant(opCtx);
- invariant(coll);
- invariant(opCtx->getClient() == &cc());
+ [ coll = _autoColl->getCollection(), &secondaryIndexSpecs, this ]()->Status {
// All writes in CollectionBulkLoaderImpl should be unreplicated.
// The opCtx is accessed indirectly through _secondaryIndexesBlock.
- UnreplicatedWritesBlock uwb(opCtx);
+ UnreplicatedWritesBlock uwb(_opCtx.get());
std::vector<BSONObj> specs(secondaryIndexSpecs);
// This enforces the buildIndexes setting in the replica set configuration.
_secondaryIndexesBlock->removeExistingIndexes(&specs);
@@ -124,10 +141,8 @@ Status CollectionBulkLoaderImpl::init(Collection* coll,
Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
const std::vector<BSONObj>::const_iterator end) {
int count = 0;
- return _runTaskReleaseResourcesOnFailure([begin, end, &count, this](
- OperationContext* opCtx) -> Status {
- invariant(opCtx);
- UnreplicatedWritesBlock uwb(opCtx);
+ return _runTaskReleaseResourcesOnFailure([&]() -> Status {
+ UnreplicatedWritesBlock uwb(_opCtx.get());
for (auto iter = begin; iter != end; ++iter) {
std::vector<MultiIndexBlock*> indexers;
@@ -138,18 +153,20 @@ Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::con
indexers.push_back(_secondaryIndexesBlock.get());
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- WriteUnitOfWork wunit(opCtx);
+ WriteUnitOfWork wunit(_opCtx.get());
if (!indexers.empty()) {
// This flavor of insertDocument will not update any pre-existing indexes, only
// the indexers passed in.
- const auto status = _coll->insertDocument(opCtx, *iter, indexers, false);
+ const auto status = _autoColl->getCollection()->insertDocument(
+ _opCtx.get(), *iter, indexers, false);
if (!status.isOK()) {
return status;
}
} else {
// For capped collections, we use regular insertDocument, which will update
// pre-existing indexes.
- const auto status = _coll->insertDocument(opCtx, *iter, nullptr, false, false);
+ const auto status = _autoColl->getCollection()->insertDocument(
+ _opCtx.get(), *iter, nullptr, false);
if (!status.isOK()) {
return status;
}
@@ -158,7 +175,7 @@ Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::con
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- _opCtx, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns());
+ _opCtx.get(), "CollectionBulkLoaderImpl::insertDocuments", _nss.ns());
++count;
}
@@ -167,80 +184,76 @@ Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::con
}
Status CollectionBulkLoaderImpl::commit() {
- return _runTaskReleaseResourcesOnFailure(
- [this](OperationContext* opCtx) -> Status {
- _stats.startBuildingIndexes = Date_t::now();
- LOG(2) << "Creating indexes for ns: " << _nss.ns();
- invariant(opCtx->getClient() == &cc());
- invariant(opCtx == _opCtx);
- UnreplicatedWritesBlock uwb(opCtx);
-
- // Commit before deleting dups, so the dups will be removed from secondary indexes when
- // deleted.
- if (_secondaryIndexesBlock) {
- std::set<RecordId> secDups;
- auto status = _secondaryIndexesBlock->doneInserting(&secDups);
- if (!status.isOK()) {
- return status;
- }
- if (secDups.size()) {
- return Status{ErrorCodes::UserDataInconsistent,
- str::stream() << "Found " << secDups.size()
- << " duplicates on secondary index(es) even though "
- "MultiIndexBlock::ignoreUniqueConstraint set."};
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- WriteUnitOfWork wunit(opCtx);
- _secondaryIndexesBlock->commit();
- wunit.commit();
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- _opCtx, "CollectionBulkLoaderImpl::commit", _nss.ns());
- }
+ return _runTaskReleaseResourcesOnFailure([this]() -> Status {
+ _stats.startBuildingIndexes = Date_t::now();
+ LOG(2) << "Creating indexes for ns: " << _nss.ns();
+ UnreplicatedWritesBlock uwb(_opCtx.get());
- if (_idIndexBlock) {
- // Delete dups.
- std::set<RecordId> dups;
- // Do not do inside a WriteUnitOfWork (required by doneInserting).
- auto status = _idIndexBlock->doneInserting(&dups);
- if (!status.isOK()) {
- return status;
- }
+ // Commit before deleting dups, so the dups will be removed from secondary indexes when
+ // deleted.
+ if (_secondaryIndexesBlock) {
+ std::set<RecordId> secDups;
+ auto status = _secondaryIndexesBlock->doneInserting(&secDups);
+ if (!status.isOK()) {
+ return status;
+ }
+ if (secDups.size()) {
+ return Status{ErrorCodes::UserDataInconsistent,
+ str::stream() << "Found " << secDups.size()
+ << " duplicates on secondary index(es) even though "
+ "MultiIndexBlock::ignoreUniqueConstraint set."};
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork wunit(_opCtx.get());
+ _secondaryIndexesBlock->commit();
+ wunit.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns());
+ }
- for (auto&& it : dups) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- WriteUnitOfWork wunit(_opCtx);
- _coll->deleteDocument(_opCtx,
- it,
- nullptr /** OpDebug **/,
- false /* fromMigrate */,
- true /* noWarn */);
- wunit.commit();
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- _opCtx, "CollectionBulkLoaderImpl::commit", _nss.ns());
- }
+ if (_idIndexBlock) {
+ // Delete dups.
+ std::set<RecordId> dups;
+ // Do not do inside a WriteUnitOfWork (required by doneInserting).
+ auto status = _idIndexBlock->doneInserting(&dups);
+ if (!status.isOK()) {
+ return status;
+ }
- // Commit _id index, without dups.
+ for (auto&& it : dups) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- WriteUnitOfWork wunit(opCtx);
- _idIndexBlock->commit();
+ WriteUnitOfWork wunit(_opCtx.get());
+ _autoColl->getCollection()->deleteDocument(_opCtx.get(),
+ it,
+ nullptr /** OpDebug **/,
+ false /* fromMigrate */,
+ true /* noWarn */);
wunit.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- _opCtx, "CollectionBulkLoaderImpl::commit", _nss.ns());
+ _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns());
}
- _stats.endBuildingIndexes = Date_t::now();
- LOG(2) << "Done creating indexes for ns: " << _nss.ns()
- << ", stats: " << _stats.toString();
- _releaseResources();
- return Status::OK();
- },
- TaskRunner::NextAction::kDisposeOperationContext);
+ // Commit _id index, without dups.
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork wunit(_opCtx.get());
+ _idIndexBlock->commit();
+ wunit.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ _opCtx.get(), "CollectionBulkLoaderImpl::commit", _nss.ns());
+ }
+ _stats.endBuildingIndexes = Date_t::now();
+ LOG(2) << "Done creating indexes for ns: " << _nss.ns() << ", stats: " << _stats.toString();
+
+ _releaseResources();
+ return Status::OK();
+ });
}
void CollectionBulkLoaderImpl::_releaseResources() {
+ invariant(&cc() == _opCtx->getClient());
if (_secondaryIndexesBlock) {
// A valid Client is required to drop unfinished indexes.
Client::initThreadIfNotAlready();
@@ -254,22 +267,26 @@ void CollectionBulkLoaderImpl::_releaseResources() {
}
// release locks.
- _coll = nullptr;
- _autoColl.reset(nullptr);
- _autoDB.reset(nullptr);
+ _autoColl.reset();
}
-Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure(
- TaskRunner::SynchronousTask task, TaskRunner::NextAction nextAction) {
- auto newTask = [this, &task](OperationContext* opCtx) -> Status {
- ScopeGuard guard = MakeGuard(&CollectionBulkLoaderImpl::_releaseResources, this);
- const auto status = task(opCtx);
+template <typename F>
+Status CollectionBulkLoaderImpl::_runTaskReleaseResourcesOnFailure(F task) noexcept {
+
+ AlternativeClientRegion acr(_client);
+ ScopeGuard guard = MakeGuard(&CollectionBulkLoaderImpl::_releaseResources, this);
+ try {
+ const auto status = [&task]() noexcept {
+ return task();
+ }
+ ();
if (status.isOK()) {
guard.Dismiss();
}
return status;
- };
- return _runner->runSynchronousTask(newTask, nextAction);
+ } catch (...) {
+ std::terminate();
+ }
}
CollectionBulkLoaderImpl::Stats CollectionBulkLoaderImpl::getStats() const {
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h
index 17b4741ec8f..37eb5a9f6e4 100644
--- a/src/mongo/db/repl/collection_bulk_loader_impl.h
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.h
@@ -38,8 +38,6 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/collection_bulk_loader.h"
#include "mongo/db/repl/storage_interface.h"
-#include "mongo/db/repl/task_runner.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
namespace mongo {
namespace repl {
@@ -61,16 +59,13 @@ public:
BSONObj toBSON() const;
};
- CollectionBulkLoaderImpl(OperationContext* opCtx,
- Collection* coll,
- const BSONObj idIndexSpec,
- std::unique_ptr<OldThreadPool> threadPool,
- std::unique_ptr<TaskRunner> runner,
- std::unique_ptr<AutoGetOrCreateDb> autoDB,
- std::unique_ptr<AutoGetCollection> autoColl);
+ CollectionBulkLoaderImpl(ServiceContext::UniqueClient&& client,
+ ServiceContext::UniqueOperationContext&& opCtx,
+ std::unique_ptr<AutoGetCollection>&& autoColl,
+ const BSONObj& idIndexSpec);
virtual ~CollectionBulkLoaderImpl();
- virtual Status init(Collection* coll, const std::vector<BSONObj>& secondaryIndexSpecs) override;
+ virtual Status init(const std::vector<BSONObj>& secondaryIndexSpecs) override;
virtual Status insertDocuments(const std::vector<BSONObj>::const_iterator begin,
const std::vector<BSONObj>::const_iterator end) override;
@@ -83,16 +78,13 @@ public:
private:
void _releaseResources();
- Status _runTaskReleaseResourcesOnFailure(
- TaskRunner::SynchronousTask task,
- TaskRunner::NextAction nextAction = TaskRunner::NextAction::kKeepOperationContext);
- std::unique_ptr<OldThreadPool> _threadPool;
- std::unique_ptr<TaskRunner> _runner;
+ template <typename F>
+ Status _runTaskReleaseResourcesOnFailure(F task) noexcept;
+
+ ServiceContext::UniqueClient _client;
+ ServiceContext::UniqueOperationContext _opCtx;
std::unique_ptr<AutoGetCollection> _autoColl;
- std::unique_ptr<AutoGetOrCreateDb> _autoDB;
- OperationContext* _opCtx = nullptr;
- Collection* _coll = nullptr;
NamespaceString _nss;
std::unique_ptr<MultiIndexBlock> _idIndexBlock;
std::unique_ptr<MultiIndexBlock> _secondaryIndexesBlock;
diff --git a/src/mongo/db/repl/collection_cloner_test.cpp b/src/mongo/db/repl/collection_cloner_test.cpp
index 417159caef4..61ac3482ce3 100644
--- a/src/mongo/db/repl/collection_cloner_test.cpp
+++ b/src/mongo/db/repl/collection_cloner_test.cpp
@@ -90,8 +90,7 @@ void CollectionClonerTest::setUp() {
const CollectionOptions& options,
const BSONObj idIndexSpec,
const std::vector<BSONObj>& secondaryIndexSpecs) {
- (_loader = new CollectionBulkLoaderMock(&collectionStats))
- ->init(nullptr, secondaryIndexSpecs);
+ (_loader = new CollectionBulkLoaderMock(&collectionStats))->init(secondaryIndexSpecs);
return StatusWith<std::unique_ptr<CollectionBulkLoader>>(
std::unique_ptr<CollectionBulkLoader>(_loader));
@@ -345,7 +344,7 @@ TEST_F(CollectionClonerTest, DoNotCreateIDIndexIfAutoIndexIdUsed) {
collNss = theNss;
collOptions = theOptions;
collIndexSpecs = theIndexSpecs;
- loader->init(nullptr, theIndexSpecs);
+ loader->init(theIndexSpecs);
return std::unique_ptr<CollectionBulkLoader>(loader);
};
diff --git a/src/mongo/db/repl/database_cloner_test.cpp b/src/mongo/db/repl/database_cloner_test.cpp
index 3db5ce40eb8..364e737fba7 100644
--- a/src/mongo/db/repl/database_cloner_test.cpp
+++ b/src/mongo/db/repl/database_cloner_test.cpp
@@ -101,7 +101,7 @@ void DatabaseClonerTest::setUp() {
const std::vector<BSONObj>& secondaryIndexSpecs) {
const auto collInfo = &_collections[nss];
(collInfo->loader = new CollectionBulkLoaderMock(&collInfo->stats))
- ->init(nullptr, secondaryIndexSpecs);
+ ->init(secondaryIndexSpecs);
return StatusWith<std::unique_ptr<CollectionBulkLoader>>(
std::unique_ptr<CollectionBulkLoader>(collInfo->loader));
diff --git a/src/mongo/db/repl/databases_cloner_test.cpp b/src/mongo/db/repl/databases_cloner_test.cpp
index 8752f30adaa..f6a50786e47 100644
--- a/src/mongo/db/repl/databases_cloner_test.cpp
+++ b/src/mongo/db/repl/databases_cloner_test.cpp
@@ -176,7 +176,7 @@ protected:
log() << "reusing collection during test which may cause problems, ns:" << nss;
}
(collInfo->loader = new CollectionBulkLoaderMock(&collInfo->stats))
- ->init(nullptr, secondaryIndexSpecs);
+ ->init(secondaryIndexSpecs);
return StatusWith<std::unique_ptr<CollectionBulkLoader>>(
std::unique_ptr<CollectionBulkLoader>(collInfo->loader));
diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp
index 6dd7bfd5f66..570c57840e5 100644
--- a/src/mongo/db/repl/initial_syncer_test.cpp
+++ b/src/mongo/db/repl/initial_syncer_test.cpp
@@ -269,7 +269,7 @@ protected:
log() << "reusing collection during test which may cause problems, ns:" << nss;
}
(collInfo->loader = new CollectionBulkLoaderMock(&collInfo->stats))
- ->init(nullptr, secondaryIndexSpecs);
+ ->init(secondaryIndexSpecs);
return StatusWith<std::unique_ptr<CollectionBulkLoader>>(
std::unique_ptr<CollectionBulkLoader>(collInfo->loader));
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 17d1077c559..09fc5c8eaa2 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -70,10 +70,8 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/rollback_gen.h"
-#include "mongo/db/repl/task_runner.h"
#include "mongo/db/service_context.h"
#include "mongo/util/assert_util.h"
-#include "mongo/util/concurrency/old_thread_pool.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -169,95 +167,95 @@ StorageInterfaceImpl::createCollectionForBulkLoading(
const std::vector<BSONObj>& secondaryIndexSpecs) {
LOG(2) << "StorageInterfaceImpl::createCollectionForBulkLoading called for ns: " << nss.ns();
- auto threadPool =
- stdx::make_unique<OldThreadPool>(1, str::stream() << "InitialSyncInserters-" << nss.ns());
- std::unique_ptr<TaskRunner> runner = stdx::make_unique<TaskRunner>(threadPool.get());
- // Setup cond_var for signalling when done.
- std::unique_ptr<CollectionBulkLoader> loaderToReturn;
- Collection* collection;
+ class StashClient {
+ public:
+ StashClient() {
+ if (Client::getCurrent()) {
+ _stashedClient = Client::releaseCurrent();
+ }
+ }
+ ~StashClient() {
+ if (Client::getCurrent()) {
+ Client::releaseCurrent();
+ }
+ if (_stashedClient) {
+ Client::setCurrent(std::move(_stashedClient));
+ }
+ }
- auto status = runner->runSynchronousTask([&](OperationContext* opCtx) -> Status {
- // We are not replicating nor validating writes under this OperationContext*.
- // The OperationContext* is used for all writes to the (newly) cloned collection.
- UnreplicatedWritesBlock uwb(opCtx);
- documentValidationDisabled(opCtx) = true;
+ private:
+ ServiceContext::UniqueClient _stashedClient;
+ } stash;
+ Client::setCurrent(
+ getGlobalServiceContext()->makeClient(str::stream() << nss.ns() << " loader"));
+ auto opCtx = cc().makeOperationContext();
- // Retry if WCE.
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- // Get locks and create the collection.
- auto db = stdx::make_unique<AutoGetOrCreateDb>(opCtx, nss.db(), MODE_IX);
- auto coll = stdx::make_unique<AutoGetCollection>(opCtx, nss, MODE_X);
- collection = coll->getCollection();
+ // We are not replicating nor validating writes under this OperationContext*.
+ // The OperationContext* is used for all writes to the (newly) cloned collection.
+ UnreplicatedWritesBlock uwb(opCtx.get());
+ documentValidationDisabled(opCtx.get()) = true;
- if (collection) {
- return {ErrorCodes::NamespaceExists, "Collection already exists."};
- }
+ std::unique_ptr<AutoGetCollection> autoColl;
+ // Retry if WCE.
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ // Get locks and create the collection.
+ AutoGetOrCreateDb db(opCtx.get(), nss.db(), MODE_X);
+ AutoGetCollection coll(opCtx.get(), nss, MODE_IX);
+ if (coll.getCollection()) {
+ return {ErrorCodes::NamespaceExists,
+ str::stream() << "Collection " << nss.ns() << " already exists."};
+ }
+ {
// Create the collection.
- WriteUnitOfWork wunit(opCtx);
- collection = db->getDb()->createCollection(opCtx, nss.ns(), options, false);
- invariant(collection);
+ WriteUnitOfWork wunit(opCtx.get());
+ fassert(40332, db.getDb()->createCollection(opCtx.get(), nss.ns(), options, false));
wunit.commit();
+ }
- // Build empty capped indexes. Capped indexes cannot be build by the MultiIndexBlock
- // because the cap might delete documents off the back while we are inserting them into
- // the front.
- if (options.capped) {
- WriteUnitOfWork wunit(opCtx);
- if (!idIndexSpec.isEmpty()) {
- auto status = collection->getIndexCatalog()->createIndexOnEmptyCollection(
- opCtx, idIndexSpec);
- if (!status.getStatus().isOK()) {
- return status.getStatus();
- }
+ autoColl = stdx::make_unique<AutoGetCollection>(opCtx.get(), nss, MODE_IX);
+
+ // Build empty capped indexes. Capped indexes cannot be built by the MultiIndexBlock
+ // because the cap might delete documents off the back while we are inserting them into
+ // the front.
+ if (options.capped) {
+ WriteUnitOfWork wunit(opCtx.get());
+ if (!idIndexSpec.isEmpty()) {
+ auto status =
+ autoColl->getCollection()->getIndexCatalog()->createIndexOnEmptyCollection(
+ opCtx.get(), idIndexSpec);
+ if (!status.getStatus().isOK()) {
+ return status.getStatus();
}
- for (auto&& spec : secondaryIndexSpecs) {
- auto status =
- collection->getIndexCatalog()->createIndexOnEmptyCollection(opCtx, spec);
- if (!status.getStatus().isOK()) {
- return status.getStatus();
- }
+ }
+ for (auto&& spec : secondaryIndexSpecs) {
+ auto status =
+ autoColl->getCollection()->getIndexCatalog()->createIndexOnEmptyCollection(
+ opCtx.get(), spec);
+ if (!status.getStatus().isOK()) {
+ return status.getStatus();
}
- wunit.commit();
}
-
-
- coll = stdx::make_unique<AutoGetCollection>(opCtx, nss, MODE_IX);
-
- // Move locks into loader, so it now controls their lifetime.
- auto loader = stdx::make_unique<CollectionBulkLoaderImpl>(opCtx,
- collection,
- options.capped ? BSONObj()
- : idIndexSpec,
- std::move(threadPool),
- std::move(runner),
- std::move(db),
- std::move(coll));
-
- // Move the loader into the StatusWith.
- loaderToReturn = std::move(loader);
- return Status::OK();
+ wunit.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx, "beginCollectionClone", nss.ns());
- MONGO_UNREACHABLE;
- });
-
- if (!status.isOK()) {
- return status;
}
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(opCtx.get(), "beginCollectionClone", nss.ns());
- invariant(collection);
+ // Move locks into loader, so it now controls their lifetime.
+ auto loader =
+ stdx::make_unique<CollectionBulkLoaderImpl>(Client::releaseCurrent(),
+ std::move(opCtx),
+ std::move(autoColl),
+ options.capped ? BSONObj() : idIndexSpec);
- status = loaderToReturn->init(collection,
- options.capped ? std::vector<BSONObj>() : secondaryIndexSpecs);
+ auto status = loader->init(options.capped ? std::vector<BSONObj>() : secondaryIndexSpecs);
if (!status.isOK()) {
return status;
}
- return std::move(loaderToReturn);
+ return {std::move(loader)};
}
-
Status StorageInterfaceImpl::insertDocument(OperationContext* opCtx,
const NamespaceString& nss,
const BSONObj& doc) {
diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp
index 64ebecdc573..89fb5831083 100644
--- a/src/mongo/db/repl/storage_interface_mock.cpp
+++ b/src/mongo/db/repl/storage_interface_mock.cpp
@@ -67,11 +67,10 @@ Status StorageInterfaceMock::incrementRollbackID(OperationContext* opCtx) {
_rbid++;
return Status::OK();
}
-Status CollectionBulkLoaderMock::init(Collection* coll,
- const std::vector<BSONObj>& secondaryIndexSpecs) {
+Status CollectionBulkLoaderMock::init(const std::vector<BSONObj>& secondaryIndexSpecs) {
LOG(1) << "CollectionBulkLoaderMock::init called";
stats->initCalled = true;
- return initFn(coll, secondaryIndexSpecs);
+ return Status::OK();
};
Status CollectionBulkLoaderMock::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 74a6bac4abc..fc6efeb989b 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -53,7 +53,7 @@ class CollectionBulkLoaderMock : public CollectionBulkLoader {
public:
CollectionBulkLoaderMock(CollectionMockStats* collStats) : stats(collStats){};
virtual ~CollectionBulkLoaderMock() = default;
- virtual Status init(Collection* coll, const std::vector<BSONObj>& secondaryIndexSpecs) override;
+ virtual Status init(const std::vector<BSONObj>& secondaryIndexSpecs) override;
virtual Status insertDocuments(const std::vector<BSONObj>::const_iterator begin,
const std::vector<BSONObj>::const_iterator end) override;
@@ -75,8 +75,6 @@ public:
const std::vector<BSONObj>::const_iterator) { return Status::OK(); };
stdx::function<Status()> abortFn = []() { return Status::OK(); };
stdx::function<Status()> commitFn = []() { return Status::OK(); };
- stdx::function<Status(Collection* coll, const std::vector<BSONObj>& secondaryIndexSpecs)>
- initFn = [](Collection*, const std::vector<BSONObj>&) { return Status::OK(); };
};
class StorageInterfaceMock : public StorageInterface {