From d286a563d4ad921331d7d3846181a5bb7192a31d Mon Sep 17 00:00:00 2001 From: Daniel Gottlieb Date: Wed, 30 Sep 2020 23:37:25 -0400 Subject: SERVER-49893: Do oplog fetching. --- src/mongo/client/SConscript | 1 + src/mongo/client/dbclient_cursor.cpp | 28 ++ src/mongo/client/dbclient_cursor.h | 4 + src/mongo/db/repl/oplog.cpp | 9 + src/mongo/db/repl/read_concern_args.cpp | 4 +- src/mongo/db/repl/read_concern_args.h | 16 +- src/mongo/db/repl/replication_coordinator_mock.cpp | 7 +- src/mongo/db/s/SConscript | 2 + src/mongo/db/s/resharding/donor_oplog_id.idl | 1 + .../db/s/resharding/resharding_oplog_fetcher.cpp | 110 +++++++ .../db/s/resharding/resharding_oplog_fetcher.h | 68 +++++ src/mongo/db/s/resharding_util.cpp | 2 +- src/mongo/db/s/resharding_util.h | 1 + src/mongo/dbtests/SConscript | 2 + src/mongo/dbtests/resharding_tests.cpp | 323 +++++++++++++++++++++ src/mongo/rpc/message.cpp | 28 ++ src/mongo/rpc/message.h | 2 + 17 files changed, 603 insertions(+), 5 deletions(-) create mode 100644 src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp create mode 100644 src/mongo/db/s/resharding/resharding_oplog_fetcher.h create mode 100644 src/mongo/dbtests/resharding_tests.cpp diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript index fbf77caa24c..3b6603f0a1c 100644 --- a/src/mongo/client/SConscript +++ b/src/mongo/client/SConscript @@ -186,6 +186,7 @@ clientDriverEnv.Library( ], LIBDEPS=[ '$BUILD_DIR/mongo/db/dbmessage', + '$BUILD_DIR/mongo/db/pipeline/aggregation_request', '$BUILD_DIR/mongo/db/query/command_request_response', '$BUILD_DIR/mongo/db/query/query_request', '$BUILD_DIR/mongo/db/wire_version', diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp index 7e83de38bc0..259d032da62 100644 --- a/src/mongo/client/dbclient_cursor.cpp +++ b/src/mongo/client/dbclient_cursor.cpp @@ -591,6 +591,34 @@ DBClientCursor::DBClientCursor(DBClientBase* client, } } +/* static */ +StatusWith> 
DBClientCursor::fromAggregationRequest( + DBClientBase* client, AggregationRequest aggRequest, bool secondaryOk, bool useExhaust) { + BSONObj ret; + try { + if (!client->runCommand(aggRequest.getNamespaceString().db().toString(), + aggRequest.serializeToCommandObj().toBson(), + ret, + secondaryOk ? QueryOption_SlaveOk : 0)) { + return {ErrorCodes::CommandFailed, ret.toString()}; + } + } catch (...) { + return exceptionToStatus(); + } + long long cursorId = ret["cursor"].Obj()["id"].Long(); + std::vector firstBatch; + for (BSONElement elem : ret["cursor"].Obj()["firstBatch"].Array()) { + firstBatch.emplace_back(elem.Obj().getOwned()); + } + + return {std::make_unique(client, + aggRequest.getNamespaceString(), + cursorId, + 0, + useExhaust ? QueryOption_Exhaust : 0, + firstBatch)}; +} + DBClientCursor::~DBClientCursor() { kill(); } diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h index cbc73341f88..29ecd2ea5a7 100644 --- a/src/mongo/client/dbclient_cursor.h +++ b/src/mongo/client/dbclient_cursor.h @@ -35,6 +35,7 @@ #include "mongo/db/jsobj.h" #include "mongo/db/json.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/pipeline/aggregation_request.h" #include "mongo/rpc/message.h" namespace mongo { @@ -162,6 +163,9 @@ public: int options, std::vector initialBatch = {}); + static StatusWith> fromAggregationRequest( + DBClientBase* client, AggregationRequest aggRequest, bool secondaryOk, bool useExhaust); + virtual ~DBClientCursor(); long long getCursorId() const { diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index c79267245db..6ce03f511de 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -117,6 +117,7 @@ namespace { using namespace fmt::literals; +MONGO_FAIL_POINT_DEFINE(addDestinedRecipient); MONGO_FAIL_POINT_DEFINE(sleepBetweenInsertOpTimeGenerationAndLogOp); // Failpoint to block after a write and its oplog entry have been written to the storage engine and @@ -285,6 
+286,10 @@ void _logOpsInner(OperationContext* opCtx, } OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) { + addDestinedRecipient.execute([&](const BSONObj& data) { + auto recipient = data["destinedRecipient"].String(); + oplogEntry->setDestinedRecipient(boost::make_optional({recipient})); + }); // All collections should have UUIDs now, so all insert, update, and delete oplog entries should // also have uuids. Some no-op (n) and command (c) entries may still elide the uuid field. invariant(oplogEntry->getUuid() || oplogEntry->getOpType() == OpTypeEnum::kNoop || @@ -394,6 +399,10 @@ std::vector logInsertOps(OperationContext* opCtx, oplogEntry.setObject(begin[i].doc); oplogEntry.setOpTime(insertStatementOplogSlot); oplogEntry.setDestinedRecipient(getDestinedRecipient(opCtx, nss, begin[i].doc)); + addDestinedRecipient.execute([&](const BSONObj& data) { + auto recipient = data["destinedRecipient"].String(); + oplogEntry.setDestinedRecipient(boost::make_optional({recipient})); + }); OplogLink oplogLink; if (i > 0) diff --git a/src/mongo/db/repl/read_concern_args.cpp b/src/mongo/db/repl/read_concern_args.cpp index d1067a0ff2e..27c949b1d63 100644 --- a/src/mongo/db/repl/read_concern_args.cpp +++ b/src/mongo/db/repl/read_concern_args.cpp @@ -71,9 +71,9 @@ ReadConcernArgs::ReadConcernArgs(boost::optional opTime, boost::optional level) : _opTime(std::move(opTime)), _level(std::move(level)), _specified(_opTime || _level) {} -ReadConcernArgs::ReadConcernArgs(boost::optional clusterTime, +ReadConcernArgs::ReadConcernArgs(boost::optional afterClusterTime, boost::optional level) - : _afterClusterTime(std::move(clusterTime)), + : _afterClusterTime(std::move(afterClusterTime)), _level(std::move(level)), _specified(_afterClusterTime || _level) {} diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h index ba6004d13bf..5c8b0d101fd 100644 --- a/src/mongo/db/repl/read_concern_args.h +++ b/src/mongo/db/repl/read_concern_args.h @@ 
-80,7 +80,7 @@ public: ReadConcernArgs(boost::optional opTime, boost::optional level); - ReadConcernArgs(boost::optional clusterTime, + ReadConcernArgs(boost::optional afterClusterTime, boost::optional level); /** * Format: @@ -167,7 +167,21 @@ public: boost::optional getArgsAfterClusterTime() const; boost::optional getArgsAtClusterTime() const; + + /** + * Returns a BSON object of the form: + * + * { readConcern: { level: "...", + * afterClusterTime: Timestamp(...) } } + */ BSONObj toBSON() const; + + /** + * Returns a BSON object of the form: + * + * { level: "...", + * afterClusterTime: Timestamp(...) } + */ BSONObj toBSONInner() const; std::string toString() const; diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp index ced4341556c..e1214838634 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_mock.cpp @@ -650,7 +650,12 @@ std::shared_ptr ReplicationCoordinatorMock::awaitIsMaste response->setReplSetVersion(config.getConfigVersion()); response->setIsMaster(true); response->setIsSecondary(false); - response->setMe(config.getMemberAt(0).getHostAndPort()); + if (config.getNumMembers() > 0) { + response->setMe(config.getMemberAt(0).getHostAndPort()); + } else { + response->setMe(HostAndPort::parseThrowing("localhost:27017")); + } + response->setElectionId(OID::gen()); response->setTopologyVersion(TopologyVersion(repl::instanceId, 0)); return response; diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index c8ae8134680..76740dcbd9e 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -72,6 +72,7 @@ env.Library( 'resharding/resharding_coordinator_service.cpp', 'resharding/resharding_donor_oplog_iterator.cpp', 'resharding/resharding_donor_service.cpp', + 'resharding/resharding_oplog_fetcher.cpp', 'resharding/resharding_recipient_service.cpp', 'scoped_operation_completion_sharding_actions.cpp', 
'session_catalog_migration_destination.cpp', @@ -122,6 +123,7 @@ env.Library( 'transaction_coordinator', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/client/clientdriver_minimal', '$BUILD_DIR/mongo/db/index_builds_coordinator_interface', '$BUILD_DIR/mongo/db/rs_local_client', '$BUILD_DIR/mongo/db/session_catalog', diff --git a/src/mongo/db/s/resharding/donor_oplog_id.idl b/src/mongo/db/s/resharding/donor_oplog_id.idl index 6393e57aa32..6ab3fbc2913 100644 --- a/src/mongo/db/s/resharding/donor_oplog_id.idl +++ b/src/mongo/db/s/resharding/donor_oplog_id.idl @@ -38,6 +38,7 @@ structs: ReshardingDonorOplogId: description: >- Represents the set of timestamps that belong to an operation from the donor shard. + generate_comparison_operators: true fields: clusterTime: type: timestamp diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp new file mode 100644 index 00000000000..3e10a4ce030 --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp @@ -0,0 +1,110 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding + +#include "mongo/platform/basic.h" + +#include + +#include "mongo/db/s/resharding/resharding_oplog_fetcher.h" + +#include "mongo/bson/bsonobj.h" +#include "mongo/db/catalog_raii.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/repl/read_concern_level.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/db/storage/write_unit_of_work.h" +#include "mongo/logv2/log.h" + +namespace mongo { +ReshardingDonorOplogId ReshardingOplogFetcher::iterate( + OperationContext* opCtx, + DBClientBase* conn, + boost::intrusive_ptr expCtx, + ReshardingDonorOplogId startAfter, + UUID collUUID, + const ShardId& recipientShard, + bool doesDonorOwnMinKeyChunk, + NamespaceString toWriteToNss) { + + // This method will use the input opCtx to perform writes into `toWriteToNss`. + invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + + // Create the destination collection if necessary. 
+ writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", toWriteToNss.toString(), [&] { + const CollectionPtr toWriteTo = + CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, toWriteToNss); + if (toWriteTo) { + return; + } + + WriteUnitOfWork wuow(opCtx); + AutoGetOrCreateDb db(opCtx, toWriteToNss.db(), LockMode::MODE_IX); + Lock::CollectionLock collLock(opCtx, toWriteToNss, MODE_IX); + db.getDb()->createCollection(opCtx, toWriteToNss); + wuow.commit(); + }); + + std::vector serializedPipeline = + createOplogFetchingPipelineForResharding( + expCtx, startAfter, collUUID, recipientShard, doesDonorOwnMinKeyChunk) + ->serializeToBson(); + + AggregationRequest aggRequest(NamespaceString::kRsOplogNamespace, serializedPipeline); + auto readConcernArgs = repl::ReadConcernArgs( + boost::optional(startAfter.getTs()), + boost::optional(repl::ReadConcernLevel::kMajorityReadConcern)); + aggRequest.setReadConcern(readConcernArgs.toBSONInner()); + aggRequest.setHint(BSON("$natural" << 1)); + + const bool secondaryOk = true; + const bool useExhaust = true; + std::unique_ptr cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest( + conn, std::move(aggRequest), secondaryOk, useExhaust)); + + // Noting some possible optimizations: + // + // * Batch more inserts into larger storage transactions. + // * Parallelize writing documents across multiple threads. + // * Doing either of the above while still using the underlying message buffer of bson objects. 
+ AutoGetCollection toWriteTo(opCtx, toWriteToNss, LockMode::MODE_IX); + ReshardingDonorOplogId lastSeen = startAfter; + while (cursor->more()) { + WriteUnitOfWork wuow(opCtx); + BSONObj obj = cursor->next(); + lastSeen = ReshardingDonorOplogId::parse({"OplogFetcherParsing"}, obj["_id"].Obj()); + uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{obj}, nullptr)); + wuow.commit(); + } + + return lastSeen; +} +} // namespace mongo diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h new file mode 100644 index 00000000000..cd242aafe3d --- /dev/null +++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. 
If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ +#pragma once + +#include "mongo/base/status_with.h" +#include "mongo/client/dbclient_base.h" +#include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/pipeline/expression_context.h" +#include "mongo/db/s/resharding/donor_oplog_id_gen.h" +#include "mongo/s/shard_id.h" +#include "mongo/util/uuid.h" + +namespace mongo { +class ReshardingOplogFetcher { +public: + /** + * Issues an aggregation to `DBClientBase`s starting at `startAfter` and copies the entries + * relevant to `recipientShard` into `toWriteInto`. Control is returned when the aggregation + * cursor is exhausted. + * + * Returns an identifier for the last oplog-ish document written to `toWriteInto`. + * + * This method throws. + * + * TODO SERVER-51245 Replace `DBClientBase` with a `Shard`. Right now `Shard` does not do things + * like perform aggregate commands nor does it expose a cursor/stream interface. However, using + * a `Shard` object will provide critical behavior such as advancing logical clock values on a + * response and targeting a node to send the aggregation command to. 
+ */ + ReshardingDonorOplogId iterate(OperationContext* opCtx, + DBClientBase* conn, + boost::intrusive_ptr expCtx, + ReshardingDonorOplogId startAfter, + UUID collUUID, + const ShardId& recipientShard, + bool doesDonorOwnMinKeyChunk, + NamespaceString toWriteInto); + +private: +}; +} // namespace mongo diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp index 339f4f456d2..40e1df609de 100644 --- a/src/mongo/db/s/resharding_util.cpp +++ b/src/mongo/db/s/resharding_util.cpp @@ -369,7 +369,7 @@ std::unique_ptr createConfigTxnCloningPipelineForResh void createSlimOplogView(OperationContext* opCtx, Database* db) { writeConflictRetry( - opCtx, "createReshardingOplog", "local.system.resharding.slimOplogForGraphLookup", [&] { + opCtx, "createReshardingSlimOplog", "local.system.resharding.slimOplogForGraphLookup", [&] { { // Create 'system.views' in a separate WUOW if it does not exist. WriteUnitOfWork wuow(opCtx); diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h index 247dfba84a9..af5001e34dd 100644 --- a/src/mongo/db/s/resharding_util.h +++ b/src/mongo/db/s/resharding_util.h @@ -180,6 +180,7 @@ std::unique_ptr createOplogFetchingPipelineForReshard boost::optional getDestinedRecipient(OperationContext* opCtx, const NamespaceString& sourceNss, const BSONObj& fullDocument); + /** * Creates pipeline for filtering collection data matching the recipient shard. 
*/ diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript index d96d9c9fa66..d670d6e74d0 100644 --- a/src/mongo/dbtests/SConscript +++ b/src/mongo/dbtests/SConscript @@ -123,6 +123,7 @@ if not has_option('noshell') and usemozjs: 'querytests.cpp', 'replica_set_tests.cpp', 'repltests.cpp', + 'resharding_tests.cpp', 'rollbacktests.cpp', 'scanning_replica_set_monitor_test.cpp', 'socktests.cpp', @@ -166,6 +167,7 @@ if not has_option('noshell') and usemozjs: "$BUILD_DIR/mongo/db/repl/serveronly_repl", "$BUILD_DIR/mongo/db/repl/storage_interface_impl", "$BUILD_DIR/mongo/db/repl/timestamp_block", + "$BUILD_DIR/mongo/db/s/resharding_util", "$BUILD_DIR/mongo/db/server_options_core", "$BUILD_DIR/mongo/db/sessions_collection_standalone", "$BUILD_DIR/mongo/db/storage/durable_catalog_impl", diff --git a/src/mongo/dbtests/resharding_tests.cpp b/src/mongo/dbtests/resharding_tests.cpp new file mode 100644 index 00000000000..8b29040edc1 --- /dev/null +++ b/src/mongo/dbtests/resharding_tests.cpp @@ -0,0 +1,323 @@ +/** + * Copyright (C) 2020-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest + +#include "mongo/platform/basic.h" + +#include + +#include "mongo/db/client.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/dbhelpers.h" +#include "mongo/db/global_settings.h" +#include "mongo/db/op_observer_impl.h" +#include "mongo/db/op_observer_registry.h" +#include "mongo/db/pipeline/expression_context_for_test.h" +#include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/repl/replication_consistency_markers_mock.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/replication_process.h" +#include "mongo/db/repl/replication_recovery_mock.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/s/resharding/resharding_oplog_fetcher.h" +#include "mongo/db/s/resharding_util.h" +#include "mongo/db/service_context.h" +#include "mongo/db/storage/snapshot_manager.h" +#include "mongo/db/storage/storage_engine.h" +#include "mongo/db/storage/write_unit_of_work.h" +#include "mongo/db/vector_clock_mutable.h" +#include "mongo/dbtests/dbtests.h" +#include "mongo/logv2/log.h" +#include "mongo/unittest/unittest.h" + +namespace mongo { +namespace { +/** + * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs. 
+ */ +class OneOffRead { +public: + OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) { + _opCtx->recoveryUnit()->abandonSnapshot(); + if (ts.isNull()) { + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + } else { + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts); + } + } + + ~OneOffRead() { + _opCtx->recoveryUnit()->abandonSnapshot(); + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + } + +private: + OperationContext* _opCtx; +}; + +/** + * Observed problems using ShardingMongodTestFixture: + * + * - Does not mix with dbtest. Both will initialize a ServiceContext. + * - By default uses ephemeralForTest. These tests require a storage engine that supports majority + * reads. + * - When run as a unittest (and using WT), the fixture initializes the storage engine for each test + * that is run. WT specifically installs a ServerStatusSection. The server status code asserts + * that a section is never added after a `serverStatus` command is run. Tests defined in + * `migration_manager_test` (part of the `db_s_config_server_test` unittest binary) call a + * serverStatus triggering this assertion. 
+ */ +class ReshardingTest : public unittest::Test { +public: + ServiceContext::UniqueOperationContext _opCtxRaii = cc().makeOperationContext(); + OperationContext* _opCtx = _opCtxRaii.get(); + ServiceContext* _svcCtx = _opCtx->getServiceContext(); + VectorClockMutable* _clock = VectorClockMutable::get(_opCtx); + + ReshardingTest() { + repl::ReplSettings replSettings; + replSettings.setOplogSizeBytes(100 * 1024 * 1024); + replSettings.setReplSetString("rs0"); + setGlobalReplSettings(replSettings); + + auto replCoordinatorMock = + std::make_unique(_svcCtx, replSettings); + replCoordinatorMock->alwaysAllowWrites(true); + repl::ReplicationCoordinator::set(_svcCtx, std::move(replCoordinatorMock)); + repl::StorageInterface::set(_svcCtx, std::make_unique()); + repl::ReplicationProcess::set( + _svcCtx, + std::make_unique( + repl::StorageInterface::get(_svcCtx), + std::make_unique(), + std::make_unique())); + + // Since the Client object persists across tests, even though the global + // ReplicationCoordinator does not, we need to clear the last op associated with the client + // to avoid the invariant in ReplClientInfo::setLastOp that the optime only goes forward. + repl::ReplClientInfo::forClient(_opCtx->getClient()).clearLastOp_forTest(); + + auto opObsRegistry = std::make_unique(); + opObsRegistry->addObserver(std::make_unique()); + _opCtx->getServiceContext()->setOpObserver(std::move(opObsRegistry)); + + repl::setOplogCollectionName(_svcCtx); + repl::createOplog(_opCtx); + + _clock->tickClusterTimeTo(LogicalTime(Timestamp(1, 0))); + } + + ~ReshardingTest() { + try { + reset(NamespaceString("local.oplog.rs")); + } catch (...) { + FAIL("Exception while cleaning up test"); + } + } + + + /** + * Walking on ice: resetting the ReplicationCoordinator destroys the underlying + * `DropPendingCollectionReaper`. Use a truncate/dropAllIndexes to clean out a collection + * without actually dropping it. 
+ */ + void reset(NamespaceString nss) const { + ::mongo::writeConflictRetry(_opCtx, "deleteAll", nss.ns(), [&] { + _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); + AutoGetCollection collRaii(_opCtx, nss, LockMode::MODE_X); + + if (collRaii) { + WriteUnitOfWork wunit(_opCtx); + invariant(collRaii.getWritableCollection()->truncate(_opCtx).isOK()); + if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) { + ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1))); + } + collRaii.getWritableCollection()->getIndexCatalog()->dropAllIndexes(_opCtx, false); + wunit.commit(); + return; + } + + AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X); + WriteUnitOfWork wunit(_opCtx); + if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) { + ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1))); + } + invariant(dbRaii.getDb()->createCollection(_opCtx, nss)); + wunit.commit(); + }); + } + + void insertDocument(const CollectionPtr& coll, const InsertStatement& stmt) { + // Insert some documents. 
+ OpDebug* const nullOpDebug = nullptr; + const bool fromMigrate = false; + ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, fromMigrate)); + } + + BSONObj queryCollection(NamespaceString nss, const BSONObj& query) { + BSONObj ret; + ASSERT_TRUE(Helpers::findOne( + _opCtx, AutoGetCollectionForRead(_opCtx, nss).getCollection(), query, ret)) + << "Query: " << query; + return ret; + } + + BSONObj queryOplog(const BSONObj& query) { + OneOffRead oor(_opCtx, Timestamp::min()); + return queryCollection(NamespaceString::kRsOplogNamespace, query); + } + + repl::OpTime getLastApplied() { + return repl::ReplicationCoordinator::get(_opCtx)->getMyLastAppliedOpTime(); + } + + boost::intrusive_ptr createExpressionContext() { + NamespaceString slimNss = + NamespaceString("local.system.resharding.slimOplogForGraphLookup"); + + boost::intrusive_ptr expCtx( + new ExpressionContextForTest(_opCtx, NamespaceString::kRsOplogNamespace)); + expCtx->setResolvedNamespace(NamespaceString::kRsOplogNamespace, + {NamespaceString::kRsOplogNamespace, {}}); + expCtx->setResolvedNamespace(slimNss, + {slimNss, std::vector{getSlimOplogPipeline()}}); + return expCtx; + } + + int itcount(NamespaceString nss) { + OneOffRead oof(_opCtx, Timestamp::min()); + AutoGetCollectionForRead autoColl(_opCtx, nss); + auto cursor = autoColl.getCollection()->getCursor(_opCtx); + + int ret = 0; + while (auto rec = cursor->next()) { + ++ret; + } + + return ret; + } +}; + +TEST_F(ReshardingTest, RunFetchIteration) { + const NamespaceString outputCollectionNss("dbtests.outputCollection"); + reset(outputCollectionNss); + const NamespaceString dataCollectionNss("dbtests.runFetchIteration"); + reset(dataCollectionNss); + + AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX); + + // Set a failpoint to tack a `destinedRecipient` onto oplog entries. 
+ setGlobalFailPoint("addDestinedRecipient", + BSON("mode" + << "alwaysOn" + << "data" + << BSON("destinedRecipient" + << "shard1"))); + + // Insert five documents. Advance the majority point. Insert five more documents. + const std::int32_t docsToInsert = 5; + { + WriteUnitOfWork wuow(_opCtx); + for (std::int32_t num = 0; num < docsToInsert; ++num) { + insertDocument(dataColl.getCollection(), + InsertStatement(BSON("_id" << num << "a" << num))); + } + wuow.commit(); + } + + repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); + const Timestamp firstFiveLastApplied = getLastApplied().getTimestamp(); + _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot(firstFiveLastApplied); + { + WriteUnitOfWork wuow(_opCtx); + for (std::int32_t num = docsToInsert; num < 2 * docsToInsert; ++num) { + insertDocument(dataColl.getCollection(), + InsertStatement(BSON("_id" << num << "a" << num))); + } + wuow.commit(); + } + + // Disable the failpoint. + setGlobalFailPoint("addDestinedRecipient", + BSON("mode" + << "off")); + + repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx); + const Timestamp latestLastApplied = getLastApplied().getTimestamp(); + + BSONObj firstOplog = queryOplog(BSONObj()); + Timestamp firstTimestamp = firstOplog["ts"].timestamp(); + std::cout << "First oplog: " << firstOplog.toString() + << " Timestamp: " << firstTimestamp.toString() << std::endl; + + // The first call to `iterate` should return the first five inserts and return a + // `ReshardingDonorOplogId` matching the last applied of those five inserts. + ReshardingOplogFetcher fetcher; + DBDirectClient client(_opCtx); + StatusWith ret = fetcher.iterate(_opCtx, + &client, + createExpressionContext(), + {firstTimestamp, firstTimestamp}, + dataColl->uuid(), + {"shard1"}, + true, + outputCollectionNss); + ReshardingDonorOplogId donorOplodId = unittest::assertGet(ret); + // +1 because of the create collection oplog entry. 
+ ASSERT_EQ(docsToInsert + 1, itcount(outputCollectionNss)); + ASSERT_EQ(firstFiveLastApplied, donorOplodId.getClusterTime()); + ASSERT_EQ(firstFiveLastApplied, donorOplodId.getTs()); + + // Advance the committed snapshot. A second `iterate` should return the second batch of five + // inserts. + _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot( + getLastApplied().getTimestamp()); + + ret = fetcher.iterate(_opCtx, + &client, + createExpressionContext(), + {firstFiveLastApplied, firstFiveLastApplied}, + dataColl->uuid(), + {"shard1"}, + true, + outputCollectionNss); + + donorOplodId = unittest::assertGet(ret); + // Two batches of five inserts + 1 entry for the create collection oplog entry. + ASSERT_EQ((2 * docsToInsert) + 1, itcount(outputCollectionNss)); + ASSERT_EQ(latestLastApplied, donorOplodId.getClusterTime()); + ASSERT_EQ(latestLastApplied, donorOplodId.getTs()); +} +} // namespace +} // namespace mongo diff --git a/src/mongo/rpc/message.cpp b/src/mongo/rpc/message.cpp index e002b35ec37..ac4a3dac03b 100644 --- a/src/mongo/rpc/message.cpp +++ b/src/mongo/rpc/message.cpp @@ -31,7 +31,10 @@ #include "mongo/rpc/message.h" +#include + #include "mongo/platform/atomic_word.h" +#include "mongo/rpc/op_msg.h" namespace mongo { @@ -43,4 +46,29 @@ int32_t nextMessageId() { return NextMsgId.fetchAndAdd(1); } +std::string Message::opMsgDebugString() const { + MsgData::ConstView headerView = header(); + auto opMsgRequest = OpMsgRequest::parse(*this); + std::stringstream docSequences; + int idx = 0; + for (const auto& seq : opMsgRequest.sequences) { + docSequences << fmt::format("Sequence Idx: {} Sequence Name: {}", idx++, seq.name) + << std::endl; + for (const auto& obj : seq.objs) { + docSequences << fmt::format("\t{}", obj.toString()) << std::endl; + } + } + + return fmt::format( + "Length: {} RequestId: {} ResponseTo: {} OpCode: {} Flags: {} Body: {}\n" + "Sections: {}", + headerView.getLen(), + headerView.getId(), + 
headerView.getResponseToMsgId(), + headerView.getNetworkOp(), + OpMsg::flags(*this), + opMsgRequest.body.toString(), + docSequences.str()); +} + } // namespace mongo diff --git a/src/mongo/rpc/message.h b/src/mongo/rpc/message.h index 2acb34a632a..38ecf8b12e7 100644 --- a/src/mongo/rpc/message.h +++ b/src/mongo/rpc/message.h @@ -471,6 +471,8 @@ public: return _buf; } + std::string opMsgDebugString() const; + private: SharedBuffer _buf; }; -- cgit v1.2.1