diff options
author | Greg Studer <greg@10gen.com> | 2014-04-09 12:51:18 -0400 |
---|---|---|
committer | Greg Studer <greg@10gen.com> | 2014-04-14 10:48:20 -0400 |
commit | 52da235706ca1713ae91475009348a0ceded5307 (patch) | |
tree | ac66d91023a63820d833ec0b7153e4a0d72f18d2 | |
parent | b14b921ba3b3424681212707d82b76097bf13c9d (diff) | |
download | mongo-52da235706ca1713ae91475009348a0ceded5307.tar.gz |
SERVER-13518 break child batches up by size or number of documents
(cherry picked from commit 0b1994a25c85324ea413a95ace2470be3efb7db5)
Also includes SERVER-13518: make sure all tested updates are valid with update expressions
(cherry picked from commit 836b1d82810d4b7d98eed8c789f71ed45f473b85)
-rw-r--r-- | jstests/sharding/batch_write_command_sharded.js | 60 |
-rw-r--r-- | src/mongo/s/mock_ns_targeter.h | 1 |
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op.cpp | 114 |
-rw-r--r-- | src/mongo/s/write_ops/batch_write_op_test.cpp | 214 |
-rw-r--r-- | src/mongo/s/write_ops/write_op.cpp | 4 |
-rw-r--r-- | src/mongo/s/write_ops/write_op.h | 5 |
6 files changed, 378 insertions, 20 deletions
diff --git a/jstests/sharding/batch_write_command_sharded.js b/jstests/sharding/batch_write_command_sharded.js index 2a8944e6b05..269ac5b24b9 100644 --- a/jstests/sharding/batch_write_command_sharded.js +++ b/jstests/sharding/batch_write_command_sharded.js @@ -25,6 +25,58 @@ var result; // // +// Mongos _id autogeneration tests for sharded collections + +var coll = mongos.getCollection("foo.bar"); +assert.commandWorked(admin.runCommand({ enableSharding : coll.getDB().toString() })); +assert.commandWorked(admin.runCommand({ shardCollection : coll.toString(), + key : { _id : 1 } })); + +// +// Basic insert no _id +coll.remove({}); +printjson( request = {insert : coll.getName(), + documents: [{ a : 1 }] } ); +printjson( result = coll.runCommand(request) ); +assert(result.ok); +assert.eq(1, result.n); +assert.eq(1, coll.count()); + +// +// Multi insert some _ids +coll.remove({}); +printjson( request = {insert : coll.getName(), + documents: [{ _id : 0, a : 1 }, { a : 2 }] } ); +printjson( result = coll.runCommand(request) ); +assert(result.ok); +assert.eq(2, result.n); +assert.eq(2, coll.count()); +assert.eq(1, coll.count({ _id : 0 })); + +// +// Ensure generating many _ids don't push us over limits +var maxDocSize = (16 * 1024 * 1024) / 1000; +var baseDocSize = Object.bsonsize({ a : 1, data : "" }); +var dataSize = maxDocSize - baseDocSize; + +var data = ""; +for (var i = 0; i < dataSize; i++) + data += "x"; + +var documents = []; +for (var i = 0; i < 1000; i++) documents.push({ a : i, data : data }); + +assert.commandWorked(coll.getMongo().getDB("admin").runCommand({ setParameter : 1, logLevel : 4 })); +coll.remove({}); +request = { insert : coll.getName(), + documents: documents }; +printjson( result = coll.runCommand(request) ); +assert(result.ok); +assert.eq(1000, result.n); +assert.eq(1000, coll.count()); + +// +// // Stale config progress tests // Set up a new collection across two shards, then revert the chunks to an earlier state to put // mongos and mongod 
permanently out of sync. @@ -33,9 +85,9 @@ var result; var brokenColl = mongos.getCollection( "broken.coll" ); assert.commandWorked(admin.runCommand({ enableSharding : brokenColl.getDB().toString() })); printjson(admin.runCommand({ movePrimary : brokenColl.getDB().toString(), to : shards[0]._id })); -assert.commandWorked(admin.runCommand({ shardCollection : brokenColl.toString(), +assert.commandWorked(admin.runCommand({ shardCollection : brokenColl.toString(), key : { _id : 1 } })); -assert.commandWorked(admin.runCommand({ split : brokenColl.toString(), +assert.commandWorked(admin.runCommand({ split : brokenColl.toString(), middle : { _id : 0 } })); var oldChunks = config.chunks.find().toArray(); @@ -49,7 +101,7 @@ assert.eq(null, brokenColl.getDB().getLastError()); // Modify the chunks to make shards at a higher version -assert.commandWorked(admin.runCommand({ moveChunk : brokenColl.toString(), +assert.commandWorked(admin.runCommand({ moveChunk : brokenColl.toString(), find : { _id : 0 }, to : shards[1]._id })); @@ -61,7 +113,7 @@ for ( var i = 0; i < oldChunks.length; i++ ) config.chunks.insert(oldChunks[i]); assert.eq(null, config.getLastError()); -// Stale mongos can no longer bring itself up-to-date! +// Stale mongos can no longer bring itself up-to-date! 
// END SETUP // diff --git a/src/mongo/s/mock_ns_targeter.h b/src/mongo/s/mock_ns_targeter.h index d51ea5de3b6..f4e6140220e 100644 --- a/src/mongo/s/mock_ns_targeter.h +++ b/src/mongo/s/mock_ns_targeter.h @@ -160,7 +160,6 @@ namespace mongo { KeyRange parseRange( const BSONObj& query ) const { - ASSERT_EQUALS( query.nFields(), 1 ); string fieldName = query.firstElement().fieldName(); if ( query.firstElement().isNumber() ) { diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 8f0688eaef2..15de4a1fd6b 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -105,6 +105,22 @@ namespace mongo { }; typedef std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp> TargetedBatchMap; + + // + // Types for tracking batch sizes + // + + struct BatchSize { + + BatchSize() : + numOps(0), sizeBytes(0) { + } + + int numOps; + int sizeBytes; + }; + + typedef std::map<const ShardEndpoint*, BatchSize, EndpointComp> TargetedBatchSizeMap; } static void buildTargetError( const Status& errStatus, WriteErrorDetail* details ) { @@ -128,19 +144,66 @@ namespace mongo { return false; } - // Helper function to cancel all the write ops of targeted batch. - static void cancelBatch( const TargetedWriteBatch& targetedBatch, - WriteOp* writeOps, - const WriteErrorDetail& why ) { - const vector<TargetedWrite*>& writes = targetedBatch.getWrites(); + // MAGIC NUMBERS + // Before serializing updates/deletes, we don't know how big their fields would be, but we break + // batches before serializing. 
+ // TODO: Revisit when we revisit command limits in general + static const int kEstUpdateOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100; + static const int kEstDeleteOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100; + + static int getWriteSizeBytes(const WriteOp& writeOp) { + + const BatchItemRef& item = writeOp.getWriteItem(); + BatchedCommandRequest::BatchType batchType = item.getOpType(); + + if (batchType == BatchedCommandRequest::BatchType_Insert) { + return item.getDocument().objsize(); + } + else if (batchType == BatchedCommandRequest::BatchType_Update) { + // Note: Be conservative here - it's okay if we send slightly too many batches + int estSize = item.getUpdate()->getQuery().objsize() + + item.getUpdate()->getUpdateExpr().objsize() + kEstUpdateOverheadBytes; + dassert(estSize >= item.getUpdate()->toBSON().objsize()); + return estSize; + } + else { + dassert( batchType == BatchedCommandRequest::BatchType_Delete ); + // Note: Be conservative here - it's okay if we send slightly too many batches + int estSize = item.getDelete()->getQuery().objsize() + kEstDeleteOverheadBytes; + dassert(estSize >= item.getDelete()->toBSON().objsize()); + return estSize; + } + } + + // Helper to determine whether a number of targeted writes require a new targeted batch + static bool wouldMakeBatchesTooBig(const vector<TargetedWrite*>& writes, + int writeSizeBytes, + const TargetedBatchSizeMap& batchSizes) { + + for (vector<TargetedWrite*>::const_iterator it = writes.begin(); it != writes.end(); ++it) { - for ( vector<TargetedWrite*>::const_iterator writeIt = writes.begin(); - writeIt != writes.end(); ++writeIt ) { + const TargetedWrite* write = *it; + TargetedBatchSizeMap::const_iterator seenIt = batchSizes.find(&write->endpoint); + + if (seenIt == batchSizes.end()) { + // If this is the first item in the batch, it can't be too big + continue; + } + + const BatchSize& batchSize = seenIt->second; - TargetedWrite* write = *writeIt; - // 
NOTE: We may repeatedly cancel a write op here, but that's fast. - writeOps[write->writeOpRef.first].cancelWrites( &why ); + if (batchSize.numOps >= static_cast<int>(BatchedCommandRequest::kMaxWriteBatchSize)) { + // Too many items in batch + return true; + } + + if (batchSize.sizeBytes + writeSizeBytes > BSONObjMaxUserSize) { + // Batch would be too big + return true; + } } + + return false; } // Helper function to cancel all the write ops of targeted batches in a map @@ -215,6 +278,8 @@ namespace mongo { const bool ordered = _clientRequest->getOrdered(); TargetedBatchMap batchMap; + TargetedBatchSizeMap batchSizes; + int numTargetErrors = 0; size_t numWriteOps = _clientRequest->sizeWriteOps(); @@ -287,6 +352,17 @@ namespace mongo { } // + // If this write will push us over some sort of size limit, stop targeting + // + + int writeSizeBytes = getWriteSizeBytes(writeOp); + if (wouldMakeBatchesTooBig(writes, writeSizeBytes, batchSizes)) { + invariant(!batchMap.empty()); + writeOp.cancelWrites(NULL); + break; + } + + // // Targeting went ok, add to appropriate TargetedBatch // @@ -294,14 +370,22 @@ namespace mongo { TargetedWrite* write = *it; - TargetedBatchMap::iterator seenIt = batchMap.find( &write->endpoint ); - if ( seenIt == batchMap.end() ) { + TargetedBatchMap::iterator batchIt = batchMap.find( &write->endpoint ); + TargetedBatchSizeMap::iterator batchSizeIt = batchSizes.find( &write->endpoint ); + + if ( batchIt == batchMap.end() ) { TargetedWriteBatch* newBatch = new TargetedWriteBatch( write->endpoint ); - seenIt = batchMap.insert( make_pair( &newBatch->getEndpoint(), // - newBatch ) ).first; + batchIt = batchMap.insert( make_pair( &newBatch->getEndpoint(), + newBatch ) ).first; + batchSizeIt = batchSizes.insert(make_pair(&newBatch->getEndpoint(), + BatchSize())).first; } - TargetedWriteBatch* batch = seenIt->second; + TargetedWriteBatch* batch = batchIt->second; + BatchSize& batchSize = batchSizeIt->second; + + ++batchSize.numOps; + batchSize.sizeBytes 
+= writeSizeBytes; batch->addWrite( write ); } diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp index 06e256f6555..7170b6f5ce7 100644 --- a/src/mongo/s/write_ops/batch_write_op_test.cpp +++ b/src/mongo/s/write_ops/batch_write_op_test.cpp @@ -92,11 +92,22 @@ namespace { static BatchedUpdateDocument* buildUpdate( const BSONObj& query, bool multi ) { BatchedUpdateDocument* updateDoc = new BatchedUpdateDocument; + updateDoc->setUpdateExpr( BSONObj() ); updateDoc->setQuery( query ); updateDoc->setMulti( multi ); return updateDoc; } + static BatchedUpdateDocument* buildUpdate(const BSONObj& query, + const BSONObj& updateExpr, + bool multi) { + BatchedUpdateDocument* updateDoc = new BatchedUpdateDocument; + updateDoc->setQuery( query ); + updateDoc->setUpdateExpr( updateExpr ); + updateDoc->setMulti( multi ); + return updateDoc; + } + static void buildResponse( int n, BatchedCommandResponse* response ) { response->clear(); response->setOk( true ); @@ -1616,4 +1627,207 @@ namespace { ASSERT( clientResponse.isWriteConcernErrorSet() ); } + // + // Tests of batch size limit functionality + // + + TEST(WriteOpLimitTests, OneBigDoc) { + + // + // Big single operation test - should go through + // + + NamespaceString nss("foo.bar"); + ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); + MockNSTargeter targeter; + initTargeterFullRange(nss, endpoint, &targeter); + + // Create a BSONObj (slightly) bigger than the maximum size by including a max-size string + string bigString(BSONObjMaxUserSize, 'x'); + + // Do single-target, single doc batch write op + BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert); + request.setNS(nss.ns()); + request.getInsertRequest()->addToDocuments(BSON( "x" << 1 << "data" << bigString )); + + BatchWriteOp batchOp; + batchOp.initClientRequest(&request); + + OwnedPointerVector<TargetedWriteBatch> targetedOwned; + vector<TargetedWriteBatch*>& targeted = 
targetedOwned.mutableVector(); + Status status = batchOp.targetBatch(targeter, false, &targeted); + ASSERT(status.isOK()); + ASSERT_EQUALS(targeted.size(), 1u); + + BatchedCommandResponse response; + buildResponse(1, &response); + + batchOp.noteBatchResponse(*targeted.front(), response, NULL); + ASSERT(batchOp.isFinished()); + } + + TEST(WriteOpLimitTests, OneBigOneSmall) { + + // + // Big doc with smaller additional doc - should go through as two batches + // + + NamespaceString nss("foo.bar"); + ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); + MockNSTargeter targeter; + initTargeterFullRange(nss, endpoint, &targeter); + + // Create a BSONObj (slightly) bigger than the maximum size by including a max-size string + string bigString(BSONObjMaxUserSize, 'x'); + + BatchedCommandRequest request(BatchedCommandRequest::BatchType_Update); + request.setNS(nss.ns()); + BatchedUpdateDocument* bigUpdateDoc = buildUpdate(BSON( "x" << 1 ), + BSON( "data" << bigString ), + false); + request.getUpdateRequest()->addToUpdates(bigUpdateDoc); + request.getUpdateRequest()->addToUpdates(buildUpdate(BSON( "x" << 2 ), + BSONObj(), + false)); + + BatchWriteOp batchOp; + batchOp.initClientRequest(&request); + + OwnedPointerVector<TargetedWriteBatch> targetedOwned; + vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + Status status = batchOp.targetBatch(targeter, false, &targeted); + ASSERT(status.isOK()); + ASSERT_EQUALS(targeted.size(), 1u); + ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); + + BatchedCommandResponse response; + buildResponse(1, &response); + + batchOp.noteBatchResponse(*targeted.front(), response, NULL); + ASSERT(!batchOp.isFinished()); + + targetedOwned.clear(); + status = batchOp.targetBatch(targeter, false, &targeted); + ASSERT(status.isOK()); + ASSERT_EQUALS(targeted.size(), 1u); + ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u); + + batchOp.noteBatchResponse(*targeted.front(), response, NULL); + 
ASSERT(batchOp.isFinished()); + } + + TEST(WriteOpLimitTests, TooManyOps) { + + // + // Batch of 1002 documents + // + + NamespaceString nss("foo.bar"); + ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); + MockNSTargeter targeter; + initTargeterFullRange(nss, endpoint, &targeter); + + BatchedCommandRequest request(BatchedCommandRequest::BatchType_Delete); + request.setNS(nss.ns()); + + // Add 2 more than the maximum to the batch + for (size_t i = 0; i < BatchedCommandRequest::kMaxWriteBatchSize + 2u; ++i) { + request.getDeleteRequest()->addToDeletes(buildDelete(BSON( "x" << 2 ), 0)); + } + + BatchWriteOp batchOp; + batchOp.initClientRequest(&request); + + OwnedPointerVector<TargetedWriteBatch> targetedOwned; + vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + Status status = batchOp.targetBatch(targeter, false, &targeted); + ASSERT(status.isOK()); + ASSERT_EQUALS(targeted.size(), 1u); + ASSERT_EQUALS(targeted.front()->getWrites().size(), 1000u); + + BatchedCommandResponse response; + buildResponse(1, &response); + + batchOp.noteBatchResponse(*targeted.front(), response, NULL); + ASSERT(!batchOp.isFinished()); + + targetedOwned.clear(); + status = batchOp.targetBatch(targeter, false, &targeted); + ASSERT(status.isOK()); + ASSERT_EQUALS(targeted.size(), 1u); + ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u); + + batchOp.noteBatchResponse(*targeted.front(), response, NULL); + ASSERT(batchOp.isFinished()); + } + + TEST(WriteOpLimitTests, UpdateOverheadIncluded) { + + // + // Tests that the overhead of the extra fields in an update x 1000 is included in our size + // calculation + // + + NamespaceString nss("foo.bar"); + ShardEndpoint endpoint("shard", ChunkVersion::IGNORED()); + MockNSTargeter targeter; + initTargeterFullRange(nss, endpoint, &targeter); + + int updateDataBytes = BSONObjMaxUserSize + / static_cast<int>(BatchedCommandRequest::kMaxWriteBatchSize); + + string dataString(updateDataBytes - BSON( "x" << 1 << "data" << 
"" ).objsize(), 'x'); + + BatchedCommandRequest request(BatchedCommandRequest::BatchType_Update); + request.setNS(nss.ns()); + + // Add the maximum number of updates + int estSizeBytes = 0; + for (size_t i = 0; i < BatchedCommandRequest::kMaxWriteBatchSize; ++i) { + BatchedUpdateDocument* updateDoc = new BatchedUpdateDocument; + updateDoc->setQuery(BSON( "x" << 1 << "data" << dataString )); + updateDoc->setUpdateExpr(BSONObj()); + updateDoc->setMulti(false); + updateDoc->setUpsert(false); + request.getUpdateRequest()->addToUpdates(updateDoc); + estSizeBytes += updateDoc->toBSON().objsize(); + } + + ASSERT_GREATER_THAN(estSizeBytes, BSONObjMaxInternalSize); + + BatchWriteOp batchOp; + batchOp.initClientRequest(&request); + + OwnedPointerVector<TargetedWriteBatch> targetedOwned; + vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector(); + Status status = batchOp.targetBatch(targeter, false, &targeted); + ASSERT(status.isOK()); + ASSERT_EQUALS(targeted.size(), 1u); + ASSERT_LESS_THAN(targeted.front()->getWrites().size(), 1000u); + + BatchedCommandRequest childRequest(BatchedCommandRequest::BatchType_Update); + batchOp.buildBatchRequest(*targeted.front(), &childRequest); + ASSERT_LESS_THAN(childRequest.toBSON().objsize(), BSONObjMaxInternalSize); + + BatchedCommandResponse response; + buildResponse(1, &response); + + batchOp.noteBatchResponse(*targeted.front(), response, NULL); + ASSERT(!batchOp.isFinished()); + + targetedOwned.clear(); + status = batchOp.targetBatch(targeter, false, &targeted); + ASSERT(status.isOK()); + ASSERT_EQUALS(targeted.size(), 1u); + ASSERT_LESS_THAN(targeted.front()->getWrites().size(), 1000u); + + childRequest.clear(); + batchOp.buildBatchRequest(*targeted.front(), &childRequest); + ASSERT_LESS_THAN(childRequest.toBSON().objsize(), BSONObjMaxInternalSize); + + batchOp.noteBatchResponse(*targeted.front(), response, NULL); + ASSERT(batchOp.isFinished()); + } + + } // unnamed namespace diff --git 
a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index 6eadc571449..f8097154ce2 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -47,6 +47,10 @@ namespace mongo { clear( &_history ); } + const BatchItemRef& WriteOp::getWriteItem() const { + return _itemRef; + } + WriteOpState WriteOp::getWriteState() const { return _state; } diff --git a/src/mongo/s/write_ops/write_op.h b/src/mongo/s/write_ops/write_op.h index 31adedda632..07957f1e314 100644 --- a/src/mongo/s/write_ops/write_op.h +++ b/src/mongo/s/write_ops/write_op.h @@ -100,6 +100,11 @@ namespace mongo { ~WriteOp(); /** + * Returns the write item for this operation + */ + const BatchItemRef& getWriteItem() const; + + /** * Returns the op's current state. */ WriteOpState getWriteState() const; |