diff options
Diffstat (limited to 'src/mongo/db/repl/bgsync.cpp')
-rw-r--r-- | src/mongo/db/repl/bgsync.cpp | 861 |
1 files changed, 424 insertions, 437 deletions
diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index 5317ab36305..5c84a724b94 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -55,523 +55,510 @@ namespace mongo { - using std::string; +using std::string; namespace repl { namespace { - const char hashFieldName[] = "h"; - int SleepToAllowBatchingMillis = 2; - const int BatchIsSmallish = 40000; // bytes -} // namespace - - MONGO_FP_DECLARE(rsBgSyncProduce); - - BackgroundSync* BackgroundSync::s_instance = 0; - stdx::mutex BackgroundSync::s_mutex; - - //The number and time spent reading batches off the network - static TimerStats getmoreReplStats; - static ServerStatusMetricField<TimerStats> displayBatchesRecieved( - "repl.network.getmores", - &getmoreReplStats ); - //The oplog entries read via the oplog reader - static Counter64 opsReadStats; - static ServerStatusMetricField<Counter64> displayOpsRead( "repl.network.ops", - &opsReadStats ); - //The bytes read via the oplog reader - static Counter64 networkByteStats; - static ServerStatusMetricField<Counter64> displayBytesRead( "repl.network.bytes", - &networkByteStats ); - - //The count of items in the buffer - static Counter64 bufferCountGauge; - static ServerStatusMetricField<Counter64> displayBufferCount( "repl.buffer.count", - &bufferCountGauge ); - //The size (bytes) of items in the buffer - static Counter64 bufferSizeGauge; - static ServerStatusMetricField<Counter64> displayBufferSize( "repl.buffer.sizeBytes", - &bufferSizeGauge ); - //The max size (bytes) of the buffer - static int bufferMaxSizeGauge = 256*1024*1024; - static ServerStatusMetricField<int> displayBufferMaxSize( "repl.buffer.maxSizeBytes", - &bufferMaxSizeGauge ); - - - BackgroundSyncInterface::~BackgroundSyncInterface() {} - - size_t getSize(const BSONObj& o) { - // SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion - return static_cast<size_t>(o.objsize()); +const char hashFieldName[] = "h"; +int SleepToAllowBatchingMillis = 2; +const int BatchIsSmallish = 40000; // bytes +} // namespace + +MONGO_FP_DECLARE(rsBgSyncProduce); + +BackgroundSync* BackgroundSync::s_instance = 0; +stdx::mutex BackgroundSync::s_mutex; + +// The number and time spent reading batches off the network +static TimerStats getmoreReplStats; +static ServerStatusMetricField<TimerStats> displayBatchesRecieved("repl.network.getmores", + &getmoreReplStats); +// The oplog entries read via the oplog reader +static Counter64 opsReadStats; +static ServerStatusMetricField<Counter64> displayOpsRead("repl.network.ops", &opsReadStats); +// The bytes read via the oplog reader +static Counter64 networkByteStats; +static ServerStatusMetricField<Counter64> displayBytesRead("repl.network.bytes", &networkByteStats); + +// The count of items in the buffer +static Counter64 bufferCountGauge; +static ServerStatusMetricField<Counter64> displayBufferCount("repl.buffer.count", + &bufferCountGauge); +// The size (bytes) of items in the buffer +static Counter64 bufferSizeGauge; +static ServerStatusMetricField<Counter64> displayBufferSize("repl.buffer.sizeBytes", + &bufferSizeGauge); +// The max size (bytes) of the buffer +static int bufferMaxSizeGauge = 256 * 1024 * 1024; +static ServerStatusMetricField<int> displayBufferMaxSize("repl.buffer.maxSizeBytes", + &bufferMaxSizeGauge); + + +BackgroundSyncInterface::~BackgroundSyncInterface() {} + +size_t getSize(const BSONObj& o) { + // SERVER-9808 Avoid Fortify complaint about implicit signed->unsigned conversion + return static_cast<size_t>(o.objsize()); +} + +BackgroundSync::BackgroundSync() + : _buffer(bufferMaxSizeGauge, &getSize), + _lastOpTimeFetched(Timestamp(std::numeric_limits<int>::max(), 0), + std::numeric_limits<long long>::max()), + _lastAppliedHash(0), + _lastFetchedHash(0), + _pause(true), + _appliedBuffer(true), + _replCoord(getGlobalReplicationCoordinator()), + _initialSyncRequestedFlag(false), + _indexPrefetchConfig(PREFETCH_ALL) {} + +BackgroundSync* BackgroundSync::get() { + stdx::unique_lock<stdx::mutex> lock(s_mutex); + if (s_instance == NULL && !inShutdown()) { + s_instance = new BackgroundSync(); } + return s_instance; +} - BackgroundSync::BackgroundSync() : _buffer(bufferMaxSizeGauge, &getSize), - _lastOpTimeFetched( - Timestamp(std::numeric_limits<int>::max(), 0), - std::numeric_limits<long long>::max()), - _lastAppliedHash(0), - _lastFetchedHash(0), - _pause(true), - _appliedBuffer(true), - _replCoord(getGlobalReplicationCoordinator()), - _initialSyncRequestedFlag(false), - _indexPrefetchConfig(PREFETCH_ALL) { - } +void BackgroundSync::shutdown() { + stdx::lock_guard<stdx::mutex> lock(_mutex); - BackgroundSync* BackgroundSync::get() { - stdx::unique_lock<stdx::mutex> lock(s_mutex); - if (s_instance == NULL && !inShutdown()) { - s_instance = new BackgroundSync(); - } - return s_instance; - } + // Clear the buffer in case the producerThread is waiting in push() due to a full queue. + invariant(inShutdown()); + _buffer.clear(); + _pause = true; - void BackgroundSync::shutdown() { - stdx::lock_guard<stdx::mutex> lock(_mutex); + // Wake up producerThread so it notices that we're in shutdown + _appliedBufferCondition.notify_all(); + _pausedCondition.notify_all(); +} - // Clear the buffer in case the producerThread is waiting in push() due to a full queue. - invariant(inShutdown()); - _buffer.clear(); - _pause = true; +void BackgroundSync::notify(OperationContext* txn) { + stdx::lock_guard<stdx::mutex> lock(_mutex); - // Wake up producerThread so it notices that we're in shutdown + // If all ops in the buffer have been applied, unblock waitForRepl (if it's waiting) + if (_buffer.empty()) { + _appliedBuffer = true; _appliedBufferCondition.notify_all(); - _pausedCondition.notify_all(); } +} - void BackgroundSync::notify(OperationContext* txn) { - stdx::lock_guard<stdx::mutex> lock(_mutex); +void BackgroundSync::producerThread() { + Client::initThread("rsBackgroundSync"); + AuthorizationSession::get(cc())->grantInternalAuthorization(); - // If all ops in the buffer have been applied, unblock waitForRepl (if it's waiting) - if (_buffer.empty()) { - _appliedBuffer = true; - _appliedBufferCondition.notify_all(); + while (!inShutdown()) { + try { + _producerThread(); + } catch (const DBException& e) { + std::string msg(str::stream() << "sync producer problem: " << e.toString()); + error() << msg; + _replCoord->setMyHeartbeatMessage(msg); + } catch (const std::exception& e2) { + severe() << "sync producer exception: " << e2.what(); + fassertFailed(28546); } } +} - void BackgroundSync::producerThread() { - Client::initThread("rsBackgroundSync"); - AuthorizationSession::get(cc())->grantInternalAuthorization(); - - while (!inShutdown()) { - try { - _producerThread(); - } - catch (const DBException& e) { - std::string msg(str::stream() << "sync producer problem: " << e.toString()); - error() << msg; - _replCoord->setMyHeartbeatMessage(msg); - } - catch (const std::exception& e2) { - severe() << "sync producer exception: " << e2.what(); - fassertFailed(28546); - } +void BackgroundSync::_producerThread() { + const MemberState state = _replCoord->getMemberState(); + // we want to pause when the state changes to primary + if (_replCoord->isWaitingForApplierToDrain() || state.primary()) { + if (!_pause) { + stop(); } + sleepsecs(1); + return; } - void BackgroundSync::_producerThread() { - const MemberState state = _replCoord->getMemberState(); - // we want to pause when the state changes to primary - if (_replCoord->isWaitingForApplierToDrain() || state.primary()) { - if (!_pause) { - stop(); - } - sleepsecs(1); - return; - } - - // TODO(spencer): Use a condition variable to await loading a config. - if (state.startup()) { - // Wait for a config to be loaded - sleepsecs(1); - return; - } - - // We need to wait until initial sync has started. - if (_replCoord->getMyLastOptime().isNull()) { - sleepsecs(1); - return; - } - // we want to unpause when we're no longer primary - // start() also loads _lastOpTimeFetched, which we know is set from the "if" - OperationContextImpl txn; - if (_pause) { - start(&txn); - } + // TODO(spencer): Use a condition variable to await loading a config. + if (state.startup()) { + // Wait for a config to be loaded + sleepsecs(1); + return; + } - produce(&txn); + // We need to wait until initial sync has started. + if (_replCoord->getMyLastOptime().isNull()) { + sleepsecs(1); + return; + } + // we want to unpause when we're no longer primary + // start() also loads _lastOpTimeFetched, which we know is set from the "if" + OperationContextImpl txn; + if (_pause) { + start(&txn); } - void BackgroundSync::produce(OperationContext* txn) { - // this oplog reader does not do a handshake because we don't want the server it's syncing - // from to track how far it has synced - { - stdx::unique_lock<stdx::mutex> lock(_mutex); - if (_lastOpTimeFetched.isNull()) { - // then we're initial syncing and we're still waiting for this to be set - lock.unlock(); - sleepsecs(1); - // if there is no one to sync from - return; - } + produce(&txn); +} - // Wait until we've applied the ops we have before we choose a sync target - while (!_appliedBuffer && !inShutdownStrict()) { - _appliedBufferCondition.wait(lock); - } - if (inShutdownStrict()) { - return; - } +void BackgroundSync::produce(OperationContext* txn) { + // this oplog reader does not do a handshake because we don't want the server it's syncing + // from to track how far it has synced + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + if (_lastOpTimeFetched.isNull()) { + // then we're initial syncing and we're still waiting for this to be set + lock.unlock(); + sleepsecs(1); + // if there is no one to sync from + return; } - while (MONGO_FAIL_POINT(rsBgSyncProduce)) { - sleepmillis(0); + // Wait until we've applied the ops we have before we choose a sync target + while (!_appliedBuffer && !inShutdownStrict()) { + _appliedBufferCondition.wait(lock); } - - - // find a target to sync from the last optime fetched - OpTime lastOpTimeFetched; - { - stdx::unique_lock<stdx::mutex> lock(_mutex); - lastOpTimeFetched = _lastOpTimeFetched; - _syncSourceHost = HostAndPort(); + if (inShutdownStrict()) { + return; } - _syncSourceReader.resetConnection(); - _syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, _replCoord); + } - { - stdx::unique_lock<stdx::mutex> lock(_mutex); - // no server found - if (_syncSourceReader.getHost().empty()) { - lock.unlock(); - sleepsecs(1); - // if there is no one to sync from - return; - } - lastOpTimeFetched = _lastOpTimeFetched; - _syncSourceHost = _syncSourceReader.getHost(); - _replCoord->signalUpstreamUpdater(); - } + while (MONGO_FAIL_POINT(rsBgSyncProduce)) { + sleepmillis(0); + } - _syncSourceReader.tailingQueryGTE(rsOplogName.c_str(), lastOpTimeFetched.getTimestamp()); - // if target cut connections between connecting and querying (for - // example, because it stepped down) we might not have a cursor - if (!_syncSourceReader.haveCursor()) { - return; - } + // find a target to sync from the last optime fetched + OpTime lastOpTimeFetched; + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + lastOpTimeFetched = _lastOpTimeFetched; + _syncSourceHost = HostAndPort(); + } + _syncSourceReader.resetConnection(); + _syncSourceReader.connectToSyncSource(txn, lastOpTimeFetched, _replCoord); - if (_rollbackIfNeeded(txn, _syncSourceReader)) { - stop(); + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + // no server found + if (_syncSourceReader.getHost().empty()) { + lock.unlock(); + sleepsecs(1); + // if there is no one to sync from return; } + lastOpTimeFetched = _lastOpTimeFetched; + _syncSourceHost = _syncSourceReader.getHost(); + _replCoord->signalUpstreamUpdater(); + } - while (!inShutdown()) { - if (!_syncSourceReader.moreInCurrentBatch()) { - // Check some things periodically - // (whenever we run out of items in the - // current cursor batch) - - int bs = _syncSourceReader.currentBatchMessageSize(); - if( bs > 0 && bs < BatchIsSmallish ) { - // on a very low latency network, if we don't wait a little, we'll be - // getting ops to write almost one at a time. this will both be expensive - // for the upstream server as well as potentially defeating our parallel - // application of batches on the secondary. - // - // the inference here is basically if the batch is really small, we are - // "caught up". - // - sleepmillis(SleepToAllowBatchingMillis); - } - - // If we are transitioning to primary state, we need to leave - // this loop in order to go into bgsync-pause mode. - if (_replCoord->isWaitingForApplierToDrain() || - _replCoord->getMemberState().primary()) { - return; - } - - // re-evaluate quality of sync target - if (shouldChangeSyncSource()) { - return; - } + _syncSourceReader.tailingQueryGTE(rsOplogName.c_str(), lastOpTimeFetched.getTimestamp()); - { - //record time for each getmore - TimerHolder batchTimer(&getmoreReplStats); - - // This calls receiveMore() on the oplogreader cursor. - // It can wait up to five seconds for more data. - _syncSourceReader.more(); - } - networkByteStats.increment(_syncSourceReader.currentBatchMessageSize()); - - if (!_syncSourceReader.moreInCurrentBatch()) { - // If there is still no data from upstream, check a few more things - // and then loop back for another pass at getting more data - { - stdx::unique_lock<stdx::mutex> lock(_mutex); - if (_pause) { - return; - } - } + // if target cut connections between connecting and querying (for + // example, because it stepped down) we might not have a cursor + if (!_syncSourceReader.haveCursor()) { + return; + } - _syncSourceReader.tailCheck(); - if( !_syncSourceReader.haveCursor() ) { - LOG(1) << "replSet end syncTail pass"; - return; - } + if (_rollbackIfNeeded(txn, _syncSourceReader)) { + stop(); + return; + } - continue; - } + while (!inShutdown()) { + if (!_syncSourceReader.moreInCurrentBatch()) { + // Check some things periodically + // (whenever we run out of items in the + // current cursor batch) + + int bs = _syncSourceReader.currentBatchMessageSize(); + if (bs > 0 && bs < BatchIsSmallish) { + // on a very low latency network, if we don't wait a little, we'll be + // getting ops to write almost one at a time. this will both be expensive + // for the upstream server as well as potentially defeating our parallel + // application of batches on the secondary. + // + // the inference here is basically if the batch is really small, we are + // "caught up". + // + sleepmillis(SleepToAllowBatchingMillis); } // If we are transitioning to primary state, we need to leave // this loop in order to go into bgsync-pause mode. if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary()) { - LOG(1) << "waiting for draining or we are primary, not adding more ops to buffer"; return; } - // At this point, we are guaranteed to have at least one thing to read out - // of the oplogreader cursor. - BSONObj o = _syncSourceReader.nextSafe().getOwned(); - opsReadStats.increment(); + // re-evaluate quality of sync target + if (shouldChangeSyncSource()) { + return; + } { - stdx::unique_lock<stdx::mutex> lock(_mutex); - _appliedBuffer = false; - } + // record time for each getmore + TimerHolder batchTimer(&getmoreReplStats); - OCCASIONALLY { - LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes"; + // This calls receiveMore() on the oplogreader cursor. + // It can wait up to five seconds for more data. + _syncSourceReader.more(); } + networkByteStats.increment(_syncSourceReader.currentBatchMessageSize()); - bufferCountGauge.increment(); - bufferSizeGauge.increment(getSize(o)); - _buffer.push(o); + if (!_syncSourceReader.moreInCurrentBatch()) { + // If there is still no data from upstream, check a few more things + // and then loop back for another pass at getting more data + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + if (_pause) { + return; + } + } - { - stdx::unique_lock<stdx::mutex> lock(_mutex); - _lastFetchedHash = o["h"].numberLong(); - _lastOpTimeFetched = extractOpTime(o); - LOG(3) << "lastOpTimeFetched: " << _lastOpTimeFetched; + _syncSourceReader.tailCheck(); + if (!_syncSourceReader.haveCursor()) { + LOG(1) << "replSet end syncTail pass"; + return; + } + + continue; } } - } - bool BackgroundSync::shouldChangeSyncSource() { - // is it even still around? - if (getSyncTarget().empty() || _syncSourceReader.getHost().empty()) { - return true; + // If we are transitioning to primary state, we need to leave + // this loop in order to go into bgsync-pause mode. + if (_replCoord->isWaitingForApplierToDrain() || _replCoord->getMemberState().primary()) { + LOG(1) << "waiting for draining or we are primary, not adding more ops to buffer"; + return; } - // check other members: is any member's optime more than MaxSyncSourceLag seconds - // ahead of the current sync source? - return _replCoord->shouldChangeSyncSource(_syncSourceReader.getHost()); - } - - - bool BackgroundSync::peek(BSONObj* op) { - return _buffer.peek(*op); - } - - void BackgroundSync::waitForMore() { - BSONObj op; - // Block for one second before timing out. - // Ignore the value of the op we peeked at. - _buffer.blockingPeek(op, 1); - } - - void BackgroundSync::consume() { - // this is just to get the op off the queue, it's been peeked at - // and queued for application already - BSONObj op = _buffer.blockingPop(); - bufferCountGauge.decrement(1); - bufferSizeGauge.decrement(getSize(op)); - } + // At this point, we are guaranteed to have at least one thing to read out + // of the oplogreader cursor. + BSONObj o = _syncSourceReader.nextSafe().getOwned(); + opsReadStats.increment(); - bool BackgroundSync::_rollbackIfNeeded(OperationContext* txn, OplogReader& r) { - string hn = r.conn()->getServerAddress(); - - // Abort only when syncRollback detects we are in a unrecoverable state. - // In other cases, we log the message contained in the error status and retry later. - auto fassertRollbackStatusNoTrace = [](int msgid, const Status& status) { - if (status.isOK()) { - return; - } - if (ErrorCodes::UnrecoverableRollbackError == status.code()) { - fassertNoTrace(msgid, status); - } - warning() << "rollback cannot proceed at this time (retrying later): " - << status; - }; - - if (!r.more()) { - try { - BSONObj theirLastOp = r.getLastOp(rsOplogName.c_str()); - if (theirLastOp.isEmpty()) { - error() << "empty query result from " << hn << " oplog"; - sleepsecs(2); - return true; - } - OpTime theirOpTime = extractOpTime(theirLastOp); - if (theirOpTime < _lastOpTimeFetched) { - log() << "we are ahead of the sync source, will try to roll back"; - fassertRollbackStatusNoTrace( - 28656, - syncRollback(txn, - _replCoord->getMyLastOptime(), - OplogInterfaceLocal(txn, rsOplogName), - RollbackSourceImpl(r.conn(), rsOplogName), - _replCoord)); - - return true; - } - /* we're not ahead? maybe our new query got fresher data. best to come back and try again */ - log() << "syncTail condition 1"; - sleepsecs(1); - } - catch(DBException& e) { - error() << "querying " << hn << ' ' << e.toString(); - sleepsecs(2); - } - return true; + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _appliedBuffer = false; } - BSONObj o = r.nextSafe(); - OpTime opTime = extractOpTime(o); - long long hash = o["h"].numberLong(); - if ( opTime != _lastOpTimeFetched || hash != _lastFetchedHash ) { - log() << "our last op time fetched: " << _lastOpTimeFetched; - log() << "source's GTE: " << opTime; - fassertRollbackStatusNoTrace( - 28657, - syncRollback(txn, - _replCoord->getMyLastOptime(), - OplogInterfaceLocal(txn, rsOplogName), - RollbackSourceImpl(r.conn(), rsOplogName), - _replCoord)); - return true; + OCCASIONALLY { + LOG(2) << "bgsync buffer has " << _buffer.size() << " bytes"; } - return false; - } - - HostAndPort BackgroundSync::getSyncTarget() { - stdx::unique_lock<stdx::mutex> lock(_mutex); - return _syncSourceHost; - } + bufferCountGauge.increment(); + bufferSizeGauge.increment(getSize(o)); + _buffer.push(o); - void BackgroundSync::clearSyncTarget() { - stdx::unique_lock<stdx::mutex> lock(_mutex); - _syncSourceHost = HostAndPort(); + { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _lastFetchedHash = o["h"].numberLong(); + _lastOpTimeFetched = extractOpTime(o); + LOG(3) << "lastOpTimeFetched: " << _lastOpTimeFetched; + } } +} - void BackgroundSync::stop() { - stdx::lock_guard<stdx::mutex> lock(_mutex); - - _pause = true; - _syncSourceHost = HostAndPort(); - _lastOpTimeFetched = OpTime(); - _lastFetchedHash = 0; - _appliedBufferCondition.notify_all(); - _pausedCondition.notify_all(); +bool BackgroundSync::shouldChangeSyncSource() { + // is it even still around? + if (getSyncTarget().empty() || _syncSourceReader.getHost().empty()) { + return true; } - void BackgroundSync::start(OperationContext* txn) { - massert(16235, "going to start syncing, but buffer is not empty", _buffer.empty()); - - long long updatedLastAppliedHash = _readLastAppliedHash(txn); - stdx::lock_guard<stdx::mutex> lk(_mutex); - _pause = false; - - // reset _last fields with current oplog data - _lastAppliedHash = updatedLastAppliedHash; - _lastOpTimeFetched = _replCoord->getMyLastOptime(); - _lastFetchedHash = _lastAppliedHash; - - LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << - " " << _lastFetchedHash; - } + // check other members: is any member's optime more than MaxSyncSourceLag seconds + // ahead of the current sync source? + return _replCoord->shouldChangeSyncSource(_syncSourceReader.getHost()); +} + + +bool BackgroundSync::peek(BSONObj* op) { + return _buffer.peek(*op); +} + +void BackgroundSync::waitForMore() { + BSONObj op; + // Block for one second before timing out. + // Ignore the value of the op we peeked at. + _buffer.blockingPeek(op, 1); +} + +void BackgroundSync::consume() { + // this is just to get the op off the queue, it's been peeked at + // and queued for application already + BSONObj op = _buffer.blockingPop(); + bufferCountGauge.decrement(1); + bufferSizeGauge.decrement(getSize(op)); +} + +bool BackgroundSync::_rollbackIfNeeded(OperationContext* txn, OplogReader& r) { + string hn = r.conn()->getServerAddress(); + + // Abort only when syncRollback detects we are in a unrecoverable state. + // In other cases, we log the message contained in the error status and retry later. + auto fassertRollbackStatusNoTrace = [](int msgid, const Status& status) { + if (status.isOK()) { + return; + } + if (ErrorCodes::UnrecoverableRollbackError == status.code()) { + fassertNoTrace(msgid, status); + } + warning() << "rollback cannot proceed at this time (retrying later): " << status; + }; - void BackgroundSync::waitUntilPaused() { - stdx::unique_lock<stdx::mutex> lock(_mutex); - while (!_pause) { - _pausedCondition.wait(lock); + if (!r.more()) { + try { + BSONObj theirLastOp = r.getLastOp(rsOplogName.c_str()); + if (theirLastOp.isEmpty()) { + error() << "empty query result from " << hn << " oplog"; + sleepsecs(2); + return true; + } + OpTime theirOpTime = extractOpTime(theirLastOp); + if (theirOpTime < _lastOpTimeFetched) { + log() << "we are ahead of the sync source, will try to roll back"; + fassertRollbackStatusNoTrace(28656, + syncRollback(txn, + _replCoord->getMyLastOptime(), + OplogInterfaceLocal(txn, rsOplogName), + RollbackSourceImpl(r.conn(), rsOplogName), + _replCoord)); + + return true; + } + /* we're not ahead? maybe our new query got fresher data. best to come back and try again */ + log() << "syncTail condition 1"; + sleepsecs(1); + } catch (DBException& e) { + error() << "querying " << hn << ' ' << e.toString(); + sleepsecs(2); } + return true; } - long long BackgroundSync::getLastAppliedHash() const { - stdx::lock_guard<stdx::mutex> lck(_mutex); - return _lastAppliedHash; + BSONObj o = r.nextSafe(); + OpTime opTime = extractOpTime(o); + long long hash = o["h"].numberLong(); + if (opTime != _lastOpTimeFetched || hash != _lastFetchedHash) { + log() << "our last op time fetched: " << _lastOpTimeFetched; + log() << "source's GTE: " << opTime; + fassertRollbackStatusNoTrace(28657, + syncRollback(txn, + _replCoord->getMyLastOptime(), + OplogInterfaceLocal(txn, rsOplogName), + RollbackSourceImpl(r.conn(), rsOplogName), + _replCoord)); + return true; } - void BackgroundSync::clearBuffer() { - _buffer.clear(); + return false; +} + +HostAndPort BackgroundSync::getSyncTarget() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + return _syncSourceHost; +} + +void BackgroundSync::clearSyncTarget() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _syncSourceHost = HostAndPort(); +} + +void BackgroundSync::stop() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + + _pause = true; + _syncSourceHost = HostAndPort(); + _lastOpTimeFetched = OpTime(); + _lastFetchedHash = 0; + _appliedBufferCondition.notify_all(); + _pausedCondition.notify_all(); +} + +void BackgroundSync::start(OperationContext* txn) { + massert(16235, "going to start syncing, but buffer is not empty", _buffer.empty()); + + long long updatedLastAppliedHash = _readLastAppliedHash(txn); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _pause = false; + + // reset _last fields with current oplog data + _lastAppliedHash = updatedLastAppliedHash; + _lastOpTimeFetched = _replCoord->getMyLastOptime(); + _lastFetchedHash = _lastAppliedHash; + + LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash; +} + +void BackgroundSync::waitUntilPaused() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + while (!_pause) { + _pausedCondition.wait(lock); } - - void BackgroundSync::setLastAppliedHash(long long newHash) { - stdx::lock_guard<stdx::mutex> lck(_mutex); - _lastAppliedHash = newHash; +} + +long long BackgroundSync::getLastAppliedHash() const { + stdx::lock_guard<stdx::mutex> lck(_mutex); + return _lastAppliedHash; +} + +void BackgroundSync::clearBuffer() { + _buffer.clear(); +} + +void BackgroundSync::setLastAppliedHash(long long newHash) { + stdx::lock_guard<stdx::mutex> lck(_mutex); + _lastAppliedHash = newHash; +} + +void BackgroundSync::loadLastAppliedHash(OperationContext* txn) { + long long result = _readLastAppliedHash(txn); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _lastAppliedHash = result; +} + +long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) { + BSONObj oplogEntry; + try { + MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { + ScopedTransaction transaction(txn, MODE_IX); + Lock::DBLock lk(txn->lockState(), "local", MODE_X); + bool success = Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry); + if (!success) { + // This can happen when we are to do an initial sync. lastHash will be set + // after the initial sync is complete. + return 0; + } + } + MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "readLastAppliedHash", rsOplogName); + } catch (const DBException& ex) { + severe() << "Problem reading " << rsOplogName << ": " << ex.toStatus(); + fassertFailed(18904); } - - void BackgroundSync::loadLastAppliedHash(OperationContext* txn) { - long long result = _readLastAppliedHash(txn); - stdx::lock_guard<stdx::mutex> lk(_mutex); - _lastAppliedHash = result; + BSONElement hashElement = oplogEntry[hashFieldName]; + if (hashElement.eoo()) { + severe() << "Most recent entry in " << rsOplogName << " missing \"" << hashFieldName + << "\" field"; + fassertFailed(18902); } - - long long BackgroundSync::_readLastAppliedHash(OperationContext* txn) { - BSONObj oplogEntry; - try { - MONGO_WRITE_CONFLICT_RETRY_LOOP_BEGIN { - ScopedTransaction transaction(txn, MODE_IX); - Lock::DBLock lk(txn->lockState(), "local", MODE_X); - bool success = Helpers::getLast(txn, rsOplogName.c_str(), oplogEntry); - if (!success) { - // This can happen when we are to do an initial sync. lastHash will be set - // after the initial sync is complete. - return 0; - } - } MONGO_WRITE_CONFLICT_RETRY_LOOP_END(txn, "readLastAppliedHash", rsOplogName); - } - catch (const DBException& ex) { - severe() << "Problem reading " << rsOplogName << ": " << ex.toStatus(); - fassertFailed(18904); - } - BSONElement hashElement = oplogEntry[hashFieldName]; - if (hashElement.eoo()) { - severe() << "Most recent entry in " << rsOplogName << " missing \"" << hashFieldName << - "\" field"; - fassertFailed(18902); - } - if (hashElement.type() != NumberLong) { - severe() << "Expected type of \"" << hashFieldName << "\" in most recent " << - rsOplogName << " entry to have type NumberLong, but found " << - typeName(hashElement.type()); - fassertFailed(18903); - } - return hashElement.safeNumberLong(); + if (hashElement.type() != NumberLong) { + severe() << "Expected type of \"" << hashFieldName << "\" in most recent " << rsOplogName + << " entry to have type NumberLong, but found " << typeName(hashElement.type()); + fassertFailed(18903); } + return hashElement.safeNumberLong(); +} - bool BackgroundSync::getInitialSyncRequestedFlag() { - stdx::lock_guard<stdx::mutex> lock(_initialSyncMutex); - return _initialSyncRequestedFlag; - } +bool BackgroundSync::getInitialSyncRequestedFlag() { + stdx::lock_guard<stdx::mutex> lock(_initialSyncMutex); + return _initialSyncRequestedFlag; +} - void BackgroundSync::setInitialSyncRequestedFlag(bool value) { - stdx::lock_guard<stdx::mutex> lock(_initialSyncMutex); - _initialSyncRequestedFlag = value; - } +void BackgroundSync::setInitialSyncRequestedFlag(bool value) { + stdx::lock_guard<stdx::mutex> lock(_initialSyncMutex); + _initialSyncRequestedFlag = value; +} - void BackgroundSync::pushTestOpToBuffer(const BSONObj& op) { - stdx::lock_guard<stdx::mutex> lock(_mutex); - _buffer.push(op); - } +void BackgroundSync::pushTestOpToBuffer(const BSONObj& op) { + stdx::lock_guard<stdx::mutex> lock(_mutex); + _buffer.push(op); +} -} // namespace repl -} // namespace mongo +} // namespace repl +} // namespace mongo |