author     Dianna Hohensee <dianna.hohensee@10gen.com>   2015-12-02 11:51:41 -0500
committer  Dianna Hohensee <dianna.hohensee@10gen.com>   2015-12-22 09:12:08 -0500
commit     3663e004dfc2f73b82b3d88b5fa1ac6b7dcd1d33 (patch)
tree       d46813559c37225f31768042e2bd2a4ba8ce1ba0
parent     07b2e02ffddbd9998b8c738895f3fae85b592176 (diff)
download   mongo-3663e004dfc2f73b82b3d88b5fa1ac6b7dcd1d33.tar.gz
SERVER-21382 fixing sharding migration to transfer only document deletions relevant to the chunk being migrated, not every deletion
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_auth.yml          1
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_auth_audit.yml    1
-rw-r--r--  jstests/libs/chunk_manipulation_util.js                      8
-rw-r--r--  jstests/sharding/migration_with_source_deletes.js          123
-rw-r--r--  src/mongo/db/catalog/collection.cpp                         11
-rw-r--r--  src/mongo/db/op_observer.cpp                                36
-rw-r--r--  src/mongo/db/op_observer.h                                  11
-rw-r--r--  src/mongo/db/operation_context.h                             2
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp             5
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp                 12
-rw-r--r--  src/mongo/db/s/migration_source_manager.h                    2
-rw-r--r--  src/mongo/s/d_migrate.cpp                                    4
-rw-r--r--  src/mongo/s/d_state.h                                        8
13 files changed, 202 insertions, 22 deletions
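
Note: the heart of the change is in src/mongo/db/op_observer.cpp below, where the sharding
log-op call now receives a flag saying whether the deleted document was inside the migrating
chunk. A minimal, standalone sketch of that decision (simplified types; only DeleteState,
isMigrating, and the notInActiveChunk flag are taken from the diff, the rest is illustrative):

    #include <iostream>

    // Simplified stand-in for OpObserver::DeleteState from the diff below.
    struct DeleteState {
        bool hasIdDoc = false;    // whether the deleted document had an _id to log
        bool isMigrating = false; // computed before the delete, while the document still exists
    };

    // Mirrors the last argument of logOpForSharding() in onDelete(): when true, the
    // migration source manager ignores the delete instead of queueing it for transfer.
    bool notInActiveChunk(const DeleteState& state, bool fromMigrate) {
        return fromMigrate || !state.isMigrating;
    }

    int main() {
        DeleteState inChunk{true, true};
        DeleteState outOfChunk{true, false};
        std::cout << std::boolalpha
                  << notInActiveChunk(inChunk, false) << "\n"     // false: delete is transferred
                  << notInActiveChunk(outOfChunk, false) << "\n"; // true: delete stays local
        return 0;
    }
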
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth.yml b/buildscripts/resmokeconfig/suites/sharding_auth.yml
index 06e1118b7d9..578632650e0 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth.yml
@@ -23,6 +23,7 @@ selector:
# Skip the testcases that do not have auth bypass when running ops in parallel.
- jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js # SERVER-21713
- jstests/sharding/cleanup_orphaned_cmd_during_movechunk_hashed.js # SERVER-21713
+ - jstests/sharding/migration_with_source_deletes.js # SERVER-21713
executor:
js_test:
diff --git a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
index b577387cd0c..bed76c61261 100644
--- a/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_auth_audit.yml
@@ -23,6 +23,7 @@ selector:
# Skip the testcases that do not have auth bypass when running ops in parallel.
- jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js # SERVER-21713
- jstests/sharding/cleanup_orphaned_cmd_during_movechunk_hashed.js # SERVER-21713
+ - jstests/sharding/migration_with_source_deletes.js # SERVER-21713
executor:
js_test:
diff --git a/jstests/libs/chunk_manipulation_util.js b/jstests/libs/chunk_manipulation_util.js
index b1e4d5a90e1..8ad487667cf 100644
--- a/jstests/libs/chunk_manipulation_util.js
+++ b/jstests/libs/chunk_manipulation_util.js
@@ -201,7 +201,7 @@ function waitForMigrateStep( shardConnection, stepNumber ) {
admin = shardConnection.getDB( 'admin' );
assert( stepNumber >= 1);
- assert( stepNumber <= 5 );
+ assert( stepNumber <= 5);
var msg = (
'Migrate thread on ' + shardConnection.shardName
@@ -215,7 +215,11 @@ function waitForMigrateStep( shardConnection, stepNumber ) {
for ( var i = 0; i < in_progress.length; ++i ) {
var op = in_progress[i];
if ( op.desc && op.desc === 'migrateThread' ) {
- return op.msg.startsWith( searchString );
+ if (op.hasOwnProperty('msg')) {
+ return op.msg.startsWith( searchString );
+ } else {
+ return false;
+ }
}
}
diff --git a/jstests/sharding/migration_with_source_deletes.js b/jstests/sharding/migration_with_source_deletes.js
new file mode 100644
index 00000000000..afc00e513e2
--- /dev/null
+++ b/jstests/sharding/migration_with_source_deletes.js
@@ -0,0 +1,123 @@
+//
+// Tests that, during a chunk migration, the recipient does not receive out-of-range deletes
+// from the donor.
+//
+// Pauses the migration on the recipient shard after the initial data chunk cloning is finished.
+// This allows time for the donor shard to perform deletes, half of which are on the migrating
+// chunk. The recipient is then set to continue, collecting the delete mods from the donor, and
+// finishes the migration. Before the recipient resumes, a failpoint is set on it that fails the
+// migration if it receives an out-of-range delete from the donor's delete mods log.
+//
+// The idea is that the recipient shard should not be collecting deletes from the donor shard
+// that are out of range, since those would unnecessarily prevent the migration from finishing:
+// the migration can only end once the donor's log of delete mods for the migrating chunk is empty.
+//
+
+load('./jstests/libs/chunk_manipulation_util.js');
+
+(function() {
+"use strict";
+
+var staticMongod = MongoRunner.runMongod({}); // For startParallelOps.
+
+/**
+ * Start up a new sharded cluster and stop the balancer, which would interfere with manual chunk management.
+ */
+
+var st = new ShardingTest({ shards : 2, mongos : 1 });
+st.stopBalancer();
+
+var mongos = st.s0,
+ admin = mongos.getDB('admin'),
+ shards = mongos.getCollection('config.shards').find().toArray(),
+ dbName = "testDB",
+ ns = dbName + ".foo",
+ coll = mongos.getCollection(ns),
+ donor = st.shard0,
+ recipient = st.shard1,
+ donorColl = donor.getCollection(ns),
+ recipientColl = recipient.getCollection(ns);
+
+/**
+ * Enable sharding, and split the collection into two chunks.
+ */
+
+// Two chunks
+// Donor: [0, 10) [10, 20)
+// Recipient:
+jsTest.log('Enable sharding of the collection and pre-split into two chunks....');
+assert.commandWorked( admin.runCommand( {enableSharding: dbName} ) );
+st.ensurePrimaryShard(dbName, shards[0]._id);
+assert.commandWorked( admin.runCommand( {shardCollection: ns, key: {a: 1}} ) );
+assert.commandWorked( admin.runCommand( {split: ns, middle: {a: 10}} ) );
+
+/**
+ * Insert data into collection
+ */
+
+// 10 documents in each chunk on the donor
+jsTest.log('Inserting 20 docs into donor shard, 10 in each chunk....');
+for (var i = 0; i < 20; ++i) donorColl.insert( {a: i} );
+assert.eq(null, donorColl.getDB().getLastError());
+assert.eq(20, donorColl.count());
+
+/**
+ * Set failpoints. The recipient will crash if it receives an out-of-range delete from the
+ * donor. The recipient will also pause the migration after cloning chunk data from the donor,
+ * before checking the donor's delete mods log.
+ */
+
+jsTest.log('setting failpoint failMigrationReceivedOutOfRangeDelete');
+assert.commandWorked(recipient.getDB('admin').runCommand( {configureFailPoint: 'failMigrationReceivedOutOfRangeDelete', mode: 'alwaysOn'} ));
+
+jsTest.log('setting recipient failpoint cloned');
+pauseMigrateAtStep(recipient, migrateStepNames.cloned);
+
+/**
+ * Start a moveChunk in the background. Move chunk [10, 20), which has 10 docs,
+ * from shard 0 (donor) to shard 1 (recipient). Migration will pause after cloning
+ * (when it reaches the recipient failpoint).
+ */
+
+// Donor: [0, 10)
+// Recipient: [10, 20)
+jsTest.log('starting migration, pause after cloning...');
+var joinMoveChunk = moveChunkParallel(
+ staticMongod,
+ st.s0.host,
+ {a: 10},
+ null,
+ coll.getFullName(),
+ shards[1]._id);
+
+/**
+ * Wait for recipient to finish cloning.
+ * THEN delete 10 documents on the donor, 5 in the migrating chunk and 5 in the remaining chunk.
+ */
+
+jsTest.log('Delete 5 docs from each chunk, migrating chunk and remaining chunk...');
+waitForMigrateStep(recipient, migrateStepNames.cloned);
+donorColl.remove( {$and : [ {a: {$gte: 5}}, {a: {$lt: 15}} ]} );
+
+/**
+ * Finish migration. Unpause recipient migration, wait for it to collect
+ * the delete diffs from donor and finish.
+ */
+
+jsTest.log('Continuing and finishing migration...');
+unpauseMigrateAtStep(recipient, migrateStepNames.cloned);
+joinMoveChunk();
+
+/**
+ * Check documents are where they should be: 5 docs in each shard's chunk.
+ */
+
+jsTest.log('Checking that documents are on the shards they should be...');
+assert.eq(5, donorColl.count());
+assert.eq(5, recipientColl.count());
+assert.eq(10, coll.count());
+
+jsTest.log('DONE!');
+st.stop();
+
+})()
diff --git a/src/mongo/db/catalog/collection.cpp b/src/mongo/db/catalog/collection.cpp
index 3be9467c79e..daa74399995 100644
--- a/src/mongo/db/catalog/collection.cpp
+++ b/src/mongo/db/catalog/collection.cpp
@@ -488,11 +488,8 @@ void Collection::deleteDocument(OperationContext* txn,
Snapshotted<BSONObj> doc = docFor(txn, loc);
- BSONElement e = doc.value()["_id"];
- BSONObj id;
- if (e.type()) {
- id = e.wrap();
- }
+ auto opObserver = getGlobalServiceContext()->getOpObserver();
+ OpObserver::DeleteState deleteState = opObserver->aboutToDelete(txn, ns(), doc.value());
/* check if any cursors point to us. if so, advance them. */
_cursorManager.invalidateDocument(txn, loc, INVALIDATION_DELETION);
@@ -501,9 +498,7 @@ void Collection::deleteDocument(OperationContext* txn,
_recordStore->deleteRecord(txn, loc);
- if (!id.isEmpty()) {
- getGlobalServiceContext()->getOpObserver()->onDelete(txn, ns().ns(), id);
- }
+ opObserver->onDelete(txn, ns(), std::move(deleteState));
}
Counter64 moveCounter;
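
Note: the reordering in Collection::deleteDocument() above is what makes the chunk check possible
at all. Anything that depends on the document's contents (its _id, and whether it falls in the
migrating chunk) must be captured before _recordStore->deleteRecord() runs, because afterwards the
document is gone. A rough sketch of that two-phase pattern, with simplified hypothetical types in
place of the real BSON and catalog classes:

    #include <iostream>
    #include <string>

    struct Doc { std::string id; int shardKey; };  // hypothetical stand-in for a BSON document
    struct DeleteState { std::string idDoc; bool isMigrating = false; };

    // Phase 1 (cf. OpObserver::aboutToDelete): inspect the document while it still exists.
    DeleteState aboutToDelete(const Doc& doc, int chunkMin, int chunkMax) {
        DeleteState state;
        state.idDoc = doc.id;
        state.isMigrating = doc.shardKey >= chunkMin && doc.shardKey < chunkMax;
        return state;
    }

    // Phase 2 (cf. OpObserver::onDelete): once the record is removed, only the captured state remains.
    void onDelete(const DeleteState& state) {
        if (state.idDoc.empty())
            return;  // nothing to log without an _id
        std::cout << "delete of _id " << state.idDoc
                  << (state.isMigrating ? " queued for the recipient\n" : " kept local\n");
    }

    int main() {
        Doc doc{"abc123", 12};
        DeleteState state = aboutToDelete(doc, 10, 20);  // before deleteRecord()
        // ... the record store deletes the document here ...
        onDelete(state);                                 // after deleteRecord()
        return 0;
    }
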
diff --git a/src/mongo/db/op_observer.cpp b/src/mongo/db/op_observer.cpp
index fc2f4485b53..b4dacd091c5 100644
--- a/src/mongo/db/op_observer.cpp
+++ b/src/mongo/db/op_observer.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/s/d_state.h"
#include "mongo/scripting/engine.h"
+#include "mongo/db/operation_context.h"
namespace mongo {
@@ -91,16 +92,37 @@ void OpObserver::onUpdate(OperationContext* txn, oplogUpdateEntryArgs args) {
}
}
+OpObserver::DeleteState OpObserver::aboutToDelete(OperationContext* txn,
+ const NamespaceString& ns,
+ const BSONObj& doc) {
+ OpObserver::DeleteState deleteState;
+ BSONElement idElement = doc["_id"];
+ if (!idElement.eoo()) {
+ deleteState.idDoc = idElement.wrap();
+ }
+ deleteState.isMigrating = isInMigratingChunk(txn, ns, doc);
+ return deleteState;
+}
+
void OpObserver::onDelete(OperationContext* txn,
- const std::string& ns,
- const BSONObj& idDoc,
+ const NamespaceString& ns,
+ OpObserver::DeleteState deleteState,
bool fromMigrate) {
- repl::logOp(txn, "d", ns.c_str(), idDoc, nullptr, fromMigrate);
+ if (deleteState.idDoc.isEmpty())
+ return;
- getGlobalAuthorizationManager()->logOp(txn, "d", ns.c_str(), idDoc, nullptr);
- logOpForSharding(txn, "d", ns.c_str(), idDoc, nullptr, fromMigrate);
- logOpForDbHash(txn, ns.c_str());
- if (strstr(ns.c_str(), ".system.js")) {
+ repl::logOp(txn, "d", ns.ns().c_str(), deleteState.idDoc, nullptr, fromMigrate);
+
+ AuthorizationManager::get(txn->getServiceContext())
+ ->logOp(txn, "d", ns.ns().c_str(), deleteState.idDoc, nullptr);
+ logOpForSharding(txn,
+ "d",
+ ns.ns().c_str(),
+ deleteState.idDoc,
+ nullptr,
+ fromMigrate || !deleteState.isMigrating);
+ logOpForDbHash(txn, ns.ns().c_str());
+ if (ns.coll() == "system.js") {
Scope::storedFuncMod(txn);
}
}
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index 36dafda30c6..e9c18f4bbe7 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -51,6 +51,12 @@ class OpObserver {
public:
OpObserver() {}
~OpObserver() {}
+
+ struct DeleteState {
+ BSONObj idDoc;
+ bool isMigrating = false;
+ };
+
void onCreateIndex(OperationContext* txn,
const std::string& ns,
BSONObj indexDoc,
@@ -61,9 +67,10 @@ public:
std::vector<BSONObj>::const_iterator end,
bool fromMigrate = false);
void onUpdate(OperationContext* txn, oplogUpdateEntryArgs args);
+ DeleteState aboutToDelete(OperationContext* txn, const NamespaceString& ns, const BSONObj& doc);
void onDelete(OperationContext* txn,
- const std::string& ns,
- const BSONObj& idDoc,
+ const NamespaceString& ns,
+ DeleteState deleteState,
bool fromMigrate = false);
void onOpMessage(OperationContext* txn, const BSONObj& msgObj);
void onCreateCollection(OperationContext* txn,
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 6dd13e9d095..a146a8f7f14 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -48,7 +48,7 @@ class WriteUnitOfWork;
/**
* This class encompasses the state required by an operation and lives from the time a network
- * peration is dispatched until its execution is finished. Note that each "getmore" on a cursor
+ * operation is dispatched until its execution is finished. Note that each "getmore" on a cursor
* is a separate operation. On construction, an OperationContext associates itself with the
* current client, and only on destruction it deassociates itself. At any time a client can be
* associated with at most one OperationContext. Each OperationContext has a RecoveryUnit
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index bf7a483d993..27e767cd6a9 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -161,6 +161,8 @@ bool opReplicatedEnough(OperationContext* txn,
return majorityStatus.isOK() && userStatus.isOK();
}
+MONGO_FP_DECLARE(failMigrationReceivedOutOfRangeDelete);
+
} // namespace
// Enabling / disabling these fail points pauses / resumes MigrateStatus::_go(), the thread which
@@ -825,6 +827,9 @@ bool MigrationDestinationManager::_applyMigrateOp(OperationContext* txn,
BSONObj fullObj;
if (Helpers::findById(txn, ctx.db(), ns.c_str(), id, fullObj)) {
if (!isInRange(fullObj, min, max, shardKeyPattern)) {
+ if (MONGO_FAIL_POINT(failMigrationReceivedOutOfRangeDelete)) {
+ invariant(0);
+ }
continue;
}
}
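
Note: MONGO_FP_DECLARE and MONGO_FAIL_POINT are the fail-point macros the code base already uses;
the invariant above only fires when the jstest has switched failMigrationReceivedOutOfRangeDelete
on via configureFailPoint. A standalone sketch of the same guard pattern (plain C++ stand-ins,
not the real fail-point machinery):

    #include <atomic>
    #include <cassert>
    #include <iostream>

    // Off-by-default switch that a test can flip on, standing in for the declared fail point.
    std::atomic<bool> failMigrationReceivedOutOfRangeDelete{false};

    void applyDeleteFromDonor(int shardKey, int chunkMin, int chunkMax) {
        const bool inRange = shardKey >= chunkMin && shardKey < chunkMax;
        if (!inRange) {
            // Mirrors the hunk above: with the fail point enabled, an out-of-range delete aborts.
            assert(!failMigrationReceivedOutOfRangeDelete.load() &&
                   "recipient received an out-of-range delete");
            return;  // otherwise the out-of-range delete is simply skipped
        }
        std::cout << "applying delete for shard key " << shardKey << "\n";
    }

    int main() {
        applyDeleteFromDonor(12, 10, 20);  // in range: applied
        applyDeleteFromDonor(3, 10, 20);   // out of range: skipped while the fail point is off
        return 0;
    }
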
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index f497c3f4d3d..59eb96222fb 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -234,6 +234,8 @@ void MigrationSourceManager::logOp(OperationContext* txn,
// won't be transferred to the recipient shard. Also ignore ops from
// _migrateClone and _transferMods since it is impossible to move a chunk
// to self.
+ // Also ignore out-of-range deletes when migrating (notInActiveChunk is set in
+ // OpObserver::onDelete).
return;
}
@@ -283,11 +285,17 @@ void MigrationSourceManager::logOp(OperationContext* txn,
}
}
- // Note: can't check if delete is in active chunk since the document is gone!
-
txn->recoveryUnit()->registerChange(new LogOpForShardingHandler(this, idObj, op));
}
+bool MigrationSourceManager::isInMigratingChunk(const NamespaceString& ns, const BSONObj& doc) {
+ if (!_active)
+ return false;
+ if (ns != _nss)
+ return false;
+ return isInRange(doc, _min, _max, _shardKeyPattern);
+}
+
bool MigrationSourceManager::transferMods(OperationContext* txn,
string& errmsg,
BSONObjBuilder& b) {
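
Note: MigrationSourceManager::isInMigratingChunk() above only answers true while a migration is
active, for the namespace being migrated, and for documents whose shard key falls inside the
migrating chunk's [min, max) range; per the comment added to d_state.h below, callers are expected
to hold the global IX lock. A condensed sketch of the same checks with simplified types (a numeric
shard key stands in for a real shard key pattern):

    #include <iostream>
    #include <string>

    // Simplified stand-in for MigrationSourceManager and its isInMigratingChunk() check.
    struct MigrationSourceManager {
        bool active = false;       // is a moveChunk currently in progress?
        std::string nss;           // namespace being migrated
        int min = 0, max = 0;      // bounds of the migrating chunk on a numeric shard key

        bool isInMigratingChunk(const std::string& ns, int shardKey) const {
            if (!active)
                return false;      // no migration: nothing to transfer
            if (ns != nss)
                return false;      // delete is on some other collection
            return shardKey >= min && shardKey < max;
        }
    };

    int main() {
        MigrationSourceManager mgr{true, "testDB.foo", 10, 20};
        std::cout << std::boolalpha
                  << mgr.isInMigratingChunk("testDB.foo", 12) << "\n"   // true:  inside [10, 20)
                  << mgr.isInMigratingChunk("testDB.foo", 3)  << "\n"   // false: outside the chunk
                  << mgr.isInMigratingChunk("testDB.bar", 12) << "\n";  // false: different namespace
        return 0;
    }
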
diff --git a/src/mongo/db/s/migration_source_manager.h b/src/mongo/db/s/migration_source_manager.h
index 4921610332d..8c6251fe49b 100644
--- a/src/mongo/db/s/migration_source_manager.h
+++ b/src/mongo/db/s/migration_source_manager.h
@@ -71,6 +71,8 @@ public:
BSONObj* patt,
bool notInActiveChunk);
+ bool isInMigratingChunk(const NamespaceString& ns, const BSONObj& doc);
+
/**
* Called from the source of a migration process, this method transfers the accummulated local
* mods from source to destination.
diff --git a/src/mongo/s/d_migrate.cpp b/src/mongo/s/d_migrate.cpp
index 8d1d2e783f6..6a7abe7f723 100644
--- a/src/mongo/s/d_migrate.cpp
+++ b/src/mongo/s/d_migrate.cpp
@@ -441,4 +441,8 @@ void logOpForSharding(OperationContext* txn,
shardingState->migrationSourceManager()->logOp(txn, opstr, ns, obj, patt, notInActiveChunk);
}
+bool isInMigratingChunk(OperationContext* txn, const NamespaceString& ns, const BSONObj& doc) {
+ return ShardingState::get(txn)->migrationSourceManager()->isInMigratingChunk(ns, doc);
+}
+
} // namespace mongo
diff --git a/src/mongo/s/d_state.h b/src/mongo/s/d_state.h
index 8029ef92aef..ce6136ea17c 100644
--- a/src/mongo/s/d_state.h
+++ b/src/mongo/s/d_state.h
@@ -37,6 +37,7 @@ class BSONObj;
class Client;
class OperationContext;
class ShardedConnectionInfo;
+class NamespaceString;
// -----------------
// --- core ---
@@ -69,4 +70,11 @@ void logOpForSharding(OperationContext* txn,
const BSONObj& obj,
BSONObj* patt,
bool forMigrateCleanup);
+
+/**
+ * Checks if 'doc' in 'ns' belongs to a currently migrating chunk.
+ *
+ * Note: Must be holding global IX lock when calling this method.
+ */
+bool isInMigratingChunk(OperationContext* txn, const NamespaceString& ns, const BSONObj& doc);
}