/**
* Copyright (C) 2013 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/s/write_ops/config_coordinator.h"
#include "mongo/base/owned_pointer_vector.h"
#include "mongo/db/lasterror.h"
#include "mongo/db/namespace_string.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/util/net/message.h"
#include "mongo/db/field_parser.h"
namespace mongo {
using std::string;
using std::vector;
/**
 * Builds a coordinator that sends its commands through 'dispatcher' to each of the
 * config servers listed in 'configHosts'.
 *
 * NOTE(review): ownership of 'dispatcher' is not established anywhere in this file -
 * confirm against the class declaration.
 */
ConfigCoordinator::ConfigCoordinator( MultiCommandDispatch* dispatcher,
                                      const vector<ConnectionString>& configHosts ) :
    _dispatcher( dispatcher ), _configHosts( configHosts ) {
}
namespace {
//
// Types to handle the reachability fsync checks
//
/**
 * Serializable request for the "fsync" command.
 *
 * Only the outgoing direction (toBSON) is supported; parseBSON() and clear() are
 * deliberately left unimplemented and trip a debug assertion if called.
 */
class FsyncRequest : public BSONSerializable {
    MONGO_DISALLOW_COPYING(FsyncRequest);
public:

    FsyncRequest() {
    }

    /** The request carries no state, so it is always reported as valid. */
    bool isValid( std::string* errMsg ) const {
        return true;
    }

    /** Builds the command document { fsync: true }. */
    BSONObj toBSON() const {
        BSONObjBuilder cmdBuilder;
        cmdBuilder.append( "fsync", true );
        return cmdBuilder.obj();
    }

    /** Not implemented: asserts in debug builds and reports failure. */
    bool parseBSON( const BSONObj& source, std::string* errMsg ) {
        dassert( false );
        return false;
    }

    /** Not implemented: asserts in debug builds. */
    void clear() {
        dassert( false );
    }

    /** Renders the command document as a string. */
    string toString() const {
        const BSONObj cmdObj = toBSON();
        return cmdObj.toString();
    }
};
/**
 * A BSON serializable object representing an fsync command response.
 *
 * Three optional fields are tracked - "ok", "code" and "errmsg" - each paired with a
 * flag recording whether the field has been set explicitly or found while parsing.
 */
class FsyncResponse : public BSONSerializable {
    MONGO_DISALLOW_COPYING(FsyncResponse);
public:

    // Field descriptors (names and value types) used for parsing and serialization.
    static const BSONField<int> ok;
    static const BSONField<int> errCode;
    static const BSONField<string> errMessage;

    FsyncResponse() {
        clear();
    }

    /** A response is valid once its "ok" field is present. 'errMsg' is not written. */
    bool isValid( std::string* errMsg ) const {
        return _isOkSet;
    }

    /** Serializes only the fields that are currently set. */
    BSONObj toBSON() const {
        BSONObjBuilder builder;

        if ( _isOkSet ) builder << ok( _ok );
        if ( _isErrCodeSet ) builder << errCode( _errCode );
        if ( _isErrMessageSet ) builder << errMessage( _errMessage );

        return builder.obj();
    }

    /**
     * Resets this object and then populates it from 'source'.
     *
     * Returns false as soon as a field is present but malformed (the parser reports
     * the reason through 'errMsg'); absent fields simply stay unset.
     */
    bool parseBSON( const BSONObj& source, std::string* errMsg ) {
        // Start from a clean slate so that a failed or partial parse never leaves
        // values from a previous use of this object behind.
        clear();

        FieldParser::FieldState result;

        // "ok" is extracted as a number so that any numeric BSON type is accepted.
        result = FieldParser::extractNumber( source, ok, &_ok, errMsg );
        if ( result == FieldParser::FIELD_INVALID )
            return false;
        _isOkSet = result != FieldParser::FIELD_NONE;

        result = FieldParser::extract( source, errCode, &_errCode, errMsg );
        if ( result == FieldParser::FIELD_INVALID )
            return false;
        _isErrCodeSet = result != FieldParser::FIELD_NONE;

        result = FieldParser::extract( source, errMessage, &_errMessage, errMsg );
        if ( result == FieldParser::FIELD_INVALID )
            return false;
        _isErrMessageSet = result != FieldParser::FIELD_NONE;

        return true;
    }

    /** Returns every field to its unset state. */
    void clear() {
        _ok = 0;
        _isOkSet = false;

        _errCode = 0;
        _isErrCodeSet = false;

        _errMessage = "";
        _isErrMessageSet = false;
    }

    string toString() const {
        return toBSON().toString();
    }

    /** Returns the "ok" value; asserts in debug builds that it has been set. */
    int getOk() const {
        dassert( _isOkSet );
        return _ok;
    }

    void setOk( int ok ) {
        _ok = ok;
        _isOkSet = true;
    }

    /** Returns the error code, or the field's default when no code has been set. */
    int getErrCode() const {
        if ( _isErrCodeSet ) {
            return _errCode;
        }
        else {
            return errCode.getDefault();
        }
    }

    void setErrCode( int errCode ) {
        _errCode = errCode;
        _isErrCodeSet = true;
    }

    /** Returns the error message; asserts in debug builds that it has been set. */
    const string& getErrMessage() const {
        dassert( _isErrMessageSet );
        return _errMessage;
    }

    void setErrMessage( StringData errMsg ) {
        _errMessage = errMsg.toString();
        _isErrMessageSet = true;
    }

private:

    int _ok;
    bool _isOkSet;

    int _errCode;
    bool _isErrCodeSet;

    string _errMessage;
    bool _isErrMessageSet;
};

const BSONField<int> FsyncResponse::ok( "ok" );
const BSONField<int> FsyncResponse::errCode( "code" );
const BSONField<string> FsyncResponse::errMessage( "errmsg" );
/**
 * Serializable request for the initializing form of the setShardVersion command.
 *
 * Only the outgoing direction (toBSON) is supported; parseBSON() and clear() are
 * deliberately left unimplemented and trip a debug assertion if called.
 */
class SSVRequest: public BSONSerializable {
    MONGO_DISALLOW_COPYING(SSVRequest);
public:

    /** 'configDBString' is the value sent in the command's "configdb" field. */
    SSVRequest(const std::string& configDBString): _configDBString(configDBString) {
    }

    /** There is nothing to validate, so a request is always reported as valid. */
    bool isValid(std::string* errMsg) const {
        return true;
    }

    /**
     * Builds the command document, in this field order:
     * { setShardVersion: "", configdb: <config string>, init: true, authoritative: true }
     */
    BSONObj toBSON() const {
        BSONObjBuilder cmd;

        // The namespace is left empty for an init request.
        cmd.append("setShardVersion", "");
        cmd.append("configdb", _configDBString);
        cmd.append("init", true);
        cmd.append("authoritative", true);

        return cmd.obj();
    }

    /** Not implemented: asserts in debug builds and reports failure. */
    bool parseBSON(const BSONObj& source, std::string* errMsg) {
        dassert( false );
        return false;
    }

    /** Not implemented: asserts in debug builds. */
    void clear() {
        dassert( false );
    }

    /** Renders the command document as a string. */
    string toString() const {
        const BSONObj cmdObj = toBSON();
        return cmdObj.toString();
    }

private:
    std::string _configDBString;
};
/**
 * A BSON serializable object representing a setShardVersion command response.
 *
 * Three optional fields are tracked - "ok", "code" and "errmsg" - each paired with a
 * flag recording whether the field has been set explicitly or found while parsing.
 */
class SSVResponse: public BSONSerializable {
    MONGO_DISALLOW_COPYING(SSVResponse);
public:

    // Field descriptors (names and value types) used for parsing and serialization.
    static const BSONField<int> ok;
    static const BSONField<int> errCode;
    static const BSONField<string> errMessage;

    SSVResponse() {
        clear();
    }

    /** A response is valid once its "ok" field is present. 'errMsg' is not written. */
    bool isValid( std::string* errMsg ) const {
        return _isOkSet;
    }

    /** Serializes only the fields that are currently set. */
    BSONObj toBSON() const {
        BSONObjBuilder builder;

        if (_isOkSet) builder << ok(_ok);
        if (_isErrCodeSet) builder << errCode(_errCode);
        if (_isErrMessageSet) builder << errMessage(_errMessage);

        return builder.obj();
    }

    /**
     * Resets this object and then populates it from 'source'.
     *
     * Returns false as soon as a field is present but malformed (the parser reports
     * the reason through 'errMsg'); absent fields simply stay unset.
     */
    bool parseBSON(const BSONObj& source, std::string* errMsg) {
        // Start from a clean slate so that a failed or partial parse never leaves
        // values from a previous use of this object behind.
        clear();

        FieldParser::FieldState result;

        // "ok" is extracted as a number so that any numeric BSON type is accepted.
        result = FieldParser::extractNumber(source, ok, &_ok, errMsg);
        if (result == FieldParser::FIELD_INVALID)
            return false;
        _isOkSet = result != FieldParser::FIELD_NONE;

        result = FieldParser::extract(source, errCode, &_errCode, errMsg);
        if (result == FieldParser::FIELD_INVALID)
            return false;
        _isErrCodeSet = result != FieldParser::FIELD_NONE;

        result = FieldParser::extract(source, errMessage, &_errMessage, errMsg);
        if (result == FieldParser::FIELD_INVALID)
            return false;
        _isErrMessageSet = result != FieldParser::FIELD_NONE;

        return true;
    }

    /** Returns every field to its unset state. */
    void clear() {
        _ok = 0;
        _isOkSet = false;

        _errCode = 0;
        _isErrCodeSet = false;

        _errMessage = "";
        _isErrMessageSet = false;
    }

    string toString() const {
        return toBSON().toString();
    }

    /** Returns the "ok" value; asserts in debug builds that it has been set. */
    int getOk() const {
        dassert( _isOkSet );
        return _ok;
    }

    void setOk(int ok) {
        _ok = ok;
        _isOkSet = true;
    }

    /** Returns the error code, or the field's default when no code has been set. */
    int getErrCode() const {
        if (_isErrCodeSet) {
            return _errCode;
        }
        else {
            return errCode.getDefault();
        }
    }

    void setErrCode(int errCode) {
        _errCode = errCode;
        _isErrCodeSet = true;
    }

    /** True when an error code was parsed or set explicitly. */
    bool isErrCodeSet() const {
        return _isErrCodeSet;
    }

    /** Returns the error message; asserts in debug builds that it has been set. */
    const string& getErrMessage() const {
        dassert( _isErrMessageSet );
        return _errMessage;
    }

    void setErrMessage(StringData errMsg) {
        _errMessage = errMsg.toString();
        _isErrMessageSet = true;
    }

private:

    int _ok;
    bool _isOkSet;

    int _errCode;
    bool _isErrCodeSet;

    string _errMessage;
    bool _isErrMessageSet;
};

const BSONField<int> SSVResponse::ok("ok");
const BSONField<int> SSVResponse::errCode("code");
const BSONField<string> SSVResponse::errMessage("errmsg");
//
// Types to associate responses with particular config servers
//
// Pairs a batched write response with the config host it is associated with; both
// members are filled in by the dispatcher when a response is received.
struct ConfigResponse {
ConnectionString configHost;
BatchedCommandResponse response;
};
// Pairs an fsync response with the config host it is associated with; both members
// are filled in by the dispatcher when a response is received.
struct ConfigFsyncResponse {
ConnectionString configHost;
FsyncResponse response;
};
}
//
// Error processing helpers
//
/**
 * Turns 'response' into a failure response carrying the code and reason of 'status'.
 * In debug builds, asserts that the resulting response is valid.
 */
static void buildErrorFrom( const Status& status, BatchedCommandResponse* response ) {
    response->setOk( false );
    response->setErrCode( static_cast<int>( status.code() ) );
    response->setErrMessage( status.reason() );

    dassert( response->isValid( NULL ) );
}
/**
 * Turns 'response' into a failure response carrying the code and reason of 'status'.
 */
static void buildFsyncErrorFrom( const Status& status, FsyncResponse* response ) {
    response->setOk( false );
    response->setErrCode( static_cast<int>( status.code() ) );
    response->setErrMessage( status.reason() );
}
/**
 * Returns true when the two responses agree on both their "ok" value and their "n"
 * count. Nothing else is compared yet (see the TODOs below).
 *
 * Note: This needs to also take into account comparing responses from legacy writes
 * and write commands.
 */
static bool areResponsesEqual( const BatchedCommandResponse& responseA,
                               const BatchedCommandResponse& responseB ) {

    // TODO: Better reporting of why not equal
    const bool sameStatus = ( responseA.getOk() == responseB.getOk() );
    if ( !sameStatus ) {
        return false;
    }

    const bool sameCount = ( responseA.getN() == responseB.getN() );
    if ( !sameCount ) {
        return false;
    }

    // TODO: Compare upsert details when responseA.isUpsertDetailsSet()
    // TODO: Compare errors here when the responses are not ok
    return true;
}
static bool areAllResponsesEqual( const vector& responses ) {
BatchedCommandResponse* lastResponse = NULL;
for ( vector::const_iterator it = responses.begin(); it != responses.end();
++it ) {
BatchedCommandResponse* response = &( *it )->response;
if ( lastResponse != NULL ) {
if ( !areResponsesEqual( *lastResponse, *response ) ) {
return false;
}
}
lastResponse = response;
}
return true;
}
static void combineResponses( const vector& responses,
BatchedCommandResponse* clientResponse ) {
if ( areAllResponsesEqual( responses ) ) {
responses.front()->response.cloneTo( clientResponse );
return;
}
BSONObjBuilder builder;
for ( vector::const_iterator it = responses.begin(); it != responses.end();
++it ) {
builder.append( ( *it )->configHost.toString(), ( *it )->response.toBSON() );
}
clientResponse->setOk( false );
clientResponse->setErrCode( ErrorCodes::ManualInterventionRequired );
clientResponse->setErrMessage( "config write was not consistent, "
"manual intervention may be required. "
"config responses: " + builder.obj().toString() );
}
static void combineFsyncErrors( const vector& responses,
BatchedCommandResponse* clientResponse ) {
clientResponse->setOk( false );
clientResponse->setErrCode( ErrorCodes::RemoteValidationError );
clientResponse->setErrMessage( "could not verify config servers were "
"active and reachable before write" );
}
/**
 * Sends an initializing setShardVersion command, carrying the comma-joined list of
 * config hosts, to every config host and waits for all of the replies.
 *
 * Returns true when every host replied ok. On failure, returns false and records only
 * the first failure seen (dispatch error or non-ok reply) in 'clientResponse'.
 */
bool ConfigCoordinator::_checkConfigString(BatchedCommandResponse* clientResponse) {

    //
    // Send side
    //

    // Join the hosts as "host1,host2,...". Written so that an empty host list yields an
    // empty string instead of dereferencing begin() of an empty vector.
    string configStr;
    for (vector<ConnectionString>::const_iterator it = _configHosts.begin();
         it != _configHosts.end(); ++it) {
        if (it != _configHosts.begin()) {
            configStr.append(",");
        }
        configStr.append(it->toString());
    }

    // The request is identical for every host, so build it once.
    SSVRequest ssvRequest(configStr);
    for (vector<ConnectionString>::const_iterator it = _configHosts.begin();
         it != _configHosts.end(); ++it) {
        _dispatcher->addCommand(*it, "admin", ssvRequest);
    }

    _dispatcher->sendAll();

    //
    // Recv side
    //

    bool ssvError = false;
    while (_dispatcher->numPending() > 0) {
        ConnectionString configHost;
        SSVResponse response;

        // We've got to recv everything, no matter what - even if some failed.
        Status dispatchStatus = _dispatcher->recvAny(&configHost, &response);

        if (ssvError) {
            // record only the first failure.
            continue;
        }

        if (!dispatchStatus.isOK()) {
            ssvError = true;
            clientResponse->setOk(false);
            clientResponse->setErrCode(static_cast<int>(dispatchStatus.code()));
            clientResponse->setErrMessage(dispatchStatus.reason());
        }
        else if (!response.getOk()) {
            ssvError = true;
            clientResponse->setOk(false);
            clientResponse->setErrMessage(response.getErrMessage());

            // The remote error code is optional; forward it only when present.
            if (response.isErrCodeSet()) {
                clientResponse->setErrCode(response.getErrCode());
            }
        }
    }

    return !ssvError;
}
/**
* The core config write functionality.
*
* Config writes run in two passes - the first is a quick check to ensure the config servers
* are all reachable, the second runs the actual write.
*
* TODO: Upgrade and move this logic to the config servers, a state machine implementation
* is probably the next step.
*/
void ConfigCoordinator::executeBatch( const BatchedCommandRequest& clientRequest,
BatchedCommandResponse* clientResponse,
bool fsyncCheck ) {
NamespaceString nss( clientRequest.getNS() );
dassert( nss.db() == "config" || nss.db() == "admin" );
dassert( clientRequest.sizeWriteOps() == 1u );
if ( fsyncCheck ) {
//
// Sanity check that all configs are still reachable using fsync, preserving legacy
// behavior
//
OwnedPointerVector fsyncResponsesOwned;
vector& fsyncResponses = fsyncResponsesOwned.mutableVector();
//
// Send side
//
for ( vector::iterator it = _configHosts.begin();
it != _configHosts.end(); ++it ) {
ConnectionString& configHost = *it;
FsyncRequest fsyncRequest;
_dispatcher->addCommand( configHost, "admin", fsyncRequest );
}
_dispatcher->sendAll();
//
// Recv side
//
bool fsyncError = false;
while ( _dispatcher->numPending() > 0 ) {
fsyncResponses.push_back( new ConfigFsyncResponse() );
ConfigFsyncResponse& fsyncResponse = *fsyncResponses.back();
Status dispatchStatus = _dispatcher->recvAny( &fsyncResponse.configHost,
&fsyncResponse.response );
// We've got to recv everything, no matter what
if ( !dispatchStatus.isOK() ) {
fsyncError = true;
buildFsyncErrorFrom( dispatchStatus, &fsyncResponse.response );
}
else if ( !fsyncResponse.response.getOk() ) {
fsyncError = true;
}
}
if ( fsyncError ) {
combineFsyncErrors( fsyncResponses, clientResponse );
return;
}
else {
fsyncResponsesOwned.clear();
}
}
if (!_checkConfigString(clientResponse)) {
return;
}
//
// Do the actual writes
//
BatchedCommandRequest configRequest( clientRequest.getBatchType() );
clientRequest.cloneTo( &configRequest );
configRequest.setNS( nss.coll() );
OwnedPointerVector responsesOwned;
vector& responses = responsesOwned.mutableVector();
//
// Send the actual config writes
//
// Get as many batches as we can at once
for ( vector::iterator it = _configHosts.begin();
it != _configHosts.end(); ++it ) {
ConnectionString& configHost = *it;
_dispatcher->addCommand( configHost, nss.db(), configRequest );
}
// Send them all out
_dispatcher->sendAll();
//
// Recv side
//
while ( _dispatcher->numPending() > 0 ) {
// Get the response
responses.push_back( new ConfigResponse() );
ConfigResponse& configResponse = *responses.back();
Status dispatchStatus = _dispatcher->recvAny( &configResponse.configHost,
&configResponse.response );
if ( !dispatchStatus.isOK() ) {
buildErrorFrom( dispatchStatus, &configResponse.response );
}
}
combineResponses( responses, clientResponse );
}
}