summary | refs | log | tree | commit | diff
path: root/src/mongo/s/cluster_write.cpp
diff options
context:
space:
mode:
author: Kaloian Manassiev <kaloian.manassiev@mongodb.com> 2015-03-13 14:13:31 -0400
committer: Kaloian Manassiev <kaloian.manassiev@mongodb.com> 2015-03-20 15:20:02 -0400
commit: 25476fbd6a359e638ab8a98acbacea56cc55f7ab (patch)
tree: 61c212de77a3add064a4dd4f3404a1c34c14cea0 /src/mongo/s/cluster_write.cpp
parent: 39e01c76694ed4b9cdd8315dab56ecf19f099af1 (diff)
download: mongo-25476fbd6a359e638ab8a98acbacea56cc55f7ab.tar.gz
SERVER-17637 Sharding catalog manager interface
The beginnings of a sharding catalog manager interface to abstract catalog operations.
Diffstat (limited to 'src/mongo/s/cluster_write.cpp')
-rw-r--r-- src/mongo/s/cluster_write.cpp | 355
1 file changed, 99 insertions, 256 deletions
diff --git a/src/mongo/s/cluster_write.cpp b/src/mongo/s/cluster_write.cpp
index 308138e95c8..e16d977e94b 100644
--- a/src/mongo/s/cluster_write.cpp
+++ b/src/mongo/s/cluster_write.cpp
@@ -37,6 +37,7 @@
#include "mongo/base/status.h"
#include "mongo/db/write_concern_options.h"
+#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/chunk_manager_targeter.h"
#include "mongo/s/client/dbclient_multi_command.h"
@@ -44,16 +45,13 @@
#include "mongo/s/dbclient_shard_resolver.h"
#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batch_write_exec.h"
-#include "mongo/s/write_ops/config_coordinator.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
-#include "mongo/util/net/hostandport.h"
namespace mongo {
using std::auto_ptr;
using std::vector;
- using std::endl;
using std::map;
using std::string;
using std::stringstream;
@@ -61,23 +59,23 @@ namespace mongo {
const int ConfigOpTimeoutMillis = 30 * 1000;
namespace {
- // TODO: consider writing a type for index instead
+
/**
* Constructs the BSON specification document for the given namespace, index key
* and options.
*/
- BSONObj createIndexDoc( const string& ns, const BSONObj& keys, bool unique ) {
+ BSONObj createIndexDoc(const string& ns, const BSONObj& keys, bool unique) {
BSONObjBuilder indexDoc;
- indexDoc.append( "ns" , ns );
- indexDoc.append( "key" , keys );
+ indexDoc.append("ns", ns);
+ indexDoc.append("key", keys);
stringstream indexName;
bool isFirstKey = true;
- for ( BSONObjIterator keyIter(keys); keyIter.more(); ) {
+ for (BSONObjIterator keyIter(keys); keyIter.more();) {
BSONElement currentKey = keyIter.next();
- if ( isFirstKey ) {
+ if (isFirstKey) {
isFirstKey = false;
}
else {
@@ -85,7 +83,7 @@ namespace mongo {
}
indexName << currentKey.fieldName() << "_";
- if ( currentKey.isNumber() ) {
+ if (currentKey.isNumber()) {
indexName << currentKey.numberInt();
}
else {
@@ -93,223 +91,118 @@ namespace mongo {
}
}
- indexDoc.append( "name", indexName.str() );
+ indexDoc.append("name", indexName.str());
- if ( unique ) {
- indexDoc.appendBool( "unique", unique );
+ if (unique) {
+ indexDoc.appendBool("unique", unique);
}
return indexDoc.obj();
}
- void clusterWrite(const BatchedCommandRequest& request,
- BatchedCommandResponse* response,
- bool autoSplit) {
-
- ClusterWriter writer(autoSplit, 0);
- writer.write(request, response);
- }
- }
-
- /**
- * Splits the chunks touched based from the targeter stats if needed.
- */
- static void splitIfNeeded( const string& ns, const TargeterStats& stats ) {
- if ( !Chunk::ShouldAutoSplit ) {
- return;
- }
-
- DBConfigPtr config;
-
- try {
- config = grid.getDBConfig( ns );
+ void toBatchError(const Status& status, BatchedCommandResponse* response) {
+ response->clear();
+ response->setErrCode(status.code());
+ response->setErrMessage(status.reason());
+ response->setOk(false);
+ dassert(response->isValid(NULL));
}
- catch ( const DBException& ex ) {
- warning() << "failed to get database config for " << ns
- << " while checking for auto-split: " << causedBy( ex ) << endl;
- return;
- }
-
- ChunkManagerPtr chunkManager;
- ShardPtr dummyShard;
- config->getChunkManagerOrPrimary( ns, chunkManager, dummyShard );
- if ( !chunkManager ) {
- return;
- }
+ /**
+ * Splits the chunks touched based from the targeter stats if needed.
+ */
+ void splitIfNeeded(const string& ns, const TargeterStats& stats) {
+ if (!Chunk::ShouldAutoSplit) {
+ return;
+ }
- for ( map<BSONObj, int>::const_iterator it = stats.chunkSizeDelta.begin();
- it != stats.chunkSizeDelta.end(); ++it ) {
+ DBConfigPtr config;
- ChunkPtr chunk;
try {
- chunk = chunkManager->findIntersectingChunk( it->first );
+ config = grid.getDBConfig(ns);
}
- catch ( const AssertionException& ex ) {
- warning() << "could not find chunk while checking for auto-split: "
- << causedBy( ex ) << endl;
+ catch (const DBException& ex) {
+ warning() << "failed to get database config for " << ns
+ << " while checking for auto-split: " << causedBy(ex);
return;
}
- chunk->splitIfShould( it->second );
- }
- }
-
- /**
- * Returns the currently-set config hosts for a cluster
- */
- static vector<ConnectionString> getConfigHosts() {
+ ChunkManagerPtr chunkManager;
+ ShardPtr dummyShard;
+ config->getChunkManagerOrPrimary(ns, chunkManager, dummyShard);
- vector<ConnectionString> configHosts;
- ConnectionString configHostOrHosts = configServer.getConnectionString();
- if ( configHostOrHosts.type() == ConnectionString::MASTER ) {
- configHosts.push_back( configHostOrHosts );
- }
- else if ( configHostOrHosts.type() == ConnectionString::SYNC ) {
- vector<HostAndPort> configHPs = configHostOrHosts.getServers();
- for ( vector<HostAndPort>::iterator it = configHPs.begin(); it != configHPs.end();
- ++it ) {
- configHosts.push_back( ConnectionString( *it ) );
+ if (!chunkManager) {
+ return;
}
- }
- else {
- // This is only for tests.
- dassert( configHostOrHosts.type() == ConnectionString::CUSTOM );
- configHosts.push_back( configHostOrHosts );
- }
- return configHosts;
- }
-
- static Status getStatus( const BatchedCommandResponse& response ) {
- if ( response.getOk() != 1 ) {
- return Status( static_cast<ErrorCodes::Error>(response.getErrCode()),
- response.getErrMessage() );
- }
+ for (map<BSONObj, int>::const_iterator it = stats.chunkSizeDelta.begin();
+ it != stats.chunkSizeDelta.end();
+ ++it) {
- if ( response.isErrDetailsSet() ) {
- const WriteErrorDetail* errDetail = response.getErrDetails().front();
- return Status( static_cast<ErrorCodes::Error>(errDetail->getErrCode()),
- errDetail->getErrMessage() );
- }
+ ChunkPtr chunk;
+ try {
+ chunk = chunkManager->findIntersectingChunk(it->first);
+ }
+ catch (const AssertionException& ex) {
+ warning() << "could not find chunk while checking for auto-split: "
+ << causedBy(ex);
+ return;
+ }
- if ( response.isWriteConcernErrorSet() ) {
- const WCErrorDetail* errDetail = response.getWriteConcernError();
- return Status( static_cast<ErrorCodes::Error>(errDetail->getErrCode()),
- errDetail->getErrMessage() );
+ chunk->splitIfShould(it->second);
+ }
}
- return Status::OK();
- }
+ } // namespace
- Status clusterInsert( const string& ns,
- const BSONObj& doc,
- BatchedCommandResponse* response ) {
- auto_ptr<BatchedInsertRequest> insert( new BatchedInsertRequest() );
- insert->addToDocuments( doc );
+ Status clusterCreateIndex( const string& ns,
+ BSONObj keys,
+ bool unique,
+ BatchedCommandResponse* response ) {
- BatchedCommandRequest request( insert.release() );
- request.setNS( ns );
- request.setWriteConcern(WriteConcernOptions::Acknowledged);
+ const NamespaceString nss(ns);
+ const std::string dbName = nss.db().toString();
- BatchedCommandResponse dummyResponse;
+ BSONObj indexDoc = createIndexDoc(ns, keys, unique);
- if ( response == NULL ) {
- response = &dummyResponse;
- }
+ // Go through the shard insert path
+ std::auto_ptr<BatchedInsertRequest> insert(new BatchedInsertRequest());
+ insert->addToDocuments(indexDoc);
- clusterWrite( request, response, false );
- return getStatus( *response );
- }
-
- Status clusterUpdate( const string& ns,
- const BSONObj& query,
- const BSONObj& update,
- bool upsert,
- bool multi,
- BatchedCommandResponse* response ) {
- auto_ptr<BatchedUpdateDocument> updateDoc( new BatchedUpdateDocument() );
- updateDoc->setQuery( query );
- updateDoc->setUpdateExpr( update );
- updateDoc->setUpsert( upsert );
- updateDoc->setMulti( multi );
-
- auto_ptr<BatchedUpdateRequest> updateRequest( new BatchedUpdateRequest() );
- updateRequest->addToUpdates( updateDoc.release() );
- updateRequest->setWriteConcern(WriteConcernOptions::Acknowledged);
-
- BatchedCommandRequest request( updateRequest.release() );
- request.setNS( ns );
+ BatchedCommandRequest request(insert.release());
+ request.setNS(nss.getSystemIndexesCollection());
+ request.setWriteConcern(WriteConcernOptions::Acknowledged);
BatchedCommandResponse dummyResponse;
-
- if ( response == NULL ) {
+ if (response == NULL) {
response = &dummyResponse;
}
- clusterWrite( request, response, false );
- return getStatus( *response );
- }
-
- Status clusterDelete( const string& ns,
- const BSONObj& query,
- int limit,
- BatchedCommandResponse* response ) {
- auto_ptr<BatchedDeleteDocument> deleteDoc( new BatchedDeleteDocument );
- deleteDoc->setQuery( query );
- deleteDoc->setLimit( limit );
-
- auto_ptr<BatchedDeleteRequest> deleteRequest( new BatchedDeleteRequest() );
- deleteRequest->addToDeletes( deleteDoc.release() );
- deleteRequest->setWriteConcern(WriteConcernOptions::Acknowledged);
-
- BatchedCommandRequest request( deleteRequest.release() );
- request.setNS( ns );
-
- BatchedCommandResponse dummyResponse;
+ ClusterWriter writer(false, 0);
+ writer.write(request, response);
- if ( response == NULL ) {
- response = &dummyResponse;
+ if (response->getOk() != 1) {
+ return Status(static_cast<ErrorCodes::Error>(response->getErrCode()),
+ response->getErrMessage());
}
- clusterWrite( request, response, false );
- return getStatus( *response );
- }
+ if (response->isErrDetailsSet()) {
+ const WriteErrorDetail* errDetail = response->getErrDetails().front();
- Status clusterCreateIndex( const string& ns,
- BSONObj keys,
- bool unique,
- BatchedCommandResponse* response ) {
- return clusterInsert( NamespaceString( ns ).getSystemIndexesCollection(),
- createIndexDoc( ns, keys, unique ),
- response );
- }
-
- bool validConfigWC( const BSONObj& writeConcern ) {
- BSONElement elem(writeConcern["w"]);
-
- if ( elem.eoo() ) {
- return true;
+ return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()),
+ errDetail->getErrMessage());
}
- if ( elem.isNumber() && elem.numberInt() <= 1 ) {
- return true;
- }
+ if (response->isWriteConcernErrorSet()) {
+ const WCErrorDetail* errDetail = response->getWriteConcernError();
- if ( elem.type() == String && elem.str() == "majority" ) {
- return true;
+ return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()),
+ errDetail->getErrMessage());
}
- return false;
+ return Status::OK();
}
- static void toBatchError( const Status& status, BatchedCommandResponse* response ) {
- response->clear();
- response->setErrCode( status.code() );
- response->setErrMessage( status.reason() );
- response->setOk( false );
- dassert( response->isValid(NULL) );
- }
void ClusterWriter::write( const BatchedCommandRequest& origRequest,
BatchedCommandResponse* response ) {
@@ -355,34 +248,32 @@ namespace mongo {
}
// Config writes and shard writes are done differently
- string dbName = nss.db().toString();
- if ( dbName == "config" || dbName == "admin" ) {
- // We only support batch sizes of one for config writes
- if ( request.sizeWriteOps() != 1 ) {
- toBatchError( Status( ErrorCodes::InvalidOptions,
- mongoutils::str::stream() << "Writes to config servers must "
- "have batch size of 1, found "
- << request.sizeWriteOps() ),
- response );
- return;
+ const string dbName = nss.db().toString();
+
+ if (dbName == "config" || dbName == "admin") {
+ grid.catalogManager()->writeConfigServerDirect(request, response);
+ }
+ else {
+ ChunkManagerTargeter targeter(request.getTargetingNSS());
+ Status targetInitStatus = targeter.init();
+
+ if (!targetInitStatus.isOK()) {
+ // Errors will be reported in response if we are unable to target
+ warning() << "could not initialize targeter for"
+ << (request.isInsertIndexRequest() ? " index" : "")
+ << " write op in collection " << request.getTargetingNS();
}
- // We only support {w: 0}, {w: 1}, and {w: 'majority'} write concern for config writes
- if ( request.isWriteConcernSet() && !validConfigWC( request.getWriteConcern() )) {
- toBatchError( Status( ErrorCodes::InvalidOptions,
- mongoutils::str::stream() << "Invalid write concern for write"
- " to config servers: " << request.getWriteConcern() ),
- response );
- return;
+ DBClientShardResolver resolver;
+ DBClientMultiCommand dispatcher;
+ BatchWriteExec exec(&targeter, &resolver, &dispatcher);
+ exec.executeBatch(request, response);
+
+ if (_autoSplit) {
+ splitIfNeeded(request.getNS(), *targeter.getStats());
}
- // We need to support "best-effort" writes for pings to the config server.
- // {w:0} (!verbose) writes are interpreted as best-effort in this case - they may still
- // error, but do not do the initial fsync check.
- configWrite(request, response);
- }
- else {
- shardWrite( request, response );
+ _stats->setShardStats(exec.releaseStats());
}
}
@@ -394,54 +285,6 @@ namespace mongo {
return *_stats;
}
- void ClusterWriter::shardWrite( const BatchedCommandRequest& request,
- BatchedCommandResponse* response ) {
-
- ChunkManagerTargeter targeter(request.getTargetingNSS());
- Status targetInitStatus = targeter.init();
-
- if ( !targetInitStatus.isOK() ) {
-
- warning() << "could not initialize targeter for"
- << ( request.isInsertIndexRequest() ? " index" : "" )
- << " write op in collection " << request.getTargetingNS() << endl;
-
- // Errors will be reported in response if we are unable to target
- }
-
- DBClientShardResolver resolver;
- DBClientMultiCommand dispatcher;
- BatchWriteExec exec( &targeter, &resolver, &dispatcher );
- exec.executeBatch( request, response );
-
- if ( _autoSplit )
- splitIfNeeded( request.getNS(), *targeter.getStats() );
-
- _stats->setShardStats( exec.releaseStats() );
- }
-
- void ClusterWriter::configWrite(const BatchedCommandRequest& request,
- BatchedCommandResponse* response) {
-
- DBClientMultiCommand dispatcher;
- vector<ConnectionString> configHosts = getConfigHosts();
-
- if (configHosts.size() > 1) {
-
- // We can't support no-_id upserts to multiple config servers - the _ids will differ
- if (BatchedCommandRequest::containsNoIDUpsert(request)) {
- toBatchError(Status(ErrorCodes::InvalidOptions,
- mongoutils::str::stream() << "upserts to multiple config servers must"
- " include _id"),
- response);
- return;
- }
- }
-
- ConfigCoordinator exec( &dispatcher, configHosts );
- exec.executeBatch(request, response);
- }
-
void ClusterWriterStats::setShardStats( BatchWriteExecStats* shardStats ) {
_shardStats.reset( shardStats );
}