diff options
author | Scott Hernandez <scotthernandez@tart.local> | 2016-06-10 16:12:01 -0400 |
---|---|---|
committer | Scott Hernandez <scotthernandez@tart.local> | 2016-06-17 11:53:01 -0400 |
commit | 42eb9464500381ef8034070472badded4b427cf5 (patch) | |
tree | 9f7aaf66732d0352f37ced532f6c0fc99cd5793d /src | |
parent | c59f5ade57e41b6a50f40999ea14883da691e951 (diff) | |
download | mongo-42eb9464500381ef8034070472badded4b427cf5.tar.gz |
SERVER-23059: storage interface improvements
Diffstat (limited to 'src')
22 files changed, 1237 insertions, 108 deletions
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 673a2f10628..03f57458f9a 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -414,7 +414,7 @@ Status Collection::insertDocument(OperationContext* txn, Status Collection::insertDocument(OperationContext* txn, const BSONObj& doc, - MultiIndexBlock* indexBlock, + const std::vector<MultiIndexBlock*>& indexBlocks, bool enforceQuota) { { auto status = checkValidation(txn, doc); @@ -433,9 +433,12 @@ Status Collection::insertDocument(OperationContext* txn, if (!loc.isOK()) return loc.getStatus(); - Status status = indexBlock->insert(doc, loc.getValue()); - if (!status.isOK()) - return status; + for (auto&& indexBlock : indexBlocks) { + Status status = indexBlock->insert(doc, loc.getValue()); + if (!status.isOK()) { + return status; + } + } vector<BSONObj> docs; docs.push_back(doc); diff --git a/src/mongo/db/catalog/collection.h b/src/mongo/db/catalog/collection.h index 292581736a2..f6e782bd230 100644 --- a/src/mongo/db/catalog/collection.h +++ b/src/mongo/db/catalog/collection.h @@ -297,9 +297,14 @@ public: const DocWriter* const* docs, size_t nDocs); + /** + * Inserts a document into the record store and adds it to the MultiIndexBlocks passed in. + * + * NOTE: It is up to caller to commit the indexes. 
+ */ Status insertDocument(OperationContext* txn, const BSONObj& doc, - MultiIndexBlock* indexBlock, + const std::vector<MultiIndexBlock*>& indexBlocks, bool enforceQuota); /** diff --git a/src/mongo/db/catalog/rename_collection.cpp b/src/mongo/db/catalog/rename_collection.cpp index 2f3517e8159..e41c00a4ebe 100644 --- a/src/mongo/db/catalog/rename_collection.cpp +++ b/src/mongo/db/catalog/rename_collection.cpp @@ -181,6 +181,7 @@ Status renameCollection(OperationContext* txn, MultiIndexBlock indexer(txn, targetColl); indexer.allowInterruption(); + std::vector<MultiIndexBlock*> indexers{&indexer}; // Copy the index descriptions from the source collection, adjusting the ns field. { @@ -211,7 +212,7 @@ Status renameCollection(OperationContext* txn, // No logOp necessary because the entire renameCollection command is one logOp. bool shouldReplicateWrites = txn->writesAreReplicated(); txn->setReplicatedWrites(false); - Status status = targetColl->insertDocument(txn, obj, &indexer, true); + Status status = targetColl->insertDocument(txn, obj, indexers, true); txn->setReplicatedWrites(shouldReplicateWrites); if (!status.isOK()) return status; diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index f7c80b2cb0d..f745889c129 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -67,6 +67,7 @@ env.Library( target='storage_interface_impl', source=[ 'storage_interface_impl.cpp', + 'collection_bulk_loader_impl.cpp', ], LIBDEPS=[ 'storage_interface', diff --git a/src/mongo/db/repl/collection_bulk_loader.h b/src/mongo/db/repl/collection_bulk_loader.h new file mode 100644 index 00000000000..f3f5e0cb6e5 --- /dev/null +++ b/src/mongo/db/repl/collection_bulk_loader.h @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + + +#pragma once + +#include <string> + +namespace mongo { + +class Collection; +class OperationContext; + +namespace repl { + +/** + * Used on a local Collection to create and bulk build indexes. + */ +class CollectionBulkLoader { +public: + virtual ~CollectionBulkLoader() = default; + + virtual Status init(OperationContext* txn, + Collection* coll, + const std::vector<BSONObj>& indexSpecs) = 0; + /** + * Inserts the documents into the collection record store, and indexes them with the + * MultiIndexBlock on the side. + */ + virtual Status insertDocuments(const std::vector<BSONObj>::const_iterator begin, + const std::vector<BSONObj>::const_iterator end) = 0; + /** + * Called when inserts are done and indexes can be committed. 
+ */ + virtual Status commit() = 0; + + virtual std::string toString() const = 0; + virtual BSONObj toBSON() const = 0; +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp new file mode 100644 index 00000000000..9cc6fa5f7f0 --- /dev/null +++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp @@ -0,0 +1,207 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/index_create.h" +#include "mongo/db/client.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/curop.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/collection_bulk_loader_impl.h" +#include "mongo/util/destructor_guard.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace repl { + +CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(OperationContext* txn, + TaskRunner* runner, + Collection* coll, + const BSONObj idIndexSpec, + std::unique_ptr<AutoGetOrCreateDb> autoDb, + std::unique_ptr<AutoGetCollection> autoColl) + : _runner(runner), + _autoColl(std::move(autoColl)), + _autoDB(std::move(autoDb)), + _txn(txn), + _coll(coll), + _nss{coll->ns()}, + _idIndexBlock{txn, coll}, + _secondaryIndexesBlock{txn, coll}, + _idIndexSpec(!idIndexSpec.isEmpty() ? 
idIndexSpec : BSON("ns" << _nss.toString() << "name" + << "_id_" + << "key" + << BSON("_id" << 1) + << "unique" + << true)) { + invariant(txn); + invariant(coll); + invariant(runner); + invariant(_autoDB); + invariant(_autoColl); + invariant(_autoDB->getDb()); + invariant(_autoColl->getDb() == _autoDB->getDb()); +} + +CollectionBulkLoaderImpl::~CollectionBulkLoaderImpl() { + DESTRUCTOR_GUARD(_runner->cancel();) +} + +Status CollectionBulkLoaderImpl::init(OperationContext* txn, + Collection* coll, + const std::vector<BSONObj>& secondaryIndexSpecs) { + invariant(txn); + invariant(coll); + invariant(txn->getClient() == &cc()); + _callAbortOnDestructor = true; + if (secondaryIndexSpecs.size()) { + _hasSecondaryIndexes = true; + _secondaryIndexesBlock.ignoreUniqueConstraint(); + auto status = _secondaryIndexesBlock.init(secondaryIndexSpecs); + if (!status.isOK()) { + return status; + } + } + + auto status = _idIndexBlock.init(_idIndexSpec); + if (!status.isOK()) { + return status; + } + + return Status::OK(); +} + +Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin, + const std::vector<BSONObj>::const_iterator end) { + int count = 0; + return _runner->runSynchronousTask([begin, end, &count, this](OperationContext* txn) -> Status { + invariant(txn); + + for (auto iter = begin; iter != end; ++iter) { + std::vector<MultiIndexBlock*> indexers{&_idIndexBlock}; + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(txn); + const auto status = _coll->insertDocument(txn, *iter, indexers, false); + if (!status.isOK()) { + return status; + } + wunit.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + _txn, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns()); + + ++count; + } + return Status::OK(); + }); +} + +Status CollectionBulkLoaderImpl::commit() { + return _runner->runSynchronousTask( + [this](OperationContext* txn) -> Status { + invariant(txn->getClient() == &cc()); + invariant(txn == _txn); + + // Commit 
before deleting dups, so the dups will be removed from secondary indexes when + // deleted. + if (_hasSecondaryIndexes) { + std::set<RecordId> secDups; + auto status = _secondaryIndexesBlock.doneInserting(&secDups); + if (!status.isOK()) { + return status; + } + if (secDups.size()) { + return Status{ErrorCodes::UserDataInconsistent, + "Found duplicates when dups are disabled in MultiIndexBlock."}; + } + WriteUnitOfWork wunit(txn); + _secondaryIndexesBlock.commit(); + wunit.commit(); + } + + // Delete dups. + std::set<RecordId> dups; + // Do not do inside a WriteUnitOfWork (required by doneInserting). + auto status = _idIndexBlock.doneInserting(&dups); + if (!status.isOK()) { + return status; + } + + for (auto&& it : dups) { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + WriteUnitOfWork wunit(_txn); + _coll->deleteDocument(_txn, + it, + nullptr /** OpDebug **/, + false /* fromMigrate */, + true /* noWarn */); + wunit.commit(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + _txn, "CollectionBulkLoaderImpl::commit", _nss.ns()); + } + + // Commit _id index, without dups. + WriteUnitOfWork wunit(txn); + _idIndexBlock.commit(); + wunit.commit(); + + // release locks. + _autoColl.reset(nullptr); + _autoDB.reset(nullptr); + _coll = nullptr; + _callAbortOnDestructor = false; + return Status::OK(); + }, + TaskRunner::NextAction::kDisposeOperationContext); +} + +std::string CollectionBulkLoaderImpl::toString() const { + return toBSON().toString(); +} + +BSONObj CollectionBulkLoaderImpl::toBSON() const { + BSONObjBuilder bob; + bob.append("BulkLoader", _nss.toString()); + // TODO: Add index specs here. + return bob.done(); +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h new file mode 100644 index 00000000000..1650d98f5fb --- /dev/null +++ b/src/mongo/db/repl/collection_bulk_loader_impl.h @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2015 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/catalog/index_create.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/repl/collection_bulk_loader.h" +#include "mongo/db/repl/storage_interface.h" +#include "mongo/db/repl/task_runner.h" +#include "mongo/util/concurrency/old_thread_pool.h" + +namespace mongo { +namespace repl { + +/** + * Class in charge of building a collection during data loading (like initial sync). + * + * Note: Call commit when done inserting documents. + */ +class CollectionBulkLoaderImpl : public CollectionBulkLoader { + MONGO_DISALLOW_COPYING(CollectionBulkLoaderImpl); + +public: + CollectionBulkLoaderImpl(OperationContext* txn, + TaskRunner* runner, + Collection* coll, + const BSONObj idIndexSpec, + std::unique_ptr<AutoGetOrCreateDb> autoDB, + std::unique_ptr<AutoGetCollection> autoColl); + virtual ~CollectionBulkLoaderImpl(); + + virtual Status init(OperationContext* txn, + Collection* coll, + const std::vector<BSONObj>& secondaryIndexSpecs) override; + + virtual Status insertDocuments(const std::vector<BSONObj>::const_iterator begin, + const std::vector<BSONObj>::const_iterator end) override; + virtual Status commit() override; + + virtual std::string toString() const override; + virtual BSONObj toBSON() const override; + +private: + TaskRunner* _runner; + std::unique_ptr<AutoGetCollection> _autoColl; + std::unique_ptr<AutoGetOrCreateDb> _autoDB; + OperationContext* _txn = nullptr; + Collection* _coll = nullptr; + NamespaceString _nss; + MultiIndexBlock _idIndexBlock; + MultiIndexBlock _secondaryIndexesBlock; + bool _hasSecondaryIndexes = false; + BSONObj _idIndexSpec; + bool _callAbortOnDestructor = false; +}; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 1cad85f2607..820f4b11c96 
100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -518,15 +518,14 @@ long long getNewOplogSizeBytes(OperationContext* txn, const ReplSettings& replSe } } // namespace -void createOplog(OperationContext* txn) { +void createOplog(OperationContext* txn, const std::string& oplogCollectionName, bool isReplSet) { ScopedTransaction transaction(txn, MODE_X); Lock::GlobalWrite lk(txn->lockState()); const ReplSettings& replSettings = ReplicationCoordinator::get(txn)->getSettings(); - bool replEnabled = replSettings.usingReplSets(); - OldClientContext ctx(txn, _oplogCollectionName); - Collection* collection = ctx.db()->getCollection(_oplogCollectionName); + OldClientContext ctx(txn, oplogCollectionName); + Collection* collection = ctx.db()->getCollection(oplogCollectionName); if (collection) { if (replSettings.getOplogSizeBytes() != 0) { @@ -544,8 +543,8 @@ void createOplog(OperationContext* txn) { } } - if (!replEnabled) - initTimestampFromOplog(txn, _oplogCollectionName); + if (!isReplSet) + initTimestampFromOplog(txn, oplogCollectionName); return; } @@ -562,12 +561,12 @@ void createOplog(OperationContext* txn) { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { WriteUnitOfWork uow(txn); - invariant(ctx.db()->createCollection(txn, _oplogCollectionName, options)); - if (!replEnabled) + invariant(ctx.db()->createCollection(txn, oplogCollectionName, options)); + if (!isReplSet) getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, BSONObj()); uow.commit(); } - MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", _oplogCollectionName); + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", oplogCollectionName); /* sync here so we don't get any surprising lag later when we try to sync */ StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); @@ -575,6 +574,12 @@ void createOplog(OperationContext* txn) { log() << "******" << endl; } +void createOplog(OperationContext* txn) { + const auto isReplSet = 
ReplicationCoordinator::get(txn)->getReplicationMode() == + ReplicationCoordinator::modeReplSet; + createOplog(txn, _oplogCollectionName, isReplSet); +} + // ------------------------------------- namespace { diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 2b0f77164b2..9015af9e238 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -52,10 +52,16 @@ class ReplSettings; */ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp); +/** + * Create a new capped collection for the oplog if it doesn't yet exist. + * If the collection already exists (and isReplSet is false), + * set the 'last' Timestamp from the last entry of the oplog collection (side effect!) + */ +void createOplog(OperationContext* txn, const std::string& oplogCollectionName, bool isReplSet); + /* - * Create a new capped collection for the oplog using createOplog() if it doesn't yet exist. - * Collection name will be "_oplogCollectionName" initialized in setOplogCollectionName(). 
- * This will be either local.oplog.rs (replica sets) or local.oplog.$main (master/slave) + * Shortcut for above function using oplogCollectionName = _oplogCollectionName, + * and replEnabled = replCoord::isReplSet(); */ void createOplog(OperationContext* txn); diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index 8c50820b31e..696e699b984 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -45,6 +45,7 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/op_observer.h" #include "mongo/db/repl/bgsync.h" +#include "mongo/db/repl/data_replicator.h" #include "mongo/db/repl/initial_sync.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" @@ -112,61 +113,6 @@ void truncateAndResetOplog(OperationContext* txn, MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "truncate", collection->ns().ns()); } -/** - * Confirms that the "admin" database contains a supported version of the auth - * data schema. Terminates the process if the "admin" contains clearly incompatible - * auth data. - */ -void checkAdminDatabasePostClone(OperationContext* txn, Database* adminDb) { - // Assumes txn holds MODE_X or MODE_S lock on "admin" database. - if (!adminDb) { - return; - } - Collection* const usersCollection = - adminDb->getCollection(AuthorizationManager::usersCollectionNamespace); - const bool hasUsers = - usersCollection && !Helpers::findOne(txn, usersCollection, BSONObj(), false).isNull(); - Collection* const adminVersionCollection = - adminDb->getCollection(AuthorizationManager::versionCollectionNamespace); - BSONObj authSchemaVersionDocument; - if (!adminVersionCollection || - !Helpers::findOne(txn, - adminVersionCollection, - AuthorizationManager::versionDocumentQuery, - authSchemaVersionDocument)) { - if (!hasUsers) { - // It's OK to have no auth version document if there are no user documents. 
- return; - } - severe() << "During initial sync, found documents in " - << AuthorizationManager::usersCollectionNamespace - << " but could not find an auth schema version document in " - << AuthorizationManager::versionCollectionNamespace; - severe() << "This indicates that the primary of this replica set was not successfully " - "upgraded to schema version " - << AuthorizationManager::schemaVersion26Final - << ", which is the minimum supported schema version in this version of MongoDB"; - fassertFailedNoTrace(28620); - } - long long foundSchemaVersion; - Status status = bsonExtractIntegerField(authSchemaVersionDocument, - AuthorizationManager::schemaVersionFieldName, - &foundSchemaVersion); - if (!status.isOK()) { - severe() << "During initial sync, found malformed auth schema version document: " << status - << "; document: " << authSchemaVersionDocument; - fassertFailedNoTrace(28618); - } - if ((foundSchemaVersion != AuthorizationManager::schemaVersion26Final) && - (foundSchemaVersion != AuthorizationManager::schemaVersion28SCRAM)) { - severe() << "During initial sync, found auth schema version " << foundSchemaVersion - << ", but this version of MongoDB only supports schema versions " - << AuthorizationManager::schemaVersion26Final << " and " - << AuthorizationManager::schemaVersion28SCRAM; - fassertFailedNoTrace(28619); - } -} - bool _initialSyncClone(OperationContext* txn, Cloner& cloner, const std::string& host, @@ -202,7 +148,7 @@ bool _initialSyncClone(OperationContext* txn, } if (dataPass && (db == "admin")) { - checkAdminDatabasePostClone(txn, dbHolder().get(txn, db)); + fassertNoTrace(28619, checkAdminDatabase(txn, dbHolder().get(txn, db))); } return true; } @@ -512,6 +458,61 @@ const auto kMaxFailedAttempts = 10; const auto kInitialSyncRetrySleepDuration = Seconds{5}; } // namespace +Status checkAdminDatabase(OperationContext* txn, Database* adminDb) { + // Assumes txn holds MODE_X or MODE_S lock on "admin" database. 
+ if (!adminDb) { + return Status::OK(); + } + Collection* const usersCollection = + adminDb->getCollection(AuthorizationManager::usersCollectionNamespace); + const bool hasUsers = + usersCollection && !Helpers::findOne(txn, usersCollection, BSONObj(), false).isNull(); + Collection* const adminVersionCollection = + adminDb->getCollection(AuthorizationManager::versionCollectionNamespace); + BSONObj authSchemaVersionDocument; + if (!adminVersionCollection || + !Helpers::findOne(txn, + adminVersionCollection, + AuthorizationManager::versionDocumentQuery, + authSchemaVersionDocument)) { + if (!hasUsers) { + // It's OK to have no auth version document if there are no user documents. + return Status::OK(); + } + std::string msg = str::stream() + << "During initial sync, found documents in " + << AuthorizationManager::usersCollectionNamespace.ns() + << " but could not find an auth schema version document in " + << AuthorizationManager::versionCollectionNamespace.ns() << ". " + << "This indicates that the primary of this replica set was not successfully " + "upgraded to schema version " + << AuthorizationManager::schemaVersion26Final + << ", which is the minimum supported schema version in this version of MongoDB"; + return {ErrorCodes::AuthSchemaIncompatible, msg}; + } + long long foundSchemaVersion; + Status status = bsonExtractIntegerField(authSchemaVersionDocument, + AuthorizationManager::schemaVersionFieldName, + &foundSchemaVersion); + if (!status.isOK()) { + std::string msg = str::stream() + << "During initial sync, found malformed auth schema version document: " + << status.toString() << "; document: " << authSchemaVersionDocument; + return {ErrorCodes::AuthSchemaIncompatible, msg}; + } + if ((foundSchemaVersion != AuthorizationManager::schemaVersion26Final) && + (foundSchemaVersion != AuthorizationManager::schemaVersion28SCRAM)) { + std::string msg = str::stream() + << "During initial sync, found auth schema version " << foundSchemaVersion + << ", but this version 
of MongoDB only supports schema versions " + << AuthorizationManager::schemaVersion26Final << " and " + << AuthorizationManager::schemaVersion28SCRAM; + return {ErrorCodes::AuthSchemaIncompatible, msg}; + } + + return Status::OK(); +} + void syncDoInitialSync(BackgroundSync* bgsync) { stdx::unique_lock<stdx::mutex> lk(_initialSyncMutex, stdx::defer_lock); if (!lk.try_lock()) { diff --git a/src/mongo/db/repl/rs_initialsync.h b/src/mongo/db/repl/rs_initialsync.h index 1f5416453e0..4f8cd5e5017 100644 --- a/src/mongo/db/repl/rs_initialsync.h +++ b/src/mongo/db/repl/rs_initialsync.h @@ -28,9 +28,13 @@ #pragma once +#include "mongo/base/status.h" + namespace mongo { -namespace repl { +class OperationContext; +class Database; +namespace repl { class BackgroundSync; /** @@ -39,5 +43,10 @@ class BackgroundSync; */ void syncDoInitialSync(BackgroundSync* bgsync); +/** + * Checks that the "admin" database contains a supported version of the auth data schema. + */ +Status checkAdminDatabase(OperationContext* txn, Database* adminDb); + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/storage_interface.cpp b/src/mongo/db/repl/storage_interface.cpp index ba885bda35c..557b44d079a 100644 --- a/src/mongo/db/repl/storage_interface.cpp +++ b/src/mongo/db/repl/storage_interface.cpp @@ -71,9 +71,9 @@ StorageInterface* StorageInterface::get(OperationContext* txn) { } -void StorageInterface::set(ServiceContext* service, std::unique_ptr<StorageInterface> replCoord) { - auto& coordinator = getStorageInterface(service); - coordinator = std::move(replCoord); +void StorageInterface::set(ServiceContext* service, std::unique_ptr<StorageInterface> storage) { + auto& storageInterface = getStorageInterface(service); + storageInterface = std::move(storage); } } // namespace repl diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h index d0a3cc723fc..1a45d84021e 100644 --- a/src/mongo/db/repl/storage_interface.h +++ 
b/src/mongo/db/repl/storage_interface.h @@ -34,12 +34,15 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/collection_bulk_loader.h" #include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/optime.h" #include "mongo/db/service_context.h" namespace mongo { +class Collection; +struct CollectionOptions; class OperationContext; namespace repl { @@ -60,9 +63,19 @@ enum class DurableRequirement { }; /** - * Storage interface used by used by the ReplicationExecutor inside mongod for supporting - * ReplicationExectutor's ability to take database locks. + * Storage interface used by the replication system to interact with storage. + * This interface provides separation of concerns and a place for mocking out test + * interactions. * + * The grouping of functionality includes general collection helpers, and more specific replication + * concepts: + * * Create Collection and Oplog + * * Drop database and all user databases + * * Drop a collection + * * Insert documents into a collection + * * Manage minvalid boundaries and initial sync state + * + * ***** MINVALID ***** * This interface provides helper functions for maintaining a single document in the * local.replset.minvalid collection. * @@ -83,20 +96,40 @@ enum class DurableRequirement { * begin: {ts:..., t:...} // a batch is currently being applied, and not consistent * } */ - class StorageInterface { MONGO_DISALLOW_COPYING(StorageInterface); public: + // Used for testing. 
+ + using CreateCollectionFn = stdx::function<StatusWith<std::unique_ptr<CollectionBulkLoader>>( + const NamespaceString& nss, + const CollectionOptions& options, + const BSONObj idIndexSpec, + const std::vector<BSONObj>& secondaryIndexSpecs)>; + using InsertDocumentFn = stdx::function<Status( + OperationContext* txn, const NamespaceString& nss, const BSONObj& doc)>; + using InsertOplogDocumentsFn = stdx::function<StatusWith<OpTime>( + OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& ops)>; + using DropUserDatabasesFn = stdx::function<Status(OperationContext* txn)>; + using CreateOplogFn = stdx::function<Status(OperationContext* txn, const NamespaceString& nss)>; + using DropCollectionFn = + stdx::function<Status(OperationContext* txn, const NamespaceString& nss)>; + + // Operation Context binding. static StorageInterface* get(ServiceContext* service); static StorageInterface* get(ServiceContext& service); static StorageInterface* get(OperationContext* txn); - static void set(ServiceContext* service, std::unique_ptr<StorageInterface> storageInterface); + // Constructor and Destructor. StorageInterface() = default; virtual ~StorageInterface() = default; + virtual void startup() = 0; + virtual void shutdown() = 0; + + // MinValid and Initial Sync Flag. /** * Returns true if initial sync was started but has not not completed. */ @@ -152,6 +185,54 @@ public: virtual StatusWith<OpTime> writeOpsToOplog(OperationContext* txn, const NamespaceString& nss, const MultiApplier::Operations& operations) = 0; + + // Collection creation and population for initial sync. + /** + * Creates a collection with the provided indexes. + * + * Assumes that no database locks have been acquired prior to calling this function. 
+ */ + virtual StatusWith<std::unique_ptr<CollectionBulkLoader>> createCollectionForBulkLoading( + const NamespaceString& nss, + const CollectionOptions& options, + const BSONObj idIndexSpec, + const std::vector<BSONObj>& secondaryIndexSpecs) = 0; + + /** + * Inserts a document into a collection. + * + * NOTE: If the collection doesn't exist, it will not be created, and instead + * an error is returned. + */ + virtual Status insertDocument(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& doc) = 0; + + /** + * Inserts the given documents into the oplog, returning the last written OpTime. + */ + virtual StatusWith<OpTime> insertOplogDocuments(OperationContext* txn, + const NamespaceString& nss, + const std::vector<BSONObj>& ops) = 0; + /** + * Creates the initial oplog, errors if it exists. + */ + virtual Status createOplog(OperationContext* txn, const NamespaceString& nss) = 0; + + /** + * Drops a collection, like the oplog. + */ + virtual Status dropCollection(OperationContext* txn, const NamespaceString& nss) = 0; + + /** + * Drops all databases except "local". + */ + virtual Status dropReplicatedDatabases(OperationContext* txn) = 0; + + /** + * Validates that the admin database is valid during initial sync. 
+ */ + virtual Status isAdminDbValid(OperationContext* txn) = 0; }; } // namespace repl diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp index afe4a709d98..3acbad55326 100644 --- a/src/mongo/db/repl/storage_interface_impl.cpp +++ b/src/mongo/db/repl/storage_interface_impl.cpp @@ -34,7 +34,14 @@ #include <algorithm> +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" #include "mongo/db/catalog/collection.h" +#include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/document_validation.h" +#include "mongo/db/catalog/index_create.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -43,11 +50,18 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/collection_bulk_loader_impl.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/repl/rs_initialsync.h" +#include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/util/assert_util.h" +#include "mongo/util/concurrency/old_thread_pool.h" +#include "mongo/util/destructor_guard.h" #include "mongo/util/log.h" - +#include "mongo/util/mongoutils/str.h" namespace mongo { namespace repl { @@ -56,9 +70,11 @@ const char StorageInterfaceImpl::kInitialSyncFlagFieldName[] = "doingInitialSync const char StorageInterfaceImpl::kBeginFieldName[] = "begin"; namespace { +using UniqueLock = stdx::unique_lock<stdx::mutex>; -const BSONObj kInitialSyncFlag(BSON(StorageInterfaceImpl::kInitialSyncFlagFieldName << true)); +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(dataReplicatorInitialSyncInserterThreads, int, 4); +const BSONObj 
kInitialSyncFlag(BSON(StorageInterfaceImpl::kInitialSyncFlagFieldName << true)); } // namespace StorageInterfaceImpl::StorageInterfaceImpl() @@ -67,6 +83,22 @@ StorageInterfaceImpl::StorageInterfaceImpl() StorageInterfaceImpl::StorageInterfaceImpl(const NamespaceString& minValidNss) : _minValidNss(minValidNss) {} +StorageInterfaceImpl::~StorageInterfaceImpl() { + DESTRUCTOR_GUARD(shutdown();); +} + +void StorageInterfaceImpl::startup() { + _bulkLoaderThreads.reset( + new OldThreadPool{dataReplicatorInitialSyncInserterThreads, "InitialSyncInserters-"}); +}; + +void StorageInterfaceImpl::shutdown() { + if (_bulkLoaderThreads) { + _bulkLoaderThreads->join(); + _bulkLoaderThreads.reset(); + } +} + NamespaceString StorageInterfaceImpl::getMinValidNss() const { return _minValidNss; } @@ -135,14 +167,25 @@ BatchBoundaries StorageInterfaceImpl::getMinValid(OperationContext* txn) const { if (found) { auto status = OpTime::parseFromOplogEntry(mv.getObjectField(kBeginFieldName)); OpTime start(status.isOK() ? status.getValue() : OpTime{}); - OpTime end(fassertStatusOK(40052, OpTime::parseFromOplogEntry(mv))); + const auto opTimeStatus = OpTime::parseFromOplogEntry(mv); + // If any of the keys (fields) are missing from the minvalid document, we return + // empty. 
+ if (opTimeStatus == ErrorCodes::NoSuchKey) { + return BatchBoundaries{{}, {}}; + } + + if (!opTimeStatus.isOK()) { + error() << "Error parsing minvalid entry: " << mv + << ", with status:" << opTimeStatus.getStatus(); + } + OpTime end(fassertStatusOK(40052, opTimeStatus)); LOG(3) << "returning minvalid: " << start.toString() << "(" << start.toBSON() << ") -> " << end.toString() << "(" << end.toBSON() << ")"; return BatchBoundaries(start, end); } LOG(3) << "returning empty minvalid"; - return BatchBoundaries{OpTime{}, OpTime{}}; + return BatchBoundaries{{}, {}}; } MONGO_WRITE_CONFLICT_RETRY_LOOP_END( txn, "StorageInterfaceImpl::getMinValid", _minValidNss.ns()); @@ -224,5 +267,164 @@ StatusWith<OpTime> StorageInterfaceImpl::writeOpsToOplog( return operations.back().getOpTime(); } +StatusWith<std::unique_ptr<CollectionBulkLoader>> +StorageInterfaceImpl::createCollectionForBulkLoading( + const NamespaceString& nss, + const CollectionOptions& options, + const BSONObj idIndexSpec, + const std::vector<BSONObj>& secondaryIndexSpecs) { + + UniqueLock lk(_runnersMutex); + // Check to make sure we don't already have a runner. + for (auto&& item : _runners) { + if (item.first == nss) { + return {ErrorCodes::IllegalOperation, + str::stream() << "There is already an active collection cloner for: " + << nss.ns()}; + } + } + // Create the runner, and schedule the collection creation. + _runners.emplace_back( + std::make_pair(nss, stdx::make_unique<TaskRunner>(_bulkLoaderThreads.get()))); + auto&& inserter = _runners.back(); + TaskRunner* runner = inserter.second.get(); + lk.unlock(); + + // Setup cond_var for signalling when done. + std::unique_ptr<CollectionBulkLoader> loaderToReturn; + + auto status = runner->runSynchronousTask([&](OperationContext* txn) -> Status { + // We are not replicating nor validating these writes. + txn->setReplicatedWrites(false); + DisableDocumentValidation validationDisabler(txn); + + // Retry if WCE. 
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + // Get locks and create the collection. + ScopedTransaction transaction(txn, MODE_IX); + auto db = stdx::make_unique<AutoGetOrCreateDb>(txn, nss.db(), MODE_IX); + auto coll = stdx::make_unique<AutoGetCollection>(txn, nss, MODE_X); + Collection* collection = coll->getCollection(); + + if (collection) { + return {ErrorCodes::NamespaceExists, "Collection already exists."}; + } + + // Create the collection. + WriteUnitOfWork wunit(txn); + collection = db->getDb()->createCollection(txn, nss.ns(), options, false); + invariant(collection); + wunit.commit(); + coll = stdx::make_unique<AutoGetCollection>(txn, nss, MODE_IX); + + // Move locks into loader, so it now controls their lifetime. + auto loader = stdx::make_unique<CollectionBulkLoaderImpl>( + txn, runner, collection, idIndexSpec, std::move(db), std::move(coll)); + invariant(collection); + auto status = loader->init(txn, collection, secondaryIndexSpecs); + if (!status.isOK()) { + return status; + } + + // Move the loader into the StatusWith. 
+ loaderToReturn = std::move(loader); + return Status::OK(); + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "beginCollectionClone", nss.ns()); + MONGO_UNREACHABLE; + }); + + if (!status.isOK()) { + return status; + } + + return std::move(loaderToReturn); +} + + +Status StorageInterfaceImpl::insertDocument(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& doc) { + ScopedTransaction transaction(txn, MODE_IX); + AutoGetOrCreateDb autoDB(txn, nss.db(), MODE_IX); + AutoGetCollection autoColl(txn, nss, MODE_IX); + if (!autoColl.getCollection()) { + return {ErrorCodes::NamespaceNotFound, + "The collection must exist before inserting documents."}; + } + WriteUnitOfWork wunit(txn); + const auto status = + autoColl.getCollection()->insertDocument(txn, doc, nullptr /** OpDebug **/, false, false); + if (status.isOK()) { + wunit.commit(); + } + return status; +} + +StatusWith<OpTime> StorageInterfaceImpl::insertOplogDocuments(OperationContext* txn, + const NamespaceString& nss, + const std::vector<BSONObj>& ops) { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IX); + AutoGetOrCreateDb autoDB(txn, nss.db(), MODE_IX); + AutoGetCollection autoColl(txn, nss, MODE_IX); + if (!autoColl.getCollection()) { + return { + ErrorCodes::NamespaceNotFound, + str::stream() << "The oplog collection must exist before inserting documents, ns:" + << nss.ns()}; + } + if (!autoColl.getCollection()->isCapped()) { + return {ErrorCodes::NamespaceNotFound, + str::stream() << "The oplog collection must be capped, ns:" << nss.ns()}; + } + + WriteUnitOfWork wunit(txn); + const auto lastOpTime = repl::writeOpsToOplog(txn, nss.ns(), ops); + wunit.commit(); + return lastOpTime; + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END( + txn, "StorageInterfaceImpl::insertOplogDocuments", nss.ns()); +} + +Status StorageInterfaceImpl::dropReplicatedDatabases(OperationContext* txn) { + dropAllDatabasesExceptLocal(txn); + return Status::OK(); +} + +Status 
StorageInterfaceImpl::createOplog(OperationContext* txn, const NamespaceString& nss) { + mongo::repl::createOplog(txn, nss.ns(), true); + return Status::OK(); +} + +Status StorageInterfaceImpl::dropCollection(OperationContext* txn, const NamespaceString& nss) { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IX); + AutoGetOrCreateDb autoDB(txn, nss.db(), MODE_X); + WriteUnitOfWork wunit(txn); + const auto status = autoDB.getDb()->dropCollection(txn, nss.ns()); + if (status.isOK()) { + wunit.commit(); + } + return status; + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "StorageInterfaceImpl::dropCollection", nss.ns()); +} + +Status StorageInterfaceImpl::isAdminDbValid(OperationContext* txn) { + log() << "StorageInterfaceImpl::isAdminDbValid called."; + // TODO: plumb through operation context from caller, for now run on ioThread with runner. + TaskRunner runner(_bulkLoaderThreads.get()); + auto status = runner.runSynchronousTask( + [](OperationContext* txn) -> Status { + ScopedTransaction transaction(txn, MODE_IX); + AutoGetDb autoDB(txn, "admin", MODE_X); + return checkAdminDatabase(txn, autoDB.getDb()); + }, + TaskRunner::NextAction::kDisposeOperationContext); + return status; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h index 9abf6dd5ff7..9f05270ef1f 100644 --- a/src/mongo/db/repl/storage_interface_impl.h +++ b/src/mongo/db/repl/storage_interface_impl.h @@ -30,8 +30,15 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/catalog/index_create.h" +#include "mongo/db/db_raii.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/storage_interface.h" +#include "mongo/db/repl/task_runner.h" +#include "mongo/util/concurrency/old_thread_pool.h" namespace mongo { namespace repl { @@ -46,6 +53,10 @@ 
public: StorageInterfaceImpl(); explicit StorageInterfaceImpl(const NamespaceString& minValidNss); + virtual ~StorageInterfaceImpl(); + + void startup() override; + void shutdown() override; /** * Returns namespace of collection containing the minvalid boundaries and initial sync flag. @@ -70,8 +81,42 @@ public: const NamespaceString& nss, const MultiApplier::Operations& operations) override; + /** + * Allocates a new TaskRunner for use by the passed in collection. + */ + StatusWith<std::unique_ptr<CollectionBulkLoader>> createCollectionForBulkLoading( + const NamespaceString& nss, + const CollectionOptions& options, + const BSONObj idIndexSpec, + const std::vector<BSONObj>& secondaryIndexSpecs) override; + + Status insertDocument(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& doc) override; + + StatusWith<OpTime> insertOplogDocuments(OperationContext* txn, + const NamespaceString& nss, + const std::vector<BSONObj>& ops) override; + + Status dropReplicatedDatabases(OperationContext* txn) override; + + Status createOplog(OperationContext* txn, const NamespaceString& nss) override; + + Status dropCollection(OperationContext* txn, const NamespaceString& nss) override; + + Status isAdminDbValid(OperationContext* txn) override; + private: - NamespaceString _minValidNss; + // One thread per collection/TaskRunner + std::unique_ptr<OldThreadPool> _bulkLoaderThreads; + const NamespaceString _minValidNss; + + // This mutex protects _runners vector. + stdx::mutex _runnersMutex; + + // Each runner services a single collection and holds on to the OperationContext (and thread) + // until it is done with the collection (CollectionBulkLoaderImpl::commit/abort is called). 
+ std::vector<std::pair<const NamespaceString&, std::unique_ptr<TaskRunner>>> _runners; }; } // namespace repl diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index 752944d4dcc..084bc33cd2d 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -32,6 +32,7 @@ #include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/document_validation.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" @@ -40,7 +41,9 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_interface_local.h" +#include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/repl/storage_interface_impl.h" #include "mongo/db/service_context_d_test_fixture.h" @@ -115,12 +118,97 @@ OplogEntry makeOplogEntry(OpTime opTime) { return OplogEntry(bob.obj()); } +/** + * Helper to create default ReplSettings for tests. + */ +ReplSettings createReplSettings() { + ReplSettings settings; + settings.setOplogSizeBytes(5 * 1024 * 1024); + settings.setReplSetString("mySet/node1:12345"); + return settings; +} + +/** + * Creates a collection given the supplied options. 
+ */ +StatusWith<Collection*> createCollection(OperationContext* txn, + NamespaceString& nss, + CollectionOptions opts = CollectionOptions()) { + ScopedTransaction transaction(txn, MODE_IX); + AutoGetOrCreateDb db{txn, nss.db(), MODE_X}; + WriteUnitOfWork wunit{txn}; + auto collection = db.getDb()->createCollection(txn, nss.ns(), opts, true); + if (collection) { + wunit.commit(); + return collection; + } + return {ErrorCodes::InternalError, "collection creation failed."}; +} + +/** + * Counts the number of keys in an index using an IndexAccessMethod::validate call. + */ +int64_t getIndexKeyCount(OperationContext* txn, IndexCatalog* cat, IndexDescriptor* desc) { + auto idx = cat->getIndex(desc); + int64_t numKeys; + ValidateResults fullRes; + idx->validate(txn, &numKeys, &fullRes); + return numKeys; +} + + class StorageInterfaceImplTest : public ServiceContextMongoDTest { protected: - Client* getClient() const; + Client* getClient() const { + return &cc(); + } + +private: + void setUp() override { + ServiceContextMongoDTest::setUp(); + // Initializes cc() used in ServiceContextMongoD::_newOpCtx(). 
+ Client::initThreadIfNotAlready("StorageInterfaceImplTest"); + + ReplSettings settings; + settings.setOplogSizeBytes(5 * 1024 * 1024); + settings.setReplSetString("mySet/node1:12345"); + ReplicationCoordinator::set(getGlobalServiceContext(), + stdx::make_unique<ReplicationCoordinatorMock>(settings)); + } +}; + +class StorageInterfaceImplWithReplCoordTest : public ServiceContextMongoDTest { +protected: + void setUp() override { + ServiceContextMongoDTest::setUp(); + Client::initThreadIfNotAlready(); + createOptCtx(); + _coordinator = new ReplicationCoordinatorMock(createReplSettings()); + setGlobalReplicationCoordinator(_coordinator); + } + void tearDown() override { + _txn.reset(nullptr); + ServiceContextMongoDTest::tearDown(); + } + + + void createOptCtx() { + Client::initThreadIfNotAlready(); + _txn = cc().makeOperationContext(); + // We are not replicating nor validating these writes. + _txn->setReplicatedWrites(false); + DisableDocumentValidation validationDisabler(_txn.get()); + } + + OperationContext* getOperationContext() { + return _txn.get(); + } private: - void setUp() override; + ServiceContext::UniqueOperationContext _txn; + + // Owned by service context + ReplicationCoordinator* _coordinator; }; /** @@ -132,22 +220,6 @@ public: bool waitUntilDurableCalled = false; }; -void StorageInterfaceImplTest::setUp() { - ServiceContextMongoDTest::setUp(); - // Initializes cc() used in ServiceContextMongoD::_newOpCtx(). 
- Client::initThreadIfNotAlready("StorageInterfaceImplTest"); - - ReplSettings settings; - settings.setOplogSizeBytes(5 * 1024 * 1024); - settings.setReplSetString("mySet/node1:12345"); - ReplicationCoordinator::set(getGlobalServiceContext(), - stdx::make_unique<ReplicationCoordinatorMock>(settings)); -} - -Client* StorageInterfaceImplTest::getClient() const { - return &cc(); -} - bool RecoveryUnitWithDurabilityTracking::waitUntilDurable() { waitUntilDurableCalled = true; return RecoveryUnitNoop::waitUntilDurable(); @@ -189,6 +261,25 @@ TEST_F(StorageInterfaceImplTest, InitialSyncFlag) { ASSERT_FALSE(storageInterface.getInitialSyncFlag(txn.get())); } +TEST_F(StorageInterfaceImplTest, GetMinValidAfterSettingInitialSyncFlagWorks) { + NamespaceString nss( + "local.StorageInterfaceImplTest_GetMinValidAfterSettingInitialSyncFlagWorks"); + + StorageInterfaceImpl storageInterface(nss); + auto txn = getClient()->makeOperationContext(); + + // Initial sync flag should be unset after initializing a new storage engine. + ASSERT_FALSE(storageInterface.getInitialSyncFlag(txn.get())); + + // Setting initial sync flag should affect getInitialSyncFlag() result. 
+ storageInterface.setInitialSyncFlag(txn.get()); + ASSERT_TRUE(storageInterface.getInitialSyncFlag(txn.get())); + + auto minValid = storageInterface.getMinValid(txn.get()); + ASSERT_TRUE(minValid.start.isNull()); + ASSERT_TRUE(minValid.end.isNull()); +} + TEST_F(StorageInterfaceImplTest, MinValid) { NamespaceString nss("local.StorageInterfaceImplTest_MinValid"); @@ -303,4 +394,121 @@ TEST_F(StorageInterfaceImplTest, ASSERT_STRING_CONTAINS(status.reason(), "collection not found"); } +TEST_F(StorageInterfaceImplWithReplCoordTest, InsertMissingDocWorksOnExistingCappedCollection) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + NamespaceString nss("foo.bar"); + CollectionOptions opts; + opts.capped = true; + opts.cappedSize = 1024 * 1024; + ASSERT_OK(createCollection(txn, nss, opts)); + ASSERT_OK(storage.insertDocument(txn, nss, BSON("_id" << 1))); + AutoGetCollectionForRead autoColl(txn, nss); + ASSERT_TRUE(autoColl.getCollection()); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, InsertMissingDocWorksOnExistingCollection) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + NamespaceString nss("foo.bar"); + ASSERT_OK(createCollection(txn, nss)); + ASSERT_OK(storage.insertDocument(txn, nss, BSON("_id" << 1))); + AutoGetCollectionForRead autoColl(txn, nss); + ASSERT_TRUE(autoColl.getCollection()); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, InsertMissingDocFailesIfCollectionIsMissing) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + NamespaceString nss("foo.bar"); + const auto status = storage.insertDocument(txn, nss, BSON("_id" << 1)); + ASSERT_NOT_OK(status); + ASSERT_EQ(status.code(), ErrorCodes::NamespaceNotFound); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, CreateCollectionWithIDIndexCommits) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + storage.startup(); + NamespaceString nss("foo.bar"); + CollectionOptions opts; + std::vector<BSONObj> indexes; + auto 
loaderStatus = storage.createCollectionForBulkLoading(nss, opts, {}, indexes); + ASSERT_OK(loaderStatus.getStatus()); + auto loader = std::move(loaderStatus.getValue()); + std::vector<BSONObj> docs = {BSON("_id" << 1), BSON("_id" << 1), BSON("_id" << 2)}; + ASSERT_OK(loader->insertDocuments(docs.begin(), docs.end())); + ASSERT_OK(loader->commit()); + + AutoGetCollectionForRead autoColl(txn, nss); + auto coll = autoColl.getCollection(); + ASSERT(coll); + ASSERT_EQ(coll->getRecordStore()->numRecords(txn), 2LL); + auto collIdxCat = coll->getIndexCatalog(); + auto idIdxDesc = collIdxCat->findIdIndex(txn); + auto count = getIndexKeyCount(txn, collIdxCat, idIdxDesc); + ASSERT_EQ(count, 2LL); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, CreateCollectionThatAlreadyExistsFails) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + storage.startup(); + NamespaceString nss("test.system.indexes"); + auto coll = createCollection(txn, nss); + ASSERT_OK(coll.getStatus()); + + const CollectionOptions opts; + const std::vector<BSONObj> indexes; + const auto status = storage.createCollectionForBulkLoading(nss, opts, {}, indexes); + ASSERT_NOT_OK(status.getStatus()); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, CreateOplogCreateCappedCollection) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + NamespaceString nss("local.oplog.X"); + { + AutoGetCollectionForRead autoColl(txn, nss); + ASSERT_FALSE(autoColl.getCollection()); + } + ASSERT_OK(storage.createOplog(txn, nss)); + { + AutoGetCollectionForRead autoColl(txn, nss); + ASSERT_TRUE(autoColl.getCollection()); + ASSERT_EQ(nss.toString(), autoColl.getCollection()->ns().toString()); + ASSERT_TRUE(autoColl.getCollection()->isCapped()); + } +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, DropCollectionWorksWithExistingWithDataCollection) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + NamespaceString nss("foo.bar"); + auto coll = createCollection(txn, nss); 
+ ASSERT_OK(coll.getStatus()); + ASSERT_OK(coll.getValue()->insertDocument( + txn, BSON("_id" << 1), nullptr /** OpDebug **/, false, true)); + ASSERT_OK(storage.dropCollection(txn, nss)); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, DropCollectionWorksWithExistingEmptyCollection) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + NamespaceString nss("foo.bar"); + ASSERT_OK(createCollection(txn, nss)); + ASSERT_OK(storage.dropCollection(txn, nss)); + AutoGetCollectionForRead autoColl(txn, nss); + ASSERT_FALSE(autoColl.getCollection()); +} + +TEST_F(StorageInterfaceImplWithReplCoordTest, DropCollectionWorksWithMissingCollection) { + auto txn = getOperationContext(); + StorageInterfaceImpl storage; + NamespaceString nss("foo.bar"); + ASSERT_OK(storage.dropCollection(txn, nss)); + AutoGetCollectionForRead autoColl(txn, nss); + ASSERT_FALSE(autoColl.getCollection()); +} + } // namespace diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp index 36dcb6ce847..57daebdb60c 100644 --- a/src/mongo/db/repl/storage_interface_mock.cpp +++ b/src/mongo/db/repl/storage_interface_mock.cpp @@ -25,14 +25,21 @@ * exception statement from all source files in the program, then also delete * it in the license file. 
*/ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include <numeric> #include "mongo/platform/basic.h" #include "mongo/db/repl/storage_interface_mock.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + namespace mongo { namespace repl { - +void StorageInterfaceMock::startup() {} +void StorageInterfaceMock::shutdown() {} bool StorageInterfaceMock::getInitialSyncFlag(OperationContext* txn) const { stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex); return _initialSyncFlag; @@ -80,5 +87,31 @@ MultiApplier::Operations StorageInterfaceMock::getOperationsWrittenToOplog() con return _operationsWrittenToOplog; } +Status CollectionBulkLoaderMock::init(OperationContext* txn, + Collection* coll, + const std::vector<BSONObj>& secondaryIndexSpecs) { + LOG(1) << "CollectionBulkLoaderMock::init called"; + stats->initCalled = true; + return initFn(txn, coll, secondaryIndexSpecs); +}; + +Status CollectionBulkLoaderMock::insertDocuments(const std::vector<BSONObj>::const_iterator begin, + const std::vector<BSONObj>::const_iterator end) { + LOG(1) << "CollectionBulkLoaderMock::insertDocuments called"; + const auto status = insertDocsFn(begin, end); + + // Only count if it succeeds. 
+ if (status.isOK()) { + stats->insertCount += std::distance(begin, end); + } + return status; +}; + +Status CollectionBulkLoaderMock::commit() { + LOG(1) << "CollectionBulkLoaderMock::commit called"; + stats->commitCalled = true; + return commitFn(); +}; + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h index 99ca0020e0b..143d5d367cd 100644 --- a/src/mongo/db/repl/storage_interface_mock.h +++ b/src/mongo/db/repl/storage_interface_mock.h @@ -30,18 +30,68 @@ #pragma once #include "mongo/base/disallow_copying.h" +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/stdx/mutex.h" namespace mongo { namespace repl { +struct CollectionMockStats { + bool initCalled = false; + int insertCount = 0; + bool commitCalled = false; +}; + +class CollectionBulkLoaderMock : public CollectionBulkLoader { + MONGO_DISALLOW_COPYING(CollectionBulkLoaderMock); + +public: + CollectionBulkLoaderMock(CollectionMockStats* collStats) : stats(collStats){}; + virtual ~CollectionBulkLoaderMock() = default; + virtual Status init(OperationContext* txn, + Collection* coll, + const std::vector<BSONObj>& secondaryIndexSpecs) override; + + virtual Status insertDocuments(const std::vector<BSONObj>::const_iterator begin, + const std::vector<BSONObj>::const_iterator end) override; + virtual Status commit() override; + + std::string toString() const override { + return toBSON().toString(); + }; + BSONObj toBSON() const override { + return BSONObj(); + }; + + CollectionMockStats* stats; + + // Override functions. 
+ stdx::function<Status(const std::vector<BSONObj>::const_iterator begin, + const std::vector<BSONObj>::const_iterator end)> + insertDocsFn = [](const std::vector<BSONObj>::const_iterator, + const std::vector<BSONObj>::const_iterator) { return Status::OK(); }; + stdx::function<Status()> abortFn = []() { return Status::OK(); }; + stdx::function<Status()> commitFn = []() { return Status::OK(); }; + stdx::function<Status( + OperationContext* txn, Collection* coll, const std::vector<BSONObj>& secondaryIndexSpecs)> + initFn = [](OperationContext*, Collection*, const std::vector<BSONObj>&) { + return Status::OK(); + }; +}; + class StorageInterfaceMock : public StorageInterface { MONGO_DISALLOW_COPYING(StorageInterfaceMock); public: StorageInterfaceMock() = default; + void startup() override; + void shutdown() override; + bool getInitialSyncFlag(OperationContext* txn) const override; void setInitialSyncFlag(OperationContext* txn) override; void clearInitialSyncFlag(OperationContext* txn) override; @@ -52,13 +102,76 @@ public: const DurableRequirement durReq) override; void setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) override; - StatusWith<OpTime> writeOpsToOplog(OperationContext* txn, const NamespaceString& nss, const MultiApplier::Operations& operations) override; MultiApplier::Operations getOperationsWrittenToOplog() const; + StatusWith<std::unique_ptr<CollectionBulkLoader>> createCollectionForBulkLoading( + const NamespaceString& nss, + const CollectionOptions& options, + const BSONObj idIndexSpec, + const std::vector<BSONObj>& secondaryIndexSpecs) override { + return createCollectionFn(nss, options, idIndexSpec, secondaryIndexSpecs); + }; + + Status insertDocument(OperationContext* txn, + const NamespaceString& nss, + const BSONObj& doc) override { + return insertDocumentFn(txn, nss, doc); + }; + + StatusWith<OpTime> insertOplogDocuments(OperationContext* txn, + const NamespaceString& nss, + const std::vector<BSONObj>& ops) override { + 
return insertOplogDocumentsFn(txn, nss, ops); + } + + Status dropReplicatedDatabases(OperationContext* txn) override { + return dropUserDBsFn(txn); + }; + + Status createOplog(OperationContext* txn, const NamespaceString& nss) override { + return createOplogFn(txn, nss); + }; + + Status dropCollection(OperationContext* txn, const NamespaceString& nss) override { + return dropCollFn(txn, nss); + }; + + Status isAdminDbValid(OperationContext* txn) override { + return Status::OK(); + }; + + + // Testing functions. + CreateCollectionFn createCollectionFn = [](const NamespaceString& nss, + const CollectionOptions& options, + const BSONObj idIndexSpec, + const std::vector<BSONObj>& secondaryIndexSpecs) + -> StatusWith<std::unique_ptr<CollectionBulkLoader>> { + return Status{ErrorCodes::IllegalOperation, "CreateCollectionFn not implemented."}; + }; + InsertDocumentFn insertDocumentFn = + [](OperationContext* txn, const NamespaceString& nss, const BSONObj& doc) { + return Status{ErrorCodes::IllegalOperation, "InsertDocumentFn not implemented."}; + }; + InsertOplogDocumentsFn insertOplogDocumentsFn = + [](OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& ops) { + return StatusWith<OpTime>( + Status{ErrorCodes::IllegalOperation, "InsertOplogDocumentsFn not implemented."}); + }; + DropUserDatabasesFn dropUserDBsFn = [](OperationContext* txn) { + return Status{ErrorCodes::IllegalOperation, "DropUserDatabasesFn not implemented."}; + }; + CreateOplogFn createOplogFn = [](OperationContext* txn, const NamespaceString& nss) { + return Status{ErrorCodes::IllegalOperation, "CreateOplogFn not implemented."}; + }; + DropCollectionFn dropCollFn = [](OperationContext* txn, const NamespaceString& nss) { + return Status{ErrorCodes::IllegalOperation, "DropCollectionFn not implemented."}; + }; + private: mutable stdx::mutex _initialSyncFlagMutex; bool _initialSyncFlag = false; diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp 
index 9665a17972e..a5bd1a5e852 100644 --- a/src/mongo/db/repl/task_runner.cpp +++ b/src/mongo/db/repl/task_runner.cpp @@ -51,6 +51,8 @@ namespace repl { namespace { using UniqueLock = stdx::unique_lock<stdx::mutex>; +using LockGuard = stdx::lock_guard<stdx::mutex>; + /** * Runs a single task runner task. @@ -209,5 +211,43 @@ TaskRunner::Task TaskRunner::_waitForNextTask() { return task; } +Status TaskRunner::runSynchronousTask(SynchronousTask func, TaskRunner::NextAction nextAction) { + // Setup cond_var for signaling when done. + bool done = false; + stdx::mutex mutex; + stdx::condition_variable waitTillDoneCond; + + Status returnStatus{Status::OK()}; + this->schedule([&](OperationContext* txn, const Status taskStatus) { + if (!taskStatus.isOK()) { + returnStatus = taskStatus; + } else { + // Run supplied function. + try { + log() << "starting to run synchronous task on runner."; + returnStatus = func(txn); + log() << "done running the synchronous task."; + } catch (...) { + returnStatus = exceptionToStatus(); + error() << "Exception thrown in runSynchronousTask: " << returnStatus; + } + } + + // Signal done. + LockGuard lk2{mutex}; + done = true; + waitTillDoneCond.notify_all(); + + // return nextAction based on status from supplied function. + if (returnStatus.isOK()) { + return nextAction; + } + return TaskRunner::NextAction::kCancel; + }); + + UniqueLock lk{mutex}; + waitTillDoneCond.wait(lk, [&done] { return done; }); + return returnStatus; +} } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h index 80fbd12d337..2b201757ff1 100644 --- a/src/mongo/db/repl/task_runner.h +++ b/src/mongo/db/repl/task_runner.h @@ -59,6 +59,16 @@ public: }; using Task = stdx::function<NextAction(OperationContext*, const Status&)>; + using SynchronousTask = stdx::function<Status(OperationContext* txn)>; + + /** + * Returns the Status from the supplied function after running it.. 
+ * + * Note: TaskRunner::NextAction controls when the operation context and thread will be released. + */ + Status runSynchronousTask( + SynchronousTask func, + TaskRunner::NextAction nextAction = TaskRunner::NextAction::kKeepOperationContext); /** * Creates a Task returning kCancel. This is useful in shutting down the task runner after diff --git a/src/mongo/db/storage/mmap_v1/repair_database.cpp b/src/mongo/db/storage/mmap_v1/repair_database.cpp index 5587e4c2b9a..7de8e632971 100644 --- a/src/mongo/db/storage/mmap_v1/repair_database.cpp +++ b/src/mongo/db/storage/mmap_v1/repair_database.cpp @@ -403,12 +403,13 @@ Status MMAPV1Engine::repairDatabase(OperationContext* txn, } } + std::vector<MultiIndexBlock*> indexers{&indexer}; auto cursor = originalCollection->getCursor(txn); while (auto record = cursor->next()) { BSONObj doc = record->data.releaseToBson(); WriteUnitOfWork wunit(txn); - Status status = tempCollection->insertDocument(txn, doc, &indexer, false); + Status status = tempCollection->insertDocument(txn, doc, indexers, false); if (!status.isOK()) return status; diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js index e5163ba2200..2f86ff31962 100644 --- a/src/mongo/shell/servers.js +++ b/src/mongo/shell/servers.js @@ -720,6 +720,7 @@ var MongoRunner, _startMongod, startMongoProgram, runMongoProgram, startMongoPro MongoRunner.StopError.prototype.constructor = MongoRunner.StopError; // Constants for exit codes of MongoDB processes + MongoRunner.EXIT_ABORT = -6; MongoRunner.EXIT_CLEAN = 0; MongoRunner.EXIT_BADOPTIONS = 2; MongoRunner.EXIT_REPLICATION_ERROR = 3; |