diff options
Diffstat (limited to 'src/mongo/s/write_ops/batch_downconvert.cpp')
-rw-r--r-- | src/mongo/s/write_ops/batch_downconvert.cpp | 448 |
1 files changed, 214 insertions, 234 deletions
diff --git a/src/mongo/s/write_ops/batch_downconvert.cpp b/src/mongo/s/write_ops/batch_downconvert.cpp index 3bcafcf31a1..f786606d9b6 100644 --- a/src/mongo/s/write_ops/batch_downconvert.cpp +++ b/src/mongo/s/write_ops/batch_downconvert.cpp @@ -40,274 +40,254 @@ namespace mongo { - using std::endl; - using std::string; - using std::vector; - - Status extractGLEErrors( const BSONObj& gleResponse, GLEErrors* errors ) { - - // DRAGONS - // Parsing GLE responses is incredibly finicky. - // The order of testing here is extremely important. - - /////////////////////////////////////////////////////////////////////// - // IMPORTANT! - // Also update extractGLEErrors in batch_api.js for any changes made here. - - const bool isOK = gleResponse["ok"].trueValue(); - const string err = gleResponse["err"].str(); - const string errMsg = gleResponse["errmsg"].str(); - const string wNote = gleResponse["wnote"].str(); - const string jNote = gleResponse["jnote"].str(); - const int code = gleResponse["code"].numberInt(); - const bool timeout = gleResponse["wtimeout"].trueValue(); - - if ( err == "norepl" || err == "noreplset" ) { - // Know this is legacy gle and the repl not enforced - write concern error in 2.4 - errors->wcError.reset( new WCErrorDetail ); - errors->wcError->setErrCode( ErrorCodes::WriteConcernFailed ); - if ( !errMsg.empty() ) { - errors->wcError->setErrMessage( errMsg ); - } - else if ( !wNote.empty() ) { - errors->wcError->setErrMessage( wNote ); - } - else { - errors->wcError->setErrMessage( err ); - } - } - else if ( timeout ) { - // Know there was no write error - errors->wcError.reset( new WCErrorDetail ); - errors->wcError->setErrCode( ErrorCodes::WriteConcernFailed ); - if ( !errMsg.empty() ) { - errors->wcError->setErrMessage( errMsg ); - } - else { - errors->wcError->setErrMessage( err ); - } - errors->wcError->setErrInfo( BSON( "wtimeout" << true ) ); - } - else if ( code == 10990 /* no longer primary */ - || code == 16805 /* replicatedToNum no longer primary */ - || code == 14830 /* gle wmode changed / invalid */ - // 2.6 Error codes - || code == ErrorCodes::NotMaster - || code == ErrorCodes::UnknownReplWriteConcern - || code == ErrorCodes::WriteConcernFailed ) { - // Write concern errors that get returned as regular errors (result may not be ok: 1.0) - errors->wcError.reset( new WCErrorDetail ); - errors->wcError->setErrCode( code ); - errors->wcError->setErrMessage( errMsg ); +using std::endl; +using std::string; +using std::vector; + +Status extractGLEErrors(const BSONObj& gleResponse, GLEErrors* errors) { + // DRAGONS + // Parsing GLE responses is incredibly finicky. + // The order of testing here is extremely important. + + /////////////////////////////////////////////////////////////////////// + // IMPORTANT! + // Also update extractGLEErrors in batch_api.js for any changes made here. + + const bool isOK = gleResponse["ok"].trueValue(); + const string err = gleResponse["err"].str(); + const string errMsg = gleResponse["errmsg"].str(); + const string wNote = gleResponse["wnote"].str(); + const string jNote = gleResponse["jnote"].str(); + const int code = gleResponse["code"].numberInt(); + const bool timeout = gleResponse["wtimeout"].trueValue(); + + if (err == "norepl" || err == "noreplset") { + // Know this is legacy gle and the repl not enforced - write concern error in 2.4 + errors->wcError.reset(new WCErrorDetail); + errors->wcError->setErrCode(ErrorCodes::WriteConcernFailed); + if (!errMsg.empty()) { + errors->wcError->setErrMessage(errMsg); + } else if (!wNote.empty()) { + errors->wcError->setErrMessage(wNote); + } else { + errors->wcError->setErrMessage(err); } - else if ( !isOK ) { - - // - // !!! SOME GLE ERROR OCCURRED, UNKNOWN WRITE RESULT !!! - // - - return Status( DBException::convertExceptionCode( - code ? code : ErrorCodes::UnknownError ), - errMsg ); + } else if (timeout) { + // Know there was no write error + errors->wcError.reset(new WCErrorDetail); + errors->wcError->setErrCode(ErrorCodes::WriteConcernFailed); + if (!errMsg.empty()) { + errors->wcError->setErrMessage(errMsg); + } else { + errors->wcError->setErrMessage(err); } - else if ( !err.empty() ) { - // Write error - errors->writeError.reset( new WriteErrorDetail ); - int writeErrorCode = code == 0 ? ErrorCodes::UnknownError : code; - - // COMPATIBILITY - // Certain clients expect write commands to always report 11000 for duplicate key - // errors, while legacy GLE can return additional codes. - if ( writeErrorCode == 11001 /* dup key in update */ - || writeErrorCode == 12582 /* dup key capped */) { - writeErrorCode = ErrorCodes::DuplicateKey; - } - - errors->writeError->setErrCode( writeErrorCode ); - errors->writeError->setErrMessage( err ); - } - else if ( !jNote.empty() ) { - // Know this is legacy gle and the journaling not enforced - write concern error in 2.4 - errors->wcError.reset( new WCErrorDetail ); - errors->wcError->setErrCode( ErrorCodes::WriteConcernFailed ); - errors->wcError->setErrMessage( jNote ); + errors->wcError->setErrInfo(BSON("wtimeout" << true)); + } else if (code == 10990 /* no longer primary */ + || + code == 16805 /* replicatedToNum no longer primary */ + || + code == 14830 /* gle wmode changed / invalid */ + // 2.6 Error codes + || + code == ErrorCodes::NotMaster || code == ErrorCodes::UnknownReplWriteConcern || + code == ErrorCodes::WriteConcernFailed) { + // Write concern errors that get returned as regular errors (result may not be ok: 1.0) + errors->wcError.reset(new WCErrorDetail); + errors->wcError->setErrCode(code); + errors->wcError->setErrMessage(errMsg); + } else if (!isOK) { + // + // !!! SOME GLE ERROR OCCURRED, UNKNOWN WRITE RESULT !!! + // + + return Status(DBException::convertExceptionCode(code ? code : ErrorCodes::UnknownError), + errMsg); + } else if (!err.empty()) { + // Write error + errors->writeError.reset(new WriteErrorDetail); + int writeErrorCode = code == 0 ? ErrorCodes::UnknownError : code; + + // COMPATIBILITY + // Certain clients expect write commands to always report 11000 for duplicate key + // errors, while legacy GLE can return additional codes. + if (writeErrorCode == 11001 /* dup key in update */ + || + writeErrorCode == 12582 /* dup key capped */) { + writeErrorCode = ErrorCodes::DuplicateKey; } - return Status::OK(); + errors->writeError->setErrCode(writeErrorCode); + errors->writeError->setErrMessage(err); + } else if (!jNote.empty()) { + // Know this is legacy gle and the journaling not enforced - write concern error in 2.4 + errors->wcError.reset(new WCErrorDetail); + errors->wcError->setErrCode(ErrorCodes::WriteConcernFailed); + errors->wcError->setErrMessage(jNote); } - /** - * Suppress the "err" and "code" field if they are coming from a previous write error and - * are not related to write concern. Also removes any write stats information (e.g. "n") - * - * Also, In some cases, 2.4 GLE w/ wOpTime can give us duplicate "err" and "code" fields b/c of - * reporting a previous error. The later field is what we want - dedup and use later field. - * - * Returns the stripped GLE response. - */ - BSONObj stripNonWCInfo( const BSONObj& gleResponse ) { - - BSONObjIterator it( gleResponse ); - BSONObjBuilder builder; - - BSONElement codeField; // eoo - BSONElement errField; // eoo - - while ( it.more() ) { - BSONElement el = it.next(); - StringData fieldName( el.fieldName() ); - if ( fieldName.compare( "err" ) == 0 ) { - errField = el; - } - else if ( fieldName.compare( "code" ) == 0 ) { - codeField = el; - } - else if ( fieldName.compare( "n" ) == 0 || fieldName.compare( "nModified" ) == 0 - || fieldName.compare( "upserted" ) == 0 - || fieldName.compare( "updatedExisting" ) == 0 ) { - // Suppress field - } - else { - builder.append( el ); - } - } + return Status::OK(); +} - if ( !codeField.eoo() ) { - if ( !gleResponse["ok"].trueValue() ) { - // The last code will be from the write concern - builder.append( codeField ); - } - else { - // The code is from a non-wc error on this connection - suppress it - } +/** + * Suppress the "err" and "code" field if they are coming from a previous write error and + * are not related to write concern. Also removes any write stats information (e.g. "n") + * + * Also, In some cases, 2.4 GLE w/ wOpTime can give us duplicate "err" and "code" fields b/c of + * reporting a previous error. The later field is what we want - dedup and use later field. + * + * Returns the stripped GLE response. + */ +BSONObj stripNonWCInfo(const BSONObj& gleResponse) { + BSONObjIterator it(gleResponse); + BSONObjBuilder builder; + + BSONElement codeField; // eoo + BSONElement errField; // eoo + + while (it.more()) { + BSONElement el = it.next(); + StringData fieldName(el.fieldName()); + if (fieldName.compare("err") == 0) { + errField = el; + } else if (fieldName.compare("code") == 0) { + codeField = el; + } else if (fieldName.compare("n") == 0 || fieldName.compare("nModified") == 0 || + fieldName.compare("upserted") == 0 || + fieldName.compare("updatedExisting") == 0) { + // Suppress field + } else { + builder.append(el); } + } - if ( !errField.eoo() ) { - string err = errField.str(); - if ( err == "norepl" || err == "noreplset" || err == "timeout" ) { - // Append err if it's from a write concern issue - builder.append( errField ); - } - else { - // Suppress non-write concern err as null, but we need to report null err if ok - if ( gleResponse["ok"].trueValue() ) - builder.appendNull( errField.fieldName() ); - } + if (!codeField.eoo()) { + if (!gleResponse["ok"].trueValue()) { + // The last code will be from the write concern + builder.append(codeField); + } else { + // The code is from a non-wc error on this connection - suppress it } - - return builder.obj(); } - // Adds a wOpTime and a wElectionId field to a set of gle options - static BSONObj buildGLECmdWithOpTime( const BSONObj& gleOptions, - const Timestamp& opTime, - const OID& electionId ) { - BSONObjBuilder builder; - BSONObjIterator it( gleOptions ); - - for ( int i = 0; it.more(); ++i ) { - BSONElement el = it.next(); - - // Make sure first element is getLastError : 1 - if ( i == 0 ) { - StringData elName( el.fieldName() ); - if ( !elName.equalCaseInsensitive( "getLastError" ) ) { - builder.append( "getLastError", 1 ); - } - } - - builder.append( el ); + if (!errField.eoo()) { + string err = errField.str(); + if (err == "norepl" || err == "noreplset" || err == "timeout") { + // Append err if it's from a write concern issue + builder.append(errField); + } else { + // Suppress non-write concern err as null, but we need to report null err if ok + if (gleResponse["ok"].trueValue()) + builder.appendNull(errField.fieldName()); } - builder.append( "wOpTime", opTime ); - builder.appendOID( "wElectionId", const_cast<OID*>(&electionId) ); - return builder.obj(); } - Status enforceLegacyWriteConcern( MultiCommandDispatch* dispatcher, - StringData dbName, - const BSONObj& options, - const HostOpTimeMap& hostOpTimes, - vector<LegacyWCResponse>* legacyWCResponses ) { + return builder.obj(); +} - if ( hostOpTimes.empty() ) { - return Status::OK(); +// Adds a wOpTime and a wElectionId field to a set of gle options +static BSONObj buildGLECmdWithOpTime(const BSONObj& gleOptions, + const Timestamp& opTime, + const OID& electionId) { + BSONObjBuilder builder; + BSONObjIterator it(gleOptions); + + for (int i = 0; it.more(); ++i) { + BSONElement el = it.next(); + + // Make sure first element is getLastError : 1 + if (i == 0) { + StringData elName(el.fieldName()); + if (!elName.equalCaseInsensitive("getLastError")) { + builder.append("getLastError", 1); + } } - for ( HostOpTimeMap::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end(); - ++it ) { - - const ConnectionString& shardEndpoint = it->first; - const HostOpTime hot = it->second; - const Timestamp& opTime = hot.opTime; - const OID& electionId = hot.electionId; - - LOG( 3 ) << "enforcing write concern " << options << " on " << shardEndpoint.toString() - << " at opTime " << opTime.toStringPretty() << " with electionID " - << electionId; + builder.append(el); + } + builder.append("wOpTime", opTime); + builder.appendOID("wElectionId", const_cast<OID*>(&electionId)); + return builder.obj(); +} - BSONObj gleCmd = buildGLECmdWithOpTime( options, opTime, electionId ); +Status enforceLegacyWriteConcern(MultiCommandDispatch* dispatcher, + StringData dbName, + const BSONObj& options, + const HostOpTimeMap& hostOpTimes, + vector<LegacyWCResponse>* legacyWCResponses) { + if (hostOpTimes.empty()) { + return Status::OK(); + } - RawBSONSerializable gleCmdSerial( gleCmd ); - dispatcher->addCommand( shardEndpoint, dbName, gleCmdSerial ); - } + for (HostOpTimeMap::const_iterator it = hostOpTimes.begin(); it != hostOpTimes.end(); ++it) { + const ConnectionString& shardEndpoint = it->first; + const HostOpTime hot = it->second; + const Timestamp& opTime = hot.opTime; + const OID& electionId = hot.electionId; - dispatcher->sendAll(); + LOG(3) << "enforcing write concern " << options << " on " << shardEndpoint.toString() + << " at opTime " << opTime.toStringPretty() << " with electionID " << electionId; - vector<Status> failedStatuses; + BSONObj gleCmd = buildGLECmdWithOpTime(options, opTime, electionId); - while ( dispatcher->numPending() > 0 ) { + RawBSONSerializable gleCmdSerial(gleCmd); + dispatcher->addCommand(shardEndpoint, dbName, gleCmdSerial); + } - ConnectionString shardEndpoint; - RawBSONSerializable gleResponseSerial; + dispatcher->sendAll(); - Status dispatchStatus = dispatcher->recvAny( &shardEndpoint, &gleResponseSerial ); - if ( !dispatchStatus.isOK() ) { - // We need to get all responses before returning - failedStatuses.push_back( dispatchStatus ); - continue; - } + vector<Status> failedStatuses; - BSONObj gleResponse = stripNonWCInfo( gleResponseSerial.toBSON() ); + while (dispatcher->numPending() > 0) { + ConnectionString shardEndpoint; + RawBSONSerializable gleResponseSerial; - // Use the downconversion tools to determine if this GLE response is ok, a - // write concern error, or an unknown error we should immediately abort for. - GLEErrors errors; - Status extractStatus = extractGLEErrors( gleResponse, &errors ); - if ( !extractStatus.isOK() ) { - failedStatuses.push_back( extractStatus ); - continue; - } + Status dispatchStatus = dispatcher->recvAny(&shardEndpoint, &gleResponseSerial); + if (!dispatchStatus.isOK()) { + // We need to get all responses before returning + failedStatuses.push_back(dispatchStatus); + continue; + } - LegacyWCResponse wcResponse; - wcResponse.shardHost = shardEndpoint.toString(); - wcResponse.gleResponse = gleResponse; - if ( errors.wcError.get() ) { - wcResponse.errToReport = errors.wcError->getErrMessage(); - } + BSONObj gleResponse = stripNonWCInfo(gleResponseSerial.toBSON()); - legacyWCResponses->push_back( wcResponse ); + // Use the downconversion tools to determine if this GLE response is ok, a + // write concern error, or an unknown error we should immediately abort for. + GLEErrors errors; + Status extractStatus = extractGLEErrors(gleResponse, &errors); + if (!extractStatus.isOK()) { + failedStatuses.push_back(extractStatus); + continue; } - if ( failedStatuses.empty() ) { - return Status::OK(); + LegacyWCResponse wcResponse; + wcResponse.shardHost = shardEndpoint.toString(); + wcResponse.gleResponse = gleResponse; + if (errors.wcError.get()) { + wcResponse.errToReport = errors.wcError->getErrMessage(); } - StringBuilder builder; - builder << "could not enforce write concern"; + legacyWCResponses->push_back(wcResponse); + } - for ( vector<Status>::const_iterator it = failedStatuses.begin(); - it != failedStatuses.end(); ++it ) { - const Status& failedStatus = *it; - if ( it == failedStatuses.begin() ) { - builder << causedBy( failedStatus.toString() ); - } - else { - builder << ":: and ::" << failedStatus.toString(); - } - } + if (failedStatuses.empty()) { + return Status::OK(); + } + + StringBuilder builder; + builder << "could not enforce write concern"; - return Status( failedStatuses.size() == 1u ? failedStatuses.front().code() : - ErrorCodes::MultipleErrorsOccurred, - builder.str() ); + for (vector<Status>::const_iterator it = failedStatuses.begin(); it != failedStatuses.end(); + ++it) { + const Status& failedStatus = *it; + if (it == failedStatuses.begin()) { + builder << causedBy(failedStatus.toString()); + } else { + builder << ":: and ::" << failedStatus.toString(); + } } + + return Status(failedStatuses.size() == 1u ? failedStatuses.front().code() + : ErrorCodes::MultipleErrorsOccurred, + builder.str()); +} } |