summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSvilen Mihaylov <svilen.mihaylov@mongodb.com>2020-03-09 15:36:43 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-03-13 17:33:34 +0000
commitffd57e8bc049d4d188e667fa4d6c9f956b91ee0b (patch)
tree136fe2834bb467e837a5cf8f26b647b976f8402d /src
parentc15e8ae74071482d69179c7e5e5e6bdc882d2beb (diff)
downloadmongo-ffd57e8bc049d4d188e667fa4d6c9f956b91ee0b.tar.gz
SERVER-44642 Verify that implementation of cross-database $out attaches writeConcern appropriately in sharded environments
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/pipeline/process_interface/SConscript2
-rw-r--r--src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h2
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp47
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h2
-rw-r--r--src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp118
5 files changed, 170 insertions, 1 deletions
diff --git a/src/mongo/db/pipeline/process_interface/SConscript b/src/mongo/db/pipeline/process_interface/SConscript
index 3991a87f6f4..b24e45be165 100644
--- a/src/mongo/db/pipeline/process_interface/SConscript
+++ b/src/mongo/db/pipeline/process_interface/SConscript
@@ -103,11 +103,13 @@ env.CppUnitTest(
target='process_interface_test',
source=[
'mongos_process_interface_test.cpp',
+ 'shardsvr_process_interface_test.cpp',
'standalone_process_interface_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/query/query_test_service_context',
'$BUILD_DIR/mongo/db/service_context_test_fixture',
+ '$BUILD_DIR/mongo/s/catalog_cache_test_fixture',
'mongos_process_interface',
'shardsvr_process_interface',
]
diff --git a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
index 6148cf0f84b..f350673b106 100644
--- a/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/common_mongod_process_interface.h
@@ -75,7 +75,7 @@ public:
Status appendQueryExecStats(OperationContext* opCtx,
const NamespaceString& nss,
BSONObjBuilder* builder) const final override;
- BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) final;
+ BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) override;
std::unique_ptr<Pipeline, PipelineDeleter> attachCursorSourceToPipelineForLocalRead(
Pipeline* pipeline) final;
std::string getShardName(OperationContext* opCtx) const final;
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
index dcca4ab8dd6..5bde04b25a8 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.cpp
@@ -184,6 +184,52 @@ void ShardServerProcessInterface::renameIfOptionsAndIndexesHaveNotChanged(
str::stream() << "failed while running command " << newCmdObj);
}
+BSONObj ShardServerProcessInterface::getCollectionOptions(OperationContext* opCtx,
+ const NamespaceString& nss) {
+ auto cachedDbInfo =
+ uassertStatusOK(Grid::get(opCtx)->catalogCache()->getDatabase(opCtx, nss.db()));
+ auto shard = uassertStatusOK(
+ Grid::get(opCtx)->shardRegistry()->getShard(opCtx, cachedDbInfo.primaryId()));
+
+ const BSONObj filterObj = BSON("name" << nss.coll());
+ const BSONObj cmdObj = BSON("listCollections" << 1 << "filter" << filterObj);
+
+ Shard::QueryResponse resultCollections;
+ try {
+ resultCollections = uassertStatusOK(
+ shard->runExhaustiveCursorCommand(opCtx,
+ ReadPreferenceSetting(ReadPreference::PrimaryOnly),
+ nss.db().toString(),
+ appendDbVersionIfPresent(cmdObj, cachedDbInfo),
+ Milliseconds(-1)));
+ } catch (const ExceptionFor<ErrorCodes::NamespaceNotFound>&) {
+ return BSONObj{};
+ }
+
+ if (resultCollections.docs.empty()) {
+ return BSONObj{};
+ }
+
+ for (const BSONObj& element : resultCollections.docs) {
+ // Return first element which matches on name and has options.
+ const BSONElement nameElement = element["name"];
+ if (!nameElement || nameElement.valueStringDataSafe() != nss.coll()) {
+ continue;
+ }
+
+ const BSONElement optionsElement = element["options"];
+ if (optionsElement) {
+ return optionsElement.Obj().getOwned();
+ }
+
+ invariant(resultCollections.docs.size() <= 1,
+ str::stream() << "Expected at most one collection with the name " << nss << ": "
+ << resultCollections.docs.size());
+ }
+
+ return BSONObj{};
+}
+
std::list<BSONObj> ShardServerProcessInterface::getIndexSpecs(OperationContext* opCtx,
const NamespaceString& ns,
bool includeBuildUUIDs) {
@@ -205,6 +251,7 @@ std::list<BSONObj> ShardServerProcessInterface::getIndexSpecs(OperationContext*
}
return std::list<BSONObj>(indexes.docs.begin(), indexes.docs.end());
}
+
void ShardServerProcessInterface::createCollection(OperationContext* opCtx,
const std::string& dbName,
const BSONObj& cmdObj) {
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
index cf70b9d9549..76cf5887fbb 100644
--- a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface.h
@@ -93,6 +93,8 @@ public:
std::unique_ptr<ShardFilterer> getShardFilterer(
const boost::intrusive_ptr<ExpressionContext>& expCtx) const override final;
+ BSONObj getCollectionOptions(OperationContext* opCtx, const NamespaceString& nss) final;
+
std::list<BSONObj> getIndexSpecs(OperationContext* opCtx,
const NamespaceString& ns,
bool includeBuildUUIDs) final;
diff --git a/src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp
new file mode 100644
index 00000000000..a1bf2299244
--- /dev/null
+++ b/src/mongo/db/pipeline/process_interface/shardsvr_process_interface_test.cpp
@@ -0,0 +1,118 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/pipeline/document_source_out.h"
+#include "mongo/db/pipeline/document_source_queue.h"
+#include "mongo/db/pipeline/process_interface/shardsvr_process_interface.h"
+#include "mongo/s/query/sharded_agg_test_fixture.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+
+// Use this new name to register these tests under their own unit test suite.
+using ShardedProcessInterfaceTest = ShardedAggTestFixture;
+
+TEST_F(ShardedProcessInterfaceTest, TestInsert) {
+ setupNShards(2);
+
+ const NamespaceString kOutNss = NamespaceString{"unittests-out", "sharded_agg_test"};
+ auto outStage = DocumentSourceOut::create(kOutNss, expCtx());
+
+ // Attach a write concern, and make sure it is forwarded below.
+ WriteConcernOptions wco{WriteConcernOptions::kMajority,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kNoTimeout};
+ expCtx()->opCtx->setWriteConcern(wco);
+
+ expCtx()->mongoProcessInterface = std::make_shared<ShardServerProcessInterface>(executor());
+ auto queue = DocumentSourceQueue::create(expCtx());
+ outStage->setSource(queue.get());
+
+ auto future = launchAsync([&] { ASSERT_TRUE(outStage->getNext().isEOF()); });
+
+ expectGetDatabase(kOutNss);
+ expectGetCollection(kOutNss, OID::gen(), ShardKeyPattern{BSON("_id" << 1)});
+
+ // Testing the collection options are propagated.
+ const BSONObj collectionOptions = BSON("validationLevel"
+ << "moderate");
+ const BSONObj listCollectionsResponse = BSON("name" << kOutNss.coll() << "type"
+ << "collection"
+ << "options" << collectionOptions);
+
+ // Mock the response to $out's "listCollections" request.
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(kTestAggregateNss, CursorId{0}, {listCollectionsResponse})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ // Mock the response to $out's "listIndexes" request.
+ const BSONObj indexBSON = BSON("_id" << 1);
+ const BSONObj listIndexesResponse = BSON("v" << 1 << "key" << indexBSON << "name"
+ << "_id_"
+ << "ns" << kOutNss.toString());
+ onCommand([&](const executor::RemoteCommandRequest& request) {
+ return CursorResponse(kTestAggregateNss, CursorId{0}, {listIndexesResponse})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ // Mock the response to $out's "createCollection" request.
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT(request.cmdObj.hasField("writeConcern")) << request.cmdObj;
+ ASSERT_EQ("moderate", request.cmdObj["validationLevel"].str());
+ return CursorResponse(kTestAggregateNss, CursorId{0}, {})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ // Mock the response to $out's "createIndexes" request.
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT(request.cmdObj.hasField("writeConcern")) << request.cmdObj;
+
+ ASSERT(request.cmdObj.hasField("indexes"));
+ const std::vector<BSONElement>& indexArray = request.cmdObj["indexes"].Array();
+ ASSERT_EQ(1, indexArray.size());
+ ASSERT_BSONOBJ_EQ(listIndexesResponse, indexArray.at(0).Obj());
+
+ return CursorResponse(kTestAggregateNss, CursorId{0}, {})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ // Mock the response to $out's "renameIfOptionsAndIndexesHaveNotChanged" request.
+ onCommandForPoolExecutor([&](const executor::RemoteCommandRequest& request) {
+ ASSERT(request.cmdObj.hasField("writeConcern")) << request.cmdObj;
+ return CursorResponse(kTestAggregateNss, CursorId{0}, {})
+ .toBSON(CursorResponse::ResponseType::InitialResponse);
+ });
+
+ future.default_timed_get();
+}
+
+} // namespace
+} // namespace mongo \ No newline at end of file