diff options
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/commands/write_commands/batch_executor.cpp | 23 | ||||
-rw-r--r-- | src/mongo/db/commands/write_commands/write_commands.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/field_parser.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/field_parser.h | 9 | ||||
-rw-r--r-- | src/mongo/s/batch_write_op.cpp | 91 | ||||
-rw-r--r-- | src/mongo/s/batch_write_op.h | 22 | ||||
-rw-r--r-- | src/mongo/s/batch_write_op_test.cpp | 14 | ||||
-rw-r--r-- | src/mongo/s/batched_command_request.cpp | 14 | ||||
-rw-r--r-- | src/mongo/s/batched_command_request.h | 2 | ||||
-rw-r--r-- | src/mongo/s/batched_command_response.cpp | 24 | ||||
-rw-r--r-- | src/mongo/s/batched_command_response.h | 6 | ||||
-rw-r--r-- | src/mongo/s/batched_upsert_detail.cpp | 4 | ||||
-rw-r--r-- | src/mongo/s/batched_upsert_detail.h | 2 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_write_cmd.cpp | 6 | ||||
-rw-r--r-- | src/mongo/s/mock_multi_command.h | 4 |
15 files changed, 207 insertions, 42 deletions
diff --git a/src/mongo/db/commands/write_commands/batch_executor.cpp b/src/mongo/db/commands/write_commands/batch_executor.cpp index 51868e69ea2..d7116443e17 100644 --- a/src/mongo/db/commands/write_commands/batch_executor.cpp +++ b/src/mongo/db/commands/write_commands/batch_executor.cpp @@ -48,26 +48,6 @@ namespace mongo { - namespace { - - // We're assuming here that writeConcern was parsed and is valid or is not - // present. This is just a quick check of whether is is {w:0} or not. - bool verboseResponse( const BatchedCommandRequest& request ) { - if ( !request.isWriteConcernSet() ) { - return true; - } - - BSONObj writeConcern = request.getWriteConcern(); - BSONElement wElem = writeConcern["w"]; - if ( !wElem.isNumber() || wElem.Number() != 0 ) { - return true; - } - - return false; - } - - } - WriteBatchExecutor::WriteBatchExecutor( const BSONObj& wc, Client* client, OpCounters* opCounters, @@ -89,7 +69,7 @@ namespace mongo { // Apply each batch item, stopping on an error if we were asked to apply the batch // sequentially. size_t numBatchOps = request.sizeWriteOps(); - bool verbose = verboseResponse( request ); + bool verbose = request.isVerboseWC(); for ( size_t i = 0; i < numBatchOps; i++ ) { if ( applyWriteItem( BatchItemRef( &request, i ), @@ -196,6 +176,7 @@ namespace mongo { // code would already be set. response->setOk( !response->isErrCodeSet() ); response->setN( stats.numUpdated + stats.numInserted + stats.numDeleted ); + dassert( response->isValid( NULL ) ); } namespace { diff --git a/src/mongo/db/commands/write_commands/write_commands.cpp b/src/mongo/db/commands/write_commands/write_commands.cpp index 292584f755b..87ce6d100d5 100644 --- a/src/mongo/db/commands/write_commands/write_commands.cpp +++ b/src/mongo/db/commands/write_commands/write_commands.cpp @@ -89,10 +89,14 @@ namespace mongo { BatchedCommandResponse response; if ( !request.parseBSON( cmdObj, &errMsg ) || !request.isValid( &errMsg ) ) { + // Batch parse failure response.setOk( false ); + response.setN( 0 ); response.setErrCode( 99999 ); response.setErrMessage( errMsg ); + + dassert( response.isValid( &errMsg ) ); result.appendElements( response.toBSON() ); // TODO diff --git a/src/mongo/db/field_parser.cpp b/src/mongo/db/field_parser.cpp index 0ce90b95e9a..7a3a61b3203 100644 --- a/src/mongo/db/field_parser.cpp +++ b/src/mongo/db/field_parser.cpp @@ -283,4 +283,28 @@ namespace mongo { return FIELD_INVALID; } + FieldParser::FieldState FieldParser::extractID( BSONObj doc, + const BSONField<BSONObj>& field, + BSONObj* out, + string* errMsg ) { + BSONElement elem = doc[field.name()]; + if (elem.eoo()) { + if (field.hasDefault()) { + *out = field.getDefault().firstElement().wrap( "" ); + return FIELD_DEFAULT; + } + else { + return FIELD_NONE; + } + } + + if ( elem.type() != Array ) { + *out = elem.wrap( "" ).getOwned(); + return FIELD_SET; + } + + _genFieldErrMsg(doc, field, "id", errMsg); + return FIELD_INVALID; + } + } // namespace mongo diff --git a/src/mongo/db/field_parser.h b/src/mongo/db/field_parser.h index d814a3cbcd9..36a8776f529 100644 --- a/src/mongo/db/field_parser.h +++ b/src/mongo/db/field_parser.h @@ -122,6 +122,15 @@ namespace mongo { string* errMsg = NULL); /** + * Extracts a document id from a particular field name, which may be of any type but Array. + * Wraps the extracted id value in a BSONObj with one element and empty field name. + */ + static FieldState extractID( BSONObj doc, + const BSONField<BSONObj>& field, + BSONObj* out, + string* errMsg = NULL ); + + /** * Extracts a mandatory BSONSerializable structure 'field' from the object 'doc'. Write * the extracted contents to '*out' if successful or fills '*errMsg', if exising, * otherwise. This variant relies on T having a parseBSON, which all diff --git a/src/mongo/s/batch_write_op.cpp b/src/mongo/s/batch_write_op.cpp index 67c1dda5725..28b15fc9196 100644 --- a/src/mongo/s/batch_write_op.cpp +++ b/src/mongo/s/batch_write_op.cpp @@ -32,6 +32,14 @@ namespace mongo { + BatchWriteStats::BatchWriteStats() : + numInserted( 0 ), numUpserted( 0 ), numUpdated( 0 ), numDeleted( 0 ) { + } + + BatchWriteOp::BatchWriteOp() : + _clientRequest( NULL ), _writeOps( NULL ), _stats( new BatchWriteStats ) { + } + void BatchWriteOp::initClientRequest( const BatchedCommandRequest* clientRequest ) { dassert( clientRequest->isValid( NULL ) ); @@ -288,10 +296,6 @@ namespace mongo { return errCode == ErrorCodes::WriteConcernFailed; } - static bool isItemErrorsOnlyCode( int errCode ) { - return errCode == ErrorCodes::MultipleErrorsOccurred; - } - // Given *either* a batch error or an array of per-item errors, copies errors we're interested // in into a TrackedErrorMap static void trackErrors( const ShardEndpoint& endpoint, @@ -316,6 +320,28 @@ namespace mongo { } } + static void incBatchStats( BatchedCommandRequest::BatchType batchType, + const BatchedCommandResponse& response, + BatchWriteStats* stats ) { + + if ( batchType == BatchedCommandRequest::BatchType_Insert) { + stats->numInserted += response.getN(); + } + else if ( batchType == BatchedCommandRequest::BatchType_Update ) { + stats->numUpdated += response.getN(); + if( response.isUpsertDetailsSet() ) { + stats->numUpserted += response.sizeUpsertDetails(); + } + else if( response.isSingleUpsertedSet() ) { + ++stats->numUpserted; + } + } + else { + dassert( batchType == BatchedCommandRequest::BatchType_Delete ); + stats->numDeleted += response.getN(); + } + } + void BatchWriteOp::noteBatchResponse( const TargetedWriteBatch& targetedBatch, const BatchedCommandResponse& response, TrackedErrors* trackedErrors ) { @@ -407,6 +433,41 @@ namespace mongo { trackErrors( targetedBatch.getEndpoint(), batchError.get(), itemErrors, trackedErrors ); } + // Track upserted ids if we need to + if ( response.isSingleUpsertedSet() ) { + + // Work backward from the child batch item index to the batch item index + int batchIndex = targetedBatch.getWrites()[0]->writeOpRef.first; + + BatchedUpsertDetail* upsertedId = new BatchedUpsertDetail; + upsertedId->setIndex( batchIndex ); + upsertedId->setUpsertedID( response.getSingleUpserted() ); + _upsertedIds.mutableVector().push_back( upsertedId ); + } + else if ( response.isUpsertDetailsSet() ) { + + const vector<BatchedUpsertDetail*>& upsertedIds = response.getUpsertDetails(); + for ( vector<BatchedUpsertDetail*>::const_iterator it = upsertedIds.begin(); + it != upsertedIds.end(); ++it ) { + + // The child upserted details don't have the correct index for the full batch + const BatchedUpsertDetail* childUpsertedId = *it; + + // Work backward from the child batch item index to the batch item index + int childBatchIndex = childUpsertedId->getIndex(); + int batchIndex = targetedBatch.getWrites()[childBatchIndex]->writeOpRef.first; + + // Push the upserted id with the correct index into the batch upserted ids + BatchedUpsertDetail* upsertedId = new BatchedUpsertDetail; + upsertedId->setIndex( batchIndex ); + upsertedId->setUpsertedID( childUpsertedId->getUpsertedID() ); + _upsertedIds.mutableVector().push_back( upsertedId ); + } + } + + // Increment stats for this batch + incBatchStats( _clientRequest->getBatchType(), response, _stats.get() ); + // Stop tracking targeted batch _targeted.erase( &targetedBatch ); } @@ -415,7 +476,9 @@ namespace mongo { const BatchedErrorDetail& error ) { BatchedCommandResponse response; response.setOk( false ); + response.setN( 0 ); cloneBatchErrorFrom( error, &response ); + dassert( response.isValid( NULL ) ); noteBatchResponse( targetedBatch, response, NULL ); } @@ -533,7 +596,7 @@ namespace mongo { // Build the per-item errors, if needed // - if ( !errOps.empty() ) { + if ( !errOps.empty() && _clientRequest->isVerboseWC() ) { for ( vector<WriteOp*>::iterator it = errOps.begin(); it != errOps.end(); ++it ) { WriteOp& writeOp = **it; BatchedErrorDetail* error = new BatchedErrorDetail(); @@ -542,7 +605,25 @@ namespace mongo { } } + // + // Append the upserted ids, if required + // + + if ( _upsertedIds.size() != 0 ) { + if ( _clientRequest->sizeWriteOps() == 1u ) { + batchResp->setSingleUpserted( _upsertedIds.vector().front()->getUpsertedID() ); + } + else if( _clientRequest->isVerboseWC() ) { + batchResp->setUpsertDetails( _upsertedIds.vector() ); + } + } + + // Stats + int nValue = _stats->numInserted + _stats->numUpdated + _stats->numDeleted; + batchResp->setN( nValue ); + batchResp->setOk( !batchResp->isErrCodeSet() ); + dassert( batchResp->isValid( NULL ) ); } BatchWriteOp::~BatchWriteOp() { diff --git a/src/mongo/s/batch_write_op.h b/src/mongo/s/batch_write_op.h index f87126f8f76..dbd499c7714 100644 --- a/src/mongo/s/batch_write_op.h +++ b/src/mongo/s/batch_write_op.h @@ -46,6 +46,7 @@ namespace mongo { class TargetedWriteBatch; struct ShardError; class TrackedErrors; + class BatchWriteStats; /** * The BatchWriteOp class manages the lifecycle of a batched write received by mongos. Each @@ -77,9 +78,7 @@ namespace mongo { MONGO_DISALLOW_COPYING(BatchWriteOp); public: - BatchWriteOp() : - _clientRequest( NULL ), _writeOps( NULL ) { - } + BatchWriteOp(); ~BatchWriteOp(); @@ -157,6 +156,23 @@ namespace mongo { // Write concern responses from all write batches so far OwnedPointerVector<ShardError> _wcErrors; + + // Upserted ids for the whole write batch + OwnedPointerVector<BatchedUpsertDetail> _upsertedIds; + + // Stats for the entire batch op + scoped_ptr<BatchWriteStats> _stats; + }; + + struct BatchWriteStats { + + BatchWriteStats(); + + int numInserted; + int numUpserted; + int numUpdated; + int numDeleted; + }; /** diff --git a/src/mongo/s/batch_write_op_test.cpp b/src/mongo/s/batch_write_op_test.cpp index 1a96937de93..d07a9e8f916 100644 --- a/src/mongo/s/batch_write_op_test.cpp +++ b/src/mongo/s/batch_write_op_test.cpp @@ -82,6 +82,8 @@ namespace { BatchedCommandResponse response; response.setOk( true ); + response.setN( 0 ); + ASSERT( response.isValid( NULL ) ); batchOp.noteBatchResponse( *targeted.front(), response, NULL ); ASSERT( batchOp.isFinished() ); @@ -103,9 +105,11 @@ namespace { void setBatchError( const BatchedErrorDetail& error, BatchedCommandResponse* response ) { response->setOk( false ); + response->setN( 0 ); response->setErrCode( error.getErrCode() ); response->setErrInfo( error.getErrInfo() ); response->setErrMessage( error.getErrMessage() ); + ASSERT( response->isValid( NULL ) ); } TEST(WriteOpTests, TargetSingleError) { @@ -213,6 +217,8 @@ namespace { BatchedCommandResponse response; response.setOk( true ); + response.setN( 0 ); + ASSERT( response.isValid( NULL ) ); batchOp.noteBatchResponse( *targeted.front(), response, NULL ); ASSERT( batchOp.isFinished() ); @@ -286,6 +292,8 @@ namespace { BatchedCommandResponse response; response.setOk( true ); + response.setN( 0 ); + ASSERT( response.isValid( NULL ) ); batchOp.noteBatchResponse( *targeted.front(), response, NULL ); ASSERT( !batchOp.isFinished() ); @@ -363,6 +371,8 @@ namespace { BatchedCommandResponse response; response.setOk( true ); + response.setN( 0 ); + ASSERT( response.isValid( NULL ) ); batchOp.noteBatchResponse( *targeted.front(), response, NULL ); ASSERT( !batchOp.isFinished() ); @@ -430,6 +440,8 @@ namespace { // First shard write ok BatchedCommandResponse response; response.setOk( true ); + response.setN( 0 ); + ASSERT( response.isValid( NULL ) ); batchOp.noteBatchResponse( *targeted.front(), response, NULL ); ASSERT( !batchOp.isFinished() ); @@ -570,6 +582,8 @@ namespace { BatchedCommandResponse nextResponse; nextResponse.setOk( true ); + nextResponse.setN( 0 ); + ASSERT( nextResponse.isValid( NULL ) ); batchOp.noteBatchResponse( *nextTargeted.front(), nextResponse, NULL ); ASSERT( batchOp.isFinished() ); diff --git a/src/mongo/s/batched_command_request.cpp b/src/mongo/s/batched_command_request.cpp index 0a326937b77..fe38324c1f7 100644 --- a/src/mongo/s/batched_command_request.cpp +++ b/src/mongo/s/batched_command_request.cpp @@ -82,6 +82,20 @@ namespace mongo { return nss.toString(); } + bool BatchedCommandRequest::isVerboseWC() const { + if ( !isWriteConcernSet() ) { + return true; + } + + BSONObj writeConcern = getWriteConcern(); + BSONElement wElem = writeConcern["w"]; + if ( !wElem.isNumber() || wElem.Number() != 0 ) { + return true; + } + + return false; + } + void BatchedCommandRequest::cloneTo( BatchedCommandRequest* other ) const { other->_insertReq.reset(); other->_updateReq.reset(); diff --git a/src/mongo/s/batched_command_request.h b/src/mongo/s/batched_command_request.h index 7cb67621497..e36c03c502d 100644 --- a/src/mongo/s/batched_command_request.h +++ b/src/mongo/s/batched_command_request.h @@ -86,6 +86,8 @@ namespace mongo { // individual field accessors // + bool isVerboseWC() const; + void setNS( const StringData& collName ); void unsetNS(); bool isNSSet() const; diff --git a/src/mongo/s/batched_command_response.cpp b/src/mongo/s/batched_command_response.cpp index f56710954de..2dbda541565 100644 --- a/src/mongo/s/batched_command_response.cpp +++ b/src/mongo/s/batched_command_response.cpp @@ -55,6 +55,12 @@ namespace mongo { return false; } + // All the mandatory fields must be present. + if (!_isNSet) { + *errMsg = stream() << "missing " << n.name() << " field"; + return false; + } + // upserted and singleUpserted cannot live together if (_isSingleUpsertedSet && _upsertDetails.get()) { *errMsg = stream() << "duplicated " << singleUpserted.name() << " field"; @@ -148,14 +154,18 @@ namespace mongo { _n = tempN; } - fieldState = FieldParser::extract(source, singleUpserted, &_singleUpserted, errMsg); - if (fieldState == FieldParser::FIELD_INVALID) return false; + // singleUpserted and upsertDetails have the same field name, but are distinguished + // by type. First try parsing singleUpserted, if that doesn't work, try upsertDetails + fieldState = FieldParser::extractID(source, singleUpserted, &_singleUpserted, errMsg); _isSingleUpsertedSet = fieldState == FieldParser::FIELD_SET; - std::vector<BatchedUpsertDetail*>* tempUpsertDetails = NULL; - fieldState = FieldParser::extract(source, upsertDetails, &tempUpsertDetails, errMsg); - if (fieldState == FieldParser::FIELD_INVALID) return false; - if (fieldState == FieldParser::FIELD_SET) _upsertDetails.reset(tempUpsertDetails); + // Try upsertDetails if singleUpserted didn't work + if (fieldState == FieldParser::FIELD_INVALID) { + std::vector<BatchedUpsertDetail*>* tempUpsertDetails = NULL; + fieldState = FieldParser::extract( source, upsertDetails, &tempUpsertDetails, errMsg ); + if ( fieldState == FieldParser::FIELD_INVALID ) return false; + if ( fieldState == FieldParser::FIELD_SET ) _upsertDetails.reset( tempUpsertDetails ); + } fieldState = FieldParser::extract(source, lastOp, &_lastOp, errMsg); if (fieldState == FieldParser::FIELD_INVALID) return false; @@ -345,7 +355,7 @@ namespace mongo { } void BatchedCommandResponse::setSingleUpserted(const BSONObj& singleUpserted) { - _singleUpserted = singleUpserted.getOwned(); + _singleUpserted = singleUpserted.firstElement().wrap( "" ).getOwned(); _isSingleUpsertedSet = true; } diff --git a/src/mongo/s/batched_command_response.h b/src/mongo/s/batched_command_response.h index 79458ac441d..c099f60aa60 100644 --- a/src/mongo/s/batched_command_response.h +++ b/src/mongo/s/batched_command_response.h @@ -45,7 +45,7 @@ namespace mongo { static const BSONField<BSONObj> errInfo; static const BSONField<string> errMessage; static const BSONField<long long> n; - static const BSONField<BSONObj> singleUpserted; + static const BSONField<BSONObj> singleUpserted; // ID type static const BSONField<std::vector<BatchedUpsertDetail*> > upsertDetails; static const BSONField<Date_t> lastOp; static const BSONField<std::vector<BatchedErrorDetail*> > errDetails; @@ -144,11 +144,11 @@ namespace mongo { string _errMessage; bool _isErrMessageSet; - // (O) number of documents affected + // (M) number of documents affected long long _n; bool _isNSet; - // (0) "promoted" _upserted, if the corresponding request contained only one batch item + // (O) "promoted" _upserted, if the corresponding request contained only one batch item // Should only be present if _upserted is not. BSONObj _singleUpserted; bool _isSingleUpsertedSet; diff --git a/src/mongo/s/batched_upsert_detail.cpp b/src/mongo/s/batched_upsert_detail.cpp index b553383c119..d554e8a3ef6 100644 --- a/src/mongo/s/batched_upsert_detail.cpp +++ b/src/mongo/s/batched_upsert_detail.cpp @@ -89,7 +89,7 @@ namespace mongo { if (fieldState == FieldParser::FIELD_INVALID) return false; _isIndexSet = fieldState == FieldParser::FIELD_SET; - fieldState = FieldParser::extract(source, upsertedID, &_upsertedID, errMsg); + fieldState = FieldParser::extractID(source, upsertedID, &_upsertedID, errMsg); if (fieldState == FieldParser::FIELD_INVALID) return false; _isUpsertedIDSet = fieldState == FieldParser::FIELD_SET; @@ -138,7 +138,7 @@ namespace mongo { } void BatchedUpsertDetail::setUpsertedID(const BSONObj& upsertedID) { - _upsertedID = upsertedID.getOwned(); + _upsertedID = upsertedID.firstElement().wrap( "" ).getOwned(); _isUpsertedIDSet = true; } diff --git a/src/mongo/s/batched_upsert_detail.h b/src/mongo/s/batched_upsert_detail.h index b2590a71990..cff8122f7ee 100644 --- a/src/mongo/s/batched_upsert_detail.h +++ b/src/mongo/s/batched_upsert_detail.h @@ -50,7 +50,7 @@ namespace mongo { // static const BSONField<int> index; - static const BSONField<BSONObj> upsertedID; + static const BSONField<BSONObj> upsertedID; // ID type // // construction / destruction diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 25d8be3ce9b..a35e349f574 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -213,8 +213,11 @@ namespace mongo { if ( !request.parseBSON( cmdObj, &errMsg ) || !request.isValid( &errMsg ) ) { // Batch parse failure response.setOk( false ); + response.setN( 0 ); response.setErrCode( ErrorCodes::FailedToParse ); response.setErrMessage( errMsg ); + + dassert( response.isValid( &errMsg ) ); result.appendElements( response.toBSON() ); // TODO @@ -230,8 +233,11 @@ namespace mongo { if ( request.sizeWriteOps() != 1 || request.isWriteConcernSet() ) { // Invalid request to create index response.setOk( false ); + response.setN( 0 ); response.setErrCode( ErrorCodes::CannotCreateIndex ); response.setErrMessage( "invalid batch request for index creation" ); + + dassert( response.isValid( &errMsg ) ); result.appendElements( response.toBSON() ); return false; diff --git a/src/mongo/s/mock_multi_command.h b/src/mongo/s/mock_multi_command.h index 2f8aa65f0e2..feca96c5ca1 100644 --- a/src/mongo/s/mock_multi_command.h +++ b/src/mongo/s/mock_multi_command.h @@ -103,9 +103,11 @@ namespace mongo { if ( NULL == mockEndpoint ) { batchResponse->setOk( true ); + batchResponse->setN( 0 ); // TODO: Make this accurate } else { batchResponse->setOk( false ); + batchResponse->setN( 0 ); batchResponse->setErrCode( mockEndpoint->error.getErrCode() ); if ( mockEndpoint->error.isErrInfoSet() ) batchResponse->setErrInfo( mockEndpoint ->error.getErrInfo() ); @@ -113,6 +115,8 @@ namespace mongo { delete mockEndpoint; } + string errMsg; + ASSERT( batchResponse->isValid( &errMsg ) ); return Status::OK(); } |