author    Sage Weil <sage@newdream.net>    2011-05-16 14:28:33 -0700
committer Sage Weil <sage@newdream.net>    2011-05-17 14:26:30 -0700
commit    28e175d640902cad2df77e3cabc0475584f801f2 (patch)
tree      afdeddde0f9e03084dc5b06617eb9b040f225007 /src/obsync
parent    6d56c20f4cfacf8847facf83ca49f6df5d7f9f39 (diff)
download  ceph-28e175d640902cad2df77e3cabc0475584f801f2.tar.gz
debian: obsync
Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'src/obsync')
-rwxr-xr-x  src/obsync/boto_tool   287
-rwxr-xr-x  src/obsync/obsync     1013
2 files changed, 1300 insertions, 0 deletions
diff --git a/src/obsync/boto_tool b/src/obsync/boto_tool
new file mode 100755
index 00000000000..e60a040fea3
--- /dev/null
+++ b/src/obsync/boto_tool
@@ -0,0 +1,287 @@
+#!/usr/bin/python
+
+#
+# Ceph - scalable distributed file system
+#
+# Copyright (C) 2011 New Dream Network
+#
+# This is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software
+# Foundation. See file COPYING.
+#
+
+"""
+boto_tool.py: an s3cmd-like tool for operating on S3
+
+A lot of common S3 clients can't handle weird object names, but this
+little script can.
+"""
+
+from boto.s3.connection import OrdinaryCallingFormat
+from boto.s3.connection import S3Connection
+from boto.s3.key import Key
+from optparse import OptionParser
+from sys import stderr
+import boto
+import os
+import string
+import sys
+
+global conn
+
+def getenv(a):
+ if os.environ.has_key(a):
+ return os.environ[a]
+ else:
+ return None
+
+def strip_prefix(prefix, s):
+ if not (s[0:len(prefix)] == prefix):
+ return None
+ return s[len(prefix):]
+
+def list_all_buckets(args):
+ parser = OptionParser()
+ parser.add_option("-v", "--verbose", action="store_true",
+ dest="verbose", default=False, help="verbose output")
+ (opts, args) = parser.parse_args(args)
+ blrs = conn.get_all_buckets()
+ for b in blrs:
+ if (opts.verbose):
+ print b.__dict__
+ else:
+ print b
+
+def mkbucket(args):
+ if (len(args) < 1):
+ print "must give an argument to mkbucket"
+ return 255
+ bucket_name = args[0]
+ print "creating bucket '%s' ..." % bucket_name
+ bucket = conn.create_bucket(bucket_name)
+ print "done."
+ return 0
+
+def rmbucket(args):
+ if (len(args) < 1):
+ print "must give an argument to rmbucket"
+ return 255
+ bucket = conn.get_bucket(args[0])
+ print "deleting bucket '%s' ..." % args[0]
+ bucket.delete()
+ print "done."
+ return 0
+
+def bucket_exists(args):
+ if (len(args) < 1):
+ print "must give an argument to exists"
+ return 255
+ # conn.lookup returns None when the bucket is missing, rather than
+ # raising an exception like conn.get_bucket
+ bucket = conn.lookup(args[0])
+ if (bucket == None):
+ print "bucket '%s' does not exist" % args[0]
+ return 1
+ else:
+ print "found bucket '%s'." % args[0]
+ return 0
+
+def put_obj(bucket_name, args):
+ parser = OptionParser()
+ parser.add_option("-f", "--filename", dest="filename",
+ help="file name to upload (stdin not yet supported)")
+ (opts, args) = parser.parse_args(args)
+ if (len(args) < 1):
+ print "put requires an argument: the object name"
+ return 255
+ obj_name = args[0]
+ print "uploading to bucket: '%s', object name: '%s'" % (bucket_name, obj_name)
+ bucket = conn.get_bucket(bucket_name)
+ k = Key(bucket)
+ k.key = obj_name
+ if (opts.filename == None):
+ print "sorry, no support for put-from-stdin yet. use -f"
+ return 255
+ else:
+ k.set_contents_from_filename(opts.filename)
+
+def put_obj_acl(bucket_name, args):
+ parser = OptionParser()
+ parser.add_option("-f", "--filename", dest="filename",
+ help="file name to upload (stdin not yet supported)")
+ (opts, args) = parser.parse_args(args)
+ if (len(args) < 1):
+ print "put_acl requires an argument: the object name"
+ return 255
+ obj_name = args[0]
+ print "uploading object ACL to bucket: '%s', object name: '%s'" \
+ % (bucket_name, obj_name)
+ bucket = conn.get_bucket(bucket_name)
+ k = Key(bucket)
+ k.key = obj_name
+ if (opts.filename == None):
+ print "sorry, no support for put-from-stdin yet. use -f"
+ return 255
+ else:
+ f = open(opts.filename, "r")
+ try:
+ xml = f.read()
+ finally:
+ f.close()
+ k.set_xml_acl(xml)
+
+def get_obj(bucket_name, args):
+ parser = OptionParser()
+ parser.add_option("-f", "--filename", dest="filename",
+ help="file name (default stdout)")
+ (opts, args) = parser.parse_args(args)
+ if (len(args) < 1):
+ print "get requires an argument: the object name"
+ return 255
+ obj_name = args[0]
+ print "downloading from bucket: '%s', object name: '%s'" % (bucket_name, obj_name)
+ bucket = conn.get_bucket(bucket_name)
+ k = Key(bucket)
+ k.key = obj_name
+ if (opts.filename == None):
+ k.get_contents_to_file(sys.stdout)
+ else:
+ k.get_contents_to_filename(opts.filename)
+
+def get_obj_acl(bucket_name, args):
+ parser = OptionParser()
+ parser.add_option("-f", "--filename", dest="filename",
+ help="file name (default stdout)")
+ (opts, args) = parser.parse_args(args)
+ if (len(args) < 1):
+ print "get_acl requires an argument: the object name to get the acl for"
+ return 255
+ obj_name = args[0]
+ print "downloading object acl from bucket: '%s', object name: '%s'" % \
+ (bucket_name, obj_name)
+ bucket = conn.get_bucket(bucket_name)
+ k = Key(bucket)
+ k.key = obj_name
+ xml = k.get_xml_acl()
+ if (opts.filename == None):
+ print xml
+ else:
+ f = open(opts.filename, "w")
+ try:
+ f.write(xml)
+ finally:
+ f.close()
+
+def list_obj(bucket_name, args):
+ if (len(args) < 1):
+ prefix = None
+ else:
+ prefix = args[0]
+ bucket = conn.get_bucket(bucket_name)
+ for key in bucket.list(prefix = prefix):
+ print key.name
+
+def rm_obj(bucket_name, args):
+ if (len(args) < 1):
+ obj_name = None
+ else:
+ obj_name = args[0]
+ print "removing from bucket: '%s', object name: '%s'" % (bucket_name, obj_name)
+ bucket = conn.get_bucket(bucket_name)
+ bucket.delete_key(obj_name)
+ print "done."
+
+def head_obj(bucket_name, args):
+ if (len(args) < 1):
+ print "head requires an argument: the object name"
+ return 255
+ obj_name = args[0]
+ print "fetching headers from bucket: '%s', object name: '%s'" % (bucket_name, obj_name)
+ bucket = conn.get_bucket(bucket_name)
+ k = bucket.get_key(obj_name)
+ print k
+
+def usage():
+ print """
+boto_tool.py
+ ./boto_tool.py -h
+ ./boto_tool.py --help
+ Show this help
+ ./boto_tool.py <host> ls
+ Lists all buckets in a host
+ ./boto_tool.py <host> <bucket> ls
+ Lists all objects in a bucket
+ ./boto_tool.py <host> <bucket> ls <prefix>
+ Lists all objects in a bucket that have a given prefix
+ ./boto_tool.py <host> mkbucket <bucket>
+ Create a new bucket
+ ./boto_tool.py <host> rmbucket <bucket>
+ Remove a bucket
+ ./boto_tool.py <host> exists <bucket>
+ Tests if a bucket exists
+ ./boto_tool.py <host> <bucket> put <object> [opts]
+ Upload an object
+ opts:
+ -f filename file name (stdin not yet supported)
+ ./boto_tool.py <host> <bucket> get <object> [opts]
+ Gets an object
+ opts:
+ -f filename file name (default stdout)
+./boto_tool.py <host> <bucket> putacl <object> -f <filename>
+ Sets an object's ACL XML from a file
+./boto_tool.py <host> <bucket> getacl <object> [opts]
+ Gets an object's ACL XML
+ opts:
+ -f filename file name (default stdout)
+./boto_tool.py <host> <bucket> rm <object>
+ Removes an object
+./boto_tool.py <host> <bucket> head <object>
+ Gets the headers of an object
+"""
+
+if (len(sys.argv) < 3):
+ usage()
+ sys.exit(255)
+
+if (sys.argv[1] == "-h") or (sys.argv[1] == "--help"):
+ usage()
+ sys.exit(0)
+
+host = sys.argv[1]
+
+conn = S3Connection(calling_format=OrdinaryCallingFormat(), is_secure=False,
+ host = host,
+ aws_access_key_id=getenv("AKEY"),
+ aws_secret_access_key=getenv("SKEY"))
+
+if (sys.argv[2] == "ls"):
+ sys.exit(list_all_buckets(sys.argv[3:]))
+elif (sys.argv[2] == "mkbucket"):
+ sys.exit(mkbucket(sys.argv[3:]))
+elif (sys.argv[2] == "rmbucket"):
+ sys.exit(rmbucket(sys.argv[3:]))
+elif (sys.argv[2] == "exists"):
+ sys.exit(bucket_exists(sys.argv[3:]))
+else:
+ # bucket operations
+ wb = strip_prefix("bucket:", sys.argv[2])
+ if (wb):
+ bucket_name = wb
+ else:
+ bucket_name = sys.argv[2]
+ if (len(sys.argv) < 4):
+ print "too few arguments. -h for help."
+ sys.exit(255)
+ if (sys.argv[3] == "put"):
+ sys.exit(put_obj(bucket_name, sys.argv[4:]))
+ elif (sys.argv[3] == "putacl"):
+ sys.exit(put_obj_acl(bucket_name, sys.argv[4:]))
+ elif (sys.argv[3] == "get"):
+ sys.exit(get_obj(bucket_name, sys.argv[4:]))
+ elif (sys.argv[3] == "getacl"):
+ sys.exit(get_obj_acl(bucket_name, sys.argv[4:]))
+ elif (sys.argv[3] == "ls"):
+ sys.exit(list_obj(bucket_name, sys.argv[4:]))
+ elif (sys.argv[3] == "rm"):
+ sys.exit(rm_obj(bucket_name, sys.argv[4:]))
+ elif (sys.argv[3] == "head"):
+ sys.exit(head_obj(bucket_name, sys.argv[4:]))
+ else:
+ print "unknown operation on bucket"
+ sys.exit(255)
+
diff --git a/src/obsync/obsync b/src/obsync/obsync
new file mode 100755
index 00000000000..44abf46b5e2
--- /dev/null
+++ b/src/obsync/obsync
@@ -0,0 +1,1013 @@
+#!/usr/bin/env python
+
+#
+# Ceph - scalable distributed file system
+#
+# Copyright (C) 2011 New Dream Network
+#
+# This is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1, as published by the Free Software
+# Foundation. See file COPYING.
+#
+
+"""
+obsync.py: the object synchronizer
+"""
+
+from boto.s3.connection import OrdinaryCallingFormat
+from boto.s3.connection import S3Connection
+from boto.s3.key import Key
+from optparse import OptionParser
+from sys import stderr
+from lxml import etree
+import base64
+import boto
+import errno
+import hashlib
+import mimetypes
+import os
+from StringIO import StringIO
+import rados
+import re
+import shutil
+import string
+import sys
+import tempfile
+import traceback
+
+global opts
+global xuser
+
+###### Constants #######
+RGW_META_BUCKET_NAME = ".rgw"
+
+###### Exception classes #######
+class LocalFileIsAcl(Exception):
+ pass
+
+class InvalidLocalName(Exception):
+ pass
+
+class NonexistentStore(Exception):
+ pass
+
+###### Helper functions #######
+def mkdir_p(path):
+ try:
+ os.makedirs(path)
+ except OSError, exc:
+ if exc.errno != errno.EEXIST:
+ raise
+ if (not os.path.isdir(path)):
+ raise
+
+def bytes_to_str(b):
+ return ''.join(["%02x"% ord(x) for x in b]).strip()
+
+def get_md5(f, block_size=2**20):
+ md5 = hashlib.md5()
+ while True:
+ data = f.read(block_size)
+ if not data:
+ break
+ md5.update(data)
+ return "%s" % md5.hexdigest()
+
+def strip_prefix(prefix, s):
+ if not (s[0:len(prefix)] == prefix):
+ return None
+ return s[len(prefix):]
+
+def etag_to_md5(etag):
+ if (etag[:1] == '"'):
+ start = 1
+ else:
+ start = 0
+ if (etag[-1:] == '"'):
+ end = -1
+ else:
+ end = None
+ return etag[start:end]
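+
+# Illustrative examples (S3 wraps ETags in double quotes; the hash below is
+# the well-known MD5 of the empty string):
+#
+#   >>> etag_to_md5('"d41d8cd98f00b204e9800998ecf8427e"')
+#   'd41d8cd98f00b204e9800998ecf8427e'
+#   >>> etag_to_md5('d41d8cd98f00b204e9800998ecf8427e')
+#   'd41d8cd98f00b204e9800998ecf8427e'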
+
+def getenv(a, b):
+ if os.environ.has_key(a):
+ return os.environ[a]
+ elif b and os.environ.has_key(b):
+ return os.environ[b]
+ else:
+ return None
+
+# Escaping functions.
+#
+# Valid names for local files are a little different than valid object
+# names for S3. So these functions are needed to translate.
+#
+# Basically, in local names, every sequence starting with a dollar sign is
+# reserved as a special escape sequence. If you want to create an S3 object
+# with a dollar sign in the name, the local file should have a double dollar
+# sign ($$).
+#
+# TODO: translate local files' control characters into escape sequences.
+# Most S3 clients (boto included) cannot handle control characters in S3 object
+# names.
+# TODO: check for invalid utf-8 in local file names. Ideally, escape it, but
+# if not, just reject the local file name. S3 object names must be valid
+# utf-8.
+#
+# ---------- -----------
+# In S3 Locally
+# ---------- -----------
+# foo/ foo$slash
+#
+# $money $$money
+#
+# obj-with-acl obj-with-acl
+# .obj-with-acl$acl
+def s3_name_to_local_name(s3_name):
+ s3_name = re.sub(r'\$', "$$", s3_name)
+ if (s3_name[-1:] == "/"):
+ s3_name = s3_name[:-1] + "$slash"
+ return s3_name
+
+def local_name_to_s3_name(local_name):
+ if local_name.find(r'$acl') != -1:
+ raise LocalFileIsAcl()
+ local_name = re.sub(r'\$slash', "/", local_name)
+ mre = re.compile("[$][^$]")
+ if mre.match(local_name):
+ raise InvalidLocalName("Local name contains a dollar sign escape \
+sequence we don't understand.")
+ local_name = re.sub(r'\$\$', "$", local_name)
+ return local_name
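+
+# Round-trip sketch for the two helpers above (doctest-style, illustrative):
+#
+#   >>> s3_name_to_local_name("foo/")
+#   'foo$slash'
+#   >>> s3_name_to_local_name("$money")
+#   '$$money'
+#   >>> local_name_to_s3_name("foo$slash")
+#   'foo/'
+#   >>> local_name_to_s3_name("$$money")
+#   '$money'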
+
+def get_local_acl_file_name(local_name):
+ if local_name.find(r'$acl') != -1:
+ raise LocalFileIsAcl()
+ return os.path.dirname(local_name) + "/." + \
+ os.path.basename(local_name) + "$acl"
+
+###### ACLs #######
+
+# for buckets: allow list
+# for object: allow grantee to read object data and metadata
+READ = 1
+
+# for buckets: allow create, overwrite, or deletion of any object in the bucket
+WRITE = 2
+
+# for buckets: allow grantee to read the bucket ACL
+# for objects: allow grantee to read the object ACL
+READ_ACP = 4
+
+# for buckets: allow grantee to write the bucket ACL
+# for objects: allow grantee to write the object ACL
+WRITE_ACP = 8
+
+# all of the above
+FULL_CONTROL = READ | WRITE | READ_ACP | WRITE_ACP
+
+ACL_TYPE_CANON_USER = "canon:"
+ACL_TYPE_EMAIL_USER = "email:"
+ACL_TYPE_GROUP = "group:"
+ALL_ACL_TYPES = [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]
+
+S3_GROUP_AUTH_USERS = ACL_TYPE_GROUP + "AuthenticatedUsers"
+S3_GROUP_ALL_USERS = ACL_TYPE_GROUP + "AllUsers"
+S3_GROUP_LOG_DELIVERY = ACL_TYPE_GROUP + "LogDelivery"
+
+NS = "http://s3.amazonaws.com/doc/2006-03-01/"
+NS2 = "http://www.w3.org/2001/XMLSchema-instance"
+
+def get_user_type(utype):
+ for ut in [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]:
+ if utype[:len(ut)] == ut:
+ return ut
+ raise Exception("unknown user type for user %s" % utype)
+
+def strip_user_type(utype):
+ for ut in [ ACL_TYPE_CANON_USER, ACL_TYPE_EMAIL_USER, ACL_TYPE_GROUP ]:
+ if utype[:len(ut)] == ut:
+ return utype[len(ut):]
+ raise Exception("unknown user type for user %s" % utype)
+
+def grantee_attribute_to_user_type(utype):
+ if (utype == "Canonical User"):
+ return ACL_TYPE_CANON_USER
+ elif (utype == "CanonicalUser"):
+ return ACL_TYPE_CANON_USER
+ elif (utype == "Group"):
+ return ACL_TYPE_GROUP
+ elif (utype == "Email User"):
+ return ACL_TYPE_EMAIL_USER
+ elif (utype == "EmailUser"):
+ return ACL_TYPE_EMAIL_USER
+ else:
+ raise Exception("unknown user type for user %s" % utype)
+
+def user_type_to_attr(t):
+ if (t == ACL_TYPE_CANON_USER):
+ return "CanonicalUser"
+ elif (t == ACL_TYPE_GROUP):
+ return "Group"
+ elif (t == ACL_TYPE_EMAIL_USER):
+ return "EmailUser"
+ else:
+ raise Exception("unknown user type %s" % t)
+
+def add_user_type(user):
+ """ All users that are not specifically marked as something else
+are treated as canonical users"""
+ for atype in ALL_ACL_TYPES:
+ if (user[:len(atype)] == atype):
+ return user
+ return ACL_TYPE_CANON_USER + user
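+
+# For example (illustrative): add_user_type("bob") yields "canon:bob", while
+# add_user_type("group:AllUsers") comes back unchanged because the name
+# already carries a type prefix.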
+
+class AclGrant(object):
+ def __init__(self, user_id, display_name, permission):
+ self.user_id = user_id
+ self.display_name = display_name
+ self.permission = permission
+ def translate_users(self, xusers):
+ # Keep in mind that xusers contains user_ids of the form "type:value"
+ # So typical contents might be like { "canon:XYZ" => "canon:123" }
+ if (xusers.has_key(self.user_id)):
+ self.user_id = xusers[self.user_id]
+ # It's not clear what the new pretty-name should be, so just leave it blank.
+ self.display_name = ""
+ def equals(self, rhs):
+ if (self.user_id != rhs.user_id):
+ return False
+ if (self.permission != rhs.permission):
+ return False
+ # ignore display_name
+ return True
+
+class AclPolicy(object):
+ def __init__(self, owner_id, owner_display_name, grants):
+ self.owner_id = owner_id
+ self.owner_display_name = owner_display_name
+ self.grants = grants # dict of { string -> ACLGrant }
+ @staticmethod
+ def from_xml(s):
+ root = etree.parse(StringIO(s))
+ owner_id_node = root.find("{%s}Owner/{%s}ID" % (NS,NS))
+ owner_id = owner_id_node.text
+ owner_display_name_node = root.find("{%s}Owner/{%s}DisplayName" \
+ % (NS,NS))
+ if (owner_display_name_node != None):
+ owner_display_name = owner_display_name_node.text
+ else:
+ owner_display_name = None
+ grantlist = root.findall("{%s}AccessControlList/{%s}Grant" \
+ % (NS,NS))
+ grants = { }
+ for g in grantlist:
+ grantee = g.find("{%s}Grantee" % NS)
+ user_id = grantee.find("{%s}ID" % NS).text
+ if (grantee.attrib.has_key("type")):
+ user_type = grantee.attrib["type"]
+ else:
+ user_type = grantee.attrib["{%s}type" % NS2]
+ display_name = grantee.find("{%s}DisplayName" % NS).text
+ permission = g.find("{%s}Permission" % NS).text
+ grant_user_id = grantee_attribute_to_user_type(user_type) + user_id
+ grants[grant_user_id] = AclGrant(grant_user_id, display_name, permission)
+ return AclPolicy(owner_id, owner_display_name, grants)
+ def to_xml(self):
+ root = etree.Element("AccessControlPolicy", nsmap={None: NS})
+ owner = etree.SubElement(root, "Owner")
+ id_elem = etree.SubElement(owner, "ID")
+ id_elem.text = self.owner_id
+ if (self.owner_display_name and self.owner_display_name != ""):
+ display_name_elem = etree.SubElement(owner, "DisplayName")
+ display_name_elem.text = self.owner_display_name
+ access_control_list = etree.SubElement(root, "AccessControlList")
+ for k,g in self.grants.items():
+ grant_elem = etree.SubElement(access_control_list, "Grant")
+ grantee_elem = etree.SubElement(grant_elem, "{%s}Grantee" % NS,
+ nsmap={None: NS, "xsi" : NS2})
+ grantee_elem.set("{%s}type" % NS2, user_type_to_attr(get_user_type(g.user_id)))
+ user_id_elem = etree.SubElement(grantee_elem, "{%s}ID" % NS)
+ user_id_elem.text = strip_user_type(g.user_id)
+ display_name_elem = etree.SubElement(grantee_elem, "{%s}DisplayName" % NS)
+ display_name_elem.text = g.display_name
+ permission_elem = etree.SubElement(grant_elem, "{%s}Permission" % NS)
+ permission_elem.text = g.permission
+ return etree.tostring(root, encoding="UTF-8")
+ def translate_users(self, xusers):
+ # Owner ids are always expressed in terms of canonical user id
+ if (xusers.has_key(ACL_TYPE_CANON_USER + self.owner_id)):
+ self.owner_id = \
+ strip_user_type(xusers[ACL_TYPE_CANON_USER + self.owner_id])
+ self.owner_display_name = ""
+ for k,g in self.grants.items():
+ g.translate_users(xusers)
+ def set_owner(self, owner_id):
+ self.owner_id = owner_id
+ self.owner_display_name = ""
+ def equals(self, rhs):
+ if (self.owner_id != rhs.owner_id):
+ return False
+ for k,g in self.grants.items():
+ if (not rhs.grants.has_key(k)):
+ return False
+ if (not g.equals(rhs.grants[k])):
+ return False
+ for l,r in rhs.grants.items():
+ if (not self.grants.has_key(l)):
+ return False
+ if (not r.equals(self.grants[l])):
+ return False
+ return True
+
+def compare_xml(xml1, xml2):
+ tree1 = etree.parse(StringIO(xml1))
+ out1 = etree.tostring(tree1, encoding="UTF-8")
+ tree2 = etree.parse(StringIO(xml2))
+ out2 = etree.tostring(tree2, encoding="UTF-8")
+ out1 = out1.replace("xsi:type", "type")
+ out2 = out2.replace("xsi:type", "type")
+ if out1 != out2:
+ print "out1 = %s" % out1
+ print "out2 = %s" % out2
+ raise Exception("compare xml failed")
+
+# (test1_xml below omits the usual <?xml version="1.0" encoding="UTF-8"?> header)
+def test_acl_policy():
+ test1_xml = \
+"<AccessControlPolicy xmlns=\"http://s3.amazonaws.com/doc/2006-03-01/\">" + \
+"<Owner><ID>foo</ID><DisplayName>MrFoo</DisplayName></Owner><AccessControlList>" + \
+"<Grant><Grantee xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" " + \
+"xsi:type=\"CanonicalUser\"><ID>*** Owner-Canonical-User-ID ***</ID>" + \
+"<DisplayName>display-name</DisplayName></Grantee>" + \
+"<Permission>FULL_CONTROL</Permission></Grant></AccessControlList></AccessControlPolicy>"
+ test1 = AclPolicy.from_xml(test1_xml)
+ compare_xml(test1_xml, test1.to_xml())
+
+###### Object #######
+class Object(object):
+ def __init__(self, name, md5, size):
+ self.name = name
+ self.md5 = md5
+ self.size = int(size)
+ def equals(self, rhs):
+ if (self.name != rhs.name):
+ return False
+ if (self.md5 != rhs.md5):
+ return False
+ if (self.size != rhs.size):
+ return False
+ return True
+ def local_name(self):
+ return s3_name_to_local_name(self.name)
+ @staticmethod
+ def from_file(obj_name, path):
+ f = open(path, 'r')
+ try:
+ md5 = get_md5(f)
+ finally:
+ f.close()
+ size = os.path.getsize(path)
+ #print "Object.from_file: path="+path+",md5=" + bytes_to_str(md5) +",size=" + str(size)
+ return Object(obj_name, md5, size)
+
+###### Store #######
+class Store(object):
+ @staticmethod
+ def make_store(url, create, akey, skey):
+ s3_url = strip_prefix("s3://", url)
+ if (s3_url):
+ return S3Store(s3_url, create, akey, skey)
+ rados_url = strip_prefix("rados:", url)
+ if (rados_url):
+ return RadosStore(rados_url, create, akey, skey)
+ file_url = strip_prefix("file://", url)
+ if (file_url):
+ return FileStore(file_url, create)
+ if (url[0:1] == "/"):
+ return FileStore(url, create)
+ if (url[0:2] == "./"):
+ return FileStore(url, create)
+ raise Exception("Failed to find a prefix of s3://, rados:, file://, /, or ./ \
+Cannot handle this URL.")
+ def __init__(self, url):
+ self.url = url
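+
+# Illustrative make_store calls (host, pool, and path names here are made up;
+# akey/skey stand for whatever credentials the caller fetched from the
+# environment):
+#
+#   Store.make_store("s3://myhost/mybucket", False, akey, skey)                # -> S3Store
+#   Store.make_store("rados:/etc/ceph/ceph.conf:mybucket:", False, None, None) # -> RadosStore
+#   Store.make_store("./mydir", True, None, None)                              # -> FileStore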
+
+###### LocalCopy ######
+class LocalCopy(object):
+ def __init__(self, obj_name, path, path_is_temp):
+ self.obj_name = obj_name
+ self.path = path
+ self.path_is_temp = path_is_temp
+ def remove(self):
+ if ((self.path_is_temp == True) and (self.path != None)):
+ os.unlink(self.path)
+ self.path = None
+ self.path_is_temp = False
+ def __del__(self):
+ self.remove()
+
+class LocalAcl(object):
+ @staticmethod
+ def from_file(obj_name, file_name):
+ f = open(file_name, "r")
+ try:
+ xml = f.read()
+ finally:
+ f.close()
+ return LocalAcl.from_xml(obj_name, xml)
+ @staticmethod
+ def from_xml(obj_name, xml):
+ acl_policy = AclPolicy.from_xml(xml)
+ return LocalAcl(obj_name, acl_policy)
+ @staticmethod
+ def get_empty(obj_name):
+ return LocalAcl(obj_name, None)
+ def __init__(self, obj_name, acl_policy):
+ self.obj_name = obj_name
+ self.acl_policy = acl_policy
+ def equals(self, rhs):
+ """ Compare two LocalAcls """
+ if (self.acl_policy == None):
+ return (rhs.acl_policy == None)
+ if (rhs.acl_policy == None):
+ return (self.acl_policy == None)
+ return self.acl_policy.equals(rhs.acl_policy)
+ def translate_users(self, xusers):
+ """ Translate the users in this ACL """
+ if (self.acl_policy == None):
+ return
+ self.acl_policy.translate_users(xusers)
+ def set_owner(self, owner_id):
+ if (self.acl_policy == None):
+ return
+ self.acl_policy.set_owner(owner_id)
+ def write_to_file(self, file_name):
+ """ Write this ACL to a file """
+ if (self.acl_policy == None):
+ return
+ xml = self.acl_policy.to_xml()
+ f = open(file_name, 'w')
+ try:
+ f.write(xml)
+ finally:
+ f.close()
+
+###### S3 store #######
+class S3StoreIterator(object):
+ """S3Store iterator"""
+ def __init__(self, blrs):
+ self.blrs = blrs
+ def __iter__(self):
+ return self
+ def next(self):
+ # This will raise StopIteration when there are no more objects to
+ # iterate on
+ key = self.blrs.next()
+ ret = Object(key.name, etag_to_md5(key.etag), key.size)
+ return ret
+
+class S3Store(Store):
+ def __init__(self, url, create, akey, skey):
+ # Parse the s3 url
+ host_end = string.find(url, "/")
+ if (host_end == -1):
+ raise Exception("S3Store URLs are of the form \
+s3://host/bucket/key_prefix. Failed to find the host.")
+ self.host = url[0:host_end]
+ bucket_end = url.find("/", host_end+1)
+ if (bucket_end == -1):
+ self.bucket_name = url[host_end+1:]
+ self.key_prefix = ""
+ else:
+ self.bucket_name = url[host_end+1:bucket_end]
+ self.key_prefix = url[bucket_end+1:]
+ if (self.bucket_name == ""):
+ raise Exception("S3Store URLs are of the form \
+s3://host/bucket/key_prefix. Failed to find the bucket.")
+ if (opts.more_verbose):
+ print "self.host = '" + self.host + "', ",
+ print "self.bucket_name = '" + self.bucket_name + "' ",
+ print "self.key_prefix = '" + self.key_prefix + "'"
+ self.conn = S3Connection(calling_format=OrdinaryCallingFormat(),
+ host=self.host, is_secure=False,
+ aws_access_key_id=akey, aws_secret_access_key=skey)
+ self.bucket = self.conn.lookup(self.bucket_name)
+ if (self.bucket == None):
+ if (create):
+ if (opts.dry_run):
+ raise Exception("logic error: this should be unreachable.")
+ self.bucket = self.conn.create_bucket(bucket_name = self.bucket_name)
+ else:
+ raise RuntimeError("%s: no such bucket as %s" % \
+ (url, self.bucket_name))
+ Store.__init__(self, "s3://" + url)
+ def __str__(self):
+ return "s3://" + self.host + "/" + self.bucket_name + "/" + self.key_prefix
+ def get_acl(self, obj):
+ acl_xml = self.bucket.get_xml_acl(obj.name)
+ return LocalAcl.from_xml(obj.name, acl_xml)
+ def make_local_copy(self, obj):
+ k = Key(self.bucket)
+ k.key = obj.name
+ temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False).name
+ try:
+ k.get_contents_to_filename(temp_file)
+ except:
+ os.unlink(temp_file)
+ raise
+ return LocalCopy(obj.name, temp_file, True)
+ def all_objects(self):
+ blrs = self.bucket.list(prefix = self.key_prefix)
+ return S3StoreIterator(blrs.__iter__())
+ def locate_object(self, obj):
+ k = self.bucket.get_key(obj.name)
+ if (k == None):
+ return None
+ return Object(obj.name, etag_to_md5(k.etag), k.size)
+ def upload(self, local_copy, src_acl, obj):
+ if (opts.more_verbose):
+ print "S3Store.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
+ "obj='" + obj.name + "'"
+ if (opts.dry_run):
+ return
+# mime = mimetypes.guess_type(local_copy.path)[0]
+# if (mime == NoneType):
+# mime = "application/octet-stream"
+ k = Key(self.bucket)
+ k.key = obj.name
+ #k.set_metadata("Content-Type", mime)
+ k.set_contents_from_filename(local_copy.path)
+ if (src_acl.acl_policy != None):
+ try:
+ xml = src_acl.acl_policy.to_xml()
+ self.bucket.set_xml_acl(xml, k)
+ except Exception, e:
+ print >>stderr, "ERROR SETTING ACL on object '" + obj.name + "'"
+ print >>stderr
+ print >>stderr, "************* ACL: *************"
+ print >>stderr, str(xml)
+ print >>stderr, "********************************"
+ raise
+
+ def remove(self, obj):
+ if (opts.dry_run):
+ return
+ self.bucket.delete_key(obj.name)
+ if (opts.more_verbose):
+ print "S3Store: removed %s" % obj.name
+
+###### FileStore #######
+class FileStoreIterator(object):
+ """FileStore iterator"""
+ def __init__(self, base):
+ self.base = base
+ if (opts.follow_symlinks):
+ self.generator = os.walk(base, followlinks=True)
+ else:
+ self.generator = os.walk(base)
+ self.path = ""
+ self.files = []
+ def __iter__(self):
+ return self
+ def next(self):
+ while True:
+ if (len(self.files) == 0):
+ self.path, dirs, self.files = self.generator.next()
+ continue
+ path = self.path + "/" + self.files[0]
+ self.files = self.files[1:]
+ # Ignore non-files when iterating.
+ if (not os.path.isfile(path)):
+ continue
+ try:
+ obj_name = local_name_to_s3_name(path[len(self.base)+1:])
+ except LocalFileIsAcl, e:
+ # ignore ACL side files when iterating
+ continue
+ return Object.from_file(obj_name, path)
+
+class FileStore(Store):
+ def __init__(self, url, create):
+ # Parse the file url
+ self.base = url
+ if (self.base[-1:] == '/'):
+ self.base = self.base[:-1]
+ if (create):
+ if (opts.dry_run):
+ raise Exception("logic error: this should be unreachable.")
+ mkdir_p(self.base)
+ elif (not os.path.isdir(self.base)):
+ raise NonexistentStore()
+ Store.__init__(self, "file://" + url)
+ def __str__(self):
+ return "file://" + self.base
+ def get_acl(self, obj):
+ acl_name = get_local_acl_file_name(obj.local_name())
+ acl_path = self.base + "/" + acl_name
+ if (os.path.exists(acl_path)):
+ return LocalAcl.from_file(obj.name, acl_path)
+ else:
+ return LocalAcl.get_empty(obj.name)
+ def make_local_copy(self, obj):
+ local_name = obj.local_name()
+ return LocalCopy(obj.name, self.base + "/" + local_name, False)
+ def all_objects(self):
+ return FileStoreIterator(self.base)
+ def locate_object(self, obj):
+ path = self.base + "/" + obj.local_name()
+ found = os.path.isfile(path)
+ if (opts.more_verbose):
+ if (found):
+ print "FileStore::locate_object: found object '" + \
+ obj.name + "'"
+ else:
+ print "FileStore::locate_object: did not find object '" + \
+ obj.name + "'"
+ if (not found):
+ return None
+ return Object.from_file(obj.name, path)
+ def upload(self, local_copy, src_acl, obj):
+ if (opts.more_verbose):
+ print "FileStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
+ "obj='" + obj.name + "'"
+ if (opts.dry_run):
+ return
+ s = local_copy.path
+ lname = obj.local_name()
+ d = self.base + "/" + lname
+ #print "s='" + s +"', d='" + d + "'"
+ mkdir_p(os.path.dirname(d))
+ shutil.copy(s, d)
+ if (src_acl.acl_policy != None):
+ src_acl.write_to_file(self.base + "/" + get_local_acl_file_name(lname))
+ def remove(self, obj):
+ if (opts.dry_run):
+ return
+ os.unlink(self.base + "/" + obj.local_name())
+ if (opts.more_verbose):
+ print "FileStore: removed %s" % obj.name
+
+###### Rados store #######
+class RadosStoreIterator(object):
+ """RadosStore iterator"""
+ def __init__(self, it, rados_store):
+ self.it = it # has type rados.ObjectIterator
+ self.rados_store = rados_store
+ self.prefix = self.rados_store.key_prefix
+ self.prefix_len = len(self.prefix)
+ def __iter__(self):
+ return self
+ def next(self):
+ rados_obj = None
+ while True:
+ # This will raise StopIteration when there are no more objects to
+ # iterate on
+ rados_obj = self.it.next()
+ # do the prefixes match?
+ if rados_obj.key[:self.prefix_len] == self.prefix:
+ break
+ ret = self.rados_store.obsync_obj_from_rgw(rados_obj.key)
+ if (ret == None):
+ raise Exception("internal iterator error")
+ return ret
+
+class RadosStore(Store):
+ def __init__(self, url, create, akey, skey):
+ # Parse the rados url
+ conf_end = string.find(url, ":")
+ if (conf_end == -1):
+ raise Exception("RadosStore URLs are of the form \
+rados:path/to/ceph/conf:bucket:key_prefix. Failed to find the path to the conf.")
+ self.conf_file_path = url[0:conf_end]
+ bucket_end = url.find(":", conf_end+1)
+ if (bucket_end == -1):
+ self.rgw_bucket_name = url[conf_end+1:]
+ self.key_prefix = ""
+ else:
+ self.rgw_bucket_name = url[conf_end+1:bucket_end]
+ self.key_prefix = url[bucket_end+1:]
+ if (self.rgw_bucket_name == ""):
+ raise Exception("RadosStore URLs are of the form \
+rados:path/to/ceph/conf:bucket:key_prefix. Failed to find the bucket.")
+ if (opts.more_verbose):
+ print "self.conf_file_path = '" + self.conf_file_path + "', ",
+ print "self.rgw_bucket_name = '" + self.rgw_bucket_name + "' ",
+ print "self.key_prefix = '" + self.key_prefix + "'"
+ acl_hack = getenv("ACL_HACK", None)
+ if (acl_hack == None):
+ raise Exception("RadosStore error: You must specify an environment " +
+ "variable called ACL_HACK containing the name of a file. This " +
+ "file contains a serialized RGW ACL that you want " +
+ "to insert into the user.rgw.acl extended attribute of all " +
+ "the objects you create. This is a hack and yes, it will go " +
+ "away soon.")
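+ # For example (illustrative; the path and bucket names are made up):
+ #   ACL_HACK=/tmp/rgw.acl obsync -c SRC rados:/etc/ceph/ceph.conf:mybucket: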
+ acl_hack_f = open(acl_hack, "r")
+ try:
+ self.acl_hack = acl_hack_f.read()
+ finally:
+ acl_hack_f.close()
+ self.rados = rados.Rados()
+ self.rados.conf_read_file(self.conf_file_path)
+ self.rados.connect()
+ if (not self.rados.pool_exists(self.rgw_bucket_name)):
+ if (create):
+ self.create_rgw_bucket(self.rgw_bucket_name)
+ else:
+ raise NonexistentStore()
+ self.ioctx = self.rados.open_ioctx(self.rgw_bucket_name)
+ Store.__init__(self, "rados:" + url)
+ def create_rgw_bucket(self, rgw_bucket_name):
+ """ Create an rgw bucket named 'rgw_bucket_name' """
+ self.rados.create_pool(rgw_bucket_name)
+ meta_ctx = None
+ try:
+ meta_ctx = self.rados.open_ioctx(RGW_META_BUCKET_NAME)
+ meta_ctx.write(rgw_bucket_name, "", 0)
+ if (opts.more_verbose):
+ print "meta_ctx.set_xattr(rgw_bucket_name=" + rgw_bucket_name + ", " + \
+ "user.rgw.acl, self.acl_hack=" + self.acl_hack + ")"
+ meta_ctx.set_xattr(rgw_bucket_name, "user.rgw.acl", self.acl_hack)
+ finally:
+ if (meta_ctx):
+ meta_ctx.close()
+ def obsync_obj_from_rgw(self, key):
+ """Create an obsync object from a Rados object"""
+ try:
+ size, tm = self.ioctx.stat(key)
+ except rados.ObjectNotFound:
+ return None
+ md5 = self.ioctx.get_xattr(key, "user.rgw.etag")
+ return Object(key, md5, size)
+ def __str__(self):
+ return "rados:" + self.conf_file_path + ":" + self.rgw_bucket_name + ":" + self.key_prefix
+ def get_acl(self, obj):
+ acl = LocalAcl.get_empty(obj.name)
+ # todo: set XML ACL
+ return acl
+ def make_local_copy(self, obj):
+ temp_file = None
+ temp_file_f = None
+ try:
+ # read the object from rados in chunks
+ temp_file = tempfile.NamedTemporaryFile(mode='w+b', delete=False)
+ temp_file_f = open(temp_file.name, 'wb')
+ off = 0
+ while True:
+ buf = self.ioctx.read(obj.name, off, 8192)
+ if (len(buf) == 0):
+ break
+ temp_file_f.write(buf)
+ if (len(buf) < 8192):
+ break
+ off += 8192
+ temp_file_f.close()
+ # TODO: implement ACLs
+ except:
+ if (temp_file_f):
+ temp_file_f.close()
+ if (temp_file):
+ os.unlink(temp_file.name)
+ raise
+ return LocalCopy(obj.name, temp_file.name, True)
+ def all_objects(self):
+ it = self.ioctx.list_objects()
+ return RadosStoreIterator(it, self)
+ def locate_object(self, obj):
+ return self.obsync_obj_from_rgw(obj.name)
+ def upload(self, local_copy, src_acl, obj):
+ if (opts.more_verbose):
+ print "RadosStore.UPLOAD: local_copy.path='" + local_copy.path + "' " + \
+ "obj='" + obj.name + "'"
+ if (opts.dry_run):
+ return
+ local_copy_f = open(local_copy.path, 'r')
+ off = 0
+ while True:
+ buf = local_copy_f.read(8192)
+ if ((len(buf) == 0) and (off != 0)):
+ break
+ self.ioctx.write(obj.name, buf, off)
+ if (len(buf) < 8192):
+ break
+ off += 8192
+ self.ioctx.set_xattr(obj.name, "user.rgw.etag", obj.md5)
+ self.ioctx.set_xattr(obj.name, "user.rgw.acl", self.acl_hack)
+ self.ioctx.set_xattr(obj.name, "user.rgw.content_type",
+ "application/octet-stream")
+ def remove(self, obj):
+ if (opts.dry_run):
+ return
+ self.ioctx.remove_object(obj.name)
+ if (opts.more_verbose):
+ print "RadosStore: removed %s" % obj.name
+###### Functions #######
+def delete_unreferenced(src, dst):
+ """ delete everything from dst that is not referenced in src """
+ if (opts.more_verbose):
+ print "handling deletes."
+ for dobj in dst.all_objects():
+ sobj = src.locate_object(dobj)
+ if (sobj == None):
+ dst.remove(dobj)
+
+def xuser_cb(opt, opt_str, value, parser):
+ """ handle an --xuser argument """
+ equals = value.find(r'=')
+ if equals == -1:
+ print >>stderr, "Error parsing --xuser: You must give both a source \
+and destination user name, like so:\n\
+--xuser SOURCE_USER=DEST_USER\n\
+\n\
+This will translate the user SOURCE_USER in the source to the user DEST_USER \n\
+in the destination."
+ sys.exit(1)
+ src_user = value[:equals]
+ dst_user = value[equals+1:]
+ if ((len(src_user) == 0) or (len(dst_user) == 0)):
+ print >>stderr, "Error parsing --xuser: can't have a zero-length \
+user name."
+ sys.exit(1)
+ src_user = add_user_type(src_user)
+ dst_user = add_user_type(dst_user)
+ if (xuser.has_key(src_user)):
+ print >>stderr, "Error parsing --xuser: we are already translating \
+\"%s\" to \"%s\"; we cannot translate it to \"%s\"" % \
+(src_user, xuser[src_user], dst_user)
+ sys.exit(1)
+ xuser[src_user] = dst_user
+
+USAGE = """
+obsync synchronizes S3, Rados, and local objects. The source and destination
+can each be local or remote.
+
+Examples:
+# copy contents of mybucket to disk
+obsync -v s3://myhost/mybucket file://mydir
+
+# copy contents of mydir to an S3 bucket
+obsync -v file://mydir s3://myhost/mybucket
+
+# synchronize two S3 buckets
+SRC_AKEY=... SRC_SKEY=... \
+DST_AKEY=... DST_SKEY=... \
+obsync -v s3://myhost/mybucket1 s3://myhost2/mybucket2 \
+ --xuser bob=robert --xuser joe=joseph
+
+Note: You must specify an AWS access key and secret access key when accessing
+S3. obsync honors these environment variables:
+SRC_AKEY Access key for the source URL
+SRC_SKEY Secret access key for the source URL
+DST_AKEY Access key for the destination URL
+DST_SKEY Secret access key for the destination URL
+AKEY Access key for both source and dest
+SKEY Secret access key for both source and dest
+
+If these environment variables are not given, we will fall back on libboto
+defaults.
+
+obsync (options) [source] [destination]"""
+
+parser = OptionParser(USAGE)
+parser.add_option("-n", "--dry-run", action="store_true", \
+ dest="dry_run", default=False)
+parser.add_option("-c", "--create-dest", action="store_true", \
+ dest="create", help="create the destination if it doesn't already exist")
+parser.add_option("--delete-before", action="store_true", \
+ dest="delete_before", help="delete objects that aren't in SOURCE from \
+DESTINATION before transferring any objects")
+parser.add_option("-d", "--delete-after", action="store_true", \
+ dest="delete_after", help="delete objects that aren't in SOURCE from \
+DESTINATION after doing all transfers.")
+parser.add_option("-L", "--follow-symlinks", action="store_true", \
+ dest="follow_symlinks", help="follow symlinks (please avoid symlink " + \
+ "loops when using this option!)")
+parser.add_option("--no-preserve-acls", action="store_true", \
+ dest="no_preserve_acls", help="don't preserve ACLs when copying objects.")
+parser.add_option("-v", "--verbose", action="store_true", \
+ dest="verbose", help="be verbose")
+parser.add_option("-V", "--more-verbose", action="store_true", \
+ dest="more_verbose", help="be really, really verbose (developer mode)")
+parser.add_option("-x", "--xuser", type="string", nargs=1, action="callback", \
+ dest="SRC=DST", callback=xuser_cb, help="set up a user translation. You \
+can specify multiple user translations with multiple --xuser arguments.")
+parser.add_option("--force", action="store_true", \
+ dest="force", help="overwrite all destination objects, even if they \
+appear to be the same as the source objects.")
+parser.add_option("--unit", action="store_true", \
+ dest="run_unit_tests", help="run unit tests and quit")
+xuser = {}
+(opts, args) = parser.parse_args()
+if (opts.run_unit_tests):
+ test_acl_policy()
+ sys.exit(0)
+
+opts.preserve_acls = not opts.no_preserve_acls
+if (opts.create and opts.dry_run):
+ raise Exception("You can't run with both --create-dest and --dry-run! \
+By definition, a dry run never changes anything.")
+if (len(args) < 2):
+ print >>stderr, "Expected two positional arguments: source and destination"
+ print >>stderr, USAGE
+ sys.exit(1)
+elif (len(args) > 2):
+ print >>stderr, "Too many positional arguments."
+ print >>stderr, USAGE
+ sys.exit(1)
+if (opts.more_verbose):
+ print >>stderr, "User translations:"
+ for k,v in xuser.items():
+ print >>stderr, "\"%s\" ==> \"%s\"" % (k, v)
+ print >>stderr, ""
+if (opts.more_verbose):
+ opts.verbose = True
+ boto.set_stream_logger("stdout")
+ boto.log.info("Enabling verbose boto logging.")
+if (opts.delete_before and opts.delete_after):
+ print >>stderr, "It doesn't make sense to specify both --delete-before \
+and --delete-after."
+ sys.exit(1)
+src_name = args[0]
+dst_name = args[1]
+try:
+ if (opts.more_verbose):
+ print "SOURCE: " + src_name
+ src = Store.make_store(src_name, False,
+ getenv("SRC_AKEY", "AKEY"), getenv("SRC_SKEY", "SKEY"))
+except NonexistentStore, e:
+ print >>stderr, "Fatal error: Source " + src_name + " does not exist."
+ sys.exit(1)
+except Exception, e:
+ print >>stderr, "error creating source: " + str(e)
+ traceback.print_exc(100000, stderr)
+ sys.exit(1)
+try:
+ if (opts.more_verbose):
+ print "DESTINATION: " + dst_name
+ dst = Store.make_store(dst_name, opts.create,
+ getenv("DST_AKEY", "AKEY"), getenv("DST_SKEY", "SKEY"))
+except NonexistentStore, e:
+ print >>stderr, "Fatal error: Destination " + dst_name + " does " +\
+ "not exist. Run with -c or --create-dest to create it automatically."
+ sys.exit(1)
+except Exception, e:
+ print >>stderr, "error creating destination: " + str(e)
+ traceback.print_exc(100000, stderr)
+ sys.exit(1)
+
+if (opts.delete_before):
+ delete_unreferenced(src, dst)
+
+for sobj in src.all_objects():
+ if (opts.more_verbose):
+ print "handling " + sobj.name
+ dobj = dst.locate_object(sobj)
+ upload = False
+ src_acl = None
+ dst_acl = None
+ if (opts.force):
+ if (opts.verbose):
+ print "F " + sobj.name
+ upload = True
+ elif (dobj == None):
+ if (opts.verbose):
+ print "+ " + sobj.name
+ upload = True
+ elif not sobj.equals(dobj):
+ if (opts.verbose):
+ print "> " + sobj.name
+ upload = True
+ elif (opts.preserve_acls):
+ # Do the ACLs match?
+ src_acl = src.get_acl(sobj)
+ dst_acl = dst.get_acl(dobj)
+ src_acl.translate_users(xuser)
+ #src_acl.set_owner()
+ if (not src_acl.equals(dst_acl)):
+ upload = True
+ if (opts.verbose):
+ print "^ " + sobj.name
+ else:
+ if (opts.verbose):
+ print ". " + sobj.name
+ if (upload):
+ if (not opts.preserve_acls):
+ # Just default to an empty ACL
+ src_acl = LocalAcl.get_empty(sobj.name)
+ else:
+ if (src_acl == None):
+ src_acl = src.get_acl(sobj)
+ src_acl.translate_users(xuser)
+ #src_acl.set_owner()
+ local_copy = src.make_local_copy(sobj)
+ try:
+ dst.upload(local_copy, src_acl, sobj)
+ finally:
+ local_copy.remove()
+
+if (opts.delete_after):
+ delete_unreferenced(src, dst)
+
+if (opts.more_verbose):
+ print "finished."
+
+sys.exit(0)