diff options
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 45 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.h | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state.h | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_impl.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_mock.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_external_state_mock.h | 1 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 2 |
10 files changed, 80 insertions, 3 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 4e4a69385bd..96deab1683a 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -106,7 +106,8 @@ Status checkRemoteOplogStart(stdx::function<StatusWith<BSONObj>()> getNextOperat if (opTime != lastOpTimeFetched || hash != lastHashFetched) { return Status(ErrorCodes::OplogStartMissing, str::stream() << "our last op time fetched: " << lastOpTimeFetched.toString() - << ". source's GTE: " << opTime.toString()); + << ". source's GTE: " << opTime.toString() << " hashes: (" + << lastHashFetched << "/" << hash << ")"); } return Status::OK(); } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index d662dc29825..9f415ba1917 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -294,6 +294,51 @@ OplogDocWriter _logOpWriter(OperationContext* txn, } } // end anon namespace +// Truncates the oplog to and including the "truncateTimestamp" entry. +void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) { + const NamespaceString oplogNss(rsOplogName); + ScopedTransaction transaction(txn, MODE_IX); + AutoGetDb autoDb(txn, oplogNss.db(), MODE_IX); + Lock::CollectionLock oplogCollectionLoc(txn->lockState(), oplogNss.ns(), MODE_X); + Collection* oplogCollection = autoDb.getDb()->getCollection(oplogNss); + if (!oplogCollection) { + fassertFailedWithStatusNoTrace( + 28820, + Status(ErrorCodes::NamespaceNotFound, str::stream() << "Can't find " << rsOplogName)); + } + + // Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp. 
+ bool foundSomethingToTruncate = false; + RecordId lastRecordId; + BSONObj lastOplogEntry; + auto oplogRs = oplogCollection->getRecordStore(); + auto oplogReverseCursor = oplogRs->getCursor(txn, false); + bool first = true; + while (auto next = oplogReverseCursor->next()) { + lastOplogEntry = next->data.releaseToBson(); + lastRecordId = next->id; + + const auto tsElem = lastOplogEntry["ts"]; + + if (first) { + if (tsElem.eoo()) + LOG(2) << "Oplog tail entry: " << lastOplogEntry; + else + LOG(2) << "Oplog tail entry ts field: " << tsElem; + first = false; + } + + if (tsElem.timestamp() < truncateTimestamp) { + break; + } + + foundSomethingToTruncate = true; + } + + if (foundSomethingToTruncate) { + oplogCollection->temp_cappedTruncateAfter(txn, lastRecordId, false); + } +} /* we write to local.oplog.rs: { ts : ..., h: ..., v: ..., op: ..., etc } ts: an OpTime timestamp diff --git a/src/mongo/db/repl/oplog.h b/src/mongo/db/repl/oplog.h index 764d1cfe1c0..bd47fbc0a1b 100644 --- a/src/mongo/db/repl/oplog.h +++ b/src/mongo/db/repl/oplog.h @@ -52,6 +52,10 @@ class RecordId; namespace repl { class ReplSettings; +/** + * Truncates the oplog after, and including, the "truncateTimestamp" entry. + */ +void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp); /** * Create a new capped collection for the oplog if it doesn't yet exist. diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 2a815c0d758..5255c6ffbd7 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -145,6 +145,13 @@ public: virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn) = 0; /** + * Cleaning up the oplog, by potentially truncating: + * If we are recovering from a failed batch then minvalid.start through minvalid.end need + * to be removed from the oplog before we can start applying operations. 
+ */ + virtual void cleanUpLastApplyBatch(OperationContext* txn) = 0; + + /** * Returns the HostAndPort of the remote client connected to us that initiated the operation * represented by "txn". */ diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 6dd4570611f..b39319f7824 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -50,6 +50,7 @@ #include "mongo/db/repl/isself.h" #include "mongo/db/repl/last_vote.h" #include "mongo/db/repl/master_slave.h" +#include "mongo/db/repl/minvalid.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_settings.h" #include "mongo/db/repl/rs_sync.h" @@ -84,6 +85,7 @@ const char tsFieldName[] = "ts"; // Set this to false to disable the background creation of snapshots. This can be used for A-B // benchmarking to find how much overhead repl::SnapshotThread introduces. MONGO_EXPORT_STARTUP_SERVER_PARAMETER(enableReplSnapshotThread, bool, true); + } // namespace ReplicationCoordinatorExternalStateImpl::ReplicationCoordinatorExternalStateImpl() @@ -287,6 +289,16 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp setNewTimestamp(newTime); } +void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* txn) { + auto mv = getMinValid(txn); + + if (!mv.start.isNull()) { + // If we are in the middle of a batch, and recovering, then we need to truncate the oplog. 
+ LOG(2) << "Recovering from a failed apply batch, start:" << mv.start.toBSON(); + truncateOplogTo(txn, mv.start.getTimestamp()); + } +} + StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(OperationContext* txn) { // TODO: handle WriteConflictExceptions below try { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 1f36394a05c..d3d34e95031 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -63,6 +63,7 @@ public: virtual Status storeLocalLastVoteDocument(OperationContext* txn, const LastVote& lastVote); virtual void setGlobalTimestamp(const Timestamp& newTime); virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn); + virtual void cleanUpLastApplyBatch(OperationContext* txn); virtual HostAndPort getClientHostAndPort(const OperationContext* txn); virtual void closeConnections(); virtual void killAllUserOperations(OperationContext* txn); diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 801a1ba2d2c..320a7aaa6cb 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -142,6 +142,8 @@ void ReplicationCoordinatorExternalStateMock::setLocalLastVoteDocument( void ReplicationCoordinatorExternalStateMock::setGlobalTimestamp(const Timestamp& newTime) {} +void ReplicationCoordinatorExternalStateMock::cleanUpLastApplyBatch(OperationContext* txn) {} + StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::loadLastOpTime(OperationContext* txn) { return _lastOpTime; } diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 2de3bbe50ec..5d18c034b15 100644 --- 
a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -67,6 +67,7 @@ public: virtual Status storeLocalLastVoteDocument(OperationContext* txn, const LastVote& lastVote); virtual void setGlobalTimestamp(const Timestamp& newTime); virtual StatusWith<OpTime> loadLastOpTime(OperationContext* txn); + virtual void cleanUpLastApplyBatch(OperationContext* txn); virtual void closeConnections(); virtual void killAllUserOperations(OperationContext* txn); virtual void clearShardingState(); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index c029db24838..b4c10c1e420 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -305,7 +305,9 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) { fassertFailedNoTrace(28545); } - StatusWith<OpTime> lastOpTimeStatus = _externalState->loadLastOpTime(txn); + // Returns the last optime from the oplog, possibly truncating first if we need to recover. 
+ _externalState->cleanUpLastApplyBatch(txn); + auto lastOpTimeStatus = _externalState->loadLastOpTime(txn); // Use a callback here, because _finishLoadLocalConfig calls isself() which requires // that the server's networking layer be up and running and accepting connections, which @@ -365,7 +367,7 @@ void ReplicationCoordinatorImpl::_finishLoadLocalConfig( OpTime lastOpTime; if (!isArbiter) { if (!lastOpTimeStatus.isOK()) { - warning() << "Failed to load timestamp of most recently applied operation; " + warning() << "Failed to load timestamp of most recently applied operation: " << lastOpTimeStatus.getStatus(); } else { lastOpTime = lastOpTimeStatus.getValue(); diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 47d2b4edd5b..548a3928432 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -46,6 +46,7 @@ #include "mongo/db/commands/fsync.h" #include "mongo/db/commands/server_status_metric.h" #include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/dbhelpers.h" #include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/global_timestamp.h" @@ -435,6 +436,7 @@ void SyncTail::oplogApplication() { OperationContextImpl txn; OpTime originalEndOpTime(getMinValid(&txn).end); + while (!inShutdown()) { OpQueue ops; |