diff options
author | Andy Schwerin <schwerin@mongodb.com> | 2015-06-11 18:32:23 -0400 |
---|---|---|
committer | Andy Schwerin <schwerin@mongodb.com> | 2015-06-16 10:36:48 -0400 |
commit | b06861044ad1610cef2f2fa8a9e24e5b72ee7345 (patch) | |
tree | a957920ed04330a170d219a9a95f069a00c5babf /src/mongo/s/d_migrate.cpp | |
parent | e9aee4b9bc04a4aaad478aeeee561fcba970bcb5 (diff) | |
download | mongo-b06861044ad1610cef2f2fa8a9e24e5b72ee7345.tar.gz |
SERVER-6686 Remove all uses of boost::xtime outside of time_support.cpp.
Diffstat (limited to 'src/mongo/s/d_migrate.cpp')
-rw-r--r-- | src/mongo/s/d_migrate.cpp | 135 |
1 files changed, 66 insertions, 69 deletions
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index a74198e6464..9dffd676e7f 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -33,9 +33,12 @@ #include "mongo/platform/basic.h" #include <algorithm> -#include <boost/thread/thread.hpp> +#include <chrono> +#include <condition_variable> #include <map> +#include <mutex> #include <string> +#include <thread> #include <vector> #include "mongo/client/connpool.h" @@ -86,7 +89,6 @@ #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/processinfo.h" -#include "mongo/util/queue.h" #include "mongo/util/startup_test.h" // Pause while a fail point is enabled. @@ -94,6 +96,7 @@ namespace mongo { + using namespace std::chrono; using std::list; using std::set; using std::string; @@ -193,7 +196,7 @@ namespace { CurOp * op = CurOp::get(_txn); { - stdx::lock_guard<Client> lk(*_txn->getClient()); + std::lock_guard<Client> lk(*_txn->getClient()); op->setMessage_inlock(s.c_str()); } @@ -266,7 +269,7 @@ namespace { // Get global shared to synchronize with logOp. Also see comments in the class // members declaration for more details. Lock::GlobalRead globalShared(txn->lockState()); - boost::lock_guard<boost::mutex> lk(_mutex); + std::lock_guard<std::mutex> lk(_mutex); if (_active) { return false; @@ -283,7 +286,7 @@ namespace { _active = true; - boost::lock_guard<boost::mutex> tLock(_cloneLocsMutex); + std::lock_guard<std::mutex> tLock(_cloneLocsMutex); verify(_cloneLocs.size() == 0); return true; @@ -296,7 +299,7 @@ namespace { // Get global shared to synchronize with logOp. Also see comments in the class // members declaration for more details. Lock::GlobalRead globalShared(txn->lockState()); - boost::lock_guard<boost::mutex> lk(_mutex); + std::lock_guard<std::mutex> lk(_mutex); _active = false; _deleteNotifyExec.reset( NULL ); @@ -307,7 +310,7 @@ namespace { _reload.clear(); _memoryUsed = 0; - boost::lock_guard<boost::mutex> cloneLock(_cloneLocsMutex); + std::lock_guard<std::mutex> cloneLock(_cloneLocsMutex); _cloneLocs.clear(); } @@ -435,7 +438,7 @@ namespace { { AutoGetCollectionForRead ctx(txn, getNS()); - boost::lock_guard<boost::mutex> sl(_mutex); + std::lock_guard<std::mutex> sl(_mutex); if (!_active) { errmsg = "no active migration!"; return false; @@ -494,7 +497,7 @@ namespace { // It's alright not to lock _mutex all the way through based on the assumption // that this is only called by the main thread that drives the migration and // only it can start and stop the current migration. - boost::lock_guard<boost::mutex> sl(_mutex); + std::lock_guard<std::mutex> sl(_mutex); invariant( _deleteNotifyExec.get() == NULL ); WorkingSet* ws = new WorkingSet(); @@ -536,7 +539,7 @@ namespace { avgRecSize = 0; maxRecsWhenFull = Chunk::MaxObjectPerChunk + 1; } - + // do a full traversal of the chunk and don't stop even if we think it is a large chunk // we want the number of records to better report, in that case bool isLargeChunk = false; @@ -544,7 +547,7 @@ namespace { RecordId dl; while (PlanExecutor::ADVANCED == exec->getNext(NULL, &dl)) { if ( ! isLargeChunk ) { - boost::lock_guard<boost::mutex> lk(_cloneLocsMutex); + std::lock_guard<std::mutex> lk(_cloneLocsMutex); _cloneLocs.insert( dl ); } @@ -557,7 +560,7 @@ namespace { exec.reset(); if ( isLargeChunk ) { - boost::lock_guard<boost::mutex> sl(_mutex); + std::lock_guard<std::mutex> sl(_mutex); warning() << "cannot move chunk: the maximum number of documents for a chunk is " << maxRecsWhenFull << " , the maximum chunk size is " << maxChunkSize << " , average document size is " << avgRecSize @@ -585,7 +588,7 @@ namespace { { AutoGetCollectionForRead ctx(txn, getNS()); - boost::lock_guard<boost::mutex> sl(_mutex); + std::lock_guard<std::mutex> sl(_mutex); if (!_active) { errmsg = "not active"; return false; @@ -608,7 +611,7 @@ namespace { while (!isBufferFilled) { AutoGetCollectionForRead ctx(txn, getNS()); - boost::lock_guard<boost::mutex> sl(_mutex); + std::lock_guard<std::mutex> sl(_mutex); if (!_active) { errmsg = "not active"; return false; @@ -623,7 +626,7 @@ namespace { return false; } - boost::lock_guard<boost::mutex> lk(_cloneLocsMutex); + std::lock_guard<std::mutex> lk(_cloneLocsMutex); set<RecordId>::iterator cloneLocsIter = _cloneLocs.begin(); for ( ; cloneLocsIter != _cloneLocs.end(); ++cloneLocsIter) { if (tracker.intervalHasElapsed()) // should I yield? @@ -666,33 +669,33 @@ namespace { // that check only works for non-mmapv1 engines, and this is needed // for mmapv1. - boost::lock_guard<boost::mutex> lk(_cloneLocsMutex); + std::lock_guard<std::mutex> lk(_cloneLocsMutex); _cloneLocs.erase( dl ); } std::size_t cloneLocsRemaining() { - boost::lock_guard<boost::mutex> lk(_cloneLocsMutex); + std::lock_guard<std::mutex> lk(_cloneLocsMutex); return _cloneLocs.size(); } long long mbUsed() const { - boost::lock_guard<boost::mutex> lk(_mutex); + std::lock_guard<std::mutex> lk(_mutex); return _memoryUsed / ( 1024 * 1024 ); } bool getInCriticalSection() const { - boost::lock_guard<boost::mutex> lk(_mutex); + std::lock_guard<std::mutex> lk(_mutex); return _inCriticalSection; } void setInCriticalSection( bool b ) { - boost::lock_guard<boost::mutex> lk(_mutex); + std::lock_guard<std::mutex> lk(_mutex); _inCriticalSection = b; _inCriticalSectionCV.notify_all(); } std::string getNS() const { - boost::lock_guard<boost::mutex> sl(_mutex); + std::lock_guard<std::mutex> sl(_mutex); return _ns; } @@ -700,13 +703,10 @@ namespace { * @return true if we are NOT in the critical section */ bool waitTillNotInCriticalSection( int maxSecondsToWait ) { - boost::xtime xt; - boost::xtime_get(&xt, MONGO_BOOST_TIME_UTC); - xt.sec += maxSecondsToWait; - - boost::unique_lock<boost::mutex> lk(_mutex); + const auto deadline = system_clock::now() + seconds(maxSecondsToWait); + std::unique_lock<std::mutex> lk(_mutex); while (_inCriticalSection) { - if (!_inCriticalSectionCV.timed_wait(lk, xt)) + if (std::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline)) return false; } @@ -716,8 +716,8 @@ namespace { bool isActive() const { return _getActive(); } private: - bool _getActive() const { boost::lock_guard<boost::mutex> lk(_mutex); return _active; } - void _setActive( bool b ) { boost::lock_guard<boost::mutex> lk(_mutex); _active = b; } + bool _getActive() const { std::lock_guard<std::mutex> lk(_mutex); return _active; } + void _setActive( bool b ) { std::lock_guard<std::mutex> lk(_mutex); _active = b; } /** * Used to commit work for LogOpForSharding. Used to keep track of changes in documents @@ -740,7 +740,7 @@ namespace { virtual void commit() { switch (_op) { case 'd': { - boost::lock_guard<boost::mutex> sl(_migrateFromStatus->_mutex); + std::lock_guard<std::mutex> sl(_migrateFromStatus->_mutex); _migrateFromStatus->_deleted.push_back(_idObj); _migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5; break; @@ -749,7 +749,7 @@ namespace { case 'i': case 'u': { - boost::lock_guard<boost::mutex> sl(_migrateFromStatus->_mutex); + std::lock_guard<std::mutex> sl(_migrateFromStatus->_mutex); _migrateFromStatus->_reload.push_back(_idObj); _migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5; break; @@ -829,9 +829,9 @@ namespace { // // Global Lock -> _mutex -> _cloneLocsMutex - mutable mongo::mutex _mutex; + mutable std::mutex _mutex; - boost::condition _inCriticalSectionCV; // (M) + std::condition_variable _inCriticalSectionCV; // (M) // Is migration currently in critical section. This can be used to block new writes. bool _inCriticalSection; // (M) @@ -855,7 +855,7 @@ namespace { BSONObj _max; // (MG) BSONObj _shardKeyPattern; // (MG) - mutable mongo::mutex _cloneLocsMutex; + mutable std::mutex _cloneLocsMutex; // List of record id that needs to be transferred from here to the other side. set<RecordId> _cloneLocs; // (C) @@ -1155,7 +1155,7 @@ namespace { MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep1); // 2. - + if ( migrateFromStatus.isActive() ) { errmsg = "migration already in progress"; warning() << errmsg; @@ -1261,7 +1261,7 @@ namespace { // 3. MigrateStatusHolder statusHolder(txn, ns, min, max, shardKeyPattern); - + if (statusHolder.isAnotherMigrationActive()) { errmsg = "moveChunk is already in progress from this shard"; warning() << errmsg; @@ -1279,7 +1279,7 @@ namespace { str::stream() << "Source shard " << fromShardName << " is missing. This indicates metadata corruption.", fromShard); - + fromShardCS = fromShard->getConnString(); std::shared_ptr<Shard> toShard = grid.shardRegistry()->findIfExists(toShardName); @@ -1630,7 +1630,7 @@ namespace { Status applyOpsStatus{Status::OK()}; try { - + // For testing migration failures if ( MONGO_FAIL_POINT(failMigrationConfigWritePrepare) ) { throw DBException( "mock migration failure before config write", @@ -1838,12 +1838,12 @@ namespace { } void setState(State newState) { - boost::lock_guard<boost::mutex> sl(_mutex); + std::lock_guard<std::mutex> sl(_mutex); _state = newState; } State getState() const { - boost::lock_guard<boost::mutex> sl(_mutex); + std::lock_guard<std::mutex> sl(_mutex); return _state; } @@ -1855,7 +1855,7 @@ namespace { const BSONObj& min, const BSONObj& max, const BSONObj& shardKeyPattern) { - boost::lock_guard<boost::mutex> lk(_mutex); + std::lock_guard<std::mutex> lk(_mutex); if (_active) { return Status(ErrorCodes::ConflictingOperationInProgress, @@ -1898,7 +1898,7 @@ namespace { } catch ( std::exception& e ) { { - boost::lock_guard<boost::mutex> sl(_mutex); + std::lock_guard<std::mutex> sl(_mutex); _state = FAIL; _errmsg = e.what(); } @@ -1907,7 +1907,7 @@ namespace { } catch ( ... ) { { - boost::lock_guard<boost::mutex> sl(_mutex); + std::lock_guard<std::mutex> sl(_mutex); _state = FAIL; _errmsg = "UNKNOWN ERROR"; } @@ -1994,9 +1994,9 @@ namespace { } } - { + { // 1. copy indexes - + vector<BSONObj> indexSpecs; { const std::list<BSONObj> indexes = conn->getIndexSpecs(ns); @@ -2192,7 +2192,7 @@ namespace { thisTime++; { - boost::lock_guard<boost::mutex> statsLock(_mutex); + std::lock_guard<std::mutex> statsLock(_mutex); _numCloned++; _clonedBytes += docToClone.objsize(); } @@ -2244,7 +2244,7 @@ namespace { break; apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied); - + const int maxIterations = 3600*50; int i; for ( i=0;i<maxIterations; i++) { @@ -2257,10 +2257,10 @@ namespace { return; } - + if (opReplicatedEnough(txn, lastOpApplied, writeConcern)) break; - + if ( i > 100 ) { warning() << "secondaries having hard time keeping up with migrate" << migrateLog; } @@ -2274,14 +2274,14 @@ namespace { conn.done(); setState(FAIL); return; - } + } } timing.done(4); MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep4); } - { + { // pause to wait for replication // this will prevent us from going into critical section until we're ready Timer t; @@ -2340,7 +2340,7 @@ namespace { if ( getState() == ABORT ) { return; } - + // We know we're finished when: // 1) The from side has told us that it has locked writes (COMMIT_START) // 2) We've checked at least one more time for un-transmitted mods @@ -2348,7 +2348,7 @@ namespace { if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern)) break; } - + // Only sleep if we aren't committing if ( getState() == STEADY ) sleepmillis( 10 ); } @@ -2367,7 +2367,7 @@ namespace { } void status(BSONObjBuilder& b) { - boost::lock_guard<boost::mutex> sl(_mutex); + std::lock_guard<std::mutex> sl(_mutex); b.appendBool("active", _active); @@ -2582,20 +2582,16 @@ namespace { } bool startCommit() { - boost::unique_lock<boost::mutex> lock(_mutex); + std::unique_lock<std::mutex> lock(_mutex); if (_state != STEADY) { return false; } - boost::xtime xt; - boost::xtime_get(&xt, MONGO_BOOST_TIME_UTC); - xt.sec += 30; - + const auto deadline = system_clock::now() + seconds(30); _state = COMMIT_START; while (_active) { - if ( ! isActiveCV.timed_wait( lock, xt ) ){ - // TIMEOUT + if (std::cv_status::timeout == isActiveCV.wait_until(lock, deadline)) { _state = FAIL; log() << "startCommit never finished!" << migrateLog; return false; @@ -2611,22 +2607,22 @@ namespace { } void abort() { - boost::lock_guard<boost::mutex> sl(_mutex); + std::lock_guard<std::mutex> sl(_mutex); _state = ABORT; _errmsg = "aborted"; } - bool getActive() const { boost::lock_guard<boost::mutex> lk(_mutex); return _active; } - void setActive( bool b ) { - boost::lock_guard<boost::mutex> lk(_mutex); + bool getActive() const { std::lock_guard<std::mutex> lk(_mutex); return _active; } + void setActive( bool b ) { + std::lock_guard<std::mutex> lk(_mutex); _active = b; - isActiveCV.notify_all(); + isActiveCV.notify_all(); } // Guards all fields. - mutable mongo::mutex _mutex; + mutable std::mutex _mutex; bool _active; - boost::condition isActiveCV; + std::condition_variable isActiveCV; std::string _ns; std::string _from; @@ -2827,7 +2823,7 @@ namespace { return appendCommandStatus(result, prepareStatus); } - boost::thread m(migrateThread, + std::thread m(migrateThread, ns, min, max, @@ -2836,6 +2832,7 @@ namespace { currentVersion.epoch(), writeConcern); + m.detach(); result.appendBool( "started" , true ); return true; } |