Diffstat (limited to 'src/mongo/db/dbhelpers.cpp')
-rw-r--r--  src/mongo/db/dbhelpers.cpp  927
1 file changed, 452 insertions(+), 475 deletions(-)
diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp
index ae5444b8c1a..e5e198d7ff0 100644
--- a/src/mongo/db/dbhelpers.cpp
+++ b/src/mongo/db/dbhelpers.cpp
@@ -68,558 +68,535 @@
namespace mongo {
- using std::unique_ptr;
- using std::endl;
- using std::ios_base;
- using std::ofstream;
- using std::set;
- using std::string;
- using std::stringstream;
-
- using logger::LogComponent;
-
- void Helpers::ensureIndex(OperationContext* txn,
- Collection* collection,
- BSONObj keyPattern,
- bool unique,
- const char *name) {
- BSONObjBuilder b;
- b.append("name", name);
- b.append("ns", collection->ns());
- b.append("key", keyPattern);
- b.appendBool("unique", unique);
- BSONObj o = b.done();
-
- MultiIndexBlock indexer(txn, collection);
-
- Status status = indexer.init(o);
- if ( status.code() == ErrorCodes::IndexAlreadyExists )
- return;
- uassertStatusOK( status );
-
- uassertStatusOK(indexer.insertAllDocumentsInCollection());
-
- WriteUnitOfWork wunit(txn);
- indexer.commit();
- wunit.commit();
- }
+using std::unique_ptr;
+using std::endl;
+using std::ios_base;
+using std::ofstream;
+using std::set;
+using std::string;
+using std::stringstream;
+
+using logger::LogComponent;
+
+void Helpers::ensureIndex(OperationContext* txn,
+ Collection* collection,
+ BSONObj keyPattern,
+ bool unique,
+ const char* name) {
+ BSONObjBuilder b;
+ b.append("name", name);
+ b.append("ns", collection->ns());
+ b.append("key", keyPattern);
+ b.appendBool("unique", unique);
+ BSONObj o = b.done();
+
+ MultiIndexBlock indexer(txn, collection);
+
+ Status status = indexer.init(o);
+ if (status.code() == ErrorCodes::IndexAlreadyExists)
+ return;
+ uassertStatusOK(status);
+
+ uassertStatusOK(indexer.insertAllDocumentsInCollection());
+
+ WriteUnitOfWork wunit(txn);
+ indexer.commit();
+ wunit.commit();
+}
+
+/* fetch a single object from collection ns that matches query
+ set your db SavedContext first
+*/
+bool Helpers::findOne(OperationContext* txn,
+ Collection* collection,
+ const BSONObj& query,
+ BSONObj& result,
+ bool requireIndex) {
+ RecordId loc = findOne(txn, collection, query, requireIndex);
+ if (loc.isNull())
+ return false;
+ result = collection->docFor(txn, loc).value();
+ return true;
+}
- /* fetch a single object from collection ns that matches query
- set your db SavedContext first
- */
- bool Helpers::findOne(OperationContext* txn,
- Collection* collection,
- const BSONObj &query,
- BSONObj& result,
+/* fetch a single object from collection ns that matches query
+ set your db SavedContext first
+*/
+RecordId Helpers::findOne(OperationContext* txn,
+ Collection* collection,
+ const BSONObj& query,
bool requireIndex) {
- RecordId loc = findOne( txn, collection, query, requireIndex );
- if ( loc.isNull() )
- return false;
- result = collection->docFor(txn, loc).value();
- return true;
- }
-
- /* fetch a single object from collection ns that matches query
- set your db SavedContext first
- */
- RecordId Helpers::findOne(OperationContext* txn,
- Collection* collection,
- const BSONObj &query,
- bool requireIndex) {
- if ( !collection )
- return RecordId();
+ if (!collection)
+ return RecordId();
- CanonicalQuery* cq;
- const WhereCallbackReal whereCallback(txn, collection->ns().db());
+ CanonicalQuery* cq;
+ const WhereCallbackReal whereCallback(txn, collection->ns().db());
- massert(17244, "Could not canonicalize " + query.toString(),
+ massert(17244,
+ "Could not canonicalize " + query.toString(),
CanonicalQuery::canonicalize(collection->ns(), query, &cq, whereCallback).isOK());
- PlanExecutor* rawExec;
- size_t options = requireIndex ? QueryPlannerParams::NO_TABLE_SCAN : QueryPlannerParams::DEFAULT;
- massert(17245, "Could not get executor for query " + query.toString(),
- getExecutor(txn,
- collection,
- cq,
- PlanExecutor::YIELD_MANUAL,
- &rawExec,
- options).isOK());
-
- unique_ptr<PlanExecutor> exec(rawExec);
- PlanExecutor::ExecState state;
- RecordId loc;
- if (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &loc))) {
- return loc;
- }
- return RecordId();
+ PlanExecutor* rawExec;
+ size_t options = requireIndex ? QueryPlannerParams::NO_TABLE_SCAN : QueryPlannerParams::DEFAULT;
+ massert(17245,
+ "Could not get executor for query " + query.toString(),
+ getExecutor(txn, collection, cq, PlanExecutor::YIELD_MANUAL, &rawExec, options).isOK());
+
+ unique_ptr<PlanExecutor> exec(rawExec);
+ PlanExecutor::ExecState state;
+ RecordId loc;
+ if (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &loc))) {
+ return loc;
+ }
+ return RecordId();
+}
+
+bool Helpers::findById(OperationContext* txn,
+ Database* database,
+ const char* ns,
+ BSONObj query,
+ BSONObj& result,
+ bool* nsFound,
+ bool* indexFound) {
+ invariant(database);
+
+ Collection* collection = database->getCollection(ns);
+ if (!collection) {
+ return false;
}
- bool Helpers::findById(OperationContext* txn,
- Database* database,
- const char *ns,
- BSONObj query,
- BSONObj& result,
- bool* nsFound,
- bool* indexFound) {
-
- invariant(database);
-
- Collection* collection = database->getCollection( ns );
- if ( !collection ) {
- return false;
- }
-
- if ( nsFound )
- *nsFound = true;
+ if (nsFound)
+ *nsFound = true;
- IndexCatalog* catalog = collection->getIndexCatalog();
- const IndexDescriptor* desc = catalog->findIdIndex( txn );
+ IndexCatalog* catalog = collection->getIndexCatalog();
+ const IndexDescriptor* desc = catalog->findIdIndex(txn);
- if ( !desc )
- return false;
+ if (!desc)
+ return false;
- if ( indexFound )
- *indexFound = 1;
+ if (indexFound)
+ *indexFound = 1;
- RecordId loc = catalog->getIndex(desc)->findSingle( txn, query["_id"].wrap() );
- if ( loc.isNull() )
- return false;
- result = collection->docFor(txn, loc).value();
+ RecordId loc = catalog->getIndex(desc)->findSingle(txn, query["_id"].wrap());
+ if (loc.isNull())
+ return false;
+ result = collection->docFor(txn, loc).value();
+ return true;
+}
+
+RecordId Helpers::findById(OperationContext* txn, Collection* collection, const BSONObj& idquery) {
+ verify(collection);
+ IndexCatalog* catalog = collection->getIndexCatalog();
+ const IndexDescriptor* desc = catalog->findIdIndex(txn);
+ uassert(13430, "no _id index", desc);
+ return catalog->getIndex(desc)->findSingle(txn, idquery["_id"].wrap());
+}
+
+bool Helpers::getSingleton(OperationContext* txn, const char* ns, BSONObj& result) {
+ AutoGetCollectionForRead ctx(txn, ns);
+ unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(txn, ns, ctx.getCollection()));
+ PlanExecutor::ExecState state = exec->getNext(&result, NULL);
+
+ CurOp::get(txn)->done();
+
+ if (PlanExecutor::ADVANCED == state) {
+ result = result.getOwned();
return true;
}
+ return false;
+}
- RecordId Helpers::findById(OperationContext* txn,
- Collection* collection,
- const BSONObj& idquery) {
- verify(collection);
- IndexCatalog* catalog = collection->getIndexCatalog();
- const IndexDescriptor* desc = catalog->findIdIndex( txn );
- uassert(13430, "no _id index", desc);
- return catalog->getIndex(desc)->findSingle( txn, idquery["_id"].wrap() );
- }
-
- bool Helpers::getSingleton(OperationContext* txn, const char *ns, BSONObj& result) {
- AutoGetCollectionForRead ctx(txn, ns);
- unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(txn, ns, ctx.getCollection()));
- PlanExecutor::ExecState state = exec->getNext(&result, NULL);
-
- CurOp::get(txn)->done();
+bool Helpers::getLast(OperationContext* txn, const char* ns, BSONObj& result) {
+ AutoGetCollectionForRead autoColl(txn, ns);
+ unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(
+ txn, ns, autoColl.getCollection(), InternalPlanner::BACKWARD));
+ PlanExecutor::ExecState state = exec->getNext(&result, NULL);
- if (PlanExecutor::ADVANCED == state) {
- result = result.getOwned();
- return true;
- }
- return false;
+ if (PlanExecutor::ADVANCED == state) {
+ result = result.getOwned();
+ return true;
}
+ return false;
+}
- bool Helpers::getLast(OperationContext* txn, const char *ns, BSONObj& result) {
- AutoGetCollectionForRead autoColl(txn, ns);
- unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(txn,
- ns,
- autoColl.getCollection(),
- InternalPlanner::BACKWARD));
- PlanExecutor::ExecState state = exec->getNext(&result, NULL);
-
- if (PlanExecutor::ADVANCED == state) {
- result = result.getOwned();
- return true;
- }
- return false;
- }
+void Helpers::upsert(OperationContext* txn, const string& ns, const BSONObj& o, bool fromMigrate) {
+ BSONElement e = o["_id"];
+ verify(e.type());
+ BSONObj id = e.wrap();
- void Helpers::upsert( OperationContext* txn,
- const string& ns,
- const BSONObj& o,
- bool fromMigrate ) {
- BSONElement e = o["_id"];
- verify( e.type() );
- BSONObj id = e.wrap();
+ OpDebug debug;
+ OldClientContext context(txn, ns);
- OpDebug debug;
- OldClientContext context(txn, ns);
+ const NamespaceString requestNs(ns);
+ UpdateRequest request(requestNs);
- const NamespaceString requestNs(ns);
- UpdateRequest request(requestNs);
+ request.setQuery(id);
+ request.setUpdates(o);
+ request.setUpsert();
+ request.setFromMigration(fromMigrate);
+ UpdateLifecycleImpl updateLifecycle(true, requestNs);
+ request.setLifecycle(&updateLifecycle);
- request.setQuery(id);
- request.setUpdates(o);
- request.setUpsert();
- request.setFromMigration(fromMigrate);
- UpdateLifecycleImpl updateLifecycle(true, requestNs);
- request.setLifecycle(&updateLifecycle);
+ update(txn, context.db(), request, &debug);
+}
- update(txn, context.db(), request, &debug);
- }
+void Helpers::putSingleton(OperationContext* txn, const char* ns, BSONObj obj) {
+ OpDebug debug;
+ OldClientContext context(txn, ns);
- void Helpers::putSingleton(OperationContext* txn, const char *ns, BSONObj obj) {
- OpDebug debug;
- OldClientContext context(txn, ns);
+ const NamespaceString requestNs(ns);
+ UpdateRequest request(requestNs);
- const NamespaceString requestNs(ns);
- UpdateRequest request(requestNs);
+ request.setUpdates(obj);
+ request.setUpsert();
+ UpdateLifecycleImpl updateLifecycle(true, requestNs);
+ request.setLifecycle(&updateLifecycle);
- request.setUpdates(obj);
- request.setUpsert();
- UpdateLifecycleImpl updateLifecycle(true, requestNs);
- request.setLifecycle(&updateLifecycle);
+ update(txn, context.db(), request, &debug);
- update(txn, context.db(), request, &debug);
+ CurOp::get(txn)->done();
+}
- CurOp::get(txn)->done();
+BSONObj Helpers::toKeyFormat(const BSONObj& o) {
+ BSONObjBuilder keyObj(o.objsize());
+ BSONForEach(e, o) {
+ keyObj.appendAs(e, "");
}
+ return keyObj.obj();
+}
- BSONObj Helpers::toKeyFormat( const BSONObj& o ) {
- BSONObjBuilder keyObj( o.objsize() );
- BSONForEach( e , o ) {
- keyObj.appendAs( e , "" );
- }
- return keyObj.obj();
+BSONObj Helpers::inferKeyPattern(const BSONObj& o) {
+ BSONObjBuilder kpBuilder;
+ BSONForEach(e, o) {
+ kpBuilder.append(e.fieldName(), 1);
+ }
+ return kpBuilder.obj();
+}
+
+static bool findShardKeyIndexPattern(OperationContext* txn,
+ const string& ns,
+ const BSONObj& shardKeyPattern,
+ BSONObj* indexPattern) {
+ AutoGetCollectionForRead ctx(txn, ns);
+ Collection* collection = ctx.getCollection();
+ if (!collection) {
+ return false;
}
- BSONObj Helpers::inferKeyPattern( const BSONObj& o ) {
- BSONObjBuilder kpBuilder;
- BSONForEach( e , o ) {
- kpBuilder.append( e.fieldName() , 1 );
- }
- return kpBuilder.obj();
+ // Allow multiKey based on the invariant that shard keys must be single-valued.
+ // Therefore, any multi-key index prefixed by shard key cannot be multikey over
+ // the shard key fields.
+ const IndexDescriptor* idx =
+ collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn,
+ shardKeyPattern,
+ false); // requireSingleKey
+
+ if (idx == NULL)
+ return false;
+ *indexPattern = idx->keyPattern().getOwned();
+ return true;
+}
+
+long long Helpers::removeRange(OperationContext* txn,
+ const KeyRange& range,
+ bool maxInclusive,
+ const WriteConcernOptions& writeConcern,
+ RemoveSaver* callback,
+ bool fromMigrate,
+ bool onlyRemoveOrphanedDocs) {
+ Timer rangeRemoveTimer;
+ const string& ns = range.ns;
+
+ // The IndexChunk has a keyPattern that may apply to more than one index - we need to
+ // select the index and get the full index keyPattern here.
+ BSONObj indexKeyPatternDoc;
+ if (!findShardKeyIndexPattern(txn, ns, range.keyPattern, &indexKeyPatternDoc)) {
+ warning(LogComponent::kSharding) << "no index found to clean data over range of type "
+ << range.keyPattern << " in " << ns << endl;
+ return -1;
}
- static bool findShardKeyIndexPattern(OperationContext* txn,
- const string& ns,
- const BSONObj& shardKeyPattern,
- BSONObj* indexPattern ) {
+ KeyPattern indexKeyPattern(indexKeyPatternDoc);
- AutoGetCollectionForRead ctx(txn, ns);
- Collection* collection = ctx.getCollection();
- if (!collection) {
- return false;
- }
+ // Extend bounds to match the index we found
- // Allow multiKey based on the invariant that shard keys must be single-valued.
- // Therefore, any multi-key index prefixed by shard key cannot be multikey over
- // the shard key fields.
- const IndexDescriptor* idx =
- collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn,
- shardKeyPattern,
- false); // requireSingleKey
-
- if ( idx == NULL )
- return false;
- *indexPattern = idx->keyPattern().getOwned();
- return true;
- }
+ // Extend min to get (min, MinKey, MinKey, ....)
+ const BSONObj& min =
+ Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(range.minKey, false));
+ // If upper bound is included, extend max to get (max, MaxKey, MaxKey, ...)
+ // If not included, extend max to get (max, MinKey, MinKey, ....)
+ const BSONObj& max =
+ Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(range.maxKey, maxInclusive));
+
+ MONGO_LOG_COMPONENT(1, LogComponent::kSharding)
+ << "begin removal of " << min << " to " << max << " in " << ns
+ << " with write concern: " << writeConcern.toBSON() << endl;
+
+ long long numDeleted = 0;
- long long Helpers::removeRange( OperationContext* txn,
- const KeyRange& range,
- bool maxInclusive,
- const WriteConcernOptions& writeConcern,
- RemoveSaver* callback,
- bool fromMigrate,
- bool onlyRemoveOrphanedDocs )
- {
- Timer rangeRemoveTimer;
- const string& ns = range.ns;
-
- // The IndexChunk has a keyPattern that may apply to more than one index - we need to
- // select the index and get the full index keyPattern here.
- BSONObj indexKeyPatternDoc;
- if ( !findShardKeyIndexPattern( txn,
- ns,
- range.keyPattern,
- &indexKeyPatternDoc ) )
+ Milliseconds millisWaitingForReplication{0};
+
+ while (1) {
+ // Scoping for write lock.
{
- warning(LogComponent::kSharding) << "no index found to clean data over range of type "
- << range.keyPattern << " in " << ns << endl;
- return -1;
- }
+ OldClientWriteContext ctx(txn, ns);
+ Collection* collection = ctx.getCollection();
+ if (!collection)
+ break;
+
+ IndexDescriptor* desc =
+ collection->getIndexCatalog()->findIndexByKeyPattern(txn, indexKeyPattern.toBSON());
+
+ unique_ptr<PlanExecutor> exec(
+ InternalPlanner::indexScan(txn,
+ collection,
+ desc,
+ min,
+ max,
+ maxInclusive,
+ InternalPlanner::FORWARD,
+ InternalPlanner::IXSCAN_FETCH));
+ exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
+
+ RecordId rloc;
+ BSONObj obj;
+ PlanExecutor::ExecState state;
+ // This may yield so we cannot touch nsd after this.
+ state = exec->getNext(&obj, &rloc);
+ exec.reset();
+ if (PlanExecutor::IS_EOF == state) {
+ break;
+ }
- KeyPattern indexKeyPattern( indexKeyPatternDoc );
-
- // Extend bounds to match the index we found
-
- // Extend min to get (min, MinKey, MinKey, ....)
- const BSONObj& min =
- Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(range.minKey,
- false));
- // If upper bound is included, extend max to get (max, MaxKey, MaxKey, ...)
- // If not included, extend max to get (max, MinKey, MinKey, ....)
- const BSONObj& max =
- Helpers::toKeyFormat( indexKeyPattern.extendRangeBound(range.maxKey,maxInclusive));
-
- MONGO_LOG_COMPONENT(1, LogComponent::kSharding)
- << "begin removal of " << min << " to " << max << " in " << ns
- << " with write concern: " << writeConcern.toBSON() << endl;
-
- long long numDeleted = 0;
-
- Milliseconds millisWaitingForReplication{0};
-
- while ( 1 ) {
- // Scoping for write lock.
- {
- OldClientWriteContext ctx(txn, ns);
- Collection* collection = ctx.getCollection();
- if ( !collection )
- break;
+ if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) {
+ const std::unique_ptr<PlanStageStats> stats(exec->getStats());
+ warning(LogComponent::kSharding)
+ << PlanExecutor::statestr(state) << " - cursor error while trying to delete "
+ << min << " to " << max << " in " << ns << ": "
+ << WorkingSetCommon::toStatusString(obj)
+ << ", stats: " << Explain::statsToBSON(*stats) << endl;
+ break;
+ }
- IndexDescriptor* desc =
- collection->getIndexCatalog()->findIndexByKeyPattern( txn,
- indexKeyPattern.toBSON() );
-
- unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn, collection, desc,
- min, max,
- maxInclusive,
- InternalPlanner::FORWARD,
- InternalPlanner::IXSCAN_FETCH));
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
-
- RecordId rloc;
- BSONObj obj;
- PlanExecutor::ExecState state;
- // This may yield so we cannot touch nsd after this.
- state = exec->getNext(&obj, &rloc);
- exec.reset();
- if (PlanExecutor::IS_EOF == state) { break; }
-
- if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) {
- const std::unique_ptr<PlanStageStats> stats(exec->getStats());
- warning(LogComponent::kSharding) << PlanExecutor::statestr(state)
- << " - cursor error while trying to delete "
- << min << " to " << max
- << " in " << ns << ": "
- << WorkingSetCommon::toStatusString(obj) << ", stats: "
- << Explain::statsToBSON(*stats) << endl;
- break;
- }
+ verify(PlanExecutor::ADVANCED == state);
- verify(PlanExecutor::ADVANCED == state);
-
- WriteUnitOfWork wuow(txn);
-
- if ( onlyRemoveOrphanedDocs ) {
- // Do a final check in the write lock to make absolutely sure that our
- // collection hasn't been modified in a way that invalidates our migration
- // cleanup.
-
- // We should never be able to turn off the sharding state once enabled, but
- // in the future we might want to.
- verify(shardingState.enabled());
-
- // In write lock, so will be the most up-to-date version
- CollectionMetadataPtr metadataNow = shardingState.getCollectionMetadata( ns );
-
- bool docIsOrphan;
- if ( metadataNow ) {
- ShardKeyPattern kp( metadataNow->getKeyPattern() );
- BSONObj key = kp.extractShardKeyFromDoc(obj);
- docIsOrphan = !metadataNow->keyBelongsToMe( key )
- && !metadataNow->keyIsPending( key );
- }
- else {
- docIsOrphan = false;
- }
-
- if ( !docIsOrphan ) {
- warning(LogComponent::kSharding)
- << "aborting migration cleanup for chunk " << min << " to " << max
- << ( metadataNow ? (string) " at document " + obj.toString() : "" )
- << ", collection " << ns << " has changed " << endl;
- break;
- }
- }
+ WriteUnitOfWork wuow(txn);
- NamespaceString nss(ns);
- if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
- warning() << "stepped down from primary while deleting chunk; "
- << "orphaning data in " << ns
- << " in range [" << min << ", " << max << ")";
- return numDeleted;
- }
+ if (onlyRemoveOrphanedDocs) {
+ // Do a final check in the write lock to make absolutely sure that our
+ // collection hasn't been modified in a way that invalidates our migration
+ // cleanup.
- if ( callback )
- callback->goingToDelete( obj );
+ // We should never be able to turn off the sharding state once enabled, but
+ // in the future we might want to.
+ verify(shardingState.enabled());
- BSONObj deletedId;
- collection->deleteDocument( txn, rloc, false, false, &deletedId );
- wuow.commit();
- numDeleted++;
- }
+ // In write lock, so will be the most up-to-date version
+ CollectionMetadataPtr metadataNow = shardingState.getCollectionMetadata(ns);
- // TODO remove once the yielding below that references this timer has been removed
- Timer secondaryThrottleTime;
+ bool docIsOrphan;
+ if (metadataNow) {
+ ShardKeyPattern kp(metadataNow->getKeyPattern());
+ BSONObj key = kp.extractShardKeyFromDoc(obj);
+ docIsOrphan =
+ !metadataNow->keyBelongsToMe(key) && !metadataNow->keyIsPending(key);
+ } else {
+ docIsOrphan = false;
+ }
- if (writeConcern.shouldWaitForOtherNodes() && numDeleted > 0) {
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::getGlobalReplicationCoordinator()->awaitReplication(
- txn,
- repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(),
- writeConcern);
- if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) {
+ if (!docIsOrphan) {
warning(LogComponent::kSharding)
- << "replication to secondaries for removeRange at "
- "least 60 seconds behind";
- }
- else {
- massertStatusOK(replStatus.status);
+ << "aborting migration cleanup for chunk " << min << " to " << max
+ << (metadataNow ? (string) " at document " + obj.toString() : "")
+ << ", collection " << ns << " has changed " << endl;
+ break;
}
- millisWaitingForReplication += replStatus.duration;
}
- }
-
- if (writeConcern.shouldWaitForOtherNodes())
- log(LogComponent::kSharding)
- << "Helpers::removeRangeUnlocked time spent waiting for replication: "
- << durationCount<Milliseconds>(millisWaitingForReplication) << "ms" << endl;
-
- MONGO_LOG_COMPONENT(1, LogComponent::kSharding)
- << "end removal of " << min << " to " << max << " in " << ns
- << " (took " << rangeRemoveTimer.millis() << "ms)" << endl;
-
- return numDeleted;
- }
- const long long Helpers::kMaxDocsPerChunk( 250000 );
-
- // Used by migration clone step
- // TODO: Cannot hook up quite yet due to _trackerLocks in shared migration code.
- // TODO: This function is not used outside of tests
- Status Helpers::getLocsInRange( OperationContext* txn,
- const KeyRange& range,
- long long maxChunkSizeBytes,
- set<RecordId>* locs,
- long long* numDocs,
- long long* estChunkSizeBytes )
- {
- const string ns = range.ns;
- *estChunkSizeBytes = 0;
- *numDocs = 0;
-
- AutoGetCollectionForRead ctx(txn, ns);
-
- Collection* collection = ctx.getCollection();
- if (!collection) {
- return Status(ErrorCodes::NamespaceNotFound, ns);
- }
+ NamespaceString nss(ns);
+ if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) {
+ warning() << "stepped down from primary while deleting chunk; "
+ << "orphaning data in " << ns << " in range [" << min << ", " << max
+ << ")";
+ return numDeleted;
+ }
- // Require single key
- IndexDescriptor *idx =
- collection->getIndexCatalog()->findShardKeyPrefixedIndex( txn, range.keyPattern, true );
+ if (callback)
+ callback->goingToDelete(obj);
- if ( idx == NULL ) {
- return Status( ErrorCodes::IndexNotFound, range.keyPattern.toString() );
+ BSONObj deletedId;
+ collection->deleteDocument(txn, rloc, false, false, &deletedId);
+ wuow.commit();
+ numDeleted++;
}
- // use the average object size to estimate how many objects a full chunk would carry
- // do that while traversing the chunk's range using the sharding index, below
- // there's a fair amount of slack before we determine a chunk is too large because object
- // sizes will vary
- long long avgDocsWhenFull;
- long long avgDocSizeBytes;
- const long long totalDocsInNS = collection->numRecords( txn );
- if ( totalDocsInNS > 0 ) {
- // TODO: Figure out what's up here
- avgDocSizeBytes = collection->dataSize( txn ) / totalDocsInNS;
- avgDocsWhenFull = maxChunkSizeBytes / avgDocSizeBytes;
- avgDocsWhenFull = std::min( kMaxDocsPerChunk + 1,
- 130 * avgDocsWhenFull / 100 /* slack */);
- }
- else {
- avgDocSizeBytes = 0;
- avgDocsWhenFull = kMaxDocsPerChunk + 1;
+ // TODO remove once the yielding below that references this timer has been removed
+ Timer secondaryThrottleTime;
+
+ if (writeConcern.shouldWaitForOtherNodes() && numDeleted > 0) {
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::getGlobalReplicationCoordinator()->awaitReplication(
+ txn,
+ repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(),
+ writeConcern);
+ if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) {
+ warning(LogComponent::kSharding) << "replication to secondaries for removeRange at "
+ "least 60 seconds behind";
+ } else {
+ massertStatusOK(replStatus.status);
+ }
+ millisWaitingForReplication += replStatus.duration;
}
+ }
- // Assume both min and max non-empty, append MinKey's to make them fit chosen index
- KeyPattern idxKeyPattern( idx->keyPattern() );
- BSONObj min = Helpers::toKeyFormat( idxKeyPattern.extendRangeBound( range.minKey, false ) );
- BSONObj max = Helpers::toKeyFormat( idxKeyPattern.extendRangeBound( range.maxKey, false ) );
+ if (writeConcern.shouldWaitForOtherNodes())
+ log(LogComponent::kSharding)
+ << "Helpers::removeRangeUnlocked time spent waiting for replication: "
+ << durationCount<Milliseconds>(millisWaitingForReplication) << "ms" << endl;
+
+ MONGO_LOG_COMPONENT(1, LogComponent::kSharding) << "end removal of " << min << " to " << max
+ << " in " << ns << " (took "
+ << rangeRemoveTimer.millis() << "ms)" << endl;
+
+ return numDeleted;
+}
+
+const long long Helpers::kMaxDocsPerChunk(250000);
+
+// Used by migration clone step
+// TODO: Cannot hook up quite yet due to _trackerLocks in shared migration code.
+// TODO: This function is not used outside of tests
+Status Helpers::getLocsInRange(OperationContext* txn,
+ const KeyRange& range,
+ long long maxChunkSizeBytes,
+ set<RecordId>* locs,
+ long long* numDocs,
+ long long* estChunkSizeBytes) {
+ const string ns = range.ns;
+ *estChunkSizeBytes = 0;
+ *numDocs = 0;
+
+ AutoGetCollectionForRead ctx(txn, ns);
+
+ Collection* collection = ctx.getCollection();
+ if (!collection) {
+ return Status(ErrorCodes::NamespaceNotFound, ns);
+ }
+ // Require single key
+ IndexDescriptor* idx =
+ collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, range.keyPattern, true);
- // do a full traversal of the chunk and don't stop even if we think it is a large chunk
- // we want the number of records to better report, in that case
- bool isLargeChunk = false;
- long long docCount = 0;
+ if (idx == NULL) {
+ return Status(ErrorCodes::IndexNotFound, range.keyPattern.toString());
+ }
- unique_ptr<PlanExecutor> exec(
- InternalPlanner::indexScan(txn, collection, idx, min, max, false));
- // we can afford to yield here because any change to the base data that we might miss is
- // already being queued and will be migrated in the 'transferMods' stage
- exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
+ // use the average object size to estimate how many objects a full chunk would carry
+ // do that while traversing the chunk's range using the sharding index, below
+ // there's a fair amount of slack before we determine a chunk is too large because object
+ // sizes will vary
+ long long avgDocsWhenFull;
+ long long avgDocSizeBytes;
+ const long long totalDocsInNS = collection->numRecords(txn);
+ if (totalDocsInNS > 0) {
+ // TODO: Figure out what's up here
+ avgDocSizeBytes = collection->dataSize(txn) / totalDocsInNS;
+ avgDocsWhenFull = maxChunkSizeBytes / avgDocSizeBytes;
+ avgDocsWhenFull = std::min(kMaxDocsPerChunk + 1, 130 * avgDocsWhenFull / 100 /* slack */);
+ } else {
+ avgDocSizeBytes = 0;
+ avgDocsWhenFull = kMaxDocsPerChunk + 1;
+ }
- RecordId loc;
- PlanExecutor::ExecState state;
- while (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &loc))) {
- if ( !isLargeChunk ) {
- locs->insert( loc );
- }
+ // Assume both min and max non-empty, append MinKey's to make them fit chosen index
+ KeyPattern idxKeyPattern(idx->keyPattern());
+ BSONObj min = Helpers::toKeyFormat(idxKeyPattern.extendRangeBound(range.minKey, false));
+ BSONObj max = Helpers::toKeyFormat(idxKeyPattern.extendRangeBound(range.maxKey, false));
- if ( ++docCount > avgDocsWhenFull ) {
- isLargeChunk = true;
- }
- }
- *numDocs = docCount;
- *estChunkSizeBytes = docCount * avgDocSizeBytes;
+ // do a full traversal of the chunk and don't stop even if we think it is a large chunk
+ // we want the number of records to better report, in that case
+ bool isLargeChunk = false;
+ long long docCount = 0;
- if ( isLargeChunk ) {
- stringstream ss;
- ss << estChunkSizeBytes;
- return Status( ErrorCodes::InvalidLength, ss.str() );
+ unique_ptr<PlanExecutor> exec(
+ InternalPlanner::indexScan(txn, collection, idx, min, max, false));
+ // we can afford to yield here because any change to the base data that we might miss is
+ // already being queued and will be migrated in the 'transferMods' stage
+ exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
+
+ RecordId loc;
+ PlanExecutor::ExecState state;
+ while (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &loc))) {
+ if (!isLargeChunk) {
+ locs->insert(loc);
}
- return Status::OK();
+ if (++docCount > avgDocsWhenFull) {
+ isLargeChunk = true;
+ }
}
+ *numDocs = docCount;
+ *estChunkSizeBytes = docCount * avgDocSizeBytes;
- void Helpers::emptyCollection(OperationContext* txn, const char *ns) {
- OldClientContext context(txn, ns);
- bool shouldReplicateWrites = txn->writesAreReplicated();
- txn->setReplicatedWrites(false);
- ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, txn, shouldReplicateWrites);
- deleteObjects(txn, context.db(), ns, BSONObj(), PlanExecutor::YIELD_MANUAL, false);
+ if (isLargeChunk) {
+ stringstream ss;
+ ss << estChunkSizeBytes;
+ return Status(ErrorCodes::InvalidLength, ss.str());
}
- Helpers::RemoveSaver::RemoveSaver( const string& a , const string& b , const string& why)
- : _out(0) {
- static int NUM = 0;
+ return Status::OK();
+}
- _root = storageGlobalParams.dbpath;
- if ( a.size() )
- _root /= a;
- if ( b.size() )
- _root /= b;
- verify( a.size() || b.size() );
- _file = _root;
+void Helpers::emptyCollection(OperationContext* txn, const char* ns) {
+ OldClientContext context(txn, ns);
+ bool shouldReplicateWrites = txn->writesAreReplicated();
+ txn->setReplicatedWrites(false);
+ ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, txn, shouldReplicateWrites);
+ deleteObjects(txn, context.db(), ns, BSONObj(), PlanExecutor::YIELD_MANUAL, false);
+}
- stringstream ss;
- ss << why << "." << terseCurrentTime(false) << "." << NUM++ << ".bson";
- _file /= ss.str();
- }
+Helpers::RemoveSaver::RemoveSaver(const string& a, const string& b, const string& why) : _out(0) {
+ static int NUM = 0;
- Helpers::RemoveSaver::~RemoveSaver() {
- if ( _out ) {
- _out->close();
- delete _out;
- _out = 0;
- }
- }
+ _root = storageGlobalParams.dbpath;
+ if (a.size())
+ _root /= a;
+ if (b.size())
+ _root /= b;
+ verify(a.size() || b.size());
- void Helpers::RemoveSaver::goingToDelete( const BSONObj& o ) {
- if ( ! _out ) {
- boost::filesystem::create_directories( _root );
- _out = new ofstream();
- _out->open( _file.string().c_str() , ios_base::out | ios_base::binary );
- if ( ! _out->good() ) {
- error() << "couldn't create file: " << _file.string() <<
- " for remove saving" << endl;
- delete _out;
- _out = 0;
- return;
- }
+ _file = _root;
+ stringstream ss;
+ ss << why << "." << terseCurrentTime(false) << "." << NUM++ << ".bson";
+ _file /= ss.str();
+}
+
+Helpers::RemoveSaver::~RemoveSaver() {
+ if (_out) {
+ _out->close();
+ delete _out;
+ _out = 0;
+ }
+}
+
+void Helpers::RemoveSaver::goingToDelete(const BSONObj& o) {
+ if (!_out) {
+ boost::filesystem::create_directories(_root);
+ _out = new ofstream();
+ _out->open(_file.string().c_str(), ios_base::out | ios_base::binary);
+ if (!_out->good()) {
+ error() << "couldn't create file: " << _file.string() << " for remove saving" << endl;
+ delete _out;
+ _out = 0;
+ return;
}
- _out->write( o.objdata() , o.objsize() );
}
+ _out->write(o.objdata(), o.objsize());
+}
-} // namespace mongo
+} // namespace mongo
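
For readers skimming the reformatted helpers, Helpers::toKeyFormat and Helpers::inferKeyPattern are pure BSON transforms. A minimal sketch of what they return follows; the sample document, function name, and include paths are assumptions for illustration, not part of the commit:

    // Sketch only: shows the shape of the output of the two key helpers.
    #include "mongo/bson/bsonmisc.h"  // assumed location of the BSON() macro
    #include "mongo/db/dbhelpers.h"

    using namespace mongo;

    void keyHelperExample() {
        BSONObj doc = BSON("a" << 5 << "b"
                               << "x");

        // toKeyFormat strips field names, yielding index-key form:
        //   { "": 5, "": "x" }
        BSONObj key = Helpers::toKeyFormat(doc);

        // inferKeyPattern maps every field to an ascending-index entry:
        //   { a: 1, b: 1 }
        BSONObj pattern = Helpers::inferKeyPattern(doc);
    }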
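The bound extension in Helpers::removeRange is easier to follow with concrete values. A hedged sketch, assuming a shard key { a: 1 } served by a compound index { a: 1, b: 1 } (both invented here, as are the bound values):

    // Sketch only: how removeRange builds its index-scan bounds.
    #include "mongo/bson/bsonmisc.h"
    #include "mongo/db/dbhelpers.h"
    #include "mongo/db/keypattern.h"

    using namespace mongo;

    void boundExtensionExample() {
        KeyPattern indexKeyPattern(BSON("a" << 1 << "b" << 1));

        // Lower bound: extendRangeBound({ a: 10 }, false) pads the trailing
        // field with MinKey -> { a: 10, b: MinKey }; toKeyFormat strips the
        // names -> { "": 10, "": MinKey }.
        BSONObj min = Helpers::toKeyFormat(
            indexKeyPattern.extendRangeBound(BSON("a" << 10), false));

        // Upper bound: passing true pads with MaxKey so a == 20 stays in
        // range; passing false pads with MinKey so a == 20 is excluded.
        BSONObj max = Helpers::toKeyFormat(
            indexKeyPattern.extendRangeBound(BSON("a" << 20), true));
    }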
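The chunk-size heuristic in Helpers::getLocsInRange reduces to a few integer operations; here is a worked example with made-up collection statistics (only kMaxDocsPerChunk = 250000 comes from the file):

    // Sketch only: the avgDocsWhenFull estimate with invented numbers.
    #include <algorithm>

    void chunkEstimateExample() {
        const long long maxChunkSizeBytes = 64LL * 1024 * 1024;  // 64 MB target
        const long long totalDocsInNS = 1000000;                 // numRecords()
        const long long dataSizeBytes = 512LL * 1024 * 1024;     // dataSize()

        long long avgDocSizeBytes = dataSizeBytes / totalDocsInNS;        // 536
        long long avgDocsWhenFull = maxChunkSizeBytes / avgDocSizeBytes;  // 125203
        // 30% slack, capped just past kMaxDocsPerChunk:
        avgDocsWhenFull = std::min(250000LL + 1, 130 * avgDocsWhenFull / 100);  // 162763

        // The scan marks the chunk as too large once docCount exceeds
        // avgDocsWhenFull and reports estChunkSizeBytes = docCount * avgDocSizeBytes.
    }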