summary refs log tree commit diff
path: root/src/mongo/db/write_concern.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/write_concern.cpp')
-rw-r--r-- src/mongo/db/write_concern.cpp 336
1 files changed, 161 insertions, 175 deletions
diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp
index e1426b20587..cbac6129001 100644
--- a/src/mongo/db/write_concern.cpp
+++ b/src/mongo/db/write_concern.cpp
@@ -44,193 +44,181 @@
namespace mongo {
- using std::string;
- using repl::OpTime;
+using std::string;
+using repl::OpTime;
- static TimerStats gleWtimeStats;
- static ServerStatusMetricField<TimerStats> displayGleLatency("getLastError.wtime",
- &gleWtimeStats );
+static TimerStats gleWtimeStats;
+static ServerStatusMetricField<TimerStats> displayGleLatency("getLastError.wtime", &gleWtimeStats);
- static Counter64 gleWtimeouts;
- static ServerStatusMetricField<Counter64> gleWtimeoutsDisplay("getLastError.wtimeouts",
- &gleWtimeouts );
+static Counter64 gleWtimeouts;
+static ServerStatusMetricField<Counter64> gleWtimeoutsDisplay("getLastError.wtimeouts",
+ &gleWtimeouts);
- void setupSynchronousCommit(OperationContext* txn) {
- const WriteConcernOptions& writeConcern = txn->getWriteConcern();
+void setupSynchronousCommit(OperationContext* txn) {
+ const WriteConcernOptions& writeConcern = txn->getWriteConcern();
- if ( writeConcern.syncMode == WriteConcernOptions::JOURNAL ||
- writeConcern.syncMode == WriteConcernOptions::FSYNC ) {
- txn->recoveryUnit()->goingToWaitUntilDurable();
- }
+ if (writeConcern.syncMode == WriteConcernOptions::JOURNAL ||
+ writeConcern.syncMode == WriteConcernOptions::FSYNC) {
+ txn->recoveryUnit()->goingToWaitUntilDurable();
}
-
- namespace {
- // The consensus protocol requires that w: majority implies j: true on all nodes.
- void addJournalSyncForWMajority(WriteConcernOptions* writeConcern) {
- if (repl::getGlobalReplicationCoordinator()->isV1ElectionProtocol()
- && writeConcern->wMode == WriteConcernOptions::kMajority
- && writeConcern->syncMode == WriteConcernOptions::NONE)
- {
- writeConcern->syncMode = WriteConcernOptions::JOURNAL;
- }
- }
- } // namespace
-
- StatusWith<WriteConcernOptions> extractWriteConcern(const BSONObj& cmdObj) {
- // The default write concern if empty is w : 1
- // Specifying w : 0 is/was allowed, but is interpreted identically to w : 1
- WriteConcernOptions writeConcern = repl::getGlobalReplicationCoordinator()
- ->getGetLastErrorDefault();
- if (writeConcern.wNumNodes == 0 && writeConcern.wMode.empty()) {
- writeConcern.wNumNodes = 1;
- }
- // Upgrade default write concern if necessary.
- addJournalSyncForWMajority(&writeConcern);
-
- BSONElement writeConcernElement;
- Status wcStatus = bsonExtractTypedField(cmdObj,
- "writeConcern",
- Object,
- &writeConcernElement);
- if (!wcStatus.isOK()) {
- if (wcStatus == ErrorCodes::NoSuchKey) {
- // Return default write concern if no write concern is given.
- return writeConcern;
- }
- return wcStatus;
- }
-
- BSONObj writeConcernObj = writeConcernElement.Obj();
- // Empty write concern is interpreted to default.
- if (writeConcernObj.isEmpty()) {
+}
+
+namespace {
+// The consensus protocol requires that w: majority implies j: true on all nodes.
+void addJournalSyncForWMajority(WriteConcernOptions* writeConcern) {
+ if (repl::getGlobalReplicationCoordinator()->isV1ElectionProtocol() &&
+ writeConcern->wMode == WriteConcernOptions::kMajority &&
+ writeConcern->syncMode == WriteConcernOptions::NONE) {
+ writeConcern->syncMode = WriteConcernOptions::JOURNAL;
+ }
+}
+} // namespace
+
+StatusWith<WriteConcernOptions> extractWriteConcern(const BSONObj& cmdObj) {
+ // The default write concern if empty is w : 1
+ // Specifying w : 0 is/was allowed, but is interpreted identically to w : 1
+ WriteConcernOptions writeConcern =
+ repl::getGlobalReplicationCoordinator()->getGetLastErrorDefault();
+ if (writeConcern.wNumNodes == 0 && writeConcern.wMode.empty()) {
+ writeConcern.wNumNodes = 1;
+ }
+ // Upgrade default write concern if necessary.
+ addJournalSyncForWMajority(&writeConcern);
+
+ BSONElement writeConcernElement;
+ Status wcStatus = bsonExtractTypedField(cmdObj, "writeConcern", Object, &writeConcernElement);
+ if (!wcStatus.isOK()) {
+ if (wcStatus == ErrorCodes::NoSuchKey) {
+ // Return default write concern if no write concern is given.
return writeConcern;
}
+ return wcStatus;
+ }
- wcStatus = writeConcern.parse(writeConcernObj);
- if (!wcStatus.isOK()) {
- return wcStatus;
- }
-
- wcStatus = validateWriteConcern(writeConcern);
- if (!wcStatus.isOK()) {
- return wcStatus;
- }
+ BSONObj writeConcernObj = writeConcernElement.Obj();
+ // An empty write concern is interpreted as the default.
+ if (writeConcernObj.isEmpty()) {
+ return writeConcern;
+ }
- // Upgrade parsed write concern if necessary.
- addJournalSyncForWMajority(&writeConcern);
+ wcStatus = writeConcern.parse(writeConcernObj);
+ if (!wcStatus.isOK()) {
+ return wcStatus;
+ }
- return writeConcern;
+ wcStatus = validateWriteConcern(writeConcern);
+ if (!wcStatus.isOK()) {
+ return wcStatus;
}
- Status validateWriteConcern( const WriteConcernOptions& writeConcern ) {
- const bool isJournalEnabled = getGlobalServiceContext()->getGlobalStorageEngine()->isDurable();
+ // Upgrade parsed write concern if necessary.
+ addJournalSyncForWMajority(&writeConcern);
- if ( writeConcern.syncMode == WriteConcernOptions::JOURNAL && !isJournalEnabled ) {
- return Status( ErrorCodes::BadValue,
- "cannot use 'j' option when a host does not have journaling enabled" );
- }
+ return writeConcern;
+}
- const bool isConfigServer = serverGlobalParams.configsvr;
- const repl::ReplicationCoordinator::Mode replMode =
- repl::getGlobalReplicationCoordinator()->getReplicationMode();
+Status validateWriteConcern(const WriteConcernOptions& writeConcern) {
+ const bool isJournalEnabled = getGlobalServiceContext()->getGlobalStorageEngine()->isDurable();
- if ( isConfigServer || replMode == repl::ReplicationCoordinator::modeNone ) {
+ if (writeConcern.syncMode == WriteConcernOptions::JOURNAL && !isJournalEnabled) {
+ return Status(ErrorCodes::BadValue,
+ "cannot use 'j' option when a host does not have journaling enabled");
+ }
- // Note that config servers can be replicated (have an oplog), but we still don't allow
- // w > 1
+ const bool isConfigServer = serverGlobalParams.configsvr;
+ const repl::ReplicationCoordinator::Mode replMode =
+ repl::getGlobalReplicationCoordinator()->getReplicationMode();
- if ( writeConcern.wNumNodes > 1 ) {
- return Status( ErrorCodes::BadValue,
- string( "cannot use 'w' > 1 " ) +
- ( isConfigServer ? "on a config server host" :
- "when a host is not replicated" ) );
- }
- }
+ if (isConfigServer || replMode == repl::ReplicationCoordinator::modeNone) {
+ // Note that config servers can be replicated (have an oplog), but we still don't allow
+ // w > 1
- if ( replMode != repl::ReplicationCoordinator::modeReplSet &&
- !writeConcern.wMode.empty() &&
- writeConcern.wMode != WriteConcernOptions::kMajority ) {
- return Status( ErrorCodes::BadValue,
- string( "cannot use non-majority 'w' mode " ) + writeConcern.wMode
- + " when a host is not a member of a replica set" );
+ if (writeConcern.wNumNodes > 1) {
+ return Status(ErrorCodes::BadValue,
+ string("cannot use 'w' > 1 ") + (isConfigServer
+ ? "on a config server host"
+ : "when a host is not replicated"));
}
+ }
- return Status::OK();
+ if (replMode != repl::ReplicationCoordinator::modeReplSet && !writeConcern.wMode.empty() &&
+ writeConcern.wMode != WriteConcernOptions::kMajority) {
+ return Status(ErrorCodes::BadValue,
+ string("cannot use non-majority 'w' mode ") + writeConcern.wMode +
+ " when a host is not a member of a replica set");
}
- void WriteConcernResult::appendTo( const WriteConcernOptions& writeConcern,
- BSONObjBuilder* result ) const {
+ return Status::OK();
+}
- if ( syncMillis >= 0 )
- result->appendNumber( "syncMillis", syncMillis );
+void WriteConcernResult::appendTo(const WriteConcernOptions& writeConcern,
+ BSONObjBuilder* result) const {
+ if (syncMillis >= 0)
+ result->appendNumber("syncMillis", syncMillis);
- if ( fsyncFiles >= 0 )
- result->appendNumber( "fsyncFiles", fsyncFiles );
+ if (fsyncFiles >= 0)
+ result->appendNumber("fsyncFiles", fsyncFiles);
- if ( wTime >= 0 ) {
- if ( wTimedOut )
- result->appendNumber( "waited", wTime );
- else
- result->appendNumber( "wtime", wTime );
- }
+ if (wTime >= 0) {
+ if (wTimedOut)
+ result->appendNumber("waited", wTime);
+ else
+ result->appendNumber("wtime", wTime);
+ }
- if ( wTimedOut )
- result->appendBool( "wtimeout", true );
+ if (wTimedOut)
+ result->appendBool("wtimeout", true);
- if (writtenTo.size()) {
- BSONArrayBuilder hosts(result->subarrayStart("writtenTo"));
- for (size_t i = 0; i < writtenTo.size(); ++i) {
- hosts.append(writtenTo[i].toString());
- }
- }
- else {
- result->appendNull( "writtenTo" );
+ if (writtenTo.size()) {
+ BSONArrayBuilder hosts(result->subarrayStart("writtenTo"));
+ for (size_t i = 0; i < writtenTo.size(); ++i) {
+ hosts.append(writtenTo[i].toString());
}
+ } else {
+ result->appendNull("writtenTo");
+ }
- if ( err.empty() )
- result->appendNull( "err" );
- else
- result->append( "err", err );
-
- // *** 2.4 SyncClusterConnection compatibility ***
- // 2.4 expects either fsync'd files, or a "waited" field exist after running an fsync : true
- // GLE, but with journaling we don't actually need to run the fsync (fsync command is
- // preferred in 2.6). So we add a "waited" field if one doesn't exist.
-
- if ( writeConcern.syncMode == WriteConcernOptions::FSYNC ) {
+ if (err.empty())
+ result->appendNull("err");
+ else
+ result->append("err", err);
- if ( fsyncFiles < 0 && ( wTime < 0 || !wTimedOut ) ) {
- dassert( result->asTempObj()["waited"].eoo() );
- result->appendNumber( "waited", syncMillis );
- }
+ // *** 2.4 SyncClusterConnection compatibility ***
+ // 2.4 expects either fsync'd files or a "waited" field to exist after running an fsync : true
+ // GLE, but with journaling we don't actually need to run the fsync (fsync command is
+ // preferred in 2.6). So we add a "waited" field if one doesn't exist.
- dassert( result->asTempObj()["fsyncFiles"].numberInt() > 0 ||
- !result->asTempObj()["waited"].eoo() );
+ if (writeConcern.syncMode == WriteConcernOptions::FSYNC) {
+ if (fsyncFiles < 0 && (wTime < 0 || !wTimedOut)) {
+ dassert(result->asTempObj()["waited"].eoo());
+ result->appendNumber("waited", syncMillis);
}
- }
- Status waitForWriteConcern( OperationContext* txn,
- const OpTime& replOpTime,
- WriteConcernResult* result ) {
+ dassert(result->asTempObj()["fsyncFiles"].numberInt() > 0 ||
+ !result->asTempObj()["waited"].eoo());
+ }
+}
- const WriteConcernOptions& writeConcern = txn->getWriteConcern();
+Status waitForWriteConcern(OperationContext* txn,
+ const OpTime& replOpTime,
+ WriteConcernResult* result) {
+ const WriteConcernOptions& writeConcern = txn->getWriteConcern();
- // We assume all options have been validated earlier, if not, programming error
- dassert( validateWriteConcern( writeConcern ).isOK() );
+ // We assume all options have been validated earlier; if not, it is a programming error
+ dassert(validateWriteConcern(writeConcern).isOK());
- // Next handle blocking on disk
+ // Next handle blocking on disk
- Timer syncTimer;
+ Timer syncTimer;
- switch( writeConcern.syncMode ) {
+ switch (writeConcern.syncMode) {
case WriteConcernOptions::NONE:
break;
case WriteConcernOptions::FSYNC: {
StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine();
- if ( !storageEngine->isDurable() ) {
- result->fsyncFiles = storageEngine->flushAllFiles( true );
- }
- else {
+ if (!storageEngine->isDurable()) {
+ result->fsyncFiles = storageEngine->flushAllFiles(true);
+ } else {
// We only need to commit the journal if we're durable
txn->recoveryUnit()->waitUntilDurable();
}
@@ -239,40 +227,38 @@ namespace mongo {
case WriteConcernOptions::JOURNAL:
txn->recoveryUnit()->waitUntilDurable();
break;
- }
-
- result->syncMillis = syncTimer.millis();
+ }
- // Now wait for replication
+ result->syncMillis = syncTimer.millis();
- if (replOpTime.isNull()) {
- // no write happened for this client yet
- return Status::OK();
- }
+ // Now wait for replication
- // needed to avoid incrementing gleWtimeStats SERVER-9005
- if (writeConcern.wNumNodes <= 1 && writeConcern.wMode.empty()) {
- // no desired replication check
- return Status::OK();
- }
+ if (replOpTime.isNull()) {
+ // no write happened for this client yet
+ return Status::OK();
+ }
- // Now we wait for replication
- // Note that replica set stepdowns and gle mode changes are thrown as errors
- repl::ReplicationCoordinator::StatusAndDuration replStatus =
- repl::getGlobalReplicationCoordinator()->awaitReplication(txn,
- replOpTime,
- writeConcern);
- if (replStatus.status == ErrorCodes::WriteConcernFailed) {
- gleWtimeouts.increment();
- result->err = "timeout";
- result->wTimedOut = true;
- }
- // Add stats
- result->writtenTo = repl::getGlobalReplicationCoordinator()->getHostsWrittenTo(replOpTime);
- gleWtimeStats.recordMillis(replStatus.duration.count());
- result->wTime = replStatus.duration.count();
+ // needed to avoid incrementing gleWtimeStats SERVER-9005
+ if (writeConcern.wNumNodes <= 1 && writeConcern.wMode.empty()) {
+ // no desired replication check
+ return Status::OK();
+ }
- return replStatus.status;
+ // Now we wait for replication
+ // Note that replica set stepdowns and gle mode changes are thrown as errors
+ repl::ReplicationCoordinator::StatusAndDuration replStatus =
+ repl::getGlobalReplicationCoordinator()->awaitReplication(txn, replOpTime, writeConcern);
+ if (replStatus.status == ErrorCodes::WriteConcernFailed) {
+ gleWtimeouts.increment();
+ result->err = "timeout";
+ result->wTimedOut = true;
}
+ // Add stats
+ result->writtenTo = repl::getGlobalReplicationCoordinator()->getHostsWrittenTo(replOpTime);
+ gleWtimeStats.recordMillis(replStatus.duration.count());
+ result->wTime = replStatus.duration.count();
+
+ return replStatus.status;
+}
-} // namespace mongo
+} // namespace mongo