summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorScott Hernandez <scotthernandez@gmail.com>2015-09-23 22:36:21 -0400
committerScott Hernandez <scotthernandez@gmail.com>2015-09-25 17:30:33 -0400
commit18de2f3d6bb4c01fb600bbb0e3a2c2e4ab0cc9ee (patch)
tree746011a289a10486090780e3ca0810919bbeba5b /src/mongo/db
parent4980f0d20ed494da2862c337cd2dc1fc7a5dd422 (diff)
downloadmongo-18de2f3d6bb4c01fb600bbb0e3a2c2e4ab0cc9ee.tar.gz
SERVER-20326: record apply batch boundaries
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/repl/minvalid.cpp48
-rw-r--r--src/mongo/db/repl/minvalid.h53
-rw-r--r--src/mongo/db/repl/oplogreader.cpp2
-rw-r--r--src/mongo/db/repl/optime.cpp7
-rw-r--r--src/mongo/db/repl/optime.h1
-rw-r--r--src/mongo/db/repl/rs_rollback.cpp20
-rw-r--r--src/mongo/db/repl/rs_rollback_test.cpp4
-rw-r--r--src/mongo/db/repl/sync_tail.cpp31
8 files changed, 124 insertions, 42 deletions
diff --git a/src/mongo/db/repl/minvalid.cpp b/src/mongo/db/repl/minvalid.cpp
index 7860d4ac988..bcdc5b5a404 100644
--- a/src/mongo/db/repl/minvalid.cpp
+++ b/src/mongo/db/repl/minvalid.cpp
@@ -46,9 +46,10 @@ namespace mongo {
namespace repl {
namespace {
-const char* initialSyncFlagString = "doingInitialSync";
+const char initialSyncFlagString[] = "doingInitialSync";
const BSONObj initialSyncFlag(BSON(initialSyncFlagString << true));
-const char* minvalidNS = "local.replset.minvalid";
+const char minvalidNS[] = "local.replset.minvalid";
+const char beginFieldName[] = "begin";
} // namespace
// Writes
@@ -60,6 +61,7 @@ void clearInitialSyncFlag(OperationContext* txn) {
Helpers::putSingleton(txn, minvalidNS, BSON("$unset" << initialSyncFlag));
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "clearInitialSyncFlags", minvalidNS);
+ LOG(3) << "clearing initial sync flag";
}
void setInitialSyncFlag(OperationContext* txn) {
@@ -69,18 +71,38 @@ void setInitialSyncFlag(OperationContext* txn) {
Helpers::putSingleton(txn, minvalidNS, BSON("$set" << initialSyncFlag));
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "setInitialSyncFlags", minvalidNS);
+ LOG(3) << "setting initial sync flag";
}
-void setMinValid(OperationContext* ctx, const OpTime& opTime) {
+void setMinValid(OperationContext* ctx, const OpTime& endOpTime) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
ScopedTransaction transaction(ctx, MODE_IX);
Lock::DBLock dblk(ctx->lockState(), "local", MODE_X);
Helpers::putSingleton(
ctx,
minvalidNS,
- BSON("$set" << BSON("ts" << opTime.getTimestamp() << "t" << opTime.getTerm())));
+ BSON("$set" << BSON("ts" << endOpTime.getTimestamp() << "t" << endOpTime.getTerm())
+ << "$unset" << BSON(beginFieldName << 1)));
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(ctx, "setMinValid", minvalidNS);
+ LOG(3) << "setting minvalid: " << endOpTime.toString() << "(" << endOpTime.toBSON() << ")";
+}
+
+void setMinValid(OperationContext* ctx, const BatchBoundaries& boundaries) {
+ const OpTime& start(boundaries.start);
+ const OpTime& end(boundaries.end);
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(ctx, MODE_IX);
+ Lock::DBLock dblk(ctx->lockState(), "local", MODE_X);
+ Helpers::putSingleton(ctx,
+ minvalidNS,
+ BSON("$set" << BSON("ts" << end.getTimestamp() << "t" << end.getTerm()
+ << beginFieldName << start.toBSON())));
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(ctx, "setMinValid", minvalidNS);
+ LOG(3) << "setting minvalid: " << boundaries.start.toString() << "("
+ << boundaries.start.toBSON() << ") -> " << boundaries.end.toString() << "("
+ << boundaries.end.toBSON() << ")";
}
// Reads
@@ -94,8 +116,11 @@ bool getInitialSyncFlag() {
bool found = Helpers::getSingleton(&txn, minvalidNS, mv);
if (found) {
- return mv[initialSyncFlagString].trueValue();
+ const auto flag = mv[initialSyncFlagString].trueValue();
+ LOG(3) << "return initial flag value of " << flag;
+ return flag;
}
+ LOG(3) << "return initial flag value of false";
return false;
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(&txn, "getInitialSyncFlags", minvalidNS);
@@ -103,7 +128,7 @@ bool getInitialSyncFlag() {
MONGO_UNREACHABLE;
}
-OpTime getMinValid(OperationContext* txn) {
+BatchBoundaries getMinValid(OperationContext* txn) {
MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
ScopedTransaction transaction(txn, MODE_IS);
Lock::DBLock dblk(txn->lockState(), "local", MODE_IS);
@@ -111,9 +136,16 @@ OpTime getMinValid(OperationContext* txn) {
BSONObj mv;
bool found = Helpers::getSingleton(txn, minvalidNS, mv);
if (found) {
- return fassertStatusOK(28771, OpTime::parseFromOplogEntry(mv));
+ auto status = OpTime::parseFromOplogEntry(mv.getObjectField(beginFieldName));
+ OpTime start(status.isOK() ? status.getValue() : OpTime{});
+ OpTime end(fassertStatusOK(28771, OpTime::parseFromOplogEntry(mv)));
+ LOG(3) << "returning minvalid: " << start.toString() << "(" << start.toBSON() << ") -> "
+ << end.toString() << "(" << end.toBSON() << ")";
+
+ return BatchBoundaries(start, end);
}
- return OpTime();
+ LOG(3) << "returning empty minvalid";
+ return BatchBoundaries{OpTime{}, OpTime{}};
}
MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "getMinValid", minvalidNS);
}
diff --git a/src/mongo/db/repl/minvalid.h b/src/mongo/db/repl/minvalid.h
index 8d0ceb79613..482a7cdc72e 100644
--- a/src/mongo/db/repl/minvalid.h
+++ b/src/mongo/db/repl/minvalid.h
@@ -28,40 +28,67 @@
#pragma once
+#include "mongo/db/repl/optime.h"
+
namespace mongo {
class BSONObj;
class OperationContext;
namespace repl {
-class OpTime;
+
+struct BatchBoundaries {
+ BatchBoundaries(const OpTime s, const OpTime e) : start(s), end(e) {}
+ const OpTime start;
+ const OpTime end;
+};
/**
- * Helper functions for maintaining local.replset.minvalid collection contents.
+ * Helper functions for maintaining a single document in the local.replset.minvalid collection.
*
* When a member reaches its minValid optime it is in a consistent state. Thus, minValid is
- * set as the last step in initial sync. At the beginning of initial sync, _initialSyncFlag
+ * set as the last step in initial sync. At the beginning of initial sync, doingInitialSync
* is appended onto minValid to indicate that initial sync was started but has not yet
* completed.
- * minValid is also used during "normal" sync: the last op in each batch is used to set
- * minValid, to indicate that we are in a consistent state when the batch has been fully
- * applied.
+ *
+ * The document is also updated during "normal" sync. The optime of the last op in each batch is
+ * used to set minValid, along with a "begin" field to demark the start and the fact that a batch
+ * is active. When the batch is done the "begin" field is removed to indicate that we are in a
+ * consistent state when the batch has been fully applied.
+ *
+ * Example of all fields:
+ * { _id:...,
+ * doingInitialSync: true // initial sync is active
+ * ts:..., t:... // end-OpTime
+ * begin: {ts:..., t:...} // a batch is currently being applied, and not consistent
+ * }
*/
/**
- * The initial sync flag is used to durably record the state of an initial sync; its boolean
- * value is true when an initial sync is in progress and hasn't yet completed. The flag
- * is stored as part of the local.replset.minvalid collection.
+ * The initial sync flag is used to durably record that initial sync has not completed.
*/
void clearInitialSyncFlag(OperationContext* txn);
void setInitialSyncFlag(OperationContext* txn);
bool getInitialSyncFlag();
+
+/**
+ * Returns the bounds of the current apply batch, if active. If start is null/missing, and
+ * end is equal to the last oplog entry then we are in a consistent state and ready for reads.
+ */
+BatchBoundaries getMinValid(OperationContext* txn);
+
/**
* The minValid value is the earliest (minimum) Timestamp that must be applied in order to
- * consider the dataset consistent. Do not allow client reads if our last applied operation is
- * before the minValid time.
+ * consider the dataset consistent.
+ *
+ * This is called when a batch finishes.
+ */
+void setMinValid(OperationContext* ctx, const OpTime& endOpTime);
+
+/**
+ * The bounds indicate an apply is active and we are not in a consistent state to allow reads
+ * or transition from a non-visible state to primary/secondary.
*/
-void setMinValid(OperationContext* ctx, const OpTime& opTime);
-OpTime getMinValid(OperationContext* txn);
+void setMinValid(OperationContext* ctx, const BatchBoundaries& boundaries);
}
}
diff --git a/src/mongo/db/repl/oplogreader.cpp b/src/mongo/db/repl/oplogreader.cpp
index ecda101a91f..4ba200694a6 100644
--- a/src/mongo/db/repl/oplogreader.cpp
+++ b/src/mongo/db/repl/oplogreader.cpp
@@ -165,7 +165,7 @@ void OplogReader::connectToSyncSource(OperationContext* txn,
log() << "our last optime : " << lastOpTimeFetched;
log() << "oldest available is " << oldestOpTimeSeen;
log() << "See http://dochub.mongodb.org/core/resyncingaverystalereplicasetmember";
- setMinValid(txn, oldestOpTimeSeen);
+ setMinValid(txn, {lastOpTimeFetched, oldestOpTimeSeen});
bool worked = replCoord->setFollowerMode(MemberState::RS_RECOVERING);
if (!worked) {
warning() << "Failed to transition into " << MemberState(MemberState::RS_RECOVERING)
diff --git a/src/mongo/db/repl/optime.cpp b/src/mongo/db/repl/optime.cpp
index 92a3c5556ca..53474c9ba60 100644
--- a/src/mongo/db/repl/optime.cpp
+++ b/src/mongo/db/repl/optime.cpp
@@ -82,6 +82,13 @@ StatusWith<OpTime> OpTime::parseFromOplogEntry(const BSONObj& obj) {
return OpTime(ts, term);
}
+BSONObj OpTime::toBSON() const {
+ BSONObjBuilder bldr;
+ bldr.append(kTimestampFieldName, _timestamp);
+ bldr.append(kTermFieldName, _term);
+ return bldr.obj();
+}
+
std::string OpTime::toString() const {
std::stringstream ss;
ss << "(term: " << _term << ", timestamp: " << _timestamp.toStringPretty() << ")";
diff --git a/src/mongo/db/repl/optime.h b/src/mongo/db/repl/optime.h
index 0af26bc8250..8e3e140db68 100644
--- a/src/mongo/db/repl/optime.h
+++ b/src/mongo/db/repl/optime.h
@@ -76,6 +76,7 @@ public:
* subObjName : { ts: <timestamp>, t: <term> }
*/
void append(BSONObjBuilder* builder, const std::string& subObjName) const;
+ BSONObj toBSON() const;
static StatusWith<OpTime> parseFromOplogEntry(const BSONObj& obj);
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 3f0019f6462..38b122d745b 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -372,7 +372,7 @@ void syncFixUp(OperationContext* txn,
// online until we get to that point in freshness.
OpTime minValid = fassertStatusOK(28774, OpTime::parseFromOplogEntry(newMinValid));
log() << "minvalid=" << minValid;
- setMinValid(txn, minValid);
+ setMinValid(txn, {OpTime{}, minValid});
// any full collection resyncs required?
if (!fixUpInfo.collectionsToResyncData.empty() ||
@@ -476,7 +476,8 @@ void syncFixUp(OperationContext* txn,
} else {
OpTime minValid = fassertStatusOK(28775, OpTime::parseFromOplogEntry(newMinValid));
log() << "minvalid=" << minValid;
- setMinValid(txn, minValid);
+ const OpTime start{fixUpInfo.commonPoint, OpTime::kUninitializedTerm};
+ setMinValid(txn, {start, minValid});
}
} catch (const DBException& e) {
err = "can't get/set minvalid: ";
@@ -812,13 +813,14 @@ Status _syncRollback(OperationContext* txn,
auto res = syncRollBackLocalOperations(
localOplog, rollbackSource.getOplog(), processOperationForFixUp);
if (!res.isOK()) {
- switch (res.getStatus().code()) {
+ const auto status = res.getStatus();
+ switch (status.code()) {
case ErrorCodes::OplogStartMissing:
case ErrorCodes::UnrecoverableRollbackError:
sleepSecondsFn(Seconds(1));
- return res.getStatus();
+ return status;
default:
- throw RSFatalException(res.getStatus().toString());
+ throw RSFatalException(status.toString());
}
} else {
how.commonPoint = res.getValue().first;
@@ -887,13 +889,13 @@ Status syncRollback(OperationContext* txn,
// check that we are at minvalid, otherwise we cannot rollback as we may be in an
// inconsistent state
{
- OpTime minvalid = getMinValid(txn);
- if (minvalid > lastOpTimeApplied) {
+ BatchBoundaries boundaries = getMinValid(txn);
+ if (!boundaries.start.isNull() || boundaries.end > lastOpTimeApplied) {
severe() << "need to rollback, but in inconsistent state" << endl;
return Status(ErrorCodes::UnrecoverableRollbackError,
str::stream() << "need to rollback, but in inconsistent state. "
- << "minvalid: " << minvalid.toString()
- << " our last optime: " << lastOpTimeApplied.toString(),
+ << "minvalid: " << boundaries.end.toString()
+ << " > our last optime: " << lastOpTimeApplied.toString(),
18750);
}
}
diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp
index 31f0b59ce41..cb6dc66e39c 100644
--- a/src/mongo/db/repl/rs_rollback_test.cpp
+++ b/src/mongo/db/repl/rs_rollback_test.cpp
@@ -154,6 +154,7 @@ void RSRollbackTest::setUp() {
setGlobalReplicationCoordinator(_coordinator);
setOplogCollectionName();
+ repl::setMinValid(_txn.get(), {OpTime{}, OpTime{}});
}
void RSRollbackTest::tearDown() {
@@ -169,7 +170,8 @@ void RSRollbackTest::tearDown() {
void noSleep(Seconds seconds) {}
TEST_F(RSRollbackTest, InconsistentMinValid) {
- repl::setMinValid(_txn.get(), OpTime(Timestamp(Seconds(1), 0), 0));
+ repl::setMinValid(_txn.get(),
+ {OpTime(Timestamp(Seconds(0), 0), 0), OpTime(Timestamp(Seconds(1), 0), 0)});
auto status = syncRollback(_txn.get(),
OpTime(),
OplogInterfaceMock(kEmptyMockOperations),
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index ed023e1b9d6..f5e545fa31b 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -40,15 +40,15 @@
#include "mongo/base/counter.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
-#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands/fsync.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/service_context.h"
+#include "mongo/db/global_timestamp.h"
#include "mongo/db/operation_context_impl.h"
#include "mongo/db/prefetch.h"
#include "mongo/db/repl/bgsync.h"
@@ -59,6 +59,7 @@
#include "mongo/db/repl/replica_set_config.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/server_parameters.h"
+#include "mongo/db/service_context.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
@@ -502,8 +503,8 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl
return;
}
- OpTime minvalid = getMinValid(txn);
- if (minvalid > replCoord->getMyLastOptime()) {
+ BatchBoundaries boundaries = getMinValid(txn);
+ if (!boundaries.start.isNull() || boundaries.end > replCoord->getMyLastOptime()) {
return;
}
@@ -519,9 +520,10 @@ void tryToGoLiveAsASecondary(OperationContext* txn, ReplicationCoordinator* repl
void SyncTail::oplogApplication() {
ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ OperationContextImpl txn;
+ OpTime originalEndOpTime(getMinValid(&txn).end);
while (!inShutdown()) {
OpQueue ops;
- OperationContextImpl txn;
Timer batchTimer;
int lastTimeChecked = 0;
@@ -591,10 +593,18 @@ void SyncTail::oplogApplication() {
const BSONObj lastOp = ops.back();
handleSlaveDelay(lastOp);
- // Set minValid to the last op to be applied in this next batch.
+ // Set minValid to the last OpTime to be applied in this batch.
// This will cause this node to go into RECOVERING state
- // if we should crash and restart before updating the oplog
- setMinValid(&txn, fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp)));
+ // if we should crash and restart before updating finishing.
+ const OpTime start(getLastSetTimestamp(), OpTime::kUninitializedTerm);
+
+ // Take the max of the first endOptime (if we recovered) and the end of our batch.
+ const auto lastOpTime = fassertStatusOK(28773, OpTime::parseFromOplogEntry(lastOp));
+ const OpTime end(std::max(originalEndOpTime, lastOpTime));
+
+ // This write will not journal/checkpoint.
+ setMinValid(&txn, {start, end});
+
multiApply(&txn,
ops,
&_prefetcherPool,
@@ -602,6 +612,9 @@ void SyncTail::oplogApplication() {
_applyFunc,
this,
supportsWaitingUntilDurable());
+
+ // This write will journal/checkpoint, and finish the batch.
+ setMinValid(&txn, end);
}
}
@@ -823,8 +836,6 @@ bool SyncTail::shouldRetry(OperationContext* txn, const BSONObj& o) {
MONGO_UNREACHABLE;
}
-static AtomicUInt32 replWriterWorkerId;
-
static void initializeWriterThread() {
// Only do this once per thread
if (!ClientBasic::getCurrent()) {