summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/repl/bgsync.cpp3
-rw-r--r--src/mongo/db/repl/oplog.cpp45
-rw-r--r--src/mongo/db/repl/oplog.h4
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state.h7
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.cpp12
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_impl.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.cpp2
-rw-r--r--src/mongo/db/repl/replication_coordinator_external_state_mock.h1
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp6
-rw-r--r--src/mongo/db/repl/sync_tail.cpp2
10 files changed, 80 insertions, 3 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 4e4a69385bd..96deab1683a 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -106,7 +106,8 @@ Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperat
if (opTime != lastOpTimeFetched || hash != lastHashFetched) {
return Status(ErrorCodes::OplogStartMissing,
str::stream() << "our last op time fetched: " << lastOpTimeFetched.toString()
- << ". source's GTE: " << opTime.toString());
+ << ". source's GTE: " << opTime.toString() << " hashes: ("
+ << lastHashFetched << "/" << hash << ")");
}
return Status::OK();
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index d662dc29825..9f415ba1917 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -294,6 +294,51 @@ OplogDocWriter _logOpWriter(OperationContext* txn,
}
} // end anon namespace
+/**
+ * Truncates the oplog after, and including, the "truncateTimestamp" entry.
+ *
+ * Takes MODE_IX on the oplog's database and MODE_X on the oplog collection, and fasserts (28820,
+ * NamespaceNotFound) if the oplog collection cannot be found. The oplog is then walked from its
+ * latest entry backwards until an entry with "ts" < truncateTimestamp is seen; if at least one
+ * entry with "ts" >= truncateTimestamp was passed on the way, the RecordId of the entry the scan
+ * stopped on is handed to temp_cappedTruncateAfter(). Nothing is truncated when the very first
+ * entry visited is already older than truncateTimestamp, or when the cursor yields no entries.
+ *
+ * NOTE(review): if the scan runs out of entries without ever finding one older than
+ * truncateTimestamp, lastRecordId is the last entry visited, which itself has
+ * "ts" >= truncateTimestamp. Whether that entry survives depends on the final "false" passed to
+ * temp_cappedTruncateAfter() -- presumably "inclusive"; confirm.
+ * NOTE(review): "ts" is tested with eoo() only to choose the log message; timestamp() is still
+ * called on it afterwards -- confirm the behavior for an entry that has no "ts" field.
+ */
+void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
+ const NamespaceString oplogNss(rsOplogName);
+ ScopedTransaction transaction(txn, MODE_IX);
+ AutoGetDb autoDb(txn, oplogNss.db(), MODE_IX);
+ Lock::CollectionLock oplogCollectionLoc(txn->lockState(), oplogNss.ns(), MODE_X);
+ Collection* oplogCollection = autoDb.getDb()->getCollection(oplogNss);
+ if (!oplogCollection) {
+ fassertFailedWithStatusNoTrace(
+ 28820,
+ Status(ErrorCodes::NamespaceNotFound, str::stream() << "Can't find " << rsOplogName));
+ }
+
+ // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp.
+ bool foundSomethingToTruncate = false;
+ RecordId lastRecordId;  // id of the entry most recently visited by the scan
+ BSONObj lastOplogEntry;  // owned copy of that same entry
+ auto oplogRs = oplogCollection->getRecordStore();
+ auto oplogReverseCursor = oplogRs->getCursor(txn, false);  // "false": presumably reverse -- confirm
+ bool first = true;
+ while (auto next = oplogReverseCursor->next()) {
+ lastOplogEntry = next->data.releaseToBson();
+ lastRecordId = next->id;
+
+ const auto tsElem = lastOplogEntry["ts"];
+
+ if (first) {  // log only the first entry visited (the oplog tail), via LOG(2)
+ if (tsElem.eoo())
+ LOG(2) << "Oplog tail entry: " << lastOplogEntry;
+ else
+ LOG(2) << "Oplog tail entry ts field: " << tsElem;
+ first = false;
+ }
+
+ if (tsElem.timestamp() < truncateTimestamp) {  // first entry older than the cut point: stop
+ break;
+ }
+
+ foundSomethingToTruncate = true;  // this entry has ts >= truncateTimestamp
+ }
+
+ if (foundSomethingToTruncate) {
+ oplogCollection->temp_cappedTruncateAfter(txn, lastRecordId, false);
+ }
+}
/* we write to local.oplog.rs:
{ ts : ..., h: ..., v: ..., op: ..., etc }
ts: an OpTime timestamp
diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h
index 764d1cfe1c0..bd47fbc0a1b 100644
--- a/src/mongo/db/repl/oplog.h
+++ b/src/mongo/db/repl/oplog.h
@@ -52,6 +52,10 @@ class RecordId;
namespace repl {
class ReplSettings;
+/**
+ * Truncates the oplog after, and including, the "truncateTimestamp" entry.
+ */
+void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp);
/**
* Create a new capped collection for the oplog if it doesn't yet exist.
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 2a815c0d758..5255c6ffbd7 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -145,6 +145,13 @@ public:
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn) = 0;
/**
+ * Cleans up the oplog, potentially by truncating it:
+ * If we are recovering from a failed batch then minvalid.start through minvalid.end need
+ * to be removed from the oplog before we can start applying operations.
+ */
+ virtual void cleanUpLastApplyBatch(OperationContext* txn) = 0;
+
+ /**
* Returns the HostAndPort of the remote client connected to us that initiated the operation
* represented by "txn".
*/
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 6dd4570611f..b39319f7824 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -50,6 +50,7 @@
#include "mongo/db/repl/isself.h"
#include "mongo/db/repl/last_vote.h"
#include "mongo/db/repl/master_slave.h"
+#include "mongo/db/repl/minvalid.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/repl_settings.h"
#include "mongo/db/repl/rs_sync.h"
@@ -84,6 +85,7 @@ const char tsFieldName[] = "ts";
// Set this to false to disable the background creation of snapshots. This can be used for A-B
// benchmarking to find how much overhead repl::SnapshotThread introduces.
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(enableReplSnapshotThread, bool, true);
+
} // namespace
ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl()
@@ -287,6 +289,16 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp
setNewTimestamp(newTime);
}
+void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* txn) {
+ auto mv = getMinValid(txn);  // minvalid record; only its "start" optime is consulted here
+
+ if (!mv.start.isNull()) {
+ /* If we are in the middle of a batch, and recovering, then we need to truncate the oplog.
+  * A non-null minvalid "start" is what signals that state; the oplog is truncated from
+  * that start optime's timestamp. When "start" is null this function does nothing. */
+ LOG(2) << "Recovering from a failed apply batch, start:" << mv.start.toBSON();
+ truncateOplogTo(txn, mv.start.getTimestamp());
+ }
+}
+
StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(OperationContext* txn) {
// TODO: handle WriteConflictExceptions below
try {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 1f36394a05c..d3d34e95031 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -63,6 +63,7 @@ public:
virtual Status storeLocalLastVoteDocument(OperationContext* txn, const LastVote& lastVote);
virtual void setGlobalTimestamp(const Timestamp& newTime);
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn);
+ virtual void cleanUpLastApplyBatch(OperationContext* txn);
virtual HostAndPort getClientHostAndPort(const OperationContext* txn);
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* txn);
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 801a1ba2d2c..320a7aaa6cb 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -142,6 +142,8 @@ void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument(
void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(const Timestamp& newTime) {}
+void ReplicationCoordinatorExternalStateMock::cleanUpLastApplyBatch(OperationContext* txn) {}  // empty body: a no-op in the mock
+
StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::loadLastOpTime(OperationContext* txn) {
return _lastOpTime;
}
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 2de3bbe50ec..5d18c034b15 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -67,6 +67,7 @@ public:
virtual Status storeLocalLastVoteDocument(OperationContext* txn, const LastVote& lastVote);
virtual void setGlobalTimestamp(const Timestamp& newTime);
virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn);
+ virtual void cleanUpLastApplyBatch(OperationContext* txn);
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* txn);
virtual void clearShardingState();
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index c029db24838..b4c10c1e420 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -305,7 +305,9 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) {
fassertFailedNoTrace(28545);
}
- StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(txn);
+ // Load the last optime from the oplog, truncating the oplog first if we need to recover.
+ _externalState->cleanUpLastApplyBatch(txn);
+ auto lastOpTimeStatus = _externalState->loadLastOpTime(txn);
// Use a callback here, because _finishLoadLocalConfig calls isself() which requires
// that the server's networking layer be up and running and accepting connections, which
@@ -365,7 +367,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig(
OpTime lastOpTime;
if (!isArbiter) {
if (!lastOpTimeStatus.isOK()) {
- warning() << "Failed to load timestamp of most recently applied operation; "
+ warning() << "Failed to load timestamp of most recently applied operation: "
<< lastOpTimeStatus.getStatus();
} else {
lastOpTime = lastOpTimeStatus.getValue();
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 47d2b4edd5b..548a3928432 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbhelpers.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/global_timestamp.h"
@@ -435,6 +436,7 @@ void SyncTail::oplogApplication() {
OperationContextImpl txn;
OpTime originalEndOpTime(getMinValid(&txn).end);
+
while (!inShutdown()) {
OpQueue ops;