diff options
author | Siyuan Zhou <visualzhou@gmail.com> | 2017-01-11 20:51:51 -0500 |
---|---|---|
committer | Siyuan Zhou <visualzhou@gmail.com> | 2017-01-12 18:58:06 -0500 |
commit | 5313582dae3fc0fe11d91dbbc3d2a8154de3b038 (patch) | |
tree | e2fc13a6a001b207c381fbace9ec2fee1bbf87f7 | |
parent | df3fcefe77119447ce2ad8174657563176da2545 (diff) | |
download | mongo-5313582dae3fc0fe11d91dbbc3d2a8154de3b038.tar.gz |
SERVER-26117 renameCollection 'c' op should restart initial sync upon application
-rw-r--r-- | src/mongo/db/catalog/apply_ops.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/applier.h | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/master_slave.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 80 |
8 files changed, 91 insertions, 61 deletions
diff --git a/src/mongo/db/catalog/apply_ops.cpp b/src/mongo/db/catalog/apply_ops.cpp index 6d00127b9af..0afea74e776 100644 --- a/src/mongo/db/catalog/apply_ops.cpp +++ b/src/mongo/db/catalog/apply_ops.cpp @@ -132,7 +132,7 @@ Status applyOps(OperationContext* txn, try { MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { if (*opType == 'c') { - status = repl::applyCommand_inlock(txn, temp); + status = repl::applyCommand_inlock(txn, temp, true); break; } else { OldClientContext ctx(txn, ns); diff --git a/src/mongo/db/repl/applier.h b/src/mongo/db/repl/applier.h index a8e48c0edb9..d035e0d80ef 100644 --- a/src/mongo/db/repl/applier.h +++ b/src/mongo/db/repl/applier.h @@ -64,7 +64,7 @@ public: /** * Type of function to to apply a single operation. In production, this function - * would have the same outcome as calling SyncTail::syncApply() ('convertUpdatesToUpserts' + * would have the same outcome as calling SyncTail::syncApply() ('inSteadyStateReplication' * value will be embedded in the function implementation). */ using ApplyOperationFn = stdx::function<Status(OperationContext*, const BSONObj&)>; diff --git a/src/mongo/db/repl/master_slave.cpp b/src/mongo/db/repl/master_slave.cpp index 1cb12ba561e..67a156cbce7 100644 --- a/src/mongo/db/repl/master_slave.cpp +++ b/src/mongo/db/repl/master_slave.cpp @@ -616,14 +616,14 @@ bool ReplSource::handleDuplicateDbName(OperationContext* txn, void ReplSource::applyCommand(OperationContext* txn, const BSONObj& op) { try { - Status status = applyCommand_inlock(txn, op); + Status status = applyCommand_inlock(txn, op, true); if (!status.isOK()) { SyncTail sync(nullptr, SyncTail::MultiSyncApplyFunc()); sync.setHostname(hostName); if (sync.shouldRetry(txn, op)) { uassert(28639, "Failure retrying initial sync update", - applyCommand_inlock(txn, op).isOK()); + applyCommand_inlock(txn, op, true).isOK()); } } } catch (UserException& e) { diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index d98db32fd43..2c23f12e97b 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -712,7 +712,7 @@ std::map<std::string, ApplyOpMetadata> opsMap = { Status applyOperation_inlock(OperationContext* txn, Database* db, const BSONObj& op, - bool convertUpdateToUpsert) { + bool inSteadyStateReplication) { LOG(3) << "applying op: " << op << endl; OpCounters* opCounters = txn->writesAreReplicated() ? &globalOpCounters : &replOpCounters; @@ -866,7 +866,7 @@ Status applyOperation_inlock(OperationContext* txn, OpDebug debug; BSONObj updateCriteria = o2; - const bool upsert = valueB || convertUpdateToUpsert; + const bool upsert = valueB || inSteadyStateReplication; uassert(ErrorCodes::NoSuchKey, str::stream() << "Failed to apply update due to missing _id: " << op.toString(), @@ -949,7 +949,9 @@ Status applyOperation_inlock(OperationContext* txn, return Status::OK(); } -Status applyCommand_inlock(OperationContext* txn, const BSONObj& op) { +Status applyCommand_inlock(OperationContext* txn, + const BSONObj& op, + bool inSteadyStateReplication) { const char* names[] = {"o", "ns", "op"}; BSONElement fields[3]; op.getFields(3, names, fields); @@ -972,6 +974,14 @@ Status applyCommand_inlock(OperationContext* txn, const BSONObj& op) { const char* ns = fieldNs.valuestrsafe(); + // Applying renameCollection during initial sync might lead to data corruption, so we restart + // the initial sync. + if (!inSteadyStateReplication && o.firstElementFieldName() == std::string("renameCollection")) { + return Status(ErrorCodes::OplogOperationUnsupported, + str::stream() + << "Applying renameCollection not supported in initial sync: " << op); + } + // Applying commands in repl is done under Global W-lock, so it is safe to not // perform the current DB checks after reacquiring the lock. invariant(txn->lockState()->isW()); diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index b022aa0e545..e3251102fef 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -128,20 +128,22 @@ void oplogCheckCloseDatabase(OperationContext* txn, Database* db); /** * Take a non-command op and apply it locally * Used for applying from an oplog - * @param convertUpdateToUpsert convert some updates to upserts for idempotency reasons + * @param inSteadyStateReplication convert some updates to upserts for idempotency reasons * Returns failure status if the op was an update that could not be applied. */ Status applyOperation_inlock(OperationContext* txn, Database* db, const BSONObj& op, - bool convertUpdateToUpsert = false); + bool inSteadyStateReplication = false); /** * Take a command op and apply it locally * Used for applying from an oplog + * inSteadyStateReplication indicates whether we are in steady state replication, rather than + * initial sync. * Returns failure status if the op that could not be applied. */ -Status applyCommand_inlock(OperationContext* txn, const BSONObj& op); +Status applyCommand_inlock(OperationContext* txn, const BSONObj& op, bool inSteadyStateReplication); /** * Initializes the global Timestamp with the value from the timestamp of the last oplog entry. diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 6ecf9d7028a..3a87828e76b 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -255,7 +255,7 @@ bool SyncTail::peek(BSONObj* op) { // static Status SyncTail::syncApply(OperationContext* txn, const BSONObj& op, - bool convertUpdateToUpsert, + bool inSteadyStateReplication, ApplyOperationInLockFn applyOperationInLock, ApplyCommandInLockFn applyCommandInLock, IncrementOpsAppliedStatsFn incrementOpsAppliedStats) { @@ -291,7 +291,7 @@ Status SyncTail::syncApply(OperationContext* txn, Lock::GlobalWrite globalWriteLock(txn->lockState()); // special case apply for commands to avoid implicit database creation - Status status = applyCommandInLock(txn, op); + Status status = applyCommandInLock(txn, op, inSteadyStateReplication); incrementOpsAppliedStats(); return status; } @@ -304,7 +304,7 @@ Status SyncTail::syncApply(OperationContext* txn, txn->setReplicatedWrites(false); DisableDocumentValidation validationDisabler(txn); - Status status = applyOperationInLock(txn, db, op, convertUpdateToUpsert); + Status status = applyOperationInLock(txn, db, op, inSteadyStateReplication); if (!status.isOK() && status.code() == ErrorCodes::WriteConflict) { throw WriteConflictException(); } @@ -369,10 +369,12 @@ Status SyncTail::syncApply(OperationContext* txn, return Status(ErrorCodes::BadValue, ss); } -Status SyncTail::syncApply(OperationContext* txn, const BSONObj& op, bool convertUpdateToUpsert) { +Status SyncTail::syncApply(OperationContext* txn, + const BSONObj& op, + bool inSteadyStateReplication) { return syncApply(txn, op, - convertUpdateToUpsert, + inSteadyStateReplication, applyOperation_inlock, applyCommand_inlock, stdx::bind(&Counter64::increment, &opsAppliedStats, 1ULL)); @@ -989,11 +991,12 @@ void multiSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) { // allow us to get through the magic barrier txn.lockState()->setIsBatchWriter(true); - bool convertUpdatesToUpserts = true; + // This function is only called in steady state replication. + bool inSteadyStateReplication = true; for (std::vector<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) { try { - const Status s = SyncTail::syncApply(&txn, *it, convertUpdatesToUpserts); + const Status s = SyncTail::syncApply(&txn, *it, inSteadyStateReplication); if (!s.isOK()) { severe() << "Error applying operation (" << it->toString() << "): " << s; fassertFailedNoTrace(16359); @@ -1022,14 +1025,23 @@ void multiInitialSyncApply(const std::vector<BSONObj>& ops, SyncTail* st) { // allow us to get through the magic barrier txn.lockState()->setIsBatchWriter(true); - bool convertUpdatesToUpserts = false; + // This function is only called in initial sync, as its name suggests. + bool inSteadyStateReplication = false; for (std::vector<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) { try { - const Status s = SyncTail::syncApply(&txn, *it, convertUpdatesToUpserts); + const Status s = SyncTail::syncApply(&txn, *it, inSteadyStateReplication); if (!s.isOK()) { + // Don't retry on commands. + SyncTail::OplogEntry entry(*it); + if (entry.opType[0] == 'c') { + error() << "Error applying command (" << it->toString() << "): " << s; + fassertFailedNoTrace(40353); + } + + // We might need to fetch the missing docs from the sync source. if (st->shouldRetry(&txn, *it)) { - const Status s2 = SyncTail::syncApply(&txn, *it, convertUpdatesToUpserts); + const Status s2 = SyncTail::syncApply(&txn, *it, inSteadyStateReplication); if (!s2.isOK()) { severe() << "Error applying operation (" << it->toString() << "): " << s2; fassertFailedNoTrace(15915); diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 313a16bf48b..7e5cf2ec2a7 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -59,7 +59,7 @@ public: /** * Type of function that takes a non-command op and applies it locally. * Used for applying from an oplog. - * Last boolean argument 'convertUpdateToUpsert' converts some updates to upserts for + * Last boolean argument 'inSteadyStateReplication' converts some updates to upserts for * idempotency reasons. * Returns failure status if the op was an update that could not be applied. */ @@ -71,7 +71,7 @@ public: * Used for applying from an oplog. * Returns failure status if the op that could not be applied. */ - using ApplyCommandInLockFn = stdx::function<Status(OperationContext*, const BSONObj&)>; + using ApplyCommandInLockFn = stdx::function<Status(OperationContext*, const BSONObj&, bool)>; /** * Type of function to increment "repl.apply.ops" server status metric. @@ -88,12 +88,12 @@ public: */ static Status syncApply(OperationContext* txn, const BSONObj& o, - bool convertUpdateToUpsert, + bool inSteadyStateReplication, ApplyOperationInLockFn applyOperationInLock, ApplyCommandInLockFn applyCommandInLock, IncrementOpsAppliedStatsFn incrementOpsAppliedStats); - static Status syncApply(OperationContext* txn, const BSONObj& o, bool convertUpdateToUpsert); + static Status syncApply(OperationContext* txn, const BSONObj& o, bool inSteadyStateReplication); void oplogApplication(StorageInterface* storageInterface); bool peek(BSONObj* obj); diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 29f81337e05..eb0895c0d82 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -101,10 +101,10 @@ void SyncTailTest::setUp() { _txn.reset(new OperationContextReplMock(&cc(), 0)); _opsApplied = 0; _applyOp = - [](OperationContext* txn, Database* db, const BSONObj& op, bool convertUpdateToUpsert) { + [](OperationContext* txn, Database* db, const BSONObj& op, bool inSteadyStateReplication) { return Status::OK(); }; - _applyCmd = [](OperationContext* txn, const BSONObj& op) { return Status::OK(); }; + _applyCmd = [](OperationContext* txn, const BSONObj& op, bool) { return Status::OK(); }; _incOps = [this]() { _opsApplied++; }; } @@ -118,6 +118,11 @@ void SyncTailTest::tearDown() { setGlobalReplicationCoordinator(nullptr); } +Status failedApplyCommand(OperationContext* txn, const BSONObj& theOperation, bool) { + FAIL("applyCommand unexpectedly invoked."); + return Status::OK(); +} + TEST_F(SyncTailTest, Peek) { BackgroundSyncMock bgsync; SyncTail syncTail(&bgsync, [](const std::vector<BSONObj>& ops, SyncTail* st) {}); @@ -159,7 +164,7 @@ TEST_F(SyncTailTest, SyncApplyNoOp) { SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* txn, Database* db, const BSONObj& theOperation, - bool convertUpdateToUpsert) { + bool inSteadyStateReplication) { applyOpCalled = true; ASSERT_TRUE(txn); ASSERT_TRUE(txn->lockState()->isDbLockedForMode("test", MODE_X)); @@ -167,17 +172,12 @@ TEST_F(SyncTailTest, SyncApplyNoOp) { ASSERT_TRUE(documentValidationDisabled(txn)); ASSERT_TRUE(db); ASSERT_EQUALS(op, theOperation); - ASSERT_FALSE(convertUpdateToUpsert); + ASSERT_FALSE(inSteadyStateReplication); return Status::OK(); }; - SyncTail::ApplyCommandInLockFn applyCmd = - [&](OperationContext* txn, const BSONObj& theOperation) { - FAIL("applyCommand unexpectedly invoked."); - return Status::OK(); - }; ASSERT_TRUE(_txn->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_txn.get())); - ASSERT_OK(SyncTail::syncApply(_txn.get(), op, false, applyOp, applyCmd, _incOps)); + ASSERT_OK(SyncTail::syncApply(_txn.get(), op, false, applyOp, failedApplyCommand, _incOps)); ASSERT_TRUE(applyOpCalled); ASSERT_EQUALS(1U, _opsApplied); } @@ -191,19 +191,14 @@ TEST_F(SyncTailTest, SyncApplyNoOpApplyOpThrowsException) { SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* txn, Database* db, const BSONObj& theOperation, - bool convertUpdateToUpsert) { + bool inSteadyStateReplication) { applyOpCalled++; if (applyOpCalled < 5) { throw WriteConflictException(); } return Status::OK(); }; - SyncTail::ApplyCommandInLockFn applyCmd = - [&](OperationContext* txn, const BSONObj& theOperation) { - FAIL("applyCommand unexpectedly invoked."); - return Status::OK(); - }; - ASSERT_OK(SyncTail::syncApply(_txn.get(), op, false, applyOp, applyCmd, _incOps)); + ASSERT_OK(SyncTail::syncApply(_txn.get(), op, false, applyOp, failedApplyCommand, _incOps)); ASSERT_EQUALS(5, applyOpCalled); ASSERT_EQUALS(1U, _opsApplied); } @@ -217,7 +212,7 @@ void SyncTailTest::_testSyncApplyInsertDocument(LockMode expectedMode) { SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* txn, Database* db, const BSONObj& theOperation, - bool convertUpdateToUpsert) { + bool inSteadyStateReplication) { applyOpCalled = true; ASSERT_TRUE(txn); ASSERT_TRUE(txn->lockState()->isDbLockedForMode("test", expectedMode)); @@ -226,17 +221,12 @@ void SyncTailTest::_testSyncApplyInsertDocument(LockMode expectedMode) { ASSERT_TRUE(documentValidationDisabled(txn)); ASSERT_TRUE(db); ASSERT_EQUALS(op, theOperation); - ASSERT_TRUE(convertUpdateToUpsert); + ASSERT_TRUE(inSteadyStateReplication); return Status::OK(); }; - SyncTail::ApplyCommandInLockFn applyCmd = - [&](OperationContext* txn, const BSONObj& theOperation) { - FAIL("applyCommand unexpectedly invoked."); - return Status::OK(); - }; ASSERT_TRUE(_txn->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_txn.get())); - ASSERT_OK(SyncTail::syncApply(_txn.get(), op, true, applyOp, applyCmd, _incOps)); + ASSERT_OK(SyncTail::syncApply(_txn.get(), op, true, applyOp, failedApplyCommand, _incOps)); ASSERT_TRUE(applyOpCalled); ASSERT_EQUALS(1U, _opsApplied); } @@ -278,7 +268,7 @@ TEST_F(SyncTailTest, SyncApplyIndexBuild) { SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* txn, Database* db, const BSONObj& theOperation, - bool convertUpdateToUpsert) { + bool inSteadyStateReplication) { applyOpCalled = true; ASSERT_TRUE(txn); ASSERT_TRUE(txn->lockState()->isDbLockedForMode("test", MODE_X)); @@ -286,17 +276,12 @@ TEST_F(SyncTailTest, SyncApplyIndexBuild) { ASSERT_TRUE(documentValidationDisabled(txn)); ASSERT_TRUE(db); ASSERT_EQUALS(op, theOperation); - ASSERT_FALSE(convertUpdateToUpsert); + ASSERT_FALSE(inSteadyStateReplication); return Status::OK(); }; - SyncTail::ApplyCommandInLockFn applyCmd = - [&](OperationContext* txn, const BSONObj& theOperation) { - FAIL("applyCommand unexpectedly invoked."); - return Status::OK(); - }; ASSERT_TRUE(_txn->writesAreReplicated()); ASSERT_FALSE(documentValidationDisabled(_txn.get())); - ASSERT_OK(SyncTail::syncApply(_txn.get(), op, false, applyOp, applyCmd, _incOps)); + ASSERT_OK(SyncTail::syncApply(_txn.get(), op, false, applyOp, failedApplyCommand, _incOps)); ASSERT_TRUE(applyOpCalled); ASSERT_EQUALS(1U, _opsApplied); } @@ -310,12 +295,12 @@ TEST_F(SyncTailTest, SyncApplyCommand) { SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* txn, Database* db, const BSONObj& theOperation, - bool convertUpdateToUpsert) { + bool inSteadyStateReplication) { FAIL("applyOperation unexpectedly invoked."); return Status::OK(); }; SyncTail::ApplyCommandInLockFn applyCmd = - [&](OperationContext* txn, const BSONObj& theOperation) { + [&](OperationContext* txn, const BSONObj& theOperation, bool) { applyCmdCalled = true; ASSERT_TRUE(txn); ASSERT_TRUE(txn->lockState()->isW()); @@ -340,12 +325,12 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) { SyncTail::ApplyOperationInLockFn applyOp = [&](OperationContext* txn, Database* db, const BSONObj& theOperation, - bool convertUpdateToUpsert) { + bool inSteadyStateReplication) { FAIL("applyOperation unexpectedly invoked."); return Status::OK(); }; SyncTail::ApplyCommandInLockFn applyCmd = - [&](OperationContext* txn, const BSONObj& theOperation) { + [&](OperationContext* txn, const BSONObj& theOperation, bool) { applyCmdCalled++; if (applyCmdCalled < 5) { throw WriteConflictException(); @@ -357,4 +342,25 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) { ASSERT_EQUALS(1U, _opsApplied); } +TEST_F(SyncTailTest, MultiInitialSyncApplyFailsOnRenameCollection) { + SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc()); + + BSONObjBuilder bob; + bob.appendElements(OpTime(Timestamp(1, 0), 1LL).toBSON()); + bob.append("h", 1LL); + bob.append("op", "c"); + bob.append("ns", "test.$cmd"); + bob.append("o", + BSON("renameCollection" + << "test.foo" + << "to" + << "test.bar" + << "stayTemp" << false << "dropTarget" << false)); + auto op = bob.obj(); + + ASSERT_EQUALS(ErrorCodes::OplogOperationUnsupported, + SyncTail::syncApply(_txn.get(), op, false)); +} + + } // namespace |