summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/pipeline_d.cpp
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-07-26 16:00:20 -0400
committerNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-08-02 12:23:03 -0400
commitafbe688f0f18c5cb474fb1bcd933d6e06c0c5291 (patch)
treea7c1a3192f335c5e2947e395159eaa03782c3a50 /src/mongo/db/pipeline/pipeline_d.cpp
parentafaf46687eb3930ddbfc8b528bd68295b6a09676 (diff)
downloadmongo-afbe688f0f18c5cb474fb1bcd933d6e06c0c5291.tar.gz
SERVER-35896: Support 'replaceDocuments' mode in $out
Diffstat (limited to 'src/mongo/db/pipeline/pipeline_d.cpp')
-rw-r--r--src/mongo/db/pipeline/pipeline_d.cpp335
1 files changed, 0 insertions, 335 deletions
diff --git a/src/mongo/db/pipeline/pipeline_d.cpp b/src/mongo/db/pipeline/pipeline_d.cpp
index f49b3dfdd8b..f3317c81b3b 100644
--- a/src/mongo/db/pipeline/pipeline_d.cpp
+++ b/src/mongo/db/pipeline/pipeline_d.cpp
@@ -33,24 +33,18 @@
#include "mongo/db/pipeline/pipeline_d.h"
#include "mongo/bson/simple_bsonobj_comparator.h"
-#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/catalog/collection.h"
#include "mongo/db/catalog/database.h"
#include "mongo/db/catalog/database_holder.h"
-#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/catalog/index_catalog.h"
-#include "mongo/db/catalog/uuid_catalog.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/multi_iterator.h"
#include "mongo/db/exec/shard_filter.h"
#include "mongo/db/exec/working_set.h"
#include "mongo/db/index/index_access_method.h"
-#include "mongo/db/kill_sessions.h"
#include "mongo/db/matcher/extensions_callback_real.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/document_source.h"
@@ -73,9 +67,6 @@
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/service_context.h"
-#include "mongo/db/session_catalog.h"
-#include "mongo/db/stats/fill_locker_info.h"
-#include "mongo/db/stats/storage_stats.h"
#include "mongo/db/stats/top.h"
#include "mongo/db/storage/record_store.h"
#include "mongo/db/storage/sorted_data_interface.h"
@@ -671,330 +662,4 @@ void PipelineD::getPlanSummaryStats(const Pipeline* pipeline, PlanSummaryStats*
statsOut->usedDisk = usedDisk;
}
-PipelineD::MongoDInterface::MongoDInterface(OperationContext* opCtx) : _client(opCtx) {}
-
-void PipelineD::MongoDInterface::setOperationContext(OperationContext* opCtx) {
- _client.setOpCtx(opCtx);
-}
-
-DBClientBase* PipelineD::MongoDInterface::directClient() {
- return &_client;
-}
-
-bool PipelineD::MongoDInterface::isSharded(OperationContext* opCtx, const NamespaceString& nss) {
- AutoGetCollectionForReadCommand autoColl(opCtx, nss);
- auto const css = CollectionShardingState::get(opCtx, nss);
- return css->getMetadata(opCtx)->isSharded();
-}
-
-BSONObj PipelineD::MongoDInterface::insert(const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& ns,
- const std::vector<BSONObj>& objs) {
- boost::optional<DisableDocumentValidation> maybeDisableValidation;
- if (expCtx->bypassDocumentValidation)
- maybeDisableValidation.emplace(expCtx->opCtx);
-
- _client.insert(ns.ns(), objs);
- return _client.getLastErrorDetailed();
-}
-
-CollectionIndexUsageMap PipelineD::MongoDInterface::getIndexStats(OperationContext* opCtx,
- const NamespaceString& ns) {
- AutoGetCollectionForReadCommand autoColl(opCtx, ns);
-
- Collection* collection = autoColl.getCollection();
- if (!collection) {
- LOG(2) << "Collection not found on index stats retrieval: " << ns.ns();
- return CollectionIndexUsageMap();
- }
-
- return collection->infoCache()->getIndexUsageStats();
-}
-
-void PipelineD::MongoDInterface::appendLatencyStats(OperationContext* opCtx,
- const NamespaceString& nss,
- bool includeHistograms,
- BSONObjBuilder* builder) const {
- Top::get(opCtx->getServiceContext()).appendLatencyStats(nss.ns(), includeHistograms, builder);
-}
-
-Status PipelineD::MongoDInterface::appendStorageStats(OperationContext* opCtx,
- const NamespaceString& nss,
- const BSONObj& param,
- BSONObjBuilder* builder) const {
- return appendCollectionStorageStats(opCtx, nss, param, builder);
-}
-
-Status PipelineD::MongoDInterface::appendRecordCount(OperationContext* opCtx,
- const NamespaceString& nss,
- BSONObjBuilder* builder) const {
- return appendCollectionRecordCount(opCtx, nss, builder);
-}
-
-BSONObj PipelineD::MongoDInterface::getCollectionOptions(const NamespaceString& nss) {
- const auto infos = _client.getCollectionInfos(nss.db().toString(), BSON("name" << nss.coll()));
- return infos.empty() ? BSONObj() : infos.front().getObjectField("options").getOwned();
-}
-
-void PipelineD::MongoDInterface::renameIfOptionsAndIndexesHaveNotChanged(
- OperationContext* opCtx,
- const BSONObj& renameCommandObj,
- const NamespaceString& targetNs,
- const BSONObj& originalCollectionOptions,
- const std::list<BSONObj>& originalIndexes) {
- Lock::GlobalWrite globalLock(opCtx);
-
- uassert(ErrorCodes::CommandFailed,
- str::stream() << "collection options of target collection " << targetNs.ns()
- << " changed during processing. Original options: "
- << originalCollectionOptions
- << ", new options: "
- << getCollectionOptions(targetNs),
- SimpleBSONObjComparator::kInstance.evaluate(originalCollectionOptions ==
- getCollectionOptions(targetNs)));
-
- auto currentIndexes = _client.getIndexSpecs(targetNs.ns());
- uassert(ErrorCodes::CommandFailed,
- str::stream() << "indexes of target collection " << targetNs.ns()
- << " changed during processing.",
- originalIndexes.size() == currentIndexes.size() &&
- std::equal(originalIndexes.begin(),
- originalIndexes.end(),
- currentIndexes.begin(),
- SimpleBSONObjComparator::kInstance.makeEqualTo()));
-
- BSONObj info;
- uassert(ErrorCodes::CommandFailed,
- str::stream() << "renameCollection failed: " << info,
- _client.runCommand("admin", renameCommandObj, info));
-}
-
-StatusWith<std::unique_ptr<Pipeline, PipelineDeleter>> PipelineD::MongoDInterface::makePipeline(
- const std::vector<BSONObj>& rawPipeline,
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const MakePipelineOptions opts) {
- auto pipeline = Pipeline::parse(rawPipeline, expCtx);
- if (!pipeline.isOK()) {
- return pipeline.getStatus();
- }
-
- if (opts.optimize) {
- pipeline.getValue()->optimizePipeline();
- }
-
- Status cursorStatus = Status::OK();
-
- if (opts.attachCursorSource) {
- cursorStatus = attachCursorSourceToPipeline(expCtx, pipeline.getValue().get());
- }
-
- return cursorStatus.isOK() ? std::move(pipeline) : cursorStatus;
-}
-
-Status PipelineD::MongoDInterface::attachCursorSourceToPipeline(
- const boost::intrusive_ptr<ExpressionContext>& expCtx, Pipeline* pipeline) {
- invariant(pipeline->getSources().empty() ||
- !dynamic_cast<DocumentSourceCursor*>(pipeline->getSources().front().get()));
-
- boost::optional<AutoGetCollectionForReadCommand> autoColl;
- if (expCtx->uuid) {
- try {
- autoColl.emplace(expCtx->opCtx,
- NamespaceStringOrUUID{expCtx->ns.db().toString(), *expCtx->uuid});
- } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>& ex) {
- // The UUID doesn't exist anymore
- return ex.toStatus();
- }
- } else {
- autoColl.emplace(expCtx->opCtx, expCtx->ns);
- }
-
- // makePipeline() is only called to perform secondary aggregation requests and expects the
- // collection representing the document source to be not-sharded. We confirm sharding state
- // here to avoid taking a collection lock elsewhere for this purpose alone.
- // TODO SERVER-27616: This check is incorrect in that we don't acquire a collection cursor
- // until after we release the lock, leaving room for a collection to be sharded in-between.
- auto css = CollectionShardingState::get(expCtx->opCtx, expCtx->ns);
- uassert(4567,
- str::stream() << "from collection (" << expCtx->ns.ns() << ") cannot be sharded",
- !css->getMetadata(expCtx->opCtx)->isSharded());
-
- PipelineD::prepareCursorSource(autoColl->getCollection(), expCtx->ns, nullptr, pipeline);
-
- return Status::OK();
-}
-
-std::string PipelineD::MongoDInterface::getShardName(OperationContext* opCtx) const {
- if (ShardingState::get(opCtx)->enabled()) {
- return ShardingState::get(opCtx)->shardId().toString();
- }
-
- return std::string();
-}
-
-std::pair<std::vector<FieldPath>, bool> PipelineD::MongoDInterface::collectDocumentKeyFields(
- OperationContext* opCtx, UUID uuid) const {
- if (serverGlobalParams.clusterRole != ClusterRole::ShardServer) {
- return {{"_id"}, false}; // Nothing is sharded.
- }
-
- // An empty namespace indicates that the collection has been dropped. Treat it as unsharded and
- // mark the fields as final.
- auto nss = UUIDCatalog::get(opCtx).lookupNSSByUUID(uuid);
- if (nss.isEmpty()) {
- return {{"_id"}, true};
- }
-
- // Before taking a collection lock to retrieve the shard key fields, consult the catalog cache
- // to determine whether the collection is sharded in the first place.
- auto catalogCache = Grid::get(opCtx)->catalogCache();
-
- const bool collectionIsSharded = catalogCache && [&]() {
- auto routingInfo = catalogCache->getCollectionRoutingInfo(opCtx, nss);
- return routingInfo.isOK() && routingInfo.getValue().cm();
- }();
-
- // Collection exists and is not sharded, mark as not final.
- if (!collectionIsSharded) {
- return {{"_id"}, false};
- }
-
- auto scm = [opCtx, &nss]() -> ScopedCollectionMetadata {
- AutoGetCollection autoColl(opCtx, nss, MODE_IS);
- return CollectionShardingState::get(opCtx, nss)->getMetadata(opCtx);
- }();
-
- // Collection is not sharded or UUID mismatch implies collection has been dropped and recreated
- // as sharded.
- if (!scm->isSharded() || !scm->uuidMatches(uuid)) {
- return {{"_id"}, false};
- }
-
- // Unpack the shard key.
- std::vector<FieldPath> result;
- bool gotId = false;
- for (auto& field : scm->getKeyPatternFields()) {
- result.emplace_back(field->dottedField());
- gotId |= (result.back().fullPath() == "_id");
- }
- if (!gotId) { // If not part of the shard key, "_id" comes last.
- result.emplace_back("_id");
- }
- // Collection is now sharded so the document key fields will never change, mark as final.
- return {result, true};
-}
-
-std::vector<GenericCursor> PipelineD::MongoDInterface::getCursors(
- const intrusive_ptr<ExpressionContext>& expCtx) const {
- return CursorManager::getAllCursors(expCtx->opCtx);
-}
-
-boost::optional<Document> PipelineD::MongoDInterface::lookupSingleDocument(
- const boost::intrusive_ptr<ExpressionContext>& expCtx,
- const NamespaceString& nss,
- UUID collectionUUID,
- const Document& documentKey,
- boost::optional<BSONObj> readConcern) {
- invariant(!readConcern); // We don't currently support a read concern on mongod - it's only
- // expected to be necessary on mongos.
-
- std::unique_ptr<Pipeline, PipelineDeleter> pipeline;
- try {
- // Be sure to do the lookup using the collection default collation
- auto foreignExpCtx = expCtx->copyWith(
- nss,
- collectionUUID,
- _getCollectionDefaultCollator(expCtx->opCtx, nss.db(), collectionUUID));
- pipeline = uassertStatusOK(makePipeline({BSON("$match" << documentKey)}, foreignExpCtx));
- } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
- return boost::none;
- }
-
- auto lookedUpDocument = pipeline->getNext();
- if (auto next = pipeline->getNext()) {
- uasserted(ErrorCodes::TooManyMatchingDocuments,
- str::stream() << "found more than one document with document key "
- << documentKey.toString()
- << " ["
- << lookedUpDocument->toString()
- << ", "
- << next->toString()
- << "]");
- }
- return lookedUpDocument;
-}
-
-BSONObj PipelineD::MongoDInterface::_reportCurrentOpForClient(
- OperationContext* opCtx, Client* client, CurrentOpTruncateMode truncateOps) const {
- BSONObjBuilder builder;
-
- CurOp::reportCurrentOpForClient(
- opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder);
-
- OperationContext* clientOpCtx = client->getOperationContext();
-
- if (clientOpCtx) {
- if (auto opCtxSession = OperationContextSession::get(clientOpCtx)) {
- opCtxSession->reportUnstashedState(repl::ReadConcernArgs::get(clientOpCtx), &builder);
- }
-
- // Append lock stats before returning.
- if (auto lockerInfo = clientOpCtx->lockState()->getLockerInfo()) {
- fillLockerInfo(*lockerInfo, builder);
- }
- }
-
- return builder.obj();
-}
-
-void PipelineD::MongoDInterface::_reportCurrentOpsForIdleSessions(OperationContext* opCtx,
- CurrentOpUserMode userMode,
- std::vector<BSONObj>* ops) const {
- auto sessionCatalog = SessionCatalog::get(opCtx);
-
- const bool authEnabled =
- AuthorizationSession::get(opCtx->getClient())->getAuthorizationManager().isAuthEnabled();
-
- // If the user is listing only their own ops, we use makeSessionFilterForAuthenticatedUsers to
- // create a pattern that will match against all authenticated usernames for the current client.
- // If the user is listing ops for all users, we create an empty pattern; constructing an
- // instance of SessionKiller::Matcher with this empty pattern will return all sessions.
- auto sessionFilter = (authEnabled && userMode == CurrentOpUserMode::kExcludeOthers
- ? makeSessionFilterForAuthenticatedUsers(opCtx)
- : KillAllSessionsByPatternSet{{}});
-
- sessionCatalog->scanSessions(opCtx,
- {std::move(sessionFilter)},
- [&](OperationContext* opCtx, Session* session) {
- auto op = session->reportStashedState();
- if (!op.isEmpty()) {
- ops->emplace_back(op);
- }
- });
-}
-
-std::unique_ptr<CollatorInterface> PipelineD::MongoDInterface::_getCollectionDefaultCollator(
- OperationContext* opCtx, StringData dbName, UUID collectionUUID) {
- auto it = _collatorCache.find(collectionUUID);
- if (it == _collatorCache.end()) {
- auto collator = [&]() -> std::unique_ptr<CollatorInterface> {
- AutoGetCollection autoColl(opCtx, {dbName.toString(), collectionUUID}, MODE_IS);
- if (!autoColl.getCollection()) {
- // This collection doesn't exist, so assume a nullptr default collation
- return nullptr;
- } else {
- auto defaultCollator = autoColl.getCollection()->getDefaultCollator();
- // Clone the collator so that we can safely use the pointer if the collection
- // disappears right after we release the lock.
- return defaultCollator ? defaultCollator->clone() : nullptr;
- }
- }();
-
- it = _collatorCache.emplace(collectionUUID, std::move(collator)).first;
- }
-
- auto& collator = it->second;
- return collator ? collator->clone() : nullptr;
-}
-
} // namespace mongo