summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2016-01-04 11:07:47 -0500
committerDan Pasette <dan@mongodb.com>2016-01-29 12:05:12 -0500
commit5ca8d6e28255e9519db61d5d25807d70a3000ba7 (patch)
tree98c8fec8881f0f19a919dc14bec5ffd98c290dc5
parent9287affdb7e7c80061450fed87671311672b55d8 (diff)
downloadmongo-5ca8d6e28255e9519db61d5d25807d70a3000ba7.tar.gz
SERVER-22016 Support rolling back certain applyOps oplog entries in replication.
Specifically, support rolling back insert, update and delete applyOps operations such as those generated by sharding's moveChunk, splitChunk and mergeChunks operations. (cherry picked from commit fc81fdee1da1d949f80075c8a88998fa3b0c5e78)
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp33
-rw-r--r--src/mongo/db/repl/rs_rollback_test.cpp144
2 files changed, 169 insertions, 8 deletions
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index be9bf0e1d1c..a9073f49524 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -250,6 +250,27 @@ Status refetch(FixUpInfo& fixUpInfo, const BSONObj& ourObj) {
throw RSFatalException();
}
return Status::OK();
+ } else if (cmdname == "applyOps") {
+ if (first.type() != Array) {
+ std::string message = str::stream()
+ << "Expected applyOps argument to be an array; found " << first.toString();
+ severe() << message;
+ return Status(ErrorCodes::UnrecoverableRollbackError, message);
+ }
+ for (const auto& subopElement : first.Array()) {
+ if (subopElement.type() != Object) {
+ std::string message = str::stream()
+ << "Expected applyOps operations to be of Object type, but found "
+ << subopElement.toString();
+ severe() << message;
+ return Status(ErrorCodes::UnrecoverableRollbackError, message);
+ }
+ auto subStatus = refetch(fixUpInfo, subopElement.Obj());
+ if (!subStatus.isOK()) {
+ return subStatus;
+ }
+ }
+ return Status::OK();
} else {
severe() << "can't rollback this command yet: " << obj.toString();
log() << "cmdname=" << cmdname;
@@ -625,7 +646,8 @@ void syncFixUp(OperationContext* txn,
// Add the doc to our rollback file if the collection was not dropped while
// rolling back createCollection operations.
- // Do not log an error when undoing an insert on a no longer existent collection.
+ // Do not log an error when undoing an insert on a no longer existent
+ // collection.
// It is likely that the collection was dropped as part of rolling back a
// createCollection command and regardless, the document no longer exists.
if (collection && removeSaver) {
@@ -646,15 +668,18 @@ void syncFixUp(OperationContext* txn,
if (idAndDoc.second.isEmpty()) {
// wasn't on the primary; delete.
- // TODO 1.6 : can't delete from a capped collection. need to handle that here.
+ // TODO 1.6 : can't delete from a capped collection. need to handle that
+ // here.
deletes++;
if (collection) {
if (collection->isCapped()) {
- // can't delete from a capped collection - so we truncate instead. if
+ // can't delete from a capped collection - so we truncate instead.
+ // if
// this item must go, so must all successors!!!
try {
- // TODO: IIRC cappedTruncateAfter does not handle completely empty.
+ // TODO: IIRC cappedTruncateAfter does not handle completely
+ // empty.
// this will crazy slow if no _id index.
long long start = Listener::getElapsedTimeMillis();
RecordId loc = Helpers::findOne(txn, collection, pattern, false);
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index fc3a1c40014..32e3c275e39 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -26,10 +26,11 @@
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kDefault
+
#include "mongo/platform/basic.h"
#include <list>
-#include <memory>
#include <utility>
#include "mongo/db/catalog/collection.h"
@@ -37,8 +38,10 @@
#include "mongo/db/catalog/database_holder.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/catalog/index_create.h"
-#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/client.h"
+#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/repl/minvalid.h"
#include "mongo/db/repl/operation_context_repl_mock.h"
@@ -47,12 +50,14 @@
#include "mongo/db/repl/oplog_interface_mock.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
-#include "mongo/db/repl/rs_rollback.h"
#include "mongo/db/repl/rollback_source.h"
+#include "mongo/db/repl/rs_rollback.h"
#include "mongo/db/service_context.h"
#include "mongo/db/storage/storage_options.h"
-#include "mongo/unittest/unittest.h"
+#include "mongo/stdx/memory.h"
#include "mongo/unittest/temp_dir.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/log.h"
namespace {
@@ -734,6 +739,137 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommand) {
ASSERT_TRUE(rollbackSource.called);
}
+BSONObj makeApplyOpsOplogEntry(Timestamp ts, std::initializer_list<BSONObj> ops) {
+ BSONObjBuilder entry;
+ entry << "ts" << ts << "h" << 1LL << "op"
+ << "c"
+ << "ns"
+ << "admin";
+ {
+ BSONObjBuilder cmd(entry.subobjStart("o"));
+ BSONArrayBuilder subops(entry.subarrayStart("applyOps"));
+ for (const auto& op : ops) {
+ subops << op;
+ }
+ }
+ return entry.obj();
+}
+
+OpTime getOpTimeFromOplogEntry(const BSONObj& entry) {
+ const BSONElement tsElement = entry["ts"];
+ const BSONElement termElement = entry["t"];
+ const BSONElement hashElement = entry["h"];
+ ASSERT_EQUALS(bsonTimestamp, tsElement.type()) << entry;
+ ASSERT_TRUE(hashElement.isNumber()) << entry;
+ ASSERT_TRUE(termElement.eoo() || termElement.isNumber()) << entry;
+ long long term = hashElement.numberLong();
+ if (!termElement.eoo()) {
+ term = termElement.numberLong();
+ }
+ return OpTime(tsElement.timestamp(), term);
+}
+
+TEST_F(RSRollbackTest, RollbackApplyOpsCommand) {
+ createOplog(_txn.get());
+
+ {
+ AutoGetOrCreateDb autoDb(_txn.get(), "test", MODE_X);
+ mongo::WriteUnitOfWork wuow(_txn.get());
+ auto coll = autoDb.getDb()->getCollection("test.t");
+ if (!coll) {
+ coll = autoDb.getDb()->createCollection(_txn.get(), "test.t");
+ }
+ ASSERT(coll);
+ ASSERT_OK(coll->insertDocument(_txn.get(), BSON("_id" << 1 << "v" << 2), false));
+ ASSERT_OK(coll->insertDocument(_txn.get(), BSON("_id" << 2 << "v" << 4), false));
+ ASSERT_OK(coll->insertDocument(_txn.get(), BSON("_id" << 4), false));
+ wuow.commit();
+ }
+ const auto commonOperation =
+ std::make_pair(BSON("ts" << Timestamp(Seconds(1), 0) << "h" << 1LL), RecordId(1));
+ const auto applyOpsOperation =
+ std::make_pair(makeApplyOpsOplogEntry(Timestamp(Seconds(2), 0),
+ {BSON("op"
+ << "u"
+ << "ns"
+ << "test.t"
+ << "o2" << BSON("_id" << 1) << "o"
+ << BSON("_id" << 1 << "v" << 2)),
+ BSON("op"
+ << "u"
+ << "ns"
+ << "test.t"
+ << "o2" << BSON("_id" << 2) << "o"
+ << BSON("_id" << 2 << "v" << 4)),
+ BSON("op"
+ << "d"
+ << "ns"
+ << "test.t"
+ << "o" << BSON("_id" << 3)),
+ BSON("op"
+ << "i"
+ << "ns"
+ << "test.t"
+ << "o" << BSON("_id" << 4))}),
+ RecordId(2));
+
+ class RollbackSourceLocal : public RollbackSourceMock {
+ public:
+ RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog)
+ : RollbackSourceMock(std::move(oplog)) {}
+
+ BSONObj findOne(const NamespaceString& nss, const BSONObj& filter) const override {
+ int numFields = 0;
+ for (const auto element : filter) {
+ ++numFields;
+ ASSERT_EQUALS("_id", element.fieldNameStringData()) << filter;
+ }
+ ASSERT_EQUALS(1, numFields) << filter;
+ searchedIds.insert(filter.firstElement().numberInt());
+ switch (filter.firstElement().numberInt()) {
+ case 1:
+ return BSON("_id" << 1 << "v" << 1);
+ case 2:
+ return BSON("_id" << 2 << "v" << 3);
+ case 3:
+ return BSON("_id" << 3 << "v" << 5);
+ case 4:
+ return {};
+ }
+ FAIL("Unexpected findOne request") << filter;
+ return {}; // Unreachable; why doesn't compiler know?
+ }
+
+ mutable std::multiset<int> searchedIds;
+ } rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({commonOperation})));
+
+ _createCollection(_txn.get(), "test.t", CollectionOptions());
+ const auto opTime = getOpTimeFromOplogEntry(applyOpsOperation.first);
+ log() << "Now is " << opTime;
+ ASSERT_OK(syncRollback(_txn.get(),
+ opTime,
+ OplogInterfaceMock({applyOpsOperation, commonOperation}),
+ rollbackSource,
+ _coordinator,
+ noSleep));
+ ASSERT_EQUALS(4U, rollbackSource.searchedIds.size());
+ ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(1));
+ ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(2));
+ ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(3));
+ ASSERT_EQUALS(1U, rollbackSource.searchedIds.count(4));
+
+ AutoGetCollectionForRead acr(_txn.get(), "test.t");
+ BSONObj result;
+ ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 1), result));
+ ASSERT_EQUALS(1, result["v"].numberInt()) << result;
+ ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 2), result));
+ ASSERT_EQUALS(3, result["v"].numberInt()) << result;
+ ASSERT(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 3), result));
+ ASSERT_EQUALS(5, result["v"].numberInt()) << result;
+ ASSERT_FALSE(Helpers::findOne(_txn.get(), acr.getCollection(), BSON("_id" << 4), result))
+ << result;
+}
+
TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) {
createOplog(_txn.get());
auto commonOperation =