/*
* Copyright (C) 2010 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
#include "mongo/platform/basic.h"
#include "mongo/s/strategy.h"
#include "mongo/base/status.h"
#include "mongo/base/owned_pointer_vector.h"
#include "mongo/bson/util/builder.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/client/connpool.h"
#include "mongo/client/dbclientcursor.h"
#include "mongo/db/audit.h"
#include "mongo/db/auth/action_type.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/commands.h"
#include "mongo/db/max_time.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/lite_parsed_query.h"
#include "mongo/db/stats/counters.h"
#include "mongo/s/bson_serializable.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/client/shard_registry.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/chunk_version.h"
#include "mongo/s/config.h"
#include "mongo/s/cursors.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/s/request.h"
#include "mongo/s/stale_exception.h"
#include "mongo/s/version_manager.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batch_upconvert.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/timer.h"
namespace mongo {
using std::unique_ptr;
using std::shared_ptr;
using std::set;
using std::string;
using std::stringstream;
using std::vector;
namespace {
// A spigot to enable the ClusterClientCursor codepath
//
// Exported server parameter, default false. When true, Strategy::queryOp serves legacy
// OP_QUERY finds through ClusterFind::runQuery instead of ParallelSortClusteredCursor.
// NOTE(review): presumably settable at startup / via setParameter since it is exported with
// MONGO_EXPORT_SERVER_PARAMETER — confirm.
MONGO_EXPORT_SERVER_PARAMETER(useClusterClientCursor, bool, false);
} // namespace
/**
 * Reports whether 'ns' refers to a "system.indexes" collection (in any database). Only the
 * collection component of the namespace is examined.
 */
static bool _isSystemIndexes(const char* ns) {
    const auto collectionPart = nsToCollectionSubstring(ns);
    return collectionPart == "system.indexes";
}
/**
 * Serves a query against "<db>.system.indexes" whose filter names a sharded collection in its
 * "ns" field. It forwards the raw request to one shard that owns chunks of that collection and
 * relays the shard's reply to the client.
 *
 * Returns true if the request was handled here, meaning a reply has been sent. Returns false,
 * without replying, when the caller should use the regular query path instead. That happens
 * when:
 *  - the filter has no "ns" field;
 *  - the database cannot be loaded;
 *  - the collection is not sharded; or
 *  - no target shard could be resolved.
 *
 * Throws a user assertion (10200) if the call to the shard fails. Throws
 * RecvStaleConfigException if the shard flags its sharding config as stale.
 */
static bool doShardedIndexQuery(Request& r, const QuerySpec& qSpec) {
    // Extract the ns field from the query, which may be embedded within the "query" or
    // "$query" field.
    const auto nsField = qSpec.filter()["ns"];
    if (nsField.eoo()) {
        return false;
    }

    const NamespaceString indexNSSQuery(nsField.str());

    auto status = grid.catalogCache()->getDatabase(indexNSSQuery.db().toString());
    if (!status.isOK()) {
        return false;
    }

    const auto config = status.getValue();
    if (!config->isSharded(indexNSSQuery.ns())) {
        return false;
    }

    // if you are querying on system.indexes, we need to make sure we go to a shard
    // that actually has chunks. This is not a perfect solution (what if you just
    // look at all indexes), but better than doing nothing.
    ShardPtr shard;
    ChunkManagerPtr cm;
    config->getChunkManagerOrPrimary(indexNSSQuery.ns(), cm, shard);
    if (cm) {
        set<ShardId> shardIds;
        cm->getAllShardIds(&shardIds);
        verify(!shardIds.empty());
        shard = grid.shardRegistry()->getShard(*shardIds.begin());
    }

    // Never dereference a null ShardPtr. This presumably happens if the registry cannot
    // resolve the shard id (e.g. after a shard removal) — confirm. Fall back to the regular
    // query path instead.
    if (!shard) {
        return false;
    }

    ShardConnection dbcon(shard->getConnString(), r.getns());
    DBClientBase& c = dbcon.conn();

    string actualServer;
    Message response;
    bool ok = c.call(r.m(), response, true, &actualServer);
    uassert(10200, "mongos: error calling db", ok);

    {
        QueryResult::View qr = response.singleData().view2ptr();
        if (qr.getResultFlags() & ResultFlag_ShardConfigStale) {
            dbcon.done();
            // Version is zero b/c this is deprecated codepath
            throw RecvStaleConfigException(r.getns(),
                                           "Strategy::doQuery",
                                           ChunkVersion(0, 0, OID()),
                                           ChunkVersion(0, 0, OID()));
        }
    }

    // Prefer the host reported by call(); otherwise fall back to the connection's address.
    r.reply(response, !actualServer.empty() ? actualServer : c.getServerAddress());
    dbcon.done();

    return true;
}
/**
 * Handles a legacy OP_QUERY request (not a command) received by mongos.
 *
 * After authorization and basic validation, the query is served in one of three ways:
 *  - When the 'useClusterClientCursor' server parameter is set, via ClusterFind::runQuery.
 *  - For "system.indexes" queries that doShardedIndexQuery accepts, by that helper.
 *  - Otherwise via a ParallelSortClusteredCursor.
 *      - If exactly one shard is targeted, that shard's reply is relayed directly.
 *      - Otherwise results are batched through a ShardedClientCursor. That cursor is stored
 *        in the cursor cache, together with its remaining $maxTimeMS budget, when more
 *        results remain.
 *
 * Throws on authorization failure, on requests that look like commands, on the 'exhaust'
 * option, and on an unparsable $maxTimeMS.
 */
void Strategy::queryOp(Request& r) {
    verify(!NamespaceString(r.getns()).isCommand());

    Timer queryTimer;
    globalOpCounters.gotQuery();

    QueryMessage q(r.d());

    NamespaceString ns(q.ns);
    ClientBasic* client = ClientBasic::getCurrent();
    AuthorizationSession* authSession = AuthorizationSession::get(client);
    Status status = authSession->checkAuthForQuery(ns, q.query);
    audit::logQueryAuthzCheck(client, ns, q.query, status.code());
    uassertStatusOK(status);

    LOG(3) << "query: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn
           << " options: " << q.queryOptions;

    if (q.ntoreturn == 1 && strstr(q.ns, ".$cmd"))
        throw UserException(8010, "something is wrong, shouldn't see a command here");

    if (q.queryOptions & QueryOption_Exhaust) {
        uasserted(18526,
                  string("the 'exhaust' query option is invalid for mongos queries: ") + q.ns +
                      " " + q.query.toString());
    }

    // Spigot which controls whether OP_QUERY style find on mongos uses the new ClusterClientCursor
    // code path.
    // TODO: Delete the spigot and always use the new code.
    if (useClusterClientCursor) {
        auto txn = cc().makeOperationContext();

        // Default to primary-only unless the query carries an explicit read preference.
        ReadPreferenceSetting readPreference(ReadPreference::PrimaryOnly, TagSet::primaryOnly());
        BSONElement rpElem;
        auto readPrefExtractStatus = bsonExtractTypedField(
            q.query, LiteParsedQuery::kFindCommandReadPrefField, mongo::Object, &rpElem);
        if (readPrefExtractStatus.isOK()) {
            auto parsedRps = ReadPreferenceSetting::fromBSON(rpElem.Obj());
            uassertStatusOK(parsedRps.getStatus());
            readPreference = parsedRps.getValue();
        } else if (readPrefExtractStatus != ErrorCodes::NoSuchKey) {
            uassertStatusOK(readPrefExtractStatus);
        }

        auto canonicalQuery = CanonicalQuery::canonicalize(q);
        uassertStatusOK(canonicalQuery.getStatus());

        // Do the work to generate the first batch of results. This blocks waiting to get responses
        // from the shard(s).
        std::vector<BSONObj> batch;

        // 0 means the cursor is exhausted and
        // otherwise we assume that a cursor with the returned id can be retrieved via the
        // ClusterCursorManager
        auto cursorId =
            ClusterFind::runQuery(txn.get(), *canonicalQuery.getValue(), readPreference, &batch);
        uassertStatusOK(cursorId.getStatus());

        // Build the response document.
        // TODO: this constant should be shared between mongos and mongod, and should
        // not be inside ShardedClientCursor.
        BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE);
        int numResults = 0;
        for (const auto& obj : batch) {
            buffer.appendBuf((void*)obj.objdata(), obj.objsize());
            numResults++;
        }

        replyToQuery(0,  // query result flags
                     r.p(),
                     r.m(),
                     buffer.buf(),
                     buffer.len(),
                     numResults,
                     0,  // startingFrom
                     cursorId.getValue());
        return;
    }

    QuerySpec qSpec((string)q.ns, q.query, q.fields, q.ntoskip, q.ntoreturn, q.queryOptions);

    // Parse "$maxTimeMS".
    auto maxTimeMS = LiteParsedQuery::parseMaxTimeMSQuery(q.query);
    uassert(17233, maxTimeMS.getStatus().reason(), maxTimeMS.isOK());

    if (_isSystemIndexes(q.ns) && doShardedIndexQuery(r, qSpec)) {
        return;
    }

    // Owned here until it is handed off to a ShardedClientCursor. Otherwise it is destroyed
    // automatically on every return or exception path, replacing the previous manual
    // new/delete plus try/catch.
    std::unique_ptr<ParallelSortClusteredCursor> cursor(
        new ParallelSortClusteredCursor(qSpec, CommandInfo()));

    // TODO: Move out to Request itself, not strategy based
    cursor->init();

    if (qSpec.isExplain()) {
        BSONObjBuilder explain_builder;
        cursor->explain(explain_builder);
        explain_builder.appendNumber("executionTimeMillis",
                                     static_cast<long long>(queryTimer.millis()));
        BSONObj b = explain_builder.obj();

        replyToQuery(0, r.p(), r.m(), b);
        return;
    }

    // TODO: Revisit all of this when we revisit the sharded cursor cache

    if (cursor->getNumQueryShards() != 1) {
        // More than one shard (or zero), manage with a ShardedClientCursor
        // NOTE: We may also have *zero* shards here when the returnPartial flag is set.
        // Currently the code in ShardedClientCursor handles this.

        // The ShardedClientCursor takes over ownership of the parallel cursor.
        ShardedClientCursorPtr cc(new ShardedClientCursor(q, cursor.release()));

        BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE);
        int docCount = 0;
        const int startFrom = cc->getTotalSent();
        bool hasMore = cc->sendNextBatch(q.ntoreturn, buffer, docCount);

        if (hasMore) {
            LOG(5) << "storing cursor : " << cc->getId();

            int cursorLeftoverMillis = maxTimeMS.getValue() - queryTimer.millis();
            if (maxTimeMS.getValue() == 0) {  // 0 represents "no limit".
                cursorLeftoverMillis = kMaxTimeCursorNoTimeLimit;
            } else if (cursorLeftoverMillis <= 0) {
                cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired;
            }

            cursorCache.store(cc, cursorLeftoverMillis);
        }

        replyToQuery(0,
                     r.p(),
                     r.m(),
                     buffer.buf(),
                     buffer.len(),
                     docCount,
                     startFrom,
                     hasMore ? cc->getId() : 0);
    } else {
        // Only one shard is used.
        // Remote cursors are stored remotely, so 'cursor' is not needed past this call. It is
        // released when this function returns.
        ShardPtr shard = cursor->getQueryShard();
        verify(shard.get());
        DBClientCursorPtr shardCursor = cursor->getShardCursor(shard->getId());

        // Implicitly stores the cursor in the cache
        r.reply(*(shardCursor->getMessage()), shardCursor->originalHost());

        // We don't want to kill the cursor remotely if there's still data left
        shardCursor->decouple();
    }
}
/**
 * Handles a command request (a query against a command namespace) received by mongos.
 *
 * Legacy ".$cmd.sys.*" pseudo-commands are delegated to handleSpecialNamespaces. Otherwise:
 *  - If the query wraps the command in a "query" or "$query" field, the command is unwrapped.
 *    Any read preference field is preserved as "$queryOptions.<read preference field>".
 *  - The command is run through Command::runAgainstRegistered.
 *  - On StaleConfigException the command is retried up to 5 times.
 *      - Every retry re-checks this connection's shard versions.
 *      - From the second retry onward, a remote shard version check is also forced.
 *  - Any other AssertionException is turned into an error reply.
 */
void Strategy::clientCommandOp(Request& r) {
    QueryMessage q(r.d());

    LOG(3) << "command: " << q.ns << " " << q.query << " ntoreturn: " << q.ntoreturn
           << " options: " << q.queryOptions;

    if (q.queryOptions & QueryOption_Exhaust) {
        uasserted(18527,
                  string("the 'exhaust' query option is invalid for mongos commands: ") + q.ns +
                      " " + q.query.toString());
    }

    NamespaceString nss(r.getns());
    // Regular (non-command) queries are handled by Strategy::queryOp.
    verify(nss.isCommand() || nss.isSpecialCommand());

    if (handleSpecialNamespaces(r, q))
        return;

    int loops = 5;

    while (true) {
        BSONObjBuilder builder;
        try {
            BSONObj cmdObj = q.query;
            {
                BSONElement e = cmdObj.firstElement();
                if (e.type() == Object &&
                    (e.fieldName()[0] == '$' ? str::equals("query", e.fieldName() + 1)
                                             : str::equals("query", e.fieldName()))) {
                    // Extract the embedded query object.

                    if (cmdObj.hasField(Query::ReadPrefField.name())) {
                        // The command has a read preference setting. We don't want
                        // to lose this information so we copy this to a new field
                        // called $queryOptions.$readPreference
                        BSONObjBuilder finalCmdObjBuilder;
                        finalCmdObjBuilder.appendElements(e.embeddedObject());

                        BSONObjBuilder queryOptionsBuilder(
                            finalCmdObjBuilder.subobjStart("$queryOptions"));
                        queryOptionsBuilder.append(cmdObj[Query::ReadPrefField.name()]);
                        queryOptionsBuilder.done();

                        cmdObj = finalCmdObjBuilder.obj();
                    } else {
                        cmdObj = e.embeddedObject();
                    }
                }
            }

            Command::runAgainstRegistered(q.ns, cmdObj, builder, q.queryOptions);
            BSONObj x = builder.done();
            replyToQuery(0, r.p(), r.m(), x);
            return;
        } catch (StaleConfigException& e) {
            if (loops <= 0) {
                // Rethrow the original exception object. 'throw e;' would throw a copy of its
                // static type and slice off any derived exception type.
                throw;
            }

            loops--;
            log() << "retrying command: " << q.query;

            // For legacy reasons, ns may not actually be set in the exception :-(
            string staleNS = e.getns();
            if (staleNS.size() == 0)
                staleNS = q.ns;

            ShardConnection::checkMyConnectionVersions(staleNS);
            if (loops < 4)
                versionManager.forceRemoteCheckShardVersionCB(staleNS);
        } catch (AssertionException& e) {
            Command::appendCommandStatus(builder, e.toStatus());
            BSONObj x = builder.done();
            replyToQuery(0, r.p(), r.m(), x);
            return;
        }
    }
}
// TODO: remove after MongoDB 3.2
/**
 * Serves legacy pseudo-command namespaces, i.e. namespaces containing ".$cmd.sys.".
 *
 *  - "inprog" is run as the currentOp command.
 *  - "killop" is run as the killOp command.
 *  - "unlock" is rejected with an error reply.
 *
 * Returns true after a reply has been sent. Returns false, without replying, when the
 * namespace contains no ".$cmd.sys." marker or names an unknown pseudo-command.
 */
bool Strategy::handleSpecialNamespaces(Request& r, QueryMessage& q) {
    static const char kSysCmdMarker[] = ".$cmd.sys.";

    const char* marker = strstr(r.getns(), kSysCmdMarker);
    if (marker == nullptr) {
        return false;
    }

    // The pseudo-command name is whatever follows the marker.
    const char* sysCommand = marker + (sizeof(kSysCmdMarker) - 1);

    BSONObjBuilder reply;

    // Rebuilds the request as {<commandName>: 1, <original query fields>} and runs it against
    // "<db>.$cmd". The command's response is written into 'reply'.
    auto runAsCommand = [&r, &q, &reply](StringData commandName) {
        BSONObjBuilder commandBuilder;
        commandBuilder.append(commandName, 1);
        commandBuilder.appendElements(q.query);  // fields are validated by Commands
        BSONObj command = commandBuilder.done();

        const NamespaceString requestNss(r.getns());
        const NamespaceString commandNss(requestNss.db(), "$cmd");
        Command::runAgainstRegistered(commandNss.ns().c_str(), command, reply, q.queryOptions);
    };

    if (strcmp(sysCommand, "inprog") == 0) {
        runAsCommand("currentOp");
    } else if (strcmp(sysCommand, "killop") == 0) {
        runAsCommand("killOp");
    } else if (strcmp(sysCommand, "unlock") == 0) {
        reply.append("err", "can't do unlock through mongos");
    } else {
        warning() << "unknown sys command [" << sysCommand << "]";
        return false;
    }

    BSONObj response = reply.done();
    replyToQuery(0, r.p(), r.m(), response);
    return true;
}
void Strategy::commandOp(const string& db,
const BSONObj& command,
int options,
const string& versionedNS,
const BSONObj& targetingQuery,
vector* results) {
QuerySpec qSpec(db + ".$cmd", command, BSONObj(), 0, 1, options);
ParallelSortClusteredCursor cursor(qSpec, CommandInfo(versionedNS, targetingQuery));
// Initialize the cursor
cursor.init();
set shardIds;
cursor.getQueryShardIds(shardIds);
for (const ShardId& shardId : shardIds) {
CommandResult result;
result.shardTargetId = shardId;
result.target = fassertStatusOK(
28739, ConnectionString::parse(cursor.getShardCursor(shardId)->originalHost()));
result.result = cursor.getShardCursor(shardId)->peekFirst().getOwned();
results->push_back(result);
}
}
/**
 * Runs 'command' against database 'db' on that database's primary shard, provided
 * 'versionedNS' is not sharded. Fills in 'cmdResult' on success only.
 *
 * Returns:
 *  - IllegalOperation if the database cannot be loaded or 'versionedNS' is sharded;
 *  - ShardNotFound if the primary shard cannot be resolved;
 *  - OperationFailed if the command itself reports failure;
 *  - the status of any DBException thrown while talking to the shard.
 */
Status Strategy::commandOpUnsharded(const std::string& db,
                                    const BSONObj& command,
                                    int options,
                                    const std::string& versionedNS,
                                    CommandResult* cmdResult) {
    // Note that this implementation will not handle targeting retries and fails when the
    // sharding metadata is too stale
    auto status = grid.catalogCache()->getDatabase(db);
    if (!status.isOK()) {
        mongoutils::str::stream ss;
        ss << "Passthrough command failed: " << command.toString() << " on ns " << versionedNS
           << ". Caused by " << causedBy(status.getStatus());
        return Status(ErrorCodes::IllegalOperation, ss);
    }

    const auto conf = status.getValue();
    if (conf->isSharded(versionedNS)) {
        mongoutils::str::stream ss;
        ss << "Passthrough command failed: " << command.toString() << " on ns " << versionedNS
           << ". Cannot run on sharded namespace.";
        return Status(ErrorCodes::IllegalOperation, ss);
    }

    const auto primaryShard = grid.shardRegistry()->getShard(conf->getPrimaryId());
    if (!primaryShard) {
        // Dereferencing a null shard handle below would crash the process. The try/catch
        // only handles DBException, so report a proper error instead.
        return Status(ErrorCodes::ShardNotFound,
                      str::stream() << "Passthrough command failed: " << command.toString()
                                    << " on ns " << versionedNS
                                    << ". Could not find primary shard " << conf->getPrimaryId());
    }

    BSONObj shardResult;
    try {
        ShardConnection conn(primaryShard->getConnString(), "");

        // TODO: this can throw a stale config when mongos is not up-to-date -- fix.
        if (!conn->runCommand(db, command, shardResult, options)) {
            conn.done();
            return Status(ErrorCodes::OperationFailed,
                          str::stream() << "Passthrough command failed: " << command << " on ns "
                                        << versionedNS << "; result: " << shardResult);
        }
        conn.done();
    } catch (const DBException& ex) {
        return ex.toStatus();
    }

    // Fill out the command result.
    cmdResult->shardTargetId = conf->getPrimaryId();
    cmdResult->result = shardResult;
    cmdResult->target = primaryShard->getConnString();

    return Status::OK();
}
/**
 * Handles a legacy getMore request received by mongos.
 *
 * The cursor id is looked up in the mongos cursor cache:
 *  - A host "ref" means the cursor lives on one remote host. The raw request is forwarded
 *    there and the reply relayed; the ref is dropped once that cursor reports no more data.
 *  - A ShardedClientCursor is advanced locally by one batch. The remaining $maxTimeMS budget
 *    recorded for it is honored and updated.
 *
 * Replies with ResultFlag_CursorNotFound if the database no longer exists or the id is
 * unknown. Throws on authorization failure, when the cursor's time limit has expired, and if
 * one id is registered as both a sharded and an unsharded cursor.
 */
void Strategy::getMore(Request& r) {
    Timer getMoreTimer;

    const char* ns = r.getns();
    // The remainder of the message carries ntoreturn followed by the cursor id.
    const int ntoreturn = r.d().pullInt();
    const long long id = r.d().pullInt64();

    // TODO: Handle stale config exceptions here from coll being dropped or sharded during op
    // for now has same semantics as legacy request
    const NamespaceString nss(ns);
    auto statusGetDb = grid.catalogCache()->getDatabase(nss.db().toString());
    if (statusGetDb == ErrorCodes::DatabaseNotFound) {
        cursorCache.remove(id);
        replyToQuery(ResultFlag_CursorNotFound, r.p(), r.m(), 0, 0, 0);
        return;
    }

    uassertStatusOK(statusGetDb);

    const auto config = statusGetDb.getValue();

    // NOTE(review): 'info' and 'primary' are not read below. The call is kept in case it is
    // relied on for side effects (e.g. loading routing metadata) — confirm before removing.
    ShardPtr primary;
    ChunkManagerPtr info;
    config->getChunkManagerOrPrimary(ns, info, primary);

    //
    // TODO: Cleanup cursor cache, consolidate into single codepath
    //
    const string host = cursorCache.getRef(id);
    ShardedClientCursorPtr cursor = cursorCache.get(id);
    int cursorMaxTimeMS = cursorCache.getMaxTimeMS(id);

    // Cursor ids should not overlap between sharded and unsharded cursors
    massert(17012,
            str::stream() << "duplicate sharded and unsharded cursor id " << id << " detected for "
                          << ns << ", duplicated on host " << host,
            !cursor || host.empty());

    ClientBasic* client = ClientBasic::getCurrent();
    AuthorizationSession* authSession = AuthorizationSession::get(client);
    Status status = authSession->checkAuthForGetMore(nss, id);
    audit::logGetMoreAuthzCheck(client, nss, id, status.code());
    uassertStatusOK(status);

    if (!host.empty()) {
        LOG(3) << "single getmore: " << ns;

        // we used ScopedDbConnection because we don't care about config versions
        // not deleting data is handled elsewhere
        // and we don't want to call setShardVersion
        ScopedDbConnection conn(host);

        Message response;
        bool ok = conn->callRead(r.m(), response);
        uassert(10204, "dbgrid: getmore: error calling db", ok);

        bool hasMore = (response.singleData().getCursor() != 0);
        if (!hasMore) {
            cursorCache.removeRef(id);
        }

        r.reply(response, "" /*conn->getServerAddress() */);
        conn.done();
        return;
    } else if (cursor) {
        if (cursorMaxTimeMS == kMaxTimeCursorTimeLimitExpired) {
            cursorCache.remove(id);
            uasserted(ErrorCodes::ExceededTimeLimit, "operation exceeded time limit");
        }

        // TODO: Try to match logic of mongod, where on subsequent getMore() we pull lots more data?
        BufBuilder buffer(ShardedClientCursor::INIT_REPLY_BUFFER_SIZE);
        int docCount = 0;
        const int startFrom = cursor->getTotalSent();
        bool hasMore = cursor->sendNextBatch(ntoreturn, buffer, docCount);

        if (hasMore) {
            // still more data
            cursor->accessed();

            if (cursorMaxTimeMS != kMaxTimeCursorNoTimeLimit) {
                // Update remaining amount of time in cursor cache.
                int cursorLeftoverMillis = cursorMaxTimeMS - getMoreTimer.millis();
                if (cursorLeftoverMillis <= 0) {
                    cursorLeftoverMillis = kMaxTimeCursorTimeLimitExpired;
                }
                cursorCache.updateMaxTimeMS(id, cursorLeftoverMillis);
            }
        } else {
            // we've exhausted the cursor
            cursorCache.remove(id);
        }

        replyToQuery(0,
                     r.p(),
                     r.m(),
                     buffer.buf(),
                     buffer.len(),
                     docCount,
                     startFrom,
                     hasMore ? cursor->getId() : 0);
        return;
    } else {
        LOG(3) << "could not find cursor " << id << " in cache for " << ns;

        replyToQuery(ResultFlag_CursorNotFound, r.p(), r.m(), 0, 0, 0);
        return;
    }
}
void Strategy::writeOp(int op, Request& r) {
// make sure we have a last error
dassert(&LastError::get(cc()));
OwnedPointerVector requestsOwned;
vector& requests = requestsOwned.mutableVector();
msgToBatchRequests(r.m(), &requests);
for (vector::iterator it = requests.begin(); it != requests.end();
++it) {
// Multiple commands registered to last error as multiple requests
if (it != requests.begin())
LastError::get(cc()).startRequest();
BatchedCommandRequest* request = *it;
// Adjust namespaces for command
NamespaceString fullNS(request->getNS());
string cmdNS = fullNS.getCommandNS();
// We only pass in collection name to command
request->setNS(fullNS);
BSONObjBuilder builder;
BSONObj requestBSON = request->toBSON();
{
// Disable the last error object for the duration of the write cmd
LastError::Disabled disableLastError(&LastError::get(cc()));
Command::runAgainstRegistered(cmdNS.c_str(), requestBSON, builder, 0);
}
BatchedCommandResponse response;
bool parsed = response.parseBSON(builder.done(), NULL);
(void)parsed; // for compile
dassert(parsed && response.isValid(NULL));
// Populate the lastError object based on the write response
LastError::get(cc()).reset();
bool hadError = batchErrorToLastError(*request, response, &LastError::get(cc()));
// Check if this is an ordered batch and we had an error which should stop processing
if (request->getOrdered() && hadError)
break;
}
}
}