summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormatt dannenberg <matt.dannenberg@10gen.com>2016-03-15 06:52:29 -0400
committermatt dannenberg <matt.dannenberg@10gen.com>2016-03-23 10:36:12 -0400
commit1bcf7cc983a9ea51b41b0549ad5a3811bf7057b7 (patch)
treed243c3fb3c9af522686ce7c50003b189dbbfa868
parentbcac0c80bcf1e4c6b7a55e165f9d08336338068d (diff)
downloadmongo-1bcf7cc983a9ea51b41b0549ad5a3811bf7057b7.tar.gz
SERVER-22858 create a new Applier which wraps multiApply
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/commands/mr.cpp3
-rw-r--r--src/mongo/db/repl/SConscript12
-rw-r--r--src/mongo/db/repl/data_replicator.cpp5
-rw-r--r--src/mongo/db/repl/data_replicator.h13
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp61
-rw-r--r--src/mongo/db/repl/multiapplier.cpp231
-rw-r--r--src/mongo/db/repl/multiapplier.h184
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp26
-rw-r--r--src/mongo/db/repl/oplog_entry.h5
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp7
-rw-r--r--src/mongo/db/repl/sync_tail.cpp157
-rw-r--r--src/mongo/db/repl/sync_tail.h22
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp2
14 files changed, 614 insertions, 115 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 000a5e0ca4a..0aa280617aa 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -153,6 +153,7 @@ error_code("OperationCannotBeBatched", 151)
error_code("OplogOutOfOrder", 152)
error_code("ChunkTooBig", 153)
error_code("InconsistentShardIdentity", 154)
+error_code("CannotApplyOplogWhilePrimary", 155)
# Non-sequential error codes (for compatibility only)
error_code("RecvStaleConfig", 9996)
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index 52ae5cf6f7c..c60577e21bb 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -1472,7 +1472,8 @@ public:
return appendCommandStatus(
result,
Status(ErrorCodes::OperationFailed,
- str::stream() << "Executor killed during mapReduce command"));
+ str::stream()
+ << "Executor killed during mapReduce command"));
}
reduceTime += t.micros();
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index a58585bee8b..30b5d1779f0 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -728,6 +728,17 @@ env.CppUnitTest(
)
env.Library(
+ target='multiapplier',
+ source=[
+ 'multiapplier.cpp',
+ ],
+ LIBDEPS=[
+ 'oplog_entry',
+ 'replication_executor',
+ ],
+)
+
+env.Library(
target='applier',
source=[
'applier.cpp',
@@ -758,6 +769,7 @@ env.Library(
'applier',
'collection_cloner',
'database_cloner',
+ 'multiapplier',
'optime',
'reporter',
'$BUILD_DIR/mongo/client/fetcher',
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index feb54538c08..660d53e1567 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -1142,8 +1142,7 @@ void DataReplicator::_handleFailedApplyBatch(const TimestampStatus& ts, const Op
void DataReplicator::_scheduleApplyAfterFetch(const Operations& ops) {
++_initialSyncState->fetchedMissingDocs;
// TODO: check collection.isCapped, like SyncTail::getMissingDoc
- const BSONObj failedOplogEntry = ops.begin()->raw;
- const BSONElement missingIdElem = failedOplogEntry.getFieldDotted("o2._id");
+ const BSONElement missingIdElem = ops.begin()->getIdElement();
const NamespaceString nss(ops.begin()->ns);
const BSONObj query = BSON("find" << nss.coll() << "filter" << missingIdElem.wrap());
_tmpFetcher.reset(new QueryFetcher(_exec,
@@ -1254,7 +1253,7 @@ Status DataReplicator::_scheduleApplyBatch_inlock(const Operations& ops) {
_exec->wait(status.getValue());
};
- _applier.reset(new Applier(_exec, ops, _opts.applierFn, lambda));
+ _applier.reset(new MultiApplier(_exec, ops, _opts.applierFn, _opts.multiApplyFn, lambda));
return _applier->start();
}
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index f6503abb8bf..53dda5f96c5 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -37,7 +37,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/repl/applier.h"
+#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/database_cloner.h"
#include "mongo/db/repl/optime.h"
@@ -56,7 +56,7 @@ class QueryFetcher;
namespace repl {
-using Operations = Applier::Operations;
+using Operations = MultiApplier::Operations;
using QueryResponseStatus = StatusWith<Fetcher::QueryResponse>;
using CallbackArgs = ReplicationExecutor::CallbackArgs;
using CBHStatus = StatusWith<ReplicationExecutor::CallbackHandle>;
@@ -132,7 +132,8 @@ struct DataReplicatorOptions {
std::string scopeNS;
BSONObj filterCriteria;
- Applier::ApplyOperationFn applierFn;
+ MultiApplier::ApplyOperationFn applierFn;
+ MultiApplier::MultiApplyFn multiApplyFn;
RollbackFn rollbackFn;
Reporter::PrepareReplSetUpdatePositionCommandFn prepareReplSetUpdatePositionCommandFn;
GetMyLastOptimeFn getMyLastOptime;
@@ -307,9 +308,9 @@ private:
Handle _reporterHandle; // (M)
std::unique_ptr<Reporter> _reporter; // (M)
- bool _applierActive; // (M)
- bool _applierPaused; // (X)
- std::unique_ptr<Applier> _applier; // (M)
+ bool _applierActive; // (M)
+ bool _applierPaused; // (X)
+ std::unique_ptr<MultiApplier> _applier; // (M)
HostAndPort _syncSource; // (M)
Timestamp _lastTimestampFetched; // (MX)
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index b8199ebf9dd..5d3513522d5 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -102,7 +102,11 @@ public:
* clear/reset state
*/
void reset() {
- _applierFn = [](OperationContext*, const OplogEntry&) -> Status { return Status::OK(); };
+ _applierFn = [](const MultiApplier::Operations&) -> Status { return Status::OK(); };
+ _multiApplyFn = [](OperationContext*,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn)
+ -> StatusWith<OpTime> { return ops.back().getOpTime(); };
_rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&)
-> Status { return Status::OK(); };
_setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; };
@@ -181,9 +185,11 @@ protected:
launchExecutorThread();
DataReplicatorOptions options;
options.initialSyncRetryWait = Milliseconds(0);
- options.applierFn = [this](OperationContext* txn, const OplogEntry& operation) {
- return _applierFn(txn, operation);
- };
+ options.applierFn = [this](const MultiApplier::Operations& ops) { return _applierFn(ops); };
+ options.multiApplyFn =
+ [this](OperationContext* txn,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn func) { return _multiApplyFn(txn, ops, func); };
options.rollbackFn = [this](OperationContext* txn,
const OpTime& lastOpTimeWritten,
const HostAndPort& syncSource) -> Status {
@@ -214,7 +220,8 @@ protected:
// Executor may still invoke callback before shutting down.
}
- Applier::ApplyOperationFn _applierFn;
+ MultiApplier::ApplyOperationFn _applierFn;
+ MultiApplier::MultiApplyFn _multiApplyFn;
DataReplicatorOptions::RollbackFn _rollbackFn;
DataReplicatorOptions::SetMyLastOptimeFn _setMyLastOptime;
OpTime _myLastOpTime;
@@ -452,15 +459,17 @@ TEST_F(InitialSyncTest, Complete) {
verifySync();
}
-TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) {
+TEST_F(InitialSyncTest, MissingDocOnMultiApplyCompletes) {
DataReplicatorOptions opts;
int applyCounter{0};
- _applierFn = [&](OperationContext* txn, const OplogEntry& op) {
- if (++applyCounter == 1) {
- return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc.");
- }
- return Status::OK();
- };
+ _multiApplyFn =
+ [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn)
+ -> StatusWith<OpTime> {
+ if (++applyCounter == 1) {
+ return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc.");
+ }
+ return ops.back().getOpTime();
+ };
const std::vector<BSONObj> responses = {
// get latest oplog ts
@@ -870,12 +879,14 @@ TEST_F(SteadyStateTest, PauseDataReplicator) {
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
- _applierFn = [&](OperationContext* txn, const OplogEntry& op) {
- stdx::lock_guard<stdx::mutex> lock(mutex);
- operationApplied = op.raw;
- barrier.countDownAndWait();
- return Status::OK();
- };
+ _multiApplyFn =
+ [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn)
+ -> StatusWith<OpTime> {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ operationApplied = ops.back().raw;
+ barrier.countDownAndWait();
+ return ops.back().getOpTime();
+ };
DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime;
_setMyLastOptime = [&](const OpTime& opTime) {
oldSetMyLastOptime(opTime);
@@ -952,12 +963,14 @@ TEST_F(SteadyStateTest, ApplyOneOperation) {
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
- _applierFn = [&](OperationContext* txn, const OplogEntry& op) {
- stdx::lock_guard<stdx::mutex> lock(mutex);
- operationApplied = op.raw;
- barrier.countDownAndWait();
- return Status::OK();
- };
+ _multiApplyFn =
+ [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn)
+ -> StatusWith<OpTime> {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ operationApplied = ops.back().raw;
+ barrier.countDownAndWait();
+ return ops.back().getOpTime();
+ };
DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime;
_setMyLastOptime = [&](const OpTime& opTime) {
oldSetMyLastOptime(opTime);
diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp
new file mode 100644
index 00000000000..8db87b5e1f5
--- /dev/null
+++ b/src/mongo/db/repl/multiapplier.cpp
@@ -0,0 +1,231 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/repl/multiapplier.h"
+
+#include <algorithm>
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/replication_executor.h"
+
+namespace mongo {
+namespace repl {
+
+MultiApplier::MultiApplier(ReplicationExecutor* executor,
+ const Operations& operations,
+ const ApplyOperationFn& applyOperation,
+ const MultiApplyFn& multiApply,
+ const CallbackFn& onCompletion)
+ : _executor(executor),
+ _operations(operations),
+ _applyOperation(applyOperation),
+ _multiApply(multiApply),
+ _onCompletion(onCompletion),
+ _active(false) {
+ uassert(ErrorCodes::BadValue, "null replication executor", executor);
+ uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "last operation missing 'ts' field: " << operations.back().raw,
+ operations.back().raw.hasField("ts"));
+ uassert(ErrorCodes::TypeMismatch,
+ str::stream() << "'ts' in last operation not a timestamp: " << operations.back().raw,
+ BSONType::bsonTimestamp == operations.back().raw.getField("ts").type());
+ uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation);
+ uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
+}
+
+MultiApplier::~MultiApplier() {
+ DESTRUCTOR_GUARD(cancel(); wait(););
+}
+
+std::string MultiApplier::getDiagnosticString() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ str::stream output;
+ output << "MultiApplier";
+ output << " executor: " << _executor->getDiagnosticString();
+ output << " active: " << _active;
+ return output;
+}
+
+bool MultiApplier::isActive() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _active;
+}
+
+Status MultiApplier::start() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (_active) {
+ return Status(ErrorCodes::IllegalOperation, "applier already started");
+ }
+
+ auto scheduleResult = _executor->scheduleDBWork(
+ stdx::bind(&MultiApplier::_callback, this, stdx::placeholders::_1));
+ if (!scheduleResult.isOK()) {
+ return scheduleResult.getStatus();
+ }
+
+ _active = true;
+ _dbWorkCallbackHandle = scheduleResult.getValue();
+
+ return Status::OK();
+}
+
+void MultiApplier::cancel() {
+ ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ if (!_active) {
+ return;
+ }
+
+ dbWorkCallbackHandle = _dbWorkCallbackHandle;
+ }
+
+ if (dbWorkCallbackHandle.isValid()) {
+ _executor->cancel(dbWorkCallbackHandle);
+ }
+}
+
+void MultiApplier::wait() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+
+ while (_active) {
+ _condition.wait(lk);
+ }
+}
+
+// TODO change the passed in function to be multiapply instead of apply inlock
+void MultiApplier::_callback(const ReplicationExecutor::CallbackArgs& cbd) {
+ if (!cbd.status.isOK()) {
+ _finishCallback(cbd.status, _operations);
+ return;
+ }
+
+ invariant(cbd.txn);
+
+ // Refer to multiSyncApply() and multiInitialSyncApply() in sync_tail.cpp.
+ cbd.txn->setReplicatedWrites(false);
+
+ // allow us to get through the magic barrier
+ cbd.txn->lockState()->setIsBatchWriter(true);
+
+ StatusWith<OpTime> applyStatus(ErrorCodes::InternalError, "not mutated");
+
+ invariant(!_operations.empty());
+ try {
+ applyStatus = _multiApply(cbd.txn, _operations, _applyOperation);
+ } catch (...) {
+ applyStatus = exceptionToStatus();
+ }
+ if (!applyStatus.isOK()) {
+ _finishCallback(applyStatus.getStatus(), _operations);
+ return;
+ }
+ _finishCallback(applyStatus.getValue().getTimestamp(), Operations());
+}
+
+void MultiApplier::_finishCallback(const StatusWith<Timestamp>& result,
+ const Operations& operations) {
+ _onCompletion(result, operations);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _active = false;
+ _condition.notify_all();
+}
+
+namespace {
+
+void pauseBeforeCompletion(const StatusWith<Timestamp>& result,
+ const MultiApplier::Operations& operationsOnCompletion,
+ const PauseDataReplicatorFn& pauseDataReplicator,
+ const MultiApplier::CallbackFn& onCompletion) {
+ if (result.isOK()) {
+ pauseDataReplicator();
+ }
+ onCompletion(result, operationsOnCompletion);
+};
+
+} // namespace
+
+StatusWith<std::pair<std::unique_ptr<MultiApplier>, MultiApplier::Operations>> applyUntilAndPause(
+ ReplicationExecutor* executor,
+ const MultiApplier::Operations& operations,
+ const MultiApplier::ApplyOperationFn& applyOperation,
+ const MultiApplier::MultiApplyFn& multiApply,
+ const Timestamp& lastTimestampToApply,
+ const PauseDataReplicatorFn& pauseDataReplicator,
+ const MultiApplier::CallbackFn& onCompletion) {
+ try {
+ auto comp = [](const OplogEntry& left, const OplogEntry& right) {
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Operation missing 'ts' field': " << left.raw,
+ left.raw.hasField("ts"));
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Operation missing 'ts' field': " << right.raw,
+ right.raw.hasField("ts"));
+ return left.raw["ts"].timestamp() < right.raw["ts"].timestamp();
+ };
+ auto wrapped = OplogEntry(BSON("ts" << lastTimestampToApply));
+ auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp);
+ bool found = i != operations.cend() && !comp(wrapped, *i);
+ auto j = found ? i + 1 : i;
+ MultiApplier::Operations operationsInRange(operations.cbegin(), j);
+ MultiApplier::Operations operationsNotInRange(j, operations.cend());
+ if (!found) {
+ return std::make_pair(
+ std::unique_ptr<MultiApplier>(new MultiApplier(
+ executor, operationsInRange, applyOperation, multiApply, onCompletion)),
+ operationsNotInRange);
+ }
+
+ return std::make_pair(
+ std::unique_ptr<MultiApplier>(new MultiApplier(executor,
+ operationsInRange,
+ applyOperation,
+ multiApply,
+ stdx::bind(pauseBeforeCompletion,
+ stdx::placeholders::_1,
+ stdx::placeholders::_2,
+ pauseDataReplicator,
+ onCompletion))),
+ operationsNotInRange);
+ } catch (...) {
+ return exceptionToStatus();
+ }
+ MONGO_UNREACHABLE;
+ return Status(ErrorCodes::InternalError, "unreachable");
+}
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h
new file mode 100644
index 00000000000..f3d5ea3badf
--- /dev/null
+++ b/src/mongo/db/repl/multiapplier.h
@@ -0,0 +1,184 @@
+/**
+ * Copyright (C) 2015 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "mongo/base/disallow_copying.h"
+#include "mongo/base/status.h"
+#include "mongo/base/status_with.h"
+#include "mongo/db/jsobj.h"
+#include "mongo/db/repl/oplog_entry.h"
+#include "mongo/db/repl/replication_executor.h"
+#include "mongo/db/service_context.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/stdx/mutex.h"
+
+namespace mongo {
+namespace repl {
+
+class OpTime;
+
+class MultiApplier {
+ MONGO_DISALLOW_COPYING(MultiApplier);
+
+public:
+ static const size_t kReplWriterThreadCount = 16;
+
+ /**
+ * Operations sorted by timestamp in ascending order.
+ */
+ using Operations = std::vector<OplogEntry>;
+
+ /**
+ * Callback function to report final status of applying operations along with
+ * list of operations (if applicable) that were not successfully applied.
+ * On success, returns the timestamp of the last operation applied together with an empty
+ * list of operations.
+ */
+ using CallbackFn = stdx::function<void(const StatusWith<Timestamp>&, const Operations&)>;
+
+ /**
+ * Type of function to to apply a single operation. In production, this function
+ * would have the same outcome as calling SyncTail::syncApply() ('convertUpdatesToUpserts'
+ * value will be embedded in the function implementation).
+ */
+ using ApplyOperationFn = stdx::function<void(const Operations&)>;
+
+ using MultiApplyFn = stdx::function<StatusWith<OpTime>(
+ OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn)>;
+
+ /**
+ * Creates MultiApplier in inactive state.
+ *
+ * Accepts list of oplog entries to apply in 'operations'.
+ *
+ * The callback function will be invoked (after schedule()) when the applied has
+ * successfully applied all the operations or encountered a failure. Failures may occur if
+ * we failed to apply an operation; or if the underlying scheduled work item
+ * on the replication executor was canceled.
+ *
+ * It is an error for 'operations' to be empty but individual oplog entries
+ * contained in 'operations' are not validated.
+ */
+ MultiApplier(ReplicationExecutor* executor,
+ const Operations& operations,
+ const ApplyOperationFn& applyOperation,
+ const MultiApplyFn& multiApply,
+ const CallbackFn& onCompletion);
+
+ /**
+ * Blocks while applier is active.
+ */
+ virtual ~MultiApplier();
+
+ /**
+ * Returns diagnostic information.
+ */
+ std::string getDiagnosticString() const;
+
+ /**
+ * Returns true if the applier has been started (but has not completed).
+ */
+ bool isActive() const;
+
+ /**
+ * Starts applier by scheduling initial db work to be run by the executor.
+ */
+ Status start();
+
+ /**
+ * Cancels current db work request.
+ * Returns immediately if applier is not active.
+ *
+ * Callback function may be invoked with an ErrorCodes::CallbackCanceled status.
+ */
+ void cancel();
+
+ /**
+ * Waits for active database worker to complete.
+ * Returns immediately if applier is not active.
+ */
+ void wait();
+
+private:
+ /**
+ * DB worker callback function - applies all operations.
+ */
+ void _callback(const ReplicationExecutor::CallbackArgs& cbd);
+ void _finishCallback(const StatusWith<Timestamp>& result, const Operations& operations);
+
+ // Not owned by us.
+ ReplicationExecutor* _executor;
+
+ Operations _operations;
+ ApplyOperationFn _applyOperation;
+ MultiApplyFn _multiApply;
+ CallbackFn _onCompletion;
+
+ // Protects member data of this MultiApplier.
+ mutable stdx::mutex _mutex;
+
+ stdx::condition_variable _condition;
+
+ // _active is true when MultiApplier is scheduled to be run by the executor.
+ bool _active;
+
+ ReplicationExecutor::CallbackHandle _dbWorkCallbackHandle;
+};
+
+
+/**
+ * Applies operations (sorted by timestamp) up to and including 'lastTimestampToApply'.
+ * If 'lastTimestampToApply' is found in 'operations':
+ * - The applier will be given a subset of 'operations' (includes 'lastTimestampToApply').
+ * - On success, the applier will invoke the 'pause' function just before reporting
+ * completion status.
+ * Otherwise, all entries in 'operations' before 'lastTimestampToApply' will be forwarded to
+ * the applier and the 'pause' function will be ignored.
+ * If the applier is successfully created, returns the applier and a list of operations that
+ * are skipped (operations with 'ts' field value after 'lastTimestampToApply).
+ */
+using PauseDataReplicatorFn = stdx::function<void()>;
+
+StatusWith<std::pair<std::unique_ptr<MultiApplier>, MultiApplier::Operations>> applyUntilAndPause(
+ ReplicationExecutor* executor,
+ const MultiApplier::Operations& operations,
+ const MultiApplier::ApplyOperationFn& applyOperation,
+ const MultiApplier::ApplyOperationFn& multiApply,
+ const Timestamp& lastTimestampToApply,
+ const PauseDataReplicatorFn& pauseDataReplicator,
+ const MultiApplier::CallbackFn& onCompletion);
+
+} // namespace repl
+} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 75bba6782b4..477b1538d3f 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -62,6 +62,16 @@ bool OplogEntry::isCommand() const {
return opType[0] == 'c';
}
+bool OplogEntry::isCrudOpType() const {
+ switch (opType[0]) {
+ case 'd':
+ case 'i':
+ case 'u':
+ return opType[1] == 0;
+ }
+ return false;
+}
+
bool OplogEntry::hasNamespace() const {
return !ns.empty();
}
@@ -70,6 +80,22 @@ int OplogEntry::getVersion() const {
return version.eoo() ? 1 : version.Int();
}
+BSONElement OplogEntry::getIdElement() const {
+ invariant(isCrudOpType());
+ switch (opType[0]) {
+ case 'u':
+ return o2.Obj()["_id"];
+ case 'd':
+ case 'i':
+ return o.Obj()["_id"];
+ }
+ MONGO_UNREACHABLE;
+}
+
+OpTime OplogEntry::getOpTime() const {
+ return fassertStatusOK(34436, OpTime::parseFromOplogEntry(raw));
+}
+
Seconds OplogEntry::getTimestampSecs() const {
return Seconds(ts.timestamp().getSecs());
}
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 08fd1c8d95a..90f7bdbe8bc 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -30,6 +30,8 @@
#include "mongo/bson/bsonobj.h"
+#include "mongo/db/repl/optime.h"
+
namespace mongo {
namespace repl {
@@ -52,8 +54,11 @@ struct OplogEntry {
bool isForCappedCollection = false;
bool isCommand() const;
+ bool isCrudOpType() const;
bool hasNamespace() const;
int getVersion() const;
+ BSONElement getIdElement() const;
+ OpTime getOpTime() const;
Seconds getTimestampSecs() const;
StringData getCollectionName() const;
std::string toString() const;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index e4cf3c2b943..e679c4dc3d7 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -182,9 +182,12 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings&
DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCoord) {
DataReplicatorOptions options;
- options.applierFn = [](OperationContext*, const OplogEntry&) -> Status { return Status::OK(); };
+ options.applierFn = [](const MultiApplier::Operations&) -> Status { return Status::OK(); };
+ options.multiApplyFn =
+ [](OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn)
+ -> OpTime { return OpTime(); };
options.rollbackFn =
- [](OperationContext*, const OpTime&, const HostAndPort&) { return Status::OK(); };
+ [](OperationContext*, const OpTime&, const HostAndPort&) -> Status { return Status::OK(); };
options.prepareReplSetUpdatePositionCommandFn =
[replCoord](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle)
-> StatusWith<BSONObj> {
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index cdee4896fab..cacfd14590e 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -56,6 +56,7 @@
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/minvalid.h"
+#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/repl_client_info.h"
@@ -432,7 +433,7 @@ void prefetchOp(const BSONObj& op) {
}
// Doles out all the work to the reader pool threads and waits for them to complete
-void prefetchOps(const std::deque<OplogEntry>& ops, OldThreadPool* prefetcherPool) {
+void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherPool) {
invariant(prefetcherPool);
for (auto&& op : ops) {
prefetcherPool->schedule(&prefetchOp, op.raw);
@@ -441,16 +442,13 @@ void prefetchOps(const std::deque<OplogEntry>& ops, OldThreadPool* prefetcherPoo
}
// Doles out all the work to the writer pool threads and waits for them to complete
-void applyOps(const std::vector<std::vector<OplogEntry>>& writerVectors,
+void applyOps(const std::vector<MultiApplier::Operations>& writerVectors,
OldThreadPool* writerPool,
- SyncTail::MultiSyncApplyFunc func,
- SyncTail* sync) {
+ SyncTail::MultiSyncApplyFunc func) {
TimerHolder timer(&applyBatchStats);
- for (std::vector<std::vector<OplogEntry>>::const_iterator it = writerVectors.begin();
- it != writerVectors.end();
- ++it) {
- if (!it->empty()) {
- writerPool->schedule(func, stdx::cref(*it), sync);
+ for (auto&& ops : writerVectors) {
+ if (!ops.empty()) {
+ writerPool->schedule(func, stdx::cref(ops));
}
}
}
@@ -486,8 +484,8 @@ private:
};
void fillWriterVectors(OperationContext* txn,
- const std::deque<OplogEntry>& ops,
- std::vector<std::vector<OplogEntry>>* writerVectors) {
+ const MultiApplier::Operations& ops,
+ std::vector<MultiApplier::Operations>* writerVectors) {
const bool supportsDocLocking =
getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking();
const uint32_t numWriters = writerVectors->size();
@@ -500,24 +498,12 @@ void fillWriterVectors(OperationContext* txn,
StringMapTraits::HashedKey hashedNs(op.ns);
uint32_t hash = hashedNs.hash();
- const char* opType = op.opType.rawData();
-
// For doc locking engines, include the _id of the document in the hash so we get
// parallelism even if all writes are to a single collection. We can't do this for capped
// collections because the order of inserts is a guaranteed property, unlike for normal
// collections.
- if (supportsDocLocking && isCrudOpType(opType) && !isCapped(txn, hashedNs)) {
- BSONElement id;
- switch (opType[0]) {
- case 'u':
- id = op.o2.Obj()["_id"];
- break;
- case 'd':
- case 'i':
- id = op.o.Obj()["_id"];
- break;
- }
-
+ if (supportsDocLocking && op.isCrudOpType() && !isCapped(txn, hashedNs)) {
+ BSONElement id = op.getIdElement();
const size_t idHash = BSONElement::Hasher()(id);
MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
}
@@ -539,50 +525,17 @@ void fillWriterVectors(OperationContext* txn,
// Applies a batch of oplog entries, by using a set of threads to apply the operations and then
// writes the oplog entries to the local oplog.
OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
- invariant(_applyFunc);
-
- if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) {
- // Use a ThreadPool to prefetch all the operations in a batch.
- prefetchOps(ops.getDeque(), &_prefetcherPool);
- }
-
- std::vector<std::vector<OplogEntry>> writerVectors(replWriterThreadCount);
-
- fillWriterVectors(txn, ops.getDeque(), &writerVectors);
- LOG(2) << "replication batch size is " << ops.getDeque().size() << endl;
- // We must grab this because we're going to grab write locks later.
- // We hold this mutex the entire time we're writing; it doesn't matter
- // because all readers are blocked anyway.
- stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
-
- // stop all readers until we're done
- Lock::ParallelBatchWriterMode pbwm(txn->lockState());
-
- ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
- if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain()) {
- severe() << "attempting to replicate ops while primary";
- fassertFailed(28527);
- }
-
- applyOps(writerVectors, &_writerPool, _applyFunc, this);
-
- OpTime lastOpTime;
- {
- ON_BLOCK_EXIT([&] { _writerPool.join(); });
- std::vector<BSONObj> raws;
- raws.reserve(ops.getDeque().size());
- for (auto&& op : ops.getDeque()) {
- raws.emplace_back(op.raw);
+ auto convertToVector = ops.getDeque();
+ auto status = repl::multiApply(
+ txn, MultiApplier::Operations(convertToVector.begin(), convertToVector.end()), _applyFunc);
+ if (!status.isOK()) {
+ if (status == ErrorCodes::InterruptedAtShutdown) {
+ return OpTime();
+ } else {
+ fassertStatusOK(34437, status);
}
- lastOpTime = writeOpsToOplog(txn, raws);
- }
-
- if (inShutdownStrict()) {
- log() << "Cannot apply operations due to shutdown in progress";
- return OpTime();
}
- // We have now written all database writes and updated the oplog to match.
- return lastOpTime;
+ return status.getValue();
}
namespace {
@@ -1013,7 +966,7 @@ static void initializeWriterThread() {
}
// This free function is used by the writer threads to apply each op
-void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) {
+void multiSyncApply(const std::vector<OplogEntry>& ops) {
std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end());
std::vector<OplogEntry*> oplogEntryPointers(oplogEntries.size());
for (size_t i = 0; i < oplogEntries.size(); i++) {
@@ -1130,7 +1083,7 @@ void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) {
}
// This free function is used by the initial sync writer threads to apply each op
-void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) {
+void multiInitialSyncApply(const std::vector<OplogEntry>& ops) {
initializeWriterThread();
OperationContextImpl txn;
@@ -1146,14 +1099,17 @@ void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) {
try {
const Status s = SyncTail::syncApply(&txn, it->raw, convertUpdatesToUpserts);
if (!s.isOK()) {
+ /* TODO (dannenberg) fix this part. SERVER-23308
if (st->shouldRetry(&txn, it->raw)) {
const Status s2 = SyncTail::syncApply(&txn, it->raw, convertUpdatesToUpserts);
if (!s2.isOK()) {
- severe() << "Error applying operation (" << it->raw.toString()
- << "): " << s2;
- fassertFailedNoTrace(15915);
- }
- }
+ */
+ severe() << "Error applying operation (" << it->raw.toString() << "): " << s;
+ fassertFailedNoTrace(15915);
+ /*
+ }
+ }
+*/
// If shouldRetry() returns false, fall through.
// This can happen if the document that was moved and missed by Cloner
@@ -1172,5 +1128,58 @@ void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) {
}
}
+StatusWith<OpTime> multiApply(OperationContext* txn,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn applyOperation) {
+ invariant(applyOperation);
+
+ OldThreadPool workerPool(MultiApplier::kReplWriterThreadCount, "repl writer worker ");
+
+ if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) {
+ // Use a ThreadPool to prefetch all the operations in a batch.
+ prefetchOps(ops, &workerPool);
+ }
+
+ std::vector<std::vector<OplogEntry>> writerVectors(MultiApplier::kReplWriterThreadCount);
+
+ fillWriterVectors(txn, ops, &writerVectors);
+ LOG(2) << "replication batch size is " << ops.size();
+ // We must grab this because we're going to grab write locks later.
+ // We hold this mutex the entire time we're writing; it doesn't matter
+ // because all readers are blocked anyway.
+ stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
+
+ // stop all readers until we're done
+ Lock::ParallelBatchWriterMode pbwm(txn->lockState());
+
+ auto replCoord = ReplicationCoordinator::get(txn);
+ if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain()) {
+ severe() << "attempting to replicate ops while primary";
+ return {ErrorCodes::CannotApplyOplogWhilePrimary,
+ "attempting to replicate ops while primary"};
+ }
+
+ applyOps(writerVectors, &workerPool, applyOperation);
+
+ OpTime lastOpTime;
+ {
+ ON_BLOCK_EXIT([&] { workerPool.join(); });
+ std::vector<BSONObj> raws;
+ raws.reserve(ops.size());
+ for (auto&& op : ops) {
+ raws.emplace_back(op.raw);
+ }
+ lastOpTime = writeOpsToOplog(txn, raws);
+ }
+
+ if (inShutdownStrict()) {
+ log() << "Cannot apply operations due to shutdown in progress";
+ return {ErrorCodes::InterruptedAtShutdown,
+ "Cannot apply operations due to shutdown in progress"};
+ }
+ // We have now written all database writes and updated the oplog to match.
+ return lastOpTime;
+}
+
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 7fffb39d6e8..9d42ad1ae02 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -32,6 +32,7 @@
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
+#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/storage/mmap_v1/dur.h"
#include "mongo/stdx/functional.h"
@@ -52,8 +53,7 @@ class OpTime;
*/
class SyncTail {
public:
- using MultiSyncApplyFunc =
- stdx::function<void(const std::vector<OplogEntry>& ops, SyncTail* st)>;
+ using MultiSyncApplyFunc = stdx::function<void(const std::vector<OplogEntry>& ops)>;
/**
* Type of function to increment "repl.apply.ops" server status metric.
@@ -178,9 +178,23 @@ private:
OldThreadPool _prefetcherPool;
};
+/**
+ * Applies the opeartions described in the oplog entries contained in "ops" using the
+ * "applyOperation" function.
+ *
+ * Returns ErrorCode::InterruptedAtShutdown if the node enters shutdown while applying ops,
+ * ErrorCodes::CannotApplyOplogWhilePrimary if the node has become primary, and the OpTime of the
+ * final operation applied otherwise.
+ *
+ * Shared between here and MultiApplier.
+ */
+StatusWith<OpTime> multiApply(OperationContext* txn,
+ const MultiApplier::Operations& ops,
+ MultiApplier::ApplyOperationFn applyOperation);
+
// These free functions are used by the thread pool workers to write ops to the db.
-void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st);
-void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st);
+void multiSyncApply(const std::vector<OplogEntry>& ops);
+void multiInitialSyncApply(const std::vector<OplogEntry>& ops);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 3d0829151e5..e157e02f173 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -124,7 +124,7 @@ void SyncTailTest::tearDown() {
TEST_F(SyncTailTest, Peek) {
BackgroundSyncMock bgsync;
- SyncTail syncTail(&bgsync, [](const std::vector<OplogEntry>& ops, SyncTail* st) {});
+ SyncTail syncTail(&bgsync, [](const std::vector<OplogEntry>& ops) {});
BSONObj obj;
ASSERT_FALSE(syncTail.peek(&obj));
}