author     Dianna Hohensee <dianna.hohensee@10gen.com>   2015-12-02 11:51:41 -0500
committer  Dianna Hohensee <dianna.hohensee@10gen.com>   2015-12-22 09:12:08 -0500
commit     3663e004dfc2f73b82b3d88b5fa1ac6b7dcd1d33
tree       d46813559c37225f31768042e2bd2a4ba8ce1ba0
parent     07b2e02ffddbd9998b8c738895f3fae85b592176
SERVER-21382 fixing sharding migration to transfer only document deletions relevant to the chunk being migrated, not every deletion
 buildscripts/resmokeconfig/suites/sharding_auth.yml       |   1
 buildscripts/resmokeconfig/suites/sharding_auth_audit.yml |   1
 jstests/libs/chunk_manipulation_util.js                   |   8
 jstests/sharding/migration_with_source_deletes.js         | 123
 src/mongo/db/catalog/collection.cpp                       |  11
 src/mongo/db/op_observer.cpp                              |  36
 src/mongo/db/op_observer.h                                |  11
 src/mongo/db/operation_context.h                          |   2
 src/mongo/db/s/migration_destination_manager.cpp          |   5
 src/mongo/db/s/migration_source_manager.cpp               |  12
 src/mongo/db/s/migration_source_manager.h                 |   2
 src/mongo/s/d_migrate.cpp                                 |   4
 src/mongo/s/d_state.h                                     |   8
 13 files changed, 202 insertions(+), 22 deletions(-)
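
The substance of the change is on the donor shard: before a document is removed, the new OpObserver::aboutToDelete() captures a DeleteState (the _id plus an isMigrating flag computed via isInMigratingChunk()), and OpObserver::onDelete() then passes `fromMigrate || !deleteState.isMigrating` to logOpForSharding(), so deletes outside the migrating chunk never enter the transfer-mods log. The sketch below is a minimal, self-contained model of that flow, not the real code: the types are hypothetical stand-ins for BSONObj/NamespaceString/OperationContext, and it assumes a single integer shard key.

```cpp
#include <iostream>

// Hypothetical stand-ins for the real BSON/sharding types; this models the
// donor-side flow only, assuming a single integer shard key field "a".
struct Document {
    int shardKey;       // value of the shard key
    bool hasId = true;  // whether the document carries an _id
};

struct ChunkRange {
    int min, max;  // the chunk being migrated covers [min, max)
    bool contains(int key) const {
        return key >= min && key < max;
    }
};

// Mirrors OpObserver::DeleteState from the patch: the _id to log plus a flag
// recording whether the document belonged to the chunk currently migrating.
struct DeleteState {
    bool hasIdDoc = false;
    bool isMigrating = false;
};

// Mirrors OpObserver::aboutToDelete(): this must run before the record store
// removes the document, because afterwards there is nothing left to range-check.
DeleteState aboutToDelete(const Document& doc, const ChunkRange& activeChunk, bool migrationActive) {
    DeleteState state;
    state.hasIdDoc = doc.hasId;
    state.isMigrating = migrationActive && activeChunk.contains(doc.shardKey);
    return state;
}

// Mirrors OpObserver::onDelete(): the delete reaches the migration's
// transfer-mods log only when it actually falls inside the migrating chunk.
void onDelete(const DeleteState& state, bool fromMigrate) {
    if (!state.hasIdDoc) {
        return;  // nothing to log without an _id
    }
    const bool notForMigration = fromMigrate || !state.isMigrating;
    std::cout << (notForMigration ? "skip transfer-mods entry\n"
                                  : "queue delete for the recipient\n");
}

int main() {
    const ChunkRange migrating{10, 20};  // chunk [10, 20) is being moved
    onDelete(aboutToDelete({7}, migrating, true), false);   // out of range: skipped
    onDelete(aboutToDelete({12}, migrating, true), false);  // in range: transferred
}
```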
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth.yml b/buildscripts/resmokeconfig/suites/sharding_auth.yml index 06e1118b7d9..578632650e0 100644 --- a/buildscripts/resmokeconfig/suites/sharding_auth.yml +++ b/buildscripts/resmokeconfig/suites/sharding_auth.yml @@ -23,6 +23,7 @@ selector: # Skip the testcases that do not have auth bypass when running ops in parallel. - jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js # SERVER-21713 - jstests/sharding/cleanup_orphaned_cmd_during_movechunk_hashed.js # SERVER-21713 + - jstests/sharding/migration_with_source_deletes.js # SERVER-21713 executor: js_test: diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml index b577387cd0c..bed76c61261 100644 --- a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml +++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml @@ -23,6 +23,7 @@ selector: # Skip the testcases that do not have auth bypass when running ops in parallel. - jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js # SERVER-21713 - jstests/sharding/cleanup_orphaned_cmd_during_movechunk_hashed.js # SERVER-21713 + - jstests/sharding/migration_with_source_deletes.js # SERVER-21713 executor: js_test: diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js index b1e4d5a90e1..8ad487667cf 100644 --- a/jstests/libs/chunk_manipulation_util.js +++ b/jstests/libs/chunk_manipulation_util.js @@ -201,7 +201,7 @@ function waitForMigrateStep( shardConnection, stepNumber ) { admin = shardConnection.getDB( 'admin' ); assert( stepNumber >= 1); - assert( stepNumber <= 5 ); + assert( stepNumber <= 5); var msg = ( 'Migrate thread on ' + shardConnection.shardName @@ -215,7 +215,11 @@ function waitForMigrateStep( shardConnection, stepNumber ) { for ( var i = 0; i < in_progress.length; ++i ) { var op = in_progress[i]; if ( op.desc && op.desc === 'migrateThread' ) { - return op.msg.startsWith( searchString ); + if (op.hasOwnProperty('msg')) { + return op.msg.startsWith( searchString ); + } else { + return false; + } } } diff --git a/jstests/sharding/migration_with_source_deletes.js b/jstests/sharding/migration_with_source_deletes.js new file mode 100644 index 00000000000..afc00e513e2 --- /dev/null +++ b/jstests/sharding/migration_with_source_deletes.js @@ -0,0 +1,123 @@ +// +// Tests during chunk migration that the recipient does not receive out of range deletes from +// the donor. +// +// Pauses the migration on the recipient shard after the initial data chunk cloning is finished. +// This allows time for the donor shard to perform deletes, half of which are on the migrating +// chunk. The recipient is then set to continue, collecting the delete mods from the donor, and +// finishes the migration. A failpoint is set prior to resuming in the recipient shard to fail +// if it receives an out of chunk range delete from the donor's delete mods log. +// +// The idea is that the recipient shard should not be collecting deletes from the donor shard +// that are not in range and that will unnecessarily prevent the migration from finishing: the +// migration can only end when donor's log of delete mods for the migrating chunk is empty. +// + +load('./jstests/libs/chunk_manipulation_util.js'); + +(function() { +"use strict"; + +var staticMongod = MongoRunner.runMongod({}); // For startParallelOps. + +/** + * Start up new sharded cluster, stop balancer that would interfere in manual chunk management. 
+ */ + +var st = new ShardingTest({ shards : 2, mongos : 1 }); +st.stopBalancer(); + +var mongos = st.s0, + admin = mongos.getDB('admin'), + shards = mongos.getCollection('config.shards').find().toArray(), + dbName = "testDB", + ns = dbName + ".foo", + coll = mongos.getCollection(ns), + donor = st.shard0, + recipient = st.shard1, + donorColl = donor.getCollection(ns), + recipientColl = recipient.getCollection(ns); + +/** + * Exable sharding, and split collection into two chunks. + */ + +// Two chunks +// Donor: [0, 10) [10, 20) +// Recipient: +jsTest.log('Enable sharding of the collection and pre-split into two chunks....'); +assert.commandWorked( admin.runCommand( {enableSharding: dbName} ) ); +st.ensurePrimaryShard(dbName, shards[0]._id); +assert.commandWorked( admin.runCommand( {shardCollection: ns, key: {a: 1}} ) ); +assert.commandWorked( admin.runCommand( {split: ns, middle: {a: 10}} ) ); + +/** + * Insert data into collection + */ + +// 10 documents in each chunk on the donor +jsTest.log('Inserting 20 docs into donor shard, 10 in each chunk....'); +for (var i = 0; i < 20; ++i) donorColl.insert( {a: i} ); +assert.eq(null, donorColl.getDB().getLastError()); +assert.eq(20, donorColl.count()); + +/** + * Set failpoints. Recipient will crash if an out of chunk range data delete is + * received from donor. Recipient will pause migration after cloning chunk data from donor, + * before checking delete mods log on donor. + */ + +jsTest.log('setting failpoint failMigrationReceivedOutOfRangeDelete'); +assert.commandWorked(recipient.getDB('admin').runCommand( {configureFailPoint: 'failMigrationReceivedOutOfRangeDelete', mode: 'alwaysOn'} )) + +jsTest.log('setting recipient failpoint cloned'); +pauseMigrateAtStep(recipient, migrateStepNames.cloned); + +/** + * Start a moveChunk in the background. Move chunk [10, 20), which has 10 docs, + * from shard 0 (donor) to shard 1 (recipient). Migration will pause after cloning + * (when it reaches the recipient failpoint). + */ + +// Donor: [0, 10) +// Recipient: [10, 20) +jsTest.log('starting migration, pause after cloning...'); +var joinMoveChunk = moveChunkParallel( + staticMongod, + st.s0.host, + {a: 10}, + null, + coll.getFullName(), + shards[1]._id); + +/** + * Wait for recipient to finish cloning. + * THEN delete 10 documents on donor, 5 in the migrating chunk and the 5 in the remaining chunk. + */ + +jsTest.log('Delete 5 docs from each chunk, migrating chunk and remaining chunk...'); +waitForMigrateStep(recipient, migrateStepNames.cloned); +donorColl.remove( {$and : [ {a: {$gte: 5}}, {a: {$lt: 15}} ]} ); + +/** + * Finish migration. Unpause recipient migration, wait for it to collect + * the delete diffs from donor and finish. + */ + +jsTest.log('Continuing and finishing migration...'); +unpauseMigrateAtStep(recipient, migrateStepNames.cloned); +joinMoveChunk(); + +/** + * Check documents are where they should be: 5 docs in each shard's chunk. 
+ */ + +jsTest.log('Checking that documents are on the shards they should be...'); +assert.eq(5, donorColl.count()); +assert.eq(5, recipientColl.count()); +assert.eq(10, coll.count()); + +jsTest.log('DONE!'); +st.stop(); + +})() diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp index 3be9467c79e..daa74399995 100644 --- a/src/mongo/db/catalog/collection.cpp +++ b/src/mongo/db/catalog/collection.cpp @@ -488,11 +488,8 @@ void Collection::deleteDocument(OperationContext* txn, Snapshotted<BSONObj> doc = docFor(txn, loc); - BSONElement e = doc.value()["_id"]; - BSONObj id; - if (e.type()) { - id = e.wrap(); - } + auto opObserver = getGlobalServiceContext()->getOpObserver(); + OpObserver::DeleteState deleteState = opObserver->aboutToDelete(txn, ns(), doc.value()); /* check if any cursors point to us. if so, advance them. */ _cursorManager.invalidateDocument(txn, loc, INVALIDATION_DELETION); @@ -501,9 +498,7 @@ void Collection::deleteDocument(OperationContext* txn, _recordStore->deleteRecord(txn, loc); - if (!id.isEmpty()) { - getGlobalServiceContext()->getOpObserver()->onDelete(txn, ns().ns(), id); - } + opObserver->onDelete(txn, ns(), std::move(deleteState)); } Counter64 moveCounter; diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp index fc2f4485b53..b4dacd091c5 100644 --- a/src/mongo/db/op_observer.cpp +++ b/src/mongo/db/op_observer.cpp @@ -40,6 +40,7 @@ #include "mongo/db/repl/replication_coordinator_global.h" #include "mongo/s/d_state.h" #include "mongo/scripting/engine.h" +#include "mongo/db/operation_context.h" namespace mongo { @@ -91,16 +92,37 @@ void OpObserver::onUpdate(OperationContext* txn, oplogUpdateEntryArgs args) { } } +OpObserver::DeleteState OpObserver::aboutToDelete(OperationContext* txn, + const NamespaceString& ns, + const BSONObj& doc) { + OpObserver::DeleteState deleteState; + BSONElement idElement = doc["_id"]; + if (!idElement.eoo()) { + deleteState.idDoc = idElement.wrap(); + } + deleteState.isMigrating = isInMigratingChunk(txn, ns, doc); + return deleteState; +} + void OpObserver::onDelete(OperationContext* txn, - const std::string& ns, - const BSONObj& idDoc, + const NamespaceString& ns, + OpObserver::DeleteState deleteState, bool fromMigrate) { - repl::logOp(txn, "d", ns.c_str(), idDoc, nullptr, fromMigrate); + if (deleteState.idDoc.isEmpty()) + return; - getGlobalAuthorizationManager()->logOp(txn, "d", ns.c_str(), idDoc, nullptr); - logOpForSharding(txn, "d", ns.c_str(), idDoc, nullptr, fromMigrate); - logOpForDbHash(txn, ns.c_str()); - if (strstr(ns.c_str(), ".system.js")) { + repl::logOp(txn, "d", ns.ns().c_str(), deleteState.idDoc, nullptr, fromMigrate); + + AuthorizationManager::get(txn->getServiceContext()) + ->logOp(txn, "d", ns.ns().c_str(), deleteState.idDoc, nullptr); + logOpForSharding(txn, + "d", + ns.ns().c_str(), + deleteState.idDoc, + nullptr, + fromMigrate || !deleteState.isMigrating); + logOpForDbHash(txn, ns.ns().c_str()); + if (ns.coll() == "system.js") { Scope::storedFuncMod(txn); } } diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index 36dafda30c6..e9c18f4bbe7 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -51,6 +51,12 @@ class OpObserver { public: OpObserver() {} ~OpObserver() {} + + struct DeleteState { + BSONObj idDoc; + bool isMigrating = false; + }; + void onCreateIndex(OperationContext* txn, const std::string& ns, BSONObj indexDoc, @@ -61,9 +67,10 @@ public: std::vector<BSONObj>::const_iterator end, bool fromMigrate = false); void 
onUpdate(OperationContext* txn, oplogUpdateEntryArgs args); + DeleteState aboutToDelete(OperationContext* txn, const NamespaceString& ns, const BSONObj& doc); void onDelete(OperationContext* txn, - const std::string& ns, - const BSONObj& idDoc, + const NamespaceString& ns, + DeleteState deleteState, bool fromMigrate = false); void onOpMessage(OperationContext* txn, const BSONObj& msgObj); void onCreateCollection(OperationContext* txn, diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 6dd13e9d095..a146a8f7f14 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -48,7 +48,7 @@ class WriteUnitOfWork; /** * This class encompasses the state required by an operation and lives from the time a network - * peration is dispatched until its execution is finished. Note that each "getmore" on a cursor + * operation is dispatched until its execution is finished. Note that each "getmore" on a cursor * is a separate operation. On construction, an OperationContext associates itself with the * current client, and only on destruction it deassociates itself. At any time a client can be * associated with at most one OperationContext. Each OperationContext has a RecoveryUnit diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index bf7a483d993..27e767cd6a9 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -161,6 +161,8 @@ bool opReplicatedEnough(OperationContext* txn, return majorityStatus.isOK() && userStatus.isOK(); } +MONGO_FP_DECLARE(failMigrationReceivedOutOfRangeDelete); + } // namespace // Enabling / disabling these fail points pauses / resumes MigrateStatus::_go(), the thread which @@ -825,6 +827,9 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* txn, BSONObj fullObj; if (Helpers::findById(txn, ctx.db(), ns.c_str(), id, fullObj)) { if (!isInRange(fullObj, min, max, shardKeyPattern)) { + if (MONGO_FAIL_POINT(failMigrationReceivedOutOfRangeDelete)) { + invariant(0); + } continue; } } diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp index f497c3f4d3d..59eb96222fb 100644 --- a/src/mongo/db/s/migration_source_manager.cpp +++ b/src/mongo/db/s/migration_source_manager.cpp @@ -234,6 +234,8 @@ void MigrationSourceManager::logOp(OperationContext* txn, // won't be transferred to the recipient shard. Also ignore ops from // _migrateClone and _transferMods since it is impossible to move a chunk // to self. + // Also ignore out of range deletes when migrating (notInActiveChunk is set in + // OpObserver::onDelete) return; } @@ -283,11 +285,17 @@ void MigrationSourceManager::logOp(OperationContext* txn, } } - // Note: can't check if delete is in active chunk since the document is gone! 
- txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, op)); } +bool MigrationSourceManager::isInMigratingChunk(const NamespaceString& ns, const BSONObj& doc) { + if (!_active) + return false; + if (ns != _nss) + return false; + return isInRange(doc, _min, _max, _shardKeyPattern); +} + bool MigrationSourceManager::transferMods(OperationContext* txn, string& errmsg, BSONObjBuilder& b) { diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h index 4921610332d..8c6251fe49b 100644 --- a/src/mongo/db/s/migration_source_manager.h +++ b/src/mongo/db/s/migration_source_manager.h @@ -71,6 +71,8 @@ public: BSONObj* patt, bool notInActiveChunk); + bool isInMigratingChunk(const NamespaceString& ns, const BSONObj& doc); + /** * Called from the source of a migration process, this method transfers the accummulated local * mods from source to destination. diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp index 8d1d2e783f6..6a7abe7f723 100644 --- a/src/mongo/s/d_migrate.cpp +++ b/src/mongo/s/d_migrate.cpp @@ -441,4 +441,8 @@ void logOpForSharding(OperationContext* txn, shardingState->migrationSourceManager()->logOp(txn, opstr, ns, obj, patt, notInActiveChunk); } +bool isInMigratingChunk(OperationContext* txn, const NamespaceString& ns, const BSONObj& doc) { + return ShardingState::get(txn)->migrationSourceManager()->isInMigratingChunk(ns, doc); +} + } // namespace mongo diff --git a/src/mongo/s/d_state.h b/src/mongo/s/d_state.h index 8029ef92aef..ce6136ea17c 100644 --- a/src/mongo/s/d_state.h +++ b/src/mongo/s/d_state.h @@ -37,6 +37,7 @@ class BSONObj; class Client; class OperationContext; class ShardedConnectionInfo; +class NamespaceString; // ----------------- // --- core --- @@ -69,4 +70,11 @@ void logOpForSharding(OperationContext* txn, const BSONObj& obj, BSONObj* patt, bool forMigrateCleanup); + +/** + * Checks if 'doc' in 'ns' belongs to a currently migrating chunk. + * + * Note: Must be holding global IX lock when calling this method. + */ +bool isInMigratingChunk(OperationContext* txn, const NamespaceString& ns, const BSONObj& doc); } |