summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2018-05-09 14:37:27 -0400
committerBenety Goh <benety@mongodb.com>2018-05-09 14:37:27 -0400
commitd3ad5762ef90fef083584333a39fd9fd26407c58 (patch)
treefafd6d1c164fe7b40be3f6e7147796f5062c124c /src
parent7e4f6276ec553354e2b88a209e3000ea7f729513 (diff)
downloadmongo-d3ad5762ef90fef083584333a39fd9fd26407c58.tar.gz
SERVER-32334 add OplogBuffer adaptor for DBDirectClient oplog query
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/repl/replication_recovery.cpp149
1 files changed, 119 insertions, 30 deletions
diff --git a/src/mongo/db/repl/replication_recovery.cpp b/src/mongo/db/repl/replication_recovery.cpp
index 286841e070a..eda4598e838 100644
--- a/src/mongo/db/repl/replication_recovery.cpp
+++ b/src/mongo/db/repl/replication_recovery.cpp
@@ -37,6 +37,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/oplog_buffer.h"
#include "mongo/db/repl/replication_consistency_markers_impl.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/repl/sync_tail.h"
@@ -47,6 +48,112 @@
namespace mongo {
namespace repl {
+namespace {
+
+/**
+ * OplogBuffer adaptor for a DBClient query on the oplog.
+ * Implements only functions used by OplogApplier::getNextApplierBatch().
+ */
+class OplogBufferLocalOplog final : public OplogBuffer {
+public:
+ explicit OplogBufferLocalOplog(Timestamp oplogApplicationStartPoint)
+ : _oplogApplicationStartPoint(oplogApplicationStartPoint) {}
+
+ void startup(OperationContext* opCtx) final {
+ _client = std::make_unique<DBDirectClient>(opCtx);
+ _cursor = _client->query(NamespaceString::kRsOplogNamespace.ns(),
+ QUERY("ts" << BSON("$gte" << _oplogApplicationStartPoint)),
+ /*batchSize*/ 0,
+ /*skip*/ 0,
+ /*projection*/ nullptr,
+ QueryOption_OplogReplay);
+
+ // Check that the first document matches our appliedThrough point then skip it since it's
+ // already been applied.
+ if (!_cursor->more()) {
+ // This should really be impossible because we check above that the top of the oplog is
+ // strictly > appliedThrough. If this fails it represents a serious bug in either the
+ // storage engine or query's implementation of OplogReplay.
+ severe() << "Couldn't find any entries in the oplog >= "
+ << _oplogApplicationStartPoint.toBSON() << " which should be impossible.";
+ fassertFailedNoTrace(40293);
+ }
+
+ auto firstTimestampFound =
+ fassert(40291, OpTime::parseFromOplogEntry(_cursor->nextSafe())).getTimestamp();
+ if (firstTimestampFound != _oplogApplicationStartPoint) {
+ severe() << "Oplog entry at " << _oplogApplicationStartPoint.toBSON()
+ << " is missing; actual entry found is " << firstTimestampFound.toBSON();
+ fassertFailedNoTrace(40292);
+ }
+ }
+
+ void shutdown(OperationContext*) final {
+ _cursor = {};
+ _client = {};
+ }
+
+ bool isEmpty() const final {
+ return !_cursor->more();
+ }
+
+ bool tryPop(OperationContext*, Value* value) final {
+ return _peekOrPop(value, Mode::kPop);
+ }
+
+ bool peek(OperationContext*, Value* value) final {
+ return _peekOrPop(value, Mode::kPeek);
+ }
+
+ void pushEvenIfFull(OperationContext*, const Value&) final {
+ MONGO_UNREACHABLE;
+ }
+ void push(OperationContext*, const Value&) final {
+ MONGO_UNREACHABLE;
+ }
+ void pushAllNonBlocking(OperationContext*, Batch::const_iterator, Batch::const_iterator) final {
+ MONGO_UNREACHABLE;
+ }
+ void waitForSpace(OperationContext*, std::size_t) final {
+ MONGO_UNREACHABLE;
+ }
+ std::size_t getMaxSize() const final {
+ MONGO_UNREACHABLE;
+ }
+ std::size_t getSize() const final {
+ MONGO_UNREACHABLE;
+ }
+ std::size_t getCount() const final {
+ MONGO_UNREACHABLE;
+ }
+ void clear(OperationContext*) final {
+ MONGO_UNREACHABLE;
+ }
+ bool waitForData(Seconds) final {
+ MONGO_UNREACHABLE;
+ }
+ boost::optional<Value> lastObjectPushed(OperationContext*) const final {
+ MONGO_UNREACHABLE;
+ }
+
+private:
+ enum class Mode { kPeek, kPop };
+ bool _peekOrPop(Value* value, Mode mode) {
+ if (isEmpty()) {
+ return false;
+ }
+ *value = mode == Mode::kPeek ? _cursor->peekFirst() : _cursor->nextSafe();
+ invariant(!value->isEmpty());
+ return true;
+ }
+
+ const Timestamp _oplogApplicationStartPoint;
+ std::unique_ptr<DBDirectClient> _client;
+ std::unique_ptr<DBClientCursor> _cursor;
+};
+
+} // namespace
+
ReplicationRecoveryImpl::ReplicationRecoveryImpl(StorageInterface* storageInterface,
ReplicationConsistencyMarkers* consistencyMarkers)
: _storageInterface(storageInterface), _consistencyMarkers(consistencyMarkers) {}
@@ -197,44 +304,21 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
log() << "Replaying stored operations from " << oplogApplicationStartPoint.toBSON()
<< " (exclusive) to " << topOfOplog.toBSON() << " (inclusive).";
- DBDirectClient db(opCtx);
- auto cursor = db.query(NamespaceString::kRsOplogNamespace.ns(),
- QUERY("ts" << BSON("$gte" << oplogApplicationStartPoint)),
- /*batchSize*/ 0,
- /*skip*/ 0,
- /*projection*/ nullptr,
- QueryOption_OplogReplay);
-
- // Check that the first document matches our appliedThrough point then skip it since it's
- // already been applied.
- if (!cursor->more()) {
- // This should really be impossible because we check above that the top of the oplog is
- // strictly > appliedThrough. If this fails it represents a serious bug in either the
- // storage engine or query's implementation of OplogReplay.
- severe() << "Couldn't find any entries in the oplog >= "
- << oplogApplicationStartPoint.toBSON() << " which should be impossible.";
- fassertFailedNoTrace(40293);
- }
-
- auto firstTimestampFound =
- fassert(40291, OpTime::parseFromOplogEntry(cursor->nextSafe())).getTimestamp();
- if (firstTimestampFound != oplogApplicationStartPoint) {
- severe() << "Oplog entry at " << oplogApplicationStartPoint.toBSON()
- << " is missing; actual entry found is " << firstTimestampFound.toBSON();
- fassertFailedNoTrace(40292);
- }
+ OplogBufferLocalOplog oplogBuffer(oplogApplicationStartPoint);
+ oplogBuffer.startup(opCtx);
// Apply remaining ops one at a time, but don't log them because they are already logged.
UnreplicatedWritesBlock uwb(opCtx);
DisableDocumentValidation validationDisabler(opCtx);
+ OpTime applyThroughOpTime;
BSONObj entry;
- while (cursor->more()) {
- entry = cursor->nextSafe();
+ while (oplogBuffer.tryPop(opCtx, &entry)) {
LOG_FOR_RECOVERY(2) << "Applying op during replication recovery: " << redact(entry);
fassert(40294, SyncTail::syncApply(opCtx, entry, OplogApplication::Mode::kRecovering));
auto oplogEntry = fassert(50763, OplogEntry::parse(entry));
+ applyThroughOpTime = oplogEntry.getOpTime();
if (auto txnTableOplog = Session::createMatchingTransactionTableUpdate(oplogEntry)) {
fassert(50764,
SyncTail::syncApply(
@@ -242,14 +326,19 @@ void ReplicationRecoveryImpl::_applyToEndOfOplog(OperationContext* opCtx,
}
}
+ invariant(oplogBuffer.isEmpty(),
+ str::stream() << "Oplog buffer not empty after applying operations. Last operation "
+ "applied with optime: "
+ << applyThroughOpTime.toBSON());
+ oplogBuffer.shutdown(opCtx);
+
// We may crash before setting appliedThrough. If we have a stable checkpoint, we will recover
// to that checkpoint at a replication consistent point, and applying the oplog is safe.
// If we don't have a stable checkpoint, then we must be in startup recovery, and not rollback
// recovery, because we only roll back to a stable timestamp when we have a stable checkpoint.
// Startup recovery from an unstable checkpoint only ever applies a single batch and it is safe
// to replay the batch from any point.
- _consistencyMarkers->setAppliedThrough(opCtx,
- fassert(40295, OpTime::parseFromOplogEntry(entry)));
+ _consistencyMarkers->setAppliedThrough(opCtx, applyThroughOpTime);
}
StatusWith<OpTime> ReplicationRecoveryImpl::_getTopOfOplog(OperationContext* opCtx) const {