diff options
author | Samy Lanka <samy.lanka@mongodb.com> | 2019-01-14 20:49:48 -0500 |
---|---|---|
committer | Samy Lanka <samy.lanka@mongodb.com> | 2019-01-17 15:57:29 -0500 |
commit | d764e1b6346726fc2e9259ba294d029d531ff38e (patch) | |
tree | e50e5dc8a61f42f1cbcad1deb854adf98ede8693 /src/mongo/db/repl | |
parent | 9e87f6e272df4f97013dfccc4018efb79f68653a (diff) | |
download | mongo-d764e1b6346726fc2e9259ba294d029d531ff38e.tar.gz |
SERVER-36815 Remove references to oplog entry's hash field
Diffstat (limited to 'src/mongo/db/repl')
33 files changed, 547 insertions, 868 deletions
diff --git a/src/mongo/db/repl/abstract_oplog_fetcher.cpp b/src/mongo/db/repl/abstract_oplog_fetcher.cpp index 85daa6970d3..871c2dbe9e0 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher.cpp +++ b/src/mongo/db/repl/abstract_oplog_fetcher.cpp @@ -73,23 +73,8 @@ const Milliseconds kDefaultOplogGetMoreMaxMS{5000}; } // namespace -StatusWith<OpTimeWithHash> AbstractOplogFetcher::parseOpTimeWithHash(const BSONObj& oplogEntryObj) { - const auto opTime = OpTime::parseFromOplogEntry(oplogEntryObj); - if (!opTime.isOK()) { - return opTime.getStatus(); - } - - long long hash = 0; - Status hashStatus = bsonExtractIntegerField(oplogEntryObj, "h"_sd, &hash); - if (!hashStatus.isOK()) { - return hashStatus; - } - - return OpTimeWithHash{hash, opTime.getValue()}; -} - AbstractOplogFetcher::AbstractOplogFetcher(executor::TaskExecutor* executor, - OpTimeWithHash lastFetched, + OpTime lastFetched, HostAndPort source, NamespaceString nss, std::size_t maxFetcherRestarts, @@ -102,7 +87,7 @@ AbstractOplogFetcher::AbstractOplogFetcher(executor::TaskExecutor* executor, _onShutdownCallbackFn(onShutdownCallbackFn), _lastFetched(lastFetched) { - invariant(!_lastFetched.opTime.isNull()); + invariant(!_lastFetched.isNull()); invariant(onShutdownCallbackFn); } @@ -122,8 +107,7 @@ std::string AbstractOplogFetcher::toString() const { stdx::lock_guard<stdx::mutex> lock(_mutex); str::stream msg; msg << _getComponentName() << " -" - << " last optime fetched: " << _lastFetched.opTime.toString() - << " last hash fetched: " << _lastFetched.value; + << " last optime fetched: " << _lastFetched.toString(); // The fetcher is created a startup, not at construction, so we must check if it exists. if (_fetcher) { msg << " fetcher: " << _fetcher->getDiagnosticString(); @@ -139,8 +123,8 @@ void AbstractOplogFetcher::_makeAndScheduleFetcherCallback( return; } - BSONObj findCommandObj = _makeFindCommandObject( - _nss, _getLastOpTimeWithHashFetched().opTime, _getInitialFindMaxTime()); + BSONObj findCommandObj = + _makeFindCommandObject(_nss, _getLastOpTimeFetched(), _getInitialFindMaxTime()); BSONObj metadataObj = _makeMetadataObject(); Status scheduleStatus = Status::OK(); @@ -180,11 +164,11 @@ Status AbstractOplogFetcher::_scheduleFetcher_inlock() { return _fetcher->schedule(); } -OpTimeWithHash AbstractOplogFetcher::getLastOpTimeWithHashFetched_forTest() const { - return _getLastOpTimeWithHashFetched(); +OpTime AbstractOplogFetcher::getLastOpTimeFetched_forTest() const { + return _getLastOpTimeFetched(); } -OpTimeWithHash AbstractOplogFetcher::_getLastOpTimeWithHashFetched() const { +OpTime AbstractOplogFetcher::_getLastOpTimeFetched() const { stdx::lock_guard<stdx::mutex> lock(_mutex); return _lastFetched; } @@ -195,8 +179,7 @@ BSONObj AbstractOplogFetcher::getCommandObject_forTest() const { } BSONObj AbstractOplogFetcher::getFindQuery_forTest() const { - return _makeFindCommandObject( - _nss, _getLastOpTimeWithHashFetched().opTime, _getInitialFindMaxTime()); + return _makeFindCommandObject(_nss, _getLastOpTimeFetched(), _getInitialFindMaxTime()); } HostAndPort AbstractOplogFetcher::_getSource() const { @@ -222,8 +205,8 @@ void AbstractOplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, // example, because it stepped down) we might not have a cursor. if (!responseStatus.isOK()) { - BSONObj findCommandObj = _makeFindCommandObject( - _nss, _getLastOpTimeWithHashFetched().opTime, _getRetriedFindMaxTime()); + BSONObj findCommandObj = + _makeFindCommandObject(_nss, _getLastOpTimeFetched(), _getRetriedFindMaxTime()); BSONObj metadataObj = _makeMetadataObject(); { stdx::lock_guard<stdx::mutex> lock(_mutex); @@ -232,7 +215,7 @@ void AbstractOplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, << redact(responseStatus); } else { log() << "Restarting oplog query due to error: " << redact(responseStatus) - << ". Last fetched optime (with hash): " << _lastFetched + << ". Last fetched optime: " << _lastFetched << ". Restarts remaining: " << (_maxFetcherRestarts - _fetcherRestarts); _fetcherRestarts++; // Destroying current instance in _shuttingDownFetcher will possibly block. @@ -294,15 +277,14 @@ void AbstractOplogFetcher::_callback(const Fetcher::QueryResponseStatus& result, // completed. const auto& documents = queryResponse.documents; if (documents.size() > 0) { - auto lastDocRes = AbstractOplogFetcher::parseOpTimeWithHash(documents.back()); + auto lastDocRes = OpTime::parseFromOplogEntry(documents.back()); if (!lastDocRes.isOK()) { _finishCallback(lastDocRes.getStatus()); return; } auto lastDoc = lastDocRes.getValue(); LOG(3) << _getComponentName() - << " setting last fetched optime ahead after batch: " << lastDoc.opTime - << "; hash: " << lastDoc.value; + << " setting last fetched optime ahead after batch: " << lastDoc; stdx::lock_guard<stdx::mutex> lock(_mutex); _lastFetched = lastDoc; diff --git a/src/mongo/db/repl/abstract_oplog_fetcher.h b/src/mongo/db/repl/abstract_oplog_fetcher.h index 9631814b543..27e7ef68e22 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher.h +++ b/src/mongo/db/repl/abstract_oplog_fetcher.h @@ -43,11 +43,6 @@ namespace mongo { namespace repl { /** - * Used to keep track of the OpTime and hash of the last fetched operation. - */ -using OpTimeWithHash = OpTimeWith<long long>; - -/** * This class represents an abstract base class for replication components that try to read from * remote oplogs. An abstract oplog fetcher is an abstract async component. It owns a Fetcher * that fetches operations from a remote oplog and restarts from the last fetched oplog entry on @@ -73,15 +68,10 @@ public: using OnShutdownCallbackFn = stdx::function<void(const Status& shutdownStatus)>; /** - * This function takes a BSONObj oplog entry and parses out the OpTime and hash. - */ - static StatusWith<OpTimeWithHash> parseOpTimeWithHash(const BSONObj& oplogEntryObj); - - /** * Invariants if validation fails on any of the provided arguments. */ AbstractOplogFetcher(executor::TaskExecutor* executor, - OpTimeWithHash lastFetched, + OpTime lastFetched, HostAndPort source, NamespaceString nss, std::size_t maxFetcherRestarts, @@ -107,9 +97,9 @@ public: BSONObj getFindQuery_forTest() const; /** - * Returns the OpTime and hash of the last oplog entry fetched and processed. + * Returns the OpTime of the last oplog entry fetched and processed. */ - OpTimeWithHash getLastOpTimeWithHashFetched_forTest() const; + OpTime getLastOpTimeFetched_forTest() const; protected: /** @@ -141,9 +131,9 @@ protected: NamespaceString _getNamespace() const; /** - * Returns the OpTime and hash of the last oplog entry fetched and processed. + * Returns the OpTime of the last oplog entry fetched and processed. */ - OpTimeWithHash _getLastOpTimeWithHashFetched() const; + OpTime _getLastOpTimeFetched() const; // =============== AbstractAsyncComponent overrides ================ @@ -230,7 +220,7 @@ private: OnShutdownCallbackFn _onShutdownCallbackFn; // Used to keep track of the last oplog entry read and processed from the sync source. - OpTimeWithHash _lastFetched; + OpTime _lastFetched; // Fetcher restarts since the last successful oplog query response. std::size_t _fetcherRestarts = 0; diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp index ebd840e36cf..330f7c1e199 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/abstract_oplog_fetcher_test.cpp @@ -62,7 +62,7 @@ const Milliseconds kNetworkTimeoutBufferMS{5000}; class MockOplogFetcher : public AbstractOplogFetcher { public: explicit MockOplogFetcher(executor::TaskExecutor* executor, - OpTimeWithHash lastFetched, + OpTime lastFetched, HostAndPort source, NamespaceString nss, std::size_t maxFetcherRestarts, @@ -93,7 +93,7 @@ private: }; MockOplogFetcher::MockOplogFetcher(executor::TaskExecutor* executor, - OpTimeWithHash lastFetched, + OpTime lastFetched, HostAndPort source, NamespaceString nss, std::size_t maxFetcherRestarts, @@ -141,15 +141,15 @@ TEST_F(AbstractOplogFetcherTest, ShuttingExecutorDownShouldPreventOplogFetcherFr MockOplogFetcher oplogFetcher(&getExecutor(), lastFetched, source, nss, 0, [](Status) {}); - // Last optime and hash fetched should match values passed to constructor. - ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeWithHashFetched_forTest()); + // Last optime fetched should match values passed to constructor. + ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeFetched_forTest()); ASSERT_FALSE(oplogFetcher.isActive()); ASSERT_EQUALS(ErrorCodes::ShutdownInProgress, oplogFetcher.startup()); ASSERT_FALSE(oplogFetcher.isActive()); - // Last optime and hash fetched should not change. - ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeWithHashFetched_forTest()); + // Last optime fetched should not change. + ASSERT_EQUALS(lastFetched, oplogFetcher.getLastOpTimeFetched_forTest()); } TEST_F(AbstractOplogFetcherTest, StartupReturnsOperationFailedIfExecutorFailsToScheduleFetcher) { @@ -225,22 +225,18 @@ TEST_F(AbstractOplogFetcherTest, OplogFetcherReturnsCallbackCanceledIfShutdownAf ASSERT_EQUALS(ErrorCodes::CallbackCanceled, shutdownState.getStatus()); } -long long _getHash(const BSONObj& oplogEntry) { - return oplogEntry["h"].numberLong(); -} - Timestamp _getTimestamp(const BSONObj& oplogEntry) { return OplogEntry(oplogEntry).getOpTime().getTimestamp(); } -OpTimeWithHash _getOpTimeWithHash(const BSONObj& oplogEntry) { - return {_getHash(oplogEntry), OplogEntry(oplogEntry).getOpTime()}; +OpTime _getOpTime(const BSONObj& oplogEntry) { + return OplogEntry(oplogEntry).getOpTime(); } std::vector<BSONObj> _generateOplogEntries(std::size_t size) { std::vector<BSONObj> ops(size); for (std::size_t i = 0; i < size; ++i) { - ops[i] = AbstractOplogFetcherTest::makeNoopOplogEntry(Seconds(100 + int(i)), 123LL); + ops[i] = AbstractOplogFetcherTest::makeNoopOplogEntry(Seconds(100 + int(i))); } return ops; } @@ -262,7 +258,7 @@ TEST_F(AbstractOplogFetcherTest, std::size_t maxFetcherRestarts = 1U; auto shutdownState = stdx::make_unique<ShutdownState>(); MockOplogFetcher oplogFetcher(&getExecutor(), - _getOpTimeWithHash(ops[0]), + _getOpTime(ops[0]), source, nss, maxFetcherRestarts, @@ -294,7 +290,7 @@ TEST_F(AbstractOplogFetcherTest, OplogFetcherStopsRestartingFetcherIfRestartLimi std::size_t maxFetcherRestarts = 2U; auto shutdownState = stdx::make_unique<ShutdownState>(); MockOplogFetcher oplogFetcher(&getExecutor(), - _getOpTimeWithHash(ops[0]), + _getOpTime(ops[0]), source, nss, maxFetcherRestarts, @@ -330,7 +326,7 @@ TEST_F(AbstractOplogFetcherTest, OplogFetcherResetsRestartCounterOnSuccessfulFet std::size_t maxFetcherRestarts = 2U; auto shutdownState = stdx::make_unique<ShutdownState>(); MockOplogFetcher oplogFetcher(&getExecutor(), - _getOpTimeWithHash(ops[0]), + _getOpTime(ops[0]), source, nss, maxFetcherRestarts, @@ -401,7 +397,7 @@ TEST_F(AbstractOplogFetcherTest, return shouldFailSchedule; }); MockOplogFetcher oplogFetcher(&_executorProxy, - _getOpTimeWithHash(ops[0]), + _getOpTime(ops[0]), source, nss, maxFetcherRestarts, @@ -432,7 +428,7 @@ TEST_F(AbstractOplogFetcherTest, OplogFetcherTimesOutCorrectlyOnInitialFindReque std::size_t maxFetcherRestarts = 0U; auto shutdownState = stdx::make_unique<ShutdownState>(); MockOplogFetcher oplogFetcher(&getExecutor(), - _getOpTimeWithHash(ops[0]), + _getOpTime(ops[0]), source, nss, maxFetcherRestarts, @@ -470,7 +466,7 @@ TEST_F(AbstractOplogFetcherTest, OplogFetcherTimesOutCorrectlyOnRetriedFindReque std::size_t maxFetcherRestarts = 1U; auto shutdownState = stdx::make_unique<ShutdownState>(); MockOplogFetcher oplogFetcher(&getExecutor(), - _getOpTimeWithHash(ops[0]), + _getOpTime(ops[0]), source, nss, maxFetcherRestarts, diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp index 357151b0100..d0e17213d36 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp +++ b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.cpp @@ -45,12 +45,11 @@ namespace { * Creates an OplogEntry using given field values. */ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, - long long hash, repl::OpTypeEnum opType, NamespaceString nss, BSONObj object) { return repl::OplogEntry(opTime, // optime - hash, // hash + boost::none, // hash opType, // opType nss, // namespace boost::none, // uuid @@ -79,21 +78,12 @@ void ShutdownState::operator()(const Status& status) { _status = status; } -BSONObj AbstractOplogFetcherTest::makeNoopOplogEntry(OpTimeWithHash opTimeWithHash) { - return makeOplogEntry(opTimeWithHash.opTime, - opTimeWithHash.value, - OpTypeEnum::kNoop, - NamespaceString("test.t"), - BSONObj()) - .toBSON(); +BSONObj AbstractOplogFetcherTest::makeNoopOplogEntry(OpTime opTime) { + return makeOplogEntry(opTime, OpTypeEnum::kNoop, NamespaceString("test.t"), BSONObj()).toBSON(); } -BSONObj AbstractOplogFetcherTest::makeNoopOplogEntry(OpTime opTime, long long hash) { - return makeNoopOplogEntry({hash, opTime}); -} - -BSONObj AbstractOplogFetcherTest::makeNoopOplogEntry(Seconds seconds, long long hash) { - return makeNoopOplogEntry({{seconds, 0}, 1LL}, hash); +BSONObj AbstractOplogFetcherTest::makeNoopOplogEntry(Seconds seconds) { + return makeNoopOplogEntry({{seconds, 0}, 1LL}); } BSONObj AbstractOplogFetcherTest::makeCursorResponse(CursorId cursorId, @@ -121,7 +111,7 @@ void AbstractOplogFetcherTest::setUp() { executor::ThreadPoolExecutorTest::setUp(); launchExecutorThread(); - lastFetched = {456LL, {{123, 0}, 1}}; + lastFetched = {{123, 0}, 1}; } executor::RemoteCommandRequest AbstractOplogFetcherTest::processNetworkResponse( diff --git a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h index 2c4aa84032a..e11e2d6f4d5 100644 --- a/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h +++ b/src/mongo/db/repl/abstract_oplog_fetcher_test_fixture.h @@ -68,9 +68,8 @@ public: /** * Static functions for creating noop oplog entries. */ - static BSONObj makeNoopOplogEntry(OpTimeWithHash opTimeWithHash); - static BSONObj makeNoopOplogEntry(OpTime opTime, long long hash); - static BSONObj makeNoopOplogEntry(Seconds seconds, long long hash); + static BSONObj makeNoopOplogEntry(OpTime opTime); + static BSONObj makeNoopOplogEntry(Seconds seconds); /** * A static function for creating the response to a cursor. If it's the last batch, the @@ -94,8 +93,8 @@ protected: executor::RemoteCommandRequest processNetworkResponse( BSONObj obj, bool expectReadyRequestsAfterProcessing = false); - // The last OpTime and hash fetched by the oplog fetcher. - OpTimeWithHash lastFetched; + // The last OpTime fetched by the oplog fetcher. + OpTime lastFetched; }; } // namespace repl } // namespace mango diff --git a/src/mongo/db/repl/apply_ops_test.cpp b/src/mongo/db/repl/apply_ops_test.cpp index e224a736675..4efcbdef1b5 100644 --- a/src/mongo/db/repl/apply_ops_test.cpp +++ b/src/mongo/db/repl/apply_ops_test.cpp @@ -353,7 +353,7 @@ TEST_F(ApplyOpsTest, ApplyOpsPropagatesOplogApplicationMode) { */ OplogEntry makeOplogEntry(OpTypeEnum opType, const BSONObj& oField) { return OplogEntry(OpTime(Timestamp(1, 1), 1), // optime - 1LL, // hash + boost::none, // hash opType, // op type NamespaceString("a.a"), // namespace boost::none, // uuid diff --git a/src/mongo/db/repl/bgsync.cpp b/src/mongo/db/repl/bgsync.cpp index ca02b597752..626ccd9e6d0 100644 --- a/src/mongo/db/repl/bgsync.cpp +++ b/src/mongo/db/repl/bgsync.cpp @@ -70,7 +70,6 @@ using std::string; namespace repl { namespace { -const char kHashFieldName[] = "h"; const int kSleepToAllowBatchingMillis = 2; const int kSmallBatchLimitBytes = 40000; const Milliseconds kRollbackOplogSocketTimeout(10 * 60 * 1000); @@ -403,14 +402,12 @@ void BackgroundSync::_produce() { } } - long long lastHashFetched; { stdx::lock_guard<stdx::mutex> lock(_mutex); if (_state != ProducerState::Running) { return; } lastOpTimeFetched = _lastOpTimeFetched; - lastHashFetched = _lastFetchedHash; } if (!_replCoord->getMemberState().primary()) { @@ -441,7 +438,7 @@ void BackgroundSync::_produce() { // replication coordinator. auto oplogFetcherPtr = stdx::make_unique<OplogFetcher>( _replicationCoordinatorExternalState->getTaskExecutor(), - OpTimeWithHash(lastHashFetched, lastOpTimeFetched), + lastOpTimeFetched, source, NamespaceString::kRsOplogNamespace, _replCoord->getConfig(), @@ -539,8 +536,7 @@ Status BackgroundSync::_enqueueDocuments(Fetcher::Documents::const_iterator begi _oplogApplier->enqueue(opCtx.get(), begin, end); // Update last fetched info. - _lastFetchedHash = info.lastDocument.value; - _lastOpTimeFetched = info.lastDocument.opTime; + _lastOpTimeFetched = info.lastDocument; LOG(3) << "batch resetting _lastOpTimeFetched: " << _lastOpTimeFetched; } @@ -713,7 +709,6 @@ void BackgroundSync::stop(bool resetLastFetchedOptime) { if (resetLastFetchedOptime) { invariant(_oplogApplier->getBuffer()->isEmpty()); _lastOpTimeFetched = OpTime(); - _lastFetchedHash = 0; log() << "Resetting last fetched optimes in bgsync"; } @@ -727,14 +722,14 @@ void BackgroundSync::stop(bool resetLastFetchedOptime) { } void BackgroundSync::start(OperationContext* opCtx) { - OpTimeWithHash lastAppliedOpTimeWithHash; + OpTime lastAppliedOpTime; ShouldNotConflictWithSecondaryBatchApplicationBlock noConflict(opCtx->lockState()); // Explicitly start future read transactions without a timestamp. opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); do { - lastAppliedOpTimeWithHash = _readLastAppliedOpTimeWithHash(opCtx); + lastAppliedOpTime = _readLastAppliedOpTime(opCtx); stdx::lock_guard<stdx::mutex> lk(_mutex); // Double check the state after acquiring the mutex. if (_state != ProducerState::Starting) { @@ -749,33 +744,30 @@ void BackgroundSync::start(OperationContext* opCtx) { // When a node steps down during drain mode, the last fetched optime would be newer than // the last applied. - if (_lastOpTimeFetched <= lastAppliedOpTimeWithHash.opTime) { - LOG(1) << "Setting bgsync _lastOpTimeFetched=" << lastAppliedOpTimeWithHash.opTime - << " and _lastFetchedHash=" << lastAppliedOpTimeWithHash.value + if (_lastOpTimeFetched <= lastAppliedOpTime) { + LOG(1) << "Setting bgsync _lastOpTimeFetched=" << lastAppliedOpTime << ". Previous _lastOpTimeFetched: " << _lastOpTimeFetched; - _lastOpTimeFetched = lastAppliedOpTimeWithHash.opTime; - _lastFetchedHash = lastAppliedOpTimeWithHash.value; + _lastOpTimeFetched = lastAppliedOpTime; } // Reload the last applied optime from disk if it has been changed. - } while (lastAppliedOpTimeWithHash.opTime != _replCoord->getMyLastAppliedOpTime()); + } while (lastAppliedOpTime != _replCoord->getMyLastAppliedOpTime()); - LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched << " " << _lastFetchedHash; + LOG(1) << "bgsync fetch queue set to: " << _lastOpTimeFetched; } -OpTimeWithHash BackgroundSync::_readLastAppliedOpTimeWithHash(OperationContext* opCtx) { +OpTime BackgroundSync::_readLastAppliedOpTime(OperationContext* opCtx) { BSONObj oplogEntry; try { bool success = writeConflictRetry( - opCtx, "readLastAppliedHash", NamespaceString::kRsOplogNamespace.ns(), [&] { + opCtx, "readLastAppliedOpTime", NamespaceString::kRsOplogNamespace.ns(), [&] { Lock::DBLock lk(opCtx, "local", MODE_X); return Helpers::getLast( opCtx, NamespaceString::kRsOplogNamespace.ns().c_str(), oplogEntry); }); if (!success) { - // This can happen when we are to do an initial sync. lastHash will be set - // after the initial sync is complete. - return OpTimeWithHash(0); + // This can happen when we are to do an initial sync. + return OpTime(); } } catch (const ExceptionForCat<ErrorCategory::ShutdownError>&) { throw; @@ -784,19 +776,10 @@ OpTimeWithHash BackgroundSync::_readLastAppliedOpTimeWithHash(OperationContext* << redact(ex); fassertFailed(18904); } - long long hash; - auto status = bsonExtractIntegerField(oplogEntry, kHashFieldName, &hash); - if (!status.isOK()) { - severe() << "Most recent entry in " << NamespaceString::kRsOplogNamespace.ns() - << " is missing or has invalid \"" << kHashFieldName - << "\" field. Oplog entry: " << redact(oplogEntry) << ": " << redact(status); - fassertFailed(18902); - } OplogEntry parsedEntry(oplogEntry); - auto lastOptime = OpTimeWithHash(hash, parsedEntry.getOpTime()); LOG(1) << "Successfully read last entry of oplog while starting bgsync: " << redact(oplogEntry); - return lastOptime; + return parsedEntry.getOpTime(); } bool BackgroundSync::shouldStopFetching() const { diff --git a/src/mongo/db/repl/bgsync.h b/src/mongo/db/repl/bgsync.h index 2a92a59a85c..b310417dcc1 100644 --- a/src/mongo/db/repl/bgsync.h +++ b/src/mongo/db/repl/bgsync.h @@ -201,7 +201,7 @@ private: // restart syncing void start(OperationContext* opCtx); - OpTimeWithHash _readLastAppliedOpTimeWithHash(OperationContext* opCtx); + OpTime _readLastAppliedOpTime(OperationContext* opCtx); // This OplogApplier applies oplog entries fetched from the sync source. OplogApplier* const _oplogApplier; @@ -235,9 +235,6 @@ private: OpTime _lastOpTimeFetched; // (M) - // lastFetchedHash is used to match ops to determine if we need to rollback, when a secondary. - long long _lastFetchedHash = 0LL; // (M) - // Thread running producerThread(). std::unique_ptr<stdx::thread> _producerThread; // (M) diff --git a/src/mongo/db/repl/idempotency_test_fixture.cpp b/src/mongo/db/repl/idempotency_test_fixture.cpp index 617252d0418..1c2007ea647 100644 --- a/src/mongo/db/repl/idempotency_test_fixture.cpp +++ b/src/mongo/db/repl/idempotency_test_fixture.cpp @@ -76,7 +76,7 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, boost::optional<StmtId> stmtId = boost::none, boost::optional<UUID> uuid = boost::none) { return repl::OplogEntry(opTime, // optime - 1LL, // hash + boost::none, // hash opType, // opType nss, // namespace uuid, // uuid diff --git a/src/mongo/db/repl/initial_syncer.cpp b/src/mongo/db/repl/initial_syncer.cpp index 7937ae81fdd..1da59d432c3 100644 --- a/src/mongo/db/repl/initial_syncer.cpp +++ b/src/mongo/db/repl/initial_syncer.cpp @@ -159,15 +159,14 @@ StatusWith<Timestamp> parseTimestampStatus(const QueryResponseStatus& fetchResul } } -StatusWith<OpTimeWithHash> parseOpTimeWithHash(const QueryResponseStatus& fetchResult) { +StatusWith<OpTime> parseOpTime(const QueryResponseStatus& fetchResult) { if (!fetchResult.isOK()) { return fetchResult.getStatus(); } const auto docs = fetchResult.getValue().documents; const auto hasDoc = docs.begin() != docs.end(); - return hasDoc - ? AbstractOplogFetcher::parseOpTimeWithHash(docs.front()) - : StatusWith<OpTimeWithHash>{ErrorCodes::NoMatchingDocument, "no oplog entry found"}; + return hasDoc ? OpTime::parseFromOplogEntry(docs.front()) + : StatusWith<OpTime>{ErrorCodes::NoMatchingDocument, "no oplog entry found"}; } /** @@ -408,7 +407,7 @@ void InitialSyncer::_setUp_inlock(OperationContext* opCtx, std::uint32_t initial } void InitialSyncer::_tearDown_inlock(OperationContext* opCtx, - const StatusWith<OpTimeWithHash>& lastApplied) { + const StatusWith<OpTime>& lastApplied) { _stats.initialSyncEnd = _exec->now(); // This might not be necessary if we failed initial sync. @@ -424,22 +423,21 @@ void InitialSyncer::_tearDown_inlock(OperationContext* opCtx, // during initial sync. Correct the visibility to match the initial sync time before // transitioning to steady state replication. const bool orderedCommit = true; - _storage->oplogDiskLocRegister( - opCtx, lastApplied.getValue().opTime.getTimestamp(), orderedCommit); + _storage->oplogDiskLocRegister(opCtx, lastApplied.getValue().getTimestamp(), orderedCommit); _replicationProcess->getConsistencyMarkers()->clearInitialSyncFlag(opCtx); // All updates that represent initial sync must be completed before setting the initial data // timestamp. _storage->setInitialDataTimestamp(opCtx->getServiceContext(), - lastApplied.getValue().opTime.getTimestamp()); + lastApplied.getValue().getTimestamp()); auto currentLastAppliedOpTime = _opts.getMyLastOptime(); if (currentLastAppliedOpTime.isNull()) { - _opts.setMyLastOptime(lastApplied.getValue().opTime, + _opts.setMyLastOptime(lastApplied.getValue(), ReplicationCoordinator::DataConsistency::Consistent); } else { - invariant(currentLastAppliedOpTime == lastApplied.getValue().opTime); + invariant(currentLastAppliedOpTime == lastApplied.getValue()); } log() << "initial sync done; took " @@ -466,7 +464,7 @@ void InitialSyncer::_startInitialSyncAttemptCallback( // This completion guard invokes _finishInitialSyncAttempt on destruction. auto cancelRemainingWorkInLock = [this]() { _cancelRemainingWork_inlock(); }; - auto finishInitialSyncAttemptFn = [this](const StatusWith<OpTimeWithHash>& lastApplied) { + auto finishInitialSyncAttemptFn = [this](const StatusWith<OpTime>& lastApplied) { _finishInitialSyncAttempt(lastApplied); }; auto onCompletionGuard = @@ -672,14 +670,14 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginTimestamp( return; } - const auto opTimeWithHashResult = parseOpTimeWithHash(result); - status = opTimeWithHashResult.getStatus(); + const auto opTimeResult = parseOpTime(result); + status = opTimeResult.getStatus(); if (!status.isOK()) { onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; } - const auto& lastOpTimeWithHash = opTimeWithHashResult.getValue(); + const auto& lastOpTime = opTimeResult.getValue(); BSONObjBuilder queryBob; queryBob.append("find", NamespaceString::kServerConfigurationNamespace.coll()); @@ -691,26 +689,26 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginTimestamp( // When document locking is used, there could be oplog "holes" which would result in // inconsistent initial sync data if we didn't do this. auto readConcernBob = BSONObjBuilder(queryBob.subobjStart("readConcern")); - readConcernBob.append("afterClusterTime", lastOpTimeWithHash.opTime.getTimestamp()); + readConcernBob.append("afterClusterTime", lastOpTime.getTimestamp()); readConcernBob.done(); - _fCVFetcher = stdx::make_unique<Fetcher>( - _exec, - _syncSource, - NamespaceString::kServerConfigurationNamespace.db().toString(), - queryBob.obj(), - [=](const StatusWith<mongo::Fetcher::QueryResponse>& response, - mongo::Fetcher::NextAction*, - mongo::BSONObjBuilder*) { - _fcvFetcherCallback(response, onCompletionGuard, lastOpTimeWithHash); - }, - ReadPreferenceSetting::secondaryPreferredMetadata(), - RemoteCommandRequest::kNoTimeout /* find network timeout */, - RemoteCommandRequest::kNoTimeout /* getMore network timeout */, - RemoteCommandRetryScheduler::makeRetryPolicy( - numInitialSyncOplogFindAttempts.load(), - executor::RemoteCommandRequest::kNoTimeout, - RemoteCommandRetryScheduler::kAllRetriableErrors)); + _fCVFetcher = + stdx::make_unique<Fetcher>(_exec, + _syncSource, + NamespaceString::kServerConfigurationNamespace.db().toString(), + queryBob.obj(), + [=](const StatusWith<mongo::Fetcher::QueryResponse>& response, + mongo::Fetcher::NextAction*, + mongo::BSONObjBuilder*) { + _fcvFetcherCallback(response, onCompletionGuard, lastOpTime); + }, + ReadPreferenceSetting::secondaryPreferredMetadata(), + RemoteCommandRequest::kNoTimeout /* find network timeout */, + RemoteCommandRequest::kNoTimeout /* getMore network timeout */, + RemoteCommandRetryScheduler::makeRetryPolicy( + numInitialSyncOplogFindAttempts.load(), + executor::RemoteCommandRequest::kNoTimeout, + RemoteCommandRetryScheduler::kAllRetriableErrors)); Status scheduleStatus = _fCVFetcher->schedule(); if (!scheduleStatus.isOK()) { _fCVFetcher.reset(); @@ -721,7 +719,7 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForBeginTimestamp( void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard, - const OpTimeWithHash& lastOpTimeWithHash) { + const OpTime& lastOpTime) { stdx::unique_lock<stdx::mutex> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock( result.getStatus(), "error while getting the remote feature compatibility version"); @@ -789,7 +787,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> _databasesClonerCallback(status, onCompletionGuard); })); - _initialSyncState->beginTimestamp = lastOpTimeWithHash.opTime.getTimestamp(); + _initialSyncState->beginTimestamp = lastOpTime.getTimestamp(); invariant(!result.getValue().documents.empty()); LOG(2) << "Setting begin timestamp to " << _initialSyncState->beginTimestamp @@ -808,7 +806,7 @@ void InitialSyncer::_fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse> const auto& config = configResult.getValue(); _oplogFetcher = stdx::make_unique<OplogFetcher>( _exec, - lastOpTimeWithHash, + lastOpTime, _syncSource, _opts.remoteOplogNS, config, @@ -873,7 +871,7 @@ void InitialSyncer::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus std::shared_ptr<OnCompletionGuard> onCompletionGuard) { stdx::lock_guard<stdx::mutex> lock(_mutex); log() << "Finished fetching oplog during initial sync: " << redact(oplogFetcherFinishStatus) - << ". Last fetched optime and hash: " << _lastFetched.toString(); + << ". Last fetched optime: " << _lastFetched.toString(); auto status = _checkForShutdownAndConvertStatus_inlock( oplogFetcherFinishStatus, "error fetching oplog during initial sync"); @@ -888,7 +886,7 @@ void InitialSyncer::_oplogFetcherCallback(const Status& oplogFetcherFinishStatus // an OK status is when the 'stopReplProducer' fail point is enabled, which causes the // OplogFetcher to ignore the current sync source response and return early. if (status.isOK()) { - log() << "Finished fetching oplog fetching early. Last fetched optime and hash: " + log() << "Finished fetching oplog fetching early. Last fetched optime: " << _lastFetched.toString(); return; } @@ -930,7 +928,7 @@ void InitialSyncer::_databasesClonerCallback(const Status& databaseClonerFinishS void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( const StatusWith<Fetcher::QueryResponse>& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { - OpTimeWithHash optimeWithHash; + OpTime optime; { stdx::lock_guard<stdx::mutex> lock(_mutex); auto status = _checkForShutdownAndConvertStatus_inlock( @@ -940,17 +938,17 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( return; } - auto&& optimeWithHashStatus = parseOpTimeWithHash(result); - if (!optimeWithHashStatus.isOK()) { - onCompletionGuard->setResultAndCancelRemainingWork_inlock( - lock, optimeWithHashStatus.getStatus()); + auto&& optimeStatus = parseOpTime(result); + if (!optimeStatus.isOK()) { + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, + optimeStatus.getStatus()); return; } - optimeWithHash = optimeWithHashStatus.getValue(); - _initialSyncState->stopTimestamp = optimeWithHash.opTime.getTimestamp(); + optime = optimeStatus.getValue(); + _initialSyncState->stopTimestamp = optime.getTimestamp(); if (_initialSyncState->beginTimestamp != _initialSyncState->stopTimestamp) { - invariant(_lastApplied.opTime.isNull()); + invariant(_lastApplied.isNull()); _checkApplierProgressAndScheduleGetNextApplierBatch_inlock(lock, onCompletionGuard); return; } @@ -969,23 +967,22 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackForStopTimestamp( // override its behavior in tests. See InitialSyncerReturnsCallbackCanceledAndDoesNot- // ScheduleRollbackCheckerIfShutdownAfterInsertingInsertOplogSeedDocument in // initial_syncer_test.cpp - auto status = _storage->insertDocument( - opCtx.get(), - _opts.localOplogNS, - TimestampedBSONObj{oplogSeedDoc, optimeWithHash.opTime.getTimestamp()}, - optimeWithHash.opTime.getTerm()); + auto status = + _storage->insertDocument(opCtx.get(), + _opts.localOplogNS, + TimestampedBSONObj{oplogSeedDoc, optime.getTimestamp()}, + optime.getTerm()); if (!status.isOK()) { stdx::lock_guard<stdx::mutex> lock(_mutex); onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, status); return; } const bool orderedCommit = true; - _storage->oplogDiskLocRegister( - opCtx.get(), optimeWithHash.opTime.getTimestamp(), orderedCommit); + _storage->oplogDiskLocRegister(opCtx.get(), optime.getTimestamp(), orderedCommit); } stdx::lock_guard<stdx::mutex> lock(_mutex); - _lastApplied = optimeWithHash; + _lastApplied = optime; log() << "No need to apply operations. (currently at " << _initialSyncState->stopTimestamp.toBSON() << ")"; @@ -1033,8 +1030,7 @@ void InitialSyncer::_getNextApplierBatchCallback( MultiApplier::Operations ops) { return _oplogApplier->multiApply(opCtx, std::move(ops)); }; - const auto& lastEntry = ops.back(); - OpTimeWithHash lastApplied(lastEntry.getHash(), lastEntry.getOpTime()); + OpTime lastApplied = ops.back().getOpTime(); auto numApplied = ops.size(); MultiApplier::CallbackFn onCompletionFn = [=](const Status& s) { return _multiApplierCallback(s, lastApplied, numApplied, onCompletionGuard); @@ -1081,7 +1077,7 @@ void InitialSyncer::_getNextApplierBatchCallback( } void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus, - OpTimeWithHash lastApplied, + OpTime lastApplied, std::uint32_t numApplied, std::shared_ptr<OnCompletionGuard> onCompletionGuard) { stdx::lock_guard<stdx::mutex> lock(_mutex); @@ -1095,8 +1091,7 @@ void InitialSyncer::_multiApplierCallback(const Status& multiApplierStatus, _initialSyncState->appliedOps += numApplied; _lastApplied = lastApplied; - _opts.setMyLastOptime(_lastApplied.opTime, - ReplicationCoordinator::DataConsistency::Inconsistent); + _opts.setMyLastOptime(_lastApplied, ReplicationCoordinator::DataConsistency::Inconsistent); auto fetchCount = _fetchCount.load(); if (fetchCount > 0) { @@ -1132,17 +1127,16 @@ void InitialSyncer::_lastOplogEntryFetcherCallbackAfterFetchingMissingDocuments( return; } - auto&& optimeWithHashStatus = parseOpTimeWithHash(result); - if (!optimeWithHashStatus.isOK()) { + auto&& optimeStatus = parseOpTime(result); + if (!optimeStatus.isOK()) { error() << "Failed to parse new minValid from source " << _syncSource << " due to '" - << redact(optimeWithHashStatus.getStatus()) << "'"; - onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, - optimeWithHashStatus.getStatus()); + << redact(optimeStatus.getStatus()) << "'"; + onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, optimeStatus.getStatus()); return; } - auto&& optimeWithHash = optimeWithHashStatus.getValue(); + auto&& optime = optimeStatus.getValue(); - const auto newOplogEnd = optimeWithHash.opTime.getTimestamp(); + const auto newOplogEnd = optime.getTimestamp(); LOG(2) << "Pushing back minValid from " << _initialSyncState->stopTimestamp << " to " << newOplogEnd; _initialSyncState->stopTimestamp = newOplogEnd; @@ -1189,7 +1183,7 @@ void InitialSyncer::_rollbackCheckerCheckForRollbackCallback( onCompletionGuard->setResultAndCancelRemainingWork_inlock(lock, _lastApplied); } -void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>& lastApplied) { +void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTime>& lastApplied) { // Since _finishInitialSyncAttempt can be called from any component's callback function or // scheduled task, it is possible that we may not be in a TaskExecutor-managed thread when this // function is invoked. @@ -1276,7 +1270,7 @@ void InitialSyncer::_finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>& finishCallbackGuard.dismiss(); } -void InitialSyncer::_finishCallback(StatusWith<OpTimeWithHash> lastApplied) { +void InitialSyncer::_finishCallback(StatusWith<OpTime> lastApplied) { // After running callback function, clear '_onCompletion' to release any resources that might be // held by this function object. // '_onCompletion' must be moved to a temporary copy and destroyed outside the lock in case @@ -1372,16 +1366,16 @@ void InitialSyncer::_checkApplierProgressAndScheduleGetNextApplierBatch_inlock( return; } - if (_lastApplied.opTime.isNull()) { + if (_lastApplied.isNull()) { // Check if any ops occurred while cloning. invariant(_initialSyncState->beginTimestamp < _initialSyncState->stopTimestamp); log() << "Applying operations until " << _initialSyncState->stopTimestamp.toBSON() << " before initial sync can complete. (starting at " << _initialSyncState->beginTimestamp.toBSON() << ")"; // Fall through to scheduling _getNextApplierBatchCallback(). - } else if (_lastApplied.opTime.getTimestamp() >= _initialSyncState->stopTimestamp) { + } else if (_lastApplied.getTimestamp() >= _initialSyncState->stopTimestamp) { // Check for rollback if we have applied far enough to be consistent. - invariant(!_lastApplied.opTime.getTimestamp().isNull()); + invariant(!_lastApplied.getTimestamp().isNull()); _scheduleRollbackCheckerCheckForRollback_inlock(lock, onCompletionGuard); return; } @@ -1530,11 +1524,11 @@ StatusWith<Operations> InitialSyncer::_getNextApplierBatch_inlock() { } StatusWith<HostAndPort> InitialSyncer::_chooseSyncSource_inlock() { - auto syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched.opTime); + auto syncSource = _opts.syncSourceSelector->chooseNewSyncSource(_lastFetched); if (syncSource.empty()) { return Status{ErrorCodes::InvalidSyncSource, str::stream() << "No valid sync source available. Our last fetched optime: " - << _lastFetched.opTime.toString()}; + << _lastFetched.toString()}; } return syncSource; } diff --git a/src/mongo/db/repl/initial_syncer.h b/src/mongo/db/repl/initial_syncer.h index 308fdb6adaf..c84658092fb 100644 --- a/src/mongo/db/repl/initial_syncer.h +++ b/src/mongo/db/repl/initial_syncer.h @@ -144,14 +144,14 @@ class InitialSyncer { public: /** - * Callback function to report last applied optime (with hash) of initial sync. + * Callback function to report last applied optime of initial sync. */ - typedef stdx::function<void(const StatusWith<OpTimeWithHash>& lastApplied)> OnCompletionFn; + typedef stdx::function<void(const StatusWith<OpTime>& lastApplied)> OnCompletionFn; /** * Callback completion guard for initial syncer. */ - using OnCompletionGuard = CallbackCompletionGuard<StatusWith<OpTimeWithHash>>; + using OnCompletionGuard = CallbackCompletionGuard<StatusWith<OpTime>>; using StartCollectionClonerFn = DatabaseCloner::StartCollectionClonerFn; @@ -360,7 +360,7 @@ private: /** * Tears down internal state before reporting final status to caller. */ - void _tearDown_inlock(OperationContext* opCtx, const StatusWith<OpTimeWithHash>& lastApplied); + void _tearDown_inlock(OperationContext* opCtx, const StatusWith<OpTime>& lastApplied); /** * Callback to start a single initial sync attempt. @@ -410,7 +410,7 @@ private: */ void _fcvFetcherCallback(const StatusWith<Fetcher::QueryResponse>& result, std::shared_ptr<OnCompletionGuard> onCompletionGuard, - const OpTimeWithHash& lastOpTimeWithHash); + const OpTime& lastOpTime); /** * Callback for oplog fetcher. @@ -443,7 +443,7 @@ private: * Callback for MultiApplier completion. */ void _multiApplierCallback(const Status& status, - OpTimeWithHash lastApplied, + OpTime lastApplied, std::uint32_t numApplied, std::shared_ptr<OnCompletionGuard> onCompletionGuard); @@ -469,12 +469,12 @@ private: * Reports result of current initial sync attempt. May schedule another initial sync attempt * depending on shutdown state and whether we've exhausted all initial sync retries. */ - void _finishInitialSyncAttempt(const StatusWith<OpTimeWithHash>& lastApplied); + void _finishInitialSyncAttempt(const StatusWith<OpTime>& lastApplied); /** * Invokes completion callback and transitions state to State::kComplete. */ - void _finishCallback(StatusWith<OpTimeWithHash> lastApplied); + void _finishCallback(StatusWith<OpTime> lastApplied); // Obtains a valid sync source from the sync source selector. // Returns error if a sync source cannot be found. @@ -590,8 +590,8 @@ private: ReplicationProcess* _replicationProcess; // (S) // This is invoked with the final status of the initial sync. If startup() fails, this callback - // is never invoked. The caller gets the last applied optime with hash when the initial sync - // completes successfully or an error status. + // is never invoked. The caller gets the last applied optime when the initial sync completes + // successfully or an error status. // '_onCompletion' is cleared on completion (in _finishCallback()) in order to release any // resources that might be held by the callback function object. OnCompletionFn _onCompletion; // (M) @@ -620,8 +620,8 @@ private: std::unique_ptr<Fetcher> _fCVFetcher; // (S) std::unique_ptr<MultiApplier> _applier; // (M) HostAndPort _syncSource; // (M) - OpTimeWithHash _lastFetched; // (MX) - OpTimeWithHash _lastApplied; // (MX) + OpTime _lastFetched; // (MX) + OpTime _lastApplied; // (MX) std::unique_ptr<OplogBuffer> _oplogBuffer; // (M) std::unique_ptr<OplogApplier::Observer> _observer; // (S) std::unique_ptr<OplogApplier> _oplogApplier; // (M) diff --git a/src/mongo/db/repl/initial_syncer_test.cpp b/src/mongo/db/repl/initial_syncer_test.cpp index 620d9c5abff..cfd9b4bed4c 100644 --- a/src/mongo/db/repl/initial_syncer_test.cpp +++ b/src/mongo/db/repl/initial_syncer_test.cpp @@ -386,7 +386,7 @@ protected: _externalState = dataReplicatorExternalState.get(); _lastApplied = getDetectableErrorStatus(); - _onCompletion = [this](const StatusWith<OpTimeWithHash>& lastApplied) { + _onCompletion = [this](const StatusWith<OpTime>& lastApplied) { _lastApplied = lastApplied; }; @@ -400,9 +400,7 @@ protected: _dbWorkThreadPool.get(), _storageInterface.get(), _replicationProcess.get(), - [this](const StatusWith<OpTimeWithHash>& lastApplied) { - _onCompletion(lastApplied); - }); + [this](const StatusWith<OpTime>& lastApplied) { _onCompletion(lastApplied); }); _initialSyncer->setScheduleDbWorkFn_forTest( [this](executor::TaskExecutor::CallbackFn work) { return getExecutor().scheduleWork(std::move(work)); @@ -473,7 +471,7 @@ protected: std::map<NamespaceString, CollectionMockStats> _collectionStats; std::map<NamespaceString, CollectionCloneInfo> _collections; - StatusWith<OpTimeWithHash> _lastApplied = Status(ErrorCodes::NotYetInitialized, ""); + StatusWith<OpTime> _lastApplied = Status(ErrorCodes::NotYetInitialized, ""); InitialSyncer::OnCompletionFn _onCompletion; private: @@ -564,7 +562,7 @@ OplogEntry makeOplogEntry(int t, << "a_1"); } return OplogEntry(OpTime(Timestamp(t, 1), 1), // optime - static_cast<long long>(t), // hash + boost::none, // hash opType, // op type NamespaceString("a.a"), // namespace boost::none, // uuid @@ -631,7 +629,7 @@ TEST_F(InitialSyncerTest, InvalidConstruction) { ReplicationCoordinator::DataConsistency consistency) {}; options.resetOptimes = []() {}; options.syncSourceSelector = this; - auto callback = [](const StatusWith<OpTimeWithHash>&) {}; + auto callback = [](const StatusWith<OpTime>&) {}; // Null task executor in external state. { @@ -953,7 +951,7 @@ TEST_F(InitialSyncerTest, InitialSyncerTransitionsToCompleteWhenFinishCallbackTh auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); - _onCompletion = [this](const StatusWith<OpTimeWithHash>& lastApplied) { + _onCompletion = [this](const StatusWith<OpTime>& lastApplied) { _lastApplied = lastApplied; uassert(ErrorCodes::InternalError, "", false); }; @@ -994,7 +992,7 @@ TEST_F(InitialSyncerTest, InitialSyncerResetsOnCompletionCallbackFunctionPointer _dbWorkThreadPool.get(), _storageInterface.get(), _replicationProcess.get(), - [&lastApplied, sharedCallbackData](const StatusWith<OpTimeWithHash>& result) { + [&lastApplied, sharedCallbackData](const StatusWith<OpTime>& result) { lastApplied = result; }); ON_BLOCK_EXIT([this]() { getExecutor().shutdown(); }); @@ -1295,30 +1293,6 @@ TEST_F(InitialSyncerTest, } TEST_F(InitialSyncerTest, - InitialSyncerReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingHash) { - auto initialSyncer = &getInitialSyncer(); - auto opCtx = makeOpCtx(); - - _syncSourceSelector->setChooseNewSyncSourceResult_forTest(HostAndPort("localhost", 12345)); - ASSERT_OK(initialSyncer->startup(opCtx.get(), maxAttempts)); - - auto net = getNet(); - { - executor::NetworkInterfaceMock::InNetworkGuard guard(net); - - // Base rollback ID. - net->scheduleSuccessfulResponse(makeRollbackCheckerResponse(1)); - net->runReadyNetworkOperations(); - - // Last oplog entry. - processSuccessfulLastOplogEntryFetcherResponse({BSONObj()}); - } - - initialSyncer->join(); - ASSERT_EQUALS(ErrorCodes::NoSuchKey, _lastApplied); -} - -TEST_F(InitialSyncerTest, InitialSyncerReturnsNoSuchKeyIfLastOplogEntryFetcherReturnsEntryWithMissingTimestamp) { auto initialSyncer = &getInitialSyncer(); auto opCtx = makeOpCtx(); @@ -1335,7 +1309,7 @@ TEST_F(InitialSyncerTest, net->runReadyNetworkOperations(); // Last oplog entry. - processSuccessfulLastOplogEntryFetcherResponse({BSON("h" << 1LL)}); + processSuccessfulLastOplogEntryFetcherResponse({BSONObj()}); } initialSyncer->join(); @@ -1736,7 +1710,7 @@ TEST_F(InitialSyncerTest, } initialSyncer->join(); - ASSERT_EQUALS(makeOplogEntry(1).getOpTime(), unittest::assertGet(_lastApplied).opTime); + ASSERT_EQUALS(makeOplogEntry(1).getOpTime(), unittest::assertGet(_lastApplied)); } TEST_F( @@ -1789,7 +1763,7 @@ TEST_F( } initialSyncer->join(); - ASSERT_EQUALS(makeOplogEntry(3).getOpTime(), unittest::assertGet(_lastApplied).opTime); + ASSERT_EQUALS(makeOplogEntry(3).getOpTime(), unittest::assertGet(_lastApplied)); } TEST_F( @@ -2354,8 +2328,10 @@ TEST_F( net->blackHole(noi); // Second last oplog entry fetcher. - processSuccessfulLastOplogEntryFetcherResponse({BSON("ts" << Timestamp(1) << "t" << 1 << "h" - << "not a hash")}); + processSuccessfulLastOplogEntryFetcherResponse({BSON("ts" + << "not a timestamp" + << "t" + << 1)}); // _lastOplogEntryFetcherCallbackAfterCloningData() will shut down the OplogFetcher after // setting the completion status. @@ -2932,8 +2908,7 @@ TEST_F(InitialSyncerTest, LastOpTimeShouldBeSetEvenIfNoOperationsAreAppliedAfter } initialSyncer->join(); - ASSERT_EQUALS(oplogEntry.getOpTime(), unittest::assertGet(_lastApplied).opTime); - ASSERT_EQUALS(oplogEntry.getHash(), unittest::assertGet(_lastApplied).value); + ASSERT_EQUALS(oplogEntry.getOpTime(), unittest::assertGet(_lastApplied)); ASSERT_FALSE(_replicationProcess->getConsistencyMarkers()->getInitialSyncFlag(opCtx.get())); } @@ -3484,8 +3459,7 @@ OplogEntry InitialSyncerTest::doInitialSyncWithOneBatch(bool shouldSetFCV) { void InitialSyncerTest::doSuccessfulInitialSyncWithOneBatch(bool shouldSetFCV) { auto lastOp = doInitialSyncWithOneBatch(shouldSetFCV); serverGlobalParams.featureCompatibility.reset(); - ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime); - ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value); + ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied)); ASSERT_EQUALS(lastOp.getOpTime().getTimestamp(), _storageInterface->getInitialDataTimestamp()); } @@ -3598,8 +3572,7 @@ TEST_F(InitialSyncerTest, } initialSyncer->join(); - ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime); - ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value); + ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied)); } TEST_F( @@ -3688,8 +3661,7 @@ TEST_F( } initialSyncer->join(); - ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied).opTime); - ASSERT_EQUALS(lastOp.getHash(), unittest::assertGet(_lastApplied).value); + ASSERT_EQUALS(lastOp.getOpTime(), unittest::assertGet(_lastApplied)); ASSERT_TRUE(fetchCountIncremented); @@ -3993,7 +3965,7 @@ TEST_F(InitialSyncerTest, GetInitialSyncProgressReturnsCorrectProgress) { log() << "waiting for initial sync to verify it completed OK"; initialSyncer->join(); - ASSERT_EQUALS(makeOplogEntry(7).getOpTime(), unittest::assertGet(_lastApplied).opTime); + ASSERT_EQUALS(makeOplogEntry(7).getOpTime(), unittest::assertGet(_lastApplied)); progress = initialSyncer->getInitialSyncProgress(); log() << "Progress at end: " << progress; diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp index 2e8513fa6c4..2e5bbe84020 100644 --- a/src/mongo/db/repl/multiapplier_test.cpp +++ b/src/mongo/db/repl/multiapplier_test.cpp @@ -68,7 +68,7 @@ void MultiApplierTest::setUp() { */ OplogEntry makeOplogEntry(int ts) { return OplogEntry(OpTime(Timestamp(ts, 1), 1), // optime - 1LL, // hash + boost::none, // hash OpTypeEnum::kNoop, // op type NamespaceString("a.a"), // namespace boost::none, // uuid diff --git a/src/mongo/db/repl/oplog_buffer_collection_test.cpp b/src/mongo/db/repl/oplog_buffer_collection_test.cpp index 9c42f322b11..4d7080cd754 100644 --- a/src/mongo/db/repl/oplog_buffer_collection_test.cpp +++ b/src/mongo/db/repl/oplog_buffer_collection_test.cpp @@ -109,7 +109,7 @@ NamespaceString makeNamespace(const T& t, const char* suffix = "") { * Generates oplog entries with the given number used for the timestamp. */ BSONObj makeOplogEntry(int t) { - return BSON("ts" << Timestamp(t, t) << "h" << t << "ns" + return BSON("ts" << Timestamp(t, t) << "ns" << "a.a" << "v" << 2 diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 90e2a633c2e..96acba824d3 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -85,7 +85,7 @@ OplogEntry::CommandType parseCommandType(const BSONObj& objectField) { * Returns a document representing an oplog entry with the given fields. */ BSONObj makeOplogEntryDoc(OpTime opTime, - long long hash, + const boost::optional<long long> hash, OpTypeEnum opType, const NamespaceString& nss, const boost::optional<UUID>& uuid, @@ -104,10 +104,12 @@ BSONObj makeOplogEntryDoc(OpTime opTime, sessionInfo.serialize(&builder); builder.append(OplogEntryBase::kTimestampFieldName, opTime.getTimestamp()); builder.append(OplogEntryBase::kTermFieldName, opTime.getTerm()); - builder.append(OplogEntryBase::kHashFieldName, hash); builder.append(OplogEntryBase::kVersionFieldName, version); builder.append(OplogEntryBase::kOpTypeFieldName, OpType_serializer(opType)); builder.append(OplogEntryBase::kNssFieldName, nss.toString()); + if (hash) { + builder.append(OplogEntryBase::kHashFieldName, hash.get()); + } if (uuid) { uuid->appendToBuilder(&builder, OplogEntryBase::kUuidFieldName); } @@ -209,7 +211,7 @@ OplogEntry::OplogEntry(BSONObj rawInput) : raw(std::move(rawInput)) { } OplogEntry::OplogEntry(OpTime opTime, - long long hash, + const boost::optional<long long> hash, OpTypeEnum opType, const NamespaceString& nss, const boost::optional<UUID>& uuid, diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 1cd68331df6..fd0535ff127 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -83,7 +83,7 @@ public: static StatusWith<OplogEntry> parse(const BSONObj& object); OplogEntry(OpTime opTime, - long long hash, + const boost::optional<long long> hash, OpTypeEnum opType, const NamespaceString& nss, const boost::optional<UUID>& uuid, diff --git a/src/mongo/db/repl/oplog_entry.idl b/src/mongo/db/repl/oplog_entry.idl index ff3fd6d07f6..8ee6d00ee56 100644 --- a/src/mongo/db/repl/oplog_entry.idl +++ b/src/mongo/db/repl/oplog_entry.idl @@ -100,6 +100,7 @@ structs: h: cpp_name: hash type: long + optional: true # Only used for backwards compatibility with 4.0 and before. description: "The hash of the oplog entry" v: cpp_name: version diff --git a/src/mongo/db/repl/oplog_fetcher.cpp b/src/mongo/db/repl/oplog_fetcher.cpp index 0b0e0990ecc..7850334f478 100644 --- a/src/mongo/db/repl/oplog_fetcher.cpp +++ b/src/mongo/db/repl/oplog_fetcher.cpp @@ -111,7 +111,7 @@ BSONObj makeMetadataObject() { /** * Checks the first batch of results from query. * 'documents' are the first batch of results returned from tailing the remote oplog. - * 'lastFetched' optime and hash should be consistent with the predicate in the query. + * 'lastFetched' optime should be consistent with the predicate in the query. * 'lastOpCommitted' is the OpTime of the most recently committed op of which this node is aware. * 'remoteLastOpApplied' is the last OpTime applied on the sync source. This is optional for * compatibility with 3.4 servers that do not send OplogQueryMetadata. @@ -132,7 +132,7 @@ BSONObj makeMetadataObject() { * the remote oplog. */ Status checkRemoteOplogStart(const Fetcher::Documents& documents, - OpTimeWithHash lastFetched, + OpTime lastFetched, OpTime lastOpCommitted, boost::optional<OpTime> remoteLastOpApplied, boost::optional<OpTime> remoteLastOpCommitted, @@ -163,12 +163,12 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents, // failed to detect the rollback if it occurred between sync source selection (when we check the // candidate is ahead of us) and sync source resolution (when we got 'requiredRBID'). If the // sync source is now behind us, choose a new sync source to prevent going into rollback. - if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched.opTime)) { + if (remoteLastOpApplied && (*remoteLastOpApplied < lastFetched)) { return Status(ErrorCodes::InvalidSyncSource, str::stream() << "Sync source's last applied OpTime " << remoteLastOpApplied->toString() << " is older than our last fetched OpTime " - << lastFetched.opTime.toString() + << lastFetched.toString() << ". Choosing new sync source."); } @@ -183,13 +183,13 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents, // OpTime to determine where to start our OplogFetcher. if (requireFresherSyncSource && remoteLastOpApplied && remoteLastOpCommitted && std::tie(*remoteLastOpApplied, *remoteLastOpCommitted) <= - std::tie(lastFetched.opTime, lastOpCommitted)) { + std::tie(lastFetched, lastOpCommitted)) { return Status(ErrorCodes::InvalidSyncSource, str::stream() << "Sync source cannot be behind me, and if I am up-to-date with the " "sync source, it must have a higher lastOpCommitted. " << "My last fetched oplog optime: " - << lastFetched.opTime.toString() + << lastFetched.toString() << ", latest oplog optime of sync source: " << remoteLastOpApplied->toString() << ", my lastOpCommitted: " @@ -212,33 +212,16 @@ Status checkRemoteOplogStart(const Fetcher::Documents& documents, auto opTimeResult = OpTime::parseFromOplogEntry(o); if (!opTimeResult.isOK()) { return Status(ErrorCodes::InvalidBSON, - str::stream() << "our last op time fetched: " << lastFetched.opTime.toString() - << " (hash: " - << lastFetched.value - << ")" + str::stream() << "our last optime fetched: " << lastFetched.toString() << ". failed to parse optime from first oplog on source: " << o.toString() << ": " << opTimeResult.getStatus().toString()); } auto opTime = opTimeResult.getValue(); - long long hash = o["h"].numberLong(); - if (opTime != lastFetched.opTime || hash != lastFetched.value) { - std::string message = str::stream() - << "Our last op time fetched: " << lastFetched.opTime.toString() - << ". source's GTE: " << opTime.toString() << " hashes: (" << lastFetched.value << "/" - << hash << ")"; - - // In PV1, if the hashes do not match, the optimes should not either since optimes uniquely - // identify oplog entries. In that case we fail before we potentially corrupt data. This - // should never happen. - if (opTime.getTerm() != OpTime::kUninitializedTerm && hash != lastFetched.value && - opTime == lastFetched.opTime) { - severe() << "Hashes do not match but OpTimes do. " << message - << ". Source's GTE doc: " << redact(o); - fassertFailedNoTrace(40634); - } - + if (opTime != lastFetched) { + std::string message = str::stream() << "Our last optime fetched: " << lastFetched.toString() + << ". source's GTE: " << opTime.toString(); return Status(ErrorCodes::OplogStartMissing, message); } return Status::OK(); @@ -292,14 +275,14 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( continue; } - auto docOpTimeWithHash = AbstractOplogFetcher::parseOpTimeWithHash(doc); - if (!docOpTimeWithHash.isOK()) { - return docOpTimeWithHash.getStatus(); + auto docOpTime = OpTime::parseFromOplogEntry(doc); + if (!docOpTime.isOK()) { + return docOpTime.getStatus(); } - info.lastDocument = docOpTimeWithHash.getValue(); + info.lastDocument = docOpTime.getValue(); // Check to see if the oplog entry goes back in time for this document. - const auto docTS = info.lastDocument.opTime.getTimestamp(); + const auto docTS = info.lastDocument.getTimestamp(); if (lastTS >= docTS) { return Status(ErrorCodes::OplogOutOfOrder, str::stream() << "Out of order entries in oplog. lastTS: " @@ -330,7 +313,7 @@ StatusWith<OplogFetcher::DocumentsInfo> OplogFetcher::validateDocuments( } OplogFetcher::OplogFetcher(executor::TaskExecutor* executor, - OpTimeWithHash lastFetched, + OpTime lastFetched, HostAndPort source, NamespaceString nss, ReplSetConfig config, @@ -436,7 +419,7 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons auto oqMetadata = oqMetadataResult.getValue(); // This lastFetched value is the last OpTime from the previous batch. - auto lastFetched = _getLastOpTimeWithHashFetched(); + auto lastFetched = _getLastOpTimeFetched(); // Check start of remote oplog and, if necessary, stop fetcher to execute rollback. if (queryResponse.first) { @@ -465,8 +448,8 @@ StatusWith<BSONObj> OplogFetcher::_onSuccessfulBatch(const Fetcher::QueryRespons firstDocToApply++; } - auto validateResult = OplogFetcher::validateDocuments( - documents, queryResponse.first, lastFetched.opTime.getTimestamp()); + auto validateResult = + OplogFetcher::validateDocuments(documents, queryResponse.first, lastFetched.getTimestamp()); if (!validateResult.isOK()) { return validateResult.getStatus(); } diff --git a/src/mongo/db/repl/oplog_fetcher.h b/src/mongo/db/repl/oplog_fetcher.h index 492301c2a10..42eace2f077 100644 --- a/src/mongo/db/repl/oplog_fetcher.h +++ b/src/mongo/db/repl/oplog_fetcher.h @@ -51,8 +51,8 @@ MONGO_FAIL_POINT_DECLARE(stopReplProducer); /** * The oplog fetcher, once started, reads operations from a remote oplog using a tailable cursor. * - * The initial find command is generated from last fetched optime and hash and may contain the - * current term depending on the replica set config provided. + * The initial find command is generated from last fetched optime and may contain the current term + * depending on the replica set config provided. * * Forwards metadata in each find/getMore response to the data replicator external state. * @@ -86,7 +86,7 @@ public: size_t networkDocumentBytes = 0; size_t toApplyDocumentCount = 0; size_t toApplyDocumentBytes = 0; - OpTimeWithHash lastDocument = {0, OpTime()}; + OpTime lastDocument = OpTime(); }; /** @@ -115,7 +115,7 @@ public: * Invariants if validation fails on any of the provided arguments. */ OplogFetcher(executor::TaskExecutor* executor, - OpTimeWithHash lastFetched, + OpTime lastFetched, HostAndPort source, NamespaceString nss, ReplSetConfig config, diff --git a/src/mongo/db/repl/oplog_fetcher_test.cpp b/src/mongo/db/repl/oplog_fetcher_test.cpp index a903bbffbca..70c3886cd40 100644 --- a/src/mongo/db/repl/oplog_fetcher_test.cpp +++ b/src/mongo/db/repl/oplog_fetcher_test.cpp @@ -113,8 +113,8 @@ void OplogFetcherTest::setUp() { rbid = 2; dataReplicatorExternalState = stdx::make_unique<DataReplicatorExternalStateMock>(); - dataReplicatorExternalState->currentTerm = lastFetched.opTime.getTerm(); - dataReplicatorExternalState->lastCommittedOpTime = {{9999, 0}, lastFetched.opTime.getTerm()}; + dataReplicatorExternalState->currentTerm = lastFetched.getTerm(); + dataReplicatorExternalState->lastCommittedOpTime = {{9999, 0}, lastFetched.getTerm()}; enqueueDocumentsFn = [this](Fetcher::Documents::const_iterator begin, Fetcher::Documents::const_iterator end, @@ -231,7 +231,7 @@ TEST_F( FindQueryContainsTermAndStartTimestampIfGetCurrentTermAndLastCommittedOpTimeReturnsValidTerm) { auto cmdObj = makeOplogFetcher(_createConfig())->getFindQuery_forTest(); ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type()); - ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.opTime.getTimestamp())), + ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())), cmdObj["filter"].Obj()); ASSERT_EQUALS(dataReplicatorExternalState->currentTerm, cmdObj["term"].numberLong()); _checkDefaultCommandObjectFields(cmdObj); @@ -242,7 +242,7 @@ TEST_F(OplogFetcherTest, dataReplicatorExternalState->currentTerm = OpTime::kUninitializedTerm; auto cmdObj = makeOplogFetcher(_createConfig())->getFindQuery_forTest(); ASSERT_EQUALS(mongo::BSONType::Object, cmdObj["filter"].type()); - ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.opTime.getTimestamp())), + ASSERT_BSONOBJ_EQ(BSON("ts" << BSON("$gte" << lastFetched.getTimestamp())), cmdObj["filter"].Obj()); ASSERT_FALSE(cmdObj.hasField("term")); _checkDefaultCommandObjectFields(cmdObj); @@ -283,7 +283,7 @@ TEST_F(OplogFetcherTest, InvalidOplogQueryMetadataInResponseStopsTheOplogFetcher TEST_F(OplogFetcherTest, ValidMetadataInResponseWithoutOplogMetadataShouldBeForwardedToProcessMetadataFn) { - rpc::ReplSetMetadata metadata(1, lastFetched.opTime, lastFetched.opTime, 1, OID::gen(), 2, 2); + rpc::ReplSetMetadata metadata(1, lastFetched, lastFetched, 1, OID::gen(), 2, 2); BSONObjBuilder bob; ASSERT_OK(metadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); @@ -356,7 +356,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsBehind) TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead) { rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(staleOpTime, lastFetched.opTime, rbid, 2, 2); + rpc::OplogQueryMetadata oqMetadata(staleOpTime, lastFetched, rbid, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -375,7 +375,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreNotProcessedWhenSyncSourceIsNotAhead TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadButHasHigherLastOpCommitted) { rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(remoteNewerOpTime, lastFetched.opTime, rbid, 2, 2); + rpc::OplogQueryMetadata oqMetadata(remoteNewerOpTime, lastFetched, rbid, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -397,7 +397,7 @@ TEST_F(OplogFetcherTest, ASSERT_OK(oqMetadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); - auto entry = makeNoopOplogEntry({123LL, staleOpTime}); + auto entry = makeNoopOplogEntry(staleOpTime); ASSERT_EQUALS( ErrorCodes::InvalidSyncSource, processSingleBatch( @@ -428,7 +428,7 @@ TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsCurrentButM TEST_F(OplogFetcherTest, MetadataAndBatchAreProcessedWhenSyncSourceIsNotAheadWithoutRequiringFresherSyncSource) { rpc::ReplSetMetadata replMetadata(1, OpTime(), OpTime(), 1, OID::gen(), -1, -1); - rpc::OplogQueryMetadata oqMetadata(staleOpTime, lastFetched.opTime, rbid, 2, 2); + rpc::OplogQueryMetadata oqMetadata(staleOpTime, lastFetched, rbid, 2, 2); BSONObjBuilder bob; ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); @@ -443,17 +443,16 @@ TEST_F(OplogFetcherTest, TEST_F(OplogFetcherTest, MetadataWithoutOplogQueryMetadataIsNotProcessedOnBatchThatTriggersRollback) { - rpc::ReplSetMetadata metadata(1, lastFetched.opTime, lastFetched.opTime, 1, OID::gen(), 2, 2); + rpc::ReplSetMetadata metadata(1, lastFetched, lastFetched, 1, OID::gen(), 2, 2); BSONObjBuilder bob; ASSERT_OK(metadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); - ASSERT_EQUALS(ErrorCodes::OplogStartMissing, - processSingleBatch( - {concatenate(makeCursorResponse( - 0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}), - metadataObj), - Milliseconds(0)}) - ->getStatus()); + ASSERT_EQUALS( + ErrorCodes::OplogStartMissing, + processSingleBatch( + {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456))}), metadataObj), + Milliseconds(0)}) + ->getStatus()); ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); } @@ -464,13 +463,12 @@ TEST_F(OplogFetcherTest, MetadataIsNotProcessedOnBatchThatTriggersRollback) { ASSERT_OK(replMetadata.writeToMetadata(&bob)); ASSERT_OK(oqMetadata.writeToMetadata(&bob)); auto metadataObj = bob.obj(); - ASSERT_EQUALS(ErrorCodes::OplogStartMissing, - processSingleBatch( - {concatenate(makeCursorResponse( - 0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}), - metadataObj), - Milliseconds(0)}) - ->getStatus()); + ASSERT_EQUALS( + ErrorCodes::OplogStartMissing, + processSingleBatch( + {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456))}), metadataObj), + Milliseconds(0)}) + ->getStatus()); ASSERT_FALSE(dataReplicatorExternalState->metadataWasProcessed); } @@ -498,24 +496,10 @@ TEST_F( OplogFetcherTest, LastOpTimeFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) { auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); - ASSERT_EQUALS(ErrorCodes::OplogStartMissing, - processSingleBatch( - {concatenate(makeCursorResponse( - 0, {makeNoopOplogEntry(Seconds(456), lastFetched.value)}), - metadataObj), - Milliseconds(0)}) - ->getStatus()); -} - -TEST_F(OplogFetcherTest, - LastHashFetchedDoesNotMatchFirstDocumentCausesOplogFetcherToStopWithOplogStartMissingError) { - auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); ASSERT_EQUALS( ErrorCodes::OplogStartMissing, processSingleBatch( - {concatenate(makeCursorResponse( - 0, {makeNoopOplogEntry(remoteNewerOpTime, lastFetched.value + 1)}), - metadataObj), + {concatenate(makeCursorResponse(0, {makeNoopOplogEntry(Seconds(456))}), metadataObj), Milliseconds(0)}) ->getStatus()); } @@ -541,9 +525,9 @@ TEST_F(OplogFetcherTest, TimestampsNotAdvancingInBatchCausesOplogFetcherStopWith ErrorCodes::OplogOutOfOrder, processSingleBatch({concatenate(makeCursorResponse(0, {makeNoopOplogEntry(lastFetched), - makeNoopOplogEntry(Seconds(1000), 1), - makeNoopOplogEntry(Seconds(2000), 1), - makeNoopOplogEntry(Seconds(1500), 1)}), + makeNoopOplogEntry(Seconds(1000)), + makeNoopOplogEntry(Seconds(2000)), + makeNoopOplogEntry(Seconds(1500))}), metadataObj), Milliseconds(0)}) ->getStatus()); @@ -553,8 +537,8 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenE auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); auto firstEntry = makeNoopOplogEntry(lastFetched); - auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200); - auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300); + auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); + auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry}; auto shutdownState = processSingleBatch( @@ -572,11 +556,10 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldExcludeFirstDocumentInFirstBatchWhenE ASSERT_EQUALS(size_t(secondEntry.objsize() + thirdEntry.objsize()), lastEnqueuedDocumentsInfo.toApplyDocumentBytes); - ASSERT_EQUALS(thirdEntry["h"].numberLong(), lastEnqueuedDocumentsInfo.lastDocument.value); ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), - lastEnqueuedDocumentsInfo.lastDocument.opTime); + lastEnqueuedDocumentsInfo.lastDocument); - // The last fetched optime and hash should be updated after pushing the operations into the + // The last fetched optime should be updated after pushing the operations into the // buffer and reflected in the shutdown callback arguments. ASSERT_OK(shutdownState->getStatus()); } @@ -585,8 +568,8 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) { auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); auto firstEntry = makeNoopOplogEntry(lastFetched); - auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200); - auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300); + auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); + auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry}; enqueueDocumentsFn = [](Fetcher::Documents::const_iterator, @@ -603,8 +586,8 @@ TEST_F(OplogFetcherTest, OplogFetcherShouldReportErrorsThrownFromCallback) { void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* replMetadata, rpc::OplogQueryMetadata* oqMetadata) { auto firstEntry = makeNoopOplogEntry(lastFetched); - auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200); - auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300); + auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); + auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); Fetcher::Documents documents{firstEntry, secondEntry, thirdEntry}; BSONObjBuilder bob; @@ -623,8 +606,7 @@ void OplogFetcherTest::testSyncSourceChecking(rpc::ReplSetMetadata* replMetadata // Sync source checking happens after we have successfully pushed the operations into // the buffer for the next replication phase (eg. applier). - // The last fetched optime and hash should be reflected in the shutdown callback - // arguments. + // The last fetched optime should be reflected in the shutdown callback arguments. ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, shutdownState->getStatus()); } @@ -639,7 +621,7 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithoutMetadataStopsTheOplogFetche } TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithReplSetMetadataStopsTheOplogFetcher) { - rpc::ReplSetMetadata metadata(lastFetched.opTime.getTerm(), + rpc::ReplSetMetadata metadata(lastFetched.getTerm(), {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, 1, @@ -657,7 +639,7 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithReplSetMetadataStopsTheOplogFe TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetcher) { rpc::ReplSetMetadata replMetadata( - lastFetched.opTime.getTerm(), OpTime(), OpTime(), 1, OID::gen(), -1, -1); + lastFetched.getTerm(), OpTime(), OpTime(), 1, OID::gen(), -1, -1); rpc::OplogQueryMetadata oqMetadata( {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, rbid, 2, 2); @@ -671,7 +653,7 @@ TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithBothMetadatasStopsTheOplogFetc TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceInReplSetMetadataStopsTheOplogFetcher) { - rpc::ReplSetMetadata metadata(lastFetched.opTime.getTerm(), + rpc::ReplSetMetadata metadata(lastFetched.getTerm(), {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, 1, @@ -689,7 +671,7 @@ TEST_F(OplogFetcherTest, TEST_F(OplogFetcherTest, FailedSyncSourceCheckWithSyncSourceHavingNoSyncSourceStopsTheOplogFetcher) { - rpc::ReplSetMetadata replMetadata(lastFetched.opTime.getTerm(), + rpc::ReplSetMetadata replMetadata(lastFetched.getTerm(), {{Seconds(10000), 0}, 1}, {{Seconds(20000), 0}, 1}, 1, @@ -729,7 +711,7 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling() { CursorId cursorId = 22LL; auto firstEntry = makeNoopOplogEntry(lastFetched); - auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.opTime.getTerm()}, 200); + auto secondEntry = makeNoopOplogEntry({{Seconds(456), 0}, lastFetched.getTerm()}); auto metadataObj = makeOplogQueryMetadataObject(remoteNewerOpTime, rbid, 2, 2); processNetworkResponse( @@ -741,8 +723,8 @@ RemoteCommandRequest OplogFetcherTest::testTwoBatchHandling() { ASSERT_BSONOBJ_EQ(secondEntry, lastEnqueuedDocuments[0]); // Set cursor ID to 0 in getMore response to indicate no more data available. - auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.opTime.getTerm()}, 300); - auto fourthEntry = makeNoopOplogEntry({{Seconds(1200), 0}, lastFetched.opTime.getTerm()}, 300); + auto thirdEntry = makeNoopOplogEntry({{Seconds(789), 0}, lastFetched.getTerm()}); + auto fourthEntry = makeNoopOplogEntry({{Seconds(1200), 0}, lastFetched.getTerm()}); auto request = processNetworkResponse(makeCursorResponse(0, {thirdEntry, fourthEntry}, false)); ASSERT_EQUALS(std::string("getMore"), request.cmdObj.firstElementFieldName()); @@ -773,7 +755,7 @@ TEST_F( } TEST_F(OplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundInAnyDocument) { - auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); + auto firstEntry = makeNoopOplogEntry(Seconds(123)); auto secondEntry = BSON("o" << BSON("msg" << "oplog entry without optime")); @@ -788,8 +770,8 @@ TEST_F(OplogFetcherTest, ValidateDocumentsReturnsNoSuchKeyIfTimestampIsNotFoundI TEST_F( OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInFirstEntryIsEqualToLastTimestampAndNotProcessingFirstBatch) { - auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); - auto secondEntry = makeNoopOplogEntry(Seconds(456), 200); + auto firstEntry = makeNoopOplogEntry(Seconds(123)); + auto secondEntry = makeNoopOplogEntry(Seconds(456)); ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, OplogFetcher::validateDocuments( @@ -800,8 +782,8 @@ TEST_F( } TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInSecondEntryIsBeforeFirst) { - auto firstEntry = makeNoopOplogEntry(Seconds(456), 100); - auto secondEntry = makeNoopOplogEntry(Seconds(123), 200); + auto firstEntry = makeNoopOplogEntry(Seconds(456)); + auto secondEntry = makeNoopOplogEntry(Seconds(123)); ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, OplogFetcher::validateDocuments( @@ -812,9 +794,9 @@ TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInSecondEn } TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEntryIsBeforeSecond) { - auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); - auto secondEntry = makeNoopOplogEntry(Seconds(789), 200); - auto thirdEntry = makeNoopOplogEntry(Seconds(456), 300); + auto firstEntry = makeNoopOplogEntry(Seconds(123)); + auto secondEntry = makeNoopOplogEntry(Seconds(789)); + auto thirdEntry = makeNoopOplogEntry(Seconds(456)); ASSERT_EQUALS(ErrorCodes::OplogOutOfOrder, OplogFetcher::validateDocuments( @@ -826,9 +808,9 @@ TEST_F(OplogFetcherTest, ValidateDocumentsReturnsOutOfOrderIfTimestampInThirdEnt TEST_F(OplogFetcherTest, ValidateDocumentsExcludesFirstDocumentInApplyCountAndBytesIfProcessingFirstBatch) { - auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); - auto secondEntry = makeNoopOplogEntry(Seconds(456), 200); - auto thirdEntry = makeNoopOplogEntry(Seconds(789), 300); + auto firstEntry = makeNoopOplogEntry(Seconds(123)); + auto secondEntry = makeNoopOplogEntry(Seconds(456)); + auto thirdEntry = makeNoopOplogEntry(Seconds(789)); auto info = unittest::assertGet(OplogFetcher::validateDocuments( {firstEntry, secondEntry, thirdEntry}, @@ -839,16 +821,14 @@ TEST_F(OplogFetcherTest, ASSERT_EQUALS(size_t(firstEntry.objsize() + secondEntry.objsize() + thirdEntry.objsize()), info.networkDocumentBytes); - ASSERT_EQUALS(300LL, info.lastDocument.value); - ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), - info.lastDocument.opTime); + ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument); } TEST_F(OplogFetcherTest, ValidateDocumentsIncludesFirstDocumentInApplyCountAndBytesIfNotProcessingFirstBatch) { - auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); - auto secondEntry = makeNoopOplogEntry(Seconds(456), 200); - auto thirdEntry = makeNoopOplogEntry(Seconds(789), 300); + auto firstEntry = makeNoopOplogEntry(Seconds(123)); + auto secondEntry = makeNoopOplogEntry(Seconds(456)); + auto thirdEntry = makeNoopOplogEntry(Seconds(789)); auto info = unittest::assertGet(OplogFetcher::validateDocuments( {firstEntry, secondEntry, thirdEntry}, false, Timestamp(Seconds(100), 0))); @@ -860,14 +840,12 @@ TEST_F(OplogFetcherTest, ASSERT_EQUALS(info.networkDocumentCount, info.toApplyDocumentCount); ASSERT_EQUALS(info.networkDocumentBytes, info.toApplyDocumentBytes); - ASSERT_EQUALS(300LL, info.lastDocument.value); - ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), - info.lastDocument.opTime); + ASSERT_EQUALS(unittest::assertGet(OpTime::parseFromOplogEntry(thirdEntry)), info.lastDocument); } TEST_F(OplogFetcherTest, - ValidateDocumentsReturnsDefaultLastDocumentHashAndOpTimeWhenThereAreNoDocumentsToApply) { - auto firstEntry = makeNoopOplogEntry(Seconds(123), 100); + ValidateDocumentsReturnsDefaultLastDocumentOpTimeWhenThereAreNoDocumentsToApply) { + auto firstEntry = makeNoopOplogEntry(Seconds(123)); auto info = unittest::assertGet(OplogFetcher::validateDocuments( {firstEntry}, @@ -880,8 +858,7 @@ TEST_F(OplogFetcherTest, ASSERT_EQUALS(0U, info.toApplyDocumentCount); ASSERT_EQUALS(0U, info.toApplyDocumentBytes); - ASSERT_EQUALS(0LL, info.lastDocument.value); - ASSERT_EQUALS(OpTime(), info.lastDocument.opTime); + ASSERT_EQUALS(OpTime(), info.lastDocument); } TEST_F(OplogFetcherTest, @@ -902,7 +879,6 @@ TEST_F(OplogFetcherTest, ASSERT_EQUALS(0U, info.toApplyDocumentCount); ASSERT_EQUALS(0U, info.toApplyDocumentBytes); - ASSERT_EQUALS(0LL, info.lastDocument.value); - ASSERT_EQUALS(OpTime(), info.lastDocument.opTime); + ASSERT_EQUALS(OpTime(), info.lastDocument); } } // namespace diff --git a/src/mongo/db/repl/oplog_interface_remote.cpp b/src/mongo/db/repl/oplog_interface_remote.cpp index 2cccb9707e6..f96528454c4 100644 --- a/src/mongo/db/repl/oplog_interface_remote.cpp +++ b/src/mongo/db/repl/oplog_interface_remote.cpp @@ -82,7 +82,7 @@ std::string OplogInterfaceRemote::toString() const { std::unique_ptr<OplogInterface::Iterator> OplogInterfaceRemote::makeIterator() const { const Query query = Query().sort(BSON("$natural" << -1)); - const BSONObj fields = BSON("ts" << 1 << "h" << 1); + const BSONObj fields = BSON("ts" << 1 << "t" << 1); return std::unique_ptr<OplogInterface::Iterator>( new OplogIteratorRemote(_getConnection()->query( NamespaceString(_collectionName), query, 0, 0, &fields, 0, _batchSize))); diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index 719370c8675..c739b696577 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -705,7 +705,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx, return; } - auto onCompletion = [this, startCompleted](const StatusWith<OpTimeWithHash>& status) { + auto onCompletion = [this, startCompleted](const StatusWith<OpTime>& status) { { stdx::lock_guard<stdx::mutex> lock(_mutex); if (status == ErrorCodes::CallbackCanceled) { @@ -723,7 +723,7 @@ void ReplicationCoordinatorImpl::_startDataReplication(OperationContext* opCtx, } const auto lastApplied = status.getValue(); - _setMyLastAppliedOpTime(lock, lastApplied.opTime, false, DataConsistency::Consistent); + _setMyLastAppliedOpTime(lock, lastApplied, false, DataConsistency::Consistent); } // Clear maint. mode. diff --git a/src/mongo/db/repl/replication_recovery_test.cpp b/src/mongo/db/repl/replication_recovery_test.cpp index 518d0a318bc..c7a4f5cbc5d 100644 --- a/src/mongo/db/repl/replication_recovery_test.cpp +++ b/src/mongo/db/repl/replication_recovery_test.cpp @@ -215,7 +215,7 @@ repl::OplogEntry _makeOplogEntry(repl::OpTime opTime, OperationSessionInfo sessionInfo = {}, boost::optional<Date_t> wallTime = boost::none) { return repl::OplogEntry(opTime, // optime - 1LL, // hash + boost::none, // hash opType, // opType testNs, // namespace boost::none, // uuid @@ -247,7 +247,6 @@ repl::OplogEntry _makeTransactionOplogEntry(repl::OpTime opTime, sessionInfo.serialize(&builder); builder.append("ts", opTime.getTimestamp()); builder.append("t", opTime.getTerm()); - builder.append("h", 1LL); builder.append("v", repl::OplogEntry::kOplogVersion); builder.append("op", "c"); builder.append("ns", testNs.toString()); diff --git a/src/mongo/db/repl/roll_back_local_operations.cpp b/src/mongo/db/repl/roll_back_local_operations.cpp index 6e63671dc1f..cdc0011fcf2 100644 --- a/src/mongo/db/repl/roll_back_local_operations.cpp +++ b/src/mongo/db/repl/roll_back_local_operations.cpp @@ -60,6 +60,10 @@ OpTime getOpTime(const OplogInterface::Iterator::Value& oplogValue) { return fassert(40298, OpTime::parseFromOplogEntry(oplogValue.first)); } +long long getTerm(const BSONObj& operation) { + return operation["t"].numberLong(); +} + Timestamp getTimestamp(const BSONObj& operation) { return operation["ts"].timestamp(); } @@ -68,14 +72,9 @@ Timestamp getTimestamp(const OplogInterface::Iterator::Value& oplogValue) { return getTimestamp(oplogValue.first); } -long long getHash(const BSONObj& operation) { - return operation["h"].Long(); -} - -long long getHash(const OplogInterface::Iterator::Value& oplogValue) { - return getHash(oplogValue.first); +long long getTerm(const OplogInterface::Iterator::Value& oplogValue) { + return getTerm(oplogValue.first); } - } // namespace RollBackLocalOperations::RollBackLocalOperations(const OplogInterface& localOplog, @@ -131,31 +130,17 @@ StatusWith<RollBackLocalOperations::RollbackCommonPoint> RollBackLocalOperations if (getTimestamp(_localOplogValue) == getTimestamp(operation)) { _scanned++; - if (getHash(_localOplogValue) == getHash(operation)) { + if (getTerm(_localOplogValue) == getTerm(operation)) { return RollbackCommonPoint(_localOplogValue.first, _localOplogValue.second); } - LOG(2) << "Local oplog entry to roll back: " << redact(_localOplogValue.first); - auto status = _rollbackOperation(_localOplogValue.first); - if (!status.isOK()) { - invariant(ErrorCodes::NoSuchKey != status.code()); - return status; - } - auto result = _localOplogIterator->next(); - if (!result.isOK()) { - return Status(ErrorCodes::NoMatchingDocument, - str::stream() << "reached beginning of local oplog: {" - << "scanned: " - << _scanned - << ", theirTime: " - << getTimestamp(operation).toString() - << ", ourTime: " - << getTimestamp(_localOplogValue).toString() - << "}"); - } - _localOplogValue = result.getValue(); + // We don't need to advance the localOplogIterator here because it is guaranteed to advance + // during the next call to onRemoteOperation. This is because before the next call to + // onRemoteOperation, the remote oplog iterator will advance and the new remote operation is + // guaranteed to have a timestamp less than the current local operation, which will trigger + // a call to get the next local operation. return Status(ErrorCodes::NoSuchKey, - "Unable to determine common point - same timestamp but different hash. " + "Unable to determine common point - same timestamp but different terms. " "Need to process additional remote operations."); } diff --git a/src/mongo/db/repl/roll_back_local_operations_test.cpp b/src/mongo/db/repl/roll_back_local_operations_test.cpp index 9ec88d7cf93..0a7d53702d1 100644 --- a/src/mongo/db/repl/roll_back_local_operations_test.cpp +++ b/src/mongo/db/repl/roll_back_local_operations_test.cpp @@ -47,9 +47,9 @@ namespace { using namespace mongo; using namespace mongo::repl; -BSONObj makeOp(long long seconds, long long hash) { +BSONObj makeOp(long long seconds, long long term = 1LL) { auto uuid = unittest::assertGet(UUID::parse("b4c66a44-c1ca-4d86-8d25-12e82fa2de5b")); - return BSON("ts" << Timestamp(seconds, seconds) << "h" << hash << "t" << seconds << "op" + return BSON("ts" << Timestamp(seconds, seconds) << "t" << term << "op" << "n" << "o" << BSONObj() @@ -60,8 +60,8 @@ BSONObj makeOp(long long seconds, long long hash) { } int recordId = 0; -OplogInterfaceMock::Operation makeOpAndRecordId(long long seconds, long long hash) { - return std::make_pair(makeOp(seconds, hash), RecordId(++recordId)); +OplogInterfaceMock::Operation makeOpAndRecordId(long long seconds, long long term = 1LL) { + return std::make_pair(makeOp(seconds), RecordId(++recordId)); } TEST(RollBackLocalOperationsTest, InvalidLocalOplogIterator) { @@ -84,7 +84,7 @@ TEST(RollBackLocalOperationsTest, InvalidLocalOplogIterator) { } TEST(RollBackLocalOperationsTest, InvalidRollbackOperationFunction) { - ASSERT_THROWS_CODE(RollBackLocalOperations(OplogInterfaceMock({makeOpAndRecordId(1, 0)}), + ASSERT_THROWS_CODE(RollBackLocalOperations(OplogInterfaceMock({makeOpAndRecordId(1)}), RollBackLocalOperations::RollbackOperationFn()), AssertionException, ErrorCodes::BadValue); @@ -93,17 +93,17 @@ TEST(RollBackLocalOperationsTest, InvalidRollbackOperationFunction) { TEST(RollBackLocalOperationsTest, EmptyLocalOplog) { OplogInterfaceMock localOplog; RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); }); - auto result = finder.onRemoteOperation(makeOp(1, 0)); + auto result = finder.onRemoteOperation(makeOp(1)); ASSERT_EQUALS(ErrorCodes::OplogStartMissing, result.getStatus().code()); } TEST(RollBackLocalOperationsTest, RollbackMultipleLocalOperations) { - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); OplogInterfaceMock::Operations localOperations({ - makeOpAndRecordId(5, 1), - makeOpAndRecordId(4, 1), - makeOpAndRecordId(3, 1), - makeOpAndRecordId(2, 1), + makeOpAndRecordId(5), + makeOpAndRecordId(4), + makeOpAndRecordId(3), + makeOpAndRecordId(2), commonOperation, }); OplogInterfaceMock localOplog(localOperations); @@ -126,9 +126,9 @@ TEST(RollBackLocalOperationsTest, RollbackMultipleLocalOperations) { } TEST(RollBackLocalOperationsTest, RollbackOperationFailed) { - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); OplogInterfaceMock::Operations localOperations({ - makeOpAndRecordId(2, 1), commonOperation, + makeOpAndRecordId(2), commonOperation, }); OplogInterfaceMock localOplog(localOperations); auto rollbackOperation = [&](const BSONObj& operation) { @@ -140,9 +140,9 @@ TEST(RollBackLocalOperationsTest, RollbackOperationFailed) { } TEST(RollBackLocalOperationsTest, EndOfLocalOplog) { - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); OplogInterfaceMock::Operations localOperations({ - makeOpAndRecordId(2, 1), + makeOpAndRecordId(2), }); OplogInterfaceMock localOplog(localOperations); RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); }); @@ -151,9 +151,9 @@ TEST(RollBackLocalOperationsTest, EndOfLocalOplog) { } TEST(RollBackLocalOperationsTest, SkipRemoteOperations) { - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); OplogInterfaceMock::Operations localOperations({ - makeOpAndRecordId(5, 1), makeOpAndRecordId(4, 1), makeOpAndRecordId(2, 1), commonOperation, + makeOpAndRecordId(5), makeOpAndRecordId(4), makeOpAndRecordId(2), commonOperation, }); OplogInterfaceMock localOplog(localOperations); auto i = localOperations.cbegin(); @@ -164,46 +164,12 @@ TEST(RollBackLocalOperationsTest, SkipRemoteOperations) { }; RollBackLocalOperations finder(localOplog, rollbackOperation); { - auto result = finder.onRemoteOperation(makeOp(6, 1)); + auto result = finder.onRemoteOperation(makeOp(6)); ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); ASSERT_TRUE(i == localOperations.cbegin()); } { - auto result = finder.onRemoteOperation(makeOp(3, 1)); - ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); - ASSERT_TRUE(std::distance(localOperations.cbegin(), i) == 2); - } - auto result = finder.onRemoteOperation(commonOperation.first); - ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), - result.getValue().getOpTime()); - ASSERT_EQUALS(commonOperation.second, result.getValue().getRecordId()); - ASSERT_FALSE(i == localOperations.cend()); - ASSERT_BSONOBJ_EQ(commonOperation.first, i->first); - i++; - ASSERT_TRUE(i == localOperations.cend()); -} - -TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashess) { - auto commonOperation = makeOpAndRecordId(1, 1); - OplogInterfaceMock::Operations localOperations({ - makeOpAndRecordId(1, 5), makeOpAndRecordId(1, 3), commonOperation, - }); - OplogInterfaceMock localOplog(localOperations); - auto i = localOperations.cbegin(); - auto rollbackOperation = [&](const BSONObj& operation) { - ASSERT_BSONOBJ_EQ(i->first, operation); - i++; - return Status::OK(); - }; - RollBackLocalOperations finder(localOplog, rollbackOperation); - { - auto result = finder.onRemoteOperation(makeOp(1, 4)); - ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); - ASSERT_TRUE(std::distance(localOperations.cbegin(), i) == 1); - } - { - auto result = finder.onRemoteOperation(makeOp(1, 2)); + auto result = finder.onRemoteOperation(makeOp(3)); ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); ASSERT_TRUE(std::distance(localOperations.cbegin(), i) == 2); } @@ -218,34 +184,24 @@ TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashess) { ASSERT_TRUE(i == localOperations.cend()); } -TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashesRollbackOperationFailed) { +TEST(RollBackLocalOperationsTest, SameTimestampDifferentTermsRollbackNoSuchKey) { auto commonOperation = makeOpAndRecordId(1, 1); OplogInterfaceMock::Operations localOperations({ - makeOpAndRecordId(1, 3), commonOperation, + makeOpAndRecordId(2, 3), commonOperation, }); OplogInterfaceMock localOplog(localOperations); auto rollbackOperation = [&](const BSONObj& operation) { return Status(ErrorCodes::OperationFailed, ""); }; RollBackLocalOperations finder(localOplog, rollbackOperation); - auto result = finder.onRemoteOperation(makeOp(1, 2)); - ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code()); -} - -TEST(RollBackLocalOperationsTest, SameTimestampDifferentHashesEndOfLocalOplog) { - OplogInterfaceMock::Operations localOperations({ - makeOpAndRecordId(1, 3), - }); - OplogInterfaceMock localOplog(localOperations); - RollBackLocalOperations finder(localOplog, [](const BSONObj&) { return Status::OK(); }); - auto result = finder.onRemoteOperation(makeOp(1, 2)); - ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); + auto result = finder.onRemoteOperation(makeOp(2, 2)); + ASSERT_EQUALS(ErrorCodes::NoSuchKey, result.getStatus().code()); } TEST(SyncRollBackLocalOperationsTest, OplogStartMissing) { ASSERT_EQUALS(ErrorCodes::OplogStartMissing, syncRollBackLocalOperations(OplogInterfaceMock(), - OplogInterfaceMock({makeOpAndRecordId(1, 0)}), + OplogInterfaceMock({makeOpAndRecordId(1)}), [](const BSONObj&) { return Status::OK(); }) .getStatus() .code()); @@ -253,7 +209,7 @@ TEST(SyncRollBackLocalOperationsTest, OplogStartMissing) { TEST(SyncRollBackLocalOperationsTest, RemoteOplogMissing) { ASSERT_EQUALS(ErrorCodes::InvalidSyncSource, - syncRollBackLocalOperations(OplogInterfaceMock({makeOpAndRecordId(1, 0)}), + syncRollBackLocalOperations(OplogInterfaceMock({makeOpAndRecordId(1)}), OplogInterfaceMock(), [](const BSONObj&) { return Status::OK(); }) .getStatus() @@ -261,9 +217,9 @@ TEST(SyncRollBackLocalOperationsTest, RemoteOplogMissing) { } TEST(SyncRollBackLocalOperationsTest, RollbackTwoOperations) { - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); OplogInterfaceMock::Operations localOperations({ - makeOpAndRecordId(3, 1), makeOpAndRecordId(2, 1), commonOperation, + makeOpAndRecordId(3), makeOpAndRecordId(2), commonOperation, }); auto i = localOperations.cbegin(); auto result = syncRollBackLocalOperations(OplogInterfaceMock(localOperations), @@ -284,8 +240,8 @@ TEST(SyncRollBackLocalOperationsTest, RollbackTwoOperations) { } TEST(SyncRollBackLocalOperationsTest, SkipOneRemoteOperation) { - auto commonOperation = makeOpAndRecordId(1, 1); - auto remoteOperation = makeOpAndRecordId(2, 1); + auto commonOperation = makeOpAndRecordId(1); + auto remoteOperation = makeOpAndRecordId(2); auto result = syncRollBackLocalOperations(OplogInterfaceMock({commonOperation}), OplogInterfaceMock({remoteOperation, commonOperation}), @@ -299,77 +255,10 @@ TEST(SyncRollBackLocalOperationsTest, SkipOneRemoteOperation) { ASSERT_EQUALS(commonOperation.second, result.getValue().getRecordId()); } -TEST(SyncRollBackLocalOperationsTest, SameTimestampDifferentHashes) { - auto commonOperation = makeOpAndRecordId(1, 1); - auto localOperation = makeOpAndRecordId(1, 2); - auto remoteOperation = makeOpAndRecordId(1, 3); - bool called = false; - auto result = - syncRollBackLocalOperations(OplogInterfaceMock({localOperation, commonOperation}), - OplogInterfaceMock({remoteOperation, commonOperation}), - [&](const BSONObj& operation) { - ASSERT_BSONOBJ_EQ(localOperation.first, operation); - called = true; - return Status::OK(); - }); - ASSERT_OK(result.getStatus()); - ASSERT_EQUALS(OpTime::parseFromOplogEntry(commonOperation.first), - result.getValue().getOpTime()); - ASSERT_EQUALS(commonOperation.second, result.getValue().getRecordId()); - ASSERT_TRUE(called); -} - -TEST(SyncRollBackLocalOperationsTest, SameTimestampEndOfLocalOplog) { - auto commonOperation = makeOpAndRecordId(1, 1); - auto localOperation = makeOpAndRecordId(1, 2); - auto remoteOperation = makeOpAndRecordId(1, 3); - bool called = false; - auto result = - syncRollBackLocalOperations(OplogInterfaceMock({localOperation}), - OplogInterfaceMock({remoteOperation, commonOperation}), - [&](const BSONObj& operation) { - ASSERT_BSONOBJ_EQ(localOperation.first, operation); - called = true; - return Status::OK(); - }); - ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); - ASSERT_STRING_CONTAINS(result.getStatus().reason(), "reached beginning of local oplog"); - ASSERT_TRUE(called); -} - -TEST(SyncRollBackLocalOperationsTest, SameTimestampRollbackOperationFailed) { - auto commonOperation = makeOpAndRecordId(1, 1); - auto localOperation = makeOpAndRecordId(1, 2); - auto remoteOperation = makeOpAndRecordId(1, 3); - auto result = syncRollBackLocalOperations( - OplogInterfaceMock({localOperation, commonOperation}), - OplogInterfaceMock({remoteOperation, commonOperation}), - [&](const BSONObj& operation) { return Status(ErrorCodes::OperationFailed, ""); }); - ASSERT_EQUALS(ErrorCodes::OperationFailed, result.getStatus().code()); -} - -TEST(SyncRollBackLocalOperationsTest, SameTimestampEndOfRemoteOplog) { - auto commonOperation = makeOpAndRecordId(1, 1); - auto localOperation = makeOpAndRecordId(1, 2); - auto remoteOperation = makeOpAndRecordId(1, 3); - bool called = false; - auto result = - syncRollBackLocalOperations(OplogInterfaceMock({localOperation, commonOperation}), - OplogInterfaceMock({remoteOperation}), - [&](const BSONObj& operation) { - ASSERT_BSONOBJ_EQ(localOperation.first, operation); - called = true; - return Status::OK(); - }); - ASSERT_EQUALS(ErrorCodes::NoMatchingDocument, result.getStatus().code()); - ASSERT_STRING_CONTAINS(result.getStatus().reason(), "reached beginning of remote oplog"); - ASSERT_TRUE(called); -} - TEST(SyncRollBackLocalOperationsTest, DifferentTimestampEndOfLocalOplog) { - auto commonOperation = makeOpAndRecordId(1, 1); - auto localOperation = makeOpAndRecordId(3, 1); - auto remoteOperation = makeOpAndRecordId(2, 1); + auto commonOperation = makeOpAndRecordId(1); + auto localOperation = makeOpAndRecordId(3); + auto remoteOperation = makeOpAndRecordId(2); bool called = false; auto result = syncRollBackLocalOperations(OplogInterfaceMock({localOperation}), @@ -385,8 +274,8 @@ TEST(SyncRollBackLocalOperationsTest, DifferentTimestampEndOfLocalOplog) { } TEST(SyncRollBackLocalOperationsTest, DifferentTimestampRollbackOperationFailed) { - auto localOperation = makeOpAndRecordId(3, 1); - auto remoteOperation = makeOpAndRecordId(2, 1); + auto localOperation = makeOpAndRecordId(3); + auto remoteOperation = makeOpAndRecordId(2); auto result = syncRollBackLocalOperations( OplogInterfaceMock({localOperation}), OplogInterfaceMock({remoteOperation}), @@ -395,9 +284,9 @@ TEST(SyncRollBackLocalOperationsTest, DifferentTimestampRollbackOperationFailed) } TEST(SyncRollBackLocalOperationsTest, DifferentTimestampEndOfRemoteOplog) { - auto commonOperation = makeOpAndRecordId(1, 1); - auto localOperation = makeOpAndRecordId(2, 1); - auto remoteOperation = makeOpAndRecordId(3, 1); + auto commonOperation = makeOpAndRecordId(1); + auto localOperation = makeOpAndRecordId(2); + auto remoteOperation = makeOpAndRecordId(3); auto result = syncRollBackLocalOperations(OplogInterfaceMock({localOperation, commonOperation}), OplogInterfaceMock({remoteOperation}), [&](const BSONObj& operation) { @@ -433,8 +322,8 @@ public: unittest::log() << "Returning success on DBClientCursorForTest::query()"; BSONArrayBuilder builder; - builder.append(makeOp(1, 1)); - builder.append(makeOp(2, 2)); + builder.append(makeOp(1)); + builder.append(makeOp(2)); return std::make_unique<DBClientMockCursor>(this, builder.arr()); } @@ -447,7 +336,7 @@ void checkRemoteIterator(int numNetworkFailures, bool expectedToSucceed) { DBClientConnectionForTest conn(numNetworkFailures); auto getConnection = [&]() -> DBClientBase* { return &conn; }; - auto localOperation = makeOpAndRecordId(1, 1); + auto localOperation = makeOpAndRecordId(1); OplogInterfaceRemote remoteOplogMock( HostAndPort("229w43rd", 10036), getConnection, "somecollection", 0); diff --git a/src/mongo/db/repl/rollback_impl_test.cpp b/src/mongo/db/repl/rollback_impl_test.cpp index 0adb23068a6..f671ddb42e4 100644 --- a/src/mongo/db/repl/rollback_impl_test.cpp +++ b/src/mongo/db/repl/rollback_impl_test.cpp @@ -63,7 +63,7 @@ NamespaceString nss("test.coll"); std::string kGenericUUIDStr = "b4c66a44-c1ca-4d86-8d25-12e82fa2de5b"; BSONObj makeInsertOplogEntry(long long time, BSONObj obj, StringData ns, UUID uuid) { - return BSON("ts" << Timestamp(time, time) << "h" << time << "t" << time << "op" + return BSON("ts" << Timestamp(time, time) << "t" << time << "op" << "i" << "o" << obj @@ -75,7 +75,7 @@ BSONObj makeInsertOplogEntry(long long time, BSONObj obj, StringData ns, UUID uu BSONObj makeUpdateOplogEntry( long long time, BSONObj query, BSONObj update, StringData ns, UUID uuid) { - return BSON("ts" << Timestamp(time, time) << "h" << time << "t" << time << "op" + return BSON("ts" << Timestamp(time, time) << "t" << time << "op" << "u" << "ns" << ns @@ -88,7 +88,7 @@ BSONObj makeUpdateOplogEntry( } BSONObj makeDeleteOplogEntry(long long time, BSONObj id, StringData ns, UUID uuid) { - return BSON("ts" << Timestamp(time, time) << "h" << time << "t" << time << "op" + return BSON("ts" << Timestamp(time, time) << "t" << time << "op" << "d" << "ns" << ns @@ -349,11 +349,11 @@ private: }; /** - * Helper functions to make simple oplog entries with timestamps, terms, and hashes. + * Helper functions to make simple oplog entries with timestamps and terms. */ -BSONObj makeOp(OpTime time, long long hash) { +BSONObj makeOp(OpTime time) { auto kGenericUUID = unittest::assertGet(UUID::parse(kGenericUUIDStr)); - return BSON("ts" << time.getTimestamp() << "h" << hash << "t" << time.getTerm() << "op" + return BSON("ts" << time.getTimestamp() << "t" << time.getTerm() << "op" << "n" << "o" << BSONObj() @@ -364,16 +364,16 @@ BSONObj makeOp(OpTime time, long long hash) { } BSONObj makeOp(int count) { - return makeOp(OpTime(Timestamp(count, count), count), count); + return makeOp(OpTime(Timestamp(count, count), count)); } /** - * Helper functions to make simple oplog entries with timestamps, terms, hashes, and wall clock + * Helper functions to make simple oplog entries with timestamps, terms, and wall clock * times. */ -auto makeOpWithWallClockTime(long count, long long hash, long wallClockMillis) { +auto makeOpWithWallClockTime(long count, long wallClockMillis) { auto kGenericUUID = unittest::assertGet(UUID::parse(kGenericUUIDStr)); - return BSON("ts" << Timestamp(count, count) << "h" << hash << "t" << (long long)count << "op" + return BSON("ts" << Timestamp(count, count) << "t" << (long long)count << "op" << "n" << "o" << BSONObj() @@ -394,8 +394,8 @@ OplogInterfaceMock::Operation makeOpAndRecordId(const BSONObj& op) { return std::make_pair(op, RecordId(++recordId)); } -OplogInterfaceMock::Operation makeOpAndRecordId(OpTime time, long long hash) { - return makeOpAndRecordId(makeOp(time, hash)); +OplogInterfaceMock::Operation makeOpAndRecordId(OpTime time) { + return makeOpAndRecordId(makeOp(time)); } OplogInterfaceMock::Operation makeOpAndRecordId(int count) { @@ -465,8 +465,8 @@ TEST_F(RollbackImplTest, RollbackReturnsNoMatchingDocumentWhenNoCommonPoint) { TEST_F(RollbackImplTest, RollbackSucceedsIfRollbackPeriodIsWithinTimeLimit) { // The default limit is 1 day, so we make the difference be just under a day. - auto commonPoint = makeOpAndRecordId(makeOpWithWallClockTime(1, 1, 5 * 1000)); - auto topOfOplog = makeOpAndRecordId(makeOpWithWallClockTime(2, 2, 60 * 60 * 24 * 1000)); + auto commonPoint = makeOpAndRecordId(makeOpWithWallClockTime(1, 5 * 1000)); + auto topOfOplog = makeOpAndRecordId(makeOpWithWallClockTime(2, 60 * 60 * 24 * 1000)); _remoteOplog->setOperations({commonPoint}); ASSERT_OK(_insertOplogEntry(commonPoint.first)); @@ -481,8 +481,8 @@ TEST_F(RollbackImplTest, RollbackSucceedsIfRollbackPeriodIsWithinTimeLimit) { TEST_F(RollbackImplTest, RollbackFailsIfRollbackPeriodIsTooLong) { // The default limit is 1 day, so we make the difference be 2 days. - auto commonPoint = makeOpAndRecordId(makeOpWithWallClockTime(1, 1, 5 * 1000)); - auto topOfOplog = makeOpAndRecordId(makeOpWithWallClockTime(2, 2, 2 * 60 * 60 * 24 * 1000)); + auto commonPoint = makeOpAndRecordId(makeOpWithWallClockTime(1, 5 * 1000)); + auto topOfOplog = makeOpAndRecordId(makeOpWithWallClockTime(2, 2 * 60 * 60 * 24 * 1000)); _remoteOplog->setOperations({commonPoint}); ASSERT_OK(_insertOplogEntry(commonPoint.first)); @@ -857,7 +857,7 @@ TEST_F(RollbackImplTest, RollbackDoesNotWriteRollbackFilesIfNoInsertsOrUpdatesAf const auto uuid = UUID::gen(); const auto nss = NamespaceString("db.coll"); const auto coll = _initializeCollection(_opCtx.get(), uuid, nss); - const auto oplogEntry = BSON("ts" << Timestamp(3, 3) << "h" << 3LL << "t" << 3LL << "op" + const auto oplogEntry = BSON("ts" << Timestamp(3, 3) << "t" << 3LL << "op" << "c" << "o" << BSON("create" << nss.coll()) @@ -1082,15 +1082,15 @@ TEST_F(RollbackImplTest, RollbackProperlySavesFilesWhenInsertsAndDropOfCollectio _insertDocAndGenerateOplogEntry(obj2, uuid, nss); // Create an oplog entry for the collection drop. - const auto oplogEntry = BSON( - "ts" << dropOpTime.getTimestamp() << "h" << 200LL << "t" << dropOpTime.getTerm() << "op" - << "c" - << "o" - << BSON("drop" << nss.coll()) - << "ns" - << nss.ns() - << "ui" - << uuid); + const auto oplogEntry = + BSON("ts" << dropOpTime.getTimestamp() << "t" << dropOpTime.getTerm() << "op" + << "c" + << "o" + << BSON("drop" << nss.coll()) + << "ns" + << nss.ns() + << "ui" + << uuid); ASSERT_OK(_insertOplogEntry(oplogEntry)); ASSERT_OK(_rollback->runRollback(_opCtx.get())); @@ -1115,7 +1115,7 @@ TEST_F(RollbackImplTest, RollbackProperlySavesFilesWhenCreateCollAndInsertsAreRo const auto nss = NamespaceString("db.people"); const auto uuid = UUID::gen(); const auto coll = _initializeCollection(_opCtx.get(), uuid, nss); - const auto oplogEntry = BSON("ts" << Timestamp(3, 3) << "h" << 3LL << "t" << 3LL << "op" + const auto oplogEntry = BSON("ts" << Timestamp(3, 3) << "t" << 3LL << "op" << "c" << "o" << BSON("create" << nss.coll()) @@ -1434,7 +1434,6 @@ public: BSONObjBuilder bob; bob.append("ts", time); - bob.append("h", 1LL); bob.append("op", "i"); collId.appendToBuilder(&bob, "ui"); bob.append("ns", nss.ns()); diff --git a/src/mongo/db/repl/rollback_test_fixture.cpp b/src/mongo/db/repl/rollback_test_fixture.cpp index 773d5cab787..ad236c6c24d 100644 --- a/src/mongo/db/repl/rollback_test_fixture.cpp +++ b/src/mongo/db/repl/rollback_test_fixture.cpp @@ -154,7 +154,6 @@ std::pair<BSONObj, RecordId> RollbackTest::makeCRUDOp(OpTypeEnum opType, BSONObjBuilder bob; bob.append("ts", ts); - bob.append("h", 1LL); bob.append("op", OpType_serializer(opType)); uuid.appendToBuilder(&bob, "ui"); bob.append("ns", nss); @@ -176,7 +175,6 @@ std::pair<BSONObj, RecordId> RollbackTest::makeCommandOp(Timestamp ts, BSONObjBuilder bob; bob.append("ts", ts); - bob.append("h", 1LL); bob.append("op", "c"); if (uuid) { // Not all ops have UUID fields. uuid.get().appendToBuilder(&bob, "ui"); @@ -296,7 +294,7 @@ void RollbackResyncsCollectionOptionsTest::resyncCollectionOptionsTest( auto coll = _createCollection(_opCtx.get(), nss.toString(), localCollOptions); auto commonOpUuid = unittest::assertGet(UUID::parse("f005ba11-cafe-bead-f00d-123456789abc")); - auto commonOpBson = BSON("ts" << Timestamp(1, 1) << "h" << 1LL << "t" << 1LL << "op" + auto commonOpBson = BSON("ts" << Timestamp(1, 1) << "t" << 1LL << "op" << "n" << "o" << BSONObj() diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index ba0720d6a5d..56e401b609c 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -233,10 +233,6 @@ Status rollback_internal::updateFixUpInfoFromLocalOplogEntry(FixUpInfo& fixUpInf if (!ourObj.hasField("ts")) { bob.appendTimestamp("ts"); } - if (!ourObj.hasField("h")) { - long long dummyHash = 0; - bob.append("h", dummyHash); - } } bob.appendElements(ourObj); BSONObj fixedObj = bob.obj(); @@ -1475,7 +1471,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, } // Reload the lastAppliedOpTime and lastDurableOpTime value in the replcoord and the - // lastAppliedHash value in bgsync to reflect our new last op. The rollback common point does + // lastApplied value in bgsync to reflect our new last op. The rollback common point does // not necessarily represent a consistent database state. For example, on a secondary, we may // have rolled back to an optime that fell in the middle of an oplog application batch. We make // the database consistent again after rollback by applying ops forward until we reach diff --git a/src/mongo/db/repl/rs_rollback_test.cpp b/src/mongo/db/repl/rs_rollback_test.cpp index 0a4de833850..d7251cd8055 100644 --- a/src/mongo/db/repl/rs_rollback_test.cpp +++ b/src/mongo/db/repl/rs_rollback_test.cpp @@ -75,7 +75,7 @@ class RSRollbackTest : public RollbackTest {}; OplogInterfaceMock::Operation makeNoopOplogEntryAndRecordId(Seconds seconds) { OpTime ts(Timestamp(seconds, 0), 0); - return std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId(1)); + return std::make_pair(BSON("ts" << ts.getTimestamp()), RecordId(1)); } OplogInterfaceMock::Operation makeDropIndexOplogEntry(Collection* collection, @@ -87,7 +87,7 @@ OplogInterfaceMock::Operation makeDropIndexOplogEntry(Collection* collection, << static_cast<int>(kIndexVersion)); return std::make_pair( - BSON("ts" << Timestamp(Seconds(time), 0) << "h" << 1LL << "op" + BSON("ts" << Timestamp(Seconds(time), 0) << "op" << "c" << "ui" << collection->uuid().get() @@ -112,7 +112,7 @@ OplogInterfaceMock::Operation makeCreateIndexOplogEntry(Collection* collection, << "name" << indexName); - return std::make_pair(BSON("ts" << Timestamp(Seconds(time), 0) << "h" << 1LL << "op" + return std::make_pair(BSON("ts" << Timestamp(Seconds(time), 0) << "op" << "c" << "ns" << "test.$cmd" @@ -139,21 +139,20 @@ OplogInterfaceMock::Operation makeRenameCollectionOplogEntry(const NamespaceStri if (dropTarget) { obj = obj.addField(BSON("dropTarget" << *dropTarget).firstElement()); } - return std::make_pair( - BSON("ts" << opTime.getTimestamp() << "t" << opTime.getTerm() << "h" << 1LL << "op" - << "c" - << "ui" - << collectionUUID - << "ns" - << renameFrom.ns() - << "o" - << obj), - RecordId(opTime.getTimestamp().getSecs())); + return std::make_pair(BSON("ts" << opTime.getTimestamp() << "t" << opTime.getTerm() << "op" + << "c" + << "ui" + << collectionUUID + << "ns" + << renameFrom.ns() + << "o" + << obj), + RecordId(opTime.getTimestamp().getSecs())); } -BSONObj makeOp(long long seconds, long long hash) { +BSONObj makeOp(long long seconds) { auto uuid = unittest::assertGet(UUID::parse("f005ba11-cafe-bead-f00d-123456789abc")); - return BSON("ts" << Timestamp(seconds, seconds) << "h" << hash << "t" << seconds << "op" + return BSON("ts" << Timestamp(seconds, seconds) << "t" << seconds << "op" << "n" << "o" << BSONObj() @@ -164,8 +163,8 @@ BSONObj makeOp(long long seconds, long long hash) { } int recordId = 0; -OplogInterfaceMock::Operation makeOpAndRecordId(long long seconds, long long hash) { - return std::make_pair(makeOp(seconds, hash), RecordId(++recordId)); +OplogInterfaceMock::Operation makeOpAndRecordId(long long seconds) { + return std::make_pair(makeOp(seconds), RecordId(++recordId)); } // Create an index on an empty collection. Returns the number of indexes that exist on the @@ -199,8 +198,7 @@ TEST_F(RSRollbackTest, InconsistentMinValid) { TEST_F(RSRollbackTest, OplogStartMissing) { OpTime ts(Timestamp(Seconds(1), 0), 0); - auto operation = - std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId()); + auto operation = std::make_pair(BSON("ts" << ts.getTimestamp()), RecordId()); OplogInterfaceMock::Operations remoteOperations({operation}); auto remoteOplog = stdx::make_unique<OplogInterfaceMock>(remoteOperations); ASSERT_EQUALS(ErrorCodes::OplogStartMissing, @@ -215,8 +213,7 @@ TEST_F(RSRollbackTest, OplogStartMissing) { TEST_F(RSRollbackTest, NoRemoteOpLog) { OpTime ts(Timestamp(Seconds(1), 0), 0); - auto operation = - std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId()); + auto operation = std::make_pair(BSON("ts" << ts.getTimestamp()), RecordId()); auto status = syncRollback(_opCtx.get(), OplogInterfaceMock({operation}), RollbackSourceMock(stdx::make_unique<OplogInterfaceMock>()), @@ -229,8 +226,7 @@ TEST_F(RSRollbackTest, NoRemoteOpLog) { TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) { OpTime ts(Timestamp(Seconds(1), 0), 0); - auto operation = - std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId()); + auto operation = std::make_pair(BSON("ts" << ts.getTimestamp()), RecordId()); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog) @@ -251,8 +247,7 @@ TEST_F(RSRollbackTest, RemoteGetRollbackIdThrows) { TEST_F(RSRollbackTest, RemoteGetRollbackIdDiffersFromRequiredRBID) { OpTime ts(Timestamp(Seconds(1), 0), 0); - auto operation = - std::make_pair(BSON("ts" << ts.getTimestamp() << "h" << ts.getTerm()), RecordId()); + auto operation = std::make_pair(BSON("ts" << ts.getTimestamp()), RecordId()); class RollbackSourceLocal : public RollbackSourceMock { public: @@ -274,7 +269,7 @@ TEST_F(RSRollbackTest, RemoteGetRollbackIdDiffersFromRequiredRBID) { TEST_F(RSRollbackTest, BothOplogsAtCommonPoint) { createOplog(_opCtx.get()); - auto operation = makeOpAndRecordId(1, 1); + auto operation = makeOpAndRecordId(1); ASSERT_OK( syncRollback(_opCtx.get(), OplogInterfaceMock({operation}), @@ -297,17 +292,16 @@ int _testRollbackDelete(OperationContext* opCtx, UUID uuid, const BSONObj& documentAtSource, const bool collectionAtSourceExists = true) { - auto commonOperation = makeOpAndRecordId(1, 1); - auto deleteOperation = - std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 2LL << "op" - << "d" - << "ui" - << uuid - << "ns" - << "test.t" - << "o" - << BSON("_id" << 0)), - RecordId(2)); + auto commonOperation = makeOpAndRecordId(1); + auto deleteOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "op" + << "d" + << "ui" + << uuid + << "ns" + << "test.t" + << "o" + << BSON("_id" << 0)), + RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(const BSONObj& documentAtSource, @@ -427,17 +421,16 @@ TEST_F(RSRollbackTest, RollbackDeleteRestoreDocument) { TEST_F(RSRollbackTest, RollbackInsertDocumentWithNoId) { createOplog(_opCtx.get()); - auto commonOperation = makeOpAndRecordId(1, 1); - auto insertDocumentOperation = - std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" - << "i" - << "ui" - << UUID::gen() - << "ns" - << "test.t" - << "o" - << BSON("a" << 1)), - RecordId(2)); + auto commonOperation = makeOpAndRecordId(1); + auto insertDocumentOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "op" + << "i" + << "ui" + << UUID::gen() + << "ns" + << "test.t" + << "o" + << BSON("a" << 1)), + RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog) @@ -482,7 +475,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommand) { int numIndexes = _createIndexOnEmptyCollection(_opCtx.get(), collection, nss, indexSpec); ASSERT_EQUALS(2, numIndexes); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto createIndexOperation = makeCreateIndexOplogEntry(collection, BSON("a" << 1), "a_1", 2); // Repeat index creation operation and confirm that rollback attempts to drop index just once. @@ -534,7 +527,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandIndexNotInCatalog) { ASSERT_EQUALS(1, indexCatalog->numIndexesReady(_opCtx.get())); } - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto createIndexOperation = makeCreateIndexOplogEntry(collection, BSON("a" << 1), "a_1", 2); RollbackSourceMock rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ @@ -569,7 +562,7 @@ TEST_F(RSRollbackTest, RollbackDropIndexCommandWithOneIndex) { ASSERT_EQUALS(1, indexCatalog->numIndexesReady(_opCtx.get())); } - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto dropIndexOperation = makeDropIndexOplogEntry(collection, BSON("a" << 1), "a_1", 2); RollbackSourceMock rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ @@ -601,7 +594,7 @@ TEST_F(RSRollbackTest, RollbackDropIndexCommandWithMultipleIndexes) { ASSERT_EQUALS(1, indexCatalog->numIndexesReady(_opCtx.get())); } - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto dropIndexOperation1 = makeDropIndexOplogEntry(collection, BSON("a" << 1), "a_1", 2); auto dropIndexOperation2 = makeDropIndexOplogEntry(collection, BSON("b" << 1), "b_1", 3); @@ -637,7 +630,7 @@ TEST_F(RSRollbackTest, RollingBackCreateAndDropOfSameIndexIgnoresBothCommands) { ASSERT_EQUALS(1, indexCatalog->numIndexesReady(_opCtx.get())); } - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto createIndexOperation = makeCreateIndexOplogEntry(collection, BSON("a" << 1), "a_1", 2); @@ -680,7 +673,7 @@ TEST_F(RSRollbackTest, RollingBackCreateIndexAndRenameWithLongName) { int numIndexes = _createIndexOnEmptyCollection(_opCtx.get(), collection, nss, indexSpec); ASSERT_EQUALS(2, numIndexes); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto createIndexOperation = makeCreateIndexOplogEntry(collection, BSON("b" << 1), longName, 2); @@ -735,7 +728,7 @@ TEST_F(RSRollbackTest, RollingBackDropAndCreateOfSameIndexNameWithDifferentSpecs int numIndexes = _createIndexOnEmptyCollection(_opCtx.get(), collection, nss, indexSpec); ASSERT_EQUALS(2, numIndexes); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto dropIndexOperation = makeDropIndexOplogEntry(collection, BSON("a" << 1), "a_1", 2); @@ -789,7 +782,7 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingIndexName) { CollectionOptions options; options.uuid = UUID::gen(); auto collection = _createCollection(_opCtx.get(), "test.t", options); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); BSONObj command = BSON("createIndexes" << "t" << "ns" @@ -799,16 +792,15 @@ TEST_F(RSRollbackTest, RollbackCreateIndexCommandMissingIndexName) { << "key" << BSON("a" << 1)); - auto createIndexOperation = - std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" - << "c" - << "ns" - << "test.$cmd" - << "ui" - << collection->uuid().get() - << "o" - << command), - RecordId(2)); + auto createIndexOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "op" + << "c" + << "ns" + << "test.$cmd" + << "ui" + << collection->uuid().get() + << "o" + << command), + RecordId(2)); RollbackSourceMock rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ commonOperation, }))); @@ -863,7 +855,7 @@ TEST_F(RSRollbackTest, RollbackDropIndexOnCollectionWithTwoExistingIndexes) { int numIndexes = _createIndexOnEmptyCollection(_opCtx.get(), coll, nss, idxSpec(nss, "1")); ASSERT_EQUALS(2, numIndexes); - auto commonOp = makeOpAndRecordId(1, 1); + auto commonOp = makeOpAndRecordId(1); // The ops that will be rolled back. auto createIndex0Op = makeCreateIndexOplogEntry(coll, BSON(idxKey("0") << 1), idxName("0"), 2); @@ -893,7 +885,7 @@ TEST_F(RSRollbackTest, RollbackTwoIndexDropsPrecededByTwoIndexCreationsOnSameCol NamespaceString nss("test", "coll"); auto coll = _createCollection(_opCtx.get(), nss.toString(), options); - auto commonOp = makeOpAndRecordId(1, 1); + auto commonOp = makeOpAndRecordId(1); // The ops that will be rolled back. auto createIndex0Op = makeCreateIndexOplogEntry(coll, BSON(idxKey("0") << 1), idxName("0"), 2); @@ -924,7 +916,7 @@ TEST_F(RSRollbackTest, RollbackMultipleCreateIndexesOnSameCollection) { NamespaceString nss("test", "coll"); auto coll = _createCollection(_opCtx.get(), nss.toString(), options); - auto commonOp = makeOpAndRecordId(1, 1); + auto commonOp = makeOpAndRecordId(1); // Create all of the necessary indexes. _createIndexOnEmptyCollection(_opCtx.get(), coll, nss, idxSpec(nss, "0")); @@ -970,7 +962,7 @@ TEST_F(RSRollbackTest, RollbackCreateDropRecreateIndexOnCollection) { int numIndexes = _createIndexOnEmptyCollection(_opCtx.get(), coll, nss, indexSpec); ASSERT_EQUALS(2, numIndexes); - auto commonOp = makeOpAndRecordId(1, 1); + auto commonOp = makeOpAndRecordId(1); // The ops that will be rolled back. auto createIndex0Op = makeCreateIndexOplogEntry(coll, BSON(idxKey("0") << 1), idxName("0"), 2); @@ -997,18 +989,17 @@ TEST_F(RSRollbackTest, RollbackCreateDropRecreateIndexOnCollection) { TEST_F(RSRollbackTest, RollbackUnknownCommand) { createOplog(_opCtx.get()); - auto commonOperation = makeOpAndRecordId(1, 1); - auto unknownCommandOperation = - std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" - << "c" - << "ui" - << UUID::gen() - << "ns" - << "test.t" - << "o" - << BSON("convertToCapped" - << "t")), - RecordId(2)); + auto commonOperation = makeOpAndRecordId(1); + auto unknownCommandOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "op" + << "c" + << "ui" + << UUID::gen() + << "ns" + << "test.t" + << "o" + << BSON("convertToCapped" + << "t")), + RecordId(2)); auto status = syncRollback(_opCtx.get(), @@ -1033,19 +1024,18 @@ TEST_F(RSRollbackTest, RollbackDropCollectionCommand) { auto coll = _createCollection(_opCtx.get(), dpns, options); _dropPendingCollectionReaper->addDropPendingNamespace(dropTime, dpns); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto dropCollectionOperation = - std::make_pair( - BSON("ts" << dropTime.getTimestamp() << "t" << dropTime.getTerm() << "h" << 1LL << "op" - << "c" - << "ui" - << coll->uuid().get() - << "ns" - << "test.t" - << "o" - << BSON("drop" - << "t")), - RecordId(2)); + std::make_pair(BSON("ts" << dropTime.getTimestamp() << "t" << dropTime.getTerm() << "op" + << "c" + << "ui" + << coll->uuid().get() + << "ns" + << "test.t" + << "o" + << BSON("drop" + << "t")), + RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: RollbackSourceLocal(std::unique_ptr<OplogInterface> oplog) @@ -1090,7 +1080,7 @@ TEST_F(RSRollbackTest, RollbackRenameCollectionInSameDatabaseCommand) { OpTime renameTime = OpTime(Timestamp(2, 0), 5); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto renameCollectionOperation = makeRenameCollectionOplogEntry(NamespaceString("test.x"), NamespaceString("test.y"), collectionUUID, @@ -1156,7 +1146,7 @@ TEST_F(RSRollbackTest, mutable bool getCollectionInfoCalled = false; }; - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); bool stayTemp = false; auto renameCollectionOperation = makeRenameCollectionOplogEntry(NamespaceString(renameFromNss), @@ -1204,7 +1194,7 @@ TEST_F(RSRollbackTest, RollbackRenameCollectionInDatabaseWithDropTargetTrueComma auto renamedCollection = _createCollection(_opCtx.get(), "test.y", renamedCollOptions); auto renamedCollectionUUID = renamedCollection->uuid().get(); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto renameCollectionOperation = makeRenameCollectionOplogEntry(NamespaceString("test.x"), NamespaceString("test.y"), renamedCollectionUUID, @@ -1262,7 +1252,7 @@ void _testRollbackRenamingCollectionsToEachOther(OperationContext* opCtx, ASSERT_NOT_EQUALS(collection1UUID, collection2UUID); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto renameCollectionOperationXtoZ = makeRenameCollectionOplogEntry(NamespaceString("test.x"), NamespaceString("test.z"), collection1UUID, @@ -1357,20 +1347,19 @@ TEST_F(RSRollbackTest, RollbackDropCollectionThenRenameCollectionToDroppedCollec auto droppedCollectionUUID = droppedCollection->uuid().get(); _dropPendingCollectionReaper->addDropPendingNamespace(dropTime, dpns); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto dropCollectionOperation = - std::make_pair( - BSON("ts" << dropTime.getTimestamp() << "t" << dropTime.getTerm() << "h" << 1LL << "op" - << "c" - << "ui" - << droppedCollectionUUID - << "ns" - << "test.x" - << "o" - << BSON("drop" - << "x")), - RecordId(2)); + std::make_pair(BSON("ts" << dropTime.getTimestamp() << "t" << dropTime.getTerm() << "op" + << "c" + << "ui" + << droppedCollectionUUID + << "ns" + << "test.x" + << "o" + << BSON("drop" + << "x")), + RecordId(2)); auto renameCollectionOperation = makeRenameCollectionOplogEntry(NamespaceString("test.y"), NamespaceString("test.x"), @@ -1426,7 +1415,7 @@ TEST_F(RSRollbackTest, RollbackRenameCollectionThenCreateNewCollectionWithOldNam auto createdCollection = _createCollection(_opCtx.get(), "test.x", createdCollOptions); auto createdCollectionUUID = createdCollection->uuid().get(); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); auto renameCollectionOperation = makeRenameCollectionOplogEntry(NamespaceString("test.x"), NamespaceString("test.y"), @@ -1435,17 +1424,16 @@ TEST_F(RSRollbackTest, RollbackRenameCollectionThenCreateNewCollectionWithOldNam false, OpTime(Timestamp(2, 0), 5)); - auto createCollectionOperation = - std::make_pair(BSON("ts" << Timestamp(Seconds(3), 0) << "h" << 1LL << "op" - << "c" - << "ui" - << createdCollectionUUID - << "ns" - << "test.x" - << "o" - << BSON("create" - << "x")), - RecordId(3)); + auto createCollectionOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(3), 0) << "op" + << "c" + << "ui" + << createdCollectionUUID + << "ns" + << "test.x" + << "o" + << BSON("create" + << "x")), + RecordId(3)); RollbackSourceMock rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ @@ -1483,20 +1471,19 @@ TEST_F(RSRollbackTest, RollbackCollModCommandFailsIfRBIDChangesWhileSyncingColle options.uuid = UUID::gen(); auto coll = _createCollection(_opCtx.get(), "test.t", options); - auto commonOperation = makeOpAndRecordId(1, 1); - auto collModOperation = - std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" - << "c" - << "ui" - << coll->uuid().get() - << "ns" - << "test.t" - << "o" - << BSON("collMod" - << "t" - << "validationLevel" - << "off")), - RecordId(2)); + auto commonOperation = makeOpAndRecordId(1); + auto collModOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "op" + << "c" + << "ui" + << coll->uuid().get() + << "ns" + << "test.t" + << "o" + << BSON("collMod" + << "t" + << "validationLevel" + << "off")), + RecordId(2)); class RollbackSourceLocal : public RollbackSourceMock { public: using RollbackSourceMock::RollbackSourceMock; @@ -1527,16 +1514,15 @@ TEST_F(RSRollbackTest, RollbackCollModCommandFailsIfRBIDChangesWhileSyncingColle TEST_F(RSRollbackTest, RollbackDropDatabaseCommand) { createOplog(_opCtx.get()); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); // 'dropDatabase' operations are special and do not include a UUID field. - auto dropDatabaseOperation = - std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" - << "c" - << "ns" - << "test.$cmd" - << "o" - << BSON("dropDatabase" << 1)), - RecordId(2)); + auto dropDatabaseOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "op" + << "c" + << "ns" + << "test.$cmd" + << "o" + << BSON("dropDatabase" << 1)), + RecordId(2)); RollbackSourceMock rollbackSource(std::unique_ptr<OplogInterface>(new OplogInterfaceMock({ commonOperation, }))); @@ -1551,7 +1537,7 @@ TEST_F(RSRollbackTest, RollbackDropDatabaseCommand) { BSONObj makeApplyOpsOplogEntry(Timestamp ts, std::initializer_list<BSONObj> ops) { // applyOps oplog entries are special and do not include a UUID field. BSONObjBuilder entry; - entry << "ts" << ts << "h" << 1LL << "op" + entry << "ts" << ts << "op" << "c" << "ns" << "admin"; @@ -1568,11 +1554,9 @@ BSONObj makeApplyOpsOplogEntry(Timestamp ts, std::initializer_list<BSONObj> ops) OpTime getOpTimeFromOplogEntry(const BSONObj& entry) { const BSONElement tsElement = entry["ts"]; const BSONElement termElement = entry["t"]; - const BSONElement hashElement = entry["h"]; ASSERT_EQUALS(bsonTimestamp, tsElement.type()) << entry; - ASSERT_TRUE(hashElement.isNumber()) << entry; ASSERT_TRUE(termElement.eoo() || termElement.isNumber()) << entry; - long long term = hashElement.numberLong(); + long long term = 1LL; if (!termElement.eoo()) { term = termElement.numberLong(); } @@ -1602,7 +1586,7 @@ TEST_F(RSRollbackTest, RollbackApplyOpsCommand) { wuow.commit(); } UUID uuid = coll->uuid().get(); - const auto commonOperation = makeOpAndRecordId(1, 1); + const auto commonOperation = makeOpAndRecordId(1); const auto applyOpsOperation = std::make_pair(makeApplyOpsOplogEntry(Timestamp(Seconds(2), 0), {BSON("op" @@ -1613,8 +1597,6 @@ TEST_F(RSRollbackTest, RollbackApplyOpsCommand) { << Timestamp(1, 1) << "t" << 1LL - << "h" - << 2LL << "ns" << "test.t" << "o2" @@ -1629,8 +1611,6 @@ TEST_F(RSRollbackTest, RollbackApplyOpsCommand) { << Timestamp(2, 1) << "t" << 1LL - << "h" - << 2LL << "ns" << "test.t" << "o2" @@ -1645,8 +1625,6 @@ TEST_F(RSRollbackTest, RollbackApplyOpsCommand) { << Timestamp(3, 1) << "t" << 1LL - << "h" - << 2LL << "ns" << "test.t" << "o" @@ -1659,14 +1637,12 @@ TEST_F(RSRollbackTest, RollbackApplyOpsCommand) { << Timestamp(4, 1) << "t" << 1LL - << "h" - << 2LL << "ns" << "test.t" << "o" << BSON("_id" << 4)), // applyOps internal oplog entries are not required - // to have a timestamp and/or hash. + // to have a timestamp. BSON("op" << "i" << "ui" @@ -1685,8 +1661,6 @@ TEST_F(RSRollbackTest, RollbackApplyOpsCommand) { << uuid << "t" << 1LL - << "h" - << 2LL << "ns" << "test.t" << "o" @@ -1766,18 +1740,17 @@ TEST_F(RSRollbackTest, RollbackCreateCollectionCommand) { options.uuid = UUID::gen(); auto coll = _createCollection(_opCtx.get(), "test.t", options); - auto commonOperation = makeOpAndRecordId(1, 1); - auto createCollectionOperation = - std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "h" << 1LL << "op" - << "c" - << "ui" - << coll->uuid().get() - << "ns" - << "test.t" - << "o" - << BSON("create" - << "t")), - RecordId(2)); + auto commonOperation = makeOpAndRecordId(1); + auto createCollectionOperation = std::make_pair(BSON("ts" << Timestamp(Seconds(2), 0) << "op" + << "c" + << "ui" + << coll->uuid().get() + << "ns" + << "test.t" + << "o" + << BSON("create" + << "t")), + RecordId(2)); RollbackSourceMock rollbackSource( std::unique_ptr<OplogInterface>(new OplogInterfaceMock({commonOperation}))); ASSERT_OK(syncRollback(_opCtx.get(), @@ -1801,7 +1774,7 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommand) { options.uuid = UUID::gen(); auto coll = _createCollection(_opCtx.get(), "test.t", options); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); BSONObj collModCmd = BSON("collMod" << "t" @@ -1964,7 +1937,7 @@ TEST_F(RSRollbackTest, RollbackCollectionModificationCommandInvalidCollectionOpt options.uuid = UUID::gen(); auto coll = _createCollection(_opCtx.get(), "test.t", options); - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); BSONObj collModCmd = BSON("collMod" << "t" @@ -2005,8 +1978,6 @@ TEST(RSRollbackTest, LocalEntryWithoutNsIsFatal) { << Timestamp(1, 1) << "t" << 1LL - << "h" - << 1LL << "ns" << "test.t" << "o" @@ -2021,8 +1992,6 @@ TEST(RSRollbackTest, LocalEntryWithoutNsIsFatal) { << Timestamp(1, 1) << "t" << 1LL - << "h" - << 1LL << "ns" << "" << "o" @@ -2040,8 +2009,6 @@ TEST(RSRollbackTest, LocalEntryWithoutOIsFatal) { << Timestamp(1, 1) << "t" << 1LL - << "h" - << 1LL << "ns" << "test.t" << "o" @@ -2056,8 +2023,6 @@ TEST(RSRollbackTest, LocalEntryWithoutOIsFatal) { << Timestamp(1, 1) << "t" << 1LL - << "h" - << 1LL << "ns" << "test.t" << "o" @@ -2075,8 +2040,6 @@ TEST(RSRollbackTest, LocalUpdateEntryWithoutO2IsFatal) { << Timestamp(1, 1) << "t" << 1LL - << "h" - << 1LL << "ns" << "test.t" << "o" @@ -2093,8 +2056,6 @@ TEST(RSRollbackTest, LocalUpdateEntryWithoutO2IsFatal) { << Timestamp(1, 1) << "t" << 1LL - << "h" - << 1LL << "ns" << "test.t" << "o" @@ -2112,8 +2073,6 @@ TEST(RSRollbackTest, LocalUpdateEntryWithEmptyO2IsFatal) { << Timestamp(1, 1) << "t" << 1LL - << "h" - << 1LL << "ns" << "test.t" << "o" @@ -2130,8 +2089,6 @@ TEST(RSRollbackTest, LocalUpdateEntryWithEmptyO2IsFatal) { << Timestamp(1, 1) << "t" << 1LL - << "h" - << 1LL << "ns" << "test.t" << "o" @@ -2143,7 +2100,7 @@ TEST(RSRollbackTest, LocalUpdateEntryWithEmptyO2IsFatal) { } DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutSessionIdIsFatal, "invariant") { - auto validOplogEntry = BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op" + auto validOplogEntry = BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "op" << "i" << "ui" << UUID::gen() @@ -2163,7 +2120,7 @@ DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutSessionIdIsFatal, "in } DEATH_TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutStmtIdIsFatal, "invariant") { - auto validOplogEntry = BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op" + auto validOplogEntry = BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "op" << "i" << "ui" << UUID::gen() @@ -2187,21 +2144,20 @@ TEST_F(RSRollbackTest, LocalEntryWithTxnNumberWithoutTxnTableUUIDIsFatal) { // If txnNumber is present, but the transaction collection has no UUID, rollback fails. UUID uuid = UUID::gen(); auto lsid = makeLogicalSessionIdForTest(); - auto entryWithTxnNumber = - BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op" - << "i" - << "ui" - << uuid - << "ns" - << "test.t" - << "o" - << BSON("_id" << 1 << "a" << 1) - << "txnNumber" - << 1LL - << "stmtId" - << 1 - << "lsid" - << lsid.toBSON()); + auto entryWithTxnNumber = BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "op" + << "i" + << "ui" + << uuid + << "ns" + << "test.t" + << "o" + << BSON("_id" << 1 << "a" << 1) + << "txnNumber" + << 1LL + << "stmtId" + << 1 + << "lsid" + << lsid.toBSON()); FixUpInfo fui; ASSERT_THROWS(updateFixUpInfoFromLocalOplogEntry(fui, entryWithTxnNumber, false), @@ -2212,15 +2168,14 @@ TEST_F(RSRollbackTest, LocalEntryWithTxnNumberAddsTransactionTableDocToBeRefetch FixUpInfo fui; // With no txnNumber present, no extra documents need to be refetched. - auto entryWithoutTxnNumber = - BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op" - << "i" - << "ui" - << UUID::gen() - << "ns" - << "test.t2" - << "o" - << BSON("_id" << 2 << "a" << 2)); + auto entryWithoutTxnNumber = BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "op" + << "i" + << "ui" + << UUID::gen() + << "ns" + << "test.t2" + << "o" + << BSON("_id" << 2 << "a" << 2)); ASSERT_OK(updateFixUpInfoFromLocalOplogEntry(fui, entryWithoutTxnNumber, false)); ASSERT_EQ(fui.docsToRefetch.size(), 1U); @@ -2230,21 +2185,20 @@ TEST_F(RSRollbackTest, LocalEntryWithTxnNumberAddsTransactionTableDocToBeRefetch // refetched. UUID uuid = UUID::gen(); auto lsid = makeLogicalSessionIdForTest(); - auto entryWithTxnNumber = - BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "h" << 1LL << "op" - << "i" - << "ui" - << uuid - << "ns" - << "test.t" - << "o" - << BSON("_id" << 1 << "a" << 1) - << "txnNumber" - << 1LL - << "stmtId" - << 1 - << "lsid" - << lsid.toBSON()); + auto entryWithTxnNumber = BSON("ts" << Timestamp(Seconds(1), 0) << "t" << 1LL << "op" + << "i" + << "ui" + << uuid + << "ns" + << "test.t" + << "o" + << BSON("_id" << 1 << "a" << 1) + << "txnNumber" + << 1LL + << "stmtId" + << 1 + << "lsid" + << lsid.toBSON()); UUID transactionTableUUID = UUID::gen(); fui.transactionTableUUID = transactionTableUUID; @@ -2263,26 +2217,25 @@ TEST_F(RSRollbackTest, RollbackFailsIfTransactionDocumentRefetchReturnsDifferent // transaction number and session id. FixUpInfo fui; - auto entryWithTxnNumber = - BSON("ts" << Timestamp(Seconds(2), 0) << "t" << 1LL << "h" << 1LL << "op" - << "i" - << "ui" - << UUID::gen() - << "ns" - << "test.t" - << "o" - << BSON("_id" << 1 << "a" << 1) - << "txnNumber" - << 1LL - << "stmtId" - << 1 - << "lsid" - << makeLogicalSessionIdForTest().toBSON()); + auto entryWithTxnNumber = BSON("ts" << Timestamp(Seconds(2), 0) << "t" << 1LL << "op" + << "i" + << "ui" + << UUID::gen() + << "ns" + << "test.t" + << "o" + << BSON("_id" << 1 << "a" << 1) + << "txnNumber" + << 1LL + << "stmtId" + << 1 + << "lsid" + << makeLogicalSessionIdForTest().toBSON()); UUID transactionTableUUID = UUID::gen(); fui.transactionTableUUID = transactionTableUUID; - auto commonOperation = makeOpAndRecordId(1, 1); + auto commonOperation = makeOpAndRecordId(1); fui.commonPoint = OpTime(Timestamp(Seconds(1), 0), 1LL); fui.commonPointOurDiskloc = RecordId(1); diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp index 87698aa4cbb..e5ffd5d27b9 100644 --- a/src/mongo/db/repl/session_update_tracker.cpp +++ b/src/mongo/db/repl/session_update_tracker.cpp @@ -88,7 +88,7 @@ boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate( return repl::OplogEntry( entry.getOpTime(), - 0, // hash + boost::none, // hash repl::OpTypeEnum::kUpdate, NamespaceString::kSessionTransactionsTableNamespace, boost::none, // uuid diff --git a/src/mongo/db/repl/storage_interface_impl_test.cpp b/src/mongo/db/repl/storage_interface_impl_test.cpp index dbc679c9a4d..71f6792c276 100644 --- a/src/mongo/db/repl/storage_interface_impl_test.cpp +++ b/src/mongo/db/repl/storage_interface_impl_test.cpp @@ -153,7 +153,6 @@ int _createIndexOnEmptyCollection(OperationContext* opCtx, NamespaceString nss, */ TimestampedBSONObj makeOplogEntry(OpTime opTime) { BSONObjBuilder bob(opTime.toBSON()); - bob.append("h", 1LL); bob.append("op", "c"); bob.append("ns", "test.t"); return {bob.obj(), opTime.getTimestamp()}; diff --git a/src/mongo/db/repl/sync_source_resolver_test.cpp b/src/mongo/db/repl/sync_source_resolver_test.cpp index ae96e706a11..7d00daff9e7 100644 --- a/src/mongo/db/repl/sync_source_resolver_test.cpp +++ b/src/mongo/db/repl/sync_source_resolver_test.cpp @@ -302,7 +302,7 @@ void _scheduleFirstOplogEntryFetcherResponse(executor::NetworkInterfaceMock* net */ BSONObj _makeOplogEntry(Timestamp ts, long long term) { return OplogEntry(OpTime(ts, term), // optime - 1LL, // hash + boost::none, // hash OpTypeEnum::kNoop, // op type NamespaceString("a.a"), // namespace boost::none, // uuid diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 989f9220896..96c77646196 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -82,7 +82,7 @@ namespace { */ OplogEntry makeOplogEntry(OpTypeEnum opType, NamespaceString nss, OptionalCollectionUUID uuid) { return OplogEntry(OpTime(Timestamp(1, 1), 1), // optime - 1LL, // hash + boost::none, // hash opType, // opType nss, // namespace uuid, // uuid @@ -356,9 +356,7 @@ TEST_F(SyncTailTest, SyncApplyCommand) { << "o" << BSON("create" << nss.coll()) << "ts" - << Timestamp(1, 1) - << "h" - << 0LL); + << Timestamp(1, 1)); bool applyCmdCalled = false; _opObserver->onCreateCollectionFn = [&](OperationContext* opCtx, Collection*, @@ -388,9 +386,7 @@ TEST_F(SyncTailTest, SyncApplyCommandThrowsException) { << BSON("create" << "t") << "ts" - << Timestamp(1, 1) - << "h" - << 0LL); + << Timestamp(1, 1)); // This test relies on the namespace type check of IDL. ASSERT_THROWS(SyncTail::syncApply(_opCtx.get(), op, OplogApplication::Mode::kInitialSync), ExceptionFor<ErrorCodes::TypeMismatch>); @@ -1508,7 +1504,7 @@ TEST_F(SyncTailTest, LogSlowOpApplicationWhenSuccessful) { // Use a builder for easier escaping. We expect the operation to be logged. StringBuilder expected; expected << "applied op: CRUD { op: \"i\", ns: \"test.t\", o: { _id: 0 }, ts: Timestamp(1, 1), " - "t: 1, h: 1, v: 2 }, took " + "t: 1, v: 2 }, took " << applyDuration << "ms"; ASSERT_EQUALS(1, countLogLinesContaining(expected.str())); } @@ -1584,7 +1580,7 @@ public: const OperationSessionInfo& sessionInfo, Date_t wallClockTime) { return repl::OplogEntry(opTime, // optime - 0, // hash + boost::none, // hash opType, // opType ns, // namespace boost::none, // uuid |