author     Greg Studer <greg@10gen.com>  2012-10-17 16:03:53 -0400
committer  Greg Studer <greg@10gen.com>  2012-10-17 16:07:07 -0400
commit     dfb8e2604409e2879ea975c4c49ba6ffde90d795 (patch)
tree       de079ca071e899b8843422aa98b3db62a518650c
parent     83d73563c47df15373b95e4d4428e323d6005b55 (diff)
download   mongo-dfb8e2604409e2879ea975c4c49ba6ffde90d795.tar.gz
SERVER-7376 make all migration cleanup async unless explicitly specified otherwise
-rw-r--r--  src/mongo/s/balance.cpp                    | 29
-rw-r--r--  src/mongo/s/balance.h                      |  6
-rw-r--r--  src/mongo/s/chunk.cpp                      | 11
-rw-r--r--  src/mongo/s/chunk.h                        |  7
-rw-r--r--  src/mongo/s/commands_admin.cpp             |  7
-rw-r--r--  src/mongo/s/d_migrate.cpp                  | 53
-rw-r--r--  src/mongo/util/concurrency/ticketholder.h  | 15
7 files changed, 95 insertions(+), 33 deletions(-)
diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp
index 248b3e4d8d5..e12ecbf58a4 100644
--- a/src/mongo/s/balance.cpp
+++ b/src/mongo/s/balance.cpp
@@ -40,7 +40,10 @@ namespace mongo {
Balancer::~Balancer() {
}
- int Balancer::_moveChunks( const vector<CandidateChunkPtr>* candidateChunks , bool secondaryThrottle ) {
+ int Balancer::_moveChunks( const vector<CandidateChunkPtr>* candidateChunks ,
+ bool secondaryThrottle,
+ bool waitForDelete )
+ {
int movedCount = 0;
for ( vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin(); it != candidateChunks->end(); ++it ) {
@@ -66,7 +69,12 @@ namespace mongo {
}
BSONObj res;
- if ( c->moveAndCommit( Shard::make( chunkInfo.to ) , Chunk::MaxChunkSize , secondaryThrottle , res ) ) {
+ if ( c->moveAndCommit( Shard::make( chunkInfo.to ) ,
+ Chunk::MaxChunkSize ,
+ secondaryThrottle ,
+ waitForDelete,
+ res ) )
+ {
movedCount++;
continue;
}
@@ -330,7 +338,7 @@ namespace mongo {
}
sleepTime = balancerConfig["_nosleep"].trueValue() ? 30 : 6;
-
+
uassert( 13258 , "oids broken after resetting!" , _checkOIDs() );
{
@@ -349,6 +357,14 @@ namespace mongo {
LOG(1) << "*** start balancing round" << endl;
+ if (balancerConfig["_waitForDelete"].trueValue()) {
+ LOG(1) << "balancer chunk moves will wait for cleanup" << endl;
+ }
+
+ if (balancerConfig["_secondaryThrottle"].trueValue()) {
+ LOG(1) << "balancer chunk moves will wait for secondaries" << endl;
+ }
+
vector<CandidateChunkPtr> candidateChunks;
_doBalanceRound( conn.conn() , &candidateChunks );
if ( candidateChunks.size() == 0 ) {
@@ -356,9 +372,12 @@ namespace mongo {
_balancedLastTime = 0;
}
else {
- _balancedLastTime = _moveChunks( &candidateChunks, balancerConfig["_secondaryThrottle"].trueValue() );
+ _balancedLastTime =
+ _moveChunks( &candidateChunks,
+ balancerConfig["_secondaryThrottle"].trueValue(),
+ balancerConfig["_waitForDelete"].trueValue());
}
-
+
LOG(1) << "*** end of balancing round" << endl;
}
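For reference, the two new flags above are read from the balancer settings document with BSONElement::trueValue(). A minimal sketch, assuming a settings document shaped like the one the balancer keeps in config.settings (only the _secondaryThrottle and _waitForDelete field names come from the diff; the rest is illustrative):

    // Illustrative only: a settings document that would enable both new
    // behaviours for balancer-driven migrations.
    BSONObj balancerConfig = BSON( "_id" << "balancer" <<
                                   "_secondaryThrottle" << true <<
                                   "_waitForDelete" << true );

    // trueValue() returns false for missing fields, so both options default off.
    bool secondaryThrottle = balancerConfig["_secondaryThrottle"].trueValue(); // true
    bool waitForDelete     = balancerConfig["_waitForDelete"].trueValue();     // true
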
diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h
index 98221c5d4db..26b471913e0 100644
--- a/src/mongo/s/balance.h
+++ b/src/mongo/s/balance.h
@@ -82,9 +82,13 @@ namespace mongo {
* Issues chunk migration request, one at a time.
*
* @param candidateChunks possible chunks to move
+ * @param secondaryThrottle wait for secondaries to catch up before pushing more deletes
+ * @param waitForDelete wait for deletes to complete after each chunk move
* @return number of chunks effectively moved
*/
- int _moveChunks( const vector<CandidateChunkPtr>* candidateChunks , bool secondaryThrottle );
+ int _moveChunks( const vector<CandidateChunkPtr>* candidateChunks,
+ bool secondaryThrottle,
+ bool waitForDelete );
/**
* Marks this balancer as being live on the config server(s).
diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp
index 773978014dc..eace90f9e7d 100644
--- a/src/mongo/s/chunk.cpp
+++ b/src/mongo/s/chunk.cpp
@@ -301,7 +301,12 @@ namespace mongo {
return true;
}
- bool Chunk::moveAndCommit( const Shard& to , long long chunkSize /* bytes */, bool secondaryThrottle, BSONObj& res ) const {
+ bool Chunk::moveAndCommit( const Shard& to,
+ long long chunkSize /* bytes */,
+ bool secondaryThrottle,
+ bool waitForDelete,
+ BSONObj& res ) const
+ {
uassert( 10167 , "can't move shard to its current location!" , getShard() != to );
log() << "moving chunk ns: " << _manager->getns() << " moving ( " << toString() << ") " << _shard.toString() << " -> " << to.toString() << endl;
@@ -324,7 +329,8 @@ namespace mongo {
"maxChunkSizeBytes" << chunkSize <<
"shardId" << genID() <<
"configdb" << configServer.modelServer() <<
- "secondaryThrottle" << secondaryThrottle
+ "secondaryThrottle" << secondaryThrottle <<
+ "waitForDelete" << waitForDelete
) ,
res
);
@@ -425,6 +431,7 @@ namespace mongo {
toMove->moveAndCommit( newLocation ,
MaxChunkSize ,
false , /* secondaryThrottle - small chunk, no need */
+ false , /* waitForDelete - small chunk, no need */
res ) );
// update our config
diff --git a/src/mongo/s/chunk.h b/src/mongo/s/chunk.h
index 5fd01c5abe4..11f910ea52c 100644
--- a/src/mongo/s/chunk.h
+++ b/src/mongo/s/chunk.h
@@ -146,10 +146,15 @@ namespace mongo {
* @param to shard to move this chunk to
* @param chunkSize maximum number of bytes beyond which the migrate should not go through
* @param secondaryThrottle whether during migrate all writes should block for repl
+ * @param waitForDelete whether chunk move should wait for cleanup or return immediately
* @param res the object containing details about the migrate execution
* @return true if move was successful
*/
- bool moveAndCommit( const Shard& to , long long chunkSize , bool secondaryThrottle, BSONObj& res ) const;
+ bool moveAndCommit( const Shard& to,
+ long long chunkSize,
+ bool secondaryThrottle,
+ bool waitForDelete,
+ BSONObj& res ) const;
/**
* @return size of shard in bytes
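For clarity, a sketch of a call site against the new signature (the chunk, shard, and result names here are hypothetical; the parameter order matches the declaration above):

    // Hypothetical caller: move a chunk and block until the donor has deleted
    // the migrated range, without throttling writes on secondaries.
    BSONObj res;
    bool ok = chunk->moveAndCommit( toShard,
                                    Chunk::MaxChunkSize,
                                    false, /* secondaryThrottle */
                                    true,  /* waitForDelete */
                                    res );
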
diff --git a/src/mongo/s/commands_admin.cpp b/src/mongo/s/commands_admin.cpp
index c155e25066d..3ccd3e58f98 100644
--- a/src/mongo/s/commands_admin.cpp
+++ b/src/mongo/s/commands_admin.cpp
@@ -844,7 +844,12 @@ namespace mongo {
}
BSONObj res;
- if ( ! c->moveAndCommit( to , maxChunkSizeBytes , cmdObj["_secondaryThrottle"].trueValue() , res ) ) {
+ if ( ! c->moveAndCommit( to,
+ maxChunkSizeBytes,
+ cmdObj["_secondaryThrottle"].trueValue(),
+ cmdObj["_waitForDelete"].trueValue(),
+ res ) )
+ {
errmsg = "move failed";
result.append( "cause" , res );
return false;
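A hedged sketch of the mongos-side command document that would drive the new branch above; only the _secondaryThrottle and _waitForDelete field names are taken from the diff, the remaining fields are the usual moveChunk arguments with illustrative values:

    // Illustrative admin command: ask mongos to move a chunk and wait for the
    // donor's range deletion before the command returns.
    BSONObj cmdObj = BSON( "moveChunk" << "test.foo" <<
                           "find" << BSON( "x" << 1 ) <<
                           "to" << "shard0001" <<
                           "_secondaryThrottle" << true <<
                           "_waitForDelete" << true );
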
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index f5d59566b71..008cd4d6b12 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -255,7 +255,9 @@ namespace mongo {
class MigrateFromStatus {
public:
- MigrateFromStatus() : _m("MigrateFromStatus") , _workLock("MigrateFromStatus::workLock") {
+ MigrateFromStatus() : _m("MigrateFromStatus") ,
+ _cleanupTickets(1) /* only one cleanup thread at once */
+ {
_active = false;
_inCriticalSection = false;
_memoryUsed = 0;
@@ -265,7 +267,9 @@ namespace mongo {
const BSONObj& min ,
const BSONObj& max ,
const BSONObj& shardKeyPattern ) {
- scoped_lock ll(_workLock);
+
+ // Note: migrations not blocked by queued deletes using _workLock
+
scoped_lock l(_m); // reads and writes _active
verify( ! _active );
@@ -334,7 +338,7 @@ namespace mongo {
case 'd': {
- if ( getThreadName() == cleanUpThreadName ) {
+ if (getThreadName().find(cleanUpThreadName) == 0) {
// we don't want to xfer things we're cleaning
// as then they'll be deleted on TO
// which is bad
@@ -608,18 +612,12 @@ namespace mongo {
bool isActive() const { return _getActive(); }
void doRemove( OldDataCleanup& cleanup ) {
- int it = 0;
- while ( true ) {
- if ( it > 20 && it % 10 == 0 ) log() << "doRemote iteration " << it << " for: " << cleanup << endl;
- {
- scoped_lock ll(_workLock);
- if ( ! _active ) {
- cleanup.doRemove();
- return;
- }
- }
- sleepmillis( 1000 );
- }
+
+ log() << "waiting to remove documents for " << cleanup.toString() << endl;
+
+ ScopedTicket ticket(&_cleanupTickets);
+
+ cleanup.doRemove();
}
private:
@@ -646,8 +644,8 @@ namespace mongo {
list<BSONObj> _deleted; // objects deleted during clone that should be deleted later
long long _memoryUsed; // bytes in _reload + _deleted
- mutable mongo::mutex _workLock; // this is used to make sure only 1 thread is doing serious work
- // for now, this means migrate or removing old chunk data
+ // this is used to make sure only a certain number of threads are doing cleanup at once.
+ mutable TicketHolder _cleanupTickets;
bool _getActive() const { scoped_lock l(_m); return _active; }
void _setActive( bool b ) { scoped_lock l(_m); _active = b; }
@@ -667,7 +665,10 @@ namespace mongo {
};
void _cleanupOldData( OldDataCleanup cleanup ) {
- Client::initThread( cleanUpThreadName );
+
+ Client::initThread((string(cleanUpThreadName) + string("-") +
+ OID::gen().toString()).c_str());
+
if (!noauth) {
cc().getAuthenticationInfo()->authorize("local", internalSecurity.user);
}
@@ -808,6 +809,12 @@ namespace mongo {
warning() << "secondaryThrottle selected but no replication" << endl;
}
+ // Do inline deletion
+ bool waitForDelete = cmdObj["waitForDelete"].trueValue();
+ if (waitForDelete) {
+ log() << "moveChunk waiting for full cleanup after move" << endl;
+ }
+
BSONObj min = cmdObj["min"].Obj();
BSONObj max = cmdObj["max"].Obj();
BSONElement shardId = cmdObj["shardId"];
@@ -1334,17 +1341,17 @@ namespace mongo {
c.max = max.getOwned();
c.shardKeyPattern = shardKeyPattern.getOwned();
ClientCursor::find( ns , c.initial );
- if ( c.initial.size() ) {
- log() << "forking for cleaning up chunk data" << migrateLog;
+
+ if (!waitForDelete) {
+ // 7.
+ log() << "forking for cleanup of chunk data" << migrateLog;
boost::thread t( boost::bind( &cleanupOldData , c ) );
}
else {
- log() << "doing delete inline" << migrateLog;
// 7.
+ log() << "doing delete inline for cleanup of chunk data" << migrateLog;
c.doRemove();
}
-
-
}
timing.done(6);
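Taken together, the donor-side changes mean a migration no longer blocks on _workLock while old chunk data is removed; each background cleanup instead queues on the single ticket in _cleanupTickets, so range deletes run one at a time while new migrations proceed. A rough sketch of the resulting behaviour (cleanupOldData is from the diff; the two OldDataCleanup values are hypothetical):

    // Illustrative only: two migrations finishing close together. Both fork a
    // cleanup thread immediately (the async default); the second thread blocks
    // inside doRemove() on ScopedTicket until the first range delete finishes,
    // because _cleanupTickets was constructed with a single ticket.
    boost::thread firstCleanup( boost::bind( &cleanupOldData , firstChunkCleanup ) );
    boost::thread secondCleanup( boost::bind( &cleanupOldData , secondChunkCleanup ) );
    // Both moveChunk commands have already returned to mongos, unless the caller
    // passed waitForDelete, in which case c.doRemove() ran inline instead.
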
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index 036d33fb9ef..c32e283e130 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -95,6 +95,21 @@ namespace mongo {
boost::condition_variable_any _newTicket;
};
+ class ScopedTicket {
+ public:
+
+ ScopedTicket(TicketHolder* holder) : _holder(holder) {
+ _holder->waitForTicket();
+ }
+
+ ~ScopedTicket() {
+ _holder->release();
+ }
+
+ private:
+ TicketHolder* _holder;
+ };
+
class TicketHolderReleaser {
public:
TicketHolderReleaser( TicketHolder * holder ) {
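
Finally, a minimal usage sketch of the new RAII helper (the holder and function below are hypothetical; ScopedTicket itself is as declared above):

    // One ticket => at most one thread inside the guarded section at a time.
    TicketHolder cleanupTickets( 1 );

    void doThrottledCleanup() {
        ScopedTicket ticket( &cleanupTickets ); // blocks in waitForTicket()
        // ... delete the old chunk range ...
    }                                           // destructor calls release()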