summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog_fetcher.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/oplog_fetcher.cpp')
-rw-r--r--src/mongo/db/repl/oplog_fetcher.cpp90
1 files changed, 63 insertions, 27 deletions
diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp
index afff88413c3..0609e46c634 100644
--- a/src/mongo/db/repl/oplog_fetcher.cpp
+++ b/src/mongo/db/repl/oplog_fetcher.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/repl/replication_auth.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/stats/timer_stats.h"
+#include "mongo/logv2/log.h"
#include "mongo/rpc/metadata/oplog_query_metadata.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point.h"
@@ -467,7 +468,7 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons
[&](auto&&) {
status = {ErrorCodes::FailPointEnabled,
"stopReplProducerOnDocument fail point is enabled."};
- log() << status.reason();
+ LOGV2(21264, "{status_reason}", "status_reason"_attr = status.reason());
},
[&](const BSONObj& data) {
auto opCtx = cc().makeOperationContext();
@@ -485,17 +486,25 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons
auto firstDocToApply = documents.cbegin();
if (!documents.empty()) {
- LOG(2) << "oplog fetcher read " << documents.size()
- << " operations from remote oplog starting at " << documents.front()["ts"]
- << " and ending at " << documents.back()["ts"];
+ LOGV2_DEBUG(21265,
+ 2,
+ "oplog fetcher read {documents_size} operations from remote oplog starting at "
+ "{documents_front_ts} and ending at {documents_back_ts}",
+ "documents_size"_attr = documents.size(),
+ "documents_front_ts"_attr = documents.front()["ts"],
+ "documents_back_ts"_attr = documents.back()["ts"]);
} else {
- LOG(2) << "oplog fetcher read 0 operations from remote oplog";
+ LOGV2_DEBUG(21266, 2, "oplog fetcher read 0 operations from remote oplog");
}
auto oqMetadataResult = parseOplogQueryMetadata(queryResponse);
if (!oqMetadataResult.isOK()) {
- error() << "invalid oplog query metadata from sync source " << _getSource() << ": "
- << oqMetadataResult.getStatus() << ": " << queryResponse.otherFields.metadata;
+ LOGV2_ERROR(21276,
+ "invalid oplog query metadata from sync source {getSource}: "
+ "{oqMetadataResult_getStatus}: {queryResponse_otherFields_metadata}",
+ "getSource"_attr = _getSource(),
+ "oqMetadataResult_getStatus"_attr = oqMetadataResult.getStatus(),
+ "queryResponse_otherFields_metadata"_attr = queryResponse.otherFields.metadata);
return oqMetadataResult.getStatus();
}
auto oqMetadata = oqMetadataResult.getValue();
@@ -519,7 +528,10 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons
return status;
}
- LOG(1) << "oplog fetcher successfully fetched from " << _getSource();
+ LOGV2_DEBUG(21267,
+ 1,
+ "oplog fetcher successfully fetched from {getSource}",
+ "getSource"_attr = _getSource());
// We do not always enqueue the first document. We elect to skip it for the following
// reasons:
@@ -553,8 +565,12 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons
const auto& metadataObj = queryResponse.otherFields.metadata;
auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(metadataObj);
if (!metadataResult.isOK()) {
- error() << "invalid replication metadata from sync source " << _getSource() << ": "
- << metadataResult.getStatus() << ": " << metadataObj;
+ LOGV2_ERROR(21277,
+ "invalid replication metadata from sync source {getSource}: "
+ "{metadataResult_getStatus}: {metadataObj}",
+ "getSource"_attr = _getSource(),
+ "metadataResult_getStatus"_attr = metadataResult.getStatus(),
+ "metadataObj"_attr = metadataObj);
return metadataResult.getStatus();
}
replSetMetadata = metadataResult.getValue();
@@ -838,7 +854,7 @@ void NewOplogFetcher::_runQuery(const executor::TaskExecutor::CallbackArgs& call
if (MONGO_unlikely(logAfterOplogFetcherConnCreated.shouldFail())) {
// Used in tests that wait for this failpoint to be entered to ensure the DBClientConnection
// was created.
- log() << "logAfterOplogFetcherConnCreated failpoint enabled.";
+ LOGV2(21268, "logAfterOplogFetcherConnCreated failpoint enabled.");
}
hangAfterOplogFetcherCallbackScheduled.pauseWhileSet();
@@ -1074,7 +1090,7 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) {
[&](auto&&) {
status = {ErrorCodes::FailPointEnabled,
"stopReplProducerOnDocument fail point is enabled."};
- log() << status.reason();
+ LOGV2(21269, "{status_reason}", "status_reason"_attr = status.reason());
},
[&](const BSONObj& data) {
auto opCtx = cc().makeOperationContext();
@@ -1091,17 +1107,25 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) {
auto firstDocToApply = documents.cbegin();
if (!documents.empty()) {
- LOG(2) << "oplog fetcher read " << documents.size()
- << " operations from remote oplog starting at " << documents.front()["ts"]
- << " and ending at " << documents.back()["ts"];
+ LOGV2_DEBUG(21270,
+ 2,
+ "oplog fetcher read {documents_size} operations from remote oplog starting at "
+ "{documents_front_ts} and ending at {documents_back_ts}",
+ "documents_size"_attr = documents.size(),
+ "documents_front_ts"_attr = documents.front()["ts"],
+ "documents_back_ts"_attr = documents.back()["ts"]);
} else {
- LOG(2) << "oplog fetcher read 0 operations from remote oplog";
+ LOGV2_DEBUG(21271, 2, "oplog fetcher read 0 operations from remote oplog");
}
auto oqMetadataResult = parseOplogQueryMetadata(_metadataObj);
if (!oqMetadataResult.isOK()) {
- error() << "invalid oplog query metadata from sync source " << _source << ": "
- << oqMetadataResult.getStatus() << ": " << _metadataObj;
+ LOGV2_ERROR(21278,
+ "invalid oplog query metadata from sync source {source}: "
+ "{oqMetadataResult_getStatus}: {metadataObj}",
+ "source"_attr = _source,
+ "oqMetadataResult_getStatus"_attr = oqMetadataResult.getStatus(),
+ "metadataObj"_attr = _metadataObj);
return oqMetadataResult.getStatus();
}
auto oqMetadata = oqMetadataResult.getValue();
@@ -1124,7 +1148,8 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) {
return status;
}
- LOG(1) << "oplog fetcher successfully fetched from " << _source;
+ LOGV2_DEBUG(
+ 21272, 1, "oplog fetcher successfully fetched from {source}", "source"_attr = _source);
// We do not always enqueue the first document. We elect to skip it for the following
// reasons:
@@ -1156,8 +1181,12 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) {
if (receivedReplMetadata) {
auto metadataResult = rpc::ReplSetMetadata::readFromMetadata(_metadataObj);
if (!metadataResult.isOK()) {
- error() << "invalid replication metadata from sync source " << _source << ": "
- << metadataResult.getStatus() << ": " << _metadataObj;
+ LOGV2_ERROR(21279,
+ "invalid replication metadata from sync source {source}: "
+ "{metadataResult_getStatus}: {metadataObj}",
+ "source"_attr = _source,
+ "metadataResult_getStatus"_attr = metadataResult.getStatus(),
+ "metadataObj"_attr = _metadataObj);
return metadataResult.getStatus();
}
replSetMetadata = metadataResult.getValue();
@@ -1210,7 +1239,10 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) {
}
auto lastDocOpTime = lastDocOpTimeRes.getValue();
- LOG(3) << "Oplog fetcher setting last fetched optime ahead after batch: " << lastDocOpTime;
+ LOGV2_DEBUG(21273,
+ 3,
+ "Oplog fetcher setting last fetched optime ahead after batch: {lastDocOpTime}",
+ "lastDocOpTime"_attr = lastDocOpTime);
stdx::lock_guard<Latch> lock(_mutex);
_lastFetched = lastDocOpTime;
@@ -1223,13 +1255,17 @@ Status NewOplogFetcher::_onSuccessfulBatch(const Documents& documents) {
bool NewOplogFetcher::OplogFetcherRestartDecisionDefault::shouldContinue(NewOplogFetcher* fetcher,
Status status) {
if (_numRestarts == _maxRestarts) {
- log() << "Error returned from oplog query (no more query restarts left): "
- << redact(status);
+ LOGV2(21274,
+ "Error returned from oplog query (no more query restarts left): {status}",
+ "status"_attr = redact(status));
return false;
}
- log() << "Recreating cursor for oplog fetcher due to error: " << redact(status)
- << ". Last fetched optime: " << fetcher->_getLastOpTimeFetched()
- << ". Attempts remaining: " << (_maxRestarts - _numRestarts);
+ LOGV2(21275,
+ "Recreating cursor for oplog fetcher due to error: {status}. Last fetched optime: "
+ "{fetcher_getLastOpTimeFetched}. Attempts remaining: {maxRestarts_numRestarts}",
+ "status"_attr = redact(status),
+ "fetcher_getLastOpTimeFetched"_attr = fetcher->_getLastOpTimeFetched(),
+ "maxRestarts_numRestarts"_attr = (_maxRestarts - _numRestarts));
_numRestarts++;
return true;
}