diff options
author | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2016-03-23 18:01:25 -0400 |
---|---|---|
committer | Max Hirschhorn <max.hirschhorn@mongodb.com> | 2016-03-23 18:01:25 -0400 |
commit | adde6aca20580af59d21896c90281547c55680c8 (patch) | |
tree | ffb4bbb7935817bebd9658636d8e7f8fc4bd6ede /src/mongo | |
parent | 20aee8500c87cd43ae72a8c811b7e88f4f737243 (diff) | |
download | mongo-adde6aca20580af59d21896c90281547c55680c8.tar.gz |
Revert "SERVER-22858 create a new Applier which wraps multiApply"
This reverts commit 1bcf7cc983a9ea51b41b0549ad5a3811bf7057b7.
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/base/error_codes.err | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/mr.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/SConscript | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.h | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 61 | ||||
-rw-r--r-- | src/mongo/db/repl/multiapplier.cpp | 231 | ||||
-rw-r--r-- | src/mongo/db/repl/multiapplier.h | 184 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 157 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 22 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 2 |
14 files changed, 115 insertions, 614 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 0aa280617aa..000a5e0ca4a 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -153,7 +153,6 @@ error_code("OperationCannotBeBatched", 151) error_code("OplogOutOfOrder", 152) error_code("ChunkTooBig", 153) error_code("InconsistentShardIdentity", 154) -error_code("CannotApplyOplogWhilePrimary", 155) # Non-sequential error codes (for compatibility only) error_code("RecvStaleConfig", 9996) diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp index c60577e21bb..52ae5cf6f7c 100644 --- a/src/mongo/db/commands/mr.cpp +++ b/src/mongo/db/commands/mr.cpp @@ -1472,8 +1472,7 @@ public: return appendCommandStatus( result, Status(ErrorCodes::OperationFailed, - str::stream() - << "Executor killed during mapReduce command")); + str::stream() << "Executor killed during mapReduce command")); } reduceTime += t.micros(); diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index 30b5d1779f0..a58585bee8b 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -728,17 +728,6 @@ env.CppUnitTest( ) env.Library( - target='multiapplier', - source=[ - 'multiapplier.cpp', - ], - LIBDEPS=[ - 'oplog_entry', - 'replication_executor', - ], -) - -env.Library( target='applier', source=[ 'applier.cpp', @@ -769,7 +758,6 @@ env.Library( 'applier', 'collection_cloner', 'database_cloner', - 'multiapplier', 'optime', 'reporter', '$BUILD_DIR/mongo/client/fetcher', diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index 660d53e1567..feb54538c08 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -1142,7 +1142,8 @@ void DataReplicator::_handleFailedApplyBatch(const TimestampStatus& ts, const Op void DataReplicator::_scheduleApplyAfterFetch(const Operations& ops) { ++_initialSyncState->fetchedMissingDocs; // TODO: check collection.isCapped, like SyncTail::getMissingDoc - const BSONElement missingIdElem = ops.begin()->getIdElement(); + const BSONObj failedOplogEntry = ops.begin()->raw; + const BSONElement missingIdElem = failedOplogEntry.getFieldDotted("o2._id"); const NamespaceString nss(ops.begin()->ns); const BSONObj query = BSON("find" << nss.coll() << "filter" << missingIdElem.wrap()); _tmpFetcher.reset(new QueryFetcher(_exec, @@ -1253,7 +1254,7 @@ Status DataReplicator::_scheduleApplyBatch_inlock(const Operations& ops) { _exec->wait(status.getValue()); }; - _applier.reset(new MultiApplier(_exec, ops, _opts.applierFn, _opts.multiApplyFn, lambda)); + _applier.reset(new Applier(_exec, ops, _opts.applierFn, lambda)); return _applier->start(); } diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h index 53dda5f96c5..f6503abb8bf 100644 --- a/src/mongo/db/repl/data_replicator.h +++ b/src/mongo/db/repl/data_replicator.h @@ -37,7 +37,7 @@ #include "mongo/bson/bsonobj.h" #include "mongo/bson/timestamp.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/repl/multiapplier.h" +#include "mongo/db/repl/applier.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner.h" #include "mongo/db/repl/optime.h" @@ -56,7 +56,7 @@ class QueryFetcher; namespace repl { -using Operations = MultiApplier::Operations; +using Operations = Applier::Operations; using QueryResponseStatus = StatusWith<Fetcher::QueryResponse>; using CallbackArgs = ReplicationExecutor::CallbackArgs; using CBHStatus = StatusWith<ReplicationExecutor::CallbackHandle>; @@ -132,8 +132,7 @@ struct DataReplicatorOptions { std::string scopeNS; BSONObj filterCriteria; - MultiApplier::ApplyOperationFn applierFn; - MultiApplier::MultiApplyFn multiApplyFn; + Applier::ApplyOperationFn applierFn; RollbackFn rollbackFn; Reporter::PrepareReplSetUpdatePositionCommandFn prepareReplSetUpdatePositionCommandFn; GetMyLastOptimeFn getMyLastOptime; @@ -308,9 +307,9 @@ private: Handle _reporterHandle; // (M) std::unique_ptr<Reporter> _reporter; // (M) - bool _applierActive; // (M) - bool _applierPaused; // (X) - std::unique_ptr<MultiApplier> _applier; // (M) + bool _applierActive; // (M) + bool _applierPaused; // (X) + std::unique_ptr<Applier> _applier; // (M) HostAndPort _syncSource; // (M) Timestamp _lastTimestampFetched; // (MX) diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index 5d3513522d5..b8199ebf9dd 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -102,11 +102,7 @@ public: * clear/reset state */ void reset() { - _applierFn = [](const MultiApplier::Operations&) -> Status { return Status::OK(); }; - _multiApplyFn = [](OperationContext*, - const MultiApplier::Operations& ops, - MultiApplier::ApplyOperationFn) - -> StatusWith<OpTime> { return ops.back().getOpTime(); }; + _applierFn = [](OperationContext*, const OplogEntry&) -> Status { return Status::OK(); }; _rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&) -> Status { return Status::OK(); }; _setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; }; @@ -185,11 +181,9 @@ protected: launchExecutorThread(); DataReplicatorOptions options; options.initialSyncRetryWait = Milliseconds(0); - options.applierFn = [this](const MultiApplier::Operations& ops) { return _applierFn(ops); }; - options.multiApplyFn = - [this](OperationContext* txn, - const MultiApplier::Operations& ops, - MultiApplier::ApplyOperationFn func) { return _multiApplyFn(txn, ops, func); }; + options.applierFn = [this](OperationContext* txn, const OplogEntry& operation) { + return _applierFn(txn, operation); + }; options.rollbackFn = [this](OperationContext* txn, const OpTime& lastOpTimeWritten, const HostAndPort& syncSource) -> Status { @@ -220,8 +214,7 @@ protected: // Executor may still invoke callback before shutting down. } - MultiApplier::ApplyOperationFn _applierFn; - MultiApplier::MultiApplyFn _multiApplyFn; + Applier::ApplyOperationFn _applierFn; DataReplicatorOptions::RollbackFn _rollbackFn; DataReplicatorOptions::SetMyLastOptimeFn _setMyLastOptime; OpTime _myLastOpTime; @@ -459,17 +452,15 @@ TEST_F(InitialSyncTest, Complete) { verifySync(); } -TEST_F(InitialSyncTest, MissingDocOnMultiApplyCompletes) { +TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) { DataReplicatorOptions opts; int applyCounter{0}; - _multiApplyFn = - [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn) - -> StatusWith<OpTime> { - if (++applyCounter == 1) { - return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc."); - } - return ops.back().getOpTime(); - }; + _applierFn = [&](OperationContext* txn, const OplogEntry& op) { + if (++applyCounter == 1) { + return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc."); + } + return Status::OK(); + }; const std::vector<BSONObj> responses = { // get latest oplog ts @@ -879,14 +870,12 @@ TEST_F(SteadyStateTest, PauseDataReplicator) { unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; BSONObj operationApplied; - _multiApplyFn = - [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn) - -> StatusWith<OpTime> { - stdx::lock_guard<stdx::mutex> lock(mutex); - operationApplied = ops.back().raw; - barrier.countDownAndWait(); - return ops.back().getOpTime(); - }; + _applierFn = [&](OperationContext* txn, const OplogEntry& op) { + stdx::lock_guard<stdx::mutex> lock(mutex); + operationApplied = op.raw; + barrier.countDownAndWait(); + return Status::OK(); + }; DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime; _setMyLastOptime = [&](const OpTime& opTime) { oldSetMyLastOptime(opTime); @@ -963,14 +952,12 @@ TEST_F(SteadyStateTest, ApplyOneOperation) { unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; BSONObj operationApplied; - _multiApplyFn = - [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn) - -> StatusWith<OpTime> { - stdx::lock_guard<stdx::mutex> lock(mutex); - operationApplied = ops.back().raw; - barrier.countDownAndWait(); - return ops.back().getOpTime(); - }; + _applierFn = [&](OperationContext* txn, const OplogEntry& op) { + stdx::lock_guard<stdx::mutex> lock(mutex); + operationApplied = op.raw; + barrier.countDownAndWait(); + return Status::OK(); + }; DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime; _setMyLastOptime = [&](const OpTime& opTime) { oldSetMyLastOptime(opTime); diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp deleted file mode 100644 index 8db87b5e1f5..00000000000 --- a/src/mongo/db/repl/multiapplier.cpp +++ /dev/null @@ -1,231 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication - -#include "mongo/platform/basic.h" - -#include "mongo/db/repl/multiapplier.h" - -#include <algorithm> - -#include "mongo/db/operation_context.h" -#include "mongo/db/repl/optime.h" -#include "mongo/db/repl/replication_executor.h" - -namespace mongo { -namespace repl { - -MultiApplier::MultiApplier(ReplicationExecutor* executor, - const Operations& operations, - const ApplyOperationFn& applyOperation, - const MultiApplyFn& multiApply, - const CallbackFn& onCompletion) - : _executor(executor), - _operations(operations), - _applyOperation(applyOperation), - _multiApply(multiApply), - _onCompletion(onCompletion), - _active(false) { - uassert(ErrorCodes::BadValue, "null replication executor", executor); - uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty()); - uassert(ErrorCodes::FailedToParse, - str::stream() << "last operation missing 'ts' field: " << operations.back().raw, - operations.back().raw.hasField("ts")); - uassert(ErrorCodes::TypeMismatch, - str::stream() << "'ts' in last operation not a timestamp: " << operations.back().raw, - BSONType::bsonTimestamp == operations.back().raw.getField("ts").type()); - uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation); - uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); -} - -MultiApplier::~MultiApplier() { - DESTRUCTOR_GUARD(cancel(); wait();); -} - -std::string MultiApplier::getDiagnosticString() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - str::stream output; - output << "MultiApplier"; - output << " executor: " << _executor->getDiagnosticString(); - output << " active: " << _active; - return output; -} - -bool MultiApplier::isActive() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _active; -} - -Status MultiApplier::start() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (_active) { - return Status(ErrorCodes::IllegalOperation, "applier already started"); - } - - auto scheduleResult = _executor->scheduleDBWork( - stdx::bind(&MultiApplier::_callback, this, stdx::placeholders::_1)); - if (!scheduleResult.isOK()) { - return scheduleResult.getStatus(); - } - - _active = true; - _dbWorkCallbackHandle = scheduleResult.getValue(); - - return Status::OK(); -} - -void MultiApplier::cancel() { - ReplicationExecutor::CallbackHandle dbWorkCallbackHandle; - { - stdx::lock_guard<stdx::mutex> lk(_mutex); - - if (!_active) { - return; - } - - dbWorkCallbackHandle = _dbWorkCallbackHandle; - } - - if (dbWorkCallbackHandle.isValid()) { - _executor->cancel(dbWorkCallbackHandle); - } -} - -void MultiApplier::wait() { - stdx::unique_lock<stdx::mutex> lk(_mutex); - - while (_active) { - _condition.wait(lk); - } -} - -// TODO change the passed in function to be multiapply instead of apply inlock -void MultiApplier::_callback(const ReplicationExecutor::CallbackArgs& cbd) { - if (!cbd.status.isOK()) { - _finishCallback(cbd.status, _operations); - return; - } - - invariant(cbd.txn); - - // Refer to multiSyncApply() and multiInitialSyncApply() in sync_tail.cpp. - cbd.txn->setReplicatedWrites(false); - - // allow us to get through the magic barrier - cbd.txn->lockState()->setIsBatchWriter(true); - - StatusWith<OpTime> applyStatus(ErrorCodes::InternalError, "not mutated"); - - invariant(!_operations.empty()); - try { - applyStatus = _multiApply(cbd.txn, _operations, _applyOperation); - } catch (...) { - applyStatus = exceptionToStatus(); - } - if (!applyStatus.isOK()) { - _finishCallback(applyStatus.getStatus(), _operations); - return; - } - _finishCallback(applyStatus.getValue().getTimestamp(), Operations()); -} - -void MultiApplier::_finishCallback(const StatusWith<Timestamp>& result, - const Operations& operations) { - _onCompletion(result, operations); - stdx::lock_guard<stdx::mutex> lk(_mutex); - _active = false; - _condition.notify_all(); -} - -namespace { - -void pauseBeforeCompletion(const StatusWith<Timestamp>& result, - const MultiApplier::Operations& operationsOnCompletion, - const PauseDataReplicatorFn& pauseDataReplicator, - const MultiApplier::CallbackFn& onCompletion) { - if (result.isOK()) { - pauseDataReplicator(); - } - onCompletion(result, operationsOnCompletion); -}; - -} // namespace - -StatusWith<std::pair<std::unique_ptr<MultiApplier>, MultiApplier::Operations>> applyUntilAndPause( - ReplicationExecutor* executor, - const MultiApplier::Operations& operations, - const MultiApplier::ApplyOperationFn& applyOperation, - const MultiApplier::MultiApplyFn& multiApply, - const Timestamp& lastTimestampToApply, - const PauseDataReplicatorFn& pauseDataReplicator, - const MultiApplier::CallbackFn& onCompletion) { - try { - auto comp = [](const OplogEntry& left, const OplogEntry& right) { - uassert(ErrorCodes::FailedToParse, - str::stream() << "Operation missing 'ts' field': " << left.raw, - left.raw.hasField("ts")); - uassert(ErrorCodes::FailedToParse, - str::stream() << "Operation missing 'ts' field': " << right.raw, - right.raw.hasField("ts")); - return left.raw["ts"].timestamp() < right.raw["ts"].timestamp(); - }; - auto wrapped = OplogEntry(BSON("ts" << lastTimestampToApply)); - auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp); - bool found = i != operations.cend() && !comp(wrapped, *i); - auto j = found ? i + 1 : i; - MultiApplier::Operations operationsInRange(operations.cbegin(), j); - MultiApplier::Operations operationsNotInRange(j, operations.cend()); - if (!found) { - return std::make_pair( - std::unique_ptr<MultiApplier>(new MultiApplier( - executor, operationsInRange, applyOperation, multiApply, onCompletion)), - operationsNotInRange); - } - - return std::make_pair( - std::unique_ptr<MultiApplier>(new MultiApplier(executor, - operationsInRange, - applyOperation, - multiApply, - stdx::bind(pauseBeforeCompletion, - stdx::placeholders::_1, - stdx::placeholders::_2, - pauseDataReplicator, - onCompletion))), - operationsNotInRange); - } catch (...) { - return exceptionToStatus(); - } - MONGO_UNREACHABLE; - return Status(ErrorCodes::InternalError, "unreachable"); -} - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h deleted file mode 100644 index f3d5ea3badf..00000000000 --- a/src/mongo/db/repl/multiapplier.h +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Copyright (C) 2015 MongoDB Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <memory> -#include <string> -#include <utility> -#include <vector> - -#include "mongo/base/disallow_copying.h" -#include "mongo/base/status.h" -#include "mongo/base/status_with.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/repl/oplog_entry.h" -#include "mongo/db/repl/replication_executor.h" -#include "mongo/db/service_context.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/functional.h" -#include "mongo/stdx/mutex.h" - -namespace mongo { -namespace repl { - -class OpTime; - -class MultiApplier { - MONGO_DISALLOW_COPYING(MultiApplier); - -public: - static const size_t kReplWriterThreadCount = 16; - - /** - * Operations sorted by timestamp in ascending order. - */ - using Operations = std::vector<OplogEntry>; - - /** - * Callback function to report final status of applying operations along with - * list of operations (if applicable) that were not successfully applied. - * On success, returns the timestamp of the last operation applied together with an empty - * list of operations. - */ - using CallbackFn = stdx::function<void(const StatusWith<Timestamp>&, const Operations&)>; - - /** - * Type of function to to apply a single operation. In production, this function - * would have the same outcome as calling SyncTail::syncApply() ('convertUpdatesToUpserts' - * value will be embedded in the function implementation). - */ - using ApplyOperationFn = stdx::function<void(const Operations&)>; - - using MultiApplyFn = stdx::function<StatusWith<OpTime>( - OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn)>; - - /** - * Creates MultiApplier in inactive state. - * - * Accepts list of oplog entries to apply in 'operations'. - * - * The callback function will be invoked (after schedule()) when the applied has - * successfully applied all the operations or encountered a failure. Failures may occur if - * we failed to apply an operation; or if the underlying scheduled work item - * on the replication executor was canceled. - * - * It is an error for 'operations' to be empty but individual oplog entries - * contained in 'operations' are not validated. - */ - MultiApplier(ReplicationExecutor* executor, - const Operations& operations, - const ApplyOperationFn& applyOperation, - const MultiApplyFn& multiApply, - const CallbackFn& onCompletion); - - /** - * Blocks while applier is active. - */ - virtual ~MultiApplier(); - - /** - * Returns diagnostic information. - */ - std::string getDiagnosticString() const; - - /** - * Returns true if the applier has been started (but has not completed). - */ - bool isActive() const; - - /** - * Starts applier by scheduling initial db work to be run by the executor. - */ - Status start(); - - /** - * Cancels current db work request. - * Returns immediately if applier is not active. - * - * Callback function may be invoked with an ErrorCodes::CallbackCanceled status. - */ - void cancel(); - - /** - * Waits for active database worker to complete. - * Returns immediately if applier is not active. - */ - void wait(); - -private: - /** - * DB worker callback function - applies all operations. - */ - void _callback(const ReplicationExecutor::CallbackArgs& cbd); - void _finishCallback(const StatusWith<Timestamp>& result, const Operations& operations); - - // Not owned by us. - ReplicationExecutor* _executor; - - Operations _operations; - ApplyOperationFn _applyOperation; - MultiApplyFn _multiApply; - CallbackFn _onCompletion; - - // Protects member data of this MultiApplier. - mutable stdx::mutex _mutex; - - stdx::condition_variable _condition; - - // _active is true when MultiApplier is scheduled to be run by the executor. - bool _active; - - ReplicationExecutor::CallbackHandle _dbWorkCallbackHandle; -}; - - -/** - * Applies operations (sorted by timestamp) up to and including 'lastTimestampToApply'. - * If 'lastTimestampToApply' is found in 'operations': - * - The applier will be given a subset of 'operations' (includes 'lastTimestampToApply'). - * - On success, the applier will invoke the 'pause' function just before reporting - * completion status. - * Otherwise, all entries in 'operations' before 'lastTimestampToApply' will be forwarded to - * the applier and the 'pause' function will be ignored. - * If the applier is successfully created, returns the applier and a list of operations that - * are skipped (operations with 'ts' field value after 'lastTimestampToApply). - */ -using PauseDataReplicatorFn = stdx::function<void()>; - -StatusWith<std::pair<std::unique_ptr<MultiApplier>, MultiApplier::Operations>> applyUntilAndPause( - ReplicationExecutor* executor, - const MultiApplier::Operations& operations, - const MultiApplier::ApplyOperationFn& applyOperation, - const MultiApplier::ApplyOperationFn& multiApply, - const Timestamp& lastTimestampToApply, - const PauseDataReplicatorFn& pauseDataReplicator, - const MultiApplier::CallbackFn& onCompletion); - -} // namespace repl -} // namespace mongo diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 477b1538d3f..75bba6782b4 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -62,16 +62,6 @@ bool OplogEntry::isCommand() const { return opType[0] == 'c'; } -bool OplogEntry::isCrudOpType() const { - switch (opType[0]) { - case 'd': - case 'i': - case 'u': - return opType[1] == 0; - } - return false; -} - bool OplogEntry::hasNamespace() const { return !ns.empty(); } @@ -80,22 +70,6 @@ int OplogEntry::getVersion() const { return version.eoo() ? 1 : version.Int(); } -BSONElement OplogEntry::getIdElement() const { - invariant(isCrudOpType()); - switch (opType[0]) { - case 'u': - return o2.Obj()["_id"]; - case 'd': - case 'i': - return o.Obj()["_id"]; - } - MONGO_UNREACHABLE; -} - -OpTime OplogEntry::getOpTime() const { - return fassertStatusOK(34436, OpTime::parseFromOplogEntry(raw)); -} - Seconds OplogEntry::getTimestampSecs() const { return Seconds(ts.timestamp().getSecs()); } diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 90f7bdbe8bc..08fd1c8d95a 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -30,8 +30,6 @@ #include "mongo/bson/bsonobj.h" -#include "mongo/db/repl/optime.h" - namespace mongo { namespace repl { @@ -54,11 +52,8 @@ struct OplogEntry { bool isForCappedCollection = false; bool isCommand() const; - bool isCrudOpType() const; bool hasNamespace() const; int getVersion() const; - BSONElement getIdElement() const; - OpTime getOpTime() const; Seconds getTimestampSecs() const; StringData getCollectionName() const; std::string toString() const; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index e679c4dc3d7..e4cf3c2b943 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -182,12 +182,9 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCoord) { DataReplicatorOptions options; - options.applierFn = [](const MultiApplier::Operations&) -> Status { return Status::OK(); }; - options.multiApplyFn = - [](OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn) - -> OpTime { return OpTime(); }; + options.applierFn = [](OperationContext*, const OplogEntry&) -> Status { return Status::OK(); }; options.rollbackFn = - [](OperationContext*, const OpTime&, const HostAndPort&) -> Status { return Status::OK(); }; + [](OperationContext*, const OpTime&, const HostAndPort&) { return Status::OK(); }; options.prepareReplSetUpdatePositionCommandFn = [replCoord](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) -> StatusWith<BSONObj> { diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index cacfd14590e..cdee4896fab 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -56,7 +56,6 @@ #include "mongo/db/query/query_knobs.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/minvalid.h" -#include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/repl_client_info.h" @@ -433,7 +432,7 @@ void prefetchOp(const BSONObj& op) { } // Doles out all the work to the reader pool threads and waits for them to complete -void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherPool) { +void prefetchOps(const std::deque<OplogEntry>& ops, OldThreadPool* prefetcherPool) { invariant(prefetcherPool); for (auto&& op : ops) { prefetcherPool->schedule(&prefetchOp, op.raw); @@ -442,13 +441,16 @@ void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherP } // Doles out all the work to the writer pool threads and waits for them to complete -void applyOps(const std::vector<MultiApplier::Operations>& writerVectors, +void applyOps(const std::vector<std::vector<OplogEntry>>& writerVectors, OldThreadPool* writerPool, - SyncTail::MultiSyncApplyFunc func) { + SyncTail::MultiSyncApplyFunc func, + SyncTail* sync) { TimerHolder timer(&applyBatchStats); - for (auto&& ops : writerVectors) { - if (!ops.empty()) { - writerPool->schedule(func, stdx::cref(ops)); + for (std::vector<std::vector<OplogEntry>>::const_iterator it = writerVectors.begin(); + it != writerVectors.end(); + ++it) { + if (!it->empty()) { + writerPool->schedule(func, stdx::cref(*it), sync); } } } @@ -484,8 +486,8 @@ private: }; void fillWriterVectors(OperationContext* txn, - const MultiApplier::Operations& ops, - std::vector<MultiApplier::Operations>* writerVectors) { + const std::deque<OplogEntry>& ops, + std::vector<std::vector<OplogEntry>>* writerVectors) { const bool supportsDocLocking = getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking(); const uint32_t numWriters = writerVectors->size(); @@ -498,12 +500,24 @@ void fillWriterVectors(OperationContext* txn, StringMapTraits::HashedKey hashedNs(op.ns); uint32_t hash = hashedNs.hash(); + const char* opType = op.opType.rawData(); + // For doc locking engines, include the _id of the document in the hash so we get // parallelism even if all writes are to a single collection. We can't do this for capped // collections because the order of inserts is a guaranteed property, unlike for normal // collections. - if (supportsDocLocking && op.isCrudOpType() && !isCapped(txn, hashedNs)) { - BSONElement id = op.getIdElement(); + if (supportsDocLocking && isCrudOpType(opType) && !isCapped(txn, hashedNs)) { + BSONElement id; + switch (opType[0]) { + case 'u': + id = op.o2.Obj()["_id"]; + break; + case 'd': + case 'i': + id = op.o.Obj()["_id"]; + break; + } + const size_t idHash = BSONElement::Hasher()(id); MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); } @@ -525,17 +539,50 @@ void fillWriterVectors(OperationContext* txn, // Applies a batch of oplog entries, by using a set of threads to apply the operations and then // writes the oplog entries to the local oplog. OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) { - auto convertToVector = ops.getDeque(); - auto status = repl::multiApply( - txn, MultiApplier::Operations(convertToVector.begin(), convertToVector.end()), _applyFunc); - if (!status.isOK()) { - if (status == ErrorCodes::InterruptedAtShutdown) { - return OpTime(); - } else { - fassertStatusOK(34437, status); + invariant(_applyFunc); + + if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) { + // Use a ThreadPool to prefetch all the operations in a batch. + prefetchOps(ops.getDeque(), &_prefetcherPool); + } + + std::vector<std::vector<OplogEntry>> writerVectors(replWriterThreadCount); + + fillWriterVectors(txn, ops.getDeque(), &writerVectors); + LOG(2) << "replication batch size is " << ops.getDeque().size() << endl; + // We must grab this because we're going to grab write locks later. + // We hold this mutex the entire time we're writing; it doesn't matter + // because all readers are blocked anyway. + stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync); + + // stop all readers until we're done + Lock::ParallelBatchWriterMode pbwm(txn->lockState()); + + ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); + if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain()) { + severe() << "attempting to replicate ops while primary"; + fassertFailed(28527); + } + + applyOps(writerVectors, &_writerPool, _applyFunc, this); + + OpTime lastOpTime; + { + ON_BLOCK_EXIT([&] { _writerPool.join(); }); + std::vector<BSONObj> raws; + raws.reserve(ops.getDeque().size()); + for (auto&& op : ops.getDeque()) { + raws.emplace_back(op.raw); } + lastOpTime = writeOpsToOplog(txn, raws); + } + + if (inShutdownStrict()) { + log() << "Cannot apply operations due to shutdown in progress"; + return OpTime(); } - return status.getValue(); + // We have now written all database writes and updated the oplog to match. + return lastOpTime; } namespace { @@ -966,7 +1013,7 @@ static void initializeWriterThread() { } // This free function is used by the writer threads to apply each op -void multiSyncApply(const std::vector<OplogEntry>& ops) { +void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) { std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end()); std::vector<OplogEntry*> oplogEntryPointers(oplogEntries.size()); for (size_t i = 0; i < oplogEntries.size(); i++) { @@ -1083,7 +1130,7 @@ void multiSyncApply(const std::vector<OplogEntry>& ops) { } // This free function is used by the initial sync writer threads to apply each op -void multiInitialSyncApply(const std::vector<OplogEntry>& ops) { +void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) { initializeWriterThread(); OperationContextImpl txn; @@ -1099,17 +1146,14 @@ void multiInitialSyncApply(const std::vector<OplogEntry>& ops) { try { const Status s = SyncTail::syncApply(&txn, it->raw, convertUpdatesToUpserts); if (!s.isOK()) { - /* TODO (dannenberg) fix this part. SERVER-23308 if (st->shouldRetry(&txn, it->raw)) { const Status s2 = SyncTail::syncApply(&txn, it->raw, convertUpdatesToUpserts); if (!s2.isOK()) { - */ - severe() << "Error applying operation (" << it->raw.toString() << "): " << s; - fassertFailedNoTrace(15915); - /* - } - } -*/ + severe() << "Error applying operation (" << it->raw.toString() + << "): " << s2; + fassertFailedNoTrace(15915); + } + } // If shouldRetry() returns false, fall through. // This can happen if the document that was moved and missed by Cloner @@ -1128,58 +1172,5 @@ void multiInitialSyncApply(const std::vector<OplogEntry>& ops) { } } -StatusWith<OpTime> multiApply(OperationContext* txn, - const MultiApplier::Operations& ops, - MultiApplier::ApplyOperationFn applyOperation) { - invariant(applyOperation); - - OldThreadPool workerPool(MultiApplier::kReplWriterThreadCount, "repl writer worker "); - - if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) { - // Use a ThreadPool to prefetch all the operations in a batch. - prefetchOps(ops, &workerPool); - } - - std::vector<std::vector<OplogEntry>> writerVectors(MultiApplier::kReplWriterThreadCount); - - fillWriterVectors(txn, ops, &writerVectors); - LOG(2) << "replication batch size is " << ops.size(); - // We must grab this because we're going to grab write locks later. - // We hold this mutex the entire time we're writing; it doesn't matter - // because all readers are blocked anyway. - stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync); - - // stop all readers until we're done - Lock::ParallelBatchWriterMode pbwm(txn->lockState()); - - auto replCoord = ReplicationCoordinator::get(txn); - if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain()) { - severe() << "attempting to replicate ops while primary"; - return {ErrorCodes::CannotApplyOplogWhilePrimary, - "attempting to replicate ops while primary"}; - } - - applyOps(writerVectors, &workerPool, applyOperation); - - OpTime lastOpTime; - { - ON_BLOCK_EXIT([&] { workerPool.join(); }); - std::vector<BSONObj> raws; - raws.reserve(ops.size()); - for (auto&& op : ops) { - raws.emplace_back(op.raw); - } - lastOpTime = writeOpsToOplog(txn, raws); - } - - if (inShutdownStrict()) { - log() << "Cannot apply operations due to shutdown in progress"; - return {ErrorCodes::InterruptedAtShutdown, - "Cannot apply operations due to shutdown in progress"}; - } - // We have now written all database writes and updated the oplog to match. - return lastOpTime; -} - } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 9d42ad1ae02..7fffb39d6e8 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -32,7 +32,6 @@ #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" -#include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/storage/mmap_v1/dur.h" #include "mongo/stdx/functional.h" @@ -53,7 +52,8 @@ class OpTime; */ class SyncTail { public: - using MultiSyncApplyFunc = stdx::function<void(const std::vector<OplogEntry>& ops)>; + using MultiSyncApplyFunc = + stdx::function<void(const std::vector<OplogEntry>& ops, SyncTail* st)>; /** * Type of function to increment "repl.apply.ops" server status metric. @@ -178,23 +178,9 @@ private: OldThreadPool _prefetcherPool; }; -/** - * Applies the opeartions described in the oplog entries contained in "ops" using the - * "applyOperation" function. - * - * Returns ErrorCode::InterruptedAtShutdown if the node enters shutdown while applying ops, - * ErrorCodes::CannotApplyOplogWhilePrimary if the node has become primary, and the OpTime of the - * final operation applied otherwise. - * - * Shared between here and MultiApplier. - */ -StatusWith<OpTime> multiApply(OperationContext* txn, - const MultiApplier::Operations& ops, - MultiApplier::ApplyOperationFn applyOperation); - // These free functions are used by the thread pool workers to write ops to the db. -void multiSyncApply(const std::vector<OplogEntry>& ops); -void multiInitialSyncApply(const std::vector<OplogEntry>& ops); +void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st); +void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index e157e02f173..3d0829151e5 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -124,7 +124,7 @@ void SyncTailTest::tearDown() { TEST_F(SyncTailTest, Peek) { BackgroundSyncMock bgsync; - SyncTail syncTail(&bgsync, [](const std::vector<OplogEntry>& ops) {}); + SyncTail syncTail(&bgsync, [](const std::vector<OplogEntry>& ops, SyncTail* st) {}); BSONObj obj; ASSERT_FALSE(syncTail.peek(&obj)); } |