diff options
author | Judah Schvimer <judah@mongodb.com> | 2018-05-21 13:17:03 -0400 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2018-05-21 13:17:03 -0400 |
commit | b64de307169891f859c29f207e712ed0eb3cd2a2 (patch) | |
tree | 7ae081a06b6d91303b7d0e86c301c8f9b135a570 | |
parent | 6389499b09a899c5bc70e460338fb814b24640da (diff) | |
download | mongo-b64de307169891f859c29f207e712ed0eb3cd2a2.tar.gz |
SERVER-30724 Initial sync waits for oplog visibility before beginning clone
-rw-r--r-- | jstests/replsets/initial_sync_oplog_visibility.js | 52 | ||||
-rw-r--r-- | src/mongo/db/op_observer.cpp | 17 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_initialsync.cpp | 34 |
3 files changed, 102 insertions, 1 deletions
diff --git a/jstests/replsets/initial_sync_oplog_visibility.js b/jstests/replsets/initial_sync_oplog_visibility.js new file mode 100644 index 00000000000..9002a5f9b23 --- /dev/null +++ b/jstests/replsets/initial_sync_oplog_visibility.js @@ -0,0 +1,52 @@ +/** + * Test that we wait for oplog visibility before beginning initial sync. + */ +(function() { + 'use strict'; + load("jstests/libs/check_log.js"); + + var name = 'initial_sync_oplog_visibility'; + + var replTest = new ReplSetTest({name: name, nodes: 1}); + replTest.startSet(); + replTest.initiate(); + var primary = replTest.getPrimary(); + + var firstColl = "hangColl"; + var secondColl = "secondColl"; + + // Create both collections. + assert.writeOK(primary.getDB(name)[firstColl].insert({init: 1})); + assert.writeOK(primary.getDB(name)[secondColl].insert({init: 1})); + + jsTestLog("Add a node to initial sync."); + var secondary = replTest.add({}); + assert.commandWorked(secondary.adminCommand( + {configureFailPoint: 'initialSyncHangBeforeOplogVisibility', mode: 'alwaysOn'})); + replTest.reInitiate(); + checkLog.contains(secondary, + "initial sync - initialSyncHangBeforeOplogVisibility fail point enabled"); + + // Start an insert that will hang in a parallel shell. + assert.commandWorked( + primary.adminCommand({configureFailPoint: 'hangOnInsertObserver', mode: 'alwaysOn'})); + const awaitInsertShell = startParallelShell(function() { + var name = 'initial_sync_oplog_visibility'; + var firstColl = "hangColl"; + assert.writeOK(db.getSiblingDB(name)[firstColl].insert({a: 1})); + }, primary.port); + checkLog.contains(primary, "op observer - hangOnInsertObserver fail point enabled"); + + // Let initial sync finish and fail. + assert.commandWorked(secondary.adminCommand( + {configureFailPoint: 'initialSyncHangBeforeOplogVisibility', mode: 'off'})); + checkLog.contains(secondary, "initial sync attempt failed"); + + // Let the insert and the initial sync finish. + assert.commandWorked( + primary.adminCommand({configureFailPoint: 'hangOnInsertObserver', mode: 'off'})); + awaitInsertShell(); + replTest.awaitSecondaryNodes(); + + replTest.stopSet(); +})();
\ No newline at end of file diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp index b4dacd091c5..dbec2d47486 100644 --- a/src/mongo/db/op_observer.cpp +++ b/src/mongo/db/op_observer.cpp @@ -25,6 +25,7 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication #include "mongo/platform/basic.h" @@ -36,16 +37,20 @@ #include "mongo/db/dbdirectclient.h" #include "mongo/db/service_context.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/s/d_state.h" #include "mongo/scripting/engine.h" -#include "mongo/db/operation_context.h" +#include "mongo/util/fail_point_service.h" +#include "mongo/util/log.h" namespace mongo { using std::vector; +MONGO_FP_DECLARE(hangOnInsertObserver); + void OpObserver::onCreateIndex(OperationContext* txn, const std::string& ns, BSONObj indexDoc, @@ -64,6 +69,16 @@ void OpObserver::onInserts(OperationContext* txn, bool fromMigrate) { repl::logOps(txn, "i", nss, begin, end, fromMigrate); + if (MONGO_FAIL_POINT(hangOnInsertObserver)) { + // This log output is used in js tests so please leave it. + log() << "op observer - hangOnInsertObserver fail point enabled. Blocking until fail point " + "is disabled."; + while (MONGO_FAIL_POINT(hangOnInsertObserver)) { + mongo::sleepsecs(1); + txn->checkForInterrupt(); + } + } + const char* ns = nss.ns().c_str(); for (auto it = begin; it != end; it++) { getGlobalAuthorizationManager()->logOp(txn, "i", ns, *it, nullptr); diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index ecaea8fcbc4..3f8ff685de9 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -72,6 +72,7 @@ MONGO_FP_DECLARE(failInitSyncWithBufferedEntriesLeft); // Failpoint which causes the initial sync function to hang before copying databases. MONGO_FP_DECLARE(initialSyncHangBeforeCopyingDatabases); +MONGO_FP_DECLARE(initialSyncHangBeforeOplogVisibility); /** * Truncates the oplog (removes any documents) and resets internal variables that were @@ -344,6 +345,39 @@ Status _initialSync() { sleepsecs(15); return Status(ErrorCodes::InitialSyncFailure, msg); } + OpTime firstLastOpTime = fassertStatusOK(40420, OpTime::parseFromOplogEntry(lastOp)); + + log() << "initial sync ensuring correct oplog visibility. Starting at " + << firstLastOpTime.toBSON(); + + if (MONGO_FAIL_POINT(initialSyncHangBeforeOplogVisibility)) { + // This log output is used in js tests so please leave it. + log() << "initial sync - initialSyncHangBeforeOplogVisibility fail point " + "enabled. Blocking until fail point is disabled."; + while (MONGO_FAIL_POINT(initialSyncHangBeforeOplogVisibility) && !inShutdown()) { + mongo::sleepsecs(1); + } + log() << "initial sync - initialSyncHangBeforeOplogVisibility fail point disabled."; + } + + // We do a forward scan starting at the lastOp we just fetched to ensure that all operations + // with earlier OpTimes are committed. This will block until all earlier oplog entries are + // visible and should either throw a socket exception or return lastOp. + BSONObjBuilder gte; + gte.append("$gte", firstLastOpTime.getTimestamp()); + BSONObjBuilder queryBob; + queryBob.append("ts", gte.done()); + const BSONObj& query = queryBob.done(); + BSONObj lastOpConfirm = r.findOne(rsOplogName.c_str(), query); + invariant(!lastOpConfirm.isEmpty()); + + OpTime lastOpConfirmTime = fassertStatusOK(40421, OpTime::parseFromOplogEntry(lastOpConfirm)); + if (lastOpConfirmTime != firstLastOpTime) { + return Status(ErrorCodes::InitialSyncFailure, + str::stream() + << "Last op was not confirmed. last op: " << firstLastOpTime.toBSON() + << ". confirmation: " << lastOpConfirmTime.toBSON()); + } log() << "initial sync drop all databases"; dropAllDatabasesExceptLocal(&txn); |