summary | refs | log | tree | commit | diff
path: root/src/mongo/s/strategy.cpp
diff options
context:
space:
mode:
author: David Storch <david.storch@10gen.com> 2015-11-11 10:25:56 -0500
committer: David Storch <david.storch@10gen.com> 2015-11-11 17:20:02 -0500
commit: 541489161282cee955bb32489cacfdba3c6bfe88 (patch)
tree: 5f9742f8d7b4b350d5cb68483be5dd7475f517f1 /src/mongo/s/strategy.cpp
parent: 0cdcbd1ff5615db4b08951abc2aeddd8974d42ee (diff)
download: mongo-541489161282cee955bb32489cacfdba3c6bfe88.tar.gz
SERVER-20758 delete legacy mongos query path
Also removes the useClusterClientCursor setParameter which was used to control which query path is active.
Diffstat (limited to 'src/mongo/s/strategy.cpp')
-rw-r--r-- src/mongo/s/strategy.cpp 388
1 files changed, 92 insertions, 296 deletions
diff --git a/src/mongo/s/strategy.cpp b/src/mongo/s/strategy.cpp
index 9534192381b..8827f554985 100644
--- a/src/mongo/s/strategy.cpp
+++ b/src/mongo/s/strategy.cpp
@@ -79,8 +79,6 @@ using std::string;
using std::stringstream;
using std::vector;
-MONGO_EXPORT_SERVER_PARAMETER(useClusterClientCursor, bool, true);
-
static bool _isSystemIndexes(const char* ns) {
return nsToCollectionSubstring(ns) == "system.indexes";
}
@@ -151,8 +149,6 @@ static bool doShardedIndexQuery(OperationContext* txn, Request& request, const Q
void Strategy::queryOp(OperationContext* txn, Request& request) {
verify(!NamespaceString(request.getns()).isCommand());
- Timer queryTimer;
-
globalOpCounters.gotQuery();
QueryMessage q(request.d());
@@ -176,177 +172,83 @@ void Strategy::queryOp(OperationContext* txn, Request& request) {
" " + q.query.toString());
}
- // Spigot which controls whether OP_QUERY style find on mongos uses the new ClusterClientCursor
- // code path.
- // TODO: Delete the spigot and always use the new code.
- if (useClusterClientCursor) {
- // Determine the default read preference mode based on the value of the slaveOk flag.
- ReadPreference readPreferenceOption = (q.queryOptions & QueryOption_SlaveOk)
- ? ReadPreference::SecondaryPreferred
- : ReadPreference::PrimaryOnly;
- ReadPreferenceSetting readPreference(readPreferenceOption, TagSet());
-
- BSONElement rpElem;
- auto readPrefExtractStatus = bsonExtractTypedField(
- q.query, LiteParsedQuery::kWrappedReadPrefField, mongo::Object, &rpElem);
-
- if (readPrefExtractStatus.isOK()) {
- auto parsedRps = ReadPreferenceSetting::fromBSON(rpElem.Obj());
- uassertStatusOK(parsedRps.getStatus());
- readPreference = parsedRps.getValue();
- } else if (readPrefExtractStatus != ErrorCodes::NoSuchKey) {
- uassertStatusOK(readPrefExtractStatus);
- }
+ // Determine the default read preference mode based on the value of the slaveOk flag.
+ ReadPreference readPreferenceOption = (q.queryOptions & QueryOption_SlaveOk)
+ ? ReadPreference::SecondaryPreferred
+ : ReadPreference::PrimaryOnly;
+ ReadPreferenceSetting readPreference(readPreferenceOption, TagSet());
+
+ BSONElement rpElem;
+ auto readPrefExtractStatus = bsonExtractTypedField(
+ q.query, LiteParsedQuery::kWrappedReadPrefField, mongo::Object, &rpElem);
+
+ if (readPrefExtractStatus.isOK()) {
+ auto parsedRps = ReadPreferenceSetting::fromBSON(rpElem.Obj());
+ uassertStatusOK(parsedRps.getStatus());
+ readPreference = parsedRps.getValue();
+ } else if (readPrefExtractStatus != ErrorCodes::NoSuchKey) {
+ uassertStatusOK(readPrefExtractStatus);
+ }
- auto canonicalQuery = CanonicalQuery::canonicalize(q, ExtensionsCallbackNoop());
- uassertStatusOK(canonicalQuery.getStatus());
-
- // If the $explain flag was set, we must run the operation on the shards as an explain
- // command rather than a find command.
- if (canonicalQuery.getValue()->getParsed().isExplain()) {
- const LiteParsedQuery& lpq = canonicalQuery.getValue()->getParsed();
- BSONObj findCommand = lpq.asFindCommand();
-
- // We default to allPlansExecution verbosity.
- auto verbosity = ExplainCommon::EXEC_ALL_PLANS;
-
- const bool secondaryOk = (readPreference.pref != ReadPreference::PrimaryOnly);
- rpc::ServerSelectionMetadata metadata(secondaryOk, readPreference);
-
- BSONObjBuilder explainBuilder;
- uassertStatusOK(
- Strategy::explainFind(txn, findCommand, lpq, verbosity, metadata, &explainBuilder));
-
- BSONObj explainObj = explainBuilder.done();
- replyToQuery(0, // query result flags
- request.p(),
- request.m(),
- static_cast<const void*>(explainObj.objdata()),
- explainObj.objsize(),
- 1, // numResults
- 0, // startingFrom
- CursorId(0));
- return;
- }
+ auto canonicalQuery = CanonicalQuery::canonicalize(q, ExtensionsCallbackNoop());
+ uassertStatusOK(canonicalQuery.getStatus());
- // Do the work to generate the first batch of results. This blocks waiting to get responses
- // from the shard(s).
- std::vector<BSONObj> batch;
-
- // 0 means the cursor is exhausted and
- // otherwise we assume that a cursor with the returned id can be retrieved via the
- // ClusterCursorManager
- auto cursorId =
- ClusterFind::runQuery(txn, *canonicalQuery.getValue(), readPreference, &batch);
- uassertStatusOK(cursorId.getStatus());
-
- // TODO: this constant should be shared between mongos and mongod, and should
- // not be inside ShardedClientCursor.
- BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE);
-
- // Fill out the response buffer.
- int numResults = 0;
- for (const auto& obj : batch) {
- buffer.appendBuf((void*)obj.objdata(), obj.objsize());
- numResults++;
- }
+ // If the $explain flag was set, we must run the operation on the shards as an explain command
+ // rather than a find command.
+ if (canonicalQuery.getValue()->getParsed().isExplain()) {
+ const LiteParsedQuery& lpq = canonicalQuery.getValue()->getParsed();
+ BSONObj findCommand = lpq.asFindCommand();
+
+ // We default to allPlansExecution verbosity.
+ auto verbosity = ExplainCommon::EXEC_ALL_PLANS;
+
+ const bool secondaryOk = (readPreference.pref != ReadPreference::PrimaryOnly);
+ rpc::ServerSelectionMetadata metadata(secondaryOk, readPreference);
+
+ BSONObjBuilder explainBuilder;
+ uassertStatusOK(
+ Strategy::explainFind(txn, findCommand, lpq, verbosity, metadata, &explainBuilder));
+ BSONObj explainObj = explainBuilder.done();
replyToQuery(0, // query result flags
request.p(),
request.m(),
- buffer.buf(),
- buffer.len(),
- numResults,
+ static_cast<const void*>(explainObj.objdata()),
+ explainObj.objsize(),
+ 1, // numResults
0, // startingFrom
- cursorId.getValue());
+ CursorId(0));
return;
}
- QuerySpec qSpec((string)q.ns, q.query, q.fields, q.ntoskip, q.ntoreturn, q.queryOptions);
+ // Do the work to generate the first batch of results. This blocks waiting to get responses from
+ // the shard(s).
+ std::vector<BSONObj> batch;
- // Parse "$maxTimeMS".
- StatusWith<int> maxTimeMS =
- LiteParsedQuery::parseMaxTimeMS(q.query[LiteParsedQuery::queryOptionMaxTimeMS]);
- uassert(17233, maxTimeMS.getStatus().reason(), maxTimeMS.isOK());
+ // 0 means the cursor is exhausted. Otherwise we assume that a cursor with the returned id can
+ // be retrieved via the ClusterCursorManager.
+ auto cursorId = ClusterFind::runQuery(txn, *canonicalQuery.getValue(), readPreference, &batch);
+ uassertStatusOK(cursorId.getStatus());
- if (_isSystemIndexes(q.ns) && doShardedIndexQuery(txn, request, qSpec)) {
- return;
- }
-
- ParallelSortClusteredCursor* cursor = new ParallelSortClusteredCursor(qSpec, CommandInfo());
- verify(cursor);
-
- // TODO: Move out to Request itself, not strategy based
- try {
- cursor->init(txn);
+ // TODO: this constant should be shared between mongos and mongod, and should not be inside
+ // ShardedClientCursor.
+ BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE);
- if (qSpec.isExplain()) {
- BSONObjBuilder explain_builder;
- cursor->explain(explain_builder);
- explain_builder.appendNumber("executionTimeMillis",
- static_cast<long long>(queryTimer.millis()));
- BSONObj b = explain_builder.obj();
-
- replyToQuery(0, request.p(), request.m(), b);
- delete (cursor);
- return;
- }
- } catch (...) {
- delete cursor;
- throw;
+ // Fill out the response buffer.
+ int numResults = 0;
+ for (const auto& obj : batch) {
+ buffer.appendBuf((void*)obj.objdata(), obj.objsize());
+ numResults++;
}
- // TODO: Revisit all of this when we revisit the sharded cursor cache
-
- if (cursor->getNumQueryShards() != 1) {
- // More than one shard (or zero), manage with a ShardedClientCursor
- // NOTE: We may also have *zero* shards here when the returnPartial flag is set.
- // Currently the code in ShardedClientCursor handles this.
-
- ShardedClientCursorPtr cc(new ShardedClientCursor(q, cursor));
-
- BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE);
- int docCount = 0;
- const int startFrom = cc->getTotalSent();
- bool hasMore = cc->sendNextBatch(q.ntoreturn, buffer, docCount);
-
- if (hasMore) {
- LOG(5) << "storing cursor : " << cc->getId();
-
- int cursorLeftoverMillis = maxTimeMS.getValue() - queryTimer.millis();
- if (maxTimeMS.getValue() == 0) { // 0 represents "no limit".
- cursorLeftoverMillis = kMaxTimeCursorNoTimeLimit;
- } else if (cursorLeftoverMillis <= 0) {
- cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired;
- }
-
- cursorCache.store(cc, cursorLeftoverMillis);
- }
-
- replyToQuery(0,
- request.p(),
- request.m(),
- buffer.buf(),
- buffer.len(),
- docCount,
- startFrom,
- hasMore ? cc->getId() : 0);
- } else {
- // Only one shard is used
-
- // Remote cursors are stored remotely, we shouldn't need this around.
- unique_ptr<ParallelSortClusteredCursor> cursorDeleter(cursor);
-
- ShardPtr shard = grid.shardRegistry()->getShard(txn, cursor->getQueryShardId());
- verify(shard.get());
- DBClientCursorPtr shardCursor = cursor->getShardCursor(shard->getId());
-
- // Implicitly stores the cursor in the cache
- request.reply(*(shardCursor->getMessage()), shardCursor->originalHost());
-
- // We don't want to kill the cursor remotely if there's still data left
- shardCursor->decouple();
- }
+ replyToQuery(0, // query result flags
+ request.p(),
+ request.m(),
+ buffer.buf(),
+ buffer.len(),
+ numResults,
+ 0, // startingFrom
+ cursorId.getValue());
}
void Strategy::clientCommandOp(OperationContext* txn, Request& request) {
@@ -551,14 +453,12 @@ Status Strategy::commandOpUnsharded(OperationContext* txn,
}
void Strategy::getMore(OperationContext* txn, Request& request) {
- Timer getMoreTimer;
-
const char* ns = request.getns();
const int ntoreturn = request.d().pullInt();
const long long id = request.d().pullInt64();
- // TODO: Handle stale config exceptions here from coll being dropped or sharded during op
- // for now has same semantics as legacy request
+ // TODO: Handle stale config exceptions here from coll being dropped or sharded during op for
+ // now has same semantics as legacy request.
const NamespaceString nss(ns);
auto statusGetDb = grid.catalogCache()->getDatabase(txn, nss.db().toString());
if (statusGetDb == ErrorCodes::NamespaceNotFound) {
@@ -569,147 +469,43 @@ void Strategy::getMore(OperationContext* txn, Request& request) {
uassertStatusOK(statusGetDb);
- // Spigot which controls whether OP_QUERY style find on mongos uses the new ClusterClientCursor
- // code path.
- //
- // TODO: Delete the spigot and always use the new code.
- if (useClusterClientCursor) {
- boost::optional<long long> batchSize;
- if (ntoreturn) {
- batchSize = abs(ntoreturn);
- }
- GetMoreRequest getMoreRequest(
- NamespaceString(ns), id, batchSize, boost::none, boost::none, boost::none);
-
- auto cursorResponse = ClusterFind::runGetMore(txn, getMoreRequest);
- if (cursorResponse == ErrorCodes::CursorNotFound) {
- replyToQuery(ResultFlag_CursorNotFound, request.p(), request.m(), 0, 0, 0);
- return;
- }
- uassertStatusOK(cursorResponse.getStatus());
-
- // Build the response document.
- //
- // TODO: this constant should be shared between mongos and mongod, and should not be inside
- // ShardedClientCursor.
- BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE);
-
- int numResults = 0;
- for (const auto& obj : cursorResponse.getValue().getBatch()) {
- buffer.appendBuf((void*)obj.objdata(), obj.objsize());
- ++numResults;
- }
+ boost::optional<long long> batchSize;
+ if (ntoreturn) {
+ batchSize = abs(ntoreturn);
+ }
+ GetMoreRequest getMoreRequest(
+ NamespaceString(ns), id, batchSize, boost::none, boost::none, boost::none);
- replyToQuery(0,
- request.p(),
- request.m(),
- buffer.buf(),
- buffer.len(),
- numResults,
- cursorResponse.getValue().getNumReturnedSoFar().value_or(0),
- cursorResponse.getValue().getCursorId());
+ auto cursorResponse = ClusterFind::runGetMore(txn, getMoreRequest);
+ if (cursorResponse == ErrorCodes::CursorNotFound) {
+ replyToQuery(ResultFlag_CursorNotFound, request.p(), request.m(), 0, 0, 0);
return;
}
+ uassertStatusOK(cursorResponse.getStatus());
- shared_ptr<DBConfig> config = statusGetDb.getValue();
-
- ShardPtr primary;
- ChunkManagerPtr info;
- config->getChunkManagerOrPrimary(txn, ns, info, primary);
-
- //
- // TODO: Cleanup cursor cache, consolidate into single codepath
+ // Build the response document.
//
- const string host = cursorCache.getRef(id);
- ShardedClientCursorPtr cursor = cursorCache.get(id);
- int cursorMaxTimeMS = cursorCache.getMaxTimeMS(id);
-
- // Cursor ids should not overlap between sharded and unsharded cursors
- massert(17012,
- str::stream() << "duplicate sharded and unsharded cursor id " << id << " detected for "
- << ns << ", duplicated on host " << host,
- NULL == cursorCache.get(id).get() || host.empty());
-
- ClientBasic* client = ClientBasic::getCurrent();
- NamespaceString nsString(ns);
- AuthorizationSession* authSession = AuthorizationSession::get(client);
- Status status = authSession->checkAuthForGetMore(nsString, id, false);
- audit::logGetMoreAuthzCheck(client, nsString, id, status.code());
- uassertStatusOK(status);
-
- if (!host.empty()) {
- LOG(3) << "single getmore: " << ns;
-
- // we used ScopedDbConnection because we don't get about config versions
- // not deleting data is handled elsewhere
- // and we don't want to call setShardVersion
- ScopedDbConnection conn(host);
-
- Message response;
- bool ok = conn->callRead(request.m(), response);
- uassert(10204, "dbgrid: getmore: error calling db", ok);
-
- bool hasMore = (response.singleData().getCursor() != 0);
-
- if (!hasMore) {
- cursorCache.removeRef(id);
- }
-
- request.reply(response, "" /*conn->getServerAddress() */);
- conn.done();
- return;
- } else if (cursor) {
- if (cursorMaxTimeMS == kMaxTimeCursorTimeLimitExpired) {
- cursorCache.remove(id);
- uasserted(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit");
- }
-
- // TODO: Try to match logic of mongod, where on subsequent getMore() we pull lots more data?
- BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE);
- int docCount = 0;
- const int startFrom = cursor->getTotalSent();
- bool hasMore = cursor->sendNextBatch(ntoreturn, buffer, docCount);
-
- if (hasMore) {
- // still more data
- cursor->accessed();
-
- if (cursorMaxTimeMS != kMaxTimeCursorNoTimeLimit) {
- // Update remaining amount of time in cursor cache.
- int cursorLeftoverMillis = cursorMaxTimeMS - getMoreTimer.millis();
- if (cursorLeftoverMillis <= 0) {
- cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired;
- }
- cursorCache.updateMaxTimeMS(id, cursorLeftoverMillis);
- }
- } else {
- // we've exhausted the cursor
- cursorCache.remove(id);
- }
-
- replyToQuery(0,
- request.p(),
- request.m(),
- buffer.buf(),
- buffer.len(),
- docCount,
- startFrom,
- hasMore ? cursor->getId() : 0);
- return;
- } else {
- LOG(3) << "could not find cursor " << id << " in cache for " << ns;
-
- replyToQuery(ResultFlag_CursorNotFound, request.p(), request.m(), 0, 0, 0);
- return;
+ // TODO: this constant should be shared between mongos and mongod, and should not be inside
+ // ShardedClientCursor.
+ BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE);
+
+ int numResults = 0;
+ for (const auto& obj : cursorResponse.getValue().getBatch()) {
+ buffer.appendBuf((void*)obj.objdata(), obj.objsize());
+ ++numResults;
}
+
+ replyToQuery(0,
+ request.p(),
+ request.m(),
+ buffer.buf(),
+ buffer.len(),
+ numResults,
+ cursorResponse.getValue().getNumReturnedSoFar().value_or(0),
+ cursorResponse.getValue().getCursorId());
}
void Strategy::killCursors(OperationContext* txn, Request& request) {
- if (!useClusterClientCursor) {
- cursorCache.gotKillCursors(request.m());
- return;
- }
-
DbMessage& dbMessage = request.d();
const int numCursors = dbMessage.pullInt();
massert(28793,