author    Mark Benvenuto <mark.benvenuto@mongodb.com>  2015-06-20 00:22:50 -0400
committer Mark Benvenuto <mark.benvenuto@mongodb.com>  2015-06-20 10:56:02 -0400
commit    9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch)
tree      3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/s/d_migrate.cpp
parent    01965cf52bce6976637ecb8f4a622aeb05ab256a (diff)
download  mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/s/d_migrate.cpp')
-rw-r--r--  src/mongo/s/d_migrate.cpp  4627
1 file changed, 2290 insertions, 2337 deletions
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index 8ed12b4d082..a0893be2524 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -91,653 +91,645 @@
#include "mongo/util/startup_test.h"
// Pause while a fail point is enabled.
-#define MONGO_FP_PAUSE_WHILE(symbol) while (MONGO_FAIL_POINT(symbol)) { sleepmillis(100); }
+#define MONGO_FP_PAUSE_WHILE(symbol) \
+ while (MONGO_FAIL_POINT(symbol)) { \
+ sleepmillis(100); \
+ }
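
For illustration, a minimal sketch of how this macro is consumed (the fail point
name hangBeforeCommit is hypothetical, not one declared in this file):

    MONGO_FP_DECLARE(hangBeforeCommit);  // hypothetical fail point

    void commitPhase() {
        // Spins in 100ms sleeps for as long as a test keeps the fail point enabled.
        MONGO_FP_PAUSE_WHILE(hangBeforeCommit);
        // ... continue once the fail point is turned off ...
    }
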
namespace mongo {
- using namespace std::chrono;
- using std::list;
- using std::set;
- using std::string;
- using std::vector;
+using namespace std::chrono;
+using std::list;
+using std::set;
+using std::string;
+using std::vector;
namespace {
- const int kDefaultWTimeoutMs = 60 * 1000;
- const WriteConcernOptions DefaultWriteConcern(2, WriteConcernOptions::NONE, kDefaultWTimeoutMs);
-
- /**
- * Returns the default write concern for migration cleanup (at donor shard) and
- * cloning documents (at recipient shard).
- */
- WriteConcernOptions getDefaultWriteConcern() {
- repl::ReplicationCoordinator* replCoordinator = repl::getGlobalReplicationCoordinator();
- if (replCoordinator->getReplicationMode() ==
- mongo::repl::ReplicationCoordinator::modeReplSet) {
+const int kDefaultWTimeoutMs = 60 * 1000;
+const WriteConcernOptions DefaultWriteConcern(2, WriteConcernOptions::NONE, kDefaultWTimeoutMs);
- Status status =
- replCoordinator->checkIfWriteConcernCanBeSatisfied(DefaultWriteConcern);
- if (status.isOK()) {
- return DefaultWriteConcern;
- }
+/**
+ * Returns the default write concern for migration cleanup (at donor shard) and
+ * cloning documents (at recipient shard).
+ */
+WriteConcernOptions getDefaultWriteConcern() {
+ repl::ReplicationCoordinator* replCoordinator = repl::getGlobalReplicationCoordinator();
+ if (replCoordinator->getReplicationMode() == mongo::repl::ReplicationCoordinator::modeReplSet) {
+ Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(DefaultWriteConcern);
+ if (status.isOK()) {
+ return DefaultWriteConcern;
}
-
- return WriteConcernOptions(1, WriteConcernOptions::NONE, 0);
}
-} // namespace
-
- MONGO_FP_DECLARE(failMigrationCommit);
- MONGO_FP_DECLARE(failMigrationConfigWritePrepare);
- MONGO_FP_DECLARE(failMigrationApplyOps);
-
- Tee* migrateLog = RamLog::get("migrate");
+ return WriteConcernOptions(1, WriteConcernOptions::NONE, 0);
+}
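
A sketch of what callers observe from the two branches above (illustrative only;
the tuples mirror the WriteConcernOptions constructors used in this file):

    WriteConcernOptions wc = getDefaultWriteConcern();
    // replica set that can satisfy w:2 -> the (2, NONE, 60000ms) default
    // anything else                    -> the (1, NONE, no-timeout) fallback
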
- class MoveTimingHelper {
- public:
- MoveTimingHelper(OperationContext* txn,
- const string& where,
- const string& ns,
- BSONObj min,
- BSONObj max ,
- int total,
- string* cmdErrmsg,
- string toShard,
- string fromShard)
- : _txn(txn),
- _where(where),
- _ns(ns),
- _to(toShard),
- _from(fromShard),
- _next(0),
- _total(total),
- _cmdErrmsg(cmdErrmsg) {
-
- _b.append( "min" , min );
- _b.append( "max" , max );
- }
-
- ~MoveTimingHelper() {
- // even if logChange doesn't throw, bson does
- // sigh
- try {
- if ( !_to.empty() ){
- _b.append( "to", _to );
- }
- if ( !_from.empty() ){
- _b.append( "from", _from );
- }
- if ( _next != _total ) {
- _b.append( "note" , "aborted" );
- }
- else {
- _b.append( "note" , "success" );
- }
- if ( !_cmdErrmsg->empty() ) {
- _b.append( "errmsg" , *_cmdErrmsg );
- }
+} // namespace
+
+MONGO_FP_DECLARE(failMigrationCommit);
+MONGO_FP_DECLARE(failMigrationConfigWritePrepare);
+MONGO_FP_DECLARE(failMigrationApplyOps);
+
+Tee* migrateLog = RamLog::get("migrate");
+
+class MoveTimingHelper {
+public:
+ MoveTimingHelper(OperationContext* txn,
+ const string& where,
+ const string& ns,
+ BSONObj min,
+ BSONObj max,
+ int total,
+ string* cmdErrmsg,
+ string toShard,
+ string fromShard)
+ : _txn(txn),
+ _where(where),
+ _ns(ns),
+ _to(toShard),
+ _from(fromShard),
+ _next(0),
+ _total(total),
+ _cmdErrmsg(cmdErrmsg) {
+ _b.append("min", min);
+ _b.append("max", max);
+ }
- grid.catalogManager()->logChange(_txn,
- (string)"moveChunk." + _where,
- _ns,
- _b.obj());
+ ~MoveTimingHelper() {
+ // even if logChange doesn't throw, bson does
+ // sigh
+ try {
+ if (!_to.empty()) {
+ _b.append("to", _to);
}
- catch ( const std::exception& e ) {
- warning() << "couldn't record timing for moveChunk '" << _where << "': " << e.what() << migrateLog;
+ if (!_from.empty()) {
+ _b.append("from", _from);
+ }
+ if (_next != _total) {
+ _b.append("note", "aborted");
+ } else {
+ _b.append("note", "success");
+ }
+ if (!_cmdErrmsg->empty()) {
+ _b.append("errmsg", *_cmdErrmsg);
}
- }
- void done(int step) {
- invariant(step == ++_next);
- invariant(step <= _total);
+ grid.catalogManager()->logChange(_txn, (string) "moveChunk." + _where, _ns, _b.obj());
+ } catch (const std::exception& e) {
+ warning() << "couldn't record timing for moveChunk '" << _where << "': " << e.what()
+ << migrateLog;
+ }
+ }
- const string s = str::stream() << "step " << step << " of " << _total;
+ void done(int step) {
+ invariant(step == ++_next);
+ invariant(step <= _total);
- CurOp * op = CurOp::get(_txn);
- {
- std::lock_guard<Client> lk(*_txn->getClient());
- op->setMessage_inlock(s.c_str());
- }
+ const string s = str::stream() << "step " << step << " of " << _total;
- _b.appendNumber(s, _t.millis());
- _t.reset();
+ CurOp* op = CurOp::get(_txn);
+ {
+ std::lock_guard<Client> lk(*_txn->getClient());
+ op->setMessage_inlock(s.c_str());
}
- private:
- OperationContext* const _txn;
- Timer _t;
+ _b.appendNumber(s, _t.millis());
+ _t.reset();
+ }
- string _where;
- string _ns;
- string _to;
- string _from;
+private:
+ OperationContext* const _txn;
+ Timer _t;
- int _next;
- int _total; // expected # of steps
+ string _where;
+ string _ns;
+ string _to;
+ string _from;
- const string* _cmdErrmsg;
+ int _next;
+ int _total; // expected # of steps
- BSONObjBuilder _b;
- };
+ const string* _cmdErrmsg;
- class ChunkCommandHelper : public Command {
- public:
- ChunkCommandHelper( const char * name )
- : Command( name ) {
- }
+ BSONObjBuilder _b;
+};
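
The helper's intended call pattern, mirroring its use in MoveChunkCommand::run()
later in this file (sketch; variable names as they appear there):

    MoveTimingHelper timing(txn, "from", ns, min, max, 6 /* steps */, &errmsg,
                            toShardName, fromShardName);
    timing.done(1);  // records "step 1 of 6" elapsed millis; the destructor
                     // logs the accumulated change document via the catalog manager
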
- virtual void help( std::stringstream& help ) const {
- help << "internal - should not be called directly";
- }
- virtual bool slaveOk() const { return false; }
- virtual bool adminOnly() const { return true; }
- virtual bool isWriteCommandForConfigServer() const { return false; }
+class ChunkCommandHelper : public Command {
+public:
+ ChunkCommandHelper(const char* name) : Command(name) {}
- };
-
- bool isInRange( const BSONObj& obj ,
- const BSONObj& min ,
- const BSONObj& max ,
- const BSONObj& shardKeyPattern ) {
- ShardKeyPattern shardKey( shardKeyPattern );
- BSONObj k = shardKey.extractShardKeyFromDoc( obj );
- return k.woCompare( min ) >= 0 && k.woCompare( max ) < 0;
+ virtual void help(std::stringstream& help) const {
+ help << "internal - should not be called directly";
}
+ virtual bool slaveOk() const {
+ return false;
+ }
+ virtual bool adminOnly() const {
+ return true;
+ }
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
+ }
+};
+
+bool isInRange(const BSONObj& obj,
+ const BSONObj& min,
+ const BSONObj& max,
+ const BSONObj& shardKeyPattern) {
+ ShardKeyPattern shardKey(shardKeyPattern);
+ BSONObj k = shardKey.extractShardKeyFromDoc(obj);
+ return k.woCompare(min) >= 0 && k.woCompare(max) < 0;
+}
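
A minimal sketch of the half-open [min, max) semantics, with assumed sample
values (keyPattern, minKey, and maxKey are invented for illustration):

    const BSONObj keyPattern = BSON("x" << 1);
    const BSONObj minKey = BSON("x" << 10);
    const BSONObj maxKey = BSON("x" << 20);
    invariant(isInRange(BSON("x" << 10), minKey, maxKey, keyPattern));   // min is inclusive
    invariant(isInRange(BSON("x" << 19), minKey, maxKey, keyPattern));
    invariant(!isInRange(BSON("x" << 20), minKey, maxKey, keyPattern));  // max is exclusive
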
- class MigrateFromStatus {
- public:
- MigrateFromStatus():
- _inCriticalSection(false),
- _memoryUsed(0),
- _active(false) {
- }
-
- /**
-         * @return false if cannot start. One of the reasons for not being able to
-         * start is that there is already an existing migration in progress.
- */
- bool start(OperationContext* txn,
- const std::string& ns,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern) {
- verify(!min.isEmpty());
- verify(!max.isEmpty());
- verify(!ns.empty());
+class MigrateFromStatus {
+public:
+ MigrateFromStatus() : _inCriticalSection(false), _memoryUsed(0), _active(false) {}
- // Get global shared to synchronize with logOp. Also see comments in the class
- // members declaration for more details.
- Lock::GlobalRead globalShared(txn->lockState());
- std::lock_guard<std::mutex> lk(_mutex);
+ /**
+     * @return false if cannot start. One of the reasons for not being able to
+     * start is that there is already an existing migration in progress.
+ */
+ bool start(OperationContext* txn,
+ const std::string& ns,
+ const BSONObj& min,
+ const BSONObj& max,
+ const BSONObj& shardKeyPattern) {
+ verify(!min.isEmpty());
+ verify(!max.isEmpty());
+ verify(!ns.empty());
+
+ // Get global shared to synchronize with logOp. Also see comments in the class
+ // members declaration for more details.
+ Lock::GlobalRead globalShared(txn->lockState());
+ std::lock_guard<std::mutex> lk(_mutex);
+
+ if (_active) {
+ return false;
+ }
- if (_active) {
- return false;
- }
+ _ns = ns;
+ _min = min;
+ _max = max;
+ _shardKeyPattern = shardKeyPattern;
- _ns = ns;
- _min = min;
- _max = max;
- _shardKeyPattern = shardKeyPattern;
+ verify(_deleted.size() == 0);
+ verify(_reload.size() == 0);
+ verify(_memoryUsed == 0);
- verify(_deleted.size() == 0);
- verify(_reload.size() == 0);
- verify(_memoryUsed == 0);
+ _active = true;
- _active = true;
+ std::lock_guard<std::mutex> tLock(_cloneLocsMutex);
+ verify(_cloneLocs.size() == 0);
- std::lock_guard<std::mutex> tLock(_cloneLocsMutex);
- verify(_cloneLocs.size() == 0);
+ return true;
+ }
- return true;
- }
+ void done(OperationContext* txn) {
+ log() << "MigrateFromStatus::done About to acquire global lock to exit critical "
+ "section";
- void done(OperationContext* txn) {
- log() << "MigrateFromStatus::done About to acquire global lock to exit critical "
- "section";
+ // Get global shared to synchronize with logOp. Also see comments in the class
+ // members declaration for more details.
+ Lock::GlobalRead globalShared(txn->lockState());
+ std::lock_guard<std::mutex> lk(_mutex);
- // Get global shared to synchronize with logOp. Also see comments in the class
- // members declaration for more details.
- Lock::GlobalRead globalShared(txn->lockState());
- std::lock_guard<std::mutex> lk(_mutex);
+ _active = false;
+ _deleteNotifyExec.reset(NULL);
+ _inCriticalSection = false;
+ _inCriticalSectionCV.notify_all();
- _active = false;
- _deleteNotifyExec.reset( NULL );
- _inCriticalSection = false;
- _inCriticalSectionCV.notify_all();
+ _deleted.clear();
+ _reload.clear();
+ _memoryUsed = 0;
- _deleted.clear();
- _reload.clear();
- _memoryUsed = 0;
+ std::lock_guard<std::mutex> cloneLock(_cloneLocsMutex);
+ _cloneLocs.clear();
+ }
- std::lock_guard<std::mutex> cloneLock(_cloneLocsMutex);
- _cloneLocs.clear();
+ void logOp(OperationContext* txn,
+ const char* opstr,
+ const char* ns,
+ const BSONObj& obj,
+ BSONObj* patt,
+ bool notInActiveChunk) {
+ ensureShardVersionOKOrThrow(txn->getClient(), ns);
+
+ const char op = opstr[0];
+
+ if (notInActiveChunk) {
+ // Ignore writes that came from the migration process like cleanup so they
+ // won't be transferred to the recipient shard. Also ignore ops from
+ // _migrateClone and _transferMods since it is impossible to move a chunk
+ // to self.
+ return;
}
- void logOp(OperationContext* txn,
- const char* opstr,
- const char* ns,
- const BSONObj& obj,
- BSONObj* patt,
- bool notInActiveChunk) {
- ensureShardVersionOKOrThrow(txn->getClient(), ns);
+ dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
- const char op = opstr[0];
+ if (!_active)
+ return;
- if (notInActiveChunk) {
- // Ignore writes that came from the migration process like cleanup so they
- // won't be transferred to the recipient shard. Also ignore ops from
- // _migrateClone and _transferMods since it is impossible to move a chunk
- // to self.
- return;
- }
+ if (_ns != ns)
+ return;
- dassert(txn->lockState()->isWriteLocked()); // Must have Global IX.
+ // no need to log if this is not an insertion, an update, or an actual deletion
+ // note: opstr 'db' isn't a deletion but a mention that a database exists
+ // (for replication machinery mostly).
+ if (op == 'n' || op == 'c' || (op == 'd' && opstr[1] == 'b'))
+ return;
- if (!_active)
- return;
+ BSONElement ide;
+ if (patt)
+ ide = patt->getField("_id");
+ else
+ ide = obj["_id"];
- if (_ns != ns)
- return;
+ if (ide.eoo()) {
+ warning() << "logOpForSharding got mod with no _id, ignoring obj: " << obj
+ << migrateLog;
+ return;
+ }
- // no need to log if this is not an insertion, an update, or an actual deletion
- // note: opstr 'db' isn't a deletion but a mention that a database exists
- // (for replication machinery mostly).
- if (op == 'n' || op == 'c' || (op == 'd' && opstr[1] == 'b'))
- return;
+ if (op == 'i' && (!isInRange(obj, _min, _max, _shardKeyPattern))) {
+ return;
+ }
- BSONElement ide;
- if (patt)
- ide = patt->getField("_id");
- else
- ide = obj["_id"];
+ BSONObj idObj(ide.wrap());
- if (ide.eoo()) {
- warning() << "logOpForSharding got mod with no _id, ignoring obj: "
- << obj << migrateLog;
+ if (op == 'u') {
+ BSONObj fullDoc;
+ OldClientContext ctx(txn, _ns, false);
+ if (!Helpers::findById(txn, ctx.db(), _ns.c_str(), idObj, fullDoc)) {
+ warning() << "logOpForSharding couldn't find: " << idObj
+ << " even though should have" << migrateLog;
+ dassert(false); // TODO: Abort the migration.
return;
}
- if (op == 'i' && (!isInRange(obj, _min, _max, _shardKeyPattern))) {
+ if (!isInRange(fullDoc, _min, _max, _shardKeyPattern)) {
return;
}
+ }
- BSONObj idObj(ide.wrap());
+ // Note: can't check if delete is in active chunk since the document is gone!
- if (op == 'u') {
- BSONObj fullDoc;
- OldClientContext ctx(txn, _ns, false);
- if (!Helpers::findById(txn, ctx.db(), _ns.c_str(), idObj, fullDoc)) {
- warning() << "logOpForSharding couldn't find: " << idObj
- << " even though should have" << migrateLog;
- dassert(false); // TODO: Abort the migration.
- return;
- }
+ txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, op));
+ }
- if (!isInRange(fullDoc, _min, _max, _shardKeyPattern)) {
- return;
+ /**
+ * Insert items from docIdList to a new array with the given fieldName in the given
+ * builder. If explode is true, the inserted object will be the full version of the
+     * document. Note that whenever an item from docIdList is inserted into the array,
+     * it is also removed from docIdList.
+ *
+ * Should be holding the collection lock for ns if explode is true.
+ */
+ void xfer(OperationContext* txn,
+ const string& ns,
+ Database* db,
+ list<BSONObj>* docIdList,
+ BSONObjBuilder& builder,
+ const char* fieldName,
+ long long& size,
+ bool explode) {
+ const long long maxSize = 1024 * 1024;
+
+ if (docIdList->size() == 0 || size > maxSize)
+ return;
+
+ BSONArrayBuilder arr(builder.subarrayStart(fieldName));
+
+ list<BSONObj>::iterator docIdIter = docIdList->begin();
+ while (docIdIter != docIdList->end() && size < maxSize) {
+ BSONObj idDoc = *docIdIter;
+ if (explode) {
+ BSONObj fullDoc;
+ if (Helpers::findById(txn, db, ns.c_str(), idDoc, fullDoc)) {
+ arr.append(fullDoc);
+ size += fullDoc.objsize();
}
+ } else {
+ arr.append(idDoc);
+ size += idDoc.objsize();
}
- // Note: can't check if delete is in active chunk since the document is gone!
-
- txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, op));
+ docIdIter = docIdList->erase(docIdIter);
}
- /**
- * Insert items from docIdList to a new array with the given fieldName in the given
- * builder. If explode is true, the inserted object will be the full version of the
-         * document. Note that whenever an item from docIdList is inserted into the array,
-         * it is also removed from docIdList.
- *
- * Should be holding the collection lock for ns if explode is true.
- */
- void xfer(OperationContext* txn,
- const string& ns,
- Database* db,
- list<BSONObj> *docIdList,
- BSONObjBuilder& builder,
- const char* fieldName,
- long long& size,
- bool explode) {
- const long long maxSize = 1024 * 1024;
-
- if (docIdList->size() == 0 || size > maxSize)
- return;
+ arr.done();
+ }
- BSONArrayBuilder arr(builder.subarrayStart(fieldName));
+ /**
+ * called from the dest of a migrate
+ * transfers mods from src to dest
+ */
+ bool transferMods(OperationContext* txn, string& errmsg, BSONObjBuilder& b) {
+ long long size = 0;
- list<BSONObj>::iterator docIdIter = docIdList->begin();
- while (docIdIter != docIdList->end() && size < maxSize) {
- BSONObj idDoc = *docIdIter;
- if (explode) {
- BSONObj fullDoc;
- if (Helpers::findById(txn, db, ns.c_str(), idDoc, fullDoc)) {
- arr.append( fullDoc );
- size += fullDoc.objsize();
- }
- }
- else {
- arr.append(idDoc);
- size += idDoc.objsize();
- }
+ {
+ AutoGetCollectionForRead ctx(txn, getNS());
- docIdIter = docIdList->erase(docIdIter);
+ std::lock_guard<std::mutex> sl(_mutex);
+ if (!_active) {
+ errmsg = "no active migration!";
+ return false;
}
- arr.done();
+ // TODO: fix SERVER-16540 race
+ xfer(txn, _ns, ctx.getDb(), &_deleted, b, "deleted", size, false);
+ xfer(txn, _ns, ctx.getDb(), &_reload, b, "reload", size, true);
}
- /**
- * called from the dest of a migrate
- * transfers mods from src to dest
- */
- bool transferMods(OperationContext* txn, string& errmsg, BSONObjBuilder& b) {
- long long size = 0;
-
- {
- AutoGetCollectionForRead ctx(txn, getNS());
-
- std::lock_guard<std::mutex> sl(_mutex);
- if (!_active) {
- errmsg = "no active migration!";
- return false;
- }
+ b.append("size", size);
- // TODO: fix SERVER-16540 race
- xfer(txn, _ns, ctx.getDb(), &_deleted, b, "deleted", size, false);
- xfer(txn, _ns, ctx.getDb(), &_reload, b, "reload", size, true);
- }
-
- b.append( "size" , size );
+ return true;
+ }
- return true;
+ /**
+ * Get the disklocs that belong to the chunk migrated and sort them in _cloneLocs
+ * (to avoid seeking disk later).
+ *
+ * @param maxChunkSize number of bytes beyond which a chunk's base data (no indices)
+ * is considered too large to move.
+     * @param errmsg filled with textual description of error if this call returns false.
+ * @return false if approximate chunk size is too big to move or true otherwise.
+ */
+ bool storeCurrentLocs(OperationContext* txn,
+ long long maxChunkSize,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ AutoGetCollectionForRead ctx(txn, getNS());
+ Collection* collection = ctx.getCollection();
+ if (!collection) {
+ errmsg = "ns not found, should be impossible";
+ return false;
}
- /**
- * Get the disklocs that belong to the chunk migrated and sort them in _cloneLocs
- * (to avoid seeking disk later).
- *
- * @param maxChunkSize number of bytes beyond which a chunk's base data (no indices)
- * is considered too large to move.
-         * @param errmsg filled with textual description of error if this call returns false.
- * @return false if approximate chunk size is too big to move or true otherwise.
- */
- bool storeCurrentLocs(OperationContext* txn,
- long long maxChunkSize,
- string& errmsg,
- BSONObjBuilder& result ) {
- AutoGetCollectionForRead ctx(txn, getNS());
- Collection* collection = ctx.getCollection();
- if ( !collection ) {
- errmsg = "ns not found, should be impossible";
- return false;
- }
-
- // Allow multiKey based on the invariant that shard keys must be single-valued.
- // Therefore, any multi-key index prefixed by shard key cannot be multikey over
- // the shard key fields.
- IndexDescriptor *idx =
- collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn,
- _shardKeyPattern ,
- false); // requireSingleKey
+ // Allow multiKey based on the invariant that shard keys must be single-valued.
+ // Therefore, any multi-key index prefixed by shard key cannot be multikey over
+ // the shard key fields.
+ IndexDescriptor* idx =
+ collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn,
+ _shardKeyPattern,
+ false); // requireSingleKey
+
+ if (idx == NULL) {
+ errmsg = str::stream() << "can't find index with prefix " << _shardKeyPattern
+ << " in storeCurrentLocs for " << _ns;
+ return false;
+ }
- if (idx == NULL) {
- errmsg = str::stream() << "can't find index with prefix " << _shardKeyPattern
- << " in storeCurrentLocs for " << _ns;
- return false;
- }
+ // Assume both min and max non-empty, append MinKey's to make them fit chosen index
+ BSONObj min;
+ BSONObj max;
+ KeyPattern kp(idx->keyPattern());
- // Assume both min and max non-empty, append MinKey's to make them fit chosen index
- BSONObj min;
- BSONObj max;
- KeyPattern kp(idx->keyPattern());
+ {
+ // It's alright not to lock _mutex all the way through based on the assumption
+ // that this is only called by the main thread that drives the migration and
+ // only it can start and stop the current migration.
+ std::lock_guard<std::mutex> sl(_mutex);
- {
- // It's alright not to lock _mutex all the way through based on the assumption
- // that this is only called by the main thread that drives the migration and
- // only it can start and stop the current migration.
- std::lock_guard<std::mutex> sl(_mutex);
+ invariant(_deleteNotifyExec.get() == NULL);
+ WorkingSet* ws = new WorkingSet();
+ DeleteNotificationStage* dns = new DeleteNotificationStage();
+ PlanExecutor* deleteNotifyExec;
+ // Takes ownership of 'ws' and 'dns'.
+ Status execStatus = PlanExecutor::make(
+ txn, ws, dns, collection, PlanExecutor::YIELD_MANUAL, &deleteNotifyExec);
+ invariant(execStatus.isOK());
+ deleteNotifyExec->registerExec();
+ _deleteNotifyExec.reset(deleteNotifyExec);
+
+ min = Helpers::toKeyFormat(kp.extendRangeBound(_min, false));
+ max = Helpers::toKeyFormat(kp.extendRangeBound(_max, false));
+ }
- invariant( _deleteNotifyExec.get() == NULL );
- WorkingSet* ws = new WorkingSet();
- DeleteNotificationStage* dns = new DeleteNotificationStage();
- PlanExecutor* deleteNotifyExec;
- // Takes ownership of 'ws' and 'dns'.
- Status execStatus = PlanExecutor::make(txn,
- ws,
- dns,
- collection,
- PlanExecutor::YIELD_MANUAL,
- &deleteNotifyExec);
- invariant(execStatus.isOK());
- deleteNotifyExec->registerExec();
- _deleteNotifyExec.reset(deleteNotifyExec);
-
- min = Helpers::toKeyFormat(kp.extendRangeBound(_min, false));
- max = Helpers::toKeyFormat(kp.extendRangeBound(_max, false));
- }
-
- std::unique_ptr<PlanExecutor> exec(
- InternalPlanner::indexScan(txn, collection, idx, min, max, false));
- // We can afford to yield here because any change to the base data that we might
- // miss is already being queued and will migrate in the 'transferMods' stage.
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
-            // use the average object size to estimate how many objects a full chunk would carry;
-            // do that while traversing the chunk's range using the sharding index, below.
-            // there's a fair amount of slack before we determine a chunk is too large
-            // because object sizes will vary
- unsigned long long maxRecsWhenFull;
- long long avgRecSize;
- const long long totalRecs = collection->numRecords(txn);
- if ( totalRecs > 0 ) {
- avgRecSize = collection->dataSize(txn) / totalRecs;
- maxRecsWhenFull = maxChunkSize / avgRecSize;
- maxRecsWhenFull = std::min((unsigned long long)(Chunk::MaxObjectPerChunk + 1) , 130 * maxRecsWhenFull / 100 /* slack */ );
- }
- else {
- avgRecSize = 0;
- maxRecsWhenFull = Chunk::MaxObjectPerChunk + 1;
- }
-
-            // do a full traversal of the chunk and don't stop even if we think it is a large chunk;
-            // in that case we want the accurate number of records to report
- bool isLargeChunk = false;
- unsigned long long recCount = 0;;
- RecordId dl;
- while (PlanExecutor::ADVANCED == exec->getNext(NULL, &dl)) {
- if ( ! isLargeChunk ) {
- std::lock_guard<std::mutex> lk(_cloneLocsMutex);
- _cloneLocs.insert( dl );
- }
+ std::unique_ptr<PlanExecutor> exec(
+ InternalPlanner::indexScan(txn, collection, idx, min, max, false));
+ // We can afford to yield here because any change to the base data that we might
+ // miss is already being queued and will migrate in the 'transferMods' stage.
+ exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
+
+    // use the average object size to estimate how many objects a full chunk would carry;
+    // do that while traversing the chunk's range using the sharding index, below.
+    // there's a fair amount of slack before we determine a chunk is too large
+    // because object sizes will vary
+ unsigned long long maxRecsWhenFull;
+ long long avgRecSize;
+ const long long totalRecs = collection->numRecords(txn);
+ if (totalRecs > 0) {
+ avgRecSize = collection->dataSize(txn) / totalRecs;
+ maxRecsWhenFull = maxChunkSize / avgRecSize;
+ maxRecsWhenFull = std::min((unsigned long long)(Chunk::MaxObjectPerChunk + 1),
+ 130 * maxRecsWhenFull / 100 /* slack */);
+ } else {
+ avgRecSize = 0;
+ maxRecsWhenFull = Chunk::MaxObjectPerChunk + 1;
+ }
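
A worked example of the slack arithmetic above, with assumed inputs (all
numbers invented for illustration):

    // maxChunkSize = 67108864 (64 MB), avgRecSize = 1024 (1 KB)
    // maxRecsWhenFull = 67108864 / 1024     = 65536
    // with 30% slack  = 130 * 65536 / 100   = 85196
    // final cutoff    = min(Chunk::MaxObjectPerChunk + 1, 85196)
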
- if ( ++recCount > maxRecsWhenFull ) {
- isLargeChunk = true;
- // continue on despite knowing that it will fail,
- // just to get the correct value for recCount.
- }
+    // do a full traversal of the chunk and don't stop even if we think it is a large chunk;
+    // in that case we want the accurate number of records to report
+ bool isLargeChunk = false;
+    unsigned long long recCount = 0;
+ RecordId dl;
+ while (PlanExecutor::ADVANCED == exec->getNext(NULL, &dl)) {
+ if (!isLargeChunk) {
+ std::lock_guard<std::mutex> lk(_cloneLocsMutex);
+ _cloneLocs.insert(dl);
}
- exec.reset();
- if ( isLargeChunk ) {
- std::lock_guard<std::mutex> sl(_mutex);
- warning() << "cannot move chunk: the maximum number of documents for a chunk is "
- << maxRecsWhenFull << " , the maximum chunk size is " << maxChunkSize
- << " , average document size is " << avgRecSize
- << ". Found " << recCount << " documents in chunk "
- << " ns: " << _ns << " "
- << _min << " -> " << _max << migrateLog;
-
- result.appendBool( "chunkTooBig" , true );
- result.appendNumber( "estimatedChunkSize" , (long long)(recCount * avgRecSize) );
- errmsg = "chunk too big to move";
- return false;
+ if (++recCount > maxRecsWhenFull) {
+ isLargeChunk = true;
+ // continue on despite knowing that it will fail,
+ // just to get the correct value for recCount.
}
+ }
+ exec.reset();
- log() << "moveChunk number of documents: " << cloneLocsRemaining() << migrateLog;
-
- txn->recoveryUnit()->abandonSnapshot();
- return true;
+ if (isLargeChunk) {
+ std::lock_guard<std::mutex> sl(_mutex);
+ warning() << "cannot move chunk: the maximum number of documents for a chunk is "
+ << maxRecsWhenFull << " , the maximum chunk size is " << maxChunkSize
+ << " , average document size is " << avgRecSize << ". Found " << recCount
+ << " documents in chunk "
+ << " ns: " << _ns << " " << _min << " -> " << _max << migrateLog;
+
+ result.appendBool("chunkTooBig", true);
+ result.appendNumber("estimatedChunkSize", (long long)(recCount * avgRecSize));
+ errmsg = "chunk too big to move";
+ return false;
}
- bool clone(OperationContext* txn, string& errmsg , BSONObjBuilder& result ) {
- ElapsedTracker tracker(internalQueryExecYieldIterations,
- internalQueryExecYieldPeriodMS);
+ log() << "moveChunk number of documents: " << cloneLocsRemaining() << migrateLog;
- int allocSize = 0;
- {
- AutoGetCollectionForRead ctx(txn, getNS());
+ txn->recoveryUnit()->abandonSnapshot();
+ return true;
+ }
- std::lock_guard<std::mutex> sl(_mutex);
- if (!_active) {
- errmsg = "not active";
- return false;
- }
+ bool clone(OperationContext* txn, string& errmsg, BSONObjBuilder& result) {
+ ElapsedTracker tracker(internalQueryExecYieldIterations, internalQueryExecYieldPeriodMS);
- Collection* collection = ctx.getCollection();
- if (!collection) {
- errmsg = str::stream() << "collection " << _ns << " does not exist";
- return false;
- }
+ int allocSize = 0;
+ {
+ AutoGetCollectionForRead ctx(txn, getNS());
- allocSize =
- std::min(BSONObjMaxUserSize,
- static_cast<int>((12 + collection->averageObjectSize(txn)) *
- cloneLocsRemaining()));
+ std::lock_guard<std::mutex> sl(_mutex);
+ if (!_active) {
+ errmsg = "not active";
+ return false;
}
- bool isBufferFilled = false;
- BSONArrayBuilder clonedDocsArrayBuilder(allocSize);
- while (!isBufferFilled) {
- AutoGetCollectionForRead ctx(txn, getNS());
+ Collection* collection = ctx.getCollection();
+ if (!collection) {
+ errmsg = str::stream() << "collection " << _ns << " does not exist";
+ return false;
+ }
- std::lock_guard<std::mutex> sl(_mutex);
- if (!_active) {
- errmsg = "not active";
- return false;
- }
+ allocSize = std::min(
+ BSONObjMaxUserSize,
+ static_cast<int>((12 + collection->averageObjectSize(txn)) * cloneLocsRemaining()));
+ }
- // TODO: fix SERVER-16540 race
+ bool isBufferFilled = false;
+ BSONArrayBuilder clonedDocsArrayBuilder(allocSize);
+ while (!isBufferFilled) {
+ AutoGetCollectionForRead ctx(txn, getNS());
- Collection* collection = ctx.getCollection();
+ std::lock_guard<std::mutex> sl(_mutex);
+ if (!_active) {
+ errmsg = "not active";
+ return false;
+ }
- if (!collection) {
- errmsg = str::stream() << "collection " << _ns << " does not exist";
- return false;
- }
+ // TODO: fix SERVER-16540 race
- std::lock_guard<std::mutex> lk(_cloneLocsMutex);
- set<RecordId>::iterator cloneLocsIter = _cloneLocs.begin();
- for ( ; cloneLocsIter != _cloneLocs.end(); ++cloneLocsIter) {
- if (tracker.intervalHasElapsed()) // should I yield?
- break;
+ Collection* collection = ctx.getCollection();
- RecordId dl = *cloneLocsIter;
- Snapshotted<BSONObj> doc;
- if (!collection->findDoc(txn, dl, &doc)) {
- // doc was deleted
- continue;
- }
+ if (!collection) {
+ errmsg = str::stream() << "collection " << _ns << " does not exist";
+ return false;
+ }
- // Use the builder size instead of accumulating 'doc's size so that we take
- // into consideration the overhead of BSONArray indices, and *always*
- // append one doc.
- if (clonedDocsArrayBuilder.arrSize() != 0 &&
- (clonedDocsArrayBuilder.len() + doc.value().objsize() + 1024)
- > BSONObjMaxUserSize) {
- isBufferFilled = true; // break out of outer while loop
- break;
- }
+ std::lock_guard<std::mutex> lk(_cloneLocsMutex);
+ set<RecordId>::iterator cloneLocsIter = _cloneLocs.begin();
+ for (; cloneLocsIter != _cloneLocs.end(); ++cloneLocsIter) {
+ if (tracker.intervalHasElapsed()) // should I yield?
+ break;
- clonedDocsArrayBuilder.append(doc.value());
+ RecordId dl = *cloneLocsIter;
+ Snapshotted<BSONObj> doc;
+ if (!collection->findDoc(txn, dl, &doc)) {
+ // doc was deleted
+ continue;
}
- _cloneLocs.erase(_cloneLocs.begin(), cloneLocsIter);
-
- // Note: must be holding _cloneLocsMutex, don't move this inside while condition!
- if (_cloneLocs.empty()) {
+ // Use the builder size instead of accumulating 'doc's size so that we take
+ // into consideration the overhead of BSONArray indices, and *always*
+ // append one doc.
+ if (clonedDocsArrayBuilder.arrSize() != 0 &&
+ (clonedDocsArrayBuilder.len() + doc.value().objsize() + 1024) >
+ BSONObjMaxUserSize) {
+ isBufferFilled = true; // break out of outer while loop
break;
}
+
+ clonedDocsArrayBuilder.append(doc.value());
}
- result.appendArray("objects", clonedDocsArrayBuilder.arr());
- return true;
+ _cloneLocs.erase(_cloneLocs.begin(), cloneLocsIter);
+
+ // Note: must be holding _cloneLocsMutex, don't move this inside while condition!
+ if (_cloneLocs.empty()) {
+ break;
+ }
}
- void aboutToDelete( const RecordId& dl ) {
-        // Even though above we call findDoc to check for existence,
- // that check only works for non-mmapv1 engines, and this is needed
- // for mmapv1.
+ result.appendArray("objects", clonedDocsArrayBuilder.arr());
+ return true;
+ }
- std::lock_guard<std::mutex> lk(_cloneLocsMutex);
- _cloneLocs.erase( dl );
- }
+ void aboutToDelete(const RecordId& dl) {
+        // Even though above we call findDoc to check for existence,
+ // that check only works for non-mmapv1 engines, and this is needed
+ // for mmapv1.
- std::size_t cloneLocsRemaining() {
- std::lock_guard<std::mutex> lk(_cloneLocsMutex);
- return _cloneLocs.size();
- }
+ std::lock_guard<std::mutex> lk(_cloneLocsMutex);
+ _cloneLocs.erase(dl);
+ }
- long long mbUsed() const {
- std::lock_guard<std::mutex> lk(_mutex);
- return _memoryUsed / ( 1024 * 1024 );
- }
+ std::size_t cloneLocsRemaining() {
+ std::lock_guard<std::mutex> lk(_cloneLocsMutex);
+ return _cloneLocs.size();
+ }
- bool getInCriticalSection() const {
- std::lock_guard<std::mutex> lk(_mutex);
- return _inCriticalSection;
- }
+ long long mbUsed() const {
+ std::lock_guard<std::mutex> lk(_mutex);
+ return _memoryUsed / (1024 * 1024);
+ }
- void setInCriticalSection( bool b ) {
- std::lock_guard<std::mutex> lk(_mutex);
- _inCriticalSection = b;
- _inCriticalSectionCV.notify_all();
- }
+ bool getInCriticalSection() const {
+ std::lock_guard<std::mutex> lk(_mutex);
+ return _inCriticalSection;
+ }
- std::string getNS() const {
- std::lock_guard<std::mutex> sl(_mutex);
- return _ns;
- }
+ void setInCriticalSection(bool b) {
+ std::lock_guard<std::mutex> lk(_mutex);
+ _inCriticalSection = b;
+ _inCriticalSectionCV.notify_all();
+ }
- /**
- * @return true if we are NOT in the critical section
- */
- bool waitTillNotInCriticalSection( int maxSecondsToWait ) {
- const auto deadline = system_clock::now() + seconds(maxSecondsToWait);
- std::unique_lock<std::mutex> lk(_mutex);
- while (_inCriticalSection) {
- if (std::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline))
- return false;
- }
+ std::string getNS() const {
+ std::lock_guard<std::mutex> sl(_mutex);
+ return _ns;
+ }
- return true;
+ /**
+ * @return true if we are NOT in the critical section
+ */
+ bool waitTillNotInCriticalSection(int maxSecondsToWait) {
+ const auto deadline = system_clock::now() + seconds(maxSecondsToWait);
+ std::unique_lock<std::mutex> lk(_mutex);
+ while (_inCriticalSection) {
+ if (std::cv_status::timeout == _inCriticalSectionCV.wait_until(lk, deadline))
+ return false;
}
- bool isActive() const { return _getActive(); }
+ return true;
+ }
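
A sketch of a caller backing off while the donor holds the critical section
(the 30-second bound is an assumed value, not taken from this file):

    if (!migrateFromStatus.waitTillNotInCriticalSection(30 /* maxSecondsToWait */)) {
        // Timed out: the migration commit still holds the critical section.
    }
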
- private:
- bool _getActive() const { std::lock_guard<std::mutex> lk(_mutex); return _active; }
- void _setActive( bool b ) { std::lock_guard<std::mutex> lk(_mutex); _active = b; }
+ bool isActive() const {
+ return _getActive();
+ }
+
+private:
+ bool _getActive() const {
+ std::lock_guard<std::mutex> lk(_mutex);
+ return _active;
+ }
+ void _setActive(bool b) {
+ std::lock_guard<std::mutex> lk(_mutex);
+ _active = b;
+ }
+ /**
+ * Used to commit work for LogOpForSharding. Used to keep track of changes in documents
+ * that are part of a chunk being migrated.
+ */
+ class LogOpForShardingHandler : public RecoveryUnit::Change {
+ public:
/**
- * Used to commit work for LogOpForSharding. Used to keep track of changes in documents
- * that are part of a chunk being migrated.
+ * Invariant: idObj should belong to a document that is part of the active chunk
+ * being migrated.
*/
- class LogOpForShardingHandler : public RecoveryUnit::Change {
- public:
- /**
- * Invariant: idObj should belong to a document that is part of the active chunk
- * being migrated.
- */
- LogOpForShardingHandler(MigrateFromStatus* migrateFromStatus,
- const BSONObj& idObj,
- const char op):
- _migrateFromStatus(migrateFromStatus),
- _idObj(idObj.getOwned()),
- _op(op) {
- }
-
- virtual void commit() {
- switch (_op) {
+ LogOpForShardingHandler(MigrateFromStatus* migrateFromStatus,
+ const BSONObj& idObj,
+ const char op)
+ : _migrateFromStatus(migrateFromStatus), _idObj(idObj.getOwned()), _op(op) {}
+
+ virtual void commit() {
+ switch (_op) {
case 'd': {
std::lock_guard<std::mutex> sl(_migrateFromStatus->_mutex);
_migrateFromStatus->_deleted.push_back(_idObj);
@@ -746,8 +738,7 @@ namespace {
}
case 'i':
- case 'u':
- {
+ case 'u': {
std::lock_guard<std::mutex> sl(_migrateFromStatus->_mutex);
_migrateFromStatus->_reload.push_back(_idObj);
_migrateFromStatus->_memoryUsed += _idObj.firstElement().size() + 5;
@@ -756,2184 +747,2146 @@ namespace {
default:
invariant(false);
-
- }
}
+ }
- virtual void rollback() { }
+ virtual void rollback() {}
- private:
- MigrateFromStatus* _migrateFromStatus;
- const BSONObj _idObj;
- const char _op;
- };
+ private:
+ MigrateFromStatus* _migrateFromStatus;
+ const BSONObj _idObj;
+ const char _op;
+ };
- /**
- * Used to receive invalidation notifications.
- *
- * XXX: move to the exec/ directory.
- */
- class DeleteNotificationStage : public PlanStage {
- public:
- virtual void invalidate(OperationContext* txn,
- const RecordId& dl,
- InvalidationType type);
+ /**
+ * Used to receive invalidation notifications.
+ *
+ * XXX: move to the exec/ directory.
+ */
+ class DeleteNotificationStage : public PlanStage {
+ public:
+ virtual void invalidate(OperationContext* txn, const RecordId& dl, InvalidationType type);
- virtual StageState work(WorkingSetID* out) {
- invariant( false );
- }
- virtual bool isEOF() {
- invariant( false );
- return false;
- }
- virtual void kill() {
- }
- virtual void saveState() {
- invariant( false );
- }
- virtual void restoreState(OperationContext* opCtx) {
- invariant( false );
- }
- virtual PlanStageStats* getStats() {
- invariant( false );
- return NULL;
- }
- virtual CommonStats* getCommonStats() const {
- invariant( false );
- return NULL;
- }
- virtual SpecificStats* getSpecificStats() const {
- invariant( false );
- return NULL;
- }
- virtual std::vector<PlanStage*> getChildren() const {
- vector<PlanStage*> empty;
- return empty;
- }
- virtual StageType stageType() const {
- return STAGE_NOTIFY_DELETE;
- }
- };
+ virtual StageState work(WorkingSetID* out) {
+ invariant(false);
+ }
+ virtual bool isEOF() {
+ invariant(false);
+ return false;
+ }
+ virtual void kill() {}
+ virtual void saveState() {
+ invariant(false);
+ }
+ virtual void restoreState(OperationContext* opCtx) {
+ invariant(false);
+ }
+ virtual PlanStageStats* getStats() {
+ invariant(false);
+ return NULL;
+ }
+ virtual CommonStats* getCommonStats() const {
+ invariant(false);
+ return NULL;
+ }
+ virtual SpecificStats* getSpecificStats() const {
+ invariant(false);
+ return NULL;
+ }
+ virtual std::vector<PlanStage*> getChildren() const {
+ vector<PlanStage*> empty;
+ return empty;
+ }
+ virtual StageType stageType() const {
+ return STAGE_NOTIFY_DELETE;
+ }
+ };
- //
- // All member variables are labeled with one of the following codes indicating the
- // synchronization rules for accessing them.
- //
- // (M) Must hold _mutex for access.
- // (MG) For reads, _mutex *OR* Global IX Lock must be held.
- // For writes, the _mutex *AND* (Global Shared or Exclusive Lock) must be held.
- // (C) Must hold _cloneLocsMutex for access.
- //
- // Locking order:
- //
- // Global Lock -> _mutex -> _cloneLocsMutex
+ //
+ // All member variables are labeled with one of the following codes indicating the
+ // synchronization rules for accessing them.
+ //
+ // (M) Must hold _mutex for access.
+ // (MG) For reads, _mutex *OR* Global IX Lock must be held.
+ // For writes, the _mutex *AND* (Global Shared or Exclusive Lock) must be held.
+ // (C) Must hold _cloneLocsMutex for access.
+ //
+ // Locking order:
+ //
+ // Global Lock -> _mutex -> _cloneLocsMutex
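
The acquisition order these rules imply, exactly as start() and done() above
perform it (sketch):

    Lock::GlobalRead globalShared(txn->lockState());     // 1. global lock (shared)
    std::lock_guard<std::mutex> lk(_mutex);              // 2. then _mutex
    std::lock_guard<std::mutex> tLock(_cloneLocsMutex);  // 3. then _cloneLocsMutex
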
- mutable std::mutex _mutex;
+ mutable std::mutex _mutex;
- std::condition_variable _inCriticalSectionCV; // (M)
+ std::condition_variable _inCriticalSectionCV; // (M)
- // Is migration currently in critical section. This can be used to block new writes.
- bool _inCriticalSection; // (M)
+ // Is migration currently in critical section. This can be used to block new writes.
+ bool _inCriticalSection; // (M)
- std::unique_ptr<PlanExecutor> _deleteNotifyExec; // (M)
+ std::unique_ptr<PlanExecutor> _deleteNotifyExec; // (M)
- // List of _id of documents that were modified that must be re-cloned.
- list<BSONObj> _reload; // (M)
+ // List of _id of documents that were modified that must be re-cloned.
+ list<BSONObj> _reload; // (M)
- // List of _id of documents that were deleted during clone that should be deleted later.
- list<BSONObj> _deleted; // (M)
+ // List of _id of documents that were deleted during clone that should be deleted later.
+ list<BSONObj> _deleted; // (M)
- // bytes in _reload + _deleted
- long long _memoryUsed; // (M)
+ // bytes in _reload + _deleted
+ long long _memoryUsed; // (M)
- // If a migration is currently active.
- bool _active; // (MG)
+ // If a migration is currently active.
+ bool _active; // (MG)
- string _ns; // (MG)
- BSONObj _min; // (MG)
- BSONObj _max; // (MG)
- BSONObj _shardKeyPattern; // (MG)
+ string _ns; // (MG)
+ BSONObj _min; // (MG)
+ BSONObj _max; // (MG)
+ BSONObj _shardKeyPattern; // (MG)
- mutable std::mutex _cloneLocsMutex;
+ mutable std::mutex _cloneLocsMutex;
- // List of record id that needs to be transferred from here to the other side.
- set<RecordId> _cloneLocs; // (C)
+ // List of record id that needs to be transferred from here to the other side.
+ set<RecordId> _cloneLocs; // (C)
- } migrateFromStatus;
+} migrateFromStatus;
- void MigrateFromStatus::DeleteNotificationStage::invalidate(OperationContext *txn,
- const RecordId& dl,
- InvalidationType type) {
- if ( type == INVALIDATION_DELETION ) {
- migrateFromStatus.aboutToDelete( dl );
- }
+void MigrateFromStatus::DeleteNotificationStage::invalidate(OperationContext* txn,
+ const RecordId& dl,
+ InvalidationType type) {
+ if (type == INVALIDATION_DELETION) {
+ migrateFromStatus.aboutToDelete(dl);
}
+}
- struct MigrateStatusHolder {
- MigrateStatusHolder( OperationContext* txn,
- const std::string& ns ,
- const BSONObj& min ,
- const BSONObj& max ,
- const BSONObj& shardKeyPattern )
- : _txn(txn) {
- _isAnotherMigrationActive =
- !migrateFromStatus.start(txn, ns, min, max, shardKeyPattern);
- }
- ~MigrateStatusHolder() {
- if (!_isAnotherMigrationActive) {
- migrateFromStatus.done(_txn);
- }
+struct MigrateStatusHolder {
+ MigrateStatusHolder(OperationContext* txn,
+ const std::string& ns,
+ const BSONObj& min,
+ const BSONObj& max,
+ const BSONObj& shardKeyPattern)
+ : _txn(txn) {
+ _isAnotherMigrationActive = !migrateFromStatus.start(txn, ns, min, max, shardKeyPattern);
+ }
+ ~MigrateStatusHolder() {
+ if (!_isAnotherMigrationActive) {
+ migrateFromStatus.done(_txn);
}
+ }
- bool isAnotherMigrationActive() const {
- return _isAnotherMigrationActive;
- }
+ bool isAnotherMigrationActive() const {
+ return _isAnotherMigrationActive;
+ }
- private:
- OperationContext* _txn;
- bool _isAnotherMigrationActive;
- };
+private:
+ OperationContext* _txn;
+ bool _isAnotherMigrationActive;
+};
+
+void logOpForSharding(OperationContext* txn,
+ const char* opstr,
+ const char* ns,
+ const BSONObj& obj,
+ BSONObj* patt,
+ bool notInActiveChunk) {
+ migrateFromStatus.logOp(txn, opstr, ns, obj, patt, notInActiveChunk);
+}
- void logOpForSharding(OperationContext* txn,
- const char * opstr,
- const char * ns,
- const BSONObj& obj,
- BSONObj * patt,
- bool notInActiveChunk) {
- migrateFromStatus.logOp(txn, opstr, ns, obj, patt, notInActiveChunk);
+class TransferModsCommand : public ChunkCommandHelper {
+public:
+ void help(std::stringstream& h) const {
+ h << "internal";
}
+ TransferModsCommand() : ChunkCommandHelper("_transferMods") {}
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+ bool run(OperationContext* txn,
+ const string&,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ return migrateFromStatus.transferMods(txn, errmsg, result);
+ }
+} transferModsCommand;
- class TransferModsCommand : public ChunkCommandHelper {
- public:
- void help(std::stringstream& h) const { h << "internal"; }
- TransferModsCommand() : ChunkCommandHelper( "_transferMods" ) {}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
- bool run(OperationContext* txn,
- const string&,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- return migrateFromStatus.transferMods(txn, errmsg, result);
- }
- } transferModsCommand;
-
-
- class InitialCloneCommand : public ChunkCommandHelper {
- public:
- void help(std::stringstream& h) const { h << "internal"; }
- InitialCloneCommand() : ChunkCommandHelper( "_migrateClone" ) {}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
- bool run(OperationContext* txn,
- const string&,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- return migrateFromStatus.clone(txn, errmsg, result);
- }
- } initialCloneCommand;
-
- // Tests can pause / resume moveChunk's progress at each step by enabling / disabling each fail point.
- MONGO_FP_DECLARE(moveChunkHangAtStep1);
- MONGO_FP_DECLARE(moveChunkHangAtStep2);
- MONGO_FP_DECLARE(moveChunkHangAtStep3);
- MONGO_FP_DECLARE(moveChunkHangAtStep4);
- MONGO_FP_DECLARE(moveChunkHangAtStep5);
- MONGO_FP_DECLARE(moveChunkHangAtStep6);
- /**
- * this is the main entry for moveChunk
-     * called to initiate a move
- * usually by a mongos
- * this is called on the "from" side
- *
- * Format:
- * {
- * moveChunk: "namespace",
- * from: "hostAndPort",
- * fromShard: "shardName",
- * to: "hostAndPort",
- * toShard: "shardName",
- * min: {},
- * max: {},
-     *     maxChunkSizeBytes: numeric,
- * configdb: "hostAndPort",
- *
- * // optional
- * secondaryThrottle: bool, //defaults to true.
- * writeConcern: {} // applies to individual writes.
- * }
- */
- class MoveChunkCommand : public Command {
- public:
- MoveChunkCommand() : Command( "moveChunk" ) {}
- virtual void help( std::stringstream& help ) const {
- help << "should not be calling this directly";
- }
-
- virtual bool slaveOk() const { return false; }
- virtual bool adminOnly() const { return true; }
- virtual bool isWriteCommandForConfigServer() const { return false; }
- virtual Status checkAuthForCommand(ClientBasic* client,
- const std::string& dbname,
- const BSONObj& cmdObj) {
- if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
- ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))),
- ActionType::moveChunk)) {
- return Status(ErrorCodes::Unauthorized, "Unauthorized");
- }
- return Status::OK();
- }
- virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const {
- return parseNsFullyQualified(dbname, cmdObj);
- }
-
- bool run(OperationContext* txn,
- const string& dbname,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- // 1. Parse options
- // 2. Make sure my view is complete and lock the distributed lock to ensure shard
- // metadata stability.
- // 3. Migration
- // Retrieve all RecordIds, which need to be migrated in order to do as little seeking
- // as possible during transfer. Retrieval of the RecordIds happens under a collection
- // lock, but then the collection lock is dropped. This opens up an opportunity for
- // repair or compact to invalidate these RecordIds, because these commands do not
-            //     synchronize with migration. Note that data modifications are not a problem,
- // because we are registered for change notifications.
- //
- // 4. pause till migrate caught up
- // 5. LOCK
- // a) update my config, essentially locking
- // b) finish migrate
- // c) update config server
- // d) logChange to config server
- // 6. wait for all current cursors to expire
- // 7. remove data locally
-
- // -------------------------------
-
- // 1.
- string ns = parseNs(dbname, cmdObj);
-
- // The shard addresses, redundant, but allows for validation
- string toShardHost = cmdObj["to"].str();
- string fromShardHost = cmdObj["from"].str();
-
- // The shard names
- string toShardName = cmdObj["toShard"].str();
- string fromShardName = cmdObj["fromShard"].str();
-
- // Process secondary throttle settings and assign defaults if necessary.
- BSONObj secThrottleObj;
- WriteConcernOptions writeConcern;
- Status status = writeConcern.parseSecondaryThrottle(cmdObj, &secThrottleObj);
-
- if (!status.isOK()){
- if (status.code() != ErrorCodes::WriteConcernNotDefined) {
- warning() << status.toString();
- return appendCommandStatus(result, status);
- }
+class InitialCloneCommand : public ChunkCommandHelper {
+public:
+ void help(std::stringstream& h) const {
+ h << "internal";
+ }
+ InitialCloneCommand() : ChunkCommandHelper("_migrateClone") {}
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+ bool run(OperationContext* txn,
+ const string&,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ return migrateFromStatus.clone(txn, errmsg, result);
+ }
+} initialCloneCommand;
- writeConcern = getDefaultWriteConcern();
- }
- else {
- repl::ReplicationCoordinator* replCoordinator =
- repl::getGlobalReplicationCoordinator();
+// Tests can pause / resume moveChunk's progress at each step by enabling / disabling each fail point.
+MONGO_FP_DECLARE(moveChunkHangAtStep1);
+MONGO_FP_DECLARE(moveChunkHangAtStep2);
+MONGO_FP_DECLARE(moveChunkHangAtStep3);
+MONGO_FP_DECLARE(moveChunkHangAtStep4);
+MONGO_FP_DECLARE(moveChunkHangAtStep5);
+MONGO_FP_DECLARE(moveChunkHangAtStep6);
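
Assumed test-side usage (sketch; the standard configureFailPoint command, not
code from this diff): freeze the donor at step 3, inspect state, then release:

    // db.adminCommand({configureFailPoint: "moveChunkHangAtStep3", mode: "alwaysOn"})
    // ... run moveChunk; it pauses after step 3 ...
    // db.adminCommand({configureFailPoint: "moveChunkHangAtStep3", mode: "off"})
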
- if (replCoordinator->getReplicationMode() ==
- repl::ReplicationCoordinator::modeMasterSlave &&
- writeConcern.shouldWaitForOtherNodes()) {
- warning() << "moveChunk cannot check if secondary throttle setting "
- << writeConcern.toBSON()
- << " can be enforced in a master slave configuration";
- }
+/**
+ * this is the main entry for moveChunk
+ * called to initiate a move
+ * usually by a mongos
+ * this is called on the "from" side
+ *
+ * Format:
+ * {
+ * moveChunk: "namespace",
+ * from: "hostAndPort",
+ * fromShard: "shardName",
+ * to: "hostAndPort",
+ * toShard: "shardName",
+ * min: {},
+ * max: {},
+ *     maxChunkSizeBytes: numeric,
+ * configdb: "hostAndPort",
+ *
+ * // optional
+ * secondaryThrottle: bool, //defaults to true.
+ * writeConcern: {} // applies to individual writes.
+ * }
+ */
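
Filled in with invented sample values, a request following the format above
might look like this (illustrative only; hosts, shard names, and bounds are
assumed):

    // {
    //     moveChunk: "test.users",
    //     from: "donor-host:27017",      fromShard: "shard0000",
    //     to: "recipient-host:27017",    toShard: "shard0001",
    //     min: {uid: 0},                 max: {uid: 1000},
    //     maxChunkSizeBytes: 67108864,
    //     configdb: "config-host:27019",
    //     secondaryThrottle: true,
    //     writeConcern: {w: 2, wtimeout: 60000}
    // }
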
+class MoveChunkCommand : public Command {
+public:
+ MoveChunkCommand() : Command("moveChunk") {}
+ virtual void help(std::stringstream& help) const {
+ help << "should not be calling this directly";
+ }
- Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern);
- if (!status.isOK() && status != ErrorCodes::NoReplicationEnabled) {
- warning() << status.toString();
- return appendCommandStatus(result, status);
- }
- }
+ virtual bool slaveOk() const {
+ return false;
+ }
+ virtual bool adminOnly() const {
+ return true;
+ }
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
+ }
+ virtual Status checkAuthForCommand(ClientBasic* client,
+ const std::string& dbname,
+ const BSONObj& cmdObj) {
+ if (!AuthorizationSession::get(client)->isAuthorizedForActionsOnResource(
+ ResourcePattern::forExactNamespace(NamespaceString(parseNs(dbname, cmdObj))),
+ ActionType::moveChunk)) {
+ return Status(ErrorCodes::Unauthorized, "Unauthorized");
+ }
+ return Status::OK();
+ }
+ virtual std::string parseNs(const std::string& dbname, const BSONObj& cmdObj) const {
+ return parseNsFullyQualified(dbname, cmdObj);
+ }
- if (writeConcern.shouldWaitForOtherNodes() &&
- writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) {
- // Don't allow no timeout.
- writeConcern.wTimeout = kDefaultWTimeoutMs;
- }
+ bool run(OperationContext* txn,
+ const string& dbname,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ // 1. Parse options
+ // 2. Make sure my view is complete and lock the distributed lock to ensure shard
+ // metadata stability.
+ // 3. Migration
+ // Retrieve all RecordIds, which need to be migrated in order to do as little seeking
+ // as possible during transfer. Retrieval of the RecordIds happens under a collection
+ // lock, but then the collection lock is dropped. This opens up an opportunity for
+ // repair or compact to invalidate these RecordIds, because these commands do not
+        //     synchronize with migration. Note that data modifications are not a problem,
+ // because we are registered for change notifications.
+ //
+ // 4. pause till migrate caught up
+ // 5. LOCK
+ // a) update my config, essentially locking
+ // b) finish migrate
+ // c) update config server
+ // d) logChange to config server
+ // 6. wait for all current cursors to expire
+ // 7. remove data locally
- // Do inline deletion
- bool waitForDelete = cmdObj["waitForDelete"].trueValue();
- if (waitForDelete) {
- log() << "moveChunk waiting for full cleanup after move";
- }
+ // -------------------------------
- BSONObj min = cmdObj["min"].Obj();
- BSONObj max = cmdObj["max"].Obj();
- BSONElement maxSizeElem = cmdObj["maxChunkSizeBytes"];
+ // 1.
+ string ns = parseNs(dbname, cmdObj);
- if ( ns.empty() ) {
- errmsg = "need to specify namespace in command";
- return false;
- }
+        // The shard addresses are redundant, but allow for validation
+ string toShardHost = cmdObj["to"].str();
+ string fromShardHost = cmdObj["from"].str();
- if ( toShardName.empty() ) {
- errmsg = "need to specify shard to move chunk to";
- return false;
- }
- if ( fromShardName.empty() ) {
- errmsg = "need to specify shard to move chunk from";
- return false;
- }
+ // The shard names
+ string toShardName = cmdObj["toShard"].str();
+ string fromShardName = cmdObj["fromShard"].str();
- if ( min.isEmpty() ) {
- errmsg = "need to specify a min";
- return false;
- }
+ // Process secondary throttle settings and assign defaults if necessary.
+ BSONObj secThrottleObj;
+ WriteConcernOptions writeConcern;
+ Status status = writeConcern.parseSecondaryThrottle(cmdObj, &secThrottleObj);
- if ( max.isEmpty() ) {
- errmsg = "need to specify a max";
- return false;
+ if (!status.isOK()) {
+ if (status.code() != ErrorCodes::WriteConcernNotDefined) {
+ warning() << status.toString();
+ return appendCommandStatus(result, status);
}
- if ( maxSizeElem.eoo() || ! maxSizeElem.isNumber() ) {
- errmsg = "need to specify maxChunkSizeBytes";
- return false;
- }
- const long long maxChunkSize = maxSizeElem.numberLong(); // in bytes
+ writeConcern = getDefaultWriteConcern();
+ } else {
+ repl::ReplicationCoordinator* replCoordinator = repl::getGlobalReplicationCoordinator();
- // This could be the first call that enables sharding - make sure we initialize the
- // sharding state for this shard.
- if ( ! shardingState.enabled() ) {
- if ( cmdObj["configdb"].type() != String ) {
- errmsg = "sharding not enabled";
- warning() << errmsg;
- return false;
- }
- string configdb = cmdObj["configdb"].String();
- ShardingState::initialize(configdb);
+ if (replCoordinator->getReplicationMode() ==
+ repl::ReplicationCoordinator::modeMasterSlave &&
+ writeConcern.shouldWaitForOtherNodes()) {
+ warning() << "moveChunk cannot check if secondary throttle setting "
+ << writeConcern.toBSON()
+ << " can be enforced in a master slave configuration";
}
- // Initialize our current shard name in the shard state if needed
- shardingState.gotShardName(fromShardName);
-
- // Make sure we're as up-to-date as possible with shard information
-            // This catches the case where we previously had to change a shard's host by
- // removing/adding a shard with the same name
- Shard::reloadShardInfo();
-
- ConnectionString configLoc = ConnectionString::parse(shardingState.getConfigServer(),
- errmsg);
- if (!configLoc.isValid()) {
- warning() << errmsg;
- return false;
+ Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern);
+ if (!status.isOK() && status != ErrorCodes::NoReplicationEnabled) {
+ warning() << status.toString();
+ return appendCommandStatus(result, status);
}
+ }
- MoveTimingHelper timing(txn, "from" , ns , min , max , 6 /* steps */ , &errmsg,
- toShardName, fromShardName );
+ if (writeConcern.shouldWaitForOtherNodes() &&
+ writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) {
+ // Don't allow no timeout.
+ writeConcern.wTimeout = kDefaultWTimeoutMs;
+ }
- log() << "received moveChunk request: " << cmdObj << migrateLog;
+ // Do inline deletion
+ bool waitForDelete = cmdObj["waitForDelete"].trueValue();
+ if (waitForDelete) {
+ log() << "moveChunk waiting for full cleanup after move";
+ }
- timing.done(1);
- MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep1);
+ BSONObj min = cmdObj["min"].Obj();
+ BSONObj max = cmdObj["max"].Obj();
+ BSONElement maxSizeElem = cmdObj["maxChunkSizeBytes"];
- // 2.
+ if (ns.empty()) {
+ errmsg = "need to specify namespace in command";
+ return false;
+ }
- if ( migrateFromStatus.isActive() ) {
- errmsg = "migration already in progress";
- warning() << errmsg;
- return false;
- }
+ if (toShardName.empty()) {
+ errmsg = "need to specify shard to move chunk to";
+ return false;
+ }
+ if (fromShardName.empty()) {
+ errmsg = "need to specify shard to move chunk from";
+ return false;
+ }
- //
- // Get the distributed lock
- //
+ if (min.isEmpty()) {
+ errmsg = "need to specify a min";
+ return false;
+ }
- string whyMessage(str::stream() << "migrating chunk [" << minKey << ", " << maxKey
- << ") in " << ns);
- auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(
- ns, whyMessage);
+ if (max.isEmpty()) {
+ errmsg = "need to specify a max";
+ return false;
+ }
- if (!scopedDistLock.isOK()) {
- errmsg = stream() << "could not acquire collection lock for " << ns
- << " to migrate chunk [" << minKey << "," << maxKey << ")"
- << causedBy(scopedDistLock.getStatus());
+ if (maxSizeElem.eoo() || !maxSizeElem.isNumber()) {
+ errmsg = "need to specify maxChunkSizeBytes";
+ return false;
+ }
+ const long long maxChunkSize = maxSizeElem.numberLong(); // in bytes
+ // This could be the first call that enables sharding - make sure we initialize the
+ // sharding state for this shard.
+ if (!shardingState.enabled()) {
+ if (cmdObj["configdb"].type() != String) {
+ errmsg = "sharding not enabled";
warning() << errmsg;
return false;
}
+ string configdb = cmdObj["configdb"].String();
+ ShardingState::initialize(configdb);
+ }
- BSONObj chunkInfo = BSON("min" << min << "max" << max <<
- "from" << fromShardName << "to" << toShardName);
+ // Initialize our current shard name in the shard state if needed
+ shardingState.gotShardName(fromShardName);
- grid.catalogManager()->logChange(txn, "moveChunk.start", ns, chunkInfo);
+ // Make sure we're as up-to-date as possible with shard information
+ // This catches the case where we previously had to change a shard's host by
+ // removing/adding a shard with the same name
+ Shard::reloadShardInfo();
- // Always refresh our metadata remotely
- ChunkVersion origShardVersion;
- Status refreshStatus = shardingState.refreshMetadataNow(txn, ns, &origShardVersion);
+ ConnectionString configLoc =
+ ConnectionString::parse(shardingState.getConfigServer(), errmsg);
+ if (!configLoc.isValid()) {
+ warning() << errmsg;
+ return false;
+ }
- if (!refreshStatus.isOK()) {
- errmsg = str::stream() << "moveChunk cannot start migrate of chunk "
- << "[" << minKey << "," << maxKey << ")"
- << causedBy(refreshStatus.reason());
+ MoveTimingHelper timing(
+ txn, "from", ns, min, max, 6 /* steps */, &errmsg, toShardName, fromShardName);
- warning() << errmsg;
- return false;
- }
-
- if (origShardVersion.majorVersion() == 0) {
- // It makes no sense to migrate if our version is zero and we have no chunks
- errmsg = str::stream() << "moveChunk cannot start migrate of chunk "
- << "[" << minKey << "," << maxKey << ")"
- << " with zero shard version";
+ log() << "received moveChunk request: " << cmdObj << migrateLog;
- warning() << errmsg;
- return false;
- }
+ timing.done(1);
+ MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep1);
- // From mongos >= v3.0.
- BSONElement epochElem(cmdObj["epoch"]);
- if (epochElem.type() == jstOID) {
- OID cmdEpoch = epochElem.OID();
+ // 2.
- if (cmdEpoch != origShardVersion.epoch()) {
- errmsg = str::stream() << "moveChunk cannot move chunk "
- << "[" << minKey << ","
- << maxKey << "), "
- << "collection may have been dropped. "
- << "current epoch: " << origShardVersion.epoch()
- << ", cmd epoch: " << cmdEpoch;
- warning() << errmsg;
- return false;
- }
- }
+ if (migrateFromStatus.isActive()) {
+ errmsg = "migration already in progress";
+ warning() << errmsg;
+ return false;
+ }
- // Get collection metadata
- const CollectionMetadataPtr origCollMetadata(shardingState.getCollectionMetadata(ns));
+ //
+ // Get the distributed lock
+ //
- // With nonzero shard version, we must have metadata
- invariant(NULL != origCollMetadata);
+ string whyMessage(str::stream() << "migrating chunk [" << minKey << ", " << maxKey
+ << ") in " << ns);
+ auto scopedDistLock = grid.catalogManager()->getDistLockManager()->lock(ns, whyMessage);
- ChunkVersion origCollVersion = origCollMetadata->getCollVersion();
- BSONObj shardKeyPattern = origCollMetadata->getKeyPattern();
+ if (!scopedDistLock.isOK()) {
+ errmsg = stream() << "could not acquire collection lock for " << ns
+ << " to migrate chunk [" << minKey << "," << maxKey << ")"
+ << causedBy(scopedDistLock.getStatus());
- // With nonzero shard version, we must have a coll version >= our shard version
- invariant(origCollVersion >= origShardVersion);
+ warning() << errmsg;
+ return false;
+ }
- // With nonzero shard version, we must have a shard key
- invariant(!shardKeyPattern.isEmpty());
+ BSONObj chunkInfo =
+ BSON("min" << min << "max" << max << "from" << fromShardName << "to" << toShardName);
- ChunkType origChunk;
- if (!origCollMetadata->getNextChunk(min, &origChunk)
- || origChunk.getMin().woCompare(min)
- || origChunk.getMax().woCompare(max)) {
+ grid.catalogManager()->logChange(txn, "moveChunk.start", ns, chunkInfo);
- // Our boundaries are different from those passed in
- errmsg = str::stream() << "moveChunk cannot find chunk "
- << "[" << minKey << "," << maxKey << ")"
- << " to migrate, the chunk boundaries may be stale";
+ // Always refresh our metadata remotely
+ ChunkVersion origShardVersion;
+ Status refreshStatus = shardingState.refreshMetadataNow(txn, ns, &origShardVersion);
- warning() << errmsg;
- return false;
- }
+ if (!refreshStatus.isOK()) {
+ errmsg = str::stream() << "moveChunk cannot start migrate of chunk "
+ << "[" << minKey << "," << maxKey << ")"
+ << causedBy(refreshStatus.reason());
- log() << "moveChunk request accepted at version " << origShardVersion;
+ warning() << errmsg;
+ return false;
+ }
- timing.done(2);
- MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep2);
+ if (origShardVersion.majorVersion() == 0) {
+ // It makes no sense to migrate if our version is zero and we have no chunks
+ errmsg = str::stream() << "moveChunk cannot start migrate of chunk "
+ << "[" << minKey << "," << maxKey << ")"
+ << " with zero shard version";
- // 3.
- MigrateStatusHolder statusHolder(txn, ns, min, max, shardKeyPattern);
+ warning() << errmsg;
+ return false;
+ }
- if (statusHolder.isAnotherMigrationActive()) {
- errmsg = "moveChunk is already in progress from this shard";
+ // From mongos >= v3.0.
+ BSONElement epochElem(cmdObj["epoch"]);
+ if (epochElem.type() == jstOID) {
+ OID cmdEpoch = epochElem.OID();
+
+ if (cmdEpoch != origShardVersion.epoch()) {
+ errmsg = str::stream() << "moveChunk cannot move chunk "
+ << "[" << minKey << "," << maxKey << "), "
+ << "collection may have been dropped. "
+ << "current epoch: " << origShardVersion.epoch()
+ << ", cmd epoch: " << cmdEpoch;
warning() << errmsg;
return false;
}
+ }
- ConnectionString fromShardCS;
- ConnectionString toShardCS;
+ // Get collection metadata
+ const CollectionMetadataPtr origCollMetadata(shardingState.getCollectionMetadata(ns));
- // Resolve the shard connection strings.
- {
- std::shared_ptr<Shard> fromShard =
- grid.shardRegistry()->getShard(fromShardName);
- uassert(28674,
- str::stream() << "Source shard " << fromShardName
- << " is missing. This indicates metadata corruption.",
- fromShard);
+ // With nonzero shard version, we must have metadata
+ invariant(NULL != origCollMetadata);
- fromShardCS = fromShard->getConnString();
+ ChunkVersion origCollVersion = origCollMetadata->getCollVersion();
+ BSONObj shardKeyPattern = origCollMetadata->getKeyPattern();
- std::shared_ptr<Shard> toShard = grid.shardRegistry()->getShard(toShardName);
- uassert(28675,
- str::stream() << "Destination shard " << toShardName
- << " is missing. This indicates metadata corruption.",
- toShard);
+ // With nonzero shard version, we must have a coll version >= our shard version
+ invariant(origCollVersion >= origShardVersion);
- toShardCS = toShard->getConnString();
- }
+ // With nonzero shard version, we must have a shard key
+ invariant(!shardKeyPattern.isEmpty());
- {
- // See comment at the top of the function for more information on what
- // synchronization is used here.
- if (!migrateFromStatus.storeCurrentLocs(txn, maxChunkSize, errmsg, result)) {
- warning() << errmsg;
- return false;
- }
+ ChunkType origChunk;
+ if (!origCollMetadata->getNextChunk(min, &origChunk) || origChunk.getMin().woCompare(min) ||
+ origChunk.getMax().woCompare(max)) {
+ // Our boundaries are different from those passed in
+ errmsg = str::stream() << "moveChunk cannot find chunk "
+ << "[" << minKey << "," << maxKey << ")"
+ << " to migrate, the chunk boundaries may be stale";
- ScopedDbConnection connTo(toShardCS);
- BSONObj res;
- bool ok;
-
- const bool isSecondaryThrottle(writeConcern.shouldWaitForOtherNodes());
-
- BSONObjBuilder recvChunkStartBuilder;
- recvChunkStartBuilder.append("_recvChunkStart", ns);
- recvChunkStartBuilder.append("from", fromShardCS.toString());
- recvChunkStartBuilder.append("fromShardName", fromShardName);
- recvChunkStartBuilder.append("toShardName", toShardName);
- recvChunkStartBuilder.append("min", min);
- recvChunkStartBuilder.append("max", max);
- recvChunkStartBuilder.append("shardKeyPattern", shardKeyPattern);
- recvChunkStartBuilder.append("configServer", shardingState.getConfigServer());
- recvChunkStartBuilder.append("secondaryThrottle", isSecondaryThrottle);
-
- // Follow the same convention in moveChunk.
- if (isSecondaryThrottle && !secThrottleObj.isEmpty()) {
- recvChunkStartBuilder.append("writeConcern", secThrottleObj);
- }
+ warning() << errmsg;
+ return false;
+ }
- try{
- ok = connTo->runCommand("admin", recvChunkStartBuilder.done(), res);
- }
- catch( DBException& e ){
- errmsg = str::stream() << "moveChunk could not contact to: shard "
- << toShardName << " to start transfer" << causedBy( e );
- warning() << errmsg;
- return false;
- }
+ log() << "moveChunk request accepted at version " << origShardVersion;
- connTo.done();
+ timing.done(2);
+ MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep2);
- if ( ! ok ) {
- errmsg = "moveChunk failed to engage TO-shard in the data transfer: ";
- verify( res["errmsg"].type() );
- errmsg += res["errmsg"].String();
- result.append( "cause" , res );
- warning() << errmsg;
- return false;
- }
+ // 3.
+ MigrateStatusHolder statusHolder(txn, ns, min, max, shardKeyPattern);
- }
- timing.done( 3 );
- MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep3);
+ if (statusHolder.isAnotherMigrationActive()) {
+ errmsg = "moveChunk is already in progress from this shard";
+ warning() << errmsg;
+ return false;
+ }
- // 4.
+ ConnectionString fromShardCS;
+ ConnectionString toShardCS;
- // Track last result from TO shard for sanity check
- BSONObj res;
- for ( int i=0; i<86400; i++ ) { // don't want a single chunk move to take more than a day
- invariant(!txn->lockState()->isLocked());
+ // Resolve the shard connection strings.
+ {
+ std::shared_ptr<Shard> fromShard = grid.shardRegistry()->getShard(fromShardName);
+ uassert(28674,
+ str::stream() << "Source shard " << fromShardName
+ << " is missing. This indicates metadata corruption.",
+ fromShard);
- // Exponential sleep backoff, up to 1024ms. Don't sleep much on the first few
- // iterations, since we want empty chunk migrations to be fast.
- sleepmillis(1 << std::min(i, 10));
+ fromShardCS = fromShard->getConnString();
- ScopedDbConnection conn(toShardCS);
- bool ok;
- res = BSONObj();
- try {
- ok = conn->runCommand( "admin" , BSON( "_recvChunkStatus" << 1 ) , res );
- res = res.getOwned();
- }
- catch( DBException& e ){
- errmsg = str::stream() << "moveChunk could not contact to: shard " << toShardName << " to monitor transfer" << causedBy( e );
- warning() << errmsg;
- return false;
- }
-
- conn.done();
-
- if ( res["ns"].str() != ns ||
- res["from"].str() != fromShardCS.toString() ||
- !res["min"].isABSONObj() ||
- res["min"].Obj().woCompare(min) != 0 ||
- !res["max"].isABSONObj() ||
- res["max"].Obj().woCompare(max) != 0 ) {
- // This can happen when the destination aborted the migration and
- // received another recvChunk before this thread sees the transition
- // to the abort state. This is currently possible only if multiple migrations
- // are happening at once. This is an unfortunate consequence of the shards not
- // being able to keep track of multiple incoming and outgoing migrations.
- errmsg = str::stream() << "Destination shard aborted migration, "
- "now running a new one: " << res;
- warning() << errmsg;
- return false;
- }
+ std::shared_ptr<Shard> toShard = grid.shardRegistry()->getShard(toShardName);
+ uassert(28675,
+ str::stream() << "Destination shard " << toShardName
+ << " is missing. This indicates metadata corruption.",
+ toShard);
- LOG(0) << "moveChunk data transfer progress: " << res << " my mem used: " << migrateFromStatus.mbUsed() << migrateLog;
+ toShardCS = toShard->getConnString();
+ }
- if ( ! ok || res["state"].String() == "fail" ) {
- warning() << "moveChunk error transferring data caused migration abort: " << res << migrateLog;
- errmsg = "data transfer error";
- result.append( "cause" , res );
- return false;
- }
+ {
+ // See comment at the top of the function for more information on what
+ // synchronization is used here.
+ if (!migrateFromStatus.storeCurrentLocs(txn, maxChunkSize, errmsg, result)) {
+ warning() << errmsg;
+ return false;
+ }
- if ( res["state"].String() == "steady" )
- break;
+ ScopedDbConnection connTo(toShardCS);
+ BSONObj res;
+ bool ok;
- if ( migrateFromStatus.mbUsed() > (500 * 1024 * 1024) ) {
- // This is too much memory for us to use for this so we're going to abort
- // the migrate
- ScopedDbConnection conn(toShardCS);
+ const bool isSecondaryThrottle(writeConcern.shouldWaitForOtherNodes());
- BSONObj res;
- if (!conn->runCommand("admin", BSON("_recvChunkAbort" << 1), res)) {
- warning() << "Error encountered while trying to abort migration on "
- << "destination shard" << toShardCS;
- }
+ BSONObjBuilder recvChunkStartBuilder;
+ recvChunkStartBuilder.append("_recvChunkStart", ns);
+ recvChunkStartBuilder.append("from", fromShardCS.toString());
+ recvChunkStartBuilder.append("fromShardName", fromShardName);
+ recvChunkStartBuilder.append("toShardName", toShardName);
+ recvChunkStartBuilder.append("min", min);
+ recvChunkStartBuilder.append("max", max);
+ recvChunkStartBuilder.append("shardKeyPattern", shardKeyPattern);
+ recvChunkStartBuilder.append("configServer", shardingState.getConfigServer());
+ recvChunkStartBuilder.append("secondaryThrottle", isSecondaryThrottle);
- res = res.getOwned();
- conn.done();
- error() << "aborting migrate because too much memory used res: " << res << migrateLog;
- errmsg = "aborting migrate because too much memory used";
- result.appendBool( "split" , true );
- return false;
- }
+ // Follow the same convention in moveChunk.
+ if (isSecondaryThrottle && !secThrottleObj.isEmpty()) {
+ recvChunkStartBuilder.append("writeConcern", secThrottleObj);
+ }
- txn->checkForInterrupt();
+ try {
+ ok = connTo->runCommand("admin", recvChunkStartBuilder.done(), res);
+ } catch (DBException& e) {
+ errmsg = str::stream() << "moveChunk could not contact to: shard " << toShardName
+ << " to start transfer" << causedBy(e);
+ warning() << errmsg;
+ return false;
}
- timing.done(4);
- MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep4);
+ connTo.done();
- // 5.
+ if (!ok) {
+ errmsg = "moveChunk failed to engage TO-shard in the data transfer: ";
+ verify(res["errmsg"].type());
+ errmsg += res["errmsg"].String();
+ result.append("cause", res);
+ warning() << errmsg;
+ return false;
+ }
+ }
+ timing.done(3);
+ MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep3);
- // Before we get into the critical section of the migration, let's double check
- // that the docs have been cloned, the config servers are reachable,
- // and the lock is in place.
- log() << "About to check if it is safe to enter critical section";
+ // 4.
- // Ensure all cloned docs have actually been transferred
- std::size_t locsRemaining = migrateFromStatus.cloneLocsRemaining();
- if ( locsRemaining != 0 ) {
+ // Track last result from TO shard for sanity check
+ BSONObj res;
+ for (int i = 0; i < 86400; i++) { // don't want a single chunk move to take more than a day
+ invariant(!txn->lockState()->isLocked());
- errmsg =
- str::stream() << "moveChunk cannot enter critical section before all data is"
- << " cloned, " << locsRemaining << " locs were not transferred"
- << " but to-shard reported " << res;
+ // Exponential sleep backoff, up to 1024ms. Don't sleep much on the first few
+ // iterations, since we want empty chunk migrations to be fast.
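+ // Once capped at 1024ms each iteration takes about a second, so 86400
+ // iterations bounds this loop at roughly one day.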
+ sleepmillis(1 << std::min(i, 10));
- // Should never happen, but safe to abort before critical section
- error() << errmsg << migrateLog;
- dassert( false );
+ ScopedDbConnection conn(toShardCS);
+ bool ok;
+ res = BSONObj();
+ try {
+ ok = conn->runCommand("admin", BSON("_recvChunkStatus" << 1), res);
+ res = res.getOwned();
+ } catch (DBException& e) {
+ errmsg = str::stream() << "moveChunk could not contact to: shard " << toShardName
+ << " to monitor transfer" << causedBy(e);
+ warning() << errmsg;
return false;
}
- // Ensure distributed lock still held
- Status lockStatus = scopedDistLock.getValue().checkStatus();
- if (!lockStatus.isOK()) {
- errmsg = str::stream() << "not entering migrate critical section because "
- << lockStatus.toString();
+ conn.done();
+
+ if (res["ns"].str() != ns || res["from"].str() != fromShardCS.toString() ||
+ !res["min"].isABSONObj() || res["min"].Obj().woCompare(min) != 0 ||
+ !res["max"].isABSONObj() || res["max"].Obj().woCompare(max) != 0) {
+ // This can happen when the destination aborted the migration and
+ // received another recvChunk before this thread sees the transition
+ // to the abort state. This is currently possible only if multiple migrations
+ // are happening at once. This is an unfortunate consequence of the shards not
+ // being able to keep track of multiple incoming and outgoing migrations.
+ errmsg = str::stream() << "Destination shard aborted migration, "
+ "now running a new one: " << res;
warning() << errmsg;
return false;
}
- log() << "About to enter migrate critical section";
+ LOG(0) << "moveChunk data transfer progress: " << res
+ << " my mem used: " << migrateFromStatus.mbUsed() << migrateLog;
- {
- // 5.a
- // we're under the collection lock here, so no other migrate can change maxVersion
- // or CollectionMetadata state
- migrateFromStatus.setInCriticalSection( true );
- ChunkVersion myVersion = origCollVersion;
- myVersion.incMajor();
+ if (!ok || res["state"].String() == "fail") {
+ warning() << "moveChunk error transferring data caused migration abort: " << res
+ << migrateLog;
+ errmsg = "data transfer error";
+ result.append("cause", res);
+ return false;
+ }
- {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
- verify( myVersion > shardingState.getVersion( ns ) );
+ if (res["state"].String() == "steady")
+ break;
- // bump the metadata's version up and "forget" about the chunk being moved
- // this is not the commit point but in practice the state in this shard won't
- // until the commit it done
- shardingState.donateChunk(txn, ns, min, max, myVersion);
+ if (migrateFromStatus.mbUsed() > (500 * 1024 * 1024)) {
+ // This is too much memory for us to use for this, so we're going to
+ // abort the migration
+ ScopedDbConnection conn(toShardCS);
+
+ BSONObj res;
+ if (!conn->runCommand("admin", BSON("_recvChunkAbort" << 1), res)) {
+ warning() << "Error encountered while trying to abort migration on "
+ << "destination shard" << toShardCS;
}
- log() << "moveChunk setting version to: " << myVersion << migrateLog;
+ res = res.getOwned();
+ conn.done();
+ error() << "aborting migrate because too much memory used res: " << res
+ << migrateLog;
+ errmsg = "aborting migrate because too much memory used";
+ result.appendBool("split", true);
+ return false;
+ }
- // 5.b
- // we're under the collection lock here, too, so we can undo the chunk donation because no other state change
- // could be ongoing
+ txn->checkForInterrupt();
+ }
- BSONObj res;
- bool ok;
+ timing.done(4);
+ MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep4);
- try {
- ScopedDbConnection connTo(toShardCS, 35.0);
- ok = connTo->runCommand( "admin", BSON( "_recvChunkCommit" << 1 ), res );
- connTo.done();
- }
- catch ( DBException& e ) {
- errmsg = str::stream() << "moveChunk could not contact to: shard "
- << toShardCS.toString()
- << " to commit transfer" << causedBy(e);
- warning() << errmsg;
- ok = false;
- }
+ // 5.
- if ( !ok || MONGO_FAIL_POINT(failMigrationCommit) ) {
- log() << "moveChunk migrate commit not accepted by TO-shard: " << res
- << " resetting shard version to: " << origShardVersion << migrateLog;
- {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
+ // Before we get into the critical section of the migration, let's double check
+ // that the docs have been cloned, the config servers are reachable,
+ // and the lock is in place.
+ log() << "About to check if it is safe to enter critical section";
- log() << "moveChunk collection lock acquired to reset shard version from "
- "failed migration"
- ;
+ // Ensure all cloned docs have actually been transferred
+ std::size_t locsRemaining = migrateFromStatus.cloneLocsRemaining();
+ if (locsRemaining != 0) {
+ errmsg = str::stream() << "moveChunk cannot enter critical section before all data is"
+ << " cloned, " << locsRemaining << " locs were not transferred"
+ << " but to-shard reported " << res;
- // revert the chunk manager back to the state before "forgetting" about the
- // chunk
- shardingState.undoDonateChunk(txn, ns, origCollMetadata);
- }
- log() << "Shard version successfully reset to clean up failed migration"
- ;
+ // Should never happen, but safe to abort before critical section
+ error() << errmsg << migrateLog;
+ dassert(false);
+ return false;
+ }
- errmsg = "_recvChunkCommit failed!";
- result.append( "cause", res );
- return false;
- }
+ // Ensure distributed lock still held
+ Status lockStatus = scopedDistLock.getValue().checkStatus();
+ if (!lockStatus.isOK()) {
+ errmsg = str::stream() << "not entering migrate critical section because "
+ << lockStatus.toString();
+ warning() << errmsg;
+ return false;
+ }
- log() << "moveChunk migrate commit accepted by TO-shard: " << res << migrateLog;
+ log() << "About to enter migrate critical section";
- // 5.c
+ {
+ // 5.a
+ // we're under the collection lock here, so no other migrate can change maxVersion
+ // or CollectionMetadata state
+ migrateFromStatus.setInCriticalSection(true);
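+ // Bumping the major (rather than minor) version marks a change in chunk
+ // ownership.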
+ ChunkVersion myVersion = origCollVersion;
+ myVersion.incMajor();
- // version at which the next highest lastmod will be set
- // if the chunk being moved is the last in the shard, nextVersion is that chunk's lastmod
- // otherwise the highest version is from the chunk being bumped on the FROM-shard
- ChunkVersion nextVersion;
+ {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
+ Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
+ verify(myVersion > shardingState.getVersion(ns));
- // we want to go only once to the configDB but perhaps change two chunks, the one being migrated and another
- // local one (so to bump version for the entire shard)
- // we use the 'applyOps' mechanism to group the two updates and make them safer
- // TODO pull config update code to a module
+ // bump the metadata's version up and "forget" about the chunk being moved
+ // this is not the commit point, but in practice the state in this shard
+ // won't change until the commit is done
+ shardingState.donateChunk(txn, ns, min, max, myVersion);
+ }
- BSONArrayBuilder updates;
- {
- // update for the chunk being moved
- BSONObjBuilder op;
- op.append( "op" , "u" );
- op.appendBool( "b" , false /* no upserting */ );
- op.append( "ns" , ChunkType::ConfigNS );
-
- BSONObjBuilder n( op.subobjStart( "o" ) );
- n.append(ChunkType::name(), Chunk::genID(ns, min));
- myVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod());
- n.append(ChunkType::ns(), ns);
- n.append(ChunkType::min(), min);
- n.append(ChunkType::max(), max);
- n.append(ChunkType::shard(), toShardName);
- n.done();
-
- BSONObjBuilder q( op.subobjStart( "o2" ) );
- q.append(ChunkType::name(), Chunk::genID(ns, min));
- q.done();
-
- updates.append( op.obj() );
- }
+ log() << "moveChunk setting version to: " << myVersion << migrateLog;
- nextVersion = myVersion;
+ // 5.b
+ // we're under the collection lock here, too, so we can undo the chunk
+ // donation because no other state change could be ongoing
- // if we have chunks left on the FROM shard, update the version of one of them as
- // well. we can figure that out by grabbing the metadata installed on 5.a
+ BSONObj res;
+ bool ok;
- const CollectionMetadataPtr bumpedCollMetadata( shardingState.getCollectionMetadata( ns ) );
- if( bumpedCollMetadata->getNumChunks() > 0 ) {
+ try {
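+ // (The 35.0 here is the socket timeout, in seconds, for the commit
+ // round-trip.)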
+ ScopedDbConnection connTo(toShardCS, 35.0);
+ ok = connTo->runCommand("admin", BSON("_recvChunkCommit" << 1), res);
+ connTo.done();
+ } catch (DBException& e) {
+ errmsg = str::stream() << "moveChunk could not contact to: shard "
+ << toShardCS.toString() << " to commit transfer"
+ << causedBy(e);
+ warning() << errmsg;
+ ok = false;
+ }
- // get another chunk on that shard
- ChunkType bumpChunk;
- bool chunkRes =
- bumpedCollMetadata->getNextChunk(bumpedCollMetadata->getMinKey(),
- &bumpChunk);
- BSONObj bumpMin = bumpChunk.getMin();
- BSONObj bumpMax = bumpChunk.getMax();
+ if (!ok || MONGO_FAIL_POINT(failMigrationCommit)) {
+ log() << "moveChunk migrate commit not accepted by TO-shard: " << res
+ << " resetting shard version to: " << origShardVersion << migrateLog;
+ {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
+ Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
- (void)chunkRes; // for compile warning on non-debug
- dassert(chunkRes);
- dassert( bumpMin.woCompare( min ) != 0 );
+ log() << "moveChunk collection lock acquired to reset shard version from "
+ "failed migration";
- BSONObjBuilder op;
- op.append( "op" , "u" );
- op.appendBool( "b" , false );
- op.append( "ns" , ChunkType::ConfigNS );
+ // revert the chunk manager back to the state before "forgetting" about the
+ // chunk
+ shardingState.undoDonateChunk(txn, ns, origCollMetadata);
+ }
+ log() << "Shard version successfully reset to clean up failed migration";
- nextVersion.incMinor(); // same as used on donateChunk
- BSONObjBuilder n( op.subobjStart( "o" ) );
- n.append(ChunkType::name(), Chunk::genID(ns, bumpMin));
- nextVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod());
- n.append(ChunkType::ns(), ns);
- n.append(ChunkType::min(), bumpMin);
- n.append(ChunkType::max(), bumpMax);
- n.append(ChunkType::shard(), fromShardName);
- n.done();
+ errmsg = "_recvChunkCommit failed!";
+ result.append("cause", res);
+ return false;
+ }
- BSONObjBuilder q( op.subobjStart( "o2" ) );
- q.append(ChunkType::name(), Chunk::genID(ns, bumpMin));
- q.done();
+ log() << "moveChunk migrate commit accepted by TO-shard: " << res << migrateLog;
- updates.append( op.obj() );
+ // 5.c
- log() << "moveChunk updating self version to: " << nextVersion << " through "
- << bumpMin << " -> " << bumpMax << " for collection '" << ns << "'" << migrateLog;
+ // version at which the next highest lastmod will be set
+ // if the chunk being moved is the last in the shard, nextVersion is that chunk's lastmod
+ // otherwise the highest version is from the chunk being bumped on the FROM-shard
+ ChunkVersion nextVersion;
- }
- else {
+ // we want to go only once to the configDB but perhaps change two chunks,
+ // the one being migrated and another local one (so as to bump the version
+ // for the entire shard)
+ // we use the 'applyOps' mechanism to group the two updates and make them safer
+ // TODO pull config update code to a module
- log() << "moveChunk moved last chunk out for collection '" << ns << "'" << migrateLog;
- }
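+ // Each entry appended to 'updates' below is one applyOps update op of the
+ // form { op: "u", b: false, ns: ChunkType::ConfigNS,
+ //        o: <full chunk doc: name, lastmod, ns, min, max, shard>,
+ //        o2: <query by chunk name> }.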
+ BSONArrayBuilder updates;
+ {
+ // update for the chunk being moved
+ BSONObjBuilder op;
+ op.append("op", "u");
+ op.appendBool("b", false /* no upserting */);
+ op.append("ns", ChunkType::ConfigNS);
+
+ BSONObjBuilder n(op.subobjStart("o"));
+ n.append(ChunkType::name(), Chunk::genID(ns, min));
+ myVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod());
+ n.append(ChunkType::ns(), ns);
+ n.append(ChunkType::min(), min);
+ n.append(ChunkType::max(), max);
+ n.append(ChunkType::shard(), toShardName);
+ n.done();
+
+ BSONObjBuilder q(op.subobjStart("o2"));
+ q.append(ChunkType::name(), Chunk::genID(ns, min));
+ q.done();
+
+ updates.append(op.obj());
+ }
+
+ nextVersion = myVersion;
+
+ // if we have chunks left on the FROM shard, update the version of one of them as
+ // well. we can figure that out by grabbing the metadata installed on 5.a
+
+ const CollectionMetadataPtr bumpedCollMetadata(shardingState.getCollectionMetadata(ns));
+ if (bumpedCollMetadata->getNumChunks() > 0) {
+ // get another chunk on that shard
+ ChunkType bumpChunk;
+ bool chunkRes =
+ bumpedCollMetadata->getNextChunk(bumpedCollMetadata->getMinKey(), &bumpChunk);
+ BSONObj bumpMin = bumpChunk.getMin();
+ BSONObj bumpMax = bumpChunk.getMax();
+
+ (void)chunkRes; // for compile warning on non-debug
+ dassert(chunkRes);
+ dassert(bumpMin.woCompare(min) != 0);
+
+ BSONObjBuilder op;
+ op.append("op", "u");
+ op.appendBool("b", false);
+ op.append("ns", ChunkType::ConfigNS);
+
+ nextVersion.incMinor(); // same as used on donateChunk
+ BSONObjBuilder n(op.subobjStart("o"));
+ n.append(ChunkType::name(), Chunk::genID(ns, bumpMin));
+ nextVersion.addToBSON(n, ChunkType::DEPRECATED_lastmod());
+ n.append(ChunkType::ns(), ns);
+ n.append(ChunkType::min(), bumpMin);
+ n.append(ChunkType::max(), bumpMax);
+ n.append(ChunkType::shard(), fromShardName);
+ n.done();
+
+ BSONObjBuilder q(op.subobjStart("o2"));
+ q.append(ChunkType::name(), Chunk::genID(ns, bumpMin));
+ q.done();
+
+ updates.append(op.obj());
+
+ log() << "moveChunk updating self version to: " << nextVersion << " through "
+ << bumpMin << " -> " << bumpMax << " for collection '" << ns << "'"
+ << migrateLog;
+
+ } else {
+ log() << "moveChunk moved last chunk out for collection '" << ns << "'"
+ << migrateLog;
+ }
- BSONArrayBuilder preCond;
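+ // The precondition makes the applyOps conditional: the chunk with the
+ // highest lastmod in this collection must still carry the collection
+ // version we read at the start, so the commit fails instead of clobbering
+ // a concurrent config change.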
+ BSONArrayBuilder preCond;
+ {
+ BSONObjBuilder b;
+ b.append("ns", ChunkType::ConfigNS);
+ b.append("q",
+ BSON("query" << BSON(ChunkType::ns(ns)) << "orderby"
+ << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
{
- BSONObjBuilder b;
- b.append("ns", ChunkType::ConfigNS);
- b.append("q", BSON("query" << BSON(ChunkType::ns(ns)) <<
- "orderby" << BSON(ChunkType::DEPRECATED_lastmod() << -1)));
- {
- BSONObjBuilder bb( b.subobjStart( "res" ) );
- // TODO: For backwards compatibility, we can't yet require an epoch here
- bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), origCollVersion.toLong());
- bb.done();
- }
- preCond.append( b.obj() );
+ BSONObjBuilder bb(b.subobjStart("res"));
+ // TODO: For backwards compatibility, we can't yet require an epoch here
+ bb.appendTimestamp(ChunkType::DEPRECATED_lastmod(), origCollVersion.toLong());
+ bb.done();
}
+ preCond.append(b.obj());
+ }
- Status applyOpsStatus{Status::OK()};
- try {
-
- // For testing migration failures
- if ( MONGO_FAIL_POINT(failMigrationConfigWritePrepare) ) {
- throw DBException( "mock migration failure before config write",
- PrepareConfigsFailedCode );
- }
+ Status applyOpsStatus{Status::OK()};
+ try {
+ // For testing migration failures
+ if (MONGO_FAIL_POINT(failMigrationConfigWritePrepare)) {
+ throw DBException("mock migration failure before config write",
+ PrepareConfigsFailedCode);
+ }
- applyOpsStatus = grid.catalogManager()->applyChunkOpsDeprecated(updates.arr(),
- preCond.arr());
+ applyOpsStatus =
+ grid.catalogManager()->applyChunkOpsDeprecated(updates.arr(), preCond.arr());
- if (MONGO_FAIL_POINT(failMigrationApplyOps)) {
- throw SocketException(SocketException::RECV_ERROR,
- shardingState.getConfigServer());
- }
- }
- catch (const DBException& ex) {
- warning() << ex << migrateLog;
- applyOpsStatus = ex.toStatus();
+ if (MONGO_FAIL_POINT(failMigrationApplyOps)) {
+ throw SocketException(SocketException::RECV_ERROR,
+ shardingState.getConfigServer());
}
+ } catch (const DBException& ex) {
+ warning() << ex << migrateLog;
+ applyOpsStatus = ex.toStatus();
+ }
- if (applyOpsStatus == ErrorCodes::PrepareConfigsFailedCode) {
-
- // In the process of issuing the migrate commit, the SyncClusterConnection
- // checks that the config servers are reachable. If they are not, we are
- // sure that the applyOps command was not sent to any of the configs, so we
- // can safely back out of the migration here, by resetting the shard
- // version that we bumped up to in the donateChunk() call above.
+ if (applyOpsStatus == ErrorCodes::PrepareConfigsFailedCode) {
+ // In the process of issuing the migrate commit, the SyncClusterConnection
+ // checks that the config servers are reachable. If they are not, we are
+ // sure that the applyOps command was not sent to any of the configs, so we
+ // can safely back out of the migration here, by resetting the shard
+ // version that we bumped up to in the donateChunk() call above.
- log() << "About to acquire moveChunk coll lock to reset shard version from "
- << "failed migration";
+ log() << "About to acquire moveChunk coll lock to reset shard version from "
+ << "failed migration";
- {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
+ {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
+ Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
- // Revert the metadata back to the state before "forgetting"
- // about the chunk.
- shardingState.undoDonateChunk(txn, ns, origCollMetadata);
- }
+ // Revert the metadata back to the state before "forgetting"
+ // about the chunk.
+ shardingState.undoDonateChunk(txn, ns, origCollMetadata);
+ }
- log() << "Shard version successfully reset to clean up failed migration";
+ log() << "Shard version successfully reset to clean up failed migration";
- errmsg = "Failed to send migrate commit to configs because " + errmsg;
- return false;
+ errmsg = "Failed to send migrate commit to configs because " + errmsg;
+ return false;
- }
- else if (!applyOpsStatus.isOK()) {
-
- // this could be a blip in the connectivity
- // wait out a few seconds and check if the commit request made it
- //
- // if the commit made it to the config, we'll see the chunk in the new shard and there's no action
- // if the commit did not make it, currently the only way to fix this state is to bounce the mongod so
- // that the old state (before migrating) be brought in
-
- warning() << "moveChunk commit outcome ongoing" << migrateLog;
- sleepsecs( 10 );
-
- // look for the chunk in this shard whose version got bumped
- // we assume that if that mod made it to the config, the applyOps was successful
- try {
- std::vector<ChunkType> newestChunk;
- Status status = grid.catalogManager()->getChunks(
- Query(BSON(ChunkType::ns(ns)))
- .sort(ChunkType::DEPRECATED_lastmod(), -1),
- 1,
- &newestChunk);
- uassertStatusOK(status);
-
- ChunkVersion checkVersion;
- if (!newestChunk.empty()) {
- invariant(newestChunk.size() == 1);
- checkVersion = newestChunk[0].getVersion();
- }
+ } else if (!applyOpsStatus.isOK()) {
+ // this could be a blip in the connectivity
+ // wait out a few seconds and check if the commit request made it
+ //
+ // if the commit made it to the config, we'll see the chunk in the new
+ // shard and no action is needed
+ // if the commit did not make it, currently the only way to fix this state
+ // is to bounce the mongod so that the old state (before migrating) is
+ // brought in
- if (checkVersion.equals(nextVersion)) {
- log() << "moveChunk commit confirmed" << migrateLog;
- errmsg.clear();
+ warning() << "moveChunk commit outcome ongoing" << migrateLog;
+ sleepsecs(10);
- }
- else {
- error() << "moveChunk commit failed: version is at "
- << checkVersion << " instead of " << nextVersion << migrateLog;
- error() << "TERMINATING" << migrateLog;
- dbexit( EXIT_SHARDING_ERROR );
- }
+ // look for the chunk in this shard whose version got bumped
+ // we assume that if that mod made it to the config, the applyOps was successful
+ try {
+ std::vector<ChunkType> newestChunk;
+ Status status = grid.catalogManager()->getChunks(
+ Query(BSON(ChunkType::ns(ns))).sort(ChunkType::DEPRECATED_lastmod(), -1),
+ 1,
+ &newestChunk);
+ uassertStatusOK(status);
+
+ ChunkVersion checkVersion;
+ if (!newestChunk.empty()) {
+ invariant(newestChunk.size() == 1);
+ checkVersion = newestChunk[0].getVersion();
}
- catch ( ... ) {
- error() << "moveChunk failed to get confirmation of commit" << migrateLog;
+
+ if (checkVersion.equals(nextVersion)) {
+ log() << "moveChunk commit confirmed" << migrateLog;
+ errmsg.clear();
+
+ } else {
+ error() << "moveChunk commit failed: version is at " << checkVersion
+ << " instead of " << nextVersion << migrateLog;
error() << "TERMINATING" << migrateLog;
- dbexit( EXIT_SHARDING_ERROR );
+ dbexit(EXIT_SHARDING_ERROR);
}
+ } catch (...) {
+ error() << "moveChunk failed to get confirmation of commit" << migrateLog;
+ error() << "TERMINATING" << migrateLog;
+ dbexit(EXIT_SHARDING_ERROR);
}
+ }
- migrateFromStatus.setInCriticalSection( false );
-
- // 5.d
- BSONObjBuilder commitInfo;
- commitInfo.appendElements( chunkInfo );
- if (res["counts"].type() == Object) {
- commitInfo.appendElements(res["counts"].Obj());
- }
+ migrateFromStatus.setInCriticalSection(false);
- grid.catalogManager()->logChange(txn, "moveChunk.commit", ns, commitInfo.obj());
+ // 5.d
+ BSONObjBuilder commitInfo;
+ commitInfo.appendElements(chunkInfo);
+ if (res["counts"].type() == Object) {
+ commitInfo.appendElements(res["counts"].Obj());
}
- migrateFromStatus.done(txn);
- timing.done(5);
- MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep5);
-
- // 6.
- // NOTE: It is important that the distributed collection lock be held for this step.
- RangeDeleter* deleter = getDeleter();
- RangeDeleterOptions deleterOptions(KeyRange(ns,
- min.getOwned(),
- max.getOwned(),
- shardKeyPattern));
- deleterOptions.writeConcern = writeConcern;
- deleterOptions.waitForOpenCursors = true;
- deleterOptions.fromMigrate = true;
- deleterOptions.onlyRemoveOrphanedDocs = true;
- deleterOptions.removeSaverReason = "post-cleanup";
-
- if (waitForDelete) {
- log() << "doing delete inline for cleanup of chunk data" << migrateLog;
-
- string errMsg;
- // This is an immediate delete, and as a consequence, there could be more
- // deletes happening simultaneously than there are deleter worker threads.
- if (!deleter->deleteNow(txn,
- deleterOptions,
- &errMsg)) {
- log() << "Error occured while performing cleanup: " << errMsg;
- }
- }
- else {
- log() << "forking for cleanup of chunk data" << migrateLog;
+ grid.catalogManager()->logChange(txn, "moveChunk.commit", ns, commitInfo.obj());
+ }
- string errMsg;
- if (!deleter->queueDelete(txn,
- deleterOptions,
- NULL, // Don't want to be notified.
- &errMsg)) {
- log() << "could not queue migration cleanup: " << errMsg;
- }
+ migrateFromStatus.done(txn);
+ timing.done(5);
+ MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep5);
+
+ // 6.
+ // NOTE: It is important that the distributed collection lock be held for this step.
+ RangeDeleter* deleter = getDeleter();
+ RangeDeleterOptions deleterOptions(
+ KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern));
+ deleterOptions.writeConcern = writeConcern;
+ deleterOptions.waitForOpenCursors = true;
+ deleterOptions.fromMigrate = true;
+ deleterOptions.onlyRemoveOrphanedDocs = true;
+ deleterOptions.removeSaverReason = "post-cleanup";
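+ // With onlyRemoveOrphanedDocs set, only documents that this shard's
+ // (now refreshed) metadata no longer owns will be deleted.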
+
+ if (waitForDelete) {
+ log() << "doing delete inline for cleanup of chunk data" << migrateLog;
+
+ string errMsg;
+ // This is an immediate delete, and as a consequence, there could be more
+ // deletes happening simultaneously than there are deleter worker threads.
+ if (!deleter->deleteNow(txn, deleterOptions, &errMsg)) {
+ log() << "Error occured while performing cleanup: " << errMsg;
+ }
+ } else {
+ log() << "forking for cleanup of chunk data" << migrateLog;
+
+ string errMsg;
+ if (!deleter->queueDelete(txn,
+ deleterOptions,
+ NULL, // Don't want to be notified.
+ &errMsg)) {
+ log() << "could not queue migration cleanup: " << errMsg;
}
- timing.done(6);
- MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep6);
+ }
+ timing.done(6);
+ MONGO_FP_PAUSE_WHILE(moveChunkHangAtStep6);
- return true;
+ return true;
+ }
- }
+} moveChunkCmd;
+
+bool ShardingState::inCriticalMigrateSection() {
+ return migrateFromStatus.getInCriticalSection();
+}
- } moveChunkCmd;
+bool ShardingState::waitTillNotInCriticalSection(int maxSecondsToWait) {
+ return migrateFromStatus.waitTillNotInCriticalSection(maxSecondsToWait);
+}
- bool ShardingState::inCriticalMigrateSection() {
- return migrateFromStatus.getInCriticalSection();
+/* -----
+ below this are the "to" side commands
+
+ command to initiate
+ worker thread
+ does initial clone
+ pulls initial change set
+ keeps pulling
+ keeps state
+ command to get state
+ command to "commit"
+*/
+
+// Enabling / disabling these fail points pauses / resumes MigrateStatus::_go(), the thread
+// that receives a chunk migration from the donor.
+MONGO_FP_DECLARE(migrateThreadHangAtStep1);
+MONGO_FP_DECLARE(migrateThreadHangAtStep2);
+MONGO_FP_DECLARE(migrateThreadHangAtStep3);
+MONGO_FP_DECLARE(migrateThreadHangAtStep4);
+MONGO_FP_DECLARE(migrateThreadHangAtStep5);
+
+class MigrateStatus {
+public:
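+ // Receiving-side lifecycle, roughly matching the outline above: READY ->
+ // CLONE (initial bulk copy) -> CATCHUP (pull change sets) -> STEADY
+ // (caught up) -> COMMIT_START -> DONE, with FAIL and ABORT as terminal
+ // error states.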
+ enum State { READY, CLONE, CATCHUP, STEADY, COMMIT_START, DONE, FAIL, ABORT };
+
+ MigrateStatus()
+ : _active(false),
+ _numCloned(0),
+ _clonedBytes(0),
+ _numCatchup(0),
+ _numSteady(0),
+ _state(READY) {}
+
+ void setState(State newState) {
+ std::lock_guard<std::mutex> sl(_mutex);
+ _state = newState;
}
- bool ShardingState::waitTillNotInCriticalSection( int maxSecondsToWait ) {
- return migrateFromStatus.waitTillNotInCriticalSection( maxSecondsToWait );
+ State getState() const {
+ std::lock_guard<std::mutex> sl(_mutex);
+ return _state;
}
- /* -----
- below this are the "to" side commands
+ /**
+ * Returns OK if preparation was successful.
+ */
+ Status prepare(const std::string& ns,
+ const std::string& fromShard,
+ const BSONObj& min,
+ const BSONObj& max,
+ const BSONObj& shardKeyPattern) {
+ std::lock_guard<std::mutex> lk(_mutex);
+
+ if (_active) {
+ return Status(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Active migration already in progress "
+ << "ns: " << _ns << ", from: " << _from << ", min: " << _min
+ << ", max: " << _max);
+ }
- command to initiate
- worker thread
- does initial clone
- pulls initial change set
- keeps pulling
- keeps state
- command to get state
- commend to "commit"
- */
+ _state = READY;
+ _errmsg = "";
- // Enabling / disabling these fail points pauses / resumes MigrateStatus::_go(), the thread
- // that receives a chunk migration from the donor.
- MONGO_FP_DECLARE(migrateThreadHangAtStep1);
- MONGO_FP_DECLARE(migrateThreadHangAtStep2);
- MONGO_FP_DECLARE(migrateThreadHangAtStep3);
- MONGO_FP_DECLARE(migrateThreadHangAtStep4);
- MONGO_FP_DECLARE(migrateThreadHangAtStep5);
+ _ns = ns;
+ _from = fromShard;
+ _min = min;
+ _max = max;
+ _shardKeyPattern = shardKeyPattern;
- class MigrateStatus {
- public:
- enum State {
- READY,
- CLONE,
- CATCHUP,
- STEADY,
- COMMIT_START,
- DONE,
- FAIL,
- ABORT
- };
-
- MigrateStatus():
- _active(false),
- _numCloned(0),
- _clonedBytes(0),
- _numCatchup(0),
- _numSteady(0),
- _state(READY) {
- }
-
- void setState(State newState) {
- std::lock_guard<std::mutex> sl(_mutex);
- _state = newState;
- }
+ _numCloned = 0;
+ _clonedBytes = 0;
+ _numCatchup = 0;
+ _numSteady = 0;
- State getState() const {
- std::lock_guard<std::mutex> sl(_mutex);
- return _state;
- }
+ _active = true;
- /**
- * Returns OK if preparation was successful.
- */
- Status prepare(const std::string& ns,
- const std::string& fromShard,
- const BSONObj& min,
- const BSONObj& max,
- const BSONObj& shardKeyPattern) {
- std::lock_guard<std::mutex> lk(_mutex);
-
- if (_active) {
- return Status(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Active migration already in progress "
- << "ns: " << _ns
- << ", from: " << _from
- << ", min: " << _min
- << ", max: " << _max);
- }
-
- _state = READY;
- _errmsg = "";
-
- _ns = ns;
- _from = fromShard;
- _min = min;
- _max = max;
- _shardKeyPattern = shardKeyPattern;
-
- _numCloned = 0;
- _clonedBytes = 0;
- _numCatchup = 0;
- _numSteady = 0;
-
- _active = true;
-
- return Status::OK();
- }
-
- void go(OperationContext* txn,
- const std::string& ns,
- BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- const std::string& fromShard,
- const OID& epoch,
- const WriteConcernOptions& writeConcern) {
- try {
- _go(txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
- }
- catch ( std::exception& e ) {
- {
- std::lock_guard<std::mutex> sl(_mutex);
- _state = FAIL;
- _errmsg = e.what();
- }
+ return Status::OK();
+ }
- error() << "migrate failed: " << e.what() << migrateLog;
+ void go(OperationContext* txn,
+ const std::string& ns,
+ BSONObj min,
+ BSONObj max,
+ BSONObj shardKeyPattern,
+ const std::string& fromShard,
+ const OID& epoch,
+ const WriteConcernOptions& writeConcern) {
+ try {
+ _go(txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
+ } catch (std::exception& e) {
+ {
+ std::lock_guard<std::mutex> sl(_mutex);
+ _state = FAIL;
+ _errmsg = e.what();
}
- catch ( ... ) {
- {
- std::lock_guard<std::mutex> sl(_mutex);
- _state = FAIL;
- _errmsg = "UNKNOWN ERROR";
- }
- error() << "migrate failed with unknown exception" << migrateLog;
+ error() << "migrate failed: " << e.what() << migrateLog;
+ } catch (...) {
+ {
+ std::lock_guard<std::mutex> sl(_mutex);
+ _state = FAIL;
+ _errmsg = "UNKNOWN ERROR";
}
- if ( getState() != DONE ) {
- // Unprotect the range if needed/possible on unsuccessful TO migration
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
+ error() << "migrate failed with unknown exception" << migrateLog;
+ }
- string errMsg;
- if (!shardingState.forgetPending(txn, ns, min, max, epoch, &errMsg)) {
- warning() << errMsg;
- }
- }
+ if (getState() != DONE) {
+ // Unprotect the range if needed/possible on unsuccessful TO migration
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
+ Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
- setActive( false );
+ string errMsg;
+ if (!shardingState.forgetPending(txn, ns, min, max, epoch, &errMsg)) {
+ warning() << errMsg;
+ }
}
- void _go(OperationContext* txn,
- const std::string& ns,
- BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- const std::string& fromShard,
- const OID& epoch,
- const WriteConcernOptions& writeConcern) {
- verify( getActive() );
- verify( getState() == READY );
- verify( ! min.isEmpty() );
- verify( ! max.isEmpty() );
+ setActive(false);
+ }
- DisableDocumentValidation validationDisabler(txn);
+ void _go(OperationContext* txn,
+ const std::string& ns,
+ BSONObj min,
+ BSONObj max,
+ BSONObj shardKeyPattern,
+ const std::string& fromShard,
+ const OID& epoch,
+ const WriteConcernOptions& writeConcern) {
+ verify(getActive());
+ verify(getState() == READY);
+ verify(!min.isEmpty());
+ verify(!max.isEmpty());
+
+ DisableDocumentValidation validationDisabler(txn);
+
+ log() << "starting receiving-end of migration of chunk " << min << " -> " << max
+ << " for collection " << ns << " from " << fromShard << " at epoch "
+ << epoch.toString();
+
+ string errmsg;
+ MoveTimingHelper timing(txn, "to", ns, min, max, 5 /* steps */, &errmsg, "", "");
+
+ ScopedDbConnection conn(fromShard);
+ conn->getLastError(); // just test connection
+
+ NamespaceString nss(ns);
+ {
+ // 0. copy system.namespaces entry if collection doesn't already exist
+ OldClientWriteContext ctx(txn, ns);
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
+ errmsg = str::stream() << "Not primary during migration: " << ns
+ << ": checking if collection exists";
+ warning() << errmsg;
+ setState(FAIL);
+ return;
+ }
- log() << "starting receiving-end of migration of chunk " << min << " -> " << max <<
- " for collection " << ns << " from " << fromShard
- << " at epoch " << epoch.toString();
+ // Only copy if ns doesn't already exist
+ Database* db = ctx.db();
+ Collection* collection = db->getCollection(ns);
- string errmsg;
- MoveTimingHelper timing(txn, "to", ns, min, max, 5 /* steps */, &errmsg, "", "");
+ if (!collection) {
+ list<BSONObj> infos = conn->getCollectionInfos(
+ nsToDatabase(ns), BSON("name" << nsToCollectionSubstring(ns)));
- ScopedDbConnection conn(fromShard);
- conn->getLastError(); // just test connection
+ BSONObj options;
+ if (infos.size() > 0) {
+ BSONObj entry = infos.front();
+ if (entry["options"].isABSONObj()) {
+ options = entry["options"].Obj();
+ }
+ }
- NamespaceString nss(ns);
- {
- // 0. copy system.namespaces entry if collection doesn't already exist
- OldClientWriteContext ctx(txn, ns );
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
- errmsg = str::stream() << "Not primary during migration: " << ns
- << ": checking if collection exists";
- warning() << errmsg;
- setState(FAIL);
- return;
+ WriteUnitOfWork wuow(txn);
+ Status status = userCreateNS(txn, db, ns, options, false);
+ if (!status.isOK()) {
+ warning() << "failed to create collection [" << ns << "] "
+ << " with options " << options << ": " << status;
}
+ wuow.commit();
+ }
+ }
- // Only copy if ns doesn't already exist
- Database* db = ctx.db();
- Collection* collection = db->getCollection( ns );
+ {
+ // 1. copy indexes
- if ( !collection ) {
- list<BSONObj> infos =
- conn->getCollectionInfos(nsToDatabase(ns),
- BSON("name" << nsToCollectionSubstring(ns)));
+ vector<BSONObj> indexSpecs;
+ {
+ const std::list<BSONObj> indexes = conn->getIndexSpecs(ns);
+ indexSpecs.insert(indexSpecs.begin(), indexes.begin(), indexes.end());
+ }
- BSONObj options;
- if (infos.size() > 0) {
- BSONObj entry = infos.front();
- if (entry["options"].isABSONObj()) {
- options = entry["options"].Obj();
- }
- }
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_X);
+ OldClientContext ctx(txn, ns);
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
+ errmsg = str::stream() << "Not primary during migration: " << ns;
+ warning() << errmsg;
+ setState(FAIL);
+ return;
+ }
- WriteUnitOfWork wuow(txn);
- Status status = userCreateNS(txn, db, ns, options, false);
- if ( !status.isOK() ) {
- warning() << "failed to create collection [" << ns << "] "
- << " with options " << options << ": " << status;
- }
- wuow.commit();
- }
+ Database* db = ctx.db();
+ Collection* collection = db->getCollection(ns);
+ if (!collection) {
+ errmsg = str::stream() << "collection dropped during migration: " << ns;
+ warning() << errmsg;
+ setState(FAIL);
+ return;
}
- {
- // 1. copy indexes
+ MultiIndexBlock indexer(txn, collection);
- vector<BSONObj> indexSpecs;
- {
- const std::list<BSONObj> indexes = conn->getIndexSpecs(ns);
- indexSpecs.insert(indexSpecs.begin(), indexes.begin(), indexes.end());
- }
+ indexer.removeExistingIndexes(&indexSpecs);
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock lk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_X);
- OldClientContext ctx(txn, ns);
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
- errmsg = str::stream() << "Not primary during migration: " << ns;
+ if (!indexSpecs.empty()) {
+ // Only copy indexes if the collection does not have any documents.
+ if (collection->numRecords(txn) > 0) {
+ errmsg = str::stream() << "aborting migration, shard is missing "
+ << indexSpecs.size() << " indexes and "
+ << "collection is not empty. Non-trivial "
+ << "index creation should be scheduled manually";
warning() << errmsg;
setState(FAIL);
return;
}
- Database* db = ctx.db();
- Collection* collection = db->getCollection( ns );
- if ( !collection ) {
- errmsg = str::stream() << "collection dropped during migration: " << ns;
+ Status status = indexer.init(indexSpecs);
+ if (!status.isOK()) {
+ errmsg = str::stream() << "failed to create index before migrating data. "
+ << " error: " << status.toString();
warning() << errmsg;
setState(FAIL);
return;
}
- MultiIndexBlock indexer(txn, collection);
+ status = indexer.insertAllDocumentsInCollection();
+ if (!status.isOK()) {
+ errmsg = str::stream() << "failed to create index before migrating data. "
+ << " error: " << status.toString();
+ warning() << errmsg;
+ setState(FAIL);
+ return;
+ }
- indexer.removeExistingIndexes(&indexSpecs);
+ WriteUnitOfWork wunit(txn);
+ indexer.commit();
- if (!indexSpecs.empty()) {
- // Only copy indexes if the collection does not have any documents.
- if (collection->numRecords(txn) > 0) {
- errmsg = str::stream() << "aborting migration, shard is missing "
- << indexSpecs.size() << " indexes and "
- << "collection is not empty. Non-trivial "
- << "index creation should be scheduled manually";
- warning() << errmsg;
- setState(FAIL);
- return;
- }
+ for (size_t i = 0; i < indexSpecs.size(); i++) {
+ // make sure to create index on secondaries as well
+ getGlobalServiceContext()->getOpObserver()->onCreateIndex(
+ txn, db->getSystemIndexesName(), indexSpecs[i], true /* fromMigrate */);
+ }
- Status status = indexer.init(indexSpecs);
- if ( !status.isOK() ) {
- errmsg = str::stream() << "failed to create index before migrating data. "
- << " error: " << status.toString();
- warning() << errmsg;
- setState(FAIL);
- return;
- }
+ wunit.commit();
+ }
- status = indexer.insertAllDocumentsInCollection();
- if ( !status.isOK() ) {
- errmsg = str::stream() << "failed to create index before migrating data. "
- << " error: " << status.toString();
- warning() << errmsg;
- setState(FAIL);
- return;
- }
+ timing.done(1);
+ MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep1);
+ }
- WriteUnitOfWork wunit(txn);
- indexer.commit();
+ {
+ // 2. delete any data already in range
+ RangeDeleterOptions deleterOptions(
+ KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern));
+ deleterOptions.writeConcern = writeConcern;
+ // No need to wait since all existing cursors will filter out this range when
+ // returning the results.
+ deleterOptions.waitForOpenCursors = false;
+ deleterOptions.fromMigrate = true;
+ deleterOptions.onlyRemoveOrphanedDocs = true;
+ deleterOptions.removeSaverReason = "preCleanup";
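+ // Anything already in the range is an orphan (e.g. left by an earlier
+ // failed migration of this chunk) and must be removed before cloning.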
- for (size_t i = 0; i < indexSpecs.size(); i++) {
- // make sure to create index on secondaries as well
- getGlobalServiceContext()->getOpObserver()->onCreateIndex(
- txn,
- db->getSystemIndexesName(),
- indexSpecs[i],
- true /* fromMigrate */);
- }
+ string errMsg;
- wunit.commit();
- }
-
- timing.done(1);
- MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep1);
+ if (!getDeleter()->deleteNow(txn, deleterOptions, &errMsg)) {
+ warning() << "Failed to queue delete for migrate abort: " << errMsg;
+ setState(FAIL);
+ return;
}
{
- // 2. delete any data already in range
- RangeDeleterOptions deleterOptions(KeyRange(ns,
- min.getOwned(),
- max.getOwned(),
- shardKeyPattern));
- deleterOptions.writeConcern = writeConcern;
- // No need to wait since all existing cursors will filter out this range when
- // returning the results.
- deleterOptions.waitForOpenCursors = false;
- deleterOptions.fromMigrate = true;
- deleterOptions.onlyRemoveOrphanedDocs = true;
- deleterOptions.removeSaverReason = "preCleanup";
-
- string errMsg;
-
- if (!getDeleter()->deleteNow(txn, deleterOptions, &errMsg)) {
- warning() << "Failed to queue delete for migrate abort: " << errMsg;
+ // Protect the range by noting that we're now starting a migration to it
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
+ Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
+
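+ // notePending registers [min, max) as a chunk that is migrating in, so the
+ // documents about to be cloned are not mistaken for orphans and reaped by
+ // the range deleter while the migration is still in flight.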
+ if (!shardingState.notePending(txn, ns, min, max, epoch, &errmsg)) {
+ warning() << errmsg;
setState(FAIL);
return;
}
+ }
- {
- // Protect the range by noting that we're now starting a migration to it
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dbLock(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
- Lock::CollectionLock collLock(txn->lockState(), ns, MODE_X);
+ timing.done(2);
+ MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep2);
+ }
- if (!shardingState.notePending(txn, ns, min, max, epoch, &errmsg)) {
- warning() << errmsg;
- setState(FAIL);
- return;
- }
- }
+ State currentState = getState();
+ if (currentState == FAIL || currentState == ABORT) {
+ string errMsg;
+ RangeDeleterOptions deleterOptions(
+ KeyRange(ns, min.getOwned(), max.getOwned(), shardKeyPattern));
+ deleterOptions.writeConcern = writeConcern;
+ // No need to wait since all existing cursors will filter out this range when
+ // returning the results.
+ deleterOptions.waitForOpenCursors = false;
+ deleterOptions.fromMigrate = true;
+ deleterOptions.onlyRemoveOrphanedDocs = true;
- timing.done(2);
- MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep2);
- }
-
- State currentState = getState();
- if (currentState == FAIL || currentState == ABORT) {
- string errMsg;
- RangeDeleterOptions deleterOptions(KeyRange(ns,
- min.getOwned(),
- max.getOwned(),
- shardKeyPattern));
- deleterOptions.writeConcern = writeConcern;
- // No need to wait since all existing cursors will filter out this range when
- // returning the results.
- deleterOptions.waitForOpenCursors = false;
- deleterOptions.fromMigrate = true;
- deleterOptions.onlyRemoveOrphanedDocs = true;
-
- if (!getDeleter()->queueDelete(txn, deleterOptions, NULL /* notifier */, &errMsg)) {
- warning() << "Failed to queue delete for migrate abort: " << errMsg;
- }
+ if (!getDeleter()->queueDelete(txn, deleterOptions, NULL /* notifier */, &errMsg)) {
+ warning() << "Failed to queue delete for migrate abort: " << errMsg;
}
+ }
- {
- // 3. initial bulk clone
- setState(CLONE);
-
- while ( true ) {
- BSONObj res;
- if ( ! conn->runCommand( "admin" , BSON( "_migrateClone" << 1 ) , res ) ) { // gets array of objects to copy, in disk order
- setState(FAIL);
- errmsg = "_migrateClone failed: ";
- errmsg += res.toString();
+ {
+ // 3. initial bulk clone
+ setState(CLONE);
+
+ while (true) {
+ BSONObj res;
+ if (!conn->runCommand("admin",
+ BSON("_migrateClone" << 1),
+ res)) { // gets array of objects to copy, in disk order
+ setState(FAIL);
+ errmsg = "_migrateClone failed: ";
+ errmsg += res.toString();
+ error() << errmsg << migrateLog;
+ conn.done();
+ return;
+ }
+
+ BSONObj arr = res["objects"].Obj();
+ int thisTime = 0;
+
+ BSONObjIterator i(arr);
+ while (i.more()) {
+ txn->checkForInterrupt();
+
+ if (getState() == ABORT) {
+ errmsg = str::stream() << "Migration abort requested while "
+ << "copying documents";
error() << errmsg << migrateLog;
- conn.done();
return;
}
- BSONObj arr = res["objects"].Obj();
- int thisTime = 0;
-
- BSONObjIterator i( arr );
- while( i.more() ) {
- txn->checkForInterrupt();
-
- if ( getState() == ABORT ) {
- errmsg = str::stream() << "Migration abort requested while "
- << "copying documents";
- error() << errmsg << migrateLog;
- return;
+ BSONObj docToClone = i.next().Obj();
+ {
+ OldClientWriteContext cx(txn, ns);
+
+ BSONObj localDoc;
+ if (willOverrideLocalId(txn,
+ ns,
+ min,
+ max,
+ shardKeyPattern,
+ cx.db(),
+ docToClone,
+ &localDoc)) {
+ string errMsg = str::stream() << "cannot migrate chunk, local document "
+ << localDoc << " has same _id as cloned "
+ << "remote document " << docToClone;
+
+ warning() << errMsg;
+
+ // Exception will abort migration cleanly
+ uasserted(16976, errMsg);
}
- BSONObj docToClone = i.next().Obj();
- {
- OldClientWriteContext cx(txn, ns );
-
- BSONObj localDoc;
- if (willOverrideLocalId(txn,
- ns,
- min,
- max,
- shardKeyPattern,
- cx.db(),
- docToClone,
- &localDoc)) {
- string errMsg =
- str::stream() << "cannot migrate chunk, local document "
- << localDoc
- << " has same _id as cloned "
- << "remote document " << docToClone;
-
- warning() << errMsg;
-
- // Exception will abort migration cleanly
- uasserted( 16976, errMsg );
- }
-
- Helpers::upsert( txn, ns, docToClone, true );
- }
- thisTime++;
+ Helpers::upsert(txn, ns, docToClone, true);
+ }
+ thisTime++;
- {
- std::lock_guard<std::mutex> statsLock(_mutex);
- _numCloned++;
- _clonedBytes += docToClone.objsize();
- }
+ {
+ std::lock_guard<std::mutex> statsLock(_mutex);
+ _numCloned++;
+ _clonedBytes += docToClone.objsize();
+ }
- if (writeConcern.shouldWaitForOtherNodes()) {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::getGlobalReplicationCoordinator()->awaitReplication(
- txn,
- repl::ReplClientInfo::forClient(
- txn->getClient()).getLastOp(),
- writeConcern);
- if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
- warning() << "secondaryThrottle on, but doc insert timed out; "
- "continuing";
- }
- else {
- massertStatusOK(replStatus.status);
- }
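+ // This wait implements the secondary throttle: when the write concern
+ // involves other nodes, each cloned document must replicate before the next
+ // is applied, keeping the bulk clone from outrunning the secondaries.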
+ if (writeConcern.shouldWaitForOtherNodes()) {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::getGlobalReplicationCoordinator()->awaitReplication(
+ txn,
+ repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(),
+ writeConcern);
+ if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
+ warning() << "secondaryThrottle on, but doc insert timed out; "
+ "continuing";
+ } else {
+ massertStatusOK(replStatus.status);
}
}
-
- if ( thisTime == 0 )
- break;
}
- timing.done(3);
- MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep3);
+ if (thisTime == 0)
+ break;
}
- // If running on a replicated system, we'll need to flush the docs we cloned to the
- // secondaries
- repl::OpTime lastOpApplied =
- repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
+ timing.done(3);
+ MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep3);
+ }
- {
- // 4. do bulk of mods
- setState(CATCHUP);
- while ( true ) {
- BSONObj res;
- if ( ! conn->runCommand( "admin" , BSON( "_transferMods" << 1 ) , res ) ) {
- setState(FAIL);
- errmsg = "_transferMods failed: ";
- errmsg += res.toString();
- error() << "_transferMods failed: " << res << migrateLog;
- conn.done();
- return;
- }
- if ( res["size"].number() == 0 )
- break;
+ // If running on a replicated system, we'll need to flush the docs we cloned to the
+ // secondaries
+ repl::OpTime lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
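+ // lastOpApplied tracks the recipient's most recent local write; the
+ // catch-up and commit steps below wait for this optime to replicate rather
+ // than for unrelated later writes.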
- apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied);
+ {
+ // 4. do bulk of mods
+ setState(CATCHUP);
+ while (true) {
+ BSONObj res;
+ if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) {
+ setState(FAIL);
+ errmsg = "_transferMods failed: ";
+ errmsg += res.toString();
+ error() << "_transferMods failed: " << res << migrateLog;
+ conn.done();
+ return;
+ }
+ if (res["size"].number() == 0)
+ break;
- const int maxIterations = 3600*50;
- int i;
- for ( i=0;i<maxIterations; i++) {
- txn->checkForInterrupt();
+ apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied);
- if ( getState() == ABORT ) {
- errmsg = str::stream() << "Migration abort requested while waiting "
- << "for replication at catch up stage";
- error() << errmsg << migrateLog;
+ const int maxIterations = 3600 * 50;
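+ // 3600 * 50 = 180,000 iterations at the 20 ms sleep below gives the
+ // catch-up phase a replication budget of roughly one hour.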
+ int i;
+ for (i = 0; i < maxIterations; i++) {
+ txn->checkForInterrupt();
- return;
- }
+ if (getState() == ABORT) {
+ errmsg = str::stream() << "Migration abort requested while waiting "
+ << "for replication at catch up stage";
+ error() << errmsg << migrateLog;
- if (opReplicatedEnough(txn, lastOpApplied, writeConcern))
- break;
+ return;
+ }
- if ( i > 100 ) {
- warning() << "secondaries having hard time keeping up with migrate" << migrateLog;
- }
+ if (opReplicatedEnough(txn, lastOpApplied, writeConcern))
+ break;
- sleepmillis( 20 );
+ if (i > 100) {
+ warning() << "secondaries having hard time keeping up with migrate"
+ << migrateLog;
}
- if ( i == maxIterations ) {
- errmsg = "secondary can't keep up with migrate";
- error() << errmsg << migrateLog;
- conn.done();
- setState(FAIL);
- return;
- }
+ sleepmillis(20);
}
- timing.done(4);
- MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep4);
+ if (i == maxIterations) {
+ errmsg = "secondary can't keep up with migrate";
+ error() << errmsg << migrateLog;
+ conn.done();
+ setState(FAIL);
+ return;
+ }
}
- {
- // pause to wait for replication
- // this will prevent us from going into critical section until we're ready
- Timer t;
- while ( t.minutes() < 600 ) {
- txn->checkForInterrupt();
+ timing.done(4);
+ MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep4);
+ }
- if (getState() == ABORT) {
- errmsg = "Migration abort requested while waiting for replication";
- error() << errmsg << migrateLog;
- return;
- }
+ {
+ // Pause to wait for replication. This prevents us from entering the critical
+ // section until the secondaries are ready.
+ Timer t;
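+ // Allow up to 600 minutes (ten hours) for the secondaries to catch up
+ // before giving up on entering the critical section.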
+ while (t.minutes() < 600) {
+ txn->checkForInterrupt();
- log() << "Waiting for replication to catch up before entering critical section"
- ;
- if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern))
- break;
- sleepsecs(1);
+ if (getState() == ABORT) {
+ errmsg = "Migration abort requested while waiting for replication";
+ error() << errmsg << migrateLog;
+ return;
}
- if (t.minutes() >= 600) {
- setState(FAIL);
- errmsg = "Cannot go to critical section because secondaries cannot keep up";
- error() << errmsg << migrateLog;
- return;
- }
+ log() << "Waiting for replication to catch up before entering critical section";
+ if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern))
+ break;
+ sleepsecs(1);
}
- {
- // 5. wait for commit
-
- setState(STEADY);
- bool transferAfterCommit = false;
- while ( getState() == STEADY || getState() == COMMIT_START ) {
- txn->checkForInterrupt();
+ if (t.minutes() >= 600) {
+ setState(FAIL);
+ errmsg = "Cannot go to critical section because secondaries cannot keep up";
+ error() << errmsg << migrateLog;
+ return;
+ }
+ }
- // Make sure we do at least one transfer after recv'ing the commit message
- // If we aren't sure that at least one transfer happens *after* our state
- // changes to COMMIT_START, there could be mods still on the FROM shard that
- // got logged *after* our _transferMods but *before* the critical section.
- if ( getState() == COMMIT_START ) transferAfterCommit = true;
-
- BSONObj res;
- if ( ! conn->runCommand( "admin" , BSON( "_transferMods" << 1 ) , res ) ) {
- log() << "_transferMods failed in STEADY state: " << res << migrateLog;
- errmsg = res.toString();
- setState(FAIL);
- conn.done();
- return;
- }
+ {
+ // 5. wait for commit
- if (res["size"].number() > 0 &&
- apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied)) {
- continue;
- }
+ setState(STEADY);
+ bool transferAfterCommit = false;
+ while (getState() == STEADY || getState() == COMMIT_START) {
+ txn->checkForInterrupt();
- if ( getState() == ABORT ) {
- return;
- }
+ // Make sure we do at least one transfer after recv'ing the commit message
+ // If we aren't sure that at least one transfer happens *after* our state
+ // changes to COMMIT_START, there could be mods still on the FROM shard that
+ // got logged *after* our _transferMods but *before* the critical section.
+ if (getState() == COMMIT_START)
+ transferAfterCommit = true;
- // We know we're finished when:
- // 1) The from side has told us that it has locked writes (COMMIT_START)
- // 2) We've checked at least one more time for un-transmitted mods
- if ( getState() == COMMIT_START && transferAfterCommit == true ) {
- if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern))
- break;
- }
+ BSONObj res;
+ if (!conn->runCommand("admin", BSON("_transferMods" << 1), res)) {
+ log() << "_transferMods failed in STEADY state: " << res << migrateLog;
+ errmsg = res.toString();
+ setState(FAIL);
+ conn.done();
+ return;
+ }
- // Only sleep if we aren't committing
- if ( getState() == STEADY ) sleepmillis( 10 );
+ if (res["size"].number() > 0 &&
+ apply(txn, ns, min, max, shardKeyPattern, res, &lastOpApplied)) {
+ continue;
}
- if ( getState() == FAIL ) {
- errmsg = "timed out waiting for commit";
+ if (getState() == ABORT) {
return;
}
- timing.done(5);
- MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep5);
+ // We know we're finished when:
+ // 1) The from side has told us that it has locked writes (COMMIT_START)
+ // 2) We've checked at least one more time for un-transmitted mods
+ if (getState() == COMMIT_START && transferAfterCommit == true) {
+ if (flushPendingWrites(txn, ns, min, max, lastOpApplied, writeConcern))
+ break;
+ }
+
+ // Only sleep if we aren't committing
+ if (getState() == STEADY)
+ sleepmillis(10);
}
- setState(DONE);
- conn.done();
+ if (getState() == FAIL) {
+ errmsg = "timed out waiting for commit";
+ return;
+ }
+
+ timing.done(5);
+ MONGO_FP_PAUSE_WHILE(migrateThreadHangAtStep5);
}
- void status(BSONObjBuilder& b) {
- std::lock_guard<std::mutex> sl(_mutex);
+ setState(DONE);
+ conn.done();
+ }
- b.appendBool("active", _active);
+ void status(BSONObjBuilder& b) {
+ std::lock_guard<std::mutex> sl(_mutex);
- b.append("ns", _ns);
- b.append("from", _from);
- b.append("min", _min);
- b.append("max", _max);
- b.append("shardKeyPattern", _shardKeyPattern);
+ b.appendBool("active", _active);
- b.append("state", stateToString(_state));
- if (_state == FAIL) {
- b.append("errmsg", _errmsg);
- }
+ b.append("ns", _ns);
+ b.append("from", _from);
+ b.append("min", _min);
+ b.append("max", _max);
+ b.append("shardKeyPattern", _shardKeyPattern);
- BSONObjBuilder bb(b.subobjStart("counts"));
- bb.append("cloned", _numCloned);
- bb.append("clonedBytes", _clonedBytes);
- bb.append("catchup", _numCatchup);
- bb.append("steady", _numSteady);
- bb.done();
+ b.append("state", stateToString(_state));
+ if (_state == FAIL) {
+ b.append("errmsg", _errmsg);
}
- bool apply(OperationContext* txn,
- const string& ns,
- BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- const BSONObj& xfer,
- repl::OpTime* lastOpApplied) {
-
- repl::OpTime dummy;
- if ( lastOpApplied == NULL ) {
- lastOpApplied = &dummy;
- }
+ BSONObjBuilder bb(b.subobjStart("counts"));
+ bb.append("cloned", _numCloned);
+ bb.append("clonedBytes", _clonedBytes);
+ bb.append("catchup", _numCatchup);
+ bb.append("steady", _numSteady);
+ bb.done();
+ }
- bool didAnything = false;
+ bool apply(OperationContext* txn,
+ const string& ns,
+ BSONObj min,
+ BSONObj max,
+ BSONObj shardKeyPattern,
+ const BSONObj& xfer,
+ repl::OpTime* lastOpApplied) {
+ repl::OpTime dummy;
+ if (lastOpApplied == NULL) {
+ lastOpApplied = &dummy;
+ }
- if ( xfer["deleted"].isABSONObj() ) {
- ScopedTransaction transaction(txn, MODE_IX);
- Lock::DBLock dlk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
- Helpers::RemoveSaver rs( "moveChunk" , ns , "removedDuring" );
+ bool didAnything = false;
- BSONObjIterator i( xfer["deleted"].Obj() );
- while ( i.more() ) {
- Lock::CollectionLock clk(txn->lockState(), ns, MODE_X);
- OldClientContext ctx(txn, ns);
+ if (xfer["deleted"].isABSONObj()) {
+ ScopedTransaction transaction(txn, MODE_IX);
+ Lock::DBLock dlk(txn->lockState(), nsToDatabaseSubstring(ns), MODE_IX);
+ Helpers::RemoveSaver rs("moveChunk", ns, "removedDuring");
- BSONObj id = i.next().Obj();
+ BSONObjIterator i(xfer["deleted"].Obj());
+ while (i.more()) {
+ Lock::CollectionLock clk(txn->lockState(), ns, MODE_X);
+ OldClientContext ctx(txn, ns);
- // do not apply deletes if they do not belong to the chunk being migrated
- BSONObj fullObj;
- if (Helpers::findById(txn, ctx.db(), ns.c_str(), id, fullObj)) {
- if (!isInRange(fullObj , min , max , shardKeyPattern)) {
- log() << "not applying out of range deletion: " << fullObj << migrateLog;
+ BSONObj id = i.next().Obj();
- continue;
- }
- }
+ // do not apply deletes if they do not belong to the chunk being migrated
+ BSONObj fullObj;
+ if (Helpers::findById(txn, ctx.db(), ns.c_str(), id, fullObj)) {
+ if (!isInRange(fullObj, min, max, shardKeyPattern)) {
+ log() << "not applying out of range deletion: " << fullObj << migrateLog;
- if (serverGlobalParams.moveParanoia) {
- rs.goingToDelete(fullObj);
+ continue;
}
+ }
- deleteObjects(txn,
- ctx.db(),
- ns,
- id,
- PlanExecutor::YIELD_MANUAL,
- true /* justOne */,
- false /* god */,
- true /* fromMigrate */);
-
- *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
- didAnything = true;
+ if (serverGlobalParams.moveParanoia) {
+ rs.goingToDelete(fullObj);
}
+
+ deleteObjects(txn,
+ ctx.db(),
+ ns,
+ id,
+ PlanExecutor::YIELD_MANUAL,
+ true /* justOne */,
+ false /* god */,
+ true /* fromMigrate */);
+
+ *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
+ didAnything = true;
}
+ }
- if ( xfer["reload"].isABSONObj() ) {
- BSONObjIterator i( xfer["reload"].Obj() );
- while ( i.more() ) {
- OldClientWriteContext cx(txn, ns);
-
- BSONObj updatedDoc = i.next().Obj();
-
- BSONObj localDoc;
- if (willOverrideLocalId(txn,
- ns,
- min,
- max,
- shardKeyPattern,
- cx.db(),
- updatedDoc,
- &localDoc)) {
- string errMsg =
- str::stream() << "cannot migrate chunk, local document "
- << localDoc
- << " has same _id as reloaded remote document "
- << updatedDoc;
-
- warning() << errMsg;
-
- // Exception will abort migration cleanly
- uasserted( 16977, errMsg );
- }
+ if (xfer["reload"].isABSONObj()) {
+ BSONObjIterator i(xfer["reload"].Obj());
+ while (i.more()) {
+ OldClientWriteContext cx(txn, ns);
- // We are in write lock here, so sure we aren't killing
- Helpers::upsert( txn, ns , updatedDoc , true );
+ BSONObj updatedDoc = i.next().Obj();
- *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
- didAnything = true;
- }
- }
+ BSONObj localDoc;
+ if (willOverrideLocalId(
+ txn, ns, min, max, shardKeyPattern, cx.db(), updatedDoc, &localDoc)) {
+ string errMsg = str::stream()
+ << "cannot migrate chunk, local document " << localDoc
+ << " has same _id as reloaded remote document " << updatedDoc;
- return didAnything;
- }
+ warning() << errMsg;
- /**
- * Checks if an upsert of a remote document will override a local document with the same _id
- * but in a different range on this shard.
- * Must be in WriteContext to avoid races and DBHelper errors.
- * TODO: Could optimize this check out if sharding on _id.
- */
- bool willOverrideLocalId(OperationContext* txn,
- const string& ns,
- BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- Database* db,
- BSONObj remoteDoc,
- BSONObj* localDoc) {
+ // Exception will abort migration cleanly
+ uasserted(16977, errMsg);
+ }
+
+ // We are in a write lock here, so we can be sure we aren't racing with a kill.
+ Helpers::upsert(txn, ns, updatedDoc, true);
- *localDoc = BSONObj();
- if ( Helpers::findById( txn, db, ns.c_str(), remoteDoc, *localDoc ) ) {
- return !isInRange( *localDoc , min , max , shardKeyPattern );
+ *lastOpApplied = repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
+ didAnything = true;
}
+ }
- return false;
+ return didAnything;
+ }
+
+ /**
+ * Checks if an upsert of a remote document will override a local document with the same _id
+ * but in a different range on this shard.
+ * Must be in WriteContext to avoid races and DBHelper errors.
+ * TODO: Could optimize this check out if sharding on _id.
+ */
+ bool willOverrideLocalId(OperationContext* txn,
+ const string& ns,
+ BSONObj min,
+ BSONObj max,
+ BSONObj shardKeyPattern,
+ Database* db,
+ BSONObj remoteDoc,
+ BSONObj* localDoc) {
+ *localDoc = BSONObj();
+ if (Helpers::findById(txn, db, ns.c_str(), remoteDoc, *localDoc)) {
+ return !isInRange(*localDoc, min, max, shardKeyPattern);
}
- /**
- * Returns true if the majority of the nodes and the nodes corresponding to the given
- * writeConcern (if not empty) have applied till the specified lastOp.
- */
- bool opReplicatedEnough(OperationContext* txn,
- const repl::OpTime& lastOpApplied,
- const WriteConcernOptions& writeConcern) {
- WriteConcernOptions majorityWriteConcern;
- majorityWriteConcern.wTimeout = -1;
- majorityWriteConcern.wMode = WriteConcernOptions::kMajority;
- Status majorityStatus = repl::getGlobalReplicationCoordinator()->awaitReplication(
- txn, lastOpApplied, majorityWriteConcern).status;
-
- if (!writeConcern.shouldWaitForOtherNodes()) {
- return majorityStatus.isOK();
- }
-
- // Also enforce the user specified write concern after "majority" so it covers
- // the union of the 2 write concerns.
-
- WriteConcernOptions userWriteConcern(writeConcern);
- userWriteConcern.wTimeout = -1;
- Status userStatus = repl::getGlobalReplicationCoordinator()->awaitReplication(
- txn, lastOpApplied, userWriteConcern).status;
-
- return majorityStatus.isOK() && userStatus.isOK();
- }
-
- bool flushPendingWrites(OperationContext* txn,
- const std::string& ns,
- BSONObj min,
- BSONObj max,
- const repl::OpTime& lastOpApplied,
- const WriteConcernOptions& writeConcern) {
- if (!opReplicatedEnough(txn, lastOpApplied, writeConcern)) {
- repl::OpTime op(lastOpApplied);
- OCCASIONALLY warning() << "migrate commit waiting for a majority of slaves for '"
- << ns << "' " << min << " -> " << max
- << " waiting for: " << op
- << migrateLog;
- return false;
- }
+ return false;
+ }
- log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min << " -> " << max << migrateLog;
+ /**
+ * Returns true if the majority of the nodes and the nodes corresponding to the given
+ * writeConcern (if not empty) have replicated up to the specified lastOp.
+ */
+ bool opReplicatedEnough(OperationContext* txn,
+ const repl::OpTime& lastOpApplied,
+ const WriteConcernOptions& writeConcern) {
+ WriteConcernOptions majorityWriteConcern;
+ majorityWriteConcern.wTimeout = -1;
+ majorityWriteConcern.wMode = WriteConcernOptions::kMajority;
+ Status majorityStatus = repl::getGlobalReplicationCoordinator()
+ ->awaitReplication(txn, lastOpApplied, majorityWriteConcern)
+ .status;
+
+ if (!writeConcern.shouldWaitForOtherNodes()) {
+ return majorityStatus.isOK();
+ }
- {
- // Get global lock to wait for write to be commited to journal.
- ScopedTransaction transaction(txn, MODE_S);
- Lock::GlobalRead lk(txn->lockState());
+ // Also enforce the user specified write concern after "majority" so it covers
+ // the union of the 2 write concerns.
- // if durability is on, force a write to journal
- if (getDur().commitNow(txn)) {
- log() << "migrate commit flushed to journal for '" << ns << "' " << min << " -> " << max << migrateLog;
- }
- }
+ WriteConcernOptions userWriteConcern(writeConcern);
+ userWriteConcern.wTimeout = -1;
+ Status userStatus = repl::getGlobalReplicationCoordinator()
+ ->awaitReplication(txn, lastOpApplied, userWriteConcern)
+ .status;
- return true;
- }
+ return majorityStatus.isOK() && userStatus.isOK();
+ }
- static string stateToString(State state) {
- switch (state) {
- case READY: return "ready";
- case CLONE: return "clone";
- case CATCHUP: return "catchup";
- case STEADY: return "steady";
- case COMMIT_START: return "commitStart";
- case DONE: return "done";
- case FAIL: return "fail";
- case ABORT: return "abort";
- }
- verify(0);
- return "";
+ bool flushPendingWrites(OperationContext* txn,
+ const std::string& ns,
+ BSONObj min,
+ BSONObj max,
+ const repl::OpTime& lastOpApplied,
+ const WriteConcernOptions& writeConcern) {
+ if (!opReplicatedEnough(txn, lastOpApplied, writeConcern)) {
+ repl::OpTime op(lastOpApplied);
+ OCCASIONALLY warning() << "migrate commit waiting for a majority of slaves for '" << ns
+ << "' " << min << " -> " << max << " waiting for: " << op
+ << migrateLog;
+ return false;
}
- bool startCommit() {
- std::unique_lock<std::mutex> lock(_mutex);
+ log() << "migrate commit succeeded flushing to secondaries for '" << ns << "' " << min
+ << " -> " << max << migrateLog;
- if (_state != STEADY) {
- return false;
- }
+ {
+ // Get the global lock to wait for the write to be committed to the journal.
+ ScopedTransaction transaction(txn, MODE_S);
+ Lock::GlobalRead lk(txn->lockState());
- const auto deadline = system_clock::now() + seconds(30);
- _state = COMMIT_START;
- while (_active) {
- if (std::cv_status::timeout == isActiveCV.wait_until(lock, deadline)) {
- _state = FAIL;
- log() << "startCommit never finished!" << migrateLog;
- return false;
- }
+ // if durability is on, force a write to journal
+ if (getDur().commitNow(txn)) {
+ log() << "migrate commit flushed to journal for '" << ns << "' " << min << " -> "
+ << max << migrateLog;
}
+ }
- if (_state == DONE) {
- return true;
- }
+ return true;
+ }
- log() << "startCommit failed, final data failed to transfer" << migrateLog;
+ static string stateToString(State state) {
+ switch (state) {
+ case READY:
+ return "ready";
+ case CLONE:
+ return "clone";
+ case CATCHUP:
+ return "catchup";
+ case STEADY:
+ return "steady";
+ case COMMIT_START:
+ return "commitStart";
+ case DONE:
+ return "done";
+ case FAIL:
+ return "fail";
+ case ABORT:
+ return "abort";
+ }
+ verify(0);
+ return "";
+ }
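+ // A healthy migration advances READY -> CLONE -> CATCHUP -> STEADY ->
+ // COMMIT_START -> DONE; FAIL and ABORT are the terminal error states.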
+
+ bool startCommit() {
+ std::unique_lock<std::mutex> lock(_mutex);
+
+ if (_state != STEADY) {
return false;
}
- void abort() {
- std::lock_guard<std::mutex> sl(_mutex);
- _state = ABORT;
- _errmsg = "aborted";
+ const auto deadline = system_clock::now() + seconds(30);
+ _state = COMMIT_START;
+ while (_active) {
+ if (std::cv_status::timeout == isActiveCV.wait_until(lock, deadline)) {
+ _state = FAIL;
+ log() << "startCommit never finished!" << migrateLog;
+ return false;
+ }
}
- bool getActive() const { std::lock_guard<std::mutex> lk(_mutex); return _active; }
- void setActive( bool b ) {
- std::lock_guard<std::mutex> lk(_mutex);
- _active = b;
- isActiveCV.notify_all();
+ if (_state == DONE) {
+ return true;
}
- // Guards all fields.
- mutable std::mutex _mutex;
- bool _active;
- std::condition_variable isActiveCV;
+ log() << "startCommit failed, final data failed to transfer" << migrateLog;
+ return false;
+ }
+
+ void abort() {
+ std::lock_guard<std::mutex> sl(_mutex);
+ _state = ABORT;
+ _errmsg = "aborted";
+ }
+
+ bool getActive() const {
+ std::lock_guard<std::mutex> lk(_mutex);
+ return _active;
+ }
+ void setActive(bool b) {
+ std::lock_guard<std::mutex> lk(_mutex);
+ _active = b;
+ isActiveCV.notify_all();
+ }
- std::string _ns;
- std::string _from;
- BSONObj _min;
- BSONObj _max;
- BSONObj _shardKeyPattern;
+ // Guards all fields.
+ mutable std::mutex _mutex;
+ bool _active;
+ std::condition_variable isActiveCV;
- long long _numCloned;
- long long _clonedBytes;
- long long _numCatchup;
- long long _numSteady;
+ std::string _ns;
+ std::string _from;
+ BSONObj _min;
+ BSONObj _max;
+ BSONObj _shardKeyPattern;
- State _state;
- std::string _errmsg;
+ long long _numCloned;
+ long long _clonedBytes;
+ long long _numCatchup;
+ long long _numSteady;
- } migrateStatus;
+ State _state;
+ std::string _errmsg;
- void migrateThread(std::string ns,
- BSONObj min,
- BSONObj max,
- BSONObj shardKeyPattern,
- std::string fromShard,
- OID epoch,
- WriteConcernOptions writeConcern) {
- Client::initThread( "migrateThread" );
- OperationContextImpl txn;
- if (getGlobalAuthorizationManager()->isAuthEnabled()) {
- ShardedConnectionInfo::addHook();
- AuthorizationSession::get(txn.getClient())->grantInternalAuthorization();
- }
+} migrateStatus;
- migrateStatus.go(&txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
+void migrateThread(std::string ns,
+ BSONObj min,
+ BSONObj max,
+ BSONObj shardKeyPattern,
+ std::string fromShard,
+ OID epoch,
+ WriteConcernOptions writeConcern) {
+ Client::initThread("migrateThread");
+ OperationContextImpl txn;
+ if (getGlobalAuthorizationManager()->isAuthEnabled()) {
+ ShardedConnectionInfo::addHook();
+ AuthorizationSession::get(txn.getClient())->grantInternalAuthorization();
}
- /**
- * Command for initiating the recipient side of the migration to start copying data
- * from the donor shard.
- *
- * {
- * _recvChunkStart: "namespace",
- * congfigServer: "hostAndPort",
- * from: "hostAndPort",
- * fromShardName: "shardName",
- * toShardName: "shardName",
- * min: {},
- * max: {},
- * shardKeyPattern: {},
- *
- * // optional
- * secondaryThrottle: bool, // defaults to true
- * writeConcern: {} // applies to individual writes.
- * }
- */
- class RecvChunkStartCommand : public ChunkCommandHelper {
- public:
- void help(std::stringstream& h) const { h << "internal"; }
- RecvChunkStartCommand() : ChunkCommandHelper( "_recvChunkStart" ) {}
-
- virtual bool isWriteCommandForConfigServer() const { return false; }
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
- bool run(OperationContext* txn,
- const string&,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
-
- // Active state of TO-side migrations (MigrateStatus) is serialized by distributed
- // collection lock.
- if ( migrateStatus.getActive() ) {
- errmsg = "migrate already in progress";
- return false;
- }
+ migrateStatus.go(&txn, ns, min, max, shardKeyPattern, fromShard, epoch, writeConcern);
+}
- // Pending deletes (for migrations) are serialized by the distributed collection lock,
- // we are sure we registered a delete for a range *before* we can migrate-in a
- // subrange.
- const size_t numDeletes = getDeleter()->getTotalDeletes();
- if (numDeletes > 0) {
+/**
+ * Command for initiating the recipient side of the migration to start copying data
+ * from the donor shard.
+ *
+ * {
+ * _recvChunkStart: "namespace",
+ * configServer: "hostAndPort",
+ * from: "hostAndPort",
+ * fromShardName: "shardName",
+ * toShardName: "shardName",
+ * min: {},
+ * max: {},
+ * shardKeyPattern: {},
+ *
+ * // optional
+ * secondaryThrottle: bool, // defaults to true
+ * writeConcern: {} // applies to individual writes.
+ * }
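+ *
+ * A concrete invocation might look like the following (all values are
+ * illustrative only):
+ *
+ * {
+ *     _recvChunkStart: "test.users",
+ *     configServer: "cfg.example.net:29000",
+ *     from: "shard0001.example.net:27018",
+ *     fromShardName: "shard0001",
+ *     toShardName: "shard0002",
+ *     min: {x: 0},
+ *     max: {x: 100},
+ *     shardKeyPattern: {x: 1},
+ *     secondaryThrottle: true
+ * }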
+ */
+class RecvChunkStartCommand : public ChunkCommandHelper {
+public:
+ void help(std::stringstream& h) const {
+ h << "internal";
+ }
+ RecvChunkStartCommand() : ChunkCommandHelper("_recvChunkStart") {}
- errmsg = str::stream() << "can't accept new chunks because "
- << " there are still " << numDeletes
- << " deletes from previous migration";
+ virtual bool isWriteCommandForConfigServer() const {
+ return false;
+ }
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+ bool run(OperationContext* txn,
+ const string&,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ // Active state of TO-side migrations (MigrateStatus) is serialized by distributed
+ // collection lock.
+ if (migrateStatus.getActive()) {
+ errmsg = "migrate already in progress";
+ return false;
+ }
+
+ // Pending deletes (for migrations) are serialized by the distributed collection lock,
+ // we are sure we registered a delete for a range *before* we can migrate-in a
+ // subrange.
+ const size_t numDeletes = getDeleter()->getTotalDeletes();
+ if (numDeletes > 0) {
+ errmsg = str::stream() << "can't accept new chunks because "
+ << " there are still " << numDeletes
+ << " deletes from previous migration";
+
+ warning() << errmsg;
+ return false;
+ }
+
+ if (!shardingState.enabled()) {
+ if (!cmdObj["configServer"].eoo()) {
+ dassert(cmdObj["configServer"].type() == String);
+ ShardingState::initialize(cmdObj["configServer"].String());
+ } else {
+ errmsg = str::stream()
+ << "cannot start recv'ing chunk, "
+ << "sharding is not enabled and no config server was provided";
warning() << errmsg;
return false;
}
+ }
- if (!shardingState.enabled()) {
- if (!cmdObj["configServer"].eoo()) {
- dassert(cmdObj["configServer"].type() == String);
- ShardingState::initialize(cmdObj["configServer"].String());
- }
- else {
-
- errmsg = str::stream()
- << "cannot start recv'ing chunk, "
- << "sharding is not enabled and no config server was provided";
+ if (!cmdObj["toShardName"].eoo()) {
+ dassert(cmdObj["toShardName"].type() == String);
+ shardingState.gotShardName(cmdObj["toShardName"].String());
+ }
- warning() << errmsg;
- return false;
- }
- }
+ string ns = cmdObj.firstElement().String();
+ BSONObj min = cmdObj["min"].Obj().getOwned();
+ BSONObj max = cmdObj["max"].Obj().getOwned();
- if ( !cmdObj["toShardName"].eoo() ) {
- dassert( cmdObj["toShardName"].type() == String );
- shardingState.gotShardName( cmdObj["toShardName"].String() );
- }
+ // Refresh our collection manager from the config server; we need a collection
+ // manager to start registering pending chunks. We force the remote refresh
+ // here to make the behavior consistent and predictable; generally we would
+ // refresh anyway, and it errs on the side of caution.
+ ChunkVersion currentVersion;
+ Status status = shardingState.refreshMetadataNow(txn, ns, &currentVersion);
- string ns = cmdObj.firstElement().String();
- BSONObj min = cmdObj["min"].Obj().getOwned();
- BSONObj max = cmdObj["max"].Obj().getOwned();
+ if (!status.isOK()) {
+ errmsg = str::stream() << "cannot start recv'ing chunk "
+ << "[" << min << "," << max << ")" << causedBy(status.reason());
- // Refresh our collection manager from the config server, we need a collection manager
- // to start registering pending chunks.
- // We force the remote refresh here to make the behavior consistent and predictable,
- // generally we'd refresh anyway, and to be paranoid.
- ChunkVersion currentVersion;
- Status status = shardingState.refreshMetadataNow(txn, ns, &currentVersion );
+ warning() << errmsg;
+ return false;
+ }
- if ( !status.isOK() ) {
- errmsg = str::stream() << "cannot start recv'ing chunk "
- << "[" << min << "," << max << ")"
- << causedBy( status.reason() );
+ // Process secondary throttle settings and assign defaults if necessary.
+ WriteConcernOptions writeConcern;
+ status = writeConcern.parseSecondaryThrottle(cmdObj, NULL);
- warning() << errmsg;
- return false;
+ if (!status.isOK()) {
+ if (status.code() != ErrorCodes::WriteConcernNotDefined) {
+ warning() << status.toString();
+ return appendCommandStatus(result, status);
}
- // Process secondary throttle settings and assign defaults if necessary.
- WriteConcernOptions writeConcern;
- status = writeConcern.parseSecondaryThrottle(cmdObj, NULL);
+ writeConcern = getDefaultWriteConcern();
+ } else {
+ repl::ReplicationCoordinator* replCoordinator = repl::getGlobalReplicationCoordinator();
- if (!status.isOK()){
- if (status.code() != ErrorCodes::WriteConcernNotDefined) {
- warning() << status.toString();
- return appendCommandStatus(result, status);
- }
+ if (replCoordinator->getReplicationMode() ==
+ repl::ReplicationCoordinator::modeMasterSlave &&
+ writeConcern.shouldWaitForOtherNodes()) {
+ warning() << "recvChunk cannot check if secondary throttle setting "
+ << writeConcern.toBSON()
+ << " can be enforced in a master slave configuration";
+ }
- writeConcern = getDefaultWriteConcern();
+ Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern);
+ if (!status.isOK() && status != ErrorCodes::NoReplicationEnabled) {
+ warning() << status.toString();
+ return appendCommandStatus(result, status);
}
- else {
- repl::ReplicationCoordinator* replCoordinator =
- repl::getGlobalReplicationCoordinator();
+ }
- if (replCoordinator->getReplicationMode() ==
- repl::ReplicationCoordinator::modeMasterSlave &&
- writeConcern.shouldWaitForOtherNodes()) {
- warning() << "recvChunk cannot check if secondary throttle setting "
- << writeConcern.toBSON()
- << " can be enforced in a master slave configuration";
- }
+ if (writeConcern.shouldWaitForOtherNodes() &&
+ writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) {
+ // Don't allow an infinite timeout; a stalled secondary could otherwise block the migration forever.
+ writeConcern.wTimeout = kDefaultWTimeoutMs;
+ }
- Status status = replCoordinator->checkIfWriteConcernCanBeSatisfied(writeConcern);
- if (!status.isOK() && status != ErrorCodes::NoReplicationEnabled) {
- warning() << status.toString();
- return appendCommandStatus(result, status);
- }
- }
+ BSONObj shardKeyPattern;
+ if (cmdObj.hasField("shardKeyPattern")) {
+ shardKeyPattern = cmdObj["shardKeyPattern"].Obj().getOwned();
+ } else {
+ // shardKeyPattern may not be provided if the other shard is running a
+ // pre-2.2 version. In that case, assume the shard key pattern is the same
+ // as the range specifiers provided.
+ BSONObj keya = Helpers::inferKeyPattern(min);
+ BSONObj keyb = Helpers::inferKeyPattern(max);
+ verify(keya == keyb);
+
+ warning()
+ << "No shard key pattern provided by source shard for migration."
+ " This is likely because the source shard is running a version prior to 2.2."
+ " Falling back to assuming the shard key matches the pattern of the min and max"
+ " chunk range specifiers. Inferred shard key: " << keya;
+
+ shardKeyPattern = keya.getOwned();
+ }
- if (writeConcern.shouldWaitForOtherNodes() &&
- writeConcern.wTimeout == WriteConcernOptions::kNoTimeout) {
- // Don't allow no timeout.
- writeConcern.wTimeout = kDefaultWTimeoutMs;
- }
+ const string fromShard(cmdObj["from"].String());
- BSONObj shardKeyPattern;
- if (cmdObj.hasField("shardKeyPattern")) {
- shardKeyPattern = cmdObj["shardKeyPattern"].Obj().getOwned();
- } else {
- // shardKeyPattern may not be provided if another shard is from pre 2.2
- // In that case, assume the shard key pattern is the same as the range
- // specifiers provided.
- BSONObj keya = Helpers::inferKeyPattern(min);
- BSONObj keyb = Helpers::inferKeyPattern(max);
- verify( keya == keyb );
+ // Set the TO-side migration to active
+ Status prepareStatus = migrateStatus.prepare(ns, fromShard, min, max, shardKeyPattern);
- warning() << "No shard key pattern provided by source shard for migration."
- " This is likely because the source shard is running a version prior to 2.2."
- " Falling back to assuming the shard key matches the pattern of the min and max"
- " chunk range specifiers. Inferred shard key: " << keya;
+ if (!prepareStatus.isOK()) {
+ return appendCommandStatus(result, prepareStatus);
+ }
- shardKeyPattern = keya.getOwned();
- }
+ std::thread m(migrateThread,
+ ns,
+ min,
+ max,
+ shardKeyPattern,
+ fromShard,
+ currentVersion.epoch(),
+ writeConcern);
+
+ m.detach();
+ result.appendBool("started", true);
+ return true;
+ }
- const string fromShard(cmdObj["from"].String());
+} recvChunkStartCmd;
- // Set the TO-side migration to active
- Status prepareStatus = migrateStatus.prepare(ns, fromShard, min, max, shardKeyPattern);
+class RecvChunkStatusCommand : public ChunkCommandHelper {
+public:
+ void help(std::stringstream& h) const {
+ h << "internal";
+ }
+ RecvChunkStatusCommand() : ChunkCommandHelper("_recvChunkStatus") {}
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+ bool run(OperationContext* txn,
+ const string&,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ migrateStatus.status(result);
+ return true;
+ }
- if (!prepareStatus.isOK()) {
- return appendCommandStatus(result, prepareStatus);
- }
+} recvChunkStatusCommand;
- std::thread m(migrateThread,
- ns,
- min,
- max,
- shardKeyPattern,
- fromShard,
- currentVersion.epoch(),
- writeConcern);
+class RecvChunkCommitCommand : public ChunkCommandHelper {
+public:
+ void help(std::stringstream& h) const {
+ h << "internal";
+ }
+ RecvChunkCommitCommand() : ChunkCommandHelper("_recvChunkCommit") {}
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+ bool run(OperationContext* txn,
+ const string&,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ bool ok = migrateStatus.startCommit();
+ migrateStatus.status(result);
+ return ok;
+ }
- m.detach();
- result.appendBool( "started" , true );
- return true;
- }
+} recvChunkCommitCommand;
- } recvChunkStartCmd;
+class RecvChunkAbortCommand : public ChunkCommandHelper {
+public:
+ void help(std::stringstream& h) const {
+ h << "internal";
+ }
+ RecvChunkAbortCommand() : ChunkCommandHelper("_recvChunkAbort") {}
+ virtual void addRequiredPrivileges(const std::string& dbname,
+ const BSONObj& cmdObj,
+ std::vector<Privilege>* out) {
+ ActionSet actions;
+ actions.addAction(ActionType::internal);
+ out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
+ }
+ bool run(OperationContext* txn,
+ const string&,
+ BSONObj& cmdObj,
+ int,
+ string& errmsg,
+ BSONObjBuilder& result) {
+ migrateStatus.abort();
+ migrateStatus.status(result);
+ return true;
+ }
- class RecvChunkStatusCommand : public ChunkCommandHelper {
- public:
- void help(std::stringstream& h) const { h << "internal"; }
- RecvChunkStatusCommand() : ChunkCommandHelper( "_recvChunkStatus" ) {}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
- bool run(OperationContext* txn,
- const string&,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- migrateStatus.status( result );
- return 1;
- }
-
- } recvChunkStatusCommand;
-
- class RecvChunkCommitCommand : public ChunkCommandHelper {
- public:
- void help(std::stringstream& h) const { h << "internal"; }
- RecvChunkCommitCommand() : ChunkCommandHelper( "_recvChunkCommit" ) {}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
- bool run(OperationContext* txn,
- const string&,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- bool ok = migrateStatus.startCommit();
- migrateStatus.status( result );
- return ok;
- }
-
- } recvChunkCommitCommand;
-
- class RecvChunkAbortCommand : public ChunkCommandHelper {
- public:
- void help(std::stringstream& h) const { h << "internal"; }
- RecvChunkAbortCommand() : ChunkCommandHelper( "_recvChunkAbort" ) {}
- virtual void addRequiredPrivileges(const std::string& dbname,
- const BSONObj& cmdObj,
- std::vector<Privilege>* out) {
- ActionSet actions;
- actions.addAction(ActionType::internal);
- out->push_back(Privilege(ResourcePattern::forClusterResource(), actions));
- }
- bool run(OperationContext* txn,
- const string&,
- BSONObj& cmdObj,
- int,
- string& errmsg,
- BSONObjBuilder& result) {
- migrateStatus.abort();
- migrateStatus.status( result );
- return true;
- }
+} recvChunkAbortCommand;
- } recvChunkAboortCommand;
+class IsInRangeTest : public StartupTest {
+public:
+ void run() {
+ BSONObj min = BSON("x" << 1);
+ BSONObj max = BSON("x" << 5);
+ BSONObj skey = BSON("x" << 1);
- class IsInRangeTest : public StartupTest {
- public:
- void run() {
- BSONObj min = BSON( "x" << 1 );
- BSONObj max = BSON( "x" << 5 );
- BSONObj skey = BSON( "x" << 1 );
-
- verify( ! isInRange( BSON( "x" << 0 ) , min , max , skey ) );
- verify( isInRange( BSON( "x" << 1 ) , min , max , skey ) );
- verify( isInRange( BSON( "x" << 3 ) , min , max , skey ) );
- verify( isInRange( BSON( "x" << 4 ) , min , max , skey ) );
- verify( ! isInRange( BSON( "x" << 5 ) , min , max , skey ) );
- verify( ! isInRange( BSON( "x" << 6 ) , min , max , skey ) );
-
- BSONObj obj = BSON( "n" << 3 );
- BSONObj min2 = BSON( "x" << BSONElementHasher::hash64( obj.firstElement() , 0 ) - 2 );
- BSONObj max2 = BSON( "x" << BSONElementHasher::hash64( obj.firstElement() , 0 ) + 2 );
- BSONObj hashedKey = BSON( "x" << "hashed" );
-
- verify( isInRange( BSON( "x" << 3 ) , min2 , max2 , hashedKey ) );
- verify( ! isInRange( BSON( "x" << 3 ) , min , max , hashedKey ) );
- verify( ! isInRange( BSON( "x" << 4 ) , min2 , max2 , hashedKey ) );
-
- LOG(1) << "isInRangeTest passed" << migrateLog;
- }
- } isInRangeTest;
+ verify(!isInRange(BSON("x" << 0), min, max, skey));
+ verify(isInRange(BSON("x" << 1), min, max, skey));
+ verify(isInRange(BSON("x" << 3), min, max, skey));
+ verify(isInRange(BSON("x" << 4), min, max, skey));
+ verify(!isInRange(BSON("x" << 5), min, max, skey));
+ verify(!isInRange(BSON("x" << 6), min, max, skey));
+
+ BSONObj obj = BSON("n" << 3);
+ BSONObj min2 = BSON("x" << BSONElementHasher::hash64(obj.firstElement(), 0) - 2);
+ BSONObj max2 = BSON("x" << BSONElementHasher::hash64(obj.firstElement(), 0) + 2);
+ BSONObj hashedKey = BSON("x"
+ << "hashed");
+
+ verify(isInRange(BSON("x" << 3), min2, max2, hashedKey));
+ verify(!isInRange(BSON("x" << 3), min, max, hashedKey));
+ verify(!isInRange(BSON("x" << 4), min2, max2, hashedKey));
+
+ LOG(1) << "isInRangeTest passed" << migrateLog;
+ }
+} isInRangeTest;
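+// IsInRangeTest runs via the StartupTest harness during process startup,
+// sanity-checking isInRange() for both ranged and hashed shard keys.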
}