commit     e6617d62f0163595d2fe9257158fa47a3da205b2
tree       5f978c34e3b01738b4b76c1bed73e203d3a0c602
parent     3015a51ef202e2f70e37ebd6c44650cdbb052aab
author     Mathias Stearn <mathias@10gen.com>  2012-05-08 20:16:40 -0400
committer  Mathias Stearn <mathias@10gen.com>  2012-05-22 14:56:35 -0400
Support GridFS with fs.chunks sharded on {files_id:1, n:1} SERVER-3746
A try block in dbcommands.cpp will need to be modified when SERVER-5752
is fixed. This comment should serve as a reminder.
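
When fs.chunks is sharded on {files_id:1, n:1}, no single shard is guaranteed to hold all of a file's chunks, so mongos computes the file md5 by walking the shards: each shard hashes the contiguous run of chunks it owns and returns the intermediate md5 state, which mongos feeds into the next call. Below is a minimal mongo-shell sketch of that resume loop, not code from this commit: the field names partialOk, startAt, md5state, and numChunks are the real ones added here, but shardedFilemd5 is an illustrative name and a single db handle stands in for routing each call to the shard that owns chunk n.

    // Sketch of the mongos-side resume loop implemented in commands_public.cpp.
    // Routing each call to the shard owning {files_id, n: n} is elided.
    function shardedFilemd5(db, filesId) {
        var n = 0;              // next GridFS chunk number we expect to hash
        var lastResult = null;
        while (true) {
            var cmd = {filemd5: filesId, partialOk: true, startAt: n};
            if (lastResult)
                cmd.md5state = lastResult.md5state; // opaque partial md5_state_t
            var res = db.runCommand(cmd);
            assert(res.ok, tojson(res));
            if (res.numChunks == n)
                return res;     // no new chunks hashed: chunk n-1 was the last
            assert(res.numChunks > n);  // each round must make progress
            lastResult = res;
            n = res.numChunks;  // resume from the first chunk this shard lacked
        }
    }

The loop terminates when a round hashes nothing new (numChunks comes back equal to startAt), at which point the final digest is complete.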
-rw-r--r--  SConscript.smoke                 |  2
-rw-r--r--  jstests/sharding/gridfs.js       | 60
-rw-r--r--  src/mongo/db/dbcommands.cpp      | 49
-rw-r--r--  src/mongo/s/commands_public.cpp  | 90
4 files changed, 187 insertions(+), 14 deletions(-)
diff --git a/SConscript.smoke b/SConscript.smoke
index 06601b08426..0aec052c1da 100644
--- a/SConscript.smoke
+++ b/SConscript.smoke
@@ -77,7 +77,7 @@ if shellEnv is not None:
     addSmoketest( "smokeDisk", [ add_exe( "mongo" ), add_exe( "mongod" ), add_exe( "mongodump" ), add_exe( "mongorestore" ) ] )
     addSmoketest( "smokeAuth", [ add_exe( "mongo" ), add_exe( "mongod" ) ] )
     addSmoketest( "smokeParallel", [ add_exe( "mongo" ), add_exe( "mongod" ) ] )
-    addSmoketest( "smokeSharding", [ add_exe("mongo"), add_exe("mongod"), add_exe("mongos") ] )
+    addSmoketest( "smokeSharding", [ add_exe("mongo"), add_exe("mongod"), add_exe("mongos"), add_exe('mongofiles') ] )
     addSmoketest( "smokeJsPerf", [ add_exe("mongo"), add_exe("mongod") ] )
     addSmoketest( "smokeJsSlowNightly", [add_exe("mongo"), add_exe("mongod"), add_exe("mongos") ])
     addSmoketest( "smokeJsSlowWeekly", [add_exe("mongo"), add_exe("mongod"), add_exe("mongos") ])
diff --git a/jstests/sharding/gridfs.js b/jstests/sharding/gridfs.js
new file mode 100644
index 00000000000..47eb69c123a
--- /dev/null
+++ b/jstests/sharding/gridfs.js
@@ -0,0 +1,60 @@
+// tests gridfs with a sharded fs.chunks collection.
+
+var test = new ShardingTest({shards: 3, mongos: 1, config: 1, other: {chunksize:1, separateConfig:true}})
+
+var mongos = test.s0
+
+var d = mongos.getDB("test")
+
+var filename = "mongod" // A large file we are guaranteed to have
+
+function reset() {
+    d.fs.files.drop()
+    d.fs.chunks.drop()
+}
+
+function testGridFS() {
+    // this function should be called on a clean db
+    assert.eq(d.fs.files.count(), 0)
+    assert.eq(d.fs.chunks.count(), 0)
+
+    var rawmd5 = md5sumFile(filename)
+
+    // upload file (currently calls filemd5 internally)
+    runMongoProgram("mongofiles", "--port", mongos.port, "put", filename)
+
+    assert.eq(d.fs.files.count(), 1)
+    var fileObj = d.fs.files.findOne()
+    print("fileObj: " + tojson(fileObj))
+    assert.eq(rawmd5, fileObj.md5) //check that mongofiles inserted the correct md5
+
+    // Call filemd5 ourself and check results.
+    var res = d.runCommand({filemd5: fileObj._id})
+    print("filemd5 output: " + tojson(res))
+    assert(res.ok)
+    assert.eq(rawmd5, res.md5)
+
+    var numChunks = d.fs.chunks.count({files_id: fileObj._id})
+    assert.eq(numChunks, res.numChunks)
+}
+
+print('\n\n\t**** unsharded ****\n\n')
+testGridFS()
+reset()
+
+print('\n\n\t**** sharded db, unsharded collection ****\n\n')
+test.adminCommand({enablesharding: 'test'})
+testGridFS()
+reset()
+
+print('\n\n\t**** sharded collection on files_id ****\n\n')
+test.adminCommand({shardcollection: 'test.fs.chunks', key: {files_id:1}})
+testGridFS()
+reset()
+
+print('\n\n\t**** sharded collection on files_id,n ****\n\n')
+test.adminCommand({shardcollection: 'test.fs.chunks', key: {files_id:1, n:1}})
+testGridFS()
+reset()
+
+test.stop()
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index fc101ee0fcf..d3d515e3979 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -1107,11 +1107,34 @@ namespace mongo {
             }
 
             ns += ".chunks"; // make this an option in jsobj
 
+            // Check shard version at startup.
+            // This will throw before we've done any work if shard version is outdated
+            Client::Context ctx (ns);
+
             md5digest d;
             md5_state_t st;
             md5_init(&st);
 
-            BSONObj query = BSON( "files_id" << jsobj["filemd5"] );
+            int n = 0;
+
+            bool partialOk = jsobj["partialOk"].trueValue();
+            if (partialOk) {
+                // WARNING: This code depends on the binary layout of md5_state. It will not be
+                // compatible with different md5 libraries or work correctly in an environment with
+                // mongod's of different endians. It is ok for mongos to be a different endian since
+                // it just passes the buffer through to another mongod.
+                BSONElement stateElem = jsobj["md5state"];
+                if (!stateElem.eoo()){
+                    int len;
+                    const char* data = stateElem.binDataClean(len);
+                    massert(16247, "md5 state not correct size", len == sizeof(st));
+                    memcpy(&st, data, sizeof(st));
+                }
+                n = jsobj["startAt"].numberInt();
+            }
+
+
+            BSONObj query = BSON( "files_id" << jsobj["filemd5"] << "n" << GTE << n );
             BSONObj sort = BSON( "files_id" << 1 << "n" << 1 );
@@ -1122,7 +1145,6 @@ namespace mongo {
             }
             auto_ptr<ClientCursor> cc (new ClientCursor(QueryOption_NoCursorTimeout, cursor, ns.c_str()));
 
-            int n = 0;
             while ( cursor->ok() ) {
                 if ( ! cursor->matcher()->matchesCurrent( cursor.get() ) ) {
                     log() << "**** NOT MATCHING ****" << endl;
@@ -1138,6 +1160,9 @@ namespace mongo {
                 verify(ne.isNumber());
                 int myn = ne.numberInt();
                 if ( n != myn ) {
+                    if (partialOk) {
+                        break; // skipped chunk is probably on another shard
+                    }
                     log() << "should have chunk: " << n << " have:" << myn << endl;
                     dumpChunks( ns , query , sort );
                     uassert( 10040 , "chunks out of order" , n == myn );
@@ -1159,12 +1184,26 @@ namespace mongo {
                     throw;
                 }
 
-                if ( ! yield.stillOk() ) {
-                    cc.release();
-                    uasserted(13281, "File deleted during filemd5 command");
+                try { // SERVER-5752 may make this try unnecessary
+                    if ( ! yield.stillOk() ) { // relocks and checks shard version
+                        cc.release();
+                        if (!partialOk)
+                            uasserted(13281, "File deleted during filemd5 command");
+                    }
+                }
+                catch(SendStaleConfigException&){
+                    // return partial results.
+                    // Mongos will get the error at the start of the next call if it doesn't update first.
+                    log() << "Config changed during filemd5 - command will resume " << endl;
+                    break;
                 }
             }
 
+            if (partialOk)
+                result.appendBinData("md5state", sizeof(st), BinDataGeneral, &st);
+
+            // This must be *after* the capture of md5state since it mutates st
             md5_finish(&st, d);
 
             result.append( "numChunks" , n );
diff --git a/src/mongo/s/commands_public.cpp b/src/mongo/s/commands_public.cpp
index b88e2ff843c..2d009c81bc2 100644
--- a/src/mongo/s/commands_public.cpp
+++ b/src/mongo/s/commands_public.cpp
@@ -794,17 +794,91 @@ namespace mongo {
                 ChunkManagerPtr cm = conf->getChunkManager( fullns );
                 massert( 13091 , "how could chunk manager be null!" , cm );
 
-                uassert( 13092 , "GridFS chunks collection can only be sharded on files_id", cm->getShardKey().key() == BSON("files_id" << 1));
+                if(cm->getShardKey().key() == BSON("files_id" << 1)) {
+                    BSONObj finder = BSON("files_id" << cmdObj.firstElement());
 
-                ChunkPtr chunk = cm->findChunk( BSON("files_id" << cmdObj.firstElement()) );
+                    map<Shard, BSONObj> resMap;
+                    SHARDED->commandOp(dbName, cmdObj, 0, fullns, finder, resMap);
+                    verify(resMap.size() == 1); // querying on shard key so should only talk to one shard
+                    BSONObj res = resMap.begin()->second;
 
-                ShardConnection conn( chunk->getShard() , fullns );
-                BSONObj res;
-                bool ok = conn->runCommand( conf->getName() , cmdObj , res );
-                conn.done();
+                    result.appendElements(res);
+                    return res["ok"].trueValue();
+                }
+                else if (cm->getShardKey().key() == BSON("files_id" << 1 << "n" << 1)) {
+                    int n = 0;
+                    BSONObj lastResult;
+
+                    while (true) {
+                        // Theory of operation: Starting with n=0, send filemd5 command to shard
+                        // with that chunk (gridfs chunk not sharding chunk). That shard will then
+                        // compute a partial md5 state (passed in the "md5state" field) for all
+                        // contiguous chunks that it has. When it runs out or hits a discontinuity
+                        // (eg [1,2,7]) it returns what it has done so far. This is repeated as
+                        // long as we keep getting more chunks. The end condition is when we go to
+                        // look for chunk n and it doesn't exist. This means that the file's last
+                        // chunk is n-1, so we return the computed md5 results.
+                        BSONObjBuilder bb;
+                        bb.appendElements(cmdObj);
+                        bb.appendBool("partialOk", true);
+                        bb.append("startAt", n);
+                        if (!lastResult.isEmpty()){
+                            bb.append(lastResult["md5state"]);
+                        }
+                        BSONObj shardCmd = bb.obj();
 
-                result.appendElements(res);
-                return ok;
+                        BSONObj finder = BSON("files_id" << cmdObj.firstElement() << "n" << n);
+
+                        map<Shard, BSONObj> resMap;
+                        try {
+                            SHARDED->commandOp(dbName, shardCmd, 0, fullns, finder, resMap);
+                        }
+                        catch( DBException& e ){
+                            //This is handled below and logged
+                            resMap[Shard()] = BSON("errmsg" << e.what() << "ok" << 0);
+                        }
+
+                        verify(resMap.size() == 1); // querying on shard key so should only talk to one shard
+                        BSONObj res = resMap.begin()->second;
+                        bool ok = res["ok"].trueValue();
+
+                        if (!ok) {
+                            // Add extra info to make debugging easier
+                            result.append("failedAt", n);
+                            result.append("sentCommand", shardCmd);
+                            BSONForEach(e, res){
+                                if (!str::equals(e.fieldName(), "errmsg"))
+                                    result.append(e);
+                            }
+
+                            log() << "Sharded filemd5 failed: " << result.asTempObj() << endl;
+
+                            errmsg = string("sharded filemd5 failed because: ") + res["errmsg"].valuestrsafe();
+                            return false;
+                        }
+
+                        uassert(16246, "Shard " + conf->getName() + " is too old to support GridFS sharded by {files_id:1, n:1}",
+                                res.hasField("md5state"));
+
+                        lastResult = res;
+                        int nNext = res["numChunks"].numberInt();
+
+                        if (n == nNext){
+                            // no new data means we've reached the end of the file
+                            result.appendElements(res);
+                            return true;
+                        }
+
+                        verify(nNext > n);
+                        n = nNext;
+                    }
+
+                    verify(0);
+                }
+
+                // We could support arbitrary shard keys by sending commands to all shards but I don't think we should
+                errmsg = "GridFS fs.chunks collection must be sharded on either {files_id:1} or {files_id:1, n:1}";
+                return false;
             }
         } fileMD5Cmd;
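
The shard-side half of the protocol can be exercised directly against a single mongod, which is handy when debugging. A hedged example follows; the host, database, and presence of a GridFS file are assumptions, while the command fields are the ones dbcommands.cpp now accepts:

    // Ask one mongod to hash the contiguous chunks it holds, starting at chunk 0.
    var shard = new Mongo("localhost:30000");   // hypothetical shard address
    var sdb = shard.getDB("test");
    var fileObj = sdb.fs.files.findOne();       // assumes a GridFS file exists
    var res = sdb.runCommand({filemd5: fileObj._id, partialOk: true, startAt: 0});
    printjson(res);  // reply carries numChunks (the first n this mongod lacks),
                     // an opaque md5state BinData blob, and ok: 1

Because md5state is the raw bytes of an md5_state_t, the WARNING in dbcommands.cpp applies: every mongod in the cluster must use the same md5 library and endianness for the intermediate state to be portable. The new jstests/sharding/gridfs.js exercises all four layouts (unsharded, sharded db, {files_id:1}, and {files_id:1, n:1}), and the SConscript.smoke change adds mongofiles to smokeSharding's dependencies because the test shells out to mongofiles put.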