diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-07-26 16:00:20 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-08-02 12:23:03 -0400 |
commit | afbe688f0f18c5cb474fb1bcd933d6e06c0c5291 (patch) | |
tree | a7c1a3192f335c5e2947e395159eaa03782c3a50 /src/mongo/db/pipeline/pipeline_d.cpp | |
parent | afaf46687eb3930ddbfc8b528bd68295b6a09676 (diff) | |
download | mongo-afbe688f0f18c5cb474fb1bcd933d6e06c0c5291.tar.gz |
SERVER-35896: Support 'replaceDocuments' mode in $out
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r-- | src/mongo/db/pipeline/pipeline_d.cpp | 335 |
1 files changed, 0 insertions, 335 deletions
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp index f49b3dfdd8b..f3317c81b3b 100644 --- a/src/mongo/db/pipeline/pipeline_d.cpp +++ b/src/mongo/db/pipeline/pipeline_d.cpp @@ -33,24 +33,18 @@ #include "mongo/db/pipeline/pipeline_d.h" #include "mongo/bson/simple_bsonobj_comparator.h" -#include "mongo/db/auth/authorization_session.h" #include "mongo/db/catalog/collection.h" #include "mongo/db/catalog/database.h" #include "mongo/db/catalog/database_holder.h" -#include "mongo/db/catalog/document_validation.h" #include "mongo/db/catalog/index_catalog.h" -#include "mongo/db/catalog/uuid_catalog.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/curop.h" #include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" #include "mongo/db/exec/fetch.h" #include "mongo/db/exec/multi_iterator.h" #include "mongo/db/exec/shard_filter.h" #include "mongo/db/exec/working_set.h" #include "mongo/db/index/index_access_method.h" -#include "mongo/db/kill_sessions.h" #include "mongo/db/matcher/extensions_callback_real.h" #include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/document_source.h" @@ -73,9 +67,6 @@ #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/service_context.h" -#include "mongo/db/session_catalog.h" -#include "mongo/db/stats/fill_locker_info.h" -#include "mongo/db/stats/storage_stats.h" #include "mongo/db/stats/top.h" #include "mongo/db/storage/record_store.h" #include "mongo/db/storage/sorted_data_interface.h" @@ -671,330 +662,4 @@ void PipelineD::getPlanSummaryStats(const Pipeline* pipeline, PlanSummaryStats* statsOut->usedDisk = usedDisk; } -PipelineD::MongoDInterface::MongoDInterface(OperationContext* opCtx) : _client(opCtx) {} - -void PipelineD::MongoDInterface::setOperationContext(OperationContext* opCtx) { - _client.setOpCtx(opCtx); -} - -DBClientBase* PipelineD::MongoDInterface::directClient() { - return &_client; -} - -bool PipelineD::MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) { - AutoGetCollectionForReadCommand autoColl(opCtx, nss); - auto const css = CollectionShardingState::get(opCtx, nss); - return css->getMetadata(opCtx)->isSharded(); -} - -BSONObj PipelineD::MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& ns, - const std::vector<BSONObj>& objs) { - boost::optional<DisableDocumentValidation> maybeDisableValidation; - if (expCtx->bypassDocumentValidation) - maybeDisableValidation.emplace(expCtx->opCtx); - - _client.insert(ns.ns(), objs); - return _client.getLastErrorDetailed(); -} - -CollectionIndexUsageMap PipelineD::MongoDInterface::getIndexStats(OperationContext* opCtx, - const NamespaceString& ns) { - AutoGetCollectionForReadCommand autoColl(opCtx, ns); - - Collection* collection = autoColl.getCollection(); - if (!collection) { - LOG(2) << "Collection not found on index stats retrieval: " << ns.ns(); - return CollectionIndexUsageMap(); - } - - return collection->infoCache()->getIndexUsageStats(); -} - -void PipelineD::MongoDInterface::appendLatencyStats(OperationContext* opCtx, - const NamespaceString& nss, - bool includeHistograms, - BSONObjBuilder* builder) const { - Top::get(opCtx->getServiceContext()).appendLatencyStats(nss.ns(), includeHistograms, builder); -} - -Status PipelineD::MongoDInterface::appendStorageStats(OperationContext* opCtx, - const NamespaceString& nss, - const BSONObj& param, - BSONObjBuilder* builder) const { - return appendCollectionStorageStats(opCtx, nss, param, builder); -} - -Status PipelineD::MongoDInterface::appendRecordCount(OperationContext* opCtx, - const NamespaceString& nss, - BSONObjBuilder* builder) const { - return appendCollectionRecordCount(opCtx, nss, builder); -} - -BSONObj PipelineD::MongoDInterface::getCollectionOptions(const NamespaceString& nss) { - const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll())); - return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned(); -} - -void PipelineD::MongoDInterface::renameIfOptionsAndIndexesHaveNotChanged( - OperationContext* opCtx, - const BSONObj& renameCommandObj, - const NamespaceString& targetNs, - const BSONObj& originalCollectionOptions, - const std::list<BSONObj>& originalIndexes) { - Lock::GlobalWrite globalLock(opCtx); - - uassert(ErrorCodes::CommandFailed, - str::stream() << "collection options of target collection " << targetNs.ns() - << " changed during processing. Original options: " - << originalCollectionOptions - << ", new options: " - << getCollectionOptions(targetNs), - SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions == - getCollectionOptions(targetNs))); - - auto currentIndexes = _client.getIndexSpecs(targetNs.ns()); - uassert(ErrorCodes::CommandFailed, - str::stream() << "indexes of target collection " << targetNs.ns() - << " changed during processing.", - originalIndexes.size() == currentIndexes.size() && - std::equal(originalIndexes.begin(), - originalIndexes.end(), - currentIndexes.begin(), - SimpleBSONObjComparator::kInstance.makeEqualTo())); - - BSONObj info; - uassert(ErrorCodes::CommandFailed, - str::stream() << "renameCollection failed: " << info, - _client.runCommand("admin", renameCommandObj, info)); -} - -StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineD::MongoDInterface::makePipeline( - const std::vector<BSONObj>& rawPipeline, - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const MakePipelineOptions opts) { - auto pipeline = Pipeline::parse(rawPipeline, expCtx); - if (!pipeline.isOK()) { - return pipeline.getStatus(); - } - - if (opts.optimize) { - pipeline.getValue()->optimizePipeline(); - } - - Status cursorStatus = Status::OK(); - - if (opts.attachCursorSource) { - cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get()); - } - - return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus; -} - -Status PipelineD::MongoDInterface::attachCursorSourceToPipeline( - const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) { - invariant(pipeline->getSources().empty() || - !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get())); - - boost::optional<AutoGetCollectionForReadCommand> autoColl; - if (expCtx->uuid) { - try { - autoColl.emplace(expCtx->opCtx, - NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid}); - } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) { - // The UUID doesn't exist anymore - return ex.toStatus(); - } - } else { - autoColl.emplace(expCtx->opCtx, expCtx->ns); - } - - // makePipeline() is only called to perform secondary aggregation requests and expects the - // collection representing the document source to be not-sharded. We confirm sharding state - // here to avoid taking a collection lock elsewhere for this purpose alone. - // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor - // until after we release the lock, leaving room for a collection to be sharded in-between. - auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns); - uassert(4567, - str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded", - !css->getMetadata(expCtx->opCtx)->isSharded()); - - PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline); - - return Status::OK(); -} - -std::string PipelineD::MongoDInterface::getShardName(OperationContext* opCtx) const { - if (ShardingState::get(opCtx)->enabled()) { - return ShardingState::get(opCtx)->shardId().toString(); - } - - return std::string(); -} - -std::pair<std::vector<FieldPath>, bool> PipelineD::MongoDInterface::collectDocumentKeyFields( - OperationContext* opCtx, UUID uuid) const { - if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) { - return {{"_id"}, false}; // Nothing is sharded. - } - - // An empty namespace indicates that the collection has been dropped. Treat it as unsharded and - // mark the fields as final. - auto nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(uuid); - if (nss.isEmpty()) { - return {{"_id"}, true}; - } - - // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache - // to determine whether the collection is sharded in the first place. - auto catalogCache = Grid::get(opCtx)->catalogCache(); - - const bool collectionIsSharded = catalogCache && [&]() { - auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss); - return routingInfo.isOK() && routingInfo.getValue().cm(); - }(); - - // Collection exists and is not sharded, mark as not final. - if (!collectionIsSharded) { - return {{"_id"}, false}; - } - - auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata { - AutoGetCollection autoColl(opCtx, nss, MODE_IS); - return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx); - }(); - - // Collection is not sharded or UUID mismatch implies collection has been dropped and recreated - // as sharded. - if (!scm->isSharded() || !scm->uuidMatches(uuid)) { - return {{"_id"}, false}; - } - - // Unpack the shard key. - std::vector<FieldPath> result; - bool gotId = false; - for (auto& field : scm->getKeyPatternFields()) { - result.emplace_back(field->dottedField()); - gotId |= (result.back().fullPath() == "_id"); - } - if (!gotId) { // If not part of the shard key, "_id" comes last. - result.emplace_back("_id"); - } - // Collection is now sharded so the document key fields will never change, mark as final. - return {result, true}; -} - -std::vector<GenericCursor> PipelineD::MongoDInterface::getCursors( - const intrusive_ptr<ExpressionContext>& expCtx) const { - return CursorManager::getAllCursors(expCtx->opCtx); -} - -boost::optional<Document> PipelineD::MongoDInterface::lookupSingleDocument( - const boost::intrusive_ptr<ExpressionContext>& expCtx, - const NamespaceString& nss, - UUID collectionUUID, - const Document& documentKey, - boost::optional<BSONObj> readConcern) { - invariant(!readConcern); // We don't currently support a read concern on mongod - it's only - // expected to be necessary on mongos. - - std::unique_ptr<Pipeline, PipelineDeleter> pipeline; - try { - // Be sure to do the lookup using the collection default collation - auto foreignExpCtx = expCtx->copyWith( - nss, - collectionUUID, - _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID)); - pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx)); - } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { - return boost::none; - } - - auto lookedUpDocument = pipeline->getNext(); - if (auto next = pipeline->getNext()) { - uasserted(ErrorCodes::TooManyMatchingDocuments, - str::stream() << "found more than one document with document key " - << documentKey.toString() - << " [" - << lookedUpDocument->toString() - << ", " - << next->toString() - << "]"); - } - return lookedUpDocument; -} - -BSONObj PipelineD::MongoDInterface::_reportCurrentOpForClient( - OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const { - BSONObjBuilder builder; - - CurOp::reportCurrentOpForClient( - opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder); - - OperationContext* clientOpCtx = client->getOperationContext(); - - if (clientOpCtx) { - if (auto opCtxSession = OperationContextSession::get(clientOpCtx)) { - opCtxSession->reportUnstashedState(repl::ReadConcernArgs::get(clientOpCtx), &builder); - } - - // Append lock stats before returning. - if (auto lockerInfo = clientOpCtx->lockState()->getLockerInfo()) { - fillLockerInfo(*lockerInfo, builder); - } - } - - return builder.obj(); -} - -void PipelineD::MongoDInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx, - CurrentOpUserMode userMode, - std::vector<BSONObj>* ops) const { - auto sessionCatalog = SessionCatalog::get(opCtx); - - const bool authEnabled = - AuthorizationSession::get(opCtx->getClient())->getAuthorizationManager().isAuthEnabled(); - - // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers to - // create a pattern that will match against all authenticated usernames for the current client. - // If the user is listing ops for all users, we create an empty pattern; constructing an - // instance of SessionKiller::Matcher with this empty pattern will return all sessions. - auto sessionFilter = (authEnabled && userMode == CurrentOpUserMode::kExcludeOthers - ? makeSessionFilterForAuthenticatedUsers(opCtx) - : KillAllSessionsByPatternSet{{}}); - - sessionCatalog->scanSessions(opCtx, - {std::move(sessionFilter)}, - [&](OperationContext* opCtx, Session* session) { - auto op = session->reportStashedState(); - if (!op.isEmpty()) { - ops->emplace_back(op); - } - }); -} - -std::unique_ptr<CollatorInterface> PipelineD::MongoDInterface::_getCollectionDefaultCollator( - OperationContext* opCtx, StringData dbName, UUID collectionUUID) { - auto it = _collatorCache.find(collectionUUID); - if (it == _collatorCache.end()) { - auto collator = [&]() -> std::unique_ptr<CollatorInterface> { - AutoGetCollection autoColl(opCtx, {dbName.toString(), collectionUUID}, MODE_IS); - if (!autoColl.getCollection()) { - // This collection doesn't exist, so assume a nullptr default collation - return nullptr; - } else { - auto defaultCollator = autoColl.getCollection()->getDefaultCollator(); - // Clone the collator so that we can safely use the pointer if the collection - // disappears right after we release the lock. - return defaultCollator ? defaultCollator->clone() : nullptr; - } - }(); - - it = _collatorCache.emplace(collectionUUID, std::move(collator)).first; - } - - auto& collator = it->second; - return collator ? collator->clone() : nullptr; -} - } // namespace mongo |