diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-06-22 19:35:59 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-06-27 14:33:09 -0400 |
commit | e7a75ec01e4e3683cc6b83e3bbc0f4c4b05168dc (patch) | |
tree | b52326011cd57ac455f498c8f5d68c7a4fbea640 | |
parent | 0fdc2e40994c930a97aa213fa990423616bc5268 (diff) | |
download | mongo-e7a75ec01e4e3683cc6b83e3bbc0f4c4b05168dc.tar.gz |
SERVER-35691 Cleanup StaleConfigInfo serialization
-rw-r--r-- | jstests/sharding/zbigMapReduce.js | 237 | ||||
-rw-r--r-- | src/mongo/client/dbclientcursor.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 37 | ||||
-rw-r--r-- | src/mongo/rpc/legacy_reply_builder.cpp | 6 | ||||
-rw-r--r-- | src/mongo/rpc/reply_builder_interface.cpp | 8 |
5 files changed, 138 insertions, 152 deletions
diff --git a/jstests/sharding/zbigMapReduce.js b/jstests/sharding/zbigMapReduce.js index 10541ecd16b..baf96845f62 100644 --- a/jstests/sharding/zbigMapReduce.js +++ b/jstests/sharding/zbigMapReduce.js @@ -1,12 +1,12 @@ /** - * This test is skipped on 32-bit platforms - * * This test is labeled resource intensive because its total io_write is 625MB compared to a median * of 5MB across all sharding tests in wiredTiger. * @tags: [resource_intensive] */ -function setupTest() { - var s = new ShardingTest({ +(function() { + 'use strict'; + + let s = new ShardingTest({ shards: 2, mongos: 1, other: { @@ -14,40 +14,27 @@ function setupTest() { numReplicas: 2, chunkSize: 1, rsOptions: {oplogSize: 50}, - enableBalancer: true + enableAutoSplit: true, } }); - // Reduce chunk size to split - var config = s.getDB("config"); - assert.commandWorked(s.s0.adminCommand({enablesharding: "test"})); s.ensurePrimaryShard('test', s.shard0.shardName); assert.commandWorked(s.s0.adminCommand({shardcollection: "test.foo", key: {"_id": 1}})); - return s; -} -function runTest(s) { + let testDb = s.getDB("test"); + jsTest.log("Inserting a lot of documents into test.foo"); - db = s.getDB("test"); + + // Make each document data to be 5K so that the total size is ~250MB + const str = "#".repeat(5 * 1024); var idInc = 0; var valInc = 0; - var str = ""; - if (db.serverBuildInfo().bits == 32) { - // Make data ~0.5MB for 32 bit builds - for (var i = 0; i < 512; i++) - str += "a"; - } else { - // Make data ~4MB - for (var i = 0; i < 4 * 1024; i++) - str += "a"; - } - - var bulk = db.foo.initializeUnorderedBulkOp(); - for (j = 0; j < 100; j++) { - for (i = 0; i < 512; i++) { + var bulk = testDb.foo.initializeUnorderedBulkOp(); + for (var j = 0; j < 100; j++) { + for (var i = 0; i < 512; i++) { bulk.insert({i: idInc++, val: valInc++, y: str}); } } @@ -56,16 +43,14 @@ function runTest(s) { jsTest.log("Documents inserted, doing double-checks of insert..."); // Collect some useful stats to figure out what happened - if (db.foo.find().itcount() != 51200) { - sleep(1000); - + if (testDb.foo.find().itcount() != 51200) { s.printShardingStatus(true); - print("Shard 0: " + s.shard0.getCollection(db.foo + "").find().itcount()); - print("Shard 1: " + s.shard1.getCollection(db.foo + "").find().itcount()); + print("Shard 0: " + s.shard0.getCollection(testDb.foo + "").find().itcount()); + print("Shard 1: " + s.shard1.getCollection(testDb.foo + "").find().itcount()); for (var i = 0; i < 51200; i++) { - if (!db.foo.findOne({i: i}, {i: 1})) { + if (!testDb.foo.findOne({i: i}, {i: 1})) { print("Could not find: " + i); } @@ -73,20 +58,10 @@ function runTest(s) { print("Checked " + i); } - print("PROBABLY WILL ASSERT NOW"); + assert(false, 'Incorect number of chunks found!'); } - assert.soon(function() { - var c = db.foo.find().itcount(); - if (c == 51200) { - return true; - } - - print("Count is " + c); - return false; - }); - - s.printChunks(); + s.printChunks(testDb.foo.getFullName()); s.printChangeLog(); function map() { @@ -96,148 +71,160 @@ function runTest(s) { return Array.sum(values); } + // Let chunks move around while map reduce is running + s.startBalancer(); + jsTest.log("Test basic mapreduce..."); // Test basic mapReduce for (var iter = 0; iter < 5; iter++) { print("Test #" + iter); - out = db.foo.mapReduce(map, reduce, "big_out"); + testDb.foo.mapReduce(map, reduce, "big_out"); } print("Testing output to different db..."); - // test output to a different DB - // do it multiple times so that primary shard changes - for (iter = 0; iter < 5; iter++) { + // Test output to a different DB - do it multiple times so that the merging shard changes + for (var iter = 0; iter < 5; iter++) { print("Test #" + iter); - assert.eq(51200, db.foo.find().itcount(), "Not all data was found!"); + assert.eq(51200, testDb.foo.find().itcount(), "Not all data was found!"); - outCollStr = "mr_replace_col_" + iter; - outDbStr = "mr_db_" + iter; + let outCollStr = "mr_replace_col_" + iter; + let outDbStr = "mr_db_" + iter; print("Testing mr replace into DB " + iter); - res = db.foo.mapReduce(map, reduce, {out: {replace: outCollStr, db: outDbStr}}); + var res = testDb.foo.mapReduce(map, reduce, {out: {replace: outCollStr, db: outDbStr}}); printjson(res); - outDb = s.getDB(outDbStr); - outColl = outDb[outCollStr]; - - obj = outColl.convertToSingleObject("value"); + var outDb = s.getDB(outDbStr); + var outColl = outDb[outCollStr]; + var obj = outColl.convertToSingleObject("value"); assert.eq(51200, obj.count, "Received wrong result " + obj.count); - print("checking result field"); + print("Checking result field"); assert.eq(res.result.collection, outCollStr, "Wrong collection " + res.result.collection); assert.eq(res.result.db, outDbStr, "Wrong db " + res.result.db); } jsTest.log("Verifying nonatomic M/R throws..."); - // check nonAtomic output + // Check nonAtomic output assert.throws(function() { - db.foo.mapReduce(map, reduce, {out: {replace: "big_out", nonAtomic: true}}); + testDb.foo.mapReduce(map, reduce, {out: {replace: "big_out", nonAtomic: true}}); }); jsTest.log("Adding documents"); // Add docs with dup "i" valInc = 0; - for (j = 0; j < 100; j++) { + for (var j = 0; j < 100; j++) { print("Inserted document: " + (j * 100)); - bulk = db.foo.initializeUnorderedBulkOp(); + var bulk = testDb.foo.initializeUnorderedBulkOp(); for (i = 0; i < 512; i++) { bulk.insert({i: idInc++, val: valInc++, y: str}); } - // wait for replication to catch up assert.writeOK(bulk.execute({w: 2, wtimeout: 10 * 60 * 1000})); } jsTest.log("No errors..."); - map2 = function() { + function map2() { emit(this.val, 1); - }; - reduce2 = function(key, values) { + } + function reduce2(key, values) { return Array.sum(values); - }; + } // Test merge - outcol = "big_out_merge"; + let outColMerge = 'big_out_merge'; // M/R quarter of the docs - jsTestLog("Test A"); - out = db.foo.mapReduce(map2, reduce2, {query: {i: {$lt: 25600}}, out: {merge: outcol}}); - printjson(out); - assert.eq(25600, out.counts.emit, "Received wrong result"); - assert.eq(25600, out.counts.output, "Received wrong result"); + { + jsTestLog("Test A"); + var out = testDb.foo.mapReduce( + map2, reduce2, {query: {i: {$lt: 25600}}, out: {merge: outColMerge}}); + printjson(out); + assert.eq(25600, out.counts.emit, "Received wrong result"); + assert.eq(25600, out.counts.output, "Received wrong result"); + } // M/R further docs - jsTestLog("Test B"); - out = db.foo.mapReduce( - map2, reduce2, {query: {i: {$gte: 25600, $lt: 51200}}, out: {merge: outcol}}); - printjson(out); - assert.eq(25600, out.counts.emit, "Received wrong result"); - assert.eq(51200, out.counts.output, "Received wrong result"); + { + jsTestLog("Test B"); + var out = testDb.foo.mapReduce( + map2, reduce2, {query: {i: {$gte: 25600, $lt: 51200}}, out: {merge: outColMerge}}); + printjson(out); + assert.eq(25600, out.counts.emit, "Received wrong result"); + assert.eq(51200, out.counts.output, "Received wrong result"); + } // M/R do 2nd half of docs - jsTestLog("Test C"); - out = db.foo.mapReduce( - map2, reduce2, {query: {i: {$gte: 51200}}, out: {merge: outcol, nonAtomic: true}}); - printjson(out); - assert.eq(51200, out.counts.emit, "Received wrong result"); - assert.eq(51200, out.counts.output, "Received wrong result"); - assert.eq(1, db[outcol].findOne().value, "Received wrong result"); + { + jsTestLog("Test C"); + var out = testDb.foo.mapReduce( + map2, reduce2, {query: {i: {$gte: 51200}}, out: {merge: outColMerge, nonAtomic: true}}); + printjson(out); + assert.eq(51200, out.counts.emit, "Received wrong result"); + assert.eq(51200, out.counts.output, "Received wrong result"); + assert.eq(1, testDb[outColMerge].findOne().value, "Received wrong result"); + } // Test reduce - jsTestLog("Test D"); - outcol = "big_out_reduce"; + let outColReduce = "big_out_reduce"; // M/R quarter of the docs - out = db.foo.mapReduce(map2, reduce2, {query: {i: {$lt: 25600}}, out: {reduce: outcol}}); - printjson(out); - assert.eq(25600, out.counts.emit, "Received wrong result"); - assert.eq(25600, out.counts.output, "Received wrong result"); + { + jsTestLog("Test D"); + var out = testDb.foo.mapReduce( + map2, reduce2, {query: {i: {$lt: 25600}}, out: {reduce: outColReduce}}); + printjson(out); + assert.eq(25600, out.counts.emit, "Received wrong result"); + assert.eq(25600, out.counts.output, "Received wrong result"); + } // M/R further docs - jsTestLog("Test E"); - out = db.foo.mapReduce( - map2, reduce2, {query: {i: {$gte: 25600, $lt: 51200}}, out: {reduce: outcol}}); - printjson(out); - assert.eq(25600, out.counts.emit, "Received wrong result"); - assert.eq(51200, out.counts.output, "Received wrong result"); + { + jsTestLog("Test E"); + var out = testDb.foo.mapReduce( + map2, reduce2, {query: {i: {$gte: 25600, $lt: 51200}}, out: {reduce: outColReduce}}); + printjson(out); + assert.eq(25600, out.counts.emit, "Received wrong result"); + assert.eq(51200, out.counts.output, "Received wrong result"); + } // M/R do 2nd half of docs - jsTestLog("Test F"); - out = db.foo.mapReduce( - map2, reduce2, {query: {i: {$gte: 51200}}, out: {reduce: outcol, nonAtomic: true}}); - printjson(out); - assert.eq(51200, out.counts.emit, "Received wrong result"); - assert.eq(51200, out.counts.output, "Received wrong result"); - assert.eq(2, db[outcol].findOne().value, "Received wrong result"); + { + jsTestLog("Test F"); + var out = testDb.foo.mapReduce( + map2, + reduce2, + {query: {i: {$gte: 51200}}, out: {reduce: outColReduce, nonAtomic: true}}); + printjson(out); + assert.eq(51200, out.counts.emit, "Received wrong result"); + assert.eq(51200, out.counts.output, "Received wrong result"); + assert.eq(2, testDb[outColReduce].findOne().value, "Received wrong result"); + } // Verify that data is also on secondary - jsTestLog("Test G"); - var primary = s.rs0._master; - var secondaries = s.rs0._slaves; - - // Stop the balancer to prevent new writes from happening and make sure - // that replication can keep up even on slow machines. - s.stopBalancer(); - s.rs0.awaitReplication(); - assert.eq(51200, primary.getDB("test")[outcol].find().itcount(), "Wrong count"); - for (var i = 0; i < secondaries.length; ++i) { - assert.eq(51200, secondaries[i].getDB("test")[outcol].find().itcount(), "Wrong count"); + { + jsTestLog("Test G"); + var primary = s.rs0._master; + var secondaries = s.rs0._slaves; + + // Stop the balancer to prevent new writes from happening and make sure that replication can + // keep up even on slow machines + s.stopBalancer(); + s.rs0.awaitReplication(); + assert.eq(51200, primary.getDB("test")[outColReduce].find().itcount(), "Wrong count"); + + for (var i = 0; i < secondaries.length; ++i) { + assert.eq( + 51200, secondaries[i].getDB("test")[outColReduce].find().itcount(), "Wrong count"); + } } -} - -var s = setupTest(); - -if (s.getDB("admin").runCommand("buildInfo").bits < 64) { - print("Skipping test on 32-bit platforms"); -} else { - runTest(s); -} -s.stop(); + s.stop(); +})(); diff --git a/src/mongo/client/dbclientcursor.cpp b/src/mongo/client/dbclientcursor.cpp index cbd12145399..19d53c81a5a 100644 --- a/src/mongo/client/dbclientcursor.cpp +++ b/src/mongo/client/dbclientcursor.cpp @@ -252,7 +252,7 @@ BSONObj DBClientCursor::commandDataReceived(const Message& reply) { auto commandReply = _client->parseCommandReplyMessage(_client->getServerAddress(), reply); auto commandStatus = getStatusFromCommandResult(commandReply->getCommandReply()); - if (ErrorCodes::StaleConfig == commandStatus) { + if (commandStatus == ErrorCodes::StaleConfig) { uassertStatusOK( commandStatus.withContext("stale config in DBClientCursor::dataReceived()")); } else if (!commandStatus.isOK()) { diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index c473881facf..6d74d0620ee 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -148,13 +148,13 @@ bool shouldActivateFailCommandFailPoint(const BSONObj& data, StringData cmdName) return false; } -void generateLegacyQueryErrorResponse(const AssertionException* exception, +void generateLegacyQueryErrorResponse(const AssertionException& exception, const QueryMessage& queryMessage, CurOp* curop, Message* response) { - curop->debug().errInfo = exception->toStatus(); + curop->debug().errInfo = exception.toStatus(); - log(LogComponent::kQuery) << "assertion " << exception->toString() << " ns:" << queryMessage.ns + log(LogComponent::kQuery) << "assertion " << exception.toString() << " ns:" << queryMessage.ns << " query:" << (queryMessage.query.valid(BSONVersion::kLatest) ? redact(queryMessage.query) : "query object is corrupt"); @@ -163,18 +163,18 @@ void generateLegacyQueryErrorResponse(const AssertionException* exception, << " ntoreturn:" << queryMessage.ntoreturn; } - auto scex = exception->extraInfo<StaleConfigInfo>(); - BSONObjBuilder err; - err.append("$err", exception->reason()); - err.append("code", exception->code()); - if (scex) { - err.append("ok", 0.0); - scex->serialize(&err); + err.append("$err", exception.reason()); + err.append("code", exception.code()); + err.append("ok", 0.0); + auto const extraInfo = exception.extraInfo(); + if (extraInfo) { + extraInfo->serialize(&err); } BSONObj errObj = err.done(); - if (scex) { + const bool isStaleConfig = exception.code() == ErrorCodes::StaleConfig; + if (isStaleConfig) { log(LogComponent::kQuery) << "stale version detected during query over " << queryMessage.ns << " : " << errObj; } @@ -187,8 +187,9 @@ void generateLegacyQueryErrorResponse(const AssertionException* exception, QueryResult::View msgdata = bb.buf(); QueryResult::View qr = msgdata; qr.setResultFlags(ResultFlag_ErrSet); - if (scex) + if (isStaleConfig) { qr.setResultFlags(qr.getResultFlags() | ResultFlag_ShardConfigStale); + } qr.msgdata().setLen(bb.len()); qr.msgdata().setOperation(opReply); qr.setCursorId(0); @@ -890,7 +891,7 @@ void execCommandDatabase(OperationContext* opCtx, sessionOptions)) { command->incrementCommandsFailed(); } - } catch (DBException&) { + } catch (const DBException&) { command->incrementCommandsFailed(); throw; } @@ -969,9 +970,9 @@ void curOpCommandSetup(OperationContext* opCtx, const OpMsgRequest& request) { curop->setNS_inlock(nss.ns()); } -DbResponse runCommands(OperationContext* opCtx, - const Message& message, - const ServiceEntryPointCommon::Hooks& behaviors) { +DbResponse receivedCommands(OperationContext* opCtx, + const Message& message, + const ServiceEntryPointCommon::Hooks& behaviors) { auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message)); [&] { OpMsgRequest request; @@ -1089,7 +1090,7 @@ DbResponse receivedQuery(OperationContext* opCtx, } dbResponse.response.reset(); - generateLegacyQueryErrorResponse(&e, q, &op, &dbResponse.response); + generateLegacyQueryErrorResponse(e, q, &op, &dbResponse.response); } op.debug().responseLength = dbResponse.response.header().dataLen(); @@ -1314,7 +1315,7 @@ DbResponse ServiceEntryPointCommon::handleRequest(OperationContext* opCtx, DbResponse dbresponse; if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) { - dbresponse = runCommands(opCtx, m, behaviors); + dbresponse = receivedCommands(opCtx, m, behaviors); } else if (op == dbQuery) { invariant(!isCommand); dbresponse = receivedQuery(opCtx, nsString, c, m); diff --git a/src/mongo/rpc/legacy_reply_builder.cpp b/src/mongo/rpc/legacy_reply_builder.cpp index c2edc42b1cb..21e18cde76a 100644 --- a/src/mongo/rpc/legacy_reply_builder.cpp +++ b/src/mongo/rpc/legacy_reply_builder.cpp @@ -57,7 +57,6 @@ LegacyReplyBuilder& LegacyReplyBuilder::setCommandReply(Status nonOKStatus, invariant(_state == State::kCommandReply); if (nonOKStatus == ErrorCodes::StaleConfig) { _staleConfigError = true; - auto scex = nonOKStatus.extraInfo<StaleConfigInfo>(); // Need to use the special $err format for StaleConfig errors to be backwards // compatible. @@ -66,9 +65,8 @@ LegacyReplyBuilder& LegacyReplyBuilder::setCommandReply(Status nonOKStatus, // $err must be the first field in object. err.append("$err", nonOKStatus.reason()); err.append("code", nonOKStatus.code()); - if (scex) { - scex->serialize(&err); - } + auto const scex = nonOKStatus.extraInfo<StaleConfigInfo>(); + scex->serialize(&err); err.appendElements(extraErrorInfo); setRawCommandReply(err.done()); } else { diff --git a/src/mongo/rpc/reply_builder_interface.cpp b/src/mongo/rpc/reply_builder_interface.cpp index 8a4460d7247..99d580bb279 100644 --- a/src/mongo/rpc/reply_builder_interface.cpp +++ b/src/mongo/rpc/reply_builder_interface.cpp @@ -37,15 +37,14 @@ namespace mongo { namespace rpc { - namespace { + const char kOKField[] = "ok"; const char kCodeField[] = "code"; const char kCodeNameField[] = "codeName"; const char kErrorField[] = "errmsg"; -// similar to appendCommandStatusNoThrow (duplicating logic here to avoid cyclic library -// dependency) +// Similar to appendCommandStatusNoThrow (duplicating logic here to avoid cyclic library dependency) BSONObj augmentReplyWithStatus(const Status& status, BSONObj reply) { auto okField = reply.getField(kOKField); if (!okField.eoo() && okField.trueValue()) { @@ -75,7 +74,8 @@ BSONObj augmentReplyWithStatus(const Status& status, BSONObj reply) { return bob.obj(); } -} + +} // namespace ReplyBuilderInterface& ReplyBuilderInterface::setCommandReply(StatusWith<BSONObj> commandReply) { auto reply = commandReply.isOK() ? std::move(commandReply.getValue()) : BSONObj(); |