summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.gitignore3
-rw-r--r--buildscripts/emr/IOUtil.java124
-rw-r--r--buildscripts/emr/MANIFEST.MF2
-rw-r--r--buildscripts/emr/emr.java310
-rw-r--r--buildscripts/emr/emr.py225
-rw-r--r--buildscripts/emr/emrnodesetup.sh7
6 files changed, 671 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
index adbbb54170d..2fe15080f4f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -95,6 +95,9 @@ scratch
/libmongotestfiles.*
/libmongoshellfiles.*
+
+/emr.jar
+
# examples
/firstExample
/secondExample
diff --git a/buildscripts/emr/IOUtil.java b/buildscripts/emr/IOUtil.java
new file mode 100644
index 00000000000..35cbecbdda2
--- /dev/null
+++ b/buildscripts/emr/IOUtil.java
@@ -0,0 +1,124 @@
+// IOUtil.java
+
+import java.io.*;
+import java.net.*;
+
+public class IOUtil {
+
+ public static String urlFileName( String url ) {
+ int idx = url.lastIndexOf( "/" );
+ if ( idx < 0 )
+ return url;
+ return url.substring( idx + 1 );
+ }
+
+ public static long pipe( InputStream in , OutputStream out )
+ throws IOException {
+
+ long bytes = 0;
+
+ byte[] buf = new byte[2048];
+
+ while ( true ) {
+ int x = in.read( buf );
+ if ( x < 0 )
+ break;
+
+ bytes += x;
+ out.write( buf , 0 , x );
+ }
+
+ return bytes;
+ }
+
+ public static class PipingThread extends Thread {
+ public PipingThread( InputStream in , OutputStream out ) {
+ _in = in;
+ _out = out;
+
+ _wrote = 0;
+ }
+
+ public void run() {
+ try {
+ _wrote = pipe( _in , _out );
+ }
+ catch ( IOException ioe ) {
+ ioe.printStackTrace();
+ _wrote = -1;
+ }
+ }
+
+ public long wrote() {
+ return _wrote;
+ }
+
+ long _wrote;
+
+ final InputStream _in;
+ final OutputStream _out;
+ }
+
+ public static String readStringFully( InputStream in )
+ throws IOException {
+
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ pipe( in , bout );
+ return new String( bout.toByteArray() , "UTF8" );
+
+ }
+
+ public static String[] runCommand( String cmd , File dir )
+ throws IOException {
+
+ Process p = Runtime.getRuntime().exec( cmd.split( " +" ) , new String[]{} , dir );
+ String[] results = new String[]{ IOUtil.readStringFully( p.getInputStream() ) , IOUtil.readStringFully( p.getErrorStream() ) };
+ try {
+ if ( p.waitFor() != 0 )
+ throw new RuntimeException( "command failed [" + cmd + "]\n" + results[0] + "\n" + results[1] );
+ }
+ catch ( InterruptedException ie ) {
+ throw new RuntimeException( "uh oh" );
+ }
+ return results;
+ }
+
+
+ public static void download( String http , File localDir )
+ throws IOException {
+
+ File f = localDir;
+ f.mkdirs();
+
+ f = new File( f.toString() + File.separator + urlFileName( http ) );
+
+ System.out.println( "downloading\n\t" + http + "\n\t" + f );
+
+ if ( f.exists() ) {
+ System.out.println( "\t already exists" );
+ return;
+ }
+
+ URL url = new URL( http );
+
+ InputStream in = url.openConnection().getInputStream();
+ OutputStream out = new FileOutputStream( f );
+
+ pipe( in , out );
+
+ out.close();
+ in.close();
+
+ }
+
+ public static void main( String[] args )
+ throws Exception {
+
+
+ byte[] data = new byte[]{ 'e' , 'r' , 'h' , 0 };
+ System.out.write( data );
+ System.out.println( "yo" );
+
+ }
+
+}
diff --git a/buildscripts/emr/MANIFEST.MF b/buildscripts/emr/MANIFEST.MF
new file mode 100644
index 00000000000..4a5b3f96691
--- /dev/null
+++ b/buildscripts/emr/MANIFEST.MF
@@ -0,0 +1,2 @@
+Manifest-Version: 1.0
+Main-Class: emr
diff --git a/buildscripts/emr/emr.java b/buildscripts/emr/emr.java
new file mode 100644
index 00000000000..9540d99480c
--- /dev/null
+++ b/buildscripts/emr/emr.java
@@ -0,0 +1,310 @@
+// emr.java
+
+import java.io.*;
+import java.util.*;
+import java.net.*;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.fs.*;
+
+
+public class emr {
+
+ static class MongoSuite {
+ String mongo;
+ String code;
+ String workingDir;
+
+ String suite;
+
+ void copy( MongoSuite c ) {
+ mongo = c.mongo;
+ code = c.code;
+ workingDir = c.workingDir;
+ suite = c.suite;
+ }
+
+ void downloadTo( File localDir )
+ throws IOException {
+ IOUtil.download( mongo , localDir );
+ IOUtil.download( code , localDir );
+ }
+
+ boolean runTest()
+ throws IOException {
+
+ // mkdir
+ File dir = new File( workingDir , suite );
+ dir.mkdirs();
+
+ // download
+ downloadTo( dir );
+
+
+ // explode
+ IOUtil.runCommand( "tar zxvf " + IOUtil.urlFileName( code ) , dir );
+ String[] res = IOUtil.runCommand( "tar zxvf " + IOUtil.urlFileName( mongo ) , dir );
+ for ( String x : res[0].split( "\n" ) ) {
+ if ( x.indexOf( "/bin/" ) < 0 )
+ continue;
+ File f = new File( dir.toString() , x );
+ if ( ! f.renameTo( new File( dir , IOUtil.urlFileName( x ) ) ) )
+ throw new RuntimeException( "rename failed" );
+ }
+
+
+ Process p = Runtime.getRuntime().exec( new String[]{ "/usr/bin/python" , "buildscripts/smoke.py" , suite } , new String[]{} , dir );
+ List<Thread> threads = new ArrayList<Thread>();
+ threads.add( new IOUtil.PipingThread( p.getInputStream() , System.out ) );
+ threads.add( new IOUtil.PipingThread( p.getErrorStream() , System.out ) );
+
+ for ( Thread t : threads ) t.start();
+ try {
+ for ( Thread t : threads ) t.join();
+ int rc = p.waitFor();
+ System.out.println( "\n\nResult: " + rc );
+
+ return rc == 0;
+ }
+ catch ( InterruptedException ie ) {
+ throw new RuntimeException( "sad" , ie );
+ }
+
+ }
+
+ public void readFields( DataInput in )
+ throws IOException {
+ mongo = in.readUTF();
+ code = in.readUTF();
+ workingDir = in.readUTF();
+ suite = in.readUTF();
+ }
+
+ public void write( final DataOutput out )
+ throws IOException {
+ out.writeUTF( mongo );
+ out.writeUTF( code );
+ out.writeUTF( workingDir );
+ out.writeUTF( suite );
+ }
+
+ public String toString() {
+ return "mongo: " + mongo + " code: " + code + " suite: " + suite + " workingDir: " + workingDir;
+ }
+ }
+
+ public static class Map implements Mapper<Text, MongoSuite, Text, IntWritable> {
+
+ public void map( Text key, MongoSuite value, OutputCollector<Text,IntWritable> output, Reporter reporter )
+ throws IOException {
+
+ System.out.println( "key: " + key );
+ System.out.println( "value: " + value );
+
+ long start = System.currentTimeMillis();
+ boolean passed = value.runTest();
+ long end = System.currentTimeMillis();
+
+ output.collect( new Text( passed ? "passed" : "failed" ) , new IntWritable( 1 ) );
+ output.collect( new Text( key.toString() + "-time-seconds" ) , new IntWritable( (int)((end-start)/(1000)) ) );
+ output.collect( new Text( key.toString() + "-passed" ) , new IntWritable( passed ? 1 : 0 ) );
+
+ String ip = IOUtil.readStringFully( new URL( "http://myip.10gen.com/" ).openConnection().getInputStream() );
+ ip = ip.substring( ip.indexOf( ":" ) + 1 ).trim();
+ output.collect( new Text( ip ) , new IntWritable(1) );
+
+ }
+
+ public void configure(JobConf job) {}
+ public void close(){}
+ }
+
+ public static class Reduce implements Reducer<Text, IntWritable, Text, IntWritable> {
+
+ public void reduce( Text key, Iterator<IntWritable> values, OutputCollector<Text,IntWritable> output , Reporter reporter )
+ throws IOException {
+
+ int sum = 0;
+ while ( values.hasNext() ) {
+ sum += values.next().get();
+ }
+ output.collect( key , new IntWritable( sum ) );
+ }
+
+ public void configure(JobConf job) {}
+ public void close(){}
+ }
+
+ public static class MySplit implements InputSplit , Writable {
+
+ public MySplit(){
+ }
+
+ MySplit( MongoSuite config , int length ) {
+ _config = config;
+ _length = length;
+ }
+
+ public long getLength() {
+ return _length;
+ }
+
+ public String[] getLocations() {
+ return new String[0];
+ }
+
+ public void readFields( DataInput in )
+ throws IOException {
+ _config = new MongoSuite();
+ _config.readFields( in );
+ _length = in.readInt();
+ }
+
+ public void write( final DataOutput out )
+ throws IOException {
+ _config.write( out );
+ out.writeInt( _length );
+ }
+
+ MongoSuite _config;
+ int _length;
+ }
+
+ public static class InputMagic implements InputFormat<Text,MongoSuite> {
+
+ public RecordReader<Text,MongoSuite> getRecordReader( InputSplit split, JobConf job , Reporter reporter ){
+ final MySplit s = (MySplit)split;
+ return new RecordReader<Text,MongoSuite>() {
+
+ public void close(){}
+
+ public Text createKey() {
+ return new Text();
+ }
+
+ public MongoSuite createValue() {
+ return new MongoSuite();
+ }
+
+ public long getPos() {
+ return _seen ? 1 : 0;
+ }
+
+ public float getProgress() {
+ return getPos();
+ }
+
+ public boolean next( Text key , MongoSuite value ) {
+ key.set( s._config.suite );
+ value.copy( s._config );
+
+
+ boolean x = _seen;
+ _seen = true;
+ return !x;
+ }
+
+ boolean _seen = false;
+ };
+ }
+
+ public InputSplit[] getSplits( JobConf job , int numSplits ){
+ String[] pcs = job.get( "suites" ).split(",");
+ InputSplit[] splits = new InputSplit[pcs.length];
+ for ( int i=0; i<splits.length; i++ ) {
+ MongoSuite c = new MongoSuite();
+ c.suite = pcs[i];
+
+ c.mongo = job.get( "mongo" );
+ c.code = job.get( "code" );
+ c.workingDir = job.get( "workingDir" );
+
+ splits[i] = new MySplit( c , 100 /* XXX */);
+ }
+ return splits;
+ }
+
+ public void validateInput(JobConf job){}
+
+
+ }
+
+ /**
+ * args
+ * mongo tgz
+ * code tgz
+ * output path
+ * tests to run ?
+ */
+
+ public static void main( String[] args ) throws Exception{
+
+ JobConf conf = new JobConf();
+ conf.setJarByClass(emr.class);
+
+ String workingDir = "/data/db/emr/";
+
+ // parse args
+
+ int pos = 0;
+ for ( ; pos < args.length; pos++ ) {
+ if ( ! args[pos].startsWith( "--" ) )
+ break;
+
+ String arg = args[pos].substring(2);
+ if ( arg.equals( "workingDir" ) ) {
+ workingDir = args[++pos];
+ }
+ else {
+ System.err.println( "unknown arg: " + arg );
+ throw new RuntimeException( "unknown arg: " + arg );
+ }
+ }
+
+ String mongo = args[pos++];
+ String code = args[pos++];
+ String output = args[pos++];
+
+ String suites = "";
+ for ( ; pos < args.length; pos++ ) {
+ if ( suites.length() > 0 )
+ suites += ",";
+ suites += args[pos];
+ }
+
+ if ( suites.length() == 0 )
+ throw new RuntimeException( "no suites" );
+
+ System.out.println( "workingDir:\t" + workingDir );
+ System.out.println( "mongo:\t" + mongo );
+ System.out.println( "code:\t " + code );
+ System.out.println( "output\t: " + output );
+ System.out.println( "suites\t: " + suites );
+
+ // main hadoop set
+ conf.set( "mongo" , mongo );
+ conf.set( "code" , code );
+ conf.set( "workingDir" , workingDir );
+ conf.set( "suites" , suites );
+
+ conf.set( "mapred.map.tasks" , "1" );
+ conf.setLong( "mapred.task.timeout" , 3600 * 1000 /* 1 hour */);
+
+ conf.setOutputKeyClass(Text.class);
+ conf.setOutputValueClass(IntWritable.class);
+
+ conf.setMapperClass(Map.class);
+ conf.setReducerClass(Reduce.class);
+
+ conf.setInputFormat(InputMagic.class);
+ conf.setOutputFormat(TextOutputFormat.class);
+
+ FileOutputFormat.setOutputPath(conf, new Path(output) );
+
+ // actually run
+
+ JobClient.runJob( conf );
+ }
+}
diff --git a/buildscripts/emr/emr.py b/buildscripts/emr/emr.py
new file mode 100644
index 00000000000..727b4519992
--- /dev/null
+++ b/buildscripts/emr/emr.py
@@ -0,0 +1,225 @@
+
+import os
+import sys
+import shutil
+import datetime
+import time
+import subprocess
+import urllib
+import urllib2
+import json
+import pprint
+
+import boto
+import simples3
+
+def findSettingsSetup():
+ sys.path.append( "./" )
+ sys.path.append( "../" )
+ sys.path.append( "../../" )
+ sys.path.append( "../../../" )
+
+findSettingsSetup()
+import settings
+import buildscripts.utils as utils
+import buildscripts.smoke as smoke
+
+bucket = simples3.S3Bucket( settings.emr_bucket , settings.emr_id , settings.emr_key )
+
+def _get_status():
+
+ def gh( cmds ):
+ txt = ""
+ for cmd in cmds:
+ res = utils.execsys( "git " + cmd )
+ txt = txt + res[0] + res[1]
+ return utils.md5string( txt )
+
+ return "%s-%s" % ( utils.execsys( "git describe" )[0].strip(), gh( [ "diff" , "status" ] ) )
+
+def _get_most_recent_tgz( prefix ):
+ # this is icky, but works for now
+ all = []
+ for x in os.listdir( "." ):
+ if not x.startswith( prefix ) or not x.endswith( ".tgz" ):
+ continue
+ all.append( ( x , os.stat(x).st_mtime ) )
+
+ if len(all) == 0:
+ raise Exception( "can't find file with prefix: " + prefix )
+
+ all.sort( lambda x,y: int(y[1] - x[1]) )
+
+ return all[0][0]
+
+def make_tarball():
+
+ m = _get_most_recent_tgz( "mongodb-" )
+ c = "test-code-emr.tgz"
+ utils.execsys( "tar zcf %s src jstests buildscripts" % c )
+ return ( m , c )
+
+def _put_ine( bucket , local , remote ):
+ print( "going to put\n\t%s\n\thttp://%s.s3.amazonaws.com/%s" % ( local , settings.emr_bucket , remote ) )
+
+ for x in bucket.listdir( prefix=remote ):
+ print( "\talready existed" )
+ return remote
+
+ bucket.put( remote , open( local , "rb" ).read() , acl="public-read" )
+ return remote
+
+def build_jar():
+ root = "build/emrjar"
+ src = "buildscripts/emr"
+
+ if os.path.exists( root ):
+ shutil.rmtree( root )
+ os.makedirs( root )
+
+ for x in os.listdir( src ):
+ if not x.endswith( ".java" ):
+ continue
+ shutil.copyfile( src + "/" + x , root + "/" + x )
+ shutil.copyfile( src + "/MANIFEST.MF" , root + "/MANIFEST.FM" )
+
+ for x in os.listdir( root ):
+ if x.endswith( ".java" ):
+ if subprocess.call( [ "javac" , x ] , cwd=root) != 0:
+ raise Exception( "compiled failed" )
+
+ args = [ "jar" , "-cfm" , "emr.jar" , "MANIFEST.FM" ]
+ for x in os.listdir( root ):
+ if x.endswith( ".class" ):
+ args.append( x )
+ subprocess.call( args , cwd=root )
+
+ shutil.copyfile( root + "/emr.jar" , "emr.jar" )
+
+ return "emr.jar"
+
+def push():
+ mongo , test_code = make_tarball()
+ print( mongo )
+ print( test_code )
+
+ root = "emr/%s/%s" % ( datetime.date.today().strftime("%Y-%m-%d") , os.uname()[0].lower() )
+
+ def make_long_name(local):
+ pcs = local.rpartition( "." )
+ return "%s/%s-%s.%s" % ( root , pcs[0] , _get_status() , pcs[2] )
+
+ mongo = _put_ine( bucket , mongo , make_long_name( mongo ) )
+ test_code = _put_ine( bucket , test_code , make_long_name( test_code ) )
+
+ jar = build_jar()
+ jar = _put_ine( bucket , jar , make_long_name( jar ) )
+
+ setup = "buildscripts/emr/emrnodesetup.sh"
+ setup = _put_ine( bucket , setup , make_long_name( setup ) )
+
+ return mongo , test_code , jar , setup
+
+def run_tests( things , tests ):
+ if len(tests) == 0:
+ raise Exception( "no tests" )
+ oldNum = len(tests)
+ tests = fix_suites( tests )
+ print( "tests expanded from %d to %d" % ( oldNum , len(tests) ) )
+
+ print( "things:%s\ntests:%s\n" % ( things , tests ) )
+
+ emr = boto.connect_emr( settings.emr_id , settings.emr_key )
+
+ def http(path):
+ return "http://%s.s3.amazonaws.com/%s" % ( settings.emr_bucket , path )
+
+ run_s3_path = "emr/%s/%s/%s/" % ( os.getenv( "USER" ) ,
+ os.getenv( "HOST" ) ,
+ datetime.datetime.today().strftime( "%Y%m%d-%H%M" ) )
+
+ run_s3_root = "s3n://%s/%s/" % ( settings.emr_bucket , run_s3_path )
+
+ out = run_s3_root + "out"
+ logs = run_s3_root + "logs"
+
+ jar="s3n://%s/%s" % ( settings.emr_bucket , things[2] )
+ step_args=[ http(things[0]) , http(things[1]) , out , ",".join(tests) ]
+
+ step = boto.emr.step.JarStep( "emr main" , jar=jar,step_args=step_args )
+ print( "jar:%s\nargs:%s" % ( jar , step_args ) )
+
+ setup = boto.emr.BootstrapAction( "setup" , "s3n://%s/%s" % ( settings.emr_bucket , things[3] ) , [] )
+
+ jobid = emr.run_jobflow( name = "Mongo EMR for %s from %s" % ( os.getenv( "USER" ) , os.getenv( "HOST" ) ) ,
+ ec2_keyname = "emr1" ,
+ slave_instance_type = "m1.large" ,
+ ami_version = "latest" ,
+ num_instances=5 ,
+ log_uri = logs ,
+ bootstrap_actions = [ setup ] ,
+ steps = [ step ] )
+
+
+ print( "%s jobid: %s" % ( datetime.datetime.today() , jobid ) )
+
+ while ( True ):
+ flow = emr.describe_jobflow( jobid )
+ print( "%s status: %s" % ( datetime.datetime.today() , flow.state ) )
+ if flow.state == "COMPLETED" or flow.state == "FAILED":
+ break
+ time.sleep(30)
+
+ syncdir = "build/emrout/" + jobid + "/"
+ sync_s3( run_s3_path , syncdir )
+ print("output in: build/emrout/" + jobid + "/" )
+
+def sync_s3( remote_dir , local_dir ):
+ for x in bucket.listdir( remote_dir ):
+ out = local_dir + "/" + x[0]
+
+ if os.path.exists( out ) and x[2].find( utils.md5sum( out ) ) >= 0:
+ continue
+
+ dir = out.rpartition( "/" )[0]
+ if not os.path.exists( dir ):
+ os.makedirs( dir )
+
+ thing = bucket.get( x[0] )
+ open( out , "wb" ).write( thing.read() )
+
+def fix_suites( suites ):
+ fixed = []
+ for name,x in smoke.expand_suites( suites , False ):
+ idx = name.find( "/jstests" )
+ if idx >= 0:
+ name = name[idx+1:]
+ fixed.append( name )
+ return fixed
+
+if __name__ == "__main__":
+ if len(sys.argv) == 1:
+ print( "need an arg" )
+
+ elif sys.argv[1] == "tarball":
+ make_tarball()
+ elif sys.argv[1] == "jar":
+ build_jar()
+ elif sys.argv[1] == "push":
+ print( push() )
+
+ elif sys.argv[1] == "sync":
+ sync_s3( sys.argv[2] , sys.argv[3] )
+
+ elif sys.argv[1] == "fix_suites":
+ for x in fix_suites( sys.argv[2:] ):
+ print(x)
+
+ elif sys.argv[1] == "full":
+ things = push()
+ run_tests( things , sys.argv[2:] )
+
+ else:
+ things = push()
+ run_tests( things , sys.argv[1:] )
+
diff --git a/buildscripts/emr/emrnodesetup.sh b/buildscripts/emr/emrnodesetup.sh
new file mode 100644
index 00000000000..546becf3e27
--- /dev/null
+++ b/buildscripts/emr/emrnodesetup.sh
@@ -0,0 +1,7 @@
+#!/bin/sh
+
+sudo mkdir /mnt/data
+sudo ln -s /mnt/data /data
+sudo chown hadoop /mnt/data
+
+sudo easy_install pymongo