summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- src/mongo/db/s/resharding/resharding_data_copy_util.cpp | 24
-rw-r--r-- src/mongo/db/s/resharding/resharding_data_copy_util.h | 7
-rw-r--r-- src/mongo/db/s/resharding/resharding_data_replication.cpp | 21
-rw-r--r-- src/mongo/db/s/resharding/resharding_data_replication.h | 1
-rw-r--r-- src/mongo/db/s/resharding/resharding_data_replication_test.cpp | 26
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp | 15
-rw-r--r-- src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp | 17
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp | 16
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_fetcher.h | 4
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp | 70
10 files changed, 165 insertions, 36 deletions
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
index 745e83a40f9..0efb2179306 100644
--- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
@@ -162,6 +162,21 @@ void ensureTemporaryReshardingCollectionRenamed(OperationContext* opCtx,
}
Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection) {
+ auto doc = findDocWithHighestInsertedId(opCtx, collection);
+ if (!doc) {
+ return Value{};
+ }
+
+ auto value = (*doc)["_id"];
+ uassert(4929300,
+ "Missing _id field for document in temporary resharding collection",
+ !value.missing());
+
+ return value;
+}
+
+boost::optional<Document> findDocWithHighestInsertedId(OperationContext* opCtx,
+ const CollectionPtr& collection) {
auto findCommand = std::make_unique<FindCommandRequest>(collection->ns());
findCommand->setLimit(1);
findCommand->setSort(BSON("_id" << -1));
@@ -169,16 +184,11 @@ Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collec
auto recordId =
Helpers::findOne(opCtx, collection, std::move(findCommand), true /* requireIndex */);
if (recordId.isNull()) {
- return Value{};
+ return boost::none;
}
auto doc = collection->docFor(opCtx, recordId).value();
- auto value = Value{doc["_id"]};
- uassert(4929300,
- "Missing _id field for document in temporary resharding collection",
- !value.missing());
-
- return value;
+ return Document{doc};
}
std::vector<InsertStatement> fillBatchForInsert(Pipeline& pipeline, int batchSizeLimitBytes) {
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.h b/src/mongo/db/s/resharding/resharding_data_copy_util.h
index 0165284e5a8..c34768dcea1 100644
--- a/src/mongo/db/s/resharding/resharding_data_copy_util.h
+++ b/src/mongo/db/s/resharding/resharding_data_copy_util.h
@@ -34,6 +34,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/db/catalog/collection_catalog.h"
+#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/exec/document_value/value.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/repl/oplog.h"
@@ -95,6 +96,12 @@ void ensureTemporaryReshardingCollectionRenamed(OperationContext* opCtx,
Value findHighestInsertedId(OperationContext* opCtx, const CollectionPtr& collection);
/**
+ * Returns the full document with the largest _id value, or boost::none if no document is found.
+ */
+boost::optional<Document> findDocWithHighestInsertedId(OperationContext* opCtx,
+ const CollectionPtr& collection);
+
+/**
* Returns a batch of documents suitable for being inserted with insertBatch().
*
* The batch of documents is returned once its size exceeds batchSizeLimitBytes or the pipeline has
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.cpp b/src/mongo/db/s/resharding/resharding_data_replication.cpp
index bec1e02dec5..8ad0d2a4d3e 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_replication.cpp
@@ -122,7 +122,8 @@ std::vector<std::unique_ptr<ReshardingOplogFetcher>> ReshardingDataReplication::
auto oplogBufferNss =
getLocalOplogBufferNamespace(metadata.getSourceUUID(), donor.getShardId());
auto minFetchTimestamp = *donor.getMinFetchTimestamp();
- auto idToResumeFrom = getOplogFetcherResumeId(opCtx, oplogBufferNss, minFetchTimestamp);
+ auto idToResumeFrom = getOplogFetcherResumeId(
+ opCtx, metadata.getReshardingUUID(), oplogBufferNss, minFetchTimestamp);
invariant((idToResumeFrom >= ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}));
oplogFetchers.emplace_back(std::make_unique<ReshardingOplogFetcher>(
@@ -433,15 +434,25 @@ std::vector<NamespaceString> ReshardingDataReplication::ensureStashCollectionsEx
}
ReshardingDonorOplogId ReshardingDataReplication::getOplogFetcherResumeId(
- OperationContext* opCtx, const NamespaceString& oplogBufferNss, Timestamp minFetchTimestamp) {
+ OperationContext* opCtx,
+ const UUID& reshardingUUID,
+ const NamespaceString& oplogBufferNss,
+ Timestamp minFetchTimestamp) {
invariant(!opCtx->lockState()->isLocked());
AutoGetCollection coll(opCtx, oplogBufferNss, MODE_IS);
if (coll) {
- auto highestOplogBufferId = resharding::data_copy::findHighestInsertedId(opCtx, *coll);
- if (!highestOplogBufferId.missing()) {
+ auto highestOplogBufferId =
+ resharding::data_copy::findDocWithHighestInsertedId(opCtx, *coll);
+
+ if (highestOplogBufferId) {
+ auto oplogEntry = repl::OplogEntry{highestOplogBufferId->toBson()};
+ if (isFinalOplog(oplogEntry, reshardingUUID)) {
+ return ReshardingOplogFetcher::kFinalOpAlreadyFetched;
+ }
+
return ReshardingDonorOplogId::parse({"getOplogFetcherResumeId"},
- highestOplogBufferId.getDocument().toBson());
+ oplogEntry.get_id()->getDocument().toBson());
}
}
diff --git a/src/mongo/db/s/resharding/resharding_data_replication.h b/src/mongo/db/s/resharding/resharding_data_replication.h
index 821987f2001..fcdd77dcb68 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication.h
+++ b/src/mongo/db/s/resharding/resharding_data_replication.h
@@ -178,6 +178,7 @@ public:
const std::vector<DonorShardFetchTimestamp>& donorShards);
static ReshardingDonorOplogId getOplogFetcherResumeId(OperationContext* opCtx,
+ const UUID& reshardingUUID,
const NamespaceString& oplogBufferNss,
Timestamp minFetchTimestamp);
diff --git a/src/mongo/db/s/resharding/resharding_data_replication_test.cpp b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp
index d04d0ec9879..e6602cb14de 100644
--- a/src/mongo/db/s/resharding/resharding_data_replication_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_replication_test.cpp
@@ -98,6 +98,14 @@ public:
boost::none /* clusterTime */);
}
+ const NamespaceString& sourceNss() {
+ return _sourceNss;
+ }
+
+ const CollectionUUID& sourceUUID() {
+ return _sourceUUID;
+ }
+
private:
RoutingTableHistoryValueHandle makeStandaloneRoutingTableHistory(RoutingTableHistory rt) {
const auto version = rt.getVersion();
@@ -193,7 +201,7 @@ TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) {
// The minFetchTimestamp value is used when the oplog buffer collection doesn't exist.
ASSERT_BSONOBJ_BINARY_EQ(
ReshardingDataReplication::getOplogFetcherResumeId(
- opCtx.get(), oplogBufferNss, minFetchTimestamp)
+ opCtx.get(), reshardingUUID, oplogBufferNss, minFetchTimestamp)
.toBSON(),
(ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON()));
@@ -201,21 +209,29 @@ TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) {
resharding::data_copy::ensureCollectionExists(opCtx.get(), oplogBufferNss, CollectionOptions{});
ASSERT_BSONOBJ_BINARY_EQ(
ReshardingDataReplication::getOplogFetcherResumeId(
- opCtx.get(), oplogBufferNss, minFetchTimestamp)
+ opCtx.get(), reshardingUUID, oplogBufferNss, minFetchTimestamp)
.toBSON(),
(ReshardingDonorOplogId{minFetchTimestamp, minFetchTimestamp}.toBSON()));
auto insertFn = [&](const ReshardingDonorOplogId& oplogId) {
+ repl::MutableOplogEntry oplogEntry;
+ oplogEntry.setNss({});
+ oplogEntry.setOpType(repl::OpTypeEnum::kNoop);
+ oplogEntry.setObject({});
+ oplogEntry.setOpTime({{}, {}});
+ oplogEntry.setWallClockTime({});
+ oplogEntry.set_id(Value(oplogId.toBSON()));
+
AutoGetCollection oplogBufferColl(opCtx.get(), oplogBufferNss, MODE_IX);
WriteUnitOfWork wuow(opCtx.get());
ASSERT_OK(oplogBufferColl->insertDocument(
- opCtx.get(), InsertStatement{BSON("_id" << oplogId.toBSON())}, nullptr));
+ opCtx.get(), InsertStatement{oplogEntry.toBSON()}, nullptr));
wuow.commit();
};
insertFn(oplogId2);
ASSERT_BSONOBJ_BINARY_EQ(ReshardingDataReplication::getOplogFetcherResumeId(
- opCtx.get(), oplogBufferNss, minFetchTimestamp)
+ opCtx.get(), reshardingUUID, oplogBufferNss, minFetchTimestamp)
.toBSON(),
oplogId2.toBSON());
@@ -223,7 +239,7 @@ TEST_F(ReshardingDataReplicationTest, GetOplogFetcherResumeId) {
insertFn(oplogId3);
insertFn(oplogId1);
ASSERT_BSONOBJ_BINARY_EQ(ReshardingDataReplication::getOplogFetcherResumeId(
- opCtx.get(), oplogBufferNss, minFetchTimestamp)
+ opCtx.get(), reshardingUUID, oplogBufferNss, minFetchTimestamp)
.toBSON(),
oplogId3.toBSON());
}
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
index 3989cc1beb8..7314a354761 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator.cpp
@@ -189,14 +189,15 @@ std::vector<repl::OplogEntry> ReshardingDonorOplogIterator::_fillBatch(Pipeline&
numBytes += obj.objsize();
- // The ReshardingOplogFetcher may end up inserting no-op reshardProgressMark entries *after*
- // the reshardFinalOp entry. This can happen when the ReshardingOplogFetcher resumes after
- // a primary failover and it had already written the reshardFinalOp entry. We check each
- // oplog entry for being the reshardFinalOp and halt filling the batch so that an empty
- // batch is returned to ReshardingOplogApplier to signal all oplog entries from the donor
- // shard were applied.
if (isFinalOplog(entry)) {
- break;
+ // The ReshardingOplogFetcher should never insert documents after the reshardFinalOp
+ // entry. We defensively check each oplog entry for being the reshardFinalOp and confirm
+ // the pipeline has been exhausted.
+ if (auto nextDoc = pipeline.getNext()) {
+ tasserted(6077499,
+ fmt::format("Unexpectedly found entry after reshardFinalOp: {}",
+ redact(nextDoc->toString())));
+ }
}
} while (numBytes < resharding::gReshardingOplogBatchLimitBytes.load() &&
batch.size() < std::size_t(resharding::gReshardingOplogBatchLimitOperations.load()));
diff --git a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
index b5e1b2e3212..994ff476098 100644
--- a/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_donor_oplog_iterator_test.cpp
@@ -41,6 +41,7 @@
#include "mongo/executor/thread_pool_task_executor_test_fixture.h"
#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/logv2/log.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -229,14 +230,12 @@ TEST_F(ReshardingDonorOplogIterTest, BasicExhaust) {
const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1));
const auto oplog2 = makeInsertOplog(Timestamp(33, 6), BSON("y" << 1));
const auto finalOplog = makeFinalOplog(Timestamp(43, 24));
- const auto oplogBeyond = makeInsertOplog(Timestamp(123, 46), BSON("z" << 1));
DBDirectClient client(operationContext());
const auto ns = oplogNss().ns();
client.insert(ns, oplog1.toBSON());
client.insert(ns, oplog2.toBSON());
client.insert(ns, finalOplog.toBSON());
- client.insert(ns, oplogBeyond.toBSON());
ReshardingDonorOplogIterator iter(oplogNss(), kResumeFromBeginning, &onInsertAlwaysReady);
auto executor = makeTaskExecutorForIterator();
@@ -289,7 +288,6 @@ TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) {
const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1));
const auto oplog2 = makeInsertOplog(Timestamp(33, 6), BSON("y" << 1));
const auto finalOplog = makeFinalOplog(Timestamp(43, 24));
- const auto oplogBeyond = makeInsertOplog(Timestamp(123, 46), BSON("z" << 1));
DBDirectClient client(operationContext());
const auto ns = oplogNss().ns();
@@ -325,7 +323,6 @@ TEST_F(ReshardingDonorOplogIterTest, ExhaustWithIncomingInserts) {
client.insert(ns, oplog2.toBSON());
} else {
client.insert(ns, finalOplog.toBSON());
- client.insert(ns, oplogBeyond.toBSON());
}
}};
@@ -503,7 +500,9 @@ TEST_F(ReshardingDonorOplogIterTest, BatchIncludesProgressMarkEntries) {
ASSERT_TRUE(next.empty());
}
-TEST_F(ReshardingDonorOplogIterTest, IgnoresProgressMarkEntriesAfterFinalOp) {
+DEATH_TEST_REGEX_F(ReshardingDonorOplogIterTest,
+ ThrowsIfProgressMarkEntriesAfterFinalOp,
+ "Tripwire assertion.*6077499") {
RAIIServerParameterControllerForTest controller{"reshardingOplogBatchLimitOperations", 100};
const auto oplog1 = makeInsertOplog(Timestamp(2, 4), BSON("x" << 1));
@@ -529,13 +528,7 @@ TEST_F(ReshardingDonorOplogIterTest, IgnoresProgressMarkEntriesAfterFinalOp) {
auto altClient = makeKillableClient();
AlternativeClientRegion acr(altClient);
- auto next = getNextBatch(&iter, executor, factory);
- ASSERT_EQ(next.size(), 2U);
- ASSERT_BSONOBJ_EQ(getId(oplog1), getId(next[0]));
- ASSERT_BSONOBJ_EQ(getId(progressMarkOplog1), getId(next[1]));
-
- next = getNextBatch(&iter, executor, factory);
- ASSERT_TRUE(next.empty());
+ ASSERT_THROWS_CODE(getNextBatch(&iter, executor, factory), DBException, 6077499);
}
} // anonymous namespace
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
index a4d7bb20352..23050570432 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.cpp
@@ -75,6 +75,9 @@ boost::intrusive_ptr<ExpressionContext> _makeExpressionContext(OperationContext*
}
} // namespace
+const ReshardingDonorOplogId ReshardingOplogFetcher::kFinalOpAlreadyFetched{Timestamp::max(),
+ Timestamp::max()};
+
ReshardingOplogFetcher::ReshardingOplogFetcher(std::unique_ptr<Env> env,
UUID reshardingUUID,
UUID collUUID,
@@ -133,6 +136,14 @@ ExecutorFuture<void> ReshardingOplogFetcher::schedule(
std::shared_ptr<executor::TaskExecutor> executor,
const CancellationToken& cancelToken,
CancelableOperationContextFactory factory) {
+ if (_startAt == kFinalOpAlreadyFetched) {
+ LOGV2_INFO(6077400,
+ "Resharding oplog fetcher resumed with no more work to do",
+ "donorShard"_attr = _donorShard,
+ "reshardingUUID"_attr = _reshardingUUID);
+ return ExecutorFuture(std::move(executor));
+ }
+
return ExecutorFuture(executor)
.then([this, executor, cancelToken, factory] {
return _reschedule(std::move(executor), cancelToken, factory);
@@ -165,6 +176,10 @@ ExecutorFuture<void> ReshardingOplogFetcher::_reschedule(
})
.then([this, executor, cancelToken, factory](bool moreToCome) {
if (!moreToCome) {
+ LOGV2_INFO(6077401,
+ "Resharding oplog fetcher done fetching",
+ "donorShard"_attr = _donorShard,
+ "reshardingUUID"_attr = _reshardingUUID);
return ExecutorFuture(std::move(executor));
}
@@ -259,6 +274,7 @@ AggregateCommandRequest ReshardingOplogFetcher::_makeAggregateCommandRequest(
// concern to guarantee the postBatchResumeToken when the batch is empty is non-decreasing.
// The ReshardingOplogFetcher depends on inserting documents in increasing _id order,
// including for the synthetic no-op oplog entries generated from the postBatchResumeToken.
+ invariant(_startAt != kFinalOpAlreadyFetched);
auto readConcernArgs = repl::ReadConcernArgs(
boost::optional<LogicalTime>(_startAt.getClusterTime()),
boost::optional<repl::ReadConcernLevel>(repl::ReadConcernLevel::kMajorityReadConcern));
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
index bf75536d4ae..e0aad151f78 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher.h
@@ -70,6 +70,10 @@ public:
ReshardingMetrics* _metrics;
};
+ // Special value to use for startAt to indicate there are no more oplog entries needing to be
+ // fetched.
+ static const ReshardingDonorOplogId kFinalOpAlreadyFetched;
+
ReshardingOplogFetcher(std::unique_ptr<Env> env,
UUID reshardingUUID,
UUID collUUID,
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
index f6d9ebb21bd..f62b89351c1 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -55,6 +55,7 @@
#include "mongo/db/storage/write_unit_of_work.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_shard.h"
+#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
@@ -665,5 +666,74 @@ TEST_F(ReshardingOplogFetcherTest, RetriesOnRemoteInterruptionError) {
ASSERT_TRUE(moreToCome);
}
+TEST_F(ReshardingOplogFetcherTest, ImmediatelyDoneWhenFinalOpHasAlreadyBeenFetched) {
+ const NamespaceString outputCollectionNss("dbtests.outputCollection");
+ const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+
+ create(outputCollectionNss);
+ create(dataCollectionNss);
+ _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
+
+ const auto& collectionUUID = [&] {
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+ return dataColl->uuid();
+ }();
+
+ ReshardingOplogFetcher fetcher(makeFetcherEnv(),
+ _reshardingUUID,
+ collectionUUID,
+ ReshardingOplogFetcher::kFinalOpAlreadyFetched,
+ _donorShard,
+ _destinationShard,
+ outputCollectionNss);
+
+ auto factory = makeCancelableOpCtx();
+ auto future = fetcher.schedule(nullptr, CancellationToken::uncancelable(), factory);
+
+ ASSERT_TRUE(future.isReady());
+ ASSERT_OK(future.getNoThrow());
+}
+
+DEATH_TEST_REGEX_F(ReshardingOplogFetcherTest,
+ CannotFetchMoreWhenFinalOpHasAlreadyBeenFetched,
+ "Invariant failure.*_startAt != kFinalOpAlreadyFetched") {
+ const NamespaceString outputCollectionNss("dbtests.outputCollection");
+ const NamespaceString dataCollectionNss("dbtests.runFetchIteration");
+
+ create(outputCollectionNss);
+ create(dataCollectionNss);
+ _fetchTimestamp = repl::StorageInterface::get(_svcCtx)->getLatestOplogTimestamp(_opCtx);
+
+ const auto& collectionUUID = [&] {
+ AutoGetCollection dataColl(_opCtx, dataCollectionNss, LockMode::MODE_IX);
+ return dataColl->uuid();
+ }();
+
+ auto fetcherJob = launchAsync([&, this] {
+ ThreadClient tc("RunnerForFetcher", _svcCtx, nullptr);
+
+ // We intentionally do not call fetcher.useReadConcernForTest(false) for this test case.
+ ReshardingOplogFetcher fetcher(makeFetcherEnv(),
+ _reshardingUUID,
+ collectionUUID,
+ ReshardingOplogFetcher::kFinalOpAlreadyFetched,
+ _donorShard,
+ _destinationShard,
+ outputCollectionNss);
+ fetcher.setInitialBatchSizeForTest(2);
+
+ auto factory = makeCancelableOpCtx();
+ return fetcher.iterate(&cc(), factory);
+ });
+
+ // Calling onCommand() leads to a more helpful "Expected death, found life" error when the
+ // invariant failure isn't triggered.
+ onCommand([&](const executor::RemoteCommandRequest& request) -> StatusWith<BSONObj> {
+ return {ErrorCodes::InternalError, "this error should never be observed"};
+ });
+
+ (void)fetcherJob.timed_get(Seconds(5));
+}
+
} // namespace
} // namespace mongo