diff options
author | David Storch <david.storch@10gen.com> | 2015-11-11 10:25:56 -0500 |
---|---|---|
committer | David Storch <david.storch@10gen.com> | 2015-11-11 17:20:02 -0500 |
commit | 541489161282cee955bb32489cacfdba3c6bfe88 (patch) | |
tree | 5f9742f8d7b4b350d5cb68483be5dd7475f517f1 /src/mongo/s/strategy.cpp | |
parent | 0cdcbd1ff5615db4b08951abc2aeddd8974d42ee (diff) | |
download | mongo-541489161282cee955bb32489cacfdba3c6bfe88.tar.gz |
SERVER-20758 delete legacy mongos query path
Also removes the useClusterClientCursor
setParameter, which was used to control which query
path is active.
Diffstat (limited to 'src/mongo/s/strategy.cpp')
-rw-r--r-- | src/mongo/s/strategy.cpp | 388 |
1 files changed, 92 insertions, 296 deletions
diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp index 9534192381b..8827f554985 100644 --- a/src/mongo/s/strategy.cpp +++ b/src/mongo/s/strategy.cpp @@ -79,8 +79,6 @@ using std::string; using std::stringstream; using std::vector; -MONGO_EXPORT_SERVER_PARAMETER(useClusterClientCursor, bool, true); - static bool _isSystemIndexes(const char* ns) { return nsToCollectionSubstring(ns) == "system.indexes"; } @@ -151,8 +149,6 @@ static bool doShardedIndexQuery(OperationContext* txn, Request& request, const Q void Strategy::queryOp(OperationContext* txn, Request& request) { verify(!NamespaceString(request.getns()).isCommand()); - Timer queryTimer; - globalOpCounters.gotQuery(); QueryMessage q(request.d()); @@ -176,177 +172,83 @@ void Strategy::queryOp(OperationContext* txn, Request& request) { " " + q.query.toString()); } - // Spigot which controls whether OP_QUERY style find on mongos uses the new ClusterClientCursor - // code path. - // TODO: Delete the spigot and always use the new code. - if (useClusterClientCursor) { - // Determine the default read preference mode based on the value of the slaveOk flag. - ReadPreference readPreferenceOption = (q.queryOptions & QueryOption_SlaveOk) - ? ReadPreference::SecondaryPreferred - : ReadPreference::PrimaryOnly; - ReadPreferenceSetting readPreference(readPreferenceOption, TagSet()); - - BSONElement rpElem; - auto readPrefExtractStatus = bsonExtractTypedField( - q.query, LiteParsedQuery::kWrappedReadPrefField, mongo::Object, &rpElem); - - if (readPrefExtractStatus.isOK()) { - auto parsedRps = ReadPreferenceSetting::fromBSON(rpElem.Obj()); - uassertStatusOK(parsedRps.getStatus()); - readPreference = parsedRps.getValue(); - } else if (readPrefExtractStatus != ErrorCodes::NoSuchKey) { - uassertStatusOK(readPrefExtractStatus); - } + // Determine the default read preference mode based on the value of the slaveOk flag. + ReadPreference readPreferenceOption = (q.queryOptions & QueryOption_SlaveOk) + ? 
ReadPreference::SecondaryPreferred + : ReadPreference::PrimaryOnly; + ReadPreferenceSetting readPreference(readPreferenceOption, TagSet()); + + BSONElement rpElem; + auto readPrefExtractStatus = bsonExtractTypedField( + q.query, LiteParsedQuery::kWrappedReadPrefField, mongo::Object, &rpElem); + + if (readPrefExtractStatus.isOK()) { + auto parsedRps = ReadPreferenceSetting::fromBSON(rpElem.Obj()); + uassertStatusOK(parsedRps.getStatus()); + readPreference = parsedRps.getValue(); + } else if (readPrefExtractStatus != ErrorCodes::NoSuchKey) { + uassertStatusOK(readPrefExtractStatus); + } - auto canonicalQuery = CanonicalQuery::canonicalize(q, ExtensionsCallbackNoop()); - uassertStatusOK(canonicalQuery.getStatus()); - - // If the $explain flag was set, we must run the operation on the shards as an explain - // command rather than a find command. - if (canonicalQuery.getValue()->getParsed().isExplain()) { - const LiteParsedQuery& lpq = canonicalQuery.getValue()->getParsed(); - BSONObj findCommand = lpq.asFindCommand(); - - // We default to allPlansExecution verbosity. - auto verbosity = ExplainCommon::EXEC_ALL_PLANS; - - const bool secondaryOk = (readPreference.pref != ReadPreference::PrimaryOnly); - rpc::ServerSelectionMetadata metadata(secondaryOk, readPreference); - - BSONObjBuilder explainBuilder; - uassertStatusOK( - Strategy::explainFind(txn, findCommand, lpq, verbosity, metadata, &explainBuilder)); - - BSONObj explainObj = explainBuilder.done(); - replyToQuery(0, // query result flags - request.p(), - request.m(), - static_cast<const void*>(explainObj.objdata()), - explainObj.objsize(), - 1, // numResults - 0, // startingFrom - CursorId(0)); - return; - } + auto canonicalQuery = CanonicalQuery::canonicalize(q, ExtensionsCallbackNoop()); + uassertStatusOK(canonicalQuery.getStatus()); - // Do the work to generate the first batch of results. This blocks waiting to get responses - // from the shard(s). 
- std::vector<BSONObj> batch; - - // 0 means the cursor is exhausted and - // otherwise we assume that a cursor with the returned id can be retrieved via the - // ClusterCursorManager - auto cursorId = - ClusterFind::runQuery(txn, *canonicalQuery.getValue(), readPreference, &batch); - uassertStatusOK(cursorId.getStatus()); - - // TODO: this constant should be shared between mongos and mongod, and should - // not be inside ShardedClientCursor. - BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE); - - // Fill out the response buffer. - int numResults = 0; - for (const auto& obj : batch) { - buffer.appendBuf((void*)obj.objdata(), obj.objsize()); - numResults++; - } + // If the $explain flag was set, we must run the operation on the shards as an explain command + // rather than a find command. + if (canonicalQuery.getValue()->getParsed().isExplain()) { + const LiteParsedQuery& lpq = canonicalQuery.getValue()->getParsed(); + BSONObj findCommand = lpq.asFindCommand(); + + // We default to allPlansExecution verbosity. + auto verbosity = ExplainCommon::EXEC_ALL_PLANS; + + const bool secondaryOk = (readPreference.pref != ReadPreference::PrimaryOnly); + rpc::ServerSelectionMetadata metadata(secondaryOk, readPreference); + + BSONObjBuilder explainBuilder; + uassertStatusOK( + Strategy::explainFind(txn, findCommand, lpq, verbosity, metadata, &explainBuilder)); + BSONObj explainObj = explainBuilder.done(); replyToQuery(0, // query result flags request.p(), request.m(), - buffer.buf(), - buffer.len(), - numResults, + static_cast<const void*>(explainObj.objdata()), + explainObj.objsize(), + 1, // numResults 0, // startingFrom - cursorId.getValue()); + CursorId(0)); return; } - QuerySpec qSpec((string)q.ns, q.query, q.fields, q.ntoskip, q.ntoreturn, q.queryOptions); + // Do the work to generate the first batch of results. This blocks waiting to get responses from + // the shard(s). + std::vector<BSONObj> batch; - // Parse "$maxTimeMS". 
- StatusWith<int> maxTimeMS = - LiteParsedQuery::parseMaxTimeMS(q.query[LiteParsedQuery::queryOptionMaxTimeMS]); - uassert(17233, maxTimeMS.getStatus().reason(), maxTimeMS.isOK()); + // 0 means the cursor is exhausted. Otherwise we assume that a cursor with the returned id can + // be retrieved via the ClusterCursorManager. + auto cursorId = ClusterFind::runQuery(txn, *canonicalQuery.getValue(), readPreference, &batch); + uassertStatusOK(cursorId.getStatus()); - if (_isSystemIndexes(q.ns) && doShardedIndexQuery(txn, request, qSpec)) { - return; - } - - ParallelSortClusteredCursor* cursor = new ParallelSortClusteredCursor(qSpec, CommandInfo()); - verify(cursor); - - // TODO: Move out to Request itself, not strategy based - try { - cursor->init(txn); + // TODO: this constant should be shared between mongos and mongod, and should not be inside + // ShardedClientCursor. + BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE); - if (qSpec.isExplain()) { - BSONObjBuilder explain_builder; - cursor->explain(explain_builder); - explain_builder.appendNumber("executionTimeMillis", - static_cast<long long>(queryTimer.millis())); - BSONObj b = explain_builder.obj(); - - replyToQuery(0, request.p(), request.m(), b); - delete (cursor); - return; - } - } catch (...) { - delete cursor; - throw; + // Fill out the response buffer. + int numResults = 0; + for (const auto& obj : batch) { + buffer.appendBuf((void*)obj.objdata(), obj.objsize()); + numResults++; } - // TODO: Revisit all of this when we revisit the sharded cursor cache - - if (cursor->getNumQueryShards() != 1) { - // More than one shard (or zero), manage with a ShardedClientCursor - // NOTE: We may also have *zero* shards here when the returnPartial flag is set. - // Currently the code in ShardedClientCursor handles this. 
- - ShardedClientCursorPtr cc(new ShardedClientCursor(q, cursor)); - - BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE); - int docCount = 0; - const int startFrom = cc->getTotalSent(); - bool hasMore = cc->sendNextBatch(q.ntoreturn, buffer, docCount); - - if (hasMore) { - LOG(5) << "storing cursor : " << cc->getId(); - - int cursorLeftoverMillis = maxTimeMS.getValue() - queryTimer.millis(); - if (maxTimeMS.getValue() == 0) { // 0 represents "no limit". - cursorLeftoverMillis = kMaxTimeCursorNoTimeLimit; - } else if (cursorLeftoverMillis <= 0) { - cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired; - } - - cursorCache.store(cc, cursorLeftoverMillis); - } - - replyToQuery(0, - request.p(), - request.m(), - buffer.buf(), - buffer.len(), - docCount, - startFrom, - hasMore ? cc->getId() : 0); - } else { - // Only one shard is used - - // Remote cursors are stored remotely, we shouldn't need this around. - unique_ptr<ParallelSortClusteredCursor> cursorDeleter(cursor); - - ShardPtr shard = grid.shardRegistry()->getShard(txn, cursor->getQueryShardId()); - verify(shard.get()); - DBClientCursorPtr shardCursor = cursor->getShardCursor(shard->getId()); - - // Implicitly stores the cursor in the cache - request.reply(*(shardCursor->getMessage()), shardCursor->originalHost()); - - // We don't want to kill the cursor remotely if there's still data left - shardCursor->decouple(); - } + replyToQuery(0, // query result flags + request.p(), + request.m(), + buffer.buf(), + buffer.len(), + numResults, + 0, // startingFrom + cursorId.getValue()); } void Strategy::clientCommandOp(OperationContext* txn, Request& request) { @@ -551,14 +453,12 @@ Status Strategy::commandOpUnsharded(OperationContext* txn, } void Strategy::getMore(OperationContext* txn, Request& request) { - Timer getMoreTimer; - const char* ns = request.getns(); const int ntoreturn = request.d().pullInt(); const long long id = request.d().pullInt64(); - // TODO: Handle stale config exceptions here from coll 
being dropped or sharded during op - // for now has same semantics as legacy request + // TODO: Handle stale config exceptions here from coll being dropped or sharded during op for + // now has same semantics as legacy request. const NamespaceString nss(ns); auto statusGetDb = grid.catalogCache()->getDatabase(txn, nss.db().toString()); if (statusGetDb == ErrorCodes::NamespaceNotFound) { @@ -569,147 +469,43 @@ void Strategy::getMore(OperationContext* txn, Request& request) { uassertStatusOK(statusGetDb); - // Spigot which controls whether OP_QUERY style find on mongos uses the new ClusterClientCursor - // code path. - // - // TODO: Delete the spigot and always use the new code. - if (useClusterClientCursor) { - boost::optional<long long> batchSize; - if (ntoreturn) { - batchSize = abs(ntoreturn); - } - GetMoreRequest getMoreRequest( - NamespaceString(ns), id, batchSize, boost::none, boost::none, boost::none); - - auto cursorResponse = ClusterFind::runGetMore(txn, getMoreRequest); - if (cursorResponse == ErrorCodes::CursorNotFound) { - replyToQuery(ResultFlag_CursorNotFound, request.p(), request.m(), 0, 0, 0); - return; - } - uassertStatusOK(cursorResponse.getStatus()); - - // Build the response document. - // - // TODO: this constant should be shared between mongos and mongod, and should not be inside - // ShardedClientCursor. 
- BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE); - - int numResults = 0; - for (const auto& obj : cursorResponse.getValue().getBatch()) { - buffer.appendBuf((void*)obj.objdata(), obj.objsize()); - ++numResults; - } + boost::optional<long long> batchSize; + if (ntoreturn) { + batchSize = abs(ntoreturn); + } + GetMoreRequest getMoreRequest( + NamespaceString(ns), id, batchSize, boost::none, boost::none, boost::none); - replyToQuery(0, - request.p(), - request.m(), - buffer.buf(), - buffer.len(), - numResults, - cursorResponse.getValue().getNumReturnedSoFar().value_or(0), - cursorResponse.getValue().getCursorId()); + auto cursorResponse = ClusterFind::runGetMore(txn, getMoreRequest); + if (cursorResponse == ErrorCodes::CursorNotFound) { + replyToQuery(ResultFlag_CursorNotFound, request.p(), request.m(), 0, 0, 0); return; } + uassertStatusOK(cursorResponse.getStatus()); - shared_ptr<DBConfig> config = statusGetDb.getValue(); - - ShardPtr primary; - ChunkManagerPtr info; - config->getChunkManagerOrPrimary(txn, ns, info, primary); - - // - // TODO: Cleanup cursor cache, consolidate into single codepath + // Build the response document. 
// - const string host = cursorCache.getRef(id); - ShardedClientCursorPtr cursor = cursorCache.get(id); - int cursorMaxTimeMS = cursorCache.getMaxTimeMS(id); - - // Cursor ids should not overlap between sharded and unsharded cursors - massert(17012, - str::stream() << "duplicate sharded and unsharded cursor id " << id << " detected for " - << ns << ", duplicated on host " << host, - NULL == cursorCache.get(id).get() || host.empty()); - - ClientBasic* client = ClientBasic::getCurrent(); - NamespaceString nsString(ns); - AuthorizationSession* authSession = AuthorizationSession::get(client); - Status status = authSession->checkAuthForGetMore(nsString, id, false); - audit::logGetMoreAuthzCheck(client, nsString, id, status.code()); - uassertStatusOK(status); - - if (!host.empty()) { - LOG(3) << "single getmore: " << ns; - - // we used ScopedDbConnection because we don't get about config versions - // not deleting data is handled elsewhere - // and we don't want to call setShardVersion - ScopedDbConnection conn(host); - - Message response; - bool ok = conn->callRead(request.m(), response); - uassert(10204, "dbgrid: getmore: error calling db", ok); - - bool hasMore = (response.singleData().getCursor() != 0); - - if (!hasMore) { - cursorCache.removeRef(id); - } - - request.reply(response, "" /*conn->getServerAddress() */); - conn.done(); - return; - } else if (cursor) { - if (cursorMaxTimeMS == kMaxTimeCursorTimeLimitExpired) { - cursorCache.remove(id); - uasserted(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit"); - } - - // TODO: Try to match logic of mongod, where on subsequent getMore() we pull lots more data? 
- BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE); - int docCount = 0; - const int startFrom = cursor->getTotalSent(); - bool hasMore = cursor->sendNextBatch(ntoreturn, buffer, docCount); - - if (hasMore) { - // still more data - cursor->accessed(); - - if (cursorMaxTimeMS != kMaxTimeCursorNoTimeLimit) { - // Update remaining amount of time in cursor cache. - int cursorLeftoverMillis = cursorMaxTimeMS - getMoreTimer.millis(); - if (cursorLeftoverMillis <= 0) { - cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired; - } - cursorCache.updateMaxTimeMS(id, cursorLeftoverMillis); - } - } else { - // we've exhausted the cursor - cursorCache.remove(id); - } - - replyToQuery(0, - request.p(), - request.m(), - buffer.buf(), - buffer.len(), - docCount, - startFrom, - hasMore ? cursor->getId() : 0); - return; - } else { - LOG(3) << "could not find cursor " << id << " in cache for " << ns; - - replyToQuery(ResultFlag_CursorNotFound, request.p(), request.m(), 0, 0, 0); - return; + // TODO: this constant should be shared between mongos and mongod, and should not be inside + // ShardedClientCursor. + BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE); + + int numResults = 0; + for (const auto& obj : cursorResponse.getValue().getBatch()) { + buffer.appendBuf((void*)obj.objdata(), obj.objsize()); + ++numResults; } + + replyToQuery(0, + request.p(), + request.m(), + buffer.buf(), + buffer.len(), + numResults, + cursorResponse.getValue().getNumReturnedSoFar().value_or(0), + cursorResponse.getValue().getCursorId()); } void Strategy::killCursors(OperationContext* txn, Request& request) { - if (!useClusterClientCursor) { - cursorCache.gotKillCursors(request.m()); - return; - } - DbMessage& dbMessage = request.d(); const int numCursors = dbMessage.pullInt(); massert(28793, |