diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2016-01-04 11:07:47 -0500 |
---|---|---|
committer | Dan Pasette <dan@mongodb.com> | 2016-01-29 12:05:12 -0500 |
commit | 5ca8d6e28255e9519db61d5d25807d70a3000ba7 (patch) | |
tree | 98c8fec8881f0f19a919dc14bec5ffd98c290dc5 | |
parent | 9287affdb7e7c80061450fed87671311672b55d8 (diff) | |
download | mongo-5ca8d6e28255e9519db61d5d25807d70a3000ba7.tar.gz |
SERVER-22016 Support rolling back certain applyOps oplog entries in replication.
Specifically, support rolling back insert, update and delete applyOps operations
such as those generated by sharding's moveChunk, splitChunk and mergeChunks
operations.
(cherry picked from commit fc81fdee1da1d949f80075c8a88998fa3b0c5e78)
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 33 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback_test.cpp | 144 |
2 files changed, 169 insertions, 8 deletions
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index be9bf0e1d1c..a9073f49524 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -250,6 +250,27 @@ Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) { throw RSFatalException(); } return Status::OK(); + } else if (cmdname == "applyOps") { + if (first.type() != Array) { + std::string message = str::stream() + << "Expected applyOps argument to be an array; found " << first.toString(); + severe() << message; + return Status(ErrorCodes::UnrecoverableRollbackError, message); + } + for (const auto& subopElement : first.Array()) { + if (subopElement.type() != Object) { + std::string message = str::stream() + << "Expected applyOps operations to be of Object type, but found " + << subopElement.toString(); + severe() << message; + return Status(ErrorCodes::UnrecoverableRollbackError, message); + } + auto subStatus = refetch(fixUpInfo, subopElement.Obj()); + if (!subStatus.isOK()) { + return subStatus; + } + } + return Status::OK(); } else { severe() << "can't rollback this command yet: " << obj.toString(); log() << "cmdname=" << cmdname; @@ -625,7 +646,8 @@ void syncFixUp(OperationContext* txn, // Add the doc to our rollback file if the collection was not dropped while // rolling back createCollection operations. - // Do not log an error when undoing an insert on a no longer existent collection. + // Do not log an error when undoing an insert on a no longer existent + // collection. // It is likely that the collection was dropped as part of rolling back a // createCollection command and regardless, the document no longer exists. if (collection && removeSaver) { @@ -646,15 +668,18 @@ void syncFixUp(OperationContext* txn, if (idAndDoc.second.isEmpty()) { // wasn't on the primary; delete. - // TODO 1.6 : can't delete from a capped collection. need to handle that here. + // TODO 1.6 : can't delete from a capped collection. need to handle that + // here. deletes++; if (collection) { if (collection->isCapped()) { - // can't delete from a capped collection - so we truncate instead. if + // can't delete from a capped collection - so we truncate instead. + // if // this item must go, so must all successors!!! try { - // TODO: IIRC cappedTruncateAfter does not handle completely empty. + // TODO: IIRC cappedTruncateAfter does not handle completely + // empty. // this will crazy slow if no _id index. long long start = Listener::getElapsedTimeMillis(); RecordId loc = Helpers::findOne(txn, collection, pattern, false); diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index fc3a1c40014..32e3c275e39 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -26,10 +26,11 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault + #include "mongo/platform/basic.h" #include <list> -#include <memory> #include <utility> #include "mongo/db/catalog/collection.h" @@ -37,8 +38,10 @@ #include "mongo/db/catalog/database_holder.h" #include "mongo/db/catalog/index_catalog.h" #include "mongo/db/catalog/index_create.h" -#include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/client.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbhelpers.h" #include "mongo/db/jsobj.h" #include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/operation_context_repl_mock.h" @@ -47,12 +50,14 @@ #include "mongo/db/repl/oplog_interface_mock.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/db/repl/replication_coordinator_mock.h" -#include "mongo/db/repl/rs_rollback.h" #include "mongo/db/repl/rollback_source.h" +#include "mongo/db/repl/rs_rollback.h" #include "mongo/db/service_context.h" #include "mongo/db/storage/storage_options.h" -#include "mongo/unittest/unittest.h" +#include "mongo/stdx/memory.h" #include "mongo/unittest/temp_dir.h" +#include "mongo/unittest/unittest.h" +#include "mongo/util/log.h" namespace { @@ -734,6 +739,137 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommand) { ASSERT_TRUE(rollbackSource.called); } +BSONObj makeApplyOpsOplogEntry(Timestamp ts, std::initializer_list<BSONObj> ops) { + BSONObjBuilder entry; + entry << "ts" << ts << "h" << 1LL << "op" + << "c" + << "ns" + << "admin"; + { + BSONObjBuilder cmd(entry.subobjStart("o")); + BSONArrayBuilder subops(entry.subarrayStart("applyOps")); + for (const auto& op : ops) { + subops << op; + } + } + return entry.obj(); +} + +OpTime getOpTimeFromOplogEntry(const BSONObj& entry) { + const BSONElement tsElement = entry["ts"]; + const BSONElement termElement = entry["t"]; + const BSONElement hashElement = entry["h"]; + ASSERT_EQUALS(bsonTimestamp, tsElement.type()) << entry; + ASSERT_TRUE(hashElement.isNumber()) << entry; + ASSERT_TRUE(termElement.eoo() || termElement.isNumber()) << entry; + long long term = hashElement.numberLong(); + if (!termElement.eoo()) { + term = termElement.numberLong(); + } + return OpTime(tsElement.timestamp(), term); +} + +TEST_F(RSRollbackTest, RollbackApplyOpsCommand) { + createOplog(_txn.get()); + + { + AutoGetOrCreateDb autoDb(_txn.get(), "test", MODE_X); + mongo::WriteUnitOfWork wuow(_txn.get()); + auto coll = autoDb.getDb()->getCollection("test.t"); + if (!coll) { + coll = autoDb.getDb()->createCollection(_txn.get(), "test.t"); + } + ASSERT(coll); + ASSERT_OK(coll->insertDocument(_txn.get(), BSON("_id" << 1 << "v" << 2), false)); + ASSERT_OK(coll->insertDocument(_txn.get(), BSON("_id" << 2 << "v" << 4), false)); + ASSERT_OK(coll->insertDocument(_txn.get(), BSON("_id" << 4), false)); + wuow.commit(); + } + const auto commonOperation = + std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1)); + const auto applyOpsOperation = + std::make_pair(makeApplyOpsOplogEntry(Timestamp(Seconds(2), 0), + {BSON("op" + << "u" + << "ns" + << "test.t" + << "o2" << BSON("_id" << 1) << "o" + << BSON("_id" << 1 << "v" << 2)), + BSON("op" + << "u" + << "ns" + << "test.t" + << "o2" << BSON("_id" << 2) << "o" + << BSON("_id" << 2 << "v" << 4)), + BSON("op" + << "d" + << "ns" + << "test.t" + << "o" << BSON("_id" << 3)), + BSON("op" + << "i" + << "ns" + << "test.t" + << "o" << BSON("_id" << 4))}), + RecordId(2)); + + class RollbackSourceLocal : public RollbackSourceMock { + public: + RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog) + : RollbackSourceMock(std::move(oplog)) {} + + BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override { + int numFields = 0; + for (const auto element : filter) { + ++numFields; + ASSERT_EQUALS("_id", element.fieldNameStringData()) << filter; + } + ASSERT_EQUALS(1, numFields) << filter; + searchedIds.insert(filter.firstElement().numberInt()); + switch (filter.firstElement().numberInt()) { + case 1: + return BSON("_id" << 1 << "v" << 1); + case 2: + return BSON("_id" << 2 << "v" << 3); + case 3: + return BSON("_id" << 3 << "v" << 5); + case 4: + return {}; + } + FAIL("Unexpected findOne request") << filter; + return {}; // Unreachable; why doesn't compiler know? + } + + mutable std::multiset<int> searchedIds; + } rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({commonOperation}))); + + _createCollection(_txn.get(), "test.t", CollectionOptions()); + const auto opTime = getOpTimeFromOplogEntry(applyOpsOperation.first); + log() << "Now is " << opTime; + ASSERT_OK(syncRollback(_txn.get(), + opTime, + OplogInterfaceMock({applyOpsOperation, commonOperation}), + rollbackSource, + _coordinator, + noSleep)); + ASSERT_EQUALS(4U, rollbackSource.searchedIds.size()); + ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(1)); + ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(2)); + ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(3)); + ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(4)); + + AutoGetCollectionForRead acr(_txn.get(), "test.t"); + BSONObj result; + ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 1), result)); + ASSERT_EQUALS(1, result["v"].numberInt()) << result; + ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 2), result)); + ASSERT_EQUALS(3, result["v"].numberInt()) << result; + ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 3), result)); + ASSERT_EQUALS(5, result["v"].numberInt()) << result; + ASSERT_FALSE(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 4), result)) + << result; +} + TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) { createOplog(_txn.get()); auto commonOperation = |