/**
* Copyright (C) 2013 10gen Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects
* for all of the code used other than as permitted herein. If you modify
* file(s) with this exception, you may extend this exception to your
* version of the file(s), but you are not obligated to do so. If you do not
* wish to do so, delete this exception statement from your version. If you
* delete this exception statement from all source files in the program,
* then also delete it in the license file.
*/
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/bson/bsonobjiterator.h"
#include "mongo/db/namespace_string.h"
namespace mongo {
using std::auto_ptr;
using std::string;
using std::vector;
const size_t BatchedCommandRequest::kMaxWriteBatchSize = 1000;
BatchedCommandRequest::BatchedCommandRequest( BatchType batchType ) :
_batchType( batchType ) {
switch ( getBatchType() ) {
case BatchedCommandRequest::BatchType_Insert:
_insertReq.reset( new BatchedInsertRequest );
return;
case BatchedCommandRequest::BatchType_Update:
_updateReq.reset( new BatchedUpdateRequest );
return;
default:
dassert( getBatchType() == BatchedCommandRequest::BatchType_Delete );
_deleteReq.reset( new BatchedDeleteRequest );
return;
}
}
// This macro just invokes a given method on one of the three types of ops with parameters
#define INVOKE(M,...) \
{\
switch ( getBatchType() ) {\
case BatchedCommandRequest::BatchType_Insert:\
return _insertReq->M(__VA_ARGS__);\
case BatchedCommandRequest::BatchType_Update:\
return _updateReq->M(__VA_ARGS__);\
default:\
dassert( getBatchType() == BatchedCommandRequest::BatchType_Delete );\
return _deleteReq->M(__VA_ARGS__);\
}\
}
BatchedCommandRequest::BatchType BatchedCommandRequest::getBatchType() const {
return _batchType;
}
BatchedInsertRequest* BatchedCommandRequest::getInsertRequest() const {
return _insertReq.get();
}
BatchedUpdateRequest* BatchedCommandRequest::getUpdateRequest() const {
return _updateReq.get();
}
BatchedDeleteRequest* BatchedCommandRequest::getDeleteRequest() const {
return _deleteReq.get();
}
bool BatchedCommandRequest::isInsertIndexRequest() const {
if ( _batchType != BatchedCommandRequest::BatchType_Insert ) return false;
return getNSS().isSystemDotIndexes();
}
static bool extractUniqueIndex( const BSONObj& indexDesc ) {
return indexDesc["unique"].trueValue();
}
bool BatchedCommandRequest::isUniqueIndexRequest() const {
if ( !isInsertIndexRequest() ) return false;
return extractUniqueIndex( getInsertRequest()->getDocumentsAt( 0 ) );
}
bool BatchedCommandRequest::isValidIndexRequest( string* errMsg ) const {
string dummy;
if ( !errMsg )
errMsg = &dummy;
dassert( isInsertIndexRequest() );
if ( sizeWriteOps() != 1 ) {
*errMsg = "invalid batch request for index creation";
return false;
}
const NamespaceString& targetNSS = getTargetingNSS();
if ( !targetNSS.isValid() ) {
*errMsg = targetNSS.ns() + " is not a valid namespace to index";
return false;
}
const NamespaceString& reqNSS = getNSS();
if ( reqNSS.db().compare( targetNSS.db() ) != 0 ) {
*errMsg = targetNSS.ns() + " namespace is not in the request database "
+ reqNSS.db().toString();
return false;
}
return true;
}
string BatchedCommandRequest::getTargetingNS() const {
return getTargetingNSS().toString();
}
const NamespaceString& BatchedCommandRequest::getTargetingNSS() const {
if ( !isInsertIndexRequest() ) return getNSS();
INVOKE(getTargetingNSS);
}
static BSONObj extractIndexKeyPattern( const BSONObj& indexDesc ) {
return indexDesc["key"].Obj();
}
BSONObj BatchedCommandRequest::getIndexKeyPattern() const {
dassert( isInsertIndexRequest() );
return extractIndexKeyPattern( getInsertRequest()->getDocumentsAt( 0 ) );
}
bool BatchedCommandRequest::isVerboseWC() const {
if ( !isWriteConcernSet() ) {
return true;
}
BSONObj writeConcern = getWriteConcern();
BSONElement wElem = writeConcern["w"];
if ( !wElem.isNumber() || wElem.Number() != 0 ) {
return true;
}
return false;
}
void BatchedCommandRequest::cloneTo( BatchedCommandRequest* other ) const {
other->_insertReq.reset();
other->_updateReq.reset();
other->_deleteReq.reset();
other->_batchType = _batchType;
switch ( getBatchType() ) {
case BatchedCommandRequest::BatchType_Insert:
other->_insertReq.reset( new BatchedInsertRequest );
_insertReq->cloneTo( other->_insertReq.get() );
return;
case BatchedCommandRequest::BatchType_Update:
other->_updateReq.reset( new BatchedUpdateRequest );
_updateReq->cloneTo( other->_updateReq.get() );
return;
default:
dassert( getBatchType() == BatchedCommandRequest::BatchType_Delete );
other->_deleteReq.reset( new BatchedDeleteRequest );
_deleteReq->cloneTo( other->_deleteReq.get() );
return;
}
}
bool BatchedCommandRequest::isValid( std::string* errMsg ) const {
INVOKE( isValid, errMsg );
}
BSONObj BatchedCommandRequest::toBSON() const {
INVOKE( toBSON );
}
bool BatchedCommandRequest::parseBSON( const BSONObj& source, std::string* errMsg ) {
INVOKE( parseBSON, source, errMsg );
}
void BatchedCommandRequest::clear() {
INVOKE( clear );
}
std::string BatchedCommandRequest::toString() const {
INVOKE( toString );
}
void BatchedCommandRequest::setNSS( const NamespaceString& nss ) {
INVOKE( setCollNameNS, nss );
}
void BatchedCommandRequest::setNS( StringData collName ) {
INVOKE( setCollName, collName );
}
const std::string& BatchedCommandRequest::getNS() const {
INVOKE( getCollName );
}
const NamespaceString& BatchedCommandRequest::getNSS() const {
INVOKE(getCollNameNS);
}
std::size_t BatchedCommandRequest::sizeWriteOps() const {
switch ( getBatchType() ) {
case BatchedCommandRequest::BatchType_Insert:
return _insertReq->sizeDocuments();
case BatchedCommandRequest::BatchType_Update:
return _updateReq->sizeUpdates();
default:
return _deleteReq->sizeDeletes();
}
}
void BatchedCommandRequest::setWriteConcern( const BSONObj& writeConcern ) {
INVOKE( setWriteConcern, writeConcern );
}
void BatchedCommandRequest::unsetWriteConcern() {
INVOKE( unsetWriteConcern );
}
bool BatchedCommandRequest::isWriteConcernSet() const {
INVOKE( isWriteConcernSet );
}
const BSONObj& BatchedCommandRequest::getWriteConcern() const {
INVOKE( getWriteConcern );
}
void BatchedCommandRequest::setOrdered( bool continueOnError ) {
INVOKE( setOrdered, continueOnError );
}
void BatchedCommandRequest::unsetOrdered() {
INVOKE( unsetOrdered );
}
bool BatchedCommandRequest::isOrderedSet() const {
INVOKE( isOrderedSet );
}
bool BatchedCommandRequest::getOrdered() const {
INVOKE( getOrdered );
}
void BatchedCommandRequest::setMetadata(BatchedRequestMetadata* metadata) {
INVOKE( setMetadata, metadata );
}
void BatchedCommandRequest::unsetMetadata() {
INVOKE( unsetMetadata );
}
bool BatchedCommandRequest::isMetadataSet() const {
INVOKE( isMetadataSet );
}
BatchedRequestMetadata* BatchedCommandRequest::getMetadata() const {
INVOKE( getMetadata );
}
/**
* Generates a new request with insert _ids if required. Otherwise returns NULL.
*/
BatchedCommandRequest* //
BatchedCommandRequest::cloneWithIds(const BatchedCommandRequest& origCmdRequest) {
if (origCmdRequest.getBatchType() != BatchedCommandRequest::BatchType_Insert
|| origCmdRequest.isInsertIndexRequest())
return NULL;
auto_ptr idRequest;
BatchedInsertRequest* origRequest = origCmdRequest.getInsertRequest();
const vector& inserts = origRequest->getDocuments();
size_t i = 0u;
for (vector::const_iterator it = inserts.begin(); it != inserts.end(); ++it, ++i) {
const BSONObj& insert = *it;
BSONObj idInsert;
if (insert["_id"].eoo()) {
BSONObjBuilder idInsertB;
idInsertB.append("_id", OID::gen());
idInsertB.appendElements(insert);
idInsert = idInsertB.obj();
}
if (NULL == idRequest.get() && !idInsert.isEmpty()) {
idRequest.reset(new BatchedInsertRequest);
origRequest->cloneTo(idRequest.get());
}
if (!idInsert.isEmpty()) {
idRequest->setDocumentAt(i, idInsert);
}
}
if (NULL == idRequest.get())
return NULL;
// Command request owns idRequest
return new BatchedCommandRequest(idRequest.release());
}
bool BatchedCommandRequest::containsNoIDUpsert(const BatchedCommandRequest& request) {
if (request.getBatchType() != BatchedCommandRequest::BatchType_Update)
return false;
const vector& updates =
request.getUpdateRequest()->getUpdates();
for (vector::const_iterator it = updates.begin();
it != updates.end(); ++it) {
const BatchedUpdateDocument* updateDoc = *it;
if (updateDoc->getUpsert() && updateDoc->getQuery()["_id"].eoo())
return true;
}
return false;
}
bool BatchedCommandRequest::containsUpserts( const BSONObj& writeCmdObj ) {
BSONElement updatesEl = writeCmdObj[BatchedUpdateRequest::updates()];
if ( updatesEl.type() != Array ) {
return false;
}
BSONObjIterator it( updatesEl.Obj() );
while ( it.more() ) {
BSONElement updateEl = it.next();
if ( !updateEl.isABSONObj() ) continue;
if ( updateEl.Obj()[BatchedUpdateDocument::upsert()].trueValue() ) return true;
}
return false;
}
bool BatchedCommandRequest::getIndexedNS( const BSONObj& writeCmdObj,
string* nsToIndex,
string* errMsg ) {
BSONElement documentsEl = writeCmdObj[BatchedInsertRequest::documents()];
if ( documentsEl.type() != Array ) {
*errMsg = "index write batch is invalid";
return false;
}
BSONObjIterator it( documentsEl.Obj() );
if ( !it.more() ) {
*errMsg = "index write batch is empty";
return false;
}
BSONElement indexDescEl = it.next();
*nsToIndex = indexDescEl["ns"].str();
if ( *nsToIndex == "" ) {
*errMsg = "index write batch contains an invalid index descriptor";
return false;
}
if ( it.more() ) {
*errMsg = "index write batches may only contain a single index descriptor";
return false;
}
return true;
}
} // namespace mongo