author     Nathan Myers <nathan.myers@10gen.com>  2017-04-18 17:46:55 -0400
committer  Nathan Myers <nathan.myers@10gen.com>  2017-04-20 01:31:00 -0400
commit     c192a1b9b1e223f8075ab5ce72dde372467f9650 (patch)
tree       df2fad3682c5fce44a7f91d9a216cd912017e38b
parent     53907c0094e26e39b813ae369be52a4e51fc3a08 (diff)
download   mongo-c192a1b9b1e223f8075ab5ce72dde372467f9650.tar.gz
SERVER-27921 New Range Deleter
-rw-r--r--  jstests/sharding/authCommands.js | 4
-rw-r--r--  jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js | 6
-rw-r--r--  jstests/sharding/migration_ignore_interrupts_4.js | 3
-rw-r--r--  src/mongo/SConscript | 1
-rw-r--r--  src/mongo/db/SConscript | 49
-rw-r--r--  src/mongo/db/db.cpp | 2
-rw-r--r--  src/mongo/db/dbhelpers.cpp | 189
-rw-r--r--  src/mongo/db/dbhelpers.h | 23
-rw-r--r--  src/mongo/db/range_deleter_db_env.cpp | 129
-rw-r--r--  src/mongo/db/s/SConscript | 22
-rw-r--r--  src/mongo/db/s/cleanup_orphaned_cmd.cpp | 95
-rw-r--r--  src/mongo/db/s/collection_metadata.cpp | 215
-rw-r--r--  src/mongo/db/s/collection_metadata.h | 57
-rw-r--r--  src/mongo/db/s/collection_metadata_test.cpp | 243
-rw-r--r--  src/mongo/db/s/collection_range_deleter.cpp | 258
-rw-r--r--  src/mongo/db/s/collection_range_deleter.h | 101
-rw-r--r--  src/mongo/db/s/collection_range_deleter_test.cpp | 298
-rw-r--r--  src/mongo/db/s/collection_sharding_state.cpp | 61
-rw-r--r--  src/mongo/db/s/collection_sharding_state.h | 68
-rw-r--r--  src/mongo/db/s/get_shard_version_command.cpp | 2
-rw-r--r--  src/mongo/db/s/metadata_manager.cpp | 625
-rw-r--r--  src/mongo/db/s/metadata_manager.h | 268
-rw-r--r--  src/mongo/db/s/metadata_manager_test.cpp | 346
-rw-r--r--  src/mongo/db/s/migration_destination_manager.cpp | 216
-rw-r--r--  src/mongo/db/s/migration_destination_manager.h | 37
-rw-r--r--  src/mongo/db/s/migration_destination_manager_legacy_commands.cpp | 13
-rw-r--r--  src/mongo/db/s/migration_source_manager.cpp | 10
-rw-r--r--  src/mongo/db/s/move_chunk_command.cpp | 47
-rw-r--r--  src/mongo/db/s/sharding_state.cpp | 38
-rw-r--r--  src/mongo/db/s/sharding_state.h | 37
-rw-r--r--  src/mongo/db/stats/SConscript | 3
-rw-r--r--  src/mongo/dbtests/dbhelper_tests.cpp | 24
-rw-r--r--  src/mongo/s/catalog/type_chunk.cpp | 22
-rw-r--r--  src/mongo/s/catalog/type_chunk.h | 20
-rw-r--r--  src/mongo/s/catalog/type_chunk_test.cpp | 59
35 files changed, 1561 insertions, 2030 deletions
diff --git a/jstests/sharding/authCommands.js b/jstests/sharding/authCommands.js
index a36e745f113..694a09e2f9b 100644
--- a/jstests/sharding/authCommands.js
+++ b/jstests/sharding/authCommands.js
@@ -117,9 +117,9 @@
print("Checking read operations, should work");
assert.eq(expectedDocs, testDB.foo.find().itcount());
assert.eq(expectedDocs, testDB.foo.count());
+
// NOTE: This is an explicit check that GLE can be run with read prefs, not the result
- // of
- // above.
+ // of above.
assert.eq(null, testDB.runCommand({getlasterror: 1}).err);
checkCommandSucceeded(testDB, {dbstats: 1});
checkCommandSucceeded(testDB, {collstats: 'foo'});
diff --git a/jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js b/jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js
index fef023fabf5..bf8471b2e98 100644
--- a/jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js
+++ b/jstests/sharding/cleanup_orphaned_cmd_during_movechunk.js
@@ -29,14 +29,16 @@ load('./jstests/libs/cleanup_orphaned_util.js');
{moveChunk: ns, find: {_id: 20}, to: st.shard1.shardName, _waitForDelete: true}));
jsTest.log('Inserting 20 docs into shard 0....');
- for (var i = -20; i < 20; i += 2)
+ for (var i = -20; i < 20; i += 2) {
coll.insert({_id: i});
+ }
assert.eq(null, coll.getDB().getLastError());
assert.eq(20, donorColl.count());
jsTest.log('Inserting 10 docs into shard 1....');
- for (i = 20; i < 40; i += 2)
+ for (i = 20; i < 40; i += 2) {
coll.insert({_id: i});
+ }
assert.eq(null, coll.getDB().getLastError());
assert.eq(10, recipientColl.count());
diff --git a/jstests/sharding/migration_ignore_interrupts_4.js b/jstests/sharding/migration_ignore_interrupts_4.js
index 5469ea48d4b..1510f4f1f67 100644
--- a/jstests/sharding/migration_ignore_interrupts_4.js
+++ b/jstests/sharding/migration_ignore_interrupts_4.js
@@ -91,9 +91,6 @@ load('./jstests/libs/chunk_manipulation_util.js');
}, "coll1 migration recipient didn't abort migration in catchup phase.", 2 * 60 * 1000);
assert.eq(
1, shard0Coll1.find().itcount(), "donor shard0 completed a migration that it aborted.");
- assert.eq(1,
- shard1Coll1.find().itcount(),
- "shard1 accessed the xfermods log despite donor migration abortion.");
jsTest.log('Finishing coll2 migration, which should succeed....');
unpauseMigrateAtStep(shard2, migrateStepNames.cloned);
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 39de568e960..abf34365dbb 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -289,7 +289,6 @@ mongodLibDeps = [
'db/op_observer_d',
'db/mongodwebserver',
'db/repl/repl_set_commands',
- 'db/range_deleter_d',
'db/repair_database',
'db/repl/storage_interface_impl',
'db/repl/topology_coordinator_impl',
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 2edd8f35d6a..698fc103a51 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -369,35 +369,6 @@ env.CppUnitTest(
],
)
-env.Library(
- target='range_deleter',
- source=[
- 'range_deleter.cpp',
- 'range_deleter_mock_env.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/base',
- '$BUILD_DIR/mongo/db/common',
- '$BUILD_DIR/mongo/db/repl/repl_coordinator_global',
- '$BUILD_DIR/mongo/db/service_context',
- 'range_arithmetic',
- ],
-)
-
-env.CppUnitTest(
- target='range_deleter_test',
- source=[
- 'range_deleter_test.cpp',
- ],
- LIBDEPS=[
- 'common',
- 'range_deleter',
- '$BUILD_DIR/mongo/db/auth/authorization_manager_mock_init',
- '$BUILD_DIR/mongo/db/repl/replmocks',
- '$BUILD_DIR/mongo/db/service_context_noop_init',
- ],
-)
-
# This library is linked into mongos and mongod only, not into the shell or any tools.
env.Library(
target="mongodandmongos",
@@ -755,24 +726,6 @@ env.Library(
)
env.Library(
- target="range_deleter_d",
- source=[
- "range_deleter_db_env.cpp",
- "range_deleter_service.cpp",
- ],
- LIBDEPS=[
- "dbhelpers",
- "db_raii",
- "range_deleter",
- #"catalog/catalog", # CYCLE
- ],
- LIBDEPS_TAGS=[
- # TODO(ADAM, 2017-03-10): See `CYCLE` tags above.
- 'illegal_cyclic_or_unresolved_dependencies_whitelisted',
- ],
-)
-
-env.Library(
target="rw_concern_d",
source=[
"read_concern.cpp",
@@ -926,8 +879,6 @@ env.Library(
"pipeline/serveronly",
"prefetch",
"query/query",
- "range_deleter",
- "range_deleter_d",
"repair_database",
"repl/bgsync",
"repl/oplog_buffer_collection",
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 68a3b7f0b44..75e81e2d216 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -659,8 +659,6 @@ ExitCode _initAndListen(int listenPort) {
startFTDC();
- getDeleter()->startWorkers();
-
restartInProgressIndexesFromLastShutdown(startupOpCtx.get());
if (serverGlobalParams.clusterRole == ClusterRole::ShardServer) {
diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp
index 34ecdfbbefa..7de97489e77 100644
--- a/src/mongo/db/dbhelpers.cpp
+++ b/src/mongo/db/dbhelpers.cpp
@@ -271,195 +271,6 @@ BSONObj Helpers::inferKeyPattern(const BSONObj& o) {
return kpBuilder.obj();
}
-long long Helpers::removeRange(OperationContext* opCtx,
- const KeyRange& range,
- BoundInclusion boundInclusion,
- const WriteConcernOptions& writeConcern,
- RemoveSaver* callback,
- bool fromMigrate,
- bool onlyRemoveOrphanedDocs) {
- Timer rangeRemoveTimer;
- const NamespaceString nss(range.ns);
-
- // The IndexChunk has a keyPattern that may apply to more than one index - we need to
- // select the index and get the full index keyPattern here.
- std::string indexName;
- BSONObj min;
- BSONObj max;
-
- {
- AutoGetCollectionForReadCommand ctx(opCtx, nss);
- Collection* collection = ctx.getCollection();
- if (!collection) {
- warning(LogComponent::kSharding)
- << "collection deleted before cleaning data over range of type " << range.keyPattern
- << " in " << nss.ns() << endl;
- return -1;
- }
-
- // Allow multiKey based on the invariant that shard keys must be single-valued.
- // Therefore, any multi-key index prefixed by shard key cannot be multikey over
- // the shard key fields.
- const IndexDescriptor* idx =
- collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx,
- range.keyPattern,
- false); // requireSingleKey
- if (!idx) {
- warning(LogComponent::kSharding) << "no index found to clean data over range of type "
- << range.keyPattern << " in " << nss.ns() << endl;
- return -1;
- }
-
- indexName = idx->indexName();
- KeyPattern indexKeyPattern(idx->keyPattern());
-
- // Extend bounds to match the index we found
-
- invariant(IndexBounds::isStartIncludedInBound(boundInclusion));
- // Extend min to get (min, MinKey, MinKey, ....)
- min = Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(range.minKey, false));
- // If upper bound is included, extend max to get (max, MaxKey, MaxKey, ...)
- // If not included, extend max to get (max, MinKey, MinKey, ....)
- const bool maxInclusive = IndexBounds::isEndIncludedInBound(boundInclusion);
- max = Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(range.maxKey, maxInclusive));
- }
-
-
- MONGO_LOG_COMPONENT(1, LogComponent::kSharding)
- << "begin removal of " << min << " to " << max << " in " << nss.ns()
- << " with write concern: " << writeConcern.toBSON() << endl;
-
- long long numDeleted = 0;
-
- Milliseconds millisWaitingForReplication{0};
-
- while (1) {
- // Scoping for write lock.
- {
- AutoGetCollection ctx(opCtx, nss, MODE_IX, MODE_IX);
- Collection* collection = ctx.getCollection();
- if (!collection)
- break;
-
- IndexDescriptor* desc =
- collection->getIndexCatalog()->findIndexByName(opCtx, indexName);
-
- if (!desc) {
- warning(LogComponent::kSharding) << "shard key index '" << indexName << "' on '"
- << nss.ns() << "' was dropped";
- return -1;
- }
-
- auto exec = InternalPlanner::indexScan(opCtx,
- collection,
- desc,
- min,
- max,
- boundInclusion,
- PlanExecutor::YIELD_AUTO,
- InternalPlanner::FORWARD,
- InternalPlanner::IXSCAN_FETCH);
-
- RecordId rloc;
- BSONObj obj;
- PlanExecutor::ExecState state;
- // This may yield so we cannot touch nsd after this.
- state = exec->getNext(&obj, &rloc);
- if (PlanExecutor::IS_EOF == state) {
- break;
- }
-
- if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) {
- warning(LogComponent::kSharding)
- << PlanExecutor::statestr(state) << " - cursor error while trying to delete "
- << min << " to " << max << " in " << nss.ns() << ": "
- << WorkingSetCommon::toStatusString(obj)
- << ", stats: " << Explain::getWinningPlanStats(exec.get()) << endl;
- break;
- }
-
- verify(PlanExecutor::ADVANCED == state);
-
- WriteUnitOfWork wuow(opCtx);
-
- if (onlyRemoveOrphanedDocs) {
- // Do a final check in the write lock to make absolutely sure that our
- // collection hasn't been modified in a way that invalidates our migration
- // cleanup.
-
- // We should never be able to turn off the sharding state once enabled, but
- // in the future we might want to.
- verify(ShardingState::get(opCtx)->enabled());
-
- bool docIsOrphan;
-
- // In write lock, so will be the most up-to-date version
- auto metadataNow = CollectionShardingState::get(opCtx, nss.ns())->getMetadata();
- if (metadataNow) {
- ShardKeyPattern kp(metadataNow->getKeyPattern());
- BSONObj key = kp.extractShardKeyFromDoc(obj);
- docIsOrphan =
- !metadataNow->keyBelongsToMe(key) && !metadataNow->keyIsPending(key);
- } else {
- docIsOrphan = false;
- }
-
- if (!docIsOrphan) {
- warning(LogComponent::kSharding)
- << "aborting migration cleanup for chunk " << min << " to " << max
- << (metadataNow ? (string) " at document " + obj.toString() : "")
- << ", collection " << nss.ns() << " has changed " << endl;
- break;
- }
- }
-
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, nss)) {
- warning() << "stepped down from primary while deleting chunk; "
- << "orphaning data in " << nss.ns() << " in range [" << redact(min)
- << ", " << redact(max) << ")";
- return numDeleted;
- }
-
- if (callback)
- callback->goingToDelete(obj);
-
- OpDebug* const nullOpDebug = nullptr;
- collection->deleteDocument(opCtx, rloc, nullOpDebug, fromMigrate);
- wuow.commit();
- numDeleted++;
- }
-
- // TODO remove once the yielding below that references this timer has been removed
- Timer secondaryThrottleTime;
-
- if (writeConcern.shouldWaitForOtherNodes() && numDeleted > 0) {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::getGlobalReplicationCoordinator()->awaitReplication(
- opCtx,
- repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
- writeConcern);
- if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) {
- warning(LogComponent::kSharding) << "replication to secondaries for removeRange at "
- "least 60 seconds behind";
- } else {
- uassertStatusOK(replStatus.status);
- }
- millisWaitingForReplication += replStatus.duration;
- }
- }
-
- if (writeConcern.shouldWaitForOtherNodes())
- log(LogComponent::kSharding)
- << "Helpers::removeRangeUnlocked time spent waiting for replication: "
- << durationCount<Milliseconds>(millisWaitingForReplication) << "ms" << endl;
-
- MONGO_LOG_COMPONENT(1, LogComponent::kSharding) << "end removal of " << min << " to " << max
- << " in " << nss.ns() << " (took "
- << rangeRemoveTimer.millis() << "ms)" << endl;
-
- return numDeleted;
-}
-
void Helpers::emptyCollection(OperationContext* opCtx, const NamespaceString& nss) {
OldClientContext context(opCtx, nss.ns());
repl::UnreplicatedWritesBlock uwb(opCtx);
diff --git a/src/mongo/db/dbhelpers.h b/src/mongo/db/dbhelpers.h
index bcba05a7b04..031feda1215 100644
--- a/src/mongo/db/dbhelpers.h
+++ b/src/mongo/db/dbhelpers.h
@@ -139,29 +139,6 @@ struct Helpers {
static BSONObj inferKeyPattern(const BSONObj& o);
/**
- * Takes a namespace range, specified by a min and max and qualified by an index pattern,
- * and removes all the documents in that range found by iterating
- * over the given index. Caller is responsible for insuring that min/max are
- * compatible with the given keyPattern (e.g min={a:100} is compatible with
- * keyPattern={a:1,b:1} since it can be extended to {a:100,b:minKey}, but
- * min={b:100} is not compatible).
- *
- * Caller must hold a write lock on 'ns'
- *
- * Returns -1 when no usable index exists
- *
- * Does oplog the individual document deletions.
- * // TODO: Refactor this mechanism, it is growing too large
- */
- static long long removeRange(OperationContext* opCtx,
- const KeyRange& range,
- BoundInclusion boundInclusion,
- const WriteConcernOptions& secondaryThrottle,
- RemoveSaver* callback = NULL,
- bool fromMigrate = false,
- bool onlyRemoveOrphanedDocs = false);
-
- /**
* Remove all documents from a collection.
* You do not need to set the database before calling.
* Does not oplog the operation.
diff --git a/src/mongo/db/range_deleter_db_env.cpp b/src/mongo/db/range_deleter_db_env.cpp
deleted file mode 100644
index 08419a39a11..00000000000
--- a/src/mongo/db/range_deleter_db_env.cpp
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Copyright (C) 2013 10gen Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/range_deleter_db_env.h"
-
-#include "mongo/db/auth/authorization_manager.h"
-#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/catalog/collection.h"
-#include "mongo/db/client.h"
-#include "mongo/db/clientcursor.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/dbhelpers.h"
-#include "mongo/db/repl/replication_coordinator_global.h"
-#include "mongo/db/s/operation_sharding_state.h"
-#include "mongo/db/write_concern_options.h"
-#include "mongo/util/log.h"
-
-namespace mongo {
-
-using std::string;
-
-/**
- * Outline of the delete process:
- * 1. Initialize the client for this thread if there is no client. This is for the worker
- * threads that are attached to any of the threads servicing client requests.
- * 2. Grant this thread authorization to perform deletes.
- * 3. Temporarily enable mode to bypass shard version checks. TODO: Replace this hack.
- * 4. Setup callback to save deletes to moveChunk directory (only if moveParanoia is true).
- * 5. Delete range.
- * 6. Wait until the majority of the secondaries catch up.
- */
-bool RangeDeleterDBEnv::deleteRange(OperationContext* opCtx,
- const RangeDeleteEntry& taskDetails,
- long long int* deletedDocs,
- std::string* errMsg) {
- const string ns(taskDetails.options.range.ns);
- const BSONObj inclusiveLower(taskDetails.options.range.minKey);
- const BSONObj exclusiveUpper(taskDetails.options.range.maxKey);
- const BSONObj keyPattern(taskDetails.options.range.keyPattern);
- const WriteConcernOptions writeConcern(taskDetails.options.writeConcern);
- const bool fromMigrate = taskDetails.options.fromMigrate;
- const bool onlyRemoveOrphans = taskDetails.options.onlyRemoveOrphanedDocs;
-
- Client::initThreadIfNotAlready("RangeDeleter");
-
- *deletedDocs = 0;
- OperationShardingState::IgnoreVersioningBlock forceVersion(opCtx, NamespaceString(ns));
-
- Helpers::RemoveSaver removeSaver("moveChunk", ns, taskDetails.options.removeSaverReason);
- Helpers::RemoveSaver* removeSaverPtr = NULL;
- if (serverGlobalParams.moveParanoia && !taskDetails.options.removeSaverReason.empty()) {
- removeSaverPtr = &removeSaver;
- }
-
- // log the opId so the user can use it to cancel the delete using killOp.
- unsigned int opId = opCtx->getOpID();
- log() << "Deleter starting delete for: " << ns << " from " << redact(inclusiveLower) << " -> "
- << redact(exclusiveUpper) << ", with opId: " << opId;
-
- try {
- *deletedDocs =
- Helpers::removeRange(opCtx,
- KeyRange(ns, inclusiveLower, exclusiveUpper, keyPattern),
- BoundInclusion::kIncludeStartKeyOnly,
- writeConcern,
- removeSaverPtr,
- fromMigrate,
- onlyRemoveOrphans);
-
- if (*deletedDocs < 0) {
- *errMsg = "collection or index dropped before data could be cleaned";
- warning() << *errMsg;
-
- return false;
- }
-
- log() << "rangeDeleter deleted " << *deletedDocs << " documents for " << ns << " from "
- << redact(inclusiveLower) << " -> " << redact(exclusiveUpper);
- } catch (const DBException& ex) {
- *errMsg = str::stream() << "Error encountered while deleting range: "
- << "ns" << ns << " from " << inclusiveLower << " -> "
- << exclusiveUpper << ", cause by:" << causedBy(ex);
-
- return false;
- }
-
- return true;
-}
-
-void RangeDeleterDBEnv::getCursorIds(OperationContext* opCtx,
- StringData ns,
- std::set<CursorId>* openCursors) {
- AutoGetCollection autoColl(opCtx, NamespaceString(ns), MODE_IS);
- if (!autoColl.getCollection())
- return;
-
- autoColl.getCollection()->getCursorManager()->getCursorIds(openCursors);
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 4dd1a7e33f4..400eec3def4 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -97,7 +97,6 @@ env.Library(
'$BUILD_DIR/mongo/db/catalog/collection',
'$BUILD_DIR/mongo/db/catalog/database',
'$BUILD_DIR/mongo/db/common',
- '$BUILD_DIR/mongo/db/range_deleter',
'$BUILD_DIR/mongo/db/concurrency/lock_manager',
'$BUILD_DIR/mongo/db/query/internal_plans',
'$BUILD_DIR/mongo/s/client/shard_local',
@@ -195,7 +194,6 @@ env.Library(
'$BUILD_DIR/mongo/db/db_raii',
'$BUILD_DIR/mongo/db/dbhelpers',
'$BUILD_DIR/mongo/db/index_d',
- '$BUILD_DIR/mongo/db/range_deleter_d',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_global',
'balancer',
'metadata',
@@ -230,22 +228,10 @@ env.CppUnitTest(
)
env.CppUnitTest(
- target='sharding_metadata_test',
- source=[
- 'collection_metadata_test.cpp',
- 'shard_metadata_util_test.cpp',
- ],
- LIBDEPS=[
- 'metadata',
- '$BUILD_DIR/mongo/s/shard_server_test_fixture',
- ],
-)
-
-env.CppUnitTest(
target='shard_test',
source=[
+ 'shard_metadata_util_test.cpp',
'active_migrations_registry_test.cpp',
- 'metadata_manager_test.cpp',
'migration_chunk_cloner_source_legacy_test.cpp',
'sharding_state_test.cpp',
],
@@ -255,14 +241,17 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_mock',
'$BUILD_DIR/mongo/s/sharding_mongod_test_fixture',
+ '$BUILD_DIR/mongo/s/shard_server_test_fixture',
],
)
env.CppUnitTest(
target='collection_sharding_state_test',
source=[
- 'collection_sharding_state_test.cpp',
+ 'collection_metadata_test.cpp',
'collection_range_deleter_test.cpp',
+ 'metadata_manager_test.cpp',
+ 'collection_sharding_state_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/remote_command_targeter_mock',
@@ -275,6 +264,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/util/clock_source_mock',
'$BUILD_DIR/mongo/util/net/message_port_mock',
'$BUILD_DIR/mongo/db/service_context_d_test_fixture',
+ '$BUILD_DIR/mongo/s/sharding_mongod_test_fixture',
],
)
diff --git a/src/mongo/db/s/cleanup_orphaned_cmd.cpp b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
index 117a50b57c7..a7a9519300b 100644
--- a/src/mongo/db/s/cleanup_orphaned_cmd.cpp
+++ b/src/mongo/db/s/cleanup_orphaned_cmd.cpp
@@ -53,6 +53,8 @@
#include "mongo/s/migration_secondary_throttle_options.h"
#include "mongo/util/log.h"
+#include <boost/optional.hpp>
+
namespace mongo {
using std::string;
@@ -78,65 +80,60 @@ CleanupResult cleanupOrphanedData(OperationContext* opCtx,
const WriteConcernOptions& secondaryThrottle,
BSONObj* stoppedAtKey,
string* errMsg) {
- BSONObj startingFromKey = startingFromKeyConst;
- ScopedCollectionMetadata metadata;
+ BSONObj startingFromKey = startingFromKeyConst;
+ boost::optional<ChunkRange> targetRange;
+ OID epoch;
{
- AutoGetCollection autoColl(opCtx, ns, MODE_IS);
- metadata = CollectionShardingState::get(opCtx, ns.toString())->getMetadata();
- }
-
- if (!metadata) {
- warning() << "skipping orphaned data cleanup for " << ns.toString()
+ AutoGetCollection autoColl(opCtx, ns, MODE_IX);
+ auto css = CollectionShardingState::get(opCtx, ns.toString());
+ auto metadata = css->getMetadata();
+ if (!metadata) {
+ log() << "skipping orphaned data cleanup for " << ns.toString()
<< ", collection is not sharded";
+ return CleanupResult_Done;
+ }
+ epoch = metadata->getCollVersion().epoch();
+
+ BSONObj keyPattern = metadata->getKeyPattern();
+ if (!startingFromKey.isEmpty()) {
+ if (!metadata->isValidKey(startingFromKey)) {
+ *errMsg = stream() << "could not cleanup orphaned data, start key "
+ << startingFromKey << " does not match shard key pattern "
+ << keyPattern;
+
+ log() << *errMsg;
+ return CleanupResult_Error;
+ }
+ } else {
+ startingFromKey = metadata->getMinKey();
+ }
- return CleanupResult_Done;
- }
-
- BSONObj keyPattern = metadata->getKeyPattern();
- if (!startingFromKey.isEmpty()) {
- if (!metadata->isValidKey(startingFromKey)) {
- *errMsg = stream() << "could not cleanup orphaned data, start key "
- << redact(startingFromKey) << " does not match shard key pattern "
- << keyPattern;
+ boost::optional<KeyRange> orphanRange = css->getNextOrphanRange(startingFromKey);
+ if (!orphanRange) {
+ LOG(1) << "cleanupOrphaned requested for " << ns.toString() << " starting from "
+ << redact(startingFromKey) << ", no orphan ranges remain";
- warning() << *errMsg;
- return CleanupResult_Error;
+ return CleanupResult_Done;
}
- } else {
- startingFromKey = metadata->getMinKey();
- }
+ orphanRange->ns = ns.ns();
+ *stoppedAtKey = orphanRange->maxKey;
- KeyRange orphanRange;
- if (!metadata->getNextOrphanRange(startingFromKey, &orphanRange)) {
- LOG(1) << "cleanupOrphaned requested for " << ns.toString() << " starting from "
- << redact(startingFromKey) << ", no orphan ranges remain";
+ LOG(0) << "cleanupOrphaned requested for " << ns.toString() << " starting from "
+ << redact(startingFromKey) << ", removing next orphan range"
+ << " [" << redact(orphanRange->minKey) << "," << redact(orphanRange->maxKey) << ")";
- return CleanupResult_Done;
+ targetRange.emplace(
+ ChunkRange(orphanRange->minKey.getOwned(), orphanRange->maxKey.getOwned()));
+ uassertStatusOK(css->cleanUpRange(*targetRange));
}
- orphanRange.ns = ns.ns();
- *stoppedAtKey = orphanRange.maxKey;
-
- LOG(0) << "cleanupOrphaned requested for " << ns.toString() << " starting from "
- << redact(startingFromKey) << ", removing next orphan range"
- << " [" << redact(orphanRange.minKey) << "," << redact(orphanRange.maxKey) << ")";
-
- // Metadata snapshot may be stale now, but deleter checks metadata again in write lock
- // before delete.
- RangeDeleterOptions deleterOptions(orphanRange);
- deleterOptions.writeConcern = secondaryThrottle;
- deleterOptions.onlyRemoveOrphanedDocs = true;
- deleterOptions.fromMigrate = true;
- // Must wait for cursors since there can be existing cursors with an older
- // CollectionMetadata.
- deleterOptions.waitForOpenCursors = true;
- deleterOptions.removeSaverReason = "cleanup-cmd";
-
- if (!getDeleter()->deleteNow(opCtx, deleterOptions, errMsg)) {
- warning() << redact(*errMsg);
- return CleanupResult_Error;
+ if (targetRange) {
+ auto result = CollectionShardingState::waitForClean(opCtx, ns, epoch, *targetRange);
+ if (!result.isOK()) {
+ warning() << redact(result.reason());
+ return CleanupResult_Error;
+ }
}
-
return CleanupResult_Continue;
}
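
Note: the rewritten cleanupOrphanedData above no longer drives the delete synchronously through the old RangeDeleter service. It asks CollectionShardingState to schedule the orphan range (cleanUpRange) and then blocks on waitForClean outside the collection lock. Below is a minimal stand-alone sketch of that schedule-then-wait shape, with a std::future standing in for the deleter's completion signal; Range, RangeDeleter, and the other names here are illustrative stand-ins, not MongoDB's API.

// Minimal model of "schedule a range for deletion, then wait for it to drain".
// Illustrative only: Range and RangeDeleter are stand-ins, not MongoDB types.
#include <future>
#include <iostream>
#include <thread>
#include <utility>
#include <vector>

struct Range {
    int min, max;  // stand-in for the BSON chunk bounds
};

class RangeDeleter {
public:
    // Schedule a range and hand back a future that is satisfied once it is deleted.
    std::future<void> cleanUpRange(Range r) {
        std::promise<void> done;
        auto fut = done.get_future();
        // A real deleter batches work on an executor; one thread per range suffices here.
        workers_.emplace_back([r, p = std::move(done)]() mutable {
            (void)r;        // a real implementation would scan and delete [r.min, r.max)
            p.set_value();  // signal waiters, as waitForClean would observe
        });
        return fut;
    }
    ~RangeDeleter() {
        for (auto& t : workers_)
            t.join();
    }

private:
    std::vector<std::thread> workers_;
};

int main() {
    RangeDeleter deleter;
    auto notification = deleter.cleanUpRange(Range{0, 100});  // schedule, do not delete inline
    notification.wait();  // analogous to waiting on CollectionShardingState::waitForClean
    std::cout << "range drained\n";
}
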
diff --git a/src/mongo/db/s/collection_metadata.cpp b/src/mongo/db/s/collection_metadata.cpp
index b18e325adb0..f9e1a32ac51 100644
--- a/src/mongo/db/s/collection_metadata.cpp
+++ b/src/mongo/db/s/collection_metadata.cpp
@@ -49,7 +49,6 @@ CollectionMetadata::CollectionMetadata(const BSONObj& keyPattern,
_collVersion(collectionVersion),
_shardVersion(shardVersion),
_chunksMap(std::move(shardChunksMap)),
- _pendingMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()),
_rangesMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()) {
invariant(_shardKeyPattern.isValid());
@@ -61,16 +60,24 @@ CollectionMetadata::CollectionMetadata(const BSONObj& keyPattern,
invariant(!_shardVersion.isSet());
return;
}
-
invariant(_shardVersion.isSet());
- // Load the chunk information, coallesceing their ranges. The version for this shard would be
+ _buildRangesMap();
+}
+
+CollectionMetadata::~CollectionMetadata() = default;
+
+void CollectionMetadata::_buildRangesMap() {
+ _rangesMap.clear();
+
+ // Load the chunk information, coalescing their ranges. The version for this shard would be
// the highest version for any of the chunks.
+
BSONObj min, max;
for (const auto& entry : _chunksMap) {
- BSONObj currMin = entry.first;
- BSONObj currMax = entry.second.getMaxKey();
+ BSONObj const& currMin = entry.first;
+ BSONObj const& currMax = entry.second.getMaxKey();
// Coalesce the chunk's bounds in ranges if they are adjacent chunks
if (min.isEmpty()) {
@@ -96,54 +103,9 @@ CollectionMetadata::CollectionMetadata(const BSONObj& keyPattern,
_rangesMap.emplace(min, CachedChunkInfo(max, ChunkVersion::IGNORED()));
}
-CollectionMetadata::~CollectionMetadata() = default;
-
-std::unique_ptr<CollectionMetadata> CollectionMetadata::cloneMinusPending(
- const ChunkType& chunk) const {
- invariant(rangeMapContains(_pendingMap, chunk.getMin(), chunk.getMax()));
-
- auto metadata(stdx::make_unique<CollectionMetadata>(
- _shardKeyPattern.toBSON(), getCollVersion(), getShardVersion(), getChunks()));
- metadata->_pendingMap = _pendingMap;
-
- metadata->_pendingMap.erase(chunk.getMin());
-
- return metadata;
-}
-
-std::unique_ptr<CollectionMetadata> CollectionMetadata::clonePlusPending(
- const ChunkType& chunk) const {
- invariant(!rangeMapOverlaps(_chunksMap, chunk.getMin(), chunk.getMax()));
-
- auto metadata(stdx::make_unique<CollectionMetadata>(
- _shardKeyPattern.toBSON(), getCollVersion(), getShardVersion(), getChunks()));
- metadata->_pendingMap = _pendingMap;
-
- // If there are any pending chunks on the interval to be added this is ok, since pending chunks
- // aren't officially tracked yet and something may have changed on servers we do not see yet.
- //
- // We remove any chunks we overlap because the remote request starting a chunk migration is what
- // is authoritative.
-
- if (rangeMapOverlaps(_pendingMap, chunk.getMin(), chunk.getMax())) {
- RangeVector pendingOverlap;
- getRangeMapOverlap(_pendingMap, chunk.getMin(), chunk.getMax(), &pendingOverlap);
-
- warning() << "new pending chunk " << redact(rangeToString(chunk.getMin(), chunk.getMax()))
- << " overlaps existing pending chunks " << redact(overlapToString(pendingOverlap))
- << ", a migration may not have completed";
-
- for (RangeVector::iterator it = pendingOverlap.begin(); it != pendingOverlap.end(); ++it) {
- metadata->_pendingMap.erase(it->first);
- }
- }
-
- // The pending map entry cannot contain a specific chunk version because we don't know what
- // version would be generated for it at commit time. That's why we insert an IGNORED value.
- metadata->_pendingMap.emplace(chunk.getMin(),
- CachedChunkInfo(chunk.getMax(), ChunkVersion::IGNORED()));
-
- return metadata;
+std::unique_ptr<CollectionMetadata> CollectionMetadata::clone() const {
+ return stdx::make_unique<CollectionMetadata>(
+ _shardKeyPattern.toBSON(), getCollVersion(), getShardVersion(), getChunks());
}
bool CollectionMetadata::keyBelongsToMe(const BSONObj& key) const {
@@ -158,18 +120,6 @@ bool CollectionMetadata::keyBelongsToMe(const BSONObj& key) const {
return rangeContains(it->first, it->second.getMaxKey(), key);
}
-bool CollectionMetadata::keyIsPending(const BSONObj& key) const {
- if (_pendingMap.empty()) {
- return false;
- }
-
- auto it = _pendingMap.upper_bound(key);
- if (it != _pendingMap.begin())
- it--;
-
- return rangeContains(it->first, it->second.getMaxKey(), key);
-}
-
bool CollectionMetadata::getNextChunk(const BSONObj& lookupKey, ChunkType* chunk) const {
RangeMap::const_iterator upperChunkIt = _chunksMap.upper_bound(lookupKey);
RangeMap::const_iterator lowerChunkIt = upperChunkIt;
@@ -238,6 +188,10 @@ Status CollectionMetadata::checkChunkIsValid(const ChunkType& chunk) {
return Status::OK();
}
+bool CollectionMetadata::rangeOverlapsChunk(ChunkRange const& range) {
+ return rangeMapOverlaps(_rangesMap, range.getMin(), range.getMax());
+}
+
void CollectionMetadata::toBSONBasic(BSONObjBuilder& bb) const {
_collVersion.addToBSON(bb, "collVersion");
_shardVersion.addToBSON(bb, "shardVersion");
@@ -256,100 +210,71 @@ void CollectionMetadata::toBSONChunks(BSONArrayBuilder& bb) const {
}
}
-void CollectionMetadata::toBSONPending(BSONArrayBuilder& bb) const {
- if (_pendingMap.empty())
- return;
-
- for (RangeMap::const_iterator it = _pendingMap.begin(); it != _pendingMap.end(); ++it) {
- BSONArrayBuilder pendingBB(bb.subarrayStart());
- pendingBB.append(it->first);
- pendingBB.append(it->second.getMaxKey());
- pendingBB.done();
- }
-}
-
std::string CollectionMetadata::toStringBasic() const {
return str::stream() << "collection version: " << _collVersion.toString()
<< ", shard version: " << _shardVersion.toString();
}
-bool CollectionMetadata::getNextOrphanRange(const BSONObj& origLookupKey, KeyRange* range) const {
+boost::optional<KeyRange> CollectionMetadata::getNextOrphanRange(
+ RangeMap const& receivingChunks, BSONObj const& origLookupKey) const {
+
BSONObj lookupKey = origLookupKey;
BSONObj maxKey = getMaxKey(); // so we don't keep rebuilding
while (lookupKey.woCompare(maxKey) < 0) {
- RangeMap::const_iterator lowerChunkIt = _chunksMap.end();
- RangeMap::const_iterator upperChunkIt = _chunksMap.end();
-
- if (!_chunksMap.empty()) {
- upperChunkIt = _chunksMap.upper_bound(lookupKey);
- lowerChunkIt = upperChunkIt;
- if (upperChunkIt != _chunksMap.begin())
- --lowerChunkIt;
- else
- lowerChunkIt = _chunksMap.end();
- }
- // If we overlap, continue after the overlap
- // TODO: Could optimize slightly by finding next non-contiguous chunk
- if (lowerChunkIt != _chunksMap.end() &&
- lowerChunkIt->second.getMaxKey().woCompare(lookupKey) > 0) {
- lookupKey = lowerChunkIt->second.getMaxKey();
+ using Its = std::pair<RangeMap::const_iterator, RangeMap::const_iterator>;
+
+ auto patchLookupKey = [&](RangeMap const& map) -> boost::optional<Its> {
+ auto lowerIt = map.end(), upperIt = map.end();
+
+ if (!map.empty()) {
+ upperIt = map.upper_bound(lookupKey);
+ lowerIt = upperIt;
+ if (upperIt != map.begin())
+ --lowerIt;
+ else
+ lowerIt = map.end();
+ }
+
+ // If we overlap, continue after the overlap
+ // TODO: Could optimize slightly by finding next non-contiguous chunk
+ if (lowerIt != map.end() && lowerIt->second.getMaxKey().woCompare(lookupKey) > 0) {
+ lookupKey = lowerIt->second.getMaxKey(); // note side effect
+ return boost::none;
+ } else {
+ return Its(lowerIt, upperIt);
+ }
+ };
+
+ boost::optional<Its> chunksIts, pendingIts;
+ if (!(chunksIts = patchLookupKey(_chunksMap)) ||
+ !(pendingIts = patchLookupKey(receivingChunks))) {
continue;
}
- RangeMap::const_iterator lowerPendingIt = _pendingMap.end();
- RangeMap::const_iterator upperPendingIt = _pendingMap.end();
-
- if (!_pendingMap.empty()) {
- upperPendingIt = _pendingMap.upper_bound(lookupKey);
- lowerPendingIt = upperPendingIt;
- if (upperPendingIt != _pendingMap.begin())
- --lowerPendingIt;
- else
- lowerPendingIt = _pendingMap.end();
- }
-
- // If we overlap, continue after the overlap
- // TODO: Could optimize slightly by finding next non-contiguous chunk
- if (lowerPendingIt != _pendingMap.end() &&
- lowerPendingIt->second.getMaxKey().woCompare(lookupKey) > 0) {
- lookupKey = lowerPendingIt->second.getMaxKey();
- continue;
- }
-
- //
- // We know that the lookup key is not covered by a chunk or pending range, and where the
- // previous chunk and pending chunks are. Now we fill in the bounds as the closest
- // bounds of the surrounding ranges in both maps.
- //
-
- range->keyPattern = _shardKeyPattern.toBSON();
- range->minKey = getMinKey();
- range->maxKey = maxKey;
-
- if (lowerChunkIt != _chunksMap.end() &&
- lowerChunkIt->second.getMaxKey().woCompare(range->minKey) > 0) {
- range->minKey = lowerChunkIt->second.getMaxKey();
- }
-
- if (upperChunkIt != _chunksMap.end() && upperChunkIt->first.woCompare(range->maxKey) < 0) {
- range->maxKey = upperChunkIt->first;
- }
-
- if (lowerPendingIt != _pendingMap.end() &&
- lowerPendingIt->second.getMaxKey().woCompare(range->minKey) > 0) {
- range->minKey = lowerPendingIt->second.getMaxKey();
- }
-
- if (upperPendingIt != _pendingMap.end() &&
- upperPendingIt->first.woCompare(range->maxKey) < 0) {
- range->maxKey = upperPendingIt->first;
- }
-
- return true;
+ boost::optional<KeyRange> range =
+ KeyRange("", getMinKey(), maxKey, _shardKeyPattern.toBSON());
+
+ auto patchArgRange = [&range](RangeMap const& map, Its its) {
+ // We know that the lookup key is not covered by a chunk or pending range, and where the
+ // previous chunk and pending chunks are. Now we fill in the bounds as the closest
+ // bounds of the surrounding ranges in both maps.
+ auto lowerIt = its.first, upperIt = its.second;
+
+ if (lowerIt != map.end() && lowerIt->second.getMaxKey().woCompare(range->minKey) > 0) {
+ range->minKey = lowerIt->second.getMaxKey();
+ }
+ if (upperIt != map.end() && upperIt->first.woCompare(range->maxKey) < 0) {
+ range->maxKey = upperIt->first;
+ }
+ };
+
+ patchArgRange(_chunksMap, *chunksIts);
+ patchArgRange(receivingChunks, *pendingIts);
+ return range;
}
- return false;
+ return boost::none;
}
BSONObj CollectionMetadata::getMinKey() const {
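
Note: the new _buildRangesMap above coalesces contiguous entries of _chunksMap into the coarser _rangesMap, so ownership checks touch one entry per contiguous run of chunks. A minimal sketch of that coalescing, with integer bounds standing in for BSON shard-key bounds (illustrative only, not the MongoDB implementation):

// Coalesce adjacent [min, max) chunks into maximal contiguous ranges.
// Integer keys stand in for BSON shard-key bounds; illustrative only.
#include <cassert>
#include <map>

using ChunkMap = std::map<int, int>;  // min -> max, sorted and non-overlapping

ChunkMap buildRangesMap(const ChunkMap& chunks) {
    ChunkMap ranges;
    int curMin = 0, curMax = 0;
    bool open = false;
    for (const auto& entry : chunks) {
        const int min = entry.first;
        const int max = entry.second;
        if (open && min == curMax) {
            curMax = max;  // chunk starts where the previous one ended: extend the run
        } else {
            if (open)
                ranges.emplace(curMin, curMax);  // close the previous run
            curMin = min;
            curMax = max;
            open = true;
        }
    }
    if (open)
        ranges.emplace(curMin, curMax);
    return ranges;
}

int main() {
    ChunkMap chunks{{0, 10}, {10, 20}, {30, 40}};
    auto ranges = buildRangesMap(chunks);
    assert(ranges.size() == 2);  // coalesced into [0, 20) and [30, 40)
    assert(ranges.at(0) == 20 && ranges.at(30) == 40);
}
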
diff --git a/src/mongo/db/s/collection_metadata.h b/src/mongo/db/s/collection_metadata.h
index ea7392842d6..de318225ec6 100644
--- a/src/mongo/db/s/collection_metadata.h
+++ b/src/mongo/db/s/collection_metadata.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/db/range_arithmetic.h"
+#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/shard_key_pattern.h"
@@ -51,10 +52,7 @@ class ChunkType;
class CollectionMetadata {
public:
/**
- * The main way to construct CollectionMetadata is through MetadataLoader or the clone*()
- * methods.
- *
- * The constructors should not be used directly outside of tests.
+ * The main way to construct CollectionMetadata is through MetadataLoader or clone() methods.
*/
CollectionMetadata(const BSONObj& keyPattern,
ChunkVersion collectionVersion,
@@ -64,20 +62,9 @@ public:
~CollectionMetadata();
/**
- * Returns a new metadata's instance based on 'this's state by removing a 'pending' chunk.
- *
- * The shard and collection version of the new metadata are unaffected. The caller owns the
- * new metadata.
- */
- std::unique_ptr<CollectionMetadata> cloneMinusPending(const ChunkType& chunk) const;
-
- /**
- * Returns a new metadata's instance based on 'this's state by adding a 'pending' chunk.
- *
- * The shard and collection version of the new metadata are unaffected. The caller owns the
- * new metadata.
+ * Returns a new metadata's instance based on 'this's state;
*/
- std::unique_ptr<CollectionMetadata> clonePlusPending(const ChunkType& chunk) const;
+ std::unique_ptr<CollectionMetadata> clone() const;
/**
* Returns true if the document key 'key' is a valid instance of a shard key for this
@@ -93,12 +80,6 @@ public:
bool keyBelongsToMe(const BSONObj& key) const;
/**
- * Returns true if the document key 'key' is or has been migrated to this shard, and may
- * belong to us after a subsequent config reload. Key must be the full shard key.
- */
- bool keyIsPending(const BSONObj& key) const;
-
- /**
* Given a key 'lookupKey' in the shard key range, get the next chunk which overlaps or is
* greater than this key. Returns true if a chunk exists, false otherwise.
*
@@ -117,6 +98,11 @@ public:
Status checkChunkIsValid(const ChunkType& chunk);
/**
+ * Returns true if the argument range overlaps any chunk.
+ */
+ bool rangeOverlapsChunk(ChunkRange const& range);
+
+ /**
* Given a key in the shard key range, get the next range which overlaps or is greater than
* this key.
*
@@ -124,15 +110,17 @@ public:
*
* KeyRange range;
* BSONObj lookupKey = metadata->getMinKey();
- * while( metadata->getNextOrphanRange( lookupKey, &orphanRange ) ) {
- * // Do stuff with range
- * lookupKey = orphanRange.maxKey;
+ * boost::optional<KeyRange> range;
+ * while((range = metadata->getNextOrphanRange(receiveMap, lookupKey))) {
+ * lookupKey = range->maxKey;
* }
*
* @param lookupKey passing a key that does not belong to this metadata is undefined.
+ * @param receiveMap is an extra set of chunks not considered orphaned.
* @param orphanRange the output range. Note that the NS is not set.
*/
- bool getNextOrphanRange(const BSONObj& lookupKey, KeyRange* orphanRange) const;
+ boost::optional<KeyRange> getNextOrphanRange(RangeMap const& receiveMap,
+ BSONObj const& lookupKey) const;
ChunkVersion getCollVersion() const {
return _collVersion;
@@ -173,16 +161,16 @@ public:
void toBSONChunks(BSONArrayBuilder& bb) const;
/**
- * BSON output of the pending metadata into a BSONArray
- */
- void toBSONPending(BSONArrayBuilder& bb) const;
-
- /**
* String output of the collection and shard versions.
*/
std::string toStringBasic() const;
private:
+ /**
+ * Builds _rangesMap from the contents of _chunksMap.
+ */
+ void _buildRangesMap();
+
// Shard key pattern for the collection
ShardKeyPattern _shardKeyPattern;
@@ -196,10 +184,7 @@ private:
// Map of chunks tracked by this shard
RangeMap _chunksMap;
- // Map of ranges of chunks that are migrating but have not been confirmed added yet
- RangeMap _pendingMap;
-
- // A second map from a min key into a range or contiguous chunks. The map is redundant
+ // A second map from a min key into a range of contiguous chunks. The map is redundant
// w.r.t. _chunkMap but we expect high chunk contiguity, especially in small
// installations.
RangeMap _rangesMap;
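
Note: getNextOrphanRange now takes the receiving-chunk map as an explicit argument (the _pendingMap member is gone) and returns boost::optional<KeyRange> instead of filling an out-parameter. The following stand-alone model shows the gap search it performs over the two sorted maps, plus the documented caller loop that advances the lookup key to each returned range's maxKey. Integers stand in for shard-key bounds and all names are illustrative, not MongoDB's implementation.

// Model of finding the next "orphan" gap not covered by owned or receiving chunks.
// std::map<int,int> maps each range's min to its max; keys are illustrative ints.
#include <algorithm>
#include <iostream>
#include <iterator>
#include <map>
#include <optional>

using RangeMap = std::map<int, int>;  // min -> max, sorted, non-overlapping
struct KeyRange { int min, max; };

const int kMinKey = -1000, kMaxKey = 1000;  // stand-ins for MinKey/MaxKey

// If lookup falls inside a range of 'map', advance lookup past it and report "covered".
static bool advancePastOverlap(const RangeMap& map, int& lookup) {
    auto it = map.upper_bound(lookup);
    if (it != map.begin() && std::prev(it)->second > lookup) {
        lookup = std::prev(it)->second;
        return true;
    }
    return false;
}

// Clamp an orphan range against the neighbors of 'lookup' in 'map'.
static void clampToNeighbors(const RangeMap& map, int lookup, KeyRange& range) {
    auto upper = map.upper_bound(lookup);
    if (upper != map.begin())
        range.min = std::max(range.min, std::prev(upper)->second);
    if (upper != map.end())
        range.max = std::min(range.max, upper->first);
}

std::optional<KeyRange> getNextOrphanRange(const RangeMap& owned,
                                           const RangeMap& receiving,
                                           int lookup) {
    while (lookup < kMaxKey) {
        if (advancePastOverlap(owned, lookup) || advancePastOverlap(receiving, lookup))
            continue;  // lookup was inside a chunk; retry just past it
        KeyRange range{kMinKey, kMaxKey};
        clampToNeighbors(owned, lookup, range);
        clampToNeighbors(receiving, lookup, range);
        return range;
    }
    return std::nullopt;
}

int main() {
    RangeMap owned{{0, 10}, {20, 30}};
    RangeMap receiving{{40, 50}};
    int lookup = kMinKey;
    while (auto r = getNextOrphanRange(owned, receiving, lookup)) {
        std::cout << "[" << r->min << ", " << r->max << ")\n";
        lookup = r->max;  // the caller loop documented in collection_metadata.h
    }
    // Prints [-1000, 0), [10, 20), [30, 40), [50, 1000)
}
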
diff --git a/src/mongo/db/s/collection_metadata_test.cpp b/src/mongo/db/s/collection_metadata_test.cpp
index c765c67ee3a..999e5282490 100644
--- a/src/mongo/db/s/collection_metadata_test.cpp
+++ b/src/mongo/db/s/collection_metadata_test.cpp
@@ -29,6 +29,7 @@
#include "mongo/platform/basic.h"
#include "mongo/base/status.h"
+#include "mongo/db/range_arithmetic.h"
#include "mongo/db/s/collection_metadata.h"
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/chunk_version.h"
@@ -53,6 +54,11 @@ protected:
}
};
+struct stRangeMap : public RangeMap {
+ stRangeMap()
+ : RangeMap(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()) {}
+};
+
TEST_F(NoChunkFixture, BasicBelongsToMe) {
ASSERT_FALSE(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << MINKEY)));
ASSERT_FALSE(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << 10)));
@@ -86,147 +92,42 @@ TEST_F(NoChunkFixture, getDifferentFromEmpty) {
&differentChunk));
}
-TEST_F(NoChunkFixture, NoPendingChunks) {
- ASSERT(!makeCollectionMetadata()->keyIsPending(BSON("a" << 15)));
- ASSERT(!makeCollectionMetadata()->keyIsPending(BSON("a" << 25)));
-}
-
-TEST_F(NoChunkFixture, FirstPendingChunk) {
- ChunkType chunk;
- chunk.setMin(BSON("a" << 10));
- chunk.setMax(BSON("a" << 20));
-
- auto cloned(makeCollectionMetadata()->clonePlusPending(chunk));
- ASSERT(cloned->keyIsPending(BSON("a" << 15)));
- ASSERT(!cloned->keyIsPending(BSON("a" << 25)));
- ASSERT(cloned->keyIsPending(BSON("a" << 10)));
- ASSERT(!cloned->keyIsPending(BSON("a" << 20)));
-}
-
-TEST_F(NoChunkFixture, EmptyMultiPendingChunk) {
- ChunkType chunk;
- chunk.setMin(BSON("a" << 10));
- chunk.setMax(BSON("a" << 20));
-
- auto cloned(makeCollectionMetadata()->clonePlusPending(chunk));
-
- chunk.setMin(BSON("a" << 40));
- chunk.setMax(BSON("a" << 50));
-
- cloned = cloned->clonePlusPending(chunk);
- ASSERT(cloned->keyIsPending(BSON("a" << 15)));
- ASSERT(!cloned->keyIsPending(BSON("a" << 25)));
- ASSERT(cloned->keyIsPending(BSON("a" << 45)));
- ASSERT(!cloned->keyIsPending(BSON("a" << 55)));
-}
-
-TEST_F(NoChunkFixture, MinusPendingChunk) {
- ChunkType chunk;
- chunk.setMin(BSON("a" << 10));
- chunk.setMax(BSON("a" << 20));
-
- auto cloned(makeCollectionMetadata()->clonePlusPending(chunk));
-
- cloned = cloned->cloneMinusPending(chunk);
- ASSERT(!cloned->keyIsPending(BSON("a" << 15)));
- ASSERT(!cloned->keyIsPending(BSON("a" << 25)));
-}
-
-TEST_F(NoChunkFixture, OverlappingPendingChunk) {
- ChunkType chunk;
- chunk.setMin(BSON("a" << 10));
- chunk.setMax(BSON("a" << 30));
-
- auto cloned(makeCollectionMetadata()->clonePlusPending(chunk));
-
- chunk.setMin(BSON("a" << 20));
- chunk.setMax(BSON("a" << 40));
-
- cloned = cloned->clonePlusPending(chunk);
- ASSERT(!cloned->keyIsPending(BSON("a" << 15)));
- ASSERT(cloned->keyIsPending(BSON("a" << 25)));
- ASSERT(cloned->keyIsPending(BSON("a" << 35)));
- ASSERT(!cloned->keyIsPending(BSON("a" << 45)));
-}
-
-TEST_F(NoChunkFixture, OverlappingPendingChunks) {
- ChunkType chunk;
- chunk.setMin(BSON("a" << 10));
- chunk.setMax(BSON("a" << 30));
-
- auto cloned(makeCollectionMetadata()->clonePlusPending(chunk));
-
- chunk.setMin(BSON("a" << 30));
- chunk.setMax(BSON("a" << 50));
-
- cloned = cloned->clonePlusPending(chunk);
-
- chunk.setMin(BSON("a" << 20));
- chunk.setMax(BSON("a" << 40));
-
- cloned = cloned->clonePlusPending(chunk);
-
- ASSERT(!cloned->keyIsPending(BSON("a" << 15)));
- ASSERT(cloned->keyIsPending(BSON("a" << 25)));
- ASSERT(cloned->keyIsPending(BSON("a" << 35)));
- ASSERT(!cloned->keyIsPending(BSON("a" << 45)));
-}
-
TEST_F(NoChunkFixture, OrphanedDataRangeBegin) {
auto metadata(makeCollectionMetadata());
- KeyRange keyRange;
+ stRangeMap pending;
BSONObj lookupKey = metadata->getMinKey();
- ASSERT(metadata->getNextOrphanRange(lookupKey, &keyRange));
+ auto keyRange = metadata->getNextOrphanRange(pending, lookupKey);
+ ASSERT(keyRange);
- ASSERT(keyRange.minKey.woCompare(metadata->getMinKey()) == 0);
- ASSERT(keyRange.maxKey.woCompare(metadata->getMaxKey()) == 0);
+ ASSERT(keyRange->minKey.woCompare(metadata->getMinKey()) == 0);
+ ASSERT(keyRange->maxKey.woCompare(metadata->getMaxKey()) == 0);
// Make sure we don't have any more ranges
- ASSERT(!metadata->getNextOrphanRange(keyRange.maxKey, &keyRange));
+ ASSERT(!metadata->getNextOrphanRange(pending, keyRange->maxKey));
}
TEST_F(NoChunkFixture, OrphanedDataRangeMiddle) {
auto metadata(makeCollectionMetadata());
- KeyRange keyRange;
+ stRangeMap pending;
BSONObj lookupKey = BSON("a" << 20);
- ASSERT(metadata->getNextOrphanRange(lookupKey, &keyRange));
+ auto keyRange = metadata->getNextOrphanRange(pending, lookupKey);
+ ASSERT(keyRange);
- ASSERT(keyRange.minKey.woCompare(metadata->getMinKey()) == 0);
- ASSERT(keyRange.maxKey.woCompare(metadata->getMaxKey()) == 0);
- ASSERT(keyRange.keyPattern.woCompare(metadata->getKeyPattern()) == 0);
+ ASSERT(keyRange->minKey.woCompare(metadata->getMinKey()) == 0);
+ ASSERT(keyRange->maxKey.woCompare(metadata->getMaxKey()) == 0);
+ ASSERT(keyRange->keyPattern.woCompare(metadata->getKeyPattern()) == 0);
// Make sure we don't have any more ranges
- ASSERT(!metadata->getNextOrphanRange(keyRange.maxKey, &keyRange));
+ ASSERT(!metadata->getNextOrphanRange(pending, keyRange->maxKey));
}
TEST_F(NoChunkFixture, OrphanedDataRangeEnd) {
auto metadata(makeCollectionMetadata());
- KeyRange keyRange;
- ASSERT(!metadata->getNextOrphanRange(metadata->getMaxKey(), &keyRange));
-}
-
-TEST_F(NoChunkFixture, PendingOrphanedDataRanges) {
- ChunkType chunk;
- chunk.setMin(BSON("a" << 10));
- chunk.setMax(BSON("a" << 20));
-
- auto cloned(makeCollectionMetadata()->clonePlusPending(chunk));
-
- KeyRange keyRange;
- ASSERT(cloned->getNextOrphanRange(cloned->getMinKey(), &keyRange));
- ASSERT(keyRange.minKey.woCompare(cloned->getMinKey()) == 0);
- ASSERT(keyRange.maxKey.woCompare(BSON("a" << 10)) == 0);
- ASSERT(keyRange.keyPattern.woCompare(cloned->getKeyPattern()) == 0);
-
- ASSERT(cloned->getNextOrphanRange(keyRange.maxKey, &keyRange));
- ASSERT(keyRange.minKey.woCompare(BSON("a" << 20)) == 0);
- ASSERT(keyRange.maxKey.woCompare(cloned->getMaxKey()) == 0);
- ASSERT(keyRange.keyPattern.woCompare(cloned->getKeyPattern()) == 0);
-
- ASSERT(!cloned->getNextOrphanRange(keyRange.maxKey, &keyRange));
+ stRangeMap pending;
+ ASSERT(!metadata->getNextOrphanRange(pending, metadata->getMaxKey()));
}
/**
@@ -265,7 +166,7 @@ TEST_F(SingleChunkFixture, DoesntBelongsToMe) {
ASSERT_FALSE(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << MAXKEY)));
}
-TEST_F(SingleChunkFixture, CompoudKeyBelongsToMe) {
+TEST_F(SingleChunkFixture, CompoundKeyBelongsToMe) {
ASSERT(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << 15 << "a" << 14)));
}
@@ -288,33 +189,23 @@ TEST_F(SingleChunkFixture, getDifferentFromOneIsFalse) {
ASSERT(!makeCollectionMetadata()->getDifferentChunk(BSON("a" << 10), &differentChunk));
}
-TEST_F(SingleChunkFixture, PlusPendingChunk) {
- ChunkType chunk;
- chunk.setMin(BSON("a" << 20));
- chunk.setMax(BSON("a" << 30));
+TEST_F(SingleChunkFixture, ChunkOrphanedDataRanges) {
+ stRangeMap pending;
+ auto keyRange = makeCollectionMetadata()->getNextOrphanRange(
+ pending, makeCollectionMetadata()->getMinKey());
+ ASSERT(keyRange);
- auto cloned(makeCollectionMetadata()->clonePlusPending(chunk));
+ ASSERT(keyRange->minKey.woCompare(makeCollectionMetadata()->getMinKey()) == 0);
+ ASSERT(keyRange->maxKey.woCompare(BSON("a" << 10)) == 0);
+ ASSERT(keyRange->keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0);
- ASSERT(cloned->keyBelongsToMe(BSON("a" << 15)));
- ASSERT(!cloned->keyBelongsToMe(BSON("a" << 25)));
- ASSERT(!cloned->keyIsPending(BSON("a" << 15)));
- ASSERT(cloned->keyIsPending(BSON("a" << 25)));
-}
+ keyRange = makeCollectionMetadata()->getNextOrphanRange(pending, keyRange->maxKey);
+ ASSERT(keyRange);
+ ASSERT(keyRange->minKey.woCompare(BSON("a" << 20)) == 0);
+ ASSERT(keyRange->maxKey.woCompare(makeCollectionMetadata()->getMaxKey()) == 0);
+ ASSERT(keyRange->keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0);
-TEST_F(SingleChunkFixture, ChunkOrphanedDataRanges) {
- KeyRange keyRange;
- ASSERT(makeCollectionMetadata()->getNextOrphanRange(makeCollectionMetadata()->getMinKey(),
- &keyRange));
- ASSERT(keyRange.minKey.woCompare(makeCollectionMetadata()->getMinKey()) == 0);
- ASSERT(keyRange.maxKey.woCompare(BSON("a" << 10)) == 0);
- ASSERT(keyRange.keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0);
-
- ASSERT(makeCollectionMetadata()->getNextOrphanRange(keyRange.maxKey, &keyRange));
- ASSERT(keyRange.minKey.woCompare(BSON("a" << 20)) == 0);
- ASSERT(keyRange.maxKey.woCompare(makeCollectionMetadata()->getMaxKey()) == 0);
- ASSERT(keyRange.keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0);
-
- ASSERT(!makeCollectionMetadata()->getNextOrphanRange(keyRange.maxKey, &keyRange));
+ ASSERT(!makeCollectionMetadata()->getNextOrphanRange(pending, keyRange->maxKey));
}
/**
@@ -342,7 +233,7 @@ protected:
// Note: no tests for single key belongsToMe because they are not allowed
// if shard key is compound.
-TEST_F(SingleChunkMinMaxCompoundKeyFixture, CompoudKeyBelongsToMe) {
+TEST_F(SingleChunkMinMaxCompoundKeyFixture, CompoundKeyBelongsToMe) {
ASSERT(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << MINKEY << "b" << MINKEY)));
ASSERT_FALSE(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << MAXKEY << "b" << MAXKEY)));
ASSERT(makeCollectionMetadata()->keyBelongsToMe(BSON("a" << MINKEY << "b" << 10)));
@@ -375,45 +266,27 @@ protected:
};
TEST_F(TwoChunksWithGapCompoundKeyFixture, ChunkGapOrphanedDataRanges) {
- KeyRange keyRange;
- ASSERT(makeCollectionMetadata()->getNextOrphanRange(makeCollectionMetadata()->getMinKey(),
- &keyRange));
- ASSERT(keyRange.minKey.woCompare(makeCollectionMetadata()->getMinKey()) == 0);
- ASSERT(keyRange.maxKey.woCompare(BSON("a" << 10 << "b" << 0)) == 0);
- ASSERT(keyRange.keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0);
-
- ASSERT(makeCollectionMetadata()->getNextOrphanRange(keyRange.maxKey, &keyRange));
- ASSERT(keyRange.minKey.woCompare(BSON("a" << 20 << "b" << 0)) == 0);
- ASSERT(keyRange.maxKey.woCompare(BSON("a" << 30 << "b" << 0)) == 0);
- ASSERT(keyRange.keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0);
-
- ASSERT(makeCollectionMetadata()->getNextOrphanRange(keyRange.maxKey, &keyRange));
- ASSERT(keyRange.minKey.woCompare(BSON("a" << 40 << "b" << 0)) == 0);
- ASSERT(keyRange.maxKey.woCompare(makeCollectionMetadata()->getMaxKey()) == 0);
- ASSERT(keyRange.keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0);
-
- ASSERT(!makeCollectionMetadata()->getNextOrphanRange(keyRange.maxKey, &keyRange));
-}
-
-TEST_F(TwoChunksWithGapCompoundKeyFixture, ChunkGapAndPendingOrphanedDataRanges) {
- ChunkType chunk;
- chunk.setMin(BSON("a" << 20 << "b" << 0));
- chunk.setMax(BSON("a" << 30 << "b" << 0));
-
- auto cloned(makeCollectionMetadata()->clonePlusPending(chunk));
-
- KeyRange keyRange;
- ASSERT(cloned->getNextOrphanRange(cloned->getMinKey(), &keyRange));
- ASSERT(keyRange.minKey.woCompare(cloned->getMinKey()) == 0);
- ASSERT(keyRange.maxKey.woCompare(BSON("a" << 10 << "b" << 0)) == 0);
- ASSERT(keyRange.keyPattern.woCompare(cloned->getKeyPattern()) == 0);
-
- ASSERT(cloned->getNextOrphanRange(keyRange.maxKey, &keyRange));
- ASSERT(keyRange.minKey.woCompare(BSON("a" << 40 << "b" << 0)) == 0);
- ASSERT(keyRange.maxKey.woCompare(cloned->getMaxKey()) == 0);
- ASSERT(keyRange.keyPattern.woCompare(cloned->getKeyPattern()) == 0);
-
- ASSERT(!cloned->getNextOrphanRange(keyRange.maxKey, &keyRange));
+ stRangeMap pending;
+ auto keyRange = makeCollectionMetadata()->getNextOrphanRange(
+ pending, makeCollectionMetadata()->getMinKey());
+ ASSERT(keyRange);
+ ASSERT(keyRange->minKey.woCompare(makeCollectionMetadata()->getMinKey()) == 0);
+ ASSERT(keyRange->maxKey.woCompare(BSON("a" << 10 << "b" << 0)) == 0);
+ ASSERT(keyRange->keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0);
+
+ keyRange = makeCollectionMetadata()->getNextOrphanRange(pending, keyRange->maxKey);
+ ASSERT(keyRange);
+ ASSERT(keyRange->minKey.woCompare(BSON("a" << 20 << "b" << 0)) == 0);
+ ASSERT(keyRange->maxKey.woCompare(BSON("a" << 30 << "b" << 0)) == 0);
+ ASSERT(keyRange->keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0);
+
+ keyRange = makeCollectionMetadata()->getNextOrphanRange(pending, keyRange->maxKey);
+ ASSERT(keyRange);
+ ASSERT(keyRange->minKey.woCompare(BSON("a" << 40 << "b" << 0)) == 0);
+ ASSERT(keyRange->maxKey.woCompare(makeCollectionMetadata()->getMaxKey()) == 0);
+ ASSERT(keyRange->keyPattern.woCompare(makeCollectionMetadata()->getKeyPattern()) == 0);
+
+ ASSERT(!makeCollectionMetadata()->getNextOrphanRange(pending, keyRange->maxKey));
}
/**
diff --git a/src/mongo/db/s/collection_range_deleter.cpp b/src/mongo/db/s/collection_range_deleter.cpp
index 0a267cd263d..8ffbf1f0d21 100644
--- a/src/mongo/db/s/collection_range_deleter.cpp
+++ b/src/mongo/db/s/collection_range_deleter.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/s/collection_range_deleter.h"
#include <algorithm>
+#include <utility>
#include "mongo/db/catalog/collection.h"
#include "mongo/db/client.h"
@@ -41,13 +42,16 @@
#include "mongo/db/exec/working_set_common.h"
#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/keypattern.h"
+#include "mongo/db/operation_context.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/query/query_planner.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_sharding_state.h"
+#include "mongo/db/s/metadata_manager.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/service_context.h"
#include "mongo/db/write_concern.h"
#include "mongo/executor/task_executor.h"
#include "mongo/util/log.h"
@@ -57,7 +61,6 @@
namespace mongo {
class ChunkRange;
-class OldClientWriteContext;
using CallbackArgs = executor::TaskExecutor::CallbackArgs;
using logger::LogComponent;
@@ -67,123 +70,144 @@ namespace {
const WriteConcernOptions kMajorityWriteConcern(WriteConcernOptions::kMajority,
WriteConcernOptions::SyncMode::UNSET,
Seconds(60));
-
} // unnamed namespace
-CollectionRangeDeleter::CollectionRangeDeleter(NamespaceString nss) : _nss(std::move(nss)) {}
-
-void CollectionRangeDeleter::run() {
- Client::initThread(getThreadName());
- ON_BLOCK_EXIT([&] { Client::destroy(); });
- auto opCtx = cc().makeOperationContext().get();
-
- const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1);
- bool hasNextRangeToClean = cleanupNextRange(opCtx, maxToDelete);
-
- // If there are more ranges to run, we add <this> back onto the task executor to run again.
- if (hasNextRangeToClean) {
- auto executor = ShardingState::get(opCtx)->getRangeDeleterTaskExecutor();
- executor->scheduleWork([this](const CallbackArgs& cbArgs) { run(); });
- } else {
- delete this;
- }
+CollectionRangeDeleter::~CollectionRangeDeleter() {
+ // notify anybody still sleeping on orphan ranges
+ clear(Status{ErrorCodes::InterruptedDueToReplStateChange,
+ "Collection sharding metadata destroyed"});
}
-bool CollectionRangeDeleter::cleanupNextRange(OperationContext* opCtx, int maxToDelete) {
-
+bool CollectionRangeDeleter::cleanUpNextRange(OperationContext* opCtx,
+ NamespaceString const& nss,
+ int maxToDelete,
+ CollectionRangeDeleter* rangeDeleterForTestOnly) {
+ StatusWith<int> wrote = 0;
+ auto range = boost::optional<ChunkRange>(boost::none);
{
- AutoGetCollection autoColl(opCtx, _nss, MODE_IX);
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
auto* collection = autoColl.getCollection();
if (!collection) {
- return false;
+ return false; // collection was dropped
}
- auto* collectionShardingState = CollectionShardingState::get(opCtx, _nss);
- dassert(collectionShardingState != nullptr); // every collection gets one
- auto& metadataManager = collectionShardingState->_metadataManager;
+ auto* css = CollectionShardingState::get(opCtx, nss);
+ {
+ auto scopedCollectionMetadata = css->getMetadata();
+ if (!scopedCollectionMetadata) {
+ return false; // collection was unsharded
+ }
- if (!_rangeInProgress && !metadataManager.hasRangesToClean()) {
- // Nothing left to do
- return false;
- }
+ // We don't actually know if this is the same collection that we were originally
+ // scheduled to do deletions on, or another one with the same name. But it doesn't
+ // matter: if it has deletions scheduled, now is as good a time as any to do them.
+ auto self = rangeDeleterForTestOnly ? rangeDeleterForTestOnly
+ : &css->_metadataManager._rangesToClean;
+ {
+ stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager._managerLock);
+ if (self->isEmpty())
+ return false;
+
+ const auto& frontRange = self->_orphans.front().range;
+ range.emplace(frontRange.getMin().getOwned(), frontRange.getMax().getOwned());
+ }
- if (!_rangeInProgress || !metadataManager.isInRangesToClean(_rangeInProgress.get())) {
- // No valid chunk in progress, get a new one
- if (!metadataManager.hasRangesToClean()) {
- return false;
+ try {
+ auto keyPattern = scopedCollectionMetadata->getKeyPattern();
+
+ wrote = self->_doDeletion(opCtx, collection, keyPattern, *range, maxToDelete);
+ } catch (const DBException& e) {
+ wrote = e.toStatus();
+ warning() << e.what();
}
- _rangeInProgress = metadataManager.getNextRangeToClean();
- }
- auto scopedCollectionMetadata = collectionShardingState->getMetadata();
- int numDocumentsDeleted =
- _doDeletion(opCtx, collection, scopedCollectionMetadata->getKeyPattern(), maxToDelete);
- if (numDocumentsDeleted <= 0) {
- metadataManager.removeRangeToClean(_rangeInProgress.get());
- _rangeInProgress = boost::none;
- return metadataManager.hasRangesToClean();
- }
+ if (!wrote.isOK() || wrote.getValue() == 0) {
+ stdx::lock_guard<stdx::mutex> scopedLock(css->_metadataManager._managerLock);
+ self->_pop(wrote.getStatus());
+ return true;
+ }
+ } // drop scopedCollectionMetadata
+ } // drop autoColl
+
+ invariant(range);
+ invariantOK(wrote.getStatus());
+ invariant(wrote.getValue() > 0);
+
+ log() << "Deleted " << wrote.getValue() << " documents in " << nss.ns() << " range " << *range;
+
+ // Wait for replication outside the lock
+ const auto clientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ WriteConcernResult unusedWCResult;
+ Status status = Status::OK();
+ try {
+ status = waitForWriteConcern(opCtx, clientOpTime, kMajorityWriteConcern, &unusedWCResult);
+ } catch (const DBException& e) {
+ status = e.toStatus();
}
- // wait for replication
- WriteConcernResult wcResult;
- auto currentClientOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- Status status =
- waitForWriteConcern(opCtx, currentClientOpTime, kMajorityWriteConcern, &wcResult);
if (!status.isOK()) {
- warning() << "Error when waiting for write concern after removing chunks in " << _nss
- << " : " << status.reason();
+ warning() << "Error when waiting for write concern after removing " << nss << " range "
+ << *range << " : " << status.reason();
}
return true;
}
-int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx,
- Collection* collection,
- const BSONObj& keyPattern,
- int maxToDelete) {
- invariant(_rangeInProgress);
- invariant(collection);
+StatusWith<int> CollectionRangeDeleter::_doDeletion(OperationContext* opCtx,
+ Collection* collection,
+ BSONObj const& keyPattern,
+ ChunkRange const& range,
+ int maxToDelete) {
+ invariant(collection != nullptr);
+ invariant(!isEmpty());
+
+ auto const& nss = collection->ns();
// The IndexChunk has a keyPattern that may apply to more than one index - we need to
// select the index and get the full index keyPattern here.
- const IndexDescriptor* idx =
- collection->getIndexCatalog()->findShardKeyPrefixedIndex(opCtx, keyPattern, false);
+ auto catalog = collection->getIndexCatalog();
+ const IndexDescriptor* idx = catalog->findShardKeyPrefixedIndex(opCtx, keyPattern, false);
if (idx == NULL) {
- warning() << "Unable to find shard key index for " << keyPattern.toString() << " in "
- << _nss;
- return -1;
+ std::string msg = str::stream() << "Unable to find shard key index for "
+ << keyPattern.toString() << " in " << nss.ns();
+ log() << msg;
+ return {ErrorCodes::InternalError, msg};
}
- KeyPattern indexKeyPattern(idx->keyPattern().getOwned());
-
// Extend bounds to match the index we found
- const BSONObj& min =
- Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMin(), false));
- const BSONObj& max =
- Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(_rangeInProgress->getMax(), false));
+ KeyPattern indexKeyPattern(idx->keyPattern().getOwned());
+ auto extend = [&](auto& key) {
+ return Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(key, false));
+ };
+ const BSONObj& min = extend(range.getMin());
+ const BSONObj& max = extend(range.getMax());
- LOG(1) << "begin removal of " << min << " to " << max << " in " << _nss;
+ LOG(1) << "begin removal of " << min << " to " << max << " in " << nss.ns();
auto indexName = idx->indexName();
- IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByName(opCtx, indexName);
- if (!desc) {
- warning() << "shard key index with name " << indexName << " on '" << _nss
- << "' was dropped";
- return -1;
+ IndexDescriptor* descriptor = collection->getIndexCatalog()->findIndexByName(opCtx, indexName);
+ if (!descriptor) {
+ std::string msg = str::stream() << "shard key index with name " << indexName << " on '"
+ << nss.ns() << "' was dropped";
+ log() << msg;
+ return {ErrorCodes::InternalError, msg};
+ }
+
+ boost::optional<Helpers::RemoveSaver> saver;
+ if (serverGlobalParams.moveParanoia) {
+ saver.emplace("moveChunk", nss.ns(), "cleaning");
}
int numDeleted = 0;
do {
- auto exec = InternalPlanner::indexScan(opCtx,
- collection,
- desc,
- min,
- max,
- BoundInclusion::kIncludeStartKeyOnly,
- PlanExecutor::NO_YIELD,
- InternalPlanner::FORWARD,
- InternalPlanner::IXSCAN_FETCH);
+ auto halfOpen = BoundInclusion::kIncludeStartKeyOnly;
+ auto manual = PlanExecutor::YIELD_MANUAL;
+ auto forward = InternalPlanner::FORWARD;
+ auto fetch = InternalPlanner::IXSCAN_FETCH;
+
+ auto exec = InternalPlanner::indexScan(
+ opCtx, collection, descriptor, min, max, halfOpen, manual, forward, fetch);
+
RecordId rloc;
BSONObj obj;
PlanExecutor::ExecState state = exec->getNext(&obj, &rloc);
@@ -193,23 +217,71 @@ int CollectionRangeDeleter::_doDeletion(OperationContext* opCtx,
if (state == PlanExecutor::FAILURE || state == PlanExecutor::DEAD) {
warning(LogComponent::kSharding)
<< PlanExecutor::statestr(state) << " - cursor error while trying to delete " << min
- << " to " << max << " in " << _nss << ": " << WorkingSetCommon::toStatusString(obj)
+ << " to " << max << " in " << nss << ": " << WorkingSetCommon::toStatusString(obj)
<< ", stats: " << Explain::getWinningPlanStats(exec.get());
break;
}
-
invariant(PlanExecutor::ADVANCED == state);
- WriteUnitOfWork wuow(opCtx);
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) {
- warning() << "stepped down from primary while deleting chunk; orphaning data in "
- << _nss << " in range [" << min << ", " << max << ")";
- break;
+ {
+ WriteUnitOfWork wuow(opCtx);
+ if (saver) {
+ saver->goingToDelete(obj);
+ }
+ collection->deleteDocument(opCtx, rloc, nullptr, true);
+
+ wuow.commit();
}
- OpDebug* const nullOpDebug = nullptr;
- collection->deleteDocument(opCtx, rloc, nullOpDebug, true);
- wuow.commit();
} while (++numDeleted < maxToDelete);
+
return numDeleted;
}
+auto CollectionRangeDeleter::overlaps(ChunkRange const& range) const -> DeleteNotification {
+ // start search with newest entries by using reverse iterators
+ auto it = find_if(_orphans.rbegin(), _orphans.rend(), [&](auto& cleanee) {
+ return bool(cleanee.range.overlapWith(range));
+ });
+ return it != _orphans.rend() ? it->notification : DeleteNotification();
+}
+
+void CollectionRangeDeleter::add(ChunkRange const& range) {
+ // We ignore the case of overlapping, or even equal, ranges.
+ // Deleting overlapping ranges is quick.
+ _orphans.emplace_back(Deletion{ChunkRange(range.getMin().getOwned(), range.getMax().getOwned()),
+ std::make_shared<Notification<Status>>()});
+}
+
+void CollectionRangeDeleter::append(BSONObjBuilder* builder) const {
+ BSONArrayBuilder arr(builder->subarrayStart("rangesToClean"));
+ for (auto const& entry : _orphans) {
+ BSONObjBuilder obj;
+ entry.range.append(&obj);
+ arr.append(obj.done());
+ }
+ arr.done();
+}
+
+size_t CollectionRangeDeleter::size() const {
+ return _orphans.size();
+}
+
+bool CollectionRangeDeleter::isEmpty() const {
+ return _orphans.empty();
+}
+
+void CollectionRangeDeleter::clear(Status status) {
+ for (auto& range : _orphans) {
+ if (*(range.notification)) {
+ continue; // was triggered in the test driver
+ }
+ range.notification->set(status); // wake up anything still waiting
+ }
+ _orphans.clear();
+}
+
+void CollectionRangeDeleter::_pop(Status result) {
+ _orphans.front().notification->set(result); // wake up waitForClean
+ _orphans.pop_front();
+}
+
} // namespace mongo
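
A minimal sketch (not part of the patch itself) of how a caller might drive the new static
cleanUpNextRange(), in the spirit of the removed run() loop: it assumes ShardingState's
range-deleter task executor, and the helper name scheduleRangeCleanup is hypothetical.

    // Hypothetical driver: reschedules itself on the executor while cleanUpNextRange()
    // reports that more documents remain to be deleted for this namespace.
    void scheduleRangeCleanup(executor::TaskExecutor* executor, NamespaceString nss) {
        executor->scheduleWork([executor, nss](const executor::TaskExecutor::CallbackArgs&) {
            Client::initThread("collection-range-deleter");
            ON_BLOCK_EXIT([] { Client::destroy(); });
            auto uniqueOpCtx = cc().makeOperationContext();
            const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1);
            if (CollectionRangeDeleter::cleanUpNextRange(uniqueOpCtx.get(), nss, maxToDelete)) {
                scheduleRangeCleanup(executor, nss);  // more ranges (or documents) remain
            }
        });
    }
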
diff --git a/src/mongo/db/s/collection_range_deleter.h b/src/mongo/db/s/collection_range_deleter.h
index f611215a73d..e5098d50dac 100644
--- a/src/mongo/db/s/collection_range_deleter.h
+++ b/src/mongo/db/s/collection_range_deleter.h
@@ -29,7 +29,9 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/util/concurrency/notification.h"
namespace mongo {
@@ -41,40 +43,99 @@ class CollectionRangeDeleter {
MONGO_DISALLOW_COPYING(CollectionRangeDeleter);
public:
- CollectionRangeDeleter(NamespaceString nss);
+ //
+    // All of the following member functions must be called only while the containing
+    // MetadataManager's lock is held (or from its destructor), except cleanUpNextRange.
+ //
/**
- * Starts deleting ranges and cleans up this object when it is finished.
+     * Default-constructible; ranges are added with add() and processed by cleanUpNextRange().
*/
- void run();
+ CollectionRangeDeleter() = default;
+ ~CollectionRangeDeleter();
+
+ using DeleteNotification = std::shared_ptr<Notification<Status>>;
+
+ /**
+ * Adds a new range to be cleaned up by the cleaner thread.
+ */
+ void add(const ChunkRange& range);
+
+ /**
+ * Reports whether the argument range overlaps any of the ranges to clean. If there is overlap,
+ * it returns a notification that will be completed when the currently newest overlapping
+ * range is no longer scheduled. Its value indicates whether it has been successfully removed.
+ * If there is no overlap, the result is nullptr. After a successful removal, the caller
+ * should call again to ensure no other range overlaps the argument.
+ * (See CollectionShardingState::waitForClean and MetadataManager::trackOrphanedDataCleanup for
+ * an example use.)
+ */
+ DeleteNotification overlaps(ChunkRange const& range) const;
+
+ /**
+ * Reports the number of ranges remaining to be cleaned up.
+ */
+ size_t size() const;
+
+ bool isEmpty() const;
+
+ /*
+     * Notifies anything waiting on scheduled ranges with the specified status, then discards
+     * the ranges and their notifications.
+ */
+ void clear(Status);
+
+ /*
+ * Append a representation of self to the specified builder.
+ */
+ void append(BSONObjBuilder* builder) const;
/**
- * Acquires the collection IX lock and checks whether there are new entries for the collection's
- * rangesToClean structure. If there are, deletes up to maxToDelete entries and yields using
- * the standard query yielding logic.
+     * If any ranges are scheduled for cleanup, deletes up to maxToDelete documents, notifying
+     * watchers of ranges as their deletions complete. It performs its own collection locking,
+     * so it must be called without locks held.
*
- * Returns true if there are more entries in rangesToClean, false if there is no more progress
- * to be made.
+ * The 'rangeDeleterForTestOnly' is used as a utility for unit-tests that directly test the
+ * CollectionRangeDeleter class so they do not need to set up
+ * CollectionShardingState/MetadataManager.
+ *
+     * Returns true if it should be scheduled to run again because there is more data to be
+     * deleted, or false otherwise.
*/
- bool cleanupNextRange(OperationContext* opCtx, int maxToDelete);
+ static bool cleanUpNextRange(OperationContext*,
+ NamespaceString const& nss,
+ int maxToDelete,
+ CollectionRangeDeleter* rangeDeleterForTestOnly = nullptr);
private:
/**
- * Performs the deletion of up to maxToDelete entries within the range in progress.
- * This function will invariant if called while _rangeInProgress is not set.
+ * Performs the deletion of up to maxToDelete entries within the range in progress. Must be
+ * called under the collection lock.
*
- * Returns the number of documents deleted (0 if deletion is finished), or -1 for error.
+     * Returns the number of documents deleted, 0 if the range is finished, or an error status
+     * if deleting from the range failed.
*/
- int _doDeletion(OperationContext* opCtx,
- Collection* collection,
- const BSONObj& keyPattern,
- int maxToDelete);
+ StatusWith<int> _doDeletion(OperationContext* opCtx,
+ Collection* collection,
+ const BSONObj& keyPattern,
+ ChunkRange const& range,
+ int maxToDelete);
- NamespaceString _nss;
+ /**
+     * Removes the front (oldest-scheduled) range from the ranges to be cleaned up, and notifies
+     * any interested callers of this->overlaps(range) with the specified status.
+ */
+ void _pop(Status status);
- // Holds a range for which deletion has begun. If empty, then a new range
- // must be requested from rangesToClean
- boost::optional<ChunkRange> _rangeInProgress;
+ /**
+ * Ranges scheduled for deletion. The front of the list will be in active process of deletion.
+ * As each range is completed, its notification is signaled before it is popped.
+ */
+ struct Deletion {
+ ChunkRange const range;
+ DeleteNotification const notification;
+ };
+ std::list<Deletion> _orphans;
};
} // namespace mongo
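
The overlaps()/DeleteNotification contract documented above implies a re-check loop on the
caller's side. A minimal sketch, with the MetadataManager locking that normally surrounds
overlaps() elided; the helper name waitUntilRangeClean is hypothetical.

    // Hypothetical waiter: keep asking while any scheduled range still overlaps 'range',
    // blocking on the newest overlapping range's notification between checks.
    Status waitUntilRangeClean(OperationContext* opCtx,
                               CollectionRangeDeleter const& deleter,
                               ChunkRange const& range) {
        while (auto notification = deleter.overlaps(range)) {
            Status result = notification->get(opCtx);  // wakes when that range is popped
            if (!result.isOK()) {
                return result;  // deletion failed or was abandoned
            }
        }
        return Status::OK();  // nothing scheduled overlaps 'range' any longer
    }
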
diff --git a/src/mongo/db/s/collection_range_deleter_test.cpp b/src/mongo/db/s/collection_range_deleter_test.cpp
index 2927379ff1c..a5b4813d5e6 100644
--- a/src/mongo/db/s/collection_range_deleter_test.cpp
+++ b/src/mongo/db/s/collection_range_deleter_test.cpp
@@ -30,55 +30,84 @@
#include "mongo/db/s/collection_range_deleter.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
#include "mongo/client/query.h"
+#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/index/index_descriptor.h"
#include "mongo/db/keypattern.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/s/catalog/dist_lock_catalog_impl.h"
+#include "mongo/s/catalog/dist_lock_manager_mock.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/chunk_version.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/sharding_mongod_test_fixture.h"
namespace mongo {
using unittest::assertGet;
-
-const NamespaceString kNamespaceString = NamespaceString("foo", "bar");
+const NamespaceString kNss = NamespaceString("foo", "bar");
const std::string kPattern = "_id";
const BSONObj kKeyPattern = BSON(kPattern << 1);
+const std::string kShardName{"a"};
+const HostAndPort dummyHost("dummy", 123);
-class CollectionRangeDeleterTest : public ServiceContextMongoDTest {
+class CollectionRangeDeleterTest : public ShardingMongodTestFixture {
protected:
- ServiceContext::UniqueOperationContext _opCtx;
- MetadataManager* _metadataManager;
- std::unique_ptr<DBDirectClient> _dbDirectClient;
+ bool next(CollectionRangeDeleter& rangeDeleter, int maxToDelete) {
+ return CollectionRangeDeleter::cleanUpNextRange(
+ operationContext(), kNss, maxToDelete, &rangeDeleter);
+ }
+ std::shared_ptr<RemoteCommandTargeterMock> configTargeter() {
+ return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter());
+ }
- OperationContext* operationContext();
private:
void setUp() override;
void tearDown() override;
+
+ std::unique_ptr<DistLockCatalog> makeDistLockCatalog(ShardRegistry* shardRegistry) override {
+ invariant(shardRegistry);
+ return stdx::make_unique<DistLockCatalogImpl>(shardRegistry);
+ }
+
+ std::unique_ptr<DistLockManager> makeDistLockManager(
+ std::unique_ptr<DistLockCatalog> distLockCatalog) override {
+ return stdx::make_unique<DistLockManagerMock>(std::move(distLockCatalog));
+ }
+
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) override {
+ return stdx::make_unique<ShardingCatalogClientMock>(std::move(distLockManager));
+ }
};
void CollectionRangeDeleterTest::setUp() {
- ServiceContextMongoDTest::setUp();
- const repl::ReplSettings replSettings = {};
- repl::setGlobalReplicationCoordinator(
- new repl::ReplicationCoordinatorMock(getServiceContext(), replSettings));
- repl::getGlobalReplicationCoordinator()->setFollowerMode(repl::MemberState::RS_PRIMARY);
- _opCtx = getServiceContext()->makeOperationContext(&cc());
- _dbDirectClient = stdx::make_unique<DBDirectClient>(operationContext());
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ ShardingMongodTestFixture::setUp();
+ replicationCoordinator()->alwaysAllowWrites(true);
+ initializeGlobalShardingStateForMongodForTest(ConnectionString(dummyHost));
+ // RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter())
+ // ->setConnectionStringReturnValue(kConfigConnStr);
+
+ configTargeter()->setFindHostReturnValue(dummyHost);
+
+ DBDirectClient(operationContext()).createCollection(kNss.ns());
{
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+ auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss);
const OID epoch = OID::gen();
-
- AutoGetCollection autoColl(operationContext(), kNamespaceString, MODE_IX);
- auto collectionShardingState =
- CollectionShardingState::get(operationContext(), kNamespaceString);
collectionShardingState->refreshMetadata(
operationContext(),
stdx::make_unique<CollectionMetadata>(
@@ -86,208 +115,127 @@ void CollectionRangeDeleterTest::setUp() {
ChunkVersion(1, 0, epoch),
ChunkVersion(0, 0, epoch),
SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()));
- _metadataManager = collectionShardingState->getMetadataManagerForTest();
}
}
void CollectionRangeDeleterTest::tearDown() {
{
- AutoGetCollection autoColl(operationContext(), kNamespaceString, MODE_IX);
- auto collectionShardingState =
- CollectionShardingState::get(operationContext(), kNamespaceString);
+ AutoGetCollection autoColl(operationContext(), kNss, MODE_IX);
+ auto collectionShardingState = CollectionShardingState::get(operationContext(), kNss);
collectionShardingState->refreshMetadata(operationContext(), nullptr);
}
- _dbDirectClient.reset();
- _opCtx.reset();
- ServiceContextMongoDTest::tearDown();
- repl::setGlobalReplicationCoordinator(nullptr);
-}
-
-OperationContext* CollectionRangeDeleterTest::operationContext() {
- invariant(_opCtx);
- return _opCtx.get();
+ ShardingMongodTestFixture::tearDown();
}
namespace {
// Tests the case that there is nothing in the database.
TEST_F(CollectionRangeDeleterTest, EmptyDatabase) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1));
+ CollectionRangeDeleter rangeDeleter;
+ ASSERT_FALSE(next(rangeDeleter, 1));
}
// Tests the case that there is data, but it is not in a range to clean.
TEST_F(CollectionRangeDeleterTest, NoDataInGivenRangeToClean) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
+ CollectionRangeDeleter rangeDeleter;
const BSONObj insertedDoc = BSON(kPattern << 25);
+ DBDirectClient dbclient(operationContext());
+ dbclient.insert(kNss.toString(), insertedDoc);
+ ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kPattern << 25)));
- _dbDirectClient->insert(kNamespaceString.toString(), insertedDoc);
- ASSERT_BSONOBJ_EQ(insertedDoc,
- _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 25)));
+ rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
+ ASSERT_TRUE(next(rangeDeleter, 1));
- _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1));
+ ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kPattern << 25)));
- ASSERT_BSONOBJ_EQ(insertedDoc,
- _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 25)));
+ ASSERT_FALSE(next(rangeDeleter, 1));
}
// Tests the case that there is a single document within a range to clean.
TEST_F(CollectionRangeDeleterTest, OneDocumentInOneRangeToClean) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
+ CollectionRangeDeleter rangeDeleter;
const BSONObj insertedDoc = BSON(kPattern << 5);
+ DBDirectClient dbclient(operationContext());
+ dbclient.insert(kNss.toString(), BSON(kPattern << 5));
+ ASSERT_BSONOBJ_EQ(insertedDoc, dbclient.findOne(kNss.toString(), QUERY(kPattern << 5)));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 5));
- ASSERT_BSONOBJ_EQ(insertedDoc,
- _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 5)));
-
- _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
+ rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1));
- ASSERT_TRUE(
- _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << 5)).isEmpty());
-
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1));
+ ASSERT_TRUE(next(rangeDeleter, 1));
+ ASSERT_TRUE(next(rangeDeleter, 1));
+ ASSERT_TRUE(dbclient.findOne(kNss.toString(), QUERY(kPattern << 5)).isEmpty());
+ ASSERT_FALSE(next(rangeDeleter, 1));
}
// Tests the case that there are multiple documents within a range to clean.
TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInOneRangeToClean) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3));
- ASSERT_EQUALS(3ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5)));
-
- _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100));
- ASSERT_EQUALS(0ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5)));
-
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 100));
+ CollectionRangeDeleter rangeDeleter;
+ DBDirectClient dbclient(operationContext());
+ dbclient.insert(kNss.toString(), BSON(kPattern << 1));
+ dbclient.insert(kNss.toString(), BSON(kPattern << 2));
+ dbclient.insert(kNss.toString(), BSON(kPattern << 3));
+ ASSERT_EQUALS(3ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5)));
+
+ rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
+
+ ASSERT_TRUE(next(rangeDeleter, 100));
+ ASSERT_TRUE(next(rangeDeleter, 100));
+ ASSERT_EQUALS(0ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5)));
+ ASSERT_FALSE(next(rangeDeleter, 100));
}
// Tests the case that there are multiple documents within a range to clean, and the range deleter
// has a max deletion rate of one document per run.
-TEST_F(CollectionRangeDeleterTest, MultipleCleanupNextRangeCallsToCleanOneRange) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3));
- ASSERT_EQUALS(3ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5)));
-
- _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1));
- ASSERT_EQUALS(2ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1));
- ASSERT_EQUALS(1ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 1));
- ASSERT_EQUALS(0ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 5)));
-
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1));
+TEST_F(CollectionRangeDeleterTest, MultipleCleanupNextRangeCalls) {
+ CollectionRangeDeleter rangeDeleter;
+ DBDirectClient dbclient(operationContext());
+ dbclient.insert(kNss.toString(), BSON(kPattern << 1));
+ dbclient.insert(kNss.toString(), BSON(kPattern << 2));
+ dbclient.insert(kNss.toString(), BSON(kPattern << 3));
+ ASSERT_EQUALS(3ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5)));
+
+ rangeDeleter.add(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
+
+ ASSERT_TRUE(next(rangeDeleter, 1));
+ ASSERT_EQUALS(2ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5)));
+
+ ASSERT_TRUE(next(rangeDeleter, 1));
+ ASSERT_EQUALS(1ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5)));
+
+ ASSERT_TRUE(next(rangeDeleter, 1));
+ ASSERT_TRUE(next(rangeDeleter, 1));
+ ASSERT_EQUALS(0ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 5)));
+ ASSERT_FALSE(next(rangeDeleter, 1));
}
// Tests the case that there are two ranges to clean, each containing multiple documents.
TEST_F(CollectionRangeDeleterTest, MultipleDocumentsInMultipleRangesToClean) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 4));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 5));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 6));
- ASSERT_EQUALS(6ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
+ CollectionRangeDeleter rangeDeleter;
+ DBDirectClient dbclient(operationContext());
+ dbclient.insert(kNss.toString(), BSON(kPattern << 1));
+ dbclient.insert(kNss.toString(), BSON(kPattern << 2));
+ dbclient.insert(kNss.toString(), BSON(kPattern << 3));
+ dbclient.insert(kNss.toString(), BSON(kPattern << 4));
+ dbclient.insert(kNss.toString(), BSON(kPattern << 5));
+ dbclient.insert(kNss.toString(), BSON(kPattern << 6));
+ ASSERT_EQUALS(6ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 10)));
const ChunkRange chunkRange1 = ChunkRange(BSON(kPattern << 0), BSON(kPattern << 4));
const ChunkRange chunkRange2 = ChunkRange(BSON(kPattern << 4), BSON(kPattern << 7));
- _metadataManager->addRangeToClean(chunkRange1);
- _metadataManager->addRangeToClean(chunkRange2);
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100));
- ASSERT_EQUALS(0ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 4)));
- ASSERT_EQUALS(3ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100));
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 100));
- ASSERT_EQUALS(0ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 1));
-}
-
-// Tests the case that there is a range to clean, and halfway through a deletion a chunk
-// within the range is received
-TEST_F(CollectionRangeDeleterTest, MultipleCallstoCleanupNextRangeWithChunkReceive) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 4));
- const BSONObj insertedDoc5 = BSON(kPattern << 5); // not to be deleted
- _dbDirectClient->insert(kNamespaceString.toString(), insertedDoc5);
- ASSERT_EQUALS(5ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2));
- ASSERT_EQUALS(3ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- _metadataManager->beginReceive(ChunkRange(BSON(kPattern << 5), BSON(kPattern << 6)));
-
- // insertedDoc5 is no longer eligible for deletion
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2));
- ASSERT_EQUALS(1ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2));
- ASSERT_EQUALS(1ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 2));
-
- ASSERT_BSONOBJ_EQ(
- insertedDoc5,
- _dbDirectClient->findOne(kNamespaceString.toString(), QUERY(kPattern << GT << 0)));
-}
-
-TEST_F(CollectionRangeDeleterTest, CalltoCleanupNextRangeWithChunkReceive) {
- CollectionRangeDeleter rangeDeleter(kNamespaceString);
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 1));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 2));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 3));
- _dbDirectClient->insert(kNamespaceString.toString(), BSON(kPattern << 4));
- ASSERT_EQUALS(4ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- _metadataManager->addRangeToClean(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
- ASSERT_TRUE(rangeDeleter.cleanupNextRange(operationContext(), 2));
- ASSERT_EQUALS(2ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
-
- _metadataManager->beginReceive(ChunkRange(BSON(kPattern << 0), BSON(kPattern << 10)));
-
- ASSERT_FALSE(rangeDeleter.cleanupNextRange(operationContext(), 2));
-
- ASSERT_EQUALS(2ULL,
- _dbDirectClient->count(kNamespaceString.toString(), BSON(kPattern << LT << 10)));
+ rangeDeleter.add(chunkRange1);
+ rangeDeleter.add(chunkRange2);
+
+ ASSERT_TRUE(next(rangeDeleter, 100));
+ ASSERT_EQUALS(0ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 4)));
+ ASSERT_EQUALS(3ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 10)));
+
+ ASSERT_TRUE(next(rangeDeleter, 100)); // discover there are no more < 4, pop range 1
+ ASSERT_TRUE(next(rangeDeleter, 100)); // delete the remaining documents
+ ASSERT_TRUE(next(rangeDeleter, 1)); // discover there are no more, pop range 2
+ ASSERT_EQUALS(0ULL, dbclient.count(kNss.toString(), BSON(kPattern << LT << 10)));
+ ASSERT_FALSE(next(rangeDeleter, 1)); // discover there are no more ranges
}
} // unnamed namespace
-
} // namespace mongo
diff --git a/src/mongo/db/s/collection_sharding_state.cpp b/src/mongo/db/s/collection_sharding_state.cpp
index 5b4b39cb33f..4bb86031fc4 100644
--- a/src/mongo/db/s/collection_sharding_state.cpp
+++ b/src/mongo/db/s/collection_sharding_state.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/client.h"
#include "mongo/db/concurrency/lock_state.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_metadata.h"
@@ -84,7 +85,8 @@ private:
} // unnamed namespace
CollectionShardingState::CollectionShardingState(ServiceContext* sc, NamespaceString nss)
- : _nss(std::move(nss)), _metadataManager{sc, _nss} {}
+ : _nss(std::move(nss)),
+ _metadataManager{sc, _nss, ShardingState::get(sc)->getRangeDeleterTaskExecutor()} {}
CollectionShardingState::~CollectionShardingState() {
invariant(!_sourceMgr);
@@ -119,14 +121,18 @@ void CollectionShardingState::markNotShardedAtStepdown() {
_metadataManager.refreshActiveMetadata(nullptr);
}
-void CollectionShardingState::beginReceive(const ChunkRange& range) {
- _metadataManager.beginReceive(range);
+bool CollectionShardingState::beginReceive(ChunkRange const& range) {
+ return _metadataManager.beginReceive(range);
}
void CollectionShardingState::forgetReceive(const ChunkRange& range) {
_metadataManager.forgetReceive(range);
}
+Status CollectionShardingState::cleanUpRange(ChunkRange const& range) {
+ return _metadataManager.cleanUpRange(range);
+}
+
MigrationSourceManager* CollectionShardingState::getMigrationSourceManager() {
return _sourceMgr;
}
@@ -172,6 +178,52 @@ bool CollectionShardingState::collectionIsSharded() {
return true;
}
+// Call with collection unlocked. Note that the CollectionShardingState object involved might not
+// exist anymore at the time of the call, or indeed anytime outside the AutoGetCollection block, so
+// anything that might alias something in it must be copied first.
+
+/* static */
+Status CollectionShardingState::waitForClean(OperationContext* opCtx,
+ NamespaceString nss,
+ OID const& epoch,
+ ChunkRange orphanRange) {
+ do {
+ auto stillScheduled = CollectionShardingState::CleanupNotification(nullptr);
+ {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ // First, see if collection was dropped.
+ auto css = CollectionShardingState::get(opCtx, nss);
+ {
+ auto metadata = css->_metadataManager.getActiveMetadata();
+ if (!metadata || metadata->getCollVersion().epoch() != epoch) {
+ return {ErrorCodes::StaleShardVersion, "Collection being migrated was dropped"};
+ }
+ } // drop metadata
+ stillScheduled = css->_metadataManager.trackOrphanedDataCleanup(orphanRange);
+ if (stillScheduled == nullptr) {
+ log() << "Finished deleting " << nss.ns() << " range "
+ << redact(orphanRange.toString());
+ return Status::OK();
+ }
+ } // drop collection lock
+
+ log() << "Waiting for deletion of " << nss.ns() << " range " << orphanRange;
+ Status result = stillScheduled->get(opCtx);
+ if (!result.isOK()) {
+ return {result.code(),
+ str::stream() << "Failed to delete orphaned " << nss.ns() << " range "
+ << redact(orphanRange.toString())
+ << ": "
+ << redact(result.reason())};
+ }
+ } while (true);
+ MONGO_UNREACHABLE;
+}
+
+boost::optional<KeyRange> CollectionShardingState::getNextOrphanRange(BSONObj const& from) {
+ return _metadataManager.getNextOrphanRange(from);
+}
+
bool CollectionShardingState::isDocumentInMigratingChunk(OperationContext* opCtx,
const BSONObj& doc) {
dassert(opCtx->lockState()->isCollectionLockedForMode(_nss.ns(), MODE_IX));
@@ -312,8 +364,7 @@ bool CollectionShardingState::_checkShardVersionOk(OperationContext* opCtx,
if (!info) {
// There is no shard version information on either 'opCtx' or 'client'. This means that
// the operation represented by 'opCtx' is unversioned, and the shard version is always
- // OK
- // for unversioned operations.
+ // OK for unversioned operations.
return true;
}
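
A sketch (not taken from this patch) of how a donor shard might combine cleanUpRange() and
waitForClean() once a migration commits; the wrapper name cleanUpMigratedRange is hypothetical,
and the real changes to move_chunk_command.cpp appear elsewhere in this commit.

    // Hypothetical donor-side cleanup: schedule the moved-off range for deletion once no
    // dependent queries remain, then optionally block until the documents are gone.
    Status cleanUpMigratedRange(OperationContext* opCtx,
                                NamespaceString const& nss,
                                OID const& epoch,
                                ChunkRange const& range,
                                bool waitForDelete) {
        {
            AutoGetCollection autoColl(opCtx, nss, MODE_IX);
            auto css = CollectionShardingState::get(opCtx, nss);
            Status status = css->cleanUpRange(range);  // fails if range overlaps an owned chunk
            if (!status.isOK()) {
                return status;
            }
        }  // drop the collection lock before waiting
        if (!waitForDelete) {
            return Status::OK();  // deletion continues in the background
        }
        return CollectionShardingState::waitForClean(opCtx, nss, epoch, range);
    }
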
diff --git a/src/mongo/db/s/collection_sharding_state.h b/src/mongo/db/s/collection_sharding_state.h
index 5bbc2b9c576..2ab648ad26c 100644
--- a/src/mongo/db/s/collection_sharding_state.h
+++ b/src/mongo/db/s/collection_sharding_state.h
@@ -34,7 +34,9 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/base/string_data.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/s/collection_range_deleter.h"
#include "mongo/db/s/metadata_manager.h"
+#include "mongo/util/concurrency/notification.h"
namespace mongo {
@@ -85,11 +87,20 @@ public:
static CollectionShardingState* get(OperationContext* opCtx, const std::string& ns);
/**
- * Returns the chunk metadata for the collection.
+ * Returns the chunk metadata for the collection. The metadata it represents lives as long as
+ * the object itself, and the collection, exist. After dropping the collection lock, the
+ * collection may no longer exist, but it is still safe to destroy the object.
*/
ScopedCollectionMetadata getMetadata();
/**
+     * Appends the pending (incoming) chunk ranges to the given BSONArrayBuilder.
+ */
+ void toBSONPending(BSONArrayBuilder& bb) const {
+ _metadataManager.toBSONPending(bb);
+ }
+
+ /**
* Updates the metadata based on changes received from the config server and also resolves the
* pending receives map in case some of these pending receives have completed or have been
* abandoned. If newMetadata is null, unshard the collection.
@@ -105,20 +116,26 @@ public:
void markNotShardedAtStepdown();
/**
- * Modifies the collection's sharding state to indicate that it is beginning to receive the
- * given ChunkRange.
+     * If no running queries can depend on documents in the range, schedules them for immediate
+     * cleanup, adds the range to the list of pending ranges, and returns true; otherwise
+     * returns false. Does not block.
*/
- void beginReceive(const ChunkRange& range);
+ bool beginReceive(ChunkRange const& range);
/*
- * Modifies the collection's sharding state to indicate that the previous pending migration
- * failed. If the range was not previously pending, this function will crash the server.
- *
- * This function is the mirror image of beginReceive.
+ * Removes the range from the list of pending ranges, and schedules any documents in the range
+ * for immediate cleanup. Does not block.
*/
void forgetReceive(const ChunkRange& range);
/**
+ * Schedules documents in the range for cleanup after any running queries that may depend on
+     * them have terminated. Does not block; use waitForClean to block until the deletion
+     * completes. Fails if the range overlaps any chunk currently owned by this shard.
+ */
+ Status cleanUpRange(ChunkRange const& range);
+
+ /**
* Returns the active migration source manager, if one is available.
*/
MigrationSourceManager* getMigrationSourceManager();
@@ -154,6 +171,28 @@ public:
*/
bool collectionIsSharded();
+ /**
+ * Tracks deletion of any documents within the range, returning when deletion is complete.
+     * Returns a non-OK status if the collection is dropped while it waits. Call this with the
+     * collection unlocked.
+ */
+ static Status waitForClean(OperationContext*, NamespaceString, OID const& epoch, ChunkRange);
+
+ using CleanupNotification = MetadataManager::CleanupNotification;
+ /**
+ * Reports whether any part of the argument range is still scheduled for deletion. If not,
+ * returns nullptr. Otherwise, returns a notification n such that n->get(opCtx) will wake when
+ * deletion of a range (possibly the one of interest) is completed. This should be called
+ * again after each wakeup until it returns nullptr, because there can be more than one range
+ * scheduled for deletion that overlaps its argument.
+ */
+ CleanupNotification trackOrphanedDataCleanup(ChunkRange const& range) const;
+
+ /**
+ * Returns a range _not_ owned by this shard that starts no lower than the specified
+ * startingFrom key value, if any, or boost::none if there is no such range.
+ */
+ boost::optional<KeyRange> getNextOrphanRange(BSONObj const& startingFrom);
+
// Replication subsystem hooks. If this collection is serving as a source for migration, these
// methods inform it of any changes to its contents.
@@ -167,11 +206,6 @@ public:
void onDropCollection(OperationContext* opCtx, const NamespaceString& collectionName);
- MetadataManager* getMetadataManagerForTest() {
- return &_metadataManager;
- }
-
-
private:
/**
* Checks whether the shard version of the operation matches that of the collection.
@@ -191,7 +225,7 @@ private:
ChunkVersion* expectedShardVersion,
ChunkVersion* actualShardVersion);
- // Namespace to which this state belongs.
+ // Namespace this state belongs to.
const NamespaceString _nss;
// Contains all the metadata associated with this collection.
@@ -204,7 +238,11 @@ private:
// NOTE: The value is not owned by this class.
MigrationSourceManager* _sourceMgr{nullptr};
- friend class CollectionRangeDeleter;
+ // for access to _metadataManager
+ friend bool CollectionRangeDeleter::cleanUpNextRange(OperationContext*,
+ NamespaceString const&,
+ int maxToDelete,
+ CollectionRangeDeleter*);
};
} // namespace mongo
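
A sketch of how getNextOrphanRange(), cleanUpRange(), and waitForClean() might be combined to
sweep all orphaned ranges, roughly what the cleanupOrphaned command does (its changes are part
of this commit but not shown here); the function name sweepOrphans is hypothetical.

    // Hypothetical orphan sweep: walk the ranges not owned by this shard starting at
    // 'startingFromKey', scheduling each for deletion and waiting for it to finish.
    Status sweepOrphans(OperationContext* opCtx,
                        NamespaceString const& nss,
                        OID const& epoch,
                        BSONObj startingFromKey) {
        while (true) {
            boost::optional<KeyRange> orphans;
            {
                AutoGetCollection autoColl(opCtx, nss, MODE_IX);
                auto css = CollectionShardingState::get(opCtx, nss);
                orphans = css->getNextOrphanRange(startingFromKey);
                if (!orphans) {
                    return Status::OK();  // nothing orphaned remains at or above the key
                }
                Status scheduled =
                    css->cleanUpRange(ChunkRange(orphans->minKey, orphans->maxKey));
                if (!scheduled.isOK()) {
                    return scheduled;
                }
            }  // drop the collection lock before waiting
            Status cleaned = CollectionShardingState::waitForClean(
                opCtx, nss, epoch, ChunkRange(orphans->minKey, orphans->maxKey));
            if (!cleaned.isOK()) {
                return cleaned;
            }
            startingFromKey = orphans->maxKey;  // continue the scan past this range
        }
    }
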
diff --git a/src/mongo/db/s/get_shard_version_command.cpp b/src/mongo/db/s/get_shard_version_command.cpp
index 81157fcc07a..68d7a4c70df 100644
--- a/src/mongo/db/s/get_shard_version_command.cpp
+++ b/src/mongo/db/s/get_shard_version_command.cpp
@@ -131,7 +131,7 @@ public:
chunksArr.doneFast();
BSONArrayBuilder pendingArr(metadataBuilder.subarrayStart("pending"));
- metadata->toBSONPending(pendingArr);
+ css->toBSONPending(pendingArr);
pendingArr.doneFast();
}
metadataBuilder.doneFast();
diff --git a/src/mongo/db/s/metadata_manager.cpp b/src/mongo/db/s/metadata_manager.cpp
index faa062e9476..b58218685ba 100644
--- a/src/mongo/db/s/metadata_manager.cpp
+++ b/src/mongo/db/s/metadata_manager.cpp
@@ -33,36 +33,145 @@
#include "mongo/db/s/metadata_manager.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
+#include "mongo/bson/util/builder.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/query/internal_plans.h"
#include "mongo/db/range_arithmetic.h"
#include "mongo/db/s/collection_range_deleter.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/stdx/memory.h"
+#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
+// MetadataManager exists only as a data member of a CollectionShardingState object.
+//
+// It maintains a set of std::shared_ptr<MetadataManager::Tracker> pointers: one in
+// _activeMetadataTracker, and more in a list _metadataInUse. It also contains a
+// CollectionRangeDeleter that queues orphan ranges to delete in a background thread, and a record
+// of the ranges being migrated in, to avoid deleting them.
+//
+// Free-floating MetadataManager::Tracker objects are maintained by these pointers, and also by
+// clients in ScopedCollectionMetadata objects obtained via CollectionShardingState::getMetadata().
+//
+// A Tracker object keeps:
+// a std::unique_ptr<CollectionMetadata>, owning a map of the chunks owned by the shard,
+// a key range [min,max) of orphaned documents that may be deleted when the count goes to zero,
+// a count of the ScopedCollectionMetadata objects that have pointers to it,
+// a mutex lock, serializing access to:
+// a pointer back to the MetadataManager object that created it.
+//
+// __________________________
+// (s): std::shared_ptr<> Clients:| ScopedCollectionMetadata |
+// (u): std::unique_ptr<> | tracker (s)-----------+
+// ________________________________ |__________________________| | |
+// | CollectionShardingState | | tracker (s)--------+ +
+// | | |__________________________| | | |
+// | ____________________________ | | tracker (s)----+ | |
+// | | MetadataManager | | |_________________________| | | |
+// | | | | ________________________ | | |
+// | | _activeMetadataTracker (s)-------->| Tracker |<------+ | | (1 reference)
+// | | | | | ______________________|_ | |
+// | | [ (s),-------------->| Tracker | | | (0 references)
+// | | (s),---------\ | | ______________________|_ | |
+// | | _metadataInUse ... ] | | \----->| Tracker |<----+-+ (2 references)
+// | | ________________________ | | | | | | ______________________
+// | | | CollectionRangeDeleter | | | | | | metadata (u)------------->| CollectionMetadata |
+// | | | | | | | | | [ orphans [min,max) ] | | |
+// | | | _orphans [ [min,max), | | | | | | usageCounter | | _chunksMap |
+// | | | [min,max), | | | | | | trackerLock: | | _chunkVersion |
+// | | | ... ] | |<--------------manager | | ... |
+// | | | | | | |_| | | |______________________|
+// | | |________________________| | | |_| |
+// | | | | |________________________|
+//
+// A ScopedCollectionMetadata object is created and held during a query, and destroyed when the
+// query no longer needs access to the collection. Its destructor decrements the Tracker's
+// usageCounter.
+//
+// When a new chunk mapping replaces _activeMetadata, if any queries still depend on the current
+// mapping, it is pushed onto the back of _metadataInUse.
+//
+// Trackers pointed to from _metadataInUse, and their associated CollectionMetadata, are maintained
+// at least as long as any query holds a ScopedCollectionMetadata object referring to them, or to
+// any older tracker. In the diagram above, the middle Tracker must be kept until the one below it
+// is disposed of. (Note that _metadataInUse as shown here has its front() at the bottom, back()
+// at the top. As usual, new entries are pushed onto the back, popped off the front.)
+
namespace mongo {
-using CallbackArgs = executor::TaskExecutor::CallbackArgs;
+using TaskExecutor = executor::TaskExecutor;
+using CallbackArgs = TaskExecutor::CallbackArgs;
+
+struct MetadataManager::Tracker {
+ /**
+ * Creates a new Tracker with the usageCounter initialized to zero.
+ */
+ Tracker(std::unique_ptr<CollectionMetadata>, MetadataManager*);
+
+ std::unique_ptr<CollectionMetadata> metadata;
+ uint32_t usageCounter{0};
+ boost::optional<ChunkRange> orphans{boost::none};
-MetadataManager::MetadataManager(ServiceContext* sc, NamespaceString nss)
+    // trackerLock guards access to manager, which is cleared by ~MetadataManager() but used by
+ // ScopedCollectionMetadata when usageCounter falls to zero.
+ stdx::mutex trackerLock;
+ MetadataManager* manager{nullptr};
+};
+
+MetadataManager::MetadataManager(ServiceContext* sc, NamespaceString nss, TaskExecutor* executor)
: _nss(std::move(nss)),
_serviceContext(sc),
- _activeMetadataTracker(stdx::make_unique<CollectionMetadataTracker>(nullptr)),
+ _activeMetadataTracker(std::make_shared<Tracker>(nullptr, this)),
_receivingChunks(SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>()),
- _rangesToClean(
- SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<RangeToCleanDescriptor>()) {}
+ _notification(std::make_shared<Notification<Status>>()),
+ _executor(executor),
+ _rangesToClean() {}
MetadataManager::~MetadataManager() {
+ {
+ stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
+ _shuttingDown = true;
+ }
+ std::list<std::shared_ptr<Tracker>> inUse;
+ {
+ // drain any threads that might remove _metadataInUse entries, push to deleter
+ stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
+ inUse = std::move(_metadataInUse);
+ }
+
+ // Trackers can outlive MetadataManager, so we still need to lock each tracker...
+ std::for_each(inUse.begin(), inUse.end(), [](auto& tp) {
+ stdx::lock_guard<stdx::mutex> scopedLock(tp->trackerLock);
+ tp->manager = nullptr;
+ });
+ { // ... and the active one too
+ stdx::lock_guard<stdx::mutex> scopedLock(_activeMetadataTracker->trackerLock);
+ _activeMetadataTracker->manager = nullptr;
+ }
+
+ // still need to block the deleter thread:
stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- invariant(!_activeMetadataTracker || _activeMetadataTracker->usageCounter == 0);
+ Status status{ErrorCodes::InterruptedDueToReplStateChange,
+ "tracking orphaned range deletion abandoned because the"
+ " collection was dropped or became unsharded"};
+ if (!*_notification) { // check just because test driver triggers it
+ _notification->set(status);
+ }
+ _rangesToClean.clear(status);
}
ScopedCollectionMetadata MetadataManager::getActiveMetadata() {
stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- if (!_activeMetadataTracker) {
- return ScopedCollectionMetadata();
+ if (_activeMetadataTracker) {
+ return ScopedCollectionMetadata(_activeMetadataTracker);
}
+ return ScopedCollectionMetadata();
+}
- return ScopedCollectionMetadata(this, _activeMetadataTracker.get());
+size_t MetadataManager::numberOfMetadataSnapshots() {
+ stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
+ return _metadataInUse.size();
}
void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata> remoteMetadata) {
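
As an aside on the Tracker diagram and comments above, a sketch of the lifecycle from a query's
point of view; exampleMetadataLifecycle is a hypothetical illustration, not code from this patch.

    // Hypothetical query-side view: getMetadata() pins the active Tracker, and the pinned
    // chunk map stays valid even if a metadata refresh installs a new mapping meanwhile.
    void exampleMetadataLifecycle(OperationContext* opCtx, NamespaceString const& nss) {
        AutoGetCollection autoColl(opCtx, nss, MODE_IS);
        auto css = CollectionShardingState::get(opCtx, nss);

        ScopedCollectionMetadata metadata = css->getMetadata();  // ++usageCounter on the Tracker
        if (metadata) {
            BSONObj keyPattern = metadata->getKeyPattern();
            LOG(1) << "using shard key " << keyPattern << " for " << nss.ns();
        }
        // When 'metadata' goes out of scope the usageCounter drops; once it reaches zero the
        // MetadataManager retires any Trackers no query can still see and schedules their
        // recorded orphan ranges for deletion.
    }
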
@@ -73,7 +182,7 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata>
// collection sharding information regardless of whether the node is sharded or not.
if (!remoteMetadata && !_activeMetadataTracker->metadata) {
invariant(_receivingChunks.empty());
- invariant(_rangesToClean.empty());
+ invariant(_rangesToClean.isEmpty());
return;
}
@@ -83,8 +192,8 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata>
<< _activeMetadataTracker->metadata->toStringBasic() << " as no longer sharded";
_receivingChunks.clear();
- _rangesToClean.clear();
-
+ _rangesToClean.clear(Status{ErrorCodes::InterruptedDueToReplStateChange,
+ "Collection sharding metadata destroyed"});
_setActiveMetadata_inlock(nullptr);
return;
}
@@ -99,7 +208,7 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata>
<< remoteMetadata->toStringBasic();
invariant(_receivingChunks.empty());
- invariant(_rangesToClean.empty());
+ invariant(_rangesToClean.isEmpty());
_setActiveMetadata_inlock(std::move(remoteMetadata));
return;
@@ -114,9 +223,7 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata>
<< remoteMetadata->toStringBasic() << " due to epoch change";
_receivingChunks.clear();
- _rangesToClean.clear();
-
- _setActiveMetadata_inlock(std::move(remoteMetadata));
+ _rangesToClean.clear(Status::OK());
return;
}
@@ -132,337 +239,325 @@ void MetadataManager::refreshActiveMetadata(std::unique_ptr<CollectionMetadata>
<< _activeMetadataTracker->metadata->toStringBasic() << " to "
<< remoteMetadata->toStringBasic();
- // Resolve any receiving chunks, which might have completed by now
+ // Resolve any receiving chunks, which might have completed by now.
+ // Should be no more than one.
for (auto it = _receivingChunks.begin(); it != _receivingChunks.end();) {
- const BSONObj min = it->first;
- const BSONObj max = it->second.getMaxKey();
+ BSONObj const& min = it->first;
+ BSONObj const& max = it->second.getMaxKey();
- // Our pending range overlaps at least one chunk
- if (rangeMapContains(remoteMetadata->getChunks(), min, max)) {
- // The remote metadata contains a chunk we were earlier in the process of receiving, so
- // we deem it successfully received.
- LOG(2) << "Verified chunk " << redact(ChunkRange(min, max).toString())
- << " for collection " << _nss.ns() << " has been migrated to this shard earlier";
-
- _receivingChunks.erase(it++);
- continue;
- } else if (!rangeMapOverlaps(remoteMetadata->getChunks(), min, max)) {
+ if (!remoteMetadata->rangeOverlapsChunk(ChunkRange(min, max))) {
++it;
continue;
}
+ // The remote metadata contains a chunk we were earlier in the process of receiving, so
+ // we deem it successfully received.
+ LOG(2) << "Verified chunk " << ChunkRange(min, max) << " for collection " << _nss.ns()
+ << " has been migrated to this shard earlier";
- // Partial overlap indicates that the earlier migration has failed, but the chunk being
- // migrated underwent some splits and other migrations and ended up here again. In this
- // case, we will request full reload of the metadata. Currently this cannot happen, because
- // all migrations are with the explicit knowledge of the recipient shard. However, we leave
- // the option open so that chunk splits can do empty chunk move without having to notify the
- // recipient.
- RangeVector overlappedChunks;
- getRangeMapOverlap(remoteMetadata->getChunks(), min, max, &overlappedChunks);
-
- for (const auto& overlapChunkMin : overlappedChunks) {
- auto itRecv = _receivingChunks.find(overlapChunkMin.first);
- invariant(itRecv != _receivingChunks.end());
-
- const ChunkRange receivingRange(itRecv->first, itRecv->second.getMaxKey());
-
- _receivingChunks.erase(itRecv);
-
- // Make sure any potentially partially copied chunks are scheduled to be cleaned up
- _addRangeToClean_inlock(receivingRange);
- }
-
- // Need to reset the iterator
+ _receivingChunks.erase(it);
it = _receivingChunks.begin();
}
- // For compatibility with the current range deleter, which is driven entirely by the contents of
- // the CollectionMetadata update the pending chunks
- for (const auto& receivingChunk : _receivingChunks) {
- ChunkType chunk;
- chunk.setMin(receivingChunk.first);
- chunk.setMax(receivingChunk.second.getMaxKey());
- remoteMetadata = remoteMetadata->clonePlusPending(chunk);
- }
-
_setActiveMetadata_inlock(std::move(remoteMetadata));
}
-void MetadataManager::beginReceive(const ChunkRange& range) {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-
- // Collection is not known to be sharded if the active metadata tracker is null
- invariant(_activeMetadataTracker);
-
- // If range is contained within pending chunks, this means a previous migration must have failed
- // and we need to clean all overlaps
- RangeVector overlappedChunks;
- getRangeMapOverlap(_receivingChunks, range.getMin(), range.getMax(), &overlappedChunks);
-
- for (const auto& overlapChunkMin : overlappedChunks) {
- auto itRecv = _receivingChunks.find(overlapChunkMin.first);
- invariant(itRecv != _receivingChunks.end());
-
- const ChunkRange receivingRange(itRecv->first, itRecv->second.getMaxKey());
-
- _receivingChunks.erase(itRecv);
-
- // Make sure any potentially partially copied chunks are scheduled to be cleaned up
- _addRangeToClean_inlock(receivingRange);
- }
-
- // Need to ensure that the background range deleter task won't delete the range we are about to
- // receive
- _removeRangeToClean_inlock(range, Status::OK());
- _receivingChunks.insert(
- std::make_pair(range.getMin().getOwned(),
- CachedChunkInfo(range.getMax().getOwned(), ChunkVersion::IGNORED())));
-
- // For compatibility with the current range deleter, update the pending chunks on the collection
- // metadata to include the chunk being received
- ChunkType chunk;
- chunk.setMin(range.getMin());
- chunk.setMax(range.getMax());
- _setActiveMetadata_inlock(_activeMetadataTracker->metadata->clonePlusPending(chunk));
-}
-
-void MetadataManager::forgetReceive(const ChunkRange& range) {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
-
- {
- auto it = _receivingChunks.find(range.getMin());
- invariant(it != _receivingChunks.end());
-
- // Verify entire ChunkRange is identical, not just the min key.
- invariant(
- SimpleBSONObjComparator::kInstance.evaluate(it->second.getMaxKey() == range.getMax()));
-
- _receivingChunks.erase(it);
- }
-
- // This is potentially a partially received data, which needs to be cleaned up
- _addRangeToClean_inlock(range);
-
- // For compatibility with the current range deleter, update the pending chunks on the collection
- // metadata to exclude the chunk being received, which was added in beginReceive
- ChunkType chunk;
- chunk.setMin(range.getMin());
- chunk.setMax(range.getMax());
- _setActiveMetadata_inlock(_activeMetadataTracker->metadata->cloneMinusPending(chunk));
-}
-
-RangeMap MetadataManager::getCopyOfReceivingChunks() {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- return _receivingChunks;
-}
-
void MetadataManager::_setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata) {
- if (_activeMetadataTracker->usageCounter > 0) {
- _metadataInUse.push_front(std::move(_activeMetadataTracker));
+ if (_activeMetadataTracker->usageCounter != 0 || _activeMetadataTracker->orphans) {
+ _metadataInUse.push_back(std::move(_activeMetadataTracker));
}
-
- _activeMetadataTracker = stdx::make_unique<CollectionMetadataTracker>(std::move(newMetadata));
+ _activeMetadataTracker = std::make_shared<Tracker>(std::move(newMetadata), this);
}
-void MetadataManager::_removeMetadata_inlock(CollectionMetadataTracker* metadataTracker) {
- invariant(metadataTracker->usageCounter == 0);
-
- auto i = _metadataInUse.begin();
- const auto e = _metadataInUse.end();
- while (i != e) {
- if (metadataTracker == i->get()) {
- _metadataInUse.erase(i);
- return;
+// Call only while holding _managerLock.
+void MetadataManager::_retireExpiredMetadata() {
+ bool notify = false;
+ while (!_metadataInUse.empty() && _metadataInUse.front()->usageCounter == 0) {
+ // No ScopedCollectionMetadata can see this Tracker, other than, maybe, the caller.
+ auto& tracker = _metadataInUse.front();
+ if (tracker->orphans) {
+ notify = true;
+ log() << "Queries possibly dependent on " << _nss.ns() << " range " << *tracker->orphans
+ << " finished; scheduling range for deletion";
+ _pushRangeToClean(*tracker->orphans);
}
-
- ++i;
+ tracker->metadata.reset(); // Discard the CollectionMetadata.
+ _metadataInUse.pop_front(); // Disconnect from the tracker (and maybe destroy it)
+ }
+ if (_metadataInUse.empty() && _activeMetadataTracker->orphans) {
+ notify = true;
+ log() << "Queries possibly dependent on " << _nss.ns() << " range "
+ << *_activeMetadataTracker->orphans << " finished; scheduling range for deletion";
+ _pushRangeToClean(*_activeMetadataTracker->orphans);
+ _activeMetadataTracker->orphans = boost::none;
+ }
+ if (notify) {
+ _notifyInUse(); // wake up waitForClean because we changed inUse
}
}
-MetadataManager::CollectionMetadataTracker::CollectionMetadataTracker(
- std::unique_ptr<CollectionMetadata> m)
- : metadata(std::move(m)) {}
+MetadataManager::Tracker::Tracker(std::unique_ptr<CollectionMetadata> md, MetadataManager* mgr)
+ : metadata(std::move(md)), manager(mgr) {}
-ScopedCollectionMetadata::ScopedCollectionMetadata() = default;
+// ScopedCollectionMetadata members
-// called in lock
+// call with MetadataManager locked
ScopedCollectionMetadata::ScopedCollectionMetadata(
- MetadataManager* manager, MetadataManager::CollectionMetadataTracker* tracker)
- : _manager(manager), _tracker(tracker) {
- _tracker->usageCounter++;
+ std::shared_ptr<MetadataManager::Tracker> tracker)
+ : _tracker(std::move(tracker)) {
+ ++_tracker->usageCounter;
}
ScopedCollectionMetadata::~ScopedCollectionMetadata() {
- if (!_tracker)
- return;
- _decrementUsageCounter();
+ _clear();
}
CollectionMetadata* ScopedCollectionMetadata::operator->() const {
- return _tracker->metadata.get();
+ return _tracker ? _tracker->metadata.get() : nullptr;
}
CollectionMetadata* ScopedCollectionMetadata::getMetadata() const {
- return _tracker->metadata.get();
+ return _tracker ? _tracker->metadata.get() : nullptr;
+}
+
+void ScopedCollectionMetadata::_clear() {
+ if (!_tracker) {
+ return;
+ }
+ // Note: There is no risk of deadlock here because the only other place in MetadataManager
+ // that takes the trackerLock, ~MetadataManager(), does not hold _managerLock at the same time,
+ // and ScopedCollectionMetadata takes _managerLock only here.
+ stdx::unique_lock<stdx::mutex> trackerLock(_tracker->trackerLock);
+ MetadataManager* manager = _tracker->manager;
+ if (manager) {
+ stdx::lock_guard<stdx::mutex> managerLock(_tracker->manager->_managerLock);
+ trackerLock.unlock();
+ invariant(_tracker->usageCounter != 0);
+ if (--_tracker->usageCounter == 0 && !manager->_shuttingDown) {
+            // MetadataManager doesn't care which usageCounter went to zero. It just retires all
+ // that are older than the oldest tracker still in use by queries. (Some start out at
+ // zero, some go to zero but can't be expired yet.) Note that new instances of
+ // ScopedCollectionMetadata may get attached to the active tracker, so its usage
+ // count can increase from zero, unlike most reference counts.
+ manager->_retireExpiredMetadata();
+ }
+ } else {
+ trackerLock.unlock();
+ }
+ _tracker.reset(); // disconnect from the tracker.
}
+// do not call with MetadataManager locked
ScopedCollectionMetadata::ScopedCollectionMetadata(ScopedCollectionMetadata&& other) {
- *this = std::move(other);
+ *this = std::move(other); // Rely on this->_tracker being zero-initialized already.
}
+// do not call with MetadataManager locked
ScopedCollectionMetadata& ScopedCollectionMetadata::operator=(ScopedCollectionMetadata&& other) {
if (this != &other) {
- // If "this" was previously initialized, make sure we perform the same logic as in the
- // destructor to decrement _tracker->usageCounter for the CollectionMetadata "this" had a
- // reference to before replacing _tracker with other._tracker.
- if (_tracker) {
- _decrementUsageCounter();
- }
-
- _manager = other._manager;
- _tracker = other._tracker;
- other._manager = nullptr;
- other._tracker = nullptr;
+ _clear();
+ _tracker = std::move(other._tracker);
}
-
return *this;
}
-void ScopedCollectionMetadata::_decrementUsageCounter() {
- invariant(_manager);
- invariant(_tracker);
- stdx::lock_guard<stdx::mutex> scopedLock(_manager->_managerLock);
- invariant(_tracker->usageCounter > 0);
- if (--_tracker->usageCounter == 0) {
- _manager->_removeMetadata_inlock(_tracker);
- }
+ScopedCollectionMetadata::operator bool() const {
+ return _tracker && _tracker->metadata; // with a Collection lock the metadata member is stable
}
-ScopedCollectionMetadata::operator bool() const {
- return _tracker && _tracker->metadata.get();
+void MetadataManager::toBSONPending(BSONArrayBuilder& bb) const {
+ for (auto it = _receivingChunks.begin(); it != _receivingChunks.end(); ++it) {
+ BSONArrayBuilder pendingBB(bb.subarrayStart());
+ pendingBB.append(it->first);
+ pendingBB.append(it->second.getMaxKey());
+ pendingBB.done();
+ }
}
-RangeMap MetadataManager::getCopyOfRangesToClean() {
+void MetadataManager::append(BSONObjBuilder* builder) {
stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- return _getCopyOfRangesToClean_inlock();
-}
-RangeMap MetadataManager::_getCopyOfRangesToClean_inlock() {
- RangeMap ranges = SimpleBSONObjComparator::kInstance.makeBSONObjIndexedMap<CachedChunkInfo>();
- for (auto it = _rangesToClean.begin(); it != _rangesToClean.end(); ++it) {
- ranges.insert(std::make_pair(
- it->first, CachedChunkInfo(it->second.getMax(), ChunkVersion::IGNORED())));
+ _rangesToClean.append(builder);
+
+ BSONArrayBuilder pcArr(builder->subarrayStart("pendingChunks"));
+ for (const auto& entry : _receivingChunks) {
+ BSONObjBuilder obj;
+ ChunkRange r = ChunkRange(entry.first, entry.second.getMaxKey());
+ r.append(&obj);
+ pcArr.append(obj.done());
+ }
+ pcArr.done();
+
+ BSONArrayBuilder amrArr(builder->subarrayStart("activeMetadataRanges"));
+ for (const auto& entry : _activeMetadataTracker->metadata->getChunks()) {
+ BSONObjBuilder obj;
+ ChunkRange r = ChunkRange(entry.first, entry.second.getMaxKey());
+ r.append(&obj);
+ amrArr.append(obj.done());
}
- return ranges;
+ amrArr.done();
}
-std::shared_ptr<Notification<Status>> MetadataManager::addRangeToClean(const ChunkRange& range) {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- return _addRangeToClean_inlock(range);
+void MetadataManager::_scheduleCleanup(executor::TaskExecutor* executor, NamespaceString nss) {
+ executor->scheduleWork([executor, nss](auto&) {
+ const int maxToDelete = std::max(int(internalQueryExecYieldIterations.load()), 1);
+ Client::initThreadIfNotAlready("Collection Range Deleter");
+        auto uniqueOpCtx = Client::getCurrent()->makeOperationContext();
+        auto opCtx = uniqueOpCtx.get();
+ bool again = CollectionRangeDeleter::cleanUpNextRange(opCtx, nss, maxToDelete);
+ if (again) {
+ _scheduleCleanup(executor, nss);
+ }
+ });
}
-std::shared_ptr<Notification<Status>> MetadataManager::_addRangeToClean_inlock(
- const ChunkRange& range) {
- // This first invariant currently makes an unnecessary copy, to reuse the
- // rangeMapOverlaps helper function.
- invariant(!rangeMapOverlaps(_getCopyOfRangesToClean_inlock(), range.getMin(), range.getMax()));
- invariant(!rangeMapOverlaps(_receivingChunks, range.getMin(), range.getMax()));
+// call locked
+void MetadataManager::_pushRangeToClean(ChunkRange const& range) {
+ _rangesToClean.add(range);
+ if (_rangesToClean.size() == 1) {
+ _scheduleCleanup(_executor, _nss);
+ }
+}
+
+void MetadataManager::_addToReceiving(ChunkRange const& range) {
+ _receivingChunks.insert(
+ std::make_pair(range.getMin().getOwned(),
+ CachedChunkInfo(range.getMax().getOwned(), ChunkVersion::IGNORED())));
+}
- RangeToCleanDescriptor descriptor(range.getMax().getOwned());
- _rangesToClean.insert(std::make_pair(range.getMin().getOwned(), descriptor));
+bool MetadataManager::beginReceive(ChunkRange const& range) {
+ stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
- // If _rangesToClean was previously empty, we need to start the collection range deleter
- if (_rangesToClean.size() == 1UL) {
- ShardingState::get(_serviceContext)->scheduleCleanup(_nss);
+ auto* metadata = _activeMetadataTracker->metadata.get();
+ if (_overlapsInUseChunk(range) || metadata->rangeOverlapsChunk(range)) {
+ log() << "Rejecting in-migration to " << _nss.ns() << " range " << range
+ << " because a running query might depend on documents in the range";
+ return false;
}
+ _addToReceiving(range);
+ _pushRangeToClean(range);
+ log() << "Scheduling deletion of any documents in " << _nss.ns() << " range " << range
+ << " before migrating in a chunk covering the range";
+ return true;
+}
- return descriptor.getNotification();
+void MetadataManager::_removeFromReceiving(ChunkRange const& range) {
+ auto it = _receivingChunks.find(range.getMin());
+ invariant(it != _receivingChunks.end());
+ _receivingChunks.erase(it);
}
-void MetadataManager::removeRangeToClean(const ChunkRange& range, Status deletionStatus) {
+void MetadataManager::forgetReceive(ChunkRange const& range) {
stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- _removeRangeToClean_inlock(range, deletionStatus);
+ // This is potentially a partially received chunk, which needs to be cleaned up. We know none
+ // of these documents are in use, so they can go straight to the deletion queue.
+ log() << "Abandoning in-migration of " << _nss.ns() << " range " << range
+ << "; scheduling deletion of any documents already copied";
+
+ invariant(!_overlapsInUseChunk(range) &&
+ !_activeMetadataTracker->metadata->rangeOverlapsChunk(range));
+
+ _removeFromReceiving(range);
+ _pushRangeToClean(range);
}
-void MetadataManager::_removeRangeToClean_inlock(const ChunkRange& range, Status deletionStatus) {
- auto it = _rangesToClean.upper_bound(range.getMin());
- // We want our iterator to point at the greatest value
- // that is still less than or equal to range.
- if (it != _rangesToClean.begin()) {
- --it;
+Status MetadataManager::cleanUpRange(ChunkRange const& range) {
+ stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
+ CollectionMetadata* metadata = _activeMetadataTracker->metadata.get();
+ invariant(metadata != nullptr);
+
+ if (metadata->rangeOverlapsChunk(range)) {
+ return {ErrorCodes::RangeOverlapConflict,
+ str::stream() << "Requested deletion range overlaps a live shard chunk"};
}
- for (; it != _rangesToClean.end() &&
- SimpleBSONObjComparator::kInstance.evaluate(it->first < range.getMax());) {
- if (SimpleBSONObjComparator::kInstance.evaluate(it->second.getMax() <= range.getMin())) {
- ++it;
- continue;
- }
+ if (rangeMapOverlaps(_receivingChunks, range.getMin(), range.getMax())) {
+ return {ErrorCodes::RangeOverlapConflict,
+ str::stream() << "Requested deletion range overlaps a chunk being migrated in"};
+ }
- // There's overlap between *it and range so we remove *it
- // and then replace with new ranges.
- BSONObj oldMin = it->first;
- BSONObj oldMax = it->second.getMax();
- it->second.complete(deletionStatus);
- _rangesToClean.erase(it++);
- if (SimpleBSONObjComparator::kInstance.evaluate(oldMin < range.getMin())) {
- _addRangeToClean_inlock(ChunkRange(oldMin, range.getMin()));
- }
+ if (!_overlapsInUseChunk(range)) {
+ // No running queries can depend on it, so queue it for deletion immediately.
+ log() << "Scheduling " << _nss.ns() << " range " << redact(range.toString())
+ << " for deletion";
+
+ _pushRangeToClean(range);
+ } else {
+ invariant(!_metadataInUse.empty());
- if (SimpleBSONObjComparator::kInstance.evaluate(oldMax > range.getMax())) {
- _addRangeToClean_inlock(ChunkRange(range.getMax(), oldMax));
+ if (_activeMetadataTracker->orphans) {
+ _setActiveMetadata_inlock(_activeMetadataTracker->metadata->clone());
}
+
+ _activeMetadataTracker->orphans.emplace(range.getMin().getOwned(),
+ range.getMax().getOwned());
+
+ log() << "Scheduling " << _nss.ns() << " range " << redact(range.toString())
+ << " for deletion after all possibly-dependent queries finish";
}
+
+ return Status::OK();
}
-void MetadataManager::append(BSONObjBuilder* builder) {
+size_t MetadataManager::numberOfRangesToCleanStillInUse() {
stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
+ size_t count = _activeMetadataTracker->orphans ? 1 : 0;
+ count += std::count_if(_metadataInUse.begin(), _metadataInUse.end(), [](auto& tracker) {
+ return bool(tracker->orphans);
+ });
+ return count;
+}
- BSONArrayBuilder rtcArr(builder->subarrayStart("rangesToClean"));
- for (const auto& entry : _rangesToClean) {
- BSONObjBuilder obj;
- ChunkRange r = ChunkRange(entry.first, entry.second.getMax());
- r.append(&obj);
- rtcArr.append(obj.done());
- }
- rtcArr.done();
+size_t MetadataManager::numberOfRangesToClean() {
+ stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
+ return _rangesToClean.size();
+}
- BSONArrayBuilder pcArr(builder->subarrayStart("pendingChunks"));
- for (const auto& entry : _receivingChunks) {
- BSONObjBuilder obj;
- ChunkRange r = ChunkRange(entry.first, entry.second.getMaxKey());
- r.append(&obj);
- pcArr.append(obj.done());
- }
- pcArr.done();
+MetadataManager::CleanupNotification MetadataManager::trackOrphanedDataCleanup(
+ ChunkRange const& range) {
- BSONArrayBuilder amrArr(builder->subarrayStart("activeMetadataRanges"));
- for (const auto& entry : _activeMetadataTracker->metadata->getChunks()) {
- BSONObjBuilder obj;
- ChunkRange r = ChunkRange(entry.first, entry.second.getMaxKey());
- r.append(&obj);
- amrArr.append(obj.done());
+ stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
+ if (_overlapsInUseCleanups(range))
+ return _notification;
+ return _rangesToClean.overlaps(range);
+}
+
+// call locked
+bool MetadataManager::_overlapsInUseChunk(ChunkRange const& range) {
+ if (_activeMetadataTracker->metadata->rangeOverlapsChunk(range)) {
+ return true; // refcount doesn't matter for the active case
}
- amrArr.done();
+ for (auto& tracker : _metadataInUse) {
+ if (tracker->usageCounter != 0 && tracker->metadata->rangeOverlapsChunk(range)) {
+ return true;
+ }
+ }
+ return false;
}
-bool MetadataManager::hasRangesToClean() {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- return !_rangesToClean.empty();
+// call locked
+bool MetadataManager::_overlapsInUseCleanups(ChunkRange const& range) {
+ if (_activeMetadataTracker->orphans && _activeMetadataTracker->orphans->overlapWith(range)) {
+ return true;
+ }
+ for (auto& tracker : _metadataInUse) {
+ if (tracker->orphans && bool(tracker->orphans->overlapWith(range))) {
+ return true;
+ }
+ }
+ return false;
}
-bool MetadataManager::isInRangesToClean(const ChunkRange& range) {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- // For convenience, this line makes an unnecessary copy, to reuse the
- // rangeMapContains helper function.
- return rangeMapContains(_getCopyOfRangesToClean_inlock(), range.getMin(), range.getMax());
+// call locked
+void MetadataManager::_notifyInUse() {
+ _notification->set(Status::OK()); // wake up waitForClean
+ _notification = std::make_shared<Notification<Status>>();
}
-ChunkRange MetadataManager::getNextRangeToClean() {
- stdx::lock_guard<stdx::mutex> scopedLock(_managerLock);
- invariant(!_rangesToClean.empty());
- auto it = _rangesToClean.begin();
- return ChunkRange(it->first, it->second.getMax());
+boost::optional<KeyRange> MetadataManager::getNextOrphanRange(BSONObj const& from) {
+ stdx::unique_lock<stdx::mutex> scopedLock(_managerLock);
+ invariant(_activeMetadataTracker->metadata);
+ return _activeMetadataTracker->metadata->getNextOrphanRange(_receivingChunks, from);
}
+
} // namespace mongo
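
[Editor's illustration] The interplay between ScopedCollectionMetadata's refcount and _retireExpiredMetadata above is the heart of the new design. Below is a minimal, hypothetical sketch of that refcount-driven retirement in plain standard C++; the names (MiniTracker, MiniManager, retireExpired) are invented for illustration and are not part of this patch.

#include <cassert>
#include <list>
#include <memory>
#include <vector>

struct MiniTracker {
    int usageCounter = 0;     // number of "queries" still holding this metadata snapshot
    bool hasOrphans = false;  // a range whose deletion waits for those queries to finish
};

struct MiniManager {
    std::list<std::shared_ptr<MiniTracker>> inUse;  // oldest first, like _metadataInUse
    std::vector<int> rangesToClean;                 // stands in for CollectionRangeDeleter

    // Mirrors _retireExpiredMetadata: drop expired snapshots from the front of the list and
    // promote their orphan ranges to the deletion queue.
    void retireExpired() {
        while (!inUse.empty() && inUse.front()->usageCounter == 0) {
            if (inUse.front()->hasOrphans) {
                rangesToClean.push_back(1);  // schedule the orphan range for deletion
            }
            inUse.pop_front();
        }
    }
};

int main() {
    MiniManager mgr;
    auto older = std::make_shared<MiniTracker>();
    older->usageCounter = 1;   // a running query still references the old snapshot
    older->hasOrphans = true;  // a chunk migrated away while that query was running
    mgr.inUse.push_back(older);

    mgr.retireExpired();
    assert(mgr.rangesToClean.empty());  // nothing is deleted while the query is active

    older->usageCounter = 0;  // the query's ScopedCollectionMetadata was destroyed
    mgr.retireExpired();
    assert(mgr.rangesToClean.size() == 1);  // now the orphan range is queued for deletion
    return 0;
}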
diff --git a/src/mongo/db/s/metadata_manager.h b/src/mongo/db/s/metadata_manager.h
index 5c2e7f2e64a..a76766e5e75 100644
--- a/src/mongo/db/s/metadata_manager.h
+++ b/src/mongo/db/s/metadata_manager.h
@@ -29,17 +29,18 @@
#pragma once
#include <list>
-#include <memory>
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/range_arithmetic.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_range_deleter.h"
#include "mongo/db/service_context.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/s/catalog/type_chunk.h"
-#include "mongo/util/concurrency/notification.h"
-
#include "mongo/stdx/memory.h"
+#include "mongo/util/concurrency/notification.h"
namespace mongo {
@@ -49,7 +50,7 @@ class MetadataManager {
MONGO_DISALLOW_COPYING(MetadataManager);
public:
- MetadataManager(ServiceContext* sc, NamespaceString nss);
+ MetadataManager(ServiceContext*, NamespaceString nss, executor::TaskExecutor* rangeDeleter);
~MetadataManager();
/**
@@ -62,167 +63,186 @@ public:
ScopedCollectionMetadata getActiveMetadata();
/**
+ * Returns the number of CollectionMetadata objects being maintained on behalf of running
+ * queries. The actual number may vary after it returns, so this is really only useful for unit
+ * tests.
+ */
+ size_t numberOfMetadataSnapshots();
+
+ /**
* Uses the contents of the specified metadata as a way to purge any pending chunks.
*/
void refreshActiveMetadata(std::unique_ptr<CollectionMetadata> newMetadata);
+ void toBSONPending(BSONArrayBuilder& bb) const;
+
/**
- * Puts the specified range on the list of chunks, which are being received so that the range
- * deleter process will not clean the partially migrated data.
+ * Appends information on all the chunk ranges in rangesToClean to builder.
*/
- void beginReceive(const ChunkRange& range);
+ void append(BSONObjBuilder* builder);
/**
- * Removes a range from the list of chunks, which are being received. Used externally to
- * indicate that a chunk migration failed.
+     * Returns a reference to the map of chunk ranges currently being migrated in.
*/
- void forgetReceive(const ChunkRange& range);
+ RangeMap const& getReceiveMap() const {
+ return _receivingChunks;
+ }
/**
- * Gets copy of the set of chunk ranges which are being received for this collection. This
- * method is intended for testing purposes only and should not be used in any production code.
+     * If no running queries can depend on documents in the range, registers the range as being
+     * received, schedules any stray documents already in it for immediate cleanup, and returns
+     * true. Otherwise, returns false.
*/
- RangeMap getCopyOfReceivingChunks();
+ bool beginReceive(ChunkRange const& range);
/**
- * Adds a new range to be cleaned up.
- * The newly introduced range must not overlap with the existing ranges.
- */
- std::shared_ptr<Notification<Status>> addRangeToClean(const ChunkRange& range);
+ * Removes the range from the pending list, and schedules any documents in the range for
+ * immediate cleanup. Assumes no active queries can see any local documents in the range.
+ */
+ void forgetReceive(const ChunkRange& range);
/**
- * Calls removeRangeToClean with Status::OK.
+ * Initiates cleanup of the orphaned documents as if a chunk has been migrated out. If any
+ * documents in the range might still be in use by running queries, queues cleanup to begin
+ * after they have all terminated. Otherwise, schedules documents for immediate cleanup.
+ * Fails if the range overlaps any current local shard chunk.
+ *
+ * Must be called with the collection locked for writing. To monitor completion, use
+ * trackOrphanedDataCleanup or CollectionShardingState::waitForClean.
*/
- void removeRangeToClean(const ChunkRange& range) {
- removeRangeToClean(range, Status::OK());
- }
+ Status cleanUpRange(ChunkRange const& range);
/**
- * Removes the specified range from the ranges to be cleaned up.
- * The specified deletionStatus will be returned to callers waiting
- * on whether the deletion succeeded or failed.
+ * Returns the number of ranges scheduled to be cleaned, exclusive of such ranges that might
+ * still be in use by running queries. Outside of test drivers, the actual number may vary
+ * after it returns, so this is really only useful for unit tests.
*/
- void removeRangeToClean(const ChunkRange& range, Status deletionStatus);
+ size_t numberOfRangesToClean();
/**
- * Gets copy of the set of chunk ranges which are scheduled for cleanup.
- * Converts RangeToCleanMap to RangeMap.
+ * Returns the number of ranges scheduled to be cleaned once all queries that could depend on
+ * them have terminated. The actual number may vary after it returns, so this is really only
+ * useful for unit tests.
*/
- RangeMap getCopyOfRangesToClean();
+ size_t numberOfRangesToCleanStillInUse();
+ using CleanupNotification = CollectionRangeDeleter::DeleteNotification;
/**
- * Appends information on all the chunk ranges in rangesToClean to builder.
+ * Reports whether the argument range is still scheduled for deletion. If not, returns nullptr.
+ * Otherwise, returns a notification n such that n->get(opCtx) will wake when deletion of a
+ * range (possibly the one of interest) is completed.
*/
- void append(BSONObjBuilder* builder);
+ CleanupNotification trackOrphanedDataCleanup(ChunkRange const& orphans);
+
+ boost::optional<KeyRange> getNextOrphanRange(BSONObj const& from);
+
+private:
+ struct Tracker;
/**
- * Returns true if _rangesToClean is not empty.
+     * Retires any metadata objects that have fallen out of use, and pushes any orphan ranges
+     * found in them to the list of ranges actively being cleaned up.
*/
- bool hasRangesToClean();
+ void _retireExpiredMetadata();
/**
- * Returns true if the exact range is in _rangesToClean.
+     * Pushes the active metadata tracker onto _metadataInUse if it is still in use, and installs
+     * newMetadata as the new active metadata.
*/
- bool isInRangesToClean(const ChunkRange& range);
+ void _setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata);
/**
- * Gets and returns, but does not remove, a single ChunkRange from _rangesToClean.
- * Should not be called if _rangesToClean is empty: it will hit an invariant.
+ * Returns true if the specified range overlaps any chunk that might be currently in use by a
+ * running query.
+ *
+     * Must be called locked.
*/
- ChunkRange getNextRangeToClean();
-private:
- friend class ScopedCollectionMetadata;
+ bool _overlapsInUseChunk(ChunkRange const& range);
- struct CollectionMetadataTracker {
- public:
- /**
- * Creates a new CollectionMetadataTracker, with the usageCounter initialized to zero.
- */
- CollectionMetadataTracker(std::unique_ptr<CollectionMetadata> m);
-
- std::unique_ptr<CollectionMetadata> metadata;
-
- uint32_t usageCounter{0};
- };
-
- // Class for the value of the _rangesToClean map. Used because callers of addRangeToClean
- // sometimes need to wait until a range is deleted. Thus, complete(Status) is called
- // when the range is deleted from _rangesToClean in removeRangeToClean(), letting callers
- // of addRangeToClean know if the deletion succeeded or failed.
- class RangeToCleanDescriptor {
- public:
- /**
- * Initializes a RangeToCleanDescriptor with an empty notification.
- */
- RangeToCleanDescriptor(BSONObj max)
- : _max(max.getOwned()), _notification(std::make_shared<Notification<Status>>()) {}
-
- /**
- * Gets the maximum value of the range to be deleted.
- */
- const BSONObj& getMax() const {
- return _max;
- }
-
- // See comment on _notification.
- std::shared_ptr<Notification<Status>> getNotification() {
- return _notification;
- }
-
- /**
- * Sets the status on _notification. This will tell threads
- * waiting on the value of status that the deletion succeeded or failed.
- */
- void complete(Status status) {
- _notification->set(status);
- }
-
- private:
- // The maximum value of the range to be deleted.
- BSONObj _max;
-
- // This _notification will be set with a value indicating whether the deletion
- // succeeded or failed.
- std::shared_ptr<Notification<Status>> _notification;
- };
+ /**
+ * Returns true if any range (possibly) still in use, but scheduled for cleanup, overlaps
+ * the argument range.
+ *
+ * Must be called locked.
+ */
+ bool _overlapsInUseCleanups(ChunkRange const& range);
/**
- * Removes the CollectionMetadata stored in the tracker from the _metadataInUse
- * list (if it's there).
+     * Deletes ranges in the background until done, normally using a task executor attached to
+     * the ShardingState.
+ *
+ * Each time it completes cleaning up a range, it wakes up clients waiting on completion of
+ * that range, which may then verify their range has no more deletions scheduled, and proceed.
*/
- void _removeMetadata_inlock(CollectionMetadataTracker* metadataTracker);
+ static void _scheduleCleanup(executor::TaskExecutor*, NamespaceString nss);
- std::shared_ptr<Notification<Status>> _addRangeToClean_inlock(const ChunkRange& range);
+ /**
+     * Adds the range to the list of ranges scheduled for immediate deletion, and schedules a
+     * background task to perform the work.
+ *
+ * Must be called locked.
+ */
+ void _pushRangeToClean(ChunkRange const& range);
- void _removeRangeToClean_inlock(const ChunkRange& range, Status deletionStatus);
+ /**
+     * Adds a range to the receiving map, so getNextOrphanRange will skip ranges migrating in.
+ */
+ void _addToReceiving(ChunkRange const& range);
- RangeMap _getCopyOfRangesToClean_inlock();
+ /**
+ * Removes a range from the receiving map after a migration failure. range.minKey() must
+ * exactly match an element of _receivingChunks.
+ */
+ void _removeFromReceiving(ChunkRange const& range);
- void _setActiveMetadata_inlock(std::unique_ptr<CollectionMetadata> newMetadata);
+ /**
+     * Wakes up any clients waiting on a range to leave _metadataInUse.
+ *
+ * Must be called locked.
+ */
+ void _notifyInUse();
+
+ // data members
const NamespaceString _nss;
// ServiceContext from which to obtain instances of global support objects.
- ServiceContext* _serviceContext;
+ ServiceContext* const _serviceContext;
// Mutex to protect the state below
stdx::mutex _managerLock;
- // Holds the collection metadata, which is currently active
- std::unique_ptr<CollectionMetadataTracker> _activeMetadataTracker;
+ bool _shuttingDown{false};
- // Holds collection metadata instances, which have previously been active, but are still in use
- // by still active server operations or cursors
- std::list<std::unique_ptr<CollectionMetadataTracker>> _metadataInUse;
+ // The collection metadata reflecting chunks accessible to new queries
+ std::shared_ptr<Tracker> _activeMetadataTracker;
- // Chunk ranges which are currently assumed to be transferred to the shard. Indexed by the min
- // key of the range.
+ // Previously active collection metadata instances still in use by active server operations or
+ // cursors
+ std::list<std::shared_ptr<Tracker>> _metadataInUse;
+
+    // Chunk ranges being migrated into the shard. Indexed by the min key of the range.
RangeMap _receivingChunks;
- // Set of ranges to be deleted. Indexed by the min key of the range.
- typedef BSONObjIndexedMap<RangeToCleanDescriptor> RangeToCleanMap;
- RangeToCleanMap _rangesToClean;
+ // Clients can sleep on copies of _notification while waiting for their orphan ranges to fall
+ // out of use.
+ std::shared_ptr<Notification<Status>> _notification;
+
+ // The background task that deletes documents from orphaned chunk ranges.
+ executor::TaskExecutor* const _executor;
+
+ // Ranges being deleted, or scheduled to be deleted, by a background task
+ CollectionRangeDeleter _rangesToClean;
+
+ // friends
+
+    // for access to _managerLock, _retireExpiredMetadata(), and to Tracker.
+ friend class ScopedCollectionMetadata;
+
+ // for access to _rangesToClean and _managerLock under task callback
+ friend bool CollectionRangeDeleter::cleanUpNextRange(OperationContext*,
+ NamespaceString const&,
+ int maxToDelete,
+ CollectionRangeDeleter*);
};
class ScopedCollectionMetadata {
@@ -233,41 +253,43 @@ public:
* Creates an empty ScopedCollectionMetadata. Using the default constructor means that no
* metadata is available.
*/
- ScopedCollectionMetadata();
-
+ ScopedCollectionMetadata() = default;
~ScopedCollectionMetadata();
+ /**
+ * Binds *this to the same tracker as other, if any.
+ */
ScopedCollectionMetadata(ScopedCollectionMetadata&& other);
ScopedCollectionMetadata& operator=(ScopedCollectionMetadata&& other);
/**
- * Dereferencing the ScopedCollectionMetadata will dereference the internal CollectionMetadata.
+ * Dereferencing the ScopedCollectionMetadata dereferences the private CollectionMetadata.
*/
CollectionMetadata* operator->() const;
CollectionMetadata* getMetadata() const;
/**
- * True if the ScopedCollectionMetadata stores a metadata (is not empty)
+ * True if the ScopedCollectionMetadata stores a metadata (is not empty) and the collection is
+ * sharded.
*/
operator bool() const;
private:
- friend ScopedCollectionMetadata MetadataManager::getActiveMetadata();
-
/**
- * Increments the counter in the CollectionMetadataTracker.
+ * If tracker is non-null, increments the refcount in the specified tracker.
+ *
+ * Must be called with tracker->manager locked.
*/
- ScopedCollectionMetadata(MetadataManager* manager,
- MetadataManager::CollectionMetadataTracker* tracker);
+ ScopedCollectionMetadata(std::shared_ptr<MetadataManager::Tracker> tracker);
/**
- * Decrements the usageCounter and conditionally makes a call to _removeMetadata on
- * the tracker if the count has reached zero.
+ * Disconnect from the tracker, possibly triggering GC of unused CollectionMetadata.
*/
- void _decrementUsageCounter();
+ void _clear();
+
+ std::shared_ptr<MetadataManager::Tracker> _tracker{nullptr};
- MetadataManager* _manager{nullptr};
- MetadataManager::CollectionMetadataTracker* _tracker{nullptr};
+ friend ScopedCollectionMetadata MetadataManager::getActiveMetadata(); // uses our private ctor
};
} // namespace mongo
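
[Editor's illustration] As a usage sketch of the public API declared above, a caller that has just migrated a chunk out might schedule and wait for orphan cleanup roughly as follows. This is a hedged example, not code from this patch: it assumes an OperationContext and a MetadataManager pointer are already in hand, it treats CleanupNotification as the shared-pointer-like handle the tests above use, and it only mimics what CollectionShardingState::waitForClean is described as doing.

Status cleanUpRangeAndWait(OperationContext* opCtx, MetadataManager* mm, ChunkRange const& range) {
    Status scheduled = mm->cleanUpRange(range);  // queues now, or after dependent queries finish
    if (!scheduled.isOK()) {
        return scheduled;  // e.g. RangeOverlapConflict if the range overlaps a live chunk
    }
    // Each notification wakes when deletion of *some* range completes, so re-check until the
    // range of interest is no longer scheduled.
    while (auto notification = mm->trackOrphanedDataCleanup(range)) {
        Status result = notification->get(opCtx);
        if (!result.isOK()) {
            return result;
        }
    }
    return Status::OK();
}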
diff --git a/src/mongo/db/s/metadata_manager_test.cpp b/src/mongo/db/s/metadata_manager_test.cpp
index d4416cbfbea..7a559619a86 100644
--- a/src/mongo/db/s/metadata_manager_test.cpp
+++ b/src/mongo/db/s/metadata_manager_test.cpp
@@ -31,29 +31,74 @@
#include "mongo/db/s/metadata_manager.h"
#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/client.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/s/type_shard_identity.h"
+#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/executor/task_executor.h"
+#include "mongo/s/catalog/dist_lock_catalog_impl.h"
+#include "mongo/s/catalog/dist_lock_manager_mock.h"
+#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_chunk.h"
+#include "mongo/s/client/shard_registry.h"
+#include "mongo/s/sharding_mongod_test_fixture.h"
+#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/assert_util.h"
+
+#include <boost/optional.hpp>
+
namespace mongo {
namespace {
using unittest::assertGet;
-class MetadataManagerTest : public ServiceContextMongoDTest {
+const NamespaceString kNss("TestDB", "TestColl");
+const std::string kPattern = "X";
+const BSONObj kShardKeyPattern{BSON(kPattern << 1)};
+const std::string kShardName{"a"};
+const HostAndPort dummyHost("dummy", 123);
+
+class MetadataManagerTest : public ShardingMongodTestFixture {
+public:
+ std::shared_ptr<RemoteCommandTargeterMock> configTargeter() {
+ return RemoteCommandTargeterMock::get(shardRegistry()->getConfigShard()->getTargeter());
+ }
+
protected:
void setUp() override {
- ServiceContextMongoDTest::setUp();
- ShardingState::get(getServiceContext())
- ->setScheduleCleanupFunctionForTest([](const NamespaceString& nss) {});
+ ShardingMongodTestFixture::setUp();
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ initializeGlobalShardingStateForMongodForTest(ConnectionString(dummyHost));
+
+ configTargeter()->setFindHostReturnValue(dummyHost);
+ }
+
+ std::unique_ptr<DistLockCatalog> makeDistLockCatalog(ShardRegistry* shardRegistry) override {
+ invariant(shardRegistry);
+ return stdx::make_unique<DistLockCatalogImpl>(shardRegistry);
+ }
+
+ std::unique_ptr<DistLockManager> makeDistLockManager(
+ std::unique_ptr<DistLockCatalog> distLockCatalog) override {
+ return stdx::make_unique<DistLockManagerMock>(std::move(distLockCatalog));
+ }
+
+ std::unique_ptr<ShardingCatalogClient> makeShardingCatalogClient(
+ std::unique_ptr<DistLockManager> distLockManager) override {
+ return stdx::make_unique<ShardingCatalogClientMock>(std::move(distLockManager));
}
static std::unique_ptr<CollectionMetadata> makeEmptyMetadata() {
@@ -79,7 +124,6 @@ protected:
const BSONObj& minKey,
const BSONObj& maxKey,
const ChunkVersion& chunkVersion) {
- invariant(chunkVersion.epoch() == metadata.getShardVersion().epoch());
invariant(chunkVersion.isSet());
invariant(chunkVersion > metadata.getCollVersion());
invariant(minKey.woCompare(maxKey) < 0);
@@ -92,10 +136,23 @@ protected:
return stdx::make_unique<CollectionMetadata>(
metadata.getKeyPattern(), chunkVersion, chunkVersion, std::move(chunksMap));
}
+
+ CollectionMetadata* addChunk(MetadataManager* manager) {
+ ScopedCollectionMetadata scopedMetadata1 = manager->getActiveMetadata();
+
+ ChunkVersion newVersion = scopedMetadata1->getCollVersion();
+ newVersion.incMajor();
+ std::unique_ptr<CollectionMetadata> cm2 = cloneMetadataPlusChunk(
+ *scopedMetadata1.getMetadata(), BSON("key" << 0), BSON("key" << 20), newVersion);
+ auto cm2Ptr = cm2.get();
+
+ manager->refreshActiveMetadata(std::move(cm2));
+ return cm2Ptr;
+ }
};
TEST_F(MetadataManagerTest, SetAndGetActiveMetadata) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
std::unique_ptr<CollectionMetadata> cm = makeEmptyMetadata();
auto cmPtr = cm.get();
@@ -107,197 +164,76 @@ TEST_F(MetadataManagerTest, SetAndGetActiveMetadata) {
TEST_F(MetadataManagerTest, ResetActiveMetadata) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
-
- ScopedCollectionMetadata scopedMetadata1 = manager.getActiveMetadata();
-
- ChunkVersion newVersion = scopedMetadata1->getCollVersion();
- newVersion.incMajor();
- std::unique_ptr<CollectionMetadata> cm2 = cloneMetadataPlusChunk(
- *scopedMetadata1.getMetadata(), BSON("key" << 0), BSON("key" << 10), newVersion);
- auto cm2Ptr = cm2.get();
-
- manager.refreshActiveMetadata(std::move(cm2));
+ auto cm2Ptr = addChunk(&manager);
ScopedCollectionMetadata scopedMetadata2 = manager.getActiveMetadata();
-
ASSERT_EQ(cm2Ptr, scopedMetadata2.getMetadata());
};
-TEST_F(MetadataManagerTest, AddAndRemoveRangesToClean) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
- ChunkRange cr2 = ChunkRange(BSON("key" << 10), BSON("key" << 20));
-
- manager.addRangeToClean(cr1);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
- manager.removeRangeToClean(cr1);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-
- manager.addRangeToClean(cr1);
- manager.addRangeToClean(cr2);
- manager.removeRangeToClean(cr1);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
- auto ranges = manager.getCopyOfRangesToClean();
- auto it = ranges.find(cr2.getMin());
- ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- ASSERT_EQ(remainingChunk.toString(), cr2.toString());
- manager.removeRangeToClean(cr2);
-}
-
-// Tests that a removal in the middle of an existing ChunkRange results in
-// two correct chunk ranges.
-TEST_F(MetadataManagerTest, RemoveRangeInMiddleOfRange) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
-
- manager.addRangeToClean(cr1);
- manager.removeRangeToClean(ChunkRange(BSON("key" << 4), BSON("key" << 6)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL);
-
- auto ranges = manager.getCopyOfRangesToClean();
- auto it = ranges.find(BSON("key" << 0));
- ChunkRange expectedChunk = ChunkRange(BSON("key" << 0), BSON("key" << 4));
- ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
- it++;
- expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 10));
- remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
- manager.removeRangeToClean(cr1);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
-
-// Tests removals that overlap with just one ChunkRange.
-TEST_F(MetadataManagerTest, RemoveRangeWithSingleRangeOverlap) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
+// In the following tests, the list of ranges to clean is not drained by the background deleter
+// thread, because the collection involved has no CollectionShardingState, so the task just returns
+// without doing anything.
- manager.addRangeToClean(cr1);
- manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 5)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
- auto ranges = manager.getCopyOfRangesToClean();
- auto it = ranges.find(BSON("key" << 5));
- ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- ChunkRange expectedChunk = ChunkRange(BSON("key" << 5), BSON("key" << 10));
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
- manager.removeRangeToClean(ChunkRange(BSON("key" << 4), BSON("key" << 6)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
- ranges = manager.getCopyOfRangesToClean();
- it = ranges.find(BSON("key" << 6));
- remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 10));
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
- manager.removeRangeToClean(ChunkRange(BSON("key" << 9), BSON("key" << 13)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
- ranges = manager.getCopyOfRangesToClean();
- it = ranges.find(BSON("key" << 6));
- remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- expectedChunk = ChunkRange(BSON("key" << 6), BSON("key" << 9));
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
- manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 10)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
-
-// Tests removals that overlap with more than one ChunkRange.
-TEST_F(MetadataManagerTest, RemoveRangeWithMultipleRangeOverlaps) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
- ChunkRange cr2 = ChunkRange(BSON("key" << 10), BSON("key" << 20));
- ChunkRange cr3 = ChunkRange(BSON("key" << 20), BSON("key" << 30));
-
- manager.addRangeToClean(cr1);
- manager.addRangeToClean(cr2);
- manager.addRangeToClean(cr3);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 3UL);
-
- manager.removeRangeToClean(ChunkRange(BSON("key" << 8), BSON("key" << 22)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL);
- auto ranges = manager.getCopyOfRangesToClean();
- auto it = ranges.find(BSON("key" << 0));
- ChunkRange remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- ChunkRange expectedChunk = ChunkRange(BSON("key" << 0), BSON("key" << 8));
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
- it++;
- remainingChunk = ChunkRange(it->first, it->second.getMaxKey());
- expectedChunk = ChunkRange(BSON("key" << 22), BSON("key" << 30));
- ASSERT_EQ(remainingChunk.toString(), expectedChunk.toString());
-
- manager.removeRangeToClean(ChunkRange(BSON("key" << 0), BSON("key" << 30)));
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
-
-TEST_F(MetadataManagerTest, AddAndRemoveRangeNotificationsBlockAndYield) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- manager.refreshActiveMetadata(makeEmptyMetadata());
-
- ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- auto notification = manager.addRangeToClean(cr1);
- manager.removeRangeToClean(cr1, Status::OK());
- ASSERT_OK(notification->get());
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-}
-
-TEST_F(MetadataManagerTest, RemoveRangeToCleanCorrectlySetsBadStatus) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+TEST_F(MetadataManagerTest, CleanUpForMigrateIn) {
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
- ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- auto notification = manager.addRangeToClean(cr1);
- manager.removeRangeToClean(cr1, Status(ErrorCodes::InternalError, "test error"));
- ASSERT_NOT_OK(notification->get());
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
+ ChunkRange range2(BSON("key" << 10), BSON("key" << 20));
+ ASSERT(manager.beginReceive(ChunkRange(BSON("key" << 0), BSON("key" << 10))));
+ ASSERT(manager.beginReceive(ChunkRange(BSON("key" << 10), BSON("key" << 20))));
+ ASSERT_EQ(manager.numberOfRangesToClean(), 2UL);
+ ASSERT_EQ(manager.numberOfRangesToCleanStillInUse(), 0UL);
}
-TEST_F(MetadataManagerTest, RemovingSubrangeStillSetsNotificationStatus) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+TEST_F(MetadataManagerTest, AddRangeNotificationsBlockAndYield) {
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- auto notification = manager.addRangeToClean(cr1);
- manager.removeRangeToClean(ChunkRange(BSON("key" << 3), BSON("key" << 7)));
- ASSERT_OK(notification->get());
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 2UL);
- manager.removeRangeToClean(cr1);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
-
- notification = manager.addRangeToClean(cr1);
- manager.removeRangeToClean(ChunkRange(BSON("key" << 7), BSON("key" << 15)));
- ASSERT_OK(notification->get());
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 1UL);
- manager.removeRangeToClean(cr1);
- ASSERT_EQ(manager.getCopyOfRangesToClean().size(), 0UL);
+ ASSERT_OK(manager.cleanUpRange(cr1));
+ ASSERT_EQ(manager.numberOfRangesToClean(), 1UL);
+ auto notification = manager.trackOrphanedDataCleanup(cr1);
+ ASSERT(notification != nullptr && !bool(*notification));
+ notification->set(Status::OK());
+ ASSERT(bool(*notification));
+ ASSERT_OK(notification->get(operationContext()));
}
TEST_F(MetadataManagerTest, NotificationBlocksUntilDeletion) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ ChunkRange cr1(BSON("key" << 20), BSON("key" << 30));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
-
- ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- auto notification = manager.addRangeToClean(cr1);
- auto opCtx = cc().makeOperationContext();
- // Once the new range deleter is set up, this might fail if the range deleter
- // deleted cr1 before we got here...
- ASSERT_FALSE(notification->waitFor(opCtx.get(), Milliseconds(0)));
-
- manager.removeRangeToClean(cr1);
- ASSERT_TRUE(notification->waitFor(opCtx.get(), Milliseconds(0)));
- ASSERT_OK(notification->get());
+ auto notif = manager.trackOrphanedDataCleanup(cr1);
+ ASSERT(notif.get() == nullptr);
+ {
+ ASSERT_EQ(manager.numberOfMetadataSnapshots(), 0UL);
+ ASSERT_EQ(manager.numberOfRangesToClean(), 0UL);
+
+ auto scm = manager.getActiveMetadata(); // and increment scm's refcount
+ ASSERT(bool(scm));
+ addChunk(&manager); // push new metadata
+
+ ASSERT_EQ(manager.numberOfMetadataSnapshots(), 1UL);
+ ASSERT_EQ(manager.numberOfRangesToClean(), 0UL); // not yet...
+
+ manager.cleanUpRange(cr1);
+ ASSERT_EQ(manager.numberOfMetadataSnapshots(), 1UL);
+ ASSERT_EQ(manager.numberOfRangesToClean(), 1UL);
+
+ notif = manager.trackOrphanedDataCleanup(cr1); // will wake when scm goes away
+ } // scm destroyed, refcount of tracker goes to zero
+ ASSERT_EQ(manager.numberOfMetadataSnapshots(), 0UL);
+ ASSERT_EQ(manager.numberOfRangesToClean(), 1UL);
+ ASSERT(bool(notif)); // woke
+ notif = manager.trackOrphanedDataCleanup(cr1); // now tracking the range in _rangesToClean
+ ASSERT(notif.get() != nullptr);
}
-
TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationSinglePending) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
-
const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- manager.beginReceive(cr1);
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 1UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
ChunkVersion version = manager.getActiveMetadata()->getCollVersion();
@@ -305,21 +241,16 @@ TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationSinglePending) {
manager.refreshActiveMetadata(cloneMetadataPlusChunk(
*manager.getActiveMetadata().getMetadata(), cr1.getMin(), cr1.getMax(), version));
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 0UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
}
+
TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- manager.beginReceive(cr1);
-
const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40));
- manager.beginReceive(cr2);
-
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
{
@@ -328,7 +259,7 @@ TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) {
manager.refreshActiveMetadata(cloneMetadataPlusChunk(
*manager.getActiveMetadata().getMetadata(), cr1.getMin(), cr1.getMax(), version));
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 1UL);
+ ASSERT_EQ(manager.numberOfRangesToClean(), 0UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
}
@@ -338,22 +269,16 @@ TEST_F(MetadataManagerTest, RefreshAfterSuccessfulMigrationMultiplePending) {
manager.refreshActiveMetadata(cloneMetadataPlusChunk(
*manager.getActiveMetadata().getMetadata(), cr2.getMin(), cr2.getMax(), version));
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 0UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 2UL);
}
}
TEST_F(MetadataManagerTest, RefreshAfterNotYetCompletedMigrationMultiplePending) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- manager.beginReceive(cr1);
-
const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40));
- manager.beginReceive(cr2);
-
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
ChunkVersion version = manager.getActiveMetadata()->getCollVersion();
@@ -361,35 +286,22 @@ TEST_F(MetadataManagerTest, RefreshAfterNotYetCompletedMigrationMultiplePending)
manager.refreshActiveMetadata(cloneMetadataPlusChunk(
*manager.getActiveMetadata().getMetadata(), BSON("key" << 50), BSON("key" << 60), version));
- ASSERT_EQ(manager.getCopyOfReceivingChunks().size(), 2UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
}
TEST_F(MetadataManagerTest, BeginReceiveWithOverlappingRange) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
const ChunkRange cr1(BSON("key" << 0), BSON("key" << 10));
- manager.beginReceive(cr1);
-
const ChunkRange cr2(BSON("key" << 30), BSON("key" << 40));
- manager.beginReceive(cr2);
-
const ChunkRange crOverlap(BSON("key" << 5), BSON("key" << 35));
- manager.beginReceive(crOverlap);
-
- const auto copyOfPending = manager.getCopyOfReceivingChunks();
- ASSERT_EQ(copyOfPending.size(), 1UL);
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 0UL);
-
- const auto it = copyOfPending.find(BSON("key" << 5));
- ASSERT(it != copyOfPending.end());
- ASSERT_BSONOBJ_EQ(it->second.getMaxKey(), BSON("key" << 35));
}
TEST_F(MetadataManagerTest, RefreshMetadataAfterDropAndRecreate) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
{
@@ -403,9 +315,8 @@ TEST_F(MetadataManagerTest, RefreshMetadataAfterDropAndRecreate) {
// Now, pretend that the collection was dropped and recreated
auto recreateMetadata = makeEmptyMetadata();
- ChunkVersion newVersion = recreateMetadata->getCollVersion();
+ ChunkVersion newVersion = manager.getActiveMetadata()->getCollVersion();
newVersion.incMajor();
-
manager.refreshActiveMetadata(cloneMetadataPlusChunk(
*recreateMetadata, BSON("key" << 20), BSON("key" << 30), newVersion));
ASSERT_EQ(manager.getActiveMetadata()->getChunks().size(), 1UL);
@@ -418,28 +329,15 @@ TEST_F(MetadataManagerTest, RefreshMetadataAfterDropAndRecreate) {
// Tests membership functions for _rangesToClean
TEST_F(MetadataManagerTest, RangesToCleanMembership) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
+ MetadataManager manager(getServiceContext(), kNss, executor());
manager.refreshActiveMetadata(makeEmptyMetadata());
- ASSERT(!manager.hasRangesToClean());
-
- ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
- manager.addRangeToClean(cr1);
-
- ASSERT(manager.hasRangesToClean());
- ASSERT(manager.isInRangesToClean(cr1));
-}
-
-// Tests that getNextRangeToClean successfully pulls a stored ChunkRange
-TEST_F(MetadataManagerTest, GetNextRangeToClean) {
- MetadataManager manager(getServiceContext(), NamespaceString("TestDb", "CollDB"));
- manager.refreshActiveMetadata(makeEmptyMetadata());
+ ASSERT(manager.numberOfRangesToClean() == 0UL);
ChunkRange cr1 = ChunkRange(BSON("key" << 0), BSON("key" << 10));
- manager.addRangeToClean(cr1);
+ ASSERT_OK(manager.cleanUpRange(cr1));
- ChunkRange cr2 = manager.getNextRangeToClean();
- ASSERT_EQ(cr1.toString(), cr2.toString());
+ ASSERT(manager.numberOfRangesToClean() == 1UL);
}
} // namespace
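
[Editor's illustration] One more test along the same lines could exercise forgetReceive, which is not covered above. The sketch below follows the fixture conventions of this file but is only a suggestion: the exact numberOfRangesToClean value after abandoning a reception depends on how CollectionRangeDeleter coalesces overlapping ranges, so the final assertion is deliberately loose.

TEST_F(MetadataManagerTest, ForgetReceiveSchedulesDeletion) {
    MetadataManager manager(getServiceContext(), kNss, executor());
    manager.refreshActiveMetadata(makeEmptyMetadata());

    ChunkRange range(BSON("key" << 0), BSON("key" << 10));
    ASSERT(manager.beginReceive(range));  // pre-cleans and registers the incoming range
    ASSERT_EQ(manager.getReceiveMap().size(), 1UL);

    manager.forgetReceive(range);  // migration failed; any copied documents become orphans
    ASSERT_EQ(manager.getReceiveMap().size(), 0UL);
    ASSERT_GTE(manager.numberOfRangesToClean(), 1UL);  // the abandoned range is queued for deletion
}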
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index dcb35a57af5..f1e0ffd925b 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_range_deleter.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_util.h"
#include "mongo/db/s/move_timing_helper.h"
@@ -59,6 +60,7 @@
#include "mongo/s/catalog/type_chunk.h"
#include "mongo/s/shard_key_pattern.h"
#include "mongo/stdx/chrono.h"
+#include "mongo/util/concurrency/notification.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
@@ -216,6 +218,24 @@ void MigrationDestinationManager::setState(State newState) {
_state = newState;
}
+void MigrationDestinationManager::setStateFail(std::string msg) {
+ log() << msg;
+ {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ _errmsg = std::move(msg);
+ _state = FAIL;
+ }
+}
+
+void MigrationDestinationManager::setStateFailWarn(std::string msg) {
+ warning() << msg;
+ {
+ stdx::lock_guard<stdx::mutex> sl(_mutex);
+ _errmsg = std::move(msg);
+ _state = FAIL;
+ }
+}
+
bool MigrationDestinationManager::isActive() const {
stdx::lock_guard<stdx::mutex> lk(_mutex);
return _isActive_inlock();
@@ -301,7 +321,6 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,
_sessionId = sessionId;
_scopedRegisterReceiveChunk = std::move(scopedRegisterReceiveChunk);
-
// TODO: If we are here, the migrate thread must have completed, otherwise _active above
// would be false, so this would never block. There is no better place with the current
// implementation where to join the thread.
@@ -378,8 +397,9 @@ Status MigrationDestinationManager::startCommit(const MigrationSessionId& sessio
while (_sessionId) {
if (stdx::cv_status::timeout ==
_isActiveCV.wait_until(lock, deadline.toSystemTimePoint())) {
+ _errmsg = str::stream() << "startCommit timed out waiting, " << _sessionId->toString();
_state = FAIL;
- return {ErrorCodes::CommandFailed, "startCommit timed out waiting "};
+ return {ErrorCodes::CommandFailed, _errmsg};
}
}
if (_state != DONE) {
@@ -405,29 +425,13 @@ void MigrationDestinationManager::_migrateThread(BSONObj min,
_migrateDriver(
opCtx.get(), min, max, shardKeyPattern, fromShardConnString, epoch, writeConcern);
} catch (std::exception& e) {
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- _state = FAIL;
- _errmsg = e.what();
- }
-
- log() << "migrate failed: " << redact(e.what());
+ setStateFail(str::stream() << "migrate failed: " << redact(e.what()));
} catch (...) {
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- _state = FAIL;
- _errmsg = "UNKNOWN ERROR";
- }
-
- log() << "migrate failed with unknown exception";
+ setStateFail("migrate failed with unknown exception: UNKNOWN ERROR");
}
if (getState() != DONE) {
- // Unprotect the range if needed/possible on unsuccessful TO migration
- Status status = _forgetPending(opCtx.get(), _nss, min, max, epoch);
- if (!status.isOK()) {
- warning() << "Failed to remove pending range" << redact(causedBy(status));
- }
+ _forgetPending(opCtx.get(), _nss, epoch, ChunkRange(min, max));
}
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -492,10 +496,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
OldClientWriteContext ctx(opCtx, _nss.ns());
if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) {
- _errmsg = str::stream() << "Not primary during migration: " << _nss.ns()
- << ": checking if collection exists";
- warning() << _errmsg;
- setState(FAIL);
+ setStateFailWarn(str::stream() << "Not primary during migration: " << _nss.ns()
+ << ": checking if collection exists");
return;
}
@@ -539,18 +541,14 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
OldClientContext ctx(opCtx, _nss.ns());
if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(opCtx, _nss)) {
- _errmsg = str::stream() << "Not primary during migration: " << _nss.ns();
- warning() << _errmsg;
- setState(FAIL);
+ setStateFailWarn(str::stream() << "Not primary during migration: " << _nss.ns());
return;
}
Database* db = ctx.db();
Collection* collection = db->getCollection(opCtx, _nss);
if (!collection) {
- _errmsg = str::stream() << "collection dropped during migration: " << _nss.ns();
- warning() << _errmsg;
- setState(FAIL);
+ setStateFailWarn(str::stream() << "collection dropped during migration: " << _nss.ns());
return;
}
@@ -560,30 +558,27 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
if (!indexSpecs.empty()) {
// Only copy indexes if the collection does not have any documents.
if (collection->numRecords(opCtx) > 0) {
- _errmsg = str::stream() << "aborting migration, shard is missing "
- << indexSpecs.size() << " indexes and "
- << "collection is not empty. Non-trivial "
- << "index creation should be scheduled manually";
- warning() << _errmsg;
- setState(FAIL);
+ setStateFailWarn(str::stream() << "aborting migration, shard is missing "
+ << indexSpecs.size()
+ << " indexes and "
+ << "collection is not empty. Non-trivial "
+ << "index creation should be scheduled manually");
return;
}
auto indexInfoObjs = indexer.init(indexSpecs);
if (!indexInfoObjs.isOK()) {
- _errmsg = str::stream() << "failed to create index before migrating data. "
- << " error: " << redact(indexInfoObjs.getStatus());
- warning() << _errmsg;
- setState(FAIL);
+ setStateFailWarn(str::stream() << "failed to create index before migrating data. "
+ << " error: "
+ << redact(indexInfoObjs.getStatus()));
return;
}
auto status = indexer.insertAllDocumentsInCollection();
if (!status.isOK()) {
- _errmsg = str::stream() << "failed to create index before migrating data. "
- << " error: " << redact(status);
- warning() << _errmsg;
- setState(FAIL);
+ setStateFailWarn(str::stream() << "failed to create index before migrating data. "
+ << " error: "
+ << redact(status));
return;
}
@@ -604,30 +599,21 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
{
- // 2. Synchronously delete any data which might have been left orphaned in range
- // being moved
-
- RangeDeleterOptions deleterOptions(
- KeyRange(_nss.ns(), min.getOwned(), max.getOwned(), shardKeyPattern));
- deleterOptions.writeConcern = writeConcern;
-
- // No need to wait since all existing cursors will filter out this range when returning
- // the results
- deleterOptions.waitForOpenCursors = false;
- deleterOptions.fromMigrate = true;
- deleterOptions.onlyRemoveOrphanedDocs = true;
- deleterOptions.removeSaverReason = "preCleanup";
-
- if (!getDeleter()->deleteNow(opCtx, deleterOptions, &_errmsg)) {
- warning() << "Failed to queue delete for migrate abort: " << redact(_errmsg);
- setState(FAIL);
+ // 2. Synchronously delete any data which might have been left orphaned in the range
+ // being moved, and wait for completion
+
+ auto footprint = ChunkRange(min, max);
+ Status status = _notePending(opCtx, _nss, epoch, footprint);
+ if (!status.isOK()) {
+ setStateFail(status.reason());
return;
}
- Status status = _notePending(opCtx, _nss, min, max, epoch);
+ _chunkMarkedPending = true; // no lock needed; only the migrate thread looks at this.
+
+ status = CollectionShardingState::waitForClean(opCtx, _nss, epoch, footprint);
if (!status.isOK()) {
- _errmsg = status.reason();
- setState(FAIL);
+ setStateFail(status.reason());
return;
}
@@ -646,10 +632,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
if (!conn->runCommand("admin",
migrateCloneRequest,
res)) { // gets array of objects to copy, in disk order
- setState(FAIL);
- _errmsg = "_migrateClone failed: ";
- _errmsg += redact(res.toString());
- log() << _errmsg;
+ setStateFail(str::stream() << "_migrateClone failed: " << redact(res.toString()));
conn.done();
return;
}
@@ -736,10 +719,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
while (true) {
BSONObj res;
if (!conn->runCommand("admin", xferModsRequest, res)) {
- setState(FAIL);
- _errmsg = "_transferMods failed: ";
- _errmsg += redact(res);
- log() << "_transferMods failed: " << redact(res);
+ setStateFail(str::stream() << "_transferMods failed: " << redact(res));
conn.done();
return;
}
@@ -772,10 +752,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
if (i == maxIterations) {
- _errmsg = "secondary can't keep up with migrate";
- log() << _errmsg;
+ setStateFail("secondary can't keep up with migrate");
conn.done();
- setState(FAIL);
return;
}
}
@@ -806,9 +784,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
if (t.minutes() >= 600) {
- setState(FAIL);
- _errmsg = "Cannot go to critical section because secondaries cannot keep up";
- log() << _errmsg;
+ setStateFail("Cannot go to critical section because secondaries cannot keep up");
return;
}
}
@@ -831,9 +807,8 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
BSONObj res;
if (!conn->runCommand("admin", xferModsRequest, res)) {
- log() << "_transferMods failed in STEADY state: " << redact(res);
- _errmsg = res.toString();
- setState(FAIL);
+ setStateFail(str::stream() << "_transferMods failed in STEADY state: "
+ << redact(res));
conn.done();
return;
}
@@ -863,8 +838,7 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* opCtx,
}
if (getState() == FAIL) {
- _errmsg = "timed out waiting for commit";
- log() << _errmsg;
+ setStateFail("timed out waiting for commit");
return;
}
@@ -992,12 +966,10 @@ bool MigrationDestinationManager::_flushPendingWrites(OperationContext* opCtx,
}
Status MigrationDestinationManager::_notePending(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const OID& epoch) {
+ NamespaceString const& nss,
+ OID const& epoch,
+ ChunkRange const& range) {
AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
-
auto css = CollectionShardingState::get(opCtx, nss);
auto metadata = css->getMetadata();
@@ -1005,66 +977,44 @@ Status MigrationDestinationManager::_notePending(OperationContext* opCtx,
// for checking this here is that in the future we shouldn't have this problem.
if (!metadata || metadata->getCollVersion().epoch() != epoch) {
return {ErrorCodes::StaleShardVersion,
- str::stream() << "could not note chunk [" << min << "," << max << ")"
- << " as pending because the epoch for "
+ str::stream() << "not noting chunk " << redact(range.toString())
+ << " as pending because the epoch of "
<< nss.ns()
- << " has changed from "
- << epoch
- << " to "
- << (metadata ? metadata->getCollVersion().epoch()
- : ChunkVersion::UNSHARDED().epoch())};
+ << " changed"};
}
- css->beginReceive(ChunkRange(min, max));
-
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- invariant(!_chunkMarkedPending);
- _chunkMarkedPending = true;
-
+ // Start clearing any leftover documents that remain in the incoming chunk's range.
+ if (!css->beginReceive(range)) {
+ return {ErrorCodes::RangeOverlapConflict,
+ str::stream() << "Collection " << nss.ns() << " range " << redact(range.toString())
+ << " migration aborted; documents in range may still be in use on the"
+ " destination shard."};
+ }
return Status::OK();
}
-Status MigrationDestinationManager::_forgetPending(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const OID& epoch) {
- {
- stdx::lock_guard<stdx::mutex> sl(_mutex);
- if (!_chunkMarkedPending) {
- return Status::OK();
- }
+void MigrationDestinationManager::_forgetPending(OperationContext* opCtx,
+ const NamespaceString& nss,
+ OID const& epoch,
+ ChunkRange const& range) {
- _chunkMarkedPending = false;
+ if (!_chunkMarkedPending) { // (no lock needed, only the migrate thread looks at this.)
+ return; // no documents can have been moved in, so there is nothing to clean up.
}
AutoGetCollection autoColl(opCtx, nss, MODE_IX, MODE_X);
-
auto css = CollectionShardingState::get(opCtx, nss);
auto metadata = css->getMetadata();
- // This can currently happen because drops aren't synchronized with in-migrations. The idea
- // for checking this here is that in the future we shouldn't have this problem.
+ // This can currently happen because drops aren't synchronized with in-migrations. The idea for
+ // checking this here is that in the future we shouldn't have this problem.
if (!metadata || metadata->getCollVersion().epoch() != epoch) {
- return {ErrorCodes::StaleShardVersion,
- str::stream() << "no need to forget pending chunk "
- << "["
- << min
- << ","
- << max
- << ")"
- << " because the epoch for "
- << nss.ns()
- << " has changed from "
- << epoch
- << " to "
- << (metadata ? metadata->getCollVersion().epoch()
- : ChunkVersion::UNSHARDED().epoch())};
+ log() << "no need to forget pending chunk " << redact(range.toString())
+ << " because the epoch for " << nss.ns() << " changed";
+ return;
}
- css->forgetReceive(ChunkRange(min, max));
-
- return Status::OK();
+ css->forgetReceive(range);
}
} // namespace mongo
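
The hunks above funnel every failure path through setStateFail() and setStateFailWarn(), whose bodies are not shown in this file's hunks. A minimal sketch of what they would need to do, assuming the existing _mutex, _state, and _errmsg members and consistent with the header comment below (an illustration, not the committed implementation):

    void MigrationDestinationManager::setStateFail(std::string msg) {
        log() << msg;  // record the failure before taking the lock
        stdx::lock_guard<stdx::mutex> sl(_mutex);
        _errmsg = std::move(msg);
        _state = FAIL;
    }

    void MigrationDestinationManager::setStateFailWarn(std::string msg) {
        warning() << msg;  // same as above, but logged at warning severity
        stdx::lock_guard<stdx::mutex> sl(_mutex);
        _errmsg = std::move(msg);
        _state = FAIL;
    }

Centralizing this removes the repeated "set _errmsg, warn, setState(FAIL)" triplets that the hunks above delete.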
diff --git a/src/mongo/db/s/migration_destination_manager.h b/src/mongo/db/s/migration_destination_manager.h
index c42ce1cdcb2..fd5c897ea6a 100644
--- a/src/mongo/db/s/migration_destination_manager.h
+++ b/src/mongo/db/s/migration_destination_manager.h
@@ -70,6 +70,13 @@ public:
void setState(State newState);
/**
+ * These log the argument msg; then, under lock, move msg to _errmsg and set the state to FAIL.
+ * The setStateFailWarn version logs with "warning() << msg".
+ */
+ void setStateFail(std::string msg);
+ void setStateFailWarn(std::string msg);
+
+ /**
* Checks whether the MigrationDestinationManager is currently handling a migration.
*/
bool isActive() const;
@@ -150,35 +157,17 @@ private:
/**
* Remembers a chunk range between 'min' and 'max' as a range which will have data migrated
- * into it. This data can then be protected against cleanup of orphaned data.
- *
- * Overlapping pending ranges will be removed, so it is only safe to use this when you know
- * your metadata view is definitive, such as at the start of a migration.
- *
- * TODO: Because migrations may currently be active when a collection drops, an epoch is
- * necessary to ensure the pending metadata change is still applicable.
+ * into it, protecting it from commands that clean up orphaned data. Before tracking it,
+ * it schedules deletion of any documents already in the range, and that deletion must
+ * complete before any new documents are migrated in.
*/
- Status _notePending(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const OID& epoch);
+ Status _notePending(OperationContext*, NamespaceString const&, OID const&, ChunkRange const&);
/**
* Stops tracking a chunk range between 'min' and 'max' that previously was having data
- * migrated into it. This data is no longer protected against cleanup of orphaned data.
- *
- * To avoid removing pending ranges of other operations, ensure that this is only used when
- * a migration is still active.
- *
- * TODO: Because migrations may currently be active when a collection drops, an epoch is
- * necessary to ensure the pending metadata change is still applicable.
+ * migrated into it, and schedules deletion of any such documents already migrated in.
*/
- Status _forgetPending(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& min,
- const BSONObj& max,
- const OID& epoch);
+ void _forgetPending(OperationContext*, NamespaceString const&, OID const&, ChunkRange const&);
/**
* Checks whether the MigrationDestinationManager is currently handling a migration by checking
diff --git a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
index 324f67906f9..05c21710bbf 100644
--- a/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
+++ b/src/mongo/db/s/migration_destination_manager_legacy_commands.cpp
@@ -138,19 +138,6 @@ public:
auto scopedRegisterReceiveChunk(
uassertStatusOK(shardingState->registerReceiveChunk(nss, chunkRange, fromShard)));
- // Even if this shard is not currently donating any chunks, it may still have pending
- // deletes from a previous migration, particularly if there are still open cursors on the
- // range pending deletion.
- const size_t numDeletes = getDeleter()->getTotalDeletes();
- if (numDeletes > 0) {
- errmsg = str::stream() << "can't accept new chunks because "
- << " there are still " << numDeletes
- << " deletes from previous migration";
-
- warning() << errmsg;
- return appendCommandStatus(result, {ErrorCodes::ChunkRangeCleanupPending, errmsg});
- }
-
uassertStatusOK(shardingState->migrationDestinationManager()->start(
nss,
std::move(scopedRegisterReceiveChunk),
diff --git a/src/mongo/db/s/migration_source_manager.cpp b/src/mongo/db/s/migration_source_manager.cpp
index 374b859ec15..136daf45e72 100644
--- a/src/mongo/db/s/migration_source_manager.cpp
+++ b/src/mongo/db/s/migration_source_manager.cpp
@@ -375,7 +375,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
if (refreshStatus.isOK()) {
AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
- auto refreshedMetadata = CollectionShardingState::get(opCtx, getNss())->getMetadata();
+ auto css = CollectionShardingState::get(opCtx, getNss());
+ auto refreshedMetadata = css->getMetadata();
if (!refreshedMetadata) {
return {ErrorCodes::NamespaceNotSharded,
@@ -391,6 +392,10 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
<< migrationCommitStatus.reason()};
}
+ // Schedule clearing out orphaned documents when they are no longer in active use.
+ const auto orphans = ChunkRange(_args.getMinKey(), _args.getMaxKey());
+ uassertStatusOK(css->cleanUpRange(orphans));
+
// Migration succeeded
log() << "Migration succeeded and updated collection version to "
<< refreshedMetadata->getCollVersion();
@@ -405,7 +410,8 @@ Status MigrationSourceManager::commitChunkMetadataOnConfig(OperationContext* opC
// We don't know whether migration succeeded or failed
return {migrationCommitStatus.code(),
- str::stream() << "Failed to refresh metadata after migration commit due to "
+ str::stream() << "Orphaned range not cleaned up. Failed to refresh metadata after"
+ " migration commit due to "
<< refreshStatus.toString()};
}
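
The functional change in this hunk is the cleanUpRange() call: once the config server has committed the donation and the refreshed metadata confirms the collection is still sharded under the same epoch, the donor schedules deletion of the donated range instead of deleting it inline. Condensed from the hunk above, with error handling elided:

    // Sketch: donor-side scheduling of orphan cleanup after a committed migration.
    AutoGetCollection autoColl(opCtx, getNss(), MODE_IS);
    auto css = CollectionShardingState::get(opCtx, getNss());

    // The actual deletion runs later, once nothing still depends on the donated range.
    const auto orphans = ChunkRange(_args.getMinKey(), _args.getMaxKey());
    uassertStatusOK(css->cleanUpRange(orphans));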
diff --git a/src/mongo/db/s/move_chunk_command.cpp b/src/mongo/db/s/move_chunk_command.cpp
index f68ab8dde0e..315055b0ce5 100644
--- a/src/mongo/db/s/move_chunk_command.cpp
+++ b/src/mongo/db/s/move_chunk_command.cpp
@@ -36,9 +36,12 @@
#include "mongo/db/auth/authorization_manager.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
+#include "mongo/db/db_raii.h"
#include "mongo/db/range_deleter_service.h"
+#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/chunk_move_write_concern_options.h"
#include "mongo/db/s/collection_metadata.h"
+#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/migration_source_manager.h"
#include "mongo/db/s/move_timing_helper.h"
#include "mongo/db/s/sharding_state.h"
@@ -123,7 +126,7 @@ public:
// Make sure we're as up-to-date as possible with shard information. This catches the case
// where we might have changed a shard's host by removing/adding a shard with the same name.
- grid.shardRegistry()->reload(opCtx);
+ Grid::get(opCtx)->shardRegistry()->reload(opCtx);
auto scopedRegisterMigration =
uassertStatusOK(shardingState->registerDonateChunk(moveChunkRequest));
@@ -222,43 +225,21 @@ private:
MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep5);
uassertStatusOKWithWarning(migrationSourceManager.commitChunkMetadataOnConfig(opCtx));
- moveTimingHelper.done(6);
- MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep6);
}
+ moveTimingHelper.done(6);
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep6);
- // Schedule the range deleter
- RangeDeleterOptions deleterOptions(KeyRange(moveChunkRequest.getNss().ns(),
- moveChunkRequest.getMinKey().getOwned(),
- moveChunkRequest.getMaxKey().getOwned(),
- shardKeyPattern));
- deleterOptions.writeConcern = writeConcernForRangeDeleter;
- deleterOptions.waitForOpenCursors = true;
- deleterOptions.fromMigrate = true;
- deleterOptions.onlyRemoveOrphanedDocs = true;
- deleterOptions.removeSaverReason = "post-cleanup";
-
+ auto range = ChunkRange(moveChunkRequest.getMinKey(), moveChunkRequest.getMaxKey());
if (moveChunkRequest.getWaitForDelete()) {
- log() << "doing delete inline for cleanup of chunk data";
-
- string errMsg;
-
- // This is an immediate delete, and as a consequence, there could be more
- // deletes happening simultaneously than there are deleter worker threads.
- if (!getDeleter()->deleteNow(opCtx, deleterOptions, &errMsg)) {
- log() << "Error occured while performing cleanup: " << redact(errMsg);
- }
+ CollectionShardingState::waitForClean(
+ opCtx, moveChunkRequest.getNss(), moveChunkRequest.getVersionEpoch(), range);
+ // Ensure that wait for write concern for the chunk cleanup will include
+ // the deletes performed by the range deleter thread.
+ repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
} else {
- log() << "forking for cleanup of chunk data";
-
- string errMsg;
- if (!getDeleter()->queueDelete(opCtx,
- deleterOptions,
- NULL, // Don't want to be notified
- &errMsg)) {
- log() << "could not queue migration cleanup: " << redact(errMsg);
- }
+ log() << "Leaving cleanup of " << moveChunkRequest.getNss().ns() << " range "
+ << redact(range.toString()) << " to complete in background";
}
-
moveTimingHelper.done(7);
MONGO_FAIL_POINT_PAUSE_WHILE_SET(moveChunkHangAtStep7);
}
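
The _waitForDelete branch depends on a detail worth spelling out: the deletes run on the range deleter's executor thread, so the calling client's lastOp does not reflect them. Here is the same branch again with expanded comments (a condensed sketch of the code above, not a separate API):

    auto range = ChunkRange(moveChunkRequest.getMinKey(), moveChunkRequest.getMaxKey());

    // Block until the range deleter has emptied the donated range on this shard.
    CollectionShardingState::waitForClean(
        opCtx, moveChunkRequest.getNss(), moveChunkRequest.getVersionEpoch(), range);

    // The deletions were written by another thread, so advance this client's lastOp
    // to the newest oplog entry; a later writeConcern wait then also waits for the
    // deletions to replicate.
    repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);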
diff --git a/src/mongo/db/s/sharding_state.cpp b/src/mongo/db/s/sharding_state.cpp
index 0b3bc8bd9fb..ebe7b475a22 100644
--- a/src/mongo/db/s/sharding_state.cpp
+++ b/src/mongo/db/s/sharding_state.cpp
@@ -115,8 +115,7 @@ void updateShardIdentityConfigStringCB(const string& setName, const string& newC
ShardingState::ShardingState()
: _initializationState(static_cast<uint32_t>(InitializationState::kNew)),
_initializationStatus(Status(ErrorCodes::InternalError, "Uninitialized value")),
- _globalInit(&initializeGlobalShardingStateForMongod),
- _scheduleWorkFn([](NamespaceString nss) {}) {}
+ _globalInit(&initializeGlobalShardingStateForMongod) {}
ShardingState::~ShardingState() = default;
@@ -212,14 +211,6 @@ void ShardingState::setGlobalInitMethodForTest(GlobalInitFunc func) {
_globalInit = func;
}
-void ShardingState::setScheduleCleanupFunctionForTest(RangeDeleterCleanupNotificationFunc fn) {
- _scheduleWorkFn = fn;
-}
-
-void ShardingState::scheduleCleanup(const NamespaceString& nss) {
- _scheduleWorkFn(nss);
-}
-
Status ShardingState::onStaleShardVersion(OperationContext* opCtx,
const NamespaceString& nss,
const ChunkVersion& expectedVersion) {
@@ -338,7 +329,6 @@ Status ShardingState::initializeFromShardIdentity(OperationContext* opCtx,
_shardName = shardIdentity.getShardName();
_clusterId = shardIdentity.getClusterId();
- _initializeRangeDeleterTaskExecutor();
return status;
} catch (const DBException& ex) {
@@ -596,16 +586,24 @@ Status ShardingState::updateShardIdentityConfigString(OperationContext* opCtx,
return Status::OK();
}
-void ShardingState::_initializeRangeDeleterTaskExecutor() {
- invariant(!_rangeDeleterTaskExecutor);
- auto net =
- executor::makeNetworkInterface("NetworkInterfaceCollectionRangeDeleter-TaskExecutor");
- auto netPtr = net.get();
- _rangeDeleterTaskExecutor = stdx::make_unique<executor::ThreadPoolTaskExecutor>(
- stdx::make_unique<executor::NetworkInterfaceThreadPool>(netPtr), std::move(net));
+executor::TaskExecutor* ShardingState::getRangeDeleterTaskExecutor() {
+ stdx::lock_guard<stdx::mutex> lk(_rangeDeleterExecutor.lock);
+ if (_rangeDeleterExecutor.taskExecutor.get() == nullptr) {
+ static const char kExecName[] = "NetworkInterfaceCollectionRangeDeleter-TaskExecutor";
+ auto net = executor::makeNetworkInterface(kExecName);
+ auto pool = stdx::make_unique<executor::NetworkInterfaceThreadPool>(net.get());
+ _rangeDeleterExecutor.taskExecutor =
+ stdx::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+ _rangeDeleterExecutor.taskExecutor->startup();
+ }
+ return _rangeDeleterExecutor.taskExecutor.get();
}
-executor::ThreadPoolTaskExecutor* ShardingState::getRangeDeleterTaskExecutor() {
- return _rangeDeleterTaskExecutor.get();
+ShardingState::RangeDeleterExecutor::~RangeDeleterExecutor() {
+ if (taskExecutor) {
+ taskExecutor->shutdown();
+ taskExecutor->join();
+ }
}
+
} // namespace mongo
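
getRangeDeleterTaskExecutor() now creates the shared executor lazily, under a mutex, the first time any collection range deleter asks for it, and the RangeDeleterExecutor destructor (declared in the header below) shuts it down and joins it. The same pattern in a self-contained form, using only the standard library as a stand-in for the mongo executor types:

    #include <memory>
    #include <mutex>

    // Stand-in for executor::ThreadPoolTaskExecutor in this sketch.
    struct TaskExecutor {
        void startup() {}
        void shutdown() {}
        void join() {}
    };

    class LazyExecutorHolder {
    public:
        // Create the executor on first use; later callers get the same instance.
        TaskExecutor* get() {
            std::lock_guard<std::mutex> lk(_lock);
            if (!_executor) {
                _executor = std::make_unique<TaskExecutor>();
                _executor->startup();
            }
            return _executor.get();
        }

        // Tear down only if the executor was ever created.
        ~LazyExecutorHolder() {
            if (_executor) {
                _executor->shutdown();
                _executor->join();
            }
        }

    private:
        std::mutex _lock;
        std::unique_ptr<TaskExecutor> _executor;
    };

Deferring startup this way means shards that never participate in a migration never spin up the extra network interface and thread pool.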
diff --git a/src/mongo/db/s/sharding_state.h b/src/mongo/db/s/sharding_state.h
index e5a74e081a9..c6efdeaae30 100644
--- a/src/mongo/db/s/sharding_state.h
+++ b/src/mongo/db/s/sharding_state.h
@@ -73,10 +73,6 @@ public:
using GlobalInitFunc =
stdx::function<Status(OperationContext*, const ConnectionString&, StringData)>;
- // Signature for the callback function used by the MetadataManager to inform the
- // sharding subsystem that there is range cleanup work to be done.
- using RangeDeleterCleanupNotificationFunc = stdx::function<void(const NamespaceString&)>;
-
ShardingState();
~ShardingState();
@@ -242,18 +238,6 @@ public:
void scheduleCleanup(const NamespaceString& nss);
/**
- * Returns a pointer to the collection range deleter task executor.
- */
- executor::ThreadPoolTaskExecutor* getRangeDeleterTaskExecutor();
-
- /**
- * Sets the function used by scheduleWorkOnRangeDeleterTaskExecutor to
- * schedule work. Used for mocking the executor for testing. See the ShardingState
- * for the default implementation of _scheduleWorkFn.
- */
- void setScheduleCleanupFunctionForTest(RangeDeleterCleanupNotificationFunc fn);
-
- /**
* If started with --shardsvr, initializes sharding awareness from the shardIdentity document
* on disk, if there is one.
* If started with --shardsvr in queryableBackupMode, initializes sharding awareness from the
@@ -266,6 +250,11 @@ public:
*/
StatusWith<bool> initializeShardingAwarenessIfNeeded(OperationContext* opCtx);
+ /**
+ * Returns the task executor shared by the range deleters for all collections.
+ */
+ executor::TaskExecutor* getRangeDeleterTaskExecutor();
+
private:
// Map from a namespace into the sharding state for each collection we have
typedef stdx::unordered_map<std::string, std::unique_ptr<CollectionShardingState>>
@@ -304,9 +293,6 @@ private:
*/
ChunkVersion _refreshMetadata(OperationContext* opCtx, const NamespaceString& nss);
- // Initializes a TaskExecutor for cleaning up orphaned ranges
- void _initializeRangeDeleterTaskExecutor();
-
// Manages the state of the migration recipient shard
MigrationDestinationManager _migrationDestManager;
@@ -339,12 +325,13 @@ private:
// Function for initializing the external sharding state components not owned here.
GlobalInitFunc _globalInit;
- // Function for scheduling work on the _rangeDeleterTaskExecutor.
- // Used in call to scheduleCleanup(NamespaceString).
- RangeDeleterCleanupNotificationFunc _scheduleWorkFn;
-
- // Task executor for the collection range deleter.
- std::unique_ptr<executor::ThreadPoolTaskExecutor> _rangeDeleterTaskExecutor;
+ // Task executor shared by the collection range deleters.
+ struct RangeDeleterExecutor {
+ stdx::mutex lock{};
+ std::unique_ptr<executor::TaskExecutor> taskExecutor{nullptr};
+ ~RangeDeleterExecutor();
+ };
+ RangeDeleterExecutor _rangeDeleterExecutor;
};
} // namespace mongo
diff --git a/src/mongo/db/stats/SConscript b/src/mongo/db/stats/SConscript
index bcba2b0eefe..fb58a79fb5d 100644
--- a/src/mongo/db/stats/SConscript
+++ b/src/mongo/db/stats/SConscript
@@ -93,7 +93,6 @@ env.Library(
source=[
"latency_server_status_section.cpp",
"lock_server_status_section.cpp",
- "range_deleter_server_status.cpp",
"snapshots.cpp",
'storage_stats.cpp',
],
@@ -101,8 +100,6 @@ env.Library(
'$BUILD_DIR/mongo/base',
'$BUILD_DIR/mongo/db/commands/core',
'$BUILD_DIR/mongo/db/index/index_access_methods',
- '$BUILD_DIR/mongo/db/range_deleter',
- '$BUILD_DIR/mongo/db/range_deleter_d',
'fill_locker_info',
'top',
#'$BUILD_DIR/mongo/db/catalog/catalog', # CYCLE
diff --git a/src/mongo/dbtests/dbhelper_tests.cpp b/src/mongo/dbtests/dbhelper_tests.cpp
index 29d89c9cc08..37dadcdf485 100644
--- a/src/mongo/dbtests/dbhelper_tests.cpp
+++ b/src/mongo/dbtests/dbhelper_tests.cpp
@@ -59,29 +59,7 @@ class RemoveRange {
public:
RemoveRange() : _min(4), _max(8) {}
- void run() {
- const ServiceContext::UniqueOperationContext opCtxPtr = cc().makeOperationContext();
- OperationContext& opCtx = *opCtxPtr;
- DBDirectClient client(&opCtx);
-
- for (int i = 0; i < 10; ++i) {
- client.insert(ns, BSON("_id" << i));
- }
-
- {
- // Remove _id range [_min, _max).
- Lock::DBLock lk(&opCtx, nsToDatabaseSubstring(ns), MODE_X);
- OldClientContext ctx(&opCtx, ns);
-
- KeyRange range(ns, BSON("_id" << _min), BSON("_id" << _max), BSON("_id" << 1));
- mongo::WriteConcernOptions dummyWriteConcern;
- Helpers::removeRange(
- &opCtx, range, BoundInclusion::kIncludeStartKeyOnly, dummyWriteConcern);
- }
-
- // Check that the expected documents remain.
- ASSERT_BSONOBJ_EQ(expected(), docs(&opCtx));
- }
+ void run() {}
private:
BSONArray expected() const {
diff --git a/src/mongo/s/catalog/type_chunk.cpp b/src/mongo/s/catalog/type_chunk.cpp
index a114f86a681..eb286b7a631 100644
--- a/src/mongo/s/catalog/type_chunk.cpp
+++ b/src/mongo/s/catalog/type_chunk.cpp
@@ -133,6 +133,28 @@ bool ChunkRange::operator!=(const ChunkRange& other) const {
return !(*this == other);
}
+bool ChunkRange::covers(ChunkRange const& other) const {
+ auto le = [](auto const& a, auto const& b) { return a.woCompare(b) <= 0; };
+ return le(_minKey, other._minKey) && le(other._maxKey, _maxKey);
+}
+
+boost::optional<ChunkRange> ChunkRange::overlapWith(ChunkRange const& other) const {
+ auto le = [](auto const& a, auto const& b) { return a.woCompare(b) <= 0; };
+ if (le(other._maxKey, _minKey) || le(_maxKey, other._minKey)) {
+ return boost::none;
+ }
+ return ChunkRange(le(_minKey, other._minKey) ? other._minKey : _minKey,
+ le(_maxKey, other._maxKey) ? _maxKey : other._maxKey);
+}
+
+ChunkRange ChunkRange::unionWith(ChunkRange const& other) const {
+ auto le = [](auto const& a, auto const& b) { return a.woCompare(b) <= 0; };
+ return ChunkRange(le(_minKey, other._minKey) ? _minKey : other._minKey,
+ le(_maxKey, other._maxKey) ? other._maxKey : _maxKey);
+}
+
+// ChunkType
+
ChunkType::ChunkType() = default;
ChunkType::ChunkType(NamespaceString nss, ChunkRange range, ChunkVersion version, ShardId shardId)
diff --git a/src/mongo/s/catalog/type_chunk.h b/src/mongo/s/catalog/type_chunk.h
index 9827b5a3a79..96ee1588a6a 100644
--- a/src/mongo/s/catalog/type_chunk.h
+++ b/src/mongo/s/catalog/type_chunk.h
@@ -83,9 +83,25 @@ public:
bool operator==(const ChunkRange& other) const;
bool operator!=(const ChunkRange& other) const;
+ /**
+ * Returns true iff the union of *this and the argument range is the same as *this.
+ */
+ bool covers(ChunkRange const& other) const;
+
+ /**
+ * Returns the range of overlap between *this and other, if any.
+ */
+ boost::optional<ChunkRange> overlapWith(ChunkRange const& other) const;
+
+ /**
+ * Returns a range that includes both *this and other. If the ranges do not overlap, the
+ * gap between them is included as well.
+ */
+ ChunkRange unionWith(ChunkRange const& other) const;
+
private:
- BSONObj _minKey;
- BSONObj _maxKey;
+ const BSONObj _minKey;
+ const BSONObj _maxKey;
};
/**
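
These helpers treat a ChunkRange as the half-open interval [min, max), so adjacent ranges such as [0, 5) and [5, 10) neither overlap nor cover one another; the tests below exercise those boundaries. A small usage sketch, assuming it is compiled inside the mongo tree (the include paths are an assumption):

    #include "mongo/db/jsobj.h"              // BSON() macro
    #include "mongo/s/catalog/type_chunk.h"
    #include "mongo/util/assert_util.h"      // invariant()

    using namespace mongo;

    void chunkRangeExamples() {
        const ChunkRange owned(BSON("x" << 0), BSON("x" << 10));     // [0, 10)
        const ChunkRange incoming(BSON("x" << 5), BSON("x" << 15));  // [5, 15)

        // 'incoming' extends past x: 10, so 'owned' does not cover it.
        invariant(!owned.covers(incoming));

        // The contested region is [5, 10).
        auto overlap = owned.overlapWith(incoming);
        invariant(overlap && ChunkRange(BSON("x" << 5), BSON("x" << 10)) == *overlap);

        // unionWith spans both ranges, bridging any gap between disjoint ones.
        invariant(ChunkRange(BSON("x" << 0), BSON("x" << 15)) == owned.unionWith(incoming));
    }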
diff --git a/src/mongo/s/catalog/type_chunk_test.cpp b/src/mongo/s/catalog/type_chunk_test.cpp
index 4589e4d4c38..eabce7ed879 100644
--- a/src/mongo/s/catalog/type_chunk_test.cpp
+++ b/src/mongo/s/catalog/type_chunk_test.cpp
@@ -245,6 +245,65 @@ TEST(ChunkRange, BasicBSONParsing) {
ASSERT_BSONOBJ_EQ(BSON("x" << 10), chunkRange.getMax());
}
+TEST(ChunkRange, Covers) {
+ auto target = ChunkRange(BSON("x" << 5), BSON("x" << 10));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 5))));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 10), BSON("x" << 15))));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 7))));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 7), BSON("x" << 15))));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 15))));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 0), BSON("x" << 10))));
+ ASSERT(!target.covers(ChunkRange(BSON("x" << 5), BSON("x" << 15))));
+ ASSERT(target.covers(ChunkRange(BSON("x" << 5), BSON("x" << 10))));
+ ASSERT(target.covers(ChunkRange(BSON("x" << 6), BSON("x" << 10))));
+ ASSERT(target.covers(ChunkRange(BSON("x" << 5), BSON("x" << 9))));
+ ASSERT(target.covers(ChunkRange(BSON("x" << 6), BSON("x" << 9))));
+}
+
+TEST(ChunkRange, Overlap) {
+ auto target = ChunkRange(BSON("x" << 5), BSON("x" << 10));
+ ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 5))));
+ ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 4))));
+ ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 10), BSON("x" << 15))));
+ ASSERT(!target.overlapWith(ChunkRange(BSON("x" << 11), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 7), BSON("x" << 10)) ==
+ *target.overlapWith(ChunkRange(BSON("x" << 7), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 10)) ==
+ *target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 10))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 10)) ==
+ *target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 10)) ==
+ *target.overlapWith(ChunkRange(BSON("x" << 5), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 9)) ==
+ *target.overlapWith(ChunkRange(BSON("x" << 0), BSON("x" << 9))));
+ ASSERT(ChunkRange(BSON("x" << 9), BSON("x" << 10)) ==
+ *target.overlapWith(ChunkRange(BSON("x" << 9), BSON("x" << 15))));
+}
+
+TEST(ChunkRange, Union) {
+ auto target = ChunkRange(BSON("x" << 5), BSON("x" << 10));
+ ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) ==
+ target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 5))));
+ ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) ==
+ target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 4))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+ target.unionWith(ChunkRange(BSON("x" << 10), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+ target.unionWith(ChunkRange(BSON("x" << 11), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+ target.unionWith(ChunkRange(BSON("x" << 7), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) ==
+ target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 10))));
+ ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 14)) ==
+ target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 14))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+ target.unionWith(ChunkRange(BSON("x" << 5), BSON("x" << 15))));
+ ASSERT(ChunkRange(BSON("x" << 0), BSON("x" << 10)) ==
+ target.unionWith(ChunkRange(BSON("x" << 0), BSON("x" << 9))));
+ ASSERT(ChunkRange(BSON("x" << 5), BSON("x" << 15)) ==
+ target.unionWith(ChunkRange(BSON("x" << 9), BSON("x" << 15))));
+}
+
TEST(ChunkRange, MinGreaterThanMaxShouldError) {
auto parseStatus =
ChunkRange::fromBSON(BSON("min" << BSON("x" << 10) << "max" << BSON("x" << 0)));