diff options
Diffstat (limited to 'src')
16 files changed, 93 insertions, 52 deletions
diff --git a/src/mongo/db/repl/data_replicator_external_state.h b/src/mongo/db/repl/data_replicator_external_state.h index ec00c55f9ee..b2ecaa619e6 100644 --- a/src/mongo/db/repl/data_replicator_external_state.h +++ b/src/mongo/db/repl/data_replicator_external_state.h @@ -132,7 +132,7 @@ private: * * Used exclusively by the DataReplicator to construct a MultiApplier. */ - virtual void _multiSyncApply(MultiApplier::OperationPtrs* ops) = 0; + virtual Status _multiSyncApply(MultiApplier::OperationPtrs* ops) = 0; /** * Used by _multiApply() to write operations to database during initial sync. @@ -140,8 +140,8 @@ private: * * Used exclusively by the DataReplicator to construct a MultiApplier. */ - virtual void _multiInitialSyncApply(MultiApplier::OperationPtrs* ops, - const HostAndPort& source) = 0; + virtual Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + const HostAndPort& source) = 0; // Provides DataReplicator with access to _multiApply, _multiSyncApply and // _multiInitialSyncApply. diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.cpp b/src/mongo/db/repl/data_replicator_external_state_impl.cpp index be98fd44de7..6f320113807 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_impl.cpp @@ -100,13 +100,13 @@ StatusWith<OpTime> DataReplicatorExternalStateImpl::_multiApply( return _replicationCoordinatorExternalState->multiApply(txn, std::move(ops), applyOperation); } -void DataReplicatorExternalStateImpl::_multiSyncApply(MultiApplier::OperationPtrs* ops) { - _replicationCoordinatorExternalState->multiSyncApply(ops); +Status DataReplicatorExternalStateImpl::_multiSyncApply(MultiApplier::OperationPtrs* ops) { + return _replicationCoordinatorExternalState->multiSyncApply(ops); } -void DataReplicatorExternalStateImpl::_multiInitialSyncApply(MultiApplier::OperationPtrs* ops, - const HostAndPort& source) { - _replicationCoordinatorExternalState->multiInitialSyncApply(ops, source); +Status DataReplicatorExternalStateImpl::_multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + const HostAndPort& source) { + return _replicationCoordinatorExternalState->multiInitialSyncApply(ops, source); } ReplicationCoordinator* DataReplicatorExternalStateImpl::getReplicationCoordinator() const { diff --git a/src/mongo/db/repl/data_replicator_external_state_impl.h b/src/mongo/db/repl/data_replicator_external_state_impl.h index 21ce3f0bc13..f3a1c7b872d 100644 --- a/src/mongo/db/repl/data_replicator_external_state_impl.h +++ b/src/mongo/db/repl/data_replicator_external_state_impl.h @@ -68,10 +68,10 @@ private: MultiApplier::Operations ops, MultiApplier::ApplyOperationFn applyOperation) override; - void _multiSyncApply(MultiApplier::OperationPtrs* ops) override; + Status _multiSyncApply(MultiApplier::OperationPtrs* ops) override; - void _multiInitialSyncApply(MultiApplier::OperationPtrs* ops, - const HostAndPort& source) override; + Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + const HostAndPort& source) override; protected: ReplicationCoordinator* getReplicationCoordinator() const; diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.cpp b/src/mongo/db/repl/data_replicator_external_state_mock.cpp index 48a9b10051e..eed3adc2591 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.cpp +++ b/src/mongo/db/repl/data_replicator_external_state_mock.cpp @@ -86,10 +86,14 @@ StatusWith<OpTime> DataReplicatorExternalStateMock::_multiApply( return multiApplyFn(txn, std::move(ops), applyOperation); } -void DataReplicatorExternalStateMock::_multiSyncApply(MultiApplier::OperationPtrs* ops) {} +Status DataReplicatorExternalStateMock::_multiSyncApply(MultiApplier::OperationPtrs* ops) { + return Status::OK(); +} -void DataReplicatorExternalStateMock::_multiInitialSyncApply(MultiApplier::OperationPtrs* ops, - const HostAndPort& source) {} +Status DataReplicatorExternalStateMock::_multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + const HostAndPort& source) { + return Status::OK(); +} } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/data_replicator_external_state_mock.h b/src/mongo/db/repl/data_replicator_external_state_mock.h index 5b92ef577e5..d66f72014d7 100644 --- a/src/mongo/db/repl/data_replicator_external_state_mock.h +++ b/src/mongo/db/repl/data_replicator_external_state_mock.h @@ -91,10 +91,10 @@ private: MultiApplier::Operations ops, MultiApplier::ApplyOperationFn applyOperation) override; - void _multiSyncApply(MultiApplier::OperationPtrs* ops) override; + Status _multiSyncApply(MultiApplier::OperationPtrs* ops) override; - void _multiInitialSyncApply(MultiApplier::OperationPtrs* ops, - const HostAndPort& source) override; + Status _multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + const HostAndPort& source) override; }; diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h index 31ec4f93353..66e9d150b60 100644 --- a/src/mongo/db/repl/multiapplier.h +++ b/src/mongo/db/repl/multiapplier.h @@ -70,7 +70,7 @@ public: * would have the same outcome as calling SyncTail::syncApply() ('convertUpdatesToUpserts' * value will be embedded in the function implementation). */ - using ApplyOperationFn = stdx::function<void(OperationPtrs*)>; + using ApplyOperationFn = stdx::function<Status(OperationPtrs*)>; using MultiApplyFn = stdx::function<StatusWith<OpTime>( OperationContext*, MultiApplier::Operations, MultiApplier::ApplyOperationFn)>; diff --git a/src/mongo/db/repl/multiapplier_test.cpp b/src/mongo/db/repl/multiapplier_test.cpp index 549d6fcb80a..cd445be0c6d 100644 --- a/src/mongo/db/repl/multiapplier_test.cpp +++ b/src/mongo/db/repl/multiapplier_test.cpp @@ -69,9 +69,12 @@ void MultiApplierTest::tearDown() { executor::ThreadPoolExecutorTest::tearDown(); } +Status applyOperation(MultiApplier::OperationPtrs*) { + return Status::OK(); +}; + TEST_F(MultiApplierTest, InvalidConstruction) { const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))}; - auto applyOperation = [](MultiApplier::OperationPtrs*) {}; auto multiApply = [](OperationContext*, MultiApplier::Operations, MultiApplier::ApplyOperationFn) -> StatusWith<OpTime> { @@ -136,7 +139,6 @@ TEST_F(MultiApplierTest, InvalidConstruction) { TEST_F(MultiApplierTest, MultiApplierInvokesCallbackWithCallbackCanceledStatusUponCancellation) { const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))}; - auto applyOperation = [](MultiApplier::OperationPtrs*) {}; bool multiApplyInvoked = false; auto multiApply = [&](OperationContext* txn, @@ -169,7 +171,6 @@ TEST_F(MultiApplierTest, MultiApplierInvokesCallbackWithCallbackCanceledStatusUp TEST_F(MultiApplierTest, MultiApplierPassesMultiApplyErrorToCallback) { const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))}; - auto applyOperation = [](MultiApplier::OperationPtrs*) {}; bool multiApplyInvoked = false; Status multiApplyError(ErrorCodes::OperationFailed, "multi apply failed"); @@ -199,7 +200,6 @@ TEST_F(MultiApplierTest, MultiApplierPassesMultiApplyErrorToCallback) { TEST_F(MultiApplierTest, MultiApplierCatchesMultiApplyExceptionAndConvertsToCallbackStatus) { const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))}; - auto applyOperation = [](MultiApplier::OperationPtrs*) {}; bool multiApplyInvoked = false; Status multiApplyError(ErrorCodes::OperationFailed, "multi apply failed"); @@ -232,7 +232,6 @@ TEST_F( MultiApplierTest, MultiApplierProvidesOperationContextToMultiApplyFunctionButDisposesBeforeInvokingFinishCallback) { const MultiApplier::Operations operations{OplogEntry(BSON("ts" << Timestamp(Seconds(123), 0)))}; - auto applyOperation = [](MultiApplier::OperationPtrs*) {}; OperationContext* multiApplyTxn = nullptr; MultiApplier::Operations operationsToApply; diff --git a/src/mongo/db/repl/replication_coordinator_external_state.h b/src/mongo/db/repl/replication_coordinator_external_state.h index 4506518df5e..574bc23a0c5 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state.h +++ b/src/mongo/db/repl/replication_coordinator_external_state.h @@ -307,14 +307,14 @@ public: /** * Used by multiApply() to writes operations to database during steady state replication. */ - virtual void multiSyncApply(MultiApplier::OperationPtrs* ops) = 0; + virtual Status multiSyncApply(MultiApplier::OperationPtrs* ops) = 0; /** * Used by multiApply() to writes operations to database during initial sync. * Fetches missing documents from "source". */ - virtual void multiInitialSyncApply(MultiApplier::OperationPtrs* ops, - const HostAndPort& source) = 0; + virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + const HostAndPort& source) = 0; /** * This function creates an oplog buffer of the type specified at server startup. diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp index 87c7d25825c..084018b0270 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.cpp @@ -822,12 +822,14 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateImpl::multiApply( return repl::multiApply(txn, _writerPool.get(), std::move(ops), applyOperation); } -void ReplicationCoordinatorExternalStateImpl::multiSyncApply(MultiApplier::OperationPtrs* ops) { +Status ReplicationCoordinatorExternalStateImpl::multiSyncApply(MultiApplier::OperationPtrs* ops) { // SyncTail* argument is not used by repl::multiSyncApply(). repl::multiSyncApply(ops, nullptr); + // multiSyncApply() will throw or abort on error, so we hardcode returning OK. + return Status::OK(); } -void ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply( +Status ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply( MultiApplier::OperationPtrs* ops, const HostAndPort& source) { // repl::multiInitialSyncApply uses SyncTail::shouldRetry() (and implicitly getMissingDoc()) @@ -836,7 +838,7 @@ void ReplicationCoordinatorExternalStateImpl::multiInitialSyncApply( // be accessing any SyncTail functionality that require these constructor parameters. SyncTail syncTail(nullptr, SyncTail::MultiSyncApplyFunc(), nullptr); syncTail.setHostname(source.toString()); - repl::multiInitialSyncApply(ops, &syncTail); + return repl::multiInitialSyncApply(ops, &syncTail); } std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateImpl::makeInitialSyncOplogBuffer( diff --git a/src/mongo/db/repl/replication_coordinator_external_state_impl.h b/src/mongo/db/repl/replication_coordinator_external_state_impl.h index 93634b61c82..5aa5f37ee5d 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_impl.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_impl.h @@ -104,9 +104,9 @@ public: virtual StatusWith<OpTime> multiApply(OperationContext* txn, MultiApplier::Operations ops, MultiApplier::ApplyOperationFn applyOperation) override; - virtual void multiSyncApply(MultiApplier::OperationPtrs* ops) override; - virtual void multiInitialSyncApply(MultiApplier::OperationPtrs* ops, - const HostAndPort& source) override; + virtual Status multiSyncApply(MultiApplier::OperationPtrs* ops) override; + virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + const HostAndPort& source) override; virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer( OperationContext* txn) const override; virtual std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer( diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp index 65d0b8d2b29..808c8320ec2 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.cpp @@ -267,10 +267,14 @@ StatusWith<OpTime> ReplicationCoordinatorExternalStateMock::multiApply( return {ErrorCodes::InternalError, "Method not implemented"}; } -void ReplicationCoordinatorExternalStateMock::multiSyncApply(MultiApplier::OperationPtrs* ops) {} +Status ReplicationCoordinatorExternalStateMock::multiSyncApply(MultiApplier::OperationPtrs* ops) { + return Status::OK(); +} -void ReplicationCoordinatorExternalStateMock::multiInitialSyncApply( - MultiApplier::OperationPtrs* ops, const HostAndPort& source) {} +Status ReplicationCoordinatorExternalStateMock::multiInitialSyncApply( + MultiApplier::OperationPtrs* ops, const HostAndPort& source) { + return Status::OK(); +} std::unique_ptr<OplogBuffer> ReplicationCoordinatorExternalStateMock::makeInitialSyncOplogBuffer( OperationContext* txn) const { diff --git a/src/mongo/db/repl/replication_coordinator_external_state_mock.h b/src/mongo/db/repl/replication_coordinator_external_state_mock.h index 6b8ebdb49ca..c913138f5cf 100644 --- a/src/mongo/db/repl/replication_coordinator_external_state_mock.h +++ b/src/mongo/db/repl/replication_coordinator_external_state_mock.h @@ -98,9 +98,9 @@ public: virtual StatusWith<OpTime> multiApply(OperationContext* txn, MultiApplier::Operations ops, MultiApplier::ApplyOperationFn applyOperation) override; - virtual void multiSyncApply(MultiApplier::OperationPtrs* ops) override; - virtual void multiInitialSyncApply(MultiApplier::OperationPtrs* ops, - const HostAndPort& source) override; + virtual Status multiSyncApply(MultiApplier::OperationPtrs* ops) override; + virtual Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, + const HostAndPort& source) override; virtual std::unique_ptr<OplogBuffer> makeInitialSyncOplogBuffer( OperationContext* txn) const override; virtual std::unique_ptr<OplogBuffer> makeSteadyStateOplogBuffer( diff --git a/src/mongo/db/repl/rs_initialsync.cpp b/src/mongo/db/repl/rs_initialsync.cpp index d4a2116b532..00c81c5d505 100644 --- a/src/mongo/db/repl/rs_initialsync.cpp +++ b/src/mongo/db/repl/rs_initialsync.cpp @@ -293,7 +293,7 @@ Status _initialSync(BackgroundSync* bgsync) { } } - InitialSync init(bgsync, multiInitialSyncApply); + InitialSync init(bgsync, multiInitialSyncApply_abortOnFailure); init.setHostname(r.getHost().toString()); BSONObj lastOp = r.getLastOp(rsOplogName); diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 6fc77bd3242..a012ae6d129 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -439,14 +439,17 @@ void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherP // Doles out all the work to the writer pool threads. // Does not modify writerVectors, but passes non-const pointers to inner vectors into func. -void applyOps(std::vector<MultiApplier::OperationPtrs>* writerVectors, +void applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors, OldThreadPool* writerPool, - const MultiApplier::ApplyOperationFn& func) { + const MultiApplier::ApplyOperationFn& func, + std::vector<Status>* statusVector) { + invariant(writerVectors.size() == statusVector->size()); TimerHolder timer(&applyBatchStats); - for (auto&& ops : *writerVectors) { - if (!ops.empty()) { - auto opsPtr = &ops; - writerPool->schedule([&func, opsPtr] { func(opsPtr); }); + for (size_t i = 0; i < writerVectors.size(); i++) { + if (!writerVectors[i].empty()) { + writerPool->schedule([&func, &writerVectors, statusVector, i] { + (*statusVector)[i] = func(&writerVectors[i]); + }); } } } @@ -614,7 +617,12 @@ void fillWriterVectors(OperationContext* txn, // Applies a batch of oplog entries, by using a set of threads to apply the operations and then // writes the oplog entries to the local oplog. OpTime SyncTail::multiApply(OperationContext* txn, MultiApplier::Operations ops) { - auto applyOperation = [this](MultiApplier::OperationPtrs* ops) { _applyFunc(ops, this); }; + auto applyOperation = [this](MultiApplier::OperationPtrs* ops) -> Status { + _applyFunc(ops, this); + // This function is used by 3.2 initial sync and steady state data replication. + // _applyFunc() will throw or abort on error, so we return OK here. + return Status::OK(); + }; return fassertStatusOK( 34437, repl::multiApply(txn, _writerPool.get(), std::move(ops), applyOperation)); } @@ -1144,12 +1152,18 @@ Status multiSyncApply_noAbort(OperationContext* txn, } // This free function is used by the initial sync writer threads to apply each op -void multiInitialSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st) { +void multiInitialSyncApply_abortOnFailure(MultiApplier::OperationPtrs* ops, SyncTail* st) { initializeWriterThread(); auto txn = cc().makeOperationContext(); fassertNoTrace(15915, multiInitialSyncApply_noAbort(txn.get(), ops, st)); } +Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st) { + initializeWriterThread(); + auto txn = cc().makeOperationContext(); + return multiInitialSyncApply_noAbort(txn.get(), ops, st); +} + Status multiInitialSyncApply_noAbort(OperationContext* txn, MultiApplier::OperationPtrs* ops, SyncTail* st) { @@ -1238,6 +1252,7 @@ StatusWith<OpTime> multiApply(OperationContext* txn, "attempting to replicate ops while primary"}; } + std::vector<Status> statusVector(workerPool->getNumThreads(), Status::OK()); { // We must wait for the all work we've dispatched to complete before leaving this block // because the spawned threads refer to objects on our stack, including writerVectors. @@ -1253,7 +1268,14 @@ StatusWith<OpTime> multiApply(OperationContext* txn, storage->setOplogDeleteFromPoint(txn, Timestamp()); storage->setMinValidToAtLeast(txn, ops.back().getOpTime()); - applyOps(&writerVectors, workerPool, applyOperation); + applyOps(writerVectors, workerPool, applyOperation, &statusVector); + } + + // If any of the statuses is not ok, return error. + for (auto& status : statusVector) { + if (!status.isOK()) { + return status; + } } // We have now written all database writes and updated the oplog to match. diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index a0c0a6be59b..b2b06bb59ec 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -253,7 +253,12 @@ StatusWith<OpTime> multiApply(OperationContext* txn, // state of the container after calling. However, these functions cannot modify the pointed-to // operations because the OperationPtrs container contains const pointers. void multiSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st); -void multiInitialSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st); + +// Used by 3.2 initial sync. +void multiInitialSyncApply_abortOnFailure(MultiApplier::OperationPtrs* ops, SyncTail* st); + +// Used by 3.4 initial sync. +Status multiInitialSyncApply(MultiApplier::OperationPtrs* ops, SyncTail* st); /** * Testing-only version of multiSyncApply that returns an error instead of aborting. diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index ef0e62f177d..d316a07217c 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -74,7 +74,9 @@ protected: StorageInterfaceMock* _storageInterface = nullptr; // Implements the MultiApplier::ApplyOperationFn interface and does nothing. - static void noopApplyOperationFn(MultiApplier::OperationPtrs*) {} + static Status noopApplyOperationFn(MultiApplier::OperationPtrs*) { + return Status::OK(); + } private: void setUp() override; @@ -501,10 +503,12 @@ bool _testOplogEntryIsForCappedCollection(OperationContext* txn, const CollectionOptions& options) { auto writerPool = SyncTail::makeWriterPool(); MultiApplier::Operations operationsApplied; - auto applyOperationFn = [&operationsApplied](MultiApplier::OperationPtrs* operationsToApply) { + auto applyOperationFn = + [&operationsApplied](MultiApplier::OperationPtrs* operationsToApply) -> Status { for (auto&& opPtr : *operationsToApply) { operationsApplied.push_back(*opPtr); } + return Status::OK(); }; createCollection(txn, nss, options); @@ -548,12 +552,13 @@ TEST_F(SyncTailTest, MultiApplyAssignsOperationsToWriterThreadsBasedOnNamespaceH stdx::mutex mutex; std::vector<MultiApplier::Operations> operationsApplied; auto applyOperationFn = [&mutex, &operationsApplied]( - MultiApplier::OperationPtrs* operationsForWriterThreadToApply) { + MultiApplier::OperationPtrs* operationsForWriterThreadToApply) -> Status { stdx::lock_guard<stdx::mutex> lock(mutex); operationsApplied.emplace_back(); for (auto&& opPtr : *operationsForWriterThreadToApply) { operationsApplied.back().push_back(*opPtr); } + return Status::OK(); }; auto op1 = makeInsertDocumentOplogEntry({Timestamp(Seconds(1), 0), 1LL}, nss1, BSON("x" << 1)); |