/**
* Copyright (C) 2015 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
namespace repl {
// Constructs a cloner that copies a single collection ('sourceNss') from the remote
// host 'source' using 'executor' for all asynchronous work and 'storageInterface'
// for local writes. 'onCompletion' is invoked exactly once with the final status.
// Throws (via uassert/uassertStatusOK) on null executor/callback/storage interface,
// an invalid source namespace, or invalid collection options.
// NOTE: both fetchers are wired up here but nothing is scheduled until start().
CollectionCloner::CollectionCloner(ReplicationExecutor* executor,
const HostAndPort& source,
const NamespaceString& sourceNss,
const CollectionOptions& options,
const CallbackFn& onCompletion,
StorageInterface* storageInterface)
: _executor(executor),
_source(source),
_sourceNss(sourceNss),
// Destination namespace mirrors the source namespace.
_destNss(_sourceNss),
_options(options),
_onCompletion(onCompletion),
_storageInterface(storageInterface),
_active(false),
// First stage: fetch the index specs so the destination collection can be
// created with its indexes before any documents are copied.
_listIndexesFetcher(_executor,
_source,
_sourceNss.db().toString(),
BSON("listIndexes" << _sourceNss.coll()),
stdx::bind(&CollectionCloner::_listIndexesCallback,
this,
stdx::placeholders::_1,
stdx::placeholders::_2,
stdx::placeholders::_3)),
// Second stage: stream the collection documents. noCursorTimeout keeps the
// remote cursor alive across slow batches (see SERVER-1387).
_findFetcher(_executor,
_source,
_sourceNss.db().toString(),
BSON("find" << _sourceNss.coll() <<
"noCursorTimeout" << true), // SERVER-1387
stdx::bind(&CollectionCloner::_findCallback,
this,
stdx::placeholders::_1,
stdx::placeholders::_2,
stdx::placeholders::_3)),
_indexSpecs(),
_documents(),
_dbWorkCallbackHandle(),
// Default scheduler runs database work on the executor; tests may override
// via setScheduleDbWorkFn().
_scheduleDbWorkFn([this](const ReplicationExecutor::CallbackFn& work) {
return _executor->scheduleDBWork(work);
}) {
// Validate arguments after the member initializers; the fetchers above only
// store state, so no work happens before these checks.
uassert(ErrorCodes::BadValue, "null replication executor", executor);
uassert(ErrorCodes::BadValue, "invalid collection namespace: " + sourceNss.ns(),
sourceNss.isValid());
uassertStatusOK(options.validate());
uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
uassert(ErrorCodes::BadValue, "null storage interface", storageInterface);
}
// Cancels any outstanding work and blocks until the cloner is inactive.
// DESTRUCTOR_GUARD swallows exceptions so the destructor never throws.
CollectionCloner::~CollectionCloner() {
DESTRUCTOR_GUARD(
cancel();
wait();
);
}
// Returns the namespace being cloned from. The reference remains valid for the
// lifetime of this cloner; _sourceNss is immutable after construction, so no
// locking is needed here.
const NamespaceString& CollectionCloner::getSourceNamespace() const {
return _sourceNss;
}
// Returns a human-readable snapshot of the cloner's state for logging/debugging.
// Takes _mutex so the snapshot of _active and the callback handle is consistent.
std::string CollectionCloner::getDiagnosticString() const {
    // Fix: the mutex template argument had been stripped; lock_guard requires an
    // explicit mutex type before C++17 class template argument deduction.
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    str::stream output;
    output << "CollectionCloner";
    output << " executor: " << _executor->getDiagnosticString();
    output << " source: " << _source.toString();
    output << " source namespace: " << _sourceNss.toString();
    output << " destination namespace: " << _destNss.toString();
    output << " collection options: " << _options.toBSON();
    output << " active: " << _active;
    output << " listIndexes fetcher: " << _listIndexesFetcher.getDiagnosticString();
    output << " find fetcher: " << _findFetcher.getDiagnosticString();
    output << " database worked callback handle: "
           << (_dbWorkCallbackHandle.isValid() ? "valid" : "invalid");
    // str::stream converts implicitly to std::string.
    return output;
}
bool CollectionCloner::isActive() const {
stdx::lock_guard lk(_mutex);
return _active;
}
// Begins the cloning process by scheduling the listIndexes fetcher.
// Returns IllegalOperation if already active, or the scheduling error.
// On success the cloner is marked active; completion is reported through the
// onCompletion callback supplied at construction.
Status CollectionCloner::start() {
    // Fix: restored the stripped mutex template argument (no CTAD pre-C++17).
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    if (_active) {
        return Status(ErrorCodes::IllegalOperation, "collection cloner already started");
    }
    Status scheduleResult = _listIndexesFetcher.schedule();
    if (!scheduleResult.isOK()) {
        return scheduleResult;
    }
    // Only flip _active once scheduling succeeded, so a failed start leaves the
    // cloner restartable.
    _active = true;
    return Status::OK();
}
void CollectionCloner::cancel() {
ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
{
stdx::lock_guard lk(_mutex);
if (!_active) {
return;
}
dbWorkCallbackHandle = _dbWorkCallbackHandle;
}
_listIndexesFetcher.cancel();
_findFetcher.cancel();
if (dbWorkCallbackHandle.isValid()) {
_executor->cancel(dbWorkCallbackHandle);
}
}
void CollectionCloner::wait() {
stdx::unique_lock lk(_mutex);
_condition.wait(lk, [this]() { return !_active; });
}
void CollectionCloner::waitForDbWorker() {
ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
{
stdx::lock_guard lk(_mutex);
if (!_active) {
return;
}
dbWorkCallbackHandle = _dbWorkCallbackHandle;
}
if (dbWorkCallbackHandle.isValid()) {
_executor->wait(dbWorkCallbackHandle);
}
}
void CollectionCloner::setScheduleDbWorkFn(const ScheduleDbWorkFn& scheduleDbWorkFn) {
stdx::lock_guard lk(_mutex);
_scheduleDbWorkFn = scheduleDbWorkFn;
}
// Fetcher callback for the listIndexes command. May be invoked multiple times,
// once per cursor batch; index specs are accumulated into _indexSpecs. After the
// final batch, schedules _beginCollectionCallback on a database worker thread to
// create the destination collection. Any error terminates the clone via
// _finishCallback (with a null OperationContext, since none is available here).
void CollectionCloner::_listIndexesCallback(const Fetcher::QueryResponseStatus& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
if (!fetchResult.isOK()) {
_finishCallback(nullptr, fetchResult.getStatus());
return;
}
auto batchData(fetchResult.getValue());
auto&& documents = batchData.documents;
// An empty result is unusual but not fatal; proceed with no indexes.
if (documents.empty()) {
warning() << "No indexes found for collection " << _sourceNss.ns()
<< " while cloning from " << _source;
}
// We may be called with multiple batches leading to a need to grow _indexSpecs.
_indexSpecs.reserve(_indexSpecs.size() + documents.size());
_indexSpecs.insert(_indexSpecs.end(), documents.begin(), documents.end());
// The fetcher will continue to call with kGetMore until an error or the last batch.
if (*nextAction == Fetcher::NextAction::kGetMore) {
invariant(getMoreBob);
// Fill in the getMore request the fetcher will send for the next batch.
getMoreBob->append("getMore", batchData.cursorId);
getMoreBob->append("collection", batchData.nss.coll());
return;
}
// We have all of the indexes now, so we can start cloning the collection data.
auto&& scheduleResult = _scheduleDbWorkFn(
stdx::bind(&CollectionCloner::_beginCollectionCallback, this, stdx::placeholders::_1));
if (!scheduleResult.isOK()) {
_finishCallback(nullptr, scheduleResult.getStatus());
return;
}
// Save the handle so cancel()/waitForDbWorker() can target the scheduled work.
_dbWorkCallbackHandle = scheduleResult.getValue();
}
void CollectionCloner::_findCallback(const StatusWith& fetchResult,
Fetcher::NextAction* nextAction,
BSONObjBuilder* getMoreBob) {
if (!fetchResult.isOK()) {
_finishCallback(nullptr, fetchResult.getStatus());
return;
}
auto batchData(fetchResult.getValue());
_documents = batchData.documents;
bool lastBatch = *nextAction == Fetcher::NextAction::kNoAction;
auto&& scheduleResult = _scheduleDbWorkFn(stdx::bind(
&CollectionCloner::_insertDocumentsCallback, this, stdx::placeholders::_1, lastBatch));
if (!scheduleResult.isOK()) {
_finishCallback(nullptr, scheduleResult.getStatus());
return;
}
if (*nextAction == Fetcher::NextAction::kGetMore) {
invariant(getMoreBob);
getMoreBob->append("getMore", batchData.cursorId);
getMoreBob->append("collection", batchData.nss.coll());
}
_dbWorkCallbackHandle = scheduleResult.getValue();
}
// Runs on a database worker thread once all index specs have been fetched:
// creates the destination collection (with its indexes) via the storage
// interface, then schedules the find fetcher that streams the documents.
// Any failure ends the clone through _finishCallback.
void CollectionCloner::_beginCollectionCallback(const ReplicationExecutor::CallbackArgs& cbd) {
    OperationContext* const txn = cbd.txn;

    if (!cbd.status.isOK()) {
        // The callback was cancelled or the executor is shutting down.
        _finishCallback(txn, cbd.status);
        return;
    }

    const Status createStatus =
        _storageInterface->beginCollection(txn, _destNss, _options, _indexSpecs);
    if (!createStatus.isOK()) {
        _finishCallback(txn, createStatus);
        return;
    }

    const Status fetchStatus = _findFetcher.schedule();
    if (!fetchStatus.isOK()) {
        _finishCallback(txn, fetchStatus);
    }
}
// Runs on a database worker thread for each fetched batch: inserts the pending
// documents (_documents) into the destination collection. When 'lastBatch' is
// true, completes the clone with an OK status; otherwise waits for the next
// batch scheduled by _findCallback. Failures end the clone via _finishCallback.
void CollectionCloner::_insertDocumentsCallback(const ReplicationExecutor::CallbackArgs& cbd,
                                                bool lastBatch) {
    OperationContext* const txn = cbd.txn;

    if (!cbd.status.isOK()) {
        // The callback was cancelled or the executor is shutting down.
        _finishCallback(txn, cbd.status);
        return;
    }

    const Status insertStatus = _storageInterface->insertDocuments(txn, _destNss, _documents);
    if (!insertStatus.isOK()) {
        _finishCallback(txn, insertStatus);
        return;
    }

    if (lastBatch) {
        // All batches written; commit and report success.
        _finishCallback(txn, Status::OK());
    }
}
// Terminal step of the clone, on success, failure, or cancellation. On success,
// attempts to commit the destination collection (logging, but not propagating,
// a commit failure). Invokes the user's completion callback with 'status', then
// marks the cloner inactive and wakes any wait() callers.
void CollectionCloner::_finishCallback(OperationContext* txn, const Status& status) {
    if (status.isOK()) {
        auto commitStatus = _storageInterface->commitCollection(txn, _destNss);
        if (!commitStatus.isOK()) {
            warning() << "Failed to commit changes to collection " << _destNss.ns()
                      << ": " << commitStatus;
        }
    }
    // Run the completion callback before taking _mutex so it may safely call
    // back into this object.
    _onCompletion(status);
    // Fix: restored the stripped mutex template argument (no CTAD pre-C++17).
    stdx::lock_guard<stdx::mutex> lk(_mutex);
    _active = false;
    _condition.notify_all();
}
} // namespace repl
} // namespace mongo