summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMax Hirschhorn <max.hirschhorn@mongodb.com>2016-03-23 18:01:25 -0400
committerMax Hirschhorn <max.hirschhorn@mongodb.com>2016-03-23 18:01:25 -0400
commitadde6aca20580af59d21896c90281547c55680c8 (patch)
treeffb4bbb7935817bebd9658636d8e7f8fc4bd6ede /src/mongo
parent20aee8500c87cd43ae72a8c811b7e88f4f737243 (diff)
downloadmongo-adde6aca20580af59d21896c90281547c55680c8.tar.gz
Revert "SERVER-22858 create a new Applier which wraps multiApply"
This reverts commit 1bcf7cc983a9ea51b41b0549ad5a3811bf7057b7.
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/commands/mr.cpp3
-rw-r--r--src/mongo/db/repl/SConscript12
-rw-r--r--src/mongo/db/repl/data_replicator.cpp5
-rw-r--r--src/mongo/db/repl/data_replicator.h13
-rw-r--r--src/mongo/db/repl/data_replicator_test.cpp61
-rw-r--r--src/mongo/db/repl/multiapplier.cpp231
-rw-r--r--src/mongo/db/repl/multiapplier.h184
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp26
-rw-r--r--src/mongo/db/repl/oplog_entry.h5
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp7
-rw-r--r--src/mongo/db/repl/sync_tail.cpp157
-rw-r--r--src/mongo/db/repl/sync_tail.h22
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp2
14 files changed, 115 insertions, 614 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 0aa280617aa..000a5e0ca4a 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -153,7 +153,6 @@ error_code("OperationCannotBeBatched", 151)
error_code("OplogOutOfOrder", 152)
error_code("ChunkTooBig", 153)
error_code("InconsistentShardIdentity", 154)
-error_code("CannotApplyOplogWhilePrimary", 155)
# Non-sequential error codes (for compatibility only)
error_code("RecvStaleConfig", 9996)
diff --git a/src/mongo/db/commands/mr.cpp b/src/mongo/db/commands/mr.cpp
index c60577e21bb..52ae5cf6f7c 100644
--- a/src/mongo/db/commands/mr.cpp
+++ b/src/mongo/db/commands/mr.cpp
@@ -1472,8 +1472,7 @@ public:
return appendCommandStatus(
result,
Status(ErrorCodes::OperationFailed,
- str::stream()
- << "Executor killed during mapReduce command"));
+ str::stream() << "Executor killed during mapReduce command"));
}
reduceTime += t.micros();
diff --git a/src/mongo/db/repl/SConscript b/src/mongo/db/repl/SConscript
index 30b5d1779f0..a58585bee8b 100644
--- a/src/mongo/db/repl/SConscript
+++ b/src/mongo/db/repl/SConscript
@@ -728,17 +728,6 @@ env.CppUnitTest(
)
env.Library(
- target='multiapplier',
- source=[
- 'multiapplier.cpp',
- ],
- LIBDEPS=[
- 'oplog_entry',
- 'replication_executor',
- ],
-)
-
-env.Library(
target='applier',
source=[
'applier.cpp',
@@ -769,7 +758,6 @@ env.Library(
'applier',
'collection_cloner',
'database_cloner',
- 'multiapplier',
'optime',
'reporter',
'$BUILD_DIR/mongo/client/fetcher',
diff --git a/src/mongo/db/repl/data_replicator.cpp b/src/mongo/db/repl/data_replicator.cpp
index 660d53e1567..feb54538c08 100644
--- a/src/mongo/db/repl/data_replicator.cpp
+++ b/src/mongo/db/repl/data_replicator.cpp
@@ -1142,7 +1142,8 @@ void DataReplicator::_handleFailedApplyBatch(const TimestampStatus& ts, const Op
void DataReplicator::_scheduleApplyAfterFetch(const Operations& ops) {
++_initialSyncState->fetchedMissingDocs;
// TODO: check collection.isCapped, like SyncTail::getMissingDoc
- const BSONElement missingIdElem = ops.begin()->getIdElement();
+ const BSONObj failedOplogEntry = ops.begin()->raw;
+ const BSONElement missingIdElem = failedOplogEntry.getFieldDotted("o2._id");
const NamespaceString nss(ops.begin()->ns);
const BSONObj query = BSON("find" << nss.coll() << "filter" << missingIdElem.wrap());
_tmpFetcher.reset(new QueryFetcher(_exec,
@@ -1253,7 +1254,7 @@ Status DataReplicator::_scheduleApplyBatch_inlock(const Operations& ops) {
_exec->wait(status.getValue());
};
- _applier.reset(new MultiApplier(_exec, ops, _opts.applierFn, _opts.multiApplyFn, lambda));
+ _applier.reset(new Applier(_exec, ops, _opts.applierFn, lambda));
return _applier->start();
}
diff --git a/src/mongo/db/repl/data_replicator.h b/src/mongo/db/repl/data_replicator.h
index 53dda5f96c5..f6503abb8bf 100644
--- a/src/mongo/db/repl/data_replicator.h
+++ b/src/mongo/db/repl/data_replicator.h
@@ -37,7 +37,7 @@
#include "mongo/bson/bsonobj.h"
#include "mongo/bson/timestamp.h"
#include "mongo/db/namespace_string.h"
-#include "mongo/db/repl/multiapplier.h"
+#include "mongo/db/repl/applier.h"
#include "mongo/db/repl/collection_cloner.h"
#include "mongo/db/repl/database_cloner.h"
#include "mongo/db/repl/optime.h"
@@ -56,7 +56,7 @@ class QueryFetcher;
namespace repl {
-using Operations = MultiApplier::Operations;
+using Operations = Applier::Operations;
using QueryResponseStatus = StatusWith<Fetcher::QueryResponse>;
using CallbackArgs = ReplicationExecutor::CallbackArgs;
using CBHStatus = StatusWith<ReplicationExecutor::CallbackHandle>;
@@ -132,8 +132,7 @@ struct DataReplicatorOptions {
std::string scopeNS;
BSONObj filterCriteria;
- MultiApplier::ApplyOperationFn applierFn;
- MultiApplier::MultiApplyFn multiApplyFn;
+ Applier::ApplyOperationFn applierFn;
RollbackFn rollbackFn;
Reporter::PrepareReplSetUpdatePositionCommandFn prepareReplSetUpdatePositionCommandFn;
GetMyLastOptimeFn getMyLastOptime;
@@ -308,9 +307,9 @@ private:
Handle _reporterHandle; // (M)
std::unique_ptr<Reporter> _reporter; // (M)
- bool _applierActive; // (M)
- bool _applierPaused; // (X)
- std::unique_ptr<MultiApplier> _applier; // (M)
+ bool _applierActive; // (M)
+ bool _applierPaused; // (X)
+ std::unique_ptr<Applier> _applier; // (M)
HostAndPort _syncSource; // (M)
Timestamp _lastTimestampFetched; // (MX)
diff --git a/src/mongo/db/repl/data_replicator_test.cpp b/src/mongo/db/repl/data_replicator_test.cpp
index 5d3513522d5..b8199ebf9dd 100644
--- a/src/mongo/db/repl/data_replicator_test.cpp
+++ b/src/mongo/db/repl/data_replicator_test.cpp
@@ -102,11 +102,7 @@ public:
* clear/reset state
*/
void reset() {
- _applierFn = [](const MultiApplier::Operations&) -> Status { return Status::OK(); };
- _multiApplyFn = [](OperationContext*,
- const MultiApplier::Operations& ops,
- MultiApplier::ApplyOperationFn)
- -> StatusWith<OpTime> { return ops.back().getOpTime(); };
+ _applierFn = [](OperationContext*, const OplogEntry&) -> Status { return Status::OK(); };
_rollbackFn = [](OperationContext*, const OpTime&, const HostAndPort&)
-> Status { return Status::OK(); };
_setMyLastOptime = [this](const OpTime& opTime) { _myLastOpTime = opTime; };
@@ -185,11 +181,9 @@ protected:
launchExecutorThread();
DataReplicatorOptions options;
options.initialSyncRetryWait = Milliseconds(0);
- options.applierFn = [this](const MultiApplier::Operations& ops) { return _applierFn(ops); };
- options.multiApplyFn =
- [this](OperationContext* txn,
- const MultiApplier::Operations& ops,
- MultiApplier::ApplyOperationFn func) { return _multiApplyFn(txn, ops, func); };
+ options.applierFn = [this](OperationContext* txn, const OplogEntry& operation) {
+ return _applierFn(txn, operation);
+ };
options.rollbackFn = [this](OperationContext* txn,
const OpTime& lastOpTimeWritten,
const HostAndPort& syncSource) -> Status {
@@ -220,8 +214,7 @@ protected:
// Executor may still invoke callback before shutting down.
}
- MultiApplier::ApplyOperationFn _applierFn;
- MultiApplier::MultiApplyFn _multiApplyFn;
+ Applier::ApplyOperationFn _applierFn;
DataReplicatorOptions::RollbackFn _rollbackFn;
DataReplicatorOptions::SetMyLastOptimeFn _setMyLastOptime;
OpTime _myLastOpTime;
@@ -459,17 +452,15 @@ TEST_F(InitialSyncTest, Complete) {
verifySync();
}
-TEST_F(InitialSyncTest, MissingDocOnMultiApplyCompletes) {
+TEST_F(InitialSyncTest, MissingDocOnApplyCompletes) {
DataReplicatorOptions opts;
int applyCounter{0};
- _multiApplyFn =
- [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn)
- -> StatusWith<OpTime> {
- if (++applyCounter == 1) {
- return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc.");
- }
- return ops.back().getOpTime();
- };
+ _applierFn = [&](OperationContext* txn, const OplogEntry& op) {
+ if (++applyCounter == 1) {
+ return Status(ErrorCodes::NoMatchingDocument, "failed: missing doc.");
+ }
+ return Status::OK();
+ };
const std::vector<BSONObj> responses = {
// get latest oplog ts
@@ -879,14 +870,12 @@ TEST_F(SteadyStateTest, PauseDataReplicator) {
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
- _multiApplyFn =
- [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn)
- -> StatusWith<OpTime> {
- stdx::lock_guard<stdx::mutex> lock(mutex);
- operationApplied = ops.back().raw;
- barrier.countDownAndWait();
- return ops.back().getOpTime();
- };
+ _applierFn = [&](OperationContext* txn, const OplogEntry& op) {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ operationApplied = op.raw;
+ barrier.countDownAndWait();
+ return Status::OK();
+ };
DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime;
_setMyLastOptime = [&](const OpTime& opTime) {
oldSetMyLastOptime(opTime);
@@ -963,14 +952,12 @@ TEST_F(SteadyStateTest, ApplyOneOperation) {
unittest::Barrier barrier(2U);
Timestamp lastTimestampApplied;
BSONObj operationApplied;
- _multiApplyFn =
- [&](OperationContext*, const MultiApplier::Operations& ops, MultiApplier::ApplyOperationFn)
- -> StatusWith<OpTime> {
- stdx::lock_guard<stdx::mutex> lock(mutex);
- operationApplied = ops.back().raw;
- barrier.countDownAndWait();
- return ops.back().getOpTime();
- };
+ _applierFn = [&](OperationContext* txn, const OplogEntry& op) {
+ stdx::lock_guard<stdx::mutex> lock(mutex);
+ operationApplied = op.raw;
+ barrier.countDownAndWait();
+ return Status::OK();
+ };
DataReplicatorOptions::SetMyLastOptimeFn oldSetMyLastOptime = _setMyLastOptime;
_setMyLastOptime = [&](const OpTime& opTime) {
oldSetMyLastOptime(opTime);
diff --git a/src/mongo/db/repl/multiapplier.cpp b/src/mongo/db/repl/multiapplier.cpp
deleted file mode 100644
index 8db87b5e1f5..00000000000
--- a/src/mongo/db/repl/multiapplier.cpp
+++ /dev/null
@@ -1,231 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kReplication
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/repl/multiapplier.h"
-
-#include <algorithm>
-
-#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/optime.h"
-#include "mongo/db/repl/replication_executor.h"
-
-namespace mongo {
-namespace repl {
-
-MultiApplier::MultiApplier(ReplicationExecutor* executor,
- const Operations& operations,
- const ApplyOperationFn& applyOperation,
- const MultiApplyFn& multiApply,
- const CallbackFn& onCompletion)
- : _executor(executor),
- _operations(operations),
- _applyOperation(applyOperation),
- _multiApply(multiApply),
- _onCompletion(onCompletion),
- _active(false) {
- uassert(ErrorCodes::BadValue, "null replication executor", executor);
- uassert(ErrorCodes::BadValue, "empty list of operations", !operations.empty());
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "last operation missing 'ts' field: " << operations.back().raw,
- operations.back().raw.hasField("ts"));
- uassert(ErrorCodes::TypeMismatch,
- str::stream() << "'ts' in last operation not a timestamp: " << operations.back().raw,
- BSONType::bsonTimestamp == operations.back().raw.getField("ts").type());
- uassert(ErrorCodes::BadValue, "apply operation function cannot be null", applyOperation);
- uassert(ErrorCodes::BadValue, "callback function cannot be null", onCompletion);
-}
-
-MultiApplier::~MultiApplier() {
- DESTRUCTOR_GUARD(cancel(); wait(););
-}
-
-std::string MultiApplier::getDiagnosticString() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- str::stream output;
- output << "MultiApplier";
- output << " executor: " << _executor->getDiagnosticString();
- output << " active: " << _active;
- return output;
-}
-
-bool MultiApplier::isActive() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _active;
-}
-
-Status MultiApplier::start() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (_active) {
- return Status(ErrorCodes::IllegalOperation, "applier already started");
- }
-
- auto scheduleResult = _executor->scheduleDBWork(
- stdx::bind(&MultiApplier::_callback, this, stdx::placeholders::_1));
- if (!scheduleResult.isOK()) {
- return scheduleResult.getStatus();
- }
-
- _active = true;
- _dbWorkCallbackHandle = scheduleResult.getValue();
-
- return Status::OK();
-}
-
-void MultiApplier::cancel() {
- ReplicationExecutor::CallbackHandle dbWorkCallbackHandle;
- {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
-
- if (!_active) {
- return;
- }
-
- dbWorkCallbackHandle = _dbWorkCallbackHandle;
- }
-
- if (dbWorkCallbackHandle.isValid()) {
- _executor->cancel(dbWorkCallbackHandle);
- }
-}
-
-void MultiApplier::wait() {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- while (_active) {
- _condition.wait(lk);
- }
-}
-
-// TODO change the passed in function to be multiapply instead of apply inlock
-void MultiApplier::_callback(const ReplicationExecutor::CallbackArgs& cbd) {
- if (!cbd.status.isOK()) {
- _finishCallback(cbd.status, _operations);
- return;
- }
-
- invariant(cbd.txn);
-
- // Refer to multiSyncApply() and multiInitialSyncApply() in sync_tail.cpp.
- cbd.txn->setReplicatedWrites(false);
-
- // allow us to get through the magic barrier
- cbd.txn->lockState()->setIsBatchWriter(true);
-
- StatusWith<OpTime> applyStatus(ErrorCodes::InternalError, "not mutated");
-
- invariant(!_operations.empty());
- try {
- applyStatus = _multiApply(cbd.txn, _operations, _applyOperation);
- } catch (...) {
- applyStatus = exceptionToStatus();
- }
- if (!applyStatus.isOK()) {
- _finishCallback(applyStatus.getStatus(), _operations);
- return;
- }
- _finishCallback(applyStatus.getValue().getTimestamp(), Operations());
-}
-
-void MultiApplier::_finishCallback(const StatusWith<Timestamp>& result,
- const Operations& operations) {
- _onCompletion(result, operations);
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _active = false;
- _condition.notify_all();
-}
-
-namespace {
-
-void pauseBeforeCompletion(const StatusWith<Timestamp>& result,
- const MultiApplier::Operations& operationsOnCompletion,
- const PauseDataReplicatorFn& pauseDataReplicator,
- const MultiApplier::CallbackFn& onCompletion) {
- if (result.isOK()) {
- pauseDataReplicator();
- }
- onCompletion(result, operationsOnCompletion);
-};
-
-} // namespace
-
-StatusWith<std::pair<std::unique_ptr<MultiApplier>, MultiApplier::Operations>> applyUntilAndPause(
- ReplicationExecutor* executor,
- const MultiApplier::Operations& operations,
- const MultiApplier::ApplyOperationFn& applyOperation,
- const MultiApplier::MultiApplyFn& multiApply,
- const Timestamp& lastTimestampToApply,
- const PauseDataReplicatorFn& pauseDataReplicator,
- const MultiApplier::CallbackFn& onCompletion) {
- try {
- auto comp = [](const OplogEntry& left, const OplogEntry& right) {
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Operation missing 'ts' field': " << left.raw,
- left.raw.hasField("ts"));
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Operation missing 'ts' field': " << right.raw,
- right.raw.hasField("ts"));
- return left.raw["ts"].timestamp() < right.raw["ts"].timestamp();
- };
- auto wrapped = OplogEntry(BSON("ts" << lastTimestampToApply));
- auto i = std::lower_bound(operations.cbegin(), operations.cend(), wrapped, comp);
- bool found = i != operations.cend() && !comp(wrapped, *i);
- auto j = found ? i + 1 : i;
- MultiApplier::Operations operationsInRange(operations.cbegin(), j);
- MultiApplier::Operations operationsNotInRange(j, operations.cend());
- if (!found) {
- return std::make_pair(
- std::unique_ptr<MultiApplier>(new MultiApplier(
- executor, operationsInRange, applyOperation, multiApply, onCompletion)),
- operationsNotInRange);
- }
-
- return std::make_pair(
- std::unique_ptr<MultiApplier>(new MultiApplier(executor,
- operationsInRange,
- applyOperation,
- multiApply,
- stdx::bind(pauseBeforeCompletion,
- stdx::placeholders::_1,
- stdx::placeholders::_2,
- pauseDataReplicator,
- onCompletion))),
- operationsNotInRange);
- } catch (...) {
- return exceptionToStatus();
- }
- MONGO_UNREACHABLE;
- return Status(ErrorCodes::InternalError, "unreachable");
-}
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/multiapplier.h b/src/mongo/db/repl/multiapplier.h
deleted file mode 100644
index f3d5ea3badf..00000000000
--- a/src/mongo/db/repl/multiapplier.h
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Copyright (C) 2015 MongoDB Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <memory>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "mongo/base/disallow_copying.h"
-#include "mongo/base/status.h"
-#include "mongo/base/status_with.h"
-#include "mongo/db/jsobj.h"
-#include "mongo/db/repl/oplog_entry.h"
-#include "mongo/db/repl/replication_executor.h"
-#include "mongo/db/service_context.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/functional.h"
-#include "mongo/stdx/mutex.h"
-
-namespace mongo {
-namespace repl {
-
-class OpTime;
-
-class MultiApplier {
- MONGO_DISALLOW_COPYING(MultiApplier);
-
-public:
- static const size_t kReplWriterThreadCount = 16;
-
- /**
- * Operations sorted by timestamp in ascending order.
- */
- using Operations = std::vector<OplogEntry>;
-
- /**
- * Callback function to report final status of applying operations along with
- * list of operations (if applicable) that were not successfully applied.
- * On success, returns the timestamp of the last operation applied together with an empty
- * list of operations.
- */
- using CallbackFn = stdx::function<void(const StatusWith<Timestamp>&, const Operations&)>;
-
- /**
- * Type of function to to apply a single operation. In production, this function
- * would have the same outcome as calling SyncTail::syncApply() ('convertUpdatesToUpserts'
- * value will be embedded in the function implementation).
- */
- using ApplyOperationFn = stdx::function<void(const Operations&)>;
-
- using MultiApplyFn = stdx::function<StatusWith<OpTime>(
- OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn)>;
-
- /**
- * Creates MultiApplier in inactive state.
- *
- * Accepts list of oplog entries to apply in 'operations'.
- *
- * The callback function will be invoked (after schedule()) when the applied has
- * successfully applied all the operations or encountered a failure. Failures may occur if
- * we failed to apply an operation; or if the underlying scheduled work item
- * on the replication executor was canceled.
- *
- * It is an error for 'operations' to be empty but individual oplog entries
- * contained in 'operations' are not validated.
- */
- MultiApplier(ReplicationExecutor* executor,
- const Operations& operations,
- const ApplyOperationFn& applyOperation,
- const MultiApplyFn& multiApply,
- const CallbackFn& onCompletion);
-
- /**
- * Blocks while applier is active.
- */
- virtual ~MultiApplier();
-
- /**
- * Returns diagnostic information.
- */
- std::string getDiagnosticString() const;
-
- /**
- * Returns true if the applier has been started (but has not completed).
- */
- bool isActive() const;
-
- /**
- * Starts applier by scheduling initial db work to be run by the executor.
- */
- Status start();
-
- /**
- * Cancels current db work request.
- * Returns immediately if applier is not active.
- *
- * Callback function may be invoked with an ErrorCodes::CallbackCanceled status.
- */
- void cancel();
-
- /**
- * Waits for active database worker to complete.
- * Returns immediately if applier is not active.
- */
- void wait();
-
-private:
- /**
- * DB worker callback function - applies all operations.
- */
- void _callback(const ReplicationExecutor::CallbackArgs& cbd);
- void _finishCallback(const StatusWith<Timestamp>& result, const Operations& operations);
-
- // Not owned by us.
- ReplicationExecutor* _executor;
-
- Operations _operations;
- ApplyOperationFn _applyOperation;
- MultiApplyFn _multiApply;
- CallbackFn _onCompletion;
-
- // Protects member data of this MultiApplier.
- mutable stdx::mutex _mutex;
-
- stdx::condition_variable _condition;
-
- // _active is true when MultiApplier is scheduled to be run by the executor.
- bool _active;
-
- ReplicationExecutor::CallbackHandle _dbWorkCallbackHandle;
-};
-
-
-/**
- * Applies operations (sorted by timestamp) up to and including 'lastTimestampToApply'.
- * If 'lastTimestampToApply' is found in 'operations':
- * - The applier will be given a subset of 'operations' (includes 'lastTimestampToApply').
- * - On success, the applier will invoke the 'pause' function just before reporting
- * completion status.
- * Otherwise, all entries in 'operations' before 'lastTimestampToApply' will be forwarded to
- * the applier and the 'pause' function will be ignored.
- * If the applier is successfully created, returns the applier and a list of operations that
- * are skipped (operations with 'ts' field value after 'lastTimestampToApply).
- */
-using PauseDataReplicatorFn = stdx::function<void()>;
-
-StatusWith<std::pair<std::unique_ptr<MultiApplier>, MultiApplier::Operations>> applyUntilAndPause(
- ReplicationExecutor* executor,
- const MultiApplier::Operations& operations,
- const MultiApplier::ApplyOperationFn& applyOperation,
- const MultiApplier::ApplyOperationFn& multiApply,
- const Timestamp& lastTimestampToApply,
- const PauseDataReplicatorFn& pauseDataReplicator,
- const MultiApplier::CallbackFn& onCompletion);
-
-} // namespace repl
-} // namespace mongo
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 477b1538d3f..75bba6782b4 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -62,16 +62,6 @@ bool OplogEntry::isCommand() const {
return opType[0] == 'c';
}
-bool OplogEntry::isCrudOpType() const {
- switch (opType[0]) {
- case 'd':
- case 'i':
- case 'u':
- return opType[1] == 0;
- }
- return false;
-}
-
bool OplogEntry::hasNamespace() const {
return !ns.empty();
}
@@ -80,22 +70,6 @@ int OplogEntry::getVersion() const {
return version.eoo() ? 1 : version.Int();
}
-BSONElement OplogEntry::getIdElement() const {
- invariant(isCrudOpType());
- switch (opType[0]) {
- case 'u':
- return o2.Obj()["_id"];
- case 'd':
- case 'i':
- return o.Obj()["_id"];
- }
- MONGO_UNREACHABLE;
-}
-
-OpTime OplogEntry::getOpTime() const {
- return fassertStatusOK(34436, OpTime::parseFromOplogEntry(raw));
-}
-
Seconds OplogEntry::getTimestampSecs() const {
return Seconds(ts.timestamp().getSecs());
}
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 90f7bdbe8bc..08fd1c8d95a 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -30,8 +30,6 @@
#include "mongo/bson/bsonobj.h"
-#include "mongo/db/repl/optime.h"
-
namespace mongo {
namespace repl {
@@ -54,11 +52,8 @@ struct OplogEntry {
bool isForCappedCollection = false;
bool isCommand() const;
- bool isCrudOpType() const;
bool hasNamespace() const;
int getVersion() const;
- BSONElement getIdElement() const;
- OpTime getOpTime() const;
Seconds getTimestampSecs() const;
StringData getCollectionName() const;
std::string toString() const;
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index e679c4dc3d7..e4cf3c2b943 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -182,12 +182,9 @@ ReplicationCoordinator::Mode getReplicationModeFromSettings(const ReplSettings&
DataReplicatorOptions createDataReplicatorOptions(ReplicationCoordinator* replCoord) {
DataReplicatorOptions options;
- options.applierFn = [](const MultiApplier::Operations&) -> Status { return Status::OK(); };
- options.multiApplyFn =
- [](OperationContext*, const MultiApplier::Operations&, MultiApplier::ApplyOperationFn)
- -> OpTime { return OpTime(); };
+ options.applierFn = [](OperationContext*, const OplogEntry&) -> Status { return Status::OK(); };
options.rollbackFn =
- [](OperationContext*, const OpTime&, const HostAndPort&) -> Status { return Status::OK(); };
+ [](OperationContext*, const OpTime&, const HostAndPort&) { return Status::OK(); };
options.prepareReplSetUpdatePositionCommandFn =
[replCoord](ReplicationCoordinator::ReplSetUpdatePositionCommandStyle commandStyle)
-> StatusWith<BSONObj> {
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index cacfd14590e..cdee4896fab 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -56,7 +56,6 @@
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/repl/bgsync.h"
#include "mongo/db/repl/minvalid.h"
-#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplogreader.h"
#include "mongo/db/repl/repl_client_info.h"
@@ -433,7 +432,7 @@ void prefetchOp(const BSONObj& op) {
}
// Doles out all the work to the reader pool threads and waits for them to complete
-void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherPool) {
+void prefetchOps(const std::deque<OplogEntry>& ops, OldThreadPool* prefetcherPool) {
invariant(prefetcherPool);
for (auto&& op : ops) {
prefetcherPool->schedule(&prefetchOp, op.raw);
@@ -442,13 +441,16 @@ void prefetchOps(const MultiApplier::Operations& ops, OldThreadPool* prefetcherP
}
// Doles out all the work to the writer pool threads and waits for them to complete
-void applyOps(const std::vector<MultiApplier::Operations>& writerVectors,
+void applyOps(const std::vector<std::vector<OplogEntry>>& writerVectors,
OldThreadPool* writerPool,
- SyncTail::MultiSyncApplyFunc func) {
+ SyncTail::MultiSyncApplyFunc func,
+ SyncTail* sync) {
TimerHolder timer(&applyBatchStats);
- for (auto&& ops : writerVectors) {
- if (!ops.empty()) {
- writerPool->schedule(func, stdx::cref(ops));
+ for (std::vector<std::vector<OplogEntry>>::const_iterator it = writerVectors.begin();
+ it != writerVectors.end();
+ ++it) {
+ if (!it->empty()) {
+ writerPool->schedule(func, stdx::cref(*it), sync);
}
}
}
@@ -484,8 +486,8 @@ private:
};
void fillWriterVectors(OperationContext* txn,
- const MultiApplier::Operations& ops,
- std::vector<MultiApplier::Operations>* writerVectors) {
+ const std::deque<OplogEntry>& ops,
+ std::vector<std::vector<OplogEntry>>* writerVectors) {
const bool supportsDocLocking =
getGlobalServiceContext()->getGlobalStorageEngine()->supportsDocLocking();
const uint32_t numWriters = writerVectors->size();
@@ -498,12 +500,24 @@ void fillWriterVectors(OperationContext* txn,
StringMapTraits::HashedKey hashedNs(op.ns);
uint32_t hash = hashedNs.hash();
+ const char* opType = op.opType.rawData();
+
// For doc locking engines, include the _id of the document in the hash so we get
// parallelism even if all writes are to a single collection. We can't do this for capped
// collections because the order of inserts is a guaranteed property, unlike for normal
// collections.
- if (supportsDocLocking && op.isCrudOpType() && !isCapped(txn, hashedNs)) {
- BSONElement id = op.getIdElement();
+ if (supportsDocLocking && isCrudOpType(opType) && !isCapped(txn, hashedNs)) {
+ BSONElement id;
+ switch (opType[0]) {
+ case 'u':
+ id = op.o2.Obj()["_id"];
+ break;
+ case 'd':
+ case 'i':
+ id = op.o.Obj()["_id"];
+ break;
+ }
+
const size_t idHash = BSONElement::Hasher()(id);
MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
}
@@ -525,17 +539,50 @@ void fillWriterVectors(OperationContext* txn,
// Applies a batch of oplog entries, by using a set of threads to apply the operations and then
// writes the oplog entries to the local oplog.
OpTime SyncTail::multiApply(OperationContext* txn, const OpQueue& ops) {
- auto convertToVector = ops.getDeque();
- auto status = repl::multiApply(
- txn, MultiApplier::Operations(convertToVector.begin(), convertToVector.end()), _applyFunc);
- if (!status.isOK()) {
- if (status == ErrorCodes::InterruptedAtShutdown) {
- return OpTime();
- } else {
- fassertStatusOK(34437, status);
+ invariant(_applyFunc);
+
+ if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) {
+ // Use a ThreadPool to prefetch all the operations in a batch.
+ prefetchOps(ops.getDeque(), &_prefetcherPool);
+ }
+
+ std::vector<std::vector<OplogEntry>> writerVectors(replWriterThreadCount);
+
+ fillWriterVectors(txn, ops.getDeque(), &writerVectors);
+ LOG(2) << "replication batch size is " << ops.getDeque().size() << endl;
+ // We must grab this because we're going to grab write locks later.
+ // We hold this mutex the entire time we're writing; it doesn't matter
+ // because all readers are blocked anyway.
+ stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
+
+ // stop all readers until we're done
+ Lock::ParallelBatchWriterMode pbwm(txn->lockState());
+
+ ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();
+ if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain()) {
+ severe() << "attempting to replicate ops while primary";
+ fassertFailed(28527);
+ }
+
+ applyOps(writerVectors, &_writerPool, _applyFunc, this);
+
+ OpTime lastOpTime;
+ {
+ ON_BLOCK_EXIT([&] { _writerPool.join(); });
+ std::vector<BSONObj> raws;
+ raws.reserve(ops.getDeque().size());
+ for (auto&& op : ops.getDeque()) {
+ raws.emplace_back(op.raw);
}
+ lastOpTime = writeOpsToOplog(txn, raws);
+ }
+
+ if (inShutdownStrict()) {
+ log() << "Cannot apply operations due to shutdown in progress";
+ return OpTime();
}
- return status.getValue();
+ // We have now written all database writes and updated the oplog to match.
+ return lastOpTime;
}
namespace {
@@ -966,7 +1013,7 @@ static void initializeWriterThread() {
}
// This free function is used by the writer threads to apply each op
-void multiSyncApply(const std::vector<OplogEntry>& ops) {
+void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) {
std::vector<OplogEntry> oplogEntries(ops.begin(), ops.end());
std::vector<OplogEntry*> oplogEntryPointers(oplogEntries.size());
for (size_t i = 0; i < oplogEntries.size(); i++) {
@@ -1083,7 +1130,7 @@ void multiSyncApply(const std::vector<OplogEntry>& ops) {
}
// This free function is used by the initial sync writer threads to apply each op
-void multiInitialSyncApply(const std::vector<OplogEntry>& ops) {
+void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st) {
initializeWriterThread();
OperationContextImpl txn;
@@ -1099,17 +1146,14 @@ void multiInitialSyncApply(const std::vector<OplogEntry>& ops) {
try {
const Status s = SyncTail::syncApply(&txn, it->raw, convertUpdatesToUpserts);
if (!s.isOK()) {
- /* TODO (dannenberg) fix this part. SERVER-23308
if (st->shouldRetry(&txn, it->raw)) {
const Status s2 = SyncTail::syncApply(&txn, it->raw, convertUpdatesToUpserts);
if (!s2.isOK()) {
- */
- severe() << "Error applying operation (" << it->raw.toString() << "): " << s;
- fassertFailedNoTrace(15915);
- /*
- }
- }
-*/
+ severe() << "Error applying operation (" << it->raw.toString()
+ << "): " << s2;
+ fassertFailedNoTrace(15915);
+ }
+ }
// If shouldRetry() returns false, fall through.
// This can happen if the document that was moved and missed by Cloner
@@ -1128,58 +1172,5 @@ void multiInitialSyncApply(const std::vector<OplogEntry>& ops) {
}
}
-StatusWith<OpTime> multiApply(OperationContext* txn,
- const MultiApplier::Operations& ops,
- MultiApplier::ApplyOperationFn applyOperation) {
- invariant(applyOperation);
-
- OldThreadPool workerPool(MultiApplier::kReplWriterThreadCount, "repl writer worker ");
-
- if (getGlobalServiceContext()->getGlobalStorageEngine()->isMmapV1()) {
- // Use a ThreadPool to prefetch all the operations in a batch.
- prefetchOps(ops, &workerPool);
- }
-
- std::vector<std::vector<OplogEntry>> writerVectors(MultiApplier::kReplWriterThreadCount);
-
- fillWriterVectors(txn, ops, &writerVectors);
- LOG(2) << "replication batch size is " << ops.size();
- // We must grab this because we're going to grab write locks later.
- // We hold this mutex the entire time we're writing; it doesn't matter
- // because all readers are blocked anyway.
- stdx::lock_guard<SimpleMutex> fsynclk(filesLockedFsync);
-
- // stop all readers until we're done
- Lock::ParallelBatchWriterMode pbwm(txn->lockState());
-
- auto replCoord = ReplicationCoordinator::get(txn);
- if (replCoord->getMemberState().primary() && !replCoord->isWaitingForApplierToDrain()) {
- severe() << "attempting to replicate ops while primary";
- return {ErrorCodes::CannotApplyOplogWhilePrimary,
- "attempting to replicate ops while primary"};
- }
-
- applyOps(writerVectors, &workerPool, applyOperation);
-
- OpTime lastOpTime;
- {
- ON_BLOCK_EXIT([&] { workerPool.join(); });
- std::vector<BSONObj> raws;
- raws.reserve(ops.size());
- for (auto&& op : ops) {
- raws.emplace_back(op.raw);
- }
- lastOpTime = writeOpsToOplog(txn, raws);
- }
-
- if (inShutdownStrict()) {
- log() << "Cannot apply operations due to shutdown in progress";
- return {ErrorCodes::InterruptedAtShutdown,
- "Cannot apply operations due to shutdown in progress"};
- }
- // We have now written all database writes and updated the oplog to match.
- return lastOpTime;
-}
-
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail.h b/src/mongo/db/repl/sync_tail.h
index 9d42ad1ae02..7fffb39d6e8 100644
--- a/src/mongo/db/repl/sync_tail.h
+++ b/src/mongo/db/repl/sync_tail.h
@@ -32,7 +32,6 @@
#include "mongo/base/status.h"
#include "mongo/bson/bsonobj.h"
-#include "mongo/db/repl/multiapplier.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/storage/mmap_v1/dur.h"
#include "mongo/stdx/functional.h"
@@ -53,7 +52,8 @@ class OpTime;
*/
class SyncTail {
public:
- using MultiSyncApplyFunc = stdx::function<void(const std::vector<OplogEntry>& ops)>;
+ using MultiSyncApplyFunc =
+ stdx::function<void(const std::vector<OplogEntry>& ops, SyncTail* st)>;
/**
* Type of function to increment "repl.apply.ops" server status metric.
@@ -178,23 +178,9 @@ private:
OldThreadPool _prefetcherPool;
};
-/**
- * Applies the operations described in the oplog entries contained in "ops" using the
- * "applyOperation" function.
- *
- * Returns ErrorCodes::InterruptedAtShutdown if the node enters shutdown while applying ops,
- * ErrorCodes::CannotApplyOplogWhilePrimary if the node has become primary, and the OpTime of the
- * final operation applied otherwise.
- *
- * Shared between here and MultiApplier.
- */
-StatusWith<OpTime> multiApply(OperationContext* txn,
- const MultiApplier::Operations& ops,
- MultiApplier::ApplyOperationFn applyOperation);
-
// These free functions are used by the thread pool workers to write ops to the db.
-void multiSyncApply(const std::vector<OplogEntry>& ops);
-void multiInitialSyncApply(const std::vector<OplogEntry>& ops);
+void multiSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st);
+void multiInitialSyncApply(const std::vector<OplogEntry>& ops, SyncTail* st);
} // namespace repl
} // namespace mongo
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index e157e02f173..3d0829151e5 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -124,7 +124,7 @@ void SyncTailTest::tearDown() {
TEST_F(SyncTailTest, Peek) {
BackgroundSyncMock bgsync;
- SyncTail syncTail(&bgsync, [](const std::vector<OplogEntry>& ops) {});
+ SyncTail syncTail(&bgsync, [](const std::vector<OplogEntry>& ops, SyncTail* st) {});
BSONObj obj;
ASSERT_FALSE(syncTail.peek(&obj));
}