diff options
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | buildscripts/emr/IOUtil.java | 124 | ||||
-rw-r--r-- | buildscripts/emr/MANIFEST.MF | 2 | ||||
-rw-r--r-- | buildscripts/emr/emr.java | 310 | ||||
-rw-r--r-- | buildscripts/emr/emr.py | 225 | ||||
-rw-r--r-- | buildscripts/emr/emrnodesetup.sh | 7 |
6 files changed, 671 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore index adbbb54170d..2fe15080f4f 100644 --- a/.gitignore +++ b/.gitignore @@ -95,6 +95,9 @@ scratch /libmongotestfiles.* /libmongoshellfiles.* + +/emr.jar + # examples /firstExample /secondExample diff --git a/buildscripts/emr/IOUtil.java b/buildscripts/emr/IOUtil.java new file mode 100644 index 00000000000..35cbecbdda2 --- /dev/null +++ b/buildscripts/emr/IOUtil.java @@ -0,0 +1,124 @@ +// IOUtil.java + +import java.io.*; +import java.net.*; + +public class IOUtil { + + public static String urlFileName( String url ) { + int idx = url.lastIndexOf( "/" ); + if ( idx < 0 ) + return url; + return url.substring( idx + 1 ); + } + + public static long pipe( InputStream in , OutputStream out ) + throws IOException { + + long bytes = 0; + + byte[] buf = new byte[2048]; + + while ( true ) { + int x = in.read( buf ); + if ( x < 0 ) + break; + + bytes += x; + out.write( buf , 0 , x ); + } + + return bytes; + } + + public static class PipingThread extends Thread { + public PipingThread( InputStream in , OutputStream out ) { + _in = in; + _out = out; + + _wrote = 0; + } + + public void run() { + try { + _wrote = pipe( _in , _out ); + } + catch ( IOException ioe ) { + ioe.printStackTrace(); + _wrote = -1; + } + } + + public long wrote() { + return _wrote; + } + + long _wrote; + + final InputStream _in; + final OutputStream _out; + } + + public static String readStringFully( InputStream in ) + throws IOException { + + ByteArrayOutputStream bout = new ByteArrayOutputStream(); + pipe( in , bout ); + return new String( bout.toByteArray() , "UTF8" ); + + } + + public static String[] runCommand( String cmd , File dir ) + throws IOException { + + Process p = Runtime.getRuntime().exec( cmd.split( " +" ) , new String[]{} , dir ); + String[] results = new String[]{ IOUtil.readStringFully( p.getInputStream() ) , IOUtil.readStringFully( p.getErrorStream() ) }; + try { + if ( p.waitFor() != 0 ) + throw new RuntimeException( "command failed [" + cmd + "]\n" + results[0] + "\n" + results[1] ); + } + catch ( InterruptedException ie ) { + throw new RuntimeException( "uh oh" ); + } + return results; + } + + + public static void download( String http , File localDir ) + throws IOException { + + File f = localDir; + f.mkdirs(); + + f = new File( f.toString() + File.separator + urlFileName( http ) ); + + System.out.println( "downloading\n\t" + http + "\n\t" + f ); + + if ( f.exists() ) { + System.out.println( "\t already exists" ); + return; + } + + URL url = new URL( http ); + + InputStream in = url.openConnection().getInputStream(); + OutputStream out = new FileOutputStream( f ); + + pipe( in , out ); + + out.close(); + in.close(); + + } + + public static void main( String[] args ) + throws Exception { + + + byte[] data = new byte[]{ 'e' , 'r' , 'h' , 0 }; + System.out.write( data ); + System.out.println( "yo" ); + + } + +} diff --git a/buildscripts/emr/MANIFEST.MF b/buildscripts/emr/MANIFEST.MF new file mode 100644 index 00000000000..4a5b3f96691 --- /dev/null +++ b/buildscripts/emr/MANIFEST.MF @@ -0,0 +1,2 @@ +Manifest-Version: 1.0 +Main-Class: emr diff --git a/buildscripts/emr/emr.java b/buildscripts/emr/emr.java new file mode 100644 index 00000000000..9540d99480c --- /dev/null +++ b/buildscripts/emr/emr.java @@ -0,0 +1,310 @@ +// emr.java + +import java.io.*; +import java.util.*; +import java.net.*; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.io.*; +import org.apache.hadoop.mapred.*; +import org.apache.hadoop.fs.*; + + +public class emr { + + static class MongoSuite { + String mongo; + String code; + String workingDir; + + String suite; + + void copy( MongoSuite c ) { + mongo = c.mongo; + code = c.code; + workingDir = c.workingDir; + suite = c.suite; + } + + void downloadTo( File localDir ) + throws IOException { + IOUtil.download( mongo , localDir ); + IOUtil.download( code , localDir ); + } + + boolean runTest() + throws IOException { + + // mkdir + File dir = new File( workingDir , suite ); + dir.mkdirs(); + + // download + downloadTo( dir ); + + + // explode + IOUtil.runCommand( "tar zxvf " + IOUtil.urlFileName( code ) , dir ); + String[] res = IOUtil.runCommand( "tar zxvf " + IOUtil.urlFileName( mongo ) , dir ); + for ( String x : res[0].split( "\n" ) ) { + if ( x.indexOf( "/bin/" ) < 0 ) + continue; + File f = new File( dir.toString() , x ); + if ( ! f.renameTo( new File( dir , IOUtil.urlFileName( x ) ) ) ) + throw new RuntimeException( "rename failed" ); + } + + + Process p = Runtime.getRuntime().exec( new String[]{ "/usr/bin/python" , "buildscripts/smoke.py" , suite } , new String[]{} , dir ); + List<Thread> threads = new ArrayList<Thread>(); + threads.add( new IOUtil.PipingThread( p.getInputStream() , System.out ) ); + threads.add( new IOUtil.PipingThread( p.getErrorStream() , System.out ) ); + + for ( Thread t : threads ) t.start(); + try { + for ( Thread t : threads ) t.join(); + int rc = p.waitFor(); + System.out.println( "\n\nResult: " + rc ); + + return rc == 0; + } + catch ( InterruptedException ie ) { + throw new RuntimeException( "sad" , ie ); + } + + } + + public void readFields( DataInput in ) + throws IOException { + mongo = in.readUTF(); + code = in.readUTF(); + workingDir = in.readUTF(); + suite = in.readUTF(); + } + + public void write( final DataOutput out ) + throws IOException { + out.writeUTF( mongo ); + out.writeUTF( code ); + out.writeUTF( workingDir ); + out.writeUTF( suite ); + } + + public String toString() { + return "mongo: " + mongo + " code: " + code + " suite: " + suite + " workingDir: " + workingDir; + } + } + + public static class Map implements Mapper<Text, MongoSuite, Text, IntWritable> { + + public void map( Text key, MongoSuite value, OutputCollector<Text,IntWritable> output, Reporter reporter ) + throws IOException { + + System.out.println( "key: " + key ); + System.out.println( "value: " + value ); + + long start = System.currentTimeMillis(); + boolean passed = value.runTest(); + long end = System.currentTimeMillis(); + + output.collect( new Text( passed ? "passed" : "failed" ) , new IntWritable( 1 ) ); + output.collect( new Text( key.toString() + "-time-seconds" ) , new IntWritable( (int)((end-start)/(1000)) ) ); + output.collect( new Text( key.toString() + "-passed" ) , new IntWritable( passed ? 1 : 0 ) ); + + String ip = IOUtil.readStringFully( new URL( "http://myip.10gen.com/" ).openConnection().getInputStream() ); + ip = ip.substring( ip.indexOf( ":" ) + 1 ).trim(); + output.collect( new Text( ip ) , new IntWritable(1) ); + + } + + public void configure(JobConf job) {} + public void close(){} + } + + public static class Reduce implements Reducer<Text, IntWritable, Text, IntWritable> { + + public void reduce( Text key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output , Reporter reporter ) + throws IOException { + + int sum = 0; + while ( values.hasNext() ) { + sum += values.next().get(); + } + output.collect( key , new IntWritable( sum ) ); + } + + public void configure(JobConf job) {} + public void close(){} + } + + public static class MySplit implements InputSplit , Writable { + + public MySplit(){ + } + + MySplit( MongoSuite config , int length ) { + _config = config; + _length = length; + } + + public long getLength() { + return _length; + } + + public String[] getLocations() { + return new String[0]; + } + + public void readFields( DataInput in ) + throws IOException { + _config = new MongoSuite(); + _config.readFields( in ); + _length = in.readInt(); + } + + public void write( final DataOutput out ) + throws IOException { + _config.write( out ); + out.writeInt( _length ); + } + + MongoSuite _config; + int _length; + } + + public static class InputMagic implements InputFormat<Text,MongoSuite> { + + public RecordReader<Text,MongoSuite> getRecordReader( InputSplit split, JobConf job , Reporter reporter ){ + final MySplit s = (MySplit)split; + return new RecordReader<Text,MongoSuite>() { + + public void close(){} + + public Text createKey() { + return new Text(); + } + + public MongoSuite createValue() { + return new MongoSuite(); + } + + public long getPos() { + return _seen ? 1 : 0; + } + + public float getProgress() { + return getPos(); + } + + public boolean next( Text key , MongoSuite value ) { + key.set( s._config.suite ); + value.copy( s._config ); + + + boolean x = _seen; + _seen = true; + return !x; + } + + boolean _seen = false; + }; + } + + public InputSplit[] getSplits( JobConf job , int numSplits ){ + String[] pcs = job.get( "suites" ).split(","); + InputSplit[] splits = new InputSplit[pcs.length]; + for ( int i=0; i<splits.length; i++ ) { + MongoSuite c = new MongoSuite(); + c.suite = pcs[i]; + + c.mongo = job.get( "mongo" ); + c.code = job.get( "code" ); + c.workingDir = job.get( "workingDir" ); + + splits[i] = new MySplit( c , 100 /* XXX */); + } + return splits; + } + + public void validateInput(JobConf job){} + + + } + + /** + * args + * mongo tgz + * code tgz + * output path + * tests to run ? + */ + + public static void main( String[] args ) throws Exception{ + + JobConf conf = new JobConf(); + conf.setJarByClass(emr.class); + + String workingDir = "/data/db/emr/"; + + // parse args + + int pos = 0; + for ( ; pos < args.length; pos++ ) { + if ( ! args[pos].startsWith( "--" ) ) + break; + + String arg = args[pos].substring(2); + if ( arg.equals( "workingDir" ) ) { + workingDir = args[++pos]; + } + else { + System.err.println( "unknown arg: " + arg ); + throw new RuntimeException( "unknown arg: " + arg ); + } + } + + String mongo = args[pos++]; + String code = args[pos++]; + String output = args[pos++]; + + String suites = ""; + for ( ; pos < args.length; pos++ ) { + if ( suites.length() > 0 ) + suites += ","; + suites += args[pos]; + } + + if ( suites.length() == 0 ) + throw new RuntimeException( "no suites" ); + + System.out.println( "workingDir:\t" + workingDir ); + System.out.println( "mongo:\t" + mongo ); + System.out.println( "code:\t " + code ); + System.out.println( "output\t: " + output ); + System.out.println( "suites\t: " + suites ); + + // main hadoop set + conf.set( "mongo" , mongo ); + conf.set( "code" , code ); + conf.set( "workingDir" , workingDir ); + conf.set( "suites" , suites ); + + conf.set( "mapred.map.tasks" , "1" ); + conf.setLong( "mapred.task.timeout" , 3600 * 1000 /* 1 hour */); + + conf.setOutputKeyClass(Text.class); + conf.setOutputValueClass(IntWritable.class); + + conf.setMapperClass(Map.class); + conf.setReducerClass(Reduce.class); + + conf.setInputFormat(InputMagic.class); + conf.setOutputFormat(TextOutputFormat.class); + + FileOutputFormat.setOutputPath(conf, new Path(output) ); + + // actually run + + JobClient.runJob( conf ); + } +} diff --git a/buildscripts/emr/emr.py b/buildscripts/emr/emr.py new file mode 100644 index 00000000000..727b4519992 --- /dev/null +++ b/buildscripts/emr/emr.py @@ -0,0 +1,225 @@ + +import os +import sys +import shutil +import datetime +import time +import subprocess +import urllib +import urllib2 +import json +import pprint + +import boto +import simples3 + +def findSettingsSetup(): + sys.path.append( "./" ) + sys.path.append( "../" ) + sys.path.append( "../../" ) + sys.path.append( "../../../" ) + +findSettingsSetup() +import settings +import buildscripts.utils as utils +import buildscripts.smoke as smoke + +bucket = simples3.S3Bucket( settings.emr_bucket , settings.emr_id , settings.emr_key ) + +def _get_status(): + + def gh( cmds ): + txt = "" + for cmd in cmds: + res = utils.execsys( "git " + cmd ) + txt = txt + res[0] + res[1] + return utils.md5string( txt ) + + return "%s-%s" % ( utils.execsys( "git describe" )[0].strip(), gh( [ "diff" , "status" ] ) ) + +def _get_most_recent_tgz( prefix ): + # this is icky, but works for now + all = [] + for x in os.listdir( "." ): + if not x.startswith( prefix ) or not x.endswith( ".tgz" ): + continue + all.append( ( x , os.stat(x).st_mtime ) ) + + if len(all) == 0: + raise Exception( "can't find file with prefix: " + prefix ) + + all.sort( lambda x,y: int(y[1] - x[1]) ) + + return all[0][0] + +def make_tarball(): + + m = _get_most_recent_tgz( "mongodb-" ) + c = "test-code-emr.tgz" + utils.execsys( "tar zcf %s src jstests buildscripts" % c ) + return ( m , c ) + +def _put_ine( bucket , local , remote ): + print( "going to put\n\t%s\n\thttp://%s.s3.amazonaws.com/%s" % ( local , settings.emr_bucket , remote ) ) + + for x in bucket.listdir( prefix=remote ): + print( "\talready existed" ) + return remote + + bucket.put( remote , open( local , "rb" ).read() , acl="public-read" ) + return remote + +def build_jar(): + root = "build/emrjar" + src = "buildscripts/emr" + + if os.path.exists( root ): + shutil.rmtree( root ) + os.makedirs( root ) + + for x in os.listdir( src ): + if not x.endswith( ".java" ): + continue + shutil.copyfile( src + "/" + x , root + "/" + x ) + shutil.copyfile( src + "/MANIFEST.MF" , root + "/MANIFEST.FM" ) + + for x in os.listdir( root ): + if x.endswith( ".java" ): + if subprocess.call( [ "javac" , x ] , cwd=root) != 0: + raise Exception( "compiled failed" ) + + args = [ "jar" , "-cfm" , "emr.jar" , "MANIFEST.FM" ] + for x in os.listdir( root ): + if x.endswith( ".class" ): + args.append( x ) + subprocess.call( args , cwd=root ) + + shutil.copyfile( root + "/emr.jar" , "emr.jar" ) + + return "emr.jar" + +def push(): + mongo , test_code = make_tarball() + print( mongo ) + print( test_code ) + + root = "emr/%s/%s" % ( datetime.date.today().strftime("%Y-%m-%d") , os.uname()[0].lower() ) + + def make_long_name(local): + pcs = local.rpartition( "." ) + return "%s/%s-%s.%s" % ( root , pcs[0] , _get_status() , pcs[2] ) + + mongo = _put_ine( bucket , mongo , make_long_name( mongo ) ) + test_code = _put_ine( bucket , test_code , make_long_name( test_code ) ) + + jar = build_jar() + jar = _put_ine( bucket , jar , make_long_name( jar ) ) + + setup = "buildscripts/emr/emrnodesetup.sh" + setup = _put_ine( bucket , setup , make_long_name( setup ) ) + + return mongo , test_code , jar , setup + +def run_tests( things , tests ): + if len(tests) == 0: + raise Exception( "no tests" ) + oldNum = len(tests) + tests = fix_suites( tests ) + print( "tests expanded from %d to %d" % ( oldNum , len(tests) ) ) + + print( "things:%s\ntests:%s\n" % ( things , tests ) ) + + emr = boto.connect_emr( settings.emr_id , settings.emr_key ) + + def http(path): + return "http://%s.s3.amazonaws.com/%s" % ( settings.emr_bucket , path ) + + run_s3_path = "emr/%s/%s/%s/" % ( os.getenv( "USER" ) , + os.getenv( "HOST" ) , + datetime.datetime.today().strftime( "%Y%m%d-%H%M" ) ) + + run_s3_root = "s3n://%s/%s/" % ( settings.emr_bucket , run_s3_path ) + + out = run_s3_root + "out" + logs = run_s3_root + "logs" + + jar="s3n://%s/%s" % ( settings.emr_bucket , things[2] ) + step_args=[ http(things[0]) , http(things[1]) , out , ",".join(tests) ] + + step = boto.emr.step.JarStep( "emr main" , jar=jar,step_args=step_args ) + print( "jar:%s\nargs:%s" % ( jar , step_args ) ) + + setup = boto.emr.BootstrapAction( "setup" , "s3n://%s/%s" % ( settings.emr_bucket , things[3] ) , [] ) + + jobid = emr.run_jobflow( name = "Mongo EMR for %s from %s" % ( os.getenv( "USER" ) , os.getenv( "HOST" ) ) , + ec2_keyname = "emr1" , + slave_instance_type = "m1.large" , + ami_version = "latest" , + num_instances=5 , + log_uri = logs , + bootstrap_actions = [ setup ] , + steps = [ step ] ) + + + print( "%s jobid: %s" % ( datetime.datetime.today() , jobid ) ) + + while ( True ): + flow = emr.describe_jobflow( jobid ) + print( "%s status: %s" % ( datetime.datetime.today() , flow.state ) ) + if flow.state == "COMPLETED" or flow.state == "FAILED": + break + time.sleep(30) + + syncdir = "build/emrout/" + jobid + "/" + sync_s3( run_s3_path , syncdir ) + print("output in: build/emrout/" + jobid + "/" ) + +def sync_s3( remote_dir , local_dir ): + for x in bucket.listdir( remote_dir ): + out = local_dir + "/" + x[0] + + if os.path.exists( out ) and x[2].find( utils.md5sum( out ) ) >= 0: + continue + + dir = out.rpartition( "/" )[0] + if not os.path.exists( dir ): + os.makedirs( dir ) + + thing = bucket.get( x[0] ) + open( out , "wb" ).write( thing.read() ) + +def fix_suites( suites ): + fixed = [] + for name,x in smoke.expand_suites( suites , False ): + idx = name.find( "/jstests" ) + if idx >= 0: + name = name[idx+1:] + fixed.append( name ) + return fixed + +if __name__ == "__main__": + if len(sys.argv) == 1: + print( "need an arg" ) + + elif sys.argv[1] == "tarball": + make_tarball() + elif sys.argv[1] == "jar": + build_jar() + elif sys.argv[1] == "push": + print( push() ) + + elif sys.argv[1] == "sync": + sync_s3( sys.argv[2] , sys.argv[3] ) + + elif sys.argv[1] == "fix_suites": + for x in fix_suites( sys.argv[2:] ): + print(x) + + elif sys.argv[1] == "full": + things = push() + run_tests( things , sys.argv[2:] ) + + else: + things = push() + run_tests( things , sys.argv[1:] ) + diff --git a/buildscripts/emr/emrnodesetup.sh b/buildscripts/emr/emrnodesetup.sh new file mode 100644 index 00000000000..546becf3e27 --- /dev/null +++ b/buildscripts/emr/emrnodesetup.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +sudo mkdir /mnt/data +sudo ln -s /mnt/data /data +sudo chown hadoop /mnt/data + +sudo easy_install pymongo |