path: root/src/mongo/s/write_ops
diff options
Diffstat (limited to 'src/mongo/s/write_ops')
5 files changed, 573 insertions, 2 deletions
diff --git a/src/mongo/s/write_ops/batched_delete_request.cpp b/src/mongo/s/write_ops/batched_delete_request.cpp
index 8e21c6608b1..776d3c7bf33 100644
--- a/src/mongo/s/write_ops/batched_delete_request.cpp
+++ b/src/mongo/s/write_ops/batched_delete_request.cpp
@@ -154,7 +154,7 @@ namespace mongo {
++it) {
auto_ptr<BatchedDeleteDocument> tempBatchDeleteDocument(new BatchedDeleteDocument);
- other->addToDeletes(*it);
+ other->addToDeletes(tempBatchDeleteDocument.release());
other->_isDeletesSet = _isDeletesSet;
diff --git a/src/mongo/s/write_ops/batched_update_request.cpp b/src/mongo/s/write_ops/batched_update_request.cpp
index 827f60865a6..29727378a37 100644
--- a/src/mongo/s/write_ops/batched_update_request.cpp
+++ b/src/mongo/s/write_ops/batched_update_request.cpp
@@ -154,7 +154,7 @@ namespace mongo {
++it) {
auto_ptr<BatchedUpdateDocument> tempBatchUpdateDocument(new BatchedUpdateDocument);
- other->addToUpdates(*it);
+ other->addToUpdates(tempBatchUpdateDocument.release());
other->_isUpdatesSet = _isUpdatesSet;
diff --git a/src/mongo/s/write_ops/config_coordinator.cpp b/src/mongo/s/write_ops/config_coordinator.cpp
new file mode 100644
index 00000000000..db7af09691c
--- /dev/null
+++ b/src/mongo/s/write_ops/config_coordinator.cpp
@@ -0,0 +1,461 @@
+ * Copyright (C) 2013 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/s/write_ops/config_coordinator.h"
+#include "mongo/base/owned_pointer_vector.h"
+#include "mongo/db/lasterror.h"
+#include "mongo/db/namespace_string.h"
+#include "mongo/s/write_ops/batched_command_request.h"
+#include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/util/net/message.h"
+#include "mongo/db/field_parser.h"
+namespace mongo {
+ ConfigCoordinator::ConfigCoordinator( MultiCommandDispatch* dispatcher,
+ const vector<ConnectionString>& configHosts ) :
+ _dispatcher( dispatcher ), _configHosts( configHosts ) {
+ }
+ namespace {
+ //
+ // Types to handle the reachability fsync checks
+ //
+ /**
+ * A BSON serializable object representing an fsync command request
+ */
+ class FsyncRequest : public BSONSerializable {
+ public:
+ FsyncRequest() {
+ }
+ bool isValid( std::string* errMsg ) const {
+ return true;
+ }
+ /** Returns the BSON representation of the entry. */
+ BSONObj toBSON() const {
+ return BSON( "fsync" << true );
+ }
+ bool parseBSON( const BSONObj& source, std::string* errMsg ) {
+ // Not implemented
+ dassert( false );
+ return false;
+ }
+ void clear() {
+ // Not implemented
+ dassert( false );
+ }
+ string toString() const {
+ return toBSON().toString();
+ }
+ };
+ /**
+ * A BSON serializable object representing an fsync command response
+ */
+ class FsyncResponse : public BSONSerializable {
+ public:
+ static const BSONField<int> ok;
+ static const BSONField<int> errCode;
+ static const BSONField<string> errMessage;
+ FsyncResponse() {
+ clear();
+ }
+ bool isValid( std::string* errMsg ) const {
+ return _isOkSet;
+ }
+ BSONObj toBSON() const {
+ // Not implemented
+ dassert( false );
+ return BSONObj();
+ }
+ bool parseBSON( const BSONObj& source, std::string* errMsg ) {
+ FieldParser::FieldState result;
+ result = FieldParser::extractNumber( source, ok, &_ok, errMsg );
+ if ( result == FieldParser::FIELD_INVALID )
+ return false;
+ _isOkSet = result != FieldParser::FIELD_NONE;
+ result = FieldParser::extract( source, errCode, &_errCode, errMsg );
+ if ( result == FieldParser::FIELD_INVALID )
+ return false;
+ _isErrCodeSet = result != FieldParser::FIELD_NONE;
+ result = FieldParser::extract( source, errMessage, &_errMessage, errMsg );
+ if ( result == FieldParser::FIELD_INVALID )
+ return false;
+ _isErrMessageSet = result != FieldParser::FIELD_NONE;
+ return true;
+ }
+ void clear() {
+ _ok = false;
+ _isOkSet = false;
+ _errCode = 0;
+ _isErrCodeSet = false;
+ _errMessage = "";
+ _isErrMessageSet = false;
+ }
+ string toString() const {
+ return toBSON().toString();
+ }
+ int getOk() {
+ dassert( _isOkSet );
+ return _ok;
+ }
+ void setOk( int ok ) {
+ _ok = ok;
+ _isOkSet = true;
+ }
+ int getErrCode() {
+ if ( _isErrCodeSet ) {
+ return _errCode;
+ }
+ else {
+ return errCode.getDefault();
+ }
+ }
+ void setErrCode( int errCode ) {
+ _errCode = errCode;
+ _isErrCodeSet = true;
+ }
+ const string& getErrMessage() {
+ dassert( _isErrMessageSet );
+ return _errMessage;
+ }
+ void setErrMessage( const StringData& errMsg ) {
+ _errMessage = errMsg.toString();
+ _isErrMessageSet = true;
+ }
+ private:
+ int _ok;
+ bool _isOkSet;
+ int _errCode;
+ bool _isErrCodeSet;
+ string _errMessage;
+ bool _isErrMessageSet;
+ };
+ const BSONField<int> FsyncResponse::ok( "ok" );
+ const BSONField<int> FsyncResponse::errCode( "code" );
+ const BSONField<string> FsyncResponse::errMessage( "errmsg" );
+ //
+ // Types to associate responses with particular config servers
+ //
+ struct ConfigResponse {
+ ConfigResponse( const ConnectionString& configHost ) :
+ configHost( configHost ) {
+ }
+ const ConnectionString configHost;
+ BatchedCommandResponse response;
+ };
+ struct ConfigFsyncResponse {
+ ConfigFsyncResponse( const ConnectionString& configHost ) :
+ configHost( configHost ) {
+ }
+ const ConnectionString configHost;
+ FsyncResponse response;
+ };
+ }
+ //
+ // Error processing helpers
+ //
+ static void buildErrorFrom( const Status& status, BatchedCommandResponse* response ) {
+ response->setOk( false );
+ response->setN( 0 );
+ response->setErrCode( static_cast<int>( status.code() ) );
+ response->setErrMessage( status.reason() );
+ dassert( response->isValid( NULL ) );
+ }
+ static void buildFsyncErrorFrom( const Status& status, FsyncResponse* response ) {
+ response->setOk( false );
+ response->setErrCode( static_cast<int>( status.code() ) );
+ response->setErrMessage( status.reason() );
+ }
+ static bool areResponsesEqual( const BatchedCommandResponse& responseA,
+ const BatchedCommandResponse& responseB ) {
+ // TODO: Better reporting of why not equal
+ if ( responseA.getOk() != responseB.getOk() )
+ return false;
+ if ( responseA.getN() != responseB.getN() )
+ return false;
+ if ( responseA.isSingleUpsertedSet() != responseB.isSingleUpsertedSet() )
+ return false;
+ if ( responseA.isUpsertDetailsSet() != responseB.isUpsertDetailsSet() )
+ return false;
+ if ( responseA.isSingleUpsertedSet() ) {
+ BSONObj upsertA = responseA.getSingleUpserted();
+ BSONObj upsertB = responseB.getSingleUpserted();
+ if ( upsertA.woCompare( upsertB ) != 0 )
+ return false;
+ }
+ if ( responseA.isUpsertDetailsSet() ) {
+ // TODO:
+ }
+ if ( responseA.getOk() )
+ return true;
+ // TODO: Compare errors here
+ return true;
+ }
+ static bool areAllResponsesEqual( const vector<ConfigResponse*>& responses ) {
+ BatchedCommandResponse* lastResponse = NULL;
+ for ( vector<ConfigResponse*>::const_iterator it = responses.begin(); it != responses.end();
+ ++it ) {
+ BatchedCommandResponse* response = &( *it )->response;
+ if ( lastResponse != NULL ) {
+ if ( !areResponsesEqual( *lastResponse, *response ) ) {
+ return false;
+ }
+ }
+ lastResponse = response;
+ }
+ return true;
+ }
+ static void combineResponses( const vector<ConfigResponse*>& responses,
+ BatchedCommandResponse* clientResponse ) {
+ if ( areAllResponsesEqual( responses ) ) {
+ responses.front()->response.cloneTo( clientResponse );
+ return;
+ }
+ clientResponse->setOk( false );
+ clientResponse->setN( 0 );
+ clientResponse->setErrCode( ErrorCodes::ManualInterventionRequired );
+ BSONObjBuilder errInfoB;
+ for ( vector<ConfigResponse*>::const_iterator it = responses.begin(); it != responses.end();
+ ++it ) {
+ ConfigResponse* response = *it;
+ errInfoB.append( response->configHost.toString(), response->response.toBSON() );
+ }
+ clientResponse->setErrInfo( errInfoB.obj() );
+ clientResponse->setErrMessage( "config write was not consistent, "
+ "manual intervention may be required" );
+ }
+ static void combineFsyncErrors( const vector<ConfigFsyncResponse*>& responses,
+ BatchedCommandResponse* clientResponse ) {
+ clientResponse->setOk( false );
+ clientResponse->setN( 0 );
+ clientResponse->setErrCode( ErrorCodes::RemoteValidationError );
+ BSONObjBuilder errInfoB;
+ for ( vector<ConfigFsyncResponse*>::const_iterator it = responses.begin();
+ it != responses.end(); ++it ) {
+ ConfigFsyncResponse* fsyncResponse = *it;
+ if ( fsyncResponse->response.getOk() )
+ continue;
+ errInfoB.append( fsyncResponse->configHost.toString(),
+ fsyncResponse->response.toBSON() );
+ }
+ clientResponse->setErrInfo( errInfoB.obj() );
+ clientResponse->setErrMessage( "could not verify config servers were "
+ "active and reachable before write" );
+ }
+ /**
+ * The core config write functionality.
+ *
+ * Config writes run in two passes - the first is a quick check to ensure the config servers
+ * are all reachable, the second runs the actual write.
+ *
+ * TODO: Upgrade and move this logic to the config servers, a state machine implementation
+ * is probably the next step.
+ */
+ void ConfigCoordinator::executeBatch( const BatchedCommandRequest& clientRequest,
+ BatchedCommandResponse* clientResponse,
+ bool fsyncCheck ) {
+ NamespaceString nss( clientRequest.getNS() );
+ dassert( nss.db() == "config" || nss.db() == "admin" );
+ dassert( clientRequest.sizeWriteOps() == 1u );
+ dassert( !clientRequest.isWriteConcernSet() );
+ if ( fsyncCheck ) {
+ //
+ // Sanity check that all configs are still reachable using fsync, preserving legacy
+ // behavior
+ //
+ OwnedPointerVector<ConfigFsyncResponse> fsyncResponsesOwned;
+ vector<ConfigFsyncResponse*>& fsyncResponses = fsyncResponsesOwned.mutableVector();
+ //
+ // Send side
+ //
+ for ( vector<ConnectionString>::iterator it = _configHosts.begin();
+ it != _configHosts.end(); ++it ) {
+ ConnectionString& configHost = *it;
+ FsyncRequest fsyncRequest;
+ _dispatcher->addCommand( configHost, "admin", fsyncRequest );
+ }
+ _dispatcher->sendAll();
+ //
+ // Recv side
+ //
+ bool fsyncError = false;
+ while ( _dispatcher->numPending() > 0 ) {
+ ConnectionString configHost;
+ fsyncResponses.push_back( new ConfigFsyncResponse( configHost ) );
+ ConfigFsyncResponse& fsyncResponse = *fsyncResponses.back();
+ Status dispatchStatus = _dispatcher->recvAny( &configHost,
+ &fsyncResponse.response );
+ // We've got to recv everything, no matter what
+ if ( !dispatchStatus.isOK() ) {
+ fsyncError = true;
+ buildFsyncErrorFrom( dispatchStatus, &fsyncResponse.response );
+ }
+ else if ( !fsyncResponse.response.getOk() ) {
+ fsyncError = true;
+ }
+ }
+ if ( fsyncError ) {
+ combineFsyncErrors( fsyncResponses, clientResponse );
+ return;
+ }
+ else {
+ fsyncResponsesOwned.clear();
+ }
+ }
+ //
+ // Do the actual writes
+ //
+ BatchedCommandRequest configRequest( clientRequest.getBatchType() );
+ clientRequest.cloneTo( &configRequest );
+ configRequest.setNS( nss.coll() );
+ OwnedPointerVector<ConfigResponse> responsesOwned;
+ vector<ConfigResponse*>& responses = responsesOwned.mutableVector();
+ //
+ // Send the actual config writes
+ //
+ // Get as many batches as we can at once
+ for ( vector<ConnectionString>::iterator it = _configHosts.begin();
+ it != _configHosts.end(); ++it ) {
+ ConnectionString& configHost = *it;
+ _dispatcher->addCommand( configHost, nss.db(), configRequest );
+ }
+ // Send them all out
+ _dispatcher->sendAll();
+ //
+ // Recv side
+ //
+ while ( _dispatcher->numPending() > 0 ) {
+ // Get the response
+ ConnectionString configHost;
+ responses.push_back( new ConfigResponse( configHost ) );
+ ConfigResponse& configResponse = *responses.back();
+ Status dispatchStatus = _dispatcher->recvAny( &configHost, &configResponse.response );
+ if ( !dispatchStatus.isOK() ) {
+ buildErrorFrom( dispatchStatus, &configResponse.response );
+ }
+ }
+ combineResponses( responses, clientResponse );
+ }
diff --git a/src/mongo/s/write_ops/config_coordinator.h b/src/mongo/s/write_ops/config_coordinator.h
new file mode 100644
index 00000000000..08d6f349006
--- /dev/null
+++ b/src/mongo/s/write_ops/config_coordinator.h
@@ -0,0 +1,58 @@
+ * Copyright (C) 2013 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#pragma once
+#include <vector>
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/s/multi_command_dispatch.h"
+#include "mongo/s/write_ops/batched_command_request.h"
+#include "mongo/s/write_ops/batched_command_response.h"
+namespace mongo {
+ class ConfigCoordinator {
+ public:
+ ConfigCoordinator( MultiCommandDispatch* dispatcher,
+ const std::vector<ConnectionString>& configHosts );
+ void executeBatch( const BatchedCommandRequest& request,
+ BatchedCommandResponse* response,
+ bool fsyncCheck );
+ private:
+ // Not owned here
+ MultiCommandDispatch* _dispatcher;
+ std::vector<ConnectionString> _configHosts;
+ };
diff --git a/src/mongo/s/write_ops/config_coordinator_test.cpp b/src/mongo/s/write_ops/config_coordinator_test.cpp
new file mode 100644
index 00000000000..f8120bfedaa
--- /dev/null
+++ b/src/mongo/s/write_ops/config_coordinator_test.cpp
@@ -0,0 +1,52 @@
+ * Copyright (C) 2013 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+#include "mongo/s/write_ops/config_coordinator.h"
+#include <vector>
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/s/mock_multi_command.h"
+#include "mongo/unittest/unittest.h"
+namespace {
+ using namespace mongo;
+ using std::vector;
+ //
+ // Tests for the ConfigCoordinator
+ //
+ TEST(ConfigCoordinatorTests, Basic) {
+ MockMultiCommand dispatcher;
+ vector<ConnectionString> configHosts;
+ ConfigCoordinator exec( &dispatcher, configHosts );
+ }
+} // unnamed namespace