author    Greg Studer <greg@10gen.com>  2014-04-09 12:51:18 -0400
committer Greg Studer <greg@10gen.com>  2014-04-14 10:48:20 -0400
commit    52da235706ca1713ae91475009348a0ceded5307 (patch)
tree      ac66d91023a63820d833ec0b7153e4a0d72f18d2
parent    b14b921ba3b3424681212707d82b76097bf13c9d (diff)
download  mongo-52da235706ca1713ae91475009348a0ceded5307.tar.gz
SERVER-13518 break child batches up by size or number of documents
(cherry picked from commit 0b1994a25c85324ea413a95ace2470be3efb7db5)

Also includes SERVER-13518 make sure all tested updates are valid with update exprs
(cherry picked from commit 836b1d82810d4b7d98eed8c789f71ed45f473b85)
-rw-r--r--  jstests/sharding/batch_write_command_sharded.js    60
-rw-r--r--  src/mongo/s/mock_ns_targeter.h                       1
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp           114
-rw-r--r--  src/mongo/s/write_ops/batch_write_op_test.cpp      214
-rw-r--r--  src/mongo/s/write_ops/write_op.cpp                   4
-rw-r--r--  src/mongo/s/write_ops/write_op.h                     5
6 files changed, 378 insertions(+), 20 deletions(-)
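
At a high level, the patch caps each child batch along two axes before dispatch: the number of write ops and the estimated payload size in bytes. A minimal sketch of that rule, restating the patch's BatchSize bookkeeping under hypothetical names (the 1000-op and 16MB figures match the kMaxWriteBatchSize and BSONObjMaxUserSize limits exercised by the tests below):

#include <cstddef>

// Limits as used by this patch (values per the constants referenced in the diff).
static const size_t kMaxWriteBatchSize = 1000;            // max ops per child batch
static const int kBSONObjMaxUserSize = 16 * 1024 * 1024;  // 16MB user BSON limit

// Hypothetical restatement of the per-endpoint check: a write of writeSizeBytes
// must start a new child batch if appending it would exceed either cap.
struct BatchSizeSketch {
    int numOps;
    int sizeBytes;

    BatchSizeSketch() : numOps(0), sizeBytes(0) {}

    bool wouldOverflow(int writeSizeBytes) const {
        return numOps >= static_cast<int>(kMaxWriteBatchSize)
            || sizeBytes + writeSizeBytes > kBSONObjMaxUserSize;
    }
};
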
diff --git a/jstests/sharding/batch_write_command_sharded.js b/jstests/sharding/batch_write_command_sharded.js
index 2a8944e6b05..269ac5b24b9 100644
--- a/jstests/sharding/batch_write_command_sharded.js
+++ b/jstests/sharding/batch_write_command_sharded.js
@@ -25,6 +25,58 @@ var result;
//
//
+// Mongos _id autogeneration tests for sharded collections
+
+var coll = mongos.getCollection("foo.bar");
+assert.commandWorked(admin.runCommand({ enableSharding : coll.getDB().toString() }));
+assert.commandWorked(admin.runCommand({ shardCollection : coll.toString(),
+ key : { _id : 1 } }));
+
+//
+// Basic insert no _id
+coll.remove({});
+printjson( request = {insert : coll.getName(),
+ documents: [{ a : 1 }] } );
+printjson( result = coll.runCommand(request) );
+assert(result.ok);
+assert.eq(1, result.n);
+assert.eq(1, coll.count());
+
+//
+// Multi insert some _ids
+coll.remove({});
+printjson( request = {insert : coll.getName(),
+ documents: [{ _id : 0, a : 1 }, { a : 2 }] } );
+printjson( result = coll.runCommand(request) );
+assert(result.ok);
+assert.eq(2, result.n);
+assert.eq(2, coll.count());
+assert.eq(1, coll.count({ _id : 0 }));
+
+//
+// Ensure generating many _ids doesn't push us over limits
+var maxDocSize = (16 * 1024 * 1024) / 1000;
+var baseDocSize = Object.bsonsize({ a : 1, data : "" });
+var dataSize = maxDocSize - baseDocSize;
+
+var data = "";
+for (var i = 0; i < dataSize; i++)
+ data += "x";
+
+var documents = [];
+for (var i = 0; i < 1000; i++) documents.push({ a : i, data : data });
+
+assert.commandWorked(coll.getMongo().getDB("admin").runCommand({ setParameter : 1, logLevel : 4 }));
+coll.remove({});
+request = { insert : coll.getName(),
+ documents: documents };
+printjson( result = coll.runCommand(request) );
+assert(result.ok);
+assert.eq(1000, result.n);
+assert.eq(1000, coll.count());
+
+//
+//
// Stale config progress tests
// Set up a new collection across two shards, then revert the chunks to an earlier state to put
// mongos and mongod permanently out of sync.
@@ -33,9 +85,9 @@ var result;
var brokenColl = mongos.getCollection( "broken.coll" );
assert.commandWorked(admin.runCommand({ enableSharding : brokenColl.getDB().toString() }));
printjson(admin.runCommand({ movePrimary : brokenColl.getDB().toString(), to : shards[0]._id }));
-assert.commandWorked(admin.runCommand({ shardCollection : brokenColl.toString(),
+assert.commandWorked(admin.runCommand({ shardCollection : brokenColl.toString(),
key : { _id : 1 } }));
-assert.commandWorked(admin.runCommand({ split : brokenColl.toString(),
+assert.commandWorked(admin.runCommand({ split : brokenColl.toString(),
middle : { _id : 0 } }));
var oldChunks = config.chunks.find().toArray();
@@ -49,7 +101,7 @@ assert.eq(null, brokenColl.getDB().getLastError());
// Modify the chunks to make shards at a higher version
-assert.commandWorked(admin.runCommand({ moveChunk : brokenColl.toString(),
+assert.commandWorked(admin.runCommand({ moveChunk : brokenColl.toString(),
find : { _id : 0 },
to : shards[1]._id }));
@@ -61,7 +113,7 @@ for ( var i = 0; i < oldChunks.length; i++ )
config.chunks.insert(oldChunks[i]);
assert.eq(null, config.getLastError());
-// Stale mongos can no longer bring itself up-to-date!
+// Stale mongos can no longer bring itself up-to-date!
// END SETUP
//
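
The sizing in the new autogeneration test above is worth spelling out: the 1000 documents are built to fill the 16MB limit exactly, so the handful of bytes each generated _id adds must be absorbed by batch splitting rather than by slack in the documents themselves. A self-contained restatement of that arithmetic (constants copied from the test; the per-_id figure is an estimate, not taken from the patch):

#include <cstdio>

int main() {
    const int kMaxMessageBytes = 16 * 1024 * 1024; // 16MB user BSON limit
    const int kNumDocs = 1000;                     // documents in the insert batch

    // Per-document budget used by the test: 16MB split evenly across 1000 docs.
    const int perDocBytes = kMaxMessageBytes / kNumDocs; // 16777

    // Estimated growth per document once mongos injects an ObjectId _id:
    // 1 type byte + 3 bytes of field name + 1 NUL + 12-byte ObjectId value.
    const int perIdBytes = 1 + 3 + 1 + 12; // 17

    std::printf("budget per doc: %d bytes, growth per doc: %d bytes\n",
                perDocBytes, perIdBytes);
    return 0;
}
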
diff --git a/src/mongo/s/mock_ns_targeter.h b/src/mongo/s/mock_ns_targeter.h
index d51ea5de3b6..f4e6140220e 100644
--- a/src/mongo/s/mock_ns_targeter.h
+++ b/src/mongo/s/mock_ns_targeter.h
@@ -160,7 +160,6 @@ namespace mongo {
KeyRange parseRange( const BSONObj& query ) const {
- ASSERT_EQUALS( query.nFields(), 1 );
string fieldName = query.firstElement().fieldName();
if ( query.firstElement().isNumber() ) {
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 8f0688eaef2..15de4a1fd6b 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -105,6 +105,22 @@ namespace mongo {
};
typedef std::map<const ShardEndpoint*, TargetedWriteBatch*, EndpointComp> TargetedBatchMap;
+
+ //
+ // Types for tracking batch sizes
+ //
+
+ struct BatchSize {
+
+ BatchSize() :
+ numOps(0), sizeBytes(0) {
+ }
+
+ int numOps;
+ int sizeBytes;
+ };
+
+ typedef std::map<const ShardEndpoint*, BatchSize, EndpointComp> TargetedBatchSizeMap;
}
static void buildTargetError( const Status& errStatus, WriteErrorDetail* details ) {
@@ -128,19 +144,66 @@ namespace mongo {
return false;
}
- // Helper function to cancel all the write ops of targeted batch.
- static void cancelBatch( const TargetedWriteBatch& targetedBatch,
- WriteOp* writeOps,
- const WriteErrorDetail& why ) {
- const vector<TargetedWrite*>& writes = targetedBatch.getWrites();
+ // MAGIC NUMBERS
+ // We must split batches before serializing the updates/deletes, so we can't know their
+ // exact wire sizes up front; instead we charge a conservative per-write overhead estimate.
+ // TODO: Revisit when we revisit command limits in general
+ static const int kEstUpdateOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
+ static const int kEstDeleteOverheadBytes = (BSONObjMaxInternalSize - BSONObjMaxUserSize) / 100;
+
+ static int getWriteSizeBytes(const WriteOp& writeOp) {
+
+ const BatchItemRef& item = writeOp.getWriteItem();
+ BatchedCommandRequest::BatchType batchType = item.getOpType();
+
+ if (batchType == BatchedCommandRequest::BatchType_Insert) {
+ return item.getDocument().objsize();
+ }
+ else if (batchType == BatchedCommandRequest::BatchType_Update) {
+ // Note: Be conservative here - it's okay if we send slightly too many batches
+ int estSize = item.getUpdate()->getQuery().objsize()
+ + item.getUpdate()->getUpdateExpr().objsize() + kEstUpdateOverheadBytes;
+ dassert(estSize >= item.getUpdate()->toBSON().objsize());
+ return estSize;
+ }
+ else {
+ dassert( batchType == BatchedCommandRequest::BatchType_Delete );
+ // Note: Be conservative here - it's okay if we send slightly too many batches
+ int estSize = item.getDelete()->getQuery().objsize() + kEstDeleteOverheadBytes;
+ dassert(estSize >= item.getDelete()->toBSON().objsize());
+ return estSize;
+ }
+ }
+
+ // Helper to determine whether a set of targeted writes requires a new targeted batch
+ static bool wouldMakeBatchesTooBig(const vector<TargetedWrite*>& writes,
+ int writeSizeBytes,
+ const TargetedBatchSizeMap& batchSizes) {
+
- for ( vector<TargetedWrite*>::const_iterator writeIt = writes.begin();
- writeIt != writes.end(); ++writeIt ) {
+ for (vector<TargetedWrite*>::const_iterator it = writes.begin(); it != writes.end(); ++it) {
+ const TargetedWrite* write = *it;
+ TargetedBatchSizeMap::const_iterator seenIt = batchSizes.find(&write->endpoint);
+
+ if (seenIt == batchSizes.end()) {
+ // If this is the first item in the batch, it can't be too big
+ continue;
+ }
+
- TargetedWrite* write = *writeIt;
- // NOTE: We may repeatedly cancel a write op here, but that's fast.
- writeOps[write->writeOpRef.first].cancelWrites( &why );
+ const BatchSize& batchSize = seenIt->second;
+ if (batchSize.numOps >= static_cast<int>(BatchedCommandRequest::kMaxWriteBatchSize)) {
+ // Too many items in batch
+ return true;
+ }
+
+ if (batchSize.sizeBytes + writeSizeBytes > BSONObjMaxUserSize) {
+ // Batch would be too big
+ return true;
+ }
}
+
+ return false;
}
// Helper function to cancel all the write ops of targeted batches in a map
@@ -215,6 +278,8 @@ namespace mongo {
const bool ordered = _clientRequest->getOrdered();
TargetedBatchMap batchMap;
+ TargetedBatchSizeMap batchSizes;
+
int numTargetErrors = 0;
size_t numWriteOps = _clientRequest->sizeWriteOps();
@@ -287,6 +352,17 @@ namespace mongo {
}
//
+ // If this write would push a batch over the op-count or byte-size limit, stop targeting
+ //
+
+ int writeSizeBytes = getWriteSizeBytes(writeOp);
+ if (wouldMakeBatchesTooBig(writes, writeSizeBytes, batchSizes)) {
+ invariant(!batchMap.empty());
+ writeOp.cancelWrites(NULL);
+ break;
+ }
+
+ //
// Targeting went ok, add to appropriate TargetedBatch
//
@@ -294,14 +370,22 @@ namespace mongo {
TargetedWrite* write = *it;
- TargetedBatchMap::iterator seenIt = batchMap.find( &write->endpoint );
- if ( seenIt == batchMap.end() ) {
+ TargetedBatchMap::iterator batchIt = batchMap.find( &write->endpoint );
+ TargetedBatchSizeMap::iterator batchSizeIt = batchSizes.find( &write->endpoint );
+
+ if ( batchIt == batchMap.end() ) {
TargetedWriteBatch* newBatch = new TargetedWriteBatch( write->endpoint );
- seenIt = batchMap.insert( make_pair( &newBatch->getEndpoint(), //
- newBatch ) ).first;
+ batchIt = batchMap.insert( make_pair( &newBatch->getEndpoint(),
+ newBatch ) ).first;
+ batchSizeIt = batchSizes.insert(make_pair(&newBatch->getEndpoint(),
+ BatchSize())).first;
}
- TargetedWriteBatch* batch = seenIt->second;
+ TargetedWriteBatch* batch = batchIt->second;
+ BatchSize& batchSize = batchSizeIt->second;
+
+ ++batchSize.numOps;
+ batchSize.sizeBytes += writeSizeBytes;
batch->addWrite( write );
}
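
For concreteness, the overhead constants above come out to roughly 163 bytes per update or delete under the usual definitions of this era's tree, where BSONObjMaxInternalSize allows 16KB of headroom above BSONObjMaxUserSize; treat the exact figures in this sketch as assumptions rather than quotes from the source:

// Worked numbers for kEstUpdateOverheadBytes / kEstDeleteOverheadBytes, assuming
// BSONObjMaxInternalSize = BSONObjMaxUserSize + 16KB (an assumption; the
// definitions are not shown in this diff).
const int kBSONObjMaxUserSize = 16 * 1024 * 1024;
const int kBSONObjMaxInternalSize = kBSONObjMaxUserSize + 16 * 1024;
const int kEstOverheadBytes = (kBSONObjMaxInternalSize - kBSONObjMaxUserSize) / 100;
// => 16384 / 100 = 163 bytes charged per write on top of the query and
//    update-expression sizes; the dasserts in getWriteSizeBytes verify the
//    estimate stays at or above the true serialized size.
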
diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp
index 06e256f6555..7170b6f5ce7 100644
--- a/src/mongo/s/write_ops/batch_write_op_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_op_test.cpp
@@ -92,11 +92,22 @@ namespace {
static BatchedUpdateDocument* buildUpdate( const BSONObj& query, bool multi ) {
BatchedUpdateDocument* updateDoc = new BatchedUpdateDocument;
+ updateDoc->setUpdateExpr( BSONObj() );
updateDoc->setQuery( query );
updateDoc->setMulti( multi );
return updateDoc;
}
+ static BatchedUpdateDocument* buildUpdate(const BSONObj& query,
+ const BSONObj& updateExpr,
+ bool multi) {
+ BatchedUpdateDocument* updateDoc = new BatchedUpdateDocument;
+ updateDoc->setQuery( query );
+ updateDoc->setUpdateExpr( updateExpr );
+ updateDoc->setMulti( multi );
+ return updateDoc;
+ }
+
static void buildResponse( int n, BatchedCommandResponse* response ) {
response->clear();
response->setOk( true );
@@ -1616,4 +1627,207 @@ namespace {
ASSERT( clientResponse.isWriteConcernErrorSet() );
}
+ //
+ // Tests of batch size limit functionality
+ //
+
+ TEST(WriteOpLimitTests, OneBigDoc) {
+
+ //
+ // Big single operation test - should go through
+ //
+
+ NamespaceString nss("foo.bar");
+ ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
+ MockNSTargeter targeter;
+ initTargeterFullRange(nss, endpoint, &targeter);
+
+ // Create a BSONObj (slightly) bigger than the maximum size by including a max-size string
+ string bigString(BSONObjMaxUserSize, 'x');
+
+ // Do single-target, single doc batch write op
+ BatchedCommandRequest request(BatchedCommandRequest::BatchType_Insert);
+ request.setNS(nss.ns());
+ request.getInsertRequest()->addToDocuments(BSON( "x" << 1 << "data" << bigString ));
+
+ BatchWriteOp batchOp;
+ batchOp.initClientRequest(&request);
+
+ OwnedPointerVector<TargetedWriteBatch> targetedOwned;
+ vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ Status status = batchOp.targetBatch(targeter, false, &targeted);
+ ASSERT(status.isOK());
+ ASSERT_EQUALS(targeted.size(), 1u);
+
+ BatchedCommandResponse response;
+ buildResponse(1, &response);
+
+ batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ ASSERT(batchOp.isFinished());
+ }
+
+ TEST(WriteOpLimitTests, OneBigOneSmall) {
+
+ //
+ // Big doc with smaller additional doc - should go through as two batches
+ //
+
+ NamespaceString nss("foo.bar");
+ ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
+ MockNSTargeter targeter;
+ initTargeterFullRange(nss, endpoint, &targeter);
+
+ // Create a BSONObj (slightly) bigger than the maximum size by including a max-size string
+ string bigString(BSONObjMaxUserSize, 'x');
+
+ BatchedCommandRequest request(BatchedCommandRequest::BatchType_Update);
+ request.setNS(nss.ns());
+ BatchedUpdateDocument* bigUpdateDoc = buildUpdate(BSON( "x" << 1 ),
+ BSON( "data" << bigString ),
+ false);
+ request.getUpdateRequest()->addToUpdates(bigUpdateDoc);
+ request.getUpdateRequest()->addToUpdates(buildUpdate(BSON( "x" << 2 ),
+ BSONObj(),
+ false));
+
+ BatchWriteOp batchOp;
+ batchOp.initClientRequest(&request);
+
+ OwnedPointerVector<TargetedWriteBatch> targetedOwned;
+ vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ Status status = batchOp.targetBatch(targeter, false, &targeted);
+ ASSERT(status.isOK());
+ ASSERT_EQUALS(targeted.size(), 1u);
+ ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
+
+ BatchedCommandResponse response;
+ buildResponse(1, &response);
+
+ batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ ASSERT(!batchOp.isFinished());
+
+ targetedOwned.clear();
+ status = batchOp.targetBatch(targeter, false, &targeted);
+ ASSERT(status.isOK());
+ ASSERT_EQUALS(targeted.size(), 1u);
+ ASSERT_EQUALS(targeted.front()->getWrites().size(), 1u);
+
+ batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ ASSERT(batchOp.isFinished());
+ }
+
+ TEST(WriteOpLimitTests, TooManyOps) {
+
+ //
+ // Batch of 1002 delete ops (two more than the 1000-op limit)
+ //
+
+ NamespaceString nss("foo.bar");
+ ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
+ MockNSTargeter targeter;
+ initTargeterFullRange(nss, endpoint, &targeter);
+
+ BatchedCommandRequest request(BatchedCommandRequest::BatchType_Delete);
+ request.setNS(nss.ns());
+
+ // Add 2 more than the maximum to the batch
+ for (size_t i = 0; i < BatchedCommandRequest::kMaxWriteBatchSize + 2u; ++i) {
+ request.getDeleteRequest()->addToDeletes(buildDelete(BSON( "x" << 2 ), 0));
+ }
+
+ BatchWriteOp batchOp;
+ batchOp.initClientRequest(&request);
+
+ OwnedPointerVector<TargetedWriteBatch> targetedOwned;
+ vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ Status status = batchOp.targetBatch(targeter, false, &targeted);
+ ASSERT(status.isOK());
+ ASSERT_EQUALS(targeted.size(), 1u);
+ ASSERT_EQUALS(targeted.front()->getWrites().size(), 1000u);
+
+ BatchedCommandResponse response;
+ buildResponse(1, &response);
+
+ batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ ASSERT(!batchOp.isFinished());
+
+ targetedOwned.clear();
+ status = batchOp.targetBatch(targeter, false, &targeted);
+ ASSERT(status.isOK());
+ ASSERT_EQUALS(targeted.size(), 1u);
+ ASSERT_EQUALS(targeted.front()->getWrites().size(), 2u);
+
+ batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ ASSERT(batchOp.isFinished());
+ }
+
+ TEST(WriteOpLimitTests, UpdateOverheadIncluded) {
+
+ //
+ // Tests that the per-update overhead of the extra fields, summed over 1000 updates, is
+ // included in our size calculation
+ //
+
+ NamespaceString nss("foo.bar");
+ ShardEndpoint endpoint("shard", ChunkVersion::IGNORED());
+ MockNSTargeter targeter;
+ initTargeterFullRange(nss, endpoint, &targeter);
+
+ int updateDataBytes = BSONObjMaxUserSize
+ / static_cast<int>(BatchedCommandRequest::kMaxWriteBatchSize);
+
+ string dataString(updateDataBytes - BSON( "x" << 1 << "data" << "" ).objsize(), 'x');
+
+ BatchedCommandRequest request(BatchedCommandRequest::BatchType_Update);
+ request.setNS(nss.ns());
+
+ // Add the maximum number of updates
+ int estSizeBytes = 0;
+ for (size_t i = 0; i < BatchedCommandRequest::kMaxWriteBatchSize; ++i) {
+ BatchedUpdateDocument* updateDoc = new BatchedUpdateDocument;
+ updateDoc->setQuery(BSON( "x" << 1 << "data" << dataString ));
+ updateDoc->setUpdateExpr(BSONObj());
+ updateDoc->setMulti(false);
+ updateDoc->setUpsert(false);
+ request.getUpdateRequest()->addToUpdates(updateDoc);
+ estSizeBytes += updateDoc->toBSON().objsize();
+ }
+
+ ASSERT_GREATER_THAN(estSizeBytes, BSONObjMaxInternalSize);
+
+ BatchWriteOp batchOp;
+ batchOp.initClientRequest(&request);
+
+ OwnedPointerVector<TargetedWriteBatch> targetedOwned;
+ vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
+ Status status = batchOp.targetBatch(targeter, false, &targeted);
+ ASSERT(status.isOK());
+ ASSERT_EQUALS(targeted.size(), 1u);
+ ASSERT_LESS_THAN(targeted.front()->getWrites().size(), 1000u);
+
+ BatchedCommandRequest childRequest(BatchedCommandRequest::BatchType_Update);
+ batchOp.buildBatchRequest(*targeted.front(), &childRequest);
+ ASSERT_LESS_THAN(childRequest.toBSON().objsize(), BSONObjMaxInternalSize);
+
+ BatchedCommandResponse response;
+ buildResponse(1, &response);
+
+ batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ ASSERT(!batchOp.isFinished());
+
+ targetedOwned.clear();
+ status = batchOp.targetBatch(targeter, false, &targeted);
+ ASSERT(status.isOK());
+ ASSERT_EQUALS(targeted.size(), 1u);
+ ASSERT_LESS_THAN(targeted.front()->getWrites().size(), 1000u);
+
+ childRequest.clear();
+ batchOp.buildBatchRequest(*targeted.front(), &childRequest);
+ ASSERT_LESS_THAN(childRequest.toBSON().objsize(), BSONObjMaxInternalSize);
+
+ batchOp.noteBatchResponse(*targeted.front(), response, NULL);
+ ASSERT(batchOp.isFinished());
+ }
+
+
} // unnamed namespace
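
All four new tests drive the op through the same loop: target a child batch, acknowledge it with a canned response, and repeat until the client request is exhausted. A hypothetical helper capturing that loop (names come from the fixture above; countChildBatches itself is not part of the patch):

// Hypothetical helper: repeatedly target and acknowledge child batches,
// returning how many rounds it takes to finish the client request.
static int countChildBatches(BatchWriteOp& batchOp, const MockNSTargeter& targeter) {
    int rounds = 0;
    while (!batchOp.isFinished()) {
        OwnedPointerVector<TargetedWriteBatch> targetedOwned;
        vector<TargetedWriteBatch*>& targeted = targetedOwned.mutableVector();
        ASSERT(batchOp.targetBatch(targeter, false, &targeted).isOK());

        BatchedCommandResponse response;
        buildResponse(1, &response); // canned ok response, as in the tests above
        batchOp.noteBatchResponse(*targeted.front(), response, NULL);
        ++rounds;
    }
    return rounds;
}

Under the new limits, the TooManyOps case would report two rounds: a batch of 1000 deletes followed by a batch with the remaining 2.
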
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp
index 6eadc571449..f8097154ce2 100644
--- a/src/mongo/s/write_ops/write_op.cpp
+++ b/src/mongo/s/write_ops/write_op.cpp
@@ -47,6 +47,10 @@ namespace mongo {
clear( &_history );
}
+ const BatchItemRef& WriteOp::getWriteItem() const {
+ return _itemRef;
+ }
+
WriteOpState WriteOp::getWriteState() const {
return _state;
}
diff --git a/src/mongo/s/write_ops/write_op.h b/src/mongo/s/write_ops/write_op.h
index 31adedda632..07957f1e314 100644
--- a/src/mongo/s/write_ops/write_op.h
+++ b/src/mongo/s/write_ops/write_op.h
@@ -100,6 +100,11 @@ namespace mongo {
~WriteOp();
/**
+ * Returns the write item for this operation
+ */
+ const BatchItemRef& getWriteItem() const;
+
+ /**
* Returns the op's current state.
*/
WriteOpState getWriteState() const;