diff options
author | Randolph Tan <randolph@10gen.com> | 2012-12-10 16:28:37 -0500 |
---|---|---|
committer | Randolph Tan <randolph@10gen.com> | 2012-12-11 13:30:47 -0500 |
commit | 93cda0b81d954f1c0184bb15691105d90b9cb0c3 (patch) | |
tree | 739dd80d28cc32db9a5712dc22461cb9b33e333d /src/mongo/s | |
parent | b1ba986ac53f472f1b1c484ea5187a79d4af97fa (diff) | |
download | mongo-93cda0b81d954f1c0184bb15691105d90b9cb0c3.tar.gz |
SERVER-7376 migrate deletion not aggressive enough and tied to migrate moves
Put back dfb8e2604409e2879ea975c4c49ba6ffde90d795 and fixed tests.
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- | src/mongo/s/balance.cpp | 26 | ||||
-rw-r--r-- | src/mongo/s/balance.h | 6 | ||||
-rw-r--r-- | src/mongo/s/chunk.cpp | 11 | ||||
-rw-r--r-- | src/mongo/s/chunk.h | 7 | ||||
-rw-r--r-- | src/mongo/s/commands_admin.cpp | 9 | ||||
-rw-r--r-- | src/mongo/s/d_migrate.cpp | 47 |
6 files changed, 69 insertions, 37 deletions
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index b2a0d11c50b..eb6d9c11677 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -38,7 +38,10 @@ namespace mongo { Balancer::~Balancer() { } - int Balancer::_moveChunks( const vector<CandidateChunkPtr>* candidateChunks , bool secondaryThrottle ) { + int Balancer::_moveChunks(const vector<CandidateChunkPtr>* candidateChunks, + bool secondaryThrottle, + bool waitForDelete) + { int movedCount = 0; for ( vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin(); it != candidateChunks->end(); ++it ) { @@ -64,7 +67,11 @@ namespace mongo { } BSONObj res; - if ( c->moveAndCommit( Shard::make( chunkInfo.to ) , Chunk::MaxChunkSize , secondaryThrottle , res ) ) { + if (c->moveAndCommit(Shard::make(chunkInfo.to), + Chunk::MaxChunkSize, + secondaryThrottle, + waitForDelete, + res)) { movedCount++; continue; } @@ -358,6 +365,14 @@ namespace mongo { LOG(1) << "*** start balancing round" << endl; + if (balancerConfig["_waitForDelete"].trueValue()) { + LOG(1) << "balancer chunk moves will wait for cleanup" << endl; + } + + if (balancerConfig["_secondaryThrottle"].trueValue()) { + LOG(1) << "balancer chunk moves will wait for secondaries" << endl; + } + vector<CandidateChunkPtr> candidateChunks; _doBalanceRound( conn.conn() , &candidateChunks ); if ( candidateChunks.size() == 0 ) { @@ -365,10 +380,11 @@ namespace mongo { _balancedLastTime = 0; } else { - _balancedLastTime = _moveChunks( &candidateChunks, - balancerConfig[SettingsFields::secondaryThrottle()].trueValue() ); + _balancedLastTime = _moveChunks(&candidateChunks, + balancerConfig[SettingsFields::secondaryThrottle()].trueValue(), + balancerConfig["_waitForDelete"].trueValue()); } - + LOG(1) << "*** end of balancing round" << endl; } diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h index fe55f928247..cd0f5047198 100644 --- a/src/mongo/s/balance.h +++ b/src/mongo/s/balance.h @@ -84,9 +84,13 @@ namespace mongo { * Issues chunk migration request, one at a time. * * @param candidateChunks possible chunks to move + * @param secondaryThrottle wait for secondaries to catch up before pushing more deletes + * @param waitForDelete wait for deletes to complete after each chunk move * @return number of chunks effectively moved */ - int _moveChunks( const vector<CandidateChunkPtr>* candidateChunks , bool secondaryThrottle ); + int _moveChunks(const vector<CandidateChunkPtr>* candidateChunks, + bool secondaryThrottle, + bool waitForDelete); /** * Marks this balancer as being live on the config server(s). diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 54b98ee5c84..fbf80a4d5d0 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -301,7 +301,12 @@ namespace mongo { return true; } - bool Chunk::moveAndCommit( const Shard& to , long long chunkSize /* bytes */, bool secondaryThrottle, BSONObj& res ) const { + bool Chunk::moveAndCommit(const Shard& to, + long long chunkSize /* bytes */, + bool secondaryThrottle, + bool waitForDelete, + BSONObj& res) const + { uassert( 10167 , "can't move shard to its current location!" , getShard() != to ); log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << _shard.toString() << " -> " << to.toString() << endl; @@ -324,7 +329,8 @@ namespace mongo { "maxChunkSizeBytes" << chunkSize << "shardId" << genID() << "configdb" << configServer.modelServer() << - "secondaryThrottle" << secondaryThrottle + "secondaryThrottle" << secondaryThrottle << + "waitForDelete" << waitForDelete ) , res ); @@ -425,6 +431,7 @@ namespace mongo { toMove->moveAndCommit( newLocation , MaxChunkSize , false , /* secondaryThrottle - small chunk, no need */ + false, /* waitForDelete - small chunk, no need */ res ) ); // update our config diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h index cdec3d32b51..86e94dbe020 100644 --- a/src/mongo/s/chunk.h +++ b/src/mongo/s/chunk.h @@ -151,10 +151,15 @@ namespace mongo { * @param to shard to move this chunk to * @param chunSize maximum number of bytes beyond which the migrate should no go trhough * @param secondaryThrottle whether during migrate all writes should block for repl + * @param waitForDelete whether chunk move should wait for cleanup or return immediately * @param res the object containing details about the migrate execution * @return true if move was successful */ - bool moveAndCommit( const Shard& to , long long chunkSize , bool secondaryThrottle, BSONObj& res ) const; + bool moveAndCommit(const Shard& to, + long long chunkSize, + bool secondaryThrottle, + bool waitForDelete, + BSONObj& res) const; /** * @return size of shard in bytes diff --git a/src/mongo/s/commands_admin.cpp b/src/mongo/s/commands_admin.cpp index 55b33e9ec91..c0043bf6309 100644 --- a/src/mongo/s/commands_admin.cpp +++ b/src/mongo/s/commands_admin.cpp @@ -697,7 +697,8 @@ namespace mongo { continue; BSONObj moveResult; - if ( ! chunk->moveAndCommit( to , Chunk::MaxChunkSize , false , moveResult ) ) { + if (!chunk->moveAndCommit(to, Chunk::MaxChunkSize, + false, true, moveResult)) { warning() << "Couldn't move chunk " << chunk << " to shard " << to << " while sharding collection " << ns << ". Reason: " << moveResult << endl; @@ -959,7 +960,11 @@ namespace mongo { tlog() << "CMD: movechunk: " << cmdObj << endl; BSONObj res; - if ( ! c->moveAndCommit( to , maxChunkSizeBytes , cmdObj["_secondaryThrottle"].trueValue() , res ) ) { + if (!c->moveAndCommit(to, + maxChunkSizeBytes, + cmdObj["_secondaryThrottle"].trueValue(), + cmdObj["_waitForDelete"].trueValue(), + res)) { errmsg = "move failed"; result.append( "cause" , res ); return false; diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index f4741c439cf..4f9eb259ae0 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -267,7 +267,8 @@ namespace mongo { class MigrateFromStatus { public: - MigrateFromStatus() : _mutex("MigrateFromStatus") , _workLock("MigrateFromStatus::workLock") { + MigrateFromStatus() : _mutex("MigrateFromStatus"), + _cleanupTickets(1) /* only one cleanup thread at once */ { _active = false; _inCriticalSection = false; _memoryUsed = 0; @@ -282,12 +283,6 @@ namespace mongo { const BSONObj& max , const BSONObj& shardKeyPattern ) { - // - // Do not hold _workLock - // - - //scoped_lock ll(_workLock); - scoped_lock l(_mutex); // reads and writes _active if (_active) { @@ -664,18 +659,12 @@ namespace mongo { bool isActive() const { return _getActive(); } void doRemove( OldDataCleanup& cleanup ) { - int it = 0; - while ( true ) { - if ( it > 20 && it % 10 == 0 ) log() << "doRemote iteration " << it << " for: " << cleanup << endl; - { - scoped_lock ll(_workLock); - if ( ! _active ) { - cleanup.doRemove(); - return; - } - } - sleepmillis( 1000 ); - } + + log() << "waiting to remove documents for " << cleanup.toString() << endl; + + ScopedTicket ticket(&_cleanupTickets); + + cleanup.doRemove(); } private: @@ -704,8 +693,8 @@ namespace mongo { list<BSONObj> _deleted; // objects deleted during clone that should be deleted later long long _memoryUsed; // bytes in _reload + _deleted - mutable mongo::mutex _workLock; // this is used to make sure only 1 thread is doing serious work - // for now, this means migrate or removing old chunk data + // this is used to make sure only a certain number of threads are doing cleanup at once. + mutable TicketHolder _cleanupTickets; bool _getActive() const { scoped_lock l(_mutex); return _active; } void _setActive( bool b ) { scoped_lock l(_mutex); _active = b; } @@ -899,6 +888,12 @@ namespace mongo { warning() << "secondaryThrottle selected but no replication" << endl; } + // Do inline deletion + bool waitForDelete = cmdObj["waitForDelete"].trueValue(); + if (waitForDelete) { + log() << "moveChunk waiting for full cleanup after move" << endl; + } + BSONObj min = cmdObj["min"].Obj(); BSONObj max = cmdObj["max"].Obj(); BSONElement shardId = cmdObj["shardId"]; @@ -1443,17 +1438,17 @@ namespace mongo { c.max = max.getOwned(); c.shardKeyPattern = shardKeyPattern.getOwned(); ClientCursor::find( ns , c.initial ); - if ( c.initial.size() ) { - log() << "forking for cleaning up chunk data" << migrateLog; + + if (!waitForDelete) { + // 7. + log() << "forking for cleanup of chunk data" << migrateLog; boost::thread t( boost::bind( &cleanupOldData , c ) ); } else { - log() << "doing delete inline" << migrateLog; // 7. + log() << "doing delete inline for cleanup of chunk data" << migrateLog; c.doRemove(); } - - } timing.done(6); |