summary refs log tree commit diff
diff options
context:
space:
mode:
author Judah Schvimer <judah@mongodb.com> 2018-05-21 13:17:03 -0400
committer Judah Schvimer <judah@mongodb.com> 2018-05-21 13:17:03 -0400
commit b64de307169891f859c29f207e712ed0eb3cd2a2 (patch)
tree 7ae081a06b6d91303b7d0e86c301c8f9b135a570
parent 6389499b09a899c5bc70e460338fb814b24640da (diff)
download mongo-b64de307169891f859c29f207e712ed0eb3cd2a2.tar.gz
SERVER-30724 Initial sync waits for oplog visibility before beginning clone
-rw-r--r-- jstests/replsets/initial_sync_oplog_visibility.js 52
-rw-r--r-- src/mongo/db/op_observer.cpp 17
-rw-r--r-- src/mongo/db/repl/rs_initialsync.cpp 34
3 files changed, 102 insertions, 1 deletions
diff --git a/jstests/replsets/initial_sync_oplog_visibility.js b/jstests/replsets/initial_sync_oplog_visibility.js
new file mode 100644
index 00000000000..9002a5f9b23
--- /dev/null
+++ b/jstests/replsets/initial_sync_oplog_visibility.js
@@ -0,0 +1,52 @@
+/**
+ * Test that we wait for oplog visibility before beginning initial sync.
+ */
+(function() {
+ 'use strict';
+ load("jstests/libs/check_log.js");
+
+ var name = 'initial_sync_oplog_visibility';
+
+ var replTest = new ReplSetTest({name: name, nodes: 1});
+ replTest.startSet();
+ replTest.initiate();
+ var primary = replTest.getPrimary();
+
+ var firstColl = "hangColl";
+ var secondColl = "secondColl";
+
+ // Create both collections.
+ assert.writeOK(primary.getDB(name)[firstColl].insert({init: 1}));
+ assert.writeOK(primary.getDB(name)[secondColl].insert({init: 1}));
+
+ jsTestLog("Add a node to initial sync.");
+ var secondary = replTest.add({});
+ assert.commandWorked(secondary.adminCommand(
+ {configureFailPoint: 'initialSyncHangBeforeOplogVisibility', mode: 'alwaysOn'}));
+ replTest.reInitiate();
+ checkLog.contains(secondary,
+ "initial sync - initialSyncHangBeforeOplogVisibility fail point enabled");
+
+ // Start an insert that will hang in a parallel shell.
+ assert.commandWorked(
+ primary.adminCommand({configureFailPoint: 'hangOnInsertObserver', mode: 'alwaysOn'}));
+ const awaitInsertShell = startParallelShell(function() {
+ var name = 'initial_sync_oplog_visibility';
+ var firstColl = "hangColl";
+ assert.writeOK(db.getSiblingDB(name)[firstColl].insert({a: 1}));
+ }, primary.port);
+ checkLog.contains(primary, "op observer - hangOnInsertObserver fail point enabled");
+
+ // Let initial sync finish and fail.
+ assert.commandWorked(secondary.adminCommand(
+ {configureFailPoint: 'initialSyncHangBeforeOplogVisibility', mode: 'off'}));
+ checkLog.contains(secondary, "initial sync attempt failed");
+
+ // Let the insert and the initial sync finish.
+ assert.commandWorked(
+ primary.adminCommand({configureFailPoint: 'hangOnInsertObserver', mode: 'off'}));
+ awaitInsertShell();
+ replTest.awaitSecondaryNodes();
+
+ replTest.stopSet();
+})();
\ No newline at end of file
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp
index b4dacd091c5..dbec2d47486 100644
--- a/src/mongo/db/op_observer.cpp
+++ b/src/mongo/db/op_observer.cpp
@@ -25,6 +25,7 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
#include "mongo/platform/basic.h"
@@ -36,16 +37,20 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/service_context.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/s/d_state.h"
#include "mongo/scripting/engine.h"
-#include "mongo/db/operation_context.h"
+#include "mongo/util/fail_point_service.h"
+#include "mongo/util/log.h"
namespace mongo {
using std::vector;
+MONGO_FP_DECLARE(hangOnInsertObserver);
+
void OpObserver::onCreateIndex(OperationContext* txn,
const std::string& ns,
BSONObj indexDoc,
@@ -64,6 +69,16 @@ void OpObserver::onInserts(OperationContext* txn,
bool fromMigrate) {
repl::logOps(txn, "i", nss, begin, end, fromMigrate);
+ if (MONGO_FAIL_POINT(hangOnInsertObserver)) {
+ // This log output is used in js tests so please leave it.
+ log() << "op observer - hangOnInsertObserver fail point enabled. Blocking until fail point "
+ "is disabled.";
+ while (MONGO_FAIL_POINT(hangOnInsertObserver)) {
+ mongo::sleepsecs(1);
+ txn->checkForInterrupt();
+ }
+ }
+
const char* ns = nss.ns().c_str();
for (auto it = begin; it != end; it++) {
getGlobalAuthorizationManager()->logOp(txn, "i", ns, *it, nullptr);
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index ecaea8fcbc4..3f8ff685de9 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -72,6 +72,7 @@ MONGO_FP_DECLARE(failInitSyncWithBufferedEntriesLeft);
// Failpoint which causes the initial sync function to hang before copying databases.
MONGO_FP_DECLARE(initialSyncHangBeforeCopyingDatabases);
+MONGO_FP_DECLARE(initialSyncHangBeforeOplogVisibility);
/**
* Truncates the oplog (removes any documents) and resets internal variables that were
@@ -344,6 +345,39 @@ Status _initialSync() {
sleepsecs(15);
return Status(ErrorCodes::InitialSyncFailure, msg);
}
+ OpTime firstLastOpTime = fassertStatusOK(40420, OpTime::parseFromOplogEntry(lastOp));
+
+ log() << "initial sync ensuring correct oplog visibility. Starting at "
+ << firstLastOpTime.toBSON();
+
+ if (MONGO_FAIL_POINT(initialSyncHangBeforeOplogVisibility)) {
+ // This log output is used in js tests so please leave it.
+ log() << "initial sync - initialSyncHangBeforeOplogVisibility fail point "
+ "enabled. Blocking until fail point is disabled.";
+ while (MONGO_FAIL_POINT(initialSyncHangBeforeOplogVisibility) && !inShutdown()) {
+ mongo::sleepsecs(1);
+ }
+ log() << "initial sync - initialSyncHangBeforeOplogVisibility fail point disabled.";
+ }
+
+ // We do a forward scan starting at the lastOp we just fetched to ensure that all operations
+ // with earlier OpTimes are committed. This will block until all earlier oplog entries are
+ // visible and should either throw a socket exception or return lastOp.
+ BSONObjBuilder gte;
+ gte.append("$gte", firstLastOpTime.getTimestamp());
+ BSONObjBuilder queryBob;
+ queryBob.append("ts", gte.done());
+ const BSONObj& query = queryBob.done();
+ BSONObj lastOpConfirm = r.findOne(rsOplogName.c_str(), query);
+ invariant(!lastOpConfirm.isEmpty());
+
+ OpTime lastOpConfirmTime = fassertStatusOK(40421, OpTime::parseFromOplogEntry(lastOpConfirm));
+ if (lastOpConfirmTime != firstLastOpTime) {
+ return Status(ErrorCodes::InitialSyncFailure,
+ str::stream()
+ << "Last op was not confirmed. last op: " << firstLastOpTime.toBSON()
+ << ". confirmation: " << lastOpConfirmTime.toBSON());
+ }
log() << "initial sync drop all databases";
dropAllDatabasesExceptLocal(&txn);