/**
* Copyright (C) 2013 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding

#include "mongo/platform/basic.h"

#include "mongo/s/cluster_write.h"

#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include "mongo/base/status.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/s/catalog/catalog_cache.h"
#include "mongo/s/catalog/catalog_manager.h"
#include "mongo/s/chunk_manager.h"
#include "mongo/s/chunk_manager_targeter.h"
#include "mongo/s/client/dbclient_multi_command.h"
#include "mongo/s/config.h"
#include "mongo/s/dbclient_shard_resolver.h"
#include "mongo/s/grid.h"
#include "mongo/s/write_ops/batch_write_exec.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
using boost::shared_ptr;
using std::auto_ptr;
using std::vector;
using std::map;
using std::string;
using std::stringstream;
const int ConfigOpTimeoutMillis = 30 * 1000;
namespace {
/**
* Constructs the BSON specification document for the given namespace, index key
* and options.
*/
/**
 * Constructs the BSON specification document for an index on the given
 * namespace with the given key pattern and uniqueness option.
 *
 * The generated "name" field mirrors the shell's convention: field names and
 * values joined by underscores, e.g. {a: 1, b: -1} -> "a_1_b_-1".
 */
BSONObj createIndexDoc(const string& ns, const BSONObj& keys, bool unique) {
    BSONObjBuilder builder;
    builder.append("ns", ns);
    builder.append("key", keys);

    // Derive the index name from the key pattern, matching the shell helper.
    stringstream name;
    bool firstKey = true;
    BSONObjIterator keyIter(keys);
    while (keyIter.more()) {
        BSONElement elt = keyIter.next();

        if (!firstKey) {
            name << "_";
        }
        firstKey = false;

        name << elt.fieldName() << "_";
        if (elt.isNumber()) {
            name << elt.numberInt();
        }
        else {
            // Non-numeric key values (e.g. "text", "hashed") are appended
            // verbatim; this should match up with the shell command.
            name << elt.str();
        }
    }
    builder.append("name", name.str());

    if (unique) {
        builder.appendBool("unique", unique);
    }

    return builder.obj();
}
/**
 * Resets the response and records the given status as a top-level batch
 * error (ok: 0 plus error code/message).
 */
void toBatchError(const Status& status, BatchedCommandResponse* response) {
    response->clear();
    response->setOk(false);
    response->setErrCode(status.code());
    response->setErrMessage(status.reason());

    dassert(response->isValid(NULL));
}
/**
 * Splits the chunks touched based from the targeter stats if needed.
 *
 * Best-effort: any failure (missing database config, sharding metadata
 * refresh race while looking up a chunk) is logged and the check is
 * abandoned rather than surfaced to the caller.
 */
void splitIfNeeded(const NamespaceString& nss, const TargeterStats& stats) {
    if (!Chunk::ShouldAutoSplit) {
        return;
    }

    auto status = grid.catalogCache()->getDatabase(nss.db().toString());
    if (!status.isOK()) {
        warning() << "failed to get database config for " << nss
                  << " while checking for auto-split: " << status.getStatus();
        return;
    }

    // Shared ownership of the database config (template arguments were
    // garbled in the original; 'auto' picks up the cache's pointer type).
    auto config = status.getValue();

    ChunkManagerPtr chunkManager;
    ShardPtr dummyShard;
    config->getChunkManagerOrPrimary(nss, chunkManager, dummyShard);

    if (!chunkManager) {
        // Unsharded collection (primary shard only) - nothing to split.
        return;
    }

    // For each chunk the batch wrote into, ask it to split if the written
    // byte delta pushed it over the split threshold.
    for (auto it = stats.chunkSizeDelta.begin(); it != stats.chunkSizeDelta.end(); ++it) {
        ChunkPtr chunk;
        try {
            chunk = chunkManager->findIntersectingChunk(it->first);
        }
        catch (const AssertionException& ex) {
            // The chunk layout may have changed concurrently; give up quietly.
            warning() << "could not find chunk while checking for auto-split: "
                      << causedBy(ex);
            return;
        }

        chunk->splitIfShould(it->second);
    }
}
} // namespace
/**
 * Creates an index on a (potentially sharded) collection by inserting the
 * index spec into the collection's database "system.indexes" through the
 * normal cluster write path.
 *
 * @param ns        full namespace of the collection to index
 * @param keys      index key pattern, e.g. BSON("a" << 1)
 * @param unique    whether to build a unique index
 * @param response  optional out-param for the raw batch response; may be NULL
 * @return OK on success, otherwise the first error reported by the batch
 *         (top-level error, per-write error, or write concern error).
 */
Status clusterCreateIndex( const string& ns,
                           BSONObj keys,
                           bool unique,
                           BatchedCommandResponse* response ) {
    const NamespaceString nss(ns);

    BSONObj indexDoc = createIndexDoc(ns, keys, unique);

    // Go through the shard insert path
    std::auto_ptr<BatchedInsertRequest> insert(new BatchedInsertRequest());
    insert->addToDocuments(indexDoc);

    BatchedCommandRequest request(insert.release());
    request.setNS(nss.getSystemIndexesCollection());
    request.setWriteConcern(WriteConcernOptions::Acknowledged);

    // Allow callers to pass NULL when they only care about the Status.
    BatchedCommandResponse dummyResponse;
    if (response == NULL) {
        response = &dummyResponse;
    }

    // No auto-split for system.indexes writes; no timeout override.
    ClusterWriter writer(false, 0);
    writer.write(request, response);

    if (response->getOk() != 1) {
        return Status(static_cast<ErrorCodes::Error>(response->getErrCode()),
                      response->getErrMessage());
    }

    if (response->isErrDetailsSet()) {
        const WriteErrorDetail* errDetail = response->getErrDetails().front();
        return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()),
                      errDetail->getErrMessage());
    }

    if (response->isWriteConcernErrorSet()) {
        const WCErrorDetail* errDetail = response->getWriteConcernError();
        return Status(static_cast<ErrorCodes::Error>(errDetail->getErrCode()),
                      errDetail->getErrMessage());
    }

    return Status::OK();
}
void ClusterWriter::write( const BatchedCommandRequest& origRequest,
BatchedCommandResponse* response ) {
// Add _ids to insert request if req'd
auto_ptr idRequest(BatchedCommandRequest::cloneWithIds(origRequest));
const BatchedCommandRequest& request = NULL != idRequest.get() ? *idRequest : origRequest;
const NamespaceString& nss = request.getNSS();
if ( !nss.isValid() ) {
toBatchError( Status( ErrorCodes::InvalidNamespace,
nss.ns() + " is not a valid namespace" ),
response );
return;
}
if ( !NamespaceString::validCollectionName( nss.coll() ) ) {
toBatchError( Status( ErrorCodes::BadValue,
str::stream() << "invalid collection name " << nss.coll() ),
response );
return;
}
if ( request.sizeWriteOps() == 0u ) {
toBatchError( Status( ErrorCodes::InvalidLength,
"no write ops were included in the batch" ),
response );
return;
}
if ( request.sizeWriteOps() > BatchedCommandRequest::kMaxWriteBatchSize ) {
toBatchError( Status( ErrorCodes::InvalidLength,
str::stream() << "exceeded maximum write batch size of "
<< BatchedCommandRequest::kMaxWriteBatchSize ),
response );
return;
}
string errMsg;
if ( request.isInsertIndexRequest() && !request.isValidIndexRequest( &errMsg ) ) {
toBatchError( Status( ErrorCodes::InvalidOptions, errMsg ), response );
return;
}
// Config writes and shard writes are done differently
const string dbName = nss.db().toString();
if (dbName == "config" || dbName == "admin") {
grid.catalogManager()->writeConfigServerDirect(request, response);
}
else {
ChunkManagerTargeter targeter(request.getTargetingNSS());
Status targetInitStatus = targeter.init();
if (!targetInitStatus.isOK()) {
// Errors will be reported in response if we are unable to target
warning() << "could not initialize targeter for"
<< (request.isInsertIndexRequest() ? " index" : "")
<< " write op in collection " << request.getTargetingNS();
}
DBClientShardResolver resolver;
DBClientMultiCommand dispatcher;
BatchWriteExec exec(&targeter, &resolver, &dispatcher);
exec.executeBatch(request, response);
if (_autoSplit) {
splitIfNeeded(request.getNSS(), *targeter.getStats());
}
_stats->setShardStats(exec.releaseStats());
}
}
// Constructs a writer; 'autoSplit' enables post-write chunk auto-splitting.
ClusterWriter::ClusterWriter( bool autoSplit, int timeoutMillis )
    : _autoSplit( autoSplit ),
      _timeoutMillis( timeoutMillis ),
      _stats( new ClusterWriterStats ) {
}
// Returns the stats accumulated by this writer (valid for its lifetime).
const ClusterWriterStats& ClusterWriter::getStats() {
    const ClusterWriterStats& stats = *_stats;
    return stats;
}
// Takes ownership of 'shardStats', replacing any previously-held stats.
void ClusterWriterStats::setShardStats( BatchWriteExecStats* shardStats ) {
    _shardStats.reset( shardStats );
}
// True once shard-level stats have been recorded via setShardStats().
bool ClusterWriterStats::hasShardStats() const {
    return _shardStats.get() != NULL;
}
// Returns the recorded shard stats; only valid when hasShardStats() is true.
const BatchWriteExecStats& ClusterWriterStats::getShardStats() const {
    return *_shardStats;
}
} // namespace mongo