summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/commands/write_commands/batch_executor.cpp23
-rw-r--r--src/mongo/db/commands/write_commands/write_commands.cpp4
-rw-r--r--src/mongo/db/field_parser.cpp24
-rw-r--r--src/mongo/db/field_parser.h9
-rw-r--r--src/mongo/s/batch_write_op.cpp91
-rw-r--r--src/mongo/s/batch_write_op.h22
-rw-r--r--src/mongo/s/batch_write_op_test.cpp14
-rw-r--r--src/mongo/s/batched_command_request.cpp14
-rw-r--r--src/mongo/s/batched_command_request.h2
-rw-r--r--src/mongo/s/batched_command_response.cpp24
-rw-r--r--src/mongo/s/batched_command_response.h6
-rw-r--r--src/mongo/s/batched_upsert_detail.cpp4
-rw-r--r--src/mongo/s/batched_upsert_detail.h2
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp6
-rw-r--r--src/mongo/s/mock_multi_command.h4
15 files changed, 207 insertions, 42 deletions
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp
index 51868e69ea2..d7116443e17 100644
--- a/src/mongo/db/commands/write_commands/batch_executor.cpp
+++ b/src/mongo/db/commands/write_commands/batch_executor.cpp
@@ -48,26 +48,6 @@
namespace mongo {
- namespace {
-
- // We're assuming here that writeConcern was parsed and is valid or is not
- // present. This is just a quick check of whether is is {w:0} or not.
- bool verboseResponse( const BatchedCommandRequest& request ) {
- if ( !request.isWriteConcernSet() ) {
- return true;
- }
-
- BSONObj writeConcern = request.getWriteConcern();
- BSONElement wElem = writeConcern["w"];
- if ( !wElem.isNumber() || wElem.Number() != 0 ) {
- return true;
- }
-
- return false;
- }
-
- }
-
WriteBatchExecutor::WriteBatchExecutor( const BSONObj& wc,
Client* client,
OpCounters* opCounters,
@@ -89,7 +69,7 @@ namespace mongo {
// Apply each batch item, stopping on an error if we were asked to apply the batch
// sequentially.
size_t numBatchOps = request.sizeWriteOps();
- bool verbose = verboseResponse( request );
+ bool verbose = request.isVerboseWC();
for ( size_t i = 0; i < numBatchOps; i++ ) {
if ( applyWriteItem( BatchItemRef( &request, i ),
@@ -196,6 +176,7 @@ namespace mongo {
// code would already be set.
response->setOk( !response->isErrCodeSet() );
response->setN( stats.numUpdated + stats.numInserted + stats.numDeleted );
+ dassert( response->isValid( NULL ) );
}
namespace {
diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp
index 292584f755b..87ce6d100d5 100644
--- a/src/mongo/db/commands/write_commands/write_commands.cpp
+++ b/src/mongo/db/commands/write_commands/write_commands.cpp
@@ -89,10 +89,14 @@ namespace mongo {
BatchedCommandResponse response;
if ( !request.parseBSON( cmdObj, &errMsg ) || !request.isValid( &errMsg ) ) {
+
// Batch parse failure
response.setOk( false );
+ response.setN( 0 );
response.setErrCode( 99999 );
response.setErrMessage( errMsg );
+
+ dassert( response.isValid( &errMsg ) );
result.appendElements( response.toBSON() );
// TODO
diff --git a/src/mongo/db/field_parser.cpp b/src/mongo/db/field_parser.cpp
index 0ce90b95e9a..7a3a61b3203 100644
--- a/src/mongo/db/field_parser.cpp
+++ b/src/mongo/db/field_parser.cpp
@@ -283,4 +283,28 @@ namespace mongo {
return FIELD_INVALID;
}
+ FieldParser::FieldState FieldParser::extractID( BSONObj doc,
+ const BSONField<BSONObj>& field,
+ BSONObj* out,
+ string* errMsg ) {
+ BSONElement elem = doc[field.name()];
+ if (elem.eoo()) {
+ if (field.hasDefault()) {
+ *out = field.getDefault().firstElement().wrap( "" );
+ return FIELD_DEFAULT;
+ }
+ else {
+ return FIELD_NONE;
+ }
+ }
+
+ if ( elem.type() != Array ) {
+ *out = elem.wrap( "" ).getOwned();
+ return FIELD_SET;
+ }
+
+ _genFieldErrMsg(doc, field, "id", errMsg);
+ return FIELD_INVALID;
+ }
+
} // namespace mongo
diff --git a/src/mongo/db/field_parser.h b/src/mongo/db/field_parser.h
index d814a3cbcd9..36a8776f529 100644
--- a/src/mongo/db/field_parser.h
+++ b/src/mongo/db/field_parser.h
@@ -122,6 +122,15 @@ namespace mongo {
string* errMsg = NULL);
/**
+ * Extracts a document id from a particular field name, which may be of any type but Array.
+ * Wraps the extracted id value in a BSONObj with one element and empty field name.
+ */
+ static FieldState extractID( BSONObj doc,
+ const BSONField<BSONObj>& field,
+ BSONObj* out,
+ string* errMsg = NULL );
+
+ /**
* Extracts a mandatory BSONSerializable structure 'field' from the object 'doc'. Write
* the extracted contents to '*out' if successful or fills '*errMsg', if exising,
* otherwise. This variant relies on T having a parseBSON, which all
diff --git a/src/mongo/s/batch_write_op.cpp b/src/mongo/s/batch_write_op.cpp
index 67c1dda5725..28b15fc9196 100644
--- a/src/mongo/s/batch_write_op.cpp
+++ b/src/mongo/s/batch_write_op.cpp
@@ -32,6 +32,14 @@
namespace mongo {
+ BatchWriteStats::BatchWriteStats() :
+ numInserted( 0 ), numUpserted( 0 ), numUpdated( 0 ), numDeleted( 0 ) {
+ }
+
+ BatchWriteOp::BatchWriteOp() :
+ _clientRequest( NULL ), _writeOps( NULL ), _stats( new BatchWriteStats ) {
+ }
+
void BatchWriteOp::initClientRequest( const BatchedCommandRequest* clientRequest ) {
dassert( clientRequest->isValid( NULL ) );
@@ -288,10 +296,6 @@ namespace mongo {
return errCode == ErrorCodes::WriteConcernFailed;
}
- static bool isItemErrorsOnlyCode( int errCode ) {
- return errCode == ErrorCodes::MultipleErrorsOccurred;
- }
-
// Given *either* a batch error or an array of per-item errors, copies errors we're interested
// in into a TrackedErrorMap
static void trackErrors( const ShardEndpoint& endpoint,
@@ -316,6 +320,28 @@ namespace mongo {
}
}
+ static void incBatchStats( BatchedCommandRequest::BatchType batchType,
+ const BatchedCommandResponse& response,
+ BatchWriteStats* stats ) {
+
+ if ( batchType == BatchedCommandRequest::BatchType_Insert) {
+ stats->numInserted += response.getN();
+ }
+ else if ( batchType == BatchedCommandRequest::BatchType_Update ) {
+ stats->numUpdated += response.getN();
+ if( response.isUpsertDetailsSet() ) {
+ stats->numUpserted += response.sizeUpsertDetails();
+ }
+ else if( response.isSingleUpsertedSet() ) {
+ ++stats->numUpserted;
+ }
+ }
+ else {
+ dassert( batchType == BatchedCommandRequest::BatchType_Delete );
+ stats->numDeleted += response.getN();
+ }
+ }
+
void BatchWriteOp::noteBatchResponse( const TargetedWriteBatch& targetedBatch,
const BatchedCommandResponse& response,
TrackedErrors* trackedErrors ) {
@@ -407,6 +433,41 @@ namespace mongo {
trackErrors( targetedBatch.getEndpoint(), batchError.get(), itemErrors, trackedErrors );
}
+ // Track upserted ids if we need to
+ if ( response.isSingleUpsertedSet() ) {
+
+ // Work backward from the child batch item index to the batch item index
+ int batchIndex = targetedBatch.getWrites()[0]->writeOpRef.first;
+
+ BatchedUpsertDetail* upsertedId = new BatchedUpsertDetail;
+ upsertedId->setIndex( batchIndex );
+ upsertedId->setUpsertedID( response.getSingleUpserted() );
+ _upsertedIds.mutableVector().push_back( upsertedId );
+ }
+ else if ( response.isUpsertDetailsSet() ) {
+
+ const vector<BatchedUpsertDetail*>& upsertedIds = response.getUpsertDetails();
+ for ( vector<BatchedUpsertDetail*>::const_iterator it = upsertedIds.begin();
+ it != upsertedIds.end(); ++it ) {
+
+ // The child upserted details don't have the correct index for the full batch
+ const BatchedUpsertDetail* childUpsertedId = *it;
+
+ // Work backward from the child batch item index to the batch item index
+ int childBatchIndex = childUpsertedId->getIndex();
+ int batchIndex = targetedBatch.getWrites()[childBatchIndex]->writeOpRef.first;
+
+ // Push the upserted id with the correct index into the batch upserted ids
+ BatchedUpsertDetail* upsertedId = new BatchedUpsertDetail;
+ upsertedId->setIndex( batchIndex );
+ upsertedId->setUpsertedID( childUpsertedId->getUpsertedID() );
+ _upsertedIds.mutableVector().push_back( upsertedId );
+ }
+ }
+
+ // Increment stats for this batch
+ incBatchStats( _clientRequest->getBatchType(), response, _stats.get() );
+
// Stop tracking targeted batch
_targeted.erase( &targetedBatch );
}
@@ -415,7 +476,9 @@ namespace mongo {
const BatchedErrorDetail& error ) {
BatchedCommandResponse response;
response.setOk( false );
+ response.setN( 0 );
cloneBatchErrorFrom( error, &response );
+ dassert( response.isValid( NULL ) );
noteBatchResponse( targetedBatch, response, NULL );
}
@@ -533,7 +596,7 @@ namespace mongo {
// Build the per-item errors, if needed
//
- if ( !errOps.empty() ) {
+ if ( !errOps.empty() && _clientRequest->isVerboseWC() ) {
for ( vector<WriteOp*>::iterator it = errOps.begin(); it != errOps.end(); ++it ) {
WriteOp& writeOp = **it;
BatchedErrorDetail* error = new BatchedErrorDetail();
@@ -542,7 +605,25 @@ namespace mongo {
}
}
+ //
+ // Append the upserted ids, if required
+ //
+
+ if ( _upsertedIds.size() != 0 ) {
+ if ( _clientRequest->sizeWriteOps() == 1u ) {
+ batchResp->setSingleUpserted( _upsertedIds.vector().front()->getUpsertedID() );
+ }
+ else if( _clientRequest->isVerboseWC() ) {
+ batchResp->setUpsertDetails( _upsertedIds.vector() );
+ }
+ }
+
+ // Stats
+ int nValue = _stats->numInserted + _stats->numUpdated + _stats->numDeleted;
+ batchResp->setN( nValue );
+
batchResp->setOk( !batchResp->isErrCodeSet() );
+ dassert( batchResp->isValid( NULL ) );
}
BatchWriteOp::~BatchWriteOp() {
diff --git a/src/mongo/s/batch_write_op.h b/src/mongo/s/batch_write_op.h
index f87126f8f76..dbd499c7714 100644
--- a/src/mongo/s/batch_write_op.h
+++ b/src/mongo/s/batch_write_op.h
@@ -46,6 +46,7 @@ namespace mongo {
class TargetedWriteBatch;
struct ShardError;
class TrackedErrors;
+ class BatchWriteStats;
/**
* The BatchWriteOp class manages the lifecycle of a batched write received by mongos. Each
@@ -77,9 +78,7 @@ namespace mongo {
MONGO_DISALLOW_COPYING(BatchWriteOp);
public:
- BatchWriteOp() :
- _clientRequest( NULL ), _writeOps( NULL ) {
- }
+ BatchWriteOp();
~BatchWriteOp();
@@ -157,6 +156,23 @@ namespace mongo {
// Write concern responses from all write batches so far
OwnedPointerVector<ShardError> _wcErrors;
+
+ // Upserted ids for the whole write batch
+ OwnedPointerVector<BatchedUpsertDetail> _upsertedIds;
+
+ // Stats for the entire batch op
+ scoped_ptr<BatchWriteStats> _stats;
+ };
+
+ struct BatchWriteStats {
+
+ BatchWriteStats();
+
+ int numInserted;
+ int numUpserted;
+ int numUpdated;
+ int numDeleted;
+
};
/**
diff --git a/src/mongo/s/batch_write_op_test.cpp b/src/mongo/s/batch_write_op_test.cpp
index 1a96937de93..d07a9e8f916 100644
--- a/src/mongo/s/batch_write_op_test.cpp
+++ b/src/mongo/s/batch_write_op_test.cpp
@@ -82,6 +82,8 @@ namespace {
BatchedCommandResponse response;
response.setOk( true );
+ response.setN( 0 );
+ ASSERT( response.isValid( NULL ) );
batchOp.noteBatchResponse( *targeted.front(), response, NULL );
ASSERT( batchOp.isFinished() );
@@ -103,9 +105,11 @@ namespace {
void setBatchError( const BatchedErrorDetail& error, BatchedCommandResponse* response ) {
response->setOk( false );
+ response->setN( 0 );
response->setErrCode( error.getErrCode() );
response->setErrInfo( error.getErrInfo() );
response->setErrMessage( error.getErrMessage() );
+ ASSERT( response->isValid( NULL ) );
}
TEST(WriteOpTests, TargetSingleError) {
@@ -213,6 +217,8 @@ namespace {
BatchedCommandResponse response;
response.setOk( true );
+ response.setN( 0 );
+ ASSERT( response.isValid( NULL ) );
batchOp.noteBatchResponse( *targeted.front(), response, NULL );
ASSERT( batchOp.isFinished() );
@@ -286,6 +292,8 @@ namespace {
BatchedCommandResponse response;
response.setOk( true );
+ response.setN( 0 );
+ ASSERT( response.isValid( NULL ) );
batchOp.noteBatchResponse( *targeted.front(), response, NULL );
ASSERT( !batchOp.isFinished() );
@@ -363,6 +371,8 @@ namespace {
BatchedCommandResponse response;
response.setOk( true );
+ response.setN( 0 );
+ ASSERT( response.isValid( NULL ) );
batchOp.noteBatchResponse( *targeted.front(), response, NULL );
ASSERT( !batchOp.isFinished() );
@@ -430,6 +440,8 @@ namespace {
// First shard write ok
BatchedCommandResponse response;
response.setOk( true );
+ response.setN( 0 );
+ ASSERT( response.isValid( NULL ) );
batchOp.noteBatchResponse( *targeted.front(), response, NULL );
ASSERT( !batchOp.isFinished() );
@@ -570,6 +582,8 @@ namespace {
BatchedCommandResponse nextResponse;
nextResponse.setOk( true );
+ nextResponse.setN( 0 );
+ ASSERT( nextResponse.isValid( NULL ) );
batchOp.noteBatchResponse( *nextTargeted.front(), nextResponse, NULL );
ASSERT( batchOp.isFinished() );
diff --git a/src/mongo/s/batched_command_request.cpp b/src/mongo/s/batched_command_request.cpp
index 0a326937b77..fe38324c1f7 100644
--- a/src/mongo/s/batched_command_request.cpp
+++ b/src/mongo/s/batched_command_request.cpp
@@ -82,6 +82,20 @@ namespace mongo {
return nss.toString();
}
+ bool BatchedCommandRequest::isVerboseWC() const {
+ if ( !isWriteConcernSet() ) {
+ return true;
+ }
+
+ BSONObj writeConcern = getWriteConcern();
+ BSONElement wElem = writeConcern["w"];
+ if ( !wElem.isNumber() || wElem.Number() != 0 ) {
+ return true;
+ }
+
+ return false;
+ }
+
void BatchedCommandRequest::cloneTo( BatchedCommandRequest* other ) const {
other->_insertReq.reset();
other->_updateReq.reset();
diff --git a/src/mongo/s/batched_command_request.h b/src/mongo/s/batched_command_request.h
index 7cb67621497..e36c03c502d 100644
--- a/src/mongo/s/batched_command_request.h
+++ b/src/mongo/s/batched_command_request.h
@@ -86,6 +86,8 @@ namespace mongo {
// individual field accessors
//
+ bool isVerboseWC() const;
+
void setNS( const StringData& collName );
void unsetNS();
bool isNSSet() const;
diff --git a/src/mongo/s/batched_command_response.cpp b/src/mongo/s/batched_command_response.cpp
index f56710954de..2dbda541565 100644
--- a/src/mongo/s/batched_command_response.cpp
+++ b/src/mongo/s/batched_command_response.cpp
@@ -55,6 +55,12 @@ namespace mongo {
return false;
}
+ // All the mandatory fields must be present.
+ if (!_isNSet) {
+ *errMsg = stream() << "missing " << n.name() << " field";
+ return false;
+ }
+
// upserted and singleUpserted cannot live together
if (_isSingleUpsertedSet && _upsertDetails.get()) {
*errMsg = stream() << "duplicated " << singleUpserted.name() << " field";
@@ -148,14 +154,18 @@ namespace mongo {
_n = tempN;
}
- fieldState = FieldParser::extract(source, singleUpserted, &_singleUpserted, errMsg);
- if (fieldState == FieldParser::FIELD_INVALID) return false;
+ // singleUpserted and upsertDetails have the same field name, but are distinguished
+ // by type. First try parsing singleUpserted, if that doesn't work, try upsertDetails
+ fieldState = FieldParser::extractID(source, singleUpserted, &_singleUpserted, errMsg);
_isSingleUpsertedSet = fieldState == FieldParser::FIELD_SET;
- std::vector<BatchedUpsertDetail*>* tempUpsertDetails = NULL;
- fieldState = FieldParser::extract(source, upsertDetails, &tempUpsertDetails, errMsg);
- if (fieldState == FieldParser::FIELD_INVALID) return false;
- if (fieldState == FieldParser::FIELD_SET) _upsertDetails.reset(tempUpsertDetails);
+ // Try upsertDetails if singleUpserted didn't work
+ if (fieldState == FieldParser::FIELD_INVALID) {
+ std::vector<BatchedUpsertDetail*>* tempUpsertDetails = NULL;
+ fieldState = FieldParser::extract( source, upsertDetails, &tempUpsertDetails, errMsg );
+ if ( fieldState == FieldParser::FIELD_INVALID ) return false;
+ if ( fieldState == FieldParser::FIELD_SET ) _upsertDetails.reset( tempUpsertDetails );
+ }
fieldState = FieldParser::extract(source, lastOp, &_lastOp, errMsg);
if (fieldState == FieldParser::FIELD_INVALID) return false;
@@ -345,7 +355,7 @@ namespace mongo {
}
void BatchedCommandResponse::setSingleUpserted(const BSONObj& singleUpserted) {
- _singleUpserted = singleUpserted.getOwned();
+ _singleUpserted = singleUpserted.firstElement().wrap( "" ).getOwned();
_isSingleUpsertedSet = true;
}
diff --git a/src/mongo/s/batched_command_response.h b/src/mongo/s/batched_command_response.h
index 79458ac441d..c099f60aa60 100644
--- a/src/mongo/s/batched_command_response.h
+++ b/src/mongo/s/batched_command_response.h
@@ -45,7 +45,7 @@ namespace mongo {
static const BSONField<BSONObj> errInfo;
static const BSONField<string> errMessage;
static const BSONField<long long> n;
- static const BSONField<BSONObj> singleUpserted;
+ static const BSONField<BSONObj> singleUpserted; // ID type
static const BSONField<std::vector<BatchedUpsertDetail*> > upsertDetails;
static const BSONField<Date_t> lastOp;
static const BSONField<std::vector<BatchedErrorDetail*> > errDetails;
@@ -144,11 +144,11 @@ namespace mongo {
string _errMessage;
bool _isErrMessageSet;
- // (O) number of documents affected
+ // (M) number of documents affected
long long _n;
bool _isNSet;
- // (0) "promoted" _upserted, if the corresponding request contained only one batch item
+ // (O) "promoted" _upserted, if the corresponding request contained only one batch item
// Should only be present if _upserted is not.
BSONObj _singleUpserted;
bool _isSingleUpsertedSet;
diff --git a/src/mongo/s/batched_upsert_detail.cpp b/src/mongo/s/batched_upsert_detail.cpp
index b553383c119..d554e8a3ef6 100644
--- a/src/mongo/s/batched_upsert_detail.cpp
+++ b/src/mongo/s/batched_upsert_detail.cpp
@@ -89,7 +89,7 @@ namespace mongo {
if (fieldState == FieldParser::FIELD_INVALID) return false;
_isIndexSet = fieldState == FieldParser::FIELD_SET;
- fieldState = FieldParser::extract(source, upsertedID, &_upsertedID, errMsg);
+ fieldState = FieldParser::extractID(source, upsertedID, &_upsertedID, errMsg);
if (fieldState == FieldParser::FIELD_INVALID) return false;
_isUpsertedIDSet = fieldState == FieldParser::FIELD_SET;
@@ -138,7 +138,7 @@ namespace mongo {
}
void BatchedUpsertDetail::setUpsertedID(const BSONObj& upsertedID) {
- _upsertedID = upsertedID.getOwned();
+ _upsertedID = upsertedID.firstElement().wrap( "" ).getOwned();
_isUpsertedIDSet = true;
}
diff --git a/src/mongo/s/batched_upsert_detail.h b/src/mongo/s/batched_upsert_detail.h
index b2590a71990..cff8122f7ee 100644
--- a/src/mongo/s/batched_upsert_detail.h
+++ b/src/mongo/s/batched_upsert_detail.h
@@ -50,7 +50,7 @@ namespace mongo {
//
static const BSONField<int> index;
- static const BSONField<BSONObj> upsertedID;
+ static const BSONField<BSONObj> upsertedID; // ID type
//
// construction / destruction
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 25d8be3ce9b..a35e349f574 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -213,8 +213,11 @@ namespace mongo {
if ( !request.parseBSON( cmdObj, &errMsg ) || !request.isValid( &errMsg ) ) {
// Batch parse failure
response.setOk( false );
+ response.setN( 0 );
response.setErrCode( ErrorCodes::FailedToParse );
response.setErrMessage( errMsg );
+
+ dassert( response.isValid( &errMsg ) );
result.appendElements( response.toBSON() );
// TODO
@@ -230,8 +233,11 @@ namespace mongo {
if ( request.sizeWriteOps() != 1 || request.isWriteConcernSet() ) {
// Invalid request to create index
response.setOk( false );
+ response.setN( 0 );
response.setErrCode( ErrorCodes::CannotCreateIndex );
response.setErrMessage( "invalid batch request for index creation" );
+
+ dassert( response.isValid( &errMsg ) );
result.appendElements( response.toBSON() );
return false;
diff --git a/src/mongo/s/mock_multi_command.h b/src/mongo/s/mock_multi_command.h
index 2f8aa65f0e2..feca96c5ca1 100644
--- a/src/mongo/s/mock_multi_command.h
+++ b/src/mongo/s/mock_multi_command.h
@@ -103,9 +103,11 @@ namespace mongo {
if ( NULL == mockEndpoint ) {
batchResponse->setOk( true );
+ batchResponse->setN( 0 ); // TODO: Make this accurate
}
else {
batchResponse->setOk( false );
+ batchResponse->setN( 0 );
batchResponse->setErrCode( mockEndpoint->error.getErrCode() );
if ( mockEndpoint->error.isErrInfoSet() ) batchResponse->setErrInfo( mockEndpoint
->error.getErrInfo() );
@@ -113,6 +115,8 @@ namespace mongo {
delete mockEndpoint;
}
+ string errMsg;
+ ASSERT( batchResponse->isValid( &errMsg ) );
return Status::OK();
}