diff options
author | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 00:22:50 -0400 |
---|---|---|
committer | Mark Benvenuto <mark.benvenuto@mongodb.com> | 2015-06-20 10:56:02 -0400 |
commit | 9c2ed42daa8fbbef4a919c21ec564e2db55e8d60 (patch) | |
tree | 3814f79c10d7b490948d8cb7b112ac1dd41ceff1 /src/mongo/db/dbhelpers.cpp | |
parent | 01965cf52bce6976637ecb8f4a622aeb05ab256a (diff) | |
download | mongo-9c2ed42daa8fbbef4a919c21ec564e2db55e8d60.tar.gz |
SERVER-18579: Clang-Format - reformat code, no comment reflow
Diffstat (limited to 'src/mongo/db/dbhelpers.cpp')
-rw-r--r-- | src/mongo/db/dbhelpers.cpp | 927 |
1 files changed, 452 insertions, 475 deletions
diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index ae5444b8c1a..e5e198d7ff0 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -68,558 +68,535 @@ namespace mongo { - using std::unique_ptr; - using std::endl; - using std::ios_base; - using std::ofstream; - using std::set; - using std::string; - using std::stringstream; - - using logger::LogComponent; - - void Helpers::ensureIndex(OperationContext* txn, - Collection* collection, - BSONObj keyPattern, - bool unique, - const char *name) { - BSONObjBuilder b; - b.append("name", name); - b.append("ns", collection->ns()); - b.append("key", keyPattern); - b.appendBool("unique", unique); - BSONObj o = b.done(); - - MultiIndexBlock indexer(txn, collection); - - Status status = indexer.init(o); - if ( status.code() == ErrorCodes::IndexAlreadyExists ) - return; - uassertStatusOK( status ); - - uassertStatusOK(indexer.insertAllDocumentsInCollection()); - - WriteUnitOfWork wunit(txn); - indexer.commit(); - wunit.commit(); - } +using std::unique_ptr; +using std::endl; +using std::ios_base; +using std::ofstream; +using std::set; +using std::string; +using std::stringstream; + +using logger::LogComponent; + +void Helpers::ensureIndex(OperationContext* txn, + Collection* collection, + BSONObj keyPattern, + bool unique, + const char* name) { + BSONObjBuilder b; + b.append("name", name); + b.append("ns", collection->ns()); + b.append("key", keyPattern); + b.appendBool("unique", unique); + BSONObj o = b.done(); + + MultiIndexBlock indexer(txn, collection); + + Status status = indexer.init(o); + if (status.code() == ErrorCodes::IndexAlreadyExists) + return; + uassertStatusOK(status); + + uassertStatusOK(indexer.insertAllDocumentsInCollection()); + + WriteUnitOfWork wunit(txn); + indexer.commit(); + wunit.commit(); +} + +/* fetch a single object from collection ns that matches query + set your db SavedContext first +*/ +bool Helpers::findOne(OperationContext* txn, + Collection* collection, + const BSONObj& query, + BSONObj& result, + bool requireIndex) { + RecordId loc = findOne(txn, collection, query, requireIndex); + if (loc.isNull()) + return false; + result = collection->docFor(txn, loc).value(); + return true; +} - /* fetch a single object from collection ns that matches query - set your db SavedContext first - */ - bool Helpers::findOne(OperationContext* txn, - Collection* collection, - const BSONObj &query, - BSONObj& result, +/* fetch a single object from collection ns that matches query + set your db SavedContext first +*/ +RecordId Helpers::findOne(OperationContext* txn, + Collection* collection, + const BSONObj& query, bool requireIndex) { - RecordId loc = findOne( txn, collection, query, requireIndex ); - if ( loc.isNull() ) - return false; - result = collection->docFor(txn, loc).value(); - return true; - } - - /* fetch a single object from collection ns that matches query - set your db SavedContext first - */ - RecordId Helpers::findOne(OperationContext* txn, - Collection* collection, - const BSONObj &query, - bool requireIndex) { - if ( !collection ) - return RecordId(); + if (!collection) + return RecordId(); - CanonicalQuery* cq; - const WhereCallbackReal whereCallback(txn, collection->ns().db()); + CanonicalQuery* cq; + const WhereCallbackReal whereCallback(txn, collection->ns().db()); - massert(17244, "Could not canonicalize " + query.toString(), + massert(17244, + "Could not canonicalize " + query.toString(), CanonicalQuery::canonicalize(collection->ns(), query, &cq, whereCallback).isOK()); - PlanExecutor* rawExec; - size_t options = requireIndex ? QueryPlannerParams::NO_TABLE_SCAN : QueryPlannerParams::DEFAULT; - massert(17245, "Could not get executor for query " + query.toString(), - getExecutor(txn, - collection, - cq, - PlanExecutor::YIELD_MANUAL, - &rawExec, - options).isOK()); - - unique_ptr<PlanExecutor> exec(rawExec); - PlanExecutor::ExecState state; - RecordId loc; - if (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &loc))) { - return loc; - } - return RecordId(); + PlanExecutor* rawExec; + size_t options = requireIndex ? QueryPlannerParams::NO_TABLE_SCAN : QueryPlannerParams::DEFAULT; + massert(17245, + "Could not get executor for query " + query.toString(), + getExecutor(txn, collection, cq, PlanExecutor::YIELD_MANUAL, &rawExec, options).isOK()); + + unique_ptr<PlanExecutor> exec(rawExec); + PlanExecutor::ExecState state; + RecordId loc; + if (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &loc))) { + return loc; + } + return RecordId(); +} + +bool Helpers::findById(OperationContext* txn, + Database* database, + const char* ns, + BSONObj query, + BSONObj& result, + bool* nsFound, + bool* indexFound) { + invariant(database); + + Collection* collection = database->getCollection(ns); + if (!collection) { + return false; } - bool Helpers::findById(OperationContext* txn, - Database* database, - const char *ns, - BSONObj query, - BSONObj& result, - bool* nsFound, - bool* indexFound) { - - invariant(database); - - Collection* collection = database->getCollection( ns ); - if ( !collection ) { - return false; - } - - if ( nsFound ) - *nsFound = true; + if (nsFound) + *nsFound = true; - IndexCatalog* catalog = collection->getIndexCatalog(); - const IndexDescriptor* desc = catalog->findIdIndex( txn ); + IndexCatalog* catalog = collection->getIndexCatalog(); + const IndexDescriptor* desc = catalog->findIdIndex(txn); - if ( !desc ) - return false; + if (!desc) + return false; - if ( indexFound ) - *indexFound = 1; + if (indexFound) + *indexFound = 1; - RecordId loc = catalog->getIndex(desc)->findSingle( txn, query["_id"].wrap() ); - if ( loc.isNull() ) - return false; - result = collection->docFor(txn, loc).value(); + RecordId loc = catalog->getIndex(desc)->findSingle(txn, query["_id"].wrap()); + if (loc.isNull()) + return false; + result = collection->docFor(txn, loc).value(); + return true; +} + +RecordId Helpers::findById(OperationContext* txn, Collection* collection, const BSONObj& idquery) { + verify(collection); + IndexCatalog* catalog = collection->getIndexCatalog(); + const IndexDescriptor* desc = catalog->findIdIndex(txn); + uassert(13430, "no _id index", desc); + return catalog->getIndex(desc)->findSingle(txn, idquery["_id"].wrap()); +} + +bool Helpers::getSingleton(OperationContext* txn, const char* ns, BSONObj& result) { + AutoGetCollectionForRead ctx(txn, ns); + unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(txn, ns, ctx.getCollection())); + PlanExecutor::ExecState state = exec->getNext(&result, NULL); + + CurOp::get(txn)->done(); + + if (PlanExecutor::ADVANCED == state) { + result = result.getOwned(); return true; } + return false; +} - RecordId Helpers::findById(OperationContext* txn, - Collection* collection, - const BSONObj& idquery) { - verify(collection); - IndexCatalog* catalog = collection->getIndexCatalog(); - const IndexDescriptor* desc = catalog->findIdIndex( txn ); - uassert(13430, "no _id index", desc); - return catalog->getIndex(desc)->findSingle( txn, idquery["_id"].wrap() ); - } - - bool Helpers::getSingleton(OperationContext* txn, const char *ns, BSONObj& result) { - AutoGetCollectionForRead ctx(txn, ns); - unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(txn, ns, ctx.getCollection())); - PlanExecutor::ExecState state = exec->getNext(&result, NULL); - - CurOp::get(txn)->done(); +bool Helpers::getLast(OperationContext* txn, const char* ns, BSONObj& result) { + AutoGetCollectionForRead autoColl(txn, ns); + unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan( + txn, ns, autoColl.getCollection(), InternalPlanner::BACKWARD)); + PlanExecutor::ExecState state = exec->getNext(&result, NULL); - if (PlanExecutor::ADVANCED == state) { - result = result.getOwned(); - return true; - } - return false; + if (PlanExecutor::ADVANCED == state) { + result = result.getOwned(); + return true; } + return false; +} - bool Helpers::getLast(OperationContext* txn, const char *ns, BSONObj& result) { - AutoGetCollectionForRead autoColl(txn, ns); - unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(txn, - ns, - autoColl.getCollection(), - InternalPlanner::BACKWARD)); - PlanExecutor::ExecState state = exec->getNext(&result, NULL); - - if (PlanExecutor::ADVANCED == state) { - result = result.getOwned(); - return true; - } - return false; - } +void Helpers::upsert(OperationContext* txn, const string& ns, const BSONObj& o, bool fromMigrate) { + BSONElement e = o["_id"]; + verify(e.type()); + BSONObj id = e.wrap(); - void Helpers::upsert( OperationContext* txn, - const string& ns, - const BSONObj& o, - bool fromMigrate ) { - BSONElement e = o["_id"]; - verify( e.type() ); - BSONObj id = e.wrap(); + OpDebug debug; + OldClientContext context(txn, ns); - OpDebug debug; - OldClientContext context(txn, ns); + const NamespaceString requestNs(ns); + UpdateRequest request(requestNs); - const NamespaceString requestNs(ns); - UpdateRequest request(requestNs); + request.setQuery(id); + request.setUpdates(o); + request.setUpsert(); + request.setFromMigration(fromMigrate); + UpdateLifecycleImpl updateLifecycle(true, requestNs); + request.setLifecycle(&updateLifecycle); - request.setQuery(id); - request.setUpdates(o); - request.setUpsert(); - request.setFromMigration(fromMigrate); - UpdateLifecycleImpl updateLifecycle(true, requestNs); - request.setLifecycle(&updateLifecycle); + update(txn, context.db(), request, &debug); +} - update(txn, context.db(), request, &debug); - } +void Helpers::putSingleton(OperationContext* txn, const char* ns, BSONObj obj) { + OpDebug debug; + OldClientContext context(txn, ns); - void Helpers::putSingleton(OperationContext* txn, const char *ns, BSONObj obj) { - OpDebug debug; - OldClientContext context(txn, ns); + const NamespaceString requestNs(ns); + UpdateRequest request(requestNs); - const NamespaceString requestNs(ns); - UpdateRequest request(requestNs); + request.setUpdates(obj); + request.setUpsert(); + UpdateLifecycleImpl updateLifecycle(true, requestNs); + request.setLifecycle(&updateLifecycle); - request.setUpdates(obj); - request.setUpsert(); - UpdateLifecycleImpl updateLifecycle(true, requestNs); - request.setLifecycle(&updateLifecycle); + update(txn, context.db(), request, &debug); - update(txn, context.db(), request, &debug); + CurOp::get(txn)->done(); +} - CurOp::get(txn)->done(); +BSONObj Helpers::toKeyFormat(const BSONObj& o) { + BSONObjBuilder keyObj(o.objsize()); + BSONForEach(e, o) { + keyObj.appendAs(e, ""); } + return keyObj.obj(); +} - BSONObj Helpers::toKeyFormat( const BSONObj& o ) { - BSONObjBuilder keyObj( o.objsize() ); - BSONForEach( e , o ) { - keyObj.appendAs( e , "" ); - } - return keyObj.obj(); +BSONObj Helpers::inferKeyPattern(const BSONObj& o) { + BSONObjBuilder kpBuilder; + BSONForEach(e, o) { + kpBuilder.append(e.fieldName(), 1); + } + return kpBuilder.obj(); +} + +static bool findShardKeyIndexPattern(OperationContext* txn, + const string& ns, + const BSONObj& shardKeyPattern, + BSONObj* indexPattern) { + AutoGetCollectionForRead ctx(txn, ns); + Collection* collection = ctx.getCollection(); + if (!collection) { + return false; } - BSONObj Helpers::inferKeyPattern( const BSONObj& o ) { - BSONObjBuilder kpBuilder; - BSONForEach( e , o ) { - kpBuilder.append( e.fieldName() , 1 ); - } - return kpBuilder.obj(); + // Allow multiKey based on the invariant that shard keys must be single-valued. + // Therefore, any multi-key index prefixed by shard key cannot be multikey over + // the shard key fields. + const IndexDescriptor* idx = + collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, + shardKeyPattern, + false); // requireSingleKey + + if (idx == NULL) + return false; + *indexPattern = idx->keyPattern().getOwned(); + return true; +} + +long long Helpers::removeRange(OperationContext* txn, + const KeyRange& range, + bool maxInclusive, + const WriteConcernOptions& writeConcern, + RemoveSaver* callback, + bool fromMigrate, + bool onlyRemoveOrphanedDocs) { + Timer rangeRemoveTimer; + const string& ns = range.ns; + + // The IndexChunk has a keyPattern that may apply to more than one index - we need to + // select the index and get the full index keyPattern here. + BSONObj indexKeyPatternDoc; + if (!findShardKeyIndexPattern(txn, ns, range.keyPattern, &indexKeyPatternDoc)) { + warning(LogComponent::kSharding) << "no index found to clean data over range of type " + << range.keyPattern << " in " << ns << endl; + return -1; } - static bool findShardKeyIndexPattern(OperationContext* txn, - const string& ns, - const BSONObj& shardKeyPattern, - BSONObj* indexPattern ) { + KeyPattern indexKeyPattern(indexKeyPatternDoc); - AutoGetCollectionForRead ctx(txn, ns); - Collection* collection = ctx.getCollection(); - if (!collection) { - return false; - } + // Extend bounds to match the index we found - // Allow multiKey based on the invariant that shard keys must be single-valued. - // Therefore, any multi-key index prefixed by shard key cannot be multikey over - // the shard key fields. - const IndexDescriptor* idx = - collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, - shardKeyPattern, - false); // requireSingleKey - - if ( idx == NULL ) - return false; - *indexPattern = idx->keyPattern().getOwned(); - return true; - } + // Extend min to get (min, MinKey, MinKey, ....) + const BSONObj& min = + Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(range.minKey, false)); + // If upper bound is included, extend max to get (max, MaxKey, MaxKey, ...) + // If not included, extend max to get (max, MinKey, MinKey, ....) + const BSONObj& max = + Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(range.maxKey, maxInclusive)); + + MONGO_LOG_COMPONENT(1, LogComponent::kSharding) + << "begin removal of " << min << " to " << max << " in " << ns + << " with write concern: " << writeConcern.toBSON() << endl; + + long long numDeleted = 0; - long long Helpers::removeRange( OperationContext* txn, - const KeyRange& range, - bool maxInclusive, - const WriteConcernOptions& writeConcern, - RemoveSaver* callback, - bool fromMigrate, - bool onlyRemoveOrphanedDocs ) - { - Timer rangeRemoveTimer; - const string& ns = range.ns; - - // The IndexChunk has a keyPattern that may apply to more than one index - we need to - // select the index and get the full index keyPattern here. - BSONObj indexKeyPatternDoc; - if ( !findShardKeyIndexPattern( txn, - ns, - range.keyPattern, - &indexKeyPatternDoc ) ) + Milliseconds millisWaitingForReplication{0}; + + while (1) { + // Scoping for write lock. { - warning(LogComponent::kSharding) << "no index found to clean data over range of type " - << range.keyPattern << " in " << ns << endl; - return -1; - } + OldClientWriteContext ctx(txn, ns); + Collection* collection = ctx.getCollection(); + if (!collection) + break; + + IndexDescriptor* desc = + collection->getIndexCatalog()->findIndexByKeyPattern(txn, indexKeyPattern.toBSON()); + + unique_ptr<PlanExecutor> exec( + InternalPlanner::indexScan(txn, + collection, + desc, + min, + max, + maxInclusive, + InternalPlanner::FORWARD, + InternalPlanner::IXSCAN_FETCH)); + exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); + + RecordId rloc; + BSONObj obj; + PlanExecutor::ExecState state; + // This may yield so we cannot touch nsd after this. + state = exec->getNext(&obj, &rloc); + exec.reset(); + if (PlanExecutor::IS_EOF == state) { + break; + } - KeyPattern indexKeyPattern( indexKeyPatternDoc ); - - // Extend bounds to match the index we found - - // Extend min to get (min, MinKey, MinKey, ....) - const BSONObj& min = - Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(range.minKey, - false)); - // If upper bound is included, extend max to get (max, MaxKey, MaxKey, ...) - // If not included, extend max to get (max, MinKey, MinKey, ....) - const BSONObj& max = - Helpers::toKeyFormat( indexKeyPattern.extendRangeBound(range.maxKey,maxInclusive)); - - MONGO_LOG_COMPONENT(1, LogComponent::kSharding) - << "begin removal of " << min << " to " << max << " in " << ns - << " with write concern: " << writeConcern.toBSON() << endl; - - long long numDeleted = 0; - - Milliseconds millisWaitingForReplication{0}; - - while ( 1 ) { - // Scoping for write lock. - { - OldClientWriteContext ctx(txn, ns); - Collection* collection = ctx.getCollection(); - if ( !collection ) - break; + if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { + const std::unique_ptr<PlanStageStats> stats(exec->getStats()); + warning(LogComponent::kSharding) + << PlanExecutor::statestr(state) << " - cursor error while trying to delete " + << min << " to " << max << " in " << ns << ": " + << WorkingSetCommon::toStatusString(obj) + << ", stats: " << Explain::statsToBSON(*stats) << endl; + break; + } - IndexDescriptor* desc = - collection->getIndexCatalog()->findIndexByKeyPattern( txn, - indexKeyPattern.toBSON() ); - - unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn, collection, desc, - min, max, - maxInclusive, - InternalPlanner::FORWARD, - InternalPlanner::IXSCAN_FETCH)); - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); - - RecordId rloc; - BSONObj obj; - PlanExecutor::ExecState state; - // This may yield so we cannot touch nsd after this. - state = exec->getNext(&obj, &rloc); - exec.reset(); - if (PlanExecutor::IS_EOF == state) { break; } - - if (PlanExecutor::FAILURE == state || PlanExecutor::DEAD == state) { - const std::unique_ptr<PlanStageStats> stats(exec->getStats()); - warning(LogComponent::kSharding) << PlanExecutor::statestr(state) - << " - cursor error while trying to delete " - << min << " to " << max - << " in " << ns << ": " - << WorkingSetCommon::toStatusString(obj) << ", stats: " - << Explain::statsToBSON(*stats) << endl; - break; - } + verify(PlanExecutor::ADVANCED == state); - verify(PlanExecutor::ADVANCED == state); - - WriteUnitOfWork wuow(txn); - - if ( onlyRemoveOrphanedDocs ) { - // Do a final check in the write lock to make absolutely sure that our - // collection hasn't been modified in a way that invalidates our migration - // cleanup. - - // We should never be able to turn off the sharding state once enabled, but - // in the future we might want to. - verify(shardingState.enabled()); - - // In write lock, so will be the most up-to-date version - CollectionMetadataPtr metadataNow = shardingState.getCollectionMetadata( ns ); - - bool docIsOrphan; - if ( metadataNow ) { - ShardKeyPattern kp( metadataNow->getKeyPattern() ); - BSONObj key = kp.extractShardKeyFromDoc(obj); - docIsOrphan = !metadataNow->keyBelongsToMe( key ) - && !metadataNow->keyIsPending( key ); - } - else { - docIsOrphan = false; - } - - if ( !docIsOrphan ) { - warning(LogComponent::kSharding) - << "aborting migration cleanup for chunk " << min << " to " << max - << ( metadataNow ? (string) " at document " + obj.toString() : "" ) - << ", collection " << ns << " has changed " << endl; - break; - } - } + WriteUnitOfWork wuow(txn); - NamespaceString nss(ns); - if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { - warning() << "stepped down from primary while deleting chunk; " - << "orphaning data in " << ns - << " in range [" << min << ", " << max << ")"; - return numDeleted; - } + if (onlyRemoveOrphanedDocs) { + // Do a final check in the write lock to make absolutely sure that our + // collection hasn't been modified in a way that invalidates our migration + // cleanup. - if ( callback ) - callback->goingToDelete( obj ); + // We should never be able to turn off the sharding state once enabled, but + // in the future we might want to. + verify(shardingState.enabled()); - BSONObj deletedId; - collection->deleteDocument( txn, rloc, false, false, &deletedId ); - wuow.commit(); - numDeleted++; - } + // In write lock, so will be the most up-to-date version + CollectionMetadataPtr metadataNow = shardingState.getCollectionMetadata(ns); - // TODO remove once the yielding below that references this timer has been removed - Timer secondaryThrottleTime; + bool docIsOrphan; + if (metadataNow) { + ShardKeyPattern kp(metadataNow->getKeyPattern()); + BSONObj key = kp.extractShardKeyFromDoc(obj); + docIsOrphan = + !metadataNow->keyBelongsToMe(key) && !metadataNow->keyIsPending(key); + } else { + docIsOrphan = false; + } - if (writeConcern.shouldWaitForOtherNodes() && numDeleted > 0) { - repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::getGlobalReplicationCoordinator()->awaitReplication( - txn, - repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(), - writeConcern); - if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) { + if (!docIsOrphan) { warning(LogComponent::kSharding) - << "replication to secondaries for removeRange at " - "least 60 seconds behind"; - } - else { - massertStatusOK(replStatus.status); + << "aborting migration cleanup for chunk " << min << " to " << max + << (metadataNow ? (string) " at document " + obj.toString() : "") + << ", collection " << ns << " has changed " << endl; + break; } - millisWaitingForReplication += replStatus.duration; } - } - - if (writeConcern.shouldWaitForOtherNodes()) - log(LogComponent::kSharding) - << "Helpers::removeRangeUnlocked time spent waiting for replication: " - << durationCount<Milliseconds>(millisWaitingForReplication) << "ms" << endl; - - MONGO_LOG_COMPONENT(1, LogComponent::kSharding) - << "end removal of " << min << " to " << max << " in " << ns - << " (took " << rangeRemoveTimer.millis() << "ms)" << endl; - - return numDeleted; - } - const long long Helpers::kMaxDocsPerChunk( 250000 ); - - // Used by migration clone step - // TODO: Cannot hook up quite yet due to _trackerLocks in shared migration code. - // TODO: This function is not used outside of tests - Status Helpers::getLocsInRange( OperationContext* txn, - const KeyRange& range, - long long maxChunkSizeBytes, - set<RecordId>* locs, - long long* numDocs, - long long* estChunkSizeBytes ) - { - const string ns = range.ns; - *estChunkSizeBytes = 0; - *numDocs = 0; - - AutoGetCollectionForRead ctx(txn, ns); - - Collection* collection = ctx.getCollection(); - if (!collection) { - return Status(ErrorCodes::NamespaceNotFound, ns); - } + NamespaceString nss(ns); + if (!repl::getGlobalReplicationCoordinator()->canAcceptWritesFor(nss)) { + warning() << "stepped down from primary while deleting chunk; " + << "orphaning data in " << ns << " in range [" << min << ", " << max + << ")"; + return numDeleted; + } - // Require single key - IndexDescriptor *idx = - collection->getIndexCatalog()->findShardKeyPrefixedIndex( txn, range.keyPattern, true ); + if (callback) + callback->goingToDelete(obj); - if ( idx == NULL ) { - return Status( ErrorCodes::IndexNotFound, range.keyPattern.toString() ); + BSONObj deletedId; + collection->deleteDocument(txn, rloc, false, false, &deletedId); + wuow.commit(); + numDeleted++; } - // use the average object size to estimate how many objects a full chunk would carry - // do that while traversing the chunk's range using the sharding index, below - // there's a fair amount of slack before we determine a chunk is too large because object - // sizes will vary - long long avgDocsWhenFull; - long long avgDocSizeBytes; - const long long totalDocsInNS = collection->numRecords( txn ); - if ( totalDocsInNS > 0 ) { - // TODO: Figure out what's up here - avgDocSizeBytes = collection->dataSize( txn ) / totalDocsInNS; - avgDocsWhenFull = maxChunkSizeBytes / avgDocSizeBytes; - avgDocsWhenFull = std::min( kMaxDocsPerChunk + 1, - 130 * avgDocsWhenFull / 100 /* slack */); - } - else { - avgDocSizeBytes = 0; - avgDocsWhenFull = kMaxDocsPerChunk + 1; + // TODO remove once the yielding below that references this timer has been removed + Timer secondaryThrottleTime; + + if (writeConcern.shouldWaitForOtherNodes() && numDeleted > 0) { + repl::ReplicationCoordinator::StatusAndDuration replStatus = + repl::getGlobalReplicationCoordinator()->awaitReplication( + txn, + repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(), + writeConcern); + if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) { + warning(LogComponent::kSharding) << "replication to secondaries for removeRange at " + "least 60 seconds behind"; + } else { + massertStatusOK(replStatus.status); + } + millisWaitingForReplication += replStatus.duration; } + } - // Assume both min and max non-empty, append MinKey's to make them fit chosen index - KeyPattern idxKeyPattern( idx->keyPattern() ); - BSONObj min = Helpers::toKeyFormat( idxKeyPattern.extendRangeBound( range.minKey, false ) ); - BSONObj max = Helpers::toKeyFormat( idxKeyPattern.extendRangeBound( range.maxKey, false ) ); + if (writeConcern.shouldWaitForOtherNodes()) + log(LogComponent::kSharding) + << "Helpers::removeRangeUnlocked time spent waiting for replication: " + << durationCount<Milliseconds>(millisWaitingForReplication) << "ms" << endl; + + MONGO_LOG_COMPONENT(1, LogComponent::kSharding) << "end removal of " << min << " to " << max + << " in " << ns << " (took " + << rangeRemoveTimer.millis() << "ms)" << endl; + + return numDeleted; +} + +const long long Helpers::kMaxDocsPerChunk(250000); + +// Used by migration clone step +// TODO: Cannot hook up quite yet due to _trackerLocks in shared migration code. +// TODO: This function is not used outside of tests +Status Helpers::getLocsInRange(OperationContext* txn, + const KeyRange& range, + long long maxChunkSizeBytes, + set<RecordId>* locs, + long long* numDocs, + long long* estChunkSizeBytes) { + const string ns = range.ns; + *estChunkSizeBytes = 0; + *numDocs = 0; + + AutoGetCollectionForRead ctx(txn, ns); + + Collection* collection = ctx.getCollection(); + if (!collection) { + return Status(ErrorCodes::NamespaceNotFound, ns); + } + // Require single key + IndexDescriptor* idx = + collection->getIndexCatalog()->findShardKeyPrefixedIndex(txn, range.keyPattern, true); - // do a full traversal of the chunk and don't stop even if we think it is a large chunk - // we want the number of records to better report, in that case - bool isLargeChunk = false; - long long docCount = 0; + if (idx == NULL) { + return Status(ErrorCodes::IndexNotFound, range.keyPattern.toString()); + } - unique_ptr<PlanExecutor> exec( - InternalPlanner::indexScan(txn, collection, idx, min, max, false)); - // we can afford to yield here because any change to the base data that we might miss is - // already being queued and will be migrated in the 'transferMods' stage - exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); + // use the average object size to estimate how many objects a full chunk would carry + // do that while traversing the chunk's range using the sharding index, below + // there's a fair amount of slack before we determine a chunk is too large because object + // sizes will vary + long long avgDocsWhenFull; + long long avgDocSizeBytes; + const long long totalDocsInNS = collection->numRecords(txn); + if (totalDocsInNS > 0) { + // TODO: Figure out what's up here + avgDocSizeBytes = collection->dataSize(txn) / totalDocsInNS; + avgDocsWhenFull = maxChunkSizeBytes / avgDocSizeBytes; + avgDocsWhenFull = std::min(kMaxDocsPerChunk + 1, 130 * avgDocsWhenFull / 100 /* slack */); + } else { + avgDocSizeBytes = 0; + avgDocsWhenFull = kMaxDocsPerChunk + 1; + } - RecordId loc; - PlanExecutor::ExecState state; - while (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &loc))) { - if ( !isLargeChunk ) { - locs->insert( loc ); - } + // Assume both min and max non-empty, append MinKey's to make them fit chosen index + KeyPattern idxKeyPattern(idx->keyPattern()); + BSONObj min = Helpers::toKeyFormat(idxKeyPattern.extendRangeBound(range.minKey, false)); + BSONObj max = Helpers::toKeyFormat(idxKeyPattern.extendRangeBound(range.maxKey, false)); - if ( ++docCount > avgDocsWhenFull ) { - isLargeChunk = true; - } - } - *numDocs = docCount; - *estChunkSizeBytes = docCount * avgDocSizeBytes; + // do a full traversal of the chunk and don't stop even if we think it is a large chunk + // we want the number of records to better report, in that case + bool isLargeChunk = false; + long long docCount = 0; - if ( isLargeChunk ) { - stringstream ss; - ss << estChunkSizeBytes; - return Status( ErrorCodes::InvalidLength, ss.str() ); + unique_ptr<PlanExecutor> exec( + InternalPlanner::indexScan(txn, collection, idx, min, max, false)); + // we can afford to yield here because any change to the base data that we might miss is + // already being queued and will be migrated in the 'transferMods' stage + exec->setYieldPolicy(PlanExecutor::YIELD_AUTO); + + RecordId loc; + PlanExecutor::ExecState state; + while (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &loc))) { + if (!isLargeChunk) { + locs->insert(loc); } - return Status::OK(); + if (++docCount > avgDocsWhenFull) { + isLargeChunk = true; + } } + *numDocs = docCount; + *estChunkSizeBytes = docCount* avgDocSizeBytes; - void Helpers::emptyCollection(OperationContext* txn, const char *ns) { - OldClientContext context(txn, ns); - bool shouldReplicateWrites = txn->writesAreReplicated(); - txn->setReplicatedWrites(false); - ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, txn, shouldReplicateWrites); - deleteObjects(txn, context.db(), ns, BSONObj(), PlanExecutor::YIELD_MANUAL, false); + if (isLargeChunk) { + stringstream ss; + ss << estChunkSizeBytes; + return Status(ErrorCodes::InvalidLength, ss.str()); } - Helpers::RemoveSaver::RemoveSaver( const string& a , const string& b , const string& why) - : _out(0) { - static int NUM = 0; + return Status::OK(); +} - _root = storageGlobalParams.dbpath; - if ( a.size() ) - _root /= a; - if ( b.size() ) - _root /= b; - verify( a.size() || b.size() ); - _file = _root; +void Helpers::emptyCollection(OperationContext* txn, const char* ns) { + OldClientContext context(txn, ns); + bool shouldReplicateWrites = txn->writesAreReplicated(); + txn->setReplicatedWrites(false); + ON_BLOCK_EXIT(&OperationContext::setReplicatedWrites, txn, shouldReplicateWrites); + deleteObjects(txn, context.db(), ns, BSONObj(), PlanExecutor::YIELD_MANUAL, false); +} - stringstream ss; - ss << why << "." << terseCurrentTime(false) << "." << NUM++ << ".bson"; - _file /= ss.str(); - } +Helpers::RemoveSaver::RemoveSaver(const string& a, const string& b, const string& why) : _out(0) { + static int NUM = 0; - Helpers::RemoveSaver::~RemoveSaver() { - if ( _out ) { - _out->close(); - delete _out; - _out = 0; - } - } + _root = storageGlobalParams.dbpath; + if (a.size()) + _root /= a; + if (b.size()) + _root /= b; + verify(a.size() || b.size()); - void Helpers::RemoveSaver::goingToDelete( const BSONObj& o ) { - if ( ! _out ) { - boost::filesystem::create_directories( _root ); - _out = new ofstream(); - _out->open( _file.string().c_str() , ios_base::out | ios_base::binary ); - if ( ! _out->good() ) { - error() << "couldn't create file: " << _file.string() << - " for remove saving" << endl; - delete _out; - _out = 0; - return; - } + _file = _root; + stringstream ss; + ss << why << "." << terseCurrentTime(false) << "." << NUM++ << ".bson"; + _file /= ss.str(); +} + +Helpers::RemoveSaver::~RemoveSaver() { + if (_out) { + _out->close(); + delete _out; + _out = 0; + } +} + +void Helpers::RemoveSaver::goingToDelete(const BSONObj& o) { + if (!_out) { + boost::filesystem::create_directories(_root); + _out = new ofstream(); + _out->open(_file.string().c_str(), ios_base::out | ios_base::binary); + if (!_out->good()) { + error() << "couldn't create file: " << _file.string() << " for remove saving" << endl; + delete _out; + _out = 0; + return; } - _out->write( o.objdata() , o.objsize() ); } + _out->write(o.objdata(), o.objsize()); +} -} // namespace mongo +} // namespace mongo |