summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2015-05-27 22:00:05 -0400
committerBenety Goh <benety@mongodb.com>2015-06-02 09:50:22 -0400
commite02e22a862b40183c54918f264b538a31f39eef6 (patch)
tree3ee7bd28760196b3cf4b447511f18c6c559608bf /src
parentf0fa4307ef62af978454951900c7f4ae3a2cd8f6 (diff)
downloadmongo-e02e22a862b40183c54918f264b538a31f39eef6.tar.gz
SERVER-18035 cleaned up syncRollback. added unit test rs_rollback_test.
Diffstat (limited to 'src')
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/repl/SConscript91
-rw-r--r--src/mongo/db/repl/bgsync.cpp33
-rw-r--r--src/mongo/db/repl/oplog_interface.h79
-rw-r--r--src/mongo/db/repl/oplog_interface_local.cpp103
-rw-r--r--src/mongo/db/repl/oplog_interface_local.h58
-rw-r--r--src/mongo/db/repl/oplog_interface_mock.cpp83
-rw-r--r--src/mongo/db/repl/oplog_interface_mock.h54
-rw-r--r--src/mongo/db/repl/oplog_interface_remote.cpp89
-rw-r--r--src/mongo/db/repl/oplog_interface_remote.h58
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp6
-rw-r--r--src/mongo/db/repl/roll_back_local_operations.cpp197
-rw-r--r--src/mongo/db/repl/roll_back_local_operations.h95
-rw-r--r--src/mongo/db/repl/roll_back_local_operations_test.cpp432
-rw-r--r--src/mongo/db/repl/rollback_source.h90
-rw-r--r--src/mongo/db/repl/rollback_source_impl.cpp98
-rw-r--r--src/mongo/db/repl/rollback_source_impl.h74
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp345
-rw-r--r--src/mongo/db/repl/rs_rollback.h45
-rw-r--r--src/mongo/db/repl/rs_rollback_test.cpp541
21 files changed, 2343 insertions, 231 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index f7431039681..c8f5c40e513 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -126,6 +126,7 @@ error_code("NotAReplicaSet", 123)
error_code("IncompatibleElectionProtocol", 124)
error_code("CommandFailed", 125)
error_code("RPCProtocolNegotiationFailed", 126)
+error_code("UnrecoverableRollbackError", 127)
# Non-sequential error codes (for compatibility only)
error_code("NotMaster", 10107) #this comes from assert_util.h
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index f97420e5f44..a8a0567225e 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -597,7 +597,6 @@ serverOnlyFiles = [
"repl/replication_info.cpp",
"repl/resync.cpp",
"repl/rs_initialsync.cpp",
- "repl/rs_rollback.cpp",
"repl/rs_sync.cpp",
"repl/sync_source_feedback.cpp",
"service_context_d.cpp",
@@ -658,6 +657,7 @@ serveronlyLibdeps = [
"repl/replication_executor",
"repl/replication_executor",
"repl/replset_commands",
+ "repl/rs_rollback",
"repl/rslog",
"repl/sync_tail",
"repl/topology_coordinator_impl",
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index d7db78aae5c..8cf92b22702 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -78,6 +78,76 @@ env.Library(
)
env.Library(
+ target='oplog_interface_local',
+ source=[
+ 'oplog_interface_local.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/db/query/internal_plans',
+ '$BUILD_DIR/mongo/db/query/query',
+ ],
+)
+
+env.Library(
+ target='oplog_interface_mock',
+ source=[
+ 'oplog_interface_mock.cpp',
+ ],
+ LIBDEPS=[
+ ],
+)
+
+env.Library(
+ target='oplog_interface_remote',
+ source=[
+ 'oplog_interface_remote.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/client/clientdriver',
+ ],
+)
+
+env.Library(
+ target='rs_rollback',
+ source=[
+ 'rs_rollback.cpp',
+ ],
+ LIBDEPS=[
+ 'oplog_interface_local',
+ 'oplog_interface_remote',
+ 'roll_back_local_operations',
+ 'rollback_source_impl',
+ ],
+)
+
+env.CppUnitTest(
+ target='rs_rollback_test',
+ source=[
+ 'rs_rollback_test.cpp',
+ ],
+ LIBDEPS=[
+ 'oplog_interface_mock',
+ 'replmocks',
+ 'rs_rollback',
+ '$BUILD_DIR/mongo/db/coredb',
+ '$BUILD_DIR/mongo/db/serveronly',
+ '$BUILD_DIR/mongo/util/ntservice_mock',
+ ],
+ NO_CRUTCH = True,
+)
+
+env.Library(
+ target='rollback_source_impl',
+ source=[
+ 'rollback_source_impl.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/bson/bson',
+ '$BUILD_DIR/mongo/client/clientdriver',
+ ],
+)
+
+env.Library(
target='sync_tail',
source=[
'sync_tail.cpp',
@@ -545,3 +615,24 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/unittest/concurrency',
],
)
+
+env.Library(
+ target='roll_back_local_operations',
+ source=[
+ 'roll_back_local_operations.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/bson/bson',
+ '$BUILD_DIR/mongo/logger/logger',
+ '$BUILD_DIR/mongo/util/foundation',
+ ],
+)
+
+env.CppUnitTest(
+ target='roll_back_local_operations_test',
+ source='roll_back_local_operations_test.cpp',
+ LIBDEPS=[
+ 'oplog_interface_mock',
+ 'roll_back_local_operations',
+ ],
+)
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 164cb3c99c7..29ad6d5704f 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -41,8 +41,11 @@
#include "mongo/db/dbhelpers.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_interface_local.h"
+#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
+#include "mongo/db/repl/rollback_source_impl.h"
#include "mongo/db/repl/rs_rollback.h"
#include "mongo/db/repl/rs_sync.h"
#include "mongo/db/stats/timer_stats.h"
@@ -394,6 +397,19 @@ namespace {
bool BackgroundSync::_rollbackIfNeeded(OperationContext* txn, OplogReader& r) {
string hn = r.conn()->getServerAddress();
+ // Abort only when syncRollback detects we are in a unrecoverable state.
+ // In other cases, we log the message contained in the error status and retry later.
+ auto fassertRollbackStatusNoTrace = [](int msgid, const Status& status) {
+ if (status.isOK()) {
+ return;
+ }
+ if (ErrorCodes::UnrecoverableRollbackError == status.code()) {
+ fassertNoTrace(msgid, status);
+ }
+ warning() << "rollback cannot proceed at this time (retrying later): "
+ << status;
+ };
+
if (!r.more()) {
try {
BSONObj theirLastOp = r.getLastOp(rsOplogName.c_str());
@@ -405,7 +421,14 @@ namespace {
OpTime theirOpTime = extractOpTime(theirLastOp);
if (theirOpTime < _lastOpTimeFetched) {
log() << "we are ahead of the sync source, will try to roll back";
- syncRollback(txn, _replCoord->getMyLastOptime(), &r, _replCoord);
+ fassertRollbackStatusNoTrace(
+ 28656,
+ syncRollback(txn,
+ _replCoord->getMyLastOptime(),
+ OplogInterfaceLocal(txn, rsOplogName),
+ RollbackSourceImpl(r.conn(), rsOplogName),
+ _replCoord));
+
return true;
}
/* we're not ahead? maybe our new query got fresher data. best to come back and try again */
@@ -425,7 +448,13 @@ namespace {
if ( opTime != _lastOpTimeFetched || hash != _lastFetchedHash ) {
log() << "our last op time fetched: " << _lastOpTimeFetched;
log() << "source's GTE: " << opTime;
- syncRollback(txn, _replCoord->getMyLastOptime(), &r, _replCoord);
+ fassertRollbackStatusNoTrace(
+ 28657,
+ syncRollback(txn,
+ _replCoord->getMyLastOptime(),
+ OplogInterfaceLocal(txn, rsOplogName),
+ RollbackSourceImpl(r.conn(), rsOplogName),
+ _replCoord));
return true;
}
diff --git a/src/mongo/db/repl/oplog_interface.h b/src/mongo/db/repl/oplog_interface.h
new file mode 100644
index 00000000000..4ca10947ddd
--- /dev/null
+++ b/src/mongo/db/repl/oplog_interface.h
@@ -0,0 +1,79 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/record_id.h"
+
+namespace mongo {
+namespace repl {
+
+ class OplogInterface {
+ MONGO_DISALLOW_COPYING(OplogInterface);
+ public:
+
+ class Iterator;
+
+ OplogInterface() = default;
+ virtual ~OplogInterface() = default;
+
+ /**
+ * Diagnostic information.
+ */
+ virtual std::string toString() const = 0;
+
+ /**
+ * Produces an iterator over oplog collection in reverse natural order.
+ */
+ virtual std::unique_ptr<Iterator> makeIterator() const = 0;
+ };
+
+ class OplogInterface::Iterator {
+ MONGO_DISALLOW_COPYING(Iterator);
+ public:
+
+ using Value = std::pair<BSONObj, RecordId>;
+
+ Iterator() = default;
+ virtual ~Iterator() = default;
+
+ /**
+ * Returns next operation and record id (if applicable) in the oplog.
+ */
+ virtual StatusWith<Value> next() = 0;
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_interface_local.cpp b/src/mongo/db/repl/oplog_interface_local.cpp
new file mode 100644
index 00000000000..14a2906217b
--- /dev/null
+++ b/src/mongo/db/repl/oplog_interface_local.cpp
@@ -0,0 +1,103 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/oplog_interface_local.h"
+
+#include "mongo/db/db_raii.h"
+#include "mongo/db/query/internal_plans.h"
+#include "mongo/db/query/plan_executor.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+namespace repl {
+
+namespace {
+
+ class OplogIteratorLocal : public OplogInterface::Iterator {
+ public:
+
+ OplogIteratorLocal(std::unique_ptr<OldClientContext> ctx,
+ std::unique_ptr<PlanExecutor> exec);
+
+ StatusWith<Value> next() override;
+
+ private:
+
+ std::unique_ptr<OldClientContext> _ctx;
+ std::unique_ptr<PlanExecutor> _exec;
+
+ };
+
+ OplogIteratorLocal::OplogIteratorLocal(std::unique_ptr<OldClientContext> ctx,
+ std::unique_ptr<PlanExecutor> exec)
+ : _ctx(std::move(ctx)),
+ _exec(std::move(exec)) { }
+
+ StatusWith<OplogInterface::Iterator::Value> OplogIteratorLocal::next() {
+ BSONObj obj;
+ RecordId recordId;
+
+ if (PlanExecutor::ADVANCED != _exec->getNext(&obj, &recordId)) {
+ return StatusWith<Value>(ErrorCodes::NoSuchKey, "no more operations in local oplog");
+ }
+ return StatusWith<Value>(std::make_pair(obj, recordId));
+ }
+
+} // namespace
+
+ OplogInterfaceLocal::OplogInterfaceLocal(OperationContext* txn,
+ const std::string& collectionName)
+ : _txn(txn),
+ _collectionName(collectionName) {
+
+ invariant(txn);
+ invariant(!collectionName.empty());
+ }
+
+ std::string OplogInterfaceLocal::toString() const {
+ return str::stream() <<
+ "LocalOplogInterface: "
+ "operation context: " << _txn->getNS() << "/" << _txn->getOpID() <<
+ "; collection: " << _collectionName;
+ }
+
+ std::unique_ptr<OplogInterface::Iterator> OplogInterfaceLocal::makeIterator() const {
+ std::unique_ptr<OldClientContext> ctx(new OldClientContext(_txn, _collectionName));
+ std::unique_ptr<PlanExecutor> exec(
+ InternalPlanner::collectionScan(_txn,
+ _collectionName,
+ ctx->db()->getCollection(_collectionName),
+ InternalPlanner::BACKWARD));
+ return std::unique_ptr<OplogInterface::Iterator>(
+ new OplogIteratorLocal(std::move(ctx), std::move(exec)));
+ }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_interface_local.h b/src/mongo/db/repl/oplog_interface_local.h
new file mode 100644
index 00000000000..cd61a81a239
--- /dev/null
+++ b/src/mongo/db/repl/oplog_interface_local.h
@@ -0,0 +1,58 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/repl/oplog_interface.h"
+
+namespace mongo {
+
+ class OperationContext;
+
+namespace repl {
+
+ /**
+ * Scans local oplog collection in reverse natural order.
+ */
+
+ class OplogInterfaceLocal : public OplogInterface {
+ public:
+
+ OplogInterfaceLocal(OperationContext* txn, const std::string& collectionName);
+ std::string toString() const override;
+ std::unique_ptr<OplogInterface::Iterator> makeIterator() const override;
+
+ private:
+
+ OperationContext* _txn;
+ std::string _collectionName;
+
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_interface_mock.cpp b/src/mongo/db/repl/oplog_interface_mock.cpp
new file mode 100644
index 00000000000..97cea831fd7
--- /dev/null
+++ b/src/mongo/db/repl/oplog_interface_mock.cpp
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/oplog_interface_mock.h"
+
+namespace mongo {
+namespace repl {
+
+namespace {
+
+ class OplogIteratorMock : public OplogInterface::Iterator {
+ public:
+
+ OplogIteratorMock(OplogInterfaceMock::Operations::const_iterator iterator,
+ OplogInterfaceMock::Operations::const_iterator iteratorEnd);
+ StatusWith<Value> next() override;
+
+ private:
+
+ OplogInterfaceMock::Operations::const_iterator _iterator;
+ OplogInterfaceMock::Operations::const_iterator _iteratorEnd;
+
+ };
+
+ OplogIteratorMock::OplogIteratorMock(OplogInterfaceMock::Operations::const_iterator iter,
+ OplogInterfaceMock::Operations::const_iterator iterEnd)
+ : _iterator(iter),
+ _iteratorEnd(iterEnd) {}
+
+ StatusWith<OplogInterface::Iterator::Value> OplogIteratorMock::next() {
+ if (_iterator == _iteratorEnd) {
+ return StatusWith<OplogInterface::Iterator::Value>(ErrorCodes::NoSuchKey,
+ "no more ops");
+ }
+ return *(_iterator++);
+ }
+
+} // namespace
+
+ OplogInterfaceMock::OplogInterfaceMock(std::initializer_list<Operation> operations)
+ : _operations(operations) {}
+
+ OplogInterfaceMock::OplogInterfaceMock(const Operations& operations)
+ : _operations(operations) {}
+
+ std::string OplogInterfaceMock::toString() const {
+ return "OplogInterfaceMock";
+ }
+
+ std::unique_ptr<OplogInterface::Iterator> OplogInterfaceMock::makeIterator() const {
+ return std::unique_ptr<OplogInterface::Iterator>(
+ new OplogIteratorMock(_operations.begin(), _operations.end()));
+ }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_interface_mock.h b/src/mongo/db/repl/oplog_interface_mock.h
new file mode 100644
index 00000000000..4c2049a5688
--- /dev/null
+++ b/src/mongo/db/repl/oplog_interface_mock.h
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <initializer_list>
+#include "mongo/db/repl/oplog_interface.h"
+
+namespace mongo {
+namespace repl {
+
+ /**
+ * Simulates oplog for testing rollback functionality.
+ */
+ class OplogInterfaceMock : public OplogInterface {
+ MONGO_DISALLOW_COPYING(OplogInterfaceMock);
+ public:
+ using Operation = std::pair<BSONObj,RecordId>;
+ using Operations = std::list<Operation>;
+ explicit OplogInterfaceMock(std::initializer_list<Operation> operations);
+ explicit OplogInterfaceMock(const Operations& operations);
+ std::string toString() const override;
+ std::unique_ptr<OplogInterface::Iterator> makeIterator() const override;
+ private:
+ Operations _operations;
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_interface_remote.cpp b/src/mongo/db/repl/oplog_interface_remote.cpp
new file mode 100644
index 00000000000..da78924fc55
--- /dev/null
+++ b/src/mongo/db/repl/oplog_interface_remote.cpp
@@ -0,0 +1,89 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/oplog_interface_remote.h"
+
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+namespace repl {
+
+namespace {
+
+ class OplogIteratorRemote : public OplogInterface::Iterator {
+ public:
+
+ OplogIteratorRemote(std::unique_ptr<DBClientCursor> cursor);
+ StatusWith<Value> next() override;
+
+ private:
+
+ std::unique_ptr<DBClientCursor> _cursor;
+
+ };
+
+ OplogIteratorRemote::OplogIteratorRemote(std::unique_ptr<DBClientCursor> cursor)
+ : _cursor(std::move(cursor)) { }
+
+ StatusWith<OplogInterface::Iterator::Value> OplogIteratorRemote::next() {
+ if (!_cursor.get()) {
+ return StatusWith<Value>(ErrorCodes::NamespaceNotFound, "no cursor for remote oplog");
+ }
+ if (!_cursor->more()) {
+ return StatusWith<Value>(ErrorCodes::NoSuchKey, "no more operations in remote oplog");
+ }
+ return StatusWith<Value>(std::make_pair(_cursor->nextSafe(), RecordId()));
+ }
+
+} // namespace
+
+ OplogInterfaceRemote::OplogInterfaceRemote(DBClientConnection* conn,
+ const std::string& collectionName)
+ : _conn(conn),
+ _collectionName(collectionName) {
+
+ invariant(conn);
+ }
+
+ std::string OplogInterfaceRemote::toString() const {
+ return _conn->toString();
+ }
+
+ std::unique_ptr<OplogInterface::Iterator> OplogInterfaceRemote::makeIterator() const {
+ const Query query = Query().sort(BSON("$natural" << -1));
+ const BSONObj fields = BSON("ts" << 1 << "h" << 1);
+ return std::unique_ptr<OplogInterface::Iterator>(
+ new OplogIteratorRemote(_conn->query(_collectionName, query, 0, 0, &fields, 0, 0)));
+ }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_interface_remote.h b/src/mongo/db/repl/oplog_interface_remote.h
new file mode 100644
index 00000000000..ee91d9197d2
--- /dev/null
+++ b/src/mongo/db/repl/oplog_interface_remote.h
@@ -0,0 +1,58 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/repl/oplog_interface.h"
+
+namespace mongo {
+
+ class DBClientConnection;
+
+namespace repl {
+
+ /**
+ * Reads oplog on remote server.
+ */
+
+ class OplogInterfaceRemote : public OplogInterface {
+ public:
+
+ explicit OplogInterfaceRemote(DBClientConnection* conn, const std::string& collectionName);
+ std::string toString() const override;
+ std::unique_ptr<OplogInterface::Iterator> makeIterator() const override;
+
+ private:
+
+ DBClientConnection* _conn;
+ std::string _collectionName;
+
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index 4ca92dca19d..4648eb05c7d 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -64,6 +64,12 @@ namespace repl {
}
ReplicationCoordinator::Mode ReplicationCoordinatorMock::getReplicationMode() const {
+ if (_settings.usingReplSets()) {
+ return modeReplSet;
+ }
+ if (_settings.master || _settings.slave) {
+ return modeMasterSlave;
+ }
return modeNone;
}
diff --git a/src/mongo/db/repl/roll_back_local_operations.cpp b/src/mongo/db/repl/roll_back_local_operations.cpp
new file mode 100644
index 00000000000..2cf07fa36b7
--- /dev/null
+++ b/src/mongo/db/repl/roll_back_local_operations.cpp
@@ -0,0 +1,197 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/roll_back_local_operations.h"
+
+#include "mongo/util/assert_util.h"
+#include "mongo/util/log.h"
+#include "mongo/util/mongoutils/str.h"
+
+namespace mongo {
+namespace repl {
+
+namespace {
+
+ Timestamp getTimestamp(const BSONObj& operation) {
+ return operation["ts"].timestamp();
+ }
+
+ Timestamp getTimestamp(const OplogInterface::Iterator::Value& oplogValue) {
+ return getTimestamp(oplogValue.first);
+ }
+
+ long long getHash(const BSONObj& operation) {
+ return operation["h"].Long();
+ }
+
+ long long getHash(const OplogInterface::Iterator::Value& oplogValue) {
+ return getHash(oplogValue.first);
+ }
+
+} // namespace
+
+ RollBackLocalOperations::RollBackLocalOperations(
+ const OplogInterface& localOplog,
+ const RollbackOperationFn& rollbackOperation)
+
+ : _localOplogIterator(localOplog.makeIterator()),
+ _rollbackOperation(rollbackOperation),
+ _scanned(0) {
+
+ uassert(ErrorCodes::BadValue, "invalid local oplog iterator", _localOplogIterator);
+ uassert(ErrorCodes::BadValue, "null roll back operation function", rollbackOperation);
+ }
+
+ StatusWith<RollBackLocalOperations::RollbackCommonPoint>
+ RollBackLocalOperations::onRemoteOperation(const BSONObj& operation) {
+
+ if (_scanned == 0) {
+ auto result = _localOplogIterator->next();
+ if (!result.isOK()) {
+ return StatusWith<RollbackCommonPoint>(ErrorCodes::OplogStartMissing,
+ "no oplog during initsync");
+ }
+ _localOplogValue = result.getValue();
+
+ long long diff =
+ static_cast<long long>(getTimestamp(_localOplogValue).getSecs()) -
+ getTimestamp(operation).getSecs();
+ // diff could be positive, negative, or zero
+ log() << "rollback our last optime: "
+ << getTimestamp(_localOplogValue).toStringPretty();
+ log() << "rollback their last optime: " << getTimestamp(operation).toStringPretty();
+ log() << "rollback diff in end of log times: " << diff << " seconds";
+ if (diff > 1800) {
+ severe() << "rollback too long a time period for a rollback.";
+ return StatusWith<RollbackCommonPoint>(
+ ErrorCodes::ExceededTimeLimit,
+ "rollback error: not willing to roll back more than 30 minutes of data");
+ }
+ }
+
+ while (getTimestamp(_localOplogValue) > getTimestamp(operation)) {
+ _scanned++;
+ auto status = _rollbackOperation(_localOplogValue.first);
+ if (!status.isOK()) {
+ invariant(ErrorCodes::NoSuchKey != status.code());
+ return status;
+ }
+ auto result = _localOplogIterator->next();
+ if (!result.isOK()) {
+ severe() << "rollback error RS101 reached beginning of local oplog";
+ log() << " scanned: " << _scanned;
+ log() << " theirTime: " << getTimestamp(operation).toStringLong();
+ log() << " ourTime: " << getTimestamp(_localOplogValue).toStringLong();
+ return StatusWith<RollbackCommonPoint>(
+ ErrorCodes::NoMatchingDocument,
+ "RS101 reached beginning of local oplog [2]");
+ }
+ _localOplogValue = result.getValue();
+ }
+
+ if (getTimestamp(_localOplogValue) == getTimestamp(operation)) {
+ _scanned++;
+ if (getHash(_localOplogValue) == getHash(operation)) {
+ return StatusWith<RollbackCommonPoint>(
+ std::make_pair(getTimestamp(_localOplogValue), _localOplogValue.second));
+ }
+ auto status = _rollbackOperation(_localOplogValue.first);
+ if (!status.isOK()) {
+ invariant(ErrorCodes::NoSuchKey != status.code());
+ return status;
+ }
+ auto result = _localOplogIterator->next();
+ if (!result.isOK()) {
+ severe() << "rollback error RS101 reached beginning of local oplog";
+ log() << " scanned: " << _scanned;
+ log() << " theirTime: " << getTimestamp(operation).toStringLong();
+ log() << " ourTime: " << getTimestamp(_localOplogValue).toStringLong();
+ return StatusWith<RollbackCommonPoint>(
+ ErrorCodes::NoMatchingDocument,
+ "RS101 reached beginning of local oplog [1]");
+ }
+ _localOplogValue = result.getValue();
+ return StatusWith<RollbackCommonPoint>(
+ ErrorCodes::NoSuchKey,
+ "Unable to determine common point - same timestamp but different hash. "
+ "Need to process additional remote operations.");
+ }
+
+ if (getTimestamp(_localOplogValue) < getTimestamp(operation)) {
+ _scanned++;
+ return StatusWith<RollbackCommonPoint>(
+ ErrorCodes::NoSuchKey,
+ "Unable to determine common point. "
+ "Need to process additional remote operations.");
+ }
+
+ return RollbackCommonPoint(Timestamp(Seconds(1), 0), RecordId());
+ }
+
+ StatusWith<RollBackLocalOperations::RollbackCommonPoint> syncRollBackLocalOperations(
+ const OplogInterface& localOplog,
+ const OplogInterface& remoteOplog,
+ const RollBackLocalOperations::RollbackOperationFn& rollbackOperation) {
+
+ auto remoteIterator = remoteOplog.makeIterator();
+ auto remoteResult = remoteIterator->next();
+ if (!remoteResult.isOK()) {
+ return StatusWith<RollBackLocalOperations::RollbackCommonPoint>(
+ ErrorCodes::InvalidSyncSource,
+ "remote oplog empty or unreadable");
+ }
+
+ RollBackLocalOperations finder(localOplog, rollbackOperation);
+ Timestamp theirTime;
+ while (remoteResult.isOK()) {
+ theirTime = remoteResult.getValue().first["ts"].timestamp();
+ BSONObj theirObj = remoteResult.getValue().first;
+ auto result = finder.onRemoteOperation(theirObj);
+ if (result.isOK()) {
+ return result.getValue();
+ }
+ else if (result.getStatus().code() != ErrorCodes::NoSuchKey) {
+ return result;
+ }
+ remoteResult = remoteIterator->next();
+ }
+
+ severe() << "rollback error RS100 reached beginning of remote oplog";
+ log() << " them: " << remoteOplog.toString();
+ log() << " theirTime: " << theirTime.toStringLong();
+ return StatusWith<RollBackLocalOperations::RollbackCommonPoint>(
+ ErrorCodes::NoMatchingDocument,
+ "RS100 reached beginning of remote oplog [1]");
+ }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/roll_back_local_operations.h b/src/mongo/db/repl/roll_back_local_operations.h
new file mode 100644
index 00000000000..4a9d5b71cd8
--- /dev/null
+++ b/src/mongo/db/repl/roll_back_local_operations.h
@@ -0,0 +1,95 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/record_id.h"
+#include "mongo/db/repl/oplog_interface.h"
+#include "mongo/stdx/functional.h"
+
+namespace mongo {
+namespace repl {
+
+ class RollBackLocalOperations {
+ MONGO_DISALLOW_COPYING(RollBackLocalOperations);
+ public:
+
+ /**
+ * Type of function to roll back an operation or process it for future use.
+ * It can return any status except ErrorCodes::NoSuchKey. See onRemoteOperation().
+ */
+ using RollbackOperationFn = stdx::function<Status (const BSONObj&)>;
+
+ using RollbackCommonPoint = std::pair<Timestamp, RecordId>;
+
+ /**
+ * Initializes rollback processor with a valid local oplog.
+ * Whenever we encounter an operation in the local oplog that has to be rolled back,
+ * we will pass it to 'rollbackOperation'.
+ */
+ RollBackLocalOperations(const OplogInterface& localOplog,
+ const RollbackOperationFn& rollbackOperation);
+
+ virtual ~RollBackLocalOperations() = default;
+
+ /**
+ * Process single remote operation.
+ * Returns ErrorCodes::NoSuchKey if common point has not been found and
+ * additional operations have to be read from the remote oplog.
+ */
+ StatusWith<RollbackCommonPoint> onRemoteOperation(const BSONObj& operation);
+
+ private:
+
+ std::unique_ptr<OplogInterface::Iterator> _localOplogIterator;
+ RollbackOperationFn _rollbackOperation;
+ OplogInterface::Iterator::Value _localOplogValue;
+ unsigned long long _scanned;
+
+ };
+
+ /**
+ * Rolls back every operation in the local oplog that is not in the remote oplog, in reverse
+ * order.
+ *
+ * Whenever we encounter an operation in the local oplog that has to be rolled back,
+ * we will pass it to 'rollbackOperation' starting with the most recent operation.
+ * It is up to 'rollbackOperation' to roll back this operation immediately or
+ * process it for future use.
+ */
+ StatusWith<RollBackLocalOperations::RollbackCommonPoint> syncRollBackLocalOperations(
+ const OplogInterface& localOplog,
+ const OplogInterface& remoteOplog,
+ const RollBackLocalOperations::RollbackOperationFn& rollbackOperation);
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/roll_back_local_operations_test.cpp b/src/mongo/db/repl/roll_back_local_operations_test.cpp
new file mode 100644
index 00000000000..9fae8ce648c
--- /dev/null
+++ b/src/mongo/db/repl/roll_back_local_operations_test.cpp
@@ -0,0 +1,432 @@
+/**
+ * Copyright 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <iterator>
+
+#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/oplog_interface_mock.h"
+#include "mongo/db/repl/roll_back_local_operations.h"
+#include "mongo/unittest/unittest.h"
+
+namespace {
+
+ using namespace mongo;
+ using namespace mongo::repl;
+
+ const OplogInterfaceMock::Operations kEmptyMockOperations;
+
+ BSONObj makeOp(int seconds, long long hash) {
+ return BSON("ts" << Timestamp(Seconds(seconds), 0) << "h" << hash);
+ }
+
+ int recordId = 0;
+ OplogInterfaceMock::Operation makeOpAndRecordId(int seconds, long long hash) {
+ return std::make_pair(makeOp(seconds, hash), RecordId(++recordId));
+ }
+
+ TEST(RollBackLocalOperationsTest, InvalidLocalOplogIterator) {
+ class InvalidOplogInterface : public OplogInterface {
+ public:
+ std::string toString() const override { return ""; }
+ std::unique_ptr<Iterator> makeIterator() const override {
+ return std::unique_ptr<Iterator>();
+ }
+ } invalidOplog;
+ ASSERT_THROWS_CODE(
+ RollBackLocalOperations(invalidOplog, [](const BSONObj&) { return Status::OK(); }),
+ UserException,
+ ErrorCodes::BadValue);
+ }
+
+ TEST(RollBackLocalOperationsTest, InvalidRollbackOperationFunction) {
+ ASSERT_THROWS_CODE(
+ RollBackLocalOperations(OplogInterfaceMock({makeOpAndRecordId(1, 0)}),
+ RollBackLocalOperations::RollbackOperationFn()),
+ UserException,
+ ErrorCodes::BadValue);
+ }
+
+ TEST(RollBackLocalOperationsTest, EmptyLocalOplog) {
+ OplogInterfaceMock localOplog(kEmptyMockOperations);
+ RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); });
+ auto result = finder.onRemoteOperation(makeOp(1, 0));
+ ASSERT_EQUALS(ErrorCodes::OplogStartMissing, result.getStatus().code());
+ }
+
+ TEST(RollBackLocalOperationsTest, RollbackPeriodTooLong) {
+ OplogInterfaceMock localOplog({makeOpAndRecordId(1802, 0)});
+ RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); });
+ auto result = finder.onRemoteOperation(makeOp(1, 0));
+ ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, result.getStatus().code());
+ }
+
+ TEST(RollBackLocalOperationsTest, RollbackMultipleLocalOperations) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ OplogInterfaceMock::Operations localOperations({
+ makeOpAndRecordId(5, 1),
+ makeOpAndRecordId(4, 1),
+ makeOpAndRecordId(3, 1),
+ makeOpAndRecordId(2, 1),
+ commonOperation,
+ });
+ OplogInterfaceMock localOplog(localOperations);
+ auto i = localOperations.cbegin();
+ auto rollbackOperation = [&](const BSONObj& operation) {
+ ASSERT_EQUALS(i->first, operation);
+ i++;
+ return Status::OK();
+ };
+ RollBackLocalOperations finder(localOplog, rollbackOperation);
+ auto result = finder.onRemoteOperation(commonOperation.first);
+ ASSERT_OK(result.getStatus());
+ ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(commonOperation.second, result.getValue().second);
+ ASSERT_FALSE(i == localOperations.cend());
+ ASSERT_EQUALS(commonOperation.first, i->first);
+ i++;
+ ASSERT_TRUE(i == localOperations.cend());
+ }
+
+ TEST(RollBackLocalOperationsTest, RollbackOperationFailed) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ OplogInterfaceMock::Operations localOperations({
+ makeOpAndRecordId(2, 1),
+ commonOperation,
+ });
+ OplogInterfaceMock localOplog(localOperations);
+ auto rollbackOperation = [&](const BSONObj& operation) {
+ return Status(ErrorCodes::OperationFailed, "");
+ };
+ RollBackLocalOperations finder(localOplog, rollbackOperation);
+ auto result = finder.onRemoteOperation(commonOperation.first);
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code());
+ }
+
+ TEST(RollBackLocalOperationsTest, EndOfLocalOplog) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ OplogInterfaceMock::Operations localOperations({
+ makeOpAndRecordId(2, 1),
+ });
+ OplogInterfaceMock localOplog(localOperations);
+ RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); });
+ auto result = finder.onRemoteOperation(commonOperation.first);
+ ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code());
+ }
+
+ TEST(RollBackLocalOperationsTest, SkipRemoteOperations) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ OplogInterfaceMock::Operations localOperations({
+ makeOpAndRecordId(5, 1),
+ makeOpAndRecordId(4, 1),
+ makeOpAndRecordId(2, 1),
+ commonOperation,
+ });
+ OplogInterfaceMock localOplog(localOperations);
+ auto i = localOperations.cbegin();
+ auto rollbackOperation = [&](const BSONObj& operation) {
+ ASSERT_EQUALS(i->first, operation);
+ i++;
+ return Status::OK();
+ };
+ RollBackLocalOperations finder(localOplog, rollbackOperation);
+ {
+ auto result = finder.onRemoteOperation(makeOp(6,1));
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code());
+ ASSERT_TRUE(i == localOperations.cbegin());
+ }
+ {
+ auto result = finder.onRemoteOperation(makeOp(3,1));
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code());
+ ASSERT_TRUE(std::distance(localOperations.cbegin(), i) == 2);
+ }
+ auto result = finder.onRemoteOperation(commonOperation.first);
+ ASSERT_OK(result.getStatus());
+ ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(commonOperation.second, result.getValue().second);
+ ASSERT_FALSE(i == localOperations.cend());
+ ASSERT_EQUALS(commonOperation.first, i->first);
+ i++;
+ ASSERT_TRUE(i == localOperations.cend());
+ }
+
+ TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashess) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ OplogInterfaceMock::Operations localOperations({
+ makeOpAndRecordId(1, 5),
+ makeOpAndRecordId(1, 3),
+ commonOperation,
+ });
+ OplogInterfaceMock localOplog(localOperations);
+ auto i = localOperations.cbegin();
+ auto rollbackOperation = [&](const BSONObj& operation) {
+ ASSERT_EQUALS(i->first, operation);
+ i++;
+ return Status::OK();
+ };
+ RollBackLocalOperations finder(localOplog, rollbackOperation);
+ {
+ auto result = finder.onRemoteOperation(makeOp(1,4));
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code());
+ ASSERT_TRUE(std::distance(localOperations.cbegin(), i) == 1);
+ }
+ {
+ auto result = finder.onRemoteOperation(makeOp(1,2));
+ ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code());
+ ASSERT_TRUE(std::distance(localOperations.cbegin(), i) == 2);
+ }
+ auto result = finder.onRemoteOperation(commonOperation.first);
+ ASSERT_OK(result.getStatus());
+ ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(commonOperation.second, result.getValue().second);
+ ASSERT_FALSE(i == localOperations.cend());
+ ASSERT_EQUALS(commonOperation.first, i->first);
+ i++;
+ ASSERT_TRUE(i == localOperations.cend());
+ }
+
+ TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashesRollbackOperationFailed) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ OplogInterfaceMock::Operations localOperations({
+ makeOpAndRecordId(1, 3),
+ commonOperation,
+ });
+ OplogInterfaceMock localOplog(localOperations);
+ auto rollbackOperation = [&](const BSONObj& operation) {
+ return Status(ErrorCodes::OperationFailed, "");
+ };
+ RollBackLocalOperations finder(localOplog, rollbackOperation);
+ auto result = finder.onRemoteOperation(makeOp(1,2));
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code());
+ }
+
+ TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashesEndOfLocalOplog) {
+ OplogInterfaceMock::Operations localOperations({
+ makeOpAndRecordId(1, 3),
+ });
+ OplogInterfaceMock localOplog(localOperations);
+ RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); });
+ auto result = finder.onRemoteOperation(makeOp(1,2));
+ ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code());
+ }
+
+ TEST(SyncRollBackLocalOperationsTest, OplogStartMissing) {
+ ASSERT_EQUALS(
+ ErrorCodes::OplogStartMissing,
+ syncRollBackLocalOperations(
+ OplogInterfaceMock(kEmptyMockOperations),
+ OplogInterfaceMock({makeOpAndRecordId(1, 0)}),
+ [](const BSONObj&) { return Status::OK(); }).getStatus().code());
+ }
+
+ TEST(SyncRollBackLocalOperationsTest, RemoteOplogMissing) {
+ ASSERT_EQUALS(
+ ErrorCodes::InvalidSyncSource,
+ syncRollBackLocalOperations(
+ OplogInterfaceMock({makeOpAndRecordId(1, 0)}),
+ OplogInterfaceMock(kEmptyMockOperations),
+ [](const BSONObj&) { return Status::OK(); }).getStatus().code());
+ }
+
+ TEST(SyncRollBackLocalOperationsTest, RollbackPeriodTooLong) {
+ ASSERT_EQUALS(
+ ErrorCodes::ExceededTimeLimit,
+ syncRollBackLocalOperations(
+ OplogInterfaceMock({makeOpAndRecordId(1802, 0)}),
+ OplogInterfaceMock({makeOpAndRecordId(1, 0)}),
+ [](const BSONObj&) { return Status::OK(); }).getStatus().code());
+ }
+
+ TEST(SyncRollBackLocalOperationsTest, RollbackTwoOperations) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ OplogInterfaceMock::Operations localOperations({
+ makeOpAndRecordId(3, 1),
+ makeOpAndRecordId(2, 1),
+ commonOperation,
+ });
+ auto i = localOperations.cbegin();
+ auto result =
+ syncRollBackLocalOperations(
+ OplogInterfaceMock(localOperations),
+ OplogInterfaceMock({commonOperation}),
+ [&](const BSONObj& operation) {
+ ASSERT_EQUALS(i->first, operation);
+ i++;
+ return Status::OK();
+ });
+ ASSERT_OK(result.getStatus());
+ ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(commonOperation.second, result.getValue().second);
+ ASSERT_FALSE(i == localOperations.cend());
+ ASSERT_EQUALS(commonOperation.first, i->first);
+ i++;
+ ASSERT_TRUE(i == localOperations.cend());
+ }
+
+ TEST(SyncRollBackLocalOperationsTest, SkipOneRemoteOperation) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ auto remoteOperation = makeOpAndRecordId(2, 1);
+ auto result =
+ syncRollBackLocalOperations(
+ OplogInterfaceMock({commonOperation}),
+ OplogInterfaceMock({remoteOperation, commonOperation}),
+ [&](const BSONObj& operation) {
+ FAIL("should not reach here");
+ return Status::OK();
+ });
+ ASSERT_OK(result.getStatus());
+ ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(commonOperation.second, result.getValue().second);
+ }
+
+ TEST(SyncRollBackLocalOperationsTest, SameTimestampDifferentHashes) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ auto localOperation = makeOpAndRecordId(1, 2);
+ auto remoteOperation = makeOpAndRecordId(1, 3);
+ bool called = false;
+ auto result =
+ syncRollBackLocalOperations(
+ OplogInterfaceMock({localOperation, commonOperation}),
+ OplogInterfaceMock({remoteOperation, commonOperation}),
+ [&](const BSONObj& operation) {
+ ASSERT_EQUALS(localOperation.first, operation);
+ called = true;
+ return Status::OK();
+ });
+ ASSERT_OK(result.getStatus());
+ ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(commonOperation.second, result.getValue().second);
+ ASSERT_TRUE(called);
+ }
+
+ TEST(SyncRollBackLocalOperationsTest, SameTimestampEndOfLocalOplog) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ auto localOperation = makeOpAndRecordId(1, 2);
+ auto remoteOperation = makeOpAndRecordId(1, 3);
+ bool called = false;
+ auto result =
+ syncRollBackLocalOperations(
+ OplogInterfaceMock({localOperation}),
+ OplogInterfaceMock({remoteOperation, commonOperation}),
+ [&](const BSONObj& operation) {
+ ASSERT_EQUALS(localOperation.first, operation);
+ called = true;
+ return Status::OK();
+ });
+ ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code());
+ ASSERT_STRING_CONTAINS(result.getStatus().reason(),
+ "RS101 reached beginning of local oplog [1]");
+ ASSERT_TRUE(called);
+ }
+
+ TEST(SyncRollBackLocalOperationsTest, SameTimestampRollbackOperationFailed) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ auto localOperation = makeOpAndRecordId(1, 2);
+ auto remoteOperation = makeOpAndRecordId(1, 3);
+ auto result =
+ syncRollBackLocalOperations(
+ OplogInterfaceMock({localOperation, commonOperation}),
+ OplogInterfaceMock({remoteOperation, commonOperation}),
+ [&](const BSONObj& operation) {
+ return Status(ErrorCodes::OperationFailed, "");
+ });
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code());
+ }
+
+ TEST(SyncRollBackLocalOperationsTest, SameTimestampEndOfRemoteOplog) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ auto localOperation = makeOpAndRecordId(1, 2);
+ auto remoteOperation = makeOpAndRecordId(1, 3);
+ bool called = false;
+ auto result =
+ syncRollBackLocalOperations(
+ OplogInterfaceMock({localOperation, commonOperation}),
+ OplogInterfaceMock({remoteOperation}),
+ [&](const BSONObj& operation) {
+ ASSERT_EQUALS(localOperation.first, operation);
+ called = true;
+ return Status::OK();
+ });
+ ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code());
+ ASSERT_STRING_CONTAINS(result.getStatus().reason(),
+ "RS100 reached beginning of remote oplog");
+ ASSERT_TRUE(called);
+ }
+
+ TEST(SyncRollBackLocalOperationsTest, DifferentTimestampEndOfLocalOplog) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ auto localOperation = makeOpAndRecordId(3, 1);
+ auto remoteOperation = makeOpAndRecordId(2, 1);
+ bool called = false;
+ auto result =
+ syncRollBackLocalOperations(
+ OplogInterfaceMock({localOperation}),
+ OplogInterfaceMock({remoteOperation, commonOperation}),
+ [&](const BSONObj& operation) {
+ ASSERT_EQUALS(localOperation.first, operation);
+ called = true;
+ return Status::OK();
+ });
+ ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code());
+ ASSERT_STRING_CONTAINS(result.getStatus().reason(),
+ "RS101 reached beginning of local oplog [2]");
+ ASSERT_TRUE(called);
+ }
+
+ TEST(SyncRollBackLocalOperationsTest, DifferentTimestampRollbackOperationFailed) {
+ auto localOperation = makeOpAndRecordId(3, 1);
+ auto remoteOperation = makeOpAndRecordId(2, 1);
+ auto result =
+ syncRollBackLocalOperations(
+ OplogInterfaceMock({localOperation}),
+ OplogInterfaceMock({remoteOperation}),
+ [&](const BSONObj& operation) {
+ return Status(ErrorCodes::OperationFailed, "");
+ });
+ ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code());
+ }
+
+ TEST(SyncRollBackLocalOperationsTest, DifferentTimestampEndOfRemoteOplog) {
+ auto commonOperation = makeOpAndRecordId(1, 1);
+ auto localOperation = makeOpAndRecordId(2, 1);
+ auto remoteOperation = makeOpAndRecordId(3, 1);
+ auto result =
+ syncRollBackLocalOperations(
+ OplogInterfaceMock({localOperation, commonOperation}),
+ OplogInterfaceMock({remoteOperation}),
+ [&](const BSONObj& operation) {
+ FAIL("Should not reach here");
+ return Status::OK();
+ });
+ ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code());
+ ASSERT_STRING_CONTAINS(result.getStatus().reason(),
+ "RS100 reached beginning of remote oplog [1]");
+ }
+
+} // namespace
diff --git a/src/mongo/db/repl/rollback_source.h b/src/mongo/db/repl/rollback_source.h
new file mode 100644
index 00000000000..304a81717dc
--- /dev/null
+++ b/src/mongo/db/repl/rollback_source.h
@@ -0,0 +1,90 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/db/jsobj.h"
+
+namespace mongo {
+
+ class NamespaceString;
+ class OperationContext;
+
+namespace repl {
+
+ class OplogInterface;
+
+ /**
+ * Interface for rollback-related operations on the sync source.
+ */
+ class RollbackSource {
+ MONGO_DISALLOW_COPYING(RollbackSource);
+ public:
+
+ RollbackSource() = default;
+
+ virtual ~RollbackSource() = default;
+
+ /**
+ * Returns remote oplog interface.
+ * Read oplog entries with OplogInterface::makeIterator().
+ */
+ virtual const OplogInterface& getOplog() const = 0;
+
+ /**
+ * Returns rollback ID.
+ */
+ virtual int getRollbackId() const = 0;
+
+ /**
+ * Returns last operation in oplog.
+ */
+ virtual BSONObj getLastOperation() const = 0;
+
+ /**
+ * Fetch a single document from the sync source.
+ */
+ virtual BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const = 0;
+
+ /**
+ * Clones a single collection from the sync source.
+ */
+ virtual void copyCollectionFromRemote(OperationContext* txn,
+ const NamespaceString& nss) const = 0;
+
+ /**
+ * Returns collection info.
+ */
+ virtual StatusWith<BSONObj> getCollectionInfo(const NamespaceString& nss) const = 0;
+
+ };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/rollback_source_impl.cpp b/src/mongo/db/repl/rollback_source_impl.cpp
new file mode 100644
index 00000000000..bdda5e331d9
--- /dev/null
+++ b/src/mongo/db/repl/rollback_source_impl.cpp
@@ -0,0 +1,98 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/rollback_source_impl.h"
+
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/db/cloner.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/oplogreader.h"
+#include "mongo/util/mongoutils/str.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+namespace repl {
+
+ RollbackSourceImpl::RollbackSourceImpl(DBClientConnection* conn,
+ const std::string& collectionName)
+ : _conn(conn),
+ _collectionName(collectionName),
+ _oplog(conn, collectionName) { }
+
+ const OplogInterface& RollbackSourceImpl::getOplog() const {
+ return _oplog;
+ }
+
+ int RollbackSourceImpl::getRollbackId() const {
+ bo info;
+ _conn->simpleCommand("admin", &info, "replSetGetRBID");
+ return info["rbid"].numberInt();
+ }
+
+ BSONObj RollbackSourceImpl::getLastOperation() const {
+ const Query query = Query().sort(BSON("$natural" << -1));
+ return _conn->findOne(_collectionName, query, 0, QueryOption_SlaveOk);
+ }
+
+ BSONObj RollbackSourceImpl::findOne(const NamespaceString& nss, const BSONObj& filter) const {
+ return _conn->findOne(nss.toString(), filter, NULL, QueryOption_SlaveOk).getOwned();
+ }
+
+ void RollbackSourceImpl::copyCollectionFromRemote(OperationContext* txn,
+ const NamespaceString& nss) const {
+ std::string errmsg;
+ std::unique_ptr<DBClientConnection> tmpConn(new DBClientConnection());
+ uassert(15908,
+ errmsg,
+ tmpConn->connect(_conn->getServerHostAndPort(), errmsg) &&
+ replAuthenticate(tmpConn.get()));
+
+ // cloner owns _conn in unique_ptr
+ Cloner cloner;
+ cloner.setConnection(tmpConn.release());
+ uassert(15909, str::stream() <<
+ "replSet rollback error resyncing collection " << nss.ns() << ' ' << errmsg,
+ cloner.copyCollection(txn, nss.ns(), BSONObj(), errmsg, true, false, true));
+ }
+
+ StatusWith<BSONObj> RollbackSourceImpl::getCollectionInfo(const NamespaceString& nss) const {
+ std::list<BSONObj> info =
+ _conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
+ if (info.empty()) {
+ return StatusWith<BSONObj>(ErrorCodes::NoSuchKey, str::stream() <<
+ "no collection info found: " << nss.ns());
+ }
+ invariant(info.size() == 1U);
+ return info.front();
+ }
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/rollback_source_impl.h b/src/mongo/db/repl/rollback_source_impl.h
new file mode 100644
index 00000000000..3be6ef4339b
--- /dev/null
+++ b/src/mongo/db/repl/rollback_source_impl.h
@@ -0,0 +1,74 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "mongo/db/repl/oplog_interface_remote.h"
+#include "mongo/db/repl/rollback_source.h"
+
+namespace mongo {
+
+ class DBClientConnection;
+
+namespace repl {
+
+ /**
+ * Rollback source implementation using a connection.
+ */
+
+ class RollbackSourceImpl : public RollbackSource {
+ public:
+
+ explicit RollbackSourceImpl(DBClientConnection* conn, const std::string& collectionName);
+
+ const OplogInterface& getOplog() const override;
+
+ int getRollbackId() const override;
+
+ BSONObj getLastOperation() const override;
+
+ BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override;
+
+ void copyCollectionFromRemote(OperationContext* txn,
+ const NamespaceString& nss) const override;
+
+ StatusWith<BSONObj> getCollectionInfo(const NamespaceString& nss) const override;
+
+ private:
+
+ DBClientConnection* _conn;
+ std::string _collectionName;
+ OplogInterfaceRemote _oplog;
+
+ };
+
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 3ceda67ffec..0822cf934fd 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/rs_rollback.h"
#include <boost/shared_ptr.hpp>
+#include <memory>
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/auth/authorization_manager.h"
@@ -41,13 +42,11 @@
#include "mongo/db/catalog/collection_catalog_entry.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/client.h"
-#include "mongo/db/cloner.h"
#include "mongo/db/commands.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/operation_context_impl.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
@@ -56,9 +55,11 @@
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/minvalid.h"
#include "mongo/db/repl/oplog.h"
-#include "mongo/db/repl/oplogreader.h"
+#include "mongo/db/repl/oplog_interface.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/replication_coordinator_impl.h"
+#include "mongo/db/repl/roll_back_local_operations.h"
+#include "mongo/db/repl/rollback_source.h"
#include "mongo/db/repl/rslog.h"
#include "mongo/util/log.h"
@@ -159,18 +160,10 @@ namespace {
};
- /** helper to get rollback id from another server. */
- int getRBID(DBClientConnection *c) {
- bo info;
- c->simpleCommand("admin", &info, "replSetGetRBID");
- return info["rbid"].numberInt();
- }
-
-
- void refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) {
+ Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) {
const char* op = ourObj.getStringField("op");
if (*op == 'n')
- return;
+ return Status::OK();
if (ourObj.objsize() > 512 * 1024 * 1024)
throw RSFatalException("rollback too large");
@@ -181,13 +174,13 @@ namespace {
if (*doc.ns == '\0') {
warning() << "ignoring op on rollback no ns TODO : "
<< doc.ownedObj.toString();
- return;
+ return Status::OK();
}
BSONObj obj = doc.ownedObj.getObjectField(*op=='u' ? "o2" : "o");
if (obj.isEmpty()) {
warning() << "ignoring op on rollback : " << doc.ownedObj.toString();
- return;
+ return Status::OK();
}
if (*op == 'c') {
@@ -197,19 +190,21 @@ namespace {
Command* cmd = Command::findCommand(cmdname.c_str());
if (cmd == NULL) {
severe() << "rollback no such command " << first.fieldName();
- fassertFailedNoTrace(18751);
+ return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() <<
+ "rollback no such command " << first.fieldName(),
+ 18751);
}
if (cmdname == "create") {
// Create collection operation
// { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... } }
string ns = nss.db().toString() + '.' + obj["create"].String(); // -> foo.abc
fixUpInfo.toDrop.insert(ns);
- return;
+ return Status::OK();
}
else if (cmdname == "drop") {
string ns = nss.db().toString() + '.' + first.valuestr();
fixUpInfo.collectionsToResyncData.insert(ns);
- return;
+ return Status::OK();
}
else if (cmdname == "dropIndexes" || cmdname == "deleteIndexes") {
// TODO: this is bad. we simply full resync the collection here,
@@ -218,7 +213,7 @@ namespace {
<< "mongod";
string ns = nss.db().toString() + '.' + first.valuestr();
fixUpInfo.collectionsToResyncData.insert(ns);
- return;
+ return Status::OK();
}
else if (cmdname == "renameCollection") {
// TODO: slow.
@@ -228,7 +223,7 @@ namespace {
string to = obj["to"].String();
fixUpInfo.collectionsToResyncData.insert(from);
fixUpInfo.collectionsToResyncData.insert(to);
- return;
+ return Status::OK();
}
else if (cmdname == "dropDatabase") {
severe() << "rollback : can't rollback drop database full resync "
@@ -267,144 +262,18 @@ namespace {
if (doc._id.eoo()) {
warning() << "ignoring op on rollback no _id TODO : " << doc.ns << ' '
<< doc.ownedObj.toString();
- return;
+ return Status::OK();
}
fixUpInfo.toRefetch.insert(doc);
+ return Status::OK();
}
- StatusWith<FixUpInfo> syncRollbackFindCommonPoint(OperationContext* txn,
- DBClientConnection* them) {
- OldClientContext ctx(txn, rsOplogName);
- FixUpInfo fixUpInfo;
-
- boost::scoped_ptr<PlanExecutor> exec(
- InternalPlanner::collectionScan(txn,
- rsOplogName,
- ctx.db()->getCollection(rsOplogName),
- InternalPlanner::BACKWARD));
-
- BSONObj ourObj;
- RecordId ourLoc;
-
- if (PlanExecutor::ADVANCED != exec->getNext(&ourObj, &ourLoc)) {
- return StatusWith<FixUpInfo>(ErrorCodes::OplogStartMissing, "no oplog during initsync");
- }
-
- const Query query = Query().sort(reverseNaturalObj);
- const BSONObj fields = BSON("ts" << 1 << "h" << 1);
-
- //auto_ptr<DBClientCursor> u = us->query(rsOplogName, query, 0, 0, &fields, 0, 0);
-
- fixUpInfo.rbid = getRBID(them);
- auto_ptr<DBClientCursor> oplogCursor = them->query(rsOplogName, query, 0, 0, &fields, 0, 0);
-
- if (oplogCursor.get() == NULL || !oplogCursor->more())
- throw RSFatalException("remote oplog empty or unreadable");
-
- Timestamp ourTime = ourObj["ts"].timestamp();
- BSONObj theirObj = oplogCursor->nextSafe();
- Timestamp theirTime = theirObj["ts"].timestamp();
-
- long long diff = static_cast<long long>(ourTime.getSecs())
- - static_cast<long long>(theirTime.getSecs());
- // diff could be positive, negative, or zero
- log() << "rollback our last optime: " << ourTime.toStringPretty();
- log() << "rollback their last optime: " << theirTime.toStringPretty();
- log() << "rollback diff in end of log times: " << diff << " seconds";
- if (diff > 1800) {
- severe() << "rollback too long a time period for a rollback.";
- throw RSFatalException("rollback error: not willing to roll back "
- "more than 30 minutes of data");
- }
-
- unsigned long long scanned = 0;
- while (1) {
- scanned++;
- // todo add code to assure no excessive scanning for too long
- if (ourTime == theirTime) {
- if (ourObj["h"].Long() == theirObj["h"].Long()) {
- // found the point back in time where we match.
- // todo : check a few more just to be careful about hash collisions.
- log() << "rollback found matching events at "
- << ourTime.toStringPretty();
- log() << "rollback findcommonpoint scanned : " << scanned;
- fixUpInfo.commonPoint = ourTime;
- fixUpInfo.commonPointOurDiskloc = ourLoc;
- break;
- }
-
- refetch(fixUpInfo, ourObj);
-
- if (!oplogCursor->more()) {
- severe() << "rollback error RS100 reached beginning of remote oplog";
- log() << " them: " << them->toString() << " scanned: " << scanned;
- log() << " theirTime: " << theirTime.toStringLong();
- log() << " ourTime: " << ourTime.toStringLong();
- throw RSFatalException("RS100 reached beginning of remote oplog [2]");
- }
- theirObj = oplogCursor->nextSafe();
- theirTime = theirObj["ts"].timestamp();
-
- if (PlanExecutor::ADVANCED != exec->getNext(&ourObj, &ourLoc)) {
- severe() << "rollback error RS101 reached beginning of local oplog";
- log() << " them: " << them->toString() << " scanned: " << scanned;
- log() << " theirTime: " << theirTime.toStringLong();
- log() << " ourTime: " << ourTime.toStringLong();
- throw RSFatalException("RS101 reached beginning of local oplog [1]");
- }
- ourTime = ourObj["ts"].timestamp();
- }
- else if (theirTime > ourTime) {
- if (!oplogCursor->more()) {
- severe() << "rollback error RS100 reached beginning of remote oplog";
- log() << " them: " << them->toString() << " scanned: "
- << scanned;
- log() << " theirTime: " << theirTime.toStringLong();
- log() << " ourTime: " << ourTime.toStringLong();
- throw RSFatalException("RS100 reached beginning of remote oplog [1]");
- }
- theirObj = oplogCursor->nextSafe();
- theirTime = theirObj["ts"].timestamp();
- }
- else {
- // theirTime < ourTime
- refetch(fixUpInfo, ourObj);
- if (PlanExecutor::ADVANCED != exec->getNext(&ourObj, &ourLoc)) {
- severe() << "rollback error RS101 reached beginning of local oplog";
- log() << " them: " << them->toString() << " scanned: " << scanned;
- log() << " theirTime: " << theirTime.toStringLong();
- log() << " ourTime: " << ourTime.toStringLong();
- throw RSFatalException("RS101 reached beginning of local oplog [2]");
- }
- ourTime = ourObj["ts"].timestamp();
- }
- }
-
- return StatusWith<FixUpInfo>(fixUpInfo);
- }
-
- bool copyCollectionFromRemote(OperationContext* txn,
- const string& host,
- const string& ns,
- string& errmsg) {
- Cloner cloner;
-
- DBClientConnection *tmpConn = new DBClientConnection();
- // cloner owns _conn in auto_ptr
- cloner.setConnection(tmpConn);
- uassert(15908, errmsg,
- tmpConn->connect(HostAndPort(host), errmsg) && replAuthenticate(tmpConn));
-
- return cloner.copyCollection(txn, ns, BSONObj(), errmsg, true, false, true);
- }
void syncFixUp(OperationContext* txn,
FixUpInfo& fixUpInfo,
- OplogReader* oplogreader,
+ const RollbackSource& rollbackSource,
ReplicationCoordinator* replCoord) {
- DBClientConnection* them = oplogreader->conn();
-
// fetch all first so we needn't handle interruption in a fancy way
unsigned long long totalSize = 0;
@@ -427,8 +296,7 @@ namespace {
{
// TODO : slow. lots of round trips.
numFetched++;
- BSONObj good = them->findOne(doc.ns, doc._id.wrap(),
- NULL, QueryOption_SlaveOk).getOwned();
+ BSONObj good = rollbackSource.findOne(NamespaceString(doc.ns), doc._id.wrap());
totalSize += good.objsize();
uassert(13410, "replSet too much data to roll back",
totalSize < 300 * 1024 * 1024);
@@ -437,13 +305,13 @@ namespace {
goodVersions.push_back(pair<DocID, BSONObj>(doc,good));
}
}
- newMinValid = oplogreader->getLastOp(rsOplogName);
+ newMinValid = rollbackSource.getLastOperation();
if (newMinValid.isEmpty()) {
error() << "rollback error newMinValid empty?";
return;
}
}
- catch (DBException& e) {
+ catch (const DBException& e) {
LOG(1) << "rollback re-get objects: " << e.toString();
error() << "rollback couldn't re-get ns:" << doc.ns << " _id:" << doc._id << ' '
<< numFetched << '/' << fixUpInfo.toRefetch.size();
@@ -451,9 +319,10 @@ namespace {
}
log() << "rollback 3.5";
- if (fixUpInfo.rbid != getRBID(oplogreader->conn())) {
- // our source rolled back itself. so the data we received isn't necessarily consistent.
- warning() << "rollback rbid on source changed during rollback, cancelling this attempt";
+ if (fixUpInfo.rbid != rollbackSource.getRollbackId()) {
+ // Our source rolled back itself so the data we received isn't necessarily consistent.
+ warning() << "rollback rbid on source changed during rollback, "
+ << "cancelling this attempt";
return;
}
@@ -492,8 +361,6 @@ namespace {
}
{
- string errmsg;
-
// This comes as a GlobalWrite lock, so there is no DB to be acquired after
// resume, so we can skip the DB stability checks. Also
// copyCollectionFromRemote will acquire its own database pointer, under the
@@ -501,9 +368,7 @@ namespace {
invariant(txn->lockState()->isW());
Lock::TempRelease release(txn->lockState());
- bool ok = copyCollectionFromRemote(txn, them->getServerAddress(), ns, errmsg);
- uassert(15909, str::stream() << "replSet rollback error resyncing collection "
- << ns << ' ' << errmsg, ok);
+ rollbackSource.copyCollectionFromRemote(txn, nss);
}
}
@@ -517,30 +382,28 @@ namespace {
invariant(collection);
auto cce = collection->getCatalogEntry();
- const std::list<BSONObj> info =
- them->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
+ auto infoResult = rollbackSource.getCollectionInfo(nss);
- if (info.empty()) {
+ if (!infoResult.isOK()) {
// Collection dropped by "them" so we should drop it too.
log() << ns << " not found on remote host, dropping";
fixUpInfo.toDrop.insert(ns);
continue;
}
- invariant(info.size() == 1);
-
+ auto info = infoResult.getValue();
CollectionOptions options;
- if (auto optionsField = info.front()["options"]) {
+ if (auto optionsField = info["options"]) {
if (optionsField.type() != Object) {
throw RSFatalException(str::stream() << "Failed to parse options "
- << info.front() << ": expected 'options' to be an "
+ << info << ": expected 'options' to be an "
<< "Object, got " << typeName(optionsField.type()));
}
auto status = options.parse(optionsField.Obj());
if (!status.isOK()) {
throw RSFatalException(str::stream() << "Failed to parse options "
- << info.front() << ": "
+ << info << ": "
<< status.toString());
}
}
@@ -567,7 +430,7 @@ namespace {
string err;
try {
- newMinValid = oplogreader->getLastOp(rsOplogName);
+ newMinValid = rollbackSource.getLastOperation();
if (newMinValid.isEmpty()) {
err = "can't get minvalid from sync source";
}
@@ -577,11 +440,11 @@ namespace {
setMinValid(txn, minValid);
}
}
- catch (DBException& e) {
+ catch (const DBException& e) {
err = "can't get/set minvalid: ";
err += e.what();
}
- if (fixUpInfo.rbid != getRBID(oplogreader->conn())) {
+ if (fixUpInfo.rbid != rollbackSource.getRollbackId()) {
// our source rolled back itself. so the data we received isn't necessarily
// consistent. however, we've now done writes. thus we have a problem.
err += "rbid at primary changed during resync/rollback";
@@ -723,7 +586,7 @@ namespace {
try {
collection->temp_cappedTruncateAfter(txn, loc, true);
}
- catch (DBException& e) {
+ catch (const DBException& e) {
if (e.getCode() == 13415) {
// hack: need to just make cappedTruncate do this...
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
@@ -741,7 +604,7 @@ namespace {
}
}
}
- catch (DBException& e) {
+ catch (const DBException& e) {
error() << "rolling back capped collection rec "
<< doc.ns << ' ' << e.toString();
}
@@ -759,17 +622,16 @@ namespace {
// exists on the source.
if (collection->numRecords(txn) == 0) {
try {
- std::list<BSONObj> lst =
- them->getCollectionInfos( ctx.db()->name(),
- BSON( "name" << nsToCollectionSubstring( doc.ns ) ) );
- if (lst.empty()) {
+ NamespaceString nss(doc.ns);
+ auto infoResult = rollbackSource.getCollectionInfo(nss);
+ if (!infoResult.isOK()) {
// we should drop
WriteUnitOfWork wunit(txn);
ctx.db()->dropCollection(txn, doc.ns);
wunit.commit();
}
}
- catch (DBException&) {
+ catch (const DBException&) {
// this isn't *that* big a deal, but is bad.
warning() << "rollback error querying for existence of "
<< doc.ns << " at the primary, ignoring";
@@ -796,7 +658,7 @@ namespace {
}
}
- catch (DBException& e) {
+ catch (const DBException& e) {
log() << "exception in rollback ns:" << doc.ns << ' ' << pattern.toString()
<< ' ' << e.toString() << " ndeletes:" << deletes;
warn = true;
@@ -831,17 +693,22 @@ namespace {
log() << "rollback done";
}
- unsigned _syncRollback(OperationContext* txn,
- OplogReader* oplogreader,
- ReplicationCoordinator* replCoord) {
+ Status _syncRollback(OperationContext* txn,
+ const OplogInterface& localOplog,
+ const RollbackSource& rollbackSource,
+ ReplicationCoordinator* replCoord,
+ const SleepSecondsFn& sleepSecondsFn,
+ Milliseconds globalWriteLockTimeoutMs) {
invariant(!txn->lockState()->isLocked());
log() << "rollback 0";
- Lock::GlobalWrite globalWrite(txn->lockState(), 20000);
- if (!globalWrite.isLocked()) {
- warning() << "rollback couldn't get write lock in a reasonable time";
- return 2;
+ std::unique_ptr<Lock::GlobalWrite> globalWrite(
+ new Lock::GlobalWrite(txn->lockState(), globalWriteLockTimeoutMs.count()));
+ if (!globalWrite->isLocked()) {
+ sleepSecondsFn(Seconds(2));
+ return Status(ErrorCodes::LockTimeout,
+ "rollback couldn't get write lock in a reasonable time");
}
/** by doing this, we will not service reads (return an error as we aren't in secondary
@@ -851,45 +718,56 @@ namespace {
* also, this is better for status reporting - we know what is happening.
*/
if (!replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) {
- warning() << "Cannot transition from " << replCoord->getMemberState() <<
- " to " << MemberState(MemberState::RS_ROLLBACK);
- return 0;
+ return Status(ErrorCodes::OperationFailed, str::stream() <<
+ "Cannot transition from " << replCoord->getMemberState().toString() <<
+ " to " << MemberState(MemberState::RS_ROLLBACK).toString());
}
FixUpInfo how;
log() << "rollback 1";
+ how.rbid = rollbackSource.getRollbackId();
{
- oplogreader->resetCursor();
-
log() << "rollback 2 FindCommonPoint";
try {
- StatusWith<FixUpInfo> res = syncRollbackFindCommonPoint(txn, oplogreader->conn());
+ auto processOperationForFixUp = [&how](const BSONObj& operation) {
+ return refetch(how, operation);
+ };
+ auto res = syncRollBackLocalOperations(
+ localOplog,
+ rollbackSource.getOplog(),
+ processOperationForFixUp);
if (!res.isOK()) {
switch (res.getStatus().code()) {
case ErrorCodes::OplogStartMissing:
- return 1;
+ case ErrorCodes::UnrecoverableRollbackError:
+ globalWrite.reset();
+ sleepSecondsFn(Seconds(1));
+ return res.getStatus();
default:
- throw new RSFatalException(res.getStatus().toString());
+ throw RSFatalException(res.getStatus().toString());
}
}
else {
- how = res.getValue();
+ how.commonPoint = res.getValue().first;
+ how.commonPointOurDiskloc = res.getValue().second;
}
}
- catch (RSFatalException& e) {
+ catch (const RSFatalException& e) {
error() << string(e.what());
- fassertFailedNoTrace(18752);
- return 2;
+ return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() <<
+ "need to rollback, but unable to determine common point between"
+ "local and remote oplog: " << e.what(),
+ 18752);
}
- catch (DBException& e) {
+ catch (const DBException& e) {
warning() << "rollback 2 exception " << e.toString() << "; sleeping 1 min";
// Release the GlobalWrite lock while sleeping. We should always come here with a
// GlobalWrite lock
invariant(txn->lockState()->isW());
- Lock::TempRelease(txn->lockState());
+ globalWrite.reset();
- sleepsecs(60);
+ sleepSecondsFn(Seconds(60));
throw;
}
}
@@ -898,12 +776,13 @@ namespace {
replCoord->incrementRollbackID();
try {
- syncFixUp(txn, how, oplogreader, replCoord);
+ syncFixUp(txn, how, rollbackSource, replCoord);
}
- catch (RSFatalException& e) {
+ catch (const RSFatalException& e) {
error() << "exception during rollback: " << e.what();
- fassertFailedNoTrace(18753);
- return 2;
+ return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() <<
+ "exception during rollback: " << e.what(),
+ 18753);
}
catch (...) {
replCoord->incrementRollbackID();
@@ -927,24 +806,35 @@ namespace {
"but found self in " << replCoord->getMemberState();
}
- return 0;
+ return Status::OK();
}
+
+ const Milliseconds kDefaultGlobalWriteLockTimeoutMs(20000);
+
} // namespace
- void syncRollback(OperationContext* txn,
- const OpTime& lastOpTimeApplied,
- OplogReader* oplogreader,
- ReplicationCoordinator* replCoord) {
+ Status syncRollback(OperationContext* txn,
+ const OpTime& lastOpTimeApplied,
+ const OplogInterface& localOplog,
+ const RollbackSource& rollbackSource,
+ ReplicationCoordinator* replCoord,
+ const SleepSecondsFn& sleepSecondsFn,
+ Milliseconds globalWriteLockTimeoutMs) {
+
+ invariant(txn);
+ invariant(replCoord);
+
// check that we are at minvalid, otherwise we cannot rollback as we may be in an
// inconsistent state
{
OpTime minvalid = getMinValid(txn);
if( minvalid > lastOpTimeApplied ) {
severe() << "need to rollback, but in inconsistent state" << endl;
- log() << "minvalid: " << minvalid.toString() << " our last optime: "
- << lastOpTimeApplied.toString() << endl;
- fassertFailedNoTrace(18750);
- return;
+ return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() <<
+ "need to rollback, but in inconsistent state. " <<
+ "minvalid: " << minvalid.toString() << " our last optime: " <<
+ lastOpTimeApplied.toString(),
+ 18750);
}
}
@@ -952,11 +842,30 @@ namespace {
DisableDocumentValidation validationDisabler(txn);
txn->setReplicatedWrites(false);
- unsigned s = _syncRollback(txn, oplogreader, replCoord);
- if (s)
- sleepsecs(s);
+ Status status = _syncRollback(txn,
+ localOplog,
+ rollbackSource,
+ replCoord,
+ sleepSecondsFn,
+ globalWriteLockTimeoutMs);
log() << "rollback finished" << rsLog;
+ return status;
+ }
+
+ Status syncRollback(OperationContext* txn,
+ const OpTime& lastOpTimeWritten,
+ const OplogInterface& localOplog,
+ const RollbackSource& rollbackSource,
+ ReplicationCoordinator* replCoord) {
+
+ return syncRollback(txn,
+ lastOpTimeWritten,
+ localOplog,
+ rollbackSource,
+ replCoord,
+ [](Seconds seconds) { sleepsecs(seconds.count()); },
+ kDefaultGlobalWriteLockTimeoutMs);
}
} // namespace repl
diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h
index bfe2c0c5621..7f4912c080f 100644
--- a/src/mongo/db/repl/rs_rollback.h
+++ b/src/mongo/db/repl/rs_rollback.h
@@ -28,14 +28,25 @@
#pragma once
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/util/time_support.h"
+
namespace mongo {
+
+ class DBClientConnection;
+ class NamespaceString;
class OperationContext;
- class Timestamp;
namespace repl {
- class OplogReader;
+
+ class OplogInterface;
class OpTime;
class ReplicationCoordinator;
+ class RollbackSource;
/**
* Initiates the rollback process.
@@ -53,16 +64,30 @@ namespace repl {
*
* @param txn Used to read and write from this node's databases
* @param lastOpTimeWritten The last OpTime applied by the applier
- * @param oplogreader Must already be connected to a sync source. Used to fetch documents.
+ * @param localOplog reads the oplog on this server.
+ * @param rollbackSource interface for sync source:
+ * provides oplog; and
+ * supports fetching documents and copying collections.
* @param replCoord Used to track the rollback ID and to change the follower state
*
- * Failures: some failure cases are fatal; others throw std::exception.
+ * Failures: Most failures are returned as a status but some failures throw an std::exception.
*/
- void syncRollback(OperationContext* txn,
- const OpTime& lastOpTimeWritten,
- OplogReader* oplogreader,
- ReplicationCoordinator* replCoord);
+ using SleepSecondsFn = stdx::function<void (Seconds)>;
+
+ Status syncRollback(OperationContext* txn,
+ const OpTime& lastOpTimeWritten,
+ const OplogInterface& localOplog,
+ const RollbackSource& rollbackSource,
+ ReplicationCoordinator* replCoord,
+ const SleepSecondsFn& sleepSecondsFn,
+ Milliseconds globalWriteLockTimeoutMs);
+
+ Status syncRollback(OperationContext* txn,
+ const OpTime& lastOpTimeWritten,
+ const OplogInterface& localOplog,
+ const RollbackSource& rollbackSource,
+ ReplicationCoordinator* replCoord);
-}
-}
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
new file mode 100644
index 00000000000..08b0046b4e0
--- /dev/null
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -0,0 +1,541 @@
+/**
+ * Copyright 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <list>
+#include <memory>
+#include <utility>
+
+#include "mongo/db/catalog/database.h"
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/client.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/minvalid.h"
+#include "mongo/db/repl/operation_context_repl_mock.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_interface.h"
+#include "mongo/db/repl/oplog_interface_mock.h"
+#include "mongo/db/repl/replication_coordinator_global.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/rs_rollback.h"
+#include "mongo/db/repl/rollback_source.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/storage_options.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/unittest/temp_dir.h"
+
+namespace {
+
+ using namespace mongo;
+ using namespace mongo::repl;
+
+ const Milliseconds globalWriteLockTimeoutMs(10);
+ const OplogInterfaceMock::Operations kEmptyMockOperations;
+
+ class OperationContextRollbackMock : public OperationContextReplMock {
+ public:
+ Client* getClient() const override;
+ };
+
+ Client* OperationContextRollbackMock::getClient() const {
+ Client::initThreadIfNotAlready();
+ return &cc();
+ }
+
+ ReplSettings createReplSettings() {
+ ReplSettings settings;
+ settings.oplogSize = 5 * 1024 * 1024;
+ settings.replSet = "mySet/node1:12345";
+ return settings;
+ }
+
+ class ReplicationCoordinatorRollbackMock : public ReplicationCoordinatorMock {
+ public:
+ ReplicationCoordinatorRollbackMock();
+ void resetLastOpTimeFromOplog(OperationContext* txn) override;
+ };
+
+ ReplicationCoordinatorRollbackMock::ReplicationCoordinatorRollbackMock()
+ : ReplicationCoordinatorMock(createReplSettings()) { }
+
+ void ReplicationCoordinatorRollbackMock::resetLastOpTimeFromOplog(OperationContext* txn) { }
+
+ class RollbackSourceMock : public RollbackSource {
+ public:
+ RollbackSourceMock(std::unique_ptr<OplogInterface> oplog);
+ int getRollbackId() const override;
+ const OplogInterface& getOplog() const override;
+ BSONObj getLastOperation() const override;
+ BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override;
+ void copyCollectionFromRemote(OperationContext* txn,
+ const NamespaceString& nss) const override;
+ StatusWith<BSONObj> getCollectionInfo(const NamespaceString& nss) const override;
+ private:
+ std::unique_ptr<OplogInterface> _oplog;
+ };
+
+ RollbackSourceMock::RollbackSourceMock(std::unique_ptr<OplogInterface> oplog)
+ : _oplog(std::move(oplog)) { }
+
+ const OplogInterface& RollbackSourceMock::getOplog() const {
+ return *_oplog;
+ }
+
+ int RollbackSourceMock::getRollbackId() const {
+ return 0;
+ }
+
+ BSONObj RollbackSourceMock::getLastOperation() const {
+ auto iter = _oplog->makeIterator();
+ auto result = iter->next();
+ ASSERT_OK(result.getStatus());
+ return result.getValue().first;
+ }
+
+ BSONObj RollbackSourceMock::findOne(const NamespaceString& nss, const BSONObj& filter) const {
+ return BSONObj();
+ }
+
+ void RollbackSourceMock::copyCollectionFromRemote(OperationContext* txn,
+ const NamespaceString& nss) const { }
+
+ StatusWith<BSONObj> RollbackSourceMock::getCollectionInfo(const NamespaceString& nss) const {
+ return BSON("name" << nss.ns() << "options" << BSONObj());
+ }
+
+ class RSRollbackTest : public unittest::Test {
+ public:
+ RSRollbackTest();
+
+ protected:
+
+ std::unique_ptr<OperationContext> _txn;
+ std::unique_ptr<ReplicationCoordinator> _coordinator;
+
+ private:
+ void setUp() override;
+ void tearDown() override;
+
+ ReplicationCoordinator* _prevCoordinator;
+ };
+
+ RSRollbackTest::RSRollbackTest() : _prevCoordinator(nullptr) { }
+
+ void RSRollbackTest::setUp() {
+ ServiceContext* serviceContext = getGlobalServiceContext();
+ if (!serviceContext->getGlobalStorageEngine()) {
+ // When using the 'devnull' storage engine, it is fine for the temporary directory to
+ // go away after the global storage engine is initialized.
+ unittest::TempDir tempDir("rs_rollback_test");
+ mongo::storageGlobalParams.dbpath = tempDir.path();
+ mongo::storageGlobalParams.dbpath = tempDir.path();
+ mongo::storageGlobalParams.engine = "inMemoryExperiment";
+ mongo::storageGlobalParams.engineSetByUser = true;
+ serviceContext->initializeGlobalStorageEngine();
+ }
+
+ Client::initThreadIfNotAlready();
+ _txn.reset(new OperationContextRollbackMock());
+ _coordinator.reset(new ReplicationCoordinatorRollbackMock());
+
+ _prevCoordinator = getGlobalReplicationCoordinator();
+ setGlobalReplicationCoordinator(_coordinator.get());
+
+ setOplogCollectionName();
+ }
+
+ void RSRollbackTest::tearDown() {
+ {
+ Lock::GlobalWrite globalLock(_txn->lockState());
+ BSONObjBuilder unused;
+ invariant(mongo::dbHolder().closeAll(_txn.get(), unused, false));
+ }
+ setGlobalReplicationCoordinator(_prevCoordinator);
+ _coordinator.reset();
+ _txn.reset();
+ }
+
+ void noSleep(Seconds seconds) {}
+
+ TEST_F(RSRollbackTest, InconsistentMinValid) {
+ repl::setMinValid(_txn.get(), OpTime(Timestamp(Seconds(1), 0), 0));
+ auto status =
+ syncRollback(
+ _txn.get(),
+ OpTime(),
+ OplogInterfaceMock(kEmptyMockOperations),
+ RollbackSourceMock(std::unique_ptr<OplogInterface>(
+ new OplogInterfaceMock(kEmptyMockOperations))),
+ _coordinator.get(),
+ noSleep,
+ globalWriteLockTimeoutMs);
+ ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
+ ASSERT_EQUALS(18750, status.location());
+ }
+
+ TEST_F(RSRollbackTest, CannotObtainGlobalWriteLock) {
+ OperationContextReplMock txn2;
+ Lock::DBLock dbLock(txn2.lockState(), "test", MODE_X);
+ ASSERT_FALSE(_txn->lockState()->isLocked());
+ ASSERT_EQUALS(
+ ErrorCodes::LockTimeout,
+ syncRollback(
+ _txn.get(),
+ OpTime(),
+ OplogInterfaceMock(kEmptyMockOperations),
+ RollbackSourceMock(std::unique_ptr<OplogInterface>(
+ new OplogInterfaceMock(kEmptyMockOperations))),
+ _coordinator.get(),
+ noSleep,
+ globalWriteLockTimeoutMs).code());
+ }
+
+ TEST_F(RSRollbackTest, SetFollowerModeFailed) {
+ class ReplicationCoordinatorSetFollowerModeMock : public ReplicationCoordinatorMock {
+ public:
+ ReplicationCoordinatorSetFollowerModeMock()
+ : ReplicationCoordinatorMock(createReplSettings()) { }
+ MemberState getMemberState() const override { return MemberState::RS_DOWN; }
+ bool setFollowerMode(const MemberState& newState) override { return false; }
+ };
+ _coordinator.reset(new ReplicationCoordinatorSetFollowerModeMock());
+ setGlobalReplicationCoordinator(_coordinator.get());
+
+ ASSERT_EQUALS(
+ ErrorCodes::OperationFailed,
+ syncRollback(
+ _txn.get(),
+ OpTime(),
+ OplogInterfaceMock(kEmptyMockOperations),
+ RollbackSourceMock(std::unique_ptr<OplogInterface>(
+ new OplogInterfaceMock(kEmptyMockOperations))),
+ _coordinator.get(),
+ noSleep,
+ globalWriteLockTimeoutMs).code());
+ }
+
+ TEST_F(RSRollbackTest, OplogStartMissing) {
+ OpTime ts(Timestamp(Seconds(1), 0), 0);
+ auto operation =
+ std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId());
+ ASSERT_EQUALS(
+ ErrorCodes::OplogStartMissing,
+ syncRollback(
+ _txn.get(),
+ OpTime(),
+ OplogInterfaceMock(kEmptyMockOperations),
+ RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
+ operation,
+ }))),
+ _coordinator.get(),
+ noSleep,
+ globalWriteLockTimeoutMs).code());
+ }
+
+ TEST_F(RSRollbackTest, NoRemoteOpLog) {
+ OpTime ts(Timestamp(Seconds(1), 0), 0);
+ auto operation =
+ std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId());
+ auto status =
+ syncRollback(
+ _txn.get(),
+ ts,
+ OplogInterfaceMock({operation}),
+ RollbackSourceMock(std::unique_ptr<OplogInterface>(
+ new OplogInterfaceMock(kEmptyMockOperations))),
+ _coordinator.get(),
+ noSleep,
+ globalWriteLockTimeoutMs);
+ ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
+ ASSERT_EQUALS(18752, status.location());
+ }
+
+ TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) {
+ OpTime ts(Timestamp(Seconds(1), 0), 0);
+ auto operation =
+ std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId());
+ class RollbackSourceLocal : public RollbackSourceMock {
+ public:
+ RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
+ : RollbackSourceMock(std::move(oplog)) { }
+ int getRollbackId() const override {
+ uassert(ErrorCodes::UnknownError, "getRollbackId() failed", false);
+ }
+ };
+ ASSERT_THROWS_CODE(
+ syncRollback(
+ _txn.get(),
+ ts,
+ OplogInterfaceMock({operation}),
+ RollbackSourceLocal(std::unique_ptr<OplogInterface>(
+ new OplogInterfaceMock(kEmptyMockOperations))),
+ _coordinator.get(),
+ noSleep,
+ globalWriteLockTimeoutMs),
+ UserException,
+ ErrorCodes::UnknownError);
+ }
+
+ TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) {
+ createOplog(_txn.get());
+ OpTime ts(Timestamp(Seconds(1), 0), 1);
+ auto operation =
+ std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId(1));
+ ASSERT_OK(
+ syncRollback(
+ _txn.get(),
+ ts,
+ OplogInterfaceMock({operation}),
+ RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
+ operation,
+ }))),
+ _coordinator.get(),
+ noSleep,
+ globalWriteLockTimeoutMs));
+ }
+
+ TEST_F(RSRollbackTest, FetchDeletedDocumentFromSource) {
+ createOplog(_txn.get());
+ auto commonOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
+ auto deleteOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) <<
+ "h" << 1LL <<
+ "op" << "d" <<
+ "ns" << "test.t" <<
+ "o" << BSON("_id" << 0)),
+ RecordId(2));
+ class RollbackSourceLocal : public RollbackSourceMock {
+ public:
+ RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
+ : RollbackSourceMock(std::move(oplog)),
+ called(false) { }
+ BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const {
+ called = true;
+ return BSONObj();
+ }
+ mutable bool called;
+ };
+ RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
+ commonOperation,
+ })));
+ OpTime opTime(deleteOperation.first["ts"].timestamp(),
+ deleteOperation.first["h"].Long());
+ ASSERT_OK(
+ syncRollback(
+ _txn.get(),
+ opTime,
+ OplogInterfaceMock({deleteOperation, commonOperation}),
+ rollbackSource,
+ _coordinator.get(),
+ noSleep,
+ globalWriteLockTimeoutMs));
+ ASSERT_TRUE(rollbackSource.called);
+ }
+
+ TEST_F(RSRollbackTest, RollbackUnknownCommand) {
+ createOplog(_txn.get());
+ auto commonOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
+ auto unknownCommandOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) <<
+ "h" << 1LL <<
+ "op" << "c" <<
+ "ns" << "test.t" <<
+ "o" << BSON("unknown_command" << "t")),
+ RecordId(2));
+ {
+ Lock::DBLock dbLock(_txn->lockState(), "test", MODE_X);
+ mongo::WriteUnitOfWork wuow(_txn.get());
+ auto db = dbHolder().openDb(_txn.get(), "test");
+ ASSERT_TRUE(db);
+ ASSERT_TRUE(db->getOrCreateCollection(_txn.get(), "test.t"));
+ wuow.commit();
+ }
+ OpTime opTime(unknownCommandOperation.first["ts"].timestamp(),
+ unknownCommandOperation.first["h"].Long());
+ auto status =
+ syncRollback(
+ _txn.get(),
+ opTime,
+ OplogInterfaceMock({unknownCommandOperation, commonOperation}),
+ RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
+ commonOperation,
+ }))),
+ _coordinator.get(),
+ noSleep,
+ globalWriteLockTimeoutMs);
+ ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
+ ASSERT_EQUALS(18751, status.location());
+ }
+
+// Re-enable after fixing recovery unit noop registerChange behavior
+#if 0
+ TEST_F(RSRollbackTest, RollbackDropCollectionCommand) {
+ createOplog(_txn.get());
+ auto commonOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
+ auto dropCollectionOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) <<
+ "h" << 1LL <<
+ "op" << "c" <<
+ "ns" << "test.t" <<
+ "o" << BSON("drop" << "t")),
+ RecordId(2));
+ class RollbackSourceLocal : public RollbackSourceMock {
+ public:
+ RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
+ : RollbackSourceMock(std::move(oplog)),
+ called(false) { }
+ void copyCollectionFromRemote(OperationContext* txn,
+ const NamespaceString& nss) const override {
+ called = true;
+ }
+ mutable bool called;
+ };
+ RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
+ commonOperation,
+ })));
+ {
+ Lock::DBLock dbLock(_txn->lockState(), "test", MODE_X);
+ mongo::WriteUnitOfWork wuow(_txn.get());
+ auto db = dbHolder().openDb(_txn.get(), "test");
+ ASSERT_TRUE(db);
+ ASSERT_TRUE(db->getOrCreateCollection(_txn.get(), "test.t"));
+ wuow.commit();
+ }
+ OpTime opTime(dropCollectionOperation.first["ts"].timestamp(),
+ dropCollectionOperation.first["h"].Long());
+ ASSERT_OK(
+ syncRollback(
+ _txn.get(),
+ opTime,
+ OplogInterfaceMock({dropCollectionOperation, commonOperation}),
+ rollbackSource,
+ _coordinator.get(),
+ noSleep,
+ globalWriteLockTimeoutMs));
+ ASSERT_TRUE(rollbackSource.called);
+ }
+#endif // 0
+
+ TEST_F(RSRollbackTest, RollbackCollectionModificationCommand) {
+ createOplog(_txn.get());
+ auto commonOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
+ auto collectionModificationOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) <<
+ "h" << 1LL <<
+ "op" << "c" <<
+ "ns" << "test.t" <<
+ "o" << BSON("collMod" << "t" << "noPadding" << false)),
+ RecordId(2));
+ class RollbackSourceLocal : public RollbackSourceMock {
+ public:
+ RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
+ : RollbackSourceMock(std::move(oplog)),
+ called(false) { }
+ StatusWith<BSONObj> getCollectionInfo(const NamespaceString& nss) const {
+ called = true;
+ return RollbackSourceMock::getCollectionInfo(nss);
+ }
+ mutable bool called;
+ };
+ RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
+ commonOperation,
+ })));
+ {
+ Lock::DBLock dbLock(_txn->lockState(), "test", MODE_X);
+ mongo::WriteUnitOfWork wuow(_txn.get());
+ auto db = dbHolder().openDb(_txn.get(), "test");
+ ASSERT_TRUE(db);
+ ASSERT_TRUE(db->getOrCreateCollection(_txn.get(), "test.t"));
+ wuow.commit();
+ }
+ OpTime opTime(collectionModificationOperation.first["ts"].timestamp(),
+ collectionModificationOperation.first["h"].Long());
+ ASSERT_OK(
+ syncRollback(
+ _txn.get(),
+ opTime,
+ OplogInterfaceMock({collectionModificationOperation, commonOperation}),
+ rollbackSource,
+ _coordinator.get(),
+ noSleep,
+ globalWriteLockTimeoutMs));
+ ASSERT_TRUE(rollbackSource.called);
+ }
+
+ TEST_F(RSRollbackTest, RollbackCollectionModificationCommandInvalidCollectionOptions) {
+ createOplog(_txn.get());
+ auto commonOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
+ auto collectionModificationOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) <<
+ "h" << 1LL <<
+ "op" << "c" <<
+ "ns" << "test.t" <<
+ "o" << BSON("collMod" << "t" << "noPadding" << false)),
+ RecordId(2));
+ class RollbackSourceLocal : public RollbackSourceMock {
+ public:
+ RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
+ : RollbackSourceMock(std::move(oplog)) { }
+ StatusWith<BSONObj> getCollectionInfo(const NamespaceString& nss) const {
+ return BSON("name" << nss.ns() << "options" << 12345);
+ }
+ };
+ RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({
+ commonOperation,
+ })));
+ {
+ Lock::DBLock dbLock(_txn->lockState(), "test", MODE_X);
+ mongo::WriteUnitOfWork wuow(_txn.get());
+ auto db = dbHolder().openDb(_txn.get(), "test");
+ ASSERT_TRUE(db);
+ ASSERT_TRUE(db->getOrCreateCollection(_txn.get(), "test.t"));
+ wuow.commit();
+ }
+ OpTime opTime(collectionModificationOperation.first["ts"].timestamp(),
+ collectionModificationOperation.first["h"].Long());
+ auto status =
+ syncRollback(
+ _txn.get(),
+ opTime,
+ OplogInterfaceMock({collectionModificationOperation, commonOperation}),
+ rollbackSource,
+ _coordinator.get(),
+ noSleep,
+ globalWriteLockTimeoutMs);
+ ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code());
+ ASSERT_EQUALS(18753, status.location());
+ }
+
+} // namespace