/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kWrite
#include "mongo/platform/basic.h"
#include "mongo/db/write_concern.h"
#include "mongo/base/counter.h"
#include "mongo/bson/util/bson_extract.h"
#include "mongo/db/client.h"
#include "mongo/db/commands/server_status_metric.h"
#include "mongo/db/service_context.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/server_options.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/rpc/protocol.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
using std::string;
using repl::OpTime;
static TimerStats gleWtimeStats;
static ServerStatusMetricField displayGleLatency("getLastError.wtime", &gleWtimeStats);
static Counter64 gleWtimeouts;
static ServerStatusMetricField gleWtimeoutsDisplay("getLastError.wtimeouts",
&gleWtimeouts);
void setupSynchronousCommit(OperationContext* txn) {
const WriteConcernOptions& writeConcern = txn->getWriteConcern();
if (writeConcern.syncMode == WriteConcernOptions::SyncMode::JOURNAL ||
writeConcern.syncMode == WriteConcernOptions::SyncMode::FSYNC) {
txn->recoveryUnit()->goingToWaitUntilDurable();
}
}
namespace {
const std::string kLocalDB = "local";
} // namespace
MONGO_FP_DECLARE(hangBeforeWaitingForWriteConcern);
StatusWith extractWriteConcern(OperationContext* txn,
const BSONObj& cmdObj,
const std::string& dbName) {
// The default write concern if empty is w : 1
// Specifying w : 0 is/was allowed, but is interpreted identically to w : 1
WriteConcernOptions writeConcern =
repl::getGlobalReplicationCoordinator()->getGetLastErrorDefault();
if (writeConcern.wNumNodes == 0 && writeConcern.wMode.empty()) {
writeConcern.wNumNodes = 1;
}
BSONElement writeConcernElement;
Status wcStatus = bsonExtractTypedField(cmdObj, "writeConcern", Object, &writeConcernElement);
if (!wcStatus.isOK()) {
if (wcStatus == ErrorCodes::NoSuchKey) {
// Return default write concern if no write concern is given.
return writeConcern;
}
return wcStatus;
}
BSONObj writeConcernObj = writeConcernElement.Obj();
// Empty write concern is interpreted to default.
if (writeConcernObj.isEmpty()) {
return writeConcern;
}
wcStatus = writeConcern.parse(writeConcernObj);
if (!wcStatus.isOK()) {
return wcStatus;
}
wcStatus = validateWriteConcern(txn, writeConcern, dbName);
if (!wcStatus.isOK()) {
return wcStatus;
}
return writeConcern;
}
Status validateWriteConcern(OperationContext* txn,
const WriteConcernOptions& writeConcern,
const std::string& dbName) {
const bool isJournalEnabled = getGlobalServiceContext()->getGlobalStorageEngine()->isDurable();
if (writeConcern.syncMode == WriteConcernOptions::SyncMode::JOURNAL && !isJournalEnabled) {
return Status(ErrorCodes::BadValue,
"cannot use 'j' option when a host does not have journaling enabled");
}
const bool isConfigServer = serverGlobalParams.configsvr;
const bool isLocalDb(dbName == kLocalDB);
const repl::ReplicationCoordinator::Mode replMode =
repl::getGlobalReplicationCoordinator()->getReplicationMode();
if (isConfigServer) {
auto protocol = rpc::getOperationProtocol(txn);
// This here only for v3.0 backwards compatibility.
if (serverGlobalParams.configsvrMode != CatalogManager::ConfigServerMode::CSRS &&
replMode != repl::ReplicationCoordinator::modeReplSet &&
protocol == rpc::Protocol::kOpQuery && writeConcern.wNumNodes == 0 &&
writeConcern.wMode.empty()) {
return Status::OK();
}
if (!writeConcern.validForConfigServers()) {
return Status(
ErrorCodes::BadValue,
str::stream()
<< "w:1 and w:'majority' are the only valid write concerns when writing to "
"config servers, got: " << writeConcern.toBSON().toString());
}
if (serverGlobalParams.configsvrMode == CatalogManager::ConfigServerMode::CSRS &&
replMode == repl::ReplicationCoordinator::modeReplSet && !isLocalDb &&
writeConcern.wMode.empty()) {
invariant(writeConcern.wNumNodes == 1);
return Status(
ErrorCodes::BadValue,
str::stream()
<< "w: 'majority' is the only valid write concern when writing to config "
"server replica sets, got: " << writeConcern.toBSON().toString());
}
}
if (replMode == repl::ReplicationCoordinator::modeNone) {
if (writeConcern.wNumNodes > 1) {
return Status(ErrorCodes::BadValue, "cannot use 'w' > 1 when a host is not replicated");
}
}
if (replMode != repl::ReplicationCoordinator::modeReplSet && !writeConcern.wMode.empty() &&
writeConcern.wMode != WriteConcernOptions::kMajority) {
return Status(ErrorCodes::BadValue,
string("cannot use non-majority 'w' mode ") + writeConcern.wMode +
" when a host is not a member of a replica set");
}
return Status::OK();
}
void WriteConcernResult::appendTo(const WriteConcernOptions& writeConcern,
BSONObjBuilder* result) const {
if (syncMillis >= 0)
result->appendNumber("syncMillis", syncMillis);
if (fsyncFiles >= 0)
result->appendNumber("fsyncFiles", fsyncFiles);
if (wTime >= 0) {
if (wTimedOut)
result->appendNumber("waited", wTime);
else
result->appendNumber("wtime", wTime);
}
if (wTimedOut)
result->appendBool("wtimeout", true);
if (writtenTo.size()) {
BSONArrayBuilder hosts(result->subarrayStart("writtenTo"));
for (size_t i = 0; i < writtenTo.size(); ++i) {
hosts.append(writtenTo[i].toString());
}
} else {
result->appendNull("writtenTo");
}
if (err.empty())
result->appendNull("err");
else
result->append("err", err);
// *** 2.4 SyncClusterConnection compatibility ***
// 2.4 expects either fsync'd files, or a "waited" field exist after running an fsync : true
// GLE, but with journaling we don't actually need to run the fsync (fsync command is
// preferred in 2.6). So we add a "waited" field if one doesn't exist.
if (writeConcern.syncMode == WriteConcernOptions::SyncMode::FSYNC) {
if (fsyncFiles < 0 && (wTime < 0 || !wTimedOut)) {
dassert(result->asTempObj()["waited"].eoo());
result->appendNumber("waited", syncMillis);
}
dassert(result->asTempObj()["fsyncFiles"].numberInt() > 0 ||
!result->asTempObj()["waited"].eoo());
}
}
Status waitForWriteConcern(OperationContext* txn,
const OpTime& replOpTime,
const WriteConcernOptions& writeConcern,
WriteConcernResult* result) {
// We assume all options have been validated earlier, if not, programming error.
// Passing localDB name is a hack to avoid more rigorous check that performed for non local DB.
dassert(validateWriteConcern(txn, writeConcern, kLocalDB).isOK());
// We should never be waiting for write concern while holding any sort of lock, because this may
// lead to situations where the replication heartbeats are stalled.
//
// This check does not hold for writes done through dbeval because it runs with a global X lock.
dassert(!txn->lockState()->isLocked() || txn->getClient()->isInDirectClient());
MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangBeforeWaitingForWriteConcern);
// Next handle blocking on disk
Timer syncTimer;
auto replCoord = repl::getGlobalReplicationCoordinator();
WriteConcernOptions writeConcernWithPopulatedSyncMode =
replCoord->populateUnsetWriteConcernOptionsSyncMode(writeConcern);
switch (writeConcernWithPopulatedSyncMode.syncMode) {
case WriteConcernOptions::SyncMode::UNSET:
severe() << "Attempting to wait on a WriteConcern with an unset sync option";
fassertFailed(34410);
case WriteConcernOptions::SyncMode::NONE:
break;
case WriteConcernOptions::SyncMode::FSYNC: {
StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
if (!storageEngine->isDurable()) {
result->fsyncFiles = storageEngine->flushAllFiles(true);
} else {
// We only need to commit the journal if we're durable
txn->recoveryUnit()->waitUntilDurable();
}
break;
}
case WriteConcernOptions::SyncMode::JOURNAL:
if (replCoord->getReplicationMode() != repl::ReplicationCoordinator::Mode::modeNone) {
// Wait for ops to become durable then update replication system's
// knowledge of this.
OpTime appliedOpTime = replCoord->getMyLastAppliedOpTime();
txn->recoveryUnit()->waitUntilDurable();
replCoord->setMyLastDurableOpTimeForward(appliedOpTime);
} else {
txn->recoveryUnit()->waitUntilDurable();
}
break;
}
result->syncMillis = syncTimer.millis();
// Now wait for replication
if (replOpTime.isNull()) {
// no write happened for this client yet
return Status::OK();
}
// needed to avoid incrementing gleWtimeStats SERVER-9005
if (writeConcernWithPopulatedSyncMode.wNumNodes <= 1 &&
writeConcernWithPopulatedSyncMode.wMode.empty()) {
// no desired replication check
return Status::OK();
}
// Now we wait for replication
// Note that replica set stepdowns and gle mode changes are thrown as errors
repl::ReplicationCoordinator::StatusAndDuration replStatus =
repl::getGlobalReplicationCoordinator()->awaitReplication(
txn, replOpTime, writeConcernWithPopulatedSyncMode);
if (replStatus.status == ErrorCodes::WriteConcernFailed) {
gleWtimeouts.increment();
result->err = "timeout";
result->wTimedOut = true;
}
// Add stats
result->writtenTo = repl::getGlobalReplicationCoordinator()->getHostsWrittenTo(
replOpTime,
writeConcernWithPopulatedSyncMode.syncMode == WriteConcernOptions::SyncMode::JOURNAL);
gleWtimeStats.recordMillis(durationCount(replStatus.duration));
result->wTime = durationCount(replStatus.duration);
return replStatus.status;
}
} // namespace mongo