author     Gregory Farnum <greg@inktank.com>  2013-09-16 13:57:47 -0700
committer  Gregory Farnum <greg@inktank.com>  2013-09-16 13:57:47 -0700
commit     4930ccb8a23e4aa169dce995f26f0617bc4318a9 (patch)
tree       0e4e012796d2c8664ce9419e30f459c150417286
parent     01ec6a3fdf9da8b35cc034788ab306957d5fd969 (diff)
parent     b86c0686694245759db6a8b309894c2adf477d50 (diff)
download   ceph-4930ccb8a23e4aa169dce995f26f0617bc4318a9.tar.gz
Merge pull request #597 from ceph/remove-hadoop-shim
This branch built fine on the gitbuilders and the list of removed files looks good to me.

Reviewed-by: Greg Farnum <greg@inktank.com>
-rw-r--r--  COPYING                                                            4
-rw-r--r--  ceph.spec.in                                                       1
-rw-r--r--  configure.ac                                                      19
-rw-r--r--  debian/copyright                                                   4
-rwxr-xr-x  do_autogen.sh                                                      3
-rw-r--r--  doc/rados/troubleshooting/log-and-debug.rst                        2
-rw-r--r--  src/Makefile.am                                                   14
-rw-r--r--  src/client/hadoop/CephFSInterface.cc                             993
-rw-r--r--  src/client/hadoop/CephFSInterface.h                              236
-rw-r--r--  src/client/hadoop/HADOOP-ceph.patch                             2234
-rw-r--r--  src/client/hadoop/Readme                                          17
-rw-r--r--  src/client/hadoop/ceph/CephFS.java                               250
-rw-r--r--  src/client/hadoop/ceph/CephFaker.java                            483
-rw-r--r--  src/client/hadoop/ceph/CephFileSystem.java                       804
-rw-r--r--  src/client/hadoop/ceph/CephInputStream.java                      254
-rw-r--r--  src/client/hadoop/ceph/CephOutputStream.java                     219
-rw-r--r--  src/client/hadoop/ceph/CephTalker.java                            91
-rw-r--r--  src/client/hadoop/ceph/LICENSE                                     4
-rw-r--r--  src/client/hadoop/ceph/TestCeph.java                              45
-rw-r--r--  src/client/hadoop/ceph/package.html                              101
-rw-r--r--  src/client/hadoop/org_apache_hadoop_fs_ceph_CephFS.h              13
-rw-r--r--  src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem.h      31
-rw-r--r--  src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_CephStat.h  13
-rw-r--r--  src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_Stat.h      13
-rw-r--r--  src/client/hadoop/org_apache_hadoop_fs_ceph_CephInputStream.h     47
-rw-r--r--  src/client/hadoop/org_apache_hadoop_fs_ceph_CephOutputStream.h    37
-rw-r--r--  src/client/hadoop/org_apache_hadoop_fs_ceph_CephTalker.h         197
-rw-r--r--  src/common/config_opts.h                                           1
-rw-r--r--  src/test/Makefile.am                                              13
29 files changed, 2 insertions, 6141 deletions
diff --git a/COPYING b/COPYING
index 68309134056..920b049b7fa 100644
--- a/COPYING
+++ b/COPYING
@@ -6,10 +6,6 @@ Files: doc/*
Copyright: (c) 2010-2012 New Dream Network and contributors
License: Creative Commons Attribution-ShareAlike (CC BY-SA)
-Files: src/client/hadoop/ceph
-Copyright: Copyright (C) New Dream Network and contributors
-License: Apache License v2
-
Files: src/mount/canonicalize.c
Copyright: Copyright (C) 1993 Rick Sladkey <jrs@world.std.com>
License: LGPL2 or later
diff --git a/ceph.spec.in b/ceph.spec.in
index 8091018c1dc..851ee7acfd5 100644
--- a/ceph.spec.in
+++ b/ceph.spec.in
@@ -278,7 +278,6 @@ export RPM_OPT_FLAGS=`echo $RPM_OPT_FLAGS | sed -e 's/i386/i486/'`
--localstatedir=/var \
--sysconfdir=/etc \
--docdir=%{_docdir}/ceph \
- --without-hadoop \
--with-nss \
--without-cryptopp \
--with-rest-bench \
diff --git a/configure.ac b/configure.ac
index d2bf5959881..d7f96fd11f4 100644
--- a/configure.ac
+++ b/configure.ac
@@ -391,25 +391,6 @@ if test "x$enable_cephfs_java" = "xyes"; then
fi
AM_CONDITIONAL(HAVE_JUNIT4, [test "$have_junit4" = "1"])
-# jni?
-# clear cache (from java above) -- this whole thing will get
-# folded into the bigger java package later -- for now maintain
-# backward compat
-AS_UNSET(ac_cv_header_jni_h)
-AC_ARG_WITH([hadoop],
- [AS_HELP_STRING([--with-hadoop], [build hadoop client])],
- [],
- [with_hadoop=check])
-AS_IF([test "x$with_hadoop" != xno],
- [AC_CHECK_HEADER([jni.h],
- [HAVE_JNI=1],
- [if test "x$with_hadoop" != xcheck; then
- AC_MSG_FAILURE(
- [--with-hadoop was given but jni.h not found])
- fi
- ])])
-AM_CONDITIONAL(WITH_HADOOPCLIENT, [test "$HAVE_JNI" = "1"])
-
#
# FreeBSD has it in base.
#
diff --git a/debian/copyright b/debian/copyright
index aa91a149853..d11a0f7f5da 100644
--- a/debian/copyright
+++ b/debian/copyright
@@ -7,10 +7,6 @@ Files: *
Copyright: (c) 2004-2010 by Sage Weil <sage@newdream.net>
License: LGPL2.1 (see /usr/share/common-licenses/LGPL-2.1)
-Files: src/client/hadoop/ceph
-Copyright: Copyright (C) New Dream Network and contributors
-License: Apache License v2
-
Files: src/mount/canonicalize.c
Copyright: Copyright (C) 1993 Rick Sladkey <jrs@world.std.com>
License: LGPL2 or later
diff --git a/do_autogen.sh b/do_autogen.sh
index 32e9df4623b..bc6749e9e5d 100755
--- a/do_autogen.sh
+++ b/do_autogen.sh
@@ -10,7 +10,6 @@ do_autogen.sh: make a ceph build by running autogen, etc.
level 1: -g
level 3: -Wextra
level 4: even more...
--H --with-hadoop
-T --without-tcmalloc
-e <path> dump encoded objects to <path>
-P profiling build
@@ -46,8 +45,6 @@ do
h) usage
exit 0;;
- H) CONFIGURE_FLAGS="$CONFIGURE_FLAGS --with-hadoop";;
-
T) CONFIGURE_FLAGS="$CONFIGURE_FLAGS --without-tcmalloc";;
j) CONFIGURE_FLAGS="$CONFIGURE_FLAGS --enable-cephfs-java";;
diff --git a/doc/rados/troubleshooting/log-and-debug.rst b/doc/rados/troubleshooting/log-and-debug.rst
index 2f2e5e4abc0..7d1ea43d1db 100644
--- a/doc/rados/troubleshooting/log-and-debug.rst
+++ b/doc/rados/troubleshooting/log-and-debug.rst
@@ -243,7 +243,7 @@ to their default level or to a level suitable for normal operations.
+--------------------+-----------+--------------+
| ``rgw`` | 1 | 5 |
+--------------------+-----------+--------------+
-| ``hadoop`` | 1 | 5 |
+| ``javaclient`` | 1 | 5 |
+--------------------+-----------+--------------+
| ``asok`` | 1 | 5 |
+--------------------+-----------+--------------+
diff --git a/src/Makefile.am b/src/Makefile.am
index 3520ca8d8f9..c0ed016006d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -96,25 +96,13 @@ bin_PROGRAMS += rbd-fuse
endif # WITH_FUSE
-# libcephfs (this and libhadoopcephfs should go somewhere else in the future)
+# libcephfs (this should go somewhere else in the future)
libcephfs_la_SOURCES = libcephfs.cc
libcephfs_la_LIBADD = $(LIBCLIENT) $(LIBCOMMON) $(PTHREAD_LIBS) $(CRYPTO_LIBS) $(EXTRALIBS)
libcephfs_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex '^ceph_.*'
lib_LTLIBRARIES += libcephfs.la
-# order matters! this *must* come after libcephfs, or else you'll encounter something
-# like "error: relink libhadoopcephfs.la with the above command before installing it"
-if WITH_HADOOPCLIENT
-JAVA_BASE = /usr/lib/jvm/java-6-sun
-libhadoopcephfs_la_SOURCES = client/hadoop/CephFSInterface.cc
-libhadoopcephfs_la_LIBADD = $(LIBCEPHFS)
-libhadoopcephfs_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 -export-symbols-regex 'hadoopcephfs_.*'
-lib_LTLIBRARIES += libhadoopcephfs.la
-noinst_HEADERS += client/hadoop/CephFSInterface.h
-endif # WITH_HADOOPCLIENT
-
-
# jni library (java source is in src/java)
if ENABLE_CEPHFS_JAVA
diff --git a/src/client/hadoop/CephFSInterface.cc b/src/client/hadoop/CephFSInterface.cc
deleted file mode 100644
index d5a3c8f4fcd..00000000000
--- a/src/client/hadoop/CephFSInterface.cc
+++ /dev/null
@@ -1,993 +0,0 @@
-// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-#include "CephFSInterface.h"
-#include "include/cephfs/libcephfs.h"
-#include "common/ceph_argparse.h"
-#include "common/config.h"
-#include "msg/SimpleMessenger.h"
-
-#include <arpa/inet.h>
-#include <sys/stat.h>
-#include <sys/statvfs.h>
-
-#define dout_subsys ceph_subsys_hadoop
-
-union ceph_mount_union_t {
- struct ceph_mount_info *cmount;
- jlong cjlong;
-};
-
-static void set_ceph_mount_info(JNIEnv *env, jobject obj, struct ceph_mount_info *cmount)
-{
- jclass cls = env->GetObjectClass(obj);
- if (cls == NULL)
- return;
- jfieldID fid = env->GetFieldID(cls, "cluster", "J");
- if (fid == NULL)
- return;
- ceph_mount_union_t ceph_mount_union;
- ceph_mount_union.cjlong = 0;
- ceph_mount_union.cmount = cmount;
- env->SetLongField(obj, fid, ceph_mount_union.cjlong);
-}
-
-static struct ceph_mount_info *get_ceph_mount_t(JNIEnv *env, jobject obj)
-{
- jclass cls = env->GetObjectClass(obj);
- jfieldID fid = env->GetFieldID(cls, "cluster", "J");
- if (fid == NULL)
- return NULL;
- ceph_mount_union_t ceph_mount_union;
- ceph_mount_union.cjlong = env->GetLongField(obj, fid);
- return ceph_mount_union.cmount;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_initializeClient
- * Signature: (Ljava/lang/String;I)Z
- *
- * Performs any necessary setup to allow general use of the filesystem.
- * Inputs:
- * jstring args -- a command-line style input of Ceph config params
- * jint block_size -- the size in bytes to use for blocks
- * Returns: true on success, false otherwise
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1initializeClient
- (JNIEnv *env, jobject obj, jstring j_args, jint block_size)
-{
- // Convert Java argument string to argv
- const char *c_args = env->GetStringUTFChars(j_args, 0);
- if (c_args == NULL)
- return false; //out of memory!
- string cppargs(c_args);
- char b[cppargs.length()+1];
- strcpy(b, cppargs.c_str());
- env->ReleaseStringUTFChars(j_args, c_args);
- std::vector<const char*> args;
- char *p = b;
- while (*p) {
- args.push_back(p);
- while (*p && *p != ' ')
- p++;
- if (!*p)
- break;
- *p++ = 0;
- while (*p && *p == ' ')
- p++;
- }
-
- // parse the arguments
- bool set_local_writes = false;
- std::string mount_root, val;
- for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
- if (ceph_argparse_witharg(args, i, &val, "mount_root", (char*)NULL)) {
- mount_root = val;
- } else if (ceph_argparse_flag(args, i, "set_local_pg", (char*)NULL)) {
- set_local_writes = true;
- } else {
- ++i;
- }
- }
-
- // connect to the cmount
- struct ceph_mount_info *cmount;
- int ret = ceph_create(&cmount, NULL);
- if (ret)
- return false;
- ceph_conf_read_file(cmount, NULL); // read config file from the default location
- ceph_conf_parse_argv(cmount, args.size(), &args[0]);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 3) << "CephFSInterface: mounting filesystem...:" << dendl;
-
- ret = ceph_mount(cmount, mount_root.c_str());
- if (ret)
- return false;
-
- ceph_localize_reads(cmount, true);
- ceph_set_default_file_stripe_unit(cmount, block_size);
- ceph_set_default_object_size(cmount, block_size);
-
- if (set_local_writes) {
- ceph_set_default_preferred_pg(cmount, ceph_get_local_osd(cmount));
- }
-
- set_ceph_mount_info(env, obj, cmount);
- return true;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_getcwd
- * Signature: (J)Ljava/lang/String;
- *
- * Returns the current working directory.(absolute) as a jstring
- */
-JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getcwd
- (JNIEnv *env, jobject obj)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "CephFSInterface: In getcwd" << dendl;
- jstring j_path = env->NewStringUTF(ceph_getcwd(cmount));
- return j_path;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_setcwd
- * Signature: (Ljava/lang/String;)Z
- *
- * Changes the working directory.
- * Inputs:
- * jstring j_path: The path (relative or absolute) to switch to
- * Returns: true on success, false otherwise.
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcwd
-(JNIEnv *env, jobject obj, jstring j_path)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "CephFSInterface: In setcwd" << dendl;
-
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return false;
- int ret = ceph_chdir(cmount, c_path);
- env->ReleaseStringUTFChars(j_path, c_path);
- return ret ? JNI_FALSE : JNI_TRUE;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_rmdir
- * Signature: (Ljava/lang/String;)Z
- *
- * Given a path to a directory, removes the directory.if empty.
- * Inputs:
- * jstring j_path: The path (relative or absolute) to the directory
- * Returns: true on successful delete; false otherwise
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir
- (JNIEnv *env, jobject obj, jstring j_path)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "CephFSInterface: In rmdir" << dendl;
-
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if(c_path == NULL)
- return false;
- int ret = ceph_rmdir(cmount, c_path);
- env->ReleaseStringUTFChars(j_path, c_path);
- return ret ? JNI_FALSE : JNI_TRUE;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_unlink
- * Signature: (Ljava/lang/String;)Z
- * Given a path, unlinks it.
- * Inputs:
- * jstring j_path: The path (relative or absolute) to the file or empty dir
- * Returns: true if the unlink occurred, false otherwise.
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink
- (JNIEnv *env, jobject obj, jstring j_path)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return false;
- ldout(cct, 10) << "CephFSInterface: In unlink for path " << c_path << ":" << dendl;
- int ret = ceph_unlink(cmount, c_path);
- env->ReleaseStringUTFChars(j_path, c_path);
- return ret ? JNI_FALSE : JNI_TRUE;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_rename
- * Signature: (Ljava/lang/String;Ljava/lang/String;)Z
- * Changes a given path name to a new name.
- * Inputs:
- * jstring j_from: The path whose name you want to change.
- * jstring j_to: The new name for the path.
- * Returns: true if the rename occurred, false otherwise
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename
- (JNIEnv *env, jobject obj, jstring j_from, jstring j_to)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "CephFSInterface: In rename" << dendl;
- const char *c_from = env->GetStringUTFChars(j_from, 0);
- if (c_from == NULL)
- return false;
- const char *c_to = env->GetStringUTFChars(j_to, 0);
- if (c_to == NULL) {
- env->ReleaseStringUTFChars(j_from, c_from);
- return false;
- }
- struct stat stbuf;
- int ret = ceph_lstat(cmount, c_to, &stbuf);
- if (ret != -ENOENT) {
- // Hadoop doesn't want to overwrite files in a rename.
- env->ReleaseStringUTFChars(j_from, c_from);
- env->ReleaseStringUTFChars(j_to, c_to);
- return JNI_FALSE;
- }
-
- ret = ceph_rename(cmount, c_from, c_to);
- return ret ? JNI_FALSE : JNI_TRUE;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_exists
- * Signature: (Ljava/lang/String;)Z
- * Returns true if it the input path exists, false
- * if it does not or there is an unexpected failure.
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists
-(JNIEnv *env, jobject obj, jstring j_path)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "CephFSInterface: In exists" << dendl;
-
- struct stat stbuf;
-
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return false;
- ldout(cct, 10) << "Attempting lstat with file " << c_path << ":" << dendl;
-
- int ret = ceph_lstat(cmount, c_path, &stbuf);
- ldout(cct, 10) << "result is " << ret << dendl;
- env->ReleaseStringUTFChars(j_path, c_path);
- if (ret < 0) {
- ldout(cct, 10) << "Returning false (file does not exist)" << dendl;
- return JNI_FALSE;
- }
- else {
- ldout(cct, 10) << "Returning true (file exists)" << dendl;
- return JNI_TRUE;
- }
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_getblocksize
- * Signature: (Ljava/lang/String;)J
- * Get the block size for a given path.
- * Input:
- * j_string j_path: The path (relative or absolute) you want
- * the block size for.
- * Returns: block size (as a long) if the path exists, otherwise a negative
- * number corresponding to the standard C++ error codes (which are positive).
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getblocksize
- (JNIEnv *env, jobject obj, jstring j_path)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In getblocksize" << dendl;
-
- //struct stat stbuf;
-
- jlong result;
-
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return -ENOMEM;
- // we need to open the file to retrieve the stripe size
- ldout(cct, 10) << "CephFSInterface: getblocksize: opening file" << dendl;
- int fh = ceph_open(cmount, c_path, O_RDONLY, 0);
- env->ReleaseStringUTFChars(j_path, c_path);
- if (fh < 0)
- return fh;
-
- result = ceph_get_file_stripe_unit(cmount, fh);
-
- int close_result = ceph_close(cmount, fh);
- if (close_result < 0)
- return close_result;
-
- return result;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_isfile
- * Signature: (Ljava/lang/String;)Z
- * Returns true if the given path is a file; false otherwise.
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfile
- (JNIEnv *env, jobject obj, jstring j_path)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In isfile" << dendl;
-
- struct stat stbuf;
-
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return false;
- int ret = ceph_lstat(cmount, c_path, &stbuf);
- env->ReleaseStringUTFChars(j_path, c_path);
-
- // if the stat call failed, it's definitely not a file...
- if (ret < 0)
- return false;
-
- // check the stat result
- return !!S_ISREG(stbuf.st_mode);
-}
-
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_isdirectory
- * Signature: (Ljava/lang/String;)Z
- * Returns true if the given path is a directory, false otherwise.
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory
- (JNIEnv *env, jobject obj, jstring j_path)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In isdirectory" << dendl;
-
- struct stat stbuf;
-
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return false;
- int result = ceph_lstat(cmount, c_path, &stbuf);
- env->ReleaseStringUTFChars(j_path, c_path);
-
- // if the stat call failed, it's definitely not a directory...
- if (result < 0)
- return JNI_FALSE;
-
- // check the stat result
- return !!S_ISDIR(stbuf.st_mode);
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_getdir
- * Signature: (Ljava/lang/String;)[Ljava/lang/String;
- * Get the contents of a given directory.
- * Inputs:
- * jstring j_path: The path (relative or absolute) to the directory.
- * Returns: A Java String[] of the contents of the directory, or
- * NULL if there is an error (ie, path is not a dir). This listing
- * will not contain . or .. entries.
- */
-JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir
-(JNIEnv *env, jobject obj, jstring j_path)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In getdir" << dendl;
-
- // get the directory listing
- list<string> contents;
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return NULL;
- struct ceph_dir_result *dirp;
- int r;
- r = ceph_opendir(cmount, c_path, &dirp);
- if (r<0) {
- env->ReleaseStringUTFChars(j_path, c_path);
- return NULL;
- }
- int buflen = 100; //good default?
- char *buf = new char[buflen];
- string *ent;
- int bufpos;
- while (1) {
- r = ceph_getdnames(cmount, dirp, buf, buflen);
- if (r==-ERANGE) { //expand the buffer
- delete [] buf;
- buflen *= 2;
- buf = new char[buflen];
- continue;
- }
- if (r<=0) break;
-
- //if we make it here, we got at least one name
- bufpos = 0;
- while (bufpos<r) {//make new strings and add them to listing
- ent = new string(buf+bufpos);
- if (ent->compare(".") && ent->compare(".."))
- //we DON'T want to include dot listings; Hadoop gets confused
- contents.push_back(*ent);
- bufpos+=ent->size()+1;
- delete ent;
- }
- }
- delete [] buf;
- ceph_closedir(cmount, dirp);
- env->ReleaseStringUTFChars(j_path, c_path);
-
- if (r < 0) return NULL;
-
- // Create a Java String array of the size of the directory listing
- jclass stringClass = env->FindClass("java/lang/String");
- if (stringClass == NULL) {
- ldout(cct, 0) << "ERROR: java String class not found; dying a horrible, painful death" << dendl;
- assert(0);
- }
- jobjectArray dirListingStringArray = (jobjectArray) env->NewObjectArray(contents.size(), stringClass, NULL);
- if(dirListingStringArray == NULL) return NULL;
-
- // populate the array with the elements of the directory list
- int i = 0;
- for (list<string>::iterator it = contents.begin();
- it != contents.end();
- ++it) {
- env->SetObjectArrayElement(dirListingStringArray, i,
- env->NewStringUTF(it->c_str()));
- ++i;
- }
-
- return dirListingStringArray;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_mkdirs
- * Signature: (Ljava/lang/String;I)I
- * Create the specified directory and any required intermediate ones with the
- * given mode.
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs
-(JNIEnv *env, jobject obj, jstring j_path, jint mode)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In Hadoop mk_dirs" << dendl;
-
- //get c-style string and make the call, clean up the string...
- jint result;
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return -ENOMEM;
- result = ceph_mkdirs(cmount, c_path, mode);
- env->ReleaseStringUTFChars(j_path, c_path);
-
- //...and return
- return result;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_open_for_append
- * Signature: (Ljava/lang/String;)I
- * Open a file to append. If the file does not exist, it will be created.
- * Opening a dir is possible but may have bad results.
- * Inputs:
- * jstring j_path: The path to open.
- * Returns: a jint filehandle, or a number<0 if an error occurs.
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1append
-(JNIEnv *env, jobject obj, jstring j_path)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In hadoop open_for_append" << dendl;
-
- jint result;
-
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return -ENOMEM;
- result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_APPEND, 0);
- env->ReleaseStringUTFChars(j_path, c_path);
-
- return result;
-}
-
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_open_for_read
- * Signature: (Ljava/lang/String;)I
- * Open a file for reading.
- * Opening a dir is possible but may have bad results.
- * Inputs:
- * jstring j_path: The path to open.
- * Returns: a jint filehandle, or a number<0 if an error occurs.
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1read
- (JNIEnv *env, jobject obj, jstring j_path)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In open_for_read" << dendl;
-
- jint result;
-
- // open as read-only: flag = O_RDONLY
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return -ENOMEM;
- result = ceph_open(cmount, c_path, O_RDONLY, 0);
- env->ReleaseStringUTFChars(j_path, c_path);
-
- // returns file handle, or -1 on failure
- return result;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_open_for_overwrite
- * Signature: (Ljava/lang/String;)I
- * Opens a file for overwriting; creates it if necessary.
- * Opening a dir is possible but may have bad results.
- * Inputs:
- * jstring j_path: The path to open.
- * jint mode: The mode to open with.
- * Returns: a jint filehandle, or a number<0 if an error occurs.
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1overwrite
- (JNIEnv *env, jobject obj, jstring j_path, jint mode)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In open_for_overwrite" << dendl;
-
- jint result;
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return -ENOMEM;
- result = ceph_open(cmount, c_path, O_WRONLY|O_CREAT|O_TRUNC, mode);
- env->ReleaseStringUTFChars(j_path, c_path);
-
- // returns file handle, or -1 on failure
- return result;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_close
- * Signature: (I)I
- * Closes a given filehandle.
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close
-(JNIEnv *env, jobject obj, jint fh)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In CephTalker::ceph_close" << dendl;
- return ceph_close(cmount, fh);
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_setPermission
- * Signature: (Ljava/lang/String;I)Z
- * Change the mode on a path.
- * Inputs:
- * jstring j_path: The path to change mode on.
- * jint j_new_mode: The mode to apply.
- * Returns: true if the mode is properly applied, false if there
- * is any error.
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPermission
-(JNIEnv *env, jobject obj, jstring j_path, jint j_new_mode)
-{
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return false;
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- int result = ceph_chmod(cmount, c_path, j_new_mode);
- env->ReleaseStringUTFChars(j_path, c_path);
-
- return (result==0);
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_kill_client
- * Signature: (J)Z
- *
- * Closes the Ceph client. This should be called before shutting down
- * (multiple times is okay but redundant).
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client
- (JNIEnv *env, jobject obj)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- if (!cmount)
- return true;
- ceph_shutdown(cmount);
- set_ceph_mount_info(env, obj, NULL);
- return true;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_stat
- * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/Stat;)Z
- * Get the statistics on a path returned in a custom format defined
- * in CephTalker.
- * Inputs:
- * jstring j_path: The path to stat.
- * jobject j_stat: The stat object to fill.
- * Returns: true if the stat is successful, false otherwise.
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1stat
-(JNIEnv *env, jobject obj, jstring j_path, jobject j_stat)
-{
- //setup variables
- struct stat st;
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL) return false;
-
- jclass cls = env->GetObjectClass(j_stat);
- if (cls == NULL) return false;
- jfieldID c_size_id = env->GetFieldID(cls, "size", "J");
- if (c_size_id == NULL) return false;
- jfieldID c_dir_id = env->GetFieldID(cls, "is_dir", "Z");
- if (c_dir_id == NULL) return false;
- jfieldID c_block_id = env->GetFieldID(cls, "block_size", "J");
- if (c_block_id == NULL) return false;
- jfieldID c_mod_id = env->GetFieldID(cls, "mod_time", "J");
- if (c_mod_id == NULL) return false;
- jfieldID c_access_id = env->GetFieldID(cls, "access_time", "J");
- if (c_access_id == NULL) return false;
- jfieldID c_mode_id = env->GetFieldID(cls, "mode", "I");
- if (c_mode_id == NULL) return false;
- //do actual lstat
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- int r = ceph_lstat(cmount, c_path, &st);
- env->ReleaseStringUTFChars(j_path, c_path);
-
- if (r < 0) return false; //fail out; file DNE or Ceph broke
-
- //put variables from struct stat into Java
- env->SetLongField(j_stat, c_size_id, st.st_size);
- env->SetBooleanField(j_stat, c_dir_id, (0 != S_ISDIR(st.st_mode)));
- env->SetLongField(j_stat, c_block_id, st.st_blksize);
-
- long long java_mtime(st.st_mtim.tv_sec);
- java_mtime *= 1000;
- java_mtime += st.st_mtim.tv_nsec / 1000;
- env->SetLongField(j_stat, c_mod_id, java_mtime);
-
- long long java_atime(st.st_atim.tv_sec);
- java_atime *= 1000;
- java_atime += st.st_atim.tv_nsec / 1000;
- env->SetLongField(j_stat, c_access_id, java_atime);
-
- env->SetIntField(j_stat, c_mode_id, (int)st.st_mode);
-
- //return happy
- return true;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_statfs
- * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/CephStat;)I
- * Statfs a filesystem in a custom format defined in CephTalker.
- * Inputs:
- * jstring j_path: A path on the filesystem that you wish to stat.
- * jobject j_ceph_stat: The CephStat object to fill.
- * Returns: true if successful and the CephStat is filled; false otherwise.
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1statfs
-(JNIEnv *env, jobject obj, jstring j_path, jobject j_cephstat)
-{
- //setup variables
- struct statvfs stbuf;
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return -ENOMEM;
- jclass cls = env->GetObjectClass(j_cephstat);
- if (cls == NULL)
- return 1; //JVM error of some kind
- jfieldID c_capacity_id = env->GetFieldID(cls, "capacity", "J");
- jfieldID c_used_id = env->GetFieldID(cls, "used", "J");
- jfieldID c_remaining_id = env->GetFieldID(cls, "remaining", "J");
-
- //do the statfs
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- int r = ceph_statfs(cmount, c_path, &stbuf);
- env->ReleaseStringUTFChars(j_path, c_path);
- if (r != 0)
- return r; //something broke
-
- //place info into Java; convert from bytes to kilobytes
- env->SetLongField(j_cephstat, c_capacity_id,
- (long)stbuf.f_blocks*stbuf.f_bsize/1024);
- env->SetLongField(j_cephstat, c_used_id,
- (long)(stbuf.f_blocks-stbuf.f_bavail)*stbuf.f_bsize/1024);
- env->SetLongField(j_cephstat, c_remaining_id,
- (long)stbuf.f_bavail*stbuf.f_bsize/1024);
- return r;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_replication
- * Signature: (Ljava/lang/String;)I
- * Check how many times a path should be replicated (if it is
- * degraded it may not actually be replicated this often).
- * Inputs:
- * jstring j_path: The path to check.
- * Returns: an int containing the number of times replicated.
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1replication
-(JNIEnv *env, jobject obj, jstring j_path)
-{
- //get c-string of path, send off to libceph, release c-string, return
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if (c_path == NULL)
- return -ENOMEM;
- ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- int fh = 0;
- fh = ceph_open(cmount, c_path, O_RDONLY, 0);
- env->ReleaseStringUTFChars(j_path, c_path);
- if (fh < 0) {
- return fh;
- }
- int replication = ceph_get_file_replication(cmount, fh);
- ceph_close(cmount, fh);
- return replication;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_hosts
- * Signature: (IJ)[Ljava/lang/String;
- * Find the IP:port addresses of the primary OSD for a given file and offset.
- * Inputs:
- * jint j_fh: The filehandle for the file.
- * jlong j_offset: The offset to get the location of.
- * Returns: a jstring of the location as IP, or NULL if there is an error.
- */
-JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1hosts
-(JNIEnv *env, jobject obj, jint j_fh, jlong j_offset)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- struct sockaddr_storage *ss;
- char address[30];
- jobjectArray addr_array;
- jclass string_cls;
- jstring j_addr;
- int r, n = 3; /* initial guess at # of replicas */
-
- for (;;) {
- ss = new struct sockaddr_storage[n];
- r = ceph_get_file_stripe_address(cmount, j_fh, j_offset, ss, n);
- if (r < 0) {
- if (r == -ERANGE) {
- delete [] ss;
- n *= 2;
- continue;
- }
- return NULL;
- }
- n = r;
- break;
- }
-
- /* TODO: cache this */
- string_cls = env->FindClass("java/lang/String");
- if (!string_cls)
- goto out;
-
- addr_array = env->NewObjectArray(n, string_cls, NULL);
- if (!addr_array)
- goto out;
-
- for (r = 0; r < n; r++) {
- /* Hadoop only deals with IPv4 */
- if (ss[r].ss_family != AF_INET)
- goto out;
-
- memset(address, 0, sizeof(address));
-
- inet_ntop(ss[r].ss_family, &((struct sockaddr_in *)&ss[r])->sin_addr,
- address, sizeof(address));
-
- j_addr = env->NewStringUTF(address);
-
- env->SetObjectArrayElement(addr_array, r, j_addr);
- if (env->ExceptionOccurred())
- goto out;
-
- env->DeleteLocalRef(j_addr);
- }
-
- delete [] ss;
- return addr_array;
-
-out:
- delete [] ss;
- return NULL;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_setTimes
- * Signature: (Ljava/lang/String;JJ)I
- * Set the mtime and atime for a given path.
- * Inputs:
- * jstring j_path: The path to set the times for.
- * jlong mtime: The mtime to set, in millis since epoch (-1 to not set).
- * jlong atime: The atime to set, in millis since epoch (-1 to not set)
- * Returns: 0 if successful, an error code otherwise.
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes
-(JNIEnv *env, jobject obj, jstring j_path, jlong mtime, jlong atime)
-{
- const char *c_path = env->GetStringUTFChars(j_path, 0);
- if(c_path == NULL) return -ENOMEM;
-
- //build the mask for ceph_setattr
- int mask = 0;
- if (mtime!=-1) mask = CEPH_SETATTR_MTIME;
- if (atime!=-1) mask |= CEPH_SETATTR_ATIME;
- //build a struct stat and fill it in!
- //remember to convert from millis to seconds and microseconds
- struct stat attr;
- attr.st_mtim.tv_sec = mtime / 1000;
- attr.st_mtim.tv_nsec = (mtime % 1000) * 1000000;
- attr.st_atim.tv_sec = atime / 1000;
- attr.st_atim.tv_nsec = (atime % 1000) * 1000000;
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- return ceph_setattr(cmount, c_path, &attr, mask);
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_read
- * Signature: (JI[BII)I
- * Reads into the given byte array from the current position.
- * Inputs:
- * jint fh: the filehandle to read from
- * jbyteArray j_buffer: the byte array to read into
- * jint buffer_offset: where in the buffer to start writing
- * jint length: how much to read.
- * There'd better be enough space in the buffer to write all
- * the data from the given offset!
- * Returns: the number of bytes read on success (as jint),
- * or an error code otherwise.
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read
- (JNIEnv *env, jobject obj, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In read" << dendl;
-
-
- // Make sure to convert the Hadoop read arguments into a
- // more ceph-friendly form
- jint result;
-
- // Step 1: get a pointer to the buffer.
- jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL);
- if (j_buffer_ptr == NULL) return -ENOMEM;
- char *c_buffer = (char*) j_buffer_ptr;
-
- // Step 2: pointer arithmetic to start in the right buffer position
- c_buffer += (int)buffer_offset;
-
- // Step 3: do the read
- result = ceph_read(cmount, (int)fh, c_buffer, length, -1);
-
- // Step 4: release the pointer to the buffer
- env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0);
-
- return result;
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_seek_from_start
- * Signature: (JIJ)J
- * Seeks to the given position in the given file.
- * Inputs:
- * jint fh: The filehandle to seek in.
- * jlong pos: The position to seek to.
- * Returns: the new position (as a jlong) of the filehandle on success,
- * or a negative error code on failure.
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1from_1start
- (JNIEnv *env, jobject obj, jint fh, jlong pos)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In CephTalker::seek_from_start" << dendl;
- return ceph_lseek(cmount, fh, pos, SEEK_SET);
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_getpos
- * Signature: (I)J
- *
- * Get the current position in a file (as a jlong) of a given filehandle.
- * Returns: jlong current file position on success, or a
- * negative error code on failure.
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos
- (JNIEnv *env, jobject obj, jint fh)
-{
- // seek a distance of 0 to get current offset
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In CephTalker::ceph_getpos" << dendl;
- return ceph_lseek(cmount, fh, 0, SEEK_CUR);
-}
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_write
- * Signature: (I[BII)I
- * Write the given buffer contents to the given filehandle.
- * Inputs:
- * jint fh: The filehandle to write to.
- * jbyteArray j_buffer: The buffer to write from
- * jint buffer_offset: The position in the buffer to write from
- * jint length: The number of (sequential) bytes to write.
- * Returns: jint, on success the number of bytes written, on failure
- * a negative error code.
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write
- (JNIEnv *env, jobject obj, jint fh, jbyteArray j_buffer, jint buffer_offset, jint length)
-{
- struct ceph_mount_info *cmount = get_ceph_mount_t(env, obj);
- CephContext *cct = ceph_get_mount_context(cmount);
- ldout(cct, 10) << "In write" << dendl;
-
- // IMPORTANT NOTE: Hadoop write arguments are a bit different from POSIX so we
- // have to convert. The write is *always* from the current position in the file,
- // and buffer_offset is the location in the *buffer* where we start writing.
- jint result;
-
- // Step 1: get a pointer to the buffer.
- jbyte *j_buffer_ptr = env->GetByteArrayElements(j_buffer, NULL);
- if (j_buffer_ptr == NULL)
- return -ENOMEM;
- char *c_buffer = (char*) j_buffer_ptr;
-
- // Step 2: pointer arithmetic to start in the right buffer position
- c_buffer += (int)buffer_offset;
-
- // Step 3: do the write
- result = ceph_write(cmount, (int)fh, c_buffer, length, -1);
-
- // Step 4: release the pointer to the buffer
- env->ReleaseByteArrayElements(j_buffer, j_buffer_ptr, 0);
-
- return result;
-}
diff --git a/src/client/hadoop/CephFSInterface.h b/src/client/hadoop/CephFSInterface.h
deleted file mode 100644
index 6939b3a501d..00000000000
--- a/src/client/hadoop/CephFSInterface.h
+++ /dev/null
@@ -1,236 +0,0 @@
-// -*- mode:c++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-/* BE CAREFUL EDITING THIS FILE - it is a compilation of JNI
- machine-generated headers */
-
-#include <jni.h>
-
-#ifndef _Included_org_apache_hadoop_fs_ceph_CephTalker
-#define _Included_org_apache_hadoop_fs_ceph_CephTalker
-#ifdef __cplusplus
-extern "C" {
-#endif
- //these constants are machine-generated to match Java constants in the source
-#undef org_apache_hadoop_fs_ceph_CephFileSystem_DEFAULT_BLOCK_SIZE
-#define org_apache_hadoop_fs_ceph_CephFileSystem_DEFAULT_BLOCK_SIZE 8388608LL
-#undef org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE
-#define org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE 2048L
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_initializeClient
- * Signature: (Ljava/lang/String;I)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1initializeClient
- (JNIEnv *, jobject, jstring, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_getcwd
- * Signature: ()Ljava/lang/String;
- */
-JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getcwd
- (JNIEnv *, jobject);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_setcwd
- * Signature: (Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcwd
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_rmdir
- * Signature: (Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_unlink
- * Signature: (Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_rename
- * Signature: (Ljava/lang/String;Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename
- (JNIEnv *, jobject, jstring, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_exists
- * Signature: (Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_getblocksize
- * Signature: (Ljava/lang/String;)J
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getblocksize
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_isdirectory
- * Signature: (Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_isfile
- * Signature: (Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfile
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_getdir
- * Signature: (Ljava/lang/String;)[Ljava/lang/String;
- */
-JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_mkdirs
- * Signature: (Ljava/lang/String;I)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs
- (JNIEnv *, jobject, jstring, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_open_for_append
- * Signature: (Ljava/lang/String;)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1append
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_open_for_read
- * Signature: (Ljava/lang/String;)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1read
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_open_for_overwrite
- * Signature: (Ljava/lang/String;I)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1overwrite
- (JNIEnv *, jobject, jstring, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_close
- * Signature: (I)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close
- (JNIEnv *, jobject, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_setPermission
- * Signature: (Ljava/lang/String;I)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPermission
- (JNIEnv *, jobject, jstring, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_kill_client
- * Signature: ()Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client
- (JNIEnv *, jobject);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_stat
- * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/Stat;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1stat
- (JNIEnv *, jobject, jstring, jobject);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_statfs
- * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/CephStat;)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1statfs
-(JNIEnv * env, jobject obj, jstring j_path, jobject j_cephstat);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_replication
- * Signature: (Ljava/lang/String;)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1replication
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_hosts
- * Signature: (IJ)[Ljava/lang/String;
- */
-JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1hosts
- (JNIEnv *, jobject, jint, jlong);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_setTimes
- * Signature: (Ljava/lang/String;JJ)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes
- (JNIEnv *, jobject, jstring, jlong, jlong);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_read
- * Signature: (I[BII)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1read
- (JNIEnv *, jobject, jint, jbyteArray, jint, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_seek_from_start
- * Signature: (IJ)J
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1seek_1from_1start
- (JNIEnv *, jobject, jint, jlong);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_getpos
- * Signature: (I)J
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getpos
- (JNIEnv *, jobject, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_write
- * Signature: (I[BII)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1write
- (JNIEnv *, jobject, jint, jbyteArray, jint, jint);
-
-#ifdef __cplusplus
-}
-#endif
-#endif
diff --git a/src/client/hadoop/HADOOP-ceph.patch b/src/client/hadoop/HADOOP-ceph.patch
deleted file mode 100644
index 84cdb370f77..00000000000
--- a/src/client/hadoop/HADOOP-ceph.patch
+++ /dev/null
@@ -1,2234 +0,0 @@
-diff --git a/src/core/core-default.xml b/src/core/core-default.xml
-index 8bc3b99..26543bc 100644
---- a/src/core/core-default.xml
-+++ b/src/core/core-default.xml
-@@ -210,6 +210,12 @@
- </property>
-
- <property>
-+ <name>fs.ceph.impl</name>
-+ <value>org.apache.hadoop.fs.ceph.CephFileSystem</value>
-+ <description>The file system for ceph: uris.</description>
-+</property>
-+
-+<property>
- <name>fs.har.impl.disable.cache</name>
- <value>true</value>
- <description>Don't cache 'har' filesystem instances.</description>
-diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFS.java b/src/core/org/apache/hadoop/fs/ceph/CephFS.java
-new file mode 100644
-index 0000000..5d51eb2
---- /dev/null
-+++ b/src/core/org/apache/hadoop/fs/ceph/CephFS.java
-@@ -0,0 +1,250 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-+
-+/**
-+ *
-+ * Licensed under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-+ * implied. See the License for the specific language governing
-+ * permissions and limitations under the License.
-+ *
-+ *
-+ * Abstract base class for communicating with a Ceph filesystem and its
-+ * C++ codebase from Java, or pretending to do so (for unit testing purposes).
-+ * As only the Ceph package should be using this directly, all methods
-+ * are protected.
-+ */
-+package org.apache.hadoop.fs.ceph;
-+
-+import org.apache.hadoop.conf.Configuration;
-+
-+abstract class CephFS {
-+
-+ protected static final int ENOTDIR = 20;
-+ protected static final int EEXIST = 17;
-+ protected static final int ENOENT = 2;
-+
-+ /*
-+ * Performs any necessary setup to allow general use of the filesystem.
-+ * Inputs:
-+ * String argsuments -- a command-line style input of Ceph config params
-+ * int block_size -- the size in bytes to use for blocks
-+ * Returns: true on success, false otherwise
-+ */
-+ abstract protected boolean ceph_initializeClient(String arguments, int block_size);
-+
-+ /*
-+ * Returns the current working directory (absolute) as a String
-+ */
-+ abstract protected String ceph_getcwd();
-+
-+ /*
-+ * Changes the working directory.
-+ * Inputs:
-+ * String path: The path (relative or absolute) to switch to
-+ * Returns: true on success, false otherwise.
-+ */
-+ abstract protected boolean ceph_setcwd(String path);
-+
-+ /*
-+ * Given a path to a directory, removes the directory if empty.
-+ * Inputs:
-+ * jstring j_path: The path (relative or absolute) to the directory
-+ * Returns: true on successful delete; false otherwise
-+ */
-+ abstract protected boolean ceph_rmdir(String path);
-+
-+ /*
-+ * Given a path, unlinks it.
-+ * Inputs:
-+ * String path: The path (relative or absolute) to the file or empty dir
-+ * Returns: true if the unlink occurred, false otherwise.
-+ */
-+ abstract protected boolean ceph_unlink(String path);
-+
-+ /*
-+ * Changes a given path name to a new name, assuming new_path doesn't exist.
-+ * Inputs:
-+ * jstring j_from: The path whose name you want to change.
-+ * jstring j_to: The new name for the path.
-+ * Returns: true if the rename occurred, false otherwise
-+ */
-+ abstract protected boolean ceph_rename(String old_path, String new_path);
-+
-+ /*
-+ * Returns true if it the input path exists, false
-+ * if it does not or there is an unexpected failure.
-+ */
-+ abstract protected boolean ceph_exists(String path);
-+
-+ /*
-+ * Get the block size for a given path.
-+ * Input:
-+ * String path: The path (relative or absolute) you want
-+ * the block size for.
-+ * Returns: block size if the path exists, otherwise a negative number
-+ * corresponding to the standard C++ error codes (which are positive).
-+ */
-+ abstract protected long ceph_getblocksize(String path);
-+
-+ /*
-+ * Returns true if the given path is a directory, false otherwise.
-+ */
-+ abstract protected boolean ceph_isdirectory(String path);
-+
-+ /*
-+ * Returns true if the given path is a file; false otherwise.
-+ */
-+ abstract protected boolean ceph_isfile(String path);
-+
-+ /*
-+ * Get the contents of a given directory.
-+ * Inputs:
-+ * String path: The path (relative or absolute) to the directory.
-+ * Returns: A Java String[] of the contents of the directory, or
-+ * NULL if there is an error (ie, path is not a dir). This listing
-+ * will not contain . or .. entries.
-+ */
-+ abstract protected String[] ceph_getdir(String path);
-+
-+ /*
-+ * Create the specified directory and any required intermediate ones with the
-+ * given mode.
-+ */
-+ abstract protected int ceph_mkdirs(String path, int mode);
-+
-+ /*
-+ * Open a file to append. If the file does not exist, it will be created.
-+ * Opening a dir is possible but may have bad results.
-+ * Inputs:
-+ * String path: The path to open.
-+ * Returns: an int filehandle, or a number<0 if an error occurs.
-+ */
-+ abstract protected int ceph_open_for_append(String path);
-+
-+ /*
-+ * Open a file for reading.
-+ * Opening a dir is possible but may have bad results.
-+ * Inputs:
-+ * String path: The path to open.
-+ * Returns: an int filehandle, or a number<0 if an error occurs.
-+ */
-+ abstract protected int ceph_open_for_read(String path);
-+
-+ /*
-+ * Opens a file for overwriting; creates it if necessary.
-+ * Opening a dir is possible but may have bad results.
-+ * Inputs:
-+ * String path: The path to open.
-+ * int mode: The mode to open with.
-+ * Returns: an int filehandle, or a number<0 if an error occurs.
-+ */
-+ abstract protected int ceph_open_for_overwrite(String path, int mode);
-+
-+ /*
-+ * Closes the given file. Returns 0 on success, or a negative
-+ * error code otherwise.
-+ */
-+ abstract protected int ceph_close(int filehandle);
-+
-+ /*
-+ * Change the mode on a path.
-+ * Inputs:
-+ * String path: The path to change mode on.
-+ * int mode: The mode to apply.
-+ * Returns: true if the mode is properly applied, false if there
-+ * is any error.
-+ */
-+ abstract protected boolean ceph_setPermission(String path, int mode);
-+
-+ /*
-+ * Closes the Ceph client. This should be called before shutting down
-+ * (multiple times is okay but redundant).
-+ */
-+ abstract protected boolean ceph_kill_client();
-+
-+ /*
-+ * Get the statistics on a path returned in a custom format defined
-+ * in CephFileSystem.
-+ * Inputs:
-+ * String path: The path to stat.
-+ * Stat fill: The stat object to fill.
-+ * Returns: true if the stat is successful, false otherwise.
-+ */
-+ abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
-+
-+ /*
-+ * Check how many times a file should be replicated. If it is,
-+ * degraded it may not actually be replicated this often.
-+ * Inputs:
-+ * int fh: a file descriptor
-+ * Returns: an int containing the number of times replicated.
-+ */
-+ abstract protected int ceph_replication(String path);
-+
-+ /*
-+ * Find the IP address of the primary OSD for a given file and offset.
-+ * Inputs:
-+ * int fh: The filehandle for the file.
-+ * long offset: The offset to get the location of.
-+ * Returns: an array of String of the location as IP, or NULL if there is an error.
-+ */
-+ abstract protected String[] ceph_hosts(int fh, long offset);
-+
-+ /*
-+ * Set the mtime and atime for a given path.
-+ * Inputs:
-+ * String path: The path to set the times for.
-+ * long mtime: The mtime to set, in millis since epoch (-1 to not set).
-+ * long atime: The atime to set, in millis since epoch (-1 to not set)
-+ * Returns: 0 if successful, an error code otherwise.
-+ */
-+ abstract protected int ceph_setTimes(String path, long mtime, long atime);
-+
-+ /*
-+ * Get the current position in a file (as a long) of a given filehandle.
-+ * Returns: (long) current file position on success, or a
-+ * negative error code on failure.
-+ */
-+ abstract protected long ceph_getpos(int fh);
-+
-+ /*
-+ * Write the given buffer contents to the given filehandle.
-+ * Inputs:
-+ * int fh: The filehandle to write to.
-+ * byte[] buffer: The buffer to write from
-+ * int buffer_offset: The position in the buffer to write from
-+ * int length: The number of (sequential) bytes to write.
-+ * Returns: int, on success the number of bytes written, on failure
-+ * a negative error code.
-+ */
-+ abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
-+
-+ /*
-+ * Reads into the given byte array from the current position.
-+ * Inputs:
-+ * int fh: the filehandle to read from
-+ * byte[] buffer: the byte array to read into
-+ * int buffer_offset: where in the buffer to start writing
-+ * int length: how much to read.
-+ * There'd better be enough space in the buffer to write all
-+ * the data from the given offset!
-+ * Returns: the number of bytes read on success (as an int),
-+ * or an error code otherwise. */
-+ abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-+
-+ /*
-+ * Seeks to the given position in the given file.
-+ * Inputs:
-+ * int fh: The filehandle to seek in.
-+ * long pos: The position to seek to.
-+ * Returns: the new position (as a long) of the filehandle on success,
-+ * or a negative error code on failure. */
-+ abstract protected long ceph_seek_from_start(int fh, long pos);
-+}
-diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFaker.java b/src/core/org/apache/hadoop/fs/ceph/CephFaker.java
-new file mode 100644
-index 0000000..c598f53
---- /dev/null
-+++ b/src/core/org/apache/hadoop/fs/ceph/CephFaker.java
-@@ -0,0 +1,483 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-+
-+/**
-+ *
-+ * Licensed under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-+ * implied. See the License for the specific language governing
-+ * permissions and limitations under the License.
-+ *
-+ *
-+ * This uses the local Filesystem but pretends to be communicating
-+ * with a Ceph deployment, for unit testing the CephFileSystem.
-+ */
-+
-+package org.apache.hadoop.fs.ceph;
-+
-+
-+import java.net.URI;
-+import java.util.Hashtable;
-+import java.io.Closeable;
-+import java.io.FileNotFoundException;
-+import java.io.IOException;
-+
-+import org.apache.commons.logging.Log;
-+import org.apache.commons.logging.LogFactory;
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.BlockLocation;
-+import org.apache.hadoop.fs.FileStatus;
-+import org.apache.hadoop.fs.FileSystem;
-+import org.apache.hadoop.fs.FSDataInputStream;
-+import org.apache.hadoop.fs.FSDataOutputStream;
-+import org.apache.hadoop.fs.Path;
-+import org.apache.hadoop.fs.permission.FsPermission;
-+
-+
-+class CephFaker extends CephFS {
-+ private static final Log LOG = LogFactory.getLog(CephFaker.class);
-+ FileSystem localFS;
-+ String localPrefix;
-+ int blockSize;
-+ Configuration conf;
-+ Hashtable<Integer, Object> files;
-+ Hashtable<Integer, String> filenames;
-+ int fileCount = 0;
-+ boolean initialized = false;
-+
-+ public CephFaker(Configuration con, Log log) {
-+ conf = con;
-+ files = new Hashtable<Integer, Object>();
-+ filenames = new Hashtable<Integer, String>();
-+ }
-+
-+ protected boolean ceph_initializeClient(String args, int block_size) {
-+ if (!initialized) {
-+ // let's remember the default block_size
-+ blockSize = block_size;
-+
-+ /* for a real Ceph deployment, this starts up the client,
-+ * sets debugging levels, etc. We just need to get the
-+ * local FileSystem to use, and we'll ignore any
-+ * command-line arguments. */
-+ try {
-+ localFS = FileSystem.getLocal(conf);
-+ localFS.initialize(URI.create("file://localhost"), conf);
-+ localFS.setVerifyChecksum(false);
-+ String testDir = conf.get("hadoop.tmp.dir");
-+
-+ localPrefix = localFS.getWorkingDirectory().toString();
-+ int testDirLoc = localPrefix.indexOf(testDir) - 1;
-+
-+ if (-2 == testDirLoc) {
-+ testDirLoc = localPrefix.length();
-+ }
-+ localPrefix = localPrefix.substring(0, testDirLoc) + "/"
-+ + conf.get("hadoop.tmp.dir");
-+
-+ localFS.setWorkingDirectory(
-+ new Path(localPrefix + "/user/" + System.getProperty("user.name")));
-+ // I don't know why, but the unit tests expect the default
-+ // working dir to be /user/username, so satisfy them!
-+ // debug("localPrefix is " + localPrefix, INFO);
-+ } catch (IOException e) {
-+ return false;
-+ }
-+ initialized = true;
-+ }
-+ return true;
-+ }
-+
-+ protected String ceph_getcwd() {
-+ return sanitize_path(localFS.getWorkingDirectory().toString());
-+ }
-+
-+ protected boolean ceph_setcwd(String path) {
-+ localFS.setWorkingDirectory(new Path(prepare_path(path)));
-+ return true;
-+ }
-+
-+ // the caller is responsible for ensuring empty dirs
-+ protected boolean ceph_rmdir(String pth) {
-+ Path path = new Path(prepare_path(pth));
-+ boolean ret = false;
-+
-+ try {
-+ if (localFS.listStatus(path).length <= 1) {
-+ ret = localFS.delete(path, true);
-+ }
-+ } catch (IOException e) {}
-+ return ret;
-+ }
-+
-+ // this needs to work on (empty) directories too
-+ protected boolean ceph_unlink(String path) {
-+ path = prepare_path(path);
-+ boolean ret = false;
-+
-+ if (ceph_isdirectory(path)) {
-+ ret = ceph_rmdir(path);
-+ } else {
-+ try {
-+ ret = localFS.delete(new Path(path), false);
-+ } catch (IOException e) {}
-+ }
-+ return ret;
-+ }
-+
-+ protected boolean ceph_rename(String oldName, String newName) {
-+ oldName = prepare_path(oldName);
-+ newName = prepare_path(newName);
-+ try {
-+ Path parent = new Path(newName).getParent();
-+ Path newPath = new Path(newName);
-+
-+ if (localFS.exists(parent) && !localFS.exists(newPath)) {
-+ return localFS.rename(new Path(oldName), newPath);
-+ }
-+ return false;
-+ } catch (IOException e) {
-+ return false;
-+ }
-+ }
-+
-+ protected boolean ceph_exists(String path) {
-+ path = prepare_path(path);
-+ boolean ret = false;
-+
-+ try {
-+ ret = localFS.exists(new Path(path));
-+ } catch (IOException e) {}
-+ return ret;
-+ }
-+
-+ protected long ceph_getblocksize(String path) {
-+ path = prepare_path(path);
-+ try {
-+ FileStatus status = localFS.getFileStatus(new Path(path));
-+
-+ return status.getBlockSize();
-+ } catch (FileNotFoundException e) {
-+ return -CephFS.ENOENT;
-+ } catch (IOException e) {
-+ return -1; // just fail generically
-+ }
-+ }
-+
-+ protected boolean ceph_isdirectory(String path) {
-+ path = prepare_path(path);
-+ try {
-+ FileStatus status = localFS.getFileStatus(new Path(path));
-+
-+ return status.isDir();
-+ } catch (IOException e) {
-+ return false;
-+ }
-+ }
-+
-+ protected boolean ceph_isfile(String path) {
-+ path = prepare_path(path);
-+ boolean ret = false;
-+
-+ try {
-+ FileStatus status = localFS.getFileStatus(new Path(path));
-+
-+ ret = !status.isDir();
-+ } catch (Exception e) {}
-+ return ret;
-+ }
-+
-+ protected String[] ceph_getdir(String path) {
-+ path = prepare_path(path);
-+ if (!ceph_isdirectory(path)) {
-+ return null;
-+ }
-+ try {
-+ FileStatus[] stats = localFS.listStatus(new Path(path));
-+ String[] names = new String[stats.length];
-+ String name;
-+
-+ for (int i = 0; i < stats.length; ++i) {
-+ name = stats[i].getPath().toString();
-+ names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR) + 1);
-+ }
-+ return names;
-+ } catch (IOException e) {}
-+ return null;
-+ }
-+
-+ protected int ceph_mkdirs(String path, int mode) {
-+ path = prepare_path(path);
-+ // debug("ceph_mkdirs on " + path, INFO);
-+ try {
-+ if (localFS.mkdirs(new Path(path), new FsPermission((short) mode))) {
-+ return 0;
-+ }
-+ } catch (IOException e) {}
-+ if (ceph_isdirectory(path)) { // apparently it already existed
-+ return -EEXIST;
-+ } else if (ceph_isfile(path)) {
-+ return -ENOTDIR;
-+ }
-+ return -1;
-+ }
-+
-+ /*
-+ * Unlike a real Ceph deployment, you can't do opens on a directory.
-+ * Since that has unpredictable behavior and you shouldn't do it anyway,
-+ * it's okay.
-+ */
-+ protected int ceph_open_for_append(String path) {
-+ path = prepare_path(path);
-+ FSDataOutputStream stream;
-+
-+ try {
-+ stream = localFS.append(new Path(path));
-+ files.put(new Integer(fileCount), stream);
-+ filenames.put(new Integer(fileCount), path);
-+ return fileCount++;
-+ } catch (IOException e) {}
-+ return -1; // failure
-+ }
-+
-+ protected int ceph_open_for_read(String path) {
-+ path = prepare_path(path);
-+ FSDataInputStream stream;
-+
-+ try {
-+ stream = localFS.open(new Path(path));
-+ files.put(new Integer(fileCount), stream);
-+ filenames.put(new Integer(fileCount), path);
-+ LOG.info("ceph_open_for_read fh:" + fileCount + ", pathname:" + path);
-+ return fileCount++;
-+ } catch (IOException e) {}
-+ return -1; // failure
-+ }
-+
-+ protected int ceph_open_for_overwrite(String path, int mode) {
-+ path = prepare_path(path);
-+ FSDataOutputStream stream;
-+
-+ try {
-+ stream = localFS.create(new Path(path));
-+ files.put(new Integer(fileCount), stream);
-+ filenames.put(new Integer(fileCount), path);
-+ LOG.info("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path);
-+ return fileCount++;
-+ } catch (IOException e) {}
-+ return -1; // failure
-+ }
-+
-+ protected int ceph_close(int filehandle) {
-+ LOG.info("ceph_close(filehandle " + filehandle + ")");
-+ try {
-+ if (null == files.get(new Integer(filehandle))) {
-+ return -ENOENT; // this isn't quite the right error code,
-+ // but the important part is it's negative
-+ }
-+ ((Closeable) files.get(new Integer(filehandle))).close();
-+ return 0; // hurray, success
-+ } catch (NullPointerException ne) {
-+ LOG.warn("ceph_close caught NullPointerException!" + ne);
-+ } // err, how?
-+ catch (IOException ie) {
-+ LOG.warn("ceph_close caught IOException!" + ie);
-+ }
-+ return -1; // failure
-+ }
-+
-+ protected boolean ceph_setPermission(String pth, int mode) {
-+ pth = prepare_path(pth);
-+ Path path = new Path(pth);
-+ boolean ret = false;
-+
-+ try {
-+ localFS.setPermission(path, new FsPermission((short) mode));
-+ ret = true;
-+ } catch (IOException e) {}
-+ return ret;
-+ }
-+
-+ // rather than try and match a Ceph deployment's behavior exactly,
-+ // just make bad things happen if they try and call methods after this
-+ protected boolean ceph_kill_client() {
-+ // debug("ceph_kill_client", INFO);
-+ localFS.setWorkingDirectory(new Path(localPrefix));
-+ // debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
-+ try {
-+ localFS.close();
-+ } catch (Exception e) {}
-+ localFS = null;
-+ files = null;
-+ filenames = null;
-+ return true;
-+ }
-+
-+ protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
-+ pth = prepare_path(pth);
-+ Path path = new Path(pth);
-+ boolean ret = false;
-+
-+ try {
-+ FileStatus status = localFS.getFileStatus(path);
-+
-+ fill.size = status.getLen();
-+ fill.is_dir = status.isDir();
-+ fill.block_size = status.getBlockSize();
-+ fill.mod_time = status.getModificationTime();
-+ fill.access_time = status.getAccessTime();
-+ fill.mode = status.getPermission().toShort();
-+ ret = true;
-+ } catch (IOException e) {}
-+ return ret;
-+ }
-+
-+ protected int ceph_replication(String path) {
-+ path = prepare_path(path);
-+ int ret = -1; // -1 for failure
-+
-+ try {
-+ ret = localFS.getFileStatus(new Path(path)).getReplication();
-+ } catch (IOException e) {}
-+ return ret;
-+ }
-+
-+ protected String[] ceph_hosts(int fh, long offset) {
-+ String[] ret = null;
-+
-+ try {
-+ BlockLocation[] locs = localFS.getFileBlockLocations(
-+ localFS.getFileStatus(new Path(filenames.get(new Integer(fh)))),
-+ offset, 1);
-+
-+ ret = locs[0].getNames();
-+ } catch (IOException e) {} catch (NullPointerException f) {}
-+ return ret;
-+ }
-+
-+ protected int ceph_setTimes(String pth, long mtime, long atime) {
-+ pth = prepare_path(pth);
-+ Path path = new Path(pth);
-+ int ret = -1; // generic fail
-+
-+ try {
-+ localFS.setTimes(path, mtime, atime);
-+ ret = 0;
-+ } catch (IOException e) {}
-+ return ret;
-+ }
-+
-+ protected long ceph_getpos(int fh) {
-+ long ret = -1; // generic fail
-+
-+ try {
-+ Object stream = files.get(new Integer(fh));
-+
-+ if (stream instanceof FSDataInputStream) {
-+ ret = ((FSDataInputStream) stream).getPos();
-+ } else if (stream instanceof FSDataOutputStream) {
-+ ret = ((FSDataOutputStream) stream).getPos();
-+ }
-+ } catch (IOException e) {} catch (NullPointerException f) {}
-+ return ret;
-+ }
-+
-+ protected int ceph_write(int fh, byte[] buffer,
-+ int buffer_offset, int length) {
-+ LOG.info(
-+ "ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:"
-+ + length);
-+ long ret = -1; // generic fail
-+
-+ try {
-+ FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
-+
-+ LOG.info("ceph_write got outputstream");
-+ long startPos = os.getPos();
-+
-+ os.write(buffer, buffer_offset, length);
-+ ret = os.getPos() - startPos;
-+ } catch (IOException e) {
-+ LOG.warn("ceph_write caught IOException!");
-+ } catch (NullPointerException f) {
-+ LOG.warn("ceph_write caught NullPointerException!");
-+ }
-+ return (int) ret;
-+ }
-+
-+ protected int ceph_read(int fh, byte[] buffer,
-+ int buffer_offset, int length) {
-+ long ret = -1; // generic fail
-+
-+ try {
-+ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
-+ long startPos = is.getPos();
-+
-+ is.read(buffer, buffer_offset, length);
-+ ret = is.getPos() - startPos;
-+ } catch (IOException e) {} catch (NullPointerException f) {}
-+ return (int) ret;
-+ }
-+
-+ protected long ceph_seek_from_start(int fh, long pos) {
-+ LOG.info("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")");
-+ long ret = -1; // generic fail
-+
-+ try {
-+ LOG.info("ceph_seek_from_start filename is " + filenames.get(new Integer(fh)));
-+ if (null == files.get(new Integer(fh))) {
-+ LOG.warn("ceph_seek_from_start: is is null!");
-+ }
-+ FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
-+
-+ LOG.info("ceph_seek_from_start retrieved is!");
-+ is.seek(pos);
-+ ret = is.getPos();
-+ } catch (IOException e) {
-+ LOG.warn("ceph_seek_from_start caught IOException!");
-+ } catch (NullPointerException f) {
-+ LOG.warn("ceph_seek_from_start caught NullPointerException!");
-+ }
-+ return ret;
-+ }
-+
-+ /*
-+ * We need to remove the localFS file prefix before returning to Ceph
-+ */
-+ private String sanitize_path(String path) {
-+ // debug("sanitize_path(" + path + ")", INFO);
-+ /* if (path.startsWith("file:"))
-+ path = path.substring("file:".length()); */
-+ if (path.startsWith(localPrefix)) {
-+ path = path.substring(localPrefix.length());
-+ if (path.length() == 0) { // it was a root path
-+ path = "/";
-+ }
-+ }
-+ // debug("sanitize_path returning " + path, INFO);
-+ return path;
-+ }
-+
-+ /*
-+ * If it's an absolute path we need to shove the
-+ * test dir onto the front as a prefix.
-+ */
-+ private String prepare_path(String path) {
-+ // debug("prepare_path(" + path + ")", INFO);
-+ if (path.startsWith("/")) {
-+ path = localPrefix + path;
-+ } else if (path.equals("..")) {
-+ if (ceph_getcwd().equals("/")) {
-+ path = ".";
-+ } // you can't go up past root!
-+ }
-+ // debug("prepare_path returning" + path, INFO);
-+ return path;
-+ }
-+}
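
The prepare_path/sanitize_path pair above is the whole trick behind the faker: it maps Ceph-style absolute paths into the local test directory and back. As a concrete example, with a hypothetical localPrefix of /tmp/hadoop-test, prepare_path("/user/foo") yields /tmp/hadoop-test/user/foo, and sanitize_path applied to that result strips the prefix again to give /user/foo (the bare prefix collapses to "/"). Relative paths pass through unchanged, except that ".." at the faked root is rewritten to "." so a test cannot climb above it.
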
-diff --git a/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java b/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java
-new file mode 100644
-index 0000000..95f2223
---- /dev/null
-+++ b/src/core/org/apache/hadoop/fs/ceph/CephFileSystem.java
-@@ -0,0 +1,804 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-+
-+/**
-+ *
-+ * Licensed under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-+ * implied. See the License for the specific language governing
-+ * permissions and limitations under the License.
-+ *
-+ *
-+ * Implements the Hadoop FS interfaces to allow applications to store
-+ * files in Ceph.
-+ */
-+package org.apache.hadoop.fs.ceph;
-+
-+
-+import java.io.IOException;
-+import java.io.FileNotFoundException;
-+import java.io.OutputStream;
-+import java.net.URI;
-+import java.net.InetAddress;
-+import java.util.EnumSet;
-+import java.lang.Math;
-+import java.util.ArrayList;
-+
-+import org.apache.commons.logging.Log;
-+import org.apache.commons.logging.LogFactory;
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.BlockLocation;
-+import org.apache.hadoop.fs.FSDataInputStream;
-+import org.apache.hadoop.fs.FSInputStream;
-+import org.apache.hadoop.fs.FSDataOutputStream;
-+import org.apache.hadoop.fs.FileSystem;
-+import org.apache.hadoop.fs.FileUtil;
-+import org.apache.hadoop.fs.Path;
-+import org.apache.hadoop.fs.permission.FsPermission;
-+import org.apache.hadoop.util.Progressable;
-+import org.apache.hadoop.fs.FileStatus;
-+import org.apache.hadoop.net.DNS;
-+
-+
-+/**
-+ * <p>
-+ * A {@link FileSystem} backed by <a href="http://ceph.newdream.net">Ceph</a>.
-+ * This will not start a Ceph instance; one must already be running.
-+ * </p>
-+ * Configuration of the CephFileSystem is handled via a few Hadoop
-+ * Configuration properties: <br>
-+ * fs.ceph.monAddr -- the ip address/port of the monitor to connect to. <br>
-+ * fs.ceph.libDir -- the directory that libcephfs and libhadoopceph are
-+ * located in. This assumes Hadoop is being run on a linux-style machine
-+ * with names like libcephfs.so.
-+ * fs.ceph.commandLine -- if you prefer you can fill in this property
-+ * just as you would when starting Ceph up from the command line. Specific
-+ * properties override any configuration specified here.
-+ * <p>
-+ * You can also enable debugging of the CephFileSystem and Ceph itself: <br>
-+ * fs.ceph.debug -- if 'true' will print out method enter/exit messages,
-+ * plus a little more.
-+ * fs.ceph.clientDebug/fs.ceph.messengerDebug -- will print out debugging
-+ * from the respective Ceph system of at least that importance.
-+ */
-+public class CephFileSystem extends FileSystem {
-+ private static final Log LOG = LogFactory.getLog(CephFileSystem.class);
-+ private URI uri;
-+
-+ private Path workingDir;
-+ private final Path root;
-+ private CephFS ceph = null;
-+
-+ private static String CEPH_NAMESERVER;
-+ private static final String CEPH_NAMESERVER_KEY = "fs.ceph.nameserver";
-+ private static final String CEPH_NAMESERVER_DEFAULT = "localhost";
-+
-+ /**
-+ * Create a new CephFileSystem.
-+ */
-+ public CephFileSystem() {
-+ root = new Path("/");
-+ }
-+
-+ /**
-+ * Used for testing purposes, this constructor
-+ * sets the given CephFS instead of defaulting to a
-+ * CephTalker (with its assumed real Ceph instance to talk to).
-+ */
-+ public CephFileSystem(CephFS ceph_fs) {
-+ super();
-+ root = new Path("/");
-+ ceph = ceph_fs;
-+ }
-+
-+ /**
-+ * Lets you get the URI of this CephFileSystem.
-+ * @return the URI.
-+ */
-+ public URI getUri() {
-+ LOG.debug("getUri:exit with return " + uri);
-+ return uri;
-+ }
-+
-+ /**
-+ * Should be called after constructing a CephFileSystem but before calling
-+ * any other methods.
-+ * Starts up the connection to Ceph, reads in configuration options, etc.
-+ * @param uri The URI for this filesystem.
-+ * @param conf The Hadoop Configuration to retrieve properties from.
-+ * @throws IOException if necessary properties are unset.
-+ */
-+ @Override
-+ public void initialize(URI uri, Configuration conf) throws IOException {
-+ super.initialize(uri, conf);
-+ setConf(conf);
-+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-+ if (ceph == null) {
-+ ceph = new CephTalker(conf, LOG);
-+ }
-+
-+ CEPH_NAMESERVER = conf.get(CEPH_NAMESERVER_KEY, CEPH_NAMESERVER_DEFAULT);
-+
-+ // build up the arguments for Ceph
-+ String arguments = "CephFSInterface";
-+
-+ arguments += conf.get("fs.ceph.commandLine", "");
-+ if (conf.get("fs.ceph.clientDebug") != null) {
-+ arguments += " --debug_client ";
-+ arguments += conf.get("fs.ceph.clientDebug");
-+ }
-+ if (conf.get("fs.ceph.messengerDebug") != null) {
-+ arguments += " --debug_ms ";
-+ arguments += conf.get("fs.ceph.messengerDebug");
-+ }
-+ if (conf.get("fs.ceph.monAddr") != null) {
-+ arguments += " -m ";
-+ arguments += conf.get("fs.ceph.monAddr");
-+ }
-+ arguments += " --client-readahead-max-periods="
-+ + conf.get("fs.ceph.readahead", "1");
-+ // make sure they gave us a ceph monitor address or conf file
-+ LOG.info("initialize:Ceph initialization arguments: " + arguments);
-+ if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1)
-+ && (arguments.indexOf("-c") == -1)) {
-+ LOG.fatal("initialize:You need to specify a Ceph monitor address.");
-+ throw new IOException(
-+ "You must specify a Ceph monitor address or config file!");
-+ }
-+ // Initialize the client
-+ if (!ceph.ceph_initializeClient(arguments,
-+ conf.getInt("fs.ceph.blockSize", 1 << 26))) {
-+ LOG.fatal("initialize:Ceph initialization failed!");
-+ throw new IOException("Ceph initialization failed!");
-+ }
-+ LOG.info("initialize:Ceph initialized client. Setting cwd to /");
-+ ceph.ceph_setcwd("/");
-+ LOG.debug("initialize:exit");
-+
-+ this.workingDir = getHomeDirectory();
-+ }
-+
-+ /**
-+ * Close down the CephFileSystem. Runs the base-class close method
-+ * and then kills the Ceph client itself.
-+ */
-+ @Override
-+ public void close() throws IOException {
-+ LOG.debug("close:enter");
-+ super.close(); // this method does stuff, make sure it's run!
-+ LOG.trace("close: Calling ceph_kill_client from Java");
-+ ceph.ceph_kill_client();
-+ LOG.debug("close:exit");
-+ }
-+
-+ /**
-+ * Get an FSDataOutputStream to append onto a file.
-+ * @param file The File you want to append onto
-+ * @param bufferSize Ceph does internal buffering but you can buffer in the Java code as well if you like.
-+ * @param progress The Progressable to report progress to.
-+ * Reporting is limited but exists.
-+ * @return An FSDataOutputStream that connects to the file on Ceph.
-+ * @throws IOException If the file cannot be found or appended to.
-+ */
-+ public FSDataOutputStream append(Path file, int bufferSize,
-+ Progressable progress) throws IOException {
-+ LOG.debug("append:enter with path " + file + " bufferSize " + bufferSize);
-+ Path abs_path = makeAbsolute(file);
-+
-+ if (progress != null) {
-+ progress.progress();
-+ }
-+ LOG.trace("append: Entering ceph_open_for_append from Java");
-+ int fd = ceph.ceph_open_for_append(getCephPath(abs_path));
-+
-+ LOG.trace("append: Returned to Java");
-+ if (progress != null) {
-+ progress.progress();
-+ }
-+ if (fd < 0) { // error in open
-+ throw new IOException(
-+ "append: Open for append failed on path \"" + abs_path.toString()
-+ + "\"");
-+ }
-+ CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd,
-+ bufferSize);
-+
-+ LOG.debug("append:exit");
-+ return new FSDataOutputStream(cephOStream, statistics);
-+ }
-+
-+ /**
-+ * Get the current working directory for the given file system
-+ * @return the directory Path
-+ */
-+ public Path getWorkingDirectory() {
-+ return workingDir;
-+ }
-+
-+ /**
-+ * Set the current working directory for the given file system. All relative
-+ * paths will be resolved relative to it.
-+ *
-+ * @param dir The directory to change to.
-+ */
-+ @Override
-+ public void setWorkingDirectory(Path dir) {
-+ workingDir = makeAbsolute(dir);
-+ }
-+
-+ /**
-+ * Return only the path component from a potentially fully qualified path.
-+ */
-+ private String getCephPath(Path path) {
-+ if (!path.isAbsolute()) {
-+ throw new IllegalArgumentException("Path must be absolute: " + path);
-+ }
-+ return path.toUri().getPath();
-+ }
-+
-+ /**
-+ * Check if a path exists.
-+ * Overridden because it's moderately faster than the generic implementation.
-+ * @param path The file to check existence on.
-+ * @return true if the file exists, false otherwise.
-+ */
-+ @Override
-+ public boolean exists(Path path) throws IOException {
-+ LOG.debug("exists:enter with path " + path);
-+ boolean result;
-+ Path abs_path = makeAbsolute(path);
-+
-+ if (abs_path.equals(root)) {
-+ result = true;
-+ } else {
-+ LOG.trace(
-+ "exists:Calling ceph_exists from Java on path " + abs_path.toString());
-+ result = ceph.ceph_exists(getCephPath(abs_path));
-+ LOG.trace("exists:Returned from ceph_exists to Java");
-+ }
-+ LOG.debug("exists:exit with value " + result);
-+ return result;
-+ }
-+
-+ /**
-+ * Create a directory and any nonexistent parents. Any portion
-+ * of the directory tree can exist without error.
-+ * @param path The directory path to create
-+ * @param perms The permissions to apply to the created directories.
-+ * @return true if successful, false otherwise
-+ * @throws IOException if the path is a child of a file.
-+ */
-+ @Override
-+ public boolean mkdirs(Path path, FsPermission perms) throws IOException {
-+ LOG.debug("mkdirs:enter with path " + path);
-+ Path abs_path = makeAbsolute(path);
-+
-+ LOG.trace("mkdirs:calling ceph_mkdirs from Java");
-+ int result = ceph.ceph_mkdirs(getCephPath(abs_path), (int) perms.toShort());
-+
-+ if (result != 0) {
-+ LOG.warn(
-+ "mkdirs: make directory " + abs_path + "Failing with result " + result);
-+ if (-ceph.ENOTDIR == result) {
-+ throw new IOException("Parent path is not a directory");
-+ }
-+ return false;
-+ } else {
-+ LOG.debug("mkdirs:exiting succesfully");
-+ return true;
-+ }
-+ }
-+
-+ /**
-+ * Check if a path is a file. This is moderately faster than the
-+ * generic implementation.
-+ * @param path The path to check.
-+ * @return true if the path is definitely a file, false otherwise.
-+ */
-+ @Override
-+ public boolean isFile(Path path) throws IOException {
-+ LOG.debug("isFile:enter with path " + path);
-+ Path abs_path = makeAbsolute(path);
-+ boolean result;
-+
-+ if (abs_path.equals(root)) {
-+ result = false;
-+ } else {
-+ LOG.trace("isFile:entering ceph_isfile from Java");
-+ result = ceph.ceph_isfile(getCephPath(abs_path));
-+ }
-+ LOG.debug("isFile:exit with result " + result);
-+ return result;
-+ }
-+
-+ /**
-+ * Get stat information on a file. This does not fill owner or group, as
-+ * Ceph's support for these is a bit different than HDFS'.
-+ * @param path The path to stat.
-+ * @return FileStatus object containing the stat information.
-+ * @throws FileNotFoundException if the path could not be resolved.
-+ */
-+ public FileStatus getFileStatus(Path path) throws IOException {
-+ LOG.debug("getFileStatus:enter with path " + path);
-+ Path abs_path = makeAbsolute(path);
-+ // sadly, Ceph doesn't really do uids/gids just yet, but
-+ // everything else is filled
-+ FileStatus status;
-+ Stat lstat = new Stat();
-+
-+ LOG.trace("getFileStatus: calling ceph_stat from Java");
-+ if (ceph.ceph_stat(getCephPath(abs_path), lstat)) {
-+ status = new FileStatus(lstat.size, lstat.is_dir,
-+ ceph.ceph_replication(getCephPath(abs_path)), lstat.block_size,
-+ lstat.mod_time, lstat.access_time,
-+ new FsPermission((short) lstat.mode), System.getProperty("user.name"), null,
-+ path.makeQualified(this));
-+ } else { // fail out
-+ throw new FileNotFoundException(
-+ "org.apache.hadoop.fs.ceph.CephFileSystem: File " + path
-+ + " does not exist or could not be accessed");
-+ }
-+
-+ LOG.debug("getFileStatus:exit");
-+ return status;
-+ }
-+
-+ /**
-+ * Get the FileStatus for each listing in a directory.
-+ * @param path The directory to get listings from.
-+ * @return FileStatus[] containing one FileStatus for each directory listing;
-+ * null if path does not exist.
-+ */
-+ public FileStatus[] listStatus(Path path) throws IOException {
-+ LOG.debug("listStatus:enter with path " + path);
-+ Path abs_path = makeAbsolute(path);
-+ Path[] paths = listPaths(abs_path);
-+
-+ if (paths != null) {
-+ FileStatus[] statuses = new FileStatus[paths.length];
-+
-+ for (int i = 0; i < paths.length; ++i) {
-+ statuses[i] = getFileStatus(paths[i]);
-+ }
-+ LOG.debug("listStatus:exit");
-+ return statuses;
-+ }
-+
-+ if (isFile(path)) {
-+ return new FileStatus[] { getFileStatus(path) };
-+ }
-+
-+ return null;
-+ }
-+
-+ @Override
-+ public void setPermission(Path p, FsPermission permission) throws IOException {
-+ LOG.debug(
-+ "setPermission:enter with path " + p + " and permissions " + permission);
-+ Path abs_path = makeAbsolute(p);
-+
-+ LOG.trace("setPermission:calling ceph_setpermission from Java");
-+ ceph.ceph_setPermission(getCephPath(abs_path), permission.toShort());
-+ LOG.debug("setPermission:exit");
-+ }
-+
-+ /**
-+ * Set access/modification times of a file.
-+ * @param p The path
-+ * @param mtime Set modification time in number of millis since Jan 1, 1970.
-+ * @param atime Set access time in number of millis since Jan 1, 1970.
-+ */
-+ @Override
-+ public void setTimes(Path p, long mtime, long atime) throws IOException {
-+ LOG.debug(
-+ "setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime);
-+ Path abs_path = makeAbsolute(p);
-+
-+ LOG.trace("setTimes:calling ceph_setTimes from Java");
-+ int r = ceph.ceph_setTimes(getCephPath(abs_path), mtime, atime);
-+
-+ if (r < 0) {
-+ throw new IOException(
-+ "Failed to set times on path " + abs_path.toString() + " Error code: "
-+ + r);
-+ }
-+ LOG.debug("setTimes:exit");
-+ }
-+
-+ /**
-+ * Create a new file and open an FSDataOutputStream that's connected to it.
-+ * @param path The file to create.
-+ * @param permission The permissions to apply to the file.
-+ * @param overwrite If true, overwrite any existing file with
-+ * this name; otherwise don't.
-+ * @param bufferSize Ceph does internal buffering, but you can buffer
-+ * in the Java code too if you like.
-+ * @param replication Ignored by Ceph. This can be
-+ * configured via Ceph configuration.
-+ * @param blockSize Ignored by Ceph. You can set client-wide block sizes
-+ * via the fs.ceph.blockSize param if you like.
-+ * @param progress A Progressable to report back to.
-+ * Reporting is limited but exists.
-+ * @return An FSDataOutputStream pointing to the created file.
-+ * @throws IOException if the path is an
-+ * existing directory, or the path exists but overwrite is false, or there is a
-+ * failure in attempting to open the file for overwrite with Ceph.
-+ */
-+ public FSDataOutputStream create(Path path,
-+ FsPermission permission,
-+ boolean overwrite,
-+ int bufferSize,
-+ short replication,
-+ long blockSize,
-+ Progressable progress) throws IOException {
-+ LOG.debug("create:enter with path " + path);
-+ Path abs_path = makeAbsolute(path);
-+
-+ if (progress != null) {
-+ progress.progress();
-+ }
-+ // We ignore replication since that's not configurable here, and
-+ // progress reporting is quite limited.
-+ // Required semantics: if the file exists, overwrite if 'overwrite' is set;
-+ // otherwise, throw an exception
-+
-+ // Step 1: existence test
-+ boolean exists = exists(abs_path);
-+
-+ if (exists) {
-+ if (getFileStatus(abs_path).isDir()) {
-+ throw new IOException(
-+ "create: Cannot overwrite existing directory \"" + path.toString()
-+ + "\" with a file");
-+ }
-+ if (!overwrite) {
-+ throw new IOException(
-+ "createRaw: Cannot open existing file \"" + abs_path.toString()
-+ + "\" for writing without overwrite flag");
-+ }
-+ }
-+
-+ if (progress != null) {
-+ progress.progress();
-+ }
-+
-+ // Step 2: create any nonexistent directories in the path
-+ if (!exists) {
-+ Path parent = abs_path.getParent();
-+
-+ if (parent != null) { // if parent is root, we're done
-+ int r = ceph.ceph_mkdirs(getCephPath(parent), permission.toShort());
-+
-+ if (!(r == 0 || r == -ceph.EEXIST)) {
-+ throw new IOException("Error creating parent directory; code: " + r);
-+ }
-+ }
-+ if (progress != null) {
-+ progress.progress();
-+ }
-+ }
-+ // Step 3: open the file
-+ LOG.trace("calling ceph_open_for_overwrite from Java");
-+ int fh = ceph.ceph_open_for_overwrite(getCephPath(abs_path),
-+ (int) permission.toShort());
-+
-+ if (progress != null) {
-+ progress.progress();
-+ }
-+ LOG.trace("Returned from ceph_open_for_overwrite to Java with fh " + fh);
-+ if (fh < 0) {
-+ throw new IOException(
-+ "create: Open for overwrite failed on path \"" + path.toString()
-+ + "\"");
-+ }
-+
-+ // Step 4: create the stream
-+ OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh,
-+ bufferSize);
-+
-+ LOG.debug("create:exit");
-+ return new FSDataOutputStream(cephOStream, statistics);
-+ }
-+
-+ /**
-+ * Open a Ceph file and attach the file handle to an FSDataInputStream.
-+ * @param path The file to open
-+ * @param bufferSize Ceph does internal buffering; but you can buffer in
-+ * the Java code too if you like.
-+ * @return FSDataInputStream reading from the given path.
-+ * @throws IOException if the path does not exist or is a
-+ * directory, or there is an error getting data to set up the FSDataInputStream.
-+ */
-+ public FSDataInputStream open(Path path, int bufferSize) throws IOException {
-+ LOG.debug("open:enter with path " + path);
-+ Path abs_path = makeAbsolute(path);
-+
-+ int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
-+
-+ if (fh < 0) { // uh-oh, something's bad!
-+ if (fh == -ceph.ENOENT) { // well that was a stupid open
-+ throw new IOException(
-+ "open: absolute path \"" + abs_path.toString()
-+ + "\" does not exist");
-+ } else { // hrm...the file exists but we can't open it :(
-+ throw new IOException("open: Failed to open file " + abs_path.toString());
-+ }
-+ }
-+
-+ if (getFileStatus(abs_path).isDir()) { // yes, it is possible to open Ceph directories
-+ // but that doesn't mean you should in Hadoop!
-+ ceph.ceph_close(fh);
-+ throw new IOException(
-+ "open: absolute path \"" + abs_path.toString() + "\" is a directory!");
-+ }
-+ Stat lstat = new Stat();
-+
-+ LOG.trace("open:calling ceph_stat from Java");
-+ ceph.ceph_stat(getCephPath(abs_path), lstat);
-+ LOG.trace("open:returned to Java");
-+ long size = lstat.size;
-+
-+ if (size < 0) {
-+ throw new IOException(
-+ "Failed to get file size for file " + abs_path.toString()
-+ + " but succeeded in opening file. Something bizarre is going on.");
-+ }
-+ FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size,
-+ bufferSize);
-+
-+ LOG.debug("open:exit");
-+ return new FSDataInputStream(cephIStream);
-+ }
-+
-+ /**
-+ * Rename a file or directory.
-+ * @param src The current path of the file/directory
-+ * @param dst The new name for the path.
-+ * @return true if the rename succeeded, false otherwise.
-+ */
-+ @Override
-+ public boolean rename(Path src, Path dst) throws IOException {
-+ LOG.debug("rename:enter with src:" + src + " and dest:" + dst);
-+ Path abs_src = makeAbsolute(src);
-+ Path abs_dst = makeAbsolute(dst);
-+
-+ LOG.trace("calling ceph_rename from Java");
-+ boolean result = ceph.ceph_rename(getCephPath(abs_src), getCephPath(abs_dst));
-+
-+ if (!result) {
-+ boolean isDir = false;
-+ try {
-+ isDir = getFileStatus(abs_dst).isDir();
-+ } catch (FileNotFoundException e) {}
-+ if (isDir) { // move the srcdir into destdir
-+ LOG.debug("ceph_rename failed but dst is a directory!");
-+ Path new_dst = new Path(abs_dst, abs_src.getName());
-+
-+ result = rename(abs_src, new_dst);
-+ LOG.debug(
-+ "attempt to move " + abs_src.toString() + " to "
-+ + new_dst.toString() + " has result:" + result);
-+ }
-+ }
-+ LOG.debug("rename:exit with result: " + result);
-+ return result;
-+ }
-+
-+ /*
-+ * Attempt to convert an IP into its hostname
-+ */
-+ private String[] ips2Hosts(String[] ips) {
-+ ArrayList<String> hosts = new ArrayList<String>();
-+ for (String ip : ips) {
-+ try {
-+ String host = DNS.reverseDns(InetAddress.getByName(ip), CEPH_NAMESERVER);
-+ if (host.charAt(host.length()-1) == '.') {
-+ host = host.substring(0, host.length()-1);
-+ }
-+ hosts.add(host); /* append */
-+ } catch (Exception e) {
-+ LOG.error("reverseDns ["+ip+"] failed: "+ e);
-+ }
-+ }
-+ return hosts.toArray(new String[hosts.size()]);
-+ }
-+
-+ /**
-+ * Get a BlockLocation object for each block in a file.
-+ *
-+ * Note that this doesn't include port numbers in the name field as
-+ * Ceph handles slow/down servers internally. This data should be used
-+ * only for selecting which servers to run which jobs on.
-+ *
-+ * @param file A FileStatus object corresponding to the file you want locations for.
-+ * @param start The offset of the first part of the file you are interested in.
-+ * @param len The amount of the file past the offset you are interested in.
-+ * @return A BlockLocation[] where each object corresponds to a block within
-+ * the given range.
-+ */
-+ @Override
-+ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
-+ Path abs_path = makeAbsolute(file.getPath());
-+
-+ int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
-+ if (fh < 0) {
-+ LOG.error("getFileBlockLocations:got error " + fh + ", exiting and returning null!");
-+ return null;
-+ }
-+
-+ long blockSize = ceph.ceph_getblocksize(getCephPath(abs_path));
-+ BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)];
-+
-+ for (int i = 0; i < locations.length; ++i) {
-+ long offset = start + i * blockSize;
-+ long blockStart = start + i * blockSize - (start % blockSize);
-+ String ips[] = ceph.ceph_hosts(fh, offset);
-+ String hosts[] = ips2Hosts(ips);
-+ locations[i] = new BlockLocation(null, hosts, blockStart, blockSize);
-+ LOG.debug("getFileBlockLocations: location[" + i + "]: " + locations[i]);
-+ }
-+
-+ ceph.ceph_close(fh);
-+ return locations;
-+ }
-+
-+ @Deprecated
-+ public boolean delete(Path path) throws IOException {
-+ return delete(path, false);
-+ }
-+
-+ /**
-+ * Delete the given path, and optionally its children.
-+ * @param path the path to delete.
-+ * @param recursive If the path is a non-empty directory and this is false,
-+ * delete will throw an IOException. If path is a file this is ignored.
-+ * @return true if the delete succeeded, false otherwise (including if
-+ * path doesn't exist).
-+ * @throws IOException if you attempt to non-recursively delete a directory,
-+ * or you attempt to delete the root directory.
-+ */
-+ public boolean delete(Path path, boolean recursive) throws IOException {
-+ LOG.debug("delete:enter with path " + path + " and recursive=" + recursive);
-+ Path abs_path = makeAbsolute(path);
-+
-+ // sanity check
-+ if (abs_path.equals(root)) {
-+ throw new IOException("Error: deleting the root directory is a Bad Idea.");
-+ }
-+ if (!exists(abs_path)) {
-+ return false;
-+ }
-+
-+ // if the path is a file, try to delete it.
-+ if (isFile(abs_path)) {
-+ LOG.trace("delete:calling ceph_unlink from Java with path " + abs_path);
-+ boolean result = ceph.ceph_unlink(getCephPath(abs_path));
-+
-+ if (!result) {
-+ LOG.error(
-+ "delete: failed to delete file \"" + abs_path.toString() + "\".");
-+ }
-+ LOG.debug("delete:exit with success=" + result);
-+ return result;
-+ }
-+
-+ /* The path is a directory, so recursively try to delete its contents,
-+ and then delete the directory. */
-+ // get the entries; listPaths will remove . and .. for us
-+ Path[] contents = listPaths(abs_path);
-+
-+ if (contents == null) {
-+ LOG.error(
-+ "delete: Failed to read contents of directory \""
-+ + abs_path.toString() + "\" while trying to delete it, BAILING");
-+ return false;
-+ }
-+ if (!recursive && contents.length > 0) {
-+ throw new IOException("Directories must be deleted recursively!");
-+ }
-+ // delete the entries
-+ LOG.debug("delete: recursively calling delete on contents of " + abs_path);
-+ for (Path p : contents) {
-+ if (!delete(p, true)) {
-+ LOG.error(
-+ "delete: Failed to delete file \"" + p.toString()
-+ + "\" while recursively deleting \"" + abs_path.toString()
-+ + "\", BAILING");
-+ return false;
-+ }
-+ }
-+ // if we've come this far it's a now-empty directory, so delete it!
-+ boolean result = ceph.ceph_rmdir(getCephPath(abs_path));
-+
-+ if (!result) {
-+ LOG.error(
-+ "delete: failed to delete \"" + abs_path.toString() + "\", BAILING");
-+ }
-+ LOG.debug("delete:exit");
-+ return result;
-+ }
-+
-+ /**
-+ * Returns the default replication value of 1. This may
-+ * NOT be the actual value, as replication is controlled
-+ * by a separate Ceph configuration.
-+ */
-+ @Override
-+ public short getDefaultReplication() {
-+ return 1;
-+ }
-+
-+ /**
-+ * Get the default block size.
-+ * @return the default block size, in bytes, as a long.
-+ */
-+ @Override
-+ public long getDefaultBlockSize() {
-+ return getConf().getInt("fs.ceph.blockSize", 1 << 26);
-+ }
-+
-+ /**
-+ * Adds the working directory to path if path is not already
-+ * an absolute path. The URI scheme is not removed here. It
-+ * is removed only when users (e.g. ceph native calls) need
-+ * the path-only portion.
-+ */
-+ private Path makeAbsolute(Path path) {
-+ if (path.isAbsolute()) {
-+ return path;
-+ }
-+ return new Path(workingDir, path);
-+ }
-+
-+ private Path[] listPaths(Path path) throws IOException {
-+ LOG.debug("listPaths:enter with path " + path);
-+ String dirlist[];
-+
-+ Path abs_path = makeAbsolute(path);
-+
-+ // If it's a directory, get the listing. Otherwise, complain and give up.
-+ LOG.debug("calling ceph_getdir from Java with path " + abs_path);
-+ dirlist = ceph.ceph_getdir(getCephPath(abs_path));
-+ LOG.debug("returning from ceph_getdir to Java");
-+
-+ if (dirlist == null) {
-+ return null;
-+ }
-+
-+ // convert the strings to Paths
-+ Path[] paths = new Path[dirlist.length];
-+
-+ for (int i = 0; i < dirlist.length; ++i) {
-+ LOG.trace(
-+ "Raw enumeration of paths in \"" + abs_path.toString() + "\": \""
-+ + dirlist[i] + "\"");
-+ // convert each listing to an absolute path
-+ Path raw_path = new Path(dirlist[i]);
-+
-+ if (raw_path.isAbsolute()) {
-+ paths[i] = raw_path;
-+ } else {
-+ paths[i] = new Path(abs_path, raw_path);
-+ }
-+ }
-+ LOG.debug("listPaths:exit");
-+ return paths;
-+ }
-+
-+ static class Stat {
-+ public long size;
-+ public boolean is_dir;
-+ public long block_size;
-+ public long mod_time;
-+ public long access_time;
-+ public int mode;
-+
-+ public Stat() {}
-+ }
-+}
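
initialize() above reads its settings from the fs.ceph.* keys documented in the class javadoc. A sketch of wiring CephFileSystem into a Hadoop configuration of this era follows; the property names come from the code above, while the monitor address, library directory, and the fs.ceph.impl scheme binding are illustrative assumptions rather than values taken from this patch.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Illustrative sketch only; the configuration values below are placeholders.
    public class CephFsConfigSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Bind the ceph:// scheme to the implementation class (standard Hadoop mechanism).
        conf.set("fs.ceph.impl", "org.apache.hadoop.fs.ceph.CephFileSystem");
        conf.set("fs.ceph.monAddr", "mon.example.com:6789");   // hypothetical monitor address
        conf.set("fs.ceph.libDir", "/usr/local/lib");          // where libcephfs.so and libhadoopcephfs.so live
        conf.setInt("fs.ceph.blockSize", 1 << 26);             // 64 MB, matching the default used above

        FileSystem fs = FileSystem.get(URI.create("ceph://mon.example.com:6789/"), conf);
        for (FileStatus st : fs.listStatus(new Path("/"))) {
          System.out.println(st.getPath() + "\t" + st.getLen());
        }
        fs.close();
      }
    }
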
-diff --git a/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java b/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java
-new file mode 100644
-index 0000000..d9668d0
---- /dev/null
-+++ b/src/core/org/apache/hadoop/fs/ceph/CephInputStream.java
-@@ -0,0 +1,254 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-+
-+/**
-+ *
-+ * Licensed under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-+ * implied. See the License for the specific language governing
-+ * permissions and limitations under the License.
-+ *
-+ *
-+ * Implements the Hadoop FS interfaces to allow applications to store
-+ * files in Ceph.
-+ */
-+package org.apache.hadoop.fs.ceph;
-+
-+
-+import java.io.IOException;
-+
-+import org.apache.commons.logging.Log;
-+import org.apache.commons.logging.LogFactory;
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.FSInputStream;
-+
-+
-+/**
-+ * <p>
-+ * An {@link FSInputStream} for a CephFileSystem and corresponding
-+ * Ceph instance.
-+ */
-+public class CephInputStream extends FSInputStream {
-+ private static final Log LOG = LogFactory.getLog(CephInputStream.class);
-+ private boolean closed;
-+
-+ private int fileHandle;
-+
-+ private long fileLength;
-+
-+ private CephFS ceph;
-+
-+ private byte[] buffer;
-+ private int bufPos = 0;
-+ private int bufValid = 0;
-+ private long cephPos = 0;
-+
-+ /**
-+ * Create a new CephInputStream.
-+ * @param conf The system configuration. Unused.
-+ * @param fh The filehandle provided by Ceph to reference.
-+ * @param flength The current length of the file. If the length changes
-+ * you will need to close and re-open it to access the new data.
-+ */
-+ public CephInputStream(Configuration conf, CephFS cephfs,
-+ int fh, long flength, int bufferSize) {
-+ // Whoever's calling the constructor is responsible for doing the actual ceph_open
-+ // call and providing the file handle.
-+ fileLength = flength;
-+ fileHandle = fh;
-+ closed = false;
-+ ceph = cephfs;
-+ buffer = new byte[bufferSize];
-+ LOG.debug(
-+ "CephInputStream constructor: initializing stream with fh " + fh
-+ + " and file length " + flength);
-+
-+ }
-+
-+ /** Ceph likes things to be closed before it shuts down,
-+ * so closing the IOStream stuff voluntarily in a finalizer is good
-+ */
-+ protected void finalize() throws Throwable {
-+ try {
-+ if (!closed) {
-+ close();
-+ }
-+ } finally {
-+ super.finalize();
-+ }
-+ }
-+
-+ private synchronized boolean fillBuffer() throws IOException {
-+ bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
-+ bufPos = 0;
-+ if (bufValid < 0) {
-+ int err = bufValid;
-+
-+ bufValid = 0;
-+ // attempt to reset to old position. If it fails, too bad.
-+ ceph.ceph_seek_from_start(fileHandle, cephPos);
-+ throw new IOException("Failed to fill read buffer! Error code:" + err);
-+ }
-+ cephPos += bufValid;
-+ return (bufValid != 0);
-+ }
-+
-+ /*
-+ * Get the current position of the stream.
-+ */
-+ public synchronized long getPos() throws IOException {
-+ return cephPos - bufValid + bufPos;
-+ }
-+
-+ /**
-+ * Find the number of bytes remaining in the file.
-+ */
-+ @Override
-+ public synchronized int available() throws IOException {
-+ return (int) (fileLength - getPos());
-+ }
-+
-+ public synchronized void seek(long targetPos) throws IOException {
-+ LOG.trace(
-+ "CephInputStream.seek: Seeking to position " + targetPos + " on fd "
-+ + fileHandle);
-+ if (targetPos > fileLength) {
-+ throw new IOException(
-+ "CephInputStream.seek: failed seek to position " + targetPos
-+ + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
-+ }
-+ long oldPos = cephPos;
-+
-+ cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
-+ bufValid = 0;
-+ bufPos = 0;
-+ if (cephPos < 0) {
-+ cephPos = oldPos;
-+ throw new IOException("Ceph failed to seek to new position!");
-+ }
-+ }
-+
-+ /**
-+ * Failovers are handled by the Ceph code at a very low level;
-+ * if there are issues that can be solved by changing sources
-+ * they'll be dealt with before anybody even tries to call this method!
-+ * @return false.
-+ */
-+ public synchronized boolean seekToNewSource(long targetPos) {
-+ return false;
-+ }
-+
-+ /**
-+ * Read a byte from the file.
-+ * @return the next byte.
-+ */
-+ @Override
-+ public synchronized int read() throws IOException {
-+ LOG.trace(
-+ "CephInputStream.read: Reading a single byte from fd " + fileHandle
-+ + " by calling general read function");
-+
-+ byte result[] = new byte[1];
-+
-+ if (getPos() >= fileLength) {
-+ return -1;
-+ }
-+ if (-1 == read(result, 0, 1)) {
-+ return -1;
-+ }
-+ if (result[0] < 0) {
-+ return 256 + (int) result[0];
-+ } else {
-+ return result[0];
-+ }
-+ }
-+
-+ /**
-+ * Read a specified number of bytes from the file into a byte[].
-+ * @param buf the byte array to read into.
-+ * @param off the offset to start at in the file
-+ * @param len the number of bytes to read
-+ * @return the number of bytes actually read, or -1 if the end of the file has been reached.
-+ * @throws IOException on bad input.
-+ */
-+ @Override
-+ public synchronized int read(byte buf[], int off, int len)
-+ throws IOException {
-+ LOG.trace(
-+ "CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
-+
-+ if (closed) {
-+ throw new IOException(
-+ "CephInputStream.read: cannot read " + len + " bytes from fd "
-+ + fileHandle + ": stream closed");
-+ }
-+
-+ // ensure we're not past the end of the file
-+ if (getPos() >= fileLength) {
-+ LOG.debug(
-+ "CephInputStream.read: cannot read " + len + " bytes from fd "
-+ + fileHandle + ": current position is " + getPos()
-+ + " and file length is " + fileLength);
-+
-+ return -1;
-+ }
-+
-+ int totalRead = 0;
-+ int initialLen = len;
-+ int read;
-+
-+ do {
-+ read = Math.min(len, bufValid - bufPos);
-+ try {
-+ System.arraycopy(buffer, bufPos, buf, off, read);
-+ } catch (IndexOutOfBoundsException ie) {
-+ throw new IOException(
-+ "CephInputStream.read: Indices out of bounds:" + "read length is "
-+ + len + ", buffer offset is " + off + ", and buffer size is "
-+ + buf.length);
-+ } catch (ArrayStoreException ae) {
-+ throw new IOException(
-+ "Uh-oh, CephInputStream failed to do an array"
-+ + "copy due to type mismatch...");
-+ } catch (NullPointerException ne) {
-+ throw new IOException(
-+ "CephInputStream.read: cannot read " + len + "bytes from fd:"
-+ + fileHandle + ": buf is null");
-+ }
-+ bufPos += read;
-+ len -= read;
-+ off += read;
-+ totalRead += read;
-+ } while (len > 0 && fillBuffer());
-+
-+ LOG.trace(
-+ "CephInputStream.read: Reading " + initialLen + " bytes from fd "
-+ + fileHandle + ": succeeded in reading " + totalRead + " bytes");
-+ return totalRead;
-+ }
-+
-+ /**
-+ * Close the CephInputStream and release the associated filehandle.
-+ */
-+ @Override
-+ public void close() throws IOException {
-+ LOG.trace("CephOutputStream.close:enter");
-+ if (!closed) {
-+ int result = ceph.ceph_close(fileHandle);
-+
-+ closed = true;
-+ if (result != 0) {
-+ throw new IOException(
-+ "Close somehow failed!"
-+ + "Don't try and use this stream again, though");
-+ }
-+ LOG.trace("CephOutputStream.close:exit");
-+ }
-+ }
-+}
-diff --git a/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java b/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java
-new file mode 100644
-index 0000000..4c50f88
---- /dev/null
-+++ b/src/core/org/apache/hadoop/fs/ceph/CephOutputStream.java
-@@ -0,0 +1,219 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-+
-+/**
-+ *
-+ * Licensed under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-+ * implied. See the License for the specific language governing
-+ * permissions and limitations under the License.
-+ *
-+ *
-+ * Implements the Hadoop FS interfaces to allow applications to store
-+ * files in Ceph.
-+ */
-+
-+package org.apache.hadoop.fs.ceph;
-+
-+
-+import java.io.IOException;
-+import java.io.OutputStream;
-+
-+import org.apache.commons.logging.Log;
-+import org.apache.commons.logging.LogFactory;
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.util.Progressable;
-+
-+
-+/**
-+ * <p>
-+ * An {@link OutputStream} for a CephFileSystem and corresponding
-+ * Ceph instance.
-+ */
-+public class CephOutputStream extends OutputStream {
-+ private static final Log LOG = LogFactory.getLog(CephOutputStream.class);
-+ private boolean closed;
-+
-+ private CephFS ceph;
-+
-+ private int fileHandle;
-+
-+ private byte[] buffer;
-+ private int bufUsed = 0;
-+
-+ /**
-+ * Construct the CephOutputStream.
-+ * @param conf The FileSystem configuration.
-+ * @param fh The Ceph filehandle to connect to.
-+ */
-+ public CephOutputStream(Configuration conf, CephFS cephfs,
-+ int fh, int bufferSize) {
-+ ceph = cephfs;
-+ fileHandle = fh;
-+ closed = false;
-+ buffer = new byte[bufferSize];
-+ }
-+
-+ /** Ceph likes things to be closed before it shuts down,
-+ * so closing the IOStream stuff voluntarily is good
-+ */
-+ protected void finalize() throws Throwable {
-+ try {
-+ if (!closed) {
-+ close();
-+ }
-+ } finally {
-+ super.finalize();
-+ }
-+ }
-+
-+ /**
-+ * Get the current position in the file.
-+ * @return The file offset in bytes.
-+ */
-+ public long getPos() throws IOException {
-+ return ceph.ceph_getpos(fileHandle);
-+ }
-+
-+ /**
-+ * Write a byte.
-+ * @param b The byte to write.
-+ * @throws IOException If you have closed the CephOutputStream or the
-+ * write fails.
-+ */
-+ @Override
-+ public synchronized void write(int b) throws IOException {
-+ LOG.trace(
-+ "CephOutputStream.write: writing a single byte to fd " + fileHandle);
-+
-+ if (closed) {
-+ throw new IOException(
-+ "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle
-+ + ": stream closed");
-+ }
-+ // Stick the byte in a buffer and write it
-+ byte buf[] = new byte[1];
-+
-+ buf[0] = (byte) b;
-+ write(buf, 0, 1);
-+ return;
-+ }
-+
-+ /**
-+ * Write a byte buffer into the Ceph file.
-+ * @param buf the byte array to write from
-+ * @param off the position in the file to start writing at.
-+ * @param len The number of bytes to actually write.
-+ * @throws IOException if you have closed the CephOutputStream, or
-+ * if buf is null or off + len > buf.length, or
-+ * if the write fails due to a Ceph error.
-+ */
-+ @Override
-+ public synchronized void write(byte buf[], int off, int len) throws IOException {
-+ LOG.trace(
-+ "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle);
-+ // make sure stream is open
-+ if (closed) {
-+ throw new IOException(
-+ "CephOutputStream.write: cannot write " + len + "bytes to fd "
-+ + fileHandle + ": stream closed");
-+ }
-+
-+ int result;
-+ int write;
-+
-+ while (len > 0) {
-+ write = Math.min(len, buffer.length - bufUsed);
-+ try {
-+ System.arraycopy(buf, off, buffer, bufUsed, write);
-+ } catch (IndexOutOfBoundsException ie) {
-+ throw new IOException(
-+ "CephOutputStream.write: Indices out of bounds: "
-+ + "write length is " + len + ", buffer offset is " + off
-+ + ", and buffer size is " + buf.length);
-+ } catch (ArrayStoreException ae) {
-+ throw new IOException(
-+ "Uh-oh, CephOutputStream failed to do an array"
-+ + " copy due to type mismatch...");
-+ } catch (NullPointerException ne) {
-+ throw new IOException(
-+ "CephOutputStream.write: cannot write " + len + "bytes to fd "
-+ + fileHandle + ": buffer is null");
-+ }
-+ bufUsed += write;
-+ len -= write;
-+ off += write;
-+ if (bufUsed == buffer.length) {
-+ result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
-+ if (result < 0) {
-+ throw new IOException(
-+ "CephOutputStream.write: Buffered write of " + bufUsed
-+ + " bytes failed!");
-+ }
-+ if (result != bufUsed) {
-+ throw new IOException(
-+ "CephOutputStream.write: Wrote only " + result + " bytes of "
-+ + bufUsed + " in buffer! Data may be lost or written"
-+ + " twice to Ceph!");
-+ }
-+ bufUsed = 0;
-+ }
-+
-+ }
-+ return;
-+ }
-+
-+ /**
-+ * Flush the buffered data.
-+ * @throws IOException if you've closed the stream or the write fails.
-+ */
-+ @Override
-+ public synchronized void flush() throws IOException {
-+ if (!closed) {
-+ if (bufUsed == 0) {
-+ return;
-+ }
-+ int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
-+
-+ if (result < 0) {
-+ throw new IOException(
-+ "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
-+ + fileHandle + " failed");
-+ }
-+ if (result != bufUsed) {
-+ throw new IOException(
-+ "CephOutputStream.write: Write of " + bufUsed + "bytes to fd "
-+ + fileHandle + "was incomplete: only " + result + " of " + bufUsed
-+ + " bytes were written.");
-+ }
-+ bufUsed = 0;
-+ return;
-+ }
-+ }
-+
-+ /**
-+ * Close the CephOutputStream.
-+ * @throws IOException if the buffered data cannot be flushed or the close call to Ceph fails.
-+ */
-+ @Override
-+ public synchronized void close() throws IOException {
-+ LOG.trace("CephOutputStream.close:enter");
-+ if (!closed) {
-+ flush();
-+ int result = ceph.ceph_close(fileHandle);
-+
-+ if (result != 0) {
-+ throw new IOException("Close failed!");
-+ }
-+
-+ closed = true;
-+ LOG.trace("CephOutputStream.close:exit");
-+ }
-+ }
-+}
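
CephOutputStream above is a plain buffered writer: bytes accumulate in an in-memory buffer of bufferSize, ceph_write is called whenever the buffer fills, and flush() or close() pushes whatever remains; a short write from ceph_write is treated as an error rather than retried. As a concrete example, with a 4 MB buffer, writing 10 MB in 1 MB chunks triggers two full-buffer ceph_write calls of 4 MB each, and the final 2 MB reach Ceph only when flush() or close() runs.
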
-diff --git a/src/core/org/apache/hadoop/fs/ceph/CephTalker.java b/src/core/org/apache/hadoop/fs/ceph/CephTalker.java
-new file mode 100644
-index 0000000..569652f
---- /dev/null
-+++ b/src/core/org/apache/hadoop/fs/ceph/CephTalker.java
-@@ -0,0 +1,91 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-+
-+/**
-+ *
-+ * Licensed under the Apache License, Version 2.0
-+ * (the "License"); you may not use this file except in compliance with
-+ * the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
-+ * implied. See the License for the specific language governing
-+ * permissions and limitations under the License.
-+ *
-+ *
-+ * Wraps a number of native function calls to communicate with the Ceph
-+ * filesystem.
-+ */
-+package org.apache.hadoop.fs.ceph;
-+
-+
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.commons.logging.Log;
-+
-+
-+class CephTalker extends CephFS {
-+ // JNI doesn't give us any way to store pointers, so use a long.
-+ // Here we're assuming pointers aren't longer than 8 bytes.
-+ long cluster;
-+
-+ // we write a constructor so we can load the libraries
-+ public CephTalker(Configuration conf, Log log) {
-+ System.load(conf.get("fs.ceph.libDir") + "/libcephfs.so");
-+ System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so");
-+ cluster = 0;
-+ }
-+
-+ protected native boolean ceph_initializeClient(String arguments, int block_size);
-+
-+ protected native String ceph_getcwd();
-+
-+ protected native boolean ceph_setcwd(String path);
-+
-+ protected native boolean ceph_rmdir(String path);
-+
-+ protected native boolean ceph_unlink(String path);
-+
-+ protected native boolean ceph_rename(String old_path, String new_path);
-+
-+ protected native boolean ceph_exists(String path);
-+
-+ protected native long ceph_getblocksize(String path);
-+
-+ protected native boolean ceph_isdirectory(String path);
-+
-+ protected native boolean ceph_isfile(String path);
-+
-+ protected native String[] ceph_getdir(String path);
-+
-+ protected native int ceph_mkdirs(String path, int mode);
-+
-+ protected native int ceph_open_for_append(String path);
-+
-+ protected native int ceph_open_for_read(String path);
-+
-+ protected native int ceph_open_for_overwrite(String path, int mode);
-+
-+ protected native int ceph_close(int filehandle);
-+
-+ protected native boolean ceph_setPermission(String path, int mode);
-+
-+ protected native boolean ceph_kill_client();
-+
-+ protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
-+
-+  protected native int ceph_replication(String path);
-+
-+ protected native String[] ceph_hosts(int fh, long offset);
-+
-+ protected native int ceph_setTimes(String path, long mtime, long atime);
-+
-+ protected native long ceph_getpos(int fh);
-+
-+ protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
-+
-+ protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-+
-+ protected native long ceph_seek_from_start(int fh, long pos);
-+}
-diff --git a/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java b/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
-index 9e22f1f..cd55361 100644
---- a/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
-+++ b/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
-@@ -386,10 +386,12 @@ public class TrackerDistributedCacheManager {
- if (modifiedTime != desiredTimestamp) {
- DateFormat df = DateFormat.getDateTimeInstance(DateFormat.SHORT,
- DateFormat.SHORT);
-+ /*
- throw new IOException("The distributed cache object " + source +
- " changed during the job from " +
- df.format(new Date(desiredTimestamp)) + " to " +
- df.format(new Date(modifiedTime)));
-+ */
- }
-
- Path parchive = null;
-diff --git a/src/test/commit-tests b/src/test/commit-tests
-index 1148c8b..85fa53d 100644
---- a/src/test/commit-tests
-+++ b/src/test/commit-tests
-@@ -53,6 +53,7 @@
- **/TestRPC.java
- **/TestS3Credentials.java
- **/TestS3FileSystem.java
-+**/TestCeph.java
- **/TestSaslRPC.java
- **/TestScriptBasedMapping.java
- **/TestSequenceFileSerialization.java
-diff --git a/src/test/org/apache/hadoop/fs/ceph/TestCeph.java b/src/test/org/apache/hadoop/fs/ceph/TestCeph.java
-new file mode 100644
-index 0000000..e46b0ee
---- /dev/null
-+++ b/src/test/org/apache/hadoop/fs/ceph/TestCeph.java
-@@ -0,0 +1,45 @@
-+// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-+
-+/**
-+ * Licensed to the Apache Software Foundation (ASF) under one
-+ * or more contributor license agreements. See the NOTICE file
-+ * distributed with this work for additional information
-+ * regarding copyright ownership. The ASF licenses this file
-+ * to you under the Apache License, Version 2.0 (the
-+ * "License"); you may not use this file except in compliance
-+ * with the License. You may obtain a copy of the License at
-+ *
-+ * http://www.apache.org/licenses/LICENSE-2.0
-+ *
-+ * Unless required by applicable law or agreed to in writing, software
-+ * distributed under the License is distributed on an "AS IS" BASIS,
-+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-+ * See the License for the specific language governing permissions and
-+ * limitations under the License.
-+ *
-+ * Unit tests for the CephFileSystem API implementation.
-+ */
-+
-+package org.apache.hadoop.fs.ceph;
-+
-+
-+import java.io.IOException;
-+import java.net.URI;
-+import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.FileSystemContractBaseTest;
-+import org.apache.hadoop.fs.FileSystem;
-+import org.apache.hadoop.fs.Path;
-+
-+
-+public class TestCeph extends FileSystemContractBaseTest {
-+
-+ @Override
-+ protected void setUp() throws IOException {
-+ Configuration conf = new Configuration();
-+ CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
-+ CephFileSystem cephfs = new CephFileSystem(cephfaker);
-+
-+ cephfs.initialize(URI.create("ceph://null"), conf);
-+ fs = cephfs;
-+ }
-+}
diff --git a/src/client/hadoop/Readme b/src/client/hadoop/Readme
deleted file mode 100644
index 2967b96cf5a..00000000000
--- a/src/client/hadoop/Readme
+++ /dev/null
@@ -1,17 +0,0 @@
-This directory contains:
-CephFSInterface.cc/h: A C++ JNI library used by the Hadoop Java code.
-ceph: A directory containing all the Java source files for a
-Hadoop-compliant CephFileSystem.
-HADOOP-ceph.patch: A patch for Hadoop. It should apply cleanly to one of the
-0.20 branches. (It was generated against 0.20.205.0.) This
-patch adds all the files contained in the ceph dir and makes
-some changes so that Hadoop's configuration code will recognize the
-CephFileSystem properties and classes. It is possible that this will be
-out of date compared to the files contained in the ceph dir, so you
-should apply this patch and then copy ceph/* into the appropriate Hadoop
-dir.
-
-There are also a number of javah-generated C header files which are used
-in writing CephFSInterface but can be safely ignored otherwise.
-
-Configuration instructions are included in Javadoc format in the ceph dir.
diff --git a/src/client/hadoop/ceph/CephFS.java b/src/client/hadoop/ceph/CephFS.java
deleted file mode 100644
index 5d51eb21600..00000000000
--- a/src/client/hadoop/ceph/CephFS.java
+++ /dev/null
@@ -1,250 +0,0 @@
-// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-
-/**
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * Abstract base class for communicating with a Ceph filesystem and its
- * C++ codebase from Java, or pretending to do so (for unit testing purposes).
- * As only the Ceph package should be using this directly, all methods
- * are protected.
- */
-package org.apache.hadoop.fs.ceph;
-
-import org.apache.hadoop.conf.Configuration;
-
-abstract class CephFS {
-
- protected static final int ENOTDIR = 20;
- protected static final int EEXIST = 17;
- protected static final int ENOENT = 2;
-
- /*
- * Performs any necessary setup to allow general use of the filesystem.
- * Inputs:
-   *  String arguments -- a command-line style input of Ceph config params
- * int block_size -- the size in bytes to use for blocks
- * Returns: true on success, false otherwise
- */
- abstract protected boolean ceph_initializeClient(String arguments, int block_size);
-
- /*
- * Returns the current working directory (absolute) as a String
- */
- abstract protected String ceph_getcwd();
-
- /*
- * Changes the working directory.
- * Inputs:
- * String path: The path (relative or absolute) to switch to
- * Returns: true on success, false otherwise.
- */
- abstract protected boolean ceph_setcwd(String path);
-
- /*
- * Given a path to a directory, removes the directory if empty.
- * Inputs:
- * jstring j_path: The path (relative or absolute) to the directory
- * Returns: true on successful delete; false otherwise
- */
- abstract protected boolean ceph_rmdir(String path);
-
- /*
- * Given a path, unlinks it.
- * Inputs:
- * String path: The path (relative or absolute) to the file or empty dir
- * Returns: true if the unlink occurred, false otherwise.
- */
- abstract protected boolean ceph_unlink(String path);
-
- /*
- * Changes a given path name to a new name, assuming new_path doesn't exist.
- * Inputs:
- * jstring j_from: The path whose name you want to change.
- * jstring j_to: The new name for the path.
- * Returns: true if the rename occurred, false otherwise
- */
- abstract protected boolean ceph_rename(String old_path, String new_path);
-
- /*
-   * Returns true if the input path exists, false
- * if it does not or there is an unexpected failure.
- */
- abstract protected boolean ceph_exists(String path);
-
- /*
- * Get the block size for a given path.
- * Input:
- * String path: The path (relative or absolute) you want
- * the block size for.
-   * Returns: the block size if the path exists, otherwise the negative of
-   * the standard C++ error code (the error codes themselves are positive).
- */
- abstract protected long ceph_getblocksize(String path);
-
- /*
- * Returns true if the given path is a directory, false otherwise.
- */
- abstract protected boolean ceph_isdirectory(String path);
-
- /*
- * Returns true if the given path is a file; false otherwise.
- */
- abstract protected boolean ceph_isfile(String path);
-
- /*
- * Get the contents of a given directory.
- * Inputs:
- * String path: The path (relative or absolute) to the directory.
- * Returns: A Java String[] of the contents of the directory, or
- * NULL if there is an error (ie, path is not a dir). This listing
- * will not contain . or .. entries.
- */
- abstract protected String[] ceph_getdir(String path);
-
- /*
- * Create the specified directory and any required intermediate ones with the
- * given mode.
- */
- abstract protected int ceph_mkdirs(String path, int mode);
-
- /*
- * Open a file to append. If the file does not exist, it will be created.
- * Opening a dir is possible but may have bad results.
- * Inputs:
- * String path: The path to open.
- * Returns: an int filehandle, or a number<0 if an error occurs.
- */
- abstract protected int ceph_open_for_append(String path);
-
- /*
- * Open a file for reading.
- * Opening a dir is possible but may have bad results.
- * Inputs:
- * String path: The path to open.
- * Returns: an int filehandle, or a number<0 if an error occurs.
- */
- abstract protected int ceph_open_for_read(String path);
-
- /*
- * Opens a file for overwriting; creates it if necessary.
- * Opening a dir is possible but may have bad results.
- * Inputs:
- * String path: The path to open.
- * int mode: The mode to open with.
- * Returns: an int filehandle, or a number<0 if an error occurs.
- */
- abstract protected int ceph_open_for_overwrite(String path, int mode);
-
- /*
- * Closes the given file. Returns 0 on success, or a negative
- * error code otherwise.
- */
- abstract protected int ceph_close(int filehandle);
-
- /*
- * Change the mode on a path.
- * Inputs:
- * String path: The path to change mode on.
- * int mode: The mode to apply.
- * Returns: true if the mode is properly applied, false if there
- * is any error.
- */
- abstract protected boolean ceph_setPermission(String path, int mode);
-
- /*
- * Closes the Ceph client. This should be called before shutting down
- * (multiple times is okay but redundant).
- */
- abstract protected boolean ceph_kill_client();
-
- /*
- * Get the statistics on a path returned in a custom format defined
- * in CephFileSystem.
- * Inputs:
- * String path: The path to stat.
- * Stat fill: The stat object to fill.
- * Returns: true if the stat is successful, false otherwise.
- */
- abstract protected boolean ceph_stat(String path, CephFileSystem.Stat fill);
-
- /*
-   * Check how many times a file should be replicated. If it is degraded,
-   * it may not actually be replicated this often.
-   * Inputs:
-   *  String path: the path of the file to check.
- * Returns: an int containing the number of times replicated.
- */
- abstract protected int ceph_replication(String path);
-
- /*
- * Find the IP address of the primary OSD for a given file and offset.
- * Inputs:
- * int fh: The filehandle for the file.
- * long offset: The offset to get the location of.
-   * Returns: an array of Strings giving the locations as IPs, or NULL if there is an error.
- */
- abstract protected String[] ceph_hosts(int fh, long offset);
-
- /*
- * Set the mtime and atime for a given path.
- * Inputs:
- * String path: The path to set the times for.
- * long mtime: The mtime to set, in millis since epoch (-1 to not set).
- * long atime: The atime to set, in millis since epoch (-1 to not set)
- * Returns: 0 if successful, an error code otherwise.
- */
- abstract protected int ceph_setTimes(String path, long mtime, long atime);
-
- /*
- * Get the current position in a file (as a long) of a given filehandle.
- * Returns: (long) current file position on success, or a
- * negative error code on failure.
- */
- abstract protected long ceph_getpos(int fh);
-
- /*
- * Write the given buffer contents to the given filehandle.
- * Inputs:
- * int fh: The filehandle to write to.
- * byte[] buffer: The buffer to write from
- * int buffer_offset: The position in the buffer to write from
- * int length: The number of (sequential) bytes to write.
- * Returns: int, on success the number of bytes written, on failure
- * a negative error code.
- */
- abstract protected int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
-
- /*
- * Reads into the given byte array from the current position.
- * Inputs:
- * int fh: the filehandle to read from
- * byte[] buffer: the byte array to read into
- * int buffer_offset: where in the buffer to start writing
- * int length: how much to read.
- * There'd better be enough space in the buffer to write all
- * the data from the given offset!
- * Returns: the number of bytes read on success (as an int),
- * or an error code otherwise. */
- abstract protected int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-
- /*
- * Seeks to the given position in the given file.
- * Inputs:
- * int fh: The filehandle to seek in.
- * long pos: The position to seek to.
- * Returns: the new position (as a long) of the filehandle on success,
- * or a negative error code on failure. */
- abstract protected long ceph_seek_from_start(int fh, long pos);
-}
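
The CephFS class above defined the whole native-call surface the Java side relied on: open a path to get an int filehandle, read into a caller-supplied buffer, and close the handle, with negative return values signalling errors. As a rough illustration only, a package-private caller could have driven that open/read/close API along these lines (a hypothetical sketch, not code from the tree; CephFSReadSketch and readWholeFile are invented names, and ceph is assumed to be a concrete implementation such as CephTalker):

// Hypothetical sketch, assuming a concrete CephFS implementation is available
// in the same package (the methods above are protected, so package access is needed).
package org.apache.hadoop.fs.ceph;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

class CephFSReadSketch {
  static byte[] readWholeFile(CephFS ceph, String path) throws IOException {
    int fd = ceph.ceph_open_for_read(path);   // negative return means the open failed
    if (fd < 0) {
      throw new IOException("open of " + path + " failed with code " + fd);
    }
    try {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buf = new byte[64 * 1024];
      int r;
      // ceph_read returns the number of bytes read, 0 at end of file,
      // or a negative error code
      while ((r = ceph.ceph_read(fd, buf, 0, buf.length)) > 0) {
        out.write(buf, 0, r);
      }
      if (r < 0) {
        throw new IOException("read from fd " + fd + " failed with code " + r);
      }
      return out.toByteArray();
    } finally {
      ceph.ceph_close(fd);                    // 0 on success, negative error otherwise
    }
  }
}
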
diff --git a/src/client/hadoop/ceph/CephFaker.java b/src/client/hadoop/ceph/CephFaker.java
deleted file mode 100644
index c598f536039..00000000000
--- a/src/client/hadoop/ceph/CephFaker.java
+++ /dev/null
@@ -1,483 +0,0 @@
-// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-
-/**
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * This uses the local Filesystem but pretends to be communicating
- * with a Ceph deployment, for unit testing the CephFileSystem.
- */
-
-package org.apache.hadoop.fs.ceph;
-
-
-import java.net.URI;
-import java.util.Hashtable;
-import java.io.Closeable;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-
-class CephFaker extends CephFS {
- private static final Log LOG = LogFactory.getLog(CephFaker.class);
- FileSystem localFS;
- String localPrefix;
- int blockSize;
- Configuration conf;
- Hashtable<Integer, Object> files;
- Hashtable<Integer, String> filenames;
- int fileCount = 0;
- boolean initialized = false;
-
- public CephFaker(Configuration con, Log log) {
- conf = con;
- files = new Hashtable<Integer, Object>();
- filenames = new Hashtable<Integer, String>();
- }
-
- protected boolean ceph_initializeClient(String args, int block_size) {
- if (!initialized) {
- // let's remember the default block_size
- blockSize = block_size;
-
- /* for a real Ceph deployment, this starts up the client,
- * sets debugging levels, etc. We just need to get the
- * local FileSystem to use, and we'll ignore any
- * command-line arguments. */
- try {
- localFS = FileSystem.getLocal(conf);
- localFS.initialize(URI.create("file://localhost"), conf);
- localFS.setVerifyChecksum(false);
- String testDir = conf.get("hadoop.tmp.dir");
-
- localPrefix = localFS.getWorkingDirectory().toString();
- int testDirLoc = localPrefix.indexOf(testDir) - 1;
-
- if (-2 == testDirLoc) {
- testDirLoc = localPrefix.length();
- }
- localPrefix = localPrefix.substring(0, testDirLoc) + "/"
- + conf.get("hadoop.tmp.dir");
-
- localFS.setWorkingDirectory(
- new Path(localPrefix + "/user/" + System.getProperty("user.name")));
- // I don't know why, but the unit tests expect the default
- // working dir to be /user/username, so satisfy them!
- // debug("localPrefix is " + localPrefix, INFO);
- } catch (IOException e) {
- return false;
- }
- initialized = true;
- }
- return true;
- }
-
- protected String ceph_getcwd() {
- return sanitize_path(localFS.getWorkingDirectory().toString());
- }
-
- protected boolean ceph_setcwd(String path) {
- localFS.setWorkingDirectory(new Path(prepare_path(path)));
- return true;
- }
-
- // the caller is responsible for ensuring empty dirs
- protected boolean ceph_rmdir(String pth) {
- Path path = new Path(prepare_path(pth));
- boolean ret = false;
-
- try {
- if (localFS.listStatus(path).length <= 1) {
- ret = localFS.delete(path, true);
- }
- } catch (IOException e) {}
- return ret;
- }
-
- // this needs to work on (empty) directories too
- protected boolean ceph_unlink(String path) {
- path = prepare_path(path);
- boolean ret = false;
-
- if (ceph_isdirectory(path)) {
- ret = ceph_rmdir(path);
- } else {
- try {
- ret = localFS.delete(new Path(path), false);
- } catch (IOException e) {}
- }
- return ret;
- }
-
- protected boolean ceph_rename(String oldName, String newName) {
- oldName = prepare_path(oldName);
- newName = prepare_path(newName);
- try {
- Path parent = new Path(newName).getParent();
- Path newPath = new Path(newName);
-
- if (localFS.exists(parent) && !localFS.exists(newPath)) {
- return localFS.rename(new Path(oldName), newPath);
- }
- return false;
- } catch (IOException e) {
- return false;
- }
- }
-
- protected boolean ceph_exists(String path) {
- path = prepare_path(path);
- boolean ret = false;
-
- try {
- ret = localFS.exists(new Path(path));
- } catch (IOException e) {}
- return ret;
- }
-
- protected long ceph_getblocksize(String path) {
- path = prepare_path(path);
- try {
- FileStatus status = localFS.getFileStatus(new Path(path));
-
- return status.getBlockSize();
- } catch (FileNotFoundException e) {
- return -CephFS.ENOENT;
- } catch (IOException e) {
- return -1; // just fail generically
- }
- }
-
- protected boolean ceph_isdirectory(String path) {
- path = prepare_path(path);
- try {
- FileStatus status = localFS.getFileStatus(new Path(path));
-
- return status.isDir();
- } catch (IOException e) {
- return false;
- }
- }
-
- protected boolean ceph_isfile(String path) {
- path = prepare_path(path);
- boolean ret = false;
-
- try {
- FileStatus status = localFS.getFileStatus(new Path(path));
-
- ret = !status.isDir();
- } catch (Exception e) {}
- return ret;
- }
-
- protected String[] ceph_getdir(String path) {
- path = prepare_path(path);
- if (!ceph_isdirectory(path)) {
- return null;
- }
- try {
- FileStatus[] stats = localFS.listStatus(new Path(path));
- String[] names = new String[stats.length];
- String name;
-
- for (int i = 0; i < stats.length; ++i) {
- name = stats[i].getPath().toString();
- names[i] = name.substring(name.lastIndexOf(Path.SEPARATOR) + 1);
- }
- return names;
- } catch (IOException e) {}
- return null;
- }
-
- protected int ceph_mkdirs(String path, int mode) {
- path = prepare_path(path);
- // debug("ceph_mkdirs on " + path, INFO);
- try {
- if (localFS.mkdirs(new Path(path), new FsPermission((short) mode))) {
- return 0;
- }
- } catch (IOException e) {}
- if (ceph_isdirectory(path)) { // apparently it already existed
- return -EEXIST;
- } else if (ceph_isfile(path)) {
- return -ENOTDIR;
- }
- return -1;
- }
-
- /*
- * Unlike a real Ceph deployment, you can't do opens on a directory.
- * Since that has unpredictable behavior and you shouldn't do it anyway,
-   * this difference is acceptable.
- */
- protected int ceph_open_for_append(String path) {
- path = prepare_path(path);
- FSDataOutputStream stream;
-
- try {
- stream = localFS.append(new Path(path));
- files.put(new Integer(fileCount), stream);
- filenames.put(new Integer(fileCount), path);
- return fileCount++;
- } catch (IOException e) {}
- return -1; // failure
- }
-
- protected int ceph_open_for_read(String path) {
- path = prepare_path(path);
- FSDataInputStream stream;
-
- try {
- stream = localFS.open(new Path(path));
- files.put(new Integer(fileCount), stream);
- filenames.put(new Integer(fileCount), path);
- LOG.info("ceph_open_for_read fh:" + fileCount + ", pathname:" + path);
- return fileCount++;
- } catch (IOException e) {}
- return -1; // failure
- }
-
- protected int ceph_open_for_overwrite(String path, int mode) {
- path = prepare_path(path);
- FSDataOutputStream stream;
-
- try {
- stream = localFS.create(new Path(path));
- files.put(new Integer(fileCount), stream);
- filenames.put(new Integer(fileCount), path);
- LOG.info("ceph_open_for_overwrite fh:" + fileCount + ", pathname:" + path);
- return fileCount++;
- } catch (IOException e) {}
- return -1; // failure
- }
-
- protected int ceph_close(int filehandle) {
- LOG.info("ceph_close(filehandle " + filehandle + ")");
- try {
- ((Closeable) files.get(new Integer(filehandle))).close();
- if (null == files.get(new Integer(filehandle))) {
- return -ENOENT; // this isn't quite the right error code,
- // but the important part is it's negative
- }
- return 0; // hurray, success
- } catch (NullPointerException ne) {
- LOG.warn("ceph_close caught NullPointerException!" + ne);
- } // err, how?
- catch (IOException ie) {
- LOG.warn("ceph_close caught IOException!" + ie);
- }
- return -1; // failure
- }
-
- protected boolean ceph_setPermission(String pth, int mode) {
- pth = prepare_path(pth);
- Path path = new Path(pth);
- boolean ret = false;
-
- try {
- localFS.setPermission(path, new FsPermission((short) mode));
- ret = true;
- } catch (IOException e) {}
- return ret;
- }
-
- // rather than try and match a Ceph deployment's behavior exactly,
- // just make bad things happen if they try and call methods after this
- protected boolean ceph_kill_client() {
- // debug("ceph_kill_client", INFO);
- localFS.setWorkingDirectory(new Path(localPrefix));
- // debug("working dir is now " + localFS.getWorkingDirectory(), INFO);
- try {
- localFS.close();
- } catch (Exception e) {}
- localFS = null;
- files = null;
- filenames = null;
- return true;
- }
-
- protected boolean ceph_stat(String pth, CephFileSystem.Stat fill) {
- pth = prepare_path(pth);
- Path path = new Path(pth);
- boolean ret = false;
-
- try {
- FileStatus status = localFS.getFileStatus(path);
-
- fill.size = status.getLen();
- fill.is_dir = status.isDir();
- fill.block_size = status.getBlockSize();
- fill.mod_time = status.getModificationTime();
- fill.access_time = status.getAccessTime();
- fill.mode = status.getPermission().toShort();
- ret = true;
- } catch (IOException e) {}
- return ret;
- }
-
- protected int ceph_replication(String path) {
- path = prepare_path(path);
- int ret = -1; // -1 for failure
-
- try {
- ret = localFS.getFileStatus(new Path(path)).getReplication();
- } catch (IOException e) {}
- return ret;
- }
-
- protected String[] ceph_hosts(int fh, long offset) {
- String[] ret = null;
-
- try {
- BlockLocation[] locs = localFS.getFileBlockLocations(
- localFS.getFileStatus(new Path(filenames.get(new Integer(fh)))),
- offset, 1);
-
- ret = locs[0].getNames();
- } catch (IOException e) {} catch (NullPointerException f) {}
- return ret;
- }
-
- protected int ceph_setTimes(String pth, long mtime, long atime) {
- pth = prepare_path(pth);
- Path path = new Path(pth);
- int ret = -1; // generic fail
-
- try {
- localFS.setTimes(path, mtime, atime);
- ret = 0;
- } catch (IOException e) {}
- return ret;
- }
-
- protected long ceph_getpos(int fh) {
- long ret = -1; // generic fail
-
- try {
- Object stream = files.get(new Integer(fh));
-
- if (stream instanceof FSDataInputStream) {
- ret = ((FSDataInputStream) stream).getPos();
- } else if (stream instanceof FSDataOutputStream) {
- ret = ((FSDataOutputStream) stream).getPos();
- }
- } catch (IOException e) {} catch (NullPointerException f) {}
- return ret;
- }
-
- protected int ceph_write(int fh, byte[] buffer,
- int buffer_offset, int length) {
- LOG.info(
- "ceph_write fh:" + fh + ", buffer_offset:" + buffer_offset + ", length:"
- + length);
- long ret = -1; // generic fail
-
- try {
- FSDataOutputStream os = (FSDataOutputStream) files.get(new Integer(fh));
-
- LOG.info("ceph_write got outputstream");
- long startPos = os.getPos();
-
- os.write(buffer, buffer_offset, length);
- ret = os.getPos() - startPos;
- } catch (IOException e) {
- LOG.warn("ceph_write caught IOException!");
- } catch (NullPointerException f) {
- LOG.warn("ceph_write caught NullPointerException!");
- }
- return (int) ret;
- }
-
- protected int ceph_read(int fh, byte[] buffer,
- int buffer_offset, int length) {
- long ret = -1; // generic fail
-
- try {
- FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
- long startPos = is.getPos();
-
- is.read(buffer, buffer_offset, length);
- ret = is.getPos() - startPos;
- } catch (IOException e) {} catch (NullPointerException f) {}
- return (int) ret;
- }
-
- protected long ceph_seek_from_start(int fh, long pos) {
- LOG.info("ceph_seek_from_start(fh " + fh + ", pos " + pos + ")");
- long ret = -1; // generic fail
-
- try {
- LOG.info("ceph_seek_from_start filename is " + filenames.get(new Integer(fh)));
- if (null == files.get(new Integer(fh))) {
-      LOG.warn("ceph_seek_from_start: stream is null!");
- }
- FSDataInputStream is = (FSDataInputStream) files.get(new Integer(fh));
-
- LOG.info("ceph_seek_from_start retrieved is!");
- is.seek(pos);
- ret = is.getPos();
- } catch (IOException e) {
- LOG.warn("ceph_seek_from_start caught IOException!");
- } catch (NullPointerException f) {
- LOG.warn("ceph_seek_from_start caught NullPointerException!");
- }
-    return ret;
- }
-
- /*
- * We need to remove the localFS file prefix before returning to Ceph
- */
- private String sanitize_path(String path) {
- // debug("sanitize_path(" + path + ")", INFO);
- /* if (path.startsWith("file:"))
- path = path.substring("file:".length()); */
- if (path.startsWith(localPrefix)) {
- path = path.substring(localPrefix.length());
- if (path.length() == 0) { // it was a root path
- path = "/";
- }
- }
- // debug("sanitize_path returning " + path, INFO);
- return path;
- }
-
- /*
- * If it's an absolute path we need to shove the
- * test dir onto the front as a prefix.
- */
- private String prepare_path(String path) {
- // debug("prepare_path(" + path + ")", INFO);
- if (path.startsWith("/")) {
- path = localPrefix + path;
- } else if (path.equals("..")) {
- if (ceph_getcwd().equals("/")) {
- path = ".";
- } // you can't go up past root!
- }
- // debug("prepare_path returning" + path, INFO);
- return path;
- }
-}
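
CephFaker above stands in for a real Ceph deployment by mapping every absolute "Ceph" path under a local test prefix on the way in (prepare_path) and stripping that prefix again on the way out (sanitize_path). A minimal sketch of that round trip, with invented names and an assumed prefix, just to make the invariant explicit:

// Hypothetical illustration of CephFaker's path translation; PathPrefixSketch
// and the "/tmp/hadoop-testuser" prefix are invented for this example.
class PathPrefixSketch {
  private final String localPrefix;

  PathPrefixSketch(String localPrefix) {
    this.localPrefix = localPrefix;          // e.g. "/tmp/hadoop-testuser"
  }

  String prepare(String cephPath) {
    // absolute paths get the local test-dir prefix shoved onto the front
    return cephPath.startsWith("/") ? localPrefix + cephPath : cephPath;
  }

  String sanitize(String localPath) {
    // strip the prefix again before handing the path back as a "Ceph" path
    if (localPath.startsWith(localPrefix)) {
      String rest = localPath.substring(localPrefix.length());
      return rest.isEmpty() ? "/" : rest;    // the bare prefix maps back to root
    }
    return localPath;
  }
}

With localPrefix set to "/tmp/hadoop-testuser", prepare("/user/foo") yields "/tmp/hadoop-testuser/user/foo" and sanitize() reverses it; the real CephFaker additionally special-cases ".." at the faked root.
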
diff --git a/src/client/hadoop/ceph/CephFileSystem.java b/src/client/hadoop/ceph/CephFileSystem.java
deleted file mode 100644
index 95f22238b4d..00000000000
--- a/src/client/hadoop/ceph/CephFileSystem.java
+++ /dev/null
@@ -1,804 +0,0 @@
-// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-
-/**
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * Implements the Hadoop FS interfaces to allow applications to store
- * files in Ceph.
- */
-package org.apache.hadoop.fs.ceph;
-
-
-import java.io.IOException;
-import java.io.FileNotFoundException;
-import java.io.OutputStream;
-import java.net.URI;
-import java.net.InetAddress;
-import java.util.EnumSet;
-import java.lang.Math;
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.net.DNS;
-
-
-/**
- * <p>
- * A {@link FileSystem} backed by <a href="http://ceph.newdream.net">Ceph</a>.
- * This will not start a Ceph instance; one must already be running.
- * </p>
- * Configuration of the CephFileSystem is handled via a few Hadoop
- * Configuration properties: <br>
- * fs.ceph.monAddr -- the ip address/port of the monitor to connect to. <br>
- * fs.ceph.libDir -- the directory that libcephfs and libhadoopcephfs are
- * located in. This assumes Hadoop is being run on a Linux-style machine
- * with library names like libcephfs.so. <br>
- * fs.ceph.commandLine -- if you prefer you can fill in this property
- * just as you would when starting Ceph up from the command line. Specific
- * properties override any configuration specified here.
- * <p>
- * You can also enable debugging of the CephFileSystem and Ceph itself: <br>
- * fs.ceph.debug -- if 'true' will print out method enter/exit messages,
- * plus a little more.
- * fs.ceph.clientDebug/fs.ceph.messengerDebug -- set these to a debug level and
- * the respective Ceph subsystem will print out messages of at least that importance.
- */
-public class CephFileSystem extends FileSystem {
- private static final Log LOG = LogFactory.getLog(CephFileSystem.class);
- private URI uri;
-
- private Path workingDir;
- private final Path root;
- private CephFS ceph = null;
-
- private static String CEPH_NAMESERVER;
- private static final String CEPH_NAMESERVER_KEY = "fs.ceph.nameserver";
- private static final String CEPH_NAMESERVER_DEFAULT = "localhost";
-
- /**
- * Create a new CephFileSystem.
- */
- public CephFileSystem() {
- root = new Path("/");
- }
-
- /**
- * Used for testing purposes, this constructor
- * sets the given CephFS instead of defaulting to a
- * CephTalker (with its assumed real Ceph instance to talk to).
- */
- public CephFileSystem(CephFS ceph_fs) {
- super();
- root = new Path("/");
- ceph = ceph_fs;
- }
-
- /**
- * Lets you get the URI of this CephFileSystem.
- * @return the URI.
- */
- public URI getUri() {
- LOG.debug("getUri:exit with return " + uri);
- return uri;
- }
-
- /**
- * Should be called after constructing a CephFileSystem but before calling
- * any other methods.
-   * Starts up the connection to Ceph, reads in configuration options, etc.
- * @param uri The URI for this filesystem.
- * @param conf The Hadoop Configuration to retrieve properties from.
- * @throws IOException if necessary properties are unset.
- */
- @Override
- public void initialize(URI uri, Configuration conf) throws IOException {
- super.initialize(uri, conf);
- setConf(conf);
- this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
- if (ceph == null) {
- ceph = new CephTalker(conf, LOG);
- }
-
- CEPH_NAMESERVER = conf.get(CEPH_NAMESERVER_KEY, CEPH_NAMESERVER_DEFAULT);
-
- // build up the arguments for Ceph
- String arguments = "CephFSInterface";
-
- arguments += conf.get("fs.ceph.commandLine", "");
- if (conf.get("fs.ceph.clientDebug") != null) {
- arguments += " --debug_client ";
- arguments += conf.get("fs.ceph.clientDebug");
- }
- if (conf.get("fs.ceph.messengerDebug") != null) {
- arguments += " --debug_ms ";
- arguments += conf.get("fs.ceph.messengerDebug");
- }
- if (conf.get("fs.ceph.monAddr") != null) {
- arguments += " -m ";
- arguments += conf.get("fs.ceph.monAddr");
- }
- arguments += " --client-readahead-max-periods="
- + conf.get("fs.ceph.readahead", "1");
- // make sure they gave us a ceph monitor address or conf file
- LOG.info("initialize:Ceph initialization arguments: " + arguments);
- if ((conf.get("fs.ceph.monAddr") == null) && (arguments.indexOf("-m") == -1)
- && (arguments.indexOf("-c") == -1)) {
- LOG.fatal("initialize:You need to specify a Ceph monitor address.");
- throw new IOException(
- "You must specify a Ceph monitor address or config file!");
- }
- // Initialize the client
- if (!ceph.ceph_initializeClient(arguments,
- conf.getInt("fs.ceph.blockSize", 1 << 26))) {
- LOG.fatal("initialize:Ceph initialization failed!");
- throw new IOException("Ceph initialization failed!");
- }
- LOG.info("initialize:Ceph initialized client. Setting cwd to /");
- ceph.ceph_setcwd("/");
- LOG.debug("initialize:exit");
-
- this.workingDir = getHomeDirectory();
- }
-
- /**
- * Close down the CephFileSystem. Runs the base-class close method
- * and then kills the Ceph client itself.
- */
- @Override
- public void close() throws IOException {
- LOG.debug("close:enter");
- super.close(); // this method does stuff, make sure it's run!
- LOG.trace("close: Calling ceph_kill_client from Java");
- ceph.ceph_kill_client();
- LOG.debug("close:exit");
- }
-
- /**
- * Get an FSDataOutputStream to append onto a file.
- * @param file The File you want to append onto
- * @param bufferSize Ceph does internal buffering but you can buffer in the Java code as well if you like.
- * @param progress The Progressable to report progress to.
- * Reporting is limited but exists.
- * @return An FSDataOutputStream that connects to the file on Ceph.
- * @throws IOException If the file cannot be found or appended to.
- */
- public FSDataOutputStream append(Path file, int bufferSize,
- Progressable progress) throws IOException {
- LOG.debug("append:enter with path " + file + " bufferSize " + bufferSize);
- Path abs_path = makeAbsolute(file);
-
- if (progress != null) {
- progress.progress();
- }
- LOG.trace("append: Entering ceph_open_for_append from Java");
- int fd = ceph.ceph_open_for_append(getCephPath(abs_path));
-
- LOG.trace("append: Returned to Java");
- if (progress != null) {
- progress.progress();
- }
- if (fd < 0) { // error in open
- throw new IOException(
- "append: Open for append failed on path \"" + abs_path.toString()
- + "\"");
- }
- CephOutputStream cephOStream = new CephOutputStream(getConf(), ceph, fd,
- bufferSize);
-
- LOG.debug("append:exit");
- return new FSDataOutputStream(cephOStream, statistics);
- }
-
- /**
- * Get the current working directory for the given file system
- * @return the directory Path
- */
- public Path getWorkingDirectory() {
- return workingDir;
- }
-
- /**
- * Set the current working directory for the given file system. All relative
- * paths will be resolved relative to it.
- *
- * @param dir The directory to change to.
- */
- @Override
- public void setWorkingDirectory(Path dir) {
- workingDir = makeAbsolute(dir);
- }
-
- /**
- * Return only the path component from a potentially fully qualified path.
- */
- private String getCephPath(Path path) {
- if (!path.isAbsolute()) {
- throw new IllegalArgumentException("Path must be absolute: " + path);
- }
- return path.toUri().getPath();
- }
-
- /**
- * Check if a path exists.
-   * Overridden because it's moderately faster than the generic implementation.
- * @param path The file to check existence on.
- * @return true if the file exists, false otherwise.
- */
- @Override
- public boolean exists(Path path) throws IOException {
- LOG.debug("exists:enter with path " + path);
- boolean result;
- Path abs_path = makeAbsolute(path);
-
- if (abs_path.equals(root)) {
- result = true;
- } else {
- LOG.trace(
- "exists:Calling ceph_exists from Java on path " + abs_path.toString());
- result = ceph.ceph_exists(getCephPath(abs_path));
- LOG.trace("exists:Returned from ceph_exists to Java");
- }
- LOG.debug("exists:exit with value " + result);
- return result;
- }
-
- /**
- * Create a directory and any nonexistent parents. Any portion
- * of the directory tree can exist without error.
- * @param path The directory path to create
- * @param perms The permissions to apply to the created directories.
- * @return true if successful, false otherwise
- * @throws IOException if the path is a child of a file.
- */
- @Override
- public boolean mkdirs(Path path, FsPermission perms) throws IOException {
- LOG.debug("mkdirs:enter with path " + path);
- Path abs_path = makeAbsolute(path);
-
- LOG.trace("mkdirs:calling ceph_mkdirs from Java");
- int result = ceph.ceph_mkdirs(getCephPath(abs_path), (int) perms.toShort());
-
- if (result != 0) {
- LOG.warn(
-          "mkdirs: make directory " + abs_path + " failing with result " + result);
- if (-ceph.ENOTDIR == result) {
- throw new IOException("Parent path is not a directory");
- }
- return false;
- } else {
-      LOG.debug("mkdirs:exiting successfully");
- return true;
- }
- }
-
- /**
- * Check if a path is a file. This is moderately faster than the
- * generic implementation.
- * @param path The path to check.
- * @return true if the path is definitely a file, false otherwise.
- */
- @Override
- public boolean isFile(Path path) throws IOException {
- LOG.debug("isFile:enter with path " + path);
- Path abs_path = makeAbsolute(path);
- boolean result;
-
- if (abs_path.equals(root)) {
- result = false;
- } else {
- LOG.trace("isFile:entering ceph_isfile from Java");
- result = ceph.ceph_isfile(getCephPath(abs_path));
- }
- LOG.debug("isFile:exit with result " + result);
- return result;
- }
-
- /**
- * Get stat information on a file. This does not fill owner or group, as
- * Ceph's support for these is a bit different than HDFS'.
- * @param path The path to stat.
- * @return FileStatus object containing the stat information.
- * @throws FileNotFoundException if the path could not be resolved.
- */
- public FileStatus getFileStatus(Path path) throws IOException {
- LOG.debug("getFileStatus:enter with path " + path);
- Path abs_path = makeAbsolute(path);
- // sadly, Ceph doesn't really do uids/gids just yet, but
- // everything else is filled
- FileStatus status;
- Stat lstat = new Stat();
-
- LOG.trace("getFileStatus: calling ceph_stat from Java");
- if (ceph.ceph_stat(getCephPath(abs_path), lstat)) {
- status = new FileStatus(lstat.size, lstat.is_dir,
- ceph.ceph_replication(getCephPath(abs_path)), lstat.block_size,
- lstat.mod_time, lstat.access_time,
- new FsPermission((short) lstat.mode), System.getProperty("user.name"), null,
- path.makeQualified(this));
- } else { // fail out
- throw new FileNotFoundException(
- "org.apache.hadoop.fs.ceph.CephFileSystem: File " + path
- + " does not exist or could not be accessed");
- }
-
- LOG.debug("getFileStatus:exit");
- return status;
- }
-
- /**
- * Get the FileStatus for each listing in a directory.
- * @param path The directory to get listings from.
- * @return FileStatus[] containing one FileStatus for each directory listing;
- * null if path does not exist.
- */
- public FileStatus[] listStatus(Path path) throws IOException {
- LOG.debug("listStatus:enter with path " + path);
- Path abs_path = makeAbsolute(path);
- Path[] paths = listPaths(abs_path);
-
- if (paths != null) {
- FileStatus[] statuses = new FileStatus[paths.length];
-
- for (int i = 0; i < paths.length; ++i) {
- statuses[i] = getFileStatus(paths[i]);
- }
- LOG.debug("listStatus:exit");
- return statuses;
- }
-
- if (isFile(path)) {
- return new FileStatus[] { getFileStatus(path) };
- }
-
- return null;
- }
-
- @Override
- public void setPermission(Path p, FsPermission permission) throws IOException {
- LOG.debug(
- "setPermission:enter with path " + p + " and permissions " + permission);
- Path abs_path = makeAbsolute(p);
-
- LOG.trace("setPermission:calling ceph_setpermission from Java");
- ceph.ceph_setPermission(getCephPath(abs_path), permission.toShort());
- LOG.debug("setPermission:exit");
- }
-
- /**
- * Set access/modification times of a file.
- * @param p The path
- * @param mtime Set modification time in number of millis since Jan 1, 1970.
- * @param atime Set access time in number of millis since Jan 1, 1970.
- */
- @Override
- public void setTimes(Path p, long mtime, long atime) throws IOException {
- LOG.debug(
- "setTimes:enter with path " + p + " mtime:" + mtime + " atime:" + atime);
- Path abs_path = makeAbsolute(p);
-
- LOG.trace("setTimes:calling ceph_setTimes from Java");
- int r = ceph.ceph_setTimes(getCephPath(abs_path), mtime, atime);
-
- if (r < 0) {
- throw new IOException(
- "Failed to set times on path " + abs_path.toString() + " Error code: "
- + r);
- }
- LOG.debug("setTimes:exit");
- }
-
- /**
- * Create a new file and open an FSDataOutputStream that's connected to it.
- * @param path The file to create.
- * @param permission The permissions to apply to the file.
- * @param overwrite If true, overwrite any existing file with
- * this name; otherwise don't.
- * @param bufferSize Ceph does internal buffering, but you can buffer
- * in the Java code too if you like.
- * @param replication Ignored by Ceph. This can be
- * configured via Ceph configuration.
- * @param blockSize Ignored by Ceph. You can set client-wide block sizes
- * via the fs.ceph.blockSize param if you like.
- * @param progress A Progressable to report back to.
- * Reporting is limited but exists.
- * @return An FSDataOutputStream pointing to the created file.
- * @throws IOException if the path is an
- * existing directory, or the path exists but overwrite is false, or there is a
- * failure in attempting to open for append with Ceph.
- */
- public FSDataOutputStream create(Path path,
- FsPermission permission,
- boolean overwrite,
- int bufferSize,
- short replication,
- long blockSize,
- Progressable progress) throws IOException {
- LOG.debug("create:enter with path " + path);
- Path abs_path = makeAbsolute(path);
-
- if (progress != null) {
- progress.progress();
- }
- // We ignore replication since that's not configurable here, and
- // progress reporting is quite limited.
- // Required semantics: if the file exists, overwrite if 'overwrite' is set;
- // otherwise, throw an exception
-
- // Step 1: existence test
- boolean exists = exists(abs_path);
-
- if (exists) {
- if (getFileStatus(abs_path).isDir()) {
- throw new IOException(
- "create: Cannot overwrite existing directory \"" + path.toString()
- + "\" with a file");
- }
- if (!overwrite) {
- throw new IOException(
-            "create: Cannot open existing file \"" + abs_path.toString()
- + "\" for writing without overwrite flag");
- }
- }
-
- if (progress != null) {
- progress.progress();
- }
-
- // Step 2: create any nonexistent directories in the path
- if (!exists) {
- Path parent = abs_path.getParent();
-
- if (parent != null) { // if parent is root, we're done
- int r = ceph.ceph_mkdirs(getCephPath(parent), permission.toShort());
-
- if (!(r == 0 || r == -ceph.EEXIST)) {
- throw new IOException("Error creating parent directory; code: " + r);
- }
- }
- if (progress != null) {
- progress.progress();
- }
- }
- // Step 3: open the file
- LOG.trace("calling ceph_open_for_overwrite from Java");
- int fh = ceph.ceph_open_for_overwrite(getCephPath(abs_path),
- (int) permission.toShort());
-
- if (progress != null) {
- progress.progress();
- }
- LOG.trace("Returned from ceph_open_for_overwrite to Java with fh " + fh);
- if (fh < 0) {
- throw new IOException(
- "create: Open for overwrite failed on path \"" + path.toString()
- + "\"");
- }
-
- // Step 4: create the stream
- OutputStream cephOStream = new CephOutputStream(getConf(), ceph, fh,
- bufferSize);
-
- LOG.debug("create:exit");
- return new FSDataOutputStream(cephOStream, statistics);
- }
-
- /**
- * Open a Ceph file and attach the file handle to an FSDataInputStream.
- * @param path The file to open
- * @param bufferSize Ceph does internal buffering; but you can buffer in
- * the Java code too if you like.
- * @return FSDataInputStream reading from the given path.
- * @throws IOException if the path DNE or is a
- * directory, or there is an error getting data to set up the FSDataInputStream.
- */
- public FSDataInputStream open(Path path, int bufferSize) throws IOException {
- LOG.debug("open:enter with path " + path);
- Path abs_path = makeAbsolute(path);
-
- int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
-
- if (fh < 0) { // uh-oh, something's bad!
- if (fh == -ceph.ENOENT) { // well that was a stupid open
- throw new IOException(
- "open: absolute path \"" + abs_path.toString()
- + "\" does not exist");
- } else { // hrm...the file exists but we can't open it :(
- throw new IOException("open: Failed to open file " + abs_path.toString());
- }
- }
-
- if (getFileStatus(abs_path).isDir()) { // yes, it is possible to open Ceph directories
- // but that doesn't mean you should in Hadoop!
- ceph.ceph_close(fh);
- throw new IOException(
- "open: absolute path \"" + abs_path.toString() + "\" is a directory!");
- }
- Stat lstat = new Stat();
-
- LOG.trace("open:calling ceph_stat from Java");
- ceph.ceph_stat(getCephPath(abs_path), lstat);
- LOG.trace("open:returned to Java");
- long size = lstat.size;
-
- if (size < 0) {
- throw new IOException(
- "Failed to get file size for file " + abs_path.toString()
- + " but succeeded in opening file. Something bizarre is going on.");
- }
- FSInputStream cephIStream = new CephInputStream(getConf(), ceph, fh, size,
- bufferSize);
-
- LOG.debug("open:exit");
- return new FSDataInputStream(cephIStream);
- }
-
- /**
- * Rename a file or directory.
- * @param src The current path of the file/directory
- * @param dst The new name for the path.
- * @return true if the rename succeeded, false otherwise.
- */
- @Override
- public boolean rename(Path src, Path dst) throws IOException {
- LOG.debug("rename:enter with src:" + src + " and dest:" + dst);
- Path abs_src = makeAbsolute(src);
- Path abs_dst = makeAbsolute(dst);
-
- LOG.trace("calling ceph_rename from Java");
- boolean result = ceph.ceph_rename(getCephPath(abs_src), getCephPath(abs_dst));
-
- if (!result) {
- boolean isDir = false;
- try {
- isDir = getFileStatus(abs_dst).isDir();
- } catch (FileNotFoundException e) {}
- if (isDir) { // move the srcdir into destdir
- LOG.debug("ceph_rename failed but dst is a directory!");
- Path new_dst = new Path(abs_dst, abs_src.getName());
-
- result = rename(abs_src, new_dst);
- LOG.debug(
- "attempt to move " + abs_src.toString() + " to "
-              + new_dst.toString() + " has result:" + result);
- }
- }
- LOG.debug("rename:exit with result: " + result);
- return result;
- }
-
- /*
- * Attempt to convert an IP into its hostname
- */
- private String[] ips2Hosts(String[] ips) {
- ArrayList<String> hosts = new ArrayList<String>();
- for (String ip : ips) {
- try {
- String host = DNS.reverseDns(InetAddress.getByName(ip), CEPH_NAMESERVER);
- if (host.charAt(host.length()-1) == '.') {
- host = host.substring(0, host.length()-1);
- }
- hosts.add(host); /* append */
- } catch (Exception e) {
- LOG.error("reverseDns ["+ip+"] failed: "+ e);
- }
- }
- return hosts.toArray(new String[hosts.size()]);
- }
-
- /**
- * Get a BlockLocation object for each block in a file.
- *
- * Note that this doesn't include port numbers in the name field as
- * Ceph handles slow/down servers internally. This data should be used
- * only for selecting which servers to run which jobs on.
- *
- * @param file A FileStatus object corresponding to the file you want locations for.
- * @param start The offset of the first part of the file you are interested in.
- * @param len The amount of the file past the offset you are interested in.
- * @return A BlockLocation[] where each object corresponds to a block within
- * the given range.
- */
- @Override
- public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
- Path abs_path = makeAbsolute(file.getPath());
-
- int fh = ceph.ceph_open_for_read(getCephPath(abs_path));
- if (fh < 0) {
- LOG.error("getFileBlockLocations:got error " + fh + ", exiting and returning null!");
- return null;
- }
-
- long blockSize = ceph.ceph_getblocksize(getCephPath(abs_path));
- BlockLocation[] locations = new BlockLocation[(int) Math.ceil(len / (float) blockSize)];
-
- for (int i = 0; i < locations.length; ++i) {
- long offset = start + i * blockSize;
- long blockStart = start + i * blockSize - (start % blockSize);
- String ips[] = ceph.ceph_hosts(fh, offset);
- String hosts[] = ips2Hosts(ips);
- locations[i] = new BlockLocation(null, hosts, blockStart, blockSize);
- LOG.debug("getFileBlockLocations: location[" + i + "]: " + locations[i]);
- }
-
- ceph.ceph_close(fh);
- return locations;
- }
-
- @Deprecated
- public boolean delete(Path path) throws IOException {
- return delete(path, false);
- }
-
- /**
- * Delete the given path, and optionally its children.
- * @param path the path to delete.
- * @param recursive If the path is a non-empty directory and this is false,
- * delete will throw an IOException. If path is a file this is ignored.
- * @return true if the delete succeeded, false otherwise (including if
- * path doesn't exist).
- * @throws IOException if you attempt to non-recursively delete a directory,
- * or you attempt to delete the root directory.
- */
- public boolean delete(Path path, boolean recursive) throws IOException {
- LOG.debug("delete:enter with path " + path + " and recursive=" + recursive);
- Path abs_path = makeAbsolute(path);
-
- // sanity check
- if (abs_path.equals(root)) {
- throw new IOException("Error: deleting the root directory is a Bad Idea.");
- }
- if (!exists(abs_path)) {
- return false;
- }
-
- // if the path is a file, try to delete it.
- if (isFile(abs_path)) {
- LOG.trace("delete:calling ceph_unlink from Java with path " + abs_path);
- boolean result = ceph.ceph_unlink(getCephPath(abs_path));
-
- if (!result) {
- LOG.error(
- "delete: failed to delete file \"" + abs_path.toString() + "\".");
- }
- LOG.debug("delete:exit with success=" + result);
- return result;
- }
-
- /* The path is a directory, so recursively try to delete its contents,
- and then delete the directory. */
- // get the entries; listPaths will remove . and .. for us
- Path[] contents = listPaths(abs_path);
-
- if (contents == null) {
- LOG.error(
- "delete: Failed to read contents of directory \""
- + abs_path.toString() + "\" while trying to delete it, BAILING");
- return false;
- }
- if (!recursive && contents.length > 0) {
- throw new IOException("Directories must be deleted recursively!");
- }
- // delete the entries
- LOG.debug("delete: recursively calling delete on contents of " + abs_path);
- for (Path p : contents) {
- if (!delete(p, true)) {
- LOG.error(
- "delete: Failed to delete file \"" + p.toString()
- + "\" while recursively deleting \"" + abs_path.toString()
- + "\", BAILING");
- return false;
- }
- }
- // if we've come this far it's a now-empty directory, so delete it!
- boolean result = ceph.ceph_rmdir(getCephPath(abs_path));
-
- if (!result) {
- LOG.error(
- "delete: failed to delete \"" + abs_path.toString() + "\", BAILING");
- }
- LOG.debug("delete:exit");
- return result;
- }
-
- /**
- * Returns the default replication value of 1. This may
- * NOT be the actual value, as replication is controlled
- * by a separate Ceph configuration.
- */
- @Override
- public short getDefaultReplication() {
- return 1;
- }
-
- /**
- * Get the default block size.
- * @return the default block size, in bytes, as a long.
- */
- @Override
- public long getDefaultBlockSize() {
- return getConf().getInt("fs.ceph.blockSize", 1 << 26);
- }
-
- /**
- * Adds the working directory to path if path is not already
- * an absolute path. The URI scheme is not removed here. It
- * is removed only when users (e.g. ceph native calls) need
- * the path-only portion.
- */
- private Path makeAbsolute(Path path) {
- if (path.isAbsolute()) {
- return path;
- }
- return new Path(workingDir, path);
- }
-
- private Path[] listPaths(Path path) throws IOException {
- LOG.debug("listPaths:enter with path " + path);
- String dirlist[];
-
- Path abs_path = makeAbsolute(path);
-
- // If it's a directory, get the listing. Otherwise, complain and give up.
- LOG.debug("calling ceph_getdir from Java with path " + abs_path);
- dirlist = ceph.ceph_getdir(getCephPath(abs_path));
- LOG.debug("returning from ceph_getdir to Java");
-
- if (dirlist == null) {
- return null;
- }
-
- // convert the strings to Paths
- Path[] paths = new Path[dirlist.length];
-
- for (int i = 0; i < dirlist.length; ++i) {
- LOG.trace(
- "Raw enumeration of paths in \"" + abs_path.toString() + "\": \""
- + dirlist[i] + "\"");
- // convert each listing to an absolute path
- Path raw_path = new Path(dirlist[i]);
-
- if (raw_path.isAbsolute()) {
- paths[i] = raw_path;
- } else {
- paths[i] = new Path(abs_path, raw_path);
- }
- }
- LOG.debug("listPaths:exit");
- return paths;
- }
-
- static class Stat {
- public long size;
- public boolean is_dir;
- public long block_size;
- public long mod_time;
- public long access_time;
- public int mode;
-
- public Stat() {}
- }
-}
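
The class javadoc above names the Configuration keys CephFileSystem consumed (fs.ceph.monAddr, fs.ceph.libDir, fs.ceph.commandLine, fs.ceph.blockSize, plus the debug knobs). For orientation only, a hypothetical client might have wired it up roughly as follows; the fs.ceph.impl binding, the monitor address, the library directory and the file path are assumptions for the example, not values taken from this tree:

// Hypothetical usage sketch; fs.<scheme>.impl was the usual 0.20-era way to bind
// a URI scheme to a FileSystem class, and every concrete value below is assumed.
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CephFsUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.ceph.impl", "org.apache.hadoop.fs.ceph.CephFileSystem");
    conf.set("fs.ceph.monAddr", "mon1.example.com:6789");  // assumed monitor address
    conf.set("fs.ceph.libDir", "/usr/local/lib");          // where libcephfs.so lives
    conf.setInt("fs.ceph.blockSize", 1 << 26);             // 64 MB, the code's own default

    FileSystem fs = FileSystem.get(URI.create("ceph://null"), conf);
    try {
      FSDataInputStream in = fs.open(new Path("/some/file"));  // assumed path
      byte[] buf = new byte[4096];
      int n = in.read(buf);
      System.out.println("read " + n + " bytes");
      in.close();
    } finally {
      fs.close();  // also shuts down the embedded Ceph client
    }
  }
}

The 1 << 26 block-size fallback matches what the removed initialize() and getDefaultBlockSize() used when fs.ceph.blockSize was left unset.
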
diff --git a/src/client/hadoop/ceph/CephInputStream.java b/src/client/hadoop/ceph/CephInputStream.java
deleted file mode 100644
index d9668d031ba..00000000000
--- a/src/client/hadoop/ceph/CephInputStream.java
+++ /dev/null
@@ -1,254 +0,0 @@
-// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-
-/**
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * Implements the Hadoop FS interfaces to allow applications to store
- * files in Ceph.
- */
-package org.apache.hadoop.fs.ceph;
-
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSInputStream;
-
-
-/**
- * <p>
- * An {@link FSInputStream} for a CephFileSystem and corresponding
- * Ceph instance.
- */
-public class CephInputStream extends FSInputStream {
- private static final Log LOG = LogFactory.getLog(CephInputStream.class);
- private boolean closed;
-
- private int fileHandle;
-
- private long fileLength;
-
- private CephFS ceph;
-
- private byte[] buffer;
- private int bufPos = 0;
- private int bufValid = 0;
- private long cephPos = 0;
-
- /**
- * Create a new CephInputStream.
- * @param conf The system configuration. Unused.
- * @param fh The filehandle provided by Ceph to reference.
- * @param flength The current length of the file. If the length changes
- * you will need to close and re-open it to access the new data.
- */
- public CephInputStream(Configuration conf, CephFS cephfs,
- int fh, long flength, int bufferSize) {
- // Whoever's calling the constructor is responsible for doing the actual ceph_open
- // call and providing the file handle.
- fileLength = flength;
- fileHandle = fh;
- closed = false;
- ceph = cephfs;
- buffer = new byte[bufferSize];
- LOG.debug(
- "CephInputStream constructor: initializing stream with fh " + fh
- + " and file length " + flength);
-
- }
-
- /** Ceph likes things to be closed before it shuts down,
- * so closing the IOStream stuff voluntarily in a finalizer is good
- */
- protected void finalize() throws Throwable {
- try {
- if (!closed) {
- close();
- }
- } finally {
- super.finalize();
- }
- }
-
- private synchronized boolean fillBuffer() throws IOException {
- bufValid = ceph.ceph_read(fileHandle, buffer, 0, buffer.length);
- bufPos = 0;
- if (bufValid < 0) {
- int err = bufValid;
-
- bufValid = 0;
- // attempt to reset to old position. If it fails, too bad.
- ceph.ceph_seek_from_start(fileHandle, cephPos);
- throw new IOException("Failed to fill read buffer! Error code:" + err);
- }
- cephPos += bufValid;
- return (bufValid != 0);
- }
-
- /*
- * Get the current position of the stream.
- */
- public synchronized long getPos() throws IOException {
- return cephPos - bufValid + bufPos;
- }
-
- /**
-   * Return the number of bytes remaining to be read from the current position.
- */
- @Override
- public synchronized int available() throws IOException {
- return (int) (fileLength - getPos());
- }
-
- public synchronized void seek(long targetPos) throws IOException {
- LOG.trace(
- "CephInputStream.seek: Seeking to position " + targetPos + " on fd "
- + fileHandle);
- if (targetPos > fileLength) {
- throw new IOException(
- "CephInputStream.seek: failed seek to position " + targetPos
- + " on fd " + fileHandle + ": Cannot seek after EOF " + fileLength);
- }
- long oldPos = cephPos;
-
- cephPos = ceph.ceph_seek_from_start(fileHandle, targetPos);
- bufValid = 0;
- bufPos = 0;
- if (cephPos < 0) {
- cephPos = oldPos;
- throw new IOException("Ceph failed to seek to new position!");
- }
- }
-
- /**
- * Failovers are handled by the Ceph code at a very low level;
- * if there are issues that can be solved by changing sources
- * they'll be dealt with before anybody even tries to call this method!
- * @return false.
- */
- public synchronized boolean seekToNewSource(long targetPos) {
- return false;
- }
-
- /**
- * Read a byte from the file.
- * @return the next byte.
- */
- @Override
- public synchronized int read() throws IOException {
- LOG.trace(
- "CephInputStream.read: Reading a single byte from fd " + fileHandle
-        + " by calling the general read function");
-
- byte result[] = new byte[1];
-
- if (getPos() >= fileLength) {
- return -1;
- }
- if (-1 == read(result, 0, 1)) {
- return -1;
- }
- if (result[0] < 0) {
- return 256 + (int) result[0];
- } else {
- return result[0];
- }
- }
-
- /**
- * Read a specified number of bytes from the file into a byte[].
-   * @param buf the byte array to read into.
-   * @param off the offset in buf at which to start storing the bytes read.
-   * @param len the number of bytes to read
-   * @return the number of bytes actually read, or -1 if the end of the file has been reached.
- * @throws IOException on bad input.
- */
- @Override
- public synchronized int read(byte buf[], int off, int len)
- throws IOException {
- LOG.trace(
- "CephInputStream.read: Reading " + len + " bytes from fd " + fileHandle);
-
- if (closed) {
- throw new IOException(
- "CephInputStream.read: cannot read " + len + " bytes from fd "
- + fileHandle + ": stream closed");
- }
-
- // ensure we're not past the end of the file
- if (getPos() >= fileLength) {
- LOG.debug(
- "CephInputStream.read: cannot read " + len + " bytes from fd "
- + fileHandle + ": current position is " + getPos()
- + " and file length is " + fileLength);
-
- return -1;
- }
-
- int totalRead = 0;
- int initialLen = len;
- int read;
-
- do {
- read = Math.min(len, bufValid - bufPos);
- try {
- System.arraycopy(buffer, bufPos, buf, off, read);
- } catch (IndexOutOfBoundsException ie) {
- throw new IOException(
- "CephInputStream.read: Indices out of bounds:" + "read length is "
- + len + ", buffer offset is " + off + ", and buffer size is "
- + buf.length);
- } catch (ArrayStoreException ae) {
- throw new IOException(
- "Uh-oh, CephInputStream failed to do an array"
-            + " copy due to type mismatch...");
- } catch (NullPointerException ne) {
- throw new IOException(
-            "CephInputStream.read: cannot read " + len + " bytes from fd "
- + fileHandle + ": buf is null");
- }
- bufPos += read;
- len -= read;
- off += read;
- totalRead += read;
- } while (len > 0 && fillBuffer());
-
- LOG.trace(
- "CephInputStream.read: Reading " + initialLen + " bytes from fd "
- + fileHandle + ": succeeded in reading " + totalRead + " bytes");
- return totalRead;
- }
-
- /**
- * Close the CephInputStream and release the associated filehandle.
- */
- @Override
- public void close() throws IOException {
-    LOG.trace("CephInputStream.close:enter");
- if (!closed) {
- int result = ceph.ceph_close(fileHandle);
-
- closed = true;
- if (result != 0) {
- throw new IOException(
- "Close somehow failed!"
-            + " Don't try to use this stream again.");
- }
-      LOG.trace("CephInputStream.close:exit");
- }
- }
-}
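
The single-byte read() in the removed CephInputStream above maps Java's signed byte back to an unsigned value with its "256 + result[0]" branch. A minimal standalone sketch of that conversion (class and method names here are illustrative, not part of the removed code); masking with 0xff is equivalent:

  // Sketch: unsigned-byte conversion equivalent to CephInputStream.read()'s
  // "return 256 + (int) result[0]" branch for negative bytes.
  class UnsignedByteSketch {
    static int toUnsignedReadResult(byte b) {
      return b & 0xff; // -1..-128 become 255..128; 0..127 are unchanged
    }

    public static void main(String[] args) {
      System.out.println(toUnsignedReadResult((byte) -1)); // prints 255
    }
  }
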
diff --git a/src/client/hadoop/ceph/CephOutputStream.java b/src/client/hadoop/ceph/CephOutputStream.java
deleted file mode 100644
index 4c50f88467d..00000000000
--- a/src/client/hadoop/ceph/CephOutputStream.java
+++ /dev/null
@@ -1,219 +0,0 @@
-// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-
-/**
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * Implements the Hadoop FS interfaces to allow applications to store
- * files in Ceph.
- */
-
-package org.apache.hadoop.fs.ceph;
-
-
-import java.io.IOException;
-import java.io.OutputStream;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Progressable;
-
-
-/**
- * <p>
- * An {@link OutputStream} for a CephFileSystem and corresponding
- * Ceph instance.
- */
-public class CephOutputStream extends OutputStream {
- private static final Log LOG = LogFactory.getLog(CephOutputStream.class);
- private boolean closed;
-
- private CephFS ceph;
-
- private int fileHandle;
-
- private byte[] buffer;
- private int bufUsed = 0;
-
- /**
- * Construct the CephOutputStream.
- * @param conf The FileSystem configuration.
- * @param fh The Ceph filehandle to connect to.
- */
- public CephOutputStream(Configuration conf, CephFS cephfs,
- int fh, int bufferSize) {
- ceph = cephfs;
- fileHandle = fh;
- closed = false;
- buffer = new byte[bufferSize];
- }
-
- /** Ceph likes things to be closed before it shuts down,
-   * so closing the IOStream stuff voluntarily is good
- */
- protected void finalize() throws Throwable {
- try {
- if (!closed) {
- close();
- }
- } finally {
- super.finalize();
- }
- }
-
- /**
- * Get the current position in the file.
- * @return The file offset in bytes.
- */
- public long getPos() throws IOException {
- return ceph.ceph_getpos(fileHandle);
- }
-
- /**
- * Write a byte.
- * @param b The byte to write.
- * @throws IOException If you have closed the CephOutputStream or the
- * write fails.
- */
- @Override
- public synchronized void write(int b) throws IOException {
- LOG.trace(
- "CephOutputStream.write: writing a single byte to fd " + fileHandle);
-
- if (closed) {
- throw new IOException(
- "CephOutputStream.write: cannot write " + "a byte to fd " + fileHandle
- + ": stream closed");
- }
- // Stick the byte in a buffer and write it
- byte buf[] = new byte[1];
-
- buf[0] = (byte) b;
- write(buf, 0, 1);
- return;
- }
-
- /**
- * Write a byte buffer into the Ceph file.
- * @param buf the byte array to write from
-   * @param off the offset in buf from which to start writing.
- * @param len The number of bytes to actually write.
- * @throws IOException if you have closed the CephOutputStream, or
- * if buf is null or off + len > buf.length, or
- * if the write fails due to a Ceph error.
- */
- @Override
- public synchronized void write(byte buf[], int off, int len) throws IOException {
- LOG.trace(
- "CephOutputStream.write: writing " + len + " bytes to fd " + fileHandle);
- // make sure stream is open
- if (closed) {
- throw new IOException(
-          "CephOutputStream.write: cannot write " + len + " bytes to fd "
- + fileHandle + ": stream closed");
- }
-
- int result;
- int write;
-
- while (len > 0) {
- write = Math.min(len, buffer.length - bufUsed);
- try {
- System.arraycopy(buf, off, buffer, bufUsed, write);
- } catch (IndexOutOfBoundsException ie) {
- throw new IOException(
- "CephOutputStream.write: Indices out of bounds: "
- + "write length is " + len + ", buffer offset is " + off
- + ", and buffer size is " + buf.length);
- } catch (ArrayStoreException ae) {
- throw new IOException(
- "Uh-oh, CephOutputStream failed to do an array"
- + " copy due to type mismatch...");
- } catch (NullPointerException ne) {
- throw new IOException(
-            "CephOutputStream.write: cannot write " + len + " bytes to fd "
- + fileHandle + ": buffer is null");
- }
- bufUsed += write;
- len -= write;
- off += write;
- if (bufUsed == buffer.length) {
- result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
- if (result < 0) {
- throw new IOException(
- "CephOutputStream.write: Buffered write of " + bufUsed
- + " bytes failed!");
- }
- if (result != bufUsed) {
- throw new IOException(
- "CephOutputStream.write: Wrote only " + result + " bytes of "
- + bufUsed + " in buffer! Data may be lost or written"
- + " twice to Ceph!");
- }
- bufUsed = 0;
- }
-
- }
- return;
- }
-
- /**
- * Flush the buffered data.
- * @throws IOException if you've closed the stream or the write fails.
- */
- @Override
- public synchronized void flush() throws IOException {
- if (!closed) {
- if (bufUsed == 0) {
- return;
- }
- int result = ceph.ceph_write(fileHandle, buffer, 0, bufUsed);
-
- if (result < 0) {
- throw new IOException(
-            "CephOutputStream.flush: Write of " + bufUsed + " bytes to fd "
- + fileHandle + " failed");
- }
- if (result != bufUsed) {
- throw new IOException(
-            "CephOutputStream.flush: Write of " + bufUsed + " bytes to fd "
-            + fileHandle + " was incomplete: only " + result + " of " + bufUsed
- + " bytes were written.");
- }
- bufUsed = 0;
- return;
- }
- }
-
- /**
- * Close the CephOutputStream.
-   * @throws IOException if the buffered data cannot be flushed or the underlying close fails.
- */
- @Override
- public synchronized void close() throws IOException {
- LOG.trace("CephOutputStream.close:enter");
- if (!closed) {
- flush();
- int result = ceph.ceph_close(fileHandle);
-
- if (result != 0) {
- throw new IOException("Close failed!");
- }
-
- closed = true;
- LOG.trace("CephOutputStream.close:exit");
- }
- }
-}
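
The removed CephOutputStream above copies caller data into an internal buffer and pushes a full buffer down through ceph_write, flushing any remainder on flush()/close(). A minimal sketch of that flush-on-full pattern, with a hypothetical Sink interface standing in for the native ceph_write call (names below are illustrative, not part of the removed code):

  // Sketch: flush-on-full write buffering in the style of the removed CephOutputStream.
  // "Sink" is a hypothetical stand-in for ceph_write(fh, buffer, offset, length).
  import java.io.IOException;

  class BufferedSinkSketch {
    interface Sink { int write(byte[] buf, int off, int len) throws IOException; }

    private final Sink sink;
    private final byte[] buffer;
    private int bufUsed = 0;

    BufferedSinkSketch(Sink sink, int bufferSize) {
      this.sink = sink;
      this.buffer = new byte[bufferSize];
    }

    synchronized void write(byte[] buf, int off, int len) throws IOException {
      while (len > 0) {
        int chunk = Math.min(len, buffer.length - bufUsed);
        System.arraycopy(buf, off, buffer, bufUsed, chunk);
        bufUsed += chunk;
        off += chunk;
        len -= chunk;
        if (bufUsed == buffer.length) {
          flush(); // a full buffer goes down to the sink, as ceph_write was called
        }
      }
    }

    synchronized void flush() throws IOException {
      if (bufUsed == 0) {
        return;
      }
      int written = sink.write(buffer, 0, bufUsed);
      if (written != bufUsed) {
        throw new IOException("short write: " + written + " of " + bufUsed + " bytes");
      }
      bufUsed = 0;
    }
  }
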
diff --git a/src/client/hadoop/ceph/CephTalker.java b/src/client/hadoop/ceph/CephTalker.java
deleted file mode 100644
index 569652fdd0b..00000000000
--- a/src/client/hadoop/ceph/CephTalker.java
+++ /dev/null
@@ -1,91 +0,0 @@
-// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-
-/**
- *
- * Licensed under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- *
- *
- * Wraps a number of native function calls to communicate with the Ceph
- * filesystem.
- */
-package org.apache.hadoop.fs.ceph;
-
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.commons.logging.Log;
-
-
-class CephTalker extends CephFS {
- // JNI doesn't give us any way to store pointers, so use a long.
- // Here we're assuming pointers aren't longer than 8 bytes.
- long cluster;
-
- // we write a constructor so we can load the libraries
- public CephTalker(Configuration conf, Log log) {
- System.load(conf.get("fs.ceph.libDir") + "/libcephfs.so");
- System.load(conf.get("fs.ceph.libDir") + "/libhadoopcephfs.so");
- cluster = 0;
- }
-
- protected native boolean ceph_initializeClient(String arguments, int block_size);
-
- protected native String ceph_getcwd();
-
- protected native boolean ceph_setcwd(String path);
-
- protected native boolean ceph_rmdir(String path);
-
- protected native boolean ceph_unlink(String path);
-
- protected native boolean ceph_rename(String old_path, String new_path);
-
- protected native boolean ceph_exists(String path);
-
- protected native long ceph_getblocksize(String path);
-
- protected native boolean ceph_isdirectory(String path);
-
- protected native boolean ceph_isfile(String path);
-
- protected native String[] ceph_getdir(String path);
-
- protected native int ceph_mkdirs(String path, int mode);
-
- protected native int ceph_open_for_append(String path);
-
- protected native int ceph_open_for_read(String path);
-
- protected native int ceph_open_for_overwrite(String path, int mode);
-
- protected native int ceph_close(int filehandle);
-
- protected native boolean ceph_setPermission(String path, int mode);
-
- protected native boolean ceph_kill_client();
-
- protected native boolean ceph_stat(String path, CephFileSystem.Stat fill);
-
-  protected native int ceph_replication(String path);
-
- protected native String[] ceph_hosts(int fh, long offset);
-
- protected native int ceph_setTimes(String path, long mtime, long atime);
-
- protected native long ceph_getpos(int fh);
-
- protected native int ceph_write(int fh, byte[] buffer, int buffer_offset, int length);
-
- protected native int ceph_read(int fh, byte[] buffer, int buffer_offset, int length);
-
- protected native long ceph_seek_from_start(int fh, long pos);
-}
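
Before any of the native methods above can be called, the removed CephTalker constructor loads libcephfs.so and libhadoopcephfs.so from the directory named by fs.ceph.libDir. A minimal sketch of that loading step (the "/usr/local/lib" fallback is an assumption added for illustration; the removed constructor had no default and relied on the property being set):

  // Sketch: loading the JNI libraries the way the removed CephTalker constructor did.
  // The "/usr/local/lib" default is an assumption, not part of the removed code.
  import org.apache.hadoop.conf.Configuration;

  class CephNativeLoaderSketch {
    static void loadCephNativeLibs(Configuration conf) {
      String libDir = conf.get("fs.ceph.libDir", "/usr/local/lib");
      System.load(libDir + "/libcephfs.so");       // core Ceph client library
      System.load(libDir + "/libhadoopcephfs.so"); // the JNI shim removed by this commit
    }
  }
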
diff --git a/src/client/hadoop/ceph/LICENSE b/src/client/hadoop/ceph/LICENSE
deleted file mode 100644
index 7a0decda573..00000000000
--- a/src/client/hadoop/ceph/LICENSE
+++ /dev/null
@@ -1,4 +0,0 @@
-Unlike the rest of the code in this repository, this
-directory (src/client/hadoop) is licensed under the Apache License 2.0. This
-is for the obvious reason that we want to integrate it into the Apache Hadoop
-project. \ No newline at end of file
diff --git a/src/client/hadoop/ceph/TestCeph.java b/src/client/hadoop/ceph/TestCeph.java
deleted file mode 100644
index e46b0eed3a1..00000000000
--- a/src/client/hadoop/ceph/TestCeph.java
+++ /dev/null
@@ -1,45 +0,0 @@
-// -*- mode:Java; tab-width:2; c-basic-offset:2; indent-tabs-mode:t -*-
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * Unit tests for the CephFileSystem API implementation.
- */
-
-package org.apache.hadoop.fs.ceph;
-
-
-import java.io.IOException;
-import java.net.URI;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystemContractBaseTest;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-
-public class TestCeph extends FileSystemContractBaseTest {
-
- @Override
- protected void setUp() throws IOException {
- Configuration conf = new Configuration();
- CephFaker cephfaker = new CephFaker(conf, FileSystem.LOG);
- CephFileSystem cephfs = new CephFileSystem(cephfaker);
-
- cephfs.initialize(URI.create("ceph://null"), conf);
- fs = cephfs;
- }
-}
diff --git a/src/client/hadoop/ceph/package.html b/src/client/hadoop/ceph/package.html
deleted file mode 100644
index 8167b1dde92..00000000000
--- a/src/client/hadoop/ceph/package.html
+++ /dev/null
@@ -1,101 +0,0 @@
-<html>
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<head></head>
-<body>
-<h1>A client for the Ceph filesystem</h1>
-
-<h3>Introduction</h3>
-
-This page describes how to use <a href="http://ceph.newdream.net">Ceph</a>
-as a backing store with Hadoop. It assumes that you have downloaded
-the Ceph software and installed the necessary binaries as outlined in the Ceph
-documentation.
-
-<h3>Steps</h3>
-<ul>
- <li>In the Hadoop conf directory, edit core-site.xml and add
- the following (with appropriate substitutions). Note that
- different nodes can connect to different monitors in the same cluster
- without issue (the Ceph client will automatically redirect as necessary).
-<pre>
-&lt;property&gt;
- &lt;name&gt;fs.default.name&lt;/name&gt;
- &lt;value&gt;ceph://null&lt;/value&gt;
-&lt;/property&gt;
-
-&lt;property&gt;
- &lt;name&gt;fs.ceph.monAddr&lt;/name&gt;
- &lt;value&gt;&lt;serverIP:port&gt;&lt;/value&gt;
- &lt;description&gt;The location of the Ceph monitor to connect to.
- This should be an IP address, not a hostname or URL.&lt;/description&gt;
-&lt;/property&gt;
-
-&lt;property&gt;
- &lt;name&gt;fs.ceph.libDir&lt;/name&gt;
- &lt;value&gt;/usr/local/lib&lt;/value&gt;
- &lt;description&gt;The directory holding libcephfs.so and libhadoopcephfs.so&lt;/description&gt;
- &lt;/property&gt;
-</pre>
- <li>There are also a number of optional Ceph configuration options.
-<pre>
-&lt;property&gt;
- &lt;name&gt;fs.ceph.blockSize&lt;/name&gt;
- &lt;value&gt;67108864&lt;/value&gt;
- &lt;description&gt;The block size (in bytes), 64 MB by default, that Ceph uses to stripe data internally and reports to Hadoop.&lt;/description&gt;
-&lt;/property&gt;
-
-&lt;property&gt;
- &lt;name&gt;fs.ceph.debug&lt;/name&gt;
- &lt;value&gt;true&lt;/value&gt;
- &lt;description&gt;If true, the Java-based code will print debugging information to standard error. This is useful if attempting to debug a Ceph issue as it puts both outputs in the same place.&lt;/description&gt;
-&lt;/property&gt;
-
-&lt;property&gt;
- &lt;name&gt;fs.ceph.clientDebug&lt;/name&gt;
- &lt;value&gt;1&lt;/value&gt;
- &lt;description&gt;If non-zero, the Ceph client will print debugging information to standard error (a higher number = more debugging).&lt;/description&gt;
-&lt;/property&gt;
-
-&lt;property&gt;
- &lt;name&gt;fs.ceph.messengerDebug&lt;/name&gt;
- &lt;value&gt;1&lt;/value&gt;
- &lt;description&gt;If non-zero, the Ceph messenger will print debugging information to standard error (a higher number = more debugging).&lt;/description&gt;
-&lt;/property&gt;
-
-&lt;property&gt;
- &lt;name&gt;fs.ceph.readahead&lt;/name&gt;
- &lt;value&gt;1&lt;/value&gt;
- &lt;description&gt;Sets the number of object periods to read ahead in prefetching. This should probably be left at the default of 1.&lt;/description&gt;
-&lt;/property&gt;
-
-&lt;property&gt;
- &lt;name&gt;fs.ceph.commandLine&lt;/name&gt;
- &lt;value&gt;a string&lt;/value&gt;
- &lt;description&gt;If you prefer, you may enter any of Ceph's command-line configuration options here and they will get passed to the C client. Note that any filled-in configuration options will override what you put here. <br>
-By default, Ceph performs writes across the network rather than locally. To force local writes, add "set_local_pg" in this property.&lt;/description&gt;
-&lt;/property&gt;
-</pre>
-
- <li>Start up your Ceph instance according to the Ceph documentation.</li>
- <li>Do not use the bin/start-all.sh commands, as they will attempt to start
- up an HDFS instance. Just start whatever systems you need and they will
- automatically make use of the Ceph filesystem once configured as above.</li>
-</body>
-</html>
diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFS.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFS.h
deleted file mode 100644
index 0c07fd56e37..00000000000
--- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFS.h
+++ /dev/null
@@ -1,13 +0,0 @@
-/* DO NOT EDIT THIS FILE - it is machine generated */
-#include <jni.h>
-/* Header for class org_apache_hadoop_fs_ceph_CephFS */
-
-#ifndef _Included_org_apache_hadoop_fs_ceph_CephFS
-#define _Included_org_apache_hadoop_fs_ceph_CephFS
-#ifdef __cplusplus
-extern "C" {
-#endif
-#ifdef __cplusplus
-}
-#endif
-#endif
diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem.h
deleted file mode 100644
index 6f2bc93926c..00000000000
--- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem.h
+++ /dev/null
@@ -1,31 +0,0 @@
-/* DO NOT EDIT THIS FILE - it is machine generated */
-#include <jni.h>
-/* Header for class org_apache_hadoop_fs_ceph_CephFileSystem */
-
-#ifndef _Included_org_apache_hadoop_fs_ceph_CephFileSystem
-#define _Included_org_apache_hadoop_fs_ceph_CephFileSystem
-#ifdef __cplusplus
-extern "C" {
-#endif
-#undef org_apache_hadoop_fs_ceph_CephFileSystem_EEXIST
-#define org_apache_hadoop_fs_ceph_CephFileSystem_EEXIST 17L
-#undef org_apache_hadoop_fs_ceph_CephFileSystem_ENOENT
-#define org_apache_hadoop_fs_ceph_CephFileSystem_ENOENT 2L
-#undef org_apache_hadoop_fs_ceph_CephFileSystem_FATAL
-#define org_apache_hadoop_fs_ceph_CephFileSystem_FATAL 0L
-#undef org_apache_hadoop_fs_ceph_CephFileSystem_ERROR
-#define org_apache_hadoop_fs_ceph_CephFileSystem_ERROR 1L
-#undef org_apache_hadoop_fs_ceph_CephFileSystem_WARN
-#define org_apache_hadoop_fs_ceph_CephFileSystem_WARN 2L
-#undef org_apache_hadoop_fs_ceph_CephFileSystem_INFO
-#define org_apache_hadoop_fs_ceph_CephFileSystem_INFO 3L
-#undef org_apache_hadoop_fs_ceph_CephFileSystem_DEBUG
-#define org_apache_hadoop_fs_ceph_CephFileSystem_DEBUG 4L
-#undef org_apache_hadoop_fs_ceph_CephFileSystem_TRACE
-#define org_apache_hadoop_fs_ceph_CephFileSystem_TRACE 5L
-#undef org_apache_hadoop_fs_ceph_CephFileSystem_NOLOG
-#define org_apache_hadoop_fs_ceph_CephFileSystem_NOLOG 6L
-#ifdef __cplusplus
-}
-#endif
-#endif
diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_CephStat.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_CephStat.h
deleted file mode 100644
index 5ab70f7dc66..00000000000
--- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_CephStat.h
+++ /dev/null
@@ -1,13 +0,0 @@
-/* DO NOT EDIT THIS FILE - it is machine generated */
-#include <jni.h>
-/* Header for class org_apache_hadoop_fs_ceph_CephFileSystem_CephStat */
-
-#ifndef _Included_org_apache_hadoop_fs_ceph_CephFileSystem_CephStat
-#define _Included_org_apache_hadoop_fs_ceph_CephFileSystem_CephStat
-#ifdef __cplusplus
-extern "C" {
-#endif
-#ifdef __cplusplus
-}
-#endif
-#endif
diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_Stat.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_Stat.h
deleted file mode 100644
index e9ade0e4504..00000000000
--- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephFileSystem_Stat.h
+++ /dev/null
@@ -1,13 +0,0 @@
-/* DO NOT EDIT THIS FILE - it is machine generated */
-#include <jni.h>
-/* Header for class org_apache_hadoop_fs_ceph_CephFileSystem_Stat */
-
-#ifndef _Included_org_apache_hadoop_fs_ceph_CephFileSystem_Stat
-#define _Included_org_apache_hadoop_fs_ceph_CephFileSystem_Stat
-#ifdef __cplusplus
-extern "C" {
-#endif
-#ifdef __cplusplus
-}
-#endif
-#endif
diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephInputStream.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephInputStream.h
deleted file mode 100644
index 4ec903294b7..00000000000
--- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephInputStream.h
+++ /dev/null
@@ -1,47 +0,0 @@
-/* DO NOT EDIT THIS FILE - it is machine generated */
-#include <jni.h>
-/* Header for class org_apache_hadoop_fs_ceph_CephInputStream */
-
-#ifndef _Included_org_apache_hadoop_fs_ceph_CephInputStream
-#define _Included_org_apache_hadoop_fs_ceph_CephInputStream
-#ifdef __cplusplus
-extern "C" {
-#endif
-#undef org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE
-#define org_apache_hadoop_fs_ceph_CephInputStream_SKIP_BUFFER_SIZE 2048L
-/*
- * Class: org_apache_hadoop_fs_ceph_CephInputStream
- * Method: ceph_read
- * Signature: (I[BII)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1read
- (JNIEnv *, jobject, jint, jbyteArray, jint, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephInputStream
- * Method: ceph_seek_from_start
- * Signature: (IJ)J
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1seek_1from_1start
- (JNIEnv *, jobject, jint, jlong);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephInputStream
- * Method: ceph_getpos
- * Signature: (I)J
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1getpos
- (JNIEnv *, jobject, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephInputStream
- * Method: ceph_close
- * Signature: (I)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephInputStream_ceph_1close
- (JNIEnv *, jobject, jint);
-
-#ifdef __cplusplus
-}
-#endif
-#endif
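
The generated headers above follow the standard JNI naming rules: package and class names are joined with underscores, and an underscore inside a Java method name is escaped as "_1", which is why ceph_read appears as ..._ceph_1read; a string such as (I[BII)I encodes the argument and return types. A sketch of the kind of Java declarations javah turns into those symbols (a stripped-down stand-in; the exact symbols require the declaring class to be org.apache.hadoop.fs.ceph.CephInputStream):

  // Sketch: native declarations whose javah output matches the header above.
  package org.apache.hadoop.fs.ceph;

  class CephInputStream {
    // (I[BII)I = (int fh, byte[] buffer, int offset, int length) -> int
    protected native int ceph_read(int fh, byte[] buffer, int bufferOffset, int length);
    // (IJ)J    = (int fh, long pos) -> long
    protected native long ceph_seek_from_start(int fh, long pos);
    // (I)J     = (int fh) -> long
    protected native long ceph_getpos(int fh);
    // (I)I     = (int fh) -> int
    protected native int ceph_close(int fh);
  }
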
diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephOutputStream.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephOutputStream.h
deleted file mode 100644
index 676b137c9f9..00000000000
--- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephOutputStream.h
+++ /dev/null
@@ -1,37 +0,0 @@
-/* DO NOT EDIT THIS FILE - it is machine generated */
-#include <jni.h>
-/* Header for class org_apache_hadoop_fs_ceph_CephOutputStream */
-
-#ifndef _Included_org_apache_hadoop_fs_ceph_CephOutputStream
-#define _Included_org_apache_hadoop_fs_ceph_CephOutputStream
-#ifdef __cplusplus
-extern "C" {
-#endif
-/*
- * Class: org_apache_hadoop_fs_ceph_CephOutputStream
- * Method: ceph_getpos
- * Signature: (I)J
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1getpos
- (JNIEnv *, jobject, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephOutputStream
- * Method: ceph_close
- * Signature: (I)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1close
- (JNIEnv *, jobject, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephOutputStream
- * Method: ceph_write
- * Signature: (I[BII)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephOutputStream_ceph_1write
- (JNIEnv *, jobject, jint, jbyteArray, jint, jint);
-
-#ifdef __cplusplus
-}
-#endif
-#endif
diff --git a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephTalker.h b/src/client/hadoop/org_apache_hadoop_fs_ceph_CephTalker.h
deleted file mode 100644
index 55854549b8c..00000000000
--- a/src/client/hadoop/org_apache_hadoop_fs_ceph_CephTalker.h
+++ /dev/null
@@ -1,197 +0,0 @@
-/* DO NOT EDIT THIS FILE - it is machine generated */
-#include <jni.h>
-/* Header for class org_apache_hadoop_fs_ceph_CephTalker */
-
-#ifndef _Included_org_apache_hadoop_fs_ceph_CephTalker
-#define _Included_org_apache_hadoop_fs_ceph_CephTalker
-#ifdef __cplusplus
-extern "C" {
-#endif
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_initializeClient
- * Signature: (Ljava/lang/String;I)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1initializeClient
- (JNIEnv *, jobject, jstring, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_getcwd
- * Signature: ()Ljava/lang/String;
- */
-JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getcwd
- (JNIEnv *, jobject);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_setcwd
- * Signature: (Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setcwd
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_rmdir
- * Signature: (Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rmdir
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_unlink
- * Signature: (Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1unlink
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_rename
- * Signature: (Ljava/lang/String;Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1rename
- (JNIEnv *, jobject, jstring, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_exists
- * Signature: (Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1exists
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_getblocksize
- * Signature: (Ljava/lang/String;)J
- */
-JNIEXPORT jlong JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getblocksize
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_isdirectory
- * Signature: (Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isdirectory
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_isfile
- * Signature: (Ljava/lang/String;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1isfile
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_getdir
- * Signature: (Ljava/lang/String;)[Ljava/lang/String;
- */
-JNIEXPORT jobjectArray JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1getdir
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_mkdirs
- * Signature: (Ljava/lang/String;I)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1mkdirs
- (JNIEnv *, jobject, jstring, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_open_for_append
- * Signature: (Ljava/lang/String;)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1append
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_open_for_read
- * Signature: (Ljava/lang/String;)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1read
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_open_for_overwrite
- * Signature: (Ljava/lang/String;I)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1open_1for_1overwrite
- (JNIEnv *, jobject, jstring, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_close
- * Signature: (I)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1close
- (JNIEnv *, jobject, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_setPermission
- * Signature: (Ljava/lang/String;I)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setPermission
- (JNIEnv *, jobject, jstring, jint);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_kill_client
- * Signature: ()Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1kill_1client
- (JNIEnv *, jobject);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_stat
- * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/Stat;)Z
- */
-JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1stat
- (JNIEnv *, jobject, jstring, jobject);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_statfs
- * Signature: (Ljava/lang/String;Lorg/apache/hadoop/fs/ceph/CephFileSystem/CephStat;)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1statfs
- (JNIEnv *, jobject, jstring, jobject);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_replication
- * Signature: (Ljava/lang/String;)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1replication
- (JNIEnv *, jobject, jstring);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_hosts
- * Signature: (IJ)Ljava/lang/String;
- */
-JNIEXPORT jstring JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1hosts
- (JNIEnv *, jobject, jint, jlong);
-
-/*
- * Class: org_apache_hadoop_fs_ceph_CephTalker
- * Method: ceph_setTimes
- * Signature: (Ljava/lang/String;JJ)I
- */
-JNIEXPORT jint JNICALL Java_org_apache_hadoop_fs_ceph_CephTalker_ceph_1setTimes
- (JNIEnv *, jobject, jstring, jlong, jlong);
-
-#ifdef __cplusplus
-}
-#endif
-#endif
diff --git a/src/common/config_opts.h b/src/common/config_opts.h
index 2fa72d4ce0f..f6283239660 100644
--- a/src/common/config_opts.h
+++ b/src/common/config_opts.h
@@ -91,7 +91,6 @@ SUBSYS(finisher, 1, 1)
SUBSYS(heartbeatmap, 1, 5)
SUBSYS(perfcounter, 1, 5)
SUBSYS(rgw, 1, 5) // log level for the Rados gateway
-SUBSYS(hadoop, 1, 5)
SUBSYS(javaclient, 1, 5)
SUBSYS(asok, 1, 5)
SUBSYS(throttle, 1, 1)
diff --git a/src/test/Makefile.am b/src/test/Makefile.am
index 54390b1b2de..80ec69425ca 100644
--- a/src/test/Makefile.am
+++ b/src/test/Makefile.am
@@ -116,19 +116,6 @@ test_build_libcephfs_CFLAGS = $(AM_CFLAGS)
test_build_libcephfs_CXXFLAGS = $(AM_CXXFLAGS)
bin_DEBUGPROGRAMS += test_build_libcephfs
-if WITH_HADOOPCLIENT
-test_build_libhadoopcephfs_SOURCES = \
- test/buildtest_skeleton.cc \
- $(libhadoopcephfs_la_SOURCES)
-test_build_libhadoopcephfs_LDADD = \
- $(LIBCEPHFS) -lexpat \
- $(PTHREAD_LIBS) $(CRYPTO_LIBS) $(EXTRALIBS)
-test_build_libhadoopcephfs_LDFLAGS = -static-libtool-libs
-test_build_libhadoopcephfs_CFLAGS = $(AM_CFLAGS)
-test_build_libhadoopcephfs_CXXFLAGS = $(AM_CXXFLAGS)
-bin_DEBUGPROGRAMS += test_build_libhadoopcephfs
-endif # WITH_HADOOPCLIENT
-
endif # WITH_BUILD_TESTS