/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/database_cloner.h"
#include
#include
#include
#include "mongo/db/catalog/collection_options.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/destructor_guard.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
namespace repl {
namespace {
const char* kNameFieldName = "name";
const char* kOptionsFieldName = "options";
/**
* Default listCollections predicate.
*/
bool acceptAllPred(const BSONObj&) {
return true;
}
/**
* Creates a listCollections command obj with an optional filter.
*/
BSONObj createListCollectionsCommandObject(const BSONObj& filter) {
BSONObjBuilder output;
output.append("listCollections", 1);
if (!filter.isEmpty()) {
output.append("filter", filter);
}
return output.obj();
}
} // namespace
DatabaseCloner::DatabaseCloner(ReplicationExecutor* executor,
const HostAndPort& source,
const std::string& dbname,
const BSONObj& listCollectionsFilter,
const ListCollectionsPredicateFn& listCollectionsPred,
CollectionCloner::StorageInterface* si,
const CollectionCallbackFn& collWork,
const CallbackFn& onCompletion)
: _executor(executor),
_source(source),
_dbname(dbname),
_listCollectionsFilter(listCollectionsFilter),
_listCollectionsPredicate(listCollectionsPred ? listCollectionsPred : acceptAllPred),
_storageInterface(si),
_collectionWork(collWork),
_onCompletion(onCompletion),
_active(false),
_listCollectionsFetcher(_executor,
_source,
_dbname,
createListCollectionsCommandObject(_listCollectionsFilter),
stdx::bind(&DatabaseCloner::_listCollectionsCallback,
this,
stdx::placeholders::_1,
stdx::placeholders::_2,
stdx::placeholders::_3)),
_scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) {
return _executor->scheduleDBWork(work);
}),
_startCollectionCloner([](CollectionCloner& cloner) { return cloner.start(); }) {
uassert(ErrorCodes::BadValue, "null replication executor", executor);
uassert(ErrorCodes::BadValue, "empty database name", !dbname.empty());
uassert(ErrorCodes::BadValue, "storage interface cannot be null", si);
uassert(ErrorCodes::BadValue, "collection callback function cannot be null", collWork);
uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
}
DatabaseCloner::~DatabaseCloner() {
DESTRUCTOR_GUARD(cancel(); wait(););
}
const std::vector& DatabaseCloner::getCollectionInfos() const {
stdx::lock_guard lk(_mutex);
return _collectionInfos;
}
std::string DatabaseCloner::getDiagnosticString() const {
stdx::lock_guard lk(_mutex);
str::stream output;
output << "DatabaseCloner";
output << " executor: " << _executor->getDiagnosticString();
output << " source: " << _source.toString();
output << " database: " << _dbname;
output << " listCollections filter" << _listCollectionsFilter;
output << " active: " << _active;
output << " collection info objects (empty if listCollections is in progress): "
<< _collectionInfos.size();
return output;
}
bool DatabaseCloner::isActive() const {
stdx::lock_guard lk(_mutex);
return _active;
}
Status DatabaseCloner::start() {
stdx::lock_guard lk(_mutex);
if (_active) {
return Status(ErrorCodes::IllegalOperation, "database cloner already started");
}
Status scheduleResult = _listCollectionsFetcher.schedule();
if (!scheduleResult.isOK()) {
return scheduleResult;
}
_active = true;
return Status::OK();
}
void DatabaseCloner::cancel() {
{
stdx::lock_guard lk(_mutex);
if (!_active) {
return;
}
}
_listCollectionsFetcher.cancel();
}
void DatabaseCloner::wait() {
stdx::unique_lock lk(_mutex);
_condition.wait(lk, [this]() { return !_active; });
}
void DatabaseCloner::setScheduleDbWorkFn(const CollectionCloner::ScheduleDbWorkFn& work) {
stdx::lock_guard lk(_mutex);
_scheduleDbWorkFn = work;
}
void DatabaseCloner::setStartCollectionClonerFn(
const StartCollectionClonerFn& startCollectionCloner) {
_startCollectionCloner = startCollectionCloner;
}
void DatabaseCloner::_listCollectionsCallback(const StatusWith& result,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
if (!result.isOK()) {
_finishCallback(result.getStatus());
return;
}
auto batchData(result.getValue());
auto&& documents = batchData.documents;
// We may be called with multiple batches leading to a need to grow _collectionInfos.
_collectionInfos.reserve(_collectionInfos.size() + documents.size());
std::copy_if(documents.begin(),
documents.end(),
std::back_inserter(_collectionInfos),
_listCollectionsPredicate);
// The fetcher will continue to call with kGetMore until an error or the last batch.
if (*nextAction == Fetcher::NextAction::kGetMore) {
invariant(getMoreBob);
getMoreBob->append("getMore", batchData.cursorId);
getMoreBob->append("collection", batchData.nss.coll());
return;
}
// Nothing to do for an empty database.
if (_collectionInfos.empty()) {
_finishCallback(Status::OK());
return;
}
_collectionNamespaces.reserve(_collectionInfos.size());
std::set seen;
for (auto&& info : _collectionInfos) {
BSONElement nameElement = info.getField(kNameFieldName);
if (nameElement.eoo()) {
_finishCallback(
Status(ErrorCodes::FailedToParse,
str::stream() << "collection info must contain '" << kNameFieldName << "' "
<< "field : "
<< info));
return;
}
if (nameElement.type() != mongo::String) {
_finishCallback(Status(
ErrorCodes::TypeMismatch,
str::stream() << "'" << kNameFieldName << "' field must be a string: " << info));
return;
}
const std::string collectionName = nameElement.String();
if (seen.find(collectionName) != seen.end()) {
_finishCallback(Status(ErrorCodes::DuplicateKey,
str::stream()
<< "collection info contains duplicate collection name "
<< "'"
<< collectionName
<< "': "
<< info));
return;
}
BSONElement optionsElement = info.getField(kOptionsFieldName);
if (optionsElement.eoo()) {
_finishCallback(Status(
ErrorCodes::FailedToParse,
str::stream() << "collection info must contain '" << kOptionsFieldName << "' "
<< "field : "
<< info));
return;
}
if (!optionsElement.isABSONObj()) {
_finishCallback(Status(ErrorCodes::TypeMismatch,
str::stream() << "'" << kOptionsFieldName
<< "' field must be an object: "
<< info));
return;
}
const BSONObj optionsObj = optionsElement.Obj();
CollectionOptions options;
Status parseStatus = options.parse(optionsObj);
if (!parseStatus.isOK()) {
_finishCallback(parseStatus);
return;
}
seen.insert(collectionName);
_collectionNamespaces.emplace_back(_dbname, collectionName);
auto&& nss = *_collectionNamespaces.crbegin();
try {
_collectionCloners.emplace_back(
_executor,
_source,
nss,
options,
stdx::bind(
&DatabaseCloner::_collectionClonerCallback, this, stdx::placeholders::_1, nss),
_storageInterface);
} catch (const UserException& ex) {
_finishCallback(ex.toStatus());
return;
}
}
for (auto&& collectionCloner : _collectionCloners) {
collectionCloner.setScheduleDbWorkFn(_scheduleDbWorkFn);
}
// Start first collection cloner.
_currentCollectionClonerIter = _collectionCloners.begin();
LOG(1) << " cloning collection " << _currentCollectionClonerIter->getSourceNamespace();
Status startStatus = _startCollectionCloner(*_currentCollectionClonerIter);
if (!startStatus.isOK()) {
LOG(1) << " failed to start collection cloning on "
<< _currentCollectionClonerIter->getSourceNamespace() << ": " << startStatus;
_finishCallback(startStatus);
return;
}
}
void DatabaseCloner::_collectionClonerCallback(const Status& status, const NamespaceString& nss) {
// Forward collection cloner result to caller.
// Failure to clone a collection does not stop the database cloner
// from cloning the rest of the collections in the listCollections result.
_collectionWork(status, nss);
_currentCollectionClonerIter++;
LOG(1) << " cloning collection " << _currentCollectionClonerIter->getSourceNamespace();
if (_currentCollectionClonerIter != _collectionCloners.end()) {
Status startStatus = _startCollectionCloner(*_currentCollectionClonerIter);
if (!startStatus.isOK()) {
LOG(1) << " failed to start collection cloning on "
<< _currentCollectionClonerIter->getSourceNamespace() << ": " << startStatus;
_finishCallback(startStatus);
return;
}
return;
}
_finishCallback(Status::OK());
}
void DatabaseCloner::_finishCallback(const Status& status) {
_onCompletion(status);
stdx::lock_guard lk(_mutex);
_active = false;
_condition.notify_all();
}
} // namespace repl
} // namespace mongo