diff options
author | Benety Goh <benety@mongodb.com> | 2018-05-09 14:37:27 -0400 |
---|---|---|
committer | Benety Goh <benety@mongodb.com> | 2018-05-09 14:37:27 -0400 |
commit | d3ad5762ef90fef083584333a39fd9fd26407c58 (patch) | |
tree | fafd6d1c164fe7b40be3f6e7147796f5062c124c /src/mongo/db/repl/replication_recovery.cpp | |
parent | 7e4f6276ec553354e2b88a209e3000ea7f729513 (diff) | |
download | mongo-d3ad5762ef90fef083584333a39fd9fd26407c58.tar.gz |
SERVER-32334 add OplogBuffer adaptor for DBDirectClient oplog query
Diffstat (limited to 'src/mongo/db/repl/replication_recovery.cpp')
-rw-r--r-- | src/mongo/db/repl/replication_recovery.cpp | 149 |
1 files changed, 119 insertions, 30 deletions
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp index 286841e070a..eda4598e838 100644 --- a/src/mongo/db/repl/replication_recovery.cpp +++ b/src/mongo/db/repl/replication_recovery.cpp @@ -37,6 +37,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/oplog_buffer.h" #include "mongo/db/repl/replication_consistency_markers_impl.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/repl/sync_tail.h" @@ -47,6 +48,112 @@ namespace mongo { namespace repl { +namespace { + +/** + * OplogBuffer adaptor for a DBClient query on the oplog. + * Implements only functions used by OplogApplier::getNextApplierBatch(). + */ +class OplogBufferLocalOplog final : public OplogBuffer { +public: + explicit OplogBufferLocalOplog(Timestamp oplogApplicationStartPoint) + : _oplogApplicationStartPoint(oplogApplicationStartPoint) {} + + void startup(OperationContext* opCtx) final { + _client = std::make_unique<DBDirectClient>(opCtx); + _cursor = _client->query(NamespaceString::kRsOplogNamespace.ns(), + QUERY("ts" << BSON("$gte" << _oplogApplicationStartPoint)), + /*batchSize*/ 0, + /*skip*/ 0, + /*projection*/ nullptr, + QueryOption_OplogReplay); + + // Check that the first document matches our appliedThrough point then skip it since it's + // already been applied. + if (!_cursor->more()) { + // This should really be impossible because we check above that the top of the oplog is + // strictly > appliedThrough. If this fails it represents a serious bug in either the + // storage engine or query's implementation of OplogReplay. + severe() << "Couldn't find any entries in the oplog >= " + << _oplogApplicationStartPoint.toBSON() << " which should be impossible."; + fassertFailedNoTrace(40293); + } + + auto firstTimestampFound = + fassert(40291, OpTime::parseFromOplogEntry(_cursor->nextSafe())).getTimestamp(); + if (firstTimestampFound != _oplogApplicationStartPoint) { + severe() << "Oplog entry at " << _oplogApplicationStartPoint.toBSON() + << " is missing; actual entry found is " << firstTimestampFound.toBSON(); + fassertFailedNoTrace(40292); + } + } + + void shutdown(OperationContext*) final { + _cursor = {}; + _client = {}; + } + + bool isEmpty() const final { + return !_cursor->more(); + } + + bool tryPop(OperationContext*, Value* value) final { + return _peekOrPop(value, Mode::kPop); + } + + bool peek(OperationContext*, Value* value) final { + return _peekOrPop(value, Mode::kPeek); + } + + void pushEvenIfFull(OperationContext*, const Value&) final { + MONGO_UNREACHABLE; + } + void push(OperationContext*, const Value&) final { + MONGO_UNREACHABLE; + } + void pushAllNonBlocking(OperationContext*, Batch::const_iterator, Batch::const_iterator) final { + MONGO_UNREACHABLE; + } + void waitForSpace(OperationContext*, std::size_t) final { + MONGO_UNREACHABLE; + } + std::size_t getMaxSize() const final { + MONGO_UNREACHABLE; + } + std::size_t getSize() const final { + MONGO_UNREACHABLE; + } + std::size_t getCount() const final { + MONGO_UNREACHABLE; + } + void clear(OperationContext*) final { + MONGO_UNREACHABLE; + } + bool waitForData(Seconds) final { + MONGO_UNREACHABLE; + } + boost::optional<Value> lastObjectPushed(OperationContext*) const final { + MONGO_UNREACHABLE; + } + +private: + enum class Mode { kPeek, kPop }; + bool _peekOrPop(Value* value, Mode mode) { + if (isEmpty()) { + return false; + } + *value = mode == Mode::kPeek ? _cursor->peekFirst() : _cursor->nextSafe(); + invariant(!value->isEmpty()); + return true; + } + + const Timestamp _oplogApplicationStartPoint; + std::unique_ptr<DBDirectClient> _client; + std::unique_ptr<DBClientCursor> _cursor; +}; + +} // namespace + ReplicationRecoveryImpl::ReplicationRecoveryImpl(StorageInterface* storageInterface, ReplicationConsistencyMarkers* consistencyMarkers) : _storageInterface(storageInterface), _consistencyMarkers(consistencyMarkers) {} @@ -197,44 +304,21 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, log() << "Replaying stored operations from " << oplogApplicationStartPoint.toBSON() << " (exclusive) to " << topOfOplog.toBSON() << " (inclusive)."; - DBDirectClient db(opCtx); - auto cursor = db.query(NamespaceString::kRsOplogNamespace.ns(), - QUERY("ts" << BSON("$gte" << oplogApplicationStartPoint)), - /*batchSize*/ 0, - /*skip*/ 0, - /*projection*/ nullptr, - QueryOption_OplogReplay); - - // Check that the first document matches our appliedThrough point then skip it since it's - // already been applied. - if (!cursor->more()) { - // This should really be impossible because we check above that the top of the oplog is - // strictly > appliedThrough. If this fails it represents a serious bug in either the - // storage engine or query's implementation of OplogReplay. - severe() << "Couldn't find any entries in the oplog >= " - << oplogApplicationStartPoint.toBSON() << " which should be impossible."; - fassertFailedNoTrace(40293); - } - - auto firstTimestampFound = - fassert(40291, OpTime::parseFromOplogEntry(cursor->nextSafe())).getTimestamp(); - if (firstTimestampFound != oplogApplicationStartPoint) { - severe() << "Oplog entry at " << oplogApplicationStartPoint.toBSON() - << " is missing; actual entry found is " << firstTimestampFound.toBSON(); - fassertFailedNoTrace(40292); - } + OplogBufferLocalOplog oplogBuffer(oplogApplicationStartPoint); + oplogBuffer.startup(opCtx); // Apply remaining ops one at at time, but don't log them because they are already logged. UnreplicatedWritesBlock uwb(opCtx); DisableDocumentValidation validationDisabler(opCtx); + OpTime applyThroughOpTime; BSONObj entry; - while (cursor->more()) { - entry = cursor->nextSafe(); + while (oplogBuffer.tryPop(opCtx, &entry)) { LOG_FOR_RECOVERY(2) << "Applying op during replication recovery: " << redact(entry); fassert(40294, SyncTail::syncApply(opCtx, entry, OplogApplication::Mode::kRecovering)); auto oplogEntry = fassert(50763, OplogEntry::parse(entry)); + applyThroughOpTime = oplogEntry.getOpTime(); if (auto txnTableOplog = Session::createMatchingTransactionTableUpdate(oplogEntry)) { fassert(50764, SyncTail::syncApply( @@ -242,14 +326,19 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx, } } + invariant(oplogBuffer.isEmpty(), + str::stream() << "Oplog buffer not empty after applying operations. Last operation " + "applied with optime: " + << applyThroughOpTime.toBSON()); + oplogBuffer.shutdown(opCtx); + // We may crash before setting appliedThrough. If we have a stable checkpoint, we will recover // to that checkpoint at a replication consistent point, and applying the oplog is safe. // If we don't have a stable checkpoint, then we must be in startup recovery, and not rollback // recovery, because we only roll back to a stable timestamp when we have a stable checkpoint. // Startup recovery from an unstable checkpoint only ever applies a single batch and it is safe // to replay the batch from any point. - _consistencyMarkers->setAppliedThrough(opCtx, - fassert(40295, OpTime::parseFromOplogEntry(entry))); + _consistencyMarkers->setAppliedThrough(opCtx, applyThroughOpTime); } StatusWith<OpTime> ReplicationRecoveryImpl::_getTopOfOplog(OperationContext* opCtx) const { |