summaryrefslogtreecommitdiff
path: root/buildscripts/emr
diff options
context:
space:
mode:
authorEliot Horowitz <eliot@10gen.com>2012-05-14 16:36:13 -0400
committerEliot Horowitz <eliot@10gen.com>2012-05-14 16:36:29 -0400
commit46294ab848a441c496d4055ce0839628a258b0d4 (patch)
tree3316fe244c89d5eb552ec2e175b31d1d8b65429e /buildscripts/emr
parente3306d5cadc9fc379330ed38e131985f74c7db32 (diff)
downloadmongo-46294ab848a441c496d4055ce0839628a258b0d4.tar.gz
use a file lock on emr to avoid conflicts
Diffstat (limited to 'buildscripts/emr')
-rw-r--r--buildscripts/emr/FileLock.java107
-rw-r--r--buildscripts/emr/emr.java36
2 files changed, 129 insertions, 14 deletions
diff --git a/buildscripts/emr/FileLock.java b/buildscripts/emr/FileLock.java
new file mode 100644
index 00000000000..52ed4c083b4
--- /dev/null
+++ b/buildscripts/emr/FileLock.java
@@ -0,0 +1,107 @@
+// FileLock.java
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * "locks" a resource by using the file system as storage
+ * file has 1 line
+ * <incarnation> <last ping time in millis>
+ */
+public class FileLock {
+
+ public FileLock( String logicalName )
+ throws IOException {
+
+ _file = new File( "/tmp/java-fileLock-" + logicalName );
+ _incarnation = "xxx" + Math.random() + "yyy";
+
+ if ( ! _file.exists() ) {
+ FileOutputStream fout = new FileOutputStream( _file );
+ fout.write( "\n".getBytes() );
+ fout.close();
+ }
+
+ }
+
+ /**
+ * takes lock
+ * if someone else has it, blocks until the other one finishes
+ */
+ public void lock()
+ throws IOException {
+ if ( _lock != null )
+ throw new IllegalStateException( "can't lock when you're locked" );
+
+ try {
+ _semaphore.acquire();
+ }
+ catch ( InterruptedException ie ) {
+ throw new RuntimeException( "sad" , ie );
+ }
+
+ _raf = new RandomAccessFile( _file , "rw" );
+ _lock = _raf.getChannel().lock();
+ }
+
+ public void unlock()
+ throws IOException {
+
+ if ( _lock == null )
+ throw new IllegalStateException( "can't unlock when you're not locked" );
+
+ _lock.release();
+ _semaphore.release();
+
+ _locked = false;
+ }
+
+ final File _file;
+ final String _incarnation;
+
+ private RandomAccessFile _raf;
+ private java.nio.channels.FileLock _lock;
+
+ private boolean _locked;
+
+ private static Semaphore _semaphore = new Semaphore(1);
+
+
+ public static void main( final String[] args )
+ throws Exception {
+
+ List<Thread> threads = new ArrayList<Thread>();
+
+ for ( int i=0; i<3; i++ ) {
+
+ threads.add( new Thread() {
+ public void run() {
+ try {
+ FileLock lock = new FileLock( args[0] );
+
+ long start = System.currentTimeMillis();
+
+ lock.lock();
+ System.out.println( "time to lock:\t" + (System.currentTimeMillis()-start) );
+ Thread.sleep( Integer.parseInt( args[1] ) );
+ lock.unlock();
+ System.out.println( "total time:\t" + (System.currentTimeMillis()-start) );
+ }
+ catch ( Exception e ) {
+ e.printStackTrace();
+ }
+ }
+ } );
+ }
+
+ for ( Thread t : threads ) {
+ t.start();
+ }
+
+ for ( Thread t : threads ) {
+ t.join();
+ }
+
+ }
+}
diff --git a/buildscripts/emr/emr.java b/buildscripts/emr/emr.java
index 9540d99480c..1f8d3d0e623 100644
--- a/buildscripts/emr/emr.java
+++ b/buildscripts/emr/emr.java
@@ -99,21 +99,29 @@ public class emr {
public void map( Text key, MongoSuite value, OutputCollector<Text,IntWritable> output, Reporter reporter )
throws IOException {
-
- System.out.println( "key: " + key );
- System.out.println( "value: " + value );
-
- long start = System.currentTimeMillis();
- boolean passed = value.runTest();
- long end = System.currentTimeMillis();
- output.collect( new Text( passed ? "passed" : "failed" ) , new IntWritable( 1 ) );
- output.collect( new Text( key.toString() + "-time-seconds" ) , new IntWritable( (int)((end-start)/(1000)) ) );
- output.collect( new Text( key.toString() + "-passed" ) , new IntWritable( passed ? 1 : 0 ) );
+ FileLock lock = new FileLock( "mapper" );
+ try {
+ lock.lock();
- String ip = IOUtil.readStringFully( new URL( "http://myip.10gen.com/" ).openConnection().getInputStream() );
- ip = ip.substring( ip.indexOf( ":" ) + 1 ).trim();
- output.collect( new Text( ip ) , new IntWritable(1) );
+ System.out.println( "key: " + key );
+ System.out.println( "value: " + value );
+
+ long start = System.currentTimeMillis();
+ boolean passed = value.runTest();
+ long end = System.currentTimeMillis();
+
+ output.collect( new Text( passed ? "passed" : "failed" ) , new IntWritable( 1 ) );
+ output.collect( new Text( key.toString() + "-time-seconds" ) , new IntWritable( (int)((end-start)/(1000)) ) );
+ output.collect( new Text( key.toString() + "-passed" ) , new IntWritable( passed ? 1 : 0 ) );
+
+ String ip = IOUtil.readStringFully( new URL( "http://myip.10gen.com/" ).openConnection().getInputStream() );
+ ip = ip.substring( ip.indexOf( ":" ) + 1 ).trim();
+ output.collect( new Text( ip ) , new IntWritable(1) );
+ }
+ finally {
+ lock.unlock();
+ }
}
@@ -290,7 +298,7 @@ public class emr {
conf.set( "suites" , suites );
conf.set( "mapred.map.tasks" , "1" );
- conf.setLong( "mapred.task.timeout" , 3600 * 1000 /* 1 hour */);
+ conf.setLong( "mapred.task.timeout" , 4 * 3600 * 1000 /* 4 hours */);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);