Diffstat (limited to 'src/mongo/s/write_ops/batch_write_op.cpp')
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp | 1338
1 file changed, 639 insertions(+), 699 deletions(-)
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index b1032410c7a..847b96ff9ee 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -32,889 +32,829 @@
namespace mongo {
- using std::unique_ptr;
- using std::make_pair;
- using std::set;
- using std::stringstream;
- using std::vector;
-
- /**
- * Returns a new write concern that has the copy of every field from the original
- * document but with a w set to 1. This is intended for upgrading { w: 0 } write
- * concern to { w: 1 }.
- */
- static BSONObj upgradeWriteConcern ( const BSONObj& origWriteConcern ) {
- BSONObjIterator iter( origWriteConcern );
- BSONObjBuilder newWriteConcern;
-
- while ( iter.more() ) {
- BSONElement elem( iter.next() );
-
- if ( strncmp( elem.fieldName(), "w", 2 ) == 0 ) {
- newWriteConcern.append( "w", 1 );
- }
- else {
- newWriteConcern.append( elem );
- }
- }
-
- return newWriteConcern.obj();
- }
-
- BatchWriteStats::BatchWriteStats() :
- numInserted( 0 ), numUpserted( 0 ), numMatched( 0 ), numModified( 0 ), numDeleted( 0 ) {
- }
+using std::unique_ptr;
+using std::make_pair;
+using std::set;
+using std::stringstream;
+using std::vector;
- BatchWriteOp::BatchWriteOp() :
- _clientRequest( NULL ), _writeOps( NULL ), _stats( new BatchWriteStats ) {
- }
-
- void BatchWriteOp::initClientRequest( const BatchedCommandRequest* clientRequest ) {
- dassert( clientRequest->isValid( NULL ) );
+/**
+ * Returns a new write concern that has the copy of every field from the original
+ * document but with a w set to 1. This is intended for upgrading { w: 0 } write
+ * concern to { w: 1 }.
+ */
+static BSONObj upgradeWriteConcern(const BSONObj& origWriteConcern) {
+ BSONObjIterator iter(origWriteConcern);
+ BSONObjBuilder newWriteConcern;
- size_t numWriteOps = clientRequest->sizeWriteOps();
- _writeOps = static_cast<WriteOp*>( ::operator new[]( numWriteOps * sizeof(WriteOp) ) );
+ while (iter.more()) {
+ BSONElement elem(iter.next());
- for ( size_t i = 0; i < numWriteOps; ++i ) {
- // Don't want to have to define what an empty WriteOp means, so construct in-place
- new ( &_writeOps[i] ) WriteOp( BatchItemRef( clientRequest, i ) );
+ if (strncmp(elem.fieldName(), "w", 2) == 0) {
+ newWriteConcern.append("w", 1);
+ } else {
+ newWriteConcern.append(elem);
}
-
- _clientRequest = clientRequest;
}
- // Arbitrary endpoint ordering, needed for grouping by endpoint
- static int compareEndpoints( const ShardEndpoint* endpointA, const ShardEndpoint* endpointB ) {
-
- int shardNameDiff = endpointA->shardName.compare( endpointB->shardName );
- if ( shardNameDiff != 0 ) return shardNameDiff;
-
- long shardVersionDiff = endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong();
- if ( shardVersionDiff != 0 ) return shardVersionDiff;
-
- int shardEpochDiff =
- endpointA->shardVersion.epoch().compare( endpointB->shardVersion.epoch() );
- return shardEpochDiff;
- }
-
- namespace {
-
- //
- // Types for comparing shard endpoints in a map
- //
-
- struct EndpointComp {
- bool operator()( const ShardEndpoint* endpointA,
- const ShardEndpoint* endpointB ) const {
- return compareEndpoints( endpointA, endpointB ) < 0;
- }
- };
-
- typedef std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp> TargetedBatchMap;
-
- //
- // Types for tracking batch sizes
- //
+ return newWriteConcern.obj();
+}
- struct BatchSize {
+BatchWriteStats::BatchWriteStats()
+ : numInserted(0), numUpserted(0), numMatched(0), numModified(0), numDeleted(0) {}
- BatchSize() :
- numOps(0), sizeBytes(0) {
- }
+BatchWriteOp::BatchWriteOp() : _clientRequest(NULL), _writeOps(NULL), _stats(new BatchWriteStats) {}
- int numOps;
- int sizeBytes;
- };
+void BatchWriteOp::initClientRequest(const BatchedCommandRequest* clientRequest) {
+ dassert(clientRequest->isValid(NULL));
- typedef std::map<const ShardEndpoint*, BatchSize, EndpointComp> TargetedBatchSizeMap;
- }
+ size_t numWriteOps = clientRequest->sizeWriteOps();
+ _writeOps = static_cast<WriteOp*>(::operator new[](numWriteOps * sizeof(WriteOp)));
- static void buildTargetError( const Status& errStatus, WriteErrorDetail* details ) {
- details->setErrCode( errStatus.code() );
- details->setErrMessage( errStatus.reason() );
+ for (size_t i = 0; i < numWriteOps; ++i) {
+ // Don't want to have to define what an empty WriteOp means, so construct in-place
+ new (&_writeOps[i]) WriteOp(BatchItemRef(clientRequest, i));
}
- // Helper to determine whether a number of targeted writes require a new targeted batch
- static bool isNewBatchRequired( const vector<TargetedWrite*>& writes,
- const TargetedBatchMap& batchMap ) {
-
- for ( vector<TargetedWrite*>::const_iterator it = writes.begin(); it != writes.end();
- ++it ) {
+ _clientRequest = clientRequest;
+}
- TargetedWrite* write = *it;
- if ( batchMap.find( &write->endpoint ) == batchMap.end() ) {
- return true;
- }
- }
+// Arbitrary endpoint ordering, needed for grouping by endpoint
+static int compareEndpoints(const ShardEndpoint* endpointA, const ShardEndpoint* endpointB) {
+ int shardNameDiff = endpointA->shardName.compare(endpointB->shardName);
+ if (shardNameDiff != 0)
+ return shardNameDiff;
- return false;
- }
+ long shardVersionDiff = endpointA->shardVersion.toLong() - endpointB->shardVersion.toLong();
+ if (shardVersionDiff != 0)
+ return shardVersionDiff;
- // MAGIC NUMBERS
- // Before serializing updates/deletes, we don't know how big their fields would be, but we break
- // batches before serializing.
- // TODO: Revisit when we revisit command limits in general
- static const int kEstUpdateOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
- static const int kEstDeleteOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
+ int shardEpochDiff = endpointA->shardVersion.epoch().compare(endpointB->shardVersion.epoch());
+ return shardEpochDiff;
+}
- static int getWriteSizeBytes(const WriteOp& writeOp) {
+namespace {
- const BatchItemRef& item = writeOp.getWriteItem();
- BatchedCommandRequest::BatchType batchType = item.getOpType();
+//
+// Types for comparing shard endpoints in a map
+//
- if (batchType == BatchedCommandRequest::BatchType_Insert) {
- return item.getDocument().objsize();
- }
- else if (batchType == BatchedCommandRequest::BatchType_Update) {
- // Note: Be conservative here - it's okay if we send slightly too many batches
- int estSize = item.getUpdate()->getQuery().objsize()
- + item.getUpdate()->getUpdateExpr().objsize() + kEstUpdateOverheadBytes;
- dassert(estSize >= item.getUpdate()->toBSON().objsize());
- return estSize;
- }
- else {
- dassert( batchType == BatchedCommandRequest::BatchType_Delete );
- // Note: Be conservative here - it's okay if we send slightly too many batches
- int estSize = item.getDelete()->getQuery().objsize() + kEstDeleteOverheadBytes;
- dassert(estSize >= item.getDelete()->toBSON().objsize());
- return estSize;
- }
+struct EndpointComp {
+ bool operator()(const ShardEndpoint* endpointA, const ShardEndpoint* endpointB) const {
+ return compareEndpoints(endpointA, endpointB) < 0;
}
+};
- // Helper to determine whether a number of targeted writes require a new targeted batch
- static bool wouldMakeBatchesTooBig(const vector<TargetedWrite*>& writes,
- int writeSizeBytes,
- const TargetedBatchSizeMap& batchSizes) {
+typedef std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp> TargetedBatchMap;
- for (vector<TargetedWrite*>::const_iterator it = writes.begin(); it != writes.end(); ++it) {
+//
+// Types for tracking batch sizes
+//
- const TargetedWrite* write = *it;
- TargetedBatchSizeMap::const_iterator seenIt = batchSizes.find(&write->endpoint);
+struct BatchSize {
+ BatchSize() : numOps(0), sizeBytes(0) {}
- if (seenIt == batchSizes.end()) {
- // If this is the first item in the batch, it can't be too big
- continue;
- }
+ int numOps;
+ int sizeBytes;
+};
- const BatchSize& batchSize = seenIt->second;
+typedef std::map<const ShardEndpoint*, BatchSize, EndpointComp> TargetedBatchSizeMap;
+}
- if (batchSize.numOps >= static_cast<int>(BatchedCommandRequest::kMaxWriteBatchSize)) {
- // Too many items in batch
- return true;
- }
+static void buildTargetError(const Status& errStatus, WriteErrorDetail* details) {
+ details->setErrCode(errStatus.code());
+ details->setErrMessage(errStatus.reason());
+}
- if (batchSize.sizeBytes + writeSizeBytes > BSONObjMaxUserSize) {
- // Batch would be too big
- return true;
- }
+// Helper to determine whether a number of targeted writes require a new targeted batch
+static bool isNewBatchRequired(const vector<TargetedWrite*>& writes,
+ const TargetedBatchMap& batchMap) {
+ for (vector<TargetedWrite*>::const_iterator it = writes.begin(); it != writes.end(); ++it) {
+ TargetedWrite* write = *it;
+ if (batchMap.find(&write->endpoint) == batchMap.end()) {
+ return true;
}
-
- return false;
}
- // Helper function to cancel all the write ops of targeted batches in a map
- static void cancelBatches( const WriteErrorDetail& why,
- WriteOp* writeOps,
- TargetedBatchMap* batchMap ) {
-
- set<WriteOp*> targetedWriteOps;
+ return false;
+}
- // Collect all the writeOps that are currently targeted
- for ( TargetedBatchMap::iterator it = batchMap->begin(); it != batchMap->end(); ) {
+// MAGIC NUMBERS
+// Before serializing updates/deletes, we don't know how big their fields would be, but we break
+// batches before serializing.
+// TODO: Revisit when we revisit command limits in general
+static const int kEstUpdateOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
+static const int kEstDeleteOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
+
+static int getWriteSizeBytes(const WriteOp& writeOp) {
+ const BatchItemRef& item = writeOp.getWriteItem();
+ BatchedCommandRequest::BatchType batchType = item.getOpType();
+
+ if (batchType == BatchedCommandRequest::BatchType_Insert) {
+ return item.getDocument().objsize();
+ } else if (batchType == BatchedCommandRequest::BatchType_Update) {
+ // Note: Be conservative here - it's okay if we send slightly too many batches
+ int estSize = item.getUpdate()->getQuery().objsize() +
+ item.getUpdate()->getUpdateExpr().objsize() + kEstUpdateOverheadBytes;
+ dassert(estSize >= item.getUpdate()->toBSON().objsize());
+ return estSize;
+ } else {
+ dassert(batchType == BatchedCommandRequest::BatchType_Delete);
+ // Note: Be conservative here - it's okay if we send slightly too many batches
+ int estSize = item.getDelete()->getQuery().objsize() + kEstDeleteOverheadBytes;
+ dassert(estSize >= item.getDelete()->toBSON().objsize());
+ return estSize;
+ }
+}
- TargetedWriteBatch* batch = it->second;
- const vector<TargetedWrite*>& writes = batch->getWrites();
+// Helper to determine whether a number of targeted writes require a new targeted batch
+static bool wouldMakeBatchesTooBig(const vector<TargetedWrite*>& writes,
+ int writeSizeBytes,
+ const TargetedBatchSizeMap& batchSizes) {
+ for (vector<TargetedWrite*>::const_iterator it = writes.begin(); it != writes.end(); ++it) {
+ const TargetedWrite* write = *it;
+ TargetedBatchSizeMap::const_iterator seenIt = batchSizes.find(&write->endpoint);
- for ( vector<TargetedWrite*>::const_iterator writeIt = writes.begin();
- writeIt != writes.end(); ++writeIt ) {
+ if (seenIt == batchSizes.end()) {
+ // If this is the first item in the batch, it can't be too big
+ continue;
+ }
- TargetedWrite* write = *writeIt;
+ const BatchSize& batchSize = seenIt->second;
- // NOTE: We may repeatedly cancel a write op here, but that's fast and we want to
- // cancel before erasing the TargetedWrite* (which owns the cancelled targeting
- // info) for reporting reasons.
- writeOps[write->writeOpRef.first].cancelWrites( &why );
- }
+ if (batchSize.numOps >= static_cast<int>(BatchedCommandRequest::kMaxWriteBatchSize)) {
+ // Too many items in batch
+ return true;
+ }
- // Note that we need to *erase* first, *then* delete, since the map keys are ptrs from
- // the values
- batchMap->erase( it++ );
- delete batch;
+ if (batchSize.sizeBytes + writeSizeBytes > BSONObjMaxUserSize) {
+ // Batch would be too big
+ return true;
}
- batchMap->clear();
}
- Status BatchWriteOp::targetBatch( const NSTargeter& targeter,
- bool recordTargetErrors,
- vector<TargetedWriteBatch*>* targetedBatches ) {
-
- //
- // Targeting of unordered batches is fairly simple - each remaining write op is targeted,
- // and each of those targeted writes are grouped into a batch for a particular shard
- // endpoint.
- //
- // Targeting of ordered batches is a bit more complex - to respect the ordering of the
- // batch, we can only send:
- // A) a single targeted batch to one shard endpoint
- // B) multiple targeted batches, but only containing targeted writes for a single write op
- //
- // This means that any multi-shard write operation must be targeted and sent one-by-one.
- // Subsequent single-shard write operations can be batched together if they go to the same
- // place.
- //
- // Ex: ShardA : { skey : a->k }, ShardB : { skey : k->z }
- //
- // Ordered insert batch of: [{ skey : a }, { skey : b }, { skey : x }]
- // broken into:
- // [{ skey : a }, { skey : b }],
- // [{ skey : x }]
- //
- // Ordered update Batch of :
- // [{ skey : a }{ $push },
- // { skey : b }{ $push },
- // { skey : [c, x] }{ $push },
- // { skey : y }{ $push },
- // { skey : z }{ $push }]
- // broken into:
- // [{ skey : a }, { skey : b }],
- // [{ skey : [c,x] }],
- // [{ skey : y }, { skey : z }]
- //
-
- const bool ordered = _clientRequest->getOrdered();
-
- TargetedBatchMap batchMap;
- TargetedBatchSizeMap batchSizes;
+ return false;
+}
- int numTargetErrors = 0;
+// Helper function to cancel all the write ops of targeted batches in a map
+static void cancelBatches(const WriteErrorDetail& why,
+ WriteOp* writeOps,
+ TargetedBatchMap* batchMap) {
+ set<WriteOp*> targetedWriteOps;
- size_t numWriteOps = _clientRequest->sizeWriteOps();
- for ( size_t i = 0; i < numWriteOps; ++i ) {
+ // Collect all the writeOps that are currently targeted
+ for (TargetedBatchMap::iterator it = batchMap->begin(); it != batchMap->end();) {
+ TargetedWriteBatch* batch = it->second;
+ const vector<TargetedWrite*>& writes = batch->getWrites();
- WriteOp& writeOp = _writeOps[i];
+ for (vector<TargetedWrite*>::const_iterator writeIt = writes.begin();
+ writeIt != writes.end();
+ ++writeIt) {
+ TargetedWrite* write = *writeIt;
- // Only target _Ready ops
- if ( writeOp.getWriteState() != WriteOpState_Ready ) continue;
+ // NOTE: We may repeatedly cancel a write op here, but that's fast and we want to
+ // cancel before erasing the TargetedWrite* (which owns the cancelled targeting
+ // info) for reporting reasons.
+ writeOps[write->writeOpRef.first].cancelWrites(&why);
+ }
- //
- // Get TargetedWrites from the targeter for the write operation
- //
+ // Note that we need to *erase* first, *then* delete, since the map keys are ptrs from
+ // the values
+ batchMap->erase(it++);
+ delete batch;
+ }
+ batchMap->clear();
+}
- // TargetedWrites need to be owned once returned
- OwnedPointerVector<TargetedWrite> writesOwned;
- vector<TargetedWrite*>& writes = writesOwned.mutableVector();
+Status BatchWriteOp::targetBatch(const NSTargeter& targeter,
+ bool recordTargetErrors,
+ vector<TargetedWriteBatch*>* targetedBatches) {
+ //
+ // Targeting of unordered batches is fairly simple - each remaining write op is targeted,
+ // and each of those targeted writes are grouped into a batch for a particular shard
+ // endpoint.
+ //
+ // Targeting of ordered batches is a bit more complex - to respect the ordering of the
+ // batch, we can only send:
+ // A) a single targeted batch to one shard endpoint
+ // B) multiple targeted batches, but only containing targeted writes for a single write op
+ //
+ // This means that any multi-shard write operation must be targeted and sent one-by-one.
+ // Subsequent single-shard write operations can be batched together if they go to the same
+ // place.
+ //
+ // Ex: ShardA : { skey : a->k }, ShardB : { skey : k->z }
+ //
+ // Ordered insert batch of: [{ skey : a }, { skey : b }, { skey : x }]
+ // broken into:
+ // [{ skey : a }, { skey : b }],
+ // [{ skey : x }]
+ //
+ // Ordered update Batch of :
+ // [{ skey : a }{ $push },
+ // { skey : b }{ $push },
+ // { skey : [c, x] }{ $push },
+ // { skey : y }{ $push },
+ // { skey : z }{ $push }]
+ // broken into:
+ // [{ skey : a }, { skey : b }],
+ // [{ skey : [c,x] }],
+ // [{ skey : y }, { skey : z }]
+ //
- Status targetStatus = writeOp.targetWrites( targeter, &writes );
+ const bool ordered = _clientRequest->getOrdered();
- if ( !targetStatus.isOK() ) {
+ TargetedBatchMap batchMap;
+ TargetedBatchSizeMap batchSizes;
- WriteErrorDetail targetError;
- buildTargetError( targetStatus, &targetError );
+ int numTargetErrors = 0;
- if ( !recordTargetErrors ) {
+ size_t numWriteOps = _clientRequest->sizeWriteOps();
+ for (size_t i = 0; i < numWriteOps; ++i) {
+ WriteOp& writeOp = _writeOps[i];
- // Cancel current batch state with an error
+ // Only target _Ready ops
+ if (writeOp.getWriteState() != WriteOpState_Ready)
+ continue;
- cancelBatches( targetError, _writeOps, &batchMap );
- dassert( batchMap.empty() );
- return targetStatus;
- }
- else if ( !ordered || batchMap.empty() ) {
+ //
+ // Get TargetedWrites from the targeter for the write operation
+ //
- // Record an error for this batch
+ // TargetedWrites need to be owned once returned
+ OwnedPointerVector<TargetedWrite> writesOwned;
+ vector<TargetedWrite*>& writes = writesOwned.mutableVector();
- writeOp.setOpError( targetError );
- ++numTargetErrors;
+ Status targetStatus = writeOp.targetWrites(targeter, &writes);
- if ( ordered )
- return Status::OK();
+ if (!targetStatus.isOK()) {
+ WriteErrorDetail targetError;
+ buildTargetError(targetStatus, &targetError);
- continue;
- }
- else {
- dassert( ordered && !batchMap.empty() );
+ if (!recordTargetErrors) {
+ // Cancel current batch state with an error
- // Send out what we have, but don't record an error yet, since there may be an
- // error in the writes before this point.
+ cancelBatches(targetError, _writeOps, &batchMap);
+ dassert(batchMap.empty());
+ return targetStatus;
+ } else if (!ordered || batchMap.empty()) {
+ // Record an error for this batch
- writeOp.cancelWrites( &targetError );
- break;
- }
- }
+ writeOp.setOpError(targetError);
+ ++numTargetErrors;
- //
- // If ordered and we have a previous endpoint, make sure we don't need to send these
- // targeted writes to any other endpoints.
- //
+ if (ordered)
+ return Status::OK();
- if ( ordered && !batchMap.empty() ) {
+ continue;
+ } else {
+ dassert(ordered && !batchMap.empty());
- dassert( batchMap.size() == 1u );
- if ( isNewBatchRequired( writes, batchMap ) ) {
+ // Send out what we have, but don't record an error yet, since there may be an
+ // error in the writes before this point.
- writeOp.cancelWrites( NULL );
- break;
- }
+ writeOp.cancelWrites(&targetError);
+ break;
}
+ }
- //
- // If this write will push us over some sort of size limit, stop targeting
- //
+ //
+ // If ordered and we have a previous endpoint, make sure we don't need to send these
+ // targeted writes to any other endpoints.
+ //
- int writeSizeBytes = getWriteSizeBytes(writeOp);
- if (wouldMakeBatchesTooBig(writes, writeSizeBytes, batchSizes)) {
- invariant(!batchMap.empty());
+ if (ordered && !batchMap.empty()) {
+ dassert(batchMap.size() == 1u);
+ if (isNewBatchRequired(writes, batchMap)) {
writeOp.cancelWrites(NULL);
break;
}
-
- //
- // Targeting went ok, add to appropriate TargetedBatch
- //
-
- for ( vector<TargetedWrite*>::iterator it = writes.begin(); it != writes.end(); ++it ) {
-
- TargetedWrite* write = *it;
-
- TargetedBatchMap::iterator batchIt = batchMap.find( &write->endpoint );
- TargetedBatchSizeMap::iterator batchSizeIt = batchSizes.find( &write->endpoint );
-
- if ( batchIt == batchMap.end() ) {
- TargetedWriteBatch* newBatch = new TargetedWriteBatch( write->endpoint );
- batchIt = batchMap.insert( make_pair( &newBatch->getEndpoint(),
- newBatch ) ).first;
- batchSizeIt = batchSizes.insert(make_pair(&newBatch->getEndpoint(),
- BatchSize())).first;
- }
-
- TargetedWriteBatch* batch = batchIt->second;
- BatchSize& batchSize = batchSizeIt->second;
-
- ++batchSize.numOps;
- batchSize.sizeBytes += writeSizeBytes;
- batch->addWrite( write );
- }
-
- // Relinquish ownership of TargetedWrites, now the TargetedBatches own them
- writesOwned.mutableVector().clear();
-
- //
- // Break if we're ordered and we have more than one endpoint - later writes cannot be
- // enforced as ordered across multiple shard endpoints.
- //
-
- if ( ordered && batchMap.size() > 1u )
- break;
}
//
- // Send back our targeted batches
+ // If this write will push us over some sort of size limit, stop targeting
//
- for ( TargetedBatchMap::iterator it = batchMap.begin(); it != batchMap.end(); ++it ) {
-
- TargetedWriteBatch* batch = it->second;
-
- if ( batch->getWrites().empty() )
- continue;
-
- // Remember targeted batch for reporting
- _targeted.insert( batch );
- // Send the handle back to caller
- targetedBatches->push_back( batch );
+ int writeSizeBytes = getWriteSizeBytes(writeOp);
+ if (wouldMakeBatchesTooBig(writes, writeSizeBytes, batchSizes)) {
+ invariant(!batchMap.empty());
+ writeOp.cancelWrites(NULL);
+ break;
}
- return Status::OK();
- }
-
- void BatchWriteOp::buildBatchRequest( const TargetedWriteBatch& targetedBatch,
- BatchedCommandRequest* request ) const {
-
- request->setNS( _clientRequest->getNS() );
- request->setShouldBypassValidation(_clientRequest->shouldBypassValidation());
-
- const vector<TargetedWrite*>& targetedWrites = targetedBatch.getWrites();
+ //
+ // Targeting went ok, add to appropriate TargetedBatch
+ //
- for ( vector<TargetedWrite*>::const_iterator it = targetedWrites.begin();
- it != targetedWrites.end(); ++it ) {
+ for (vector<TargetedWrite*>::iterator it = writes.begin(); it != writes.end(); ++it) {
+ TargetedWrite* write = *it;
- const WriteOpRef& writeOpRef = ( *it )->writeOpRef;
- BatchedCommandRequest::BatchType batchType = _clientRequest->getBatchType();
+ TargetedBatchMap::iterator batchIt = batchMap.find(&write->endpoint);
+ TargetedBatchSizeMap::iterator batchSizeIt = batchSizes.find(&write->endpoint);
- // NOTE: We copy the batch items themselves here from the client request
- // TODO: This could be inefficient, maybe we want to just reference in the future
- if ( batchType == BatchedCommandRequest::BatchType_Insert ) {
- BatchedInsertRequest* clientInsertRequest = _clientRequest->getInsertRequest();
- BSONObj insertDoc = clientInsertRequest->getDocumentsAt( writeOpRef.first );
- request->getInsertRequest()->addToDocuments( insertDoc );
- }
- else if ( batchType == BatchedCommandRequest::BatchType_Update ) {
- BatchedUpdateRequest* clientUpdateRequest = _clientRequest->getUpdateRequest();
- BatchedUpdateDocument* updateDoc = new BatchedUpdateDocument;
- clientUpdateRequest->getUpdatesAt( writeOpRef.first )->cloneTo( updateDoc );
- request->getUpdateRequest()->addToUpdates( updateDoc );
- }
- else {
- dassert( batchType == BatchedCommandRequest::BatchType_Delete );
- BatchedDeleteRequest* clientDeleteRequest = _clientRequest->getDeleteRequest();
- BatchedDeleteDocument* deleteDoc = new BatchedDeleteDocument;
- clientDeleteRequest->getDeletesAt( writeOpRef.first )->cloneTo( deleteDoc );
- request->getDeleteRequest()->addToDeletes( deleteDoc );
+ if (batchIt == batchMap.end()) {
+ TargetedWriteBatch* newBatch = new TargetedWriteBatch(write->endpoint);
+ batchIt = batchMap.insert(make_pair(&newBatch->getEndpoint(), newBatch)).first;
+ batchSizeIt =
+ batchSizes.insert(make_pair(&newBatch->getEndpoint(), BatchSize())).first;
}
- // TODO: We can add logic here to allow aborting individual ops
- //if ( NULL == response ) {
- // ->responses.erase( it++ );
- // continue;
- //}
- }
+ TargetedWriteBatch* batch = batchIt->second;
+ BatchSize& batchSize = batchSizeIt->second;
- if ( _clientRequest->isWriteConcernSet() ) {
- if ( _clientRequest->isVerboseWC() ) {
- request->setWriteConcern( _clientRequest->getWriteConcern() );
- }
- else {
- // Mongos needs to send to the shard with w > 0 so it will be able to
- // see the writeErrors.
- request->setWriteConcern( upgradeWriteConcern(
- _clientRequest->getWriteConcern() ));
- }
+ ++batchSize.numOps;
+ batchSize.sizeBytes += writeSizeBytes;
+ batch->addWrite(write);
}
- if ( !request->isOrderedSet() ) {
- request->setOrdered( _clientRequest->getOrdered() );
- }
+ // Relinquish ownership of TargetedWrites, now the TargetedBatches own them
+ writesOwned.mutableVector().clear();
+
+ //
+ // Break if we're ordered and we have more than one endpoint - later writes cannot be
+ // enforced as ordered across multiple shard endpoints.
+ //
- unique_ptr<BatchedRequestMetadata> requestMetadata( new BatchedRequestMetadata() );
- requestMetadata->setShardName( targetedBatch.getEndpoint().shardName );
- requestMetadata->setShardVersion( targetedBatch.getEndpoint().shardVersion );
- requestMetadata->setSession( 0 );
- request->setMetadata( requestMetadata.release() );
+ if (ordered && batchMap.size() > 1u)
+ break;
}
//
- // Helpers for manipulating batch responses
+ // Send back our targeted batches
//
- namespace {
- struct WriteErrorDetailComp {
- bool operator()( const WriteErrorDetail* errorA,
- const WriteErrorDetail* errorB ) const {
- return errorA->getIndex() < errorB->getIndex();
- }
- };
- }
+ for (TargetedBatchMap::iterator it = batchMap.begin(); it != batchMap.end(); ++it) {
+ TargetedWriteBatch* batch = it->second;
+
+ if (batch->getWrites().empty())
+ continue;
- static void cloneCommandErrorTo( const BatchedCommandResponse& batchResp,
- WriteErrorDetail* details ) {
- details->setErrCode( batchResp.getErrCode() );
- details->setErrMessage( batchResp.getErrMessage() );
+ // Remember targeted batch for reporting
+ _targeted.insert(batch);
+ // Send the handle back to caller
+ targetedBatches->push_back(batch);
}
- // Given *either* a batch error or an array of per-item errors, copies errors we're interested
- // in into a TrackedErrorMap
- static void trackErrors( const ShardEndpoint& endpoint,
- const vector<WriteErrorDetail*> itemErrors,
- TrackedErrors* trackedErrors ) {
- for ( vector<WriteErrorDetail*>::const_iterator it = itemErrors.begin();
- it != itemErrors.end(); ++it ) {
+ return Status::OK();
+}
- const WriteErrorDetail* error = *it;
+void BatchWriteOp::buildBatchRequest(const TargetedWriteBatch& targetedBatch,
+ BatchedCommandRequest* request) const {
+ request->setNS(_clientRequest->getNS());
+ request->setShouldBypassValidation(_clientRequest->shouldBypassValidation());
- if ( trackedErrors->isTracking( error->getErrCode() ) ) {
- trackedErrors->addError( new ShardError( endpoint, *error ) );
- }
- }
- }
+ const vector<TargetedWrite*>& targetedWrites = targetedBatch.getWrites();
- static void incBatchStats( BatchedCommandRequest::BatchType batchType,
- const BatchedCommandResponse& response,
- BatchWriteStats* stats ) {
+ for (vector<TargetedWrite*>::const_iterator it = targetedWrites.begin();
+ it != targetedWrites.end();
+ ++it) {
+ const WriteOpRef& writeOpRef = (*it)->writeOpRef;
+ BatchedCommandRequest::BatchType batchType = _clientRequest->getBatchType();
- if ( batchType == BatchedCommandRequest::BatchType_Insert) {
- stats->numInserted += response.getN();
- }
- else if ( batchType == BatchedCommandRequest::BatchType_Update ) {
- int numUpserted = 0;
- if( response.isUpsertDetailsSet() ) {
- numUpserted = response.sizeUpsertDetails();
- }
- stats->numMatched += ( response.getN() - numUpserted );
- long long numModified = response.getNModified();
+ // NOTE: We copy the batch items themselves here from the client request
+ // TODO: This could be inefficient, maybe we want to just reference in the future
+ if (batchType == BatchedCommandRequest::BatchType_Insert) {
+ BatchedInsertRequest* clientInsertRequest = _clientRequest->getInsertRequest();
+ BSONObj insertDoc = clientInsertRequest->getDocumentsAt(writeOpRef.first);
+ request->getInsertRequest()->addToDocuments(insertDoc);
+ } else if (batchType == BatchedCommandRequest::BatchType_Update) {
+ BatchedUpdateRequest* clientUpdateRequest = _clientRequest->getUpdateRequest();
+ BatchedUpdateDocument* updateDoc = new BatchedUpdateDocument;
+ clientUpdateRequest->getUpdatesAt(writeOpRef.first)->cloneTo(updateDoc);
+ request->getUpdateRequest()->addToUpdates(updateDoc);
+ } else {
+ dassert(batchType == BatchedCommandRequest::BatchType_Delete);
+ BatchedDeleteRequest* clientDeleteRequest = _clientRequest->getDeleteRequest();
+ BatchedDeleteDocument* deleteDoc = new BatchedDeleteDocument;
+ clientDeleteRequest->getDeletesAt(writeOpRef.first)->cloneTo(deleteDoc);
+ request->getDeleteRequest()->addToDeletes(deleteDoc);
+ }
+
+ // TODO: We can add logic here to allow aborting individual ops
+ // if ( NULL == response ) {
+ // ->responses.erase( it++ );
+ // continue;
+ //}
+ }
+
+ if (_clientRequest->isWriteConcernSet()) {
+ if (_clientRequest->isVerboseWC()) {
+ request->setWriteConcern(_clientRequest->getWriteConcern());
+ } else {
+ // Mongos needs to send to the shard with w > 0 so it will be able to
+ // see the writeErrors.
+ request->setWriteConcern(upgradeWriteConcern(_clientRequest->getWriteConcern()));
+ }
+ }
+
+ if (!request->isOrderedSet()) {
+ request->setOrdered(_clientRequest->getOrdered());
+ }
+
+ unique_ptr<BatchedRequestMetadata> requestMetadata(new BatchedRequestMetadata());
+ requestMetadata->setShardName(targetedBatch.getEndpoint().shardName);
+ requestMetadata->setShardVersion(targetedBatch.getEndpoint().shardVersion);
+ requestMetadata->setSession(0);
+ request->setMetadata(requestMetadata.release());
+}
- if (numModified >= 0)
- stats->numModified += numModified;
- else
- stats->numModified = -1; // sentinel used to indicate we omit the field downstream
+//
+// Helpers for manipulating batch responses
+//
- stats->numUpserted += numUpserted;
- }
- else {
- dassert( batchType == BatchedCommandRequest::BatchType_Delete );
- stats->numDeleted += response.getN();
- }
+namespace {
+struct WriteErrorDetailComp {
+ bool operator()(const WriteErrorDetail* errorA, const WriteErrorDetail* errorB) const {
+ return errorA->getIndex() < errorB->getIndex();
}
+};
+}
- void BatchWriteOp::noteBatchResponse( const TargetedWriteBatch& targetedBatch,
- const BatchedCommandResponse& response,
- TrackedErrors* trackedErrors ) {
-
- if ( !response.getOk() ) {
+static void cloneCommandErrorTo(const BatchedCommandResponse& batchResp,
+ WriteErrorDetail* details) {
+ details->setErrCode(batchResp.getErrCode());
+ details->setErrMessage(batchResp.getErrMessage());
+}
- WriteErrorDetail error;
- cloneCommandErrorTo( response, &error );
+// Given *either* a batch error or an array of per-item errors, copies errors we're interested
+// in into a TrackedErrorMap
+static void trackErrors(const ShardEndpoint& endpoint,
+ const vector<WriteErrorDetail*> itemErrors,
+ TrackedErrors* trackedErrors) {
+ for (vector<WriteErrorDetail*>::const_iterator it = itemErrors.begin(); it != itemErrors.end();
+ ++it) {
+ const WriteErrorDetail* error = *it;
- // Treat command errors exactly like other failures of the batch
- // Note that no errors will be tracked from these failures - as-designed
- noteBatchError( targetedBatch, error );
- return;
+ if (trackedErrors->isTracking(error->getErrCode())) {
+ trackedErrors->addError(new ShardError(endpoint, *error));
}
+ }
+}
- dassert( response.getOk() );
-
- // Stop tracking targeted batch
- _targeted.erase( &targetedBatch );
+static void incBatchStats(BatchedCommandRequest::BatchType batchType,
+ const BatchedCommandResponse& response,
+ BatchWriteStats* stats) {
+ if (batchType == BatchedCommandRequest::BatchType_Insert) {
+ stats->numInserted += response.getN();
+ } else if (batchType == BatchedCommandRequest::BatchType_Update) {
+ int numUpserted = 0;
+ if (response.isUpsertDetailsSet()) {
+ numUpserted = response.sizeUpsertDetails();
+ }
+ stats->numMatched += (response.getN() - numUpserted);
+ long long numModified = response.getNModified();
+
+ if (numModified >= 0)
+ stats->numModified += numModified;
+ else
+ stats->numModified = -1; // sentinel used to indicate we omit the field downstream
+
+ stats->numUpserted += numUpserted;
+ } else {
+ dassert(batchType == BatchedCommandRequest::BatchType_Delete);
+ stats->numDeleted += response.getN();
+ }
+}
- // Increment stats for this batch
- incBatchStats( _clientRequest->getBatchType(), response, _stats.get() );
+void BatchWriteOp::noteBatchResponse(const TargetedWriteBatch& targetedBatch,
+ const BatchedCommandResponse& response,
+ TrackedErrors* trackedErrors) {
+ if (!response.getOk()) {
+ WriteErrorDetail error;
+ cloneCommandErrorTo(response, &error);
- //
- // Assign errors to particular items.
- // Write Concern errors are stored and handled later.
- //
+ // Treat command errors exactly like other failures of the batch
+ // Note that no errors will be tracked from these failures - as-designed
+ noteBatchError(targetedBatch, error);
+ return;
+ }
- // Special handling for write concern errors, save for later
- if ( response.isWriteConcernErrorSet() ) {
- unique_ptr<ShardWCError> wcError( new ShardWCError( targetedBatch.getEndpoint(),
- *response.getWriteConcernError() ));
- _wcErrors.mutableVector().push_back( wcError.release() );
- }
+ dassert(response.getOk());
- vector<WriteErrorDetail*> itemErrors;
+ // Stop tracking targeted batch
+ _targeted.erase(&targetedBatch);
- // Handle batch and per-item errors
- if ( response.isErrDetailsSet() ) {
+ // Increment stats for this batch
+ incBatchStats(_clientRequest->getBatchType(), response, _stats.get());
- // Per-item errors were set
- itemErrors.insert( itemErrors.begin(),
- response.getErrDetails().begin(),
- response.getErrDetails().end() );
+ //
+ // Assign errors to particular items.
+ // Write Concern errors are stored and handled later.
+ //
- // Sort per-item errors by index
- std::sort( itemErrors.begin(), itemErrors.end(), WriteErrorDetailComp() );
- }
+ // Special handling for write concern errors, save for later
+ if (response.isWriteConcernErrorSet()) {
+ unique_ptr<ShardWCError> wcError(
+ new ShardWCError(targetedBatch.getEndpoint(), *response.getWriteConcernError()));
+ _wcErrors.mutableVector().push_back(wcError.release());
+ }
- //
- // Go through all pending responses of the op and sorted remote reponses, populate errors
- // This will either set all errors to the batch error or apply per-item errors as-needed
- //
- // If the batch is ordered, cancel all writes after the first error for retargeting.
- //
+ vector<WriteErrorDetail*> itemErrors;
- bool ordered = _clientRequest->getOrdered();
+ // Handle batch and per-item errors
+ if (response.isErrDetailsSet()) {
+ // Per-item errors were set
+ itemErrors.insert(
+ itemErrors.begin(), response.getErrDetails().begin(), response.getErrDetails().end());
- vector<WriteErrorDetail*>::iterator itemErrorIt = itemErrors.begin();
- int index = 0;
- WriteErrorDetail* lastError = NULL;
- for ( vector<TargetedWrite*>::const_iterator it = targetedBatch.getWrites().begin();
- it != targetedBatch.getWrites().end(); ++it, ++index ) {
+ // Sort per-item errors by index
+ std::sort(itemErrors.begin(), itemErrors.end(), WriteErrorDetailComp());
+ }
- const TargetedWrite* write = *it;
- WriteOp& writeOp = _writeOps[write->writeOpRef.first];
+ //
+ // Go through all pending responses of the op and sorted remote reponses, populate errors
+ // This will either set all errors to the batch error or apply per-item errors as-needed
+ //
+ // If the batch is ordered, cancel all writes after the first error for retargeting.
+ //
- dassert( writeOp.getWriteState() == WriteOpState_Pending );
+ bool ordered = _clientRequest->getOrdered();
- // See if we have an error for the write
- WriteErrorDetail* writeError = NULL;
+ vector<WriteErrorDetail*>::iterator itemErrorIt = itemErrors.begin();
+ int index = 0;
+ WriteErrorDetail* lastError = NULL;
+ for (vector<TargetedWrite*>::const_iterator it = targetedBatch.getWrites().begin();
+ it != targetedBatch.getWrites().end();
+ ++it, ++index) {
+ const TargetedWrite* write = *it;
+ WriteOp& writeOp = _writeOps[write->writeOpRef.first];
- if ( itemErrorIt != itemErrors.end() && ( *itemErrorIt )->getIndex() == index ) {
- // We have an per-item error for this write op's index
- writeError = *itemErrorIt;
- ++itemErrorIt;
- }
+ dassert(writeOp.getWriteState() == WriteOpState_Pending);
- // Finish the response (with error, if needed)
- if ( NULL == writeError ) {
- if ( !ordered || !lastError ){
- writeOp.noteWriteComplete( *write );
- }
- else {
- // We didn't actually apply this write - cancel so we can retarget
- dassert( writeOp.getNumTargeted() == 1u );
- writeOp.cancelWrites( lastError );
- }
- }
- else {
- writeOp.noteWriteError( *write, *writeError );
- lastError = writeError;
- }
- }
+ // See if we have an error for the write
+ WriteErrorDetail* writeError = NULL;
- // Track errors we care about, whether batch or individual errors
- if ( NULL != trackedErrors ) {
- trackErrors( targetedBatch.getEndpoint(), itemErrors, trackedErrors );
+ if (itemErrorIt != itemErrors.end() && (*itemErrorIt)->getIndex() == index) {
+ // We have an per-item error for this write op's index
+ writeError = *itemErrorIt;
+ ++itemErrorIt;
}
- // Track upserted ids if we need to
- if ( response.isUpsertDetailsSet() ) {
-
- const vector<BatchedUpsertDetail*>& upsertedIds = response.getUpsertDetails();
- for ( vector<BatchedUpsertDetail*>::const_iterator it = upsertedIds.begin();
- it != upsertedIds.end(); ++it ) {
-
- // The child upserted details don't have the correct index for the full batch
- const BatchedUpsertDetail* childUpsertedId = *it;
-
- // Work backward from the child batch item index to the batch item index
- int childBatchIndex = childUpsertedId->getIndex();
- int batchIndex = targetedBatch.getWrites()[childBatchIndex]->writeOpRef.first;
-
- // Push the upserted id with the correct index into the batch upserted ids
- BatchedUpsertDetail* upsertedId = new BatchedUpsertDetail;
- upsertedId->setIndex( batchIndex );
- upsertedId->setUpsertedID( childUpsertedId->getUpsertedID() );
- _upsertedIds.mutableVector().push_back( upsertedId );
+ // Finish the response (with error, if needed)
+ if (NULL == writeError) {
+ if (!ordered || !lastError) {
+ writeOp.noteWriteComplete(*write);
+ } else {
+ // We didn't actually apply this write - cancel so we can retarget
+ dassert(writeOp.getNumTargeted() == 1u);
+ writeOp.cancelWrites(lastError);
}
+ } else {
+ writeOp.noteWriteError(*write, *writeError);
+ lastError = writeError;
}
}
- static void toWriteErrorResponse( const WriteErrorDetail& error,
- bool ordered,
- int numWrites,
- BatchedCommandResponse* writeErrResponse ) {
+ // Track errors we care about, whether batch or individual errors
+ if (NULL != trackedErrors) {
+ trackErrors(targetedBatch.getEndpoint(), itemErrors, trackedErrors);
+ }
+
+ // Track upserted ids if we need to
+ if (response.isUpsertDetailsSet()) {
+ const vector<BatchedUpsertDetail*>& upsertedIds = response.getUpsertDetails();
+ for (vector<BatchedUpsertDetail*>::const_iterator it = upsertedIds.begin();
+ it != upsertedIds.end();
+ ++it) {
+ // The child upserted details don't have the correct index for the full batch
+ const BatchedUpsertDetail* childUpsertedId = *it;
- writeErrResponse->setOk( true );
- writeErrResponse->setN( 0 );
+ // Work backward from the child batch item index to the batch item index
+ int childBatchIndex = childUpsertedId->getIndex();
+ int batchIndex = targetedBatch.getWrites()[childBatchIndex]->writeOpRef.first;
- int numErrors = ordered ? 1 : numWrites;
- for ( int i = 0; i < numErrors; i++ ) {
- unique_ptr<WriteErrorDetail> errorClone( new WriteErrorDetail );
- error.cloneTo( errorClone.get() );
- errorClone->setIndex( i );
- writeErrResponse->addToErrDetails( errorClone.release() );
+ // Push the upserted id with the correct index into the batch upserted ids
+ BatchedUpsertDetail* upsertedId = new BatchedUpsertDetail;
+ upsertedId->setIndex(batchIndex);
+ upsertedId->setUpsertedID(childUpsertedId->getUpsertedID());
+ _upsertedIds.mutableVector().push_back(upsertedId);
}
-
- dassert( writeErrResponse->isValid( NULL ) );
}
+}
- void BatchWriteOp::noteBatchError( const TargetedWriteBatch& targetedBatch,
- const WriteErrorDetail& error ) {
-
- // Treat errors to get a batch response as failures of the contained writes
- BatchedCommandResponse emulatedResponse;
- toWriteErrorResponse( error,
- _clientRequest->getOrdered(),
- targetedBatch.getWrites().size(),
- &emulatedResponse );
+static void toWriteErrorResponse(const WriteErrorDetail& error,
+ bool ordered,
+ int numWrites,
+ BatchedCommandResponse* writeErrResponse) {
+ writeErrResponse->setOk(true);
+ writeErrResponse->setN(0);
- noteBatchResponse( targetedBatch, emulatedResponse, NULL );
+ int numErrors = ordered ? 1 : numWrites;
+ for (int i = 0; i < numErrors; i++) {
+ unique_ptr<WriteErrorDetail> errorClone(new WriteErrorDetail);
+ error.cloneTo(errorClone.get());
+ errorClone->setIndex(i);
+ writeErrResponse->addToErrDetails(errorClone.release());
}
- void BatchWriteOp::abortBatch( const WriteErrorDetail& error ) {
+ dassert(writeErrResponse->isValid(NULL));
+}
- dassert( !isFinished() );
- dassert( numWriteOpsIn( WriteOpState_Pending ) == 0 );
+void BatchWriteOp::noteBatchError(const TargetedWriteBatch& targetedBatch,
+ const WriteErrorDetail& error) {
+ // Treat errors to get a batch response as failures of the contained writes
+ BatchedCommandResponse emulatedResponse;
+ toWriteErrorResponse(
+ error, _clientRequest->getOrdered(), targetedBatch.getWrites().size(), &emulatedResponse);
- size_t numWriteOps = _clientRequest->sizeWriteOps();
- bool orderedOps = _clientRequest->getOrdered();
- for ( size_t i = 0; i < numWriteOps; ++i ) {
+ noteBatchResponse(targetedBatch, emulatedResponse, NULL);
+}
- WriteOp& writeOp = _writeOps[i];
- // Can only be called with no outstanding batches
- dassert( writeOp.getWriteState() != WriteOpState_Pending );
+void BatchWriteOp::abortBatch(const WriteErrorDetail& error) {
+ dassert(!isFinished());
+ dassert(numWriteOpsIn(WriteOpState_Pending) == 0);
- if ( writeOp.getWriteState() < WriteOpState_Completed ) {
+ size_t numWriteOps = _clientRequest->sizeWriteOps();
+ bool orderedOps = _clientRequest->getOrdered();
+ for (size_t i = 0; i < numWriteOps; ++i) {
+ WriteOp& writeOp = _writeOps[i];
+ // Can only be called with no outstanding batches
+ dassert(writeOp.getWriteState() != WriteOpState_Pending);
- writeOp.setOpError( error );
+ if (writeOp.getWriteState() < WriteOpState_Completed) {
+ writeOp.setOpError(error);
- // Only one error if we're ordered
- if ( orderedOps ) break;
- }
+ // Only one error if we're ordered
+ if (orderedOps)
+ break;
}
-
- dassert( isFinished() );
}
- bool BatchWriteOp::isFinished() {
-
- size_t numWriteOps = _clientRequest->sizeWriteOps();
- bool orderedOps = _clientRequest->getOrdered();
- for ( size_t i = 0; i < numWriteOps; ++i ) {
- WriteOp& writeOp = _writeOps[i];
- if ( writeOp.getWriteState() < WriteOpState_Completed ) return false;
- else if ( orderedOps && writeOp.getWriteState() == WriteOpState_Error ) return true;
- }
+ dassert(isFinished());
+}
- return true;
+bool BatchWriteOp::isFinished() {
+ size_t numWriteOps = _clientRequest->sizeWriteOps();
+ bool orderedOps = _clientRequest->getOrdered();
+ for (size_t i = 0; i < numWriteOps; ++i) {
+ WriteOp& writeOp = _writeOps[i];
+ if (writeOp.getWriteState() < WriteOpState_Completed)
+ return false;
+ else if (orderedOps && writeOp.getWriteState() == WriteOpState_Error)
+ return true;
}
- //
- // Aggregation functions for building the final response errors
- //
-
- void BatchWriteOp::buildClientResponse( BatchedCommandResponse* batchResp ) {
+ return true;
+}
- dassert( isFinished() );
+//
+// Aggregation functions for building the final response errors
+//
- // Result is OK
- batchResp->setOk( true );
+void BatchWriteOp::buildClientResponse(BatchedCommandResponse* batchResp) {
+ dassert(isFinished());
- // For non-verbose, it's all we need.
- if ( !_clientRequest->isVerboseWC() ) {
- dassert( batchResp->isValid( NULL ) );
- return;
- }
+ // Result is OK
+ batchResp->setOk(true);
- //
- // Find all the errors in the batch
- //
+ // For non-verbose, it's all we need.
+ if (!_clientRequest->isVerboseWC()) {
+ dassert(batchResp->isValid(NULL));
+ return;
+ }
- vector<WriteOp*> errOps;
+ //
+ // Find all the errors in the batch
+ //
- size_t numWriteOps = _clientRequest->sizeWriteOps();
- for ( size_t i = 0; i < numWriteOps; ++i ) {
+ vector<WriteOp*> errOps;
- WriteOp& writeOp = _writeOps[i];
+ size_t numWriteOps = _clientRequest->sizeWriteOps();
+ for (size_t i = 0; i < numWriteOps; ++i) {
+ WriteOp& writeOp = _writeOps[i];
- if ( writeOp.getWriteState() == WriteOpState_Error ) {
- errOps.push_back( &writeOp );
- }
+ if (writeOp.getWriteState() == WriteOpState_Error) {
+ errOps.push_back(&writeOp);
}
+ }
- //
- // Build the per-item errors.
- //
-
- if ( !errOps.empty() ) {
- for ( vector<WriteOp*>::iterator it = errOps.begin(); it != errOps.end(); ++it ) {
- WriteOp& writeOp = **it;
- WriteErrorDetail* error = new WriteErrorDetail();
- writeOp.getOpError().cloneTo( error );
- batchResp->addToErrDetails( error );
- }
- }
-
- // Only return a write concern error if everything succeeded (unordered or ordered)
- // OR if something succeeded and we're unordered
- bool reportWCError = errOps.empty()
- || ( !_clientRequest->getOrdered()
- && errOps.size() < _clientRequest->sizeWriteOps() );
- if ( !_wcErrors.empty() && reportWCError ) {
-
- WCErrorDetail* error = new WCErrorDetail;
-
- // Generate the multi-error message below
- stringstream msg;
- if ( _wcErrors.size() > 1 ) {
- msg << "multiple errors reported : ";
- error->setErrCode( ErrorCodes::WriteConcernFailed );
- }
- else {
- error->setErrCode( ( *_wcErrors.begin() )->error.getErrCode() );
- }
-
- for ( vector<ShardWCError*>::const_iterator it = _wcErrors.begin();
- it != _wcErrors.end(); ++it ) {
- const ShardWCError* wcError = *it;
- if ( it != _wcErrors.begin() )
- msg << " :: and :: ";
- msg << wcError->error.getErrMessage() << " at " << wcError->endpoint.shardName;
- }
+ //
+ // Build the per-item errors.
+ //
- error->setErrMessage( msg.str() );
- batchResp->setWriteConcernError( error );
+ if (!errOps.empty()) {
+ for (vector<WriteOp*>::iterator it = errOps.begin(); it != errOps.end(); ++it) {
+ WriteOp& writeOp = **it;
+ WriteErrorDetail* error = new WriteErrorDetail();
+ writeOp.getOpError().cloneTo(error);
+ batchResp->addToErrDetails(error);
}
+ }
- //
- // Append the upserted ids, if required
- //
+ // Only return a write concern error if everything succeeded (unordered or ordered)
+ // OR if something succeeded and we're unordered
+ bool reportWCError = errOps.empty() ||
+ (!_clientRequest->getOrdered() && errOps.size() < _clientRequest->sizeWriteOps());
+ if (!_wcErrors.empty() && reportWCError) {
+ WCErrorDetail* error = new WCErrorDetail;
- if ( _upsertedIds.size() != 0 ) {
- batchResp->setUpsertDetails( _upsertedIds.vector() );
+ // Generate the multi-error message below
+ stringstream msg;
+ if (_wcErrors.size() > 1) {
+ msg << "multiple errors reported : ";
+ error->setErrCode(ErrorCodes::WriteConcernFailed);
+ } else {
+ error->setErrCode((*_wcErrors.begin())->error.getErrCode());
}
- // Stats
- int nValue = _stats->numInserted + _stats->numUpserted + _stats->numMatched
- + _stats->numDeleted;
- batchResp->setN( nValue );
- if ( _clientRequest->getBatchType() == BatchedCommandRequest::BatchType_Update &&
- _stats->numModified >= 0) {
- batchResp->setNModified( _stats->numModified );
+ for (vector<ShardWCError*>::const_iterator it = _wcErrors.begin(); it != _wcErrors.end();
+ ++it) {
+ const ShardWCError* wcError = *it;
+ if (it != _wcErrors.begin())
+ msg << " :: and :: ";
+ msg << wcError->error.getErrMessage() << " at " << wcError->endpoint.shardName;
}
- dassert( batchResp->isValid( NULL ) );
+ error->setErrMessage(msg.str());
+ batchResp->setWriteConcernError(error);
}
- BatchWriteOp::~BatchWriteOp() {
- // Caller's responsibility to dispose of TargetedBatches
- dassert( _targeted.empty() );
-
- if ( NULL != _writeOps ) {
-
- size_t numWriteOps = _clientRequest->sizeWriteOps();
- for ( size_t i = 0; i < numWriteOps; ++i ) {
- // Placement new so manual destruct
- _writeOps[i].~WriteOp();
- }
+ //
+ // Append the upserted ids, if required
+ //
- ::operator delete[]( _writeOps );
- _writeOps = NULL;
- }
+ if (_upsertedIds.size() != 0) {
+ batchResp->setUpsertDetails(_upsertedIds.vector());
}
- int BatchWriteOp::numWriteOps() const {
- return static_cast<int>( _clientRequest->sizeWriteOps() );
+ // Stats
+ int nValue =
+ _stats->numInserted + _stats->numUpserted + _stats->numMatched + _stats->numDeleted;
+ batchResp->setN(nValue);
+ if (_clientRequest->getBatchType() == BatchedCommandRequest::BatchType_Update &&
+ _stats->numModified >= 0) {
+ batchResp->setNModified(_stats->numModified);
}
- int BatchWriteOp::numWriteOpsIn( WriteOpState opState ) const {
+ dassert(batchResp->isValid(NULL));
+}
+
+BatchWriteOp::~BatchWriteOp() {
+ // Caller's responsibility to dispose of TargetedBatches
+ dassert(_targeted.empty());
- // TODO: This could be faster, if we tracked this info explicitly
+ if (NULL != _writeOps) {
size_t numWriteOps = _clientRequest->sizeWriteOps();
- int count = 0;
- for ( size_t i = 0; i < numWriteOps; ++i ) {
- WriteOp& writeOp = _writeOps[i];
- if ( writeOp.getWriteState() == opState )
- ++count;
+ for (size_t i = 0; i < numWriteOps; ++i) {
+ // Placement new so manual destruct
+ _writeOps[i].~WriteOp();
}
- return count;
+ ::operator delete[](_writeOps);
+ _writeOps = NULL;
}
+}
- void TrackedErrors::startTracking( int errCode ) {
- dassert( !isTracking( errCode ) );
- _errorMap.insert( make_pair( errCode, vector<ShardError*>() ) );
- }
+int BatchWriteOp::numWriteOps() const {
+ return static_cast<int>(_clientRequest->sizeWriteOps());
+}
- bool TrackedErrors::isTracking( int errCode ) const {
- return _errorMap.find( errCode ) != _errorMap.end();
+int BatchWriteOp::numWriteOpsIn(WriteOpState opState) const {
+ // TODO: This could be faster, if we tracked this info explicitly
+ size_t numWriteOps = _clientRequest->sizeWriteOps();
+ int count = 0;
+ for (size_t i = 0; i < numWriteOps; ++i) {
+ WriteOp& writeOp = _writeOps[i];
+ if (writeOp.getWriteState() == opState)
+ ++count;
}
- void TrackedErrors::addError( ShardError* error ) {
- TrackedErrorMap::iterator seenIt = _errorMap.find( error->error.getErrCode() );
- if ( seenIt == _errorMap.end() ) return;
- seenIt->second.push_back( error );
- }
+ return count;
+}
- const vector<ShardError*>& TrackedErrors::getErrors( int errCode ) const {
- dassert( isTracking( errCode ) );
- return _errorMap.find( errCode )->second;
- }
+void TrackedErrors::startTracking(int errCode) {
+ dassert(!isTracking(errCode));
+ _errorMap.insert(make_pair(errCode, vector<ShardError*>()));
+}
- void TrackedErrors::clear() {
- for ( TrackedErrorMap::iterator it = _errorMap.begin(); it != _errorMap.end(); ++it ) {
+bool TrackedErrors::isTracking(int errCode) const {
+ return _errorMap.find(errCode) != _errorMap.end();
+}
- vector<ShardError*>& errors = it->second;
+void TrackedErrors::addError(ShardError* error) {
+ TrackedErrorMap::iterator seenIt = _errorMap.find(error->error.getErrCode());
+ if (seenIt == _errorMap.end())
+ return;
+ seenIt->second.push_back(error);
+}
- for ( vector<ShardError*>::iterator errIt = errors.begin(); errIt != errors.end();
- ++errIt ) {
- delete *errIt;
- }
- errors.clear();
- }
- }
+const vector<ShardError*>& TrackedErrors::getErrors(int errCode) const {
+ dassert(isTracking(errCode));
+ return _errorMap.find(errCode)->second;
+}
+
+void TrackedErrors::clear() {
+ for (TrackedErrorMap::iterator it = _errorMap.begin(); it != _errorMap.end(); ++it) {
+ vector<ShardError*>& errors = it->second;
- TrackedErrors::~TrackedErrors() {
- clear();
+ for (vector<ShardError*>::iterator errIt = errors.begin(); errIt != errors.end(); ++errIt) {
+ delete *errIt;
+ }
+ errors.clear();
}
+}
+TrackedErrors::~TrackedErrors() {
+ clear();
+}
}
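
For readers skimming the reformatted targetBatch() logic above: the grouping step hinges on EndpointComp giving std::map a strict weak ordering over shard endpoints, so every TargetedWrite lands in the batch keyed by its endpoint. The stand-alone sketch below is not part of the commit; Endpoint, the (endpoint, index) pairs, and main() are simplified stand-ins for the real ShardEndpoint/TargetedWrite types, and it only illustrates the comparator-keyed grouping pattern.

// Minimal illustration of comparator-keyed grouping, in the spirit of
// EndpointComp / TargetedBatchMap above. All types here are simplified
// stand-ins, not the real mongo classes.
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

struct Endpoint {
    std::string shardName;
    long shardVersion;  // stand-in for the real ChunkVersion
};

// Arbitrary but consistent ordering, analogous to compareEndpoints().
static int compareEndpoints(const Endpoint* a, const Endpoint* b) {
    int nameDiff = a->shardName.compare(b->shardName);
    if (nameDiff != 0)
        return nameDiff;
    if (a->shardVersion < b->shardVersion)
        return -1;
    return a->shardVersion > b->shardVersion ? 1 : 0;
}

struct EndpointComp {
    bool operator()(const Endpoint* a, const Endpoint* b) const {
        return compareEndpoints(a, b) < 0;
    }
};

int main() {
    Endpoint shardA{"shardA", 1};
    Endpoint shardB{"shardB", 1};

    // Targeted writes as (endpoint, write index) pairs.
    std::vector<std::pair<const Endpoint*, int>> writes = {
        {&shardA, 0}, {&shardB, 1}, {&shardA, 2}};

    // Group write indexes per endpoint, the way TargetedBatchMap groups
    // TargetedWrite* per ShardEndpoint.
    std::map<const Endpoint*, std::vector<int>, EndpointComp> batches;
    for (const auto& w : writes)
        batches[w.first].push_back(w.second);

    for (const auto& entry : batches)
        std::cout << entry.first->shardName << " -> " << entry.second.size() << " write(s)\n";
    return 0;
}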