-rw-r--r--  jstests/noPassthrough/minvalid.js | 7
-rw-r--r--  jstests/replsets/oplog_truncated_on_recovery.js | 16
-rw-r--r--  jstests/replsets/replset1.js | 4
-rw-r--r--  src/mongo/db/repl/SConscript | 1
-rw-r--r--  src/mongo/db/repl/bgsync.cpp | 18
-rw-r--r--  src/mongo/db/repl/data_replicator.cpp | 1
-rw-r--r--  src/mongo/db/repl/oplog.cpp | 49
-rw-r--r--  src/mongo/db/repl/oplogreader.cpp | 3
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state.h | 18
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp | 135
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.h | 5
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.cpp | 10
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_mock.h | 3
-rw-r--r--  src/mongo/db/repl/replication_coordinator_impl.cpp | 34
-rw-r--r--  src/mongo/db/repl/roll_back_local_operations.cpp | 19
-rw-r--r--  src/mongo/db/repl/roll_back_local_operations.h | 3
-rw-r--r--  src/mongo/db/repl/roll_back_local_operations_test.cpp | 12
-rw-r--r--  src/mongo/db/repl/rs_initialsync.cpp | 19
-rw-r--r--  src/mongo/db/repl/rs_rollback.cpp | 13
-rw-r--r--  src/mongo/db/repl/rs_rollback_test.cpp | 8
-rw-r--r--  src/mongo/db/repl/storage_interface.cpp | 15
-rw-r--r--  src/mongo/db/repl/storage_interface.h | 58
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.cpp | 187
-rw-r--r--  src/mongo/db/repl/storage_interface_impl.h | 19
-rw-r--r--  src/mongo/db/repl/storage_interface_impl_test.cpp | 53
-rw-r--r--  src/mongo/db/repl/storage_interface_mock.cpp | 35
-rw-r--r--  src/mongo/db/repl/storage_interface_mock.h | 16
-rw-r--r--  src/mongo/db/repl/sync_tail.cpp | 123
-rw-r--r--  src/mongo/db/repl/sync_tail.h | 4
29 files changed, 491 insertions, 397 deletions
diff --git a/jstests/noPassthrough/minvalid.js b/jstests/noPassthrough/minvalid.js
index 6f22e65e2ca..d31f6d58da7 100644
--- a/jstests/noPassthrough/minvalid.js
+++ b/jstests/noPassthrough/minvalid.js
@@ -21,9 +21,10 @@ var lastOp = local.oplog.rs.find().sort({$natural: -1}).limit(1).next();
printjson(lastOp);
print("3: change minvalid");
-// primaries don't populate minvalid by default
-local.replset.minvalid.insert(
- {ts: new Timestamp(lastOp.ts.t, lastOp.ts.i + 1), h: new NumberLong("1234567890")});
+assert.writeOK(local.replset.minvalid.update(
+ {},
+ {$set: {ts: new Timestamp(lastOp.ts.t, lastOp.ts.i + 1), h: new NumberLong("1234567890")}},
+ {upsert: true}));
printjson(local.replset.minvalid.findOne());
print("4: restart");
diff --git a/jstests/replsets/oplog_truncated_on_recovery.js b/jstests/replsets/oplog_truncated_on_recovery.js
index f4e1bf9b1ec..4d469178691 100644
--- a/jstests/replsets/oplog_truncated_on_recovery.js
+++ b/jstests/replsets/oplog_truncated_on_recovery.js
@@ -50,18 +50,24 @@
log(primaryOpTime);
// Set the start of the failed batch
+ // TODO this test should restart in stand-alone mode to futz with the state rather than trying
+ // to do it on a running primary.
jsTest.log("future TS: " + tojson(farFutureTS) + ", date:" + tsToDate(farFutureTS));
+ var divergedTS = new Timestamp(primaryOpTime.ts.t, primaryOpTime.ts.i + 1);
// We do an update in case there is a minvalid document on the primary already.
// If the doc doesn't exist then upsert:true will create it, and the writeConcern ensures
// that update returns details of the write, like whether an update or insert was performed.
- log(assert.writeOK(
- minvalidColl.update({},
- {ts: farFutureTS, t: NumberLong(-1), begin: primaryOpTime},
- {upsert: true, writeConcern: {w: 1}})));
+ log(assert.writeOK(minvalidColl.update({},
+ {
+ ts: farFutureTS,
+ t: NumberLong(-1),
+ begin: primaryOpTime,
+ oplogDeleteFromPoint: divergedTS
+ },
+ {upsert: true, writeConcern: {w: 1}})));
// Insert a diverged oplog entry that will be truncated after restart.
- var divergedTS = new Timestamp(primaryOpTime.ts.t, primaryOpTime.ts.i + 1);
log(assert.writeOK(localDB.oplog.rs.insert(
{_id: 0, ts: divergedTS, op: "n", h: NumberLong(0), t: NumberLong(-1)})));
log(localDB.oplog.rs.find().toArray());
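The update above leaves local.replset.minvalid carrying both recovery markers at once: the minValid/appliedThrough pair and the new oplogDeleteFromPoint. A sketch of the resulting document, using only the field names the test writes (the upserted _id is omitted, and the values are whatever farFutureTS, primaryOpTime, and divergedTS hold at runtime):

    {
        ts: farFutureTS,                  // minValid: must apply through here to be consistent
        t: NumberLong(-1),
        begin: primaryOpTime,             // appliedThrough marker ("begin" field)
        oplogDeleteFromPoint: divergedTS  // on restart, oplog entries >= this are truncated
    }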
diff --git a/jstests/replsets/replset1.js b/jstests/replsets/replset1.js
index ed254f99758..68ca8968fb1 100644
--- a/jstests/replsets/replset1.js
+++ b/jstests/replsets/replset1.js
@@ -32,9 +32,7 @@ var doTest = function(signal) {
var isPV1 = (replTest.getReplSetConfigFromNode().protocolVersion == 1);
if (isPV1) {
// Ensure the primary logs an n-op to the oplog upon transitioning to primary.
- var oplog_entry = master.getDB("local").oplog.rs.find().sort({$natural: -1})[0];
- assert.eq("new primary", oplog_entry["o"]["msg"]);
- assert.eq("n", oplog_entry["op"]);
+ assert.gt(master.getDB("local").oplog.rs.count({op: 'n', o: {msg: 'new primary'}}), 0);
}
// Calling getPrimary also makes available the liveNodes structure,
// which looks like this:
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 175cf7c17d0..36b6b4cc603 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -1019,6 +1019,7 @@ env.Library(
'roll_back_local_operations.cpp',
],
LIBDEPS=[
+ 'optime',
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/util/foundation',
],
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index db134069821..d31775f0001 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -302,12 +302,9 @@ void BackgroundSync::_produce(OperationContext* txn) {
log() << "Our newest OpTime : " << lastOpTimeFetched;
log() << "Earliest OpTime available is " << syncSourceResp.earliestOpTimeSeen;
log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember";
-
- StorageInterface::get(txn)->setMinValid(
- txn, {lastOpTimeFetched, syncSourceResp.earliestOpTimeSeen});
auto status = _replCoord->setMaintenanceMode(true);
if (!status.isOK()) {
- warning() << "Failed to transition into maintenance mode.";
+ warning() << "Failed to transition into maintenance mode: " << status;
}
bool worked = _replCoord->setFollowerMode(MemberState::RS_RECOVERING);
if (!worked) {
@@ -342,6 +339,13 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
}
+ // Set the applied point if unset. This is most likely the first time we've established a sync
+ // source since stepping down or otherwise clearing the applied point. We need to set this here,
+ // before the OplogWriter gets a chance to append to the oplog.
+ if (StorageInterface::get(txn)->getAppliedThrough(txn).isNull()) {
+ StorageInterface::get(txn)->setAppliedThrough(txn, _replCoord->getMyLastAppliedOpTime());
+ }
+
// "lastFetched" not used. Already set in _enqueueDocuments.
Status fetcherReturnStatus = Status::OK();
DataReplicatorExternalStateBackgroundSync dataReplicatorExternalState(
@@ -442,13 +446,13 @@ void BackgroundSync::_produce(OperationContext* txn) {
}
// check that we are at minvalid, otherwise we cannot roll back as we may be in an
// inconsistent state
- BatchBoundaries boundaries = StorageInterface::get(txn)->getMinValid(txn);
- if (!boundaries.start.isNull() || boundaries.end > lastApplied) {
+ const auto minValid = StorageInterface::get(txn)->getMinValid(txn);
+ if (lastApplied < minValid) {
fassertNoTrace(18750,
Status(ErrorCodes::UnrecoverableRollbackError,
str::stream() << "need to rollback, but in inconsistent state. "
<< "minvalid: "
- << boundaries.end.toString()
+ << minValid.toString()
<< " > our last optime: "
<< lastApplied.toString()));
}
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 3a101edf49c..edf62d53261 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -806,7 +806,6 @@ StatusWith<OpTimeWithHash> DataReplicator::doInitialSync(OperationContext* txn,
_lastFetched = _lastApplied;
_storage->clearInitialSyncFlag(txn);
- _storage->setMinValid(txn, _lastApplied.opTime, DurableRequirement::Strong);
_opts.setMyLastOptime(_lastApplied.opTime);
log() << "initial sync done; took " << _stats.initialSyncEnd - _stats.initialSyncStart
<< " milliseconds.";
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 5fda4574d05..2e610523d99 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -288,7 +288,7 @@ OplogDocWriter _logOpWriter(OperationContext* txn,
}
} // end anon namespace
-// Truncates the oplog to but excluding the "truncateTimestamp" entry.
+// Truncates the oplog after and including the "truncateTimestamp" entry.
void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
const NamespaceString oplogNss(rsOplogName);
ScopedTransaction transaction(txn, MODE_IX);
@@ -302,42 +302,41 @@ void truncateOplogTo(OperationContext* txn, Timestamp truncateTimestamp) {
}
// Scan through oplog in reverse, from latest entry to first, to find the truncateTimestamp.
- bool foundSomethingToTruncate = false;
- RecordId lastRecordId;
- BSONObj lastOplogEntry;
+ RecordId oldestIDToDelete; // Non-null if there is something to delete.
auto oplogRs = oplogCollection->getRecordStore();
- auto oplogReverseCursor = oplogRs->getCursor(txn, false);
- bool first = true;
+ auto oplogReverseCursor = oplogRs->getCursor(txn, /*forward=*/false);
+ size_t count = 0;
while (auto next = oplogReverseCursor->next()) {
- lastOplogEntry = next->data.releaseToBson();
- lastRecordId = next->id;
+ const BSONObj entry = next->data.releaseToBson();
+ const RecordId id = next->id;
+ count++;
- const auto tsElem = lastOplogEntry["ts"];
-
- if (first) {
+ const auto tsElem = entry["ts"];
+ if (count == 1) {
if (tsElem.eoo())
- LOG(2) << "Oplog tail entry: " << lastOplogEntry;
+ LOG(2) << "Oplog tail entry: " << entry;
else
LOG(2) << "Oplog tail entry ts field: " << tsElem;
- first = false;
}
- if (tsElem.timestamp() == truncateTimestamp) {
- break;
- } else if (tsElem.timestamp() < truncateTimestamp) {
- fassertFailedWithStatusNoTrace(34411,
- Status(ErrorCodes::OplogOutOfOrder,
- str::stream() << "Can't find "
- << truncateTimestamp.toString()
- << " to truncate from!"));
+ if (tsElem.timestamp() < truncateTimestamp) {
+ // If count == 1, that means that we have nothing to delete because everything in the
+ // oplog is < truncateTimestamp.
+ if (count != 1) {
+ invariant(!oldestIDToDelete.isNull());
+ oplogCollection->temp_cappedTruncateAfter(
+ txn, oldestIDToDelete, /*inclusive=*/true);
+ }
+ return;
}
- foundSomethingToTruncate = true;
+ oldestIDToDelete = id;
}
- if (foundSomethingToTruncate) {
- oplogCollection->temp_cappedTruncateAfter(txn, lastRecordId, false);
- }
+ severe() << "Reached end of oplog looking for oplog entry before "
+ << truncateTimestamp.toStringPretty()
+ << " but couldn't find any after looking through " << count << " entries.";
+ fassertFailedNoTrace(40296);
}
/* we write to local.oplog.rs:
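With the rewrite above, truncateOplogTo deletes every entry at or after truncateTimestamp instead of requiring an exact match, and fasserts (40296) if the scan never reaches an older entry. A rough mongo-shell sketch of the intended post-condition on a successful return (truncateTimestamp stands in for the function's argument):

    var oplog = db.getSiblingDB("local").oplog.rs;
    // Nothing at or after the truncate point survives...
    assert.eq(0, oplog.count({ts: {$gte: truncateTimestamp}}));
    // ...and at least one older entry remains, otherwise the function would have fasserted.
    assert.gte(oplog.count({ts: {$lt: truncateTimestamp}}), 1);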
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index f9a08da370f..6ab73a44b01 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -154,10 +154,9 @@ void OplogReader::connectToSyncSource(OperationContext* txn,
log() << "our last optime : " << lastOpTimeFetched;
log() << "oldest available is " << oldestOpTimeSeen;
log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember";
- StorageInterface::get(txn)->setMinValid(txn, {lastOpTimeFetched, oldestOpTimeSeen});
auto status = replCoord->setMaintenanceMode(true);
if (!status.isOK()) {
- warning() << "Failed to transition into maintenance mode.";
+ warning() << "Failed to transition into maintenance mode: " << status;
}
bool worked = replCoord->setFollowerMode(MemberState::RS_RECOVERING);
if (!worked) {
diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h
index 857bac4abd7..f64214b462c 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state.h
@@ -136,9 +136,16 @@ public:
virtual Status initializeReplSetStorage(OperationContext* txn, const BSONObj& config) = 0;
/**
- * Writes a message about our transition to primary to the oplog.
+ * Called as part of the process of transitioning to primary. See the call site in
+ * ReplicationCoordinatorImpl for details about when and how it is called.
+ *
+ * Among other things, this writes a message about our transition to primary to the oplog if
+ * isV1 and returns the optime of that message. If !isV1, returns the optime of the last op
+ * in the oplog.
+ *
+ * Throws on errors.
*/
- virtual void logTransitionToPrimaryToOplog(OperationContext* txn) = 0;
+ virtual OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol) = 0;
/**
* Simple wrapper around SyncSourceFeedback::forwardSlaveProgress. Signals to the
@@ -225,13 +232,6 @@ public:
virtual void shardingOnStepDownHook() = 0;
/**
- * Called when the instance transitions to primary. Calls all drain mode hooks.
- *
- * Throws on errors.
- */
- virtual void drainModeHook(OperationContext* txn) = 0;
-
- /**
* Notifies the bgsync and syncSourceFeedback threads to choose a new sync source.
*/
virtual void signalApplierToChooseNewSyncSource() = 0;
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index c5d36369d07..ad89b160e80 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/commands/feature_compatibility_version.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/op_observer.h"
@@ -87,6 +88,7 @@
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/hostandport.h"
#include "mongo/util/net/listen.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
@@ -286,6 +288,13 @@ void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* txn) {
_snapshotThread->shutdown();
}
+ if (_storageInterface->getOplogDeleteFromPoint(txn).isNull() &&
+ loadLastOpTime(txn) == _storageInterface->getAppliedThrough(txn)) {
+ // Clear the appliedThrough marker to indicate we are consistent with the top of the
+ // oplog.
+ _storageInterface->setAppliedThrough(txn, {});
+ }
+
log() << "Stopping replication storage threads";
_taskExecutor->shutdown();
_taskExecutor->join();
@@ -318,24 +327,46 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati
wuow.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs");
+
+ // This initializes the minvalid document with a null "ts" because older versions (<=3.2)
+ // get angry if the minValid document is present but doesn't have a "ts" field.
+ // Consider removing this once we no longer need to support downgrading to 3.2.
+ _storageInterface->setMinValidToAtLeast(txn, {});
} catch (const DBException& ex) {
return ex.toStatus();
}
return Status::OK();
}
-void ReplicationCoordinatorExternalStateImpl::logTransitionToPrimaryToOplog(OperationContext* txn) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction scopedXact(txn, MODE_X);
+OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationContext* txn,
+ bool isV1ElectionProtocol) {
+
+ // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be
+ // done before we add anything to our oplog.
+ invariant(_storageInterface->getOplogDeleteFromPoint(txn).isNull());
+ _storageInterface->setAppliedThrough(txn, {});
- WriteUnitOfWork wuow(txn);
- txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage(txn,
- BSON("msg"
- << "new primary"));
- wuow.commit();
+ if (isV1ElectionProtocol) {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction scopedXact(txn, MODE_X);
+
+ WriteUnitOfWork wuow(txn);
+ txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage(
+ txn,
+ BSON("msg"
+ << "new primary"));
+ wuow.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ txn, "logging transition to primary to oplog", "local.oplog.rs");
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "logging transition to primary to oplog", "local.oplog.rs");
+ const auto opTimeToReturn = fassertStatusOK(28665, loadLastOpTime(txn));
+
+ FeatureCompatibilityVersion::setIfCleanStartup(txn);
+ shardingOnTransitionToPrimaryHook(txn);
+ dropAllTempCollections(txn);
+
+ return opTimeToReturn;
}
void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() {
@@ -453,12 +484,77 @@ void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationCon
if (_storageInterface->getInitialSyncFlag(txn)) {
return; // Initial Sync will take over so no cleanup is needed.
}
- auto mv = _storageInterface->getMinValid(txn);
- if (!mv.start.isNull()) {
- // If we are in the middle of a batch, and recoveringm then we need to truncate the oplog.
- LOG(2) << "Recovering from a failed apply batch, start:" << mv.start.toBSON();
- truncateOplogTo(txn, mv.start.getTimestamp());
+ const auto deleteFromPoint = _storageInterface->getOplogDeleteFromPoint(txn);
+ const auto appliedThrough = _storageInterface->getAppliedThrough(txn);
+
+ const bool needToDeleteEndOfOplog = !deleteFromPoint.isNull() &&
+ // This version should never have a non-null deleteFromPoint with a null appliedThrough.
+ // This scenario means that we downgraded after unclean shutdown, then the downgraded node
+ // deleted the ragged end of our oplog, then did a clean shutdown.
+ !appliedThrough.isNull() &&
+ // Similarly we should never have an appliedThrough higher than the deleteFromPoint. This
+ // means that the downgraded node deleted our ragged end then applied ahead of our
+ // deleteFromPoint and then had an unclean shutdown before upgrading. We are ok with
+ // applying these ops because older versions wrote to the oplog from a single thread so we
+ // know they are in order.
+ !(appliedThrough.getTimestamp() >= deleteFromPoint);
+ if (needToDeleteEndOfOplog) {
+ log() << "Removing unapplied entries starting at: " << deleteFromPoint;
+ truncateOplogTo(txn, deleteFromPoint);
+ }
+ _storageInterface->setOplogDeleteFromPoint(txn, {}); // clear the deleteFromPoint
+
+ if (appliedThrough.isNull()) {
+ // No follow-up work to do.
+ return;
+ }
+
+ // Check if we have any unapplied ops in our oplog. It is important that this is done after
+ // deleting the ragged end of the oplog.
+ const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(txn));
+ if (topOfOplog >= appliedThrough) {
+ return; // We've applied all the valid oplog we have.
+ }
+
+ log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to "
+ << topOfOplog << " (inclusive).";
+
+ DBDirectClient db(txn);
+ auto cursor = db.query(rsOplogName,
+ QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())),
+ /*batchSize*/ 0,
+ /*skip*/ 0,
+ /*projection*/ nullptr,
+ QueryOption_OplogReplay);
+
+ // Check that the first document matches our appliedThrough point then skip it since it's
+ // already been applied.
+ if (!cursor->more()) {
+ // This should really be impossible because we check above that the top of the oplog is
+ // strictly > appliedThrough. If this fails it represents a serious bug in either the
+ // storage engine or query's implementation of OplogReplay.
+ severe() << "Couldn't find any entries in the oplog >= " << appliedThrough
+ << " which should be impossible.";
+ fassertFailedNoTrace(40293);
+ }
+ auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe()));
+ if (firstOpTimeFound != appliedThrough) {
+ severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is "
+ << firstOpTimeFound;
+ fassertFailedNoTrace(40292);
+ }
+
+ // Apply remaining ops one at a time, but don't log them because they are already logged.
+ const bool wereWritesReplicated = txn->writesAreReplicated();
+ ON_BLOCK_EXIT([&] { txn->setReplicatedWrites(wereWritesReplicated); });
+ txn->setReplicatedWrites(false);
+
+ while (cursor->more()) {
+ auto entry = cursor->nextSafe();
+ fassertStatusOK(40294, SyncTail::syncApply(txn, entry, true));
+ _storageInterface->setAppliedThrough(
+ txn, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry)));
}
}
@@ -524,13 +620,8 @@ void ReplicationCoordinatorExternalStateImpl::shardingOnStepDownHook() {
ShardingState::get(getGlobalServiceContext())->clearCollectionMetadata();
}
-void ReplicationCoordinatorExternalStateImpl::drainModeHook(OperationContext* txn) {
- FeatureCompatibilityVersion::setIfCleanStartup(txn);
- shardingOnDrainingStateHook(txn);
- dropAllTempCollections(txn);
-}
-
-void ReplicationCoordinatorExternalStateImpl::shardingOnDrainingStateHook(OperationContext* txn) {
+void ReplicationCoordinatorExternalStateImpl::shardingOnTransitionToPrimaryHook(
+ OperationContext* txn) {
auto status = ShardingStateRecovery::recover(txn);
if (ErrorCodes::isShutdownError(status.code())) {
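Taken together, the new cleanUpLastApplyBatch truncates any ragged oplog end recorded in oplogDeleteFromPoint and then replays entries from appliedThrough (exclusive) up to the top of the oplog. A condensed mongo-shell sketch of the truncation decision, reading the same minvalid fields the code above uses (tsLessThan is a hypothetical helper, not part of the server):

    var mv = db.getSiblingDB("local").replset.minvalid.findOne() || {};
    var deleteFromPoint = mv.oplogDeleteFromPoint;  // Timestamp, possibly missing
    var appliedThrough = mv.begin;                  // {ts: ..., t: ...}, possibly missing

    function tsLessThan(a, b) {  // compare shell Timestamps, oldest first
        return (a.t < b.t) || (a.t == b.t && a.i < b.i);
    }

    if (deleteFromPoint && appliedThrough && tsLessThan(appliedThrough.ts, deleteFromPoint)) {
        print("would truncate oplog entries >= " + tojson(deleteFromPoint));
    }
    // The delete-from point is then cleared; if appliedThrough is still behind the newest oplog
    // entry, the remaining entries are re-applied one at a time with replicated writes disabled.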
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
index 7e339031d17..0dec8b25560 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h
@@ -74,7 +74,7 @@ public:
virtual executor::TaskExecutor* getTaskExecutor() const override;
virtual OldThreadPool* getDbWorkThreadPool() const override;
virtual Status initializeReplSetStorage(OperationContext* txn, const BSONObj& config);
- virtual void logTransitionToPrimaryToOplog(OperationContext* txn);
+ virtual OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol);
virtual void forwardSlaveProgress();
virtual OID ensureMe(OperationContext* txn);
virtual bool isSelf(const HostAndPort& host, ServiceContext* ctx);
@@ -89,7 +89,6 @@ public:
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* txn);
virtual void shardingOnStepDownHook();
- virtual void drainModeHook(OperationContext* txn);
virtual void signalApplierToChooseNewSyncSource();
virtual void signalApplierToCancelFetcher();
void dropAllSnapshots() final;
@@ -129,7 +128,7 @@ private:
*
* Throws on errors.
*/
- void shardingOnDrainingStateHook(OperationContext* txn);
+ void shardingOnTransitionToPrimaryHook(OperationContext* txn);
/**
* Drops all temporary collections on all databases except "local".
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
index 4b104c697fa..80d362cd13c 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp
@@ -226,8 +226,6 @@ void ReplicationCoordinatorExternalStateMock::killAllUserOperations(OperationCon
void ReplicationCoordinatorExternalStateMock::shardingOnStepDownHook() {}
-void ReplicationCoordinatorExternalStateMock::drainModeHook(OperationContext* txn) {}
-
void ReplicationCoordinatorExternalStateMock::signalApplierToChooseNewSyncSource() {}
void ReplicationCoordinatorExternalStateMock::signalApplierToCancelFetcher() {
@@ -288,8 +286,12 @@ void ReplicationCoordinatorExternalStateMock::setIsReadCommittedEnabled(bool val
_isReadCommittedSupported = val;
}
-void ReplicationCoordinatorExternalStateMock::logTransitionToPrimaryToOplog(OperationContext* txn) {
- _lastOpTime = OpTime(Timestamp(1, 0), 1);
+OpTime ReplicationCoordinatorExternalStateMock::onTransitionToPrimary(OperationContext* txn,
+ bool isV1ElectionProtocol) {
+ if (isV1ElectionProtocol) {
+ _lastOpTime = OpTime(Timestamp(1, 0), 1);
+ }
+ return fassertStatusOK(40297, _lastOpTime);
}
} // namespace repl
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
index 5df6cd7733f..c8ac8263b0b 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h
+++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h
@@ -69,7 +69,7 @@ public:
virtual executor::TaskExecutor* getTaskExecutor() const override;
virtual OldThreadPool* getDbWorkThreadPool() const override;
virtual Status initializeReplSetStorage(OperationContext* txn, const BSONObj& config);
- virtual void logTransitionToPrimaryToOplog(OperationContext* txn);
+ virtual OpTime onTransitionToPrimary(OperationContext* txn, bool isV1ElectionProtocol);
virtual void forwardSlaveProgress();
virtual OID ensureMe(OperationContext*);
virtual bool isSelf(const HostAndPort& host, ServiceContext* ctx);
@@ -84,7 +84,6 @@ public:
virtual void closeConnections();
virtual void killAllUserOperations(OperationContext* txn);
virtual void shardingOnStepDownHook();
- virtual void drainModeHook(OperationContext* txn);
virtual void signalApplierToChooseNewSyncSource();
virtual void signalApplierToCancelFetcher();
virtual void dropAllSnapshots();
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index a25fce29a13..00f3635072c 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -448,7 +448,7 @@ bool ReplicationCoordinatorImpl::_startLoadLocalConfig(OperationContext* txn) {
fassertFailedNoTrace(28545);
}
- // Returns the last optime from the oplog, possibly truncating first if we need to recover.
+ // Read the last op from the oplog after cleaning up any partially applied batches.
_externalState->cleanUpLastApplyBatch(txn);
auto lastOpTimeStatus = _externalState->loadLastOpTime(txn);
@@ -856,7 +856,7 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
// _isWaitingForDrainToComplete, set the flag allowing non-local database writes and
// drop the mutex. At this point, no writes can occur from other threads, due to the
// global exclusive lock.
- // 4.) Drop all temp collections.
+ // 4.) Drop all temp collections, and log the drops to the oplog.
// 5.) Log transition to primary in the oplog and set that OpTime as the floor for what we will
// consider to be committed.
// 6.) Drop the global exclusive lock.
@@ -868,40 +868,42 @@ void ReplicationCoordinatorImpl::signalDrainComplete(OperationContext* txn) {
// external writes will be processed. This is important so that a new temp collection isn't
// introduced on the new primary before we drop all the temp collections.
+ // When we go to drop all temp collections, we must replicate the drops.
+ invariant(txn->writesAreReplicated());
+
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (!_isWaitingForDrainToComplete) {
return;
}
- lk.unlock();
+ lk.unlock();
ScopedTransaction transaction(txn, MODE_X);
+ // Block step downs even after we unlock lk.
Lock::GlobalWrite globalWriteLock(txn->lockState());
-
lk.lock();
+
if (!_isWaitingForDrainToComplete) {
return;
}
+ invariant(!_isCatchingUp);
_isWaitingForDrainToComplete = false;
- _canAcceptNonLocalWrites = true;
_drainFinishedCond.notify_all();
- lk.unlock();
- _externalState->drainModeHook(txn);
-
- // This is done for compatibility with PV0 replicas wrt how "n" ops are processed.
- if (isV1ElectionProtocol()) {
- _externalState->logTransitionToPrimaryToOplog(txn);
+ if (!_getMemberState_inlock().primary()) {
+ // We must have decided not to transition to primary while waiting for the applier to drain.
+ // Skip the rest of this function since it should only be done when really transitioning.
+ return;
}
- StatusWith<OpTime> lastOpTime = _externalState->loadLastOpTime(txn);
- fassertStatusOK(28665, lastOpTime.getStatus());
- _setFirstOpTimeOfMyTerm(lastOpTime.getValue());
+ _canAcceptNonLocalWrites = true;
+ lk.unlock();
+ _setFirstOpTimeOfMyTerm(_externalState->onTransitionToPrimary(txn, isV1ElectionProtocol()));
lk.lock();
+
// Must calculate the commit level again because firstOpTimeOfMyTerm wasn't set when we logged
- // our election in logTransitionToPrimaryToOplog(), above.
+ // our election in onTransitionToPrimary(), above.
_updateLastCommittedOpTime_inlock();
- lk.unlock();
log() << "transition to primary complete; database writes are now permitted" << rsLog;
}
diff --git a/src/mongo/db/repl/roll_back_local_operations.cpp b/src/mongo/db/repl/roll_back_local_operations.cpp
index 2117458e9f4..92312d0d21c 100644
--- a/src/mongo/db/repl/roll_back_local_operations.cpp
+++ b/src/mongo/db/repl/roll_back_local_operations.cpp
@@ -41,6 +41,10 @@ namespace repl {
namespace {
+OpTime getOpTime(const OplogInterface::Iterator::Value& oplogValue) {
+ return fassertStatusOK(40298, OpTime::parseFromOplogEntry(oplogValue.first));
+}
+
Timestamp getTimestamp(const BSONObj& operation) {
return operation["ts"].timestamp();
}
@@ -116,7 +120,7 @@ StatusWith<RollBackLocalOperations::RollbackCommonPoint> RollBackLocalOperations
_scanned++;
if (getHash(_localOplogValue) == getHash(operation)) {
return StatusWith<RollbackCommonPoint>(
- std::make_pair(getTimestamp(_localOplogValue), _localOplogValue.second));
+ std::make_pair(getOpTime(_localOplogValue), _localOplogValue.second));
}
auto status = _rollbackOperation(_localOplogValue.first);
if (!status.isOK()) {
@@ -139,14 +143,11 @@ StatusWith<RollBackLocalOperations::RollbackCommonPoint> RollBackLocalOperations
"Need to process additional remote operations.");
}
- if (getTimestamp(_localOplogValue) < getTimestamp(operation)) {
- _scanned++;
- return StatusWith<RollbackCommonPoint>(ErrorCodes::NoSuchKey,
- "Unable to determine common point. "
- "Need to process additional remote operations.");
- }
-
- return RollbackCommonPoint(Timestamp(Seconds(1), 0), RecordId());
+ invariant(getTimestamp(_localOplogValue) < getTimestamp(operation));
+ _scanned++;
+ return StatusWith<RollbackCommonPoint>(ErrorCodes::NoSuchKey,
+ "Unable to determine common point. "
+ "Need to process additional remote operations.");
}
StatusWith<RollBackLocalOperations::RollbackCommonPoint> syncRollBackLocalOperations(
diff --git a/src/mongo/db/repl/roll_back_local_operations.h b/src/mongo/db/repl/roll_back_local_operations.h
index 20eb923083d..87a940ce57b 100644
--- a/src/mongo/db/repl/roll_back_local_operations.h
+++ b/src/mongo/db/repl/roll_back_local_operations.h
@@ -34,6 +34,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/record_id.h"
#include "mongo/db/repl/oplog_interface.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/stdx/functional.h"
namespace mongo {
@@ -49,7 +50,7 @@ public:
*/
using RollbackOperationFn = stdx::function<Status(const BSONObj&)>;
- using RollbackCommonPoint = std::pair<Timestamp, RecordId>;
+ using RollbackCommonPoint = std::pair<OpTime, RecordId>;
/**
* Initializes rollback processor with a valid local oplog.
diff --git a/src/mongo/db/repl/roll_back_local_operations_test.cpp b/src/mongo/db/repl/roll_back_local_operations_test.cpp
index 03abb00dfb2..b4a0200f145 100644
--- a/src/mongo/db/repl/roll_back_local_operations_test.cpp
+++ b/src/mongo/db/repl/roll_back_local_operations_test.cpp
@@ -107,7 +107,7 @@ TEST(RollBackLocalOperationsTest, RollbackMultipleLocalOperations) {
RollBackLocalOperations finder(localOplog, rollbackOperation);
auto result = finder.onRemoteOperation(commonOperation.first);
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first);
ASSERT_EQUALS(commonOperation.second, result.getValue().second);
ASSERT_FALSE(i == localOperations.cend());
ASSERT_BSONOBJ_EQ(commonOperation.first, i->first);
@@ -165,7 +165,7 @@ TEST(RollBackLocalOperationsTest, SkipRemoteOperations) {
}
auto result = finder.onRemoteOperation(commonOperation.first);
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first);
ASSERT_EQUALS(commonOperation.second, result.getValue().second);
ASSERT_FALSE(i == localOperations.cend());
ASSERT_BSONOBJ_EQ(commonOperation.first, i->first);
@@ -198,7 +198,7 @@ TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashess) {
}
auto result = finder.onRemoteOperation(commonOperation.first);
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first);
ASSERT_EQUALS(commonOperation.second, result.getValue().second);
ASSERT_FALSE(i == localOperations.cend());
ASSERT_BSONOBJ_EQ(commonOperation.first, i->first);
@@ -271,7 +271,7 @@ TEST(SyncRollBackLocalOperationsTest, RollbackTwoOperations) {
return Status::OK();
});
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first);
ASSERT_EQUALS(commonOperation.second, result.getValue().second);
ASSERT_FALSE(i == localOperations.cend());
ASSERT_BSONOBJ_EQ(commonOperation.first, i->first);
@@ -290,7 +290,7 @@ TEST(SyncRollBackLocalOperationsTest, SkipOneRemoteOperation) {
return Status::OK();
});
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first);
ASSERT_EQUALS(commonOperation.second, result.getValue().second);
}
@@ -308,7 +308,7 @@ TEST(SyncRollBackLocalOperationsTest, SameTimestampDifferentHashes) {
return Status::OK();
});
ASSERT_OK(result.getStatus());
- ASSERT_EQUALS(commonOperation.first["ts"].timestamp(), result.getValue().first);
+ ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), result.getValue().first);
ASSERT_EQUALS(commonOperation.second, result.getValue().second);
ASSERT_TRUE(called);
}
diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp
index 88c7183b1da..990b5fcf554 100644
--- a/src/mongo/db/repl/rs_initialsync.cpp
+++ b/src/mongo/db/repl/rs_initialsync.cpp
@@ -82,8 +82,9 @@ using std::string;
void truncateAndResetOplog(OperationContext* txn,
ReplicationCoordinator* replCoord,
BackgroundSync* bgsync) {
- // Clear minvalid
- StorageInterface::get(txn)->setMinValid(txn, OpTime(), DurableRequirement::None);
+
+ // Add field to minvalid document to tell us to restart initial sync if we crash
+ StorageInterface::get(txn)->setInitialSyncFlag(txn);
AutoGetDb autoDb(txn, "local", MODE_X);
massert(28585, "no local database found", autoDb.getDb());
@@ -300,9 +301,6 @@ Status _initialSync(BackgroundSync* bgsync) {
return Status(ErrorCodes::InitialSyncFailure, msg);
}
- // Add field to minvalid document to tell us to restart initial sync if we crash
- StorageInterface::get(&txn)->setInitialSyncFlag(&txn);
-
log() << "initial sync drop all databases";
dropAllDatabasesExceptLocal(&txn);
@@ -427,16 +425,7 @@ Status _initialSync(BackgroundSync* bgsync) {
log() << "initial sync finishing up";
- {
- ScopedTransaction scopedXact(&txn, MODE_IX);
- AutoGetDb autodb(&txn, "local", MODE_X);
- OpTime lastOpTimeWritten(getGlobalReplicationCoordinator()->getMyLastAppliedOpTime());
- log() << "set minValid=" << lastOpTimeWritten;
-
- // Initial sync is now complete. Flag this by setting minValid to the last thing we synced.
- StorageInterface::get(&txn)->setMinValid(&txn, lastOpTimeWritten, DurableRequirement::None);
- }
-
+ // Initial sync is now complete.
// Clear the initial sync flag -- cannot be done under a db lock, or recursive.
StorageInterface::get(&txn)->clearInitialSyncFlag(&txn);
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 0663c13e4dd..c03251f499d 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -159,7 +159,7 @@ struct FixUpInfo {
set<string> collectionsToResyncData;
set<string> collectionsToResyncMetadata;
- Timestamp commonPoint;
+ OpTime commonPoint;
RecordId commonPointOurDiskloc;
int rbid; // remote server's current rollback sequence #
@@ -391,9 +391,12 @@ void syncFixUp(OperationContext* txn,
// we have items we are writing that aren't from a point-in-time. thus best not to come
// online until we get to that point in freshness.
+ // TODO this is still wrong because we don't record that we are in rollback, and we can't really
+ // recover.
OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValid));
log() << "minvalid=" << minValid;
- StorageInterface::get(txn)->setMinValid(txn, {OpTime{}, minValid});
+ StorageInterface::get(txn)->setAppliedThrough(txn, {}); // Use top of oplog.
+ StorageInterface::get(txn)->setMinValid(txn, minValid);
// any full collection resyncs required?
if (!fixUpInfo.collectionsToResyncData.empty() ||
@@ -498,8 +501,8 @@ void syncFixUp(OperationContext* txn,
} else {
OpTime minValid = fassertStatusOK(28775, OpTime::parseFromOplogEntry(newMinValid));
log() << "minvalid=" << minValid;
- const OpTime start{fixUpInfo.commonPoint, OpTime::kUninitializedTerm};
- StorageInterface::get(txn)->setMinValid(txn, {start, minValid});
+ StorageInterface::get(txn)->setMinValid(txn, minValid);
+ StorageInterface::get(txn)->setAppliedThrough(txn, fixUpInfo.commonPoint);
}
} catch (const DBException& e) {
err = "can't get/set minvalid: ";
@@ -769,7 +772,7 @@ void syncFixUp(OperationContext* txn,
log() << "rollback 6";
// clean up oplog
- LOG(2) << "rollback truncate oplog after " << fixUpInfo.commonPoint.toStringPretty();
+ LOG(2) << "rollback truncate oplog after " << fixUpInfo.commonPoint.toString();
{
const NamespaceString oplogNss(rsOplogName);
ScopedTransaction transaction(txn, MODE_IX);
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index 2c38252d442..7bbe51cde11 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -147,7 +147,8 @@ void RSRollbackTest::setUp() {
StorageInterface::set(serviceContext, stdx::make_unique<StorageInterfaceMock>());
setOplogCollectionName();
- repl::StorageInterface::get(_txn.get())->setMinValid(_txn.get(), {OpTime{}, OpTime{}});
+ repl::StorageInterface::get(_txn.get())->setAppliedThrough(_txn.get(), OpTime{});
+ repl::StorageInterface::get(_txn.get())->setMinValid(_txn.get(), OpTime{});
}
void RSRollbackTest::tearDown() {
@@ -161,8 +162,9 @@ void noSleep(Seconds seconds) {}
TEST_F(RSRollbackTest, InconsistentMinValid) {
repl::StorageInterface::get(_txn.get())
- ->setMinValid(_txn.get(),
- {OpTime(Timestamp(Seconds(0), 0), 0), OpTime(Timestamp(Seconds(1), 0), 0)});
+ ->setAppliedThrough(_txn.get(), OpTime(Timestamp(Seconds(0), 0), 0));
+ repl::StorageInterface::get(_txn.get())
+ ->setMinValid(_txn.get(), OpTime(Timestamp(Seconds(1), 0), 0));
auto status = syncRollback(_txn.get(),
OplogInterfaceMock(kEmptyMockOperations),
RollbackSourceMock(std::unique_ptr<OplogInterface>(
diff --git a/src/mongo/db/repl/storage_interface.cpp b/src/mongo/db/repl/storage_interface.cpp
index 557b44d079a..b9a1c25df45 100644
--- a/src/mongo/db/repl/storage_interface.cpp
+++ b/src/mongo/db/repl/storage_interface.cpp
@@ -43,21 +43,6 @@ const auto getStorageInterface =
ServiceContext::declareDecoration<std::unique_ptr<StorageInterface>>();
}
-bool BatchBoundaries::operator==(const BatchBoundaries& rhs) const {
- if (&rhs == this) {
- return true;
- }
- return start == rhs.start && end == rhs.end;
-}
-
-std::string BatchBoundaries::toString() const {
- return str::stream() << "[start=" << start.toString() << ", end=" << end.toString() << "]";
-}
-
-std::ostream& operator<<(std::ostream& stream, const BatchBoundaries& boundaries) {
- return stream << boundaries.toString();
-}
-
StorageInterface* StorageInterface::get(ServiceContext* service) {
return getStorageInterface(service).get();
}
diff --git a/src/mongo/db/repl/storage_interface.h b/src/mongo/db/repl/storage_interface.h
index 25873b366ea..22c7ca94e04 100644
--- a/src/mongo/db/repl/storage_interface.h
+++ b/src/mongo/db/repl/storage_interface.h
@@ -48,21 +48,6 @@ class OperationContext;
namespace repl {
-struct BatchBoundaries {
- BatchBoundaries(const OpTime s, const OpTime e) : start(s), end(e) {}
- bool operator==(const BatchBoundaries& rhs) const;
- std::string toString() const;
- OpTime start;
- OpTime end;
-};
-
-std::ostream& operator<<(std::ostream& stream, const BatchBoundaries& boundaries);
-
-enum class DurableRequirement {
- None, // Does not require any durability of the write.
- Strong, // Requires journal or checkpoint write.
-};
-
/**
* Storage interface used by the replication system to interact with storage.
* This interface provides seperation of concerns and a place for mocking out test
@@ -137,29 +122,40 @@ public:
virtual void clearInitialSyncFlag(OperationContext* txn) = 0;
/**
- * Returns the bounds of the current apply batch, if active. If start is null/missing, and
- * end is equal to the last oplog entry then we are in a consistent state and ready for reads.
+ * The minValid value is the earliest (minimum) Timestamp that must be applied in order to
+ * consider the dataset consistent.
*/
- virtual BatchBoundaries getMinValid(OperationContext* txn) const = 0;
+ virtual void setMinValid(OperationContext* txn, const OpTime& minValid) = 0;
+ virtual OpTime getMinValid(OperationContext* txn) const = 0;
/**
- * The minValid value is the earliest (minimum) Timestamp that must be applied in order to
- * consider the dataset consistent.
- *
- * This is called when a batch finishes.
- *
- * Wait for durable writes (which will block on journaling/checkpointing) when specified.
- *
+ * Sets minValid only if it is not already higher than endOpTime.
+ * Warning, this compares the term and timestamp independently. Do not use if the current
+ * minValid could be from the other fork of a rollback.
+ */
+ virtual void setMinValidToAtLeast(OperationContext* txn, const OpTime& endOpTime) = 0;
+
+ /**
+ * On startup all oplog entries with a value >= the oplog delete from point should be deleted.
+ * If null, no documents should be deleted.
*/
- virtual void setMinValid(OperationContext* txn,
- const OpTime& endOpTime,
- const DurableRequirement durReq) = 0;
+ virtual void setOplogDeleteFromPoint(OperationContext* txn, const Timestamp& timestamp) = 0;
+ virtual Timestamp getOplogDeleteFromPoint(OperationContext* txn) = 0;
/**
- * The bounds indicate an apply is active and we are not in a consistent state to allow reads
- * or transition from a non-visible state to primary/secondary.
+ * The applied through point is a persistent record of where we've applied through. If null, the
+ * applied through point is the top of the oplog.
*/
- virtual void setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) = 0;
+ virtual void setAppliedThrough(OperationContext* txn, const OpTime& optime) = 0;
+
+ /**
+ * You should probably be calling ReplicationCoordinator::getLastAppliedOpTime() instead.
+ *
+ * This reads the value from storage which isn't always updated when the ReplicationCoordinator
+ * is.
+ */
+ virtual OpTime getAppliedThrough(OperationContext* txn) = 0;
+
// Collection creation and population for initial sync.
/**
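The setMinValidToAtLeast contract above maps onto a $max update of the single minvalid document in the implementation that follows, which is why ts and t are compared independently. A small shell sketch of that behavior (collection name from kDefaultMinValidNamespace; the values are illustrative only):

    var mv = db.getSiblingDB("local").replset.minvalid;
    mv.update({}, {$max: {ts: Timestamp(456, 0), t: NumberLong(1)}}, {upsert: true});  // raises minValid
    mv.update({}, {$max: {ts: Timestamp(123, 0), t: NumberLong(1)}}, {upsert: true});  // lower value: no-op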
diff --git a/src/mongo/db/repl/storage_interface_impl.cpp b/src/mongo/db/repl/storage_interface_impl.cpp
index 27d15935055..ab7ff8623b6 100644
--- a/src/mongo/db/repl/storage_interface_impl.cpp
+++ b/src/mongo/db/repl/storage_interface_impl.cpp
@@ -75,6 +75,7 @@ namespace repl {
const char StorageInterfaceImpl::kDefaultMinValidNamespace[] = "local.replset.minvalid";
const char StorageInterfaceImpl::kInitialSyncFlagFieldName[] = "doingInitialSync";
const char StorageInterfaceImpl::kBeginFieldName[] = "begin";
+const char StorageInterfaceImpl::kOplogDeleteFromPointFieldName[] = "oplogDeleteFromPoint";
namespace {
using UniqueLock = stdx::unique_lock<stdx::mutex>;
@@ -100,135 +101,139 @@ NamespaceString StorageInterfaceImpl::getMinValidNss() const {
return _minValidNss;
}
-bool StorageInterfaceImpl::getInitialSyncFlag(OperationContext* txn) const {
+BSONObj StorageInterfaceImpl::getMinValidDocument(OperationContext* txn) const {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
ScopedTransaction transaction(txn, MODE_IS);
Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_IS);
Lock::CollectionLock lk(txn->lockState(), _minValidNss.ns(), MODE_IS);
- BSONObj mv;
- bool found = Helpers::getSingleton(txn, _minValidNss.ns().c_str(), mv);
-
- if (found) {
- const auto flag = mv[kInitialSyncFlagFieldName].trueValue();
- LOG(3) << "return initial flag value of " << flag;
- return flag;
- }
- LOG(3) << "return initial flag value of false";
- return false;
+ BSONObj doc;
+ bool found = Helpers::getSingleton(txn, _minValidNss.ns().c_str(), doc);
+ invariant(found || doc.isEmpty());
+ return doc;
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "StorageInterfaceImpl::getInitialSyncFlag", _minValidNss.ns());
+ txn, "StorageInterfaceImpl::getMinValidDocument", _minValidNss.ns());
MONGO_UNREACHABLE;
}
-void StorageInterfaceImpl::setInitialSyncFlag(OperationContext* txn) {
+void StorageInterfaceImpl::updateMinValidDocument(OperationContext* txn,
+ const BSONObj& updateSpec) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
ScopedTransaction transaction(txn, MODE_IX);
+ // For now this needs to be MODE_X because it sometimes creates the collection.
Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_X);
- Helpers::putSingleton(txn, _minValidNss.ns().c_str(), BSON("$set" << kInitialSyncFlag));
+ Helpers::putSingleton(txn, _minValidNss.ns().c_str(), updateSpec);
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "StorageInterfaceImpl::setInitialSyncFlag", _minValidNss.ns());
+ txn, "StorageInterfaceImpl::updateMinValidDocument", _minValidNss.ns());
+}
- txn->recoveryUnit()->waitUntilDurable();
+bool StorageInterfaceImpl::getInitialSyncFlag(OperationContext* txn) const {
+ const BSONObj doc = getMinValidDocument(txn);
+ const auto flag = doc[kInitialSyncFlagFieldName].trueValue();
+ LOG(3) << "returning initial sync flag value of " << flag;
+ return flag;
+}
+
+void StorageInterfaceImpl::setInitialSyncFlag(OperationContext* txn) {
LOG(3) << "setting initial sync flag";
+ updateMinValidDocument(txn, BSON("$set" << kInitialSyncFlag));
+ txn->recoveryUnit()->waitUntilDurable();
}
void StorageInterfaceImpl::clearInitialSyncFlag(OperationContext* txn) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- // TODO: Investigate correctness of taking MODE_IX for DB/Collection locks
- Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_X);
- Helpers::putSingleton(txn, _minValidNss.ns().c_str(), BSON("$unset" << kInitialSyncFlag));
- }
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "StorageInterfaceImpl::clearInitialSyncFlag", _minValidNss.ns());
+ LOG(3) << "clearing initial sync flag";
auto replCoord = repl::ReplicationCoordinator::get(txn);
+ OpTime time = replCoord->getMyLastAppliedOpTime();
+ updateMinValidDocument(
+ txn,
+ BSON("$unset" << kInitialSyncFlag << "$set"
+ << BSON("ts" << time.getTimestamp() << "t" << time.getTerm()
+ << kBeginFieldName
+ << time.toBSON())));
+
if (getGlobalServiceContext()->getGlobalStorageEngine()->isDurable()) {
- OpTime time = replCoord->getMyLastAppliedOpTime();
txn->recoveryUnit()->waitUntilDurable();
replCoord->setMyLastDurableOpTime(time);
}
- LOG(3) << "clearing initial sync flag";
}
-BatchBoundaries StorageInterfaceImpl::getMinValid(OperationContext* txn) const {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IS);
- Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_IS);
- Lock::CollectionLock lk(txn->lockState(), _minValidNss.ns(), MODE_IS);
- BSONObj mv;
- bool found = Helpers::getSingleton(txn, _minValidNss.ns().c_str(), mv);
- if (found) {
- auto status = OpTime::parseFromOplogEntry(mv.getObjectField(kBeginFieldName));
- OpTime start(status.isOK() ? status.getValue() : OpTime{});
- const auto opTimeStatus = OpTime::parseFromOplogEntry(mv);
- // If any of the keys (fields) are missing from the minvalid document, we return
- // empty.
- if (opTimeStatus == ErrorCodes::NoSuchKey) {
- return BatchBoundaries{{}, {}};
- }
-
- if (!opTimeStatus.isOK()) {
- error() << "Error parsing minvalid entry: " << mv
- << ", with status:" << opTimeStatus.getStatus();
- }
- OpTime end(fassertStatusOK(40052, opTimeStatus));
- LOG(3) << "returning minvalid: " << start.toString() << "(" << start.toBSON() << ") -> "
- << end.toString() << "(" << end.toBSON() << ")";
+OpTime StorageInterfaceImpl::getMinValid(OperationContext* txn) const {
+ const BSONObj doc = getMinValidDocument(txn);
+ const auto opTimeStatus = OpTime::parseFromOplogEntry(doc);
+ // If any of the keys (fields) are missing from the minvalid document, we return
+ // a null OpTime.
+ if (opTimeStatus == ErrorCodes::NoSuchKey) {
+ return {};
+ }
- return BatchBoundaries(start, end);
- }
- LOG(3) << "returning empty minvalid";
- return BatchBoundaries{{}, {}};
+ if (!opTimeStatus.isOK()) {
+ severe() << "Error parsing minvalid entry: " << doc
+ << ", with status:" << opTimeStatus.getStatus();
+ fassertFailedNoTrace(40052);
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "StorageInterfaceImpl::getMinValid", _minValidNss.ns());
+
+ OpTime minValid = opTimeStatus.getValue();
+ LOG(3) << "returning minvalid: " << minValid.toString() << "(" << minValid.toBSON() << ")";
+
+ return minValid;
}
-void StorageInterfaceImpl::setMinValid(OperationContext* txn,
- const OpTime& endOpTime,
- const DurableRequirement durReq) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_X);
- Helpers::putSingleton(
- txn,
- _minValidNss.ns().c_str(),
- BSON("$set" << BSON("ts" << endOpTime.getTimestamp() << "t" << endOpTime.getTerm())
- << "$unset"
- << BSON(kBeginFieldName << 1)));
+void StorageInterfaceImpl::setMinValid(OperationContext* txn, const OpTime& minValid) {
+ LOG(3) << "setting minvalid to exactly: " << minValid.toString() << "(" << minValid.toBSON()
+ << ")";
+ updateMinValidDocument(
+ txn, BSON("$set" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm())));
+}
+
+void StorageInterfaceImpl::setMinValidToAtLeast(OperationContext* txn, const OpTime& minValid) {
+ LOG(3) << "setting minvalid to at least: " << minValid.toString() << "(" << minValid.toBSON()
+ << ")";
+ updateMinValidDocument(
+ txn, BSON("$max" << BSON("ts" << minValid.getTimestamp() << "t" << minValid.getTerm())));
+}
+
+void StorageInterfaceImpl::setOplogDeleteFromPoint(OperationContext* txn,
+ const Timestamp& timestamp) {
+ LOG(3) << "setting oplog delete from point to: " << timestamp.toStringPretty();
+ updateMinValidDocument(txn, BSON("$set" << BSON(kOplogDeleteFromPointFieldName << timestamp)));
+}
+
+Timestamp StorageInterfaceImpl::getOplogDeleteFromPoint(OperationContext* txn) {
+ const BSONObj doc = getMinValidDocument(txn);
+ Timestamp out = {};
+ if (auto field = doc[kOplogDeleteFromPointFieldName]) {
+ out = field.timestamp();
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "StorageInterfaceImpl::setMinValid", _minValidNss.ns());
- if (durReq == DurableRequirement::Strong) {
- txn->recoveryUnit()->waitUntilDurable();
+ LOG(3) << "returning oplog delete from point: " << out;
+ return out;
+}
+
+void StorageInterfaceImpl::setAppliedThrough(OperationContext* txn, const OpTime& optime) {
+ LOG(3) << "setting appliedThrough to: " << optime.toString() << "(" << optime.toBSON() << ")";
+ if (optime.isNull()) {
+ updateMinValidDocument(txn, BSON("$unset" << BSON(kBeginFieldName << 1)));
+ } else {
+ updateMinValidDocument(txn, BSON("$set" << BSON(kBeginFieldName << optime.toBSON())));
}
- LOG(3) << "setting minvalid: " << endOpTime.toString() << "(" << endOpTime.toBSON() << ")";
}
-void StorageInterfaceImpl::setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) {
- const OpTime& start(boundaries.start);
- const OpTime& end(boundaries.end);
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dblk(txn->lockState(), _minValidNss.db(), MODE_X);
- Helpers::putSingleton(txn,
- _minValidNss.ns().c_str(),
- BSON("$set" << BSON("ts" << end.getTimestamp() << "t" << end.getTerm()
- << kBeginFieldName
- << start.toBSON())));
+OpTime StorageInterfaceImpl::getAppliedThrough(OperationContext* txn) {
+ const BSONObj doc = getMinValidDocument(txn);
+ const auto opTimeStatus = OpTime::parseFromOplogEntry(doc.getObjectField(kBeginFieldName));
+ if (!opTimeStatus.isOK()) {
+ // Return null OpTime on any parse failure, including if "begin" is missing.
+ return {};
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "StorageInterfaceImpl::setMinValid", _minValidNss.ns());
- // NOTE: No need to ensure durability here since starting a batch isn't a problem unless
- // writes happen after, in which case this marker (minvalid) will be written already.
- LOG(3) << "setting minvalid: " << boundaries.start.toString() << "("
- << boundaries.start.toBSON() << ") -> " << boundaries.end.toString() << "("
- << boundaries.end.toBSON() << ")";
+
+ OpTime appliedThrough = opTimeStatus.getValue();
+ LOG(3) << "returning appliedThrough: " << appliedThrough.toString() << "("
+ << appliedThrough.toBSON() << ")";
+
+ return appliedThrough;
}
StatusWith<std::unique_ptr<CollectionBulkLoader>>
diff --git a/src/mongo/db/repl/storage_interface_impl.h b/src/mongo/db/repl/storage_interface_impl.h
index 0226c4705aa..fc3e67fcae5 100644
--- a/src/mongo/db/repl/storage_interface_impl.h
+++ b/src/mongo/db/repl/storage_interface_impl.h
@@ -48,6 +48,7 @@ public:
static const char kDefaultMinValidNamespace[];
static const char kInitialSyncFlagFieldName[];
static const char kBeginFieldName[];
+ static const char kOplogDeleteFromPointFieldName[];
StorageInterfaceImpl();
explicit StorageInterfaceImpl(const NamespaceString& minValidNss);
@@ -67,13 +68,13 @@ public:
void clearInitialSyncFlag(OperationContext* txn) override;
- BatchBoundaries getMinValid(OperationContext* txn) const override;
-
- void setMinValid(OperationContext* ctx,
- const OpTime& endOpTime,
- const DurableRequirement durReq) override;
-
- void setMinValid(OperationContext* ctx, const BatchBoundaries& boundaries) override;
+ OpTime getMinValid(OperationContext* txn) const override;
+ void setMinValid(OperationContext* txn, const OpTime& minValid) override;
+ void setMinValidToAtLeast(OperationContext* txn, const OpTime& endOpTime) override;
+ void setOplogDeleteFromPoint(OperationContext* txn, const Timestamp& timestamp) override;
+ Timestamp getOplogDeleteFromPoint(OperationContext* txn) override;
+ void setAppliedThrough(OperationContext* txn, const OpTime& optime) override;
+ OpTime getAppliedThrough(OperationContext* txn) override;
/**
* Allocates a new TaskRunner for use by the passed in collection.
@@ -115,6 +116,10 @@ public:
Status isAdminDbValid(OperationContext* txn) override;
private:
+ // Returns empty document if not present.
+ BSONObj getMinValidDocument(OperationContext* txn) const;
+ void updateMinValidDocument(OperationContext* txn, const BSONObj& updateSpec);
+
const NamespaceString _minValidNss;
};
diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp
index 9a1f35682fa..1349331dcfc 100644
--- a/src/mongo/db/repl/storage_interface_impl_test.cpp
+++ b/src/mongo/db/repl/storage_interface_impl_test.cpp
@@ -274,9 +274,9 @@ TEST_F(StorageInterfaceImplTest, GetMinValidAfterSettingInitialSyncFlagWorks) {
storageInterface.setInitialSyncFlag(txn.get());
ASSERT_TRUE(storageInterface.getInitialSyncFlag(txn.get()));
- auto minValid = storageInterface.getMinValid(txn.get());
- ASSERT_TRUE(minValid.start.isNull());
- ASSERT_TRUE(minValid.end.isNull());
+ ASSERT(storageInterface.getMinValid(txn.get()).isNull());
+ ASSERT(storageInterface.getAppliedThrough(txn.get()).isNull());
+ ASSERT(storageInterface.getOplogDeleteFromPoint(txn.get()).isNull());
}
TEST_F(StorageInterfaceImplTest, MinValid) {
@@ -285,18 +285,30 @@ TEST_F(StorageInterfaceImplTest, MinValid) {
StorageInterfaceImpl storageInterface(nss);
auto txn = getClient()->makeOperationContext();
- // MinValid boundaries should be {null optime, null optime} after initializing a new storage
- // engine.
- auto minValid = storageInterface.getMinValid(txn.get());
- ASSERT_TRUE(minValid.start.isNull());
- ASSERT_TRUE(minValid.end.isNull());
+ // MinValid boundaries should all be null after initializing a new storage engine.
+ ASSERT(storageInterface.getMinValid(txn.get()).isNull());
+ ASSERT(storageInterface.getAppliedThrough(txn.get()).isNull());
+ ASSERT(storageInterface.getOplogDeleteFromPoint(txn.get()).isNull());
// Setting min valid boundaries should affect getMinValid() result.
OpTime startOpTime({Seconds(123), 0}, 1LL);
OpTime endOpTime({Seconds(456), 0}, 1LL);
- storageInterface.setMinValid(txn.get(), {startOpTime, endOpTime});
- minValid = storageInterface.getMinValid(txn.get());
- ASSERT_EQUALS(BatchBoundaries(startOpTime, endOpTime), minValid);
+ storageInterface.setAppliedThrough(txn.get(), startOpTime);
+ storageInterface.setMinValid(txn.get(), endOpTime);
+ storageInterface.setOplogDeleteFromPoint(txn.get(), endOpTime.getTimestamp());
+
+ ASSERT_EQ(storageInterface.getAppliedThrough(txn.get()), startOpTime);
+ ASSERT_EQ(storageInterface.getMinValid(txn.get()), endOpTime);
+ ASSERT_EQ(storageInterface.getOplogDeleteFromPoint(txn.get()), endOpTime.getTimestamp());
+
+
+ // setMinValid always overwrites minValid, but setMinValidToAtLeast only raises it.
+ storageInterface.setMinValid(txn.get(), startOpTime); // Forcibly lower it.
+ ASSERT_EQ(storageInterface.getMinValid(txn.get()), startOpTime);
+ storageInterface.setMinValidToAtLeast(txn.get(), endOpTime); // Higher than current (sets it).
+ ASSERT_EQ(storageInterface.getMinValid(txn.get()), endOpTime);
+ storageInterface.setMinValidToAtLeast(txn.get(), startOpTime); // Lower than current (no-op).
+ ASSERT_EQ(storageInterface.getMinValid(txn.get()), endOpTime);
// Check min valid document using storage engine interface.
auto minValidDocument = getMinValidDocument(txn.get(), nss);
@@ -306,6 +318,9 @@ TEST_F(StorageInterfaceImplTest, MinValid) {
unittest::assertGet(OpTime::parseFromOplogEntry(
minValidDocument[StorageInterfaceImpl::kBeginFieldName].Obj())));
ASSERT_EQUALS(endOpTime, unittest::assertGet(OpTime::parseFromOplogEntry(minValidDocument)));
+ ASSERT_EQUALS(
+ endOpTime.getTimestamp(),
+ minValidDocument[StorageInterfaceImpl::kOplogDeleteFromPointFieldName].timestamp());
// Recovery unit will be owned by "txn".
RecoveryUnitWithDurabilityTracking* recoveryUnit = new RecoveryUnitWithDurabilityTracking();
@@ -313,19 +328,11 @@ TEST_F(StorageInterfaceImplTest, MinValid) {
// Set min valid without waiting for the changes to be durable.
OpTime endOpTime2({Seconds(789), 0}, 1LL);
- storageInterface.setMinValid(txn.get(), endOpTime2, DurableRequirement::None);
- minValid = storageInterface.getMinValid(txn.get());
- ASSERT_TRUE(minValid.start.isNull());
- ASSERT_EQUALS(endOpTime2, minValid.end);
+ storageInterface.setMinValid(txn.get(), endOpTime2);
+ storageInterface.setAppliedThrough(txn.get(), {});
+ ASSERT_EQUALS(storageInterface.getAppliedThrough(txn.get()), OpTime());
+ ASSERT_EQUALS(storageInterface.getMinValid(txn.get()), endOpTime2);
ASSERT_FALSE(recoveryUnit->waitUntilDurableCalled);
-
- // Set min valid and wait for the changes to be durable.
- OpTime endOpTime3({Seconds(999), 0}, 1LL);
- storageInterface.setMinValid(txn.get(), endOpTime3, DurableRequirement::Strong);
- minValid = storageInterface.getMinValid(txn.get());
- ASSERT_TRUE(minValid.start.isNull());
- ASSERT_EQUALS(endOpTime3, minValid.end);
- ASSERT_TRUE(recoveryUnit->waitUntilDurableCalled);
}
TEST_F(StorageInterfaceImplTest, SnapshotSupported) {
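The raise-only setter matters when a node resumes after a failed batch: minValid must never drop back below the end of the largest batch ever attempted, or the node could declare itself consistent too early. A self-contained walkthrough of that scenario, reusing the toy types from the sketch above (the 20-40 / 20-25 / 25-45 sequence mirrors the comment removed from sync_tail.cpp later in this diff):

// Toy walkthrough of the resume-after-a-failed-batch scenario.
#include <cassert>

inline void toyBatchScenario() {
    ToyMinValidDocument doc;
    toySetMinValidToAtLeast(doc, ToyOpTime(40, 1));  // first attempt covered ops 20-40
    // ...crash and restart; the node re-applies in smaller batches...
    toySetMinValidToAtLeast(doc, ToyOpTime(25, 1));  // a 20-25 batch must not lower minValid
    assert(doc.minValid.ts == 40);
    toySetMinValidToAtLeast(doc, ToyOpTime(45, 1));  // a later 25-45 batch raises it again
    assert(doc.minValid.ts == 45);
}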
diff --git a/src/mongo/db/repl/storage_interface_mock.cpp b/src/mongo/db/repl/storage_interface_mock.cpp
index 96b40deba0d..7104824ebb0 100644
--- a/src/mongo/db/repl/storage_interface_mock.cpp
+++ b/src/mongo/db/repl/storage_interface_mock.cpp
@@ -55,21 +55,40 @@ void StorageInterfaceMock::clearInitialSyncFlag(OperationContext* txn) {
_initialSyncFlag = false;
}
-BatchBoundaries StorageInterfaceMock::getMinValid(OperationContext* txn) const {
+OpTime StorageInterfaceMock::getMinValid(OperationContext* txn) const {
stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
- return _minValidBoundaries;
+ return _minValid;
}
-void StorageInterfaceMock::setMinValid(OperationContext* txn,
- const OpTime& endOpTime,
- const DurableRequirement durReq) {
+void StorageInterfaceMock::setMinValid(OperationContext* txn, const OpTime& minValid) {
stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
- _minValidBoundaries = {OpTime(), endOpTime};
+ _minValid = minValid;
}
-void StorageInterfaceMock::setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) {
+void StorageInterfaceMock::setMinValidToAtLeast(OperationContext* txn, const OpTime& minValid) {
stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
- _minValidBoundaries = boundaries;
+ _minValid = std::max(_minValid, minValid);
+}
+
+void StorageInterfaceMock::setOplogDeleteFromPoint(OperationContext* txn,
+ const Timestamp& timestamp) {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ _oplogDeleteFromPoint = timestamp;
+}
+
+Timestamp StorageInterfaceMock::getOplogDeleteFromPoint(OperationContext* txn) {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ return _oplogDeleteFromPoint;
+}
+
+void StorageInterfaceMock::setAppliedThrough(OperationContext* txn, const OpTime& optime) {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ _appliedThrough = optime;
+}
+
+OpTime StorageInterfaceMock::getAppliedThrough(OperationContext* txn) {
+ stdx::lock_guard<stdx::mutex> lock(_minValidBoundariesMutex);
+ return _appliedThrough;
}
Status CollectionBulkLoaderMock::init(OperationContext* txn,
diff --git a/src/mongo/db/repl/storage_interface_mock.h b/src/mongo/db/repl/storage_interface_mock.h
index 480cd2b7ecd..163dd76c396 100644
--- a/src/mongo/db/repl/storage_interface_mock.h
+++ b/src/mongo/db/repl/storage_interface_mock.h
@@ -125,11 +125,13 @@ public:
void setInitialSyncFlag(OperationContext* txn) override;
void clearInitialSyncFlag(OperationContext* txn) override;
- BatchBoundaries getMinValid(OperationContext* txn) const override;
- void setMinValid(OperationContext* txn,
- const OpTime& endOpTime,
- const DurableRequirement durReq) override;
- void setMinValid(OperationContext* txn, const BatchBoundaries& boundaries) override;
+ OpTime getMinValid(OperationContext* txn) const override;
+ void setMinValid(OperationContext* txn, const OpTime& minValid) override;
+ void setMinValidToAtLeast(OperationContext* txn, const OpTime& minValid) override;
+ void setOplogDeleteFromPoint(OperationContext* txn, const Timestamp& timestamp) override;
+ Timestamp getOplogDeleteFromPoint(OperationContext* txn) override;
+ void setAppliedThrough(OperationContext* txn, const OpTime& optime) override;
+ OpTime getAppliedThrough(OperationContext* txn) override;
StatusWith<std::unique_ptr<CollectionBulkLoader>> createCollectionForBulkLoading(
const NamespaceString& nss,
@@ -239,7 +241,9 @@ private:
bool _initialSyncFlag = false;
mutable stdx::mutex _minValidBoundariesMutex;
- BatchBoundaries _minValidBoundaries = {OpTime(), OpTime()};
+ OpTime _appliedThrough;
+ OpTime _minValid;
+ Timestamp _oplogDeleteFromPoint;
};
} // namespace repl
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index d0017600bef..9785eb94c6b 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -174,10 +174,14 @@ public:
protected:
void _recordApplied(const OpTime& newOpTime) {
+ // We have to use setMyLastAppliedOpTimeForward since this thread races with
+ // ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary.
_replCoord->setMyLastAppliedOpTimeForward(newOpTime);
}
void _recordDurable(const OpTime& newOpTime) {
+ // We have to use setMyLastDurableOpTimeForward since this thread races with
+ // ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary.
_replCoord->setMyLastDurableOpTimeForward(newOpTime);
}
@@ -225,8 +229,6 @@ ApplyBatchFinalizerForJournal::~ApplyBatchFinalizerForJournal() {
}
void ApplyBatchFinalizerForJournal::record(const OpTime& newOpTime) {
- // We have to use setMyLastAppliedOpTimeForward since this thread races with
- // logTransitionToPrimaryToOplog.
_recordApplied(newOpTime);
stdx::unique_lock<stdx::mutex> lock(_mutex);
@@ -256,8 +258,6 @@ void ApplyBatchFinalizerForJournal::_run() {
auto txn = cc().makeOperationContext();
txn->recoveryUnit()->waitUntilDurable();
- // We have to use setMyLastDurableOpTimeForward since this thread races with
- // logTransitionToPrimaryToOplog.
_recordDurable(latestOpTime);
}
}
@@ -635,10 +635,7 @@ OpTime SyncTail::multiApply(OperationContext* txn, MultiApplier::Operations ops)
}
namespace {
-void tryToGoLiveAsASecondary(OperationContext* txn,
- ReplicationCoordinator* replCoord,
- const BatchBoundaries& minValidBoundaries,
- const OpTime& lastWriteOpTime) {
+void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* replCoord) {
if (replCoord->isInPrimaryOrSecondaryState()) {
return;
}
@@ -659,19 +656,8 @@ void tryToGoLiveAsASecondary(OperationContext* txn,
return;
}
- // If an apply batch is active then we cannot transition.
- if (!minValidBoundaries.start.isNull()) {
- LOG(1) << "Can't go live (tryToGoLiveAsASecondary) as there is an active apply batch.";
- return;
- }
-
- // Must have applied/written to minvalid, so return if not.
- // -- If 'lastWriteOpTime' is null/uninitialized then we can't transition.
- // -- If 'lastWriteOpTime' is less than the end of the last batch then we can't transition.
- if (lastWriteOpTime.isNull() || minValidBoundaries.end > lastWriteOpTime) {
- log() << "Can't go live (tryToGoLiveAsASecondary) as last written optime ("
- << lastWriteOpTime
- << ") is null or greater than minvalid: " << minValidBoundaries.end;
+ // We can't go to SECONDARY until we reach minvalid.
+ if (replCoord->getMyLastAppliedOpTime() < StorageInterface::get(txn)->getMinValid(txn)) {
return;
}
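The rewritten gate above is now a single comparison against the persisted minValid: a node may leave RECOVERING for SECONDARY only once its last applied optime has caught up. A toy restatement, using the ToyOpTime placeholder from the earlier sketch:

// Toy restatement of the new tryToGoLiveAsASecondary gate.
inline bool toyMayTransitionToSecondary(const ToyOpTime& lastApplied,
                                        const ToyOpTime& minValid) {
    return !(lastApplied < minValid);
}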
@@ -786,12 +772,8 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord,
? new ApplyBatchFinalizerForJournal(replCoord)
: new ApplyBatchFinalizer(replCoord)};
- auto minValidBoundaries = StorageInterface::get(&txn)->getMinValid(&txn);
- OpTime originalEndOpTime(minValidBoundaries.end);
- OpTime lastWriteOpTime{replCoord->getMyLastAppliedOpTime()};
while (!shouldShutdown()) {
-
- tryToGoLiveAsASecondary(&txn, replCoord, minValidBoundaries, lastWriteOpTime);
+ tryToGoLiveAsASecondary(&txn, replCoord);
// Blocks up to a second waiting for a batch to be ready to apply. If one doesn't become
// ready in time, we'll loop again so we can do the above checks periodically.
@@ -799,77 +781,50 @@ void SyncTail::oplogApplication(ReplicationCoordinator* replCoord,
if (ops.empty())
continue; // Try again.
- const BSONObj lastOp = ops.back().raw;
-
- if (lastOp.isEmpty()) {
+ if (ops.front().raw.isEmpty()) {
// This means that the network thread has coalesced and we have processed all of its
// data.
invariant(ops.getCount() == 1);
if (replCoord->isWaitingForApplierToDrain()) {
replCoord->signalDrainComplete(&txn);
}
-
- // Reset some values when triggered in case it was from a rollback.
- minValidBoundaries = StorageInterface::get(&txn)->getMinValid(&txn);
- lastWriteOpTime = replCoord->getMyLastAppliedOpTime();
- originalEndOpTime = minValidBoundaries.end;
-
continue; // This wasn't a real op. Don't try to apply it.
}
- const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp));
- if (lastWriteOpTime >= lastOpTime) {
- // Error for the oplog to go back in time.
+ // Extract some info from ops that we'll need after releasing the batch below.
+ const size_t opsInBatch = ops.getCount();
+ const auto firstOpTimeInBatch =
+ fassertStatusOK(40299, OpTime::parseFromOplogEntry(ops.front().raw));
+ const auto lastOpTimeInBatch =
+ fassertStatusOK(28773, OpTime::parseFromOplogEntry(ops.back().raw));
+
+ // Make sure the oplog doesn't go back in time or repeat an entry.
+ if (firstOpTimeInBatch <= replCoord->getMyLastAppliedOpTime()) {
fassert(34361,
Status(ErrorCodes::OplogOutOfOrder,
str::stream() << "Attempted to apply an oplog entry ("
- << lastOpTime.toString()
- << ") which is not greater than our lastWrittenOptime ("
- << lastWriteOpTime.toString()
+ << firstOpTimeInBatch.toString()
+ << ") which is not greater than our last applied OpTime ("
+ << replCoord->getMyLastAppliedOpTime().toString()
<< ")."));
}
- // Set minValid to the last OpTime that needs to be applied, in this batch or from the
- // (last) failed batch, whichever is larger.
- // This will cause this node to go into RECOVERING state
- // if we should crash and restart before updating finishing.
- const auto& start = lastWriteOpTime;
-
- // Take the max of the first endOptime (if we recovered) and the end of our batch.
-
- // Setting end to the max of originalEndOpTime and lastOpTime (the end of the batch)
- // ensures that we keep pushing out the point where we can become consistent
- // and allow reads. If we recover and end up doing smaller batches we must pass the
- // originalEndOpTime before we are good.
- //
- // For example:
- // batch apply, 20-40, end = 40
- // batch failure,
- // restart
- // batch apply, 20-25, end = max(25, 40) = 40
- // batch apply, 25-45, end = 45
- const OpTime end(std::max(originalEndOpTime, lastOpTime));
-
- // This write will not journal/checkpoint.
- StorageInterface::get(&txn)->setMinValid(&txn, {start, end});
-
- const size_t opsInBatch = ops.getCount();
- lastWriteOpTime = multiApply(&txn, ops.releaseBatch());
- if (lastWriteOpTime.isNull()) {
+ const bool fail = multiApply(&txn, ops.releaseBatch()).isNull();
+ if (fail) {
// fassert if oplog application failed for any reason other than shutdown.
- error() << "Failed to apply " << opsInBatch << " operations - batch start:" << start
- << " end:" << end;
+ error() << "Failed to apply " << opsInBatch
+ << " operations - batch start:" << firstOpTimeInBatch
+ << " end:" << lastOpTimeInBatch;
fassert(34360, inShutdownStrict());
// Return without setting minvalid in the case of shutdown.
return;
}
- setNewTimestamp(lastWriteOpTime.getTimestamp());
- StorageInterface::get(&txn)->setMinValid(&txn, end, DurableRequirement::None);
- minValidBoundaries.start = {};
- minValidBoundaries.end = end;
- finalizer->record(lastWriteOpTime);
+ // Update various things that care about our last applied optime.
+ setNewTimestamp(lastOpTimeInBatch.getTimestamp());
+ StorageInterface::get(&txn)->setAppliedThrough(&txn, lastOpTimeInBatch);
+ finalizer->record(lastOpTimeInBatch);
}
}
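With the boundary bookkeeping deleted, the loop above reduces to: check that the batch moves the oplog strictly forward, apply it, advance the logical clock, persist appliedThrough, and report progress to the finalizer. The condensed sketch below is illustrative only; toyApplyOneBatch and ToyBatch are placeholders, and the real calls they stand in for are quoted in the comments.

// Condensed sketch of the simplified per-batch flow in SyncTail::oplogApplication.
struct ToyBatch {
    ToyOpTime first;
    ToyOpTime last;
};

inline bool toyApplyOneBatch(ToyOpTime& lastApplied, const ToyBatch& batch) {
    // The oplog must move strictly forward; otherwise the real code fasserts (34361).
    if (!(lastApplied < batch.first)) {
        return false;
    }
    // multiApply(&txn, ops.releaseBatch()) happens here; on success the batch is applied.
    lastApplied = batch.last;
    // setNewTimestamp(lastOpTimeInBatch.getTimestamp());
    // StorageInterface::get(&txn)->setAppliedThrough(&txn, lastOpTimeInBatch);
    // finalizer->record(lastOpTimeInBatch);
    return true;
}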
@@ -1275,6 +1230,8 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
prefetchOps(ops, workerPool);
}
+ auto storage = StorageInterface::get(txn);
+
LOG(2) << "replication batch size is " << ops.size();
// We must grab this because we're going to grab write locks later.
// We hold this mutex the entire time we're writing; it doesn't matter
@@ -1299,6 +1256,7 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
std::vector<MultiApplier::OperationPtrs> writerVectors;
ON_BLOCK_EXIT([&] { workerPool->join(); });
+ storage->setOplogDeleteFromPoint(txn, ops.front().ts.timestamp());
const bool multiThreadedOplogWrites = scheduleWritesToOplog(txn, workerPool, ops);
if (multiThreadedOplogWrites) {
// Use all threads for oplog application.
@@ -1309,6 +1267,21 @@ StatusWith<OpTime> multiApply(OperationContext* txn,
}
fillWriterVectors(txn, &ops, &writerVectors);
+
+ workerPool->join();
+
+ // We must check this before altering the MinValid document because setting inShutdown may
+ // have caused the oplog writers to abort early. This code (and its duplicate below) will go
+ // away once SERVER-25071 is done and clean shutdown drains the apply queue.
+ if (inShutdownStrict()) {
+ log() << "Cannot apply operations due to shutdown in progress";
+ return {ErrorCodes::InterruptedAtShutdown,
+ "Cannot apply operations due to shutdown in progress"};
+ }
+
+ storage->setOplogDeleteFromPoint(txn, Timestamp());
+ storage->setMinValidToAtLeast(txn, ops.back().getOpTime());
+
applyOps(&writerVectors, workerPool, applyOperation);
}
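The ordering introduced in multiApply above carries the crash-safety argument of this patch: the oplog-delete-from-point is persisted before any oplog entries of the batch are written, and it is cleared (and minValid raised) only after the writer threads have been joined, so a node that dies mid-batch restarts knowing exactly where its oplog may be partially written. A condensed sketch of that ordering, reusing the toy types from the earlier sketches; how startup recovery consumes the delete point lives elsewhere in this commit and is only paraphrased in the comments here.

// Condensed sketch of the write-phase ordering in multiApply. Names containing
// "toy" are placeholders; the real calls they stand in for are quoted in comments.
inline bool toyWritePhase(ToyMinValidDocument& doc, const ToyBatch& batch, bool inShutdown) {
    // 1. Record where a partially written batch could start:
    //    storage->setOplogDeleteFromPoint(txn, ops.front().ts.timestamp());
    doc.oplogDeleteFromPoint = batch.first.ts;

    // 2. scheduleWritesToOplog(...) plus workerPool->join(): all oplog entries
    //    of the batch are now written locally.

    // 3. Bail out before touching minValid if shutdown aborted the writers.
    if (inShutdown) {
        return false;  // stands in for returning InterruptedAtShutdown
    }

    // 4. The local oplog is complete through batch.last, so the delete point is
    //    cleared and minValid may only move forward:
    //    storage->setOplogDeleteFromPoint(txn, Timestamp());
    //    storage->setMinValidToAtLeast(txn, ops.back().getOpTime());
    doc.oplogDeleteFromPoint = 0;
    toySetMinValidToAtLeast(doc, batch.last);

    // 5. applyOps(...) then applies the batch to the data files.
    return true;
}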
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index d4976be5116..c7f63999266 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -123,6 +123,10 @@ public:
bool empty() const {
return _batch.empty();
}
+ const OplogEntry& front() const {
+ invariant(!_batch.empty());
+ return _batch.front();
+ }
const OplogEntry& back() const {
invariant(!_batch.empty());
return _batch.back();