// dbhelpers.cpp

/**
 *    Copyright (C) 2008-2014 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
*/ #include "mongo/platform/basic.h" #include "mongo/db/dbhelpers.h" #include #include #include #include "mongo/client/dbclientinterface.h" #include "mongo/db/catalog/index_create.h" #include "mongo/db/db.h" #include "mongo/db/exec/working_set_common.h" #include "mongo/db/json.h" #include "mongo/db/index/btree_access_method.h" #include "mongo/db/ops/delete.h" #include "mongo/db/ops/update.h" #include "mongo/db/ops/update_lifecycle_impl.h" #include "mongo/db/ops/update_request.h" #include "mongo/db/ops/update_result.h" #include "mongo/db/query/get_executor.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/query/query_planner.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/repl_coordinator_global.h" #include "mongo/db/write_concern.h" #include "mongo/db/write_concern_options.h" #include "mongo/db/operation_context_impl.h" #include "mongo/db/storage_options.h" #include "mongo/db/catalog/collection.h" #include "mongo/s/d_state.h" #include "mongo/util/log.h" namespace mongo { using logger::LogComponent; const BSONObj reverseNaturalObj = BSON( "$natural" << -1 ); void Helpers::ensureIndex(OperationContext* txn, Collection* collection, BSONObj keyPattern, bool unique, const char *name) { BSONObjBuilder b; b.append("name", name); b.append("ns", collection->ns()); b.append("key", keyPattern); b.appendBool("unique", unique); BSONObj o = b.done(); MultiIndexBlock indexer(txn, collection); Status status = indexer.init(o); if ( status.code() == ErrorCodes::IndexAlreadyExists ) return; uassertStatusOK( status ); uassertStatusOK(indexer.insertAllDocumentsInCollection()); WriteUnitOfWork wunit(txn); indexer.commit(); wunit.commit(); } /* fetch a single object from collection ns that matches query set your db SavedContext first */ bool Helpers::findOne(OperationContext* txn, Collection* collection, const BSONObj &query, BSONObj& result, bool requireIndex) { DiskLoc loc = findOne( txn, collection, query, requireIndex ); if ( loc.isNull() ) return false; 
result = collection->docFor(txn, loc); return true; } /* fetch a single object from collection ns that matches query set your db SavedContext first */ DiskLoc Helpers::findOne(OperationContext* txn, Collection* collection, const BSONObj &query, bool requireIndex) { if ( !collection ) return DiskLoc(); CanonicalQuery* cq; const WhereCallbackReal whereCallback(txn, collection->ns().db()); massert(17244, "Could not canonicalize " + query.toString(), CanonicalQuery::canonicalize(collection->ns(), query, &cq, whereCallback).isOK()); PlanExecutor* rawExec; size_t options = requireIndex ? QueryPlannerParams::NO_TABLE_SCAN : QueryPlannerParams::DEFAULT; massert(17245, "Could not get executor for query " + query.toString(), getExecutor(txn, collection, cq, &rawExec, options).isOK()); auto_ptr exec(rawExec); PlanExecutor::ExecState state; DiskLoc loc; if (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &loc))) { return loc; } return DiskLoc(); } bool Helpers::findById(OperationContext* txn, Database* database, const char *ns, BSONObj query, BSONObj& result, bool* nsFound, bool* indexFound) { txn->lockState()->assertAtLeastReadLocked(ns); invariant( database ); Collection* collection = database->getCollection( txn, ns ); if ( !collection ) { return false; } if ( nsFound ) *nsFound = true; IndexCatalog* catalog = collection->getIndexCatalog(); const IndexDescriptor* desc = catalog->findIdIndex( txn ); if ( !desc ) return false; if ( indexFound ) *indexFound = 1; // See SERVER-12397. This may not always be true. 
BtreeBasedAccessMethod* accessMethod = static_cast(catalog->getIndex( desc )); DiskLoc loc = accessMethod->findSingle( txn, query["_id"].wrap() ); if ( loc.isNull() ) return false; result = collection->docFor( txn, loc ); return true; } DiskLoc Helpers::findById(OperationContext* txn, Collection* collection, const BSONObj& idquery) { verify(collection); IndexCatalog* catalog = collection->getIndexCatalog(); const IndexDescriptor* desc = catalog->findIdIndex( txn ); uassert(13430, "no _id index", desc); // See SERVER-12397. This may not always be true. BtreeBasedAccessMethod* accessMethod = static_cast(catalog->getIndex( desc )); return accessMethod->findSingle( txn, idquery["_id"].wrap() ); } /* Get the first object from a collection. Generally only useful if the collection only ever has a single object -- which is a "singleton collection". Note that the BSONObj returned is *not* owned and will become invalid if the database is closed. Returns: true if object exists. */ bool Helpers::getSingleton(OperationContext* txn, const char *ns, BSONObj& result) { Client::Context context(txn, ns); auto_ptr exec( InternalPlanner::collectionScan(txn, ns, context.db()->getCollection(txn, ns))); PlanExecutor::ExecState state = exec->getNext(&result, NULL); context.getClient()->curop()->done(); return PlanExecutor::ADVANCED == state; } bool Helpers::getLast(OperationContext* txn, const char *ns, BSONObj& result) { Client::Context ctx(txn, ns); Collection* coll = ctx.db()->getCollection( txn, ns ); auto_ptr exec( InternalPlanner::collectionScan(txn, ns, coll, InternalPlanner::BACKWARD)); PlanExecutor::ExecState state = exec->getNext(&result, NULL); return PlanExecutor::ADVANCED == state; } void Helpers::upsert( OperationContext* txn, const string& ns, const BSONObj& o, bool fromMigrate ) { BSONElement e = o["_id"]; verify( e.type() ); BSONObj id = e.wrap(); OpDebug debug; Client::Context context(txn, ns); const NamespaceString requestNs(ns); UpdateRequest request(txn, requestNs); 
request.setQuery(id); request.setUpdates(o); request.setUpsert(); request.setUpdateOpLog(); request.setFromMigration(fromMigrate); UpdateLifecycleImpl updateLifecycle(true, requestNs); request.setLifecycle(&updateLifecycle); update(context.db(), request, &debug); } void Helpers::putSingleton(OperationContext* txn, const char *ns, BSONObj obj) { OpDebug debug; Client::Context context(txn, ns); const NamespaceString requestNs(ns); UpdateRequest request(txn, requestNs); request.setUpdates(obj); request.setUpsert(); request.setUpdateOpLog(); UpdateLifecycleImpl updateLifecycle(true, requestNs); request.setLifecycle(&updateLifecycle); update(context.db(), request, &debug); context.getClient()->curop()->done(); } void Helpers::putSingletonGod(OperationContext* txn, const char *ns, BSONObj obj, bool logTheOp) { OpDebug debug; Client::Context context(txn, ns); const NamespaceString requestNs(ns); UpdateRequest request(txn, requestNs); request.setGod(); request.setUpdates(obj); request.setUpsert(); request.setUpdateOpLog(logTheOp); update(context.db(), request, &debug); context.getClient()->curop()->done(); } BSONObj Helpers::toKeyFormat( const BSONObj& o ) { BSONObjBuilder keyObj( o.objsize() ); BSONForEach( e , o ) { keyObj.appendAs( e , "" ); } return keyObj.obj(); } BSONObj Helpers::inferKeyPattern( const BSONObj& o ) { BSONObjBuilder kpBuilder; BSONForEach( e , o ) { kpBuilder.append( e.fieldName() , 1 ); } return kpBuilder.obj(); } static bool findShardKeyIndexPattern(OperationContext* txn, const string& ns, const BSONObj& shardKeyPattern, BSONObj* indexPattern ) { Client::ReadContext context(txn, ns); Collection* collection = context.ctx().db()->getCollection( txn, ns ); if ( !collection ) return false; // Allow multiKey based on the invariant that shard keys must be single-valued. // Therefore, any multi-key index prefixed by shard key cannot be multikey over // the shard key fields. 
const IndexDescriptor* idx = collection->getIndexCatalog()->findIndexByPrefix(txn, shardKeyPattern, false /* allow multi key */); if ( idx == NULL ) return false; *indexPattern = idx->keyPattern().getOwned(); return true; } long long Helpers::removeRange( OperationContext* txn, const KeyRange& range, bool maxInclusive, const WriteConcernOptions& writeConcern, RemoveSaver* callback, bool fromMigrate, bool onlyRemoveOrphanedDocs ) { Timer rangeRemoveTimer; const string& ns = range.ns; // The IndexChunk has a keyPattern that may apply to more than one index - we need to // select the index and get the full index keyPattern here. BSONObj indexKeyPatternDoc; if ( !findShardKeyIndexPattern( txn, ns, range.keyPattern, &indexKeyPatternDoc ) ) { warning(LogComponent::kSharding) << "no index found to clean data over range of type " << range.keyPattern << " in " << ns << endl; return -1; } KeyPattern indexKeyPattern( indexKeyPatternDoc ); // Extend bounds to match the index we found // Extend min to get (min, MinKey, MinKey, ....) const BSONObj& min = Helpers::toKeyFormat(indexKeyPattern.extendRangeBound(range.minKey, false)); // If upper bound is included, extend max to get (max, MaxKey, MaxKey, ...) // If not included, extend max to get (max, MinKey, MinKey, ....) const BSONObj& max = Helpers::toKeyFormat( indexKeyPattern.extendRangeBound(range.maxKey,maxInclusive)); MONGO_LOG_COMPONENT(1, LogComponent::kSharding) << "begin removal of " << min << " to " << max << " in " << ns << " with write concern: " << writeConcern.toBSON() << endl; long long numDeleted = 0; long long millisWaitingForReplication = 0; while ( 1 ) { // Scoping for write lock. 
{ Client::WriteContext ctx(txn, ns); Collection* collection = ctx.ctx().db()->getCollection( txn, ns ); if ( !collection ) break; IndexDescriptor* desc = collection->getIndexCatalog()->findIndexByKeyPattern( txn, indexKeyPattern.toBSON() ); auto_ptr exec(InternalPlanner::indexScan(txn, collection, desc, min, max, maxInclusive, InternalPlanner::FORWARD, InternalPlanner::IXSCAN_FETCH)); DiskLoc rloc; BSONObj obj; PlanExecutor::ExecState state; // This may yield so we cannot touch nsd after this. state = exec->getNext(&obj, &rloc); exec.reset(); if (PlanExecutor::IS_EOF == state) { break; } if (PlanExecutor::DEAD == state) { warning(LogComponent::kSharding) << "cursor died: aborting deletion for " << min << " to " << max << " in " << ns << endl; break; } if (PlanExecutor::EXEC_ERROR == state) { warning(LogComponent::kSharding) << "cursor error while trying to delete " << min << " to " << max << " in " << ns << ": " << WorkingSetCommon::toStatusString(obj) << endl; break; } verify(PlanExecutor::ADVANCED == state); if ( onlyRemoveOrphanedDocs ) { // Do a final check in the write lock to make absolutely sure that our // collection hasn't been modified in a way that invalidates our migration // cleanup. // We should never be able to turn off the sharding state once enabled, but // in the future we might want to. verify(shardingState.enabled()); // In write lock, so will be the most up-to-date version CollectionMetadataPtr metadataNow = shardingState.getCollectionMetadata( ns ); bool docIsOrphan; if ( metadataNow ) { KeyPattern kp( metadataNow->getKeyPattern() ); BSONObj key = kp.extractShardKeyFromDoc(obj); docIsOrphan = !metadataNow->keyBelongsToMe( key ) && !metadataNow->keyIsPending( key ); } else { docIsOrphan = false; } if ( !docIsOrphan ) { warning(LogComponent::kSharding) << "aborting migration cleanup for chunk " << min << " to " << max << ( metadataNow ? 
(string) " at document " + obj.toString() : "" ) << ", collection " << ns << " has changed " << endl; break; } } if ( callback ) callback->goingToDelete( obj ); BSONObj deletedId; collection->deleteDocument( txn, rloc, false, false, &deletedId ); // The above throws on failure, and so is not logged repl::logOp(txn, "d", ns.c_str(), deletedId, 0, 0, fromMigrate); ctx.commit(); numDeleted++; } // TODO remove once the yielding below that references this timer has been removed Timer secondaryThrottleTime; if (writeConcern.shouldWaitForOtherNodes() && numDeleted > 0) { repl::ReplicationCoordinator::StatusAndDuration replStatus = repl::getGlobalReplicationCoordinator()->awaitReplication(txn, txn->getClient()->getLastOp(), writeConcern); if (replStatus.status.code() == ErrorCodes::ExceededTimeLimit) { warning(LogComponent::kSharding) << "replication to secondaries for removeRange at " "least 60 seconds behind"; } else { massertStatusOK(replStatus.status); } millisWaitingForReplication += replStatus.duration.total_milliseconds(); } } if (writeConcern.shouldWaitForOtherNodes()) log(LogComponent::kSharding) << "Helpers::removeRangeUnlocked time spent waiting for replication: " << millisWaitingForReplication << "ms" << endl; MONGO_LOG_COMPONENT(1, LogComponent::kSharding) << "end removal of " << min << " to " << max << " in " << ns << " (took " << rangeRemoveTimer.millis() << "ms)" << endl; return numDeleted; } const long long Helpers::kMaxDocsPerChunk( 250000 ); // Used by migration clone step // TODO: Cannot hook up quite yet due to _trackerLocks in shared migration code. 
// TODO: This function is not used outside of tests Status Helpers::getLocsInRange( OperationContext* txn, const KeyRange& range, long long maxChunkSizeBytes, set* locs, long long* numDocs, long long* estChunkSizeBytes ) { const string ns = range.ns; *estChunkSizeBytes = 0; *numDocs = 0; Client::ReadContext ctx(txn, ns); Collection* collection = ctx.ctx().db()->getCollection( txn, ns ); if ( !collection ) return Status( ErrorCodes::NamespaceNotFound, ns ); // Require single key IndexDescriptor *idx = collection->getIndexCatalog()->findIndexByPrefix( txn, range.keyPattern, true ); if ( idx == NULL ) { return Status( ErrorCodes::IndexNotFound, range.keyPattern.toString() ); } // use the average object size to estimate how many objects a full chunk would carry // do that while traversing the chunk's range using the sharding index, below // there's a fair amount of slack before we determine a chunk is too large because object // sizes will vary long long avgDocsWhenFull; long long avgDocSizeBytes; const long long totalDocsInNS = collection->numRecords( txn ); if ( totalDocsInNS > 0 ) { // TODO: Figure out what's up here avgDocSizeBytes = collection->dataSize( txn ) / totalDocsInNS; avgDocsWhenFull = maxChunkSizeBytes / avgDocSizeBytes; avgDocsWhenFull = std::min( kMaxDocsPerChunk + 1, 130 * avgDocsWhenFull / 100 /* slack */); } else { avgDocSizeBytes = 0; avgDocsWhenFull = kMaxDocsPerChunk + 1; } // Assume both min and max non-empty, append MinKey's to make them fit chosen index KeyPattern idxKeyPattern( idx->keyPattern() ); BSONObj min = Helpers::toKeyFormat( idxKeyPattern.extendRangeBound( range.minKey, false ) ); BSONObj max = Helpers::toKeyFormat( idxKeyPattern.extendRangeBound( range.maxKey, false ) ); // do a full traversal of the chunk and don't stop even if we think it is a large chunk // we want the number of records to better report, in that case bool isLargeChunk = false; long long docCount = 0; auto_ptr exec( InternalPlanner::indexScan(txn, collection, idx, 
min, max, false)); // we can afford to yield here because any change to the base data that we might miss is // already being queued and will be migrated in the 'transferMods' stage DiskLoc loc; PlanExecutor::ExecState state; while (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &loc))) { if ( !isLargeChunk ) { locs->insert( loc ); } if ( ++docCount > avgDocsWhenFull ) { isLargeChunk = true; } } *numDocs = docCount; *estChunkSizeBytes = docCount * avgDocSizeBytes; if ( isLargeChunk ) { stringstream ss; ss << estChunkSizeBytes; return Status( ErrorCodes::InvalidLength, ss.str() ); } return Status::OK(); } void Helpers::emptyCollection(OperationContext* txn, const char *ns) { Client::Context context(txn, ns); deleteObjects(txn, context.db(), ns, BSONObj(), false); } Helpers::RemoveSaver::RemoveSaver( const string& a , const string& b , const string& why) : _out(0) { static int NUM = 0; _root = storageGlobalParams.dbpath; if ( a.size() ) _root /= a; if ( b.size() ) _root /= b; verify( a.size() || b.size() ); _file = _root; stringstream ss; ss << why << "." << terseCurrentTime(false) << "." << NUM++ << ".bson"; _file /= ss.str(); } Helpers::RemoveSaver::~RemoveSaver() { if ( _out ) { _out->close(); delete _out; _out = 0; } } void Helpers::RemoveSaver::goingToDelete( const BSONObj& o ) { if ( ! _out ) { boost::filesystem::create_directories( _root ); _out = new ofstream(); _out->open( _file.string().c_str() , ios_base::out | ios_base::binary ); if ( ! _out->good() ) { error() << "couldn't create file: " << _file.string() << " for remove saving" << endl; delete _out; _out = 0; return; } } _out->write( o.objdata() , o.objsize() ); } } // namespace mongo