summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/roll_back_local_operations.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/roll_back_local_operations.cpp')
-rw-r--r--src/mongo/db/repl/roll_back_local_operations.cpp266
1 files changed, 127 insertions, 139 deletions
diff --git a/src/mongo/db/repl/roll_back_local_operations.cpp b/src/mongo/db/repl/roll_back_local_operations.cpp
index 2cf07fa36b7..2117458e9f4 100644
--- a/src/mongo/db/repl/roll_back_local_operations.cpp
+++ b/src/mongo/db/repl/roll_back_local_operations.cpp
@@ -41,157 +41,145 @@ namespace repl {
namespace {
- Timestamp getTimestamp(const BSONObj& operation) {
- return operation["ts"].timestamp();
- }
-
- Timestamp getTimestamp(const OplogInterface::Iterator::Value& oplogValue) {
- return getTimestamp(oplogValue.first);
- }
-
- long long getHash(const BSONObj& operation) {
- return operation["h"].Long();
- }
-
- long long getHash(const OplogInterface::Iterator::Value& oplogValue) {
- return getHash(oplogValue.first);
- }
-
-} // namespace
-
- RollBackLocalOperations::RollBackLocalOperations(
- const OplogInterface& localOplog,
- const RollbackOperationFn& rollbackOperation)
-
- : _localOplogIterator(localOplog.makeIterator()),
- _rollbackOperation(rollbackOperation),
- _scanned(0) {
-
- uassert(ErrorCodes::BadValue, "invalid local oplog iterator", _localOplogIterator);
- uassert(ErrorCodes::BadValue, "null roll back operation function", rollbackOperation);
+Timestamp getTimestamp(const BSONObj& operation) {
+ return operation["ts"].timestamp();
+}
+
+Timestamp getTimestamp(const OplogInterface::Iterator::Value& oplogValue) {
+ return getTimestamp(oplogValue.first);
+}
+
+long long getHash(const BSONObj& operation) {
+ return operation["h"].Long();
+}
+
+long long getHash(const OplogInterface::Iterator::Value& oplogValue) {
+ return getHash(oplogValue.first);
+}
+
+} // namespace
+
+RollBackLocalOperations::RollBackLocalOperations(const OplogInterface& localOplog,
+ const RollbackOperationFn& rollbackOperation)
+
+ : _localOplogIterator(localOplog.makeIterator()),
+ _rollbackOperation(rollbackOperation),
+ _scanned(0) {
+ uassert(ErrorCodes::BadValue, "invalid local oplog iterator", _localOplogIterator);
+ uassert(ErrorCodes::BadValue, "null roll back operation function", rollbackOperation);
+}
+
+StatusWith<RollBackLocalOperations::RollbackCommonPoint> RollBackLocalOperations::onRemoteOperation(
+ const BSONObj& operation) {
+ if (_scanned == 0) {
+ auto result = _localOplogIterator->next();
+ if (!result.isOK()) {
+ return StatusWith<RollbackCommonPoint>(ErrorCodes::OplogStartMissing,
+ "no oplog during initsync");
+ }
+ _localOplogValue = result.getValue();
+
+ long long diff = static_cast<long long>(getTimestamp(_localOplogValue).getSecs()) -
+ getTimestamp(operation).getSecs();
+ // diff could be positive, negative, or zero
+ log() << "rollback our last optime: " << getTimestamp(_localOplogValue).toStringPretty();
+ log() << "rollback their last optime: " << getTimestamp(operation).toStringPretty();
+ log() << "rollback diff in end of log times: " << diff << " seconds";
+ if (diff > 1800) {
+ severe() << "rollback too long a time period for a rollback.";
+ return StatusWith<RollbackCommonPoint>(
+ ErrorCodes::ExceededTimeLimit,
+ "rollback error: not willing to roll back more than 30 minutes of data");
+ }
}
- StatusWith<RollBackLocalOperations::RollbackCommonPoint>
- RollBackLocalOperations::onRemoteOperation(const BSONObj& operation) {
-
- if (_scanned == 0) {
- auto result = _localOplogIterator->next();
- if (!result.isOK()) {
- return StatusWith<RollbackCommonPoint>(ErrorCodes::OplogStartMissing,
- "no oplog during initsync");
- }
- _localOplogValue = result.getValue();
-
- long long diff =
- static_cast<long long>(getTimestamp(_localOplogValue).getSecs()) -
- getTimestamp(operation).getSecs();
- // diff could be positive, negative, or zero
- log() << "rollback our last optime: "
- << getTimestamp(_localOplogValue).toStringPretty();
- log() << "rollback their last optime: " << getTimestamp(operation).toStringPretty();
- log() << "rollback diff in end of log times: " << diff << " seconds";
- if (diff > 1800) {
- severe() << "rollback too long a time period for a rollback.";
- return StatusWith<RollbackCommonPoint>(
- ErrorCodes::ExceededTimeLimit,
- "rollback error: not willing to roll back more than 30 minutes of data");
- }
+ while (getTimestamp(_localOplogValue) > getTimestamp(operation)) {
+ _scanned++;
+ auto status = _rollbackOperation(_localOplogValue.first);
+ if (!status.isOK()) {
+ invariant(ErrorCodes::NoSuchKey != status.code());
+ return status;
}
-
- while (getTimestamp(_localOplogValue) > getTimestamp(operation)) {
- _scanned++;
- auto status = _rollbackOperation(_localOplogValue.first);
- if (!status.isOK()) {
- invariant(ErrorCodes::NoSuchKey != status.code());
- return status;
- }
- auto result = _localOplogIterator->next();
- if (!result.isOK()) {
- severe() << "rollback error RS101 reached beginning of local oplog";
- log() << " scanned: " << _scanned;
- log() << " theirTime: " << getTimestamp(operation).toStringLong();
- log() << " ourTime: " << getTimestamp(_localOplogValue).toStringLong();
- return StatusWith<RollbackCommonPoint>(
- ErrorCodes::NoMatchingDocument,
- "RS101 reached beginning of local oplog [2]");
- }
- _localOplogValue = result.getValue();
+ auto result = _localOplogIterator->next();
+ if (!result.isOK()) {
+ severe() << "rollback error RS101 reached beginning of local oplog";
+ log() << " scanned: " << _scanned;
+ log() << " theirTime: " << getTimestamp(operation).toStringLong();
+ log() << " ourTime: " << getTimestamp(_localOplogValue).toStringLong();
+ return StatusWith<RollbackCommonPoint>(ErrorCodes::NoMatchingDocument,
+ "RS101 reached beginning of local oplog [2]");
}
+ _localOplogValue = result.getValue();
+ }
- if (getTimestamp(_localOplogValue) == getTimestamp(operation)) {
- _scanned++;
- if (getHash(_localOplogValue) == getHash(operation)) {
- return StatusWith<RollbackCommonPoint>(
- std::make_pair(getTimestamp(_localOplogValue), _localOplogValue.second));
- }
- auto status = _rollbackOperation(_localOplogValue.first);
- if (!status.isOK()) {
- invariant(ErrorCodes::NoSuchKey != status.code());
- return status;
- }
- auto result = _localOplogIterator->next();
- if (!result.isOK()) {
- severe() << "rollback error RS101 reached beginning of local oplog";
- log() << " scanned: " << _scanned;
- log() << " theirTime: " << getTimestamp(operation).toStringLong();
- log() << " ourTime: " << getTimestamp(_localOplogValue).toStringLong();
- return StatusWith<RollbackCommonPoint>(
- ErrorCodes::NoMatchingDocument,
- "RS101 reached beginning of local oplog [1]");
- }
- _localOplogValue = result.getValue();
+ if (getTimestamp(_localOplogValue) == getTimestamp(operation)) {
+ _scanned++;
+ if (getHash(_localOplogValue) == getHash(operation)) {
return StatusWith<RollbackCommonPoint>(
- ErrorCodes::NoSuchKey,
- "Unable to determine common point - same timestamp but different hash. "
- "Need to process additional remote operations.");
+ std::make_pair(getTimestamp(_localOplogValue), _localOplogValue.second));
}
-
- if (getTimestamp(_localOplogValue) < getTimestamp(operation)) {
- _scanned++;
- return StatusWith<RollbackCommonPoint>(
- ErrorCodes::NoSuchKey,
- "Unable to determine common point. "
- "Need to process additional remote operations.");
+ auto status = _rollbackOperation(_localOplogValue.first);
+ if (!status.isOK()) {
+ invariant(ErrorCodes::NoSuchKey != status.code());
+ return status;
}
-
- return RollbackCommonPoint(Timestamp(Seconds(1), 0), RecordId());
+ auto result = _localOplogIterator->next();
+ if (!result.isOK()) {
+ severe() << "rollback error RS101 reached beginning of local oplog";
+ log() << " scanned: " << _scanned;
+ log() << " theirTime: " << getTimestamp(operation).toStringLong();
+ log() << " ourTime: " << getTimestamp(_localOplogValue).toStringLong();
+ return StatusWith<RollbackCommonPoint>(ErrorCodes::NoMatchingDocument,
+ "RS101 reached beginning of local oplog [1]");
+ }
+ _localOplogValue = result.getValue();
+ return StatusWith<RollbackCommonPoint>(
+ ErrorCodes::NoSuchKey,
+ "Unable to determine common point - same timestamp but different hash. "
+ "Need to process additional remote operations.");
}
- StatusWith<RollBackLocalOperations::RollbackCommonPoint> syncRollBackLocalOperations(
- const OplogInterface& localOplog,
- const OplogInterface& remoteOplog,
- const RollBackLocalOperations::RollbackOperationFn& rollbackOperation) {
-
- auto remoteIterator = remoteOplog.makeIterator();
- auto remoteResult = remoteIterator->next();
- if (!remoteResult.isOK()) {
- return StatusWith<RollBackLocalOperations::RollbackCommonPoint>(
- ErrorCodes::InvalidSyncSource,
- "remote oplog empty or unreadable");
- }
+ if (getTimestamp(_localOplogValue) < getTimestamp(operation)) {
+ _scanned++;
+ return StatusWith<RollbackCommonPoint>(ErrorCodes::NoSuchKey,
+ "Unable to determine common point. "
+ "Need to process additional remote operations.");
+ }
- RollBackLocalOperations finder(localOplog, rollbackOperation);
- Timestamp theirTime;
- while (remoteResult.isOK()) {
- theirTime = remoteResult.getValue().first["ts"].timestamp();
- BSONObj theirObj = remoteResult.getValue().first;
- auto result = finder.onRemoteOperation(theirObj);
- if (result.isOK()) {
- return result.getValue();
- }
- else if (result.getStatus().code() != ErrorCodes::NoSuchKey) {
- return result;
- }
- remoteResult = remoteIterator->next();
- }
+ return RollbackCommonPoint(Timestamp(Seconds(1), 0), RecordId());
+}
- severe() << "rollback error RS100 reached beginning of remote oplog";
- log() << " them: " << remoteOplog.toString();
- log() << " theirTime: " << theirTime.toStringLong();
+StatusWith<RollBackLocalOperations::RollbackCommonPoint> syncRollBackLocalOperations(
+ const OplogInterface& localOplog,
+ const OplogInterface& remoteOplog,
+ const RollBackLocalOperations::RollbackOperationFn& rollbackOperation) {
+ auto remoteIterator = remoteOplog.makeIterator();
+ auto remoteResult = remoteIterator->next();
+ if (!remoteResult.isOK()) {
return StatusWith<RollBackLocalOperations::RollbackCommonPoint>(
- ErrorCodes::NoMatchingDocument,
- "RS100 reached beginning of remote oplog [1]");
+ ErrorCodes::InvalidSyncSource, "remote oplog empty or unreadable");
+ }
+
+ RollBackLocalOperations finder(localOplog, rollbackOperation);
+ Timestamp theirTime;
+ while (remoteResult.isOK()) {
+ theirTime = remoteResult.getValue().first["ts"].timestamp();
+ BSONObj theirObj = remoteResult.getValue().first;
+ auto result = finder.onRemoteOperation(theirObj);
+ if (result.isOK()) {
+ return result.getValue();
+ } else if (result.getStatus().code() != ErrorCodes::NoSuchKey) {
+ return result;
+ }
+ remoteResult = remoteIterator->next();
}
-} // namespace repl
-} // namespace mongo
+ severe() << "rollback error RS100 reached beginning of remote oplog";
+ log() << " them: " << remoteOplog.toString();
+ log() << " theirTime: " << theirTime.toStringLong();
+ return StatusWith<RollBackLocalOperations::RollbackCommonPoint>(
+ ErrorCodes::NoMatchingDocument, "RS100 reached beginning of remote oplog [1]");
+}
+
+} // namespace repl
+} // namespace mongo