summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorkauboy26 <vishnu.kaushik@mongodb.com>2023-04-03 20:42:55 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-04-03 22:48:14 +0000
commitb4cacf7c95adbdf22c2e20b51bd0d0a31ea6e8d3 (patch)
tree0c0f3abeafb62e878e5b314c754a9738fbdc5c3e /src/mongo/db
parentbb7497ecfdd45b7e87c0015aa515e2838414318d (diff)
downloadmongo-b4cacf7c95adbdf22c2e20b51bd0d0a31ea6e8d3.tar.gz
SERVER-72789 Validate the database/shard versions for bulkWrite sent from mongos
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript2
-rw-r--r--src/mongo/db/bulk_write_shard_test.cpp504
-rw-r--r--src/mongo/db/commands/bulk_write.cpp191
-rw-r--r--src/mongo/db/commands/bulk_write.h44
4 files changed, 653 insertions, 88 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index a783e17f555..30b1d60cbf1 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -2510,6 +2510,7 @@ if wiredtiger:
envWithAsio.CppUnitTest(
target='db_unittest_test',
source=[
+ 'bulk_write_shard_test.cpp',
'cancelable_operation_context_test.cpp',
'catalog_raii_test.cpp',
'change_collection_expired_change_remover_test.cpp',
@@ -2590,6 +2591,7 @@ if wiredtiger:
'$BUILD_DIR/mongo/db/change_stream_change_collection_manager',
'$BUILD_DIR/mongo/db/change_stream_serverless_helpers',
'$BUILD_DIR/mongo/db/change_streams_cluster_parameter',
+ '$BUILD_DIR/mongo/db/commands/bulk_write_command',
'$BUILD_DIR/mongo/db/mongohasher',
'$BUILD_DIR/mongo/db/ops/write_ops',
'$BUILD_DIR/mongo/db/pipeline/change_stream_expired_pre_image_remover',
diff --git a/src/mongo/db/bulk_write_shard_test.cpp b/src/mongo/db/bulk_write_shard_test.cpp
new file mode 100644
index 00000000000..c53a3961d57
--- /dev/null
+++ b/src/mongo/db/bulk_write_shard_test.cpp
@@ -0,0 +1,504 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/catalog/collection_uuid_mismatch_info.h"
+#include "mongo/db/catalog/create_collection.h"
+#include "mongo/db/catalog/database_holder.h"
+#include "mongo/db/commands/bulk_write.h"
+#include "mongo/db/commands/bulk_write_gen.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/s/collection_sharding_runtime.h"
+#include "mongo/db/s/database_sharding_state.h"
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/s/sharding_state.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/shard_role.h"
+#include "mongo/s/shard_version_factory.h"
+#include "mongo/unittest/assert.h"
+#include "mongo/unittest/death_test.h"
+#include "mongo/unittest/unittest.h"
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+namespace mongo {
+namespace {
+
+// BulkWriteCommand unit tests for a mongod that is a shard server. In order to
+// do so, in setup() we install collection metadata (shard version & database
+// version) on the node. Consequently any collection metadata attached to the
+// bulk request will be compared to the installed metadata and a StaleConfig
+// error will be thrown in case of a mismatch.
+//
+// The installed collection metadata looks as follows. For the exact values used
+// for the database and shard versions, refer to the corresponding variables.
+// +---------+-------------------------+-------------+---------------+---------------+
+// | Db Name | Coll Name | Sharded? | Db Version | Shard Version |
+// +---------+-------------------------+-------------+---------------+---------------+
+// | testDB1 | unsharded.radiohead | NO | dbV1 | UNSHARDED() |
+// | testDB1 | sharded.porcupine.tree | YES | dbV1 | sV1 |
+// | testDB2 | sharded.oasis | YES | dbV2 | sV2 |
+// +---------+-------------------------+-------------+---------------+---------------+
+class BulkWriteShardTest : public ServiceContextMongoDTest {
+protected:
+ OperationContext* opCtx() {
+ return _opCtx.get();
+ }
+
+ void setUp() override;
+ void tearDown() override;
+
+ const ShardId thisShardId{"this"};
+
+ const DatabaseName dbNameTestDb1{"testDB1"};
+ const DatabaseVersion dbVersionTestDb1{UUID::gen(), Timestamp(1, 0)};
+ const DatabaseName dbNameTestDb2{"testDB2"};
+ const DatabaseVersion dbVersionTestDb2{UUID::gen(), Timestamp(2, 0)};
+
+ const NamespaceString nssUnshardedCollection1 =
+ NamespaceString::createNamespaceString_forTest(dbNameTestDb1, "unsharded.radiohead");
+
+ const NamespaceString nssShardedCollection1 =
+ NamespaceString::createNamespaceString_forTest(dbNameTestDb1, "sharded.porcupine.tree");
+ const ShardVersion shardVersionShardedCollection1 = ShardVersionFactory::make(
+ ChunkVersion(CollectionGeneration{OID::gen(), Timestamp(5, 0)}, CollectionPlacement(10, 1)),
+ boost::optional<CollectionIndexes>(boost::none));
+
+ const NamespaceString nssShardedCollection2 =
+ NamespaceString::createNamespaceString_forTest(dbNameTestDb2, "sharded.oasis");
+ const ShardVersion shardVersionShardedCollection2 = ShardVersionFactory::make(
+ ChunkVersion(CollectionGeneration{OID::gen(), Timestamp(6, 0)}, CollectionPlacement(10, 1)),
+ boost::optional<CollectionIndexes>(boost::none));
+
+ // Used to cause a database version mismatch.
+ const DatabaseVersion incorrectDatabaseVersion{UUID::gen(), Timestamp(3, 0)};
+ // Used to cause a shard version mismatch.
+ const ShardVersion incorrectShardVersion =
+ ShardVersionFactory::make(ChunkVersion(CollectionGeneration{OID::gen(), Timestamp(12, 0)},
+ CollectionPlacement(10, 1)),
+ boost::optional<CollectionIndexes>(boost::none));
+
+private:
+ ServiceContext::UniqueOperationContext _opCtx;
+};
+
+void createTestCollection(OperationContext* opCtx, const NamespaceString& nss) {
+ uassertStatusOK(createCollection(opCtx, nss.dbName(), BSON("create" << nss.coll())));
+}
+
+void installDatabaseMetadata(OperationContext* opCtx,
+ const DatabaseName& dbName,
+ const DatabaseVersion& dbVersion) {
+ AutoGetDb autoDb(opCtx, dbName, MODE_X, {});
+ auto scopedDss = DatabaseShardingState::assertDbLockedAndAcquireExclusive(opCtx, dbName);
+ scopedDss->setDbInfo(opCtx, {dbName.db(), ShardId("this"), dbVersion});
+}
+
+void installUnshardedCollectionMetadata(OperationContext* opCtx, const NamespaceString& nss) {
+ const auto unshardedCollectionMetadata = CollectionMetadata();
+ AutoGetCollection coll(opCtx, nss, MODE_IX);
+ CollectionShardingRuntime::assertCollectionLockedAndAcquireExclusive(opCtx, nss)
+ ->setFilteringMetadata(opCtx, unshardedCollectionMetadata);
+}
+
+void installShardedCollectionMetadata(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const DatabaseVersion& dbVersion,
+ std::vector<ChunkType> chunks,
+ ShardId thisShardId) {
+ ASSERT(!chunks.empty());
+
+ const auto uuid = [&] {
+ AutoGetCollection autoColl(opCtx, nss, MODE_IX);
+ return autoColl.getCollection()->uuid();
+ }();
+
+ const std::string shardKey("skey");
+ const ShardKeyPattern shardKeyPattern{BSON(shardKey << 1)};
+ const auto epoch = chunks.front().getVersion().epoch();
+ const auto timestamp = chunks.front().getVersion().getTimestamp();
+
+ auto rt = RoutingTableHistory::makeNew(nss,
+ uuid,
+ shardKeyPattern.getKeyPattern(),
+ nullptr,
+ false,
+ epoch,
+ timestamp,
+ boost::none /* timeseriesFields */,
+ boost::none /* resharding Fields */,
+ true /* allowMigrations */,
+ chunks);
+
+ const auto version = rt.getVersion();
+ const auto rtHandle =
+ RoutingTableHistoryValueHandle(std::make_shared<RoutingTableHistory>(std::move(rt)),
+ ComparableChunkVersion::makeComparableChunkVersion(version));
+
+ const auto collectionMetadata = CollectionMetadata(
+ ChunkManager(thisShardId, dbVersion, rtHandle, boost::none), thisShardId);
+
+ AutoGetCollection coll(opCtx, nss, MODE_IX);
+ CollectionShardingRuntime::assertCollectionLockedAndAcquireExclusive(opCtx, nss)
+ ->setFilteringMetadata(opCtx, collectionMetadata);
+}
+
+
+UUID getCollectionUUID(OperationContext* opCtx, const NamespaceString& nss) {
+ const auto optUuid = CollectionCatalog::get(opCtx)->lookupUUIDByNSS(opCtx, nss);
+ ASSERT(optUuid);
+ return *optUuid;
+}
+
+void BulkWriteShardTest::setUp() {
+ ServiceContextMongoDTest::setUp();
+ _opCtx = getGlobalServiceContext()->makeOperationContext(&cc());
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+
+ const repl::ReplSettings replSettings = {};
+ repl::ReplicationCoordinator::set(
+ getGlobalServiceContext(),
+ std::unique_ptr<repl::ReplicationCoordinator>(
+ new repl::ReplicationCoordinatorMock(_opCtx->getServiceContext(), replSettings)));
+ ASSERT_OK(repl::ReplicationCoordinator::get(getGlobalServiceContext())
+ ->setFollowerMode(repl::MemberState::RS_PRIMARY));
+
+ repl::createOplog(_opCtx.get());
+
+ ShardingState::get(getServiceContext())->setInitialized(ShardId("this"), OID::gen());
+
+ // Setup test collections and metadata
+ installDatabaseMetadata(opCtx(), dbNameTestDb1, dbVersionTestDb1);
+ installDatabaseMetadata(opCtx(), dbNameTestDb2, dbVersionTestDb2);
+
+ // Create nssUnshardedCollection1
+ createTestCollection(opCtx(), nssUnshardedCollection1);
+ installUnshardedCollectionMetadata(opCtx(), nssUnshardedCollection1);
+
+ // Create nssShardedCollection1
+ createTestCollection(opCtx(), nssShardedCollection1);
+ const auto uuidShardedCollection1 = getCollectionUUID(_opCtx.get(), nssShardedCollection1);
+ installShardedCollectionMetadata(
+ opCtx(),
+ nssShardedCollection1,
+ dbVersionTestDb1,
+ {ChunkType(uuidShardedCollection1,
+ ChunkRange{BSON("skey" << MINKEY), BSON("skey" << MAXKEY)},
+ shardVersionShardedCollection1.placementVersion(),
+ thisShardId)},
+ thisShardId);
+
+ // Create nssShardedCollection2
+ createTestCollection(opCtx(), nssShardedCollection2);
+ const auto uuidShardedCollection2 = getCollectionUUID(_opCtx.get(), nssShardedCollection2);
+ installShardedCollectionMetadata(
+ opCtx(),
+ nssShardedCollection2,
+ dbVersionTestDb2,
+ {ChunkType(uuidShardedCollection2,
+ ChunkRange{BSON("skey" << MINKEY), BSON("skey" << MAXKEY)},
+ shardVersionShardedCollection2.placementVersion(),
+ thisShardId)},
+ thisShardId);
+}
+
+void BulkWriteShardTest::tearDown() {
+ _opCtx.reset();
+ ServiceContextMongoDTest::tearDown();
+ repl::ReplicationCoordinator::set(getGlobalServiceContext(), nullptr);
+}
+
+NamespaceInfoEntry nsInfoWithShardDatabaseVersions(NamespaceString nss,
+ boost::optional<DatabaseVersion> dv,
+ boost::optional<ShardVersion> sv) {
+ NamespaceInfoEntry nsInfoEntry(nss);
+ nsInfoEntry.setDatabaseVersion(dv);
+ nsInfoEntry.setShardVersion(sv);
+ return nsInfoEntry;
+}
+
+// Three successful ordered inserts into different collections.
+TEST_F(BulkWriteShardTest, ThreeSuccessfulInsertsOrdered) {
+ BulkWriteCommandRequest request(
+ {BulkWriteInsertOp(0, BSON("x" << 1)),
+ BulkWriteInsertOp(1, BSON("x" << -1)),
+ BulkWriteInsertOp(2, BSON("x" << -1))},
+ {
+ nsInfoWithShardDatabaseVersions(
+ nssUnshardedCollection1, dbVersionTestDb1, ShardVersion::UNSHARDED()),
+ nsInfoWithShardDatabaseVersions(
+ nssShardedCollection1, dbVersionTestDb1, shardVersionShardedCollection1),
+ nsInfoWithShardDatabaseVersions(
+ nssShardedCollection2, dbVersionTestDb2, shardVersionShardedCollection2),
+ });
+
+ auto replyItems = bulk_write::performWrites(opCtx(), request);
+
+ ASSERT_EQ(3, replyItems.size());
+ for (const auto& reply : replyItems) {
+ ASSERT_OK(reply.getStatus());
+ }
+
+ OperationShardingState::get(opCtx()).resetShardingOperationFailedStatus();
+}
+
+// An insert into a sharded collection and an unsharded collection but have the first
+// insert fail, resulting in skipping the second insert.
+TEST_F(BulkWriteShardTest, OneFailingShardedOneSkippedUnshardedSuccessInsertOrdered) {
+ BulkWriteCommandRequest request(
+ {BulkWriteInsertOp(0, BSON("x" << 1)), BulkWriteInsertOp(1, BSON("x" << -1))},
+ {nsInfoWithShardDatabaseVersions(
+ nssShardedCollection1, dbVersionTestDb1, incorrectShardVersion),
+ nsInfoWithShardDatabaseVersions(
+ nssUnshardedCollection1, dbVersionTestDb1, ShardVersion::UNSHARDED())});
+
+ auto replyItems = bulk_write::performWrites(opCtx(), request);
+
+ ASSERT_EQ(1, replyItems.size());
+ ASSERT_EQ(ErrorCodes::StaleConfig, replyItems.back().getStatus().code());
+
+ OperationShardingState::get(opCtx()).resetShardingOperationFailedStatus();
+}
+
+// Two ordered inserts into the same sharded collection, but the sharded collection's metadata
+// is stale and so the first write should fail and the second write should be skipped.
+TEST_F(BulkWriteShardTest, TwoFailingShardedInsertsOrdered) {
+ BulkWriteCommandRequest request(
+ {BulkWriteInsertOp(0, BSON("x" << 1)), BulkWriteInsertOp(0, BSON("x" << -1))},
+ {
+ nsInfoWithShardDatabaseVersions(
+ nssShardedCollection1, dbVersionTestDb1, incorrectShardVersion),
+ });
+
+ auto replyItems = bulk_write::performWrites(opCtx(), request);
+
+ ASSERT_EQ(1, replyItems.size());
+ ASSERT_EQ(ErrorCodes::StaleConfig, replyItems.back().getStatus().code());
+
+ OperationShardingState::get(opCtx()).resetShardingOperationFailedStatus();
+}
+
+// Two ordered inserts into different sharded collections. The first is successful and
+// the second is failing.
+TEST_F(BulkWriteShardTest, OneSuccessfulShardedOneFailingShardedOrdered) {
+ BulkWriteCommandRequest request(
+ {BulkWriteInsertOp(0, BSON("x" << 1)), BulkWriteInsertOp(1, BSON("x" << -1))},
+ {nsInfoWithShardDatabaseVersions(
+ nssShardedCollection1, dbVersionTestDb1, shardVersionShardedCollection1),
+ nsInfoWithShardDatabaseVersions(
+ nssShardedCollection2, dbVersionTestDb2, incorrectShardVersion)});
+
+ auto replyItems = bulk_write::performWrites(opCtx(), request);
+
+ ASSERT_EQ(2, replyItems.size());
+ ASSERT_OK(replyItems.front().getStatus());
+ ASSERT_EQ(ErrorCodes::StaleConfig, replyItems.back().getStatus().code());
+
+ OperationShardingState::get(opCtx()).resetShardingOperationFailedStatus();
+}
+
+// Two unordered inserts into the same sharded collection. On most errors we proceed
+// with the rest of the operations, but on StaleConfig errors we don't.
+TEST_F(BulkWriteShardTest, OneFailingShardedOneSkippedShardedUnordered) {
+ BulkWriteCommandRequest request(
+ {BulkWriteInsertOp(0, BSON("x" << 1)), BulkWriteInsertOp(0, BSON("x" << -1))},
+ {nsInfoWithShardDatabaseVersions(
+ nssShardedCollection1, dbVersionTestDb1, incorrectShardVersion)});
+ request.setOrdered(false);
+
+ auto replyItems = bulk_write::performWrites(opCtx(), request);
+
+ ASSERT_EQ(1, replyItems.size());
+ ASSERT_EQ(ErrorCodes::StaleConfig, replyItems.back().getStatus().code());
+
+ OperationShardingState::get(opCtx()).resetShardingOperationFailedStatus();
+}
+
+// Two unordered inserts into different sharded collections. Despite being unordered
+// inserts, the implementation will halt on the very first error.
+TEST_F(BulkWriteShardTest, OneSuccessfulShardedOneFailingShardedUnordered) {
+ BulkWriteCommandRequest request(
+ {BulkWriteInsertOp(0, BSON("x" << 1)), BulkWriteInsertOp(1, BSON("x" << -1))},
+ {nsInfoWithShardDatabaseVersions(
+ nssShardedCollection1, dbVersionTestDb1, incorrectShardVersion),
+ nsInfoWithShardDatabaseVersions(
+ nssShardedCollection2, dbVersionTestDb2, shardVersionShardedCollection2)});
+ request.setOrdered(false);
+
+ auto replyItems = bulk_write::performWrites(opCtx(), request);
+
+ ASSERT_EQ(1, replyItems.size());
+ ASSERT_EQ(ErrorCodes::StaleConfig, replyItems.back().getStatus().code());
+
+ OperationShardingState::get(opCtx()).resetShardingOperationFailedStatus();
+}
+
+// Ordered inserts and updates into different collections where all succeed.
+TEST_F(BulkWriteShardTest, InsertsAndUpdatesSuccessOrdered) {
+ BulkWriteCommandRequest request(
+ {BulkWriteInsertOp(2, BSON("x" << 1)),
+ BulkWriteInsertOp(0, BSON("x" << 3)),
+ BulkWriteUpdateOp(0, BSON("x" << BSON("$gt" << 0)), BSON("x" << -9)),
+ BulkWriteInsertOp(1, BSON("x" << -1))},
+ {nsInfoWithShardDatabaseVersions(
+ nssShardedCollection1, dbVersionTestDb1, shardVersionShardedCollection1),
+ nsInfoWithShardDatabaseVersions(
+ nssShardedCollection2, dbVersionTestDb2, shardVersionShardedCollection2),
+ nsInfoWithShardDatabaseVersions(
+ nssUnshardedCollection1, dbVersionTestDb1, ShardVersion::UNSHARDED())});
+
+ auto replyItems = bulk_write::performWrites(opCtx(), request);
+
+ ASSERT_EQ(4, replyItems.size());
+ for (const auto& reply : replyItems) {
+ ASSERT_OK(reply.getStatus());
+ }
+
+ OperationShardingState::get(opCtx()).resetShardingOperationFailedStatus();
+}
+
+// Unordered inserts and updates into different collections where all succeed.
+TEST_F(BulkWriteShardTest, InsertsAndUpdatesSuccessUnordered) {
+ BulkWriteCommandRequest request(
+ {BulkWriteInsertOp(2, BSON("x" << 1)),
+ BulkWriteInsertOp(0, BSON("x" << 3)),
+ BulkWriteUpdateOp(0, BSON("x" << BSON("$gt" << 0)), BSON("x" << -9)),
+ BulkWriteInsertOp(1, BSON("x" << -1))},
+ {nsInfoWithShardDatabaseVersions(
+ nssShardedCollection1, dbVersionTestDb1, shardVersionShardedCollection1),
+ nsInfoWithShardDatabaseVersions(
+ nssShardedCollection2, dbVersionTestDb2, shardVersionShardedCollection2),
+ nsInfoWithShardDatabaseVersions(
+ nssUnshardedCollection1, dbVersionTestDb1, ShardVersion::UNSHARDED())});
+
+ request.setOrdered(false);
+
+ auto replyItems = bulk_write::performWrites(opCtx(), request);
+
+ ASSERT_EQ(4, replyItems.size());
+ for (const auto& reply : replyItems) {
+ ASSERT_OK(reply.getStatus());
+ }
+
+ OperationShardingState::get(opCtx()).resetShardingOperationFailedStatus();
+}
+
+// Unordered inserts and updates into different collections where some fail.
+TEST_F(BulkWriteShardTest, InsertsAndUpdatesFailUnordered) {
+ BulkWriteCommandRequest request(
+ {BulkWriteInsertOp(2, BSON("x" << 1)),
+ BulkWriteInsertOp(0, BSON("x" << 3)),
+ BulkWriteUpdateOp(0, BSON("x" << BSON("$gt" << 0)), BSON("x" << -9)),
+ BulkWriteInsertOp(1, BSON("x" << -1))},
+ {nsInfoWithShardDatabaseVersions(
+ nssShardedCollection1, dbVersionTestDb1, incorrectShardVersion),
+ nsInfoWithShardDatabaseVersions(
+ nssShardedCollection2, dbVersionTestDb2, shardVersionShardedCollection2),
+ nsInfoWithShardDatabaseVersions(
+ nssUnshardedCollection1, dbVersionTestDb1, ShardVersion::UNSHARDED())});
+
+ request.setOrdered(false);
+
+ auto replyItems = bulk_write::performWrites(opCtx(), request);
+
+ ASSERT_EQ(2, replyItems.size());
+ ASSERT_OK(replyItems.front().getStatus());
+ ASSERT_EQ(ErrorCodes::StaleConfig, replyItems.back().getStatus().code());
+
+ OperationShardingState::get(opCtx()).resetShardingOperationFailedStatus();
+}
+
+// TODO (SERVER-75202): Re-enable this test & write a test for deletes.
+// Unordered updates into different collections where some fail.
+// TEST_F(BulkWriteShardTest, UpdatesFailUnordered) {
+// BulkWriteCommandRequest request(
+// {
+// BulkWriteUpdateOp(1, BSON("x" << BSON("$gt" << 0)), BSON("x" << -99)),
+// BulkWriteUpdateOp(0, BSON("x" << BSON("$gt" << 0)), BSON("x" << -9)),
+// BulkWriteInsertOp(1, BSON("x" << -1))},
+// {nsInfoWithShardDatabaseVersions(
+// nssShardedCollection1, dbVersionTestDb, incorrectShardVersion),
+// nsInfoWithShardDatabaseVersions(
+// nssShardedCollection2, dbVersionTestDb, shardVersionShardedCollection2)});
+
+// request.setOrdered(false);
+
+// auto replyItems = bulk_write::performWrites(opCtx(), request);
+
+// ASSERT_EQ(2, replyItems.size());
+// ASSERT_OK(replyItems.front().getStatus());
+// ASSERT_EQ(ErrorCodes::StaleConfig, replyItems[1].getStatus().code());
+
+// OperationShardingState::get(opCtx()).resetShardingOperationFailedStatus();
+// }
+
+// After the first insert fails due to an incorrect database version, the rest
+// of the writes are skipped when operations are ordered.
+TEST_F(BulkWriteShardTest, FirstFailsRestSkippedStaleDbVersionOrdered) {
+ BulkWriteCommandRequest request(
+ {BulkWriteInsertOp(0, BSON("x" << 1)),
+ BulkWriteInsertOp(0, BSON("x" << -1)),
+ BulkWriteInsertOp(1, BSON("x" << -2))},
+ {nsInfoWithShardDatabaseVersions(
+ nssShardedCollection1, incorrectDatabaseVersion, shardVersionShardedCollection1),
+ nsInfoWithShardDatabaseVersions(
+ nssShardedCollection2, dbVersionTestDb2, shardVersionShardedCollection2)});
+
+ auto replyItems = bulk_write::performWrites(opCtx(), request);
+
+ ASSERT_EQ(1, replyItems.size());
+ ASSERT_EQ(ErrorCodes::StaleDbVersion, replyItems.back().getStatus().code());
+
+ OperationShardingState::get(opCtx()).resetShardingOperationFailedStatus();
+}
+
+// After the second insert fails due to an incorrect database version, the rest
+// of the writes are skipped when operations are unordered.
+TEST_F(BulkWriteShardTest, FirstFailsRestSkippedStaleDbVersionUnordered) {
+ BulkWriteCommandRequest request(
+ {BulkWriteInsertOp(1, BSON("x" << 1)),
+ BulkWriteInsertOp(0, BSON("x" << -1)),
+ BulkWriteInsertOp(1, BSON("x" << -2))},
+ {nsInfoWithShardDatabaseVersions(
+ nssUnshardedCollection1, incorrectDatabaseVersion, ShardVersion::UNSHARDED()),
+ nsInfoWithShardDatabaseVersions(
+ nssShardedCollection2, dbVersionTestDb2, shardVersionShardedCollection2)});
+
+ request.setOrdered(false);
+ auto replyItems = bulk_write::performWrites(opCtx(), request);
+
+ ASSERT_EQ(2, replyItems.size());
+ ASSERT_OK(replyItems.front().getStatus());
+ ASSERT_EQ(ErrorCodes::StaleDbVersion, replyItems.back().getStatus().code());
+
+ OperationShardingState::get(opCtx()).resetShardingOperationFailedStatus();
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/commands/bulk_write.cpp b/src/mongo/db/commands/bulk_write.cpp
index 916ecd1a5ad..a91a7c9698f 100644
--- a/src/mongo/db/commands/bulk_write.cpp
+++ b/src/mongo/db/commands/bulk_write.cpp
@@ -38,6 +38,7 @@
#include "mongo/db/catalog/collection_operation_source.h"
#include "mongo/db/catalog/document_validation.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/bulk_write.h"
#include "mongo/db/commands/bulk_write_crud_op.h"
#include "mongo/db/commands/bulk_write_gen.h"
#include "mongo/db/commands/bulk_write_parser.h"
@@ -57,6 +58,7 @@
#include "mongo/db/query/plan_executor_factory.h"
#include "mongo/db/query/query_knobs_gen.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/s/operation_sharding_state.h"
#include "mongo/db/server_feature_flags_gen.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
@@ -164,10 +166,10 @@ private:
std::vector<InsertStatement> _batch;
boost::optional<int> _firstOpIdx;
+ // Return true when the batch is at maximum capacity and should be flushed.
bool _addInsertToBatch(OperationContext* opCtx, const int stmtId, const BSONObj& toInsert) {
_batch.emplace_back(stmtId, toInsert);
- // Return true when the batch is at maximum capacity and should be flushed.
return _batch.size() == _batch.capacity();
}
@@ -545,92 +547,6 @@ bool handleDeleteOp(OperationContext* opCtx,
}
}
-std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
- const BulkWriteCommandRequest& req) {
- const auto& ops = req.getOps();
- const auto& bypassDocumentValidation = req.getBypassDocumentValidation();
-
- DisableDocumentSchemaValidationIfTrue docSchemaValidationDisabler(opCtx,
- bypassDocumentValidation);
-
- DisableSafeContentValidationIfTrue safeContentValidationDisabler(
- opCtx, bypassDocumentValidation, false);
-
- auto responses = BulkWriteReplies(req, ops.size());
-
- // Construct reply handler callbacks.
- auto insertCB = [&responses](OperationContext* opCtx,
- int currentOpIdx,
- write_ops_exec::WriteResult& writes) {
- responses.addInsertReplies(opCtx, currentOpIdx, writes);
- };
- auto updateCB = [&responses](int currentOpIdx,
- const UpdateResult& result,
- const boost::optional<BSONObj>& value) {
- responses.addUpdateReply(currentOpIdx, result, value);
- };
- auto deleteCB =
- [&responses](int currentOpIdx, long long nDeleted, const boost::optional<BSONObj>& value) {
- responses.addDeleteReply(currentOpIdx, nDeleted, value);
- };
-
- auto errorCB = [&responses](int currentOpIdx, const Status& status) {
- responses.addErrorReply(currentOpIdx, status);
- };
-
- // Create a current insert batch.
- const size_t maxBatchSize = internalInsertMaxBatchSize.load();
- auto batch = InsertBatch(req, std::min(ops.size(), maxBatchSize), insertCB);
-
- size_t idx = 0;
-
- auto curOp = CurOp::get(opCtx);
-
- ON_BLOCK_EXIT([&] {
- if (curOp) {
- finishCurOp(opCtx, &*curOp);
- }
- });
-
- for (; idx < ops.size(); ++idx) {
- auto op = BulkWriteCRUDOp(ops[idx]);
- auto opType = op.getType();
-
- if (opType == BulkWriteCRUDOp::kInsert) {
- if (!handleInsertOp(opCtx, op.getInsert(), req, idx, errorCB, batch)) {
- // Insert write failed can no longer continue.
- break;
- }
- } else if (opType == BulkWriteCRUDOp::kUpdate) {
- // Flush insert ops before handling update ops.
- if (!batch.flush(opCtx)) {
- break;
- }
- if (!handleUpdateOp(opCtx, curOp, op.getUpdate(), req, idx, errorCB, updateCB)) {
- // Update write failed can no longer continue.
- break;
- }
- } else {
- // Flush insert ops before handling delete ops.
- if (!batch.flush(opCtx)) {
- break;
- }
- if (!handleDeleteOp(opCtx, curOp, op.getDelete(), req, idx, errorCB, deleteCB)) {
- // Delete write failed can no longer continue.
- break;
- }
- }
- }
-
- // It does not matter if this final flush had errors or not since we finished processing
- // the last op already.
- batch.flush(opCtx);
-
- invariant(batch.empty());
-
- return responses.getReplies();
-}
-
bool haveSpaceForNext(const BSONObj& nextDoc, long long numDocs, size_t bytesBuffered) {
invariant(numDocs >= 0);
if (!numDocs) {
@@ -727,7 +643,7 @@ public:
}
// Apply all of the write operations.
- auto replies = performWrites(opCtx, req);
+ auto replies = bulk_write::performWrites(opCtx, req);
return _populateCursorReply(opCtx, req, std::move(replies));
}
@@ -869,4 +785,103 @@ public:
} bulkWriteCmd;
} // namespace
+
+namespace bulk_write {
+
+std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
+ const BulkWriteCommandRequest& req) {
+ const auto& ops = req.getOps();
+ const auto& bypassDocumentValidation = req.getBypassDocumentValidation();
+
+ DisableDocumentSchemaValidationIfTrue docSchemaValidationDisabler(opCtx,
+ bypassDocumentValidation);
+
+ DisableSafeContentValidationIfTrue safeContentValidationDisabler(
+ opCtx, bypassDocumentValidation, false);
+
+ auto responses = BulkWriteReplies(req, ops.size());
+
+ // Construct reply handler callbacks.
+ auto insertCB = [&responses](OperationContext* opCtx,
+ int currentOpIdx,
+ write_ops_exec::WriteResult& writes) {
+ responses.addInsertReplies(opCtx, currentOpIdx, writes);
+ };
+ auto updateCB = [&responses](int currentOpIdx,
+ const UpdateResult& result,
+ const boost::optional<BSONObj>& value) {
+ responses.addUpdateReply(currentOpIdx, result, value);
+ };
+ auto deleteCB =
+ [&responses](int currentOpIdx, long long nDeleted, const boost::optional<BSONObj>& value) {
+ responses.addDeleteReply(currentOpIdx, nDeleted, value);
+ };
+
+ auto errorCB = [&responses](int currentOpIdx, const Status& status) {
+ responses.addErrorReply(currentOpIdx, status);
+ };
+
+ // Create a current insert batch.
+ const size_t maxBatchSize = internalInsertMaxBatchSize.load();
+ auto batch = InsertBatch(req, std::min(ops.size(), maxBatchSize), insertCB);
+
+ size_t idx = 0;
+
+ auto curOp = CurOp::get(opCtx);
+
+ ON_BLOCK_EXIT([&] {
+ if (curOp) {
+ finishCurOp(opCtx, &*curOp);
+ }
+ });
+
+ // Tell mongod what the shard and database versions are. This will cause writes to fail in case
+ // there is a mismatch in the mongos request provided versions and the local (shard's)
+ // understanding of the version.
+ for (const auto& nsInfo : req.getNsInfo()) {
+ // TODO (SERVER-72767, SERVER-72804, SERVER-72805): Support timeseries collections.
+ OperationShardingState::setShardRole(
+ opCtx, nsInfo.getNs(), nsInfo.getShardVersion(), nsInfo.getDatabaseVersion());
+ }
+
+ for (; idx < ops.size(); ++idx) {
+ auto op = BulkWriteCRUDOp(ops[idx]);
+ auto opType = op.getType();
+
+ if (opType == BulkWriteCRUDOp::kInsert) {
+ if (!handleInsertOp(opCtx, op.getInsert(), req, idx, errorCB, batch)) {
+ // Insert write failed can no longer continue.
+ break;
+ }
+ } else if (opType == BulkWriteCRUDOp::kUpdate) {
+ // Flush insert ops before handling update ops.
+ if (!batch.flush(opCtx)) {
+ break;
+ }
+ if (!handleUpdateOp(opCtx, curOp, op.getUpdate(), req, idx, errorCB, updateCB)) {
+ // Update write failed can no longer continue.
+ break;
+ }
+ } else {
+ // Flush insert ops before handling delete ops.
+ if (!batch.flush(opCtx)) {
+ break;
+ }
+ if (!handleDeleteOp(opCtx, curOp, op.getDelete(), req, idx, errorCB, deleteCB)) {
+ // Delete write failed can no longer continue.
+ break;
+ }
+ }
+ }
+
+ // It does not matter if this final flush had errors or not since we finished processing
+ // the last op already.
+ batch.flush(opCtx);
+
+ invariant(batch.empty());
+
+ return responses.getReplies();
+}
+
+} // namespace bulk_write
} // namespace mongo
diff --git a/src/mongo/db/commands/bulk_write.h b/src/mongo/db/commands/bulk_write.h
new file mode 100644
index 00000000000..20b2647ca37
--- /dev/null
+++ b/src/mongo/db/commands/bulk_write.h
@@ -0,0 +1,44 @@
+/**
+ * Copyright (C) 2023-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <vector>
+
+#include "mongo/db/commands/bulk_write_gen.h"
+#include "mongo/db/commands/bulk_write_parser.h"
+
+namespace mongo {
+namespace bulk_write {
+
+std::vector<BulkWriteReplyItem> performWrites(OperationContext* opCtx,
+ const BulkWriteCommandRequest& req);
+
+} // namespace bulk_write
+} // namespace mongo