/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/base/counter.h"
#include "mongo/db/commands/server_status.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/kill_current_op.h"
#include "mongo/db/repl/is_master.h"
#include "mongo/db/repl/replication_server_status.h"
#include "mongo/db/repl/write_concern.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/db/write_concern.h"
namespace mongo {
// Timing statistics for the replication wait performed by waitForWriteConcern().
// Registered under the metric name "getLastError.wtime".
static TimerStats gleWtimeStats;
static ServerStatusMetricField displayGleLatency( "getLastError.wtime", &gleWtimeStats );
// Incremented each time waitForWriteConcern() gives up because wtimeout elapsed.
// Registered under the metric name "getLastError.wtimeouts".
static Counter64 gleWtimeouts;
static ServerStatusMetricField gleWtimeoutsDisplay( "getLastError.wtimeouts", &gleWtimeouts );
/**
 * Fills in this WriteConcernOptions from a write concern document.
 *
 * Fields read from 'obj':
 *   - "j", "fsync": evaluated for truthiness; they select 'syncMode' and are
 *     mutually exclusive.
 *   - "w": a number is stored in 'wNumNodes', a string in 'wMode'. A missing,
 *     null or undefined "w" leaves both members untouched.
 *   - "wtimeout": stored in 'wTimeout' via numberInt().
 *
 * Members that the document does not mention are not reset here.
 *
 * @return BadValue when both "j" and "fsync" are set, or when "w" is neither a
 *         number, a string, null, undefined nor absent; Status::OK() otherwise.
 *         Note that 'syncMode' may already have been updated when "w" is rejected.
 */
Status WriteConcernOptions::parse( const BSONObj& obj ) {
    const bool j = obj["j"].trueValue();
    const bool fsync = obj["fsync"].trueValue();

    // Logical AND: both operands are plain booleans, a bitwise '&' only worked
    // here by accident of the operand types.
    if ( j && fsync )
        return Status( ErrorCodes::BadValue, "fsync and j options cannot be used together" );

    if ( j ) {
        syncMode = JOURNAL;
    }

    if ( fsync ) {
        // When getDur() reports itself as durable, an fsync request is served
        // by waiting on the journal instead of flushing files.
        if ( getDur().isDurable() )
            syncMode = JOURNAL;
        else
            syncMode = FSYNC;
    }

    BSONElement e = obj["w"];
    if ( e.isNumber() ) {
        wNumNodes = e.numberInt();
    }
    else if ( e.type() == String ) {
        wMode = e.valuestrsafe();
    }
    else if ( e.eoo() ||
              e.type() == jstNULL ||
              e.type() == Undefined ) {
        // "w" not specified: keep whatever wNumNodes / wMode already hold
    }
    else {
        return Status( ErrorCodes::BadValue, "w has to be a number or a string" );
    }

    // NOTE(review): presumably numberInt() yields 0 for a missing or
    // non-numeric "wtimeout" -- confirm against BSONElement.
    wTimeout = obj["wtimeout"].numberInt();

    return Status::OK();
}
/**
 * Serializes this result into 'result'.
 *
 * Fields are emitted in a fixed order: "syncMillis" and "fsyncFiles" (each only
 * when non-negative), then the wait time as "waited" (timed out) or "wtime"
 * (did not time out) when non-negative, then "wtimeout": true when the wait
 * timed out. "writtenTo" and "err" are always emitted, as null when empty.
 */
void WriteConcernResult::appendTo( BSONObjBuilder* result ) const {
    if ( syncMillis >= 0 ) {
        result->appendNumber( "syncMillis", syncMillis );
    }

    if ( fsyncFiles >= 0 ) {
        result->appendNumber( "fsyncFiles", fsyncFiles );
    }

    if ( wTimedOut ) {
        // on timeout the elapsed time is reported under a different name
        if ( wTime >= 0 ) {
            result->appendNumber( "waited", wTime );
        }
        result->appendBool( "wtimeout", true );
    }
    else if ( wTime >= 0 ) {
        result->appendNumber( "wtime", wTime );
    }

    if ( writtenTo.size() == 0 ) {
        result->appendNull( "writtenTo" );
    }
    else {
        result->append( "writtenTo", writtenTo );
    }

    if ( !err.empty() ) {
        result->append( "err", err );
    }
    else {
        result->appendNull( "err" );
    }
}
/**
 * Blocks until the last operation recorded for 'client' satisfies 'writeConcern',
 * filling in 'result' along the way.
 *
 * Two phases:
 *   1. Disk: depending on writeConcern.syncMode, wait for a journal commit or
 *      flush all memory mapped files. The elapsed time goes to result->syncMillis.
 *   2. Replication: if w > 1 or a w mode was requested, poll until the client's
 *      last op has been replicated enough, this node stops being master, or
 *      wtimeout elapses.
 *
 * @param client        client whose last op (client.getLastOp()) is waited on
 * @param writeConcern  the requested write concern
 * @param result        out-parameter; must not be NULL (it is dereferenced
 *                      unconditionally)
 *
 * @return Status::OK() when the write concern is satisfied or nothing had to be
 *         waited for;
 *         BadValue when journaling is requested but awaitCommit() fails, or when
 *         an unsupported w mode is used on a config server or with master/slave;
 *         WriteConcernLegacyOK when replication is required but unavailable
 *         ("norepl") or when the wait timed out ("timeout");
 *         NotMaster when this node is no longer master while waiting.
 */
Status waitForWriteConcern(Client& client,
                           const WriteConcernOptions& writeConcern,
                           WriteConcernResult* result ) {

    // first handle blocking on disk

    Timer syncTimer;
    switch( writeConcern.syncMode ) {
    case WriteConcernOptions::NONE:
        break;
    case WriteConcernOptions::JOURNAL:
        if ( getDur().awaitCommit() ) {
            // success
            break;
        }
        // awaitCommit() failed; note that syncMillis is not recorded on this path
        result->err = "nojournal";
        return Status( ErrorCodes::BadValue, "journaling not enabled" );
    case WriteConcernOptions::FSYNC:
        result->fsyncFiles = MemoryMappedFile::flushAll( true );
        break;
    }
    result->syncMillis = syncTimer.millis();

    // now wait for replication

    if ( writeConcern.wNumNodes <= 1 &&
         writeConcern.wMode.empty() ) {
        // no desired replication check
        return Status::OK();
    }

    if ( serverGlobalParams.configsvr ) {
        // config servers have special rules
        if ( writeConcern.wNumNodes > 1 ) {
            result->err = "norepl";
            return Status( ErrorCodes::WriteConcernLegacyOK,
                           "cannot use w > 1 with config servers" );
        }
        if ( writeConcern.wMode == "majority" ) {
            // majority is ok for single nodes, as 1/1 > .5
            return Status::OK();
        }
        result->err = "norepl";
        return Status( ErrorCodes::BadValue,
                       str::stream() << "unknown w mode for config servers "
                       << "(" << writeConcern.wMode << ")" );
    }

    if ( !anyReplEnabled() ) {
        // no replication enabled and not a config server
        // so we handle some simple things, or fail

        if ( writeConcern.wNumNodes > 1 ) {
            result->err = "norepl";
            return Status( ErrorCodes::WriteConcernLegacyOK,
                           str::stream() << "no replication and asked for w > 1 "
                           << "(" << writeConcern.wNumNodes << ")" );
        }
        if ( !writeConcern.wMode.empty() &&
             writeConcern.wMode != "majority" ) {
            result->err = "norepl";
            return Status( ErrorCodes::WriteConcernLegacyOK,
                           "no replication and asked for w with a mode" );
        }

        // asked for w <= 1 or w=majority
        // so we can just say ok
        return Status::OK();
    }

    // Given the early return above for (w <= 1 && no mode), this is always true
    // here; it is kept so the timing decisions below stay explicit.
    bool doTiming = writeConcern.wNumNodes > 1 || !writeConcern.wMode.empty();
    scoped_ptr<TimerHolder> gleTimerHolder( new TimerHolder( doTiming ? &gleWtimeStats : NULL ) );

    OpTime op( client.getLastOp() );

    if ( op.isNull() ) {
        // no write happened for this client yet
        return Status::OK();
    }

    if ( !writeConcern.wMode.empty() && !theReplSet ) {
        if ( writeConcern.wMode == "majority" ) {
            // with master/slave, majority is equivalent to w=1
            return Status::OK();
        }
        return Status( ErrorCodes::BadValue,
                       str::stream() << "asked for a w mode with master/slave "
                       << "[" << writeConcern.wMode << "]" );
    }

    // now that we've done the prep, now we actually wait

    char buf[32]; // for messages

    long long passes = 0;
    while ( 1 ) {

        if ( !_isMaster() ) {
            // this should be in the while loop in case we step down
            return Status( ErrorCodes::NotMaster, "no longer primary" );
        }

        // a positive node count takes precedence over a w mode
        if ( writeConcern.wNumNodes > 0 ) {
            if ( opReplicatedEnough( op, writeConcern.wNumNodes ) ) {
                break;
            }
        }
        else if ( opReplicatedEnough( op, writeConcern.wMode ) ) {
            break;
        }

        // wTimeout <= 0 means wait without a deadline
        if ( writeConcern.wTimeout > 0 &&
             gleTimerHolder->millis() >= writeConcern.wTimeout ) {
            gleWtimeouts.increment();
            result->wTime = gleTimerHolder->millis();
            result->writtenTo = getHostsWrittenTo( op );
            result->err = "timeout";
            result->wTimedOut = true;
            return Status( ErrorCodes::WriteConcernLegacyOK,
                           "waiting for replication timed out" );
        }

        // publish progress on the current op; the verify guards the fixed-size buffer
        verify( sprintf( buf , "w block pass: %lld" , ++passes ) < 30 );
        client.curop()->setMessage( buf );
        sleepmillis(1);
        // NOTE(review): presumably aborts the wait if this op was killed -- confirm
        killCurrentOp.checkForInterrupt();
    }

    if ( doTiming ) {
        result->writtenTo = getHostsWrittenTo(op);
        result->wTime = gleTimerHolder->recordMillis();
    }

    return Status::OK();
}
} // namespace mongo