diff options
Diffstat (limited to 'src/mongo/db/write_concern.cpp')
-rw-r--r-- | src/mongo/db/write_concern.cpp | 336 |
1 files changed, 161 insertions, 175 deletions
diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp index e1426b20587..cbac6129001 100644 --- a/src/mongo/db/write_concern.cpp +++ b/src/mongo/db/write_concern.cpp @@ -44,193 +44,181 @@ namespace mongo { - using std::string; - using repl::OpTime; +using std::string; +using repl::OpTime; - static TimerStats gleWtimeStats; - static ServerStatusMetricField<TimerStats> displayGleLatency("getLastError.wtime", - &gleWtimeStats ); +static TimerStats gleWtimeStats; +static ServerStatusMetricField<TimerStats> displayGleLatency("getLastError.wtime", &gleWtimeStats); - static Counter64 gleWtimeouts; - static ServerStatusMetricField<Counter64> gleWtimeoutsDisplay("getLastError.wtimeouts", - &gleWtimeouts ); +static Counter64 gleWtimeouts; +static ServerStatusMetricField<Counter64> gleWtimeoutsDisplay("getLastError.wtimeouts", + &gleWtimeouts); - void setupSynchronousCommit(OperationContext* txn) { - const WriteConcernOptions& writeConcern = txn->getWriteConcern(); +void setupSynchronousCommit(OperationContext* txn) { + const WriteConcernOptions& writeConcern = txn->getWriteConcern(); - if ( writeConcern.syncMode == WriteConcernOptions::JOURNAL || - writeConcern.syncMode == WriteConcernOptions::FSYNC ) { - txn->recoveryUnit()->goingToWaitUntilDurable(); - } + if (writeConcern.syncMode == WriteConcernOptions::JOURNAL || + writeConcern.syncMode == WriteConcernOptions::FSYNC) { + txn->recoveryUnit()->goingToWaitUntilDurable(); } - - namespace { - // The consensus protocol requires that w: majority implies j: true on all nodes. - void addJournalSyncForWMajority(WriteConcernOptions* writeConcern) { - if (repl::getGlobalReplicationCoordinator()->isV1ElectionProtocol() - && writeConcern->wMode == WriteConcernOptions::kMajority - && writeConcern->syncMode == WriteConcernOptions::NONE) - { - writeConcern->syncMode = WriteConcernOptions::JOURNAL; - } - } - } // namespace - - StatusWith<WriteConcernOptions> extractWriteConcern(const BSONObj& cmdObj) { - // The default write concern if empty is w : 1 - // Specifying w : 0 is/was allowed, but is interpreted identically to w : 1 - WriteConcernOptions writeConcern = repl::getGlobalReplicationCoordinator() - ->getGetLastErrorDefault(); - if (writeConcern.wNumNodes == 0 && writeConcern.wMode.empty()) { - writeConcern.wNumNodes = 1; - } - // Upgrade default write concern if necessary. - addJournalSyncForWMajority(&writeConcern); - - BSONElement writeConcernElement; - Status wcStatus = bsonExtractTypedField(cmdObj, - "writeConcern", - Object, - &writeConcernElement); - if (!wcStatus.isOK()) { - if (wcStatus == ErrorCodes::NoSuchKey) { - // Return default write concern if no write concern is given. - return writeConcern; - } - return wcStatus; - } - - BSONObj writeConcernObj = writeConcernElement.Obj(); - // Empty write concern is interpreted to default. - if (writeConcernObj.isEmpty()) { +} + +namespace { +// The consensus protocol requires that w: majority implies j: true on all nodes. +void addJournalSyncForWMajority(WriteConcernOptions* writeConcern) { + if (repl::getGlobalReplicationCoordinator()->isV1ElectionProtocol() && + writeConcern->wMode == WriteConcernOptions::kMajority && + writeConcern->syncMode == WriteConcernOptions::NONE) { + writeConcern->syncMode = WriteConcernOptions::JOURNAL; + } +} +} // namespace + +StatusWith<WriteConcernOptions> extractWriteConcern(const BSONObj& cmdObj) { + // The default write concern if empty is w : 1 + // Specifying w : 0 is/was allowed, but is interpreted identically to w : 1 + WriteConcernOptions writeConcern = + repl::getGlobalReplicationCoordinator()->getGetLastErrorDefault(); + if (writeConcern.wNumNodes == 0 && writeConcern.wMode.empty()) { + writeConcern.wNumNodes = 1; + } + // Upgrade default write concern if necessary. + addJournalSyncForWMajority(&writeConcern); + + BSONElement writeConcernElement; + Status wcStatus = bsonExtractTypedField(cmdObj, "writeConcern", Object, &writeConcernElement); + if (!wcStatus.isOK()) { + if (wcStatus == ErrorCodes::NoSuchKey) { + // Return default write concern if no write concern is given. return writeConcern; } + return wcStatus; + } - wcStatus = writeConcern.parse(writeConcernObj); - if (!wcStatus.isOK()) { - return wcStatus; - } - - wcStatus = validateWriteConcern(writeConcern); - if (!wcStatus.isOK()) { - return wcStatus; - } + BSONObj writeConcernObj = writeConcernElement.Obj(); + // Empty write concern is interpreted to default. + if (writeConcernObj.isEmpty()) { + return writeConcern; + } - // Upgrade parsed write concern if necessary. - addJournalSyncForWMajority(&writeConcern); + wcStatus = writeConcern.parse(writeConcernObj); + if (!wcStatus.isOK()) { + return wcStatus; + } - return writeConcern; + wcStatus = validateWriteConcern(writeConcern); + if (!wcStatus.isOK()) { + return wcStatus; } - Status validateWriteConcern( const WriteConcernOptions& writeConcern ) { - const bool isJournalEnabled = getGlobalServiceContext()->getGlobalStorageEngine()->isDurable(); + // Upgrade parsed write concern if necessary. + addJournalSyncForWMajority(&writeConcern); - if ( writeConcern.syncMode == WriteConcernOptions::JOURNAL && !isJournalEnabled ) { - return Status( ErrorCodes::BadValue, - "cannot use 'j' option when a host does not have journaling enabled" ); - } + return writeConcern; +} - const bool isConfigServer = serverGlobalParams.configsvr; - const repl::ReplicationCoordinator::Mode replMode = - repl::getGlobalReplicationCoordinator()->getReplicationMode(); +Status validateWriteConcern(const WriteConcernOptions& writeConcern) { + const bool isJournalEnabled = getGlobalServiceContext()->getGlobalStorageEngine()->isDurable(); - if ( isConfigServer || replMode == repl::ReplicationCoordinator::modeNone ) { + if (writeConcern.syncMode == WriteConcernOptions::JOURNAL && !isJournalEnabled) { + return Status(ErrorCodes::BadValue, + "cannot use 'j' option when a host does not have journaling enabled"); + } - // Note that config servers can be replicated (have an oplog), but we still don't allow - // w > 1 + const bool isConfigServer = serverGlobalParams.configsvr; + const repl::ReplicationCoordinator::Mode replMode = + repl::getGlobalReplicationCoordinator()->getReplicationMode(); - if ( writeConcern.wNumNodes > 1 ) { - return Status( ErrorCodes::BadValue, - string( "cannot use 'w' > 1 " ) + - ( isConfigServer ? "on a config server host" : - "when a host is not replicated" ) ); - } - } + if (isConfigServer || replMode == repl::ReplicationCoordinator::modeNone) { + // Note that config servers can be replicated (have an oplog), but we still don't allow + // w > 1 - if ( replMode != repl::ReplicationCoordinator::modeReplSet && - !writeConcern.wMode.empty() && - writeConcern.wMode != WriteConcernOptions::kMajority ) { - return Status( ErrorCodes::BadValue, - string( "cannot use non-majority 'w' mode " ) + writeConcern.wMode - + " when a host is not a member of a replica set" ); + if (writeConcern.wNumNodes > 1) { + return Status(ErrorCodes::BadValue, + string("cannot use 'w' > 1 ") + (isConfigServer + ? "on a config server host" + : "when a host is not replicated")); } + } - return Status::OK(); + if (replMode != repl::ReplicationCoordinator::modeReplSet && !writeConcern.wMode.empty() && + writeConcern.wMode != WriteConcernOptions::kMajority) { + return Status(ErrorCodes::BadValue, + string("cannot use non-majority 'w' mode ") + writeConcern.wMode + + " when a host is not a member of a replica set"); } - void WriteConcernResult::appendTo( const WriteConcernOptions& writeConcern, - BSONObjBuilder* result ) const { + return Status::OK(); +} - if ( syncMillis >= 0 ) - result->appendNumber( "syncMillis", syncMillis ); +void WriteConcernResult::appendTo(const WriteConcernOptions& writeConcern, + BSONObjBuilder* result) const { + if (syncMillis >= 0) + result->appendNumber("syncMillis", syncMillis); - if ( fsyncFiles >= 0 ) - result->appendNumber( "fsyncFiles", fsyncFiles ); + if (fsyncFiles >= 0) + result->appendNumber("fsyncFiles", fsyncFiles); - if ( wTime >= 0 ) { - if ( wTimedOut ) - result->appendNumber( "waited", wTime ); - else - result->appendNumber( "wtime", wTime ); - } + if (wTime >= 0) { + if (wTimedOut) + result->appendNumber("waited", wTime); + else + result->appendNumber("wtime", wTime); + } - if ( wTimedOut ) - result->appendBool( "wtimeout", true ); + if (wTimedOut) + result->appendBool("wtimeout", true); - if (writtenTo.size()) { - BSONArrayBuilder hosts(result->subarrayStart("writtenTo")); - for (size_t i = 0; i < writtenTo.size(); ++i) { - hosts.append(writtenTo[i].toString()); - } - } - else { - result->appendNull( "writtenTo" ); + if (writtenTo.size()) { + BSONArrayBuilder hosts(result->subarrayStart("writtenTo")); + for (size_t i = 0; i < writtenTo.size(); ++i) { + hosts.append(writtenTo[i].toString()); } + } else { + result->appendNull("writtenTo"); + } - if ( err.empty() ) - result->appendNull( "err" ); - else - result->append( "err", err ); - - // *** 2.4 SyncClusterConnection compatibility *** - // 2.4 expects either fsync'd files, or a "waited" field exist after running an fsync : true - // GLE, but with journaling we don't actually need to run the fsync (fsync command is - // preferred in 2.6). So we add a "waited" field if one doesn't exist. - - if ( writeConcern.syncMode == WriteConcernOptions::FSYNC ) { + if (err.empty()) + result->appendNull("err"); + else + result->append("err", err); - if ( fsyncFiles < 0 && ( wTime < 0 || !wTimedOut ) ) { - dassert( result->asTempObj()["waited"].eoo() ); - result->appendNumber( "waited", syncMillis ); - } + // *** 2.4 SyncClusterConnection compatibility *** + // 2.4 expects either fsync'd files, or a "waited" field exist after running an fsync : true + // GLE, but with journaling we don't actually need to run the fsync (fsync command is + // preferred in 2.6). So we add a "waited" field if one doesn't exist. - dassert( result->asTempObj()["fsyncFiles"].numberInt() > 0 || - !result->asTempObj()["waited"].eoo() ); + if (writeConcern.syncMode == WriteConcernOptions::FSYNC) { + if (fsyncFiles < 0 && (wTime < 0 || !wTimedOut)) { + dassert(result->asTempObj()["waited"].eoo()); + result->appendNumber("waited", syncMillis); } - } - Status waitForWriteConcern( OperationContext* txn, - const OpTime& replOpTime, - WriteConcernResult* result ) { + dassert(result->asTempObj()["fsyncFiles"].numberInt() > 0 || + !result->asTempObj()["waited"].eoo()); + } +} - const WriteConcernOptions& writeConcern = txn->getWriteConcern(); +Status waitForWriteConcern(OperationContext* txn, + const OpTime& replOpTime, + WriteConcernResult* result) { + const WriteConcernOptions& writeConcern = txn->getWriteConcern(); - // We assume all options have been validated earlier, if not, programming error - dassert( validateWriteConcern( writeConcern ).isOK() ); + // We assume all options have been validated earlier, if not, programming error + dassert(validateWriteConcern(writeConcern).isOK()); - // Next handle blocking on disk + // Next handle blocking on disk - Timer syncTimer; + Timer syncTimer; - switch( writeConcern.syncMode ) { + switch (writeConcern.syncMode) { case WriteConcernOptions::NONE: break; case WriteConcernOptions::FSYNC: { StorageEngine* storageEngine = getGlobalServiceContext()->getGlobalStorageEngine(); - if ( !storageEngine->isDurable() ) { - result->fsyncFiles = storageEngine->flushAllFiles( true ); - } - else { + if (!storageEngine->isDurable()) { + result->fsyncFiles = storageEngine->flushAllFiles(true); + } else { // We only need to commit the journal if we're durable txn->recoveryUnit()->waitUntilDurable(); } @@ -239,40 +227,38 @@ namespace mongo { case WriteConcernOptions::JOURNAL: txn->recoveryUnit()->waitUntilDurable(); break; - } - - result->syncMillis = syncTimer.millis(); + } - // Now wait for replication + result->syncMillis = syncTimer.millis(); - if (replOpTime.isNull()) { - // no write happened for this client yet - return Status::OK(); - } + // Now wait for replication - // needed to avoid incrementing gleWtimeStats SERVER-9005 - if (writeConcern.wNumNodes <= 1 && writeConcern.wMode.empty()) { - // no desired replication check - return Status::OK(); - } + if (replOpTime.isNull()) { + // no write happened for this client yet + return Status::OK(); + } - // Now we wait for replication - // Note that replica set stepdowns and gle mode changes are thrown as errors - repl::ReplicationCoordinator::StatusAndDuration replStatus = - repl::getGlobalReplicationCoordinator()->awaitReplication(txn, - replOpTime, - writeConcern); - if (replStatus.status == ErrorCodes::WriteConcernFailed) { - gleWtimeouts.increment(); - result->err = "timeout"; - result->wTimedOut = true; - } - // Add stats - result->writtenTo = repl::getGlobalReplicationCoordinator()->getHostsWrittenTo(replOpTime); - gleWtimeStats.recordMillis(replStatus.duration.count()); - result->wTime = replStatus.duration.count(); + // needed to avoid incrementing gleWtimeStats SERVER-9005 + if (writeConcern.wNumNodes <= 1 && writeConcern.wMode.empty()) { + // no desired replication check + return Status::OK(); + } - return replStatus.status; + // Now we wait for replication + // Note that replica set stepdowns and gle mode changes are thrown as errors + repl::ReplicationCoordinator::StatusAndDuration replStatus = + repl::getGlobalReplicationCoordinator()->awaitReplication(txn, replOpTime, writeConcern); + if (replStatus.status == ErrorCodes::WriteConcernFailed) { + gleWtimeouts.increment(); + result->err = "timeout"; + result->wTimedOut = true; } + // Add stats + result->writtenTo = repl::getGlobalReplicationCoordinator()->getHostsWrittenTo(replOpTime); + gleWtimeStats.recordMillis(replStatus.duration.count()); + result->wTime = replStatus.duration.count(); + + return replStatus.status; +} -} // namespace mongo +} // namespace mongo |