Diffstat (limited to 'src/mongo/db/repl/replication_coordinator_external_state_impl.cpp')
-rw-r--r--  src/mongo/db/repl/replication_coordinator_external_state_impl.cpp  138
1 file changed, 122 insertions(+), 16 deletions(-)
diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
index 75b08a4fd76..67abc29502e 100644
--- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp
@@ -42,6 +42,7 @@
#include "mongo/db/client.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/dbhelpers.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/op_observer.h"
@@ -70,6 +71,7 @@
#include "mongo/util/net/hostandport.h"
#include "mongo/util/net/message_port.h"
#include "mongo/util/net/sock.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
namespace repl {
@@ -116,7 +118,7 @@ void ReplicationCoordinatorExternalStateImpl::startMasterSlave(OperationContext*
repl::startMasterSlave(txn);
}
-void ReplicationCoordinatorExternalStateImpl::shutdown() {
+void ReplicationCoordinatorExternalStateImpl::shutdown(OperationContext* txn) {
stdx::lock_guard<stdx::mutex> lk(_threadMutex);
if (_startedThreads) {
log() << "Stopping replication applier threads";
@@ -129,6 +131,13 @@ void ReplicationCoordinatorExternalStateImpl::shutdown() {
if (_snapshotThread)
_snapshotThread->shutdown();
+
+ if (getOplogDeleteFromPoint(txn).isNull() &&
+ loadLastOpTime(txn) == getAppliedThrough(txn)) {
+ // Clear the appliedThrough marker to indicate we are consistent with the top of the
+ // oplog.
+ setAppliedThrough(txn, {});
+ }
}
}
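
The hunk above makes shutdown record that the node stopped in a consistent state: appliedThrough is cleared only when there is no pending oplog truncation point and the last op in the oplog matches appliedThrough. Below is a minimal standalone sketch of that check; Timestamp, OpTime, and consistentWithTopOfOplog are simplified illustrative stand-ins, not the MongoDB classes.

    // Minimal sketch (simplified stand-in types, not MongoDB source) of the
    // clean-shutdown check: appliedThrough may only be cleared when nothing is
    // pending truncation and everything in the oplog has been applied.
    #include <iostream>

    struct Timestamp {
        unsigned long long secs = 0, inc = 0;
        bool isNull() const { return secs == 0 && inc == 0; }
    };

    struct OpTime {
        Timestamp ts;
        long long term = -1;
        bool operator==(const OpTime& other) const {
            return ts.secs == other.ts.secs && ts.inc == other.ts.inc && term == other.term;
        }
    };

    // True when startup may safely rely on the top of the oplog instead of an
    // explicit appliedThrough marker.
    bool consistentWithTopOfOplog(const Timestamp& oplogDeleteFromPoint,
                                  const OpTime& topOfOplog,
                                  const OpTime& appliedThrough) {
        return oplogDeleteFromPoint.isNull() && topOfOplog == appliedThrough;
    }

    int main() {
        OpTime top{{100, 1}, 2};
        std::cout << std::boolalpha
                  << consistentWithTopOfOplog({}, top, top) << "\n"        // true: clear the marker
                  << consistentWithTopOfOplog({99, 0}, top, top) << "\n";  // false: truncation pending
        return 0;
    }
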
@@ -169,24 +178,45 @@ Status ReplicationCoordinatorExternalStateImpl::initializeReplSetStorage(Operati
wuow.commit();
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "initiate oplog entry", "local.oplog.rs");
+
+ // This initializes the minvalid document with a null "ts" because older versions (<=3.2)
+ // get angry if the minValid document is present but doesn't have a "ts" field.
+ // Consider removing this once we no longer need to support downgrading to 3.2.
+ setMinValidToAtLeast(txn, {});
} catch (const DBException& ex) {
return ex.toStatus();
}
return Status::OK();
}
-void ReplicationCoordinatorExternalStateImpl::logTransitionToPrimaryToOplog(OperationContext* txn) {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction scopedXact(txn, MODE_X);
+OpTime ReplicationCoordinatorExternalStateImpl::onTransitionToPrimary(OperationContext* txn,
+ bool isV1ElectionProtocol) {
+ invariant(txn->lockState()->isW());
+
+ // Clear the appliedThrough marker so on startup we'll use the top of the oplog. This must be
+ // done before we add anything to our oplog.
+ invariant(getOplogDeleteFromPoint(txn).isNull());
+ setAppliedThrough(txn, {});
+
+ if (isV1ElectionProtocol) {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction scopedXact(txn, MODE_X);
- WriteUnitOfWork wuow(txn);
- txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage(txn,
- BSON("msg"
- << "new primary"));
- wuow.commit();
+ WriteUnitOfWork wuow(txn);
+ txn->getClient()->getServiceContext()->getOpObserver()->onOpMessage(
+ txn,
+ BSON("msg"
+ << "new primary"));
+ wuow.commit();
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
+ txn, "logging transition to primary to oplog", "local.oplog.rs");
}
- MONGO_WRITE_CONFLICT_RETRY_LOOP_END(
- txn, "logging transition to primary to oplog", "local.oplog.rs");
+ const auto opTimeToReturn = fassertStatusOK(28665, loadLastOpTime(txn));
+
+ dropAllTempCollections(txn);
+
+ return opTimeToReturn;
}
void ReplicationCoordinatorExternalStateImpl::forwardSlaveProgress() {
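
The new onTransitionToPrimary replaces logTransitionToPrimaryToOplog and enforces an ordering: clear appliedThrough before anything is written to the oplog, write the "new primary" no-op only under the V1 election protocol, then read the last op time to return and drop temp collections. The sketch below models that ordering with an assumed StepUpHooks interface; the struct and helper names are illustrative, not the MongoDB API.

    // Illustrative sketch of the step-up ordering; StepUpHooks and its members
    // are assumptions for this example only.
    #include <functional>
    #include <iostream>
    #include <string>

    struct StepUpHooks {
        std::function<void()> clearAppliedThrough;            // must run before any oplog write
        std::function<void(const std::string&)> logNoOp;      // writes a no-op to the oplog
        std::function<unsigned long long()> loadLastOpTime;   // reads the top of the oplog
        std::function<void()> dropAllTempCollections;
    };

    unsigned long long onTransitionToPrimarySketch(const StepUpHooks& hooks,
                                                   bool isV1ElectionProtocol) {
        // 1. Clear appliedThrough so startup will trust the top of the oplog.
        hooks.clearAppliedThrough();
        // 2. Only the V1 election protocol writes the "new primary" no-op here.
        if (isV1ElectionProtocol) {
            hooks.logNoOp("new primary");
        }
        // 3. Capture the op time to hand back to the coordinator, then clean up.
        const auto opTimeToReturn = hooks.loadLastOpTime();
        hooks.dropAllTempCollections();
        return opTimeToReturn;
    }

    int main() {
        unsigned long long top = 0;
        StepUpHooks hooks{[] {},
                          [&](const std::string&) { ++top; },
                          [&] { return top; },
                          [] {}};
        std::cout << onTransitionToPrimarySketch(hooks, true) << "\n";  // prints 1
        return 0;
    }
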
@@ -301,18 +331,94 @@ void ReplicationCoordinatorExternalStateImpl::setGlobalTimestamp(const Timestamp
}
void ReplicationCoordinatorExternalStateImpl::cleanUpLastApplyBatch(OperationContext* txn) {
- auto mv = getMinValid(txn);
+ if (getInitialSyncFlag(txn)) {
+ return; // Initial Sync will take over so no cleanup is needed.
+ }
- if (!mv.start.isNull()) {
- // If we are in the middle of a batch, and recovering, then we need to truncate the oplog.
- LOG(2) << "Recovering from a failed apply batch, start:" << mv.start.toBSON();
- truncateOplogTo(txn, mv.start.getTimestamp());
+ const auto deleteFromPoint = getOplogDeleteFromPoint(txn);
+ const auto appliedThrough = getAppliedThrough(txn);
+
+ const bool needToDeleteEndOfOplog = !deleteFromPoint.isNull() &&
+ // This version should never have a non-null deleteFromPoint with a null appliedThrough.
+ // This scenario means that we downgraded after unclean shutdown, then the downgraded node
+ // deleted the ragged end of our oplog, then did a clean shutdown.
+ !appliedThrough.isNull() &&
+ // Similarly we should never have an appliedThrough higher than the deleteFromPoint. This
+ // means that the downgraded node deleted our ragged end then applied ahead of our
+ // deleteFromPoint and then had an unclean shutdown before upgrading. We are ok with
+ // applying these ops because older versions wrote to the oplog from a single thread so we
+ // know they are in order.
+ !(appliedThrough.getTimestamp() >= deleteFromPoint);
+ if (needToDeleteEndOfOplog) {
+ log() << "Removing unapplied entries starting at: " << deleteFromPoint;
+ truncateOplogTo(txn, deleteFromPoint);
+ }
+ setOplogDeleteFromPoint(txn, {}); // clear the deleteFromPoint
+
+ if (appliedThrough.isNull()) {
+ // No follow-up work to do.
+ return;
+ }
+
+ // Check if we have any unapplied ops in our oplog. It is important that this is done after
+ // deleting the ragged end of the oplog.
+ const auto topOfOplog = fassertStatusOK(40290, loadLastOpTime(txn));
+ if (appliedThrough == topOfOplog) {
+ return; // We've applied all the valid oplog we have.
+ } else if (appliedThrough > topOfOplog) {
+ severe() << "Applied op " << appliedThrough << " not found. Top of oplog is " << topOfOplog
+ << '.';
+ fassertFailedNoTrace(40313);
+ }
+
+ log() << "Replaying stored operations from " << appliedThrough << " (exclusive) to "
+ << topOfOplog << " (inclusive).";
+
+ DBDirectClient db(txn);
+ auto cursor = db.query(rsOplogName,
+ QUERY("ts" << BSON("$gte" << appliedThrough.getTimestamp())),
+ /*batchSize*/ 0,
+ /*skip*/ 0,
+ /*projection*/ nullptr,
+ QueryOption_OplogReplay);
+
+ // Check that the first document matches our appliedThrough point then skip it since it's
+ // already been applied.
+ if (!cursor->more()) {
+ // This should really be impossible because we check above that the top of the oplog is
+ // strictly > appliedThrough. If this fails it represents a serious bug in either the
+ // storage engine or query's implementation of OplogReplay.
+ severe() << "Couldn't find any entries in the oplog >= " << appliedThrough
+ << " which should be impossible.";
+ fassertFailedNoTrace(40293);
+ }
+ auto firstOpTimeFound = fassertStatusOK(40291, OpTime::parseFromOplogEntry(cursor->nextSafe()));
+ if (firstOpTimeFound != appliedThrough) {
+ severe() << "Oplog entry at " << appliedThrough << " is missing; actual entry found is "
+ << firstOpTimeFound;
+ fassertFailedNoTrace(40292);
+ }
+
+ // Apply remaining ops one at a time, but don't log them because they are already logged.
+ const bool wereWritesReplicated = txn->writesAreReplicated();
+ ON_BLOCK_EXIT([&] { txn->setReplicatedWrites(wereWritesReplicated); });
+ txn->setReplicatedWrites(false);
+
+ while (cursor->more()) {
+ auto entry = cursor->nextSafe();
+ fassertStatusOK(40294, SyncTail::syncApply(txn, entry, true));
+ setAppliedThrough(txn, fassertStatusOK(40295, OpTime::parseFromOplogEntry(entry)));
}
}
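
cleanUpLastApplyBatch now recovers from an unclean shutdown in two phases: first truncate the ragged end of the oplog at oplogDeleteFromPoint (only when appliedThrough exists and is still behind that point), then replay entries after appliedThrough up to the top of the oplog with replicated writes disabled so they are not logged again. Below is a minimal sketch of the truncation decision; the Timestamp stand-in and the free function needToDeleteEndOfOplog are illustrative, not the MongoDB code.

    // Minimal sketch (simplified types, not MongoDB source) of the recovery
    // decision: truncate the ragged oplog end only when both markers exist and
    // appliedThrough is still behind the deleteFromPoint.
    #include <iostream>

    struct Timestamp {
        unsigned long long v = 0;
        bool isNull() const { return v == 0; }
        bool operator>=(const Timestamp& other) const { return v >= other.v; }
    };

    bool needToDeleteEndOfOplog(const Timestamp& deleteFromPoint,
                                const Timestamp& appliedThroughTs) {
        if (deleteFromPoint.isNull())
            return false;  // nothing was mid-write at shutdown
        if (appliedThroughTs.isNull())
            return false;  // a downgraded node already cleaned the ragged end
        if (appliedThroughTs >= deleteFromPoint)
            return false;  // a downgraded node applied past the point; those ops are safe
        return true;       // unapplied tail beyond deleteFromPoint: truncate it
    }

    int main() {
        std::cout << std::boolalpha
                  << needToDeleteEndOfOplog({5}, {3}) << "\n"   // true: truncate from ts 5
                  << needToDeleteEndOfOplog({5}, {7}) << "\n";  // false: already applied past it
        return 0;
    }
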
StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::loadLastOpTime(OperationContext* txn) {
// TODO: handle WriteConflictExceptions below
try {
+ // If we are doing an initial sync do not read from the oplog.
+ if (getInitialSyncFlag(txn)) {
+ return {ErrorCodes::InitialSyncFailure, "In the middle of an initial sync."};
+ }
+
BSONObj oplogEntry;
if (!Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry)) {
return StatusWith<OpTime>(ErrorCodes::NoMatchingDocument,