/**
* Copyright (C) 2013 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/s/write_ops/batch_downconvert.h"
#include "mongo/bson/util/builder.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/util/assert_util.h"
namespace mongo {
/**
 * Classifies a legacy getLastError (GLE) response into a write error and/or a write concern
 * error, both of which are stored into 'errors'.
 *
 * Returns a non-OK Status when the response is not ok and does not match any of the write
 * concern error patterns - in that case the result of the write is unknown. Otherwise
 * returns Status::OK(), whether or not 'errors' was populated.
 */
Status BatchSafeWriter::extractGLEErrors( const BSONObj& gleResponse, GLEErrors* errors ) {

    // DRAGONS
    // Parsing GLE responses is incredibly finicky.
    // The order of testing here is extremely important.

    ///////////////////////////////////////////////////////////////////////
    // IMPORTANT!
    // Also update extractGLEErrors in batch_api.js for any changes made here.

    const bool responseOk = gleResponse["ok"].trueValue();
    const string errText = gleResponse["err"].str();
    const string errMsgText = gleResponse["errmsg"].str();
    const string wNoteText = gleResponse["wnote"].str();
    const string jNoteText = gleResponse["jnote"].str();
    const int errCode = gleResponse["code"].numberInt();
    const bool timedOut = gleResponse["wtimeout"].trueValue();

    const bool replNotEnforced = ( errText == "norepl" || errText == "noreplset" );

    const bool isWCErrorCode =
        errCode == 10990 /* no longer primary */
        || errCode == 16805 /* replicatedToNum no longer primary */
        || errCode == 14830 /* gle wmode changed / invalid */
        // 2.6 Error codes
        || errCode == ErrorCodes::NotMaster
        || errCode == ErrorCodes::UnknownReplWriteConcern
        || errCode == ErrorCodes::WriteConcernFailed;

    if ( replNotEnforced ) {
        // Know this is legacy gle and the repl not enforced - write concern error in 2.4
        errors->wcError.reset( new WCErrorDetail );
        errors->wcError->setErrCode( ErrorCodes::WriteConcernFailed );
        // Message preference: errmsg, then wnote, then the raw err string
        errors->wcError->setErrMessage( !errMsgText.empty() ? errMsgText :
                                        ( !wNoteText.empty() ? wNoteText : errText ) );
    }
    else if ( timedOut ) {
        // Know there was no write error
        errors->wcError.reset( new WCErrorDetail );
        errors->wcError->setErrCode( ErrorCodes::WriteConcernFailed );
        errors->wcError->setErrMessage( !errMsgText.empty() ? errMsgText : errText );
        errors->wcError->setErrInfo( BSON( "wtimeout" << true ) );
    }
    else if ( isWCErrorCode ) {
        // Write concern errors that get returned as regular errors (result may not be ok: 1.0)
        errors->wcError.reset( new WCErrorDetail );
        errors->wcError->setErrCode( errCode );
        errors->wcError->setErrMessage( errMsgText );
    }
    else if ( !responseOk ) {

        //
        // !!! SOME GLE ERROR OCCURRED, UNKNOWN WRITE RESULT !!!
        //

        return Status( DBException::convertExceptionCode(
                           errCode ? errCode : ErrorCodes::UnknownError ),
                       errMsgText );
    }
    else if ( !errText.empty() ) {
        // Write error
        errors->writeError.reset( new WriteErrorDetail );

        int writeErrorCode = ( errCode == 0 ) ? ErrorCodes::UnknownError : errCode;

        // COMPATIBILITY
        // Certain clients expect write commands to always report 11000 for duplicate key
        // errors, while legacy GLE can return additional codes.
        switch ( writeErrorCode ) {
        case 11001: /* dup key in update */
        case 12582: /* dup key capped */
            writeErrorCode = ErrorCodes::DuplicateKey;
            break;
        default:
            break;
        }

        errors->writeError->setErrCode( writeErrorCode );
        errors->writeError->setErrMessage( errText );
    }
    else if ( !jNoteText.empty() ) {
        // Know this is legacy gle and the journaling not enforced - write concern error in 2.4
        errors->wcError.reset( new WCErrorDetail );
        errors->wcError->setErrCode( ErrorCodes::WriteConcernFailed );
        errors->wcError->setErrMessage( jNoteText );
    }

    // See if we had a version error reported as a writeback id - this is the only kind of
    // write error where the write concern may still be enforced.
    // The actual version that was stale is lost in the writeback itself.
    const bool writebackFromLastOp = !gleResponse["writeback"].eoo()
                                     && gleResponse["writebackSince"].numberInt() == 0;

    if ( writebackFromLastOp ) {

        // We shouldn't have a previous write error
        dassert( !errors->writeError.get() );
        if ( errors->writeError.get() ) {
            // Somehow there was a write error *and* a writeback from the last write
            warning() << "both a write error and a writeback were reported "
                      << "when processing a legacy write: " << errors->writeError->toBSON()
                      << endl;
        }

        // The stale version error replaces whatever write error was recorded above
        errors->writeError.reset( new WriteErrorDetail );
        errors->writeError->setErrCode( ErrorCodes::StaleShardVersion );
        errors->writeError->setErrInfo( BSON( "downconvert" << true ) ); // For debugging
        errors->writeError->setErrMessage( "shard version was stale" );
    }

    return Status::OK();
}
void BatchSafeWriter::extractGLEStats( const BSONObj& gleResponse, GLEStats* stats ) {
stats->n = gleResponse["n"].numberInt();
if ( !gleResponse["upserted"].eoo() ) {
stats->upsertedId = gleResponse["upserted"].wrap( "upserted" );
}
if ( gleResponse["lastOp"].type() == Timestamp ) {
stats->lastOp = gleResponse["lastOp"]._opTime();
}
}
// Returns a copy of 'writeConcern' in which every field named "w" has been dropped; all
// other fields are kept in their original order.
static BSONObj fixWCForConfig( const BSONObj& writeConcern ) {
    BSONObjBuilder withoutW;

    for ( BSONObjIterator fields( writeConcern ); fields.more(); ) {
        BSONElement field = fields.next();

        if ( StringData( field.fieldName() ).compare( "w" ) == 0 ) {
            // Skip the "w" option
            continue;
        }

        withoutW.append( field );
    }

    return withoutW.obj();
}
/**
 * Executes every write op of 'request' one at a time through the safe writer, then
 * translates the per-op GLE results into a single batch 'response'.
 *
 * For each op: the write is sent with WriteConcernOptions::Acknowledged, the GLE result is
 * parsed with extractGLEErrors()/extractGLEStats(), and "n", upsert details, last op and
 * write errors are accumulated on the response. For ordered batches processing stops after
 * the first write error.
 *
 * If the write or the GLE parsing returns a non-OK status, the response is cleared and
 * reported as not ok with ErrorCodes::RemoteResultsUnavailable, and the function returns
 * immediately.
 *
 * After the loop, if the requested write concern differs from both Acknowledged and
 * Unacknowledged and at least one op may have succeeded, the write concern is enforced with
 * an additional GLE and any resulting write concern error is attached to the response.
 */
void BatchSafeWriter::safeWriteBatch( DBClientBase* conn,
                                      const BatchedCommandRequest& request,
                                      BatchedCommandResponse* response ) {

    const NamespaceString nss( request.getNS() );

    // N starts at zero, and we add to it for each item
    response->setN( 0 );

    // GLE path always sets nModified to -1 (sentinel) to indicate we should omit it later.
    response->setNModified(-1);

    for ( size_t i = 0; i < request.sizeWriteOps(); ++i ) {

        // Break on first error if we're ordered
        if ( request.getOrdered() && response->isErrDetailsSet() )
            break;

        // The batch item / error / upsert APIs index ops with an int
        const int opIndex = static_cast<int>( i );

        BatchItemRef itemRef( &request, opIndex );

        BSONObj gleResult;
        GLEErrors errors;
        Status status = _safeWriter->safeWrite( conn,
                                                itemRef,
                                                WriteConcernOptions::Acknowledged,
                                                &gleResult );

        if ( status.isOK() ) {
            status = extractGLEErrors( gleResult, &errors );
        }

        if ( !status.isOK() ) {
            // Nothing reliable is known about this write - discard whatever was
            // accumulated so far and report a top-level failure.
            response->clear();
            response->setOk( false );
            response->setErrCode( ErrorCodes::RemoteResultsUnavailable );

            StringBuilder builder;
            builder << "could not get write error from safe write";
            builder << causedBy( status.toString() );
            response->setErrMessage( builder.str() );
            return;
        }

        if ( errors.wcError.get() ) {
            response->setWriteConcernError( errors.wcError.release() );
        }

        //
        // STATS HANDLING
        //

        GLEStats stats;
        extractGLEStats( gleResult, &stats );

        // Special case for making legacy "n" field result for insert match the write
        // command result.
        if ( request.getBatchType() == BatchedCommandRequest::BatchType_Insert
             && !errors.writeError.get() ) {
            // n is always 0 for legacy inserts.
            dassert( stats.n == 0 );
            stats.n = 1;
        }

        response->setN( response->getN() + stats.n );

        if ( !stats.upsertedId.isEmpty() ) {
            // Keep the detail owned by an auto_ptr until it is handed to the response, so
            // it is not leaked if one of the setters throws.
            auto_ptr<BatchedUpsertDetail> upsertedId( new BatchedUpsertDetail );
            upsertedId->setIndex( opIndex );
            upsertedId->setUpsertedID( stats.upsertedId );
            response->addToUpsertDetails( upsertedId.release() );
        }

        response->setLastOp( stats.lastOp );

        // Save write error
        if ( errors.writeError.get() ) {
            errors.writeError->setIndex( opIndex );
            response->addToErrDetails( errors.writeError.release() );
        }
    }

    //
    // WRITE CONCERN ERROR HANDLING
    //

    // The last write is weird, since we enforce write concern and check the error through
    // the same GLE if possible.  If the last GLE was an error, the write concern may not
    // have been enforced in that same GLE, so we need to send another after resetting the
    // error.

    BSONObj writeConcern;
    if ( request.isWriteConcernSet() ) {
        writeConcern = request.getWriteConcern();
        // Pre-2.4.2 mongods react badly to 'w' being set on config servers
        if ( nss.db() == "config" )
            writeConcern = fixWCForConfig( writeConcern );
    }

    // Acknowledged / Unacknowledged need no extra enforcement beyond the writes themselves
    bool needToEnforceWC = WriteConcernOptions::Acknowledged.woCompare(writeConcern) != 0 &&
                           WriteConcernOptions::Unacknowledged.woCompare(writeConcern) != 0;

    if ( needToEnforceWC &&
         ( !response->isErrDetailsSet() ||
           ( !request.getOrdered() &&
             // Not all errored. Note: implicit response->isErrDetailsSet().
             response->sizeErrDetails() < request.sizeWriteOps() ))) {

        // Might have gotten a write concern validity error earlier, these are
        // enforced even if the wc isn't applied, so we ignore.
        response->unsetWriteConcernError();

        const string dbName( nss.db().toString() );

        Status status( Status::OK() );

        if ( response->isErrDetailsSet() ) {
            const WriteErrorDetail* lastError = response->getErrDetails().back();

            // If last write op was an error.
            if ( lastError->getIndex() == static_cast<int>( request.sizeWriteOps() - 1 )) {
                // Reset previous errors so we can apply the write concern no matter what
                // as long as it is valid.
                status = _safeWriter->clearErrors( conn, dbName );
            }
        }

        BSONObj gleResult;
        if ( status.isOK() ) {
            status = _safeWriter->enforceWriteConcern( conn,
                                                       dbName,
                                                       writeConcern,
                                                       &gleResult );
        }

        GLEErrors errors;
        if ( status.isOK() ) {
            status = extractGLEErrors( gleResult, &errors );
        }

        if ( !status.isOK() ) {
            // Could not enforce / parse - surface the failure as the write concern error
            auto_ptr<WCErrorDetail> wcError( new WCErrorDetail );
            wcError->setErrCode( status.code() );
            wcError->setErrMessage( status.reason() );
            response->setWriteConcernError( wcError.release() );
        }
        else if ( errors.wcError.get() ) {
            response->setWriteConcernError( errors.wcError.release() );
        }
    }

    response->setOk( true );
    dassert( response->isValid( NULL ) );
}
/**
 * Produces a copy of a GLE response that keeps only the information relevant to write
 * concern:
 *
 *  - write stats fields ("n", "nModified", "upserted", "updatedExisting") are dropped
 *  - "code" is kept only when the response is not ok
 *  - "err" is kept only for the write concern values "norepl", "noreplset" and "timeout";
 *    any other "err" is dropped, or replaced by a null "err" when the response is ok
 *
 * In some cases, 2.4 GLE w/ wOpTime can give us duplicate "err" and "code" fields b/c of
 * reporting a previous error.  Only the last occurrence of each is considered, and it is
 * emitted after all the other fields.
 *
 * Returns the stripped GLE response.
 */
BSONObj BatchSafeWriter::stripNonWCInfo( const BSONObj& gleResponse ) {

    const bool responseOk = gleResponse["ok"].trueValue();

    BSONObjBuilder stripped;

    BSONElement lastCode; // eoo until a "code" field is seen
    BSONElement lastErr; // eoo until an "err" field is seen

    for ( BSONObjIterator fields( gleResponse ); fields.more(); ) {
        BSONElement field = fields.next();
        StringData name( field.fieldName() );

        // Remember the latest err / code, they are decided on after the scan
        if ( name.compare( "err" ) == 0 ) {
            lastErr = field;
            continue;
        }
        if ( name.compare( "code" ) == 0 ) {
            lastCode = field;
            continue;
        }

        const bool isWriteStat = name.compare( "n" ) == 0
                                 || name.compare( "nModified" ) == 0
                                 || name.compare( "upserted" ) == 0
                                 || name.compare( "updatedExisting" ) == 0;

        if ( !isWriteStat ) {
            stripped.append( field );
        }
    }

    // When not ok, the last code will be from the write concern.  Otherwise the code is
    // from a non-wc error on this connection and is suppressed.
    if ( !lastCode.eoo() && !responseOk ) {
        stripped.append( lastCode );
    }

    if ( !lastErr.eoo() ) {
        const string errText = lastErr.str();
        const bool isWCErr = ( errText == "norepl"
                               || errText == "noreplset"
                               || errText == "timeout" );

        if ( isWCErr ) {
            // Append err if it's from a write concern issue
            stripped.append( lastErr );
        }
        else if ( responseOk ) {
            // Suppress non-write concern err as null, but we need to report null err if ok
            stripped.appendNull( lastErr.fieldName() );
        }
    }

    return stripped.obj();
}
namespace {
/**
* Trivial implementation of a BSON serializable object for backwards-compatibility.
*
* NOTE: This is not a good example of using BSONSerializable. For anything more complex,
* create an implementation with fields defined.
*/
class RawBSONSerializable : public BSONSerializable {
MONGO_DISALLOW_COPYING(RawBSONSerializable);
public:
RawBSONSerializable() {
}
RawBSONSerializable( const BSONObj& doc ) :
_doc( doc ) {
}
bool isValid( std::string* errMsg ) const {
return true;
}
BSONObj toBSON() const {
return _doc;
}
bool parseBSON( const BSONObj& source, std::string* errMsg ) {
_doc = source.getOwned();
return true;
}
void clear() {
_doc = BSONObj();
}
string toString() const {
return toBSON().toString();
}
private:
BSONObj _doc;
};
}
/**
 * Builds a getLastError command from a set of gle options, adding a wOpTime and a
 * wElectionId field at the end.
 *
 * The resulting command always starts with a "getLastError" field: if the first option is
 * not already named getLastError (case-insensitive), or if there are no options at all,
 * "getLastError : 1" is emitted first.  All options are then copied through in order.
 */
static BSONObj buildGLECmdWithOpTime( const BSONObj& gleOptions,
                                      const OpTime& opTime,
                                      const OID& electionId ) {
    BSONObjBuilder builder;
    BSONObjIterator it( gleOptions );

    // With empty options the loop below never runs, so the command name has to be emitted
    // here - otherwise the command would start with "wOpTime".
    if ( !it.more() ) {
        builder.append( "getLastError", 1 );
    }

    for ( int i = 0; it.more(); ++i ) {
        BSONElement el = it.next();

        // Make sure first element is getLastError : 1
        if ( i == 0 ) {
            StringData elName( el.fieldName() );
            if ( !elName.equalCaseInsensitive( "getLastError" ) ) {
                builder.append( "getLastError", 1 );
            }
        }

        builder.append( el );
    }

    builder.appendTimestamp( "wOpTime", opTime.asDate() );
    // appendOID() is given a non-const OID pointer, hence the cast on the const reference
    builder.appendOID( "wElectionId", const_cast<OID*>(&electionId) );
    return builder.obj();
}
/**
 * Sends a getLastError command carrying 'options' plus the recorded opTime / electionId to
 * every host in 'hostOpTimes', and collects the results.
 *
 * For each response that is received and parsed successfully, a LegacyWCResponse is
 * appended to 'legacyWCResponses' holding the host, the GLE response stripped of non write
 * concern information, and the write concern error message (if any).
 *
 * All pending responses are consumed before returning.  Returns Status::OK() if no
 * dispatch or parse failure occurred (including when 'hostOpTimes' is empty, in which case
 * nothing is sent).  Otherwise returns a Status whose message concatenates all failures;
 * its code is the failure's own code when there is exactly one, and
 * ErrorCodes::MultipleErrorsOccurred when there are several.
 */
Status enforceLegacyWriteConcern( MultiCommandDispatch* dispatcher,
                                  const StringData& dbName,
                                  const BSONObj& options,
                                  const HostOpTimeMap& hostOpTimes,
                                  vector<LegacyWCResponse>* legacyWCResponses ) {

    if ( hostOpTimes.empty() ) {
        return Status::OK();
    }

    for ( HostOpTimeMap::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end();
        ++it ) {

        const ConnectionString& shardEndpoint = it->first;
        // Bind by reference - the map entry outlives this iteration, no copy needed
        const HostOpTime& hot = it->second;
        const OpTime& opTime = hot.opTime;
        const OID& electionId = hot.electionId;

        LOG( 3 ) << "enforcing write concern " << options << " on " << shardEndpoint.toString()
                 << " at opTime " << opTime.toStringPretty() << " with electionID "
                 << electionId;

        BSONObj gleCmd = buildGLECmdWithOpTime( options, opTime, electionId );

        RawBSONSerializable gleCmdSerial( gleCmd );
        dispatcher->addCommand( shardEndpoint, dbName, gleCmdSerial );
    }

    dispatcher->sendAll();

    vector<Status> failedStatuses;

    while ( dispatcher->numPending() > 0 ) {

        ConnectionString shardEndpoint;
        RawBSONSerializable gleResponseSerial;

        Status dispatchStatus = dispatcher->recvAny( &shardEndpoint, &gleResponseSerial );
        if ( !dispatchStatus.isOK() ) {
            // We need to get all responses before returning
            failedStatuses.push_back( dispatchStatus );
            continue;
        }

        BSONObj gleResponse = BatchSafeWriter::stripNonWCInfo( gleResponseSerial.toBSON() );

        // Use the downconversion tools to determine if this GLE response is ok, a
        // write concern error, or an unknown error we should immediately abort for.
        BatchSafeWriter::GLEErrors errors;
        Status extractStatus = BatchSafeWriter::extractGLEErrors( gleResponse, &errors );
        if ( !extractStatus.isOK() ) {
            failedStatuses.push_back( extractStatus );
            continue;
        }

        LegacyWCResponse wcResponse;
        wcResponse.shardHost = shardEndpoint.toString();
        wcResponse.gleResponse = gleResponse;
        if ( errors.wcError.get() ) {
            wcResponse.errToReport = errors.wcError->getErrMessage();
        }

        legacyWCResponses->push_back( wcResponse );
    }

    if ( failedStatuses.empty() ) {
        return Status::OK();
    }

    // Fold every failure into a single message
    StringBuilder builder;
    builder << "could not enforce write concern";

    for ( vector<Status>::const_iterator it = failedStatuses.begin();
        it != failedStatuses.end(); ++it ) {
        const Status& failedStatus = *it;
        if ( it == failedStatuses.begin() ) {
            builder << causedBy( failedStatus.toString() );
        }
        else {
            builder << ":: and ::" << failedStatus.toString();
        }
    }

    return Status( failedStatuses.size() == 1u ? failedStatuses.front().code() :
                                                 ErrorCodes::MultipleErrorsOccurred,
                   builder.str() );
}
}