summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSiyuan Zhou <visualzhou@gmail.com>2017-01-11 20:51:51 -0500
committerSiyuan Zhou <visualzhou@gmail.com>2017-01-12 18:58:06 -0500
commit5313582dae3fc0fe11d91dbbc3d2a8154de3b038 (patch)
treee2fc13a6a001b207c381fbace9ec2fee1bbf87f7
parentdf3fcefe77119447ce2ad8174657563176da2545 (diff)
downloadmongo-5313582dae3fc0fe11d91dbbc3d2a8154de3b038.tar.gz
SERVER-26117 renameCollection 'c' op should restart initial sync upon application
-rw-r--r--src/mongo/db/catalog/apply_ops.cpp2
-rw-r--r--src/mongo/db/repl/applier.h2
-rw-r--r--src/mongo/db/repl/master_slave.cpp4
-rw-r--r--src/mongo/db/repl/oplog.cpp16
-rw-r--r--src/mongo/db/repl/oplog.h8
-rw-r--r--src/mongo/db/repl/sync_tail.cpp32
-rw-r--r--src/mongo/db/repl/sync_tail.h8
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp80
8 files changed, 91 insertions, 61 deletions
diff --git a/src/mongo/db/catalog/apply_ops.cpp b/src/mongo/db/catalog/apply_ops.cpp
index 6d00127b9af..0afea74e776 100644
--- a/src/mongo/db/catalog/apply_ops.cpp
+++ b/src/mongo/db/catalog/apply_ops.cpp
@@ -132,7 +132,7 @@ Status applyOps(OperationContext* txn,
try {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
if (*opType == 'c') {
- status = repl::applyCommand_inlock(txn, temp);
+ status = repl::applyCommand_inlock(txn, temp, true);
break;
} else {
OldClientContext ctx(txn, ns);
diff --git a/src/mongo/db/repl/applier.h b/src/mongo/db/repl/applier.h
index a8e48c0edb9..d035e0d80ef 100644
--- a/src/mongo/db/repl/applier.h
+++ b/src/mongo/db/repl/applier.h
@@ -64,7 +64,7 @@ public:
/**
* Type of function to to apply a single operation. In production, this function
- * would have the same outcome as calling SyncTail::syncApply() ('convertUpdatesToUpserts'
+ * would have the same outcome as calling SyncTail::syncApply() ('inSteadyStateReplication'
* value will be embedded in the function implementation).
*/
using ApplyOperationFn = stdx::function<Status(OperationContext*, const BSONObj&)>;
diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp
index 1cb12ba561e..67a156cbce7 100644
--- a/src/mongo/db/repl/master_slave.cpp
+++ b/src/mongo/db/repl/master_slave.cpp
@@ -616,14 +616,14 @@ bool ReplSource::handleDuplicateDbName(OperationContext* txn,
void ReplSource::applyCommand(OperationContext* txn, const BSONObj& op) {
try {
- Status status = applyCommand_inlock(txn, op);
+ Status status = applyCommand_inlock(txn, op, true);
if (!status.isOK()) {
SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc());
sync.setHostname(hostName);
if (sync.shouldRetry(txn, op)) {
uassert(28639,
"Failure retrying initial sync update",
- applyCommand_inlock(txn, op).isOK());
+ applyCommand_inlock(txn, op, true).isOK());
}
}
} catch (UserException& e) {
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index d98db32fd43..2c23f12e97b 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -712,7 +712,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = {
Status applyOperation_inlock(OperationContext* txn,
Database* db,
const BSONObj& op,
- bool convertUpdateToUpsert) {
+ bool inSteadyStateReplication) {
LOG(3) << "applying op: " << op << endl;
OpCounters* opCounters = txn->writesAreReplicated() ? &globalOpCounters : &replOpCounters;
@@ -866,7 +866,7 @@ Status applyOperation_inlock(OperationContext* txn,
OpDebug debug;
BSONObj updateCriteria = o2;
- const bool upsert = valueB || convertUpdateToUpsert;
+ const bool upsert = valueB || inSteadyStateReplication;
uassert(ErrorCodes::NoSuchKey,
str::stream() << "Failed to apply update due to missing _id: " << op.toString(),
@@ -949,7 +949,9 @@ Status applyOperation_inlock(OperationContext* txn,
return Status::OK();
}
-Status applyCommand_inlock(OperationContext* txn, const BSONObj& op) {
+Status applyCommand_inlock(OperationContext* txn,
+ const BSONObj& op,
+ bool inSteadyStateReplication) {
const char* names[] = {"o", "ns", "op"};
BSONElement fields[3];
op.getFields(3, names, fields);
@@ -972,6 +974,14 @@ Status applyCommand_inlock(OperationContext* txn, const BSONObj& op) {
const char* ns = fieldNs.valuestrsafe();
+ // Applying renameCollection during initial sync might lead to data corruption, so we restart
+ // the initial sync.
+ if (!inSteadyStateReplication && o.firstElementFieldName() == std::string("renameCollection")) {
+ return Status(ErrorCodes::OplogOperationUnsupported,
+ str::stream()
+ << "Applying renameCollection not supported in initial sync: " << op);
+ }
+
// Applying commands in repl is done under Global W-lock, so it is safe to not
// perform the current DB checks after reacquiring the lock.
invariant(txn->lockState()->isW());
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index b022aa0e545..e3251102fef 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -128,20 +128,22 @@ void oplogCheckCloseDatabase(OperationContext* txn, Database* db);
/**
* Take a non-command op and apply it locally
* Used for applying from an oplog
- * @param convertUpdateToUpsert convert some updates to upserts for idempotency reasons
+ * @param inSteadyStateReplication convert some updates to upserts for idempotency reasons
* Returns failure status if the op was an update that could not be applied.
*/
Status applyOperation_inlock(OperationContext* txn,
Database* db,
const BSONObj& op,
- bool convertUpdateToUpsert = false);
+ bool inSteadyStateReplication = false);
/**
* Take a command op and apply it locally
* Used for applying from an oplog
+ * inSteadyStateReplication indicates whether we are in steady state replication, rather than
+ * initial sync.
* Returns failure status if the op that could not be applied.
*/
-Status applyCommand_inlock(OperationContext* txn, const BSONObj& op);
+Status applyCommand_inlock(OperationContext* txn, const BSONObj& op, bool inSteadyStateReplication);
/**
* Initializes the global Timestamp with the value from the timestamp of the last oplog entry.
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 6ecf9d7028a..3a87828e76b 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -255,7 +255,7 @@ bool SyncTail::peek(BSONObj* op) {
// static
Status SyncTail::syncApply(OperationContext* txn,
const BSONObj& op,
- bool convertUpdateToUpsert,
+ bool inSteadyStateReplication,
ApplyOperationInLockFn applyOperationInLock,
ApplyCommandInLockFn applyCommandInLock,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats) {
@@ -291,7 +291,7 @@ Status SyncTail::syncApply(OperationContext* txn,
Lock::GlobalWrite globalWriteLock(txn->lockState());
// special case apply for commands to avoid implicit database creation
- Status status = applyCommandInLock(txn, op);
+ Status status = applyCommandInLock(txn, op, inSteadyStateReplication);
incrementOpsAppliedStats();
return status;
}
@@ -304,7 +304,7 @@ Status SyncTail::syncApply(OperationContext* txn,
txn->setReplicatedWrites(false);
DisableDocumentValidation validationDisabler(txn);
- Status status = applyOperationInLock(txn, db, op, convertUpdateToUpsert);
+ Status status = applyOperationInLock(txn, db, op, inSteadyStateReplication);
if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) {
throw WriteConflictException();
}
@@ -369,10 +369,12 @@ Status SyncTail::syncApply(OperationContext* txn,
return Status(ErrorCodes::BadValue, ss);
}
-Status SyncTail::syncApply(OperationContext* txn, const BSONObj& op, bool convertUpdateToUpsert) {
+Status SyncTail::syncApply(OperationContext* txn,
+ const BSONObj& op,
+ bool inSteadyStateReplication) {
return syncApply(txn,
op,
- convertUpdateToUpsert,
+ inSteadyStateReplication,
applyOperation_inlock,
applyCommand_inlock,
stdx::bind(&Counter64::increment, &opsAppliedStats, 1ULL));
@@ -989,11 +991,12 @@ void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) {
// allow us to get through the magic barrier
txn.lockState()->setIsBatchWriter(true);
- bool convertUpdatesToUpserts = true;
+ // This function is only called in steady state replication.
+ bool inSteadyStateReplication = true;
for (std::vector<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
try {
- const Status s = SyncTail::syncApply(&txn, *it, convertUpdatesToUpserts);
+ const Status s = SyncTail::syncApply(&txn, *it, inSteadyStateReplication);
if (!s.isOK()) {
severe() << "Error applying operation (" << it->toString() << "): " << s;
fassertFailedNoTrace(16359);
@@ -1022,14 +1025,23 @@ void multiInitialSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) {
// allow us to get through the magic barrier
txn.lockState()->setIsBatchWriter(true);
- bool convertUpdatesToUpserts = false;
+ // This function is only called in initial sync, as its name suggests.
+ bool inSteadyStateReplication = false;
for (std::vector<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
try {
- const Status s = SyncTail::syncApply(&txn, *it, convertUpdatesToUpserts);
+ const Status s = SyncTail::syncApply(&txn, *it, inSteadyStateReplication);
if (!s.isOK()) {
+ // Don't retry on commands.
+ SyncTail::OplogEntry entry(*it);
+ if (entry.opType[0] == 'c') {
+ error() << "Error applying command (" << it->toString() << "): " << s;
+ fassertFailedNoTrace(40353);
+ }
+
+ // We might need to fetch the missing docs from the sync source.
if (st->shouldRetry(&txn, *it)) {
- const Status s2 = SyncTail::syncApply(&txn, *it, convertUpdatesToUpserts);
+ const Status s2 = SyncTail::syncApply(&txn, *it, inSteadyStateReplication);
if (!s2.isOK()) {
severe() << "Error applying operation (" << it->toString() << "): " << s2;
fassertFailedNoTrace(15915);
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 313a16bf48b..7e5cf2ec2a7 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -59,7 +59,7 @@ public:
/**
* Type of function that takes a non-command op and applies it locally.
* Used for applying from an oplog.
- * Last boolean argument 'convertUpdateToUpsert' converts some updates to upserts for
+ * Last boolean argument 'inSteadyStateReplication' converts some updates to upserts for
* idempotency reasons.
* Returns failure status if the op was an update that could not be applied.
*/
@@ -71,7 +71,7 @@ public:
* Used for applying from an oplog.
* Returns failure status if the op that could not be applied.
*/
- using ApplyCommandInLockFn = stdx::function<Status(OperationContext*, const BSONObj&)>;
+ using ApplyCommandInLockFn = stdx::function<Status(OperationContext*, const BSONObj&, bool)>;
/**
* Type of function to increment "repl.apply.ops" server status metric.
@@ -88,12 +88,12 @@ public:
*/
static Status syncApply(OperationContext* txn,
const BSONObj& o,
- bool convertUpdateToUpsert,
+ bool inSteadyStateReplication,
ApplyOperationInLockFn applyOperationInLock,
ApplyCommandInLockFn applyCommandInLock,
IncrementOpsAppliedStatsFn incrementOpsAppliedStats);
- static Status syncApply(OperationContext* txn, const BSONObj& o, bool convertUpdateToUpsert);
+ static Status syncApply(OperationContext* txn, const BSONObj& o, bool inSteadyStateReplication);
void oplogApplication(StorageInterface* storageInterface);
bool peek(BSONObj* obj);
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 29f81337e05..eb0895c0d82 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -101,10 +101,10 @@ void SyncTailTest::setUp() {
_txn.reset(new OperationContextReplMock(&cc(), 0));
_opsApplied = 0;
_applyOp =
- [](OperationContext* txn, Database* db, const BSONObj& op, bool convertUpdateToUpsert) {
+ [](OperationContext* txn, Database* db, const BSONObj& op, bool inSteadyStateReplication) {
return Status::OK();
};
- _applyCmd = [](OperationContext* txn, const BSONObj& op) { return Status::OK(); };
+ _applyCmd = [](OperationContext* txn, const BSONObj& op, bool) { return Status::OK(); };
_incOps = [this]() { _opsApplied++; };
}
@@ -118,6 +118,11 @@ void SyncTailTest::tearDown() {
setGlobalReplicationCoordinator(nullptr);
}
+Status failedApplyCommand(OperationContext* txn, const BSONObj& theOperation, bool) {
+ FAIL("applyCommand unexpectedly invoked.");
+ return Status::OK();
+}
+
TEST_F(SyncTailTest, Peek) {
BackgroundSyncMock bgsync;
SyncTail syncTail(&bgsync, [](const std::vector<BSONObj>& ops, SyncTail* st) {});
@@ -159,7 +164,7 @@ TEST_F(SyncTailTest, SyncApplyNoOp) {
SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* txn,
Database* db,
const BSONObj& theOperation,
- bool convertUpdateToUpsert) {
+ bool inSteadyStateReplication) {
applyOpCalled = true;
ASSERT_TRUE(txn);
ASSERT_TRUE(txn->lockState()->isDbLockedForMode("test", MODE_X));
@@ -167,17 +172,12 @@ TEST_F(SyncTailTest, SyncApplyNoOp) {
ASSERT_TRUE(documentValidationDisabled(txn));
ASSERT_TRUE(db);
ASSERT_EQUALS(op, theOperation);
- ASSERT_FALSE(convertUpdateToUpsert);
+ ASSERT_FALSE(inSteadyStateReplication);
return Status::OK();
};
- SyncTail::ApplyCommandInLockFn applyCmd =
- [&](OperationContext* txn, const BSONObj& theOperation) {
- FAIL("applyCommand unexpectedly invoked.");
- return Status::OK();
- };
ASSERT_TRUE(_txn->writesAreReplicated());
ASSERT_FALSE(documentValidationDisabled(_txn.get()));
- ASSERT_OK(SyncTail::syncApply(_txn.get(), op, false, applyOp, applyCmd, _incOps));
+ ASSERT_OK(SyncTail::syncApply(_txn.get(), op, false, applyOp, failedApplyCommand, _incOps));
ASSERT_TRUE(applyOpCalled);
ASSERT_EQUALS(1U, _opsApplied);
}
@@ -191,19 +191,14 @@ TEST_F(SyncTailTest, SyncApplyNoOpApplyOpThrowsException) {
SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* txn,
Database* db,
const BSONObj& theOperation,
- bool convertUpdateToUpsert) {
+ bool inSteadyStateReplication) {
applyOpCalled++;
if (applyOpCalled < 5) {
throw WriteConflictException();
}
return Status::OK();
};
- SyncTail::ApplyCommandInLockFn applyCmd =
- [&](OperationContext* txn, const BSONObj& theOperation) {
- FAIL("applyCommand unexpectedly invoked.");
- return Status::OK();
- };
- ASSERT_OK(SyncTail::syncApply(_txn.get(), op, false, applyOp, applyCmd, _incOps));
+ ASSERT_OK(SyncTail::syncApply(_txn.get(), op, false, applyOp, failedApplyCommand, _incOps));
ASSERT_EQUALS(5, applyOpCalled);
ASSERT_EQUALS(1U, _opsApplied);
}
@@ -217,7 +212,7 @@ void SyncTailTest::_testSyncApplyInsertDocument(LockMode expectedMode) {
SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* txn,
Database* db,
const BSONObj& theOperation,
- bool convertUpdateToUpsert) {
+ bool inSteadyStateReplication) {
applyOpCalled = true;
ASSERT_TRUE(txn);
ASSERT_TRUE(txn->lockState()->isDbLockedForMode("test", expectedMode));
@@ -226,17 +221,12 @@ void SyncTailTest::_testSyncApplyInsertDocument(LockMode expectedMode) {
ASSERT_TRUE(documentValidationDisabled(txn));
ASSERT_TRUE(db);
ASSERT_EQUALS(op, theOperation);
- ASSERT_TRUE(convertUpdateToUpsert);
+ ASSERT_TRUE(inSteadyStateReplication);
return Status::OK();
};
- SyncTail::ApplyCommandInLockFn applyCmd =
- [&](OperationContext* txn, const BSONObj& theOperation) {
- FAIL("applyCommand unexpectedly invoked.");
- return Status::OK();
- };
ASSERT_TRUE(_txn->writesAreReplicated());
ASSERT_FALSE(documentValidationDisabled(_txn.get()));
- ASSERT_OK(SyncTail::syncApply(_txn.get(), op, true, applyOp, applyCmd, _incOps));
+ ASSERT_OK(SyncTail::syncApply(_txn.get(), op, true, applyOp, failedApplyCommand, _incOps));
ASSERT_TRUE(applyOpCalled);
ASSERT_EQUALS(1U, _opsApplied);
}
@@ -278,7 +268,7 @@ TEST_F(SyncTailTest, SyncApplyIndexBuild) {
SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* txn,
Database* db,
const BSONObj& theOperation,
- bool convertUpdateToUpsert) {
+ bool inSteadyStateReplication) {
applyOpCalled = true;
ASSERT_TRUE(txn);
ASSERT_TRUE(txn->lockState()->isDbLockedForMode("test", MODE_X));
@@ -286,17 +276,12 @@ TEST_F(SyncTailTest, SyncApplyIndexBuild) {
ASSERT_TRUE(documentValidationDisabled(txn));
ASSERT_TRUE(db);
ASSERT_EQUALS(op, theOperation);
- ASSERT_FALSE(convertUpdateToUpsert);
+ ASSERT_FALSE(inSteadyStateReplication);
return Status::OK();
};
- SyncTail::ApplyCommandInLockFn applyCmd =
- [&](OperationContext* txn, const BSONObj& theOperation) {
- FAIL("applyCommand unexpectedly invoked.");
- return Status::OK();
- };
ASSERT_TRUE(_txn->writesAreReplicated());
ASSERT_FALSE(documentValidationDisabled(_txn.get()));
- ASSERT_OK(SyncTail::syncApply(_txn.get(), op, false, applyOp, applyCmd, _incOps));
+ ASSERT_OK(SyncTail::syncApply(_txn.get(), op, false, applyOp, failedApplyCommand, _incOps));
ASSERT_TRUE(applyOpCalled);
ASSERT_EQUALS(1U, _opsApplied);
}
@@ -310,12 +295,12 @@ TEST_F(SyncTailTest, SyncApplyCommand) {
SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* txn,
Database* db,
const BSONObj& theOperation,
- bool convertUpdateToUpsert) {
+ bool inSteadyStateReplication) {
FAIL("applyOperation unexpectedly invoked.");
return Status::OK();
};
SyncTail::ApplyCommandInLockFn applyCmd =
- [&](OperationContext* txn, const BSONObj& theOperation) {
+ [&](OperationContext* txn, const BSONObj& theOperation, bool) {
applyCmdCalled = true;
ASSERT_TRUE(txn);
ASSERT_TRUE(txn->lockState()->isW());
@@ -340,12 +325,12 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) {
SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* txn,
Database* db,
const BSONObj& theOperation,
- bool convertUpdateToUpsert) {
+ bool inSteadyStateReplication) {
FAIL("applyOperation unexpectedly invoked.");
return Status::OK();
};
SyncTail::ApplyCommandInLockFn applyCmd =
- [&](OperationContext* txn, const BSONObj& theOperation) {
+ [&](OperationContext* txn, const BSONObj& theOperation, bool) {
applyCmdCalled++;
if (applyCmdCalled < 5) {
throw WriteConflictException();
@@ -357,4 +342,25 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) {
ASSERT_EQUALS(1U, _opsApplied);
}
+TEST_F(SyncTailTest, MultiInitialSyncApplyFailsOnRenameCollection) {
+ SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc());
+
+ BSONObjBuilder bob;
+ bob.appendElements(OpTime(Timestamp(1, 0), 1LL).toBSON());
+ bob.append("h", 1LL);
+ bob.append("op", "c");
+ bob.append("ns", "test.$cmd");
+ bob.append("o",
+ BSON("renameCollection"
+ << "test.foo"
+ << "to"
+ << "test.bar"
+ << "stayTemp" << false << "dropTarget" << false));
+ auto op = bob.obj();
+
+ ASSERT_EQUALS(ErrorCodes::OplogOperationUnsupported,
+ SyncTail::syncApply(_txn.get(), op, false));
+}
+
+
} // namespace