summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVesselina Ratcheva <vesselina.ratcheva@10gen.com>2018-10-02 18:34:01 -0400
committerVesselina Ratcheva <vesselina.ratcheva@10gen.com>2019-01-23 22:48:40 -0500
commitafb213ced669a328c90e0a0fb0b7e06bf92c0ccb (patch)
tree46d3074b06d9871f2752ee4d81c946295b6fc1e3
parent8cb57a8acb9ab0ca7937d66c30d500a76247077b (diff)
downloadmongo-afb213ced669a328c90e0a0fb0b7e06bf92c0ccb.tar.gz
SERVER-32146 Log slow oplog entry application
(cherry picked from commit 66434ba0a7930efedbe5e06a098ab8e7bb659cfa)
-rw-r--r--jstests/libs/check_log.js28
-rw-r--r--jstests/replsets/log_secondary_oplog_application.js81
-rw-r--r--src/mongo/db/repl/SConscript1
-rw-r--r--src/mongo/db/repl/sync_tail.cpp60
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp132
-rw-r--r--src/mongo/util/clock_source_mock.h18
6 files changed, 298 insertions, 22 deletions
diff --git a/jstests/libs/check_log.js b/jstests/libs/check_log.js
index 809923b5dd3..46b4b031bff 100644
--- a/jstests/libs/check_log.js
+++ b/jstests/libs/check_log.js
@@ -29,23 +29,19 @@ var checkLog;
* the provided 'msg' is found in the logs, or 5 minutes have elapsed. Throws an exception
* on timeout.
*/
- var contains = function(conn, msg) {
- assert.soon(
- function() {
- var logMessages = getGlobalLog(conn);
- if (logMessages === null) {
- return false;
- }
- for (var i = 0; i < logMessages.length; i++) {
- if (logMessages[i].indexOf(msg) != -1) {
- return true;
- }
- }
+ var contains = function(conn, msg, timeout = 5 * 60 * 1000) {
+ assert.soon(function() {
+ var logMessages = getGlobalLog(conn);
+ if (logMessages === null) {
return false;
- },
- 'Could not find log entries containing the following message: ' + msg,
- 5 * 60 * 1000,
- 300);
+ }
+ for (var i = 0; i < logMessages.length; i++) {
+ if (logMessages[i].indexOf(msg) != -1) {
+ return true;
+ }
+ }
+ return false;
+ }, 'Could not find log entries containing the following message: ' + msg, timeout, 300);
};
/*
diff --git a/jstests/replsets/log_secondary_oplog_application.js b/jstests/replsets/log_secondary_oplog_application.js
new file mode 100644
index 00000000000..f6918dfa811
--- /dev/null
+++ b/jstests/replsets/log_secondary_oplog_application.js
@@ -0,0 +1,81 @@
+/**
+ * Tests that the server logs ops on the secondary if and only if they are slow to apply.
+ * We should only report ops if they take longer than "slowMS" to apply on a secondary.
+ * We intentionally target CRUD ops in this test, since we know we should be the only ones
+ * issuing them. See below for details on how we simulate quickness and slowness.
+ */
+
+(function() {
+ "use strict";
+ load("jstests/libs/check_log.js");
+
+ let name = "log_secondary_oplog_application";
+ let rst = ReplSetTest({name: name, nodes: 2});
+ rst.startSet();
+
+ let nodes = rst.nodeList();
+ rst.initiate({
+ "_id": name,
+ "members": [{"_id": 0, "host": nodes[0]}, {"_id": 1, "host": nodes[1], "priority": 0}]
+ });
+
+ let primary = rst.getPrimary();
+ let secondary = rst.getSecondary();
+
+ /**
+ * Part 1: Issue a fast op and make sure that we do *not* log it.
+ * We ensure the op is always considered fast by vastly increasing the "slowMS" threshold.
+ */
+
+ // Create collection explicitly so the insert doesn't have to do it.
+ assert.commandWorked(primary.getDB(name).createCollection("fastOp"));
+ rst.awaitReplication();
+
+ // Set "slowMS" to a very high value (in milliseconds).
+ assert.commandWorked(secondary.getDB(name).setProfilingLevel(1, 60 * 60 * 1000));
+
+ // Issue a write and make sure we replicate it.
+ assert.writeOK(primary.getDB(name)["fastOp"].insert({"fast": "cheetah"}));
+ rst.awaitReplication();
+
+ // The op should not have been logged.
+ assert.throws(function() {
+ checkLog.contains(secondary, "applied op: CRUD", 1 * 1000);
+ });
+
+ /**
+ * Part 2: Issue a slow op and make sure that we *do* log it.
+ * We use a failpoint in SyncTail::syncApply which blocks after we read the time at the start
+ * of the application of the op, and we wait there to simulate slowness.
+ */
+
+ // Create collection explicitly so the insert doesn't have to do it.
+ assert.commandWorked(primary.getDB(name).createCollection("slowOp"));
+ rst.awaitReplication();
+
+ // Set "slowMS" to a low value (in milliseconds).
+ assert.commandWorked(secondary.getDB(name).setProfilingLevel(1, 20));
+
+ // Hang right after taking note of the start time of the application.
+ assert.commandWorked(secondary.adminCommand(
+ {configureFailPoint: "hangAfterRecordingOpApplicationStartTime", mode: "alwaysOn"}));
+
+ // Issue a write and make sure we've hit the failpoint before moving on.
+ assert.writeOK(primary.getDB(name)["slowOp"].insert({"slow": "sloth"}));
+ checkLog.contains(secondary,
+ "syncApply - fail point hangAfterRecordingOpApplicationStartTime enabled");
+
+ // Wait for an amount of time safely above the "slowMS" we set.
+ sleep(0.5 * 1000);
+
+    // Disable the failpoint so the op can finish applying.
+ assert.commandWorked(secondary.adminCommand(
+ {configureFailPoint: "hangAfterRecordingOpApplicationStartTime", mode: "off"}));
+
+ // Make sure we log that insert op.
+ rst.awaitReplication();
+ checkLog.contains(secondary, "applied op: CRUD");
+
+ rst.stopSet();
+
+})(); \ No newline at end of file
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 10cb6c30e0a..c813905d97e 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -674,6 +674,7 @@ env.CppUnitTest(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/dbdirectclient',
+ '$BUILD_DIR/mongo/util/clock_source_mock',
'idempotency_test_fixture',
'oplog_buffer_blocking_queue',
'oplog_interface_local',
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 306e0b4185c..651592c587c 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -94,6 +94,8 @@ AtomicInt32 SyncTail::replBatchLimitOperations{5 * 1000};
namespace {
+MONGO_FP_DECLARE(hangAfterRecordingOpApplicationStartTime);
+
/**
* This variable determines the number of writer threads SyncTail will have. It has a default value,
* which varies based on architecture and can be overridden using the "replWriterThreadCount" server
@@ -316,6 +318,41 @@ bool SyncTail::peek(OperationContext* opCtx, BSONObj* op) {
return _networkQueue->peek(opCtx, op);
}
+/**
+ * Used for logging a report of ops that take longer than "slowMS" to apply. This is called
+ * right before returning from syncApply, and it returns the same status.
+ */
+Status finishAndLogApply(ClockSource* clockSource,
+ Status finalStatus,
+ Date_t applyStartTime,
+ const char* opType,
+ const BSONObj& op) {
+
+ if (finalStatus.isOK()) {
+ auto applyEndTime = clockSource->now();
+ auto diffMS = durationCount<Milliseconds>(applyEndTime - applyStartTime);
+
+ // This op was slow to apply, so we should log a report of it.
+ if (diffMS > serverGlobalParams.slowMS) {
+
+ StringBuilder s;
+ s << "applied op: ";
+
+ if (opType[0] == 'c') {
+ s << "command ";
+ } else {
+ s << "CRUD ";
+ }
+
+ s << redact(op);
+ s << ", took " << diffMS << "ms";
+
+ log() << s.str();
+ }
+ }
+ return finalStatus;
+}
+
// static
Status SyncTail::syncApply(OperationContext* opCtx,
const BSONObj& op,
@@ -353,6 +390,19 @@ Status SyncTail::syncApply(OperationContext* opCtx,
return status;
};
+ auto clockSource = opCtx->getServiceContext()->getFastClockSource();
+ auto applyStartTime = clockSource->now();
+
+ if (MONGO_FAIL_POINT(hangAfterRecordingOpApplicationStartTime)) {
+ log() << "syncApply - fail point hangAfterRecordingOpApplicationStartTime enabled. "
+ << "Blocking until fail point is disabled. ";
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterRecordingOpApplicationStartTime);
+ }
+
+ auto finishApply = [&](Status status) {
+ return finishAndLogApply(clockSource, status, applyStartTime, opType, op);
+ };
+
bool isNoOp = opType[0] == 'n';
if (isNoOp || (opType[0] == 'i' && nss.isSystemDotIndexes())) {
if (isNoOp && nss.db() == "") {
@@ -361,11 +411,11 @@ Status SyncTail::syncApply(OperationContext* opCtx,
}
Lock::DBLock dbLock(opCtx, nss.db(), MODE_X);
OldClientContext ctx(opCtx, nss.ns());
- return applyOp(ctx.db());
+ return finishApply(applyOp(ctx.db()));
}
if (isCrudOpType(opType)) {
- return writeConflictRetry(opCtx, "syncApply_CRUD", nss.ns(), [&] {
+ return finishApply(writeConflictRetry(opCtx, "syncApply_CRUD", nss.ns(), [&] {
// DB lock always acquires the global lock
Lock::DBLock dbLock(opCtx, nss.db(), MODE_IX);
auto ui = op["ui"];
@@ -399,11 +449,11 @@ Status SyncTail::syncApply(OperationContext* opCtx,
OldClientContext ctx(opCtx, actualNss.ns(), db, /*justCreated*/ false);
return applyOp(ctx.db());
- });
+ }));
}
if (opType[0] == 'c') {
- return writeConflictRetry(opCtx, "syncApply_command", nss.ns(), [&] {
+ return finishApply(writeConflictRetry(opCtx, "syncApply_command", nss.ns(), [&] {
// a command may need a global write lock. so we will conservatively go
// ahead and grab one here. suboptimal. :-(
Lock::GlobalWrite globalWriteLock(opCtx);
@@ -412,7 +462,7 @@ Status SyncTail::syncApply(OperationContext* opCtx,
Status status = applyCommandInLock(opCtx, op, oplogApplicationMode);
incrementOpsAppliedStats();
return status;
- });
+ }));
}
// unknown opType
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index fd3eaa79b0f..975249cda01 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -70,6 +70,7 @@
#include "mongo/stdx/mutex.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
+#include "mongo/util/clock_source_mock.h"
#include "mongo/util/concurrency/old_thread_pool.h"
#include "mongo/util/md5.hpp"
#include "mongo/util/scopeguard.h"
@@ -2123,6 +2124,137 @@ TEST_F(IdempotencyTest, ConvertToCappedNamespaceNotFound) {
ASSERT_FALSE(autoColl.getDb());
}
+TEST_F(SyncTailTest, LogSlowOpApplicationWhenSuccessful) {
+ // We are inserting into an existing collection.
+ const NamespaceString nss("test.t");
+ createCollection(_opCtx.get(), nss, {});
+
+ // This duration is greater than "slowMS", so the op would be considered slow.
+ auto applyDuration = serverGlobalParams.slowMS * 10;
+ getServiceContext()->setFastClockSource(
+ stdx::make_unique<AutoAdvancingClockSourceMock>(Milliseconds(applyDuration)));
+
+ auto entry = repl::OplogEntry(OpTime(Timestamp(1, 1), 1), // optime
+ 1LL, // hash
+ OpTypeEnum::kInsert, // opType
+ nss, // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ repl::OplogEntry::kOplogVersion, // version
+ BSON("_id" << 1), // o
+ boost::none, // o2
+ {}, // sessionInfo
+ boost::none, // upsert
+ boost::none, // wall clock time
+ boost::none, // statement id
+ boost::none, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none); // post-image optime
+ startCapturingLogMessages();
+ ASSERT_OK(SyncTail::syncApply(_opCtx.get(),
+ entry.toBSON(),
+ OplogApplication::Mode::kSecondary,
+ _applyOp,
+ _applyCmd,
+ _incOps));
+
+ // Use a builder for easier escaping. We expect the operation to be logged.
+ StringBuilder expected;
+ expected << "applied op: CRUD { ts: Timestamp(1, 1), t: 1, h: 1, v: 2, op: \"i\", ns: "
+ "\"test.t\", o: { _id: 1 } }, took "
+ << applyDuration << "ms";
+
+ ASSERT_EQUALS(1, countLogLinesContaining(expected.str()));
+}
+
+TEST_F(SyncTailTest, DoNotLogSlowOpApplicationWhenFailed) {
+ // We are trying to insert into a nonexistent database.
+ NamespaceString nss("test.t");
+
+ // This duration is greater than "slowMS", so the op would be considered slow.
+ auto applyDuration = serverGlobalParams.slowMS * 10;
+ getServiceContext()->setFastClockSource(
+ stdx::make_unique<AutoAdvancingClockSourceMock>(Milliseconds(applyDuration)));
+
+ auto entry = repl::OplogEntry(OpTime(Timestamp(1, 1), 1), // optime
+ 1LL, // hash
+ OpTypeEnum::kInsert, // opType
+ nss, // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ repl::OplogEntry::kOplogVersion, // version
+ BSON("_id" << 1), // o
+ boost::none, // o2
+ {}, // sessionInfo
+ boost::none, // upsert
+ boost::none, // wall clock time
+ boost::none, // statement id
+ boost::none, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none); // post-image optime
+
+ startCapturingLogMessages();
+ ASSERT_THROWS(SyncTail::syncApply(_opCtx.get(),
+ entry.toBSON(),
+ OplogApplication::Mode::kSecondary,
+ _applyOp,
+ _applyCmd,
+ _incOps)
+ .transitional_ignore(),
+ ExceptionFor<ErrorCodes::NamespaceNotFound>);
+
+ // Use a builder for easier escaping. We expect the operation to *not* be logged
+    // even though it was slow, since we couldn't apply it successfully.
+ StringBuilder expected;
+ expected << "applied op: CRUD { ts: Timestamp(1, 1), t: 1, h: 1, v: 2, op: \"i\", ns: "
+ "\"test.t\", o: { _id: 1 } }, took "
+ << applyDuration << "ms";
+ ASSERT_EQUALS(0, countLogLinesContaining(expected.str()));
+}
+
+TEST_F(SyncTailTest, DoNotLogNonSlowOpApplicationWhenSuccessful) {
+ // This duration is below "slowMS", so the op would *not* be considered slow.
+ auto applyDuration = serverGlobalParams.slowMS / 10;
+ getServiceContext()->setFastClockSource(
+ stdx::make_unique<AutoAdvancingClockSourceMock>(Milliseconds(applyDuration)));
+
+ // We are inserting into an existing collection.
+ const NamespaceString nss("test.t");
+ createCollection(_opCtx.get(), nss, {});
+ auto entry = repl::OplogEntry(OpTime(Timestamp(1, 1), 1), // optime
+ 1LL, // hash
+ OpTypeEnum::kInsert, // opType
+ nss, // namespace
+ boost::none, // uuid
+ boost::none, // fromMigrate
+ repl::OplogEntry::kOplogVersion, // version
+ BSON("_id" << 1), // o
+ boost::none, // o2
+ {}, // sessionInfo
+ boost::none, // upsert
+ boost::none, // wall clock time
+ boost::none, // statement id
+ boost::none, // optime of previous write within same transaction
+ boost::none, // pre-image optime
+ boost::none); // post-image optime
+
+ startCapturingLogMessages();
+ ASSERT_OK(SyncTail::syncApply(_opCtx.get(),
+ entry.toBSON(),
+ OplogApplication::Mode::kSecondary,
+ _applyOp,
+ _applyCmd,
+ _incOps));
+
+ // Use a builder for easier escaping. We expect the operation to *not* be logged,
+ // since it wasn't slow to apply.
+ StringBuilder expected;
+ expected << "applied op: CRUD { ts: Timestamp(1, 1), t: 1, h: 1, v: 2, op: \"i\", ns: "
+ "\"test.t\", o: { _id: 1 } }, took "
+ << applyDuration << "ms";
+ ASSERT_EQUALS(0, countLogLinesContaining(expected.str()));
+}
+
} // namespace
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/util/clock_source_mock.h b/src/mongo/util/clock_source_mock.h
index 0fc968a430e..a257775bb7c 100644
--- a/src/mongo/util/clock_source_mock.h
+++ b/src/mongo/util/clock_source_mock.h
@@ -43,7 +43,7 @@ namespace mongo {
/**
* Mock clock source that returns a fixed time until explicitly advanced.
*/
-class ClockSourceMock final : public ClockSource {
+class ClockSourceMock : public ClockSource {
public:
/**
* Constructs a ClockSourceMock with the current time set to the Unix epoch.
@@ -75,6 +75,22 @@ private:
std::vector<Alarm> _alarms;
};
+/**
+ * Mock clock source where reading the clock also advances the current time by a fixed interval.
+ */
+class AutoAdvancingClockSourceMock : public ClockSourceMock {
+public:
+ AutoAdvancingClockSourceMock(Milliseconds increment) : _increment(increment) {}
+
+ Date_t now() override {
+ ClockSourceMock::advance(_increment);
+ return ClockSourceMock::now();
+ }
+
+private:
+ const Milliseconds _increment;
+};
+
class SharedClockSourceAdapter final : public ClockSource {
public:
explicit SharedClockSourceAdapter(std::shared_ptr<ClockSource> source)