summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2013-11-28 00:21:23 -0500
committerEliot Horowitz <eliot@10gen.com>2013-12-03 21:24:55 -0500
commitff4308beb1716d245779effef81acdef7686cc2c (patch)
tree428629cfd2140072377d66738348944f3f5f8b92 /src/mongo/db
parent4fa4012a60147450b8c201ebb90687596fe673d5 (diff)
downloadmongo-ff4308beb1716d245779effef81acdef7686cc2c.tar.gz
SERVER-11665: refactor waitForWriteConcern
new structs instead of BSONObj
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/commands/get_last_error.cpp23
-rw-r--r--src/mongo/db/commands/write_commands/batch_executor.cpp54
-rw-r--r--src/mongo/db/dur.h4
-rw-r--r--src/mongo/db/repl/write_concern.cpp9
-rw-r--r--src/mongo/db/repl/write_concern.h1
-rw-r--r--src/mongo/db/write_concern.cpp283
-rw-r--r--src/mongo/db/write_concern.h59
7 files changed, 264 insertions, 169 deletions
diff --git a/src/mongo/db/commands/get_last_error.cpp b/src/mongo/db/commands/get_last_error.cpp
index e60b1e582f1..7d4f2fc4c82 100644
--- a/src/mongo/db/commands/get_last_error.cpp
+++ b/src/mongo/db/commands/get_last_error.cpp
@@ -94,14 +94,12 @@ namespace mongo {
bool run(const string& dbname, BSONObj& _cmdObj, int, string& errmsg, BSONObjBuilder& result, bool fromRepl) {
LastError *le = lastError.disableForCommand();
- bool err = false;
-
if ( le->nPrev != 1 ) {
- err = LastError::noError.appendSelf( result , false );
+ LastError::noError.appendSelf( result , false );
le->appendSelfStatus( result );
}
else {
- err = le->appendSelf( result , false );
+ le->appendSelf( result , false );
}
Client& c = cc();
@@ -121,7 +119,22 @@ namespace mongo {
}
}
- return waitForWriteConcern(cmdObj, err, &result, &errmsg);
+ WriteConcernOptions writeConcern;
+ Status s = writeConcern.parse( cmdObj );
+ if ( !s.isOK() ) {
+ result.append( "badGLE", cmdObj );
+ errmsg = s.toString();
+ return false;
+ }
+
+ WriteConcernResult res;
+ s = waitForWriteConcern( cc(), writeConcern, &res );
+ res.appendTo( &result );
+ if ( !s.isOK() ) {
+ errmsg = s.toString();
+ return false;
+ }
+ return true;
}
} cmdGetLastError;
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp
index 0b4c7881963..0d9de00c651 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.cpp
+++ b/src/mongo/db/commands/write_commands/batch_executor.cpp
@@ -59,27 +59,28 @@ namespace mongo {
: _defaultWriteConcern(wc), _client( client ), _opCounters( opCounters ), _le( le ) {
}
- static void maybeBuildWCError( const BSONObj& wcResult,
- const string& wcErrMsg,
+ static void maybeBuildWCError( const Status& wcStatus,
+ const WriteConcernResult& wcResult,
BatchedCommandResponse* response ) {
// Error reported is either the errmsg or err from wc
string errMsg;
- if ( !wcErrMsg.empty() ) errMsg = wcErrMsg;
- else if ( wcResult["err"].type() == String ) errMsg = wcResult["err"].String();
+ if ( !wcStatus.isOK() )
+ errMsg = wcStatus.toString();
+ else if ( wcResult.err.size() )
+ errMsg = wcResult.err;
- // Sometimes the jnote/wnote has more error info
- if ( !errMsg.empty() && wcResult["jnote"].type() == String ) {
- errMsg = wcResult["jnote"].String();
- }
- if ( !errMsg.empty() && wcResult["wnote"].type() == String ) {
- errMsg = wcResult["wnote"].String();
- }
+ if ( errMsg.empty() )
+ return;
- if ( errMsg.empty() ) return;
+ if ( wcStatus.isOK() )
+ response->setErrCode( ErrorCodes::WriteConcernFailed );
+ else
+ response->setErrCode( wcStatus.code() );
+
+ if ( wcResult.wTimedOut )
+ response->setErrInfo( BSON( "wtimeout" << true ) );
- response->setErrCode( ErrorCodes::WriteConcernFailed );
- if ( wcResult["wtimeout"].trueValue() ) response->setErrInfo( BSON( "wtimeout" << true ) );
response->setErrMessage( errMsg );
}
@@ -170,23 +171,24 @@ namespace mongo {
// Apply write concern if we had any successful writes
if ( numItemErrors < numBatchItems ) {
- BSONObj writeConcern;
+ WriteConcernOptions writeConcern;
+ Status s = Status::OK();
if ( request.isWriteConcernSet() ) {
- writeConcern = request.getWriteConcern();
+ s = writeConcern.parse( request.getWriteConcern() );
}
else {
- writeConcern = _defaultWriteConcern;
+ s = writeConcern.parse( _defaultWriteConcern );
}
- string errMsg;
- BSONObjBuilder wcResultsB;
- waitForWriteConcern( writeConcern,
- false /* always wait for secondaries since we wrote something */,
- &wcResultsB,
- &errMsg );
-
- maybeBuildWCError( wcResultsB.obj(), errMsg, response );
- // TODO: save writtenTo, waitedJournal/waitedRepl stats
+ if ( !s.isOK() ) {
+ response->setErrCode( s.code() );
+ response->setErrMessage( s.toString() );
+ }
+ else {
+ WriteConcernResult res;
+ s = waitForWriteConcern( cc(), writeConcern, &res );
+ maybeBuildWCError( s, res, response );
+ }
}
// Set the main body of the response. We assume that, if there was an error, the error
diff --git a/src/mongo/db/dur.h b/src/mongo/db/dur.h
index 2ae50ef62d7..4731ce16091 100644
--- a/src/mongo/db/dur.h
+++ b/src/mongo/db/dur.h
@@ -169,6 +169,8 @@ namespace mongo {
*/
virtual void syncDataAndTruncateJournal() = 0;
+ virtual bool isDurable() const = 0;
+
static DurableInterface& getDur() { return *_impl; }
private:
@@ -202,6 +204,7 @@ namespace mongo {
bool commitIfNeeded(bool) { return false; }
bool aCommitIsNeeded() const { return false; }
void syncDataAndTruncateJournal() {}
+ bool isDurable() const { return false; }
};
class DurableImpl : public DurableInterface {
@@ -216,6 +219,7 @@ namespace mongo {
bool aCommitIsNeeded() const;
bool commitIfNeeded(bool);
void syncDataAndTruncateJournal();
+ bool isDurable() const { return true; }
};
} // namespace dur
diff --git a/src/mongo/db/repl/write_concern.cpp b/src/mongo/db/repl/write_concern.cpp
index 4cc8f6db08d..a7cd53c48d9 100644
--- a/src/mongo/db/repl/write_concern.cpp
+++ b/src/mongo/db/repl/write_concern.cpp
@@ -173,7 +173,10 @@ namespace mongo {
return false;
}
- string wStr = w.String();
+ return opReplicatedEnough( op, w.String() );
+ }
+
+ bool opReplicatedEnough( OpTime op , const string& wStr ) {
if (wStr == "majority") {
// use the entire set, including arbiters, to prevent writing
// to a majority of the set but not a majority of voters
@@ -337,6 +340,10 @@ namespace mongo {
return slaveTracking.replicatedToNum( op , w );
}
+ bool opReplicatedEnough( OpTime op , const string& w ) {
+ return slaveTracking.opReplicatedEnough( op , w );
+ }
+
bool waitForReplication( OpTime op , int w , int maxSecondsToWait ) {
return slaveTracking.waitForReplication( op, w, maxSecondsToWait );
}
diff --git a/src/mongo/db/repl/write_concern.h b/src/mongo/db/repl/write_concern.h
index 7e3c98f7200..3af0f3ad8c2 100644
--- a/src/mongo/db/repl/write_concern.h
+++ b/src/mongo/db/repl/write_concern.h
@@ -47,6 +47,7 @@ namespace mongo {
/** @return true if op has made it to w servers */
bool opReplicatedEnough( OpTime op , int w );
+ bool opReplicatedEnough( OpTime op , const string& w );
bool opReplicatedEnough( OpTime op , BSONElement w );
bool waitForReplication( OpTime op , int w , int maxSecondsToWait );
diff --git a/src/mongo/db/write_concern.cpp b/src/mongo/db/write_concern.cpp
index 331525e946b..69b0468bd67 100644
--- a/src/mongo/db/write_concern.cpp
+++ b/src/mongo/db/write_concern.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/repl/replication_server_status.h"
#include "mongo/db/repl/write_concern.h"
#include "mongo/db/stats/timer_stats.h"
+#include "mongo/db/write_concern.h"
namespace mongo {
@@ -43,159 +44,187 @@ namespace mongo {
static Counter64 gleWtimeouts;
static ServerStatusMetricField<Counter64> gleWtimeoutsDisplay( "getLastError.wtimeouts", &gleWtimeouts );
- bool waitForWriteConcern(const BSONObj& cmdObj,
- bool err,
- BSONObjBuilder* result,
- string* errmsg) {
- Client& c = cc();
-
- if ( cmdObj["j"].trueValue() ) {
- if( !getDur().awaitCommit() ) {
- // --journal is off
- result->append("jnote", "journaling not enabled on this server");
- // Set the err field, if the result document doesn't already have it set.
- if ( !err ) {
- result->append( "err", "nojournal" );
- }
- return true;
- }
- if( cmdObj["fsync"].trueValue() ) {
- *errmsg = "fsync and j options cannot be used together";
- return false;
- }
+ Status WriteConcernOptions::parse( const BSONObj& obj ) {
+ bool j = obj["j"].trueValue();
+ bool fsync = obj["fsync"].trueValue();
+
+ if ( j & fsync )
+ return Status( ErrorCodes::BadValue, "fsync and j options cannot be used together" );
+
+ if ( j ) {
+ syncMode = JOURNAL;
}
- else if ( cmdObj["fsync"].trueValue() ) {
- Timer t;
- if( !getDur().awaitCommit() ) {
- // if get here, not running with --journal
- log() << "fsync from getlasterror" << endl;
- result->append( "fsyncFiles" , MemoryMappedFile::flushAll( true ) );
- }
- else {
- // this perhaps is temp. how long we wait for the group commit to occur.
- result->append( "waited", t.millis() );
- }
+ if ( fsync ) {
+ if ( getDur().isDurable() )
+ syncMode = JOURNAL;
+ else
+ syncMode = FSYNC;
}
- if ( err ) {
- // doesn't make sense to wait for replication
- // if there was an error
- return true;
+
+ BSONElement e = obj["w"];
+ if ( e.isNumber() ) {
+ wNumNodes = e.numberInt();
+ }
+ else if ( e.type() == String ) {
+ wMode = e.valuestrsafe();
+ }
+ else if ( e.eoo() ||
+ e.type() == jstNULL ||
+ e.type() == Undefined ) {
+ }
+ else {
+ return Status( ErrorCodes::BadValue, "w has to be a number or a string" );
}
- BSONElement e = cmdObj["w"];
- if ( e.ok() ) {
+ wTimeout = obj["wtimeout"].numberInt();
- if (serverGlobalParams.configsvr && (!e.isNumber() || e.numberInt() > 1)) {
- // w:1 on config servers should still work, but anything greater than that
- // should not.
- result->append( "wnote", "can't use w on config servers" );
- result->append( "err", "norepl" );
- return true;
- }
+ return Status::OK();
+ }
- int timeout = cmdObj["wtimeout"].numberInt();
- scoped_ptr<TimerHolder> gleTimerHolder;
- bool doTiming = false;
- if ( e.isNumber() ) {
- doTiming = e.numberInt() > 1;
- }
- else if ( e.type() == String ) {
- doTiming = true;
- }
- if ( doTiming ) {
- gleTimerHolder.reset( new TimerHolder( &gleWtimeStats ) );
- }
- else {
- gleTimerHolder.reset( new TimerHolder( NULL ) );
+ void WriteConcernResult::appendTo( BSONObjBuilder* result ) const {
+ if ( fsyncFiles >= 0 )
+ result->appendNumber( "fsyncFiles", fsyncFiles );
+
+ if ( wTime >= 0 ) {
+ if ( wTimedOut )
+ result->appendNumber( "waited", wTime );
+ else
+ result->appendNumber( "wtime", wTime );
+ }
+
+ if ( wTimedOut )
+ result->appendBool( "wtimeout", true );
+
+ if ( writtenTo.size() )
+ result->append( "writtenTo", writtenTo );
+ else
+ result->appendNull( "writtenTo" );
+
+ if ( err.empty() )
+ result->appendNull( "err" );
+ else
+ result->append( "err", err );
+ }
+
+ Status waitForWriteConcern(Client& client,
+ const WriteConcernOptions& writeConcern,
+ WriteConcernResult* result ) {
+ // first handle blocking on disk
+
+ switch( writeConcern.syncMode ) {
+ case WriteConcernOptions::NONE:
+ break;
+ case WriteConcernOptions::JOURNAL:
+ if ( getDur().awaitCommit() ) {
+ // success
+ break;
}
+ result->err = "nojournal";
+ return Status( ErrorCodes::BadValue, "journalling not enabled" );
+ case WriteConcernOptions::FSYNC:
+ result->fsyncFiles = MemoryMappedFile::flushAll( true );
+ break;
+ }
- long long passes = 0;
- char buf[32];
- OpTime op(c.getLastOp());
+ // now wait for replication
- if ( op.isNull() ) {
- if ( anyReplEnabled() ) {
- result->append( "wnote" , "no write has been done on this connection" );
- }
- else if ( e.isNumber() && e.numberInt() <= 1 ) {
- // don't do anything
- // w=1 and no repl, so this is fine
- }
- else if (e.type() == mongo::String &&
- str::equals(e.valuestrsafe(), "majority")) {
- // don't do anything
- // w=majority and no repl, so this is fine
- }
- else {
- // w=2 and no repl
- stringstream errmsg;
- errmsg << "no replication has been enabled, so w=" <<
- e.toString(false) << " won't work";
- result->append( "wnote" , errmsg.str() );
- result->append( "err", "norepl" );
- return true;
- }
+ if ( writeConcern.wNumNodes <= 0 &&
+ writeConcern.wMode.empty() ) {
+ // w settings, all done
+ return Status::OK();
+ }
- result->appendNull( "err" );
- return true;
+ if ( serverGlobalParams.configsvr ) {
+ // config servers have special rules
+ if ( writeConcern.wNumNodes > 1 ) {
+ result->err = "norepl";
+ return Status( ErrorCodes::BadValue, "cannot use w > 1 with config servers" );
+ }
+ if ( writeConcern.wMode == "majority" ) {
+ return Status::OK();
}
+ result->err = "norepl";
+ return Status( ErrorCodes::BadValue, "unknown w mode for config servers" );
+ }
- if ( !theReplSet && !e.isNumber() ) {
- // For master/slave deployments that receive w:"majority" or some other named
- // write concern mode, treat it like w:1 and include a note.
- result->append( "wnote", "cannot use non integer w values for non-replica sets" );
- result->appendNull( "err" );
- return true;
+ if ( !anyReplEnabled() ) {
+ // no replication enabled and not a config server
+ // so we handle some simple things, or fail
+
+ if ( writeConcern.wNumNodes > 1 ) {
+ result->err = "norepl";
+ return Status( ErrorCodes::BadValue, "no replication and asked for w > 1" );
+ }
+ if ( !writeConcern.wMode.empty() &&
+ writeConcern.wMode != "majority" ) {
+ result->err = "norepl";
+ return Status( ErrorCodes::BadValue, "no replication and asked for w with a mode" );
}
- while ( 1 ) {
+ // asked for w <= 1 or w=majority
+ // so we can just say ok
+ return Status::OK();
+ }
- if ( !_isMaster() ) {
- // this should be in the while loop in case we step down
- *errmsg = "not master";
- result->append( "wnote", "no longer primary" );
- result->append( "code" , 10990 );
- return false;
- }
+ bool doTiming = writeConcern.wNumNodes > 1 || !writeConcern.wMode.empty();
+ scoped_ptr<TimerHolder> gleTimerHolder( new TimerHolder( doTiming ? &gleWtimeStats : NULL ) );
- // check this first for w=0 or w=1
- if ( opReplicatedEnough( op, e ) ) {
- break;
- }
+ OpTime op( client.getLastOp() );
- // if replication isn't enabled (e.g., config servers)
- if ( ! anyReplEnabled() ) {
- result->append( "err", "norepl" );
- return true;
- }
+ if ( op.isNull() ) {
+ // no write happened for this client yet
+ return Status::OK();
+ }
+
+ if ( !writeConcern.wMode.empty() && !theReplSet ) {
+ return Status( ErrorCodes::BadValue, "asked for a w mode with master/slave" );
+ }
+ // now that we've done the prep, now we actually wait
+ char buf[32]; // for messages
+ long long passes = 0;
+ while ( 1 ) {
- if ( timeout > 0 && gleTimerHolder->millis() >= timeout ) {
- gleWtimeouts.increment();
- result->append( "wtimeout" , true );
- *errmsg = "timed out waiting for slaves";
- result->append( "waited" , gleTimerHolder->millis() );
- result->append("writtenTo", getHostsWrittenTo(op));
- result->append( "err" , "timeout" );
- return true;
- }
+ if ( !_isMaster() ) {
+ // this should be in the while loop in case we step down
+ return Status( DBException::convertExceptionCode(10990), "no longer primary" );
+ }
- verify( sprintf( buf , "w block pass: %lld" , ++passes ) < 30 );
- c.curop()->setMessage( buf );
- sleepmillis(1);
- killCurrentOp.checkForInterrupt();
+ // check this first for w=0 or w=1
+ if ( writeConcern.wNumNodes > 0 ) {
+ if ( opReplicatedEnough( op, writeConcern.wNumNodes ) ) {
+ break;
+ }
+ }
+ else if ( opReplicatedEnough( op, writeConcern.wMode ) ) {
+ break;
}
- if ( doTiming ) {
- result->append("writtenTo", getHostsWrittenTo(op));
- int myMillis = gleTimerHolder->recordMillis();
- result->appendNumber( "wtime" , myMillis );
+ if ( writeConcern.wTimeout > 0 &&
+ gleTimerHolder->millis() >= writeConcern.wTimeout ) {
+ gleWtimeouts.increment();
+ result->wTime = gleTimerHolder->millis();
+ result->writtenTo = getHostsWrittenTo( op );
+ result->err = "timeout";
+ result->wTimedOut = true;
+ // this command returns OK because it worked
+ // so you have to check result to see if there was a real timeout
+ return Status::OK();
}
+
+ verify( sprintf( buf , "w block pass: %lld" , ++passes ) < 30 );
+ client.curop()->setMessage( buf );
+ sleepmillis(1);
+ killCurrentOp.checkForInterrupt();
+ }
+
+ if ( doTiming ) {
+ result->writtenTo = getHostsWrittenTo(op);
+ result->wTime = gleTimerHolder->recordMillis();
}
- result->appendNull( "err" );
- return true;
+ return Status::OK();
}
} // namespace mongo
diff --git a/src/mongo/db/write_concern.h b/src/mongo/db/write_concern.h
index 52dd0b0511e..e5cb4138d87 100644
--- a/src/mongo/db/write_concern.h
+++ b/src/mongo/db/write_concern.h
@@ -30,15 +30,54 @@
namespace mongo {
- /**
- * Helper method for commands to call. Blocks until write concern (as specified in "cmdObj")
- * is satisfied. "err" should be set to true if the last operation succeeded, otherwise false.
- * "result" will be filled with write concern results. Returns false and sets "errmsg" on
- * failure.
- */
- bool waitForWriteConcern(const BSONObj& cmdObj,
- bool err,
- BSONObjBuilder* result,
- string* errmsg);
+ struct WriteConcernOptions {
+
+ WriteConcernOptions() { reset(); }
+
+ Status parse( const BSONObj& obj );
+
+ void reset() {
+ syncMode = NONE;
+ wNumNodes = 0;
+ wMode = "";
+ wTimeout = 0;
+ }
+
+ enum SyncMode { NONE, FSYNC, JOURNAL } syncMode;
+
+ int wNumNodes;
+ string wMode;
+
+ int wTimeout;
+
+ };
+
+ struct WriteConcernResult {
+ WriteConcernResult() {
+ reset();
+ }
+
+ void reset() {
+ fsyncFiles = -1;
+ wTimedOut = false;
+ wTime = -1;
+ err = "";
+ }
+
+ void appendTo( BSONObjBuilder* result ) const;
+
+ int fsyncFiles;
+
+ bool wTimedOut;
+ int wTime;
+ vector<BSONObj> writtenTo;
+
+ string err; // this is the old err field, should deprecate
+ };
+
+ Status waitForWriteConcern(Client& client,
+ const WriteConcernOptions& writeConcern,
+ WriteConcernResult* result );
+
} // namespace mongo