diff options
author | Eliot Horowitz <eliot@10gen.com> | 2013-11-28 00:21:23 -0500 |
---|---|---|
committer | Eliot Horowitz <eliot@10gen.com> | 2013-12-03 21:24:55 -0500 |
commit | ff4308beb1716d245779effef81acdef7686cc2c (patch) | |
tree | 428629cfd2140072377d66738348944f3f5f8b92 /src | |
parent | 4fa4012a60147450b8c201ebb90687596fe673d5 (diff) | |
download | mongo-ff4308beb1716d245779effef81acdef7686cc2c.tar.gz |
SERVER-11665: refactor waitForWriteConcern
new structs instead of BSONObj
Diffstat (limited to 'src')
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | src/mongo/db/commands/get_last_error.cpp | 23 |
| -rw-r--r-- | src/mongo/db/commands/write_commands/batch_executor.cpp | 54 |
| -rw-r--r-- | src/mongo/db/dur.h | 4 |
| -rw-r--r-- | src/mongo/db/repl/write_concern.cpp | 9 |
| -rw-r--r-- | src/mongo/db/repl/write_concern.h | 1 |
| -rw-r--r-- | src/mongo/db/write_concern.cpp | 283 |
| -rw-r--r-- | src/mongo/db/write_concern.h | 59 |
7 files changed, 264 insertions, 169 deletions
diff --git a/src/mongo/db/commands/get_last_error.cpp b/src/mongo/db/commands/get_last_error.cpp index e60b1e582f1..7d4f2fc4c82 100644 --- a/src/mongo/db/commands/get_last_error.cpp +++ b/src/mongo/db/commands/get_last_error.cpp @@ -94,14 +94,12 @@ namespace mongo { bool run(const string& dbname, BSONObj& _cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) { LastError *le = lastError.disableForCommand(); - bool err = false; - if ( le->nPrev != 1 ) { - err = LastError::noError.appendSelf( result , false ); + LastError::noError.appendSelf( result , false ); le->appendSelfStatus( result ); } else { - err = le->appendSelf( result , false ); + le->appendSelf( result , false ); } Client& c = cc(); @@ -121,7 +119,22 @@ namespace mongo { } } - return waitForWriteConcern(cmdObj, err, &result, &errmsg); + WriteConcernOptions writeConcern; + Status s = writeConcern.parse( cmdObj ); + if ( !s.isOK() ) { + result.append( "badGLE", cmdObj ); + errmsg = s.toString(); + return false; + } + + WriteConcernResult res; + s = waitForWriteConcern( cc(), writeConcern, &res ); + res.appendTo( &result ); + if ( !s.isOK() ) { + errmsg = s.toString(); + return false; + } + return true; } } cmdGetLastError; diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index 0b4c7881963..0d9de00c651 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -59,27 +59,28 @@ namespace mongo { : _defaultWriteConcern(wc), _client( client ), _opCounters( opCounters ), _le( le ) { } - static void maybeBuildWCError( const BSONObj& wcResult, - const string& wcErrMsg, + static void maybeBuildWCError( const Status& wcStatus, + const WriteConcernResult& wcResult, BatchedCommandResponse* response ) { // Error reported is either the errmsg or err from wc string errMsg; - if ( !wcErrMsg.empty() ) errMsg = wcErrMsg; - else if ( wcResult["err"].type() == 
String ) errMsg = wcResult["err"].String(); + if ( !wcStatus.isOK() ) + errMsg = wcStatus.toString(); + else if ( wcResult.err.size() ) + errMsg = wcResult.err; - // Sometimes the jnote/wnote has more error info - if ( !errMsg.empty() && wcResult["jnote"].type() == String ) { - errMsg = wcResult["jnote"].String(); - } - if ( !errMsg.empty() && wcResult["wnote"].type() == String ) { - errMsg = wcResult["wnote"].String(); - } + if ( errMsg.empty() ) + return; - if ( errMsg.empty() ) return; + if ( wcStatus.isOK() ) + response->setErrCode( ErrorCodes::WriteConcernFailed ); + else + response->setErrCode( wcStatus.code() ); + + if ( wcResult.wTimedOut ) + response->setErrInfo( BSON( "wtimeout" << true ) ); - response->setErrCode( ErrorCodes::WriteConcernFailed ); - if ( wcResult["wtimeout"].trueValue() ) response->setErrInfo( BSON( "wtimeout" << true ) ); response->setErrMessage( errMsg ); } @@ -170,23 +171,24 @@ namespace mongo { // Apply write concern if we had any successful writes if ( numItemErrors < numBatchItems ) { - BSONObj writeConcern; + WriteConcernOptions writeConcern; + Status s = Status::OK(); if ( request.isWriteConcernSet() ) { - writeConcern = request.getWriteConcern(); + s = writeConcern.parse( request.getWriteConcern() ); } else { - writeConcern = _defaultWriteConcern; + s = writeConcern.parse( _defaultWriteConcern ); } - string errMsg; - BSONObjBuilder wcResultsB; - waitForWriteConcern( writeConcern, - false /* always wait for secondaries since we wrote something */, - &wcResultsB, - &errMsg ); - - maybeBuildWCError( wcResultsB.obj(), errMsg, response ); - // TODO: save writtenTo, waitedJournal/waitedRepl stats + if ( !s.isOK() ) { + response->setErrCode( s.code() ); + response->setErrMessage( s.toString() ); + } + else { + WriteConcernResult res; + s = waitForWriteConcern( cc(), writeConcern, &res ); + maybeBuildWCError( s, res, response ); + } } // Set the main body of the response. 
We assume that, if there was an error, the error diff --git a/src/mongo/db/dur.h b/src/mongo/db/dur.h index 2ae50ef62d7..4731ce16091 100644 --- a/src/mongo/db/dur.h +++ b/src/mongo/db/dur.h @@ -169,6 +169,8 @@ namespace mongo { */ virtual void syncDataAndTruncateJournal() = 0; + virtual bool isDurable() const = 0; + static DurableInterface& getDur() { return *_impl; } private: @@ -202,6 +204,7 @@ namespace mongo { bool commitIfNeeded(bool) { return false; } bool aCommitIsNeeded() const { return false; } void syncDataAndTruncateJournal() {} + bool isDurable() const { return false; } }; class DurableImpl : public DurableInterface { @@ -216,6 +219,7 @@ namespace mongo { bool aCommitIsNeeded() const; bool commitIfNeeded(bool); void syncDataAndTruncateJournal(); + bool isDurable() const { return true; } }; } // namespace dur diff --git a/src/mongo/db/repl/write_concern.cpp b/src/mongo/db/repl/write_concern.cpp index 4cc8f6db08d..a7cd53c48d9 100644 --- a/src/mongo/db/repl/write_concern.cpp +++ b/src/mongo/db/repl/write_concern.cpp @@ -173,7 +173,10 @@ namespace mongo { return false; } - string wStr = w.String(); + return opReplicatedEnough( op, w.String() ); + } + + bool opReplicatedEnough( OpTime op , const string& wStr ) { if (wStr == "majority") { // use the entire set, including arbiters, to prevent writing // to a majority of the set but not a majority of voters @@ -337,6 +340,10 @@ namespace mongo { return slaveTracking.replicatedToNum( op , w ); } + bool opReplicatedEnough( OpTime op , const string& w ) { + return slaveTracking.opReplicatedEnough( op , w ); + } + bool waitForReplication( OpTime op , int w , int maxSecondsToWait ) { return slaveTracking.waitForReplication( op, w, maxSecondsToWait ); } diff --git a/src/mongo/db/repl/write_concern.h b/src/mongo/db/repl/write_concern.h index 7e3c98f7200..3af0f3ad8c2 100644 --- a/src/mongo/db/repl/write_concern.h +++ b/src/mongo/db/repl/write_concern.h @@ -47,6 +47,7 @@ namespace mongo { /** @return true if op has made 
it to w servers */ bool opReplicatedEnough( OpTime op , int w ); + bool opReplicatedEnough( OpTime op , const string& w ); bool opReplicatedEnough( OpTime op , BSONElement w ); bool waitForReplication( OpTime op , int w , int maxSecondsToWait ); diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp index 331525e946b..69b0468bd67 100644 --- a/src/mongo/db/write_concern.cpp +++ b/src/mongo/db/write_concern.cpp @@ -34,6 +34,7 @@ #include "mongo/db/repl/replication_server_status.h" #include "mongo/db/repl/write_concern.h" #include "mongo/db/stats/timer_stats.h" +#include "mongo/db/write_concern.h" namespace mongo { @@ -43,159 +44,187 @@ namespace mongo { static Counter64 gleWtimeouts; static ServerStatusMetricField<Counter64> gleWtimeoutsDisplay( "getLastError.wtimeouts", &gleWtimeouts ); - bool waitForWriteConcern(const BSONObj& cmdObj, - bool err, - BSONObjBuilder* result, - string* errmsg) { - Client& c = cc(); - - if ( cmdObj["j"].trueValue() ) { - if( !getDur().awaitCommit() ) { - // --journal is off - result->append("jnote", "journaling not enabled on this server"); - // Set the err field, if the result document doesn't already have it set. - if ( !err ) { - result->append( "err", "nojournal" ); - } - return true; - } - if( cmdObj["fsync"].trueValue() ) { - *errmsg = "fsync and j options cannot be used together"; - return false; - } + Status WriteConcernOptions::parse( const BSONObj& obj ) { + bool j = obj["j"].trueValue(); + bool fsync = obj["fsync"].trueValue(); + + if ( j & fsync ) + return Status( ErrorCodes::BadValue, "fsync and j options cannot be used together" ); + + if ( j ) { + syncMode = JOURNAL; } - else if ( cmdObj["fsync"].trueValue() ) { - Timer t; - if( !getDur().awaitCommit() ) { - // if get here, not running with --journal - log() << "fsync from getlasterror" << endl; - result->append( "fsyncFiles" , MemoryMappedFile::flushAll( true ) ); - } - else { - // this perhaps is temp. 
how long we wait for the group commit to occur. - result->append( "waited", t.millis() ); - } + if ( fsync ) { + if ( getDur().isDurable() ) + syncMode = JOURNAL; + else + syncMode = FSYNC; } - if ( err ) { - // doesn't make sense to wait for replication - // if there was an error - return true; + + BSONElement e = obj["w"]; + if ( e.isNumber() ) { + wNumNodes = e.numberInt(); + } + else if ( e.type() == String ) { + wMode = e.valuestrsafe(); + } + else if ( e.eoo() || + e.type() == jstNULL || + e.type() == Undefined ) { + } + else { + return Status( ErrorCodes::BadValue, "w has to be a number or a string" ); } - BSONElement e = cmdObj["w"]; - if ( e.ok() ) { + wTimeout = obj["wtimeout"].numberInt(); - if (serverGlobalParams.configsvr && (!e.isNumber() || e.numberInt() > 1)) { - // w:1 on config servers should still work, but anything greater than that - // should not. - result->append( "wnote", "can't use w on config servers" ); - result->append( "err", "norepl" ); - return true; - } + return Status::OK(); + } - int timeout = cmdObj["wtimeout"].numberInt(); - scoped_ptr<TimerHolder> gleTimerHolder; - bool doTiming = false; - if ( e.isNumber() ) { - doTiming = e.numberInt() > 1; - } - else if ( e.type() == String ) { - doTiming = true; - } - if ( doTiming ) { - gleTimerHolder.reset( new TimerHolder( &gleWtimeStats ) ); - } - else { - gleTimerHolder.reset( new TimerHolder( NULL ) ); + void WriteConcernResult::appendTo( BSONObjBuilder* result ) const { + if ( fsyncFiles >= 0 ) + result->appendNumber( "fsyncFiles", fsyncFiles ); + + if ( wTime >= 0 ) { + if ( wTimedOut ) + result->appendNumber( "waited", wTime ); + else + result->appendNumber( "wtime", wTime ); + } + + if ( wTimedOut ) + result->appendBool( "wtimeout", true ); + + if ( writtenTo.size() ) + result->append( "writtenTo", writtenTo ); + else + result->appendNull( "writtenTo" ); + + if ( err.empty() ) + result->appendNull( "err" ); + else + result->append( "err", err ); + } + + Status 
waitForWriteConcern(Client& client, + const WriteConcernOptions& writeConcern, + WriteConcernResult* result ) { + // first handle blocking on disk + + switch( writeConcern.syncMode ) { + case WriteConcernOptions::NONE: + break; + case WriteConcernOptions::JOURNAL: + if ( getDur().awaitCommit() ) { + // success + break; } + result->err = "nojournal"; + return Status( ErrorCodes::BadValue, "journalling not enabled" ); + case WriteConcernOptions::FSYNC: + result->fsyncFiles = MemoryMappedFile::flushAll( true ); + break; + } - long long passes = 0; - char buf[32]; - OpTime op(c.getLastOp()); + // now wait for replication - if ( op.isNull() ) { - if ( anyReplEnabled() ) { - result->append( "wnote" , "no write has been done on this connection" ); - } - else if ( e.isNumber() && e.numberInt() <= 1 ) { - // don't do anything - // w=1 and no repl, so this is fine - } - else if (e.type() == mongo::String && - str::equals(e.valuestrsafe(), "majority")) { - // don't do anything - // w=majority and no repl, so this is fine - } - else { - // w=2 and no repl - stringstream errmsg; - errmsg << "no replication has been enabled, so w=" << - e.toString(false) << " won't work"; - result->append( "wnote" , errmsg.str() ); - result->append( "err", "norepl" ); - return true; - } + if ( writeConcern.wNumNodes <= 0 && + writeConcern.wMode.empty() ) { + // w settings, all done + return Status::OK(); + } - result->appendNull( "err" ); - return true; + if ( serverGlobalParams.configsvr ) { + // config servers have special rules + if ( writeConcern.wNumNodes > 1 ) { + result->err = "norepl"; + return Status( ErrorCodes::BadValue, "cannot use w > 1 with config servers" ); + } + if ( writeConcern.wMode == "majority" ) { + return Status::OK(); } + result->err = "norepl"; + return Status( ErrorCodes::BadValue, "unknown w mode for config servers" ); + } - if ( !theReplSet && !e.isNumber() ) { - // For master/slave deployments that receive w:"majority" or some other named - // write concern mode, 
treat it like w:1 and include a note. - result->append( "wnote", "cannot use non integer w values for non-replica sets" ); - result->appendNull( "err" ); - return true; + if ( !anyReplEnabled() ) { + // no replication enabled and not a config server + // so we handle some simple things, or fail + + if ( writeConcern.wNumNodes > 1 ) { + result->err = "norepl"; + return Status( ErrorCodes::BadValue, "no replication and asked for w > 1" ); + } + if ( !writeConcern.wMode.empty() && + writeConcern.wMode != "majority" ) { + result->err = "norepl"; + return Status( ErrorCodes::BadValue, "no replication and asked for w with a mode" ); } - while ( 1 ) { + // asked for w <= 1 or w=majority + // so we can just say ok + return Status::OK(); + } - if ( !_isMaster() ) { - // this should be in the while loop in case we step down - *errmsg = "not master"; - result->append( "wnote", "no longer primary" ); - result->append( "code" , 10990 ); - return false; - } + bool doTiming = writeConcern.wNumNodes > 1 || !writeConcern.wMode.empty(); + scoped_ptr<TimerHolder> gleTimerHolder( new TimerHolder( doTiming ? &gleWtimeStats : NULL ) ); - // check this first for w=0 or w=1 - if ( opReplicatedEnough( op, e ) ) { - break; - } + OpTime op( client.getLastOp() ); - // if replication isn't enabled (e.g., config servers) - if ( ! 
anyReplEnabled() ) { - result->append( "err", "norepl" ); - return true; - } + if ( op.isNull() ) { + // no write happened for this client yet + return Status::OK(); + } + + if ( !writeConcern.wMode.empty() && !theReplSet ) { + return Status( ErrorCodes::BadValue, "asked for a w mode with master/slave" ); + } + // now that we've done the prep, now we actually wait + char buf[32]; // for messages + long long passes = 0; + while ( 1 ) { - if ( timeout > 0 && gleTimerHolder->millis() >= timeout ) { - gleWtimeouts.increment(); - result->append( "wtimeout" , true ); - *errmsg = "timed out waiting for slaves"; - result->append( "waited" , gleTimerHolder->millis() ); - result->append("writtenTo", getHostsWrittenTo(op)); - result->append( "err" , "timeout" ); - return true; - } + if ( !_isMaster() ) { + // this should be in the while loop in case we step down + return Status( DBException::convertExceptionCode(10990), "no longer primary" ); + } - verify( sprintf( buf , "w block pass: %lld" , ++passes ) < 30 ); - c.curop()->setMessage( buf ); - sleepmillis(1); - killCurrentOp.checkForInterrupt(); + // check this first for w=0 or w=1 + if ( writeConcern.wNumNodes > 0 ) { + if ( opReplicatedEnough( op, writeConcern.wNumNodes ) ) { + break; + } + } + else if ( opReplicatedEnough( op, writeConcern.wMode ) ) { + break; } - if ( doTiming ) { - result->append("writtenTo", getHostsWrittenTo(op)); - int myMillis = gleTimerHolder->recordMillis(); - result->appendNumber( "wtime" , myMillis ); + if ( writeConcern.wTimeout > 0 && + gleTimerHolder->millis() >= writeConcern.wTimeout ) { + gleWtimeouts.increment(); + result->wTime = gleTimerHolder->millis(); + result->writtenTo = getHostsWrittenTo( op ); + result->err = "timeout"; + result->wTimedOut = true; + // this command returns OK because it worked + // so you have to check result to see if there was a real timeout + return Status::OK(); } + + verify( sprintf( buf , "w block pass: %lld" , ++passes ) < 30 ); + 
client.curop()->setMessage( buf ); + sleepmillis(1); + killCurrentOp.checkForInterrupt(); + } + + if ( doTiming ) { + result->writtenTo = getHostsWrittenTo(op); + result->wTime = gleTimerHolder->recordMillis(); } - result->appendNull( "err" ); - return true; + return Status::OK(); } } // namespace mongo diff --git a/src/mongo/db/write_concern.h b/src/mongo/db/write_concern.h index 52dd0b0511e..e5cb4138d87 100644 --- a/src/mongo/db/write_concern.h +++ b/src/mongo/db/write_concern.h @@ -30,15 +30,54 @@ namespace mongo { - /** - * Helper method for commands to call. Blocks until write concern (as specified in "cmdObj") - * is satisfied. "err" should be set to true if the last operation succeeded, otherwise false. - * "result" will be filled with write concern results. Returns false and sets "errmsg" on - * failure. - */ - bool waitForWriteConcern(const BSONObj& cmdObj, - bool err, - BSONObjBuilder* result, - string* errmsg); + struct WriteConcernOptions { + + WriteConcernOptions() { reset(); } + + Status parse( const BSONObj& obj ); + + void reset() { + syncMode = NONE; + wNumNodes = 0; + wMode = ""; + wTimeout = 0; + } + + enum SyncMode { NONE, FSYNC, JOURNAL } syncMode; + + int wNumNodes; + string wMode; + + int wTimeout; + + }; + + struct WriteConcernResult { + WriteConcernResult() { + reset(); + } + + void reset() { + fsyncFiles = -1; + wTimedOut = false; + wTime = -1; + err = ""; + } + + void appendTo( BSONObjBuilder* result ) const; + + int fsyncFiles; + + bool wTimedOut; + int wTime; + vector<BSONObj> writtenTo; + + string err; // this is the old err field, should deprecate + }; + + Status waitForWriteConcern(Client& client, + const WriteConcernOptions& writeConcern, + WriteConcernResult* result ); + } // namespace mongo |