diff options
Diffstat (limited to 'src/mongo/client/dbclient_cursor.cpp')
-rw-r--r-- | src/mongo/client/dbclient_cursor.cpp | 552 |
1 files changed, 552 insertions, 0 deletions
diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp new file mode 100644 index 00000000000..bafdfea1990 --- /dev/null +++ b/src/mongo/client/dbclient_cursor.cpp @@ -0,0 +1,552 @@ +// dbclient_cursor.cpp - cursor for iterating over Mongo query results, from C++ + +/* Copyright 2009 10gen Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kNetwork + +#include "mongo/platform/basic.h" + +#include "mongo/client/dbclient_cursor.h" + +#include "mongo/client/connpool.h" +#include "mongo/db/client.h" +#include "mongo/db/dbmessage.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/query/cursor_response.h" +#include "mongo/db/query/getmore_request.h" +#include "mongo/db/query/query_request.h" +#include "mongo/rpc/factory.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/rpc/metadata.h" +#include "mongo/rpc/object_check.h" +#include "mongo/s/stale_exception.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/bufreader.h" +#include "mongo/util/debug_util.h" +#include "mongo/util/destructor_guard.h" +#include "mongo/util/exit.h" +#include "mongo/util/log.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +using std::unique_ptr; +using std::endl; +using std::string; +using std::vector; + +namespace { +Message assembleCommandRequest(DBClientBase* cli, + StringData database, + int legacyQueryOptions, + BSONObj legacyQuery) { + auto request = rpc::upconvertRequest(database, std::move(legacyQuery), legacyQueryOptions); + + if (cli->getRequestMetadataWriter()) { + BSONObjBuilder bodyBob(std::move(request.body)); + auto opCtx = (haveClient() ? cc().getOperationContext() : nullptr); + uassertStatusOK(cli->getRequestMetadataWriter()(opCtx, &bodyBob)); + request.body = bodyBob.obj(); + } + + return rpc::messageFromOpMsgRequest( + cli->getClientRPCProtocols(), cli->getServerRPCProtocols(), std::move(request)); +} + +} // namespace + +int DBClientCursor::nextBatchSize() { + if (nToReturn == 0) + return batchSize; + + if (batchSize == 0) + return nToReturn; + + return batchSize < nToReturn ? batchSize : nToReturn; +} + +Message DBClientCursor::_assembleInit() { + if (cursorId) { + return _assembleGetMore(); + } + + // If we haven't gotten a cursorId yet, we need to issue a new query or command. 
+ if (_isCommand) { + // HACK: + // Unfortunately, this code is used by the shell to run commands, + // so we need to allow the shell to send invalid options so that we can + // test that the server rejects them. Thus, to allow generating commands with + // invalid options, we validate them here, and fall back to generating an OP_QUERY + // through assembleQueryRequest if the options are invalid. + bool hasValidNToReturnForCommand = (nToReturn == 1 || nToReturn == -1); + bool hasValidFlagsForCommand = !(opts & mongo::QueryOption_Exhaust); + bool hasInvalidMaxTimeMs = query.hasField("$maxTimeMS"); + + if (hasValidNToReturnForCommand && hasValidFlagsForCommand && !hasInvalidMaxTimeMs) { + return assembleCommandRequest(_client, ns.db(), opts, query); + } + } else if (_useFindCommand) { + auto qr = QueryRequest::fromLegacyQuery(ns, + query, + fieldsToReturn ? *fieldsToReturn : BSONObj(), + nToSkip, + nextBatchSize(), + opts); + if (qr.isOK() && !qr.getValue()->isExplain() && !qr.getValue()->isExhaust()) { + BSONObj cmd = qr.getValue()->asFindCommand(); + if (auto readPref = query["$readPreference"]) { + // QueryRequest doesn't handle $readPreference. + cmd = BSONObjBuilder(std::move(cmd)).append(readPref).obj(); + } + return assembleCommandRequest(_client, ns.db(), opts, std::move(cmd)); + } + // else use legacy OP_QUERY request. + } + + _useFindCommand = false; // Make sure we handle the reply correctly. 
+ Message toSend; + assembleQueryRequest(ns.ns(), query, nextBatchSize(), nToSkip, fieldsToReturn, opts, toSend); + return toSend; +} + +Message DBClientCursor::_assembleGetMore() { + invariant(cursorId); + if (_useFindCommand) { + std::int64_t batchSize = nextBatchSize(); + auto gmr = GetMoreRequest(ns, + cursorId, + boost::make_optional(batchSize != 0, batchSize), + boost::none, // awaitDataTimeout + boost::none, // term + boost::none); // lastKnownCommittedOptime + return assembleCommandRequest(_client, ns.db(), opts, gmr.toBSON()); + } else { + // Assemble a legacy getMore request. + return makeGetMoreMessage(ns.ns(), cursorId, nextBatchSize(), opts); + } +} + +bool DBClientCursor::init() { + invariant(!_connectionHasPendingReplies); + Message toSend = _assembleInit(); + verify(_client); + Message reply; + if (!_client->call(toSend, reply, false, &_originalHost)) { + // log msg temp? + log() << "DBClientCursor::init call() failed" << endl; + return false; + } + if (reply.empty()) { + // log msg temp? 
+ log() << "DBClientCursor::init message from call() was empty" << endl; + return false; + } + dataReceived(reply); + return true; +} + +void DBClientCursor::initLazy(bool isRetry) { + massert(15875, + "DBClientCursor::initLazy called on a client that doesn't support lazy", + _client->lazySupported()); + Message toSend = _assembleInit(); + _client->say(toSend, isRetry, &_originalHost); + _lastRequestId = toSend.header().getId(); + _connectionHasPendingReplies = true; +} + +bool DBClientCursor::initLazyFinish(bool& retry) { + invariant(_connectionHasPendingReplies); + Message reply; + bool recvd = _client->recv(reply, _lastRequestId); + _connectionHasPendingReplies = false; + + // If we get a bad response, return false + if (!recvd || reply.empty()) { + if (!recvd) + log() << "DBClientCursor::init lazy say() failed" << endl; + if (reply.empty()) + log() << "DBClientCursor::init message from say() was empty" << endl; + + _client->checkResponse({}, true, &retry, &_lazyHost); + + return false; + } + + dataReceived(reply, retry, _lazyHost); + + return !retry; +} + +void DBClientCursor::requestMore() { + if (opts & QueryOption_Exhaust) { + return exhaustReceiveMore(); + } + + invariant(!_connectionHasPendingReplies); + verify(cursorId && batch.pos == batch.objs.size()); + + if (haveLimit) { + nToReturn -= batch.objs.size(); + verify(nToReturn > 0); + } + + auto doRequestMore = [&] { + Message toSend = _assembleGetMore(); + Message response; + _client->call(toSend, response); + dataReceived(response); + }; + if (_client) + return doRequestMore(); + + invariant(_scopedHost.size()); + DBClientBase::withConnection_do_not_use(_scopedHost, [&](DBClientBase* conn) { + ON_BLOCK_EXIT([&, origClient = _client ] { _client = origClient; }); + _client = conn; + doRequestMore(); + }); +} + +/** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). 
*/ +void DBClientCursor::exhaustReceiveMore() { + verify(cursorId && batch.pos == batch.objs.size()); + uassert(40675, "Cannot have limit for exhaust query", !haveLimit); + Message response; + verify(_client); + if (!_client->recv(response, _lastRequestId)) { + uasserted(16465, "recv failed while exhausting cursor"); + } + dataReceived(response); +} + +BSONObj DBClientCursor::commandDataReceived(const Message& reply) { + int op = reply.operation(); + invariant(op == opReply || op == dbCommandReply || op == dbMsg); + + auto commandReply = _client->parseCommandReplyMessage(_client->getServerAddress(), reply); + auto commandStatus = getStatusFromCommandResult(commandReply->getCommandReply()); + + if (commandStatus == ErrorCodes::StaleConfig) { + uassertStatusOK( + commandStatus.withContext("stale config in DBClientCursor::dataReceived()")); + } else if (!commandStatus.isOK()) { + wasError = true; + } + + auto opCtx = haveClient() ? cc().getOperationContext() : nullptr; + if (_client->getReplyMetadataReader()) { + uassertStatusOK(_client->getReplyMetadataReader()( + opCtx, commandReply->getMetadata(), _client->getServerAddress())); + } + + return commandReply->getCommandReply().getOwned(); +} + +void DBClientCursor::dataReceived(const Message& reply, bool& retry, string& host) { + batch.objs.clear(); + batch.pos = 0; + + // If this is a reply to our initial command request. + if (_isCommand && cursorId == 0) { + batch.objs.push_back(commandDataReceived(reply)); + return; + } + + if (_useFindCommand) { + cursorId = 0; // Don't try to kill cursor if we get back an error. + auto cr = uassertStatusOK(CursorResponse::parseFromBSON(commandDataReceived(reply))); + cursorId = cr.getCursorId(); + ns = cr.getNSS(); // Unlike OP_REPLY, find command can change the ns to use for getMores. 
+ batch.objs = cr.releaseBatch(); + return; + } + + QueryResult::View qr = reply.singleData().view2ptr(); + resultFlags = qr.getResultFlags(); + + if (qr.getResultFlags() & ResultFlag_ErrSet) { + wasError = true; + } + + if (qr.getResultFlags() & ResultFlag_CursorNotFound) { + // cursor id no longer valid at the server. + invariant(qr.getCursorId() == 0); + + if (!(opts & QueryOption_CursorTailable)) { + uasserted(ErrorCodes::CursorNotFound, + str::stream() << "cursor id " << cursorId << " didn't exist on server."); + } + + // 0 indicates no longer valid (dead) + cursorId = 0; + } + + if (cursorId == 0 || !(opts & QueryOption_CursorTailable)) { + // only set initially: we don't want to kill it on end of data + // if it's a tailable cursor + cursorId = qr.getCursorId(); + } + + if (opts & QueryOption_Exhaust) { + // With exhaust mode, each reply after the first claims to be a reply to the previous one + // rather than the initial request. + _connectionHasPendingReplies = (cursorId != 0); + _lastRequestId = reply.header().getId(); + } + + batch.objs.reserve(qr.getNReturned()); + + BufReader data(qr.data(), qr.dataLen()); + while (static_cast<int>(batch.objs.size()) < qr.getNReturned()) { + if (serverGlobalParams.objcheck) { + batch.objs.push_back(data.read<Validated<BSONObj>>()); + } else { + batch.objs.push_back(data.read<BSONObj>()); + } + batch.objs.back().shareOwnershipWith(reply.sharedBuffer()); + } + uassert(ErrorCodes::InvalidBSON, + "Got invalid reply from external server while reading from cursor", + data.atEof()); + + _client->checkResponse(batch.objs, false, &retry, &host); // watches for "not master" + + if (qr.getResultFlags() & ResultFlag_ShardConfigStale) { + BSONObj error; + verify(peekError(&error)); + uasserted(StaleConfigInfo::parseFromCommandError(error), "stale config on lazy receive"); + } + + /* this assert would fire the way we currently work: + verify( nReturned || cursorId == 0 ); + */ +} + +/** If true, safe to call next(). 
Requests more from server if necessary. */ +bool DBClientCursor::more() { + if (!_putBack.empty()) + return true; + + if (haveLimit && static_cast<int>(batch.pos) >= nToReturn) + return false; + + if (batch.pos < batch.objs.size()) + return true; + + if (cursorId == 0) + return false; + + requestMore(); + return batch.pos < batch.objs.size(); +} + +BSONObj DBClientCursor::next() { + if (!_putBack.empty()) { + BSONObj ret = _putBack.top(); + _putBack.pop(); + return ret; + } + + uassert( + 13422, "DBClientCursor next() called but more() is false", batch.pos < batch.objs.size()); + + /* todo would be good to make data null at end of batch for safety */ + return std::move(batch.objs[batch.pos++]); +} + +BSONObj DBClientCursor::nextSafe() { + BSONObj o = next(); + + // Only convert legacy errors ($err) to exceptions. Otherwise, just return the response and the + // caller will interpret it as a command error. + if (wasError && strcmp(o.firstElementFieldName(), "$err") == 0) { + uassertStatusOK(getStatusFromCommandResult(o)); + } + + return o; +} + +void DBClientCursor::peek(vector<BSONObj>& v, int atMost) { + auto end = atMost >= static_cast<int>(batch.objs.size() - batch.pos) + ? batch.objs.end() + : batch.objs.begin() + batch.pos + atMost; + v.insert(v.end(), batch.objs.begin() + batch.pos, end); +} + +BSONObj DBClientCursor::peekFirst() { + vector<BSONObj> v; + peek(v, 1); + + if (v.size() > 0) + return v[0]; + else + return BSONObj(); +} + +bool DBClientCursor::peekError(BSONObj* error) { + if (!wasError) + return false; + + vector<BSONObj> v; + peek(v, 1); + + verify(v.size() == 1); + // We check both the legacy error format, and the new error format. hasErrField checks for + // $err, and getStatusFromCommandResult checks for modern errors of the form '{ok: 0.0, code: + // <...>, errmsg: ...}'. 
+ verify(hasErrField(v[0]) || !getStatusFromCommandResult(v[0]).isOK()); + + if (error) + *error = v[0].getOwned(); + return true; +} + +void DBClientCursor::attach(AScopedConnection* conn) { + verify(_scopedHost.size() == 0); + verify(conn); + verify(conn->get()); + + if (conn->get()->type() == ConnectionString::SET) { + if (_lazyHost.size() > 0) + _scopedHost = _lazyHost; + else if (_client) + _scopedHost = _client->getServerAddress(); + else + massert(14821, + "No client or lazy client specified, cannot store multi-host connection.", + false); + } else { + _scopedHost = conn->getHost(); + } + + conn->done(); + _client = 0; + _lazyHost = ""; +} + +DBClientCursor::DBClientCursor(DBClientBase* client, + const std::string& ns, + const BSONObj& query, + int nToReturn, + int nToSkip, + const BSONObj* fieldsToReturn, + int queryOptions, + int batchSize) + : DBClientCursor(client, + ns, + query, + 0, // cursorId + nToReturn, + nToSkip, + fieldsToReturn, + queryOptions, + batchSize, + {}) {} + +DBClientCursor::DBClientCursor(DBClientBase* client, + const std::string& ns, + long long cursorId, + int nToReturn, + int queryOptions, + std::vector<BSONObj> initialBatch) + : DBClientCursor(client, + ns, + BSONObj(), // query + cursorId, + nToReturn, + 0, // nToSkip + nullptr, // fieldsToReturn + queryOptions, + 0, + std::move(initialBatch)) {} // batchSize + +DBClientCursor::DBClientCursor(DBClientBase* client, + const std::string& ns, + const BSONObj& query, + long long cursorId, + int nToReturn, + int nToSkip, + const BSONObj* fieldsToReturn, + int queryOptions, + int batchSize, + std::vector<BSONObj> initialBatch) + : batch{std::move(initialBatch)}, + _client(client), + _originalHost(_client->getServerAddress()), + ns(ns), + _isCommand(nsIsFull(ns) ? 
nsToCollectionSubstring(ns) == "$cmd" : false), + query(query), + nToReturn(nToReturn), + haveLimit(nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)), + nToSkip(nToSkip), + fieldsToReturn(fieldsToReturn), + opts(queryOptions & ~QueryOptionLocal_forceOpQuery), + batchSize(batchSize == 1 ? 2 : batchSize), + resultFlags(0), + cursorId(cursorId), + _ownCursor(true), + wasError(false), + _enabledBSONVersion(Validator<BSONObj>::enabledBSONVersion()) { + if (queryOptions & QueryOptionLocal_forceOpQuery) + _useFindCommand = false; +} + +DBClientCursor::~DBClientCursor() { + kill(); +} + +void DBClientCursor::kill() { + DESTRUCTOR_GUARD({ + if (cursorId && _ownCursor && !globalInShutdownDeprecated()) { + auto killCursor = [&](auto&& conn) { + if (_useFindCommand) { + conn->killCursor(ns, cursorId); + } else { + auto toSend = makeKillCursorsMessage(cursorId); + conn->say(toSend); + } + }; + + if (_client && !_connectionHasPendingReplies) { + killCursor(_client); + } else { + // Use a side connection to send the kill cursor request. + verify(_scopedHost.size() || (_client && _connectionHasPendingReplies)); + DBClientBase::withConnection_do_not_use( + _client ? _client->getServerAddress() : _scopedHost, killCursor); + } + } + }); + + // Mark this cursor as dead since we can't do any getMores. + cursorId = 0; +} + + +} // namespace mongo |