diff options
author | Svilen Mihaylov <svilen.mihaylov@mongodb.com> | 2020-03-09 15:36:43 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-03-13 17:33:34 +0000 |
commit | ffd57e8bc049d4d188e667fa4d6c9f956b91ee0b (patch) | |
tree | 136fe2834bb467e837a5cf8f26b647b976f8402d /src | |
parent | c15e8ae74071482d69179c7e5e5e6bdc882d2beb (diff) | |
download | mongo-ffd57e8bc049d4d188e667fa4d6c9f956b91ee0b.tar.gz |
SERVER-44642 Verify that implementation of cross-database $out attaches writeConcern appropriately in sharded environments
Diffstat (limited to 'src')
5 files changed, 170 insertions, 1 deletions
diff --git a/src/mongo/db/pipeline/process_interface/SConscript b/src/mongo/db/pipeline/process_interface/SConscript index 3991a87f6f4..b24e45be165 100644 --- a/src/mongo/db/pipeline/process_interface/SConscript +++ b/src/mongo/db/pipeline/process_interface/SConscript @@ -103,11 +103,13 @@ env.CppUnitTest( target='process_interface_test', source=[ 'mongos_process_interface_test.cpp', + 'shardsvr_process_interface_test.cpp', 'standalone_process_interface_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/db/query/query_test_service_context', '$BUILD_DIR/mongo/db/service_context_test_fixture', + '$BUILD_DIR/mongo/s/catalog_cache_test_fixture', 'mongos_process_interface', 'shardsvr_process_interface', ] diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h index 6148cf0f84b..f350673b106 100644 --- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h @@ -75,7 +75,7 @@ public: Status appendQueryExecStats(OperationContext* opCtx, const NamespaceString& nss, BSONObjBuilder* builder) const final override; - BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) final; + BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) override; std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead( Pipeline* pipeline) final; std::string getShardName(OperationContext* opCtx) const final; diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp index dcca4ab8dd6..5bde04b25a8 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp @@ -184,6 +184,52 @@ void ShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged( str::stream() << "failed while running command " << newCmdObj); } +BSONObj ShardServerProcessInterface::getCollectionOptions(OperationContext* opCtx, + const NamespaceString& nss) { + auto cachedDbInfo = + uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nss.db())); + auto shard = uassertStatusOK( + Grid::get(opCtx)->shardRegistry()->getShard(opCtx, cachedDbInfo.primaryId())); + + const BSONObj filterObj = BSON("name" << nss.coll()); + const BSONObj cmdObj = BSON("listCollections" << 1 << "filter" << filterObj); + + Shard::QueryResponse resultCollections; + try { + resultCollections = uassertStatusOK( + shard->runExhaustiveCursorCommand(opCtx, + ReadPreferenceSetting(ReadPreference::PrimaryOnly), + nss.db().toString(), + appendDbVersionIfPresent(cmdObj, cachedDbInfo), + Milliseconds(-1))); + } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) { + return BSONObj{}; + } + + if (resultCollections.docs.empty()) { + return BSONObj{}; + } + + for (const BSONObj& element : resultCollections.docs) { + // Return first element which matches on name and has options. + const BSONElement nameElement = element["name"]; + if (!nameElement || nameElement.valueStringDataSafe() != nss.coll()) { + continue; + } + + const BSONElement optionsElement = element["options"]; + if (optionsElement) { + return optionsElement.Obj().getOwned(); + } + + invariant(resultCollections.docs.size() <= 1, + str::stream() << "Expected at most one collection with the name " << nss << ": " + << resultCollections.docs.size()); + } + + return BSONObj{}; +} + std::list<BSONObj> ShardServerProcessInterface::getIndexSpecs(OperationContext* opCtx, const NamespaceString& ns, bool includeBuildUUIDs) { @@ -205,6 +251,7 @@ std::list<BSONObj> ShardServerProcessInterface::getIndexSpecs(OperationContext* } return std::list<BSONObj>(indexes.docs.begin(), indexes.docs.end()); } + void ShardServerProcessInterface::createCollection(OperationContext* opCtx, const std::string& dbName, const BSONObj& cmdObj) { diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h index cf70b9d9549..76cf5887fbb 100644 --- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h @@ -93,6 +93,8 @@ public: std::unique_ptr<ShardFilterer> getShardFilterer( const boost::intrusive_ptr<ExpressionContext>& expCtx) const override final; + BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) final; + std::list<BSONObj> getIndexSpecs(OperationContext* opCtx, const NamespaceString& ns, bool includeBuildUUIDs) final; diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp new file mode 100644 index 00000000000..a1bf2299244 --- /dev/null +++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp @@ -0,0 +1,118 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/pipeline/document_source_out.h" +#include "mongo/db/pipeline/document_source_queue.h" +#include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h" +#include "mongo/s/query/sharded_agg_test_fixture.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { + +// Use this new name to register these tests under their own unit test suite. +using ShardedProcessInterfaceTest = ShardedAggTestFixture; + +TEST_F(ShardedProcessInterfaceTest, TestInsert) { + setupNShards(2); + + const NamespaceString kOutNss = NamespaceString{"unittests-out", "sharded_agg_test"}; + auto outStage = DocumentSourceOut::create(kOutNss, expCtx()); + + // Attach a write concern, and make sure it is forwarded below. + WriteConcernOptions wco{WriteConcernOptions::kMajority, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kNoTimeout}; + expCtx()->opCtx->setWriteConcern(wco); + + expCtx()->mongoProcessInterface = std::make_shared<ShardServerProcessInterface>(executor()); + auto queue = DocumentSourceQueue::create(expCtx()); + outStage->setSource(queue.get()); + + auto future = launchAsync([&] { ASSERT_TRUE(outStage->getNext().isEOF()); }); + + expectGetDatabase(kOutNss); + expectGetCollection(kOutNss, OID::gen(), ShardKeyPattern{BSON("_id" << 1)}); + + // Testing the collection options are propagated. + const BSONObj collectionOptions = BSON("validationLevel" + << "moderate"); + const BSONObj listCollectionsResponse = BSON("name" << kOutNss.coll() << "type" + << "collection" + << "options" << collectionOptions); + + // Mock the response to $out's "listCollections" request. + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse(kTestAggregateNss, CursorId{0}, {listCollectionsResponse}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + // Mock the response to $out's "listIndexes" request. + const BSONObj indexBSON = BSON("_id" << 1); + const BSONObj listIndexesResponse = BSON("v" << 1 << "key" << indexBSON << "name" + << "_id_" + << "ns" << kOutNss.toString()); + onCommand([&](const executor::RemoteCommandRequest& request) { + return CursorResponse(kTestAggregateNss, CursorId{0}, {listIndexesResponse}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + // Mock the response to $out's "createCollection" request. + onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { + ASSERT(request.cmdObj.hasField("writeConcern")) << request.cmdObj; + ASSERT_EQ("moderate", request.cmdObj["validationLevel"].str()); + return CursorResponse(kTestAggregateNss, CursorId{0}, {}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + // Mock the response to $out's "createIndexes" request. + onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { + ASSERT(request.cmdObj.hasField("writeConcern")) << request.cmdObj; + + ASSERT(request.cmdObj.hasField("indexes")); + const std::vector<BSONElement>& indexArray = request.cmdObj["indexes"].Array(); + ASSERT_EQ(1, indexArray.size()); + ASSERT_BSONOBJ_EQ(listIndexesResponse, indexArray.at(0).Obj()); + + return CursorResponse(kTestAggregateNss, CursorId{0}, {}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + // Mock the response to $out's "renameIfOptionsAndIndexesHaveNotChanged" request. + onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) { + ASSERT(request.cmdObj.hasField("writeConcern")) << request.cmdObj; + return CursorResponse(kTestAggregateNss, CursorId{0}, {}) + .toBSON(CursorResponse::ResponseType::InitialResponse); + }); + + future.default_timed_get(); +} + +} // namespace +} // namespace mongo
\ No newline at end of file |