diff options
Diffstat (limited to 'src/mongo/db/repl')
-rw-r--r-- | src/mongo/db/repl/SConscript | 12 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator.h | 13 | ||||
-rw-r--r-- | src/mongo/db/repl/data_replicator_test.cpp | 61 | ||||
-rw-r--r-- | src/mongo/db/repl/multiapplier.cpp | 231 | ||||
-rw-r--r-- | src/mongo/db/repl/multiapplier.h | 184 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 5 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 157 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail.h | 22 | ||||
-rw-r--r-- | src/mongo/db/repl/sync_tail_test.cpp | 2 |
12 files changed, 611 insertions, 114 deletions
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript index a58585bee8b..30b5d1779f0 100644 --- a/src/mongo/db/repl/SConscript +++ b/src/mongo/db/repl/SConscript @@ -728,6 +728,17 @@ env.CppUnitTest( ) env.Library( + target='multiapplier', + source=[ + 'multiapplier.cpp', + ], + LIBDEPS=[ + 'oplog_entry', + 'replication_executor', + ], +) + +env.Library( target='applier', source=[ 'applier.cpp', @@ -758,6 +769,7 @@ env.Library( 'applier', 'collection_cloner', 'database_cloner', + 'multiapplier', 'optime', 'reporter', '$BUILD_DIR/mongo/client/fetcher', diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp index feb54538c08..660d53e1567 100644 --- a/src/mongo/db/repl/data_replicator.cpp +++ b/src/mongo/db/repl/data_replicator.cpp @@ -1142,8 +1142,7 @@ void DataReplicator::_handleFailedApplyBatch(const TimestampStatus& ts, const Op void DataReplicator::_scheduleApplyAfterFetch(const Operations& ops) { ++_initialSyncState->fetchedMissingDocs; // TODO: check collection.isCapped, like SyncTail::getMissingDoc - const BSONObj failedOplogEntry = ops.begin()->raw; - const BSONElement missingIdElem = failedOplogEntry.getFieldDotted("o2._id"); + const BSONElement missingIdElem = ops.begin()->getIdElement(); const NamespaceString nss(ops.begin()->ns); const BSONObj query = BSON("find" << nss.coll() << "filter" << missingIdElem.wrap()); _tmpFetcher.reset(new QueryFetcher(_exec, @@ -1254,7 +1253,7 @@ Status DataReplicator::_scheduleApplyBatch_inlock(const Operations& ops) { _exec->wait(status.getValue()); }; - _applier.reset(new Applier(_exec, ops, _opts.applierFn, lambda)); + _applier.reset(new MultiApplier(_exec, ops, _opts.applierFn, _opts.multiApplyFn, lambda)); return _applier->start(); } diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h index f6503abb8bf..53dda5f96c5 100644 --- a/src/mongo/db/repl/data_replicator.h +++ b/src/mongo/db/repl/data_replicator.h @@ -37,7 +37,7 @@ #include 
"mongo/bson/bsonobj.h" #include "mongo/bson/timestamp.h" #include "mongo/db/namespace_string.h" -#include "mongo/db/repl/applier.h" +#include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/collection_cloner.h" #include "mongo/db/repl/database_cloner.h" #include "mongo/db/repl/optime.h" @@ -56,7 +56,7 @@ class QueryFetcher; namespace repl { -using Operations = Applier::Operations; +using Operations = MultiApplier::Operations; using QueryResponseStatus = StatusWith<Fetcher::QueryResponse>; using CallbackArgs = ReplicationExecutor::CallbackArgs; using CBHStatus = StatusWith<ReplicationExecutor::CallbackHandle>; @@ -132,7 +132,8 @@ struct DataReplicatorOptions { std::string scopeNS; BSONObj filterCriteria; - Applier::ApplyOperationFn applierFn; + MultiApplier::ApplyOperationFn applierFn; + MultiApplier::MultiApplyFn multiApplyFn; RollbackFn rollbackFn; Reporter::PrepareReplSetUpdatePositionCommandFn prepareReplSetUpdatePositionCommandFn; GetMyLastOptimeFn getMyLastOptime; @@ -307,9 +308,9 @@ private: Handle _reporterHandle; // (M) std::unique_ptr<Reporter> _reporter; // (M) - bool _applierActive; // (M) - bool _applierPaused; // (X) - std::unique_ptr<Applier> _applier; // (M) + bool _applierActive; // (M) + bool _applierPaused; // (X) + std::unique_ptr<MultiApplier> _applier; // (M) HostAndPort _syncSource; // (M) Timestamp _lastTimestampFetched; // (MX) diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp index b8199ebf9dd..5d3513522d5 100644 --- a/src/mongo/db/repl/data_replicator_test.cpp +++ b/src/mongo/db/repl/data_replicator_test.cpp @@ -102,7 +102,11 @@ public: * clear/reset state */ void reset() { - _applierFn = [](OperationContext*, const OplogEntry&) -> Status { return Status::OK(); }; + _applierFn = [](const MultiApplier::Operations&) -> Status { return Status::OK(); }; + _multiApplyFn = [](OperationContext*, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn) + -> 
StatusWith<OpTime> { return ops.back().getOpTime(); }; _rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&) -> Status { return Status::OK(); }; _setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; }; @@ -181,9 +185,11 @@ protected: launchExecutorThread(); DataReplicatorOptions options; options.initialSyncRetryWait = Milliseconds(0); - options.applierFn = [this](OperationContext* txn, const OplogEntry& operation) { - return _applierFn(txn, operation); - }; + options.applierFn = [this](const MultiApplier::Operations& ops) { return _applierFn(ops); }; + options.multiApplyFn = + [this](OperationContext* txn, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn func) { return _multiApplyFn(txn, ops, func); }; options.rollbackFn = [this](OperationContext* txn, const OpTime& lastOpTimeWritten, const HostAndPort& syncSource) -> Status { @@ -214,7 +220,8 @@ protected: // Executor may still invoke callback before shutting down. } - Applier::ApplyOperationFn _applierFn; + MultiApplier::ApplyOperationFn _applierFn; + MultiApplier::MultiApplyFn _multiApplyFn; DataReplicatorOptions::RollbackFn _rollbackFn; DataReplicatorOptions::SetMyLastOptimeFn _setMyLastOptime; OpTime _myLastOpTime; @@ -452,15 +459,17 @@ TEST_F(InitialSyncTest, Complete) { verifySync(); } -TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) { +TEST_F(InitialSyncTest, MissingDocOnMultiApplyCompletes) { DataReplicatorOptions opts; int applyCounter{0}; - _applierFn = [&](OperationContext* txn, const OplogEntry& op) { - if (++applyCounter == 1) { - return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc."); - } - return Status::OK(); - }; + _multiApplyFn = + [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn) + -> StatusWith<OpTime> { + if (++applyCounter == 1) { + return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc."); + } + return ops.back().getOpTime(); + }; const std::vector<BSONObj> 
responses = { // get latest oplog ts @@ -870,12 +879,14 @@ TEST_F(SteadyStateTest, PauseDataReplicator) { unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; BSONObj operationApplied; - _applierFn = [&](OperationContext* txn, const OplogEntry& op) { - stdx::lock_guard<stdx::mutex> lock(mutex); - operationApplied = op.raw; - barrier.countDownAndWait(); - return Status::OK(); - }; + _multiApplyFn = + [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn) + -> StatusWith<OpTime> { + stdx::lock_guard<stdx::mutex> lock(mutex); + operationApplied = ops.back().raw; + barrier.countDownAndWait(); + return ops.back().getOpTime(); + }; DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime; _setMyLastOptime = [&](const OpTime& opTime) { oldSetMyLastOptime(opTime); @@ -952,12 +963,14 @@ TEST_F(SteadyStateTest, ApplyOneOperation) { unittest::Barrier barrier(2U); Timestamp lastTimestampApplied; BSONObj operationApplied; - _applierFn = [&](OperationContext* txn, const OplogEntry& op) { - stdx::lock_guard<stdx::mutex> lock(mutex); - operationApplied = op.raw; - barrier.countDownAndWait(); - return Status::OK(); - }; + _multiApplyFn = + [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn) + -> StatusWith<OpTime> { + stdx::lock_guard<stdx::mutex> lock(mutex); + operationApplied = ops.back().raw; + barrier.countDownAndWait(); + return ops.back().getOpTime(); + }; DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime; _setMyLastOptime = [&](const OpTime& opTime) { oldSetMyLastOptime(opTime); diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp new file mode 100644 index 00000000000..8db87b5e1f5 --- /dev/null +++ b/src/mongo/db/repl/multiapplier.cpp @@ -0,0 +1,231 @@ +/** + * Copyright (C) 2015 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication + +#include "mongo/platform/basic.h" + +#include "mongo/db/repl/multiapplier.h" + +#include <algorithm> + +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/optime.h" +#include "mongo/db/repl/replication_executor.h" + +namespace mongo { +namespace repl { + +MultiApplier::MultiApplier(ReplicationExecutor* executor, + const Operations& operations, + const ApplyOperationFn& applyOperation, + const MultiApplyFn& multiApply, + const CallbackFn& onCompletion) + : _executor(executor), + _operations(operations), + _applyOperation(applyOperation), + _multiApply(multiApply), + _onCompletion(onCompletion), + _active(false) { + uassert(ErrorCodes::BadValue, "null replication executor", executor); + uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty()); + uassert(ErrorCodes::FailedToParse, + str::stream() << "last operation missing 'ts' field: " << operations.back().raw, + operations.back().raw.hasField("ts")); + uassert(ErrorCodes::TypeMismatch, + str::stream() << "'ts' in last operation not a timestamp: " << operations.back().raw, + BSONType::bsonTimestamp == operations.back().raw.getField("ts").type()); + uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation); + uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion); +} + +MultiApplier::~MultiApplier() { + DESTRUCTOR_GUARD(cancel(); wait();); +} + +std::string MultiApplier::getDiagnosticString() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + str::stream output; + output << "MultiApplier"; + output << " executor: " << _executor->getDiagnosticString(); + output << " active: " << _active; + return output; +} + +bool MultiApplier::isActive() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _active; +} + +Status MultiApplier::start() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (_active) { + return 
Status(ErrorCodes::IllegalOperation, "applier already started"); + } + + auto scheduleResult = _executor->scheduleDBWork( + stdx::bind(&MultiApplier::_callback, this, stdx::placeholders::_1)); + if (!scheduleResult.isOK()) { + return scheduleResult.getStatus(); + } + + _active = true; + _dbWorkCallbackHandle = scheduleResult.getValue(); + + return Status::OK(); +} + +void MultiApplier::cancel() { + ReplicationExecutor::CallbackHandle dbWorkCallbackHandle; + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + if (!_active) { + return; + } + + dbWorkCallbackHandle = _dbWorkCallbackHandle; + } + + if (dbWorkCallbackHandle.isValid()) { + _executor->cancel(dbWorkCallbackHandle); + } +} + +void MultiApplier::wait() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + + while (_active) { + _condition.wait(lk); + } +} + +// TODO change the passed in function to be multiapply instead of apply inlock +void MultiApplier::_callback(const ReplicationExecutor::CallbackArgs& cbd) { + if (!cbd.status.isOK()) { + _finishCallback(cbd.status, _operations); + return; + } + + invariant(cbd.txn); + + // Refer to multiSyncApply() and multiInitialSyncApply() in sync_tail.cpp. + cbd.txn->setReplicatedWrites(false); + + // allow us to get through the magic barrier + cbd.txn->lockState()->setIsBatchWriter(true); + + StatusWith<OpTime> applyStatus(ErrorCodes::InternalError, "not mutated"); + + invariant(!_operations.empty()); + try { + applyStatus = _multiApply(cbd.txn, _operations, _applyOperation); + } catch (...) 
{ + applyStatus = exceptionToStatus(); + } + if (!applyStatus.isOK()) { + _finishCallback(applyStatus.getStatus(), _operations); + return; + } + _finishCallback(applyStatus.getValue().getTimestamp(), Operations()); +} + +void MultiApplier::_finishCallback(const StatusWith<Timestamp>& result, + const Operations& operations) { + _onCompletion(result, operations); + stdx::lock_guard<stdx::mutex> lk(_mutex); + _active = false; + _condition.notify_all(); +} + +namespace { + +void pauseBeforeCompletion(const StatusWith<Timestamp>& result, + const MultiApplier::Operations& operationsOnCompletion, + const PauseDataReplicatorFn& pauseDataReplicator, + const MultiApplier::CallbackFn& onCompletion) { + if (result.isOK()) { + pauseDataReplicator(); + } + onCompletion(result, operationsOnCompletion); +}; + +} // namespace + +StatusWith<std::pair<std::unique_ptr<MultiApplier>, MultiApplier::Operations>> applyUntilAndPause( + ReplicationExecutor* executor, + const MultiApplier::Operations& operations, + const MultiApplier::ApplyOperationFn& applyOperation, + const MultiApplier::MultiApplyFn& multiApply, + const Timestamp& lastTimestampToApply, + const PauseDataReplicatorFn& pauseDataReplicator, + const MultiApplier::CallbackFn& onCompletion) { + try { + auto comp = [](const OplogEntry& left, const OplogEntry& right) { + uassert(ErrorCodes::FailedToParse, + str::stream() << "Operation missing 'ts' field': " << left.raw, + left.raw.hasField("ts")); + uassert(ErrorCodes::FailedToParse, + str::stream() << "Operation missing 'ts' field': " << right.raw, + right.raw.hasField("ts")); + return left.raw["ts"].timestamp() < right.raw["ts"].timestamp(); + }; + auto wrapped = OplogEntry(BSON("ts" << lastTimestampToApply)); + auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp); + bool found = i != operations.cend() && !comp(wrapped, *i); + auto j = found ? 
i + 1 : i; + MultiApplier::Operations operationsInRange(operations.cbegin(), j); + MultiApplier::Operations operationsNotInRange(j, operations.cend()); + if (!found) { + return std::make_pair( + std::unique_ptr<MultiApplier>(new MultiApplier( + executor, operationsInRange, applyOperation, multiApply, onCompletion)), + operationsNotInRange); + } + + return std::make_pair( + std::unique_ptr<MultiApplier>(new MultiApplier(executor, + operationsInRange, + applyOperation, + multiApply, + stdx::bind(pauseBeforeCompletion, + stdx::placeholders::_1, + stdx::placeholders::_2, + pauseDataReplicator, + onCompletion))), + operationsNotInRange); + } catch (...) { + return exceptionToStatus(); + } + MONGO_UNREACHABLE; + return Status(ErrorCodes::InternalError, "unreachable"); +} + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h new file mode 100644 index 00000000000..f3d5ea3badf --- /dev/null +++ b/src/mongo/db/repl/multiapplier.h @@ -0,0 +1,184 @@ +/** + * Copyright (C) 2015 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "mongo/base/disallow_copying.h" +#include "mongo/base/status.h" +#include "mongo/base/status_with.h" +#include "mongo/db/jsobj.h" +#include "mongo/db/repl/oplog_entry.h" +#include "mongo/db/repl/replication_executor.h" +#include "mongo/db/service_context.h" +#include "mongo/stdx/condition_variable.h" +#include "mongo/stdx/functional.h" +#include "mongo/stdx/mutex.h" + +namespace mongo { +namespace repl { + +class OpTime; + +class MultiApplier { + MONGO_DISALLOW_COPYING(MultiApplier); + +public: + static const size_t kReplWriterThreadCount = 16; + + /** + * Operations sorted by timestamp in ascending order. + */ + using Operations = std::vector<OplogEntry>; + + /** + * Callback function to report final status of applying operations along with + * list of operations (if applicable) that were not successfully applied. + * On success, returns the timestamp of the last operation applied together with an empty + * list of operations. + */ + using CallbackFn = stdx::function<void(const StatusWith<Timestamp>&, const Operations&)>; + + /** + * Type of function to to apply a single operation. In production, this function + * would have the same outcome as calling SyncTail::syncApply() ('convertUpdatesToUpserts' + * value will be embedded in the function implementation). 
+ */ + using ApplyOperationFn = stdx::function<void(const Operations&)>; + + using MultiApplyFn = stdx::function<StatusWith<OpTime>( + OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn)>; + + /** + * Creates MultiApplier in inactive state. + * + * Accepts list of oplog entries to apply in 'operations'. + * + * The callback function will be invoked (after schedule()) when the applied has + * successfully applied all the operations or encountered a failure. Failures may occur if + * we failed to apply an operation; or if the underlying scheduled work item + * on the replication executor was canceled. + * + * It is an error for 'operations' to be empty but individual oplog entries + * contained in 'operations' are not validated. + */ + MultiApplier(ReplicationExecutor* executor, + const Operations& operations, + const ApplyOperationFn& applyOperation, + const MultiApplyFn& multiApply, + const CallbackFn& onCompletion); + + /** + * Blocks while applier is active. + */ + virtual ~MultiApplier(); + + /** + * Returns diagnostic information. + */ + std::string getDiagnosticString() const; + + /** + * Returns true if the applier has been started (but has not completed). + */ + bool isActive() const; + + /** + * Starts applier by scheduling initial db work to be run by the executor. + */ + Status start(); + + /** + * Cancels current db work request. + * Returns immediately if applier is not active. + * + * Callback function may be invoked with an ErrorCodes::CallbackCanceled status. + */ + void cancel(); + + /** + * Waits for active database worker to complete. + * Returns immediately if applier is not active. + */ + void wait(); + +private: + /** + * DB worker callback function - applies all operations. + */ + void _callback(const ReplicationExecutor::CallbackArgs& cbd); + void _finishCallback(const StatusWith<Timestamp>& result, const Operations& operations); + + // Not owned by us. 
+ ReplicationExecutor* _executor; + + Operations _operations; + ApplyOperationFn _applyOperation; + MultiApplyFn _multiApply; + CallbackFn _onCompletion; + + // Protects member data of this MultiApplier. + mutable stdx::mutex _mutex; + + stdx::condition_variable _condition; + + // _active is true when MultiApplier is scheduled to be run by the executor. + bool _active; + + ReplicationExecutor::CallbackHandle _dbWorkCallbackHandle; +}; + + +/** + * Applies operations (sorted by timestamp) up to and including 'lastTimestampToApply'. + * If 'lastTimestampToApply' is found in 'operations': + * - The applier will be given a subset of 'operations' (includes 'lastTimestampToApply'). + * - On success, the applier will invoke the 'pause' function just before reporting + * completion status. + * Otherwise, all entries in 'operations' before 'lastTimestampToApply' will be forwarded to + * the applier and the 'pause' function will be ignored. + * If the applier is successfully created, returns the applier and a list of operations that + * are skipped (operations with 'ts' field value after 'lastTimestampToApply). 
+ */ +using PauseDataReplicatorFn = stdx::function<void()>; + +StatusWith<std::pair<std::unique_ptr<MultiApplier>, MultiApplier::Operations>> applyUntilAndPause( + ReplicationExecutor* executor, + const MultiApplier::Operations& operations, + const MultiApplier::ApplyOperationFn& applyOperation, + const MultiApplier::ApplyOperationFn& multiApply, + const Timestamp& lastTimestampToApply, + const PauseDataReplicatorFn& pauseDataReplicator, + const MultiApplier::CallbackFn& onCompletion); + +} // namespace repl +} // namespace mongo diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 75bba6782b4..477b1538d3f 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -62,6 +62,16 @@ bool OplogEntry::isCommand() const { return opType[0] == 'c'; } +bool OplogEntry::isCrudOpType() const { + switch (opType[0]) { + case 'd': + case 'i': + case 'u': + return opType[1] == 0; + } + return false; +} + bool OplogEntry::hasNamespace() const { return !ns.empty(); } @@ -70,6 +80,22 @@ int OplogEntry::getVersion() const { return version.eoo() ? 
1 : version.Int(); } +BSONElement OplogEntry::getIdElement() const { + invariant(isCrudOpType()); + switch (opType[0]) { + case 'u': + return o2.Obj()["_id"]; + case 'd': + case 'i': + return o.Obj()["_id"]; + } + MONGO_UNREACHABLE; +} + +OpTime OplogEntry::getOpTime() const { + return fassertStatusOK(34436, OpTime::parseFromOplogEntry(raw)); +} + Seconds OplogEntry::getTimestampSecs() const { return Seconds(ts.timestamp().getSecs()); } diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 08fd1c8d95a..90f7bdbe8bc 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -30,6 +30,8 @@ #include "mongo/bson/bsonobj.h" +#include "mongo/db/repl/optime.h" + namespace mongo { namespace repl { @@ -52,8 +54,11 @@ struct OplogEntry { bool isForCappedCollection = false; bool isCommand() const; + bool isCrudOpType() const; bool hasNamespace() const; int getVersion() const; + BSONElement getIdElement() const; + OpTime getOpTime() const; Seconds getTimestampSecs() const; StringData getCollectionName() const; std::string toString() const; diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index e4cf3c2b943..e679c4dc3d7 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -182,9 +182,12 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings& DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCoord) { DataReplicatorOptions options; - options.applierFn = [](OperationContext*, const OplogEntry&) -> Status { return Status::OK(); }; + options.applierFn = [](const MultiApplier::Operations&) -> Status { return Status::OK(); }; + options.multiApplyFn = + [](OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn) + -> OpTime { return OpTime(); }; options.rollbackFn = - [](OperationContext*, const OpTime&, const 
HostAndPort&) { return Status::OK(); }; + [](OperationContext*, const OpTime&, const HostAndPort&) -> Status { return Status::OK(); }; options.prepareReplSetUpdatePositionCommandFn = [replCoord](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle) -> StatusWith<BSONObj> { diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index cdee4896fab..cacfd14590e 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -56,6 +56,7 @@ #include "mongo/db/query/query_knobs.h" #include "mongo/db/repl/bgsync.h" #include "mongo/db/repl/minvalid.h" +#include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplogreader.h" #include "mongo/db/repl/repl_client_info.h" @@ -432,7 +433,7 @@ void prefetchOp(const BSONObj& op) { } // Doles out all the work to the reader pool threads and waits for them to complete -void prefetchOps(const std::deque<OplogEntry>& ops, OldThreadPool* prefetcherPool) { +void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherPool) { invariant(prefetcherPool); for (auto&& op : ops) { prefetcherPool->schedule(&prefetchOp, op.raw); @@ -441,16 +442,13 @@ void prefetchOps(const std::deque<OplogEntry>& ops, OldThreadPool* prefetcherPoo } // Doles out all the work to the writer pool threads and waits for them to complete -void applyOps(const std::vector<std::vector<OplogEntry>>& writerVectors, +void applyOps(const std::vector<MultiApplier::Operations>& writerVectors, OldThreadPool* writerPool, - SyncTail::MultiSyncApplyFunc func, - SyncTail* sync) { + SyncTail::MultiSyncApplyFunc func) { TimerHolder timer(&applyBatchStats); - for (std::vector<std::vector<OplogEntry>>::const_iterator it = writerVectors.begin(); - it != writerVectors.end(); - ++it) { - if (!it->empty()) { - writerPool->schedule(func, stdx::cref(*it), sync); + for (auto&& ops : writerVectors) { + if (!ops.empty()) { + writerPool->schedule(func, stdx::cref(ops)); } } } @@ 
-486,8 +484,8 @@ private: }; void fillWriterVectors(OperationContext* txn, - const std::deque<OplogEntry>& ops, - std::vector<std::vector<OplogEntry>>* writerVectors) { + const MultiApplier::Operations& ops, + std::vector<MultiApplier::Operations>* writerVectors) { const bool supportsDocLocking = getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking(); const uint32_t numWriters = writerVectors->size(); @@ -500,24 +498,12 @@ void fillWriterVectors(OperationContext* txn, StringMapTraits::HashedKey hashedNs(op.ns); uint32_t hash = hashedNs.hash(); - const char* opType = op.opType.rawData(); - // For doc locking engines, include the _id of the document in the hash so we get // parallelism even if all writes are to a single collection. We can't do this for capped // collections because the order of inserts is a guaranteed property, unlike for normal // collections. - if (supportsDocLocking && isCrudOpType(opType) && !isCapped(txn, hashedNs)) { - BSONElement id; - switch (opType[0]) { - case 'u': - id = op.o2.Obj()["_id"]; - break; - case 'd': - case 'i': - id = op.o.Obj()["_id"]; - break; - } - + if (supportsDocLocking && op.isCrudOpType() && !isCapped(txn, hashedNs)) { + BSONElement id = op.getIdElement(); const size_t idHash = BSONElement::Hasher()(id); MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash); } @@ -539,50 +525,17 @@ void fillWriterVectors(OperationContext* txn, // Applies a batch of oplog entries, by using a set of threads to apply the operations and then // writes the oplog entries to the local oplog. OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) { - invariant(_applyFunc); - - if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) { - // Use a ThreadPool to prefetch all the operations in a batch. 
- prefetchOps(ops.getDeque(), &_prefetcherPool); - } - - std::vector<std::vector<OplogEntry>> writerVectors(replWriterThreadCount); - - fillWriterVectors(txn, ops.getDeque(), &writerVectors); - LOG(2) << "replication batch size is " << ops.getDeque().size() << endl; - // We must grab this because we're going to grab write locks later. - // We hold this mutex the entire time we're writing; it doesn't matter - // because all readers are blocked anyway. - stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync); - - // stop all readers until we're done - Lock::ParallelBatchWriterMode pbwm(txn->lockState()); - - ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator(); - if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain()) { - severe() << "attempting to replicate ops while primary"; - fassertFailed(28527); - } - - applyOps(writerVectors, &_writerPool, _applyFunc, this); - - OpTime lastOpTime; - { - ON_BLOCK_EXIT([&] { _writerPool.join(); }); - std::vector<BSONObj> raws; - raws.reserve(ops.getDeque().size()); - for (auto&& op : ops.getDeque()) { - raws.emplace_back(op.raw); + auto convertToVector = ops.getDeque(); + auto status = repl::multiApply( + txn, MultiApplier::Operations(convertToVector.begin(), convertToVector.end()), _applyFunc); + if (!status.isOK()) { + if (status == ErrorCodes::InterruptedAtShutdown) { + return OpTime(); + } else { + fassertStatusOK(34437, status); } - lastOpTime = writeOpsToOplog(txn, raws); - } - - if (inShutdownStrict()) { - log() << "Cannot apply operations due to shutdown in progress"; - return OpTime(); } - // We have now written all database writes and updated the oplog to match. 
- return lastOpTime; + return status.getValue(); } namespace { @@ -1013,7 +966,7 @@ static void initializeWriterThread() { } // This free function is used by the writer threads to apply each op -void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) { +void multiSyncApply(const std::vector<OplogEntry>& ops) { std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end()); std::vector<OplogEntry*> oplogEntryPointers(oplogEntries.size()); for (size_t i = 0; i < oplogEntries.size(); i++) { @@ -1130,7 +1083,7 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) { } // This free function is used by the initial sync writer threads to apply each op -void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) { +void multiInitialSyncApply(const std::vector<OplogEntry>& ops) { initializeWriterThread(); OperationContextImpl txn; @@ -1146,14 +1099,17 @@ void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) { try { const Status s = SyncTail::syncApply(&txn, it->raw, convertUpdatesToUpserts); if (!s.isOK()) { + /* TODO (dannenberg) fix this part. SERVER-23308 if (st->shouldRetry(&txn, it->raw)) { const Status s2 = SyncTail::syncApply(&txn, it->raw, convertUpdatesToUpserts); if (!s2.isOK()) { - severe() << "Error applying operation (" << it->raw.toString() - << "): " << s2; - fassertFailedNoTrace(15915); - } - } + */ + severe() << "Error applying operation (" << it->raw.toString() << "): " << s; + fassertFailedNoTrace(15915); + /* + } + } +*/ // If shouldRetry() returns false, fall through. 
// This can happen if the document that was moved and missed by Cloner @@ -1172,5 +1128,58 @@ void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) { } } +StatusWith<OpTime> multiApply(OperationContext* txn, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn applyOperation) { + invariant(applyOperation); + + OldThreadPool workerPool(MultiApplier::kReplWriterThreadCount, "repl writer worker "); + + if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) { + // Use a ThreadPool to prefetch all the operations in a batch. + prefetchOps(ops, &workerPool); + } + + std::vector<std::vector<OplogEntry>> writerVectors(MultiApplier::kReplWriterThreadCount); + + fillWriterVectors(txn, ops, &writerVectors); + LOG(2) << "replication batch size is " << ops.size(); + // We must grab this because we're going to grab write locks later. + // We hold this mutex the entire time we're writing; it doesn't matter + // because all readers are blocked anyway. 
+ stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync); + + // stop all readers until we're done + Lock::ParallelBatchWriterMode pbwm(txn->lockState()); + + auto replCoord = ReplicationCoordinator::get(txn); + if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain()) { + severe() << "attempting to replicate ops while primary"; + return {ErrorCodes::CannotApplyOplogWhilePrimary, + "attempting to replicate ops while primary"}; + } + + applyOps(writerVectors, &workerPool, applyOperation); + + OpTime lastOpTime; + { + ON_BLOCK_EXIT([&] { workerPool.join(); }); + std::vector<BSONObj> raws; + raws.reserve(ops.size()); + for (auto&& op : ops) { + raws.emplace_back(op.raw); + } + lastOpTime = writeOpsToOplog(txn, raws); + } + + if (inShutdownStrict()) { + log() << "Cannot apply operations due to shutdown in progress"; + return {ErrorCodes::InterruptedAtShutdown, + "Cannot apply operations due to shutdown in progress"}; + } + // We have now written all database writes and updated the oplog to match. + return lastOpTime; +} + } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h index 7fffb39d6e8..9d42ad1ae02 100644 --- a/src/mongo/db/repl/sync_tail.h +++ b/src/mongo/db/repl/sync_tail.h @@ -32,6 +32,7 @@ #include "mongo/base/status.h" #include "mongo/bson/bsonobj.h" +#include "mongo/db/repl/multiapplier.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/storage/mmap_v1/dur.h" #include "mongo/stdx/functional.h" @@ -52,8 +53,7 @@ class OpTime; */ class SyncTail { public: - using MultiSyncApplyFunc = - stdx::function<void(const std::vector<OplogEntry>& ops, SyncTail* st)>; + using MultiSyncApplyFunc = stdx::function<void(const std::vector<OplogEntry>& ops)>; /** * Type of function to increment "repl.apply.ops" server status metric. 
@@ -178,9 +178,23 @@ private: OldThreadPool _prefetcherPool; }; +/** + * Applies the operations described in the oplog entries contained in "ops" using the + * "applyOperation" function. + * + * Returns ErrorCodes::InterruptedAtShutdown if the node enters shutdown while applying ops, + * ErrorCodes::CannotApplyOplogWhilePrimary if the node has become primary, and the OpTime of the + * final operation applied otherwise. + * + * Shared between here and MultiApplier. + */ +StatusWith<OpTime> multiApply(OperationContext* txn, + const MultiApplier::Operations& ops, + MultiApplier::ApplyOperationFn applyOperation); + // These free functions are used by the thread pool workers to write ops to the db. -void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st); -void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st); +void multiSyncApply(const std::vector<OplogEntry>& ops); +void multiInitialSyncApply(const std::vector<OplogEntry>& ops); } // namespace repl } // namespace mongo diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 3d0829151e5..e157e02f173 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -124,7 +124,7 @@ void SyncTailTest::tearDown() { TEST_F(SyncTailTest, Peek) { BackgroundSyncMock bgsync; - SyncTail syncTail(&bgsync, [](const std::vector<OplogEntry>& ops, SyncTail* st) {}); + SyncTail syncTail(&bgsync, [](const std::vector<OplogEntry>& ops) {}); BSONObj obj; ASSERT_FALSE(syncTail.peek(&obj)); } |