summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/bgsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r--src/mongo/db/repl/bgsync.cpp861
1 files changed, 424 insertions, 437 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp
index 5317ab36305..5c84a724b94 100644
--- a/src/mongo/db/repl/bgsync.cpp
+++ b/src/mongo/db/repl/bgsync.cpp
@@ -55,523 +55,510 @@
namespace mongo {
- using std::string;
+using std::string;
namespace repl {
namespace {
- const char hashFieldName[] = "h";
- int SleepToAllowBatchingMillis = 2;
- const int BatchIsSmallish = 40000; // bytes
-} // namespace
-
- MONGO_FP_DECLARE(rsBgSyncProduce);
-
- BackgroundSync* BackgroundSync::s_instance = 0;
- stdx::mutex BackgroundSync::s_mutex;
-
- //The number and time spent reading batches off the network
- static TimerStats getmoreReplStats;
- static ServerStatusMetricField<TimerStats> displayBatchesRecieved(
- "repl.network.getmores",
- &getmoreReplStats );
- //The oplog entries read via the oplog reader
- static Counter64 opsReadStats;
- static ServerStatusMetricField<Counter64> displayOpsRead( "repl.network.ops",
- &opsReadStats );
- //The bytes read via the oplog reader
- static Counter64 networkByteStats;
- static ServerStatusMetricField<Counter64> displayBytesRead( "repl.network.bytes",
- &networkByteStats );
-
- //The count of items in the buffer
- static Counter64 bufferCountGauge;
- static ServerStatusMetricField<Counter64> displayBufferCount( "repl.buffer.count",
- &bufferCountGauge );
- //The size (bytes) of items in the buffer
- static Counter64 bufferSizeGauge;
- static ServerStatusMetricField<Counter64> displayBufferSize( "repl.buffer.sizeBytes",
- &bufferSizeGauge );
- //The max size (bytes) of the buffer
- static int bufferMaxSizeGauge = 256*1024*1024;
- static ServerStatusMetricField<int> displayBufferMaxSize( "repl.buffer.maxSizeBytes",
- &bufferMaxSizeGauge );
-
-
- BackgroundSyncInterface::~BackgroundSyncInterface() {}
-
- size_t getSize(const BSONObj& o) {
- // SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion
- return static_cast<size_t>(o.objsize());
+const char hashFieldName[] = "h";
+int SleepToAllowBatchingMillis = 2;
+const int BatchIsSmallish = 40000; // bytes
+} // namespace
+
+MONGO_FP_DECLARE(rsBgSyncProduce);
+
+BackgroundSync* BackgroundSync::s_instance = 0;
+stdx::mutex BackgroundSync::s_mutex;
+
+// The number and time spent reading batches off the network
+static TimerStats getmoreReplStats;
+static ServerStatusMetricField<TimerStats> displayBatchesRecieved("repl.network.getmores",
+ &getmoreReplStats);
+// The oplog entries read via the oplog reader
+static Counter64 opsReadStats;
+static ServerStatusMetricField<Counter64> displayOpsRead("repl.network.ops", &opsReadStats);
+// The bytes read via the oplog reader
+static Counter64 networkByteStats;
+static ServerStatusMetricField<Counter64> displayBytesRead("repl.network.bytes", &networkByteStats);
+
+// The count of items in the buffer
+static Counter64 bufferCountGauge;
+static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count",
+ &bufferCountGauge);
+// The size (bytes) of items in the buffer
+static Counter64 bufferSizeGauge;
+static ServerStatusMetricField<Counter64> displayBufferSize("repl.buffer.sizeBytes",
+ &bufferSizeGauge);
+// The max size (bytes) of the buffer
+static int bufferMaxSizeGauge = 256 * 1024 * 1024;
+static ServerStatusMetricField<int> displayBufferMaxSize("repl.buffer.maxSizeBytes",
+ &bufferMaxSizeGauge);
+
+
+BackgroundSyncInterface::~BackgroundSyncInterface() {}
+
+size_t getSize(const BSONObj& o) {
+ // SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion
+ return static_cast<size_t>(o.objsize());
+}
+
+BackgroundSync::BackgroundSync()
+ : _buffer(bufferMaxSizeGauge, &getSize),
+ _lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0),
+ std::numeric_limits<long long>::max()),
+ _lastAppliedHash(0),
+ _lastFetchedHash(0),
+ _pause(true),
+ _appliedBuffer(true),
+ _replCoord(getGlobalReplicationCoordinator()),
+ _initialSyncRequestedFlag(false),
+ _indexPrefetchConfig(PREFETCH_ALL) {}
+
+BackgroundSync* BackgroundSync::get() {
+ stdx::unique_lock<stdx::mutex> lock(s_mutex);
+ if (s_instance == NULL && !inShutdown()) {
+ s_instance = new BackgroundSync();
}
+ return s_instance;
+}
- BackgroundSync::BackgroundSync() : _buffer(bufferMaxSizeGauge, &getSize),
- _lastOpTimeFetched(
- Timestamp(std::numeric_limits<int>::max(), 0),
- std::numeric_limits<long long>::max()),
- _lastAppliedHash(0),
- _lastFetchedHash(0),
- _pause(true),
- _appliedBuffer(true),
- _replCoord(getGlobalReplicationCoordinator()),
- _initialSyncRequestedFlag(false),
- _indexPrefetchConfig(PREFETCH_ALL) {
- }
+void BackgroundSync::shutdown() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
- BackgroundSync* BackgroundSync::get() {
- stdx::unique_lock<stdx::mutex> lock(s_mutex);
- if (s_instance == NULL && !inShutdown()) {
- s_instance = new BackgroundSync();
- }
- return s_instance;
- }
+ // Clear the buffer in case the producerThread is waiting in push() due to a full queue.
+ invariant(inShutdown());
+ _buffer.clear();
+ _pause = true;
- void BackgroundSync::shutdown() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+ // Wake up producerThread so it notices that we're in shutdown
+ _appliedBufferCondition.notify_all();
+ _pausedCondition.notify_all();
+}
- // Clear the buffer in case the producerThread is waiting in push() due to a full queue.
- invariant(inShutdown());
- _buffer.clear();
- _pause = true;
+void BackgroundSync::notify(OperationContext* txn) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
- // Wake up producerThread so it notices that we're in shutdown
+ // If all ops in the buffer have been applied, unblock waitForRepl (if it's waiting)
+ if (_buffer.empty()) {
+ _appliedBuffer = true;
_appliedBufferCondition.notify_all();
- _pausedCondition.notify_all();
}
+}
- void BackgroundSync::notify(OperationContext* txn) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
+void BackgroundSync::producerThread() {
+ Client::initThread("rsBackgroundSync");
+ AuthorizationSession::get(cc())->grantInternalAuthorization();
- // If all ops in the buffer have been applied, unblock waitForRepl (if it's waiting)
- if (_buffer.empty()) {
- _appliedBuffer = true;
- _appliedBufferCondition.notify_all();
+ while (!inShutdown()) {
+ try {
+ _producerThread();
+ } catch (const DBException& e) {
+ std::string msg(str::stream() << "sync producer problem: " << e.toString());
+ error() << msg;
+ _replCoord->setMyHeartbeatMessage(msg);
+ } catch (const std::exception& e2) {
+ severe() << "sync producer exception: " << e2.what();
+ fassertFailed(28546);
}
}
+}
- void BackgroundSync::producerThread() {
- Client::initThread("rsBackgroundSync");
- AuthorizationSession::get(cc())->grantInternalAuthorization();
-
- while (!inShutdown()) {
- try {
- _producerThread();
- }
- catch (const DBException& e) {
- std::string msg(str::stream() << "sync producer problem: " << e.toString());
- error() << msg;
- _replCoord->setMyHeartbeatMessage(msg);
- }
- catch (const std::exception& e2) {
- severe() << "sync producer exception: " << e2.what();
- fassertFailed(28546);
- }
+void BackgroundSync::_producerThread() {
+ const MemberState state = _replCoord->getMemberState();
+ // we want to pause when the state changes to primary
+ if (_replCoord->isWaitingForApplierToDrain() || state.primary()) {
+ if (!_pause) {
+ stop();
}
+ sleepsecs(1);
+ return;
}
- void BackgroundSync::_producerThread() {
- const MemberState state = _replCoord->getMemberState();
- // we want to pause when the state changes to primary
- if (_replCoord->isWaitingForApplierToDrain() || state.primary()) {
- if (!_pause) {
- stop();
- }
- sleepsecs(1);
- return;
- }
-
- // TODO(spencer): Use a condition variable to await loading a config.
- if (state.startup()) {
- // Wait for a config to be loaded
- sleepsecs(1);
- return;
- }
-
- // We need to wait until initial sync has started.
- if (_replCoord->getMyLastOptime().isNull()) {
- sleepsecs(1);
- return;
- }
- // we want to unpause when we're no longer primary
- // start() also loads _lastOpTimeFetched, which we know is set from the "if"
- OperationContextImpl txn;
- if (_pause) {
- start(&txn);
- }
+ // TODO(spencer): Use a condition variable to await loading a config.
+ if (state.startup()) {
+ // Wait for a config to be loaded
+ sleepsecs(1);
+ return;
+ }
- produce(&txn);
+ // We need to wait until initial sync has started.
+ if (_replCoord->getMyLastOptime().isNull()) {
+ sleepsecs(1);
+ return;
+ }
+ // we want to unpause when we're no longer primary
+ // start() also loads _lastOpTimeFetched, which we know is set from the "if"
+ OperationContextImpl txn;
+ if (_pause) {
+ start(&txn);
}
- void BackgroundSync::produce(OperationContext* txn) {
- // this oplog reader does not do a handshake because we don't want the server it's syncing
- // from to track how far it has synced
- {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- if (_lastOpTimeFetched.isNull()) {
- // then we're initial syncing and we're still waiting for this to be set
- lock.unlock();
- sleepsecs(1);
- // if there is no one to sync from
- return;
- }
+ produce(&txn);
+}
- // Wait until we've applied the ops we have before we choose a sync target
- while (!_appliedBuffer && !inShutdownStrict()) {
- _appliedBufferCondition.wait(lock);
- }
- if (inShutdownStrict()) {
- return;
- }
+void BackgroundSync::produce(OperationContext* txn) {
+ // this oplog reader does not do a handshake because we don't want the server it's syncing
+ // from to track how far it has synced
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ if (_lastOpTimeFetched.isNull()) {
+ // then we're initial syncing and we're still waiting for this to be set
+ lock.unlock();
+ sleepsecs(1);
+ // if there is no one to sync from
+ return;
}
- while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
- sleepmillis(0);
+ // Wait until we've applied the ops we have before we choose a sync target
+ while (!_appliedBuffer && !inShutdownStrict()) {
+ _appliedBufferCondition.wait(lock);
}
-
-
- // find a target to sync from the last optime fetched
- OpTime lastOpTimeFetched;
- {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- lastOpTimeFetched = _lastOpTimeFetched;
- _syncSourceHost = HostAndPort();
+ if (inShutdownStrict()) {
+ return;
}
- _syncSourceReader.resetConnection();
- _syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, _replCoord);
+ }
- {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- // no server found
- if (_syncSourceReader.getHost().empty()) {
- lock.unlock();
- sleepsecs(1);
- // if there is no one to sync from
- return;
- }
- lastOpTimeFetched = _lastOpTimeFetched;
- _syncSourceHost = _syncSourceReader.getHost();
- _replCoord->signalUpstreamUpdater();
- }
+ while (MONGO_FAIL_POINT(rsBgSyncProduce)) {
+ sleepmillis(0);
+ }
- _syncSourceReader.tailingQueryGTE(rsOplogName.c_str(), lastOpTimeFetched.getTimestamp());
- // if target cut connections between connecting and querying (for
- // example, because it stepped down) we might not have a cursor
- if (!_syncSourceReader.haveCursor()) {
- return;
- }
+ // find a target to sync from the last optime fetched
+ OpTime lastOpTimeFetched;
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ lastOpTimeFetched = _lastOpTimeFetched;
+ _syncSourceHost = HostAndPort();
+ }
+ _syncSourceReader.resetConnection();
+ _syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, _replCoord);
- if (_rollbackIfNeeded(txn, _syncSourceReader)) {
- stop();
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ // no server found
+ if (_syncSourceReader.getHost().empty()) {
+ lock.unlock();
+ sleepsecs(1);
+ // if there is no one to sync from
return;
}
+ lastOpTimeFetched = _lastOpTimeFetched;
+ _syncSourceHost = _syncSourceReader.getHost();
+ _replCoord->signalUpstreamUpdater();
+ }
- while (!inShutdown()) {
- if (!_syncSourceReader.moreInCurrentBatch()) {
- // Check some things periodically
- // (whenever we run out of items in the
- // current cursor batch)
-
- int bs = _syncSourceReader.currentBatchMessageSize();
- if( bs > 0 && bs < BatchIsSmallish ) {
- // on a very low latency network, if we don't wait a little, we'll be
- // getting ops to write almost one at a time. this will both be expensive
- // for the upstream server as well as potentially defeating our parallel
- // application of batches on the secondary.
- //
- // the inference here is basically if the batch is really small, we are
- // "caught up".
- //
- sleepmillis(SleepToAllowBatchingMillis);
- }
-
- // If we are transitioning to primary state, we need to leave
- // this loop in order to go into bgsync-pause mode.
- if (_replCoord->isWaitingForApplierToDrain() ||
- _replCoord->getMemberState().primary()) {
- return;
- }
-
- // re-evaluate quality of sync target
- if (shouldChangeSyncSource()) {
- return;
- }
+ _syncSourceReader.tailingQueryGTE(rsOplogName.c_str(), lastOpTimeFetched.getTimestamp());
- {
- //record time for each getmore
- TimerHolder batchTimer(&getmoreReplStats);
-
- // This calls receiveMore() on the oplogreader cursor.
- // It can wait up to five seconds for more data.
- _syncSourceReader.more();
- }
- networkByteStats.increment(_syncSourceReader.currentBatchMessageSize());
-
- if (!_syncSourceReader.moreInCurrentBatch()) {
- // If there is still no data from upstream, check a few more things
- // and then loop back for another pass at getting more data
- {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- if (_pause) {
- return;
- }
- }
+ // if target cut connections between connecting and querying (for
+ // example, because it stepped down) we might not have a cursor
+ if (!_syncSourceReader.haveCursor()) {
+ return;
+ }
- _syncSourceReader.tailCheck();
- if( !_syncSourceReader.haveCursor() ) {
- LOG(1) << "replSet end syncTail pass";
- return;
- }
+ if (_rollbackIfNeeded(txn, _syncSourceReader)) {
+ stop();
+ return;
+ }
- continue;
- }
+ while (!inShutdown()) {
+ if (!_syncSourceReader.moreInCurrentBatch()) {
+ // Check some things periodically
+ // (whenever we run out of items in the
+ // current cursor batch)
+
+ int bs = _syncSourceReader.currentBatchMessageSize();
+ if (bs > 0 && bs < BatchIsSmallish) {
+ // on a very low latency network, if we don't wait a little, we'll be
+ // getting ops to write almost one at a time. this will both be expensive
+ // for the upstream server as well as potentially defeating our parallel
+ // application of batches on the secondary.
+ //
+ // the inference here is basically if the batch is really small, we are
+ // "caught up".
+ //
+ sleepmillis(SleepToAllowBatchingMillis);
}
// If we are transitioning to primary state, we need to leave
// this loop in order to go into bgsync-pause mode.
if (_replCoord->isWaitingForApplierToDrain() ||
_replCoord->getMemberState().primary()) {
- LOG(1) << "waiting for draining or we are primary, not adding more ops to buffer";
return;
}
- // At this point, we are guaranteed to have at least one thing to read out
- // of the oplogreader cursor.
- BSONObj o = _syncSourceReader.nextSafe().getOwned();
- opsReadStats.increment();
+ // re-evaluate quality of sync target
+ if (shouldChangeSyncSource()) {
+ return;
+ }
{
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- _appliedBuffer = false;
- }
+ // record time for each getmore
+ TimerHolder batchTimer(&getmoreReplStats);
- OCCASIONALLY {
- LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes";
+ // This calls receiveMore() on the oplogreader cursor.
+ // It can wait up to five seconds for more data.
+ _syncSourceReader.more();
}
+ networkByteStats.increment(_syncSourceReader.currentBatchMessageSize());
- bufferCountGauge.increment();
- bufferSizeGauge.increment(getSize(o));
- _buffer.push(o);
+ if (!_syncSourceReader.moreInCurrentBatch()) {
+ // If there is still no data from upstream, check a few more things
+ // and then loop back for another pass at getting more data
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ if (_pause) {
+ return;
+ }
+ }
- {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- _lastFetchedHash = o["h"].numberLong();
- _lastOpTimeFetched = extractOpTime(o);
- LOG(3) << "lastOpTimeFetched: " << _lastOpTimeFetched;
+ _syncSourceReader.tailCheck();
+ if (!_syncSourceReader.haveCursor()) {
+ LOG(1) << "replSet end syncTail pass";
+ return;
+ }
+
+ continue;
}
}
- }
- bool BackgroundSync::shouldChangeSyncSource() {
- // is it even still around?
- if (getSyncTarget().empty() || _syncSourceReader.getHost().empty()) {
- return true;
+ // If we are transitioning to primary state, we need to leave
+ // this loop in order to go into bgsync-pause mode.
+ if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary()) {
+ LOG(1) << "waiting for draining or we are primary, not adding more ops to buffer";
+ return;
}
- // check other members: is any member's optime more than MaxSyncSourceLag seconds
- // ahead of the current sync source?
- return _replCoord->shouldChangeSyncSource(_syncSourceReader.getHost());
- }
-
-
- bool BackgroundSync::peek(BSONObj* op) {
- return _buffer.peek(*op);
- }
-
- void BackgroundSync::waitForMore() {
- BSONObj op;
- // Block for one second before timing out.
- // Ignore the value of the op we peeked at.
- _buffer.blockingPeek(op, 1);
- }
-
- void BackgroundSync::consume() {
- // this is just to get the op off the queue, it's been peeked at
- // and queued for application already
- BSONObj op = _buffer.blockingPop();
- bufferCountGauge.decrement(1);
- bufferSizeGauge.decrement(getSize(op));
- }
+ // At this point, we are guaranteed to have at least one thing to read out
+ // of the oplogreader cursor.
+ BSONObj o = _syncSourceReader.nextSafe().getOwned();
+ opsReadStats.increment();
- bool BackgroundSync::_rollbackIfNeeded(OperationContext* txn, OplogReader& r) {
- string hn = r.conn()->getServerAddress();
-
- // Abort only when syncRollback detects we are in a unrecoverable state.
- // In other cases, we log the message contained in the error status and retry later.
- auto fassertRollbackStatusNoTrace = [](int msgid, const Status& status) {
- if (status.isOK()) {
- return;
- }
- if (ErrorCodes::UnrecoverableRollbackError == status.code()) {
- fassertNoTrace(msgid, status);
- }
- warning() << "rollback cannot proceed at this time (retrying later): "
- << status;
- };
-
- if (!r.more()) {
- try {
- BSONObj theirLastOp = r.getLastOp(rsOplogName.c_str());
- if (theirLastOp.isEmpty()) {
- error() << "empty query result from " << hn << " oplog";
- sleepsecs(2);
- return true;
- }
- OpTime theirOpTime = extractOpTime(theirLastOp);
- if (theirOpTime < _lastOpTimeFetched) {
- log() << "we are ahead of the sync source, will try to roll back";
- fassertRollbackStatusNoTrace(
- 28656,
- syncRollback(txn,
- _replCoord->getMyLastOptime(),
- OplogInterfaceLocal(txn, rsOplogName),
- RollbackSourceImpl(r.conn(), rsOplogName),
- _replCoord));
-
- return true;
- }
- /* we're not ahead? maybe our new query got fresher data. best to come back and try again */
- log() << "syncTail condition 1";
- sleepsecs(1);
- }
- catch(DBException& e) {
- error() << "querying " << hn << ' ' << e.toString();
- sleepsecs(2);
- }
- return true;
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _appliedBuffer = false;
}
- BSONObj o = r.nextSafe();
- OpTime opTime = extractOpTime(o);
- long long hash = o["h"].numberLong();
- if ( opTime != _lastOpTimeFetched || hash != _lastFetchedHash ) {
- log() << "our last op time fetched: " << _lastOpTimeFetched;
- log() << "source's GTE: " << opTime;
- fassertRollbackStatusNoTrace(
- 28657,
- syncRollback(txn,
- _replCoord->getMyLastOptime(),
- OplogInterfaceLocal(txn, rsOplogName),
- RollbackSourceImpl(r.conn(), rsOplogName),
- _replCoord));
- return true;
+ OCCASIONALLY {
+ LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes";
}
- return false;
- }
-
- HostAndPort BackgroundSync::getSyncTarget() {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- return _syncSourceHost;
- }
+ bufferCountGauge.increment();
+ bufferSizeGauge.increment(getSize(o));
+ _buffer.push(o);
- void BackgroundSync::clearSyncTarget() {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- _syncSourceHost = HostAndPort();
+ {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _lastFetchedHash = o["h"].numberLong();
+ _lastOpTimeFetched = extractOpTime(o);
+ LOG(3) << "lastOpTimeFetched: " << _lastOpTimeFetched;
+ }
}
+}
- void BackgroundSync::stop() {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
-
- _pause = true;
- _syncSourceHost = HostAndPort();
- _lastOpTimeFetched = OpTime();
- _lastFetchedHash = 0;
- _appliedBufferCondition.notify_all();
- _pausedCondition.notify_all();
+bool BackgroundSync::shouldChangeSyncSource() {
+ // is it even still around?
+ if (getSyncTarget().empty() || _syncSourceReader.getHost().empty()) {
+ return true;
}
- void BackgroundSync::start(OperationContext* txn) {
- massert(16235, "going to start syncing, but buffer is not empty", _buffer.empty());
-
- long long updatedLastAppliedHash = _readLastAppliedHash(txn);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _pause = false;
-
- // reset _last fields with current oplog data
- _lastAppliedHash = updatedLastAppliedHash;
- _lastOpTimeFetched = _replCoord->getMyLastOptime();
- _lastFetchedHash = _lastAppliedHash;
-
- LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched <<
- " " << _lastFetchedHash;
- }
+ // check other members: is any member's optime more than MaxSyncSourceLag seconds
+ // ahead of the current sync source?
+ return _replCoord->shouldChangeSyncSource(_syncSourceReader.getHost());
+}
+
+
+bool BackgroundSync::peek(BSONObj* op) {
+ return _buffer.peek(*op);
+}
+
+void BackgroundSync::waitForMore() {
+ BSONObj op;
+ // Block for one second before timing out.
+ // Ignore the value of the op we peeked at.
+ _buffer.blockingPeek(op, 1);
+}
+
+void BackgroundSync::consume() {
+ // this is just to get the op off the queue, it's been peeked at
+ // and queued for application already
+ BSONObj op = _buffer.blockingPop();
+ bufferCountGauge.decrement(1);
+ bufferSizeGauge.decrement(getSize(op));
+}
+
+bool BackgroundSync::_rollbackIfNeeded(OperationContext* txn, OplogReader& r) {
+ string hn = r.conn()->getServerAddress();
+
+ // Abort only when syncRollback detects we are in a unrecoverable state.
+ // In other cases, we log the message contained in the error status and retry later.
+ auto fassertRollbackStatusNoTrace = [](int msgid, const Status& status) {
+ if (status.isOK()) {
+ return;
+ }
+ if (ErrorCodes::UnrecoverableRollbackError == status.code()) {
+ fassertNoTrace(msgid, status);
+ }
+ warning() << "rollback cannot proceed at this time (retrying later): " << status;
+ };
- void BackgroundSync::waitUntilPaused() {
- stdx::unique_lock<stdx::mutex> lock(_mutex);
- while (!_pause) {
- _pausedCondition.wait(lock);
+ if (!r.more()) {
+ try {
+ BSONObj theirLastOp = r.getLastOp(rsOplogName.c_str());
+ if (theirLastOp.isEmpty()) {
+ error() << "empty query result from " << hn << " oplog";
+ sleepsecs(2);
+ return true;
+ }
+ OpTime theirOpTime = extractOpTime(theirLastOp);
+ if (theirOpTime < _lastOpTimeFetched) {
+ log() << "we are ahead of the sync source, will try to roll back";
+ fassertRollbackStatusNoTrace(28656,
+ syncRollback(txn,
+ _replCoord->getMyLastOptime(),
+ OplogInterfaceLocal(txn, rsOplogName),
+ RollbackSourceImpl(r.conn(), rsOplogName),
+ _replCoord));
+
+ return true;
+ }
+ /* we're not ahead? maybe our new query got fresher data. best to come back and try again */
+ log() << "syncTail condition 1";
+ sleepsecs(1);
+ } catch (DBException& e) {
+ error() << "querying " << hn << ' ' << e.toString();
+ sleepsecs(2);
}
+ return true;
}
- long long BackgroundSync::getLastAppliedHash() const {
- stdx::lock_guard<stdx::mutex> lck(_mutex);
- return _lastAppliedHash;
+ BSONObj o = r.nextSafe();
+ OpTime opTime = extractOpTime(o);
+ long long hash = o["h"].numberLong();
+ if (opTime != _lastOpTimeFetched || hash != _lastFetchedHash) {
+ log() << "our last op time fetched: " << _lastOpTimeFetched;
+ log() << "source's GTE: " << opTime;
+ fassertRollbackStatusNoTrace(28657,
+ syncRollback(txn,
+ _replCoord->getMyLastOptime(),
+ OplogInterfaceLocal(txn, rsOplogName),
+ RollbackSourceImpl(r.conn(), rsOplogName),
+ _replCoord));
+ return true;
}
- void BackgroundSync::clearBuffer() {
- _buffer.clear();
+ return false;
+}
+
+HostAndPort BackgroundSync::getSyncTarget() {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ return _syncSourceHost;
+}
+
+void BackgroundSync::clearSyncTarget() {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ _syncSourceHost = HostAndPort();
+}
+
+void BackgroundSync::stop() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+
+ _pause = true;
+ _syncSourceHost = HostAndPort();
+ _lastOpTimeFetched = OpTime();
+ _lastFetchedHash = 0;
+ _appliedBufferCondition.notify_all();
+ _pausedCondition.notify_all();
+}
+
+void BackgroundSync::start(OperationContext* txn) {
+ massert(16235, "going to start syncing, but buffer is not empty", _buffer.empty());
+
+ long long updatedLastAppliedHash = _readLastAppliedHash(txn);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _pause = false;
+
+ // reset _last fields with current oplog data
+ _lastAppliedHash = updatedLastAppliedHash;
+ _lastOpTimeFetched = _replCoord->getMyLastOptime();
+ _lastFetchedHash = _lastAppliedHash;
+
+ LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash;
+}
+
+void BackgroundSync::waitUntilPaused() {
+ stdx::unique_lock<stdx::mutex> lock(_mutex);
+ while (!_pause) {
+ _pausedCondition.wait(lock);
}
-
- void BackgroundSync::setLastAppliedHash(long long newHash) {
- stdx::lock_guard<stdx::mutex> lck(_mutex);
- _lastAppliedHash = newHash;
+}
+
+long long BackgroundSync::getLastAppliedHash() const {
+ stdx::lock_guard<stdx::mutex> lck(_mutex);
+ return _lastAppliedHash;
+}
+
+void BackgroundSync::clearBuffer() {
+ _buffer.clear();
+}
+
+void BackgroundSync::setLastAppliedHash(long long newHash) {
+ stdx::lock_guard<stdx::mutex> lck(_mutex);
+ _lastAppliedHash = newHash;
+}
+
+void BackgroundSync::loadLastAppliedHash(OperationContext* txn) {
+ long long result = _readLastAppliedHash(txn);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _lastAppliedHash = result;
+}
+
+long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) {
+ BSONObj oplogEntry;
+ try {
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock lk(txn->lockState(), "local", MODE_X);
+ bool success = Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry);
+ if (!success) {
+ // This can happen when we are to do an initial sync. lastHash will be set
+ // after the initial sync is complete.
+ return 0;
+ }
+ }
+ MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "readLastAppliedHash", rsOplogName);
+ } catch (const DBException& ex) {
+ severe() << "Problem reading " << rsOplogName << ": " << ex.toStatus();
+ fassertFailed(18904);
}
-
- void BackgroundSync::loadLastAppliedHash(OperationContext* txn) {
- long long result = _readLastAppliedHash(txn);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _lastAppliedHash = result;
+ BSONElement hashElement = oplogEntry[hashFieldName];
+ if (hashElement.eoo()) {
+ severe() << "Most recent entry in " << rsOplogName << " missing \"" << hashFieldName
+ << "\" field";
+ fassertFailed(18902);
}
-
- long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) {
- BSONObj oplogEntry;
- try {
- MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), "local", MODE_X);
- bool success = Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry);
- if (!success) {
- // This can happen when we are to do an initial sync. lastHash will be set
- // after the initial sync is complete.
- return 0;
- }
- } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "readLastAppliedHash", rsOplogName);
- }
- catch (const DBException& ex) {
- severe() << "Problem reading " << rsOplogName << ": " << ex.toStatus();
- fassertFailed(18904);
- }
- BSONElement hashElement = oplogEntry[hashFieldName];
- if (hashElement.eoo()) {
- severe() << "Most recent entry in " << rsOplogName << " missing \"" << hashFieldName <<
- "\" field";
- fassertFailed(18902);
- }
- if (hashElement.type() != NumberLong) {
- severe() << "Expected type of \"" << hashFieldName << "\" in most recent " <<
- rsOplogName << " entry to have type NumberLong, but found " <<
- typeName(hashElement.type());
- fassertFailed(18903);
- }
- return hashElement.safeNumberLong();
+ if (hashElement.type() != NumberLong) {
+ severe() << "Expected type of \"" << hashFieldName << "\" in most recent " << rsOplogName
+ << " entry to have type NumberLong, but found " << typeName(hashElement.type());
+ fassertFailed(18903);
}
+ return hashElement.safeNumberLong();
+}
- bool BackgroundSync::getInitialSyncRequestedFlag() {
- stdx::lock_guard<stdx::mutex> lock(_initialSyncMutex);
- return _initialSyncRequestedFlag;
- }
+bool BackgroundSync::getInitialSyncRequestedFlag() {
+ stdx::lock_guard<stdx::mutex> lock(_initialSyncMutex);
+ return _initialSyncRequestedFlag;
+}
- void BackgroundSync::setInitialSyncRequestedFlag(bool value) {
- stdx::lock_guard<stdx::mutex> lock(_initialSyncMutex);
- _initialSyncRequestedFlag = value;
- }
+void BackgroundSync::setInitialSyncRequestedFlag(bool value) {
+ stdx::lock_guard<stdx::mutex> lock(_initialSyncMutex);
+ _initialSyncRequestedFlag = value;
+}
- void BackgroundSync::pushTestOpToBuffer(const BSONObj& op) {
- stdx::lock_guard<stdx::mutex> lock(_mutex);
- _buffer.push(op);
- }
+void BackgroundSync::pushTestOpToBuffer(const BSONObj& op) {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ _buffer.push(op);
+}
-} // namespace repl
-} // namespace mongo
+} // namespace repl
+} // namespace mongo