diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/write_concern.h | 23 | ||||
-rw-r--r-- | src/mongo/db/write_concern_options.cpp | 39 | ||||
-rw-r--r-- | src/mongo/db/write_concern_options.h | 65 | ||||
-rw-r--r-- | src/mongo/dbtests/config_server_fixture.cpp | 3 | ||||
-rw-r--r-- | src/mongo/dbtests/config_server_fixture.h | 28 | ||||
-rw-r--r-- | src/mongo/s/balance.cpp | 47 | ||||
-rw-r--r-- | src/mongo/s/balance.h | 4 | ||||
-rw-r--r-- | src/mongo/s/chunk.cpp | 66 | ||||
-rw-r--r-- | src/mongo/s/cluster_client_internal.cpp | 19 | ||||
-rw-r--r-- | src/mongo/s/cluster_write.cpp | 100 | ||||
-rw-r--r-- | src/mongo/s/cluster_write.h | 65 | ||||
-rw-r--r-- | src/mongo/s/commands_admin.cpp | 62 | ||||
-rw-r--r-- | src/mongo/s/config.cpp | 181 | ||||
-rw-r--r-- | src/mongo/s/config.h | 2 | ||||
-rw-r--r-- | src/mongo/s/config_upgrade_helpers.cpp | 85 | ||||
-rw-r--r-- | src/mongo/s/config_upgrade_v0_to_v5.cpp | 22 | ||||
-rw-r--r-- | src/mongo/s/grid.cpp | 22 | ||||
-rw-r--r-- | src/mongo/s/write_ops/batch_downconvert.cpp | 12 | ||||
-rw-r--r-- | src/mongo/s/write_ops/config_coordinator.cpp | 5 |
20 files changed, 579 insertions, 272 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index a2c62e4c22d..91e36615208 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -14,6 +14,7 @@ env.Library( 'field_ref.cpp', 'field_ref_set.cpp', 'field_parser.cpp', + 'write_concern_options.cpp' ], LIBDEPS=[ '$BUILD_DIR/mongo/bson', diff --git a/src/mongo/db/write_concern.h b/src/mongo/db/write_concern.h index ca26cd99b82..6253bfd5107 100644 --- a/src/mongo/db/write_concern.h +++ b/src/mongo/db/write_concern.h @@ -28,28 +28,9 @@ #pragma once -namespace mongo { - - struct WriteConcernOptions { - - WriteConcernOptions() { reset(); } +#include "mongo/db/write_concern_options.h" - Status parse( const BSONObj& obj ); - - void reset() { - syncMode = NONE; - wNumNodes = 0; - wMode = ""; - wTimeout = 0; - } - - enum SyncMode { NONE, FSYNC, JOURNAL } syncMode; - - int wNumNodes; - string wMode; - - int wTimeout; - }; +namespace mongo { struct WriteConcernResult { WriteConcernResult() { diff --git a/src/mongo/db/write_concern_options.cpp b/src/mongo/db/write_concern_options.cpp new file mode 100644 index 00000000000..8597e55d28f --- /dev/null +++ b/src/mongo/db/write_concern_options.cpp @@ -0,0 +1,39 @@ +/* Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/write_concern_options.h" + +#include "mongo/client/dbclientinterface.h" + +namespace mongo { + + const BSONObj WriteConcernOptions::Default = BSONObj(); + const BSONObj WriteConcernOptions::Acknowledged(BSON("w" << W_NORMAL)); + const BSONObj WriteConcernOptions::AllConfigs = BSONObj(); + const BSONObj WriteConcernOptions::Unacknowledged(BSON("w" << W_NONE)); + +} diff --git a/src/mongo/db/write_concern_options.h b/src/mongo/db/write_concern_options.h new file mode 100644 index 00000000000..fd2b7544639 --- /dev/null +++ b/src/mongo/db/write_concern_options.h @@ -0,0 +1,65 @@ +/* Copyright (C) 2014 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include <string> + +#include "mongo/base/status.h" +#include "mongo/db/jsobj.h" + +namespace mongo { + + struct WriteConcernOptions { + public: + + static const BSONObj Default; + static const BSONObj Acknowledged; + static const BSONObj AllConfigs; + static const BSONObj Unacknowledged; + + WriteConcernOptions() { reset(); } + + Status parse( const BSONObj& obj ); + + void reset() { + syncMode = NONE; + wNumNodes = 0; + wMode = ""; + wTimeout = 0; + } + + enum SyncMode { NONE, FSYNC, JOURNAL } syncMode; + + int wNumNodes; + std::string wMode; + + int wTimeout; + }; + +} + diff --git a/src/mongo/dbtests/config_server_fixture.cpp b/src/mongo/dbtests/config_server_fixture.cpp index 3671189f1cb..c88d3be892f 100644 --- a/src/mongo/dbtests/config_server_fixture.cpp +++ b/src/mongo/dbtests/config_server_fixture.cpp @@ -31,6 +31,7 @@ #include <list> #include "mongo/client/distlock.h" +#include "mongo/s/config.h" #include "mongo/s/type_changelog.h" #include "mongo/s/type_chunk.h" #include "mongo/s/type_collection.h" @@ -61,6 +62,8 @@ namespace mongo { client().ensureIndex( ChunkType::ConfigNS, // br BSON( ChunkType::ns() << 1 << // br ChunkType::DEPRECATED_lastmod() << 1 ) ); + + configServer.init(configSvr().toString()); } void ConfigServerFixture::clearServer() { diff --git a/src/mongo/dbtests/config_server_fixture.h b/src/mongo/dbtests/config_server_fixture.h index 69f91ead8d9..9aa33e612eb 100644 --- a/src/mongo/dbtests/config_server_fixture.h +++ b/src/mongo/dbtests/config_server_fixture.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/db/instance.h" +#include "mongo/db/wire_version.h" #include "mongo/client/dbclientinterface.h" #include "mongo/unittest/unittest.h" #include "mongo/util/assert_util.h" @@ -37,9 +38,36 @@ namespace mongo { class CustomDirectClient: public DBDirectClient { public: + CustomDirectClient() { + setWireVersions(minWireVersion, maxWireVersion); + } + virtual ConnectionString::ConnectionType type() const { return ConnectionString::CUSTOM; } + + virtual bool recv( Message& m ) { + // This is tailored to act as a dummy response for write commands. + + BufBuilder bb; + bb.skip(sizeof(QueryResult)); + + BSONObj cmdResult(BSON("ok" << 1)); + + bb.appendBuf(cmdResult.objdata(), cmdResult.objsize()); + + QueryResult* qr = reinterpret_cast<QueryResult*>(bb.buf()); + bb.decouple(); + qr->setResultFlagsToOk(); + qr->len = bb.len(); + qr->setOperation(opReply); + qr->cursorId = 0; + qr->startingFrom = 0; + qr->nReturned = 1; + m.setData(qr, true); + + return true; + } }; class CustomConnectHook: public ConnectionString::ConnectionHook { diff --git a/src/mongo/s/balance.cpp b/src/mongo/s/balance.cpp index 1cd4598070b..6cf7fda7d8a 100644 --- a/src/mongo/s/balance.cpp +++ b/src/mongo/s/balance.cpp @@ -33,7 +33,9 @@ #include "mongo/client/dbclientcursor.h" #include "mongo/client/distlock.h" #include "mongo/db/jsobj.h" +#include "mongo/db/write_concern.h" #include "mongo/s/chunk.h" +#include "mongo/s/cluster_write.h" #include "mongo/s/config.h" #include "mongo/s/config_server_checker_service.h" #include "mongo/s/grid.h" @@ -135,19 +137,17 @@ namespace mongo { return movedCount; } - void Balancer::_ping( DBClientBase& conn, bool waiting ) { - WriteConcern w = conn.getWriteConcern(); - conn.setWriteConcern( W_NONE ); - - conn.update( MongosType::ConfigNS , - BSON( MongosType::name(_myid) ) , - BSON( "$set" << BSON( MongosType::ping(jsTime()) << - MongosType::up((int)(time(0)-_started)) << - MongosType::waiting(waiting) << - MongosType::mongoVersion(versionString) ) ) , - true ); - - conn.setWriteConcern( w); + void Balancer::_ping( bool waiting ) { + clusterUpdate( MongosType::ConfigNS, + BSON( MongosType::name( _myid )), + BSON( "$set" << BSON( MongosType::ping(jsTime()) << + MongosType::up(static_cast<int>(time(0)-_started)) << + MongosType::waiting(waiting) << + MongosType::mongoVersion(versionString) )), + true, // upsert + false, // multi + WriteConcernOptions::Unacknowledged, + NULL ); } bool Balancer::_checkOIDs() { @@ -297,9 +297,16 @@ namespace mongo { DistributionStatus status( shardInfo, shardToChunksMap ); // load tags - conn.ensureIndex(TagsType::ConfigNS, - BSON(TagsType::ns() << 1 << TagsType::min() << 1), - true); + Status result = clusterCreateIndex(TagsType::ConfigNS, + BSON(TagsType::ns() << 1 << TagsType::min() << 1), + true, // unique + WriteConcernOptions::AllConfigs, + NULL); + + if ( !result.isOK() ) { + warning() << "could not create index tags_1_min_1: " << result.reason() << endl; + continue; + } cursor = conn.query(TagsType::ConfigNS, QUERY(TagsType::ns(ns)).sort(TagsType::min())); @@ -432,7 +439,7 @@ namespace mongo { ScopedDbConnection conn(config.toString(), 30); // ping has to be first so we keep things in the config server in sync - _ping( conn.conn() ); + _ping(); // use fresh shard state Shard::reloadShardInfo(); @@ -446,7 +453,7 @@ namespace mongo { LOG(1) << "skipping balancing round because balancing is disabled" << endl; // Ping again so scripts can determine if we're active without waiting - _ping( conn.conn(), true ); + _ping( true ); conn.done(); @@ -465,7 +472,7 @@ namespace mongo { LOG(1) << "skipping balancing round because another balancer is active" << endl; // Ping again so scripts can determine if we're active without waiting - _ping( conn.conn(), true ); + _ping( true ); conn.done(); @@ -511,7 +518,7 @@ namespace mongo { } // Ping again so scripts can determine if we're active without waiting - _ping( conn.conn(), true ); + _ping( true ); conn.done(); diff --git a/src/mongo/s/balance.h b/src/mongo/s/balance.h index 3db2b84889c..998d4ec6263 100644 --- a/src/mongo/s/balance.h +++ b/src/mongo/s/balance.h @@ -106,10 +106,8 @@ namespace mongo { /** * Marks this balancer as being live on the config server(s). - * - * @param conn is the connection with the config server(s) */ - void _ping( DBClientBase& conn, bool waiting = false ); + void _ping( bool waiting = false ); /** * @return true if all the servers listed in configdb as being shards are reachable and are distinct processes diff --git a/src/mongo/s/chunk.cpp b/src/mongo/s/chunk.cpp index 6145e55bf3c..aa772f5227d 100644 --- a/src/mongo/s/chunk.cpp +++ b/src/mongo/s/chunk.cpp @@ -36,10 +36,12 @@ #include "mongo/client/dbclientcursor.h" #include "mongo/db/query/lite_parsed_query.h" #include "mongo/db/queryutil.h" +#include "mongo/db/write_concern.h" #include "mongo/platform/random.h" #include "mongo/s/chunk_diff.h" #include "mongo/s/chunk_version.h" #include "mongo/s/client_info.h" +#include "mongo/s/cluster_write.h" #include "mongo/s/config.h" #include "mongo/s/config_server_checker_service.h" #include "mongo/s/cursors.h" @@ -556,16 +558,17 @@ namespace mongo { // at least this mongos won't try and keep moving _jumbo = true; - try { - ScopedDbConnection conn(configServer.modelServer(), 30); + Status result = clusterUpdate( ChunkType::ConfigNS, + BSON(ChunkType::name(genID())), + BSON("$set" << BSON(ChunkType::jumbo(true))), + false, // upsert + false, // multi + WriteConcernOptions::AllConfigs, + NULL ); - conn->update(ChunkType::ConfigNS, - BSON(ChunkType::name(genID())), - BSON("$set" << BSON(ChunkType::jumbo(true)))); - conn.done(); - } - catch ( DBException& e ) { - warning() << "couldn't set jumbo for chunk: " << genID() << causedBy( e ) << endl; + if ( !result.isOK() ) { + warning() << "couldn't set jumbo for chunk: " + << genID() << result.reason() << endl; } } @@ -1035,6 +1038,7 @@ namespace mongo { uassert( 13449 , str::stream() << "collection " << _ns << " already sharded with " << existingChunks << " chunks", existingChunks == 0 ); + conn.done(); for ( unsigned i=0; i<=splitPoints.size(); i++ ) { BSONObj min = i == 0 ? _key.globalMin() : splitPoints[i-1]; @@ -1046,23 +1050,23 @@ namespace mongo { temp.serialize( chunkBuilder ); BSONObj chunkObj = chunkBuilder.obj(); - conn->update(ChunkType::ConfigNS, - QUERY(ChunkType::name(temp.genID())), - chunkObj, - true, - false ); + Status result = clusterUpdate( ChunkType::ConfigNS, + BSON(ChunkType::name(temp.genID())), + chunkObj, + true, // upsert + false, // multi + WriteConcernOptions::AllConfigs, + NULL ); version.incMinor(); - } - string errmsg = conn->getLastError(); - if ( errmsg.size() ) { - string ss = str::stream() << "creating first chunks failed. result: " << errmsg; - error() << ss << endl; - msgasserted( 15903 , ss ); + if ( !result.isOK() ) { + string ss = str::stream() << "creating first chunks failed. result: " + << result.reason(); + error() << ss << endl; + msgasserted( 15903 , ss ); + } } - - conn.done(); _version = ChunkVersion( 0, version.epoch() ); } @@ -1273,16 +1277,18 @@ namespace mongo { LOG(1) << "ChunkManager::drop : " << _ns << "\t removed shard data" << endl; // remove chunk data - ScopedDbConnection conn(configServer.modelServer()); - conn->remove(ChunkType::ConfigNS, BSON(ChunkType::ns(_ns))); + Status result = clusterDelete( ChunkType::ConfigNS, + BSON(ChunkType::ns(_ns)), + 0 /* limit */, + WriteConcernOptions::AllConfigs, + NULL ); // Make sure we're dropped on the config - string error = conn->getLastError(); - uassert( 17001, str::stream() << "could not drop chunks for " << _ns - << causedBy( error ), - error.size() == 0 ); - - conn.done(); + if ( !result.isOK() ) { + uasserted( 17001, str::stream() << "could not drop chunks for " << _ns + << ": " << result.reason() ); + } + LOG(1) << "ChunkManager::drop : " << _ns << "\t removed chunk data" << endl; for ( set<Shard>::iterator i=seen.begin(); i!=seen.end(); i++ ) { diff --git a/src/mongo/s/cluster_client_internal.cpp b/src/mongo/s/cluster_client_internal.cpp index 9f90a19353b..a6fcd3a0b2f 100644 --- a/src/mongo/s/cluster_client_internal.cpp +++ b/src/mongo/s/cluster_client_internal.cpp @@ -33,6 +33,8 @@ #include "mongo/client/connpool.h" #include "mongo/db/field_parser.h" +#include "mongo/db/write_concern.h" +#include "mongo/s/cluster_write.h" #include "mongo/s/type_changelog.h" #include "mongo/s/type_mongos.h" #include "mongo/s/type_shard.h" @@ -369,9 +371,7 @@ namespace mongo { createdCapped = true; } - - conn->insert(ChangelogType::ConfigNS, changelog.toBSON()); - _checkGLE(conn); + connPtr->done(); } catch (const DBException& e) { // if we got here, it means the config change is only in the log, @@ -380,8 +380,17 @@ namespace mongo { return e.toStatus(); } - connPtr->done(); - return Status::OK(); + Status result = clusterInsert( ChangelogType::ConfigNS, + changelog.toBSON(), + WriteConcernOptions::AllConfigs, + NULL ); + + if ( !result.isOK() ) { + return Status( result.code(), str::stream() << "failed to write to changelog: " + << result.reason() ); + } + + return result; } // Helper function for safe writes to non-SCC config servers diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp index 1ec4def8d14..35d1ec87244 100644 --- a/src/mongo/s/cluster_write.cpp +++ b/src/mongo/s/cluster_write.cpp @@ -148,22 +148,47 @@ namespace mongo { if ( configHostOrHosts.type() == ConnectionString::MASTER ) { configHosts.push_back( configHostOrHosts ); } - else { - dassert( configHostOrHosts.type() == ConnectionString::SYNC ); + else if ( configHostOrHosts.type() == ConnectionString::SYNC ) { vector<HostAndPort> configHPs = configHostOrHosts.getServers(); for ( vector<HostAndPort>::iterator it = configHPs.begin(); it != configHPs.end(); ++it ) { configHosts.push_back( ConnectionString( *it ) ); } } + else { + // This is only for tests. + dassert( configHostOrHosts.type() == ConnectionString::CUSTOM ); + configHosts.push_back( configHostOrHosts ); + } return configHosts; } - void clusterInsert( const string& ns, - const BSONObj& doc, - const BSONObj& writeConcern, - BatchedCommandResponse* response ) { + static Status getStatus( const BatchedCommandResponse& response ) { + if ( response.getOk() != 1 ) { + return Status( static_cast<ErrorCodes::Error>(response.getErrCode()), + response.getErrMessage() ); + } + + if ( response.isErrDetailsSet() ) { + const WriteErrorDetail* errDetail = response.getErrDetails().front(); + return Status( static_cast<ErrorCodes::Error>(errDetail->getErrCode()), + errDetail->getErrMessage() ); + } + + if ( response.isWriteConcernErrorSet() ) { + const WCErrorDetail* errDetail = response.getWriteConcernError(); + return Status( static_cast<ErrorCodes::Error>(errDetail->getErrCode()), + errDetail->getErrMessage() ); + } + + return Status::OK(); + } + + Status clusterInsert( const string& ns, + const BSONObj& doc, + const BSONObj& writeConcern, + BatchedCommandResponse* response ) { auto_ptr<BatchedInsertRequest> insert( new BatchedInsertRequest() ); insert->addToDocuments( doc ); @@ -173,16 +198,23 @@ namespace mongo { request.setWriteConcern( writeConcern ); } + BatchedCommandResponse dummyResponse; + + if ( response == NULL ) { + response = &dummyResponse; + } + clusterWrite( request, response, false ); + return getStatus( *response ); } - void clusterUpdate( const string& ns, - const BSONObj& query, - const BSONObj& update, - bool upsert, - bool multi, - const BSONObj& writeConcern, - BatchedCommandResponse* response ) { + Status clusterUpdate( const string& ns, + const BSONObj& query, + const BSONObj& update, + bool upsert, + bool multi, + const BSONObj& writeConcern, + BatchedCommandResponse* response ) { auto_ptr<BatchedUpdateDocument> updateDoc( new BatchedUpdateDocument() ); updateDoc->setQuery( query ); updateDoc->setUpdateExpr( update ); @@ -199,14 +231,21 @@ namespace mongo { BatchedCommandRequest request( updateRequest.release() ); request.setNS( ns ); + BatchedCommandResponse dummyResponse; + + if ( response == NULL ) { + response = &dummyResponse; + } + clusterWrite( request, response, false ); + return getStatus( *response ); } - void clusterDelete( const string& ns, - const BSONObj& query, - int limit, - const BSONObj& writeConcern, - BatchedCommandResponse* response ) { + Status clusterDelete( const string& ns, + const BSONObj& query, + int limit, + const BSONObj& writeConcern, + BatchedCommandResponse* response ) { auto_ptr<BatchedDeleteDocument> deleteDoc( new BatchedDeleteDocument ); deleteDoc->setQuery( query ); deleteDoc->setLimit( limit ); @@ -221,18 +260,25 @@ namespace mongo { BatchedCommandRequest request( deleteRequest.release() ); request.setNS( ns ); + BatchedCommandResponse dummyResponse; + + if ( response == NULL ) { + response = &dummyResponse; + } + clusterWrite( request, response, false ); + return getStatus( *response ); } - void clusterCreateIndex( const string& ns, - BSONObj keys, - bool unique, - const BSONObj& writeConcern, - BatchedCommandResponse* response) { - clusterInsert( NamespaceString( ns ).getSystemIndexesCollection(), - createIndexDoc( ns, keys, unique ), - writeConcern, - response ); + Status clusterCreateIndex( const string& ns, + BSONObj keys, + bool unique, + const BSONObj& writeConcern, + BatchedCommandResponse* response ) { + return clusterInsert( NamespaceString( ns ).getSystemIndexesCollection(), + createIndexDoc( ns, keys, unique ), + writeConcern, + response ); } void clusterWrite( const BatchedCommandRequest& request, diff --git a/src/mongo/s/cluster_write.h b/src/mongo/s/cluster_write.h index 18628eee7d0..9bb15bcdc5c 100644 --- a/src/mongo/s/cluster_write.h +++ b/src/mongo/s/cluster_write.h @@ -78,35 +78,48 @@ namespace mongo { scoped_ptr<BatchWriteExecStats> _shardStats; }; - const BSONObj DefaultClusterWriteConcern = BSONObj(); - + /** + * Note: response can NEVER be NULL. + */ void clusterWrite( const BatchedCommandRequest& request, BatchedCommandResponse* response, bool autoSplit ); - void clusterInsert( const std::string& ns, - const BSONObj& doc, - const BSONObj& writeConcern, - BatchedCommandResponse* response ); - - void clusterUpdate( const std::string& ns, - const BSONObj& query, - const BSONObj& update, - bool upsert, - bool multi, - const BSONObj& writeConcern, - BatchedCommandResponse* response ); - - void clusterDelete( const std::string& ns, - const BSONObj& query, - int limit, - const BSONObj& writeConcern, - BatchedCommandResponse* response ); - - void clusterCreateIndex( const std::string& ns, - BSONObj keys, - bool unique, - const BSONObj& writeConcern, - BatchedCommandResponse* response ); + /** + * Note: response can be NULL if you don't care about the write statistics. + */ + Status clusterInsert( const std::string& ns, + const BSONObj& doc, + const BSONObj& writeConcern, + BatchedCommandResponse* response ); + + /** + * Note: response can be NULL if you don't care about the write statistics. + */ + Status clusterUpdate( const std::string& ns, + const BSONObj& query, + const BSONObj& update, + bool upsert, + bool multi, + const BSONObj& writeConcern, + BatchedCommandResponse* response ); + + /** + * Note: response can be NULL if you don't care about the write statistics. + */ + Status clusterDelete( const std::string& ns, + const BSONObj& query, + int limit, + const BSONObj& writeConcern, + BatchedCommandResponse* response ); + + /** + * Note: response can be NULL if you don't care about the write statistics. + */ + Status clusterCreateIndex( const std::string& ns, + BSONObj keys, + bool unique, + const BSONObj& writeConcern, + BatchedCommandResponse* response ); } // namespace mongo diff --git a/src/mongo/s/commands_admin.cpp b/src/mongo/s/commands_admin.cpp index a11dfc72d8b..b7e0d427f6f 100644 --- a/src/mongo/s/commands_admin.cpp +++ b/src/mongo/s/commands_admin.cpp @@ -46,8 +46,10 @@ #include "mongo/db/query/lite_parsed_query.h" #include "mongo/db/stats/counters.h" #include "mongo/db/wire_version.h" +#include "mongo/db/write_concern.h" #include "mongo/s/chunk.h" #include "mongo/s/client_info.h" +#include "mongo/s/cluster_write.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/strategy.h" @@ -665,13 +667,15 @@ namespace mongo { // receiving shard whenever a migrate occurs. else { // call ensureIndex with cache=false, see SERVER-1691 - bool ensureSuccess = conn->ensureIndex( ns , - proposedKey , - careAboutUnique , - "" , - false ); - if ( ! ensureSuccess ) { - errmsg = "ensureIndex failed to create index on primary shard"; + Status result = clusterCreateIndex( ns, + proposedKey, + careAboutUnique, + WriteConcernOptions::Default, + NULL ); + + if ( !result.isOK() ) { + errmsg = str::stream() << "ensureIndex failed to create index on " + << "primary shard: " << result.reason(); conn.done(); return false; } @@ -1298,11 +1302,19 @@ namespace mongo { log() << "going to start draining shard: " << s.getName() << endl; BSONObj newStatus = BSON( "$set" << BSON( ShardType::draining(true) ) ); - conn->update( ShardType::ConfigNS , searchDoc , newStatus, false /* do no upsert */); - errmsg = conn->getLastError(); - if ( errmsg.size() ) { - log() << "error starting remove shard: " << s.getName() << " err: " << errmsg << endl; + Status status = clusterUpdate( ShardType::ConfigNS, + searchDoc, + newStatus, + false /* do no upsert */, + false /* multi */, + WriteConcernOptions::AllConfigs, + NULL ); + + if ( !status.isOK() ) { + errmsg = status.reason(); + log() << "error starting remove shard: " << s.getName() + << " err: " << errmsg << endl; return false; } @@ -1311,10 +1323,15 @@ namespace mongo { PRINT(primaryLocalDoc); if (conn->count(DatabaseType::ConfigNS, primaryLocalDoc)) { log() << "This shard is listed as primary of local db. Removing entry." << endl; - conn->remove(DatabaseType::ConfigNS, BSON(DatabaseType::name("local"))); - errmsg = conn->getLastError(); - if ( errmsg.size() ) { - log() << "error removing local db: " << errmsg << endl; + Status status = clusterDelete( DatabaseType::ConfigNS, + BSON(DatabaseType::name("local")), + 0 /* limit */, + WriteConcernOptions::AllConfigs, + NULL ); + + if ( !status.isOK() ) { + log() << "error removing local db: " + << status.reason() << endl; return false; } } @@ -1343,11 +1360,16 @@ namespace mongo { if ( ( chunkCount == 0 ) && ( dbCount == 0 ) ) { log() << "going to remove shard: " << s.getName() << endl; audit::logRemoveShard(ClientBasic::getCurrent(), s.getName()); - conn->remove( ShardType::ConfigNS , searchDoc ); - - errmsg = conn->getLastError(); - if ( errmsg.size() ) { - log() << "error concluding remove shard: " << s.getName() << " err: " << errmsg << endl; + Status status = clusterDelete( ShardType::ConfigNS, + searchDoc, + 0, // limit + WriteConcernOptions::AllConfigs, + NULL ); + + if ( !status.isOK() ) { + errmsg = status.reason(); + log() << "error concluding remove shard: " << s.getName() + << " err: " << errmsg << endl; return false; } diff --git a/src/mongo/s/config.cpp b/src/mongo/s/config.cpp index 1065fae8e6c..c26fd001e40 100644 --- a/src/mongo/s/config.cpp +++ b/src/mongo/s/config.cpp @@ -35,8 +35,10 @@ #include "mongo/client/connpool.h" #include "mongo/client/dbclientcursor.h" #include "mongo/db/pdfile.h" +#include "mongo/db/write_concern.h" #include "mongo/s/chunk.h" #include "mongo/s/chunk_version.h" +#include "mongo/s/cluster_write.h" #include "mongo/s/config.h" #include "mongo/s/grid.h" #include "mongo/s/server.h" @@ -100,7 +102,7 @@ namespace mongo { _key = BSONObj(); } - void DBConfig::CollectionInfo::save( const string& ns , DBClientBase* conn ) { + void DBConfig::CollectionInfo::save( const string& ns ) { BSONObj key = BSON( "_id" << ns ); BSONObjBuilder val; @@ -116,9 +118,18 @@ namespace mongo { val.append(CollectionType::DEPRECATED_lastmodEpoch(), ChunkVersion::DROPPED().epoch()); } - conn->update(CollectionType::ConfigNS, key, val.obj(), true); - string err = conn->getLastError(); - uassert( 13473 , (string)"failed to save collection (" + ns + "): " + err , err.size() == 0 ); + Status result = clusterUpdate(CollectionType::ConfigNS, + key, + val.obj(), + true /* upsert */, + false /* multi */, + WriteConcernOptions::AllConfigs, + NULL); + + if ( !result.isOK() ) { + uasserted( 13473, str::stream() << "failed to save collection (" << ns + << "): " << result.reason() ); + } _dirty = false; } @@ -548,8 +559,6 @@ namespace mongo { } void DBConfig::_save( bool db, bool coll ) { - ScopedDbConnection conn(configServer.modelServer(), 30.0); - if( db ){ BSONObj n; @@ -559,10 +568,19 @@ namespace mongo { n = b.obj(); } - conn->update( DatabaseType::ConfigNS , BSON( DatabaseType::name( _name ) ) , n , true ); - string err = conn->getLastError(); - uassert( 13396 , (string)"DBConfig save failed: " + err , err.size() == 0 ); - + BatchedCommandResponse response; + Status result = clusterUpdate( DatabaseType::ConfigNS, + BSON( DatabaseType::name( _name )), + n, + true, // upsert + false, // multi + WriteConcernOptions::AllConfigs, + &response ); + + if ( !result.isOK() ) { + uasserted( 13396, str::stream() << "DBConfig save failed: " + << response.toBSON() ); + } } if( coll ){ @@ -570,12 +588,10 @@ namespace mongo { for ( Collections::iterator i=_collections.begin(); i!=_collections.end(); ++i ) { if ( ! i->second.isDirty() ) continue; - i->second.save( i->first , conn.get() ); + i->second.save( i->first ); } } - - conn.done(); } bool DBConfig::reload() { @@ -621,17 +637,16 @@ namespace mongo { // 2 grid.removeDB( _name ); - { - ScopedDbConnection conn(configServer.modelServer(), 30.0); - conn->remove( DatabaseType::ConfigNS , BSON( DatabaseType::name( _name ) ) ); - errmsg = conn->getLastError(); - if ( ! errmsg.empty() ) { - log() << "could not drop '" << _name << "': " << errmsg << endl; - conn.done(); - return false; - } - - conn.done(); + Status result = clusterDelete( DatabaseType::ConfigNS, + BSON( DatabaseType::name( _name )), + 0 /* limit */, + WriteConcernOptions::AllConfigs, + NULL ); + + if ( !result.isOK() ) { + errmsg = result.reason(); + log() << "could not drop '" << _name << "': " << errmsg << endl; + return false; } if ( ! configServer.allUp( errmsg ) ) { @@ -1043,33 +1058,70 @@ namespace mongo { log() << "warning: unknown setting [" << name << "]" << endl; } } + } + catch ( DBException& e ) { + warning() << "couldn't load settings on config db: " + << e.what() << endl; + } - if ( ! got.count( "chunksize" ) ) { - conn->insert(SettingsType::ConfigNS, - BSON(SettingsType::key("chunksize") << - SettingsType::chunksize(Chunk::MaxChunkSize / - (1024 * 1024)))); + if ( ! got.count( "chunksize" ) ) { + const int chunkSize = Chunk::MaxChunkSize / (1024 * 1024); + Status result = clusterInsert( SettingsType::ConfigNS, + BSON( SettingsType::key("chunksize") << + SettingsType::chunksize(chunkSize)), + WriteConcernOptions::AllConfigs, + NULL ); + if ( !result.isOK() ) { + warning() << "couldn't set chunkSize on config db: " << result.reason() << endl; } + } - // indexes - conn->ensureIndex(ChunkType::ConfigNS, - BSON(ChunkType::ns() << 1 << ChunkType::min() << 1 ), true); + // indexes + Status result = clusterCreateIndex( ChunkType::ConfigNS, + BSON( ChunkType::ns() << 1 << ChunkType::min() << 1 ), + true, // unique + WriteConcernOptions::AllConfigs, + NULL ); - conn->ensureIndex(ChunkType::ConfigNS, - BSON(ChunkType::ns() << 1 << - ChunkType::shard() << 1 << - ChunkType::min() << 1 ), true); + if ( !result.isOK() ) { + warning() << "couldn't create ns_1_min_1 index on config db: " + << result.reason() << endl; + } - conn->ensureIndex(ChunkType::ConfigNS, - BSON(ChunkType::ns() << 1 << - ChunkType::DEPRECATED_lastmod() << 1 ), true ); + result = clusterCreateIndex( ChunkType::ConfigNS, + BSON( ChunkType::ns() << 1 << + ChunkType::shard() << 1 << + ChunkType::min() << 1 ), + true, // unique + WriteConcernOptions::AllConfigs, + NULL ); + + if ( !result.isOK() ) { + warning() << "couldn't create ns_1_shard_1_min_1 index on config db: " + << result.reason() << endl; + } - conn->ensureIndex(ShardType::ConfigNS, BSON(ShardType::host() << 1), true); + result = clusterCreateIndex( ChunkType::ConfigNS, + BSON( ChunkType::ns() << 1 << + ChunkType::DEPRECATED_lastmod() << 1 ), + true, // unique + WriteConcernOptions::AllConfigs, + NULL ); - conn.done(); + if ( !result.isOK() ) { + warning() << "couldn't create ns_1_lastmod_1 index on config db: " + << result.reason() << endl; } - catch ( DBException& e ) { - warning() << "couldn't load settings or create indexes on config db: " << e.what() << endl; + + result = clusterCreateIndex( ShardType::ConfigNS, + BSON( ShardType::host() << 1 ), + true, // unique + WriteConcernOptions::AllConfigs, + NULL ); + + if ( !result.isOK() ) { + warning() << "couldn't create host_1 index on config db: " + << result.reason() << endl; } } @@ -1126,10 +1178,17 @@ namespace mongo { createdCapped = true; } - conn->insert( ChangelogType::ConfigNS , msg ); - conn.done(); + Status result = clusterInsert( ChangelogType::ConfigNS, + msg, + WriteConcernOptions::AllConfigs, + NULL ); + + if ( !result.isOK() ) { + log() << "Error encountered while logging config change with ID: " << changeID + << result.reason() << endl; + } } catch ( std::exception& e ) { @@ -1139,21 +1198,25 @@ namespace mongo { } void ConfigServer::replicaSetChange( const ReplicaSetMonitor * monitor ) { - try { - Shard s = Shard::lookupRSName(monitor->getName()); - if (s == Shard::EMPTY) { - LOG(1) << "replicaSetChange: shard not found for set: " << monitor->getServerAddress() << endl; - return; - } - ScopedDbConnection conn(configServer.getConnectionString().toString(), 30.0); - conn->update(ShardType::ConfigNS, - BSON(ShardType::name(s.getName())), - BSON("$set" << BSON(ShardType::host(monitor->getServerAddress())))); - conn.done(); + Shard s = Shard::lookupRSName(monitor->getName()); + if (s == Shard::EMPTY) { + LOG(1) << "replicaSetChange: shard not found for set: " << monitor->getServerAddress() << endl; + return; } - catch (DBException& e) { - error() << "RSChangeWatcher: could not update config db for set: " << monitor->getName() - << " to: " << monitor->getServerAddress() << causedBy(e) << endl; + + Status result = clusterUpdate(ShardType::ConfigNS, + BSON(ShardType::name(s.getName())), + BSON("$set" << BSON(ShardType::host(monitor->getServerAddress()))), + false, // upsert + false, // multi + WriteConcernOptions::AllConfigs, + NULL); + + if ( !result.isOK() ) { + error() << "RSChangeWatcher: could not update config db for set: " + << monitor->getName() + << " to: " << monitor->getServerAddress() + << ": " << result.reason() << endl; } } diff --git a/src/mongo/s/config.h b/src/mongo/s/config.h index dbb83328dce..f1c7c27f977 100644 --- a/src/mongo/s/config.h +++ b/src/mongo/s/config.h @@ -83,7 +83,7 @@ namespace mongo { bool isDirty() const { return _dirty; } bool wasDropped() const { return _dropped; } - void save( const string& ns , DBClientBase* conn ); + void save( const string& ns ); bool unique() const { return _unqiue; } BSONObj key() const { return _key; } diff --git a/src/mongo/s/config_upgrade_helpers.cpp b/src/mongo/s/config_upgrade_helpers.cpp index aed6c80cbaf..d9b35cd7185 100644 --- a/src/mongo/s/config_upgrade_helpers.cpp +++ b/src/mongo/s/config_upgrade_helpers.cpp @@ -31,7 +31,9 @@ #include "mongo/client/connpool.h" #include "mongo/db/field_parser.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/write_concern.h" #include "mongo/s/cluster_client_internal.h" +#include "mongo/s/cluster_write.h" #include "mongo/s/type_config_version.h" #include "mongo/util/timer.h" @@ -414,43 +416,47 @@ namespace mongo { setUpgradeIdObj << VersionType::upgradeId(upgradeID); setUpgradeIdObj << VersionType::upgradeState(BSONObj()); - try { - ScopedDbConnection conn(configServer, 30); - conn->update(VersionType::ConfigNS, - BSON("_id" << 1 << VersionType::currentVersion(currentVersion)), - BSON("$set" << setUpgradeIdObj.done())); - _checkGLE(conn); - conn.done(); - } - catch (const DBException& e) { - return e.toStatus("could not initialize version info for upgrade"); - } + Status result = clusterUpdate(VersionType::ConfigNS, + BSON("_id" << 1 << VersionType::currentVersion(currentVersion)), + BSON("$set" << setUpgradeIdObj.done()), + false, // upsert + false, // multi + WriteConcernOptions::AllConfigs, + NULL); - return Status::OK(); + if ( !result.isOK() ) { + return Status( result.code(), + str::stream() << "could not initialize version info" + << "for upgrade: " << result.reason() ); + } + return result; } Status enterConfigUpgradeCriticalSection(const string& configServer, int currentVersion) { BSONObjBuilder setUpgradeStateObj; setUpgradeStateObj.append(VersionType::upgradeState(), BSON(inCriticalSectionField(true))); - try { - ScopedDbConnection conn(configServer, 30); - conn->update(VersionType::ConfigNS, - BSON("_id" << 1 << VersionType::currentVersion(currentVersion)), - BSON("$set" << setUpgradeStateObj.done())); - _checkGLE(conn); - conn.done(); - } - catch (const DBException& e) { + Status result = clusterUpdate(VersionType::ConfigNS, + BSON("_id" << 1 << VersionType::currentVersion(currentVersion)), + BSON("$set" << setUpgradeStateObj.done()), + false, // upsert + false, // multi + WriteConcernOptions::AllConfigs, + NULL); - // No cleanup message here since we're not sure if we wrote or not, and - // not dangerous either way except to prevent further updates (at which point - // the message is printed) - return e.toStatus("could not update version info to enter critical update section"); + log() << "entered critical section for config upgrade" << endl; + + // No cleanup message here since we're not sure if we wrote or not, and + // not dangerous either way except to prevent further updates (at which point + // the message is printed) + + if ( !result.isOK() ) { + return Status( result.code(), str::stream() << "could not update version info" + << "to enter critical update section: " + << result.reason() ); } - log() << "entered critical section for config upgrade" << endl; - return Status::OK(); + return result; } @@ -471,20 +477,21 @@ namespace mongo { unsetObj.append(VersionType::upgradeId(), 1); unsetObj.append(VersionType::upgradeState(), 1); - try { - ScopedDbConnection conn(configServer, 30); - conn->update(VersionType::ConfigNS, - BSON("_id" << 1 << VersionType::currentVersion(currentVersion)), - BSON("$set" << setObj.done() << "$unset" << unsetObj.done())); - _checkGLE(conn); - conn.done(); - } - catch (const DBException& e) { - return e.toStatus("could not write new version info and " - "exit critical upgrade section"); + Status result = clusterUpdate(VersionType::ConfigNS, + BSON("_id" << 1 << VersionType::currentVersion(currentVersion)), + BSON("$set" << setObj.done() << "$unset" << unsetObj.done()), + false, // upsert + false, // multi, + WriteConcernOptions::AllConfigs, + NULL); + + if ( !result.isOK() ) { + return Status( result.code(), str::stream() << "could not write new version info " + << " and exit critical upgrade section: " + << result.reason() ); } - return Status::OK(); + return result; } } diff --git a/src/mongo/s/config_upgrade_v0_to_v5.cpp b/src/mongo/s/config_upgrade_v0_to_v5.cpp index 7482a811a39..4058f493c8f 100644 --- a/src/mongo/s/config_upgrade_v0_to_v5.cpp +++ b/src/mongo/s/config_upgrade_v0_to_v5.cpp @@ -29,7 +29,9 @@ #include "mongo/s/config_upgrade.h" #include "mongo/client/connpool.h" +#include "mongo/db/write_concern.h" #include "mongo/s/cluster_client_internal.h" +#include "mongo/s/cluster_write.h" #include "mongo/s/type_config_version.h" #include "mongo/util/mongoutils/str.h" @@ -73,16 +75,16 @@ namespace mongo { // If the cluster has not previously been initialized, we need to set the version before // using so subsequent mongoses use the config data the same way. This requires all three // config servers online initially. - try { - ScopedDbConnection conn(configLoc, 30); - conn->update(VersionType::ConfigNS, BSON("_id" << 1), versionInfo.toBSON(), true); - _checkGLE(conn); - conn.done(); - } - catch (const DBException& e) { - - *errMsg = stream() << "error writing initial config version" << causedBy(e); - + Status result = clusterUpdate(VersionType::ConfigNS, BSON("_id" << 1), + versionInfo.toBSON(), + true /* upsert */, + false /* multi */, + WriteConcernOptions::AllConfigs, + NULL); + + if ( !result.isOK() ) { + *errMsg = stream() << "error writing initial config version: " + << result.reason(); return false; } diff --git a/src/mongo/s/grid.cpp b/src/mongo/s/grid.cpp index 9e5dcbeb33f..3d78fa08df2 100644 --- a/src/mongo/s/grid.cpp +++ b/src/mongo/s/grid.cpp @@ -38,6 +38,8 @@ #include "mongo/client/connpool.h" #include "mongo/db/json.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/write_concern.h" +#include "mongo/s/cluster_write.h" #include "mongo/s/grid.h" #include "mongo/s/mongos_options.h" #include "mongo/s/shard.h" @@ -411,18 +413,20 @@ namespace mongo { conn.done(); return false; } + conn.done(); + } - log() << "going to add shard: " << shardDoc << endl; + log() << "going to add shard: " << shardDoc << endl; - conn->insert(ShardType::ConfigNS , shardDoc); - errMsg = conn->getLastError(); - if ( ! errMsg.empty() ) { - log() << "error adding shard: " << shardDoc << " err: " << errMsg << endl; - conn.done(); - return false; - } + Status result = clusterInsert( ShardType::ConfigNS, + shardDoc, + WriteConcernOptions::AllConfigs, + NULL ); - conn.done(); + if ( !result.isOK() ) { + errMsg = result.reason(); + log() << "error adding shard: " << shardDoc << " err: " << errMsg << endl; + return false; } Shard::reloadShardInfo(); diff --git a/src/mongo/s/write_ops/batch_downconvert.cpp b/src/mongo/s/write_ops/batch_downconvert.cpp index 0ccca2c3e38..6e3d1ce22c5 100644 --- a/src/mongo/s/write_ops/batch_downconvert.cpp +++ b/src/mongo/s/write_ops/batch_downconvert.cpp @@ -110,6 +110,18 @@ namespace mongo { response->addToErrDetails( batchError ); } + // Special case for making legacy "n" field result for insert match the write + // command result. + if ( request.getBatchType() == BatchedCommandRequest::BatchType_Insert && + batchError == NULL && + StringData( request.getNS() ).startsWith( "config." )) { + dassert( request.getInsertRequest()->getDocuments().size() == 1 ); + // n is always 0 for legacy inserts. + dassert( lastError.nObjects == 0 ); + + lastError.nObjects = 1; + } + response->setN( response->getN() + lastError.nObjects ); if ( !lastError.upsertedId.isEmpty() ) { diff --git a/src/mongo/s/write_ops/config_coordinator.cpp b/src/mongo/s/write_ops/config_coordinator.cpp index 9bed54407bd..dcb4a068cee 100644 --- a/src/mongo/s/write_ops/config_coordinator.cpp +++ b/src/mongo/s/write_ops/config_coordinator.cpp @@ -239,13 +239,14 @@ namespace mongo { static bool areResponsesEqual( const BatchedCommandResponse& responseA, const BatchedCommandResponse& responseB ) { + // Note: This needs to also take into account comparing responses from legacy writes + // and write commands. + // TODO: Better reporting of why not equal if ( responseA.getOk() != responseB.getOk() ) return false; if ( responseA.getN() != responseB.getN() ) return false; - if ( responseA.isUpsertDetailsSet() != responseB.isUpsertDetailsSet() ) - return false; if ( responseA.isUpsertDetailsSet() ) { // TODO: |