summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@tart.local>2016-06-10 16:12:01 -0400
committerScott Hernandez <scotthernandez@tart.local>2016-06-17 11:53:01 -0400
commit42eb9464500381ef8034070472badded4b427cf5 (patch)
tree9f7aaf66732d0352f37ced532f6c0fc99cd5793d /src/mongo/db/repl
parentc59f5ade57e41b6a50f40999ea14883da691e951 (diff)
downloadmongo-42eb9464500381ef8034070472badded4b427cf5.tar.gz
SERVER-23059: storage interface improvements
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/collection_bulk_loader.h67
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.cpp207
-rw-r--r--src/mongo/db/repl/collection_bulk_loader_impl.h90
-rw-r--r--src/mongo/db/repl/oplog.cpp23
-rw-r--r--src/mongo/db/repl/oplog.h12
-rw-r--r--src/mongo/db/repl/rs_initialsync.cpp113
-rw-r--r--src/mongo/db/repl/rs_initialsync.h11
-rw-r--r--src/mongo/db/repl/storage_interface.cpp6
-rw-r--r--src/mongo/db/repl/storage_interface.h89
-rw-r--r--src/mongo/db/repl/storage_interface_impl.cpp210
-rw-r--r--src/mongo/db/repl/storage_interface_impl.h47
-rw-r--r--src/mongo/db/repl/storage_interface_impl_test.cpp244
-rw-r--r--src/mongo/db/repl/storage_interface_mock.cpp35
-rw-r--r--src/mongo/db/repl/storage_interface_mock.h115
-rw-r--r--src/mongo/db/repl/task_runner.cpp40
-rw-r--r--src/mongo/db/repl/task_runner.h10
17 files changed, 1219 insertions, 101 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index f7c80b2cb0d..f745889c129 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -67,6 +67,7 @@ env.Library(
target='storage_interface_impl',
source=[
'storage_interface_impl.cpp',
+ 'collection_bulk_loader_impl.cpp',
],
LIBDEPS=[
'storage_interface',
diff --git a/src/mongo/db/repl/collection_bulk_loader.h b/src/mongo/db/repl/collection_bulk_loader.h
new file mode 100644
index 00000000000..f3f5e0cb6e5
--- /dev/null
+++ b/src/mongo/db/repl/collection_bulk_loader.h
@@ -0,0 +1,67 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#pragma once
+
+#include <string>
+
+namespace mongo {
+
+class Collection;
+class OperationContext;
+
+namespace repl {
+
+/**
+ * Used on a local Collection to create and bulk build indexes.
+ */
+class CollectionBulkLoader {
+public:
+ virtual ~CollectionBulkLoader() = default;
+
+ virtual Status init(OperationContext* txn,
+ Collection* coll,
+ const std::vector<BSONObj>& indexSpecs) = 0;
+ /**
+ * Inserts the documents into the collection record store, and indexes them with the
+ * MultiIndexBlock on the side.
+ */
+ virtual Status insertDocuments(const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end) = 0;
+ /**
+ * Called when inserts are done and indexes can be committed.
+ */
+ virtual Status commit() = 0;
+
+ virtual std::string toString() const = 0;
+ virtual BSONObj toBSON() const = 0;
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.cpp b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
new file mode 100644
index 00000000000..9cc6fa5f7f0
--- /dev/null
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.cpp
@@ -0,0 +1,207 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/index_create.h"
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/curop.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/collection_bulk_loader_impl.h"
+#include "mongo/util/destructor_guard.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+namespace repl {
+
+CollectionBulkLoaderImpl::CollectionBulkLoaderImpl(OperationContext* txn,
+ TaskRunner* runner,
+ Collection* coll,
+ const BSONObj idIndexSpec,
+ std::unique_ptr<AutoGetOrCreateDb> autoDb,
+ std::unique_ptr<AutoGetCollection> autoColl)
+ : _runner(runner),
+ _autoColl(std::move(autoColl)),
+ _autoDB(std::move(autoDb)),
+ _txn(txn),
+ _coll(coll),
+ _nss{coll->ns()},
+ _idIndexBlock{txn, coll},
+ _secondaryIndexesBlock{txn, coll},
+ _idIndexSpec(!idIndexSpec.isEmpty() ? idIndexSpec : BSON("ns" << _nss.toString() << "name"
+ << "_id_"
+ << "key"
+ << BSON("_id" << 1)
+ << "unique"
+ << true)) {
+ invariant(txn);
+ invariant(coll);
+ invariant(runner);
+ invariant(_autoDB);
+ invariant(_autoColl);
+ invariant(_autoDB->getDb());
+ invariant(_autoColl->getDb() == _autoDB->getDb());
+}
+
+CollectionBulkLoaderImpl::~CollectionBulkLoaderImpl() {
+ DESTRUCTOR_GUARD(_runner->cancel();)
+}
+
+Status CollectionBulkLoaderImpl::init(OperationContext* txn,
+ Collection* coll,
+ const std::vector<BSONObj>& secondaryIndexSpecs) {
+ invariant(txn);
+ invariant(coll);
+ invariant(txn->getClient() == &cc());
+ _callAbortOnDestructor = true;
+ if (secondaryIndexSpecs.size()) {
+ _hasSecondaryIndexes = true;
+ _secondaryIndexesBlock.ignoreUniqueConstraint();
+ auto status = _secondaryIndexesBlock.init(secondaryIndexSpecs);
+ if (!status.isOK()) {
+ return status;
+ }
+ }
+
+ auto status = _idIndexBlock.init(_idIndexSpec);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return Status::OK();
+}
+
+Status CollectionBulkLoaderImpl::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end) {
+ int count = 0;
+ return _runner->runSynchronousTask([begin, end, &count, this](OperationContext* txn) -> Status {
+ invariant(txn);
+
+ for (auto iter = begin; iter != end; ++iter) {
+ std::vector<MultiIndexBlock*> indexers{&_idIndexBlock};
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork wunit(txn);
+ const auto status = _coll->insertDocument(txn, *iter, indexers, false);
+ if (!status.isOK()) {
+ return status;
+ }
+ wunit.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ _txn, "CollectionBulkLoaderImpl::insertDocuments", _nss.ns());
+
+ ++count;
+ }
+ return Status::OK();
+ });
+}
+
+Status CollectionBulkLoaderImpl::commit() {
+ return _runner->runSynchronousTask(
+ [this](OperationContext* txn) -> Status {
+ invariant(txn->getClient() == &cc());
+ invariant(txn == _txn);
+
+ // Commit before deleting dups, so the dups will be removed from secondary indexes when
+ // deleted.
+ if (_hasSecondaryIndexes) {
+ std::set<RecordId> secDups;
+ auto status = _secondaryIndexesBlock.doneInserting(&secDups);
+ if (!status.isOK()) {
+ return status;
+ }
+ if (secDups.size()) {
+ return Status{ErrorCodes::UserDataInconsistent,
+ "Found duplicates when dups are disabled in MultiIndexBlock."};
+ }
+ WriteUnitOfWork wunit(txn);
+ _secondaryIndexesBlock.commit();
+ wunit.commit();
+ }
+
+ // Delete dups.
+ std::set<RecordId> dups;
+ // doneInserting must not be called inside a WriteUnitOfWork.
+ auto status = _idIndexBlock.doneInserting(&dups);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ for (auto&& it : dups) {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ WriteUnitOfWork wunit(_txn);
+ _coll->deleteDocument(_txn,
+ it,
+ nullptr /** OpDebug **/,
+ false /* fromMigrate */,
+ true /* noWarn */);
+ wunit.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ _txn, "CollectionBulkLoaderImpl::commit", _nss.ns());
+ }
+
+ // Commit _id index, without dups.
+ WriteUnitOfWork wunit(txn);
+ _idIndexBlock.commit();
+ wunit.commit();
+
+ // release locks.
+ _autoColl.reset(nullptr);
+ _autoDB.reset(nullptr);
+ _coll = nullptr;
+ _callAbortOnDestructor = false;
+ return Status::OK();
+ },
+ TaskRunner::NextAction::kDisposeOperationContext);
+}
+
+std::string CollectionBulkLoaderImpl::toString() const {
+ return toBSON().toString();
+}
+
+BSONObj CollectionBulkLoaderImpl::toBSON() const {
+ BSONObjBuilder bob;
+ bob.append("BulkLoader", _nss.toString());
+ // TODO: Add index specs here.
+ return bob.done();
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/collection_bulk_loader_impl.h b/src/mongo/db/repl/collection_bulk_loader_impl.h
new file mode 100644
index 00000000000..1650d98f5fb
--- /dev/null
+++ b/src/mongo/db/repl/collection_bulk_loader_impl.h
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/catalog/index_create.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/collection_bulk_loader.h"
+#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/task_runner.h"
+#include "mongo/util/concurrency/old_thread_pool.h"
+
+namespace mongo {
+namespace repl {
+
+/**
+ * Class in charge of building a collection during data loading (like initial sync).
+ *
+ * Note: Call commit when done inserting documents.
+ */
+class CollectionBulkLoaderImpl : public CollectionBulkLoader {
+ MONGO_DISALLOW_COPYING(CollectionBulkLoaderImpl);
+
+public:
+ CollectionBulkLoaderImpl(OperationContext* txn,
+ TaskRunner* runner,
+ Collection* coll,
+ const BSONObj idIndexSpec,
+ std::unique_ptr<AutoGetOrCreateDb> autoDB,
+ std::unique_ptr<AutoGetCollection> autoColl);
+ virtual ~CollectionBulkLoaderImpl();
+
+ virtual Status init(OperationContext* txn,
+ Collection* coll,
+ const std::vector<BSONObj>& secondaryIndexSpecs) override;
+
+ virtual Status insertDocuments(const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end) override;
+ virtual Status commit() override;
+
+ virtual std::string toString() const override;
+ virtual BSONObj toBSON() const override;
+
+private:
+ TaskRunner* _runner;
+ std::unique_ptr<AutoGetCollection> _autoColl;
+ std::unique_ptr<AutoGetOrCreateDb> _autoDB;
+ OperationContext* _txn = nullptr;
+ Collection* _coll = nullptr;
+ NamespaceString _nss;
+ MultiIndexBlock _idIndexBlock;
+ MultiIndexBlock _secondaryIndexesBlock;
+ bool _hasSecondaryIndexes = false;
+ BSONObj _idIndexSpec;
+ bool _callAbortOnDestructor = false;
+};
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 1cad85f2607..820f4b11c96 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -518,15 +518,14 @@ long long getNewOplogSizeBytes(OperationContext* txn, const ReplSettings& replSe
}
} // namespace
-void createOplog(OperationContext* txn) {
+void createOplog(OperationContext* txn, const std::string& oplogCollectionName, bool isReplSet) {
ScopedTransaction transaction(txn, MODE_X);
Lock::GlobalWrite lk(txn->lockState());
const ReplSettings& replSettings = ReplicationCoordinator::get(txn)->getSettings();
- bool replEnabled = replSettings.usingReplSets();
- OldClientContext ctx(txn, _oplogCollectionName);
- Collection* collection = ctx.db()->getCollection(_oplogCollectionName);
+ OldClientContext ctx(txn, oplogCollectionName);
+ Collection* collection = ctx.db()->getCollection(oplogCollectionName);
if (collection) {
if (replSettings.getOplogSizeBytes() != 0) {
@@ -544,8 +543,8 @@ void createOplog(OperationContext* txn) {
}
}
- if (!replEnabled)
- initTimestampFromOplog(txn, _oplogCollectionName);
+ if (!isReplSet)
+ initTimestampFromOplog(txn, oplogCollectionName);
return;
}
@@ -562,12 +561,12 @@ void createOplog(OperationContext* txn) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
WriteUnitOfWork uow(txn);
- invariant(ctx.db()->createCollection(txn, _oplogCollectionName, options));
- if (!replEnabled)
+ invariant(ctx.db()->createCollection(txn, oplogCollectionName, options));
+ if (!isReplSet)
getGlobalServiceContext()->getOpObserver()->onOpMessage(txn, BSONObj());
uow.commit();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", _oplogCollectionName);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "createCollection", oplogCollectionName);
/* sync here so we don't get any surprising lag later when we try to sync */
StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
@@ -575,6 +574,12 @@ void createOplog(OperationContext* txn) {
log() << "******" << endl;
}
+void createOplog(OperationContext* txn) {
+ const auto isReplSet = ReplicationCoordinator::get(txn)->getReplicationMode() ==
+ ReplicationCoordinator::modeReplSet;
+ createOplog(txn, _oplogCollectionName, isReplSet);
+}
+
// -------------------------------------
namespace {
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 2b0f77164b2..9015af9e238 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -52,10 +52,16 @@ class ReplSettings;
*/
void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp);
+/**
+ * Create a new capped collection for the oplog if it doesn't yet exist.
+ * If the collection already exists (and isReplSet is false),
+ * set the 'last' Timestamp from the last entry of the oplog collection (side effect!)
+ */
+void createOplog(OperationContext* txn, const std::string& oplogCollectionName, bool isReplSet);
+
/*
- * Create a new capped collection for the oplog using createOplog() if it doesn't yet exist.
- * Collection name will be "_oplogCollectionName" initialized in setOplogCollectionName().
- * This will be either local.oplog.rs (replica sets) or local.oplog.$main (master/slave)
+ * Shortcut for the above function using oplogCollectionName = _oplogCollectionName,
+ * and isReplSet derived from the replication coordinator's mode (modeReplSet).
*/
void createOplog(OperationContext* txn);
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 8c50820b31e..696e699b984 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/op_observer.h"
#include "mongo/db/repl/bgsync.h"
+#include "mongo/db/repl/data_replicator.h"
#include "mongo/db/repl/initial_sync.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"
@@ -112,61 +113,6 @@ void truncateAndResetOplog(OperationContext* txn,
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "truncate", collection->ns().ns());
}
-/**
- * Confirms that the "admin" database contains a supported version of the auth
- * data schema. Terminates the process if the "admin" contains clearly incompatible
- * auth data.
- */
-void checkAdminDatabasePostClone(OperationContext* txn, Database* adminDb) {
- // Assumes txn holds MODE_X or MODE_S lock on "admin" database.
- if (!adminDb) {
- return;
- }
- Collection* const usersCollection =
- adminDb->getCollection(AuthorizationManager::usersCollectionNamespace);
- const bool hasUsers =
- usersCollection && !Helpers::findOne(txn, usersCollection, BSONObj(), false).isNull();
- Collection* const adminVersionCollection =
- adminDb->getCollection(AuthorizationManager::versionCollectionNamespace);
- BSONObj authSchemaVersionDocument;
- if (!adminVersionCollection ||
- !Helpers::findOne(txn,
- adminVersionCollection,
- AuthorizationManager::versionDocumentQuery,
- authSchemaVersionDocument)) {
- if (!hasUsers) {
- // It's OK to have no auth version document if there are no user documents.
- return;
- }
- severe() << "During initial sync, found documents in "
- << AuthorizationManager::usersCollectionNamespace
- << " but could not find an auth schema version document in "
- << AuthorizationManager::versionCollectionNamespace;
- severe() << "This indicates that the primary of this replica set was not successfully "
- "upgraded to schema version "
- << AuthorizationManager::schemaVersion26Final
- << ", which is the minimum supported schema version in this version of MongoDB";
- fassertFailedNoTrace(28620);
- }
- long long foundSchemaVersion;
- Status status = bsonExtractIntegerField(authSchemaVersionDocument,
- AuthorizationManager::schemaVersionFieldName,
- &foundSchemaVersion);
- if (!status.isOK()) {
- severe() << "During initial sync, found malformed auth schema version document: " << status
- << "; document: " << authSchemaVersionDocument;
- fassertFailedNoTrace(28618);
- }
- if ((foundSchemaVersion != AuthorizationManager::schemaVersion26Final) &&
- (foundSchemaVersion != AuthorizationManager::schemaVersion28SCRAM)) {
- severe() << "During initial sync, found auth schema version " << foundSchemaVersion
- << ", but this version of MongoDB only supports schema versions "
- << AuthorizationManager::schemaVersion26Final << " and "
- << AuthorizationManager::schemaVersion28SCRAM;
- fassertFailedNoTrace(28619);
- }
-}
-
bool _initialSyncClone(OperationContext* txn,
Cloner& cloner,
const std::string& host,
@@ -202,7 +148,7 @@ bool _initialSyncClone(OperationContext* txn,
}
if (dataPass && (db == "admin")) {
- checkAdminDatabasePostClone(txn, dbHolder().get(txn, db));
+ fassertNoTrace(28619, checkAdminDatabase(txn, dbHolder().get(txn, db)));
}
return true;
}
@@ -512,6 +458,61 @@ const auto kMaxFailedAttempts = 10;
const auto kInitialSyncRetrySleepDuration = Seconds{5};
} // namespace
+Status checkAdminDatabase(OperationContext* txn, Database* adminDb) {
+ // Assumes txn holds MODE_X or MODE_S lock on "admin" database.
+ if (!adminDb) {
+ return Status::OK();
+ }
+ Collection* const usersCollection =
+ adminDb->getCollection(AuthorizationManager::usersCollectionNamespace);
+ const bool hasUsers =
+ usersCollection && !Helpers::findOne(txn, usersCollection, BSONObj(), false).isNull();
+ Collection* const adminVersionCollection =
+ adminDb->getCollection(AuthorizationManager::versionCollectionNamespace);
+ BSONObj authSchemaVersionDocument;
+ if (!adminVersionCollection ||
+ !Helpers::findOne(txn,
+ adminVersionCollection,
+ AuthorizationManager::versionDocumentQuery,
+ authSchemaVersionDocument)) {
+ if (!hasUsers) {
+ // It's OK to have no auth version document if there are no user documents.
+ return Status::OK();
+ }
+ std::string msg = str::stream()
+ << "During initial sync, found documents in "
+ << AuthorizationManager::usersCollectionNamespace.ns()
+ << " but could not find an auth schema version document in "
+ << AuthorizationManager::versionCollectionNamespace.ns() << ". "
+ << "This indicates that the primary of this replica set was not successfully "
+ "upgraded to schema version "
+ << AuthorizationManager::schemaVersion26Final
+ << ", which is the minimum supported schema version in this version of MongoDB";
+ return {ErrorCodes::AuthSchemaIncompatible, msg};
+ }
+ long long foundSchemaVersion;
+ Status status = bsonExtractIntegerField(authSchemaVersionDocument,
+ AuthorizationManager::schemaVersionFieldName,
+ &foundSchemaVersion);
+ if (!status.isOK()) {
+ std::string msg = str::stream()
+ << "During initial sync, found malformed auth schema version document: "
+ << status.toString() << "; document: " << authSchemaVersionDocument;
+ return {ErrorCodes::AuthSchemaIncompatible, msg};
+ }
+ if ((foundSchemaVersion != AuthorizationManager::schemaVersion26Final) &&
+ (foundSchemaVersion != AuthorizationManager::schemaVersion28SCRAM)) {
+ std::string msg = str::stream()
+ << "During initial sync, found auth schema version " << foundSchemaVersion
+ << ", but this version of MongoDB only supports schema versions "
+ << AuthorizationManager::schemaVersion26Final << " and "
+ << AuthorizationManager::schemaVersion28SCRAM;
+ return {ErrorCodes::AuthSchemaIncompatible, msg};
+ }
+
+ return Status::OK();
+}
+
void syncDoInitialSync(BackgroundSync* bgsync) {
stdx::unique_lock<stdx::mutex> lk(_initialSyncMutex, stdx::defer_lock);
if (!lk.try_lock()) {
diff --git a/src/mongo/db/repl/rs_initialsync.h b/src/mongo/db/repl/rs_initialsync.h
index 1f5416453e0..4f8cd5e5017 100644
--- a/src/mongo/db/repl/rs_initialsync.h
+++ b/src/mongo/db/repl/rs_initialsync.h
@@ -28,9 +28,13 @@
#pragma once
+#include "mongo/base/status.h"
+
namespace mongo {
-namespace repl {
+class OperationContext;
+class Database;
+namespace repl {
class BackgroundSync;
/**
@@ -39,5 +43,10 @@ class BackgroundSync;
*/
void syncDoInitialSync(BackgroundSync* bgsync);
+/**
+ * Checks that the "admin" database contains a supported version of the auth data schema.
+ */
+Status checkAdminDatabase(OperationContext* txn, Database* adminDb);
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface.cpp b/src/mongo/db/repl/storage_interface.cpp
index ba885bda35c..557b44d079a 100644
--- a/src/mongo/db/repl/storage_interface.cpp
+++ b/src/mongo/db/repl/storage_interface.cpp
@@ -71,9 +71,9 @@ StorageInterface* StorageInterface::get(OperationContext* txn) {
}
-void StorageInterface::set(ServiceContext* service, std::unique_ptr<StorageInterface> replCoord) {
- auto& coordinator = getStorageInterface(service);
- coordinator = std::move(replCoord);
+void StorageInterface::set(ServiceContext* service, std::unique_ptr<StorageInterface> storage) {
+ auto& storageInterface = getStorageInterface(service);
+ storageInterface = std::move(storage);
}
} // namespace repl
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index d0a3cc723fc..1a45d84021e 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -34,12 +34,15 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/collection_bulk_loader.h"
#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/service_context.h"
namespace mongo {
+class Collection;
+struct CollectionOptions;
class OperationContext;
namespace repl {
@@ -60,9 +63,19 @@ enum class DurableRequirement {
};
/**
- * Storage interface used by used by the ReplicationExecutor inside mongod for supporting
- * ReplicationExectutor's ability to take database locks.
+ * Storage interface used by the replication system to interact with storage.
+ * This interface provides separation of concerns and a place for mocking out test
+ * interactions.
*
+ * The grouping of functionality includes general collection helpers, and more specific replication
+ * concepts:
+ * * Create Collection and Oplog
+ * * Drop database and all user databases
+ * * Drop a collection
+ * * Insert documents into a collection
+ * * Manage minvalid boundaries and initial sync state
+ *
+ * ***** MINVALID *****
* This interface provides helper functions for maintaining a single document in the
* local.replset.minvalid collection.
*
@@ -83,20 +96,40 @@ enum class DurableRequirement {
* begin: {ts:..., t:...} // a batch is currently being applied, and not consistent
* }
*/
-
class StorageInterface {
MONGO_DISALLOW_COPYING(StorageInterface);
public:
+ // Used for testing.
+
+ using CreateCollectionFn = stdx::function<StatusWith<std::unique_ptr<CollectionBulkLoader>>(
+ const NamespaceString& nss,
+ const CollectionOptions& options,
+ const BSONObj idIndexSpec,
+ const std::vector<BSONObj>& secondaryIndexSpecs)>;
+ using InsertDocumentFn = stdx::function<Status(
+ OperationContext* txn, const NamespaceString& nss, const BSONObj& doc)>;
+ using InsertOplogDocumentsFn = stdx::function<StatusWith<OpTime>(
+ OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& ops)>;
+ using DropUserDatabasesFn = stdx::function<Status(OperationContext* txn)>;
+ using CreateOplogFn = stdx::function<Status(OperationContext* txn, const NamespaceString& nss)>;
+ using DropCollectionFn =
+ stdx::function<Status(OperationContext* txn, const NamespaceString& nss)>;
+
+ // Operation Context binding.
static StorageInterface* get(ServiceContext* service);
static StorageInterface* get(ServiceContext& service);
static StorageInterface* get(OperationContext* txn);
-
static void set(ServiceContext* service, std::unique_ptr<StorageInterface> storageInterface);
+ // Constructor and Destructor.
StorageInterface() = default;
virtual ~StorageInterface() = default;
+ virtual void startup() = 0;
+ virtual void shutdown() = 0;
+
+ // MinValid and Initial Sync Flag.
/**
+ * Returns true if initial sync was started but has not yet completed.
*/
@@ -152,6 +185,54 @@ public:
virtual StatusWith<OpTime> writeOpsToOplog(OperationContext* txn,
const NamespaceString& nss,
const MultiApplier::Operations& operations) = 0;
+
+ // Collection creation and population for initial sync.
+ /**
+ * Creates a collection with the provided indexes.
+ *
+ * Assumes that no database locks have been acquired prior to calling this function.
+ */
+ virtual StatusWith<std::unique_ptr<CollectionBulkLoader>> createCollectionForBulkLoading(
+ const NamespaceString& nss,
+ const CollectionOptions& options,
+ const BSONObj idIndexSpec,
+ const std::vector<BSONObj>& secondaryIndexSpecs) = 0;
+
+ /**
+ * Inserts a document into a collection.
+ *
+ * NOTE: If the collection doesn't exist, it will not be created, and instead
+ * an error is returned.
+ */
+ virtual Status insertDocument(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& doc) = 0;
+
+ /**
+ * Inserts the given documents into the oplog, returning the last written OpTime.
+ */
+ virtual StatusWith<OpTime> insertOplogDocuments(OperationContext* txn,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& ops) = 0;
+ /**
+ * Creates the initial oplog, errors if it exists.
+ */
+ virtual Status createOplog(OperationContext* txn, const NamespaceString& nss) = 0;
+
+ /**
+ * Drops a collection, like the oplog.
+ */
+ virtual Status dropCollection(OperationContext* txn, const NamespaceString& nss) = 0;
+
+ /**
+ * Drops all databases except "local".
+ */
+ virtual Status dropReplicatedDatabases(OperationContext* txn) = 0;
+
+ /**
+ * Validates that the admin database is valid during initial sync.
+ */
+ virtual Status isAdminDbValid(OperationContext* txn) = 0;
};
} // namespace repl
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index afe4a709d98..3acbad55326 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -34,7 +34,14 @@
#include <algorithm>
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/db/catalog/collection.h"
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/catalog/document_validation.h"
+#include "mongo/db/catalog/index_create.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
@@ -43,11 +50,18 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/collection_bulk_loader_impl.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/repl/rs_initialsync.h"
+#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/concurrency/old_thread_pool.h"
+#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
-
+#include "mongo/util/mongoutils/str.h"
namespace mongo {
namespace repl {
@@ -56,9 +70,11 @@ const char StorageInterfaceImpl::kInitialSyncFlagFieldName[] = "doingInitialSync
const char StorageInterfaceImpl::kBeginFieldName[] = "begin";
namespace {
+using UniqueLock = stdx::unique_lock<stdx::mutex>;
-const BSONObj kInitialSyncFlag(BSON(StorageInterfaceImpl::kInitialSyncFlagFieldName << true));
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(dataReplicatorInitialSyncInserterThreads, int, 4);
+const BSONObj kInitialSyncFlag(BSON(StorageInterfaceImpl::kInitialSyncFlagFieldName << true));
} // namespace
StorageInterfaceImpl::StorageInterfaceImpl()
@@ -67,6 +83,22 @@ StorageInterfaceImpl::StorageInterfaceImpl()
StorageInterfaceImpl::StorageInterfaceImpl(const NamespaceString& minValidNss)
: _minValidNss(minValidNss) {}
+StorageInterfaceImpl::~StorageInterfaceImpl() {
+ DESTRUCTOR_GUARD(shutdown(););
+}
+
+void StorageInterfaceImpl::startup() {
+ _bulkLoaderThreads.reset(
+ new OldThreadPool{dataReplicatorInitialSyncInserterThreads, "InitialSyncInserters-"});
+};
+
+void StorageInterfaceImpl::shutdown() {
+ if (_bulkLoaderThreads) {
+ _bulkLoaderThreads->join();
+ _bulkLoaderThreads.reset();
+ }
+}
+
NamespaceString StorageInterfaceImpl::getMinValidNss() const {
return _minValidNss;
}
@@ -135,14 +167,25 @@ BatchBoundaries StorageInterfaceImpl::getMinValid(OperationContext* txn) const {
if (found) {
auto status = OpTime::parseFromOplogEntry(mv.getObjectField(kBeginFieldName));
OpTime start(status.isOK() ? status.getValue() : OpTime{});
- OpTime end(fassertStatusOK(40052, OpTime::parseFromOplogEntry(mv)));
+ const auto opTimeStatus = OpTime::parseFromOplogEntry(mv);
+ // If any of the keys (fields) are missing from the minvalid document, we return
+ // empty.
+ if (opTimeStatus == ErrorCodes::NoSuchKey) {
+ return BatchBoundaries{{}, {}};
+ }
+
+ if (!opTimeStatus.isOK()) {
+ error() << "Error parsing minvalid entry: " << mv
+ << ", with status:" << opTimeStatus.getStatus();
+ }
+ OpTime end(fassertStatusOK(40052, opTimeStatus));
LOG(3) << "returning minvalid: " << start.toString() << "(" << start.toBSON() << ") -> "
<< end.toString() << "(" << end.toBSON() << ")";
return BatchBoundaries(start, end);
}
LOG(3) << "returning empty minvalid";
- return BatchBoundaries{OpTime{}, OpTime{}};
+ return BatchBoundaries{{}, {}};
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
txn, "StorageInterfaceImpl::getMinValid", _minValidNss.ns());
@@ -224,5 +267,164 @@ StatusWith<OpTime> StorageInterfaceImpl::writeOpsToOplog(
return operations.back().getOpTime();
}
+StatusWith<std::unique_ptr<CollectionBulkLoader>>
+StorageInterfaceImpl::createCollectionForBulkLoading(
+ const NamespaceString& nss,
+ const CollectionOptions& options,
+ const BSONObj idIndexSpec,
+ const std::vector<BSONObj>& secondaryIndexSpecs) {
+
+ UniqueLock lk(_runnersMutex);
+ // Check to make sure we don't already have a runner.
+ for (auto&& item : _runners) {
+ if (item.first == nss) {
+ return {ErrorCodes::IllegalOperation,
+ str::stream() << "There is already an active collection cloner for: "
+ << nss.ns()};
+ }
+ }
+ // Create the runner, and schedule the collection creation.
+ _runners.emplace_back(
+ std::make_pair(nss, stdx::make_unique<TaskRunner>(_bulkLoaderThreads.get())));
+ auto&& inserter = _runners.back();
+ TaskRunner* runner = inserter.second.get();
+ lk.unlock();
+
+ // Holds the loader constructed inside the synchronous task below; handed back to the caller.
+ std::unique_ptr<CollectionBulkLoader> loaderToReturn;
+
+ auto status = runner->runSynchronousTask([&](OperationContext* txn) -> Status {
+ // We are not replicating nor validating these writes.
+ txn->setReplicatedWrites(false);
+ DisableDocumentValidation validationDisabler(txn);
+
+ // Retry if WCE.
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ // Get locks and create the collection.
+ ScopedTransaction transaction(txn, MODE_IX);
+ auto db = stdx::make_unique<AutoGetOrCreateDb>(txn, nss.db(), MODE_IX);
+ auto coll = stdx::make_unique<AutoGetCollection>(txn, nss, MODE_X);
+ Collection* collection = coll->getCollection();
+
+ if (collection) {
+ return {ErrorCodes::NamespaceExists, "Collection already exists."};
+ }
+
+ // Create the collection.
+ WriteUnitOfWork wunit(txn);
+ collection = db->getDb()->createCollection(txn, nss.ns(), options, false);
+ invariant(collection);
+ wunit.commit();
+ coll = stdx::make_unique<AutoGetCollection>(txn, nss, MODE_IX);
+
+ // Move locks into loader, so it now controls their lifetime.
+ auto loader = stdx::make_unique<CollectionBulkLoaderImpl>(
+ txn, runner, collection, idIndexSpec, std::move(db), std::move(coll));
+ invariant(collection);
+ auto status = loader->init(txn, collection, secondaryIndexSpecs);
+ if (!status.isOK()) {
+ return status;
+ }
+
+ // Move the loader into the StatusWith.
+ loaderToReturn = std::move(loader);
+ return Status::OK();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "beginCollectionClone", nss.ns());
+ MONGO_UNREACHABLE;
+ });
+
+ if (!status.isOK()) {
+ return status;
+ }
+
+ return std::move(loaderToReturn);
+}
+
+
+Status StorageInterfaceImpl::insertDocument(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& doc) {
+ ScopedTransaction transaction(txn, MODE_IX);
+ AutoGetOrCreateDb autoDB(txn, nss.db(), MODE_IX);
+ AutoGetCollection autoColl(txn, nss, MODE_IX);
+ if (!autoColl.getCollection()) {
+ return {ErrorCodes::NamespaceNotFound,
+ "The collection must exist before inserting documents."};
+ }
+ WriteUnitOfWork wunit(txn);
+ const auto status =
+ autoColl.getCollection()->insertDocument(txn, doc, nullptr /** OpDebug **/, false, false);
+ if (status.isOK()) {
+ wunit.commit();
+ }
+ return status;
+}
+
+StatusWith<OpTime> StorageInterfaceImpl::insertOplogDocuments(OperationContext* txn,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& ops) {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IX);
+ AutoGetOrCreateDb autoDB(txn, nss.db(), MODE_IX);
+ AutoGetCollection autoColl(txn, nss, MODE_IX);
+ if (!autoColl.getCollection()) {
+ return {
+ ErrorCodes::NamespaceNotFound,
+ str::stream() << "The oplog collection must exist before inserting documents, ns:"
+ << nss.ns()};
+ }
+ if (!autoColl.getCollection()->isCapped()) {
+ return {ErrorCodes::NamespaceNotFound,
+ str::stream() << "The oplog collection must be capped, ns:" << nss.ns()};
+ }
+
+ WriteUnitOfWork wunit(txn);
+ const auto lastOpTime = repl::writeOpsToOplog(txn, nss.ns(), ops);
+ wunit.commit();
+ return lastOpTime;
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ txn, "StorageInterfaceImpl::insertOplogDocuments", nss.ns());
+}
+
+Status StorageInterfaceImpl::dropReplicatedDatabases(OperationContext* txn) {
+ dropAllDatabasesExceptLocal(txn);
+ return Status::OK();
+}
+
+Status StorageInterfaceImpl::createOplog(OperationContext* txn, const NamespaceString& nss) {
+ mongo::repl::createOplog(txn, nss.ns(), true);
+ return Status::OK();
+}
+
+Status StorageInterfaceImpl::dropCollection(OperationContext* txn, const NamespaceString& nss) {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IX);
+ AutoGetOrCreateDb autoDB(txn, nss.db(), MODE_X);
+ WriteUnitOfWork wunit(txn);
+ const auto status = autoDB.getDb()->dropCollection(txn, nss.ns());
+ if (status.isOK()) {
+ wunit.commit();
+ }
+ return status;
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "StorageInterfaceImpl::dropCollection", nss.ns());
+}
+
+Status StorageInterfaceImpl::isAdminDbValid(OperationContext* txn) {
+ log() << "StorageInterfaceImpl::isAdminDbValid called.";
+ // TODO: plumb through operation context from caller, for now run on ioThread with runner.
+ TaskRunner runner(_bulkLoaderThreads.get());
+ auto status = runner.runSynchronousTask(
+ [](OperationContext* txn) -> Status {
+ ScopedTransaction transaction(txn, MODE_IX);
+ AutoGetDb autoDB(txn, "admin", MODE_X);
+ return checkAdminDatabase(txn, autoDB.getDb());
+ },
+ TaskRunner::NextAction::kDisposeOperationContext);
+ return status;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 9abf6dd5ff7..9f05270ef1f 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -30,8 +30,15 @@
#pragma once
#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/catalog/index_create.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/storage_interface.h"
+#include "mongo/db/repl/task_runner.h"
+#include "mongo/util/concurrency/old_thread_pool.h"
namespace mongo {
namespace repl {
@@ -46,6 +53,10 @@ public:
StorageInterfaceImpl();
explicit StorageInterfaceImpl(const NamespaceString& minValidNss);
+ virtual ~StorageInterfaceImpl();
+
+ void startup() override;
+ void shutdown() override;
/**
* Returns namespace of collection containing the minvalid boundaries and initial sync flag.
@@ -70,8 +81,42 @@ public:
const NamespaceString& nss,
const MultiApplier::Operations& operations) override;
+ /**
+ * Allocates a new TaskRunner for use by the passed in collection.
+ */
+ StatusWith<std::unique_ptr<CollectionBulkLoader>> createCollectionForBulkLoading(
+ const NamespaceString& nss,
+ const CollectionOptions& options,
+ const BSONObj idIndexSpec,
+ const std::vector<BSONObj>& secondaryIndexSpecs) override;
+
+ Status insertDocument(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& doc) override;
+
+ StatusWith<OpTime> insertOplogDocuments(OperationContext* txn,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& ops) override;
+
+ Status dropReplicatedDatabases(OperationContext* txn) override;
+
+ Status createOplog(OperationContext* txn, const NamespaceString& nss) override;
+
+ Status dropCollection(OperationContext* txn, const NamespaceString& nss) override;
+
+ Status isAdminDbValid(OperationContext* txn) override;
+
private:
- NamespaceString _minValidNss;
+ // One thread per collection/TaskRunner
+ std::unique_ptr<OldThreadPool> _bulkLoaderThreads;
+ const NamespaceString _minValidNss;
+
+ // This mutex protects _runners vector.
+ stdx::mutex _runnersMutex;
+
+ // Each runner services a single collection and holds on to the OperationContext (and thread)
+ // until it is done with the collection (CollectionBulkLoaderImpl::commit/abort is called).
+ std::vector<std::pair<const NamespaceString&, std::unique_ptr<TaskRunner>>> _runners;
};
} // namespace repl
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index 752944d4dcc..084bc33cd2d 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/catalog/database.h"
+#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
@@ -40,7 +41,9 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_interface_local.h"
+#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/service_context_d_test_fixture.h"
@@ -115,12 +118,97 @@ OplogEntry makeOplogEntry(OpTime opTime) {
return OplogEntry(bob.obj());
}
+/**
+ * Helper to create default ReplSettings for tests.
+ */
+ReplSettings createReplSettings() {
+ ReplSettings settings;
+ settings.setOplogSizeBytes(5 * 1024 * 1024);
+ settings.setReplSetString("mySet/node1:12345");
+ return settings;
+}
+
+/**
+ * Creates a collection given the supplied options.
+ */
+StatusWith<Collection*> createCollection(OperationContext* txn,
+ NamespaceString& nss,
+ CollectionOptions opts = CollectionOptions()) {
+ ScopedTransaction transaction(txn, MODE_IX);
+ AutoGetOrCreateDb db{txn, nss.db(), MODE_X};
+ WriteUnitOfWork wunit{txn};
+ auto collection = db.getDb()->createCollection(txn, nss.ns(), opts, true);
+ if (collection) {
+ wunit.commit();
+ return collection;
+ }
+ return {ErrorCodes::InternalError, "collection creation failed."};
+}
+
+/**
+ * Counts the number of keys in an index using an IndexAccessMethod::validate call.
+ */
+int64_t getIndexKeyCount(OperationContext* txn, IndexCatalog* cat, IndexDescriptor* desc) {
+ auto idx = cat->getIndex(desc);
+ int64_t numKeys;
+ ValidateResults fullRes;
+ idx->validate(txn, &numKeys, &fullRes);
+ return numKeys;
+}
+
+
class StorageInterfaceImplTest : public ServiceContextMongoDTest {
protected:
- Client* getClient() const;
+ Client* getClient() const {
+ return &cc();
+ }
+
+private:
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+ // Initializes cc() used in ServiceContextMongoD::_newOpCtx().
+ Client::initThreadIfNotAlready("StorageInterfaceImplTest");
+
+ ReplSettings settings;
+ settings.setOplogSizeBytes(5 * 1024 * 1024);
+ settings.setReplSetString("mySet/node1:12345");
+ ReplicationCoordinator::set(getGlobalServiceContext(),
+ stdx::make_unique<ReplicationCoordinatorMock>(settings));
+ }
+};
+
+class StorageInterfaceImplWithReplCoordTest : public ServiceContextMongoDTest {
+protected:
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+ Client::initThreadIfNotAlready();
+ createOptCtx();
+ _coordinator = new ReplicationCoordinatorMock(createReplSettings());
+ setGlobalReplicationCoordinator(_coordinator);
+ }
+ void tearDown() override {
+ _txn.reset(nullptr);
+ ServiceContextMongoDTest::tearDown();
+ }
+
+
+ void createOptCtx() {
+ Client::initThreadIfNotAlready();
+ _txn = cc().makeOperationContext();
+ // We are not replicating nor validating these writes.
+ _txn->setReplicatedWrites(false);
+ DisableDocumentValidation validationDisabler(_txn.get());
+ }
+
+ OperationContext* getOperationContext() {
+ return _txn.get();
+ }
private:
- void setUp() override;
+ ServiceContext::UniqueOperationContext _txn;
+
+ // Owned by service context
+ ReplicationCoordinator* _coordinator;
};
/**
@@ -132,22 +220,6 @@ public:
bool waitUntilDurableCalled = false;
};
-void StorageInterfaceImplTest::setUp() {
- ServiceContextMongoDTest::setUp();
- // Initializes cc() used in ServiceContextMongoD::_newOpCtx().
- Client::initThreadIfNotAlready("StorageInterfaceImplTest");
-
- ReplSettings settings;
- settings.setOplogSizeBytes(5 * 1024 * 1024);
- settings.setReplSetString("mySet/node1:12345");
- ReplicationCoordinator::set(getGlobalServiceContext(),
- stdx::make_unique<ReplicationCoordinatorMock>(settings));
-}
-
-Client* StorageInterfaceImplTest::getClient() const {
- return &cc();
-}
-
bool RecoveryUnitWithDurabilityTracking::waitUntilDurable() {
waitUntilDurableCalled = true;
return RecoveryUnitNoop::waitUntilDurable();
@@ -189,6 +261,25 @@ TEST_F(StorageInterfaceImplTest, InitialSyncFlag) {
ASSERT_FALSE(storageInterface.getInitialSyncFlag(txn.get()));
}
+TEST_F(StorageInterfaceImplTest, GetMinValidAfterSettingInitialSyncFlagWorks) {
+ NamespaceString nss(
+ "local.StorageInterfaceImplTest_GetMinValidAfterSettingInitialSyncFlagWorks");
+
+ StorageInterfaceImpl storageInterface(nss);
+ auto txn = getClient()->makeOperationContext();
+
+ // Initial sync flag should be unset after initializing a new storage engine.
+ ASSERT_FALSE(storageInterface.getInitialSyncFlag(txn.get()));
+
+ // Setting initial sync flag should affect getInitialSyncFlag() result.
+ storageInterface.setInitialSyncFlag(txn.get());
+ ASSERT_TRUE(storageInterface.getInitialSyncFlag(txn.get()));
+
+ auto minValid = storageInterface.getMinValid(txn.get());
+ ASSERT_TRUE(minValid.start.isNull());
+ ASSERT_TRUE(minValid.end.isNull());
+}
+
TEST_F(StorageInterfaceImplTest, MinValid) {
NamespaceString nss("local.StorageInterfaceImplTest_MinValid");
@@ -303,4 +394,121 @@ TEST_F(StorageInterfaceImplTest,
ASSERT_STRING_CONTAINS(status.reason(), "collection not found");
}
+TEST_F(StorageInterfaceImplWithReplCoordTest, InsertMissingDocWorksOnExistingCappedCollection) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ NamespaceString nss("foo.bar");
+ CollectionOptions opts;
+ opts.capped = true;
+ opts.cappedSize = 1024 * 1024;
+ ASSERT_OK(createCollection(txn, nss, opts));
+ ASSERT_OK(storage.insertDocument(txn, nss, BSON("_id" << 1)));
+ AutoGetCollectionForRead autoColl(txn, nss);
+ ASSERT_TRUE(autoColl.getCollection());
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest, InsertMissingDocWorksOnExistingCollection) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ NamespaceString nss("foo.bar");
+ ASSERT_OK(createCollection(txn, nss));
+ ASSERT_OK(storage.insertDocument(txn, nss, BSON("_id" << 1)));
+ AutoGetCollectionForRead autoColl(txn, nss);
+ ASSERT_TRUE(autoColl.getCollection());
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest, InsertMissingDocFailesIfCollectionIsMissing) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ NamespaceString nss("foo.bar");
+ const auto status = storage.insertDocument(txn, nss, BSON("_id" << 1));
+ ASSERT_NOT_OK(status);
+ ASSERT_EQ(status.code(), ErrorCodes::NamespaceNotFound);
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest, CreateCollectionWithIDIndexCommits) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ storage.startup();
+ NamespaceString nss("foo.bar");
+ CollectionOptions opts;
+ std::vector<BSONObj> indexes;
+ auto loaderStatus = storage.createCollectionForBulkLoading(nss, opts, {}, indexes);
+ ASSERT_OK(loaderStatus.getStatus());
+ auto loader = std::move(loaderStatus.getValue());
+ std::vector<BSONObj> docs = {BSON("_id" << 1), BSON("_id" << 1), BSON("_id" << 2)};
+ ASSERT_OK(loader->insertDocuments(docs.begin(), docs.end()));
+ ASSERT_OK(loader->commit());
+
+ AutoGetCollectionForRead autoColl(txn, nss);
+ auto coll = autoColl.getCollection();
+ ASSERT(coll);
+ ASSERT_EQ(coll->getRecordStore()->numRecords(txn), 2LL);
+ auto collIdxCat = coll->getIndexCatalog();
+ auto idIdxDesc = collIdxCat->findIdIndex(txn);
+ auto count = getIndexKeyCount(txn, collIdxCat, idIdxDesc);
+ ASSERT_EQ(count, 2LL);
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest, CreateCollectionThatAlreadyExistsFails) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ storage.startup();
+ NamespaceString nss("test.system.indexes");
+ auto coll = createCollection(txn, nss);
+ ASSERT_OK(coll.getStatus());
+
+ const CollectionOptions opts;
+ const std::vector<BSONObj> indexes;
+ const auto status = storage.createCollectionForBulkLoading(nss, opts, {}, indexes);
+ ASSERT_NOT_OK(status.getStatus());
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest, CreateOplogCreateCappedCollection) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ NamespaceString nss("local.oplog.X");
+ {
+ AutoGetCollectionForRead autoColl(txn, nss);
+ ASSERT_FALSE(autoColl.getCollection());
+ }
+ ASSERT_OK(storage.createOplog(txn, nss));
+ {
+ AutoGetCollectionForRead autoColl(txn, nss);
+ ASSERT_TRUE(autoColl.getCollection());
+ ASSERT_EQ(nss.toString(), autoColl.getCollection()->ns().toString());
+ ASSERT_TRUE(autoColl.getCollection()->isCapped());
+ }
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest, DropCollectionWorksWithExistingWithDataCollection) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ NamespaceString nss("foo.bar");
+ auto coll = createCollection(txn, nss);
+ ASSERT_OK(coll.getStatus());
+ ASSERT_OK(coll.getValue()->insertDocument(
+ txn, BSON("_id" << 1), nullptr /** OpDebug **/, false, true));
+ ASSERT_OK(storage.dropCollection(txn, nss));
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest, DropCollectionWorksWithExistingEmptyCollection) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ NamespaceString nss("foo.bar");
+ ASSERT_OK(createCollection(txn, nss));
+ ASSERT_OK(storage.dropCollection(txn, nss));
+ AutoGetCollectionForRead autoColl(txn, nss);
+ ASSERT_FALSE(autoColl.getCollection());
+}
+
+TEST_F(StorageInterfaceImplWithReplCoordTest, DropCollectionWorksWithMissingCollection) {
+ auto txn = getOperationContext();
+ StorageInterfaceImpl storage;
+ NamespaceString nss("foo.bar");
+ ASSERT_OK(storage.dropCollection(txn, nss));
+ AutoGetCollectionForRead autoColl(txn, nss);
+ ASSERT_FALSE(autoColl.getCollection());
+}
+
} // namespace
diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp
index 36dcb6ce847..57daebdb60c 100644
--- a/src/mongo/db/repl/storage_interface_mock.cpp
+++ b/src/mongo/db/repl/storage_interface_mock.cpp
@@ -25,14 +25,21 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include <numeric>
#include "mongo/platform/basic.h"
#include "mongo/db/repl/storage_interface_mock.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
namespace mongo {
namespace repl {
-
+void StorageInterfaceMock::startup() {}
+void StorageInterfaceMock::shutdown() {}
bool StorageInterfaceMock::getInitialSyncFlag(OperationContext* txn) const {
stdx::lock_guard<stdx::mutex> lock(_initialSyncFlagMutex);
return _initialSyncFlag;
@@ -80,5 +87,31 @@ MultiApplier::Operations StorageInterfaceMock::getOperationsWrittenToOplog() con
return _operationsWrittenToOplog;
}
+Status CollectionBulkLoaderMock::init(OperationContext* txn,
+ Collection* coll,
+ const std::vector<BSONObj>& secondaryIndexSpecs) {
+ LOG(1) << "CollectionBulkLoaderMock::init called";
+ stats->initCalled = true;
+ return initFn(txn, coll, secondaryIndexSpecs);
+};
+
+Status CollectionBulkLoaderMock::insertDocuments(const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end) {
+ LOG(1) << "CollectionBulkLoaderMock::insertDocuments called";
+ const auto status = insertDocsFn(begin, end);
+
+ // Only count if it succeeds.
+ if (status.isOK()) {
+ stats->insertCount += std::distance(begin, end);
+ }
+ return status;
+};
+
+Status CollectionBulkLoaderMock::commit() {
+ LOG(1) << "CollectionBulkLoaderMock::commit called";
+ stats->commitCalled = true;
+ return commitFn();
+};
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 99ca0020e0b..143d5d367cd 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -30,18 +30,68 @@
#pragma once
#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/stdx/mutex.h"
namespace mongo {
namespace repl {
+struct CollectionMockStats {
+ bool initCalled = false;
+ int insertCount = 0;
+ bool commitCalled = false;
+};
+
+class CollectionBulkLoaderMock : public CollectionBulkLoader {
+ MONGO_DISALLOW_COPYING(CollectionBulkLoaderMock);
+
+public:
+ CollectionBulkLoaderMock(CollectionMockStats* collStats) : stats(collStats){};
+ virtual ~CollectionBulkLoaderMock() = default;
+ virtual Status init(OperationContext* txn,
+ Collection* coll,
+ const std::vector<BSONObj>& secondaryIndexSpecs) override;
+
+ virtual Status insertDocuments(const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end) override;
+ virtual Status commit() override;
+
+ std::string toString() const override {
+ return toBSON().toString();
+ };
+ BSONObj toBSON() const override {
+ return BSONObj();
+ };
+
+ CollectionMockStats* stats;
+
+ // Override functions.
+ stdx::function<Status(const std::vector<BSONObj>::const_iterator begin,
+ const std::vector<BSONObj>::const_iterator end)>
+ insertDocsFn = [](const std::vector<BSONObj>::const_iterator,
+ const std::vector<BSONObj>::const_iterator) { return Status::OK(); };
+ stdx::function<Status()> abortFn = []() { return Status::OK(); };
+ stdx::function<Status()> commitFn = []() { return Status::OK(); };
+ stdx::function<Status(
+ OperationContext* txn, Collection* coll, const std::vector<BSONObj>& secondaryIndexSpecs)>
+ initFn = [](OperationContext*, Collection*, const std::vector<BSONObj>&) {
+ return Status::OK();
+ };
+};
+
class StorageInterfaceMock : public StorageInterface {
MONGO_DISALLOW_COPYING(StorageInterfaceMock);
public:
StorageInterfaceMock() = default;
+ void startup() override;
+ void shutdown() override;
+
bool getInitialSyncFlag(OperationContext* txn) const override;
void setInitialSyncFlag(OperationContext* txn) override;
void clearInitialSyncFlag(OperationContext* txn) override;
@@ -52,13 +102,76 @@ public:
const DurableRequirement durReq) override;
void setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) override;
-
StatusWith<OpTime> writeOpsToOplog(OperationContext* txn,
const NamespaceString& nss,
const MultiApplier::Operations& operations) override;
MultiApplier::Operations getOperationsWrittenToOplog() const;
+ StatusWith<std::unique_ptr<CollectionBulkLoader>> createCollectionForBulkLoading(
+ const NamespaceString& nss,
+ const CollectionOptions& options,
+ const BSONObj idIndexSpec,
+ const std::vector<BSONObj>& secondaryIndexSpecs) override {
+ return createCollectionFn(nss, options, idIndexSpec, secondaryIndexSpecs);
+ };
+
+ Status insertDocument(OperationContext* txn,
+ const NamespaceString& nss,
+ const BSONObj& doc) override {
+ return insertDocumentFn(txn, nss, doc);
+ };
+
+ StatusWith<OpTime> insertOplogDocuments(OperationContext* txn,
+ const NamespaceString& nss,
+ const std::vector<BSONObj>& ops) override {
+ return insertOplogDocumentsFn(txn, nss, ops);
+ }
+
+ Status dropReplicatedDatabases(OperationContext* txn) override {
+ return dropUserDBsFn(txn);
+ };
+
+ Status createOplog(OperationContext* txn, const NamespaceString& nss) override {
+ return createOplogFn(txn, nss);
+ };
+
+ Status dropCollection(OperationContext* txn, const NamespaceString& nss) override {
+ return dropCollFn(txn, nss);
+ };
+
+ Status isAdminDbValid(OperationContext* txn) override {
+ return Status::OK();
+ };
+
+
+ // Testing functions.
+ CreateCollectionFn createCollectionFn = [](const NamespaceString& nss,
+ const CollectionOptions& options,
+ const BSONObj idIndexSpec,
+ const std::vector<BSONObj>& secondaryIndexSpecs)
+ -> StatusWith<std::unique_ptr<CollectionBulkLoader>> {
+ return Status{ErrorCodes::IllegalOperation, "CreateCollectionFn not implemented."};
+ };
+ InsertDocumentFn insertDocumentFn =
+ [](OperationContext* txn, const NamespaceString& nss, const BSONObj& doc) {
+ return Status{ErrorCodes::IllegalOperation, "InsertDocumentFn not implemented."};
+ };
+ InsertOplogDocumentsFn insertOplogDocumentsFn =
+ [](OperationContext* txn, const NamespaceString& nss, const std::vector<BSONObj>& ops) {
+ return StatusWith<OpTime>(
+ Status{ErrorCodes::IllegalOperation, "InsertOplogDocumentsFn not implemented."});
+ };
+ DropUserDatabasesFn dropUserDBsFn = [](OperationContext* txn) {
+ return Status{ErrorCodes::IllegalOperation, "DropUserDatabasesFn not implemented."};
+ };
+ CreateOplogFn createOplogFn = [](OperationContext* txn, const NamespaceString& nss) {
+ return Status{ErrorCodes::IllegalOperation, "CreateOplogFn not implemented."};
+ };
+ DropCollectionFn dropCollFn = [](OperationContext* txn, const NamespaceString& nss) {
+ return Status{ErrorCodes::IllegalOperation, "DropCollectionFn not implemented."};
+ };
+
private:
mutable stdx::mutex _initialSyncFlagMutex;
bool _initialSyncFlag = false;
diff --git a/src/mongo/db/repl/task_runner.cpp b/src/mongo/db/repl/task_runner.cpp
index 9665a17972e..a5bd1a5e852 100644
--- a/src/mongo/db/repl/task_runner.cpp
+++ b/src/mongo/db/repl/task_runner.cpp
@@ -51,6 +51,8 @@ namespace repl {
namespace {
using UniqueLock = stdx::unique_lock<stdx::mutex>;
+using LockGuard = stdx::lock_guard<stdx::mutex>;
+
/**
* Runs a single task runner task.
@@ -209,5 +211,43 @@ TaskRunner::Task TaskRunner::_waitForNextTask() {
return task;
}
+Status TaskRunner::runSynchronousTask(SynchronousTask func, TaskRunner::NextAction nextAction) {
+ // Set up a condition variable to signal when the scheduled task has completed.
+ bool done = false;
+ stdx::mutex mutex;
+ stdx::condition_variable waitTillDoneCond;
+
+ Status returnStatus{Status::OK()};
+ this->schedule([&](OperationContext* txn, const Status taskStatus) {
+ if (!taskStatus.isOK()) {
+ returnStatus = taskStatus;
+ } else {
+ // Run supplied function.
+ try {
+ log() << "starting to run synchronous task on runner.";
+ returnStatus = func(txn);
+ log() << "done running the synchronous task.";
+ } catch (...) {
+ returnStatus = exceptionToStatus();
+ error() << "Exception thrown in runSynchronousTask: " << returnStatus;
+ }
+ }
+
+ // Signal done.
+ LockGuard lk2{mutex};
+ done = true;
+ waitTillDoneCond.notify_all();
+
+ // return nextAction based on status from supplied function.
+ if (returnStatus.isOK()) {
+ return nextAction;
+ }
+ return TaskRunner::NextAction::kCancel;
+ });
+
+ UniqueLock lk{mutex};
+ waitTillDoneCond.wait(lk, [&done] { return done; });
+ return returnStatus;
+}
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/task_runner.h b/src/mongo/db/repl/task_runner.h
index 80fbd12d337..2b201757ff1 100644
--- a/src/mongo/db/repl/task_runner.h
+++ b/src/mongo/db/repl/task_runner.h
@@ -59,6 +59,16 @@ public:
};
using Task = stdx::function<NextAction(OperationContext*, const Status&)>;
+ using SynchronousTask = stdx::function<Status(OperationContext* txn)>;
+
+ /**
+ * Returns the Status from the supplied function after running it.
+ *
+ * Note: TaskRunner::NextAction controls when the operation context and thread will be released.
+ */
+ Status runSynchronousTask(
+ SynchronousTask func,
+ TaskRunner::NextAction nextAction = TaskRunner::NextAction::kKeepOperationContext);
/**
* Creates a Task returning kCancel. This is useful in shutting down the task runner after