From 5ca8d6e28255e9519db61d5d25807d70a3000ba7 Mon Sep 17 00:00:00 2001 From: Andy Schwerin Date: Mon, 4 Jan 2016 11:07:47 -0500 Subject: SERVER-22016 Support rolling back certain applyOps oplog entries in replication. Specifically, support rolling back insert, update and delete applyOps operations such as those generated by sharding's moveChunk, splitChunk and mergeChunks operations. (cherry picked from commit fc81fdee1da1d949f80075c8a88998fa3b0c5e78) --- src/mongo/db/repl/rs_rollback.cpp | 33 +++++++- src/mongo/db/repl/rs_rollback_test.cpp | 144 ++++++++++++++++++++++++++++++++- 2 files changed, 169 insertions(+), 8 deletions(-) diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index be9bf0e1d1c..a9073f49524 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -250,6 +250,27 @@ Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) { throw RSFatalException(); } return Status::OK(); + } else if (cmdname == "applyOps") { + if (first.type() != Array) { + std::string message = str::stream() + << "Expected applyOps argument to be an array; found " << first.toString(); + severe() << message; + return Status(ErrorCodes::UnrecoverableRollbackError, message); + } + for (const auto& subopElement : first.Array()) { + if (subopElement.type() != Object) { + std::string message = str::stream() + << "Expected applyOps operations to be of Object type, but found " + << subopElement.toString(); + severe() << message; + return Status(ErrorCodes::UnrecoverableRollbackError, message); + } + auto subStatus = refetch(fixUpInfo, subopElement.Obj()); + if (!subStatus.isOK()) { + return subStatus; + } + } + return Status::OK(); } else { severe() << "can't rollback this command yet: " << obj.toString(); log() << "cmdname=" << cmdname; @@ -625,7 +646,8 @@ void syncFixUp(OperationContext* txn, // Add the doc to our rollback file if the collection was not dropped while // rolling back createCollection operations. 
- // Do not log an error when undoing an insert on a no longer existent collection. + // Do not log an error when undoing an insert on a no longer existent + // collection. // It is likely that the collection was dropped as part of rolling back a // createCollection command and regardless, the document no longer exists. if (collection && removeSaver) { @@ -646,15 +668,18 @@ void syncFixUp(OperationContext* txn, if (idAndDoc.second.isEmpty()) { // wasn't on the primary; delete. - // TODO 1.6 : can't delete from a capped collection. need to handle that here. + // TODO 1.6 : can't delete from a capped collection. need to handle that + // here. deletes++; if (collection) { if (collection->isCapped()) { - // can't delete from a capped collection - so we truncate instead. if + // can't delete from a capped collection - so we truncate instead. + // if // this item must go, so must all successors!!! try { - // TODO: IIRC cappedTruncateAfter does not handle completely empty. + // TODO: IIRC cappedTruncateAfter does not handle completely + // empty. // this will crazy slow if no _id index. long long start = Listener::getElapsedTimeMillis(); RecordId loc = Helpers::findOne(txn, collection, pattern, false); diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index fc3a1c40014..32e3c275e39 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -26,10 +26,11 @@ * it in the license file. 
*/ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + #include "mongo/platform/basic.h" #include -#include #include #include "mongo/db/catalog/collection.h" @@ -37,8 +38,10 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog/index_create.h" -#include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/client.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/operation_context_repl_mock.h" @@ -47,12 +50,14 @@ #include "mongo/db/repl/oplog_interface_mock.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_mock.h" -#include "mongo/db/repl/rs_rollback.h" #include "mongo/db/repl/rollback_source.h" +#include "mongo/db/repl/rs_rollback.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/storage_options.h" -#include "mongo/unittest/unittest.h" +#include "mongo/stdx/memory.h" #include "mongo/unittest/temp_dir.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/log.h" namespace { @@ -734,6 +739,137 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommand) { ASSERT_TRUE(rollbackSource.called); } +BSONObj makeApplyOpsOplogEntry(Timestamp ts, std::initializer_list<BSONObj> ops) { + BSONObjBuilder entry; + entry << "ts" << ts << "h" << 1LL << "op" + << "c" + << "ns" + << "admin"; + { + BSONObjBuilder cmd(entry.subobjStart("o")); + BSONArrayBuilder subops(entry.subarrayStart("applyOps")); + for (const auto& op : ops) { + subops << op; + } + } + return entry.obj(); +} + +OpTime getOpTimeFromOplogEntry(const BSONObj& entry) { + const BSONElement tsElement = entry["ts"]; + const BSONElement termElement = entry["t"]; + const BSONElement hashElement = entry["h"]; + ASSERT_EQUALS(bsonTimestamp, tsElement.type()) << entry; + ASSERT_TRUE(hashElement.isNumber()) << 
entry; + ASSERT_TRUE(termElement.eoo() || termElement.isNumber()) << entry; + long long term = hashElement.numberLong(); + if (!termElement.eoo()) { + term = termElement.numberLong(); + } + return OpTime(tsElement.timestamp(), term); +} + +TEST_F(RSRollbackTest, RollbackApplyOpsCommand) { + createOplog(_txn.get()); + + { + AutoGetOrCreateDb autoDb(_txn.get(), "test", MODE_X); + mongo::WriteUnitOfWork wuow(_txn.get()); + auto coll = autoDb.getDb()->getCollection("test.t"); + if (!coll) { + coll = autoDb.getDb()->createCollection(_txn.get(), "test.t"); + } + ASSERT(coll); + ASSERT_OK(coll->insertDocument(_txn.get(), BSON("_id" << 1 << "v" << 2), false)); + ASSERT_OK(coll->insertDocument(_txn.get(), BSON("_id" << 2 << "v" << 4), false)); + ASSERT_OK(coll->insertDocument(_txn.get(), BSON("_id" << 4), false)); + wuow.commit(); + } + const auto commonOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); + const auto applyOpsOperation = + std::make_pair(makeApplyOpsOplogEntry(Timestamp(Seconds(2), 0), + {BSON("op" + << "u" + << "ns" + << "test.t" + << "o2" << BSON("_id" << 1) << "o" + << BSON("_id" << 1 << "v" << 2)), + BSON("op" + << "u" + << "ns" + << "test.t" + << "o2" << BSON("_id" << 2) << "o" + << BSON("_id" << 2 << "v" << 4)), + BSON("op" + << "d" + << "ns" + << "test.t" + << "o" << BSON("_id" << 3)), + BSON("op" + << "i" + << "ns" + << "test.t" + << "o" << BSON("_id" << 4))}), + RecordId(2)); + + class RollbackSourceLocal : public RollbackSourceMock { + public: + RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog) + : RollbackSourceMock(std::move(oplog)) {} + + BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override { + int numFields = 0; + for (const auto element : filter) { + ++numFields; + ASSERT_EQUALS("_id", element.fieldNameStringData()) << filter; + } + ASSERT_EQUALS(1, numFields) << filter; + searchedIds.insert(filter.firstElement().numberInt()); + switch (filter.firstElement().numberInt()) { + 
case 1: + return BSON("_id" << 1 << "v" << 1); + case 2: + return BSON("_id" << 2 << "v" << 3); + case 3: + return BSON("_id" << 3 << "v" << 5); + case 4: + return {}; + } + FAIL("Unexpected findOne request") << filter; + return {}; // Unreachable; why doesn't compiler know? + } + + mutable std::multiset<int> searchedIds; + } rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({commonOperation}))); + + _createCollection(_txn.get(), "test.t", CollectionOptions()); + const auto opTime = getOpTimeFromOplogEntry(applyOpsOperation.first); + log() << "Now is " << opTime; + ASSERT_OK(syncRollback(_txn.get(), + opTime, + OplogInterfaceMock({applyOpsOperation, commonOperation}), + rollbackSource, + _coordinator, + noSleep)); + ASSERT_EQUALS(4U, rollbackSource.searchedIds.size()); + ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(1)); + ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(2)); + ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(3)); + ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(4)); + + AutoGetCollectionForRead acr(_txn.get(), "test.t"); + BSONObj result; + ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 1), result)); + ASSERT_EQUALS(1, result["v"].numberInt()) << result; + ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 2), result)); + ASSERT_EQUALS(3, result["v"].numberInt()) << result; + ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 3), result)); + ASSERT_EQUALS(5, result["v"].numberInt()) << result; + ASSERT_FALSE(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 4), result)) + << result; +} + TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) { createOplog(_txn.get()); auto commonOperation = -- cgit v1.2.1