summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Gottlieb <daniel.gottlieb@mongodb.com>2020-09-30 23:37:25 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-10-01 05:24:34 +0000
commitd286a563d4ad921331d7d3846181a5bb7192a31d (patch)
tree35229ff3b0dd0a89c6986e0221a805bdb7e5903c
parente66093f0a8ee3cd95dea9480028a6da814bb1854 (diff)
downloadmongo-d286a563d4ad921331d7d3846181a5bb7192a31d.tar.gz
SERVER-49893: Do oplog fetching.
-rw-r--r--src/mongo/client/SConscript1
-rw-r--r--src/mongo/client/dbclient_cursor.cpp28
-rw-r--r--src/mongo/client/dbclient_cursor.h4
-rw-r--r--src/mongo/db/repl/oplog.cpp9
-rw-r--r--src/mongo/db/repl/read_concern_args.cpp4
-rw-r--r--src/mongo/db/repl/read_concern_args.h16
-rw-r--r--src/mongo/db/repl/replication_coordinator_mock.cpp7
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/s/resharding/donor_oplog_id.idl1
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp110
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_fetcher.h68
-rw-r--r--src/mongo/db/s/resharding_util.cpp2
-rw-r--r--src/mongo/db/s/resharding_util.h1
-rw-r--r--src/mongo/dbtests/SConscript2
-rw-r--r--src/mongo/dbtests/resharding_tests.cpp323
-rw-r--r--src/mongo/rpc/message.cpp28
-rw-r--r--src/mongo/rpc/message.h2
17 files changed, 603 insertions, 5 deletions
diff --git a/src/mongo/client/SConscript b/src/mongo/client/SConscript
index fbf77caa24c..3b6603f0a1c 100644
--- a/src/mongo/client/SConscript
+++ b/src/mongo/client/SConscript
@@ -186,6 +186,7 @@ clientDriverEnv.Library(
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/dbmessage',
+ '$BUILD_DIR/mongo/db/pipeline/aggregation_request',
'$BUILD_DIR/mongo/db/query/command_request_response',
'$BUILD_DIR/mongo/db/query/query_request',
'$BUILD_DIR/mongo/db/wire_version',
diff --git a/src/mongo/client/dbclient_cursor.cpp b/src/mongo/client/dbclient_cursor.cpp
index 7e83de38bc0..259d032da62 100644
--- a/src/mongo/client/dbclient_cursor.cpp
+++ b/src/mongo/client/dbclient_cursor.cpp
@@ -591,6 +591,34 @@ DBClientCursor::DBClientCursor(DBClientBase* client,
}
}
+/* static */
+StatusWith<std::unique_ptr<DBClientCursor>> DBClientCursor::fromAggregationRequest(
+ DBClientBase* client, AggregationRequest aggRequest, bool secondaryOk, bool useExhaust) {
+ BSONObj ret;
+ try {
+ if (!client->runCommand(aggRequest.getNamespaceString().db().toString(),
+ aggRequest.serializeToCommandObj().toBson(),
+ ret,
+ secondaryOk ? QueryOption_SlaveOk : 0)) {
+ return {ErrorCodes::CommandFailed, ret.toString()};
+ }
+ } catch (...) {
+ return exceptionToStatus();
+ }
+ long long cursorId = ret["cursor"].Obj()["id"].Long();
+ std::vector<BSONObj> firstBatch;
+ for (BSONElement elem : ret["cursor"].Obj()["firstBatch"].Array()) {
+ firstBatch.emplace_back(elem.Obj().getOwned());
+ }
+
+ return {std::make_unique<DBClientCursor>(client,
+ aggRequest.getNamespaceString(),
+ cursorId,
+ 0,
+ useExhaust ? QueryOption_Exhaust : 0,
+ firstBatch)};
+}
+
DBClientCursor::~DBClientCursor() {
kill();
}
diff --git a/src/mongo/client/dbclient_cursor.h b/src/mongo/client/dbclient_cursor.h
index cbc73341f88..29ecd2ea5a7 100644
--- a/src/mongo/client/dbclient_cursor.h
+++ b/src/mongo/client/dbclient_cursor.h
@@ -35,6 +35,7 @@
#include "mongo/db/jsobj.h"
#include "mongo/db/json.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/pipeline/aggregation_request.h"
#include "mongo/rpc/message.h"
namespace mongo {
@@ -162,6 +163,9 @@ public:
int options,
std::vector<BSONObj> initialBatch = {});
+ static StatusWith<std::unique_ptr<DBClientCursor>> fromAggregationRequest(
+ DBClientBase* client, AggregationRequest aggRequest, bool secondaryOk, bool useExhaust);
+
virtual ~DBClientCursor();
long long getCursorId() const {
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index c79267245db..6ce03f511de 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -117,6 +117,7 @@ namespace {
using namespace fmt::literals;
+MONGO_FAIL_POINT_DEFINE(addDestinedRecipient);
MONGO_FAIL_POINT_DEFINE(sleepBetweenInsertOpTimeGenerationAndLogOp);
// Failpoint to block after a write and its oplog entry have been written to the storage engine and
@@ -285,6 +286,10 @@ void _logOpsInner(OperationContext* opCtx,
}
OpTime logOp(OperationContext* opCtx, MutableOplogEntry* oplogEntry) {
+ addDestinedRecipient.execute([&](const BSONObj& data) {
+ auto recipient = data["destinedRecipient"].String();
+ oplogEntry->setDestinedRecipient(boost::make_optional<ShardId>({recipient}));
+ });
// All collections should have UUIDs now, so all insert, update, and delete oplog entries should
// also have uuids. Some no-op (n) and command (c) entries may still elide the uuid field.
invariant(oplogEntry->getUuid() || oplogEntry->getOpType() == OpTypeEnum::kNoop ||
@@ -394,6 +399,10 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
oplogEntry.setObject(begin[i].doc);
oplogEntry.setOpTime(insertStatementOplogSlot);
oplogEntry.setDestinedRecipient(getDestinedRecipient(opCtx, nss, begin[i].doc));
+ addDestinedRecipient.execute([&](const BSONObj& data) {
+ auto recipient = data["destinedRecipient"].String();
+ oplogEntry.setDestinedRecipient(boost::make_optional<ShardId>({recipient}));
+ });
OplogLink oplogLink;
if (i > 0)
diff --git a/src/mongo/db/repl/read_concern_args.cpp b/src/mongo/db/repl/read_concern_args.cpp
index d1067a0ff2e..27c949b1d63 100644
--- a/src/mongo/db/repl/read_concern_args.cpp
+++ b/src/mongo/db/repl/read_concern_args.cpp
@@ -71,9 +71,9 @@ ReadConcernArgs::ReadConcernArgs(boost::optional<OpTime> opTime,
boost::optional<ReadConcernLevel> level)
: _opTime(std::move(opTime)), _level(std::move(level)), _specified(_opTime || _level) {}
-ReadConcernArgs::ReadConcernArgs(boost::optional<LogicalTime> clusterTime,
+ReadConcernArgs::ReadConcernArgs(boost::optional<LogicalTime> afterClusterTime,
boost::optional<ReadConcernLevel> level)
- : _afterClusterTime(std::move(clusterTime)),
+ : _afterClusterTime(std::move(afterClusterTime)),
_level(std::move(level)),
_specified(_afterClusterTime || _level) {}
diff --git a/src/mongo/db/repl/read_concern_args.h b/src/mongo/db/repl/read_concern_args.h
index ba6004d13bf..5c8b0d101fd 100644
--- a/src/mongo/db/repl/read_concern_args.h
+++ b/src/mongo/db/repl/read_concern_args.h
@@ -80,7 +80,7 @@ public:
ReadConcernArgs(boost::optional<OpTime> opTime, boost::optional<ReadConcernLevel> level);
- ReadConcernArgs(boost::optional<LogicalTime> clusterTime,
+ ReadConcernArgs(boost::optional<LogicalTime> afterClusterTime,
boost::optional<ReadConcernLevel> level);
/**
* Format:
@@ -167,7 +167,21 @@ public:
boost::optional<LogicalTime> getArgsAfterClusterTime() const;
boost::optional<LogicalTime> getArgsAtClusterTime() const;
+
+ /**
+ * Returns a BSON object of the form:
+ *
+ * { readConcern: { level: "...",
+ * afterClusterTime: Timestamp(...) } }
+ */
BSONObj toBSON() const;
+
+ /**
+ * Returns a BSON object of the form:
+ *
+ * { level: "...",
+ * afterClusterTime: Timestamp(...) }
+ */
BSONObj toBSONInner() const;
std::string toString() const;
diff --git a/src/mongo/db/repl/replication_coordinator_mock.cpp b/src/mongo/db/repl/replication_coordinator_mock.cpp
index ced4341556c..e1214838634 100644
--- a/src/mongo/db/repl/replication_coordinator_mock.cpp
+++ b/src/mongo/db/repl/replication_coordinator_mock.cpp
@@ -650,7 +650,12 @@ std::shared_ptr<const IsMasterResponse> ReplicationCoordinatorMock::awaitIsMaste
response->setReplSetVersion(config.getConfigVersion());
response->setIsMaster(true);
response->setIsSecondary(false);
- response->setMe(config.getMemberAt(0).getHostAndPort());
+ if (config.getNumMembers() > 0) {
+ response->setMe(config.getMemberAt(0).getHostAndPort());
+ } else {
+ response->setMe(HostAndPort::parseThrowing("localhost:27017"));
+ }
+
response->setElectionId(OID::gen());
response->setTopologyVersion(TopologyVersion(repl::instanceId, 0));
return response;
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index c8ae8134680..76740dcbd9e 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -72,6 +72,7 @@ env.Library(
'resharding/resharding_coordinator_service.cpp',
'resharding/resharding_donor_oplog_iterator.cpp',
'resharding/resharding_donor_service.cpp',
+ 'resharding/resharding_oplog_fetcher.cpp',
'resharding/resharding_recipient_service.cpp',
'scoped_operation_completion_sharding_actions.cpp',
'session_catalog_migration_destination.cpp',
@@ -122,6 +123,7 @@ env.Library(
'transaction_coordinator',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/client/clientdriver_minimal',
'$BUILD_DIR/mongo/db/index_builds_coordinator_interface',
'$BUILD_DIR/mongo/db/rs_local_client',
'$BUILD_DIR/mongo/db/session_catalog',
diff --git a/src/mongo/db/s/resharding/donor_oplog_id.idl b/src/mongo/db/s/resharding/donor_oplog_id.idl
index 6393e57aa32..6ab3fbc2913 100644
--- a/src/mongo/db/s/resharding/donor_oplog_id.idl
+++ b/src/mongo/db/s/resharding/donor_oplog_id.idl
@@ -38,6 +38,7 @@ structs:
ReshardingDonorOplogId:
description: >-
Represents the set of timestamps that belong to an operation from the donor shard.
+ generate_comparison_operators: true
fields:
clusterTime:
type: timestamp
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
new file mode 100644
index 00000000000..3e10a4ce030
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -0,0 +1,110 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kSharding
+
+#include "mongo/platform/basic.h"
+
+#include <vector>
+
+#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
+
+#include "mongo/bson/bsonobj.h"
+#include "mongo/db/catalog_raii.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/repl/read_concern_level.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/storage/write_unit_of_work.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+ReshardingDonorOplogId ReshardingOplogFetcher::iterate(
+ OperationContext* opCtx,
+ DBClientBase* conn,
+ boost::intrusive_ptr<ExpressionContext> expCtx,
+ ReshardingDonorOplogId startAfter,
+ UUID collUUID,
+ const ShardId& recipientShard,
+ bool doesDonorOwnMinKeyChunk,
+ NamespaceString toWriteToNss) {
+
+ // This method will use the input opCtx to perform writes into `toWriteToNss`.
+ invariant(!opCtx->lockState()->inAWriteUnitOfWork());
+
+ // Create the destination collection if necessary.
+ writeConflictRetry(opCtx, "createReshardingLocalOplogBuffer", toWriteToNss.toString(), [&] {
+ const CollectionPtr toWriteTo =
+ CollectionCatalog::get(opCtx).lookupCollectionByNamespace(opCtx, toWriteToNss);
+ if (toWriteTo) {
+ return;
+ }
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetOrCreateDb db(opCtx, toWriteToNss.db(), LockMode::MODE_IX);
+ Lock::CollectionLock collLock(opCtx, toWriteToNss, MODE_IX);
+ db.getDb()->createCollection(opCtx, toWriteToNss);
+ wuow.commit();
+ });
+
+ std::vector<BSONObj> serializedPipeline =
+ createOplogFetchingPipelineForResharding(
+ expCtx, startAfter, collUUID, recipientShard, doesDonorOwnMinKeyChunk)
+ ->serializeToBson();
+
+ AggregationRequest aggRequest(NamespaceString::kRsOplogNamespace, serializedPipeline);
+ auto readConcernArgs = repl::ReadConcernArgs(
+ boost::optional<LogicalTime>(startAfter.getTs()),
+ boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern));
+ aggRequest.setReadConcern(readConcernArgs.toBSONInner());
+ aggRequest.setHint(BSON("$natural" << 1));
+
+ const bool secondaryOk = true;
+ const bool useExhaust = true;
+ std::unique_ptr<DBClientCursor> cursor = uassertStatusOK(DBClientCursor::fromAggregationRequest(
+ conn, std::move(aggRequest), secondaryOk, useExhaust));
+
+ // Noting some possible optimizations:
+ //
+ // * Batch more inserts into larger storage transactions.
+ // * Parallelize writing documents across multiple threads.
+ // * Doing either of the above while still using the underlying message buffer of bson objects.
+ AutoGetCollection toWriteTo(opCtx, toWriteToNss, LockMode::MODE_IX);
+ ReshardingDonorOplogId lastSeen = startAfter;
+ while (cursor->more()) {
+ WriteUnitOfWork wuow(opCtx);
+ BSONObj obj = cursor->next();
+ lastSeen = ReshardingDonorOplogId::parse({"OplogFetcherParsing"}, obj["_id"].Obj());
+ uassertStatusOK(toWriteTo->insertDocument(opCtx, InsertStatement{obj}, nullptr));
+ wuow.commit();
+ }
+
+ return lastSeen;
+}
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
new file mode 100644
index 00000000000..cd242aafe3d
--- /dev/null
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
@@ -0,0 +1,68 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+
+#include "mongo/base/status_with.h"
+#include "mongo/client/dbclient_base.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/pipeline/expression_context.h"
+#include "mongo/db/s/resharding/donor_oplog_id_gen.h"
+#include "mongo/s/shard_id.h"
+#include "mongo/util/uuid.h"
+
+namespace mongo {
+class ReshardingOplogFetcher {
+public:
+ /**
+ * Issues an aggregation to `DBClientBase`s starting at `startAfter` and copies the entries
+ * relevant to `recipientShard` into `toWriteInto`. Control is returned when the aggregation
+ * cursor is exhausted.
+ *
+ * Returns an identifier for the last oplog-ish document written to `toWriteInto`.
+ *
+ * This method throws.
+ *
+ * TODO SERVER-51245 Replace `DBClientBase` with a `Shard`. Right now `Shard` does not do things
+ * like perform aggregate commands nor does it expose a cursor/stream interface. However, using
+ * a `Shard` object will provide critical behavior such as advancing logical clock values on a
+ * response and targeting a node to send the aggregation command to.
+ */
+ ReshardingDonorOplogId iterate(OperationContext* opCtx,
+ DBClientBase* conn,
+ boost::intrusive_ptr<ExpressionContext> expCtx,
+ ReshardingDonorOplogId startAfter,
+ UUID collUUID,
+ const ShardId& recipientShard,
+ bool doesDonorOwnMinKeyChunk,
+ NamespaceString toWriteInto);
+
+private:
+};
+} // namespace mongo
diff --git a/src/mongo/db/s/resharding_util.cpp b/src/mongo/db/s/resharding_util.cpp
index 339f4f456d2..40e1df609de 100644
--- a/src/mongo/db/s/resharding_util.cpp
+++ b/src/mongo/db/s/resharding_util.cpp
@@ -369,7 +369,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> createConfigTxnCloningPipelineForResh
void createSlimOplogView(OperationContext* opCtx, Database* db) {
writeConflictRetry(
- opCtx, "createReshardingOplog", "local.system.resharding.slimOplogForGraphLookup", [&] {
+ opCtx, "createReshardingSlimOplog", "local.system.resharding.slimOplogForGraphLookup", [&] {
{
// Create 'system.views' in a separate WUOW if it does not exist.
WriteUnitOfWork wuow(opCtx);
diff --git a/src/mongo/db/s/resharding_util.h b/src/mongo/db/s/resharding_util.h
index 247dfba84a9..af5001e34dd 100644
--- a/src/mongo/db/s/resharding_util.h
+++ b/src/mongo/db/s/resharding_util.h
@@ -180,6 +180,7 @@ std::unique_ptr<Pipeline, PipelineDeleter> createOplogFetchingPipelineForReshard
boost::optional<ShardId> getDestinedRecipient(OperationContext* opCtx,
const NamespaceString& sourceNss,
const BSONObj& fullDocument);
+
/**
* Creates pipeline for filtering collection data matching the recipient shard.
*/
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index d96d9c9fa66..d670d6e74d0 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -123,6 +123,7 @@ if not has_option('noshell') and usemozjs:
'querytests.cpp',
'replica_set_tests.cpp',
'repltests.cpp',
+ 'resharding_tests.cpp',
'rollbacktests.cpp',
'scanning_replica_set_monitor_test.cpp',
'socktests.cpp',
@@ -166,6 +167,7 @@ if not has_option('noshell') and usemozjs:
"$BUILD_DIR/mongo/db/repl/serveronly_repl",
"$BUILD_DIR/mongo/db/repl/storage_interface_impl",
"$BUILD_DIR/mongo/db/repl/timestamp_block",
+ "$BUILD_DIR/mongo/db/s/resharding_util",
"$BUILD_DIR/mongo/db/server_options_core",
"$BUILD_DIR/mongo/db/sessions_collection_standalone",
"$BUILD_DIR/mongo/db/storage/durable_catalog_impl",
diff --git a/src/mongo/dbtests/resharding_tests.cpp b/src/mongo/dbtests/resharding_tests.cpp
new file mode 100644
index 00000000000..8b29040edc1
--- /dev/null
+++ b/src/mongo/dbtests/resharding_tests.cpp
@@ -0,0 +1,323 @@
+/**
+ * Copyright (C) 2020-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTest
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+
+#include "mongo/db/client.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/dbhelpers.h"
+#include "mongo/db/global_settings.h"
+#include "mongo/db/op_observer_impl.h"
+#include "mongo/db/op_observer_registry.h"
+#include "mongo/db/pipeline/expression_context_for_test.h"
+#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/repl/replication_consistency_markers_mock.h"
+#include "mongo/db/repl/replication_coordinator.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/replication_process.h"
+#include "mongo/db/repl/replication_recovery_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/s/resharding/resharding_oplog_fetcher.h"
+#include "mongo/db/s/resharding_util.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/storage/snapshot_manager.h"
+#include "mongo/db/storage/storage_engine.h"
+#include "mongo/db/storage/write_unit_of_work.h"
+#include "mongo/db/vector_clock_mutable.h"
+#include "mongo/dbtests/dbtests.h"
+#include "mongo/logv2/log.h"
+#include "mongo/unittest/unittest.h"
+
+namespace mongo {
+namespace {
+/**
+ * RAII type for operating at a timestamp. Will remove any timestamping when the object destructs.
+ */
+class OneOffRead {
+public:
+ OneOffRead(OperationContext* opCtx, const Timestamp& ts) : _opCtx(opCtx) {
+ _opCtx->recoveryUnit()->abandonSnapshot();
+ if (ts.isNull()) {
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+ } else {
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kProvided, ts);
+ }
+ }
+
+ ~OneOffRead() {
+ _opCtx->recoveryUnit()->abandonSnapshot();
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+ }
+
+private:
+ OperationContext* _opCtx;
+};
+
+/**
+ * Observed problems using ShardingMongodTestFixture:
+ *
+ * - Does not mix with dbtest. Both will initialize a ServiceContext.
+ * - By default uses ephemeralForTest. These tests require a storage engine that supports majority
+ * reads.
+ * - When run as a unittest (and using WT), the fixture initializes the storage engine for each test
+ * that is run. WT specifically installs a ServerStatusSection. The server status code asserts
+ * that a section is never added after a `serverStatus` command is run. Tests defined in
+ * `migration_manager_test` (part of the `db_s_config_server_test` unittest binary) call a
+ * serverStatus triggering this assertion.
+ */
+class ReshardingTest : public unittest::Test {
+public:
+ ServiceContext::UniqueOperationContext _opCtxRaii = cc().makeOperationContext();
+ OperationContext* _opCtx = _opCtxRaii.get();
+ ServiceContext* _svcCtx = _opCtx->getServiceContext();
+ VectorClockMutable* _clock = VectorClockMutable::get(_opCtx);
+
+ ReshardingTest() {
+ repl::ReplSettings replSettings;
+ replSettings.setOplogSizeBytes(100 * 1024 * 1024);
+ replSettings.setReplSetString("rs0");
+ setGlobalReplSettings(replSettings);
+
+ auto replCoordinatorMock =
+ std::make_unique<repl::ReplicationCoordinatorMock>(_svcCtx, replSettings);
+ replCoordinatorMock->alwaysAllowWrites(true);
+ repl::ReplicationCoordinator::set(_svcCtx, std::move(replCoordinatorMock));
+ repl::StorageInterface::set(_svcCtx, std::make_unique<repl::StorageInterfaceImpl>());
+ repl::ReplicationProcess::set(
+ _svcCtx,
+ std::make_unique<repl::ReplicationProcess>(
+ repl::StorageInterface::get(_svcCtx),
+ std::make_unique<repl::ReplicationConsistencyMarkersMock>(),
+ std::make_unique<repl::ReplicationRecoveryMock>()));
+
+ // Since the Client object persists across tests, even though the global
+ // ReplicationCoordinator does not, we need to clear the last op associated with the client
+ // to avoid the invariant in ReplClientInfo::setLastOp that the optime only goes forward.
+ repl::ReplClientInfo::forClient(_opCtx->getClient()).clearLastOp_forTest();
+
+ auto opObsRegistry = std::make_unique<OpObserverRegistry>();
+ opObsRegistry->addObserver(std::make_unique<OpObserverImpl>());
+ _opCtx->getServiceContext()->setOpObserver(std::move(opObsRegistry));
+
+ repl::setOplogCollectionName(_svcCtx);
+ repl::createOplog(_opCtx);
+
+ _clock->tickClusterTimeTo(LogicalTime(Timestamp(1, 0)));
+ }
+
+ ~ReshardingTest() {
+ try {
+ reset(NamespaceString("local.oplog.rs"));
+ } catch (...) {
+ FAIL("Exception while cleaning up test");
+ }
+ }
+
+
+ /**
+ * Walking on ice: resetting the ReplicationCoordinator destroys the underlying
+ * `DropPendingCollectionReaper`. Use a truncate/dropAllIndexes to clean out a collection
+ * without actually dropping it.
+ */
+ void reset(NamespaceString nss) const {
+ ::mongo::writeConflictRetry(_opCtx, "deleteAll", nss.ns(), [&] {
+ _opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
+ AutoGetCollection collRaii(_opCtx, nss, LockMode::MODE_X);
+
+ if (collRaii) {
+ WriteUnitOfWork wunit(_opCtx);
+ invariant(collRaii.getWritableCollection()->truncate(_opCtx).isOK());
+ if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
+ ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1)));
+ }
+ collRaii.getWritableCollection()->getIndexCatalog()->dropAllIndexes(_opCtx, false);
+ wunit.commit();
+ return;
+ }
+
+ AutoGetOrCreateDb dbRaii(_opCtx, nss.db(), LockMode::MODE_X);
+ WriteUnitOfWork wunit(_opCtx);
+ if (_opCtx->recoveryUnit()->getCommitTimestamp().isNull()) {
+ ASSERT_OK(_opCtx->recoveryUnit()->setTimestamp(Timestamp(1, 1)));
+ }
+ invariant(dbRaii.getDb()->createCollection(_opCtx, nss));
+ wunit.commit();
+ });
+ }
+
+ void insertDocument(const CollectionPtr& coll, const InsertStatement& stmt) {
+ // Insert some documents.
+ OpDebug* const nullOpDebug = nullptr;
+ const bool fromMigrate = false;
+ ASSERT_OK(coll->insertDocument(_opCtx, stmt, nullOpDebug, fromMigrate));
+ }
+
+ BSONObj queryCollection(NamespaceString nss, const BSONObj& query) {
+ BSONObj ret;
+ ASSERT_TRUE(Helpers::findOne(
+ _opCtx, AutoGetCollectionForRead(_opCtx, nss).getCollection(), query, ret))
+ << "Query: " << query;
+ return ret;
+ }
+
+ BSONObj queryOplog(const BSONObj& query) {
+ OneOffRead oor(_opCtx, Timestamp::min());
+ return queryCollection(NamespaceString::kRsOplogNamespace, query);
+ }
+
+ repl::OpTime getLastApplied() {
+ return repl::ReplicationCoordinator::get(_opCtx)->getMyLastAppliedOpTime();
+ }
+
+ boost::intrusive_ptr<ExpressionContextForTest> createExpressionContext() {
+ NamespaceString slimNss =
+ NamespaceString("local.system.resharding.slimOplogForGraphLookup");
+
+ boost::intrusive_ptr<ExpressionContextForTest> expCtx(
+ new ExpressionContextForTest(_opCtx, NamespaceString::kRsOplogNamespace));
+ expCtx->setResolvedNamespace(NamespaceString::kRsOplogNamespace,
+ {NamespaceString::kRsOplogNamespace, {}});
+ expCtx->setResolvedNamespace(slimNss,
+ {slimNss, std::vector<BSONObj>{getSlimOplogPipeline()}});
+ return expCtx;
+ }
+
+ int itcount(NamespaceString nss) {
+ OneOffRead oof(_opCtx, Timestamp::min());
+ AutoGetCollectionForRead autoColl(_opCtx, nss);
+ auto cursor = autoColl.getCollection()->getCursor(_opCtx);
+
+ int ret = 0;
+ while (auto rec = cursor->next()) {
+ ++ret;
+ }
+
+ return ret;
+ }
+};
+
+TEST_F(ReshardingTest, RunFetchIteration) {
+ const NamespaceString outputCollectionNss("dbtests.outputCollection");
+ reset(outputCollectionNss);
+ const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+ reset(dataCollectionNss);
+
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+
+ // Set a failpoint to tack a `destinedRecipient` onto oplog entries.
+ setGlobalFailPoint("addDestinedRecipient",
+ BSON("mode"
+ << "alwaysOn"
+ << "data"
+ << BSON("destinedRecipient"
+ << "shard1")));
+
+ // Insert five documents. Advance the majority point. Insert five more documents.
+ const std::int32_t docsToInsert = 5;
+ {
+ WriteUnitOfWork wuow(_opCtx);
+ for (std::int32_t num = 0; num < docsToInsert; ++num) {
+ insertDocument(dataColl.getCollection(),
+ InsertStatement(BSON("_id" << num << "a" << num)));
+ }
+ wuow.commit();
+ }
+
+ repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
+ const Timestamp firstFiveLastApplied = getLastApplied().getTimestamp();
+ _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot(firstFiveLastApplied);
+ {
+ WriteUnitOfWork wuow(_opCtx);
+ for (std::int32_t num = docsToInsert; num < 2 * docsToInsert; ++num) {
+ insertDocument(dataColl.getCollection(),
+ InsertStatement(BSON("_id" << num << "a" << num)));
+ }
+ wuow.commit();
+ }
+
+ // Disable the failpoint.
+ setGlobalFailPoint("addDestinedRecipient",
+ BSON("mode"
+ << "off"));
+
+ repl::StorageInterface::get(_opCtx)->waitForAllEarlierOplogWritesToBeVisible(_opCtx);
+ const Timestamp latestLastApplied = getLastApplied().getTimestamp();
+
+ BSONObj firstOplog = queryOplog(BSONObj());
+ Timestamp firstTimestamp = firstOplog["ts"].timestamp();
+ std::cout << "First oplog: " << firstOplog.toString()
+ << " Timestamp: " << firstTimestamp.toString() << std::endl;
+
+ // The first call to `iterate` should return the first five inserts and return a
+ // `ReshardingDonorOplogId` matching the last applied of those five inserts.
+ ReshardingOplogFetcher fetcher;
+ DBDirectClient client(_opCtx);
+ StatusWith<ReshardingDonorOplogId> ret = fetcher.iterate(_opCtx,
+ &client,
+ createExpressionContext(),
+ {firstTimestamp, firstTimestamp},
+ dataColl->uuid(),
+ {"shard1"},
+ true,
+ outputCollectionNss);
+ ReshardingDonorOplogId donorOplodId = unittest::assertGet(ret);
+ // +1 because of the create collection oplog entry.
+ ASSERT_EQ(docsToInsert + 1, itcount(outputCollectionNss));
+ ASSERT_EQ(firstFiveLastApplied, donorOplodId.getClusterTime());
+ ASSERT_EQ(firstFiveLastApplied, donorOplodId.getTs());
+
+ // Advance the committed snapshot. A second `iterate` should return the second batch of five
+ // inserts.
+ _svcCtx->getStorageEngine()->getSnapshotManager()->setCommittedSnapshot(
+ getLastApplied().getTimestamp());
+
+ ret = fetcher.iterate(_opCtx,
+ &client,
+ createExpressionContext(),
+ {firstFiveLastApplied, firstFiveLastApplied},
+ dataColl->uuid(),
+ {"shard1"},
+ true,
+ outputCollectionNss);
+
+ donorOplodId = unittest::assertGet(ret);
+ // Two batches of five inserts + 1 entry for the create collection oplog entry.
+ ASSERT_EQ((2 * docsToInsert) + 1, itcount(outputCollectionNss));
+ ASSERT_EQ(latestLastApplied, donorOplodId.getClusterTime());
+ ASSERT_EQ(latestLastApplied, donorOplodId.getTs());
+}
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/rpc/message.cpp b/src/mongo/rpc/message.cpp
index e002b35ec37..ac4a3dac03b 100644
--- a/src/mongo/rpc/message.cpp
+++ b/src/mongo/rpc/message.cpp
@@ -31,7 +31,10 @@
#include "mongo/rpc/message.h"
+#include <fmt/format.h>
+
#include "mongo/platform/atomic_word.h"
+#include "mongo/rpc/op_msg.h"
namespace mongo {
@@ -43,4 +46,29 @@ int32_t nextMessageId() {
return NextMsgId.fetchAndAdd(1);
}
+std::string Message::opMsgDebugString() const {
+ MsgData::ConstView headerView = header();
+ auto opMsgRequest = OpMsgRequest::parse(*this);
+ std::stringstream docSequences;
+ int idx = 0;
+ for (const auto& seq : opMsgRequest.sequences) {
+ docSequences << fmt::format("Sequence Idx: {} Sequence Name: {}", idx++, seq.name)
+ << std::endl;
+ for (const auto& obj : seq.objs) {
+ docSequences << fmt::format("\t{}", obj.toString()) << std::endl;
+ }
+ }
+
+ return fmt::format(
+ "Length: {} RequestId: {} ResponseTo: {} OpCode: {} Flags: {} Body: {}\n"
+ "Sections: {}",
+ headerView.getLen(),
+ headerView.getId(),
+ headerView.getResponseToMsgId(),
+ headerView.getNetworkOp(),
+ OpMsg::flags(*this),
+ opMsgRequest.body.toString(),
+ docSequences.str());
+}
+
} // namespace mongo
diff --git a/src/mongo/rpc/message.h b/src/mongo/rpc/message.h
index 2acb34a632a..38ecf8b12e7 100644
--- a/src/mongo/rpc/message.h
+++ b/src/mongo/rpc/message.h
@@ -471,6 +471,8 @@ public:
return _buf;
}
+ std::string opMsgDebugString() const;
+
private:
SharedBuffer _buf;
};