/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/pipeline/mongos_process_interface.h"
#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/curop.h"
#include "mongo/db/pipeline/document_source.h"
#include "mongo/db/query/collation/collation_spec.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/s/scoped_collection_metadata.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/commands/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/query/establish_cursors.h"
#include "mongo/s/query/router_exec_stage.h"
namespace mongo {
using boost::intrusive_ptr;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
namespace {
/**
* Determines the single shard to which the given query will be targeted, and its associated
* shardVersion. Throws if the query targets more than one shard.
*/
std::pair getSingleTargetedShardForQuery(
OperationContext* opCtx, const CachedCollectionRoutingInfo& routingInfo, BSONObj query) {
if (auto chunkMgr = routingInfo.cm()) {
std::set shardIds;
chunkMgr->getShardIdsForQuery(opCtx, query, CollationSpec::kSimpleSpec, &shardIds);
uassert(ErrorCodes::InternalError,
str::stream() << "Unable to target lookup query to a single shard: "
<< query.toString(),
shardIds.size() == 1u);
return {*shardIds.begin(), chunkMgr->getVersion(*shardIds.begin())};
}
return {routingInfo.db().primaryId(), ChunkVersion::UNSHARDED()};
}
/**
* Returns the routing information for the namespace set on the passed ExpressionContext. Also
* verifies that the ExpressionContext's UUID, if present, matches that of the routing table entry.
*/
StatusWith getCollectionRoutingInfo(
const intrusive_ptr& expCtx) {
auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
auto swRoutingInfo = catalogCache->getCollectionRoutingInfo(expCtx->opCtx, expCtx->ns);
// Additionally check that the ExpressionContext's UUID matches the collection routing info.
if (swRoutingInfo.isOK() && expCtx->uuid && swRoutingInfo.getValue().cm()) {
if (!swRoutingInfo.getValue().cm()->uuidMatches(*expCtx->uuid)) {
return {ErrorCodes::NamespaceNotFound,
str::stream() << "The UUID of collection " << expCtx->ns.ns()
<< " changed; it may have been dropped and re-created."};
}
}
return swRoutingInfo;
}
} // namespace
boost::optional MongoSInterface::lookupSingleDocument(
const boost::intrusive_ptr& expCtx,
const NamespaceString& nss,
UUID collectionUUID,
const Document& filter,
boost::optional readConcern) {
auto foreignExpCtx = expCtx->copyWith(nss, collectionUUID);
// Create the find command to be dispatched to the shard in order to return the post-change
// document.
auto filterObj = filter.toBson();
BSONObjBuilder cmdBuilder;
bool findCmdIsByUuid(foreignExpCtx->uuid);
if (findCmdIsByUuid) {
foreignExpCtx->uuid->appendToBuilder(&cmdBuilder, "find");
} else {
cmdBuilder.append("find", nss.coll());
}
cmdBuilder.append("filter", filterObj);
cmdBuilder.append("comment", expCtx->comment);
if (readConcern) {
cmdBuilder.append(repl::ReadConcernArgs::kReadConcernFieldName, *readConcern);
}
auto shardResult = std::vector();
auto findCmd = cmdBuilder.obj();
size_t numAttempts = 0;
while (++numAttempts <= kMaxNumStaleVersionRetries) {
// Verify that the collection exists, with the correct UUID.
auto catalogCache = Grid::get(expCtx->opCtx)->catalogCache();
auto swRoutingInfo = getCollectionRoutingInfo(foreignExpCtx);
if (swRoutingInfo == ErrorCodes::NamespaceNotFound) {
return boost::none;
}
auto routingInfo = uassertStatusOK(std::move(swRoutingInfo));
if (findCmdIsByUuid && routingInfo.cm()) {
// Find by UUID and shard versioning do not work together (SERVER-31946). In the
// sharded case we've already checked the UUID, so find by namespace is safe. In the
// unlikely case that the collection has been deleted and a new collection with the same
// name created through a different mongos, the shard version will be detected as stale,
// as shard versions contain an 'epoch' field unique to the collection.
findCmd = findCmd.addField(BSON("find" << nss.coll()).firstElement());
findCmdIsByUuid = false;
}
// Get the ID and version of the single shard to which this query will be sent.
auto shardInfo = getSingleTargetedShardForQuery(expCtx->opCtx, routingInfo, filterObj);
// Dispatch the request. This will only be sent to a single shard and only a single result
// will be returned. The 'establishCursors' method conveniently prepares the result into a
// cursor response for us.
try {
shardResult = establishCursors(
expCtx->opCtx,
Grid::get(expCtx->opCtx)->getExecutorPool()->getArbitraryExecutor(),
nss,
ReadPreferenceSetting::get(expCtx->opCtx),
{{shardInfo.first, appendShardVersion(findCmd, shardInfo.second)}},
false);
break;
} catch (const ExceptionFor&) {
// If it's an unsharded collection which has been deleted and re-created, we may get a
// NamespaceNotFound error when looking up by UUID.
return boost::none;
} catch (const ExceptionForCat&) {
// If we hit a stale shardVersion exception, invalidate the routing table cache.
catalogCache->onStaleShardVersion(std::move(routingInfo));
continue; // Try again if allowed.
}
break; // Success!
}
invariant(shardResult.size() == 1u);
auto& cursor = shardResult.front().getCursorResponse();
auto& batch = cursor.getBatch();
// We should have at most 1 result, and the cursor should be exhausted.
uassert(ErrorCodes::InternalError,
str::stream() << "Shard cursor was unexpectedly open after lookup: "
<< shardResult.front().getHostAndPort()
<< ", id: "
<< cursor.getCursorId(),
cursor.getCursorId() == 0);
uassert(ErrorCodes::TooManyMatchingDocuments,
str::stream() << "found more than one document matching " << filter.toString() << " ["
<< batch.begin()->toString()
<< ", "
<< std::next(batch.begin())->toString()
<< "]",
batch.size() <= 1u);
return (!batch.empty() ? Document(batch.front()) : boost::optional{});
}
std::pair, bool> MongoSInterface::collectDocumentKeyFields(
OperationContext* opCtx, NamespaceStringOrUUID nssOrUUID) const {
invariant(!nssOrUUID.uuid(), "Did not expect to use this method with a UUID on mongos");
const NamespaceString& nss = *nssOrUUID.nss();
auto collRoutInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
if (collRoutInfo == ErrorCodes::NamespaceNotFound) {
return {{"_id"}, false};
}
uassertStatusOKWithContext(collRoutInfo, "Collection Routing Info is unavailable");
auto cm = collRoutInfo.getValue().cm();
if (!cm)
return {{"_id"}, false};
// Unpack the shard key.
std::vector result;
bool gotId = false;
for (auto& field : cm->getShardKeyPattern().getKeyPatternFields()) {
result.emplace_back(field->dottedField());
gotId |= (result.back().fullPath() == "_id");
}
if (!gotId) { // If not part of the shard key, "_id" comes last.
result.emplace_back("_id");
}
// Collection is sharded so the document key fields will never change, mark as final.
return {result, true};
}
BSONObj MongoSInterface::_reportCurrentOpForClient(OperationContext* opCtx,
Client* client,
CurrentOpTruncateMode truncateOps) const {
BSONObjBuilder builder;
CurOp::reportCurrentOpForClient(
opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder);
return builder.obj();
}
std::vector MongoSInterface::getCursors(
const intrusive_ptr& expCtx) const {
invariant(hasGlobalServiceContext());
auto cursorManager = Grid::get(expCtx->opCtx->getServiceContext())->getCursorManager();
invariant(cursorManager);
return cursorManager->getAllCursors();
}
bool MongoSInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
auto routingInfo = Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, nss);
return routingInfo.isOK() && routingInfo.getValue().cm();
}
} // namespace mongo