/** * Copyright (C) 2017 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kQuery #include "mongo/platform/basic.h" #include "mongo/s/query/establish_cursors.h" #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/client/remote_command_targeter.h" #include "mongo/db/query/cursor_response.h" #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/killcursors_request.h" #include "mongo/executor/remote_command_request.h" #include "mongo/executor/remote_command_response.h" #include "mongo/s/async_requests_sender.h" #include "mongo/s/grid.h" #include "mongo/util/assert_util.h" #include "mongo/util/log.h" #include "mongo/util/scopeguard.h" namespace mongo { std::vector establishCursors(OperationContext* opCtx, executor::TaskExecutor* executor, const NamespaceString& nss, const ReadPreferenceSetting readPref, const std::vector>& remotes, bool allowPartialResults) { // Construct the requests std::vector requests; for (const auto& remote : remotes) { requests.emplace_back(remote.first, remote.second); } // Send the requests AsyncRequestsSender ars(opCtx, executor, nss.db().toString(), std::move(requests), readPref, Shard::RetryPolicy::kIdempotent); std::vector remoteCursors; try { // Get the responses while (!ars.done()) { try { auto response = ars.next(); // Note the shardHostAndPort may not be populated if there was an error, so be sure // to do this after parsing the cursor response to ensure the response was ok. // Additionally, be careful not to push into 'remoteCursors' until we are sure we // have a valid cursor, since the error handling path will attempt to clean up // anything in 'remoteCursors' RemoteCursor cursor; cursor.setCursorResponse(CursorResponse::parseFromBSONThrowing( uassertStatusOK(std::move(response.swResponse)).data)); cursor.setShardId(std::move(response.shardId)); cursor.setHostAndPort(*response.shardHostAndPort); remoteCursors.push_back(std::move(cursor)); } catch (const DBException& ex) { // Retriable errors are swallowed if 'allowPartialResults' is true. if (allowPartialResults && std::find(RemoteCommandRetryScheduler::kAllRetriableErrors.begin(), RemoteCommandRetryScheduler::kAllRetriableErrors.end(), ex.code()) != RemoteCommandRetryScheduler::kAllRetriableErrors.end()) { continue; } throw; // Fail this loop. } } return remoteCursors; } catch (const DBException&) { // If one of the remotes had an error, we make a best effort to finish retrieving responses // for other requests that were already sent, so that we can send killCursors to any cursors // that we know were established. try { // Do not schedule any new requests. ars.stopRetrying(); // Collect responses from all requests that were already sent. while (!ars.done()) { auto response = ars.next(); // Check if the response contains an established cursor, and if so, store it. StatusWith swCursorResponse( response.swResponse.isOK() ? CursorResponse::parseFromBSON(response.swResponse.getValue().data) : response.swResponse.getStatus()); if (swCursorResponse.isOK()) { RemoteCursor cursor; cursor.setShardId(std::move(response.shardId)); cursor.setHostAndPort(*response.shardHostAndPort); cursor.setCursorResponse(std::move(swCursorResponse.getValue())); remoteCursors.push_back(std::move(cursor)); } } // Schedule killCursors against all cursors that were established. for (const auto& remoteCursor : remoteCursors) { BSONObj cmdObj = KillCursorsRequest(nss, {remoteCursor.getCursorResponse().getCursorId()}) .toBSON(); executor::RemoteCommandRequest request( remoteCursor.getHostAndPort(), nss.db().toString(), cmdObj, opCtx); // We do not process the response to the killCursors request (we make a good-faith // attempt at cleaning up the cursors, but ignore any returned errors). executor ->scheduleRemoteCommand( request, [](const executor::TaskExecutor::RemoteCommandCallbackArgs& cbData) {}) .status_with_transitional_ignore(); } } catch (const DBException&) { // Ignore the new error and rethrow the original one. } throw; } } } // namespace mongo