/**
 *    Copyright (C) 2013 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#include "mongo/s/write_ops/batch_write_op.h"

#include "mongo/base/error_codes.h"

namespace mongo {

    using std::unique_ptr;
    using std::make_pair;
    using std::set;
    using std::stringstream;
    using std::vector;

    /**
     * Returns a new write concern that copies every field from the original document but
     * sets w to 1. This is intended for upgrading a { w: 0 } write concern to { w: 1 }.
     */
    static BSONObj upgradeWriteConcern( const BSONObj& origWriteConcern ) {

        BSONObjIterator iter( origWriteConcern );
        BSONObjBuilder newWriteConcern;

        while ( iter.more() ) {
            BSONElement elem( iter.next() );

            if ( strncmp( elem.fieldName(), "w", 2 ) == 0 ) {
                newWriteConcern.append( "w", 1 );
            }
            else {
                newWriteConcern.append( elem );
            }
        }

        return newWriteConcern.obj();
    }
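    //
    // Example (illustrative values): upgrading { w: 0, wtimeout: 500 } yields
    // { w: 1, wtimeout: 500 } - only the "w" field is rewritten, every other field is
    // copied through unchanged.
    //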
    BatchWriteStats::BatchWriteStats() :
        numInserted( 0 ), numUpserted( 0 ), numMatched( 0 ), numModified( 0 ), numDeleted( 0 ) {
    }

    BatchWriteOp::BatchWriteOp() :
        _clientRequest( NULL ), _writeOps( NULL ), _stats( new BatchWriteStats ) {
    }

    void BatchWriteOp::initClientRequest( const BatchedCommandRequest* clientRequest ) {
        dassert( clientRequest->isValid( NULL ) );

        size_t numWriteOps = clientRequest->sizeWriteOps();
        _writeOps = static_cast<WriteOp*>( ::operator new[]( numWriteOps * sizeof(WriteOp) ) );

        for ( size_t i = 0; i < numWriteOps; ++i ) {
            // Don't want to have to define what an empty WriteOp means, so construct in-place
            new ( &_writeOps[i] ) WriteOp( BatchItemRef( clientRequest, i ) );
        }

        _clientRequest = clientRequest;
    }

    // Arbitrary endpoint ordering, needed for grouping by endpoint
    static int compareEndpoints( const ShardEndpoint* endpointA, const ShardEndpoint* endpointB ) {

        int shardNameDiff = endpointA->shardName.compare( endpointB->shardName );
        if ( shardNameDiff != 0 ) return shardNameDiff;

        long shardVersionDiff =
            endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong();
        if ( shardVersionDiff != 0 ) return shardVersionDiff;

        int shardEpochDiff =
            endpointA->shardVersion.epoch().compare( endpointB->shardVersion.epoch() );
        return shardEpochDiff;
    }

    namespace {

        //
        // Types for comparing shard endpoints in a map
        //

        struct EndpointComp {
            bool operator()( const ShardEndpoint* endpointA,
                             const ShardEndpoint* endpointB ) const {
                return compareEndpoints( endpointA, endpointB ) < 0;
            }
        };

        typedef std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp> TargetedBatchMap;

        //
        // Types for tracking batch sizes
        //

        struct BatchSize {
            BatchSize() :
                numOps( 0 ), sizeBytes( 0 ) {
            }

            int numOps;
            int sizeBytes;
        };

        typedef std::map<const ShardEndpoint*, BatchSize, EndpointComp> TargetedBatchSizeMap;
    }

    static void buildTargetError( const Status& errStatus, WriteErrorDetail* details ) {
        details->setErrCode( errStatus.code() );
        details->setErrMessage( errStatus.reason() );
    }

    // Helper to determine whether a number of targeted writes require a new targeted batch
    static bool isNewBatchRequired( const vector<TargetedWrite*>& writes,
                                    const TargetedBatchMap& batchMap ) {

        for ( vector<TargetedWrite*>::const_iterator it = writes.begin(); it != writes.end();
            ++it ) {

            TargetedWrite* write = *it;
            if ( batchMap.find( &write->endpoint ) == batchMap.end() ) {
                return true;
            }
        }

        return false;
    }

    // MAGIC NUMBERS
    //
    // Before serializing updates/deletes we don't know exactly how large their serialized form
    // will be, but we have to break batches before serializing - so we pad each op with a
    // conservative overhead estimate.
    //
    // TODO: Revisit when we revisit command limits in general
    static const int kEstUpdateOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
    static const int kEstDeleteOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;

    static int getWriteSizeBytes( const WriteOp& writeOp ) {

        const BatchItemRef& item = writeOp.getWriteItem();
        BatchedCommandRequest::BatchType batchType = item.getOpType();

        if ( batchType == BatchedCommandRequest::BatchType_Insert ) {
            return item.getDocument().objsize();
        }
        else if ( batchType == BatchedCommandRequest::BatchType_Update ) {
            // Note: Be conservative here - it's okay if we send slightly too many batches
            int estSize = item.getUpdate()->getQuery().objsize()
                          + item.getUpdate()->getUpdateExpr().objsize()
                          + kEstUpdateOverheadBytes;
            dassert( estSize >= item.getUpdate()->toBSON().objsize() );
            return estSize;
        }
        else {
            dassert( batchType == BatchedCommandRequest::BatchType_Delete );
            // Note: Be conservative here - it's okay if we send slightly too many batches
            int estSize = item.getDelete()->getQuery().objsize() + kEstDeleteOverheadBytes;
            dassert( estSize >= item.getDelete()->toBSON().objsize() );
            return estSize;
        }
    }
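    //
    // Example (illustrative values): for an update { q: { _id: 1 }, u: { $set: { x: 2 } } }
    // the estimate is objsize( q ) + objsize( u ) + kEstUpdateOverheadBytes. The overhead
    // constant pads for what hasn't been serialized yet (wrapper field names, flags such as
    // upsert/multi), which is why the dasserts above can require the estimate to be >= the
    // real serialized size.
    //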
    // Helper to determine whether adding these writes to an existing targeted batch would push
    // it over the maximum op count or the maximum size in bytes
    static bool wouldMakeBatchesTooBig( const vector<TargetedWrite*>& writes,
                                        int writeSizeBytes,
                                        const TargetedBatchSizeMap& batchSizes ) {

        for ( vector<TargetedWrite*>::const_iterator it = writes.begin(); it != writes.end();
            ++it ) {

            const TargetedWrite* write = *it;
            TargetedBatchSizeMap::const_iterator seenIt = batchSizes.find( &write->endpoint );

            if ( seenIt == batchSizes.end() ) {
                // If this is the first item in the batch, it can't be too big
                continue;
            }

            const BatchSize& batchSize = seenIt->second;

            if ( batchSize.numOps >= static_cast<int>( BatchedCommandRequest::kMaxWriteBatchSize ) ) {
                // Too many items in batch
                return true;
            }

            if ( batchSize.sizeBytes + writeSizeBytes > BSONObjMaxUserSize ) {
                // Batch would be too big
                return true;
            }
        }

        return false;
    }

    // Helper function to cancel all the write ops of targeted batches in a map
    static void cancelBatches( const WriteErrorDetail& why,
                               WriteOp* writeOps,
                               TargetedBatchMap* batchMap ) {

        set<WriteOp*> targetedWriteOps;

        // Collect all the writeOps that are currently targeted
        for ( TargetedBatchMap::iterator it = batchMap->begin(); it != batchMap->end(); ) {

            TargetedWriteBatch* batch = it->second;
            const vector<TargetedWrite*>& writes = batch->getWrites();

            for ( vector<TargetedWrite*>::const_iterator writeIt = writes.begin();
                writeIt != writes.end(); ++writeIt ) {

                TargetedWrite* write = *writeIt;

                // NOTE: We may repeatedly cancel a write op here, but that's fast and we want
                // to cancel before erasing the TargetedWrite* (which owns the cancelled
                // targeting info) for reporting reasons.
                writeOps[write->writeOpRef.first].cancelWrites( &why );
            }

            // Note that we need to *erase* first, *then* delete, since the map keys are ptrs
            // from the values
            batchMap->erase( it++ );
            delete batch;
        }

        batchMap->clear();
    }

    Status BatchWriteOp::targetBatch( const NSTargeter& targeter,
                                      bool recordTargetErrors,
                                      vector<TargetedWriteBatch*>* targetedBatches ) {

        //
        // Targeting of unordered batches is fairly simple - each remaining write op is
        // targeted, and each of those targeted writes is grouped into a batch for a particular
        // shard endpoint.
        //
        // Targeting of ordered batches is a bit more complex - to respect the ordering of the
        // batch, we can only send:
        // A) a single targeted batch to one shard endpoint
        // B) multiple targeted batches, but only containing targeted writes for a single write op
        //
        // This means that any multi-shard write operation must be targeted and sent one-by-one.
        // Subsequent single-shard write operations can be batched together if they go to the
        // same place.
        //
        // Ex: ShardA : { skey : a->k }, ShardB : { skey : k->z }
        //
        // Ordered insert batch of: [{ skey : a }, { skey : b }, { skey : x }]
        // broken into:
        //   [{ skey : a }, { skey : b }],
        //   [{ skey : x }]
        //
        // Ordered update batch of:
        //   [{ skey : a }{ $push },
        //    { skey : b }{ $push },
        //    { skey : [c, x] }{ $push },
        //    { skey : y }{ $push },
        //    { skey : z }{ $push }]
        // broken into:
        //   [{ skey : a }, { skey : b }],
        //   [{ skey : [c,x] }],
        //   [{ skey : y }, { skey : z }]
        //

        const bool ordered = _clientRequest->getOrdered();

        TargetedBatchMap batchMap;
        TargetedBatchSizeMap batchSizes;

        int numTargetErrors = 0;
        size_t numWriteOps = _clientRequest->sizeWriteOps();

        for ( size_t i = 0; i < numWriteOps; ++i ) {

            WriteOp& writeOp = _writeOps[i];

            // Only target _Ready ops
            if ( writeOp.getWriteState() != WriteOpState_Ready ) continue;

            //
            // Get TargetedWrites from the targeter for the write operation
            //

            // TargetedWrites need to be owned once returned
            OwnedPointerVector<TargetedWrite> writesOwned;
            vector<TargetedWrite*>& writes = writesOwned.mutableVector();

            Status targetStatus = writeOp.targetWrites( targeter, &writes );

            if ( !targetStatus.isOK() ) {

                WriteErrorDetail targetError;
                buildTargetError( targetStatus, &targetError );

                if ( !recordTargetErrors ) {
                    // Cancel current batch state with an error
                    cancelBatches( targetError, _writeOps, &batchMap );
                    dassert( batchMap.empty() );
                    return targetStatus;
                }
                else if ( !ordered || batchMap.empty() ) {
                    // Record an error for this batch

                    writeOp.setOpError( targetError );
                    ++numTargetErrors;

                    if ( ordered )
                        return Status::OK();

                    continue;
                }
                else {
                    dassert( ordered && !batchMap.empty() );

                    // Send out what we have, but don't record an error yet, since there may be
                    // an error in the writes before this point.

                    writeOp.cancelWrites( &targetError );
                    break;
                }
            }

            //
            // If ordered and we have a previous endpoint, make sure we don't need to send these
            // targeted writes to any other endpoints.
            //

            if ( ordered && !batchMap.empty() ) {

                dassert( batchMap.size() == 1u );
                if ( isNewBatchRequired( writes, batchMap ) ) {

                    writeOp.cancelWrites( NULL );
                    break;
                }
            }

            //
            // If this write will push us over some sort of size limit, stop targeting
            //

            int writeSizeBytes = getWriteSizeBytes( writeOp );
            if ( wouldMakeBatchesTooBig( writes, writeSizeBytes, batchSizes ) ) {
                invariant( !batchMap.empty() );
                writeOp.cancelWrites( NULL );
                break;
            }

            //
            // Targeting went ok, add to appropriate TargetedBatch
            //

            for ( vector<TargetedWrite*>::iterator it = writes.begin(); it != writes.end(); ++it ) {

                TargetedWrite* write = *it;

                TargetedBatchMap::iterator batchIt = batchMap.find( &write->endpoint );
                TargetedBatchSizeMap::iterator batchSizeIt = batchSizes.find( &write->endpoint );

                if ( batchIt == batchMap.end() ) {
                    TargetedWriteBatch* newBatch = new TargetedWriteBatch( write->endpoint );
                    batchIt = batchMap.insert( make_pair( &newBatch->getEndpoint(),
                                                          newBatch ) ).first;
                    batchSizeIt = batchSizes.insert( make_pair( &newBatch->getEndpoint(),
                                                                BatchSize() ) ).first;
                }

                TargetedWriteBatch* batch = batchIt->second;
                BatchSize& batchSize = batchSizeIt->second;

                ++batchSize.numOps;
                batchSize.sizeBytes += writeSizeBytes;
                batch->addWrite( write );
            }

            // Relinquish ownership of TargetedWrites, now the TargetedBatches own them
            writesOwned.mutableVector().clear();

            //
            // Break if we're ordered and we have more than one endpoint - later writes cannot
            // be enforced as ordered across multiple shard endpoints.
            //

            if ( ordered && batchMap.size() > 1u ) break;
        }

        //
        // Send back our targeted batches
        //

        for ( TargetedBatchMap::iterator it = batchMap.begin(); it != batchMap.end(); ++it ) {

            TargetedWriteBatch* batch = it->second;

            if ( batch->getWrites().empty() ) continue;

            // Remember targeted batch for reporting
            _targeted.insert( batch );
            // Send the handle back to caller
            targetedBatches->push_back( batch );
        }

        return Status::OK();
    }
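    //
    // Typical driving loop (sketch only; the executor that owns this object is not shown here):
    //
    //   while ( !batchOp.isFinished() ) {
    //       vector<TargetedWriteBatch*> batches;
    //       Status status = batchOp.targetBatch( targeter, recordTargetErrors, &batches );
    //       // ... build and dispatch a child request per batch, then feed each shard's reply
    //       // back through noteBatchResponse() / noteBatchError() so the write ops advance
    //       // toward a terminal state ...
    //   }
    //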
    void BatchWriteOp::buildBatchRequest( const TargetedWriteBatch& targetedBatch,
                                          BatchedCommandRequest* request ) const {

        request->setNS( _clientRequest->getNS() );
        request->setShouldBypassValidation( _clientRequest->shouldBypassValidation() );

        const vector<TargetedWrite*>& targetedWrites = targetedBatch.getWrites();

        for ( vector<TargetedWrite*>::const_iterator it = targetedWrites.begin();
            it != targetedWrites.end(); ++it ) {

            const WriteOpRef& writeOpRef = ( *it )->writeOpRef;
            BatchedCommandRequest::BatchType batchType = _clientRequest->getBatchType();

            // NOTE: We copy the batch items themselves here from the client request
            // TODO: This could be inefficient, maybe we want to just reference in the future
            if ( batchType == BatchedCommandRequest::BatchType_Insert ) {
                BatchedInsertRequest* clientInsertRequest = _clientRequest->getInsertRequest();
                BSONObj insertDoc = clientInsertRequest->getDocumentsAt( writeOpRef.first );
                request->getInsertRequest()->addToDocuments( insertDoc );
            }
            else if ( batchType == BatchedCommandRequest::BatchType_Update ) {
                BatchedUpdateRequest* clientUpdateRequest = _clientRequest->getUpdateRequest();
                BatchedUpdateDocument* updateDoc = new BatchedUpdateDocument;
                clientUpdateRequest->getUpdatesAt( writeOpRef.first )->cloneTo( updateDoc );
                request->getUpdateRequest()->addToUpdates( updateDoc );
            }
            else {
                dassert( batchType == BatchedCommandRequest::BatchType_Delete );
                BatchedDeleteRequest* clientDeleteRequest = _clientRequest->getDeleteRequest();
                BatchedDeleteDocument* deleteDoc = new BatchedDeleteDocument;
                clientDeleteRequest->getDeletesAt( writeOpRef.first )->cloneTo( deleteDoc );
                request->getDeleteRequest()->addToDeletes( deleteDoc );
            }

            // TODO: We can add logic here to allow aborting individual ops
            //if ( NULL == response ) {
            //    ->responses.erase( it++ );
            //    continue;
            //}
        }

        if ( _clientRequest->isWriteConcernSet() ) {
            if ( _clientRequest->isVerboseWC() ) {
                request->setWriteConcern( _clientRequest->getWriteConcern() );
            }
            else {
                // Mongos needs to send to the shard with w > 0 so it will be able to
                // see the writeErrors.
                request->setWriteConcern( upgradeWriteConcern(
                                              _clientRequest->getWriteConcern() ) );
            }
        }

        if ( !request->isOrderedSet() ) {
            request->setOrdered( _clientRequest->getOrdered() );
        }

        unique_ptr<BatchedRequestMetadata> requestMetadata( new BatchedRequestMetadata() );
        requestMetadata->setShardName( targetedBatch.getEndpoint().shardName );
        requestMetadata->setShardVersion( targetedBatch.getEndpoint().shardVersion );
        requestMetadata->setSession( 0 );

        request->setMetadata( requestMetadata.release() );
    }
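    //
    // Example (illustrative): for an insert batch targeted at a single shard, the child
    // request built above serializes to roughly
    //   { insert: <collection>, documents: [ <only this shard's documents> ],
    //     ordered: <from the client request>, writeConcern: <client's, or upgraded to w:1> }
    // plus the per-shard metadata (shard name and version) attached at the end.
    //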
    //
    // Helpers for manipulating batch responses
    //

    namespace {
        struct WriteErrorDetailComp {
            bool operator()( const WriteErrorDetail* errorA,
                             const WriteErrorDetail* errorB ) const {
                return errorA->getIndex() < errorB->getIndex();
            }
        };
    }

    static void cloneCommandErrorTo( const BatchedCommandResponse& batchResp,
                                     WriteErrorDetail* details ) {
        details->setErrCode( batchResp.getErrCode() );
        details->setErrMessage( batchResp.getErrMessage() );
    }

    // Given *either* a batch error or an array of per-item errors, copies errors we're
    // interested in into a TrackedErrorMap
    static void trackErrors( const ShardEndpoint& endpoint,
                             const vector<WriteErrorDetail*>& itemErrors,
                             TrackedErrors* trackedErrors ) {

        for ( vector<WriteErrorDetail*>::const_iterator it = itemErrors.begin();
            it != itemErrors.end(); ++it ) {

            const WriteErrorDetail* error = *it;
            if ( trackedErrors->isTracking( error->getErrCode() ) ) {
                trackedErrors->addError( new ShardError( endpoint, *error ) );
            }
        }
    }

    static void incBatchStats( BatchedCommandRequest::BatchType batchType,
                               const BatchedCommandResponse& response,
                               BatchWriteStats* stats ) {

        if ( batchType == BatchedCommandRequest::BatchType_Insert ) {
            stats->numInserted += response.getN();
        }
        else if ( batchType == BatchedCommandRequest::BatchType_Update ) {

            int numUpserted = 0;
            if ( response.isUpsertDetailsSet() ) {
                numUpserted = response.sizeUpsertDetails();
            }

            stats->numMatched += ( response.getN() - numUpserted );

            long long numModified = response.getNModified();
            if ( numModified >= 0 )
                stats->numModified += numModified;
            else
                stats->numModified = -1; // sentinel used to indicate we omit the field downstream

            stats->numUpserted += numUpserted;
        }
        else {
            dassert( batchType == BatchedCommandRequest::BatchType_Delete );
            stats->numDeleted += response.getN();
        }
    }
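    //
    // Example (illustrative values): an update batch answered by two shards with
    //   { n: 3, nModified: 2 } and { n: 1, nModified: 1, upserted: [{ index: 0, _id: ... }] }
    // accumulates to numMatched = 3 + (1 - 1) = 3, numModified = 2 + 1 = 3, numUpserted = 1.
    //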
    void BatchWriteOp::noteBatchResponse( const TargetedWriteBatch& targetedBatch,
                                          const BatchedCommandResponse& response,
                                          TrackedErrors* trackedErrors ) {

        if ( !response.getOk() ) {

            WriteErrorDetail error;
            cloneCommandErrorTo( response, &error );

            // Treat command errors exactly like other failures of the batch
            // Note that no errors will be tracked from these failures - as-designed
            noteBatchError( targetedBatch, error );
            return;
        }

        dassert( response.getOk() );

        // Stop tracking targeted batch
        _targeted.erase( &targetedBatch );

        // Increment stats for this batch
        incBatchStats( _clientRequest->getBatchType(), response, _stats.get() );

        //
        // Assign errors to particular items.
        // Write Concern errors are stored and handled later.
        //

        // Special handling for write concern errors, save for later
        if ( response.isWriteConcernErrorSet() ) {
            unique_ptr<ShardWCError> wcError( new ShardWCError( targetedBatch.getEndpoint(),
                                                                *response.getWriteConcernError() ) );
            _wcErrors.mutableVector().push_back( wcError.release() );
        }

        vector<WriteErrorDetail*> itemErrors;

        // Handle batch and per-item errors
        if ( response.isErrDetailsSet() ) {

            // Per-item errors were set
            itemErrors.insert( itemErrors.begin(),
                               response.getErrDetails().begin(),
                               response.getErrDetails().end() );

            // Sort per-item errors by index
            std::sort( itemErrors.begin(), itemErrors.end(), WriteErrorDetailComp() );
        }

        //
        // Go through all pending responses of the op and the sorted remote responses, and
        // populate errors. This will either set all errors to the batch error or apply
        // per-item errors as needed.
        //
        // If the batch is ordered, cancel all writes after the first error for retargeting.
        //

        bool ordered = _clientRequest->getOrdered();

        vector<WriteErrorDetail*>::iterator itemErrorIt = itemErrors.begin();
        int index = 0;
        WriteErrorDetail* lastError = NULL;
        for ( vector<TargetedWrite*>::const_iterator it = targetedBatch.getWrites().begin();
            it != targetedBatch.getWrites().end(); ++it, ++index ) {

            const TargetedWrite* write = *it;
            WriteOp& writeOp = _writeOps[write->writeOpRef.first];

            dassert( writeOp.getWriteState() == WriteOpState_Pending );

            // See if we have an error for the write
            WriteErrorDetail* writeError = NULL;

            if ( itemErrorIt != itemErrors.end() && ( *itemErrorIt )->getIndex() == index ) {
                // We have a per-item error for this write op's index
                writeError = *itemErrorIt;
                ++itemErrorIt;
            }

            // Finish the response (with error, if needed)
            if ( NULL == writeError ) {
                if ( !ordered || !lastError ) {
                    writeOp.noteWriteComplete( *write );
                }
                else {
                    // We didn't actually apply this write - cancel so we can retarget
                    dassert( writeOp.getNumTargeted() == 1u );
                    writeOp.cancelWrites( lastError );
                }
            }
            else {
                writeOp.noteWriteError( *write, *writeError );
                lastError = writeError;
            }
        }

        // Track errors we care about, whether batch or individual errors
        if ( NULL != trackedErrors ) {
            trackErrors( targetedBatch.getEndpoint(), itemErrors, trackedErrors );
        }

        // Track upserted ids if we need to
        if ( response.isUpsertDetailsSet() ) {

            const vector<BatchedUpsertDetail*>& upsertedIds = response.getUpsertDetails();
            for ( vector<BatchedUpsertDetail*>::const_iterator it = upsertedIds.begin();
                it != upsertedIds.end(); ++it ) {

                // The child upserted details don't have the correct index for the full batch
                const BatchedUpsertDetail* childUpsertedId = *it;

                // Work backward from the child batch item index to the batch item index
                int childBatchIndex = childUpsertedId->getIndex();
                int batchIndex = targetedBatch.getWrites()[childBatchIndex]->writeOpRef.first;

                // Push the upserted id with the correct index into the batch upserted ids
                BatchedUpsertDetail* upsertedId = new BatchedUpsertDetail;
                upsertedId->setIndex( batchIndex );
                upsertedId->setUpsertedID( childUpsertedId->getUpsertedID() );
                _upsertedIds.mutableVector().push_back( upsertedId );
            }
        }
    }
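    //
    // Example (illustrative): for an ordered batch of three writes sent to one shard where the
    // shard reports a single per-item error at index 1, the loop above marks write 0 complete,
    // records the error on write 1, and cancels write 2 so it can be retargeted (or reported)
    // once the outcome of the earlier error is settled.
    //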
    static void toWriteErrorResponse( const WriteErrorDetail& error,
                                      bool ordered,
                                      int numWrites,
                                      BatchedCommandResponse* writeErrResponse ) {

        writeErrResponse->setOk( true );
        writeErrResponse->setN( 0 );

        int numErrors = ordered ? 1 : numWrites;
        for ( int i = 0; i < numErrors; i++ ) {
            unique_ptr<WriteErrorDetail> errorClone( new WriteErrorDetail );
            error.cloneTo( errorClone.get() );
            errorClone->setIndex( i );
            writeErrResponse->addToErrDetails( errorClone.release() );
        }

        dassert( writeErrResponse->isValid( NULL ) );
    }

    void BatchWriteOp::noteBatchError( const TargetedWriteBatch& targetedBatch,
                                       const WriteErrorDetail& error ) {

        // Treat errors to get a batch response as failures of the contained writes
        BatchedCommandResponse emulatedResponse;
        toWriteErrorResponse( error,
                              _clientRequest->getOrdered(),
                              targetedBatch.getWrites().size(),
                              &emulatedResponse );

        noteBatchResponse( targetedBatch, emulatedResponse, NULL );
    }

    void BatchWriteOp::abortBatch( const WriteErrorDetail& error ) {

        dassert( !isFinished() );
        dassert( numWriteOpsIn( WriteOpState_Pending ) == 0 );

        size_t numWriteOps = _clientRequest->sizeWriteOps();
        bool orderedOps = _clientRequest->getOrdered();
        for ( size_t i = 0; i < numWriteOps; ++i ) {

            WriteOp& writeOp = _writeOps[i];
            // Can only be called with no outstanding batches
            dassert( writeOp.getWriteState() != WriteOpState_Pending );

            if ( writeOp.getWriteState() < WriteOpState_Completed ) {
                writeOp.setOpError( error );

                // Only one error if we're ordered
                if ( orderedOps ) break;
            }
        }

        dassert( isFinished() );
    }

    bool BatchWriteOp::isFinished() {

        size_t numWriteOps = _clientRequest->sizeWriteOps();
        bool orderedOps = _clientRequest->getOrdered();
        for ( size_t i = 0; i < numWriteOps; ++i ) {
            WriteOp& writeOp = _writeOps[i];
            if ( writeOp.getWriteState() < WriteOpState_Completed ) return false;
            else if ( orderedOps && writeOp.getWriteState() == WriteOpState_Error ) return true;
        }

        return true;
    }

    //
    // Aggregation functions for building the final response errors
    //

    void BatchWriteOp::buildClientResponse( BatchedCommandResponse* batchResp ) {

        dassert( isFinished() );

        // Result is OK
        batchResp->setOk( true );

        // For non-verbose write concern, this is all we need to report.
        if ( !_clientRequest->isVerboseWC() ) {
            dassert( batchResp->isValid( NULL ) );
            return;
        }

        //
        // Find all the errors in the batch
        //

        vector<WriteOp*> errOps;

        size_t numWriteOps = _clientRequest->sizeWriteOps();
        for ( size_t i = 0; i < numWriteOps; ++i ) {

            WriteOp& writeOp = _writeOps[i];

            if ( writeOp.getWriteState() == WriteOpState_Error ) {
                errOps.push_back( &writeOp );
            }
        }

        //
        // Build the per-item errors.
        //

        if ( !errOps.empty() ) {
            for ( vector<WriteOp*>::iterator it = errOps.begin(); it != errOps.end(); ++it ) {
                WriteOp& writeOp = **it;
                WriteErrorDetail* error = new WriteErrorDetail();
                writeOp.getOpError().cloneTo( error );
                batchResp->addToErrDetails( error );
            }
        }

        // Only return a write concern error if everything succeeded (unordered or ordered)
        // OR if something succeeded and we're unordered
        bool reportWCError = errOps.empty()
                             || ( !_clientRequest->getOrdered()
                                  && errOps.size() < _clientRequest->sizeWriteOps() );
        if ( !_wcErrors.empty() && reportWCError ) {

            WCErrorDetail* error = new WCErrorDetail;

            // Generate the multi-error message below
            stringstream msg;
            if ( _wcErrors.size() > 1 ) {
                msg << "multiple errors reported : ";
                error->setErrCode( ErrorCodes::WriteConcernFailed );
            }
            else {
                error->setErrCode( ( *_wcErrors.begin() )->error.getErrCode() );
            }

            for ( vector<ShardWCError*>::const_iterator it = _wcErrors.begin();
                it != _wcErrors.end(); ++it ) {

                const ShardWCError* wcError = *it;
                if ( it != _wcErrors.begin() ) msg << " :: and :: ";
                msg << wcError->error.getErrMessage() << " at " << wcError->endpoint.shardName;
            }

            error->setErrMessage( msg.str() );
            batchResp->setWriteConcernError( error );
        }

        //
        // Append the upserted ids, if required
        //

        if ( _upsertedIds.size() != 0 ) {
            batchResp->setUpsertDetails( _upsertedIds.vector() );
        }

        // Stats
        int nValue = _stats->numInserted + _stats->numUpserted + _stats->numMatched
                     + _stats->numDeleted;
        batchResp->setN( nValue );

        if ( _clientRequest->getBatchType() == BatchedCommandRequest::BatchType_Update
             && _stats->numModified >= 0 ) {
            batchResp->setNModified( _stats->numModified );
        }

        dassert( batchResp->isValid( NULL ) );
    }
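    //
    // Example (illustrative): a verbose response for a partially failed unordered insert batch
    // serializes to roughly
    //   { ok: 1, n: 2,
    //     writeErrors: [ { index: 1, code: <code>, errmsg: <message> } ],
    //     writeConcernError: { code: <code>, errmsg: "<message> at <shardName>" } }
    // where n counts only the writes that were actually applied.
    //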
    BatchWriteOp::~BatchWriteOp() {
        // Caller's responsibility to dispose of TargetedBatches
        dassert( _targeted.empty() );

        if ( NULL != _writeOps ) {

            size_t numWriteOps = _clientRequest->sizeWriteOps();
            for ( size_t i = 0; i < numWriteOps; ++i ) {
                // Constructed with placement new, so destruct manually
                _writeOps[i].~WriteOp();
            }

            ::operator delete[]( _writeOps );
            _writeOps = NULL;
        }
    }

    int BatchWriteOp::numWriteOps() const {
        return static_cast<int>( _clientRequest->sizeWriteOps() );
    }

    int BatchWriteOp::numWriteOpsIn( WriteOpState opState ) const {

        // TODO: This could be faster, if we tracked this info explicitly
        size_t numWriteOps = _clientRequest->sizeWriteOps();
        int count = 0;
        for ( size_t i = 0; i < numWriteOps; ++i ) {
            WriteOp& writeOp = _writeOps[i];
            if ( writeOp.getWriteState() == opState ) ++count;
        }

        return count;
    }

    void TrackedErrors::startTracking( int errCode ) {
        dassert( !isTracking( errCode ) );
        _errorMap.insert( make_pair( errCode, vector<ShardError*>() ) );
    }

    bool TrackedErrors::isTracking( int errCode ) const {
        return _errorMap.find( errCode ) != _errorMap.end();
    }

    void TrackedErrors::addError( ShardError* error ) {
        TrackedErrorMap::iterator seenIt = _errorMap.find( error->error.getErrCode() );
        if ( seenIt == _errorMap.end() ) return;
        seenIt->second.push_back( error );
    }

    const vector<ShardError*>& TrackedErrors::getErrors( int errCode ) const {
        dassert( isTracking( errCode ) );
        return _errorMap.find( errCode )->second;
    }

    void TrackedErrors::clear() {
        for ( TrackedErrorMap::iterator it = _errorMap.begin(); it != _errorMap.end(); ++it ) {

            vector<ShardError*>& errors = it->second;

            for ( vector<ShardError*>::iterator errIt = errors.begin(); errIt != errors.end();
                ++errIt ) {
                delete *errIt;
            }
            errors.clear();
        }
    }

    TrackedErrors::~TrackedErrors() {
        clear();
    }

} // namespace mongo