diff options
author | Greg Studer <greg@10gen.com> | 2012-10-17 16:03:53 -0400 |
---|---|---|
committer | Greg Studer <greg@10gen.com> | 2012-10-17 16:07:07 -0400 |
commit | dfb8e2604409e2879ea975c4c49ba6ffde90d795 (patch) | |
tree | de079ca071e899b8843422aa98b3db62a518650c | |
parent | 83d73563c47df15373b95e4d4428e323d6005b55 (diff) | |
download | mongo-dfb8e2604409e2879ea975c4c49ba6ffde90d795.tar.gz |
SERVER-7376 make all migration cleanup async unless explicitly specified otherwise
-rw-r--r-- | src/mongo/s/balance.cpp | 29 | ||||
-rw-r--r-- | src/mongo/s/balance.h | 6 | ||||
-rw-r--r-- | src/mongo/s/chunk.cpp | 11 | ||||
-rw-r--r-- | src/mongo/s/chunk.h | 7 | ||||
-rw-r--r-- | src/mongo/s/commands_admin.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/d_migrate.cpp | 53 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 15 |
7 files changed, 95 insertions, 33 deletions
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index 248b3e4d8d5..e12ecbf58a4 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -40,7 +40,10 @@ namespace mongo { Balancer::~Balancer() { } - int Balancer::_moveChunks( const vector<CandidateChunkPtr>* candidateChunks , bool secondaryThrottle ) { + int Balancer::_moveChunks( const vector<CandidateChunkPtr>* candidateChunks , + bool secondaryThrottle, + bool waitForDelete ) + { int movedCount = 0; for ( vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin(); it != candidateChunks->end(); ++it ) { @@ -66,7 +69,12 @@ namespace mongo { } BSONObj res; - if ( c->moveAndCommit( Shard::make( chunkInfo.to ) , Chunk::MaxChunkSize , secondaryThrottle , res ) ) { + if ( c->moveAndCommit( Shard::make( chunkInfo.to ) , + Chunk::MaxChunkSize , + secondaryThrottle , + waitForDelete, + res ) ) + { movedCount++; continue; } @@ -330,7 +338,7 @@ namespace mongo { } sleepTime = balancerConfig["_nosleep"].trueValue() ? 30 : 6; - + uassert( 13258 , "oids broken after resetting!" , _checkOIDs() ); { @@ -349,6 +357,14 @@ namespace mongo { LOG(1) << "*** start balancing round" << endl; + if (balancerConfig["_waitForDelete"].trueValue()) { + LOG(1) << "balancer chunk moves will wait for cleanup" << endl; + } + + if (balancerConfig["_secondaryThrottle"].trueValue()) { + LOG(1) << "balancer chunk moves will wait for secondaries" << endl; + } + vector<CandidateChunkPtr> candidateChunks; _doBalanceRound( conn.conn() , &candidateChunks ); if ( candidateChunks.size() == 0 ) { @@ -356,9 +372,12 @@ namespace mongo { _balancedLastTime = 0; } else { - _balancedLastTime = _moveChunks( &candidateChunks, balancerConfig["_secondaryThrottle"].trueValue() ); + _balancedLastTime = + _moveChunks( &candidateChunks, + balancerConfig["_secondaryThrottle"].trueValue(), + balancerConfig["_waitForDelete"].trueValue()); } - + LOG(1) << "*** end of balancing round" << endl; } diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h index 98221c5d4db..26b471913e0 100644 --- a/src/mongo/s/balance.h +++ b/src/mongo/s/balance.h @@ -82,9 +82,13 @@ namespace mongo { * Issues chunk migration request, one at a time. * * @param candidateChunks possible chunks to move + * @param secondaryThrottle wait for secondaries to catch up before pushing more deletes + * @param waitForDelete wait for deletes to complete after each chunk move * @return number of chunks effectively moved */ - int _moveChunks( const vector<CandidateChunkPtr>* candidateChunks , bool secondaryThrottle ); + int _moveChunks( const vector<CandidateChunkPtr>* candidateChunks, + bool secondaryThrottle, + bool waitForDelete ); /** * Marks this balancer as being live on the config server(s). diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 773978014dc..eace90f9e7d 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -301,7 +301,12 @@ namespace mongo { return true; } - bool Chunk::moveAndCommit( const Shard& to , long long chunkSize /* bytes */, bool secondaryThrottle, BSONObj& res ) const { + bool Chunk::moveAndCommit( const Shard& to, + long long chunkSize /* bytes */, + bool secondaryThrottle, + bool waitForDelete, + BSONObj& res ) const + { uassert( 10167 , "can't move shard to its current location!" , getShard() != to ); log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << _shard.toString() << " -> " << to.toString() << endl; @@ -324,7 +329,8 @@ namespace mongo { "maxChunkSizeBytes" << chunkSize << "shardId" << genID() << "configdb" << configServer.modelServer() << - "secondaryThrottle" << secondaryThrottle + "secondaryThrottle" << secondaryThrottle << + "waitForDelete" << waitForDelete ) , res ); @@ -425,6 +431,7 @@ namespace mongo { toMove->moveAndCommit( newLocation , MaxChunkSize , false , /* secondaryThrottle - small chunk, no need */ + false , /* waitForDelete - small chunk, no need */ res ) ); // update our config diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index 5fd01c5abe4..11f910ea52c 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -146,10 +146,15 @@ namespace mongo { * @param to shard to move this chunk to * @param chunSize maximum number of bytes beyond which the migrate should no go trhough * @param secondaryThrottle whether during migrate all writes should block for repl + * @param waitForDelete whether chunk move should wait for cleanup or return immediately * @param res the object containing details about the migrate execution * @return true if move was successful */ - bool moveAndCommit( const Shard& to , long long chunkSize , bool secondaryThrottle, BSONObj& res ) const; + bool moveAndCommit( const Shard& to, + long long chunkSize, + bool secondaryThrottle, + bool waitForDelete, + BSONObj& res ) const; /** * @return size of shard in bytes diff --git a/src/mongo/s/commands_admin.cpp b/src/mongo/s/commands_admin.cpp index c155e25066d..3ccd3e58f98 100644 --- a/src/mongo/s/commands_admin.cpp +++ b/src/mongo/s/commands_admin.cpp @@ -844,7 +844,12 @@ namespace mongo { } BSONObj res; - if ( ! c->moveAndCommit( to , maxChunkSizeBytes , cmdObj["_secondaryThrottle"].trueValue() , res ) ) { + if ( ! c->moveAndCommit( to, + maxChunkSizeBytes, + cmdObj["_secondaryThrottle"].trueValue(), + cmdObj["_waitForDelete"].trueValue(), + res ) ) + { errmsg = "move failed"; result.append( "cause" , res ); return false; diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index f5d59566b71..008cd4d6b12 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -255,7 +255,9 @@ namespace mongo { class MigrateFromStatus { public: - MigrateFromStatus() : _m("MigrateFromStatus") , _workLock("MigrateFromStatus::workLock") { + MigrateFromStatus() : _m("MigrateFromStatus") , + _cleanupTickets(1) /* only one cleanup thread at once */ + { _active = false; _inCriticalSection = false; _memoryUsed = 0; @@ -265,7 +267,9 @@ namespace mongo { const BSONObj& min , const BSONObj& max , const BSONObj& shardKeyPattern ) { - scoped_lock ll(_workLock); + + // Note: migrations not blocked by queued deletes using _workLock + scoped_lock l(_m); // reads and writes _active verify( ! _active ); @@ -334,7 +338,7 @@ namespace mongo { case 'd': { - if ( getThreadName() == cleanUpThreadName ) { + if (getThreadName().find(cleanUpThreadName) == 0) { // we don't want to xfer things we're cleaning // as then they'll be deleted on TO // which is bad @@ -608,18 +612,12 @@ namespace mongo { bool isActive() const { return _getActive(); } void doRemove( OldDataCleanup& cleanup ) { - int it = 0; - while ( true ) { - if ( it > 20 && it % 10 == 0 ) log() << "doRemote iteration " << it << " for: " << cleanup << endl; - { - scoped_lock ll(_workLock); - if ( ! _active ) { - cleanup.doRemove(); - return; - } - } - sleepmillis( 1000 ); - } + + log() << "waiting to remove documents for " << cleanup.toString() << endl; + + ScopedTicket ticket(&_cleanupTickets); + + cleanup.doRemove(); } private: @@ -646,8 +644,8 @@ namespace mongo { list<BSONObj> _deleted; // objects deleted during clone that should be deleted later long long _memoryUsed; // bytes in _reload + _deleted - mutable mongo::mutex _workLock; // this is used to make sure only 1 thread is doing serious work - // for now, this means migrate or removing old chunk data + // this is used to make sure only a certain number of threads are doing cleanup at once. + mutable TicketHolder _cleanupTickets; bool _getActive() const { scoped_lock l(_m); return _active; } void _setActive( bool b ) { scoped_lock l(_m); _active = b; } @@ -667,7 +665,10 @@ namespace mongo { }; void _cleanupOldData( OldDataCleanup cleanup ) { - Client::initThread( cleanUpThreadName ); + + Client::initThread((string(cleanUpThreadName) + string("-") + + OID::gen().toString()).c_str()); + if (!noauth) { cc().getAuthenticationInfo()->authorize("local", internalSecurity.user); } @@ -808,6 +809,12 @@ namespace mongo { warning() << "secondaryThrottle selected but no replication" << endl; } + // Do inline deletion + bool waitForDelete = cmdObj["waitForDelete"].trueValue(); + if (waitForDelete) { + log() << "moveChunk waiting for full cleanup after move" << endl; + } + BSONObj min = cmdObj["min"].Obj(); BSONObj max = cmdObj["max"].Obj(); BSONElement shardId = cmdObj["shardId"]; @@ -1334,17 +1341,17 @@ namespace mongo { c.max = max.getOwned(); c.shardKeyPattern = shardKeyPattern.getOwned(); ClientCursor::find( ns , c.initial ); - if ( c.initial.size() ) { - log() << "forking for cleaning up chunk data" << migrateLog; + + if (!waitForDelete) { + // 7. + log() << "forking for cleanup of chunk data" << migrateLog; boost::thread t( boost::bind( &cleanupOldData , c ) ); } else { - log() << "doing delete inline" << migrateLog; // 7. + log() << "doing delete inline for cleanup of chunk data" << migrateLog; c.doRemove(); } - - } timing.done(6); diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index 036d33fb9ef..c32e283e130 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -95,6 +95,21 @@ namespace mongo { boost::condition_variable_any _newTicket; }; + class ScopedTicket { + public: + + ScopedTicket(TicketHolder* holder) : _holder(holder) { + _holder->waitForTicket(); + } + + ~ScopedTicket() { + _holder->release(); + } + + private: + TicketHolder* _holder; + }; + class TicketHolderReleaser { public: TicketHolderReleaser( TicketHolder * holder ) { |