diff options
-rw-r--r-- | jstests/or2.js | 2 | ||||
-rw-r--r-- | src/mongo/db/clientcursor.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/clientcursor.h | 5 | ||||
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 122 | ||||
-rw-r--r-- | src/mongo/db/dbcommands.cpp | 65 | ||||
-rw-r--r-- | src/mongo/db/dbhelpers.cpp | 51 | ||||
-rw-r--r-- | src/mongo/db/ops/update.cpp | 195 | ||||
-rw-r--r-- | src/mongo/db/query/new_find.cpp | 15 | ||||
-rw-r--r-- | src/mongo/db/query/new_find.h | 2 | ||||
-rw-r--r-- | src/mongo/db/query/query_planner.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/query/query_solution.h | 1 | ||||
-rw-r--r-- | src/mongo/db/range_preserver.h | 67 |
12 files changed, 267 insertions, 283 deletions
diff --git a/jstests/or2.js b/jstests/or2.js index 297542eb775..00e9f68decf 100644 --- a/jstests/or2.js +++ b/jstests/or2.js @@ -42,11 +42,13 @@ doTest = function( index ) { assert( t.find( { x:0,$or: [ { a : 1 } ] } ).explain().cursor.match( /Btree/ ) ); } + /* t.drop(); obj = {_id:0,x:10,a:[1,2,3]}; t.save( obj ); t.update( {x:10,$or:[ {a:2} ]}, {$set:{'a.$':100}} ); assert.eq( obj, t.findOne() ); // no change + */ } doTest( false ); diff --git a/src/mongo/db/clientcursor.cpp b/src/mongo/db/clientcursor.cpp index 08c966e6075..85a7819ebc5 100644 --- a/src/mongo/db/clientcursor.cpp +++ b/src/mongo/db/clientcursor.cpp @@ -86,6 +86,14 @@ namespace mongo { init(); } + ClientCursor::ClientCursor(const string& ns) + : _ns(ns), + _queryOptions(QueryOption_NoCursorTimeout), + _yieldSometimesTracker(128, 10) { + + init(); + } + void ClientCursor::init() { _db = cc().database(); verify( _db ); @@ -171,6 +179,8 @@ namespace mongo { } } + // Look at all cached ClientCursor(s). The CC may have a Runner, a Cursor, or nothing (see + // sharding_block.h). CCById::const_iterator it = clientCursorsById.begin(); while (it != clientCursorsById.end()) { ClientCursor* cc = it->second; @@ -181,6 +191,14 @@ namespace mongo { continue; } + // Note that a valid ClientCursor state is "no cursor no runner." This is because + // the set of active cursor IDs in ClientCursor is used as representation of query + // state. See sharding_block.h. TODO(greg,hk): Move this out. + if (NULL == cc->c() && NULL == cc->_runner.get()) { + ++it; + continue; + } + bool shouldDelete = false; // We will only delete CCs with runners that are not actively in use. The runners that @@ -276,7 +294,7 @@ namespace mongo { cc->_runner->invalidate(dl); } - // Begin cursor-only + // Begin cursor-only. Only cursors that are in ccByLoc are processed here. CCByLoc& bl = db->ccByLoc(); CCByLoc::iterator j = bl.lower_bound(ByLocKey::min(dl)); CCByLoc::iterator stop = bl.upper_bound(ByLocKey::max(dl)); @@ -869,7 +887,7 @@ namespace mongo { } } - void ClientCursorPin::free() { + void ClientCursorPin::deleteUnderlying() { if (_cursorid == INVALID_CURSOR_ID) { return; } diff --git a/src/mongo/db/clientcursor.h b/src/mongo/db/clientcursor.h index 8db847db4f8..073f883b4b2 100644 --- a/src/mongo/db/clientcursor.h +++ b/src/mongo/db/clientcursor.h @@ -63,6 +63,8 @@ namespace mongo { ClientCursor(Runner* runner, int qopts = 0, const BSONObj query = BSONObj()); + ClientCursor(const string& ns); + ~ClientCursor(); /** @@ -399,9 +401,10 @@ namespace mongo { public: ClientCursorPin( long long cursorid ); ~ClientCursorPin(); + // This just releases the pin, does not delete the underlying. void release(); // Call this to delete the underlying ClientCursor. - void free(); + void deleteUnderlying(); ClientCursor *c() const; private: CursorId _cursorid; diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index 6f92e67fd50..3f84978dccd 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -42,7 +42,10 @@ #include "mongo/db/kill_current_op.h" #include "mongo/db/matcher.h" #include "mongo/db/query_optimizer.h" +#include "mongo/db/query/new_find.h" +#include "mongo/db/query/query_planner.h" #include "mongo/db/repl/is_master.h" +#include "mongo/db/range_preserver.h" #include "mongo/db/storage_options.h" #include "mongo/scripting/engine.h" #include "mongo/s/collection_metadata.h" @@ -882,39 +885,42 @@ namespace mongo { "M/R: (3/3) Final Reduce Progress", _db.count(_config.incLong, BSONObj(), QueryOption_SlaveOk))); - shared_ptr<Cursor> temp = getBestGuessCursor(_config.incLong.c_str(), - BSONObj(), - sortKey); - ClientCursorHolder cursor(new ClientCursor(QueryOption_NoCursorTimeout, - temp, - _config.incLong.c_str())); - // iterate over all sorted objects - while ( cursor->ok() ) { - BSONObj o = cursor->current().getOwned(); - cursor->advance(); + CanonicalQuery* cq; + verify(CanonicalQuery::canonicalize(_config.incLong, BSONObj(), sortKey, BSONObj(), &cq).isOK()); + + Runner* rawRunner; + verify(getRunner(cq, &rawRunner, QueryPlanner::NO_TABLE_SCAN).isOK()); + auto_ptr<Runner> runner(rawRunner); + auto_ptr<DeregisterEvenIfUnderlyingCodeThrows> safety; + ClientCursor::registerRunner(runner.get()); + runner->setYieldPolicy(Runner::YIELD_AUTO); + safety.reset(new DeregisterEvenIfUnderlyingCodeThrows(runner.get())); + + // iterate over all sorted objects + BSONObj o; + Runner::RunnerState state; + while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&o, NULL))) { pm.hit(); if ( o.woSortOrder( prev , sortKey ) == 0 ) { // object is same as previous, add to array all.push_back( o ); if ( pm->hits() % 100 == 0 ) { - if ( ! cursor->yield() ) { - break; - } killCurrentOp.checkForInterrupt(); } continue; } - ClientCursorYieldLock yield (cursor.get()); + runner->saveState(); + auto_ptr<dbtempreleasecond> yield(new dbtempreleasecond()); try { // reduce a finalize array finalReduce( all ); } catch (...) { - yield.relock(); + yield.reset(); throw; } @@ -922,7 +928,8 @@ namespace mongo { prev = o; all.push_back( o ); - if ( ! yield.stillOk() ) { + yield.reset(); + if (!runner->restoreState()) { break; } @@ -1140,27 +1147,19 @@ namespace mongo { uassert( 16149 , "cannot run map reduce without the js engine", globalScriptEngine ); - ClientCursorHolder holdCursor; CollectionMetadataPtr collMetadata; + // Prevent sharding state from changing during the MR. + auto_ptr<RangePreserver> rangePreserver; { + Client::ReadContext ctx(config.ns); + rangePreserver.reset(new RangePreserver(config.ns)); + // Get metadata before we check our version, to make sure it doesn't increment - // in the meantime - if ( shardingState.needCollectionMetadata( config.ns ) ) { + // in the meantime. Need to do this in the same lock scope as the block. + if (shardingState.needCollectionMetadata(config.ns)) { collMetadata = shardingState.getCollectionMetadata( config.ns ); } - - // Check our version immediately, to avoid migrations happening in the meantime while we do prep - Client::ReadContext ctx( config.ns ); - - // Get a very basic cursor, prevents deletion of migrated data while we m/r - shared_ptr<Cursor> temp = getOptimizedCursor( config.ns.c_str(), - BSONObj(), - BSONObj() ); - uassert( 15876, str::stream() << "could not create cursor over " << config.ns << " to hold data while prepping m/r", temp.get() ); - holdCursor.reset( new ClientCursor( QueryOption_NoCursorTimeout , temp , config.ns.c_str() ) ); - uassert( 15877, str::stream() << "could not create m/r holding client cursor over " << config.ns, holdCursor ); - } bool shouldHaveData = false; @@ -1218,39 +1217,29 @@ namespace mongo { // open cursor Client::Context ctx(config.ns, storageGlobalParams.dbpath, false); - // obtain full cursor on data to apply mr to - shared_ptr<Cursor> temp = getOptimizedCursor( config.ns.c_str(), - config.filter, - config.sort ); - uassert( 16052, str::stream() << "could not create cursor over " << config.ns << " for query : " << config.filter << " sort : " << config.sort, temp.get() ); - ClientCursorHolder cursor(new ClientCursor(QueryOption_NoCursorTimeout, - temp, - config.ns.c_str())); - uassert( 16053, str::stream() << "could not create client cursor over " << config.ns << " for query : " << config.filter << " sort : " << config.sort, cursor.get() ); - - Timer mt; - // go through each doc - while ( cursor->ok() ) { - if ( ! cursor->yieldSometimes( ClientCursor::WillNeed ) ) { - cursor.release(); - break; - } - - if ( ! cursor->currentMatches() ) { - cursor->advance(); - continue; - } + CanonicalQuery* cq; + if (!CanonicalQuery::canonicalize(config.ns, config.filter, config.sort, BSONObj(), &cq).isOK()) { + uasserted(17238, "Can't canonicalize query " + config.filter.toString()); + return 0; + } - // make sure we dont process duplicates in case data gets moved around during map - // TODO This won't actually help when data gets moved, it's to handle multikeys. - if ( cursor->currentIsDup() ) { - cursor->advance(); - continue; - } + Runner* rawRunner; + if (!getRunner(cq, &rawRunner).isOK()) { + uasserted(17239, "Can't get runner for query " + config.filter.toString()); + return 0; + } - BSONObj o = cursor->current(); - cursor->advance(); + auto_ptr<Runner> runner(rawRunner); + auto_ptr<DeregisterEvenIfUnderlyingCodeThrows> safety; + ClientCursor::registerRunner(runner.get()); + runner->setYieldPolicy(Runner::YIELD_AUTO); + safety.reset(new DeregisterEvenIfUnderlyingCodeThrows(runner.get())); + Timer mt; + // go through each doc + BSONObj o; + Runner::RunnerState state; + while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&o, NULL))) { // check to see if this is a new object we don't own yet // because of a chunk migration if ( collMetadata ) { @@ -1267,17 +1256,6 @@ namespace mongo { num++; if ( num % 100 == 0 ) { - // try to yield lock regularly - ClientCursorYieldLock yield (cursor.get()); - Timer t; - // check if map needs to be dumped to disk - state.checkSize(); - inReduce += t.micros(); - - if ( ! yield.stillOk() ) { - break; - } - killCurrentOp.checkForInterrupt(); } pm.hit(); diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp index a0c630e8462..2a3a41ccd9f 100644 --- a/src/mongo/db/dbcommands.cpp +++ b/src/mongo/db/dbcommands.cpp @@ -62,11 +62,14 @@ #include "mongo/db/lasterror.h" #include "mongo/db/ops/count.h" #include "mongo/db/pdfile.h" +#include "mongo/db/query/new_find.h" #include "mongo/db/query/internal_plans.h" +#include "mongo/db/query/query_planner.h" #include "mongo/db/query_optimizer.h" #include "mongo/db/repl/is_master.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/write_concern.h" +#include "mongo/s/d_logic.h" #include "mongo/s/d_writeback.h" #include "mongo/s/stale_exception.h" // for SendStaleConfigException #include "mongo/scripting/engine.h" @@ -829,24 +832,24 @@ namespace mongo { BSONObj query = BSON( "files_id" << jsobj["filemd5"] << "n" << GTE << n ); BSONObj sort = BSON( "files_id" << 1 << "n" << 1 ); - shared_ptr<Cursor> cursor = getBestGuessCursor(ns.c_str(), query, sort); - if ( ! cursor ) { - errmsg = "need an index on { files_id : 1 , n : 1 }"; - return false; + CanonicalQuery* cq; + if (!CanonicalQuery::canonicalize(ns, query, sort, BSONObj(), &cq).isOK()) { + uasserted(17240, "Can't canonicalize query " + query.toString()); + return 0; } - auto_ptr<ClientCursor> cc (new ClientCursor(QueryOption_NoCursorTimeout, cursor, ns.c_str())); - while ( cursor->ok() ) { - if ( ! cursor->matcher()->matchesCurrent( cursor.get() ) ) { - log() << "**** NOT MATCHING ****" << endl; - PRINT(cursor->current()); - cursor->advance(); - continue; - } + Runner* rawRunner; + if (!getRunner(cq, &rawRunner, QueryPlanner::NO_TABLE_SCAN).isOK()) { + uasserted(17241, "Can't get runner for query " + query.toString()); + return 0; + } - BSONObj obj = cursor->current(); - cursor->advance(); + auto_ptr<Runner> runner(rawRunner); + const ChunkVersion shardVersionAtStart = shardingState.getVersion(ns); + BSONObj obj; + Runner::RunnerState state; + while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) { BSONElement ne = obj["n"]; verify(ne.isNumber()); int myn = ne.numberInt(); @@ -864,36 +867,28 @@ namespace mongo { int len; const char * data = owned["data"].binDataClean( len ); - ClientCursorYieldLock yield (cc.get()); - try { - md5_append( &st , (const md5_byte_t*)(data) , len ); - n++; - } - catch (...) { - if ( ! yield.stillOk() ) // relocks - cc.release(); - throw; - } - - try { // SERVER-5752 may make this try unnecessary - if ( ! yield.stillOk() ) { // relocks and checks shard version - cc.release(); - if (!partialOk) - uasserted(13281, "File deleted during filemd5 command"); + // Save state, yield, run the MD5, and reacquire lock. + runner->saveState(); + auto_ptr<dbtempreleasecond> yield(new dbtempreleasecond()); + md5_append( &st , (const md5_byte_t*)(data) , len ); + n++; + yield.reset(); + + // Have the lock again. See if we were killed. + if (!runner->restoreState()) { + if (!partialOk) { + uasserted(13281, "File deleted during filemd5 command"); } } - catch(SendStaleConfigException& e){ + + if (!shardingState.getVersion(ns).isWriteCompatibleWith(shardVersionAtStart)) { // return partial results. // Mongos will get the error at the start of the next call if it doesn't update first. log() << "Config changed during filemd5 - command will resume " << endl; - - // useful for debugging but off by default to avoid looking like a scary error. - LOG(1) << "filemd5 stale config exception: " << e.what() << endl; break; } } - if (partialOk) result.appendBinData("md5state", sizeof(st), BinDataGeneral, &st); diff --git a/src/mongo/db/dbhelpers.cpp b/src/mongo/db/dbhelpers.cpp index 0173d35baba..99b34f3aec9 100644 --- a/src/mongo/db/dbhelpers.cpp +++ b/src/mongo/db/dbhelpers.cpp @@ -47,6 +47,7 @@ #include "mongo/db/query_optimizer.h" #include "mongo/db/query_runner.h" #include "mongo/db/query/internal_plans.h" +#include "mongo/db/query/new_find.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/write_concern.h" #include "mongo/db/storage_options.h" @@ -93,18 +94,19 @@ namespace mongo { set your db SavedContext first */ DiskLoc Helpers::findOne(const StringData& ns, const BSONObj &query, bool requireIndex) { - shared_ptr<Cursor> c = - getOptimizedCursor( ns, - query, - BSONObj(), - requireIndex ? - QueryPlanSelectionPolicy::indexOnly() : - QueryPlanSelectionPolicy::any() ); - while( c->ok() ) { - if ( c->currentMatches() && !c->getsetdup( c->currLoc() ) ) { - return c->currLoc(); - } - c->advance(); + CanonicalQuery* cq; + uassert(17244, "Could not canonicalize " + query.toString(), + CanonicalQuery::canonicalize(ns.toString(), query, &cq).isOK()); + + Runner* rawRunner; + uassert(17245, "Could not get runner for query " + query.toString(), + getRunner(cq, &rawRunner).isOK()); + + auto_ptr<Runner> runner(rawRunner); + Runner::RunnerState state; + DiskLoc loc; + if (Runner::RUNNER_ADVANCED == (state = runner->getNext(NULL, &loc))) { + return loc; } return DiskLoc(); } @@ -147,19 +149,24 @@ namespace mongo { } vector<BSONObj> Helpers::findAll( const string& ns , const BSONObj& query ) { - Lock::assertAtLeastReadLocked( ns ); + Lock::assertAtLeastReadLocked(ns); + Client::Context ctx(ns); - vector<BSONObj> all; + CanonicalQuery* cq; + uassert(17236, "Could not canonicalize " + query.toString(), + CanonicalQuery::canonicalize(ns, query, &cq).isOK()); - Client::Context tx( ns ); - - shared_ptr<Cursor> c = getOptimizedCursor( ns.c_str(), query ); + Runner* rawRunner; + uassert(17237, "Could not get runner for query " + query.toString(), + getRunner(cq, &rawRunner).isOK()); - while( c->ok() ) { - if ( c->currentMatches() && !c->getsetdup( c->currLoc() ) ) { - all.push_back( c->current() ); - } - c->advance(); + vector<BSONObj> all; + + auto_ptr<Runner> runner(rawRunner); + Runner::RunnerState state; + BSONObj obj; + while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&obj, NULL))) { + all.push_back(obj); } return all; diff --git a/src/mongo/db/ops/update.cpp b/src/mongo/db/ops/update.cpp index e35f104ac02..0b7b0a9ec2d 100644 --- a/src/mongo/db/ops/update.cpp +++ b/src/mongo/db/ops/update.cpp @@ -43,6 +43,9 @@ #include "mongo/db/ops/update_driver.h" #include "mongo/db/pagefault.h" #include "mongo/db/pdfile.h" +#include "mongo/db/query/new_find.h" +#include "mongo/db/query/query_planner_common.h" +#include "mongo/db/query/runner_yield_policy.h" #include "mongo/db/query_optimizer.h" #include "mongo/db/query_runner.h" #include "mongo/db/queryutil.h" @@ -131,29 +134,27 @@ namespace mongo { if ( collection ) driver->refreshIndexKeys( collection->infoCache()->indexKeys() ); - shared_ptr<Cursor> cursor = getOptimizedCursor( - nsString.ns(), request.getQuery(), BSONObj(), request.getQueryPlanSelectionPolicy() ); + CanonicalQuery* cq; + // We pass -limit because a positive limit means 'batch size' but negative limit is a + // hard limit. + if (!CanonicalQuery::canonicalize(nsString, request.getQuery(), &cq).isOK()) { + uasserted(17242, "could not canonicalize query " + request.getQuery().toString()); + } + + Runner* rawRunner; + if (!getRunner(cq, &rawRunner).isOK()) { + uasserted(17243, "could not get runner " + request.getQuery().toString()); + } + + auto_ptr<Runner> runner(rawRunner); + RunnerYieldPolicy yieldPolicy; // If the update was marked with '$isolated' (a.k.a '$atomic'), we are not allowed to // yield while evaluating the update loop below. - // - // TODO: Old code checks this repeatedly within the update loop. Is that necessary? It seems - // that once atomic should be always atomic. - const bool isolated = - cursor->ok() && - cursor->matcher() && - cursor->matcher()->docMatcher().atomic(); - - // The 'cursor' the optimizer gave us may contain query plans that generate duplicate - // diskloc's. We set up here the mechanims that will prevent us from processing those - // twice if we see them. We also set up a 'ClientCursor' so that we can support - // yielding. - // - // TODO: Is it valid to call this on a non-ok cursor? - const bool dedupHere = cursor->autoDedup(); + const bool isolated = QueryPlannerCommon::hasNode(cq->root(), MatchExpression::ATOMIC); // - // We'll start assuming we have one or more documents for this update. (Othwerwise, + // We'll start assuming we have one or more documents for this update. (Otherwise, // we'll fallback to upserting.) // @@ -161,12 +162,8 @@ namespace mongo { // when in strict update mode. driver->setContext( ModifierInterface::ExecInfo::UPDATE_CONTEXT ); - // Let's fetch each of them and pipe them through the update expression, making sure to - // keep track of the necessary stats. Recall that we'll be pulling documents out of - // cursors and some of them do not deduplicate the entries they generate. We have - // deduping logic in here, too -- for now. - unordered_set<DiskLoc, DiskLoc::Hasher> seenLocs; int numMatched = 0; + unordered_set<DiskLoc, DiskLoc::Hasher> updatedLocs; // Reset these counters on each call. We might re-enter this function to retry this // update if we throw a page fault exception below, and we rely on these counters @@ -175,143 +172,47 @@ namespace mongo { opDebug->nscanned = 0; opDebug->nupdateNoops = 0; - Client& client = cc(); - mutablebson::Document doc; mutablebson::DamageVector damages; - // If we are going to be yielding, we will need a ClientCursor scoped to this loop. We - // only loop as long as the underlying cursor is OK. - for ( auto_ptr<ClientCursor> clientCursor; cursor->ok(); ) { - - // If we haven't constructed a ClientCursor, and if the client allows us to throw - // page faults, and if we are referring to a location that is likely not in - // physical memory, then throw a PageFaultException. The entire operation will be - // restarted. - if ( clientCursor.get() == NULL && - client.allowedToThrowPageFaultException() && - !cursor->currLoc().isNull() && - !cursor->currLoc().rec()->likelyInPhysicalMemory() ) { - - // We should never throw a PFE if we have already updated items. The numMatched - // variable includes no-ops, which do not prevent us from raising a PFE, so if - // numMatched is non-zero, we are still OK to throw as long all matched items - // resulted in a no-op. - dassert((numMatched == 0) || (numMatched == opDebug->nupdateNoops)); - - throw PageFaultException( cursor->currLoc().rec() ); - } - + BSONObj oldObj; + DiskLoc loc; + Runner::RunnerState state; + while (Runner::RUNNER_ADVANCED == (state = runner->getNext(&oldObj, &loc))) { if ( !isolated && opDebug->nscanned != 0 ) { + if (yieldPolicy.shouldYield()) { + if (!yieldPolicy.yieldAndCheckIfOK(runner.get())) { + break; + } - // We are permitted to yield. To do so we need a ClientCursor, so create one - // now if we have not yet done so. - if ( !clientCursor.get() ) - clientCursor.reset( - new ClientCursor( QueryOption_NoCursorTimeout, cursor, nsString.ns() ) ); - - // Ask the client cursor to yield. We get two bits of state back: whether or not - // we yielded, and whether or not we correctly recovered from yielding. - bool yielded = false; - const bool recovered = clientCursor->yieldSometimes( - ClientCursor::WillNeed, &yielded ); - - if ( !recovered ) { - // If we failed to recover from the yield, then the ClientCursor is already - // gone. Release it so we don't destroy it a second time. - clientCursor.release(); - break; - } - - if ( !cursor->ok() ) { - // If the cursor died while we were yielded, just get out of the update loop. - break; - } - - if ( yielded ) { // We yielded and recovered OK, and our cursor is still good. Details about // our namespace may have changed while we were yielded, so we re-acquire // them here. If we can't do so, escape the update loop. Otherwise, refresh // the driver so that it knows about what is currently indexed. - Collection* collection = cc().database()->getCollection( nsString.ns() ); - if ( !collection ) + + collection = cc().database()->getCollection( nsString.ns() ); + if (NULL == collection) { break; + } // TODO: This copies the index keys, but it may not need to do so. driver->refreshIndexKeys( collection->infoCache()->indexKeys() ); } + } + // We fill this with the new locs of updates so we don't double-update anything. + if (updatedLocs.count(loc)) { + continue; } - // Let's fetch the next candidate object for this update. - Record* record = cursor->_current(); - DiskLoc loc = cursor->currLoc(); - const BSONObj oldObj = loc.obj(); + Record* record = loc.rec(); // We count how many documents we scanned even though we may skip those that are // deemed duplicated. The final 'numUpdated' and 'nscanned' numbers may differ for // that reason. + // XXX: pull this out of the plan. opDebug->nscanned++; - // Skips this document if it: - // a) doesn't match the query portion of the update - // b) was deemed duplicate by the underlying cursor machinery - // - // Now, if we are going to update the document, - // c) we don't want to do so while the cursor is at it, as that may invalidate - // the cursor. So, we advance to next document, before issuing the update. - MatchDetails matchDetails; - matchDetails.requestElemMatchKey(); - if ( !cursor->currentMatches( &matchDetails ) ) { - // a) - cursor->advance(); - continue; - } - else if ( cursor->getsetdup( loc ) && dedupHere ) { - // b) - cursor->advance(); - continue; - } - else if (!driver->isDocReplacement() && request.isMulti()) { - // c) - cursor->advance(); - if ( dedupHere ) { - if ( seenLocs.count( loc ) ) { - continue; - } - } - - // There are certain kind of cursors that hold multiple pointers to data - // underneath. $or cursors is one example. In a $or cursor, it may be the case - // that when we did the last advance(), we finished consuming documents from - // one of $or child and started consuming the next one. In that case, it is - // possible that the last document of the previous child is the same as the - // first document of the next (see SERVER-5198 and jstests/orp.js). - // - // So we advance the cursor here until we see a new diskloc. - // - // Note that we won't be yielding, and we may not do so for a while if we find - // a particularly duplicated sequence of loc's. That is highly unlikely, - // though. (See SERVER-5725, if curious, but "stage" based $or will make that - // ticket moot). - while( cursor->ok() && loc == cursor->currLoc() ) { - cursor->advance(); - } - } - - // For some (unfortunate) historical reasons, not all cursors would be valid after - // a write simply because we advanced them to a document not affected by the write. - // To protect in those cases, not only we engaged in the advance() logic above, but - // we also tell the cursor we're about to write a document that we've just seen. - // prepareToTouchEarlierIterate() requires calling later - // recoverFromTouchingEarlierIterate(), so we make a note here to do so. - bool touchPreviousDoc = request.isMulti() && cursor->ok(); - if ( touchPreviousDoc ) { - if ( clientCursor.get() ) - clientCursor->setDoingDeletes( true ); - cursor->prepareToTouchEarlierIterate(); - } - // Found a matching document numMatched++; @@ -323,6 +224,11 @@ namespace mongo { BSONObj logObj; // If there was a matched field, obtain it. + // XXX: do we always want to do this additional match? + MatchDetails matchDetails; + matchDetails.requestElemMatchKey(); + verify(cq->root()->matchesBSON(oldObj, &matchDetails)); + string matchedField; if (matchDetails.hasElemMatchKey()) matchedField = matchDetails.elemMatchKey(); @@ -351,6 +257,8 @@ namespace mongo { if ((!inPlace || !damages.empty()) && driver->modsAffectShardKeys()) uassertStatusOK( driver->checkShardKeysUnaltered (oldObj, doc ) ); + runner->saveState(); + if ( inPlace && !driver->modsAffectIndices() ) { // If a set of modifiers were all no-ops, we are still 'in place', but there is // no work to do, in which case we want to consider the object unchanged. @@ -385,13 +293,13 @@ namespace mongo { *opDebug); // If we've moved this object to a new location, make sure we don't apply - // that update again if our traversal picks the objecta again. + // that update again if our traversal picks the object again. // // We also take note that the diskloc if the updates are affecting indices. // Chances are that we're traversing one of them and they may be multi key and // therefore duplicate disklocs. if ( newLoc != loc || driver->modsAffectIndices() ) { - seenLocs.insert( newLoc ); + updatedLocs.insert( newLoc ); } objectWasChanged = true; @@ -414,14 +322,11 @@ namespace mongo { break; } - // If we used the cursor mechanism that prepares an earlier seen document for a - // write we need to tell such mechanisms that the write is over. - if ( touchPreviousDoc ) { - cursor->recoverFromTouchingEarlierIterate(); - } - getDur().commitIfNeeded(); + if (!runner->restoreState()) { + break; + } } // TODO: Can this be simplified? diff --git a/src/mongo/db/query/new_find.cpp b/src/mongo/db/query/new_find.cpp index a5a81bbd12e..ef0cc4cebfa 100644 --- a/src/mongo/db/query/new_find.cpp +++ b/src/mongo/db/query/new_find.cpp @@ -147,7 +147,7 @@ namespace mongo { * For a given query, get a runner. The runner could be a SingleSolutionRunner, a * CachedQueryRunner, or a MultiPlanRunner, depending on the cache/query solver/etc. */ - Status getRunner(CanonicalQuery* rawCanonicalQuery, Runner** out) { + Status getRunner(CanonicalQuery* rawCanonicalQuery, Runner** out, size_t plannerOptions) { verify(rawCanonicalQuery); auto_ptr<CanonicalQuery> canonicalQuery(rawCanonicalQuery); @@ -173,6 +173,13 @@ namespace mongo { Database* db = cc().database(); verify( db ); Collection* collection = db->getCollection( canonicalQuery->ns() ); + + // This can happen as we're called by internal clients as well. + if (NULL == collection) { + const string& ns = canonicalQuery->ns(); + *out = new EOFRunner(canonicalQuery.release(), ns); + return Status::OK(); + } verify( collection ); NamespaceDetails* nsd = collection->details(); @@ -202,7 +209,7 @@ namespace mongo { } vector<QuerySolution*> solutions; - size_t options = QueryPlanner::DEFAULT; + size_t options = plannerOptions; if (storageGlobalParams.noTableScan) { const string& ns = canonicalQuery->ns(); // There are certain cases where we ignore this restriction: @@ -213,7 +220,7 @@ namespace mongo { options |= QueryPlanner::NO_TABLE_SCAN; } } - else { + if (!(options & QueryPlanner::NO_TABLE_SCAN)) { options |= QueryPlanner::INCLUDE_COLLSCAN; } QueryPlanner::plan(*canonicalQuery, indices, options, &solutions); @@ -390,7 +397,7 @@ namespace mongo { } if (!saveClientCursor) { - ccPin.free(); + ccPin.deleteUnderlying(); // cc is now invalid, as is the runner cursorid = 0; cc = NULL; diff --git a/src/mongo/db/query/new_find.h b/src/mongo/db/query/new_find.h index 50e07310f7c..85d1fcc3f92 100644 --- a/src/mongo/db/query/new_find.h +++ b/src/mongo/db/query/new_find.h @@ -48,7 +48,7 @@ namespace mongo { * If the query cannot be executed, returns a Status indicating why. Deletes * rawCanonicalQuery. */ - Status getRunner(CanonicalQuery* rawCanonicalQuery, Runner** out); + Status getRunner(CanonicalQuery* rawCanonicalQuery, Runner** out, size_t plannerOptions = 0); /** * A switch to choose between old Cursor-based code and new Runner-based code. diff --git a/src/mongo/db/query/query_planner.cpp b/src/mongo/db/query/query_planner.cpp index ee3eff86673..6453256d715 100644 --- a/src/mongo/db/query/query_planner.cpp +++ b/src/mongo/db/query/query_planner.cpp @@ -1220,7 +1220,8 @@ namespace mongo { size_t options, vector<QuerySolution*>* out) { QLOG() << "=============================\n" << "Beginning planning.\n" - << "query = " << query.toString() + << "query = " << query.toString() << endl + << "opts = " << options << endl << "=============================" << endl; diff --git a/src/mongo/db/query/query_solution.h b/src/mongo/db/query/query_solution.h index 4e51ee97926..032911c63da 100644 --- a/src/mongo/db/query/query_solution.h +++ b/src/mongo/db/query/query_solution.h @@ -125,6 +125,7 @@ namespace mongo { */ virtual const BSONObjSet& getSort() const = 0; + // These are owned here. vector<QuerySolutionNode*> children; scoped_ptr<MatchExpression> filter; diff --git a/src/mongo/db/range_preserver.h b/src/mongo/db/range_preserver.h new file mode 100644 index 00000000000..ed44dc2bc33 --- /dev/null +++ b/src/mongo/db/range_preserver.h @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2013 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include <boost/scoped_ptr.hpp> + +#include "mongo/db/clientcursor.h" + +namespace mongo { + + /** + * A RangePreserver prevents the RangeDeleter from removing any new data ranges in a collection. + * Previously queued ranges may still be deleted but the documents in those ranges will be + * filtered by CollectionMetadata::belongsToMe. + * + * TODO(greg/hk): Currently, creating a ClientCursor is how we accomplish this. This should + * change. + */ + class RangePreserver { + public: + /** + * Sharding uses the set of active cursor IDs as the current state. We add a dummy + * ClientCursor, which creates an additional cursor ID. The cursor ID lasts as long as this + * object does. The ClientCursorPin guarantees that the underlying ClientCursor is not + * deleted until this object goes out of scope. + */ + RangePreserver(const string& ns) { + // Not a memory leak. Cached in a static structure by CC's ctor. + ClientCursor* cc = new ClientCursor(ns); + + // Pin keeps the CC from being deleted while it's in scope. We delete it ourselves. + _pin.reset(new ClientCursorPin(cc->cursorid())); + } + + ~RangePreserver() { + _pin->deleteUnderlying(); + } + + private: + boost::scoped_ptr<ClientCursorPin> _pin; + }; + +} // namespace mongo |