author     Mathias Stearn <mathias@10gen.com>  2012-05-08 20:16:40 -0400
committer  Mathias Stearn <mathias@10gen.com>  2012-05-22 14:56:35 -0400
commit     e6617d62f0163595d2fe9257158fa47a3da205b2
tree       5f978c34e3b01738b4b76c1bed73e203d3a0c602
parent     3015a51ef202e2f70e37ebd6c44650cdbb052aab
Support GridFS with fs.chunks sharded on {files_id:1, n:1} SERVER-3746
A try block in dbcommands.cpp will need to be modified when SERVER-5752 is fixed. This comment should serve as a reminder.
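
As a quick orientation (the jstest added below exercises the full flow), a minimal mongo-shell sketch of what this commit enables might look like the following; the database name and the use of mongofiles are illustrative, not part of the change:

    // Hedged sketch: assumes a running mongos and a database named "test".
    db.adminCommand({enablesharding: "test"})
    db.adminCommand({shardcollection: "test.fs.chunks", key: {files_id: 1, n: 1}})

    // After uploading a file (e.g. with mongofiles put, or any driver's GridFS API),
    // filemd5 now works even though fs.chunks is sharded on {files_id: 1, n: 1}:
    var fileObj = db.fs.files.findOne()
    db.runCommand({filemd5: fileObj._id, root: "fs"})   // -> {numChunks: ..., md5: "...", ok: 1}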
-rw-r--r--  SConscript.smoke                  2
-rw-r--r--  jstests/sharding/gridfs.js       60
-rw-r--r--  src/mongo/db/dbcommands.cpp      49
-rw-r--r--  src/mongo/s/commands_public.cpp  90
4 files changed, 187 insertions(+), 14 deletions(-)
diff --git a/SConscript.smoke b/SConscript.smoke
index 06601b08426..0aec052c1da 100644
--- a/SConscript.smoke
+++ b/SConscript.smoke
@@ -77,7 +77,7 @@ if shellEnv is not None:
addSmoketest( "smokeDisk", [ add_exe( "mongo" ), add_exe( "mongod" ), add_exe( "mongodump" ), add_exe( "mongorestore" ) ] )
addSmoketest( "smokeAuth", [ add_exe( "mongo" ), add_exe( "mongod" ) ] )
addSmoketest( "smokeParallel", [ add_exe( "mongo" ), add_exe( "mongod" ) ] )
- addSmoketest( "smokeSharding", [ add_exe("mongo"), add_exe("mongod"), add_exe("mongos") ] )
+ addSmoketest( "smokeSharding", [ add_exe("mongo"), add_exe("mongod"), add_exe("mongos"), add_exe('mongofiles') ] )
addSmoketest( "smokeJsPerf", [ add_exe("mongo"), add_exe("mongod") ] )
addSmoketest( "smokeJsSlowNightly", [add_exe("mongo"), add_exe("mongod"), add_exe("mongos") ])
addSmoketest( "smokeJsSlowWeekly", [add_exe("mongo"), add_exe("mongod"), add_exe("mongos") ])
diff --git a/jstests/sharding/gridfs.js b/jstests/sharding/gridfs.js
new file mode 100644
index 00000000000..47eb69c123a
--- /dev/null
+++ b/jstests/sharding/gridfs.js
@@ -0,0 +1,60 @@
+// tests gridfs with a sharded fs.chunks collection.
+
+var test = new ShardingTest({shards: 3, mongos: 1, config: 1, other: {chunksize:1, separateConfig:true}})
+
+var mongos = test.s0
+
+var d = mongos.getDB("test")
+
+var filename = "mongod" // A large file we are guaranteed to have
+
+function reset() {
+ d.fs.files.drop()
+ d.fs.chunks.drop()
+}
+
+function testGridFS() {
+ // this function should be called on a clean db
+ assert.eq(d.fs.files.count(), 0)
+ assert.eq(d.fs.chunks.count(), 0)
+
+ var rawmd5 = md5sumFile(filename)
+
+ // upload file (currently calls filemd5 internally)
+ runMongoProgram("mongofiles", "--port", mongos.port, "put", filename)
+
+ assert.eq(d.fs.files.count(), 1)
+ var fileObj = d.fs.files.findOne()
+ print("fileObj: " + tojson(fileObj))
+ assert.eq(rawmd5, fileObj.md5) //check that mongofiles inserted the correct md5
+
+ // Call filemd5 ourself and check results.
+ var res = d.runCommand({filemd5: fileObj._id})
+ print("filemd5 output: " + tojson(res))
+ assert(res.ok)
+ assert.eq(rawmd5, res.md5)
+
+ var numChunks = d.fs.chunks.count({files_id: fileObj._id})
+ assert.eq(numChunks, res.numChunks)
+}
+
+print('\n\n\t**** unsharded ****\n\n')
+testGridFS()
+reset()
+
+print('\n\n\t**** sharded db, unsharded collection ****\n\n')
+test.adminCommand({enablesharding: 'test'})
+testGridFS()
+reset()
+
+print('\n\n\t**** sharded collection on files_id ****\n\n')
+test.adminCommand({shardcollection: 'test.fs.chunks', key: {files_id:1}})
+testGridFS()
+reset()
+
+print('\n\n\t**** sharded collection on files_id,n ****\n\n')
+test.adminCommand({shardcollection: 'test.fs.chunks', key: {files_id:1, n:1}})
+testGridFS()
+reset()
+
+test.stop()
diff --git a/src/mongo/db/dbcommands.cpp b/src/mongo/db/dbcommands.cpp
index fc101ee0fcf..d3d515e3979 100644
--- a/src/mongo/db/dbcommands.cpp
+++ b/src/mongo/db/dbcommands.cpp
@@ -1107,11 +1107,34 @@ namespace mongo {
}
ns += ".chunks"; // make this an option in jsobj
+ // Check shard version at startup.
+ // This will throw before we've done any work if shard version is outdated
+ Client::Context ctx (ns);
+
md5digest d;
md5_state_t st;
md5_init(&st);
- BSONObj query = BSON( "files_id" << jsobj["filemd5"] );
+ int n = 0;
+
+ bool partialOk = jsobj["partialOk"].trueValue();
+ if (partialOk) {
+ // WARNING: This code depends on the binary layout of md5_state. It will not be
+ // compatible with different md5 libraries or work correctly in an environment with
+ // mongod's of different endians. It is ok for mongos to be a different endian since
+ // it just passes the buffer through to another mongod.
+ BSONElement stateElem = jsobj["md5state"];
+ if (!stateElem.eoo()){
+ int len;
+ const char* data = stateElem.binDataClean(len);
+ massert(16247, "md5 state not correct size", len == sizeof(st));
+ memcpy(&st, data, sizeof(st));
+ }
+ n = jsobj["startAt"].numberInt();
+ }
+
+
+ BSONObj query = BSON( "files_id" << jsobj["filemd5"] << "n" << GTE << n );
BSONObj sort = BSON( "files_id" << 1 << "n" << 1 );
shared_ptr<Cursor> cursor = NamespaceDetailsTransient::bestGuessCursor(ns.c_str(),
@@ -1122,7 +1145,6 @@ namespace mongo {
}
auto_ptr<ClientCursor> cc (new ClientCursor(QueryOption_NoCursorTimeout, cursor, ns.c_str()));
- int n = 0;
while ( cursor->ok() ) {
if ( ! cursor->matcher()->matchesCurrent( cursor.get() ) ) {
log() << "**** NOT MATCHING ****" << endl;
@@ -1138,6 +1160,9 @@ namespace mongo {
verify(ne.isNumber());
int myn = ne.numberInt();
if ( n != myn ) {
+ if (partialOk) {
+ break; // skipped chunk is probably on another shard
+ }
log() << "should have chunk: " << n << " have:" << myn << endl;
dumpChunks( ns , query , sort );
uassert( 10040 , "chunks out of order" , n == myn );
@@ -1159,12 +1184,26 @@ namespace mongo {
throw;
}
- if ( ! yield.stillOk() ) {
- cc.release();
- uasserted(13281, "File deleted during filemd5 command");
+ try { // SERVER-5752 may make this try unnecessary
+ if ( ! yield.stillOk() ) { // relocks and checks shard version
+ cc.release();
+ if (!partialOk)
+ uasserted(13281, "File deleted during filemd5 command");
+ }
+ }
+ catch(SendStaleConfigException&){
+ // return partial results.
+ // Mongos will get the error at the start of the next call if it doesn't update first.
+ log() << "Config changed during filemd5 - command will resume " << endl;
+ break;
}
}
+
+ if (partialOk)
+ result.appendBinData("md5state", sizeof(st), BinDataGeneral, &st);
+
+ // This must be *after* the capture of md5state since it mutates st
md5_finish(&st, d);
result.append( "numChunks" , n );
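
A note on the partialOk protocol introduced in the dbcommands.cpp hunk above: with partialOk the shard hashes the contiguous run of chunks starting at startAt and returns the raw md5 state so the caller can resume the digest elsewhere. A hedged sketch of the round trip, runnable directly against a single mongod (the field names are the ones added above; everything else is illustrative):

    // First call: hash the contiguous chunks starting at n = 0.
    var fileObj = db.fs.files.findOne()
    var r1 = db.runCommand({filemd5: fileObj._id, partialOk: true, startAt: 0})
    // r1 holds numChunks (the next chunk number the node expected but did not find)
    // and md5state (an opaque BinData blob carrying the partial md5 state).

    // Follow-up call: pass the state back so hashing resumes at that chunk number.
    var r2 = db.runCommand({filemd5: fileObj._id, partialOk: true,
                            startAt: r1.numChunks, md5state: r1.md5state})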
diff --git a/src/mongo/s/commands_public.cpp b/src/mongo/s/commands_public.cpp
index b88e2ff843c..2d009c81bc2 100644
--- a/src/mongo/s/commands_public.cpp
+++ b/src/mongo/s/commands_public.cpp
@@ -794,17 +794,91 @@ namespace mongo {
ChunkManagerPtr cm = conf->getChunkManager( fullns );
massert( 13091 , "how could chunk manager be null!" , cm );
- uassert( 13092 , "GridFS chunks collection can only be sharded on files_id", cm->getShardKey().key() == BSON("files_id" << 1));
+ if(cm->getShardKey().key() == BSON("files_id" << 1)) {
+ BSONObj finder = BSON("files_id" << cmdObj.firstElement());
- ChunkPtr chunk = cm->findChunk( BSON("files_id" << cmdObj.firstElement()) );
+ map<Shard, BSONObj> resMap;
+ SHARDED->commandOp(dbName, cmdObj, 0, fullns, finder, resMap);
+ verify(resMap.size() == 1); // querying on shard key so should only talk to one shard
+ BSONObj res = resMap.begin()->second;
- ShardConnection conn( chunk->getShard() , fullns );
- BSONObj res;
- bool ok = conn->runCommand( conf->getName() , cmdObj , res );
- conn.done();
+ result.appendElements(res);
+ return res["ok"].trueValue();
+ }
+ else if (cm->getShardKey().key() == BSON("files_id" << 1 << "n" << 1)) {
+ int n = 0;
+ BSONObj lastResult;
+
+ while (true) {
+ // Theory of operation: Starting with n=0, send filemd5 command to shard
+ // with that chunk (gridfs chunk not sharding chunk). That shard will then
+ // compute a partial md5 state (passed in the "md5state" field) for all
+ // contiguous chunks that it has. When it runs out or hits a discontinuity
+ // (eg [1,2,7]) it returns what it has done so far. This is repeated as
+ // long as we keep getting more chunks. The end condition is when we go to
+ // look for chunk n and it doesn't exist. This means that the file's last
+ // chunk is n-1, so we return the computed md5 results.
+ BSONObjBuilder bb;
+ bb.appendElements(cmdObj);
+ bb.appendBool("partialOk", true);
+ bb.append("startAt", n);
+ if (!lastResult.isEmpty()){
+ bb.append(lastResult["md5state"]);
+ }
+ BSONObj shardCmd = bb.obj();
- result.appendElements(res);
- return ok;
+ BSONObj finder = BSON("files_id" << cmdObj.firstElement() << "n" << n);
+
+ map<Shard, BSONObj> resMap;
+ try {
+ SHARDED->commandOp(dbName, shardCmd, 0, fullns, finder, resMap);
+ }
+ catch( DBException& e ){
+ //This is handled below and logged
+ resMap[Shard()] = BSON("errmsg" << e.what() << "ok" << 0);
+ }
+
+ verify(resMap.size() == 1); // querying on shard key so should only talk to one shard
+ BSONObj res = resMap.begin()->second;
+ bool ok = res["ok"].trueValue();
+
+ if (!ok) {
+ // Add extra info to make debugging easier
+ result.append("failedAt", n);
+ result.append("sentCommand", shardCmd);
+ BSONForEach(e, res){
+ if (!str::equals(e.fieldName(), "errmsg"))
+ result.append(e);
+ }
+
+ log() << "Sharded filemd5 failed: " << result.asTempObj() << endl;
+
+ errmsg = string("sharded filemd5 failed because: ") + res["errmsg"].valuestrsafe();
+ return false;
+ }
+
+ uassert(16246, "Shard " + conf->getName() + " is too old to support GridFS sharded by {files_id:1, n:1}",
+ res.hasField("md5state"));
+
+ lastResult = res;
+ int nNext = res["numChunks"].numberInt();
+
+ if (n == nNext){
+ // no new data means we've reached the end of the file
+ result.appendElements(res);
+ return true;
+ }
+
+ verify(nNext > n);
+ n = nNext;
+ }
+
+ verify(0);
+ }
+
+ // We could support arbitrary shard keys by sending commands to all shards but I don't think we should
+ errmsg = "GridFS fs.chunks collection must be sharded on either {files_id:1} or {files_id:1, n:1}";
+ return false;
}
} fileMD5Cmd;
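
For readers following the "Theory of operation" comment in the hunk above, the mongos-side loop can be restated in shell JavaScript. This is a hedged sketch, not part of the commit (chainedFileMD5 is a hypothetical helper); pointed at a single mongod that holds every chunk, it yields the same digest mongos assembles across shards:

    // Keep requesting the next contiguous run of chunks, forwarding the opaque
    // md5state, until a round reports no new chunks.
    function chainedFileMD5(db, filesId) {
        var n = 0;
        var last = null;
        while (true) {
            var cmd = {filemd5: filesId, partialOk: true, startAt: n};
            if (last && last.md5state)
                cmd.md5state = last.md5state;   // resume from the previous partial state
            var res = db.runCommand(cmd);
            assert(res.ok, "filemd5 failed: " + tojson(res));
            if (res.numChunks == n)
                return res;                     // no new chunks: the file ends at chunk n-1
            last = res;
            n = res.numChunks;                  // next chunk number to start from
        }
    }

    // Illustrative use: chainedFileMD5(db, db.fs.files.findOne()._id).md5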