summaryrefslogtreecommitdiff
path: root/src/obsync
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda.sadeh@dreamhost.com>2011-11-22 15:05:45 -0800
committerYehuda Sadeh <yehuda.sadeh@dreamhost.com>2011-11-22 15:06:16 -0800
commitebe5fc60d20f92a0037c53c1e7bd7ae512be3da4 (patch)
tree2c9a3d46d666e89964d69d7572d05b84f0006b2f /src/obsync
parenta859763b1cba844d0d56b861a372e5f63f87c607 (diff)
downloadceph-ebe5fc60d20f92a0037c53c1e7bd7ae512be3da4.tar.gz
obsync: tear out rgw
Diffstat (limited to 'src/obsync')
-rwxr-xr-xsrc/obsync/obsync262
1 files changed, 1 insertions, 261 deletions
diff --git a/src/obsync/obsync b/src/obsync/obsync
index 2d5e2928827..89751494d75 100755
--- a/src/obsync/obsync
+++ b/src/obsync/obsync
@@ -28,8 +28,6 @@ import hashlib
import mimetypes
import os
from StringIO import StringIO
-import rados
-import rgw
import re
import shutil
import string
@@ -45,13 +43,9 @@ global opts
# Translation table mapping users in the source to users in the destination.
global xuser
-# Librgw instance
-global lrgw
-lrgw = None
-
###### Usage #######
USAGE = """
-obsync synchronizes S3, Rados, and local objects. The source and destination
+obsync synchronizes S3 and local objects. The source and destination
can both be local or both remote.
Examples:
@@ -84,22 +78,6 @@ defaults.
obsync (options) [source] [destination]"""
-###### Constants #######
-ACL_XATTR = "rados.acl"
-META_XATTR_PREFIX = "rados.meta."
-CONTENT_TYPE_XATTR = "rados.content_type"
-
-RGW_META_BUCKET_NAME = ".rgw"
-RGW_USERS_UID_BUCKET_NAME = ".users.uid"
-RGW_META_ETAG = "user.rgw.etag"
-RGW_META_PREFIX = "user.x-amz-meta-"
-RGW_META_CONTENT_TYPE = "user.rgw.content_type"
-RGW_META_ACL = "user.rgw.acl"
-
-def vvprint(s):
- if (opts.more_verbose):
- print s
-
###### Exception classes #######
class ObsyncException(Exception):
def __init__(self, ty, e):
@@ -550,15 +528,6 @@ class Store(object):
else:
is_secure = os.environ.has_key("SRC_SECURE")
return S3Store(s3_url, create, akey, skey, is_secure)
- rados_url = strip_prefix("rgw:", url)
- if (rados_url):
- dst_owner = None
- if (is_dst):
- if not os.environ.has_key("DST_OWNER"):
- raise ObsyncArgumentParsingException("You must set \
-DST_OWNER when uploading files to RgwStore.")
- dst_owner = os.environ["DST_OWNER"]
- return RgwStore(rados_url, create, akey, skey, dst_owner)
file_url = strip_prefix("file://", url)
if (file_url):
return FileStore(file_url, create)
@@ -865,235 +834,6 @@ class FileStore(Store):
if (opts.more_verbose):
print "FileStore: removed %s" % obj.name
-###### Rgw store #######
-class RgwStoreIterator(object):
- """RgwStore iterator"""
- def __init__(self, it, rgw_store):
- self.it = it # has type rados.ObjectIterator
- self.rgw_store = rgw_store
- self.prefix = self.rgw_store.key_prefix
- self.prefix_len = len(self.rgw_store.key_prefix)
- def __iter__(self):
- return self
- def next(self):
- rados_obj = None
- while True:
- # This will raise StopIteration when there are no more objects to
- # iterate on
- rados_obj = self.it.next()
- # do the prefixes match?
- if rados_obj.key[:self.prefix_len] == self.prefix:
- break
- ret = self.rgw_store.obsync_obj_from_rgw(rados_obj.key)
- if (ret == None):
- raise ObsyncPermanentException("internal iterator error")
- return ret
-
-class RgwStore(Store):
- def __init__(self, url, create, akey, skey, owner):
- global lrgw
- if (lrgw == None):
- lrgw = rgw.Rgw()
- self.owner = owner
- self.user_exists_cache = {}
- self.users_uid_ioctx = None
- # Parse the rados url
- conf_end = string.find(url, ":")
- if (conf_end == -1):
- raise ObsyncPermanentException("RgwStore URLs are of the form \
-rgw:path/to/ceph/conf:bucket:key_prefix. Failed to find the path to the conf.")
- self.conf_file_path = url[0:conf_end]
- bucket_end = url.find(":", conf_end+1)
- if (bucket_end == -1):
- self.rgw_bucket_name = url[conf_end+1:]
- self.key_prefix = ""
- else:
- self.rgw_bucket_name = url[conf_end+1:bucket_end]
- self.key_prefix = url[bucket_end+1:]
- if (self.rgw_bucket_name == ""):
- raise ObsyncPermanentException("RgwStore URLs are of the form \
-rgw:/path/to/ceph/conf:pool:key_prefix. Failed to find the bucket.")
- if (opts.more_verbose):
- print "self.conf_file_path = '" + self.conf_file_path + "', ",
- print "self.rgw_bucket_name = '" + self.rgw_bucket_name + "' ",
- print "self.key_prefix = '" + self.key_prefix + "'"
- self.rados = rados.Rados()
- self.rados.conf_read_file(self.conf_file_path)
- self.rados.connect()
- if self.owner != None and not self.user_exists(ACL_TYPE_CANON_USER + self.owner):
- raise ObsyncPermanentException("Unknown owner! DST_OWNER=%s" % self.owner)
- if (not self.rados.pool_exists(self.rgw_bucket_name)):
- if (create):
- self.create_rgw_bucket(self.rgw_bucket_name)
- else:
- raise ObsyncPermanentException("NonexistentStore")
- elif self.owner == None:
- # Figure out what owner we should use when creating objects.
- # We use the owner of the destination bucket
- ioctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME)
- try:
- bin_ = ioctx.get_xattr(self.rgw_bucket_name, RGW_META_ACL)
- xml = lrgw.acl_bin2xml(bin_)
- acl = AclPolicy.from_xml(xml)
- self.owner = acl.owner_id
- if (opts.more_verbose):
- print "using owner \"%s\"" % self.owner
- finally:
- ioctx.close()
- self.ioctx = self.rados.open_ioctx(self.rgw_bucket_name)
- Store.__init__(self, "rgw:" + url)
- def create_rgw_bucket(self, rgw_bucket_name):
- global lrgw
- """ Create an rgw bucket named 'rgw_bucket_name' """
- if (self.owner == None):
- raise ObsyncArgumentParsingException("Can't create a bucket \
-without knowing who should own it. Please set DST_OWNER")
- self.rados.create_pool(self.rgw_bucket_name)
- ioctx = None
- try:
- ioctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME)
- ioctx.write(rgw_bucket_name, "", 0)
- print "ioctx.set_xattr(rgw_bucket_name=" + rgw_bucket_name + ", " + \
- "user.rgw.acl=" + self.owner + ")"
- new_bucket_acl = "\
-<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\"> \
-<Owner><ID>%s</ID></Owner><AccessControlList>\
-<Grant><Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" \
-xsi:type=\"CanonicalUser\"><ID>%s</ID> \
-<DisplayName>display-name</DisplayName></Grantee> \
-<Permission>FULL_CONTROL</Permission></Grant>\
-</AccessControlList></AccessControlPolicy>" % (self.owner, self.owner)
- new_bucket_acl_bin = lrgw.acl_xml2bin(new_bucket_acl)
- ioctx.set_xattr(rgw_bucket_name, "user.rgw.acl", new_bucket_acl_bin)
- finally:
- if (ioctx):
- ioctx.close()
- def obsync_obj_from_rgw(self, obj_name):
- """Create an obsync object from a Rados object"""
- try:
- size, tm = self.ioctx.stat(obj_name)
- except rados.ObjectNotFound:
- return None
- md5 = None
- meta = {}
- for k,v in self.ioctx.get_xattrs(obj_name):
- if k == RGW_META_ETAG:
- md5 = v
- elif k == RGW_META_CONTENT_TYPE:
- meta[CONTENT_TYPE_XATTR] = v
- elif k[:len(RGW_META_PREFIX)] == RGW_META_PREFIX:
- meta["rados.meta." + k[len(RGW_META_PREFIX):]] = v
- elif opts.more_verbose:
- print "ignoring unknown xattr " + k
- if (md5 == None):
- raise ObsyncPermanentException("error on object %s: expected to find " + \
- "extended attribute %s" % (obj_name, RGW_META_ETAG))
- if (opts.more_verbose):
- print "meta = " + str(meta)
- return Object(obj_name, md5, size, meta)
- def __str__(self):
- return "rgw:" + self.conf_file_path + ":" + self.rgw_bucket_name
- def get_acl(self, obj):
- global lrgw
- bin_ = None
- try:
- bin_ = self.ioctx.get_xattr(obj.name, RGW_META_ACL)
- except rados.NoData:
- return LocalAcl.get_empty(obj.name)
- xml = lrgw.acl_bin2xml(bin_)
- return LocalAcl.from_xml(obj.name, xml)
- def make_local_copy(self, obj):
- temp_file = None
- temp_file_f = None
- try:
- # read the object from rgw in chunks
- temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
- temp_file_f = open(temp_file.name, 'w')
- off = 0
- while True:
- buf = self.ioctx.read(obj.name, offset = off, length = 8192)
- if (len(buf) == 0):
- break
- temp_file_f.write(buf)
- if (len(buf) < 8192):
- break
- off += 8192
- temp_file_f.close()
- except Exception, e:
- if (temp_file_f):
- temp_file_f.close()
- if (temp_file):
- os.unlink(temp_file.name)
- raise ObsyncTemporaryException(e)
- return LocalCopy(obj.name, temp_file.name, True)
- def all_objects(self):
- it = self.ioctx.list_objects()
- return RgwStoreIterator(it, self)
- def locate_object(self, obj):
- return self.obsync_obj_from_rgw(obj.name)
- def user_exists(self, user):
- if (self.user_exists_cache.has_key(user)):
- return self.user_exists_cache[user]
- if user[:len(ACL_TYPE_CANON_USER)] == ACL_TYPE_CANON_USER:
- if (self.users_uid_ioctx == None):
- # will be closed in __del__
- self.users_uid_ioctx = self.rados.open_ioctx(RGW_USERS_UID_BUCKET_NAME)
- try:
- self.users_uid_ioctx.stat(user[len(ACL_TYPE_CANON_USER):])
- except rados.ObjectNotFound:
- return False
- self.user_exists_cache[user] = True
- return True
- elif user[:len(ACL_TYPE_EMAIL_USER)] == ACL_TYPE_EMAIL_USER:
- raise ObsyncPermanentException("rgw target can't handle email users yet.")
- elif user[:len(ACL_TYPE_GROUP)] == ACL_TYPE_GROUP:
- raise ObsyncPermanentException("rgw target can't handle groups yet.")
- else:
- raise ObsyncPermanentException("can't understand user name %s" % user)
- def upload(self, local_copy, src_acl, obj):
- global lrgw
- if (opts.more_verbose):
- print "RgwStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
- "obj='" + obj.name + "'"
- if (opts.dry_run):
- return
- local_copy_f = open(local_copy.path, 'r')
- off = 0
- while True:
- buf = local_copy_f.read(8192)
- if ((len(buf) == 0) and (off != 0)):
- break
- self.ioctx.write(obj.name, buf, off)
- if (len(buf) < 8192):
- break
- off += 8192
- self.ioctx.set_xattr(obj.name, "user.rgw.etag", obj.md5)
- if (src_acl.acl_policy == None):
- ap = AclPolicy.create_default(self.owner)
- else:
- ap = src_acl.acl_policy
- for user in ap.get_all_users():
- if not self.user_exists(user):
- raise ObsyncPermanentException("You must provide an --xuser entry to translate \
-user %s into something valid for the rgw destination.")
- xml = ap.to_xml()
- bin_ = lrgw.acl_xml2bin(xml)
- self.ioctx.set_xattr(obj.name, "user.rgw.acl", bin_)
- content_type = "application/octet-stream"
- for k,v in obj.meta.items():
- if k == CONTENT_TYPE_XATTR:
- content_type = v
- elif k[:len(META_XATTR_PREFIX)] == META_XATTR_PREFIX:
- self.ioctx.set_xattr(obj.name,
- RGW_META_PREFIX + k[len(META_XATTR_PREFIX):], v)
- self.ioctx.set_xattr(obj.name, "user.rgw.content_type", content_type)
- def remove(self, obj):
- if (opts.dry_run):
- return
- self.ioctx.remove_object(obj.name)
- if (opts.more_verbose):
- print "RgwStore: removed %s" % obj.name
-
###### Functions #######
def delete_unreferenced(src, dst):
""" delete everything from dst that is not referenced in src """