summaryrefslogtreecommitdiff
path: root/src/mongo/s/d_migrate.cpp
diff options
context:
space:
mode:
authorAndy Schwerin <schwerin@mongodb.com>2015-06-11 18:32:23 -0400
committerAndy Schwerin <schwerin@mongodb.com>2015-06-16 10:36:48 -0400
commitb06861044ad1610cef2f2fa8a9e24e5b72ee7345 (patch)
treea957920ed04330a170d219a9a95f069a00c5babf /src/mongo/s/d_migrate.cpp
parente9aee4b9bc04a4aaad478aeeee561fcba970bcb5 (diff)
downloadmongo-b06861044ad1610cef2f2fa8a9e24e5b72ee7345.tar.gz
SERVER-6686 Remove all uses of boost::xtime outside of time_support.cpp.
Diffstat (limited to 'src/mongo/s/d_migrate.cpp')
-rw-r--r--src/mongo/s/d_migrate.cpp135
1 files changed, 66 insertions, 69 deletions
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index a74198e6464..9dffd676e7f 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -33,9 +33,12 @@
#include "mongo/platform/basic.h"
#include <algorithm>
-#include <boost/thread/thread.hpp>
+#include <chrono>
+#include <condition_variable>
#include <map>
+#include <mutex>
#include <string>
+#include <thread>
#include <vector>
#include "mongo/client/connpool.h"
@@ -86,7 +89,6 @@
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/processinfo.h"
-#include "mongo/util/queue.h"
#include "mongo/util/startup_test.h"
// Pause while a fail point is enabled.
@@ -94,6 +96,7 @@
namespace mongo {
+ using namespace std::chrono;
using std::list;
using std::set;
using std::string;
@@ -193,7 +196,7 @@ namespace {
CurOp * op = CurOp::get(_txn);
{
- stdx::lock_guard<Client> lk(*_txn->getClient());
+ std::lock_guard<Client> lk(*_txn->getClient());
op->setMessage_inlock(s.c_str());
}
@@ -266,7 +269,7 @@ namespace {
// Get global shared to synchronize with logOp. Also see comments in the class
// members declaration for more details.
Lock::GlobalRead globalShared(txn->lockState());
- boost::lock_guard<boost::mutex> lk(_mutex);
+ std::lock_guard<std::mutex> lk(_mutex);
if (_active) {
return false;
@@ -283,7 +286,7 @@ namespace {
_active = true;
- boost::lock_guard<boost::mutex> tLock(_cloneLocsMutex);
+ std::lock_guard<std::mutex> tLock(_cloneLocsMutex);
verify(_cloneLocs.size() == 0);
return true;
@@ -296,7 +299,7 @@ namespace {
// Get global shared to synchronize with logOp. Also see comments in the class
// members declaration for more details.
Lock::GlobalRead globalShared(txn->lockState());
- boost::lock_guard<boost::mutex> lk(_mutex);
+ std::lock_guard<std::mutex> lk(_mutex);
_active = false;
_deleteNotifyExec.reset( NULL );
@@ -307,7 +310,7 @@ namespace {
_reload.clear();
_memoryUsed = 0;
- boost::lock_guard<boost::mutex> cloneLock(_cloneLocsMutex);
+ std::lock_guard<std::mutex> cloneLock(_cloneLocsMutex);
_cloneLocs.clear();
}
@@ -435,7 +438,7 @@ namespace {
{
AutoGetCollectionForRead ctx(txn, getNS());
- boost::lock_guard<boost::mutex> sl(_mutex);
+ std::lock_guard<std::mutex> sl(_mutex);
if (!_active) {
errmsg = "no active migration!";
return false;
@@ -494,7 +497,7 @@ namespace {
// It's alright not to lock _mutex all the way through based on the assumption
// that this is only called by the main thread that drives the migration and
// only it can start and stop the current migration.
- boost::lock_guard<boost::mutex> sl(_mutex);
+ std::lock_guard<std::mutex> sl(_mutex);
invariant( _deleteNotifyExec.get() == NULL );
WorkingSet* ws = new WorkingSet();
@@ -536,7 +539,7 @@ namespace {
avgRecSize = 0;
maxRecsWhenFull = Chunk::MaxObjectPerChunk + 1;
}
-
+
// do a full traversal of the chunk and don't stop even if we think it is a large chunk
// we want the number of records to better report, in that case
bool isLargeChunk = false;
@@ -544,7 +547,7 @@ namespace {
RecordId dl;
while (PlanExecutor::ADVANCED == exec->getNext(NULL, &dl)) {
if ( ! isLargeChunk ) {
- boost::lock_guard<boost::mutex> lk(_cloneLocsMutex);
+ std::lock_guard<std::mutex> lk(_cloneLocsMutex);
_cloneLocs.insert( dl );
}
@@ -557,7 +560,7 @@ namespace {
exec.reset();
if ( isLargeChunk ) {
- boost::lock_guard<boost::mutex> sl(_mutex);
+ std::lock_guard<std::mutex> sl(_mutex);
warning() << "cannot move chunk: the maximum number of documents for a chunk is "
<< maxRecsWhenFull << " , the maximum chunk size is " << maxChunkSize
<< " , average document size is " << avgRecSize
@@ -585,7 +588,7 @@ namespace {
{
AutoGetCollectionForRead ctx(txn, getNS());
- boost::lock_guard<boost::mutex> sl(_mutex);
+ std::lock_guard<std::mutex> sl(_mutex);
if (!_active) {
errmsg = "not active";
return false;
@@ -608,7 +611,7 @@ namespace {
while (!isBufferFilled) {
AutoGetCollectionForRead ctx(txn, getNS());
- boost::lock_guard<boost::mutex> sl(_mutex);
+ std::lock_guard<std::mutex> sl(_mutex);
if (!_active) {
errmsg = "not active";
return false;
@@ -623,7 +626,7 @@ namespace {
return false;
}
- boost::lock_guard<boost::mutex> lk(_cloneLocsMutex);
+ std::lock_guard<std::mutex> lk(_cloneLocsMutex);
set<RecordId>::iterator cloneLocsIter = _cloneLocs.begin();
for ( ; cloneLocsIter != _cloneLocs.end(); ++cloneLocsIter) {
if (tracker.intervalHasElapsed()) // should I yield?
@@ -666,33 +669,33 @@ namespace {
// that check only works for non-mmapv1 engines, and this is needed
// for mmapv1.
- boost::lock_guard<boost::mutex> lk(_cloneLocsMutex);
+ std::lock_guard<std::mutex> lk(_cloneLocsMutex);
_cloneLocs.erase( dl );
}
std::size_t cloneLocsRemaining() {
- boost::lock_guard<boost::mutex> lk(_cloneLocsMutex);
+ std::lock_guard<std::mutex> lk(_cloneLocsMutex);
return _cloneLocs.size();
}
long long mbUsed() const {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ std::lock_guard<std::mutex> lk(_mutex);
return _memoryUsed / ( 1024 * 1024 );
}
bool getInCriticalSection() const {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ std::lock_guard<std::mutex> lk(_mutex);
return _inCriticalSection;
}
void setInCriticalSection( bool b ) {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ std::lock_guard<std::mutex> lk(_mutex);
_inCriticalSection = b;
_inCriticalSectionCV.notify_all();
}
std::string getNS() const {
- boost::lock_guard<boost::mutex> sl(_mutex);
+ std::lock_guard<std::mutex> sl(_mutex);
return _ns;
}
@@ -700,13 +703,10 @@ namespace {
* @return true if we are NOT in the critical section
*/
bool waitTillNotInCriticalSection( int maxSecondsToWait ) {
- boost::xtime xt;
- boost::xtime_get(&xt, MONGO_BOOST_TIME_UTC);
- xt.sec += maxSecondsToWait;
-
- boost::unique_lock<boost::mutex> lk(_mutex);
+ const auto deadline = system_clock::now() + seconds(maxSecondsToWait);
+ std::unique_lock<std::mutex> lk(_mutex);
while (_inCriticalSection) {
- if (!_inCriticalSectionCV.timed_wait(lk, xt))
+ if (std::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline))
return false;
}
@@ -716,8 +716,8 @@ namespace {
bool isActive() const { return _getActive(); }
private:
- bool _getActive() const { boost::lock_guard<boost::mutex> lk(_mutex); return _active; }
- void _setActive( bool b ) { boost::lock_guard<boost::mutex> lk(_mutex); _active = b; }
+ bool _getActive() const { std::lock_guard<std::mutex> lk(_mutex); return _active; }
+ void _setActive( bool b ) { std::lock_guard<std::mutex> lk(_mutex); _active = b; }
/**
* Used to commit work for LogOpForSharding. Used to keep track of changes in documents
@@ -740,7 +740,7 @@ namespace {
virtual void commit() {
switch (_op) {
case 'd': {
- boost::lock_guard<boost::mutex> sl(_migrateFromStatus->_mutex);
+ std::lock_guard<std::mutex> sl(_migrateFromStatus->_mutex);
_migrateFromStatus->_deleted.push_back(_idObj);
_migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5;
break;
@@ -749,7 +749,7 @@ namespace {
case 'i':
case 'u':
{
- boost::lock_guard<boost::mutex> sl(_migrateFromStatus->_mutex);
+ std::lock_guard<std::mutex> sl(_migrateFromStatus->_mutex);
_migrateFromStatus->_reload.push_back(_idObj);
_migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5;
break;
@@ -829,9 +829,9 @@ namespace {
//
// Global Lock -> _mutex -> _cloneLocsMutex
- mutable mongo::mutex _mutex;
+ mutable std::mutex _mutex;
- boost::condition _inCriticalSectionCV; // (M)
+ std::condition_variable _inCriticalSectionCV; // (M)
// Is migration currently in critical section. This can be used to block new writes.
bool _inCriticalSection; // (M)
@@ -855,7 +855,7 @@ namespace {
BSONObj _max; // (MG)
BSONObj _shardKeyPattern; // (MG)
- mutable mongo::mutex _cloneLocsMutex;
+ mutable std::mutex _cloneLocsMutex;
// List of record id that needs to be transferred from here to the other side.
set<RecordId> _cloneLocs; // (C)
@@ -1155,7 +1155,7 @@ namespace {
MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep1);
// 2.
-
+
if ( migrateFromStatus.isActive() ) {
errmsg = "migration already in progress";
warning() << errmsg;
@@ -1261,7 +1261,7 @@ namespace {
// 3.
MigrateStatusHolder statusHolder(txn, ns, min, max, shardKeyPattern);
-
+
if (statusHolder.isAnotherMigrationActive()) {
errmsg = "moveChunk is already in progress from this shard";
warning() << errmsg;
@@ -1279,7 +1279,7 @@ namespace {
str::stream() << "Source shard " << fromShardName
<< " is missing. This indicates metadata corruption.",
fromShard);
-
+
fromShardCS = fromShard->getConnString();
std::shared_ptr<Shard> toShard = grid.shardRegistry()->findIfExists(toShardName);
@@ -1630,7 +1630,7 @@ namespace {
Status applyOpsStatus{Status::OK()};
try {
-
+
// For testing migration failures
if ( MONGO_FAIL_POINT(failMigrationConfigWritePrepare) ) {
throw DBException( "mock migration failure before config write",
@@ -1838,12 +1838,12 @@ namespace {
}
void setState(State newState) {
- boost::lock_guard<boost::mutex> sl(_mutex);
+ std::lock_guard<std::mutex> sl(_mutex);
_state = newState;
}
State getState() const {
- boost::lock_guard<boost::mutex> sl(_mutex);
+ std::lock_guard<std::mutex> sl(_mutex);
return _state;
}
@@ -1855,7 +1855,7 @@ namespace {
const BSONObj& min,
const BSONObj& max,
const BSONObj& shardKeyPattern) {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ std::lock_guard<std::mutex> lk(_mutex);
if (_active) {
return Status(ErrorCodes::ConflictingOperationInProgress,
@@ -1898,7 +1898,7 @@ namespace {
}
catch ( std::exception& e ) {
{
- boost::lock_guard<boost::mutex> sl(_mutex);
+ std::lock_guard<std::mutex> sl(_mutex);
_state = FAIL;
_errmsg = e.what();
}
@@ -1907,7 +1907,7 @@ namespace {
}
catch ( ... ) {
{
- boost::lock_guard<boost::mutex> sl(_mutex);
+ std::lock_guard<std::mutex> sl(_mutex);
_state = FAIL;
_errmsg = "UNKNOWN ERROR";
}
@@ -1994,9 +1994,9 @@ namespace {
}
}
- {
+ {
// 1. copy indexes
-
+
vector<BSONObj> indexSpecs;
{
const std::list<BSONObj> indexes = conn->getIndexSpecs(ns);
@@ -2192,7 +2192,7 @@ namespace {
thisTime++;
{
- boost::lock_guard<boost::mutex> statsLock(_mutex);
+ std::lock_guard<std::mutex> statsLock(_mutex);
_numCloned++;
_clonedBytes += docToClone.objsize();
}
@@ -2244,7 +2244,7 @@ namespace {
break;
apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied);
-
+
const int maxIterations = 3600*50;
int i;
for ( i=0;i<maxIterations; i++) {
@@ -2257,10 +2257,10 @@ namespace {
return;
}
-
+
if (opReplicatedEnough(txn, lastOpApplied, writeConcern))
break;
-
+
if ( i > 100 ) {
warning() << "secondaries having hard time keeping up with migrate" << migrateLog;
}
@@ -2274,14 +2274,14 @@ namespace {
conn.done();
setState(FAIL);
return;
- }
+ }
}
timing.done(4);
MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep4);
}
- {
+ {
// pause to wait for replication
// this will prevent us from going into critical section until we're ready
Timer t;
@@ -2340,7 +2340,7 @@ namespace {
if ( getState() == ABORT ) {
return;
}
-
+
// We know we're finished when:
// 1) The from side has told us that it has locked writes (COMMIT_START)
// 2) We've checked at least one more time for un-transmitted mods
@@ -2348,7 +2348,7 @@ namespace {
if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern))
break;
}
-
+
// Only sleep if we aren't committing
if ( getState() == STEADY ) sleepmillis( 10 );
}
@@ -2367,7 +2367,7 @@ namespace {
}
void status(BSONObjBuilder& b) {
- boost::lock_guard<boost::mutex> sl(_mutex);
+ std::lock_guard<std::mutex> sl(_mutex);
b.appendBool("active", _active);
@@ -2582,20 +2582,16 @@ namespace {
}
bool startCommit() {
- boost::unique_lock<boost::mutex> lock(_mutex);
+ std::unique_lock<std::mutex> lock(_mutex);
if (_state != STEADY) {
return false;
}
- boost::xtime xt;
- boost::xtime_get(&xt, MONGO_BOOST_TIME_UTC);
- xt.sec += 30;
-
+ const auto deadline = system_clock::now() + seconds(30);
_state = COMMIT_START;
while (_active) {
- if ( ! isActiveCV.timed_wait( lock, xt ) ){
- // TIMEOUT
+ if (std::cv_status::timeout == isActiveCV.wait_until(lock, deadline)) {
_state = FAIL;
log() << "startCommit never finished!" << migrateLog;
return false;
@@ -2611,22 +2607,22 @@ namespace {
}
void abort() {
- boost::lock_guard<boost::mutex> sl(_mutex);
+ std::lock_guard<std::mutex> sl(_mutex);
_state = ABORT;
_errmsg = "aborted";
}
- bool getActive() const { boost::lock_guard<boost::mutex> lk(_mutex); return _active; }
- void setActive( bool b ) {
- boost::lock_guard<boost::mutex> lk(_mutex);
+ bool getActive() const { std::lock_guard<std::mutex> lk(_mutex); return _active; }
+ void setActive( bool b ) {
+ std::lock_guard<std::mutex> lk(_mutex);
_active = b;
- isActiveCV.notify_all();
+ isActiveCV.notify_all();
}
// Guards all fields.
- mutable mongo::mutex _mutex;
+ mutable std::mutex _mutex;
bool _active;
- boost::condition isActiveCV;
+ std::condition_variable isActiveCV;
std::string _ns;
std::string _from;
@@ -2827,7 +2823,7 @@ namespace {
return appendCommandStatus(result, prepareStatus);
}
- boost::thread m(migrateThread,
+ std::thread m(migrateThread,
ns,
min,
max,
@@ -2836,6 +2832,7 @@ namespace {
currentVersion.epoch(),
writeConcern);
+ m.detach();
result.appendBool( "started" , true );
return true;
}