diff options
author | Benety Goh <benety@mongodb.com> | 2015-05-27 22:00:05 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2015-06-02 09:50:22 -0400 |
commit | e02e22a862b40183c54918f264b538a31f39eef6 (patch) | |
tree | 3ee7bd28760196b3cf4b447511f18c6c559608bf /src/mongo/db/repl | |
parent | f0fa4307ef62af978454951900c7f4ae3a2cd8f6 (diff) | |
download | mongo-e02e22a862b40183c54918f264b538a31f39eef6.tar.gz |
SERVER-18035 cleaned up syncRollback. added unit test rs_rollback_test.
Diffstat (limited to 'src/mongo/db/repl')
19 files changed, 2341 insertions, 230 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index d7db78aae5c..8cf92b22702 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -78,6 +78,76 @@ env.Library( ) env.Library( + target='oplog_interface_local', + source=[ + 'oplog_interface_local.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/db/query/internal_plans', + '$BUILD_DIR/mongo/db/query/query', + ], +) + +env.Library( + target='oplog_interface_mock', + source=[ + 'oplog_interface_mock.cpp', + ], + LIBDEPS=[ + ], +) + +env.Library( + target='oplog_interface_remote', + source=[ + 'oplog_interface_remote.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/client/clientdriver', + ], +) + +env.Library( + target='rs_rollback', + source=[ + 'rs_rollback.cpp', + ], + LIBDEPS=[ + 'oplog_interface_local', + 'oplog_interface_remote', + 'roll_back_local_operations', + 'rollback_source_impl', + ], +) + +env.CppUnitTest( + target='rs_rollback_test', + source=[ + 'rs_rollback_test.cpp', + ], + LIBDEPS=[ + 'oplog_interface_mock', + 'replmocks', + 'rs_rollback', + '$BUILD_DIR/mongo/db/coredb', + '$BUILD_DIR/mongo/db/serveronly', + '$BUILD_DIR/mongo/util/ntservice_mock', + ], + NO_CRUTCH = True, +) + +env.Library( + target='rollback_source_impl', + source=[ + 'rollback_source_impl.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/bson/bson', + '$BUILD_DIR/mongo/client/clientdriver', + ], +) + +env.Library( target='sync_tail', source=[ 'sync_tail.cpp', @@ -545,3 +615,24 @@ env.CppUnitTest( '$BUILD_DIR/mongo/unittest/concurrency', ], ) + +env.Library( + target='roll_back_local_operations', + source=[ + 'roll_back_local_operations.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/bson/bson', + '$BUILD_DIR/mongo/logger/logger', + '$BUILD_DIR/mongo/util/foundation', + ], +) + +env.CppUnitTest( + target='roll_back_local_operations_test', + source='roll_back_local_operations_test.cpp', + LIBDEPS=[ + 'oplog_interface_mock', + 'roll_back_local_operations', + ], +) diff --git a/src/mongo/db/repl/bgsync.cpp 
b/src/mongo/db/repl/bgsync.cpp index 164cb3c99c7..29ad6d5704f 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -41,8 +41,11 @@ #include "mongo/db/dbhelpers.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_interface_local.h" +#include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/rollback_source_impl.h" #include "mongo/db/repl/rs_rollback.h" #include "mongo/db/repl/rs_sync.h" #include "mongo/db/stats/timer_stats.h" @@ -394,6 +397,19 @@ namespace { bool BackgroundSync::_rollbackIfNeeded(OperationContext* txn, OplogReader& r) { string hn = r.conn()->getServerAddress(); + // Abort only when syncRollback detects we are in an unrecoverable state. + // In other cases, we log the message contained in the error status and retry later. + auto fassertRollbackStatusNoTrace = [](int msgid, const Status& status) { + if (status.isOK()) { + return; + } + if (ErrorCodes::UnrecoverableRollbackError == status.code()) { + fassertNoTrace(msgid, status); + } + warning() << "rollback cannot proceed at this time (retrying later): " + << status; + }; + if (!r.more()) { try { BSONObj theirLastOp = r.getLastOp(rsOplogName.c_str()); @@ -405,7 +421,14 @@ namespace { OpTime theirOpTime = extractOpTime(theirLastOp); if (theirOpTime < _lastOpTimeFetched) { log() << "we are ahead of the sync source, will try to roll back"; - syncRollback(txn, _replCoord->getMyLastOptime(), &r, _replCoord); + fassertRollbackStatusNoTrace( + 28656, + syncRollback(txn, + _replCoord->getMyLastOptime(), + OplogInterfaceLocal(txn, rsOplogName), + RollbackSourceImpl(r.conn(), rsOplogName), + _replCoord)); + return true; } /* we're not ahead? maybe our new query got fresher data. 
best to come back and try again */ @@ -425,7 +448,13 @@ namespace { if ( opTime != _lastOpTimeFetched || hash != _lastFetchedHash ) { log() << "our last op time fetched: " << _lastOpTimeFetched; log() << "source's GTE: " << opTime; - syncRollback(txn, _replCoord->getMyLastOptime(), &r, _replCoord); + fassertRollbackStatusNoTrace( + 28657, + syncRollback(txn, + _replCoord->getMyLastOptime(), + OplogInterfaceLocal(txn, rsOplogName), + RollbackSourceImpl(r.conn(), rsOplogName), + _replCoord)); return true; } diff --git a/src/mongo/db/repl/oplog_interface.h b/src/mongo/db/repl/oplog_interface.h new file mode 100644 index 00000000000..4ca10947ddd --- /dev/null +++ b/src/mongo/db/repl/oplog_interface.h @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. 
If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> +#include <string> +#include <utility> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/record_id.h" + +namespace mongo { +namespace repl { + + class OplogInterface { + MONGO_DISALLOW_COPYING(OplogInterface); + public: + + class Iterator; + + OplogInterface() = default; + virtual ~OplogInterface() = default; + + /** + * Diagnostic information. + */ + virtual std::string toString() const = 0; + + /** + * Produces an iterator over oplog collection in reverse natural order. + */ + virtual std::unique_ptr<Iterator> makeIterator() const = 0; + }; + + class OplogInterface::Iterator { + MONGO_DISALLOW_COPYING(Iterator); + public: + + using Value = std::pair<BSONObj, RecordId>; + + Iterator() = default; + virtual ~Iterator() = default; + + /** + * Returns next operation and record id (if applicable) in the oplog. + */ + virtual StatusWith<Value> next() = 0; + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_interface_local.cpp b/src/mongo/db/repl/oplog_interface_local.cpp new file mode 100644 index 00000000000..14a2906217b --- /dev/null +++ b/src/mongo/db/repl/oplog_interface_local.cpp @@ -0,0 +1,103 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/oplog_interface_local.h" + +#include "mongo/db/db_raii.h" +#include "mongo/db/query/internal_plans.h" +#include "mongo/db/query/plan_executor.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace repl { + +namespace { + + class OplogIteratorLocal : public OplogInterface::Iterator { + public: + + OplogIteratorLocal(std::unique_ptr<OldClientContext> ctx, + std::unique_ptr<PlanExecutor> exec); + + StatusWith<Value> next() override; + + private: + + std::unique_ptr<OldClientContext> _ctx; + std::unique_ptr<PlanExecutor> _exec; + + }; + + OplogIteratorLocal::OplogIteratorLocal(std::unique_ptr<OldClientContext> ctx, + std::unique_ptr<PlanExecutor> exec) + : _ctx(std::move(ctx)), + _exec(std::move(exec)) { } + + StatusWith<OplogInterface::Iterator::Value> OplogIteratorLocal::next() { + BSONObj obj; + RecordId recordId; + + if (PlanExecutor::ADVANCED != _exec->getNext(&obj, &recordId)) { + return StatusWith<Value>(ErrorCodes::NoSuchKey, "no more 
operations in local oplog"); + } + return StatusWith<Value>(std::make_pair(obj, recordId)); + } + +} // namespace + + OplogInterfaceLocal::OplogInterfaceLocal(OperationContext* txn, + const std::string& collectionName) + : _txn(txn), + _collectionName(collectionName) { + + invariant(txn); + invariant(!collectionName.empty()); + } + + std::string OplogInterfaceLocal::toString() const { + return str::stream() << + "LocalOplogInterface: " + "operation context: " << _txn->getNS() << "/" << _txn->getOpID() << + "; collection: " << _collectionName; + } + + std::unique_ptr<OplogInterface::Iterator> OplogInterfaceLocal::makeIterator() const { + std::unique_ptr<OldClientContext> ctx(new OldClientContext(_txn, _collectionName)); + std::unique_ptr<PlanExecutor> exec( + InternalPlanner::collectionScan(_txn, + _collectionName, + ctx->db()->getCollection(_collectionName), + InternalPlanner::BACKWARD)); + return std::unique_ptr<OplogInterface::Iterator>( + new OplogIteratorLocal(std::move(ctx), std::move(exec))); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_interface_local.h b/src/mongo/db/repl/oplog_interface_local.h new file mode 100644 index 00000000000..cd61a81a239 --- /dev/null +++ b/src/mongo/db/repl/oplog_interface_local.h @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/repl/oplog_interface.h" + +namespace mongo { + + class OperationContext; + +namespace repl { + + /** + * Scans local oplog collection in reverse natural order. + */ + + class OplogInterfaceLocal : public OplogInterface { + public: + + OplogInterfaceLocal(OperationContext* txn, const std::string& collectionName); + std::string toString() const override; + std::unique_ptr<OplogInterface::Iterator> makeIterator() const override; + + private: + + OperationContext* _txn; + std::string _collectionName; + + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_interface_mock.cpp b/src/mongo/db/repl/oplog_interface_mock.cpp new file mode 100644 index 00000000000..97cea831fd7 --- /dev/null +++ b/src/mongo/db/repl/oplog_interface_mock.cpp @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/oplog_interface_mock.h" + +namespace mongo { +namespace repl { + +namespace { + + class OplogIteratorMock : public OplogInterface::Iterator { + public: + + OplogIteratorMock(OplogInterfaceMock::Operations::const_iterator iterator, + OplogInterfaceMock::Operations::const_iterator iteratorEnd); + StatusWith<Value> next() override; + + private: + + OplogInterfaceMock::Operations::const_iterator _iterator; + OplogInterfaceMock::Operations::const_iterator _iteratorEnd; + + }; + + OplogIteratorMock::OplogIteratorMock(OplogInterfaceMock::Operations::const_iterator iter, + OplogInterfaceMock::Operations::const_iterator iterEnd) + : _iterator(iter), + _iteratorEnd(iterEnd) {} + + StatusWith<OplogInterface::Iterator::Value> OplogIteratorMock::next() { + if (_iterator == _iteratorEnd) { + return StatusWith<OplogInterface::Iterator::Value>(ErrorCodes::NoSuchKey, + "no more ops"); + } + return *(_iterator++); + } + +} // namespace + + OplogInterfaceMock::OplogInterfaceMock(std::initializer_list<Operation> operations) + : _operations(operations) {} + + OplogInterfaceMock::OplogInterfaceMock(const Operations& operations) + : _operations(operations) {} + + std::string OplogInterfaceMock::toString() const { + return "OplogInterfaceMock"; + } + + std::unique_ptr<OplogInterface::Iterator> OplogInterfaceMock::makeIterator() const { + return std::unique_ptr<OplogInterface::Iterator>( + new OplogIteratorMock(_operations.begin(), _operations.end())); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_interface_mock.h b/src/mongo/db/repl/oplog_interface_mock.h new file mode 100644 index 00000000000..4c2049a5688 --- /dev/null +++ b/src/mongo/db/repl/oplog_interface_mock.h @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2015 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <initializer_list> +#include "mongo/db/repl/oplog_interface.h" + +namespace mongo { +namespace repl { + + /** + * Simulates oplog for testing rollback functionality. 
+ */ + class OplogInterfaceMock : public OplogInterface { + MONGO_DISALLOW_COPYING(OplogInterfaceMock); + public: + using Operation = std::pair<BSONObj,RecordId>; + using Operations = std::list<Operation>; + explicit OplogInterfaceMock(std::initializer_list<Operation> operations); + explicit OplogInterfaceMock(const Operations& operations); + std::string toString() const override; + std::unique_ptr<OplogInterface::Iterator> makeIterator() const override; + private: + Operations _operations; + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_interface_remote.cpp b/src/mongo/db/repl/oplog_interface_remote.cpp new file mode 100644 index 00000000000..da78924fc55 --- /dev/null +++ b/src/mongo/db/repl/oplog_interface_remote.cpp @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. 
If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/oplog_interface_remote.h" + +#include "mongo/client/dbclientinterface.h" +#include "mongo/db/jsobj.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace repl { + +namespace { + + class OplogIteratorRemote : public OplogInterface::Iterator { + public: + + OplogIteratorRemote(std::unique_ptr<DBClientCursor> cursor); + StatusWith<Value> next() override; + + private: + + std::unique_ptr<DBClientCursor> _cursor; + + }; + + OplogIteratorRemote::OplogIteratorRemote(std::unique_ptr<DBClientCursor> cursor) + : _cursor(std::move(cursor)) { } + + StatusWith<OplogInterface::Iterator::Value> OplogIteratorRemote::next() { + if (!_cursor.get()) { + return StatusWith<Value>(ErrorCodes::NamespaceNotFound, "no cursor for remote oplog"); + } + if (!_cursor->more()) { + return StatusWith<Value>(ErrorCodes::NoSuchKey, "no more operations in remote oplog"); + } + return StatusWith<Value>(std::make_pair(_cursor->nextSafe(), RecordId())); + } + +} // namespace + + OplogInterfaceRemote::OplogInterfaceRemote(DBClientConnection* conn, + const std::string& collectionName) + : _conn(conn), + _collectionName(collectionName) { + + invariant(conn); + } + + std::string OplogInterfaceRemote::toString() const { + return _conn->toString(); + } + + std::unique_ptr<OplogInterface::Iterator> OplogInterfaceRemote::makeIterator() const { + const Query query = Query().sort(BSON("$natural" << -1)); + const BSONObj fields = BSON("ts" << 1 << "h" << 1); + return std::unique_ptr<OplogInterface::Iterator>( + new OplogIteratorRemote(_conn->query(_collectionName, query, 0, 
0, &fields, 0, 0))); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_interface_remote.h b/src/mongo/db/repl/oplog_interface_remote.h new file mode 100644 index 00000000000..ee91d9197d2 --- /dev/null +++ b/src/mongo/db/repl/oplog_interface_remote.h @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/repl/oplog_interface.h" + +namespace mongo { + + class DBClientConnection; + +namespace repl { + + /** + * Reads oplog on remote server. 
+ */ + + class OplogInterfaceRemote : public OplogInterface { + public: + + explicit OplogInterfaceRemote(DBClientConnection* conn, const std::string& collectionName); + std::string toString() const override; + std::unique_ptr<OplogInterface::Iterator> makeIterator() const override; + + private: + + DBClientConnection* _conn; + std::string _collectionName; + + }; + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index 4ca92dca19d..4648eb05c7d 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -64,6 +64,12 @@ namespace repl { } ReplicationCoordinator::Mode ReplicationCoordinatorMock::getReplicationMode() const { + if (_settings.usingReplSets()) { + return modeReplSet; + } + if (_settings.master || _settings.slave) { + return modeMasterSlave; + } return modeNone; } diff --git a/src/mongo/db/repl/roll_back_local_operations.cpp b/src/mongo/db/repl/roll_back_local_operations.cpp new file mode 100644 index 00000000000..2cf07fa36b7 --- /dev/null +++ b/src/mongo/db/repl/roll_back_local_operations.cpp @@ -0,0 +1,197 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/roll_back_local_operations.h" + +#include "mongo/util/assert_util.h" +#include "mongo/util/log.h" +#include "mongo/util/mongoutils/str.h" + +namespace mongo { +namespace repl { + +namespace { + + Timestamp getTimestamp(const BSONObj& operation) { + return operation["ts"].timestamp(); + } + + Timestamp getTimestamp(const OplogInterface::Iterator::Value& oplogValue) { + return getTimestamp(oplogValue.first); + } + + long long getHash(const BSONObj& operation) { + return operation["h"].Long(); + } + + long long getHash(const OplogInterface::Iterator::Value& oplogValue) { + return getHash(oplogValue.first); + } + +} // namespace + + RollBackLocalOperations::RollBackLocalOperations( + const OplogInterface& localOplog, + const RollbackOperationFn& rollbackOperation) + + : _localOplogIterator(localOplog.makeIterator()), + _rollbackOperation(rollbackOperation), + _scanned(0) { + + uassert(ErrorCodes::BadValue, "invalid local oplog iterator", _localOplogIterator); + uassert(ErrorCodes::BadValue, "null roll back operation function", 
rollbackOperation); + } + + StatusWith<RollBackLocalOperations::RollbackCommonPoint> + RollBackLocalOperations::onRemoteOperation(const BSONObj& operation) { + + if (_scanned == 0) { + auto result = _localOplogIterator->next(); + if (!result.isOK()) { + return StatusWith<RollbackCommonPoint>(ErrorCodes::OplogStartMissing, + "no oplog during initsync"); + } + _localOplogValue = result.getValue(); + + long long diff = + static_cast<long long>(getTimestamp(_localOplogValue).getSecs()) - + getTimestamp(operation).getSecs(); + // diff could be positive, negative, or zero + log() << "rollback our last optime: " + << getTimestamp(_localOplogValue).toStringPretty(); + log() << "rollback their last optime: " << getTimestamp(operation).toStringPretty(); + log() << "rollback diff in end of log times: " << diff << " seconds"; + if (diff > 1800) { + severe() << "rollback too long a time period for a rollback."; + return StatusWith<RollbackCommonPoint>( + ErrorCodes::ExceededTimeLimit, + "rollback error: not willing to roll back more than 30 minutes of data"); + } + } + + while (getTimestamp(_localOplogValue) > getTimestamp(operation)) { + _scanned++; + auto status = _rollbackOperation(_localOplogValue.first); + if (!status.isOK()) { + invariant(ErrorCodes::NoSuchKey != status.code()); + return status; + } + auto result = _localOplogIterator->next(); + if (!result.isOK()) { + severe() << "rollback error RS101 reached beginning of local oplog"; + log() << " scanned: " << _scanned; + log() << " theirTime: " << getTimestamp(operation).toStringLong(); + log() << " ourTime: " << getTimestamp(_localOplogValue).toStringLong(); + return StatusWith<RollbackCommonPoint>( + ErrorCodes::NoMatchingDocument, + "RS101 reached beginning of local oplog [2]"); + } + _localOplogValue = result.getValue(); + } + + if (getTimestamp(_localOplogValue) == getTimestamp(operation)) { + _scanned++; + if (getHash(_localOplogValue) == getHash(operation)) { + return StatusWith<RollbackCommonPoint>( + 
std::make_pair(getTimestamp(_localOplogValue), _localOplogValue.second)); + } + auto status = _rollbackOperation(_localOplogValue.first); + if (!status.isOK()) { + invariant(ErrorCodes::NoSuchKey != status.code()); + return status; + } + auto result = _localOplogIterator->next(); + if (!result.isOK()) { + severe() << "rollback error RS101 reached beginning of local oplog"; + log() << " scanned: " << _scanned; + log() << " theirTime: " << getTimestamp(operation).toStringLong(); + log() << " ourTime: " << getTimestamp(_localOplogValue).toStringLong(); + return StatusWith<RollbackCommonPoint>( + ErrorCodes::NoMatchingDocument, + "RS101 reached beginning of local oplog [1]"); + } + _localOplogValue = result.getValue(); + return StatusWith<RollbackCommonPoint>( + ErrorCodes::NoSuchKey, + "Unable to determine common point - same timestamp but different hash. " + "Need to process additional remote operations."); + } + + if (getTimestamp(_localOplogValue) < getTimestamp(operation)) { + _scanned++; + return StatusWith<RollbackCommonPoint>( + ErrorCodes::NoSuchKey, + "Unable to determine common point. 
" + "Need to process additional remote operations."); + } + + return RollbackCommonPoint(Timestamp(Seconds(1), 0), RecordId()); + } + + StatusWith<RollBackLocalOperations::RollbackCommonPoint> syncRollBackLocalOperations( + const OplogInterface& localOplog, + const OplogInterface& remoteOplog, + const RollBackLocalOperations::RollbackOperationFn& rollbackOperation) { + + auto remoteIterator = remoteOplog.makeIterator(); + auto remoteResult = remoteIterator->next(); + if (!remoteResult.isOK()) { + return StatusWith<RollBackLocalOperations::RollbackCommonPoint>( + ErrorCodes::InvalidSyncSource, + "remote oplog empty or unreadable"); + } + + RollBackLocalOperations finder(localOplog, rollbackOperation); + Timestamp theirTime; + while (remoteResult.isOK()) { + theirTime = remoteResult.getValue().first["ts"].timestamp(); + BSONObj theirObj = remoteResult.getValue().first; + auto result = finder.onRemoteOperation(theirObj); + if (result.isOK()) { + return result.getValue(); + } + else if (result.getStatus().code() != ErrorCodes::NoSuchKey) { + return result; + } + remoteResult = remoteIterator->next(); + } + + severe() << "rollback error RS100 reached beginning of remote oplog"; + log() << " them: " << remoteOplog.toString(); + log() << " theirTime: " << theirTime.toStringLong(); + return StatusWith<RollBackLocalOperations::RollbackCommonPoint>( + ErrorCodes::NoMatchingDocument, + "RS100 reached beginning of remote oplog [1]"); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/roll_back_local_operations.h b/src/mongo/db/repl/roll_back_local_operations.h new file mode 100644 index 00000000000..4a9d5b71cd8 --- /dev/null +++ b/src/mongo/db/repl/roll_back_local_operations.h @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/db/record_id.h" +#include "mongo/db/repl/oplog_interface.h" +#include "mongo/stdx/functional.h" + +namespace mongo { +namespace repl { + + class RollBackLocalOperations { + MONGO_DISALLOW_COPYING(RollBackLocalOperations); + public: + + /** + * Type of function to roll back an operation or process it for future use. + * It can return any status except ErrorCodes::NoSuchKey. See onRemoteOperation(). 
+ */ + using RollbackOperationFn = stdx::function<Status (const BSONObj&)>; + + using RollbackCommonPoint = std::pair<Timestamp, RecordId>; + + /** + * Initializes rollback processor with a valid local oplog. + * Whenever we encounter an operation in the local oplog that has to be rolled back, + * we will pass it to 'rollbackOperation'. + */ + RollBackLocalOperations(const OplogInterface& localOplog, + const RollbackOperationFn& rollbackOperation); + + virtual ~RollBackLocalOperations() = default; + + /** + * Process single remote operation. + * Returns ErrorCodes::NoSuchKey if common point has not been found and + * additional operations have to be read from the remote oplog. + */ + StatusWith<RollbackCommonPoint> onRemoteOperation(const BSONObj& operation); + + private: + + std::unique_ptr<OplogInterface::Iterator> _localOplogIterator; + RollbackOperationFn _rollbackOperation; + OplogInterface::Iterator::Value _localOplogValue; + unsigned long long _scanned; + + }; + + /** + * Rolls back every operation in the local oplog that is not in the remote oplog, in reverse + * order. + * + * Whenever we encounter an operation in the local oplog that has to be rolled back, + * we will pass it to 'rollbackOperation' starting with the most recent operation. + * It is up to 'rollbackOperation' to roll back this operation immediately or + * process it for future use. + */ + StatusWith<RollBackLocalOperations::RollbackCommonPoint> syncRollBackLocalOperations( + const OplogInterface& localOplog, + const OplogInterface& remoteOplog, + const RollBackLocalOperations::RollbackOperationFn& rollbackOperation); + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/roll_back_local_operations_test.cpp b/src/mongo/db/repl/roll_back_local_operations_test.cpp new file mode 100644 index 00000000000..9fae8ce648c --- /dev/null +++ b/src/mongo/db/repl/roll_back_local_operations_test.cpp @@ -0,0 +1,432 @@ +/** + * Copyright 2015 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/platform/basic.h" + +#include <iterator> + +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/oplog_interface_mock.h" +#include "mongo/db/repl/roll_back_local_operations.h" +#include "mongo/unittest/unittest.h" + +namespace { + + using namespace mongo; + using namespace mongo::repl; + + const OplogInterfaceMock::Operations kEmptyMockOperations; + + BSONObj makeOp(int seconds, long long hash) { + return BSON("ts" << Timestamp(Seconds(seconds), 0) << "h" << hash); + } + + int recordId = 0; + OplogInterfaceMock::Operation makeOpAndRecordId(int seconds, long long hash) { + return std::make_pair(makeOp(seconds, hash), RecordId(++recordId)); + } + + TEST(RollBackLocalOperationsTest, InvalidLocalOplogIterator) { + class InvalidOplogInterface : public OplogInterface { + public: + std::string toString() const override { return ""; } + std::unique_ptr<Iterator> makeIterator() const override { + return std::unique_ptr<Iterator>(); + } + } invalidOplog; + ASSERT_THROWS_CODE( + RollBackLocalOperations(invalidOplog, [](const BSONObj&) { return Status::OK(); }), + UserException, + ErrorCodes::BadValue); + } + + TEST(RollBackLocalOperationsTest, InvalidRollbackOperationFunction) { + ASSERT_THROWS_CODE( + RollBackLocalOperations(OplogInterfaceMock({makeOpAndRecordId(1, 0)}), + RollBackLocalOperations::RollbackOperationFn()), + UserException, + ErrorCodes::BadValue); + } + + TEST(RollBackLocalOperationsTest, EmptyLocalOplog) { + OplogInterfaceMock localOplog(kEmptyMockOperations); + RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); }); + auto result = finder.onRemoteOperation(makeOp(1, 0)); + ASSERT_EQUALS(ErrorCodes::OplogStartMissing, result.getStatus().code()); + } + + TEST(RollBackLocalOperationsTest, RollbackPeriodTooLong) { + OplogInterfaceMock localOplog({makeOpAndRecordId(1802, 0)}); + RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); }); + auto result = 
finder.onRemoteOperation(makeOp(1, 0)); + ASSERT_EQUALS(ErrorCodes::ExceededTimeLimit, result.getStatus().code()); + } + + TEST(RollBackLocalOperationsTest, RollbackMultipleLocalOperations) { + auto commonOperation = makeOpAndRecordId(1, 1); + OplogInterfaceMock::Operations localOperations({ + makeOpAndRecordId(5, 1), + makeOpAndRecordId(4, 1), + makeOpAndRecordId(3, 1), + makeOpAndRecordId(2, 1), + commonOperation, + }); + OplogInterfaceMock localOplog(localOperations); + auto i = localOperations.cbegin(); + auto rollbackOperation = [&](const BSONObj& operation) { + ASSERT_EQUALS(i->first, operation); + i++; + return Status::OK(); + }; + RollBackLocalOperations finder(localOplog, rollbackOperation); + auto result = finder.onRemoteOperation(commonOperation.first); + ASSERT_OK(result.getStatus()); + ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(commonOperation.second, result.getValue().second); + ASSERT_FALSE(i == localOperations.cend()); + ASSERT_EQUALS(commonOperation.first, i->first); + i++; + ASSERT_TRUE(i == localOperations.cend()); + } + + TEST(RollBackLocalOperationsTest, RollbackOperationFailed) { + auto commonOperation = makeOpAndRecordId(1, 1); + OplogInterfaceMock::Operations localOperations({ + makeOpAndRecordId(2, 1), + commonOperation, + }); + OplogInterfaceMock localOplog(localOperations); + auto rollbackOperation = [&](const BSONObj& operation) { + return Status(ErrorCodes::OperationFailed, ""); + }; + RollBackLocalOperations finder(localOplog, rollbackOperation); + auto result = finder.onRemoteOperation(commonOperation.first); + ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code()); + } + + TEST(RollBackLocalOperationsTest, EndOfLocalOplog) { + auto commonOperation = makeOpAndRecordId(1, 1); + OplogInterfaceMock::Operations localOperations({ + makeOpAndRecordId(2, 1), + }); + OplogInterfaceMock localOplog(localOperations); + RollBackLocalOperations finder(localOplog, [](const 
BSONObj&) { return Status::OK(); }); + auto result = finder.onRemoteOperation(commonOperation.first); + ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); + } + + TEST(RollBackLocalOperationsTest, SkipRemoteOperations) { + auto commonOperation = makeOpAndRecordId(1, 1); + OplogInterfaceMock::Operations localOperations({ + makeOpAndRecordId(5, 1), + makeOpAndRecordId(4, 1), + makeOpAndRecordId(2, 1), + commonOperation, + }); + OplogInterfaceMock localOplog(localOperations); + auto i = localOperations.cbegin(); + auto rollbackOperation = [&](const BSONObj& operation) { + ASSERT_EQUALS(i->first, operation); + i++; + return Status::OK(); + }; + RollBackLocalOperations finder(localOplog, rollbackOperation); + { + auto result = finder.onRemoteOperation(makeOp(6,1)); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); + ASSERT_TRUE(i == localOperations.cbegin()); + } + { + auto result = finder.onRemoteOperation(makeOp(3,1)); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); + ASSERT_TRUE(std::distance(localOperations.cbegin(), i) == 2); + } + auto result = finder.onRemoteOperation(commonOperation.first); + ASSERT_OK(result.getStatus()); + ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(commonOperation.second, result.getValue().second); + ASSERT_FALSE(i == localOperations.cend()); + ASSERT_EQUALS(commonOperation.first, i->first); + i++; + ASSERT_TRUE(i == localOperations.cend()); + } + + TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashess) { + auto commonOperation = makeOpAndRecordId(1, 1); + OplogInterfaceMock::Operations localOperations({ + makeOpAndRecordId(1, 5), + makeOpAndRecordId(1, 3), + commonOperation, + }); + OplogInterfaceMock localOplog(localOperations); + auto i = localOperations.cbegin(); + auto rollbackOperation = [&](const BSONObj& operation) { + ASSERT_EQUALS(i->first, operation); + i++; + return Status::OK(); + }; + RollBackLocalOperations 
finder(localOplog, rollbackOperation); + { + auto result = finder.onRemoteOperation(makeOp(1,4)); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); + ASSERT_TRUE(std::distance(localOperations.cbegin(), i) == 1); + } + { + auto result = finder.onRemoteOperation(makeOp(1,2)); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); + ASSERT_TRUE(std::distance(localOperations.cbegin(), i) == 2); + } + auto result = finder.onRemoteOperation(commonOperation.first); + ASSERT_OK(result.getStatus()); + ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(commonOperation.second, result.getValue().second); + ASSERT_FALSE(i == localOperations.cend()); + ASSERT_EQUALS(commonOperation.first, i->first); + i++; + ASSERT_TRUE(i == localOperations.cend()); + } + + TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashesRollbackOperationFailed) { + auto commonOperation = makeOpAndRecordId(1, 1); + OplogInterfaceMock::Operations localOperations({ + makeOpAndRecordId(1, 3), + commonOperation, + }); + OplogInterfaceMock localOplog(localOperations); + auto rollbackOperation = [&](const BSONObj& operation) { + return Status(ErrorCodes::OperationFailed, ""); + }; + RollBackLocalOperations finder(localOplog, rollbackOperation); + auto result = finder.onRemoteOperation(makeOp(1,2)); + ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code()); + } + + TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashesEndOfLocalOplog) { + OplogInterfaceMock::Operations localOperations({ + makeOpAndRecordId(1, 3), + }); + OplogInterfaceMock localOplog(localOperations); + RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); }); + auto result = finder.onRemoteOperation(makeOp(1,2)); + ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); + } + + TEST(SyncRollBackLocalOperationsTest, OplogStartMissing) { + ASSERT_EQUALS( + ErrorCodes::OplogStartMissing, + 
syncRollBackLocalOperations( + OplogInterfaceMock(kEmptyMockOperations), + OplogInterfaceMock({makeOpAndRecordId(1, 0)}), + [](const BSONObj&) { return Status::OK(); }).getStatus().code()); + } + + TEST(SyncRollBackLocalOperationsTest, RemoteOplogMissing) { + ASSERT_EQUALS( + ErrorCodes::InvalidSyncSource, + syncRollBackLocalOperations( + OplogInterfaceMock({makeOpAndRecordId(1, 0)}), + OplogInterfaceMock(kEmptyMockOperations), + [](const BSONObj&) { return Status::OK(); }).getStatus().code()); + } + + TEST(SyncRollBackLocalOperationsTest, RollbackPeriodTooLong) { + ASSERT_EQUALS( + ErrorCodes::ExceededTimeLimit, + syncRollBackLocalOperations( + OplogInterfaceMock({makeOpAndRecordId(1802, 0)}), + OplogInterfaceMock({makeOpAndRecordId(1, 0)}), + [](const BSONObj&) { return Status::OK(); }).getStatus().code()); + } + + TEST(SyncRollBackLocalOperationsTest, RollbackTwoOperations) { + auto commonOperation = makeOpAndRecordId(1, 1); + OplogInterfaceMock::Operations localOperations({ + makeOpAndRecordId(3, 1), + makeOpAndRecordId(2, 1), + commonOperation, + }); + auto i = localOperations.cbegin(); + auto result = + syncRollBackLocalOperations( + OplogInterfaceMock(localOperations), + OplogInterfaceMock({commonOperation}), + [&](const BSONObj& operation) { + ASSERT_EQUALS(i->first, operation); + i++; + return Status::OK(); + }); + ASSERT_OK(result.getStatus()); + ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(commonOperation.second, result.getValue().second); + ASSERT_FALSE(i == localOperations.cend()); + ASSERT_EQUALS(commonOperation.first, i->first); + i++; + ASSERT_TRUE(i == localOperations.cend()); + } + + TEST(SyncRollBackLocalOperationsTest, SkipOneRemoteOperation) { + auto commonOperation = makeOpAndRecordId(1, 1); + auto remoteOperation = makeOpAndRecordId(2, 1); + auto result = + syncRollBackLocalOperations( + OplogInterfaceMock({commonOperation}), + OplogInterfaceMock({remoteOperation, commonOperation}), + 
[&](const BSONObj& operation) { + FAIL("should not reach here"); + return Status::OK(); + }); + ASSERT_OK(result.getStatus()); + ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(commonOperation.second, result.getValue().second); + } + + TEST(SyncRollBackLocalOperationsTest, SameTimestampDifferentHashes) { + auto commonOperation = makeOpAndRecordId(1, 1); + auto localOperation = makeOpAndRecordId(1, 2); + auto remoteOperation = makeOpAndRecordId(1, 3); + bool called = false; + auto result = + syncRollBackLocalOperations( + OplogInterfaceMock({localOperation, commonOperation}), + OplogInterfaceMock({remoteOperation, commonOperation}), + [&](const BSONObj& operation) { + ASSERT_EQUALS(localOperation.first, operation); + called = true; + return Status::OK(); + }); + ASSERT_OK(result.getStatus()); + ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first); + ASSERT_EQUALS(commonOperation.second, result.getValue().second); + ASSERT_TRUE(called); + } + + TEST(SyncRollBackLocalOperationsTest, SameTimestampEndOfLocalOplog) { + auto commonOperation = makeOpAndRecordId(1, 1); + auto localOperation = makeOpAndRecordId(1, 2); + auto remoteOperation = makeOpAndRecordId(1, 3); + bool called = false; + auto result = + syncRollBackLocalOperations( + OplogInterfaceMock({localOperation}), + OplogInterfaceMock({remoteOperation, commonOperation}), + [&](const BSONObj& operation) { + ASSERT_EQUALS(localOperation.first, operation); + called = true; + return Status::OK(); + }); + ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); + ASSERT_STRING_CONTAINS(result.getStatus().reason(), + "RS101 reached beginning of local oplog [1]"); + ASSERT_TRUE(called); + } + + TEST(SyncRollBackLocalOperationsTest, SameTimestampRollbackOperationFailed) { + auto commonOperation = makeOpAndRecordId(1, 1); + auto localOperation = makeOpAndRecordId(1, 2); + auto remoteOperation = makeOpAndRecordId(1, 3); + auto 
result = + syncRollBackLocalOperations( + OplogInterfaceMock({localOperation, commonOperation}), + OplogInterfaceMock({remoteOperation, commonOperation}), + [&](const BSONObj& operation) { + return Status(ErrorCodes::OperationFailed, ""); + }); + ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code()); + } + + TEST(SyncRollBackLocalOperationsTest, SameTimestampEndOfRemoteOplog) { + auto commonOperation = makeOpAndRecordId(1, 1); + auto localOperation = makeOpAndRecordId(1, 2); + auto remoteOperation = makeOpAndRecordId(1, 3); + bool called = false; + auto result = + syncRollBackLocalOperations( + OplogInterfaceMock({localOperation, commonOperation}), + OplogInterfaceMock({remoteOperation}), + [&](const BSONObj& operation) { + ASSERT_EQUALS(localOperation.first, operation); + called = true; + return Status::OK(); + }); + ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); + ASSERT_STRING_CONTAINS(result.getStatus().reason(), + "RS100 reached beginning of remote oplog"); + ASSERT_TRUE(called); + } + + TEST(SyncRollBackLocalOperationsTest, DifferentTimestampEndOfLocalOplog) { + auto commonOperation = makeOpAndRecordId(1, 1); + auto localOperation = makeOpAndRecordId(3, 1); + auto remoteOperation = makeOpAndRecordId(2, 1); + bool called = false; + auto result = + syncRollBackLocalOperations( + OplogInterfaceMock({localOperation}), + OplogInterfaceMock({remoteOperation, commonOperation}), + [&](const BSONObj& operation) { + ASSERT_EQUALS(localOperation.first, operation); + called = true; + return Status::OK(); + }); + ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); + ASSERT_STRING_CONTAINS(result.getStatus().reason(), + "RS101 reached beginning of local oplog [2]"); + ASSERT_TRUE(called); + } + + TEST(SyncRollBackLocalOperationsTest, DifferentTimestampRollbackOperationFailed) { + auto localOperation = makeOpAndRecordId(3, 1); + auto remoteOperation = makeOpAndRecordId(2, 1); + auto result = + 
syncRollBackLocalOperations( + OplogInterfaceMock({localOperation}), + OplogInterfaceMock({remoteOperation}), + [&](const BSONObj& operation) { + return Status(ErrorCodes::OperationFailed, ""); + }); + ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code()); + } + + TEST(SyncRollBackLocalOperationsTest, DifferentTimestampEndOfRemoteOplog) { + auto commonOperation = makeOpAndRecordId(1, 1); + auto localOperation = makeOpAndRecordId(2, 1); + auto remoteOperation = makeOpAndRecordId(3, 1); + auto result = + syncRollBackLocalOperations( + OplogInterfaceMock({localOperation, commonOperation}), + OplogInterfaceMock({remoteOperation}), + [&](const BSONObj& operation) { + FAIL("Should not reach here"); + return Status::OK(); + }); + ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); + ASSERT_STRING_CONTAINS(result.getStatus().reason(), + "RS100 reached beginning of remote oplog [1]"); + } + +} // namespace diff --git a/src/mongo/db/repl/rollback_source.h b/src/mongo/db/repl/rollback_source.h new file mode 100644 index 00000000000..304a81717dc --- /dev/null +++ b/src/mongo/db/repl/rollback_source.h @@ -0,0 +1,90 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. 
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status_with.h"
+#include "mongo/db/jsobj.h"
+
+namespace mongo {
+
+    class NamespaceString;
+    class OperationContext;
+
+namespace repl {
+
+    class OplogInterface;
+
+    /**
+     * Interface for rollback-related operations on the sync source.
+     *
+     * Every operation is pure virtual and const-qualified; the class itself holds no
+     * state and is non-copyable.
+     */
+    class RollbackSource {
+        MONGO_DISALLOW_COPYING(RollbackSource);
+    public:
+
+        RollbackSource() = default;
+
+        virtual ~RollbackSource() = default;
+
+        /**
+         * Returns remote oplog interface.
+         * Read oplog entries with OplogInterface::makeIterator().
+         */
+        virtual const OplogInterface& getOplog() const = 0;
+
+        /**
+         * Returns rollback ID.
+         */
+        virtual int getRollbackId() const = 0;
+
+        /**
+         * Returns last operation in oplog.
+         */
+        virtual BSONObj getLastOperation() const = 0;
+
+        /**
+         * Fetch a single document from the sync source.
+         * 'nss' names the collection to read; 'filter' selects the document.
+         */
+        virtual BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const = 0;
+
+        /**
+         * Clones a single collection from the sync source.
+         * 'nss' names the collection to copy.
+         */
+        virtual void copyCollectionFromRemote(OperationContext* txn,
+                                              const NamespaceString& nss) const = 0;
+
+        /**
+         * Returns collection info.
+         * Failure to obtain the info for 'nss' is reported through the returned
+         * StatusWith rather than by return value alone.
+         */
+        virtual StatusWith<BSONObj> getCollectionInfo(const NamespaceString& nss) const = 0;
+
+    };
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/rollback_source_impl.cpp b/src/mongo/db/repl/rollback_source_impl.cpp
new file mode 100644
index 00000000000..bdda5e331d9
--- /dev/null
+++ b/src/mongo/db/repl/rollback_source_impl.cpp
@@ -0,0 +1,98 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/rollback_source_impl.h" + +#include "mongo/client/dbclientinterface.h" +#include "mongo/db/cloner.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/repl/oplogreader.h" +#include "mongo/util/mongoutils/str.h" +#include "mongo/util/assert_util.h" + +namespace mongo { +namespace repl { + + RollbackSourceImpl::RollbackSourceImpl(DBClientConnection* conn, + const std::string& collectionName) + : _conn(conn), + _collectionName(collectionName), + _oplog(conn, collectionName) { } + + const OplogInterface& RollbackSourceImpl::getOplog() const { + return _oplog; + } + + int RollbackSourceImpl::getRollbackId() const { + bo info; + _conn->simpleCommand("admin", &info, "replSetGetRBID"); + return info["rbid"].numberInt(); + } + + BSONObj RollbackSourceImpl::getLastOperation() const { + const Query query = Query().sort(BSON("$natural" << -1)); + return _conn->findOne(_collectionName, query, 0, QueryOption_SlaveOk); + } + + BSONObj RollbackSourceImpl::findOne(const NamespaceString& nss, const BSONObj& filter) const { + return _conn->findOne(nss.toString(), filter, NULL, QueryOption_SlaveOk).getOwned(); + } + + void RollbackSourceImpl::copyCollectionFromRemote(OperationContext* txn, + const NamespaceString& nss) const { + std::string errmsg; + std::unique_ptr<DBClientConnection> tmpConn(new DBClientConnection()); + uassert(15908, + errmsg, + tmpConn->connect(_conn->getServerHostAndPort(), errmsg) && + replAuthenticate(tmpConn.get())); + + // cloner owns _conn in unique_ptr + Cloner cloner; + cloner.setConnection(tmpConn.release()); + uassert(15909, str::stream() << + "replSet rollback error resyncing collection " << nss.ns() << ' ' << errmsg, + cloner.copyCollection(txn, nss.ns(), BSONObj(), errmsg, true, false, true)); + } + + StatusWith<BSONObj> RollbackSourceImpl::getCollectionInfo(const NamespaceString& nss) const { + std::list<BSONObj> info = + 
_conn->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); + if (info.empty()) { + return StatusWith<BSONObj>(ErrorCodes::NoSuchKey, str::stream() << + "no collection info found: " << nss.ns()); + } + invariant(info.size() == 1U); + return info.front(); + } + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/rollback_source_impl.h b/src/mongo/db/repl/rollback_source_impl.h new file mode 100644 index 00000000000..3be6ef4339b --- /dev/null +++ b/src/mongo/db/repl/rollback_source_impl.h @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. 
If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "mongo/db/repl/oplog_interface_remote.h"
+#include "mongo/db/repl/rollback_source.h"
+
+namespace mongo {
+
+    class DBClientConnection;
+
+namespace repl {
+
+    /**
+     * Rollback source implementation using a connection.
+     *
+     * The connection is held as a raw pointer and this class declares no destructor,
+     * so the connection is not released here.
+     */
+
+    class RollbackSourceImpl : public RollbackSource {
+    public:
+
+        /**
+         * 'conn' is the connection used for every operation; 'collectionName' is the
+         * namespace used for the oplog interface and getLastOperation().
+         */
+        explicit RollbackSourceImpl(DBClientConnection* conn, const std::string& collectionName);
+
+        // Returns the remote oplog interface member.
+        const OplogInterface& getOplog() const override;
+
+        int getRollbackId() const override;
+
+        BSONObj getLastOperation() const override;
+
+        BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override;
+
+        void copyCollectionFromRemote(OperationContext* txn,
+                                      const NamespaceString& nss) const override;
+
+        StatusWith<BSONObj> getCollectionInfo(const NamespaceString& nss) const override;
+
+    private:
+
+        // Connection to the sync source, as passed to the constructor.
+        DBClientConnection* _conn;
+
+        // Collection name passed to the constructor.
+        std::string _collectionName;
+
+        // Remote oplog interface declared over the same connection and collection name.
+        OplogInterfaceRemote _oplog;
+
+    };
+
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 3ceda67ffec..0822cf934fd 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -34,6 +34,7 @@ #include "mongo/db/repl/rs_rollback.h" #include <boost/shared_ptr.hpp> +#include <memory> #include "mongo/db/auth/authorization_manager_global.h" #include "mongo/db/auth/authorization_manager.h" @@ -41,13 +42,11 @@ #include "mongo/db/catalog/collection_catalog_entry.h" #include "mongo/db/catalog/document_validation.h" #include "mongo/db/client.h" -#include "mongo/db/cloner.h" #include "mongo/db/commands.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/dbhelpers.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/db_raii.h" -#include "mongo/db/operation_context_impl.h" #include
"mongo/db/ops/delete.h" #include "mongo/db/ops/update.h" #include "mongo/db/ops/update_lifecycle_impl.h" @@ -56,9 +55,11 @@ #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/oplogreader.h" +#include "mongo/db/repl/oplog_interface.h" #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/replication_coordinator_impl.h" +#include "mongo/db/repl/roll_back_local_operations.h" +#include "mongo/db/repl/rollback_source.h" #include "mongo/db/repl/rslog.h" #include "mongo/util/log.h" @@ -159,18 +160,10 @@ namespace { }; - /** helper to get rollback id from another server. */ - int getRBID(DBClientConnection *c) { - bo info; - c->simpleCommand("admin", &info, "replSetGetRBID"); - return info["rbid"].numberInt(); - } - - - void refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) { + Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) { const char* op = ourObj.getStringField("op"); if (*op == 'n') - return; + return Status::OK(); if (ourObj.objsize() > 512 * 1024 * 1024) throw RSFatalException("rollback too large"); @@ -181,13 +174,13 @@ namespace { if (*doc.ns == '\0') { warning() << "ignoring op on rollback no ns TODO : " << doc.ownedObj.toString(); - return; + return Status::OK(); } BSONObj obj = doc.ownedObj.getObjectField(*op=='u' ? "o2" : "o"); if (obj.isEmpty()) { warning() << "ignoring op on rollback : " << doc.ownedObj.toString(); - return; + return Status::OK(); } if (*op == 'c') { @@ -197,19 +190,21 @@ namespace { Command* cmd = Command::findCommand(cmdname.c_str()); if (cmd == NULL) { severe() << "rollback no such command " << first.fieldName(); - fassertFailedNoTrace(18751); + return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << + "rollback no such command " << first.fieldName(), + 18751); } if (cmdname == "create") { // Create collection operation // { ts: ..., h: ..., op: "c", ns: "foo.$cmd", o: { create: "abc", ... 
} } string ns = nss.db().toString() + '.' + obj["create"].String(); // -> foo.abc fixUpInfo.toDrop.insert(ns); - return; + return Status::OK(); } else if (cmdname == "drop") { string ns = nss.db().toString() + '.' + first.valuestr(); fixUpInfo.collectionsToResyncData.insert(ns); - return; + return Status::OK(); } else if (cmdname == "dropIndexes" || cmdname == "deleteIndexes") { // TODO: this is bad. we simply full resync the collection here, @@ -218,7 +213,7 @@ namespace { << "mongod"; string ns = nss.db().toString() + '.' + first.valuestr(); fixUpInfo.collectionsToResyncData.insert(ns); - return; + return Status::OK(); } else if (cmdname == "renameCollection") { // TODO: slow. @@ -228,7 +223,7 @@ namespace { string to = obj["to"].String(); fixUpInfo.collectionsToResyncData.insert(from); fixUpInfo.collectionsToResyncData.insert(to); - return; + return Status::OK(); } else if (cmdname == "dropDatabase") { severe() << "rollback : can't rollback drop database full resync " @@ -267,144 +262,18 @@ namespace { if (doc._id.eoo()) { warning() << "ignoring op on rollback no _id TODO : " << doc.ns << ' ' << doc.ownedObj.toString(); - return; + return Status::OK(); } fixUpInfo.toRefetch.insert(doc); + return Status::OK(); } - StatusWith<FixUpInfo> syncRollbackFindCommonPoint(OperationContext* txn, - DBClientConnection* them) { - OldClientContext ctx(txn, rsOplogName); - FixUpInfo fixUpInfo; - - boost::scoped_ptr<PlanExecutor> exec( - InternalPlanner::collectionScan(txn, - rsOplogName, - ctx.db()->getCollection(rsOplogName), - InternalPlanner::BACKWARD)); - - BSONObj ourObj; - RecordId ourLoc; - - if (PlanExecutor::ADVANCED != exec->getNext(&ourObj, &ourLoc)) { - return StatusWith<FixUpInfo>(ErrorCodes::OplogStartMissing, "no oplog during initsync"); - } - - const Query query = Query().sort(reverseNaturalObj); - const BSONObj fields = BSON("ts" << 1 << "h" << 1); - - //auto_ptr<DBClientCursor> u = us->query(rsOplogName, query, 0, 0, &fields, 0, 0); - - fixUpInfo.rbid = 
getRBID(them); - auto_ptr<DBClientCursor> oplogCursor = them->query(rsOplogName, query, 0, 0, &fields, 0, 0); - - if (oplogCursor.get() == NULL || !oplogCursor->more()) - throw RSFatalException("remote oplog empty or unreadable"); - - Timestamp ourTime = ourObj["ts"].timestamp(); - BSONObj theirObj = oplogCursor->nextSafe(); - Timestamp theirTime = theirObj["ts"].timestamp(); - - long long diff = static_cast<long long>(ourTime.getSecs()) - - static_cast<long long>(theirTime.getSecs()); - // diff could be positive, negative, or zero - log() << "rollback our last optime: " << ourTime.toStringPretty(); - log() << "rollback their last optime: " << theirTime.toStringPretty(); - log() << "rollback diff in end of log times: " << diff << " seconds"; - if (diff > 1800) { - severe() << "rollback too long a time period for a rollback."; - throw RSFatalException("rollback error: not willing to roll back " - "more than 30 minutes of data"); - } - - unsigned long long scanned = 0; - while (1) { - scanned++; - // todo add code to assure no excessive scanning for too long - if (ourTime == theirTime) { - if (ourObj["h"].Long() == theirObj["h"].Long()) { - // found the point back in time where we match. - // todo : check a few more just to be careful about hash collisions. 
- log() << "rollback found matching events at " - << ourTime.toStringPretty(); - log() << "rollback findcommonpoint scanned : " << scanned; - fixUpInfo.commonPoint = ourTime; - fixUpInfo.commonPointOurDiskloc = ourLoc; - break; - } - - refetch(fixUpInfo, ourObj); - - if (!oplogCursor->more()) { - severe() << "rollback error RS100 reached beginning of remote oplog"; - log() << " them: " << them->toString() << " scanned: " << scanned; - log() << " theirTime: " << theirTime.toStringLong(); - log() << " ourTime: " << ourTime.toStringLong(); - throw RSFatalException("RS100 reached beginning of remote oplog [2]"); - } - theirObj = oplogCursor->nextSafe(); - theirTime = theirObj["ts"].timestamp(); - - if (PlanExecutor::ADVANCED != exec->getNext(&ourObj, &ourLoc)) { - severe() << "rollback error RS101 reached beginning of local oplog"; - log() << " them: " << them->toString() << " scanned: " << scanned; - log() << " theirTime: " << theirTime.toStringLong(); - log() << " ourTime: " << ourTime.toStringLong(); - throw RSFatalException("RS101 reached beginning of local oplog [1]"); - } - ourTime = ourObj["ts"].timestamp(); - } - else if (theirTime > ourTime) { - if (!oplogCursor->more()) { - severe() << "rollback error RS100 reached beginning of remote oplog"; - log() << " them: " << them->toString() << " scanned: " - << scanned; - log() << " theirTime: " << theirTime.toStringLong(); - log() << " ourTime: " << ourTime.toStringLong(); - throw RSFatalException("RS100 reached beginning of remote oplog [1]"); - } - theirObj = oplogCursor->nextSafe(); - theirTime = theirObj["ts"].timestamp(); - } - else { - // theirTime < ourTime - refetch(fixUpInfo, ourObj); - if (PlanExecutor::ADVANCED != exec->getNext(&ourObj, &ourLoc)) { - severe() << "rollback error RS101 reached beginning of local oplog"; - log() << " them: " << them->toString() << " scanned: " << scanned; - log() << " theirTime: " << theirTime.toStringLong(); - log() << " ourTime: " << ourTime.toStringLong(); - throw 
RSFatalException("RS101 reached beginning of local oplog [2]"); - } - ourTime = ourObj["ts"].timestamp(); - } - } - - return StatusWith<FixUpInfo>(fixUpInfo); - } - - bool copyCollectionFromRemote(OperationContext* txn, - const string& host, - const string& ns, - string& errmsg) { - Cloner cloner; - - DBClientConnection *tmpConn = new DBClientConnection(); - // cloner owns _conn in auto_ptr - cloner.setConnection(tmpConn); - uassert(15908, errmsg, - tmpConn->connect(HostAndPort(host), errmsg) && replAuthenticate(tmpConn)); - - return cloner.copyCollection(txn, ns, BSONObj(), errmsg, true, false, true); - } void syncFixUp(OperationContext* txn, FixUpInfo& fixUpInfo, - OplogReader* oplogreader, + const RollbackSource& rollbackSource, ReplicationCoordinator* replCoord) { - DBClientConnection* them = oplogreader->conn(); - // fetch all first so we needn't handle interruption in a fancy way unsigned long long totalSize = 0; @@ -427,8 +296,7 @@ namespace { { // TODO : slow. lots of round trips. numFetched++; - BSONObj good = them->findOne(doc.ns, doc._id.wrap(), - NULL, QueryOption_SlaveOk).getOwned(); + BSONObj good = rollbackSource.findOne(NamespaceString(doc.ns), doc._id.wrap()); totalSize += good.objsize(); uassert(13410, "replSet too much data to roll back", totalSize < 300 * 1024 * 1024); @@ -437,13 +305,13 @@ namespace { goodVersions.push_back(pair<DocID, BSONObj>(doc,good)); } } - newMinValid = oplogreader->getLastOp(rsOplogName); + newMinValid = rollbackSource.getLastOperation(); if (newMinValid.isEmpty()) { error() << "rollback error newMinValid empty?"; return; } } - catch (DBException& e) { + catch (const DBException& e) { LOG(1) << "rollback re-get objects: " << e.toString(); error() << "rollback couldn't re-get ns:" << doc.ns << " _id:" << doc._id << ' ' << numFetched << '/' << fixUpInfo.toRefetch.size(); @@ -451,9 +319,10 @@ namespace { } log() << "rollback 3.5"; - if (fixUpInfo.rbid != getRBID(oplogreader->conn())) { - // our source rolled back itself. 
so the data we received isn't necessarily consistent. - warning() << "rollback rbid on source changed during rollback, cancelling this attempt"; + if (fixUpInfo.rbid != rollbackSource.getRollbackId()) { + // Our source rolled back itself so the data we received isn't necessarily consistent. + warning() << "rollback rbid on source changed during rollback, " + << "cancelling this attempt"; return; } @@ -492,8 +361,6 @@ namespace { } { - string errmsg; - // This comes as a GlobalWrite lock, so there is no DB to be acquired after // resume, so we can skip the DB stability checks. Also // copyCollectionFromRemote will acquire its own database pointer, under the @@ -501,9 +368,7 @@ namespace { invariant(txn->lockState()->isW()); Lock::TempRelease release(txn->lockState()); - bool ok = copyCollectionFromRemote(txn, them->getServerAddress(), ns, errmsg); - uassert(15909, str::stream() << "replSet rollback error resyncing collection " - << ns << ' ' << errmsg, ok); + rollbackSource.copyCollectionFromRemote(txn, nss); } } @@ -517,30 +382,28 @@ namespace { invariant(collection); auto cce = collection->getCatalogEntry(); - const std::list<BSONObj> info = - them->getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); + auto infoResult = rollbackSource.getCollectionInfo(nss); - if (info.empty()) { + if (!infoResult.isOK()) { // Collection dropped by "them" so we should drop it too. 
log() << ns << " not found on remote host, dropping"; fixUpInfo.toDrop.insert(ns); continue; } - invariant(info.size() == 1); - + auto info = infoResult.getValue(); CollectionOptions options; - if (auto optionsField = info.front()["options"]) { + if (auto optionsField = info["options"]) { if (optionsField.type() != Object) { throw RSFatalException(str::stream() << "Failed to parse options " - << info.front() << ": expected 'options' to be an " + << info << ": expected 'options' to be an " << "Object, got " << typeName(optionsField.type())); } auto status = options.parse(optionsField.Obj()); if (!status.isOK()) { throw RSFatalException(str::stream() << "Failed to parse options " - << info.front() << ": " + << info << ": " << status.toString()); } } @@ -567,7 +430,7 @@ namespace { string err; try { - newMinValid = oplogreader->getLastOp(rsOplogName); + newMinValid = rollbackSource.getLastOperation(); if (newMinValid.isEmpty()) { err = "can't get minvalid from sync source"; } @@ -577,11 +440,11 @@ namespace { setMinValid(txn, minValid); } } - catch (DBException& e) { + catch (const DBException& e) { err = "can't get/set minvalid: "; err += e.what(); } - if (fixUpInfo.rbid != getRBID(oplogreader->conn())) { + if (fixUpInfo.rbid != rollbackSource.getRollbackId()) { // our source rolled back itself. so the data we received isn't necessarily // consistent. however, we've now done writes. thus we have a problem. err += "rbid at primary changed during resync/rollback"; @@ -723,7 +586,7 @@ namespace { try { collection->temp_cappedTruncateAfter(txn, loc, true); } - catch (DBException& e) { + catch (const DBException& e) { if (e.getCode() == 13415) { // hack: need to just make cappedTruncate do this... 
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { @@ -741,7 +604,7 @@ namespace { } } } - catch (DBException& e) { + catch (const DBException& e) { error() << "rolling back capped collection rec " << doc.ns << ' ' << e.toString(); } @@ -759,17 +622,16 @@ namespace { // exists on the source. if (collection->numRecords(txn) == 0) { try { - std::list<BSONObj> lst = - them->getCollectionInfos( ctx.db()->name(), - BSON( "name" << nsToCollectionSubstring( doc.ns ) ) ); - if (lst.empty()) { + NamespaceString nss(doc.ns); + auto infoResult = rollbackSource.getCollectionInfo(nss); + if (!infoResult.isOK()) { // we should drop WriteUnitOfWork wunit(txn); ctx.db()->dropCollection(txn, doc.ns); wunit.commit(); } } - catch (DBException&) { + catch (const DBException&) { // this isn't *that* big a deal, but is bad. warning() << "rollback error querying for existence of " << doc.ns << " at the primary, ignoring"; @@ -796,7 +658,7 @@ namespace { } } - catch (DBException& e) { + catch (const DBException& e) { log() << "exception in rollback ns:" << doc.ns << ' ' << pattern.toString() << ' ' << e.toString() << " ndeletes:" << deletes; warn = true; @@ -831,17 +693,22 @@ namespace { log() << "rollback done"; } - unsigned _syncRollback(OperationContext* txn, - OplogReader* oplogreader, - ReplicationCoordinator* replCoord) { + Status _syncRollback(OperationContext* txn, + const OplogInterface& localOplog, + const RollbackSource& rollbackSource, + ReplicationCoordinator* replCoord, + const SleepSecondsFn& sleepSecondsFn, + Milliseconds globalWriteLockTimeoutMs) { invariant(!txn->lockState()->isLocked()); log() << "rollback 0"; - Lock::GlobalWrite globalWrite(txn->lockState(), 20000); - if (!globalWrite.isLocked()) { - warning() << "rollback couldn't get write lock in a reasonable time"; - return 2; + std::unique_ptr<Lock::GlobalWrite> globalWrite( + new Lock::GlobalWrite(txn->lockState(), globalWriteLockTimeoutMs.count())); + if (!globalWrite->isLocked()) { + sleepSecondsFn(Seconds(2)); + return 
Status(ErrorCodes::LockTimeout, + "rollback couldn't get write lock in a reasonable time"); } /** by doing this, we will not service reads (return an error as we aren't in secondary @@ -851,45 +718,56 @@ namespace { * also, this is better for status reporting - we know what is happening. */ if (!replCoord->setFollowerMode(MemberState::RS_ROLLBACK)) { - warning() << "Cannot transition from " << replCoord->getMemberState() << - " to " << MemberState(MemberState::RS_ROLLBACK); - return 0; + return Status(ErrorCodes::OperationFailed, str::stream() << + "Cannot transition from " << replCoord->getMemberState().toString() << + " to " << MemberState(MemberState::RS_ROLLBACK).toString()); } FixUpInfo how; log() << "rollback 1"; + how.rbid = rollbackSource.getRollbackId(); { - oplogreader->resetCursor(); - log() << "rollback 2 FindCommonPoint"; try { - StatusWith<FixUpInfo> res = syncRollbackFindCommonPoint(txn, oplogreader->conn()); + auto processOperationForFixUp = [&how](const BSONObj& operation) { + return refetch(how, operation); + }; + auto res = syncRollBackLocalOperations( + localOplog, + rollbackSource.getOplog(), + processOperationForFixUp); if (!res.isOK()) { switch (res.getStatus().code()) { case ErrorCodes::OplogStartMissing: - return 1; + case ErrorCodes::UnrecoverableRollbackError: + globalWrite.reset(); + sleepSecondsFn(Seconds(1)); + return res.getStatus(); default: - throw new RSFatalException(res.getStatus().toString()); + throw RSFatalException(res.getStatus().toString()); } } else { - how = res.getValue(); + how.commonPoint = res.getValue().first; + how.commonPointOurDiskloc = res.getValue().second; } } - catch (RSFatalException& e) { + catch (const RSFatalException& e) { error() << string(e.what()); - fassertFailedNoTrace(18752); - return 2; + return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << + "need to rollback, but unable to determine common point between" + "local and remote oplog: " << e.what(), + 18752); } - catch (DBException& 
e) { + catch (const DBException& e) { warning() << "rollback 2 exception " << e.toString() << "; sleeping 1 min"; // Release the GlobalWrite lock while sleeping. We should always come here with a // GlobalWrite lock invariant(txn->lockState()->isW()); - Lock::TempRelease(txn->lockState()); + globalWrite.reset(); - sleepsecs(60); + sleepSecondsFn(Seconds(60)); throw; } } @@ -898,12 +776,13 @@ namespace { replCoord->incrementRollbackID(); try { - syncFixUp(txn, how, oplogreader, replCoord); + syncFixUp(txn, how, rollbackSource, replCoord); } - catch (RSFatalException& e) { + catch (const RSFatalException& e) { error() << "exception during rollback: " << e.what(); - fassertFailedNoTrace(18753); - return 2; + return Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << + "exception during rollback: " << e.what(), + 18753); } catch (...) { replCoord->incrementRollbackID(); @@ -927,24 +806,35 @@ namespace { "but found self in " << replCoord->getMemberState(); } - return 0; + return Status::OK(); } + + const Milliseconds kDefaultGlobalWriteLockTimeoutMs(20000); + } // namespace - void syncRollback(OperationContext* txn, - const OpTime& lastOpTimeApplied, - OplogReader* oplogreader, - ReplicationCoordinator* replCoord) { + Status syncRollback(OperationContext* txn, + const OpTime& lastOpTimeApplied, + const OplogInterface& localOplog, + const RollbackSource& rollbackSource, + ReplicationCoordinator* replCoord, + const SleepSecondsFn& sleepSecondsFn, + Milliseconds globalWriteLockTimeoutMs) { + + invariant(txn); + invariant(replCoord); + // check that we are at minvalid, otherwise we cannot rollback as we may be in an // inconsistent state { OpTime minvalid = getMinValid(txn); if( minvalid > lastOpTimeApplied ) { severe() << "need to rollback, but in inconsistent state" << endl; - log() << "minvalid: " << minvalid.toString() << " our last optime: " - << lastOpTimeApplied.toString() << endl; - fassertFailedNoTrace(18750); - return; + return 
Status(ErrorCodes::UnrecoverableRollbackError, str::stream() << + "need to rollback, but in inconsistent state. " << + "minvalid: " << minvalid.toString() << " our last optime: " << + lastOpTimeApplied.toString(), + 18750); } } @@ -952,11 +842,30 @@ namespace { DisableDocumentValidation validationDisabler(txn); txn->setReplicatedWrites(false); - unsigned s = _syncRollback(txn, oplogreader, replCoord); - if (s) - sleepsecs(s); + Status status = _syncRollback(txn, + localOplog, + rollbackSource, + replCoord, + sleepSecondsFn, + globalWriteLockTimeoutMs); log() << "rollback finished" << rsLog; + return status; + } + + Status syncRollback(OperationContext* txn, + const OpTime& lastOpTimeWritten, + const OplogInterface& localOplog, + const RollbackSource& rollbackSource, + ReplicationCoordinator* replCoord) { + + return syncRollback(txn, + lastOpTimeWritten, + localOplog, + rollbackSource, + replCoord, + [](Seconds seconds) { sleepsecs(seconds.count()); }, + kDefaultGlobalWriteLockTimeoutMs); } } // namespace repl diff --git a/src/mongo/db/repl/rs_rollback.h b/src/mongo/db/repl/rs_rollback.h index bfe2c0c5621..7f4912c080f 100644 --- a/src/mongo/db/repl/rs_rollback.h +++ b/src/mongo/db/repl/rs_rollback.h @@ -28,14 +28,25 @@ #pragma once +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/db/jsobj.h" +#include "mongo/stdx/functional.h" +#include "mongo/util/time_support.h" + namespace mongo { + + class DBClientConnection; + class NamespaceString; class OperationContext; - class Timestamp; namespace repl { - class OplogReader; + + class OplogInterface; class OpTime; class ReplicationCoordinator; + class RollbackSource; /** * Initiates the rollback process. @@ -53,16 +64,30 @@ namespace repl { * * @param txn Used to read and write from this node's databases * @param lastOpTimeWritten The last OpTime applied by the applier - * @param oplogreader Must already be connected to a sync source. 
Used to fetch documents. + * @param localOplog reads the oplog on this server. + * @param rollbackSource interface for sync source: + * provides oplog; and + * supports fetching documents and copying collections. * @param replCoord Used to track the rollback ID and to change the follower state * - * Failures: some failure cases are fatal; others throw std::exception. + * Failures: Most failures are returned as a status but some failures throw an std::exception. */ - void syncRollback(OperationContext* txn, - const OpTime& lastOpTimeWritten, - OplogReader* oplogreader, - ReplicationCoordinator* replCoord); + using SleepSecondsFn = stdx::function<void (Seconds)>; + + Status syncRollback(OperationContext* txn, + const OpTime& lastOpTimeWritten, + const OplogInterface& localOplog, + const RollbackSource& rollbackSource, + ReplicationCoordinator* replCoord, + const SleepSecondsFn& sleepSecondsFn, + Milliseconds globalWriteLockTimeoutMs); + + Status syncRollback(OperationContext* txn, + const OpTime& lastOpTimeWritten, + const OplogInterface& localOplog, + const RollbackSource& rollbackSource, + ReplicationCoordinator* replCoord); -} -} +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp new file mode 100644 index 00000000000..08b0046b4e0 --- /dev/null +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -0,0 +1,541 @@ +/** + * Copyright 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <list> +#include <memory> +#include <utility> + +#include "mongo/db/catalog/database.h" +#include "mongo/db/catalog/database_holder.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/client.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/minvalid.h" +#include "mongo/db/repl/operation_context_repl_mock.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_interface.h" +#include "mongo/db/repl/oplog_interface_mock.h" +#include "mongo/db/repl/replication_coordinator_global.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/rs_rollback.h" +#include "mongo/db/repl/rollback_source.h" +#include "mongo/db/service_context.h" +#include "mongo/db/storage_options.h" +#include "mongo/unittest/unittest.h" +#include "mongo/unittest/temp_dir.h" + +namespace { + + using namespace mongo; + using namespace mongo::repl; + + const Milliseconds globalWriteLockTimeoutMs(10); + const OplogInterfaceMock::Operations 
kEmptyMockOperations; + + class OperationContextRollbackMock : public OperationContextReplMock { + public: + Client* getClient() const override; + }; + + Client* OperationContextRollbackMock::getClient() const { + Client::initThreadIfNotAlready(); + return &cc(); + } + + ReplSettings createReplSettings() { + ReplSettings settings; + settings.oplogSize = 5 * 1024 * 1024; + settings.replSet = "mySet/node1:12345"; + return settings; + } + + class ReplicationCoordinatorRollbackMock : public ReplicationCoordinatorMock { + public: + ReplicationCoordinatorRollbackMock(); + void resetLastOpTimeFromOplog(OperationContext* txn) override; + }; + + ReplicationCoordinatorRollbackMock::ReplicationCoordinatorRollbackMock() + : ReplicationCoordinatorMock(createReplSettings()) { } + + void ReplicationCoordinatorRollbackMock::resetLastOpTimeFromOplog(OperationContext* txn) { } + + class RollbackSourceMock : public RollbackSource { + public: + RollbackSourceMock(std::unique_ptr<OplogInterface> oplog); + int getRollbackId() const override; + const OplogInterface& getOplog() const override; + BSONObj getLastOperation() const override; + BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override; + void copyCollectionFromRemote(OperationContext* txn, + const NamespaceString& nss) const override; + StatusWith<BSONObj> getCollectionInfo(const NamespaceString& nss) const override; + private: + std::unique_ptr<OplogInterface> _oplog; + }; + + RollbackSourceMock::RollbackSourceMock(std::unique_ptr<OplogInterface> oplog) + : _oplog(std::move(oplog)) { } + + const OplogInterface& RollbackSourceMock::getOplog() const { + return *_oplog; + } + + int RollbackSourceMock::getRollbackId() const { + return 0; + } + + BSONObj RollbackSourceMock::getLastOperation() const { + auto iter = _oplog->makeIterator(); + auto result = iter->next(); + ASSERT_OK(result.getStatus()); + return result.getValue().first; + } + + BSONObj RollbackSourceMock::findOne(const NamespaceString& nss, 
const BSONObj& filter) const { + return BSONObj(); + } + + void RollbackSourceMock::copyCollectionFromRemote(OperationContext* txn, + const NamespaceString& nss) const { } + + StatusWith<BSONObj> RollbackSourceMock::getCollectionInfo(const NamespaceString& nss) const { + return BSON("name" << nss.ns() << "options" << BSONObj()); + } + + class RSRollbackTest : public unittest::Test { + public: + RSRollbackTest(); + + protected: + + std::unique_ptr<OperationContext> _txn; + std::unique_ptr<ReplicationCoordinator> _coordinator; + + private: + void setUp() override; + void tearDown() override; + + ReplicationCoordinator* _prevCoordinator; + }; + + RSRollbackTest::RSRollbackTest() : _prevCoordinator(nullptr) { } + + void RSRollbackTest::setUp() { + ServiceContext* serviceContext = getGlobalServiceContext(); + if (!serviceContext->getGlobalStorageEngine()) { + // When using the 'devnull' storage engine, it is fine for the temporary directory to + // go away after the global storage engine is initialized. 
+ unittest::TempDir tempDir("rs_rollback_test"); + mongo::storageGlobalParams.dbpath = tempDir.path(); + mongo::storageGlobalParams.dbpath = tempDir.path(); + mongo::storageGlobalParams.engine = "inMemoryExperiment"; + mongo::storageGlobalParams.engineSetByUser = true; + serviceContext->initializeGlobalStorageEngine(); + } + + Client::initThreadIfNotAlready(); + _txn.reset(new OperationContextRollbackMock()); + _coordinator.reset(new ReplicationCoordinatorRollbackMock()); + + _prevCoordinator = getGlobalReplicationCoordinator(); + setGlobalReplicationCoordinator(_coordinator.get()); + + setOplogCollectionName(); + } + + void RSRollbackTest::tearDown() { + { + Lock::GlobalWrite globalLock(_txn->lockState()); + BSONObjBuilder unused; + invariant(mongo::dbHolder().closeAll(_txn.get(), unused, false)); + } + setGlobalReplicationCoordinator(_prevCoordinator); + _coordinator.reset(); + _txn.reset(); + } + + void noSleep(Seconds seconds) {} + + TEST_F(RSRollbackTest, InconsistentMinValid) { + repl::setMinValid(_txn.get(), OpTime(Timestamp(Seconds(1), 0), 0)); + auto status = + syncRollback( + _txn.get(), + OpTime(), + OplogInterfaceMock(kEmptyMockOperations), + RollbackSourceMock(std::unique_ptr<OplogInterface>( + new OplogInterfaceMock(kEmptyMockOperations))), + _coordinator.get(), + noSleep, + globalWriteLockTimeoutMs); + ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); + ASSERT_EQUALS(18750, status.location()); + } + + TEST_F(RSRollbackTest, CannotObtainGlobalWriteLock) { + OperationContextReplMock txn2; + Lock::DBLock dbLock(txn2.lockState(), "test", MODE_X); + ASSERT_FALSE(_txn->lockState()->isLocked()); + ASSERT_EQUALS( + ErrorCodes::LockTimeout, + syncRollback( + _txn.get(), + OpTime(), + OplogInterfaceMock(kEmptyMockOperations), + RollbackSourceMock(std::unique_ptr<OplogInterface>( + new OplogInterfaceMock(kEmptyMockOperations))), + _coordinator.get(), + noSleep, + globalWriteLockTimeoutMs).code()); + } + + TEST_F(RSRollbackTest, 
SetFollowerModeFailed) { + class ReplicationCoordinatorSetFollowerModeMock : public ReplicationCoordinatorMock { + public: + ReplicationCoordinatorSetFollowerModeMock() + : ReplicationCoordinatorMock(createReplSettings()) { } + MemberState getMemberState() const override { return MemberState::RS_DOWN; } + bool setFollowerMode(const MemberState& newState) override { return false; } + }; + _coordinator.reset(new ReplicationCoordinatorSetFollowerModeMock()); + setGlobalReplicationCoordinator(_coordinator.get()); + + ASSERT_EQUALS( + ErrorCodes::OperationFailed, + syncRollback( + _txn.get(), + OpTime(), + OplogInterfaceMock(kEmptyMockOperations), + RollbackSourceMock(std::unique_ptr<OplogInterface>( + new OplogInterfaceMock(kEmptyMockOperations))), + _coordinator.get(), + noSleep, + globalWriteLockTimeoutMs).code()); + } + + TEST_F(RSRollbackTest, OplogStartMissing) { + OpTime ts(Timestamp(Seconds(1), 0), 0); + auto operation = + std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId()); + ASSERT_EQUALS( + ErrorCodes::OplogStartMissing, + syncRollback( + _txn.get(), + OpTime(), + OplogInterfaceMock(kEmptyMockOperations), + RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ + operation, + }))), + _coordinator.get(), + noSleep, + globalWriteLockTimeoutMs).code()); + } + + TEST_F(RSRollbackTest, NoRemoteOpLog) { + OpTime ts(Timestamp(Seconds(1), 0), 0); + auto operation = + std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId()); + auto status = + syncRollback( + _txn.get(), + ts, + OplogInterfaceMock({operation}), + RollbackSourceMock(std::unique_ptr<OplogInterface>( + new OplogInterfaceMock(kEmptyMockOperations))), + _coordinator.get(), + noSleep, + globalWriteLockTimeoutMs); + ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); + ASSERT_EQUALS(18752, status.location()); + } + + TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) { + OpTime ts(Timestamp(Seconds(1), 0), 0); + 
auto operation = + std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId()); + class RollbackSourceLocal : public RollbackSourceMock { + public: + RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog) + : RollbackSourceMock(std::move(oplog)) { } + int getRollbackId() const override { + uassert(ErrorCodes::UnknownError, "getRollbackId() failed", false); + } + }; + ASSERT_THROWS_CODE( + syncRollback( + _txn.get(), + ts, + OplogInterfaceMock({operation}), + RollbackSourceLocal(std::unique_ptr<OplogInterface>( + new OplogInterfaceMock(kEmptyMockOperations))), + _coordinator.get(), + noSleep, + globalWriteLockTimeoutMs), + UserException, + ErrorCodes::UnknownError); + } + + TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) { + createOplog(_txn.get()); + OpTime ts(Timestamp(Seconds(1), 0), 1); + auto operation = + std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId(1)); + ASSERT_OK( + syncRollback( + _txn.get(), + ts, + OplogInterfaceMock({operation}), + RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ + operation, + }))), + _coordinator.get(), + noSleep, + globalWriteLockTimeoutMs)); + } + + TEST_F(RSRollbackTest, FetchDeletedDocumentFromSource) { + createOplog(_txn.get()); + auto commonOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); + auto deleteOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << + "h" << 1LL << + "op" << "d" << + "ns" << "test.t" << + "o" << BSON("_id" << 0)), + RecordId(2)); + class RollbackSourceLocal : public RollbackSourceMock { + public: + RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog) + : RollbackSourceMock(std::move(oplog)), + called(false) { } + BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const { + called = true; + return BSONObj(); + } + mutable bool called; + }; + RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ + 
commonOperation, + }))); + OpTime opTime(deleteOperation.first["ts"].timestamp(), + deleteOperation.first["h"].Long()); + ASSERT_OK( + syncRollback( + _txn.get(), + opTime, + OplogInterfaceMock({deleteOperation, commonOperation}), + rollbackSource, + _coordinator.get(), + noSleep, + globalWriteLockTimeoutMs)); + ASSERT_TRUE(rollbackSource.called); + } + + TEST_F(RSRollbackTest, RollbackUnknownCommand) { + createOplog(_txn.get()); + auto commonOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); + auto unknownCommandOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << + "h" << 1LL << + "op" << "c" << + "ns" << "test.t" << + "o" << BSON("unknown_command" << "t")), + RecordId(2)); + { + Lock::DBLock dbLock(_txn->lockState(), "test", MODE_X); + mongo::WriteUnitOfWork wuow(_txn.get()); + auto db = dbHolder().openDb(_txn.get(), "test"); + ASSERT_TRUE(db); + ASSERT_TRUE(db->getOrCreateCollection(_txn.get(), "test.t")); + wuow.commit(); + } + OpTime opTime(unknownCommandOperation.first["ts"].timestamp(), + unknownCommandOperation.first["h"].Long()); + auto status = + syncRollback( + _txn.get(), + opTime, + OplogInterfaceMock({unknownCommandOperation, commonOperation}), + RollbackSourceMock(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ + commonOperation, + }))), + _coordinator.get(), + noSleep, + globalWriteLockTimeoutMs); + ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); + ASSERT_EQUALS(18751, status.location()); + } + +// Re-enable after fixing recovery unit noop registerChange behavior +#if 0 + TEST_F(RSRollbackTest, RollbackDropCollectionCommand) { + createOplog(_txn.get()); + auto commonOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); + auto dropCollectionOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << + "h" << 1LL << + "op" << "c" << + "ns" << "test.t" << + "o" << BSON("drop" << "t")), + RecordId(2)); + 
class RollbackSourceLocal : public RollbackSourceMock { + public: + RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog) + : RollbackSourceMock(std::move(oplog)), + called(false) { } + void copyCollectionFromRemote(OperationContext* txn, + const NamespaceString& nss) const override { + called = true; + } + mutable bool called; + }; + RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ + commonOperation, + }))); + { + Lock::DBLock dbLock(_txn->lockState(), "test", MODE_X); + mongo::WriteUnitOfWork wuow(_txn.get()); + auto db = dbHolder().openDb(_txn.get(), "test"); + ASSERT_TRUE(db); + ASSERT_TRUE(db->getOrCreateCollection(_txn.get(), "test.t")); + wuow.commit(); + } + OpTime opTime(dropCollectionOperation.first["ts"].timestamp(), + dropCollectionOperation.first["h"].Long()); + ASSERT_OK( + syncRollback( + _txn.get(), + opTime, + OplogInterfaceMock({dropCollectionOperation, commonOperation}), + rollbackSource, + _coordinator.get(), + noSleep, + globalWriteLockTimeoutMs)); + ASSERT_TRUE(rollbackSource.called); + } +#endif // 0 + + TEST_F(RSRollbackTest, RollbackCollectionModificationCommand) { + createOplog(_txn.get()); + auto commonOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); + auto collectionModificationOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << + "h" << 1LL << + "op" << "c" << + "ns" << "test.t" << + "o" << BSON("collMod" << "t" << "noPadding" << false)), + RecordId(2)); + class RollbackSourceLocal : public RollbackSourceMock { + public: + RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog) + : RollbackSourceMock(std::move(oplog)), + called(false) { } + StatusWith<BSONObj> getCollectionInfo(const NamespaceString& nss) const { + called = true; + return RollbackSourceMock::getCollectionInfo(nss); + } + mutable bool called; + }; + RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ + 
commonOperation, + }))); + { + Lock::DBLock dbLock(_txn->lockState(), "test", MODE_X); + mongo::WriteUnitOfWork wuow(_txn.get()); + auto db = dbHolder().openDb(_txn.get(), "test"); + ASSERT_TRUE(db); + ASSERT_TRUE(db->getOrCreateCollection(_txn.get(), "test.t")); + wuow.commit(); + } + OpTime opTime(collectionModificationOperation.first["ts"].timestamp(), + collectionModificationOperation.first["h"].Long()); + ASSERT_OK( + syncRollback( + _txn.get(), + opTime, + OplogInterfaceMock({collectionModificationOperation, commonOperation}), + rollbackSource, + _coordinator.get(), + noSleep, + globalWriteLockTimeoutMs)); + ASSERT_TRUE(rollbackSource.called); + } + + TEST_F(RSRollbackTest, RollbackCollectionModificationCommandInvalidCollectionOptions) { + createOplog(_txn.get()); + auto commonOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); + auto collectionModificationOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << + "h" << 1LL << + "op" << "c" << + "ns" << "test.t" << + "o" << BSON("collMod" << "t" << "noPadding" << false)), + RecordId(2)); + class RollbackSourceLocal : public RollbackSourceMock { + public: + RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog) + : RollbackSourceMock(std::move(oplog)) { } + StatusWith<BSONObj> getCollectionInfo(const NamespaceString& nss) const { + return BSON("name" << nss.ns() << "options" << 12345); + } + }; + RollbackSourceLocal rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ + commonOperation, + }))); + { + Lock::DBLock dbLock(_txn->lockState(), "test", MODE_X); + mongo::WriteUnitOfWork wuow(_txn.get()); + auto db = dbHolder().openDb(_txn.get(), "test"); + ASSERT_TRUE(db); + ASSERT_TRUE(db->getOrCreateCollection(_txn.get(), "test.t")); + wuow.commit(); + } + OpTime opTime(collectionModificationOperation.first["ts"].timestamp(), + collectionModificationOperation.first["h"].Long()); + auto status = + syncRollback( + _txn.get(), + opTime, 
+ OplogInterfaceMock({collectionModificationOperation, commonOperation}), + rollbackSource, + _coordinator.get(), + noSleep, + globalWriteLockTimeoutMs); + ASSERT_EQUALS(ErrorCodes::UnrecoverableRollbackError, status.code()); + ASSERT_EQUALS(18753, status.location()); + } + +} // namespace |